package io.confluent.ksql.rest.server.resources.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.execution.pull.PullQueryResult;
import io.confluent.ksql.query.PullQueryWriteStream;
import io.confluent.ksql.rest.entity.ConsistencyToken;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.resources.streaming.Flow;
import io.confluent.ksql.util.ConsistencyOffsetVector;
import io.confluent.ksql.util.RowMetadata;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.class */
class PullQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> {
    private static final Logger LOG = LoggerFactory.getLogger(PullQueryPublisher.class);
    private final ListeningScheduledExecutorService exec;
    private final PullQueryResult result;
    private final MetricsCallbackHolder metricsCallbackHolder;
    private final long startTimeNanos;

    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher$PullQuerySubscription.class */
    private static final class PullQuerySubscription extends PollingSubscription<Collection<StreamedRow>> {
        private final Flow.Subscriber<Collection<StreamedRow>> subscriber;
        private final PullQueryResult result;

        private PullQuerySubscription(ListeningScheduledExecutorService listeningScheduledExecutorService, Flow.Subscriber<Collection<StreamedRow>> subscriber, PullQueryResult pullQueryResult) {
            super(listeningScheduledExecutorService, subscriber, pullQueryResult.getSchema());
            this.subscriber = (Flow.Subscriber) Objects.requireNonNull(subscriber, "subscriber");
            this.result = (PullQueryResult) Objects.requireNonNull(pullQueryResult, "result");
            pullQueryResult.onCompletion(r6 -> {
                Optional consistencyOffsetVector = pullQueryResult.getConsistencyOffsetVector();
                PullQueryWriteStream pullQueryQueue = pullQueryResult.getPullQueryQueue();
                pullQueryQueue.getClass();
                consistencyOffsetVector.ifPresent(pullQueryQueue::putConsistencyVector);
                setDone();
            });
            pullQueryResult.onException(this::setError);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // io.confluent.ksql.rest.server.resources.streaming.PollingSubscription
        public Collection<StreamedRow> poll() {
            LinkedList newLinkedList = Lists.newLinkedList();
            this.result.getPullQueryQueue().drainTo(newLinkedList);
            if (newLinkedList.isEmpty()) {
                return null;
            }
            return (Collection) newLinkedList.stream().map(keyValueMetadata -> {
                return (keyValueMetadata.getRowMetadata().isPresent() && ((RowMetadata) keyValueMetadata.getRowMetadata().get()).getConsistencyOffsetVector().isPresent()) ? StreamedRow.consistencyToken(new ConsistencyToken(((ConsistencyOffsetVector) ((RowMetadata) keyValueMetadata.getRowMetadata().get()).getConsistencyOffsetVector().get()).serialize())) : StreamedRow.pushRow((GenericRow) keyValueMetadata.getKeyValue().value());
            }).collect(Collectors.toCollection(Lists::newLinkedList));
        }

        @Override // io.confluent.ksql.rest.server.resources.streaming.PollingSubscription
        void close() {
            this.result.stop();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public PullQueryPublisher(ListeningScheduledExecutorService listeningScheduledExecutorService, PullQueryResult pullQueryResult, MetricsCallbackHolder metricsCallbackHolder, long j) {
        this.exec = (ListeningScheduledExecutorService) Objects.requireNonNull(listeningScheduledExecutorService, "exec");
        this.result = (PullQueryResult) Objects.requireNonNull(pullQueryResult, "result");
        this.metricsCallbackHolder = metricsCallbackHolder;
        this.startTimeNanos = j;
    }

    @Override // io.confluent.ksql.rest.server.resources.streaming.Flow.Publisher
    public synchronized void subscribe(Flow.Subscriber<Collection<StreamedRow>> subscriber) {
        PullQuerySubscription pullQuerySubscription = new PullQuerySubscription(this.exec, subscriber, this.result);
        this.result.start();
        ((WebSocketSubscriber) subscriber).onSubscribe(pullQuerySubscription, this.metricsCallbackHolder, this.startTimeNanos);
    }
}
