package io.confluent.kafka.multitenant.integration.test;

import com.sun.jna.platform.win32.COM.tlb.imp.TlbConst;
import io.confluent.kafka.multitenant.integration.cluster.TestCluster;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterBrokerReplicaExclusionsResult;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ExclusionOp;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionPlacementStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.ResourceNotFoundException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantKafkaTopicCreationIntegrationTest.class */
public class MultiTenantKafkaTopicCreationIntegrationTest extends AbstractMultiTenantKafkaIntegrationTest {
    private short replicationFactor = 3;
    private final int defaultClusterSize = 6;

    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    @BeforeEach
    public void setUpTempDir(TestInfo testInfo) {
        super.setUpTempDir(testInfo);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    public void createPhysicalAndLogicalClusters() {
        super.createPhysicalAndLogicalClusters();
        awaitMetadataPropagation();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    public void createPhysicalAndLogicalClusters(Properties properties) {
        super.createPhysicalAndLogicalClusters(properties);
        awaitMetadataPropagation();
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testTopicCreationWithExclusionAndNoRacks(String str) throws Exception {
        TestCluster create = TestCluster.create(new TestCluster.BrokerBuilder(0).build(), new TestCluster.BrokerBuilder(1).build(), new TestCluster.BrokerBuilder(2).build(), new TestCluster.BrokerBuilder(3).build(), new TestCluster.BrokerBuilder(4).build(), new TestCluster.BrokerBuilder(5).build());
        setUpRackAwareCluster(create);
        Set<Integer> brokerIds = create.brokerIds();
        List<Integer> asList = Arrays.asList(0, 1, 2);
        brokerIds.getClass();
        asList.forEach((v1) -> {
            r1.remove(v1);
        });
        excludeBrokers(asList);
        validateSpannedNodes(createAndValidateRackAwareAssignment("exclusion_topic", 6, new HashSet()), brokerIds);
        removeExclusions(asList);
        brokerIds.addAll(asList);
        validateSpannedNodes(createAndValidateRackAwareAssignment("no_exclusion_topic", 6, new HashSet()), brokerIds);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testRackAwareTopicCreation(String str) throws Exception {
        TestCluster create = TestCluster.create(new TestCluster.BrokerBuilder(0).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(1).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(2).setRack("2").build(), new TestCluster.BrokerBuilder(3).setRack("1").build(), new TestCluster.BrokerBuilder(4).setRack("1").build(), new TestCluster.BrokerBuilder(5).setRack("2").build());
        setUpRackAwareCluster(create);
        createAndValidateRackAwareAssignment(6, create.uniqueRacks());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testRackAwareTopicCreationWithOneRack(String str) throws Exception {
        TestCluster create = TestCluster.create(new TestCluster.BrokerBuilder(0).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(1).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(2).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(3).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(4).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(5).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build());
        setUpRackAwareCluster(create);
        createAndValidateRackAwareAssignment(6, create.uniqueRacks());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testRackAwareTopicCreationUnbalancedRacks(String str) throws Exception {
        TestCluster create = TestCluster.create(new TestCluster.BrokerBuilder(0).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(1).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(2).setRack("2").build(), new TestCluster.BrokerBuilder(3).setRack("1").build(), new TestCluster.BrokerBuilder(4).setRack("1").build(), new TestCluster.BrokerBuilder(5).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build());
        setUpRackAwareCluster(create);
        createAndValidateRackAwareAssignment(6, create.uniqueRacks());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testRackAwareTopicCreationWithExclusion(String str) throws ExecutionException, InterruptedException {
        TestCluster create = TestCluster.create(new TestCluster.BrokerBuilder(0).setRack("1").build(), new TestCluster.BrokerBuilder(1).setRack("1").build(), new TestCluster.BrokerBuilder(2).setRack("1").build(), new TestCluster.BrokerBuilder(3).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(4).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(5).setRack("2").build(), new TestCluster.BrokerBuilder(6).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(7).setRack("2").build(), new TestCluster.BrokerBuilder(8).setRack("2").build());
        setUpRackAwareCluster(create);
        HashSet hashSet = new HashSet(Arrays.asList(TlbConst.TYPELIB_MINOR_VERSION_SHELL, "2"));
        HashSet hashSet2 = new HashSet(Arrays.asList(3, 4, 5, 6, 7, 8));
        List<Integer> asList = Arrays.asList(0, 1, 2);
        excludeBrokers(asList);
        validateSpannedNodes(createAndValidateRackAwareAssignment("exclusion_topic", create.brokerCount(), hashSet), hashSet2);
        removeExclusions(asList);
        hashSet.add("1");
        validateSpannedNodes(createAndValidateRackAwareAssignment("no_exclusion_topic", create.brokerCount(), hashSet), create.brokerIds());
    }

    private void excludeBrokers(List<Integer> list) throws ExecutionException, InterruptedException {
        alterBrokerReplicaExclusions(list, ExclusionOp.OpType.SET);
    }

    private void removeExclusions(List<Integer> list) throws ExecutionException, InterruptedException {
        alterBrokerReplicaExclusions(list, ExclusionOp.OpType.DELETE);
    }

    private void alterBrokerReplicaExclusions(List<Integer> list, ExclusionOp.OpType opType) throws ExecutionException, InterruptedException {
        AlterBrokerReplicaExclusionsResult.ExclusionsResult exclusionsResult = this.physicalCluster.superConfluentAdmin().alterBrokerReplicaExclusions((Map) list.stream().collect(Collectors.toMap(num -> {
            return num;
        }, num2 -> {
            return new ExclusionOp(opType);
        }))).result().get();
        Assertions.assertTrue(exclusionsResult.isSuccessful(), String.format("Expected %s exclusion of brokers %s to have applied successfully. Result was %s", opType, list, exclusionsResult.exclusionResultByBroker()));
    }

    private void setUpRackAwareCluster(TestCluster testCluster) {
        setUp(testCluster.brokerCount(), testCluster.consecutiveRacks());
        Properties nodeProps = nodeProps();
        nodeProps.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        nodeProps.put("confluent.plugins.topic.policy.replication.factor", Short.valueOf(this.replicationFactor));
        createPhysicalAndLogicalClusters(nodeProps);
    }

    private TopicDescription createAndValidateRackAwareAssignment(int i, Set<String> set) throws InterruptedException, ExecutionException {
        return createAndValidateRackAwareAssignment("test_topic", i, set);
    }

    private TopicDescription createAndValidateRackAwareAssignment(String str, int i, Set<String> set) throws InterruptedException, ExecutionException {
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        createAdminClient.createTopics(Collections.singletonList(new NewTopic(str, i, this.replicationFactor))).all().get();
        TopicDescription describeTopic = KafkaTestUtils.describeTopic(createAdminClient, str);
        Iterator<TopicPartitionInfo> it = describeTopic.partitions().iterator();
        while (it.hasNext()) {
            assertRackDistribution(this.replicationFactor, it.next(), set);
        }
        return describeTopic;
    }

    private void validateSpannedNodes(TopicDescription topicDescription, Set<Integer> set) {
        HashSet hashSet = new HashSet();
        Iterator<TopicPartitionInfo> it = topicDescription.partitions().iterator();
        while (it.hasNext()) {
            hashSet.addAll((Set) it.next().replicas().stream().map((v0) -> {
                return v0.id();
            }).collect(Collectors.toSet()));
        }
        Assertions.assertEquals(set, hashSet, String.format("Expected topic to span broker ids %s. Topic Description: %s", set, topicDescription));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCellAwareTopicCreationWithoutRack(String str) throws Exception {
        TestCluster create = TestCluster.create(new TestCluster.BrokerBuilder(0).setCell(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(1).setCell(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(2).setCell(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(3).setCell("1").build(), new TestCluster.BrokerBuilder(4).setCell("1").build(), new TestCluster.BrokerBuilder(5).setCell("1").build());
        Properties nodeProps = nodeProps();
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_CONFIG, true);
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_IMPLICIT_CREATION_CONFIG, true);
        nodeProps.put(ConfluentConfigs.DEFAULT_PARTITION_PLACEMENT_STRATEGY_CONFIG, PartitionPlacementStrategy.PARTITION_IN_CELL.code().toString());
        nodeProps.put(ConfluentConfigs.MIN_CELL_SIZE_CONFIG, (short) 1);
        nodeProps.put(ConfluentConfigs.MAX_CELL_SIZE_CONFIG, (short) 3);
        nodeProps.put(ConfluentConfigs.CELL_SIZE_CONFIG, (short) 3);
        setUpCellAwareCluster(create, nodeProps);
        createAndValidateCellAwareAssignment(create.brokerCount(), create);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCellAwareTopicCreationWithOneRack(String str) throws Exception {
        TestCluster create = TestCluster.create(new TestCluster.BrokerBuilder(0).setCell(TlbConst.TYPELIB_MINOR_VERSION_SHELL).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(1).setCell(TlbConst.TYPELIB_MINOR_VERSION_SHELL).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(2).setCell(TlbConst.TYPELIB_MINOR_VERSION_SHELL).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(3).setCell("1").setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(4).setCell("1").setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(5).setCell("1").setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build());
        Properties nodeProps = nodeProps();
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_CONFIG, true);
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_IMPLICIT_CREATION_CONFIG, true);
        nodeProps.put(ConfluentConfigs.DEFAULT_PARTITION_PLACEMENT_STRATEGY_CONFIG, PartitionPlacementStrategy.PARTITION_IN_CELL.code().toString());
        nodeProps.put(ConfluentConfigs.MIN_CELL_SIZE_CONFIG, (short) 3);
        nodeProps.put(ConfluentConfigs.MAX_CELL_SIZE_CONFIG, (short) 3);
        nodeProps.put(ConfluentConfigs.CELL_SIZE_CONFIG, (short) 3);
        setUpCellAwareCluster(create, nodeProps);
        createAndValidateCellAwareAssignment(create.brokerCount(), create);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCellAwareTopicCreationWithThreeRack(String str) throws Exception {
        TestCluster create = TestCluster.create(new TestCluster.BrokerBuilder(0).setCell(TlbConst.TYPELIB_MINOR_VERSION_SHELL).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(1).setCell(TlbConst.TYPELIB_MINOR_VERSION_SHELL).setRack("1").build(), new TestCluster.BrokerBuilder(2).setCell(TlbConst.TYPELIB_MINOR_VERSION_SHELL).setRack("2").build(), new TestCluster.BrokerBuilder(3).setCell("1").setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(4).setCell("1").setRack("1").build(), new TestCluster.BrokerBuilder(5).setCell("1").setRack("2").build());
        Properties nodeProps = nodeProps();
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_CONFIG, true);
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_IMPLICIT_CREATION_CONFIG, true);
        nodeProps.put(ConfluentConfigs.DEFAULT_PARTITION_PLACEMENT_STRATEGY_CONFIG, PartitionPlacementStrategy.PARTITION_IN_CELL.code().toString());
        nodeProps.put(ConfluentConfigs.MIN_CELL_SIZE_CONFIG, (short) 3);
        nodeProps.put(ConfluentConfigs.MAX_CELL_SIZE_CONFIG, (short) 3);
        nodeProps.put(ConfluentConfigs.CELL_SIZE_CONFIG, (short) 3);
        setUpCellAwareCluster(create, nodeProps);
        createAndValidateCellAwareAssignment(create.brokerCount(), create);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCellAwareTopicCreationWithAnExcludedCell(String str) throws Exception {
        TestCluster create = TestCluster.create(new TestCluster.BrokerBuilder(0).setCell(TlbConst.TYPELIB_MINOR_VERSION_SHELL).setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(1).setCell(TlbConst.TYPELIB_MINOR_VERSION_SHELL).setRack("1").build(), new TestCluster.BrokerBuilder(2).setCell(TlbConst.TYPELIB_MINOR_VERSION_SHELL).setRack("2").build(), new TestCluster.BrokerBuilder(3).setCell("1").setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(4).setCell("1").setRack("1").build(), new TestCluster.BrokerBuilder(5).setCell("1").setRack("2").build(), new TestCluster.BrokerBuilder(6).setCell("2").setRack(TlbConst.TYPELIB_MINOR_VERSION_SHELL).build(), new TestCluster.BrokerBuilder(7).setCell("2").setRack("1").build(), new TestCluster.BrokerBuilder(8).setCell("2").setRack("2").build());
        Properties nodeProps = nodeProps();
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_CONFIG, true);
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_IMPLICIT_CREATION_CONFIG, true);
        nodeProps.put(ConfluentConfigs.DEFAULT_PARTITION_PLACEMENT_STRATEGY_CONFIG, PartitionPlacementStrategy.PARTITION_IN_CELL.code().toString());
        nodeProps.put(ConfluentConfigs.MIN_CELL_SIZE_CONFIG, (short) 3);
        nodeProps.put(ConfluentConfigs.MAX_CELL_SIZE_CONFIG, (short) 3);
        nodeProps.put(ConfluentConfigs.CELL_SIZE_CONFIG, (short) 3);
        setUpCellAwareCluster(create, nodeProps);
        excludeBrokers(Arrays.asList(6, 7, 8));
        HashSet hashSet = new HashSet(Arrays.asList(TlbConst.TYPELIB_MINOR_VERSION_SHELL, "1"));
        createAndValidateCellAwareAssignment("fully_excluded_cell", create.brokerCount(), create.cellsByBrokerId(), create.uniqueRacksByCellId(), hashSet);
        removeExclusions(Arrays.asList(6, 7));
        createAndValidateCellAwareAssignment("insufficient_rf_cell", create.brokerCount(), create.cellsByBrokerId(), create.uniqueRacksByCellId(), hashSet);
        removeExclusions(Collections.singletonList(8));
        hashSet.add("2");
        createAndValidateCellAwareAssignment("healthy_cell", create.brokerCount() * 3, create.cellsByBrokerId(), create.uniqueRacksByCellId(), hashSet);
    }

    private void setUpCellAwareCluster(TestCluster testCluster, Properties properties) {
        setUp(testCluster.brokerCount(), testCluster.consecutiveRacks(), testCluster.consecutiveCells());
        properties.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        properties.put("confluent.plugins.topic.policy.replication.factor", (short) 3);
        createPhysicalAndLogicalClusters(properties);
    }

    private void createAndValidateCellAwareAssignment(int i, TestCluster testCluster) throws InterruptedException, ExecutionException {
        Assertions.assertTrue(i > testCluster.uniqueCells().size(), "Expected to create more partitions than the number of cells");
        createAndValidateCellAwareAssignment("test_topic", i, testCluster.cellsByBrokerId(), testCluster.uniqueRacksByCellId(), testCluster.uniqueCells());
    }

    private void createAndValidateCellAwareAssignment(String str, int i, Map<Integer, String> map, Map<String, Set<String>> map2, Set<String> set) throws InterruptedException, ExecutionException {
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.user(9));
        createAdminClient.createTopics(Collections.singletonList(new NewTopic(str, i, this.replicationFactor))).all().get();
        TopicDescription describeTopic = KafkaTestUtils.describeTopic(createAdminClient, str);
        HashSet hashSet = new HashSet();
        for (TopicPartitionInfo topicPartitionInfo : describeTopic.partitions()) {
            String assertCellDistribution = assertCellDistribution(map, topicPartitionInfo);
            hashSet.add(assertCellDistribution);
            assertRackDistribution(this.replicationFactor, topicPartitionInfo, map2.get(assertCellDistribution));
        }
        Assertions.assertEquals(set, hashSet);
    }

    private String assertCellDistribution(Map<Integer, String> map, TopicPartitionInfo topicPartitionInfo) {
        Set set = (Set) topicPartitionInfo.replicas().stream().map(node -> {
            return (String) map.get(Integer.valueOf(node.id()));
        }).collect(Collectors.toSet());
        Assertions.assertEquals(1, set.size());
        return (String) set.iterator().next();
    }

    private void assertRackDistribution(short s, TopicPartitionInfo topicPartitionInfo, Set<String> set) {
        HashMap hashMap = new HashMap();
        Iterator<Node> it = topicPartitionInfo.replicas().iterator();
        while (it.hasNext()) {
            hashMap.compute(it.next().rack(), (str, num) -> {
                return Integer.valueOf(num == null ? 1 : num.intValue() + 1);
            });
        }
        int intValue = ((Integer) hashMap.values().stream().max(Comparator.naturalOrder()).get()).intValue();
        int intValue2 = ((Integer) hashMap.values().stream().min(Comparator.naturalOrder()).get()).intValue();
        Assertions.assertEquals(s / hashMap.size(), intValue2, hashMap.toString());
        Assertions.assertEquals(s % hashMap.size() == 0 ? intValue2 : intValue2 + 1, intValue, hashMap.toString());
        Assertions.assertEquals(set, hashMap.keySet().stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toSet()));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testRackUnAwareTopicCreation(String str) throws Exception {
        setUp(6, Collections.emptyList());
        Properties nodeProps = nodeProps();
        nodeProps.put(KafkaConfig$.MODULE$.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        nodeProps.put("confluent.plugins.topic.policy.replication.factor", ConfluentConfigs.AUDIT_LOGGER_REPLICATION_FACTOR_DEFAULT);
        createPhysicalAndLogicalClusters(nodeProps);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("test_topic", 6, (short) 3))).all().get();
        HashMap hashMap = new HashMap(6);
        TopicDescription describeTopic = KafkaTestUtils.describeTopic(createAdminClient, "test_topic");
        Iterator<TopicPartitionInfo> it = describeTopic.partitions().iterator();
        while (it.hasNext()) {
            for (Node node : it.next().replicas()) {
                hashMap.put(Integer.valueOf(node.id()), Integer.valueOf(((Integer) hashMap.getOrDefault(Integer.valueOf(node.id()), 0)).intValue() + 1));
            }
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            int intValue = ((Integer) entry.getValue()).intValue();
            Assertions.assertEquals(3, intValue, "Broker " + entry.getKey() + " has " + intValue + " replicas. Topic description: " + describeTopic);
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidTopicCreation(String str) {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        CreateTopicsResult createTopics = createAdminClient.createTopics(Collections.singletonList(new NewTopic(".", 2, (short) 1)));
        Throwable cause = Assertions.assertThrows(ExecutionException.class, () -> {
            createTopics.all().get();
        }).getCause();
        Assertions.assertEquals(cause.getClass(), PolicyViolationException.class);
        Assertions.assertEquals("Invalid topic name specified.", cause.getMessage());
        Iterator it = Arrays.asList(Integer.MIN_VALUE, -10, 0).iterator();
        while (it.hasNext()) {
            CreateTopicsResult createTopics2 = createAdminClient.createTopics(Collections.singletonList(new NewTopic("test_topic", ((Integer) it.next()).intValue(), (short) 1)));
            Assertions.assertEquals(Assertions.assertThrows(ExecutionException.class, () -> {
                createTopics2.all().get();
            }).getCause().getClass(), InvalidPartitionsException.class);
        }
        Iterator it2 = Arrays.asList(Short.MIN_VALUE, (short) -10, (short) 0, (short) 3).iterator();
        while (it2.hasNext()) {
            CreateTopicsResult createTopics3 = createAdminClient.createTopics(Collections.singletonList(new NewTopic("test_topic", 3, ((Short) it2.next()).shortValue())));
            Assertions.assertEquals(Assertions.assertThrows(ExecutionException.class, () -> {
                createTopics3.all().get();
            }).getCause().getClass(), InvalidReplicationFactorException.class);
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testTopicCreationWithMaxNumPartitionsPerCluster(String str) throws ExecutionException, InterruptedException {
        setUp();
        createPhysicalAndLogicalClusters(Collections.singletonMap(ConfluentConfigs.MAX_PARTITIONS_PER_CLUSTER_CONFIG, Integer.toString(10)));
        String str2 = "testTopic2";
        String str3 = "testTopic3";
        CreateTopicsResult createTopics = this.testHarness.createAdminClient(this.logicalCluster1.adminUser()).createTopics(Arrays.asList(newTopic("testTopic1", 5), newTopic("testTopic2", 6), newTopic("testTopic3", 10)));
        createTopics.values().get("testTopic1").get();
        Throwable cause = ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            createTopics.values().get(str2).get();
        })).getCause();
        Assertions.assertEquals(cause.getClass(), PolicyViolationException.class);
        Assertions.assertTrue(cause.getMessage().contains("You may not create more than 5 new partitions"));
        Throwable cause2 = ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            createTopics.values().get(str3).get();
        })).getCause();
        Assertions.assertEquals(cause2.getClass(), PolicyViolationException.class);
        Assertions.assertTrue(cause2.getMessage().contains("You may not create more than 5 new partitions"));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testTopicCreationWithMaxNumPartitionsPerRequest(String str) throws ExecutionException, InterruptedException {
        setUp();
        createPhysicalAndLogicalClusters();
        String str2 = "aboveMaxTopic";
        CreateTopicsResult createTopics = this.testHarness.createAdminClient(this.logicalCluster1.adminUser()).createTopics(Arrays.asList(newTopic("belowMaxTopic", 99), newTopic("atMaxTopic", 100), newTopic("aboveMaxTopic", 101)));
        createTopics.values().get("belowMaxTopic").get();
        createTopics.values().get("atMaxTopic").get();
        Throwable cause = ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            createTopics.values().get(str2).get();
        })).getCause();
        Assertions.assertEquals(cause.getClass(), PolicyViolationException.class);
        Assertions.assertTrue(cause.getMessage().contains(Integer.toString(100)), String.format("Expected the error message to contain the max partition count - instead, it was: %s", cause.getMessage()));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testPartitionCreationWithMaxNumPartitionsPerRequest(String str) throws ExecutionException, InterruptedException {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        createAdminClient.createTopics(Arrays.asList(newTopic("topic", 5), newTopic("topic2", 5))).all().get();
        HashMap hashMap = new HashMap();
        hashMap.put("topic", NewPartitions.increaseTo(5 + 100 + 1));
        hashMap.put("topic2", NewPartitions.increaseTo(5 + 100));
        CreatePartitionsResult createPartitions = createAdminClient.createPartitions(hashMap);
        createPartitions.values().get("topic2").get();
        Throwable cause = ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            createPartitions.values().get("topic").get();
        })).getCause();
        Assertions.assertEquals(cause.getClass(), PolicyViolationException.class);
        Assertions.assertTrue(cause.getMessage().contains(Integer.toString(100)), String.format("Expected the error message to contain the max partition count - instead, it was: %s", cause.getMessage()));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testPartitionCreationWithInvalidNumPartitions(String str) throws ExecutionException, InterruptedException {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        createAdminClient.createTopics(Collections.singletonList(newTopic("topic", 5))).all().get();
        Iterator it = Arrays.asList(5, Integer.valueOf(5 - 1), 0, -10, Integer.MIN_VALUE).iterator();
        while (it.hasNext()) {
            CreatePartitionsResult createPartitions = createAdminClient.createPartitions(Collections.singletonMap("topic", NewPartitions.increaseTo(((Integer) it.next()).intValue())));
            Assertions.assertEquals(((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
                createPartitions.all().get();
            })).getCause().getClass(), InvalidPartitionsException.class);
        }
    }

    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidTopicCreationWithAutoTopicCreation(String str) throws Throwable {
        setUp();
        Properties nodeProps = nodeProps();
        nodeProps.put(KafkaConfig.AutoCreateTopicsEnableProp(), "true");
        createPhysicalAndLogicalClusters(nodeProps);
        HashSet hashSet = new HashSet();
        hashSet.add(new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(this.logicalCluster1.user(11).unprefixedKafkaPrincipal().toString(), "*", AclOperation.CREATE, AclPermissionType.ALLOW)));
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        createAdminClient.createAcls(hashSet).all().get();
        Assertions.assertEquals(hashSet, describeAllAcls(createAdminClient));
        KafkaProducer<String, String> createProducer = this.testHarness.createProducer(this.logicalCluster1.user(11), SecurityProtocol.SASL_PLAINTEXT);
        Throwable th = null;
        try {
            try {
                Assertions.assertThrows(AuthorizationException.class, () -> {
                    KafkaTestUtils.sendRecords(createProducer, ".", 0, 10);
                });
                if (createProducer != null) {
                    if (0 == 0) {
                        createProducer.close();
                        return;
                    }
                    try {
                        createProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createProducer != null) {
                if (th != null) {
                    try {
                        createProducer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createProducer.close();
                }
            }
            throw th4;
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateTopicPolicyMaxPartitionPerTenantIsDynamicallyUpdated(String str) throws Exception {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        AdminClient superAdminClient = this.physicalCluster.superAdminClient();
        Collection<NewTopic> newTopics = newTopics(512);
        createAdminClient.createTopics(newTopics).all().get();
        Set set = (Set) newTopics.stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toSet());
        TestUtils.waitForCondition(() -> {
            return createAdminClient.listTopics().names().get().containsAll(set);
        }, String.format("Could not list topics %s in time", set));
        TestUtils.assertFutureThrows(createAdminClient.createTopics(Collections.singletonList(newTopic("over-limit", 1))).all(), PolicyViolationException.class, String.format("You may not create more than 0 new partitions. Adding the requested number of partitions will exceed %d total partitions. Currently, there are %d total topic partitions", 512, 512));
        superAdminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry(ConfluentConfigs.MAX_PARTITIONS_PER_TENANT_CONFIG, String.valueOf(1024)), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.retryOnExceptionWithTimeout(() -> {
            createAdminClient.createTopics(Collections.singletonList(newTopic("over-limit", 1))).all().get();
        });
        this.testHarness.shutdownBrokers();
        this.testHarness.startBrokers(nodeProps(), nodeProps());
        AdminClient createAdminClient2 = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        TestUtils.retryOnExceptionWithTimeout(() -> {
            createAdminClient2.createTopics(Collections.singletonList(newTopic("over-limit-again", 1))).all().get();
        });
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateTopicPolicyMaxPartitionPerClusterInvalidValue(String str) {
        setUp();
        createPhysicalAndLogicalClusters();
        TestUtils.assertFutureThrows(this.physicalCluster.superAdminClient().incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry(ConfluentConfigs.MAX_PARTITIONS_PER_CLUSTER_CONFIG, "not a number"), AlterConfigOp.OpType.SET)))).all(), InvalidRequestException.class);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateTopicPolicyMaxPartitionPerClusterIsDynamicallyUpdated(String str) throws Exception {
        setUp();
        createPhysicalAndLogicalClusters();
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        AdminClient superAdminClient = this.physicalCluster.superAdminClient();
        superAdminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry(ConfluentConfigs.MAX_PARTITIONS_PER_CLUSTER_CONFIG, String.valueOf(1)), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.assertFutureThrows(createAdminClient.createTopics(newTopics(1 + 1)).all(), PolicyViolationException.class, String.format("You may not create more than %d new partitions. Adding the requested number of partitions will exceed cluster limits.", 1));
        superAdminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry(ConfluentConfigs.MAX_PARTITIONS_PER_CLUSTER_CONFIG, String.valueOf(Integer.MAX_VALUE)), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.retryOnExceptionWithTimeout(() -> {
            createAdminClient.createTopics(Collections.singletonList(newTopic("over-limit", 1))).all().get();
        });
        this.testHarness.shutdownBrokers();
        this.testHarness.startBrokers(nodeProps(), nodeProps());
        AdminClient createAdminClient2 = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        TestUtils.retryOnExceptionWithTimeout(() -> {
            createAdminClient2.createTopics(Collections.singletonList(newTopic("over-limit-again", 1))).all().get();
        });
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateTopicWithCellsEnabled(String str) throws Exception {
        setUp();
        Properties nodeProps = nodeProps();
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_CONFIG, true);
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_IMPLICIT_CREATION_CONFIG, true);
        nodeProps.put(ConfluentConfigs.DEFAULT_PARTITION_PLACEMENT_STRATEGY_CONFIG, PartitionPlacementStrategy.TENANT_IN_CELL.code().toString());
        nodeProps.put(ConfluentConfigs.MIN_CELL_SIZE_CONFIG, (short) 1);
        nodeProps.put(ConfluentConfigs.MAX_CELL_SIZE_CONFIG, (short) 2);
        nodeProps.put("confluent.plugins.topic.policy.replication.factor", (short) 2);
        nodeProps.put("default.replication.factor", 2);
        createPhysicalAndLogicalClusters(nodeProps);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        String str2 = "testtopicname";
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopicname", 1, (short) 2))).all().get();
        TestUtils.waitForCondition(() -> {
            return createAdminClient.describeTopics(Collections.singleton(str2)).allTopicNames().get().containsKey(str2);
        }, "Could not describe topic testtopicname");
        TopicDescription topicDescription = createAdminClient.describeTopics(Collections.singleton("testtopicname")).allTopicNames().get().get("testtopicname");
        List<Node> replicas = topicDescription.partitions().get(0).replicas();
        Set set = (Set) replicas.stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet());
        HashSet hashSet = new HashSet(Arrays.asList(0, 1));
        Assertions.assertEquals(1, topicDescription.partitions().size());
        Assertions.assertEquals(2, replicas.size());
        Assertions.assertEquals(hashSet, set);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateTopicWithCellsEnabledSingleBroker(String str) throws Exception {
        setUp();
        Properties nodeProps = nodeProps();
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_CONFIG, true);
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_IMPLICIT_CREATION_CONFIG, true);
        nodeProps.put(ConfluentConfigs.DEFAULT_PARTITION_PLACEMENT_STRATEGY_CONFIG, PartitionPlacementStrategy.TENANT_IN_CELL.code().toString());
        nodeProps.put(ConfluentConfigs.MIN_CELL_SIZE_CONFIG, (short) 1);
        nodeProps.put(ConfluentConfigs.MAX_CELL_SIZE_CONFIG, (short) 1);
        nodeProps.put("confluent.plugins.topic.policy.replication.factor", (short) 1);
        createPhysicalAndLogicalClusters(nodeProps);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.user(9));
        String str2 = "testtopicname";
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopicname", 1, (short) 1))).all().get();
        TestUtils.waitForCondition(() -> {
            return createAdminClient.describeTopics(Collections.singleton(str2)).allTopicNames().get().containsKey(str2);
        }, "Could not describe topic testtopicname");
        TopicDescription topicDescription = createAdminClient.describeTopics(Collections.singleton("testtopicname")).allTopicNames().get().get("testtopicname");
        List<Node> replicas = topicDescription.partitions().get(0).replicas();
        Set set = (Set) replicas.stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet());
        Assertions.assertEquals(1, topicDescription.partitions().size());
        Assertions.assertEquals(1, replicas.size());
        Assertions.assertTrue(set.contains(0) || set.contains(1));
        String str3 = "testtopicnametwo";
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopicnametwo", 1, (short) 1))).all().get();
        TestUtils.waitForCondition(() -> {
            return createAdminClient.describeTopics(Collections.singleton(str3)).allTopicNames().get().containsKey(str3);
        }, "Could not describe topic testtopicnametwo");
        TopicDescription topicDescription2 = createAdminClient.describeTopics(Collections.singleton("testtopicnametwo")).allTopicNames().get().get("testtopicnametwo");
        List<Node> replicas2 = topicDescription2.partitions().get(0).replicas();
        Set set2 = (Set) replicas2.stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet());
        Assertions.assertEquals(1, topicDescription2.partitions().size());
        Assertions.assertEquals(1, replicas2.size());
        Assertions.assertEquals(set, set2);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateTopicWithCellsEnabledMultipleBrokers(String str) throws Exception {
        setUp(10, Collections.emptyList());
        Properties nodeProps = nodeProps();
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_CONFIG, true);
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_IMPLICIT_CREATION_CONFIG, true);
        nodeProps.put(ConfluentConfigs.DEFAULT_PARTITION_PLACEMENT_STRATEGY_CONFIG, PartitionPlacementStrategy.TENANT_IN_CELL.code().toString());
        nodeProps.put(ConfluentConfigs.MIN_CELL_SIZE_CONFIG, (short) 1);
        nodeProps.put(ConfluentConfigs.CELL_SIZE_CONFIG, (short) 1);
        nodeProps.put(ConfluentConfigs.MAX_CELL_SIZE_CONFIG, (short) 1);
        nodeProps.put("confluent.plugins.topic.policy.replication.factor", (short) 1);
        createPhysicalAndLogicalClusters(nodeProps);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.user(9));
        String str2 = "testtopicname";
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopicname", 1, (short) 1))).all().get();
        TestUtils.waitForCondition(() -> {
            return createAdminClient.describeTopics(Collections.singleton(str2)).allTopicNames().get().containsKey(str2);
        }, "Could not describe topic testtopicname");
        TopicDescription topicDescription = createAdminClient.describeTopics(Collections.singleton("testtopicname")).allTopicNames().get().get("testtopicname");
        List<Node> replicas = topicDescription.partitions().get(0).replicas();
        Set set = (Set) replicas.stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet());
        Assertions.assertEquals(1, topicDescription.partitions().size());
        Assertions.assertEquals(1, replicas.size());
        for (int i = 0; i < 5; i++) {
            String str3 = "testtopicnametwo" + i;
            createAdminClient.createTopics(Collections.singletonList(new NewTopic(str3, 1, (short) 1))).all().get();
            TestUtils.waitForCondition(() -> {
                return createAdminClient.describeTopics(Collections.singleton(str3)).allTopicNames().get().containsKey(str3);
            }, "Could not describe topic " + str3);
            TopicDescription topicDescription2 = createAdminClient.describeTopics(Collections.singleton(str3)).allTopicNames().get().get(str3);
            List<Node> replicas2 = topicDescription2.partitions().get(0).replicas();
            Set set2 = (Set) replicas2.stream().map((v0) -> {
                return v0.id();
            }).collect(Collectors.toSet());
            Assertions.assertEquals(1, topicDescription2.partitions().size());
            Assertions.assertEquals(1, replicas2.size());
            Assertions.assertEquals(set, set2);
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateTopicWithCellsEnabledMultipleBrokersMultipleTenants(String str) throws Exception {
        setUp(10, Collections.emptyList());
        Properties nodeProps = nodeProps();
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_CONFIG, true);
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_IMPLICIT_CREATION_CONFIG, true);
        nodeProps.put(ConfluentConfigs.DEFAULT_PARTITION_PLACEMENT_STRATEGY_CONFIG, PartitionPlacementStrategy.TENANT_IN_CELL.code().toString());
        nodeProps.put(ConfluentConfigs.MIN_CELL_SIZE_CONFIG, (short) 1);
        nodeProps.put(ConfluentConfigs.CELL_SIZE_CONFIG, (short) 1);
        nodeProps.put(ConfluentConfigs.MAX_CELL_SIZE_CONFIG, (short) 1);
        nodeProps.put("confluent.plugins.topic.policy.replication.factor", (short) 1);
        nodeProps.put("default.replication.factor", 1);
        createPhysicalAndLogicalClusters(nodeProps);
        for (AdminClient adminClient : Arrays.asList(this.testHarness.createAdminClient(this.logicalCluster1.user(9)), this.testHarness.createAdminClient(this.logicalCluster2.user(22)))) {
            String str2 = "testtopicname";
            adminClient.createTopics(Collections.singletonList(new NewTopic("testtopicname", 1, (short) 1))).all().get();
            TestUtils.waitForCondition(() -> {
                return adminClient.describeTopics(Collections.singleton(str2)).allTopicNames().get().containsKey(str2);
            }, "Could not describe topic testtopicname");
            TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton("testtopicname")).allTopicNames().get().get("testtopicname");
            List<Node> replicas = topicDescription.partitions().get(0).replicas();
            Set set = (Set) replicas.stream().map((v0) -> {
                return v0.id();
            }).collect(Collectors.toSet());
            Assertions.assertEquals(1, topicDescription.partitions().size());
            Assertions.assertEquals(1, replicas.size());
            for (int i = 0; i < 5; i++) {
                String str3 = "testtopicnametwo" + i;
                adminClient.createTopics(Collections.singletonList(new NewTopic(str3, 1, (short) 1))).all().get();
                TestUtils.waitForCondition(() -> {
                    return adminClient.describeTopics(Collections.singleton(str3)).allTopicNames().get().containsKey(str3);
                }, "Could not describe topic " + str3);
                TopicDescription topicDescription2 = adminClient.describeTopics(Collections.singleton(str3)).allTopicNames().get().get(str3);
                List<Node> replicas2 = topicDescription2.partitions().get(0).replicas();
                Set set2 = (Set) replicas2.stream().map((v0) -> {
                    return v0.id();
                }).collect(Collectors.toSet());
                Assertions.assertEquals(1, topicDescription2.partitions().size());
                Assertions.assertEquals(1, replicas2.size());
                Assertions.assertEquals(set, set2);
            }
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateTopicWithCellsEnableNoAvailableCell(String str) {
        setUp();
        Properties nodeProps = nodeProps();
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_CONFIG, true);
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_IMPLICIT_CREATION_CONFIG, true);
        nodeProps.put(ConfluentConfigs.DEFAULT_PARTITION_PLACEMENT_STRATEGY_CONFIG, PartitionPlacementStrategy.TENANT_IN_CELL.code().toString());
        nodeProps.put(ConfluentConfigs.CELL_SIZE_CONFIG, (short) 16);
        nodeProps.put(ConfluentConfigs.MIN_CELL_SIZE_CONFIG, (short) 16);
        nodeProps.put(ConfluentConfigs.MAX_CELL_SIZE_CONFIG, (short) 32);
        nodeProps.put("confluent.plugins.topic.policy.replication.factor", (short) 1);
        createPhysicalAndLogicalClusters(nodeProps);
        TestUtils.assertFutureThrows(this.testHarness.createAdminClient(this.logicalCluster1.user(9)).createTopics(Collections.singletonList(new NewTopic("testtopicname", 1, (short) 1))).all(), ResourceNotFoundException.class);
    }

    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateTopicWithCellsDynamicallyDisabled(String str) throws Exception {
        setUp();
        Properties nodeProps = nodeProps();
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_CONFIG, true);
        nodeProps.put(ConfluentConfigs.ENABLE_CELLS_IMPLICIT_CREATION_CONFIG, true);
        nodeProps.put(ConfluentConfigs.DEFAULT_PARTITION_PLACEMENT_STRATEGY_CONFIG, PartitionPlacementStrategy.TENANT_IN_CELL.code().toString());
        nodeProps.put(ConfluentConfigs.CELL_SIZE_CONFIG, (short) 16);
        nodeProps.put(ConfluentConfigs.MIN_CELL_SIZE_CONFIG, (short) 16);
        nodeProps.put(ConfluentConfigs.MAX_CELL_SIZE_CONFIG, (short) 32);
        nodeProps.put("confluent.plugins.topic.policy.replication.factor", (short) 1);
        createPhysicalAndLogicalClusters(nodeProps);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.user(9));
        this.physicalCluster.superAdminClient().incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry(ConfluentConfigs.ENABLE_CELLS_CONFIG, "false"), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.waitForCondition(() -> {
            return !this.physicalCluster.superConfluentAdmin().describeCells(Collections.emptyList()).value().get().cellsEnabled();
        }, "Could not wait for cells to be disabled");
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopicname", 1, (short) 1))).all().get();
        TopicDescription topicDescription = createAdminClient.describeTopics(Collections.singleton("testtopicname")).allTopicNames().get().get("testtopicname");
        List<Node> replicas = topicDescription.partitions().get(0).replicas();
        Set set = (Set) replicas.stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet());
        Assertions.assertEquals(1, topicDescription.partitions().size());
        Assertions.assertEquals(1, replicas.size());
        Assertions.assertTrue(set.contains(0) || set.contains(1));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMaxReplicasPerBroker(String str) throws Exception {
        setUp(3, Collections.emptyList());
        Properties nodeProps = nodeProps();
        nodeProps.put(ConfluentConfigs.MAX_REPLICAS_PER_BROKER_CONFIG, 3);
        nodeProps.put("confluent.plugins.topic.policy.replication.factor", (short) 3);
        createPhysicalAndLogicalClusters(nodeProps);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.user(9));
        AdminClient superAdminClient = this.physicalCluster.superAdminClient();
        TestUtils.assertFutureThrows(createAdminClient.createTopics(Collections.singletonList(new NewTopic("topic-3", 10, (short) 3))).all(), PolicyViolationException.class, "The cluster has reached the maximum number of replicas per broker.");
        createAdminClient.createTopics(new ArrayList(Arrays.asList(new NewTopic("topic-0", 1, (short) 3), new NewTopic("topic-1", 1, (short) 3), new NewTopic("topic-2", 1, (short) 3)))).all().get();
        TestUtils.assertFutureThrows(createAdminClient.createTopics(Collections.singletonList(new NewTopic("topic-3", 1, (short) 3))).all(), PolicyViolationException.class, "The cluster has reached the maximum number of replicas per broker.");
        superAdminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(new ConfigEntry(ConfluentConfigs.MAX_REPLICAS_PER_BROKER_CONFIG, String.valueOf(3 + 1)), AlterConfigOp.OpType.SET)))).all().get();
        TestUtils.retryOnExceptionWithTimeout(() -> {
            createAdminClient.createTopics(Collections.singletonList(new NewTopic("topic-3", 1, (short) 3))).all().get();
        });
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMaxReplicasPerBrokerWithPartitionReassignment(String str) throws Exception {
        setUp(9, Collections.emptyList());
        Properties nodeProps = nodeProps();
        nodeProps.put(ConfluentConfigs.MAX_REPLICAS_PER_BROKER_CONFIG, 3);
        nodeProps.put("confluent.plugins.topic.policy.replication.factor", (short) 1);
        createPhysicalAndLogicalClusters(nodeProps);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.user(9));
        AdminClient superAdminClient = this.physicalCluster.superAdminClient();
        HashMap hashMap = new HashMap();
        hashMap.put(0, Collections.singletonList(1));
        createAdminClient.createTopics(new ArrayList(Arrays.asList(new NewTopic("topic-0", hashMap), new NewTopic("topic-1", hashMap), new NewTopic("topic-2", hashMap)))).all().get();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new TopicPartition(this.logicalCluster1.logicalClusterId() + "_topic-0", 0), Optional.of(new NewPartitionReassignment(Collections.singletonList(0))));
        hashMap2.put(new TopicPartition(this.logicalCluster1.logicalClusterId() + "_topic-1", 0), Optional.of(new NewPartitionReassignment(Collections.singletonList(1))));
        hashMap2.put(new TopicPartition(this.logicalCluster1.logicalClusterId() + "_topic-2", 0), Optional.of(new NewPartitionReassignment(Collections.singletonList(2))));
        superAdminClient.alterPartitionReassignments(hashMap2).all().get();
        TestUtils.retryOnExceptionWithTimeout(() -> {
            createAdminClient.createTopics(Collections.singletonList(new NewTopic("topic-3", 1, (short) 1))).all().get();
        });
    }

    private Collection<NewTopic> newTopics(int i) {
        ArrayList arrayList = new ArrayList();
        int i2 = 0;
        while (i > 0) {
            int min = Math.min(i, 100);
            i -= min;
            arrayList.add(newTopic("test-" + i2, min));
            i2++;
        }
        return arrayList;
    }

    private NewTopic newTopic(String str, int i) {
        return new NewTopic(str, i, (short) 1);
    }
}
