/*
 * Decompiled with CFR 0.152.
 */
package kafka.catalog;

import io.confluent.telemetry.api.events.EventEmitter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import kafka.catalog.MetadataCollectorEventQueue;
import kafka.catalog.TopicInfo;
import kafka.catalog.ZKTopicMetadataCollectorConfig;
import kafka.catalog.ZKTopicMetadataCollectorContext;
import kafka.catalog.event.CollectorStartupEvent;
import kafka.catalog.event.CollectorStopEvent;
import kafka.catalog.event.MetadataCollectorEvent;
import kafka.catalog.event.TopicConfigChangeEvent;
import kafka.catalog.event.TopicCreationEvent;
import kafka.catalog.event.TopicDeletionEvent;
import kafka.catalog.event.TopicPartitionChangeEvent;
import kafka.common.TenantHelpers;
import kafka.log.LogConfig;
import kafka.server.KafkaConfig;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

public class ZKTopicMetadataCollector {
    private final Logger log;
    private final ZKTopicMetadataCollectorConfig config;
    private final KafkaZkClient zkClient;
    private final KafkaConfig kafkaConfig;
    private final Metrics metrics;
    private final MetadataCollectorEventQueue eventQueue;
    private final Time time;
    private volatile Optional<ZKTopicMetadataCollectorContext> context;
    private final MetricName activeCollectorMetricName;

    public ZKTopicMetadataCollector(KafkaConfig config, KafkaZkClient zkClient, Metrics metrics, Time time) {
        this(config, new ZKTopicMetadataCollectorConfig(config), zkClient, metrics, new LogContext("[ZKTopicMetadataCollector id=" + config.nodeId() + "]").logger(ZKTopicMetadataCollector.class.getClass()), new MetadataCollectorEventQueue(time), Optional.empty(), time);
    }

    ZKTopicMetadataCollector(KafkaConfig kafkaConfig, ZKTopicMetadataCollectorConfig config, KafkaZkClient zkClient, Metrics metrics, Logger log, MetadataCollectorEventQueue eventQueue, Optional<ZKTopicMetadataCollectorContext> context, Time time) {
        this.kafkaConfig = kafkaConfig;
        this.config = config;
        this.zkClient = zkClient;
        this.metrics = metrics;
        this.log = log;
        this.eventQueue = eventQueue;
        this.context = context;
        this.time = time;
        this.activeCollectorMetricName = this.metrics.metricName("active-collector", "catalog-metrics", "Reports 1 if the catalog metadata collector is active, 0 otherwise.");
        this.registerMetric();
        log.info("Constructed, snapshot init delay {}s, interval {}s", (Object)this.config.snapshotInitDelaySec, (Object)this.config.snapshotIntervalSec);
    }

    public void start(Map<String, TopicInfo> topicsWithInfo, int epoch) {
        this.tryExecute(() -> this.eventQueue.append(new CollectorStartupEvent(this, this.config, topicsWithInfo, this.eventQueue, this.metrics, this.zkClient, this.kafkaConfig, epoch, this.time)), e -> {
            this.log.error("Failed to start due to", (Throwable)e);
            this.stop();
        });
    }

    public void stop() {
        this.tryExecute(() -> this.appendToQueue(new CollectorStopEvent(this, this.time)), e -> this.log.error("Failed to stop due to", (Throwable)e));
    }

    public void shutdown() {
        try {
            this.stop();
            this.eventQueue.close();
            this.log.info("Finished shutdown.");
        }
        catch (Exception e) {
            this.log.error("Failed to shutdown due to", (Throwable)e);
        }
        finally {
            this.removeMetric();
        }
    }

    public boolean isActive() {
        return this.context.isPresent();
    }

    public Optional<ZKTopicMetadataCollectorContext> collectorContext() {
        return this.context;
    }

    public void setCollectorContext(Optional<ZKTopicMetadataCollectorContext> contextOptional) {
        this.context = contextOptional;
    }

    EventEmitter eventEmitter() {
        return this.metrics.eventEmitter();
    }

    public void onTopicCreate(Map<String, TopicInfo> newTopicWithInfo) {
        this.tryExecute(() -> {
            Map<String, TopicInfo> filteredMap = newTopicWithInfo.entrySet().stream().filter(entry -> !Objects.equals(((TopicInfo)entry.getValue()).logicalClusterId(), "")).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            if (filteredMap.size() > 0) {
                this.appendToQueue(new TopicCreationEvent(this, new HashMap<String, TopicInfo>(filteredMap), this.time));
            }
        }, e -> this.log.error("Failed to process topic creation due to", (Throwable)e));
    }

    public void onTopicDelete(Set<String> deleteTopics) {
        this.tryExecute(() -> {
            Set filteredSet = deleteTopics.stream().filter(TenantHelpers::isTenantPrefixed).collect(Collectors.toSet());
            if (filteredSet.size() > 0) {
                this.appendToQueue(new TopicDeletionEvent(this, new HashSet<String>(filteredSet), this.time));
            }
        }, e -> this.log.error("Failed to process topic deletion due to", (Throwable)e));
    }

    public void onTopicPartitionChange(String topic, int newPartition) {
        this.tryExecute(() -> {
            String logicalClusterId = TenantHelpers.extractTenantPrefix(topic, false);
            if (logicalClusterId == null) {
                return;
            }
            this.appendToQueue(new TopicPartitionChangeEvent(this, logicalClusterId, topic, newPartition, this.time));
        }, e -> this.log.error("Failed to process topic partition change due to", (Throwable)e));
    }

    public void onTopicConfigChange(String topic, LogConfig newConfig) {
        this.tryExecute(() -> {
            String logicalClusterId = TenantHelpers.extractTenantPrefix(topic, false);
            if (logicalClusterId == null) {
                return;
            }
            this.appendToQueue(new TopicConfigChangeEvent(this, logicalClusterId, topic, newConfig, this.time));
        }, e -> this.log.error("Failed to process topic config change due to", (Throwable)e));
    }

    private void appendToQueue(MetadataCollectorEvent event) {
        try {
            this.eventQueue.append(event);
        }
        catch (IllegalStateException e) {
            this.log.warn("Event {} will be ignore because the EventQueue is closing due to {}.", (Object)event, (Object)e);
        }
    }

    private void registerMetric() {
        this.metrics.addMetric(this.activeCollectorMetricName, (config, now) -> this.isActive() ? 1.0 : 0.0);
    }

    private void removeMetric() {
        this.metrics.removeMetric(this.activeCollectorMetricName);
    }

    private void tryExecute(Runnable runnable, Consumer<Exception> errorHandler) {
        try {
            runnable.run();
        }
        catch (Exception e) {
            errorHandler.accept(e);
        }
    }
}

