package io.confluent.ksql.rest.server;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.rest.server.resources.CommandTopicCorruptionException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.Pair;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/CommandTopicBackupImpl.class */
/**
 * {@link CommandTopicBackup} implementation that mirrors command topic records into a replay
 * file stored in a local backup directory.
 *
 * <p>Backup files are named {@code backup_<topicName>_<timestamp>}. On {@link #initialize()} the
 * file with the highest timestamp is opened (or a new one is created) and its records are loaded.
 * While records from that previous backup remain unmatched, {@link #writeRecord(ConsumerRecord)}
 * only compares incoming records against them; the first mismatch marks the backup as out of
 * sync with the command topic. Once every previously backed-up record has been matched, new
 * records are appended to the replay file.
 *
 * <p>{@link #initialize()} must be called before {@link #writeRecord(ConsumerRecord)}; the
 * replay state is not set up by the constructor.
 */
public class CommandTopicBackupImpl implements CommandTopicBackup {
    private static final Logger LOG = LoggerFactory.getLogger(CommandTopicBackupImpl.class);
    private static final String PREFIX = "backup_";
    private final File backupLocation;
    private final String topicName;
    private final Supplier<Long> ticker;
    private final KafkaTopicClient kafkaTopicClient;
    private BackupReplayFile replayFile;
    private List<Pair<byte[], byte[]>> latestReplay;
    private int latestReplayIdx;
    private boolean corruptionDetected;

    /**
     * Creates a backup that timestamps new replay files with {@link System#currentTimeMillis()}.
     *
     * @param str the backup directory location; created if it does not exist
     * @param str2 the name of the command topic being backed up
     * @param kafkaTopicClient client used to check whether the command topic exists
     */
    public CommandTopicBackupImpl(String str, String str2, KafkaTopicClient kafkaTopicClient) {
        this(str, str2, System::currentTimeMillis, kafkaTopicClient);
    }

    /**
     * Creates a backup with an explicit timestamp source.
     *
     * @param str the backup directory location; created if it does not exist
     * @param str2 the name of the command topic being backed up
     * @param supplier supplies the timestamp suffix used when naming a new replay file
     * @param kafkaTopicClient client used to check whether the command topic exists
     * @throws NullPointerException if any argument is {@code null}
     * @throws KsqlServerException if the backup directory cannot be created or is not a
     *     readable, writable and executable directory
     */
    @VisibleForTesting
    CommandTopicBackupImpl(String str, String str2, Supplier<Long> supplier, KafkaTopicClient kafkaTopicClient) {
        this.backupLocation = new File(Objects.requireNonNull(str, "location"));
        this.topicName = Objects.requireNonNull(str2, "topicName");
        this.ticker = Objects.requireNonNull(supplier, "ticker");
        this.kafkaTopicClient = Objects.requireNonNull(kafkaTopicClient, "kafkaTopicClient");
        ensureDirectoryExists(this.backupLocation);
    }

    /**
     * Opens the latest replay file (or creates one), loads its records and resets the replay
     * state.
     *
     * <p>If the existing file cannot be read, a warning is logged and a brand-new replay file is
     * used instead. If the command topic does not exist while the backup already holds records,
     * the backup is flagged as corrupted.
     *
     * @throws RuntimeException if a replay file can neither be opened nor created
     */
    @Override // io.confluent.ksql.rest.server.CommandTopicBackup
    public void initialize() {
        this.replayFile = openOrCreateReplayFile();
        try {
            this.latestReplay = this.replayFile.readRecords();
        } catch (IOException e) {
            LOG.warn("Failed to read the latest backup from {}. Continue with a new file. Error = {}", this.replayFile.getPath(), e.getMessage());
            // The unreadable file is about to be dropped; close it first, otherwise close() would
            // later only ever see its replacement.
            closeQuietly(this.replayFile);
            try {
                this.replayFile = newReplayFile();
                this.latestReplay = Collections.emptyList();
            } catch (IOException e2) {
                throw new RuntimeException(e2);
            }
        }
        this.latestReplayIdx = 0;
        // A non-empty backup with no command topic means the two are out of sync.
        this.corruptionDetected = !this.kafkaTopicClient.isTopicExists(this.topicName) && !this.latestReplay.isEmpty();
        LOG.info("Command topic will be backup on file: {}", this.replayFile.getPath());
    }

    /**
     * Closes the current replay file. Does nothing if {@link #initialize()} has not assigned one.
     */
    @Override // io.confluent.ksql.rest.server.CommandTopicBackup
    public void close() {
        if (this.replayFile != null) {
            this.replayFile.close();
        }
    }

    /** Returns the replay file currently in use, or {@code null} before initialization. */
    @VisibleForTesting
    BackupReplayFile getReplayFile() {
        return this.replayFile;
    }

    /** Returns whether records loaded from the previous backup are still waiting to be matched. */
    private boolean isRestoring() {
        return this.latestReplayIdx < this.latestReplay.size();
    }

    /**
     * Compares the record with the next expected record of the previous backup and advances the
     * replay position when both key and value match byte-for-byte.
     *
     * @param consumerRecord the record read from the command topic
     * @return {@code true} if the record matches the next backed-up record
     */
    private boolean isRecordInLatestReplay(ConsumerRecord<byte[], byte[]> consumerRecord) {
        Pair<byte[], byte[]> expected = this.latestReplay.get(this.latestReplayIdx);
        boolean sameKey = Arrays.equals(consumerRecord.key(), expected.getLeft());
        boolean sameValue = Arrays.equals(consumerRecord.value(), expected.getRight());
        if (!sameKey || !sameValue) {
            return false;
        }
        this.latestReplayIdx++;
        return true;
    }

    /**
     * Backs up a command topic record.
     *
     * <p>Records with a {@code null} key or value are skipped with a warning. While previously
     * backed-up records remain unmatched, the record is only verified against them and is not
     * written again. A failure to append to the replay file is logged and not propagated.
     *
     * @param consumerRecord the record read from the command topic
     * @throws CommandTopicCorruptionException if corruption was already detected, or if the record
     *     does not match the next record of the previous backup
     */
    @Override // io.confluent.ksql.rest.server.CommandTopicBackup
    public void writeRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
        if (this.corruptionDetected) {
            throw new CommandTopicCorruptionException("Failed to write record due to out of sync command topic and backup file: " + consumerRecord);
        }
        if (consumerRecord.key() == null || consumerRecord.value() == null) {
            LOG.warn(String.format("Can't backup a command topic record with a null key/value: key=%s, value=%s, partition=%d, offset=%d", consumerRecord.key(), consumerRecord.value(), consumerRecord.partition(), consumerRecord.offset()));
            return;
        }
        if (isRestoring()) {
            if (isRecordInLatestReplay(consumerRecord)) {
                // Already present in the backup file; nothing to write.
                return;
            }
            this.corruptionDetected = true;
            throw new CommandTopicCorruptionException("Failed to write record due to out of sync command topic and backup file: " + consumerRecord);
        }
        // Every previously backed-up record has been matched, so the in-memory copy is no longer
        // needed.
        if (this.latestReplay.size() > 0) {
            this.latestReplay.clear();
        }
        try {
            this.replayFile.write(consumerRecord);
        } catch (Exception e) {
            // Deliberately best-effort: a failed backup write must not fail the caller.
            LOG.warn("Failed to write to file {}. The command topic backup is not complete. Make sure the file exists and has permissions to write. KSQL must be restarted afterwards to complete the backup process. Error = {}", this.replayFile.getPath(), e.getMessage());
        }
    }

    /** Returns whether the backup has been found to be out of sync with the command topic. */
    @Override // io.confluent.ksql.rest.server.CommandTopicBackup
    public boolean commandTopicCorruption() {
        return this.corruptionDetected;
    }

    /**
     * Opens the most recent replay file in the backup directory, or creates a new one when none
     * exists.
     *
     * @return a replay file for this topic
     * @throws RuntimeException wrapping any {@link IOException} raised while opening or creating
     */
    @VisibleForTesting
    BackupReplayFile openOrCreateReplayFile() {
        try {
            Optional<BackupReplayFile> latest = latestReplayFile();
            return latest.isPresent() ? latest.get() : newReplayFile();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Creates a replay file named {@code backup_<topicName>_<ticker value>} in the backup directory. */
    private BackupReplayFile newReplayFile() throws IOException {
        String fileName = String.format("%s%s_%s", PREFIX, this.topicName, this.ticker.get());
        return BackupReplayFile.writable(Paths.get(this.backupLocation.getAbsolutePath(), fileName).toFile());
    }

    /**
     * Finds the replay file of this topic with the highest (positive) timestamp suffix.
     *
     * <p>Files whose suffix is not a valid {@code long} are ignored with a warning.
     *
     * @return the latest replay file, or empty if the directory holds none for this topic
     */
    private Optional<BackupReplayFile> latestReplayFile() throws IOException {
        String prefix = String.format("%s%s_", PREFIX, this.topicName);
        // Compare case-insensitively on BOTH sides. Lower-casing only the file name would never
        // match a prefix built from a topic name that contains upper-case characters.
        File[] candidates = this.backupLocation.listFiles(
                (dir, name) -> name.regionMatches(true, 0, prefix, 0, prefix.length()));
        File latestFile = null;
        if (candidates != null) {
            long latestTimestamp = 0;
            for (File candidate : candidates) {
                String suffix = candidate.getName().substring(prefix.length());
                try {
                    long timestamp = Long.parseLong(suffix);
                    if (timestamp > latestTimestamp) {
                        latestTimestamp = timestamp;
                        latestFile = candidate;
                    }
                } catch (NumberFormatException e) {
                    LOG.warn("Invalid timestamp '{}' found in backup replay file (file ignored): {}", suffix, candidate.getName());
                }
            }
        }
        return latestFile != null ? Optional.of(BackupReplayFile.writable(latestFile)) : Optional.empty();
    }

    /** Closes a replay file that is being discarded, logging instead of propagating failures. */
    private static void closeQuietly(BackupReplayFile file) {
        try {
            file.close();
        } catch (RuntimeException e) {
            LOG.warn("Failed to close unreadable backup file {}. Error = {}", file.getPath(), e.getMessage());
        }
    }

    /**
     * Ensures the backup location is a usable directory.
     *
     * <p>A missing directory is created (including parents) and restricted to owner-only
     * {@code rwx------} permissions. An existing path must be a directory that is readable,
     * writable and executable.
     *
     * @param file the backup directory
     * @throws KsqlServerException if the directory cannot be created, its permissions cannot be
     *     set, the path is not a directory, or it lacks the required access
     */
    private static void ensureDirectoryExists(File file) {
        if (!file.exists()) {
            if (!file.mkdirs()) {
                throw new KsqlServerException("Couldn't create the backups directory: " + file.getPath() + "\n Make sure the directory exists and is readable/writable for KSQL server \n or its parent directory is readable/writable by KSQL server\n or change it to a readable/writable directory by setting 'ksql.metastore.backup.location' config in the properties file.");
            }
            try {
                Files.setPosixFilePermissions(file.toPath(), PosixFilePermissions.fromString("rwx------"));
            } catch (IOException e) {
                throw new KsqlServerException(String.format("Couldn't set POSIX permissions on the backups directory: %s. Error = %s", file.getPath(), e.getMessage()));
            }
        }
        if (!file.isDirectory()) {
            throw new KsqlServerException(file.getPath() + " is not a directory.\n Make sure the directory exists and is readable/writable for KSQL server \n or its parent directory is readable/writable by KSQL server\n or change it to a readable/writable directory by setting 'ksql.metastore.backup.location' config in the properties file.");
        }
        if (!file.canWrite() || !file.canRead() || !file.canExecute()) {
            throw new KsqlServerException("The backups directory is not readable/writable for KSQL server: " + file.getPath() + "\n Make sure the directory exists and is readable/writable for KSQL server \n or change it to a readable/writable directory by setting 'ksql.metastore.backup.location' config in the properties file.");
        }
    }
}
