package org.apache.kafka.controller;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ComponentHealthStatus;
import org.apache.kafka.clients.admin.ExclusionOp;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.PartitionPlacementStrategy;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.ClusterLinkNotFoundException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.message.AlterBrokerHealthRequestData;
import org.apache.kafka.common.message.AlterBrokerHealthResponseData;
import org.apache.kafka.common.message.AlterBrokerReplicaExclusionsRequestData;
import org.apache.kafka.common.message.AlterCellResponseData;
import org.apache.kafka.common.message.AlterMirrorTopicsRequestData;
import org.apache.kafka.common.message.AlterMirrorTopicsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.AssignBrokersToCellResponseData;
import org.apache.kafka.common.message.AssignTenantsToCellResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreateCellResponseData;
import org.apache.kafka.common.message.CreateClusterLinksRequestData;
import org.apache.kafka.common.message.CreateClusterLinksResponseData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.DeleteCellResponseData;
import org.apache.kafka.common.message.DeleteClusterLinksRequestData;
import org.apache.kafka.common.message.DeleteClusterLinksResponseData;
import org.apache.kafka.common.message.DeleteTenantsResponseData;
import org.apache.kafka.common.message.DescribeBrokerHealthResponseData;
import org.apache.kafka.common.message.DescribeCellsResponseData;
import org.apache.kafka.common.message.DescribeTenantsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.UnAssignBrokersFromCellResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.BrokerReplicaExclusionRecord;
import org.apache.kafka.common.metadata.CellRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ClusterLinkRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EncryptedEnvelopeRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.InstallMetadataEncryptorRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.MirrorTopicChangeRecord;
import org.apache.kafka.common.metadata.MirrorTopicRecord;
import org.apache.kafka.common.metadata.NoOpRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveCellRecord;
import org.apache.kafka.common.metadata.RemoveClusterLinkRecord;
import org.apache.kafka.common.metadata.RemoveTenantRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.TenantRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.AlterCellRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.AssignBrokersToCellRequest;
import org.apache.kafka.common.requests.AssignTenantsToCellRequest;
import org.apache.kafka.common.requests.CreateCellRequest;
import org.apache.kafka.common.requests.DeleteCellRequest;
import org.apache.kafka.common.requests.DeleteTenantsRequest;
import org.apache.kafka.common.requests.DescribeCellsRequest;
import org.apache.kafka.common.requests.DescribeTenantsRequest;
import org.apache.kafka.common.requests.UnAssignBrokersFromCellRequest;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.LogReplayTracker;
import org.apache.kafka.controller.ReplicationControlManager;
import org.apache.kafka.controller.ScramControlManager;
import org.apache.kafka.controller.errors.ControllerExceptions;
import org.apache.kafka.controller.metrics.CellMetrics;
import org.apache.kafka.controller.metrics.ControllerMetrics;
import org.apache.kafka.controller.metrics.ControllerMetricsManager;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.DegradedBrokerHealthState;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.TopicPlacement;
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.metadata.migration.ZkRecordConsumer;
import org.apache.kafka.metadata.placement.CellAssignor;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.util.RecordRedactor;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.AlterReplicaExclusionOp;
import org.apache.kafka.server.common.AlterReplicaExclusionsReply;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateClusterLinkPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.codehaus.plexus.util.SelectorUtils;
import org.slf4j.Logger;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
/*  JADX ERROR: NullPointerException in pass: ProcessKotlinInternals
    java.lang.NullPointerException
    */
/* loaded from: input_file:org/apache/kafka/controller/QuorumController.class */
public final class QuorumController implements Controller {
    private static final int MAX_RECORDS_PER_BATCH = 10000;
    static final int MAX_RECORDS_PER_USER_OP = 10000;
    public static final String CONTROLLER_THREAD_SUFFIX = "QuorumControllerEventHandler";
    static final String MAYBE_FENCE_REPLICAS = "maybeFenceReplicas";
    private static final String MAYBE_BALANCE_PARTITION_LEADERS = "maybeBalancePartitionLeaders";
    private static final String LEADER_ELECTION_FOR_DEMOTED_BROKERS = "leaderElectionForDemotedBrokers";
    private static final String LEADER_ELECTION_FOR_PROMOTED_BROKERS = "leaderElectionForPromotedBrokers";
    private static final String WRITE_NO_OP_RECORD = "writeNoOpRecord";
    private final FaultHandler nonFatalFaultHandler;
    private final FaultHandler fatalFaultHandler;
    private final Logger log;
    private final int nodeId;
    private final String clusterId;
    private final KafkaEventQueue queue;
    private final Time time;
    private final ControllerMetrics controllerMetrics;
    private final CellMetrics cellMetrics;
    private final ControllerMetricsManager controllerMetricsManager;
    private final SnapshotRegistry snapshotRegistry;
    private final DeferredEventQueue deferredEventQueue;
    private final Consumer<ConfigResource> resourceExists;
    private final EncryptionControlManager encryptionControl;
    private final ConfigurationControlManager configurationControl;
    private final ClientQuotaControlManager clientQuotaControlManager;
    private final ClusterControlManager clusterControl;
    private final FeatureControlManager featureControl;
    private final ProducerIdControlManager producerIdControlManager;
    private final ReplicationControlManager replicationControl;
    private final ScramControlManager scramControlManager;
    private final Optional<ClusterMetadataAuthorizer> authorizer;
    private final Map<String, Object> staticConfig;
    private final AclControlManager aclControlManager;
    private final LogReplayTracker logReplayTracker;
    private final ClusterLinkControlManager clusterLinkControl;
    private final MirrorTopicControlManager mirrorTopicControl;
    private final CellControlManager cellControl;
    private final TenantControlManager tenantControl;
    private final RaftClient<ApiMessageAndVersion> raftClient;
    private QuorumMetaLogListener metaLogListener;
    private volatile int curClaimEpoch;
    private long lastCommittedOffset;
    private int lastCommittedEpoch;
    private long lastCommittedTimestamp;
    private boolean needToCompleteAuthorizerLoad;
    private long writeOffset;
    private long oldestNonSnapshottedTimestamp;
    private final OptionalLong leaderImbalanceCheckIntervalNs;
    private final OptionalLong maxIdleIntervalNs;
    private ImbalanceSchedule imbalancedScheduled;
    private boolean noOpRecordScheduled;
    private final BootstrapMetadata bootstrapMetadata;
    private final ZkRecordConsumer zkRecordConsumer;
    private final boolean zkMigrationEnabled;
    private final int maxRecordsPerBatch;
    private final RecordRedactor recordRedactor;

    /* renamed from: org.apache.kafka.controller.QuorumController$1 */
    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$1.class */
    public class AnonymousClass1 implements ControllerWriteOperation<BrokerHeartbeatReply> {
        private final int brokerId;
        private boolean inControlledShutdown = false;
        final /* synthetic */ BrokerHeartbeatRequestData val$request;

        AnonymousClass1(BrokerHeartbeatRequestData brokerHeartbeatRequestData) {
            r5 = brokerHeartbeatRequestData;
            this.brokerId = r5.brokerId();
        }

        @Override // org.apache.kafka.controller.QuorumController.ControllerWriteOperation
        public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
            OptionalLong registerBrokerRecordOffset = QuorumController.this.clusterControl.registerBrokerRecordOffset(this.brokerId);
            if (!registerBrokerRecordOffset.isPresent()) {
                throw new StaleBrokerEpochException(String.format("Receive a heartbeat from broker %d before registration", Integer.valueOf(this.brokerId)));
            }
            ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat = QuorumController.this.replicationControl.processBrokerHeartbeat(r5, registerBrokerRecordOffset.getAsLong());
            this.inControlledShutdown = processBrokerHeartbeat.response().inControlledShutdown();
            QuorumController.this.rescheduleMaybeFenceStaleBrokers();
            return processBrokerHeartbeat;
        }

        @Override // org.apache.kafka.controller.QuorumController.ControllerWriteOperation
        public void processBatchEndOffset(long j) {
            if (this.inControlledShutdown) {
                QuorumController.this.clusterControl.heartbeatManager().maybeUpdateControlledShutdownOffset(this.brokerId, j);
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$Builder.class */
    public static class Builder {
        private final int nodeId;
        private final String clusterId;
        private FaultHandler nonFatalFaultHandler = null;
        private FaultHandler fatalFaultHandler = null;
        private Function<String, String> nameToTenantCallback = null;
        private Time time = Time.SYSTEM;
        private String threadNamePrefix = null;
        private LogContext logContext = null;
        private KafkaConfigSchema configSchema = KafkaConfigSchema.EMPTY;
        private RaftClient<ApiMessageAndVersion> raftClient = null;
        private QuorumFeatures quorumFeatures = null;
        private short defaultReplicationFactor = 3;
        private int defaultNumPartitions = 1;
        private int defaultMinIsrCount = 2;
        private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
        private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
        private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
        private long sessionTimeoutNs = ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS;
        private ControllerMetrics controllerMetrics = null;
        private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
        private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
        private Optional<CreateClusterLinkPolicy> createClusterLinkPolicy = Optional.empty();
        private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
        private Optional<ClusterMetadataAuthorizer> authorizer = Optional.empty();
        private Map<String, Object> staticConfig = Collections.emptyMap();
        private BootstrapMetadata bootstrapMetadata = null;
        private int maxRecordsPerBatch = 10000;
        private boolean zkMigrationEnabled = false;
        private CellAssignor cellAssignor = null;
        private PartitionPlacementStrategy partitionPlacementStrategy = PartitionPlacementStrategy.CLUSTER_WIDE;
        private boolean isImplicitCellCreationEnabled = false;
        private Optional<TopicPlacement> defaultTopicPlacement = Optional.empty();
        private CellMetrics cellMetrics = null;

        public Builder(int i, String str) {
            this.nodeId = i;
            this.clusterId = str;
        }

        public Builder setNonFatalFaultHandler(FaultHandler faultHandler) {
            this.nonFatalFaultHandler = faultHandler;
            return this;
        }

        public Builder setFatalFaultHandler(FaultHandler faultHandler) {
            this.fatalFaultHandler = faultHandler;
            return this;
        }

        public Builder setNameToTenantCallback(Function<String, String> function) {
            this.nameToTenantCallback = function;
            return this;
        }

        public Builder setDefaultMinIsrCount(int i) {
            this.defaultMinIsrCount = i;
            return this;
        }

        public int nodeId() {
            return this.nodeId;
        }

        public Builder setTime(Time time) {
            this.time = time;
            return this;
        }

        public Builder setThreadNamePrefix(String str) {
            this.threadNamePrefix = str;
            return this;
        }

        public Builder setLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        public LogContext logContext() {
            return this.logContext;
        }

        public Builder setConfigSchema(KafkaConfigSchema kafkaConfigSchema) {
            this.configSchema = kafkaConfigSchema;
            return this;
        }

        public Builder setRaftClient(RaftClient<ApiMessageAndVersion> raftClient) {
            this.raftClient = raftClient;
            return this;
        }

        public Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
            this.quorumFeatures = quorumFeatures;
            return this;
        }

        public Builder setDefaultReplicationFactor(short s) {
            this.defaultReplicationFactor = s;
            return this;
        }

        public Builder setDefaultNumPartitions(int i) {
            this.defaultNumPartitions = i;
            return this;
        }

        public ReplicaPlacer replicaPlacer() {
            return this.replicaPlacer;
        }

        public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
            this.replicaPlacer = replicaPlacer;
            return this;
        }

        public Builder setLeaderImbalanceCheckIntervalNs(OptionalLong optionalLong) {
            this.leaderImbalanceCheckIntervalNs = optionalLong;
            return this;
        }

        public Builder setMaxIdleIntervalNs(OptionalLong optionalLong) {
            this.maxIdleIntervalNs = optionalLong;
            return this;
        }

        public Builder setSessionTimeoutNs(long j) {
            this.sessionTimeoutNs = j;
            return this;
        }

        public Builder setMetrics(ControllerMetrics controllerMetrics) {
            this.controllerMetrics = controllerMetrics;
            return this;
        }

        public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) {
            this.bootstrapMetadata = bootstrapMetadata;
            return this;
        }

        public Builder setMaxRecordsPerBatch(int i) {
            this.maxRecordsPerBatch = i;
            return this;
        }

        public Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> optional) {
            this.createTopicPolicy = optional;
            return this;
        }

        public Builder setAlterConfigPolicy(Optional<AlterConfigPolicy> optional) {
            this.alterConfigPolicy = optional;
            return this;
        }

        public Builder setCreateClusterLinkPolicy(Optional<CreateClusterLinkPolicy> optional) {
            this.createClusterLinkPolicy = optional;
            return this;
        }

        public Builder setConfigurationValidator(ConfigurationValidator configurationValidator) {
            this.configurationValidator = configurationValidator;
            return this;
        }

        public Builder setAuthorizer(ClusterMetadataAuthorizer clusterMetadataAuthorizer) {
            this.authorizer = Optional.of(clusterMetadataAuthorizer);
            return this;
        }

        public Builder setCellAssignor(CellAssignor cellAssignor) {
            this.cellAssignor = cellAssignor;
            return this;
        }

        public Builder setPartitionPlacementStrategy(PartitionPlacementStrategy partitionPlacementStrategy) {
            this.partitionPlacementStrategy = partitionPlacementStrategy;
            return this;
        }

        public Builder setIsImplicitCellCreationEnabled(boolean z) {
            this.isImplicitCellCreationEnabled = z;
            return this;
        }

        public Builder setStaticConfig(Map<String, Object> map) {
            this.staticConfig = map;
            return this;
        }

        public Builder setZkMigrationEnabled(boolean z) {
            this.zkMigrationEnabled = z;
            return this;
        }

        public Builder setDefaultTopicPlacement(Optional<TopicPlacement> optional) {
            this.defaultTopicPlacement = optional;
            return this;
        }

        public Builder setCellMetrics(CellMetrics cellMetrics) {
            this.cellMetrics = cellMetrics;
            return this;
        }

        public QuorumController build() throws Exception {
            if (this.raftClient == null) {
                throw new IllegalStateException("You must set a raft client.");
            }
            if (this.bootstrapMetadata == null) {
                throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool.");
            }
            if (this.quorumFeatures == null) {
                throw new IllegalStateException("You must specify the quorum features");
            }
            if (this.nonFatalFaultHandler == null) {
                throw new IllegalStateException("You must specify a non-fatal fault handler.");
            }
            if (this.fatalFaultHandler == null) {
                throw new IllegalStateException("You must specify a fatal fault handler.");
            }
            if (this.cellAssignor == null) {
                throw new IllegalStateException("You must specify a cell assignor");
            }
            if (this.cellMetrics == null) {
                throw new IllegalStateException("You must specify a cell metrics");
            }
            if (this.threadNamePrefix == null) {
                this.threadNamePrefix = String.format("quorum-controller-%d-", Integer.valueOf(this.nodeId));
            }
            if (this.logContext == null) {
                this.logContext = new LogContext(String.format("[QuorumController id=%d] ", Integer.valueOf(this.nodeId)));
            }
            if (this.controllerMetrics == null) {
                this.controllerMetrics = (ControllerMetrics) Class.forName("org.apache.kafka.controller.metrics.MockControllerMetrics").getConstructor(new Class[0]).newInstance(new Object[0]);
            }
            KafkaEventQueue kafkaEventQueue = null;
            try {
                kafkaEventQueue = new KafkaEventQueue(this.time, this.logContext, this.threadNamePrefix);
                return new QuorumController(this.nonFatalFaultHandler, this.fatalFaultHandler, this.logContext, this.nodeId, this.clusterId, kafkaEventQueue, this.time, this.configSchema, this.raftClient, this.quorumFeatures, this.defaultReplicationFactor, this.defaultNumPartitions, this.replicaPlacer, this.leaderImbalanceCheckIntervalNs, this.maxIdleIntervalNs, this.sessionTimeoutNs, this.controllerMetrics, this.createTopicPolicy, this.alterConfigPolicy, this.configurationValidator, this.authorizer, this.staticConfig, this.bootstrapMetadata, this.nameToTenantCallback, this.defaultMinIsrCount, this.maxRecordsPerBatch, this.zkMigrationEnabled, this.createClusterLinkPolicy, this.cellAssignor, this.partitionPlacementStrategy, this.isImplicitCellCreationEnabled, this.defaultTopicPlacement, this.cellMetrics);
            } catch (Exception e) {
                Utils.closeQuietly(kafkaEventQueue, "event queue");
                throw e;
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$CompleteActivationEvent.class */
    public class CompleteActivationEvent implements ControllerWriteOperation<Void> {
        CompleteActivationEvent() {
        }

        @Override // org.apache.kafka.controller.QuorumController.ControllerWriteOperation
        public ControllerResult<Void> generateRecordsAndResult() {
            try {
                return ControllerResult.atomicOf(QuorumController.generateActivationRecords(QuorumController.this.log, QuorumController.this.logReplayTracker.empty(), QuorumController.this.zkMigrationEnabled, QuorumController.this.bootstrapMetadata, QuorumController.this.featureControl), null);
            } catch (Throwable th) {
                throw QuorumController.this.fatalFaultHandler.handleFault("exception while completing controller activation", th);
            }
        }

        @Override // org.apache.kafka.controller.QuorumController.ControllerWriteOperation
        public void processBatchEndOffset(long j) {
            QuorumController.this.maybeScheduleNextBalancePartitionLeaders();
            QuorumController.this.maybeScheduleNextWriteNoOpRecord();
            KafkaEventQueue kafkaEventQueue = QuorumController.this.queue;
            QuorumController quorumController = QuorumController.this;
            EncryptionControlManager encryptionControlManager = QuorumController.this.encryptionControl;
            encryptionControlManager.getClass();
            kafkaEventQueue.prepend(new ControllerWriteEvent("maybeInstallEncryptor", encryptionControlManager::maybeInstallEncryptor, EnumSet.of(ControllerOperationFlag.RUNS_IN_PREMIGRATION)));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ConfigResourceExistenceChecker.class */
    public class ConfigResourceExistenceChecker implements Consumer<ConfigResource> {
        ConfigResourceExistenceChecker() {
        }

        @Override // java.util.function.Consumer
        public void accept(ConfigResource configResource) {
            switch (configResource.type()) {
                case BROKER_LOGGER:
                default:
                    return;
                case BROKER:
                    if (configResource.name().isEmpty()) {
                        return;
                    }
                    try {
                        int parseInt = Integer.parseInt(configResource.name());
                        if (!QuorumController.this.clusterControl.brokerRegistrations().containsKey(Integer.valueOf(parseInt)) && !QuorumController.this.featureControl.isControllerId(parseInt)) {
                            throw new BrokerIdNotRegisteredException("No node with id " + parseInt + " found.");
                        }
                        return;
                    } catch (NumberFormatException e) {
                        throw new InvalidRequestException("Invalid broker name " + configResource.name());
                    }
                case TOPIC:
                    if (QuorumController.this.replicationControl.getTopicId(configResource.name()) == null) {
                        throw new UnknownTopicOrPartitionException("The topic '" + configResource.name() + "' does not exist.");
                    }
                    return;
                case CLUSTER_LINK:
                    if (!QuorumController.this.clusterLinkControl.getClusterLink(configResource.name()).isPresent()) {
                        throw new ClusterLinkNotFoundException("The cluster link " + configResource.name() + " does not exist.");
                    }
                    return;
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ControllerEvent.class */
    public class ControllerEvent implements EventQueue.Event {
        private final String name;
        private final Runnable handler;
        private final long eventCreatedTimeNs;
        private OptionalLong startProcessingTimeNs = OptionalLong.empty();

        ControllerEvent(String str, Runnable runnable) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.name = str;
            this.handler = runnable;
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void run() throws Exception {
            long nanoseconds = QuorumController.this.time.nanoseconds();
            QuorumController.this.controllerMetrics.updateEventQueueTime(TimeUnit.NANOSECONDS.toMillis(nanoseconds - this.eventCreatedTimeNs));
            this.startProcessingTimeNs = OptionalLong.of(nanoseconds);
            QuorumController.this.log.debug("Executing {}.", this);
            this.handler.run();
            QuorumController.this.handleEventEnd(toString(), this.startProcessingTimeNs.getAsLong());
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void handleException(Throwable th) {
            QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, th);
        }

        public String toString() {
            return this.name;
        }
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ControllerOperationFlag.class */
    public enum ControllerOperationFlag {
        DOES_NOT_UPDATE_QUEUE_TIME,
        RUNS_IN_PREMIGRATION
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ControllerReadEvent.class */
    public class ControllerReadEvent<T> implements EventQueue.Event {
        private final String name;
        private final Supplier<T> handler;
        private final long eventCreatedTimeNs;
        private OptionalLong startProcessingTimeNs = OptionalLong.empty();
        private final CompletableFuture<T> future = new CompletableFuture<>();

        ControllerReadEvent(String str, Supplier<T> supplier) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.name = str;
            this.handler = supplier;
        }

        CompletableFuture<T> future() {
            return this.future;
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void run() throws Exception {
            long nanoseconds = QuorumController.this.time.nanoseconds();
            QuorumController.this.controllerMetrics.updateEventQueueTime(TimeUnit.NANOSECONDS.toMillis(nanoseconds - this.eventCreatedTimeNs));
            this.startProcessingTimeNs = OptionalLong.of(nanoseconds);
            T t = this.handler.get();
            QuorumController.this.handleEventEnd(toString(), this.startProcessingTimeNs.getAsLong());
            this.future.complete(t);
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void handleException(Throwable th) {
            this.future.completeExceptionally(QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, th));
        }

        public String toString() {
            return this.name + "(" + System.identityHashCode(this) + ")";
        }
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ControllerWriteEvent.class */
    public class ControllerWriteEvent<T> implements EventQueue.Event, DeferredEvent {
        private final String name;
        private final ControllerWriteOperation<T> op;
        private final long eventCreatedTimeNs;
        private final EnumSet<ControllerOperationFlag> flags;
        private OptionalLong startProcessingTimeNs = OptionalLong.empty();
        private final CompletableFuture<T> future = new CompletableFuture<>();
        private ControllerResultAndOffset<T> resultAndOffset = null;

        /* renamed from: org.apache.kafka.controller.QuorumController$ControllerWriteEvent$1 */
        /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ControllerWriteEvent$1.class */
        class AnonymousClass1 implements Function<List<ApiMessageAndVersion>, Long> {
            private long prevEndOffset;
            final /* synthetic */ int val$controllerEpoch;

            AnonymousClass1(int i) {
                r6 = i;
                this.prevEndOffset = QuorumController.this.writeOffset;
            }

            @Override // java.util.function.Function
            public Long apply(List<ApiMessageAndVersion> list) {
                int i = 1;
                for (ApiMessageAndVersion apiMessageAndVersion : list) {
                    try {
                        QuorumController.this.replay(apiMessageAndVersion.message(), Optional.empty(), this.prevEndOffset + list.size());
                        i++;
                    } catch (Throwable th) {
                        throw QuorumController.this.fatalFaultHandler.handleFault(String.format("Unable to apply %s record, which was %d of %d record(s) in the batch following last write offset %d.", apiMessageAndVersion.message().getClass().getSimpleName(), Integer.valueOf(i), Integer.valueOf(list.size()), Long.valueOf(this.prevEndOffset)), th);
                    }
                }
                this.prevEndOffset = QuorumController.this.raftClient.scheduleAtomicAppend(r6, list);
                QuorumController.this.snapshotRegistry.getOrCreateSnapshot(this.prevEndOffset);
                return Long.valueOf(this.prevEndOffset);
            }
        }

        ControllerWriteEvent(String str, ControllerWriteOperation<T> controllerWriteOperation, EnumSet<ControllerOperationFlag> enumSet) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.name = str;
            this.op = controllerWriteOperation;
            this.flags = enumSet;
        }

        CompletableFuture<T> future() {
            return this.future;
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void run() throws Exception {
            long nanoseconds = QuorumController.this.time.nanoseconds();
            if (!this.flags.contains(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME)) {
                QuorumController.this.controllerMetrics.updateEventQueueTime(TimeUnit.NANOSECONDS.toMillis(nanoseconds - this.eventCreatedTimeNs));
            }
            int i = QuorumController.this.curClaimEpoch;
            if (!QuorumController.isActiveController(i)) {
                throw ControllerExceptions.newWrongControllerException(QuorumController.this.latestController());
            }
            if (QuorumController.this.featureControl.inPreMigrationMode() && !this.flags.contains(ControllerOperationFlag.RUNS_IN_PREMIGRATION)) {
                QuorumController.this.log.info("Cannot run write operation {} in pre-migration mode. Returning NOT_CONTROLLER.", this.name);
                throw ControllerExceptions.newPreMigrationException(QuorumController.this.latestController());
            }
            this.startProcessingTimeNs = OptionalLong.of(nanoseconds);
            ControllerResult<T> generateRecordsAndResult = this.op.generateRecordsAndResult();
            if (generateRecordsAndResult.records().isEmpty()) {
                this.op.processBatchEndOffset(QuorumController.this.writeOffset);
                OptionalLong highestPendingOffset = QuorumController.this.deferredEventQueue.highestPendingOffset();
                if (highestPendingOffset.isPresent()) {
                    this.resultAndOffset = ControllerResultAndOffset.of(highestPendingOffset.getAsLong(), generateRecordsAndResult);
                    QuorumController.this.log.debug("Read-only operation {} will be completed when the log reaches offset {}", this, Long.valueOf(this.resultAndOffset.offset()));
                } else {
                    this.resultAndOffset = ControllerResultAndOffset.of(-1L, generateRecordsAndResult);
                    QuorumController.this.log.debug("Completing read-only operation {} immediately because the purgatory is empty.", this);
                    complete(null);
                }
            } else {
                long appendRecords = QuorumController.appendRecords(QuorumController.this.log, generateRecordsAndResult, QuorumController.this.maxRecordsPerBatch, new Function<List<ApiMessageAndVersion>, Long>() { // from class: org.apache.kafka.controller.QuorumController.ControllerWriteEvent.1
                    private long prevEndOffset;
                    final /* synthetic */ int val$controllerEpoch;

                    AnonymousClass1(int i2) {
                        r6 = i2;
                        this.prevEndOffset = QuorumController.this.writeOffset;
                    }

                    @Override // java.util.function.Function
                    public Long apply(List<ApiMessageAndVersion> list) {
                        int i2 = 1;
                        for (ApiMessageAndVersion apiMessageAndVersion : list) {
                            try {
                                QuorumController.this.replay(apiMessageAndVersion.message(), Optional.empty(), this.prevEndOffset + list.size());
                                i2++;
                            } catch (Throwable th) {
                                throw QuorumController.this.fatalFaultHandler.handleFault(String.format("Unable to apply %s record, which was %d of %d record(s) in the batch following last write offset %d.", apiMessageAndVersion.message().getClass().getSimpleName(), Integer.valueOf(i2), Integer.valueOf(list.size()), Long.valueOf(this.prevEndOffset)), th);
                            }
                        }
                        this.prevEndOffset = QuorumController.this.raftClient.scheduleAtomicAppend(r6, list);
                        QuorumController.this.snapshotRegistry.getOrCreateSnapshot(this.prevEndOffset);
                        return Long.valueOf(this.prevEndOffset);
                    }
                });
                this.op.processBatchEndOffset(appendRecords);
                QuorumController.this.updateWriteOffset(appendRecords);
                this.resultAndOffset = ControllerResultAndOffset.of(appendRecords, generateRecordsAndResult);
                QuorumController.this.log.debug("Read-write operation {} will be completed when the log reaches offset {}.", this, Long.valueOf(this.resultAndOffset.offset()));
            }
            QuorumController.this.maybeScheduleNextBalancePartitionLeaders();
            if (this.future.isDone()) {
                return;
            }
            QuorumController.this.deferredEventQueue.add(this.resultAndOffset.offset(), this);
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void handleException(Throwable th) {
            complete(th);
        }

        @Override // org.apache.kafka.deferred.DeferredEvent
        public void complete(Throwable th) {
            if (th != null) {
                this.future.completeExceptionally(QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, th));
            } else {
                QuorumController.this.handleEventEnd(toString(), this.startProcessingTimeNs.getAsLong());
                this.future.complete(this.resultAndOffset.response());
            }
        }

        public String toString() {
            return this.name + "(" + System.identityHashCode(this) + ")";
        }
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ControllerWriteOperation.class */
    public interface ControllerWriteOperation<T> {
        ControllerResult<T> generateRecordsAndResult() throws Exception;

        default void processBatchEndOffset(long j) {
        }
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ImbalanceSchedule.class */
    public enum ImbalanceSchedule {
        SCHEDULED,
        DEFERRED,
        RETRY_AFTER_BACKOFF
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$MigrationRecordConsumer.class */
    public class MigrationRecordConsumer implements ZkRecordConsumer {
        private volatile OffsetAndEpoch highestMigrationRecordOffset;

        /* loaded from: input_file:org/apache/kafka/controller/QuorumController$MigrationRecordConsumer$MigrationWriteOperation.class */
        class MigrationWriteOperation implements ControllerWriteOperation<Void> {
            private final List<ApiMessageAndVersion> batch;

            MigrationWriteOperation(List<ApiMessageAndVersion> list) {
                this.batch = list;
            }

            @Override // org.apache.kafka.controller.QuorumController.ControllerWriteOperation
            public ControllerResult<Void> generateRecordsAndResult() {
                return ControllerResult.atomicOf(this.batch, null);
            }

            @Override // org.apache.kafka.controller.QuorumController.ControllerWriteOperation
            public void processBatchEndOffset(long j) {
                MigrationRecordConsumer.this.highestMigrationRecordOffset = new OffsetAndEpoch(j, QuorumController.this.curClaimEpoch);
            }
        }

        MigrationRecordConsumer() {
        }

        @Override // org.apache.kafka.metadata.migration.ZkRecordConsumer
        public void beginMigration() {
            QuorumController.this.log.info("Starting ZK Migration");
        }

        @Override // org.apache.kafka.metadata.migration.ZkRecordConsumer
        public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> list) {
            if (QuorumController.this.queue.size() > 100) {
                CompletableFuture<?> completableFuture = new CompletableFuture<>();
                completableFuture.completeExceptionally(new NotControllerException("Cannot accept migration record batch. Controller queue is too large"));
                return completableFuture;
            }
            ControllerWriteEvent controllerWriteEvent = new ControllerWriteEvent("ZK Migration Batch", new MigrationWriteOperation(list), EnumSet.of(ControllerOperationFlag.RUNS_IN_PREMIGRATION));
            QuorumController.this.queue.append(controllerWriteEvent);
            return controllerWriteEvent.future;
        }

        @Override // org.apache.kafka.metadata.migration.ZkRecordConsumer
        public CompletableFuture<OffsetAndEpoch> completeMigration() {
            QuorumController.this.log.info("Completing ZK Migration");
            ControllerWriteEvent controllerWriteEvent = new ControllerWriteEvent("Complete ZK Migration", new MigrationWriteOperation(Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())), EnumSet.of(ControllerOperationFlag.RUNS_IN_PREMIGRATION));
            QuorumController.this.queue.append(controllerWriteEvent);
            return controllerWriteEvent.future.thenApply(r3 -> {
                return this.highestMigrationRecordOffset;
            });
        }

        @Override // org.apache.kafka.metadata.migration.ZkRecordConsumer
        public void abortMigration() {
            QuorumController.this.fatalFaultHandler.handleFault("Aborting the ZK migration");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$QuorumMetaLogListener.class */
    public class QuorumMetaLogListener implements RaftClient.Listener<ApiMessageAndVersion> {
        QuorumMetaLogListener() {
        }

        @Override // org.apache.kafka.raft.RaftClient.Listener
        public void handleCommit(BatchReader<ApiMessageAndVersion> batchReader) {
            appendRaftEvent("handleCommit[baseOffset=" + batchReader.baseOffset() + SelectorUtils.PATTERN_HANDLER_SUFFIX, () -> {
                RuntimeException handleFault;
                try {
                    QuorumController.this.maybeCompleteAuthorizerInitialLoad();
                    boolean isActiveController = QuorumController.this.isActiveController();
                    while (batchReader.hasNext()) {
                        Batch next = batchReader.next();
                        long lastOffset = next.lastOffset();
                        int epoch = next.epoch();
                        List<ApiMessageAndVersion> records = next.records();
                        if (isActiveController) {
                            QuorumController.this.log.debug("Completing purgatory items up to offset {} and epoch {}.", Long.valueOf(lastOffset), Integer.valueOf(epoch));
                            QuorumController.this.deferredEventQueue.completeUpTo(lastOffset);
                            QuorumController.this.snapshotRegistry.deleteSnapshotsUpTo(lastOffset);
                        } else {
                            if (QuorumController.this.log.isDebugEnabled()) {
                                QuorumController.this.log.debug("Replaying commits from the active node up to offset {} and epoch {}.", Long.valueOf(lastOffset), Integer.valueOf(epoch));
                            }
                            int i = 1;
                            Iterator<ApiMessageAndVersion> it = records.iterator();
                            while (it.hasNext()) {
                                try {
                                    QuorumController.this.replay(it.next().message(), Optional.empty(), lastOffset);
                                    i++;
                                } finally {
                                }
                            }
                        }
                        QuorumController.this.controllerMetricsManager.replayBatch(next.baseOffset(), records);
                        QuorumController.this.updateLastCommittedState(lastOffset, epoch, next.appendTimestamp());
                        if (lastOffset >= ((Long) QuorumController.this.raftClient.latestSnapshotId().map((v0) -> {
                            return v0.offset();
                        }).orElse(0L)).longValue()) {
                            QuorumController.access$3902(QuorumController.this, Math.min(QuorumController.this.oldestNonSnapshottedTimestamp, next.appendTimestamp()));
                        }
                    }
                } finally {
                    batchReader.close();
                }
            });
        }

        @Override // org.apache.kafka.raft.RaftClient.Listener
        public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> snapshotReader) {
            appendRaftEvent(String.format("handleLoadSnapshot[snapshotId=%s]", snapshotReader.snapshotId()), () -> {
                try {
                    String filenameFromSnapshotId = Snapshots.filenameFromSnapshotId(snapshotReader.snapshotId());
                    if (QuorumController.this.isActiveController()) {
                        throw QuorumController.this.fatalFaultHandler.handleFault("Asked to load snapshot " + filenameFromSnapshotId + ", but we are the active controller at epoch " + QuorumController.this.curClaimEpoch);
                    }
                    QuorumController.this.log.info("Starting to replay snapshot {}, from last commit offset {} and epoch {}", filenameFromSnapshotId, Long.valueOf(QuorumController.this.lastCommittedOffset), Integer.valueOf(QuorumController.this.lastCommittedEpoch));
                    QuorumController.this.resetToEmptyState();
                    while (snapshotReader.hasNext()) {
                        Batch next = snapshotReader.next();
                        long lastOffset = next.lastOffset();
                        List<ApiMessageAndVersion> records = next.records();
                        QuorumController.this.log.debug("Replaying snapshot {} batch with last offset of {}", filenameFromSnapshotId, Long.valueOf(lastOffset));
                        int i = 1;
                        for (ApiMessageAndVersion apiMessageAndVersion : records) {
                            try {
                                QuorumController.this.replay(apiMessageAndVersion.message(), Optional.of(snapshotReader.snapshotId()), snapshotReader.lastContainedLogOffset());
                                QuorumController.this.controllerMetricsManager.replay(apiMessageAndVersion.message());
                                i++;
                            } catch (Throwable th) {
                                throw QuorumController.this.fatalFaultHandler.handleFault(String.format("Unable to apply %s record from snapshot %s on standby controller, which was %d of %d record(s) in the batch with baseOffset %d.", apiMessageAndVersion.message().getClass().getSimpleName(), snapshotReader.snapshotId(), Integer.valueOf(i), Integer.valueOf(records.size()), Long.valueOf(next.baseOffset())), th);
                            }
                        }
                    }
                    QuorumController.this.log.info("Finished replaying snapshot {}", filenameFromSnapshotId);
                    QuorumController.this.updateLastCommittedState(snapshotReader.lastContainedLogOffset(), snapshotReader.lastContainedLogEpoch(), snapshotReader.lastContainedLogTimestamp());
                    QuorumController.this.snapshotRegistry.getOrCreateSnapshot(QuorumController.this.lastCommittedOffset);
                    QuorumController.this.authorizer.ifPresent(clusterMetadataAuthorizer -> {
                        clusterMetadataAuthorizer.loadSnapshot(QuorumController.this.aclControlManager.idToAcl());
                    });
                    snapshotReader.close();
                } catch (Throwable th2) {
                    snapshotReader.close();
                    throw th2;
                }
            });
        }

        @Override // org.apache.kafka.raft.RaftClient.Listener
        public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            appendRaftEvent("handleLeaderChange[" + leaderAndEpoch.epoch() + SelectorUtils.PATTERN_HANDLER_SUFFIX, () -> {
                String valueOf = leaderAndEpoch.leaderId().isPresent() ? String.valueOf(leaderAndEpoch.leaderId().getAsInt()) : "(none)";
                if (!QuorumController.this.isActiveController()) {
                    if (!leaderAndEpoch.isLeader(QuorumController.this.nodeId)) {
                        QuorumController.this.log.info("In the new epoch {}, the leader is {}.", Integer.valueOf(leaderAndEpoch.epoch()), valueOf);
                        return;
                    } else {
                        QuorumController.this.log.info("Becoming the active controller at epoch {}, committed offset {}, committed epoch {}", Integer.valueOf(leaderAndEpoch.epoch()), Long.valueOf(QuorumController.this.lastCommittedOffset), Integer.valueOf(QuorumController.this.lastCommittedEpoch));
                        QuorumController.this.claim(leaderAndEpoch.epoch());
                        return;
                    }
                }
                if (leaderAndEpoch.isLeader(QuorumController.this.nodeId)) {
                    QuorumController.this.log.warn("We were the leader in epoch {}, and are still the leader in the new epoch {}.", Integer.valueOf(QuorumController.this.curClaimEpoch), Integer.valueOf(leaderAndEpoch.epoch()));
                    QuorumController.this.curClaimEpoch = leaderAndEpoch.epoch();
                } else {
                    QuorumController.this.log.warn("Renouncing the leadership due to a metadata log event. We were the leader at epoch {}, but in the new epoch {}, the leader is {}. Reverting to last committed offset {}.", Integer.valueOf(QuorumController.this.curClaimEpoch), Integer.valueOf(leaderAndEpoch.epoch()), valueOf, Long.valueOf(QuorumController.this.lastCommittedOffset));
                    QuorumController.this.renounce();
                }
            });
        }

        @Override // org.apache.kafka.raft.RaftClient.Listener
        public void beginShutdown() {
            QuorumController.this.queue.beginShutdown("MetaLogManager.Listener");
        }

        private void appendRaftEvent(String str, Runnable runnable) {
            QuorumController.this.appendControlEvent(str, () -> {
                if (this != QuorumController.this.metaLogListener) {
                    QuorumController.this.log.debug("Ignoring {} raft event from an old registration", str);
                    return;
                }
                try {
                    runnable.run();
                } finally {
                    QuorumController.this.maybeCompleteAuthorizerInitialLoad();
                }
            });
        }
    }

    public OptionalInt latestController() {
        return this.raftClient.leaderAndEpoch().leaderId();
    }

    private long currentReadOffset() {
        if (isActiveController()) {
            return this.lastCommittedOffset;
        }
        return Long.MAX_VALUE;
    }

    public void handleEventEnd(String str, long j) {
        long nanoseconds = this.time.nanoseconds() - j;
        this.log.debug("Processed {} in {} us", str, Long.valueOf(TimeUnit.MICROSECONDS.convert(nanoseconds, TimeUnit.NANOSECONDS)));
        this.controllerMetrics.updateEventQueueProcessingTime(TimeUnit.NANOSECONDS.toMillis(nanoseconds));
    }

    public Throwable handleEventException(String str, OptionalLong optionalLong, Throwable th) {
        Throwable externalException = ControllerExceptions.toExternalException(th, () -> {
            return latestController();
        });
        if (!optionalLong.isPresent()) {
            this.log.error("{}: unable to start processing because of {}. Reason: {}", str, th.getClass().getSimpleName(), th.getMessage());
            return externalException;
        }
        long convert = TimeUnit.MICROSECONDS.convert(this.time.nanoseconds() - optionalLong.getAsLong(), TimeUnit.NANOSECONDS);
        if (ControllerExceptions.isExpected(th)) {
            this.log.info("{}: failed with {} in {} us. Reason: {}", str, th.getClass().getSimpleName(), Long.valueOf(convert), th.getMessage());
            return externalException;
        }
        if (isActiveController()) {
            this.nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server exception %s at epoch %d in %d us. Renouncing leadership and reverting to the last committed offset %d.", str, th.getClass().getSimpleName(), Integer.valueOf(this.curClaimEpoch), Long.valueOf(convert), Long.valueOf(this.lastCommittedOffset)), th);
            renounce();
        } else {
            this.nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server exception %s in %d us. The controller is already in standby mode.", str, th.getClass().getSimpleName(), Long.valueOf(convert)), th);
        }
        return externalException;
    }

    public void appendControlEvent(String str, Runnable runnable) {
        this.queue.append(new ControllerEvent(str, runnable));
    }

    ReplicationControlManager replicationControl() {
        return this.replicationControl;
    }

    ClusterControlManager clusterControl() {
        return this.clusterControl;
    }

    FeatureControlManager featureControl() {
        return this.featureControl;
    }

    ConfigurationControlManager configurationControl() {
        return this.configurationControl;
    }

    public ZkRecordConsumer zkRecordConsumer() {
        return this.zkRecordConsumer;
    }

    public ClusterLinkControlManager clusterLinkControlManager() {
        return this.clusterLinkControl;
    }

    <T> CompletableFuture<T> appendReadEvent(String str, OptionalLong optionalLong, Supplier<T> supplier) {
        ControllerReadEvent controllerReadEvent = new ControllerReadEvent(str, supplier);
        if (optionalLong.isPresent()) {
            this.queue.appendWithDeadline(optionalLong.getAsLong(), controllerReadEvent);
        } else {
            this.queue.append(controllerReadEvent);
        }
        return controllerReadEvent.future();
    }

    static long appendRecords(Logger logger, ControllerResult<?> controllerResult, int i, Function<List<ApiMessageAndVersion>, Long> function) {
        try {
            List<ApiMessageAndVersion> records = controllerResult.records();
            if (controllerResult.isAtomic()) {
                if (records.size() > i) {
                    throw new IllegalStateException("Attempted to atomically commit " + records.size() + " records, but maxRecordsPerBatch is " + i);
                }
                long longValue = function.apply(records).longValue();
                if (logger.isTraceEnabled()) {
                    logger.trace("Atomically appended {} record(s) ending with offset {}.", Integer.valueOf(records.size()), Long.valueOf(longValue));
                }
                return longValue;
            }
            int i2 = 0;
            int i3 = 0;
            while (true) {
                i3++;
                int i4 = i2 + i;
                if (i4 > records.size()) {
                    break;
                }
                function.apply(records.subList(i2, i4));
                i2 += i;
            }
            long longValue2 = function.apply(records.subList(i2, records.size())).longValue();
            if (logger.isTraceEnabled()) {
                logger.trace("Appended {} record(s) in {} batch(es), ending with offset {}.", Integer.valueOf(records.size()), Integer.valueOf(i3), Long.valueOf(longValue2));
            }
            return longValue2;
        } catch (ApiException e) {
            throw new RuntimeException(e);
        }
    }

    <T> CompletableFuture<T> appendWriteEvent(String str, OptionalLong optionalLong, ControllerWriteOperation<T> controllerWriteOperation) {
        return appendWriteEvent(str, optionalLong, controllerWriteOperation, EnumSet.noneOf(ControllerOperationFlag.class));
    }

    <T> CompletableFuture<T> appendWriteEvent(String str, OptionalLong optionalLong, ControllerWriteOperation<T> controllerWriteOperation, EnumSet<ControllerOperationFlag> enumSet) {
        ControllerWriteEvent controllerWriteEvent = new ControllerWriteEvent(str, controllerWriteOperation, enumSet);
        if (optionalLong.isPresent()) {
            this.queue.appendWithDeadline(optionalLong.getAsLong(), controllerWriteEvent);
        } else {
            this.queue.append(controllerWriteEvent);
        }
        return controllerWriteEvent.future();
    }

    public void maybeCompleteAuthorizerInitialLoad() {
        if (this.needToCompleteAuthorizerLoad) {
            OptionalLong highWatermark = this.raftClient.highWatermark();
            if (!highWatermark.isPresent()) {
                this.log.trace("maybeCompleteAuthorizerInitialLoad: highWatermark not set.");
            } else {
                if (this.lastCommittedOffset + 1 < highWatermark.getAsLong()) {
                    this.log.trace("maybeCompleteAuthorizerInitialLoad: can't proceed because lastCommittedOffset  = {}, but highWatermark = {}.", Long.valueOf(this.lastCommittedOffset), Long.valueOf(highWatermark.getAsLong()));
                    return;
                }
                this.log.info("maybeCompleteAuthorizerInitialLoad: completing authorizer initial load at last committed offset {}.", Long.valueOf(this.lastCommittedOffset));
                this.authorizer.get().completeInitialLoad();
                this.needToCompleteAuthorizerLoad = false;
            }
        }
    }

    public boolean isActiveController() {
        return isActiveController(this.curClaimEpoch);
    }

    public static boolean isActiveController(int i) {
        return i != -1;
    }

    public void updateWriteOffset(long j) {
        this.writeOffset = j;
        if (isActiveController()) {
            this.controllerMetrics.setLastAppliedRecordOffset(this.writeOffset);
            this.controllerMetrics.setLastAppliedRecordTimestamp(this.time.milliseconds());
        } else {
            this.controllerMetrics.setLastAppliedRecordOffset(this.lastCommittedOffset);
            this.controllerMetrics.setLastAppliedRecordTimestamp(this.lastCommittedTimestamp);
        }
    }

    public void claim(int i) {
        try {
            long milliseconds = this.time.milliseconds();
            if (this.curClaimEpoch != -1) {
                throw new RuntimeException("Cannot claim leadership because we are already the active controller.");
            }
            this.curClaimEpoch = i;
            this.controllerMetrics.setActive(true);
            updateWriteOffset(this.lastCommittedOffset);
            this.clusterControl.activate();
            this.snapshotRegistry.getOrCreateSnapshot(this.lastCommittedOffset);
            this.queue.prepend(new ControllerWriteEvent("completeActivation[" + i + SelectorUtils.PATTERN_HANDLER_SUFFIX, new CompleteActivationEvent(), EnumSet.of(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME, ControllerOperationFlag.RUNS_IN_PREMIGRATION)));
            KafkaEventQueue kafkaEventQueue = this.queue;
            ReplicationControlManager replicationControlManager = this.replicationControl;
            replicationControlManager.getClass();
            kafkaEventQueue.append(new ControllerWriteEvent(LEADER_ELECTION_FOR_DEMOTED_BROKERS, replicationControlManager::tryUnelectDemotedLeaders, EnumSet.noneOf(ControllerOperationFlag.class)));
            appendControlEvent("updateControllerLoadTime", () -> {
                this.controllerMetrics.recordControllerLoadTime(milliseconds, this.time.milliseconds());
            });
        } catch (Throwable th) {
            this.fatalFaultHandler.handleFault("exception while claiming leadership", th);
        }
    }

    public static List<ApiMessageAndVersion> generateActivationRecords(Logger logger, boolean z, boolean z2, BootstrapMetadata bootstrapMetadata, FeatureControlManager featureControlManager) {
        ArrayList arrayList = new ArrayList();
        if (z) {
            logger.info("The metadata log appears to be empty. Appending {} bootstrap record(s) at metadata.version {} from {}.", Integer.valueOf(bootstrapMetadata.records().size()), bootstrapMetadata.metadataVersion(), bootstrapMetadata.source());
            arrayList.addAll(bootstrapMetadata.records());
            if (bootstrapMetadata.metadataVersion().isMigrationSupported()) {
                if (z2) {
                    logger.info("Putting the controller into pre-migration mode. No metadata updates will be allowed until the ZK metadata has been migrated");
                    arrayList.add(ZkMigrationState.PRE_MIGRATION.toRecord());
                } else {
                    logger.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster.");
                    arrayList.add(ZkMigrationState.NONE.toRecord());
                }
            } else if (z2) {
                throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + " does not support ZK migrations. Cannot continue with ZK migrations enabled.");
            }
        } else {
            if (featureControlManager.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) {
                logger.info("No metadata.version feature level record was found in the log. Treating the log as version {}.", MetadataVersion.MINIMUM_KRAFT_VERSION);
            }
            if (featureControlManager.metadataVersion().isMigrationSupported()) {
                logger.info("Loaded ZK migration state of {}", featureControlManager.zkMigrationState());
                switch (featureControlManager.zkMigrationState()) {
                    case NONE:
                        if (z2) {
                            throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was created in KRaft mode.");
                        }
                        break;
                    case PRE_MIGRATION:
                        logger.warn("Activating pre-migration controller without empty log. There may be a partial migration");
                        break;
                    case MIGRATION:
                        if (!z2) {
                            logger.warn("Completing the ZK migration since this controller was configured with 'zookeeper.metadata.migration.enable' set to 'false'.");
                            arrayList.add(ZkMigrationState.POST_MIGRATION.toRecord());
                            break;
                        } else {
                            logger.info("Staying in the ZK migration since 'zookeeper.metadata.migration.enable' is still 'true'.");
                            break;
                        }
                    case POST_MIGRATION:
                        if (z2) {
                            logger.info("Ignoring 'zookeeper.metadata.migration.enable' value of 'true' since the ZK migrationhas been completed.");
                            break;
                        }
                        break;
                }
            } else if (z2) {
                throw new RuntimeException("Should not have ZK migrations enabled on a cluster running metadata.version " + featureControlManager.metadataVersion());
            }
        }
        return arrayList;
    }

    public void updateLastCommittedState(long j, int i, long j2) {
        this.lastCommittedOffset = j;
        this.lastCommittedEpoch = i;
        this.lastCommittedTimestamp = j2;
        this.controllerMetrics.setLastCommittedRecordOffset(j);
        if (isActiveController()) {
            return;
        }
        this.controllerMetrics.setLastAppliedRecordOffset(j);
        this.controllerMetrics.setLastAppliedRecordTimestamp(j2);
    }

    public void renounce() {
        try {
            if (this.curClaimEpoch == -1) {
                throw new RuntimeException("Cannot renounce leadership because we are not the current leader.");
            }
            this.raftClient.resign(this.curClaimEpoch);
            this.curClaimEpoch = -1;
            this.controllerMetrics.setActive(false);
            this.deferredEventQueue.failAll(ControllerExceptions.newWrongControllerException(OptionalInt.empty()));
            if (!this.snapshotRegistry.hasSnapshot(this.lastCommittedOffset)) {
                throw new RuntimeException("Unable to find last committed offset " + this.lastCommittedEpoch + " in snapshot registry.");
            }
            this.snapshotRegistry.revertToSnapshot(this.lastCommittedOffset);
            this.authorizer.ifPresent(clusterMetadataAuthorizer -> {
                clusterMetadataAuthorizer.loadSnapshot(this.aclControlManager.idToAcl());
            });
            this.replicationControl.resetConfluentPartitionsPerTopicListener();
            updateWriteOffset(-1L);
            this.clusterControl.deactivate();
            cancelMaybeFenceReplicas();
            cancelMaybeBalancePartitionLeaders();
            cancelNextWriteNoOpRecord();
        } catch (Throwable th) {
            this.fatalFaultHandler.handleFault("exception while renouncing leadership", th);
        }
    }

    private <T> void scheduleDeferredWriteEvent(String str, long j, ControllerWriteOperation<T> controllerWriteOperation, EnumSet<ControllerOperationFlag> enumSet) {
        if (!enumSet.contains(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME)) {
            throw new RuntimeException("deferred events should not update the queue time.");
        }
        ControllerWriteEvent controllerWriteEvent = new ControllerWriteEvent(str, controllerWriteOperation, enumSet);
        this.queue.scheduleDeferred(str, new EventQueue.EarliestDeadlineFunction(j), controllerWriteEvent);
        controllerWriteEvent.future.exceptionally((Function) th -> {
            if (ControllerExceptions.isTimeoutException(th)) {
                this.log.error("Cancelling deferred write event {} because the event queue is now closed.", str);
                return null;
            }
            if (th instanceof NotControllerException) {
                this.log.debug("Cancelling deferred write event {} because this controller is no longer active.", str);
                return null;
            }
            this.log.error("Unexpected exception while executing deferred write event {}. Rescheduling for a minute from now.", str, th);
            scheduleDeferredWriteEvent(str, j + TimeUnit.NANOSECONDS.convert(1L, TimeUnit.MINUTES), controllerWriteOperation, enumSet);
            return null;
        });
    }

    public void rescheduleMaybeFenceStaleBrokers() {
        long nextCheckTimeNs = this.clusterControl.heartbeatManager().nextCheckTimeNs();
        if (nextCheckTimeNs == Long.MAX_VALUE) {
            cancelMaybeFenceReplicas();
        } else {
            scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, () -> {
                ControllerResult<Void> maybeFenceOneStaleBroker = this.replicationControl.maybeFenceOneStaleBroker();
                rescheduleMaybeFenceStaleBrokers();
                return maybeFenceOneStaleBroker;
            }, EnumSet.of(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME, ControllerOperationFlag.RUNS_IN_PREMIGRATION));
        }
    }

    private void cancelMaybeFenceReplicas() {
        this.queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
    }

    public void maybeScheduleNextBalancePartitionLeaders() {
        if (this.imbalancedScheduled != ImbalanceSchedule.SCHEDULED && this.leaderImbalanceCheckIntervalNs.isPresent() && this.replicationControl.arePartitionLeadersImbalanced()) {
            this.log.debug("Scheduling write event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})", MAYBE_BALANCE_PARTITION_LEADERS, this.imbalancedScheduled, this.leaderImbalanceCheckIntervalNs, Boolean.valueOf(this.replicationControl.arePartitionLeadersImbalanced()));
            ControllerWriteEvent controllerWriteEvent = new ControllerWriteEvent(MAYBE_BALANCE_PARTITION_LEADERS, () -> {
                ControllerResult<Boolean> maybeBalancePartitionLeaders = this.replicationControl.maybeBalancePartitionLeaders();
                if (maybeBalancePartitionLeaders.response().booleanValue()) {
                    this.imbalancedScheduled = ImbalanceSchedule.RETRY_AFTER_BACKOFF;
                } else {
                    this.imbalancedScheduled = ImbalanceSchedule.DEFERRED;
                }
                return maybeBalancePartitionLeaders;
            }, EnumSet.of(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME));
            long nanoseconds = this.time.nanoseconds();
            this.queue.scheduleDeferred(MAYBE_BALANCE_PARTITION_LEADERS, new EventQueue.EarliestDeadlineFunction(this.imbalancedScheduled == ImbalanceSchedule.DEFERRED ? nanoseconds + this.leaderImbalanceCheckIntervalNs.getAsLong() : nanoseconds + TimeUnit.NANOSECONDS.convert(100L, TimeUnit.MILLISECONDS)), controllerWriteEvent);
            this.imbalancedScheduled = ImbalanceSchedule.SCHEDULED;
        }
    }

    private void cancelMaybeBalancePartitionLeaders() {
        this.imbalancedScheduled = ImbalanceSchedule.DEFERRED;
        this.queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS);
    }

    public void maybeScheduleNextWriteNoOpRecord() {
        if (!this.noOpRecordScheduled && this.maxIdleIntervalNs.isPresent() && this.featureControl.metadataVersion().isNoOpRecordSupported()) {
            this.log.debug("Scheduling write event for {} because maxIdleIntervalNs ({}) and metadataVersion ({})", WRITE_NO_OP_RECORD, Long.valueOf(this.maxIdleIntervalNs.getAsLong()), this.featureControl.metadataVersion());
            this.queue.scheduleDeferred(WRITE_NO_OP_RECORD, new EventQueue.EarliestDeadlineFunction(this.time.nanoseconds() + this.maxIdleIntervalNs.getAsLong()), new ControllerWriteEvent(WRITE_NO_OP_RECORD, () -> {
                this.noOpRecordScheduled = false;
                maybeScheduleNextWriteNoOpRecord();
                return ControllerResult.of(Arrays.asList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)), null);
            }, EnumSet.of(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME, ControllerOperationFlag.RUNS_IN_PREMIGRATION)));
            this.noOpRecordScheduled = true;
        }
    }

    private void cancelNextWriteNoOpRecord() {
        this.noOpRecordScheduled = false;
        this.queue.cancelDeferred(WRITE_NO_OP_RECORD);
    }

    private void handleFeatureControlChange() {
        if (isActiveController()) {
            if (this.featureControl.metadataVersion().isNoOpRecordSupported()) {
                maybeScheduleNextWriteNoOpRecord();
            } else {
                cancelNextWriteNoOpRecord();
            }
        }
    }

    public void replay(ApiMessage apiMessage, Optional<OffsetAndEpoch> optional, long j) {
        if (this.log.isTraceEnabled()) {
            if (optional.isPresent()) {
                this.log.trace("Replaying snapshot {} record {}", Snapshots.filenameFromSnapshotId(optional.get()), this.recordRedactor.toLoggableString(apiMessage));
            } else {
                this.log.trace("Replaying log record {} with batchLastOffset {}", this.recordRedactor.toLoggableString(apiMessage), Long.valueOf(j));
            }
        }
        this.logReplayTracker.replay(apiMessage);
        MetadataRecordType fromId = MetadataRecordType.fromId(apiMessage.apiKey());
        if (fromId == MetadataRecordType.ENCRYPTED_ENVELOPE_RECORD) {
            apiMessage = this.encryptionControl.decrypt((EncryptedEnvelopeRecord) apiMessage).message();
            fromId = MetadataRecordType.fromId(apiMessage.apiKey());
        }
        switch (fromId) {
            case REGISTER_BROKER_RECORD:
                this.clusterControl.replay((RegisterBrokerRecord) apiMessage, j);
                return;
            case UNREGISTER_BROKER_RECORD:
                this.clusterControl.replay((UnregisterBrokerRecord) apiMessage);
                return;
            case TOPIC_RECORD:
                this.replicationControl.replay((TopicRecord) apiMessage);
                return;
            case PARTITION_RECORD:
                this.replicationControl.replay((PartitionRecord) apiMessage);
                return;
            case CONFIG_RECORD:
                this.configurationControl.replay((ConfigRecord) apiMessage);
                return;
            case PARTITION_CHANGE_RECORD:
                this.replicationControl.replay((PartitionChangeRecord) apiMessage);
                return;
            case FENCE_BROKER_RECORD:
                this.clusterControl.replay((FenceBrokerRecord) apiMessage);
                return;
            case UNFENCE_BROKER_RECORD:
                this.clusterControl.replay((UnfenceBrokerRecord) apiMessage);
                return;
            case REMOVE_TOPIC_RECORD:
                this.replicationControl.replay((RemoveTopicRecord) apiMessage);
                return;
            case FEATURE_LEVEL_RECORD:
                this.featureControl.replay((FeatureLevelRecord) apiMessage);
                handleFeatureControlChange();
                return;
            case CLIENT_QUOTA_RECORD:
                this.clientQuotaControlManager.replay((ClientQuotaRecord) apiMessage);
                return;
            case PRODUCER_IDS_RECORD:
                this.producerIdControlManager.replay((ProducerIdsRecord) apiMessage);
                return;
            case BROKER_REGISTRATION_CHANGE_RECORD:
                this.clusterControl.replay((BrokerRegistrationChangeRecord) apiMessage);
                return;
            case ACCESS_CONTROL_ENTRY_RECORD:
                this.aclControlManager.replay((AccessControlEntryRecord) apiMessage, optional);
                return;
            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
                this.aclControlManager.replay((RemoveAccessControlEntryRecord) apiMessage, optional);
                return;
            case USER_SCRAM_CREDENTIAL_RECORD:
                this.scramControlManager.replay((UserScramCredentialRecord) apiMessage);
                return;
            case REMOVE_USER_SCRAM_CREDENTIAL_RECORD:
                this.scramControlManager.replay((RemoveUserScramCredentialRecord) apiMessage);
                return;
            case NO_OP_RECORD:
                return;
            case ZK_MIGRATION_STATE_RECORD:
                this.featureControl.replay((ZkMigrationStateRecord) apiMessage);
                return;
            case BROKER_REPLICA_EXCLUSION_RECORD:
                this.clusterControl.replay((BrokerReplicaExclusionRecord) apiMessage);
                return;
            case ENCRYPTED_ENVELOPE_RECORD:
                throw new RuntimeException("Nested encrypted envelope records are not supported.");
            case INSTALL_METADATA_ENCRYPTOR_RECORD:
                this.encryptionControl.replay((InstallMetadataEncryptorRecord) apiMessage);
                return;
            case CLUSTER_LINK_RECORD:
                this.clusterLinkControl.replay((ClusterLinkRecord) apiMessage);
                return;
            case REMOVE_CLUSTER_LINK_RECORD:
                this.clusterLinkControl.replay((RemoveClusterLinkRecord) apiMessage, Optional.empty());
                return;
            case MIRROR_TOPIC_RECORD:
                this.mirrorTopicControl.replay((MirrorTopicRecord) apiMessage);
                return;
            case MIRROR_TOPIC_CHANGE_RECORD:
                this.mirrorTopicControl.replay((MirrorTopicChangeRecord) apiMessage);
                return;
            case CELL_RECORD:
                this.cellControl.replay((CellRecord) apiMessage);
                return;
            case REMOVE_CELL_RECORD:
                this.cellControl.replay((RemoveCellRecord) apiMessage);
                return;
            case TENANT_RECORD:
                this.tenantControl.replay((TenantRecord) apiMessage);
                return;
            case REMOVE_TENANT_RECORD:
                this.tenantControl.replay((RemoveTenantRecord) apiMessage);
                return;
            default:
                throw new RuntimeException("Unhandled record type " + fromId);
        }
    }

    public void resetToEmptyState() {
        this.snapshotRegistry.reset();
        this.controllerMetricsManager.reset();
        this.replicationControl.resetConfluentPartitionsPerTopicListener();
        updateLastCommittedState(-1L, -1, -1L);
    }

    private QuorumController(FaultHandler faultHandler, FaultHandler faultHandler2, LogContext logContext, int i, String str, KafkaEventQueue kafkaEventQueue, Time time, KafkaConfigSchema kafkaConfigSchema, RaftClient<ApiMessageAndVersion> raftClient, QuorumFeatures quorumFeatures, short s, int i2, ReplicaPlacer replicaPlacer, OptionalLong optionalLong, OptionalLong optionalLong2, long j, ControllerMetrics controllerMetrics, Optional<CreateTopicPolicy> optional, Optional<AlterConfigPolicy> optional2, ConfigurationValidator configurationValidator, Optional<ClusterMetadataAuthorizer> optional3, Map<String, Object> map, BootstrapMetadata bootstrapMetadata, Function<String, String> function, int i3, int i4, boolean z, Optional<CreateClusterLinkPolicy> optional4, CellAssignor cellAssignor, PartitionPlacementStrategy partitionPlacementStrategy, boolean z2, Optional<TopicPlacement> optional5, CellMetrics cellMetrics) {
        this.lastCommittedOffset = -1L;
        this.lastCommittedEpoch = -1;
        this.lastCommittedTimestamp = -1L;
        this.oldestNonSnapshottedTimestamp = Long.MAX_VALUE;
        this.imbalancedScheduled = ImbalanceSchedule.DEFERRED;
        this.noOpRecordScheduled = false;
        this.nonFatalFaultHandler = faultHandler;
        this.fatalFaultHandler = faultHandler2;
        this.log = logContext.logger(QuorumController.class);
        this.nodeId = i;
        this.clusterId = str;
        this.queue = kafkaEventQueue;
        this.time = time;
        this.controllerMetrics = controllerMetrics;
        this.cellMetrics = cellMetrics;
        this.controllerMetricsManager = new ControllerMetricsManager(controllerMetrics, cellMetrics, Optional.ofNullable(function), i3, s);
        this.snapshotRegistry = new SnapshotRegistry(logContext);
        this.deferredEventQueue = new DeferredEventQueue(logContext);
        this.resourceExists = new ConfigResourceExistenceChecker();
        this.encryptionControl = new EncryptionControlManager(logContext, this.snapshotRegistry, map);
        this.clientQuotaControlManager = new ClientQuotaControlManager(this.snapshotRegistry);
        this.featureControl = new FeatureControlManager.Builder().setLogContext(logContext).setQuorumFeatures(quorumFeatures).setSnapshotRegistry(this.snapshotRegistry).setMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION).build();
        this.cellControl = new CellControlManager(logContext, this.snapshotRegistry, this.featureControl, cellAssignor, Short.parseShort(map.getOrDefault(ConfluentConfigs.CELL_SIZE_CONFIG, (short) 15).toString()), Short.parseShort(map.getOrDefault(ConfluentConfigs.MIN_CELL_SIZE_CONFIG, (short) 6).toString()), Short.parseShort(map.getOrDefault(ConfluentConfigs.MAX_CELL_SIZE_CONFIG, (short) 15).toString()), s, z2, Boolean.parseBoolean(map.getOrDefault(ConfluentConfigs.ENABLE_CELLS_CONFIG, false).toString()));
        this.tenantControl = new TenantControlManager(logContext, this.featureControl, this.cellControl, partitionPlacementStrategy, s);
        this.clusterControl = new ClusterControlManager.Builder().setLogContext(logContext).setClusterId(str).setTime(time).setSnapshotRegistry(this.snapshotRegistry).setSessionTimeoutNs(j).setReplicaPlacer(replicaPlacer).setControllerMetrics(controllerMetrics).setFeatureControlManager(this.featureControl).setZkMigrationEnabled(z).setCellControlManager(this.cellControl).build();
        this.configurationControl = new ConfigurationControlManager.Builder().setLogContext(logContext).setSnapshotRegistry(this.snapshotRegistry).setKafkaConfigSchema(kafkaConfigSchema).setExistenceChecker(this.resourceExists).setAlterConfigPolicy(optional2).setValidator(configurationValidator).setStaticConfig(map).setNodeId(i).setEncryptionControlManager(this.encryptionControl).setUsableBrokers(() -> {
            return this.clusterControl.usableBrokers();
        }).setIsTopicPlacementSupport(() -> {
            return Boolean.valueOf(this.featureControl.isTopicPlacementSupported());
        }).build();
        this.producerIdControlManager = new ProducerIdControlManager(this.clusterControl, this.snapshotRegistry);
        this.leaderImbalanceCheckIntervalNs = optionalLong;
        this.mirrorTopicControl = new MirrorTopicControlManager(this.snapshotRegistry, logContext, time, this::resolveTopicId, this::resolveClusterLinkId);
        this.maxIdleIntervalNs = optionalLong2;
        this.authorizer = optional3;
        optional3.ifPresent(clusterMetadataAuthorizer -> {
            clusterMetadataAuthorizer.setAclMutator(this);
        });
        this.staticConfig = map;
        this.aclControlManager = new AclControlManager(logContext, this.snapshotRegistry, optional3, this::isValidClusterLink);
        this.replicationControl = new ReplicationControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).setLogContext(logContext).setDefaultReplicationFactor(s).setDefaultNumPartitions(i2).setMaxElectionsPerImbalance(1000).setConfigurationControl(this.configurationControl).setClusterControl(this.clusterControl).setCreateTopicPolicy(optional).setFeatureControl(this.featureControl).setApplyCreateTopicsPolicyToCreatePartitions(shouldApplyCreateTopicsPolicyToCreatePartitions(map)).setMirrorTopicControl(this.mirrorTopicControl).setNameToTenantCallback(function).setTenantControl(this.tenantControl).setDefaultTopicPlacement(optional5).build();
        SnapshotRegistry snapshotRegistry = this.snapshotRegistry;
        ConfigurationControlManager configurationControlManager = this.configurationControl;
        MirrorTopicControlManager mirrorTopicControlManager = this.mirrorTopicControl;
        FeatureControlManager featureControlManager = this.featureControl;
        ReplicationControlManager replicationControlManager = this.replicationControl;
        replicationControlManager.getClass();
        Consumer consumer = replicationControlManager::unlinkMirrorTopic;
        AclControlManager aclControlManager = this.aclControlManager;
        aclControlManager.getClass();
        this.clusterLinkControl = new ClusterLinkControlManager(snapshotRegistry, logContext, configurationControlManager, mirrorTopicControlManager, featureControlManager, consumer, aclControlManager::unlinkAcls, str, optional4);
        this.scramControlManager = new ScramControlManager.Builder().setLogContext(logContext).setSnapshotRegistry(this.snapshotRegistry).build();
        this.logReplayTracker = new LogReplayTracker.Builder().setLogContext(logContext).build();
        this.raftClient = raftClient;
        this.bootstrapMetadata = bootstrapMetadata;
        this.maxRecordsPerBatch = i4;
        this.metaLogListener = new QuorumMetaLogListener();
        this.curClaimEpoch = -1;
        this.needToCompleteAuthorizerLoad = optional3.isPresent();
        this.zkRecordConsumer = new MigrationRecordConsumer();
        this.zkMigrationEnabled = z;
        this.recordRedactor = new RecordRedactor(kafkaConfigSchema);
        updateWriteOffset(-1L);
        resetToEmptyState();
        Logger logger = this.log;
        Object[] objArr = new Object[3];
        objArr[0] = str;
        objArr[1] = optional3;
        objArr[2] = z ? " ZK migration mode is enabled." : "";
        logger.info("Creating new QuorumController with clusterId {}, authorizer {}.{}", objArr);
        this.raftClient.register(this.metaLogListener);
    }

    static boolean shouldApplyCreateTopicsPolicyToCreatePartitions(Map<String, Object> map) {
        return Boolean.parseBoolean(map.getOrDefault(ConfluentConfigs.APPLY_CREATE_TOPIC_POLICY_TO_CREATE_PARTITIONS, "false").toString());
    }

    private double maxDemotedBrokersPercentage() {
        return Double.parseDouble(this.staticConfig.getOrDefault(ConfluentConfigs.ALTER_BROKER_HEALTH_MAX_DEMOTED_BROKERS_PERCENTAGE_CONFIG, ConfluentConfigs.ALTER_BROKER_HEALTH_MAX_DEMOTED_BROKERS_PERCENTAGE_DEFAULT).toString());
    }

    private Optional<Uuid> resolveClusterLinkId(String str) {
        return this.clusterLinkControl.getClusterLinkId(str);
    }

    private Boolean isValidClusterLink(Uuid uuid) {
        return Boolean.valueOf(this.clusterLinkControl.isValidLinkId(uuid));
    }

    private Optional<Uuid> resolveTopicId(String str) {
        return Optional.ofNullable(this.replicationControl.getTopicId(str));
    }

    @Override // org.apache.kafka.controller.Controller
    public boolean isMirrorTopic(String str) {
        return this.mirrorTopicControl.isMirrorTopic(str);
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<MetadataResponseData.MetadataResponseBrokerCollection> unfencedBrokerEndpoints(ControllerRequestContext controllerRequestContext, ListenerName listenerName) {
        return appendReadEvent("fetchMetadata", controllerRequestContext.deadlineNs(), () -> {
            return this.clusterControl.unfencedBrokerEndpoints(listenerName);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AlterPartitionResponseData> alterPartition(ControllerRequestContext controllerRequestContext, AlterPartitionRequestData alterPartitionRequestData) {
        return alterPartitionRequestData.topics().isEmpty() ? CompletableFuture.completedFuture(new AlterPartitionResponseData()) : appendWriteEvent("alterPartition", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.alterPartition(controllerRequestContext, alterPartitionRequestData);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AlterUserScramCredentialsResponseData> alterUserScramCredentials(ControllerRequestContext controllerRequestContext, AlterUserScramCredentialsRequestData alterUserScramCredentialsRequestData) {
        return (alterUserScramCredentialsRequestData.deletions().isEmpty() && alterUserScramCredentialsRequestData.upsertions().isEmpty()) ? CompletableFuture.completedFuture(new AlterUserScramCredentialsResponseData()) : appendWriteEvent("alterUserScramCredentials", controllerRequestContext.deadlineNs(), () -> {
            return this.scramControlManager.alterCredentials(alterUserScramCredentialsRequestData, this.featureControl.metadataVersion());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<CreateTopicsResponseData> createTopics(ControllerRequestContext controllerRequestContext, CreateTopicsRequestData createTopicsRequestData, Set<String> set) {
        return createTopicsRequestData.topics().isEmpty() ? CompletableFuture.completedFuture(new CreateTopicsResponseData()) : appendWriteEvent("createTopics", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.createTopics(controllerRequestContext, createTopicsRequestData, set);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Void> unregisterBroker(ControllerRequestContext controllerRequestContext, int i) {
        return appendWriteEvent("unregisterBroker", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.unregisterBroker(i);
        }, EnumSet.of(ControllerOperationFlag.RUNS_IN_PREMIGRATION));
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(ControllerRequestContext controllerRequestContext, Collection<String> collection) {
        return collection.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : appendReadEvent("findTopicIds", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.findTopicIds(currentReadOffset(), collection);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<String, Uuid>> findAllTopicIds(ControllerRequestContext controllerRequestContext) {
        return appendReadEvent("findAllTopicIds", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.findAllTopicIds(currentReadOffset());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(ControllerRequestContext controllerRequestContext, Collection<Uuid> collection) {
        return collection.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : appendReadEvent("findTopicNames", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.findTopicNames(currentReadOffset(), collection);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(ControllerRequestContext controllerRequestContext, Collection<Uuid> collection) {
        return collection.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : appendWriteEvent("deleteTopics", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.deleteTopics(controllerRequestContext, collection);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>> describeConfigs(ControllerRequestContext controllerRequestContext, Map<ConfigResource, Collection<String>> map) {
        return appendReadEvent("describeConfigs", controllerRequestContext.deadlineNs(), () -> {
            return this.configurationControl.describeConfigs(currentReadOffset(), map);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<ElectLeadersResponseData> electLeaders(ControllerRequestContext controllerRequestContext, ElectLeadersRequestData electLeadersRequestData) {
        return (electLeadersRequestData.topicPartitions() == null || !electLeadersRequestData.topicPartitions().isEmpty()) ? appendWriteEvent("electLeaders", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.electLeaders(electLeadersRequestData);
        }) : CompletableFuture.completedFuture(new ElectLeadersResponseData());
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures(ControllerRequestContext controllerRequestContext) {
        return appendReadEvent("getFinalizedFeatures", controllerRequestContext.deadlineNs(), () -> {
            return this.featureControl.finalizedFeatures(currentReadOffset());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(ControllerRequestContext controllerRequestContext, Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> map, boolean z) {
        return map.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : appendWriteEvent("incrementalAlterConfigs", controllerRequestContext.deadlineNs(), () -> {
            ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs = this.configurationControl.incrementalAlterConfigs(map, false, controllerRequestContext.principal());
            return z ? incrementalAlterConfigs.withoutRecords() : incrementalAlterConfigs;
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AlterPartitionReassignmentsResponseData> alterPartitionReassignments(ControllerRequestContext controllerRequestContext, AlterPartitionReassignmentsRequestData alterPartitionReassignmentsRequestData) {
        return alterPartitionReassignmentsRequestData.topics().isEmpty() ? CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData()) : appendWriteEvent("alterPartitionReassignments", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.alterPartitionReassignments(alterPartitionReassignmentsRequestData, controllerRequestContext.principal());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<ListPartitionReassignmentsResponseData> listPartitionReassignments(ControllerRequestContext controllerRequestContext, ListPartitionReassignmentsRequestData listPartitionReassignmentsRequestData) {
        return (listPartitionReassignmentsRequestData.topics() == null || !listPartitionReassignmentsRequestData.topics().isEmpty()) ? appendReadEvent("listPartitionReassignments", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.listPartitionReassignments(listPartitionReassignmentsRequestData.topics(), currentReadOffset());
        }) : CompletableFuture.completedFuture(new ListPartitionReassignmentsResponseData().setErrorMessage(null));
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(ControllerRequestContext controllerRequestContext, Map<ConfigResource, Map<String, String>> map, boolean z) {
        return map.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : appendWriteEvent("legacyAlterConfigs", controllerRequestContext.deadlineNs(), () -> {
            ControllerResult<Map<ConfigResource, ApiError>> legacyAlterConfigs = this.configurationControl.legacyAlterConfigs(map, false, controllerRequestContext.principal());
            return z ? legacyAlterConfigs.withoutRecords() : legacyAlterConfigs;
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(ControllerRequestContext controllerRequestContext, BrokerHeartbeatRequestData brokerHeartbeatRequestData) {
        return appendWriteEvent("processBrokerHeartbeat", controllerRequestContext.deadlineNs(), new ControllerWriteOperation<BrokerHeartbeatReply>() { // from class: org.apache.kafka.controller.QuorumController.1
            private final int brokerId;
            private boolean inControlledShutdown = false;
            final /* synthetic */ BrokerHeartbeatRequestData val$request;

            AnonymousClass1(BrokerHeartbeatRequestData brokerHeartbeatRequestData2) {
                r5 = brokerHeartbeatRequestData2;
                this.brokerId = r5.brokerId();
            }

            @Override // org.apache.kafka.controller.QuorumController.ControllerWriteOperation
            public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
                OptionalLong registerBrokerRecordOffset = QuorumController.this.clusterControl.registerBrokerRecordOffset(this.brokerId);
                if (!registerBrokerRecordOffset.isPresent()) {
                    throw new StaleBrokerEpochException(String.format("Receive a heartbeat from broker %d before registration", Integer.valueOf(this.brokerId)));
                }
                ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat = QuorumController.this.replicationControl.processBrokerHeartbeat(r5, registerBrokerRecordOffset.getAsLong());
                this.inControlledShutdown = processBrokerHeartbeat.response().inControlledShutdown();
                QuorumController.this.rescheduleMaybeFenceStaleBrokers();
                return processBrokerHeartbeat;
            }

            @Override // org.apache.kafka.controller.QuorumController.ControllerWriteOperation
            public void processBatchEndOffset(long j) {
                if (this.inControlledShutdown) {
                    QuorumController.this.clusterControl.heartbeatManager().maybeUpdateControlledShutdownOffset(this.brokerId, j);
                }
            }
        }, EnumSet.of(ControllerOperationFlag.RUNS_IN_PREMIGRATION)).whenComplete((brokerHeartbeatReply, th) -> {
            if (ControllerExceptions.isTimeoutException(th)) {
                this.replicationControl.processExpiredBrokerHeartbeat(brokerHeartbeatRequestData2);
                this.controllerMetrics.incrementTimedOutHeartbeats();
            }
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<BrokerRegistrationReply> registerBroker(ControllerRequestContext controllerRequestContext, BrokerRegistrationRequestData brokerRegistrationRequestData) {
        return appendWriteEvent("registerBroker", controllerRequestContext.deadlineNs(), () -> {
            ControllerResult<BrokerRegistrationReply> registerBroker = this.clusterControl.registerBroker(brokerRegistrationRequestData, this.writeOffset + 1, this.featureControl.finalizedFeatures(Long.MAX_VALUE));
            rescheduleMaybeFenceStaleBrokers();
            return registerBroker;
        }, EnumSet.of(ControllerOperationFlag.RUNS_IN_PREMIGRATION));
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(ControllerRequestContext controllerRequestContext, Collection<ClientQuotaAlteration> collection, boolean z) {
        return collection.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : appendWriteEvent("alterClientQuotas", controllerRequestContext.deadlineNs(), () -> {
            ControllerResult<Map<ClientQuotaEntity, ApiError>> alterClientQuotas = this.clientQuotaControlManager.alterClientQuotas(collection);
            return z ? alterClientQuotas.withoutRecords() : alterClientQuotas;
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AlterReplicaExclusionsReply> alterBrokerReplicaExclusions(ControllerRequestContext controllerRequestContext, AlterBrokerReplicaExclusionsRequestData alterBrokerReplicaExclusionsRequestData) {
        Set set = (Set) alterBrokerReplicaExclusionsRequestData.brokersToExclude().stream().map(brokerExclusion -> {
            return new AlterReplicaExclusionOp(brokerExclusion.brokerId(), brokerExclusion.reason(), ExclusionOp.OpType.forId(brokerExclusion.exclusionOperationCode()));
        }).collect(Collectors.toSet());
        return appendWriteEvent("alterBrokerReplicaExclusions", controllerRequestContext.deadlineNs(), () -> {
            return this.clusterControl.processAlterReplicaExclusions(set);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<Integer, String>> describeBrokerReplicaExclusions(ControllerRequestContext controllerRequestContext) {
        OptionalLong deadlineNs = controllerRequestContext.deadlineNs();
        ClusterControlManager clusterControlManager = this.clusterControl;
        clusterControlManager.getClass();
        return appendReadEvent("describeBrokerReplicaExclusions", deadlineNs, clusterControlManager::activeBrokerReplicaExclusions);
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AlterBrokerHealthResponseData> alterBrokerHealth(ControllerRequestContext controllerRequestContext, AlterBrokerHealthRequestData alterBrokerHealthRequestData) {
        ComponentHealthStatus forId = ComponentHealthStatus.forId(alterBrokerHealthRequestData.statusCode());
        if (forId == ComponentHealthStatus.UNKNOWN) {
            throw new IllegalArgumentException("Invalid health status code from request: " + ((int) alterBrokerHealthRequestData.statusCode()));
        }
        CompletableFuture<AlterBrokerHealthResponseData> appendWriteEvent = appendWriteEvent("alterBrokerHealth", controllerRequestContext.deadlineNs(), () -> {
            return this.clusterControl.processAlterBrokerHealth(alterBrokerHealthRequestData, maxDemotedBrokersPercentage());
        });
        appendWriteEvent.whenComplete((alterBrokerHealthResponseData, th) -> {
            if (alterBrokerHealthResponseData == null || alterBrokerHealthResponseData.errorCode() != Errors.NONE.code()) {
                return;
            }
            if (forId != ComponentHealthStatus.DEGRADED) {
                appendWriteEvent(LEADER_ELECTION_FOR_PROMOTED_BROKERS, OptionalLong.empty(), () -> {
                    return this.replicationControl.tryReelectPromotedLeaders(alterBrokerHealthRequestData.brokerIds());
                });
                return;
            }
            OptionalLong empty = OptionalLong.empty();
            ReplicationControlManager replicationControlManager = this.replicationControl;
            replicationControlManager.getClass();
            appendWriteEvent(LEADER_ELECTION_FOR_DEMOTED_BROKERS, empty, replicationControlManager::tryUnelectDemotedLeaders);
        });
        return appendWriteEvent;
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<DescribeBrokerHealthResponseData> describeBrokerHealth(ControllerRequestContext controllerRequestContext) {
        return appendReadEvent("describeBrokerHealth", controllerRequestContext.deadlineNs(), () -> {
            return toDescribeBrokerHealthResponseData(this.clusterControl.activeBrokerComponentDegradations());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(ControllerRequestContext controllerRequestContext, AllocateProducerIdsRequestData allocateProducerIdsRequestData) {
        return appendWriteEvent("allocateProducerIds", controllerRequestContext.deadlineNs(), () -> {
            return this.producerIdControlManager.generateNextProducerId(allocateProducerIdsRequestData.brokerId(), allocateProducerIdsRequestData.brokerEpoch());
        }).thenApply(producerIdsBlock -> {
            return new AllocateProducerIdsResponseData().setProducerIdStart(producerIdsBlock.firstProducerId()).setProducerIdLen(producerIdsBlock.size());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(ControllerRequestContext controllerRequestContext, UpdateFeaturesRequestData updateFeaturesRequestData) {
        return appendWriteEvent("updateFeatures", controllerRequestContext.deadlineNs(), () -> {
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            updateFeaturesRequestData.featureUpdates().forEach(featureUpdateKey -> {
                String feature = featureUpdateKey.feature();
                hashMap2.put(feature, FeatureUpdate.UpgradeType.fromCode(featureUpdateKey.upgradeType()));
                hashMap.put(feature, Short.valueOf(featureUpdateKey.maxVersionLevel()));
            });
            return this.featureControl.updateFeatures(hashMap, hashMap2, this.clusterControl.brokerSupportedVersions(), updateFeaturesRequestData.validateOnly());
        }).thenApply(map -> {
            UpdateFeaturesResponseData updateFeaturesResponseData = new UpdateFeaturesResponseData();
            updateFeaturesResponseData.setResults(new UpdateFeaturesResponseData.UpdatableFeatureResultCollection(map.size()));
            map.forEach((str, apiError) -> {
                updateFeaturesResponseData.results().add((UpdateFeaturesResponseData.UpdatableFeatureResultCollection) new UpdateFeaturesResponseData.UpdatableFeatureResult().setFeature(str).setErrorCode(apiError.error().code()).setErrorMessage(apiError.message()));
            });
            return updateFeaturesResponseData;
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> createPartitions(ControllerRequestContext controllerRequestContext, List<CreatePartitionsRequestData.CreatePartitionsTopic> list, boolean z) {
        return list.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyList()) : appendWriteEvent("createPartitions", controllerRequestContext.deadlineNs(), () -> {
            ControllerResult<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> createPartitions = this.replicationControl.createPartitions(controllerRequestContext, list);
            if (z) {
                this.log.debug("Validate-only CreatePartitions result(s): {}", createPartitions.response());
                return createPartitions.withoutRecords();
            }
            this.log.debug("CreatePartitions result(s): {}", createPartitions.response());
            return createPartitions;
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<CreateClusterLinksResponseData> createClusterLinks(ControllerRequestContext controllerRequestContext, CreateClusterLinksRequestData createClusterLinksRequestData, KafkaPrincipal kafkaPrincipal) {
        return appendWriteEvent("createClusterLinks", controllerRequestContext.deadlineNs(), clusterLinkSupportedOrThrow(() -> {
            return this.clusterLinkControl.createClusterLinks(createClusterLinksRequestData, kafkaPrincipal);
        }));
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<DeleteClusterLinksResponseData> deleteClusterLinks(ControllerRequestContext controllerRequestContext, DeleteClusterLinksRequestData deleteClusterLinksRequestData) {
        return appendWriteEvent("deleteClusterLinks", controllerRequestContext.deadlineNs(), clusterLinkSupportedOrThrow(() -> {
            return this.clusterLinkControl.deleteClusterLinks(deleteClusterLinksRequestData);
        }));
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AlterMirrorTopicsResponseData> alterMirrorTopic(ControllerRequestContext controllerRequestContext, AlterMirrorTopicsRequestData alterMirrorTopicsRequestData) {
        return appendWriteEvent("alterMirrorTopics", controllerRequestContext.deadlineNs(), clusterLinkSupportedOrThrow(() -> {
            return this.mirrorTopicControl.alterMirrorTopics(alterMirrorTopicsRequestData);
        }));
    }

    private <T> ControllerWriteOperation<T> clusterLinkSupportedOrThrow(Supplier<ControllerResult<T>> supplier) {
        return () -> {
            if (this.featureControl.metadataVersion().isAtLeast(MetadataVersion.IBP_3_3_IV0)) {
                return (ControllerResult) supplier.get();
            }
            throw new UnsupportedVersionException("Cluster Linking needs a metadata.version of 3.3 or greater.");
        };
    }

    @Override // org.apache.kafka.metadata.authorizer.AclMutator
    public CompletableFuture<List<AclCreateResult>> createAcls(ControllerRequestContext controllerRequestContext, List<AclBinding> list) {
        return appendWriteEvent("createAcls", controllerRequestContext.deadlineNs(), () -> {
            return this.aclControlManager.createAcls(list);
        });
    }

    @Override // org.apache.kafka.metadata.authorizer.AclMutator
    public CompletableFuture<List<AclDeleteResult>> deleteAcls(ControllerRequestContext controllerRequestContext, List<AclBindingFilter> list) {
        return appendWriteEvent("deleteAcls", controllerRequestContext.deadlineNs(), () -> {
            return this.aclControlManager.deleteAcls(list);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Void> waitForReadyBrokers(int i) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        appendControlEvent("waitForReadyBrokers", () -> {
            this.clusterControl.addReadyBrokersFuture(completableFuture, i);
        });
        return completableFuture;
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<DescribeCellsResponseData> describeCells(ControllerRequestContext controllerRequestContext, DescribeCellsRequest describeCellsRequest) {
        return appendWriteEvent("describeCells", controllerRequestContext.deadlineNs(), () -> {
            return this.cellControl.describeCells(describeCellsRequest, usableBrokers());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<CreateCellResponseData> createCell(ControllerRequestContext controllerRequestContext, CreateCellRequest createCellRequest) {
        return appendWriteEvent("createCell", controllerRequestContext.deadlineNs(), () -> {
            return this.cellControl.createCell(createCellRequest);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<DeleteCellResponseData> deleteCell(ControllerRequestContext controllerRequestContext, DeleteCellRequest deleteCellRequest) {
        return appendWriteEvent("deleteCell", controllerRequestContext.deadlineNs(), () -> {
            return this.cellControl.deleteCell(deleteCellRequest);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AlterCellResponseData> alterCell(ControllerRequestContext controllerRequestContext, AlterCellRequest alterCellRequest) {
        return appendWriteEvent("alterCell", controllerRequestContext.deadlineNs(), () -> {
            return this.cellControl.alterCell(alterCellRequest, usableBrokers());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AssignBrokersToCellResponseData> assignBrokersToCell(ControllerRequestContext controllerRequestContext, AssignBrokersToCellRequest assignBrokersToCellRequest) {
        return appendWriteEvent("assignBrokersToCell", controllerRequestContext.deadlineNs(), () -> {
            return this.cellControl.assignBrokersToCell(assignBrokersToCellRequest, usableBrokers());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<UnAssignBrokersFromCellResponseData> unassignBrokersFromCell(ControllerRequestContext controllerRequestContext, UnAssignBrokersFromCellRequest unAssignBrokersFromCellRequest) {
        return appendWriteEvent("unassignBrokersFromCell", controllerRequestContext.deadlineNs(), () -> {
            return this.cellControl.unassignBrokersFromCell(unAssignBrokersFromCellRequest, usableBrokers());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AssignTenantsToCellResponseData> assignTenantsToCell(ControllerRequestContext controllerRequestContext, AssignTenantsToCellRequest assignTenantsToCellRequest) {
        return appendWriteEvent("assignTenantsToCell", controllerRequestContext.deadlineNs(), () -> {
            return this.tenantControl.assignTenantsToCell(assignTenantsToCellRequest, usableBrokers());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<DescribeTenantsResponseData> describeTenants(ControllerRequestContext controllerRequestContext, DescribeTenantsRequest describeTenantsRequest) {
        return appendWriteEvent("describeTenants", controllerRequestContext.deadlineNs(), () -> {
            return this.tenantControl.describeTenants(describeTenantsRequest);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<DeleteTenantsResponseData> deleteTenants(ControllerRequestContext controllerRequestContext, DeleteTenantsRequest deleteTenantsRequest) {
        return appendWriteEvent("deleteTenants", controllerRequestContext.deadlineNs(), () -> {
            return this.tenantControl.deleteTenants(deleteTenantsRequest);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public void beginShutdown() {
        this.queue.beginShutdown("QuorumController#beginShutdown");
    }

    public int nodeId() {
        return this.nodeId;
    }

    public String clusterId() {
        return this.clusterId;
    }

    @Override // org.apache.kafka.controller.Controller
    public int curClaimEpoch() {
        return this.curClaimEpoch;
    }

    @Override // org.apache.kafka.controller.Controller, java.lang.AutoCloseable
    public void close() throws InterruptedException {
        this.queue.close();
        this.controllerMetrics.close();
        this.cellMetrics.close();
    }

    public CountDownLatch pause() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        appendControlEvent("pause", () -> {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                this.log.info("Interrupted while waiting for unpause.", (Throwable) e);
            }
        });
        return countDownLatch;
    }

    Time time() {
        return this.time;
    }

    private Set<Integer> usableBrokers() {
        HashSet hashSet = new HashSet();
        this.clusterControl.usableBrokers().forEachRemaining(usableBroker -> {
            hashSet.add(Integer.valueOf(usableBroker.id()));
        });
        return hashSet;
    }

    private DescribeBrokerHealthResponseData toDescribeBrokerHealthResponseData(Map<Integer, Set<DegradedBrokerHealthState>> map) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<Integer, Set<DegradedBrokerHealthState>> entry : map.entrySet()) {
            ArrayList arrayList2 = new ArrayList();
            entry.getValue().forEach(degradedBrokerHealthState -> {
                arrayList2.add(new DescribeBrokerHealthResponseData.DegradedBrokerComponent().setReason(degradedBrokerHealthState.reason()).setComponentCode(degradedBrokerHealthState.component().id()));
            });
            arrayList.add(new DescribeBrokerHealthResponseData.DegradedBroker().setBrokerId(entry.getKey().intValue()).setDegradedBrokerComponents(arrayList2));
        }
        return new DescribeBrokerHealthResponseData().setErrorCode(Errors.NONE.code()).setDegradedBrokers(arrayList);
    }

    /* synthetic */ QuorumController(FaultHandler faultHandler, FaultHandler faultHandler2, LogContext logContext, int i, String str, KafkaEventQueue kafkaEventQueue, Time time, KafkaConfigSchema kafkaConfigSchema, RaftClient raftClient, QuorumFeatures quorumFeatures, short s, int i2, ReplicaPlacer replicaPlacer, OptionalLong optionalLong, OptionalLong optionalLong2, long j, ControllerMetrics controllerMetrics, Optional optional, Optional optional2, ConfigurationValidator configurationValidator, Optional optional3, Map map, BootstrapMetadata bootstrapMetadata, Function function, int i3, int i4, boolean z, Optional optional4, CellAssignor cellAssignor, PartitionPlacementStrategy partitionPlacementStrategy, boolean z2, Optional optional5, CellMetrics cellMetrics, AnonymousClass1 anonymousClass1) {
        this(faultHandler, faultHandler2, logContext, i, str, kafkaEventQueue, time, kafkaConfigSchema, raftClient, quorumFeatures, s, i2, replicaPlacer, optionalLong, optionalLong2, j, controllerMetrics, optional, optional2, configurationValidator, optional3, map, bootstrapMetadata, function, i3, i4, z, optional4, cellAssignor, partitionPlacementStrategy, z2, optional5, cellMetrics);
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.kafka.controller.QuorumController.access$3902(org.apache.kafka.controller.QuorumController, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$3902(org.apache.kafka.controller.QuorumController r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.oldestNonSnapshottedTimestamp = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.controller.QuorumController.access$3902(org.apache.kafka.controller.QuorumController, long):long");
    }
}
