package io.confluent.ksql.rest.server.resources.streaming;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.rest.entity.PushContinuationToken;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.resources.streaming.Flow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.PushOffsetRange;
import io.confluent.ksql.util.PushQueryMetadata;
import io.confluent.ksql.util.RowMetadata;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/PushQueryPublisher.class */
/**
 * Publisher that starts a push query and hands its subscriber a polling
 * subscription over the query's row queue.
 *
 * <p>{@link #subscribe} only supports subscribers that are {@code WebSocketSubscriber}
 * instances; the subscription, the metrics callback holder and the start time given at
 * construction are all forwarded to that subscriber's {@code onSubscribe}.
 */
final class PushQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> {
    private static final Logger log = LogManager.getLogger(PushQueryPublisher.class);
    private final ListeningScheduledExecutorService exec;
    private final PushQueryMetadata queryMetadata;
    private final MetricsCallbackHolder metricsCallbackHolder;
    private final long startTimeNanos;

    /**
     * Polling subscription that drains the push query's row queue and converts each
     * drained entry into a {@link StreamedRow}.
     */
    static class PushQuerySubscription extends PollingSubscription<Collection<StreamedRow>> {
        private final PushQueryMetadata queryMetadata;
        // Guarded by the monitor of this subscription (see close()).
        private boolean closed;

        /**
         * Creates the subscription and registers its handlers on the query.
         *
         * @param listeningScheduledExecutorService executor passed through to the superclass
         * @param subscriber subscriber passed through to the superclass
         * @param pushQueryMetadata the push query to read rows from; must not be null
         * @throws NullPointerException if {@code pushQueryMetadata} is null
         */
        PushQuerySubscription(ListeningScheduledExecutorService listeningScheduledExecutorService, Flow.Subscriber<Collection<StreamedRow>> subscriber, PushQueryMetadata pushQueryMetadata) {
            // The null check has to happen inside the super(...) argument: the schema is read
            // here, so a check placed after this call could never fire.
            super(listeningScheduledExecutorService, subscriber, PushQueryPublisher.valueColumnOnly(Objects.requireNonNull(pushQueryMetadata, "queryMetadata").getLogicalSchema()));
            this.queryMetadata = pushQueryMetadata;
            // Both reaching the limit and query completion mark the subscription as done.
            pushQueryMetadata.setLimitHandler(this::setDone);
            pushQueryMetadata.setCompletionHandler(this::setDone);
            // Report the failure through the subscription, then ask Kafka Streams to shut
            // the client down.
            pushQueryMetadata.setUncaughtExceptionHandler(th -> {
                setError(th);
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
            });
        }

        /**
         * Drains everything currently in the query's row queue.
         *
         * @return the drained entries converted to {@link StreamedRow}s, in drain order, or
         *     {@code null} when the queue had nothing to drain
         */
        @Override
        public Collection<StreamedRow> poll() {
            // NOTE(review): the raw LinkedList / Collection and the casts below are decompiler
            // output; the queue's element type is not visible in this file. Restore the
            // proper generic element type once it is confirmed against the row queue API.
            LinkedList drained = Lists.newLinkedList();
            this.queryMetadata.getRowQueue().drainTo(drained);
            if (drained.isEmpty()) {
                return null;
            }
            return (Collection) drained.stream().map(keyValueMetadata -> {
                // An entry carrying a push offset range becomes a continuation token;
                // every other entry is emitted as a plain row built from its value.
                if (keyValueMetadata.getRowMetadata().isPresent() && ((RowMetadata) keyValueMetadata.getRowMetadata().get()).getPushOffsetsRange().isPresent()) {
                    return StreamedRow.continuationToken(new PushContinuationToken(((PushOffsetRange) ((RowMetadata) keyValueMetadata.getRowMetadata().get()).getPushOffsetsRange().get()).serialize()));
                }
                return StreamedRow.pushRow((GenericRow) keyValueMetadata.getKeyValue().value());
            }).collect(Collectors.toCollection(Lists::newLinkedList));
        }

        /**
         * Closes the underlying query. Only the first call has any effect; later calls
         * return immediately.
         */
        @Override
        public synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            PushQueryPublisher.log.info("Terminating query {}", this.queryMetadata.getQueryId().toString());
            this.queryMetadata.close();
        }
    }

    /**
     * Stores the collaborators used by {@link #subscribe}.
     *
     * @param listeningScheduledExecutorService executor handed to each subscription
     * @param pushQueryMetadata the push query to start and read from
     * @param metricsCallbackHolder forwarded to the subscriber's {@code onSubscribe}
     * @param j stored as the start time in nanoseconds and forwarded to the subscriber's
     *     {@code onSubscribe}
     */
    public PushQueryPublisher(ListeningScheduledExecutorService listeningScheduledExecutorService, PushQueryMetadata pushQueryMetadata, MetricsCallbackHolder metricsCallbackHolder, long j) {
        this.exec = listeningScheduledExecutorService;
        this.queryMetadata = pushQueryMetadata;
        this.metricsCallbackHolder = metricsCallbackHolder;
        this.startTimeNanos = j;
    }

    /**
     * Creates a subscription for the query, starts the query, and passes the subscription
     * to the subscriber.
     *
     * @param subscriber the subscriber to attach; must be a {@code WebSocketSubscriber}
     * @throws ClassCastException if {@code subscriber} is not a {@code WebSocketSubscriber}
     */
    @Override
    public synchronized void subscribe(Flow.Subscriber<Collection<StreamedRow>> subscriber) {
        // Cast before any side effect so an unsupported subscriber fails fast instead of
        // leaving behind a started query that nothing will close.
        WebSocketSubscriber webSocketSubscriber = (WebSocketSubscriber) subscriber;
        PushQuerySubscription pushQuerySubscription = new PushQuerySubscription(this.exec, subscriber, this.queryMetadata);
        log.info("Running query {}", this.queryMetadata.getQueryId().toString());
        this.queryMetadata.start();
        webSocketSubscriber.onSubscribe(pushQuerySubscription, this.metricsCallbackHolder, this.startTimeNanos);
    }

    /**
     * Builds a new schema from only the value columns of the given schema.
     *
     * @param logicalSchema the schema whose value columns are copied
     * @return a schema built from those value columns
     */
    private static LogicalSchema valueColumnOnly(LogicalSchema logicalSchema) {
        LogicalSchema.Builder builder = LogicalSchema.builder();
        logicalSchema.value().forEach(builder::valueColumn);
        return builder.build();
    }
}
