package io.confluent.ksql.rest.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.api.auth.AuthenticationPlugin;
import io.confluent.ksql.api.impl.DefaultKsqlSecurityContextProvider;
import io.confluent.ksql.api.impl.KsqlSecurityContextProvider;
import io.confluent.ksql.api.impl.MonitoredEndpoints;
import io.confluent.ksql.api.server.Server;
import io.confluent.ksql.api.server.SlidingWindowRateLimiter;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.execution.pull.HARouting;
import io.confluent.ksql.execution.scalablepush.PushRouting;
import io.confluent.ksql.execution.streams.RoutingFilter;
import io.confluent.ksql.execution.streams.RoutingFilters;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.JmxDataPointsReporter;
import io.confluent.ksql.internal.LeakedResourcesMetrics;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.internal.StorageUtilizationMetricsReporter;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogServerUtils;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.properties.DenyListPropertyValidator;
import io.confluent.ksql.properties.PropertiesUtil;
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.healthcheck.HealthCheckAgent;
import io.confluent.ksql.rest.server.HeartbeatAgent;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.computation.CommandStore;
import io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.query.QueryExecutor;
import io.confluent.ksql.rest.server.resources.ClusterStatusResource;
import io.confluent.ksql.rest.server.resources.HealthCheckResource;
import io.confluent.ksql.rest.server.resources.HeartbeatResource;
import io.confluent.ksql.rest.server.resources.KsqlResource;
import io.confluent.ksql.rest.server.resources.LagReportingResource;
import io.confluent.ksql.rest.server.resources.ServerInfoResource;
import io.confluent.ksql.rest.server.resources.ServerMetadataResource;
import io.confluent.ksql.rest.server.resources.StatusResource;
import io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource;
import io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint;
import io.confluent.ksql.rest.server.services.InternalKsqlClientFactory;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory;
import io.confluent.ksql.rest.server.services.ServerInternalKsqlClient;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.CommandTopicBackupUtil;
import io.confluent.ksql.rest.util.ConcurrencyLimiter;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.rest.util.KsqlUncaughtExceptionHandler;
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
import io.confluent.ksql.rest.util.RateLimiter;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.security.KsqlAuthorizationValidatorFactory;
import io.confluent.ksql.security.KsqlDefaultSecurityExtension;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.services.ConnectClientFactory;
import io.confluent.ksql.services.DefaultConnectClientFactory;
import io.confluent.ksql.services.KafkaClusterUtil;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.KafkaTopicClientImpl;
import io.confluent.ksql.services.LazyServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConfigurable;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.ReservedInternalTopics;
import io.confluent.ksql.util.RetryUtil;
import io.confluent.ksql.util.WelcomeMsgUtils;
import io.confluent.ksql.utilization.PersistentQuerySaturationMetrics;
import io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent;
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.dropwizard.DropwizardMetricsOptions;
import io.vertx.ext.dropwizard.Match;
import java.io.File;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/rest/server/KsqlRestApplication.class */
public final class KsqlRestApplication implements Executable {
    /** Number of milliseconds in one hour (60 * 60 * 1000). */
    private static final int NUM_MILLISECONDS_IN_HOUR = 3600000;

    // --- Collaborators handed to the constructor (each is null-checked there) ---

    // Configuration used throughout this class; startAsync() derives a separate
    // configuration via buildConfigWithPort() once the API server has started.
    private final KsqlConfig ksqlConfigNoPort;
    private final KsqlRestConfig restConfig;
    private final KsqlEngine ksqlEngine;
    private final CommandRunner commandRunner;
    private final CommandStore commandStore;
    private final StatusResource statusResource;
    private final StreamedQueryResource streamedQueryResource;
    private final KsqlResource ksqlResource;
    private final VersionCheckerAgent versionCheckerAgent;
    private final ServiceContext serviceContext;
    private final KsqlSecurityContextProvider ksqlSecurityContextProvider;
    private final KsqlSecurityExtension securityExtension;
    private final Optional<AuthenticationPlugin> authenticationPlugin;
    private final ServerState serverState;
    private final ProcessingLogContext processingLogContext;
    // Components that receive the port-aware configuration during startAsync().
    private final List<KsqlConfigurable> configurables;
    private final Consumer<KsqlConfig> rocksDBConfigSetterHandler;
    private final Optional<HeartbeatAgent> heartbeatAgent;
    private final Optional<LagReportingAgent> lagReportingAgent;

    // --- Resources created inside the constructor from the collaborators above ---

    private final ServerInfoResource serverInfoResource;
    // Present only when a heartbeat agent was supplied.
    private final Optional<HeartbeatResource> heartbeatResource;
    // Present only when a heartbeat agent was supplied.
    private final Optional<ClusterStatusResource> clusterStatusResource;
    // Present only when a lag reporting agent was supplied.
    private final Optional<LagReportingResource> lagReportingResource;
    private final HealthCheckResource healthCheckResource;
    private final QueryExecutor queryExecutor;

    // --- State assigned in startAsync() rather than in the constructor ---

    private volatile ServerMetadataResource serverMetadataResource;
    private volatile WSQueryEndpoint wsQueryEndpoint;
    // Daemon thread pool created in startAsync(); shut down in shutdown() when non-null.
    private volatile ListeningScheduledExecutorService oldApiWebsocketExecutor;

    private final Vertx vertx;
    private final DenyListPropertyValidator denyListPropertyValidator;
    private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
    private final Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics;
    private final Optional<LocalCommands> localCommands;
    private KafkaTopicClient internalTopicClient;
    // Start instant supplied by the caller; initialize() measures start-up latency from it.
    private final Instant ksqlRestAppStartTime;
    private final KsqlRestApplicationMetrics restApplicationMetrics;
    private static final Logger log = LogManager.getLogger(KsqlRestApplication.class);
    // Source name exposed through getCommandsStreamName().
    private static final SourceName COMMANDS_STREAM_NAME = SourceName.of("KSQL_COMMANDS");
    // Null until startAsync() creates the server; reset to null by shutdown().
    private Server apiServer = null;
    // Completed by notifyTerminated(); awaitTerminated() blocks on it.
    private final CompletableFuture<Void> terminatedFuture = new CompletableFuture<>();
    // Holds the thread currently executing startAsync() so notifyTerminated() can
    // interrupt it; cleared (set to null) when startAsync() finishes.
    private AtomicReference<Thread> startAsyncThreadRef = new AtomicReference<>(null);

    /* loaded from: input_file:io/confluent/ksql/rest/server/KsqlRestApplication$AbortApplicationStartException.class */
    /**
     * Signals that application start-up should be abandoned.
     *
     * <p>{@code startAsync()} catches this exception and logs it instead of
     * propagating it to the caller.
     */
    static final class AbortApplicationStartException extends KsqlServerException {

        /**
         * @param message description of why start-up is being aborted
         */
        private AbortApplicationStartException(String message) {
            super(message);
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/KsqlRestApplication$KsqlFailedPrecondition.class */
    /**
     * Unchecked exception that carries only a message.
     *
     * <p>NOTE(review): presumably raised when a start-up precondition is not
     * met - confirm against the throw sites.
     */
    private static final class KsqlFailedPrecondition extends RuntimeException {

        /**
         * @param message description of the failure
         */
        private KsqlFailedPrecondition(String message) {
            super(message);
        }
    }

    /**
     * Returns the source name of the commands stream.
     *
     * @return the constant {@code COMMANDS_STREAM_NAME}
     */
    public static SourceName getCommandsStreamName() {
        return KsqlRestApplication.COMMANDS_STREAM_NAME;
    }

    /**
     * Wires up the REST application from pre-built collaborators.
     *
     * <p>Collaborators are stored after a null check, and the resources that
     * depend on them (server info, heartbeat, cluster status, lag reporting and
     * health check) are created here. Nothing is started; see
     * {@code startAsync()}.
     *
     * @param serviceContext service context shared by the resources created here
     * @param ksqlEngine the engine
     * @param ksqlConfig stored as {@code ksqlConfigNoPort}
     * @param ksqlRestConfig REST server configuration
     * @param commandRunner command runner
     * @param commandStore command store
     * @param statusResource status resource
     * @param streamedQueryResource streamed query resource
     * @param ksqlResource KSQL resource
     * @param versionCheckerAgent version checker agent
     * @param ksqlSecurityContextProvider security context provider
     * @param ksqlSecurityExtension security extension
     * @param optional optional authentication plugin
     * @param serverState server state
     * @param processingLogContext processing log context
     * @param list components to configure during {@code startAsync()}
     * @param consumer RocksDB config setter handler
     * @param optional2 optional heartbeat agent; when present, heartbeat and
     *     cluster status resources are created
     * @param optional3 optional lag reporting agent; when present, a lag
     *     reporting resource is created
     * @param vertx Vert.x instance
     * @param denyListPropertyValidator property deny-list validator
     * @param optional4 optional pull query metrics
     * @param optional5 optional scalable push query metrics
     * @param optional6 optional local commands
     * @param queryExecutor query executor
     * @param metricCollectors receives the configurable reporter registration
     *     and supplies the metrics registry for the application metrics
     * @param kafkaTopicClient topic client stored as {@code internalTopicClient}
     * @param admin admin client handed to the health check resource
     * @param instant application start time, used to record start-up latency
     * @throws NullPointerException if a null-checked argument is {@code null}
     */
    @VisibleForTesting
    KsqlRestApplication(
            ServiceContext serviceContext,
            KsqlEngine ksqlEngine,
            KsqlConfig ksqlConfig,
            KsqlRestConfig ksqlRestConfig,
            CommandRunner commandRunner,
            CommandStore commandStore,
            StatusResource statusResource,
            StreamedQueryResource streamedQueryResource,
            KsqlResource ksqlResource,
            VersionCheckerAgent versionCheckerAgent,
            KsqlSecurityContextProvider ksqlSecurityContextProvider,
            KsqlSecurityExtension ksqlSecurityExtension,
            Optional<AuthenticationPlugin> optional,
            ServerState serverState,
            ProcessingLogContext processingLogContext,
            List<KsqlConfigurable> list,
            Consumer<KsqlConfig> consumer,
            Optional<HeartbeatAgent> optional2,
            Optional<LagReportingAgent> optional3,
            Vertx vertx,
            DenyListPropertyValidator denyListPropertyValidator,
            Optional<PullQueryExecutorMetrics> optional4,
            Optional<ScalablePushQueryMetrics> optional5,
            Optional<LocalCommands> optional6,
            QueryExecutor queryExecutor,
            MetricCollectors metricCollectors,
            KafkaTopicClient kafkaTopicClient,
            Admin admin,
            Instant instant) {
        log.debug("Creating instance of ksqlDB API server");

        // Null checks run in this exact order so the first missing argument is the one reported.
        this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
        this.ksqlConfigNoPort = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        this.restConfig = Objects.requireNonNull(ksqlRestConfig, "restConfig");
        this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
        this.commandRunner = Objects.requireNonNull(commandRunner, HealthCheckAgent.COMMAND_RUNNER_CHECK_NAME);
        this.statusResource = Objects.requireNonNull(statusResource, "statusResource");
        this.streamedQueryResource = Objects.requireNonNull(streamedQueryResource, "streamedQueryResource");
        this.ksqlResource = Objects.requireNonNull(ksqlResource, "ksqlResource");
        this.commandStore = Objects.requireNonNull(commandStore, "commandStore");
        this.serverState = Objects.requireNonNull(serverState, "serverState");
        this.processingLogContext = Objects.requireNonNull(processingLogContext, "processingLogContext");
        this.versionCheckerAgent = Objects.requireNonNull(versionCheckerAgent, "versionCheckerAgent");
        this.ksqlSecurityContextProvider =
                Objects.requireNonNull(ksqlSecurityContextProvider, "ksqlSecurityContextProvider");
        this.securityExtension = Objects.requireNonNull(ksqlSecurityExtension, "securityExtension");
        this.authenticationPlugin = Objects.requireNonNull(optional, "authenticationPlugin");
        this.configurables = Objects.requireNonNull(list, "configurables");
        this.rocksDBConfigSetterHandler = Objects.requireNonNull(consumer, "rocksDBConfigSetterHandler");
        this.heartbeatAgent = Objects.requireNonNull(optional2, "heartbeatAgent");
        this.lagReportingAgent = Objects.requireNonNull(optional3, "lagReportingAgent");
        this.vertx = Objects.requireNonNull(vertx, "vertx");
        this.denyListPropertyValidator =
                Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");

        this.serverInfoResource = new ServerInfoResource(serviceContext, this.ksqlConfigNoPort, commandRunner);

        // Heartbeat-dependent resources exist only when a heartbeat agent was supplied.
        this.heartbeatResource = optional2.map(HeartbeatResource::new);
        this.clusterStatusResource =
                optional2.map(agent -> new ClusterStatusResource(ksqlEngine, agent, optional3));
        this.lagReportingResource = optional3.map(LagReportingResource::new);

        this.healthCheckResource = HealthCheckResource.create(
                ksqlResource,
                serviceContext,
                this.restConfig,
                this.ksqlConfigNoPort,
                this.commandRunner,
                admin);
        metricCollectors.addConfigurableReporter(this.ksqlConfigNoPort);

        this.pullQueryMetrics = Objects.requireNonNull(optional4, "pullQueryMetrics");
        this.scalablePushQueryMetrics = Objects.requireNonNull(optional5, "scalablePushQueryMetrics");
        log.debug("ksqlDB API server instance created");

        this.localCommands = Objects.requireNonNull(optional6, "localCommands");
        this.queryExecutor = Objects.requireNonNull(queryExecutor, "queryExecutor");
        this.internalTopicClient = Objects.requireNonNull(kafkaTopicClient, "internalTopicClient");
        this.ksqlRestAppStartTime = Objects.requireNonNull(instant, "ksqlRestAppStartTime");
        this.restApplicationMetrics = new KsqlRestApplicationMetrics(metricCollectors.getMetrics());
    }

    /**
     * Starts the API server and then the KSQL components behind it.
     *
     * <p>Creates the server metadata resource, the websocket executor and the
     * websocket query endpoint, starts the API server, configures all
     * registered configurables with the port-aware configuration, starts KSQL,
     * starts the version checker and finally logs and displays the listener
     * addresses.
     *
     * <p>While the start-up section runs, the calling thread is published in
     * {@code startAsyncThreadRef} so that {@code notifyTerminated()} can
     * interrupt it; the reference is always cleared on the way out. An
     * {@link AbortApplicationStartException} is logged and swallowed; any other
     * throwable propagates.
     */
    @Override // io.confluent.ksql.rest.server.Executable
    public void startAsync() {
        log.debug("Starting the ksqlDB API server");
        this.serverMetadataResource = ServerMetadataResource.create(this.serviceContext, this.ksqlConfigNoPort);

        final StatementParser parser = new StatementParser(this.ksqlEngine);
        // Left as a raw Optional: the element type is not imported by this file.
        final Optional authorizationValidator = KsqlAuthorizationValidatorFactory.create(
                this.ksqlConfigNoPort,
                this.serviceContext,
                this.securityExtension.getAuthorizationProvider());
        final Errors errorHandler = new Errors(
                (ErrorMessages) this.restConfig.getConfiguredInstance("ksql.server.error.messages", ErrorMessages.class));
        final KsqlRestConfig serverConfig = new KsqlRestConfig(this.ksqlConfigNoPort.originals());

        this.oldApiWebsocketExecutor = MoreExecutors.listeningDecorator(
                Executors.newScheduledThreadPool(
                        serverConfig.getInt("ksql.server.websockets.num.threads").intValue(),
                        new ThreadFactoryBuilder()
                                .setDaemon(true)
                                .setNameFormat("websockets-query-thread-%d")
                                .build()));

        this.wsQueryEndpoint = new WSQueryEndpoint(
                this.ksqlConfigNoPort,
                parser,
                this.ksqlEngine,
                this.commandStore,
                this.oldApiWebsocketExecutor,
                this.versionCheckerAgent::updateLastRequestTime,
                Duration.ofMillis(
                        serverConfig.getLong(KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG).longValue()),
                authorizationValidator,
                errorHandler,
                this.denyListPropertyValidator,
                this.queryExecutor);

        // Publish the starting thread so notifyTerminated() can interrupt a blocked start-up.
        this.startAsyncThreadRef.set(Thread.currentThread());
        try {
            final KsqlServerEndpoints endpoints = new KsqlServerEndpoints(
                    this.ksqlEngine,
                    this.ksqlConfigNoPort,
                    this.ksqlSecurityContextProvider,
                    this.ksqlResource,
                    this.streamedQueryResource,
                    this.serverInfoResource,
                    this.heartbeatResource,
                    this.clusterStatusResource,
                    this.statusResource,
                    this.lagReportingResource,
                    this.healthCheckResource,
                    this.serverMetadataResource,
                    this.wsQueryEndpoint,
                    this.pullQueryMetrics,
                    this.queryExecutor,
                    this.securityExtension.getAuthTokenProvider());
            this.apiServer = new Server(
                    this.vertx,
                    serverConfig,
                    endpoints,
                    this.securityExtension,
                    this.authenticationPlugin,
                    this.serverState,
                    this.pullQueryMetrics);
            this.apiServer.start();

            // The port-aware configuration is built only after the server has started.
            final KsqlConfig configWithPort = buildConfigWithPort();
            this.configurables.forEach(configurable -> configurable.configure(configWithPort));
            startKsql(configWithPort);

            final Properties versionCheckerProps = new Properties();
            versionCheckerProps.putAll(this.restConfig.getOriginals());
            this.versionCheckerAgent.start(KsqlModuleType.SERVER, versionCheckerProps);

            final HashSet<Object> proxyListeners = new HashSet<>(this.apiServer.getProxyProtocolListeners());
            final String listenerSummary = this.apiServer.getListeners().stream()
                    .map(uri -> proxyListeners.contains(uri)
                            ? uri.toString() + " (PROXY protocol enabled)"
                            : uri.toString())
                    .collect(Collectors.joining(", "));
            log.info("ksqlDB API server listening on {}", listenerSummary);
            displayWelcomeMessage(listenerSummary);
        } catch (AbortApplicationStartException e) {
            log.error("Aborting application start", e);
        } finally {
            // Start-up is over (successfully or not): there is no longer a thread to interrupt.
            this.startAsyncThreadRef.set(null);
        }
    }

    /**
     * Processes leftover local command files, then initializes the server
     * components with the given configuration.
     *
     * @param ksqlConfig configuration passed on to {@code initialize}
     */
    @VisibleForTesting
    void startKsql(KsqlConfig ksqlConfig) {
        this.cleanupOldState();
        this.initialize(ksqlConfig);
    }

    /**
     * Exposes the engine this application was constructed with.
     *
     * @return the {@link KsqlEngine} held by this application
     */
    @VisibleForTesting
    KsqlEngine getEngine() {
        return ksqlEngine;
    }

    /**
     * Asks the local commands component, when one is configured, to process its
     * local command files using this application's service context.
     */
    private void cleanupOldState() {
        if (this.localCommands.isPresent()) {
            this.localCommands.get().processLocalCommandFiles(this.serviceContext);
        }
    }

    /**
     * Brings the server components up in a fixed order and marks the server
     * ready.
     *
     * <p>Order: RocksDB config handler, command topic registration, command
     * store, processing log topic, replay of prior commands, command runner,
     * processing log stream, heartbeat agent, lag reporting agent. Afterwards
     * the server state is set to ready and the start-up latency is recorded.
     *
     * @param ksqlConfig configuration supplying the state directory and the
     *     {@code application.server} address handed to the agents
     */
    private void initialize(KsqlConfig ksqlConfig) {
        this.rocksDBConfigSetterHandler.accept(this.ksqlConfigNoPort);
        registerCommandTopic();
        this.commandStore.start();

        ProcessingLogServerUtils.maybeCreateProcessingLogTopic(
                this.serviceContext.getTopicClient(),
                this.processingLogContext.getConfig(),
                this.ksqlConfigNoPort);

        // Falls back to the Kafka Streams default when "state.dir" is not configured.
        final String stateDir = ksqlConfig.getKsqlStreamConfigProps()
                .getOrDefault("state.dir", StreamsConfig.configDef().defaultValues().get("state.dir"))
                .toString();
        this.commandRunner.processPriorCommands(
                new PersistentQueryCleanupImpl(stateDir, this.serviceContext, ksqlConfig));
        this.commandRunner.start();

        maybeCreateProcessingLogStream(
                this.processingLogContext.getConfig(),
                this.ksqlConfigNoPort,
                this.restConfig,
                this.ksqlResource,
                this.serviceContext);

        if (this.heartbeatAgent.isPresent()) {
            final HeartbeatAgent heartbeat = this.heartbeatAgent.get();
            heartbeat.setLocalAddress((String) ksqlConfig.getKsqlStreamConfigProps().get("application.server"));
            heartbeat.startAgent();
        }
        if (this.lagReportingAgent.isPresent()) {
            final LagReportingAgent lagReporter = this.lagReportingAgent.get();
            lagReporter.setLocalAddress((String) ksqlConfig.getKsqlStreamConfigProps().get("application.server"));
            lagReporter.startAgent();
        }

        this.serverState.setReady();
        this.restApplicationMetrics.recordServerStartLatency(
                Duration.between(this.ksqlRestAppStartTime, Instant.now()));
    }

    /**
     * Completes the termination future and, if a thread is currently inside
     * {@code startAsync()}, interrupts it.
     */
    @Override // io.confluent.ksql.rest.server.Executable
    public void notifyTerminated() {
        this.terminatedFuture.complete(null);

        final Thread startingThread = this.startAsyncThreadRef.get();
        if (startingThread == null) {
            // No start-up in progress, so nothing to interrupt.
            return;
        }
        startingThread.interrupt();
    }

    /**
     * Shuts the application down, releasing its components one after another.
     *
     * <p>Closing of the metrics, local commands, engine, command runner,
     * service context and security extension is best effort: a failure in one
     * is logged and the next is still attempted. After that the API server is
     * stopped, Vert.x is closed (waiting for the close callback), the websocket
     * executor is shut down and the heartbeat / lag reporting agents are
     * stopped.
     *
     * <p>If the calling thread is interrupted while waiting for Vert.x to
     * close, the wait is abandoned, the remaining cleanup still runs, and the
     * thread's interrupt status is restored before this method returns so the
     * caller can observe the interruption.
     */
    @Override // io.confluent.ksql.rest.server.Executable
    public void shutdown() {
        log.info("ksqlDB shutdown called");
        try {
            this.pullQueryMetrics.ifPresent(PullQueryExecutorMetrics::close);
        } catch (Exception e) {
            log.error("Exception while waiting for pull query metrics to close", e);
        }
        try {
            this.scalablePushQueryMetrics.ifPresent(ScalablePushQueryMetrics::close);
        } catch (Exception e) {
            log.error("Exception while waiting for scalable push query metrics to close", e);
        }
        this.localCommands.ifPresent(commands -> {
            try {
                commands.close();
            } catch (Exception e) {
                log.error("Exception while closing local commands", e);
            }
        });
        try {
            this.ksqlEngine.close();
        } catch (Exception e) {
            log.error("Exception while waiting for Ksql Engine to close", e);
        }
        try {
            this.commandRunner.close();
        } catch (Exception e) {
            log.error("Exception while waiting for CommandRunner thread to complete", e);
        }
        try {
            this.serviceContext.close();
        } catch (Exception e) {
            log.error("Exception while closing services", e);
        }
        try {
            this.securityExtension.close();
        } catch (Exception e) {
            log.error("Exception while closing security extension", e);
        }
        if (this.apiServer != null) {
            this.apiServer.stop();
            this.apiServer = null;
        }

        // Remember an interruption instead of swallowing it; the flag is restored
        // only after the remaining cleanup so that cleanup is not disturbed by it.
        boolean interrupted = false;
        try {
            if (this.vertx != null) {
                try {
                    final CountDownLatch vertxClosed = new CountDownLatch(1);
                    this.vertx.close(asyncResult -> vertxClosed.countDown());
                    vertxClosed.await();
                } catch (InterruptedException e) {
                    log.error("Exception while closing vertx", e);
                    interrupted = true;
                }
            }
            if (this.oldApiWebsocketExecutor != null) {
                this.oldApiWebsocketExecutor.shutdown();
            }
            shutdownAdditionalAgents();
            log.info("ksqlDB shutdown complete");
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks until the termination future completes.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws KsqlException wrapping the cause if the future completed
     *     exceptionally
     */
    @Override // io.confluent.ksql.rest.server.Executable
    public void awaitTerminated() throws InterruptedException {
        try {
            this.terminatedFuture.get();
        } catch (ExecutionException failure) {
            log.error("Exception in awaitTerminated", failure);
            throw new KsqlException(failure.getCause());
        }
    }

    /**
     * Stops the heartbeat agent and then the lag reporting agent, when present.
     * A failure while stopping one agent is logged and does not prevent the
     * other from being stopped.
     */
    private void shutdownAdditionalAgents() {
        this.heartbeatAgent.ifPresent(agent -> {
            try {
                agent.stopAgent();
            } catch (Exception e) {
                log.error("Exception while shutting down HeartbeatAgent", e);
            }
        });
        this.lagReportingAgent.ifPresent(agent -> {
            try {
                agent.stopAgent();
            } catch (Exception e) {
                log.error("Exception while shutting down LagReportingAgent", e);
            }
        });
    }

    /**
     * Returns the API server's listener addresses converted to URLs.
     *
     * @return one {@link URL} per listener reported by the API server
     * @throws KsqlException if a listener URI cannot be converted to a URL
     */
    List<URL> getListeners() {
        return this.apiServer.getListeners().stream()
                .map(listener -> {
                    try {
                        return listener.toURL();
                    } catch (MalformedURLException malformed) {
                        throw new KsqlException(malformed);
                    }
                })
                .collect(Collectors.toList());
    }

    /**
     * Returns the API server's internal listener address, if any, as a URL.
     *
     * @return the internal listener as a {@link URL}, or empty when the API
     *     server reports none
     * @throws KsqlException if the listener URI cannot be converted to a URL
     */
    Optional<URL> getInternalListener() {
        return this.apiServer.getInternalListener().map(listener -> {
            try {
                return listener.toURL();
            } catch (MalformedURLException malformed) {
                throw new KsqlException(malformed);
            }
        });
    }

    /**
     * Builds a {@link KsqlRestApplication} from a REST configuration.
     *
     * <p>Creates the Vert.x instance, the internal KSQL client and the schema
     * registry / connect client factories, adds the Confluent metrics-context
     * entries (derived from the service id and the Kafka cluster id) to the
     * properties obtained from {@code ksqlRestConfig.getOriginals()}, and
     * delegates to the package-private overload with a REST configuration
     * rebuilt from those properties.
     *
     * @param ksqlRestConfig REST configuration to start from
     * @param serverState server state passed through to the application
     * @param metricCollectors supplies the metrics-context configuration
     * @param functionRegistry function registry passed through to the application
     * @param instant application start time passed through to the application
     * @return the assembled application
     */
    public static KsqlRestApplication buildApplication(KsqlRestConfig ksqlRestConfig, ServerState serverState, MetricCollectors metricCollectors, FunctionRegistry functionRegistry, Instant instant) {
        final Map<String, Object> restProps = ksqlRestConfig.getOriginals();
        final KsqlConfig baseKsqlConfig = new KsqlConfig(ksqlRestConfig.getKsqlConfigProperties());

        final Vertx vertx = Vertx.vertx(
                new VertxOptions()
                        .setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS)
                        .setMaxWorkerExecuteTime(Long.MAX_VALUE)
                        .setMetricsOptions(setUpHttpMetrics(baseKsqlConfig)));
        vertx.exceptionHandler(failure -> log.error("Unhandled exception in Vert.x", failure));

        final KsqlClient sharedClient = InternalKsqlClientFactory.createInternalClient(
                PropertiesUtil.toMapStrings(baseKsqlConfig.originals()),
                SocketAddress::inetSocketAddress,
                vertx);

        final KsqlSchemaRegistryClientFactory schemaRegistryFactory =
                new KsqlSchemaRegistryClientFactory(baseKsqlConfig, Collections.emptyMap());
        final Supplier<SchemaRegistryClient> schemaRegistryClientSupplier = schemaRegistryFactory::get;
        final DefaultConnectClientFactory connectClientFactory = new DefaultConnectClientFactory(baseKsqlConfig);

        // A separate lazily-created service context is used only to look up the Kafka cluster id.
        final String serviceId = baseKsqlConfig.getString("ksql.service.id");
        final LazyServiceContext clusterIdLookupContext = new LazyServiceContext(
                () -> RestServiceContextFactory.create(
                        baseKsqlConfig,
                        Optional.empty(),
                        schemaRegistryClientSupplier,
                        connectClientFactory,
                        sharedClient,
                        Collections.emptyList(),
                        Optional.empty()));
        restProps.putAll(
                metricCollectors.addConfluentMetricsContextConfigs(
                        serviceId,
                        KafkaClusterUtil.getKafkaClusterId(clusterIdLookupContext)));

        // From here on, work with a REST config that includes the metrics-context entries.
        final KsqlRestConfig updatedRestConfig = new KsqlRestConfig(restProps);
        final LazyServiceContext applicationServiceContext = new LazyServiceContext(
                () -> RestServiceContextFactory.create(
                        new KsqlConfig(updatedRestConfig.getKsqlConfigProperties()),
                        Optional.empty(),
                        schemaRegistryClientSupplier,
                        connectClientFactory,
                        sharedClient,
                        Collections.emptyList(),
                        Optional.empty()));

        return buildApplication(
                "",
                updatedRestConfig,
                serverState,
                KsqlVersionCheckerAgent::new,
                Integer.MAX_VALUE,
                applicationServiceContext,
                schemaRegistryClientSupplier,
                connectClientFactory,
                vertx,
                sharedClient,
                RestServiceContextFactory::create,
                RestServiceContextFactory::create,
                metricCollectors,
                functionRegistry,
                instant);
    }

    static KsqlRestApplication buildApplication(String str, KsqlRestConfig ksqlRestConfig, ServerState serverState, Function<Supplier<Boolean>, VersionCheckerAgent> function, int i, ServiceContext serviceContext, Supplier<SchemaRegistryClient> supplier, ConnectClientFactory connectClientFactory, Vertx vertx, KsqlClient ksqlClient, RestServiceContextFactory.DefaultServiceContextFactory defaultServiceContextFactory, RestServiceContextFactory.UserServiceContextFactory userServiceContextFactory, MetricCollectors metricCollectors, FunctionRegistry functionRegistry, Instant instant) {
        KsqlConfig ksqlConfig = new KsqlConfig(ksqlRestConfig.getKsqlConfigProperties());
        ProcessingLogConfig processingLogConfig = new ProcessingLogConfig(ksqlRestConfig.getOriginals());
        ProcessingLogContext create = ProcessingLogContext.create(processingLogConfig, metricCollectors.getMetrics(), ksqlConfig.getStringAsMap("ksql.metrics.tags.custom"));
        if (ksqlRestConfig.getBoolean(KsqlRestConfig.KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER).booleanValue()) {
            Thread.setDefaultUncaughtExceptionHandler(new KsqlUncaughtExceptionHandler(LogManager::shutdown));
        }
        SpecificQueryIdGenerator specificQueryIdGenerator = new SpecificQueryIdGenerator();
        String obj = ksqlConfig.getKsqlStreamConfigProps().getOrDefault("state.dir", StreamsConfig.configDef().defaultValues().get("state.dir")).toString();
        ServiceInfo create2 = ServiceInfo.create(ksqlConfig, str);
        ImmutableMap build = ImmutableMap.builder().putAll(create2.customMetricsTags()).put("ksql_service_id", create2.serviceId()).build();
        StorageUtilizationMetricsReporter.configureShared(new File(obj), metricCollectors.getMetrics(), ksqlConfig.getStringAsMap("ksql.metrics.tags.custom"));
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ksql-csu-metrics-reporter-%d").build());
        ScheduledExecutorService newScheduledThreadPool2 = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ksql-leaked-resources-metrics-reporter-%d").build());
        KsqlEngine ksqlEngine = new KsqlEngine(serviceContext, create, functionRegistry, create2, specificQueryIdGenerator, new KsqlConfig(ksqlRestConfig.getKsqlConfigProperties()), Collections.emptyList(), metricCollectors);
        newScheduledThreadPool.scheduleAtFixedRate(new PersistentQuerySaturationMetrics(ksqlEngine, new JmxDataPointsReporter(metricCollectors.getMetrics(), "ksqldb_utilization", Duration.ofMinutes(1L)), Duration.ofMinutes(5L), Duration.ofSeconds(30L), ksqlConfig.getStringAsMap("ksql.metrics.tags.custom")), 0L, Duration.ofMinutes(1L).toMillis(), TimeUnit.MILLISECONDS);
        int intValue = ksqlConfig.getInt("ksql.transient.query.cleanup.service.period.seconds").intValue();
        newScheduledThreadPool2.scheduleAtFixedRate(new LeakedResourcesMetrics(ksqlEngine, new JmxDataPointsReporter(metricCollectors.getMetrics(), "_confluent-ksql-" + ksqlConfig.getString("ksql.service.id") + ".leaked_resources_metrics", Duration.ofSeconds(intValue)), ksqlConfig.getStringAsMap("ksql.metrics.tags.custom")), 0L, intValue, TimeUnit.SECONDS);
        String commandTopic = ReservedInternalTopics.commandTopic(ksqlConfig);
        Admin createCommandTopicAdminClient = createCommandTopicAdminClient(ksqlRestConfig, ksqlConfig);
        KafkaTopicClientImpl kafkaTopicClientImpl = new KafkaTopicClientImpl(() -> {
            return createCommandTopicAdminClient;
        });
        CommandStore create3 = CommandStore.Factory.create(ksqlConfig, commandTopic, Duration.ofMillis(ksqlRestConfig.getLong(KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG).longValue()), ksqlConfig.addConfluentMetricsContextConfigsKafka(ksqlRestConfig.getCommandConsumerProperties()), ksqlConfig.addConfluentMetricsContextConfigsKafka(ksqlRestConfig.getCommandProducerProperties()), kafkaTopicClientImpl);
        InteractiveStatementExecutor interactiveStatementExecutor = new InteractiveStatementExecutor(serviceContext, ksqlEngine, specificQueryIdGenerator);
        StatusResource statusResource = new StatusResource(interactiveStatementExecutor);
        Objects.requireNonNull(ksqlEngine);
        VersionCheckerAgent apply = function.apply(ksqlEngine::hasActiveQueries);
        KsqlSecurityExtension loadSecurityExtension = loadSecurityExtension(ksqlConfig);
        DefaultKsqlSecurityContextProvider defaultKsqlSecurityContextProvider = new DefaultKsqlSecurityContextProvider(loadSecurityExtension, defaultServiceContextFactory, userServiceContextFactory, ksqlConfig, supplier, connectClientFactory, ksqlClient);
        Optional<AuthenticationPlugin> loadAuthenticationPlugin = loadAuthenticationPlugin(ksqlRestConfig);
        Optional create4 = KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext, loadSecurityExtension.getAuthorizationProvider());
        Errors errors = new Errors((ErrorMessages) ksqlRestConfig.getConfiguredInstance("ksql.server.error.messages", ErrorMessages.class));
        Optional<LagReportingAgent> initializeLagReportingAgent = initializeLagReportingAgent(ksqlRestConfig, ksqlEngine, serviceContext);
        Optional<HeartbeatAgent> initializeHeartbeatAgent = initializeHeartbeatAgent(ksqlRestConfig, ksqlEngine, serviceContext, initializeLagReportingAgent);
        RoutingFilter.RoutingFilterFactory initializeRoutingFilterFactory = initializeRoutingFilterFactory(ksqlConfig, initializeHeartbeatAgent, initializeLagReportingAgent);
        RateLimiter rateLimiter = new RateLimiter(ksqlConfig.getInt("ksql.query.pull.max.qps").intValue(), "pull", metricCollectors.getMetrics(), build);
        ConcurrencyLimiter concurrencyLimiter = new ConcurrencyLimiter(ksqlConfig.getInt("ksql.query.pull.max.concurrent.requests").intValue(), "pull", metricCollectors.getMetrics(), build);
        SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(ksqlConfig.getInt("ksql.query.pull.max.hourly.bandwidth.megabytes").intValue(), 3600000L, "pull", metricCollectors.getMetrics(), build);
        SlidingWindowRateLimiter slidingWindowRateLimiter2 = new SlidingWindowRateLimiter(ksqlConfig.getInt("ksql.query.push.v2.max.hourly.bandwidth.megabytes").intValue(), 3600000L, "push", metricCollectors.getMetrics(), build);
        DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator(ksqlConfig.getList("ksql.properties.overrides.denylist"));
        Optional of = ksqlConfig.getBoolean("ksql.query.pull.metrics.enabled").booleanValue() ? Optional.of(new PullQueryExecutorMetrics(ksqlEngine.getServiceId(), ksqlConfig.getStringAsMap("ksql.metrics.tags.custom"), Time.SYSTEM, metricCollectors.getMetrics())) : Optional.empty();
        Optional of2 = ksqlConfig.getBoolean("ksql.query.push.v2.metrics.enabled").booleanValue() ? Optional.of(new ScalablePushQueryMetrics(ksqlEngine.getServiceId(), ksqlConfig.getStringAsMap("ksql.metrics.tags.custom"), Time.SYSTEM, metricCollectors.getMetrics())) : Optional.empty();
        HARouting hARouting = new HARouting(initializeRoutingFilterFactory, of, ksqlConfig);
        PushRouting pushRouting = new PushRouting();
        Optional<LocalCommands> createLocalCommands = createLocalCommands(ksqlRestConfig, ksqlEngine);
        QueryExecutor queryExecutor = new QueryExecutor(ksqlEngine, ksqlRestConfig, ksqlConfig, of, of2, rateLimiter, concurrencyLimiter, slidingWindowRateLimiter, slidingWindowRateLimiter2, hARouting, pushRouting, createLocalCommands);
        Duration ofMillis = Duration.ofMillis(ksqlRestConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG).longValue());
        Duration ofMillis2 = Duration.ofMillis(ksqlRestConfig.getLong(KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG).longValue());
        Objects.requireNonNull(apply);
        StreamedQueryResource streamedQueryResource = new StreamedQueryResource(ksqlEngine, ksqlRestConfig, create3, ofMillis, ofMillis2, apply::updateLastRequestTime, create4, errors, denyListPropertyValidator, queryExecutor);
        LinkedList linkedList = new LinkedList();
        linkedList.add(commandTopic);
        if (processingLogConfig.getBoolean(ProcessingLogConfig.TOPIC_AUTO_CREATE).booleanValue()) {
            linkedList.add(ProcessingLogServerUtils.getTopicName(processingLogConfig, ksqlConfig));
        }
        CommandRunner commandRunner = new CommandRunner(interactiveStatementExecutor, create3, i, new ClusterTerminator(ksqlEngine, serviceContext, linkedList), serverState, ksqlConfig.getString("ksql.service.id"), Duration.ofMillis(ksqlRestConfig.getLong("ksql.server.command.blocked.threshold.error.ms").longValue()), str, InternalTopicSerdes.deserializer(Command.class), errors, kafkaTopicClientImpl, commandTopic, metricCollectors.getMetrics());
        Duration ofMillis3 = Duration.ofMillis(ksqlRestConfig.getLong(KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG).longValue());
        Objects.requireNonNull(apply);
        KsqlResource ksqlResource = new KsqlResource(ksqlEngine, commandRunner, ofMillis3, apply::updateLastRequestTime, create4, errors, denyListPropertyValidator);
        return new KsqlRestApplication(serviceContext, ksqlEngine, ksqlConfig, ksqlRestConfig, commandRunner, create3, statusResource, streamedQueryResource, ksqlResource, apply, defaultKsqlSecurityContextProvider, loadSecurityExtension, loadAuthenticationPlugin, serverState, create, ImmutableList.of(ksqlResource, ksqlEngine), RocksDBConfigSetterHandler::maybeConfigureRocksDBConfigSetter, initializeHeartbeatAgent, initializeLagReportingAgent, vertx, denyListPropertyValidator, of, of2, createLocalCommands, queryExecutor, metricCollectors, kafkaTopicClientImpl, createCommandTopicAdminClient, instant);
    }

    /**
     * Builds the heartbeat agent when heartbeating is enabled in the REST config.
     *
     * @param ksqlRestConfig REST config supplying the enable flag and the heartbeat settings
     * @param ksqlEngine engine handed to the agent builder
     * @param serviceContext service context handed to the agent builder
     * @param optional lag reporting agent; when present it is registered on the builder as a
     *     host status listener
     * @return the built agent, or {@link Optional#empty()} when
     *     {@code KSQL_HEARTBEAT_ENABLE_CONFIG} is false
     */
    private static Optional<HeartbeatAgent> initializeHeartbeatAgent(KsqlRestConfig ksqlRestConfig, KsqlEngine ksqlEngine, ServiceContext serviceContext, Optional<LagReportingAgent> optional) {
        if (!ksqlRestConfig.getBoolean(KsqlRestConfig.KSQL_HEARTBEAT_ENABLE_CONFIG)) {
            return Optional.empty();
        }
        HeartbeatAgent.Builder builder = HeartbeatAgent.builder();
        builder
            .heartbeatSendInterval(ksqlRestConfig.getLong(KsqlRestConfig.KSQL_HEARTBEAT_SEND_INTERVAL_MS_CONFIG).longValue())
            .heartbeatCheckInterval(ksqlRestConfig.getLong(KsqlRestConfig.KSQL_HEARTBEAT_CHECK_INTERVAL_MS_CONFIG).longValue())
            .heartbeatMissedThreshold(ksqlRestConfig.getLong(KsqlRestConfig.KSQL_HEARTBEAT_MISSED_THRESHOLD_CONFIG).longValue())
            .heartbeatWindow(ksqlRestConfig.getLong(KsqlRestConfig.KSQL_HEARTBEAT_WINDOW_MS_CONFIG).longValue())
            .discoverClusterInterval(ksqlRestConfig.getLong(KsqlRestConfig.KSQL_HEARTBEAT_DISCOVER_CLUSTER_MS_CONFIG).longValue())
            .threadPoolSize(ksqlRestConfig.getInt(KsqlRestConfig.KSQL_HEARTBEAT_THREAD_POOL_SIZE_CONFIG).intValue());
        // The listener must be registered on the same builder instance that is built below.
        optional.ifPresent(builder::addHostStatusListener);
        return Optional.of(builder.build(ksqlEngine, serviceContext));
    }

    /**
     * Builds the lag reporting agent, which is only created when both lag reporting and
     * heartbeating are enabled in the REST config.
     *
     * @param ksqlRestConfig REST config supplying the two enable flags and the send interval
     * @param ksqlEngine engine handed to the agent builder
     * @param serviceContext service context handed to the agent builder
     * @return the built agent, or {@link Optional#empty()} when either flag is false
     */
    private static Optional<LagReportingAgent> initializeLagReportingAgent(KsqlRestConfig ksqlRestConfig, KsqlEngine ksqlEngine, ServiceContext serviceContext) {
        if (!ksqlRestConfig.getBoolean(KsqlRestConfig.KSQL_LAG_REPORTING_ENABLE_CONFIG)) {
            return Optional.empty();
        }
        // The heartbeat flag is consulted only once lag reporting itself is enabled.
        if (!ksqlRestConfig.getBoolean(KsqlRestConfig.KSQL_HEARTBEAT_ENABLE_CONFIG)) {
            return Optional.empty();
        }
        return Optional.of(
            LagReportingAgent.builder()
                .lagSendIntervalMs(ksqlRestConfig.getLong(KsqlRestConfig.KSQL_LAG_REPORTING_SEND_INTERVAL_MS_CONFIG).longValue())
                .build(ksqlEngine, serviceContext));
    }

    /**
     * Creates the factory that assembles the routing filters for a request.
     *
     * <p>When the routing options report {@code getIsSkipForwardRequest()}, only the optional
     * {@link MaximumLagFilter} is applied. Otherwise the chain is: an {@link ActiveHostFilter}
     * (only when {@code ksql.query.pull.enable.standby.reads} is false), a
     * {@link LivenessFilter}, and then the optional {@link MaximumLagFilter}.
     *
     * @param ksqlConfig config read for the standby-reads flag each time the factory is invoked
     * @param optional heartbeat agent handed to the {@link LivenessFilter}
     * @param optional2 lag reporting agent handed to {@code MaximumLagFilter.create}
     * @return a factory producing a {@link RoutingFilters} instance per invocation
     */
    private static RoutingFilter.RoutingFilterFactory initializeRoutingFilterFactory(KsqlConfig ksqlConfig, Optional<HeartbeatAgent> optional, Optional<LagReportingAgent> optional2) {
        return (routingOptions, list, hostInfo, str, str2, i) -> {
            ImmutableList.Builder<RoutingFilter> builder = ImmutableList.builder();
            if (routingOptions.getIsSkipForwardRequest()) {
                // Only the lag filter (if one is produced) applies on this path.
                MaximumLagFilter.create(optional2, routingOptions, list, str, str2, i)
                    .ifPresent(builder::add);
            } else {
                if (!ksqlConfig.getBoolean("ksql.query.pull.enable.standby.reads")) {
                    builder.add(new ActiveHostFilter(hostInfo));
                }
                builder.add(new LivenessFilter(optional));
                MaximumLagFilter.create(optional2, routingOptions, list, str, str2, i)
                    .ifPresent(builder::add);
            }
            return new RoutingFilters(builder.build());
        };
    }

    /**
     * Prepares the command topic on the cluster reached through {@code internalTopicClient}.
     *
     * <p>Three steps, in order:
     * <ol>
     *   <li>If {@link #checkMigrationConditions(String)} holds, the topic is ensured and the
     *       command topic migration is run; a migration failure is logged and rethrown.</li>
     *   <li>Otherwise, if the migration config equals
     *       {@code KSQL_COMMAND_TOPIC_MIGRATION_MIGRATING}, this method retries with backoff
     *       until the topic exists and partition 0 has a non-zero end offset.</li>
     *   <li>Finally the topic is ensured, unless the backup utility reports that the topic is
     *       missing while a valid backup exists, in which case only a warning is logged.</li>
     * </ol>
     */
    private void registerCommandTopic() {
        final String topic = this.commandStore.getCommandTopicName();

        if (checkMigrationConditions(topic)) {
            final String serviceId = this.ksqlConfigNoPort.getString("ksql.service.id");
            log.warn("Migrating command topic from the service context Kafka to the command producer/consumer Kafka for ksql with id {}.", serviceId);
            KsqlInternalTopicUtils.ensureTopic(topic, this.ksqlConfigNoPort, this.internalTopicClient);
            try {
                CommandTopicMigrationUtil.commandTopicMigration(topic, this.restConfig, this.ksqlConfigNoPort);
            } catch (Exception e) {
                log.warn("Failed to migrate command topic from the service context Kafka to the command producer/consumer Kafka for ksql with id {}.", serviceId, e);
                throw e;
            }
        } else {
            final String migrationMode = this.restConfig.getString(KsqlRestConfig.KSQL_COMMAND_TOPIC_MIGRATION_CONFIG);
            if (migrationMode.equals(KsqlRestConfig.KSQL_COMMAND_TOPIC_MIGRATION_MIGRATING)) {
                // Each failed probe throws, which makes the retry helper try again.
                RetryUtil.retryWithBackoff(Integer.MAX_VALUE, 10000, 10000, () -> {
                    if (!this.internalTopicClient.isTopicExists(topic)) {
                        throw new RuntimeException("command topic migration still in process, no new command topic on command producer/consumer Kafka.");
                    }
                    final TopicPartition firstPartition = new TopicPartition(topic, 0);
                    final long endOffset = ((Long) this.internalTopicClient.listTopicsEndOffsets(Collections.singletonList(topic)).get(firstPartition)).longValue();
                    if (endOffset == 0) {
                        throw new RuntimeException("command topic migration still in process, empty command topic on command producer/consumer Kafka.");
                    }
                }, new Class[]{WakeupException.class});
            }
        }

        if (!CommandTopicBackupUtil.commandTopicMissingWithValidBackup(topic, this.internalTopicClient, this.ksqlConfigNoPort)) {
            KsqlInternalTopicUtils.ensureTopic(topic, this.ksqlConfigNoPort, this.internalTopicClient);
            return;
        }
        log.warn("Command topic is not found and it is not in sync with backup. Use backup to recover the command topic.");
    }

    /**
     * Decides whether this server should migrate the command topic.
     *
     * <p>All of the following must hold, checked in this order:
     * <ul>
     *   <li>on {@code internalTopicClient} the topic either does not exist or has an end offset
     *       of 0 on partition 0;</li>
     *   <li>the topic exists on the service context's topic client;</li>
     *   <li>the migration config equals {@code KSQL_COMMAND_TOPIC_MIGRATION_MIGRATOR}.</li>
     * </ul>
     *
     * @param str name of the command topic
     * @return {@code true} when every condition above is met
     */
    private boolean checkMigrationConditions(String str) {
        if (this.internalTopicClient.isTopicExists(str)) {
            final long endOffset = ((Long) this.internalTopicClient.listTopicsEndOffsets(Collections.singletonList(str)).get(new TopicPartition(str, 0))).longValue();
            if (endOffset != 0) {
                return false;
            }
        }
        if (!this.serviceContext.getTopicClient().isTopicExists(str)) {
            return false;
        }
        return this.restConfig.getString(KsqlRestConfig.KSQL_COMMAND_TOPIC_MIGRATION_CONFIG).equals(KsqlRestConfig.KSQL_COMMAND_TOPIC_MIGRATION_MIGRATOR);
    }

    /**
     * Resolves and initializes the security extension.
     *
     * <p>The instance configured under {@code ksql.security.extension.class} is used when the
     * config yields one; otherwise a {@link KsqlDefaultSecurityExtension} is used.
     *
     * @param ksqlConfig config used both to look up the extension and to initialize it
     * @return the initialized extension
     */
    private static KsqlSecurityExtension loadSecurityExtension(KsqlConfig ksqlConfig) {
        final KsqlSecurityExtension configured = (KsqlSecurityExtension) ksqlConfig.getConfiguredInstance("ksql.security.extension.class", KsqlSecurityExtension.class);
        // The default is instantiated unconditionally, exactly as Optional.orElse(value) would.
        final KsqlSecurityExtension fallback = new KsqlDefaultSecurityExtension();
        final KsqlSecurityExtension extension = configured != null ? configured : fallback;
        extension.initialize(ksqlConfig);
        return extension;
    }

    /**
     * Looks up the authentication plugin configured under
     * {@code KSQL_AUTHENTICATION_PLUGIN_CLASS} and, when one is configured, passes it the REST
     * config's original properties.
     *
     * @param ksqlRestConfig REST config to read the plugin class and properties from
     * @return the configured plugin, or {@link Optional#empty()} when the config yields none
     */
    private static Optional<AuthenticationPlugin> loadAuthenticationPlugin(KsqlRestConfig ksqlRestConfig) {
        final AuthenticationPlugin plugin = (AuthenticationPlugin) ksqlRestConfig.getConfiguredInstance(KsqlRestConfig.KSQL_AUTHENTICATION_PLUGIN_CLASS, AuthenticationPlugin.class);
        if (plugin != null) {
            plugin.configure(ksqlRestConfig.originals());
        }
        return Optional.ofNullable(plugin);
    }

    /**
     * Prints the welcome banner, the server version with the given address, and a CLI hint
     * to standard output. Nothing is printed when no console is attached.
     *
     * @param str the address text inserted into the "listening on" line
     */
    private void displayWelcomeMessage(String str) {
        if (System.console() != null) {
            // Deliberately not closed: closing the writer would close System.out as well.
            final PrintWriter out = new PrintWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
            WelcomeMsgUtils.displayWelcomeMessage(80, out);
            out.printf("Server %s listening on %s%n", AppInfo.getVersion(), str);
            out.println();
            out.println("To access the KSQL CLI, run:");
            out.println("ksql " + getListeners().get(0));
            out.println();
            out.flush();
        }
    }

    /**
     * Creates the processing log stream when {@code STREAM_AUTO_CREATE} is set and a stream
     * with the configured name is not already listed.
     *
     * <p>Any exception raised along the way is logged and not propagated.
     *
     * @param processingLogConfig supplies the auto-create flag and the stream name
     * @param ksqlConfig used to build the create-stream statement
     * @param ksqlRestConfig used to resolve the server address
     * @param ksqlResource resource backing the internal client
     * @param serviceContext service context wrapped into the security context
     */
    private static void maybeCreateProcessingLogStream(ProcessingLogConfig processingLogConfig, KsqlConfig ksqlConfig, KsqlRestConfig ksqlRestConfig, KsqlResource ksqlResource, ServiceContext serviceContext) {
        if (!processingLogConfig.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE)) {
            return;
        }
        try {
            final ServerInternalKsqlClient internalClient = new ServerInternalKsqlClient(ksqlResource, new KsqlSecurityContext(Optional.empty(), serviceContext));
            final URI address = ServerUtil.getServerAddress(ksqlRestConfig);
            final String streamName = processingLogConfig.getString(ProcessingLogConfig.STREAM_NAME);
            if (processingLogStreamExists(internalClient, address, streamName)) {
                return;
            }
            final String createStatement = ProcessingLogServerUtils.processingLogStreamCreateStatement(processingLogConfig, ksqlConfig);
            if (internalClient.makeKsqlRequest(address, createStatement, ImmutableMap.of()).isSuccessful()) {
                log.info("Successfully created processing log stream.");
            }
        } catch (Exception e) {
            log.error("Error while sending processing log CreateStream request to KsqlResource: ", e);
        }
    }

    /**
     * Issues {@code list streams;} through the given client and checks whether any returned
     * stream carries the given name.
     *
     * @param simpleKsqlClient client used to send the request
     * @param uri server address the request is sent to
     * @param str stream name to look for
     * @return {@code true} when a stream with an equal name is listed
     */
    private static boolean processingLogStreamExists(SimpleKsqlClient simpleKsqlClient, URI uri, String str) {
        final KsqlEntityList entities = (KsqlEntityList) simpleKsqlClient.makeKsqlRequest(uri, "list streams;", ImmutableMap.of()).getResponse();
        // The stream listing is read from the first entity of the response.
        final StreamsList listing = (StreamsList) entities.get(0);
        return listing.getStreams().stream().anyMatch(candidate -> candidate.getName().equals(str));
    }

    /**
     * Opens the local commands store when {@code KSQL_LOCAL_COMMANDS_LOCATION_CONFIG} is a
     * non-empty string.
     *
     * @param ksqlRestConfig REST config supplying the location
     * @param ksqlEngine engine handed to {@code LocalCommands.open}
     * @return the opened local commands, or {@link Optional#empty()} when the location is empty
     */
    private static Optional<LocalCommands> createLocalCommands(KsqlRestConfig ksqlRestConfig, KsqlEngine ksqlEngine) {
        final String location = ksqlRestConfig.getString(KsqlRestConfig.KSQL_LOCAL_COMMANDS_LOCATION_CONFIG);
        if (location.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(LocalCommands.open(ksqlEngine, new File(location)));
    }

    /**
     * Derives a new {@link KsqlConfig} from the port-less config's original properties, with
     * {@code ksql.streams.application.server} set to the inter-node listener resolved through
     * {@link #resolvePort(URL)}.
     *
     * @return the new config
     */
    @VisibleForTesting
    KsqlConfig buildConfigWithPort() {
        final Map<String, Object> props = this.ksqlConfigNoPort.originals();
        final String applicationServer = this.restConfig.getInterNodeListener(this::resolvePort).toString();
        props.put("ksql.streams.application.server", applicationServer);
        return new KsqlConfig(props);
    }

    /**
     * Returns the port of the first listener whose protocol and host both equal those of
     * the given URL.
     *
     * @param url URL whose protocol and host identify the listener
     * @return the matching listener's port
     * @throws IllegalStateException when no listener matches
     */
    private int resolvePort(URL url) {
        for (final URL listener : getListeners()) {
            final boolean sameProtocol = listener.getProtocol().equals(url.getProtocol());
            if (sameProtocol && listener.getHost().equals(url.getHost())) {
                return listener.getPort();
            }
        }
        throw new IllegalStateException("Failed resolve port for listener: " + url);
    }

    /**
     * Builds the Dropwizard metrics options: JMX enabled, base name
     * {@code _confluent-ksql-<service id>}, JMX domain {@code io.confluent.ksql.metrics}, and
     * every endpoint from {@code MonitoredEndpoints.getMonitoredEndpoints()} registered as a
     * monitored HTTP server URI.
     *
     * @param ksqlConfig config supplying {@code ksql.service.id}
     * @return the populated options
     */
    private static DropwizardMetricsOptions setUpHttpMetrics(KsqlConfig ksqlConfig) {
        final String baseName = "_confluent-ksql-" + ksqlConfig.getString("ksql.service.id");
        final DropwizardMetricsOptions options = new DropwizardMetricsOptions()
            .setJmxEnabled(true)
            .setBaseName(baseName)
            .setJmxDomain("io.confluent.ksql.metrics");
        for (final Match endpoint : MonitoredEndpoints.getMonitoredEndpoints()) {
            options.addMonitoredHttpServerUri(endpoint);
        }
        return options;
    }

    /**
     * Creates the admin client used for the command topic.
     *
     * <p>The client properties start from the KSQL admin client config and are then overlaid
     * with the command producer properties, so the latter win on key collisions.
     *
     * @param ksqlRestConfig REST config supplying the command producer properties
     * @param ksqlConfig config supplying the base admin client properties
     * @return the admin client obtained from a {@link DefaultKafkaClientSupplier}
     */
    private static Admin createCommandTopicAdminClient(KsqlRestConfig ksqlRestConfig, KsqlConfig ksqlConfig) {
        final Map<String, Object> adminProps = new HashMap<>(ksqlConfig.getKsqlAdminClientConfigProps());
        adminProps.putAll(ksqlRestConfig.getCommandProducerProperties());
        return new DefaultKafkaClientSupplier().getAdmin(adminProps);
    }
}
