package io.confluent.connect.replicator;

import io.airlift.airline.Command;
import io.airlift.airline.Help;
import io.airlift.airline.HelpOption;
import io.airlift.airline.Option;
import io.airlift.airline.ParseArgumentsUnexpectedException;
import io.airlift.airline.ParseOptionMissingException;
import io.airlift.airline.SingleCommand;
import io.confluent.connect.replicator.exec.ExecutableConfigProvider;
import io.confluent.connect.replicator.exec.ReplicatorCli;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.utils.Exit;
import org.apache.zookeeper.audit.AuditConstants;
import org.jose4j.json.internal.json_simple.JSONObject;
import org.jose4j.json.internal.json_simple.parser.JSONParser;
import org.jose4j.json.internal.json_simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line verification tool for a Confluent Replicator configuration.
 *
 * <p>The configuration to verify is supplied in exactly one of three ways: a connector JSON file
 * ({@code --replicator-json}), the three property files used by the Replicator executable
 * ({@code --producer.config}, {@code --consumer.config}, {@code --replication.config}), or a running
 * Connect worker ({@code --connect-url} plus {@code --connector-name}). The outcome of every check
 * that ran is collected in a summary map exposed by {@code getCheckSummary()}.
 */
@Command(name = "replicator-verifier", description = "Verification tool for Confluent Replicator configuration. There are three ways to use this: either   specify --replicator-json to validate replicator configuration as a connector, or    specify --producer.config, --consumer.config, and --replication.config to  validate replicator configuration when running as an 'executable', or    specify --connect-url and --connector-name to validate an already running replicator instance. Use --destination-test-topic to override the (otherwise randomized) topic used during dry run checks at the destination cluster (this topic must already exist). The tool will  check connectivity to source/destination, check Replicator's ability to create/list topics, print out any white-listed/black-listed topics, and list topics covered by any regex in the config.")
public class Verifier {
    private static final Logger log = LoggerFactory.getLogger(Verifier.class);

    /** Airline parser used both to parse the command line and to render the usage text. */
    private static final SingleCommand<Verifier> parser = SingleCommand.singleCommand(Verifier.class);

    /** Mode selected by {@code validate()} from the options that were supplied. */
    private Mode verificationMode;

    /** Connector-style configuration loaded by {@code run()} for the selected mode. */
    private Map<String, String> connectorProperties;

    /** Supplies the topic names of the source cluster. */
    private TopicProvider topicProvider;

    /** Supplies the name of the dry-run test topic when no override option was given. */
    private TestTopicNameProvider testTopicNameProvider;

    /** Topics of the source cluster, fetched once in {@code run()}. */
    private Set<String> srcTopics;

    /** Port used for the Connect worker when the URL does not carry one. */
    private static final int DEFAULT_PORT = 8083;

    @Inject
    public HelpOption helpOption;

    @Option(name = {"--connect-url"}, title = "Connect URL", description = "URL for the Connect instance running connectors")
    private String connectURL;

    @Option(name = {"--connector-name"}, title = "Connector name", description = "Name for the replicator connector")
    private String connectorName;

    @Option(name = {"--replicator-json"}, title = "Replicator JSON config", description = "JSON file containing configuration for replicator running as a connector")
    private String replicatorJson;

    @Option(name = {"--producer.config"}, title = "Replicator executable producer config ", description = "Producer config file for replicator running in executable mode")
    private String replicatorExecProducerConfig;

    @Option(name = {"--consumer.config"}, title = "Replicator executable consumer config", description = "Consumer config file for replicator running in executable mode")
    private String replicatorExecConsumerConfig;

    @Option(name = {"--replication.config"}, title = "Replicator executable config", description = "Replicator config file for replicator running in executable mode")
    private String replicatorExecReplicatorConfig;

    @Option(name = {"--destination-test-topic"}, title = "Destination test topic", description = "Name of destination topic, an override for the randomized topic name used for dry run checks at the destination cluster (this topic must already exist)")
    private String destinationTestTopicName;

    /** Outcome ({@code true} = passed) of every check that has been performed so far. */
    private final Map<ConfigurationCheck, Boolean> checkSummary = new HashMap<>();

    /** Drives the note about the {@code __consumer_timestamps} topic printed after the dry run. */
    private boolean timestampTopicCreatedOnSource = false;

    /**
     * How the Replicator configuration under verification is supplied.
     * {@code validate()} selects one of these from the command-line options.
     */
    public enum Mode {
        // Configuration is fetched from a Connect worker (--connect-url and --connector-name).
        Connector,
        // Configuration is read from a connector JSON file (--replicator-json).
        Json,
        // Configuration is assembled from the three executable property files.
        Executable,
        // Zero or more than one complete set of options was given; nothing is run.
        Invalid
    }

    /**
     * Tells whether a string holds at least one character that {@link String#trim()} keeps.
     *
     * @param str the value to inspect; may be {@code null}
     * @return {@code false} for {@code null}, the empty string and whitespace-only strings
     */
    private static boolean isNonEmpty(String str) {
        if (str == null) {
            return false;
        }
        // An empty string also trims to an empty string, so one test covers both cases.
        return !str.trim().isEmpty();
    }

    /**
     * Derives {@code verificationMode} from the supplied options.
     *
     * <p>A mode counts as requested only when all of its options are non-empty. Exactly one mode
     * must be requested; otherwise the mode becomes {@link Mode#Invalid} and the usage text is
     * printed.
     */
    private void validate() {
        final boolean connectorRequested = isNonEmpty(this.connectorName) && isNonEmpty(this.connectURL);
        final boolean jsonRequested = isNonEmpty(this.replicatorJson);
        final boolean executableRequested = isNonEmpty(this.replicatorExecConsumerConfig)
                && isNonEmpty(this.replicatorExecProducerConfig)
                && isNonEmpty(this.replicatorExecReplicatorConfig);

        final int requestedModes = (connectorRequested ? 1 : 0)
                + (jsonRequested ? 1 : 0)
                + (executableRequested ? 1 : 0);

        if (requestedModes != 1) {
            this.verificationMode = Mode.Invalid;
            Help.help(parser.getCommandMetadata());
            return;
        }

        // Exactly one of the three flags is set here.
        if (connectorRequested) {
            this.verificationMode = Mode.Connector;
        } else if (jsonRequested) {
            this.verificationMode = Mode.Json;
        } else {
            this.verificationMode = Mode.Executable;
        }
    }

    /**
     * Loads the Replicator configuration for the selected mode and runs every check against it:
     * connectivity, topic lists, regex, license, source ACLs and finally the interactive dry run.
     *
     * @throws InvalidConfigurationException if the mode is not runnable or no configuration could
     *         be loaded
     */
    private void run() {
        log.info("Replicator verifier running in {} mode.", this.verificationMode.name());
        switch (this.verificationMode) {
            case Json:
                this.connectorProperties = readConfigFromFile(this.replicatorJson);
                break;
            case Connector:
                this.connectorProperties = retrieveConfigFromConnect(this.connectURL, this.connectorName);
                break;
            case Executable:
                this.connectorProperties = convertExecutableConfig(this.replicatorExecProducerConfig, this.replicatorExecConsumerConfig, this.replicatorExecReplicatorConfig);
                break;
            default:
                throw new InvalidConfigurationException("Invalid verification mode");
        }

        // The loaders report a failure by logging it and handing back an empty map; stop here with
        // a clear message instead of running the checks against a configuration that was never read.
        if (this.connectorProperties == null || this.connectorProperties.isEmpty()) {
            throw new InvalidConfigurationException("No Replicator configuration could be loaded in "
                    + this.verificationMode.name() + " mode; see the errors logged above");
        }

        log.info("Retrieved the following config :-\n");
        // NOTE(review): every value is logged verbatim, which presumably includes credentials and
        // the license key when they are part of the configuration -- consider masking them.
        this.connectorProperties.forEach((key, value) -> log.info("{} => {}", key, value));

        ReplicatorSourceConnectorConfig config = new ReplicatorSourceConnectorConfig(this.connectorProperties);
        checkConnectivity(config.srcAdminClientConfig(), "Source");
        checkConnectivity(config.dstAdminClientConfig(), "Destination");

        // Fetched once, before any check that may create topics on the source cluster.
        this.srcTopics = this.topicProvider.getTopics(config.srcAdminClientConfig());

        checkWhitelistBlacklistTopics(config.getTopics(), config.getBlacklistTopics());
        checkRegexTopics(config.getTopicPattern(), config.getTopics(), config.getBlacklistTopics());
        checkLicense(config);
        checkBasicReadACLs(config.srcAdminClientConfig(), config.getTopics(), this.connectorProperties);
        if (config.getTopicConfigSync() || config.getTopicAutoCreate()) {
            checkDescribeConfigACLs(config.srcAdminClientConfig(), config.getTopics());
        }
        interactiveDryRunChecks(config);
        log.info("All checks completed.");
    }

    /**
     * Asks the operator for permission and then runs the "dry run" checks, which may create topics
     * and consumer groups on the clusters.
     *
     * <p>The operator must answer {@code y} and afterwards type the name of the test topic to
     * confirm that suitable ACLs are in place. Any other answer, or a closed standard input,
     * skips the dry run.
     *
     * @param replicatorSourceConnectorConfig the parsed Replicator configuration being verified
     */
    private void interactiveDryRunChecks(ReplicatorSourceConnectorConfig replicatorSourceConnectorConfig) {
        // Deliberately not closed: closing the scanner would close System.in as well.
        Scanner scanner = new Scanner(System.in, "UTF-8");
        System.out.println("The verifier tool will now begin a number of \"dry run\" checks. These will test the operations performed by replicator and may create topics and consumer groups required to verify these operations. Do you wish to continue with these checks [y/N]?");
        // hasNextLine() guards against a closed or redirected stdin, where nextLine() would throw.
        if (!scanner.hasNextLine() || !"y".equalsIgnoreCase(scanner.nextLine().trim())) {
            return;
        }

        final boolean shouldCreate;
        final String testTopicName;
        if (isNonEmpty(this.destinationTestTopicName)) {
            // The operator named an existing topic; it is modified rather than created.
            shouldCreate = false;
            testTopicName = this.destinationTestTopicName;
        } else {
            shouldCreate = true;
            if (this.testTopicNameProvider == null) {
                this.testTopicNameProvider = new TestTopicNameProvider();
                this.testTopicNameProvider.setClientConfig(replicatorSourceConnectorConfig.dstAdminClientConfig());
            }
            testTopicName = this.testTopicNameProvider.getTestTopicName();
        }
        if (testTopicName == null) {
            log.error("could not determine a non existing topic to create for ACL checks. Terminating..");
            return;
        }

        System.out.println("The verifier tool will " + (shouldCreate ? "create" : "modify") + " topic: " + testTopicName + " on the destination cluster. If necessary please add ACLs for this topic equivalent to those you have added for Replicator. If enabled, the verifier will also commit offsets to the source and destination clusters to confirm offset tracking and offset translation. The group used for this will be: " + replicatorSourceConnectorConfig.getName() + " please create suitable ACLs to allow this. When this is done please enter the full name of the topic: " + testTopicName + " to confirm this topic can be created on the destination and suitable ACLs are in place.");
        if (!scanner.hasNextLine() || !scanner.nextLine().trim().equals(testTopicName)) {
            log.warn("The topic name was not confirmed; skipping the dry run checks.");
            return;
        }

        checkDryRunAcls(replicatorSourceConnectorConfig, testTopicName, shouldCreate);
        if (this.timestampTopicCreatedOnSource) {
            log.info("\n*********************\nNote: the timestamps topic (__consumer_timestamps) was created on the source cluster in order to check permissions.\n*********************\n\n");
        }
        // Only advertise clean-up for a topic this tool was asked to create; an operator-supplied
        // topic existed beforehand and must not be presented as safe to delete.
        if (shouldCreate) {
            log.info("\n*********************\nA topic [" + testTopicName + "] was created for this check. For security, this tool will not delete it; you may now delete it manually.\n*********************\n\n");
            log.info("Please run: `kafka-topics.sh`, with your bootstrap-servers and command-config, and pass in `--delete " + testTopicName + "` as an argument.\n\n");
        }
    }

    /**
     * Decides whether the consumer-timestamps check applies and records whether that check will
     * be working with a {@code __consumer_timestamps} topic that was absent beforehand.
     *
     * @param replicatorSourceConnectorConfig the parsed Replicator configuration being verified
     * @return {@code true} when offset timestamp commits are enabled in the configuration
     */
    private boolean shouldCheckTimestamps(ReplicatorSourceConnectorConfig replicatorSourceConnectorConfig) {
        if (!replicatorSourceConnectorConfig.isOffsetTimestampsCommitEnabled()) {
            return false;
        }
        // srcTopics was captured before any dry-run check ran, so a topic found there already
        // existed. The "was created on the source cluster" note only applies when it was absent.
        // NOTE(review): this assumes the timestamps check creates the topic when it is missing
        // -- confirm against ConsumerTimestampsCheck.
        this.timestampTopicCreatedOnSource = !this.srcTopics.contains("__consumer_timestamps");
        return true;
    }

    /**
     * Runs the dry-run checks that the configuration enables, in a fixed order: destination topic
     * ACLs, source offset commit, destination offset commit, consumer timestamps.
     *
     * @param replicatorSourceConnectorConfig the parsed Replicator configuration being verified
     * @param str name of the test topic the checks operate on
     * @param z {@code true} when the test topic is to be created, {@code false} when it exists
     */
    private void checkDryRunAcls(ReplicatorSourceConnectorConfig replicatorSourceConnectorConfig, String str, boolean z) {
        final ReplicatorSourceConnectorConfig cfg = replicatorSourceConnectorConfig;

        if (cfg.getTopicAutoCreate()) {
            final boolean configSync = cfg.getTopicConfigSync();
            final Map<String, Object> dstAdmin = cfg.dstAdminClientConfig();
            checkDestinationTopicAcls(str, z, configSync, dstAdmin);
        }

        if (cfg.isOffsetTopicCommitEnabled()) {
            final Map<String, Object> srcAdmin = cfg.srcAdminClientConfig();
            final Consumer sourceConsumer = new SimpleConsumerProvider(this.connectorProperties).getConsumer();
            checkSourceOffsetCommit(str, srcAdmin, sourceConsumer, cfg.getName());
        }

        if (cfg.getOffsetTranslatorTasksMax() != 0) {
            final Map<String, Object> dstAdmin = cfg.dstAdminClientConfig();
            checkDestOffsetCommit(str, dstAdmin, cfg.getName());
        }

        // shouldCheckTimestamps also updates timestampTopicCreatedOnSource as a side effect.
        if (shouldCheckTimestamps(cfg)) {
            final Map<String, Object> srcAdmin = cfg.srcAdminClientConfig();
            checkConsumerTimestampsACLs(srcAdmin, this.connectorProperties);
        }
    }

    /**
     * Runs the destination topic ACL check, stores its outcome in the summary and logs it.
     *
     * @param str name of the test topic
     * @param z whether the check should create the topic
     * @param z2 whether topic config syncing is enabled
     * @param map admin client configuration for the destination cluster
     */
    private void checkDestinationTopicAcls(String str, boolean z, boolean z2, Map<String, Object> map) {
        final DestinationTopicAclCheck aclCheck = new DestinationTopicAclCheck();
        aclCheck.setCheckTopic(str)
                .setShouldCreate(z)
                .setSyncing(z2)
                .setClientConfig(map);

        final boolean passed = aclCheck.performCheck();
        this.checkSummary.put(aclCheck, passed);
        if (!passed) {
            log.error("Check for ACLs at Destination cluster failed!");
            return;
        }
        log.info("Check for ACLs at Destination cluster passed.");
    }

    /**
     * Runs the offset commit check tagged "destination", stores its outcome in the summary and
     * logs it.
     *
     * @param str name of the test topic
     * @param map admin client configuration for the destination cluster
     * @param str2 name passed to the check via {@code setName}
     */
    private void checkDestOffsetCommit(String str, Map<String, Object> map, String str2) {
        final ConsumerOffsetCommitCheck commitCheck = new ConsumerOffsetCommitCheck()
                .setCheckTopic(str)
                .setClientConfig(map)
                .setName(str2)
                .setTag("destination");

        final boolean passed = commitCheck.performCheck();
        this.checkSummary.put(commitCheck, passed);
        if (!passed) {
            log.error("Offset commit check failed on destination cluster.");
            return;
        }
        log.info("Offset commit check passed on destination cluster.");
    }

    /**
     * Runs the offset commit check tagged "source", stores its outcome in the summary and logs it.
     *
     * @param str name of the test topic
     * @param map admin client configuration for the source cluster
     * @param consumer consumer handed to the check
     * @param str2 name passed to the check via {@code setName}
     */
    private void checkSourceOffsetCommit(String str, Map<String, Object> map, Consumer consumer, String str2) {
        ConsumerOffsetCommitCheck commitCheck = new ConsumerOffsetCommitCheck()
                .setCheckTopic(str)
                .setClientConfig(map)
                .setConsumer(consumer)
                .setName(str2)
                .setTag("source");
        boolean passed = commitCheck.performCheck();
        this.checkSummary.put(commitCheck, passed);
        // These messages used to say "destination cluster", copied from the destination check.
        if (passed) {
            log.info("Offset commit check passed on source cluster.");
        } else {
            log.error("Offset commit check failed on source cluster.");
        }
    }

    /**
     * Runs the consumer timestamps check, stores its outcome in the summary and logs it.
     *
     * @param map admin client configuration for the source cluster
     * @param map2 connector properties used to build the consumer provider
     */
    private void checkConsumerTimestampsACLs(Map<String, Object> map, Map<String, String> map2) {
        final ConsumerTimestampsCheck timestampsCheck = new ConsumerTimestampsCheck()
                .setAdminClientConfig(map)
                .setConsumerProvider(new SimpleConsumerProvider(map2));

        final boolean passed = timestampsCheck.performCheck();
        this.checkSummary.put(timestampsCheck, passed);
        if (!passed) {
            log.error("Checks for ACLs on the consumer timestamps topic failed !");
            return;
        }
        log.info("Checks for ACLs on the consumer timestamps topic passed.");
    }

    /**
     * Runs the describe ACL check for the given topics, stores its outcome in the summary and
     * logs it.
     *
     * @param map admin client configuration for the source cluster
     * @param set topics handed to the check
     */
    private void checkDescribeConfigACLs(Map<String, Object> map, Set<String> set) {
        final DescribeAclCheck describeCheck = new DescribeAclCheck()
                .setAdminClientConfig(map)
                .setTopics(set);

        final boolean passed = describeCheck.performCheck();
        this.checkSummary.put(describeCheck, passed);
        if (!passed) {
            log.error("Check for DescribeConfig ACLs at source cluster failed!");
            return;
        }
        log.info("Check for DescribeConfig ACLs at source cluster passed.");
    }

    /**
     * Runs the read ACL check for the given topics, stores its outcome in the summary and logs it.
     *
     * @param map admin client configuration for the source cluster
     * @param set topics handed to the check
     * @param map2 connector properties used to build the consumer provider
     */
    private void checkBasicReadACLs(Map<String, Object> map, Set<String> set, Map<String, String> map2) {
        final ReadAclCheck readCheck = new ReadAclCheck()
                .setAdminClientConfig(map)
                .setTopics(set)
                .setConsumerProvider(new SimpleConsumerProvider(map2));

        final boolean passed = readCheck.performCheck();
        this.checkSummary.put(readCheck, passed);
        if (!passed) {
            log.error("Check for ACLs at source cluster failed!");
            return;
        }
        log.info("Check for ACLs at source cluster passed.");
    }

    /**
     * Runs the connectivity check for one cluster, stores its outcome in the summary and logs it.
     *
     * @param map admin client configuration of the cluster to reach
     * @param str tag naming the cluster in the check and in the log messages
     */
    private void checkConnectivity(Map<String, Object> map, String str) {
        final ConnectivityCheck connectivity = new ConnectivityCheck()
                .setAdminConfig(map)
                .setTag(str);

        final boolean reachable = connectivity.performCheck();
        this.checkSummary.put(connectivity, reachable);
        if (!reachable) {
            log.error("Could not verify connectivity to brokers at " + str);
            return;
        }
        log.info("Connectivity check at " + str + " succeeded.");
    }

    /**
     * Runs the regex check against the source topics, stores its outcome in the summary and
     * logs it.
     *
     * @param pattern topic pattern from the configuration
     * @param set whitelisted topics
     * @param set2 blacklisted topics
     */
    private void checkRegexTopics(Pattern pattern, Set<String> set, Set<String> set2) {
        final RegexCheck regexCheck = new RegexCheck()
                .setTopicPattern(pattern)
                .setSourceTopics(this.srcTopics)
                .setWhitelistTopics(set)
                .setBlacklistTopics(set2);

        final boolean passed = regexCheck.performCheck();
        this.checkSummary.put(regexCheck, passed);
        if (!passed) {
            log.error("Regex pattern sanity check failed!");
            return;
        }
        log.info("Regex pattern sanity check passed!");
    }

    /**
     * Runs the whitelist/blacklist check against the source topics, stores its outcome in the
     * summary and logs it.
     *
     * @param set whitelisted topics
     * @param set2 blacklisted topics
     */
    private void checkWhitelistBlacklistTopics(Set<String> set, Set<String> set2) {
        final WhitelistBlacklistCheck listCheck = new WhitelistBlacklistCheck()
                .setSourceTopics(this.srcTopics)
                .setWhitelistTopics(set)
                .setBlacklistTopics(set2);

        final boolean passed = listCheck.performCheck();
        this.checkSummary.put(listCheck, passed);
        if (!passed) {
            log.error("Topic list sanity check failed!");
            return;
        }
        log.info("Topic list sanity check passed!");
    }

    /**
     * Runs the license manager check, stores its outcome in the summary and logs it. On failure
     * the check's help text is logged as well.
     *
     * @param replicatorSourceConnectorConfig the parsed Replicator configuration being verified
     */
    private void checkLicense(ReplicatorSourceConnectorConfig replicatorSourceConnectorConfig) {
        final LicenseManagerCheck licenseCheck = new LicenseManagerCheck();
        licenseCheck
                .setLicenseConfig(new ReplicatorSourceConnector().getLicenseConfigs(replicatorSourceConnectorConfig))
                .setLicenseTopic(replicatorSourceConnectorConfig.getString(ReplicatorSourceConnectorConfig.CONFLUENT_TOPIC_CONFIG))
                .setLicense(replicatorSourceConnectorConfig.getPassword("confluent.license").value());

        final boolean licensed = licenseCheck.performCheck();
        this.checkSummary.put(licenseCheck, licensed);
        if (!licensed) {
            log.warn("Could not validate license through License Manager.");
            log.info(licenseCheck.helpText());
            return;
        }
        log.info("Valid license discovered using License Manager");
    }

    /**
     * Builds a connector configuration from the three property files used by the Replicator
     * executable, via {@code ReplicatorCli} and {@code ExecutableConfigProvider}.
     *
     * @param str path of the producer config file
     * @param str2 path of the consumer config file
     * @param str3 path of the replication config file
     * @return the connector configuration produced by the config provider
     */
    private Map<String, String> convertExecutableConfig(String str, String str2, String str3) {
        final ReplicatorCli cli = new ReplicatorCli();
        cli.producerConfig = str;
        cli.consumerConfig = str2;
        cli.replicationConfig = str3;

        final ExecutableConfigProvider provider = new ExecutableConfigProvider();
        // validate() yields the properties that the provider turns into a connector config.
        provider.setAllProps(cli.validate());
        final Map<String, String> connectorConfig = provider.getConnectorConfig();
        return connectorConfig;
    }

    /**
     * Fetches a connector's configuration from a Connect worker with
     * {@code GET /connectors/<name>}.
     *
     * @param str URL of the Connect worker; a scheme and a port are optional
     * @param str2 name of the connector to look up
     * @return the {@code config} member of the response, or an empty map when the request fails,
     *         the status is not 200, the body cannot be parsed, or the member is missing
     */
    private Map<String, String> retrieveConfigFromConnect(String str, String str2) {
        HttpHost connectHost = resolveConnectHost(str);
        HttpGet request = new HttpGet("/connectors/" + str2);
        // try-with-resources closes the response; a failed request now ends in the catch block
        // instead of dereferencing a null response.
        try (CloseableHttpResponse response = HttpClients.createMinimal().execute(connectHost, request)) {
            int status = response.getStatusLine().getStatusCode();
            if (status != 200) {
                log.error("Unable to retrieve connector info from url: {}, for: {} (HTTP status {})", str, str2, status);
                return new HashMap<>();
            }
            JSONObject connectorInfo = (JSONObject) new JSONParser().parse(EntityUtils.toString(response.getEntity(), "UTF-8"));
            // Compare from the known non-null side so a missing "name" cannot raise an NPE.
            Object reportedName = connectorInfo.get("name");
            if (!str2.equals(reportedName)) {
                log.error("Found inconsistent connector name: expected {}, found {}", str2, reportedName);
            }
            @SuppressWarnings("unchecked")
            Map<String, String> config = (Map<String, String>) connectorInfo.get("config");
            if (config == null) {
                log.error("Response from {} for connector {} contains no \"config\" member", str, str2);
                return new HashMap<>();
            }
            return config;
        } catch (IOException | ParseException e) {
            log.error("Failed to retrieve the config of connector {} from {}", str2, str, e);
            return new HashMap<>();
        }
    }

    /**
     * Turns the user-supplied Connect URL into a host/port/scheme triple. Falls back to treating
     * the whole string as a host name when it cannot be parsed as a URI.
     */
    private HttpHost resolveConnectHost(String url) {
        String trimmed = url.trim();
        // URI parsing needs a scheme to recognise the authority part; http is assumed when absent.
        String withScheme = trimmed.contains("://") ? trimmed : "http://" + trimmed;
        try {
            java.net.URI uri = java.net.URI.create(withScheme);
            if (uri.getHost() != null) {
                int port = uri.getPort() > 0 ? uri.getPort() : DEFAULT_PORT;
                return new HttpHost(uri.getHost(), port, uri.getScheme());
            }
        } catch (IllegalArgumentException e) {
            log.warn("Could not parse Connect URL {}; using it verbatim as the host name", url, e);
        }
        return new HttpHost(url, retrievePortFromURL(url, DEFAULT_PORT));
    }

    /**
     * Reads a connector configuration from a JSON file that has a {@code name} member and a
     * {@code config} object.
     *
     * @param str path of the JSON file, read as UTF-8
     * @return the entries of {@code config} whose values are not null, with each value converted
     *         by {@code toString()}; an empty map when the file cannot be read or parsed or has
     *         no {@code config} object
     */
    private Map<String, String> readConfigFromFile(String str) {
        log.info("Reading in config from: {}", str);
        try (InputStreamReader reader = new InputStreamReader(new FileInputStream(str), "UTF-8")) {
            JSONObject connectorJson = (JSONObject) new JSONParser().parse(reader);
            log.info("Read in json config for connector: {}", connectorJson.get("name"));

            Object configSection = connectorJson.get("config");
            if (!(configSection instanceof Map)) {
                // A missing section used to end in a NullPointerException.
                log.error("File {} does not contain a \"config\" object", str);
                return new HashMap<>();
            }

            Map<String, String> properties = new HashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) configSection).entrySet()) {
                // JSON nulls are dropped rather than turned into the string "null".
                if (entry.getValue() != null) {
                    properties.put(String.valueOf(entry.getKey()), entry.getValue().toString());
                }
            }
            return properties;
        } catch (IOException | ParseException e) {
            // FileNotFoundException is an IOException, so a missing file is handled here too.
            log.error("Could not read Replicator config from {}", str, e);
            return new HashMap<>();
        }
    }

    /**
     * Matches a colon followed by one to five digits that end the string or are followed by the
     * start of a path, query or fragment. Colons inside IPv6 literals or user-info do not match.
     */
    private static final Pattern PORT_PATTERN = Pattern.compile(":(\\d{1,5})(?=[/?#]|$)");

    /**
     * Extracts the port number from a URL-like string.
     *
     * @param str the URL or {@code host:port} string; may be {@code null}
     * @param i the port to return when {@code str} carries no usable port
     * @return the first port found in the range 1-65535, otherwise {@code i}
     */
    public int retrievePortFromURL(String str, int i) {
        if (str == null) {
            return i;
        }
        Matcher matcher = PORT_PATTERN.matcher(str.trim());
        if (!matcher.find()) {
            return i;
        }
        // At most five digits, so parseInt cannot overflow.
        int port = Integer.parseInt(matcher.group(1));
        return port >= 1 && port <= 65535 ? port : i;
    }

    /**
     * Sets the provider used to list the topics of the source cluster.
     *
     * @param topicProvider the provider to use
     */
    public void setTopicProvider(final TopicProvider topicProvider) {
        this.topicProvider = topicProvider;
    }

    /**
     * Command-line entry point: parses the arguments, shows help when requested, runs the
     * verification and logs one summary line per performed check.
     *
     * @param strArr the command-line arguments
     */
    public static void main(String[] strArr) {
        final Verifier verifier;
        try {
            verifier = parser.parse(strArr);
        } catch (ParseArgumentsUnexpectedException | ParseOptionMissingException e) {
            log.error(e.getMessage());
            Help.help(parser.getCommandMetadata());
            Exit.exit(1);
            // Stop here even if Exit.exit returns, so that a verifier that was never created is
            // not dereferenced below.
            return;
        }
        if (verifier.helpOption.showHelpIfRequested()) {
            return;
        }
        verifier.execute();
        verifier.getCheckSummary().forEach((check, passed) -> {
            String outcome = passed.booleanValue() ? "PASSED" : "FAILED\n" + check.helpText();
            log.info(String.format("%1$50s", check.getName()) + " : " + outcome);
        });
    }

    /**
     * Validates the supplied options and, when exactly one verification mode was requested,
     * runs all checks.
     *
     * <p>A default {@code TopicProvider} is created only when none was supplied through
     * {@link #setTopicProvider}, mirroring the lazy default used for the test topic name provider.
     */
    public void execute() {
        // Keep a provider injected through the public setter instead of overwriting it.
        if (this.topicProvider == null) {
            setTopicProvider(new TopicProvider());
        }
        validate();
        if (this.verificationMode == Mode.Invalid) {
            return;
        }
        run();
        log.info("Verifier run complete.");
    }

    /**
     * Returns the outcome of every check performed so far.
     *
     * @return the map held by this verifier (not a copy), from check to {@code true} when it passed
     */
    public Map<ConfigurationCheck, Boolean> getCheckSummary() {
        return checkSummary;
    }

    /**
     * Sets the connector name, the programmatic equivalent of {@code --connector-name}.
     *
     * @param str name of the replicator connector
     */
    public void setConnectorName(final String str) {
        connectorName = str;
    }

    /**
     * Sets the connector JSON file, the programmatic equivalent of {@code --replicator-json}.
     *
     * @param str path of the JSON file holding the replicator configuration
     */
    public void setReplicatorJson(final String str) {
        replicatorJson = str;
    }

    /**
     * Sets the provider of the dry-run test topic name. When none is set, a default provider is
     * created during the interactive dry run.
     *
     * @param testTopicNameProvider the provider to use
     */
    public void setTestTopicNameProvider(final TestTopicNameProvider testTopicNameProvider) {
        this.testTopicNameProvider = testTopicNameProvider;
    }
}
