/*
 * Decompiled with CFR 0.152.
 */
package kafka.catalog.event;

import io.confluent.protobuf.events.catalog.v1.MetadataChange;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.api.events.EventEmitter;
import io.confluent.telemetry.api.events.NoOpEventEmitter;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import kafka.catalog.MetadataEventUtils;
import kafka.catalog.ZKTopicMetadataCollector;
import kafka.catalog.ZKTopicMetadataCollectorContext;
import kafka.catalog.exceptions.CollectorContextNotInitializedException;
import kafka.catalog.exceptions.TopicConfigFetchRequestException;
import kafka.log.LogConfig;
import kafka.zk.TopicZNode;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.queue.EventQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Map;

public abstract class MetadataCollectorEvent
implements EventQueue.Event {
    protected static final Logger LOG = LoggerFactory.getLogger(MetadataCollectorEvent.class);
    protected ZKTopicMetadataCollector collector;
    protected long eventObserveTime;
    protected final Time time;
    public static final String SNAPSHOT_EVENT_TAG = "SNAPSHOT_EVENT";
    public static final String CACHE_BUILD_EVENT_TAG = "CACHE_BUILD_EVENT";

    public MetadataCollectorEvent(ZKTopicMetadataCollector collector, Time time) {
        this.collector = collector;
        this.time = time;
        this.eventObserveTime = time.nanoseconds();
    }

    public void handleException(Throwable e) {
        if (e instanceof RejectedExecutionException) {
            LOG.info("Not processing {} because the event queue is closed.", (Object)this, (Object)e);
        } else {
            LOG.error("Unexpected error handling {}", (Object)this, (Object)e);
        }
        this.recordEventHandleError();
    }

    protected void recordEventHandleError() {
        try {
            this.context().catalogMetrics().collectorEventHandleErrorSensor.record();
        }
        catch (CollectorContextNotInitializedException ex) {
            LOG.warn("Ignore error in metrics due to ", (Throwable)ex);
        }
    }

    protected ZKTopicMetadataCollectorContext context() throws CollectorContextNotInitializedException {
        if (this.collector.collectorContext().isPresent()) {
            return this.collector.collectorContext().get();
        }
        throw new CollectorContextNotInitializedException("CollectorContext is not initialized yet.");
    }

    protected Optional<LogConfig> getLogConfigsForTopic(ZKTopicMetadataCollectorContext context, String topic) throws TopicConfigFetchRequestException {
        context.throttler().maybeThrottle(1.0);
        Set<String> topicSet = Collections.singleton(topic);
        Tuple2<Map<String, LogConfig>, Map<String, Exception>> logConfigsResult = context.zkClient().getLogConfigs((scala.collection.immutable.Set<String>)JavaConverters.asScalaSet(topicSet).toSet(), context.originalConfig().originals());
        java.util.Map exceptions = JavaConverters.mapAsJavaMap((Map)((Map)logConfigsResult._2()));
        if (exceptions != null && !exceptions.isEmpty()) {
            throw new TopicConfigFetchRequestException(String.format("Encounter error when getting LogConfig for topic %s", topic), (Throwable)exceptions.get(topic));
        }
        return Optional.ofNullable(JavaConverters.mapAsJavaMap((Map)((Map)logConfigsResult._1())).get(topic));
    }

    protected TopicZNode.TopicIdReplicaAssignment getTopicIdReplicaAssignmentFromZk(ZKTopicMetadataCollectorContext context, String topic) {
        context.throttler().maybeThrottle(1.0);
        return (TopicZNode.TopicIdReplicaAssignment)context.zkClient().getReplicaAssignmentAndTopicIdForTopics((scala.collection.immutable.Set<String>)JavaConverters.asScalaSet(Collections.singleton(topic)).toSet()).head();
    }

    protected void emitDeltaEvent(ZKTopicMetadataCollectorContext context, MetadataChange metadataChange) {
        Event toEmit = MetadataEventUtils.topicMetadataDeltaCloudEvent(metadataChange, context.epoch(), context.config().destTopic);
        this.emitAndLogError(context, toEmit);
    }

    protected void emitSnapshotEvent(ZKTopicMetadataCollectorContext context, MetadataChange metadataChange, int page, int total) {
        Event toEmit = MetadataEventUtils.topicMetadataSnapshotCloudEvent(metadataChange, context.epoch(), context.config().destTopic, page, total);
        this.emitAndLogError(context, toEmit);
    }

    private void emitAndLogError(ZKTopicMetadataCollectorContext context, Event toEmit) {
        EventEmitter eventEmitter = context.eventEmitter();
        if (eventEmitter instanceof NoOpEventEmitter) {
            throw new IllegalStateException("No EventEmitter configured.");
        }
        ((CompletableFuture)eventEmitter.emit(toEmit).thenApplyAsync(emitted -> {
            if (!emitted.booleanValue()) {
                LOG.error("Failed to emit event {}", (Object)toEmit);
                context.catalogMetrics().collectorEventHandleErrorSensor.record();
            } else if (toEmit.type().equals("TOPIC_SNAPSHOT")) {
                context.catalogMetrics().snapshotEventEmitSensor.record();
            } else {
                context.catalogMetrics().deltaEventEmitSensor.record();
            }
            return null;
        })).exceptionally(e -> {
            LOG.error("Failed to emit event {}. This should never happen because the emitter would not throw an exception", (Object)toEmit, e);
            context.catalogMetrics().collectorEventHandleErrorSensor.record();
            return null;
        });
    }
}

