/*
 * Decompiled with CFR 0.152.
 */
package kafka.catalog;

import io.confluent.telemetry.api.events.EventEmitter;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import kafka.catalog.MetadataCollectorEventQueue;
import kafka.catalog.ZKMetadataCollectorConfig;
import kafka.catalog.ZKMetadataCollectorContext;
import kafka.catalog.event.BrokerDefaultConfigChangeEvent;
import kafka.catalog.event.ClusterLinkConfigChangeEvent;
import kafka.catalog.event.ClusterLinkCreationEvent;
import kafka.catalog.event.ClusterLinkDeletionEvent;
import kafka.catalog.event.CollectorStartupEvent;
import kafka.catalog.event.CollectorStopEvent;
import kafka.catalog.event.MetadataCollectorEvent;
import kafka.catalog.event.MirrorTopicChangeEvent;
import kafka.catalog.event.TopicConfigChangeEvent;
import kafka.catalog.event.TopicCreationEvent;
import kafka.catalog.event.TopicDeletionEvent;
import kafka.catalog.event.TopicPartitionChangeEvent;
import kafka.catalog.metadata.ClusterLinkInfo;
import kafka.catalog.metadata.TopicInfo;
import kafka.common.TenantHelpers;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkConfig;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.slf4j.Logger;

/**
 * Turns metadata change notifications (topics, cluster links, mirror topics, broker
 * defaults) into {@link MetadataCollectorEvent}s and appends them to a
 * {@link MetadataCollectorEventQueue}.
 *
 * <p>Notifications are only forwarded while the collector is collecting, i.e. between a
 * successful {@link #enable} and the next {@link #disable} / {@link #shutdown}. Most
 * handlers additionally drop anything that is not tenant-prefixed according to
 * {@link TenantHelpers}.
 *
 * <p>An {@code active-collector} gauge in the {@code catalog-metrics} group is registered on
 * construction and removed again by {@link #shutdown()}.
 */
public class ZKMetadataCollector {
    private final Logger log;
    private final ZKMetadataCollectorConfig config;
    private final KafkaZkClient zkClient;
    private final KafkaConfig kafkaConfig;
    private final Metrics metrics;
    private final MetadataCollectorEventQueue eventQueue;
    private final Time time;
    private final MetricName activeCollectorMetricName;
    /** Set from outside through {@link #setCollectorContext(Optional)}. */
    private volatile Optional<ZKMetadataCollectorContext> context;
    /** Flipped to true once a startup event was queued and to false once a stop event was queued. */
    private volatile Boolean isCollecting;

    /**
     * Creates a collector that is initially not collecting and has no context, with its own
     * logger and event queue.
     *
     * @param config   broker configuration; also used to derive the collector configuration
     * @param zkClient ZooKeeper client handed to the startup event
     * @param metrics  registry the {@code active-collector} metric is registered with
     * @param time     clock handed to the event queue and to every event
     */
    public ZKMetadataCollector(KafkaConfig config, KafkaZkClient zkClient, Metrics metrics, Time time) {
        this(
            config,
            new ZKMetadataCollectorConfig(config),
            zkClient,
            metrics,
            new LogContext("[ZKMetadataCollector id=" + config.nodeId() + "]").logger(ZKMetadataCollector.class),
            new MetadataCollectorEventQueue(time),
            Optional.empty(),
            false,
            time);
    }

    /**
     * Fully parameterised constructor; package-private so collaborators and initial state
     * can be supplied directly.
     */
    ZKMetadataCollector(KafkaConfig kafkaConfig, ZKMetadataCollectorConfig config, KafkaZkClient zkClient, Metrics metrics, Logger log, MetadataCollectorEventQueue eventQueue, Optional<ZKMetadataCollectorContext> context, Boolean isCollecting, Time time) {
        this.kafkaConfig = kafkaConfig;
        this.config = config;
        this.zkClient = zkClient;
        this.metrics = metrics;
        this.log = log;
        this.eventQueue = eventQueue;
        this.time = time;
        this.context = context;
        this.isCollecting = isCollecting;
        this.activeCollectorMetricName = this.metrics.metricName("active-collector", "catalog-metrics", "Reports 1 if the catalog metadata collector is active, 0 otherwise.");
        // The gauge is evaluated lazily, so it always reflects the current isActive() state.
        this.metrics.addMetric(this.activeCollectorMetricName, (metricConfig, nowMs) -> this.isActive() ? 1.0 : 0.0);
        log.info("Constructed, snapshot init delay {}s, interval {}s", this.config.snapshotInitDelaySec, this.config.snapshotIntervalSec);
    }

    /** Stops collecting if currently collecting; otherwise only logs a warning. */
    public void disable() {
        if (!this.isCollecting) {
            this.log.warn("Trying to disable an already disabled collector");
            return;
        }
        this.log.info("Disabling collector");
        this.stop();
    }

    /**
     * Starts collecting unless already collecting, in which case only a warning is logged.
     *
     * @param topicsWithInfo       topics passed through to the startup event
     * @param clusterLinksWithInfo cluster links passed through to the startup event
     * @param epoch                epoch passed through to the startup event
     */
    public void enable(Map<String, TopicInfo> topicsWithInfo, Map<String, ClusterLinkInfo> clusterLinksWithInfo, int epoch) {
        if (this.isCollecting) {
            this.log.warn("Trying to re-enable an already enabled collector");
            return;
        }
        this.log.info("Enabling collector");
        this.start(topicsWithInfo, clusterLinksWithInfo, epoch);
    }

    /**
     * Queues a {@link CollectorStartupEvent} and marks the collector as collecting. Any
     * exception is logged and followed by {@link #stop()}.
     */
    private void start(Map<String, TopicInfo> topicsWithInfo, Map<String, ClusterLinkInfo> clusterLinksWithInfo, int epoch) {
        try {
            CollectorStartupEvent startup = new CollectorStartupEvent(this, this.config, topicsWithInfo, clusterLinksWithInfo, this.eventQueue, this.metrics, this.zkClient, this.kafkaConfig, epoch, this.time);
            // Appended directly (not via appendToQueue) so a rejected append counts as a failed start.
            this.eventQueue.append(startup);
            this.isCollecting = true;
        }
        catch (Exception e) {
            this.log.error("Failed to start due to", e);
            this.stop();
        }
    }

    /** Queues a {@link CollectorStopEvent} and marks the collector as not collecting. */
    private void stop() {
        try {
            this.appendToQueue(new CollectorStopEvent(this, this.time));
            this.isCollecting = false;
        }
        catch (Exception e) {
            this.log.error("Failed to stop due to", e);
        }
    }

    /**
     * Stops the collector and closes the event queue. Failures are logged rather than
     * propagated, and the {@code active-collector} metric is removed in every case.
     */
    public void shutdown() {
        try {
            this.stop();
            this.eventQueue.close();
            this.log.info("Finished shutdown.");
        }
        catch (Exception e) {
            this.log.error("Failed to shutdown due to", e);
        }
        finally {
            this.metrics.removeMetric(this.activeCollectorMetricName);
        }
    }

    /**
     * @return {@code true} only when the collector is collecting and a collector context is present
     */
    public boolean isActive() {
        if (!this.isCollecting) {
            return false;
        }
        return this.context.isPresent();
    }

    /** @return the collector context currently held, possibly empty */
    public Optional<ZKMetadataCollectorContext> collectorContext() {
        return this.context;
    }

    /** Replaces the collector context held by this collector. */
    public void setCollectorContext(Optional<ZKMetadataCollectorContext> contextOptional) {
        this.context = contextOptional;
    }

    /** @return the event emitter exposed by the metrics registry */
    EventEmitter eventEmitter() {
        return this.metrics.eventEmitter();
    }

    /** Queues a creation event for the given cluster link if its name is tenant-prefixed. */
    public void onClusterLinkCreate(ClusterLinkInfo newClusterLink) {
        this.whenCollecting("Failed to process cluster link creation: ", () -> {
            String name = newClusterLink.clusterLinkName();
            if (!TenantHelpers.isTenantPrefixed(name)) {
                return;
            }
            this.appendToQueue(new ClusterLinkCreationEvent(this, Collections.singletonMap(name, newClusterLink), this.time));
        });
    }

    /** Queues a config change event for the cluster link if a tenant prefix can be extracted from its name. */
    public void onClusterLinkConfigChange(String clusterLink, ClusterLinkConfig newConfig) {
        this.whenCollecting("Failed to process cluster link config change: ", () -> {
            String tenant = TenantHelpers.extractTenantPrefix(clusterLink, false);
            if (tenant != null) {
                this.appendToQueue(new ClusterLinkConfigChangeEvent(this, tenant, clusterLink, newConfig, this.time));
            }
        });
    }

    /** Queues a deletion event for the cluster link if its name is tenant-prefixed. */
    public void onClusterLinkDelete(String deletedClusterLink) {
        this.whenCollecting("Failed to process cluster link deletion: ", () -> {
            if (!TenantHelpers.isTenantPrefixed(deletedClusterLink)) {
                return;
            }
            this.appendToQueue(new ClusterLinkDeletionEvent(this, Collections.singleton(deletedClusterLink), this.time));
        });
    }

    /** Queues a mirror topic change event if the topic name is tenant-prefixed. */
    public void onMirrorTopicStateChange(String topic, String mirrorTopicState) {
        this.whenCollecting("Failed to process mirror topic state change due to ", () -> {
            if (!TenantHelpers.isTenantPrefixed(topic)) {
                return;
            }
            String tenant = TenantHelpers.extractTenantPrefix(topic, false);
            this.appendToQueue(new MirrorTopicChangeEvent(this, tenant, topic, mirrorTopicState, this.time));
        });
    }

    /**
     * Queues a single creation event covering every supplied topic whose {@link TopicInfo}
     * reports a non-empty logical cluster id; nothing is queued when no topic qualifies.
     */
    public void onTopicCreate(Map<String, TopicInfo> newTopicWithInfo) {
        this.whenCollecting("Failed to process topic creation due to", () -> {
            Map<String, TopicInfo> tenantTopics = newTopicWithInfo.entrySet().stream()
                .filter(entry -> !Objects.equals(entry.getValue().logicalClusterId(), ""))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            if (tenantTopics.isEmpty()) {
                return;
            }
            // The event is handed its own copy of the filtered map.
            Map<String, TopicInfo> payload = new HashMap<>(tenantTopics);
            this.appendToQueue(new TopicCreationEvent(this, payload, this.time));
        });
    }

    /**
     * Queues a single deletion event covering every tenant-prefixed topic in the given set;
     * nothing is queued when no topic qualifies.
     */
    public void onTopicDelete(Set<String> deleteTopics) {
        this.whenCollecting("Failed to process topic deletion due to", () -> {
            Set<String> tenantTopics = deleteTopics.stream()
                .filter(TenantHelpers::isTenantPrefixed)
                .collect(Collectors.toSet());
            if (tenantTopics.isEmpty()) {
                return;
            }
            // The event is handed its own copy of the filtered set.
            Set<String> payload = new HashSet<>(tenantTopics);
            this.appendToQueue(new TopicDeletionEvent(this, payload, this.time));
        });
    }

    /** Queues a partition change event if a tenant prefix can be extracted from the topic name. */
    public void onTopicPartitionChange(String topic, int newPartition) {
        this.whenCollecting("Failed to process topic partition change due to", () -> {
            String tenant = TenantHelpers.extractTenantPrefix(topic, false);
            if (tenant != null) {
                this.appendToQueue(new TopicPartitionChangeEvent(this, tenant, topic, newPartition, this.time));
            }
        });
    }

    /**
     * Queues a topic config change event if a tenant prefix can be extracted from the topic
     * name. The event receives a copy of {@code updateProperties}, not the caller's instance.
     */
    public void onTopicConfigChange(String topic, LogConfig newConfig, Properties updateProperties) {
        this.whenCollecting("Failed to process topic config change due to", () -> {
            String tenant = TenantHelpers.extractTenantPrefix(topic, false);
            if (tenant == null) {
                return;
            }
            Properties snapshot = new Properties();
            snapshot.putAll(updateProperties);
            this.appendToQueue(new TopicConfigChangeEvent(this, tenant, topic, newConfig, snapshot, this.time));
        });
    }

    /** Queues a broker default config change event carrying both the old and the new config. */
    public void onBrokerDefaultConfigChange(KafkaConfig oldConfig, KafkaConfig newConfig) {
        this.whenCollecting("Failed to process broker config change due to", () -> this.appendToQueue(new BrokerDefaultConfigChangeEvent(this, oldConfig, newConfig, this.time)));
    }

    /**
     * Appends the event to the queue. An {@link IllegalStateException} from the queue is
     * logged as a warning and the event is dropped; other exceptions propagate to the caller.
     */
    private void appendToQueue(MetadataCollectorEvent event) {
        try {
            this.eventQueue.append(event);
        }
        catch (IllegalStateException rejected) {
            this.log.warn("Event {} will be ignore because the EventQueue is closing due to {}.", event, rejected);
        }
    }

    /**
     * Shared preamble of the notification handlers: does nothing unless the collector is
     * collecting, otherwise runs {@code action} and logs any exception it throws at error
     * level under {@code failureMessage} instead of propagating it.
     */
    private void whenCollecting(String failureMessage, Runnable action) {
        if (!this.isCollecting) {
            return;
        }
        try {
            action.run();
        }
        catch (Exception failure) {
            this.log.error(failureMessage, failure);
        }
    }
}

