package io.confluent.controlcenter;

import com.github.zafarkhaja.semver.Version;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import io.confluent.command.CommandStore;
import io.confluent.controlcenter.KafkaHelper;
import io.confluent.controlcenter.alert.AlertManager;
import io.confluent.controlcenter.alert.SenderModule;
import io.confluent.controlcenter.command.CommandMigrator;
import io.confluent.controlcenter.command.CommandModule;
import io.confluent.controlcenter.data.ClusterMetadataInitializer;
import io.confluent.controlcenter.data.ConsumerOffsetsFetcher;
import io.confluent.controlcenter.data.ConsumerOffsetsModule;
import io.confluent.controlcenter.healthcheck.HealthCheck;
import io.confluent.controlcenter.healthcheck.HealthCheckModule;
import io.confluent.controlcenter.kafka.ClusterManagementModule;
import io.confluent.controlcenter.license.LicenseModule;
import io.confluent.controlcenter.rest.ControlCenterApplication;
import io.confluent.controlcenter.rest.ControlCenterRestModule;
import io.confluent.controlcenter.rest.RestModule;
import io.confluent.controlcenter.serialization.SerializationModule;
import io.confluent.controlcenter.servicehealthcheck.ServiceHealthCheckModule;
import io.confluent.controlcenter.streams.KafkaStreamsManager;
import io.confluent.controlcenter.streams.StreamsConfigModule;
import io.confluent.controlcenter.streams.StreamsModule;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.controlcenter.streams.verify.MonitoringHeartbeatSender;
import io.confluent.monitoring.common.TimeBucket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/controlcenter/ControlCenter.class */
public class ControlCenter {
    private static final Logger log = LoggerFactory.getLogger(ControlCenter.class);
    private static final String TOPICS_FOR_TASKS_OPT = "runTasksFor";

    public static void main(String[] strArr) throws Exception {
        Options options = new Options();
        options.addOption(Option.builder(TOPICS_FOR_TASKS_OPT).hasArg().longOpt("run-tasks-for").desc("Comma-separated list of topics. Run only tasks whose input topics include this set.").build());
        CommandLine parse = new DefaultParser().parse(options, strArr);
        if (parse.getArgList().size() == 0) {
            System.err.println("You must provide a path to the config file");
            System.exit(1);
        }
        ControlCenterConfig controlCenterConfig = new ControlCenterConfig((String) parse.getArgList().get(0));
        boolean z = false;
        ImmutableSet immutableSet = null;
        if (parse.hasOption(TOPICS_FOR_TASKS_OPT)) {
            z = true;
            immutableSet = ImmutableSet.copyOf(parse.getOptionValue(TOPICS_FOR_TASKS_OPT).trim().split(","));
        }
        Injector makeInjector = makeInjector(controlCenterConfig, z, immutableSet);
        Version version = (Version) makeInjector.getInstance(Key.get(Version.class));
        final KafkaStreamsManager kafkaStreamsManager = (KafkaStreamsManager) makeInjector.getInstance(KafkaStreamsManager.class);
        final CommandStore commandStore = (CommandStore) makeInjector.getInstance(CommandStore.class);
        CommandMigrator commandMigrator = (CommandMigrator) makeInjector.getInstance(CommandMigrator.class);
        final MonitoringHeartbeatSender monitoringHeartbeatSender = (MonitoringHeartbeatSender) makeInjector.getInstance(MonitoringHeartbeatSender.class);
        ClusterMetadataInitializer clusterMetadataInitializer = (ClusterMetadataInitializer) makeInjector.getInstance(ClusterMetadataInitializer.class);
        final ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setNameFormat("control-center-heartbeat-%d").build());
        Runtime.getRuntime().addShutdownHook(new Thread("control-center-shutdown-hook") { // from class: io.confluent.controlcenter.ControlCenter.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                ControlCenter.log.info("Shutting down due to shutdown hook signal");
                ControlCenter.shutdown(newScheduledThreadPool, monitoringHeartbeatSender, kafkaStreamsManager, commandStore);
            }
        });
        try {
            try {
                try {
                    HealthCheck healthCheck = (HealthCheck) makeInjector.getInstance(HealthCheck.class);
                    AlertManager alertManager = (AlertManager) makeInjector.getInstance(AlertManager.class);
                    ConsumerOffsetsFetcher consumerOffsetsFetcher = (ConsumerOffsetsFetcher) makeInjector.getInstance(ConsumerOffsetsFetcher.class);
                    log.info("Starting Control Center version={}", version);
                    if (!((KafkaHelper.ControlCenterPreconditions) makeInjector.getInstance(KafkaHelper.ControlCenterPreconditions.class)).call().booleanValue()) {
                        log.info("Shutting down due to unmet preconditions");
                        Runtime.getRuntime().exit(1);
                        shutdown(newScheduledThreadPool, monitoringHeartbeatSender, kafkaStreamsManager, commandStore);
                        return;
                    }
                    log.info("action=starting topology=command");
                    commandStore.start(controlCenterConfig.getLong(ControlCenterConfig.CONTROL_CENTER_COMMAND_STREAMS_START_TIMEOUT).longValue());
                    log.info("action=started topology=command");
                    if (!controlCenterConfig.getBoolean(ControlCenterConfig.ACTIVE_CONTROLLER_COUNT_TRIGGER_ENABLED).booleanValue()) {
                        log.info("action=starting operation=command-migration ");
                        commandMigrator.migrate();
                        log.info("action=completed operation=command-migration");
                    }
                    ControlCenterApplication controlCenterApplication = (ControlCenterApplication) makeInjector.getInstance(ControlCenterApplication.class);
                    log.info("action=starting topology=monitoring");
                    kafkaStreamsManager.start(controlCenterConfig.getLong(ControlCenterConfig.CONTROL_CENTER_INTERNAL_STREAMS_START_TIMEOUT).longValue());
                    log.info("action=started topology=monitoring");
                    clusterMetadataInitializer.call();
                    if (controlCenterConfig.getBoolean(ControlCenterConfig.CONTROL_CENTER_ALERT_CLUSTER_DOWN_AUTOCREATE).booleanValue()) {
                        log.info("Auto creating trigger and action for cluster down alerts");
                        alertManager.autoCreateClusterDownTriggerAndAction(clusterMetadataInitializer.getBootstrapClusterId());
                    }
                    log.info("Starting Health Check");
                    newScheduledThreadPool.scheduleAtFixedRate(healthCheck, 0L, 1L, TimeUnit.MINUTES);
                    log.info("Starting Alert Manager");
                    newScheduledThreadPool.scheduleAtFixedRate(alertManager, 0L, 1L, TimeUnit.MINUTES);
                    log.info("Starting Consumer Offsets Fetch");
                    newScheduledThreadPool.scheduleAtFixedRate(consumerOffsetsFetcher, 0L, 5L, TimeUnit.SECONDS);
                    newScheduledThreadPool.scheduleWithFixedDelay(monitoringHeartbeatSender, 500L, TimeBucket.SIZE, TimeUnit.MILLISECONDS);
                    controlCenterApplication.start();
                    controlCenterApplication.join();
                    shutdown(newScheduledThreadPool, monitoringHeartbeatSender, kafkaStreamsManager, commandStore);
                } catch (InterruptedException | TimeoutException e) {
                    log.error("failed to start topology", e);
                    commandStore.close();
                    shutdown(newScheduledThreadPool, monitoringHeartbeatSender, kafkaStreamsManager, commandStore);
                }
            } catch (Exception e2) {
                log.error("Unexpected error", e2);
                shutdown(newScheduledThreadPool, monitoringHeartbeatSender, kafkaStreamsManager, commandStore);
            }
        } catch (Throwable th) {
            shutdown(newScheduledThreadPool, monitoringHeartbeatSender, kafkaStreamsManager, commandStore);
            throw th;
        }
    }

    private static Injector makeInjector(ControlCenterConfig controlCenterConfig, boolean z, ImmutableSet<String> immutableSet) {
        return Guice.createInjector(new Module[]{new ControlCenterConfigModule(controlCenterConfig, z, immutableSet), new HealthCheckModule(), new ServiceHealthCheckModule(), new CommandModule(), new ConsumerOffsetsModule(), new ClusterManagementModule(), new StreamsConfigModule(), new TopicStoreModule(), new StreamsModule(), new RestModule(), new ControlCenterRestModule(), new SerializationModule(), new LicenseModule(), new SenderModule()});
    }

    private static void shutdownTasksAndAwaitTermination(ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(10L, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60L, TimeUnit.SECONDS)) {
                    log.warn("Failed to shutdown heartbeat sender");
                }
            }
        } catch (InterruptedException e) {
            log.warn("Failed to cleanly shutdown heartbeat sender");
            executorService.shutdownNow();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void shutdown(ScheduledExecutorService scheduledExecutorService, MonitoringHeartbeatSender monitoringHeartbeatSender, KafkaStreamsManager kafkaStreamsManager, CommandStore commandStore) {
        shutdownTasksAndAwaitTermination(scheduledExecutorService);
        monitoringHeartbeatSender.close();
        kafkaStreamsManager.close();
        commandStore.close();
    }
}
