package io.confluent.ksql.query;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.physical.pull.PullQueryRow;
import io.confluent.ksql.util.KeyValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/query/PullQueryQueue.class */
/**
 * A bounded queue of {@link PullQueryRow}s that also implements {@link BlockingRowQueue}.
 *
 * <p>Rows are stored in an {@link ArrayBlockingQueue}. Producers add rows with
 * {@link #acceptRow(PullQueryRow)} / {@link #acceptRows(List)}; consumers either read the raw
 * rows ({@link #pollRow(long, TimeUnit)}, {@link #drainRowsTo(Collection)}) or read them through
 * the {@link BlockingRowQueue} methods, which convert each row to a {@link KeyValue} whose key is
 * {@code null}.
 */
public class PullQueryQueue implements BlockingRowQueue {
    private static final Logger LOG = LoggerFactory.getLogger(PullQueryQueue.class);

    /** Queue capacity used by the no-argument constructor. */
    private static final int BLOCKING_QUEUE_CAPACITY = 50;

    /** Timeout, in milliseconds, of a single offer attempt used by the no-argument constructor. */
    private static final long DEFAULT_OFFER_TIMEOUT_MS = 100;

    private final BlockingQueue<PullQueryRow> rowQueue;
    private final long offerTimeoutMs;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicLong totalRowsQueued = new AtomicLong(0L);

    // Both callbacks start out as no-ops so they can be invoked unconditionally.
    private LimitHandler limitHandler = () -> {
    };
    private Runnable queuedCallback = () -> {
    };

    /** Creates a queue with the default capacity and the default offer timeout. */
    public PullQueryQueue() {
        this(BLOCKING_QUEUE_CAPACITY, DEFAULT_OFFER_TIMEOUT_MS);
    }

    /**
     * Creates a queue with an explicit capacity and offer timeout.
     *
     * @param queueSizeLimit capacity of the backing {@link ArrayBlockingQueue}
     * @param offerTimeoutMs how long, in milliseconds, each offer attempt made by
     *                       {@link #acceptRow(PullQueryRow)} waits before the closed flag is
     *                       checked again
     */
    public PullQueryQueue(int queueSizeLimit, long offerTimeoutMs) {
        this.rowQueue = new ArrayBlockingQueue<>(queueSizeLimit);
        this.offerTimeoutMs = offerTimeoutMs;
    }

    /**
     * Replaces the handler that is invoked the first time {@link #close()} is called.
     *
     * @param limitHandler the new handler
     */
    @Override
    public void setLimitHandler(LimitHandler limitHandler) {
        this.limitHandler = limitHandler;
    }

    /**
     * Registers an additional callback to run after a row has been queued. Callbacks registered
     * earlier are kept and run first, in registration order.
     *
     * @param runnable the callback to append
     */
    @Override
    public void setQueuedCallback(Runnable runnable) {
        final Runnable previous = this.queuedCallback;
        this.queuedCallback = () -> {
            previous.run();
            runnable.run();
        };
    }

    /**
     * Retrieves and removes the head of the queue, waiting up to the given time for a row.
     *
     * @param timeout  how long to wait
     * @param timeUnit unit of {@code timeout}
     * @return the head row converted to a {@link KeyValue}, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public KeyValue<List<?>, GenericRow> poll(long timeout, TimeUnit timeUnit)
            throws InterruptedException {
        return pullQueryRowToKeyValue(this.rowQueue.poll(timeout, timeUnit));
    }

    /**
     * Retrieves and removes the head of the queue without waiting.
     *
     * @return the head row converted to a {@link KeyValue}, or {@code null} if the queue is empty
     */
    @Override
    public KeyValue<List<?>, GenericRow> poll() {
        return pullQueryRowToKeyValue(this.rowQueue.poll());
    }

    /**
     * Removes all currently queued rows and adds them, converted to {@link KeyValue}s, to the
     * given collection.
     *
     * @param collection destination for the converted rows
     */
    @Override
    public void drainTo(Collection<? super KeyValue<List<?>, GenericRow>> collection) {
        final List<PullQueryRow> drained = new ArrayList<>();
        drainRowsTo(drained);
        drained.stream()
                .map(PullQueryQueue::pullQueryRowToKeyValue)
                .forEach(collection::add);
    }

    /**
     * Retrieves and removes the head row without converting it, waiting up to the given time.
     *
     * @param timeout  how long to wait
     * @param timeUnit unit of {@code timeout}
     * @return the head row, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    public PullQueryRow pollRow(long timeout, TimeUnit timeUnit) throws InterruptedException {
        return this.rowQueue.poll(timeout, timeUnit);
    }

    /**
     * Removes all currently queued rows and adds them, unconverted, to the given collection.
     *
     * @param collection destination for the rows
     */
    public void drainRowsTo(Collection<PullQueryRow> collection) {
        this.rowQueue.drainTo(collection);
    }

    /** Returns the number of rows currently in the queue. */
    @Override
    public int size() {
        return this.rowQueue.size();
    }

    /** Returns {@code true} if no rows are currently in the queue. */
    @Override
    public boolean isEmpty() {
        return this.rowQueue.isEmpty();
    }

    /**
     * Marks the queue as closed. The limit handler is invoked on the first call only; later calls
     * do nothing. Rows already in the queue are left in place.
     */
    @Override
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        this.limitHandler.limitReached();
    }

    /** Returns {@code true} once {@link #close()} has been called. */
    public boolean isClosed() {
        return this.closed.get();
    }

    /**
     * Queues the given rows in order, stopping at the first row that is not accepted.
     *
     * @param list rows to queue; may be {@code null}
     * @return {@code true} if every row was accepted (including when the list is empty);
     *         {@code false} if {@code list} is {@code null} or any row was rejected
     */
    public boolean acceptRows(List<PullQueryRow> list) {
        if (list == null) {
            return false;
        }
        for (final PullQueryRow row : list) {
            if (!acceptRow(row)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Converts a row to the {@link KeyValue} form exposed through {@link BlockingRowQueue}.
     * The key of the result is always {@code null}.
     *
     * @param pullQueryRow the row to convert; may be {@code null}
     * @return the converted row, or {@code null} if {@code pullQueryRow} is {@code null}
     */
    private static KeyValue<List<?>, GenericRow> pullQueryRowToKeyValue(PullQueryRow pullQueryRow) {
        if (pullQueryRow == null) {
            return null;
        }
        // The cast pins the key type to List<?> so the call matches the declared return type.
        return KeyValue.keyValue((List<?>) null, pullQueryRow.getGenericRow());
    }

    /**
     * Queues a single row, retrying while the queue is full until the row is accepted or the
     * queue is closed. On success the queued-row counter is incremented and the queued callback
     * is run.
     *
     * @param pullQueryRow the row to queue; may be {@code null}
     * @return {@code true} if the row was queued; {@code false} if the row is {@code null}, the
     *         queue is closed, or the calling thread was interrupted while waiting
     */
    public boolean acceptRow(PullQueryRow pullQueryRow) {
        if (pullQueryRow == null) {
            return false;
        }
        try {
            // Offer with a timeout rather than blocking indefinitely, so that a close() issued
            // while the queue is full is noticed on the next iteration.
            while (!this.closed.get()) {
                if (this.rowQueue.offer(pullQueryRow, this.offerTimeoutMs, TimeUnit.MILLISECONDS)) {
                    this.totalRowsQueued.incrementAndGet();
                    this.queuedCallback.run();
                    return true;
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Interrupted while trying to offer row to queue", e);
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
        return false;
    }

    /**
     * Puts a row into the queue, blocking until space is available. Unlike
     * {@link #acceptRow(PullQueryRow)}, this does not check the closed flag, does not count the
     * row in {@link #getTotalRowsQueued()}, and does not run the queued callback. If the thread
     * is interrupted while waiting, the error is logged, the interrupt flag is restored, and the
     * row is not queued.
     *
     * @param pullQueryRow the row to put
     */
    public void putSentinelRow(PullQueryRow pullQueryRow) {
        try {
            this.rowQueue.put(pullQueryRow);
        } catch (InterruptedException e) {
            LOG.error("Interrupted while trying to put row into queue", e);
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the number of rows successfully queued through {@link #acceptRow(PullQueryRow)}. */
    public long getTotalRowsQueued() {
        return this.totalRowsQueued.get();
    }
}
