package io.confluent.ksql.query;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Monitor;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.pull.PullQueryRow;
import io.confluent.ksql.execution.pull.StreamedRowTranslator;
import io.confluent.ksql.rest.entity.ConsistencyToken;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.ConsistencyOffsetVector;
import io.confluent.ksql.util.KeyValue;
import io.confluent.ksql.util.KeyValueMetadata;
import io.confluent.ksql.util.KsqlHostInfo;
import io.confluent.ksql.util.RowMetadata;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.SucceededFuture;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
import java.util.OptionalInt;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;

/* loaded from: input_file:io/confluent/ksql/query/PullQueryWriteStream.class */
public class PullQueryWriteStream implements WriteStream<List<StreamedRow>>, BlockingRowQueue {
    /** Initial value of {@link #queueCapacity}. */
    private static final int DEFAULT_SOFT_CAPACITY = 50;

    /** Optional cap on the total number of rows queued; checked by {@code hardLimitHit()}. */
    private final OptionalInt queryLimit;

    /** Row translator supplied at construction time. */
    private final StreamedRowTranslator translator;

    /** Lock that guards the mutable state below and owns the two guards. */
    private final Monitor monitor = new Monitor();

    /** Rows waiting to be polled, each paired with the handler of the write that produced it. */
    @GuardedBy("monitor")
    private final Queue<HandledRow> queue = new ArrayDeque<>();

    /** Running count of rows queued; compared against {@link #queryLimit}. */
    @GuardedBy("monitor")
    private int totalRowsQueued = 0;

    /** Set to {@code true} by {@code end(Handler)}. */
    @GuardedBy("monitor")
    private boolean closed = false;

    /** Soft capacity used by {@code writeQueueFull()} and the {@link #atHalfCapacity} guard. */
    @GuardedBy("monitor")
    private int queueCapacity = DEFAULT_SOFT_CAPACITY;

    /** Satisfied while the queue holds at least one row. */
    private final Monitor.Guard hasData = this.monitor.newGuard(() -> !isEmpty());

    /** Satisfied once the stream is done or the queue has shrunk to half of its capacity or less. */
    private final Monitor.Guard atHalfCapacity =
            this.monitor.newGuard(() -> isDone() || size() <= this.queueCapacity / 2);

    /** Handlers invoked from {@code pollRow()} when {@link #atHalfCapacity} holds. */
    private final ConcurrentHashSet<Handler<Void>> drainHandler = new ConcurrentHashSet<>();

    /** Invoked when the stream is ended; a no-op until one is set. */
    private CompletionHandler endHandler = () -> {
    };

    /** Handler wrapping the registered limit handler; a no-op until one is set. */
    private Handler<AsyncResult<Void>> limitHandler = ignored -> {
    };

    /** Callback chain extended by {@code setQueuedCallback}; a no-op until one is set. */
    private Runnable queueCallback = () -> {
    };

    /**
     * Immutable pairing of a queued row with the handler to notify once that row is polled.
     *
     * <p>Decompiler note: the access modifier of this class was changed from {@code private}.
     */
    public static final class HandledRow {
        private final PullQueryRow row;
        private final Handler<AsyncResult<Void>> handler;

        private HandledRow(PullQueryRow queuedRow, Handler<AsyncResult<Void>> onPolled) {
            this.handler = onPolled;
            this.row = queuedRow;
        }
    }

    public PullQueryWriteStream(OptionalInt optionalInt, StreamedRowTranslator streamedRowTranslator) {
        this.queryLimit = optionalInt;
        this.translator = streamedRowTranslator;
        this.drainHandler.add(r3 -> {
            this.monitor.enter();
            this.monitor.leave();
        });
    }

    /**
     * Moves every currently queued row into {@code collection}, converting each one via
     * {@link #poll()}.
     *
     * <p>The monitor is entered once and left exactly once, whatever the queue holds. The previous
     * form left the monitor inside the loop, so an empty queue never released it and a queue with
     * several rows released it more often than it was entered.
     *
     * @param collection destination that receives the drained rows
     */
    @Override // io.confluent.ksql.query.BlockingRowQueue
    public void drainTo(Collection<? super KeyValueMetadata<List<?>, GenericRow>> collection) {
        this.monitor.enter();
        try {
            while (!this.queue.isEmpty()) {
                collection.add(poll());
            }
        } finally {
            this.monitor.leave();
        }
    }

    /**
     * Moves every currently queued row into {@code collection} as raw {@link PullQueryRow}s.
     *
     * <p>The monitor is entered once and left exactly once, whatever the queue holds. The previous
     * form left the monitor inside the loop, so an empty queue never released it and a queue with
     * several rows released it more often than it was entered.
     *
     * @param collection destination that receives the drained rows
     */
    public void drainRowsTo(Collection<PullQueryRow> collection) {
        this.monitor.enter();
        try {
            while (!this.queue.isEmpty()) {
                collection.add(pollRow());
            }
        } finally {
            this.monitor.leave();
        }
    }

    /**
     * Waits up to the given time for a row to be available, then polls it.
     *
     * @param j maximum time to wait for the {@code hasData} guard
     * @param timeUnit unit of {@code j}
     * @return the polled row, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting to enter the monitor
     */
    @Override // io.confluent.ksql.query.BlockingRowQueue
    public KeyValueMetadata<List<?>, GenericRow> poll(long j, TimeUnit timeUnit) throws InterruptedException {
        final boolean entered = this.monitor.enterWhen(this.hasData, j, timeUnit);
        if (entered) {
            try {
                return poll();
            } finally {
                // Released on both the normal and the exceptional path.
                this.monitor.leave();
            }
        }
        return null;
    }

    /**
     * Polls the next row without waiting and converts it to a {@link KeyValueMetadata}.
     *
     * <p>A row that carries a consistency offset vector becomes a metadata-only result. Any other
     * row becomes a key/value pair with a {@code null} key, plus row metadata built from the
     * source node's host and port when a source node is present.
     *
     * @return the converted row, or {@code null} if the queue is empty
     */
    @Override // io.confluent.ksql.query.BlockingRowQueue
    public KeyValueMetadata<List<?>, GenericRow> poll() {
        final PullQueryRow next = pollRow();
        if (next == null) {
            return null;
        }
        if (next.getConsistencyOffsetVector().isPresent()) {
            return new KeyValueMetadata<>(RowMetadata.of(next.getConsistencyOffsetVector().get()));
        }
        return new KeyValueMetadata<>(
                KeyValue.keyValue(null, next.getGenericRow()),
                next.getSourceNode()
                        .map(node -> new KsqlHostInfo(node.getHost(), node.getPort()))
                        .map(RowMetadata::of));
    }

    /**
     * Waits up to the given time for a row to be available, then polls it as a raw row.
     *
     * @param j maximum time to wait for the {@code hasData} guard
     * @param timeUnit unit of {@code j}
     * @return the polled row, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting to enter the monitor
     */
    public PullQueryRow pollRow(long j, TimeUnit timeUnit) throws InterruptedException {
        final boolean entered = this.monitor.enterWhen(this.hasData, j, timeUnit);
        if (entered) {
            try {
                return pollRow();
            } finally {
                // Released on both the normal and the exceptional path.
                this.monitor.leave();
            }
        }
        return null;
    }

    /**
     * Removes the head of the queue, completes the handler of the write that produced it, and
     * runs the registered drain handlers when the {@code atHalfCapacity} guard is satisfied.
     *
     * <p>Both monitor acquisitions are now released in their {@code finally} blocks. Previously
     * those blocks were empty, so each call left the monitor held by the calling thread.
     *
     * @return the polled row, or {@code null} if the queue is empty
     */
    private PullQueryRow pollRow() {
        this.monitor.enter();
        try {
            final HandledRow head = this.queue.poll();
            if (head == null) {
                return null;
            }
            // Report the row's write as successfully completed.
            head.handler.handle(new SucceededFuture<Void>((ContextInternal) null, null));
            if (this.monitor.enterIf(this.atHalfCapacity)) {
                try {
                    this.drainHandler.forEach(handler -> handler.handle(null));
                } finally {
                    // Pairs with the successful enterIf above.
                    this.monitor.leave();
                }
            }
            return head.row;
        } finally {
            // Pairs with the enter() at the top of the method.
            this.monitor.leave();
        }
    }

    /**
     * Reads the running count of queued rows while holding the monitor.
     *
     * @return the current value of {@code totalRowsQueued}
     */
    public int getTotalRowsQueued() {
        final int snapshot;
        this.monitor.enter();
        try {
            snapshot = this.totalRowsQueued;
        } finally {
            this.monitor.leave();
        }
        return snapshot;
    }

    /**
     * Reads the number of rows currently in the queue while holding the monitor.
     *
     * @return the current queue size
     */
    @Override // io.confluent.ksql.query.BlockingRowQueue
    public int size() {
        final int pending;
        this.monitor.enter();
        try {
            pending = this.queue.size();
        } finally {
            this.monitor.leave();
        }
        return pending;
    }

    /**
     * Reports whether the queue currently holds no rows, based on {@link #size()}.
     *
     * @return {@code true} if the queue size is zero
     */
    @Override // io.confluent.ksql.query.BlockingRowQueue
    public boolean isEmpty() {
        final int pending = size();
        return pending == 0;
    }

    /**
     * Reports whether the stream is done: either it has been closed or the row limit was reached.
     *
     * @return {@code true} if closed or if {@code hardLimitHit()} holds
     */
    public boolean isDone() {
        this.monitor.enter();
        try {
            // Short-circuits exactly like the nested checks: the limit is only consulted when open.
            return this.closed || hardLimitHit();
        } finally {
            this.monitor.leave();
        }
    }

    /**
     * Reports whether a query limit is configured and the total queued rows have reached it.
     *
     * @return {@code true} if the limit is present and {@code totalRowsQueued} is at or above it
     */
    private boolean hardLimitHit() {
        this.monitor.enter();
        try {
            return this.queryLimit.isPresent()
                    && this.totalRowsQueued >= this.queryLimit.getAsInt();
        } finally {
            this.monitor.leave();
        }
    }

    /**
     * Accepts an exception handler but does nothing with it; the handler is not stored.
     *
     * @param handler ignored
     * @return this stream
     */
    public PullQueryWriteStream exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    /**
     * Adds a drain handler to the set of registered handlers; earlier handlers are kept.
     *
     * @param handler handler to add
     * @return this stream
     */
    public PullQueryWriteStream drainHandler(Handler<Void> handler) {
        this.drainHandler.add(handler);
        return this;
    }

    /**
     * Replaces the handler that {@code end(Handler)} invokes when the stream is ended.
     *
     * @param completionHandler the new completion handler
     */
    @Override // io.confluent.ksql.query.BlockingRowQueue
    public void setCompletionHandler(CompletionHandler completionHandler) {
        this.endHandler = completionHandler;
    }

    /**
     * Replaces the limit handler with an adapter that calls {@code limitReached()} on the given
     * handler and ignores the async result it is passed.
     *
     * @param limitHandler callback to notify
     */
    @Override // io.confluent.ksql.query.BlockingRowQueue
    public void setLimitHandler(LimitHandler limitHandler) {
        this.limitHandler = ignored -> limitHandler.limitReached();
    }

    /**
     * Appends a callback to the queued-callback chain: the previously installed callback runs
     * first, then the new one.
     *
     * @param runnable callback to append
     */
    @Override // io.confluent.ksql.query.BlockingRowQueue
    public void setQueuedCallback(Runnable runnable) {
        final Runnable previous = this.queueCallback;
        this.queueCallback = () -> {
            previous.run();
            runnable.run();
        };
    }

    /**
     * Writes a single consistency-token row built from the serialized offset vector.
     *
     * @param consistencyOffsetVector vector to serialize into the token
     */
    public void putConsistencyVector(ConsistencyOffsetVector consistencyOffsetVector) {
        final ConsistencyToken token = new ConsistencyToken(consistencyOffsetVector.serialize());
        final List<StreamedRow> batch = ImmutableList.of(StreamedRow.consistencyToken(token));
        write(batch);
    }

    /**
     * Writes a batch of rows and returns a future backed by the promise handed to
     * {@code write(List, Handler)} as its handler.
     *
     * @param list rows to write
     * @return future of the promise used as the write handler
     */
    public Future<Void> write(List<StreamedRow> list) {
        // Typed promise instead of a raw Promise plus an unchecked cast to Handler.
        final Promise<Void> promise = Promise.promise();
        write(list, promise);
        return promise.future();
    }

    /**
     * Translates the incoming rows and queues them, each paired with {@code handler}.
     *
     * <p>Reconstructed from the decompiler's instruction dump; the previous body unconditionally
     * threw {@link UnsupportedOperationException}. The steps, all performed under the monitor:
     * <ol>
     *   <li>If the stream is already done, return without queueing anything.</li>
     *   <li>For every translated row that the queue accepts, increment {@code totalRowsQueued}
     *       and run the queued callback.</li>
     *   <li>As soon as the row limit is reached, end the stream with the limit handler and stop
     *       processing the remaining rows.</li>
     * </ol>
     *
     * @param data rows to translate and queue
     * @param handler handler stored with each queued row
     */
    public void write(List<StreamedRow> data, Handler<AsyncResult<Void>> handler) {
        this.monitor.enter();
        try {
            if (isDone()) {
                return;
            }
            for (PullQueryRow row : this.translator.apply(data)) {
                if (!this.queue.offer(new HandledRow(row, handler))) {
                    // Row not accepted by the queue: skip the bookkeeping, try the next row.
                    continue;
                }
                this.totalRowsQueued++;
                this.queueCallback.run();
                if (hardLimitHit()) {
                    end(this.limitHandler);
                    break;
                }
            }
        } finally {
            this.monitor.leave();
        }
    }

    /**
     * Closes the queue by delegating to the no-argument {@code end()}.
     *
     * <p>NOTE(review): {@code end()} is not declared in this class; presumably it is inherited
     * from {@code WriteStream} and routes to {@code end(Handler)} -- confirm against the Vert.x
     * version in use.
     */
    @Override // io.confluent.ksql.query.BlockingRowQueue
    public void close() {
        end();
    }

    /**
     * Marks the stream closed, invokes the completion handler, and then reports success to
     * {@code handler}. All three steps run while holding the monitor.
     *
     * @param handler notified with a succeeded result after the completion handler has run
     */
    public void end(Handler<AsyncResult<Void>> handler) {
        this.monitor.enter();
        try {
            this.closed = true;
            this.endHandler.complete();
            final AsyncResult<Void> success = new SucceededFuture<Void>((ContextInternal) null, null);
            handler.handle(success);
        } finally {
            this.monitor.leave();
        }
    }

    /* renamed from: setWriteQueueMaxSize, reason: merged with bridge method [inline-methods] */
    /**
     * Sets the soft queue capacity while holding the monitor.
     *
     * @param i new value for {@code queueCapacity}
     * @return this stream
     */
    public PullQueryWriteStream m253setWriteQueueMaxSize(int i) {
        this.monitor.enter();
        try {
            this.queueCapacity = i;
        } finally {
            this.monitor.leave();
        }
        return this;
    }

    /**
     * Reports whether writers should stop: the stream is done, or the queue has reached its
     * soft capacity.
     *
     * @return {@code true} if done or if the queue size is at or above {@code queueCapacity}
     */
    public boolean writeQueueFull() {
        this.monitor.enter();
        try {
            return isDone() || this.queue.size() >= this.queueCapacity;
        } finally {
            this.monitor.leave();
        }
    }

    /**
     * Waits, if the write queue is full, until the {@code atHalfCapacity} guard is satisfied or
     * the timeout elapses.
     *
     * @param j maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return {@code true} if the queue is not full (immediately or after waiting); {@code false}
     *     if the wait timed out or the queue is still reported full after waiting
     * @throws InterruptedException if interrupted while waiting to enter the monitor
     */
    public boolean awaitCapacity(long j, TimeUnit timeUnit) throws InterruptedException {
        if (writeQueueFull()) {
            final boolean entered = this.monitor.enterWhen(this.atHalfCapacity, j, timeUnit);
            if (!entered) {
                return false;
            }
            try {
                // Re-check under the monitor: the guard is also satisfied when the stream is done.
                return !writeQueueFull();
            } finally {
                this.monitor.leave();
            }
        }
        return true;
    }

    /* renamed from: drainHandler, reason: collision with other method in class */
    // Decompiler-emitted bridge: forwards to drainHandler(Handler<Void>) and returns the result
    // typed as WriteStream.
    public /* bridge */ /* synthetic */ WriteStream m252drainHandler(Handler handler) {
        return drainHandler((Handler<Void>) handler);
    }

    // Decompiler-emitted bridge: casts the erased arguments and forwards to
    // write(List<StreamedRow>, Handler<AsyncResult<Void>>).
    public /* bridge */ /* synthetic */ void write(Object obj, Handler handler) {
        write((List<StreamedRow>) obj, (Handler<AsyncResult<Void>>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    // Decompiler-emitted bridge: forwards to exceptionHandler(Handler<Throwable>) and returns the
    // result typed as WriteStream.
    public /* bridge */ /* synthetic */ WriteStream m254exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    // Decompiler-emitted bridge: forwards to exceptionHandler(Handler<Throwable>) and returns the
    // result typed as StreamBase.
    public /* bridge */ /* synthetic */ StreamBase m255exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
