package io.confluent.kafka.replication.push;

import io.confluent.kafka.replication.push.buffer.BufferingAppendRecordsBuilder;
import io.confluent.kafka.replication.push.buffer.BufferingPartitionDataBuilder;
import io.confluent.kafka.replication.push.buffer.PushReplicationEvent;
import io.confluent.kafka.replication.push.buffer.RefCountingMemoryTracker;
import io.confluent.kafka.replication.push.metrics.PushReplicationManagerMetrics;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.AppendRecordsRequestData;
import org.apache.kafka.common.message.AppendRecordsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AppendRecordsRequest;
import org.apache.kafka.common.requests.AppendRecordsResponse;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;

/* loaded from: input_file:io/confluent/kafka/replication/push/PusherThread.class */
class PusherThread extends InterBrokerSendThread implements Pusher {
    /** Identifier of this pusher, returned by {@link #id()}. */
    private final int id;
    /** Replication configuration; also passed to every request builder this thread creates. */
    private final ReplicationConfig config;
    /** Metrics sink for session-end counts, event-queue latency, request recording and failures. */
    private final PushReplicationManagerMetrics pushReplicationManagerMetrics;
    /** Reference-count tracker that is counted down whenever buffered records are dropped or completed. */
    private final RefCountingMemoryTracker<MemoryRecords> tracker;
    /** Clock used for event timestamps and request creation/elapsed time. */
    private final Time time;
    /** From {@code config.lingerMs()}; when non-zero and builders are buffering, caps the poll timeout. */
    private final int lingerMs;
    /** From {@code config.maxWaitMs()}; upper bound on the timeout returned by {@code sendRequests}. */
    private final int maxWaitMs;
    /** From {@code config.retryTimeoutMs()}; retriable failures are retried while the request is younger than this. */
    private final int retryTimeoutMs;
    /** Events appended by the {@link Pusher} entry points and drained in {@code generateRequests()}. */
    private final Deque<PushReplicationEvent<?>> eventQueue;
    /** Sessions currently being pushed, keyed by partition replica. */
    private final Map<PartitionReplicaKey, PushSession> activePushSessions;
    /** Sessions that were stopped with an end-session request and are awaiting its outcome. */
    private final StoppingPushSessions stoppingPushSessions;
    /** Request builders that are still accumulating events, keyed by destination broker id. */
    private final Map<Integer, BufferingAppendRecordsBuilder> bufferingRequestBuilders;
    /** Highest replica (broker) epoch seen so far in a start event, keyed by replica id. */
    private final Map<Integer, Long> latestKnownBrokerEpochs;
    /** Built requests waiting to be handed out for sending, keyed by destination broker id. */
    final Map<Integer, Deque<RequestAndCompletionHandler>> pendingRequests;
    /** Destination nodes learned from push sessions, keyed by node id. */
    private final Map<Integer, Node> replicaNodes;
    /** Single-threaded executor on which push-session callbacks are run. */
    final ExecutorService callbackExecutor;

    /* renamed from: io.confluent.kafka.replication.push.PusherThread$1, reason: invalid class name */
    /* loaded from: input_file:io/confluent/kafka/replication/push/PusherThread$1.class */
    /**
     * Compiler-synthesised lookup table for switching over {@link PushReplicationEvent.Type}.
     *
     * <p>The array is indexed by enum ordinal and holds the 1-based case number assigned to each
     * constant (MEMORY_RECORDS=1, HWM_UPDATE=2, LSO_UPDATE=3, START_PUSH=4, STOP_PUSH=5). Slots for
     * constants not listed here stay at 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$confluent$kafka$replication$push$buffer$PushReplicationEvent$Type = new int[PushReplicationEvent.Type.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) leaves only its own slot unmapped.
            try {
                $SwitchMap$io$confluent$kafka$replication$push$buffer$PushReplicationEvent$Type[PushReplicationEvent.Type.MEMORY_RECORDS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$confluent$kafka$replication$push$buffer$PushReplicationEvent$Type[PushReplicationEvent.Type.HWM_UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$confluent$kafka$replication$push$buffer$PushReplicationEvent$Type[PushReplicationEvent.Type.LSO_UPDATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$confluent$kafka$replication$push$buffer$PushReplicationEvent$Type[PushReplicationEvent.Type.START_PUSH.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$confluent$kafka$replication$push$buffer$PushReplicationEvent$Type[PushReplicationEvent.Type.STOP_PUSH.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/replication/push/PusherThread$PartitionReplicaKey.class */
    /**
     * Immutable value key identifying one replica of one topic partition
     * (topic id, partition id, replica id). Used as the key of the session maps.
     */
    public static final class PartitionReplicaKey {
        private final Uuid topicId;
        private final int partitionId;
        private final int replicaId;

        /** Builds the key from its three raw components. */
        public PartitionReplicaKey(Uuid uuid, int i, int i2) {
            this.topicId = uuid;
            this.partitionId = i;
            this.replicaId = i2;
        }

        /** Builds the key from a topic-id partition plus the replica id. */
        public PartitionReplicaKey(TopicIdPartition topicIdPartition, int i) {
            this(topicIdPartition.topicId(), topicIdPartition.partitionId(), i);
        }

        /** Builds the key from the partition and replica carried by an event. */
        public PartitionReplicaKey(PushReplicationEvent<?> pushReplicationEvent) {
            this(pushReplicationEvent.topicIdPartition(), pushReplicationEvent.replicaId());
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj instanceof PartitionReplicaKey) {
                final PartitionReplicaKey that = (PartitionReplicaKey) obj;
                // Topic id is compared first, then the two numeric ids.
                if (!this.topicId.equals(that.topicId)) {
                    return false;
                }
                return this.partitionId == that.partitionId && this.replicaId == that.replicaId;
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.topicId, this.partitionId, this.replicaId);
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("PartitionReplicaKey{topicId=");
            text.append(this.topicId);
            text.append(", partitionId=").append(this.partitionId);
            text.append(", replicaId=").append(this.replicaId);
            return text.append('}').toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/replication/push/PusherThread$StoppingPushSessions.class */
    /**
     * Map of push sessions that are in the process of being stopped, together with a
     * {@code volatile} copy of the map size that is refreshed after every mutation and
     * returned by {@link #size()}.
     */
    public static final class StoppingPushSessions {
        private final Map<PartitionReplicaKey, PushSession> stoppingPushSessions = new HashMap<>();
        private volatile int size = 0;

        StoppingPushSessions() {
        }

        /** Returns the stopping session for the key, or {@code null} if none is stored. */
        PushSession get(PartitionReplicaKey partitionReplicaKey) {
            return this.stoppingPushSessions.get(partitionReplicaKey);
        }

        /** Stores the session and returns whatever was previously stored for the key. */
        PushSession put(PartitionReplicaKey partitionReplicaKey, PushSession pushSession) {
            final PushSession previous = this.stoppingPushSessions.put(partitionReplicaKey, pushSession);
            publishSize();
            return previous;
        }

        /** Removes and returns the session for the key, or {@code null} if none was stored. */
        PushSession remove(PartitionReplicaKey partitionReplicaKey) {
            final PushSession removed = this.stoppingPushSessions.remove(partitionReplicaKey);
            publishSize();
            return removed;
        }

        /** Drops every stored session and resets the published size to zero. */
        void clear() {
            this.stoppingPushSessions.clear();
            this.size = 0;
        }

        /** Returns the most recently published number of stopping sessions. */
        int size() {
            return this.size;
        }

        /** Copies the current map size into the volatile field. */
        private void publishSize() {
            this.size = this.stoppingPushSessions.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Factory for a pusher thread whose thread name is derived from its id.
     *
     * @param i pusher id, also embedded in the thread name
     * @return a new, not yet started, {@code PusherThread}
     */
    public static PusherThread newPusher(int i, ReplicationConfig replicationConfig, PushReplicationManagerMetrics pushReplicationManagerMetrics, KafkaClient kafkaClient, RefCountingMemoryTracker<MemoryRecords> refCountingMemoryTracker, Time time) {
        final String threadName = String.format("Pusher-thread-%s", i);
        return new PusherThread(
                threadName,
                i,
                replicationConfig,
                pushReplicationManagerMetrics,
                kafkaClient,
                refCountingMemoryTracker,
                time);
    }

    /**
     * Creates the pusher thread and its (initially empty) bookkeeping structures.
     *
     * @param str thread name passed to the superclass
     * @param i   pusher id
     */
    PusherThread(String str, int i, ReplicationConfig replicationConfig, PushReplicationManagerMetrics pushReplicationManagerMetrics, KafkaClient kafkaClient, RefCountingMemoryTracker<MemoryRecords> refCountingMemoryTracker, Time time) {
        super(str, kafkaClient, replicationConfig.requestTimeoutMs(), time);
        this.id = i;
        this.config = replicationConfig;

        // Only a single in-flight request is supported; any other setting is merely reported.
        final int configuredInFlight = replicationConfig.maxInFlightRequests();
        if (configuredInFlight != 1) {
            this.log.warn("Max in-flight requests for push replication configured to be '{}', but only '1' is currently supported", configuredInFlight);
        }

        this.tracker = refCountingMemoryTracker;
        this.time = time;

        // Timing knobs are read once from the configuration.
        this.lingerMs = replicationConfig.lingerMs();
        this.retryTimeoutMs = replicationConfig.retryTimeoutMs();
        this.maxWaitMs = replicationConfig.maxWaitMs();

        // The event queue is the only concurrent collection among these structures.
        this.eventQueue = new ConcurrentLinkedDeque<>();
        this.activePushSessions = new HashMap<>();
        this.stoppingPushSessions = new StoppingPushSessions();
        this.bufferingRequestBuilders = new HashMap<>();
        this.latestKnownBrokerEpochs = new HashMap<>();
        this.pendingRequests = new HashMap<>();
        this.replicaNodes = new HashMap<>();
        this.callbackExecutor = Executors.newSingleThreadExecutor();

        this.pushReplicationManagerMetrics = pushReplicationManagerMetrics;
        // Expose the number of stopping sessions through the metrics object.
        this.pushReplicationManagerMetrics.registerStoppingPushSessionSupplier(this.stoppingPushSessions::size);
    }

    /** Returns the identifier this pusher was created with. */
    @Override // io.confluent.kafka.replication.push.Pusher
    public int id() {
        final int pusherId = this.id;
        return pusherId;
    }

    /** Queues a start-push event, timestamped with the current high-resolution clock, for the session. */
    @Override // io.confluent.kafka.replication.push.Pusher
    public void startPush(TopicIdPartition topicIdPartition, PushSession pushSession) {
        final long createdMs = this.time.hiResClockMs();
        final PushReplicationEvent<?> startEvent = PushReplicationEvent.forStartPush(topicIdPartition, pushSession, createdMs);
        enqueue(startEvent);
    }

    /**
     * Queues a records event for the given partition replica.
     *
     * @param i replica id
     * @param j long value forwarded unchanged to the records-event factory
     */
    @Override // io.confluent.kafka.replication.push.Pusher
    public void onLeaderAppend(TopicIdPartition topicIdPartition, int i, long j, AbstractRecords abstractRecords) {
        final long createdMs = this.time.hiResClockMs();
        final PushReplicationEvent<?> recordsEvent = PushReplicationEvent.forRecords(topicIdPartition, i, abstractRecords, j, createdMs);
        enqueue(recordsEvent);
    }

    /**
     * Queues a high-watermark update event for the given partition replica.
     *
     * @param i replica id
     * @param j the new high watermark
     */
    @Override // io.confluent.kafka.replication.push.Pusher
    public void onHighWatermarkUpdate(TopicIdPartition topicIdPartition, int i, long j) {
        final long createdMs = this.time.hiResClockMs();
        final PushReplicationEvent<?> hwmEvent = PushReplicationEvent.forHighWatermarkUpdate(topicIdPartition, i, j, createdMs);
        enqueue(hwmEvent);
    }

    /**
     * Queues a log-start-offset update event for the given partition replica.
     *
     * @param i replica id
     * @param j the new log start offset
     */
    @Override // io.confluent.kafka.replication.push.Pusher
    public void onLogStartOffsetUpdate(TopicIdPartition topicIdPartition, int i, long j) {
        final long createdMs = this.time.hiResClockMs();
        final PushReplicationEvent<?> lsoEvent = PushReplicationEvent.forLogStartOffsetUpdate(topicIdPartition, i, j, createdMs);
        enqueue(lsoEvent);
    }

    /**
     * Requests that the push session for the partition replica be stopped. The stop event is
     * appended at the tail of the event queue.
     *
     * @param i replica id
     */
    @Override // io.confluent.kafka.replication.push.Pusher
    public void stopPush(TopicIdPartition topicIdPartition, int i, PushSessionEndReason pushSessionEndReason) {
        final boolean atHeadOfQueue = false;
        enqueueStopPushEvent(topicIdPartition, i, pushSessionEndReason, atHeadOfQueue);
    }

    /**
     * Creates a stop event, counts the session end in the metrics, and queues the event.
     *
     * @param i replica id
     * @param z when {@code true} the event is inserted at the head of the queue (without going
     *          through {@link #enqueue}); otherwise it is enqueued normally at the tail
     */
    private void enqueueStopPushEvent(TopicIdPartition topicIdPartition, int i, PushSessionEndReason pushSessionEndReason, boolean z) {
        final long createdMs = this.time.hiResClockMs();
        final PushReplicationEvent<PushReplicationEvent.StopPayload> stopEvent =
                PushReplicationEvent.forStopPush(topicIdPartition, i, pushSessionEndReason.sendEndSessionRequest, createdMs);
        this.pushReplicationManagerMetrics.incrementPushSessionEndCount(pushSessionEndReason);
        if (!z) {
            enqueue(stopEvent);
            return;
        }
        this.eventQueue.addFirst(stopEvent);
    }

    /** Appends the event to the queue and wakes the thread up when the event type asks for it. */
    private void enqueue(PushReplicationEvent<?> pushReplicationEvent) {
        this.eventQueue.offer(pushReplicationEvent);
        final boolean needsWakeup = pushReplicationEvent.type().shouldWakeupPusherThread();
        if (!needsWakeup) {
            return;
        }
        wakeup();
    }

    /**
     * Shuts the pusher thread down and releases its bookkeeping state.
     *
     * <p>The in-memory state is cleared only after {@code super.shutdown()} has returned
     * normally. If the calling thread is interrupted while waiting, the interrupt flag is
     * restored for the caller instead of being swallowed, and the state is left untouched.
     * The callback executor is shut down in every case so its worker thread cannot leak.
     */
    @Override // io.confluent.kafka.replication.push.Pusher
    public void shutdown() {
        try {
            super.shutdown();
            this.activePushSessions.clear();
            this.stoppingPushSessions.clear();
            this.bufferingRequestBuilders.clear();
            this.pendingRequests.clear();
            this.replicaNodes.clear();
            this.eventQueue.clear();
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller rather than silently dropping it.
            Thread.currentThread().interrupt();
            this.log.warn("Interrupted while shutting down pusher thread {}", this.id);
        } finally {
            ThreadUtils.shutdownExecutorServiceQuietly(this.callbackExecutor, 30L, TimeUnit.SECONDS);
        }
    }

    /**
     * Drains queued push-replication events and returns the requests that are ready to be sent.
     *
     * <p>Events are processed one at a time. As soon as processing an event leaves at least one
     * pending request, draining stops and the sendable requests are returned; remaining events
     * stay queued for the next invocation. For event types that wake the thread up, the time
     * spent in the queue is reported to the metrics.
     *
     * @return the requests to send now; possibly empty
     */
    @SuppressWarnings("unchecked") // the event type tag determines the payload type
    public Collection<RequestAndCompletionHandler> generateRequests() {
        PushReplicationEvent<?> event;
        // poll() != null instead of isEmpty()/poll(), so an emptied queue can never yield a null event.
        while ((event = this.eventQueue.poll()) != null) {
            long nowMs = this.time.hiResClockMs();
            if (event.type().shouldWakeupPusherThread()) {
                this.pushReplicationManagerMetrics.updateEventQueueProcessingTime(nowMs - event.eventCreatedTimeMs());
            }
            // Dispatch on the enum itself rather than on ordinal-derived integers.
            switch (event.type()) {
                case MEMORY_RECORDS:
                case HWM_UPDATE:
                case LSO_UPDATE:
                    processPartitionUpdateEvent(event);
                    break;
                case START_PUSH:
                    processStartEvent((PushReplicationEvent<PushSession>) event);
                    break;
                case STOP_PUSH:
                    processStopEvent((PushReplicationEvent<PushReplicationEvent.StopPayload>) event);
                    break;
                default:
                    failSessionAfterEventProcessingFailure("Unknown type for push replication event {}", event);
                    break;
            }
            if (!this.pendingRequests.isEmpty()) {
                return getRequestsToSend();
            }
        }
        return getRequestsToSend();
    }

    /**
     * Delegates to the superclass and caps the returned timeout at {@code maxWaitMs}; when
     * lingering is enabled and some builder is still buffering, it is capped at {@code lingerMs}
     * as well.
     */
    protected long sendRequests(long j, long j2) {
        final long cappedTimeout = Math.min(this.maxWaitMs, super.sendRequests(j, j2));
        final boolean lingering = this.lingerMs != 0 && !this.bufferingRequestBuilders.isEmpty();
        return lingering ? Math.min(cappedTimeout, this.lingerMs) : cappedTimeout;
    }

    /** Returns whether the node has a request that is either pending locally or already in flight. */
    private boolean hasCreatedRequests(Node node) {
        if (hasPendingRequests(node)) {
            return true;
        }
        return hasInFlightRequests(node);
    }

    /** Returns whether a non-empty queue of pending requests exists for the node's id. */
    private boolean hasPendingRequests(Node node) {
        final Deque<RequestAndCompletionHandler> queued = this.pendingRequests.get(node.id());
        return queued != null && !queued.isEmpty();
    }

    /**
     * Routes a records / offset-update event to the active session of its partition replica.
     * When no active session exists, the event is dropped and any records it carries are
     * counted down.
     */
    private void processPartitionUpdateEvent(PushReplicationEvent<?> pushReplicationEvent) {
        final PartitionReplicaKey key = new PartitionReplicaKey(pushReplicationEvent);
        final PushSession activeSession = this.activePushSessions.get(key);
        if (activeSession == null) {
            this.log.debug("Cannot process push replication event {} for partition replica {}; no active push session found", pushReplicationEvent, key);
            countDownRecords(pushReplicationEvent);
            return;
        }
        processAndMaybeCreateRequest(pushReplicationEvent, activeSession);
    }

    /** Counts down the records carried by a MEMORY_RECORDS event; any other event type is ignored. */
    private void countDownRecords(PushReplicationEvent<?> pushReplicationEvent) {
        if (pushReplicationEvent.type() != PushReplicationEvent.Type.MEMORY_RECORDS) {
            return;
        }
        final PushReplicationEvent.RecordsPayload recordsPayload = (PushReplicationEvent.RecordsPayload) pushReplicationEvent.payload();
        this.tracker.countDown(recordsPayload.records());
    }

    /**
     * Counts down every {@link MemoryRecords} instance bundled in the partition's records,
     * provided the records are a {@code MultiMemoryRecords}; otherwise does nothing.
     *
     * <p>The records are held in a general-typed local so the {@code instanceof} guard is a real
     * runtime check and the code does not depend on the declared return type of
     * {@code records()}.
     */
    private void countDownRecords(AppendRecordsRequestData.PartitionData partitionData) {
        Object records = partitionData.records();
        if (!(records instanceof BufferingPartitionDataBuilder.MultiMemoryRecords)) {
            return;
        }
        BufferingPartitionDataBuilder.MultiMemoryRecords multiRecords = (BufferingPartitionDataBuilder.MultiMemoryRecords) records;
        for (MemoryRecords memoryRecords : multiRecords.memoryRecords()) {
            this.tracker.countDown(memoryRecords);
        }
    }

    /**
     * Handles a start-push event.
     *
     * <p>The new session is rejected, and its end callback scheduled on the callback executor,
     * when an active session already exists for the partition replica or when its epoch is
     * stale. Otherwise the session becomes active and any stopping session registered under the
     * same key is discarded.
     */
    private void processStartEvent(PushReplicationEvent<PushSession> pushReplicationEvent) {
        final PushSession candidate = pushReplicationEvent.payload();
        final PartitionReplicaKey key = new PartitionReplicaKey(pushReplicationEvent);

        final PushSession alreadyActive = this.activePushSessions.get(key);
        if (alreadyActive != null) {
            this.log.error("Cannot start push session {} due to start event {}; already found an active session {}", candidate, pushReplicationEvent, alreadyActive);
            this.callbackExecutor.submit(candidate::onPushSessionEnded);
            return;
        }

        final boolean accepted = maybeAddPushSessionAndUpdateBrokerEpoch(key, candidate);
        if (!accepted) {
            this.callbackExecutor.submit(candidate::onPushSessionEnded);
            return;
        }

        this.log.info("Leader started push session {} due to start event {}", candidate, pushReplicationEvent);
        final PushSession superseded = this.stoppingPushSessions.remove(key);
        if (superseded != null) {
            this.log.debug("Removed stopping push session {} for partition replica {} due to new push session {}", superseded, key, candidate);
        }
    }

    /**
     * Registers the session as active unless its replica epoch is older than the latest known
     * epoch for that replica.
     *
     * <p>A strictly newer epoch is recorded as the latest known one, and a buffering builder
     * created for an older epoch of the same replica is cleared and dropped.
     *
     * @return {@code true} if the session was added, {@code false} if it was ignored as stale
     */
    private boolean maybeAddPushSessionAndUpdateBrokerEpoch(PartitionReplicaKey partitionReplicaKey, PushSession pushSession) {
        final int replicaId = partitionReplicaKey.replicaId;
        final long sessionEpoch = pushSession.replicaEpoch();
        final long knownEpoch = this.latestKnownBrokerEpochs.getOrDefault(replicaId, -1L);

        if (sessionEpoch < knownEpoch) {
            this.log.info("New push session {} ignored due to stale broker epoch. Latest known broker epoch is {}.", pushSession, knownEpoch);
            return false;
        }

        if (sessionEpoch > knownEpoch) {
            this.latestKnownBrokerEpochs.put(replicaId, sessionEpoch);
            final BufferingAppendRecordsBuilder staleCandidate = this.bufferingRequestBuilders.get(replicaId);
            final boolean builderIsStale = staleCandidate != null && staleCandidate.destinationBrokerEpoch() < sessionEpoch;
            if (builderIsStale) {
                staleCandidate.clear();
                this.bufferingRequestBuilders.remove(replicaId);
            }
        }

        final Node destination = pushSession.replicaNode();
        this.replicaNodes.put(destination.id(), destination);
        this.activePushSessions.put(partitionReplicaKey, pushSession);
        return true;
    }

    /**
     * Handles a stop-push event.
     *
     * <p>An active session for the key is removed, its end callback is scheduled, and the stop
     * event is fed through request building for it. Afterwards, depending on whether the event
     * asks for an end-session request:
     * <ul>
     *   <li>yes, and a session was just stopped: that session is recorded as stopping
     *       (replacing any older entry);</li>
     *   <li>no: any stopping session under the key is removed and the event is processed for it
     *       too.</li>
     * </ul>
     * Any older stopping session displaced along the way is logged.
     */
    private void processStopEvent(PushReplicationEvent<PushReplicationEvent.StopPayload> pushReplicationEvent) {
        final PartitionReplicaKey key = new PartitionReplicaKey(pushReplicationEvent);
        final PushSession stopped = this.activePushSessions.remove(key);
        if (stopped != null) {
            this.log.info("Leader stopped push session {} due to stop event {}", stopped, pushReplicationEvent);
            this.callbackExecutor.submit(stopped::onPushSessionEnded);
            processAndMaybeCreateRequest(pushReplicationEvent, stopped);
        } else {
            this.log.debug("Cannot stop push session due to stop event {}; no active session found", pushReplicationEvent);
        }

        final boolean sendEndSession = pushReplicationEvent.payload().sendEndSessionRequest();
        PushSession displaced = null;
        if (!sendEndSession) {
            displaced = this.stoppingPushSessions.remove(key);
            if (displaced != null) {
                processAndMaybeCreateRequest(pushReplicationEvent, displaced);
            }
        } else if (stopped != null) {
            displaced = this.stoppingPushSessions.put(key, stopped);
        }

        if (displaced == null) {
            return;
        }
        this.log.info("Removed older stopping push session {} for partition replica {} after a stop event {} for a newer push session {}", displaced, key, pushReplicationEvent, stopped);
    }

    /**
     * Feeds the event into the buffering request builder of the event's replica.
     *
     * <p>Events whose session epoch differs from the latest known epoch of the replica are
     * dropped (records counted down). If the current builder does not accept the event, that
     * builder is turned into a pending request and the event is offered to a fresh builder; if
     * the fresh builder rejects it as well, the session is failed.
     *
     * @param pushReplicationEvent the event to buffer
     * @param pushSession          the session the event belongs to
     */
    private void processAndMaybeCreateRequest(PushReplicationEvent<?> pushReplicationEvent, PushSession pushSession) {
        int replicaId = pushReplicationEvent.replicaId();
        long replicaEpoch = pushSession.replicaEpoch();
        long latestKnownEpoch = this.latestKnownBrokerEpochs.getOrDefault(replicaId, -1L);
        if (replicaEpoch != latestKnownEpoch) {
            // Log the epoch actually on record, not the session's own epoch.
            this.log.info("Push replication event {} for push session {} ignored due to stale broker epoch. Latest known broker epoch is {}", pushReplicationEvent, pushSession, latestKnownEpoch);
            countDownRecords(pushReplicationEvent);
            return;
        }
        BufferingAppendRecordsBuilder currentBuilder = getOrCreateBuilder(replicaId, replicaEpoch);
        if (currentBuilder.processEvent(pushReplicationEvent, pushSession)) {
            return;
        }
        // The current builder rejected the event: flush it and retry with a new builder.
        this.bufferingRequestBuilders.remove(replicaId);
        addPendingRequest(currentBuilder);
        if (getOrCreateBuilder(replicaId, replicaEpoch).processEvent(pushReplicationEvent, pushSession)) {
            return;
        }
        failSessionAfterEventProcessingFailure("Push replication event {} could not be consumed", pushReplicationEvent);
    }

    /**
     * Logs the failure, then synchronously stops the event's session with reason
     * {@code LEADER_REPLICATION_ERROR} and updates the related metrics.
     *
     * @param str log message format taking the event as its single argument
     */
    private void failSessionAfterEventProcessingFailure(String str, PushReplicationEvent<?> pushReplicationEvent) {
        this.log.error(str, pushReplicationEvent);
        final PushSessionEndReason reason = PushSessionEndReason.LEADER_REPLICATION_ERROR;
        final PushReplicationEvent<PushReplicationEvent.StopPayload> stopEvent = PushReplicationEvent.forStopPush(
                pushReplicationEvent.topicIdPartition(),
                pushReplicationEvent.replicaId(),
                reason.sendEndSessionRequest,
                this.time.hiResClockMs());
        this.pushReplicationManagerMetrics.incrementPushSessionEndCount(reason);
        // Processed inline rather than via the event queue.
        processStopEvent(stopEvent);
        this.pushReplicationManagerMetrics.incrementEventProcessingFailureCount();
    }

    /** Queues a stop event for the partition replica at the head of the event queue. */
    private void failSessionAfterResponseFailure(PartitionReplicaKey partitionReplicaKey, PushSessionEndReason pushSessionEndReason) {
        final TopicIdPartition partition = new TopicIdPartition(partitionReplicaKey.topicId, partitionReplicaKey.partitionId);
        final boolean atHeadOfQueue = true;
        enqueueStopPushEvent(partition, partitionReplicaKey.replicaId, pushSessionEndReason, atHeadOfQueue);
    }

    /**
     * Promotes ready builders to pending requests, then hands out at most one pending request
     * per destination that currently has nothing in flight.
     *
     * <p>A builder is promoted only if it reports being ready, its destination node is known,
     * and that node has neither pending nor in-flight requests. A request that was created for
     * a different {@link Node} instance than the one currently registered for the broker id is
     * re-addressed to the registered node.
     *
     * @return the requests to send now; an empty list when nothing is pending
     */
    private Collection<RequestAndCompletionHandler> getRequestsToSend() {
        // Phase 1: turn ready builders into pending requests.
        final Iterator<Map.Entry<Integer, BufferingAppendRecordsBuilder>> builders = this.bufferingRequestBuilders.entrySet().iterator();
        while (builders.hasNext()) {
            final BufferingAppendRecordsBuilder builder = builders.next().getValue();
            if (!builder.isRequestReady()) {
                continue;
            }
            final Node destination = this.replicaNodes.get(builder.destinationBrokerId());
            if (destination == null || hasCreatedRequests(destination)) {
                continue;
            }
            builders.remove();
            addPendingRequest(builder);
        }

        if (this.pendingRequests.isEmpty()) {
            return Collections.emptyList();
        }

        // Phase 2: pick the head request of every destination that is idle.
        final ArrayList<RequestAndCompletionHandler> sendable = new ArrayList<>();
        final Iterator<Map.Entry<Integer, Deque<RequestAndCompletionHandler>>> queues = this.pendingRequests.entrySet().iterator();
        while (queues.hasNext()) {
            final Map.Entry<Integer, Deque<RequestAndCompletionHandler>> entry = queues.next();
            final Deque<RequestAndCompletionHandler> queue = entry.getValue();
            final int brokerId = entry.getKey();
            final Node currentNode = this.replicaNodes.get(brokerId);
            if (hasInFlightRequests(currentNode)) {
                continue;
            }
            RequestAndCompletionHandler head = Objects.requireNonNull(queue.poll());
            if (queue.isEmpty()) {
                queues.remove();
            }
            if (!currentNode.equals(head.destination)) {
                head = new RequestAndCompletionHandler(head.creationTimeMs, currentNode, head.request, head.handler);
            }
            sendable.add(head);
        }
        return sendable;
    }

    /**
     * Returns the buffering builder registered for the broker, creating and registering one
     * when none exists yet.
     *
     * @param i destination broker id
     * @param j broker epoch given to a newly created builder
     */
    private BufferingAppendRecordsBuilder getOrCreateBuilder(int i, long j) {
        BufferingAppendRecordsBuilder builder = this.bufferingRequestBuilders.get(i);
        if (builder == null) {
            builder = new BufferingAppendRecordsBuilder(i, j, this.config, this.tracker, this.time);
            this.bufferingRequestBuilders.put(i, builder);
        }
        return builder;
    }

    /**
     * Builds the request held by the builder and appends it to the tail of the destination
     * broker's pending queue. The request is reported to the metrics only when the builder
     * reports a positive total size.
     */
    private void addPendingRequest(BufferingAppendRecordsBuilder bufferingAppendRecordsBuilder) {
        final int brokerId = bufferingAppendRecordsBuilder.destinationBrokerId();
        final long createdMs = this.time.milliseconds();
        final AppendRecordsRequestData requestData = bufferingAppendRecordsBuilder.build();

        final Deque<RequestAndCompletionHandler> queue =
                this.pendingRequests.computeIfAbsent(brokerId, unused -> new ArrayDeque<>());
        queue.addLast(new RequestAndCompletionHandler(
                createdMs,
                this.replicaNodes.get(brokerId),
                new AppendRecordsRequest.Builder(requestData),
                response -> handleAppendRecordsResponse(createdMs, brokerId, requestData, response)));

        // NOTE(review): the size is read after build(); confirm build() does not reset it.
        final boolean carriesData = bufferingAppendRecordsBuilder.totalSizeInBytes() > 0;
        if (carriesData) {
            this.pushReplicationManagerMetrics.recordAppendRecordsRequest(requestData);
        }
    }

    /**
     * Completion handler for an AppendRecords request.
     *
     * <p>The outcome is first reduced to a single error (or {@code null} on success), taken from
     * the authentication failure, a disconnect (timeout or network), a version mismatch, or the
     * top-level error code of the response. It is then handled in this order:
     * <ol>
     *   <li>request epoch older than the latest known epoch of the broker: treated as a
     *       non-retriable failure;</li>
     *   <li>retriable error and request younger than {@code retryTimeoutMs}: re-queued;</li>
     *   <li>no error: the response data is processed;</li>
     *   <li>anything else: treated as a failed request.</li>
     * </ol>
     *
     * @param j                        creation time of the request in milliseconds
     * @param i                        destination broker id
     * @param appendRecordsRequestData the request that was sent
     * @param clientResponse           the client's response object
     */
    void handleAppendRecordsResponse(long j, int i, AppendRecordsRequestData appendRecordsRequestData, ClientResponse clientResponse) {
        // RuntimeException is the common supertype of every exception assigned below.
        RuntimeException error = null;
        AppendRecordsResponse appendRecordsResponse = null;
        if (clientResponse.authenticationException() != null) {
            error = clientResponse.authenticationException();
        } else if (clientResponse.wasDisconnected()) {
            error = clientResponse.wasTimedOut() ? new TimeoutException() : new NetworkException();
        } else if (clientResponse.versionMismatch() != null) {
            error = clientResponse.versionMismatch();
        } else {
            appendRecordsResponse = (AppendRecordsResponse) clientResponse.responseBody();
            if (Errors.NONE.code() != appendRecordsResponse.data().errorCode()) {
                error = Errors.forCode(appendRecordsResponse.data().errorCode()).exception();
            }
        }
        long latestKnownEpoch = this.latestKnownBrokerEpochs.getOrDefault(i, -1L);
        boolean staleEpoch = appendRecordsRequestData.replicaEpoch() < latestKnownEpoch;
        long elapsedMs = Math.max(0L, this.time.milliseconds() - j);
        boolean retriable = error instanceof RetriableException;
        if (staleEpoch) {
            this.log.info("AppendRecords request for broker {} failed due to stale replica epoch {} with latest known replica epoch {}", i, appendRecordsRequestData.replicaEpoch(), latestKnownEpoch);
            handleFailedRequest(appendRecordsRequestData, i, false);
        } else if (retriable && elapsedMs < this.retryTimeoutMs) {
            retryRequest(j, i, appendRecordsRequestData);
        } else if (error == null) {
            processResponseData(appendRecordsResponse.data(), i, processRequestAndCollectActiveSessions(appendRecordsRequestData, i));
        } else {
            // The error is passed inside an explicit array so it is formatted as the second placeholder.
            this.log.info("AppendRecords request for broker {} failed due to error {} after {} ms", new Object[]{i, error, elapsedMs});
            handleFailedRequest(appendRecordsRequestData, i, retriable);
        }
    }

    /**
     * Re-queues the request at the head of the broker's pending queue, keeping the given
     * creation timestamp.
     *
     * @param j creation timestamp to carry on the re-queued request
     * @param i destination broker id
     */
    private void retryRequest(long j, int i, AppendRecordsRequestData appendRecordsRequestData) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Retrying AppendRecords request {} with creation timestamp {} to broker {}", appendRecordsRequestData, j, i);
        }
        final Deque<RequestAndCompletionHandler> queue =
                this.pendingRequests.computeIfAbsent(i, unused -> new ArrayDeque<>());
        queue.addFirst(new RequestAndCompletionHandler(
                j,
                this.replicaNodes.get(i),
                new AppendRecordsRequest.Builder(appendRecordsRequestData),
                response -> handleAppendRecordsResponse(j, i, appendRecordsRequestData, response)));
    }

    /**
     * Walks every partition of a successfully answered request.
     *
     * <p>Records of each partition are counted down. Partitions flagged as ending the session
     * remove the matching stopping session; all other partitions contribute their matching
     * active session to the returned map.
     *
     * @param i destination broker id
     * @return active sessions that match the request, keyed by partition replica
     */
    private Map<PartitionReplicaKey, PushSession> processRequestAndCollectActiveSessions(AppendRecordsRequestData appendRecordsRequestData, int i) {
        final Map<PartitionReplicaKey, PushSession> matchedActive = new HashMap<>();
        final long requestEpoch = appendRecordsRequestData.replicaEpoch();
        for (AppendRecordsRequestData.TopicData topic : appendRecordsRequestData.topics()) {
            for (AppendRecordsRequestData.PartitionData partition : topic.partitions()) {
                countDownRecords(partition);
                final PartitionReplicaKey key = new PartitionReplicaKey(topic.topicId(), partition.partitionIndex(), i);

                if (!partition.endReplicationSession()) {
                    final PushSession active = this.activePushSessions.get(key);
                    if (active != null && active.matchesRequestData(requestEpoch, partition)) {
                        matchedActive.put(key, active);
                    } else {
                        this.log.debug("Active push session {} doesn't match AppendRecords request data {} for {}", active, partition, key);
                    }
                    continue;
                }

                final PushSession stopping = this.stoppingPushSessions.get(key);
                if (stopping != null && stopping.matchesRequestData(requestEpoch, partition)) {
                    this.log.info("Follower stopped push session {} for {}", stopping, key);
                    this.stoppingPushSessions.remove(key);
                } else {
                    this.log.info("Stopping push session {} doesn't match AppendRecords request data {} for {}", stopping, partition, key);
                }
            }
        }
        return matchedActive;
    }

    /**
     * Handles a request that failed as a whole.
     *
     * <p>Records of every partition are counted down. Matching active sessions are failed with
     * a retriable or non-retriable end reason. End-session partitions whose stopping session
     * still matches are, for retriable failures, collected into a new request that is re-queued
     * with a fresh creation timestamp; for non-retriable failures the stopping session is
     * dropped.
     *
     * @param i destination broker id
     * @param z whether the failure is retriable
     */
    private void handleFailedRequest(AppendRecordsRequestData appendRecordsRequestData, int i, boolean z) {
        final long requestEpoch = appendRecordsRequestData.replicaEpoch();
        final ArrayList<AppendRecordsRequestData.TopicData> topicsToRetry = new ArrayList<>(0);
        final PushSessionEndReason activeEndReason =
                z ? PushSessionEndReason.REQUEST_RETRIABLE_ERROR : PushSessionEndReason.REQUEST_NON_RETRIABLE_ERROR;

        for (AppendRecordsRequestData.TopicData topic : appendRecordsRequestData.topics()) {
            // Created lazily, once per topic, on the first partition that needs a retry.
            AppendRecordsRequestData.TopicData retryTopic = null;
            for (AppendRecordsRequestData.PartitionData partition : topic.partitions()) {
                countDownRecords(partition);
                final PartitionReplicaKey key = new PartitionReplicaKey(topic.topicId(), partition.partitionIndex(), i);

                if (!partition.endReplicationSession()) {
                    final PushSession active = this.activePushSessions.get(key);
                    if (active != null && active.matchesRequestData(requestEpoch, partition)) {
                        failSessionAfterResponseFailure(key, activeEndReason);
                    } else {
                        this.log.debug("Active push session {} for {} did not match session in request {}", active, key, partition);
                    }
                    continue;
                }

                final PushSession stopping = this.stoppingPushSessions.get(key);
                final boolean matches = stopping != null && stopping.matchesRequestData(requestEpoch, partition);
                if (!matches) {
                    this.log.info("Stopping push session {} for {} did not match session in request {}", stopping, key, partition);
                } else if (!z) {
                    this.log.info("Stopping push session {} for {} failed due to non-retriable error", stopping, key);
                    this.stoppingPushSessions.remove(key);
                } else {
                    if (retryTopic == null) {
                        retryTopic = new AppendRecordsRequestData.TopicData().setTopicId(topic.topicId());
                        topicsToRetry.add(retryTopic);
                    }
                    retryTopic.partitions().add(partition);
                    this.log.debug("Retrying stopping push session {} for {}", stopping, key);
                }
            }
        }

        if (!topicsToRetry.isEmpty()) {
            final AppendRecordsRequestData retryData =
                    new AppendRecordsRequestData().setReplicaEpoch(requestEpoch).setTopics(topicsToRetry);
            retryRequest(this.time.milliseconds(), i, retryData);
        }
    }

    /**
     * Applies the per-partition results of a successful response to the sessions collected from
     * the request.
     *
     * <p>Each response partition consumes its entry in {@code map}: on an error code the session
     * is failed, otherwise the session's response callback is scheduled on the callback
     * executor. Entries still left in {@code map} afterwards had no response and are reported.
     *
     * @param i   destination broker id
     * @param map sessions expected to be answered; entries are removed as they are matched
     */
    private void processResponseData(AppendRecordsResponseData appendRecordsResponseData, int i, Map<PartitionReplicaKey, PushSession> map) {
        for (AppendRecordsResponseData.TopicData topic : appendRecordsResponseData.topics()) {
            for (AppendRecordsResponseData.PartitionData partition : topic.partitions()) {
                final PartitionReplicaKey key = new PartitionReplicaKey(topic.topicId(), partition.partitionIndex(), i);
                final PushSession session = map.remove(key);
                if (session == null) {
                    continue;
                }
                if (partition.errorCode() == Errors.NONE.code()) {
                    this.callbackExecutor.submit(() -> {
                        session.onAppendRecordsResponse(partition.logEndOffset(), partition.logStartOffset());
                    });
                } else {
                    this.log.info("Received AppendRecords response error {} for partition replica {}", Errors.forCode(partition.errorCode()), key);
                    failSessionAfterResponseFailure(key, PushSessionEndReason.REQUEST_NON_RETRIABLE_ERROR);
                }
            }
        }
        if (!map.isEmpty()) {
            this.log.error("AppendRecords response {} did not contain a response for all partitions in request. Responses were missing for {}", appendRecordsResponseData, map);
        }
    }
}
