package io.confluent.ksql.rest.server;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.properties.PropertiesUtil;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.KeyFormatUtils;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/KsqlServerMain.class */
/**
 * Command-line entry point for the KSQL server.
 *
 * <p>{@link #main(String[])} loads and validates the server configuration, builds the
 * {@link Executable} to run (a standalone executor when a queries file is supplied, otherwise the
 * REST application, optionally combined with a Connect worker) and runs it until it terminates.
 */
public class KsqlServerMain {
    private static final Logger log = LoggerFactory.getLogger(KsqlServerMain.class);
    private final Executor shutdownHandler;
    private final Executable executable;

    /**
     * Parses the command line, validates the configuration and runs the server.
     *
     * <p>Returns without doing anything when {@code ServerOptions.parse} yields {@code null}.
     * Any exception raised while configuring or running the server is logged and the JVM exits
     * with status {@code -1}.
     *
     * @param strArr the raw command-line arguments
     */
    public static void main(String[] strArr) {
        try {
            ServerOptions serverOptions = ServerOptions.parse(strArr);
            if (serverOptions == null) {
                return;
            }
            // System properties are applied on top of the values read from the properties file.
            Map<String, String> properties = PropertiesUtil.applyOverrides(
                    PropertiesUtil.loadProperties(serverOptions.getPropertiesFile()),
                    System.getProperties());
            // NOTE(review): the fallback constant below is named for a different setting; it looks
            // like a decompiler substitution for an equal-valued constant - confirm.
            String installDir = properties.getOrDefault(
                    KsqlRestConfig.INSTALL_DIR_CONFIG,
                    KsqlRestConfig.AUTHENTICATION_SKIP_PATHS_DEFAULT);
            KsqlConfig ksqlConfig = new KsqlConfig(properties);
            validateConfig(ksqlConfig);
            Executable toRun = createExecutable(
                    properties, serverOptions.getQueriesFile(properties), installDir, ksqlConfig);
            // The "executor" registers the supplied task as a JVM shutdown hook.
            new KsqlServerMain(toRun, runnable -> {
                Runtime.getRuntime().addShutdownHook(new Thread(runnable));
            }).tryStartApp();
        } catch (Exception e) {
            log.error("Failed to start KSQL", e);
            System.exit(-1);
        }
    }

    /**
     * @param executable the component to start, wait on and shut down; must not be null
     * @param executor receives the task to run when shutdown is requested; must not be null
     */
    KsqlServerMain(Executable executable, Executor executor) {
        this.executable = Objects.requireNonNull(executable, "executable");
        this.shutdownHandler = Objects.requireNonNull(executor, "shutdownHandler");
    }

    /**
     * Starts the executable, blocks until it terminates and then shuts it down.
     *
     * <p>Before starting, a task is handed to the shutdown handler which notifies the executable
     * of termination and then waits until this method has finished shutting it down.
     * {@code shutdown()} is invoked exactly once, whether startup succeeded or failed, and the
     * waiting task is only released after {@code shutdown()} has returned or thrown.
     *
     * @throws Exception any failure raised while starting, awaiting or shutting down
     */
    void tryStartApp() throws Exception {
        final CountDownLatch shutdownComplete = new CountDownLatch(1);
        this.shutdownHandler.execute(() -> {
            this.executable.notifyTerminated();
            try {
                shutdownComplete.await();
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        });
        try {
            try {
                log.info("Starting server");
                this.executable.startAsync();
                log.info("Server up and running");
                this.executable.awaitTerminated();
            } catch (Throwable t) {
                log.error("Unhandled exception in server startup", t);
                throw t;
            } finally {
                log.info("Server shutting down");
                this.executable.shutdown();
            }
        } finally {
            // Release the shutdown task only once shutdown() has completed (or failed).
            shutdownComplete.countDown();
        }
    }

    /** Runs all start-up configuration checks. */
    private static void validateConfig(KsqlConfig ksqlConfig) {
        validateStateDir(ksqlConfig);
        validateDefaultTopicFormats(ksqlConfig);
    }

    /**
     * Checks the streams state directory, taking {@code state.dir} from the KSQL streams config
     * properties or, if absent, from the {@link StreamsConfig} default values.
     */
    private static void validateStateDir(KsqlConfig ksqlConfig) {
        Object stateDir = ksqlConfig.getKsqlStreamConfigProps()
                .getOrDefault("state.dir", StreamsConfig.configDef().defaultValues().get("state.dir"));
        enforceStreamStateDirAvailability(new File(stateDir.toString()));
    }

    /**
     * Validates the configured default key and value persistence formats.
     *
     * @param ksqlConfig the configuration to check
     * @throws KsqlException if a configured format is rejected
     */
    @VisibleForTesting
    static void validateDefaultTopicFormats(KsqlConfig ksqlConfig) {
        validateTopicFormat(ksqlConfig, "ksql.persistence.default.format.key", "key");
        validateTopicFormat(ksqlConfig, "ksql.persistence.default.format.value", "value");
    }

    /**
     * Validates one format setting. A {@code null} setting is accepted without further checks.
     *
     * @param ksqlConfig the configuration to read from
     * @param configName the name of the config holding the format name
     * @param type {@code "key"} to additionally require key-format support
     * @throws KsqlException if the format lookup fails or the key format is unsupported
     */
    private static void validateTopicFormat(KsqlConfig ksqlConfig, String configName, String type) {
        String formatName = ksqlConfig.getString(configName);
        if (formatName == null) {
            return;
        }
        try {
            Format format = FormatFactory.fromName(formatName);
            if ("key".equals(type) && !KeyFormatUtils.isSupportedKeyFormat(ksqlConfig, format)) {
                // This exception is caught just below and becomes the cause of the generic one.
                throw new KsqlException("Invalid value for config '" + configName + "': The supplied format is not currently supported as a key format. Format: '" + formatName + "'.");
            }
        } catch (KsqlException e) {
            throw new KsqlException("Invalid value for config '" + configName + "': " + formatName, e);
        }
    }

    /**
     * Builds the executable to run.
     *
     * @param map the server properties
     * @param optional the queries file; when present a standalone executor is created
     * @param str the install directory passed to the standalone executor
     * @param ksqlConfig source of the Connect worker config setting
     * @return the standalone executor, the REST application, or the REST application combined
     *     with a Connect executable when {@code ksql.connect.worker.config} is non-empty
     * @throws IOException declared for failures while building the executable
     */
    private static Executable createExecutable(Map<String, String> map, Optional<String> optional, String str, KsqlConfig ksqlConfig) throws IOException {
        if (optional.isPresent()) {
            return StandaloneExecutorFactory.create(map, optional.get(), str);
        }
        KsqlRestApplication restApp = KsqlRestApplication.buildApplication(new KsqlRestConfig(map));
        String connectConfigFile = ksqlConfig.getString("ksql.connect.worker.config");
        if (connectConfigFile.isEmpty()) {
            return restApp;
        }
        return MultiExecutable.of(ConnectExecutable.of(connectConfigFile), restApp);
    }

    /**
     * Ensures the given path is a usable state directory, creating it if it does not exist.
     *
     * @param file the state directory
     * @throws KsqlServerException if the directory cannot be created, the path is not a
     *     directory, or it is not both writable and executable
     */
    @VisibleForTesting
    static void enforceStreamStateDirAvailability(File file) {
        if (!file.exists() && !file.mkdirs()) {
            throw new KsqlServerException("Could not create the kafka streams state directory: " + file.getPath() + "\n Make sure the directory exists and is writable for KSQL server \n or its parent directory is writable by KSQL server\n or change it to a writable directory by setting 'ksql.streams.state.dir' config in the properties file.");
        }
        if (!file.isDirectory()) {
            throw new KsqlServerException(file.getPath() + " is not a directory.\n Make sure the directory exists and is writable for KSQL server \n or its parent directory is writable by KSQL server\n or change it to a writable directory by setting 'ksql.streams.state.dir' config in the properties file.");
        }
        if (!file.canWrite() || !file.canExecute()) {
            throw new KsqlServerException("The kafka streams state directory is not writable for KSQL server: " + file.getPath() + "\n Make sure the directory exists and is writable for KSQL server \n or change it to a writable directory by setting 'ksql.streams.state.dir' config in the properties file.");
        }
    }
}
