package io.confluent.ksql.rest.server.computation;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.CommandTopic;
import io.confluent.ksql.rest.server.CommandTopicBackup;
import io.confluent.ksql.rest.server.CommandTopicBackupImpl;
import io.confluent.ksql.rest.server.CommandTopicBackupNoOp;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/computation/CommandStore.class */
public class CommandStore implements CommandQueue, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(CommandStore.class);
    private static final Duration POLLING_TIMEOUT_FOR_COMMAND_TOPIC = Duration.ofMillis(5000);
    private static final int COMMAND_TOPIC_PARTITION = 0;
    private final CommandTopic commandTopic;
    private final Map<CommandId, CommandStatusFuture> commandStatusMap = Maps.newConcurrentMap();
    private final SequenceNumberFutureStore sequenceNumberFutureStore;
    private final String commandTopicName;
    private final Duration commandQueueCatchupTimeout;
    private final Map<String, Object> kafkaConsumerProperties;
    private final Map<String, Object> kafkaProducerProperties;
    private final Serializer<CommandId> commandIdSerializer;
    private final Serializer<Command> commandSerializer;
    private final Deserializer<CommandId> commandIdDeserializer;
    private final CommandTopicBackup commandTopicBackup;

    /* loaded from: input_file:io/confluent/ksql/rest/server/computation/CommandStore$Factory.class */
    public static final class Factory {
        private Factory() {
        }

        public static CommandStore create(KsqlConfig ksqlConfig, String str, Duration duration, Map<String, Object> map, Map<String, Object> map2) {
            map.put("isolation.level", IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
            map.put("auto.offset.reset", "none");
            map2.put("transactional.id", ksqlConfig.getString("ksql.service.id"));
            map2.put("acks", "all");
            CommandTopicBackup commandTopicBackupNoOp = new CommandTopicBackupNoOp();
            if (!ksqlConfig.getString("ksql.metastore.backup.location").isEmpty()) {
                commandTopicBackupNoOp = new CommandTopicBackupImpl(ksqlConfig.getString("ksql.metastore.backup.location"), str);
            }
            return new CommandStore(str, new CommandTopic(str, map, commandTopicBackupNoOp), new SequenceNumberFutureStore(), map, map2, duration, InternalTopicSerdes.serializer(), InternalTopicSerdes.serializer(), InternalTopicSerdes.deserializer(CommandId.class), commandTopicBackupNoOp);
        }
    }

    CommandStore(String str, CommandTopic commandTopic, SequenceNumberFutureStore sequenceNumberFutureStore, Map<String, Object> map, Map<String, Object> map2, Duration duration, Serializer<CommandId> serializer, Serializer<Command> serializer2, Deserializer<CommandId> deserializer, CommandTopicBackup commandTopicBackup) {
        this.commandTopic = (CommandTopic) Objects.requireNonNull(commandTopic, "commandTopic");
        this.sequenceNumberFutureStore = (SequenceNumberFutureStore) Objects.requireNonNull(sequenceNumberFutureStore, "sequenceNumberFutureStore");
        this.commandQueueCatchupTimeout = (Duration) Objects.requireNonNull(duration, "commandQueueCatchupTimeout");
        this.kafkaConsumerProperties = (Map) Objects.requireNonNull(map, "kafkaConsumerProperties");
        this.kafkaProducerProperties = (Map) Objects.requireNonNull(map2, "kafkaProducerProperties");
        this.commandTopicName = (String) Objects.requireNonNull(str, "commandTopicName");
        this.commandIdSerializer = (Serializer) Objects.requireNonNull(serializer, "commandIdSerializer");
        this.commandSerializer = (Serializer) Objects.requireNonNull(serializer2, "commandSerializer");
        this.commandIdDeserializer = (Deserializer) Objects.requireNonNull(deserializer, "commandIdDeserializer");
        this.commandTopicBackup = (CommandTopicBackup) Objects.requireNonNull(commandTopicBackup, "commandTopicBackup");
    }

    @Override // io.confluent.ksql.rest.server.computation.CommandQueue
    public void wakeup() {
        this.commandTopic.wakeup();
    }

    public String getCommandTopicName() {
        return this.commandTopic.getCommandTopicName();
    }

    public void start() {
        this.commandTopic.start();
    }

    @Override // io.confluent.ksql.rest.server.computation.CommandQueue, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.commandTopic.close();
    }

    @Override // io.confluent.ksql.rest.server.computation.CommandQueue
    public QueuedCommandStatus enqueueCommand(CommandId commandId, Command command, Producer<CommandId, Command> producer) {
        try {
            return new QueuedCommandStatus(((RecordMetadata) producer.send(new ProducerRecord(this.commandTopicName, Integer.valueOf(COMMAND_TOPIC_PARTITION), commandId, command)).get()).offset(), this.commandStatusMap.compute(commandId, (commandId2, commandStatusFuture) -> {
                if (commandStatusFuture == null) {
                    return new CommandStatusFuture(commandId);
                }
                throw new IllegalStateException(String.format("Another command with the same id (%s) is being executed.", commandId));
            }));
        } catch (Exception e) {
            this.commandStatusMap.remove(commandId);
            throw new KsqlException(String.format("Could not write the statement '%s' into the command topic.", command.getStatement()), e);
        }
    }

    @Override // io.confluent.ksql.rest.server.computation.CommandQueue
    public List<QueuedCommand> getNewCommands(Duration duration) {
        completeSatisfiedSequenceNumberFutures();
        ArrayList newArrayList = Lists.newArrayList();
        for (ConsumerRecord<byte[], byte[]> consumerRecord : this.commandTopic.getNewCommands(duration)) {
            if (consumerRecord.value() != null) {
                Optional empty = Optional.empty();
                try {
                    empty = Optional.ofNullable(this.commandStatusMap.remove((CommandId) this.commandIdDeserializer.deserialize(this.commandTopicName, (byte[]) consumerRecord.key())));
                } catch (Exception e) {
                    LOG.warn("Error while attempting to fetch from commandStatusMap for key {}", consumerRecord.key(), e);
                }
                newArrayList.add(new QueuedCommand((byte[]) consumerRecord.key(), (byte[]) consumerRecord.value(), (Optional<CommandStatusFuture>) empty, Long.valueOf(consumerRecord.offset())));
            }
        }
        return newArrayList;
    }

    @Override // io.confluent.ksql.rest.server.computation.CommandQueue
    public List<QueuedCommand> getRestoreCommands() {
        return this.commandTopic.getRestoreCommands(POLLING_TIMEOUT_FOR_COMMAND_TOPIC);
    }

    @Override // io.confluent.ksql.rest.server.computation.CommandQueue
    public void ensureConsumedPast(long j, Duration duration) throws InterruptedException, TimeoutException {
        try {
            this.sequenceNumberFutureStore.getFutureForSequenceNumber(j).get(duration.toMillis(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof RuntimeException)) {
                throw new RuntimeException("Error waiting for command sequence number of " + j, e.getCause());
            }
            throw ((RuntimeException) e.getCause());
        } catch (TimeoutException e2) {
            throw new TimeoutException(String.format("Timeout reached while waiting for command sequence number of %d. Caused by: %s (Timeout: %d ms)", Long.valueOf(j), e2.getMessage(), Long.valueOf(duration.toMillis())));
        }
    }

    @Override // io.confluent.ksql.rest.server.computation.CommandQueue
    public Producer<CommandId, Command> createTransactionalProducer() {
        return new KafkaProducer(this.kafkaProducerProperties, this.commandIdSerializer, this.commandSerializer);
    }

    @Override // io.confluent.ksql.rest.server.computation.CommandQueue
    public void abortCommand(CommandId commandId) {
        this.commandStatusMap.compute(commandId, (commandId2, commandStatusFuture) -> {
            if (commandStatusFuture == null) {
                return null;
            }
            LOG.info("Aborting existing command {}", commandId);
            return null;
        });
    }

    @Override // io.confluent.ksql.rest.server.computation.CommandQueue
    public void waitForCommandConsumer() {
        try {
            ensureConsumedPast(getCommandTopicOffset() - 1, this.commandQueueCatchupTimeout);
        } catch (InterruptedException e) {
            throw new KsqlServerException("Interrupted while waiting for command topic consumer to process command topic");
        } catch (TimeoutException e2) {
            throw new KsqlServerException("Timeout while waiting for command topic consumer to process command topic");
        }
    }

    private long getCommandTopicOffset() {
        TopicPartition topicPartition = new TopicPartition(this.commandTopicName, COMMAND_TOPIC_PARTITION);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(this.kafkaConsumerProperties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        Throwable th = COMMAND_TOPIC_PARTITION;
        try {
            try {
                kafkaConsumer.assign(Collections.singleton(topicPartition));
                long longValue = ((Long) kafkaConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition)).longValue();
                if (kafkaConsumer != null) {
                    if (th != null) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                return longValue;
            } finally {
            }
        } catch (Throwable th3) {
            if (kafkaConsumer != null) {
                if (th != null) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Override // io.confluent.ksql.rest.server.computation.CommandQueue
    public boolean corruptionDetected() {
        return this.commandTopicBackup.commandTopicCorruption();
    }

    @Override // io.confluent.ksql.rest.server.computation.CommandQueue
    public boolean isEmpty() {
        return this.commandTopic.getEndOffset() == 0;
    }

    private void completeSatisfiedSequenceNumberFutures() {
        this.sequenceNumberFutureStore.completeFuturesUpToAndIncludingSequenceNumber(this.commandTopic.getCommandTopicConsumerPosition() - 1);
    }
}
