/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.security.InvalidParameterException;
import java.security.NoSuchAlgorithmException;
import java.security.Provider;
import java.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.crypto.KeyGenerator;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.TopicCreationConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.Crypto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DistributedConfig
extends WorkerConfig {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(DistributedConfig.class);
    // --- Group membership: cluster identity plus session, heartbeat and rebalance timing. ---
    // Each *_CONFIG constant is the property key; the matching *_DOC is the help text passed to config().
    public static final String GROUP_ID_CONFIG = "group.id";
    private static final String GROUP_ID_DOC = "A unique string that identifies the Connect cluster group this worker belongs to.";
    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect worker failures. The worker sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove the worker from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> and <code>group.max.session.timeout.ms</code>.";
    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
    public static final String REBALANCE_TIMEOUT_MS_CONFIG = "rebalance.timeout.ms";
    private static final String REBALANCE_TIMEOUT_MS_DOC = "The maximum allowed time for each worker to join the group once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures.";
    // --- Metadata recovery (rebootstrap) settings. ---
    public static final String METADATA_RECOVERY_STRATEGY_CONFIG = "metadata.recovery.strategy";
    private static final String METADATA_RECOVERY_STRATEGY_DOC = "Controls how the client recovers when none of the brokers known to it is available. If set to <code>none</code>, the client fails. If set to <code>rebootstrap</code>, the client repeats the bootstrap process using <code>bootstrap.servers</code>. Rebootstrapping is useful when a client communicates with brokers so infrequently that the set of brokers may change entirely before the client refreshes metadata. Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. Brokers appear unavailable when disconnected and no current retry attempt is in-progress. Consider increasing <code>reconnect.backoff.ms</code> and <code>reconnect.backoff.max.ms</code> and decreasing <code>socket.connection.setup.timeout.ms</code> and <code>socket.connection.setup.timeout.max.ms</code> for the client. Rebootstrap is also triggered if connection cannot be established to any of the brokers for <code>metadata.recovery.rebootstrap.trigger.ms</code> milliseconds or if server requests rebootstrap.";
    // Default strategy is taken from the common client configuration.
    public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
    public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG = "metadata.recovery.rebootstrap.trigger.ms";
    private static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC = "If a client configured to rebootstrap using <code>metadata.recovery.strategy=rebootstrap</code> is unable to obtain metadata from any of the brokers in the last known metadata for this interval, client repeats the bootstrap process using <code>bootstrap.servers</code> configuration.";
    // 300000 ms, i.e. five minutes.
    public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS = 300000L;
    // --- Worker configuration resynchronization. ---
    public static final String WORKER_SYNC_TIMEOUT_MS_CONFIG = "worker.sync.timeout.ms";
    private static final String WORKER_SYNC_TIMEOUT_MS_DOC = "When the worker is out of sync with other workers and needs to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and waiting a backoff period before rejoining.";
    /** Property key for how long an out-of-sync worker stays out of the cluster before rejoining. */
    public static final String WORKER_UNSYNC_BACKOFF_MS_CONFIG = "worker.unsync.backoff.ms";
    /** Help text for {@link #WORKER_UNSYNC_BACKOFF_MS_CONFIG}; the stray double space after "and" was removed. */
    private static final String WORKER_UNSYNC_BACKOFF_MS_DOC = "When the worker is out of sync with other workers and fails to catch up within worker.sync.timeout.ms, leave the Connect cluster for this long before rejoining.";
    /** Default backoff in milliseconds (300000 ms, i.e. five minutes). */
    public static final int WORKER_UNSYNC_BACKOFF_MS_DEFAULT = 300000;
    // --- Internal storage topics (config, offset, status). ---
    // Prefixes and suffixes; the full property keys below are each a prefix followed by a suffix.
    public static final String CONFIG_STORAGE_PREFIX = "config.storage.";
    public static final String OFFSET_STORAGE_PREFIX = "offset.storage.";
    public static final String STATUS_STORAGE_PREFIX = "status.storage.";
    public static final String TOPIC_SUFFIX = "topic";
    public static final String PARTITIONS_SUFFIX = "partitions";
    public static final String REPLICATION_FACTOR_SUFFIX = "replication.factor";
    // Offset storage topic.
    public static final String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
    private static final String OFFSET_STORAGE_TOPIC_CONFIG_DOC = "The name of the Kafka topic where source connector offsets are stored";
    public static final String OFFSET_STORAGE_PARTITIONS_CONFIG = "offset.storage.partitions";
    private static final String OFFSET_STORAGE_PARTITIONS_CONFIG_DOC = "The number of partitions used when creating the offset storage topic";
    public static final String OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG = "offset.storage.replication.factor";
    private static final String OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used when creating the offset storage topic";
    // Config storage topic; no partitions key is declared for it in this group.
    public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
    private static final String CONFIG_TOPIC_CONFIG_DOC = "The name of the Kafka topic where connector configurations are stored";
    public static final String CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG = "config.storage.replication.factor";
    private static final String CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used when creating the configuration storage topic";
    // Status storage topic.
    public static final String STATUS_STORAGE_TOPIC_CONFIG = "status.storage.topic";
    // NOTE(review): this doc constant is public while its siblings are private; left unchanged for API compatibility.
    public static final String STATUS_STORAGE_TOPIC_CONFIG_DOC = "The name of the Kafka topic where connector and task status are stored";
    public static final String STATUS_STORAGE_PARTITIONS_CONFIG = "status.storage.partitions";
    private static final String STATUS_STORAGE_PARTITIONS_CONFIG_DOC = "The number of partitions used when creating the status storage topic";
    public static final String STATUS_STORAGE_REPLICATION_FACTOR_CONFIG = "status.storage.replication.factor";
    private static final String STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used when creating the status storage topic";
    // --- Connect protocol compatibility. ---
    public static final String CONNECT_PROTOCOL_CONFIG = "connect.protocol";
    public static final String CONNECT_PROTOCOL_DOC = "Compatibility mode for Kafka Connect Protocol";
    // Default is the string form of ConnectProtocolCompatibility.SESSIONED.
    public static final String CONNECT_PROTOCOL_DEFAULT = ConnectProtocolCompatibility.SESSIONED.toString();
    // --- Scheduled rebalance delay. ---
    public static final String SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG = "scheduled.rebalance.max.delay.ms";
    public static final String SCHEDULED_REBALANCE_MAX_DELAY_MS_DOC = "The maximum delay that is scheduled in order to wait for the return of one or more departed workers before rebalancing and reassigning their connectors and tasks to the group. During this period the connectors and tasks of the departed workers remain unassigned";
    // 300 seconds expressed in milliseconds; toIntExact throws if the value does not fit in an int.
    public static final int SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT = Math.toIntExact(TimeUnit.SECONDS.toMillis(300L));
    // --- Inter-worker session key generation. ---
    public static final String INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG = "inter.worker.key.generation.algorithm";
    public static final String INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT = "HmacSHA256";
    public static final String INTER_WORKER_KEY_GENERATION_ALGORITHM_DOC = "The algorithm to use for generating internal request keys. The algorithm 'HmacSHA256' will be used as a default on JVMs that support it; on other JVMs, no default is used and a value for this property must be manually specified in the worker config.";
    public static final String INTER_WORKER_KEY_SIZE_CONFIG = "inter.worker.key.size";
    public static final String INTER_WORKER_KEY_SIZE_DOC = "The size of the key to use for signing internal requests, in bits. If null, the default key size for the key generation algorithm will be used.";
    // Null default; NOTE(review): declared as Long although config() defines this key with Type.INT.
    public static final Long INTER_WORKER_KEY_SIZE_DEFAULT = null;
    public static final String INTER_WORKER_KEY_TTL_MS_CONFIG = "inter.worker.key.ttl.ms";
    // NOTE(review): the "_MS_MS_" in the next two names looks like an accidental repetition; kept as-is because they are public.
    public static final String INTER_WORKER_KEY_TTL_MS_MS_DOC = "The TTL of generated session keys used for internal request validation (in milliseconds)";
    // One hour expressed in milliseconds.
    public static final int INTER_WORKER_KEY_TTL_MS_MS_DEFAULT = Math.toIntExact(TimeUnit.HOURS.toMillis(1L));
    /** Property key selecting the algorithm used to sign internal (inter-worker) requests. */
    public static final String INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG = "inter.worker.signature.algorithm";
    /** Signature algorithm offered as the default. */
    public static final String INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT = "HmacSHA256";
    /**
     * Help text for {@link #INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG}.
     * The two sentences are now separated, and the text names the default algorithm
     * rather than the property key, matching the key generation algorithm's help text.
     */
    public static final String INTER_WORKER_SIGNATURE_ALGORITHM_DOC = "The algorithm used to sign internal requests. The algorithm '" + INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT + "' will be used as a default on JVMs that support it; on other JVMs, no default is used and a value for this property must be manually specified in the worker config.";
    // --- Inter-worker request verification. ---
    public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG = "inter.worker.verification.algorithms";
    // Single-element immutable list of candidate default algorithms.
    public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList("HmacSHA256");
    // The default list is embedded via String.valueOf, so it appears in the text in List.toString() form.
    public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests, which must include the algorithm used for the inter.worker.signature.algorithm property. The algorithm(s) '" + String.valueOf(INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT) + "' will be used as a default on JVMs that provide them; on other JVMs, no default is used and a value for this property must be manually specified in the worker config.";
    /** Property key enabling a pre-emptive end to scheduled rebalance delays. */
    public static final String CONFLUENT_PREEMPTIVE_SCHEDULED_REBALANCE_END_ENABLE = "confluent.preemptive.scheduled.rebalance.end.enable";
    public static final String CONFLUENT_PREEMPTIVE_SCHEDULED_REBALANCE_END_ENABLE_DOC = "Flag to indicate whether pre-emptive end to scheduled rebalance delays are enabled in a connect cluster when workers join back during rebalance. If this config is set to false, then invoking the 'PUT /cluster/rebalance' endpoint will fail.";
    /** Property key for the file holding the secret used in inter-worker authorization headers. */
    public static final String CONFLUENT_REST_REQUEST_AUTHORIZATION_SECRET_PATH_CONFIG = "confluent.rest.request.authorization.secret.path";
    public static final String CONFLUENT_REST_REQUEST_AUTHORIZATION_SECRET_PATH_DOC = "The path to the file containing the secret to be used in the authorization headers for inter-worker communication. If the 'confluent.rest.request.authorization.enable' configuration is set to true, the secret from this file will be used to create authorization headers for the REST client requests.";
    /** Property key toggling authorization headers on REST client requests. */
    public static final String CONFLUENT_REST_REQUEST_AUTHORIZATION_ENABLE_CONFIG = "confluent.rest.request.authorization.enable";
    /**
     * Help text for {@link #CONFLUENT_REST_REQUEST_AUTHORIZATION_ENABLE_CONFIG}.
     * The referenced property name is now separated from the preceding word and quoted,
     * matching how the secret-path help text quotes the enable property.
     */
    public static final String CONFLUENT_REST_REQUEST_AUTHORIZATION_ENABLE_DOC = "Determines if authorization headers are used for the REST client requests. If this configuration is set to true, the REST client will use authorization headers with the secret present at path configured with '" + CONFLUENT_REST_REQUEST_AUTHORIZATION_SECRET_PATH_CONFIG + "'.";
    /** Property key toggling resource aware scheduling of connectors and tasks. */
    public static final String ENABLE_CONNECT_RESOURCE_AWARE_SCHEDULING = "confluent.connect.resource.aware.scheduling.enable";
    /** Resource aware scheduling is off unless explicitly enabled. */
    protected static final boolean ENABLE_CONNECT_RESOURCE_AWARE_SCHEDULING_DEFAULT = false;
    /** Help text; the missing full stop between its two sentences was added. */
    protected static final String ENABLE_CONNECT_RESOURCE_AWARE_SCHEDULING_DOC = "Whether or not to enable resource aware scheduling for connectors and tasks. This is an internal configuration and only meant for use on Confluent Cloud.";
    // --- Metrics store settings; config() registers all of these through defineInternal. ---
    public static final String CONNECT_METRICS_STORE_TYPE_CONFIG = "connect.metrics.store";
    protected static final String CONNECT_METRICS_STORE_TYPE_DEFAULT = "AMP";
    protected static final String CONNECT_METRICS_STORE_TYPE_DOC = "The metrics store implementation to use for the Connect worker.";
    // AMP workspace and reader identity; both default to the empty string.
    public static final String CONNECT_AMP_METRICS_STORE_WORKSPACE_ID = "confluent.connect.amp.metrics.store.workspace.id";
    protected static final String CONNECT_AMP_METRICS_STORE_WORKSPACE_ID_DEFAULT = "";
    public static final String CONNECT_AMP_METRICS_STORE_WORKSPACE_ID_DOC = "The AMP workspace id for the metrics store implementation.";
    public static final String CONNECT_AMP_READER_ARN = "confluent.connect.amp.reader.arn";
    protected static final String CONNECT_AMP_READER_ARN_DEFAULT = "";
    public static final String CONNECT_AMP_READER_ARN_DOC = "The AMP reader ARN which has read access to the AMP workspace and will be assumed by connect workers";
    // Metric names for worker and task load; all default to the empty string.
    public static final String CONNECT_WORKER_CPU_LOAD_METRIC_NAME = "confluent.connect.worker.cpu.load.metric.name";
    protected static final String CONNECT_WORKER_CPU_LOAD_METRIC_NAME_DEFAULT = "";
    public static final String CONNECT_WORKER_CPU_LOAD_METRIC_NAME_DOC = "The metric name for the worker CPU load metric.";
    public static final String CONNECT_WORKER_MEMORY_LOAD_METRIC_NAME = "confluent.connect.worker.memory.load.metric.name";
    protected static final String CONNECT_WORKER_MEMORY_LOAD_METRIC_NAME_DEFAULT = "";
    public static final String CONNECT_WORKER_MEMORY_LOAD_METRIC_NAME_DOC = "The metric name for the worker memory load metric.";
    public static final String CONNECT_TASK_CPU_LOAD_METRIC_NAME = "confluent.connect.task.cpu.load.metric.name";
    protected static final String CONNECT_TASK_CPU_LOAD_METRIC_NAME_DEFAULT = "";
    public static final String CONNECT_TASK_CPU_LOAD_METRIC_NAME_DOC = "The metric name for the task CPU load metric.";
    // Credentials file for the AMP workspace.
    public static final String CONNECT_AMP_CREDENTIALS_FILE_PATH = "confluent.connect.amp.credentials.file.path";
    protected static final String CONNECT_AMP_CREDENTIALS_FILE_PATH_DEFAULT = "";
    public static final String CONNECT_AMP_CREDENTIALS_FILE_PATH_DOC = "The path to the file containing the credentials for the AMP workspace. This file should contain the AWS access key and secret key in properties file format. ";
    // Crypto facade held by this config instance; presumably assigned by a constructor outside this excerpt.
    private final Crypto crypto;
    // --- Exactly-once source support. ---
    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support";
    // Help text; the ENABLED and PREPARING enum constants are embedded via String.valueOf.
    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones.\nTo enable exactly-once source support on a new cluster, set this property to '" + String.valueOf((Object)ExactlyOnceSourceSupport.ENABLED) + "'. To enable support on an existing cluster, first set to '" + String.valueOf((Object)ExactlyOnceSourceSupport.PREPARING) + "' on every worker in the cluster, then set to '" + String.valueOf((Object)ExactlyOnceSourceSupport.ENABLED) + "'. A rolling upgrade may be used for both changes. For more information on this feature, see the <a href=\"https://kafka.apache.org/documentation.html#connect_exactlyoncesource\">exactly-once source support documentation</a>.";
    // Default is the string form of ExactlyOnceSourceSupport.DISABLED.
    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT = ExactlyOnceSourceSupport.DISABLED.toString();
    // Mode read by exactlyOnceSourceEnabled() and transactionalLeaderEnabled().
    private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;

    /**
     * Chooses the default value for {@code inter.worker.key.generation.algorithm}.
     *
     * <p>The candidate default is validated against the supplied {@code crypto}; if validation
     * fails for any reason, the property is left without a default so that the operator has
     * to specify one explicitly.
     *
     * @param crypto crypto facade used to validate the candidate algorithm
     * @return {@link #INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT} when it validates,
     *         otherwise {@link ConfigDef#NO_DEFAULT_VALUE}
     */
    private static Object defaultKeyGenerationAlgorithm(Crypto crypto) {
        try {
            validateKeyAlgorithm(crypto, INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG, INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT);
            return INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT;
        }
        catch (Throwable t) {
            // Best-effort probe: any failure simply means "no default available", so it is reported rather than rethrown.
            log.info("The default key generation algorithm '{}' does not appear to be available on this worker. "
                    + "A key algorithm will have to be manually specified via the '{}' worker property",
                    INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT, INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG);
            return ConfigDef.NO_DEFAULT_VALUE;
        }
    }

    /**
     * Chooses the default value for {@code inter.worker.signature.algorithm}.
     *
     * <p>The candidate default is validated against the supplied {@code crypto}; if validation
     * fails for any reason, the property is left without a default so that the operator has
     * to specify one explicitly.
     *
     * @param crypto crypto facade used to validate the candidate algorithm
     * @return {@link #INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT} when it validates,
     *         otherwise {@link ConfigDef#NO_DEFAULT_VALUE}
     */
    private static Object defaultSignatureAlgorithm(Crypto crypto) {
        try {
            validateSignatureAlgorithm(crypto, INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
            return INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT;
        }
        catch (Throwable t) {
            // Best-effort probe: any failure simply means "no default available", so it is reported rather than rethrown.
            log.info("The default signature algorithm '{}' does not appear to be available on this worker. "
                    + "A signature algorithm will have to be manually specified via the '{}' worker property",
                    INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT, INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG);
            return ConfigDef.NO_DEFAULT_VALUE;
        }
    }

    /**
     * Chooses the default value for {@code inter.worker.verification.algorithms}.
     *
     * <p>Each entry of {@link #INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT} is validated against
     * the supplied {@code crypto}; entries that fail validation are dropped. If none remain,
     * the property is left without a default.
     *
     * @param crypto crypto facade used to validate each candidate algorithm
     * @return the list of candidates that validated, or {@link ConfigDef#NO_DEFAULT_VALUE}
     *         when that list is empty
     */
    private static Object defaultVerificationAlgorithms(Crypto crypto) {
        List<String> result = new ArrayList<>();
        for (String verificationAlgorithm : INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT) {
            try {
                validateSignatureAlgorithm(crypto, INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, verificationAlgorithm);
                result.add(verificationAlgorithm);
            }
            catch (Throwable t) {
                // Best-effort probe: an unusable candidate is skipped, not treated as an error.
                log.trace("Verification algorithm '{}' not found", verificationAlgorithm);
            }
        }
        if (result.isEmpty()) {
            log.info("The default verification algorithm '{}' does not appear to be available on this worker. "
                    + "One or more verification algorithms will have to be manually specified via the '{}' worker property",
                    INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT, INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
            return ConfigDef.NO_DEFAULT_VALUE;
        }
        return result;
    }

    /**
     * Builds the {@link ConfigDef} for a distributed worker.
     *
     * <p>Starts from {@code baseConfigDef()} and appends, in order: group membership and
     * exactly-once settings, client networking and security settings, worker sync settings,
     * the offset/config/status storage topic settings, the Connect protocol and scheduled
     * rebalance settings, the inter-worker key/signature/verification settings, the metadata
     * recovery settings, and finally a set of settings registered via {@code defineInternal}.
     *
     * <p>The {@code crypto} argument is used twice for each inter-worker algorithm setting:
     * once to compute its default and once inside the validator lambda that checks
     * user-supplied values.
     *
     * <p>NOTE(review): several string literals in this statement are split across physical
     * lines, which looks like a line-wrapping artifact of the decompiled text; the statement
     * is left untouched here.
     *
     * @param crypto crypto facade used for inter-worker algorithm defaults and validation
     * @return the assembled configuration definition
     */
    public static ConfigDef config(Crypto crypto) {
        return DistributedConfig.baseConfigDef().define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC).define(SESSION_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, (Object)Math.toIntExact(TimeUnit.SECONDS.toMillis(10L)), ConfigDef.Importance.HIGH, SESSION_TIMEOUT_MS_DOC).define(REBALANCE_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, (Object)Math.toIntExact(TimeUnit.MINUTES.toMillis(1L)), ConfigDef.Importance.HIGH, REBALANCE_TIMEOUT_MS_DOC).define(HEARTBEAT_INTERVAL_MS_CONFIG, ConfigDef.Type.INT, (Object)Math.toIntExact(TimeUnit.SECONDS.toMillis(3L)), ConfigDef.Importance.HIGH, HEARTBEAT_INTERVAL_MS_DOC).define(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, ConfigDef.Type.STRING, (Object)EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT, (ConfigDef.Validator)ConfigDef.CaseInsensitiveValidString.in((String[])Utils.enumOptions(ExactlyOnceSourceSupport.class)), ConfigDef.Importance.HIGH, EXACTLY_ONCE_SOURCE_SUPPORT_DOC).define("metadata.max.age.ms", ConfigDef.Type.LONG, (Object)TimeUnit.MINUTES.toMillis(5L), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.").define("client.id", ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, "An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.").define("send.buffer.bytes", ConfigDef.Type.INT, (Object)131072, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)-1), ConfigDef.Importance.MEDIUM, "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. 
If the value is -1, the OS default will be used.").define("receive.buffer.bytes", ConfigDef.Type.INT, (Object)32768, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)-1), ConfigDef.Importance.MEDIUM, "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.").define("reconnect.backoff.ms", ConfigDef.Type.LONG, (Object)50L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.LOW, "The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. This value is the initial backoff value and will increase exponentially for each consecutive connection failure, up to the <code>reconnect.backoff.max.ms</code> value.").define("reconnect.backoff.max.ms", ConfigDef.Type.LONG, (Object)TimeUnit.SECONDS.toMillis(1L), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.LOW, "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.").define("socket.connection.setup.timeout.ms", ConfigDef.Type.LONG, (Object)CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.LOW, "The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel. 
This value is the initial backoff value and will increase exponentially for each consecutive connection failure, up to the <code>socket.connection.setup.timeout.max.ms</code> value.").define("socket.connection.setup.timeout.max.ms", ConfigDef.Type.LONG, (Object)CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.LOW, "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.").define("retry.backoff.ms", ConfigDef.Type.LONG, (Object)CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.LOW, "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. This value is the initial backoff value and will increase exponentially for each failed request, up to the <code>retry.backoff.max.ms</code> value.").define("retry.backoff.max.ms", ConfigDef.Type.LONG, (Object)CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MAX_MS, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.LOW, "The maximum amount of time in milliseconds to wait when retrying a request to the broker that has repeatedly failed. If provided, the backoff per client will increase exponentially for each failed request, up to this maximum. To prevent all clients from being synchronized upon retry, a randomized jitter with a factor of 0.2 will be applied to the backoff, resulting in the backoff falling within a range between 20% below and 20% above the computed value. 
If <code>retry.backoff.ms</code> is set to be higher than <code>retry.backoff.max.ms</code>, then <code>retry.backoff.max.ms</code> will be used as a constant backoff from the beginning without any exponential increase").define("request.timeout.ms", ConfigDef.Type.INT, (Object)Math.toIntExact(TimeUnit.SECONDS.toMillis(40L)), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.MEDIUM, "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.").define("connections.max.idle.ms", ConfigDef.Type.LONG, (Object)TimeUnit.MINUTES.toMillis(9L), ConfigDef.Importance.MEDIUM, "Close idle connections after the number of milliseconds specified by this config.").define("security.protocol", ConfigDef.Type.STRING, (Object)"PLAINTEXT", (ConfigDef.Validator)ConfigDef.CaseInsensitiveValidString.in((String[])Utils.enumOptions(SecurityProtocol.class)), ConfigDef.Importance.MEDIUM, "Protocol used to communicate with brokers.").withClientSaslSupport().define(WORKER_SYNC_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, (Object)3000, ConfigDef.Importance.MEDIUM, WORKER_SYNC_TIMEOUT_MS_DOC).define(WORKER_UNSYNC_BACKOFF_MS_CONFIG, ConfigDef.Type.INT, (Object)300000, ConfigDef.Importance.MEDIUM, WORKER_UNSYNC_BACKOFF_MS_DOC).define(OFFSET_STORAGE_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, OFFSET_STORAGE_TOPIC_CONFIG_DOC).define(OFFSET_STORAGE_PARTITIONS_CONFIG, ConfigDef.Type.INT, (Object)25, TopicCreationConfig.PARTITIONS_VALIDATOR, ConfigDef.Importance.LOW, OFFSET_STORAGE_PARTITIONS_CONFIG_DOC).define(OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, (Object)3, TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR, ConfigDef.Importance.LOW, OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG_DOC).define(CONFIG_TOPIC_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, CONFIG_TOPIC_CONFIG_DOC).define(CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, (Object)3, TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR, ConfigDef.Importance.LOW, CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG_DOC).define(STATUS_STORAGE_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, STATUS_STORAGE_TOPIC_CONFIG_DOC).define(STATUS_STORAGE_PARTITIONS_CONFIG, ConfigDef.Type.INT, (Object)5, TopicCreationConfig.PARTITIONS_VALIDATOR, ConfigDef.Importance.LOW, STATUS_STORAGE_PARTITIONS_CONFIG_DOC).define(STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, (Object)3, TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR, ConfigDef.Importance.LOW, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC).define(CONNECT_PROTOCOL_CONFIG, ConfigDef.Type.STRING, (Object)CONNECT_PROTOCOL_DEFAULT, (ConfigDef.Validator)ConfigDef.LambdaValidator.with((name, value) -> {
            try {
                ConnectProtocolCompatibility.compatibility((String)value);
            }
            catch (Throwable t) {
                throw new ConfigException(name, value, "Invalid Connect protocol compatibility");
            }
        }, () -> Arrays.stream(ConnectProtocolCompatibility.values()).map(ConnectProtocolCompatibility::toString).collect(Collectors.joining(", ", "[", "]"))), ConfigDef.Importance.LOW, CONNECT_PROTOCOL_DOC).define(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, ConfigDef.Type.INT, (Object)SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT, (ConfigDef.Validator)ConfigDef.Range.between((Number)0, (Number)Integer.MAX_VALUE), ConfigDef.Importance.LOW, SCHEDULED_REBALANCE_MAX_DELAY_MS_DOC).define(INTER_WORKER_KEY_TTL_MS_CONFIG, ConfigDef.Type.INT, (Object)INTER_WORKER_KEY_TTL_MS_MS_DEFAULT, (ConfigDef.Validator)ConfigDef.Range.between((Number)0, (Number)Integer.MAX_VALUE), ConfigDef.Importance.LOW, INTER_WORKER_KEY_TTL_MS_MS_DOC).define(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, DistributedConfig.defaultKeyGenerationAlgorithm(crypto), (ConfigDef.Validator)ConfigDef.LambdaValidator.with((name, value) -> DistributedConfig.validateKeyAlgorithm(crypto, name, (String)value), () -> "Any KeyGenerator algorithm supported by the worker JVM"), ConfigDef.Importance.LOW, INTER_WORKER_KEY_GENERATION_ALGORITHM_DOC).define(INTER_WORKER_KEY_SIZE_CONFIG, ConfigDef.Type.INT, (Object)INTER_WORKER_KEY_SIZE_DEFAULT, ConfigDef.Importance.LOW, INTER_WORKER_KEY_SIZE_DOC).define(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, ConfigDef.Type.STRING, DistributedConfig.defaultSignatureAlgorithm(crypto), (ConfigDef.Validator)ConfigDef.LambdaValidator.with((name, value) -> DistributedConfig.validateSignatureAlgorithm(crypto, name, (String)value), () -> "Any MAC algorithm supported by the worker JVM"), ConfigDef.Importance.LOW, INTER_WORKER_SIGNATURE_ALGORITHM_DOC).define(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, ConfigDef.Type.LIST, DistributedConfig.defaultVerificationAlgorithms(crypto), (ConfigDef.Validator)ConfigDef.LambdaValidator.with((name, value) -> DistributedConfig.validateVerificationAlgorithms(crypto, name, (List)value), () -> "A list of one or more MAC algorithms, each supported by 
the worker JVM"), ConfigDef.Importance.LOW, INTER_WORKER_VERIFICATION_ALGORITHMS_DOC).define(METADATA_RECOVERY_STRATEGY_CONFIG, ConfigDef.Type.STRING, (Object)DEFAULT_METADATA_RECOVERY_STRATEGY, (ConfigDef.Validator)ConfigDef.CaseInsensitiveValidString.in((String[])Utils.enumOptions(MetadataRecoveryStrategy.class)), ConfigDef.Importance.LOW, METADATA_RECOVERY_STRATEGY_DOC).define(METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, ConfigDef.Type.LONG, (Object)300000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC).defineInternal(CONFLUENT_PREEMPTIVE_SCHEDULED_REBALANCE_END_ENABLE, ConfigDef.Type.BOOLEAN, (Object)false, ConfigDef.Importance.MEDIUM, CONFLUENT_PREEMPTIVE_SCHEDULED_REBALANCE_END_ENABLE_DOC).defineInternal(CONFLUENT_REST_REQUEST_AUTHORIZATION_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, (Object)false, ConfigDef.Importance.LOW, CONFLUENT_REST_REQUEST_AUTHORIZATION_ENABLE_DOC).defineInternal(CONFLUENT_REST_REQUEST_AUTHORIZATION_SECRET_PATH_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, CONFLUENT_REST_REQUEST_AUTHORIZATION_SECRET_PATH_DOC).defineInternal(ENABLE_CONNECT_RESOURCE_AWARE_SCHEDULING, ConfigDef.Type.BOOLEAN, (Object)false, ConfigDef.Importance.LOW, ENABLE_CONNECT_RESOURCE_AWARE_SCHEDULING_DOC).defineInternal(CONNECT_METRICS_STORE_TYPE_CONFIG, ConfigDef.Type.STRING, (Object)CONNECT_METRICS_STORE_TYPE_DEFAULT, ConfigDef.Importance.LOW, CONNECT_METRICS_STORE_TYPE_DOC).defineInternal(CONNECT_AMP_METRICS_STORE_WORKSPACE_ID, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, CONNECT_AMP_METRICS_STORE_WORKSPACE_ID_DOC).defineInternal(CONNECT_AMP_READER_ARN, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, CONNECT_AMP_READER_ARN_DOC).defineInternal(CONNECT_WORKER_CPU_LOAD_METRIC_NAME, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, CONNECT_WORKER_CPU_LOAD_METRIC_NAME_DOC).defineInternal(CONNECT_WORKER_MEMORY_LOAD_METRIC_NAME, 
ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, CONNECT_WORKER_MEMORY_LOAD_METRIC_NAME_DOC).defineInternal(CONNECT_TASK_CPU_LOAD_METRIC_NAME, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, CONNECT_TASK_CPU_LOAD_METRIC_NAME_DOC).defineInternal(CONNECT_AMP_CREDENTIALS_FILE_PATH, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, CONNECT_AMP_CREDENTIALS_FILE_PATH_DOC);
    }

    /**
     * Returns the value configured under {@code REBALANCE_TIMEOUT_MS_CONFIG}.
     *
     * @return the configured rebalance timeout as an {@link Integer}
     */
    @Override
    public Integer rebalanceTimeout() {
        Integer timeout = getInt(REBALANCE_TIMEOUT_MS_CONFIG);
        return timeout;
    }

    /**
     * Returns a copy of {@code props} in which {@code expose.internal.connect.endpoints}
     * is set to {@code "true"} unless the caller already supplied a non-null value for it.
     * The map passed in is never modified.
     *
     * @param props the user-supplied worker properties
     * @return a new {@link HashMap} holding {@code props} plus the defaulted entry
     */
    private static Map<String, String> addExposeInternalEndpointConfig(Map<String, String> props) {
        final String key = "expose.internal.connect.endpoints";
        Map<String, String> copy = new HashMap<>(props);
        // A key that is missing or explicitly mapped to null receives the default.
        if (copy.get(key) == null) {
            copy.put(key, "true");
        }
        return copy;
    }

    /**
     * Reports whether the parsed exactly-once source support mode is
     * {@link ExactlyOnceSourceSupport#ENABLED}.
     *
     * @return {@code true} only for the {@code ENABLED} mode
     */
    @Override
    public boolean exactlyOnceSourceEnabled() {
        return ExactlyOnceSourceSupport.ENABLED == exactlyOnceSourceSupport;
    }

    /**
     * Returns the {@code usesTransactionalLeader} flag of the parsed exactly-once
     * source support mode.
     *
     * @return the flag carried by the current {@link ExactlyOnceSourceSupport} value
     */
    public boolean transactionalLeaderEnabled() {
        ExactlyOnceSourceSupport mode = exactlyOnceSourceSupport;
        return mode.usesTransactionalLeader;
    }

    /**
     * Returns the transactional producer ID derived from this worker's group ID.
     *
     * @return the result of {@link #transactionalProducerId(String)} for {@link #groupId()}
     */
    public String transactionalProducerId() {
        String group = groupId();
        return DistributedConfig.transactionalProducerId(group);
    }

    /**
     * Builds a transactional producer ID by prefixing the group ID with
     * {@code "connect-cluster-"}.
     *
     * @param groupId the Connect cluster group ID
     * @return {@code "connect-cluster-"} followed by {@code groupId}
     */
    public static String transactionalProducerId(String groupId) {
        final String prefix = "connect-cluster-";
        return prefix + groupId;
    }

    /**
     * Returns the value configured under {@code OFFSET_STORAGE_TOPIC_CONFIG}.
     *
     * @return the configured offsets topic setting
     */
    @Override
    public String offsetsTopic() {
        String topic = getString(OFFSET_STORAGE_TOPIC_CONFIG);
        return topic;
    }

    /**
     * Always returns {@code true} for this configuration class.
     *
     * @return {@code true}, unconditionally
     */
    @Override
    public boolean connectorOffsetsTopicsPermitted() {
        return true;
    }

    /**
     * Returns the value configured under {@code GROUP_ID_CONFIG}.
     *
     * @return the configured group ID
     */
    @Override
    public String groupId() {
        String group = getString(GROUP_ID_CONFIG);
        return group;
    }

    /**
     * Calls {@code CommonClientConfigs.warnDisablingExponentialBackoff} for this
     * configuration, then delegates to the superclass implementation.
     *
     * @param parsedValues the parsed configuration values
     * @return whatever the superclass post-processing returns
     */
    @Override
    protected Map<String, Object> postProcessParsedConfig(Map<String, Object> parsedValues) {
        CommonClientConfigs.warnDisablingExponentialBackoff(this);
        Map<String, Object> processed = super.postProcessParsedConfig(parsedValues);
        return processed;
    }

    /**
     * Creates a distributed worker configuration that uses {@code Crypto.SYSTEM}.
     *
     * @param props the user-supplied worker properties; not modified
     */
    public DistributedConfig(Map<String, String> props) {
        // The delegate constructor already copies props and applies the
        // expose.internal.connect.endpoints default, so doing it here as well
        // would only allocate a second, identical map.
        this(Crypto.SYSTEM, props);
    }

    DistributedConfig(Crypto crypto, Map<String, String> props) {
        super(DistributedConfig.config(crypto), DistributedConfig.addExposeInternalEndpointConfig(props));
        this.crypto = crypto;
        this.exactlyOnceSourceSupport = ExactlyOnceSourceSupport.fromProperty(this.getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG));
        this.validateInterWorkerKeyConfigs();
    }

    /**
     * Prints the HTML rendering of the distributed worker configuration definition
     * to standard output, with each config name prefixed by {@code connectconfigs_}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String html = DistributedConfig.config(Crypto.SYSTEM).toHtml(4, name -> "connectconfigs_" + name);
        System.out.println(html);
    }

    /**
     * Creates a {@link KeyGenerator} for the algorithm configured under
     * {@code INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG}. When a key size is
     * configured under {@code INTER_WORKER_KEY_SIZE_CONFIG} the generator is
     * initialized with it; when that value is {@code null} initialization is skipped.
     *
     * @return the key generator
     * @throws ConfigException if the algorithm is unavailable or the key size is rejected;
     *         the underlying exception is attached as the cause
     */
    public KeyGenerator getInternalRequestKeyGenerator() {
        String algorithm = this.getString(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG);
        Integer keySize = this.getInt(INTER_WORKER_KEY_SIZE_CONFIG);
        try {
            KeyGenerator result = this.crypto.keyGenerator(algorithm);
            if (keySize != null) {
                result.init(keySize.intValue());
            }
            return result;
        }
        catch (InvalidParameterException | NoSuchAlgorithmException e) {
            ConfigException failure = new ConfigException(String.format("Unable to create key generator with algorithm %s and key size %d: %s", algorithm, keySize, e.getMessage()));
            // ConfigException has no cause-accepting constructor, so attach the
            // original exception explicitly to keep its stack trace.
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Collects the original settings that start with {@code prefix} and strips the
     * entries this class does not pass through: {@code cleanup.policy} and the topic,
     * replication factor, and partitions suffixes. A warning is logged when a
     * partitions setting is present for the config storage prefix, and when a
     * {@code cleanup.policy} setting had a non-null value.
     *
     * @param prefix the storage prefix whose settings are wanted
     * @return the remaining prefixed settings
     */
    private Map<String, Object> topicSettings(String prefix) {
        Map<String, Object> settings = this.originalsWithPrefix(prefix);

        boolean isConfigStorage = CONFIG_STORAGE_PREFIX.equals(prefix);
        if (isConfigStorage && settings.containsKey(PARTITIONS_SUFFIX)) {
            log.warn("Ignoring '{}{}={}' setting, since config topic partitions is always 1", prefix, PARTITIONS_SUFFIX, settings.get(PARTITIONS_SUFFIX));
        }

        Object cleanupPolicy = settings.remove("cleanup.policy");
        if (cleanupPolicy != null) {
            log.warn("Ignoring '{}cleanup.policy={}' setting, since compaction is always used", prefix, cleanupPolicy);
        }

        settings.remove(TOPIC_SUFFIX);
        settings.remove(REPLICATION_FACTOR_SUFFIX);
        settings.remove(PARTITIONS_SUFFIX);
        return settings;
    }

    /**
     * Returns the pass-through topic settings found under {@code CONFIG_STORAGE_PREFIX}.
     *
     * @return the filtered settings for that prefix
     */
    public Map<String, Object> configStorageTopicSettings() {
        Map<String, Object> settings = topicSettings(CONFIG_STORAGE_PREFIX);
        return settings;
    }

    /**
     * Returns the pass-through topic settings found under {@code OFFSET_STORAGE_PREFIX}.
     *
     * @return the filtered settings for that prefix
     */
    public Map<String, Object> offsetStorageTopicSettings() {
        Map<String, Object> settings = topicSettings(OFFSET_STORAGE_PREFIX);
        return settings;
    }

    /**
     * Returns the pass-through topic settings found under {@code STATUS_STORAGE_PREFIX}.
     *
     * @return the filtered settings for that prefix
     */
    public Map<String, Object> statusStorageTopicSettings() {
        Map<String, Object> settings = topicSettings(STATUS_STORAGE_PREFIX);
        return settings;
    }

    /**
     * Validates the inter-worker key settings: first that a key generator can be
     * created from them, then that the signature algorithm is among the
     * verification algorithms. Either step throws {@code ConfigException} on failure.
     */
    private void validateInterWorkerKeyConfigs() {
        // The generator itself is discarded; the call is made only for its validation.
        getInternalRequestKeyGenerator();
        ensureVerificationAlgorithmsIncludeSignatureAlgorithm();
    }

    /**
     * Checks that the algorithm configured under
     * {@code INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG} also appears in the list configured
     * under {@code INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG}.
     *
     * @throws ConfigException if the signature algorithm is not in the verification list
     */
    private void ensureVerificationAlgorithmsIncludeSignatureAlgorithm() {
        String signatureAlgorithm = getString(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG);
        List<String> verificationAlgorithms = getList(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
        if (verificationAlgorithms.contains(signatureAlgorithm)) {
            return;
        }
        String message = String.format("Signature algorithm must be present in %s list", INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
        throw new ConfigException(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, signatureAlgorithm, message);
    }

    /**
     * Validates a list of signature verification algorithms: the list must be
     * non-empty and {@code crypto.mac} must accept every entry.
     *
     * @param crypto the crypto provider used to look up each algorithm
     * @param configName the config name reported in any resulting exception
     * @param algorithms the algorithm names to check
     * @throws ConfigException if the list is empty or an algorithm is not available
     */
    private static void validateVerificationAlgorithms(Crypto crypto, String configName, List<String> algorithms) {
        if (algorithms.isEmpty()) {
            throw new ConfigException(configName, algorithms, "At least one signature verification algorithm must be provided");
        }
        for (String candidate : algorithms) {
            try {
                crypto.mac(candidate);
            }
            catch (NoSuchAlgorithmException ignored) {
                // Replaced by a ConfigException that lists the supported algorithms.
                throw DistributedConfig.unsupportedAlgorithmException(configName, candidate, "Mac");
            }
        }
    }

    /**
     * Validates that {@code crypto.mac} accepts the given signature algorithm.
     *
     * @param crypto the crypto provider used to look up the algorithm
     * @param configName the config name reported in any resulting exception
     * @param algorithm the algorithm name to check
     * @throws ConfigException if the algorithm is not available
     */
    private static void validateSignatureAlgorithm(Crypto crypto, String configName, String algorithm) {
        final String serviceType = "Mac";
        try {
            crypto.mac(algorithm);
        }
        catch (NoSuchAlgorithmException ignored) {
            // Replaced by a ConfigException that lists the supported algorithms.
            throw DistributedConfig.unsupportedAlgorithmException(configName, algorithm, serviceType);
        }
    }

    /**
     * Validates that {@code crypto.keyGenerator} accepts the given key generation algorithm.
     *
     * @param crypto the crypto provider used to look up the algorithm
     * @param configName the config name reported in any resulting exception
     * @param algorithm the algorithm name to check
     * @throws ConfigException if the algorithm is not available
     */
    private static void validateKeyAlgorithm(Crypto crypto, String configName, String algorithm) {
        final String serviceType = "KeyGenerator";
        try {
            crypto.keyGenerator(algorithm);
        }
        catch (NoSuchAlgorithmException ignored) {
            // Replaced by a ConfigException that lists the supported algorithms.
            throw DistributedConfig.unsupportedAlgorithmException(configName, algorithm, serviceType);
        }
    }

    /**
     * Builds (but does not throw) the exception reported for an unavailable algorithm.
     * The message lists the algorithms of the given service type that the installed
     * security providers offer.
     *
     * @param name the config name
     * @param value the rejected config value
     * @param type the security service type, e.g. {@code "Mac"} or {@code "KeyGenerator"}
     * @return the exception for the caller to throw
     */
    private static ConfigException unsupportedAlgorithmException(String name, Object value, String type) {
        Set<String> supported = DistributedConfig.supportedAlgorithms(type);
        String message = "the algorithm is not supported by this JVM; the supported algorithms are: " + supported;
        return new ConfigException(name, value, message);
    }

    /**
     * Collects the algorithm names of every service of the given type offered by
     * the installed security providers.
     *
     * @param type the service type to match, e.g. {@code "Mac"}
     * @return a new mutable set of matching algorithm names; empty if none match
     */
    static Set<String> supportedAlgorithms(String type) {
        return Arrays.stream(Security.getProviders())
            .flatMap(provider -> provider.getServices().stream())
            .filter(service -> type.equals(service.getType()))
            .map(Provider.Service::getAlgorithm)
            .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * The exactly-once source support modes. Each mode carries a flag saying whether
     * it uses a transactional leader.
     */
    public static enum ExactlyOnceSourceSupport {
        /** Does not use a transactional leader. */
        DISABLED(false),
        /** Uses a transactional leader. */
        PREPARING(true),
        /** Uses a transactional leader. */
        ENABLED(true);

        /** Whether this mode uses a transactional leader. */
        public final boolean usesTransactionalLeader;

        ExactlyOnceSourceSupport(boolean usesTransactionalLeader) {
            this.usesTransactionalLeader = usesTransactionalLeader;
        }

        /**
         * Parses a mode from its property form, ignoring case.
         *
         * @param property the property value, e.g. {@code "enabled"}
         * @return the matching mode
         * @throws IllegalArgumentException if no mode has that name
         */
        public static ExactlyOnceSourceSupport fromProperty(String property) {
            String constantName = property.toUpperCase(Locale.ROOT);
            return ExactlyOnceSourceSupport.valueOf(constantName);
        }

        /** Returns the lower-case form of the constant name. */
        @Override
        public String toString() {
            return name().toLowerCase(Locale.ROOT);
        }
    }
}

