package io.confluent.ksql.api.server;

import io.confluent.ksql.reactive.BaseSubscriber;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.InsertAck;
import io.confluent.ksql.rest.entity.InsertError;
import io.vertx.core.Context;
import io.vertx.core.http.HttpServerResponse;
import java.util.Objects;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/server/AcksSubscriber.class */
/**
 * Subscriber that turns each {@link InsertResult} it receives into an entry on the
 * inserts-stream HTTP response: an {@link InsertAck} for a successful insert, or an
 * {@link InsertError} for a failed one.
 *
 * <p>Results are requested from the upstream publisher in batches of
 * {@link #REQUEST_BATCH_SIZE}. While the HTTP response's write queue is full, the next
 * batch request is deferred to the response's drain handler.
 *
 * <p>The response is ended at most once by this class: after the first failed insert,
 * when the publisher completes, or once the number of acks written equals the number of
 * inserts reported through {@link #insertsSent(long)}.
 *
 * <p>NOTE(review): all mutable state is unsynchronised; this presumably relies on every
 * callback running on the Vert.x context passed to the constructor — confirm against
 * {@code BaseSubscriber}.
 */
public class AcksSubscriber extends BaseSubscriber<InsertResult> {
    private static final Logger log = LoggerFactory.getLogger(AcksSubscriber.class);

    /** Number of results requested from the publisher per batch. */
    private static final int REQUEST_BATCH_SIZE = 200;

    private final HttpServerResponse response;
    private final InsertsStreamResponseWriter insertsStreamResponseWriter;

    /** Total number of inserts sent; {@code null} until {@link #insertsSent(long)} is called. */
    private Long insertsSent;

    /** Number of acks written to the response so far. */
    private long acksSent;

    /** True while a drain handler registered by this subscriber is still pending. */
    private boolean drainHandlerSet;

    /** True once this subscriber has ended the response; later results are ignored. */
    private boolean responseEnded;

    /**
     * Creates a subscriber that writes acks through the given response writer.
     *
     * @param context the Vert.x context handed to the base subscriber
     * @param httpServerResponse the HTTP response, consulted for back-pressure; must not be null
     * @param insertsStreamResponseWriter writer used to emit acks and errors; must not be null
     * @throws NullPointerException if the response or the writer is null
     */
    public AcksSubscriber(Context context, HttpServerResponse httpServerResponse, InsertsStreamResponseWriter insertsStreamResponseWriter) {
        super(context);
        this.response = Objects.requireNonNull(httpServerResponse);
        this.insertsStreamResponseWriter = Objects.requireNonNull(insertsStreamResponseWriter);
    }

    /**
     * Requests the first batch of results.
     *
     * @param subscription the subscription that was just established (not used here)
     */
    protected void afterSubscribe(Subscription subscription) {
        makeRequest(REQUEST_BATCH_SIZE);
    }

    /**
     * Writes an ack or an error for a single insert result. Results that arrive after the
     * response has been ended are dropped.
     *
     * @param insertResult the outcome of one insert
     */
    public void handleValue(InsertResult insertResult) {
        checkContext();
        if (this.responseEnded) {
            return;
        }
        if (insertResult.succeeded()) {
            handleSuccessfulInsert(insertResult);
        } else {
            handleFailedInsert(insertResult);
        }
    }

    /**
     * Ends the response when the publisher completes, unless it has already been ended
     * (for example by an earlier failed insert).
     */
    public void handleComplete() {
        endResponse();
    }

    /**
     * Logs a publisher error. The response is left untouched.
     *
     * @param th the error raised upstream
     */
    public void handleError(Throwable th) {
        log.error("Error in processing inserts", th);
    }

    /**
     * Writes an ack, then either finishes (all expected acks written) or arranges for the
     * next batch to be requested, honouring back-pressure from the HTTP response.
     */
    private void handleSuccessfulInsert(InsertResult insertResult) {
        this.insertsStreamResponseWriter.writeInsertResponse(new InsertAck(insertResult.sequenceNumber()));
        this.acksSent++;

        if (this.insertsSent != null && this.insertsSent.longValue() == this.acksSent) {
            // Every insert that was sent has now been acknowledged.
            close();
            return;
        }

        if (!this.response.writeQueueFull()) {
            checkMakeRequest();
            return;
        }

        // Write queue is full: defer the next request until the response drains.
        // Register the handler only once per full/drain cycle.
        if (!this.drainHandlerSet) {
            this.response.drainHandler(ignored -> {
                this.drainHandlerSet = false;
                checkMakeRequest();
            });
            this.drainHandlerSet = true;
        }
    }

    /**
     * Logs the failure, writes a single error entry and ends the response. A
     * {@code KsqlApiException} contributes its own error code and message; any other
     * exception is reported with a generic server-error message.
     */
    private void handleFailedInsert(InsertResult insertResult) {
        final Exception exception = insertResult.exception();
        log.error("Error in processing inserts", exception);

        final InsertError insertError;
        if (exception instanceof KsqlApiException) {
            insertError = new InsertError(
                insertResult.sequenceNumber(),
                ((KsqlApiException) exception).getErrorCode(),
                exception.getMessage());
        } else {
            insertError = new InsertError(
                insertResult.sequenceNumber(),
                Errors.ERROR_CODE_SERVER_ERROR,
                "Error in processing inserts. Check server logs for details.");
        }

        this.insertsStreamResponseWriter.writeError(insertError).end();
        this.responseEnded = true;
    }

    /** Requests another batch once a whole batch worth of acks has been written. */
    private void checkMakeRequest() {
        if (this.acksSent % REQUEST_BATCH_SIZE == 0) {
            makeRequest(REQUEST_BATCH_SIZE);
        }
    }

    /**
     * Ends the response writer unless this subscriber has already ended it, and records
     * that it is ended.
     *
     * <p>Previously only the failed-insert path recorded the ended state, so a completion
     * signal following a failed insert called {@code end()} on the writer a second time.
     */
    private void endResponse() {
        if (this.responseEnded) {
            return;
        }
        this.responseEnded = true;
        this.insertsStreamResponseWriter.end();
    }

    /** Ends the response (if still open) and invokes the base subscriber's {@code complete()}. */
    private void close() {
        endResponse();
        complete();
    }

    /**
     * Records the total number of inserts that were sent. If that many acks have already
     * been written, the response is closed immediately; otherwise it is closed when the
     * final ack is written.
     *
     * @param j the total number of inserts sent
     */
    public void insertsSent(long j) {
        this.insertsSent = Long.valueOf(j);
        if (this.acksSent == j) {
            close();
        }
    }
}
