/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

public class KafkaStreamsBinderHealthIndicator
extends AbstractHealthIndicator
implements DisposableBean {
    private static ClassLoader CLASS_LOADER = KafkaStreamsBinderHealthIndicator.class.getClassLoader();
    private static boolean isKafkaStreams25 = true;
    private static Method methodForIsRunning;
    private final KafkaStreamsRegistry kafkaStreamsRegistry;
    private final KafkaStreamsBinderConfigurationProperties configurationProperties;
    private final Map<String, Object> adminClientProperties;
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    private AdminClient adminClient;
    private final Lock lock = new ReentrantLock();

    KafkaStreamsBinderHealthIndicator(KafkaStreamsRegistry kafkaStreamsRegistry, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, KafkaProperties kafkaProperties, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
        super("Kafka-streams health check failed");
        kafkaProperties.buildAdminProperties();
        this.configurationProperties = kafkaStreamsBinderConfigurationProperties;
        this.adminClientProperties = kafkaProperties.buildAdminProperties();
        KafkaTopicProvisioner.normalalizeBootPropsWithBinder(this.adminClientProperties, (KafkaProperties)kafkaProperties, (KafkaBinderConfigurationProperties)kafkaStreamsBinderConfigurationProperties);
        this.kafkaStreamsRegistry = kafkaStreamsRegistry;
        this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void doHealthCheck(Health.Builder builder) throws Exception {
        try {
            this.lock.lock();
            if (this.adminClient == null) {
                this.adminClient = AdminClient.create(this.adminClientProperties);
            }
            ListTopicsResult listTopicsResult = this.adminClient.listTopics();
            listTopicsResult.listings().get((long)this.configurationProperties.getHealthTimeout(), TimeUnit.SECONDS);
            if (this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeans().isEmpty()) {
                builder.withDetail("No Kafka Streams bindings have been established", (Object)"Kafka Streams binder did not detect any processors");
                builder.status(Status.UNKNOWN);
            } else {
                boolean up = true;
                Set<KafkaStreams> kafkaStreams = this.kafkaStreamsRegistry.getKafkaStreams();
                HashSet<KafkaStreams> allKafkaStreams = new HashSet<KafkaStreams>(kafkaStreams);
                if (this.configurationProperties.isIncludeStoppedProcessorsForHealthCheck()) {
                    allKafkaStreams.addAll(this.kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().values());
                }
                for (KafkaStreams kStream : allKafkaStreams) {
                    if (isKafkaStreams25) {
                        up &= kStream.state().isRunningOrRebalancing();
                    } else {
                        boolean isRunningInvokedResult = (Boolean)methodForIsRunning.invoke((Object)kStream.state(), new Object[0]);
                        up &= isRunningInvokedResult;
                    }
                    builder.withDetails(this.buildDetails(kStream));
                }
                builder.status(up ? Status.UP : Status.DOWN);
            }
        }
        catch (Exception e) {
            builder.withDetail("No topic information available", (Object)"Kafka broker is not reachable");
            builder.status(Status.DOWN);
            builder.withException((Throwable)e);
        }
        finally {
            this.lock.unlock();
        }
    }

    private Map<String, Object> buildDetails(KafkaStreams kafkaStreams) throws Exception {
        HashMap<String, Object> details = new HashMap<String, Object>();
        HashMap<String, Object> perAppdIdDetails = new HashMap<String, Object>();
        boolean isRunningResult = isKafkaStreams25 ? kafkaStreams.state().isRunningOrRebalancing() : ((Boolean)methodForIsRunning.invoke((Object)kafkaStreams.state(), new Object[0])).booleanValue();
        if (isRunningResult) {
            Set threadMetadata = kafkaStreams.metadataForLocalThreads();
            for (ThreadMetadata metadata : threadMetadata) {
                perAppdIdDetails.put("threadName", metadata.threadName());
                perAppdIdDetails.put("threadState", metadata.threadState());
                perAppdIdDetails.put("adminClientId", metadata.adminClientId());
                perAppdIdDetails.put("consumerClientId", metadata.consumerClientId());
                perAppdIdDetails.put("restoreConsumerClientId", metadata.restoreConsumerClientId());
                perAppdIdDetails.put("producerClientIds", metadata.producerClientIds());
                perAppdIdDetails.put("activeTasks", KafkaStreamsBinderHealthIndicator.taskDetails(metadata.activeTasks()));
                perAppdIdDetails.put("standbyTasks", KafkaStreamsBinderHealthIndicator.taskDetails(metadata.standbyTasks()));
            }
            StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamBuilderFactoryBean(kafkaStreams);
            String applicationId = (String)streamsBuilderFactoryBean.getStreamsConfiguration().get("application.id");
            details.put(applicationId, perAppdIdDetails);
        } else {
            StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamBuilderFactoryBean(kafkaStreams);
            String applicationId = null;
            if (streamsBuilderFactoryBean != null) {
                applicationId = (String)streamsBuilderFactoryBean.getStreamsConfiguration().get("application.id");
            } else {
                Map<String, KafkaStreams> stoppedKafkaStreamsPerBinding = this.kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams();
                for (String appId : stoppedKafkaStreamsPerBinding.keySet()) {
                    if (!stoppedKafkaStreamsPerBinding.get(appId).equals(kafkaStreams)) continue;
                    applicationId = appId;
                }
            }
            details.put(applicationId, String.format("The processor with application.id %s is down. Current state: %s", applicationId, kafkaStreams.state()));
        }
        return details;
    }

    private static Map<String, Object> taskDetails(Set<TaskMetadata> taskMetadata) {
        HashMap<String, Object> details = new HashMap<String, Object>();
        for (TaskMetadata metadata : taskMetadata) {
            details.put("taskId", metadata.taskId());
            if (details.containsKey("partitions")) {
                List partitionsInfo = (List)details.get("partitions");
                partitionsInfo.addAll(KafkaStreamsBinderHealthIndicator.addPartitionsInfo(metadata));
                continue;
            }
            details.put("partitions", KafkaStreamsBinderHealthIndicator.addPartitionsInfo(metadata));
        }
        return details;
    }

    private static List<String> addPartitionsInfo(TaskMetadata metadata) {
        return metadata.topicPartitions().stream().map(p -> "partition=" + p.partition() + ", topic=" + p.topic()).collect(Collectors.toList());
    }

    public void destroy() throws Exception {
        if (this.adminClient != null) {
            this.adminClient.close(Duration.ofSeconds(0L));
        }
    }

    static {
        try {
            Method[] declaredMethods;
            Class<?> KAFKA_STREAMS_STATE_CLASS = CLASS_LOADER.loadClass("org.apache.kafka.streams.KafkaStreams$State");
            for (Method m : declaredMethods = KAFKA_STREAMS_STATE_CLASS.getDeclaredMethods()) {
                if (!m.getName().equals("isRunning")) continue;
                isKafkaStreams25 = false;
                methodForIsRunning = m;
            }
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException("KafkaStreams$State class not found", e);
        }
    }
}

