/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant;

import io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata;
import io.confluent.kafka.multitenant.KafkaLogicalClusterMetadataLoader;
import io.confluent.kafka.multitenant.KafkaLogicalClusterMetadataTestUtils;
import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.multitenant.LogicalClusterMetadata;
import org.apache.kafka.server.multitenant.MultiTenantMetadataPublisher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;

@Timeout(value=60L)
public class KafkaLogicalClusterMetadataLoaderTest {
    private static final String LKC_METADATA_TOPIC = "_confluent-logical_cluster";
    private static final String EXTERNAL_LISTENER_NAME_1 = "EXTERNAL_A";
    private static final String EXTERNAL_LISTENER_NAME_2 = "EXTERNAL_B";
    private Metrics metrics;
    private Time time;
    private List<String> blockedListenerNames;
    private String topicName;
    private long topicLoadTimeoutMs;
    private KafkaBasedLog<String, byte[]> logicalClusterMetadataLog;
    private ToggleBlockingMultiTenantMetadataPublisher metadataPublisher;
    List<Endpoint> endpoints;
    private KafkaLogicalClusterMetadataLoader metadataLoader;

    @BeforeEach
    public void setUp() {
        this.metrics = new Metrics();
        this.time = new MockTime();
        this.blockedListenerNames = Arrays.asList(EXTERNAL_LISTENER_NAME_1, EXTERNAL_LISTENER_NAME_2);
        this.topicName = LKC_METADATA_TOPIC;
        this.topicLoadTimeoutMs = 600000L;
        this.logicalClusterMetadataLog = (KafkaBasedLog)Mockito.mock(KafkaBasedLog.class);
        this.metadataPublisher = new ToggleBlockingMultiTenantMetadataPublisher();
        this.endpoints = Arrays.asList(KafkaLogicalClusterMetadataTestUtils.createEndpoint("INTERNAL"), KafkaLogicalClusterMetadataTestUtils.createEndpoint(EXTERNAL_LISTENER_NAME_1));
        this.metadataLoader = new KafkaLogicalClusterMetadataLoader(this.metrics, this.time, this.blockedListenerNames, this.topicName, this.topicLoadTimeoutMs, this.logicalClusterMetadataLog);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.metrics.close();
        this.metadataLoader.close();
    }

    @Test
    public void testConstructor() {
        this.metadataLoader = new KafkaLogicalClusterMetadataLoader(Collections.emptyMap(), this.metrics, this.time);
    }

    @Test
    public void testOnlyBlockedEndpointsAreBlockedOnLogStartup() throws Exception {
        Map futures = this.metadataLoader.start(this.endpoints, Collections.singletonList(this.metadataPublisher));
        Map<String, Boolean> endpointIsBlocked = futures.entrySet().stream().collect(Collectors.toMap(e -> ((Endpoint)e.getKey()).listener(), e -> !((CompletableFuture)e.getValue()).isDone()));
        Assertions.assertFalse((boolean)endpointIsBlocked.containsKey("shouldn't happen"));
        Assertions.assertTrue((boolean)endpointIsBlocked.get(EXTERNAL_LISTENER_NAME_1));
        Assertions.assertFalse((boolean)endpointIsBlocked.get("INTERNAL"));
        this.metadataPublisher.unblock();
        ((CompletableFuture)futures.get(KafkaLogicalClusterMetadataTestUtils.createEndpoint(EXTERNAL_LISTENER_NAME_1))).get(10L, TimeUnit.SECONDS);
    }

    @Test
    public void testOnlyDoMetadataUpdateOncePerPublisherOnStartup() throws Exception {
        List<ConsumerRecord<String, byte[]>> toConsume = KafkaLogicalClusterMetadataTestUtils.generateRecords(1000);
        ((KafkaBasedLog)Mockito.doAnswer(invocation -> {
            toConsume.forEach(r -> this.metadataLoader.read(r));
            return null;
        }).when(this.logicalClusterMetadataLog)).start();
        Map futures = this.metadataLoader.start(this.endpoints, Collections.singletonList(this.metadataPublisher));
        this.metadataPublisher.unblock();
        ((CompletableFuture)futures.get(KafkaLogicalClusterMetadataTestUtils.createEndpoint(EXTERNAL_LISTENER_NAME_1))).get(10L, TimeUnit.SECONDS);
        Assertions.assertEquals((int)1, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals((int)1000, (int)this.metadataPublisher.numLogicalClusters());
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_ABC));
        Assertions.assertEquals((int)2, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_ABC, (Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_XYZ));
        Assertions.assertEquals((int)3, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_XYZ, (Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
    }

    @Test
    public void testOnlyDoMetadataUpdateForRecordsWithLargerSequenceId() throws Exception {
        this.startKafkaLogicalClusterMetadataLoader();
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_XYZ));
        Assertions.assertEquals((int)2, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_XYZ, (Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_XYZ_DELETED));
        Assertions.assertEquals((int)2, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_XYZ, (Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
        Assertions.assertEquals((int)3, (int)this.metadataPublisher.metadataUpdateCount());
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
        Assertions.assertEquals((int)3, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull((Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_ABC));
        Assertions.assertEquals((int)4, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_ABC, (Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
    }

    @Test
    public void testDoMetadataUpdateWithKafkaAndByoc() throws Exception {
        this.startKafkaLogicalClusterMetadataLoader();
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_XYZ));
        Assertions.assertEquals((int)2, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_XYZ, (Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT));
        Assertions.assertEquals((int)3, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT, (Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT));
        Assertions.assertEquals((int)4, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT, (Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT.logicalClusterId()));
    }

    @Test
    public void testDelete() throws Exception {
        this.startKafkaLogicalClusterMetadataLoader();
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        Assertions.assertEquals((int)1, (int)this.metadataPublisher.metadataUpdateCount());
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_ABC));
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_ABC, (Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_ABC, (Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        Assertions.assertNull((Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
    }

    @Test
    public void testDeleteByoc() throws Exception {
        this.startKafkaLogicalClusterMetadataLoader();
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT));
        Assertions.assertEquals((int)2, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT, (Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT.logicalClusterId()));
        Assertions.assertEquals((int)3, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull((Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT.logicalClusterId()));
    }

    @Test
    public void testIgnoreBadMessages() throws Exception {
        this.startKafkaLogicalClusterMetadataLoader();
        ConsumerRecord noKey = new ConsumerRecord(LKC_METADATA_TOPIC, 0, 0L, null, (Object)KafkaLogicalClusterUtils.protoFromMetadata((KafkaLogicalClusterMetadata)KafkaLogicalClusterUtils.LC_META_ABC).toByteArray());
        this.metadataLoader.read(noKey);
        Assertions.assertEquals((int)1, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull((Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        ConsumerRecord noHeaders = new ConsumerRecord(LKC_METADATA_TOPIC, 0, 0L, (Object)KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), (Object)KafkaLogicalClusterUtils.protoFromMetadata((KafkaLogicalClusterMetadata)KafkaLogicalClusterUtils.LC_META_ABC).toByteArray());
        this.metadataLoader.read(noHeaders);
        Assertions.assertEquals((int)1, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull((Object)this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, "lkc-other", KafkaLogicalClusterUtils.LC_META_ABC));
        Assertions.assertEquals((int)1, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull((Object)this.metadataPublisher.logicalClusterMetadata("lkc-other"));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, "lkc-other", KafkaLogicalClusterUtils.LC_META_ABC, false));
        Assertions.assertEquals((int)1, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull((Object)this.metadataPublisher.logicalClusterMetadata("lkc-other"));
    }

    @Test
    public void testIgnoreNonKafkaClusters() throws Exception {
        this.startKafkaLogicalClusterMetadataLoader();
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_NOT_KAFKA));
        Assertions.assertEquals((int)1, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull((Object)this.metadataPublisher.logicalClusterMetadata("not-kafka"));
    }

    @Test
    public void testSendUpdates() throws Exception {
        List<ConsumerRecord> toConsume = Arrays.asList(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_XYZ), KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_XYZ_DELETED), KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_TENANT2));
        ((KafkaBasedLog)Mockito.doAnswer(invocation -> {
            toConsume.forEach(r -> this.metadataLoader.read(r));
            return null;
        }).when(this.logicalClusterMetadataLog)).start();
        this.startKafkaLogicalClusterMetadataLoader();
        Assertions.assertEquals((int)2, (int)this.metadataPublisher.numLogicalClusters());
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(4L, KafkaLogicalClusterUtils.LC_META_TENANT2_DELETED));
        Assertions.assertEquals((int)2, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals((int)2, (int)this.metadataPublisher.numLogicalClusters());
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(5L, KafkaLogicalClusterUtils.LC_META_TENANT2));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(6L, KafkaLogicalClusterUtils.LC_META_TENANT2_DELETED));
        Assertions.assertEquals((int)2, (int)this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals((int)2, (int)this.metadataPublisher.numLogicalClusters());
    }

    private void startKafkaLogicalClusterMetadataLoader() throws Exception {
        Map futures = this.metadataLoader.start(this.endpoints, Collections.singletonList(this.metadataPublisher));
        this.metadataPublisher.unblock();
        ((CompletableFuture)futures.get(KafkaLogicalClusterMetadataTestUtils.createEndpoint(EXTERNAL_LISTENER_NAME_1))).get(10L, TimeUnit.SECONDS);
        Assertions.assertEquals((int)1, (int)this.metadataPublisher.metadataUpdateCount());
    }

    private static class ToggleBlockingMultiTenantMetadataPublisher
    implements MultiTenantMetadataPublisher {
        private final Semaphore sem = new Semaphore(0);
        private final Map<String, LogicalClusterMetadata> logicalClusterMetadataMap = new HashMap<String, LogicalClusterMetadata>();
        private int metadataUpdateCount = 0;

        private ToggleBlockingMultiTenantMetadataPublisher() {
        }

        public String name() {
            return this.getClass().getSimpleName();
        }

        public void onMetadataUpdate(Map<String, ? extends LogicalClusterMetadata> multiTenantMetadataDelta) throws Exception {
            this.sem.acquire();
            try {
                ++this.metadataUpdateCount;
                this.logicalClusterMetadataMap.putAll(multiTenantMetadataDelta);
            }
            finally {
                this.sem.release();
            }
        }

        public void unblock() {
            this.sem.release();
        }

        public int metadataUpdateCount() throws InterruptedException {
            this.sem.acquire();
            try {
                int n = this.metadataUpdateCount;
                return n;
            }
            finally {
                this.sem.release();
            }
        }

        public LogicalClusterMetadata logicalClusterMetadata(String logicalClusterId) throws InterruptedException {
            this.sem.acquire();
            try {
                LogicalClusterMetadata logicalClusterMetadata = this.logicalClusterMetadataMap.get(logicalClusterId);
                return logicalClusterMetadata;
            }
            finally {
                this.sem.release();
            }
        }

        public int numLogicalClusters() throws InterruptedException {
            this.sem.acquire();
            try {
                int n = this.logicalClusterMetadataMap.size();
                return n;
            }
            finally {
                this.sem.release();
            }
        }
    }
}

