package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.exception.KsqlException;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.rest.entity.QueryResponseMetadata;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.parsetools.RecordParser;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/client/impl/StreamQueryResponseHandler.class */
/**
 * Response handler for a streamed query: builds a {@link StreamedQueryResultImpl} once the
 * response metadata arrives, completes the caller's future with it, and then forwards every
 * subsequent body record to that result.
 *
 * <p>Body records are decoded as JSON. A JSON array is treated as a data row; a JSON object is
 * treated as either a token message (consistency / continuation token) or an error message.
 *
 * <p>NOTE(review): {@code paused}, {@code queryResult} and {@code columnNameToIndex} are plain
 * fields with no synchronization; this presumably relies on all callbacks running on the
 * handler's Vert.x context (see {@code checkContext()} in {@link #publisherReceptive()}) —
 * confirm against the superclass.
 */
public class StreamQueryResponseHandler extends QueryResponseHandler<CompletableFuture<StreamedQueryResult>> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamQueryResponseHandler.class);

    /** JSON key whose presence marks a record as carrying a serialized consistency vector. */
    private static final String CONSISTENCY_TOKEN_KEY = "consistencyToken";
    /** JSON key whose presence marks a record as carrying a continuation token. */
    private static final String CONTINUATION_TOKEN_KEY = "continuationToken";

    private final AtomicReference<String> serializedConsistencyVector;
    private final AtomicReference<String> continuationToken;
    private final String sql;
    private final Map<String, Object> properties;
    private final ClientImpl client;

    /** Created in {@link #handleMetadata}; {@code null} until the metadata has been processed. */
    private StreamedQueryResultImpl queryResult;
    /** Built in {@link #handleMetadata} from the metadata's column names. */
    private Map<String, Integer> columnNameToIndex;
    /** {@code true} while the record parser is paused waiting for the result's drain handler. */
    private boolean paused;

    /**
     * Creates a handler for one streamed query response.
     *
     * @param context           passed through to the superclass and to the created query result
     * @param recordParser      parser that is paused/resumed by this handler
     * @param completableFuture future completed with the query result once metadata arrives
     * @param atomicReference   holder updated with the value of any {@code consistencyToken} record
     * @param atomicReference2  holder updated with the value of any {@code continuationToken}
     *                          record; also handed to the created query result
     * @param str               the SQL text, handed to the created query result
     * @param map               the request properties, handed to the created query result
     * @param clientImpl        the owning client, handed to the created query result
     * @throws NullPointerException if any of the last five arguments is {@code null}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamQueryResponseHandler(Context context, RecordParser recordParser, CompletableFuture<StreamedQueryResult> completableFuture, AtomicReference<String> atomicReference, AtomicReference<String> atomicReference2, String str, Map<String, Object> map, ClientImpl clientImpl) {
        super(context, recordParser, completableFuture);
        // requireNonNull is generic, so no (raw) casts are needed here.
        this.serializedConsistencyVector = Objects.requireNonNull(atomicReference, "serializedCV");
        this.continuationToken = Objects.requireNonNull(atomicReference2, "continuationToken");
        this.sql = Objects.requireNonNull(str, "sql");
        this.properties = Objects.requireNonNull(map, "properties");
        this.client = Objects.requireNonNull(clientImpl, "client");
    }

    /**
     * Builds the query result and the column-name index from the response metadata, then
     * completes the caller's future with the result.
     *
     * @param queryResponseMetadata metadata supplying the query id, column names and column types
     */
    @Override // io.confluent.ksql.api.client.impl.QueryResponseHandler
    protected void handleMetadata(QueryResponseMetadata queryResponseMetadata) {
        this.queryResult = new StreamedQueryResultImpl(
                this.context,
                queryResponseMetadata.queryId,
                queryResponseMetadata.columnNames,
                RowUtil.columnTypesFromStrings(queryResponseMetadata.columnTypes),
                this.continuationToken,
                this.sql,
                this.properties,
                this.client);
        this.columnNameToIndex = RowUtil.valueToIndexMap(queryResponseMetadata.columnNames);
        this.cf.complete(this.queryResult);
    }

    /**
     * Decodes one body record and dispatches it: JSON arrays are data rows, JSON objects are
     * token or error messages.
     *
     * @param buffer the raw record
     * @throws IllegalStateException if called before {@link #handleMetadata}
     * @throws RuntimeException      if the decoded value is neither a JSON array nor a JSON object
     */
    @Override // io.confluent.ksql.api.client.impl.QueryResponseHandler
    protected void handleRow(Buffer buffer) {
        if (this.queryResult == null) {
            throw new IllegalStateException("handleRow called before metadata processed");
        }
        Object json = buffer.toJson();
        if (json instanceof JsonArray) {
            handleDataRow((JsonArray) json);
        } else if (json instanceof JsonObject) {
            handleControlMessage((JsonObject) json);
        } else {
            throw new RuntimeException("Could not decode JSON: " + json);
        }
    }

    /** Wraps the values in a row, offers it to the query result, and pauses the parser if asked to. */
    private void handleDataRow(JsonArray values) {
        RowImpl row = new RowImpl(
                this.queryResult.columnNames(),
                this.queryResult.columnTypes(),
                values,
                this.columnNameToIndex);
        // NOTE(review): a true return from accept(...) presumably means the result cannot take
        // more rows right now — confirm against StreamedQueryResultImpl.
        boolean pauseRequested = this.queryResult.accept(row);
        if (pauseRequested && !this.paused) {
            this.recordParser.pause();
            // Resume as soon as the result signals it has drained.
            this.queryResult.drainHandler(this::publisherReceptive);
            this.paused = true;
        }
    }

    /** Stores any tokens carried by the object; an object carrying neither token is reported as an error. */
    private void handleControlMessage(JsonObject jsonObject) {
        // Read the backing map once instead of re-fetching and re-checking it for every test.
        Map<String, Object> fields = jsonObject.getMap();
        boolean hasConsistencyToken = fields != null && fields.containsKey(CONSISTENCY_TOKEN_KEY);
        boolean hasContinuationToken = fields != null && fields.containsKey(CONTINUATION_TOKEN_KEY);

        if (hasConsistencyToken) {
            LOG.info("Response contains consistency vector {}", jsonObject);
            this.serializedConsistencyVector.set((String) fields.get(CONSISTENCY_TOKEN_KEY));
        }
        if (hasContinuationToken) {
            LOG.info("Response contains continuation token {}", jsonObject);
            this.continuationToken.set((String) fields.get(CONTINUATION_TOKEN_KEY));
        }
        if (!hasConsistencyToken && !hasContinuationToken) {
            // Neither token key is present: surface the object's "message" field as an error.
            this.queryResult.handleError(new KsqlException(jsonObject.getString("message")));
        }
    }

    /**
     * Marks the query result as complete when the response body ends.
     *
     * <p>NOTE(review): dereferences {@code queryResult} without a null check, so this throws a
     * {@link NullPointerException} if the body ends before {@link #handleMetadata} has run —
     * confirm whether the superclass can let that happen.
     */
    @Override // io.confluent.ksql.api.client.impl.ResponseHandler
    protected void doHandleBodyEnd() {
        this.queryResult.complete();
    }

    /**
     * Reports a failure to the query result, wrapped in a new {@link Exception} whose cause is
     * the given throwable.
     *
     * @param th the failure to report
     */
    @Override // io.confluent.ksql.api.client.impl.QueryResponseHandler
    public void handleExceptionAfterFutureCompleted(Throwable th) {
        this.queryResult.handleError(new Exception(th));
    }

    /** Drain callback registered in {@code handleDataRow}: clears the paused flag and resumes the parser. */
    private void publisherReceptive() {
        checkContext();
        this.paused = false;
        this.recordParser.resume();
    }

    /** Delegates to the superclass; marked by the decompiler as a bridge/synthetic method. */
    @Override // io.confluent.ksql.api.client.impl.ResponseHandler
    public /* bridge */ /* synthetic */ void handleBodyEnd(Void r4) {
        super.handleBodyEnd(r4);
    }

    /** Delegates to the superclass; marked by the decompiler as a bridge/synthetic method. */
    @Override // io.confluent.ksql.api.client.impl.ResponseHandler
    public /* bridge */ /* synthetic */ void handleException(Throwable th) {
        super.handleException(th);
    }

    /** Delegates to the superclass; marked by the decompiler as a bridge/synthetic method. */
    @Override // io.confluent.ksql.api.client.impl.ResponseHandler
    public /* bridge */ /* synthetic */ void handleBodyBuffer(Buffer buffer) {
        super.handleBodyBuffer(buffer);
    }
}
