package io.confluent.kafka.multitenant;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.multitenant.LogicalClusterMetadata;
import org.apache.kafka.server.multitenant.MultiTenantMetadataPublisher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;

@Timeout(60)
/* loaded from: input_file:io/confluent/kafka/multitenant/KafkaLogicalClusterMetadataLoaderTest.class */
public class KafkaLogicalClusterMetadataLoaderTest {
    private static final String LKC_METADATA_TOPIC = "_confluent-logical_cluster";
    private static final String EXTERNAL_LISTENER_NAME_1 = "EXTERNAL_A";
    private static final String EXTERNAL_LISTENER_NAME_2 = "EXTERNAL_B";
    private Metrics metrics;
    private Time time;
    private List<String> blockedListenerNames;
    private String topicName;
    private long topicLoadTimeoutMs;
    private KafkaBasedLog<String, byte[]> logicalClusterMetadataLog;
    private ToggleBlockingMultiTenantMetadataPublisher metadataPublisher;
    List<Endpoint> endpoints;
    private KafkaLogicalClusterMetadataLoader metadataLoader;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/KafkaLogicalClusterMetadataLoaderTest$ToggleBlockingMultiTenantMetadataPublisher.class */
    public static class ToggleBlockingMultiTenantMetadataPublisher implements MultiTenantMetadataPublisher {
        private final Semaphore sem = new Semaphore(0);
        private final Map<String, LogicalClusterMetadata> logicalClusterMetadataMap = new HashMap();
        private int metadataUpdateCount = 0;

        private ToggleBlockingMultiTenantMetadataPublisher() {
        }

        public String name() {
            return getClass().getSimpleName();
        }

        public void onMetadataUpdate(Map<String, ? extends LogicalClusterMetadata> map) throws Exception {
            this.sem.acquire();
            try {
                this.metadataUpdateCount++;
                this.logicalClusterMetadataMap.putAll(map);
            } finally {
                this.sem.release();
            }
        }

        public void unblock() {
            this.sem.release();
        }

        public int metadataUpdateCount() throws InterruptedException {
            this.sem.acquire();
            try {
                return this.metadataUpdateCount;
            } finally {
                this.sem.release();
            }
        }

        public LogicalClusterMetadata logicalClusterMetadata(String str) throws InterruptedException {
            this.sem.acquire();
            try {
                return this.logicalClusterMetadataMap.get(str);
            } finally {
                this.sem.release();
            }
        }

        public int numLogicalClusters() throws InterruptedException {
            this.sem.acquire();
            try {
                return this.logicalClusterMetadataMap.size();
            } finally {
                this.sem.release();
            }
        }
    }

    @BeforeEach
    public void setUp() {
        this.metrics = new Metrics();
        this.time = new MockTime();
        this.blockedListenerNames = Arrays.asList(EXTERNAL_LISTENER_NAME_1, EXTERNAL_LISTENER_NAME_2);
        this.topicName = LKC_METADATA_TOPIC;
        this.topicLoadTimeoutMs = 600000L;
        this.logicalClusterMetadataLog = (KafkaBasedLog) Mockito.mock(KafkaBasedLog.class);
        this.metadataPublisher = new ToggleBlockingMultiTenantMetadataPublisher();
        this.endpoints = Arrays.asList(KafkaLogicalClusterMetadataTestUtils.createEndpoint("INTERNAL"), KafkaLogicalClusterMetadataTestUtils.createEndpoint(EXTERNAL_LISTENER_NAME_1));
        this.metadataLoader = new KafkaLogicalClusterMetadataLoader(this.metrics, this.time, this.blockedListenerNames, this.topicName, this.topicLoadTimeoutMs, this.logicalClusterMetadataLog);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.metrics.close();
        this.metadataLoader.close();
    }

    @Test
    public void testConstructor() {
        this.metadataLoader = new KafkaLogicalClusterMetadataLoader(Collections.emptyMap(), this.metrics, this.time);
    }

    @Test
    public void testOnlyBlockedEndpointsAreBlockedOnLogStartup() throws Exception {
        Map start = this.metadataLoader.start(this.endpoints, Collections.singletonList(this.metadataPublisher));
        Map map = (Map) start.entrySet().stream().collect(Collectors.toMap(entry -> {
            return (String) ((Endpoint) entry.getKey()).listenerName().orElse("shouldn't happen");
        }, entry2 -> {
            return Boolean.valueOf(!((CompletableFuture) entry2.getValue()).isDone());
        }));
        Assertions.assertFalse(map.containsKey("shouldn't happen"));
        Assertions.assertTrue(((Boolean) map.get(EXTERNAL_LISTENER_NAME_1)).booleanValue());
        Assertions.assertFalse(((Boolean) map.get("INTERNAL")).booleanValue());
        this.metadataPublisher.unblock();
        ((CompletableFuture) start.get(KafkaLogicalClusterMetadataTestUtils.createEndpoint(EXTERNAL_LISTENER_NAME_1))).get(10L, TimeUnit.SECONDS);
    }

    @Test
    public void testOnlyDoMetadataUpdateOncePerPublisherOnStartup() throws Exception {
        List<ConsumerRecord<String, byte[]>> generateRecords = KafkaLogicalClusterMetadataTestUtils.generateRecords(1000);
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            generateRecords.forEach(consumerRecord -> {
                this.metadataLoader.read(consumerRecord);
            });
            return null;
        }).when(this.logicalClusterMetadataLog)).start();
        Map start = this.metadataLoader.start(this.endpoints, Collections.singletonList(this.metadataPublisher));
        this.metadataPublisher.unblock();
        ((CompletableFuture) start.get(KafkaLogicalClusterMetadataTestUtils.createEndpoint(EXTERNAL_LISTENER_NAME_1))).get(10L, TimeUnit.SECONDS);
        Assertions.assertEquals(1, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals(1000, this.metadataPublisher.numLogicalClusters());
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_ABC));
        Assertions.assertEquals(2, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_ABC, this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_XYZ));
        Assertions.assertEquals(3, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_XYZ, this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
    }

    @Test
    public void testOnlyDoMetadataUpdateForRecordsWithLargerSequenceId() throws Exception {
        startKafkaLogicalClusterMetadataLoader();
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_XYZ));
        Assertions.assertEquals(2, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_XYZ, this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_XYZ_DELETED));
        Assertions.assertEquals(2, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_XYZ, this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
        Assertions.assertEquals(3, this.metadataPublisher.metadataUpdateCount());
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
        Assertions.assertEquals(3, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull(this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_ABC));
        Assertions.assertEquals(4, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_ABC, this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
    }

    @Test
    public void testDoMetadataUpdateWithKafkaAndByoc() throws Exception {
        startKafkaLogicalClusterMetadataLoader();
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_XYZ));
        Assertions.assertEquals(2, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_XYZ, this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT));
        Assertions.assertEquals(3, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT, this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT));
        Assertions.assertEquals(4, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT, this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT.logicalClusterId()));
    }

    @Test
    public void testDelete() throws Exception {
        startKafkaLogicalClusterMetadataLoader();
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        Assertions.assertEquals(1, this.metadataPublisher.metadataUpdateCount());
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_ABC));
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_ABC, this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_ABC, this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        Assertions.assertNull(this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
    }

    @Test
    public void testDeleteByoc() throws Exception {
        startKafkaLogicalClusterMetadataLoader();
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT));
        Assertions.assertEquals(2, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT, this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createEmptyConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT.logicalClusterId()));
        Assertions.assertEquals(3, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull(this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_INTERNAL_BYOC_TENANT.logicalClusterId()));
    }

    @Test
    public void testIgnoreBadMessages() throws Exception {
        startKafkaLogicalClusterMetadataLoader();
        this.metadataLoader.read(new ConsumerRecord(LKC_METADATA_TOPIC, 0, 0L, (Object) null, KafkaLogicalClusterUtils.protoFromMetadata(KafkaLogicalClusterUtils.LC_META_ABC).toByteArray()));
        Assertions.assertEquals(1, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull(this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        this.metadataLoader.read(new ConsumerRecord(LKC_METADATA_TOPIC, 0, 0L, KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), KafkaLogicalClusterUtils.protoFromMetadata(KafkaLogicalClusterUtils.LC_META_ABC).toByteArray()));
        Assertions.assertEquals(1, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull(this.metadataPublisher.logicalClusterMetadata(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, "lkc-other", KafkaLogicalClusterUtils.LC_META_ABC));
        Assertions.assertEquals(1, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull(this.metadataPublisher.logicalClusterMetadata("lkc-other"));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, "lkc-other", KafkaLogicalClusterUtils.LC_META_ABC, false));
        Assertions.assertEquals(1, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull(this.metadataPublisher.logicalClusterMetadata("lkc-other"));
    }

    @Test
    public void testIgnoreNonKafkaClusters() throws Exception {
        startKafkaLogicalClusterMetadataLoader();
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_NOT_KAFKA));
        Assertions.assertEquals(1, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertNull(this.metadataPublisher.logicalClusterMetadata("not-kafka"));
    }

    @Test
    public void testSendUpdates() throws Exception {
        List asList = Arrays.asList(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(1L, KafkaLogicalClusterUtils.LC_META_XYZ), KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(2L, KafkaLogicalClusterUtils.LC_META_XYZ_DELETED), KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(3L, KafkaLogicalClusterUtils.LC_META_TENANT2));
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            asList.forEach(consumerRecord -> {
                this.metadataLoader.read(consumerRecord);
            });
            return null;
        }).when(this.logicalClusterMetadataLog)).start();
        startKafkaLogicalClusterMetadataLoader();
        Assertions.assertEquals(2, this.metadataPublisher.numLogicalClusters());
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(4L, KafkaLogicalClusterUtils.LC_META_TENANT2_DELETED));
        Assertions.assertEquals(2, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals(2, this.metadataPublisher.numLogicalClusters());
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(5L, KafkaLogicalClusterUtils.LC_META_TENANT2));
        this.metadataLoader.read(KafkaLogicalClusterMetadataTestUtils.createConsumerRecord(6L, KafkaLogicalClusterUtils.LC_META_TENANT2_DELETED));
        Assertions.assertEquals(2, this.metadataPublisher.metadataUpdateCount());
        Assertions.assertEquals(2, this.metadataPublisher.numLogicalClusters());
    }

    private void startKafkaLogicalClusterMetadataLoader() throws Exception {
        Map start = this.metadataLoader.start(this.endpoints, Collections.singletonList(this.metadataPublisher));
        this.metadataPublisher.unblock();
        ((CompletableFuture) start.get(KafkaLogicalClusterMetadataTestUtils.createEndpoint(EXTERNAL_LISTENER_NAME_1))).get(10L, TimeUnit.SECONDS);
        Assertions.assertEquals(1, this.metadataPublisher.metadataUpdateCount());
    }
}
