package io.confluent.ksql.rest.server;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.api.plugin.KsqlServerEndpoints;
import io.confluent.ksql.api.server.ApiServerConfig;
import io.confluent.ksql.api.server.Server;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.execution.streams.RoutingFilter;
import io.confluent.ksql.execution.streams.RoutingFilters;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.function.UserFunctionLoader;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.HeartbeatAgent;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.computation.CommandStore;
import io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor;
import io.confluent.ksql.rest.server.context.KsqlSecurityContextBinder;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter;
import io.confluent.ksql.rest.server.resources.ClusterStatusResource;
import io.confluent.ksql.rest.server.resources.HealthCheckResource;
import io.confluent.ksql.rest.server.resources.HeartbeatResource;
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
import io.confluent.ksql.rest.server.resources.KsqlExceptionMapper;
import io.confluent.ksql.rest.server.resources.KsqlResource;
import io.confluent.ksql.rest.server.resources.LagReportingResource;
import io.confluent.ksql.rest.server.resources.RootDocument;
import io.confluent.ksql.rest.server.resources.ServerInfoResource;
import io.confluent.ksql.rest.server.resources.ServerMetadataResource;
import io.confluent.ksql.rest.server.resources.StatusResource;
import io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource;
import io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory;
import io.confluent.ksql.rest.server.services.ServerInternalKsqlClient;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.server.state.ServerStateDynamicBinding;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.rest.util.KsqlUncaughtExceptionHandler;
import io.confluent.ksql.rest.util.ProcessingLogServerUtils;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.security.KsqlAuthorizationValidatorFactory;
import io.confluent.ksql.security.KsqlDefaultSecurityExtension;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.services.LazyServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.ReservedInternalTopics;
import io.confluent.ksql.util.RetryUtil;
import io.confluent.ksql.util.Version;
import io.confluent.ksql.util.WelcomeMsgUtils;
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
import io.confluent.rest.RestConfig;
import io.confluent.rest.validation.JacksonMessageBodyProvider;
import io.vertx.core.Vertx;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.websocket.DeploymentException;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import javax.ws.rs.core.Configurable;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.LogManager;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.websocket.jsr356.server.ServerContainer;
import org.glassfish.hk2.utilities.Binder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/KsqlRestApplication.class */
public final class KsqlRestApplication extends ExecutableApplication<KsqlRestConfig> {
    private static final Logger log = LoggerFactory.getLogger(KsqlRestApplication.class);
    private static final SourceName COMMANDS_STREAM_NAME = SourceName.of("KSQL_COMMANDS");
    private final KsqlConfig ksqlConfigNoPort;
    private final KsqlRestConfig restConfig;
    private final KsqlEngine ksqlEngine;
    private final CommandRunner commandRunner;
    private final CommandStore commandStore;
    private final RootDocument rootDocument;
    private final StatusResource statusResource;
    private final StreamedQueryResource streamedQueryResource;
    private final KsqlResource ksqlResource;
    private final VersionCheckerAgent versionCheckerAgent;
    private final ServiceContext serviceContext;
    private final BiFunction<KsqlConfig, KsqlSecurityExtension, Binder> serviceContextBinderFactory;
    private final KsqlSecurityExtension securityExtension;
    private final ServerState serverState;
    private final ProcessingLogContext processingLogContext;
    private final List<KsqlServerPrecondition> preconditions;
    private final List<KsqlConfigurable> configurables;
    private final Consumer<KsqlConfig> rocksDBConfigSetterHandler;
    private final Optional<HeartbeatAgent> heartbeatAgent;
    private final Optional<LagReportingAgent> lagReportingAgent;
    private Vertx vertx;
    private Server apiServer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/KsqlRestApplication$KsqlFailedPrecondition.class */
    public static final class KsqlFailedPrecondition extends RuntimeException {
        private KsqlFailedPrecondition(String str) {
            super(str);
        }
    }

    public static SourceName getCommandsStreamName() {
        return COMMANDS_STREAM_NAME;
    }

    @VisibleForTesting
    KsqlRestApplication(ServiceContext serviceContext, KsqlEngine ksqlEngine, KsqlConfig ksqlConfig, KsqlRestConfig ksqlRestConfig, CommandRunner commandRunner, CommandStore commandStore, RootDocument rootDocument, StatusResource statusResource, StreamedQueryResource streamedQueryResource, KsqlResource ksqlResource, VersionCheckerAgent versionCheckerAgent, BiFunction<KsqlConfig, KsqlSecurityExtension, Binder> biFunction, KsqlSecurityExtension ksqlSecurityExtension, ServerState serverState, ProcessingLogContext processingLogContext, List<KsqlServerPrecondition> list, List<KsqlConfigurable> list2, Consumer<KsqlConfig> consumer, Optional<HeartbeatAgent> optional, Optional<LagReportingAgent> optional2) {
        super(ksqlRestConfig);
        this.vertx = null;
        this.apiServer = null;
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
        this.ksqlConfigNoPort = (KsqlConfig) Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        this.restConfig = (KsqlRestConfig) Objects.requireNonNull(ksqlRestConfig, "restConfig");
        this.ksqlEngine = (KsqlEngine) Objects.requireNonNull(ksqlEngine, "ksqlEngine");
        this.commandRunner = (CommandRunner) Objects.requireNonNull(commandRunner, "commandRunner");
        this.rootDocument = (RootDocument) Objects.requireNonNull(rootDocument, "rootDocument");
        this.statusResource = (StatusResource) Objects.requireNonNull(statusResource, "statusResource");
        this.streamedQueryResource = (StreamedQueryResource) Objects.requireNonNull(streamedQueryResource, "streamedQueryResource");
        this.ksqlResource = (KsqlResource) Objects.requireNonNull(ksqlResource, "ksqlResource");
        this.commandStore = (CommandStore) Objects.requireNonNull(commandStore, "commandStore");
        this.serverState = (ServerState) Objects.requireNonNull(serverState, "serverState");
        this.processingLogContext = (ProcessingLogContext) Objects.requireNonNull(processingLogContext, "processingLogContext");
        this.preconditions = (List) Objects.requireNonNull(list, "preconditions");
        this.versionCheckerAgent = (VersionCheckerAgent) Objects.requireNonNull(versionCheckerAgent, "versionCheckerAgent");
        this.serviceContextBinderFactory = (BiFunction) Objects.requireNonNull(biFunction, "serviceContextBinderFactory");
        this.securityExtension = (KsqlSecurityExtension) Objects.requireNonNull(ksqlSecurityExtension, "securityExtension");
        this.configurables = (List) Objects.requireNonNull(list2, "configurables");
        this.rocksDBConfigSetterHandler = (Consumer) Objects.requireNonNull(consumer, "rocksDBConfigSetterHandler");
        this.heartbeatAgent = (Optional) Objects.requireNonNull(optional, "heartbeatAgent");
        this.lagReportingAgent = (Optional) Objects.requireNonNull(optional2, "lagReportingAgent");
    }

    public void setupResources(Configurable<?> configurable, KsqlRestConfig ksqlRestConfig) {
        configurable.register(this.rootDocument);
        configurable.register(new ServerInfoResource(this.serviceContext, this.ksqlConfigNoPort));
        configurable.register(ServerMetadataResource.create(this.serviceContext, this.ksqlConfigNoPort));
        configurable.register(this.statusResource);
        configurable.register(this.ksqlResource);
        configurable.register(this.streamedQueryResource);
        configurable.register(HealthCheckResource.create(this.ksqlResource, this.serviceContext, (KsqlRestConfig) this.config));
        if (this.heartbeatAgent.isPresent()) {
            configurable.register(new HeartbeatResource(this.heartbeatAgent.get()));
            configurable.register(new ClusterStatusResource(this.ksqlEngine, this.heartbeatAgent.get(), this.lagReportingAgent));
        }
        if (this.lagReportingAgent.isPresent()) {
            configurable.register(new LagReportingResource(this.lagReportingAgent.get()));
        }
        configurable.register(new KsqlExceptionMapper());
        configurable.register(new ServerStateDynamicBinding(this.serverState));
    }

    @Override // io.confluent.ksql.rest.server.Executable
    public void startAsync() {
        log.info("KSQL RESTful API listening on {}", StringUtils.join(getListeners(), ", "));
        KsqlConfig buildConfigWithPort = buildConfigWithPort();
        this.configurables.forEach(ksqlConfigurable -> {
            ksqlConfigurable.configure(buildConfigWithPort);
        });
        startKsql(buildConfigWithPort);
        Properties properties = new Properties();
        properties.putAll(((KsqlRestConfig) getConfiguration()).getOriginals());
        if (this.versionCheckerAgent != null) {
            this.versionCheckerAgent.start(KsqlModuleType.SERVER, properties);
        }
        if (buildConfigWithPort.getBoolean("ksql.new.api.enabled").booleanValue()) {
            startApiServer(buildConfigWithPort);
        }
        displayWelcomeMessage();
    }

    @VisibleForTesting
    void startKsql(KsqlConfig ksqlConfig) {
        waitForPreconditions();
        initialize(ksqlConfig);
    }

    void startApiServer(KsqlConfig ksqlConfig) {
        this.vertx = Vertx.vertx();
        KsqlServerEndpoints ksqlServerEndpoints = new KsqlServerEndpoints(this.ksqlEngine, ksqlConfig, this.securityExtension, RestServiceContextFactory::create);
        this.apiServer = new Server(this.vertx, new ApiServerConfig(ksqlConfig.originals()), ksqlServerEndpoints);
        this.apiServer.start();
        log.info("KSQL New API Server started");
    }

    @VisibleForTesting
    KsqlEngine getEngine() {
        return this.ksqlEngine;
    }

    private void checkPreconditions() {
        Iterator<KsqlServerPrecondition> it = this.preconditions.iterator();
        while (it.hasNext()) {
            Optional<KsqlErrorMessage> checkPrecondition = it.next().checkPrecondition((KsqlRestConfig) this.config, this.serviceContext);
            if (checkPrecondition.isPresent()) {
                this.serverState.setInitializingReason(checkPrecondition.get());
                throw new KsqlFailedPrecondition(checkPrecondition.get().toString());
            }
        }
    }

    private void waitForPreconditions() {
        RetryUtil.retryWithBackoff(Integer.MAX_VALUE, 1000, 30000, this::checkPreconditions, ImmutableList.of(exc -> {
            return !(exc instanceof KsqlFailedPrecondition);
        }));
    }

    private void initialize(KsqlConfig ksqlConfig) {
        this.rocksDBConfigSetterHandler.accept(this.ksqlConfigNoPort);
        registerCommandTopic();
        this.commandStore.start();
        ProcessingLogServerUtils.maybeCreateProcessingLogTopic(this.serviceContext.getTopicClient(), this.processingLogContext.getConfig(), this.ksqlConfigNoPort);
        this.commandRunner.processPriorCommands();
        this.commandRunner.start();
        maybeCreateProcessingLogStream(this.processingLogContext.getConfig(), this.ksqlConfigNoPort, this.restConfig, this.ksqlResource, this.serviceContext);
        if (this.heartbeatAgent.isPresent()) {
            this.heartbeatAgent.get().setLocalAddress((String) ksqlConfig.getKsqlStreamConfigProps().get("application.server"));
            this.heartbeatAgent.get().startAgent();
        }
        if (this.lagReportingAgent.isPresent()) {
            this.lagReportingAgent.get().setLocalAddress((String) ksqlConfig.getKsqlStreamConfigProps().get("application.server"));
            this.lagReportingAgent.get().startAgent();
        }
        this.serverState.setReady();
    }

    @Override // io.confluent.ksql.rest.server.Executable
    public void triggerShutdown() {
        try {
            this.ksqlEngine.close();
        } catch (Exception e) {
            log.error("Exception while waiting for Ksql Engine to close", e);
        }
        try {
            this.commandRunner.close();
        } catch (Exception e2) {
            log.error("Exception while waiting for CommandRunner thread to complete", e2);
        }
        try {
            this.serviceContext.close();
        } catch (Exception e3) {
            log.error("Exception while closing services", e3);
        }
        try {
            this.securityExtension.close();
        } catch (Exception e4) {
            log.error("Exception while closing security extension", e4);
        }
        if (this.apiServer != null) {
            this.apiServer.stop();
            this.apiServer = null;
        }
        if (this.vertx != null) {
            this.vertx.close();
            this.vertx = null;
        }
        shutdownAdditionalAgents();
    }

    private void shutdownAdditionalAgents() {
        if (this.heartbeatAgent.isPresent()) {
            try {
                this.heartbeatAgent.get().stopAgent();
            } catch (Exception e) {
                log.error("Exception while shutting down HeartbeatAgent", e);
            }
        }
        if (this.lagReportingAgent.isPresent()) {
            try {
                this.lagReportingAgent.get().stopAgent();
            } catch (Exception e2) {
                log.error("Exception while shutting down LagReportingAgent", e2);
            }
        }
    }

    public void onShutdown() {
        triggerShutdown();
    }

    List<URL> getListeners() {
        Function function = url -> {
            Stream filter = Arrays.stream(this.server.getConnectors()).filter(connector -> {
                return connector instanceof ServerConnector;
            });
            Class<ServerConnector> cls = ServerConnector.class;
            ServerConnector.class.getClass();
            return (Set) filter.map((v1) -> {
                return r1.cast(v1);
            }).filter(serverConnector -> {
                return (serverConnector.getProtocols().stream().map((v0) -> {
                    return v0.toLowerCase();
                }).anyMatch(str -> {
                    return str.equals("ssl");
                }) ? "https" : "http").equalsIgnoreCase(url.getProtocol());
            }).map((v0) -> {
                return v0.getLocalPort();
            }).collect(Collectors.toSet());
        };
        return (List) this.restConfig.getList("listeners").stream().flatMap(str -> {
            try {
                URL url2 = new URL(str);
                return url2.getPort() != 0 ? Stream.of(url2) : ((Set) function.apply(url2)).stream().map(num -> {
                    try {
                        return new URL(url2.getProtocol(), url2.getHost(), num.intValue(), url2.getFile());
                    } catch (MalformedURLException e) {
                        throw new KsqlServerException("Malformed URL specified in 'listeners' config: " + str, e);
                    }
                });
            } catch (MalformedURLException e) {
                throw new KsqlServerException("Malformed URL specified in 'listeners' config: " + str, e);
            }
        }).distinct().collect(Collectors.toList());
    }

    public void configureBaseApplication(Configurable<?> configurable, Map<String, String> map) {
        configurable.register(new JacksonMessageBodyProvider(JsonMapper.INSTANCE.mapper));
        configurable.register(JsonParseExceptionMapper.class);
        configurable.register(this.serviceContextBinderFactory.apply(this.ksqlConfigNoPort, this.securityExtension));
        configurable.property("jersey.config.server.contentLength.buffer", 0);
        configurable.property("jersey.config.server.wadl.disableWadl", true);
        this.securityExtension.getAuthorizationProvider().ifPresent(ksqlAuthorizationProvider -> {
            configurable.register(new KsqlAuthorizationFilter(ksqlAuthorizationProvider));
        });
    }

    protected void registerWebSocketEndpoints(ServerContainer serverContainer) {
        try {
            final ListeningScheduledExecutorService listeningDecorator = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(((KsqlRestConfig) this.config).getInt("ksql.server.websockets.num.threads").intValue(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("websockets-query-thread-%d").build()));
            final StatementParser statementParser = new StatementParser(this.ksqlEngine);
            final Optional create = KsqlAuthorizationValidatorFactory.create(this.ksqlConfigNoPort, this.serviceContext);
            final Errors errors = new Errors((ErrorMessages) this.restConfig.getConfiguredInstance("ksql.server.error.messages", ErrorMessages.class));
            final PullQueryExecutor pullQueryExecutor = new PullQueryExecutor(this.ksqlEngine, this.heartbeatAgent, initializeRoutingFilterFactory(this.ksqlConfigNoPort, this.heartbeatAgent, this.lagReportingAgent));
            serverContainer.addEndpoint(ServerEndpointConfig.Builder.create(WSQueryEndpoint.class, WSQueryEndpoint.class.getAnnotation(ServerEndpoint.class).value()).configurator(new ServerEndpointConfig.Configurator() { // from class: io.confluent.ksql.rest.server.KsqlRestApplication.1
                public <T> T getEndpointInstance(Class<T> cls) {
                    KsqlConfig buildConfigWithPort = KsqlRestApplication.this.buildConfigWithPort();
                    ObjectMapper objectMapper = JsonMapper.INSTANCE.mapper;
                    StatementParser statementParser2 = statementParser;
                    KsqlEngine ksqlEngine = KsqlRestApplication.this.ksqlEngine;
                    CommandStore commandStore = KsqlRestApplication.this.commandStore;
                    ListeningScheduledExecutorService listeningScheduledExecutorService = listeningDecorator;
                    VersionCheckerAgent versionCheckerAgent = KsqlRestApplication.this.versionCheckerAgent;
                    versionCheckerAgent.getClass();
                    return (T) new WSQueryEndpoint(buildConfigWithPort, objectMapper, statementParser2, ksqlEngine, commandStore, listeningScheduledExecutorService, versionCheckerAgent::updateLastRequestTime, Duration.ofMillis(((KsqlRestConfig) KsqlRestApplication.this.config).getLong("ksql.server.command.response.timeout.ms").longValue()), create, errors, KsqlRestApplication.this.securityExtension, KsqlRestApplication.this.serverState, KsqlRestApplication.this.serviceContext.getSchemaRegistryClientFactory(), pullQueryExecutor);
                }
            }).build());
        } catch (DeploymentException e) {
            log.error("Unable to create websockets endpoint", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static KsqlRestApplication buildApplication(KsqlRestConfig ksqlRestConfig, Function<Supplier<Boolean>, VersionCheckerAgent> function) {
        KsqlConfig ksqlConfig = new KsqlConfig(ksqlRestConfig.getKsqlConfigProperties());
        KsqlSchemaRegistryClientFactory ksqlSchemaRegistryClientFactory = new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap());
        ksqlSchemaRegistryClientFactory.getClass();
        Supplier supplier = ksqlSchemaRegistryClientFactory::get;
        return buildApplication("", ksqlRestConfig, function, Integer.MAX_VALUE, new LazyServiceContext(() -> {
            return RestServiceContextFactory.create(ksqlConfig, Optional.empty(), supplier);
        }), (ksqlConfig2, ksqlSecurityExtension) -> {
            return new KsqlSecurityContextBinder(ksqlConfig2, ksqlSecurityExtension, supplier);
        });
    }

    static KsqlRestApplication buildApplication(String str, KsqlRestConfig ksqlRestConfig, Function<Supplier<Boolean>, VersionCheckerAgent> function, int i, ServiceContext serviceContext, BiFunction<KsqlConfig, KsqlSecurityExtension, Binder> biFunction) {
        String string = ksqlRestConfig.getString(KsqlRestConfig.INSTALL_DIR_CONFIG);
        KsqlConfig ksqlConfig = new KsqlConfig(ksqlRestConfig.getKsqlConfigProperties());
        MetricCollectors.addConfigurableReporter(ksqlConfig);
        ProcessingLogConfig processingLogConfig = new ProcessingLogConfig(ksqlRestConfig.getOriginals());
        ProcessingLogContext create = ProcessingLogContext.create(processingLogConfig);
        InternalFunctionRegistry internalFunctionRegistry = new InternalFunctionRegistry();
        if (ksqlRestConfig.getBoolean("ksql.server.exception.uncaught.handler.enable").booleanValue()) {
            Thread.setDefaultUncaughtExceptionHandler(new KsqlUncaughtExceptionHandler(LogManager::shutdown));
        }
        SpecificQueryIdGenerator specificQueryIdGenerator = new SpecificQueryIdGenerator();
        KsqlEngine ksqlEngine = new KsqlEngine(serviceContext, create, internalFunctionRegistry, ServiceInfo.create(ksqlConfig, str), specificQueryIdGenerator);
        UserFunctionLoader.newInstance(ksqlConfig, internalFunctionRegistry, string).load();
        String commandTopic = ReservedInternalTopics.commandTopic(ksqlConfig);
        CommandStore create2 = CommandStore.Factory.create(commandTopic, ksqlConfig.getString("ksql.service.id"), Duration.ofMillis(ksqlRestConfig.getLong("ksql.server.command.response.timeout.ms").longValue()), ksqlRestConfig.getCommandConsumerProperties(), ksqlRestConfig.getCommandProducerProperties());
        InteractiveStatementExecutor interactiveStatementExecutor = new InteractiveStatementExecutor(serviceContext, ksqlEngine, specificQueryIdGenerator);
        RootDocument rootDocument = new RootDocument();
        StatusResource statusResource = new StatusResource(interactiveStatementExecutor);
        ksqlEngine.getClass();
        VersionCheckerAgent apply = function.apply(ksqlEngine::hasActiveQueries);
        ServerState serverState = new ServerState();
        KsqlSecurityExtension loadSecurityExtension = loadSecurityExtension(ksqlConfig);
        Optional create3 = KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext);
        Errors errors = new Errors((ErrorMessages) ksqlRestConfig.getConfiguredInstance("ksql.server.error.messages", ErrorMessages.class));
        Optional<LagReportingAgent> initializeLagReportingAgent = initializeLagReportingAgent(ksqlRestConfig, ksqlEngine, serviceContext);
        Optional<HeartbeatAgent> initializeHeartbeatAgent = initializeHeartbeatAgent(ksqlRestConfig, ksqlEngine, serviceContext, initializeLagReportingAgent);
        PullQueryExecutor pullQueryExecutor = new PullQueryExecutor(ksqlEngine, initializeHeartbeatAgent, initializeRoutingFilterFactory(ksqlConfig, initializeHeartbeatAgent, initializeLagReportingAgent));
        Duration ofMillis = Duration.ofMillis(ksqlRestConfig.getLong("query.stream.disconnect.check").longValue());
        Duration ofMillis2 = Duration.ofMillis(ksqlRestConfig.getLong("ksql.server.command.response.timeout.ms").longValue());
        apply.getClass();
        StreamedQueryResource streamedQueryResource = new StreamedQueryResource(ksqlEngine, create2, ofMillis, ofMillis2, apply::updateLastRequestTime, create3, errors, pullQueryExecutor);
        Duration ofMillis3 = Duration.ofMillis(ksqlRestConfig.getLong("ksql.server.command.response.timeout.ms").longValue());
        apply.getClass();
        KsqlResource ksqlResource = new KsqlResource(ksqlEngine, create2, ofMillis3, apply::updateLastRequestTime, create3, errors);
        LinkedList linkedList = new LinkedList();
        linkedList.add(commandTopic);
        if (processingLogConfig.getBoolean(ProcessingLogConfig.TOPIC_AUTO_CREATE).booleanValue()) {
            linkedList.add(ProcessingLogServerUtils.getTopicName(processingLogConfig, ksqlConfig));
        }
        return new KsqlRestApplication(serviceContext, ksqlEngine, ksqlConfig, injectPathsWithoutAuthentication(ksqlRestConfig), new CommandRunner(interactiveStatementExecutor, create2, i, new ClusterTerminator(ksqlEngine, serviceContext, linkedList), serverState, ksqlConfig.getString("ksql.service.id"), Duration.ofMillis(ksqlRestConfig.getLong("ksql.server.command.blocked.threshold.error.ms").longValue()), str), create2, rootDocument, statusResource, streamedQueryResource, ksqlResource, apply, biFunction, loadSecurityExtension, serverState, create, ksqlRestConfig.getConfiguredInstances("ksql.server.preconditions", KsqlServerPrecondition.class), ImmutableList.of(ksqlResource, streamedQueryResource, interactiveStatementExecutor), RocksDBConfigSetterHandler::maybeConfigureRocksDBConfigSetter, initializeHeartbeatAgent, initializeLagReportingAgent);
    }

    private static Optional<HeartbeatAgent> initializeHeartbeatAgent(KsqlRestConfig ksqlRestConfig, KsqlEngine ksqlEngine, ServiceContext serviceContext, Optional<LagReportingAgent> optional) {
        if (!ksqlRestConfig.getBoolean(KsqlRestConfig.KSQL_HEARTBEAT_ENABLE_CONFIG).booleanValue()) {
            return Optional.empty();
        }
        HeartbeatAgent.Builder builder = HeartbeatAgent.builder();
        builder.heartbeatSendInterval(ksqlRestConfig.getLong(KsqlRestConfig.KSQL_HEARTBEAT_SEND_INTERVAL_MS_CONFIG).longValue()).heartbeatCheckInterval(ksqlRestConfig.getLong(KsqlRestConfig.KSQL_HEARTBEAT_CHECK_INTERVAL_MS_CONFIG).longValue()).heartbeatMissedThreshold(ksqlRestConfig.getLong(KsqlRestConfig.KSQL_HEARTBEAT_MISSED_THRESHOLD_CONFIG).longValue()).heartbeatWindow(ksqlRestConfig.getLong(KsqlRestConfig.KSQL_HEARTBEAT_WINDOW_MS_CONFIG).longValue()).discoverClusterInterval(ksqlRestConfig.getLong(KsqlRestConfig.KSQL_HEARTBEAT_DISCOVER_CLUSTER_MS_CONFIG).longValue()).threadPoolSize(ksqlRestConfig.getInt(KsqlRestConfig.KSQL_HEARTBEAT_THREAD_POOL_SIZE_CONFIG).intValue());
        if (optional.isPresent()) {
            builder.addHostStatusListener(optional.get());
        }
        return Optional.of(builder.build(ksqlEngine, serviceContext));
    }

    private static Optional<LagReportingAgent> initializeLagReportingAgent(KsqlRestConfig ksqlRestConfig, KsqlEngine ksqlEngine, ServiceContext serviceContext) {
        return (ksqlRestConfig.getBoolean(KsqlRestConfig.KSQL_LAG_REPORTING_ENABLE_CONFIG).booleanValue() && ksqlRestConfig.getBoolean(KsqlRestConfig.KSQL_HEARTBEAT_ENABLE_CONFIG).booleanValue()) ? Optional.of(LagReportingAgent.builder().lagSendIntervalMs(ksqlRestConfig.getLong(KsqlRestConfig.KSQL_LAG_REPORTING_SEND_INTERVAL_MS_CONFIG).longValue()).build(ksqlEngine, serviceContext)) : Optional.empty();
    }

    private static RoutingFilter.RoutingFilterFactory initializeRoutingFilterFactory(KsqlConfig ksqlConfig, Optional<HeartbeatAgent> optional, Optional<LagReportingAgent> optional2) {
        return (routingOptions, list, hostInfo, str, str2, i) -> {
            ImmutableList.Builder builder = ImmutableList.builder();
            if (!ksqlConfig.getBoolean("ksql.query.pull.enable.standby.reads").booleanValue()) {
                builder.add(new ActiveHostFilter(hostInfo));
            }
            builder.add(new LivenessFilter(optional));
            Optional<MaximumLagFilter> create = MaximumLagFilter.create(optional2, routingOptions, list, str, str2, i);
            builder.getClass();
            create.map((v1) -> {
                return r1.add(v1);
            });
            return new RoutingFilters(builder.build());
        };
    }

    private void registerCommandTopic() {
        String commandTopicName = this.commandStore.getCommandTopicName();
        KsqlInternalTopicUtils.ensureTopic(commandTopicName, this.ksqlConfigNoPort, this.serviceContext.getTopicClient());
        this.ksqlEngine.execute(this.serviceContext, ConfiguredStatement.of(this.ksqlEngine.prepare((KsqlParser.ParsedStatement) this.ksqlEngine.parse("CREATE STREAM " + COMMANDS_STREAM_NAME + " (STATEMENT STRING) WITH(VALUE_FORMAT='JSON', KAFKA_TOPIC='" + commandTopicName + "');").get(0)), ImmutableMap.of(), this.ksqlConfigNoPort));
    }

    private static KsqlSecurityExtension loadSecurityExtension(KsqlConfig ksqlConfig) {
        KsqlSecurityExtension ksqlSecurityExtension = (KsqlSecurityExtension) Optional.ofNullable(ksqlConfig.getConfiguredInstance("ksql.security.extension.class", KsqlSecurityExtension.class)).orElse(new KsqlDefaultSecurityExtension());
        ksqlSecurityExtension.initialize(ksqlConfig);
        return ksqlSecurityExtension;
    }

    private void displayWelcomeMessage() {
        if (System.console() == null) {
            return;
        }
        PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
        WelcomeMsgUtils.displayWelcomeMessage(80, printWriter);
        String version = Version.getVersion();
        List<URL> listeners = getListeners();
        printWriter.printf("Server %s listening on %s%n", version, (String) listeners.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining(", ")));
        printWriter.println();
        printWriter.println("To access the KSQL CLI, run:");
        printWriter.println("ksql " + listeners.get(0));
        printWriter.println();
        printWriter.flush();
    }

    private static void maybeCreateProcessingLogStream(ProcessingLogConfig processingLogConfig, KsqlConfig ksqlConfig, KsqlRestConfig ksqlRestConfig, KsqlResource ksqlResource, ServiceContext serviceContext) {
        if (processingLogConfig.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE).booleanValue()) {
            try {
                if (new ServerInternalKsqlClient(ksqlResource, new KsqlSecurityContext(Optional.empty(), serviceContext)).makeKsqlRequest(ServerUtil.getServerAddress(ksqlRestConfig), ProcessingLogServerUtils.processingLogStreamCreateStatement(processingLogConfig, ksqlConfig)).isSuccessful()) {
                    log.info("Successfully created processing log stream.");
                }
            } catch (Exception e) {
                log.error("Error while sending processing log CreateStream request to KsqlResource: ", e);
            }
        }
    }

    @VisibleForTesting
    KsqlConfig buildConfigWithPort() {
        Map originals = this.ksqlConfigNoPort.originals();
        originals.put("ksql.streams.application.server", this.restConfig.getInterNodeListener(this::resolvePort).toString());
        return new KsqlConfig(originals);
    }

    private int resolvePort(URL url) {
        return ((Integer) getListeners().stream().filter(url2 -> {
            return url2.getProtocol().equals(url.getProtocol()) && url2.getHost().equals(url.getHost());
        }).map((v0) -> {
            return v0.getPort();
        }).findFirst().orElseThrow(() -> {
            return new IllegalStateException("Failed resolve port for listener: " + url);
        })).intValue();
    }

    private static KsqlRestConfig injectPathsWithoutAuthentication(KsqlRestConfig ksqlRestConfig) {
        HashSet hashSet = new HashSet(ksqlRestConfig.getList("authentication.skip.paths"));
        hashSet.addAll(KsqlAuthorizationFilter.getPathsWithoutAuthorization());
        Map<String, Object> originals = ksqlRestConfig.getOriginals();
        originals.put("authentication.skip.paths", Joiner.on(",").join(hashSet));
        return new KsqlRestConfig(originals);
    }

    public /* bridge */ /* synthetic */ void setupResources(Configurable configurable, RestConfig restConfig) {
        setupResources((Configurable<?>) configurable, (KsqlRestConfig) restConfig);
    }
}
