/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.health;

import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.health.BaseHealth;
import io.smallrye.reactive.messaging.kafka.impl.KafkaAdminHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.admin.KafkaAdminClient;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;

/**
 * Readiness health check for an incoming Kafka channel.
 *
 * <p>Depending on {@code config.getHealthReadinessTopicVerification()}, the instance is set up in
 * one of two mutually exclusive modes at construction time:
 * <ul>
 * <li><b>admin mode</b> - an admin client is created and {@link #adminBasedHealthCheck} verifies
 * that the expected topics (or at least one topic matching the pattern) exist on the broker;</li>
 * <li><b>metric mode</b> - no admin client is created and {@link #metricsBasedHealthCheck} relies
 * on a consumer metric looked up through {@code getMetric(...)}.</li>
 * </ul>
 *
 * <p>NOTE(review): which of the two check methods gets invoked is decided by {@code BaseHealth},
 * which is not visible here - presumably based on {@link #getAdmin()} being {@code null} or not.
 */
public class KafkaSourceReadinessHealth extends BaseHealth {
    /** Admin client used for topic verification; {@code null} when topic verification is disabled. */
    private final KafkaAdminClient admin;
    private final KafkaConnectorIncomingConfiguration config;
    /** Topic pattern subscribed to; {@code null} when the channel uses an explicit topic set. */
    private final Pattern pattern;
    private final String channel;
    private final Set<String> topics;
    /** Consumer metric used in metric mode; {@code null} in admin mode or when no metric was found. */
    private final Metric metric;
    private final KafkaSource<?, ?> source;

    /**
     * Creates the readiness check for the given source.
     *
     * @param source the Kafka source, queried for the presence of subscribers
     * @param vertx the Vert.x instance handed to the admin client factory
     * @param config the incoming channel configuration
     * @param kafkaConfiguration the Kafka client configuration, copied before being passed to the
     *        admin client factory
     * @param consumer the Kafka consumer whose metrics are inspected when topic verification is
     *        disabled
     * @param topics the topics expected to exist (used when {@code pattern} is {@code null})
     * @param pattern the topic pattern, or {@code null} when explicit topics are used
     */
    public KafkaSourceReadinessHealth(KafkaSource<?, ?> source, Vertx vertx, KafkaConnectorIncomingConfiguration config, Map<String, ?> kafkaConfiguration, Consumer<?, ?> consumer, Set<String> topics, Pattern pattern) {
        super(config.getChannel());
        this.config = config;
        this.channel = config.getChannel();
        this.topics = topics;
        this.pattern = pattern;
        this.source = source;
        if (config.getHealthReadinessTopicVerification()) {
            // Work on a copy so the caller's configuration map is never handed to the factory.
            Map<String, Object> adminConfiguration = new HashMap<>(kafkaConfiguration);
            this.admin = KafkaAdminHelper.createAdminClient(vertx, adminConfiguration, config.getChannel(), true);
            this.metric = null;
        } else {
            // Only create the admin client when topic verification is requested.
            this.admin = null;
            this.metric = getMetric(consumer.metrics());
        }
    }

    /**
     * Reports readiness from the consumer metric captured at construction time.
     *
     * <p>The channel is reported ready when the metric value is at least {@code 1.0}, or when the
     * source has no subscribers yet (in which case no broker connection is expected). It is
     * reported not ready when the source has subscribers but the metric is below {@code 1.0}.
     * When no metric is available the channel is reported ready.
     *
     * @param builder the report builder receiving the channel status
     */
    @Override
    protected void metricsBasedHealthCheck(HealthReport.HealthReportBuilder builder) {
        if (this.metric == null) {
            // Nothing to inspect: report the channel as ready.
            builder.add(this.channel, true);
            return;
        }
        boolean connected = (Double) this.metric.metricValue() >= 1.0;
        boolean hasSubscribers = this.source.hasSubscribers();
        if (connected) {
            builder.add(this.channel, true);
        } else if (!hasSubscribers) {
            builder.add(this.channel, true, "no subscription yet, so no connection to the Kafka broker yet");
        } else {
            builder.add(this.channel, false);
        }
    }

    /**
     * Reports readiness by listing the broker's topics through the admin client.
     *
     * <p>With a pattern configured, the channel is ready when at least one existing topic matches
     * it. Otherwise the channel is ready when every expected topic exists; if not, the missing
     * topics are listed in the report message. Any exception raised while listing the topics
     * (including exceeding the configured readiness timeout, interpreted in milliseconds) marks
     * the channel as not ready.
     *
     * @param builder the report builder receiving the channel status
     */
    @Override
    protected void adminBasedHealthCheck(HealthReport.HealthReportBuilder builder) {
        try {
            Duration timeout = Duration.ofMillis(this.config.getHealthReadinessTimeout());
            Set<String> existingTopics = this.admin.listTopics().await().atMost(timeout);
            if (this.pattern != null) {
                // Pattern subscription: one matching topic is enough.
                boolean anyMatch = existingTopics.stream().anyMatch(topic -> this.pattern.matcher(topic).matches());
                if (anyMatch) {
                    builder.add(this.channel, true);
                } else {
                    builder.add(this.channel, false, "Unable to find a topic matching the given pattern: " + this.pattern);
                }
            } else if (existingTopics.containsAll(this.topics)) {
                builder.add(this.channel, true);
            } else {
                // Separate the names so that several missing topics stay readable in the report.
                String missing = this.topics.stream()
                        .filter(topic -> !existingTopics.contains(topic))
                        .collect(Collectors.joining(", "));
                builder.add(this.channel, false, "Unable to find topic(s): " + missing);
            }
        }
        catch (Exception failed) {
            // Health checks must not throw: any failure is turned into a "not ready" entry.
            builder.add(this.channel, false, "No response from broker for channel " + this.channel + " : " + failed);
        }
    }

    /**
     * Returns the admin client used for topic verification.
     *
     * @return the admin client, or {@code null} when topic verification is disabled
     */
    @Override
    public KafkaAdminClient getAdmin() {
        return this.admin;
    }
}

