/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.assignor;

import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.TestCluster;
import io.confluent.kafka.multitenant.assignor.ClusterMetadata;
import io.confluent.kafka.multitenant.assignor.RackMetadata;
import io.confluent.kafka.multitenant.assignor.TenantPartitionAssignor;
import io.confluent.kafka.multitenant.assignor.TenantPartitionAssignorBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.assignor.TopicReplicaAssignor;
import kafka.common.TopicPlacement;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TenantPartitionAssignorTest {
    private TestCluster testCluster = new TestCluster();
    private TenantPartitionAssignorBuilder partitionAssignorBuilder;
    private static final int MAX_PARTITIONS_PER_CREATION = 1000;
    private static final String TENANT_1 = "tenant1";
    private static final String TENANT_2 = "tenant2";
    private static final String TENANT_3 = "tenant3";
    private static final String CLUSTER_ID = "lkc-tenant";
    private static final MultiTenantPrincipal TENANT_PRINCIPAL = new MultiTenantPrincipal("tenant1", new TenantMetadata("tenant1", "lkc-tenant"));

    @BeforeEach
    public void setUp() {
        this.partitionAssignorBuilder = new TenantPartitionAssignorBuilder();
        this.partitionAssignorBuilder.configure(Collections.singletonMap("confluent.multitenant.max.partitions.per.request", 1000));
    }

    @Test
    public void testMaxPartitionAssignment() {
        int invalidPartitions = 1001;
        this.testCluster.addNode(0, "rack1", "cell1");
        this.testCluster.addNode(1, "rack2", "cell1");
        this.testCluster.addNode(2, "rack3", "cell1");
        this.testCluster.addNode(3, "rack1", "cell2");
        this.testCluster.addNode(4, "rack2", "cell2");
        this.testCluster.addNode(5, "rack3", "cell2");
        String existingTopic = "tenant1_topicA";
        this.createTopic(existingTopic, 9, 1);
        Assertions.assertThrows(InvalidRequestException.class, () -> this.assignor().computeAssignmentForNewTopic(new TopicReplicaAssignor.NewPartitions("tenant1_topicB", invalidPartitions, 0, 1), Optional.empty(), Collections.emptySet()));
        Assertions.assertThrows(InvalidRequestException.class, () -> this.assignor().computeAssignmentForNewTopic(new TopicReplicaAssignor.NewPartitions(existingTopic, invalidPartitions, 9, 1), Optional.empty(), Collections.emptySet()));
    }

    @Test
    public void testNoClusterMetadataReturnsEmptyAssignment() {
        this.partitionAssignorBuilder.updateClusterMetadata(null);
        TenantPartitionAssignor assignor = (TenantPartitionAssignor)this.partitionAssignorBuilder.maybeBuildAssignor(Optional.of(TENANT_PRINCIPAL)).get();
        Optional newTopicAssignment = assignor.computeAssignmentForNewTopic(new TopicReplicaAssignor.NewPartitions("tenant1_topicB", 5, 0, 1), Optional.empty(), Collections.emptySet());
        Assertions.assertFalse((boolean)newTopicAssignment.isPresent(), (String)"Expected the assignor to refuse computing an assignment when there is no cluster given");
        Optional existingTopicAssignment = assignor.computeAssignmentForExistingTopic(new TopicReplicaAssignor.NewPartitions("tenant1_topicB", 5, 0, 1), Optional.empty(), Collections.emptySet());
        Assertions.assertFalse((boolean)existingTopicAssignment.isPresent(), (String)"Expected the assignor to refuse computing an assignment when there is no cluster given");
    }

    @Test
    public void testTopicPlacementConstraintsReturnEmptyAssignment() {
        this.partitionAssignorBuilder.updateClusterMetadata(this.testCluster.cluster());
        TenantPartitionAssignor assignor = (TenantPartitionAssignor)this.partitionAssignorBuilder.maybeBuildAssignor(Optional.of(TENANT_PRINCIPAL)).get();
        String topicPlacementString = "{\"version\":1,\"replicas\":[{\"count\": 2, \"constraints\":{\"rack\":\"0\"}}], \"observers\": [{\"count\": 2, \"constraints\":{\"rack\":\"1\"}}]}";
        Optional placementOpt = TopicPlacement.parse((String)topicPlacementString);
        Assertions.assertTrue((boolean)placementOpt.isPresent(), (String)"Expected the topic placement to have parsed correctly");
        Optional newTopicAssignment = assignor.computeAssignmentForNewTopic(new TopicReplicaAssignor.NewPartitions("tenant1_topicB", 5, 0, 1), placementOpt, Collections.emptySet());
        Assertions.assertFalse((boolean)newTopicAssignment.isPresent(), (String)"Expected the assignor to refuse computing an assignment when there are topic placement constraints");
        Optional existingTopicAssignment = assignor.computeAssignmentForExistingTopic(new TopicReplicaAssignor.NewPartitions("tenant1_topicB", 5, 0, 1), placementOpt, Collections.emptySet());
        Assertions.assertFalse((boolean)existingTopicAssignment.isPresent(), (String)"Expected the assignor to refuse computing an assignment when there are topic placement constraints");
    }

    @Test
    public void testRackUnawareAssignment() {
        String topic = "tenant1_topicA";
        this.addNodes(5, 0);
        this.createTopic(topic, 10, 3);
        List<List<Integer>> expectedAssignment = Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 3), Arrays.asList(2, 3, 4), Arrays.asList(3, 4, 0), Arrays.asList(4, 0, 1), Arrays.asList(0, 2, 3), Arrays.asList(1, 3, 4), Arrays.asList(2, 4, 0), Arrays.asList(3, 0, 1), Arrays.asList(4, 1, 2));
        this.verifyAssignment(expectedAssignment, this.testCluster.cluster().partitionsForTopic(topic));
        this.verifyAssignmentsAreBalanced(0, 0, 0);
    }

    @Test
    public void testTopicCreationUnsatisfiableRfDueToExclusion() {
        String topic = "tenant1_topicA";
        HashSet<Integer> brokerIds = new HashSet<Integer>(Arrays.asList(0, 1, 2, 3));
        short replicationFactor = 3;
        for (Integer brokerId : brokerIds) {
            this.testCluster.addNode(brokerId);
        }
        Optional assignmentOpt = this.assignor().computeAssignmentForNewTopic(new TopicReplicaAssignor.NewPartitions(topic, 3, 0, 1), Optional.empty(), brokerIds);
        Assertions.assertFalse((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to be unable to satisfy RF upon topic creation and therefore not create an assignment");
        assignmentOpt = this.assignor().computeAssignmentForNewTopic(new TopicReplicaAssignor.NewPartitions(topic, 3, 0, replicationFactor), Optional.empty(), new HashSet<Integer>(Arrays.asList(0, 1)));
        Assertions.assertFalse((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to be unable to satisfy RF upon topic creation and therefore not create an assignment");
    }

    @Test
    public void testPartitionCreationUnsatisfiableRfDueToExclusion() {
        String topic = "tenant1_topicA";
        HashSet<Integer> brokerIds = new HashSet<Integer>(Arrays.asList(0, 1, 2, 3));
        short replicationFactor = 3;
        for (Integer brokerId : brokerIds) {
            this.testCluster.addNode(brokerId);
        }
        this.createTopic(topic, 3, replicationFactor);
        TopicReplicaAssignor.NewPartitions newPartitions = new TopicReplicaAssignor.NewPartitions(topic, 6, 3, replicationFactor);
        Optional assignmentOpt = this.assignor().computeAssignmentForExistingTopic(newPartitions, Optional.empty(), brokerIds);
        Assertions.assertFalse((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to be unable to satisfy RF upon new partition creation and therefore not create an assignment");
        assignmentOpt = this.assignor().computeAssignmentForExistingTopic(newPartitions, Optional.empty(), new HashSet<Integer>(Arrays.asList(0, 1)));
        Assertions.assertFalse((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to be unable to satisfy RF upon new partition creation and therefore not create an assignment");
    }

    @Test
    public void testRackUnawareAssignmentOneNodeCluster() {
        String topic = "tenant1_topicA";
        this.addNodes(1, 0);
        this.createTopic(topic, 5, 1);
        List<List<Integer>> expectedAssignment = Arrays.asList(Collections.singletonList(0), Collections.singletonList(0), Collections.singletonList(0), Collections.singletonList(0), Collections.singletonList(0));
        this.verifyAssignment(expectedAssignment, this.testCluster.cluster().partitionsForTopic(topic));
    }

    @Test
    public void testRackAwareAssignment() {
        String topic = "tenant1_topicA";
        this.testCluster.addNode(0, "rack1");
        this.testCluster.addNode(1, "rack3");
        this.testCluster.addNode(2, "rack3");
        this.testCluster.addNode(3, "rack2");
        this.testCluster.addNode(4, "rack2");
        this.testCluster.addNode(5, "rack1");
        this.createTopic(topic, 12, 3);
        List<List<Integer>> expectedAssignment = Arrays.asList(Arrays.asList(0, 1, 3), Arrays.asList(1, 3, 5), Arrays.asList(3, 5, 2), Arrays.asList(5, 2, 4), Arrays.asList(2, 4, 0), Arrays.asList(4, 0, 1), Arrays.asList(0, 2, 4), Arrays.asList(1, 4, 0), Arrays.asList(3, 0, 1), Arrays.asList(5, 1, 3), Arrays.asList(2, 3, 5), Arrays.asList(4, 5, 2));
        this.verifyAssignment(expectedAssignment, this.testCluster.cluster().partitionsForTopic(topic));
        this.verifyAssignmentsAreBalanced(0, 0, 0);
    }

    @Test
    public void testRackAwareCreateTopicAssignmentRespectsExclusion() {
        String topic = "tenant1_topicA";
        this.testCluster.addNode(0, "rack1");
        this.testCluster.addNode(1, "rack3");
        this.testCluster.addNode(2, "rack3");
        this.testCluster.addNode(3, "rack2");
        this.testCluster.addNode(4, "rack2");
        this.testCluster.addNode(5, "rack1");
        HashSet<Integer> nonExcludedBrokers = new HashSet<Integer>(Arrays.asList(0, 1, 2, 3, 4, 5));
        HashSet<Integer> excludedRack3Brokers = new HashSet<Integer>(Arrays.asList(1, 2));
        nonExcludedBrokers.removeAll(excludedRack3Brokers);
        short replicationFactor = 3;
        Optional assignmentOpt = this.assignor().computeAssignmentForNewTopic(new TopicReplicaAssignor.NewPartitions(topic, 6, 0, replicationFactor), Optional.empty(), excludedRack3Brokers);
        Assertions.assertTrue((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to have been able to compute an assignment for a new topic");
        Set allBrokerReplicas = ((List)assignmentOpt.get()).stream().flatMap(Collection::stream).collect(Collectors.toSet());
        Assertions.assertEquals(nonExcludedBrokers, allBrokerReplicas);
    }

    @Test
    public void testRackAwareCreatePartitionsAssignmentRespectsExclusion() {
        String topic = "tenant1_topicA";
        short replicationFactor = 3;
        this.testCluster.addNode(0, "rack1");
        this.testCluster.addNode(1, "rack3");
        this.testCluster.addNode(2, "rack3");
        this.testCluster.addNode(3, "rack2");
        this.testCluster.addNode(4, "rack2");
        this.testCluster.addNode(5, "rack1");
        this.createTopic(topic, 3, replicationFactor);
        HashSet<Integer> nonExcludedBrokers = new HashSet<Integer>(Arrays.asList(0, 1, 2, 3, 4, 5));
        HashSet<Integer> excludedRack3Brokers = new HashSet<Integer>(Arrays.asList(1, 2));
        nonExcludedBrokers.removeAll(excludedRack3Brokers);
        Optional assignmentOpt = this.assignor().computeAssignmentForExistingTopic(new TopicReplicaAssignor.NewPartitions(topic, 15, 3, replicationFactor), Optional.empty(), excludedRack3Brokers);
        Assertions.assertTrue((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to have been able to compute an assignment for new partitions");
        Set newPartitionBrokerReplicas = ((List)assignmentOpt.get()).stream().flatMap(Collection::stream).collect(Collectors.toSet());
        Assertions.assertEquals(nonExcludedBrokers, newPartitionBrokerReplicas);
    }

    @Test
    public void testCellAwareAndRackUnawareAssignment() {
        String topic = "tenant1_topicA";
        this.testCluster.addNode(0, null, "cell1");
        this.testCluster.addNode(1, null, "cell1");
        this.testCluster.addNode(2, null, "cell1");
        this.testCluster.addNode(3, null, "cell2");
        this.testCluster.addNode(4, null, "cell2");
        this.testCluster.addNode(5, null, "cell2");
        this.testCluster.addNode(6, null, "cell3");
        this.testCluster.addNode(7, null, "cell3");
        this.testCluster.addNode(8, null, "cell3");
        this.createTopic(topic, 9, 3);
        List<List<Integer>> expectedAssignment = Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 0), Arrays.asList(2, 0, 1), Arrays.asList(3, 4, 5), Arrays.asList(4, 5, 3), Arrays.asList(5, 3, 4), Arrays.asList(6, 7, 8), Arrays.asList(7, 8, 6), Arrays.asList(8, 6, 7));
        this.verifyAssignment(expectedAssignment, this.testCluster.cluster().partitionsForTopic(topic));
        this.verifyAssignmentsAreBalanced(0, 0, 0);
    }

    @Test
    public void testCellAndRackAwareAssignment() {
        String topic = "tenant1_topicA";
        this.testCluster.addNode(0, "rack1", "cell1");
        this.testCluster.addNode(1, "rack2", "cell1");
        this.testCluster.addNode(2, "rack3", "cell1");
        this.testCluster.addNode(3, "rack1", "cell2");
        this.testCluster.addNode(4, "rack2", "cell2");
        this.testCluster.addNode(5, "rack3", "cell2");
        this.testCluster.addNode(6, "rack1", "cell3");
        this.testCluster.addNode(7, "rack2", "cell3");
        this.testCluster.addNode(8, "rack3", "cell3");
        this.createTopic(topic, 9, 3);
        List<List<Integer>> expectedAssignment = Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 0), Arrays.asList(2, 0, 1), Arrays.asList(3, 4, 5), Arrays.asList(4, 5, 3), Arrays.asList(5, 3, 4), Arrays.asList(6, 7, 8), Arrays.asList(7, 8, 6), Arrays.asList(8, 6, 7));
        this.verifyAssignment(expectedAssignment, this.testCluster.cluster().partitionsForTopic(topic));
        this.verifyAssignmentsAreBalanced(0, 0, 0);
    }

    @Test
    public void testCellAwareAssignmentWithOneCellIsEquivalentToRackUnawareAssignment() {
        String topic = "tenant1_topicA";
        this.testCluster.addNode(0);
        this.testCluster.addNode(1);
        this.testCluster.addNode(2);
        this.testCluster.addNode(3);
        this.testCluster.addNode(4);
        this.createTopic(topic, 10, 3);
        List expectedAssignment = this.testCluster.cluster().partitionsForTopic(topic);
        this.testCluster.addNode(0, null, "cell1");
        this.testCluster.addNode(1, null, "cell1");
        this.testCluster.addNode(2, null, "cell1");
        this.testCluster.addNode(3, null, "cell1");
        this.testCluster.addNode(4, null, "cell1");
        this.createTopic(topic, 10, 3);
        List actualAssignment = this.testCluster.cluster().partitionsForTopic(topic);
        this.verifyPartitionsAssignment(expectedAssignment, actualAssignment);
    }

    @Test
    public void testCellAwareAssignmentWithOneCellIsEquivalentToRackAwareAssignment() {
        String topic = "tenant1_topicA";
        this.testCluster.addNode(0, "rack1");
        this.testCluster.addNode(1, "rack3");
        this.testCluster.addNode(2, "rack3");
        this.testCluster.addNode(3, "rack2");
        this.testCluster.addNode(4, "rack1");
        this.testCluster.addNode(5, "rack2");
        this.createTopic(topic, 12, 3);
        List expectedAssignment = this.testCluster.cluster().partitionsForTopic(topic);
        this.testCluster.addNode(0, "rack1", "cell1");
        this.testCluster.addNode(1, "rack3", "cell1");
        this.testCluster.addNode(2, "rack3", "cell1");
        this.testCluster.addNode(3, "rack2", "cell1");
        this.testCluster.addNode(4, "rack1", "cell1");
        this.testCluster.addNode(5, "rack2", "cell1");
        this.createTopic(topic, 12, 3);
        List actualAssignment = this.testCluster.cluster().partitionsForTopic(topic);
        this.verifyPartitionsAssignment(expectedAssignment, actualAssignment);
    }

    @Test
    public void testCellAwareAssignmentFillEmptyCell() {
        String topicA = "tenant1_topicA";
        String topicB = "tenant1_topicB";
        this.testCluster.addNode(0, null, "cell1");
        this.testCluster.addNode(1, null, "cell1");
        this.testCluster.addNode(2, null, "cell1");
        this.testCluster.addNode(3, null, "cell1");
        this.testCluster.addNode(4, null, "cell2");
        this.testCluster.addNode(5, null, "cell2");
        this.testCluster.addNode(6, null, "cell2");
        this.testCluster.addNode(7, null, "cell2");
        this.testCluster.createPartitions(topicA, 0, Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 3), Arrays.asList(2, 3, 0), Arrays.asList(3, 0, 1)));
        this.createTopic(topicB, 4, 3);
        List<List<Integer>> expectedAssignment = Arrays.asList(Arrays.asList(4, 5, 6), Arrays.asList(5, 6, 7), Arrays.asList(6, 7, 4), Arrays.asList(7, 4, 5));
        this.verifyAssignment(expectedAssignment, this.testCluster.cluster().partitionsForTopic(topicB));
    }

    @Test
    public void testCellAwareAssignmentBalancesLeadersFillEmptyNodes() {
        String topicA = "tenant1_topicA";
        String topicB = "tenant1_topicB";
        this.testCluster.addNode(0, null, "cell1");
        this.testCluster.addNode(1, null, "cell1");
        this.testCluster.addNode(2, null, "cell1");
        this.testCluster.addNode(3, null, "cell1");
        this.testCluster.addNode(4, null, "cell2");
        this.testCluster.addNode(5, null, "cell2");
        this.testCluster.addNode(6, null, "cell2");
        this.testCluster.addNode(7, null, "cell2");
        this.testCluster.createPartitions(topicA, 0, Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 3), Arrays.asList(2, 3, 0), Arrays.asList(4, 5, 6), Arrays.asList(5, 6, 7), Arrays.asList(6, 7, 4)));
        this.createTopic(topicB, 2, 3);
        Assertions.assertEquals(Arrays.asList(3, 7), (Object)this.testCluster.cluster().leadersForTopic(topicB));
    }

    @Test
    public void testCellAndRackAwareAssignmentAddPartitionsCorrectly() {
        String topic = "tenant1_topicA";
        this.testCluster.addNode(0, "rack1", "cell1");
        this.testCluster.addNode(1, "rack2", "cell1");
        this.testCluster.addNode(2, "rack3", "cell1");
        this.testCluster.addNode(3, "rack1", "cell2");
        this.testCluster.addNode(4, "rack2", "cell2");
        this.testCluster.addNode(5, "rack3", "cell2");
        this.createTopic(topic, 3, 3);
        ArrayList<List<Integer>> expectedAssignment = new ArrayList<List<Integer>>();
        expectedAssignment.addAll(Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 0), Arrays.asList(2, 0, 1)));
        this.verifyAssignment(expectedAssignment, this.testCluster.cluster().partitionsForTopic(topic));
        this.addPartitions(topic, 3, 3);
        expectedAssignment.addAll(Arrays.asList(Arrays.asList(3, 4, 5), Arrays.asList(4, 5, 3), Arrays.asList(5, 3, 4)));
        this.verifyAssignment(expectedAssignment, this.testCluster.cluster().partitionsForTopic(topic));
        this.addPartitions(topic, 6, 3);
        expectedAssignment.addAll(Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 0), Arrays.asList(2, 0, 1)));
        this.verifyAssignment(expectedAssignment, this.testCluster.cluster().partitionsForTopic(topic));
        this.addPartitions(topic, 9, 3);
        expectedAssignment.addAll(Arrays.asList(Arrays.asList(3, 4, 5), Arrays.asList(4, 5, 3), Arrays.asList(5, 3, 4)));
        this.verifyAssignment(expectedAssignment, this.testCluster.cluster().partitionsForTopic(topic));
    }

    private TenantPartitionAssignor assignor() {
        this.partitionAssignorBuilder.updateClusterMetadata(this.testCluster.cluster());
        return (TenantPartitionAssignor)this.partitionAssignorBuilder.maybeBuildAssignor(Optional.of(TENANT_PRINCIPAL)).get();
    }

    @Test
    public void testCellAndRackAwareNewTopicAssignmentWithUnsatisfiableReplicationFactorAndExclusion() {
        short largeRf = 6;
        short validRf = 3;
        int cell1BrokerToExclude = 2;
        int cell2BrokerToExclude = 5;
        this.testCluster.addNode(0, "rack1", "cell1");
        this.testCluster.addNode(1, "rack2", "cell1");
        this.testCluster.addNode(cell1BrokerToExclude, "rack3", "cell1");
        this.testCluster.addNode(3, "rack1", "cell2");
        this.testCluster.addNode(4, "rack2", "cell2");
        this.testCluster.addNode(cell2BrokerToExclude, "rack3", "cell2");
        this.testCellAwareNewTopicAssignmentWithUnsatisfiableReplicationFactorAndExclusion(largeRf, validRf, cell1BrokerToExclude, cell2BrokerToExclude);
    }

    @Test
    public void testCellAndRackUnAwareNewTopicAssignmentWithUnsatisfiableReplicationFactorAndExclusion() {
        short largeRf = 6;
        short validRf = 3;
        int cell1BrokerToExclude = 2;
        int cell2BrokerToExclude = 5;
        this.testCluster.addNode(0, null, "cell1");
        this.testCluster.addNode(1, null, "cell1");
        this.testCluster.addNode(cell1BrokerToExclude, null, "cell1");
        this.testCluster.addNode(3, null, "cell2");
        this.testCluster.addNode(4, null, "cell2");
        this.testCluster.addNode(cell2BrokerToExclude, null, "cell2");
        this.testCellAwareNewTopicAssignmentWithUnsatisfiableReplicationFactorAndExclusion(largeRf, validRf, cell1BrokerToExclude, cell2BrokerToExclude);
    }

    private void testCellAwareNewTopicAssignmentWithUnsatisfiableReplicationFactorAndExclusion(short largeRf, short validRf, int cell1BrokerToExclude, int cell2BrokerToExclude) {
        String topic = "tenant1_topicA";
        String topic2 = "tenant1_topicB";
        String topic3 = "tenant1_topicC";
        Optional assignmentOpt = this.assignor().computeAssignmentForNewTopic(new TopicReplicaAssignor.NewPartitions(topic, 3, 0, largeRf), Optional.empty(), Collections.emptySet());
        Assertions.assertFalse((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to be unable to satisfy RF upon topic creation and therefore not create an assignment");
        assignmentOpt = this.assignor().computeAssignmentForNewTopic(new TopicReplicaAssignor.NewPartitions(topic, 3, 0, validRf), Optional.empty(), Collections.emptySet());
        Assertions.assertTrue((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to be able to satisfy a valid RF upon topic creation");
        Set<Integer> excludedBrokerIds = Collections.singleton(cell1BrokerToExclude);
        assignmentOpt = this.assignor().computeAssignmentForNewTopic(new TopicReplicaAssignor.NewPartitions(topic2, 3, 0, validRf), Optional.empty(), excludedBrokerIds);
        Assertions.assertTrue((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to be able to satisfy a valid RF upon topic creation");
        excludedBrokerIds = new HashSet<Integer>(Arrays.asList(cell1BrokerToExclude, cell2BrokerToExclude));
        assignmentOpt = this.assignor().computeAssignmentForNewTopic(new TopicReplicaAssignor.NewPartitions(topic3, 3, 0, validRf), Optional.empty(), excludedBrokerIds);
        Assertions.assertFalse((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to be unable to satisfy RF upon topic creation and therefore not create an assignment");
    }

    @Test
    public void testCellAndRackAwareExistingTopicAssignmentWithUnsatisfiableReplicationFactorAndExclusion() {
        short largeRf = 20;
        short smallRf = 1;
        short validRf = 3;
        String topic = "tenant1_topicA";
        int cell1BrokerToExclude = 2;
        int cell2BrokerToExclude = 5;
        this.testCluster.addNode(0, "rack1", "cell1");
        this.testCluster.addNode(1, "rack2", "cell1");
        this.testCluster.addNode(cell1BrokerToExclude, "rack3", "cell1");
        this.testCluster.addNode(3, "rack1", "cell2");
        this.testCluster.addNode(4, "rack2", "cell2");
        this.testCluster.addNode(cell2BrokerToExclude, "rack3", "cell2");
        this.createTopic(topic, 3, validRf);
        this.testCellAwareExistingTopicAssignmentWithUnsatisfiableReplicationFactorAndExclusion(topic, 3, largeRf, smallRf, validRf, cell1BrokerToExclude, cell2BrokerToExclude);
    }

    @Test
    public void testCellAndRackUnAwareExistingTopicAssignmentWithUnsatisfiableReplicationFactorAndExclusion() {
        short largeRf = 20;
        short smallRf = 1;
        short validRf = 3;
        String topic = "tenant1_topicA";
        int cell1BrokerToExclude = 2;
        int cell2BrokerToExclude = 5;
        this.testCluster.addNode(0, null, "cell1");
        this.testCluster.addNode(1, null, "cell1");
        this.testCluster.addNode(cell1BrokerToExclude, null, "cell1");
        this.testCluster.addNode(3, null, "cell2");
        this.testCluster.addNode(4, null, "cell2");
        this.testCluster.addNode(cell2BrokerToExclude, null, "cell2");
        this.createTopic(topic, 3, validRf);
        this.testCellAwareExistingTopicAssignmentWithUnsatisfiableReplicationFactorAndExclusion(topic, 3, largeRf, smallRf, validRf, cell1BrokerToExclude, cell2BrokerToExclude);
    }

    private void testCellAwareExistingTopicAssignmentWithUnsatisfiableReplicationFactorAndExclusion(String existingTopic, int startingNumPartitions, short largeRf, short smallRf, short validRf, int cell1BrokerToExclude, int cell2BrokerToExclude) {
        for (Short rf : Arrays.asList(largeRf, smallRf)) {
            Optional assignmentOpt = this.assignor().computeAssignmentForExistingTopic(new TopicReplicaAssignor.NewPartitions(existingTopic, 3, startingNumPartitions, rf.shortValue()), Optional.empty(), Collections.emptySet());
            Assertions.assertFalse((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to be unable to satisfy RF upon new partition creation and therefore not create an assignment");
        }
        int currentPartitionCount = startingNumPartitions;
        Optional assignmentOpt = this.assignor().computeAssignmentForExistingTopic(new TopicReplicaAssignor.NewPartitions(existingTopic, currentPartitionCount + 6, currentPartitionCount, validRf), Optional.empty(), Collections.emptySet());
        Assertions.assertTrue((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to be able to create an assignment with RF=3");
        assignmentOpt = this.assignor().computeAssignmentForExistingTopic(new TopicReplicaAssignor.NewPartitions(existingTopic, (currentPartitionCount += 6) + 6, currentPartitionCount, validRf), Optional.empty(), Collections.singleton(cell1BrokerToExclude));
        Assertions.assertTrue((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to be able to create a new assignments with RF=3 even if one cell has less");
        assignmentOpt = this.assignor().computeAssignmentForExistingTopic(new TopicReplicaAssignor.NewPartitions(existingTopic, (currentPartitionCount += 6) + 6, currentPartitionCount, validRf), Optional.empty(), new HashSet<Integer>(Arrays.asList(cell1BrokerToExclude, cell2BrokerToExclude)));
        Assertions.assertFalse((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to be unable to satisfy RF upon new partition creation after both cells have less than RF and therefore not create an assignment");
    }

    @Test
    public void testNodeOrderingByPartitionCount() {
        this.addNodes(3, 0);
        this.testCluster.setPartitionLeaders("tenant1_topicA", 0, 10, 1);
        this.testCluster.setPartitionLeaders("tenant1_topicB", 0, 2, 2);
        this.testCluster.setPartitionLeaders("tenant2_topicA", 0, 5, 2);
        this.testCluster.setPartitionLeaders("tenant2_topicB", 0, 2, 1);
        this.verifyTopicCreateNodeOrder(TENANT_1, 0, 2, 1);
        this.verifyTopicCreateNodeOrder(TENANT_2, 0, 1, 2);
        this.verifyPartitionAddNodeOrder(TENANT_1, "tenant1_topicA", 0, 2, 1);
        this.verifyPartitionAddNodeOrder(TENANT_1, "tenant1_topicB", 0, 1, 2);
        this.verifyPartitionAddNodeOrder(TENANT_2, "tenant2_topicA", 0, 1, 2);
        this.verifyPartitionAddNodeOrder(TENANT_2, "tenant2_topicB", 0, 2, 1);
        this.testCluster.addNode(3, null);
        this.testCluster.setPartitionLeaders("tenant1_topicC", 0, 7, 3);
        this.verifyTopicCreateNodeOrder(TENANT_1, 0, 2, 3, 1);
        this.verifyPartitionAddNodeOrder(TENANT_1, "tenant1_topicB", 0, 3, 1, 2);
    }

    private void verifyTopicCreateNodeOrder(String tenant, Integer ... expectedNodeIds) {
        ClusterMetadata clusterMetadata = this.clusterMetadata(tenant, this.testCluster.cluster());
        ClusterMetadata.NodeReplicaCounter replicaCounter = clusterMetadata.nodeReplicaCounts(Collections.emptyList());
        List nodes = replicaCounter.orderLeaderNodes();
        Assertions.assertEquals(Arrays.asList(expectedNodeIds), (Object)nodes);
    }

    private void verifyPartitionAddNodeOrder(String tenant, String topic, Integer ... expectedNodeIds) {
        ClusterMetadata clusterMetadata = this.clusterMetadata(tenant, this.testCluster.cluster());
        ClusterMetadata.NodeReplicaCounter replicaCounter = clusterMetadata.nodeReplicaCounts(this.testCluster.cluster().partitionsForTopic(topic));
        List nodes = replicaCounter.orderLeaderNodes();
        Assertions.assertEquals(Arrays.asList(expectedNodeIds), (Object)nodes);
    }

    @Test
    public void testRackUnawareCreateTopicsBrokerMultiples() {
        this.verifyCreateTopicsBrokerMultiples(0);
    }

    @Test
    public void testRackAwareCreateTopicsBrokerMultiples() {
        this.verifyCreateTopicsBrokerMultiples(3);
    }

    @Test
    public void testRackUnawareCreateTopicsRespectsExclusion() {
        int numBrokers = 6;
        List<Integer> allBrokerIds = this.addNodes(numBrokers, 0);
        Set excludedBrokerIds = allBrokerIds.stream().limit(3L).collect(Collectors.toSet());
        String topic = "tenant1_topicA";
        short replicationFactor = 3;
        Optional assignmentOpt = this.assignor().computeAssignmentForNewTopic(new TopicReplicaAssignor.NewPartitions(topic, 3, 0, replicationFactor), Optional.empty(), excludedBrokerIds);
        Assertions.assertTrue((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to have been able to compute an assignment for a new topic");
        Set allBrokerReplicas = ((List)assignmentOpt.get()).stream().flatMap(Collection::stream).collect(Collectors.toSet());
        Assertions.assertEquals((int)3, (int)allBrokerReplicas.size());
        Set expectedEligibleReplicas = allBrokerIds.stream().filter(b -> !excludedBrokerIds.contains(b)).collect(Collectors.toSet());
        Assertions.assertEquals(expectedEligibleReplicas, allBrokerReplicas);
    }

    @Test
    public void testRackUnawareCreatePartitionsRespectsExclusion() {
        int numBrokers = 6;
        List<Integer> allBrokerIds = this.addNodes(numBrokers, 0);
        Set excludedBrokerIds = allBrokerIds.stream().limit(3L).collect(Collectors.toSet());
        Set expectedEligibleReplicas = allBrokerIds.stream().filter(b -> !excludedBrokerIds.contains(b)).collect(Collectors.toSet());
        String topic = "tenant1_topicA";
        short replicationFactor = 3;
        this.createTopic(topic, 3, replicationFactor);
        Optional assignmentOpt = this.assignor().computeAssignmentForExistingTopic(new TopicReplicaAssignor.NewPartitions(topic, 21, 3, replicationFactor), Optional.empty(), excludedBrokerIds);
        Assertions.assertTrue((boolean)assignmentOpt.isPresent(), (String)"Expected the assignor to have been able to compute an assignment for new partitions");
        List newPartitionAssignments = (List)assignmentOpt.get();
        Set allBrokerReplicas = newPartitionAssignments.stream().flatMap(Collection::stream).collect(Collectors.toSet());
        Assertions.assertEquals((int)3, (int)allBrokerReplicas.size());
        Assertions.assertEquals(expectedEligibleReplicas, allBrokerReplicas);
    }

    private void verifyCreateTopicsBrokerMultiples(int racks) {
        int i;
        int numBrokers = 6;
        this.addNodes(numBrokers, racks);
        for (i = 1; i <= 5; ++i) {
            this.createTopic("tenant1_topicA" + i, numBrokers, 3);
            this.verifyAssignmentsAreBalanced(0, 0, 0);
        }
        for (i = 1; i <= 5; ++i) {
            this.createTopic("tenant2_topicB" + i, numBrokers, 3);
            this.verifyAssignmentsAreBalanced(0, 0, 0);
        }
        for (i = 1; i <= 10; ++i) {
            this.createTopic("tenant1_topicC" + i, numBrokers * i, 3);
            this.verifyAssignmentsAreBalanced(0, 0, 0);
        }
        for (i = 1; i <= 10; ++i) {
            this.createTopic("tenant2_topicD" + i, numBrokers * i, 3);
            this.verifyAssignmentsAreBalanced(0, 0, 0);
        }
    }

    @Test
    public void testRackUnawareAddPartitionsBrokerMultiples() {
        this.verifyAddPartitionsBrokerMultiples(0);
    }

    @Test
    public void testRackAwareAddPartitionsBrokerMultiples() {
        this.verifyAddPartitionsBrokerMultiples(3);
    }

    private void verifyAddPartitionsBrokerMultiples(int racks) {
        int i;
        int numBrokers = 6;
        this.addNodes(numBrokers, racks);
        this.createTopic("tenant1_topicA", numBrokers, 3);
        for (i = 1; i <= 5; ++i) {
            this.addPartitions("tenant1_topicA", numBrokers * i, numBrokers);
            this.verifyAssignmentsAreBalanced(0, 0, 0);
        }
        this.createTopic("tenant2_topicB", numBrokers, 3);
        for (i = 1; i <= 5; ++i) {
            this.addPartitions("tenant2_topicB", numBrokers * i, numBrokers);
            this.verifyAssignmentsAreBalanced(0, 0, 0);
        }
        this.createTopic("tenant1_topicC", numBrokers, 3);
        i = 1;
        int p = numBrokers;
        while (i <= 10) {
            this.addPartitions("tenant1_topicC", p, numBrokers * i);
            this.verifyAssignmentsAreBalanced(0, 0, 0);
            p += numBrokers * i++;
        }
        this.createTopic("tenant2_topicD", numBrokers, 3);
        i = 1;
        p = numBrokers;
        while (i <= 10) {
            this.addPartitions("tenant2_topicD", p, numBrokers * i);
            this.verifyAssignmentsAreBalanced(0, 0, 0);
            p += numBrokers * i++;
        }
    }

    @Test
    public void testRackUnawareTopicCreate() {
        this.addNodes(3, 0);
        this.createTopic("tenant1_topicA", 3, 2);
        this.verifyPartitionCountsOnNodes(true, 1, 1, 1);
        this.verifyPartitionCountsOnNodes(false, 1, 1, 1);
        this.createTopic("tenant2_topicA", 5, 2);
        this.verifyAssignments();
        this.verifyPartitionCountsOnNodes(true, 2, 3, 3);
        this.verifyAssignmentsAreBalanced(TENANT_2, 2);
        this.createTopic("tenant2_topicB", 4, 2);
        this.verifyAssignments();
        this.verifyPartitionCountsOnNodes(true, 4, 4, 4);
        this.verifyAssignmentsAreBalanced(TENANT_2, 2);
    }

    @Test
    public void testRackAwareTopicCreate() {
        this.addNodes(6, 3);
        this.createTopic("tenant1_topicA", 6, 3);
        this.verifyAssignments();
        this.verifyPartitionCountsOnNodes(true, 1, 1, 1, 1, 1, 1);
        this.verifyPartitionCountsOnNodes(false, 2, 2, 2, 2, 2, 2);
        this.createTopic("tenant2_topicA", 9, 3);
        this.verifyAssignments();
        this.verifyPartitionCountsOnNodes(true, 2, 2, 2, 3, 3, 3);
        this.verifyPartitionCountsOnNodes(false, 5, 5, 5, 5, 5, 5);
        this.verifyAssignmentsAreBalanced(1, 1, 1);
        this.createTopic("tenant2_topicB", 3, 3);
        this.verifyAssignments();
        this.verifyPartitionCountsOnNodes(false, 6, 6, 6, 6, 6, 6);
        this.verifyAssignmentsAreBalanced(0, 2, 2);
    }

    @Test
    public void testRackUnawarePartitionAdd() {
        this.addNodes(3, 0);
        this.createTopic("tenant1_topicA", 5, 2);
        this.addPartitions("tenant1_topicA", 5, 4);
        this.verifyAssignments();
        this.verifyPartitionCountsOnNodes(true, 3, 3, 3);
        this.verifyAssignmentsAreBalanced(TENANT_1, 2);
    }

    @Test
    public void testRackAwarePartitionAdd() {
        this.addNodes(6, 3);
        this.createTopic("tenant1_topicA", 9, 3);
        this.addPartitions("tenant1_topicA", 9, 3);
        this.verifyAssignments();
        this.verifyPartitionCountsOnNodes(true, 2, 2, 2, 2, 2, 2);
        this.verifyAssignmentsAreBalanced(0, 2, 2);
    }

    @Test
    public void testRackUnawareAssignmentVariation() {
        this.addNodes(5, 0);
        this.createTopic("tenant1_topicA", 5, 3);
        this.verifyPartitionCountsOnNodes(true, 1, 1, 1, 1, 1);
        this.verifyPartitionCountsOnNodes(false, 2, 2, 2, 2, 2);
        this.createTopic("tenant1_topicB", 10, 3);
        this.verifyPartitionCountsOnNodes(true, 3, 3, 3, 3, 3);
        this.verifyPartitionCountsOnNodes(false, 6, 6, 6, 6, 6);
        this.verifyAssignmentVariation();
    }

    @Test
    public void testRackAwareAssignmentVariation() {
        this.addNodes(6, 3);
        this.createTopic("tenant1_topicA", 6, 3);
        this.verifyPartitionCountsOnNodes(true, 1, 1, 1, 1, 1, 1);
        this.verifyPartitionCountsOnNodes(false, 2, 2, 2, 2, 2, 2);
        this.createTopic("tenant1_topicB", 12, 3);
        this.verifyPartitionCountsOnNodes(true, 3, 3, 3, 3, 3, 3);
        this.verifyPartitionCountsOnNodes(false, 6, 6, 6, 6, 6, 6);
        this.verifyAssignmentVariation();
    }

    private void verifyAssignmentVariation() {
        Cluster cluster = this.testCluster.cluster();
        HashSet allReplicas = new HashSet();
        int numPartitions = 0;
        for (String topic : cluster.topics()) {
            for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
                ArrayList<Integer> partitionReplicas = new ArrayList<Integer>();
                for (Node replica : partitionInfo.replicas()) {
                    partitionReplicas.add(replica.id());
                }
                allReplicas.add(partitionReplicas);
                ++numPartitions;
            }
        }
        Assertions.assertTrue(((double)allReplicas.size() >= 0.5 * (double)numPartitions ? 1 : 0) != 0, (String)("Too few replica combinations " + allReplicas.size() + " for " + numPartitions));
    }

    @Test
    public void testUnavailableBrokers() {
        this.addNodes(6, 3);
        for (int i = 0; i < 5; ++i) {
            this.testCluster.setPartitionLeaders("tenant1_topicA", 1, 5, 10 + i);
        }
        Assertions.assertEquals((int)6, (int)this.testCluster.cluster().nodes().size());
        this.createTopic("tenant1_topicB", 6, 3);
        this.verifyPartitionCountsOnNodes(true, 1, 1, 1, 1, 1, 1);
        this.verifyPartitionCountsOnNodes(false, 2, 2, 2, 2, 2, 2);
    }

    @Test
    public void testRackUnawareAssignmentsAreBalanced() {
        this.addNodes(3, 0);
        this.verifyTopicCreation(TENANT_1);
        this.verifyPartitionAddition(TENANT_1);
    }

    @Test
    public void testRackAwareAssignmentsAreBalanced() {
        this.addNodes(6, 3);
        this.verifyTopicCreation(TENANT_1);
        this.verifyPartitionAddition(TENANT_1);
    }

    private void verifyTopicCreation(String tenant) {
        String otherTenant = "other" + tenant;
        this.testCluster.setPartitionLeaders(otherTenant + "_topicA", 0, 5, 2);
        this.testCluster.setPartitionLeaders(otherTenant + "_topicB", 0, 2, 1);
        TenantPartitionAssignor.TopicInfo topicInfo = new TenantPartitionAssignor.TopicInfo(10, 3, 0);
        Map<String, TenantPartitionAssignor.TopicInfo> newTopics = Collections.singletonMap(tenant + "_topicA", topicInfo);
        Map<String, List<List<Integer>>> assignment = this.assignNewTopics(tenant, newTopics);
        this.verifyAssignmentsAreBalanced(tenant, assignment, 3);
        topicInfo = new TenantPartitionAssignor.TopicInfo(10, 2, 0);
        newTopics = Collections.singletonMap(tenant + "_topicB", topicInfo);
        assignment = this.assignNewTopics(tenant, newTopics);
        this.verifyAssignmentsAreBalanced(TENANT_1, assignment, 3);
        newTopics = new HashMap<String, TenantPartitionAssignor.TopicInfo>();
        newTopics.put(tenant + "_topicC", new TenantPartitionAssignor.TopicInfo(10, 1, 0));
        newTopics.put(tenant + "_topicD", new TenantPartitionAssignor.TopicInfo(8, 2, 0));
        newTopics.put(tenant + "_topicE", new TenantPartitionAssignor.TopicInfo(7, 3, 0));
        assignment = this.assignNewTopics(tenant, newTopics);
        this.verifyAssignmentsAreBalanced(tenant, assignment, 3);
    }

    private void verifyPartitionAddition(String tenant) {
        HashMap<String, Integer> newPartitions = new HashMap<String, Integer>();
        int partitionsToAdd = 2;
        int maxReplicationFactor = 0;
        for (String topic : this.testCluster.cluster().topics()) {
            if (!topic.startsWith(tenant)) continue;
            List partitionInfos = this.testCluster.cluster().partitionsForTopic(topic);
            int firstNewPartition = partitionInfos.size();
            newPartitions.put(topic, firstNewPartition + partitionsToAdd);
            ++partitionsToAdd;
            int replicationFactor = ((PartitionInfo)partitionInfos.get(0)).replicas().length;
            if (replicationFactor <= maxReplicationFactor) continue;
            maxReplicationFactor = replicationFactor;
        }
        Assertions.assertFalse((boolean)newPartitions.isEmpty(), (String)"No tenant topics");
        Map<String, List<List<Integer>>> assignment = this.assignNewPartitions(newPartitions);
        this.verifyAssignmentsAreBalanced(tenant, assignment, maxReplicationFactor);
    }

    @Test
    public void testRackUnawareTopicCreateDelete() {
        this.addNodes(6, 0);
        this.verifyTopicCreateDelete();
    }

    @Test
    public void testRackAwareTopicCreateDelete() {
        this.addNodes(6, 3);
        this.verifyTopicCreateDelete();
    }

    private void verifyTopicCreateDelete() {
        Random random = new Random();
        List<String> tenants = Arrays.asList(TENANT_1, TENANT_2, TENANT_3);
        ArrayList<String> topics = new ArrayList<String>();
        for (int i = 0; i < 100; ++i) {
            String tenant = tenants.get(random.nextInt(tenants.size()));
            if (i > 5 && i % 5 == 0) {
                this.testCluster.deleteTopic((String)topics.remove(random.nextInt(topics.size())));
                continue;
            }
            String topic = tenant + "_Topic" + i;
            int partitions = random.nextInt(20);
            this.createTopic(topic, partitions == 0 ? 1 : partitions, 3);
            topics.add(topic);
        }
        block3: for (String tenant : tenants) {
            int maxRetries = 100;
            for (int i = 0; i < maxRetries; ++i) {
                try {
                    this.verifyAssignmentsAreBalanced(tenant, 3);
                    continue block3;
                }
                catch (Throwable t) {
                    if (i == maxRetries - 1) {
                        throw t;
                    }
                    this.createTopic(tenant + "_TopicA" + i, 2, 3);
                    continue;
                }
            }
        }
    }

    @Test
    public void testRackAwareOnlyIfAllBrokersHaveRack() {
        TestCluster cluster1 = new TestCluster();
        cluster1.addNode(1, null);
        Assertions.assertFalse((boolean)this.clusterMetadata("tenant", cluster1.cluster()).rackAware());
        cluster1.addNode(2, null);
        Assertions.assertFalse((boolean)this.clusterMetadata("tenant", cluster1.cluster()).rackAware());
        cluster1.addNode(3, "rack1");
        Assertions.assertFalse((boolean)this.clusterMetadata("tenant", cluster1.cluster()).rackAware());
        TestCluster cluster2 = new TestCluster();
        cluster2.addNode(1, "rack1");
        Assertions.assertTrue((boolean)this.clusterMetadata("tenant", cluster2.cluster()).rackAware());
        this.testCluster.addNode(2, "rack2");
        Assertions.assertTrue((boolean)this.clusterMetadata("tenant", cluster2.cluster()).rackAware());
        cluster2.addNode(3, null);
        Assertions.assertFalse((boolean)this.clusterMetadata("tenant", cluster2.cluster()).rackAware());
    }

    private ClusterMetadata clusterMetadata(String tenant, Cluster cluster) {
        return new ClusterMetadata(tenant, cluster, Collections.emptySet());
    }

    @Test
    public void testRackAlternatedBrokerList() {
        int numRacks = 3;
        int numBrokers = numRacks * 4;
        HashMap<Integer, String> brokerRacks = new HashMap<Integer, String>();
        ArrayList<Integer> brokerList = new ArrayList<Integer>();
        for (int i = 0; i < numBrokers; ++i) {
            brokerRacks.put(i, "rack" + i % numRacks);
            brokerList.add(i);
        }
        RackMetadata rackMetadata = brokerRacks::get;
        Assertions.assertEquals(brokerList, (Object)TenantPartitionAssignor.rackAlternatedBrokerList(brokerList, (RackMetadata)rackMetadata, Collections.emptyList()));
        Assertions.assertEquals(brokerList, (Object)TenantPartitionAssignor.rackAlternatedBrokerList(Arrays.asList(0, 3, 6, 9, 1, 4, 7, 10, 2, 5, 8, 11), (RackMetadata)rackMetadata, Collections.emptyList()));
        Collections.reverse(brokerList);
        Assertions.assertEquals(brokerList, (Object)TenantPartitionAssignor.rackAlternatedBrokerList(brokerList, (RackMetadata)rackMetadata, Collections.emptyList()));
        for (int i = 0; i < 5; ++i) {
            Collections.shuffle(brokerList);
            List alternateList = TenantPartitionAssignor.rackAlternatedBrokerList(brokerList, (RackMetadata)rackMetadata, Collections.emptyList());
            List rackOrder = alternateList.subList(0, numRacks).stream().map(b -> b % numRacks).collect(Collectors.toList());
            ArrayList rackAssignment = new ArrayList();
            for (int j = 0; j < numRacks; ++j) {
                rackAssignment.add(new ArrayList());
            }
            String errorMessage = "Unexpected assignment for " + brokerList + " : " + alternateList;
            for (int j = 0; j < numBrokers; ++j) {
                int expectedRack = (Integer)rackOrder.get(j % numRacks);
                int brokerId = (Integer)alternateList.get(j);
                Assertions.assertEquals((int)expectedRack, (int)(brokerId % numRacks), (String)errorMessage);
                ((List)rackAssignment.get(expectedRack)).add(brokerId);
            }
            rackAssignment.forEach(list -> {
                int prevIndex = -1;
                Iterator iterator = list.iterator();
                while (iterator.hasNext()) {
                    int broker = (Integer)iterator.next();
                    int curIndex = brokerList.indexOf(broker);
                    Assertions.assertTrue((curIndex > prevIndex ? 1 : 0) != 0, (String)errorMessage);
                    prevIndex = curIndex;
                }
            });
            ArrayList<Integer> expectedRackOrder = new ArrayList<Integer>();
            for (int j = 0; j < numBrokers; ++j) {
                int rack = (Integer)brokerList.get(j) % numRacks;
                if (expectedRackOrder.contains(rack)) continue;
                expectedRackOrder.add(rack);
                if (expectedRackOrder.size() == numRacks) break;
            }
            Assertions.assertEquals(expectedRackOrder, rackOrder, (String)errorMessage);
        }
    }

    private void createTopic(String topic, int partitions, int replicationFactor) {
        String tenant = topic.substring(0, topic.indexOf("_"));
        TenantPartitionAssignor.TopicInfo topicInfo = new TenantPartitionAssignor.TopicInfo(partitions, (short)replicationFactor, 0);
        Map<String, TenantPartitionAssignor.TopicInfo> topicInfos = Collections.singletonMap(topic, topicInfo);
        Map<String, List<List<Integer>>> assignment = this.assignNewTopics(tenant, topicInfos);
        this.testCluster.createPartitions(topic, 0, assignment.get(topic));
    }

    private void addPartitions(String topic, int firstNewPartition, int newPartitions) {
        int totalPartitions = firstNewPartition + newPartitions;
        Map<String, Integer> partitions = Collections.singletonMap(topic, totalPartitions);
        Map<String, List<List<Integer>>> assignment = this.assignNewPartitions(partitions);
        this.testCluster.createPartitions(topic, firstNewPartition, assignment.get(topic));
    }

    private List<Integer> addNodes(int count, int racks) {
        ArrayList<Integer> brokerIds = new ArrayList<Integer>();
        for (int i = 0; i < count; ++i) {
            String rack = racks == 0 ? null : "rack" + i % racks;
            this.testCluster.addNode(i, rack);
            brokerIds.add(i);
        }
        if (racks > 0) {
            Assertions.assertTrue((boolean)this.testCluster.rackAware());
        }
        return brokerIds;
    }

    private Map<String, List<List<Integer>>> assignNewTopics(String tenant, Map<String, TenantPartitionAssignor.TopicInfo> topicInfos) {
        HashMap<String, List<List<Integer>>> allAssignments = new HashMap<String, List<List<Integer>>>();
        for (Map.Entry<String, TenantPartitionAssignor.TopicInfo> entry : topicInfos.entrySet()) {
            TenantPartitionAssignor.TopicInfo tInfo = entry.getValue();
            String topic = entry.getKey();
            TopicReplicaAssignor.NewPartitions newPartitions = new TopicReplicaAssignor.NewPartitions(entry.getKey(), tInfo.totalPartitions, tInfo.firstNewPartition, tInfo.replicationFactor);
            TenantPartitionAssignor assignor = new TenantPartitionAssignor(this.testCluster.cluster(), tenant, 1000);
            List assignment = (List)assignor.computeAssignmentForNewTopic(newPartitions, Optional.empty(), Collections.emptySet()).orElseThrow(() -> new RuntimeException(String.format("Failed to create assignment for new topic %s", newPartitions)));
            Assertions.assertEquals((int)tInfo.totalPartitions, (int)assignment.size());
            this.testCluster.createPartitions(topic, 0, assignment);
            allAssignments.put(topic, assignment);
        }
        return allAssignments;
    }

    private Map<String, List<List<Integer>>> assignNewPartitions(Map<String, Integer> partitionInfos) {
        HashMap<String, List<List<Integer>>> allAssignments = new HashMap<String, List<List<Integer>>>();
        for (Map.Entry<String, Integer> entry : partitionInfos.entrySet()) {
            String topic = entry.getKey();
            int totalPartitions = entry.getValue();
            List existingPartitions = this.testCluster.cluster().partitionsForTopic(topic);
            short replicationFactor = (short)((PartitionInfo)existingPartitions.get(0)).replicas().length;
            int firstNewPartition = this.testCluster.cluster().partitionsForTopic(topic).size();
            TopicReplicaAssignor.NewPartitions newPartitions = new TopicReplicaAssignor.NewPartitions(topic, totalPartitions, firstNewPartition, replicationFactor);
            List assignment = (List)this.assignor().computeAssignmentForExistingTopic(newPartitions, Optional.empty(), Collections.emptySet()).orElseThrow(() -> new RuntimeException(String.format("Failed to create new partition assignment for %s", newPartitions)));
            int expectedNewPartitions = totalPartitions - firstNewPartition;
            Assertions.assertEquals((int)expectedNewPartitions, (int)assignment.size());
            for (List replicas : assignment) {
                Assertions.assertEquals((short)replicationFactor, (short)((short)replicas.size()));
            }
            this.testCluster.createPartitions(topic, firstNewPartition, assignment);
            allAssignments.put(topic, assignment);
        }
        return allAssignments;
    }

    private void verifyAssignmentsAreBalanced(String tenant, Map<String, List<List<Integer>>> assignments, int maxReplicationFactor) {
        Map<Node, Integer> leaders = this.testCluster.partitionCountByNode(tenant, true);
        Map<Node, Integer> followers = this.testCluster.partitionCountByNode(tenant, false);
        for (List<List<Integer>> assignment : assignments.values()) {
            for (List<Integer> replicas : assignment) {
                for (int i = 0; i < replicas.size(); ++i) {
                    Node node = this.testCluster.cluster().nodeById(replicas.get(i).intValue());
                    if (i == 0) {
                        leaders.put(node, leaders.get(node) + 1);
                        continue;
                    }
                    followers.put(node, leaders.get(node) + 1);
                }
            }
        }
        int maxReplicaDiff = maxReplicationFactor - 1;
        if (this.testCluster.rackAware()) {
            maxReplicaDiff *= this.testCluster.racks().size();
        }
        this.verifyAssignmentsAreBalanced(tenant, maxReplicaDiff);
    }

    private void verifyAssignmentsAreBalanced(String tenant, int maxReplicaDiff) {
        Collection<Integer> leaders = this.testCluster.partitionCountByNode(tenant, true).values();
        Collection<Integer> followers = this.testCluster.partitionCountByNode(tenant, false).values();
        int maxLeaderImbalance = this.testCluster.rackAware() ? this.testCluster.racks().size() : 1;
        Assertions.assertTrue((Collections.max(leaders) - Collections.min(leaders) <= maxLeaderImbalance ? 1 : 0) != 0, (String)("Leaders not balanced " + leaders));
        Assertions.assertTrue((Collections.max(followers) - Collections.min(followers) <= maxReplicaDiff ? 1 : 0) != 0, (String)("Follower replicas not balanced " + followers));
    }

    private void verifyAssignmentsAreBalanced(int ... maxDiff) {
        for (int i = 0; i < maxDiff.length; ++i) {
            List<Integer> replicaCounts = this.followerCountsByNode(Optional.of(i));
            Collections.sort(replicaCounts);
            Assertions.assertTrue((replicaCounts.get(replicaCounts.size() - 1) - replicaCounts.get(0) <= maxDiff[i] ? 1 : 0) != 0, (String)("Replicas not balanced for replica #" + i + " : " + replicaCounts));
        }
    }

    private List<Integer> leaderCountsByNode() {
        Cluster cluster = this.testCluster.cluster();
        return cluster.nodes().stream().map(node -> cluster.partitionsForNode(node.id()).size()).collect(Collectors.toList());
    }

    private List<Integer> followerCountsByNode(Optional<Integer> replicaIndex) {
        Cluster cluster = this.testCluster.cluster();
        Map partitionCounts = cluster.nodes().stream().collect(Collectors.toMap(Function.identity(), n -> 0));
        for (String topic : cluster.topics()) {
            for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
                Node[] replicas = partitionInfo.replicas();
                replicaIndex.ifPresent(r -> partitionCounts.put(replicas[r], (Integer)partitionCounts.get(replicas[r]) + 1));
                if (replicaIndex.isPresent()) continue;
                for (int i = 1; i < replicas.length; ++i) {
                    partitionCounts.put(replicas[i], partitionCounts.get(replicas[i]) + 1);
                }
            }
        }
        return new ArrayList<Integer>(partitionCounts.values());
    }

    private void verifyPartitionCountsOnNodes(boolean leader, Integer ... sortedPartitionCounts) {
        List<Integer> expectedPartitionCounts = Arrays.asList(sortedPartitionCounts);
        List<Integer> actualPartitionCounts = leader ? this.leaderCountsByNode() : this.followerCountsByNode(Optional.empty());
        Collections.sort(actualPartitionCounts);
        Assertions.assertEquals(expectedPartitionCounts, actualPartitionCounts);
    }

    private void verifyAssignments() {
        Cluster cluster = this.testCluster.cluster();
        for (String topic : cluster.topics()) {
            for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
                HashSet<Integer> replicas = new HashSet<Integer>();
                HashSet<String> replicaRacks = new HashSet<String>();
                for (Node node : partitionInfo.replicas()) {
                    replicas.add(node.id());
                    if (node.rack() == null) continue;
                    replicaRacks.add(node.rack());
                }
                Assertions.assertEquals((int)partitionInfo.replicas().length, (int)replicas.size());
                if (!this.testCluster.rackAware()) continue;
                Assertions.assertEquals((int)partitionInfo.replicas().length, (int)replicaRacks.size());
            }
        }
    }

    private void verifyAssignment(List<List<Integer>> expected, List<PartitionInfo> actual) {
        Assertions.assertEquals(expected, this.partitionsToAssignment(actual));
    }

    private void verifyPartitionsAssignment(List<PartitionInfo> expected, List<PartitionInfo> actual) {
        Assertions.assertEquals(this.partitionsToAssignment(expected), this.partitionsToAssignment(actual));
    }

    private List<List<Integer>> partitionsToAssignment(List<PartitionInfo> partitions) {
        ArrayList<List<Integer>> assignment = new ArrayList<List<Integer>>(partitions.size());
        for (int i = 0; i < partitions.size(); ++i) {
            assignment.add(null);
        }
        partitions.forEach(partitionInfo -> assignment.set(partitionInfo.partition(), Arrays.stream(partitionInfo.replicas()).map(Node::id).collect(Collectors.toList())));
        return assignment;
    }
}

