package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.BasePhysicalClusterMetadata;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.assignor.TenantPartitionAssignorBuilder;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.server.plugins.policy.CreateTopicPolicy;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.security.authorizer.AclAuthorizer;
import kafka.server.KafkaConfig;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Integration tests for {@link TopicBasedPhysicalClusterMetadata}.
 *
 * <p>Each test starts a {@link PhysicalCluster} through {@link IntegrationTestHarness}, produces
 * logical-cluster metadata records to the {@value #LKC_METADATA_TOPIC} topic and then checks that
 * every metadata instance exposed by {@link PhysicalCluster#clusterMetadataInstances()} reflects
 * the produced record.
 */
@Tag("integration")
public class TopicBasedPhysicalClusterMetadataIntegrationTest {
    /** Topic the logical-cluster metadata records are produced to and loaded from. */
    private static final String LKC_METADATA_TOPIC = "_confluent-logical_clusters";

    /** Second constructor argument handed to {@link IntegrationTestHarness}. */
    private static final int BROKER_COUNT = 3;

    /**
     * Base timeout in milliseconds. It is passed to {@code startWithTopic} and, extended by three
     * seconds, used as {@code confluent.cdc.api.keys.topic.load.timeout.ms}.
     * NOTE(review): both uses carried the same inlined value 15000; presumably they share one
     * source constant - confirm against IntegrationTestHarness#startWithTopic.
     */
    private static final long TOPIC_LOAD_TIMEOUT_MS = 15000L;

    /** Listener definition used for both {@code listeners} and {@code advertised.listeners}. */
    private static final String LISTENERS = "INTERNAL://localhost:0, EXTERNAL://localhost:0";

    private IntegrationTestHarness testHarness;
    private PhysicalCluster physicalCluster;

    /**
     * Starts the test cluster with the metadata topic already created.
     *
     * @param testInfo JUnit test information forwarded to the harness
     * @throws Exception if the harness fails to start
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        // INTERNAL is the plaintext inter-broker listener; EXTERNAL is the SASL listener that is
        // registered as the multi-tenant listener.
        Map<String, Object> listenerOverrides = toConfigMap(
            new Object[]{KafkaConfig.ListenersProp(), LISTENERS},
            new Object[]{"advertised.listeners", LISTENERS},
            new Object[]{KafkaConfig.ListenerSecurityProtocolMapProp(), "INTERNAL:PLAINTEXT, EXTERNAL:SASL_PLAINTEXT"},
            new Object[]{"inter.broker.listener.name", "INTERNAL"},
            new Object[]{"confluent.multitenant.listener.names", "EXTERNAL"});

        // First property set: listener overrides plus the common node properties.
        Properties propsWithListeners = new Properties();
        propsWithListeners.putAll(listenerOverrides);
        propsWithListeners.putAll(nodeProps());

        // Second property set: common node properties only.
        // NOTE(review): the roles of the two Properties arguments are defined by
        // IntegrationTestHarness#startWithTopic - confirm which nodes receive which set.
        Properties commonProps = new Properties();
        commonProps.putAll(nodeProps());

        this.testHarness = new IntegrationTestHarness(testInfo, BROKER_COUNT);
        this.physicalCluster = this.testHarness.startWithTopic(
            LKC_METADATA_TOPIC, 1, 1, TOPIC_LOAD_TIMEOUT_MS, propsWithListeners, commonProps);
    }

    /**
     * Shuts the harness down after each test.
     *
     * @throws Exception if shutdown fails
     */
    @AfterEach
    public void tearDown() throws Exception {
        // JUnit still runs @AfterEach when setUp threw; without this guard a failure in the
        // harness constructor would additionally raise a NullPointerException here.
        if (this.testHarness != null) {
            this.testHarness.shutdown();
        }
    }

    /**
     * Builds the configuration shared by both property sets passed to the harness: multi-tenant
     * authorizer, tenant partition assignor, topic/alter-config policies, the metadata topic name
     * and the tenant deletion timings.
     *
     * @return a new map from configuration key to value
     */
    public Map<String, Object> nodeProps() {
        return toConfigMap(
            new Object[]{KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName()},
            new Object[]{"confluent.topic.replica.assignor.builder.class", TenantPartitionAssignorBuilder.class.getName()},
            new Object[]{KafkaConfig.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName()},
            new Object[]{AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp(), "true"},
            new Object[]{KafkaConfig.CreateTopicPolicyClassNameProp(), CreateTopicPolicy.class.getName()},
            new Object[]{"confluent.plugins.topic.policy.replication.factor", "1"},
            new Object[]{KafkaConfig.AutoCreateTopicsEnableProp(), "false"},
            new Object[]{"confluent.cdc.lkc.metadata.topic", LKC_METADATA_TOPIC},
            new Object[]{"confluent.cdc.api.keys.topic.load.timeout.ms", Long.valueOf(TOPIC_LOAD_TIMEOUT_MS + TimeUnit.SECONDS.toMillis(3L))},
            new Object[]{"multitenant.metadata.class", TopicBasedPhysicalClusterMetadata.class.getName()},
            new Object[]{"multitenant.tenant.delete.delay", Long.valueOf(TimeUnit.SECONDS.toMillis(1L))},
            new Object[]{"multitenant.tenant.delete.check.ms", Long.valueOf(TimeUnit.SECONDS.toMillis(1L))});
    }

    /**
     * Verifies that tenants produced to the metadata topic show up, one after another, in the
     * logical cluster id set of every metadata instance.
     *
     * @param quorum value supplied by {@code @ValueSource}; not read by the test body
     *               (presumably picked up by the harness via {@link TestInfo} - confirm)
     * @throws Exception if waiting for a condition fails or is interrupted
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAddTenant(String quorum) throws Exception {
        HashSet<String> expectedTenants = new HashSet<>();
        List<BasePhysicalClusterMetadata> instances = this.physicalCluster.clusterMetadataInstances();
        Assertions.assertTrue(instances.stream()
            .allMatch(instance -> instance.kafkaLogicalClusterIds().equals(expectedTenants)));

        String abcId = Utils.LC_META_ABC.logicalClusterId();
        this.physicalCluster.kafkaCluster().produceLCMData(LKC_METADATA_TOPIC, 1000 + 1, abcId, Utils.LC_META_ABC);
        awaitMetadataConsumed(instances, abcId);
        expectedTenants.add(abcId);
        Assertions.assertTrue(
            instances.stream().allMatch(instance -> instance.kafkaLogicalClusterIds().equals(expectedTenants)),
            "Single Kafka tenant should be recorded");

        String xyzId = Utils.LC_META_XYZ.logicalClusterId();
        this.physicalCluster.kafkaCluster().produceLCMData(LKC_METADATA_TOPIC, 1000 + 1, xyzId, Utils.LC_META_XYZ);
        awaitMetadataConsumed(instances, xyzId);
        expectedTenants.add(xyzId);
        Assertions.assertTrue(
            instances.stream().allMatch(instance -> instance.kafkaLogicalClusterIds().equals(expectedTenants)),
            "Multiple Kafka tenants should be recorded");
    }

    /**
     * Verifies that producing a {@code null} metadata value for an existing tenant removes that
     * tenant's metadata from every metadata instance.
     *
     * @param quorum value supplied by {@code @ValueSource}; not read by the test body
     *               (presumably picked up by the harness via {@link TestInfo} - confirm)
     * @throws Exception if waiting for a condition fails or is interrupted
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDelTenant(String quorum) throws Exception {
        List<BasePhysicalClusterMetadata> instances = this.physicalCluster.clusterMetadataInstances();
        Assertions.assertTrue(instances.stream()
            .allMatch(instance -> instance.kafkaLogicalClusterIds().isEmpty()));

        String abcId = Utils.LC_META_ABC.logicalClusterId();
        this.physicalCluster.kafkaCluster().produceLCMData(LKC_METADATA_TOPIC, 2000 + 1, abcId, Utils.LC_META_ABC);
        awaitMetadataConsumed(instances, abcId);

        // A record with a null value for the same id is what triggers the removal.
        this.physicalCluster.kafkaCluster().produceLCMData(LKC_METADATA_TOPIC, 2000 + 2, abcId, null);
        awaitMetadataAbsent(instances, abcId, "lkc should be deleted");
    }

    /**
     * Verifies that a deactivated tenant is never returned by {@code metadata(...)} and ends up in
     * the lifecycle manager's set of deleted clusters on every metadata instance.
     *
     * @param quorum value supplied by {@code @ValueSource}; not read by the test body
     *               (presumably picked up by the harness via {@link TestInfo} - confirm)
     * @throws Exception if waiting for a condition fails or is interrupted
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDeactivateTenant(String quorum) throws Exception {
        List<BasePhysicalClusterMetadata> instances = this.physicalCluster.clusterMetadataInstances();
        Assertions.assertTrue(instances.stream()
            .allMatch(instance -> instance.kafkaLogicalClusterIds().isEmpty()));

        String deactivatedId = Utils.LC_META_DED.logicalClusterId();
        this.physicalCluster.kafkaCluster().produceLCMData(LKC_METADATA_TOPIC, 3000 + 1, deactivatedId, Utils.LC_META_DED);
        awaitMetadataAbsent(instances, deactivatedId, "Shouldn't return deactivated tenants");
        TestUtils.waitForCondition(
            () -> instances.stream().allMatch(instance ->
                ((TopicBasedPhysicalClusterMetadata) instance).tenantLifecycleManager.deletedClusters().contains(deactivatedId)),
            "Expect to have the lkc actually marked for deletion");
    }

    /** Collects {key, value} pairs into a map; keys are converted with {@code toString()}. */
    private static Map<String, Object> toConfigMap(Object[]... entries) {
        return Stream.of(entries)
            .collect(Collectors.toMap(entry -> entry[0].toString(), entry -> entry[1]));
    }

    /** Waits until every instance returns non-null metadata for the given logical cluster id. */
    private static void awaitMetadataConsumed(List<BasePhysicalClusterMetadata> instances,
                                              String logicalClusterId) throws Exception {
        TestUtils.waitForCondition(
            () -> instances.stream().allMatch(instance -> instance.metadata(logicalClusterId) != null),
            "Expected metadata to get consumed");
    }

    /** Waits until every instance returns null metadata for the given logical cluster id. */
    private static void awaitMetadataAbsent(List<BasePhysicalClusterMetadata> instances,
                                            String logicalClusterId,
                                            String failureMessage) throws Exception {
        TestUtils.waitForCondition(
            () -> instances.stream().allMatch(instance -> instance.metadata(logicalClusterId) == null),
            failureMessage);
    }
}
