/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.healthcheck;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Multisets;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.confluent.C3Version;
import io.confluent.command.record.alert.CommandAlert;
import io.confluent.controlcenter.BootstrapClientSupplier;
import io.confluent.controlcenter.ControlCenterConfigModule;
import io.confluent.controlcenter.data.ClusterMetadataDao;
import io.confluent.controlcenter.healthcheck.HealthCheckModule;
import io.confluent.controlcenter.kafka.AdminClientSupplier;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.rest.res.KafkaClusterDisplay;
import io.confluent.controlcenter.rest.res.StatusMeasurement;
import io.confluent.controlcenter.util.TopicInfo;
import io.confluent.controlcenter.version.metrics.ControlCenterMetrics;
import io.confluent.monitoring.common.Clock;
import io.confluent.support.metrics.serde.AvroSerializer;
import io.confluent.support.metrics.submitters.ConfluentSubmitter;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.server.KafkaConfig;
import org.apache.avro.generic.GenericContainer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Health check for the bootstrap Kafka cluster, the managed Kafka clusters and the
 * Control Center topics.
 *
 * <p>Each {@link #run()} describes the bootstrap cluster, describes every managed cluster,
 * compares the configuration of the known control topics against the expected values and,
 * when phone-home is enabled, submits a version-metrics record at most once per
 * {@link #METRICS_PERIOD}. Cluster reachability is recorded in the injected cluster-status
 * map and exposed through {@link #getAllClusterStatus()}.
 *
 * <p>NOTE(review): the mutable fields below are written by {@link #run()} without any
 * synchronization; confirm how callers of the getters are threaded before relying on
 * cross-thread visibility.
 */
@Singleton
public class HealthCheck
implements Runnable {
    private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(HealthCheck.class);
    /** Seconds to wait on each admin-client future before giving up. */
    private static final int HEALTHCHECK_TIMEOUT = 15;
    /** Minimum interval, in milliseconds, between two metrics submissions. */
    static final long METRICS_PERIOD = TimeUnit.HOURS.toMillis(1L);
    private final BootstrapClientSupplier bootstrapClient;
    private final AdminClientSupplier<String> adminClientSupplier;
    private final ClusterMetadataDao clusterMetadataDao;
    /** Expected control topics, indexed by topic name. */
    private final Map<String, TopicInfo> healthCheckTopics;
    /** Latest measurement per metric type, keyed by cluster id. */
    private final Map<String, Map<CommandAlert.BrokerTriggerMetricType, Controlcenter.TriggerMeasurement>> clusterStatus;
    private final Clock clock;
    private final Logger log;
    private final ConfluentSubmitter submitter;
    private final String sessionId;
    private final boolean phoneHomeEnabled;
    private final AvroSerializer avroSerializer = new AvroSerializer();
    private final String controlCenterId;
    /** Cluster id reported by the most recent successful bootstrap describe; null until then. */
    private String bootstrapClusterId;
    /** Brokers seen by the most recent successful bootstrap describe, indexed by broker id. */
    private Multimap<Integer, Node> previousNodes = ImmutableMultimap.of();
    private Node controller;
    /** Clock time (ms) of the last metrics submission; 0 means "never". */
    private long lastSubmit = 0L;

    /**
     * Injection constructor; delegates to the full constructor using this class's own logger.
     */
    @Inject
    public HealthCheck(BootstrapClientSupplier bootstrapClient, AdminClientSupplier<String> adminClientSupplier, ClusterMetadataDao clusterMetadataDao, @ControlCenterConfigModule.ControlTopics Set<TopicInfo> healthCheckTopics, @HealthCheckModule.ClusterStatus Map<String, Map<CommandAlert.BrokerTriggerMetricType, Controlcenter.TriggerMeasurement>> clusterStatus, Clock clock, @HealthCheckModule.PhoneHomeEnabled boolean phoneHomeEnabled, ConfluentSubmitter submitter, @HealthCheckModule.SessionId String sessionId, @HealthCheckModule.ControlCenterInstance String controlCenterId) {
        this(bootstrapClient, adminClientSupplier, clusterMetadataDao, healthCheckTopics, clusterStatus, clock, DEFAULT_LOGGER, phoneHomeEnabled, submitter, sessionId, controlCenterId);
    }

    /**
     * Full constructor that additionally accepts the logger to use.
     *
     * <p>Logs the data-collection notice at INFO when {@code phoneHomeEnabled} is true.
     *
     * @param healthCheckTopics expected control topics; indexed by name, so names must be unique
     * @param clusterStatus     map that receives the per-cluster status measurements
     * @param log               logger used for every message emitted by this instance
     */
    // The raw AdminClientSupplier parameter is kept for source compatibility with existing callers.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public HealthCheck(BootstrapClientSupplier bootstrapClient, AdminClientSupplier adminClientSupplier, ClusterMetadataDao clusterMetadataDao, Set<TopicInfo> healthCheckTopics, Map<String, Map<CommandAlert.BrokerTriggerMetricType, Controlcenter.TriggerMeasurement>> clusterStatus, Clock clock, Logger log, boolean phoneHomeEnabled, ConfluentSubmitter submitter, String sessionId, String controlCenterId) {
        this.bootstrapClient = bootstrapClient;
        this.adminClientSupplier = adminClientSupplier;
        this.clusterMetadataDao = clusterMetadataDao;
        this.healthCheckTopics = Maps.uniqueIndex(healthCheckTopics, new Function<TopicInfo, String>(){

            @Override
            public String apply(TopicInfo input) {
                return input.name;
            }
        });
        this.clusterStatus = clusterStatus;
        this.clock = clock;
        this.log = log;
        this.phoneHomeEnabled = phoneHomeEnabled;
        this.submitter = submitter;
        this.sessionId = sessionId;
        this.controlCenterId = controlCenterId;
        if (phoneHomeEnabled) {
            log.info("CONTROL CENTER UI\n\nBy using Control Center, subject to any license you may have with Confluent, you agree to the Confluent Data Protection Agreement.  In particular, please note that the version check feature of Control Center is enabled.\n\nWith this enabled, this instance is configured to collect and report certain data (version information, time stamped session IDs, instance ID, instance uptime, license key for subscription customers, IP address, and other product data)  to Confluent, Inc. (\"Confluent\") or its parent, subsidiaries, affiliates or service providers every hour.  By proceeding with `confluent.support.metrics.enable=true`, you agree to all such collection, transfer and use of Version information by Confluent. You can turn the version check feature off by setting `confluent.support.metrics.enable=false` in the Control Center configuration and restarting Control Center.  See the Confluent Enterprise documentation for further information.\n");
        }
    }

    /**
     * Performs one health-check pass followed by one metrics-submission attempt.
     *
     * <p>Nothing is allowed to escape: failures of either phase are logged at WARN. The
     * first failing check aborts the remaining checks of the pass, but metrics submission
     * is always attempted. If either phase was interrupted, the thread's interrupt flag is
     * restored before returning.
     */
    @Override
    public void run() {
        boolean interrupted = false;
        try {
            this.checkBootstrapKafkaCluster();
            this.checkManagedKafkaClusters();
            this.checkTopics();
        }
        catch (Throwable t) {
            interrupted |= t instanceof InterruptedException;
            this.log.warn("unable to perform health check", t);
        }
        try {
            this.sendMetrics();
        }
        catch (Throwable t) {
            interrupted |= t instanceof InterruptedException;
            this.log.warn("unable to send metrics", t);
        }
        if (interrupted) {
            // Catching Throwable above consumed the interruption; hand it back to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Describes the bootstrap cluster and records whether it is reachable.
     *
     * <p>On success the cluster id, broker set and controller are refreshed, the broker
     * configurations are checked and the cluster is marked ONLINE. If the describe call
     * fails or does not answer within {@link #HEALTHCHECK_TIMEOUT} seconds, the cluster is
     * marked OFFLINE instead.
     *
     * @throws Throwable anything other than a describe failure/timeout, including failures
     *                   raised by the broker-configuration check
     */
    void checkBootstrapKafkaCluster() throws Throwable {
        this.log.debug("attempting to describe bootstrap cluster");
        try (AdminClient adminClient = this.bootstrapClient.get()) {
            DescribeClusterResult dcr = adminClient.describeCluster();
            this.updateBootstrapClusterId(dcr.clusterId().get(HEALTHCHECK_TIMEOUT, TimeUnit.SECONDS));
            this.updatedNodes(FluentIterable.from(dcr.nodes().get()).index(new Function<Node, Integer>(){

                @Override
                public Integer apply(Node node) {
                    return node.id();
                }
            }));
            this.updateController(dcr.controller().get());
        }
        catch (ExecutionException | TimeoutException e) {
            this.recordUnreachable(this.bootstrapClusterId, e);
            return;
        }
        // The describe client is closed at this point; the broker check opens its own.
        this.checkBrokerConfigs();
        this.updateClusterStatus(this.bootstrapClusterId, CommandAlert.StatusValue.ONLINE);
    }

    /**
     * Describes every cluster returned by the cluster metadata DAO and records it as
     * ONLINE or OFFLINE.
     *
     * <p>A cluster whose client cannot be created, whose describe call fails, or which does
     * not answer within {@link #HEALTHCHECK_TIMEOUT} seconds is marked OFFLINE and the loop
     * moves on to the next cluster. A mismatch between the reported and the registered
     * cluster id is only logged.
     *
     * @throws Throwable anything other than the per-cluster failures described above
     */
    void checkManagedKafkaClusters() throws Throwable {
        for (KafkaClusterDisplay kafkaCluster : this.clusterMetadataDao.getKafkaClustersForManagement().clusters) {
            this.log.debug("attempting to describe cluster:{}", kafkaCluster.displayName);
            try (AdminClient adminClient = this.adminClientSupplier.getClient(kafkaCluster.clusterId)) {
                DescribeClusterResult dcr = adminClient.describeCluster();
                String clusterId = dcr.clusterId().get(HEALTHCHECK_TIMEOUT, TimeUnit.SECONDS);
                if (clusterId == null || !clusterId.equals(kafkaCluster.clusterId)) {
                    this.log.warn("current clusterId={} does not match initial={} for cluster={}", clusterId, kafkaCluster.clusterId, kafkaCluster.displayName);
                }
                this.updateClusterStatus(kafkaCluster.clusterId, CommandAlert.StatusValue.ONLINE);
            }
            catch (IllegalArgumentException | ExecutionException | TimeoutException e) {
                this.recordUnreachable(kafkaCluster.clusterId, e);
            }
        }
    }

    /**
     * Submits a version-metrics record when phone-home is enabled and more than
     * {@link #METRICS_PERIOD} has elapsed on the injected clock since the last submission.
     *
     * @throws Throwable whatever serialization or submission throws
     */
    void sendMetrics() throws Throwable {
        long now = this.clock.currentTimeMillis();
        if (!this.phoneHomeEnabled || this.lastSubmit >= now - METRICS_PERIOD) {
            return;
        }
        ControlCenterMetrics metrics = new ControlCenterMetrics();
        metrics.setConfluentPlatformVersion(C3Version.getVersionString());
        metrics.setTimestamp(Long.valueOf(now));
        metrics.setSession(this.sessionId);
        metrics.setControlCenterInstance(this.controlCenterId);
        metrics.setClusterId(this.bootstrapClusterId);
        this.log.debug("sending metrics={}", (Object)metrics);
        this.submitter.submit(this.avroSerializer.serialize((GenericContainer)metrics));
        // Only remember the submission once it went through without throwing.
        this.lastSubmit = now;
    }

    /**
     * Compares the configuration of every existing control topic with its expected values.
     *
     * <p>Topics that do not exist on the bootstrap cluster are skipped. Each differing
     * entry is logged at WARN when the topic has {@code validateConfig} set and at INFO
     * otherwise; an entry absent from the actual configuration is reported as
     * {@code "unset"}.
     *
     * @throws Throwable the unwrapped cause of a failed admin call, or a timeout/interrupt
     */
    void checkTopics() throws Throwable {
        try (AdminClient adminClient = this.bootstrapClient.get()) {
            final Set<String> extantTopics = adminClient.listTopics().names().get(HEALTHCHECK_TIMEOUT, TimeUnit.SECONDS);
            this.log.trace("extantTopics={}", (Object)extantTopics);
            ImmutableList<ConfigResource> topicResources = FluentIterable.from(this.healthCheckTopics.values()).filter(new Predicate<TopicInfo>(){

                @Override
                public boolean apply(TopicInfo input) {
                    if (!extantTopics.contains(input.name)) {
                        HealthCheck.this.log.trace("not checking topic={} because it hasn't been created", (Object)input);
                        return false;
                    }
                    return true;
                }
            }).transform(new Function<TopicInfo, ConfigResource>(){

                @Override
                public ConfigResource apply(TopicInfo input) {
                    return new ConfigResource(ConfigResource.Type.TOPIC, input.name);
                }
            }).toList();
            Map<ConfigResource, Config> topicConfigs = adminClient.describeConfigs(topicResources).all().get(HEALTHCHECK_TIMEOUT, TimeUnit.SECONDS);
            this.log.trace("retrieved topicConfigs={}", (Object)topicConfigs);
            for (ConfigResource resource : topicResources) {
                Config config = topicConfigs.get(resource);
                if (config == null) {
                    this.log.info("missing config information for topic={}", resource.name());
                    continue;
                }
                TopicInfo topicInfo = this.healthCheckTopics.get(resource.name());
                for (ConfigEntry entry : topicInfo.config.entries()) {
                    ConfigEntry actualConfig = config.get(entry.name());
                    String actualValue = actualConfig == null ? "unset" : actualConfig.value();
                    String expectedValue = entry.value();
                    // Either side may be null; a plain equals() would throw here.
                    boolean matches = actualValue == null ? expectedValue == null : actualValue.equals(expectedValue);
                    if (matches) {
                        continue;
                    }
                    if (topicInfo.validateConfig) {
                        this.log.warn("misconfigured topic={} config={} value={} expected={}", resource.name(), entry.name(), actualValue, expectedValue);
                    } else {
                        this.log.info("misconfigured topic={} config={} value={} expected={}", resource.name(), entry.name(), actualValue, expectedValue);
                    }
                }
            }
        }
        catch (ExecutionException ee) {
            if (ee.getCause() instanceof UnsupportedVersionException) {
                this.log.info(ee.getLocalizedMessage());
            }
            throw HealthCheck.unwrap(ee);
        }
    }

    /**
     * Describes the configuration of every broker id in {@code previousNodes} and warns
     * about brokers without a ConfluentMetricsReporter or with log directories under /tmp.
     *
     * @throws Throwable the unwrapped cause of a failed admin call, or a timeout/interrupt
     */
    private void checkBrokerConfigs() throws Throwable {
        try (AdminClient adminClient = this.bootstrapClient.get()) {
            Collection<ConfigResource> brokerResources = Collections2.transform(this.previousNodes.keySet(), new Function<Integer, ConfigResource>(){

                @Override
                public ConfigResource apply(Integer input) {
                    return new ConfigResource(ConfigResource.Type.BROKER, input.toString());
                }
            });
            Map<ConfigResource, Config> brokerConfigs = adminClient.describeConfigs(brokerResources).all().get(HEALTHCHECK_TIMEOUT, TimeUnit.SECONDS);
            for (Map.Entry<ConfigResource, Config> brokerEntry : brokerConfigs.entrySet()) {
                String broker = brokerEntry.getKey().name();
                Config config = brokerEntry.getValue();
                ConfigEntry metricReporters = config.get(KafkaConfig.MetricReporterClassesProp());
                String reporterClasses = metricReporters == null ? null : metricReporters.value();
                if (reporterClasses == null || !reporterClasses.contains("ConfluentMetricsReporter")) {
                    this.log.warn("broker={} is not instrumented with ConfluentMetricsReporter", broker);
                }
                this.validateLogDirs(broker, config);
            }
        }
        catch (ExecutionException ee) {
            if (ee.getCause() instanceof UnsupportedVersionException) {
                this.log.info(ee.getLocalizedMessage());
            }
            if (HealthCheck.isTimeout(ee)) {
                this.log.warn("timeout trying to connect to kafka cluster");
            }
            throw HealthCheck.unwrap(ee);
        }
    }

    /**
     * Logs an error for every log directory of {@code broker} that lives under /tmp.
     *
     * <p>The directories come from the log-dirs entry when it is present, non-default and
     * has a value (split on commas, entries trimmed, empty entries dropped); otherwise from
     * the single log-dir entry. When neither yields a value an INFO message is logged and
     * nothing is validated.
     *
     * @param broker broker name used in the log messages
     * @param config broker configuration to inspect
     */
    void validateLogDirs(String broker, Config config) {
        Collection<String> actualLogDirs = Collections.emptyList();
        ConfigEntry logDirsConfigEntry = config.get(KafkaConfig.LogDirsProp());
        if (logDirsConfigEntry != null && !logDirsConfigEntry.isDefault() && logDirsConfigEntry.value() != null) {
            ImmutableList.Builder<String> dirs = ImmutableList.builder();
            for (String dir : logDirsConfigEntry.value().split(",")) {
                // Whitespace around the commas must not hide a "/tmp" prefix.
                String trimmed = dir.trim();
                if (!trimmed.isEmpty()) {
                    dirs.add(trimmed);
                }
            }
            actualLogDirs = dirs.build();
        } else {
            ConfigEntry logDirConfigEntry = config.get(KafkaConfig.LogDirProp());
            if (logDirConfigEntry != null && logDirConfigEntry.value() != null) {
                actualLogDirs = ImmutableList.of(logDirConfigEntry.value().trim());
            } else {
                this.log.info("broker={} unable to determine log directory", broker);
            }
        }
        for (String logDir : actualLogDirs) {
            if (logDir.startsWith("/tmp")) {
                this.log.error("broker={} is storing logs in {}, Kafka expects to store log data in a persistent location", broker, logDir);
            }
        }
    }

    /** Remembers the controller node and logs at INFO whenever it differs from the previous one. */
    private void updateController(Node currentController) {
        if (this.controller != null && this.controller.equals(currentController)) {
            return;
        }
        this.log.info("new controller={}", currentController);
        this.controller = currentController;
    }

    /**
     * Replaces the remembered broker set, logging added/removed broker ids and any broker
     * id that is not reported by exactly one node.
     */
    private void updatedNodes(Multimap<Integer, Node> nodes) {
        Multiset<Integer> currentIds = nodes.keys();
        Multiset<Integer> previousIds = this.previousNodes.keys();
        if (!previousIds.equals(currentIds)) {
            Multimap<Integer, Node> added = Multimaps.filterKeys(nodes, Predicates.in(Multisets.difference(currentIds, previousIds)));
            Multimap<Integer, Node> removed = Multimaps.filterKeys(this.previousNodes, Predicates.in(Multisets.difference(previousIds, currentIds)));
            this.log.info("broker id set has changed new={} removed={}", added, removed);
        }
        for (Integer id : nodes.keySet()) {
            Collection<Node> sameId = nodes.get(id);
            if (sameId.size() != 1) {
                this.log.warn("found count={} brokers with id={}", sameId.size(), id);
            }
        }
        this.previousNodes = nodes;
    }

    /**
     * Records the bootstrap cluster id. The first id is logged at INFO; a later id that
     * differs from the stored one is logged at ERROR and the status stored under the old
     * id is removed.
     */
    void updateBootstrapClusterId(String clusterId) {
        if (this.bootstrapClusterId == null) {
            this.log.info("current clusterId={}", clusterId);
        } else if (!this.bootstrapClusterId.equals(clusterId)) {
            this.log.error("current clusterId={} does not match initial={}", clusterId, this.bootstrapClusterId);
            this.clusterStatus.remove(this.bootstrapClusterId);
        }
        this.bootstrapClusterId = clusterId;
    }

    /** Stores a CLUSTER_STATUS measurement with the given status for {@code clusterId}. */
    private void updateClusterStatus(String clusterId, CommandAlert.StatusValue status) {
        Controlcenter.MetricMeasurement measurement = Controlcenter.MetricMeasurement.newBuilder().setBrokerMetric(CommandAlert.BrokerTriggerMetricType.CLUSTER_STATUS).setStatusValue(status).build();
        this.updateClusterMetric(clusterId, measurement);
    }

    /**
     * Stores {@code metric} as the latest measurement of its type for {@code clusterId}.
     *
     * <p>Nothing is stored when the cluster id is null or empty (a warning is logged), when
     * {@code metric} is null, or when the stored measurement already carries an equal
     * metric, so the arrival time of an unchanged status is preserved.
     */
    private void updateClusterMetric(String clusterId, Controlcenter.MetricMeasurement metric) {
        if (clusterId == null || clusterId.isEmpty()) {
            this.log.warn("ClusterId is not defined, cannot record healthcheck status");
            return;
        }
        if (metric == null) {
            return;
        }
        CommandAlert.BrokerTriggerMetricType metricType = metric.getBrokerMetric();
        Controlcenter.TriggerMeasurement triggerMeasurement = Controlcenter.TriggerMeasurement.newBuilder().setClusterId(clusterId).setComponentType(Controlcenter.ComponentType.BROKER_CLUSTER).setMetricMeasurement(metric).setArrivalTime(this.clock.currentTimeMillis()).build();
        Map<CommandAlert.BrokerTriggerMetricType, Controlcenter.TriggerMeasurement> measurements = this.clusterStatus.get(clusterId);
        if (measurements == null) {
            measurements = new ConcurrentHashMap<>();
            this.clusterStatus.put(clusterId, measurements);
        }
        Controlcenter.TriggerMeasurement stored = measurements.get(metricType);
        if (stored != null && stored.getMetricMeasurement().equals(metric)) {
            return;
        }
        measurements.put(metricType, triggerMeasurement);
    }

    /**
     * Logs why a cluster could not be described and marks it OFFLINE.
     *
     * @param clusterId id under which the OFFLINE status is stored
     * @param failure   exception raised while creating the client or describing the cluster
     */
    private void recordUnreachable(String clusterId, Exception failure) {
        if (HealthCheck.isTimeout(failure)) {
            this.log.warn("timeout trying to connect to kafka cluster");
        } else {
            this.log.debug("unable to describe kafka cluster={}", clusterId, failure);
        }
        this.updateClusterStatus(clusterId, CommandAlert.StatusValue.OFFLINE);
    }

    /**
     * Tells whether {@code failure} is a timeout: either the future wait itself timed out,
     * or the failure wraps a JDK or Kafka timeout exception.
     */
    private static boolean isTimeout(Throwable failure) {
        Throwable cause = failure.getCause();
        return failure instanceof TimeoutException
            || cause instanceof TimeoutException
            || cause instanceof org.apache.kafka.common.errors.TimeoutException;
    }

    /** Returns the cause of {@code ee}, or {@code ee} itself when it has none. */
    private static Throwable unwrap(ExecutionException ee) {
        Throwable cause = ee.getCause();
        return cause != null ? cause : ee;
    }

    /** Returns the bootstrap cluster id recorded by the last successful describe, or null. */
    public String getBootstrapClusterId() {
        return this.bootstrapClusterId;
    }

    /**
     * Returns a new map from cluster id to its current CLUSTER_STATUS measurement; clusters
     * without such a measurement are omitted.
     */
    public Map<String, StatusMeasurement> getAllClusterStatus() {
        Map<String, StatusMeasurement> statusMap = new HashMap<>();
        for (Map.Entry<String, Map<CommandAlert.BrokerTriggerMetricType, Controlcenter.TriggerMeasurement>> entry : this.clusterStatus.entrySet()) {
            Controlcenter.TriggerMeasurement measurement = entry.getValue().get(CommandAlert.BrokerTriggerMetricType.CLUSTER_STATUS);
            if (measurement != null) {
                statusMap.put(entry.getKey(), StatusMeasurement.fromTriggerMeasurement(measurement));
            }
        }
        return statusMap;
    }
}

