package io.confluent.ksql.rest.server.resources.streaming;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.api.server.StreamingOutput;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.services.ServiceContext;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/TopicStreamWriter.class */
public class TopicStreamWriter implements StreamingOutput {
    // SLF4J logger shared by all instances of this class.
    private static final Logger log = LoggerFactory.getLogger(TopicStreamWriter.class);
    // INTERVAL value passed to the constructor, which rejects values below 1.
    // NOTE(review): presumably "emit every Nth record" in write() - confirm, write() is not visible here.
    private final long interval;
    // Non-null duration passed to the constructor.
    // NOTE(review): presumably how often write() checks connectionClosed - confirm.
    private final Duration disconnectCheckInterval;
    // Consumer for the topic being printed; closed exactly once by close().
    private final KafkaConsumer<Bytes, Bytes> topicConsumer;
    // Non-null schema registry client passed to the constructor.
    private final SchemaRegistryClient schemaRegistryClient;
    // Non-null name of the topic passed to the constructor.
    private final String topicName;
    // Returns true once the supplied count is >= the LIMIT; always false when no LIMIT was given.
    private final Predicate<Long> limitReached;
    // Counters reset to zero by the constructor.
    // NOTE(review): presumably advanced by write() - confirm, write() is not visible here.
    private long messagesWritten;
    private long messagesPolled;
    // Set to true by the callback the constructor registers on the connection-closed future.
    // volatile because that callback may run on a different thread from the reader.
    private volatile boolean connectionClosed;
    // True once close() has closed the consumer; only accessed inside the synchronized close().
    private boolean closed;

    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/TopicStreamWriter$FormatsTracker.class */
    /**
     * Remembers the key and value formats last reported by a {@code RecordFormatter} and prints
     * a line to the output stream whenever either list differs from the remembered one.
     */
    private static final class FormatsTracker {
        /**
         * Placeholder stored in both lists at construction time so that the first call to
         * {@link #update(RecordFormatter)} sees a difference and prints the formats.
         */
        private static final String FORCE_FIRST_OUTPUT =
                "add an entry to force output for formats on the first loop";

        private final PrintStream out;
        private final List<String> keyFormats = new ArrayList<>();
        private final List<String> valueFormats = new ArrayList<>();

        /**
         * @param printStream stream the format lines are printed to; must not be null
         * @throws NullPointerException if {@code printStream} is null
         */
        FormatsTracker(PrintStream printStream) {
            this.out = Objects.requireNonNull(printStream, "out");
            this.keyFormats.add(FORCE_FIRST_OUTPUT);
            this.valueFormats.add(FORCE_FIRST_OUTPUT);
        }

        /**
         * Prints the possible key and value formats of {@code recordFormatter}, each only if it
         * changed since the previous call.
         *
         * @param recordFormatter source of the currently possible formats
         */
        public void update(RecordFormatter recordFormatter) {
            update(this.out, this.keyFormats, recordFormatter.getPossibleKeyFormats(), "Key format: ");
            update(this.out, this.valueFormats, recordFormatter.getPossibleValueFormats(), "Value format: ");
        }

        /**
         * Replaces the contents of {@code previous} with {@code current} and prints one line,
         * unless the two lists are already equal.
         *
         * @param printStream stream to print to
         * @param previous    remembered formats; mutated in place
         * @param current     formats reported now
         * @param prefix      label printed before the formats
         */
        private static void update(PrintStream printStream, List<String> previous, List<String> current, String prefix) {
            if (previous.equals(current)) {
                return;
            }
            previous.clear();
            previous.addAll(current);
            printStream.print(prefix);
            if (current.isEmpty()) {
                printStream.println(" does not match any supported format. It may be a STRING with encoding other than UTF8, or some other format.");
            } else {
                printStream.println(String.join(" or ", current));
            }
        }
    }

    /**
     * Builds a writer for the topic named by a PRINT statement.
     *
     * <p>The schema registry client comes from {@code serviceContext}, the consumer from
     * {@code PrintTopicUtil.createTopicConsumer}, and the topic name, interval and limit from
     * {@code printTopic}.
     *
     * @param serviceContext    source of the schema registry client and input to the consumer factory
     * @param map               properties handed to the consumer factory
     * @param printTopic        the PRINT statement being served
     * @param duration          disconnect-check interval forwarded to the constructor
     * @param completableFuture future forwarded to the constructor
     * @return a new writer
     */
    public static TopicStreamWriter create(ServiceContext serviceContext, Map<String, Object> map, PrintTopic printTopic, Duration duration, CompletableFuture<Void> completableFuture) {
        // Locals are assigned in the same order the original evaluated its arguments.
        final SchemaRegistryClient registryClient = serviceContext.getSchemaRegistryClient();
        final KafkaConsumer<Bytes, Bytes> consumer =
                PrintTopicUtil.createTopicConsumer(serviceContext, map, printTopic);
        final String topic = printTopic.getTopic();
        final long everyNth = printTopic.getIntervalValue();
        final OptionalInt maxMessages = printTopic.getLimit();
        return new TopicStreamWriter(
                registryClient,
                consumer,
                topic,
                everyNth,
                duration,
                maxMessages,
                completableFuture);
    }

    TopicStreamWriter(SchemaRegistryClient schemaRegistryClient, KafkaConsumer<Bytes, Bytes> kafkaConsumer, String str, long j, Duration duration, OptionalInt optionalInt, CompletableFuture<Void> completableFuture) {
        this.topicConsumer = (KafkaConsumer) Objects.requireNonNull(kafkaConsumer, "topicConsumer");
        this.schemaRegistryClient = (SchemaRegistryClient) Objects.requireNonNull(schemaRegistryClient, "schemaRegistryClient");
        this.topicName = (String) Objects.requireNonNull(str, "topicName");
        this.interval = j;
        this.limitReached = ((OptionalInt) Objects.requireNonNull(optionalInt, "limit")).isPresent() ? l -> {
            return l.longValue() >= ((long) optionalInt.getAsInt());
        } : l2 -> {
            return false;
        };
        this.disconnectCheckInterval = (Duration) Objects.requireNonNull(duration, "disconnectCheckInterval");
        this.messagesWritten = 0L;
        this.messagesPolled = 0L;
        completableFuture.thenAccept(r4 -> {
            this.connectionClosed = true;
        });
        if (j < 1) {
            throw new IllegalArgumentException("INTERVAL must be greater than one, but was: " + j);
        }
    }

    /*  JADX ERROR: Failed to decode insn: 0x00AC: MOVE_MULTI, method: io.confluent.ksql.rest.server.resources.streaming.TopicStreamWriter.write(java.io.OutputStream):void
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    /**
     * Placeholder for the {@code StreamingOutput} write method.
     *
     * <p>The decompiler could not recover the body of this method, so the code below is NOT
     * the real implementation: it throws unconditionally.
     *
     * @param r9 the output stream (parameter name is a decompiler register name)
     * @throws UnsupportedOperationException always, because the real body was not decompiled
     */
    @Override // io.confluent.ksql.api.server.StreamingOutput
    public void write(java.io.OutputStream r9) {
        /*
            Method dump skipped, instructions count: 301
            To view this dump add '--comments-level debug' option
        */
        // Decompiler-generated stub; the original bytecode must be consulted for actual behavior.
        throw new UnsupportedOperationException("Method not decompiled: io.confluent.ksql.rest.server.resources.streaming.TopicStreamWriter.write(java.io.OutputStream):void");
    }

    @Override // io.confluent.ksql.api.server.StreamingOutput, java.lang.AutoCloseable
    /**
     * Closes the topic consumer the first time it is called; later calls do nothing.
     *
     * <p>The method is synchronized, so concurrent callers cannot both close the consumer.
     * The closed flag is only set after the consumer's close returns normally.
     */
    public synchronized void close() {
        if (!this.closed) {
            this.topicConsumer.close();
            this.closed = true;
        }
    }

    /**
     * Best-effort write of an exception's description, followed by a newline, to the client.
     *
     * <p>The text is UTF-8 encoded and the stream is flushed. An {@link IOException} while
     * writing is only logged at debug level and not rethrown.
     *
     * @param outputStream stream to report the error on
     * @param exc          the exception to describe
     */
    private static void outputException(OutputStream outputStream, Exception exc) {
        // getMessage() may be null (e.g. a bare NullPointerException). Fall back to toString()
        // so reporting an error cannot itself fail with a NullPointerException.
        final String message = exc.getMessage() != null ? exc.getMessage() : exc.toString();
        try {
            outputStream.write(message.getBytes(StandardCharsets.UTF_8));
            outputStream.write("\n".getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
        } catch (IOException e) {
            // Deliberately not rethrown: there is no one left to report the failure to.
            log.debug("Client disconnected while attempting to write an error message");
        }
    }
}
