package io.confluent.ksql.rest.server;

import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.CommandStore;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.computation.QueuedCommand;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/CommandTopicMigrationUtil.class */
public final class CommandTopicMigrationUtil {
    private static final Logger log = LoggerFactory.getLogger(CommandTopicMigrationUtil.class);
    public static final CommandId MIGRATION_COMMAND_ID = new CommandId(CommandId.Type.CLUSTER, "migration", CommandId.Action.ALTER);

    private CommandTopicMigrationUtil() {
    }

    public static void commandTopicMigration(String str, KsqlRestConfig ksqlRestConfig, KsqlConfig ksqlConfig) {
        TopicPartition topicPartition = new TopicPartition(str, 0);
        KafkaProducer kafkaProducer = new KafkaProducer(ksqlConfig.originals(), InternalTopicSerdes.serializer(), InternalTopicSerdes.serializer());
        kafkaProducer.send(new ProducerRecord(str, Integer.valueOf(topicPartition.partition()), MIGRATION_COMMAND_ID, new Command("", Collections.emptyMap(), Collections.emptyMap(), Optional.empty(), Optional.of(2147483646), Integer.MAX_VALUE)));
        kafkaProducer.close();
        KafkaConsumer kafkaConsumer = new KafkaConsumer(ksqlConfig.originals(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        kafkaConsumer.assign(Collections.singleton(topicPartition));
        List<QueuedCommand> allCommandsInCommandTopic = CommandTopic.getAllCommandsInCommandTopic(kafkaConsumer, topicPartition, Optional.empty(), CommandStore.POLLING_TIMEOUT_FOR_COMMAND_TOPIC);
        kafkaConsumer.close();
        ArrayList<QueuedCommand> arrayList = new ArrayList();
        for (QueuedCommand queuedCommand : allCommandsInCommandTopic) {
            if (queuedCommand.getAndDeserializeCommandId().equals(MIGRATION_COMMAND_ID)) {
                log.info("skipping migration command sent to old command topic when migrating to new one");
            } else {
                arrayList.add(queuedCommand);
            }
        }
        Map<String, Object> commandProducerProperties = ksqlRestConfig.getCommandProducerProperties();
        commandProducerProperties.put("transactional.id", ksqlConfig.getString("ksql.service.id") + "-migration-producer");
        try {
            KafkaProducer kafkaProducer2 = new KafkaProducer(commandProducerProperties, new ByteArraySerializer(), new ByteArraySerializer());
            Throwable th = null;
            try {
                try {
                    kafkaProducer2.initTransactions();
                    kafkaProducer2.beginTransaction();
                    for (QueuedCommand queuedCommand2 : arrayList) {
                        kafkaProducer2.send(new ProducerRecord(str, 0, queuedCommand2.getCommandId(), queuedCommand2.getCommand()));
                    }
                    kafkaProducer2.commitTransaction();
                    if (kafkaProducer2 != null) {
                        if (0 != 0) {
                            try {
                                kafkaProducer2.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            kafkaProducer2.close();
                        }
                    }
                    log.info("Finished migrating command topic for ksql with id {}", ksqlConfig.getString("ksql.service.id"));
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new KsqlException("error producing messages to command topic during migration", e);
        }
    }
}
