package io.confluent.ksql.api.impl;

import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.server.QueryHandle;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.PullQueryWriteStream;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.reactive.BasePublisher;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.ConsistencyOffsetVector;
import io.confluent.ksql.util.KeyValueMetadata;
import io.confluent.ksql.util.PushQueryMetadata;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.WorkerExecutor;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/api/impl/BlockingQueryPublisher.class */
/**
 * A {@link QueryPublisher} that drains rows from a query's {@link BlockingRowQueue} and hands
 * them to the subscriber through the {@link BasePublisher} machinery.
 *
 * <p>Rows are always delivered from the publisher's Vert.x context: queue callbacks only schedule
 * work with {@code ctx.runOnContext}, and {@link #doSend()} verifies the context before polling.
 * Starting and stopping the underlying {@link QueryHandle} is pushed to the supplied
 * {@link WorkerExecutor}.
 *
 * <p>{@link #setQueryHandle(QueryHandle, boolean, boolean)} must be called before the publisher
 * is subscribed to or closed; most accessors simply return what that call captured.
 */
public class BlockingQueryPublisher extends BasePublisher<KeyValueMetadata<List<?>, GenericRow>> implements QueryPublisher {
    private static final Logger log = LogManager.getLogger(BlockingQueryPublisher.class);

    /** Maximum number of rows delivered by one {@code doSend} pass before it reschedules itself. */
    public static final int SEND_MAX_BATCH_SIZE = 200;

    private final WorkerExecutor workerExecutor;
    private BlockingRowQueue queue;
    private boolean isPullQuery;
    private boolean isScalablePushQuery;
    private QueryHandle queryHandle;
    private ImmutableList<String> columnNames;
    private ImmutableList<String> columnTypes;
    private LogicalSchema logicalSchema;
    private QueryId queryId;
    // Set by the queue's limit/completion handlers; read by doSend() to decide when to complete.
    private boolean complete;
    // Set only by the queue's limit handler.
    private boolean hitLimit;
    private volatile boolean closed;
    private volatile boolean started;

    /**
     * Creates a publisher bound to the given Vert.x context.
     *
     * @param context the context passed to {@link BasePublisher}
     * @param workerExecutor executor used to start and stop the query; must not be {@code null}
     * @throws NullPointerException if {@code workerExecutor} is {@code null}
     */
    public BlockingQueryPublisher(Context context, WorkerExecutor workerExecutor) {
        super(context);
        this.workerExecutor = Objects.requireNonNull(workerExecutor);
    }

    /**
     * Attaches the query this publisher serves and wires up the queue callbacks.
     *
     * <p>Captures the handle's column names, column types, logical schema, queue and query id,
     * then registers the queued callback, limit handler, completion handler and exception
     * handler.
     *
     * @param queryHandle the query to publish rows from
     * @param isPullQuery whether the query is a pull query; when {@code true} the queue is
     *     treated as a {@link PullQueryWriteStream}
     * @param isScalablePushQuery whether the query is a scalable push query
     */
    public void setQueryHandle(QueryHandle queryHandle, boolean isPullQuery, boolean isScalablePushQuery) {
        this.columnNames = ImmutableList.copyOf(queryHandle.getColumnNames());
        this.columnTypes = ImmutableList.copyOf(queryHandle.getColumnTypes());
        this.logicalSchema = queryHandle.getLogicalSchema();
        this.queue = queryHandle.getQueue();
        this.isPullQuery = isPullQuery;
        this.isScalablePushQuery = isScalablePushQuery;
        this.queryId = queryHandle.getQueryId();
        this.queue.setQueuedCallback(this::maybeSend);
        // Both handlers share one code path; only the limit handler records hitLimit.
        this.queue.setLimitHandler(() -> finishQueue(queryHandle, isPullQuery, true));
        this.queue.setCompletionHandler(() -> finishQueue(queryHandle, isPullQuery, false));
        this.queryHandle = queryHandle;
        queryHandle.onException(th -> this.ctx.runOnContext(v -> sendError(th)));
    }

    /**
     * Shared body of the queue's limit and completion handlers.
     *
     * <p>For pull queries the handle's consistency offset vector, if present, is appended to the
     * queue and a send is scheduled. The publisher is then marked complete, and if nothing is
     * left in the queue the completion signal is scheduled on the context.
     *
     * @param handle the query handle supplying the consistency offset vector
     * @param pullQuery whether the query is a pull query
     * @param limitReached {@code true} when invoked from the limit handler
     */
    private void finishQueue(QueryHandle handle, boolean pullQuery, boolean limitReached) {
        if (pullQuery) {
            Optional<ConsistencyOffsetVector> offsets = handle.getConsistencyOffsetVector();
            // The field is declared as BlockingRowQueue, so an explicit cast is required here.
            PullQueryWriteStream pullQueue = (PullQueryWriteStream) this.queue;
            // A bound method reference null-checks its receiver when it is evaluated.
            offsets.ifPresent(pullQueue::putConsistencyVector);
            maybeSend();
        }
        this.complete = true;
        if (limitReached) {
            this.hitLimit = true;
        }
        if (this.queue.isEmpty()) {
            this.ctx.runOnContext(v -> sendComplete());
        }
    }

    /**
     * Returns the column names captured from the query handle.
     *
     * @return an immutable list of column names
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    @SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "columnNames is ImmutableList")
    public List<String> getColumnNames() {
        return this.columnNames;
    }

    /**
     * Returns the column types captured from the query handle.
     *
     * @return an immutable list of column types
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    @SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "columnTypes is ImmutableList")
    public List<String> getColumnTypes() {
        return this.columnTypes;
    }

    /**
     * Returns the logical schema captured from the query handle.
     *
     * @return the query's logical schema
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    public LogicalSchema geLogicalSchema() {
        return this.logicalSchema;
    }

    /**
     * Closes the publisher and stops the query on the worker executor.
     *
     * <p>Calls after the first return an already-succeeded future without stopping the query
     * again.
     *
     * @return the future from {@code super.close()}, or a succeeded future if already closed
     * @throws NullPointerException if no query handle has been set
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    public Future<Void> close() {
        if (this.closed) {
            return Future.succeededFuture();
        }
        this.closed = true;
        executeOnWorker(this.queryHandle::stop, "Failed to close query");
        return super.close();
    }

    /**
     * Reports whether the attached query is a pull query.
     *
     * @return the flag passed to {@link #setQueryHandle(QueryHandle, boolean, boolean)}
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    public boolean isPullQuery() {
        return this.isPullQuery;
    }

    /**
     * Reports whether the attached query is a scalable push query.
     *
     * @return the flag passed to {@link #setQueryHandle(QueryHandle, boolean, boolean)}
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    public boolean isScalablePushQuery() {
        return this.isScalablePushQuery;
    }

    /**
     * Returns the id captured from the query handle.
     *
     * @return the query id
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    public QueryId queryId() {
        return this.queryId;
    }

    /**
     * Reports whether the queue's limit handler has fired.
     *
     * @return {@code true} once the limit handler has run
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    public boolean hitLimit() {
        return this.hitLimit;
    }

    /**
     * Returns the result type reported by the query handle.
     *
     * @return the handle's result type, as returned by {@code QueryHandle.getResultType()}
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    public Optional<PushQueryMetadata.ResultType> getResultType() {
        return this.queryHandle.getResultType();
    }

    /** Schedules a {@link #doSend()} pass on the publisher's context. */
    protected void maybeSend() {
        this.ctx.runOnContext(v -> doSend());
    }

    /**
     * Starts the query on the worker executor the first time it is invoked; later invocations
     * are no-ops.
     *
     * @throws NullPointerException if no query handle has been set
     */
    protected void afterSubscribe() {
        if (this.started) {
            return;
        }
        this.started = true;
        executeOnWorker(this.queryHandle::start, "Failed to start query");
    }

    /**
     * Starts the query synchronously on the calling thread, after {@code VertxUtils.checkIsWorker()}
     * has validated that thread. Marks the publisher as started so that
     * {@link #afterSubscribe()} does not start it again.
     */
    public void startFromWorkerThread() {
        VertxUtils.checkIsWorker();
        this.started = true;
        this.queryHandle.start();
    }

    /**
     * Runs {@code task} on the worker executor (unordered) and logs any failure.
     *
     * @param task the blocking work to run
     * @param failureMessage the message logged, with the cause, if the task fails
     */
    private void executeOnWorker(Runnable task, String failureMessage) {
        this.workerExecutor.executeBlocking(promise -> {
            task.run();
            // Complete the promise so the blocking call's future resolves on success as well.
            promise.complete();
        }, false, asyncResult -> {
            if (asyncResult.failed()) {
                log.error(failureMessage, asyncResult.cause());
            }
        });
    }

    /**
     * Delivers queued rows to the subscriber while there is demand.
     *
     * <p>At most {@link #SEND_MAX_BATCH_SIZE} rows are sent per pass; if more remain, another
     * pass is scheduled on the context. When the publisher has been marked complete and the row
     * just sent emptied the queue, the completion signal is scheduled.
     */
    @SuppressFBWarnings(value = {"IS2_INCONSISTENT_SYNC"}, justification = "Vert.x ensures this is executed on event loop only")
    private void doSend() {
        checkContext();
        int sent = 0;
        while (getDemand() > 0 && !this.queue.isEmpty()) {
            if (sent >= SEND_MAX_BATCH_SIZE) {
                // Batch is full: continue in a fresh pass rather than looping on here.
                this.ctx.runOnContext(v -> doSend());
                return;
            }
            doOnNext(this.queue.poll());
            if (this.complete && this.queue.isEmpty()) {
                this.ctx.runOnContext(v -> sendComplete());
            }
            sent++;
        }
    }
}
