package org.apache.kafka.raft;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
import org.apache.kafka.common.message.AddRaftVoterResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumRequestData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.UnalignedMemoryRecords;
import org.apache.kafka.common.record.UnalignedRecords;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.raft.RaftResponse;
import org.apache.kafka.raft.ValidOffsetAndEpoch;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.generated.QuorumStateData;
import org.apache.kafka.raft.internals.AddVoterHandler;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.BatchMemoryPool;
import org.apache.kafka.raft.internals.BlockingMessageQueue;
import org.apache.kafka.raft.internals.CloseListener;
import org.apache.kafka.raft.internals.DefaultRequestSender;
import org.apache.kafka.raft.internals.FuturePurgatory;
import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
import org.apache.kafka.raft.internals.KafkaRaftMetrics;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.raft.internals.RecordsBatchReader;
import org.apache.kafka.raft.internals.RemoveVoterHandler;
import org.apache.kafka.raft.internals.ThresholdPurgatory;
import org.apache.kafka.raft.internals.UpdateVoterHandler;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.NotifyingRawSnapshotWriter;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.snapshot.Snapshots;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient.class */
public final class KafkaRaftClient<T> implements RaftClient<T> {
    private static final int RETRY_BACKOFF_BASE_MS = 100;
    private static final int MAX_NUMBER_OF_BATCHES = 10;
    public static final int MAX_FETCH_WAIT_MS = 500;
    public static final int MAX_BATCH_SIZE_BYTES = 8388608;
    public static final int MAX_FETCH_SIZE_BYTES = 8388608;
    private final OptionalInt nodeId;
    private final Uuid nodeDirectoryId;
    private final AtomicReference<KafkaRaftClient<T>.GracefulShutdown> shutdown;
    private final LogContext logContext;
    private final Logger logger;
    private final Time time;
    private final int fetchMaxWaitMs;
    private final boolean followersAlwaysFlush;
    private final String clusterId;
    private final Endpoints localListeners;
    private final SupportedVersionRange localSupportedKRaftVersion;
    private final NetworkChannel channel;
    private final ReplicatedLog log;
    private final Random random;
    private final FuturePurgatory<Long> appendPurgatory;
    private final FuturePurgatory<Long> fetchPurgatory;
    private final RecordSerde<T> serde;
    private final MemoryPool memoryPool;
    private final RaftMessageQueue messageQueue;
    private final QuorumConfig quorumConfig;
    private final RaftMetadataLogCleanerManager snapshotCleaner;
    private final Map<RaftClient.Listener<T>, KafkaRaftClient<T>.ListenerContext> listenerContexts;
    private final ConcurrentLinkedQueue<Registration<T>> pendingRegistrations;
    private volatile KRaftControlRecordStateMachine partitionState;
    private volatile KafkaRaftMetrics kafkaRaftMetrics;
    private volatile QuorumState quorum;
    private volatile RequestManager requestManager;
    private volatile AddVoterHandler addVoterHandler;
    private volatile RemoveVoterHandler removeVoterHandler;
    private volatile UpdateVoterHandler updateVoterHandler;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.raft.KafkaRaftClient$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$ApiKeys;
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$raft$ValidOffsetAndEpoch$Kind = new int[ValidOffsetAndEpoch.Kind.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$raft$ValidOffsetAndEpoch$Kind[ValidOffsetAndEpoch.Kind.DIVERGING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$raft$ValidOffsetAndEpoch$Kind[ValidOffsetAndEpoch.Kind.SNAPSHOT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$org$apache$kafka$common$protocol$ApiKeys = new int[ApiKeys.values().length];
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.FETCH.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.VOTE.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.BEGIN_QUORUM_EPOCH.ordinal()] = 3;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.END_QUORUM_EPOCH.ordinal()] = 4;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.FETCH_SNAPSHOT.ordinal()] = 5;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.API_VERSIONS.ordinal()] = 6;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.UPDATE_RAFT_VOTER.ordinal()] = 7;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.DESCRIBE_QUORUM.ordinal()] = 8;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.ADD_RAFT_VOTER.ordinal()] = 9;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.REMOVE_RAFT_VOTER.ordinal()] = KafkaRaftClient.MAX_NUMBER_OF_BATCHES;
            } catch (NoSuchFieldError e12) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient$GracefulShutdown.class */
    public class GracefulShutdown {
        final Timer finishTimer;
        final CompletableFuture<Void> completeFuture;

        public GracefulShutdown(long j, CompletableFuture<Void> completableFuture) {
            this.finishTimer = KafkaRaftClient.this.time.timer(j);
            this.completeFuture = completableFuture;
        }

        public void update(long j) {
            this.finishTimer.update(j);
        }

        public boolean hasTimedOut() {
            return this.finishTimer.isExpired();
        }

        public boolean isFinished() {
            return this.completeFuture.isDone();
        }

        public long remainingTimeMs() {
            return this.finishTimer.remainingMs();
        }

        public void failWithTimeout() {
            KafkaRaftClient.this.logger.warn("Graceful shutdown timed out after {}ms", Long.valueOf(this.finishTimer.timeoutMs()));
            this.completeFuture.completeExceptionally(new TimeoutException("Timeout expired before graceful shutdown completed"));
        }

        public void complete() {
            KafkaRaftClient.this.logger.info("Graceful shutdown completed");
            this.completeFuture.complete(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient$ListenerContext.class */
    public final class ListenerContext implements CloseListener<BatchReader<T>> {
        private static final long STARTING_NEXT_OFFSET = -1;
        private static final long SMALLEST_LOG_OFFSET = 0;
        private final RaftClient.Listener<T> listener;
        private LeaderAndEpoch lastFiredLeaderChange;
        private BatchReader<T> lastSent;
        private long nextOffset;

        private ListenerContext(RaftClient.Listener<T> listener) {
            this.lastFiredLeaderChange = LeaderAndEpoch.UNKNOWN;
            this.lastSent = null;
            this.nextOffset = STARTING_NEXT_OFFSET;
            this.listener = listener;
        }

        private synchronized long nextOffset() {
            return this.nextOffset;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized void resetOffsetToSmallestLogOffset() {
            this.nextOffset = SMALLEST_LOG_OFFSET;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized OptionalLong nextExpectedOffset() {
            if (this.lastSent == null) {
                return OptionalLong.of(this.nextOffset);
            }
            OptionalLong lastOffset = this.lastSent.lastOffset();
            return lastOffset.isPresent() ? OptionalLong.of(lastOffset.getAsLong() + 1) : OptionalLong.empty();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void fireHandleSnapshot(SnapshotReader<T> snapshotReader) {
            synchronized (this) {
                this.nextOffset = snapshotReader.snapshotId().offset();
                this.lastSent = null;
            }
            KafkaRaftClient.this.logger.debug("Notifying listener {} of snapshot {}", listenerName(), snapshotReader.snapshotId());
            this.listener.handleLoadSnapshot(snapshotReader);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void fireHandleCommit(long j, Records records) {
            fireHandleCommit(RecordsBatchReader.of(j, records, KafkaRaftClient.this.serde, BufferSupplier.create(), 8388608, this, true));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void fireHandleCommit(long j, int i, long j2, int i2, List<T> list) {
            fireHandleCommit(MemoryBatchReader.of(Collections.singletonList(Batch.data(j, i, j2, i2, list)), this));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String listenerName() {
            return KafkaRaftClient.listenerName(this.listener);
        }

        private void fireHandleCommit(BatchReader<T> batchReader) {
            synchronized (this) {
                this.lastSent = batchReader;
            }
            KafkaRaftClient.this.logger.debug("Notifying listener {} of batch for baseOffset {} and lastOffset {}", new Object[]{listenerName(), Long.valueOf(batchReader.baseOffset()), batchReader.lastOffset()});
            this.listener.handleCommit(batchReader);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            if (shouldFireLeaderChange(leaderAndEpoch)) {
                this.lastFiredLeaderChange = leaderAndEpoch;
                KafkaRaftClient.this.logger.debug("Notifying listener {} of leader change {}", listenerName(), leaderAndEpoch);
                this.listener.handleLeaderChange(leaderAndEpoch);
            }
        }

        private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            if (leaderAndEpoch.equals(this.lastFiredLeaderChange)) {
                return false;
            }
            if (leaderAndEpoch.epoch() > this.lastFiredLeaderChange.epoch()) {
                return true;
            }
            return leaderAndEpoch.leaderId().isPresent() && !this.lastFiredLeaderChange.leaderId().isPresent();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long j) {
            if (!shouldFireLeaderChange(leaderAndEpoch) || nextOffset() <= j) {
                return;
            }
            this.lastFiredLeaderChange = leaderAndEpoch;
            KafkaRaftClient.this.logger.debug("Notifying listener {} of new leadership {}", listenerName(), leaderAndEpoch);
            this.listener.handleLeaderChange(leaderAndEpoch);
        }

        @Override // org.apache.kafka.raft.internals.CloseListener
        public synchronized void onClose(BatchReader<T> batchReader) {
            OptionalLong lastOffset = batchReader.lastOffset();
            if (lastOffset.isPresent()) {
                this.nextOffset = lastOffset.getAsLong() + 1;
            }
            if (this.lastSent == batchReader) {
                this.lastSent = null;
                KafkaRaftClient.this.wakeup();
            }
        }

        /* synthetic */ ListenerContext(KafkaRaftClient kafkaRaftClient, RaftClient.Listener listener, AnonymousClass1 anonymousClass1) {
            this(listener);
        }
    }

    /* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient$RaftMetadataLogCleanerManager.class */
    private static class RaftMetadataLogCleanerManager {
        private final Logger logger;
        private final Timer timer;
        private final long delayMs;
        private final Runnable cleaner;

        RaftMetadataLogCleanerManager(Logger logger, Time time, long j, Runnable runnable) {
            this.logger = logger;
            this.timer = time.timer(j);
            this.delayMs = j;
            this.cleaner = runnable;
        }

        public long maybeClean(long j) {
            this.timer.update(j);
            if (this.timer.isExpired()) {
                try {
                    this.cleaner.run();
                } catch (Throwable th) {
                    this.logger.error("Had an error during log cleaning", th);
                }
                this.timer.reset(this.delayMs);
            }
            return this.timer.remainingMs();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient$Registration.class */
    public static final class Registration<T> {
        private final Ops ops;
        private final RaftClient.Listener<T> listener;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/kafka/raft/KafkaRaftClient$Registration$Ops.class */
        public enum Ops {
            REGISTER,
            UNREGISTER
        }

        private Registration(Ops ops, RaftClient.Listener<T> listener) {
            this.ops = ops;
            this.listener = listener;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Ops ops() {
            return this.ops;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public RaftClient.Listener<T> listener() {
            return this.listener;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static <T> Registration<T> register(RaftClient.Listener<T> listener) {
            return new Registration<>(Ops.REGISTER, listener);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static <T> Registration<T> unregister(RaftClient.Listener<T> listener) {
            return new Registration<>(Ops.UNREGISTER, listener);
        }
    }

    public KafkaRaftClient(OptionalInt optionalInt, Uuid uuid, RecordSerde<T> recordSerde, NetworkChannel networkChannel, ReplicatedLog replicatedLog, Time time, ExpirationService expirationService, LogContext logContext, boolean z, String str, Collection<InetSocketAddress> collection, Endpoints endpoints, SupportedVersionRange supportedVersionRange, QuorumConfig quorumConfig) {
        this(optionalInt, uuid, recordSerde, networkChannel, new BlockingMessageQueue(), replicatedLog, new BatchMemoryPool(5, 8388608), time, expirationService, MAX_FETCH_WAIT_MS, z, str, collection, endpoints, supportedVersionRange, logContext, new Random(), quorumConfig);
    }

    KafkaRaftClient(OptionalInt optionalInt, Uuid uuid, RecordSerde<T> recordSerde, NetworkChannel networkChannel, RaftMessageQueue raftMessageQueue, ReplicatedLog replicatedLog, MemoryPool memoryPool, Time time, ExpirationService expirationService, int i, boolean z, String str, Collection<InetSocketAddress> collection, Endpoints endpoints, SupportedVersionRange supportedVersionRange, LogContext logContext, Random random, QuorumConfig quorumConfig) {
        this.shutdown = new AtomicReference<>();
        this.listenerContexts = new IdentityHashMap();
        this.pendingRegistrations = new ConcurrentLinkedQueue<>();
        if (uuid.equals(Uuid.ZERO_UUID)) {
            throw new IllegalArgumentException("The node directory id must be set and not be the zero uuid");
        }
        this.nodeId = optionalInt;
        this.nodeDirectoryId = uuid;
        this.logContext = logContext;
        this.serde = recordSerde;
        this.channel = networkChannel;
        this.messageQueue = raftMessageQueue;
        this.log = replicatedLog;
        this.memoryPool = memoryPool;
        this.fetchPurgatory = new ThresholdPurgatory(expirationService);
        this.appendPurgatory = new ThresholdPurgatory(expirationService);
        this.time = time;
        this.clusterId = str;
        this.localListeners = endpoints;
        this.localSupportedKRaftVersion = supportedVersionRange;
        this.fetchMaxWaitMs = i;
        this.followersAlwaysFlush = z;
        this.logger = logContext.logger(KafkaRaftClient.class);
        this.random = random;
        this.quorumConfig = quorumConfig;
        Logger logger = this.logger;
        replicatedLog.getClass();
        this.snapshotCleaner = new RaftMetadataLogCleanerManager(logger, time, 60000L, replicatedLog::maybeClean);
        if (collection.isEmpty()) {
            return;
        }
        AtomicInteger atomicInteger = new AtomicInteger(-2);
        List list = (List) collection.stream().map(inetSocketAddress -> {
            return new Node(atomicInteger.getAndDecrement(), inetSocketAddress.getHostString(), inetSocketAddress.getPort());
        }).collect(Collectors.toList());
        this.logger.info("Starting request manager with bootstrap servers: {}", list);
        this.requestManager = new RequestManager(list, quorumConfig.retryBackoffMs(), quorumConfig.requestTimeoutMs(), random);
    }

    private void updateFollowerHighWatermark(FollowerState followerState, OptionalLong optionalLong) {
        optionalLong.ifPresent(j -> {
            long min = Math.min(endOffset().offset(), j);
            if (followerState.updateHighWatermark(OptionalLong.of(min))) {
                this.logger.debug("Follower high watermark updated to {}", Long.valueOf(min));
                this.log.updateHighWatermark(new LogOffsetMetadata(min));
                updateListenersProgress(min);
            }
        });
    }

    private void updateLeaderEndOffsetAndTimestamp(LeaderState<T> leaderState, long j) {
        LogOffsetMetadata endOffset = this.log.endOffset();
        if (leaderState.updateLocalState(endOffset, this.partitionState.lastVoterSet())) {
            onUpdateLeaderHighWatermark(leaderState, j);
        }
        this.fetchPurgatory.maybeComplete(Long.valueOf(endOffset.offset()), j);
    }

    private void onUpdateLeaderHighWatermark(LeaderState<T> leaderState, long j) {
        leaderState.highWatermark().ifPresent(logOffsetMetadata -> {
            this.logger.debug("Leader high watermark updated to {}", logOffsetMetadata);
            this.log.updateHighWatermark(logOffsetMetadata);
            this.addVoterHandler.highWatermarkUpdated(leaderState);
            this.removeVoterHandler.highWatermarkUpdated(leaderState);
            this.appendPurgatory.maybeComplete(Long.valueOf(logOffsetMetadata.offset()), j);
            updateListenersProgress(logOffsetMetadata.offset());
        });
    }

    private void updateListenersProgress(long j) {
        for (KafkaRaftClient<T>.ListenerContext listenerContext : this.listenerContexts.values()) {
            listenerContext.nextExpectedOffset().ifPresent(j2 -> {
                if (j2 < j && ((j2 == -1 || j2 < this.log.startOffset()) && latestSnapshot().isPresent())) {
                    listenerContext.fireHandleSnapshot(latestSnapshot().get());
                } else if (j2 == -1) {
                    this.logger.info("Setting the next offset of {} to {} since there are no snapshots", listenerContext.listenerName(), 0L);
                    listenerContext.resetOffsetToSmallestLogOffset();
                } else if (j2 < this.log.startOffset()) {
                    throw new IllegalStateException(String.format("Snapshot expected since next offset of %s is %d, log start offset is %d and high-watermark is %d", listenerContext.listenerName(), Long.valueOf(j2), Long.valueOf(this.log.startOffset()), Long.valueOf(j)));
                }
            });
            listenerContext.nextExpectedOffset().ifPresent(j3 -> {
                if (j3 < j) {
                    listenerContext.fireHandleCommit(j3, this.log.read(j3, Isolation.COMMITTED).records);
                }
            });
        }
    }

    private Optional<SnapshotReader<T>> latestSnapshot() {
        return (Optional<SnapshotReader<T>>) this.log.latestSnapshot().map(rawSnapshotReader -> {
            return RecordsSnapshotReader.of(rawSnapshotReader, this.serde, BufferSupplier.create(), 8388608, true);
        });
    }

    private void maybeFireHandleCommit(long j, int i, long j2, int i2, List<T> list) {
        for (KafkaRaftClient<T>.ListenerContext listenerContext : this.listenerContexts.values()) {
            listenerContext.nextExpectedOffset().ifPresent(j3 -> {
                if (j3 == j) {
                    listenerContext.fireHandleCommit(j, i, j2, i2, list);
                }
            });
        }
    }

    private void maybeFireLeaderChange(LeaderState<T> leaderState) {
        Iterator<KafkaRaftClient<T>.ListenerContext> it = this.listenerContexts.values().iterator();
        while (it.hasNext()) {
            it.next().maybeFireLeaderChange(this.quorum.leaderAndEpoch(), leaderState.epochStartOffset());
        }
    }

    private void maybeFireLeaderChange() {
        Iterator<KafkaRaftClient<T>.ListenerContext> it = this.listenerContexts.values().iterator();
        while (it.hasNext()) {
            it.next().maybeFireLeaderChange(this.quorum.leaderAndEpoch());
        }
    }

    public void initialize(Map<Integer, InetSocketAddress> map, QuorumStateStore quorumStateStore, Metrics metrics) {
        this.partitionState = new KRaftControlRecordStateMachine(map.isEmpty() ? VoterSet.empty() : VoterSet.fromInetSocketAddresses(this.channel.listenerName(), map), this.log, this.serde, BufferSupplier.create(), 8388608, this.logContext);
        this.logger.info("Reading KRaft snapshot and log as part of the initialization");
        this.partitionState.updateState();
        this.logger.info("Starting voters are {}", this.partitionState.lastVoterSet());
        if (this.requestManager == null) {
            if (map.isEmpty()) {
                throw new ConfigException(String.format("Missing kraft bootstrap servers. Must specify a value for %s.", QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG));
            }
            List list = (List) map.entrySet().stream().map(entry -> {
                return new Node(((Integer) entry.getKey()).intValue(), ((InetSocketAddress) entry.getValue()).getHostString(), ((InetSocketAddress) entry.getValue()).getPort());
            }).collect(Collectors.toList());
            this.logger.info("Starting request manager with static voters: {}", list);
            this.requestManager = new RequestManager(list, this.quorumConfig.retryBackoffMs(), this.quorumConfig.requestTimeoutMs(), this.random);
        }
        this.quorum = new QuorumState(this.nodeId, this.nodeDirectoryId, this.partitionState, this.localListeners, this.localSupportedKRaftVersion, this.quorumConfig.electionTimeoutMs(), this.quorumConfig.fetchTimeoutMs(), quorumStateStore, this.time, this.logContext, this.random);
        this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", this.quorum);
        this.kafkaRaftMetrics.updateNumUnknownVoterConnections(0);
        this.quorum.initialize(new OffsetAndEpoch(this.log.endOffset().offset(), this.log.lastFetchedEpoch()));
        long milliseconds = this.time.milliseconds();
        if (this.quorum.isLeader()) {
            throw new IllegalStateException("Voter cannot initialize as a Leader");
        }
        if (this.quorum.isCandidate()) {
            onBecomeCandidate(milliseconds);
        } else if (this.quorum.isFollower()) {
            onBecomeFollower(milliseconds);
        }
        if (this.quorum.isOnlyVoter() && !this.quorum.isCandidate()) {
            transitionToCandidate(milliseconds);
        }
        this.addVoterHandler = new AddVoterHandler(this.partitionState, new DefaultRequestSender(this.requestManager, this.channel, this.messageQueue, this.logContext), this.time, this.logContext);
        this.removeVoterHandler = new RemoveVoterHandler(this.nodeId, this.nodeDirectoryId, this.partitionState, this.time, this.quorumConfig.requestTimeoutMs(), this.logContext);
        this.updateVoterHandler = new UpdateVoterHandler(this.nodeId, this.partitionState, this.channel.listenerName(), this.time, this.quorumConfig.requestTimeoutMs());
    }

    @Override // org.apache.kafka.raft.RaftClient
    public void register(RaftClient.Listener<T> listener) {
        this.pendingRegistrations.add(Registration.register(listener));
        wakeup();
    }

    @Override // org.apache.kafka.raft.RaftClient
    public void unregister(RaftClient.Listener<T> listener) {
        this.pendingRegistrations.add(Registration.unregister(listener));
    }

    @Override // org.apache.kafka.raft.RaftClient
    public LeaderAndEpoch leaderAndEpoch() {
        return isInitialized() ? this.quorum.leaderAndEpoch() : LeaderAndEpoch.UNKNOWN;
    }

    @Override // org.apache.kafka.raft.RaftClient
    public OptionalInt nodeId() {
        return this.nodeId;
    }

    private OffsetAndEpoch endOffset() {
        return new OffsetAndEpoch(this.log.endOffset().offset(), this.log.lastFetchedEpoch());
    }

    private void resetConnections() {
        this.requestManager.resetAll();
    }

    private void onBecomeLeader(long j) {
        long offset = this.log.endOffset().offset();
        LeaderState<T> transitionToLeader = this.quorum.transitionToLeader(offset, new BatchAccumulator<>(this.quorum.epoch(), offset, this.quorumConfig.appendLingerMs(), 8388608, MAX_NUMBER_OF_BATCHES, this.memoryPool, this.time, Compression.NONE, this.serde));
        this.log.initializeLeaderEpoch(this.quorum.epoch());
        transitionToLeader.appendStartOfEpochControlRecords(this.quorum.localVoterNodeOrThrow(), j);
        resetConnections();
        this.kafkaRaftMetrics.maybeUpdateElectionLatency(j);
    }

    private void flushLeaderLog(LeaderState<T> leaderState, long j) {
        updateLeaderEndOffsetAndTimestamp(leaderState, j);
        this.log.flush(false);
    }

    private boolean maybeTransitionToLeader(CandidateState candidateState, long j) {
        if (!candidateState.isVoteGranted()) {
            return false;
        }
        onBecomeLeader(j);
        return true;
    }

    private void onBecomeCandidate(long j) {
        if (maybeTransitionToLeader(this.quorum.candidateStateOrThrow(), j)) {
            return;
        }
        resetConnections();
        this.kafkaRaftMetrics.updateElectionStartMs(j);
    }

    private void transitionToCandidate(long j) {
        this.quorum.transitionToCandidate();
        maybeFireLeaderChange();
        onBecomeCandidate(j);
    }

    private void transitionToUnattached(int i) {
        this.quorum.transitionToUnattached(i);
        maybeFireLeaderChange();
        resetConnections();
    }

    private void transitionToResigned(List<ReplicaKey> list) {
        this.fetchPurgatory.completeAllExceptionally(Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request since this node is resigning"));
        this.quorum.transitionToResigned(list);
        resetConnections();
    }

    private void transitionToUnattachedVoted(ReplicaKey replicaKey, int i) {
        this.quorum.transitionToUnattachedVotedState(i, replicaKey);
    }

    private void onBecomeFollower(long j) {
        this.kafkaRaftMetrics.maybeUpdateElectionLatency(j);
        resetConnections();
        this.fetchPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException("Cannot process the fetch request because the node is no longer the leader."));
        this.appendPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException("Failed to receive sufficient acknowledgments for this append before leader change."));
    }

    private void transitionToFollower(int i, int i2, Endpoints endpoints, long j) {
        if (endpoints.isEmpty()) {
            throw new IllegalArgumentException(String.format("Unknown leader endpoints (%s) after request or response with leader (%s) and the voters %s", endpoints, Integer.valueOf(i2), this.partitionState.lastVoterSet()));
        }
        this.quorum.transitionToFollower(i, i2, endpoints);
        maybeFireLeaderChange();
        onBecomeFollower(j);
    }

    private VoteResponseData buildVoteResponse(ListenerName listenerName, short s, Errors errors, boolean z) {
        return RaftUtil.singletonVoteResponse(listenerName, s, Errors.NONE, this.log.topicPartition(), errors, this.quorum.epoch(), this.quorum.leaderIdOrSentinel(), z, this.quorum.leaderEndpoints());
    }

    private VoteResponseData handleVoteRequest(RaftRequest.Inbound inbound) {
        VoteRequestData data = inbound.data();
        if (!hasValidClusterId(data.clusterId())) {
            return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }
        if (!RaftUtil.hasValidTopicPartition(data, this.log.topicPartition())) {
            return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }
        VoteRequestData.PartitionData partitionData = (VoteRequestData.PartitionData) ((VoteRequestData.TopicData) data.topics().get(0)).partitions().get(0);
        int candidateId = partitionData.candidateId();
        int candidateEpoch = partitionData.candidateEpoch();
        int lastOffsetEpoch = partitionData.lastOffsetEpoch();
        long lastOffset = partitionData.lastOffset();
        if (lastOffset < 0 || lastOffsetEpoch < 0 || lastOffsetEpoch >= candidateEpoch) {
            return buildVoteResponse(inbound.listenerName(), inbound.apiVersion(), Errors.INVALID_REQUEST, false);
        }
        Optional<Errors> validateVoterOnlyRequest = validateVoterOnlyRequest(candidateId, candidateEpoch);
        if (validateVoterOnlyRequest.isPresent()) {
            return buildVoteResponse(inbound.listenerName(), inbound.apiVersion(), validateVoterOnlyRequest.get(), false);
        }
        if (candidateEpoch > this.quorum.epoch()) {
            transitionToUnattached(candidateEpoch);
        }
        Optional<ReplicaKey> voteRequestVoterKey = RaftUtil.voteRequestVoterKey(data, partitionData);
        if (!isValidVoterKey(voteRequestVoterKey)) {
            this.logger.info("Candidate sent a voter key ({}) in the VOTE request that doesn't match the local key ({}, {}); rejecting the vote", new Object[]{voteRequestVoterKey, this.nodeId, this.nodeDirectoryId});
            return buildVoteResponse(inbound.listenerName(), inbound.apiVersion(), Errors.INVALID_VOTER_KEY, false);
        }
        OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(lastOffset, lastOffsetEpoch);
        ReplicaKey of = ReplicaKey.of(candidateId, partitionData.candidateDirectoryId());
        boolean canGrantVote = this.quorum.canGrantVote(of, offsetAndEpoch.compareTo(endOffset()) >= 0);
        if (canGrantVote && this.quorum.isUnattachedNotVoted()) {
            transitionToUnattachedVoted(of, candidateEpoch);
        }
        Logger logger = this.logger;
        Object[] objArr = new Object[3];
        objArr[0] = data;
        objArr[1] = Integer.valueOf(candidateEpoch);
        objArr[2] = canGrantVote ? "granted" : "rejected";
        logger.info("Vote request {} with epoch {} is {}", objArr);
        return buildVoteResponse(inbound.listenerName(), inbound.apiVersion(), Errors.NONE, canGrantVote);
    }

    private boolean handleVoteResponse(RaftResponse.Inbound inbound, long j) {
        int id = inbound.source().id();
        VoteResponseData data = inbound.data();
        Errors forCode = Errors.forCode(data.errorCode());
        if (forCode != Errors.NONE) {
            return handleTopLevelError(forCode, inbound);
        }
        if (!RaftUtil.hasValidTopicPartition(data, this.log.topicPartition())) {
            return false;
        }
        VoteResponseData.PartitionData partitionData = (VoteResponseData.PartitionData) ((VoteResponseData.TopicData) data.topics().get(0)).partitions().get(0);
        Errors forCode2 = Errors.forCode(partitionData.errorCode());
        OptionalInt optionalLeaderId = optionalLeaderId(partitionData.leaderId());
        Optional<Boolean> maybeHandleCommonResponse = maybeHandleCommonResponse(forCode2, optionalLeaderId, partitionData.leaderEpoch(), optionalLeaderId.isPresent() ? data.nodeEndpoints().isEmpty() ? this.partitionState.lastVoterSet().listeners(optionalLeaderId.getAsInt()) : Endpoints.fromVoteResponse(this.channel.listenerName(), optionalLeaderId.getAsInt(), data.nodeEndpoints()) : Endpoints.empty(), inbound.source(), j);
        if (maybeHandleCommonResponse.isPresent()) {
            return maybeHandleCommonResponse.get().booleanValue();
        }
        if (forCode2 != Errors.NONE) {
            return handleUnexpectedError(forCode2, inbound);
        }
        if (this.quorum.isLeader()) {
            this.logger.debug("Ignoring vote response {} since we already became leader for epoch {}", partitionData, Integer.valueOf(this.quorum.epoch()));
            return true;
        }
        if (!this.quorum.isCandidate()) {
            this.logger.debug("Ignoring vote response {} since we are no longer a candidate in epoch {}", partitionData, Integer.valueOf(this.quorum.epoch()));
            return true;
        }
        CandidateState candidateStateOrThrow = this.quorum.candidateStateOrThrow();
        if (partitionData.voteGranted()) {
            candidateStateOrThrow.recordGrantedVote(id);
            maybeTransitionToLeader(candidateStateOrThrow, j);
            return true;
        }
        candidateStateOrThrow.recordRejectedVote(id);
        if (!candidateStateOrThrow.isVoteRejected() || candidateStateOrThrow.isBackingOff()) {
            return true;
        }
        this.logger.info("Insufficient remaining votes to become leader (rejected by {}). We will backoff before retrying election again", candidateStateOrThrow.rejectingVoters());
        candidateStateOrThrow.startBackingOff(j, binaryExponentialElectionBackoffMs(candidateStateOrThrow.retries()));
        return true;
    }

    private int binaryExponentialElectionBackoffMs(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("Retries " + i + " should be larger than zero");
        }
        return Math.min(RETRY_BACKOFF_BASE_MS * this.random.nextInt(2 << Math.min(20, i - 1)), this.quorumConfig.electionBackoffMaxMs());
    }

    private int strictExponentialElectionBackoffMs(int i, int i2) {
        if (i == 0) {
            return 0;
        }
        if (i < 0 || i >= i2) {
            return this.quorumConfig.electionBackoffMaxMs();
        }
        return Math.min(this.quorumConfig.electionBackoffMaxMs(), (this.quorumConfig.electionBackoffMaxMs() >> (i2 - 1)) << (i - 1));
    }

    private BeginQuorumEpochResponseData buildBeginQuorumEpochResponse(ListenerName listenerName, short s, Errors errors) {
        return RaftUtil.singletonBeginQuorumEpochResponse(listenerName, s, Errors.NONE, this.log.topicPartition(), errors, this.quorum.epoch(), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints());
    }

    private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest(RaftRequest.Inbound inbound, long j) {
        BeginQuorumEpochRequestData data = inbound.data();
        if (!hasValidClusterId(data.clusterId())) {
            return new BeginQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }
        if (!RaftUtil.hasValidTopicPartition(data, this.log.topicPartition())) {
            return new BeginQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }
        BeginQuorumEpochRequestData.PartitionData partitionData = (BeginQuorumEpochRequestData.PartitionData) ((BeginQuorumEpochRequestData.TopicData) data.topics().get(0)).partitions().get(0);
        int leaderId = partitionData.leaderId();
        int leaderEpoch = partitionData.leaderEpoch();
        Optional<Errors> validateVoterOnlyRequest = validateVoterOnlyRequest(leaderId, leaderEpoch);
        if (validateVoterOnlyRequest.isPresent()) {
            return buildBeginQuorumEpochResponse(inbound.listenerName(), inbound.apiVersion(), validateVoterOnlyRequest.get());
        }
        maybeTransition(OptionalInt.of(leaderId), leaderEpoch, data.leaderEndpoints().isEmpty() ? this.partitionState.lastVoterSet().listeners(leaderId) : Endpoints.fromBeginQuorumEpochRequest(data.leaderEndpoints()), j);
        Optional<ReplicaKey> beginQuorumEpochRequestVoterKey = RaftUtil.beginQuorumEpochRequestVoterKey(data, partitionData);
        if (isValidVoterKey(beginQuorumEpochRequestVoterKey)) {
            return buildBeginQuorumEpochResponse(inbound.listenerName(), inbound.apiVersion(), Errors.NONE);
        }
        this.logger.info("Leader sent a voter key ({}) in the BEGIN_QUORUM_EPOCH request that doesn't match the local key ({}, {}); returning INVALID_VOTER_KEY", new Object[]{beginQuorumEpochRequestVoterKey, this.nodeId, this.nodeDirectoryId});
        return buildBeginQuorumEpochResponse(inbound.listenerName(), inbound.apiVersion(), Errors.INVALID_VOTER_KEY);
    }

    private boolean handleBeginQuorumEpochResponse(RaftResponse.Inbound inbound, long j) {
        int id = inbound.source().id();
        BeginQuorumEpochResponseData data = inbound.data();
        Errors forCode = Errors.forCode(data.errorCode());
        if (forCode != Errors.NONE) {
            return handleTopLevelError(forCode, inbound);
        }
        if (!RaftUtil.hasValidTopicPartition(data, this.log.topicPartition())) {
            return false;
        }
        BeginQuorumEpochResponseData.PartitionData partitionData = (BeginQuorumEpochResponseData.PartitionData) ((BeginQuorumEpochResponseData.TopicData) data.topics().get(0)).partitions().get(0);
        Errors forCode2 = Errors.forCode(partitionData.errorCode());
        OptionalInt optionalLeaderId = optionalLeaderId(partitionData.leaderId());
        Optional<Boolean> maybeHandleCommonResponse = maybeHandleCommonResponse(forCode2, optionalLeaderId, partitionData.leaderEpoch(), optionalLeaderId.isPresent() ? data.nodeEndpoints().isEmpty() ? this.partitionState.lastVoterSet().listeners(optionalLeaderId.getAsInt()) : Endpoints.fromBeginQuorumEpochResponse(this.channel.listenerName(), optionalLeaderId.getAsInt(), data.nodeEndpoints()) : Endpoints.empty(), inbound.source(), j);
        if (maybeHandleCommonResponse.isPresent()) {
            return maybeHandleCommonResponse.get().booleanValue();
        }
        if (forCode2 != Errors.NONE) {
            return handleUnexpectedError(forCode2, inbound);
        }
        if (this.quorum.isLeader()) {
            this.quorum.leaderStateOrThrow().addAcknowledgementFrom(id);
            return true;
        }
        this.logger.debug("Ignoring BeginQuorumEpoch response {} since this node is not the leader anymore", data);
        return true;
    }

    private EndQuorumEpochResponseData buildEndQuorumEpochResponse(ListenerName listenerName, short s, Errors errors) {
        return RaftUtil.singletonEndQuorumEpochResponse(listenerName, s, Errors.NONE, this.log.topicPartition(), errors, this.quorum.epoch(), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints());
    }

    private EndQuorumEpochResponseData handleEndQuorumEpochRequest(RaftRequest.Inbound inbound, long j) {
        EndQuorumEpochRequestData data = inbound.data();
        if (!hasValidClusterId(data.clusterId())) {
            return new EndQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }
        if (!RaftUtil.hasValidTopicPartition(data, this.log.topicPartition())) {
            return new EndQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }
        EndQuorumEpochRequestData.PartitionData partitionData = (EndQuorumEpochRequestData.PartitionData) ((EndQuorumEpochRequestData.TopicData) data.topics().get(0)).partitions().get(0);
        int leaderEpoch = partitionData.leaderEpoch();
        int leaderId = partitionData.leaderId();
        Optional<Errors> validateVoterOnlyRequest = validateVoterOnlyRequest(leaderId, leaderEpoch);
        if (validateVoterOnlyRequest.isPresent()) {
            return buildEndQuorumEpochResponse(inbound.listenerName(), inbound.apiVersion(), validateVoterOnlyRequest.get());
        }
        maybeTransition(OptionalInt.of(leaderId), leaderEpoch, data.leaderEndpoints().isEmpty() ? this.partitionState.lastVoterSet().listeners(leaderId) : Endpoints.fromEndQuorumEpochRequest(data.leaderEndpoints()), j);
        if (this.quorum.isFollower()) {
            FollowerState followerStateOrThrow = this.quorum.followerStateOrThrow();
            if (followerStateOrThrow.leaderId() == leaderId) {
                long endEpochElectionBackoff = endEpochElectionBackoff((List) EndQuorumEpochRequest.preferredCandidates(partitionData).stream().map(replicaInfo -> {
                    return ReplicaKey.of(replicaInfo.candidateId(), replicaInfo.candidateDirectoryId());
                }).collect(Collectors.toList()));
                this.logger.debug("Overriding follower fetch timeout to {} after receiving EndQuorumEpoch request from leader {} in epoch {}", new Object[]{Long.valueOf(endEpochElectionBackoff), Integer.valueOf(leaderId), Integer.valueOf(leaderEpoch)});
                followerStateOrThrow.overrideFetchTimeout(j, endEpochElectionBackoff);
            }
        }
        return buildEndQuorumEpochResponse(inbound.listenerName(), inbound.apiVersion(), Errors.NONE);
    }

    private long endEpochElectionBackoff(Collection<ReplicaKey> collection) {
        int i = 0;
        for (ReplicaKey replicaKey : collection) {
            if (replicaKey.id() == this.quorum.localIdOrThrow() && (!replicaKey.directoryId().isPresent() || replicaKey.directoryId().get().equals(this.quorum.localDirectoryId()))) {
                break;
            }
            i++;
        }
        return strictExponentialElectionBackoffMs(i, collection.size());
    }

    private boolean handleEndQuorumEpochResponse(RaftResponse.Inbound inbound, long j) {
        EndQuorumEpochResponseData data = inbound.data();
        Errors forCode = Errors.forCode(data.errorCode());
        if (forCode != Errors.NONE) {
            return handleTopLevelError(forCode, inbound);
        }
        if (!RaftUtil.hasValidTopicPartition(data, this.log.topicPartition())) {
            return false;
        }
        EndQuorumEpochResponseData.PartitionData partitionData = (EndQuorumEpochResponseData.PartitionData) ((EndQuorumEpochResponseData.TopicData) data.topics().get(0)).partitions().get(0);
        Errors forCode2 = Errors.forCode(partitionData.errorCode());
        OptionalInt optionalLeaderId = optionalLeaderId(partitionData.leaderId());
        Optional<Boolean> maybeHandleCommonResponse = maybeHandleCommonResponse(forCode2, optionalLeaderId, partitionData.leaderEpoch(), optionalLeaderId.isPresent() ? data.nodeEndpoints().isEmpty() ? this.partitionState.lastVoterSet().listeners(optionalLeaderId.getAsInt()) : Endpoints.fromEndQuorumEpochResponse(this.channel.listenerName(), optionalLeaderId.getAsInt(), data.nodeEndpoints()) : Endpoints.empty(), inbound.source(), j);
        if (maybeHandleCommonResponse.isPresent()) {
            return maybeHandleCommonResponse.get().booleanValue();
        }
        if (forCode2 != Errors.NONE) {
            return handleUnexpectedError(forCode2, inbound);
        }
        this.quorum.resignedStateOrThrow().acknowledgeResignation(inbound.source().id());
        return true;
    }

    private FetchResponseData buildFetchResponse(ListenerName listenerName, short s, Errors errors, Records records, ValidOffsetAndEpoch validOffsetAndEpoch, Optional<LogOffsetMetadata> optional) {
        return RaftUtil.singletonFetchResponse(listenerName, s, this.log.topicPartition(), this.log.topicId(), Errors.NONE, this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints(), partitionData -> {
            partitionData.setRecords(records).setErrorCode(errors.code()).setLogStartOffset(this.log.startOffset()).setHighWatermark(((Long) optional.map((v0) -> {
                return v0.offset();
            }).orElse(-1L)).longValue());
            partitionData.currentLeader().setLeaderEpoch(this.quorum.epoch()).setLeaderId(this.quorum.leaderIdOrSentinel());
            switch (AnonymousClass1.$SwitchMap$org$apache$kafka$raft$ValidOffsetAndEpoch$Kind[validOffsetAndEpoch.kind().ordinal()]) {
                case QuorumStateData.HIGHEST_SUPPORTED_VERSION /* 1 */:
                    partitionData.divergingEpoch().setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch()).setEndOffset(validOffsetAndEpoch.offsetAndEpoch().offset());
                    return;
                case 2:
                    partitionData.snapshotId().setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch()).setEndOffset(validOffsetAndEpoch.offsetAndEpoch().offset());
                    return;
                default:
                    return;
            }
        });
    }

    private FetchResponseData buildEmptyFetchResponse(ListenerName listenerName, short s, Errors errors, Optional<LogOffsetMetadata> optional) {
        return buildFetchResponse(listenerName, s, errors, MemoryRecords.EMPTY, ValidOffsetAndEpoch.valid(), optional);
    }

    private boolean hasValidClusterId(String str) {
        if (str == null) {
            return true;
        }
        return this.clusterId.equals(str);
    }

    private CompletableFuture<FetchResponseData> handleFetchRequest(RaftRequest.Inbound inbound, long j) {
        FetchRequestData data = inbound.data();
        if (!hasValidClusterId(data.clusterId())) {
            return CompletableFuture.completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
        }
        if (!RaftUtil.hasValidTopicPartition(data, this.log.topicPartition(), this.log.topicId())) {
            return CompletableFuture.completedFuture(new FetchResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
        }
        ((FetchRequestData.FetchTopic) data.topics().get(0)).setTopic(this.log.topicPartition().topic());
        FetchRequestData.FetchPartition fetchPartition = (FetchRequestData.FetchPartition) ((FetchRequestData.FetchTopic) data.topics().get(0)).partitions().get(0);
        if (data.maxWaitMs() < 0 || fetchPartition.fetchOffset() < 0 || fetchPartition.lastFetchedEpoch() < 0 || fetchPartition.lastFetchedEpoch() > fetchPartition.currentLeaderEpoch()) {
            return CompletableFuture.completedFuture(buildEmptyFetchResponse(inbound.listenerName(), inbound.apiVersion(), Errors.INVALID_REQUEST, Optional.empty()));
        }
        ReplicaKey of = ReplicaKey.of(FetchRequest.replicaId(data), fetchPartition.replicaDirectoryId());
        FetchResponseData tryCompleteFetchRequest = tryCompleteFetchRequest(inbound.listenerName(), inbound.apiVersion(), of, fetchPartition, j);
        FetchResponseData.PartitionData partitionData = (FetchResponseData.PartitionData) ((FetchResponseData.FetchableTopicResponse) tryCompleteFetchRequest.responses().get(0)).partitions().get(0);
        return (partitionData.errorCode() != Errors.NONE.code() || FetchResponse.recordsSize(partitionData) > 0 || data.maxWaitMs() == 0 || isPartitionDiverged(partitionData) || isPartitionSnapshotted(partitionData)) ? CompletableFuture.completedFuture(tryCompleteFetchRequest) : this.fetchPurgatory.await(Long.valueOf(fetchPartition.fetchOffset()), data.maxWaitMs()).handle((l, th) -> {
            if (th == null) {
                this.logger.trace("Completing delayed fetch from {} starting at offset {} at {}", new Object[]{of, Long.valueOf(fetchPartition.fetchOffset()), l});
                return tryCompleteFetchRequest(inbound.listenerName(), inbound.apiVersion(), of, fetchPartition, this.time.milliseconds());
            }
            Errors forException = Errors.forException(th instanceof ExecutionException ? th.getCause() : th);
            if (forException == Errors.REQUEST_TIMED_OUT) {
                return tryCompleteFetchRequest;
            }
            this.logger.info("Failed to handle fetch from {} at {} due to {}", new Object[]{of, Long.valueOf(fetchPartition.fetchOffset()), forException});
            return buildEmptyFetchResponse(inbound.listenerName(), inbound.apiVersion(), forException, Optional.empty());
        });
    }

    private FetchResponseData tryCompleteFetchRequest(ListenerName listenerName, short s, ReplicaKey replicaKey, FetchRequestData.FetchPartition fetchPartition, long j) {
        Records records;
        try {
            Optional<Errors> validateLeaderOnlyRequest = validateLeaderOnlyRequest(fetchPartition.currentLeaderEpoch());
            if (validateLeaderOnlyRequest.isPresent()) {
                return buildEmptyFetchResponse(listenerName, s, validateLeaderOnlyRequest.get(), Optional.empty());
            }
            long fetchOffset = fetchPartition.fetchOffset();
            int lastFetchedEpoch = fetchPartition.lastFetchedEpoch();
            LeaderState<T> leaderStateOrThrow = this.quorum.leaderStateOrThrow();
            Optional<OffsetAndEpoch> latestSnapshotId = this.log.latestSnapshotId();
            ValidOffsetAndEpoch snapshot = (fetchOffset == 0 && latestSnapshotId.isPresent() && !latestSnapshotId.get().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) ? ValidOffsetAndEpoch.snapshot(latestSnapshotId.get()) : this.log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
            if (snapshot.kind() == ValidOffsetAndEpoch.Kind.VALID) {
                LogFetchInfo read = this.log.read(fetchOffset, Isolation.UNCOMMITTED);
                if (leaderStateOrThrow.updateReplicaState(replicaKey, j, read.startOffsetMetadata)) {
                    onUpdateLeaderHighWatermark(leaderStateOrThrow, j);
                }
                records = read.records;
            } else {
                records = MemoryRecords.EMPTY;
            }
            return buildFetchResponse(listenerName, s, Errors.NONE, records, snapshot, leaderStateOrThrow.highWatermark());
        } catch (Exception e) {
            this.logger.error("Caught unexpected error in fetch completion of request {}", fetchPartition, e);
            return buildEmptyFetchResponse(listenerName, s, Errors.UNKNOWN_SERVER_ERROR, Optional.empty());
        }
    }

    private static boolean isPartitionDiverged(FetchResponseData.PartitionData partitionData) {
        FetchResponseData.EpochEndOffset divergingEpoch = partitionData.divergingEpoch();
        return (divergingEpoch.epoch() == -1 && divergingEpoch.endOffset() == -1) ? false : true;
    }

    private static boolean isPartitionSnapshotted(FetchResponseData.PartitionData partitionData) {
        FetchResponseData.SnapshotId snapshotId = partitionData.snapshotId();
        return (snapshotId.epoch() == -1 && snapshotId.endOffset() == -1) ? false : true;
    }

    private static OptionalInt optionalLeaderId(int i) {
        return i < 0 ? OptionalInt.empty() : OptionalInt.of(i);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String listenerName(RaftClient.Listener<?> listener) {
        return String.format("%s@%d", listener.getClass().getTypeName(), Integer.valueOf(System.identityHashCode(listener)));
    }

    private boolean handleFetchResponse(RaftResponse.Inbound inbound, long j) {
        FetchResponseData data = inbound.data();
        Errors forCode = Errors.forCode(data.errorCode());
        if (forCode != Errors.NONE) {
            return handleTopLevelError(forCode, inbound);
        }
        if (!RaftUtil.hasValidTopicPartition(data, this.log.topicPartition(), this.log.topicId())) {
            return false;
        }
        ((FetchResponseData.FetchableTopicResponse) data.responses().get(0)).setTopic(this.log.topicPartition().topic());
        FetchResponseData.PartitionData partitionData = (FetchResponseData.PartitionData) ((FetchResponseData.FetchableTopicResponse) data.responses().get(0)).partitions().get(0);
        FetchResponseData.LeaderIdAndEpoch currentLeader = partitionData.currentLeader();
        OptionalInt optionalLeaderId = optionalLeaderId(currentLeader.leaderId());
        int leaderEpoch = currentLeader.leaderEpoch();
        Errors forCode2 = Errors.forCode(partitionData.errorCode());
        Optional<Boolean> maybeHandleCommonResponse = maybeHandleCommonResponse(forCode2, optionalLeaderId, leaderEpoch, optionalLeaderId.isPresent() ? data.nodeEndpoints().isEmpty() ? this.partitionState.lastVoterSet().listeners(optionalLeaderId.getAsInt()) : Endpoints.fromFetchResponse(this.channel.listenerName(), optionalLeaderId.getAsInt(), data.nodeEndpoints()) : Endpoints.empty(), inbound.source(), j);
        if (maybeHandleCommonResponse.isPresent()) {
            return maybeHandleCommonResponse.get().booleanValue();
        }
        FollowerState followerStateOrThrow = this.quorum.followerStateOrThrow();
        if (forCode2 != Errors.NONE) {
            return handleUnexpectedError(forCode2, inbound);
        }
        FetchResponseData.EpochEndOffset divergingEpoch = partitionData.divergingEpoch();
        if (divergingEpoch.epoch() >= 0) {
            OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(divergingEpoch.endOffset(), divergingEpoch.epoch());
            followerStateOrThrow.highWatermark().ifPresent(logOffsetMetadata -> {
                if (offsetAndEpoch.offset() < logOffsetMetadata.offset()) {
                    throw new KafkaException("The leader requested truncation to offset " + offsetAndEpoch.offset() + ", which is below the current high watermark " + logOffsetMetadata);
                }
            });
            long truncateToEndOffset = this.log.truncateToEndOffset(offsetAndEpoch);
            this.logger.info("Truncated to offset {} from Fetch response from leader {}", Long.valueOf(truncateToEndOffset), Integer.valueOf(this.quorum.leaderIdOrSentinel()));
            this.partitionState.truncateNewEntries(truncateToEndOffset);
        } else if (partitionData.snapshotId().epoch() < 0 && partitionData.snapshotId().endOffset() < 0) {
            Records recordsOrFail = FetchResponse.recordsOrFail(partitionData);
            if (recordsOrFail.sizeInBytes() > 0) {
                appendAsFollower(recordsOrFail);
            }
            updateFollowerHighWatermark(followerStateOrThrow, partitionData.highWatermark() < 0 ? OptionalLong.empty() : OptionalLong.of(partitionData.highWatermark()));
        } else {
            if (partitionData.snapshotId().epoch() < 0) {
                this.logger.error("The leader sent a snapshot id with a valid end offset {} but with an invalid epoch {}", Long.valueOf(partitionData.snapshotId().endOffset()), Integer.valueOf(partitionData.snapshotId().epoch()));
                return false;
            }
            if (partitionData.snapshotId().endOffset() < 0) {
                this.logger.error("The leader sent a snapshot id with a valid epoch {} but with an invalid end offset {}", Integer.valueOf(partitionData.snapshotId().epoch()), Long.valueOf(partitionData.snapshotId().endOffset()));
                return false;
            }
            OffsetAndEpoch offsetAndEpoch2 = new OffsetAndEpoch(partitionData.snapshotId().endOffset(), partitionData.snapshotId().epoch());
            followerStateOrThrow.setFetchingSnapshot(this.log.createNewSnapshotUnchecked(offsetAndEpoch2));
            if (followerStateOrThrow.fetchingSnapshot().isPresent()) {
                this.logger.info("Fetching snapshot {} from Fetch response from leader {}", offsetAndEpoch2, Integer.valueOf(this.quorum.leaderIdOrSentinel()));
            } else {
                this.logger.info("Leader {} returned a snapshot {} in the FETCH response which is already stored", Integer.valueOf(this.quorum.leaderIdOrSentinel()), offsetAndEpoch2);
            }
        }
        followerStateOrThrow.resetFetchTimeout(j);
        return true;
    }

    private void appendAsFollower(Records records) {
        LogAppendInfo appendAsFollower = this.log.appendAsFollower(records);
        if (this.quorum.isVoter() || this.followersAlwaysFlush) {
            this.log.flush(false);
        }
        this.partitionState.updateState();
        OffsetAndEpoch endOffset = endOffset();
        this.kafkaRaftMetrics.updateFetchedRecords((appendAsFollower.lastOffset - appendAsFollower.firstOffset) + 1);
        this.kafkaRaftMetrics.updateLogEnd(endOffset);
        this.logger.trace("Follower end offset updated to {} after append", endOffset);
    }

    private LogAppendInfo appendAsLeader(Records records) {
        LogAppendInfo appendAsLeader = this.log.appendAsLeader(records, this.quorum.epoch());
        this.partitionState.updateState();
        OffsetAndEpoch endOffset = endOffset();
        this.kafkaRaftMetrics.updateAppendRecords((appendAsLeader.lastOffset - appendAsLeader.firstOffset) + 1);
        this.kafkaRaftMetrics.updateLogEnd(endOffset);
        this.logger.trace("Leader appended records at base offset {}, new end offset is {}", Long.valueOf(appendAsLeader.firstOffset), endOffset);
        return appendAsLeader;
    }

    private DescribeQuorumResponseData handleDescribeQuorumRequest(RaftRequest.Inbound inbound, long j) {
        DescribeQuorumRequestData data = inbound.data();
        if (!RaftUtil.hasValidTopicPartition(data, this.log.topicPartition())) {
            return DescribeQuorumRequest.getPartitionLevelErrorResponse(data, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        }
        if (!this.quorum.isLeader()) {
            return DescribeQuorumResponse.singletonErrorResponse(this.log.topicPartition(), Errors.NOT_LEADER_OR_FOLLOWER);
        }
        LeaderState<T> leaderStateOrThrow = this.quorum.leaderStateOrThrow();
        return RaftUtil.singletonDescribeQuorumResponse(inbound.apiVersion(), this.log.topicPartition(), this.quorum.localIdOrThrow(), leaderStateOrThrow.epoch(), ((Long) leaderStateOrThrow.highWatermark().map((v0) -> {
            return v0.offset();
        }).orElse(-1L)).longValue(), leaderStateOrThrow.voterStates().values(), leaderStateOrThrow.observerStates(j).values(), j);
    }

    private FetchSnapshotResponseData handleFetchSnapshotRequest(RaftRequest.Inbound inbound, long j) {
        int i;
        FetchSnapshotRequestData data = inbound.data();
        if (!hasValidClusterId(data.clusterId())) {
            return new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }
        if (data.topics().size() != 1 && ((FetchSnapshotRequestData.TopicSnapshot) data.topics().get(0)).partitions().size() != 1) {
            return FetchSnapshotResponse.withTopLevelError(Errors.INVALID_REQUEST);
        }
        Optional forTopicPartition = FetchSnapshotRequest.forTopicPartition(data, this.log.topicPartition());
        if (!forTopicPartition.isPresent()) {
            return RaftUtil.singletonFetchSnapshotResponse(inbound.listenerName(), inbound.apiVersion(), new TopicPartition(((FetchSnapshotRequestData.TopicSnapshot) data.topics().get(0)).name(), ((FetchSnapshotRequestData.PartitionSnapshot) ((FetchSnapshotRequestData.TopicSnapshot) data.topics().get(0)).partitions().get(0)).partition()), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints(), partitionSnapshot -> {
                return partitionSnapshot.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
            });
        }
        FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot2 = (FetchSnapshotRequestData.PartitionSnapshot) forTopicPartition.get();
        Optional<Errors> validateLeaderOnlyRequest = validateLeaderOnlyRequest(partitionSnapshot2.currentLeaderEpoch());
        if (validateLeaderOnlyRequest.isPresent()) {
            return RaftUtil.singletonFetchSnapshotResponse(inbound.listenerName(), inbound.apiVersion(), this.log.topicPartition(), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints(), partitionSnapshot3 -> {
                return addQuorumLeader(partitionSnapshot3).setErrorCode(((Errors) validateLeaderOnlyRequest.get()).code());
            });
        }
        OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(partitionSnapshot2.snapshotId().endOffset(), partitionSnapshot2.snapshotId().epoch());
        Optional<RawSnapshotReader> readSnapshot = this.log.readSnapshot(offsetAndEpoch);
        if (!readSnapshot.isPresent() || offsetAndEpoch.equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
            return RaftUtil.singletonFetchSnapshotResponse(inbound.listenerName(), inbound.apiVersion(), this.log.topicPartition(), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints(), partitionSnapshot4 -> {
                return addQuorumLeader(partitionSnapshot4).setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code());
            });
        }
        RawSnapshotReader rawSnapshotReader = readSnapshot.get();
        long sizeInBytes = rawSnapshotReader.sizeInBytes();
        if (partitionSnapshot2.position() < 0 || partitionSnapshot2.position() >= sizeInBytes) {
            return RaftUtil.singletonFetchSnapshotResponse(inbound.listenerName(), inbound.apiVersion(), this.log.topicPartition(), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints(), partitionSnapshot5 -> {
                return addQuorumLeader(partitionSnapshot5).setErrorCode(Errors.POSITION_OUT_OF_RANGE.code());
            });
        }
        if (partitionSnapshot2.position() > 2147483647L) {
            throw new IllegalStateException(String.format("Trying to fetch a snapshot with size (%d) and a position (%d) larger than %d", Long.valueOf(sizeInBytes), Long.valueOf(partitionSnapshot2.position()), Integer.MAX_VALUE));
        }
        try {
            i = Math.toIntExact(sizeInBytes);
        } catch (ArithmeticException e) {
            i = Integer.MAX_VALUE;
        }
        UnalignedRecords slice = rawSnapshotReader.slice(partitionSnapshot2.position(), Math.min(data.maxBytes(), i));
        this.quorum.leaderStateOrThrow().updateCheckQuorumForFollowingVoter(ReplicaKey.of(data.replicaId(), partitionSnapshot2.replicaDirectoryId()), j);
        return RaftUtil.singletonFetchSnapshotResponse(inbound.listenerName(), inbound.apiVersion(), this.log.topicPartition(), this.quorum.leaderIdOrSentinel(), this.quorum.leaderEndpoints(), partitionSnapshot6 -> {
            addQuorumLeader(partitionSnapshot6).snapshotId().setEndOffset(offsetAndEpoch.offset()).setEpoch(offsetAndEpoch.epoch());
            return partitionSnapshot6.setSize(sizeInBytes).setPosition(partitionSnapshot2.position()).setUnalignedRecords(slice);
        });
    }

    private Endpoints computeFetchSnapshotLeaderEndpoints(OptionalInt optionalInt, FetchSnapshotResponseData.NodeEndpointCollection nodeEndpointCollection) {
        Endpoints empty = Endpoints.empty();
        if (optionalInt.isPresent()) {
            empty = Endpoints.fromFetchSnapshotResponse(this.channel.listenerName(), optionalInt.getAsInt(), nodeEndpointCollection);
            if (empty.isEmpty()) {
                empty = this.partitionState.lastVoterSet().listeners(optionalInt.getAsInt());
            }
        }
        return empty;
    }

    private boolean handleFetchSnapshotResponse(RaftResponse.Inbound inbound, long j) {
        UnalignedMemoryRecords unalignedMemoryRecords;
        FetchSnapshotResponseData data = inbound.data();
        Errors forCode = Errors.forCode(data.errorCode());
        if (forCode != Errors.NONE) {
            return handleTopLevelError(forCode, inbound);
        }
        if (data.topics().size() != 1 && ((FetchSnapshotResponseData.TopicSnapshot) data.topics().get(0)).partitions().size() != 1) {
            return false;
        }
        Optional forTopicPartition = FetchSnapshotResponse.forTopicPartition(data, this.log.topicPartition());
        if (!forTopicPartition.isPresent()) {
            return false;
        }
        FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot = (FetchSnapshotResponseData.PartitionSnapshot) forTopicPartition.get();
        FetchSnapshotResponseData.LeaderIdAndEpoch currentLeader = partitionSnapshot.currentLeader();
        OptionalInt optionalLeaderId = optionalLeaderId(currentLeader.leaderId());
        Optional<Boolean> maybeHandleCommonResponse = maybeHandleCommonResponse(Errors.forCode(partitionSnapshot.errorCode()), optionalLeaderId, currentLeader.leaderEpoch(), computeFetchSnapshotLeaderEndpoints(optionalLeaderId, data.nodeEndpoints()), inbound.source(), j);
        if (maybeHandleCommonResponse.isPresent()) {
            return maybeHandleCommonResponse.get().booleanValue();
        }
        FollowerState followerStateOrThrow = this.quorum.followerStateOrThrow();
        if (Errors.forCode(partitionSnapshot.errorCode()) == Errors.SNAPSHOT_NOT_FOUND || partitionSnapshot.snapshotId().endOffset() < 0 || partitionSnapshot.snapshotId().epoch() < 0) {
            this.logger.info("Leader doesn't know about snapshot id {}, returned error {} and snapshot id {}", new Object[]{followerStateOrThrow.fetchingSnapshot(), Short.valueOf(partitionSnapshot.errorCode()), partitionSnapshot.snapshotId()});
            followerStateOrThrow.setFetchingSnapshot(Optional.empty());
            followerStateOrThrow.resetFetchTimeout(j);
            return true;
        }
        OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(partitionSnapshot.snapshotId().endOffset(), partitionSnapshot.snapshotId().epoch());
        if (!followerStateOrThrow.fetchingSnapshot().isPresent()) {
            throw new IllegalStateException(String.format("Received unexpected fetch snapshot response: %s", partitionSnapshot));
        }
        RawSnapshotWriter rawSnapshotWriter = followerStateOrThrow.fetchingSnapshot().get();
        if (!rawSnapshotWriter.snapshotId().equals(offsetAndEpoch)) {
            throw new IllegalStateException(String.format("Received fetch snapshot response with an invalid id. Expected %s; Received %s", rawSnapshotWriter.snapshotId(), offsetAndEpoch));
        }
        if (rawSnapshotWriter.sizeInBytes() != partitionSnapshot.position()) {
            throw new IllegalStateException(String.format("Received fetch snapshot response with an invalid position. Expected %d; Received %d", Long.valueOf(rawSnapshotWriter.sizeInBytes()), Long.valueOf(partitionSnapshot.position())));
        }
        if (partitionSnapshot.unalignedRecords() instanceof MemoryRecords) {
            unalignedMemoryRecords = new UnalignedMemoryRecords(partitionSnapshot.unalignedRecords().buffer());
        } else {
            if (!(partitionSnapshot.unalignedRecords() instanceof UnalignedMemoryRecords)) {
                throw new IllegalStateException(String.format("Received unexpected fetch snapshot response: %s", partitionSnapshot));
            }
            unalignedMemoryRecords = (UnalignedMemoryRecords) partitionSnapshot.unalignedRecords();
        }
        rawSnapshotWriter.append(unalignedMemoryRecords);
        if (rawSnapshotWriter.sizeInBytes() == partitionSnapshot.size()) {
            rawSnapshotWriter.freeze();
            followerStateOrThrow.setFetchingSnapshot(Optional.empty());
            if (!this.log.truncateToLatestSnapshot()) {
                throw new IllegalStateException(String.format("Full log truncation expected but didn't happen. Snapshot of %s, log end offset %s, last fetched %d", rawSnapshotWriter.snapshotId(), this.log.endOffset(), Integer.valueOf(this.log.lastFetchedEpoch())));
            }
            this.logger.info("Fully truncated the log at ({}, {}) after downloading snapshot {} from leader {}", new Object[]{this.log.endOffset(), Integer.valueOf(this.log.lastFetchedEpoch()), rawSnapshotWriter.snapshotId(), Integer.valueOf(this.quorum.leaderIdOrSentinel())});
            this.partitionState.updateState();
            updateFollowerHighWatermark(followerStateOrThrow, OptionalLong.of(this.log.highWatermark().offset()));
        }
        followerStateOrThrow.resetFetchTimeout(j);
        return true;
    }

    private CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(RaftRequest.Inbound inbound, long j) {
        AddRaftVoterRequestData data = inbound.data();
        if (!hasValidClusterId(data.clusterId())) {
            return CompletableFuture.completedFuture(new AddRaftVoterResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()).setErrorMessage(String.format("The given id \"%s\" doesn't match the cluster id \"%s\"", data.clusterId(), this.clusterId)));
        }
        Optional<Errors> validateLeaderOnlyRequest = validateLeaderOnlyRequest(this.quorum.epoch());
        if (validateLeaderOnlyRequest.isPresent()) {
            return CompletableFuture.completedFuture(new AddRaftVoterResponseData().setErrorCode(validateLeaderOnlyRequest.get().code()));
        }
        Optional<ReplicaKey> addVoterRequestVoterKey = RaftUtil.addVoterRequestVoterKey(data);
        if (!addVoterRequestVoterKey.isPresent() || !addVoterRequestVoterKey.get().directoryId().isPresent()) {
            return CompletableFuture.completedFuture(new AddRaftVoterResponseData().setErrorCode(Errors.INVALID_REQUEST.code()).setErrorMessage("Add voter request didn't include a valid voter"));
        }
        Endpoints fromAddVoterRequest = Endpoints.fromAddVoterRequest(data.listeners());
        return !fromAddVoterRequest.address(this.channel.listenerName()).isPresent() ? CompletableFuture.completedFuture(new AddRaftVoterResponseData().setErrorCode(Errors.INVALID_REQUEST.code()).setErrorMessage(String.format("Add voter request didn't include the endpoint (%s) for the default listener %s", fromAddVoterRequest, this.channel.listenerName()))) : this.addVoterHandler.handleAddVoterRequest(this.quorum.leaderStateOrThrow(), addVoterRequestVoterKey.get(), fromAddVoterRequest, j);
    }

    private boolean handleApiVersionsResponse(RaftResponse.Inbound inbound, long j) {
        if (!this.quorum.isLeader()) {
            return true;
        }
        ApiVersionsResponseData data = inbound.data();
        return this.addVoterHandler.handleApiVersionsResponse(this.quorum.leaderStateOrThrow(), inbound.source(), Errors.forCode(data.errorCode()), Optional.ofNullable(data.supportedFeatures().find("kraft.version")), j);
    }

    private CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(RaftRequest.Inbound inbound, long j) {
        RemoveRaftVoterRequestData data = inbound.data();
        if (!hasValidClusterId(data.clusterId())) {
            return CompletableFuture.completedFuture(new RemoveRaftVoterResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()).setErrorMessage(String.format("The given id \"%s\" doesn't match the cluster id \"%s\"", data.clusterId(), this.clusterId)));
        }
        Optional<Errors> validateLeaderOnlyRequest = validateLeaderOnlyRequest(this.quorum.epoch());
        if (validateLeaderOnlyRequest.isPresent()) {
            return CompletableFuture.completedFuture(new RemoveRaftVoterResponseData().setErrorCode(validateLeaderOnlyRequest.get().code()));
        }
        Optional<ReplicaKey> removeVoterRequestVoterKey = RaftUtil.removeVoterRequestVoterKey(data);
        return (removeVoterRequestVoterKey.isPresent() && removeVoterRequestVoterKey.get().directoryId().isPresent()) ? this.removeVoterHandler.handleRemoveVoterRequest(this.quorum.leaderStateOrThrow(), removeVoterRequestVoterKey.get(), j) : CompletableFuture.completedFuture(new RemoveRaftVoterResponseData().setErrorCode(Errors.INVALID_REQUEST.code()).setErrorMessage("Remove voter request didn't include a valid voter"));
    }

    private CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest(RaftRequest.Inbound inbound, long j) {
        UpdateRaftVoterRequestData data = inbound.data();
        if (!hasValidClusterId(data.clusterId())) {
            return CompletableFuture.completedFuture(RaftUtil.updateVoterResponse(Errors.INCONSISTENT_CLUSTER_ID, inbound.listenerName(), this.quorum.leaderAndEpoch(), this.quorum.leaderEndpoints()));
        }
        Optional<Errors> validateLeaderOnlyRequest = validateLeaderOnlyRequest(data.currentLeaderEpoch());
        if (validateLeaderOnlyRequest.isPresent()) {
            return CompletableFuture.completedFuture(RaftUtil.updateVoterResponse(validateLeaderOnlyRequest.get(), inbound.listenerName(), this.quorum.leaderAndEpoch(), this.quorum.leaderEndpoints()));
        }
        Optional<ReplicaKey> updateVoterRequestVoterKey = RaftUtil.updateVoterRequestVoterKey(data);
        if (!updateVoterRequestVoterKey.isPresent() || !updateVoterRequestVoterKey.get().directoryId().isPresent()) {
            return CompletableFuture.completedFuture(RaftUtil.updateVoterResponse(Errors.INVALID_REQUEST, inbound.listenerName(), this.quorum.leaderAndEpoch(), this.quorum.leaderEndpoints()));
        }
        Endpoints fromUpdateVoterRequest = Endpoints.fromUpdateVoterRequest(data.listeners());
        if (!fromUpdateVoterRequest.address(this.channel.listenerName()).isPresent()) {
            return CompletableFuture.completedFuture(RaftUtil.updateVoterResponse(Errors.INVALID_REQUEST, inbound.listenerName(), this.quorum.leaderAndEpoch(), this.quorum.leaderEndpoints()));
        }
        UpdateRaftVoterRequestData.KRaftVersionFeature kRaftVersionFeature = data.kRaftVersionFeature();
        return (kRaftVersionFeature.minSupportedVersion() < 0 || kRaftVersionFeature.maxSupportedVersion() < 0 || kRaftVersionFeature.maxSupportedVersion() < kRaftVersionFeature.minSupportedVersion()) ? CompletableFuture.completedFuture(RaftUtil.updateVoterResponse(Errors.INVALID_REQUEST, inbound.listenerName(), this.quorum.leaderAndEpoch(), this.quorum.leaderEndpoints())) : this.updateVoterHandler.handleUpdateVoterRequest(this.quorum.leaderStateOrThrow(), inbound.listenerName(), updateVoterRequestVoterKey.get(), fromUpdateVoterRequest, kRaftVersionFeature, j);
    }

    private boolean handleUpdateVoterResponse(RaftResponse.Inbound inbound, long j) {
        UpdateRaftVoterResponseData data = inbound.data();
        Errors forCode = Errors.forCode(data.errorCode());
        OptionalInt optionalLeaderId = optionalLeaderId(data.currentLeader().leaderId());
        return maybeHandleCommonResponse(forCode, optionalLeaderId, data.currentLeader().leaderEpoch(), (optionalLeaderId.isPresent() && data.currentLeader().host().isEmpty()) ? Endpoints.fromInetSocketAddresses(Collections.singletonMap(this.channel.listenerName(), InetSocketAddress.createUnresolved(data.currentLeader().host(), data.currentLeader().port()))) : Endpoints.empty(), inbound.source(), j).orElse(true).booleanValue();
    }

    private boolean hasConsistentLeader(int i, OptionalInt optionalInt) {
        return (optionalInt.isPresent() && optionalInt.getAsInt() == this.quorum.localIdOrSentinel()) ? this.quorum.isLeader() : (i == this.quorum.epoch() && optionalInt.isPresent() && this.quorum.leaderId().isPresent() && !optionalInt.equals(this.quorum.leaderId())) ? false : true;
    }

    private Optional<Boolean> maybeHandleCommonResponse(Errors errors, OptionalInt optionalInt, int i, Endpoints endpoints, Node node, long j) {
        if (endpoints.isEmpty() && optionalInt.isPresent()) {
            endpoints = this.partitionState.lastVoterSet().listeners(optionalInt.getAsInt());
        }
        if (i < this.quorum.epoch() || errors == Errors.UNKNOWN_LEADER_EPOCH) {
            return Optional.of(true);
        }
        if (i > this.quorum.epoch() || errors == Errors.FENCED_LEADER_EPOCH || errors == Errors.NOT_LEADER_OR_FOLLOWER) {
            maybeTransition(optionalInt, i, endpoints, j);
            return Optional.of(true);
        }
        if (i == this.quorum.epoch() && optionalInt.isPresent() && !this.quorum.hasLeader()) {
            transitionToFollower(i, optionalInt.getAsInt(), endpoints, j);
            return errors == Errors.NONE ? Optional.empty() : Optional.of(true);
        }
        if (errors == Errors.BROKER_NOT_AVAILABLE) {
            return Optional.of(false);
        }
        if (errors == Errors.INVALID_VOTER_KEY) {
            this.logger.info("Voter key for VOTE or BEGIN_QUORUM_EPOCH request didn't match the receiver's replica key: {}", node);
            return Optional.of(true);
        }
        if (errors == Errors.INVALID_REQUEST) {
            throw new IllegalStateException("Received unexpected invalid request error");
        }
        return Optional.empty();
    }

    private void maybeTransition(OptionalInt optionalInt, int i, Endpoints endpoints, long j) {
        if (!hasConsistentLeader(i, optionalInt)) {
            throw new IllegalStateException("Received request or response with leader " + optionalInt + " and epoch " + i + " which is inconsistent with current leader " + this.quorum.leaderId() + " and epoch " + this.quorum.epoch());
        }
        if (i > this.quorum.epoch()) {
            if (optionalInt.isPresent()) {
                transitionToFollower(i, optionalInt.getAsInt(), endpoints, j);
                return;
            } else {
                transitionToUnattached(i);
                return;
            }
        }
        if (optionalInt.isPresent()) {
            if (!this.quorum.hasLeader() || endpoints.size() > this.quorum.leaderEndpoints().size()) {
                transitionToFollower(i, optionalInt.getAsInt(), endpoints, j);
            }
        }
    }

    private boolean handleTopLevelError(Errors errors, RaftResponse.Inbound inbound) {
        if (errors == Errors.BROKER_NOT_AVAILABLE) {
            return false;
        }
        if (errors == Errors.CLUSTER_AUTHORIZATION_FAILED) {
            throw new ClusterAuthorizationException("Received cluster authorization error in response " + inbound);
        }
        return handleUnexpectedError(errors, inbound);
    }

    private boolean handleUnexpectedError(Errors errors, RaftResponse.Inbound inbound) {
        this.logger.error("Unexpected error {} in {} response: {}", new Object[]{errors, ApiKeys.forId(inbound.data().apiKey()), inbound});
        return false;
    }

    private void handleResponse(RaftResponse.Inbound inbound, long j) {
        boolean handleUpdateVoterResponse;
        ApiKeys forId = ApiKeys.forId(inbound.data().apiKey());
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$ApiKeys[forId.ordinal()]) {
            case QuorumStateData.HIGHEST_SUPPORTED_VERSION /* 1 */:
                handleUpdateVoterResponse = handleFetchResponse(inbound, j);
                break;
            case 2:
                handleUpdateVoterResponse = handleVoteResponse(inbound, j);
                break;
            case 3:
                handleUpdateVoterResponse = handleBeginQuorumEpochResponse(inbound, j);
                break;
            case 4:
                handleUpdateVoterResponse = handleEndQuorumEpochResponse(inbound, j);
                break;
            case 5:
                handleUpdateVoterResponse = handleFetchSnapshotResponse(inbound, j);
                break;
            case 6:
                handleUpdateVoterResponse = handleApiVersionsResponse(inbound, j);
                break;
            case 7:
                handleUpdateVoterResponse = handleUpdateVoterResponse(inbound, j);
                break;
            default:
                throw new IllegalArgumentException("Received unexpected response type: " + forId);
        }
        this.requestManager.onResponseResult(inbound.source(), inbound.correlationId(), handleUpdateVoterResponse, j);
    }

    private Optional<Errors> validateVoterOnlyRequest(int i, int i2) {
        return i2 < this.quorum.epoch() ? Optional.of(Errors.FENCED_LEADER_EPOCH) : i < 0 ? Optional.of(Errors.INVALID_REQUEST) : Optional.empty();
    }

    private boolean isValidVoterKey(Optional<ReplicaKey> optional) {
        return ((Boolean) optional.map(replicaKey -> {
            if (!OptionalInt.of(replicaKey.id()).equals(this.nodeId)) {
                return false;
            }
            if (replicaKey.directoryId().isPresent()) {
                return Boolean.valueOf(replicaKey.directoryId().get().equals(this.nodeDirectoryId));
            }
            return true;
        }).orElse(true)).booleanValue();
    }

    private Optional<Errors> validateLeaderOnlyRequest(int i) {
        return i < this.quorum.epoch() ? Optional.of(Errors.FENCED_LEADER_EPOCH) : i > this.quorum.epoch() ? Optional.of(Errors.UNKNOWN_LEADER_EPOCH) : !this.quorum.isLeader() ? Optional.of(Errors.NOT_LEADER_OR_FOLLOWER) : this.shutdown.get() != null ? Optional.of(Errors.BROKER_NOT_AVAILABLE) : Optional.empty();
    }

    private void handleRequest(RaftRequest.Inbound inbound, long j) {
        CompletableFuture<FetchResponseData> handleUpdateVoterRequest;
        ApiKeys forId = ApiKeys.forId(inbound.data().apiKey());
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$ApiKeys[forId.ordinal()]) {
            case QuorumStateData.HIGHEST_SUPPORTED_VERSION /* 1 */:
                handleUpdateVoterRequest = handleFetchRequest(inbound, j);
                break;
            case 2:
                handleUpdateVoterRequest = CompletableFuture.completedFuture(handleVoteRequest(inbound));
                break;
            case 3:
                handleUpdateVoterRequest = CompletableFuture.completedFuture(handleBeginQuorumEpochRequest(inbound, j));
                break;
            case 4:
                handleUpdateVoterRequest = CompletableFuture.completedFuture(handleEndQuorumEpochRequest(inbound, j));
                break;
            case 5:
                handleUpdateVoterRequest = CompletableFuture.completedFuture(handleFetchSnapshotRequest(inbound, j));
                break;
            case 6:
            default:
                throw new IllegalArgumentException("Unexpected request type " + forId);
            case 7:
                handleUpdateVoterRequest = handleUpdateVoterRequest(inbound, j);
                break;
            case 8:
                handleUpdateVoterRequest = CompletableFuture.completedFuture(handleDescribeQuorumRequest(inbound, j));
                break;
            case 9:
                handleUpdateVoterRequest = handleAddVoterRequest(inbound, j);
                break;
            case MAX_NUMBER_OF_BATCHES /* 10 */:
                handleUpdateVoterRequest = handleRemoveVoterRequest(inbound, j);
                break;
        }
        handleUpdateVoterRequest.whenComplete((apiMessage, th) -> {
            ApiMessage apiMessage = apiMessage;
            if (apiMessage == null) {
                apiMessage = RaftUtil.errorResponse(forId, Errors.forException(th));
            }
            RaftResponse.Outbound outbound = new RaftResponse.Outbound(inbound.correlationId(), apiMessage);
            inbound.completion.complete(outbound);
            this.logger.trace("Sent response {} to inbound request {}", outbound, inbound);
        });
    }

    private void handleInboundMessage(RaftMessage raftMessage, long j) {
        this.logger.trace("Received inbound message {}", raftMessage);
        if (raftMessage instanceof RaftRequest.Inbound) {
            handleRequest((RaftRequest.Inbound) raftMessage, j);
            return;
        }
        if (!(raftMessage instanceof RaftResponse.Inbound)) {
            throw new IllegalArgumentException("Unexpected message " + raftMessage);
        }
        RaftResponse.Inbound inbound = (RaftResponse.Inbound) raftMessage;
        if (this.requestManager.isResponseExpected(inbound.source(), inbound.correlationId())) {
            handleResponse(inbound, j);
        } else {
            this.logger.debug("Ignoring response {} since it is no longer needed", inbound);
        }
    }

    private long maybeSendRequest(long j, Node node, Supplier<ApiMessage> supplier) {
        if (this.requestManager.isBackingOff(node, j)) {
            long remainingBackoffMs = this.requestManager.remainingBackoffMs(node, j);
            this.logger.debug("Connection for {} is backing off for {} ms", node, Long.valueOf(remainingBackoffMs));
            return remainingBackoffMs;
        }
        if (this.requestManager.isReady(node, j)) {
            int newCorrelationId = this.channel.newCorrelationId();
            ApiMessage apiMessage = supplier.get();
            RaftRequest.Outbound outbound = new RaftRequest.Outbound(newCorrelationId, apiMessage, node, j);
            outbound.completion.whenComplete((inbound, th) -> {
                if (th != null) {
                    inbound = new RaftResponse.Inbound(newCorrelationId, RaftUtil.errorResponse(ApiKeys.forId(apiMessage.apiKey()), Errors.forException(th)), node);
                }
                this.messageQueue.add(inbound);
            });
            this.requestManager.onRequestSent(node, newCorrelationId, j);
            this.channel.send(outbound);
            this.logger.trace("Sent outbound request: {}", outbound);
        }
        return this.requestManager.remainingRequestTimeMs(node, j);
    }

    private EndQuorumEpochRequestData buildEndQuorumEpochRequest(ResignedState resignedState) {
        return RaftUtil.singletonEndQuorumEpochRequest(this.log.topicPartition(), this.clusterId, this.quorum.epoch(), this.quorum.localIdOrThrow(), resignedState.preferredSuccessors());
    }

    private long maybeSendRequests(long j, Set<Node> set, Supplier<ApiMessage> supplier) {
        long j2 = Long.MAX_VALUE;
        Iterator<Node> it = set.iterator();
        while (it.hasNext()) {
            long maybeSendRequest = maybeSendRequest(j, it.next(), supplier);
            if (maybeSendRequest < j2) {
                j2 = maybeSendRequest;
            }
        }
        return j2;
    }

    private long maybeSendRequest(long j, Set<ReplicaKey> set, Function<Integer, Node> function, Function<ReplicaKey, ApiMessage> function2) {
        long j2 = Long.MAX_VALUE;
        for (ReplicaKey replicaKey : set) {
            j2 = Math.min(j2, maybeSendRequest(j, function.apply(Integer.valueOf(replicaKey.id())), () -> {
                return (ApiMessage) function2.apply(replicaKey);
            }));
        }
        return j2;
    }

    private BeginQuorumEpochRequestData buildBeginQuorumEpochRequest(ReplicaKey replicaKey) {
        return RaftUtil.singletonBeginQuorumEpochRequest(this.log.topicPartition(), this.clusterId, this.quorum.epoch(), this.quorum.localIdOrThrow(), this.quorum.leaderEndpoints(), replicaKey);
    }

    private VoteRequestData buildVoteRequest(ReplicaKey replicaKey) {
        OffsetAndEpoch endOffset = endOffset();
        return RaftUtil.singletonVoteRequest(this.log.topicPartition(), this.clusterId, this.quorum.epoch(), this.quorum.localReplicaKeyOrThrow(), replicaKey, endOffset.epoch(), endOffset.offset());
    }

    private FetchRequestData buildFetchRequest() {
        return RaftUtil.singletonFetchRequest(this.log.topicPartition(), this.log.topicId(), fetchPartition -> {
            fetchPartition.setCurrentLeaderEpoch(this.quorum.epoch()).setLastFetchedEpoch(this.log.lastFetchedEpoch()).setFetchOffset(this.log.endOffset().offset()).setReplicaDirectoryId(this.quorum.localDirectoryId());
        }).setMaxBytes(8388608).setMaxWaitMs(this.fetchMaxWaitMs).setClusterId(this.clusterId).setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(this.quorum.localIdOrSentinel()));
    }

    private long maybeSendFetchToAnyBootstrap(long j) {
        Optional<Node> findReadyBootstrapServer = this.requestManager.findReadyBootstrapServer(j);
        return findReadyBootstrapServer.isPresent() ? maybeSendRequest(j, findReadyBootstrapServer.get(), this::buildFetchRequest) : this.requestManager.backoffBeforeAvailableBootstrapServer(j);
    }

    private FetchSnapshotRequestData buildFetchSnapshotRequest(OffsetAndEpoch offsetAndEpoch, long j) {
        return RaftUtil.singletonFetchSnapshotRequest(this.clusterId, ReplicaKey.of(quorum().localIdOrSentinel(), this.quorum.localDirectoryId()), this.log.topicPartition(), this.quorum.epoch(), offsetAndEpoch, 8388608, j);
    }

    private FetchSnapshotResponseData.PartitionSnapshot addQuorumLeader(FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot) {
        partitionSnapshot.currentLeader().setLeaderEpoch(this.quorum.epoch()).setLeaderId(this.quorum.leaderIdOrSentinel());
        return partitionSnapshot;
    }

    public boolean isRunning() {
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        return gracefulShutdown == null || !gracefulShutdown.isFinished();
    }

    public boolean isShuttingDown() {
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        return (gracefulShutdown == null || gracefulShutdown.isFinished()) ? false : true;
    }

    private void appendBatch(LeaderState<T> leaderState, BatchAccumulator.CompletedBatch<T> completedBatch, long j) {
        try {
            int epoch = leaderState.epoch();
            OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(appendAsLeader(completedBatch.data).lastOffset, epoch);
            this.appendPurgatory.await(Long.valueOf(offsetAndEpoch.offset() + 1), 2147483647L).whenComplete((l, th) -> {
                if (th != null) {
                    this.logger.debug("Failed to commit {} records up to last offset {}", new Object[]{Integer.valueOf(completedBatch.numRecords), offsetAndEpoch, th});
                    return;
                }
                this.kafkaRaftMetrics.updateCommitLatency(Math.max(0L, l.longValue() - j) / completedBatch.numRecords, j);
                this.logger.debug("Completed commit of {} records up to last offset {}", Integer.valueOf(completedBatch.numRecords), offsetAndEpoch);
                completedBatch.records.ifPresent(list -> {
                    maybeFireHandleCommit(completedBatch.baseOffset, epoch, completedBatch.appendTimestamp(), completedBatch.sizeInBytes(), list);
                });
            });
            completedBatch.release();
        } catch (Throwable th2) {
            completedBatch.release();
            throw th2;
        }
    }

    private long maybeAppendBatches(LeaderState<T> leaderState, long j) {
        boolean hasNext;
        if (leaderState.accumulator().timeUntilDrain(j) <= 0) {
            Iterator<BatchAccumulator.CompletedBatch<T>> it = leaderState.accumulator().drain().iterator();
            while (it.hasNext()) {
                try {
                    appendBatch(leaderState, it.next(), j);
                } finally {
                    while (it.hasNext()) {
                        it.next().release();
                    }
                }
            }
            flushLeaderLog(leaderState, j);
            while (true) {
                if (!hasNext) {
                    break;
                }
            }
        }
        return leaderState.accumulator().timeUntilDrain(j);
    }

    private long maybeSendBeginQuorumEpochRequests(LeaderState<T> leaderState, long j) {
        long timeUntilBeginQuorumEpochTimerExpires = leaderState.timeUntilBeginQuorumEpochTimerExpires(j);
        if (timeUntilBeginQuorumEpochTimerExpires == 0) {
            VoterSet lastVoterSet = this.partitionState.lastVoterSet();
            timeUntilBeginQuorumEpochTimerExpires = maybeSendRequest(j, (Set) lastVoterSet.voterKeys().stream().filter(replicaKey -> {
                return replicaKey.id() != this.quorum.localIdOrThrow();
            }).collect(Collectors.toSet()), num -> {
                return lastVoterSet.voterNode(num.intValue(), this.channel.listenerName()).orElseThrow(() -> {
                    return new IllegalStateException(String.format("Unknown endpoint for voter id %d for listener name %s", num, this.channel.listenerName()));
                });
            }, this::buildBeginQuorumEpochRequest);
            leaderState.resetBeginQuorumEpochTimer(j);
        }
        return timeUntilBeginQuorumEpochTimerExpires;
    }

    private long pollResigned(long j) {
        long remainingElectionTimeMs;
        ResignedState resignedStateOrThrow = this.quorum.resignedStateOrThrow();
        long maybeSendRequests = maybeSendRequests(j, this.partitionState.lastVoterSet().voterNodes(resignedStateOrThrow.unackedVoters().stream(), this.channel.listenerName()), () -> {
            return buildEndQuorumEpochRequest(resignedStateOrThrow);
        });
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        if (gracefulShutdown != null) {
            remainingElectionTimeMs = gracefulShutdown.remainingTimeMs();
        } else if (resignedStateOrThrow.hasElectionTimeoutExpired(j)) {
            if (this.quorum.isVoter()) {
                transitionToCandidate(j);
            } else {
                transitionToUnattached(this.quorum.epoch() + 1);
            }
            remainingElectionTimeMs = 0;
        } else {
            remainingElectionTimeMs = resignedStateOrThrow.remainingElectionTimeMs(j);
        }
        return Math.min(remainingElectionTimeMs, maybeSendRequests);
    }

    private long pollLeader(long j) {
        LeaderState<T> leaderStateOrThrow = this.quorum.leaderStateOrThrow();
        maybeFireLeaderChange(leaderStateOrThrow);
        long timeUntilCheckQuorumExpires = leaderStateOrThrow.timeUntilCheckQuorumExpires(j);
        if (this.shutdown.get() != null || leaderStateOrThrow.isResignRequested() || timeUntilCheckQuorumExpires == 0) {
            transitionToResigned(leaderStateOrThrow.nonLeaderVotersByDescendingFetchOffset());
            return 0L;
        }
        return Math.min(maybeAppendBatches(leaderStateOrThrow, j), Math.min(maybeSendBeginQuorumEpochRequests(leaderStateOrThrow, j), Math.min(timeUntilCheckQuorumExpires, leaderStateOrThrow.maybeExpirePendingOperation(j))));
    }

    private long maybeSendVoteRequests(CandidateState candidateState, long j) {
        if (candidateState.isVoteRejected()) {
            return Long.MAX_VALUE;
        }
        VoterSet lastVoterSet = this.partitionState.lastVoterSet();
        return maybeSendRequest(j, candidateState.unrecordedVoters(), num -> {
            return lastVoterSet.voterNode(num.intValue(), this.channel.listenerName()).orElseThrow(() -> {
                return new IllegalStateException(String.format("Unknown endpoint for voter id %d for listener name %s", num, this.channel.listenerName()));
            });
        }, this::buildVoteRequest);
    }

    private long pollCandidate(long j) {
        CandidateState candidateStateOrThrow = this.quorum.candidateStateOrThrow();
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        if (gracefulShutdown != null) {
            return Math.min(gracefulShutdown.remainingTimeMs(), maybeSendVoteRequests(candidateStateOrThrow, j));
        }
        if (candidateStateOrThrow.isBackingOff()) {
            if (!candidateStateOrThrow.isBackoffComplete(j)) {
                return candidateStateOrThrow.remainingBackoffMs(j);
            }
            this.logger.info("Re-elect as candidate after election backoff has completed");
            transitionToCandidate(j);
            return 0L;
        }
        if (!candidateStateOrThrow.hasElectionTimeoutExpired(j)) {
            return Math.min(maybeSendVoteRequests(candidateStateOrThrow, j), candidateStateOrThrow.remainingElectionTimeMs(j));
        }
        long binaryExponentialElectionBackoffMs = binaryExponentialElectionBackoffMs(candidateStateOrThrow.retries());
        this.logger.info("Election has timed out, backing off for {}ms before becoming a candidate again", Long.valueOf(binaryExponentialElectionBackoffMs));
        candidateStateOrThrow.startBackingOff(j, binaryExponentialElectionBackoffMs);
        return binaryExponentialElectionBackoffMs;
    }

    private long pollFollower(long j) {
        FollowerState followerStateOrThrow = this.quorum.followerStateOrThrow();
        return this.quorum.isVoter() ? pollFollowerAsVoter(followerStateOrThrow, j) : pollFollowerAsObserver(followerStateOrThrow, j);
    }

    private long pollFollowerAsVoter(FollowerState followerState, long j) {
        long maybeSendFetchToBestNode;
        if (this.shutdown.get() != null) {
            maybeSendFetchToBestNode = 0;
        } else if (followerState.hasFetchTimeoutExpired(j)) {
            this.logger.info("Become candidate due to fetch timeout");
            transitionToCandidate(j);
            maybeSendFetchToBestNode = 0;
        } else if (followerState.hasUpdateVoterPeriodExpired(j)) {
            maybeSendFetchToBestNode = (this.partitionState.lastKraftVersion().isReconfigSupported() && this.partitionState.lastVoterSet().voterNodeNeedsUpdate(this.quorum.localVoterNodeOrThrow())) ? maybeSendUpdateVoterRequest(followerState, j) : maybeSendFetchOrFetchSnapshot(followerState, j);
            followerState.resetUpdateVoterPeriod(j);
        } else {
            maybeSendFetchToBestNode = maybeSendFetchToBestNode(followerState, j);
        }
        return Math.min(maybeSendFetchToBestNode, Math.min(followerState.remainingFetchTimeMs(j), followerState.remainingUpdateVoterPeriodMs(j)));
    }

    private long pollFollowerAsObserver(FollowerState followerState, long j) {
        return followerState.hasFetchTimeoutExpired(j) ? maybeSendFetchToAnyBootstrap(j) : maybeSendFetchToBestNode(followerState, j);
    }

    private long maybeSendFetchToBestNode(FollowerState followerState, long j) {
        long maybeSendFetchToAnyBootstrap;
        Node leaderNode = followerState.leaderNode(this.channel.listenerName());
        if (this.requestManager.hasRequestTimedOut(leaderNode, j)) {
            this.requestManager.reset(leaderNode);
            maybeSendFetchToAnyBootstrap = maybeSendFetchToAnyBootstrap(j);
        } else {
            maybeSendFetchToAnyBootstrap = this.requestManager.isBackingOff(leaderNode, j) ? maybeSendFetchToAnyBootstrap(j) : !this.requestManager.hasAnyInflightRequest(j) ? maybeSendFetchOrFetchSnapshot(followerState, j) : this.requestManager.backoffBeforeAvailableBootstrapServer(j);
        }
        return Math.min(maybeSendFetchToAnyBootstrap, followerState.remainingFetchTimeMs(j));
    }

    private long maybeSendFetchOrFetchSnapshot(FollowerState followerState, long j) {
        Supplier<ApiMessage> supplier;
        if (followerState.fetchingSnapshot().isPresent()) {
            RawSnapshotWriter rawSnapshotWriter = followerState.fetchingSnapshot().get();
            long sizeInBytes = rawSnapshotWriter.sizeInBytes();
            supplier = () -> {
                return buildFetchSnapshotRequest(rawSnapshotWriter.snapshotId(), sizeInBytes);
            };
        } else {
            supplier = this::buildFetchRequest;
        }
        return maybeSendRequest(j, followerState.leaderNode(this.channel.listenerName()), supplier);
    }

    private UpdateRaftVoterRequestData buildUpdateVoterRequest() {
        return RaftUtil.updateVoterRequest(this.clusterId, this.quorum.localReplicaKeyOrThrow(), this.quorum.epoch(), this.localSupportedKRaftVersion, this.localListeners);
    }

    private long maybeSendUpdateVoterRequest(FollowerState followerState, long j) {
        return maybeSendRequest(j, followerState.leaderNode(this.channel.listenerName()), this::buildUpdateVoterRequest);
    }

    private long pollUnattached(long j) {
        UnattachedState unattachedStateOrThrow = this.quorum.unattachedStateOrThrow();
        return this.quorum.isVoter() ? pollUnattachedAsVoter(unattachedStateOrThrow, j) : pollUnattachedCommon(unattachedStateOrThrow, j);
    }

    private long pollUnattachedAsVoter(UnattachedState unattachedState, long j) {
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        if (gracefulShutdown != null) {
            return gracefulShutdown.remainingTimeMs();
        }
        if (!unattachedState.hasElectionTimeoutExpired(j)) {
            return pollUnattachedCommon(unattachedState, j);
        }
        transitionToCandidate(j);
        return 0L;
    }

    private long pollUnattachedCommon(UnattachedState unattachedState, long j) {
        return Math.min(maybeSendFetchToAnyBootstrap(j), unattachedState.remainingElectionTimeMs(j));
    }

    private long pollCurrentState(long j) {
        if (this.quorum.isLeader()) {
            return pollLeader(j);
        }
        if (this.quorum.isCandidate()) {
            return pollCandidate(j);
        }
        if (this.quorum.isFollower()) {
            return pollFollower(j);
        }
        if (this.quorum.isUnattached()) {
            return pollUnattached(j);
        }
        if (this.quorum.isResigned()) {
            return pollResigned(j);
        }
        throw new IllegalStateException("Unexpected quorum state " + this.quorum);
    }

    private void pollListeners() {
        while (true) {
            Registration<T> poll = this.pendingRegistrations.poll();
            if (poll == null) {
                break;
            } else {
                processRegistration(poll);
            }
        }
        this.quorum.highWatermark().ifPresent(logOffsetMetadata -> {
            updateListenersProgress(logOffsetMetadata.offset());
        });
        Optional<LeaderState<T>> maybeLeaderState = this.quorum.maybeLeaderState();
        if (maybeLeaderState.isPresent()) {
            maybeFireLeaderChange(maybeLeaderState.get());
        } else {
            if (this.quorum.isResigned()) {
                return;
            }
            maybeFireLeaderChange();
        }
    }

    private void processRegistration(Registration<T> registration) {
        RaftClient.Listener<T> listener = registration.listener();
        if (registration.ops() == Registration.Ops.REGISTER) {
            if (this.listenerContexts.putIfAbsent(listener, new ListenerContext(this, listener, null)) != null) {
                this.logger.error("Attempting to add a listener that already exists: {}", listenerName(listener));
                return;
            } else {
                this.logger.info("Registered the listener {}", listenerName(listener));
                return;
            }
        }
        if (this.listenerContexts.remove(listener) == null) {
            this.logger.error("Attempting to remove a listener that doesn't exists: {}", listenerName(listener));
        } else {
            this.logger.info("Unregistered the listener {}", listenerName(listener));
        }
    }

    private boolean maybeCompleteShutdown(long j) {
        KafkaRaftClient<T>.GracefulShutdown gracefulShutdown = this.shutdown.get();
        if (gracefulShutdown == null) {
            return false;
        }
        gracefulShutdown.update(j);
        if (gracefulShutdown.hasTimedOut()) {
            gracefulShutdown.failWithTimeout();
            return true;
        }
        if (!this.quorum.isObserver() && !this.quorum.isOnlyVoter() && !this.quorum.hasRemoteLeader()) {
            return false;
        }
        gracefulShutdown.complete();
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void wakeup() {
        this.messageQueue.wakeup();
    }

    public void handle(RaftRequest.Inbound inbound) {
        this.messageQueue.add((RaftMessage) Objects.requireNonNull(inbound));
    }

    public void poll() {
        if (!isInitialized()) {
            throw new IllegalStateException("Replica needs to be initialized before polling");
        }
        long milliseconds = this.time.milliseconds();
        if (maybeCompleteShutdown(milliseconds)) {
            return;
        }
        long min = Math.min(pollCurrentState(milliseconds), this.snapshotCleaner.maybeClean(milliseconds));
        this.kafkaRaftMetrics.updatePollStart(this.time.milliseconds());
        RaftMessage poll = this.messageQueue.poll(min);
        long milliseconds2 = this.time.milliseconds();
        this.kafkaRaftMetrics.updatePollEnd(milliseconds2);
        if (poll != null) {
            handleInboundMessage(poll, milliseconds2);
        }
        pollListeners();
    }

    @Override // org.apache.kafka.raft.RaftClient
    public long prepareAppend(int i, List<T> list) {
        return append(i, list);
    }

    private long append(int i, List<T> list) {
        if (!isInitialized()) {
            throw new NotLeaderException("Append failed because the replica is not the current leader");
        }
        BatchAccumulator<T> accumulator = this.quorum.maybeLeaderState().orElseThrow(() -> {
            return new NotLeaderException("Append failed because the replica is not the current leader");
        }).accumulator();
        boolean isEmpty = accumulator.isEmpty();
        long append = accumulator.append(i, list, true);
        if (isEmpty || accumulator.needsDrain(this.time.milliseconds())) {
            wakeup();
        }
        return append;
    }

    @Override // org.apache.kafka.raft.RaftClient
    public void schedulePreparedAppend() {
        if (!isInitialized()) {
            throw new NotLeaderException("Flush failed because the replica is not the current leader");
        }
        LeaderState<T> orElseThrow = this.quorum.maybeLeaderState().orElseThrow(() -> {
            return new NotLeaderException("Flush failed because the replica is not the current leader");
        });
        orElseThrow.accumulator().allowDrain();
        if (orElseThrow.accumulator().needsDrain(this.time.milliseconds())) {
            wakeup();
        }
    }

    @Override // org.apache.kafka.raft.RaftClient
    public CompletableFuture<Void> shutdown(int i) {
        this.logger.info("Beginning graceful shutdown");
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.shutdown.set(new GracefulShutdown(i, completableFuture));
        wakeup();
        return completableFuture;
    }

    @Override // org.apache.kafka.raft.RaftClient
    public void resign(int i) {
        if (i < 0) {
            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + i);
        }
        if (!isInitialized()) {
            throw new IllegalStateException("Replica needs to be initialized before resigning");
        }
        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
        int epoch = leaderAndEpoch.epoch();
        if (i > epoch) {
            throw new IllegalArgumentException("Attempt to resign from epoch " + i + " which is larger than the current epoch " + epoch);
        }
        if (i < epoch) {
            this.logger.debug("Ignoring call to resign from epoch {} since it is smaller than the current epoch {}", Integer.valueOf(i), Integer.valueOf(epoch));
            return;
        }
        if (!leaderAndEpoch.isLeader(this.quorum.localIdOrThrow())) {
            throw new IllegalArgumentException("Cannot resign from epoch " + i + " since we are not the leader");
        }
        Optional<LeaderState<T>> maybeLeaderState = this.quorum.maybeLeaderState();
        if (!maybeLeaderState.isPresent()) {
            this.logger.debug("Ignoring call to resign from epoch {} since this node is no longer the leader", Integer.valueOf(i));
            return;
        }
        LeaderState<T> leaderState = maybeLeaderState.get();
        if (leaderState.epoch() != i) {
            this.logger.debug("Ignoring call to resign from epoch {} since it is smaller than the current epoch {}", Integer.valueOf(i), Integer.valueOf(leaderState.epoch()));
            return;
        }
        this.logger.info("Received user request to resign from the current epoch {}", Integer.valueOf(epoch));
        leaderState.requestResign();
        wakeup();
    }

    @Override // org.apache.kafka.raft.RaftClient
    public Optional<SnapshotWriter<T>> createSnapshot(OffsetAndEpoch offsetAndEpoch, long j) {
        if (isInitialized()) {
            return (Optional<SnapshotWriter<T>>) this.log.createNewSnapshot(offsetAndEpoch).map(rawSnapshotWriter -> {
                long offset = offsetAndEpoch.offset() - 1;
                return new RecordsSnapshotWriter.Builder().setLastContainedLogTimestamp(j).setTime(this.time).setMaxBatchSize(8388608).setMemoryPool(this.memoryPool).setRawSnapshotWriter(new NotifyingRawSnapshotWriter(rawSnapshotWriter, offsetAndEpoch2 -> {
                    this.partitionState.truncateOldEntries(offsetAndEpoch2.offset());
                })).setKraftVersion(this.partitionState.kraftVersionAtOffset(offset)).setVoterSet(this.partitionState.voterSetAtOffset(offset)).build(this.serde);
            });
        }
        throw new IllegalStateException("Cannot create snapshot before the replica has been initialized");
    }

    @Override // org.apache.kafka.raft.RaftClient
    public Optional<OffsetAndEpoch> latestSnapshotId() {
        return this.log.latestSnapshotId();
    }

    @Override // org.apache.kafka.raft.RaftClient
    public long logEndOffset() {
        return this.log.endOffset().offset();
    }

    @Override // org.apache.kafka.raft.RaftClient
    public KRaftVersion kraftVersion() {
        return this.partitionState.lastKraftVersion();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.log.flush(true);
        if (this.kafkaRaftMetrics != null) {
            this.kafkaRaftMetrics.close();
        }
        if (this.memoryPool instanceof BatchMemoryPool) {
            ((BatchMemoryPool) this.memoryPool).releaseRetained();
        }
    }

    @Override // org.apache.kafka.raft.RaftClient
    public OptionalLong highWatermark() {
        return (isInitialized() && this.quorum.highWatermark().isPresent()) ? OptionalLong.of(this.quorum.highWatermark().get().offset()) : OptionalLong.empty();
    }

    public Optional<Node> voterNode(int i, ListenerName listenerName) {
        return this.partitionState.lastVoterSet().voterNode(i, listenerName);
    }

    QuorumState quorum() {
        return this.quorum;
    }

    private boolean isInitialized() {
        return (this.partitionState == null || this.quorum == null || this.requestManager == null || this.kafkaRaftMetrics == null) ? false : true;
    }
}
