package io.confluent.ksql.api.server;

import io.confluent.ksql.api.server.protocol.ErrorResponse;
import io.vertx.core.Context;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import java.util.Objects;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/server/AcksSubscriber.class */
/**
 * Subscriber that writes one insert response to the HTTP response for every value it receives
 * and keeps count of how many have been written.
 *
 * <p>More values are requested in batches of {@link #REQUEST_BATCH_SIZE}. While the response
 * write queue is full, no new batch is requested; the check is re-run from a drain handler.
 * Once the number of acks written equals the total announced through
 * {@link #insertsSent(long)}, the response writer is ended and the subscriber is completed.
 *
 * <p>NOTE(review): the fields are plain, unsynchronized state; this presumably relies on all
 * calls arriving on the {@link Context} handed to the superclass — confirm against
 * {@code ReactiveSubscriber}.
 */
public class AcksSubscriber extends ReactiveSubscriber<JsonObject> {
    private static final Logger log = LoggerFactory.getLogger(AcksSubscriber.class);

    /** Number of values requested per batch, and the ack-count interval at which to re-request. */
    private static final int REQUEST_BATCH_SIZE = 1000;

    private final HttpServerResponse response;
    private final InsertsStreamResponseWriter insertsStreamResponseWriter;

    /** Total number of inserts announced via {@link #insertsSent(long)}; {@code null} until then. */
    private Long insertsSent;

    /** Number of insert responses written so far. */
    private long acksSent;

    /** True while a drain handler is registered on the response and has not fired yet. */
    private boolean drainHandlerSet;

    /**
     * Creates a subscriber bound to the given context, response and response writer.
     *
     * @param context passed through to the {@code ReactiveSubscriber} constructor
     * @param httpServerResponse response whose write queue is consulted for back-pressure
     * @param insertsStreamResponseWriter writer used to emit insert responses and errors
     * @throws NullPointerException if the response or the writer is {@code null}
     */
    public AcksSubscriber(Context context, HttpServerResponse httpServerResponse, InsertsStreamResponseWriter insertsStreamResponseWriter) {
        super(context);
        this.response = Objects.requireNonNull(httpServerResponse, "httpServerResponse");
        this.insertsStreamResponseWriter = Objects.requireNonNull(insertsStreamResponseWriter, "insertsStreamResponseWriter");
    }

    /**
     * Requests the first batch of values. The subscription argument itself is not used here.
     */
    @Override
    protected void afterSubscribe(Subscription subscription) {
        makeRequest(REQUEST_BATCH_SIZE);
    }

    /**
     * Writes an insert response for the received value and then either closes the stream,
     * requests more values, or waits for the response write queue to drain.
     *
     * @param jsonObject the received value; its content is not inspected by this method
     */
    @Override
    public void handleValue(JsonObject jsonObject) {
        checkContext();
        this.insertsStreamResponseWriter.writeInsertResponse();
        this.acksSent++;

        final boolean allAcked = this.insertsSent != null && this.insertsSent.longValue() == this.acksSent;
        if (allAcked) {
            // Every announced insert has been acknowledged.
            close();
        } else if (!this.response.writeQueueFull()) {
            checkMakeRequest();
        } else if (!this.drainHandlerSet) {
            // Write queue is full: postpone the batch check until it drains. The flag keeps a
            // second handler from being registered while one is already pending.
            this.response.drainHandler(ignored -> {
                this.drainHandlerSet = false;
                checkMakeRequest();
            });
            this.drainHandlerSet = true;
        }
    }

    /** Ends the response writer when the upstream signals completion. */
    @Override
    public void handleComplete() {
        this.insertsStreamResponseWriter.end();
    }

    /**
     * Logs the failure, writes a generic error response and ends the response writer.
     *
     * @param th the failure reported to this subscriber
     */
    @Override
    public void handleError(Throwable th) {
        log.error("Error in processing inserts", th);
        // NOTE(review): 6 is the error code passed to ErrorResponse; its meaning is defined elsewhere.
        this.insertsStreamResponseWriter.writeError(new ErrorResponse(6, "Error in processing inserts")).end();
    }

    /** Requests another batch once the ack count reaches a multiple of the batch size. */
    private void checkMakeRequest() {
        if (this.acksSent % REQUEST_BATCH_SIZE == 0) {
            makeRequest(REQUEST_BATCH_SIZE);
        }
    }

    /** Ends the response writer and then calls the inherited {@code complete()}. */
    private void close() {
        this.insertsStreamResponseWriter.end();
        complete();
    }

    /**
     * Records the total number of inserts and closes immediately if that many acks have
     * already been written.
     *
     * @param j total number of inserts
     */
    public void insertsSent(long j) {
        this.insertsSent = Long.valueOf(j);
        if (this.acksSent == j) {
            close();
        }
    }
}
