package io.confluent.ksql.rest.server;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.configdef.ConfigValidators;
import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.rest.RestConfig;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Configuration of the KSQL REST server.
 *
 * <p>Builds on {@link RestConfig} by registering the KSQL specific server properties declared
 * below, and offers helpers to:
 * <ul>
 *   <li>obtain the original properties, optionally overlaid with the command-topic consumer or
 *       producer overrides, and</li>
 *   <li>work out the URL other KSQL nodes should use to reach this node, see
 *       {@link #getInterNodeListener(Function)}.</li>
 * </ul>
 */
public class KsqlRestConfig extends RestConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KsqlRestConfig.class);

    // Key of the listeners property. It is not defined by this class; presumably it is supplied
    // by baseConfigDef() - confirm against RestConfig.
    private static final String LISTENERS_PROPERTY = "listeners";

    private static final String KSQL_CONFIG_PREFIX = "ksql.";
    private static final String COMMAND_CONSUMER_PREFIX = "ksql.server.command.consumer.";
    private static final String COMMAND_PRODUCER_PREFIX = "ksql.server.command.producer.";

    // Not a compile-time constant (it calls getSimpleName()), so it must stay declared before
    // CONFIG_DEF to be initialised by the time CONFIG_DEF is built.
    private static final String KSQL_SERVER_ERRORS_DOC =
        "A class implementing the " + ErrorMessages.class.getSimpleName() + " interface. "
            + "This allows the KSQL server to return pluggable error messages.";

    public static final String ADVERTISED_LISTENER_CONFIG = "ksql.advertised.listener";
    private static final String ADVERTISED_LISTENER_DOC = "The listener used for communication between KSQL nodes in the cluster, if different to the 'listeners' config property. In IaaS environments, this may need to be different from the interface to which the server binds. If this is not set, the first value from listeners will be used. Unlike listeners, it is not valid to use the 0.0.0.0 (IPv4) or [::] (IPv6) wildcard addresses.";

    static final String STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG = "query.stream.disconnect.check";
    private static final String STREAMED_QUERY_DISCONNECT_CHECK_MS_DOC = "How often to send an empty line as part of the response while streaming queries as JSON; this helps proactively determine if the connection has been terminated in order to avoid keeping the created streams job alive longer than necessary";

    static final String DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG = "ksql.server.command.response.timeout.ms";
    private static final String DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_DOC = "How long to wait for a distributed command to be executed by the local node before returning a response";

    public static final String INSTALL_DIR_CONFIG = "ksql.server.install.dir";
    private static final String INSTALL_DIR_DOC = "The directory that ksql is installed in. This is set in the ksql-server-start script.";

    static final String KSQL_WEBSOCKETS_NUM_THREADS = "ksql.server.websockets.num.threads";
    private static final String KSQL_WEBSOCKETS_NUM_THREADS_DOC = "The number of websocket threads to handle query results";

    static final String KSQL_SERVER_PRECONDITIONS = "ksql.server.preconditions";
    private static final String KSQL_SERVER_PRECONDITIONS_DOC = "A comma separated list of classes implementing KsqlServerPrecondition. The KSQL server will not start serving requests until all preconditions are satisfied. Until that time, requests will return a 503 error";

    static final String KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER = "ksql.server.exception.uncaught.handler.enable";
    private static final String KSQL_SERVER_UNCAUGHT_EXCEPTION_HANDLER_DOC = "Whether or not to set KsqlUncaughtExceptionHandler as the UncaughtExceptionHandler for all threads in the application (this can be overridden). Default is false.";

    public static final String KSQL_HEALTHCHECK_INTERVAL_MS_CONFIG = "ksql.healthcheck.interval.ms";
    private static final String KSQL_HEALTHCHECK_INTERVAL_MS_DOC = "Minimum time between consecutive health check evaluations. Health check queries before the interval has elapsed will receive cached responses.";

    static final String KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS = "ksql.server.command.blocked.threshold.error.ms";
    private static final String KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC = "How long to wait for the command runner to process a command from the command topic before reporting an error metric.";

    static final String KSQL_SERVER_ERROR_MESSAGES = "ksql.server.error.messages";

    public static final String KSQL_HEARTBEAT_ENABLE_CONFIG = "ksql.heartbeat.enable";
    private static final String KSQL_HEARTBEAT_ENABLE_DOC = "Whether the heartbeat mechanism is enabled or not. It is disabled by default.";

    public static final String KSQL_HEARTBEAT_SEND_INTERVAL_MS_CONFIG = "ksql.heartbeat.send.interval.ms";
    private static final String KSQL_HEARTBEAT_SEND_INTERVAL_MS_DOC = "Interval at which heartbeats are broadcasted to servers.";

    public static final String KSQL_HEARTBEAT_CHECK_INTERVAL_MS_CONFIG = "ksql.heartbeat.check.interval.ms";
    private static final String KSQL_HEARTBEAT_CHECK_INTERVAL_MS_DOC = "Interval at which server processes received heartbeats.";

    public static final String KSQL_HEARTBEAT_WINDOW_MS_CONFIG = "ksql.heartbeat.window.ms";
    private static final String KSQL_HEARTBEAT_WINDOW_MS_DOC = "Size of time window across which to count missed heartbeats.";

    // NOTE(review): the key ends in ".ms" although the doc describes a count of heartbeats.
    // The key is left untouched because renaming it would break existing configurations.
    public static final String KSQL_HEARTBEAT_MISSED_THRESHOLD_CONFIG = "ksql.heartbeat.missed.threshold.ms";
    private static final String KSQL_HEARTBEAT_MISSED_THRESHOLD_DOC = "Minimum number of consecutive missed heartbeats that flag a server as down.";

    public static final String KSQL_HEARTBEAT_DISCOVER_CLUSTER_MS_CONFIG = "ksql.heartbeat.discover.interval.ms";
    private static final String KSQL_HEARTBEAT_DISCOVER_CLUSTER_MS_DOC = "Interval at which server attempts to discover what other ksql servers exist in the cluster.";

    public static final String KSQL_HEARTBEAT_THREAD_POOL_SIZE_CONFIG = "ksql.heartbeat.thread.pool.size";
    private static final String KSQL_HEARTBEAT_THREAD_POOL_SIZE_CONFIG_DOC = "Size of thread pool used for sending / processing heartbeats and cluster discovery.";

    public static final String KSQL_LAG_REPORTING_ENABLE_CONFIG = "ksql.lag.reporting.enable";
    private static final String KSQL_LAG_REPORTING_ENABLE_DOC = "Whether lag reporting is enabled or not. It is disabled by default.";

    public static final String KSQL_LAG_REPORTING_SEND_INTERVAL_MS_CONFIG = "ksql.lag.reporting.send.interval.ms";
    private static final String KSQL_LAG_REPORTING_SEND_INTERVAL_MS_DOC = "Interval at which lag reports are broadcasted to servers.";

    /** The base REST definition extended with every KSQL server property declared above. */
    private static final ConfigDef CONFIG_DEF = baseConfigDef()
        .define(
            ADVERTISED_LISTENER_CONFIG,
            ConfigDef.Type.STRING,
            (Object) null,
            ConfigValidators.nullsAllowed(ConfigValidators.validUrl()),
            ConfigDef.Importance.HIGH,
            ADVERTISED_LISTENER_DOC)
        .define(
            STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG,
            ConfigDef.Type.LONG,
            1000L,
            ConfigDef.Importance.LOW,
            STREAMED_QUERY_DISCONNECT_CHECK_MS_DOC)
        .define(
            DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG,
            ConfigDef.Type.LONG,
            5000L,
            ConfigDef.Importance.LOW,
            DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_DOC)
        .define(
            INSTALL_DIR_CONFIG,
            ConfigDef.Type.STRING,
            "",
            ConfigDef.Importance.LOW,
            INSTALL_DIR_DOC)
        .define(
            KSQL_WEBSOCKETS_NUM_THREADS,
            ConfigDef.Type.INT,
            5,
            ConfigDef.Importance.LOW,
            KSQL_WEBSOCKETS_NUM_THREADS_DOC)
        .define(
            KSQL_SERVER_PRECONDITIONS,
            ConfigDef.Type.LIST,
            "",
            ConfigDef.Importance.LOW,
            KSQL_SERVER_PRECONDITIONS_DOC)
        .define(
            KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER,
            ConfigDef.Type.BOOLEAN,
            false,
            ConfigDef.Importance.LOW,
            KSQL_SERVER_UNCAUGHT_EXCEPTION_HANDLER_DOC)
        .define(
            KSQL_HEALTHCHECK_INTERVAL_MS_CONFIG,
            ConfigDef.Type.LONG,
            5000L,
            ConfigDef.Importance.LOW,
            KSQL_HEALTHCHECK_INTERVAL_MS_DOC)
        .define(
            KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS,
            ConfigDef.Type.LONG,
            15000L,
            ConfigDef.Importance.LOW,
            KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS_DOC)
        .define(
            KSQL_SERVER_ERROR_MESSAGES,
            ConfigDef.Type.CLASS,
            DefaultErrorMessages.class,
            ConfigDef.Importance.LOW,
            KSQL_SERVER_ERRORS_DOC)
        .define(
            KSQL_HEARTBEAT_ENABLE_CONFIG,
            ConfigDef.Type.BOOLEAN,
            false,
            ConfigDef.Importance.MEDIUM,
            KSQL_HEARTBEAT_ENABLE_DOC)
        .define(
            KSQL_HEARTBEAT_SEND_INTERVAL_MS_CONFIG,
            ConfigDef.Type.LONG,
            100L,
            ConfigDef.Importance.MEDIUM,
            KSQL_HEARTBEAT_SEND_INTERVAL_MS_DOC)
        .define(
            KSQL_HEARTBEAT_CHECK_INTERVAL_MS_CONFIG,
            ConfigDef.Type.LONG,
            200L,
            ConfigDef.Importance.MEDIUM,
            KSQL_HEARTBEAT_CHECK_INTERVAL_MS_DOC)
        .define(
            KSQL_HEARTBEAT_WINDOW_MS_CONFIG,
            ConfigDef.Type.LONG,
            2000L,
            ConfigDef.Importance.MEDIUM,
            KSQL_HEARTBEAT_WINDOW_MS_DOC)
        .define(
            KSQL_HEARTBEAT_MISSED_THRESHOLD_CONFIG,
            ConfigDef.Type.LONG,
            3L,
            ConfigDef.Importance.MEDIUM,
            KSQL_HEARTBEAT_MISSED_THRESHOLD_DOC)
        .define(
            KSQL_HEARTBEAT_DISCOVER_CLUSTER_MS_CONFIG,
            ConfigDef.Type.LONG,
            2000L,
            ConfigDef.Importance.MEDIUM,
            KSQL_HEARTBEAT_DISCOVER_CLUSTER_MS_DOC)
        .define(
            KSQL_HEARTBEAT_THREAD_POOL_SIZE_CONFIG,
            ConfigDef.Type.INT,
            3,
            ConfigDef.Importance.MEDIUM,
            KSQL_HEARTBEAT_THREAD_POOL_SIZE_CONFIG_DOC)
        .define(
            KSQL_LAG_REPORTING_ENABLE_CONFIG,
            ConfigDef.Type.BOOLEAN,
            false,
            ConfigDef.Importance.MEDIUM,
            KSQL_LAG_REPORTING_ENABLE_DOC)
        .define(
            KSQL_LAG_REPORTING_SEND_INTERVAL_MS_CONFIG,
            ConfigDef.Type.LONG,
            5000L,
            ConfigDef.Importance.MEDIUM,
            KSQL_LAG_REPORTING_SEND_INTERVAL_MS_DOC);

    /**
     * Creates the config from the supplied properties and validates the listeners.
     *
     * @param map the raw configuration properties.
     * @throws KsqlException if no listener is configured.
     */
    public KsqlRestConfig(Map<?, ?> map) {
        super(CONFIG_DEF, map);

        final List<String> listeners = getList(LISTENERS_PROPERTY);
        if (listeners.isEmpty()) {
            throw new KsqlException("listeners must be supplied.  List of listeners. http and https are supported. Each listener must include the protocol, hostname, and port. For example: http://myhost:8080, https://0.0.0.0:8081");
        }

        // Every listener is run through the same URL validator that guards the advertised
        // listener; the validator is expected to reject bad values by throwing.
        for (final String listener : listeners) {
            ConfigValidators.validUrl().ensureValid(LISTENERS_PROPERTY, listener);
        }
    }

    /**
     * Returns the original properties whose keys match the empty prefix.
     *
     * <p>The returned map is mutated by {@link #getPropertiesWithOverrides(String)}, so it is
     * expected to be a fresh, modifiable map on every call (this is what Kafka's
     * {@code AbstractConfig#originalsWithPrefix} documents - confirm for the version in use).
     *
     * @return the original properties.
     */
    public Map<String, Object> getOriginals() {
        return originalsWithPrefix("");
    }

    /**
     * Returns the original properties overlaid with those stored under {@code prefix}.
     *
     * @param prefix the prefix whose properties take precedence over same-named originals.
     * @return the merged properties.
     */
    private Map<String, Object> getPropertiesWithOverrides(String prefix) {
        final Map<String, Object> merged = getOriginals();
        merged.putAll(originalsWithPrefix(prefix));
        return merged;
    }

    /**
     * @return the original properties overlaid with any set under
     *     {@code ksql.server.command.consumer.}.
     */
    public Map<String, Object> getCommandConsumerProperties() {
        return getPropertiesWithOverrides(COMMAND_CONSUMER_PREFIX);
    }

    /**
     * @return the original properties overlaid with any set under
     *     {@code ksql.server.command.producer.}.
     */
    public Map<String, Object> getCommandProducerProperties() {
        return getPropertiesWithOverrides(COMMAND_PRODUCER_PREFIX);
    }

    /**
     * @return the properties to hand to the KSQL config; currently the same as
     *     {@link #getOriginals()}.
     */
    public Map<String, Object> getKsqlConfigProperties() {
        return getOriginals();
    }

    /**
     * Determines the URL other KSQL nodes should use to talk to this node.
     *
     * <p>Uses {@code ksql.advertised.listener} when set, otherwise the first entry of
     * {@code listeners}. Logs the outcome to this class's logger.
     *
     * @param function called with the first listener's URL to obtain the port to advertise when
     *     that listener's port is {@code 0}; not consulted when the advertised listener is set.
     * @return the inter-node listener URL.
     * @throws ConfigException if the chosen value can not be turned into a usable listener.
     */
    public URL getInterNodeListener(Function<URL, Integer> function) {
        return getInterNodeListener(function, LOGGER);
    }

    /**
     * Same as {@link #getInterNodeListener(Function)}, but logging to the supplied logger.
     *
     * @param function resolves the port to advertise when the first listener's port is
     *     {@code 0}.
     * @param logger the logger that receives the informational and warning messages.
     * @return the inter-node listener URL.
     */
    @VisibleForTesting
    URL getInterNodeListener(Function<URL, Integer> function, Logger logger) {
        if (getString(ADVERTISED_LISTENER_CONFIG) == null) {
            return getInterNodeListenerFromFirstListener(function, logger);
        }
        return getInterNodeListenerFromExplicitConfig(logger);
    }

    /**
     * Derives the inter-node listener from the first configured listener.
     *
     * <p>The constructor rejects an empty listeners list, so element {@code 0} always exists.
     * A wildcard host is replaced by the local host's canonical name and a port of {@code 0}
     * by the port supplied by {@code portResolver}.
     *
     * @throws ConfigException if the first listener is not a valid URL or its host can not be
     *     resolved.
     */
    private URL getInterNodeListenerFromFirstListener(Function<URL, Integer> portResolver, Logger logger) {
        final List<String> listeners = getList(LISTENERS_PROPERTY);
        final URL firstListener = parseUrl(listeners.get(0), LISTENERS_PROPERTY);

        final InetAddress address = parseInetAddress(firstListener.getHost())
            .orElseThrow(() -> new ConfigException(LISTENERS_PROPERTY, listeners, "Could not resolve first host"));

        final URL interNodeListener =
            sanitizeInterNodeListener(firstListener, portResolver, address.isAnyLocalAddress());

        logInterNodeListener(logger, interNodeListener, Optional.of(address), "first 'listeners'");
        return interNodeListener;
    }

    /**
     * Derives the inter-node listener from {@code ksql.advertised.listener}.
     *
     * <p>Unlike the first-listener path, a host that can not be resolved is tolerated here:
     * the wildcard check and the loopback warning are simply skipped for it.
     *
     * @throws ConfigException if the value is not a valid URL, has no port greater than zero,
     *     or resolves to a wildcard address.
     */
    private URL getInterNodeListenerFromExplicitConfig(Logger logger) {
        final String configValue = getString(ADVERTISED_LISTENER_CONFIG);
        final URL advertised = parseUrl(configValue, ADVERTISED_LISTENER_CONFIG);

        if (advertised.getPort() <= 0) {
            throw new ConfigException(ADVERTISED_LISTENER_CONFIG, configValue, "Must have valid port");
        }

        final Optional<InetAddress> address = parseInetAddress(advertised.getHost());
        if (address.filter(InetAddress::isAnyLocalAddress).isPresent()) {
            throw new ConfigException(ADVERTISED_LISTENER_CONFIG, configValue, "Can not be wildcard");
        }

        // The port was verified to be positive above, so this resolver is never consulted;
        // it merely satisfies the helper's signature.
        final URL interNodeListener =
            sanitizeInterNodeListener(advertised, ignored -> advertised.getPort(), false);

        logInterNodeListener(logger, interNodeListener, address, "'ksql.advertised.listener'");
        return interNodeListener;
    }

    /**
     * Logs which listener was chosen, preceded by a warning if the resolved address is a
     * loopback or wildcard address.
     *
     * <p>NOTE(review): the messages say "intra-node" although the listener serves inter-node
     * communication. They are left untouched because tests or log scrapers may match on them.
     *
     * @param logger the logger to write to.
     * @param listener the listener that will be used.
     * @param address the resolved address of the configured host, if it could be resolved.
     * @param sourceConfig description of the configuration the listener was derived from.
     */
    private static void logInterNodeListener(Logger logger, URL listener, Optional<InetAddress> address, String sourceConfig) {
        if (address.isPresent()) {
            final InetAddress resolved = address.get();
            if (resolved.isLoopbackAddress()) {
                logger.warn("{} config is set to a loopback address: {}. Intra-node communication will only work between nodes running on the same machine.", sourceConfig, listener);
            }
            if (resolved.isAnyLocalAddress()) {
                logger.warn("{} config uses wildcard address: {}. Intra-node communication will only work between nodes running on the same machine.", sourceConfig, listener);
            }
        }

        logger.info("Using {} config for intra-node communication: {}", sourceConfig, listener);
    }

    /**
     * Builds the URL to advertise from a configured listener, keeping only protocol, host and
     * port.
     *
     * @param listener the configured listener.
     * @param portResolver supplies the port when the listener's port is {@code 0}.
     * @param replaceHost whether to substitute the local host's canonical name for the
     *     listener's host.
     * @return the sanitized URL.
     * @throws KsqlServerException if the resulting URL is malformed or, when the host is
     *     replaced, the local host name can not be determined.
     */
    private static URL sanitizeInterNodeListener(URL listener, Function<URL, Integer> portResolver, boolean replaceHost) {
        final String host = replaceHost ? getLocalHostName() : listener.getHost();

        final int configuredPort = listener.getPort();
        final int port = configuredPort == 0 ? portResolver.apply(listener) : configuredPort;

        try {
            return new URL(listener.getProtocol(), host, port, "");
        } catch (MalformedURLException e) {
            throw new KsqlServerException("Resolved first listener to malformed URL", e);
        }
    }

    /**
     * Parses {@code value} as a URL.
     *
     * @param value the text to parse.
     * @param configName name of the property the value came from, used in the error.
     * @return the parsed URL.
     * @throws ConfigException if the value is not a valid URL; the underlying
     *     {@link MalformedURLException} is attached as its cause.
     */
    private static URL parseUrl(String value, String configName) {
        try {
            return new URL(value);
        } catch (MalformedURLException e) {
            // This ConfigException constructor takes no cause, so attach it afterwards rather
            // than losing the original stack trace.
            final ConfigException invalid = new ConfigException(configName, value, e.getMessage());
            invalid.initCause(e);
            throw invalid;
        }
    }

    /**
     * Resolves a host name or literal IP address.
     *
     * @param host the host to resolve.
     * @return the resolved address, or empty if the host is unknown.
     */
    private static Optional<InetAddress> parseInetAddress(String host) {
        try {
            return Optional.of(InetAddress.getByName(host));
        } catch (UnknownHostException ignored) {
            // Deliberately best-effort: each caller decides whether an unresolvable host is an
            // error.
            return Optional.empty();
        }
    }

    /**
     * @return the canonical host name of the local host.
     * @throws KsqlServerException if the local host can not be determined.
     */
    private static String getLocalHostName() {
        try {
            return InetAddress.getLocalHost().getCanonicalHostName();
        } catch (UnknownHostException e) {
            throw new KsqlServerException("Failed to obtain local host info", e);
        }
    }
}
