package io.confluent.connect.replicator.exec;

import io.airlift.airline.Command;
import io.airlift.airline.Help;
import io.airlift.airline.HelpOption;
import io.airlift.airline.Option;
import io.airlift.airline.ParseArgumentsUnexpectedException;
import io.airlift.airline.ParseOptionMissingException;
import io.airlift.airline.SingleCommand;
import io.airlift.airline.model.OptionMetadata;
import io.confluent.connect.replicator.KafkaConfigs;
import io.confluent.connect.replicator.ReplicatorSourceConnector;
import io.confluent.connect.replicator.ReplicatorSourceConnectorConfig;
import io.confluent.connect.replicator.util.ByteArrayConverter;
import io.confluent.license.util.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Command(name = ReplicatorCli.COMMAND_NAME, description = "Replicator by Confluent.")
/* loaded from: input_file:io/confluent/connect/replicator/exec/ReplicatorCli.class */
public class ReplicatorCli {
    private static final String COMMAND_NAME = "replicator";

    @Inject
    public HelpOption helpOption;

    @Option(name = {"--cluster.id"}, title = "Replicator Cluster Id", description = "Specifies the unique identifier for the Replicator cluster.", required = true)
    public String clusterId;

    @Option(name = {"--producer.config"}, title = "producer.properties", description = "Specifies the location of the file that contains the configuration settings for the producer writing to the destination cluster.", required = true)
    public String producerConfig;

    @Option(name = {"--consumer.config"}, title = "consumer.properties", description = "Specifies the location of the file that contains the configuration settings for the consumer reading from the origin cluster.", required = true)
    public String consumerConfig;

    @Option(name = {"--cluster.threads"}, title = "Total Replicator threads", description = "The total number of threads across all workers in the Replicator cluster. If this command starts another Replicator worker in an existing cluster, this can be used to change the number of threads in the whole cluster.")
    public Integer clusterThreads;

    @Option(name = {"--confluent.license"}, title = "Confluent License Key", description = "Your Confluent license key that enables you to use Replicator. Without the license key, you can use Replicator for a 30-day trial period. If you are a subscriber, please contact Confluent Support for more information.")
    public String confluentLicense;

    @Option(name = {"--topic.regex"}, title = "Regular Expression to Match Topics for Replication", description = "A regular expression that matches the names of the topics to be replicated. Any topic that matches this expression (or is listed in the whitelist) and not in the blacklist will be replicated.")
    public String topicRegex;

    @Option(name = {"--whitelist"}, title = "Topic Whitelist", description = "A comma-separated list of the names of topics that should be replicated. Any topic that is in this list and not in the blacklist will be replicated.")
    public String whitelist;

    @Option(name = {"--blacklist"}, title = "Topic Blacklist", description = "A comma-separated list of topics that should not be replicated, even if they are included in the whitelist or matched by the regular expression.")
    public String blacklist;

    @Option(name = {"--topic.rename.format"}, title = "Rename Format", description = "A format string for the topic name in the destination cluster, which may contain ${topic} as a placeholder for the originating topic name. For example, ${topic}_dc1 for the topic 'orders' will map to the destination topic name 'orders_dc1.' Can be placed inside the file specified by --replication.config.")
    public String topicRenameFormat;

    @Option(name = {"--topic.auto.create"}, title = "Auto-create Topics on Destination", description = "Whether to automatically create topics in the destination cluster if required.", arity = 1)
    public Boolean topicAutoCreate;

    @Option(name = {"--topic.preserve.partitions"}, title = "Auto-create Topics on Destination", description = "Whether to automatically increase the number of partitions in the destination cluster to match the source cluster and ensure that messages replicated from the source cluster use the same partition in the destination cluster.", arity = 1)
    public Boolean topicPreservePartitions;

    @Option(name = {"--topic.config.sync"}, title = "Sync Topic Configs", description = "Whether to periodically sync topic configuration to the destination cluster.", arity = 1)
    public Boolean topicConfigSync;

    @Option(name = {"--topic.timestamp.type"}, title = "Topic Timestamp Type", description = "The timestamp type for the topics in the destination cluster.")
    public Long topicTimestampType;

    @Option(name = {"--topic.poll.interval.ms"}, title = "Topic Config Sync Interval (ms)", description = "Specifies how frequently to poll the source cluster for new topics matching the whitelist or regular expression. Can also be read from the file given by --replication.config.")
    public Long topicPollIntervalMs;

    @Option(name = {"--topic.create.backoff.ms"}, title = "Topic Creation Backoff (ms)", description = "Time to wait before retrying auto topic creation or expansion.")
    public Long topicCreateBackoffMs;

    @Option(name = {"--topic.config.sync.interval.ms"}, title = "Topic Config Sync Interval (ms)", description = "How often to check for configuration changes when 'topic.config.sync' is enabled.", allowedValues = {"CreateTime", "LogAppendTime"})
    public Long topicConfigSyncIntervalMs;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ReplicatorCli.class);
    private static final List<String> WARN_ON_DEFAULTS = Arrays.asList(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, DistributedConfig.CONFIG_TOPIC_CONFIG);
    private static final List<String> REPLICATION_CONFIG_BLACKLIST_PREFIXES = Arrays.asList(KafkaConfigs.SRC_CONSUMER_PREFIX, KafkaConfigs.KafkaCluster.SOURCE.prefix(), KafkaConfigs.KafkaCluster.DESTINATION.prefix());

    @Option(name = {"--replication.config"}, title = "replication.properties", description = "Specifies the location of the file that contains the configuration settings for replication. When used, any property in this file can be overridden via a command line parameter. When this is not supplied, all of the properties defining how topics are to be replicated should be specified on the command line.")
    public String replicationConfig = "";

    @Option(name = {"--producer.monitoring.config"}, title = "producer-monitoring.properties", description = "Specifies the location of the file that contains the producer settings for the Kafka cluster where monitoring information about the Replicator producer is to be sent. This must be specified if monitoring is to be enabled, but may point to a different Kafka cluster than the origin or destination clusters. Use the same file as --producer-config to write metrics to the destination cluster.")
    public String producerMonitoringConfig = "";

    @Option(name = {"--consumer.monitoring.config"}, title = "consumer-monitoring.properties", description = "Specifies the location of the file that contains the producer settings for the Kafka cluster where monitoring information about the Replicator consumer is to be sent. This must be specified if monitoring is to be enabled, but may point to a different Kafka cluster than the origin or destination clusters. Use the same file as `--producer-config` to write metrics to the destination cluster.")
    public String consumerMonitoringConfig = "";
    private final Map<String, String> allProps = new HashMap();
    private final SingleCommand<ReplicatorCli> cmd = SingleCommand.singleCommand(ReplicatorCli.class);
    private final Map<String, OptionMetadata> options = new HashMap();

    public ReplicatorCli() {
        for (OptionMetadata optionMetadata : this.cmd.getCommandMetadata().getAllOptions()) {
            Iterator<String> it = optionMetadata.getOptions().iterator();
            while (it.hasNext()) {
                this.options.put(it.next(), optionMetadata);
            }
        }
    }

    public Map<String, String> parse(String[] strArr) {
        try {
            ReplicatorCli parse = this.cmd.parse(strArr);
            if (parse.helpOption.showHelpIfRequested()) {
                Exit.exit(0);
            }
            return parse.validate();
        } catch (ParseArgumentsUnexpectedException | ParseOptionMissingException e) {
            log.error(e.getMessage());
            try {
                Help.help(this.cmd.getCommandMetadata());
            } catch (Exception e2) {
                log.error("Unable to print usage: ", (Throwable) e2);
            }
            Exit.exit(1);
            return Collections.emptyMap();
        } catch (Exception e3) {
            log.error("Error running {}", COMMAND_NAME, e3);
            Exit.exit(1);
            return Collections.emptyMap();
        }
    }

    public Map<String, String> validate() {
        log.info("Kafka Connect distributed worker initializing ...");
        new WorkerInfo().logAll();
        warnOnDefaults(this.producerConfig, this.consumerConfig, this.replicationConfig);
        errorOnBlacklistConfig(this.replicationConfig);
        this.allProps.clear();
        loadRequiredDefaults();
        this.allProps.putAll(parseAndValidatePropsFile(this.producerConfig, "producer."));
        this.allProps.putAll(parseAndValidatePropsFile(this.consumerConfig, KafkaConfigs.SRC_CONSUMER_PREFIX));
        this.allProps.putAll(parseAndValidatePropsFile(this.producerMonitoringConfig, "producer.confluent.monitoring.interceptor."));
        this.allProps.putAll(parseAndValidatePropsFile(this.consumerMonitoringConfig, "consumer.confluent.monitoring.interceptor."));
        Map<String, String> extractAndLoadProperties = extractAndLoadProperties("producer.", "");
        ReplicatorSourceConnector.filterOutInterceptorPropertiesInPlace(extractAndLoadProperties);
        this.allProps.putAll(extractAndLoadProperties);
        this.allProps.putAll(ReplicatorSourceConnector.filterOutInterceptorPropertiesInPlace(extractAndLoadProperties("producer.", KafkaConfigs.KafkaCluster.DESTINATION.prefix())));
        this.allProps.putAll(ReplicatorSourceConnector.filterOutInterceptorPropertiesInPlace(extractAndLoadProperties(KafkaConfigs.SRC_CONSUMER_PREFIX, KafkaConfigs.KafkaCluster.SOURCE.prefix())));
        this.allProps.putAll(parseAndValidatePropsFile(this.replicationConfig));
        this.allProps.putIfAbsent(DistributedConfig.CONNECT_PROTOCOL_CONFIG, "eager");
        parseAndValidateSingularCommandLineOptions();
        return this.allProps;
    }

    private void errorOnBlacklistConfig(String str) {
        ArrayList arrayList = new ArrayList();
        for (String str2 : parseAndValidatePropsFile(str).keySet()) {
            Iterator<String> it = REPLICATION_CONFIG_BLACKLIST_PREFIXES.iterator();
            while (it.hasNext()) {
                if (str2.startsWith(it.next())) {
                    arrayList.add(str2);
                }
            }
        }
        if (!arrayList.isEmpty()) {
            throw new InvalidConfigurationException("The following invalid configurations were found in " + str + ": " + arrayList + ". These configurations should be provided in --consumer.config and --producer.config instead.");
        }
    }

    private void warnOnDefaults(String... strArr) {
        for (String str : WARN_ON_DEFAULTS) {
            boolean z = true;
            for (String str2 : strArr) {
                if (parseAndValidatePropsFile(str2, "").containsKey(str)) {
                    z = false;
                }
            }
            if (z) {
                log.warn("Property: " + str + " is configured to it's default value. Please consider overriding this to avoid conflicts between replicator instances.");
            }
        }
    }

    protected Map<String, String> parseAndValidatePropsFile(String str) {
        return parseAndValidatePropsFile(str, null);
    }

    protected Map<String, String> parseAndValidatePropsFile(String str, String str2) {
        try {
            if (str.isEmpty()) {
                return Collections.emptyMap();
            }
            Map<String, String> propsToStringMap = Utils.propsToStringMap(Utils.loadProps(str));
            if (!StringUtils.isNotBlank(str2)) {
                return propsToStringMap;
            }
            HashMap hashMap = new HashMap();
            for (Map.Entry<String, String> entry : propsToStringMap.entrySet()) {
                hashMap.put(str2 + entry.getKey(), entry.getValue());
            }
            return hashMap;
        } catch (IOException e) {
            throw new ConnectException("Unable to read properties from file: " + str);
        }
    }

    protected Map<String, String> extractAndLoadProperties(String str, String str2) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, String> entry : this.allProps.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(str)) {
                if (StringUtils.isNotBlank(str2)) {
                    hashMap.put(str2 + key.substring(str.length()), entry.getValue());
                } else {
                    hashMap.put(key.substring(str.length()), entry.getValue());
                }
            }
        }
        return hashMap;
    }

    protected void loadRequiredDefaults() {
        maybeOverride("internal.key.converter.schemas.enable", false, this.allProps);
        maybeOverride("internal.value.converter.schemas.enable", false, this.allProps);
        maybeOverride(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets", this.allProps);
        maybeOverride(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs", this.allProps);
        maybeOverride(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connect-status", this.allProps);
        maybeOverride("name", COMMAND_NAME, this.allProps);
        maybeOverride("connector.class", ReplicatorSourceConnector.class.getName(), this.allProps);
        maybeOverride("header.converter", ByteArrayConverter.class.getName(), this.allProps);
        maybeOverride("key.converter", ByteArrayConverter.class.getName(), this.allProps);
        maybeOverride("value.converter", ByteArrayConverter.class.getName(), this.allProps);
    }

    protected void parseAndValidateSingularCommandLineOptions() {
        this.allProps.put("group.id", this.clusterId);
        maybeOverride(ConnectorConfig.TASKS_MAX_CONFIG, this.clusterThreads, this.allProps);
        maybeOverride("confluent.license", this.confluentLicense, this.allProps);
        maybeOverride("topic.regex", this.topicRegex, this.allProps);
        maybeOverride("topic.whitelist", this.whitelist, this.allProps);
        maybeOverride("topic.blacklist", this.blacklist, this.allProps);
        maybeOverride(ReplicatorSourceConnectorConfig.TOPIC_RENAME_FORMAT_CONFIG, this.topicRenameFormat, this.allProps);
        maybeOverride(ReplicatorSourceConnectorConfig.TOPIC_AUTO_CREATE_CONFIG, this.topicAutoCreate, this.allProps);
        maybeOverride(ReplicatorSourceConnectorConfig.TOPIC_PRESERVE_PARTITIONS_CONFIG, this.topicPreservePartitions, this.allProps);
        maybeOverride(ReplicatorSourceConnectorConfig.TOPIC_CONFIG_SYNC_CONFIG, this.topicConfigSync, this.allProps);
        maybeOverride(ReplicatorSourceConnectorConfig.TOPIC_TIMESTAMP_TYPE_CONFIG, this.topicTimestampType, this.allProps);
        maybeOverride(ReplicatorSourceConnectorConfig.TOPIC_POLL_INTERVAL_MS_CONFIG, this.topicPollIntervalMs, this.allProps);
        maybeOverride(ReplicatorSourceConnectorConfig.TOPIC_CREATE_BACKOFF_MS_CONFIG, this.topicCreateBackoffMs, this.allProps);
        maybeOverride(ReplicatorSourceConnectorConfig.TOPIC_CONFIG_SYNC_INTERVAL_MS_CONFIG, this.topicConfigSyncIntervalMs, this.allProps);
    }

    private static <T> void maybeOverride(String str, T t, Map<String, String> map) {
        if (t != null) {
            map.put(str, String.valueOf(t));
        }
    }
}
