/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.integration.cluster.TestCluster;
import io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterBrokerReplicaExclusionsResult;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ExclusionOp;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

/**
 * Integration tests for topic and partition creation issued through a tenant admin client.
 *
 * <p>The tests cover replica placement across racks and cells, broker replica exclusions,
 * rejection of invalid topic names, partition counts and replication factors, the partition
 * limits asserted below, and a dynamic update of the per-tenant partition limit.
 */
@Tag("integration")
public class MultiTenantKafkaTopicCreationIntegrationTest
extends AbstractMultiTenantKafkaIntegrationTest {
    /** Broker property key these tests set to the replication factor. */
    private static final String TOPIC_POLICY_REPLICATION_FACTOR_PROP = "confluent.plugins.topic.policy.replication.factor";
    /** Broker property key altered at runtime by the dynamic-update test. */
    private static final String TOPIC_POLICY_MAX_PARTITIONS_PER_TENANT_PROP = "confluent.plugins.topic.policy.max.partitions.per.tenant";
    /** Topic name used by the helpers that are not given an explicit name. */
    private static final String DEFAULT_TOPIC_NAME = "test_topic";
    /**
     * Largest partition count the tests expect a single topic to accept: creating or growing a
     * topic to this size must succeed, while exceeding it must be rejected.
     * NOTE(review): the value is not configured in this class; presumably it mirrors a default
     * set elsewhere - confirm.
     */
    private static final int MAX_PARTITIONS = 100;

    private final short replicationFactor = 3;
    private final int defaultClusterSize = 6;

    /** Delegates to the parent implementation so that it runs as a {@code @BeforeEach} hook here. */
    @Override
    @BeforeEach
    public void setUpTempDir(TestInfo testInfo) {
        super.setUpTempDir(testInfo);
    }

    /** Creates the clusters via the parent class and then calls {@code awaitMetadataPropagation()}. */
    @Override
    protected void createPhysicalAndLogicalClusters() {
        super.createPhysicalAndLogicalClusters();
        awaitMetadataPropagation();
    }

    /**
     * Creates the clusters via the parent class with the given broker properties and then calls
     * {@code awaitMetadataPropagation()}.
     */
    @Override
    protected void createPhysicalAndLogicalClusters(Properties brokerProperties) {
        super.createPhysicalAndLogicalClusters(brokerProperties);
        awaitMetadataPropagation();
    }

    /**
     * On a cluster without racks, a topic created while brokers 0-2 are excluded must only span
     * the remaining brokers; once the exclusions are removed a new topic must span all brokers.
     */
    @Test
    public void testTopicCreationWithExclusionAndNoRacks() throws Exception {
        TestCluster cluster = TestCluster.create(
            new TestCluster.BrokerBuilder(0).build(),
            new TestCluster.BrokerBuilder(1).build(),
            new TestCluster.BrokerBuilder(2).build(),
            new TestCluster.BrokerBuilder(3).build(),
            new TestCluster.BrokerBuilder(4).build(),
            new TestCluster.BrokerBuilder(5).build());
        setUpRackAwareCluster(cluster);

        List<Integer> brokersToExclude = Arrays.asList(0, 1, 2);
        // Work on a copy so the set handed out by the cluster is never mutated.
        Set<Integer> eligibleBrokerIds = new HashSet<>(cluster.brokerIds());
        eligibleBrokerIds.removeAll(brokersToExclude);

        excludeBrokers(brokersToExclude);
        TopicDescription topicDescWithExclusion =
            createAndValidateRackAwareAssignment("exclusion_topic", defaultClusterSize, new HashSet<>());
        validateSpannedNodes(topicDescWithExclusion, eligibleBrokerIds);

        removeExclusions(brokersToExclude);
        eligibleBrokerIds.addAll(brokersToExclude);
        TopicDescription topicDesc =
            createAndValidateRackAwareAssignment("no_exclusion_topic", defaultClusterSize, new HashSet<>());
        validateSpannedNodes(topicDesc, eligibleBrokerIds);
    }

    /** Six brokers spread evenly over three racks. */
    @Test
    public void testRackAwareTopicCreation() throws Exception {
        TestCluster cluster = TestCluster.create(
            new TestCluster.BrokerBuilder(0).setRack("0").build(),
            new TestCluster.BrokerBuilder(1).setRack("0").build(),
            new TestCluster.BrokerBuilder(2).setRack("2").build(),
            new TestCluster.BrokerBuilder(3).setRack("1").build(),
            new TestCluster.BrokerBuilder(4).setRack("1").build(),
            new TestCluster.BrokerBuilder(5).setRack("2").build());
        setUpRackAwareCluster(cluster);
        createAndValidateRackAwareAssignment(defaultClusterSize, cluster.uniqueRacks());
    }

    /** Six brokers that all share a single rack. */
    @Test
    public void testRackAwareTopicCreationWithOneRack() throws Exception {
        TestCluster cluster = TestCluster.create(
            new TestCluster.BrokerBuilder(0).setRack("0").build(),
            new TestCluster.BrokerBuilder(1).setRack("0").build(),
            new TestCluster.BrokerBuilder(2).setRack("0").build(),
            new TestCluster.BrokerBuilder(3).setRack("0").build(),
            new TestCluster.BrokerBuilder(4).setRack("0").build(),
            new TestCluster.BrokerBuilder(5).setRack("0").build());
        setUpRackAwareCluster(cluster);
        createAndValidateRackAwareAssignment(defaultClusterSize, cluster.uniqueRacks());
    }

    /** Six brokers over three racks with an uneven number of brokers per rack (3/2/1). */
    @Test
    public void testRackAwareTopicCreationUnbalancedRacks() throws Exception {
        TestCluster cluster = TestCluster.create(
            new TestCluster.BrokerBuilder(0).setRack("0").build(),
            new TestCluster.BrokerBuilder(1).setRack("0").build(),
            new TestCluster.BrokerBuilder(2).setRack("2").build(),
            new TestCluster.BrokerBuilder(3).setRack("1").build(),
            new TestCluster.BrokerBuilder(4).setRack("1").build(),
            new TestCluster.BrokerBuilder(5).setRack("0").build());
        setUpRackAwareCluster(cluster);
        createAndValidateRackAwareAssignment(defaultClusterSize, cluster.uniqueRacks());
    }

    /**
     * Excludes every broker of rack "1": the topic created meanwhile must span only racks "0" and
     * "2" and only the non-excluded brokers. After removing the exclusions a new topic must span
     * all three racks and every broker.
     */
    @Test
    public void testRackAwareTopicCreationWithExclusion() throws ExecutionException, InterruptedException {
        TestCluster cluster = TestCluster.create(
            new TestCluster.BrokerBuilder(0).setRack("1").build(),
            new TestCluster.BrokerBuilder(1).setRack("1").build(),
            new TestCluster.BrokerBuilder(2).setRack("1").build(),
            new TestCluster.BrokerBuilder(3).setRack("0").build(),
            new TestCluster.BrokerBuilder(4).setRack("0").build(),
            new TestCluster.BrokerBuilder(5).setRack("2").build(),
            new TestCluster.BrokerBuilder(6).setRack("0").build(),
            new TestCluster.BrokerBuilder(7).setRack("2").build(),
            new TestCluster.BrokerBuilder(8).setRack("2").build());
        setUpRackAwareCluster(cluster);

        Set<String> expectedSpannedRacks = new HashSet<>(Arrays.asList("0", "2"));
        Set<Integer> nonExcludedBrokers = new HashSet<>(Arrays.asList(3, 4, 5, 6, 7, 8));
        List<Integer> brokersToExclude = Arrays.asList(0, 1, 2);

        excludeBrokers(brokersToExclude);
        TopicDescription topicDescWithExclusion =
            createAndValidateRackAwareAssignment("exclusion_topic", cluster.brokerCount(), expectedSpannedRacks);
        validateSpannedNodes(topicDescWithExclusion, nonExcludedBrokers);

        removeExclusions(brokersToExclude);
        expectedSpannedRacks.add("1");
        TopicDescription topicDesc =
            createAndValidateRackAwareAssignment("no_exclusion_topic", cluster.brokerCount(), expectedSpannedRacks);
        validateSpannedNodes(topicDesc, cluster.brokerIds());
    }

    /** Applies a {@code SET} replica exclusion to each of the given brokers. */
    private void excludeBrokers(List<Integer> brokersToExclude) throws ExecutionException, InterruptedException {
        alterBrokerReplicaExclusions(brokersToExclude, ExclusionOp.OpType.SET);
    }

    /** Applies a {@code DELETE} replica exclusion operation to each of the given brokers. */
    private void removeExclusions(List<Integer> brokersToExclude) throws ExecutionException, InterruptedException {
        alterBrokerReplicaExclusions(brokersToExclude, ExclusionOp.OpType.DELETE);
    }

    /**
     * Sends one exclusion operation of type {@code opType} per broker through the physical
     * cluster's super admin client and asserts that the result reports success.
     */
    private void alterBrokerReplicaExclusions(List<Integer> brokersToExclude, ExclusionOp.OpType opType) throws ExecutionException, InterruptedException {
        AlterBrokerReplicaExclusionsResult.ExclusionsResult result = physicalCluster.superConfluentAdmin()
            .alterBrokerReplicaExclusions(brokersToExclude.stream()
                .collect(Collectors.toMap(brokerId -> brokerId, brokerId -> new ExclusionOp(opType))))
            .result()
            .get();
        Assertions.assertTrue(result.isSuccessful(), String.format("Expected %s exclusion of brokers %s to have applied successfully. Result was %s", opType, brokersToExclude, result.exclusionResultByBroker()));
    }

    /** Starts a cluster whose size and per-broker racks are taken from {@code cluster}. */
    private void setUpRackAwareCluster(TestCluster cluster) {
        setUp(cluster.brokerCount(), cluster.consecutiveRacks());
        createPhysicalAndLogicalClusters(brokerPropsWithReplicationFactor(replicationFactor));
    }

    /**
     * Returns the base broker properties with the tenant quota callback class and the given
     * replication factor value added.
     *
     * @param replicationFactorValue value stored under the replication factor property; passed
     *     through untouched so callers decide between a numeric and a string representation
     */
    private Properties brokerPropsWithReplicationFactor(Object replicationFactorValue) {
        Properties brokerProps = brokerProps();
        brokerProps.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        brokerProps.put(TOPIC_POLICY_REPLICATION_FACTOR_PROP, replicationFactorValue);
        return brokerProps;
    }

    /**
     * Creates {@code topicName} with the given partition count and the class-wide replication
     * factor using a tenant admin client, describes it and asserts that it is present.
     *
     * @return the description of the newly created topic
     */
    private TopicDescription createAndDescribeTopic(String topicName, int numPartitions) throws InterruptedException, ExecutionException {
        AdminClient tenantAdminClient = testHarness.createAdminClient(logicalCluster1.adminUser());
        NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
        tenantAdminClient.createTopics(Collections.singletonList(newTopic)).all().get();

        Map<String, TopicDescription> topicDescriptions =
            tenantAdminClient.describeTopics(Collections.singleton(topicName)).allTopicNames().get();
        Assertions.assertTrue(topicDescriptions.containsKey(topicName), String.format("Expected fetched topics to contain topic %s but they did not - fetched topics: %s", topicName, topicDescriptions));
        return topicDescriptions.get(topicName);
    }

    /**
     * Asserts that getting {@code future} fails with an {@link ExecutionException} whose cause is
     * exactly of type {@code expectedCause}.
     *
     * @return the cause, so callers can make further assertions on it
     */
    private Throwable assertFailsWithCause(KafkaFuture<?> future, Class<? extends Throwable> expectedCause) {
        ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> future.get());
        Throwable cause = exception.getCause();
        Assertions.assertEquals(expectedCause, cause.getClass());
        return cause;
    }

    /** Same as the three-argument overload, using the default topic name. */
    private TopicDescription createAndValidateRackAwareAssignment(int numPartitions, Set<String> uniqueRacks) throws InterruptedException, ExecutionException {
        return createAndValidateRackAwareAssignment(DEFAULT_TOPIC_NAME, numPartitions, uniqueRacks);
    }

    /**
     * Creates the topic and checks the rack distribution of every partition against
     * {@code uniqueRacks}.
     *
     * @return the description of the created topic
     */
    private TopicDescription createAndValidateRackAwareAssignment(String topicName, int numPartitions, Set<String> uniqueRacks) throws InterruptedException, ExecutionException {
        TopicDescription topicDescription = createAndDescribeTopic(topicName, numPartitions);
        for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
            assertRackDistribution(replicationFactor, partitionInfo, uniqueRacks);
        }
        return topicDescription;
    }

    /** Asserts that the replicas of all partitions together cover exactly {@code expectedNodesToSpan}. */
    private void validateSpannedNodes(TopicDescription topicDescription, Set<Integer> expectedNodesToSpan) {
        Set<Integer> allSpannedNodes = topicDescription.partitions().stream()
            .flatMap(partitionInfo -> partitionInfo.replicas().stream())
            .map(Node::id)
            .collect(Collectors.toSet());
        Assertions.assertEquals(expectedNodesToSpan, allSpannedNodes, String.format("Expected topic to span broker ids %s. Topic Description: %s", expectedNodesToSpan, topicDescription));
    }

    /** Two cells of three brokers each, no racks configured. */
    @Test
    public void testCellAwareTopicCreationWithoutRack() throws Exception {
        TestCluster cluster = TestCluster.create(
            new TestCluster.BrokerBuilder(0).setCell("0").build(),
            new TestCluster.BrokerBuilder(1).setCell("0").build(),
            new TestCluster.BrokerBuilder(2).setCell("0").build(),
            new TestCluster.BrokerBuilder(3).setCell("1").build(),
            new TestCluster.BrokerBuilder(4).setCell("1").build(),
            new TestCluster.BrokerBuilder(5).setCell("1").build());
        setUpCellAwareCluster(cluster);
        createAndValidateCellAwareAssignment(cluster.brokerCount(), cluster);
    }

    /** Two cells of three brokers each, all brokers in the same rack. */
    @Test
    public void testCellAwareTopicCreationWithOneRack() throws Exception {
        TestCluster cluster = TestCluster.create(
            new TestCluster.BrokerBuilder(0).setCell("0").setRack("0").build(),
            new TestCluster.BrokerBuilder(1).setCell("0").setRack("0").build(),
            new TestCluster.BrokerBuilder(2).setCell("0").setRack("0").build(),
            new TestCluster.BrokerBuilder(3).setCell("1").setRack("0").build(),
            new TestCluster.BrokerBuilder(4).setCell("1").setRack("0").build(),
            new TestCluster.BrokerBuilder(5).setCell("1").setRack("0").build());
        setUpCellAwareCluster(cluster);
        createAndValidateCellAwareAssignment(cluster.brokerCount(), cluster);
    }

    /** Two cells of three brokers each, every cell covering three racks. */
    @Test
    public void testCellAwareTopicCreationWithThreeRack() throws Exception {
        TestCluster cluster = TestCluster.create(
            new TestCluster.BrokerBuilder(0).setCell("0").setRack("0").build(),
            new TestCluster.BrokerBuilder(1).setCell("0").setRack("1").build(),
            new TestCluster.BrokerBuilder(2).setCell("0").setRack("2").build(),
            new TestCluster.BrokerBuilder(3).setCell("1").setRack("0").build(),
            new TestCluster.BrokerBuilder(4).setCell("1").setRack("1").build(),
            new TestCluster.BrokerBuilder(5).setCell("1").setRack("2").build());
        setUpCellAwareCluster(cluster);
        createAndValidateCellAwareAssignment(cluster.brokerCount(), cluster);
    }

    /** Same layout as the three-rack test but with cells interleaved across broker ids. */
    @Test
    public void testCellAwareTopicCreationWithThreeRackAlternativeOrdering() throws Exception {
        TestCluster cluster = TestCluster.create(
            new TestCluster.BrokerBuilder(0).setCell("1").setRack("2").build(),
            new TestCluster.BrokerBuilder(1).setCell("0").setRack("1").build(),
            new TestCluster.BrokerBuilder(2).setCell("1").setRack("0").build(),
            new TestCluster.BrokerBuilder(3).setCell("0").setRack("0").build(),
            new TestCluster.BrokerBuilder(4).setCell("1").setRack("1").build(),
            new TestCluster.BrokerBuilder(5).setCell("0").setRack("2").build());
        setUpCellAwareCluster(cluster);
        createAndValidateCellAwareAssignment(cluster.brokerCount(), cluster);
    }

    /**
     * Excludes all brokers of cell "2", then re-admits two of them, then the last one. Only the
     * topic created after the final step is expected to place partitions in cell "2".
     */
    @Test
    public void testCellAwareTopicCreationWithAnExcludedCell() throws Exception {
        TestCluster cluster = TestCluster.create(
            new TestCluster.BrokerBuilder(0).setCell("0").setRack("0").build(),
            new TestCluster.BrokerBuilder(1).setCell("0").setRack("1").build(),
            new TestCluster.BrokerBuilder(2).setCell("0").setRack("2").build(),
            new TestCluster.BrokerBuilder(3).setCell("1").setRack("0").build(),
            new TestCluster.BrokerBuilder(4).setCell("1").setRack("1").build(),
            new TestCluster.BrokerBuilder(5).setCell("1").setRack("2").build(),
            new TestCluster.BrokerBuilder(6).setCell("2").setRack("0").build(),
            new TestCluster.BrokerBuilder(7).setCell("2").setRack("1").build(),
            new TestCluster.BrokerBuilder(8).setCell("2").setRack("2").build());
        setUpCellAwareCluster(cluster);

        excludeBrokers(Arrays.asList(6, 7, 8));
        Set<String> expectedSpannedCells = new HashSet<>(Arrays.asList("0", "1"));
        createAndValidateCellAwareAssignment("fully_excluded_cell", cluster.brokerCount(), cluster.cellsByBrokerId(), cluster.uniqueRacksByCellId(), expectedSpannedCells);

        // Broker 8 stays excluded, so cell "2" is still expected to receive no partitions.
        removeExclusions(Arrays.asList(6, 7));
        createAndValidateCellAwareAssignment("insufficient_rf_cell", cluster.brokerCount(), cluster.cellsByBrokerId(), cluster.uniqueRacksByCellId(), expectedSpannedCells);

        removeExclusions(Collections.singletonList(8));
        expectedSpannedCells.add("2");
        createAndValidateCellAwareAssignment("healthy_cell", cluster.brokerCount() * 3, cluster.cellsByBrokerId(), cluster.uniqueRacksByCellId(), expectedSpannedCells);
    }

    /** Starts a cluster whose size, per-broker racks and per-broker cells are taken from {@code cluster}. */
    private void setUpCellAwareCluster(TestCluster cluster) {
        List<String> brokerRacks = cluster.consecutiveRacks();
        List<String> brokerCells = cluster.consecutiveCells();
        setUp(cluster.brokerCount(), brokerRacks, brokerCells);
        createPhysicalAndLogicalClusters(brokerPropsWithReplicationFactor(replicationFactor));
    }

    /**
     * Creates the default topic and expects it to span every cell of {@code cluster}. Requires
     * more partitions than cells.
     */
    private void createAndValidateCellAwareAssignment(int numPartitions, TestCluster cluster) throws InterruptedException, ExecutionException {
        Assertions.assertTrue(numPartitions > cluster.uniqueCells().size(), "Expected to create more partitions than the number of cells");
        createAndValidateCellAwareAssignment(DEFAULT_TOPIC_NAME, numPartitions, cluster.cellsByBrokerId(), cluster.uniqueRacksByCellId(), cluster.uniqueCells());
    }

    /**
     * Creates the topic and asserts that every partition lives in exactly one cell, that its
     * replicas are distributed over that cell's racks, and that the set of cells used by all
     * partitions equals {@code expectedSpannedCells}.
     */
    private void createAndValidateCellAwareAssignment(String topicName, int numPartitions, Map<Integer, String> cellsByBrokerId, Map<String, Set<String>> uniqueRacksByCellId, Set<String> expectedSpannedCells) throws InterruptedException, ExecutionException {
        TopicDescription topicDescription = createAndDescribeTopic(topicName, numPartitions);
        Set<String> spannedCells = new HashSet<>();
        for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
            String cell = assertCellDistribution(cellsByBrokerId, partitionInfo);
            spannedCells.add(cell);
            assertRackDistribution(replicationFactor, partitionInfo, uniqueRacksByCellId.get(cell));
        }
        Assertions.assertEquals(expectedSpannedCells, spannedCells);
    }

    /**
     * Asserts that all replicas of the partition map to a single cell.
     *
     * @return that cell
     */
    private String assertCellDistribution(Map<Integer, String> cellsByBrokerId, TopicPartitionInfo partitionInfo) {
        Set<String> cells = partitionInfo.replicas().stream()
            .map(replica -> cellsByBrokerId.get(replica.id()))
            .collect(Collectors.toSet());
        Assertions.assertEquals(1, cells.size(), "Expected the replicas of a partition to share one cell but found " + cells);
        return cells.iterator().next();
    }

    /**
     * Asserts that the partition's replicas are spread as evenly as possible over the racks they
     * use (per-rack counts differ by at most one) and that the non-null racks used equal
     * {@code racks}.
     *
     * @param replicationFactor number of replicas each partition is expected to have
     * @param partitionInfo partition whose replicas are inspected
     * @param racks racks the replicas are expected to cover; brokers without a rack are ignored
     *     for this comparison
     */
    private void assertRackDistribution(short replicationFactor, TopicPartitionInfo partitionInfo, Set<String> racks) {
        // Fail with a readable message instead of a NoSuchElementException / division by zero below.
        Assertions.assertFalse(partitionInfo.replicas().isEmpty(), "Expected partition to have replicas: " + partitionInfo);

        // A null key collects the replicas of brokers that have no rack.
        Map<String, Integer> replicasPerRack = new HashMap<>();
        for (Node node : partitionInfo.replicas()) {
            replicasPerRack.merge(node.rack(), 1, Integer::sum);
        }
        int maxPerRack = Collections.max(replicasPerRack.values());
        int minPerRack = Collections.min(replicasPerRack.values());
        int rackCount = replicasPerRack.size();

        Assertions.assertEquals(replicationFactor / rackCount, minPerRack, replicasPerRack.toString());
        int expectedMaxPerRack = replicationFactor % rackCount == 0 ? minPerRack : minPerRack + 1;
        Assertions.assertEquals(expectedMaxPerRack, maxPerRack, replicasPerRack.toString());

        Set<String> spannedRacks = replicasPerRack.keySet().stream()
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());
        Assertions.assertEquals(racks, spannedRacks);
    }

    /**
     * Without racks, a topic with as many partitions as brokers must place the same number of
     * replicas on every broker that hosts one.
     */
    @Test
    public void testRackUnAwareTopicCreation() throws Exception {
        int brokerCount = defaultClusterSize;
        int numPartitions = defaultClusterSize;
        setUp(brokerCount, Collections.emptyList());
        // This test passes the replication factor as a string ("3").
        createPhysicalAndLogicalClusters(brokerPropsWithReplicationFactor(Short.toString(replicationFactor)));

        TopicDescription topicDescription = createAndDescribeTopic(DEFAULT_TOPIC_NAME, numPartitions);

        Map<Integer, Integer> replicasPerBroker = new HashMap<>(brokerCount);
        for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
            for (Node node : partitionInfo.replicas()) {
                replicasPerBroker.merge(node.id(), 1, Integer::sum);
            }
        }

        int expectedReplicasPerBroker = numPartitions * replicationFactor / brokerCount;
        for (Map.Entry<Integer, Integer> entry : replicasPerBroker.entrySet()) {
            int numReplicas = entry.getValue();
            Assertions.assertEquals(expectedReplicasPerBroker, numReplicas, "Broker " + entry.getKey() + " has " + numReplicas + " replicas. Topic description: " + topicDescription);
        }
    }

    /**
     * An invalid topic name must be rejected with a {@link PolicyViolationException}, non-positive
     * partition counts with an {@link InvalidPartitionsException} and the listed replication
     * factors with an {@link InvalidReplicationFactorException}.
     */
    @Test
    public void testInvalidTopicCreation() {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient tenantAdminClient = testHarness.createAdminClient(logicalCluster1.adminUser());

        CreateTopicsResult invalidNameResult =
            tenantAdminClient.createTopics(Collections.singletonList(new NewTopic(".", 2, (short) 1)));
        Throwable invalidNameCause = assertFailsWithCause(invalidNameResult.all(), PolicyViolationException.class);
        Assertions.assertEquals("Invalid topic name specified.", invalidNameCause.getMessage());

        for (int invalidPartitionCount : Arrays.asList(Integer.MIN_VALUE, -10, 0)) {
            CreateTopicsResult invalidPartitionsResult = tenantAdminClient.createTopics(
                Collections.singletonList(new NewTopic(DEFAULT_TOPIC_NAME, invalidPartitionCount, (short) 1)));
            assertFailsWithCause(invalidPartitionsResult.all(), InvalidPartitionsException.class);
        }

        for (short invalidRf : Arrays.asList(Short.MIN_VALUE, (short) -10, (short) 0, (short) 3)) {
            CreateTopicsResult invalidRfResult = tenantAdminClient.createTopics(
                Collections.singletonList(new NewTopic(DEFAULT_TOPIC_NAME, 3, invalidRf)));
            assertFailsWithCause(invalidRfResult.all(), InvalidReplicationFactorException.class);
        }
    }

    /**
     * Topics with fewer partitions than, or exactly, the maximum must be created; one partition
     * more must fail with an {@link InvalidRequestException} that mentions the maximum.
     */
    @Test
    public void testTopicCreationWithMaxNumPartitions() throws ExecutionException, InterruptedException {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient tenantAdminClient = testHarness.createAdminClient(logicalCluster1.adminUser());
        String belowMaxTopic = "belowMaxTopic";
        String atMaxTopic = "atMaxTopic";
        String aboveMaxTopic = "aboveMaxTopic";

        CreateTopicsResult result = tenantAdminClient.createTopics(Arrays.asList(
            newTopic(belowMaxTopic, MAX_PARTITIONS - 1),
            newTopic(atMaxTopic, MAX_PARTITIONS),
            newTopic(aboveMaxTopic, MAX_PARTITIONS + 1)));
        result.values().get(belowMaxTopic).get();
        result.values().get(atMaxTopic).get();

        Throwable cause = assertFailsWithCause(result.values().get(aboveMaxTopic), InvalidRequestException.class);
        Assertions.assertTrue(cause.getMessage().contains(Integer.toString(MAX_PARTITIONS)), String.format("Expected the error message to contain the max partition count - instead, it was: %s", cause.getMessage()));
    }

    /**
     * Growing a topic to exactly the maximum must succeed; growing another one beyond it must make
     * the combined result fail with an {@link InvalidRequestException} that mentions the maximum.
     */
    @Test
    public void testPartitionCreationWithMaxNumPartitions() throws ExecutionException, InterruptedException {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient tenantAdminClient = testHarness.createAdminClient(logicalCluster1.adminUser());
        int currentNumPartitions = 5;
        tenantAdminClient.createTopics(Arrays.asList(newTopic("topic", currentNumPartitions), newTopic("topic2", currentNumPartitions))).all().get();

        Map<String, NewPartitions> createPartitionsMap = new HashMap<>();
        createPartitionsMap.put("topic", NewPartitions.increaseTo(currentNumPartitions + MAX_PARTITIONS));
        createPartitionsMap.put("topic2", NewPartitions.increaseTo(MAX_PARTITIONS));
        CreatePartitionsResult createPartitionsResult = tenantAdminClient.createPartitions(createPartitionsMap);
        createPartitionsResult.values().get("topic2").get();

        Throwable cause = assertFailsWithCause(createPartitionsResult.all(), InvalidRequestException.class);
        Assertions.assertTrue(cause.getMessage().contains(Integer.toString(MAX_PARTITIONS)), String.format("Expected the error message to contain the max partition count - instead, it was: %s", cause.getMessage()));
    }

    /**
     * Requesting a partition count that is not larger than the current one (including zero and
     * negative values) must fail with an {@link InvalidPartitionsException}.
     */
    @Test
    public void testPartitionCreationWithInvalidNumPartitions() throws ExecutionException, InterruptedException {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient tenantAdminClient = testHarness.createAdminClient(logicalCluster1.adminUser());
        int currentNumPartitions = 5;
        tenantAdminClient.createTopics(Collections.singletonList(newTopic("topic", currentNumPartitions))).all().get();

        for (int invalidNumPartitions : Arrays.asList(currentNumPartitions, currentNumPartitions - 1, 0, -10, Integer.MIN_VALUE)) {
            CreatePartitionsResult createPartitionsResult = tenantAdminClient.createPartitions(
                Collections.singletonMap("topic", NewPartitions.increaseTo(invalidNumPartitions)));
            assertFailsWithCause(createPartitionsResult.all(), InvalidPartitionsException.class);
        }
    }

    /**
     * With auto topic creation enabled and a cluster-level CREATE ACL granted, producing to the
     * topic named "." must still fail with an {@link AuthorizationException}.
     */
    @Test
    public void testInvalidTopicCreationWithAutoTopicCreation() throws Throwable {
        setUp();
        Properties brokerProps = brokerProps();
        brokerProps.put(KafkaConfig.AutoCreateTopicsEnableProp(), "true");
        createPhysicalAndLogicalClusters(brokerProps);

        ResourcePattern clusterResource = new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
        AccessControlEntry allowCreate = new AccessControlEntry(logicalCluster1.user(11).unprefixedKafkaPrincipal().toString(), "*", AclOperation.CREATE, AclPermissionType.ALLOW);
        Set<AclBinding> acls = new HashSet<>();
        acls.add(new AclBinding(clusterResource, allowCreate));

        AdminClient tenantAdminClient = testHarness.createAdminClient(logicalCluster1.adminUser());
        tenantAdminClient.createAcls(acls).all().get();
        Assertions.assertEquals(acls, describeAllAcls(tenantAdminClient));

        try (KafkaProducer<String, String> producer = testHarness.createProducer(logicalCluster1.user(11), SecurityProtocol.SASL_PLAINTEXT)) {
            Assertions.assertThrows(AuthorizationException.class, () -> KafkaTestUtils.sendRecords(producer, ".", 0, 10));
        }
    }

    /**
     * Fills the tenant up to the initially expected partition limit, verifies that one more
     * partition is rejected, raises the limit through a dynamic broker config and verifies that
     * topic creation succeeds both before and after a broker restart.
     */
    @Test
    public void testCreateTopicPolicyMaxPartitionPerTenantIsDynamicallyUpdated() throws Exception {
        int initialMaxPartitions = 512;
        int raisedMaxPartitions = 1024;
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient tenantAdminClient = testHarness.createAdminClient(logicalCluster1.adminUser());
        AdminClient internalAdminClient = physicalCluster.superAdminClient();

        Collection<NewTopic> topics = newTopics(initialMaxPartitions);
        tenantAdminClient.createTopics(topics).all().get();
        Set<String> topicNames = topics.stream().map(NewTopic::name).collect(Collectors.toSet());
        TestUtils.waitForCondition(() -> tenantAdminClient.listTopics().names().get().containsAll(topicNames), String.format("Could not list topics %s in time", topicNames));

        CreateTopicsResult createTopicsResult = tenantAdminClient.createTopics(Collections.singletonList(newTopic("over-limit", 1)));
        TestUtils.assertFutureThrows(createTopicsResult.all(), PolicyViolationException.class, String.format("You may not create more than 0 new partitions. Adding the requested number of partitions will exceed %d total partitions. Currently, there are %d total topic partitions", initialMaxPartitions, initialMaxPartitions));

        // An empty resource name addresses the cluster-wide default broker config.
        ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
        AlterConfigOp raiseLimit = new AlterConfigOp(new ConfigEntry(TOPIC_POLICY_MAX_PARTITIONS_PER_TENANT_PROP, String.valueOf(raisedMaxPartitions)), AlterConfigOp.OpType.SET);
        Map<ConfigResource, Collection<AlterConfigOp>> configUpdate =
            Collections.singletonMap(brokerDefaults, Collections.singletonList(raiseLimit));
        internalAdminClient.incrementalAlterConfigs(configUpdate).all().get();

        TestUtils.retryOnExceptionWithTimeout(() -> {
            tenantAdminClient.createTopics(Collections.singletonList(newTopic("over-limit", 1))).all().get();
        });

        physicalCluster.kafkaCluster().shutdownBrokers();
        physicalCluster.kafkaCluster().startBrokersAfterShutdown();

        // A fresh client is created for the restarted brokers.
        AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster1.adminUser());
        TestUtils.retryOnExceptionWithTimeout(() -> {
            tenantAdmin.createTopics(Collections.singletonList(newTopic("over-limit-again", 1))).all().get();
        });
    }

    /**
     * Builds topics named {@code test-0}, {@code test-1}, ... whose partition counts add up to
     * {@code numTotalPartitions}, each topic holding at most {@link #MAX_PARTITIONS} partitions.
     */
    private Collection<NewTopic> newTopics(int numTotalPartitions) {
        List<NewTopic> topics = new ArrayList<>();
        int remainingPartitions = numTotalPartitions;
        for (int topicIndex = 0; remainingPartitions > 0; ++topicIndex) {
            int partitionsToCreate = Math.min(remainingPartitions, MAX_PARTITIONS);
            remainingPartitions -= partitionsToCreate;
            topics.add(newTopic("test-" + topicIndex, partitionsToCreate));
        }
        return topics;
    }

    /** Builds a topic request with the given partition count and a replication factor of one. */
    private NewTopic newTopic(String name, int numPartitions) {
        return new NewTopic(name, numPartitions, (short) 1);
    }
}

