package io.confluent.ksql.api.server;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.api.auth.AuthenticationPlugin;
import io.confluent.ksql.api.spi.Endpoints;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.PushQueryId;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.execution.PullQueryExecutorMetrics;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.KeystoreUtil;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.netty.handler.ssl.OpenSsl;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.http.ClientAuth;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.PfxOptions;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/server/Server.class */
public class Server {
    private static final Logger log = LoggerFactory.getLogger(Server.class);
    private final Vertx vertx;
    private final KsqlRestConfig config;
    private final Endpoints endpoints;
    private final int maxPushQueryCount;
    private final KsqlSecurityExtension securityExtension;
    private final Optional<AuthenticationPlugin> authenticationPlugin;
    private final ServerState serverState;
    private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
    private URI internalListener;
    private WorkerExecutor workerExecutor;
    private FileWatcher fileWatcher;
    private final Map<PushQueryId, PushQueryHolder> queries = new ConcurrentHashMap();
    private final Set<HttpConnection> connections = new ConcurrentHashSet();
    private final Set<String> deploymentIds = new HashSet();
    private final List<URI> listeners = new ArrayList();

    /**
     * Creates a server that is not yet listening; call {@link #start()} to deploy it.
     *
     * <p>Logs a warning when Netty reports that OpenSSL is not available.
     *
     * @param vertx the Vert.x instance used to deploy verticles and create the worker pool
     * @param ksqlRestConfig the REST configuration; also supplies the maximum push query count
     * @param endpoints the endpoints passed to every server verticle
     * @param ksqlSecurityExtension the security extension returned by {@link #getSecurityExtension()}
     * @param optional the authentication plugin, if any
     * @param serverState the server state returned by {@link #getServerState()}
     * @param optional2 the pull query metrics, if any; passed to every server verticle
     * @throws NullPointerException if any argument is {@code null}
     */
    public Server(Vertx vertx, KsqlRestConfig ksqlRestConfig, Endpoints endpoints, KsqlSecurityExtension ksqlSecurityExtension, Optional<AuthenticationPlugin> optional, ServerState serverState, Optional<PullQueryExecutorMetrics> optional2) {
        this.vertx = Objects.requireNonNull(vertx);
        this.config = Objects.requireNonNull(ksqlRestConfig);
        this.endpoints = Objects.requireNonNull(endpoints);
        this.securityExtension = Objects.requireNonNull(ksqlSecurityExtension);
        this.authenticationPlugin = Objects.requireNonNull(optional);
        this.serverState = Objects.requireNonNull(serverState);
        this.maxPushQueryCount = ksqlRestConfig.getInt(KsqlRestConfig.MAX_PUSH_QUERIES);
        this.pullQueryMetrics = Objects.requireNonNull(optional2, "pullQueryMetrics");
        if (!OpenSsl.isAvailable()) {
            log.warn("OpenSSL does not appear to be installed. ksqlDB will fall back to using the JDK TLS implementation. OpenSSL is recommended for better performance.");
        }
    }

    /**
     * Deploys the server verticles and records the URIs they are listening on.
     *
     * <p>Creates the shared worker executor, optionally enables keystore reload watching,
     * then deploys the configured number of verticle instances for every configured listener
     * plus the internal listener (when one is configured and distinct from the others).
     * Blocks until every deployment has completed. The recorded URIs use the port reported by
     * the first verticle instance of each listener rather than the configured port.
     *
     * @throws IllegalStateException if the server is already started
     * @throws KsqlException if any deployment fails or the wait is interrupted
     */
    public synchronized void start() {
        if (!this.deploymentIds.isEmpty()) {
            throw new IllegalStateException("Already started");
        }
        final int workerPoolSize = this.config.getInt(KsqlRestConfig.WORKER_POOL_SIZE);
        this.workerExecutor = this.vertx.createSharedWorkerExecutor("ksql-workers", workerPoolSize);
        configureTlsCertReload(this.config);

        final List<URI> listenUris = parseListeners(this.config);
        final Optional<URI> internalListenUri = parseInternalListener(this.config, listenUris);
        final List<URI> allListenUris = new ArrayList<>(listenUris);
        internalListenUri.ifPresent(allListenUris::add);

        final int instances = this.config.getInt(KsqlRestConfig.VERTICLE_INSTANCES);
        log.debug("Deploying {} instances of server verticle", instances);

        final List<CompletableFuture<String>> deployFutures = new ArrayList<>();
        // Configured listener URI -> URI carrying the port the verticle actually reports.
        final Map<URI, URI> boundUris = new ConcurrentHashMap<>();
        for (final URI listener : allListenUris) {
            // Empty when no separate internal listener exists; otherwise whether this is it.
            final Optional<Boolean> isInternalListener =
                internalListenUri.map(internal -> internal.equals(listener));
            for (int i = 0; i < instances; i++) {
                final VertxCompletableFuture<String> deployResult = new VertxCompletableFuture<>();
                final ServerVerticle verticle = new ServerVerticle(
                    this.endpoints,
                    createHttpServerOptions(
                        this.config,
                        listener.getHost(),
                        listener.getPort(),
                        listener.getScheme().equalsIgnoreCase("https"),
                        isInternalListener.orElse(false)),
                    this,
                    isInternalListener,
                    this.pullQueryMetrics);
                this.vertx.deployVerticle(verticle, deployResult);
                // Only the first instance per listener records the bound URI.
                final boolean firstInstance = i == 0;
                deployFutures.add(deployResult.thenApply(deploymentId -> {
                    if (firstInstance) {
                        boundUris.put(listener, withActualPort(listener, verticle.actualPort()));
                    }
                    return deploymentId;
                }));
            }
        }

        try {
            CompletableFuture.allOf(deployFutures.toArray(new CompletableFuture<?>[0])).get();
            for (final CompletableFuture<String> deployFuture : deployFutures) {
                this.deploymentIds.add(deployFuture.get());
            }
            for (final URI listenUri : listenUris) {
                this.listeners.add(boundUris.get(listenUri));
            }
            if (internalListenUri.isPresent()) {
                this.internalListener = boundUris.get(internalListenUri.get());
            }
            log.info("API server started");
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new KsqlException("Failed to start API server", e);
        } catch (Exception e) {
            throw new KsqlException("Failed to start API server", e);
        }
    }

    /** Rebuilds {@code listener} with only its scheme and host plus the given port. */
    private static URI withActualPort(URI listener, int port) {
        try {
            return new URI(listener.getScheme(), null, listener.getHost(), port, null, null, null);
        } catch (URISyntaxException e) {
            throw new KsqlException(e);
        }
    }

    /**
     * Closes the worker executor and file watcher (when present), undeploys every verticle
     * deployed by {@link #start()}, and blocks until all undeployments have completed.
     *
     * <p>On success the recorded deployment ids, listeners and internal listener are cleared,
     * so the listener accessors no longer report addresses of a stopped server.
     *
     * @throws IllegalStateException if the server has not been started
     * @throws KsqlException if undeployment fails or the wait is interrupted
     */
    public synchronized void stop() {
        if (this.deploymentIds.isEmpty()) {
            throw new IllegalStateException("Not started");
        }
        if (this.workerExecutor != null) {
            this.workerExecutor.close();
        }
        if (this.fileWatcher != null) {
            this.fileWatcher.shutdown();
        }
        final List<CompletableFuture<Void>> undeployFutures = new ArrayList<>(this.deploymentIds.size());
        for (final String deploymentId : this.deploymentIds) {
            final VertxCompletableFuture<Void> undeployResult = new VertxCompletableFuture<>();
            this.vertx.undeploy(deploymentId, undeployResult);
            undeployFutures.add(undeployResult);
        }
        try {
            CompletableFuture.allOf(undeployFutures.toArray(new CompletableFuture<?>[0])).get();
            this.deploymentIds.clear();
            this.listeners.clear();
            // Reset together with the listeners so a stale URI is not exposed after stop.
            this.internalListener = null;
            log.info("API server stopped");
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new KsqlException("Failure in stopping API server", e);
        } catch (Exception e) {
            throw new KsqlException("Failure in stopping API server", e);
        }
    }

    /**
     * Stops the server and starts it again while holding this object's monitor.
     *
     * <p>Also registered as the callback of the keystore file watcher.
     *
     * @throws IllegalStateException if the server is not currently started
     * @throws KsqlException if stopping or starting fails
     */
    public synchronized void restart() {
        log.info("Restarting server");
        this.stop();
        this.start();
    }

    /**
     * Returns the worker executor created by the most recent {@link #start()}.
     *
     * @return the worker executor, or {@code null} if the server has never been started
     */
    public WorkerExecutor getWorkerExecutor() {
        return workerExecutor;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a push query under its id.
     *
     * @param pushQueryHolder the query to register
     * @throws NullPointerException if {@code pushQueryHolder} is {@code null}
     * @throws KsqlApiException if the number of registered queries equals the configured maximum
     * @throws IllegalStateException if a query with the same id is already registered
     */
    public synchronized void registerQuery(PushQueryHolder pushQueryHolder) throws KsqlApiException {
        Objects.requireNonNull(pushQueryHolder);
        final boolean atCapacity = maxPushQueryCount == queries.size();
        if (atCapacity) {
            throw new KsqlApiException("Maximum number of push queries exceeded", Errors.ERROR_CODE_MAX_PUSH_QUERIES_EXCEEDED);
        }
        final PushQueryHolder alreadyRegistered = queries.putIfAbsent(pushQueryHolder.getId(), pushQueryHolder);
        if (alreadyRegistered != null) {
            throw new IllegalStateException("Glitch in the matrix");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Unregisters the push query with the given id.
     *
     * @param pushQueryId the id of the query to remove
     * @return the removed query, or empty if no query was registered under that id
     */
    public Optional<PushQueryHolder> removeQuery(PushQueryId pushQueryId) {
        final PushQueryHolder removed = queries.remove(pushQueryId);
        return Optional.ofNullable(removed);
    }

    /**
     * Returns a copy of the ids of the currently registered push queries.
     *
     * @return a new set that is independent of the internal registry
     */
    public Set<PushQueryId> getQueryIDs() {
        return new HashSet<>(queries.keySet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the security extension supplied at construction.
     *
     * @return the security extension; never {@code null}
     */
    public KsqlSecurityExtension getSecurityExtension() {
        return securityExtension;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the authentication plugin supplied at construction.
     *
     * @return the authentication plugin, or empty if none was supplied
     */
    public Optional<AuthenticationPlugin> getAuthenticationPlugin() {
        return authenticationPlugin;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the server state supplied at construction.
     *
     * @return the server state; never {@code null}
     */
    public ServerState getServerState() {
        return serverState;
    }

    /**
     * Returns the REST configuration supplied at construction.
     *
     * @return the REST configuration; never {@code null}
     */
    public KsqlRestConfig getConfig() {
        return config;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Adds a connection to the set of tracked query connections.
     *
     * @param httpConnection the connection to track
     * @throws NullPointerException if {@code httpConnection} is {@code null}
     */
    public void registerQueryConnection(HttpConnection httpConnection) {
        final HttpConnection connection = Objects.requireNonNull(httpConnection);
        connections.add(connection);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes a connection from the set of tracked query connections.
     *
     * @param httpConnection the connection to stop tracking
     * @throws NullPointerException if {@code httpConnection} is {@code null}
     */
    public void removeQueryConnection(HttpConnection httpConnection) {
        final HttpConnection connection = Objects.requireNonNull(httpConnection);
        connections.remove(connection);
    }

    /**
     * Returns the number of currently tracked query connections.
     *
     * @return the size of the tracked connection set
     */
    public int queryConnectionCount() {
        return connections.size();
    }

    /**
     * Returns an immutable snapshot of the listener URIs recorded by {@link #start()}.
     *
     * @return the recorded listener URIs; empty before the server has been started
     */
    public synchronized List<URI> getListeners() {
        return ImmutableList.copyOf(listeners);
    }

    /**
     * Returns the internal listener URI recorded by {@link #start()}.
     *
     * @return the internal listener URI, or empty if none has been recorded
     */
    public synchronized Optional<URI> getInternalListener() {
        return Optional.ofNullable(internalListener);
    }

    /**
     * Starts a file watcher that restarts the server when the watched path changes, provided
     * keystore reload is enabled in the configuration.
     *
     * <p>The watched path is the configured watch location when it is non-empty, otherwise the
     * keystore location. A failure to set up the watcher is logged and not propagated.
     *
     * @param ksqlRestConfig the configuration to read the reload settings from
     */
    private void configureTlsCertReload(KsqlRestConfig ksqlRestConfig) {
        if (!ksqlRestConfig.getBoolean(KsqlRestConfig.SSL_KEYSTORE_RELOAD_CONFIG)) {
            return;
        }
        final Path watchLocation;
        if (ksqlRestConfig.getString(KsqlRestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG).isEmpty()) {
            watchLocation = Paths.get(ksqlRestConfig.getString("ssl.keystore.location"));
        } else {
            watchLocation = Paths.get(ksqlRestConfig.getString(KsqlRestConfig.SSL_KEYSTORE_WATCH_LOCATION_CONFIG));
        }
        try {
            fileWatcher = new FileWatcher(watchLocation, this::restart);
            fileWatcher.start();
            log.info("Enabled SSL cert auto reload for: " + watchLocation);
        } catch (IOException e) {
            log.error("Failed to enable SSL cert auto reload", e);
        }
    }

    /**
     * Builds the HTTP server options for a single listener.
     *
     * @param ksqlRestConfig the configuration to read TLS settings from
     * @param str the host to bind to
     * @param i the port to bind to
     * @param z whether TLS should be configured for this listener
     * @param z2 whether the internal keystore alias and client-auth setting are used
     *     instead of the external ones
     * @return the populated server options
     */
    private static HttpServerOptions createHttpServerOptions(KsqlRestConfig ksqlRestConfig, String str, int i, boolean z, boolean z2) {
        final HttpServerOptions options = new HttpServerOptions()
            .setHost(str)
            .setPort(i)
            .setReuseAddress(true)
            .setReusePort(true)
            .setIdleTimeout(600)
            .setIdleTimeoutUnit(TimeUnit.SECONDS)
            .setPerMessageWebSocketCompressionSupported(true)
            .setPerFrameWebSocketCompressionSupported(true);
        if (!z) {
            return options;
        }
        final String keyStoreAlias;
        final ClientAuth clientAuth;
        if (z2) {
            keyStoreAlias = ksqlRestConfig.getString(KsqlRestConfig.KSQL_SSL_KEYSTORE_ALIAS_INTERNAL_CONFIG);
            clientAuth = ksqlRestConfig.getClientAuthInternal();
        } else {
            keyStoreAlias = ksqlRestConfig.getString(KsqlRestConfig.KSQL_SSL_KEYSTORE_ALIAS_EXTERNAL_CONFIG);
            clientAuth = ksqlRestConfig.getClientAuth();
        }
        setTlsOptions(ksqlRestConfig, options, keyStoreAlias, clientAuth);
        return options;
    }

    /**
     * Enables TLS with ALPN on the given options and applies the key store, trust store,
     * enabled protocols, cipher suites and client-auth setting.
     *
     * <p>Protocols and cipher suites are only overridden when the corresponding configuration
     * list is non-empty.
     *
     * @param ksqlRestConfig the configuration to read TLS settings from
     * @param httpServerOptions the options to modify in place
     * @param str the keystore alias; may be {@code null} or empty
     * @param clientAuth the client authentication mode to set
     */
    private static void setTlsOptions(KsqlRestConfig ksqlRestConfig, HttpServerOptions httpServerOptions, String str, ClientAuth clientAuth) {
        httpServerOptions.setUseAlpn(true);
        httpServerOptions.setSsl(true);

        configureTlsKeyStore(ksqlRestConfig, httpServerOptions, str);
        configureTlsTrustStore(ksqlRestConfig, httpServerOptions);

        final List<String> enabledProtocols = ksqlRestConfig.getList(KsqlRestConfig.SSL_ENABLED_PROTOCOLS_CONFIG);
        if (!enabledProtocols.isEmpty()) {
            httpServerOptions.setEnabledSecureTransportProtocols(new HashSet<>(enabledProtocols));
        }

        final List<String> cipherSuites = ksqlRestConfig.getList(KsqlRestConfig.SSL_CIPHER_SUITES_CONFIG);
        if (!cipherSuites.isEmpty()) {
            // Replace the contents of the set held by the options with the configured suites.
            final Set<String> enabledSuites = httpServerOptions.getEnabledCipherSuites();
            enabledSuites.clear();
            enabledSuites.addAll(cipherSuites);
        }

        httpServerOptions.setClientAuth(clientAuth);
    }

    /**
     * Configures the key store on the given options from the keystore location, password and
     * type in the configuration.
     *
     * <p>Does nothing when no keystore location is configured. When an alias is given, the key
     * store is loaded through {@code KeystoreUtil.getKeyStore} and passed by value; otherwise
     * the options reference the store by path, as JKS or PKCS12 according to the configured
     * type. Any other type leaves the options unchanged.
     *
     * @param ksqlRestConfig the configuration to read keystore settings from
     * @param httpServerOptions the options to modify in place
     * @param str the keystore alias; may be {@code null} or empty
     */
    private static void configureTlsKeyStore(KsqlRestConfig ksqlRestConfig, HttpServerOptions httpServerOptions, String str) {
        final String keyStorePath = ksqlRestConfig.getString("ssl.keystore.location");
        final Password keyStorePassword = ksqlRestConfig.getPassword("ssl.keystore.password");
        if (keyStorePath == null || keyStorePath.isEmpty()) {
            return;
        }
        final String keyStoreType = ksqlRestConfig.getString(KsqlRestConfig.SSL_KEYSTORE_TYPE_CONFIG);
        final boolean hasAlias = str != null && !str.isEmpty();
        if (hasAlias) {
            // The same configured password is supplied for both password arguments.
            final Optional<String> storePassword = Optional.ofNullable(Strings.emptyToNull(keyStorePassword.value()));
            final Optional<String> keyPassword = Optional.ofNullable(Strings.emptyToNull(keyStorePassword.value()));
            final JksOptions aliasedStore = new JksOptions()
                .setValue(KeystoreUtil.getKeyStore(keyStoreType, keyStorePath, storePassword, keyPassword, str))
                .setPassword(keyStorePassword.value());
            httpServerOptions.setKeyStoreOptions(aliasedStore);
            return;
        }
        if (keyStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_JKS)) {
            httpServerOptions.setKeyStoreOptions(
                new JksOptions().setPath(keyStorePath).setPassword(keyStorePassword.value()));
            return;
        }
        if (keyStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_PKCS12)) {
            httpServerOptions.setPfxKeyCertOptions(
                new PfxOptions().setPath(keyStorePath).setPassword(keyStorePassword.value()));
        }
    }

    /**
     * Configures the trust store on the given options from the truststore location, password
     * and type in the configuration.
     *
     * <p>Does nothing when no truststore location is configured, or when the configured type
     * is neither JKS nor PKCS12.
     *
     * @param ksqlRestConfig the configuration to read truststore settings from
     * @param httpServerOptions the options to modify in place
     */
    private static void configureTlsTrustStore(KsqlRestConfig ksqlRestConfig, HttpServerOptions httpServerOptions) {
        final String trustStorePath = ksqlRestConfig.getString("ssl.truststore.location");
        final Password trustStorePassword = ksqlRestConfig.getPassword("ssl.truststore.password");
        if (trustStorePath == null || trustStorePath.isEmpty()) {
            return;
        }
        final String trustStoreType = ksqlRestConfig.getString(KsqlRestConfig.SSL_TRUSTSTORE_TYPE_CONFIG);
        if (trustStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_JKS)) {
            final JksOptions jks = new JksOptions().setPath(trustStorePath).setPassword(trustStorePassword.value());
            httpServerOptions.setTrustStoreOptions(jks);
            return;
        }
        if (trustStoreType.equals(KsqlRestConfig.SSL_STORE_TYPE_PKCS12)) {
            final PfxOptions pfx = new PfxOptions().setPath(trustStorePath).setPassword(trustStorePassword.value());
            httpServerOptions.setPfxTrustOptions(pfx);
        }
    }

    /**
     * Parses and validates the listeners configured under {@code KsqlRestConfig.LISTENERS_CONFIG}.
     *
     * @param ksqlRestConfig the configuration to read the listeners from
     * @return the parsed listener URIs, in configuration order
     */
    private static List<URI> parseListeners(KsqlRestConfig ksqlRestConfig) {
        final List<String> configuredListeners = ksqlRestConfig.getList(KsqlRestConfig.LISTENERS_CONFIG);
        return parseListenerStrings(ksqlRestConfig, configuredListeners);
    }

    /**
     * Parses the internal listener, if one is configured.
     *
     * @param ksqlRestConfig the configuration to read the internal listener from
     * @param list the already parsed regular listeners
     * @return the internal listener URI, or empty when none is configured or when it equals
     *     one of the regular listeners
     */
    private static Optional<URI> parseInternalListener(KsqlRestConfig ksqlRestConfig, List<URI> list) {
        final String configured = ksqlRestConfig.getString(KsqlRestConfig.INTERNAL_LISTENER_CONFIG);
        if (configured == null) {
            return Optional.empty();
        }
        final URI internal = parseListenerStrings(ksqlRestConfig, ImmutableList.of(configured)).get(0);
        if (list.contains(internal)) {
            // Already served by a regular listener, so no separate internal one is needed.
            return Optional.empty();
        }
        return Optional.of(internal);
    }

    /**
     * Parses listener strings into URIs and validates each one.
     *
     * <p>Every listener must use the {@code http} or {@code https} scheme (case-insensitive),
     * and an {@code https} listener additionally requires a non-empty keystore location in
     * the configuration.
     *
     * @param ksqlRestConfig the configuration used to look up the keystore location
     * @param list the listener strings to parse
     * @return a new mutable list of the parsed URIs, in input order
     * @throws ConfigException if a string is not a valid URI (the {@link URISyntaxException}
     *     is attached as the cause), has an unsupported scheme, or is {@code https} without
     *     a keystore
     */
    private static List<URI> parseListenerStrings(KsqlRestConfig ksqlRestConfig, List<String> list) {
        final List<URI> parsed = new ArrayList<>(list.size());
        for (final String listener : list) {
            final URI uri;
            try {
                uri = new URI(listener);
            } catch (URISyntaxException e) {
                // ConfigException has no cause-accepting constructor, so attach it explicitly.
                final ConfigException invalid = new ConfigException("Invalid listener URI: " + listener);
                invalid.initCause(e);
                throw invalid;
            }
            final String scheme = uri.getScheme();
            final boolean https = "https".equalsIgnoreCase(scheme);
            if (!https && !"http".equalsIgnoreCase(scheme)) {
                throw new ConfigException("Invalid URI scheme should be http or https: " + listener);
            }
            if (https) {
                final String keyStoreLocation = ksqlRestConfig.getString("ssl.keystore.location");
                if (keyStoreLocation == null || keyStoreLocation.isEmpty()) {
                    throw new ConfigException("https listener specified but no keystore provided");
                }
            }
            parsed.add(uri);
        }
        return parsed;
    }
}
