/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant;

import io.confluent.kafka.multitenant.LogicalClusterMetadata;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.io.IOException;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Unit tests for {@link TopicBasedPhysicalClusterMetadata}.
 *
 * <p>Every test builds {@link ConsumerRecord}s by hand and passes them directly to
 * {@code metadataCache.consume(...)}, then inspects the cache contents, its metrics, or its
 * {@code tenantLifecycleManager}. The {@link KafkaBasedLog} handed to {@code start(...)} is always
 * a Mockito mock and the admin client is a spied {@link MockAdminClient}.
 *
 * <p>Most scenarios exist twice: once with {@code newSeqIdHeader == true} and once (the
 * {@code ...OldSeqIdHeader} variants) with {@code false}. The flag is forwarded unchanged to
 * {@code KafkaTestUtils.createGoodSequenceIdRecordHeaders}.
 */
public class TopicBasedPhysicalClusterMetadataTest {
    private static final String SSL_CERTS_DIR = "mnt/sslcerts/";
    private static final String BROKER_ID = "0";
    private static final String TOPIC = "_confluent-logical_cluster";
    /** Alphabet from which random logical cluster ids are drawn. */
    private static final String LKC_ID_CHARS = "1234567890abcdefghijklmnopqrstuvwxyz-";
    /** Number of characters in a randomly generated logical cluster id. */
    private static final int LKC_ID_LENGTH = 9;
    private static final String NUM_TENANTS_METRIC = "number-of-tenants";

    private MockAdminClient mockAdminClient;
    private MockTime time;
    private Metrics metrics;
    private String sslCertsPath;
    private Path tempDir;
    private TopicBasedPhysicalClusterMetadata metadataCache;

    /**
     * Creates a fresh cache, mock clock, metrics registry and spied admin client for each test.
     *
     * @throws Exception if the temporary directory cannot be created or resolved
     */
    @BeforeEach
    public void setUp() throws Exception {
        tempDir = TestUtils.tempDirectory().toPath();
        metrics = new Metrics();

        // The mock clock starts 1 ms before LC_META_DED's deletion date, so tests that need that
        // date to have passed must advance the clock themselves.
        long mockClockStartMs = Utils.LC_META_DED.lifecycleMetadata().deletionDate().getTime() - 1L;
        time = new MockTime(0L, mockClockStartMs, System.nanoTime());
        metadataCache = new TopicBasedPhysicalClusterMetadata(metrics, time);

        Node node = new Node(0, "localhost", 9092);
        mockAdminClient = Mockito.spy(
                new MockAdminClient.Builder().brokers(Collections.singletonList(node)).controller(0).build());

        sslCertsPath = tempDir.toRealPath() + "/" + SSL_CERTS_DIR + "spec.json";
        // The last two arguments are named deleteDelayMs / checkIntervalMs where
        // testShouldCallAdminClientToDelete passes them.
        metadataCache.configure(mockAdminClient, BROKER_ID, sslCertsPath, 0L, 1L);
    }

    /** Shuts the cache down after each test. */
    @AfterEach
    public void tearDown() {
        // Guard against setUp() having failed before the cache was created; an NPE here would
        // otherwise obscure the original failure.
        if (metadataCache != null) {
            metadataCache.shutdown();
        }
    }

    /** Starts the cache against a mocked {@link KafkaBasedLog} and asserts that it reports up. */
    private void startCacheWithMockedLog() {
        metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(metadataCache.isUp());
    }

    /** Reads the current value of the {@code number-of-tenants} metric. */
    private int numberOfTenants() {
        return (int) TestUtils.getIntMetricValue(metrics, NUM_TENANTS_METRIC);
    }

    /** Reads the named metric as a {@code double}. */
    private double metricValue(String name) {
        return (double) TestUtils.getMetricValue(metrics, name);
    }

    /** Builds a record keyed by {@code value.logicalClusterId()} with {@code newSeqIdHeader == true}. */
    private ConsumerRecord<String, byte[]> createConsumerRecord(long seqId, LogicalClusterMetadata value) {
        return createConsumerRecord(seqId, value, true);
    }

    /** Builds a record keyed by {@code value.logicalClusterId()}. */
    private ConsumerRecord<String, byte[]> createConsumerRecord(long seqId, LogicalClusterMetadata value, boolean newSeqIdHeader) {
        return createConsumerRecord(seqId, value.logicalClusterId(), value, newSeqIdHeader);
    }

    /** Builds a record with an explicit key and {@code newSeqIdHeader == true}. */
    private ConsumerRecord<String, byte[]> createConsumerRecord(long seqId, String key, LogicalClusterMetadata value) {
        return createConsumerRecord(seqId, key, value, true);
    }

    /**
     * Builds a record on {@link #TOPIC} (partition 0, offset 0) whose payload is the protobuf
     * serialization of {@code value}.
     *
     * @param seqId sequence id passed to {@code createGoodSequenceIdRecordHeaders}
     * @param key record key
     * @param value metadata to serialize as the record value
     * @param newSeqIdHeader forwarded to {@code createGoodSequenceIdRecordHeaders}
     * @return the assembled record
     */
    private ConsumerRecord<String, byte[]> createConsumerRecord(long seqId, String key, LogicalClusterMetadata value, boolean newSeqIdHeader) {
        RecordHeaders headers = KafkaTestUtils.createGoodSequenceIdRecordHeaders(seqId, newSeqIdHeader);
        byte[] serializedValue = Utils.protoFromMetadata(value).toByteArray();
        return new ConsumerRecord<>(TOPIC, 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1,
                key, serializedValue, headers, Optional.empty());
    }

    /** Builds a null-valued record for {@code key} with {@code newSeqIdHeader == true}. */
    private ConsumerRecord<String, byte[]> createEmptyConsumerRecord(long seqId, String key) {
        return createEmptyConsumerRecord(seqId, key, true);
    }

    /**
     * Builds a record on {@link #TOPIC} with the given key and a {@code null} value. The tests use
     * such records to remove an entry from the cache.
     */
    private ConsumerRecord<String, byte[]> createEmptyConsumerRecord(long seqId, String key, boolean newSeqIdHeader) {
        RecordHeaders headers = KafkaTestUtils.createGoodSequenceIdRecordHeaders(seqId, newSeqIdHeader);
        return new ConsumerRecord<>(TOPIC, 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1,
                key, null, headers, Optional.empty());
    }

    /** Returns a random id of {@link #LKC_ID_LENGTH} characters drawn from {@link #LKC_ID_CHARS}. */
    private String generateLkcId(Random random) {
        return random.ints(LKC_ID_LENGTH, 0, LKC_ID_CHARS.length())
                .collect(StringBuilder::new, (sb, i) -> sb.append(LKC_ID_CHARS.charAt(i)), StringBuilder::append)
                .toString();
    }

    /** Creates metadata for a cluster of type {@code "kafka"} whose id and name are both {@code lkcId}. */
    private LogicalClusterMetadata createLcm(String lkcId) {
        LogicalClusterMetadata.LifecycleMetadata lifecycle =
                new LogicalClusterMetadata.LifecycleMetadata("lkc-tenant1", "pkc-xyz", new Date(), null);
        return new LogicalClusterMetadata(lkcId, "pkc-dontcare", lkcId, "my-account", "k8s-abc", "kafka",
                100L * 1024L * 1024L, 512000L, 1024000L, 512000L, 1024000L, 1600L,
                LogicalClusterMetadata.DEFAULT_NETWORK_QUOTA_OVERHEAD_PERCENTAGE,
                lifecycle, 12000, "some-org", "some-env");
    }

    /**
     * Generates {@code count} records with distinct random keys, all with sequence id 1.
     *
     * @param count number of records to generate
     * @return the records, in the iteration order of the underlying {@link HashSet} of ids
     */
    private List<ConsumerRecord<String, byte[]>> generateRecords(int count) {
        Random random = ThreadLocalRandom.current();
        HashSet<String> lkcIds = new HashSet<>();
        // add() ignores duplicates, so keep drawing until enough distinct ids exist.
        while (lkcIds.size() < count) {
            lkcIds.add(generateLkcId(random));
        }
        return lkcIds.stream()
                .map(id -> createConsumerRecord(1L, id, createLcm(id), true))
                .collect(Collectors.toList());
    }

    /**
     * Records consumed while the log's {@code start()} is still running must not update the tenant
     * count metric or the Kafka logical cluster id list; both are expected to be populated once
     * {@code metadataCache.start(...)} has returned and to be updated per record afterwards.
     */
    @Test
    public void testOnlyCallExpensiveOperationsOnceOnStartup() {
        List<ConsumerRecord<String, byte[]>> toConsume = generateRecords(1000);
        KafkaBasedLog mockedLog = Mockito.mock(KafkaBasedLog.class);

        // Replay every record from inside the mocked log's start() call.
        Mockito.doAnswer(invocation -> {
            for (ConsumerRecord<String, byte[]> record : toConsume) {
                metadataCache.consume(record);
                Assertions.assertEquals(0, numberOfTenants());
                Assertions.assertTrue(metadataCache.kafkaLogicalClusterIds().isEmpty(), "Id list only updated at the end");
            }
            return null;
        }).when(mockedLog).start();

        metadataCache.start(mockedLog);
        Assertions.assertTrue(metadataCache.isUp());
        Assertions.assertEquals(1000, numberOfTenants());
        Assertions.assertEquals(
                toConsume.stream().map(ConsumerRecord::key).sorted().collect(Collectors.toList()),
                metadataCache.kafkaLogicalClusterIds().stream().sorted().collect(Collectors.toList()));

        // After startup each consumed record is reflected immediately.
        metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC));
        Assertions.assertEquals(1001, numberOfTenants());
        Assertions.assertTrue(metadataCache.kafkaLogicalClusterIds().contains(Utils.LC_META_ABC.logicalClusterId()));

        metadataCache.consume(createConsumerRecord(3L, Utils.LC_META_XYZ));
        Assertions.assertEquals(1002, numberOfTenants());
        Assertions.assertTrue(metadataCache.kafkaLogicalClusterIds().contains(Utils.LC_META_XYZ.logicalClusterId()));
    }

    /**
     * {@code kafkaLogicalClusterIds()} tracks additions and removals, and is expected to stay empty
     * after only the healthcheck tenant has been consumed.
     */
    @Test
    public void testKafkaLogicalClusterId() {
        startCacheWithMockedLog();
        HashSet<String> expectedIds = new HashSet<>();

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_HEALTHCHECK));
        Assertions.assertEquals(expectedIds, metadataCache.kafkaLogicalClusterIds());

        metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC));
        expectedIds.add(Utils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(expectedIds, metadataCache.kafkaLogicalClusterIds());

        metadataCache.consume(createConsumerRecord(3L, Utils.LC_META_XYZ));
        expectedIds.add(Utils.LC_META_XYZ.logicalClusterId());
        Assertions.assertEquals(expectedIds, metadataCache.kafkaLogicalClusterIds());

        metadataCache.consume(createEmptyConsumerRecord(4L, Utils.LC_META_ABC.logicalClusterId()));
        expectedIds.remove(Utils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(expectedIds, metadataCache.kafkaLogicalClusterIds());
    }

    /** Same scenario as {@link #testKafkaLogicalClusterId()} with {@code newSeqIdHeader == false}. */
    @Test
    public void testKafkaLogicalClusterIdOldSeqIdHeader() {
        startCacheWithMockedLog();
        HashSet<String> expectedIds = new HashSet<>();

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_HEALTHCHECK, false));
        Assertions.assertEquals(expectedIds, metadataCache.kafkaLogicalClusterIds());

        metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC, false));
        expectedIds.add(Utils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(expectedIds, metadataCache.kafkaLogicalClusterIds());

        metadataCache.consume(createConsumerRecord(3L, Utils.LC_META_XYZ, false));
        expectedIds.add(Utils.LC_META_XYZ.logicalClusterId());
        Assertions.assertEquals(expectedIds, metadataCache.kafkaLogicalClusterIds());

        metadataCache.consume(createEmptyConsumerRecord(4L, Utils.LC_META_ABC.logicalClusterId(), false));
        expectedIds.remove(Utils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(expectedIds, metadataCache.kafkaLogicalClusterIds());
    }

    /** A consumed record makes its metadata retrievable by logical cluster id. */
    @Test
    public void testCreate() {
        startCacheWithMockedLog();

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC));
        Assertions.assertEquals(Utils.LC_META_ABC, metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_XYZ));
        Assertions.assertEquals(Utils.LC_META_XYZ, metadataCache.metadata(Utils.LC_META_XYZ.logicalClusterId()));
    }

    /** Same scenario as {@link #testCreate()} with {@code newSeqIdHeader == false}. */
    @Test
    public void testCreateOldSeqIdHeader() {
        startCacheWithMockedLog();

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC, false));
        Assertions.assertEquals(Utils.LC_META_ABC, metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_XYZ, false));
        Assertions.assertEquals(Utils.LC_META_XYZ, metadataCache.metadata(Utils.LC_META_XYZ.logicalClusterId()));
    }

    /**
     * A null-valued record with a lower sequence id than the stored entry leaves the entry in
     * place; one with a higher sequence id removes it.
     */
    @Test
    public void testDelete() {
        startCacheWithMockedLog();
        String abcId = Utils.LC_META_ABC.logicalClusterId();

        metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC));
        Assertions.assertEquals(Utils.LC_META_ABC, metadataCache.metadata(abcId));

        // Sequence id 1 < 2: the entry is expected to survive.
        metadataCache.consume(createEmptyConsumerRecord(1L, abcId));
        Assertions.assertEquals(Utils.LC_META_ABC, metadataCache.metadata(abcId));

        // Sequence id 3 > 2: the entry is expected to be gone.
        metadataCache.consume(createEmptyConsumerRecord(3L, abcId));
        Assertions.assertNull(metadataCache.metadata(abcId));
    }

    /** Same scenario as {@link #testDelete()} with {@code newSeqIdHeader == false}. */
    @Test
    public void testDeleteOldSeqIdHeader() {
        startCacheWithMockedLog();
        String abcId = Utils.LC_META_ABC.logicalClusterId();

        metadataCache.consume(createConsumerRecord(2L, Utils.LC_META_ABC, false));
        Assertions.assertEquals(Utils.LC_META_ABC, metadataCache.metadata(abcId));

        metadataCache.consume(createEmptyConsumerRecord(1L, abcId, false));
        Assertions.assertEquals(Utils.LC_META_ABC, metadataCache.metadata(abcId));

        metadataCache.consume(createEmptyConsumerRecord(3L, abcId, false));
        Assertions.assertNull(metadataCache.metadata(abcId));
    }

    /**
     * Records that must not end up in the cache: one built without any headers, and ones keyed
     * {@code "lkc-other"} while carrying {@code LC_META_ABC} as payload (both header variants).
     */
    @Test
    public void testIgnoreBadMessages() {
        startCacheWithMockedLog();

        ConsumerRecord<String, byte[]> noHeaders = new ConsumerRecord<>(TOPIC, 0, 0L,
                Utils.LC_META_ABC.logicalClusterId(), Utils.protoFromMetadata(Utils.LC_META_ABC).toByteArray());
        metadataCache.consume(noHeaders);
        Assertions.assertNull(metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));

        metadataCache.consume(createConsumerRecord(2L, "lkc-other", Utils.LC_META_ABC));
        Assertions.assertNull(metadataCache.metadata("lkc-other"));

        metadataCache.consume(createConsumerRecord(3L, "lkc-other", Utils.LC_META_ABC, false));
        Assertions.assertNull(metadataCache.metadata("lkc-other"));
    }

    /**
     * Asserts the min/max/avg end-to-end load time metrics against the distance between the mock
     * clock and {@code Utils.CREATION_DATE_1} / {@code Utils.CREATION_DATE_2}.
     */
    private void assertEndToEndLoadTimeMetrics() {
        long delay1 = time.milliseconds() - Utils.CREATION_DATE_1.getTime();
        long delay2 = time.milliseconds() - Utils.CREATION_DATE_2.getTime();
        Assertions.assertEquals((double) delay1, metricValue("lkc-metadata-end-to-end-load-time-min"), 0.0);
        Assertions.assertEquals((double) delay2, metricValue("lkc-metadata-end-to-end-load-time-max"), 0.0);
        Assertions.assertEquals(0.5 * (double) (delay1 + delay2), metricValue("lkc-metadata-end-to-end-load-time-avg"), 0.0);
    }

    /** Consuming two records yields the expected min, max and average load-time metrics. */
    @Test
    public void testEndToEndLoadTimeMetric() {
        startCacheWithMockedLog();

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC));
        Assertions.assertEquals(Utils.LC_META_ABC, metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_XYZ));
        Assertions.assertEquals(Utils.LC_META_XYZ, metadataCache.metadata(Utils.LC_META_XYZ.logicalClusterId()));

        assertEndToEndLoadTimeMetrics();
    }

    /** Same scenario as {@link #testEndToEndLoadTimeMetric()} with {@code newSeqIdHeader == false}. */
    @Test
    public void testEndToEndLoadTimeMetricOldSeqIdHeader() {
        startCacheWithMockedLog();

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC, false));
        Assertions.assertEquals(Utils.LC_META_ABC, metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()));

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_XYZ, false));
        Assertions.assertEquals(Utils.LC_META_XYZ, metadataCache.metadata(Utils.LC_META_XYZ.logicalClusterId()));

        assertEndToEndLoadTimeMetrics();
    }

    /** The tenant count metric goes up by one per added cluster and down by one per removal. */
    @Test
    public void testNumTenantsMetric() {
        startCacheWithMockedLog();
        Assertions.assertEquals(0, numberOfTenants());

        LogicalClusterMetadata[] clusters = {Utils.LC_META_ABC, Utils.LC_META_XYZ, Utils.LC_META_HEALTHCHECK};
        int expected = 0;
        for (LogicalClusterMetadata lcm : clusters) {
            metadataCache.consume(createConsumerRecord(1L, lcm));
            Assertions.assertEquals(++expected, numberOfTenants());
        }
        for (LogicalClusterMetadata lcm : clusters) {
            metadataCache.consume(createEmptyConsumerRecord(2L, lcm.logicalClusterId()));
            Assertions.assertEquals(--expected, numberOfTenants());
        }
    }

    /** Same scenario as {@link #testNumTenantsMetric()} with {@code newSeqIdHeader == false}. */
    @Test
    public void testNumTenantsMetricOldSeqIdHeader() {
        startCacheWithMockedLog();
        Assertions.assertEquals(0, numberOfTenants());

        LogicalClusterMetadata[] clusters = {Utils.LC_META_ABC, Utils.LC_META_XYZ, Utils.LC_META_HEALTHCHECK};
        int expected = 0;
        for (LogicalClusterMetadata lcm : clusters) {
            metadataCache.consume(createConsumerRecord(1L, lcm, false));
            Assertions.assertEquals(++expected, numberOfTenants());
        }
        for (LogicalClusterMetadata lcm : clusters) {
            metadataCache.consume(createEmptyConsumerRecord(2L, lcm.logicalClusterId(), false));
            Assertions.assertEquals(--expected, numberOfTenants());
        }
    }

    /**
     * With the clock moved past {@code LC_META_DED}'s deletion date, that cluster's metadata is not
     * returned and it is listed in {@code deletedClusters()}, while {@code LC_META_ABC} is unaffected.
     */
    @Test
    public void testShouldNotReturnDeletedLogicalClusters() {
        startCacheWithMockedLog();
        long deactivationTime = Utils.LC_META_DED.lifecycleMetadata().deletionDate().getTime();
        time.setCurrentTimeMs(deactivationTime + 1L);

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_ABC));
        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_DED));

        Assertions.assertNotNull(metadataCache.metadata(Utils.LC_META_ABC.logicalClusterId()), "Tenant should be active");
        Assertions.assertNull(metadataCache.metadata(Utils.LC_META_DED.logicalClusterId()), "Tenant shouldn't be active");
        Assertions.assertFalse(
                metadataCache.tenantLifecycleManager.deletedClusters().contains(Utils.LC_META_ABC.logicalClusterId()),
                "We expect that the non deactivated cluster won't be in the process of getting deleted");
        Assertions.assertTrue(
                metadataCache.tenantLifecycleManager.deletedClusters().contains(Utils.LC_META_DED.logicalClusterId()),
                "We expect that the deactivated cluster will be in process of getting deleted");
    }

    /**
     * Walks {@code LC_META_DED} through its lifecycle by advancing the mock clock: before the
     * deletion date it is in neither the delete-in-progress nor the fully-deleted set; after the
     * deletion date it shows up in {@code deactivatedClusters()}; after deletion date plus delete
     * delay it shows up in {@code fullyDeletedClusters()} and {@code listTopics} has been called on
     * the spied admin client.
     *
     * @throws IOException declared by the original test signature
     * @throws InterruptedException if interrupted while waiting for a condition
     */
    @Test
    public void testShouldCallAdminClientToDelete() throws IOException, InterruptedException {
        long deleteDelayMs = 3L;
        long checkIntervalMs = 50L;
        // Re-configure on top of setUp() to use a non-zero delete delay and a longer check interval.
        metadataCache.configure(mockAdminClient, BROKER_ID, sslCertsPath, deleteDelayMs, checkIntervalMs);
        startCacheWithMockedLog();

        String clusterId = Utils.LC_META_DED.logicalClusterId();
        long deactivationTime = Utils.LC_META_DED.lifecycleMetadata().deletionDate().getTime();
        long deletionTime = deactivationTime + deleteDelayMs;
        Assertions.assertTrue(time.milliseconds() < deactivationTime,
                "LC_META_DED uses the current Date() as deactivation\ndate, which should be newer than the mock time.");

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_META_DED));
        Assertions.assertFalse(metadataCache.tenantLifecycleManager.deleteInProgressClusters().contains(clusterId));
        Assertions.assertFalse(metadataCache.tenantLifecycleManager.fullyDeletedClusters().contains(clusterId));

        time.setCurrentTimeMs(deactivationTime + 1L);
        TestUtils.waitForCondition(
                () -> metadataCache.tenantLifecycleManager.deactivatedClusters().contains(clusterId),
                15000L,
                "Once deactivation time has passed, cluster should be marked for deactivation");
        Assertions.assertFalse(metadataCache.tenantLifecycleManager.deleteInProgressClusters().contains(clusterId));
        Assertions.assertFalse(metadataCache.tenantLifecycleManager.fullyDeletedClusters().contains(clusterId));

        time.setCurrentTimeMs(deletionTime + 1L);
        TestUtils.waitForCondition(
                () -> metadataCache.tenantLifecycleManager.fullyDeletedClusters().contains(clusterId),
                15000L,
                "Cluster should actually be deleted (that is, AdminClient called)");
        Mockito.verify(mockAdminClient).listTopics((ListTopicsOptions) ArgumentMatchers.any());
    }

    /**
     * Consuming {@code Utils.LC_NOT_KAFKA} leaves the cache without logical cluster ids, without
     * metadata for {@code "not-kafka"}, and with a tenant count of zero.
     */
    @Test
    public void testIgnoreNonKafkaClusters() {
        startCacheWithMockedLog();

        metadataCache.consume(createConsumerRecord(1L, Utils.LC_NOT_KAFKA));

        Assertions.assertEquals(0, metadataCache.logicalClusterIds().size());
        Assertions.assertNull(metadataCache.metadata("not-kafka"));
        Assertions.assertEquals(0, numberOfTenants());
    }
}

