package io.confluent.ksql.rest.server.resources.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.entity.TableRowsEntity;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.resources.streaming.Flow;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.class */
/**
 * Publisher that runs a single pull query on behalf of each subscriber.
 *
 * <p>Every call to {@link #subscribe} hands the subscriber a fresh subscription. The query is
 * executed lazily, the first time the subscriber requests an item, and its rows are delivered
 * as one batch.
 */
class PullQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> {
    private final ServiceContext serviceContext;
    private final ConfiguredStatement<Query> query;
    private final PullQueryExecutor pullQueryExecutor;

    /**
     * One-shot subscription: the first {@code request(1)} runs the query and emits the schema,
     * the full batch of rows, and the completion signal; later requests do nothing.
     *
     * <p>NOTE(review): {@code done} is a plain field with no synchronization; this assumes
     * {@code request} and {@code cancel} are not invoked concurrently — confirm against callers.
     */
    private static final class PullQuerySubscription implements Flow.Subscription {
        private final Flow.Subscriber<Collection<StreamedRow>> subscriber;
        private final Callable<TableRowsEntity> executor;
        /** Set once the query has been run or the subscription has been cancelled. */
        private boolean done;

        /**
         * @param subscriber receiver of the schema, rows and terminal signal; must not be null
         * @param callable   deferred execution of the pull query; must not be null
         * @throws NullPointerException if either argument is null
         */
        private PullQuerySubscription(
                final Flow.Subscriber<Collection<StreamedRow>> subscriber,
                final Callable<TableRowsEntity> callable) {
            this.done = false;
            this.subscriber = Objects.requireNonNull(subscriber, "subscriber");
            this.executor = Objects.requireNonNull(callable, "executor");
        }

        /**
         * Runs the query on the first call and pushes the result to the subscriber. Calls made
         * after the query has run, or after {@link #cancel()}, are no-ops.
         *
         * <p>Any exception thrown while executing the query or while signalling the subscriber
         * is forwarded to {@code subscriber.onError}.
         *
         * @param j number of items requested; only {@code 1} is accepted
         * @throws IllegalArgumentException if {@code j} is not 1 (checked even when already done)
         */
        @Override // io.confluent.ksql.rest.server.resources.streaming.Flow.Subscription
        public void request(final long j) {
            Preconditions.checkArgument(j == 1, "number of requested items must be 1");
            if (this.done) {
                return;
            }
            // Flip the flag before executing so a re-entrant request cannot run the query twice.
            this.done = true;
            try {
                final TableRowsEntity entity = this.executor.call();
                this.subscriber.onSchema(entity.getSchema());

                final List<StreamedRow> rows = entity.getRows().stream()
                        .map(PullQuerySubscription::toGenericRow)
                        .map(StreamedRow::row)
                        .collect(Collectors.toList());

                this.subscriber.onNext(rows);
                this.subscriber.onComplete();
            } catch (final Exception e) {
                this.subscriber.onError(e);
            }
        }

        /**
         * Cancels the subscription. If the query has not been executed yet, it never will be:
         * subsequent {@link #request(long)} calls become no-ops.
         */
        @Override // io.confluent.ksql.rest.server.resources.streaming.Flow.Subscription
        public void cancel() {
            this.done = true;
        }

        /** Wraps one raw result row in a {@link GenericRow}. */
        private static GenericRow toGenericRow(final List<?> list) {
            return new GenericRow().appendAll(list);
        }
    }

    /**
     * Creates a publisher for the given pull query.
     *
     * @param serviceContext      context passed to the executor when the query is run
     * @param configuredStatement the pull query to execute
     * @param pullQueryExecutor   executor used to run the query
     * @throws NullPointerException if any argument is null
     */
    @VisibleForTesting
    public PullQueryPublisher(
            final ServiceContext serviceContext,
            final ConfiguredStatement<Query> configuredStatement,
            final PullQueryExecutor pullQueryExecutor) {
        this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
        this.query = Objects.requireNonNull(configuredStatement, "query");
        this.pullQueryExecutor = Objects.requireNonNull(pullQueryExecutor, "pullQueryExecutor");
    }

    /**
     * Registers the subscriber by handing it a new one-shot subscription. The query itself is
     * not executed here; it runs when the subscriber first requests an item.
     *
     * @param subscriber receiver of the query result
     * @throws NullPointerException if {@code subscriber} is null
     */
    @Override // io.confluent.ksql.rest.server.resources.streaming.Flow.Publisher
    public synchronized void subscribe(final Flow.Subscriber<Collection<StreamedRow>> subscriber) {
        final PullQuerySubscription subscription = new PullQuerySubscription(
                subscriber,
                () -> this.pullQueryExecutor.execute(this.query, this.serviceContext));
        subscriber.onSubscribe(subscription);
    }
}
