/*
 * Decompiled with CFR 0.152.
 */
package kafka.catalog;

import io.confluent.kafka.link.ClusterLinkConfig;
import io.confluent.protobuf.events.catalog.v1.ClusterLinkMetadata;
import io.confluent.protobuf.events.catalog.v1.MetadataChange;
import io.confluent.protobuf.events.catalog.v1.MetadataEvent;
import io.confluent.protobuf.events.catalog.v1.MirrorTopicMetadata;
import io.confluent.protobuf.events.catalog.v1.OpType;
import io.confluent.protobuf.events.catalog.v1.TopicMetadata;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.api.events.EventEmitter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import kafka.catalog.MetadataCollectorEventQueue;
import kafka.catalog.ZKMetadataCollector;
import kafka.catalog.ZKMetadataCollectorConfig;
import kafka.catalog.ZKMetadataCollectorContext;
import kafka.catalog.event.BrokerDefaultConfigChangeEvent;
import kafka.catalog.event.CacheBuildEvent;
import kafka.catalog.event.ClusterLinkConfigChangeEvent;
import kafka.catalog.event.ClusterLinkCreationEvent;
import kafka.catalog.event.ClusterLinkDeletionEvent;
import kafka.catalog.event.CollectorStartupEvent;
import kafka.catalog.event.CollectorStopEvent;
import kafka.catalog.event.MetadataCollectorEvent;
import kafka.catalog.event.MirrorTopicChangeEvent;
import kafka.catalog.event.SnapshotEvent;
import kafka.catalog.event.TopicConfigChangeEvent;
import kafka.catalog.event.TopicCreationEvent;
import kafka.catalog.event.TopicDeletionEvent;
import kafka.catalog.event.TopicPartitionChangeEvent;
import kafka.catalog.metadata.ClusterLinkInfo;
import kafka.catalog.metadata.MirrorTopicInfo;
import kafka.catalog.metadata.TopicInfo;
import kafka.controller.ReplicaAssignment;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ConnectionMode;
import kafka.utils.CoreUtils;
import kafka.zk.KafkaZkClient;
import kafka.zk.TopicZNode;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.Set;

public class MetadataCollectorEventTest {
    // Collector context used by the tests; assigned by the context-creating helpers and by tests that build it inline.
    private ZKMetadataCollectorContext context;
    // Collector configuration; initialised in setup() and replaced by tests that need different values.
    private ZKMetadataCollectorConfig config;
    // Captures the event handed to the mocked event queue.
    private ArgumentCaptor<MetadataCollectorEvent> submittedEvent;
    // Captures the event(s) passed to the mocked emitter.
    private ArgumentCaptor<Event> emittedEvent;
    // Clock shared with the events under test; setup() installs a MockTime.
    private Time time;
    // Spied Metrics whose eventEmitter() is stubbed in setup() to return mockEmitter.
    private Metrics metrics;
    // Broker config built from defaultProperties() in setup().
    private KafkaConfig kafkaConfig;
    // Tenant id used as the prefix of every "full" topic / link name below.
    String tenant = "lkc-abc";
    String topic1 = "topic1";
    String topic2 = "topic2";
    String link1 = "link1";
    String link2 = "link2";
    // Full names have the form "<tenant>_<name>".
    String fullTopic1 = this.tenant + '_' + this.topic1;
    String fullTopic2 = this.tenant + '_' + this.topic2;
    String fullLink1 = this.tenant + "_" + this.link1;
    String fullLink2 = this.tenant + "_" + this.link2;
    // Random ids generated once per test-class instance.
    Uuid topicId1 = Uuid.randomUuid();
    Uuid topicId2 = Uuid.randomUuid();
    Uuid linkId1 = Uuid.randomUuid();
    Uuid linkId2 = Uuid.randomUuid();
    // Pre-built metadata events that createContextWithLocalStoreValue() loads into the local store.
    final MetadataEvent topicEvent1 = MetadataEvent.newBuilder().setTopicMetadata(TopicMetadata.newBuilder().setTopicName(this.topic1).build()).build();
    final MetadataEvent topicEvent2 = MetadataEvent.newBuilder().setTopicMetadata(TopicMetadata.newBuilder().setTopicName(this.topic2).setRetentionMs(30000L).build()).build();
    final MetadataEvent clusterLinkEvent1 = MetadataEvent.newBuilder().setClusterLinkMetadata(ClusterLinkMetadata.newBuilder().setClusterLinkName(this.link1).setClusterLinkId(this.linkId1.toString()).build()).build();
    final MetadataEvent clusterLinkEvent2 = MetadataEvent.newBuilder().setClusterLinkMetadata(ClusterLinkMetadata.newBuilder().setClusterLinkName(this.link2).setClusterLinkId(this.linkId2.toString()).build()).build();
    // Mirror topic states and remote cluster id used when building MirrorTopicInfo / ClusterLinkInfo fixtures.
    String mirrorTopicStateActive = "ACTIVE";
    String mirrorTopicStatePaused = "PAUSED";
    String remoteClusterId = "remoteCluster";
    // Mocks below are initialised by MockitoAnnotations.openMocks(this) in setup().
    @Mock
    private ZKMetadataCollector collector;
    @Mock
    private MetadataCollectorEventQueue eventQueue;
    @Mock
    private EventEmitter mockEmitter;
    @Mock
    private KafkaZkClient zkClient;

    /**
     * Rebuilds the shared fixtures before every test: initialises the {@code @Mock} fields,
     * creates fresh argument captors, a mock clock and the default configs, and wires a spied
     * {@link Metrics} so that its event emitter is the mocked {@link EventEmitter}, whose
     * {@code emit} is stubbed to return an already-completed future holding {@code true}.
     */
    @BeforeEach
    void setup() {
        MockitoAnnotations.openMocks(this);

        this.submittedEvent = ArgumentCaptor.forClass(MetadataCollectorEvent.class);
        this.emittedEvent = ArgumentCaptor.forClass(Event.class);

        this.config = this.getZKMetadataCollectorConfig();
        this.time = new MockTime(0L, 0L, 0L);
        this.kafkaConfig = KafkaConfig.fromProps(defaultProperties());

        Metrics spiedMetrics = Mockito.spy(Metrics.class);
        this.metrics = spiedMetrics;
        Mockito.when((Object) spiedMetrics.eventEmitter()).thenReturn(this.mockEmitter);
        Mockito.when((Object) this.mockEmitter.emit(ArgumentMatchers.<Event>any()))
                .thenReturn(CompletableFuture.completedFuture(true));
    }

    /**
     * Returns the broker properties used to build the test {@code KafkaConfig}: a single
     * {@code zookeeper.connect} entry set to {@code localhost:2181}.
     *
     * @return a new {@link Properties} instance on every call
     */
    public static Properties defaultProperties() {
        Properties brokerProps = new Properties();
        brokerProps.setProperty("zookeeper.connect", "localhost:2181");
        return brokerProps;
    }

    /**
     * Builds the collector configuration that {@code setup()} installs for every test.
     *
     * @return a new {@link ZKMetadataCollectorConfig} created from fixed literal arguments
     */
    private ZKMetadataCollectorConfig getZKMetadataCollectorConfig() {
        ZKMetadataCollectorConfig defaults =
                new ZKMetadataCollectorConfig(false, 0, 1, 500, 10, 1, "destination");
        return defaults;
    }

    /**
     * Creates the two plain topic fixtures keyed by their full (tenant-prefixed) names and stubs
     * the mocked ZooKeeper client to return empty topic configs for each of them.
     *
     * @return a mutable map from full topic name to its {@link TopicInfo}
     */
    private java.util.Map<String, TopicInfo> initialTopics() {
        java.util.Map<String, TopicInfo> fixtures = new HashMap<>();
        fixtures.put(this.fullTopic1, new TopicInfo(this.fullTopic1, this.topicId1, 1, 1));
        fixtures.put(this.fullTopic2, new TopicInfo(this.fullTopic2, this.topicId2, 2, 2));

        // Each topic gets its own stub returning a fresh, empty Properties instance.
        for (String fullName : Arrays.asList(this.fullTopic1, this.fullTopic2)) {
            Mockito.when(this.zkClient.getEntityConfigs(ArgumentMatchers.eq("topics"), ArgumentMatchers.eq(fullName)))
                    .thenReturn(new Properties());
        }
        return fixtures;
    }

    /**
     * Creates the two topic fixtures with mirror information attached (topic 1 ACTIVE over link 1,
     * topic 2 PAUSED over link 2) and stubs the mocked ZooKeeper client to return empty configs
     * for any topic name.
     *
     * @return a mutable map from full topic name to its {@link TopicInfo}
     */
    private java.util.Map<String, TopicInfo> initialMirrorTopics() {
        MirrorTopicInfo activeMirror = new MirrorTopicInfo(
                this.fullLink1, this.linkId1, this.fullTopic1, this.topicId1, this.mirrorTopicStateActive, this.remoteClusterId);
        MirrorTopicInfo pausedMirror = new MirrorTopicInfo(
                this.fullLink2, this.linkId2, this.fullTopic2, this.topicId2, this.mirrorTopicStatePaused, this.remoteClusterId);

        java.util.Map<String, TopicInfo> fixtures = new HashMap<>();
        fixtures.put(this.fullTopic1, new TopicInfo(this.fullTopic1, this.topicId1, 1, 1, Optional.of(activeMirror)));
        fixtures.put(this.fullTopic2, new TopicInfo(this.fullTopic2, this.topicId2, 2, 2, Optional.of(pausedMirror)));

        Mockito.when(this.zkClient.getEntityConfigs(ArgumentMatchers.eq("topics"), ArgumentMatchers.<String>any()))
                .thenReturn(new Properties());
        return fixtures;
    }

    /**
     * Creates the two cluster link fixtures (link 1: destination/outbound, link 2: source/inbound)
     * and stubs the mocked ZooKeeper client to return matching link configs, looked up by the
     * string form of each link id.
     *
     * @return a mutable map from full link name to its {@link ClusterLinkInfo}
     */
    private java.util.Map<String, ClusterLinkInfo> initialClusterLinks() {
        // Configs the ZooKeeper mock returns for link 1.
        Properties destinationProps = new Properties();
        destinationProps.put("bootstrap.servers", "localhost:1234");
        destinationProps.put(ClusterLinkConfig.ClusterLinkPrefixProp(), "foo");
        destinationProps.put(ClusterLinkConfig.LinkModeProp(), "DESTINATION");
        destinationProps.put(ClusterLinkConfig.ConnectionModeProp(), "OUTBOUND");

        // Configs the ZooKeeper mock returns for link 2.
        Properties sourceProps = new Properties();
        sourceProps.put("bootstrap.servers", "localhost:4321");
        sourceProps.put(ClusterLinkConfig.ClusterLinkPrefixProp(), "bar");
        sourceProps.put(ClusterLinkConfig.LinkModeProp(), "SOURCE");
        sourceProps.put(ClusterLinkConfig.ConnectionModeProp(), "INBOUND");

        String linkKey1 = CoreUtils.toJavaUUID(this.linkId1).toString();
        String linkKey2 = CoreUtils.toJavaUUID(this.linkId2).toString();
        Mockito.when(this.zkClient.getEntityConfigs("cluster-links", linkKey1)).thenReturn(destinationProps);
        Mockito.when(this.zkClient.getEntityConfigs("cluster-links", linkKey2)).thenReturn(sourceProps);

        java.util.Map<String, ClusterLinkInfo> fixtures = new HashMap<>();
        fixtures.put(this.fullLink1, new ClusterLinkInfo(
                this.fullLink1, this.linkId1, ClusterLinkConfig.LinkMode.fromString("destination"),
                ConnectionMode.fromString("outbound"), this.remoteClusterId));
        fixtures.put(this.fullLink2, new ClusterLinkInfo(
                this.fullLink2, this.linkId2, ClusterLinkConfig.LinkMode.fromString("source"),
                ConnectionMode.fromString("inbound"), this.remoteClusterId));
        return fixtures;
    }

    /** Installs a context holding the plain topic fixtures and no cluster links. */
    private void createInitialTopics() {
        java.util.Map<String, TopicInfo> topicFixtures = this.initialTopics();
        this.createContext(topicFixtures, Collections.emptyMap());
    }

    /** Installs a context holding the cluster link fixtures and no topics. */
    private void createInitialClusterLinks() {
        java.util.Map<String, ClusterLinkInfo> linkFixtures = this.initialClusterLinks();
        this.createContext(Collections.emptyMap(), linkFixtures);
    }

    /** Installs a context holding the mirror topic fixtures and no cluster links. */
    private void createInitialMirrorTopics() {
        java.util.Map<String, TopicInfo> mirrorFixtures = this.initialMirrorTopics();
        this.createContext(mirrorFixtures, Collections.emptyMap());
    }

    /**
     * Builds a collector context from the given fixtures and the shared test collaborators,
     * stores it in {@code this.context} and makes the mocked collector return it.
     *
     * @param topicsWithInfo topics, keyed by full topic name, passed to the new context
     * @param clusterLinksWithInfo cluster links, keyed by full link name, passed to the new context
     */
    private void createContext(java.util.Map<String, TopicInfo> topicsWithInfo, java.util.Map<String, ClusterLinkInfo> clusterLinksWithInfo) {
        ZKMetadataCollectorContext fresh = new ZKMetadataCollectorContext(
                this.config, topicsWithInfo, clusterLinksWithInfo, this.eventQueue,
                this.metrics, this.zkClient, this.kafkaConfig, 0, this.time);
        this.context = fresh;
        Mockito.when((Object) this.collector.collectorContext()).thenReturn(Optional.of(fresh));
    }

    /**
     * Builds a context with no pending topics or links, pre-populates its local store with the
     * two topic events and the two cluster link events declared on this class, stores it in
     * {@code this.context} and makes the mocked collector return it.
     *
     * @param config collector configuration to build the context with
     */
    private void createContextWithLocalStoreValue(ZKMetadataCollectorConfig config) {
        ZKMetadataCollectorContext populated = new ZKMetadataCollectorContext(
                config, Collections.emptyMap(), Collections.emptyMap(), this.eventQueue,
                this.metrics, this.zkClient, this.kafkaConfig, 0, this.time);
        this.context = populated;

        // Topic 2 is registered together with a "retention.ms" entry; topic 1 without extra arguments.
        populated.localStore().addTopicMetadataEvent(
                this.tenant, this.fullTopic2, this.topicEvent2, Collections.singleton("retention.ms"), Collections.emptySet());
        populated.localStore().addTopicMetadataEvent(this.tenant, this.fullTopic1, this.topicEvent1);
        populated.localStore().addClusterLinkMetadataEvent(this.tenant, this.fullLink1, this.clusterLinkEvent1);
        populated.localStore().addClusterLinkMetadataEvent(this.tenant, this.fullLink2, this.clusterLinkEvent2);

        Mockito.when((Object) this.collector.collectorContext()).thenReturn(Optional.of(populated));
    }

    /**
     * Runs a {@link CollectorStartupEvent} while the collector reports no context and expects a
     * {@link CacheBuildEvent} to be appended under the {@code CACHE_BUILD_EVENT} tag and a present
     * context to be set on the collector.
     */
    @Test
    void testCollectorStartupEvent() throws Exception {
        Mockito.when((Object) this.collector.collectorContext()).thenReturn(Optional.empty());
        ArgumentCaptor<Optional> installedContext = ArgumentCaptor.forClass(Optional.class);

        CollectorStartupEvent startup = new CollectorStartupEvent(
                this.collector, this.config, Collections.emptyMap(), Collections.emptyMap(),
                this.eventQueue, this.metrics, this.zkClient, this.kafkaConfig, 0, this.time);
        startup.run();

        Mockito.verify(this.eventQueue)
                .appendWithTag(ArgumentMatchers.eq("CACHE_BUILD_EVENT"), this.submittedEvent.capture());
        MetadataCollectorEvent queued = this.submittedEvent.getValue();
        Assertions.assertNotNull(queued);
        Assertions.assertEquals(CacheBuildEvent.class, queued.getClass());

        Mockito.verify(this.collector).setCollectorContext(installedContext.capture());
        Assertions.assertTrue(installedContext.getValue().isPresent());
    }

    /**
     * Runs a {@link CollectorStopEvent} against an existing context and expects the
     * {@code SNAPSHOT_EVENT} and {@code CACHE_BUILD_EVENT} tags to be cancelled on the queue and
     * an empty context to be set on the collector.
     */
    @Test
    void testCollectorStopEvent() throws Exception {
        this.createInitialTopics();
        ArgumentCaptor<Optional> installedContext = ArgumentCaptor.forClass(Optional.class);

        CollectorStopEvent stop = new CollectorStopEvent(this.collector, this.time);
        stop.run();

        Mockito.verify(this.eventQueue).cancel("SNAPSHOT_EVENT");
        Mockito.verify(this.eventQueue).cancel("CACHE_BUILD_EVENT");
        Mockito.verify(this.collector).setCollectorContext(installedContext.capture());
        Assertions.assertFalse(installedContext.getValue().isPresent());
    }

    /**
     * Runs a {@link CacheBuildEvent} against a context with no topics and no cluster links and
     * expects a {@link SnapshotEvent} to be scheduled under the {@code SNAPSHOT_EVENT} tag.
     */
    @Test
    void testCacheBuildEventWithoutRemaining() throws Exception {
        // Empty context: nothing is left for the cache build to process.
        this.createContext(Collections.emptyMap(), Collections.emptyMap());

        CacheBuildEvent cacheBuild = new CacheBuildEvent(this.collector, this.config.maxNumTopicsProcess, this.time);
        cacheBuild.run();

        Mockito.verify(this.eventQueue).scheduleDeferred(
                ArgumentMatchers.eq("SNAPSHOT_EVENT"), (Function) ArgumentMatchers.any(), this.submittedEvent.capture());
        MetadataCollectorEvent scheduled = this.submittedEvent.getValue();
        Assertions.assertNotNull(scheduled);
        Assertions.assertEquals(SnapshotEvent.class, scheduled.getClass());
    }

    /**
     * Runs a {@link CacheBuildEvent} over one topic and one cluster link whose name after the
     * tenant prefix is empty, plus one regular topic and one regular link. Expects a
     * {@link SnapshotEvent} to be scheduled and only the regular topic and link to end up in the
     * local store.
     */
    @Test
    void testCacheBuildEventWithBadEntities() throws Exception {
        String emptyName = "";

        // Topics: "<tenant>_" (empty name part) and the regular topic 2.
        String emptyNamedTopic = this.tenant + '_' + emptyName;
        java.util.Map<String, TopicInfo> topics = new HashMap<>();
        topics.put(emptyNamedTopic, new TopicInfo(emptyNamedTopic, this.topicId1, 1, 1));
        topics.put(this.fullTopic2, new TopicInfo(this.fullTopic2, this.topicId2, 1, 1));
        Mockito.when(this.zkClient.getEntityConfigs(ArgumentMatchers.eq("topics"), ArgumentMatchers.<String>any()))
                .thenReturn(new Properties());

        // Cluster links: "<tenant>_" (empty name part) and the regular link 2.
        String emptyNamedLink = this.tenant + '_' + emptyName;
        Uuid emptyNamedLinkId = Uuid.randomUuid();
        java.util.Map<String, ClusterLinkInfo> links = new HashMap<>();
        links.put(emptyNamedLink, new ClusterLinkInfo(
                emptyNamedLink, emptyNamedLinkId, ClusterLinkConfig.LinkMode.fromString("destination"),
                ConnectionMode.fromString("outbound"), this.remoteClusterId));
        links.put(this.fullLink2, new ClusterLinkInfo(
                this.fullLink2, this.linkId2, ClusterLinkConfig.LinkMode.fromString("destination"),
                ConnectionMode.fromString("outbound"), this.remoteClusterId));

        // Both links share the same config instance in the ZooKeeper mock.
        Properties linkProps = new Properties();
        linkProps.put("bootstrap.servers", "localhost:1234");
        linkProps.put(ClusterLinkConfig.ClusterLinkPrefixProp(), "foo");
        linkProps.put(ClusterLinkConfig.LinkModeProp(), "DESTINATION");
        linkProps.put(ClusterLinkConfig.ConnectionModeProp(), "OUTBOUND");
        Mockito.when(this.zkClient.getEntityConfigs("cluster-links", CoreUtils.toJavaUUID(this.linkId2).toString()))
                .thenReturn(linkProps);
        Mockito.when(this.zkClient.getEntityConfigs("cluster-links", CoreUtils.toJavaUUID(emptyNamedLinkId).toString()))
                .thenReturn(linkProps);

        this.createContext(topics, links);
        CacheBuildEvent cacheBuild = new CacheBuildEvent(this.collector, 10, this.time);
        cacheBuild.run();

        Mockito.verify(this.eventQueue).scheduleDeferred(
                ArgumentMatchers.eq("SNAPSHOT_EVENT"), (Function) ArgumentMatchers.any(), this.submittedEvent.capture());
        MetadataCollectorEvent scheduled = this.submittedEvent.getValue();
        Assertions.assertNotNull(scheduled);
        Assertions.assertEquals(SnapshotEvent.class, scheduled.getClass());

        Assertions.assertEquals(1, this.context.localStore().logicalClusters().size());
        Assertions.assertEquals(1, this.context.localStore().topics(this.tenant).size());
        Assertions.assertTrue(this.context.localStore().topics(this.tenant).contains(this.fullTopic2));
        Assertions.assertEquals(1, this.context.localStore().clusterLinks(this.tenant).size());
        Assertions.assertTrue(this.context.localStore().clusterLinks(this.tenant).contains(this.fullLink2));
    }

    /**
     * Runs a {@link CacheBuildEvent} limited to {@code config.maxNumTopicsProcess} over the two
     * topic fixtures. Expects a follow-up {@link CacheBuildEvent} to be appended, one topic in the
     * local store, and the cache still reported as not initialised.
     */
    @Test
    void testCacheBuildEventWithTopicsWithRemaining() throws Exception {
        this.createInitialTopics();

        CacheBuildEvent cacheBuild = new CacheBuildEvent(this.collector, this.config.maxNumTopicsProcess, this.time);
        cacheBuild.run();

        Mockito.verify(this.eventQueue)
                .appendWithTag(ArgumentMatchers.eq("CACHE_BUILD_EVENT"), this.submittedEvent.capture());
        MetadataCollectorEvent followUp = this.submittedEvent.getValue();
        Assertions.assertNotNull(followUp);
        Assertions.assertEquals(CacheBuildEvent.class, followUp.getClass());

        Assertions.assertEquals(1, this.context.localStore().logicalClusters().size());
        Assertions.assertEquals(1, this.context.localStore().topics(this.tenant).size());
        Assertions.assertFalse(this.context.cacheInitialized());
    }

    /**
     * Runs a {@link CacheBuildEvent} limited to {@code config.maxNumTopicsProcess} over the two
     * cluster link fixtures. Expects a follow-up {@link CacheBuildEvent} to be appended, one link
     * in the local store, and the cache still reported as not initialised.
     */
    @Test
    void testCacheBuildEventWithClusterLinksWithRemaining() throws Exception {
        this.createInitialClusterLinks();

        CacheBuildEvent cacheBuild = new CacheBuildEvent(this.collector, this.config.maxNumTopicsProcess, this.time);
        cacheBuild.run();

        Mockito.verify(this.eventQueue)
                .appendWithTag(ArgumentMatchers.eq("CACHE_BUILD_EVENT"), this.submittedEvent.capture());
        MetadataCollectorEvent followUp = this.submittedEvent.getValue();
        Assertions.assertNotNull(followUp);
        Assertions.assertEquals(CacheBuildEvent.class, followUp.getClass());

        Assertions.assertEquals(1, this.context.localStore().logicalClusters().size());
        Assertions.assertEquals(1, this.context.localStore().clusterLinks(this.tenant).size());
        Assertions.assertFalse(this.context.cacheInitialized());
    }

    /**
     * Runs a {@link SnapshotEvent} over a pre-populated local store. Expects a single emitted
     * {@code SNAPSHOT} event (page 0, last page) containing the two topics followed by the two
     * cluster links in order, and the next {@link SnapshotEvent} to be scheduled.
     */
    @Test
    void testSnapshotEvent() throws Exception {
        this.createContextWithLocalStoreValue(this.config);

        SnapshotEvent snapshotRun = new SnapshotEvent(this.collector, this.time);
        snapshotRun.run();

        Mockito.verify(this.mockEmitter).emit(this.emittedEvent.capture());
        Mockito.verify(this.eventQueue).scheduleDeferred(
                ArgumentMatchers.eq("SNAPSHOT_EVENT"), (Function) ArgumentMatchers.any(), this.submittedEvent.capture());
        MetadataCollectorEvent rescheduled = this.submittedEvent.getValue();
        Assertions.assertNotNull(rescheduled);
        Assertions.assertEquals(SnapshotEvent.class, rescheduled.getClass());

        // Envelope of the single emitted snapshot page.
        Event emitted = this.emittedEvent.getValue();
        String expectedSource = String.format("crn://confluent.cloud/kafka=%s/topics-and-cluster-links", this.tenant);
        Assertions.assertEquals("SNAPSHOT", emitted.type());
        Assertions.assertEquals(expectedSource, emitted.source().toString());
        Assertions.assertEquals("0", emitted.extension("page"));
        Assertions.assertEquals("true", emitted.extension("lastpage"));
        Assertions.assertNotNull(emitted.extension("snapshotid"));

        // Payload: the first two entries are topics, the remaining ones cluster links.
        MetadataChange payload = MetadataChange.parseFrom((byte[]) emitted.data().get());
        List<String> expectedNames = Arrays.asList(this.topic1, this.topic2, this.link1, this.link2);
        Assertions.assertEquals(4, payload.getEventsList().size(), "Snapshot contains less entities than expected");
        for (int idx = 0; idx < payload.getEventsList().size(); ++idx) {
            String expectedName = expectedNames.get(idx);
            if (idx >= 2) {
                Assertions.assertEquals(expectedName, payload.getEvents(idx).getClusterLinkMetadata().getClusterLinkName(), "Cluster links are not sorted.");
            } else {
                Assertions.assertEquals(expectedName, payload.getEvents(idx).getTopicMetadata().getTopicName(), "Topics are not sorted.");
            }
        }
    }

    /**
     * Runs a {@link SnapshotEvent} with a collector config whose fourth constructor argument is
     * the combined serialized size of the two topic events plus one. Expects four emitted
     * {@code SNAPSHOT} pages sharing one snapshot id, numbered from 0, with only the final page
     * flagged as last, carrying topic 1, topic 2, link 1 and link 2 respectively as their first
     * entry.
     */
    @Test
    void testSnapshotEventWithSmallerMaxBytesInSnapshot() throws Exception {
        Mockito.when((Object)this.mockEmitter.emit((Event)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(true));
        this.config = new ZKMetadataCollectorConfig(false, 0, 1, this.topicEvent1.getSerializedSize() + this.topicEvent2.getSerializedSize() + 1, 10, 1, "destination");
        this.createContextWithLocalStoreValue(this.config);
        SnapshotEvent snapshotEvent = new SnapshotEvent(this.collector, this.time);
        snapshotEvent.run();
        Mockito.verify(this.eventQueue).scheduleDeferred(ArgumentMatchers.eq("SNAPSHOT_EVENT"), (Function)ArgumentMatchers.any(), this.submittedEvent.capture());
        Assertions.assertNotNull(this.submittedEvent.getValue());
        Assertions.assertEquals(SnapshotEvent.class, this.submittedEvent.getValue().getClass());
        Mockito.verify(this.mockEmitter, Mockito.times(4)).emit(this.emittedEvent.capture());
        // The captor is declared as ArgumentCaptor<Event>, so the captured pages are typed
        // and need no per-element cast.
        List<Event> events = this.emittedEvent.getAllValues();
        List<String> resultEntities = Arrays.asList(this.topic1, this.topic2, this.link1, this.link2);
        String expectedSource = String.format("crn://confluent.cloud/kafka=%s/topics-and-cluster-links", this.tenant);
        for (int i = 0; i < events.size(); ++i) {
            Event page = events.get(i);
            Assertions.assertEquals("SNAPSHOT", page.type());
            Assertions.assertEquals(expectedSource, page.source().toString());
            Assertions.assertEquals(String.valueOf(i), page.extension("page"));
            // Every page must carry the snapshot id of the first page.
            Assertions.assertEquals(events.get(0).extension("snapshotid"), page.extension("snapshotid"));
            boolean isLastPage = i == events.size() - 1;
            Assertions.assertEquals(isLastPage ? "true" : "false", page.extension("lastpage"));
            MetadataChange snapshot = MetadataChange.parseFrom((byte[])page.data().get());
            if (i < 2) {
                Assertions.assertEquals(resultEntities.get(i), snapshot.getEvents(0).getTopicMetadata().getTopicName(), "Topics are not sorted.");
            } else {
                Assertions.assertEquals(resultEntities.get(i), snapshot.getEvents(0).getClusterLinkMetadata().getClusterLinkName(), "Cluster links are not sorted.");
            }
        }
    }

    /**
     * Advances the mock clock between creating and running a {@link SnapshotEvent}, twice, and
     * expects the {@code snapshot-emitting-delay-ms} metric to equal the slept duration each time
     * (200 ms, then 300 ms).
     */
    @Test
    void testSnapshotEmittedDelayMetrics() throws Exception {
        MetricName delayMetric = this.metrics.metricName(
                "snapshot-emitting-delay-ms",
                "catalog-metrics",
                "Delay between each snapshot emitted (include snapshot taken time) in milliseconds.");
        this.createContextWithLocalStoreValue(this.config);

        // First run: 200 ms pass between construction and execution.
        SnapshotEvent firstRun = new SnapshotEvent(this.collector, this.time);
        this.time.sleep(200L);
        firstRun.run();
        Object firstDelay = this.metrics.metric(delayMetric).metricValue();
        Assertions.assertEquals((Object) Long.valueOf(200L), firstDelay, "The snapshot emitted delay is not the actual delay");

        // Second run: 300 ms pass between construction and execution.
        SnapshotEvent secondRun = new SnapshotEvent(this.collector, this.time);
        this.time.sleep(300L);
        secondRun.run();
        Object secondDelay = this.metrics.metric(delayMetric).metricValue();
        Assertions.assertEquals((Object) Long.valueOf(300L), secondDelay, "The snapshot emitted delay is not the actual delay");
    }

    /**
     * Runs a {@link ClusterLinkConfigChangeEvent} for link 1 with a new link config and expects a
     * single {@code DELTA} event whose source identifies the tenant and the link.
     */
    @Test
    void testClusterLinkConfigChange() throws Exception {
        this.createInitialClusterLinks();

        Properties changedProps = new Properties();
        changedProps.put("bootstrap.servers", "localhost:5678");
        changedProps.put(ClusterLinkConfig.ClusterLinkPrefixProp(), "bar");
        changedProps.put(ClusterLinkConfig.LinkModeProp(), "SOURCE");
        changedProps.put(ClusterLinkConfig.ConnectionModeProp(), "OUTBOUND");
        ClusterLinkConfig changedConfig =
                ClusterLinkConfig.create((java.util.Map) changedProps, (Option) Option.apply(null), false);

        ClusterLinkConfigChangeEvent change =
                new ClusterLinkConfigChangeEvent(this.collector, this.tenant, this.fullLink1, changedConfig, this.time);
        change.run();

        Mockito.verify(this.mockEmitter).emit(this.emittedEvent.capture());
        Event delta = this.emittedEvent.getValue();
        Assertions.assertNotNull(delta);
        Assertions.assertEquals("DELTA", delta.type());
        String expectedSource = String.format("crn://confluent.cloud/kafka=%s/cluster-link=%s", this.tenant, this.link1);
        Assertions.assertEquals(expectedSource, delta.source().toString());
    }

    /**
     * Applies two {@link TopicConfigChangeEvent}s to topic 1. The first sets
     * {@code confluent.key.schema.validation} to {@code true} as an override, the second sets it
     * to {@code false} without listing it as an override. After each one a {@code DELTA} event is
     * expected that carries the new value, and the override set in the local store is checked.
     */
    @Test
    void testTopicConfigChange() throws Exception {
        this.createInitialTopics();
        String expectedSource = String.format("crn://confluent.cloud/kafka=%s/topic=%s", this.tenant, this.topic1);

        // Step 1: enable key schema validation as an explicit override.
        Properties enableProps = new Properties();
        enableProps.put("confluent.key.schema.validation", true);
        LogConfig enabledConfig = new LogConfig((java.util.Map) enableProps, Collections.singleton("confluent.key.schema.validation"));
        TopicConfigChangeEvent enableChange =
                new TopicConfigChangeEvent(this.collector, this.tenant, this.fullTopic1, enabledConfig, enableProps, this.time);
        enableChange.run();

        Mockito.verify(this.mockEmitter).emit(this.emittedEvent.capture());
        Event firstDelta = this.emittedEvent.getValue();
        Assertions.assertNotNull(firstDelta);
        Assertions.assertEquals("DELTA", firstDelta.type());
        Assertions.assertEquals(expectedSource, firstDelta.source().toString());
        MetadataChange firstChange = MetadataChange.parseFrom((byte[]) firstDelta.data().get());
        Assertions.assertTrue(firstChange.getEvents(0).getTopicMetadata().getKeySchemaValidation());
        Assertions.assertTrue(this.context.localStore().logicalClusters().contains(this.tenant));
        Assertions.assertEquals(1, this.context.localStore().topics(this.tenant).size());
        Assertions.assertTrue(this.context.localStore().topicConfigOverrides(this.fullTopic1).contains("confluent.key.schema.validation"));
        Assertions.assertNull(this.context.topicInfo(this.fullTopic1));
        Assertions.assertNotNull(this.context.topicInfo(this.fullTopic2));

        // Forget the first emit so the next verify sees exactly one call.
        Mockito.clearInvocations(this.mockEmitter);

        // Step 2: disable it again, this time without declaring it as an override.
        Properties disableProps = new Properties();
        disableProps.put("confluent.key.schema.validation", false);
        LogConfig disabledConfig = new LogConfig((java.util.Map) disableProps);
        TopicConfigChangeEvent disableChange =
                new TopicConfigChangeEvent(this.collector, this.tenant, this.fullTopic1, disabledConfig, disableProps, this.time);
        disableChange.run();

        Mockito.verify(this.mockEmitter).emit(this.emittedEvent.capture());
        Event secondDelta = this.emittedEvent.getValue();
        Assertions.assertNotNull(secondDelta);
        Assertions.assertEquals("DELTA", secondDelta.type());
        Assertions.assertEquals(expectedSource, secondDelta.source().toString());
        MetadataChange secondChange = MetadataChange.parseFrom((byte[]) secondDelta.data().get());
        Assertions.assertFalse(secondChange.getEvents(0).getTopicMetadata().getKeySchemaValidation());
        Assertions.assertFalse(this.context.localStore().topicConfigOverrides(this.fullTopic1).contains("confluent.key.schema.validation"));
    }

    /**
     * Applies a {@link MirrorTopicChangeEvent} to topic 1 (new state {@code FAILED}) and then to
     * topic 2 (new state {@code ACTIVE}). After each one a {@code DELTA} event carrying the new
     * mirror state is expected, the local store topic count grows by one, and the context no
     * longer returns a {@code TopicInfo} for the changed topic.
     */
    @Test
    void testMirrorTopicChange() throws Exception {
        this.createInitialMirrorTopics();
        String activeState = "ACTIVE";
        String failedState = "FAILED";

        // Topic 1 moves to FAILED.
        MirrorTopicChangeEvent failTopic1 =
                new MirrorTopicChangeEvent(this.collector, this.tenant, this.fullTopic1, failedState, this.time);
        failTopic1.run();

        Mockito.verify(this.mockEmitter).emit(this.emittedEvent.capture());
        Event firstDelta = this.emittedEvent.getValue();
        Assertions.assertNotNull(firstDelta);
        Assertions.assertEquals("DELTA", firstDelta.type());
        Assertions.assertEquals(
                String.format("crn://confluent.cloud/kafka=%s/topic=%s", this.tenant, this.topic1),
                firstDelta.source().toString());
        MetadataChange firstChange = MetadataChange.parseFrom((byte[]) firstDelta.data().get());
        MirrorTopicMetadata firstMirror = firstChange.getEvents(0).getTopicMetadata().getMirrorTopicMetadata();
        Assertions.assertNotNull(firstMirror);
        Assertions.assertEquals(failedState, firstMirror.getMirrorTopicState());
        Assertions.assertTrue(this.context.localStore().logicalClusters().contains(this.tenant));
        Assertions.assertEquals(1, this.context.localStore().topics(this.tenant).size());
        Assertions.assertNull(this.context.topicInfo(this.fullTopic1));
        Assertions.assertNotNull(this.context.topicInfo(this.fullTopic2));

        // Forget the first emit so the next verify sees exactly one call.
        Mockito.clearInvocations(this.mockEmitter);

        // Topic 2 moves to ACTIVE.
        MirrorTopicChangeEvent activateTopic2 =
                new MirrorTopicChangeEvent(this.collector, this.tenant, this.fullTopic2, activeState, this.time);
        activateTopic2.run();

        Mockito.verify(this.mockEmitter).emit(this.emittedEvent.capture());
        Event secondDelta = this.emittedEvent.getValue();
        Assertions.assertNotNull(secondDelta);
        Assertions.assertEquals("DELTA", secondDelta.type());
        Assertions.assertEquals(
                String.format("crn://confluent.cloud/kafka=%s/topic=%s", this.tenant, this.topic2),
                secondDelta.source().toString());
        MetadataChange secondChange = MetadataChange.parseFrom((byte[]) secondDelta.data().get());
        MirrorTopicMetadata secondMirror = secondChange.getEvents(0).getTopicMetadata().getMirrorTopicMetadata();
        Assertions.assertNotNull(secondMirror);
        Assertions.assertEquals(activeState, secondMirror.getMirrorTopicState());
        Assertions.assertEquals(2, this.context.localStore().topics(this.tenant).size());
        Assertions.assertNull(this.context.topicInfo(this.fullTopic1));
        Assertions.assertNull(this.context.topicInfo(this.fullTopic2));
    }

    /**
     * Runs a {@link TopicConfigChangeEvent} for topic 1 whose only changed key is
     * {@code confluent.tier.segment.hotset.roll.min.bytes}, then checks that the emitter
     * was never invoked and that no config override is stored for the topic.
     */
    @Test
    void testNonCatalogTopicConfigChange() throws Exception {
        final String changedKey = "confluent.tier.segment.hotset.roll.min.bytes";
        this.createInitialTopics();

        Properties overrides = new Properties();
        overrides.put(changedKey, (Object)5);
        LogConfig topic1Config = new LogConfig((java.util.Map)overrides, Collections.singleton(changedKey));

        new TopicConfigChangeEvent(this.collector, this.tenant, this.fullTopic1, topic1Config, overrides, this.time).run();

        // The change must stay invisible: nothing emitted, nothing stored.
        Mockito.verify(this.mockEmitter, Mockito.never()).emit((Event)ArgumentMatchers.any());
        Assertions.assertTrue(this.context.localStore().topicConfigOverrides(this.fullTopic1).isEmpty());
    }

    /**
     * Applies a {@link BrokerDefaultConfigChangeEvent} that moves the broker retention
     * default from 30000 ms to 20000 ms and checks the retention recorded in the local
     * store afterwards: 20000 ms for topic 1 and 30000 ms for topic 2.
     */
    @Test
    void testBrokerDefaultConfigChange() throws Exception {
        this.createContextWithLocalStoreValue(this.config);

        Properties previousProps = MetadataCollectorEventTest.defaultProperties();
        previousProps.put(KafkaConfig.LogRetentionTimeMillisProp(), "30000");
        Properties updatedProps = MetadataCollectorEventTest.defaultProperties();
        updatedProps.put(KafkaConfig.LogRetentionTimeMillisProp(), "20000");

        KafkaConfig previousConfig = KafkaConfig.fromProps(previousProps);
        KafkaConfig updatedConfig = KafkaConfig.fromProps(updatedProps);

        new BrokerDefaultConfigChangeEvent(this.collector, previousConfig, updatedConfig, this.time).run();

        long topic1Retention = this.context.localStore().topicMetadataEvent(this.fullTopic1).getTopicMetadata().getRetentionMs();
        Assertions.assertEquals(20000L, topic1Retention);
        long topic2Retention = this.context.localStore().topicMetadataEvent(this.fullTopic2).getTopicMetadata().getRetentionMs();
        Assertions.assertEquals(30000L, topic2Retention);
    }

    /**
     * Exercises {@code BrokerDefaultConfigChangeEvent.propagateBrokerConfigChange} against
     * a single builder seeded from topic 2's stored metadata, in three consecutive scenarios:
     * a retention change (expected not to propagate, builder keeps 30000 ms), a cleanup-policy
     * change (expected to propagate as DELETE), and a segment-bytes change (expected not to
     * propagate).
     */
    @Test
    void testPropagateBrokerConfigChange() {
        this.createContextWithLocalStoreValue(this.config);

        // Scenario 1: only the old broker config carries an explicit retention value.
        Properties retentionOld = MetadataCollectorEventTest.defaultProperties();
        Properties retentionNew = MetadataCollectorEventTest.defaultProperties();
        retentionOld.put(KafkaConfig.LogRetentionTimeMillisProp(), "10000");
        KafkaConfig retentionOldConfig = KafkaConfig.fromProps(retentionOld);
        KafkaConfig retentionNewConfig = KafkaConfig.fromProps(retentionNew);
        // The same builder is reused by every scenario below.
        TopicMetadata.Builder topic2Builder = TopicMetadata.newBuilder().mergeFrom(this.context.localStore().topicMetadataEvent(this.fullTopic2).getTopicMetadata());
        boolean retentionPropagated = BrokerDefaultConfigChangeEvent.propagateBrokerConfigChange(this.context, retentionOldConfig, retentionNewConfig, this.fullTopic2, topic2Builder);
        Assertions.assertFalse(retentionPropagated);
        Assertions.assertEquals(30000L, topic2Builder.getRetentionMs(), "Broker config should not override topic config");

        // Scenario 2: cleanup policy goes from an empty list to ["delete"].
        Properties cleanupOld = MetadataCollectorEventTest.defaultProperties();
        Properties cleanupNew = MetadataCollectorEventTest.defaultProperties();
        cleanupOld.put(KafkaConfig.LogCleanupPolicyProp(), Collections.emptyList());
        cleanupNew.put(KafkaConfig.LogCleanupPolicyProp(), Collections.singletonList("delete"));
        KafkaConfig cleanupOldConfig = KafkaConfig.fromProps(cleanupOld);
        KafkaConfig cleanupNewConfig = KafkaConfig.fromProps(cleanupNew);
        boolean cleanupPropagated = BrokerDefaultConfigChangeEvent.propagateBrokerConfigChange(this.context, cleanupOldConfig, cleanupNewConfig, this.fullTopic2, topic2Builder);
        Assertions.assertTrue(cleanupPropagated);
        Assertions.assertEquals(TopicMetadata.CleanupPolicy.DELETE, topic2Builder.getCleanupPolicy(), "Broker config should propagate");

        // Scenario 3: only the new broker config sets segment bytes.
        Properties segmentOld = MetadataCollectorEventTest.defaultProperties();
        Properties segmentNew = MetadataCollectorEventTest.defaultProperties();
        segmentNew.put(KafkaConfig.LogSegmentBytesProp(), "20000");
        KafkaConfig segmentOldConfig = KafkaConfig.fromProps(segmentOld);
        KafkaConfig segmentNewConfig = KafkaConfig.fromProps(segmentNew);
        boolean segmentPropagated = BrokerDefaultConfigChangeEvent.propagateBrokerConfigChange(this.context, segmentOldConfig, segmentNewConfig, this.fullTopic2, topic2Builder);
        Assertions.assertFalse(segmentPropagated, "Additional configs does not propagate");
    }

    /**
     * Rebuilds the collector config with
     * {@code new ZKMetadataCollectorConfig(true, 0, 1, 20, 10, 1, "destination")} and checks that a
     * broker-level segment-bytes change to 20000 is propagated into a builder seeded from
     * topic 2's stored metadata.
     */
    @Test
    void testPropagateAdditionalBrokerConfigChange() {
        this.config = new ZKMetadataCollectorConfig(true, 0, 1, 20, 10, 1, "destination");
        this.createContextWithLocalStoreValue(this.config);

        Properties baselineProps = MetadataCollectorEventTest.defaultProperties();
        Properties changedProps = MetadataCollectorEventTest.defaultProperties();
        changedProps.put(KafkaConfig.LogSegmentBytesProp(), "20000");
        KafkaConfig baselineConfig = KafkaConfig.fromProps(baselineProps);
        KafkaConfig changedConfig = KafkaConfig.fromProps(changedProps);

        TopicMetadata storedTopic2 = this.context.localStore().topicMetadataEvent(this.fullTopic2).getTopicMetadata();
        TopicMetadata.Builder topic2Builder = TopicMetadata.newBuilder().mergeFrom(storedTopic2);

        boolean propagated = BrokerDefaultConfigChangeEvent.propagateBrokerConfigChange(this.context, baselineConfig, changedConfig, this.fullTopic2, topic2Builder);
        Assertions.assertTrue(propagated);
        Assertions.assertEquals(20000, topic2Builder.getSegmentBytes(), "Additional configs should propagate");
    }

    /**
     * Runs a {@link TopicPartitionChangeEvent} that sets topic 1 to 3 partitions and checks
     * the single emitted event (type {@code DELTA}, source CRN of topic 1, partition count 3)
     * followed by the state of the local store and the context's topic info lookups.
     */
    @Test
    void testTopicPartitionChange() throws Exception {
        this.createInitialTopics();

        new TopicPartitionChangeEvent(this.collector, this.tenant, this.fullTopic1, 3, this.time).run();

        Mockito.verify(this.mockEmitter).emit((Event)this.emittedEvent.capture());
        Event emitted = (Event)this.emittedEvent.getValue();
        Assertions.assertNotNull(emitted);
        Assertions.assertEquals("DELTA", emitted.type());
        String expectedSource = String.format("crn://confluent.cloud/kafka=%s/topic=%s", this.tenant, this.topic1);
        Assertions.assertEquals(expectedSource, emitted.source().toString());

        MetadataChange delta = MetadataChange.parseFrom((byte[])emitted.data().get());
        Assertions.assertEquals(3, delta.getEvents(0).getTopicMetadata().getPartitionsCount());

        // Local store / context state after the event has been processed.
        Assertions.assertTrue(this.context.localStore().logicalClusters().contains(this.tenant));
        Assertions.assertEquals(1, this.context.localStore().topics(this.tenant).size());
        Assertions.assertNull(this.context.topicInfo(this.fullTopic1));
        Assertions.assertNotNull(this.context.topicInfo(this.fullTopic2));
    }

    /**
     * Starts from a context built with empty maps, stubs the ZK client through
     * {@code setupZkClientMock()}, runs a {@link TopicPartitionChangeEvent} for topic 1 with
     * 3 partitions, and checks that the emitted {@code DELTA} event reports 3 partitions and
     * a replication factor of 3, and that the tenant ends up with one topic in the local store.
     */
    @Test
    void testTopicPartitionChangeWhenReplicaUnderAssignment() throws Exception {
        // Fresh context with empty maps; the collector mock is pointed at it.
        this.context = new ZKMetadataCollectorContext(this.config, Collections.emptyMap(), Collections.emptyMap(), this.eventQueue, this.metrics, this.zkClient, this.kafkaConfig, 0, this.time);
        Mockito.when((Object)this.collector.collectorContext()).thenReturn(Optional.of(this.context));
        this.setupZkClientMock();

        new TopicPartitionChangeEvent(this.collector, this.tenant, this.fullTopic1, 3, this.time).run();

        Mockito.verify(this.mockEmitter).emit((Event)this.emittedEvent.capture());
        Event emitted = (Event)this.emittedEvent.getValue();
        Assertions.assertNotNull(emitted);
        Assertions.assertEquals("DELTA", emitted.type());
        String expectedSource = String.format("crn://confluent.cloud/kafka=%s/topic=%s", this.tenant, this.topic1);
        Assertions.assertEquals(expectedSource, emitted.source().toString());

        MetadataChange delta = MetadataChange.parseFrom((byte[])emitted.data().get());
        TopicMetadata emittedTopic = delta.getEvents(0).getTopicMetadata();
        Assertions.assertEquals(3, emittedTopic.getPartitionsCount());
        Assertions.assertEquals(3, emittedTopic.getReplicationFactor());

        Assertions.assertTrue(this.context.localStore().logicalClusters().contains(this.tenant));
        Assertions.assertEquals(1, this.context.localStore().topics(this.tenant).size());
    }

    /**
     * Stubs the ZK client mock for topic 1: a replica assignment lookup that returns one
     * {@code TopicIdReplicaAssignment} with three partitions sharing the same
     * {@link ReplicaAssignment} (replicas 1-6, adding 2-4, removing 1/5/6), an empty
     * {@link Properties} for the topic's entity configs, and no cluster link for any topic.
     */
    private void setupZkClientMock() {
        List<Object> allReplicas = Arrays.asList(1, 2, 3, 4, 5, 6);
        List<Object> adding = Arrays.asList(2, 3, 4);
        List<Object> removing = Arrays.asList(1, 5, 6);

        // Convert each Java list to the Scala Seq the ReplicaAssignment constructor takes.
        Seq allReplicasSeq = (Seq)JavaConverters.asScalaBuffer(allReplicas).toSeq();
        Seq addingSeq = (Seq)JavaConverters.asScalaBuffer(adding).toSeq();
        Seq removingSeq = (Seq)JavaConverters.asScalaBuffer(removing).toSeq();
        Seq emptySeq = (Seq)JavaConverters.asScalaBuffer(Collections.emptyList()).toSeq();
        ReplicaAssignment sharedAssignment = new ReplicaAssignment(allReplicasSeq, addingSeq, removingSeq, emptySeq, Option.empty());

        // NOTE(review): partition ids run 1..3 here rather than 0..2 - confirm this is intended.
        HashMap<TopicPartition, ReplicaAssignment> byPartition = new HashMap<>();
        for (int partition = 1; partition < 4; partition++) {
            byPartition.put(new TopicPartition(this.fullTopic1, partition), sharedAssignment);
        }

        TopicZNode.TopicIdReplicaAssignment topic1Assignment = new TopicZNode.TopicIdReplicaAssignment(this.fullTopic1, Option.apply((Object)Uuid.randomUuid()), Option.empty(), (Map)JavaConverters.mapAsScalaMap(byPartition), Option.empty());
        Set stubbedAssignments = JavaConverters.asScalaSet(Collections.singleton(topic1Assignment)).toSet();

        Mockito.when((Object)this.zkClient.getReplicaAssignmentAndTopicIdForTopics((Set)ArgumentMatchers.any())).thenReturn((Object)stubbedAssignments);
        Mockito.when((Object)this.zkClient.getEntityConfigs((String)ArgumentMatchers.eq((Object)"topics"), (String)ArgumentMatchers.eq((Object)this.fullTopic1))).thenReturn((Object)new Properties());
        Mockito.when((Object)this.zkClient.getClusterLinkForTopic((String)ArgumentMatchers.any())).thenReturn((Object)Option.empty());
    }

    /**
     * Creates a third cluster link under tenant {@code lkc-dfg} and deletes link 1, then
     * checks that exactly two {@code DELTA} events were emitted: first a CREATE carrying the
     * new link's id, modes, remote and local cluster ids; then a DELETE carrying link 1's
     * name and id. Finally checks the resulting local store and context state.
     */
    @Test
    void testClusterLinkLifecycleChange() throws Exception {
        this.createInitialClusterLinks();
        String tenant2 = "lkc-dfg";
        String link3 = "other-link";
        Uuid linkId3 = Uuid.randomUuid();
        String fullLink3 = tenant2 + "_" + link3;

        // Entity config the ZK client mock hands back for the new link.
        Properties link3Props = new Properties();
        link3Props.put("bootstrap.servers", "localhost:5678");
        link3Props.put(ClusterLinkConfig.ClusterLinkPrefixProp(), "bar");
        link3Props.put(ClusterLinkConfig.LinkModeProp(), "SOURCE");
        link3Props.put(ClusterLinkConfig.ConnectionModeProp(), "OUTBOUND");
        Mockito.when((Object)this.zkClient.getEntityConfigs("cluster-links", CoreUtils.toJavaUUID((Uuid)linkId3).toString())).thenReturn((Object)link3Props);

        ClusterLinkInfo link3Info = new ClusterLinkInfo(fullLink3, linkId3, ClusterLinkConfig.LinkMode.fromString((String)"source"), ConnectionMode.fromString((String)"outbound"), this.remoteClusterId);
        new ClusterLinkCreationEvent(this.collector, Collections.singletonMap(fullLink3, link3Info), this.time).run();
        new ClusterLinkDeletionEvent(this.collector, Collections.singleton(this.fullLink1), this.time).run();

        Mockito.verify(this.mockEmitter, Mockito.times(2)).emit((Event)this.emittedEvent.capture());
        List events = this.emittedEvent.getAllValues();

        // First emitted event: creation of the new link.
        Event creation = (Event)events.get(0);
        Assertions.assertEquals("DELTA", creation.type());
        Assertions.assertEquals(String.format("crn://confluent.cloud/kafka=%s/cluster-link=%s", tenant2, link3), creation.source().toString());
        Assertions.assertTrue(creation.data().isPresent());
        MetadataChange creationChange = MetadataChange.parseFrom((byte[])creation.data().get());
        Assertions.assertEquals(OpType.CREATE, creationChange.getOp());
        ClusterLinkMetadata createdLink = creationChange.getEvents(0).getClusterLinkMetadata();
        Assertions.assertEquals(linkId3.toString(), createdLink.getClusterLinkId());
        Assertions.assertTrue(createdLink.getLinkMode().equalsIgnoreCase("source"));
        Assertions.assertTrue(createdLink.getConnectionMode().equalsIgnoreCase("outbound"));
        Assertions.assertEquals(this.remoteClusterId, createdLink.getRemoteClusterId());
        Assertions.assertEquals(tenant2, createdLink.getLocalClusterId());

        // Second emitted event: deletion of link 1.
        Event deletion = (Event)events.get(1);
        Assertions.assertEquals("DELTA", deletion.type());
        Assertions.assertEquals(String.format("crn://confluent.cloud/kafka=%s/cluster-link=%s", this.tenant, this.link1), deletion.source().toString());
        Assertions.assertTrue(deletion.data().isPresent());
        MetadataChange deletionChange = MetadataChange.parseFrom((byte[])deletion.data().get());
        Assertions.assertEquals(OpType.DELETE, deletionChange.getOp());
        ClusterLinkMetadata deletedLink = deletionChange.getEvents(0).getClusterLinkMetadata();
        Assertions.assertEquals(this.link1, deletedLink.getClusterLinkName());
        Assertions.assertEquals(this.linkId1.toString(), deletedLink.getClusterLinkId());

        // Resulting local store / context state.
        Assertions.assertTrue(this.context.localStore().logicalClusters().contains(tenant2));
        Assertions.assertTrue(this.context.localStore().clusterLinks(tenant2).contains(fullLink3));
        Assertions.assertNull(this.context.clusterLinkInfo(this.fullLink1));
        Assertions.assertNotNull(this.context.clusterLinkInfo(this.fullLink2));
    }

    /**
     * Creates a third topic under tenant {@code lkc-efg} (3 partitions, replication factor 3)
     * and deletes topic 1, then checks that exactly two {@code DELTA} events were emitted:
     * first a CREATE with the new topic's partition count and replication factor, then a
     * DELETE with topic 1's name and id. Finally checks the resulting local store and
     * context state.
     */
    @Test
    void testTopicLifecycleChange() throws Exception {
        this.createInitialTopics();
        String tenant2 = "lkc-efg";
        String topic3 = "other-topic";
        String fullTopic3 = tenant2 + '_' + topic3;

        // Any topic config lookup on the ZK client mock yields empty properties.
        Mockito.when((Object)this.zkClient.getEntityConfigs((String)ArgumentMatchers.eq((Object)"topics"), (String)ArgumentMatchers.any())).thenReturn((Object)new Properties());

        TopicInfo topic3Info = new TopicInfo(fullTopic3, Uuid.randomUuid(), 3, 3);
        new TopicCreationEvent(this.collector, Collections.singletonMap(fullTopic3, topic3Info), this.time).run();
        new TopicDeletionEvent(this.collector, new HashSet<>(Collections.singletonList(this.fullTopic1)), this.time).run();

        Mockito.verify(this.mockEmitter, Mockito.times(2)).emit((Event)this.emittedEvent.capture());
        List events = this.emittedEvent.getAllValues();

        // First emitted event: creation of the new topic.
        Event creation = (Event)events.get(0);
        Assertions.assertEquals("DELTA", creation.type());
        Assertions.assertEquals(String.format("crn://confluent.cloud/kafka=%s/topic=%s", tenant2, topic3), creation.source().toString());
        Assertions.assertTrue(creation.data().isPresent());
        MetadataChange creationChange = MetadataChange.parseFrom((byte[])creation.data().get());
        Assertions.assertEquals(OpType.CREATE, creationChange.getOp());
        TopicMetadata createdTopic = creationChange.getEvents(0).getTopicMetadata();
        Assertions.assertEquals(3, createdTopic.getPartitionsCount());
        Assertions.assertEquals(3, createdTopic.getReplicationFactor());

        // Second emitted event: deletion of topic 1.
        Event deletion = (Event)events.get(1);
        Assertions.assertEquals("DELTA", deletion.type());
        Assertions.assertEquals(String.format("crn://confluent.cloud/kafka=%s/topic=%s", this.tenant, this.topic1), deletion.source().toString());
        Assertions.assertTrue(deletion.data().isPresent());
        MetadataChange deletionChange = MetadataChange.parseFrom((byte[])deletion.data().get());
        Assertions.assertEquals(OpType.DELETE, deletionChange.getOp());
        TopicMetadata deletedTopic = deletionChange.getEvents(0).getTopicMetadata();
        Assertions.assertEquals(this.topic1, deletedTopic.getTopicName());
        Assertions.assertEquals(this.topicId1.toString(), deletedTopic.getTopicId());

        // Resulting local store / context state.
        Assertions.assertTrue(this.context.localStore().logicalClusters().contains(tenant2));
        Assertions.assertTrue(this.context.localStore().topics(tenant2).contains(fullTopic3));
        Assertions.assertNull(this.context.topicInfo(this.fullTopic1));
        Assertions.assertNotNull(this.context.topicInfo(this.fullTopic2));
    }

    /**
     * Runs a {@link TopicDeletionEvent} for topic 1 against a context created with empty
     * maps and checks that one {@code DELETE} {@code DELTA} event is still emitted, carrying
     * the topic's name and CRN source but an empty topic id.
     */
    @Test
    public void testDeletingTopicNotInCache() throws Exception {
        this.createContext(Collections.emptyMap(), Collections.emptyMap());

        new TopicDeletionEvent(this.collector, new HashSet<>(Collections.singletonList(this.fullTopic1)), this.time).run();

        Mockito.verify(this.mockEmitter, Mockito.times(1)).emit((Event)this.emittedEvent.capture());
        List events = this.emittedEvent.getAllValues();
        Event deletion = (Event)events.get(0);
        Assertions.assertEquals("DELTA", deletion.type());
        Assertions.assertTrue(deletion.data().isPresent());

        MetadataChange deletionChange = MetadataChange.parseFrom((byte[])deletion.data().get());
        Assertions.assertEquals(String.format("crn://confluent.cloud/kafka=%s/topic=%s", this.tenant, this.topic1), deletion.source().toString());
        Assertions.assertEquals(OpType.DELETE, deletionChange.getOp());
        TopicMetadata deletedTopic = deletionChange.getEvents(0).getTopicMetadata();
        Assertions.assertEquals(this.topic1, deletedTopic.getTopicName());
        // The context was created with empty maps, so the emitted topic id is expected to be empty.
        Assertions.assertTrue(deletedTopic.getTopicId().isEmpty());
    }
}

