package io.confluent.ksql.rest.server.resources.streaming;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.math.IntMath;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.rest.server.resources.streaming.Flow;
import io.confluent.ksql.services.ServiceContext;
import java.math.RoundingMode;
import java.time.Duration;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/PrintPublisher.class */
public class PrintPublisher implements Flow.Publisher<Collection<String>> {
    private static final Logger log = LogManager.getLogger(PrintPublisher.class);
    private final ListeningScheduledExecutorService exec;
    private final ServiceContext serviceContext;
    private final Map<String, Object> consumerProperties;
    private final PrintTopic printTopic;

    /* JADX INFO: Access modifiers changed from: private */
    @SuppressFBWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE"})
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/PrintPublisher$LimitIntervalCollection.class */
    public static final class LimitIntervalCollection<T> extends AbstractCollection<T> {
        private final Iterable<T> source;
        private final int limit;
        private final int interval;
        private final int size;

        private LimitIntervalCollection(Collection<T> collection, int i, int i2, int i3) {
            Preconditions.checkArgument(i2 > 0, "interval must be greater than 0");
            Preconditions.checkArgument(i3 >= 0, "start must be greater than or equal to 0");
            Preconditions.checkArgument(i >= 0, "limit must be greater than or equal to 0");
            Objects.requireNonNull(collection, "source");
            this.source = Iterables.skip(collection, i3);
            this.size = Math.min(IntMath.divide(collection.size() - i3, i2, RoundingMode.CEILING), i);
            this.interval = i2;
            this.limit = i;
        }

        @Override // java.util.AbstractCollection, java.util.Collection, java.lang.Iterable
        @Nonnull
        public Iterator<T> iterator() {
            return new Iterator<T>() { // from class: io.confluent.ksql.rest.server.resources.streaming.PrintPublisher.LimitIntervalCollection.1
                final Iterator<T> it;
                int remaining;

                {
                    this.it = LimitIntervalCollection.this.source.iterator();
                    this.remaining = LimitIntervalCollection.this.limit;
                }

                @Override // java.util.Iterator
                public boolean hasNext() {
                    return this.remaining > 0 && this.it.hasNext();
                }

                @Override // java.util.Iterator
                public T next() {
                    this.remaining--;
                    T next = this.it.next();
                    Iterators.advance(this.it, LimitIntervalCollection.this.interval - 1);
                    return next;
                }
            };
        }

        @Override // java.util.AbstractCollection, java.util.Collection
        public int size() {
            return this.size;
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/PrintPublisher$PrintSubscription.class */
    static class PrintSubscription extends PollingSubscription<Collection<String>> {
        private final PrintTopic printTopic;
        private final KafkaConsumer<Bytes, Bytes> topicConsumer;
        private final RecordFormatter formatter;
        private boolean closed;
        private int numPolled;
        private int numWritten;

        PrintSubscription(ListeningScheduledExecutorService listeningScheduledExecutorService, PrintTopic printTopic, Flow.Subscriber<Collection<String>> subscriber, KafkaConsumer<Bytes, Bytes> kafkaConsumer, RecordFormatter recordFormatter) {
            super(listeningScheduledExecutorService, subscriber, null);
            this.closed = false;
            this.numPolled = 0;
            this.numWritten = 0;
            this.printTopic = (PrintTopic) Objects.requireNonNull(printTopic, "printTopic");
            this.topicConsumer = (KafkaConsumer) Objects.requireNonNull(kafkaConsumer, "topicConsumer");
            this.formatter = (RecordFormatter) Objects.requireNonNull(recordFormatter, "formatter");
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // io.confluent.ksql.rest.server.resources.streaming.PollingSubscription
        public Collection<String> poll() {
            try {
                Iterable<ConsumerRecord<Bytes, Bytes>> poll = this.topicConsumer.poll(Duration.ZERO);
                if (poll.isEmpty()) {
                    return null;
                }
                List<String> format = this.formatter.format(poll);
                LimitIntervalCollection limitIntervalCollection = new LimitIntervalCollection(format, this.printTopic.getLimit().orElse(Integer.MAX_VALUE) - this.numWritten, this.printTopic.getIntervalValue(), this.numPolled % this.printTopic.getIntervalValue());
                this.numPolled += format.size();
                this.numWritten += limitIntervalCollection.size();
                if (this.printTopic.getLimit().isPresent() && this.numWritten >= this.printTopic.getLimit().getAsInt()) {
                    setDone();
                }
                return limitIntervalCollection;
            } catch (Exception e) {
                setError(e);
                return null;
            }
        }

        @Override // io.confluent.ksql.rest.server.resources.streaming.PollingSubscription
        public synchronized void close() {
            if (this.closed) {
                return;
            }
            PrintPublisher.log.info("Closing consumer for topic {}", this.printTopic.getTopic());
            this.closed = true;
            this.topicConsumer.close();
        }
    }

    public PrintPublisher(ListeningScheduledExecutorService listeningScheduledExecutorService, ServiceContext serviceContext, Map<String, Object> map, PrintTopic printTopic) {
        this.exec = (ListeningScheduledExecutorService) Objects.requireNonNull(listeningScheduledExecutorService, "exec");
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
        this.consumerProperties = (Map) Objects.requireNonNull(map, "consumerProperties");
        this.printTopic = (PrintTopic) Objects.requireNonNull(printTopic, "printTopic");
    }

    @Override // io.confluent.ksql.rest.server.resources.streaming.Flow.Publisher
    public void subscribe(Flow.Subscriber<Collection<String>> subscriber) {
        subscriber.onSubscribe(new PrintSubscription(this.exec, this.printTopic, subscriber, PrintTopicUtil.createTopicConsumer(this.serviceContext, this.consumerProperties, this.printTopic), new RecordFormatter(this.serviceContext.getSchemaRegistryClient(), this.printTopic.getTopic())));
    }
}
