package io.confluent.ksql.api.server;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.ks.NotUpToBoundException;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.ConsistencyToken;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.PushContinuationToken;
import io.confluent.ksql.util.ConsistencyOffsetVector;
import io.confluent.ksql.util.KeyValueMetadata;
import io.confluent.ksql.util.PushOffsetRange;
import io.confluent.ksql.util.RowMetadata;
import io.vertx.core.Context;
import io.vertx.core.http.HttpServerResponse;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/server/QuerySubscriber.class */
public class QuerySubscriber extends BaseSubscriber<KeyValueMetadata<List<?>, GenericRow>> {
    private static final Logger log = LoggerFactory.getLogger(QuerySubscriber.class);
    private static final int REQUEST_BATCH_SIZE = 200;
    private final HttpServerResponse response;
    private final QueryStreamResponseWriter queryStreamResponseWriter;
    private final Supplier<Boolean> hitLimit;
    private int tokens;

    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public QuerySubscriber(Context context, HttpServerResponse httpServerResponse, QueryStreamResponseWriter queryStreamResponseWriter, Supplier<Boolean> supplier) {
        super(context);
        this.response = (HttpServerResponse) Objects.requireNonNull(httpServerResponse);
        this.queryStreamResponseWriter = (QueryStreamResponseWriter) Objects.requireNonNull(queryStreamResponseWriter);
        this.hitLimit = supplier;
    }

    protected void afterSubscribe(Subscription subscription) {
        checkMakeRequest();
    }

    public void handleValue(KeyValueMetadata<List<?>, GenericRow> keyValueMetadata) {
        if (!keyValueMetadata.getRowMetadata().isPresent() || !((RowMetadata) keyValueMetadata.getRowMetadata().get()).isStandaloneRow()) {
            this.queryStreamResponseWriter.writeRow(keyValueMetadata);
        } else if (((RowMetadata) keyValueMetadata.getRowMetadata().get()).getPushOffsetsRange().isPresent()) {
            this.queryStreamResponseWriter.writeContinuationToken(new PushContinuationToken(((PushOffsetRange) ((RowMetadata) keyValueMetadata.getRowMetadata().get()).getPushOffsetsRange().get()).serialize()));
        } else if (((RowMetadata) keyValueMetadata.getRowMetadata().get()).getConsistencyOffsetVector().isPresent()) {
            this.queryStreamResponseWriter.writeConsistencyToken(new ConsistencyToken(((ConsistencyOffsetVector) ((RowMetadata) keyValueMetadata.getRowMetadata().get()).getConsistencyOffsetVector().get()).serialize()));
        }
        this.tokens--;
        if (this.response.writeQueueFull()) {
            this.response.drainHandler(r3 -> {
                checkMakeRequest();
            });
        } else {
            checkMakeRequest();
        }
    }

    private void checkMakeRequest() {
        if (this.tokens == 0) {
            this.tokens = 200;
            makeRequest(200L);
        }
    }

    public void handleError(Throwable th) {
        StringBuilder sb = new StringBuilder();
        sb.append(th);
        for (Throwable th2 : th.getSuppressed()) {
            if (th2 instanceof NotUpToBoundException) {
                sb.append(" Failed to get value from materialized table, reason: NOT_UP_TO_BOUND");
            } else {
                sb.append(th2.getMessage());
            }
        }
        KsqlErrorMessage ksqlErrorMessage = new KsqlErrorMessage(Errors.ERROR_CODE_SERVER_ERROR, sb.toString());
        log.error("Error in processing query {}", sb, th);
        this.queryStreamResponseWriter.writeError(ksqlErrorMessage).end();
    }

    public void handleComplete() {
        if (this.hitLimit.get().booleanValue()) {
            this.queryStreamResponseWriter.writeLimitMessage();
        } else {
            this.queryStreamResponseWriter.writeCompletionMessage();
        }
        this.queryStreamResponseWriter.end();
    }
}
