package io.confluent.ksql.rest.server;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.function.UserFunctionLoader;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.RegisterType;
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.properties.PropertyOverrider;
import io.confluent.ksql.rest.util.ProcessingLogServerUtils;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.Version;
import io.confluent.ksql.util.WelcomeMsgUtils;
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/StandaloneExecutor.class */
public class StandaloneExecutor implements Executable {
    private static final Logger log = LoggerFactory.getLogger(StandaloneExecutor.class);
    private final ServiceContext serviceContext;
    private final ProcessingLogConfig processingLogConfig;
    private final KsqlConfig ksqlConfig;
    private final KsqlEngine ksqlEngine;
    private final String queriesFile;
    private final UserFunctionLoader udfLoader;
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final boolean failOnNoQueries;
    private final VersionCheckerAgent versionChecker;
    private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/StandaloneExecutor$StatementExecutor.class */
    public static final class StatementExecutor {
        private static final Map<Class<? extends Statement>, Handler<Statement>> HANDLERS = ImmutableMap.builder().put(SetProperty.class, createHandler((v0, v1) -> {
            v0.handleSetProperty(v1);
        }, SetProperty.class, "SET")).put(UnsetProperty.class, createHandler((v0, v1) -> {
            v0.handleUnsetProperty(v1);
        }, UnsetProperty.class, "UNSET")).put(CreateStream.class, createHandler((v0, v1) -> {
            v0.handleExecutableDdl(v1);
        }, CreateStream.class, "CREATE STREAM")).put(CreateTable.class, createHandler((v0, v1) -> {
            v0.handleExecutableDdl(v1);
        }, CreateTable.class, "CREATE TABLE")).put(RegisterType.class, createHandler((v0, v1) -> {
            v0.handleExecutableDdl(v1);
        }, RegisterType.class, "REGISTER TYPE")).put(CreateStreamAsSelect.class, createHandler((v0, v1) -> {
            v0.handlePersistentQuery(v1);
        }, CreateStreamAsSelect.class, "CREAETE STREAM AS SELECT")).put(CreateTableAsSelect.class, createHandler((v0, v1) -> {
            v0.handlePersistentQuery(v1);
        }, CreateTableAsSelect.class, "CREATE TABLE AS SELECT")).put(InsertInto.class, createHandler((v0, v1) -> {
            v0.handlePersistentQuery(v1);
        }, InsertInto.class, "INSERT INTO")).build();
        private static final String SUPPORTED_STATEMENTS = generateSupportedMessage();
        private final ServiceContext serviceContext;
        private final KsqlExecutionContext executionContext;
        private final Map<String, Object> configOverrides;
        private final KsqlConfig ksqlConfig;
        private final Injector injector;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:io/confluent/ksql/rest/server/StandaloneExecutor$StatementExecutor$Handler.class */
        public interface Handler<T extends Statement> {
            void handle(StatementExecutor statementExecutor, ConfiguredStatement<T> configuredStatement);

            String getName();
        }

        private StatementExecutor(ServiceContext serviceContext, KsqlExecutionContext ksqlExecutionContext, Injector injector, KsqlConfig ksqlConfig) {
            this.configOverrides = new HashMap();
            this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
            this.executionContext = (KsqlExecutionContext) Objects.requireNonNull(ksqlExecutionContext, "executionContext");
            this.injector = (Injector) Objects.requireNonNull(injector, "injector");
            this.ksqlConfig = (KsqlConfig) Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        }

        boolean execute(KsqlParser.ParsedStatement parsedStatement) {
            ConfiguredStatement<Statement> prepare = prepare(parsedStatement);
            throwOnMissingSchema(prepare);
            Handler<Statement> handler = HANDLERS.get(prepare.getStatement().getClass());
            if (handler == null) {
                throw new KsqlStatementException("Unsupported statement. Only the following statements are supporting in standalone mode:" + System.lineSeparator() + SUPPORTED_STATEMENTS, parsedStatement.getStatementText());
            }
            handler.handle(this, prepare);
            return prepare.getStatement() instanceof QueryContainer;
        }

        private ConfiguredStatement<?> prepare(KsqlParser.ParsedStatement parsedStatement) {
            return this.injector.inject(ConfiguredStatement.of(this.executionContext.prepare(parsedStatement), this.configOverrides, this.ksqlConfig));
        }

        private static void throwOnMissingSchema(ConfiguredStatement<?> configuredStatement) {
            if ((configuredStatement.getStatement() instanceof CreateSource) && Iterables.isEmpty(configuredStatement.getStatement().getElements())) {
                throw new KsqlStatementException("statement does not define the schema and the supplied format does not support schema inference", configuredStatement.getStatementText());
            }
        }

        private void handleSetProperty(ConfiguredStatement<SetProperty> configuredStatement) {
            PropertyOverrider.set(configuredStatement, this.configOverrides);
        }

        private void handleUnsetProperty(ConfiguredStatement<UnsetProperty> configuredStatement) {
            PropertyOverrider.unset(configuredStatement, this.configOverrides);
        }

        private void handleExecutableDdl(ConfiguredStatement<?> configuredStatement) {
            this.executionContext.execute(this.serviceContext, configuredStatement);
        }

        private void handlePersistentQuery(ConfiguredStatement<?> configuredStatement) {
            this.executionContext.execute(this.serviceContext, configuredStatement).getQuery().filter(queryMetadata -> {
                return queryMetadata instanceof PersistentQueryMetadata;
            }).orElseThrow(() -> {
                return new KsqlStatementException("Could not build the query", configuredStatement.getStatementText());
            });
        }

        private static String generateSupportedMessage() {
            return (String) HANDLERS.values().stream().map((v0) -> {
                return v0.getName();
            }).sorted().collect(Collectors.joining(System.lineSeparator()));
        }

        private static <T extends Statement> Handler<Statement> createHandler(final BiConsumer<StatementExecutor, ConfiguredStatement<T>> biConsumer, Class<T> cls, final String str) {
            return new Handler<Statement>() { // from class: io.confluent.ksql.rest.server.StandaloneExecutor.StatementExecutor.1
                @Override // io.confluent.ksql.rest.server.StandaloneExecutor.StatementExecutor.Handler
                public void handle(StatementExecutor statementExecutor, ConfiguredStatement<Statement> configuredStatement) {
                    biConsumer.accept(statementExecutor, configuredStatement);
                }

                @Override // io.confluent.ksql.rest.server.StandaloneExecutor.StatementExecutor.Handler
                public String getName() {
                    return str;
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StandaloneExecutor(ServiceContext serviceContext, ProcessingLogConfig processingLogConfig, KsqlConfig ksqlConfig, KsqlEngine ksqlEngine, String str, UserFunctionLoader userFunctionLoader, boolean z, VersionCheckerAgent versionCheckerAgent, BiFunction<KsqlExecutionContext, ServiceContext, Injector> biFunction) {
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
        this.processingLogConfig = (ProcessingLogConfig) Objects.requireNonNull(processingLogConfig, "processingLogConfig");
        this.ksqlConfig = (KsqlConfig) Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        this.ksqlEngine = (KsqlEngine) Objects.requireNonNull(ksqlEngine, "ksqlEngine");
        this.queriesFile = (String) Objects.requireNonNull(str, "queriesFile");
        this.udfLoader = (UserFunctionLoader) Objects.requireNonNull(userFunctionLoader, "udfLoader");
        this.failOnNoQueries = z;
        this.versionChecker = (VersionCheckerAgent) Objects.requireNonNull(versionCheckerAgent, "versionChecker");
        this.injectorFactory = (BiFunction) Objects.requireNonNull(biFunction, "injectorFactory");
    }

    @Override // io.confluent.ksql.rest.server.Executable
    public void startAsync() {
        try {
            this.udfLoader.load();
            ProcessingLogServerUtils.maybeCreateProcessingLogTopic(this.serviceContext.getTopicClient(), this.processingLogConfig, this.ksqlConfig);
            if (this.processingLogConfig.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE).booleanValue()) {
                log.warn("processing log auto-create is enabled, but this is not supported for headless mode.");
            }
            processesQueryFile(readQueriesFile(this.queriesFile));
            showWelcomeMessage();
            Properties properties = new Properties();
            this.ksqlConfig.originals().forEach((str, obj) -> {
                if (Objects.nonNull(obj)) {
                    properties.put(str, obj.toString());
                }
            });
            this.versionChecker.start(KsqlModuleType.SERVER, properties);
        } catch (Exception e) {
            log.error("Failed to start KSQL Server with query file: " + this.queriesFile, e);
            triggerShutdown();
            throw e;
        }
    }

    @Override // io.confluent.ksql.rest.server.Executable
    public void triggerShutdown() {
        try {
            this.ksqlEngine.close();
        } catch (Exception e) {
            log.warn("Failed to cleanly shutdown the KSQL Engine", e);
        }
        try {
            this.serviceContext.close();
        } catch (Exception e2) {
            log.warn("Failed to cleanly shutdown services", e2);
        }
        this.shutdownLatch.countDown();
    }

    @Override // io.confluent.ksql.rest.server.Executable
    public void awaitTerminated() throws InterruptedException {
        this.shutdownLatch.await();
    }

    private void showWelcomeMessage() {
        if (System.console() == null) {
            return;
        }
        PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
        WelcomeMsgUtils.displayWelcomeMessage(80, printWriter);
        printWriter.printf("Server %s started with query file %s. Interactive mode is disabled.%n", Version.getVersion(), this.queriesFile);
        printWriter.flush();
    }

    private void processesQueryFile(String str) {
        List<KsqlParser.ParsedStatement> parse = this.ksqlEngine.parse(str);
        validateStatements(parse);
        executeStatements(parse, new StatementExecutor(this.serviceContext, this.ksqlEngine, this.injectorFactory.apply(this.ksqlEngine, this.serviceContext), this.ksqlConfig));
        this.ksqlEngine.getPersistentQueries().forEach((v0) -> {
            v0.start();
        });
    }

    private void validateStatements(List<KsqlParser.ParsedStatement> list) {
        KsqlExecutionContext createSandbox = this.ksqlEngine.createSandbox(this.serviceContext);
        boolean executeStatements = executeStatements(list, new StatementExecutor(createSandbox.getServiceContext(), createSandbox, this.injectorFactory.apply(createSandbox, createSandbox.getServiceContext()), this.ksqlConfig));
        if (this.failOnNoQueries && !executeStatements) {
            throw new KsqlException("The SQL file does not contain any persistent queries. i.e. it contains no 'INSERT INTO', 'CREATE TABLE x AS SELECT' or 'CREATE STREAM x AS SELECT' style statements.");
        }
    }

    private static boolean executeStatements(List<KsqlParser.ParsedStatement> list, StatementExecutor statementExecutor) {
        boolean z = false;
        Iterator<KsqlParser.ParsedStatement> it = list.iterator();
        while (it.hasNext()) {
            z |= statementExecutor.execute(it.next());
        }
        return z;
    }

    private static String readQueriesFile(String str) {
        try {
            return new String(Files.readAllBytes(Paths.get(str, new String[0])), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new KsqlException(String.format("Could not read the query file: %s. Details: %s", str, e.getMessage()), e);
        }
    }
}
