package io.confluent.ksql.rest.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.function.UserFunctionLoader;
import io.confluent.ksql.logging.query.QueryLogger;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.properties.PropertiesUtil;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.fips.FipsValidator;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/rest/server/KsqlServerMain.class */
/**
 * Command-line entry point of the KSQL server.
 *
 * <p>{@link #main(String[])} loads the server properties, validates them (FIPS settings, the
 * Kafka Streams state directory and the default topic formats), loads the function registry and
 * then runs a precondition checker followed by the actual server executable.
 */
public class KsqlServerMain {
    private static final Logger log = LogManager.getLogger(KsqlServerMain.class);

    private static final String HTTP = "http";
    private static final String HTTPS = "https";
    private static final String HTTPS_SCHEME_PREFIX = HTTPS + "://";

    private final Executor shutdownHandler;
    private final Executable preconditionChecker;
    private final Supplier<Executable> executable;

    /**
     * Parses the command line, validates the configuration and starts the server.
     *
     * <p>Returns silently when {@code ServerOptions.parse} yields {@code null}. Any exception
     * raised during start-up is logged and the JVM is terminated with exit status {@code -1}.
     *
     * @param strArr the command-line arguments
     */
    public static void main(String[] strArr) {
        try {
            ServerOptions serverOptions = ServerOptions.parse(strArr);
            if (serverOptions == null) {
                return;
            }
            // The properties are re-read through this supplier by every component that needs them.
            Supplier<Map<String, String>> propertiesLoader = () -> PropertiesUtil.applyOverrides(
                    PropertiesUtil.loadProperties(serverOptions.getPropertiesFile()),
                    System.getProperties());
            Map<String, String> properties = propertiesLoader.get();
            String installDir = properties.getOrDefault(KsqlRestConfig.INSTALL_DIR_CONFIG, "");
            KsqlConfig ksqlConfig = new KsqlConfig(properties);
            validateConfig(ksqlConfig, new KsqlRestConfig(properties));
            QueryLogger.configure(ksqlConfig);
            Optional<String> queriesFile = serverOptions.getQueriesFile(properties);
            MetricCollectors metricCollectors = new MetricCollectors();
            ServerState serverState = new ServerState();
            FunctionRegistry functionRegistry = loadFunctions(propertiesLoader, metricCollectors);
            KsqlServerMain serverMain = new KsqlServerMain(
                    new PreconditionChecker(propertiesLoader, serverState),
                    () -> createExecutable(
                            propertiesLoader,
                            serverState,
                            queriesFile,
                            installDir,
                            metricCollectors,
                            functionRegistry),
                    // Shutdown callbacks are registered as JVM shutdown hooks.
                    hook -> Runtime.getRuntime().addShutdownHook(new Thread(hook)));
            serverMain.tryStartApp();
        } catch (Exception e) {
            log.error("Failed to start KSQL", e);
            System.exit(-1);
        }
    }

    /**
     * Creates a server runner.
     *
     * @param executable the precondition checker that is run first
     * @param supplier factory for the main server executable; only invoked once the
     *     precondition checker has finished without a shutdown notification
     * @param executor receives the shutdown callback registered for each executable
     * @throws NullPointerException if any argument is {@code null}
     */
    KsqlServerMain(Executable executable, Supplier<Executable> supplier, Executor executor) {
        this.preconditionChecker = Objects.requireNonNull(executable, "preconditionChecker");
        this.executable = Objects.requireNonNull(supplier, "executableFactory");
        this.shutdownHandler = Objects.requireNonNull(executor, "shutdownHandler");
    }

    /**
     * Runs the precondition checker and, unless the shutdown handler fired while it was running,
     * creates and runs the main executable.
     *
     * @throws Exception if either executable fails
     */
    void tryStartApp() throws Exception {
        boolean shutdownRequested = runExecutable(this.preconditionChecker);
        if (shutdownRequested) {
            return;
        }
        runExecutable(this.executable.get());
    }

    /**
     * Builds a function registry and loads user functions into it.
     *
     * @param supplier provides the server properties
     * @param metricCollectors source of the metrics object handed to the function loader
     * @return the populated registry
     */
    private static FunctionRegistry loadFunctions(Supplier<Map<String, String>> supplier, MetricCollectors metricCollectors) {
        KsqlRestConfig restConfig = new KsqlRestConfig(supplier.get());
        String installDir = restConfig.getString(KsqlRestConfig.INSTALL_DIR_CONFIG);
        KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
        InternalFunctionRegistry registry = new InternalFunctionRegistry();
        UserFunctionLoader.newInstance(ksqlConfig, registry, installDir, metricCollectors.getMetrics()).load();
        return registry;
    }

    /**
     * Starts the executable, blocks until it terminates and shuts it down.
     *
     * <p>A callback is handed to the shutdown handler before start-up. When invoked it notifies
     * the executable to terminate and then blocks until this method has finished shutting the
     * executable down, so that the caller of the callback cannot proceed before cleanup is done.
     *
     * @param executable the executable to run
     * @return {@code true} if the shutdown callback had been invoked by the time the executable
     *     was shut down
     * @throws Exception whatever the executable throws while starting, running or shutting down
     */
    boolean runExecutable(Executable executable) throws Exception {
        CountDownLatch shutdownComplete = new CountDownLatch(1);
        AtomicBoolean notified = new AtomicBoolean(false);
        this.shutdownHandler.execute(() -> {
            executable.notifyTerminated();
            try {
                notified.set(true);
                shutdownComplete.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        });
        try {
            try {
                log.info("Starting server");
                executable.startAsync();
                log.info("Server up and running");
                executable.awaitTerminated();
            } catch (Throwable t) {
                log.error("Unhandled exception in server startup", t);
                throw t;
            } finally {
                // Shut down exactly once, on success and on failure alike.
                log.info("Server shutting down");
                executable.shutdown();
            }
        } finally {
            // Release the shutdown callback only after shutdown() has completed (or failed).
            shutdownComplete.countDown();
        }
        return notified.get();
    }

    /** Runs all start-up configuration checks; each one throws on an invalid configuration. */
    private static void validateConfig(KsqlConfig ksqlConfig, KsqlRestConfig ksqlRestConfig) {
        validateFips(ksqlConfig, ksqlRestConfig);
        validateStateDir(ksqlConfig);
        validateDefaultTopicFormats(ksqlConfig);
    }

    /**
     * Checks the directory configured under {@code state.dir} in the streams properties, falling
     * back to the Kafka Streams default value when the property is not set.
     */
    private static void validateStateDir(KsqlConfig ksqlConfig) {
        Object stateDir = ksqlConfig.getKsqlStreamConfigProps()
                .getOrDefault("state.dir", StreamsConfig.configDef().defaultValues().get("state.dir"));
        enforceStreamStateDirAvailability(new File(stateDir.toString()));
    }

    /**
     * Verifies that the configured default key and value formats are known formats.
     *
     * @param ksqlConfig the configuration to check
     * @throws KsqlException if a configured format name is rejected by {@code FormatFactory}
     */
    @VisibleForTesting
    static void validateDefaultTopicFormats(KsqlConfig ksqlConfig) {
        validateTopicFormat(ksqlConfig, "ksql.persistence.default.format.key", "key");
        validateTopicFormat(ksqlConfig, "ksql.persistence.default.format.value", "value");
    }

    /**
     * When {@code enable.fips} is set, validates cipher suites and TLS protocols, the broker
     * security protocol, the SSL endpoint identification algorithm, the schema registry URL and
     * all listeners against the FIPS validator. Does nothing otherwise.
     *
     * @param ksqlConfig the KSQL configuration
     * @param ksqlRestConfig the REST configuration
     * @throws SecurityException if any of the checks fails
     */
    @VisibleForTesting
    static void validateFips(KsqlConfig ksqlConfig, KsqlRestConfig ksqlRestConfig) {
        if (!ksqlConfig.getBoolean("enable.fips").booleanValue()) {
            return;
        }
        FipsValidator fipsValidator = ConfluentConfigs.buildFipsValidator();
        validateCipherSuites(fipsValidator, ksqlRestConfig);
        validateBroker(fipsValidator, ksqlConfig);
        validateSslEndpointAlgo(fipsValidator, ksqlRestConfig);
        validateSrUrl(fipsValidator, ksqlRestConfig);
        validateListeners(fipsValidator, ksqlRestConfig);
        log.info("FIPS mode enabled for ksqlDB!");
    }

    /**
     * Validates the configured cipher suites (only when the list is non-empty) and the enabled
     * TLS protocols.
     *
     * @throws SecurityException if the validator rejects the settings; the validator's exception
     *     is attached as the cause
     */
    private static void validateCipherSuites(FipsValidator fipsValidator, KsqlRestConfig ksqlRestConfig) {
        Map<String, List<String>> tlsSettings = new HashMap<>();
        List<String> cipherSuites = ksqlRestConfig.getList(KsqlRestConfig.SSL_CIPHER_SUITES_CONFIG);
        if (!cipherSuites.isEmpty()) {
            tlsSettings.put(KsqlRestConfig.SSL_CIPHER_SUITES_CONFIG, cipherSuites);
        }
        tlsSettings.put(
                KsqlRestConfig.SSL_ENABLED_PROTOCOLS_CONFIG,
                ksqlRestConfig.getList(KsqlRestConfig.SSL_ENABLED_PROTOCOLS_CONFIG));
        try {
            fipsValidator.validateFipsTls(tlsSettings);
        } catch (Exception e) {
            log.error(e.getMessage());
            throw new SecurityException(e.getMessage(), e);
        }
    }

    /**
     * Validates the explicitly configured broker {@code security.protocol}.
     *
     * @throws SecurityException if the property is not set in the original configuration or the
     *     validator rejects it
     */
    private static void validateBroker(FipsValidator fipsValidator, KsqlConfig ksqlConfig) {
        Map<String, SecurityProtocol> securityProtocols = new HashMap<>();
        if (!ksqlConfig.originals().containsKey("security.protocol")) {
            log.error("The security protocol ('security.protocol') is not specified.");
            throw new SecurityException("The security protocol ('security.protocol') is not specified.");
        }
        securityProtocols.put(
                "security.protocol",
                SecurityProtocol.forName(ksqlConfig.originals().get("security.protocol").toString()));
        try {
            fipsValidator.validateFipsBrokerProtocol(securityProtocols);
        } catch (Exception e) {
            log.error(e.getMessage());
            throw new SecurityException(e.getMessage(), e);
        }
    }

    /**
     * Validates the explicitly configured {@code ssl.endpoint.identification.algorithm}.
     *
     * @throws SecurityException if the property is not set in the original configuration or the
     *     validator rejects it
     */
    private static void validateSslEndpointAlgo(FipsValidator fipsValidator, KsqlRestConfig ksqlRestConfig) {
        if (!ksqlRestConfig.originals().containsKey("ssl.endpoint.identification.algorithm")) {
            log.error("The SSL endpoint identification algorithm ('ssl.endpoint.identification.algorithm') is not specified.");
            throw new SecurityException("The SSL endpoint identification algorithm ('ssl.endpoint.identification.algorithm') is not specified.");
        }
        try {
            fipsValidator.validateRestProtocol(
                    ksqlRestConfig.originals().get("ssl.endpoint.identification.algorithm").toString());
        } catch (Exception e) {
            String message = e.getMessage() + "\nInvalid rest protocol for ssl.endpoint.identification.algorithm";
            log.error(message);
            throw new SecurityException(message, e);
        }
    }

    /**
     * Validates the protocol of {@code ksql.schema.registry.url}; skipped when the property is
     * not present in the original configuration.
     *
     * @throws SecurityException if the validator rejects the protocol
     */
    private static void validateSrUrl(FipsValidator fipsValidator, KsqlRestConfig ksqlRestConfig) {
        if (!ksqlRestConfig.originals().containsKey("ksql.schema.registry.url")) {
            return;
        }
        try {
            String url = ksqlRestConfig.originals().get("ksql.schema.registry.url").toString();
            fipsValidator.validateRestProtocol(determineProtocol(url));
        } catch (Exception e) {
            String message = e.getMessage() + "\nInvalid rest protocol for ksql.schema.registry.url";
            log.error(message);
            throw new SecurityException(message, e);
        }
    }

    /**
     * Validates the protocol of every listener, every proxy-protocol listener and, when set, the
     * internal and the advertised listener.
     *
     * @throws SecurityException if the validator rejects any of them
     */
    private static void validateListeners(FipsValidator fipsValidator, KsqlRestConfig ksqlRestConfig) {
        try {
            List<String> listeners = ksqlRestConfig.getList(KsqlRestConfig.LISTENERS_CONFIG);
            for (String listener : listeners) {
                fipsValidator.validateRestProtocol(determineProtocol(listener));
            }
            List<String> proxyListeners = ksqlRestConfig.getList(KsqlRestConfig.PROXY_PROTOCOL_LISTENERS_CONFIG);
            for (String listener : proxyListeners) {
                fipsValidator.validateRestProtocol(determineProtocol(listener));
            }
            String internalListener = ksqlRestConfig.getString(KsqlRestConfig.INTERNAL_LISTENER_CONFIG);
            if (!Strings.isNullOrEmpty(internalListener)) {
                fipsValidator.validateRestProtocol(determineProtocol(internalListener));
            }
            String advertisedListener = ksqlRestConfig.getString(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG);
            if (!Strings.isNullOrEmpty(advertisedListener)) {
                fipsValidator.validateRestProtocol(determineProtocol(advertisedListener));
            }
        } catch (Exception e) {
            String message = e.getMessage() + "\nInvalid rest protocol for listeners.\nMake sure that all listeners, listeners.proxy.protocol, ksql.advertised.listener, and ksql.internal.listener follow FIPS 140-2.";
            log.error(message);
            throw new SecurityException(message, e);
        }
    }

    /**
     * Classifies a URL as {@code "https"} or {@code "http"}.
     *
     * <p>An empty string counts as {@code "https"}. Otherwise the URL must begin with
     * {@code https://} (ignoring surrounding whitespace) to count as {@code "https"}; an
     * {@code https://} fragment elsewhere in the string, e.g. inside a path or query, does not
     * make a plain-text URL pass as secure.
     *
     * @param str the URL or listener string
     * @return {@code "https"} or {@code "http"}
     */
    private static String determineProtocol(String str) {
        if (str.isEmpty() || str.trim().startsWith(HTTPS_SCHEME_PREFIX)) {
            return HTTPS;
        }
        return HTTP;
    }

    /**
     * Checks that the format name stored under the given config key, if any, is a known format.
     *
     * @param ksqlConfig the configuration to read from
     * @param str the config key holding the format name
     * @param str2 a label for the kind of format ("key" or "value"); currently unused
     * @throws KsqlException if {@code FormatFactory} rejects the configured name
     */
    private static void validateTopicFormat(KsqlConfig ksqlConfig, String str, String str2) {
        String formatName = ksqlConfig.getString(str);
        if (formatName == null) {
            return;
        }
        try {
            FormatFactory.fromName(formatName);
        } catch (KsqlException e) {
            throw new KsqlException("Invalid value for config '" + str + "': " + formatName, e);
        }
    }

    /**
     * Creates the executable to run once the preconditions have passed.
     *
     * <p>When a queries file is given a standalone executor is created. Otherwise the REST
     * application is built and, if {@code ksql.connect.worker.config} is non-empty, combined with
     * a Connect executable.
     *
     * <p>NOTE(review): the decompiler widened this method from private to public; the visibility
     * is left as found so that no potential caller breaks.
     *
     * @param supplier provides the server properties
     * @param serverState the shared server state passed to the REST application
     * @param optional the queries file, if the server runs in standalone mode
     * @param str the install directory passed to the standalone executor
     * @param metricCollectors the metric collectors shared with the created executable
     * @param functionRegistry the function registry passed to the REST application
     * @return the executable to run
     * @throws RuntimeException if creating the Connect executable fails with an
     *     {@link IOException}
     */
    public static Executable createExecutable(Supplier<Map<String, String>> supplier, ServerState serverState, Optional<String> optional, String str, MetricCollectors metricCollectors, FunctionRegistry functionRegistry) {
        Map<String, String> properties = supplier.get();
        KsqlConfig ksqlConfig = new KsqlConfig(properties);
        if (optional.isPresent()) {
            return StandaloneExecutorFactory.create(properties, optional.get(), str, metricCollectors);
        }
        KsqlRestApplication restApplication = KsqlRestApplication.buildApplication(
                new KsqlRestConfig(properties), serverState, metricCollectors, functionRegistry, Instant.now());
        String connectWorkerConfig = ksqlConfig.getString("ksql.connect.worker.config");
        if (connectWorkerConfig.isEmpty()) {
            return restApplication;
        }
        try {
            return MultiExecutable.of(ConnectExecutable.of(connectWorkerConfig), restApplication);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Ensures that the given path is an existing (or creatable), writable and executable
     * directory.
     *
     * @param file the Kafka Streams state directory
     * @throws KsqlServerException if the directory cannot be created, the path is not a
     *     directory, or the directory is not writable and executable
     */
    @VisibleForTesting
    static void enforceStreamStateDirAvailability(File file) {
        if (!file.exists() && !file.mkdirs()) {
            throw new KsqlServerException("Could not create the kafka streams state directory: " + file.getPath() + "\n Make sure the directory exists and is writable for KSQL server \n or its parent directory is writable by KSQL server\n or change it to a writable directory by setting 'ksql.streams.state.dir' config in the properties file.");
        }
        if (!file.isDirectory()) {
            throw new KsqlServerException(file.getPath() + " is not a directory.\n Make sure the directory exists and is writable for KSQL server \n or its parent directory is writable by KSQL server\n or change it to a writable directory by setting 'ksql.streams.state.dir' config in the properties file.");
        }
        if (!file.canWrite() || !file.canExecute()) {
            throw new KsqlServerException("The kafka streams state directory is not writable for KSQL server: " + file.getPath() + "\n Make sure the directory exists and is writable for KSQL server \n or change it to a writable directory by setting 'ksql.streams.state.dir' config in the properties file.");
        }
    }
}
