package io.confluent.ksql.api.impl;

import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.reactive.BasePublisher;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.server.resources.streaming.PrintTopicUtil;
import io.confluent.ksql.rest.server.resources.streaming.RecordFormatter;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.WorkerExecutor;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/api/impl/BlockingPrintPublisher.class */
/**
 * Publisher that polls a Kafka topic with a blocking consumer on a worker executor and emits
 * the formatted records as strings (the implementation behind a {@code PRINT} topic request).
 *
 * <p>Records are polled in {@link #doSend()}, formatted by a {@link RecordFormatter}, sampled
 * every {@code interval}-th record, and pushed downstream via {@code doOnNext} until the
 * configured limit is reached, demand runs out, or the publisher is closed.
 */
public class BlockingPrintPublisher extends BasePublisher<String> {
    private static final Logger log = LogManager.getLogger(BlockingPrintPublisher.class);

    /** Maximum time a single consumer poll may block. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(5000);

    /** Number of emitted messages after which {@link #doSend()} schedules another send. */
    public static final int SEND_MAX_BATCH_SIZE = 200;

    private final WorkerExecutor workerExecutor;
    private final KafkaConsumer<Bytes, Bytes> topicConsumer;
    private final PrintTopic printTopic;
    /** Maximum number of messages to emit; {@code Integer.MAX_VALUE} when no limit was given. */
    private final int limit;
    /** Only every {@code interval}-th polled record is emitted. */
    private final long interval;
    private final RecordFormatter formatter;
    /** Set once by {@link #close()}; checked by the send loop to stop polling. */
    private volatile boolean closed;
    private volatile boolean started;

    /**
     * Creates the publisher and eagerly creates the topic consumer.
     *
     * @param context        context handed to the base publisher
     * @param workerExecutor executor used to run the blocking poll loop; must not be null
     * @param serviceContext source of the schema registry client and the admin client
     * @param ksqlConfig     supplies the base stream config properties for the consumer
     * @param map            properties layered on top of the config properties (these win on conflict)
     * @param printTopic     the parsed statement: topic name, optional limit and interval
     * @throws KsqlApiException if the topic does not exist
     * @throws KsqlException    if the existing topics cannot be listed
     */
    public BlockingPrintPublisher(Context context, WorkerExecutor workerExecutor, ServiceContext serviceContext, KsqlConfig ksqlConfig, Map<String, Object> map, PrintTopic printTopic) {
        super(context);
        this.workerExecutor = Objects.requireNonNull(workerExecutor);
        this.printTopic = printTopic;
        this.limit = printTopic.getLimit().orElse(Integer.MAX_VALUE);
        this.interval = printTopic.getIntervalValue();
        this.formatter = new RecordFormatter(serviceContext.getSchemaRegistryClient(), printTopic.getTopic());
        this.topicConsumer = createTopicConsumer(serviceContext, ksqlConfig, map, printTopic);
    }

    /**
     * Marks the publisher closed, schedules {@code sendComplete} on the context and delegates to
     * the base class. Calling it again after the first close is a no-op.
     *
     * <p>The Kafka consumer is not closed here; the send loop closes it when it observes the
     * closed flag.
     *
     * @return an already-succeeded future if previously closed, otherwise the base class result
     */
    public Future<Void> close() {
        if (this.closed) {
            return Future.succeededFuture();
        }
        this.closed = true;
        this.ctx.runOnContext(ignored -> {
            sendComplete();
        });
        return super.close();
    }

    /** Hands the blocking send loop to the worker executor. */
    protected void maybeSend() {
        executeOnWorker(this::doSend);
    }

    /** Records that the publisher has been started; does nothing if it already was. */
    protected void afterSubscribe() {
        if (this.started) {
            return;
        }
        this.started = true;
    }

    /** Marks the publisher as started; must be invoked from a worker thread. */
    public void startFromWorkerThread() {
        VertxUtils.checkIsWorker();
        this.started = true;
    }

    /**
     * Runs {@code runnable} on the worker executor (unordered) and logs any failure.
     *
     * @param runnable the blocking task to run
     */
    private void executeOnWorker(Runnable runnable) {
        this.workerExecutor.executeBlocking(promise -> {
            runnable.run();
        }, false, asyncResult -> {
            if (asyncResult.failed()) {
                log.error("Failed to execute print task on worker", asyncResult.cause());
            }
        });
    }

    /**
     * Polls the topic and emits formatted records while the publisher is open, there is demand,
     * and fewer than {@code limit} messages have been written by this invocation.
     *
     * <p>Closes the consumer on exit if the publisher has been closed.
     */
    private void doSend() {
        VertxUtils.checkIsWorker();
        int messagesWritten = 0;
        int messagesPolled = 0;
        while (!isClosed() && getDemand() > 0 && messagesWritten < this.limit) {
            final ConsumerRecords<Bytes, Bytes> records = this.topicConsumer.poll(POLL_TIMEOUT);
            if (records.isEmpty()) {
                continue;
            }
            final Iterator<String> formatted =
                    this.formatter.format(records.records(this.printTopic.getTopic())).iterator();
            // Loop must end when the batch is exhausted; spinning on an empty iterator would
            // hang the worker thread forever.
            while (formatted.hasNext()) {
                final String message = formatted.next();
                // Sample: emit only every interval-th polled record.
                if (messagesPolled++ % this.interval == 0) {
                    messagesWritten++;
                    doOnNext(message);
                }
                if (messagesWritten >= this.limit) {
                    this.ctx.runOnContext(ignored -> {
                        sendComplete();
                    });
                    break;
                }
                // NOTE(review): after this break the outer loop keeps polling and the remaining
                // records of the current batch are skipped; the rescheduled doSend also runs via
                // the context rather than the worker executor. Confirm this is intended.
                if (messagesWritten >= SEND_MAX_BATCH_SIZE) {
                    this.ctx.runOnContext(ignored -> {
                        doSend();
                    });
                    break;
                }
            }
        }
        if (isClosed()) {
            this.topicConsumer.close();
        }
    }

    /**
     * Verifies the topic exists (case-insensitive name match against the admin client's topic
     * list) and creates a consumer for it.
     *
     * @throws KsqlApiException if no existing topic matches the requested name
     * @throws KsqlException    if listing topics fails or is interrupted
     */
    private KafkaConsumer<Bytes, Bytes> createTopicConsumer(ServiceContext serviceContext, KsqlConfig ksqlConfig, Map<String, Object> map, PrintTopic printTopic) {
        final Set<String> existingTopics;
        try {
            existingTopics = serviceContext.getAdminClient().listTopics().names().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            throw new KsqlException("Could not list existing kafka topics: " + e, e);
        } catch (ExecutionException e) {
            throw new KsqlException("Could not list existing kafka topics: " + e, e);
        }
        final boolean topicExists = existingTopics.stream()
                .anyMatch(name -> name.equalsIgnoreCase(printTopic.getTopic()));
        if (!topicExists) {
            throw new KsqlApiException("Topic does not exist: " + printTopic.getTopic(), Errors.ERROR_CODE_BAD_STATEMENT);
        }
        return PrintTopicUtil.createTopicConsumer(serviceContext, populateKsqlStreamConfigProps(overrideDefaultKsqlStreamConfigProps(ksqlConfig), map), printTopic);
    }

    /**
     * Returns a new map containing {@code map} with all entries of {@code map2} applied on top.
     */
    private Map<String, Object> populateKsqlStreamConfigProps(Map<String, Object> map, Map<String, Object> map2) {
        final Map<String, Object> merged = new HashMap<>(map);
        merged.putAll(map2);
        return merged;
    }

    /**
     * Copies the KSQL stream config properties and forces {@code auto.offset.reset} to
     * {@code latest}.
     */
    private Map<String, Object> overrideDefaultKsqlStreamConfigProps(KsqlConfig ksqlConfig) {
        final Map<String, Object> props = new HashMap<>(ksqlConfig.getKsqlStreamConfigProps());
        props.put("auto.offset.reset", "latest");
        return props;
    }

    private boolean isClosed() {
        return this.closed;
    }
}
