package io.confluent.ksql.api.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.server.PushQueryHandle;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.reactive.BasePublisher;
import io.vertx.core.Context;
import io.vertx.core.WorkerExecutor;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/impl/BlockingQueryPublisher.class */
/**
 * Publishes the rows of a push query to a subscriber.
 *
 * <p>Rows are taken from the {@link BlockingRowQueue} supplied by a {@link PushQueryHandle} and
 * handed to the subscriber on this publisher's Vert.x context, at most
 * {@link #SEND_MAX_BATCH_SIZE} rows per scheduled task. Starting and stopping the query handle is
 * dispatched to the supplied {@link WorkerExecutor} instead of being run inline.
 */
public class BlockingQueryPublisher extends BasePublisher<GenericRow> implements QueryPublisher {
    private static final Logger log = LoggerFactory.getLogger(BlockingQueryPublisher.class);

    /** Maximum number of rows delivered by a single {@code doSend} invocation. */
    public static final int SEND_MAX_BATCH_SIZE = 200;

    private final WorkerExecutor workerExecutor;
    private BlockingRowQueue queue;
    private PushQueryHandle queryHandle;
    private List<String> columnNames;
    private List<String> columnTypes;
    // NOTE(review): written by the queue's limit handler and read in doSend(); if the limit
    // handler can run off the Vert.x context this flag may need to be volatile - confirm.
    private boolean complete;
    private volatile boolean closed;

    /**
     * Creates a publisher bound to the given context.
     *
     * @param context the Vert.x context passed to the base publisher
     * @param workerExecutor executor used to start and stop the query handle; must not be null
     * @throws NullPointerException if {@code workerExecutor} is null
     */
    public BlockingQueryPublisher(Context context, WorkerExecutor workerExecutor) {
        super(context);
        this.workerExecutor = Objects.requireNonNull(workerExecutor, "workerExecutor");
    }

    /**
     * Attaches the query whose rows this publisher will emit.
     *
     * <p>Captures the handle's column names, column types and row queue, then registers
     * callbacks on the queue: one that schedules a send whenever the queue signals that a row
     * was queued, and one that marks the stream as complete when the queue reports its limit.
     *
     * @param pushQueryHandle the handle of the query to publish; must not be null
     * @throws NullPointerException if {@code pushQueryHandle} is null
     */
    public void setQueryHandle(PushQueryHandle pushQueryHandle) {
        Objects.requireNonNull(pushQueryHandle, "pushQueryHandle");
        this.columnNames = pushQueryHandle.getColumnNames();
        this.columnTypes = pushQueryHandle.getColumnTypes();
        this.queue = pushQueryHandle.getQueue();
        this.queue.setQueuedCallback(this::maybeSend);
        this.queue.setLimitHandler(() -> {
            this.complete = true;
        });
        this.queryHandle = pushQueryHandle;
    }

    /**
     * Returns the column names captured from the query handle.
     *
     * @return the column names, or {@code null} if no query handle has been set yet
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    public List<String> getColumnNames() {
        return this.columnNames;
    }

    /**
     * Returns the column types captured from the query handle.
     *
     * @return the column types, or {@code null} if no query handle has been set yet
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    public List<String> getColumnTypes() {
        return this.columnTypes;
    }

    /**
     * Closes this publisher. Calls after the first one return immediately.
     *
     * <p>If a query handle has been set, its {@code stop()} is dispatched to the worker
     * executor. The base publisher is then closed regardless of whether a handle was present.
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        final PushQueryHandle handle = this.queryHandle;
        // Guard against close() before setQueryHandle(): without it an NPE would be thrown
        // after 'closed' was already set, so super.close() could never run afterwards.
        if (handle != null) {
            executeOnWorker("close", handle::stop);
        }
        super.close();
    }

    /**
     * Indicates whether this publisher serves a pull query.
     *
     * @return always {@code false}
     */
    @Override // io.confluent.ksql.api.spi.QueryPublisher
    public boolean isPullQuery() {
        return false;
    }

    /** Schedules an attempt to deliver queued rows on this publisher's context. */
    protected void maybeSend() {
        this.ctx.runOnContext(ignored -> doSend());
    }

    /**
     * Starts the query by dispatching the handle's {@code start()} to the worker executor.
     *
     * @throws NullPointerException if no query handle has been set
     */
    protected void afterSubscribe() {
        final PushQueryHandle handle = Objects.requireNonNull(this.queryHandle, "queryHandle");
        executeOnWorker("start", handle::start);
    }

    /**
     * Runs a task on the worker executor (unordered) and logs an error if it fails.
     *
     * @param action verb naming the operation, used only in the failure log message
     * @param task the work to execute
     */
    private void executeOnWorker(final String action, final Runnable task) {
        this.workerExecutor.executeBlocking(promise -> {
            task.run();
            // Complete the promise so the result handler is also invoked on success;
            // an exception thrown by the task is still reported as a failed result.
            promise.complete();
        }, false, asyncResult -> {
            if (asyncResult.failed()) {
                log.error("Failed to {} query", action, asyncResult.cause());
            }
        });
    }

    /**
     * Delivers queued rows to the subscriber while there is demand and the queue is non-empty.
     *
     * <p>After {@link #SEND_MAX_BATCH_SIZE} rows the remainder is rescheduled on the context
     * instead of being sent in the same invocation. When the completion flag is set and the
     * queue has been drained, completion is scheduled on the context.
     */
    @SuppressFBWarnings(value = {"IS2_INCONSISTENT_SYNC"}, justification = "Vert.x ensures this is executed on event loop only")
    private void doSend() {
        checkContext();
        int sent = 0;
        while (getDemand() > 0 && !this.queue.isEmpty()) {
            if (sent >= SEND_MAX_BATCH_SIZE) {
                // Batch limit reached: continue in a separately scheduled task.
                this.ctx.runOnContext(ignored -> doSend());
                return;
            }
            doOnNext(this.queue.poll());
            if (this.complete && this.queue.isEmpty()) {
                this.ctx.runOnContext(ignored -> sendComplete());
            }
            sent++;
        }
    }
}
