package io.confluent.ksql.api.server;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.vertx.core.Context;
import io.vertx.core.http.HttpServerResponse;
import java.util.Objects;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/server/QuerySubscriber.class */
/**
 * Subscriber that streams {@link GenericRow} query results to an HTTP response.
 *
 * <p>Rows are requested from upstream in batches of {@link #REQUEST_BATCH_SIZE}. A new batch is
 * only requested once every row of the previous batch has been handled and, if the response's
 * write queue was full at that moment, once the response has drained.
 *
 * <p>NOTE(review): the {@code afterSubscribe}/{@code handleValue}/{@code handleError}/
 * {@code handleComplete} methods look like hooks invoked by {@code BaseSubscriber}; the base
 * class is not visible here, so confirm the threading model against it.
 */
public class QuerySubscriber extends BaseSubscriber<GenericRow> {
    private static final Logger log = LoggerFactory.getLogger(QuerySubscriber.class);

    /** Number of rows requested from upstream per batch. */
    private static final int REQUEST_BATCH_SIZE = 200;

    private final HttpServerResponse response;
    private final QueryStreamResponseWriter queryStreamResponseWriter;

    /** Rows still outstanding from the most recent upstream request. */
    private int tokens;

    /**
     * Creates a subscriber writing to the given response.
     *
     * @param context the Vert.x context handed to the base subscriber
     * @param httpServerResponse the response whose write queue is used for back-pressure
     * @param queryStreamResponseWriter the writer used to emit rows, errors and the end marker
     * @throws NullPointerException if the response or the writer is {@code null}
     */
    public QuerySubscriber(Context context, HttpServerResponse httpServerResponse, QueryStreamResponseWriter queryStreamResponseWriter) {
        super(context);
        this.response = Objects.requireNonNull(httpServerResponse, "httpServerResponse");
        this.queryStreamResponseWriter = Objects.requireNonNull(queryStreamResponseWriter, "queryStreamResponseWriter");
    }

    /**
     * Issues the initial upstream request.
     *
     * @param subscription the subscription that was established (not used directly here)
     */
    protected void afterSubscribe(Subscription subscription) {
        checkMakeRequest();
    }

    /**
     * Writes one row to the response and, when appropriate, asks upstream for more.
     *
     * @param genericRow the row to write
     */
    public void handleValue(GenericRow genericRow) {
        this.queryStreamResponseWriter.writeRow(genericRow);
        this.tokens--;
        if (this.response.writeQueueFull()) {
            // Defer asking for more rows until the response reports it has drained.
            this.response.drainHandler(ignored -> checkMakeRequest());
        } else {
            checkMakeRequest();
        }
    }

    /** Requests the next batch from upstream, but only once the current batch is exhausted. */
    private void checkMakeRequest() {
        if (this.tokens == 0) {
            this.tokens = REQUEST_BATCH_SIZE;
            makeRequest(REQUEST_BATCH_SIZE);
        }
    }

    /**
     * Logs the failure, writes a generic server-error message to the response and ends it.
     * The exception itself goes only to the log, not into the message written to the response.
     *
     * @param th the error that was raised
     */
    public void handleError(Throwable th) {
        log.error("Error in processing query", th);
        this.queryStreamResponseWriter.writeError(new KsqlErrorMessage(Errors.ERROR_CODE_SERVER_ERROR, "Error in processing query. Check server logs for details.")).end();
    }

    /** Ends the response stream. */
    public void handleComplete() {
        this.queryStreamResponseWriter.end();
    }
}
