package io.confluent.ksql.api.server;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.vertx.core.Context;
import io.vertx.core.WorkerExecutor;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/server/BlockingQueryPublisher.class */
/**
 * Publisher that buffers rows produced by a push query in a bounded {@link BlockingQueue}
 * and hands them to the subscriber from the Vert.x context.
 *
 * <p>Rows arrive through {@link #accept(GenericRow)}, which blocks (in short slices) while the
 * queue is full. Delivery happens in {@code doSend()}, which is always scheduled through
 * {@code ctx.runOnContext(...)}.
 *
 * <p>{@link #setQueryHandle(PushQueryHandler)} must be called before rows are accepted or the
 * publisher is closed, because both paths dereference state that it initialises.
 */
public class BlockingQueryPublisher extends BasePublisher<GenericRow> implements QueryPublisher, Consumer<GenericRow> {
    private static final Logger log = LoggerFactory.getLogger(BlockingQueryPublisher.class);

    /** Maximum number of rows delivered in one {@code doSend()} pass before rescheduling. */
    public static final int SEND_MAX_BATCH_SIZE = 10;

    /** Capacity of the queue that buffers rows between the producer and the subscriber. */
    public static final int BLOCKING_QUEUE_CAPACITY = 1000;

    /** Length of a single blocking offer; kept short so a full queue does not hide {@code closed}. */
    private static final long OFFER_TIMEOUT_MS = 250L;

    private final BlockingQueue<GenericRow> queue;
    private final WorkerExecutor workerExecutor;
    private PushQueryHandler queryHandle;
    private List<String> columnNames;
    private List<String> columnTypes;
    private OptionalInt limit;
    /** Number of rows successfully placed on the queue so far. */
    private int numAccepted;
    /** Set once the row that reaches the limit is about to be accepted. */
    private boolean complete;
    private volatile boolean closed;

    /**
     * Creates a publisher bound to the given context.
     *
     * @param context        context passed to {@link BasePublisher}
     * @param workerExecutor executor used to stop the query off the calling thread
     * @throws NullPointerException if {@code workerExecutor} is {@code null}
     */
    public BlockingQueryPublisher(Context context, WorkerExecutor workerExecutor) {
        super(context);
        this.queue = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
        this.workerExecutor = Objects.requireNonNull(workerExecutor);
    }

    /**
     * Attaches the query handle and caches its limit, column names and column types.
     *
     * @param pushQueryHandler handle of the query feeding this publisher
     * @throws NullPointerException if {@code pushQueryHandler} is {@code null}
     */
    public void setQueryHandle(PushQueryHandler pushQueryHandler) {
        this.queryHandle = Objects.requireNonNull(pushQueryHandler);
        this.limit = pushQueryHandler.getLimit();
        this.columnNames = pushQueryHandler.getColumnNames();
        this.columnTypes = pushQueryHandler.getColumnTypes();
    }

    /**
     * @return the column names cached from the query handle, or {@code null} if no handle was set
     */
    @Override
    public List<String> getColumnNames() {
        return this.columnNames;
    }

    /**
     * @return the column types cached from the query handle, or {@code null} if no handle was set
     */
    @Override
    public List<String> getColumnTypes() {
        return this.columnTypes;
    }

    /**
     * Marks the publisher closed, stops the query on the worker executor and then delegates to
     * {@code super.close()}. Calls made after the publisher is already closed return immediately.
     */
    @Override
    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        // Stopping the query is handed to the worker executor rather than run on the caller.
        this.workerExecutor.executeBlocking(promise -> {
            this.queryHandle.stop();
            // Complete the promise so the result handler also fires on success.
            promise.complete();
        }, asyncResult -> {
            if (asyncResult.failed()) {
                log.error("Failed to close query", asyncResult.cause());
            }
        });
        super.close();
    }

    /**
     * Offers a row to the queue, waiting in short slices while the queue is full.
     *
     * <p>The row is dropped when the publisher is closed, already complete, or the limit has been
     * reached. If the calling thread is interrupted while waiting, the row is dropped and the
     * thread's interrupt status is restored.
     *
     * @param genericRow the row to publish
     * @throws NullPointerException if {@code genericRow} is {@code null}
     */
    @Override
    public synchronized void accept(GenericRow genericRow) {
        Objects.requireNonNull(genericRow);
        if (this.closed || this.complete || !checkLimit()) {
            return;
        }
        // Re-check 'closed' between offers so close() can take effect while the queue is full.
        while (!this.closed) {
            try {
                if (this.queue.offer(genericRow, OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                    this.numAccepted++;
                    maybeSend();
                    return;
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so the caller can observe the interruption.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * @return the number of rows currently buffered in the queue
     */
    public int queueSize() {
        return this.queue.size();
    }

    /** Schedules a delivery pass on the context. */
    @Override
    protected void maybeSend() {
        this.ctx.runOnContext(v -> {
            doSend();
        });
    }

    /** Starts the query once a subscriber is attached. */
    @Override
    protected void afterSubscribe() {
        this.queryHandle.start();
    }

    /**
     * Decides whether another row may be accepted under the configured limit.
     *
     * <p>Side effect: when the next accepted row would be the last one allowed, {@code complete}
     * is set so that delivery finishes after the buffered rows are sent.
     *
     * @return {@code false} if the limit has already been reached, {@code true} otherwise
     *         (including when no limit is configured)
     */
    private boolean checkLimit() {
        if (!this.limit.isPresent()) {
            return true;
        }
        final int maxRows = this.limit.getAsInt();
        if (this.numAccepted == maxRows) {
            return false;
        }
        if (this.numAccepted == maxRows - 1) {
            this.complete = true;
        }
        return true;
    }

    /**
     * Delivers buffered rows while {@code getDemand()} is positive, at most
     * {@link #SEND_MAX_BATCH_SIZE} per pass. When the batch size is hit, another pass is
     * scheduled on the context. When the publisher is complete and the queue has drained,
     * {@code sendComplete()} is scheduled on the context.
     */
    @SuppressFBWarnings(value = {"IS2_INCONSISTENT_SYNC"}, justification = "Vert.x ensures this is executed on event loop only")
    private void doSend() {
        checkContext();
        int sent = 0;
        while (getDemand() > 0 && !this.queue.isEmpty()) {
            if (sent >= SEND_MAX_BATCH_SIZE) {
                // Batch is full: yield and continue in a fresh pass.
                this.ctx.runOnContext(v -> {
                    doSend();
                });
                return;
            }
            doOnNext(this.queue.poll());
            if (this.complete && this.queue.isEmpty()) {
                this.ctx.runOnContext(v -> {
                    sendComplete();
                });
            }
            sent++;
        }
    }
}
