/*
 * Decompiled with CFR 0.152.
 */
package kafka.catalog;

import io.confluent.protobuf.events.catalog.v1.MetadataChange;
import io.confluent.protobuf.events.catalog.v1.MetadataEvent;
import io.confluent.protobuf.events.catalog.v1.OpType;
import io.confluent.protobuf.events.catalog.v1.TopicMetadata;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.api.events.EventEmitter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import kafka.catalog.MetadataCollectorEventQueue;
import kafka.catalog.TopicInfo;
import kafka.catalog.ZKTopicMetadataCollector;
import kafka.catalog.ZKTopicMetadataCollectorConfig;
import kafka.catalog.ZKTopicMetadataCollectorContext;
import kafka.catalog.event.CacheBuildEvent;
import kafka.catalog.event.CollectorStartupEvent;
import kafka.catalog.event.CollectorStopEvent;
import kafka.catalog.event.MetadataCollectorEvent;
import kafka.catalog.event.SnapshotEvent;
import kafka.catalog.event.TopicConfigChangeEvent;
import kafka.catalog.event.TopicCreationEvent;
import kafka.catalog.event.TopicDeletionEvent;
import kafka.catalog.event.TopicPartitionChangeEvent;
import kafka.log.LogConfig;
import kafka.server.KafkaConfig;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.immutable.Set;

public class MetadataCollectorEventTest {
    private ZKTopicMetadataCollectorContext context;
    private ZKTopicMetadataCollectorConfig config;
    private ArgumentCaptor<MetadataCollectorEvent> submittedEvent;
    private ArgumentCaptor<Event> emittedEvent;
    private Time time;
    private Metrics metrics;
    String tenant = "lkc-abc";
    String topic1 = "topic1";
    String topic2 = "topic2";
    String fullTopic1 = this.tenant + '_' + this.topic1;
    String fullTopic2 = this.tenant + '_' + this.topic2;
    Uuid topicId1 = Uuid.randomUuid();
    Uuid topicId2 = Uuid.randomUuid();
    @Mock
    private ZKTopicMetadataCollector collector;
    @Mock
    private MetadataCollectorEventQueue eventQueue;
    @Mock
    private EventEmitter mockEmitter;
    @Mock
    private KafkaZkClient zkClient;
    @Mock
    private KafkaConfig kafkaConfig;

    @BeforeEach
    void setup() {
        MockitoAnnotations.openMocks((Object)this);
        this.config = this.getZKTopicMetadataCollectorConfig();
        this.time = new MockTime(0L, 0L, 0L);
        this.submittedEvent = ArgumentCaptor.forClass(MetadataCollectorEvent.class);
        this.emittedEvent = ArgumentCaptor.forClass(Event.class);
        this.metrics = (Metrics)Mockito.spy(Metrics.class);
        Mockito.when((Object)this.metrics.eventEmitter()).thenReturn((Object)this.mockEmitter);
        Mockito.when((Object)this.mockEmitter.emit((Event)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(true));
    }

    private ZKTopicMetadataCollectorConfig getZKTopicMetadataCollectorConfig() {
        return new ZKTopicMetadataCollectorConfig(0, 1, 1, 10, 1, "destination");
    }

    private void createInitialTopics() {
        HashMap<String, TopicInfo> topicsWithInfo = new HashMap<String, TopicInfo>();
        topicsWithInfo.put(this.fullTopic1, new TopicInfo(this.fullTopic1, this.topicId1, 1, 1));
        topicsWithInfo.put(this.fullTopic2, new TopicInfo(this.fullTopic2, this.topicId2, 2, 2));
        HashMap<String, Object> configMap = new HashMap<String, Object>();
        configMap.put(this.fullTopic1, Mockito.mock(LogConfig.class));
        configMap.put(this.fullTopic2, Mockito.mock(LogConfig.class));
        Mockito.when((Object)this.zkClient.getLogConfigs((Set)ArgumentMatchers.any(), (Map)ArgumentMatchers.any())).thenReturn((Object)new Tuple2((Object)JavaConverters.mapAsScalaMap(configMap), (Object)JavaConverters.mapAsScalaMap(Collections.emptyMap())));
        this.createContext(topicsWithInfo);
    }

    private void createContext(Map<String, TopicInfo> topicsWithInfo) {
        this.context = new ZKTopicMetadataCollectorContext(this.config, topicsWithInfo, this.eventQueue, this.metrics, this.zkClient, this.kafkaConfig, 0, this.time);
        Mockito.when((Object)this.collector.collectorContext()).thenReturn(Optional.of(this.context));
    }

    private void createContextWithLocalStoreValue(ZKTopicMetadataCollectorConfig config) {
        this.context = new ZKTopicMetadataCollectorContext(config, Collections.emptyMap(), this.eventQueue, this.metrics, this.zkClient, this.kafkaConfig, 0, this.time);
        this.context.localStore().addMetadataEvent(this.tenant, this.fullTopic2, MetadataEvent.newBuilder().setTopicMetadata(TopicMetadata.newBuilder().setTopicName(this.topic2).build()).build());
        this.context.localStore().addMetadataEvent(this.tenant, this.fullTopic1, MetadataEvent.newBuilder().setTopicMetadata(TopicMetadata.newBuilder().setTopicName(this.topic1).build()).build());
        Mockito.when((Object)this.collector.collectorContext()).thenReturn(Optional.of(this.context));
    }

    @Test
    void testCollectorStartupEvent() throws Exception {
        Mockito.when((Object)this.collector.collectorContext()).thenReturn(Optional.empty());
        ArgumentCaptor contextArgumentCaptor = ArgumentCaptor.forClass(Optional.class);
        CollectorStartupEvent csEvent = new CollectorStartupEvent(this.collector, this.config, Collections.emptyMap(), this.eventQueue, this.metrics, this.zkClient, this.kafkaConfig, 0, this.time);
        csEvent.run();
        ((MetadataCollectorEventQueue)Mockito.verify((Object)this.eventQueue)).appendWithTag((String)ArgumentMatchers.eq((Object)"CACHE_BUILD_EVENT"), (MetadataCollectorEvent)this.submittedEvent.capture());
        Assertions.assertNotNull((Object)this.submittedEvent.getValue());
        Assertions.assertEquals(CacheBuildEvent.class, ((MetadataCollectorEvent)this.submittedEvent.getValue()).getClass());
        ((ZKTopicMetadataCollector)Mockito.verify((Object)this.collector)).setCollectorContext((Optional)contextArgumentCaptor.capture());
        Assertions.assertTrue((boolean)((Optional)contextArgumentCaptor.getValue()).isPresent());
    }

    @Test
    void testCollectorStopEvent() throws Exception {
        this.createInitialTopics();
        ArgumentCaptor contextArgumentCaptor = ArgumentCaptor.forClass(Optional.class);
        CollectorStopEvent cstopEvent = new CollectorStopEvent(this.collector, this.time);
        cstopEvent.run();
        ((MetadataCollectorEventQueue)Mockito.verify((Object)this.eventQueue)).cancel("SNAPSHOT_EVENT");
        ((MetadataCollectorEventQueue)Mockito.verify((Object)this.eventQueue)).cancel("CACHE_BUILD_EVENT");
        ((ZKTopicMetadataCollector)Mockito.verify((Object)this.collector)).setCollectorContext((Optional)contextArgumentCaptor.capture());
        Assertions.assertFalse((boolean)((Optional)contextArgumentCaptor.getValue()).isPresent());
    }

    @Test
    void testCacheBuildEventWithoutRemaining() throws Exception {
        this.context = new ZKTopicMetadataCollectorContext(this.config, Collections.emptyMap(), this.eventQueue, this.metrics, this.zkClient, this.kafkaConfig, 0, this.time);
        Mockito.when((Object)this.collector.collectorContext()).thenReturn(Optional.of(this.context));
        CacheBuildEvent cbEvent = new CacheBuildEvent(this.collector, this.config.maxNumTopicsProcess, this.time);
        cbEvent.run();
        ((MetadataCollectorEventQueue)Mockito.verify((Object)this.eventQueue)).scheduleDeferred((String)ArgumentMatchers.eq((Object)"SNAPSHOT_EVENT"), (Function)ArgumentMatchers.any(), (MetadataCollectorEvent)this.submittedEvent.capture());
        Assertions.assertNotNull((Object)this.submittedEvent.getValue());
        Assertions.assertEquals(SnapshotEvent.class, ((MetadataCollectorEvent)this.submittedEvent.getValue()).getClass());
    }

    @Test
    void testCacheBuildEventWithRemaining() throws Exception {
        this.createInitialTopics();
        CacheBuildEvent cbEvent = new CacheBuildEvent(this.collector, this.config.maxNumTopicsProcess, this.time);
        cbEvent.run();
        ((MetadataCollectorEventQueue)Mockito.verify((Object)this.eventQueue)).appendWithTag((String)ArgumentMatchers.eq((Object)"CACHE_BUILD_EVENT"), (MetadataCollectorEvent)this.submittedEvent.capture());
        Assertions.assertNotNull((Object)this.submittedEvent.getValue());
        Assertions.assertEquals(CacheBuildEvent.class, ((MetadataCollectorEvent)this.submittedEvent.getValue()).getClass());
        Assertions.assertEquals((int)1, (int)this.context.localStore().logicalClusters().size());
        Assertions.assertEquals((int)1, (int)this.context.localStore().topics(this.tenant).size());
        Assertions.assertFalse((boolean)this.context.cacheInitialized());
    }

    @Test
    void testSnapshotEvent() throws Exception {
        this.createContextWithLocalStoreValue(this.config);
        SnapshotEvent snapshotEvent = new SnapshotEvent(this.collector, this.time);
        snapshotEvent.run();
        ((MetadataCollectorEventQueue)Mockito.verify((Object)this.eventQueue)).scheduleDeferred((String)ArgumentMatchers.eq((Object)"SNAPSHOT_EVENT"), (Function)ArgumentMatchers.any(), (MetadataCollectorEvent)this.submittedEvent.capture());
        Assertions.assertNotNull((Object)this.submittedEvent.getValue());
        Assertions.assertEquals(SnapshotEvent.class, ((MetadataCollectorEvent)this.submittedEvent.getValue()).getClass());
        ((EventEmitter)Mockito.verify((Object)this.mockEmitter, (VerificationMode)Mockito.times((int)2))).emit((Event)this.emittedEvent.capture());
        List events = this.emittedEvent.getAllValues();
        List<String> resultTopics = Arrays.asList(this.topic1, this.topic2);
        for (int i = 0; i < events.size(); ++i) {
            Assertions.assertEquals((Object)"TOPIC_SNAPSHOT", (Object)((Event)events.get(i)).type());
            Assertions.assertEquals((Object)String.format("crn://confluent.cloud/kafka=%s/topics", this.tenant), (Object)((Event)events.get(i)).source().toString());
            Assertions.assertEquals((Object)String.valueOf(i), (Object)((Event)events.get(i)).extension("page"));
            MetadataChange snapshot = MetadataChange.parseFrom((byte[])((byte[])((Event)events.get(i)).data().get()));
            Assertions.assertEquals((Object)resultTopics.get(i), (Object)snapshot.getEvents(0).getTopicMetadata().getTopicName(), (String)"Topics are not sorted.");
        }
    }

    @Test
    void testSnapshotEventWithDifferentConfig() throws Exception {
        Mockito.when((Object)this.mockEmitter.emit((Event)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(true));
        this.config = new ZKTopicMetadataCollectorConfig(0, 1, 10, 10, 1, "destination");
        this.createContextWithLocalStoreValue(this.config);
        SnapshotEvent snapshotEvent = new SnapshotEvent(this.collector, this.time);
        snapshotEvent.run();
        ((EventEmitter)Mockito.verify((Object)this.mockEmitter)).emit((Event)this.emittedEvent.capture());
        MetadataChange snapshot = MetadataChange.parseFrom((byte[])((byte[])((Event)this.emittedEvent.getValue()).data().get()));
        List<String> resultTopics = Arrays.asList(this.topic1, this.topic2);
        Assertions.assertEquals((int)2, (int)snapshot.getEventsList().size(), (String)"Snapshot contains less topic than expected");
        for (int i = 0; i < snapshot.getEventsList().size(); ++i) {
            Assertions.assertEquals((Object)resultTopics.get(i), (Object)snapshot.getEvents(i).getTopicMetadata().getTopicName(), (String)"Topics are not sorted.");
        }
    }

    @Test
    void testSnapshotEmittedDelayMetrics() throws Exception {
        MetricName snapshotEmittedDelayMetric = this.metrics.metricName("snapshot-emitting-delay-ms", "catalog-metrics", "Delay between each snapshot emitted (include snapshot taken time) in milliseconds.");
        this.createContextWithLocalStoreValue(this.config);
        SnapshotEvent snapshotEvent = new SnapshotEvent(this.collector, this.time);
        this.time.sleep(200L);
        snapshotEvent.run();
        Assertions.assertEquals((Object)200L, (Object)this.metrics.metric(snapshotEmittedDelayMetric).metricValue(), (String)"The snapshot emitted delay is not the actual delay");
        snapshotEvent = new SnapshotEvent(this.collector, this.time);
        this.time.sleep(300L);
        snapshotEvent.run();
        Assertions.assertEquals((Object)300L, (Object)this.metrics.metric(snapshotEmittedDelayMetric).metricValue(), (String)"The snapshot emitted delay is not the actual delay");
    }

    @Test
    void testTopicConfigChange() throws Exception {
        this.createInitialTopics();
        LogConfig newConfigForTopic1 = (LogConfig)Mockito.mock(LogConfig.class);
        Mockito.when((Object)newConfigForTopic1.getBoolean((String)ArgumentMatchers.eq((Object)LogConfig.KeySchemaValidationEnableProp()))).thenReturn((Object)true);
        TopicConfigChangeEvent tccEvent = new TopicConfigChangeEvent(this.collector, this.tenant, this.fullTopic1, newConfigForTopic1, this.time);
        tccEvent.run();
        ((EventEmitter)Mockito.verify((Object)this.mockEmitter)).emit((Event)this.emittedEvent.capture());
        Event result = (Event)this.emittedEvent.getValue();
        Assertions.assertNotNull((Object)result);
        Assertions.assertEquals((Object)"TOPIC_DELTA", (Object)result.type());
        Assertions.assertEquals((Object)String.format("crn://confluent.cloud/kafka=%s/topic=%s", this.tenant, this.topic1), (Object)result.source().toString());
        MetadataChange incrementalChange = MetadataChange.parseFrom((byte[])((byte[])result.data().get()));
        Assertions.assertTrue((boolean)incrementalChange.getEvents(0).getTopicMetadata().getKeySchemaValidation());
        Assertions.assertTrue((boolean)this.context.localStore().logicalClusters().contains(this.tenant));
        Assertions.assertEquals((int)1, (int)this.context.localStore().topics(this.tenant).size());
        Assertions.assertNull((Object)this.context.topicInfo(this.fullTopic1));
        Assertions.assertNotNull((Object)this.context.topicInfo(this.fullTopic2));
    }

    @Test
    void testTopicPartitionChange() throws Exception {
        this.createInitialTopics();
        TopicPartitionChangeEvent tpcEvent = new TopicPartitionChangeEvent(this.collector, this.tenant, this.fullTopic1, 3, this.time);
        tpcEvent.run();
        ((EventEmitter)Mockito.verify((Object)this.mockEmitter)).emit((Event)this.emittedEvent.capture());
        Event result = (Event)this.emittedEvent.getValue();
        Assertions.assertNotNull((Object)result);
        Assertions.assertEquals((Object)"TOPIC_DELTA", (Object)result.type());
        Assertions.assertEquals((Object)String.format("crn://confluent.cloud/kafka=%s/topic=%s", this.tenant, this.topic1), (Object)result.source().toString());
        MetadataChange incrementalChange = MetadataChange.parseFrom((byte[])((byte[])result.data().get()));
        Assertions.assertEquals((int)3, (int)incrementalChange.getEvents(0).getTopicMetadata().getPartitionsCount());
        Assertions.assertTrue((boolean)this.context.localStore().logicalClusters().contains(this.tenant));
        Assertions.assertEquals((int)1, (int)this.context.localStore().topics(this.tenant).size());
        Assertions.assertNull((Object)this.context.topicInfo(this.fullTopic1));
        Assertions.assertNotNull((Object)this.context.topicInfo(this.fullTopic2));
    }

    @Test
    void testTopicLifecycleChange() throws Exception {
        this.createInitialTopics();
        String tenant2 = "lkc-efg";
        String topic3 = "other-topic";
        String fullTopic3 = tenant2 + '_' + topic3;
        HashMap<String, Object> configMap = new HashMap<String, Object>();
        configMap.put(fullTopic3, Mockito.mock(LogConfig.class));
        Mockito.when((Object)this.zkClient.getLogConfigs((Set)ArgumentMatchers.any(), (Map)ArgumentMatchers.any())).thenReturn((Object)new Tuple2((Object)JavaConverters.mapAsScalaMap(configMap), (Object)JavaConverters.mapAsScalaMap(Collections.emptyMap())));
        TopicCreationEvent tcEvent = new TopicCreationEvent(this.collector, Collections.singletonMap(fullTopic3, new TopicInfo(fullTopic3, Uuid.randomUuid(), 3, 3)), this.time);
        tcEvent.run();
        TopicDeletionEvent tdEvent = new TopicDeletionEvent(this.collector, new HashSet<String>(Arrays.asList(this.fullTopic1)), this.time);
        tdEvent.run();
        ((EventEmitter)Mockito.verify((Object)this.mockEmitter, (VerificationMode)Mockito.times((int)2))).emit((Event)this.emittedEvent.capture());
        List events = this.emittedEvent.getAllValues();
        Assertions.assertEquals((Object)"TOPIC_DELTA", (Object)((Event)events.get(0)).type());
        Assertions.assertEquals((Object)String.format("crn://confluent.cloud/kafka=%s/topic=%s", tenant2, topic3), (Object)((Event)events.get(0)).source().toString());
        Assertions.assertTrue((boolean)((Event)events.get(0)).data().isPresent());
        MetadataChange incrementalChange = MetadataChange.parseFrom((byte[])((byte[])((Event)events.get(0)).data().get()));
        Assertions.assertEquals((Object)OpType.CREATE, (Object)incrementalChange.getOp());
        Assertions.assertEquals((int)3, (int)incrementalChange.getEvents(0).getTopicMetadata().getPartitionsCount());
        Assertions.assertEquals((int)3, (int)incrementalChange.getEvents(0).getTopicMetadata().getReplicationFactor());
        Assertions.assertEquals((Object)"TOPIC_DELTA", (Object)((Event)events.get(1)).type());
        Assertions.assertEquals((Object)String.format("crn://confluent.cloud/kafka=%s/topic=%s", this.tenant, this.topic1), (Object)((Event)events.get(1)).source().toString());
        Assertions.assertTrue((boolean)((Event)events.get(1)).data().isPresent());
        incrementalChange = MetadataChange.parseFrom((byte[])((byte[])((Event)events.get(1)).data().get()));
        Assertions.assertEquals((Object)OpType.DELETE, (Object)incrementalChange.getOp());
        Assertions.assertEquals((Object)this.topic1, (Object)incrementalChange.getEvents(0).getTopicMetadata().getTopicName());
        Assertions.assertEquals((Object)this.topicId1.toString(), (Object)incrementalChange.getEvents(0).getTopicMetadata().getTopicId());
        Assertions.assertTrue((boolean)this.context.localStore().logicalClusters().contains(tenant2));
        Assertions.assertTrue((boolean)this.context.localStore().topics(tenant2).contains(fullTopic3));
        Assertions.assertNull((Object)this.context.topicInfo(this.fullTopic1));
        Assertions.assertNotNull((Object)this.context.topicInfo(this.fullTopic2));
    }

    @Test
    public void testDeletingTopicNotInCache() throws Exception {
        this.createContext(Collections.emptyMap());
        TopicDeletionEvent tdEvent = new TopicDeletionEvent(this.collector, new HashSet<String>(Arrays.asList(this.fullTopic1)), this.time);
        tdEvent.run();
        ((EventEmitter)Mockito.verify((Object)this.mockEmitter, (VerificationMode)Mockito.times((int)1))).emit((Event)this.emittedEvent.capture());
        List events = this.emittedEvent.getAllValues();
        Assertions.assertEquals((Object)"TOPIC_DELTA", (Object)((Event)events.get(0)).type());
        Assertions.assertTrue((boolean)((Event)events.get(0)).data().isPresent());
        MetadataChange incrementalChange = MetadataChange.parseFrom((byte[])((byte[])((Event)events.get(0)).data().get()));
        Assertions.assertEquals((Object)String.format("crn://confluent.cloud/kafka=%s/topic=%s", this.tenant, this.topic1), (Object)((Event)events.get(0)).source().toString());
        Assertions.assertEquals((Object)OpType.DELETE, (Object)incrementalChange.getOp());
        Assertions.assertEquals((Object)this.topic1, (Object)incrementalChange.getEvents(0).getTopicMetadata().getTopicName());
        Assertions.assertTrue((boolean)incrementalChange.getEvents(0).getTopicMetadata().getTopicId().isEmpty());
    }
}

