package io.confluent.ksql.rest.server;

import com.google.common.collect.Lists;
import io.confluent.ksql.rest.server.computation.CommandStatusFuture;
import io.confluent.ksql.rest.server.computation.QueuedCommand;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/CommandTopic.class */
public class CommandTopic {
    private static final Logger log = LoggerFactory.getLogger(CommandTopic.class);
    private final TopicPartition commandTopicPartition;
    private Consumer<byte[], byte[]> commandConsumer;
    private final String commandTopicName;
    private CommandTopicBackup commandTopicBackup;

    public CommandTopic(String str, Map<String, Object> map, CommandTopicBackup commandTopicBackup) {
        this(str, (Consumer<byte[], byte[]>) new KafkaConsumer((Map) Objects.requireNonNull(map, "kafkaClientProperties"), new ByteArrayDeserializer(), new ByteArrayDeserializer()), commandTopicBackup);
    }

    CommandTopic(String str, Consumer<byte[], byte[]> consumer, CommandTopicBackup commandTopicBackup) {
        this.commandTopicPartition = new TopicPartition(str, 0);
        this.commandConsumer = (Consumer) Objects.requireNonNull(consumer, "commandConsumer");
        this.commandTopicName = (String) Objects.requireNonNull(str, "commandTopicName");
        this.commandTopicBackup = (CommandTopicBackup) Objects.requireNonNull(commandTopicBackup, "commandTopicBackup");
    }

    public String getCommandTopicName() {
        return this.commandTopicName;
    }

    public void start() {
        this.commandTopicBackup.initialize();
        this.commandConsumer.assign(Collections.singleton(this.commandTopicPartition));
    }

    public Iterable<ConsumerRecord<byte[], byte[]>> getNewCommands(Duration duration) {
        ConsumerRecords<ConsumerRecord<byte[], byte[]>> poll = this.commandConsumer.poll(duration);
        ArrayList arrayList = new ArrayList();
        if (poll != null) {
            for (ConsumerRecord<byte[], byte[]> consumerRecord : poll) {
                try {
                    backupRecord(consumerRecord);
                    arrayList.add(consumerRecord);
                } catch (Exception e) {
                    log.warn("Backup is out of sync with the current command topic. Backups will not work until the previous command topic is restored or all backup files are deleted.", e);
                    return arrayList;
                }
            }
        }
        return arrayList;
    }

    public List<QueuedCommand> getRestoreCommands(Duration duration) {
        ArrayList newArrayList = Lists.newArrayList();
        this.commandConsumer.seekToBeginning(Collections.singletonList(this.commandTopicPartition));
        log.debug("Reading prior command records");
        ConsumerRecords poll = this.commandConsumer.poll(duration);
        while (true) {
            ConsumerRecords consumerRecords = poll;
            if (consumerRecords.isEmpty()) {
                return newArrayList;
            }
            log.debug("Received {} records from poll", Integer.valueOf(consumerRecords.count()));
            Iterator it = consumerRecords.iterator();
            while (it.hasNext()) {
                ConsumerRecord<byte[], byte[]> consumerRecord = (ConsumerRecord) it.next();
                try {
                    backupRecord(consumerRecord);
                    if (consumerRecord.value() != null) {
                        newArrayList.add(new QueuedCommand((byte[]) consumerRecord.key(), (byte[]) consumerRecord.value(), (Optional<CommandStatusFuture>) Optional.empty(), Long.valueOf(consumerRecord.offset())));
                    }
                } catch (Exception e) {
                    log.warn("Backup is out of sync with the current command topic. Backups will not work until the previous command topic is restored or all backup files are deleted.", e);
                    return newArrayList;
                }
            }
            poll = this.commandConsumer.poll(duration);
        }
    }

    public long getCommandTopicConsumerPosition() {
        return this.commandConsumer.position(this.commandTopicPartition);
    }

    public long getEndOffset() {
        return ((Long) this.commandConsumer.endOffsets(Collections.singletonList(this.commandTopicPartition)).get(this.commandTopicPartition)).longValue();
    }

    public void wakeup() {
        this.commandConsumer.wakeup();
    }

    public void close() {
        this.commandConsumer.close();
        this.commandTopicBackup.close();
    }

    private void backupRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
        this.commandTopicBackup.writeRecord(consumerRecord);
    }
}
