package org.apache.kafka.connect.cli;

import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.fips.FipsValidator;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.util.ConnectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/cli/AbstractConnectCli.class */
/**
 * Base class for command-line entry points that start a Kafka Connect worker.
 *
 * <p>{@link #run()} interprets the command-line arguments, loads the worker properties file named
 * by the first argument, and delegates to {@link #startConnect(Map, String...)}, which wires up
 * plugins, the REST server and the {@link Herder} before starting the {@link Connect} instance.
 * Subclasses supply the concrete worker configuration and herder.
 *
 * @param <T> the {@link WorkerConfig} subtype produced by {@link #createConfig(Map)}
 */
public abstract class AbstractConnectCli<T extends WorkerConfig> {
    private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
    private final String[] args;
    private final Time time = Time.SYSTEM;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @param strArr the command-line arguments; the first one is treated by {@link #run()} as the
     *               path of the worker properties file, the rest are passed on as extra arguments
     */
    public AbstractConnectCli(String... strArr) {
        this.args = strArr;
    }

    /**
     * @return the usage text logged by {@link #run()} when no arguments or {@code --help} is given
     */
    protected abstract String usage();

    /**
     * Hook invoked by {@link #startConnect(Map, String...)} after {@code connect.start()} has been
     * attempted. The default implementation does nothing.
     *
     * @param herder  the herder created for this worker
     * @param connect the Connect instance wrapping the herder and REST server
     * @param strArr  the extra command-line arguments (everything after the first argument when
     *                called via {@link #run()})
     */
    protected void processExtraArgs(Herder herder, Connect connect, String[] strArr) {
    }

    /**
     * Creates the herder for this worker.
     *
     * @param t                                   the worker configuration from {@link #createConfig(Map)}
     * @param str                                 the {@code host:port} of the REST server's advertised URL
     * @param plugins                             the plugins instance built from the worker properties
     * @param connectorClientConfigOverridePolicy the policy instantiated from the class named by
     *                                            {@link WorkerConfig#CONNECTOR_CLIENT_POLICY_CLASS_CONFIG}
     * @param restServer                          the already-initialized REST server
     * @param restClient                          the REST client built from the worker configuration
     * @return the herder to run inside {@link Connect}
     */
    protected abstract Herder createHerder(T t, String str, Plugins plugins, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, RestServer restServer, RestClient restClient);

    /**
     * @param map the worker properties
     * @return the worker configuration built from those properties
     */
    protected abstract T createConfig(Map<String, String> map);

    /**
     * Runs the CLI: logs the usage text and calls {@code Exit.exit(1)} when there are no arguments
     * or {@code --help} is present; otherwise loads the worker properties, starts Connect and waits
     * for it to stop. Any {@link Throwable} raised on that path is logged and followed by
     * {@code Exit.exit(2)}.
     */
    public void run() {
        if (this.args.length < 1 || Arrays.asList(this.args).contains("--help")) {
            log.info("Usage: {}", usage());
            Exit.exit(1);
        }
        try {
            String workerPropsFile = this.args[0];
            // An empty first argument means "no properties file": start from an empty property map.
            Map<String, String> workerProps = !workerPropsFile.isEmpty()
                    ? Utils.propsToStringMap(Utils.loadProps(workerPropsFile))
                    : Collections.emptyMap();
            String[] extraArgs = Arrays.copyOfRange(this.args, 1, this.args.length);
            startConnect(workerProps, extraArgs).awaitStop();
        } catch (Throwable th) {
            log.error("Stopping due to error", th);
            Exit.exit(2);
        }
    }

    /**
     * Builds and starts a Connect worker from the given properties.
     *
     * <p>When {@code enable.fips} is set in the worker configuration, the TLS settings under the
     * {@code listeners.https.} prefix, the configured {@code security.protocol} and the advertised
     * REST protocol are passed to the FIPS validator before the REST server is created.
     *
     * <p>If {@code connect.start()} throws, the failure is logged, {@code connect.stop()} is called
     * and {@code Exit.exit(3)} is invoked.
     *
     * @param map    the worker properties
     * @param strArr extra arguments forwarded to
     *               {@link #processExtraArgs(Herder, Connect, String[])}
     * @return the Connect instance that was created
     */
    public Connect startConnect(Map<String, String> map, String... strArr) {
        log.info("Kafka Connect worker initializing ...");
        long initStart = this.time.hiResClockMs();
        new WorkerInfo().logAll();

        log.info("Scanning for plugin classes. This might take a moment ...");
        Plugins plugins = new Plugins(map);
        plugins.compareAndSwapWithDelegatingLoader();
        SecurityUtils.addConfiguredSecurityProviders(map);

        T config = createConfig(map);
        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());

        boolean fipsEnabled = config.getBoolean("enable.fips").booleanValue();
        if (fipsEnabled) {
            FipsValidator fipsValidator = ConfluentConfigs.buildFipsValidator();
            String brokerProtocol = config.getString("security.protocol");
            Map<String, SecurityProtocol> brokerProtocols = new HashMap<>();
            brokerProtocols.put(brokerProtocol, SecurityProtocol.forName(brokerProtocol));
            String restProtocol = ConnectUtils.determineAdvertisedProtocol(config);
            fipsValidator.validateFipsTls(config.valuesWithPrefixAllOrNothing("listeners.https."));
            fipsValidator.validateFipsBrokerProtocol(brokerProtocols);
            fipsValidator.validateRestProtocol(restProtocol);
        }
        // Log the actual setting; a "${...}" placeholder is never expanded inside a Java string.
        log.info("FIPS mode enabled in connect: {}", fipsEnabled);

        RestClient restClient = new RestClient(config);
        ConnectRestServer restServer = new ConnectRestServer(config.rebalanceTimeout(), restClient, config.originals());
        restServer.initializeServer();

        URI advertisedUrl = restServer.advertisedUrl();
        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
        ConnectorClientConfigOverridePolicy overridePolicy = (ConnectorClientConfigOverridePolicy) plugins.newPlugin(
                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                config,
                ConnectorClientConfigOverridePolicy.class);

        Herder herder = createHerder(config, workerId, plugins, overridePolicy, restServer, restClient);
        Connect connect = new Connect(herder, restServer);
        log.info("Kafka Connect worker initialization took {}ms", this.time.hiResClockMs() - initStart);
        try {
            connect.start();
        } catch (Exception e) {
            log.error("Failed to start Connect", e);
            connect.stop();
            Exit.exit(3);
        }
        processExtraArgs(herder, connect, strArr);
        return connect;
    }
}
