/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter;

import com.github.zafarkhaja.semver.Version;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import io.confluent.command.CommandStore;
import io.confluent.controlcenter.ControlCenterConfig;
import io.confluent.controlcenter.ControlCenterConfigModule;
import io.confluent.controlcenter.KafkaHelper;
import io.confluent.controlcenter.alert.AlertManager;
import io.confluent.controlcenter.alert.SenderModule;
import io.confluent.controlcenter.command.CommandMigrator;
import io.confluent.controlcenter.command.CommandModule;
import io.confluent.controlcenter.data.ClusterMetadataInitializer;
import io.confluent.controlcenter.data.ConsumerOffsetsFetcher;
import io.confluent.controlcenter.data.ConsumerOffsetsModule;
import io.confluent.controlcenter.healthcheck.HealthCheck;
import io.confluent.controlcenter.healthcheck.HealthCheckModule;
import io.confluent.controlcenter.kafka.ClusterManagementModule;
import io.confluent.controlcenter.license.LicenseModule;
import io.confluent.controlcenter.rest.ControlCenterApplication;
import io.confluent.controlcenter.rest.ControlCenterRestModule;
import io.confluent.controlcenter.rest.RestModule;
import io.confluent.controlcenter.serialization.SerializationModule;
import io.confluent.controlcenter.servicehealthcheck.ServiceHealthCheckModule;
import io.confluent.controlcenter.streams.KafkaStreamsManager;
import io.confluent.controlcenter.streams.StreamsConfigModule;
import io.confluent.controlcenter.streams.StreamsModule;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.controlcenter.streams.verify.MonitoringHeartbeatSender;
import io.confluent.monitoring.common.TimeBucket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ControlCenter {
    private static final Logger log = LoggerFactory.getLogger(ControlCenter.class);
    private static final String TOPICS_FOR_TASKS_OPT = "runTasksFor";

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws Exception {
        Options options = new Options();
        Option topicsForTasksOpt = Option.builder((String)TOPICS_FOR_TASKS_OPT).hasArg().longOpt("run-tasks-for").desc("Comma-separated list of topics. Run only tasks whose input topics include this set.").build();
        options.addOption(topicsForTasksOpt);
        DefaultParser parser = new DefaultParser();
        CommandLine cmdLine = parser.parse(options, args);
        if (cmdLine.getArgList().size() == 0) {
            System.err.println("You must provide a path to the config file");
            System.exit(1);
        }
        ControlCenterConfig config = new ControlCenterConfig((String)cmdLine.getArgList().get(0));
        boolean enableSubTopologies = false;
        ImmutableSet topicsForTasks = null;
        if (cmdLine.hasOption(TOPICS_FOR_TASKS_OPT)) {
            enableSubTopologies = true;
            topicsForTasks = ImmutableSet.copyOf((Object[])cmdLine.getOptionValue(TOPICS_FOR_TASKS_OPT).trim().split(","));
        }
        Injector injector = ControlCenter.makeInjector(config, enableSubTopologies, topicsForTasks);
        Version c3Version = (Version)injector.getInstance(Key.get(Version.class));
        final KafkaStreamsManager kafkaMonitoringStreamsManager = (KafkaStreamsManager)injector.getInstance(KafkaStreamsManager.class);
        final CommandStore commandStore = (CommandStore)injector.getInstance(CommandStore.class);
        CommandMigrator commandMigrator = (CommandMigrator)injector.getInstance(CommandMigrator.class);
        final MonitoringHeartbeatSender monitoringHeartbeatSender = (MonitoringHeartbeatSender)injector.getInstance(MonitoringHeartbeatSender.class);
        ClusterMetadataInitializer clusterMetadataInitializer = (ClusterMetadataInitializer)injector.getInstance(ClusterMetadataInitializer.class);
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setNameFormat("control-center-heartbeat-%d").build());
        Runtime.getRuntime().addShutdownHook(new Thread("control-center-shutdown-hook"){

            @Override
            public void run() {
                log.info("Shutting down due to shutdown hook signal");
                ControlCenter.shutdown(ses, monitoringHeartbeatSender, kafkaMonitoringStreamsManager, commandStore);
            }
        });
        try {
            HealthCheck healthCheck = (HealthCheck)injector.getInstance(HealthCheck.class);
            AlertManager alertManager = (AlertManager)injector.getInstance(AlertManager.class);
            ConsumerOffsetsFetcher consumerOffsetsFetcher = (ConsumerOffsetsFetcher)injector.getInstance(ConsumerOffsetsFetcher.class);
            log.info("Starting Control Center version={}", (Object)c3Version);
            KafkaHelper.ControlCenterPreconditions ccp = (KafkaHelper.ControlCenterPreconditions)injector.getInstance(KafkaHelper.ControlCenterPreconditions.class);
            if (!ccp.call().booleanValue()) {
                log.info("Shutting down due to unmet preconditions");
                Runtime.getRuntime().exit(1);
                return;
            }
            log.info("action=starting topology=command");
            commandStore.start(config.getLong("confluent.controlcenter.command.streams.start.timeout").longValue());
            log.info("action=started topology=command");
            if (!config.getBoolean("confluent.controlcenter.trigger.active-controller-count.enable").booleanValue()) {
                log.info("action=starting operation=command-migration ");
                commandMigrator.migrate();
                log.info("action=completed operation=command-migration");
            }
            ControlCenterApplication a = (ControlCenterApplication)((Object)injector.getInstance(ControlCenterApplication.class));
            log.info("action=starting topology=monitoring");
            kafkaMonitoringStreamsManager.start(config.getLong("confluent.controlcenter.internal.streams.start.timeout"));
            log.info("action=started topology=monitoring");
            clusterMetadataInitializer.call();
            if (config.getBoolean("confluent.controlcenter.alert.cluster.down.autocreate").booleanValue()) {
                log.info("Auto creating trigger and action for cluster down alerts");
                alertManager.autoCreateClusterDownTriggerAndAction(clusterMetadataInitializer.getBootstrapClusterId());
            }
            log.info("Starting Health Check");
            ses.scheduleAtFixedRate(healthCheck, 0L, 1L, TimeUnit.MINUTES);
            log.info("Starting Alert Manager");
            ses.scheduleAtFixedRate(alertManager, 0L, 1L, TimeUnit.MINUTES);
            log.info("Starting Consumer Offsets Fetch");
            ses.scheduleAtFixedRate(consumerOffsetsFetcher, 0L, 5L, TimeUnit.SECONDS);
            ses.scheduleWithFixedDelay(monitoringHeartbeatSender, 500L, TimeBucket.SIZE, TimeUnit.MILLISECONDS);
            a.start();
            a.join();
        }
        catch (InterruptedException | TimeoutException e) {
            log.error("failed to start topology", (Throwable)e);
            commandStore.close();
        }
        catch (Exception e) {
            log.error("Unexpected error", (Throwable)e);
        }
        finally {
            ControlCenter.shutdown(ses, monitoringHeartbeatSender, kafkaMonitoringStreamsManager, commandStore);
        }
    }

    private static Injector makeInjector(ControlCenterConfig config, boolean enableSubTopologies, ImmutableSet<String> topicsForTasks) {
        return Guice.createInjector((Module[])new Module[]{new ControlCenterConfigModule(config, enableSubTopologies, topicsForTasks), new HealthCheckModule(), new ServiceHealthCheckModule(), new CommandModule(), new ConsumerOffsetsModule(), new ClusterManagementModule(), new StreamsConfigModule(), new TopicStoreModule(), new StreamsModule(), new RestModule(), new ControlCenterRestModule(), new SerializationModule(), new LicenseModule(), new SenderModule()});
    }

    private static void shutdownTasksAndAwaitTermination(ExecutorService es) {
        es.shutdown();
        try {
            if (!es.awaitTermination(10L, TimeUnit.SECONDS)) {
                es.shutdownNow();
                if (!es.awaitTermination(60L, TimeUnit.SECONDS)) {
                    log.warn("Failed to shutdown heartbeat sender");
                }
            }
        }
        catch (InterruptedException ie) {
            log.warn("Failed to cleanly shutdown heartbeat sender");
            es.shutdownNow();
        }
    }

    private static void shutdown(ScheduledExecutorService ses, MonitoringHeartbeatSender monitoringHeartbeatSender, KafkaStreamsManager kafkaMonitoringStreamsManager, CommandStore commandStore) {
        ControlCenter.shutdownTasksAndAwaitTermination(ses);
        monitoringHeartbeatSender.close();
        kafkaMonitoringStreamsManager.close();
        commandStore.close();
    }
}

