/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.time.OffsetDateTime;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.BrokerComponent;
import org.apache.kafka.clients.admin.ComponentHealthStatus;
import org.apache.kafka.common.Cell;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.BrokerRegistrationDoesNotContainEncryptorSecretException;
import org.apache.kafka.common.errors.ControllerRegistrationDoesNotContainActiveSecretException;
import org.apache.kafka.common.errors.DemotionLimitReachedException;
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.InvalidRegistrationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AlterBrokerHealthRequestData;
import org.apache.kafka.common.message.AlterBrokerHealthResponseData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.ControllerRegistrationRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.BrokerReplicaExclusionRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.InstallMetadataEncryptorRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterControllerRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterBrokerHealthRequest;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.BrokerHeartbeatManager;
import org.apache.kafka.controller.BrokerIdAndEpoch;
import org.apache.kafka.controller.CellControlManager;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.EncryptionControlManager;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.ControllerRegistration;
import org.apache.kafka.metadata.DegradedBrokerHealthState;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.ListenerInfo;
import org.apache.kafka.metadata.MetadataEncryptorFactory;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.AlterReplicaExclusionOp;
import org.apache.kafka.server.common.AlterReplicaExclusionsReply;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.BrokerReplicaExclusionModificationResult;
import org.apache.kafka.server.common.BrokerReplicaExclusionResult;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.util.BrokerReplicaExclusionUtils;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;

public class ClusterControlManager {
    /** Default session timeout: 9 seconds, expressed in nanoseconds. */
    static final long DEFAULT_SESSION_TIMEOUT_NS = TimeUnit.NANOSECONDS.convert(9L, TimeUnit.SECONDS);
    // NOTE(review): not referenced in the visible part of this file; presumably the version used
    // when writing BrokerRegistrationChangeRecord - confirm against the rest of the class.
    private static final short BROKER_REGISTRATION_CHANGE_RECORD_VERSION = 1;
    // Used to create this class's logger and handed to each new BrokerHeartbeatManager.
    private final LogContext logContext;
    // Cluster ID that broker registration requests must match.
    private final String clusterId;
    private final Logger log;
    private final Time time;
    // Session timeout passed to the BrokerHeartbeatManager created in activate().
    private final long sessionTimeoutNs;
    private final ReplicaPlacer replicaPlacer;
    // Broker registrations, looked up by broker ID.
    private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;
    // IDs of brokers that are currently demoted.
    private final TimelineHashSet<Integer> demotedBrokers;
    // NOTE(review): presumably broker ID -> offset of its RegisterBrokerRecord; not used in the visible code.
    private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;
    // Broker ID -> String value; presumably the exclusion reason - TODO confirm.
    private final TimelineHashMap<Integer, String> brokerReplicaExclusions;
    // Null until activate() is called and again after deactivate().
    private volatile BrokerHeartbeatManager heartbeatManager;
    // Starts out empty; completion logic is outside the visible part of this file.
    private Optional<ReadyBrokersFuture> readyBrokersFuture;
    private final FeatureControlManager featureControl;
    // Asked to append clean-up records when a broker registers a new incarnation.
    private final BrokerShutdownHandler brokerShutdownHandler;
    // NOTE(review): supplied by the constructor and not final; its use is outside the visible code.
    private Map<String, String> rackMigrationMap;
    // Controller registrations; presumably keyed by controller ID - TODO confirm.
    private final TimelineHashMap<Integer, ControllerRegistration> controllerRegistrations;
    // Log directory ID -> ID of the broker that registered it.
    private final TimelineHashMap<Uuid, Integer> directoryToBroker;
    private final CellControlManager cellControl;
    // May be null; every visible use is guarded by a null check or happens after one.
    private final EncryptionControlManager encryptionControl;
    private final QuorumControllerMetrics metrics;

    /**
     * Creates a manager in the inactive state: no heartbeat manager exists until
     * {@link #activate()} is called, and no ready-brokers future is pending.
     * All timeline collections are created empty against {@code snapshotRegistry}.
     */
    private ClusterControlManager(LogContext logContext, String clusterId, Time time, SnapshotRegistry snapshotRegistry, long sessionTimeoutNs, ReplicaPlacer replicaPlacer, FeatureControlManager featureControl, CellControlManager cellControl, EncryptionControlManager encryptionControl, Map<String, String> rackMigrationMap, BrokerShutdownHandler brokerShutdownHandler, QuorumControllerMetrics metrics) {
        // Caller-supplied configuration and collaborators.
        this.logContext = logContext;
        this.log = logContext.logger(ClusterControlManager.class);
        this.clusterId = clusterId;
        this.time = time;
        this.sessionTimeoutNs = sessionTimeoutNs;
        this.replicaPlacer = replicaPlacer;
        this.featureControl = featureControl;
        this.cellControl = cellControl;
        this.encryptionControl = encryptionControl;
        this.rackMigrationMap = rackMigrationMap;
        this.brokerShutdownHandler = brokerShutdownHandler;
        this.metrics = metrics;

        // Timeline-backed state; creation order is kept as in the original.
        this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
        this.demotedBrokers = new TimelineHashSet<>(snapshotRegistry, 0);
        this.registerBrokerRecordOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
        this.brokerReplicaExclusions = new TimelineHashMap<>(snapshotRegistry, 0);
        this.controllerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
        this.directoryToBroker = new TimelineHashMap<>(snapshotRegistry, 0);

        // Inactive until activate() is called.
        this.heartbeatManager = null;
        this.readyBrokersFuture = Optional.empty();
    }

    /** Returns the replica placer this manager was constructed with. */
    ReplicaPlacer replicaPlacer() {
        return replicaPlacer;
    }

    /**
     * Activates this manager: creates a fresh heartbeat manager and registers every
     * known broker with it. Each broker also gets a time-since-last-heartbeat metric,
     * and unfenced brokers have their contact time set to the activation time.
     */
    public void activate() {
        this.heartbeatManager = new BrokerHeartbeatManager(this.logContext, this.time, this.sessionTimeoutNs);
        final long activationTimeNs = this.time.nanoseconds();
        for (BrokerRegistration broker : this.brokerRegistrations.values()) {
            final int id = broker.id();
            this.heartbeatManager.register(id, broker.fenced());
            this.metrics.addTimeSinceLastHeartbeatMetric(id);
            if (!broker.fenced()) {
                // Only unfenced brokers get a contact time at activation.
                BrokerIdAndEpoch idAndEpoch = new BrokerIdAndEpoch(id, broker.epoch());
                this.heartbeatManager.tracker().updateContactTime(idAndEpoch, activationTimeNs);
            }
        }
    }

    /** Returns the cluster ID that registration requests are validated against. */
    String clusterId() {
        return clusterId;
    }

    /**
     * Deactivates this manager: drops the heartbeat manager and removes the
     * time-since-last-heartbeat metrics.
     */
    public void deactivate() {
        heartbeatManager = null;
        metrics.removeTimeSinceLastHeartbeatMetrics();
    }

    /** Returns the live (not copied, not wrapped) map of broker registrations by broker ID. */
    Map<Integer, BrokerRegistration> brokerRegistrations() {
        return brokerRegistrations;
    }

    /**
     * Returns the brokers of the given cell, or an empty set when the cell
     * control manager has no cell with that ID.
     */
    Set<Integer> cellBrokers(int cellId) {
        return cellControl.getCell(cellId)
            .map(cell -> cell.brokers())
            .orElse(Set.of());
    }

    /** Returns a read-only view of the per-broker replica exclusions. */
    Map<Integer, String> activeBrokerReplicaExclusions() {
        return Collections.unmodifiableMap(brokerReplicaExclusions);
    }

    /**
     * Returns a read-only map combining brokers in quarantined or excluded cells
     * with the explicit per-broker replica exclusions. When a broker appears in
     * both, the explicit exclusion's value wins.
     */
    Map<Integer, String> activeBrokerReplicaExclusionsAndCells() {
        final String cellExclusionReason = "Broker is in a quarantined or excluded cell.";
        Map<Integer, String> merged = this.cellControl.brokersInQuarantinedAndExcludedCells()
            .stream()
            .collect(Collectors.toMap(id -> id, id -> cellExclusionReason));
        // Explicit exclusions are applied last so they overwrite the cell-derived entries.
        merged.putAll(this.brokerReplicaExclusions);
        return Collections.unmodifiableMap(merged);
    }

    /** Returns a read-only view of the IDs of demoted brokers. */
    Set<Integer> demotedBrokers() {
        return Collections.unmodifiableSet(demotedBrokers);
    }

    /**
     * Collects the degraded components of every demoted broker.
     *
     * @return a read-only map from broker ID to its non-empty set of degraded
     *         components; demoted brokers with a null or empty set are omitted
     */
    public Map<Integer, Set<DegradedBrokerHealthState>> activeBrokerComponentDegradations() {
        Map<Integer, Set<DegradedBrokerHealthState>> byBroker = new HashMap<>();
        for (Integer demotedId : this.demotedBrokers) {
            // Every demoted broker is expected to have a registration; a missing one fails here.
            Set<DegradedBrokerHealthState> components = this.brokerRegistrations.get(demotedId).degradedComponents();
            if (components == null || components.isEmpty()) {
                continue;
            }
            byBroker.put(demotedId, components);
        }
        return Collections.unmodifiableMap(byBroker);
    }

    /**
     * Validates a broker registration request and builds the metadata records that
     * register a new broker incarnation or amend the current one.
     *
     * <p>Validation runs in this order: the manager must be active, the cluster ID must
     * match, no other incarnation may hold a valid session for the broker ID, the request
     * must not be from a migrating ZK broker, log directories must be valid (only when the
     * metadata version supports directory assignment), the encryptor list must be
     * acceptable, and the broker's feature ranges must cover the finalized features.
     *
     * <p>Besides returning records, this method (re-)registers the broker with the
     * heartbeat manager and adds a heartbeat metric for a broker with no prior registration.
     *
     * @param request the registration request
     * @param newBrokerEpoch epoch written to the record when this is a new incarnation
     * @param finalizedFeatures the finalized feature levels to validate against
     * @param isNetworkHealthManagerEnabled when true, a new incarnation may receive a
     *        startup demotion (also requires metadata-version support)
     * @param cleanShutdownDetectionEnabled when true, the previous shutdown is treated as
     *        clean iff the stored broker epoch equals {@code request.previousBrokerEpoch()}
     * @return the generated records and a reply carrying the epoch written to the record
     * @throws RuntimeException if this manager is not active
     * @throws InconsistentClusterIdException if the request's cluster ID differs
     * @throws DuplicateBrokerRegistrationException if another incarnation holds a valid session
     * @throws BrokerIdNotRegisteredException if the request is from a migrating ZK broker
     * @throws InvalidRegistrationException on invalid log directories or a missing
     *         'confluent.metadata.version' feature
     * @throws UnsupportedVersionException if the broker cannot support a finalized feature level
     */
    public ControllerResult<BrokerRegistrationReply> registerBroker(BrokerRegistrationRequestData request, long newBrokerEpoch, FinalizedControllerFeatures finalizedFeatures, boolean isNetworkHealthManagerEnabled, boolean cleanShutdownDetectionEnabled) {
        if (this.heartbeatManager == null) {
            throw new RuntimeException("ClusterControlManager is not active.");
        }
        if (!this.clusterId.equals(request.clusterId())) {
            throw new InconsistentClusterIdException("Expected cluster ID " + this.clusterId + ", but got cluster ID " + request.clusterId());
        }
        int brokerId = request.brokerId();
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        BrokerRegistration existing = (BrokerRegistration)this.brokerRegistrations.get((Object)brokerId);
        Uuid prevIncarnationId = null;
        // -2 is the placeholder used when there is no existing registration.
        long storedBrokerEpoch = -2L;
        if (existing != null) {
            prevIncarnationId = existing.incarnationId();
            storedBrokerEpoch = existing.epoch();
            // A different incarnation may only take over once the existing session is no longer valid.
            if (this.heartbeatManager.hasValidSession(brokerId, existing.epoch()) && !request.incarnationId().equals((Object)prevIncarnationId)) {
                throw new DuplicateBrokerRegistrationException("Another broker is registered with that broker id.");
            }
        }
        if (request.isMigratingZkBroker()) {
            throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers.");
        }
        // Log directory validation applies only when the metadata version supports directory assignment.
        if (this.featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
            if (request.logDirs().isEmpty()) {
                throw new InvalidRegistrationException("No directories specified in request");
            }
            if (request.logDirs().stream().anyMatch(DirectoryId::reserved)) {
                throw new InvalidRegistrationException("Reserved directory ID in request");
            }
            // A size mismatch after de-duplication means the request repeated a directory ID.
            HashSet set = new HashSet(request.logDirs());
            if (set.size() != request.logDirs().size()) {
                throw new InvalidRegistrationException("Duplicate directory ID in request");
            }
            // Reject directories that are already registered to a different broker.
            for (Uuid directory : request.logDirs()) {
                Integer dirBrokerId = (Integer)this.directoryToBroker.get((Object)directory);
                if (dirBrokerId == null || dirBrokerId == brokerId) continue;
                throw new InvalidRegistrationException("Broker " + dirBrokerId + " is already registered with directory " + String.valueOf(directory));
            }
        }
        ListenerInfo listenerInfo = ListenerInfo.fromBrokerRegistrationRequest(request.listeners());
        // Stays null unless the finalized confluent metadata version supports broker encryptors.
        List<RegisterBrokerRecord.Encryptor> brokerRecordMetadataEncryptors = null;
        Optional<Short> confluentMetadataLevel = finalizedFeatures.get("confluent.metadata.version");
        // Fall back to the minimum version's level when the feature is not finalized.
        if (!confluentMetadataLevel.isPresent()) {
            confluentMetadataLevel = Optional.of(MetadataVersion.MINIMUM_VERSION.confluentFeatureLevel());
        }
        if (MetadataVersion.fromConfluentFeatureLevel((short)confluentMetadataLevel.get()).isSettingBrokerMetadataEncryptorsSupported()) {
            brokerRecordMetadataEncryptors = this.getBrokerRecordMetadataEncryptorsOrLegacyEncryptors(request);
        }
        this.maybeRejectBrokerRegistrationDueToInvalidEncryptor(brokerId, brokerRecordMetadataEncryptors);
        RegisterBrokerRecord record = new RegisterBrokerRecord().setBrokerId(brokerId).setIsMigratingZkBroker(request.isMigratingZkBroker()).setIncarnationId(request.incarnationId()).setRack(request.rack()).setEndPoints(listenerInfo.toBrokerRegistrationRecord()).setMetadataEncryptors(brokerRecordMetadataEncryptors);
        // Carry over the existing degradations of an already-demoted broker.
        if (this.demotedBrokers.contains((Object)brokerId)) {
            record.setDegradedComponents(DegradedBrokerHealthState.toRegisterBrokerRecordDegradedComponent(this.activeBrokerComponentDegradations().get(brokerId)));
        }
        // Only a new incarnation gets the startup demotion.
        if (isNetworkHealthManagerEnabled && !request.incarnationId().equals((Object)prevIncarnationId) && this.featureControl.metadataVersionOrThrow().isExternalConnectivitySupported()) {
            this.addStartupDemotionOnBroker(record);
        }
        // Starts as all finalized features; each feature named by the request is removed as it is processed.
        HashMap<String, Short> unverifiedFeatures = new HashMap<String, Short>(finalizedFeatures.featureMap());
        for (BrokerRegistrationRequestData.Feature feature2 : request.features()) {
            record.features().add(this.processRegistrationFeature(brokerId, finalizedFeatures, feature2));
            unverifiedFeatures.remove(feature2.name());
        }
        HashMap ranges = new HashMap();
        record.features().forEach(feature -> ranges.put(feature.name(), VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion())));
        VersionRange confluentRange = (VersionRange)ranges.get("confluent.metadata.version");
        VersionRange apacheRange = (VersionRange)ranges.get("metadata.version");
        if (confluentRange != null) {
            short max;
            short min = confluentRange.min();
            // Range bounds in 1..7 are shifted up by 100 before comparing with the finalized level.
            // NOTE(review): presumably a legacy numbering of the confluent feature level - confirm.
            if (min >= 1 && min <= 7) {
                min = (short)(min + 100);
            }
            if ((max = confluentRange.max()) >= 1 && max <= 7) {
                max = (short)(max + 100);
            }
            if (!(confluentRange = VersionRange.of(min, max)).contains(confluentMetadataLevel.get())) {
                this.throwUnsupportedVersionDueToIncompatibleFeature(brokerId, "confluent.metadata.version", confluentRange, confluentMetadataLevel.get());
            }
        } else {
            // No confluent range: distinguish a broker that sent 'metadata.version' from one that sent neither.
            if (apacheRange != null) {
                throw new UnsupportedVersionException("Unable to register broker " + brokerId + " because it does not support confluent.metadata.version. Please upgrade your broker to Confluent Platform, or downgrade your controller to Apache Kafka. Brokers must be upgraded first in a rolling upgrade from Apache to CP.");
            }
            throw new InvalidRegistrationException("Request features do not contain 'confluent.metadata.version'");
        }
        unverifiedFeatures.remove("confluent.metadata.version");
        // Finalized features the request did not name (and whose level is non-zero) are checked against a 0..0 range.
        unverifiedFeatures.forEach((featureName, finalizedVersion) -> {
            if (finalizedVersion != 0 && request.features().findAll(featureName).isEmpty()) {
                this.processRegistrationFeature(brokerId, finalizedFeatures, new BrokerRegistrationRequestData.Feature().setName(featureName).setMinSupportedVersion((short)0).setMaxSupportedVersion((short)0));
            }
        });
        if (this.featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
            record.setLogDirs(request.logDirs());
        }
        if (!request.incarnationId().equals((Object)prevIncarnationId)) {
            // New incarnation: let the shutdown handler add clean-up records and assign the new epoch.
            int prevNumRecords = records.size();
            boolean isCleanShutdown = cleanShutdownDetectionEnabled ? storedBrokerEpoch == request.previousBrokerEpoch() : false;
            this.brokerShutdownHandler.addRecordsForShutdown(request.brokerId(), isCleanShutdown, records);
            int numRecordsAdded = records.size() - prevNumRecords;
            if (existing == null) {
                this.log.info("No previous registration found for broker {}. New incarnation ID is {}.  Generated {} record(s) to clean up previous incarnations. New broker epoch is {}.", new Object[]{brokerId, request.incarnationId(), numRecordsAdded, newBrokerEpoch});
                this.metrics.addTimeSinceLastHeartbeatMetric(brokerId);
            } else {
                this.log.info("Registering a new incarnation of broker {}. Previous incarnation ID was {}; new incarnation ID is {}. Generated {} record(s) to clean up previous incarnations. Broker epoch will become {}.", new Object[]{brokerId, existing.incarnationId(), request.incarnationId(), numRecordsAdded, newBrokerEpoch});
            }
            record.setBrokerEpoch(newBrokerEpoch);
        } else {
            // Same incarnation: 'existing' is non-null here because the incarnation IDs matched.
            // The fenced state, controlled-shutdown state and epoch are kept.
            this.log.info("Amending registration of broker {}, incarnation ID {}. Broker epoch remains {}.", new Object[]{request.brokerId(), request.incarnationId(), existing.epoch()});
            record.setFenced(existing.fenced());
            record.setInControlledShutdown(existing.inControlledShutdown());
            record.setBrokerEpoch(existing.epoch());
        }
        records.add(new ApiMessageAndVersion((ApiMessage)record, this.featureControl.metadataVersionOrThrow().registerBrokerRecordVersion()));
        this.brokerSpecificMaybeAddRecordToRotateEncryptor(brokerId, brokerRecordMetadataEncryptors, records);
        if (this.cellControl.isImplicitCellCreationEnabled()) {
            this.cellControl.createCellForBroker(brokerId, records::add);
        }
        // A new incarnation discards the previous heartbeat state before re-registering.
        if (!request.incarnationId().equals((Object)prevIncarnationId)) {
            this.heartbeatManager.remove(brokerId);
        }
        this.heartbeatManager.register(brokerId, record.fenced());
        return ControllerResult.of(records, new BrokerRegistrationReply(record.brokerEpoch()));
    }

    /**
     * Delegates broker unregistration to the cell control manager, which may emit
     * records through {@code recordConsumer}.
     */
    public void unregisterBroker(int brokerId, Consumer<ApiMessageAndVersion> recordConsumer) {
        cellControl.unregisterBroker(brokerId, recordConsumer);
    }

    /**
     * Adds the network-health startup demotion to the record unless a degraded
     * component with that reason is already present.
     *
     * <p>Reasons are compared by value. The previous reference comparison
     * ({@code ==}) missed an equal reason held in a different String instance,
     * which would append a duplicate startup demotion.
     *
     * @param record the registration record to amend in place
     */
    private void addStartupDemotionOnBroker(RegisterBrokerRecord record) {
        boolean alreadyDemoted = record.degradedComponents().stream()
            .anyMatch(component -> AlterBrokerHealthRequest.networkHealthStartupAlterBrokerHealthReason.equals(component.reason()));
        if (alreadyDemoted) {
            return;
        }
        this.log.info("Adding {} demotion on registration for broker {}", (Object)AlterBrokerHealthRequest.networkHealthStartupAlterBrokerHealthReason, (Object)record.brokerId());
        record.degradedComponents().add(new RegisterBrokerRecord.DegradedComponent().setReason(AlterBrokerHealthRequest.networkHealthStartupAlterBrokerHealthReason).setComponentCode(BrokerComponent.EXTERNAL_CONNECTIVITY_STARTUP.id()));
    }

    /**
     * Returns true when the currently active encryptor ID is among the encryptor
     * IDs listed in the broker registration.
     */
    private boolean brokerRegistrationContainsActiveEncryptor(List<RegisterBrokerRecord.Encryptor> brokerRecordMetadataEncryptors) {
        Set<Uuid> registeredIds = new HashSet<>();
        for (RegisterBrokerRecord.Encryptor encryptor : brokerRecordMetadataEncryptors) {
            registeredIds.add(encryptor.encryptorId());
        }
        return registeredIds.contains(this.encryptionControl.activeEncryptorId());
    }

    /**
     * Rejects a broker registration whose encryptor list is unacceptable. Does nothing
     * when there is no encryption control manager.
     *
     * @throws BrokerRegistrationDoesNotContainEncryptorSecretException if an encryptor is
     *         required but the list is null or empty, or if an active encryptor is
     *         installed (non-zero ID) and the supplied list does not contain it
     */
    private void maybeRejectBrokerRegistrationDueToInvalidEncryptor(int brokerId, List<RegisterBrokerRecord.Encryptor> brokerRecordMetadataEncryptors) {
        if (this.encryptionControl == null) {
            return;
        }
        if (this.encryptionControl.isEncryptorRequired().booleanValue()
                && (brokerRecordMetadataEncryptors == null || brokerRecordMetadataEncryptors.isEmpty())) {
            throw new BrokerRegistrationDoesNotContainEncryptorSecretException("Broker Registration with broker ID " + brokerId + " does not contain any encryptor secret while the confluent.metadata.encryptor.required is true");
        }
        // Without a list there is nothing to check against the active encryptor.
        if (brokerRecordMetadataEncryptors == null) {
            return;
        }
        // A zero UUID means no active encryptor is installed.
        if (this.encryptionControl.activeEncryptorId().equals((Object)Uuid.ZERO_UUID)) {
            return;
        }
        if (this.brokerRegistrationContainsActiveEncryptor(brokerRecordMetadataEncryptors)) {
            return;
        }
        throw new BrokerRegistrationDoesNotContainEncryptorSecretException("Broker Registration with broker ID " + brokerId + " does not contain the active secret " + String.valueOf(this.encryptionControl.activeEncryptorId()));
    }

    /**
     * Builds the encryptor list for a broker registration record.
     *
     * <p>When the request carries an encryptor list, its IDs are copied. Otherwise the
     * legacy encryptor IDs known to this controller's encryptor factory are used, or
     * {@code null} is returned when there is no encryption control manager.
     */
    private List<RegisterBrokerRecord.Encryptor> getBrokerRecordMetadataEncryptorsOrLegacyEncryptors(BrokerRegistrationRequestData request) {
        List<RegisterBrokerRecord.Encryptor> encryptors = new ArrayList<>();
        if (request.metadataEncryptors() != null) {
            request.metadataEncryptors().forEach(fromRequest ->
                encryptors.add(new RegisterBrokerRecord.Encryptor().setEncryptorId(fromRequest.encryptorId())));
            return encryptors;
        }
        if (this.encryptionControl == null) {
            return null;
        }
        // The request sent no list: fall back to the legacy encryptor IDs known locally.
        MetadataEncryptorFactory factory = this.encryptionControl.getMetadataEncryptorFactorySupplier().get();
        for (Uuid legacyId : factory.legacyEncryptorIds()) {
            encryptors.add(new RegisterBrokerRecord.Encryptor().setEncryptorId(legacyId));
        }
        return encryptors;
    }

    /**
     * Logs at error level and then throws an {@link UnsupportedVersionException}
     * explaining that the broker's supported range for {@code name} does not cover
     * the finalized level. Never returns normally.
     */
    private void throwUnsupportedVersionDueToIncompatibleFeature(int brokerId, String name, VersionRange range, short finalizedLevel) {
        StringBuilder message = new StringBuilder();
        message.append("Unable to register broker ").append(brokerId);
        message.append(" because it does not support finalized version ").append(finalizedLevel);
        message.append(" of ").append(name);
        message.append(". The broker wants a version between ").append(range.min());
        message.append(" and ").append(range.max()).append(", inclusive.");
        String text = message.toString();
        this.log.error(text);
        throw new UnsupportedVersionException(text);
    }

    /**
     * Builds the records that register a controller.
     *
     * @param request the controller registration request
     * @param finalizedFeatures finalized feature levels; the minimum version's level is
     *        used when 'confluent.metadata.version' is not finalized
     * @return an atomic result holding the RegisterControllerRecord and, possibly, an
     *         encryptor rotation record
     * @throws UnsupportedVersionException if the metadata version does not support
     *         controller registration
     * @throws ControllerRegistrationDoesNotContainActiveSecretException if an active
     *         encryptor is installed and the controller's list does not contain it
     */
    ControllerResult<Void> registerController(ControllerRegistrationRequestData request, FinalizedControllerFeatures finalizedFeatures) {
        if (!this.featureControl.metadataVersionOrThrow().isControllerRegistrationSupported()) {
            throw new UnsupportedVersionException("The current MetadataVersion is too old to support controller registrations.");
        }
        ListenerInfo listenerInfo = ListenerInfo.fromControllerRegistrationRequest(request.listeners());

        // Copy the requested feature ranges into the record's own collection type.
        RegisterControllerRecord.ControllerFeatureCollection features = new RegisterControllerRecord.ControllerFeatureCollection();
        request.features().forEach(requested -> {
            RegisterControllerRecord.ControllerFeature copy = new RegisterControllerRecord.ControllerFeature()
                .setName(requested.name())
                .setMaxSupportedVersion(requested.maxSupportedVersion())
                .setMinSupportedVersion(requested.minSupportedVersion());
            features.add(copy);
        });

        short confluentLevel = finalizedFeatures.get("confluent.metadata.version")
            .orElseGet(() -> MetadataVersion.MINIMUM_VERSION.confluentFeatureLevel());
        // The encryptor list stays null when the metadata version cannot carry it.
        List<RegisterControllerRecord.Encryptor> encryptors =
            MetadataVersion.fromConfluentFeatureLevel(confluentLevel).isSettingControllerMetadataEncryptorsSupported()
                ? this.getControllerRecordMetadataEncryptorsOrLegacyEncryptors(request)
                : null;

        int controllerId = request.controllerId();
        boolean activeEncryptorMissing = this.encryptionControl != null
            && encryptors != null
            && !this.encryptionControl.activeEncryptorId().equals((Object)Uuid.ZERO_UUID)
            && !this.controllerRegistrationContainsActiveEncryptor(encryptors);
        if (activeEncryptorMissing) {
            throw new ControllerRegistrationDoesNotContainActiveSecretException("Controller Registration with controller ID " + controllerId + " does not contain the active secret " + String.valueOf(this.encryptionControl.activeEncryptorId()));
        }

        RegisterControllerRecord registration = new RegisterControllerRecord()
            .setControllerId(controllerId)
            .setIncarnationId(request.incarnationId())
            .setZkMigrationReady(false)
            .setEndPoints(listenerInfo.toControllerRegistrationRecord())
            .setFeatures(features)
            .setMetadataEncryptors(encryptors);
        List<ApiMessageAndVersion> records = new ArrayList<>();
        records.add(new ApiMessageAndVersion(registration, (short) 0));
        this.controllerSpecificMaybeAddRecordToRotateEncryptor(controllerId, encryptors, records);
        return ControllerResult.atomicOf(records, null);
    }

    /**
     * Returns true when the currently active encryptor ID is among the encryptor
     * IDs listed in the controller registration.
     */
    private boolean controllerRegistrationContainsActiveEncryptor(List<RegisterControllerRecord.Encryptor> controllerRecordMetadataEncryptors) {
        Set<Uuid> registeredIds = new HashSet<>();
        for (RegisterControllerRecord.Encryptor encryptor : controllerRecordMetadataEncryptors) {
            registeredIds.add(encryptor.encryptorId());
        }
        return registeredIds.contains(this.encryptionControl.activeEncryptorId());
    }

    /**
     * Converts a controller's encryptor list to a set of IDs (or null when no list was
     * supplied) and hands it to the shared rotation check. Logs a warning and does
     * nothing else when there is no encryption control manager.
     */
    private void controllerSpecificMaybeAddRecordToRotateEncryptor(int controllerId, List<RegisterControllerRecord.Encryptor> controllerEncryptors, List<ApiMessageAndVersion> records) {
        if (this.encryptionControl == null) {
            this.log.warn("Controller {} registered but there is no encryption control manager. We will ignore the supplied metadata log encryptor list and not attempt to rotate encryptors.", (Object)controllerId);
            return;
        }
        Set<Uuid> controllerEncryptorIds = null;
        if (controllerEncryptors != null) {
            controllerEncryptorIds = controllerEncryptors.stream()
                .map(RegisterControllerRecord.Encryptor::encryptorId)
                .collect(Collectors.toSet());
        }
        this.maybeAddRecordToRotateEncryptor(controllerId, controllerEncryptorIds, records);
    }

    /**
     * Converts a broker's encryptor list to a set of IDs (or null when no list was
     * supplied) and hands it to the shared rotation check. Logs at debug level and does
     * nothing else when there is no encryption control manager.
     */
    private void brokerSpecificMaybeAddRecordToRotateEncryptor(int brokerId, List<RegisterBrokerRecord.Encryptor> brokerEncryptors, List<ApiMessageAndVersion> records) {
        if (this.encryptionControl == null) {
            this.log.debug("Broker {} registered but there is no encryption control manager. We will ignore the supplied metadata log encryptor list and not attempt to rotate encryptors.", (Object)brokerId);
            return;
        }
        Set<Uuid> brokerEncryptorIds = null;
        if (brokerEncryptors != null) {
            brokerEncryptorIds = brokerEncryptors.stream()
                .map(RegisterBrokerRecord.Encryptor::encryptorId)
                .collect(Collectors.toSet());
        }
        this.maybeAddRecordToRotateEncryptor(brokerId, brokerEncryptorIds, records);
    }

    /**
     * Appends an InstallMetadataEncryptorRecord when the registering node reveals an
     * encryptor that is newer than the active one and known to every other registered
     * controller and broker.
     *
     * <p>Steps: (1) ask the encryptor factory which of the node's encryptor IDs are newer
     * than the active one; (2) drop candidates unknown to any other controller;
     * (3) drop candidates unknown to any other broker; (4) among the survivors pick the
     * one with the latest creation date; (5) if that one is a legacy encryptor, rotate to
     * the legacy config's active ID instead, or do nothing when that ID is already active.
     *
     * <p>Must only be called when {@code encryptionControl} is non-null.
     *
     * @param nodeIdRegisteringEncryptors ID of the registering node; registrations with
     *        this ID are skipped when checking other nodes
     * @param knownEncryptors encryptor IDs reported by the node, or null if it sent none
     * @param records list that receives the rotation record, if any
     * @throws IllegalStateException if the factory has no creation date for a surviving candidate
     */
    private void maybeAddRecordToRotateEncryptor(int nodeIdRegisteringEncryptors, Set<Uuid> knownEncryptors, List<ApiMessageAndVersion> records) {
        MetadataEncryptorFactory currentMetadataEncryptorFactory = this.encryptionControl.getMetadataEncryptorFactorySupplier().get();
        Set<Uuid> legacyEncryptorsIdsKnownByThisControllerNode = currentMetadataEncryptorFactory.legacyEncryptorIds();
        Set<Uuid> encryptorIdsFromNode = this.getEncryptorIdsForNode(nodeIdRegisteringEncryptors, knownEncryptors, legacyEncryptorsIdsKnownByThisControllerNode);
        Set<Uuid> newerEncryptionIdsFromNode = currentMetadataEncryptorFactory.getNewerKnownEncryptorIds(encryptorIdsFromNode, this.encryptionControl.activeEncryptorId());
        // NOTE(review): the count is of the newer candidates, but the list printed is every
        // ID the node reported. Kept as-is; confirm this mismatch is intended.
        this.log.info("Node {} identified {} potential metadata log encryptor rotation candidates: [{}]", new Object[]{nodeIdRegisteringEncryptors, newerEncryptionIdsFromNode.size(), encryptorIdsFromNode.stream().map(Uuid::toString).sorted().collect(Collectors.joining(", "))});

        // Keep only the candidates that every other registered controller knows.
        Set<Uuid> newerEncryptionIdsKnownByAllControllers = new HashSet<>(newerEncryptionIdsFromNode);
        for (Uuid rotationCandidateEncryptorId : newerEncryptionIdsFromNode) {
            for (ControllerRegistration controllerRegistration : this.controllerRegistrations.values()) {
                if (controllerRegistration.id() == nodeIdRegisteringEncryptors) {
                    continue;
                }
                Set<Uuid> encryptorIdsForOtherController = this.getEncryptorIdsForNode(controllerRegistration.id(), controllerRegistration.metadataEncryptors(), legacyEncryptorsIdsKnownByThisControllerNode);
                boolean knownByOtherController = encryptorIdsForOtherController.contains(rotationCandidateEncryptorId);
                this.log.info("Potential metadata log encryptor rotation candidate {} known by other controller {}: {}", new Object[]{rotationCandidateEncryptorId, controllerRegistration.id(), knownByOtherController});
                if (!knownByOtherController) {
                    newerEncryptionIdsKnownByAllControllers.remove(rotationCandidateEncryptorId);
                }
            }
        }
        this.log.info("Potential metadata log encryptor rotation candidates that are existing in all controllers: [{}]", (Object)newerEncryptionIdsKnownByAllControllers.stream().map(Uuid::toString).sorted().collect(Collectors.joining(", ")));

        // Keep only the candidates that every other registered broker knows.
        Set<Uuid> newerEncryptionIdsKnownByAllBrokers = new HashSet<>(newerEncryptionIdsFromNode);
        for (Uuid rotationCandidateEncryptorId : newerEncryptionIdsFromNode) {
            for (BrokerRegistration brokerRegistration : this.brokerRegistrations.values()) {
                if (brokerRegistration.id() == nodeIdRegisteringEncryptors) {
                    continue;
                }
                Set<Uuid> encryptorIdsForOtherBroker = this.getEncryptorIdsForNode(brokerRegistration.id(), brokerRegistration.metadataEncryptorIds(), legacyEncryptorsIdsKnownByThisControllerNode);
                boolean knownByOtherBroker = encryptorIdsForOtherBroker.contains(rotationCandidateEncryptorId);
                this.log.info("Potential metadata log encryptor rotation candidate {} known by other broker {}: {}", new Object[]{rotationCandidateEncryptorId, brokerRegistration.id(), knownByOtherBroker});
                if (!knownByOtherBroker) {
                    newerEncryptionIdsKnownByAllBrokers.remove(rotationCandidateEncryptorId);
                }
            }
        }
        this.log.info("Potential metadata log encryptor rotation candidates that are existing in all brokers: [{}]", (Object)newerEncryptionIdsKnownByAllBrokers.stream().map(Uuid::toString).sorted().collect(Collectors.joining(", ")));

        Set<Uuid> newerEncryptionIdsKnownByAllNodes = new HashSet<>(newerEncryptionIdsKnownByAllControllers);
        newerEncryptionIdsKnownByAllNodes.retainAll(newerEncryptionIdsKnownByAllBrokers);

        // Pick the candidate with the latest creation date; on a tie the first one seen wins.
        Uuid newestEncryptorIdKnownByAllNodes = null;
        OffsetDateTime mostRecentCreateDate = null;
        for (Uuid encryptorId : newerEncryptionIdsKnownByAllNodes) {
            OffsetDateTime createDateForEncryptor = currentMetadataEncryptorFactory.getCreateDateForEncryptor(encryptorId);
            if (createDateForEncryptor == null) {
                throw new IllegalStateException("This controller node did not know about an encryptor with ID " + String.valueOf(encryptorId) + " that we already determined it knew about (should not happen)");
            }
            if (mostRecentCreateDate == null || createDateForEncryptor.isAfter(mostRecentCreateDate)) {
                newestEncryptorIdKnownByAllNodes = encryptorId;
                mostRecentCreateDate = createDateForEncryptor;
            }
        }
        if (newestEncryptorIdKnownByAllNodes == null) {
            return;
        }
        if (legacyEncryptorsIdsKnownByThisControllerNode.contains(newestEncryptorIdKnownByAllNodes)) {
            Uuid legacyActiveId = currentMetadataEncryptorFactory.activeIdFromLegacyConfig();
            // Compare by value: a reference comparison treats equal IDs held in different
            // instances as different and would emit a redundant rotation record.
            // Fully qualified so the file's import block needs no change.
            if (java.util.Objects.equals(this.encryptionControl.activeEncryptorId(), legacyActiveId)) {
                return;
            }
            newestEncryptorIdKnownByAllNodes = legacyActiveId;
        }
        this.log.info("Rotating to metadata log encryptor rotation candidate that exists in all nodes: {}", (Object)newestEncryptorIdKnownByAllNodes);
        records.add(new ApiMessageAndVersion(new InstallMetadataEncryptorRecord().setKeyId(newestEncryptorIdKnownByAllNodes), (short) 0));
    }

    /**
     * Chooses the encryptor ID set to attribute to a node.
     *
     * @param nodeIdRegisteringEncryptors node id; only used in the log line emitted on fallback
     * @param knownEncryptors encryptor IDs reported for the node, or {@code null} if none were reported
     * @param legacyEncryptorsIdsKnownByThisControllerNode set returned (same instance, not a copy) when
     *        {@code knownEncryptors} is {@code null}
     * @return {@code knownEncryptors} when it is non-null, otherwise the legacy set
     */
    private Set<Uuid> getEncryptorIdsForNode(int nodeIdRegisteringEncryptors, Set<Uuid> knownEncryptors, Set<Uuid> legacyEncryptorsIdsKnownByThisControllerNode) {
        if (knownEncryptors != null) {
            return knownEncryptors;
        }
        // Render the fallback IDs in sorted order so the log line is stable.
        String sortedLegacyIds = legacyEncryptorsIdsKnownByThisControllerNode.stream()
            .map(Uuid::toString)
            .sorted()
            .collect(Collectors.joining(", "));
        this.log.info("Node {} registered but did not communicate a metadata log encryptor list (null received, likely due to an older image), assuming same legacy encryptor IDs that we know: [{}]", nodeIdRegisteringEncryptors, sortedLegacyIds);
        return legacyEncryptorsIdsKnownByThisControllerNode;
    }

    /**
     * Builds the encryptor entries for a controller registration record.
     *
     * <p>When the request carries an encryptor list, each entry's encryptor ID is copied.
     * When it does not, the legacy encryptor IDs of the current metadata encryptor factory
     * are used instead.
     *
     * @param request the controller registration request
     * @return a new list of encryptor entries, or {@code null} when the request has no
     *         encryptor list and {@code encryptionControl} is {@code null}
     */
    private List<RegisterControllerRecord.Encryptor> getControllerRecordMetadataEncryptorsOrLegacyEncryptors(ControllerRegistrationRequestData request) {
        ArrayList<RegisterControllerRecord.Encryptor> encryptors = new ArrayList<RegisterControllerRecord.Encryptor>();
        if (request.metadataEncryptors() != null) {
            request.metadataEncryptors().forEach(requestEncryptor ->
                encryptors.add(new RegisterControllerRecord.Encryptor().setEncryptorId(requestEncryptor.encryptorId())));
            return encryptors;
        }
        if (this.encryptionControl == null) {
            return null;
        }
        MetadataEncryptorFactory factory = this.encryptionControl.getMetadataEncryptorFactorySupplier().get();
        factory.legacyEncryptorIds().forEach(legacyId ->
            encryptors.add(new RegisterControllerRecord.Encryptor().setEncryptorId((Uuid)legacyId)));
        return encryptors;
    }

    /**
     * Validates one feature from a broker registration request and converts it to its
     * record form.
     *
     * <p>Features named {@code metadata.version} or {@code confluent.metadata.version} are
     * converted without validation. For any other feature, the finalized version (0 when the
     * feature has no finalized version) must lie within the broker's supported range.
     *
     * @param brokerId id of the registering broker; used only for logging
     * @param finalizedFeatures the currently finalized features
     * @param feature the feature as sent in the registration request
     * @return the record representation carrying the same name and version bounds
     * @throws UnsupportedVersionException if the finalized version is outside the supported range
     */
    RegisterBrokerRecord.BrokerFeature processRegistrationFeature(int brokerId, FinalizedControllerFeatures finalizedFeatures, BrokerRegistrationRequestData.Feature feature) {
        String featureName = feature.name();
        boolean isMetadataVersionFeature = featureName.equals("metadata.version") || featureName.equals("confluent.metadata.version");
        if (!isMetadataVersionFeature) {
            short finalizedVersion = finalizedFeatures.versionOrDefault(featureName, (short)0);
            VersionRange supportedRange = VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion());
            if (!supportedRange.contains(finalizedVersion)) {
                throw new UnsupportedVersionException("Unable to register because the broker does not support finalized version " + finalizedVersion + " of " + featureName + ". The broker wants a version between " + feature.minSupportedVersion() + " and " + feature.maxSupportedVersion() + ", inclusive.");
            }
            boolean knownToController = finalizedFeatures.featureNames().contains(featureName);
            if (!knownToController) {
                this.log.debug("Broker {} registered with version range ({}, {}] of feature {} which controller does not know or has finalized version of 0.", brokerId, feature.minSupportedVersion(), feature.maxSupportedVersion(), featureName);
            }
        }
        RegisterBrokerRecord.BrokerFeature recordFeature = new RegisterBrokerRecord.BrokerFeature();
        recordFeature.setName(featureName);
        recordFeature.setMinSupportedVersion(feature.minSupportedVersion());
        recordFeature.setMaxSupportedVersion(feature.maxSupportedVersion());
        return recordFeature;
    }

    /**
     * Records a contact from the given broker incarnation in the heartbeat tracker.
     *
     * @param brokerId the broker id
     * @param brokerEpoch the broker epoch
     * @return {@code true} if the contact time was updated, {@code false} when there is no
     *         heartbeat manager
     */
    boolean trackBrokerHeartbeat(int brokerId, long brokerEpoch) {
        // Read the field once so the null check and the use see the same reference.
        BrokerHeartbeatManager current = this.heartbeatManager;
        boolean hasManager = current != null;
        if (hasManager) {
            current.tracker().updateContactTime(new BrokerIdAndEpoch(brokerId, brokerEpoch));
        }
        return hasManager;
    }

    /**
     * Looks up the offset stored for the given broker's RegisterBrokerRecord.
     *
     * @param brokerId the broker id
     * @return the stored offset, or an empty {@link OptionalLong} when none is stored
     */
    public OptionalLong registerBrokerRecordOffset(int brokerId) {
        Long storedOffset = (Long)this.registerBrokerRecordOffsets.get(brokerId);
        return storedOffset == null ? OptionalLong.empty() : OptionalLong.of(storedOffset);
    }

    /**
     * Processes a set of requested broker replica exclusion operations.
     *
     * <p>Three outcomes are possible, all visible in the returned result:
     * <ul>
     *   <li>the request already matches the current exclusions: no records, a reply built
     *       from the requested operations with no errors;</li>
     *   <li>the prepared modification should not be applied: no records, the reply produced
     *       by the preparation step;</li>
     *   <li>otherwise: a single exclusion record holding the modified exclusions, plus the
     *       reply produced by the preparation step.</li>
     * </ul>
     *
     * @param requestedExclusions the exclusion operations requested
     * @return the records to write (possibly none) and the reply
     */
    public ControllerResult<AlterReplicaExclusionsReply> processAlterReplicaExclusions(Set<AlterReplicaExclusionOp> requestedExclusions) {
        boolean alreadyInEffect = BrokerReplicaExclusionUtils.exclusionsMatchCurrentState(this.brokerReplicaExclusions, requestedExclusions);
        if (alreadyInEffect) {
            Set<BrokerReplicaExclusionResult> noOpResults = requestedExclusions.stream()
                .map(op -> new BrokerReplicaExclusionResult(op.brokerId(), Optional.empty(), op.opType(), op.reason()))
                .collect(Collectors.toSet());
            this.log.debug("Request with exclusion operations {} was matches the current state of the cluster - returning a successful response and not applying anything.", requestedExclusions);
            return ControllerResult.of(List.of(), new AlterReplicaExclusionsReply(true, true, noOpResults));
        }

        BrokerReplicaExclusionModificationResult prepared = BrokerReplicaExclusionUtils.prepareReplicaExclusions(this.brokerReplicaExclusions, requestedExclusions);
        if (prepared.shouldApply()) {
            List<String> opDescriptions = requestedExclusions.stream()
                .map(op -> String.format("Broker %d -> %s (reason: %s)", op.brokerId(), op.opType(), op.reason()))
                .collect(Collectors.toList());
            this.log.info("Applying {} replica exclusion operations ({})", requestedExclusions.size(), opDescriptions);
            ApiMessageAndVersion exclusionRecord = new ApiMessageAndVersion((ApiMessage)this.exclusionRecord(prepared.modifiedExclusions().entrySet()), 0);
            return ControllerResult.of(List.of(exclusionRecord), prepared.reply());
        }

        List<String> errorStrings = prepared.reply().exclusionResults().stream()
            .filter(result -> result.apiErrorOptional().isPresent())
            .map(result -> String.format("%s for broker %d, error %s", result.opType(), result.brokerId(), result.apiErrorOptional().get()))
            .collect(Collectors.toList());
        this.log.info("Will not apply replica exclusions {} due to {} errors ({})", requestedExclusions, errorStrings.size(), errorStrings);
        return ControllerResult.of(List.of(), prepared.reply());
    }

    /**
     * Handles an AlterBrokerHealth request by generating broker registration change records
     * that add or remove a degraded-component entry on each requested broker.
     *
     * <p>Unless the request is forced, marking brokers as degraded is rejected when the
     * resulting number of demoted brokers would exceed
     * {@code max(1, maxDemotedBrokersPercentage% of live brokers)}, or when fewer than three
     * brokers are live.
     *
     * <p>A {@code null} broker id list in the request is treated as empty. Duplicate broker ids
     * are processed once. Brokers already in the desired state produce no record but are still
     * listed in the response.
     *
     * @param request the request; its component code, status code, reason, force flag and
     *        broker ids are read
     * @param maxDemotedBrokersPercentage upper bound, in percent of live brokers, on how many
     *        brokers may be demoted at once
     * @return the change records to write and one result entry per distinct requested broker
     * @throws UnsupportedVersionException if the metadata version does not support setting broker health
     * @throws DemotionLimitReachedException if the demotion limit check fails
     * @throws InvalidRequestException if a broker id is not registered, or the desired status is
     *         neither DEGRADED nor HEALTHY
     */
    public ControllerResult<AlterBrokerHealthResponseData> processAlterBrokerHealth(AlterBrokerHealthRequestData request, double maxDemotedBrokersPercentage) {
        if (!this.featureControl.metadataVersionOrThrow().isSettingBrokerHealthSupported()) {
            throw new UnsupportedVersionException("Alter broker health not supported in version " + this.featureControl.metadataVersionOrThrow().confluentRelease());
        }
        BrokerComponent componentCode = BrokerComponent.forId((byte)request.componentCode());
        ComponentHealthStatus desiredStatus = ComponentHealthStatus.forId((byte)request.statusCode());
        String reason = request.reason();
        // Normalize once: the limit check and the per-broker loop must agree on how a null list is handled.
        List<Integer> requestedBrokerIds = Optional.ofNullable(request.brokerIds()).orElseGet(List::of);

        if (desiredStatus == ComponentHealthStatus.DEGRADED && !request.force()) {
            Set<Integer> demotedBrokers = new HashSet<Integer>(this.demotedBrokers());
            demotedBrokers.addAll(requestedBrokerIds);
            int desiredNumBrokersDemoted = demotedBrokers.size();
            int numLiveBrokers = this.liveBrokers().size();
            double numBrokersThatCanBeDemoted = Math.max(1.0, maxDemotedBrokersPercentage / 100.0 * (double)numLiveBrokers);
            if ((double)desiredNumBrokersDemoted > numBrokersThatCanBeDemoted || numLiveBrokers < 3) {
                String errorMessage = String.format("Demoting brokers %s would lead to total %s brokers demoted and exceed the configured limit percentage of %s", request.brokerIds(), desiredNumBrokersDemoted, maxDemotedBrokersPercentage);
                this.log.error("AlterBrokerHealth request {} rejected: {}", request, errorMessage);
                throw new DemotionLimitReachedException(Errors.DEMOTION_LIMIT_REACHED.message());
            }
        }

        List<AlterBrokerHealthResponseData.BrokerHealthStatusResult> healthStatusResults = new ArrayList<AlterBrokerHealthResponseData.BrokerHealthStatusResult>();
        // Bounded so that a single request cannot produce more than 10000 records.
        BoundedList<ApiMessageAndVersion> brokerChangeRecords = BoundedList.newArrayBacked(10000);
        // De-duplicate so each broker yields at most one change record.
        Set<Integer> distinctBrokerIds = new HashSet<Integer>(requestedBrokerIds);
        for (int brokerId : distinctBrokerIds) {
            DegradedBrokerHealthState degradedBrokerHealthState = new DegradedBrokerHealthState(reason, componentCode);
            healthStatusResults.add(new AlterBrokerHealthResponseData.BrokerHealthStatusResult()
                .setBrokerId(brokerId)
                .setComponentCode(componentCode.id())
                .setStatusCode(desiredStatus.id()));

            BrokerRegistration brokerRegistration = this.registration(brokerId);
            if (brokerRegistration == null) {
                throw new InvalidRequestException(String.format("Invalid brokerId: %s. Only broker IDs of registered brokers are valid.", brokerId));
            }
            Set<DegradedBrokerHealthState> currentBrokerDegradedStates = brokerRegistration.degradedComponents();
            boolean currentlyDegraded = currentBrokerDegradedStates.contains(degradedBrokerHealthState);

            if (desiredStatus == ComponentHealthStatus.DEGRADED) {
                if (currentlyDegraded) {
                    // Arguments follow the placeholder order: component first, then broker.
                    this.log.debug("Attempted to mark already degraded broker component {} as unhealthy for broker {}", componentCode, brokerId);
                } else {
                    List<DegradedBrokerHealthState> updatedBrokerDegradedStates = new ArrayList<DegradedBrokerHealthState>(currentBrokerDegradedStates);
                    updatedBrokerDegradedStates.add(degradedBrokerHealthState);
                    BrokerRegistrationChangeRecord changeRecord = this.createRegistrationChangeRecordForDegradedHealth(this.brokerRegistrations().get(brokerId), updatedBrokerDegradedStates);
                    brokerChangeRecords.add(new ApiMessageAndVersion((ApiMessage)changeRecord, 1));
                }
            } else if (desiredStatus == ComponentHealthStatus.HEALTHY) {
                if (currentlyDegraded) {
                    List<DegradedBrokerHealthState> updatedBrokerDegradedStates = new ArrayList<DegradedBrokerHealthState>(currentBrokerDegradedStates);
                    updatedBrokerDegradedStates.remove(degradedBrokerHealthState);
                    BrokerRegistrationChangeRecord changeRecord = this.createRegistrationChangeRecordForDegradedHealth(this.brokerRegistrations().get(brokerId), updatedBrokerDegradedStates);
                    brokerChangeRecords.add(new ApiMessageAndVersion((ApiMessage)changeRecord, 1));
                } else {
                    // Arguments follow the placeholder order: component first, then broker.
                    this.log.debug("Attempted to mark non-degraded broker component {} as healthy for broker {}", componentCode, brokerId);
                }
            } else {
                throw new InvalidRequestException("Invalid status " + String.valueOf(desiredStatus) + " in AlterBrokerHealth request " + String.valueOf(request));
            }
        }

        if (brokerChangeRecords.isEmpty()) {
            this.log.debug("No broker registration change records were created from no-op AlterBrokerHealth request {}", request);
        } else {
            this.log.info("Created broker registration change records from AlterBrokerHealth request {}: {}", request, brokerChangeRecords);
        }
        return ControllerResult.of(brokerChangeRecords, new AlterBrokerHealthResponseData().setBrokerHealthStatusResults(healthStatusResults));
    }

    /**
     * Applies a RegisterBrokerRecord to the in-memory state.
     *
     * <p>Stores the record offset, replaces the broker's registration, updates the set of
     * demoted brokers from the record's degraded components, updates the directory
     * assignments, and (when a heartbeat manager exists) re-registers the broker with it.
     *
     * @param record the record to replay
     * @param offset the offset stored for this broker's registration record
     */
    public void replay(RegisterBrokerRecord record, long offset) {
        int brokerId = record.brokerId();
        this.registerBrokerRecordOffsets.put(brokerId, offset);

        ListenerInfo listenerInfo = ListenerInfo.fromBrokerRegistrationRecord(record.endPoints());
        Map<String, VersionRange> supportedFeatures = new HashMap<String, VersionRange>();
        for (RegisterBrokerRecord.BrokerFeature feature : record.features()) {
            supportedFeatures.put(feature.name(), VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()));
        }

        BrokerRegistration newRegistration = new BrokerRegistration.Builder()
            .setId(brokerId)
            .setEpoch(record.brokerEpoch())
            .setIncarnationId(record.incarnationId())
            .setListeners(listenerInfo.listeners())
            .setSupportedFeatures(supportedFeatures)
            .setRack(Optional.ofNullable(record.rack()))
            .setFenced(record.fenced())
            .setInControlledShutdown(record.inControlledShutdown())
            .setIsMigratingZkBroker(record.isMigratingZkBroker())
            .setDegradedComponents(DegradedBrokerHealthState.fromRegisterBrokerRecord(record.degradedComponents()))
            .setMetadataEncryptors(this.getBrokerMetadataEncryptorIds(record))
            .setDirectories(record.logDirs())
            .build();
        BrokerRegistration prevRegistration = this.brokerRegistrations.put(brokerId, newRegistration);

        // A broker counts as demoted exactly when the record lists degraded components.
        if (record.degradedComponents().isEmpty()) {
            this.demotedBrokers.remove(Integer.valueOf(brokerId));
        } else {
            this.demotedBrokers.add(Integer.valueOf(brokerId));
        }

        List<Uuid> previousDirectories = prevRegistration == null ? null : prevRegistration.directories();
        this.updateDirectories(brokerId, previousDirectories, record.logDirs());

        if (this.heartbeatManager != null) {
            if (prevRegistration != null) {
                this.heartbeatManager.remove(brokerId);
            }
            this.heartbeatManager.register(brokerId, record.fenced());
        }

        String logFormat;
        if (prevRegistration == null) {
            logFormat = "Replayed initial RegisterBrokerRecord for broker {}: {}";
        } else if (prevRegistration.incarnationId().equals(record.incarnationId())) {
            logFormat = "Replayed RegisterBrokerRecord modifying the registration for broker {}: {}";
        } else {
            logFormat = "Replayed RegisterBrokerRecord establishing a new incarnation of broker {}: {}";
        }
        this.log.info(logFormat, record.brokerId(), record);
    }

    /**
     * Extracts the encryptor IDs listed in a broker registration record.
     *
     * @param record the broker registration record
     * @return the set of encryptor IDs, or {@code null} when the record has no encryptor list
     */
    private Set<Uuid> getBrokerMetadataEncryptorIds(RegisterBrokerRecord record) {
        if (record.metadataEncryptors() != null) {
            return record.metadataEncryptors().stream()
                .map(encryptor -> encryptor.encryptorId())
                .collect(Collectors.toSet());
        }
        // null (rather than an empty set) is passed through to signal "no list present".
        return null;
    }

    /**
     * Applies an UnregisterBrokerRecord: drops the stored registration offset, then removes the
     * broker from the heartbeat manager (if any), the directory assignments, the registrations
     * and the demoted-broker set.
     *
     * <p>The stored offset is removed before validation, so it is dropped even when this
     * method throws.
     *
     * @param record the record to replay
     * @throws RuntimeException if no registration exists for the broker id, or its epoch
     *         differs from the record's broker epoch
     */
    public void replay(UnregisterBrokerRecord record) {
        int brokerId = record.brokerId();
        this.registerBrokerRecordOffsets.remove(brokerId);

        BrokerRegistration existing = this.brokerRegistrations.get(brokerId);
        if (existing == null) {
            throw new RuntimeException(String.format("Unable to replay %s: no broker registration found for that id", record));
        }
        boolean epochMatches = existing.epoch() == record.brokerEpoch();
        if (!epochMatches) {
            throw new RuntimeException(String.format("Unable to replay %s: no broker registration with that epoch found", record));
        }

        if (this.heartbeatManager != null) {
            this.heartbeatManager.remove(brokerId);
        }
        this.updateDirectories(brokerId, existing.directories(), null);
        this.brokerRegistrations.remove(brokerId);
        this.demotedBrokers.remove(Integer.valueOf(brokerId));
        this.log.info("Replayed {}", record);
    }

    /**
     * Applies a BrokerReplicaExclusionRecord by replacing the current broker replica
     * exclusions with exactly those contained in the record.
     *
     * @param record the record whose exclusions (broker id mapped to reason) become the new state
     */
    public void replay(BrokerReplicaExclusionRecord record) {
        Map<Integer, String> incoming = record.brokerExclusions().stream()
            .collect(Collectors.toMap(
                BrokerReplicaExclusionRecord.BrokerReplicaExclusion::brokerId,
                BrokerReplicaExclusionRecord.BrokerReplicaExclusion::reason));
        // Snapshot the previous state before it is overwritten; it is only used for logging.
        Map<Integer, String> previous = new HashMap<Integer, String>(this.brokerReplicaExclusions);

        this.brokerReplicaExclusions.clear();
        this.brokerReplicaExclusions.putAll(incoming);

        TreeMap<Integer, String> sortedCurrent = new TreeMap<Integer, String>(this.brokerReplicaExclusions);
        TreeMap<Integer, String> sortedPrevious = new TreeMap<Integer, String>(previous);
        this.log.info("Loaded new broker replica exclusions {} (old exclusions {})", sortedCurrent, sortedPrevious);
    }

    /**
     * Applies a FenceBrokerRecord as a registration change that requests fencing and leaves
     * the controlled-shutdown state, health states and directories untouched.
     *
     * @param record the record to replay; its id and epoch identify the broker incarnation
     */
    public void replay(FenceBrokerRecord record) {
        Optional<Boolean> fencing = BrokerRegistrationFencingChange.FENCE.asBoolean();
        Optional<Boolean> controlledShutdown = BrokerRegistrationInControlledShutdownChange.NONE.asBoolean();
        this.replayRegistrationChange(record, record.id(), record.epoch(), fencing, controlledShutdown, Optional.empty(), Optional.empty());
    }

    /**
     * Applies an UnfenceBrokerRecord as a registration change that requests unfencing and
     * leaves the controlled-shutdown state, health states and directories untouched.
     *
     * @param record the record to replay; its id and epoch identify the broker incarnation
     */
    public void replay(UnfenceBrokerRecord record) {
        Optional<Boolean> fencing = BrokerRegistrationFencingChange.UNFENCE.asBoolean();
        Optional<Boolean> controlledShutdown = BrokerRegistrationInControlledShutdownChange.NONE.asBoolean();
        this.replayRegistrationChange(record, record.id(), record.epoch(), fencing, controlledShutdown, Optional.empty(), Optional.empty());
    }

    /**
     * Applies a BrokerRegistrationChangeRecord by decoding its fields and delegating to
     * {@code replayRegistrationChange}.
     *
     * <p>A {@code null} degraded-component list means "no health change". A {@code null} or
     * empty log directory list means "no directory change".
     *
     * @param record the record to replay
     * @throws IllegalStateException if the record's fenced or inControlledShutdown value is
     *         not recognized
     */
    public void replay(BrokerRegistrationChangeRecord record) {
        BrokerRegistrationFencingChange fencingChange = BrokerRegistrationFencingChange.fromValue(record.fenced())
            .orElseThrow(() -> new IllegalStateException(String.format("Unable to replay %s: unknown value for fenced field: %x", record, record.fenced())));
        BrokerRegistrationInControlledShutdownChange inControlledShutdownChange = BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown())
            .orElseThrow(() -> new IllegalStateException(String.format("Unable to replay %s: unknown value for inControlledShutdown field: %x", record, record.inControlledShutdown())));

        Optional<Set<DegradedBrokerHealthState>> healthStatusChanges;
        if (record.degradedComponents() == null) {
            healthStatusChanges = Optional.empty();
        } else {
            Set<DegradedBrokerHealthState> degradedStates = record.degradedComponents().stream()
                .map(component -> new DegradedBrokerHealthState(component.reason(), BrokerComponent.forId((byte)component.componentCode())))
                .collect(Collectors.toSet());
            healthStatusChanges = Optional.of(degradedStates);
        }

        Optional<List<Uuid>> directoriesChange = Optional.ofNullable(record.logDirs()).filter(dirs -> !dirs.isEmpty());

        this.replayRegistrationChange(
            record,
            record.brokerId(),
            record.brokerEpoch(),
            fencingChange.asBoolean(),
            inControlledShutdownChange.asBoolean(),
            healthStatusChanges,
            directoriesChange);
    }

    /**
     * Applies a change to an existing broker registration.
     *
     * <p>The new registration is derived via {@code cloneWith}. If it differs from the current
     * one it is stored and the demoted-broker set is updated; otherwise the change is logged
     * as a no-op. In both cases the directory assignments are refreshed, the broker is
     * registered with the heartbeat manager (if any), and a pending ready-brokers future is
     * completed and cleared when its check passes.
     *
     * @param record the record being replayed; used for log and error messages
     * @param brokerId id of the broker whose registration changes
     * @param brokerEpoch epoch the current registration must have
     * @param fencingChange requested fencing change, if any
     * @param inControlledShutdownChange requested controlled-shutdown change, if any
     * @param brokerStatusChanges requested degraded health states, if any
     * @param directoriesChange requested directory list, if any
     * @throws RuntimeException if no registration exists for the id, or its epoch differs
     */
    private void replayRegistrationChange(ApiMessage record, int brokerId, long brokerEpoch, Optional<Boolean> fencingChange, Optional<Boolean> inControlledShutdownChange, Optional<Set<DegradedBrokerHealthState>> brokerStatusChanges, Optional<List<Uuid>> directoriesChange) {
        BrokerRegistration current = this.brokerRegistrations.get(brokerId);
        if (current == null) {
            throw new RuntimeException(String.format("Unable to replay %s: no broker registration found for that id", record.toString()));
        }
        if (current.epoch() != brokerEpoch) {
            throw new RuntimeException(String.format("Unable to replay %s: no broker registration with that epoch found", record.toString()));
        }

        BrokerRegistration updated = current.cloneWith(fencingChange, inControlledShutdownChange, brokerStatusChanges, directoriesChange);
        if (current.equals(updated)) {
            this.log.info("Ignoring no-op registration change {} for {}", record, current);
        } else {
            this.log.info("Replayed {} modifying the registration for broker {}: {}", record.getClass().getSimpleName(), brokerId, record);
            this.brokerRegistrations.put(brokerId, updated);
            if (updated.degradedComponents().isEmpty()) {
                this.demotedBrokers.remove(Integer.valueOf(brokerId));
            } else {
                this.demotedBrokers.add(Integer.valueOf(brokerId));
            }
        }

        // The steps below run for no-op changes as well.
        this.updateDirectories(brokerId, current.directories(), updated.directories());
        if (this.heartbeatManager != null) {
            this.heartbeatManager.register(brokerId, updated.fenced());
        }
        if (this.readyBrokersFuture.isPresent()) {
            ReadyBrokersFuture pending = this.readyBrokersFuture.get();
            if (pending.check()) {
                pending.future.complete(null);
                this.readyBrokersFuture = Optional.empty();
            }
        }
    }

    /**
     * Applies a RegisterControllerRecord by storing the registration built from it under the
     * record's controller id, replacing any previous registration for that id.
     *
     * @param record the record to replay
     */
    public void replay(RegisterControllerRecord record) {
        ControllerRegistration registration = new ControllerRegistration.Builder(record).build();
        ControllerRegistration replaced = this.controllerRegistrations.put(record.controllerId(), registration);

        String previousIncarnationNote = "";
        if (replaced != null) {
            previousIncarnationNote = " Previous incarnation was " + replaced.incarnationId();
        }
        this.log.info("Replayed RegisterControllerRecord containing {}.{}", registration, previousIncarnationNote);
    }

    /**
     * Returns the usable brokers as reported by the heartbeat manager.
     *
     * <p>The heartbeat manager is given a rack lookup that translates a broker's registered
     * rack through {@code rackMigrationMap} (falling back to the registered rack when there is
     * no mapping), the ids of brokers with replica exclusions, and a cell id lookup.
     *
     * @return an iterator over usable brokers
     * @throws RuntimeException if there is no heartbeat manager
     */
    Iterator<UsableBroker> usableBrokers() {
        if (this.heartbeatManager == null) {
            throw new RuntimeException("ClusterControlManager is not active.");
        }
        return this.heartbeatManager.usableBrokers(
            id -> {
                Optional<String> registeredRack = this.brokerRegistrations.get(id).rack();
                if (!registeredRack.isPresent()) {
                    return registeredRack;
                }
                String rack = registeredRack.get();
                return Optional.of(this.rackMigrationMap.getOrDefault(rack, rack));
            },
            this.brokerReplicaExclusions.keySet(),
            this.cellControl::getBrokerCellId);
    }

    /**
     * Tells whether the broker is registered and not fenced.
     *
     * @param brokerId the broker id
     * @return {@code true} if a registration exists and it is not fenced; {@code false} otherwise
     */
    public boolean isUnfenced(int brokerId) {
        BrokerRegistration found = this.brokerRegistrations.get(brokerId);
        return found != null && !found.fenced();
    }

    /**
     * Looks up the registration stored for a broker.
     *
     * @param brokerId the broker id
     * @return the registration, or {@code null} if the broker id has none
     */
    public BrokerRegistration registration(int brokerId) {
        BrokerRegistration found = this.brokerRegistrations.get(brokerId);
        return found;
    }

    /**
     * Tells whether the broker is registered and marked as in controlled shutdown.
     *
     * @param brokerId the broker id
     * @return {@code true} if a registration exists and is in controlled shutdown;
     *         {@code false} otherwise
     */
    public boolean inControlledShutdown(int brokerId) {
        BrokerRegistration found = this.brokerRegistrations.get(brokerId);
        return found != null && found.inControlledShutdown();
    }

    /**
     * Tells whether the broker is registered, not fenced, and not in controlled shutdown.
     *
     * @param brokerId the broker id
     * @return {@code true} only if a registration exists and is neither fenced nor in
     *         controlled shutdown
     */
    public boolean isActive(int brokerId) {
        BrokerRegistration found = this.brokerRegistrations.get(brokerId);
        if (found == null) {
            return false;
        }
        boolean shuttingDownOrFenced = found.inControlledShutdown() || found.fenced();
        return !shuttingDownOrFenced;
    }

    /**
     * Collects the endpoints of all unfenced brokers for the given listener.
     *
     * <p>Fenced brokers, and brokers that have no endpoint for the listener, are skipped. The
     * rack is set on an entry only when the registration has one.
     *
     * @param listenerName the listener whose endpoints are wanted
     * @return a new collection with one entry (node id, host, port, optional rack) per
     *         matching broker
     */
    MetadataResponseData.MetadataResponseBrokerCollection unfencedBrokerEndpoints(ListenerName listenerName) {
        MetadataResponseData.MetadataResponseBrokerCollection result = new MetadataResponseData.MetadataResponseBrokerCollection();
        for (BrokerRegistration registration : this.brokerRegistrations.values()) {
            if (registration.fenced()) {
                continue;
            }
            Endpoint endpoint = registration.listeners().get(listenerName.value());
            if (endpoint == null) {
                continue;
            }
            MetadataResponseData.MetadataResponseBroker entry = new MetadataResponseData.MetadataResponseBroker()
                .setNodeId(registration.id())
                .setHost(endpoint.host())
                .setPort(endpoint.port());
            registration.rack().ifPresent(entry::setRack);
            result.add(entry);
        }
        return result;
    }

    /**
     * Returns the heartbeat manager.
     *
     * @return the heartbeat manager, never {@code null}
     * @throws RuntimeException if there is no heartbeat manager
     */
    BrokerHeartbeatManager heartbeatManager() {
        BrokerHeartbeatManager current = this.heartbeatManager;
        if (current != null) {
            return current;
        }
        throw new RuntimeException("ClusterControlManager is not active.");
    }

    /**
     * Verifies that the broker is registered with exactly the given epoch.
     *
     * @param brokerId the broker id
     * @param brokerEpoch the epoch the caller believes the broker has
     * @throws StaleBrokerEpochException if the broker has no registration, or the registered
     *         epoch differs from {@code brokerEpoch}
     */
    public void checkBrokerEpoch(int brokerId, long brokerEpoch) {
        BrokerRegistration registered = this.brokerRegistrations.get(brokerId);
        if (registered == null) {
            throw new StaleBrokerEpochException("No broker registration found for broker id " + brokerId);
        }
        boolean epochMatches = registered.epoch() == brokerEpoch;
        if (!epochMatches) {
            throw new StaleBrokerEpochException("Expected broker epoch " + registered.epoch() + ", but got broker epoch " + brokerEpoch);
        }
    }

    /**
     * Tells whether a broker registration is stored under the given id.
     *
     * @param id the broker id
     * @return {@code true} if the broker registrations contain the id as a key
     */
    public boolean hasBrokerRegistrationForId(int id) {
        boolean registered = this.brokerRegistrations.containsKey(id);
        return registered;
    }

    /**
     * Tells whether a controller registration is stored under the given id.
     *
     * @param id the controller id
     * @return {@code true} if the controller registrations contain the id as a key
     */
    public boolean hasControllerRegistrationForId(int id) {
        boolean registered = this.controllerRegistrations.containsKey(id);
        return registered;
    }

    /**
     * Builds a registration change record that sets a broker's degraded components.
     *
     * @param currentRegistration supplies the broker id and epoch written to the record
     * @param updatedDegradedHealthStates the complete list of degraded states to write; each is
     *        converted to a component code plus reason
     * @return the new change record
     */
    private BrokerRegistrationChangeRecord createRegistrationChangeRecordForDegradedHealth(BrokerRegistration currentRegistration, List<DegradedBrokerHealthState> updatedDegradedHealthStates) {
        List<BrokerRegistrationChangeRecord.DegradedComponent> components = new ArrayList<BrokerRegistrationChangeRecord.DegradedComponent>();
        for (DegradedBrokerHealthState state : updatedDegradedHealthStates) {
            components.add(new BrokerRegistrationChangeRecord.DegradedComponent()
                .setComponentCode(state.component().id())
                .setReason(state.reason()));
        }
        return new BrokerRegistrationChangeRecord()
            .setBrokerId(currentRegistration.id())
            .setBrokerEpoch(currentRegistration.epoch())
            .setDegradedComponents(components);
    }

    /**
     * Installs a future to be completed once the ready-brokers check passes for
     * {@code minBrokers}.
     *
     * <p>The check is evaluated immediately. If it already passes, the future is completed
     * right away and nothing stays pending; otherwise the future remains stored, replacing
     * any previously stored one.
     *
     * @param future the future to complete (with {@code null})
     * @param minBrokers the threshold handed to the check
     */
    public void addReadyBrokersFuture(CompletableFuture<Void> future, int minBrokers) {
        ReadyBrokersFuture pending = new ReadyBrokersFuture(future, minBrokers);
        this.readyBrokersFuture = Optional.of(pending);
        if (!pending.check()) {
            return;
        }
        pending.future.complete(null);
        this.readyBrokersFuture = Optional.empty();
    }

    /**
     * Tells whether the broker's registration reports the given directory as online.
     *
     * @param brokerId the broker id
     * @param directoryId the directory id
     * @return {@code false} if the broker is not registered; otherwise the result of the
     *         registration's own {@code hasOnlineDir} check
     */
    public boolean hasOnlineDir(int brokerId, Uuid directoryId) {
        BrokerRegistration found = this.registration(brokerId);
        if (found == null) {
            return false;
        }
        return found.hasOnlineDir(directoryId);
    }

    /**
     * Determines the default directory id for a broker.
     *
     * @param brokerId the broker id
     * @return {@code DirectoryId.UNASSIGNED} if the broker is not registered or has more than
     *         one directory; {@code DirectoryId.MIGRATING} if it has no directories; the sole
     *         directory if it has exactly one
     */
    public Uuid defaultDir(int brokerId) {
        BrokerRegistration found = this.registration(brokerId);
        if (found == null) {
            return DirectoryId.UNASSIGNED;
        }
        List<Uuid> directories = found.directories();
        switch (directories.size()) {
            case 0:
                return DirectoryId.MIGRATING;
            case 1:
                return directories.get(0);
            default:
                return DirectoryId.UNASSIGNED;
        }
    }

    /**
     * Updates the directory-to-broker assignments for one broker. Removals are processed
     * before additions.
     *
     * @param brokerId the broker owning the directories
     * @param dirsToRemove directories to unassign from the broker, or {@code null} for none
     * @param dirsToAdd directories to assign to the broker, or {@code null} for none
     * @throws IllegalStateException if a directory to remove is not currently assigned to this
     *         broker, or a directory to add is already assigned to a different broker
     */
    void updateDirectories(int brokerId, List<Uuid> dirsToRemove, List<Uuid> dirsToAdd) {
        if (dirsToRemove != null) {
            for (Uuid removed : dirsToRemove) {
                // Two-argument remove only succeeds when the directory maps to this broker.
                boolean wasAssigned = this.directoryToBroker.remove(removed, brokerId);
                if (!wasAssigned) {
                    throw new IllegalStateException("BUG: directory " + removed + " not assigned to broker " + brokerId);
                }
            }
        }
        if (dirsToAdd != null) {
            for (Uuid added : dirsToAdd) {
                Integer previousOwner = (Integer)this.directoryToBroker.putIfAbsent(added, brokerId);
                boolean ownedByOtherBroker = previousOwner != null && previousOwner != brokerId;
                if (ownedByOtherBroker) {
                    throw new IllegalStateException("BUG: directory " + added + " already assigned to broker " + previousOwner);
                }
            }
        }
    }

    /**
     * Returns an iterator over (broker id, supported features) pairs, one per broker
     * registration.
     *
     * <p>The underlying iterator over the registrations is obtained when this method is
     * called; each entry is created on demand and is immutable.
     *
     * @return an iterator of immutable entries
     */
    Iterator<Map.Entry<Integer, Map<String, VersionRange>>> brokerSupportedFeatures() {
        return new Iterator<Map.Entry<Integer, Map<String, VersionRange>>>() {
            private final Iterator<BrokerRegistration> registrations =
                ClusterControlManager.this.brokerRegistrations.values().iterator();

            @Override
            public boolean hasNext() {
                return this.registrations.hasNext();
            }

            @Override
            public Map.Entry<Integer, Map<String, VersionRange>> next() {
                BrokerRegistration current = this.registrations.next();
                return new AbstractMap.SimpleImmutableEntry<Integer, Map<String, VersionRange>>(current.id(), current.supportedFeatures());
            }
        };
    }

    /**
     * Returns an iterator over (controller id, supported features) pairs, one per controller
     * registration.
     *
     * <p>The underlying iterator over the registrations is obtained when this method is
     * called; each entry is created on demand and is immutable.
     *
     * @return an iterator of immutable entries
     * @throws UnsupportedVersionException if the current metadata version does not support
     *         controller registrations
     */
    Iterator<Map.Entry<Integer, Map<String, VersionRange>>> controllerSupportedFeatures() {
        boolean registrationSupported = this.featureControl.metadataVersionOrThrow().isControllerRegistrationSupported();
        if (!registrationSupported) {
            throw new UnsupportedVersionException("The current MetadataVersion is too old to support controller registrations.");
        }
        return new Iterator<Map.Entry<Integer, Map<String, VersionRange>>>() {
            private final Iterator<ControllerRegistration> registrations =
                ClusterControlManager.this.controllerRegistrations.values().iterator();

            @Override
            public boolean hasNext() {
                return this.registrations.hasNext();
            }

            @Override
            public Map.Entry<Integer, Map<String, VersionRange>> next() {
                ControllerRegistration current = this.registrations.next();
                return new AbstractMap.SimpleImmutableEntry<Integer, Map<String, VersionRange>>(current.id(), current.supportedFeatures());
            }
        };
    }

    /**
     * Builds a BrokerReplicaExclusionRecord from (broker id, reason) pairs.
     *
     * @param exclusions the exclusions to write, keyed by broker id with the reason as value
     * @return a new record holding one exclusion entry per pair, in the set's iteration order
     */
    private BrokerReplicaExclusionRecord exclusionRecord(Set<Map.Entry<Integer, String>> exclusions) {
        List<BrokerReplicaExclusionRecord.BrokerReplicaExclusion> entries = new ArrayList<BrokerReplicaExclusionRecord.BrokerReplicaExclusion>();
        for (Map.Entry<Integer, String> exclusion : exclusions) {
            entries.add(new BrokerReplicaExclusionRecord.BrokerReplicaExclusion()
                .setBrokerId(exclusion.getKey())
                .setReason(exclusion.getValue()));
        }
        return new BrokerReplicaExclusionRecord().setBrokerExclusions(entries);
    }

    /**
     * Returns the ids of all registered brokers for which {@code isActive} is true, i.e. that
     * are neither fenced nor in controlled shutdown.
     *
     * @return a new set of broker ids
     */
    public Set<Integer> liveBrokers() {
        Set<Integer> liveIds = new HashSet<Integer>();
        for (BrokerRegistration registration : this.brokerRegistrations.values()) {
            if (this.isActive(registration.id())) {
                liveIds.add(registration.id());
            }
        }
        return liveIds;
    }

    @FunctionalInterface
    static interface BrokerShutdownHandler {
        public void addRecordsForShutdown(int var1, boolean var2, List<ApiMessageAndVersion> var3);
    }

    /**
     * Pairs a future with the number of unfenced brokers that must be registered before the
     * future should be completed. This is an inner class because {@link #check()} reads the
     * enclosing manager's broker registrations.
     */
    class ReadyBrokersFuture {
        private final CompletableFuture<Void> future;
        private final int minBrokers;

        /**
         * @param future the future to complete once enough brokers are unfenced
         * @param minBrokers the number of unfenced brokers required
         */
        ReadyBrokersFuture(CompletableFuture<Void> future, int minBrokers) {
            this.future = future;
            this.minBrokers = minBrokers;
        }

        /**
         * Counts unfenced registrations and reports whether the threshold has been reached.
         *
         * <p>The threshold is evaluated once per registration visited, so with no
         * registrations at all this returns {@code false} regardless of {@code minBrokers}.
         *
         * @return {@code true} as soon as the running unfenced count is at least
         *         {@code minBrokers}; {@code false} if that never happens
         */
        boolean check() {
            int unfencedSoFar = 0;
            for (BrokerRegistration registration : ClusterControlManager.this.brokerRegistrations.values()) {
                unfencedSoFar += registration.fenced() ? 0 : 1;
                if (unfencedSoFar >= this.minBrokers) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Fluent builder for {@link ClusterControlManager}.
     *
     * <p>{@link #build()} fills in defaults for most unset collaborators, but a
     * {@code FeatureControlManager} and a {@code BrokerShutdownHandler} must be
     * supplied explicitly. No default is ever created for the
     * {@code EncryptionControlManager}; it is handed to the constructor as-is,
     * possibly {@code null}.
     */
    static class Builder {
        // Reference fields without an initializer start out null, meaning "not configured".
        private LogContext logContext;
        private String clusterId;
        private Time time = Time.SYSTEM;
        private SnapshotRegistry snapshotRegistry;
        private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS;
        private ReplicaPlacer replicaPlacer;
        private FeatureControlManager featureControl;
        private CellControlManager cellControl;
        private EncryptionControlManager encryptionControl;
        private Map<String, String> rackMigrationMap = Map.of();
        private BrokerShutdownHandler brokerShutdownHandler;
        private QuorumControllerMetrics metrics;

        Builder() {
        }

        /** Sets the log context; when left unset, {@link #build()} creates a new one. */
        Builder setLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        /** Sets the cluster id; when left unset, {@link #build()} generates a random UUID string. */
        Builder setClusterId(String clusterId) {
            this.clusterId = clusterId;
            return this;
        }

        /** Sets the time source; the initial value is {@code Time.SYSTEM}. */
        Builder setTime(Time time) {
            this.time = time;
            return this;
        }

        /** Sets the snapshot registry; when left unset, {@link #build()} creates one from the log context. */
        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;
            return this;
        }

        /** Sets the session timeout in nanoseconds; the initial value is {@code DEFAULT_SESSION_TIMEOUT_NS}. */
        Builder setSessionTimeoutNs(long sessionTimeoutNs) {
            this.sessionTimeoutNs = sessionTimeoutNs;
            return this;
        }

        /** Sets the replica placer; when left unset, {@link #build()} uses a {@code StripedReplicaPlacer}. */
        Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
            this.replicaPlacer = replicaPlacer;
            return this;
        }

        /** Sets the feature control manager. Mandatory: {@link #build()} throws when it is missing. */
        Builder setFeatureControlManager(FeatureControlManager featureControl) {
            this.featureControl = featureControl;
            return this;
        }

        /** Sets the cell control manager; when left unset, {@link #build()} constructs one itself. */
        Builder setCellControlManager(CellControlManager cellControl) {
            this.cellControl = cellControl;
            return this;
        }

        /** Sets the encryption control manager; {@link #build()} passes it through without defaulting. */
        Builder setEncryptionControlManager(EncryptionControlManager encryptionControl) {
            this.encryptionControl = encryptionControl;
            return this;
        }

        /** Sets the broker shutdown handler. Mandatory: {@link #build()} throws when it is missing. */
        Builder setBrokerShutdownHandler(BrokerShutdownHandler brokerShutdownHandler) {
            this.brokerShutdownHandler = brokerShutdownHandler;
            return this;
        }

        /** Sets the rack migration map; the initial value is an empty immutable map. */
        Builder setRackMigrationMapConfig(Map<String, String> rackMigrationMap) {
            this.rackMigrationMap = rackMigrationMap;
            return this;
        }

        /** Sets the controller metrics; when left unset, {@link #build()} creates a default instance. */
        Builder setMetrics(QuorumControllerMetrics metrics) {
            this.metrics = metrics;
            return this;
        }

        /**
         * Creates the {@link ClusterControlManager}, defaulting unset optional
         * collaborators along the way.
         *
         * <p>Defaults are written back into this builder's fields, so a later call
         * to {@code build()} reuses them (for example the generated cluster id).
         * Defaulting and validation are interleaved in a fixed order because the
         * default cell control manager is built from the feature control manager,
         * the snapshot registry and the time source.
         *
         * @return the newly constructed manager
         * @throws RuntimeException if no {@code FeatureControlManager} or no
         *         {@code BrokerShutdownHandler} has been supplied
         */
        ClusterControlManager build() {
            if (logContext == null) {
                logContext = new LogContext();
            }
            if (clusterId == null) {
                clusterId = Uuid.randomUuid().toString();
            }
            if (snapshotRegistry == null) {
                snapshotRegistry = new SnapshotRegistry(logContext);
            }
            if (replicaPlacer == null) {
                replicaPlacer = new StripedReplicaPlacer(new Random());
            }
            requireConfigured(featureControl, "You must specify FeatureControlManager");
            if (cellControl == null) {
                // The default uses its own fresh LogContext rather than this builder's.
                // The literal arguments are defined by CellControlManager's constructor,
                // which is not visible here.
                cellControl = new CellControlManager(new LogContext(), time, snapshotRegistry, featureControl, new Random(), 15, 6, 15, true, -1);
            }
            requireConfigured(brokerShutdownHandler, "You must specify BrokerShutdownHandler");
            if (metrics == null) {
                metrics = new QuorumControllerMetrics(Optional.empty(), time, new Metrics(), 0);
            }
            return new ClusterControlManager(
                logContext,
                clusterId,
                time,
                snapshotRegistry,
                sessionTimeoutNs,
                replicaPlacer,
                featureControl,
                cellControl,
                encryptionControl,
                rackMigrationMap,
                brokerShutdownHandler,
                metrics);
        }

        /** Throws a {@link RuntimeException} carrying {@code message} when {@code value} is null. */
        private static void requireConfigured(Object value, String message) {
            if (value == null) {
                throw new RuntimeException(message);
            }
        }
    }
}

