package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.server.plugins.policy.CreateTopicPolicy;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.security.authorizer.AclAuthorizer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

/**
 * Integration tests that start a {@link PhysicalCluster} through {@link IntegrationTestHarness}
 * and exercise it via two logical clusters ("tenantA" and "tenantB").
 *
 * <p>There is no {@code @Before} method: each test calls one of the {@code setUp} overloads itself
 * because the tests need different harness arguments.
 */
@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantKafkaIntegrationTest.class */
public class MultiTenantKafkaIntegrationTest {
    /** First harness argument used by the no-argument {@code setUp()}. */
    private static final int BROKER_COUNT = 2;
    /** Harness created by {@code setUp(...)} and shut down in {@code tearDown()}. */
    private IntegrationTestHarness testHarness;
    /** Logical cluster created as "tenantA". */
    private LogicalCluster logicalCluster1;
    /** Logical cluster created as "tenantB". */
    private LogicalCluster logicalCluster2;
    /** Cluster returned by {@code testHarness.start(...)}. */
    private PhysicalCluster physicalCluster;
    /** Listener whose prefixed configs are inspected by the alter-config tests. */
    private final ListenerName externalListenerName = new ListenerName("external");
    /** Config-key prefix derived from {@link #externalListenerName}. */
    private final String externalListenerPrefix = this.externalListenerName.configPrefix();

    /** Temporary directory; its root is passed to the brokers as "multitenant.metadata.dir". */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /** Creates the harness with {@link #BROKER_COUNT} and two empty lists. */
    public void setUp() {
        setUp(BROKER_COUNT, Collections.emptyList(), Collections.emptyList());
    }

    /**
     * Creates the harness with the given count and first list, and an empty second list.
     *
     * @param i    first harness argument (presumably the broker count; confirm against the harness)
     * @param list second harness argument (callers pass per-broker rack names)
     */
    public void setUp(int i, List<String> list) {
        final List<String> emptySecondList = Collections.emptyList();
        setUp(i, list, emptySecondList);
    }

    /**
     * Creates the harness and stores it for the test and for {@code tearDown()}.
     *
     * @param i     first harness argument (presumably the broker count; confirm against the harness)
     * @param list  second harness argument (callers pass per-broker rack names)
     * @param list2 third harness argument (callers pass per-broker cell names)
     */
    public void setUp(int i, List<String> list, List<String> list2) {
        IntegrationTestHarness harness = new IntegrationTestHarness(i, list, list2);
        this.testHarness = harness;
    }

    /** Starts the cluster with the default {@code brokerProps()} and creates both tenants. */
    private void createPhysicalAndLogicalClusters() {
        Properties defaults = brokerProps();
        createPhysicalAndLogicalClusters(defaults);
    }

    /**
     * Starts the physical cluster with the given broker properties, then creates the
     * "tenantA" and "tenantB" logical clusters on it.
     *
     * @param properties broker properties handed to the harness
     */
    private void createPhysicalAndLogicalClusters(Properties properties) {
        PhysicalCluster started = this.testHarness.start(properties);
        this.physicalCluster = started;
        // Tests later look up users 11/12 on tenantA and 21/22 on tenantB.
        this.logicalCluster1 = started.createLogicalCluster("tenantA", 100, 9, 11, 12);
        this.logicalCluster2 = started.createLogicalCluster("tenantB", 200, 9, 21, 22);
    }

    /**
     * Shuts the harness down after each test.
     *
     * <p>The harness is created inside the test method, not in a {@code @Before} method, so it
     * may still be {@code null} here if the test failed before or during {@code setUp(...)}.
     * Skipping the shutdown in that case avoids adding a NullPointerException to the real failure.
     */
    @After
    public void tearDown() {
        if (this.testHarness != null) {
            this.testHarness.shutdown();
        }
    }

    /**
     * Builds the baseline broker properties shared by all tests: multi-tenant authorizer,
     * metadata class and directory, topic/alter-config policies, and auto topic creation off.
     *
     * @return a fresh, mutable {@link Properties} instance; callers may override entries
     */
    private Properties brokerProps() {
        Properties props = new Properties();

        // Authorization
        props.setProperty(KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        props.setProperty("confluent.max.acls.per.tenant", "100");
        props.setProperty(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp(), "true");

        // Tenant metadata, stored under the per-test temporary folder
        props.setProperty("multitenant.metadata.class", "io.confluent.kafka.multitenant.PhysicalClusterMetadata");
        props.setProperty("multitenant.metadata.dir", this.tempFolder.getRoot().getAbsolutePath());

        // Policies
        props.setProperty(KafkaConfig.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName());
        props.setProperty(KafkaConfig.CreateTopicPolicyClassNameProp(), CreateTopicPolicy.class.getName());
        props.setProperty("confluent.plugins.topic.policy.replication.factor", "1");

        props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
        return props;
    }

    /**
     * Verifies that every broker has a distinct session UUID and that a
     * {@link PhysicalClusterMetadata} instance is registered for each of them.
     */
    @Test
    public void testMultiTenantMetadataInstances() {
        setUp();
        createPhysicalAndLogicalClusters();
        // Typed list: a raw List would make the loop variable an Object and break getInstance(...).
        List<String> sessionUuids = this.physicalCluster.kafkaCluster().brokers().stream()
            .map(broker -> {
                Object uuid = broker.config().values().get(KafkaConfig.BrokerSessionUuidProp());
                // A missing UUID maps to "" so that it collapses under distinct() and fails the count.
                return uuid == null ? "" : uuid.toString();
            })
            .distinct()
            .collect(Collectors.toList());
        // setUp() starts BROKER_COUNT brokers, so that many distinct UUIDs are expected.
        Assert.assertEquals("Expect each broker to have unique session UUID.", BROKER_COUNT, sessionUuids.size());
        for (String sessionUuid : sessionUuids) {
            Assert.assertNotNull("Expect valid instance of PhysicalClusterMetadata for broker session UUID " + sessionUuid,
                PhysicalClusterMetadata.getInstance(sessionUuid));
        }
    }

    /**
     * Runs the harness produce/consume round trip once per tenant, using the same topic and
     * group names for both tenants.
     */
    @Test
    public void testProduceConsume() throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        final String topic = "testtopic";
        final String group = "group1";
        // tenantA: user 11 produces, user 12 consumes.
        this.testHarness.produceConsume(this.logicalCluster1.user(11), this.logicalCluster1.user(12), topic, group, 0);
        // tenantB: user 21 produces, user 22 consumes.
        this.testHarness.produceConsume(this.logicalCluster2.user(21), this.logicalCluster2.user(22), topic, group, 1000);
    }

    /**
     * Verifies that creating a topic named "." through the tenant admin client fails with a
     * {@link PolicyViolationException} carrying the "Invalid topic name specified." message.
     */
    @Test
    public void testInvalidTopicCreation() {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient tenantAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        CreateTopicsResult result = tenantAdmin.createTopics(Collections.singletonList(new NewTopic(".", 2, (short) 1)));
        // The failure only surfaces when the future is awaited, so the lambda must block on it.
        Throwable cause = Assert.assertThrows(ExecutionException.class, () -> result.all().get()).getCause();
        Assert.assertEquals(PolicyViolationException.class, cause.getClass());
        Assert.assertEquals("Invalid topic name specified.", cause.getMessage());
    }

    /** Runs the rack-distribution check with an empty rack list. */
    @Test
    public void testTopicCreationWithNoRacks() throws Exception {
        List<String> racks = Collections.emptyList();
        validateRackAwareAssignment(racks);
    }

    /** Runs the rack-distribution check with three racks, two entries each. */
    @Test
    public void testRackAwareTopicCreation() throws Exception {
        List<String> racks = Arrays.asList("0", "0", "2", "1", "1", "2");
        validateRackAwareAssignment(racks);
    }

    /** Runs the rack-distribution check with all six entries naming the same rack. */
    @Test
    public void testRackAwareTopicCreationWithOneRack() throws Exception {
        List<String> racks = Arrays.asList("0", "0", "0", "0", "0", "0");
        validateRackAwareAssignment(racks);
    }

    /** Runs the rack-distribution check with racks of unequal size (three, two and one entries). */
    @Test
    public void testRackAwareTopicCreationUnbalancedRacks() throws Exception {
        List<String> racks = Arrays.asList("0", "0", "2", "1", "1", "0");
        validateRackAwareAssignment(racks);
    }

    /**
     * Starts a six-broker harness with the given rack list, creates "test_topic" with six
     * partitions and replication factor 3 through the tenantA admin client, and checks the rack
     * spread of every partition with {@code assertRackDistribution}.
     *
     * @param list rack list handed to the harness (may be empty)
     * @throws InterruptedException if waiting on an admin result is interrupted
     * @throws ExecutionException   if an admin request fails
     */
    private void validateRackAwareAssignment(List<String> list) throws InterruptedException, ExecutionException {
        setUp(6, list);
        Properties brokerProps = brokerProps();
        brokerProps.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        // Properties values should be Strings; testRackUnAwareTopicCreation sets this same key as "3".
        brokerProps.put("confluent.plugins.topic.policy.replication.factor", "3");
        createPhysicalAndLogicalClusters(brokerProps);

        AdminClient tenantAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        tenantAdmin.createTopics(Collections.singletonList(new NewTopic("test_topic", 6, (short) 3))).all().get();

        Map<String, TopicDescription> descriptions = tenantAdmin.describeTopics(Collections.singleton("test_topic")).all().get();
        Assert.assertTrue(descriptions.toString(), descriptions.containsKey("test_topic"));
        for (TopicPartitionInfo partition : descriptions.get("test_topic").partitions()) {
            assertRackDistribution((short) 3, partition);
        }
    }

    /** Runs the cell-distribution check with no racks and two cells of three entries each. */
    @Test
    public void testCellAwareTopicCreationWithoutRack() throws Exception {
        List<String> racks = Collections.emptyList();
        List<String> cells = Arrays.asList("0", "0", "0", "1", "1", "1");
        validateCellAwareAssignment(racks, cells);
    }

    /** Runs the cell-distribution check with a single rack and two cells of three entries each. */
    @Test
    public void testCellAwareTopicCreationWithOneRack() throws Exception {
        List<String> racks = Arrays.asList("0", "0", "0", "0", "0", "0");
        List<String> cells = Arrays.asList("0", "0", "0", "1", "1", "1");
        validateCellAwareAssignment(racks, cells);
    }

    /** Runs the cell-distribution check with three racks, where each cell lists every rack once. */
    @Test
    public void testCellAwareTopicCreationWithThreeRack() throws Exception {
        List<String> racks = Arrays.asList("0", "1", "2", "0", "1", "2");
        List<String> cells = Arrays.asList("0", "0", "0", "1", "1", "1");
        validateCellAwareAssignment(racks, cells);
    }

    /**
     * Runs the cell-distribution check with the rack list and the cell list each shuffled
     * independently, so the rack/cell pairing varies from run to run.
     */
    @Test
    public void testCellAwareTopicCreationWithThreeRackRandomlyDistributed() throws Exception {
        List<String> racks = shuffle(Arrays.asList("0", "1", "2", "0", "1", "2"));
        List<String> cells = shuffle(Arrays.asList("0", "0", "0", "1", "1", "1"));
        validateCellAwareAssignment(racks, cells);
    }

    /**
     * Shuffles the given list in place and returns the same instance so the call can be nested.
     *
     * @param list list to shuffle; it is modified, not copied
     * @return the same {@code list} reference, now in random order
     */
    private <T> List<T> shuffle(List<T> list) {
        Collections.shuffle(list);
        return list;
    }

    /**
     * Starts a six-broker harness with the given rack and cell lists, creates "test_topic" with
     * six partitions and replication factor 3 through the tenantA admin client, and checks every
     * partition with both {@code assertCellDistribution} and {@code assertRackDistribution}.
     *
     * @param list  rack list handed to the harness (may be empty)
     * @param list2 cell list handed to the harness; also indexed by broker id when checking cells
     * @throws InterruptedException if waiting on an admin result is interrupted
     * @throws ExecutionException   if an admin request fails
     */
    private void validateCellAwareAssignment(List<String> list, List<String> list2) throws InterruptedException, ExecutionException {
        setUp(6, list, list2);
        Properties brokerProps = brokerProps();
        brokerProps.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        // Properties values should be Strings; testRackUnAwareTopicCreation sets this same key as "3".
        brokerProps.put("confluent.plugins.topic.policy.replication.factor", "3");
        createPhysicalAndLogicalClusters(brokerProps);

        AdminClient tenantAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        tenantAdmin.createTopics(Collections.singletonList(new NewTopic("test_topic", 6, (short) 3))).all().get();

        Map<String, TopicDescription> descriptions = tenantAdmin.describeTopics(Collections.singleton("test_topic")).all().get();
        Assert.assertTrue(descriptions.toString(), descriptions.containsKey("test_topic"));
        for (TopicPartitionInfo partition : descriptions.get("test_topic").partitions()) {
            assertCellDistribution(list2, partition);
            assertRackDistribution((short) 3, partition);
        }
    }

    /**
     * Asserts that all replicas of the partition map to exactly one distinct cell.
     *
     * @param list               cell name per broker, indexed by broker id
     * @param topicPartitionInfo partition whose replicas are checked
     */
    private void assertCellDistribution(List<String> list, TopicPartitionInfo topicPartitionInfo) {
        Set<String> cellsUsed = new HashSet<>();
        for (Node replica : topicPartitionInfo.replicas()) {
            cellsUsed.add(list.get(replica.id()));
        }
        Assert.assertEquals(1L, cellsUsed.size());
    }

    /**
     * Asserts that the replicas of a partition are spread as evenly as possible across the racks
     * they occupy: the least-used rack holds {@code s / racks} replicas and the most-used rack
     * holds at most one more.
     *
     * @param s                  expected replication factor of the partition
     * @param topicPartitionInfo partition whose replicas are checked
     */
    private void assertRackDistribution(short s, TopicPartitionInfo topicPartitionInfo) {
        // Typed map: with a raw HashMap the counting lambda receives Objects and cannot do arithmetic.
        // A null rack (broker without a rack) is a valid HashMap key and is counted like any other.
        Map<String, Integer> replicasPerRack = new HashMap<>();
        for (Node replica : topicPartitionInfo.replicas()) {
            replicasPerRack.merge(replica.rack(), 1, Integer::sum);
        }
        // Fail clearly here instead of with a division by zero or an empty-collection error below.
        Assert.assertFalse("Partition has no replicas: " + topicPartitionInfo, replicasPerRack.isEmpty());

        int most = Collections.max(replicasPerRack.values());
        int least = Collections.min(replicasPerRack.values());
        int rackCount = replicasPerRack.size();
        Assert.assertEquals(replicasPerRack.toString(), s / rackCount, least);
        Assert.assertEquals(replicasPerRack.toString(), s % rackCount == 0 ? least : least + 1, most);
    }

    /**
     * With six brokers and no racks, creates a six-partition topic with replication factor 3 and
     * verifies that every broker hosting a replica hosts exactly three of them.
     */
    @Test
    public void testRackUnAwareTopicCreation() throws Exception {
        setUp(6, Collections.emptyList());
        Properties brokerProps = brokerProps();
        brokerProps.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        brokerProps.put("confluent.plugins.topic.policy.replication.factor", "3");
        createPhysicalAndLogicalClusters(brokerProps);

        AdminClient tenantAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        tenantAdmin.createTopics(Collections.singletonList(new NewTopic("test_topic", 6, (short) 3))).all().get();

        Map<String, TopicDescription> descriptions = tenantAdmin.describeTopics(Collections.singleton("test_topic")).all().get();
        Assert.assertTrue(descriptions.toString(), descriptions.containsKey("test_topic"));
        TopicDescription topicDescription = descriptions.get("test_topic");

        // Typed map: iterating a raw entrySet() as Map.Entry does not compile.
        Map<Integer, Integer> replicasPerBroker = new HashMap<>(6);
        for (TopicPartitionInfo partition : topicDescription.partitions()) {
            for (Node replica : partition.replicas()) {
                replicasPerBroker.merge(replica.id(), 1, Integer::sum);
            }
        }
        for (Map.Entry<Integer, Integer> entry : replicasPerBroker.entrySet()) {
            int replicaCount = entry.getValue();
            Assert.assertEquals("Broker " + entry.getKey() + " has " + replicaCount + " replicas. Topic description: " + topicDescription, 3L, replicaCount);
        }
    }

    /**
     * With auto topic creation enabled and a cluster-level CREATE ACL granted to user 11,
     * verifies that producing to the invalid topic name "." fails with an
     * {@link AuthorizationException}.
     */
    @Test
    public void testInvalidTopicCreationWithAutoTopicCreation() throws Throwable {
        setUp();
        Properties brokerProps = brokerProps();
        brokerProps.put(KafkaConfig.AutoCreateTopicsEnableProp(), "true");
        createPhysicalAndLogicalClusters(brokerProps);

        Set<AclBinding> acls = new HashSet<>();
        acls.add(new AclBinding(
            new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL),
            new AccessControlEntry(this.logicalCluster1.user(11).unprefixedKafkaPrincipal().toString(), "*", AclOperation.CREATE, AclPermissionType.ALLOW)));
        AdminClient tenantAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        tenantAdmin.createAcls(acls).all().get();
        Assert.assertEquals(acls, describeAllAcls(tenantAdmin));

        // try-with-resources closes the producer on both the success and the failure path.
        try (KafkaProducer<String, String> producer = this.testHarness.createProducer(this.logicalCluster1.user(11), SecurityProtocol.SASL_PLAINTEXT)) {
            Assert.assertThrows(AuthorizationException.class, () -> {
                KafkaTestUtils.sendRecords(producer, ".", 0, 10);
            });
        }
    }

    /** Runs the shared alter-config checks against broker "0" with the default broker properties. */
    @Test
    public void testAlterBrokerConfigsWhenConfigDisabled() throws Exception {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient tenantAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        AdminClient superAdmin = this.physicalCluster.superAdminClient();
        ConfigResource brokerZero = new ConfigResource(ConfigResource.Type.BROKER, "0");
        expectAlterBrokerConfigsViaExternalListenerRejected(tenantAdmin, superAdmin, brokerZero);
    }

    /**
     * Runs the shared alter-config checks against the BROKER resource with an empty name,
     * using the default broker properties.
     */
    @Test
    public void testAlterClusterConfigsWhenConfigDisabled() throws Exception {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient tenantAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        AdminClient superAdmin = this.physicalCluster.superAdminClient();
        ConfigResource clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
        expectAlterBrokerConfigsViaExternalListenerRejected(tenantAdmin, superAdmin, clusterDefault);
    }

    /**
     * Shared scenario for the alter-config tests.
     *
     * <ol>
     *   <li>Tenant requests ({@code alterConfigs} and {@code incrementalAlterConfigs}) must fail
     *       with {@link PolicyViolationException} and leave the first broker's config untouched.</li>
     *   <li>The same changes sent by the super admin must take effect on the first broker.</li>
     *   <li>Changing the broker id must fail with {@link InvalidRequestException}.</li>
     * </ol>
     *
     * @param adminClient    tenant admin client whose requests are expected to be rejected
     * @param adminClient2   super admin client whose requests are expected to succeed
     * @param configResource BROKER resource to alter
     * @throws Exception if an admin call or a wait fails unexpectedly
     */
    private void expectAlterBrokerConfigsViaExternalListenerRejected(AdminClient adminClient, AdminClient adminClient2, ConfigResource configResource) throws Exception {
        KafkaServer kafkaServer = this.physicalCluster.kafkaCluster().kafkas().get(0).kafkaServer();
        List<String> initialCipherSuites = kafkaServer.config().getList("ssl.cipher.suites");
        int initialMessageMaxBytes = kafkaServer.config().messageMaxBytes().intValue();

        // 1a. Tenant alterConfigs is rejected and changes nothing.
        Map<ConfigResource, Config> tenantAlter = Collections.singletonMap(configResource, new Config(Arrays.asList(
            new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "10000"),
            new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"),
            new ConfigEntry("ssl.cipher.suites", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"))));
        ExecutionException alterFailure = Assert.assertThrows(ExecutionException.class,
            () -> adminClient.alterConfigs(tenantAlter).all().get());
        Assert.assertEquals(PolicyViolationException.class, alterFailure.getCause().getClass());
        Assert.assertEquals(initialMessageMaxBytes, kafkaServer.config().messageMaxBytes().intValue());
        Assert.assertEquals(false, kafkaServer.config().autoCreateTopicsEnable());
        Assert.assertEquals(initialCipherSuites, kafkaServer.config().get("ssl.cipher.suites"));

        // 1b. Tenant incrementalAlterConfigs is rejected and changes nothing.
        Map<ConfigResource, Collection<AlterConfigOp>> tenantIncremental = Collections.singletonMap(configResource, Arrays.asList(
            new AlterConfigOp(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "15000"), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry("ssl.cipher.suites", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"), AlterConfigOp.OpType.SET)));
        ExecutionException incrementalFailure = Assert.assertThrows(ExecutionException.class,
            () -> adminClient.incrementalAlterConfigs(tenantIncremental).all().get());
        Assert.assertEquals(PolicyViolationException.class, incrementalFailure.getCause().getClass());
        Assert.assertEquals(initialMessageMaxBytes, kafkaServer.config().messageMaxBytes().intValue());
        Assert.assertEquals(false, kafkaServer.config().autoCreateTopicsEnable());
        Assert.assertEquals(initialCipherSuites, kafkaServer.config().get("ssl.cipher.suites"));

        // 2a. Super admin alterConfigs takes effect; the cipher suite is set with the listener prefix.
        adminClient2.alterConfigs(Collections.singletonMap(configResource, new Config(Arrays.asList(
            new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "10000"),
            new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"),
            new ConfigEntry(this.externalListenerPrefix + "ssl.cipher.suites", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"))))).all().get();
        TestUtils.waitForCondition(
            () -> kafkaServer.config().messageMaxBytes().intValue() == 10000,
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> kafkaServer.config().autoCreateTopicsEnable().booleanValue(),
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256".equals(sslCipherSuitesFromConfig(kafkaServer.config(), this.externalListenerName)),
            "Dynamic config not updated");

        // 2b. Super admin incrementalAlterConfigs takes effect as well.
        adminClient2.incrementalAlterConfigs(Collections.singletonMap(configResource, Arrays.asList(
            new AlterConfigOp(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "15000"), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry(this.externalListenerPrefix + "ssl.cipher.suites", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.waitForCondition(
            () -> kafkaServer.config().messageMaxBytes().intValue() == 15000,
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> kafkaServer.config().autoCreateTopicsEnable().booleanValue(),
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384".equals(sslCipherSuitesFromConfig(kafkaServer.config(), this.externalListenerName)),
            "Dynamic config not updated");

        // 3. The broker id cannot be altered.
        // NOTE(review): sent via the super admin because a tenant request would be expected to fail
        // with PolicyViolationException (see step 1), not InvalidRequestException - confirm.
        Map<ConfigResource, Config> brokerIdChange = Collections.singletonMap(configResource,
            new Config(Collections.singleton(new ConfigEntry(KafkaConfig.BrokerIdProp(), "20"))));
        ExecutionException brokerIdFailure = Assert.assertThrows(ExecutionException.class,
            () -> adminClient2.alterConfigs(brokerIdChange).all().get());
        Assert.assertEquals(InvalidRequestException.class, brokerIdFailure.getCause().getClass());
        Assert.assertEquals(0L, kafkaServer.config().brokerId());
    }

    /**
     * Runs the shared alter-config checks against broker "0" with
     * "confluent.alter.cluster.configs.enable" set to "true".
     */
    @Test
    public void testAlterBrokerConfigsWhenConfigEnabled() throws Exception {
        setUp();
        Properties brokerProps = brokerProps();
        brokerProps.put("confluent.alter.cluster.configs.enable", "true");
        createPhysicalAndLogicalClusters(brokerProps);
        AdminClient tenantAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        AdminClient superAdmin = this.physicalCluster.superAdminClient();
        ConfigResource brokerZero = new ConfigResource(ConfigResource.Type.BROKER, "0");
        expectAlterBrokerConfigsViaExternalListenerRejected(tenantAdmin, superAdmin, brokerZero);
    }

    /**
     * With "confluent.alter.cluster.configs.enable" set to "true", verifies against the BROKER
     * resource with an empty name that:
     *
     * <ol>
     *   <li>tenant {@code alterConfigs} / {@code incrementalAlterConfigs} requests take effect on
     *       the first broker, with the un-prefixed cipher-suite key showing up under the external
     *       listener prefix;</li>
     *   <li>super admin requests take effect when using the listener-prefixed key;</li>
     *   <li>tenant requests for a retention of 2599999 ms or for "message.max.bytes" fail with
     *       {@link PolicyViolationException} and leave the config unchanged.</li>
     * </ol>
     */
    @Test
    public void testAlterClusterConfigsWhenConfigEnabled() throws Exception {
        setUp();
        Properties brokerProps = brokerProps();
        brokerProps.put("confluent.alter.cluster.configs.enable", "true");
        createPhysicalAndLogicalClusters(brokerProps);
        AdminClient tenantAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        AdminClient superAdmin = this.physicalCluster.superAdminClient();
        ConfigResource clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
        KafkaServer kafkaServer = this.physicalCluster.kafkaCluster().kafkas().get(0).kafkaServer();

        final String firstCipher = "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256";
        final String secondCipher = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384";
        final long firstRetentionMs = 3600000L;
        final long secondRetentionMs = 3600001L;
        final int partitions = 2;

        // 1a. Tenant alterConfigs.
        tenantAdmin.alterConfigs(Collections.singletonMap(clusterDefault, new Config(Arrays.asList(
            new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(firstRetentionMs)),
            new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"),
            new ConfigEntry(KafkaConfig.SslCipherSuitesProp(), firstCipher),
            new ConfigEntry(KafkaConfig.LogCleanerMaxCompactionLagMsProp(), String.valueOf(2147483646L)))))).all().get();
        TestUtils.waitForCondition(
            () -> kafkaServer.config().logRetentionTimeMillis() == firstRetentionMs,
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> kafkaServer.config().autoCreateTopicsEnable().booleanValue(),
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> firstCipher.equals(sslCipherSuitesFromConfig(kafkaServer.config(), this.externalListenerName)),
            "Dynamic config for ssl-cipher-suites not updated.");

        // 1b. Tenant incrementalAlterConfigs.
        tenantAdmin.incrementalAlterConfigs(Collections.singletonMap(clusterDefault, Arrays.asList(
            new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(secondRetentionMs)), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry(KafkaConfig.NumPartitionsProp(), String.valueOf(partitions)), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "false"), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry(KafkaConfig.SslCipherSuitesProp(), secondCipher), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry(KafkaConfig.LogCleanerMaxCompactionLagMsProp(), String.valueOf(2147483647L)), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.waitForCondition(
            () -> kafkaServer.config().logRetentionTimeMillis() == secondRetentionMs,
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> kafkaServer.config().numPartitions().intValue() == partitions,
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> !kafkaServer.config().autoCreateTopicsEnable().booleanValue(),
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> secondCipher.equals(sslCipherSuitesFromConfig(kafkaServer.config(), this.externalListenerName)),
            "Dynamic config for ssl-cipher-suites not updated.");

        // 2a. Super admin alterConfigs (listener-prefixed cipher-suite key).
        superAdmin.alterConfigs(Collections.singletonMap(clusterDefault, new Config(Arrays.asList(
            new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(firstRetentionMs)),
            new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "true"),
            new ConfigEntry(this.externalListenerPrefix + KafkaConfig.SslCipherSuitesProp(), firstCipher),
            new ConfigEntry(KafkaConfig.LogCleanerMaxCompactionLagMsProp(), String.valueOf(2147483646L)))))).all().get();
        TestUtils.waitForCondition(
            () -> kafkaServer.config().logRetentionTimeMillis() == firstRetentionMs,
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> kafkaServer.config().autoCreateTopicsEnable().booleanValue(),
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> firstCipher.equals(sslCipherSuitesFromConfig(kafkaServer.config(), this.externalListenerName)),
            "Dynamic config for ssl-cipher-suites not updated.");

        // 2b. Super admin incrementalAlterConfigs.
        superAdmin.incrementalAlterConfigs(Collections.singletonMap(clusterDefault, Arrays.asList(
            new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(secondRetentionMs)), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry(KafkaConfig.AutoCreateTopicsEnableProp(), "false"), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry(this.externalListenerPrefix + KafkaConfig.SslCipherSuitesProp(), secondCipher), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry(KafkaConfig.LogCleanerMaxCompactionLagMsProp(), String.valueOf(2147483647L)), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.waitForCondition(
            () -> kafkaServer.config().logRetentionTimeMillis() == secondRetentionMs,
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> !kafkaServer.config().autoCreateTopicsEnable().booleanValue(),
            "Dynamic config not updated");
        TestUtils.waitForCondition(
            () -> secondCipher.equals(sslCipherSuitesFromConfig(kafkaServer.config(), this.externalListenerName)),
            "Dynamic config for ssl-cipher-suites not updated.");

        // 3a. Tenant request for a retention of 2599999 ms is rejected; retention stays as set in 2b.
        Map<ConfigResource, Collection<AlterConfigOp>> shortRetention = Collections.singletonMap(clusterDefault,
            Collections.singletonList(new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp(), String.valueOf(2599999L)), AlterConfigOp.OpType.SET)));
        ExecutionException retentionFailure = Assert.assertThrows(ExecutionException.class,
            () -> tenantAdmin.incrementalAlterConfigs(shortRetention).all().get());
        Assert.assertEquals(PolicyViolationException.class, retentionFailure.getCause().getClass());
        Assert.assertEquals(3600001L, kafkaServer.config().logRetentionTimeMillis());

        // 3b. Tenant request to change message.max.bytes is rejected; the value stays unchanged.
        int messageMaxBytesBefore = kafkaServer.config().messageMaxBytes().intValue();
        Map<ConfigResource, Collection<AlterConfigOp>> messageMaxBytesChange = Collections.singletonMap(clusterDefault,
            Collections.singletonList(new AlterConfigOp(new ConfigEntry(KafkaConfig.MessageMaxBytesProp(), "500000"), AlterConfigOp.OpType.SET)));
        ExecutionException messageSizeFailure = Assert.assertThrows(ExecutionException.class,
            () -> tenantAdmin.incrementalAlterConfigs(messageMaxBytesChange).all().get());
        Assert.assertEquals(PolicyViolationException.class, messageSizeFailure.getCause().getClass());
        Assert.assertEquals(messageMaxBytesBefore, kafkaServer.config().messageMaxBytes().intValue());
    }

    /**
     * Reads the listener-prefixed SSL cipher-suites entry from the config's original values.
     *
     * @param kafkaConfig  broker config to read from
     * @param listenerName listener whose prefix is prepended to the cipher-suites key
     * @return the stored value, or {@code null} if the prefixed key is absent
     */
    private String sslCipherSuitesFromConfig(KafkaConfig kafkaConfig, ListenerName listenerName) {
        String prefixedKey = listenerName.configPrefix() + KafkaConfig.SslCipherSuitesProp();
        Object storedValue = kafkaConfig.originals().get(prefixedKey);
        return (String) storedValue;
    }

    /**
     * Verifies that ACLs are kept per tenant: bindings created through the tenantA admin client
     * are not visible to tenantB, and tenantB can create and describe the same bindings itself.
     * Ends with a delete request from tenantB whose result is awaited but not inspected.
     */
    @Test
    public void testAcls() throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient tenantAAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        AdminClient tenantBAdmin = this.testHarness.createAdminClient(this.logicalCluster2.adminUser());
        Assert.assertEquals(Collections.emptySet(), describeAllAcls(tenantAAdmin));

        String user11 = this.logicalCluster1.user(11).unprefixedKafkaPrincipal().toString();
        String user12 = this.logicalCluster1.user(12).unprefixedKafkaPrincipal().toString();

        // Typed collections: with a raw List the loop variable is an Object and cannot be passed
        // to the ResourcePattern constructor.
        List<ResourceType> resourceTypes = Arrays.asList(ResourceType.TOPIC, ResourceType.GROUP, ResourceType.TRANSACTIONAL_ID);
        Set<AclBinding> acls = new HashSet<>();
        for (ResourceType resourceType : resourceTypes) {
            // Literal resource: user 11 may WRITE.
            acls.add(new AclBinding(
                new ResourcePattern(resourceType, "test.resource", PatternType.LITERAL),
                new AccessControlEntry(user11, "*", AclOperation.WRITE, AclPermissionType.ALLOW)));
            // Prefixed resource: user 12 may READ.
            acls.add(new AclBinding(
                new ResourcePattern(resourceType, "test.", PatternType.PREFIXED),
                new AccessControlEntry(user12, "*", AclOperation.READ, AclPermissionType.ALLOW)));
            // Resource name "*": user 11 may DESCRIBE.
            acls.add(new AclBinding(
                new ResourcePattern(resourceType, "*", PatternType.LITERAL),
                new AccessControlEntry(user11, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
        }

        tenantAAdmin.createAcls(acls).all().get();
        Assert.assertEquals(acls, describeAllAcls(tenantAAdmin));
        Assert.assertEquals(Collections.emptySet(), describeAllAcls(tenantBAdmin));

        tenantBAdmin.createAcls(acls).all().get();
        Assert.assertEquals(acls, describeAllAcls(tenantBAdmin));

        tenantBAdmin.deleteAcls(Collections.singletonList(new AclBindingFilter(
            new ResourcePatternFilter(ResourceType.ANY, "test", PatternType.PREFIXED),
            new AccessControlEntryFilter("User:*", "*", AclOperation.ANY, AclPermissionType.ANY)))).all().get();
    }

    /**
     * Describes ACLs through the given client using a filter that places no restriction on
     * resource, principal, host, operation or permission type.
     *
     * @param adminClient client used to issue the describe request
     * @return the returned bindings copied into a new {@link HashSet}
     * @throws Exception if the describe request fails or waiting for it is interrupted
     */
    private Set<AclBinding> describeAllAcls(AdminClient adminClient) throws Exception {
        ResourcePatternFilter anyResource = new ResourcePatternFilter(ResourceType.ANY, (String) null, PatternType.ANY);
        AccessControlEntryFilter anyEntry = new AccessControlEntryFilter((String) null, (String) null, AclOperation.ANY, AclPermissionType.ANY);
        Collection<AclBinding> bindings = adminClient.describeAcls(new AclBindingFilter(anyResource, anyEntry)).values().get();
        return new HashSet<>(bindings);
    }

    /**
     * Verifies that the per-tenant partition limit enforced at topic creation can be raised at
     * runtime: a 513-partition topic is rejected first, the limit is then set to 1024 by the
     * super admin, and the same creation is retried until it succeeds.
     */
    @Test
    public void testCreateTopicPolicyMaxPartitionPerTenantIsDynamicallyUpdated() throws Exception {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient tenantAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        AdminClient superAdmin = this.physicalCluster.superAdminClient();
        final int partitionsOverLimit = 513;

        TestUtils.assertFutureThrows(
            tenantAdmin.createTopics(Collections.singletonList(new NewTopic("test", partitionsOverLimit, (short) 1))).all(),
            PolicyViolationException.class,
            String.format("You may not create more than the maximum number of partitions (%d).", 512));

        superAdmin.incrementalAlterConfigs(Collections.singletonMap(
            new ConfigResource(ConfigResource.Type.BROKER, ""),
            Collections.singletonList(new AlterConfigOp(
                new ConfigEntry("confluent.plugins.topic.policy.max.partitions.per.tenant", String.valueOf(1024)),
                AlterConfigOp.OpType.SET)))).all().get();

        // The retry body was empty, so the dynamic update was never actually verified.
        // NOTE(review): retrying the previously rejected creation is the presumed intent - confirm.
        TestUtils.retryOnExceptionWithTimeout(() -> {
            tenantAdmin.createTopics(Collections.singletonList(new NewTopic("test", partitionsOverLimit, (short) 1))).all().get();
        });
    }
}
