package io.confluent.ksql.rest.server;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/rest/server/LocalCommands.class */
/**
 * Records transient-query commands in a file inside a local directory and, on request,
 * replays the command files found in that directory so that state belonging to the
 * recorded transient queries can be cleaned up.
 *
 * <p>Each instance writes to its own command file (see {@link #open(KsqlEngine, File)}).
 * Every other {@code .cmds} file in the directory is consumed by
 * {@link #processLocalCommandFiles(ServiceContext)} and then renamed with the
 * {@code .processed} suffix.
 */
public class LocalCommands implements Closeable {
    static final String LOCAL_COMMANDS_FILE_SUFFIX = ".cmds";
    static final String LOCAL_COMMANDS_PROCESSED_SUFFIX = ".processed";

    private static final Logger LOG = LogManager.getLogger(LocalCommands.class);
    private static final Random RANDOM = new Random();

    private final File directory;
    private final KsqlEngine ksqlEngine;
    private final LocalCommandsFile currentLocalCommands;

    /**
     * Creates an instance over an already prepared directory and command file.
     *
     * @param file the directory that holds the local command files
     * @param ksqlEngine the engine asked to clean up after recorded transient queries
     * @param localCommandsFile the file this instance appends its own commands to
     */
    LocalCommands(File file, KsqlEngine ksqlEngine, LocalCommandsFile localCommandsFile) {
        this.directory = file;
        this.ksqlEngine = ksqlEngine;
        this.currentLocalCommands = localCommandsFile;
    }

    /**
     * Returns the file this instance is currently writing commands to.
     *
     * @return the current local commands file
     */
    public File getCurrentLocalCommandsFile() {
        return this.currentLocalCommands.getFile();
    }

    /**
     * Processes every {@code .cmds} file in the directory except the one currently being
     * written: its records are read, the transient queries they name are handed to the
     * engine for cleanup, and the file is renamed with the {@code .processed} suffix.
     *
     * <p>A failure while handling one file is logged and does not stop the remaining
     * files from being processed.
     *
     * @param serviceContext the service context passed to the engine during cleanup
     * @throws KsqlServerException if the directory cannot be listed
     */
    public void processLocalCommandFiles(ServiceContext serviceContext) {
        final File[] commandFiles = this.directory.listFiles(
            (dir, name) -> name.endsWith(LOCAL_COMMANDS_FILE_SUFFIX));
        if (commandFiles == null) {
            throw new KsqlServerException("Bad local commands directory " + this.directory.getAbsolutePath() + ". Please check your configuration for ksql.local.commands.location");
        }

        final File currentFile = this.currentLocalCommands.getFile();
        for (final File commandFile : commandFiles) {
            if (commandFile.equals(currentFile)) {
                // Never replay the file this instance is still writing to.
                continue;
            }
            try (LocalCommandsFile readonlyFile = LocalCommandsFile.createReadonly(commandFile)) {
                cleanUpTransientQueryState(readonlyFile.readRecords(), serviceContext);
                markFileAsProcessed(commandFile);
            } catch (Exception e) {
                // Best effort: log and carry on with the next file.
                LOG.error(
                    "Error processing local commands {}. There may be orphaned transient topics or abandoned state stores.",
                    commandFile.getAbsolutePath(),
                    e);
            }
        }
    }

    /**
     * Records the given transient query in the current local commands file.
     *
     * <p>An {@link IOException} raised by the write is logged and not propagated.
     *
     * @param transientQueryMetadata the transient query whose application id is recorded
     */
    public void write(TransientQueryMetadata transientQueryMetadata) {
        try {
            this.currentLocalCommands.write(
                new TransientQueryLocalCommand(transientQueryMetadata.getQueryApplicationId()));
        } catch (IOException e) {
            LOG.error(
                "Failed to write local command for transient query:{}",
                transientQueryMetadata.getQueryApplicationId(),
                e);
        }
    }

    /**
     * Opens the local commands directory, creating it (with owner-only permissions) when
     * it does not exist, and starts a new writeable command file inside it.
     *
     * <p>The new file is named {@code local_commands_<millis>_<random hex>.cmds}.
     *
     * @param ksqlEngine the engine used later for transient-query cleanup
     * @param file the local commands directory
     * @return a new instance writing to a fresh command file in {@code file}
     * @throws KsqlServerException if the directory cannot be created, its permissions
     *     cannot be set, the path is not a directory, or it is not
     *     readable/writable/executable
     */
    @SuppressFBWarnings({"DMI_RANDOM_USED_ONLY_ONCE"})
    public static LocalCommands open(KsqlEngine ksqlEngine, File file) {
        if (!file.exists()) {
            if (!file.mkdirs()) {
                throw new KsqlServerException("Couldn't create the local commands directory: " + file.getPath() + "\n Make sure the directory exists and is readable/writable for KSQL server \n or its parent directory is readable/writable by KSQL server\n or change it to a readable/writable directory by setting 'ksql.local.commands.location' config in the properties file.");
            }
            try {
                // Restrict the freshly created directory to its owner.
                Files.setPosixFilePermissions(file.toPath(), PosixFilePermissions.fromString("rwx------"));
            } catch (IOException e) {
                // NOTE(review): the IOException cause is dropped here; consider chaining it
                // if KsqlServerException offers a (message, cause) constructor - confirm.
                throw new KsqlServerException(String.format("Couldn't set POSIX permissions on the backups directory: %s. Error = %s", file.getPath(), e.getMessage()));
            }
        }
        if (!file.isDirectory()) {
            throw new KsqlServerException(file.getPath() + " is not a directory.\n Make sure the directory exists and is readable/writable for KSQL server \n or its parent directory is readable/writable by KSQL server\n or change it to a readable/writable directory by setting 'ksql.local.commands.location' config in the properties file.");
        }
        if (!(file.canWrite() && file.canRead() && file.canExecute())) {
            throw new KsqlServerException("The local commands directory is not readable/writable for KSQL server: " + file.getPath() + "\n Make sure the directory exists and is readable/writable for KSQL server \n or change it to a readable/writable directory by setting 'ksql.local.commands.location' config in the properties file.");
        }

        final String fileName = String.format(
            "local_commands_%d_%s%s",
            System.currentTimeMillis(),
            Integer.toHexString(RANDOM.nextInt()),
            LOCAL_COMMANDS_FILE_SUFFIX);
        return new LocalCommands(
            file, ksqlEngine, LocalCommandsFile.createWriteable(new File(file, fileName)));
    }

    /**
     * Renames the file by appending the {@code .processed} suffix. The renamed file no
     * longer matches the {@code .cmds} filter used when listing command files.
     *
     * @param file the command file that has been fully processed
     * @throws KsqlException if the rename fails
     */
    private void markFileAsProcessed(File file) {
        final File processed =
            new File(file.getParentFile(), file.getName() + LOCAL_COMMANDS_PROCESSED_SUFFIX);
        if (!file.renameTo(processed)) {
            throw new KsqlException("Couldn't rename file " + file.getAbsolutePath());
        }
    }

    /**
     * Collects the query application ids of all transient-query commands in the list and,
     * when there is at least one, passes them to the engine for cleanup.
     *
     * @param list the commands read from a local commands file
     * @param serviceContext the service context passed to the engine
     */
    private void cleanUpTransientQueryState(List<LocalCommand> list, ServiceContext serviceContext) {
        final Set<String> queryApplicationIds = list.stream()
            .filter(command -> command.getType().equals(TransientQueryLocalCommand.TYPE))
            .map(TransientQueryLocalCommand.class::cast)
            .map(TransientQueryLocalCommand::getQueryApplicationId)
            .collect(Collectors.toSet());

        if (!queryApplicationIds.isEmpty()) {
            this.ksqlEngine.cleanupOrphanedInternalTopics(serviceContext, queryApplicationIds);
            this.ksqlEngine.populateTransientQueryCleanupServiceWithOldCommands(queryApplicationIds);
        }
    }

    /**
     * Closes the current local commands file.
     *
     * @throws IOException if closing the underlying file fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.currentLocalCommands.close();
    }
}
