/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.IntPredicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.NoReassignmentInProgressException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.BrokerControlStates;
import org.apache.kafka.controller.BrokerHeartbeatManager;
import org.apache.kafka.controller.BrokersToIsrs;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.MirrorTopicControlManager;
import org.apache.kafka.controller.PartitionChangeBuilder;
import org.apache.kafka.controller.PartitionReassignmentReplicas;
import org.apache.kafka.controller.PartitionReassignmentRevert;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.controller.TopicIdPartition;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.ConfluentPartitionsPerTopicListener;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.ClusterDescriber;
import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;

public class ReplicationControlManager {
    static final int MAX_ELECTIONS_PER_IMBALANCE = 1000;
    private final SnapshotRegistry snapshotRegistry;
    private final Logger log;
    private final short defaultReplicationFactor;
    private final int defaultNumPartitions;
    private final int maxElectionsPerImbalance;
    private final ConfigurationControlManager configurationControl;
    private final ClusterControlManager clusterControl;
    private final Optional<CreateTopicPolicy> createTopicPolicy;
    private final boolean applyCreateTopicsPolicyToCreatePartitions;
    private final FeatureControlManager featureControl;
    private final TimelineHashMap<String, Uuid> topicsByName;
    private final TimelineHashMap<String, TimelineHashSet<String>> topicsWithCollisionChars;
    private final TimelineHashMap<Uuid, TopicControlInfo> topics;
    private final MirrorTopicControlManager mirrorTopicControl;
    private final BrokersToIsrs brokersToIsrs;
    private final TimelineHashMap<Uuid, int[]> reassigningTopics;
    private final TimelineHashSet<TopicIdPartition> imbalancedPartitions;
    final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();

    static Map<String, String> translateCreationConfigs(CreateTopicsRequestData.CreateableTopicConfigCollection collection) {
        HashMap result = new HashMap();
        collection.forEach(config -> result.put(config.name(), config.value()));
        return Collections.unmodifiableMap(result);
    }

    private ReplicationControlManager(SnapshotRegistry snapshotRegistry, LogContext logContext, short defaultReplicationFactor, int defaultNumPartitions, int maxElectionsPerImbalance, ConfigurationControlManager configurationControl, ClusterControlManager clusterControl, Optional<CreateTopicPolicy> createTopicPolicy, FeatureControlManager featureControl, boolean applyCreateTopicsPolicyToCreatePartitions, MirrorTopicControlManager mirrorTopicControl) {
        this.snapshotRegistry = snapshotRegistry;
        this.log = logContext.logger(ReplicationControlManager.class);
        this.defaultReplicationFactor = defaultReplicationFactor;
        this.defaultNumPartitions = defaultNumPartitions;
        this.maxElectionsPerImbalance = maxElectionsPerImbalance;
        this.configurationControl = configurationControl;
        this.createTopicPolicy = createTopicPolicy;
        this.featureControl = featureControl;
        this.applyCreateTopicsPolicyToCreatePartitions = applyCreateTopicsPolicyToCreatePartitions;
        this.clusterControl = clusterControl;
        this.topicsByName = new TimelineHashMap(snapshotRegistry, 0);
        this.topicsWithCollisionChars = new TimelineHashMap(snapshotRegistry, 0);
        this.topics = new TimelineHashMap(snapshotRegistry, 0);
        this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
        this.reassigningTopics = new TimelineHashMap(snapshotRegistry, 0);
        this.imbalancedPartitions = new TimelineHashSet(snapshotRegistry, 0);
        this.mirrorTopicControl = mirrorTopicControl;
    }

    Optional<CreateTopicPolicy> createTopicPolicy() {
        return this.createTopicPolicy;
    }

    public void replay(TopicRecord record) {
        this.topicsByName.put(record.name(), record.topicId());
        if (Topic.hasCollisionChars((String)record.name())) {
            String normalizedName = Topic.unifyCollisionChars((String)record.name());
            TimelineHashSet<String> topicNames = this.topicsWithCollisionChars.get(normalizedName);
            if (topicNames == null) {
                topicNames = new TimelineHashSet(this.snapshotRegistry, 1);
                this.topicsWithCollisionChars.put(normalizedName, topicNames);
            }
            topicNames.add(record.name());
        }
        this.topics.put(record.topicId(), new TopicControlInfo(record.name(), this.snapshotRegistry, record.topicId()));
        this.updateConfluentPartitionsAndTopicsPerTopicListener(record.name(), 0, 1);
        this.log.info("Created topic {} with topic ID {}.", (Object)record.name(), (Object)record.topicId());
    }

    public void replay(PartitionRecord record) {
        TopicControlInfo topicInfo = this.topics.get(record.topicId());
        if (topicInfo == null) {
            throw new RuntimeException("Tried to create partition " + record.topicId() + ":" + record.partitionId() + ", but no topic with that ID was found.");
        }
        PartitionRegistration newPartInfo = new PartitionRegistration(record);
        PartitionRegistration prevPartInfo = (PartitionRegistration)topicInfo.parts.get(record.partitionId());
        String description = topicInfo.name + "-" + record.partitionId() + " with topic ID " + record.topicId();
        if (prevPartInfo == null) {
            this.log.info("Created partition {} and {}.", (Object)description, (Object)newPartInfo);
            topicInfo.parts.put(record.partitionId(), newPartInfo);
            this.brokersToIsrs.update(record.topicId(), record.partitionId(), null, newPartInfo.isr, -1, newPartInfo.leader);
            this.updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(), false, newPartInfo.isReassigning());
            this.updateConfluentPartitionsAndTopicsPerTopicListener(topicInfo.name, 1, 0);
        } else if (!newPartInfo.equals(prevPartInfo)) {
            newPartInfo.maybeLogPartitionChange(this.log, description, prevPartInfo);
            topicInfo.parts.put(record.partitionId(), newPartInfo);
            this.brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr, newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
            this.updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(), prevPartInfo.isReassigning(), newPartInfo.isReassigning());
        }
        if (newPartInfo.hasPreferredLeader()) {
            this.imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
        } else {
            this.imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
        }
    }

    private void updateReassigningTopicsIfNeeded(Uuid topicId, int partitionId, boolean wasReassigning, boolean isReassigning) {
        if (!wasReassigning) {
            if (isReassigning) {
                int[] prevReassigningParts = this.reassigningTopics.getOrDefault(topicId, Replicas.NONE);
                this.reassigningTopics.put(topicId, Replicas.copyWith(prevReassigningParts, partitionId));
            }
        } else if (!isReassigning) {
            int[] prevReassigningParts = this.reassigningTopics.getOrDefault(topicId, Replicas.NONE);
            int[] newReassigningParts = Replicas.copyWithout(prevReassigningParts, partitionId);
            if (newReassigningParts.length == 0) {
                this.reassigningTopics.remove(topicId);
            } else {
                this.reassigningTopics.put(topicId, newReassigningParts);
            }
        }
    }

    public void replay(PartitionChangeRecord record) {
        TopicControlInfo topicInfo = this.topics.get(record.topicId());
        if (topicInfo == null) {
            throw new RuntimeException("Tried to create partition " + record.topicId() + ":" + record.partitionId() + ", but no topic with that ID was found.");
        }
        PartitionRegistration prevPartitionInfo = (PartitionRegistration)topicInfo.parts.get(record.partitionId());
        if (prevPartitionInfo == null) {
            throw new RuntimeException("Tried to create partition " + record.topicId() + ":" + record.partitionId() + ", but no partition with that id was found.");
        }
        PartitionRegistration newPartitionInfo = prevPartitionInfo.merge(record);
        this.updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(), prevPartitionInfo.isReassigning(), newPartitionInfo.isReassigning());
        topicInfo.parts.put(record.partitionId(), newPartitionInfo);
        this.brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader, newPartitionInfo.leader);
        String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " + record.topicId();
        newPartitionInfo.maybeLogPartitionChange(this.log, topicPart, prevPartitionInfo);
        if (newPartitionInfo.hasPreferredLeader()) {
            this.imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
        } else {
            this.imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
        }
        if (record.removingReplicas() != null || record.addingReplicas() != null) {
            this.log.info("Replayed partition assignment change {} for topic {}", (Object)record, (Object)topicInfo.name);
        } else if (this.log.isTraceEnabled()) {
            this.log.trace("Replayed partition change {} for topic {}", (Object)record, (Object)topicInfo.name);
        }
    }

    public void replay(RemoveTopicRecord record) {
        String normalizedName;
        TimelineHashSet<String> colliding;
        TopicControlInfo topic = this.topics.remove(record.topicId());
        if (topic == null) {
            throw new UnknownTopicIdException("Can't find topic with ID " + record.topicId() + " to remove.");
        }
        this.topicsByName.remove(topic.name);
        if (Topic.hasCollisionChars((String)topic.name) && (colliding = this.topicsWithCollisionChars.get(normalizedName = Topic.unifyCollisionChars((String)topic.name))) != null) {
            colliding.remove(topic.name);
            if (colliding.isEmpty()) {
                this.topicsWithCollisionChars.remove(normalizedName);
            }
        }
        this.reassigningTopics.remove(record.topicId());
        this.configurationControl.deleteTopicConfigs(topic.name);
        this.mirrorTopicControl.deleteMirrorTopic(topic.topicId(), topic.name());
        for (Map.Entry entry : topic.parts.entrySet()) {
            int partitionId = (Integer)entry.getKey();
            PartitionRegistration partition = (PartitionRegistration)entry.getValue();
            for (int i = 0; i < partition.isr.length; ++i) {
                this.brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
            }
            this.imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), partitionId));
        }
        this.brokersToIsrs.removeTopicEntryForBroker(topic.id, -1);
        this.log.info("Removed topic {} with ID {}.", (Object)topic.name, (Object)record.topicId());
        this.updateConfluentPartitionsAndTopicsPerTopicListener(topic.name, -topic.parts.size(), -1);
    }

    ControllerResult<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request, Set<String> describable, KafkaPrincipal principal) {
        HashMap<String, ApiError> topicErrors = new HashMap<String, ApiError>();
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        ReplicationControlManager.validateNewTopicNames(topicErrors, request.topics(), this.topicsWithCollisionChars);
        request.topics().stream().filter(creatableTopic -> this.topicsByName.containsKey(creatableTopic.name())).forEach(t -> topicErrors.put(t.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS, "Topic '" + t.name() + "' already exists.")));
        Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges = ReplicationControlManager.computeConfigChanges(topicErrors, request.topics());
        ControllerResult<Map<ConfigResource, ApiError>> configResult = this.configurationControl.incrementalAlterConfigs(configChanges, true, principal);
        for (Map.Entry<ConfigResource, ApiError> entry : configResult.response().entrySet()) {
            if (!entry.getValue().isFailure()) continue;
            topicErrors.put(entry.getKey().name(), entry.getValue());
        }
        records.addAll(configResult.records());
        HashMap<String, CreateTopicsResponseData.CreatableTopicResult> successes = new HashMap<String, CreateTopicsResponseData.CreatableTopicResult>();
        Set<Integer> excludedBrokerIds = this.clusterControl.activeBrokerReplicaExclusions().keySet();
        for (CreateTopicsRequestData.CreatableTopic topic : request.topics()) {
            ApiError error;
            if (topicErrors.containsKey(topic.name())) continue;
            try {
                error = this.createTopic(topic, records, successes, describable.contains(topic.name()), excludedBrokerIds, principal);
            }
            catch (ApiException e) {
                error = ApiError.fromThrowable((Throwable)e);
            }
            if (!error.isFailure()) continue;
            topicErrors.put(topic.name(), error);
        }
        CreateTopicsResponseData data = new CreateTopicsResponseData();
        StringBuilder resultsBuilder = new StringBuilder();
        String resultsPrefix = "";
        for (CreateTopicsRequestData.CreatableTopic topic : request.topics()) {
            ApiError error = (ApiError)topicErrors.get(topic.name());
            if (error != null) {
                data.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName(topic.name()).setErrorCode(error.error().code()).setErrorMessage(error.message()));
                resultsBuilder.append(resultsPrefix).append(topic).append(": ").append(error.error()).append(" (").append(error.message()).append(")");
                resultsPrefix = ", ";
                continue;
            }
            CreateTopicsResponseData.CreatableTopicResult result = (CreateTopicsResponseData.CreatableTopicResult)successes.get(topic.name());
            data.topics().add((ImplicitLinkedHashCollection.Element)result);
            resultsBuilder.append(resultsPrefix).append(topic).append(": ").append("SUCCESS");
            resultsPrefix = ", ";
        }
        if (request.validateOnly()) {
            this.log.info("Validate-only CreateTopics result(s): {}", (Object)resultsBuilder.toString());
            return ControllerResult.atomicOf(Collections.emptyList(), data);
        }
        this.log.info("CreateTopics result(s): {}", (Object)resultsBuilder.toString());
        return ControllerResult.atomicOf(records, data);
    }

    private ApiError createTopic(CreateTopicsRequestData.CreatableTopic topic, List<ApiMessageAndVersion> records, Map<String, CreateTopicsResponseData.CreatableTopicResult> successes, boolean authorizedToReturnConfigs, Set<Integer> excludedBrokerIds, KafkaPrincipal principal) {
        boolean isMirrorTopic = topic.mirrorTopic() != null;
        Map<String, String> creationConfigs = ReplicationControlManager.translateCreationConfigs(topic.configs());
        HashMap<Integer, PartitionRegistration> newParts = new HashMap<Integer, PartitionRegistration>();
        if (!topic.assignments().isEmpty()) {
            if (topic.replicationFactor() != -1) {
                return new ApiError(Errors.INVALID_REQUEST, "A manual partition assignment was specified, but replication factor was not set to -1.");
            }
            if (topic.numPartitions() != -1) {
                return new ApiError(Errors.INVALID_REQUEST, "A manual partition assignment was specified, but numPartitions was not set to -1.");
            }
            OptionalInt replicationFactor = OptionalInt.empty();
            for (CreateTopicsRequestData.CreatableReplicaAssignment assignment : topic.assignments()) {
                if (newParts.containsKey(assignment.partitionIndex())) {
                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, "Found multiple manual partition assignments for partition " + assignment.partitionIndex());
                }
                this.validateManualPartitionAssignment(assignment.brokerIds(), replicationFactor, excludedBrokerIds);
                replicationFactor = OptionalInt.of(assignment.brokerIds().size());
                List<Integer> isr = assignment.brokerIds().stream().filter(this.clusterControl::isActive).collect(Collectors.toList());
                if (isr.isEmpty()) {
                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, "All brokers specified in the manual partition assignment for partition " + assignment.partitionIndex() + " are fenced or in controlled shutdown.");
                }
                newParts.put(assignment.partitionIndex(), new PartitionRegistration(Replicas.toArray(assignment.brokerIds()), Replicas.toArray(isr), Replicas.NONE, Replicas.NONE, isr.get(0), LeaderRecoveryState.RECOVERED, 0, 0));
            }
            for (int i = 0; i < newParts.size(); ++i) {
                if (newParts.containsKey(i)) continue;
                return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, "partitions should be a consecutive 0-based integer sequence");
            }
            ApiError error = this.maybeCheckCreateTopicPolicy(() -> {
                HashMap assignments = new HashMap();
                newParts.entrySet().forEach(e -> assignments.put(e.getKey(), Replicas.toList(((PartitionRegistration)e.getValue()).replicas)));
                return new CreateTopicPolicy.RequestMetadata(topic.name(), null, null, assignments, creationConfigs);
            });
            if (error.isFailure()) {
                return error;
            }
        } else {
            if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
                return new ApiError(Errors.INVALID_REPLICATION_FACTOR, "Replication factor must be larger than 0, or -1 to use the default value.");
            }
            if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
                return new ApiError(Errors.INVALID_PARTITIONS, "Number of partitions was set to an invalid non-positive value.");
            }
            int numPartitions = topic.numPartitions() == -1 ? this.defaultNumPartitions : topic.numPartitions();
            short replicationFactor = topic.replicationFactor() == -1 ? this.defaultReplicationFactor : topic.replicationFactor();
            try {
                List<List<Integer>> partitions = this.clusterControl.replicaPlacer().place(new PlacementSpec(0, numPartitions, replicationFactor, topic.name(), principal, this.clusterControl.activeBrokerReplicaExclusions().keySet()), this.clusterDescriber);
                for (int partitionId = 0; partitionId < partitions.size(); ++partitionId) {
                    List<Integer> replicas = partitions.get(partitionId);
                    List<Integer> isr = replicas.stream().filter(this.clusterControl::isActive).collect(Collectors.toList());
                    if (isr.isEmpty()) {
                        return new ApiError(Errors.INVALID_REPLICATION_FACTOR, "Unable to replicate the partition " + replicationFactor + " time(s): All brokers are currently fenced or in controlled shutdown.");
                    }
                    if (isMirrorTopic) {
                        newParts.put(partitionId, new PartitionRegistration(Replicas.toArray(replicas), Replicas.toArray(isr), Replicas.NONE, Replicas.NONE, isr.get(0), LeaderRecoveryState.RECOVERED, 0, 0, -1, PartitionRegistration.LinkState.ACTIVE));
                        continue;
                    }
                    newParts.put(partitionId, new PartitionRegistration(Replicas.toArray(replicas), Replicas.toArray(isr), Replicas.NONE, Replicas.NONE, isr.get(0), LeaderRecoveryState.RECOVERED, 0, 0));
                }
            }
            catch (InvalidReplicationFactorException e) {
                return new ApiError(Errors.INVALID_REPLICATION_FACTOR, "Unable to replicate the partition " + replicationFactor + " time(s): " + e.getMessage());
            }
            ApiError error = this.maybeCheckCreateTopicPolicy(() -> new CreateTopicPolicy.RequestMetadata(topic.name(), Integer.valueOf(numPartitions), Short.valueOf(replicationFactor), null, creationConfigs));
            if (error.isFailure()) {
                return error;
            }
        }
        Uuid topicId = Uuid.randomUuid();
        ArrayList mirrorTopicRecord = new ArrayList(1);
        ApiError clusterLinkError = this.mirrorTopicControl.maybeAddMirrorTopicRecord(topic, topicId, mirrorTopicRecord::add);
        if (clusterLinkError != ApiError.NONE) {
            return clusterLinkError;
        }
        CreateTopicsResponseData.CreatableTopicResult result = new CreateTopicsResponseData.CreatableTopicResult().setName(topic.name()).setTopicId(topicId).setErrorCode(Errors.NONE.code()).setErrorMessage(null);
        if (authorizedToReturnConfigs) {
            Map<String, ConfigEntry> effectiveConfig = this.configurationControl.computeEffectiveTopicConfigs(creationConfigs);
            ArrayList<String> configNames = new ArrayList<String>(effectiveConfig.keySet());
            configNames.sort(String::compareTo);
            for (String configName : configNames) {
                ConfigEntry entry = effectiveConfig.get(configName);
                result.configs().add(new CreateTopicsResponseData.CreatableTopicConfigs().setConfigName(entry.name()).setValue(entry.isSensitive() ? null : entry.value()).setReadOnly(entry.isReadOnly()).setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()).setIsSensitive(entry.isSensitive()));
            }
            result.setNumPartitions(newParts.size());
            result.setReplicationFactor((short)((PartitionRegistration)newParts.values().iterator().next()).replicas.length);
            result.setTopicConfigErrorCode(Errors.NONE.code());
        } else {
            result.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code());
        }
        successes.put(topic.name(), result);
        records.add(new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName(topic.name()).setTopicId(topicId), MetadataRecordType.TOPIC_RECORD.highestSupportedVersion()));
        records.addAll(mirrorTopicRecord);
        for (Map.Entry partEntry : newParts.entrySet()) {
            int partitionIndex = (Integer)partEntry.getKey();
            PartitionRegistration info = (PartitionRegistration)partEntry.getValue();
            records.add(info.toRecord(topicId, partitionIndex));
        }
        return ApiError.NONE;
    }

    private ApiError maybeCheckCreateTopicPolicy(Supplier<CreateTopicPolicy.RequestMetadata> supplier) {
        if (this.createTopicPolicy.isPresent()) {
            try {
                this.createTopicPolicy.get().validate(supplier.get());
            }
            catch (PolicyViolationException e) {
                return new ApiError(Errors.POLICY_VIOLATION, e.getMessage());
            }
        }
        return ApiError.NONE;
    }

    static void validateNewTopicNames(Map<String, ApiError> topicErrors, CreateTopicsRequestData.CreatableTopicCollection topics, Map<String, ? extends Set<String>> topicsWithCollisionChars) {
        for (CreateTopicsRequestData.CreatableTopic topic : topics) {
            String normalizedName;
            Set<String> colliding;
            if (topicErrors.containsKey(topic.name())) continue;
            try {
                Topic.validate((String)topic.name());
            }
            catch (InvalidTopicException e) {
                topicErrors.put(topic.name(), new ApiError(Errors.INVALID_TOPIC_EXCEPTION, e.getMessage()));
            }
            if (!Topic.hasCollisionChars((String)topic.name()) || (colliding = topicsWithCollisionChars.get(normalizedName = Topic.unifyCollisionChars((String)topic.name()))) == null) continue;
            topicErrors.put(topic.name(), new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic '" + topic.name() + "' collides with existing topic: " + colliding.iterator().next()));
        }
    }

    static Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> computeConfigChanges(Map<String, ApiError> topicErrors, CreateTopicsRequestData.CreatableTopicCollection topics) {
        HashMap<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges = new HashMap<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>>();
        for (CreateTopicsRequestData.CreatableTopic topic : topics) {
            if (topicErrors.containsKey(topic.name())) continue;
            HashMap<String, AbstractMap.SimpleImmutableEntry<AlterConfigOp.OpType, String>> topicConfigs = new HashMap<String, AbstractMap.SimpleImmutableEntry<AlterConfigOp.OpType, String>>();
            ArrayList<String> nullConfigs = new ArrayList<String>();
            for (CreateTopicsRequestData.CreateableTopicConfig config : topic.configs()) {
                if (config.value() == null) {
                    nullConfigs.add(config.name());
                    continue;
                }
                topicConfigs.put(config.name(), new AbstractMap.SimpleImmutableEntry<AlterConfigOp.OpType, String>(AlterConfigOp.OpType.SET, config.value()));
            }
            if (!nullConfigs.isEmpty()) {
                topicErrors.put(topic.name(), new ApiError(Errors.INVALID_CONFIG, "Null value not supported for topic configs: " + String.join((CharSequence)",", nullConfigs)));
                continue;
            }
            if (topicConfigs.isEmpty()) continue;
            configChanges.put(new ConfigResource(ConfigResource.Type.TOPIC, topic.name()), topicConfigs);
        }
        return configChanges;
    }

    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
        HashMap<String, ResultOrError<Uuid>> results = new HashMap<String, ResultOrError<Uuid>>(names.size());
        for (String name : names) {
            if (name == null) {
                results.put(null, new ResultOrError(Errors.INVALID_REQUEST, "Invalid null topic name."));
                continue;
            }
            Uuid id = this.topicsByName.get(name, offset);
            if (id == null) {
                results.put(name, new ResultOrError<ApiError>(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION)));
                continue;
            }
            results.put(name, new ResultOrError<Uuid>(id));
        }
        return results;
    }

    Map<String, Uuid> findAllTopicIds(long offset) {
        HashMap<String, Uuid> result = new HashMap<String, Uuid>(this.topicsByName.size(offset));
        for (Map.Entry<String, Uuid> entry : this.topicsByName.entrySet(offset)) {
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) {
        HashMap<Uuid, ResultOrError<String>> results = new HashMap<Uuid, ResultOrError<String>>(ids.size());
        for (Uuid id : ids) {
            if (id == null || id.equals((Object)Uuid.ZERO_UUID)) {
                results.put(id, new ResultOrError<ApiError>(new ApiError(Errors.INVALID_REQUEST, "Attempt to find topic with invalid topicId " + id)));
                continue;
            }
            TopicControlInfo topic = this.topics.get(id, offset);
            if (topic == null) {
                results.put(id, new ResultOrError<ApiError>(new ApiError(Errors.UNKNOWN_TOPIC_ID)));
                continue;
            }
            results.put(id, new ResultOrError<String>(topic.name));
        }
        return results;
    }

    ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
        HashMap<Uuid, ApiError> results = new HashMap<Uuid, ApiError>(ids.size());
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>(ids.size());
        for (Uuid id : ids) {
            try {
                this.deleteTopic(id, records);
                results.put(id, ApiError.NONE);
            }
            catch (ApiException e) {
                results.put(id, ApiError.fromThrowable((Throwable)e));
            }
            catch (Exception e) {
                this.log.error("Unexpected deleteTopics error for {}", (Object)id, (Object)e);
                results.put(id, ApiError.fromThrowable((Throwable)e));
            }
        }
        return ControllerResult.atomicOf(records, results);
    }

    void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
        TopicControlInfo topic = this.topics.get(id);
        if (topic == null) {
            throw new UnknownTopicIdException(Errors.UNKNOWN_TOPIC_ID.message());
        }
        records.add(new ApiMessageAndVersion((ApiMessage)new RemoveTopicRecord().setTopicId(id), MetadataRecordType.REMOVE_TOPIC_RECORD.highestSupportedVersion()));
    }

    PartitionRegistration getPartition(Uuid topicId, int partitionId) {
        TopicControlInfo topic = this.topics.get(topicId);
        if (topic == null) {
            return null;
        }
        return (PartitionRegistration)topic.parts.get(partitionId);
    }

    TopicControlInfo getTopic(Uuid topicId) {
        return this.topics.get(topicId);
    }

    Uuid getTopicId(String name) {
        return this.topicsByName.get(name);
    }

    Set<String> getAllTopicNames() {
        return this.topicsByName.keySet();
    }

    BrokersToIsrs brokersToIsrs() {
        return this.brokersToIsrs;
    }

    Set<TopicIdPartition> imbalancedPartitions() {
        return new HashSet<TopicIdPartition>(this.imbalancedPartitions);
    }

    ControllerResult<AlterPartitionResponseData> alterPartition(ControllerRequestContext context, AlterPartitionRequestData request) {
        short requestVersion = context.requestHeader().requestApiVersion();
        this.clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch());
        AlterPartitionResponseData response = new AlterPartitionResponseData();
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        for (AlterPartitionRequestData.TopicData topicData : request.topics()) {
            Uuid topicId;
            AlterPartitionResponseData.TopicData responseTopicData = new AlterPartitionResponseData.TopicData().setTopicName(topicData.topicName()).setTopicId(topicData.topicId());
            response.topics().add(responseTopicData);
            Uuid uuid = topicId = requestVersion > 1 ? topicData.topicId() : this.topicsByName.get(topicData.topicName());
            if (topicId == null || topicId.equals((Object)Uuid.ZERO_UUID) || !this.topics.containsKey(topicId)) {
                Errors error = requestVersion > 1 ? Errors.UNKNOWN_TOPIC_ID : Errors.UNKNOWN_TOPIC_OR_PARTITION;
                for (AlterPartitionRequestData.PartitionData partitionData : topicData.partitions()) {
                    responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().setPartitionIndex(partitionData.partitionIndex()).setErrorCode(error.code()));
                }
                this.log.info("Rejecting AlterPartition request for unknown topic ID {} or name {}.", (Object)topicData.topicId(), (Object)topicData.topicName());
                continue;
            }
            TopicControlInfo topic = this.topics.get(topicId);
            for (AlterPartitionRequestData.PartitionData partitionData : topicData.partitions()) {
                int partitionId = partitionData.partitionIndex();
                PartitionRegistration partition = (PartitionRegistration)topic.parts.get(partitionId);
                Errors validationError = this.validateAlterPartitionData(request.brokerId(), topic, partitionId, partition, context.requestHeader().requestApiVersion(), partitionData);
                if (validationError != Errors.NONE) {
                    responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().setPartitionIndex(partitionId).setErrorCode(validationError.code()));
                    continue;
                }
                PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topic.id, partitionId, this.clusterControl::isActive, this.featureControl.metadataVersion().isLeaderRecoverySupported());
                if (this.configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
                    builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
                }
                builder.setTargetIsr(partitionData.newIsr());
                builder.setTargetLeaderRecoveryState(LeaderRecoveryState.of(partitionData.leaderRecoveryState()));
                this.buildClusterLinkState(topicId, partitionId, partitionData.clusterLinkState(), builder);
                Optional<ApiMessageAndVersion> record = builder.build();
                if (record.isPresent()) {
                    records.add(record.get());
                    if (partitionData.clusterLinkState().linkFailed()) {
                        this.mirrorTopicControl.failMirrorTopic(topicId, records::add);
                    }
                    PartitionChangeRecord change = (PartitionChangeRecord)record.get().message();
                    partition = partition.merge(change);
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Node {} has altered ISR for {}-{} to {}.", new Object[]{request.brokerId(), topic.name, partitionId, change.isr()});
                    }
                    if (change.leader() != request.brokerId() && change.leader() != -2) {
                        Errors error = requestVersion > 1 ? Errors.NEW_LEADER_ELECTED : Errors.FENCED_LEADER_EPOCH;
                        this.log.info("AlterPartition request from node {} for {}-{} completed the ongoing partition reassignment and triggered a leadership change. Returning {}.", new Object[]{request.brokerId(), topic.name, partitionId, error});
                        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().setPartitionIndex(partitionId).setErrorCode(error.code()));
                        continue;
                    }
                    if (change.removingReplicas() != null || change.addingReplicas() != null) {
                        this.log.info("AlterPartition request from node {} for {}-{} completed the ongoing partition reassignment.", new Object[]{request.brokerId(), topic.name, partitionId});
                    }
                }
                responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().setPartitionIndex(partitionId).setErrorCode(Errors.NONE.code()).setLeaderId(partition.leader).setIsr(Replicas.toList(partition.isr)).setLeaderRecoveryState(partition.leaderRecoveryState.value()).setLeaderEpoch(partition.leaderEpoch).setPartitionEpoch(partition.partitionEpoch));
            }
        }
        return ControllerResult.of(records, response);
    }

    private void buildClusterLinkState(Uuid topicId, int partitionId, AlterPartitionRequestData.ClusterLinkState clusterLinkState, PartitionChangeBuilder builder) {
        if (this.mirrorTopicControl.isMirrorTopic(topicId)) {
            builder.setLinkedLeaderEpoch(clusterLinkState.linkedLeaderEpoch());
            builder.setLinkFailed(clusterLinkState.linkFailed());
        } else if (clusterLinkState.linkedLeaderEpoch() != -1 || clusterLinkState.linkFailed()) {
            this.log.error("Ignoring cluster link data for non-mirrored topic {} partition {}.", (Object)topicId, (Object)partitionId);
        }
    }

    void unlinkMirrorTopic(Uuid topicId) {
        TopicControlInfo topic = this.topics.get(topicId);
        Iterator iterator = topic.parts.keySet().iterator();
        while (iterator.hasNext()) {
            int partitionId = (Integer)iterator.next();
            topic.parts.computeIfPresent(partitionId, (__, prevPartition) -> prevPartition.unlink());
        }
    }

    private Errors validateAlterPartitionData(int brokerId, TopicControlInfo topic, int partitionId, PartitionRegistration partition, short requestApiVersion, AlterPartitionRequestData.PartitionData partitionData) {
        if (partition == null) {
            this.log.info("Rejecting AlterPartition request for unknown partition {}-{}.", (Object)topic.name, (Object)partitionId);
            return Errors.UNKNOWN_TOPIC_OR_PARTITION;
        }
        if (partitionData.leaderEpoch() > partition.leaderEpoch) {
            this.log.debug("Rejecting AlterPartition request from node {} for {}-{} because the current leader epoch is {}, which is greater than the local value {}.", new Object[]{brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch()});
            return Errors.NOT_CONTROLLER;
        }
        if (partitionData.partitionEpoch() > partition.partitionEpoch) {
            this.log.debug("Rejecting AlterPartition request from node {} for {}-{} because the current partition epoch is {}, which is greater than the local value {}.", new Object[]{brokerId, topic.name, partitionId, partition.partitionEpoch, partitionData.partitionEpoch()});
            return Errors.NOT_CONTROLLER;
        }
        if (partitionData.leaderEpoch() < partition.leaderEpoch) {
            this.log.debug("Rejecting AlterPartition request from node {} for {}-{} because the current leader epoch is {}, not {}.", new Object[]{brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch()});
            return Errors.FENCED_LEADER_EPOCH;
        }
        if (brokerId != partition.leader) {
            this.log.info("Rejecting AlterPartition request from node {} for {}-{} because the current leader is {}.", new Object[]{brokerId, topic.name, partitionId, partition.leader});
            return Errors.INVALID_REQUEST;
        }
        if (partitionData.partitionEpoch() < partition.partitionEpoch) {
            this.log.info("Rejecting AlterPartition request from node {} for {}-{} because the current partition epoch is {}, not {}.", new Object[]{brokerId, topic.name, partitionId, partition.partitionEpoch, partitionData.partitionEpoch()});
            return Errors.INVALID_UPDATE_VERSION;
        }
        int[] newIsr = Replicas.toArray(partitionData.newIsr());
        if (!Replicas.validateIsr(partition.replicas, newIsr)) {
            this.log.error("Rejecting AlterPartition request from node {} for {}-{} because it specified an invalid ISR {}.", new Object[]{brokerId, topic.name, partitionId, partitionData.newIsr()});
            return Errors.INVALID_REQUEST;
        }
        if (!Replicas.contains(newIsr, partition.leader)) {
            this.log.error("Rejecting AlterPartition request from node {} for {}-{} because it specified an invalid ISR {} that doesn't include itself.", new Object[]{brokerId, topic.name, partitionId, partitionData.newIsr()});
            return Errors.INVALID_REQUEST;
        }
        LeaderRecoveryState leaderRecoveryState = LeaderRecoveryState.of(partitionData.leaderRecoveryState());
        if (partitionData.isUnclean()) {
            leaderRecoveryState = LeaderRecoveryState.RECOVERING;
        }
        if (leaderRecoveryState == LeaderRecoveryState.RECOVERING && newIsr.length > 1) {
            this.log.info("Rejecting AlterPartition request from node {} for {}-{} because the ISR {} had more than one replica while the leader was still recovering from an unclean leader election {}.", new Object[]{brokerId, topic.name, partitionId, partitionData.newIsr(), leaderRecoveryState});
            return Errors.INVALID_REQUEST;
        }
        if (partition.leaderRecoveryState == LeaderRecoveryState.RECOVERED && leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
            this.log.info("Rejecting AlterPartition request from node {} for {}-{} because the leader recovery state cannot change from RECOVERED to RECOVERING.", new Object[]{brokerId, topic.name, partitionId});
            return Errors.INVALID_REQUEST;
        }
        List<IneligibleReplica> ineligibleReplicas = this.ineligibleReplicasForIsr(newIsr);
        if (!ineligibleReplicas.isEmpty()) {
            this.log.info("Rejecting AlterPartition request from node {} for {}-{} because it specified ineligible replicas {} in the new ISR {}.", new Object[]{brokerId, topic.name, partitionId, ineligibleReplicas, partitionData.newIsr()});
            if (requestApiVersion > 1) {
                return Errors.INELIGIBLE_REPLICA;
            }
            return Errors.OPERATION_NOT_ATTEMPTED;
        }
        return Errors.NONE;
    }

    private List<IneligibleReplica> ineligibleReplicasForIsr(int[] replicas) {
        ArrayList<IneligibleReplica> ineligibleReplicas = new ArrayList<IneligibleReplica>(0);
        int[] nArray = replicas;
        int n = nArray.length;
        for (int i = 0; i < n; ++i) {
            Integer replicaId = nArray[i];
            BrokerRegistration registration = this.clusterControl.registration(replicaId);
            if (registration == null) {
                ineligibleReplicas.add(new IneligibleReplica(replicaId, "not registered"));
                continue;
            }
            if (registration.inControlledShutdown()) {
                ineligibleReplicas.add(new IneligibleReplica(replicaId, "shutting down"));
                continue;
            }
            if (!registration.fenced()) continue;
            ineligibleReplicas.add(new IneligibleReplica(replicaId, "fenced"));
        }
        return ineligibleReplicas;
    }

    void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
        BrokerRegistration brokerRegistration = this.clusterControl.brokerRegistrations().get(brokerId);
        if (brokerRegistration == null) {
            throw new RuntimeException("Can't find broker registration for broker " + brokerId);
        }
        this.generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, -1, records, this.brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
        if (this.featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
            records.add(new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()).setFenced(BrokerRegistrationFencingChange.FENCE.value()), 0));
        } else {
            records.add(new ApiMessageAndVersion((ApiMessage)new FenceBrokerRecord().setId(brokerId).setEpoch(brokerRegistration.epoch()), 0));
        }
    }

    void handleBrokerUnregistered(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
        this.generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, -1, records, this.brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
        records.add(new ApiMessageAndVersion((ApiMessage)new UnregisterBrokerRecord().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch), 0));
    }

    void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
        if (this.featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
            records.add(new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), 0));
        } else {
            records.add(new ApiMessageAndVersion((ApiMessage)new UnfenceBrokerRecord().setId(brokerId).setEpoch(brokerEpoch), 0));
        }
        this.generateLeaderAndIsrUpdates("handleBrokerUnfenced", -1, brokerId, records, this.brokersToIsrs.partitionsWithNoLeader());
    }

    void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
        if (this.featureControl.metadataVersion().isInControlledShutdownStateSupported() && !this.clusterControl.inControlledShutdown(brokerId)) {
            records.add(new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()), 1));
        }
        this.generateLeaderAndIsrUpdates("enterControlledShutdown[" + brokerId + "]", brokerId, -1, records, this.brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
    }

    ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
        ElectionType electionType = ReplicationControlManager.electionType(request.electionType());
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        ElectLeadersResponseData response = new ElectLeadersResponseData();
        if (request.topicPartitions() == null) {
            for (Map.Entry<String, Uuid> topicEntry : this.topicsByName.entrySet()) {
                String topicName = topicEntry.getKey();
                ElectLeadersResponseData.ReplicaElectionResult topicResults = new ElectLeadersResponseData.ReplicaElectionResult().setTopic(topicName);
                response.replicaElectionResults().add(topicResults);
                TopicControlInfo topic = this.topics.get(topicEntry.getValue());
                if (topic == null) continue;
                Iterator iterator = topic.parts.keySet().iterator();
                while (iterator.hasNext()) {
                    int partitionId = (Integer)iterator.next();
                    ApiError error = this.electLeader(topicName, partitionId, electionType, records);
                    if (error.error() == Errors.ELECTION_NOT_NEEDED) continue;
                    topicResults.partitionResult().add(new ElectLeadersResponseData.PartitionResult().setPartitionId(partitionId).setErrorCode(error.error().code()).setErrorMessage(error.message()));
                }
            }
        } else {
            for (ElectLeadersRequestData.TopicPartitions topic : request.topicPartitions()) {
                ElectLeadersResponseData.ReplicaElectionResult topicResults = new ElectLeadersResponseData.ReplicaElectionResult().setTopic(topic.topic());
                response.replicaElectionResults().add(topicResults);
                Iterator iterator = topic.partitions().iterator();
                while (iterator.hasNext()) {
                    int partitionId = (Integer)iterator.next();
                    ApiError error = this.electLeader(topic.topic(), partitionId, electionType, records);
                    topicResults.partitionResult().add(new ElectLeadersResponseData.PartitionResult().setPartitionId(partitionId).setErrorCode(error.error().code()).setErrorMessage(error.message()));
                }
            }
        }
        return ControllerResult.of(records, response);
    }

    private static ElectionType electionType(byte electionType) {
        try {
            return ElectionType.valueOf((byte)electionType);
        }
        catch (IllegalArgumentException e) {
            throw new InvalidRequestException("Unknown election type " + electionType);
        }
    }

    ApiError electLeader(String topic, int partitionId, ElectionType electionType, List<ApiMessageAndVersion> records) {
        Uuid topicId = this.topicsByName.get(topic);
        if (topicId == null) {
            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as " + topic);
        }
        TopicControlInfo topicInfo = this.topics.get(topicId);
        if (topicInfo == null) {
            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic id as " + topicId);
        }
        PartitionRegistration partition = (PartitionRegistration)topicInfo.parts.get(partitionId);
        if (partition == null) {
            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such partition as " + topic + "-" + partitionId);
        }
        if (electionType == ElectionType.PREFERRED && partition.hasPreferredLeader() || electionType == ElectionType.UNCLEAN && partition.hasLeader()) {
            return new ApiError(Errors.ELECTION_NOT_NEEDED);
        }
        PartitionChangeBuilder.Election election = PartitionChangeBuilder.Election.PREFERRED;
        if (electionType == ElectionType.UNCLEAN) {
            election = PartitionChangeBuilder.Election.UNCLEAN;
        }
        PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, partitionId, this.clusterControl::isActive, this.featureControl.metadataVersion().isLeaderRecoverySupported());
        builder.setElection(election);
        Optional<ApiMessageAndVersion> record = builder.build();
        if (!record.isPresent()) {
            if (electionType == ElectionType.PREFERRED) {
                return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE);
            }
            return new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE);
        }
        records.add(record.get());
        return ApiError.NONE;
    }

    ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(BrokerHeartbeatRequestData request, long registerBrokerRecordOffset) {
        int brokerId = request.brokerId();
        long brokerEpoch = request.brokerEpoch();
        this.clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
        BrokerHeartbeatManager heartbeatManager = this.clusterControl.heartbeatManager();
        BrokerControlStates states = heartbeatManager.calculateNextBrokerState(brokerId, request, registerBrokerRecordOffset, () -> this.brokersToIsrs.hasLeaderships(brokerId));
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        if (states.current() != states.next()) {
            switch (states.next()) {
                case FENCED: {
                    this.handleBrokerFenced(brokerId, records);
                    break;
                }
                case UNFENCED: {
                    this.handleBrokerUnfenced(brokerId, brokerEpoch, records);
                    break;
                }
                case CONTROLLED_SHUTDOWN: {
                    this.handleBrokerInControlledShutdown(brokerId, brokerEpoch, records);
                    break;
                }
                case SHUTDOWN_NOW: {
                    this.handleBrokerFenced(brokerId, records);
                }
            }
        }
        heartbeatManager.touch(brokerId, states.next().fenced(), request.currentMetadataOffset());
        boolean isCaughtUp = request.currentMetadataOffset() >= registerBrokerRecordOffset;
        BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp, states.next().fenced(), states.next().inControlledShutdown(), states.next().shouldShutDown());
        return ControllerResult.of(records, reply);
    }

    public ControllerResult<Void> unregisterBroker(int brokerId) {
        BrokerRegistration registration = this.clusterControl.brokerRegistrations().get(brokerId);
        if (registration == null) {
            throw new BrokerIdNotRegisteredException("Broker ID " + brokerId + " is not currently registered");
        }
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        this.handleBrokerUnregistered(brokerId, registration.epoch(), records);
        return ControllerResult.of(records, null);
    }

    ControllerResult<Void> maybeFenceOneStaleBroker() {
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        BrokerHeartbeatManager heartbeatManager = this.clusterControl.heartbeatManager();
        heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
            this.log.info("Fencing broker {} because its session has timed out.", brokerId);
            this.handleBrokerFenced((int)brokerId, (List<ApiMessageAndVersion>)records);
            heartbeatManager.fence((int)brokerId);
        });
        return ControllerResult.of(records, null);
    }

    boolean arePartitionLeadersImbalanced() {
        return !this.imbalancedPartitions.isEmpty();
    }

    ControllerResult<Boolean> maybeBalancePartitionLeaders() {
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        for (TopicIdPartition topicPartition : this.imbalancedPartitions) {
            if (records.size() >= this.maxElectionsPerImbalance) {
                return ControllerResult.of(records, true);
            }
            TopicControlInfo topic = this.topics.get(topicPartition.topicId());
            if (topic == null) {
                this.log.error("Skipping unknown imbalanced topic {}", (Object)topicPartition);
                continue;
            }
            PartitionRegistration partition = (PartitionRegistration)topic.parts.get(topicPartition.partitionId());
            if (partition == null) {
                this.log.error("Skipping unknown imbalanced partition {}", (Object)topicPartition);
                continue;
            }
            PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicPartition.topicId(), topicPartition.partitionId(), this.clusterControl::isActive, this.featureControl.metadataVersion().isLeaderRecoverySupported());
            builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
            builder.build().ifPresent(records::add);
        }
        return ControllerResult.of(records, false);
    }

    ControllerResult<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> createPartitions(List<CreatePartitionsRequestData.CreatePartitionsTopic> topics, KafkaPrincipal principal) {
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        ArrayList<CreatePartitionsResponseData.CreatePartitionsTopicResult> results = new ArrayList<CreatePartitionsResponseData.CreatePartitionsTopicResult>();
        for (CreatePartitionsRequestData.CreatePartitionsTopic topic : topics) {
            ApiError apiError = ApiError.NONE;
            try {
                this.createPartitions(topic, records, principal);
            }
            catch (ApiException e) {
                apiError = ApiError.fromThrowable((Throwable)e);
            }
            catch (Exception e) {
                this.log.error("Unexpected createPartitions error for {}", (Object)topic, (Object)e);
                apiError = ApiError.fromThrowable((Throwable)e);
            }
            results.add(new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName(topic.name()).setErrorCode(apiError.error().code()).setErrorMessage(apiError.message()));
        }
        return ControllerResult.atomicOf(records, results);
    }

    void createPartitions(CreatePartitionsRequestData.CreatePartitionsTopic topic, List<ApiMessageAndVersion> records, KafkaPrincipal principal) {
        ArrayList isrs;
        List<Object> placements;
        Uuid topicId = this.topicsByName.get(topic.name());
        if (topicId == null) {
            throw new UnknownTopicOrPartitionException();
        }
        if (this.mirrorTopicControl.isMirrorTopic(topicId) && topic.assignments() != null) {
            throw new InvalidRequestException("Partition assignments specified for mirror topic " + topic.name() + " with id " + topicId);
        }
        TopicControlInfo topicInfo = this.topics.get(topicId);
        if (topicInfo == null) {
            throw new UnknownTopicOrPartitionException();
        }
        if (topic.count() == topicInfo.parts.size()) {
            throw new InvalidPartitionsException("Topic already has " + topicInfo.parts.size() + " partition(s).");
        }
        if (topic.count() < topicInfo.parts.size()) {
            throw new InvalidPartitionsException("The topic " + topic.name() + " currently has " + topicInfo.parts.size() + " partition(s); " + topic.count() + " would not be an increase.");
        }
        int additional = topic.count() - topicInfo.parts.size();
        if (topic.assignments() != null && topic.assignments().size() != additional) {
            throw new InvalidReplicaAssignmentException("Attempted to add " + additional + " additional partition(s), but only " + topic.assignments().size() + " assignment(s) were specified.");
        }
        Iterator iterator = topicInfo.parts.values().iterator();
        if (!iterator.hasNext()) {
            throw new UnknownServerException("Invalid state: topic " + topic.name() + " appears to have no partitions.");
        }
        PartitionRegistration partitionInfo = (PartitionRegistration)iterator.next();
        if (partitionInfo.replicas.length > Short.MAX_VALUE) {
            throw new UnknownServerException("Invalid replication factor " + partitionInfo.replicas.length + ": expected a number equal to less than " + Short.MAX_VALUE);
        }
        Set<Integer> excludedBrokerIds = this.clusterControl.activeBrokerReplicaExclusions().keySet();
        short replicationFactor = (short)partitionInfo.replicas.length;
        int startPartitionId = topicInfo.parts.size();
        if (topic.assignments() != null) {
            ApiError error;
            placements = new ArrayList();
            isrs = new ArrayList();
            for (int i = 0; i < topic.assignments().size(); ++i) {
                CreatePartitionsRequestData.CreatePartitionsAssignment assignment = (CreatePartitionsRequestData.CreatePartitionsAssignment)topic.assignments().get(i);
                this.validateManualPartitionAssignment(assignment.brokerIds(), OptionalInt.of(replicationFactor), excludedBrokerIds);
                placements.add(assignment.brokerIds());
                List isr = assignment.brokerIds().stream().filter(this.clusterControl::isActive).collect(Collectors.toList());
                if (isr.isEmpty()) {
                    throw new InvalidReplicaAssignmentException("All brokers specified in the manual partition assignment for partition " + (startPartitionId + i) + " are fenced or in controlled shutdown.");
                }
                isrs.add(isr);
            }
            if (this.applyCreateTopicsPolicyToCreatePartitions && (error = this.maybeCheckCreateTopicPolicy(() -> {
                HashMap<Integer, List> placementMap = new HashMap<Integer, List>();
                int i = startPartitionId;
                for (List placement : placements) {
                    placementMap.put(i, placement);
                    ++i;
                }
                return new CreateTopicPolicy.RequestMetadata(topic.name(), null, null, placementMap, this.configurationControl.getTopicConfigs(topicInfo.name));
            })).isFailure()) {
                throw error.exception();
            }
        } else {
            ApiError error;
            placements = this.clusterControl.replicaPlacer().place(new PlacementSpec(startPartitionId, additional, replicationFactor, topic.name(), principal, this.clusterControl.activeBrokerReplicaExclusions().keySet()), this.clusterDescriber);
            isrs = placements;
            if (this.applyCreateTopicsPolicyToCreatePartitions && (error = this.maybeCheckCreateTopicPolicy(() -> new CreateTopicPolicy.RequestMetadata(topic.name(), Integer.valueOf(additional), Short.valueOf(replicationFactor), null, this.configurationControl.getTopicConfigs(topicInfo.name)))).isFailure()) {
                throw error.exception();
            }
        }
        int partitionId = startPartitionId;
        for (int i = 0; i < placements.size(); ++i) {
            List replicas = (List)placements.get(i);
            List<Integer> isr = ((List)isrs.get(i)).stream().filter(this.clusterControl::isActive).collect(Collectors.toList());
            if (isr.isEmpty()) {
                throw new InvalidReplicationFactorException("Unable to replicate the partition " + replicationFactor + " time(s): All brokers are currently fenced or in controlled shutdown.");
            }
            records.add(new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setPartitionId(partitionId).setTopicId(topicId).setReplicas(replicas).setIsr(isr).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(isr.get(0)).setLeaderEpoch(0).setPartitionEpoch(0), MetadataRecordType.PARTITION_RECORD.highestSupportedVersion()));
            ++partitionId;
        }
    }

    void validateManualPartitionAssignment(List<Integer> assignment, OptionalInt replicationFactor, Set<Integer> excludedBrokerIds) {
        if (assignment.isEmpty()) {
            throw new InvalidReplicaAssignmentException("The manual partition assignment includes an empty replica list.");
        }
        ArrayList<Integer> sortedBrokerIds = new ArrayList<Integer>(assignment);
        sortedBrokerIds.sort(Integer::compare);
        Integer prevBrokerId = null;
        for (Integer brokerId : sortedBrokerIds) {
            if (!this.clusterControl.brokerRegistrations().containsKey(brokerId)) {
                throw new InvalidReplicaAssignmentException("The manual partition assignment includes broker " + brokerId + ", but no such broker is registered.");
            }
            if (brokerId.equals(prevBrokerId)) {
                throw new InvalidReplicaAssignmentException("The manual partition assignment includes the broker " + prevBrokerId + " more than once.");
            }
            if (excludedBrokerIds.contains(brokerId)) {
                throw new InvalidReplicaAssignmentException("The manual partition assignment includes the broker " + brokerId + " which is excluded from replica placement.");
            }
            prevBrokerId = brokerId;
        }
        if (replicationFactor.isPresent() && sortedBrokerIds.size() != replicationFactor.getAsInt()) {
            throw new InvalidReplicaAssignmentException("The manual partition assignment includes a partition with " + sortedBrokerIds.size() + " replica(s), but this is not consistent with previous partitions, which have " + replicationFactor.getAsInt() + " replica(s).");
        }
    }

    void generateLeaderAndIsrUpdates(String context, int brokerToRemove, int brokerToAdd, List<ApiMessageAndVersion> records, Iterator<TopicIdPartition> iterator) {
        int oldSize = records.size();
        IntPredicate isAcceptableLeader = r -> r != brokerToRemove && (r == brokerToAdd || this.clusterControl.isActive(r));
        while (iterator.hasNext()) {
            TopicIdPartition topicIdPart = iterator.next();
            TopicControlInfo topic = this.topics.get(topicIdPart.topicId());
            if (topic == null) {
                throw new RuntimeException("Topic ID " + topicIdPart.topicId() + " existed in isrMembers, but not in the topics map.");
            }
            PartitionRegistration partition = (PartitionRegistration)topic.parts.get(topicIdPart.partitionId());
            if (partition == null) {
                throw new RuntimeException("Partition " + topicIdPart + " existed in isrMembers, but not in the partitions map.");
            }
            PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicIdPart.topicId(), topicIdPart.partitionId(), isAcceptableLeader, this.featureControl.metadataVersion().isLeaderRecoverySupported());
            if (this.configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
                builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
            }
            builder.setTargetIsr(Replicas.toList(Replicas.copyWithout(partition.isr, brokerToRemove)));
            builder.build().ifPresent(records::add);
        }
        if (records.size() != oldSize) {
            if (this.log.isDebugEnabled()) {
                StringBuilder bld = new StringBuilder();
                String prefix = "";
                ListIterator<ApiMessageAndVersion> iter = records.listIterator(oldSize);
                while (iter.hasNext()) {
                    ApiMessageAndVersion apiMessageAndVersion = iter.next();
                    PartitionChangeRecord record = (PartitionChangeRecord)apiMessageAndVersion.message();
                    bld.append(prefix).append(this.topics.get(record.topicId()).name).append("-").append(record.partitionId());
                    prefix = ", ";
                }
                this.log.debug("{}: changing partition(s): {}", (Object)context, (Object)bld.toString());
            } else if (this.log.isInfoEnabled()) {
                this.log.info("{}: changing {} partition(s)", (Object)context, (Object)(records.size() - oldSize));
            }
        }
    }

    ControllerResult<AlterPartitionReassignmentsResponseData> alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        AlterPartitionReassignmentsResponseData result = new AlterPartitionReassignmentsResponseData().setErrorMessage(null);
        int successfulAlterations = 0;
        int totalAlterations = 0;
        for (AlterPartitionReassignmentsRequestData.ReassignableTopic topic : request.topics()) {
            AlterPartitionReassignmentsResponseData.ReassignableTopicResponse topicResponse = new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName(topic.name());
            for (AlterPartitionReassignmentsRequestData.ReassignablePartition partition : topic.partitions()) {
                ApiError error = ApiError.NONE;
                try {
                    this.alterPartitionReassignment(topic.name(), partition, records);
                    ++successfulAlterations;
                }
                catch (Throwable e) {
                    this.log.info("Unable to alter partition reassignment for " + topic.name() + ":" + partition.partitionIndex() + " because of an " + e.getClass().getSimpleName() + " error: " + e.getMessage());
                    error = ApiError.fromThrowable((Throwable)e);
                }
                ++totalAlterations;
                topicResponse.partitions().add(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(partition.partitionIndex()).setErrorCode(error.error().code()).setErrorMessage(error.message()));
            }
            result.responses().add(topicResponse);
        }
        this.log.info("Successfully altered {} out of {} partition reassignment(s).", (Object)successfulAlterations, (Object)totalAlterations);
        return ControllerResult.atomicOf(records, result);
    }

    void alterPartitionReassignment(String topicName, AlterPartitionReassignmentsRequestData.ReassignablePartition target, List<ApiMessageAndVersion> records) {
        Uuid topicId = this.topicsByName.get(topicName);
        if (topicId == null) {
            throw new UnknownTopicOrPartitionException("Unable to find a topic named " + topicName + ".");
        }
        TopicControlInfo topicInfo = this.topics.get(topicId);
        if (topicInfo == null) {
            throw new UnknownTopicOrPartitionException("Unable to find a topic with ID " + topicId + ".");
        }
        TopicIdPartition tp = new TopicIdPartition(topicId, target.partitionIndex());
        PartitionRegistration part = (PartitionRegistration)topicInfo.parts.get(target.partitionIndex());
        if (part == null) {
            throw new UnknownTopicOrPartitionException("Unable to find partition " + topicName + ":" + target.partitionIndex() + ".");
        }
        Optional<ApiMessageAndVersion> record = target.replicas() == null ? this.cancelPartitionReassignment(topicName, tp, part) : this.changePartitionReassignment(tp, part, target);
        record.ifPresent(records::add);
    }

    Optional<ApiMessageAndVersion> cancelPartitionReassignment(String topicName, TopicIdPartition tp, PartitionRegistration part) {
        if (!part.isReassigning()) {
            throw new NoReassignmentInProgressException(Errors.NO_REASSIGNMENT_IN_PROGRESS.message());
        }
        PartitionReassignmentRevert revert = new PartitionReassignmentRevert(part);
        if (revert.unclean() && !this.configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
            throw new InvalidReplicaAssignmentException("Unable to revert partition assignment for " + topicName + ":" + tp.partitionId() + " because it would require an unclean leader election.");
        }
        PartitionChangeBuilder builder = new PartitionChangeBuilder(part, tp.topicId(), tp.partitionId(), this.clusterControl::isActive, this.featureControl.metadataVersion().isLeaderRecoverySupported());
        if (this.configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
            builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
        }
        builder.setTargetIsr(revert.isr()).setTargetReplicas(revert.replicas()).setTargetRemoving(Collections.emptyList()).setTargetAdding(Collections.emptyList());
        return builder.build();
    }

    Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp, PartitionRegistration part, AlterPartitionReassignmentsRequestData.ReassignablePartition target) {
        List<Integer> currentReplicas = Replicas.toList(part.replicas);
        HashSet<Integer> excludedBrokersToValidateAgainst = new HashSet<Integer>(this.clusterControl.activeBrokerReplicaExclusions().keySet());
        excludedBrokersToValidateAgainst.removeAll(currentReplicas);
        this.validateManualPartitionAssignment(target.replicas(), OptionalInt.empty(), excludedBrokersToValidateAgainst);
        PartitionReassignmentReplicas reassignment = new PartitionReassignmentReplicas(currentReplicas, target.replicas());
        PartitionChangeBuilder builder = new PartitionChangeBuilder(part, tp.topicId(), tp.partitionId(), this.clusterControl::isActive, this.featureControl.metadataVersion().isLeaderRecoverySupported());
        if (!reassignment.merged().equals(currentReplicas)) {
            builder.setTargetReplicas(reassignment.merged());
        }
        if (!reassignment.removing().isEmpty()) {
            builder.setTargetRemoving(reassignment.removing());
        }
        if (!reassignment.adding().isEmpty()) {
            builder.setTargetAdding(reassignment.adding());
        }
        return builder.build();
    }

    ListPartitionReassignmentsResponseData listPartitionReassignments(List<ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics> topicList) {
        ListPartitionReassignmentsResponseData response = new ListPartitionReassignmentsResponseData().setErrorMessage(null);
        if (topicList == null) {
            for (Map.Entry<Uuid, int[]> entry : this.reassigningTopics.entrySet()) {
                this.listReassigningTopic(response, entry.getKey(), Replicas.toList(entry.getValue()));
            }
        } else {
            for (ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics topic : topicList) {
                Uuid topicId = this.topicsByName.get(topic.name());
                if (topicId == null) continue;
                this.listReassigningTopic(response, topicId, topic.partitionIndexes());
            }
        }
        return response;
    }

    private void listReassigningTopic(ListPartitionReassignmentsResponseData response, Uuid topicId, List<Integer> partitionIds) {
        TopicControlInfo topicInfo = this.topics.get(topicId);
        if (topicInfo == null) {
            return;
        }
        ListPartitionReassignmentsResponseData.OngoingTopicReassignment ongoingTopic = new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName(topicInfo.name);
        for (int partitionId : partitionIds) {
            Optional<ListPartitionReassignmentsResponseData.OngoingPartitionReassignment> ongoing = this.getOngoingPartitionReassignment(topicInfo, partitionId);
            if (!ongoing.isPresent()) continue;
            ongoingTopic.partitions().add(ongoing.get());
        }
        if (!ongoingTopic.partitions().isEmpty()) {
            response.topics().add(ongoingTopic);
        }
    }

    private Optional<ListPartitionReassignmentsResponseData.OngoingPartitionReassignment> getOngoingPartitionReassignment(TopicControlInfo topicInfo, int partitionId) {
        PartitionRegistration partition = (PartitionRegistration)topicInfo.parts.get(partitionId);
        if (partition == null || !partition.isReassigning()) {
            return Optional.empty();
        }
        return Optional.of(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setAddingReplicas(Replicas.toList(partition.addingReplicas)).setRemovingReplicas(Replicas.toList(partition.removingReplicas)).setPartitionIndex(partitionId).setReplicas(Replicas.toList(partition.replicas)));
    }

    ReplicationControlIterator iterator(long epoch) {
        return new ReplicationControlIterator(epoch);
    }

    private void updateConfluentPartitionsAndTopicsPerTopicListener(String topic, int numPartitionsChange, int numTopicsChange) {
        this.createTopicPolicy.ifPresent(policy -> {
            if (policy instanceof ConfluentPartitionsPerTopicListener) {
                ConfluentPartitionsPerTopicListener listener = (ConfluentPartitionsPerTopicListener)policy;
                listener.partialUpdate(topic, numPartitionsChange, numTopicsChange);
            }
        });
    }

    void resetConfluentPartitionsPerTopicListener() {
        this.createTopicPolicy.ifPresent(policy -> {
            if (policy instanceof ConfluentPartitionsPerTopicListener) {
                ConfluentPartitionsPerTopicListener listener = (ConfluentPartitionsPerTopicListener)policy;
                listener.fullUpdate(new Iterator<Map.Entry<String, Integer>>(){
                    Iterator topicIterator;
                    {
                        this.topicIterator = ReplicationControlManager.this.topics.values().iterator();
                    }

                    @Override
                    public boolean hasNext() {
                        return this.topicIterator.hasNext();
                    }

                    @Override
                    public Map.Entry<String, Integer> next() {
                        TopicControlInfo topicInfo = (TopicControlInfo)this.topicIterator.next();
                        return new AbstractMap.SimpleImmutableEntry<String, Integer>(topicInfo.name, topicInfo.parts.size());
                    }
                });
            }
        });
    }

    private static final class IneligibleReplica {
        private final int replicaId;
        private final String reason;

        private IneligibleReplica(int replicaId, String reason) {
            this.replicaId = replicaId;
            this.reason = reason;
        }

        public String toString() {
            return this.replicaId + " (" + this.reason + ")";
        }
    }

    class ReplicationControlIterator
    implements Iterator<List<ApiMessageAndVersion>> {
        private final long epoch;
        private final Iterator<TopicControlInfo> iterator;

        ReplicationControlIterator(long epoch) {
            this.epoch = epoch;
            this.iterator = ReplicationControlManager.this.topics.values(epoch).iterator();
        }

        @Override
        public boolean hasNext() {
            return this.iterator.hasNext();
        }

        @Override
        public List<ApiMessageAndVersion> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            TopicControlInfo topic = this.iterator.next();
            ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
            records.add(new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName(topic.name).setTopicId(topic.id), MetadataRecordType.TOPIC_RECORD.highestSupportedVersion()));
            ReplicationControlManager.this.mirrorTopicControl.snapshotRecord(topic.id, topic.name, this.epoch, records::add);
            for (Map.Entry entry : topic.parts.entrySet(this.epoch)) {
                records.add(((PartitionRegistration)entry.getValue()).toRecord(topic.id, (Integer)entry.getKey()));
            }
            return records;
        }
    }

    static class TopicControlInfo {
        private final String name;
        private final Uuid id;
        private final TimelineHashMap<Integer, PartitionRegistration> parts;

        TopicControlInfo(String name, SnapshotRegistry snapshotRegistry, Uuid id) {
            this.name = name;
            this.id = id;
            this.parts = new TimelineHashMap(snapshotRegistry, 0);
        }

        public String name() {
            return this.name;
        }

        public Uuid topicId() {
            return this.id;
        }
    }

    class KRaftClusterDescriber
    implements ClusterDescriber {
        KRaftClusterDescriber() {
        }

        @Override
        public Iterator<UsableBroker> usableBrokers() {
            return ReplicationControlManager.this.clusterControl.usableBrokers();
        }

        @Override
        public Iterator<String> topicNames() {
            return Collections.unmodifiableCollection(ReplicationControlManager.this.topicsByName.keySet()).iterator();
        }

        @Override
        public List<List<Integer>> replicasForTopicName(String topicName) {
            Uuid id = (Uuid)ReplicationControlManager.this.topicsByName.get(topicName);
            if (id == null) {
                return Collections.emptyList();
            }
            TopicControlInfo topicInfo = (TopicControlInfo)ReplicationControlManager.this.topics.get(id);
            if (topicInfo == null) {
                return Collections.emptyList();
            }
            ArrayList<Map.Entry> partInfo = new ArrayList<Map.Entry>();
            for (Map.Entry entry : topicInfo.parts.entrySet()) {
                PartitionRegistration registration = (PartitionRegistration)entry.getValue();
                partInfo.add(new AbstractMap.SimpleImmutableEntry(entry.getKey(), Replicas.toList(registration.replicas)));
            }
            partInfo.sort(Comparator.comparingInt(Map.Entry::getKey));
            ArrayList<List<Integer>> results = new ArrayList<List<Integer>>();
            for (Map.Entry entry : partInfo) {
                results.add((List<Integer>)entry.getValue());
            }
            return results;
        }
    }

    static class Builder {
        private SnapshotRegistry snapshotRegistry = null;
        private LogContext logContext = null;
        private short defaultReplicationFactor = (short)3;
        private int defaultNumPartitions = 1;
        private int maxElectionsPerImbalance = 1000;
        private ConfigurationControlManager configurationControl = null;
        private ClusterControlManager clusterControl = null;
        private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
        private FeatureControlManager featureControl = null;
        private boolean applyCreateTopicsPolicyToCreatePartitions = false;
        private MirrorTopicControlManager mirrorControl = null;

        Builder() {
        }

        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;
            return this;
        }

        Builder setLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        Builder setDefaultReplicationFactor(short defaultReplicationFactor) {
            this.defaultReplicationFactor = defaultReplicationFactor;
            return this;
        }

        Builder setDefaultNumPartitions(int defaultNumPartitions) {
            this.defaultNumPartitions = defaultNumPartitions;
            return this;
        }

        Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
            this.maxElectionsPerImbalance = maxElectionsPerImbalance;
            return this;
        }

        Builder setConfigurationControl(ConfigurationControlManager configurationControl) {
            this.configurationControl = configurationControl;
            return this;
        }

        Builder setClusterControl(ClusterControlManager clusterControl) {
            this.clusterControl = clusterControl;
            return this;
        }

        Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
            this.createTopicPolicy = createTopicPolicy;
            return this;
        }

        Builder setApplyCreateTopicsPolicyToCreatePartitions(boolean applyCreateTopicsPolicyToCreatePartitions) {
            this.applyCreateTopicsPolicyToCreatePartitions = applyCreateTopicsPolicyToCreatePartitions;
            return this;
        }

        Builder setMirrorTopicControl(MirrorTopicControlManager mirrorControl) {
            this.mirrorControl = mirrorControl;
            return this;
        }

        public Builder setFeatureControl(FeatureControlManager featureControl) {
            this.featureControl = featureControl;
            return this;
        }

        ReplicationControlManager build() {
            if (this.configurationControl == null) {
                throw new IllegalStateException("Configuration control must be set before building");
            }
            if (this.clusterControl == null) {
                throw new IllegalStateException("Cluster controller must be set before building");
            }
            if (this.logContext == null) {
                this.logContext = new LogContext();
            }
            if (this.mirrorControl == null) {
                this.mirrorControl = new MirrorTopicControlManager(this.snapshotRegistry, this.logContext, Time.SYSTEM, __ -> Optional.empty(), __ -> Optional.empty());
            }
            if (this.snapshotRegistry == null) {
                this.snapshotRegistry = this.configurationControl.snapshotRegistry();
            }
            if (this.featureControl == null) {
                this.featureControl = new FeatureControlManager.Builder().setLogContext(this.logContext).setSnapshotRegistry(this.snapshotRegistry).setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(), QuorumFeatures.defaultFeatureMap(), Collections.singletonList(0))).setMetadataVersion(MetadataVersion.latest()).build();
            }
            return new ReplicationControlManager(this.snapshotRegistry, this.logContext, this.defaultReplicationFactor, this.defaultNumPartitions, this.maxElectionsPerImbalance, this.configurationControl, this.clusterControl, this.createTopicPolicy, this.featureControl, this.applyCreateTopicsPolicyToCreatePartitions, this.mirrorControl);
        }
    }
}

