package io.confluent.ksql.rest.server.restore;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.properties.PropertiesUtil;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.BackupReplayFile;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.KafkaTopicClientImpl;
import io.confluent.ksql.util.JavaSystemExit;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.QueryApplicationId;
import io.confluent.ksql.util.ReservedInternalTopics;
import io.confluent.ksql.util.SystemExit;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.json.JSONObject;

/* loaded from: input_file:io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.class */
public class KsqlRestoreCommandTopic {
    private static final Serializer<byte[]> BYTES_SERIALIZER = new ByteArraySerializer();
    private static final int COMMAND_TOPIC_PARTITION = 0;
    private static long timer;
    private final KsqlConfig serverConfig;
    private final String commandTopicName;
    private final KafkaTopicClient topicClient;
    private final Supplier<Producer<byte[], byte[]>> kafkaProducerSupplier;

    private static KsqlConfig loadServerConfig(File file) {
        return new KsqlConfig(PropertiesUtil.loadProperties(file));
    }

    public static List<Pair<byte[], byte[]>> loadBackup(File file, RestoreOptions restoreOptions, KsqlConfig ksqlConfig) throws IOException {
        BackupReplayFile readOnly = BackupReplayFile.readOnly(file);
        try {
            List<Pair<byte[], byte[]>> readRecords = readOnly.readRecords();
            if (readOnly != null) {
                readOnly.close();
            }
            return checkValidCommands(readRecords, restoreOptions.isSkipIncompatibleCommands(), ksqlConfig);
        } catch (Throwable th) {
            if (readOnly != null) {
                try {
                    readOnly.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static List<Pair<byte[], byte[]>> checkValidCommands(List<Pair<byte[], byte[]>> list, boolean z, KsqlConfig ksqlConfig) {
        int i = 0;
        int i2 = 0;
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (Pair<byte[], byte[]> pair : list) {
            i++;
            try {
                Deserializer deserializer = InternalTopicSerdes.deserializer(CommandId.class);
                try {
                    deserializer.deserialize((String) null, (byte[]) pair.getLeft());
                    if (deserializer != null) {
                        deserializer.close();
                    }
                    try {
                        Deserializer deserializer2 = InternalTopicSerdes.deserializer(Command.class);
                        try {
                            deserializer2.deserialize((String) null, (byte[]) pair.getRight());
                            if (deserializer2 != null) {
                                deserializer2.close();
                            }
                            arrayList.add(pair);
                        } catch (Throwable th) {
                            if (deserializer2 != null) {
                                try {
                                    deserializer2.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                            break;
                        }
                    } catch (Exception e) {
                        throw new KsqlException(String.format("Invalid Command string (line %d): %s (%s)", Integer.valueOf(i), new String((byte[]) pair.getRight(), StandardCharsets.UTF_8), e.getMessage()));
                    } catch (SerializationException | IncompatibleKsqlCommandVersionException e2) {
                        if (!z) {
                            throw new KsqlException(String.format("Incompatible Command string (line %d): %s (%s)", Integer.valueOf(i), new String((byte[]) pair.getLeft(), StandardCharsets.UTF_8), e2.getMessage()));
                        }
                        arrayList2.add((byte[]) pair.getRight());
                        i2++;
                    }
                } finally {
                }
            } catch (Exception e3) {
                throw new KsqlException(String.format("Invalid CommandId string (line %d): %s (%s)", Integer.valueOf(i), new String((byte[]) pair.getLeft(), StandardCharsets.UTF_8), e3.getMessage()));
            }
        }
        if (z) {
            System.out.printf("%s incompatible command(s) skipped from backup file.%n", Integer.valueOf(i2));
            arrayList2.forEach(bArr -> {
                maybeCleanUpQuery(bArr, ksqlConfig);
            });
        }
        return arrayList;
    }

    private static void checkFileExists(File file) throws Exception {
        if (!file.exists()) {
            throw new NoSuchFileException("File does not exist: " + file.getPath());
        }
        if (!file.isFile()) {
            throw new NoSuchFileException("Invalid file: " + file.getPath());
        }
        if (!file.canRead()) {
            throw new Exception("You don't have Read permissions on file: " + file.getPath());
        }
    }

    private static void resetTimer() {
        timer = System.currentTimeMillis();
    }

    private static long currentTimer() {
        return System.currentTimeMillis() - timer;
    }

    private static boolean promptQuestion() {
        System.out.println("Restoring the command topic will DELETE your actual metadata.");
        System.out.print("Continue [yes or no] (default: no)? ");
        return "yes".equalsIgnoreCase(System.console().readLine());
    }

    public static void main(String[] strArr) throws Exception {
        mainInternal(strArr, new JavaSystemExit());
    }

    @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH"})
    public static void mainInternal(String[] strArr, SystemExit systemExit) throws Exception {
        RestoreOptions parse = RestoreOptions.parse(strArr);
        if (parse == null) {
            systemExit.exit(1);
        }
        File configFile = parse.getConfigFile();
        File backupFile = parse.getBackupFile();
        try {
            checkFileExists(configFile);
            checkFileExists(backupFile);
        } catch (Exception e) {
            System.err.println(e.getMessage());
            systemExit.exit(2);
        }
        KsqlConfig loadServerConfig = loadServerConfig(configFile);
        KsqlRestoreCommandTopic ksqlRestoreCommandTopic = new KsqlRestoreCommandTopic(loadServerConfig);
        if (!parse.isAutomaticYes() && !promptQuestion()) {
            systemExit.exit(0);
        }
        System.out.println("Loading backup file ...");
        resetTimer();
        List<Pair<byte[], byte[]>> list = null;
        try {
            list = loadBackup(backupFile, parse, loadServerConfig);
        } catch (Exception e2) {
            System.err.printf("Failed loading backup file.%nError = %s%n", e2.getMessage());
            for (StackTraceElement stackTraceElement : e2.getStackTrace()) {
                System.err.printf("%s%n", stackTraceElement.toString());
            }
            systemExit.exit(1);
        }
        System.out.printf("Backup (%d records) loaded in memory in %s ms.%n", Integer.valueOf(list.size()), Long.valueOf(currentTimer()));
        System.out.println();
        System.out.println("Restoring command topic ...");
        resetTimer();
        try {
            ksqlRestoreCommandTopic.restore(list);
        } catch (Exception e3) {
            System.err.printf("Failed restoring command topic.%nError = %s%n", e3.getMessage());
            systemExit.exit(1);
        }
        System.out.printf("Restore process completed in %d ms.%n", Long.valueOf(currentTimer()));
        System.out.println();
        System.out.println("You need to restart the ksqlDB server to re-load the command topic.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static KafkaProducer<byte[], byte[]> transactionalProducer(KsqlConfig ksqlConfig) {
        HashMap hashMap = new HashMap(ksqlConfig.getProducerClientConfigProps());
        hashMap.put("transactional.id", ksqlConfig.getString("ksql.service.id"));
        hashMap.put("acks", "all");
        hashMap.putAll(ksqlConfig.originalsWithPrefix(KsqlRestConfig.COMMAND_CONSUMER_PREFIX));
        return new KafkaProducer<>(hashMap, BYTES_SERIALIZER, BYTES_SERIALIZER);
    }

    KsqlRestoreCommandTopic(KsqlConfig ksqlConfig) {
        this(ksqlConfig, ReservedInternalTopics.commandTopic(ksqlConfig), new KafkaTopicClientImpl(() -> {
            return createAdminClient(ksqlConfig);
        }), () -> {
            return transactionalProducer(ksqlConfig);
        });
    }

    @VisibleForTesting
    KsqlRestoreCommandTopic(KsqlConfig ksqlConfig, String str, KafkaTopicClient kafkaTopicClient, Supplier<Producer<byte[], byte[]>> supplier) {
        this.serverConfig = (KsqlConfig) Objects.requireNonNull(ksqlConfig, "serverConfig");
        this.commandTopicName = (String) Objects.requireNonNull(str, "commandTopicName");
        this.topicClient = (KafkaTopicClient) Objects.requireNonNull(kafkaTopicClient, "topicClient");
        this.kafkaProducerSupplier = (Supplier) Objects.requireNonNull(supplier, "kafkaProducerSupplier");
    }

    public void restore(List<Pair<byte[], byte[]>> list) {
        deleteCommandTopicIfExists();
        KsqlInternalTopicUtils.ensureTopic(this.commandTopicName, this.serverConfig, this.topicClient);
        restoreCommandTopic(list);
    }

    private void deleteCommandTopicIfExists() {
        if (this.topicClient.isTopicExists(this.commandTopicName)) {
            this.topicClient.deleteTopics(Collections.singletonList(this.commandTopicName));
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
    }

    private void restoreCommandTopic(List<Pair<byte[], byte[]>> list) {
        Producer<byte[], byte[]> createTransactionalProducer = createTransactionalProducer();
        for (int i = 0; i < list.size(); i++) {
            try {
                Pair<byte[], byte[]> pair = list.get(i);
                try {
                    try {
                        createTransactionalProducer.beginTransaction();
                        enqueueCommand(createTransactionalProducer, (byte[]) pair.getLeft(), (byte[]) pair.getRight());
                        createTransactionalProducer.commitTransaction();
                    } catch (Exception e) {
                        createTransactionalProducer.abortTransaction();
                        throw new KsqlException(String.format("Failed restoring command (line %d): %s", Integer.valueOf(i + 1), new String((byte[]) list.get(i).getLeft(), StandardCharsets.UTF_8)), e);
                    }
                } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e2) {
                    throw new KsqlException(String.format("Failed restoring command (line %d): %s", Integer.valueOf(i + 1), new String((byte[]) list.get(i).getLeft(), StandardCharsets.UTF_8)), e2);
                } catch (InterruptedException e3) {
                    createTransactionalProducer.abortTransaction();
                    throw new KsqlException("Restore process was interrupted.", e3);
                }
            } catch (Throwable th) {
                if (createTransactionalProducer != null) {
                    try {
                        createTransactionalProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (createTransactionalProducer != null) {
            createTransactionalProducer.close();
        }
    }

    private void enqueueCommand(Producer<byte[], byte[]> producer, byte[] bArr, byte[] bArr2) throws ExecutionException, InterruptedException {
        producer.send(new ProducerRecord(this.commandTopicName, 0, bArr, bArr2)).get();
    }

    private Producer<byte[], byte[]> createTransactionalProducer() {
        try {
            Producer<byte[], byte[]> producer = this.kafkaProducerSupplier.get();
            producer.initTransactions();
            return producer;
        } catch (Exception e) {
            throw new KsqlException("Failed to initialize topic transactions.", e);
        } catch (TimeoutException e2) {
            throw new KsqlException(new DefaultErrorMessages().transactionInitTimeoutErrorMessage(e2), e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void maybeCleanUpQuery(byte[] bArr, KsqlConfig ksqlConfig) {
        boolean z = false;
        HashMap hashMap = new HashMap(ksqlConfig.getKsqlStreamConfigProps());
        boolean z2 = false;
        String str = "";
        JSONObject jSONObject = new JSONObject(new String(bArr, StandardCharsets.UTF_8));
        if (hasKey(jSONObject, "plan") && !jSONObject.isNull("plan")) {
            JSONObject jSONObject2 = jSONObject.getJSONObject("plan");
            if (hasKey(jSONObject2, "queryPlan") && !jSONObject2.isNull("queryPlan")) {
                JSONObject jSONObject3 = jSONObject2.getJSONObject("queryPlan");
                str = jSONObject3.getString("queryId");
                if (hasKey(jSONObject3, "runtimeId") && !jSONObject3.isNull("runtimeId") && ((Optional) jSONObject3.get("runtimeId")).isPresent()) {
                    hashMap.put("application.id", ((Optional) jSONObject3.get("runtimeId")).get());
                    z2 = true;
                } else {
                    hashMap.put("application.id", QueryApplicationId.build(ksqlConfig, true, new QueryId(str)));
                }
                z = true;
            }
        }
        if (z) {
            StreamsConfig streamsConfig = new StreamsConfig(hashMap);
            String string = z2 ? streamsConfig.getString("application.id") : QueryApplicationId.buildInternalTopicPrefix(ksqlConfig, z2) + str;
            try {
                Admin admin = new DefaultKafkaClientSupplier().getAdmin(ksqlConfig.getKsqlAdminClientConfigProps());
                new KafkaTopicClientImpl(() -> {
                    return admin;
                }).deleteInternalTopics(string);
                new StateDirectory(streamsConfig, Time.SYSTEM, true, ksqlConfig.getBoolean("ksql.runtime.feature.shared.enabled").booleanValue()).clean();
                System.out.printf("Cleaned up internal state store and internal topics for query %s%n", string);
            } catch (Exception e) {
                System.out.printf("Failed to clean up query %s %n", string);
            }
        }
    }

    private static boolean hasKey(JSONObject jSONObject, String str) {
        return jSONObject != null && jSONObject.has(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Admin createAdminClient(KsqlConfig ksqlConfig) {
        HashMap hashMap = new HashMap(ksqlConfig.getKsqlAdminClientConfigProps());
        hashMap.putAll(ksqlConfig.originalsWithPrefix(KsqlRestConfig.COMMAND_CONSUMER_PREFIX));
        return new DefaultKafkaClientSupplier().getAdmin(hashMap);
    }
}
