/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import io.confluent.kafka.clients.CloudAdmin;
import io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata;
import io.confluent.kafka.multitenant.KafkaLogicalClusterMetadataTestUtils;
import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import java.io.IOException;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.apache.kafka.server.quota.ElasticCkuDefinitionUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TopicBasedPhysicalClusterMetadataTest {
    // Relative certificate locations.
    // NOTE(review): no use of either constant was found near these declarations — confirm they are still needed.
    private static final String SSL_CERTS_DIR = "mnt/sslcerts/";
    private static final String SSL_CERTS_BOOTSTRAP_DIR = "tmp/keystore.jks";
    // Broker id handed to the metadata cache in setUp and to each TenantQuotaCallback under "broker.id".
    private static final String BROKER_ID = "0";
    // Names of the Kafka metrics asserted by the end-to-end load time tests.
    private static final String LC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME = "lkc-metadata-end-to-end-load-time-min";
    private static final String LC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME = "lkc-metadata-end-to-end-load-time-max";
    private static final String LC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME = "lkc-metadata-end-to-end-load-time-avg";
    // NOTE(review): no use found near this declaration — confirm it is still needed.
    private static final String BROKER_UUID = "test-uuid-3";
    // Spied MockAdminClient given to the cache in setUp.
    private MockAdminClient mockAdminClient;
    // Mock clock shared by the cache and the load-time assertions.
    private MockTime time;
    // Kafka metrics registry the cache is constructed with; tests read metric values back from it.
    private Metrics metrics;
    // Absolute path to spec.json under the per-test temp directory, computed in setUp.
    private String sslCertsPath;
    // Per-test temp directory created in setUp.
    private Path tempDir;
    private KafkaBasedLog<String, byte[]> lkcLog;
    // Cache under test; rebuilt in setUp and shut down in tearDown.
    private TopicBasedPhysicalClusterMetadata metadataCache;
    // Listener names passed to configure() in setUp.
    private static final String EXTERNAL_LISTENER_NAME_1 = "EXTERNAL_A";
    private static final String EXTERNAL_LISTENER_NAME_2 = "EXTERNAL_B";
    // SSL endpoint named "INTERNAL" built from the mock broker's host and port in setUp.
    private Endpoint endpoint;

    /**
     * Builds a fresh fixture for every test: a temp directory, a metrics registry, a mock clock
     * positioned one millisecond before {@code LC_META_DED}'s deletion date, a spied
     * {@link MockAdminClient} with one broker (id 0, localhost:9092) acting as controller,
     * and a configured (but not yet started) metadata cache.
     */
    @BeforeEach
    public void setUp() throws Exception {
        this.tempDir = TestUtils.tempDirectory().toPath();
        this.metrics = new Metrics();

        long deletionDateMs = KafkaLogicalClusterUtils.LC_META_DED.lifecycleMetadata().deletionDate().getTime();
        this.time = new MockTime(0L, deletionDateMs - 1L, System.nanoTime());
        this.metadataCache = new TopicBasedPhysicalClusterMetadata(this.metrics, (Time) this.time);

        Node broker = new Node(0, "localhost", 9092);
        MockAdminClient adminClient = new MockAdminClient.Builder()
                .brokers(Collections.singletonList(broker))
                .controller(0)
                .build();
        this.mockAdminClient = Mockito.spy(adminClient);

        this.sslCertsPath = this.tempDir.toRealPath() + "/mnt/sslcerts/spec.json";
        this.endpoint = new Endpoint("INTERNAL", SecurityProtocol.SSL, broker.host(), broker.port());

        this.metadataCache.configure(
                (CloudAdmin) this.mockAdminClient,
                BROKER_ID,
                this.sslCertsPath,
                0L,
                false,
                1L,
                Arrays.asList(EXTERNAL_LISTENER_NAME_1, EXTERNAL_LISTENER_NAME_2),
                false);
    }

    /**
     * Shuts down the cache created by {@code setUp}.
     *
     * <p>The null check matters because JUnit still invokes {@code @AfterEach} methods when
     * {@code @BeforeEach} throws; without it, a failure in {@code setUp} before the cache is
     * assigned would be accompanied by an unrelated NullPointerException from here.
     */
    @AfterEach
    public void tearDown() {
        if (this.metadataCache != null) {
            this.metadataCache.shutdown();
        }
    }

    /**
     * Configures a separate cache instance from a plain config map and checks that no
     * SSL certificate manager was created for it.
     */
    @Test
    public void testSslCertificateManagerDisabled() throws Exception {
        Map<String, Object> brokerConfigs = new HashMap<>();
        brokerConfigs.put("confluent.broker.listeners.ssl.file.store.reload.enable", "true");
        brokerConfigs.put("broker.session.uuid", "session_id");
        brokerConfigs.put("confluent.cdc.lkc.metadata.topic", "topic_config");
        brokerConfigs.put("confluent.cdc.api.keys.topic.load.timeout.ms", 200L);

        TopicBasedPhysicalClusterMetadata standalone =
                new TopicBasedPhysicalClusterMetadata(this.metrics, (Time) this.time);
        standalone.configure(brokerConfigs);

        Assertions.assertNull(standalone.sslCertificateManager);
    }

    /**
     * Checks which endpoints get a pending (blocking) future from
     * {@code endpointsToFutures}: the listener named {@code EXTERNAL_A} must still be
     * pending, the one named {@code internal} must already be done.
     */
    @Test
    public void testEndpointsToFutures() {
        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
        List<Endpoint> endpoints = Arrays.asList(
                KafkaLogicalClusterMetadataTestUtils.createEndpoint("internal"),
                KafkaLogicalClusterMetadataTestUtils.createEndpoint(EXTERNAL_LISTENER_NAME_1));

        // Typed (not raw) so the lambdas below see Map.Entry rather than Object. The value type is
        // kept as a wildcard and cast; NOTE(review): narrow it to the declared return type if known.
        Map<Endpoint, ?> futures = this.metadataCache.endpointsToFutures(endpoints, dummyFuture);

        Map<String, Boolean> endpointIsBlocked = futures.entrySet().stream().collect(Collectors.toMap(
                e -> e.getKey().listenerName().orElse("shouldn't happen"),
                e -> !((CompletableFuture<?>) e.getValue()).isDone()));

        Assertions.assertFalse(endpointIsBlocked.containsKey("shouldn't happen"));
        Assertions.assertTrue(endpointIsBlocked.get(EXTERNAL_LISTENER_NAME_1));
        Assertions.assertFalse(endpointIsBlocked.get("internal"));
    }

    /**
     * Verifies that reconfiguring the link-request multiplier scales only the
     * {@code LINK_REQUEST} quota limit, leaves the {@code REQUEST} limit untouched, and
     * flags both quota types for reset after the first change.
     */
    @Test
    public void testLinkRequestPercentageMultipler() {
        final String multiplierConfig = "confluent.cluster.link.request.quota.request.percentage.multiplier";
        final double tolerance = 0.001;

        String lkcId = KafkaLogicalClusterMetadataTestUtils.generateLkcId(ThreadLocalRandom.current());
        KafkaLogicalClusterMetadata lcm = KafkaLogicalClusterMetadataTestUtils.createLcm(lkcId);
        double brokerRequestPercentage = lcm.brokerRequestPercentage();

        TenantQuotaCallback quotaCallback = new TenantQuotaCallback();
        quotaCallback.configure(Collections.singletonMap("broker.id", BROKER_ID));
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, lkcId, lcm, true));
        Assertions.assertEquals(Collections.singleton(lkcId), this.metadataCache.kafkaLogicalClusterIds());

        // Before any reconfiguration both quota types report the plain broker percentage.
        Map<String, String> tags = Collections.singletonMap("tenant", lkcId);
        Assertions.assertEquals(brokerRequestPercentage, quotaCallback.quotaLimit(ClientQuotaType.REQUEST, tags), tolerance);
        Assertions.assertEquals(brokerRequestPercentage, quotaCallback.quotaLimit(ClientQuotaType.LINK_REQUEST, tags), tolerance);

        double multiplier = 0.8;
        this.metadataCache.reconfigure(Collections.singletonMap(multiplierConfig, multiplier));
        Assertions.assertEquals(brokerRequestPercentage, quotaCallback.quotaLimit(ClientQuotaType.REQUEST, tags), tolerance);
        Assertions.assertEquals(brokerRequestPercentage * multiplier, quotaCallback.quotaLimit(ClientQuotaType.LINK_REQUEST, tags), tolerance);
        Assertions.assertTrue(quotaCallback.quotaResetRequired(ClientQuotaType.REQUEST));
        Assertions.assertTrue(quotaCallback.quotaResetRequired(ClientQuotaType.LINK_REQUEST));

        multiplier = 0.5;
        this.metadataCache.reconfigure(Collections.singletonMap(multiplierConfig, multiplier));
        Assertions.assertEquals(brokerRequestPercentage, quotaCallback.quotaLimit(ClientQuotaType.REQUEST, tags), tolerance);
        Assertions.assertEquals(brokerRequestPercentage * multiplier, quotaCallback.quotaLimit(ClientQuotaType.LINK_REQUEST, tags), tolerance);
    }

    /**
     * Replays 1000 records from inside the mocked log's {@code start()} and checks that the
     * tenant-count metric and the id list stay empty until {@code start} returns, then
     * track every subsequently consumed record.
     */
    @Test
    public void testOnlyCallExpensiveOperationsOnceOnStartup() {
        final String tenantCountMetric = "number-of-tenants";
        final int startupRecordCount = 1000;
        final List<ConsumerRecord<String, byte[]>> startupRecords =
                KafkaLogicalClusterMetadataTestUtils.generateRecords(startupRecordCount);

        KafkaBasedLog mockedLog = Mockito.mock(KafkaBasedLog.class);
        Mockito.doAnswer(invocation -> {
            // While the log is still "starting", nothing may be published yet.
            for (ConsumerRecord<String, byte[]> record : startupRecords) {
                this.metadataCache.consume(record);
                Assertions.assertEquals(0, TestUtils.getIntMetricValue(this.metrics, tenantCountMetric));
                Assertions.assertTrue(this.metadataCache.kafkaLogicalClusterIds().isEmpty(), "Id list only updated at the end");
            }
            return null;
        }).when(mockedLog).start();

        this.metadataCache.start(mockedLog);
        Assertions.assertTrue(this.metadataCache.isUp());
        Assertions.assertEquals(startupRecordCount, TestUtils.getIntMetricValue(this.metrics, tenantCountMetric));

        List<String> expectedIds = startupRecords.stream()
                .map(ConsumerRecord::key)
                .sorted()
                .collect(Collectors.toList());
        List<String> loadedIds = this.metadataCache.kafkaLogicalClusterIds().stream()
                .sorted()
                .collect(Collectors.toList());
        Assertions.assertEquals(expectedIds, loadedIds);

        // After startup each record is reflected immediately.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_ABC));
        Assertions.assertEquals(startupRecordCount + 1, TestUtils.getIntMetricValue(this.metrics, tenantCountMetric));
        Assertions.assertTrue(this.metadataCache.kafkaLogicalClusterIds().contains(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));

        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_XYZ));
        Assertions.assertEquals(startupRecordCount + 2, TestUtils.getIntMetricValue(this.metrics, tenantCountMetric));
        Assertions.assertTrue(this.metadataCache.kafkaLogicalClusterIds().contains(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
    }

    /**
     * Walks the cache through a healthcheck tenant, several regular tenants and a tombstone,
     * checking after each record which ids {@code kafkaLogicalClusterIds()} reports and which
     * elastic CKU definition the quota callback returns.
     */
    @Test
    public void testKafkaLogicalClusterId() {
        TenantQuotaCallback quotaCallback = new TenantQuotaCallback();
        quotaCallback.configure(Collections.singletonMap("broker.id", BROKER_ID));
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        Set<String> lkcIds = new HashSet<>();

        // The healthcheck tenant is consumed but is not expected to appear in the id set.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_HEALTHCHECK));
        Assertions.assertEquals(lkcIds, this.metadataCache.kafkaLogicalClusterIds());
        // Expected value first, so a failure message labels the two sides correctly.
        Assertions.assertEquals(
                KafkaLogicalClusterUtils.ELASTIC_CKU_METADATA_DEFAULT,
                quotaCallback.elasticCkuDefinition(Map.of("tenant", KafkaLogicalClusterUtils.LC_META_HEALTHCHECK.logicalClusterId())));

        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_ABC));
        lkcIds.add(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(lkcIds, this.metadataCache.kafkaLogicalClusterIds());

        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_XYZ));
        lkcIds.add(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId());
        Assertions.assertEquals(lkcIds, this.metadataCache.kafkaLogicalClusterIds());

        // An invalid definition is expected to fall back to the default definition.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_WITH_INVALID_ECKU_DEFN));
        lkcIds.add(KafkaLogicalClusterUtils.LC_WITH_INVALID_ECKU_DEFN.logicalClusterId());
        Assertions.assertEquals(lkcIds, this.metadataCache.kafkaLogicalClusterIds());
        Assertions.assertEquals(
                ElasticCkuDefinitionUtils.DEFAULT_ELASTIC_CKU_DEFINITION,
                quotaCallback.elasticCkuDefinition(Map.of("tenant", KafkaLogicalClusterUtils.LC_WITH_INVALID_ECKU_DEFN.logicalClusterId())));

        // A null definition is expected to be reported as null.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_WITH_NULL_ECKU_DEFN));
        lkcIds.add(KafkaLogicalClusterUtils.LC_WITH_NULL_ECKU_DEFN.logicalClusterId());
        Assertions.assertEquals(lkcIds, this.metadataCache.kafkaLogicalClusterIds());
        Assertions.assertNull(quotaCallback.elasticCkuDefinition(Map.of("tenant", KafkaLogicalClusterUtils.LC_WITH_NULL_ECKU_DEFN.logicalClusterId())));

        // A tombstone with a newer sequence id removes the tenant.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(4L, KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        lkcIds.remove(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId());
        Assertions.assertEquals(lkcIds, this.metadataCache.kafkaLogicalClusterIds());
    }

    /**
     * Consumes tenants with a valid, a null, an invalid and a deactivated-invalid elastic CKU
     * definition and checks, for each, the definition returned by the quota callback and
     * whether the invalid-eCKU-metadata metric is present.
     *
     * @throws InterruptedException if interrupted while a metric check is in progress
     */
    @Test
    public void testKafkaLogicalClusterInvalidEckuDefinition() throws InterruptedException {
        TenantQuotaCallback quotaCallback = new TenantQuotaCallback();
        quotaCallback.configure(Collections.singletonMap("broker.id", BROKER_ID));
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        // Valid definition: returned as-is, no invalid-metadata metric.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_WITH_VALID_ECKU_DEFN));
        Assertions.assertEquals(
                KafkaLogicalClusterUtils.LC_WITH_VALID_ECKU_DEFN.elasticCkuMetadata(),
                quotaCallback.elasticCkuDefinition(Map.of("tenant", KafkaLogicalClusterUtils.LC_WITH_VALID_ECKU_DEFN.logicalClusterId())));
        // Check the tenant that was just consumed; the previous check targeted a tenant this
        // test never loads (LC_TENANT_WITH_1_MAX_ECKU), so it could not fail for the right reason.
        Utils.verifyInvalidECkuMetadataMetricNotPresent(KafkaLogicalClusterUtils.LC_WITH_VALID_ECKU_DEFN.logicalClusterId());

        // Null definition: reported as null, no invalid-metadata metric.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_WITH_NULL_ECKU_DEFN));
        Assertions.assertNull(quotaCallback.elasticCkuDefinition(Map.of("tenant", KafkaLogicalClusterUtils.LC_WITH_NULL_ECKU_DEFN.logicalClusterId())));
        Utils.verifyInvalidECkuMetadataMetricNotPresent(KafkaLogicalClusterUtils.LC_WITH_NULL_ECKU_DEFN.logicalClusterId());

        // Invalid definition: default definition is used and the metric is present.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, KafkaLogicalClusterUtils.LC_WITH_INVALID_ECKU_DEFN));
        Assertions.assertEquals(
                ElasticCkuDefinitionUtils.DEFAULT_ELASTIC_CKU_DEFINITION,
                quotaCallback.elasticCkuDefinition(Map.of("tenant", KafkaLogicalClusterUtils.LC_WITH_INVALID_ECKU_DEFN.logicalClusterId())));
        Utils.verifyInvalidECkuMetadataMetricPresent(KafkaLogicalClusterUtils.LC_WITH_INVALID_ECKU_DEFN.logicalClusterId());

        // Deactivated tenant with an invalid definition: metadata returned as-is, no metric.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, KafkaLogicalClusterUtils.DEACTIVATED_LC_WITH_INVALID_ECKU_DEFN));
        Assertions.assertEquals(
                KafkaLogicalClusterUtils.DEACTIVATED_LC_WITH_INVALID_ECKU_DEFN.elasticCkuMetadata(),
                quotaCallback.elasticCkuDefinition(Map.of("tenant", KafkaLogicalClusterUtils.DEACTIVATED_LC_WITH_INVALID_ECKU_DEFN.logicalClusterId())));
        Utils.verifyInvalidECkuMetadataMetricNotPresent(KafkaLogicalClusterUtils.DEACTIVATED_LC_WITH_INVALID_ECKU_DEFN.logicalClusterId());
    }

    /**
     * Consumes a dedicated tenant and then an updated-CKU record, checking after each that
     * the id set is unchanged and the dedicated-cku gauge reports the record's CKU count.
     */
    @Test
    public void testDedicatedCKUCountMetric() throws Throwable {
        TenantQuotaCallback quotaCallback = new TenantQuotaCallback();
        quotaCallback.configure(Collections.singletonMap("broker.id", BROKER_ID));
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        Set<String> expectedIds = new HashSet<>();
        expectedIds.add(KafkaLogicalClusterUtils.LC_META_DEDICATED_TENANT.logicalClusterId());

        // Initial record.
        this.metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_DEDICATED_TENANT));
        Assertions.assertEquals(expectedIds, this.metadataCache.kafkaLogicalClusterIds());
        this.verifyDedicatedCKUMetric(
                KafkaLogicalClusterUtils.LC_META_DEDICATED_TENANT.logicalClusterId(),
                KafkaLogicalClusterUtils.LC_META_DEDICATED_TENANT.cku());

        // Updated CKU record; the gauge is still looked up under the original tenant's id.
        this.metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_DEDICATED_TENANT_UPDATED_CKU));
        Assertions.assertEquals(expectedIds, this.metadataCache.kafkaLogicalClusterIds());
        this.verifyDedicatedCKUMetric(
                KafkaLogicalClusterUtils.LC_META_DEDICATED_TENANT.logicalClusterId(),
                KafkaLogicalClusterUtils.LC_META_DEDICATED_TENANT_UPDATED_CKU.cku());
    }

    /**
     * Waits until the Yammer registry holds exactly one {@code dedicated-cku} gauge of type
     * {@code KafkaLogicalClusterMetadata} whose MBean name contains {@code tenant=<lkcId>}
     * and whose value equals {@code cku}.
     *
     * @param lkcId logical cluster id expected in the metric's MBean name
     * @param cku   expected gauge value
     * @throws InterruptedException if interrupted while waiting for the condition
     */
    private void verifyDedicatedCKUMetric(String lkcId, int cku) throws InterruptedException {
        String name = "dedicated-cku";
        String type = "KafkaLogicalClusterMetadata";
        String tenantTag = String.format("%s=%s", "tenant", lkcId);
        TestCondition condition = () -> {
            // Re-read the registry on every poll so the check never depends on allMetrics()
            // returning a live view. Typed (not raw) so the lambdas see Map.Entry, not Object.
            Map<MetricName, ?> metricsMap = KafkaYammerMetrics.defaultRegistry().allMetrics();
            return metricsMap.entrySet().stream().filter(e -> {
                MetricName metricName = e.getKey();
                return metricName.getName().equals(name)
                        && metricName.getType().equals(type)
                        && metricName.getMBeanName().contains(tenantTag);
            }).filter(e -> {
                Gauge<?> metricValue = (Gauge<?>) e.getValue();
                // equals() instead of an unboxing '==' so a null gauge value reads as "no match".
                return metricValue != null && Integer.valueOf(cku).equals(metricValue.value());
            }).count() == 1L;
        };
        TestUtils.waitForCondition(condition, "Dedicated-CKU metric is not present for " + lkcId);
    }

    /**
     * Checks that a registered tenant callback fires once per consumed tenant, and that a
     * callback registered afterwards has been invoked three times by the time registration returns.
     */
    @Test
    public void testTenantRegisterCallBacks() {
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        AtomicInteger earlyCallbackHits = new AtomicInteger();
        AtomicInteger lateCallbackHits = new AtomicInteger();

        Function<String, Boolean> earlyCallback = tenant -> {
            earlyCallbackHits.incrementAndGet();
            return true;
        };
        this.metadataCache.registerTenantCallback(earlyCallback);

        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_TENANT1));
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_TENANT2));
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_TENANT3));
        Assertions.assertEquals(3, earlyCallbackHits.get());

        // Registered only after the three tenants were consumed.
        Function<String, Boolean> lateCallback = tenant -> {
            lateCallbackHits.incrementAndGet();
            return true;
        };
        this.metadataCache.registerTenantCallback(lateCallback);
        Assertions.assertEquals(3, lateCallbackHits.get());
    }

    /**
     * Tracks a single counter that the tenant callback increments and the deactivated
     * callback decrements: three creations, two deletions, then one re-creation.
     */
    @Test
    public void testTenantDeactivateCallBacks() {
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        AtomicInteger activeTenants = new AtomicInteger();
        Function<String, Boolean> onTenant = tenant -> {
            activeTenants.incrementAndGet();
            return true;
        };
        Function<String, Boolean> onTenantDeactivated = tenant -> {
            activeTenants.decrementAndGet();
            return true;
        };
        this.metadataCache.registerTenantCallback(onTenant);
        this.metadataCache.registerTenantDeactivatedCallback(onTenantDeactivated);

        // Three tenants created.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_TENANT1));
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_TENANT2));
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_TENANT3));
        Assertions.assertEquals(3, activeTenants.get());

        // Two of them deleted.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(4L, KafkaLogicalClusterUtils.LC_META_TENANT1_DELETED));
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(5L, KafkaLogicalClusterUtils.LC_META_TENANT2_DELETED));
        Assertions.assertEquals(1, activeTenants.get());

        // The first one comes back.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(6L, KafkaLogicalClusterUtils.LC_META_TENANT1));
        Assertions.assertEquals(2, activeTenants.get());
    }

    /**
     * Same id-set walk as the new-header variant, but every record is built with the
     * trailing {@code false} flag of the record factories.
     */
    @Test
    public void testKafkaLogicalClusterIdOldSeqIdHeader() {
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        String abcId = KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId();
        String xyzId = KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId();
        Set<String> expectedIds = new HashSet<>();

        // The healthcheck tenant is not expected in the id set.
        this.metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_HEALTHCHECK, false));
        Assertions.assertEquals(expectedIds, this.metadataCache.kafkaLogicalClusterIds());

        this.metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_ABC, false));
        expectedIds.add(abcId);
        Assertions.assertEquals(expectedIds, this.metadataCache.kafkaLogicalClusterIds());

        this.metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_XYZ, false));
        expectedIds.add(xyzId);
        Assertions.assertEquals(expectedIds, this.metadataCache.kafkaLogicalClusterIds());

        // Tombstone for ABC.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(4L, abcId, false));
        expectedIds.remove(abcId);
        Assertions.assertEquals(expectedIds, this.metadataCache.kafkaLogicalClusterIds());
    }

    /**
     * Consumes the ABC and XYZ records in turn and checks each is retrievable by its
     * logical cluster id right after being consumed.
     */
    @Test
    public void testCreate() {
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        List<KafkaLogicalClusterMetadata> tenants =
                Arrays.asList(KafkaLogicalClusterUtils.LC_META_ABC, KafkaLogicalClusterUtils.LC_META_XYZ);
        for (KafkaLogicalClusterMetadata tenant : tenants) {
            this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, tenant));
            Assertions.assertEquals(tenant, this.metadataCache.metadata(tenant.logicalClusterId()));
        }
    }

    /**
     * Variant of the create test whose records are built with the trailing {@code false}
     * flag of {@code createConsumerRecord}.
     */
    @Test
    public void testCreateOldSeqIdHeader() {
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        List<KafkaLogicalClusterMetadata> tenants =
                Arrays.asList(KafkaLogicalClusterUtils.LC_META_ABC, KafkaLogicalClusterUtils.LC_META_XYZ);
        for (KafkaLogicalClusterMetadata tenant : tenants) {
            this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, tenant, false));
            Assertions.assertEquals(tenant, this.metadataCache.metadata(tenant.logicalClusterId()));
        }
    }

    /**
     * Checks that a tombstone with an older sequence id (1 after 2) leaves the entry in
     * place, while one with a newer sequence id (3) removes it.
     */
    @Test
    public void testDelete() {
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        KafkaLogicalClusterMetadata abc = KafkaLogicalClusterUtils.LC_META_ABC;
        String abcId = abc.logicalClusterId();

        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, abc));
        Assertions.assertEquals(abc, this.metadataCache.metadata(abcId));

        // Stale tombstone: entry must survive.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(1L, abcId));
        Assertions.assertEquals(abc, this.metadataCache.metadata(abcId));

        // Newer tombstone: entry must be gone.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(3L, abcId));
        Assertions.assertNull(this.metadataCache.metadata(abcId));
    }

    /**
     * Variant of the delete test whose records are built with the trailing {@code false}
     * flag of the record factories: a stale tombstone is ignored, a newer one removes the entry.
     */
    @Test
    public void testDeleteOldSeqIdHeader() {
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        KafkaLogicalClusterMetadata abc = KafkaLogicalClusterUtils.LC_META_ABC;
        String abcId = abc.logicalClusterId();

        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, abc, false));
        Assertions.assertEquals(abc, this.metadataCache.metadata(abcId));

        // Stale tombstone: entry must survive.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(1L, abcId, false));
        Assertions.assertEquals(abc, this.metadataCache.metadata(abcId));

        // Newer tombstone: entry must be gone.
        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(3L, abcId, false));
        Assertions.assertNull(this.metadataCache.metadata(abcId));
    }

    /**
     * Feeds the cache records it is expected to ignore — one built without going through the
     * header-adding factory, and two whose record key ({@code lkc-other}) differs from the
     * payload's tenant — and checks that none of them becomes visible through {@code metadata()}.
     */
    @Test
    public void testIgnoreBadMessages() {
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        // Typed construction instead of a raw ConsumerRecord; key/value types are inferred
        // from the String id and the serialized byte[] payload.
        ConsumerRecord<String, byte[]> noHeaders = new ConsumerRecord<>(
                "_confluent-logical_cluster",
                0,
                0L,
                KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(),
                KafkaLogicalClusterUtils.protoFromMetadata(KafkaLogicalClusterUtils.LC_META_ABC).toByteArray());
        this.metadataCache.consume(noHeaders);
        Assertions.assertNull(this.metadataCache.metadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));

        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, "lkc-other", KafkaLogicalClusterUtils.LC_META_ABC));
        Assertions.assertNull(this.metadataCache.metadata("lkc-other"));

        this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, "lkc-other", KafkaLogicalClusterUtils.LC_META_ABC, false));
        Assertions.assertNull(this.metadataCache.metadata("lkc-other"));
    }

    /**
     * Replays a record carrying {@code LC_WITH_INVALID_ECKU_QUOTA} from inside the mocked
     * log's {@code start()} and checks that the cache still reports itself as up.
     */
    @Test
    public void testInvalidECKUMetadataOnStartup() {
        TenantQuotaCallback quotaCallback = new TenantQuotaCallback();
        quotaCallback.configure(Collections.singletonMap("broker.id", BROKER_ID));

        String tenantId = KafkaLogicalClusterUtils.LC_WITH_INVALID_ECKU_QUOTA.logicalClusterId();
        ConsumerRecord<String, byte[]> invalidRecord = KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(
                1L, tenantId, KafkaLogicalClusterUtils.LC_WITH_INVALID_ECKU_QUOTA);

        // Deliver the record while the log is "starting".
        KafkaBasedLog startupLog = Mockito.mock(KafkaBasedLog.class);
        Mockito.doAnswer(invocation -> {
            this.metadataCache.consume(invalidRecord);
            return null;
        }).when(startupLog).start();

        this.metadataCache.start(startupLog);
        Assertions.assertTrue(this.metadataCache.isUp());
    }

    /**
     * Consumes ABC and XYZ and checks the min/max/avg end-to-end load time metrics against
     * the mock clock's current time minus {@code CREATION_DATE_1} / {@code CREATION_DATE_2}.
     */
    @Test
    public void testEndToEndLoadTimeMetric() {
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        List<KafkaLogicalClusterMetadata> tenants =
                Arrays.asList(KafkaLogicalClusterUtils.LC_META_ABC, KafkaLogicalClusterUtils.LC_META_XYZ);
        for (KafkaLogicalClusterMetadata tenant : tenants) {
            this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, tenant));
            Assertions.assertEquals(tenant, this.metadataCache.metadata(tenant.logicalClusterId()));
        }

        long nowMs = this.time.milliseconds();
        long delay1 = nowMs - KafkaLogicalClusterUtils.CREATION_DATE_1.getTime();
        long delay2 = nowMs - KafkaLogicalClusterUtils.CREATION_DATE_2.getTime();
        double expectedAvg = 0.5 * (double) (delay1 + delay2);

        Assertions.assertEquals(
                (double) delay1,
                (double) TestUtils.getMetricValue(this.metrics, LC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME),
                0.0);
        Assertions.assertEquals(
                (double) delay2,
                (double) TestUtils.getMetricValue(this.metrics, LC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME),
                0.0);
        Assertions.assertEquals(
                expectedAvg,
                (double) TestUtils.getMetricValue(this.metrics, LC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME),
                0.0);
    }

    /**
     * Variant of the end-to-end load time test whose records are built with the trailing
     * {@code false} flag of {@code createConsumerRecord}; the metric expectations are the same.
     */
    @Test
    public void testEndToEndLoadTimeMetricOldSeqIdHeader() {
        this.metadataCache.start(Mockito.mock(KafkaBasedLog.class));
        Assertions.assertTrue(this.metadataCache.isUp());

        List<KafkaLogicalClusterMetadata> tenants =
                Arrays.asList(KafkaLogicalClusterUtils.LC_META_ABC, KafkaLogicalClusterUtils.LC_META_XYZ);
        for (KafkaLogicalClusterMetadata tenant : tenants) {
            this.metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, tenant, false));
            Assertions.assertEquals(tenant, this.metadataCache.metadata(tenant.logicalClusterId()));
        }

        long nowMs = this.time.milliseconds();
        long delay1 = nowMs - KafkaLogicalClusterUtils.CREATION_DATE_1.getTime();
        long delay2 = nowMs - KafkaLogicalClusterUtils.CREATION_DATE_2.getTime();
        double expectedAvg = 0.5 * (double) (delay1 + delay2);

        Assertions.assertEquals(
                (double) delay1,
                (double) TestUtils.getMetricValue(this.metrics, LC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME),
                0.0);
        Assertions.assertEquals(
                (double) delay2,
                (double) TestUtils.getMetricValue(this.metrics, LC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME),
                0.0);
        Assertions.assertEquals(
                expectedAvg,
                (double) TestUtils.getMetricValue(this.metrics, LC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME),
                0.0);
    }

    /**
     * Verifies that the {@code number-of-tenants} metric starts at zero, grows by one for each
     * logical cluster record consumed, and shrinks by one for each empty record consumed for
     * the same logical cluster ids.
     */
    @Test
    public void testNumTenantsMetric() {
        final String tenantCountMetric = "number-of-tenants";

        KafkaBasedLog backingLog = Mockito.mock(KafkaBasedLog.class);
        metadataCache.start(backingLog);
        Assertions.assertTrue(metadataCache.isUp());
        Assertions.assertEquals(0, (int) TestUtils.getIntMetricValue(metrics, tenantCountMetric));

        KafkaLogicalClusterMetadata[] tenants = {
            KafkaLogicalClusterUtils.LC_META_ABC,
            KafkaLogicalClusterUtils.LC_META_XYZ,
            KafkaLogicalClusterUtils.LC_META_HEALTHCHECK
        };

        // Adding tenants one at a time: the metric must equal the number added so far.
        for (int i = 0; i < tenants.length; i++) {
            metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, tenants[i]));
            Assertions.assertEquals(i + 1, (int) TestUtils.getIntMetricValue(metrics, tenantCountMetric));
        }

        // Consuming an empty record per tenant, in the same order: the metric must count back down to zero.
        for (int i = 0; i < tenants.length; i++) {
            metadataCache.consume(
                    KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(2L, tenants[i].logicalClusterId()));
            int expectedRemaining = tenants.length - 1 - i;
            Assertions.assertEquals(expectedRemaining, (int) TestUtils.getIntMetricValue(metrics, tenantCountMetric));
        }
    }

    /**
     * Same scenario as the tenant-count check, but every record is built with {@code false}
     * as the trailing argument of the record factory methods: the {@code number-of-tenants}
     * metric must go 0, 1, 2, 3 while records are added and 2, 1, 0 while empty records are
     * consumed.
     *
     * <p>NOTE(review): per the test name, the {@code false} flag presumably selects the old
     * sequence-id header format; confirm against {@code KafkaLogicalClusterMetadataTestUtils}.
     */
    @Test
    public void testNumTenantsMetricOldSeqIdHeader() {
        final String tenantCountMetric = "number-of-tenants";

        KafkaBasedLog backingLog = Mockito.mock(KafkaBasedLog.class);
        metadataCache.start(backingLog);
        Assertions.assertTrue(metadataCache.isUp());
        Assertions.assertEquals(0, (int) TestUtils.getIntMetricValue(metrics, tenantCountMetric));

        KafkaLogicalClusterMetadata[] tenants = {
            KafkaLogicalClusterUtils.LC_META_ABC,
            KafkaLogicalClusterUtils.LC_META_XYZ,
            KafkaLogicalClusterUtils.LC_META_HEALTHCHECK
        };

        // Growth phase: one more tenant reported after each consumed record.
        for (int i = 0; i < tenants.length; i++) {
            metadataCache.consume(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, tenants[i], false));
            Assertions.assertEquals(i + 1, (int) TestUtils.getIntMetricValue(metrics, tenantCountMetric));
        }

        // Shrink phase: one fewer tenant reported after each consumed empty record.
        for (int i = 0; i < tenants.length; i++) {
            metadataCache.consume(
                    KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(2L, tenants[i].logicalClusterId(), false));
            int expectedRemaining = tenants.length - 1 - i;
            Assertions.assertEquals(expectedRemaining, (int) TestUtils.getIntMetricValue(metrics, tenantCountMetric));
        }
    }

    /**
     * Verifies the {@code number-of-tenants} and {@code number-of-non-hc-tenants} metrics:
     * after consuming the ABC, HEALTHCHECK and XYZ records they must read 3 and 2, and after
     * consuming an empty record for XYZ they must read 2 and 1.
     */
    @Test
    public void testActiveNonHealthcheckLkcCount() {
        final String allTenantsMetric = "number-of-tenants";
        final String nonHealthcheckTenantsMetric = "number-of-non-hc-tenants";

        KafkaBasedLog backingLog = Mockito.mock(KafkaBasedLog.class);
        metadataCache.start(backingLog);
        Assertions.assertTrue(metadataCache.isUp());

        metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_ABC));
        metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_HEALTHCHECK));
        metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_XYZ));

        Assertions.assertEquals(3, (int) TestUtils.getIntMetricValue(metrics, allTenantsMetric));
        Assertions.assertEquals(2, (int) TestUtils.getIntMetricValue(metrics, nonHealthcheckTenantsMetric));

        // An empty record for XYZ must lower both counts by one.
        String removedClusterId = KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId();
        metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(4L, removedClusterId, false));

        Assertions.assertEquals(2, (int) TestUtils.getIntMetricValue(metrics, allTenantsMetric));
        Assertions.assertEquals(1, (int) TestUtils.getIntMetricValue(metrics, nonHealthcheckTenantsMetric));
    }

    /**
     * Verifies that, with the clock set one millisecond past the deletion date of
     * {@code LC_META_DED}, the cache returns metadata for {@code LC_META_ABC} but not for
     * {@code LC_META_DED}, and that only the latter appears in the lifecycle manager's
     * {@code deletedClusters()}.
     */
    @Test
    public void testShouldNotReturnDeletedLogicalClusters() {
        KafkaBasedLog backingLog = Mockito.mock(KafkaBasedLog.class);
        metadataCache.start(backingLog);
        Assertions.assertTrue(metadataCache.isUp());

        // Move the clock just past the deletion date before any record is consumed.
        long deletionDateMs = KafkaLogicalClusterUtils.LC_META_DED.lifecycleMetadata().deletionDate().getTime();
        time.setCurrentTimeMs(deletionDateMs + 1L);

        metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_ABC));
        metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_DED));

        Assertions.assertNotNull(
                metadataCache.metadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()),
                "Tenant should be active");
        Assertions.assertNull(
                metadataCache.metadata(KafkaLogicalClusterUtils.LC_META_DED.logicalClusterId()),
                "Tenant shouldn't be active");

        Assertions.assertFalse(
                metadataCache.tenantLifecycleManager.deletedClusters()
                        .contains(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()),
                "We expect that the non deactivated cluster won't be in the process of getting deleted");
        Assertions.assertTrue(
                metadataCache.tenantLifecycleManager.deletedClusters()
                        .contains(KafkaLogicalClusterUtils.LC_META_DED.logicalClusterId()),
                "We expect that the deactivated cluster will be in process of getting deleted");
    }

    /**
     * Walks {@code LC_META_DED} through its lifecycle by advancing the clock: first past the
     * deletion date (the cluster must show up in {@code deactivatedClusters()}), then past the
     * deletion date plus the configured delete delay (the cluster must show up in
     * {@code fullyDeletedClusters()}). Finally verifies that {@code listTopics} was invoked on
     * the mocked admin client.
     *
     * @throws IOException declared by the test signature; the visible body does not show which call raises it
     * @throws InterruptedException declared by the test signature; presumably from the condition waits
     */
    @Test
    public void testShouldCallAdminClientToDelete() throws IOException, InterruptedException {
        final long deleteDelayMs = 3L;
        final long checkIntervalMs = 50L;
        final long waitTimeoutMs = 15000L;

        metadataCache.configure(
                (CloudAdmin) mockAdminClient,
                BROKER_ID,
                sslCertsPath,
                deleteDelayMs,
                false,
                checkIntervalMs,
                Collections.emptyList(),
                false);
        KafkaBasedLog backingLog = Mockito.mock(KafkaBasedLog.class);
        metadataCache.start(backingLog);
        Assertions.assertTrue(metadataCache.isUp());

        String clusterId = KafkaLogicalClusterUtils.LC_META_DED.logicalClusterId();
        long deactivationTime = KafkaLogicalClusterUtils.LC_META_DED.lifecycleMetadata().deletionDate().getTime();
        long deletionTime = deactivationTime + deleteDelayMs;

        // Precondition: the clock must still be before the deactivation time.
        Assertions.assertTrue(
                time.milliseconds() < deactivationTime,
                "LC_META_DED uses the current Date() as deactivation\ndate, which should be newer then the mock time.");

        // Before the deactivation time nothing may be deleting or deleted.
        metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_DED));
        Assertions.assertFalse(metadataCache.tenantLifecycleManager.deleteInProgressClusters().contains(clusterId));
        Assertions.assertFalse(metadataCache.tenantLifecycleManager.fullyDeletedClusters().contains(clusterId));

        // Just past the deactivation time: deactivated, but deletion must not have started.
        time.setCurrentTimeMs(deactivationTime + 1L);
        TestCondition markedDeactivated =
                () -> metadataCache.tenantLifecycleManager.deactivatedClusters().contains(clusterId);
        TestUtils.waitForCondition(
                markedDeactivated,
                waitTimeoutMs,
                "Once deactivation time has passed, cluster should be marked for deactivation");
        Assertions.assertFalse(metadataCache.tenantLifecycleManager.deleteInProgressClusters().contains(clusterId));
        Assertions.assertFalse(metadataCache.tenantLifecycleManager.fullyDeletedClusters().contains(clusterId));

        // Just past the deletion time: the cluster must end up fully deleted.
        time.setCurrentTimeMs(deletionTime + 1L);
        TestCondition fullyDeleted =
                () -> metadataCache.tenantLifecycleManager.fullyDeletedClusters().contains(clusterId);
        TestUtils.waitForCondition(
                fullyDeleted,
                waitTimeoutMs,
                "Cluster should actually be deleted (that is, AdminClient called)");

        ((MockAdminClient) Mockito.verify((Object) mockAdminClient))
                .listTopics((ListTopicsOptions) ArgumentMatchers.any());
    }

    /**
     * Verifies that consuming the {@code LC_NOT_KAFKA} record leaves the cache empty: no
     * logical cluster ids, no metadata under {@code "not-kafka"}, and a
     * {@code number-of-tenants} metric of zero.
     */
    @Test
    public void testIgnoreNonKafkaClusters() {
        KafkaBasedLog backingLog = Mockito.mock(KafkaBasedLog.class);
        metadataCache.start(backingLog);
        Assertions.assertTrue(metadataCache.isUp());

        metadataCache.consume(
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_NOT_KAFKA));

        int knownClusterCount = metadataCache.logicalClusterIds().size();
        Assertions.assertEquals(0, knownClusterCount);
        Assertions.assertNull(metadataCache.metadata("not-kafka"));
        Assertions.assertEquals(0, (int) TestUtils.getIntMetricValue(metrics, "number-of-tenants"));
    }

    /**
     * Verifies that a tenant record consumed while the backing log is starting is reflected in
     * {@code TenantQuotaCallback#getTenantWithFlexFanoutEnabled()}: the mocked log's
     * {@code start()} is stubbed to feed the {@code LC_META_TENANT1} record into the cache, and
     * afterwards the callback must report exactly that one logical cluster id.
     */
    @Test
    public void testFlexFanountTenantEnableIsLoadedOnStartup() {
        TenantQuotaCallback quotaCallback = new TenantQuotaCallback();
        quotaCallback.configure(Collections.singletonMap("broker.id", BROKER_ID));

        String lkcId = KafkaLogicalClusterUtils.LC_META_TENANT1.logicalClusterId();
        ConsumerRecord<String, byte[]> startupRecord =
                KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, lkcId, KafkaLogicalClusterUtils.LC_META_TENANT1);

        // Stub start() so the record is consumed as part of starting the log.
        KafkaBasedLog mockedLog = Mockito.mock(KafkaBasedLog.class);
        Mockito.doAnswer(ignoredInvocation -> {
            metadataCache.consume(startupRecord);
            return null;
        }).when(mockedLog).start();

        metadataCache.start(mockedLog);

        Set<?> flexFanoutTenants = quotaCallback.getTenantWithFlexFanoutEnabled();
        Assertions.assertEquals(1, flexFanoutTenants.size());
        Assertions.assertTrue(flexFanoutTenants.contains(lkcId));
    }
}

