package org.springframework.cloud.stream.binder.kafka.streams;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderHealthIndicator.class */
public class KafkaStreamsBinderHealthIndicator extends AbstractHealthIndicator implements DisposableBean {
    private static ClassLoader CLASS_LOADER = KafkaStreamsBinderHealthIndicator.class.getClassLoader();
    private static boolean isKafkaStreams25;
    private static Method methodForIsRunning;
    private final KafkaStreamsRegistry kafkaStreamsRegistry;
    private final KafkaStreamsBinderConfigurationProperties configurationProperties;
    private final Map<String, Object> adminClientProperties;
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    private static final ThreadLocal<Status> healthStatusThreadLocal;
    private AdminClient adminClient;
    private final Lock lock;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaStreamsBinderHealthIndicator(KafkaStreamsRegistry kafkaStreamsRegistry, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, KafkaProperties kafkaProperties, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
        super("Kafka-streams health check failed");
        this.lock = new ReentrantLock();
        kafkaProperties.buildAdminProperties();
        this.configurationProperties = kafkaStreamsBinderConfigurationProperties;
        this.adminClientProperties = kafkaProperties.buildAdminProperties();
        KafkaTopicProvisioner.normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties, kafkaStreamsBinderConfigurationProperties);
        this.kafkaStreamsRegistry = kafkaStreamsRegistry;
        this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
    }

    protected void doHealthCheck(Health.Builder builder) throws Exception {
        try {
            try {
                this.lock.lock();
                if (this.adminClient == null) {
                    this.adminClient = AdminClient.create(this.adminClientProperties);
                }
                if (healthStatusThreadLocal.get() == Status.DOWN) {
                    builder.withDetail("No topic information available", "Kafka broker is not reachable");
                    builder.status(Status.DOWN);
                } else {
                    this.adminClient.listTopics().listings().get(this.configurationProperties.getHealthTimeout(), TimeUnit.SECONDS);
                    if (this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeans().isEmpty()) {
                        builder.withDetail("No Kafka Streams bindings have been established", "Kafka Streams binder did not detect any processors");
                        builder.status(Status.UNKNOWN);
                    } else {
                        boolean z = true;
                        for (KafkaStreams kafkaStreams : this.kafkaStreamsRegistry.getKafkaStreams()) {
                            z = isKafkaStreams25 ? z & kafkaStreams.state().isRunningOrRebalancing() : z & ((Boolean) methodForIsRunning.invoke(kafkaStreams.state(), new Object[0])).booleanValue();
                            builder.withDetails(buildDetails(kafkaStreams));
                        }
                        builder.status(z ? Status.UP : Status.DOWN);
                    }
                }
                this.lock.unlock();
            } catch (Exception e) {
                builder.withDetail("No topic information available", "Kafka broker is not reachable");
                builder.status(Status.DOWN);
                builder.withException(e);
                healthStatusThreadLocal.set(Status.DOWN);
                this.lock.unlock();
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    private Map<String, Object> buildDetails(KafkaStreams kafkaStreams) throws Exception {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        if (isKafkaStreams25 ? kafkaStreams.state().isRunningOrRebalancing() : ((Boolean) methodForIsRunning.invoke(kafkaStreams.state(), new Object[0])).booleanValue()) {
            for (ThreadMetadata threadMetadata : kafkaStreams.localThreadsMetadata()) {
                hashMap2.put("threadName", threadMetadata.threadName());
                hashMap2.put("threadState", threadMetadata.threadState());
                hashMap2.put("adminClientId", threadMetadata.adminClientId());
                hashMap2.put("consumerClientId", threadMetadata.consumerClientId());
                hashMap2.put("restoreConsumerClientId", threadMetadata.restoreConsumerClientId());
                hashMap2.put("producerClientIds", threadMetadata.producerClientIds());
                hashMap2.put("activeTasks", taskDetails(threadMetadata.activeTasks()));
                hashMap2.put("standbyTasks", taskDetails(threadMetadata.standbyTasks()));
            }
            hashMap.put((String) this.kafkaStreamsRegistry.streamBuilderFactoryBean(kafkaStreams).getStreamsConfiguration().get("application.id"), hashMap2);
        } else {
            String str = (String) this.kafkaStreamsRegistry.streamBuilderFactoryBean(kafkaStreams).getStreamsConfiguration().get("application.id");
            hashMap.put(str, String.format("The processor with application.id %s is down", str));
        }
        return hashMap;
    }

    private static Map<String, Object> taskDetails(Set<TaskMetadata> set) {
        HashMap hashMap = new HashMap();
        for (TaskMetadata taskMetadata : set) {
            hashMap.put("taskId", taskMetadata.taskId());
            if (hashMap.containsKey("partitions")) {
                ((List) hashMap.get("partitions")).addAll(addPartitionsInfo(taskMetadata));
            } else {
                hashMap.put("partitions", addPartitionsInfo(taskMetadata));
            }
        }
        return hashMap;
    }

    private static List<String> addPartitionsInfo(TaskMetadata taskMetadata) {
        return (List) taskMetadata.topicPartitions().stream().map(topicPartition -> {
            return "partition=" + topicPartition.partition() + ", topic=" + topicPartition.topic();
        }).collect(Collectors.toList());
    }

    public void destroy() throws Exception {
        if (this.adminClient != null) {
            this.adminClient.close(Duration.ofSeconds(0L));
        }
    }

    static {
        isKafkaStreams25 = true;
        try {
            for (Method method : CLASS_LOADER.loadClass("org.apache.kafka.streams.KafkaStreams$State").getDeclaredMethods()) {
                if (method.getName().equals("isRunning")) {
                    isKafkaStreams25 = false;
                    methodForIsRunning = method;
                }
            }
            healthStatusThreadLocal = new ThreadLocal<>();
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("KafkaStreams$State class not found", e);
        }
    }
}
