package kafka.catalog;

import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.confluent.protobuf.events.catalog.v1.MetadataEvent;
import io.confluent.telemetry.api.events.EventEmitter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import kafka.common.TenantHelpers;
import kafka.log.LogConfig;
import kafka.server.KafkaConfig;
import kafka.utils.KafkaScheduler;
import kafka.utils.Scheduler;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataImageListener;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.codehaus.plexus.util.SelectorUtils;
import org.slf4j.Logger;
import scala.Function0;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:kafka/catalog/KRaftTopicMetadataCollector.class */
/**
 * {@link MetadataImageListener} that turns metadata image changes into topic metadata events.
 *
 * <p>While this node is the leader reported through {@link #onLeaderUpdate(LeaderAndEpoch)} the
 * collector is "active": every metadata image update is translated into create / update / delete
 * delta events, and a scheduled task periodically emits paged per-tenant snapshots of all topics
 * in the most recent image. Only topics whose name carries a non-empty tenant prefix (as reported
 * by {@code TenantHelpers.extractTenantPrefix}) are considered.
 *
 * <p>State shared between the listener callbacks and the scheduled snapshot task is kept in
 * atomic holders.
 */
public class KRaftTopicMetadataCollector implements MetadataImageListener {
    private final Logger log;
    /** Delay, in seconds, before the first snapshot run. */
    private final int snapshotInitDelay;
    /** Period, in seconds, between snapshot runs. */
    private final int snapshotInterval;
    /** Upper bound on the number of topics carried by a single snapshot page. */
    private final int maxTopicsInSnapshot;
    /** Passed to every cloud event built by this collector. */
    private final String destTopic;
    private final int nodeId;
    private final KafkaConfig kafkaConfig;
    private final Time time;
    private final Metrics metrics;
    private final MetricName activeCollectorMetricName;
    /** Most recent image handed to {@link #onMetadataImageUpdate}; {@code null} until the first update. */
    private final AtomicReference<MetadataImage> latestImage = new AtomicReference<>(null);
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    /** Epoch from the last leader update; -1 until one has been received. */
    private final AtomicInteger epoch = new AtomicInteger(-1);
    /** Non-null only while this collector is active. */
    private final AtomicReference<CatalogMetrics> catalogMetrics = new AtomicReference<>(null);
    /** Non-null once {@link #start()} has been called. */
    private final AtomicReference<Scheduler> snapshotScheduler = new AtomicReference<>(null);

    /**
     * Creates an inactive collector and registers its "active collector" gauge.
     *
     * @param metrics             metrics registry; also supplies the event emitter
     * @param snapshotInitDelay   delay in seconds before the first snapshot
     * @param snapshotInterval    interval in seconds between snapshots
     * @param maxTopicsInSnapshot maximum number of topics per snapshot page
     * @param destTopic           destination passed to the emitted cloud events
     * @param nodeId              id of this node, compared against the leader id
     * @param kafkaConfig         broker config providing the default topic log config
     * @param time                clock used for event timestamps
     */
    public KRaftTopicMetadataCollector(Metrics metrics, int snapshotInitDelay, int snapshotInterval,
                                       int maxTopicsInSnapshot, String destTopic, int nodeId,
                                       KafkaConfig kafkaConfig, Time time) {
        this.metrics = metrics;
        this.snapshotInitDelay = snapshotInitDelay;
        this.snapshotInterval = snapshotInterval;
        this.maxTopicsInSnapshot = maxTopicsInSnapshot;
        this.destTopic = destTopic;
        this.nodeId = nodeId;
        this.kafkaConfig = kafkaConfig;
        this.time = time;
        this.activeCollectorMetricName = metrics.metricName(
                CatalogMetrics.ACTIVE_COLLECTOR, CatalogMetrics.GROUP_NAME, CatalogMetrics.ACTIVE_COLLECTOR_DOC);
        registerMetric();
        // The prefix text is kept verbatim so existing log searches keep matching; the closing
        // bracket is a plain literal rather than a constant borrowed from an unrelated library.
        this.log = new LogContext("[KRaftTopicMedataCollector id=" + nodeId + "]").logger(getClass());
        this.log.info("Constructed, snapshot init delay {}s, interval {}s", snapshotInitDelay, snapshotInterval);
    }

    /**
     * Starts a dedicated single-threaded scheduler running the snapshot task periodically.
     *
     * @param initDelaySeconds delay before the first run, in seconds
     * @param intervalSeconds  period between runs, in seconds
     * @return the started scheduler
     */
    private Scheduler registerSnapshotTask(int initDelaySeconds, int intervalSeconds) {
        KafkaScheduler scheduler = new KafkaScheduler(1, "catalog-metadata-snapshot-", true, false);
        scheduler.startup();
        scheduler.schedule("TopicMetadataSnapshotEmitter", emitMetadataSnapshot(),
                initDelaySeconds, intervalSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    /** Shuts the given scheduler down; a {@code null} scheduler is ignored. */
    private void deregisterSnapshotTask(Scheduler scheduler) {
        if (scheduler != null) {
            scheduler.shutdown();
        }
    }

    /**
     * Starts the periodic snapshot task.
     *
     * @throws IllegalStateException if the collector has already been started
     */
    public void start() {
        if (this.snapshotScheduler.get() != null) {
            throw new IllegalStateException("Cannot start a topic metadata collector multiple times");
        }
        this.snapshotScheduler.set(registerSnapshotTask(this.snapshotInitDelay, this.snapshotInterval));
    }

    /** Shuts down the snapshot scheduler (if started) and removes the "active collector" gauge. */
    public void stop() {
        deregisterSnapshotTask(this.snapshotScheduler.get());
        removeMetric();
    }

    /** @return {@code true} while this node is the leader last reported to {@link #onLeaderUpdate} */
    public boolean isActive() {
        return this.isActive.get();
    }

    /** @return the event emitter of the underlying metrics registry */
    EventEmitter eventEmitter() {
        return this.metrics.eventEmitter();
    }

    /** @return the current catalog metrics, or {@code null} while the collector is inactive */
    CatalogMetrics catalogMetrics() {
        return this.catalogMetrics.get();
    }

    /**
     * Records the new epoch and activates or deactivates the collector depending on whether this
     * node is the reported leader. Exceptions are logged and counted, never propagated.
     *
     * @param leaderAndEpoch the new leader and epoch
     */
    @Override // org.apache.kafka.image.MetadataImageListener
    public void onLeaderUpdate(LeaderAndEpoch leaderAndEpoch) {
        try {
            this.epoch.set(leaderAndEpoch.epoch());
            if (leaderAndEpoch.leaderId().equals(OptionalInt.of(this.nodeId))) {
                // Drop any previous metrics instance before creating the replacement.
                CatalogMetrics previous = this.catalogMetrics.getAndSet(null);
                if (previous != null) {
                    previous.removeCatalogMetrics();
                }
                this.catalogMetrics.set(new CatalogMetrics(this.metrics, () -> 0));
                this.isActive.set(true);
                this.log.info("MetadataCollector is active");
            } else if (this.isActive.compareAndSet(true, false)) {
                CatalogMetrics previous = this.catalogMetrics.getAndSet(null);
                if (previous != null) {
                    previous.removeCatalogMetrics();
                }
                this.log.info("MetadataCollector is no longer active");
            }
        } catch (Exception e) {
            this.log.error("Encountered exception when reacting to leadership change", e);
            logEventHandleError();
        }
    }

    /**
     * Remembers the new image and, while active, emits delta events for deleted, created and
     * updated topics. Exceptions are logged and counted, never propagated.
     *
     * @param metadataDelta the changes leading to {@code metadataImage}
     * @param metadataImage the new metadata image
     */
    @Override // org.apache.kafka.image.MetadataImageListener
    public void onMetadataImageUpdate(MetadataDelta metadataDelta, MetadataImage metadataImage) {
        try {
            // Always track the latest image so a later snapshot has data even if we become active afterwards.
            this.latestImage.set(metadataImage);
            if (this.isActive.get()) {
                processDeletedTopics(metadataDelta);
                processChangedTopics(metadataDelta, metadataImage);
            }
        } catch (Exception e) {
            this.log.error("Encountered exception when processing metadata image updates", e);
            logEventHandleError();
        }
    }

    /**
     * Builds the task that emits one paged snapshot per tenant from the latest metadata image.
     *
     * <p>The task does nothing while the collector is inactive or before a non-empty image has
     * been seen. Any exception raised while building or emitting snapshots is logged and counted
     * by the task itself.
     *
     * @return the snapshot task, suitable for scheduling
     */
    public Function0<BoxedUnit> emitMetadataSnapshot() {
        return () -> {
            try {
                if (!this.isActive.get()) {
                    return BoxedUnit.UNIT;
                }
                MetadataImage image = this.latestImage.get();
                if (image != null && !image.isEmpty()) {
                    // Sorted topic ids per tenant; topics without a tenant prefix are skipped.
                    Map<String, Set<Uuid>> topicIdsByTenant = new HashMap<>();
                    image.topics().topicsByName().forEach((topicName, topicImage) -> {
                        String tenantPrefix = TenantHelpers.extractTenantPrefix(topicName, false);
                        if (tenantPrefix == null || tenantPrefix.isEmpty()) {
                            return;
                        }
                        topicIdsByTenant.computeIfAbsent(tenantPrefix, key -> new TreeSet<>()).add(topicImage.id());
                    });
                    topicIdsByTenant.forEach((tenant, topicIds) -> sendSnapshot(tenant, topicIds, image));
                }
            } catch (Exception e) {
                this.log.error("Encountered exception when emitting snapshot events", e);
                logEventHandleError();
            }
            return BoxedUnit.UNIT;
        };
    }

    /**
     * Emits the snapshot of one tenant, split into pages of at most {@code maxTopicsInSnapshot} topics.
     *
     * @param tenantPrefix  tenant the topics belong to
     * @param topicIds      ids of the tenant's topics, all present in {@code metadataImage}
     * @param metadataImage image the topic details are read from
     */
    private void sendSnapshot(String tenantPrefix, Set<Uuid> topicIds, MetadataImage metadataImage) {
        int pageSize = Math.min(topicIds.size(), this.maxTopicsInSnapshot);
        int totalPages = MetadataEventUtils.getNumberOfSnapshotPages(topicIds.size(), pageSize);
        int pageIndex = 0;
        List<MetadataEvent> page = new ArrayList<>(pageSize);
        this.log.debug("Creating Snapshot for tenant {} with {} topics, {} pages and {} total",
                tenantPrefix, topicIds.size(), pageIndex, totalPages);
        for (Uuid topicId : topicIds) {
            // Flush the current page before it would exceed the page size.
            if (page.size() >= pageSize) {
                emitSnapshotPage(tenantPrefix, page, pageIndex, totalPages);
                page = new ArrayList<>(pageSize);
                pageIndex++;
            }
            page.add(getMetadataEventFromImage(metadataImage, metadataImage.topics().getTopic(topicId), false, false));
        }
        // The last (possibly partial) page is always emitted.
        emitSnapshotPage(tenantPrefix, page, pageIndex, totalPages);
    }

    /** Wraps one page of topic events into a snapshot cloud event and emits it. */
    private void emitSnapshotPage(String tenantPrefix, List<MetadataEvent> page, int pageIndex, int totalPages) {
        MetadataEventUtils.emitAndLogError(
                this.metrics.eventEmitter(),
                MetadataEventUtils.topicMetadataSnapshotCloudEvent(
                        MetadataEventUtils.snapshotEvent(tenantPrefix, page),
                        this.epoch.get(), this.destTopic, pageIndex, totalPages),
                this.catalogMetrics.get(),
                this.log);
    }

    /**
     * Emits a delete event for every tenant topic removed by the delta. Topic names are looked up
     * in the image the delta is based on, since deleted topics are gone from the new image.
     */
    private void processDeletedTopics(MetadataDelta metadataDelta) {
        TopicsDelta topicsDelta = metadataDelta.topicsDelta();
        if (topicsDelta == null) {
            return;
        }
        for (Uuid topicId : topicsDelta.deletedTopicIds()) {
            String topicName = topicsDelta.image().getTopic(topicId).name();
            String tenantPrefix = TenantHelpers.extractTenantPrefix(topicName, false);
            if (tenantPrefix == null || tenantPrefix.isEmpty()) {
                continue;
            }
            MetadataEventUtils.emitAndLogError(
                    this.metrics.eventEmitter(),
                    MetadataEventUtils.topicMetadataDeltaCloudEvent(
                            MetadataEventUtils.topicDeleteEvent(
                                    tenantPrefix,
                                    MetadataEventUtils.topicMetadataEventForDeletion(
                                            TenantHelpers.extractLogicalName(topicName),
                                            Optional.of(topicId.toString()),
                                            Timestamps.fromMillis(this.time.milliseconds()))),
                            this.epoch.get(), this.destTopic),
                    this.catalogMetrics.get(),
                    this.log);
        }
    }

    /**
     * Emits create events for topics that are new in {@code metadataImage} and update events for
     * existing topics whose topic delta or topic config changed and whose resulting event differs
     * from the one derived from the previous image.
     */
    private void processChangedTopics(MetadataDelta metadataDelta, MetadataImage metadataImage) {
        TopicsDelta topicsDelta = metadataDelta.topicsDelta();
        Set<Uuid> updatedTopicIds = new HashSet<>();
        if (topicsDelta != null) {
            for (Uuid topicId : topicsDelta.changedTopics().keySet()) {
                if (topicsDelta.image().getTopic(topicId) != null) {
                    // Existed before this delta: candidate for an update event.
                    updatedTopicIds.add(topicId);
                    continue;
                }
                // Absent from the previous image: this is a newly created topic.
                TopicImage created = metadataImage.topics().getTopic(topicId);
                String tenantPrefix = TenantHelpers.extractTenantPrefix(created.name(), false);
                if (tenantPrefix != null && !tenantPrefix.isEmpty()) {
                    MetadataEventUtils.emitAndLogError(
                            this.metrics.eventEmitter(),
                            MetadataEventUtils.topicMetadataDeltaCloudEvent(
                                    MetadataEventUtils.topicCreateEvent(
                                            tenantPrefix, getMetadataEventFromImage(metadataImage, created, true, false)),
                                    this.epoch.get(), this.destTopic),
                            this.catalogMetrics.get(),
                            this.log);
                }
            }
        }
        if (metadataDelta.configsDelta() != null) {
            // Config changes on topics that exist in both the previous and the new image are updates too.
            metadataDelta.configsDelta().changes().keySet().stream()
                    .filter(resource -> resource.type().equals(ConfigResource.Type.TOPIC))
                    .filter(resource -> metadataDelta.image().topics().getTopic(resource.name()) != null)
                    .map(resource -> metadataImage.topics().getTopic(resource.name()))
                    .filter(Objects::nonNull)
                    .forEach(topicImage -> updatedTopicIds.add(topicImage.id()));
        }
        for (Uuid topicId : updatedTopicIds) {
            TopicImage current = metadataImage.topics().getTopic(topicId);
            String tenantPrefix = TenantHelpers.extractTenantPrefix(current.name(), false);
            if (tenantPrefix == null || tenantPrefix.isEmpty()) {
                continue;
            }
            TopicImage previous = metadataDelta.image().topics().getTopic(topicId);
            MetadataEvent currentEvent = getMetadataEventFromImage(metadataImage, current, true, true);
            MetadataEvent previousEvent = getMetadataEventFromImage(metadataDelta.image(), previous, true, true);
            if (MetadataEventUtils.eventHasChanged(previousEvent, currentEvent)) {
                MetadataEventUtils.emitAndLogError(
                        this.metrics.eventEmitter(),
                        MetadataEventUtils.topicMetadataDeltaCloudEvent(
                                MetadataEventUtils.topicUpdateEvent(tenantPrefix, currentEvent),
                                this.epoch.get(), this.destTopic),
                        this.catalogMetrics.get(),
                        this.log);
            }
        }
    }

    /** Builds the effective log config of a topic: broker defaults overlaid with the topic's config from the image. */
    private LogConfig extractLogConfigFromImage(MetadataImage metadataImage, String topicName) {
        return LogConfig.fromProps(
                LogConfig.extractLogConfigMap(this.kafkaConfig),
                metadataImage.configs().configProperties(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));
    }

    /**
     * Builds the metadata event describing one topic as it appears in the given image.
     *
     * <p>The replica count is taken from the first partition returned by the partition map, so the
     * topic must have at least one partition; otherwise {@link java.util.NoSuchElementException}
     * is thrown.
     *
     * @param metadataImage    image providing the topic's configuration
     * @param topicImage       the topic to describe
     * @param includeTimestamp whether the event carries the current time at all
     * @param isUpdate         with {@code includeTimestamp}: {@code false} passes the time as the first
     *                         timestamp argument, {@code true} as the second (presumably creation
     *                         vs. update time — confirm against MetadataEventUtils)
     * @return the topic metadata event
     */
    private MetadataEvent getMetadataEventFromImage(MetadataImage metadataImage, TopicImage topicImage,
                                                    boolean includeTimestamp, boolean isUpdate) {
        LogConfig logConfig = extractLogConfigFromImage(metadataImage, topicImage.name());
        Timestamp now = Timestamps.fromMillis(this.time.milliseconds());
        Timestamp firstTimestamp = (includeTimestamp && !isUpdate) ? now : null;
        Timestamp secondTimestamp = (includeTimestamp && isUpdate) ? now : null;
        return MetadataEventUtils.topicMetadataEventFromLogConfig(
                logConfig,
                TenantHelpers.extractLogicalName(topicImage.name()),
                topicImage.id(),
                topicImage.partitions().size(),
                topicImage.partitions().values().iterator().next().replicas.length,
                firstTimestamp,
                secondTimestamp);
    }

    /** Registers the gauge reporting 1.0 while this collector is active. */
    private void registerMetric() {
        this.metrics.addMetric(this.activeCollectorMetricName, (metricConfig, now) -> {
            if (isActive()) {
                return 1.0d;
            }
            // NOTE(review): this constant looks like a decompiler substitution for a plain 0.0 —
            // confirm its value before replacing it with a literal.
            return ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT;
        });
    }

    /** Removes the "active collector" gauge from the metrics registry. */
    private void removeMetric() {
        this.metrics.removeMetric(this.activeCollectorMetricName);
    }

    /** Records an event-handling error on the catalog metrics, if the collector currently has any. */
    private void logEventHandleError() {
        // Read the reference once: a concurrent leader change may null it between a check and a use.
        CatalogMetrics current = this.catalogMetrics.get();
        if (current != null) {
            current.collectorEventHandleErrorSensor.record();
        }
    }
}
