/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.replication.push;

import io.confluent.kafka.replication.push.PushSession;
import io.confluent.kafka.replication.push.PushSessionEndReason;
import io.confluent.kafka.replication.push.Pusher;
import io.confluent.kafka.replication.push.ReplicationConfig;
import io.confluent.kafka.replication.push.buffer.BufferingAppendRecordsBuilder;
import io.confluent.kafka.replication.push.buffer.BufferingPartitionDataBuilder;
import io.confluent.kafka.replication.push.buffer.PushReplicationEvent;
import io.confluent.kafka.replication.push.buffer.RefCountingMemoryTracker;
import io.confluent.kafka.replication.push.metrics.PushReplicationManagerMetrics;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.AppendRecordsRequestData;
import org.apache.kafka.common.message.AppendRecordsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AppendRecordsRequest;
import org.apache.kafka.common.requests.AppendRecordsResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;

class PusherThread
extends InterBrokerSendThread
implements Pusher {
    // Numeric identifier of this pusher; exposed through id().
    private final int id;
    // Replication settings; also handed to every BufferingAppendRecordsBuilder created by this thread.
    private final ReplicationConfig config;
    // Metrics sink used for queue latency, session-end counts and event-processing failures.
    final PushReplicationManagerMetrics pushReplicationManagerMetrics;
    // Reference counter for record buffers; counted down when buffered records are dropped
    // or when the request carrying them has been answered or has failed.
    private final RefCountingMemoryTracker<MemoryRecords> tracker;
    // Clock used for event timestamps and request creation/retry timing.
    private final Time time;
    // Caps the timeout returned by sendRequests() while any request builder is still buffering (0 disables the cap).
    private final int lingerMs;
    // Caps every timeout returned by sendRequests().
    private final int maxWaitMs;
    // A request that failed with a retriable error is re-queued only while its age is below this value.
    private final int retryTimeoutMs;
    // Events posted through the Pusher callbacks; drained by generateRequests().
    private final Deque<PushReplicationEvent<?>> eventQueue;
    // Sessions registered by a START_PUSH event and not yet removed by a STOP_PUSH event.
    private final Map<PartitionReplicaKey, PushSession> activePushSessions;
    // Sessions stopped with sendEndSessionRequest set; kept until the matching end-session request
    // succeeds or fails non-retriably, or until a new session starts for the same key.
    private final StoppingPushSessions stoppingPushSessions;
    // Per destination broker id: the builder accumulating events into the next AppendRecords request.
    private final Map<Integer, BufferingAppendRecordsBuilder> bufferingRequestBuilders;
    // Per replica id: the highest replica epoch seen on an accepted push session.
    private final Map<Integer, Long> latestKnownBrokerEpochs;
    // Per destination broker id: built requests waiting to be returned from generateRequests().
    final Map<Integer, Deque<RequestAndCompletionHandler>> pendingRequests;
    // Replica nodes keyed by node id, as reported by the most recently accepted push session.
    private final Map<Integer, Node> replicaNodes;
    // Runs PushSession.tryCompleteDelayedRequests for sessions whose onAppendRecordsResponse reported an update.
    private final Executor callbackExecutor;

    /**
     * Factory for a pusher thread whose thread name is derived from {@code pusherId}.
     *
     * @param pusherId identifier of the new pusher; also embedded in the thread name
     * @return a new, not yet started, {@code PusherThread}
     */
    static PusherThread newPusher(int pusherId, ReplicationConfig config, PushReplicationManagerMetrics pushReplicationManagerMetrics, KafkaClient networkClient, RefCountingMemoryTracker<MemoryRecords> tracker, Time time, Executor callbackExecutor) {
        return new PusherThread(
                String.format("Pusher-thread-%s", pusherId),
                pusherId,
                config,
                pushReplicationManagerMetrics,
                networkClient,
                tracker,
                time,
                callbackExecutor);
    }

    /**
     * Builds the pusher state and registers the stopping-session gauge with the metrics object.
     *
     * <p>A warning is logged when {@code config.maxInFlightRequests()} is anything other than 1.
     */
    PusherThread(String name, int id, ReplicationConfig config, PushReplicationManagerMetrics pushReplicationManagerMetrics, KafkaClient networkClient, RefCountingMemoryTracker<MemoryRecords> tracker, Time time, Executor callbackExecutor) {
        super(name, networkClient, config.requestTimeoutMs(), time);

        // Collaborators supplied by the caller.
        this.id = id;
        this.config = config;
        int configuredInFlight = config.maxInFlightRequests();
        if (configuredInFlight != 1) {
            log.warn("Max in-flight requests for push replication configured to be '{}', but only '1' is currently supported", configuredInFlight);
        }
        this.tracker = tracker;
        this.time = time;

        // Timing knobs read once from the configuration.
        lingerMs = config.lingerMs();
        retryTimeoutMs = config.retryTimeoutMs();
        maxWaitMs = config.maxWaitMs();

        // Mutable bookkeeping; only the event queue is a concurrent collection.
        eventQueue = new ConcurrentLinkedDeque<>();
        activePushSessions = new HashMap<>();
        stoppingPushSessions = new StoppingPushSessions();
        bufferingRequestBuilders = new HashMap<>();
        latestKnownBrokerEpochs = new HashMap<>();
        pendingRequests = new HashMap<>();
        replicaNodes = new HashMap<>();

        this.callbackExecutor = callbackExecutor;
        this.pushReplicationManagerMetrics = pushReplicationManagerMetrics;
        pushReplicationManagerMetrics.registerStoppingPushSessionSupplier(stoppingPushSessions::size);
    }

    /** Returns the identifier this pusher was constructed with. */
    @Override
    public int id() {
        return id;
    }

    /**
     * Queues a START_PUSH event carrying {@code pushSession} for {@code partition},
     * timestamped with the current high-resolution clock.
     */
    @Override
    public void startPush(TopicIdPartition partition, PushSession pushSession) {
        enqueue(PushReplicationEvent.forStartPush(partition, pushSession, time.hiResClockMs()));
    }

    /**
     * Queues a records event for {@code records} appended at {@code appendOffset}, addressed to
     * {@code destinationBrokerId}.
     */
    @Override
    public void onLeaderAppend(TopicIdPartition partition, int destinationBrokerId, long appendOffset, AbstractRecords records) {
        enqueue(PushReplicationEvent.forRecords(partition, destinationBrokerId, records, appendOffset, time.hiResClockMs()));
    }

    /** Queues a high-watermark update event for the given partition and destination broker. */
    @Override
    public void onHighWatermarkUpdate(TopicIdPartition partition, int destinationBrokerId, long highWatermark) {
        enqueue(PushReplicationEvent.forHighWatermarkUpdate(partition, destinationBrokerId, highWatermark, time.hiResClockMs()));
    }

    /** Queues a log-start-offset update event for the given partition and destination broker. */
    @Override
    public void onLogStartOffsetUpdate(TopicIdPartition partition, int destinationBrokerId, long logStartOffset) {
        enqueue(PushReplicationEvent.forLogStartOffsetUpdate(partition, destinationBrokerId, logStartOffset, time.hiResClockMs()));
    }

    /**
     * Requests that the session for {@code partition} / {@code destinationBrokerId} be stopped.
     * The stop event is appended to the tail of the event queue (external, non-internal event).
     */
    @Override
    public void stopPush(TopicIdPartition partition, int destinationBrokerId, PushSessionEndReason pushSessionEndReason) {
        final boolean internal = false;
        enqueueStopPushEvent(partition, destinationBrokerId, pushSessionEndReason, internal);
    }

    /**
     * Creates a STOP_PUSH event, counts the session end in the metrics, and queues the event.
     *
     * @param isInternalEvent when {@code true} the event is placed at the head of the queue without
     *                        going through {@link #enqueue}; otherwise it is appended via {@link #enqueue}
     */
    private void enqueueStopPushEvent(TopicIdPartition partition, int destinationBrokerId, PushSessionEndReason pushSessionEndReason, boolean isInternalEvent) {
        PushReplicationEvent<PushSessionEndReason> event =
                PushReplicationEvent.forStopPush(partition, destinationBrokerId, pushSessionEndReason, time.hiResClockMs());
        pushReplicationManagerMetrics.incrementPushSessionEndCount(pushSessionEndReason);
        if (!isInternalEvent) {
            enqueue(event);
            return;
        }
        // Internal stop events jump the queue so they are processed before anything already waiting.
        eventQueue.addFirst(event);
    }

    /**
     * Appends {@code event} to the event queue and wakes the thread up when the event's type asks for it.
     */
    private void enqueue(PushReplicationEvent<?> event) {
        eventQueue.offer(event);
        boolean needsWakeup = event.type().shouldWakeupPusherThread();
        if (needsWakeup) {
            wakeup();
        }
    }

    /**
     * Shuts the thread down via the superclass.
     *
     * <p>The {@code Pusher} interface does not allow a checked exception here, so an interrupt received
     * while waiting is not propagated as an exception. Instead the interrupt status of the calling thread
     * is restored so that callers further up the stack can still observe it.
     */
    @Override
    public void shutdown() {
        try {
            super.shutdown();
        } catch (InterruptedException e) {
            // Do not lose the interrupt: re-assert the flag for the caller.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for pusher thread shutdown to complete");
        }
    }

    /**
     * Drops all session, builder, request, node and event state held by this pusher.
     *
     * @throws RuntimeException if invoked from any thread other than this pusher thread itself
     */
    protected void cleanUp() {
        Thread caller = Thread.currentThread();
        if (caller != this) {
            throw new RuntimeException("cleanUp() called from " + caller + " instead of the owning thread " + this);
        }
        activePushSessions.clear();
        stoppingPushSessions.clear();
        bufferingRequestBuilders.clear();
        pendingRequests.clear();
        replicaNodes.clear();
        eventQueue.clear();
    }

    /**
     * Drains queued events and returns the requests that can be sent now.
     *
     * <p>Events are processed one at a time until the queue is empty or, after processing an event,
     * at least one request is pending. For event types that wake the thread up, the time the event
     * spent in the queue is reported to the metrics.
     *
     * @return the requests selected by {@link #getRequestsToSend()}; possibly empty
     */
    public Collection<RequestAndCompletionHandler> generateRequests() {
        PushReplicationEvent<?> currentEvent;
        // poll() returns null exactly when the queue is empty, so no separate isEmpty() check is needed.
        while ((currentEvent = eventQueue.poll()) != null) {
            long eventProcessedTimeMs = time.hiResClockMs();
            if (currentEvent.type().shouldWakeupPusherThread()) {
                pushReplicationManagerMetrics.updateEventQueueProcessingTime(eventProcessedTimeMs - currentEvent.eventCreatedTimeMs());
            }
            switch (currentEvent.type()) {
                case MEMORY_RECORDS:
                case HWM_UPDATE:
                case LSO_UPDATE: {
                    processPartitionUpdateEvent(currentEvent);
                    break;
                }
                case START_PUSH: {
                    // The event type tag determines the payload type, so the narrowing is safe here.
                    @SuppressWarnings("unchecked")
                    PushReplicationEvent<PushSession> startEvent = (PushReplicationEvent<PushSession>) currentEvent;
                    processStartEvent(startEvent);
                    break;
                }
                case STOP_PUSH: {
                    // Stop events carry the end reason, not a session.
                    @SuppressWarnings("unchecked")
                    PushReplicationEvent<PushSessionEndReason> stopEvent = (PushReplicationEvent<PushSessionEndReason>) currentEvent;
                    processStopEvent(stopEvent);
                    break;
                }
                default: {
                    failSessionAfterEventProcessingFailure("Unknown type for push replication event {}", currentEvent);
                    break;
                }
            }
            // Stop draining as soon as there is something to send; remaining events wait for the next call.
            if (!pendingRequests.isEmpty()) {
                break;
            }
        }
        return getRequestsToSend();
    }

    /**
     * Delegates to the superclass and caps the returned timeout.
     *
     * <p>The result never exceeds {@code maxWaitMs}. While any request builder is still buffering and
     * lingering is enabled ({@code lingerMs != 0}), it is additionally capped at {@code lingerMs}.
     */
    protected long sendRequests(long now, long maxTimeoutMs) {
        long timeoutMs = Math.min((long) maxWaitMs, super.sendRequests(now, maxTimeoutMs));
        boolean lingering = lingerMs != 0 && !bufferingRequestBuilders.isEmpty();
        return lingering ? Math.min(timeoutMs, (long) lingerMs) : timeoutMs;
    }

    /** Returns whether {@code node} has a request either queued locally or in flight. */
    private boolean hasCreatedRequests(Node node) {
        if (hasPendingRequests(node)) {
            return true;
        }
        return hasInFlightRequests(node);
    }

    /** Returns whether at least one built request is queued for {@code node}'s id. */
    private boolean hasPendingRequests(Node node) {
        Deque<RequestAndCompletionHandler> queued = pendingRequests.get(node.id());
        if (queued == null) {
            return false;
        }
        return !queued.isEmpty();
    }

    /**
     * Routes a records / offset update event to the active session for its partition replica.
     * If no session is active the event is dropped and any records it carries are released.
     */
    private void processPartitionUpdateEvent(PushReplicationEvent<?> event) {
        PartitionReplicaKey key = new PartitionReplicaKey(event);
        PushSession session = activePushSessions.get(key);
        if (session != null) {
            processAndMaybeCreateRequest(event, session);
            return;
        }
        log.debug("Cannot process push replication event {} for partition replica {}; no active push session found", event, key);
        countDownRecords(event);
    }

    /**
     * Releases the tracker reference held for the records of a MEMORY_RECORDS event.
     * Events of any other type are ignored.
     */
    private void countDownRecords(PushReplicationEvent<?> event) {
        if (event.type() != PushReplicationEvent.Type.MEMORY_RECORDS) {
            return;
        }
        PushReplicationEvent.RecordsPayload recordsPayload = (PushReplicationEvent.RecordsPayload) event.payload();
        tracker.countDown((MemoryRecords) recordsPayload.records());
    }

    /**
     * Releases the tracker reference of every buffer inside a request partition's records,
     * provided those records are a {@code MultiMemoryRecords}; other record types are left alone.
     */
    private void countDownRecords(AppendRecordsRequestData.PartitionData partitionData) {
        BaseRecords payload = partitionData.records();
        if (!(payload instanceof BufferingPartitionDataBuilder.MultiMemoryRecords)) {
            return;
        }
        BufferingPartitionDataBuilder.MultiMemoryRecords multi = (BufferingPartitionDataBuilder.MultiMemoryRecords) payload;
        for (MemoryRecords buffer : multi.memoryRecords()) {
            tracker.countDown(buffer);
        }
    }

    /**
     * Registers the session carried by a START_PUSH event.
     *
     * <p>The new session is ended immediately when another session is already active for the same
     * partition replica, or when its replica epoch is rejected as stale. On success, any session still
     * recorded as stopping under the same key is discarded.
     */
    private void processStartEvent(PushReplicationEvent<PushSession> startEvent) {
        PushSession newSession = startEvent.payload();
        PartitionReplicaKey key = new PartitionReplicaKey(startEvent);

        PushSession existing = activePushSessions.get(key);
        if (existing != null) {
            log.error("Cannot start push session {} due to start event {}; already found an active session {}", newSession, startEvent, existing);
            newSession.onPushSessionEnded();
            return;
        }

        boolean accepted = maybeAddPushSessionAndUpdateBrokerEpoch(key, newSession);
        if (!accepted) {
            newSession.onPushSessionEnded();
            return;
        }

        log.info("Leader started push session {} due to start event {}", newSession, startEvent);
        PushSession supersededStopping = stoppingPushSessions.remove(key);
        if (supersededStopping != null) {
            log.debug("Removed stopping push session {} for partition replica {} due to new push session {}", supersededStopping, key, newSession);
        }
    }

    /**
     * Accepts {@code pushSession} unless its replica epoch is older than the latest one known for the replica.
     *
     * <p>When the session brings a newer epoch, the known epoch is advanced and a request builder that was
     * created for an older epoch is cleared and discarded. On acceptance the session's replica node and the
     * session itself are recorded.
     *
     * @return {@code true} if the session was registered as active, {@code false} if it was ignored as stale
     */
    private boolean maybeAddPushSessionAndUpdateBrokerEpoch(PartitionReplicaKey partitionReplicaKey, PushSession pushSession) {
        int replicaId = partitionReplicaKey.replicaId;
        long sessionEpoch = pushSession.replicaEpoch();
        long knownEpoch = latestKnownBrokerEpochs.getOrDefault(replicaId, -1L);

        if (sessionEpoch < knownEpoch) {
            log.info("New push session {} ignored due to stale broker epoch. Latest known broker epoch is {}.", pushSession, knownEpoch);
            return false;
        }

        if (sessionEpoch > knownEpoch) {
            latestKnownBrokerEpochs.put(replicaId, sessionEpoch);
            BufferingAppendRecordsBuilder outdated = bufferingRequestBuilders.get(replicaId);
            if (outdated != null && outdated.destinationBrokerEpoch() < sessionEpoch) {
                outdated.clear();
                bufferingRequestBuilders.remove(replicaId);
            }
        }

        replicaNodes.put(pushSession.replicaNode().id(), pushSession.replicaNode());
        activePushSessions.put(partitionReplicaKey, pushSession);
        return true;
    }

    /**
     * Ends the active session addressed by a STOP_PUSH event and updates the stopping-session table.
     *
     * <p>If an active session exists, it is removed, notified through {@code onPushSessionEnded()}, and the
     * stop event is fed to the request builder on its behalf. Afterwards:
     * <ul>
     *   <li>when the end reason asks for an end-session request and a session was active, that session is
     *       recorded as stopping (replacing any earlier entry);</li>
     *   <li>when the end reason does not ask for one, an earlier stopping entry is removed and the stop event
     *       is fed to the request builder for that earlier session as well.</li>
     * </ul>
     */
    private void processStopEvent(PushReplicationEvent<PushSessionEndReason> stopEvent) {
        PartitionReplicaKey key = new PartitionReplicaKey(stopEvent);
        PushSession endedSession = activePushSessions.remove(key);
        if (endedSession != null) {
            log.info("Leader stopped push session {} due to stop event {}", endedSession, stopEvent);
            endedSession.onPushSessionEnded();
            processAndMaybeCreateRequest(stopEvent, endedSession);
        } else {
            log.debug("Cannot stop push session due to stop event {}; no active session found", stopEvent);
        }

        PushSession displaced = null;
        if (stopEvent.payload().sendEndSessionRequest) {
            if (endedSession != null) {
                displaced = stoppingPushSessions.put(key, endedSession);
            }
        } else {
            displaced = stoppingPushSessions.remove(key);
            if (displaced != null) {
                processAndMaybeCreateRequest(stopEvent, displaced);
            }
        }

        if (displaced != null) {
            log.info("Removed older stopping push session {} for partition replica {} after a stop event {} for a newer push session {}", displaced, key, stopEvent, endedSession);
        }
    }

    /**
     * Feeds {@code event} into the request builder of its destination broker.
     *
     * <p>The event is only accepted when the session's replica epoch equals the latest epoch known for the
     * destination; otherwise it is dropped and any records it carries are released. When the current builder
     * refuses the event, that builder is turned into a pending request and the event is offered to a fresh
     * builder; if the fresh builder refuses it too, the session is failed.
     *
     * @param event       the event to buffer
     * @param pushSession the session the event belongs to
     */
    private void processAndMaybeCreateRequest(PushReplicationEvent<?> event, PushSession pushSession) {
        int destinationBrokerId = event.replicaId();
        long sessionBrokerEpoch = pushSession.replicaEpoch();
        long latestKnownBrokerEpoch = latestKnownBrokerEpochs.getOrDefault(destinationBrokerId, -1L);

        if (sessionBrokerEpoch != latestKnownBrokerEpoch) {
            // Report the epoch the session was compared against, not the session's own epoch.
            log.info("Push replication event {} for push session {} ignored due to stale broker epoch. Latest known broker epoch is {}", event, pushSession, latestKnownBrokerEpoch);
            countDownRecords(event);
            return;
        }

        BufferingAppendRecordsBuilder builder = getOrCreateBuilder(destinationBrokerId, sessionBrokerEpoch);
        if (builder.processEvent(event, pushSession)) {
            return;
        }

        // The current builder could not take the event: seal it into a pending request and retry on a new one.
        bufferingRequestBuilders.remove(destinationBrokerId);
        addPendingRequest(builder);
        if (!getOrCreateBuilder(destinationBrokerId, sessionBrokerEpoch).processEvent(event, pushSession)) {
            failSessionAfterEventProcessingFailure("Push replication event {} could not be consumed", event);
        }
    }

    /**
     * Logs the failure and immediately stops the session the event belongs to with reason
     * {@code LEADER_REPLICATION_ERROR}, updating the session-end and processing-failure counters.
     *
     * @param errorMessageTemplate log template with a single placeholder for the event
     */
    private void failSessionAfterEventProcessingFailure(String errorMessageTemplate, PushReplicationEvent<?> event) {
        log.error(errorMessageTemplate, event);
        PushSessionEndReason reason = PushSessionEndReason.LEADER_REPLICATION_ERROR;
        PushReplicationEvent<PushSessionEndReason> syntheticStop =
                PushReplicationEvent.forStopPush(event.topicIdPartition(), event.replicaId(), reason, time.hiResClockMs());
        pushReplicationManagerMetrics.incrementPushSessionEndCount(reason);
        // Processed inline rather than queued, so the session is gone before the next event is handled.
        processStopEvent(syntheticStop);
        pushReplicationManagerMetrics.incrementEventProcessingFailureCount();
    }

    /**
     * Queues an internal (head-of-queue) stop event for the session identified by {@code key}.
     */
    private void failSessionAfterResponseFailure(PartitionReplicaKey key, PushSessionEndReason pushSessionEndReason) {
        TopicIdPartition partition = new TopicIdPartition(key.topicId, key.partitionId);
        enqueueStopPushEvent(partition, key.replicaId, pushSessionEndReason, true);
    }

    /**
     * Selects at most one request per destination broker to hand to the network layer.
     *
     * <p>First, every builder that reports itself ready is converted into a pending request, provided its
     * destination node is known and has no pending or in-flight request. Then, for each destination without
     * an in-flight request, the oldest pending request is taken. If the node recorded for the destination
     * differs from the one stored in the request, the request is re-created against the recorded node.
     *
     * @return the selected requests; an empty list when nothing is pending
     */
    private Collection<RequestAndCompletionHandler> getRequestsToSend() {
        // Step 1: promote ready builders to pending requests.
        Iterator<BufferingAppendRecordsBuilder> builders = bufferingRequestBuilders.values().iterator();
        while (builders.hasNext()) {
            BufferingAppendRecordsBuilder candidate = builders.next();
            if (!candidate.isRequestReady()) {
                continue;
            }
            Node target = replicaNodes.get(candidate.destinationBrokerId());
            if (target == null || hasCreatedRequests(target)) {
                continue;
            }
            builders.remove();
            addPendingRequest(candidate);
        }

        if (pendingRequests.isEmpty()) {
            return Collections.emptyList();
        }

        // Step 2: pick the head of each per-broker queue whose broker is idle.
        ArrayList<RequestAndCompletionHandler> toSend = new ArrayList<>();
        Iterator<Map.Entry<Integer, Deque<RequestAndCompletionHandler>>> entries = pendingRequests.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<Integer, Deque<RequestAndCompletionHandler>> entry = entries.next();
            Deque<RequestAndCompletionHandler> queue = entry.getValue();
            int brokerId = entry.getKey();
            Node currentNode = replicaNodes.get(brokerId);
            if (hasInFlightRequests(currentNode)) {
                continue;
            }
            RequestAndCompletionHandler head = Objects.requireNonNull(queue.poll());
            if (queue.isEmpty()) {
                entries.remove();
            }
            if (!currentNode.equals(head.destination)) {
                head = new RequestAndCompletionHandler(head.creationTimeMs, currentNode, head.request, head.handler, Optional.empty());
            }
            toSend.add(head);
        }
        return toSend;
    }

    /**
     * Returns the request builder registered for {@code destinationBrokerId}, creating and registering one
     * with {@code destinationBrokerEpoch} when none exists yet.
     */
    private BufferingAppendRecordsBuilder getOrCreateBuilder(int destinationBrokerId, long destinationBrokerEpoch) {
        BufferingAppendRecordsBuilder existing = bufferingRequestBuilders.get(destinationBrokerId);
        if (existing != null) {
            return existing;
        }
        BufferingAppendRecordsBuilder created = new BufferingAppendRecordsBuilder(destinationBrokerId, destinationBrokerEpoch, config, tracker, time, pushReplicationManagerMetrics);
        bufferingRequestBuilders.put(destinationBrokerId, created);
        return created;
    }

    /**
     * Builds the request held by {@code builder} and appends it to the tail of the pending queue of the
     * builder's destination broker. The completion handler routes the response to
     * {@link #handleAppendRecordsResponse}.
     */
    private void addPendingRequest(BufferingAppendRecordsBuilder builder) {
        int destinationBrokerId = builder.destinationBrokerId();
        long creationTimeMs = time.milliseconds();
        AppendRecordsRequestData requestData = builder.build();

        Deque<RequestAndCompletionHandler> queue = pendingRequests.computeIfAbsent(destinationBrokerId, id -> new ArrayDeque<>());
        queue.addLast(new RequestAndCompletionHandler(
                creationTimeMs,
                replicaNodes.get(destinationBrokerId),
                new AppendRecordsRequest.Builder(requestData),
                response -> handleAppendRecordsResponse(creationTimeMs, destinationBrokerId, requestData, response),
                Optional.empty()));
    }

    /**
     * Completion handler for an AppendRecords request.
     *
     * <p>A request-level error is derived, in order, from: an authentication exception, a disconnect
     * (timeout or network error), a version mismatch, or a non-{@code NONE} top-level error code in the
     * response body. The outcome is then handled as follows:
     * <ol>
     *   <li>request epoch older than the latest known epoch for the broker: treated as a non-retriable failure;</li>
     *   <li>retriable error and the request is younger than {@code retryTimeoutMs}: the request is re-queued;</li>
     *   <li>any other error: the request is failed (retriable flag passed through);</li>
     *   <li>no error: the request and response partition data are processed.</li>
     * </ol>
     *
     * @param creationTimeMs      creation time of the original request, used to bound retries
     * @param destinationBrokerId broker the request was addressed to
     * @param requestData         the request that was sent
     * @param clientResponse      the network client's response wrapper
     */
    void handleAppendRecordsResponse(long creationTimeMs, int destinationBrokerId, AppendRecordsRequestData requestData, ClientResponse clientResponse) {
        // Declared as Throwable: the error may be an authentication, timeout, network,
        // version-mismatch or protocol-level exception, which share no narrower imported type.
        Throwable topLevelError = null;
        AppendRecordsResponse response = null;
        if (clientResponse.authenticationException() != null) {
            topLevelError = clientResponse.authenticationException();
        } else if (clientResponse.wasDisconnected()) {
            topLevelError = clientResponse.wasTimedOut() ? new TimeoutException() : new NetworkException();
        } else if (clientResponse.versionMismatch() != null) {
            topLevelError = clientResponse.versionMismatch();
        } else {
            response = (AppendRecordsResponse) clientResponse.responseBody();
            if (Errors.NONE.code() != response.data().errorCode()) {
                topLevelError = Errors.forCode((short) response.data().errorCode()).exception();
            }
        }

        long lastKnownBrokerEpoch = latestKnownBrokerEpochs.getOrDefault(destinationBrokerId, -1L);
        boolean requestHasStaleEpoch = requestData.replicaEpoch() < lastKnownBrokerEpoch;
        long timeElapsedMs = Math.max(0L, time.milliseconds() - creationTimeMs);
        boolean isRequestRetriable = topLevelError instanceof RetriableException;

        if (requestHasStaleEpoch) {
            log.info("AppendRecords request for broker {} failed due to stale replica epoch {} with latest known replica epoch {}", destinationBrokerId, requestData.replicaEpoch(), lastKnownBrokerEpoch);
            handleFailedRequest(requestData, destinationBrokerId, false);
        } else if (isRequestRetriable && timeElapsedMs < (long) retryTimeoutMs) {
            retryRequest(creationTimeMs, destinationBrokerId, requestData);
        } else if (topLevelError != null) {
            log.info("AppendRecords request for broker {} failed due to error {} after {} ms", destinationBrokerId, topLevelError, timeElapsedMs);
            handleFailedRequest(requestData, destinationBrokerId, isRequestRetriable);
        } else {
            // topLevelError == null is only reachable through the last branch above, so response is set.
            Map<PartitionReplicaKey, PushSession> activeSessions = processRequestAndCollectActiveSessions(requestData, destinationBrokerId);
            processResponseData(response.data(), destinationBrokerId, activeSessions);
        }
    }

    /**
     * Re-queues {@code requestData} at the head of the destination broker's pending queue, keeping the
     * supplied {@code creationTimeMs} as the request's creation time.
     */
    private void retryRequest(long creationTimeMs, int destinationBrokerId, AppendRecordsRequestData requestData) {
        if (log.isDebugEnabled()) {
            log.debug("Retrying AppendRecords request {} with creation timestamp {} to broker {}", requestData, creationTimeMs, destinationBrokerId);
        }
        Deque<RequestAndCompletionHandler> queue = pendingRequests.computeIfAbsent(destinationBrokerId, id -> new ArrayDeque<>());
        // Head of the queue: a retried request goes out before anything built after it.
        queue.addFirst(new RequestAndCompletionHandler(
                creationTimeMs,
                replicaNodes.get(destinationBrokerId),
                new AppendRecordsRequest.Builder(requestData),
                response -> handleAppendRecordsResponse(creationTimeMs, destinationBrokerId, requestData, response),
                Optional.empty()));
    }

    /**
     * Post-processes a request that was answered without a request-level error.
     *
     * <p>For every partition in the request the buffered records are released. End-session partitions
     * whose stopping session matches the request data are removed from the stopping table. For all other
     * partitions, the active session is collected if it matches the request data.
     *
     * @return the matching active sessions of the request's non-end-session partitions, by key
     */
    private Map<PartitionReplicaKey, PushSession> processRequestAndCollectActiveSessions(AppendRecordsRequestData requestData, int destinationBrokerId) {
        Map<PartitionReplicaKey, PushSession> matched = new HashMap<>();
        long replicaEpoch = requestData.replicaEpoch();
        for (AppendRecordsRequestData.TopicData topic : requestData.topics()) {
            for (AppendRecordsRequestData.PartitionData partition : topic.partitions()) {
                countDownRecords(partition);
                PartitionReplicaKey key = new PartitionReplicaKey(topic.topicId(), partition.partitionIndex(), destinationBrokerId);

                if (partition.endReplicationSession()) {
                    PushSession stopping = stoppingPushSessions.get(key);
                    if (stopping != null && stopping.matchesRequestData(replicaEpoch, partition)) {
                        log.info("Follower stopped push session {} for {}", stopping, key);
                        stoppingPushSessions.remove(key);
                    } else {
                        log.info("Stopping push session {} doesn't match AppendRecords request data {} for {}", stopping, partition, key);
                    }
                } else {
                    PushSession active = activePushSessions.get(key);
                    if (active != null && active.matchesRequestData(replicaEpoch, partition)) {
                        matched.put(key, active);
                    } else {
                        log.debug("Active push session {} doesn't match AppendRecords request data {} for {}", active, partition, key);
                    }
                }
            }
        }
        return matched;
    }

    /**
     * Handles a request that failed as a whole.
     *
     * <p>Buffered records of every partition are released. Active sessions matching the request are failed
     * with a retriable or non-retriable end reason. Matching end-session partitions are either collected
     * into a new end-session-only request that is re-queued (retriable failure) or dropped from the stopping
     * table (non-retriable failure).
     *
     * @param isRequestRetriable whether the failure was caused by a retriable error
     */
    private void handleFailedRequest(AppendRecordsRequestData requestData, int destinationBrokerId, boolean isRequestRetriable) {
        long replicaEpoch = requestData.replicaEpoch();
        ArrayList<AppendRecordsRequestData.TopicData> retriedTopics = new ArrayList<>(0);

        for (AppendRecordsRequestData.TopicData topic : requestData.topics()) {
            // Lazily created: only topics with at least one retried end-session partition get an entry.
            AppendRecordsRequestData.TopicData retriedTopic = null;
            for (AppendRecordsRequestData.PartitionData partition : topic.partitions()) {
                countDownRecords(partition);
                PartitionReplicaKey key = new PartitionReplicaKey(topic.topicId(), partition.partitionIndex(), destinationBrokerId);

                if (!partition.endReplicationSession()) {
                    PushSession active = activePushSessions.get(key);
                    if (active == null || !active.matchesRequestData(replicaEpoch, partition)) {
                        log.debug("Active push session {} for {} did not match session in request {}", active, key, partition);
                    } else {
                        PushSessionEndReason reason = isRequestRetriable
                                ? PushSessionEndReason.REQUEST_RETRIABLE_ERROR
                                : PushSessionEndReason.REQUEST_NON_RETRIABLE_ERROR;
                        failSessionAfterResponseFailure(key, reason);
                    }
                    continue;
                }

                PushSession stopping = stoppingPushSessions.get(key);
                if (stopping == null || !stopping.matchesRequestData(replicaEpoch, partition)) {
                    log.info("Stopping push session {} for {} did not match session in request {}", stopping, key, partition);
                    continue;
                }
                if (!isRequestRetriable) {
                    log.info("Stopping push session {} for {} failed due to non-retriable error", stopping, key);
                    stoppingPushSessions.remove(key);
                    continue;
                }
                if (retriedTopic == null) {
                    retriedTopic = new AppendRecordsRequestData.TopicData().setTopicId(topic.topicId());
                    retriedTopics.add(retriedTopic);
                }
                retriedTopic.partitions().add(partition);
                log.debug("Retrying stopping push session {} for {}", stopping, key);
            }
        }

        if (retriedTopics.isEmpty()) {
            return;
        }
        AppendRecordsRequestData endSessionsRequest = new AppendRecordsRequestData().setReplicaEpoch(replicaEpoch).setTopics(retriedTopics);
        retryRequest(time.milliseconds(), destinationBrokerId, endSessionsRequest);
    }

    /**
     * Applies the per-partition results of a successful AppendRecords response.
     *
     * <p>Each response partition is matched against {@code activeSessions} (entries are removed as they are
     * matched). A partition-level error fails the session; otherwise the session is told the follower's
     * offsets. Sessions that report an update have {@code tryCompleteDelayedRequests} scheduled on the
     * callback executor. Sessions left in {@code activeSessions} afterwards had no response entry.
     *
     * @param activeSessions sessions expected in the response; mutated by this method
     */
    private void processResponseData(AppendRecordsResponseData responseData, int destinationBrokerId, Map<PartitionReplicaKey, PushSession> activeSessions) {
        ArrayList<PushSession> updatedSessions = new ArrayList<>();
        for (AppendRecordsResponseData.TopicData topic : responseData.topics()) {
            for (AppendRecordsResponseData.PartitionData partition : topic.partitions()) {
                PartitionReplicaKey key = new PartitionReplicaKey(topic.topicId(), partition.partitionIndex(), destinationBrokerId);
                PushSession session = activeSessions.remove(key);
                if (session == null) {
                    continue;
                }
                if (partition.errorCode() == Errors.NONE.code()) {
                    if (session.onAppendRecordsResponse(partition.logEndOffset(), partition.logStartOffset())) {
                        updatedSessions.add(session);
                    }
                } else {
                    log.info("Received AppendRecords response error {} for partition replica {}", Errors.forCode((short) partition.errorCode()), key);
                    failSessionAfterResponseFailure(key, PushSessionEndReason.REQUEST_NON_RETRIABLE_ERROR);
                }
            }
        }
        if (!updatedSessions.isEmpty()) {
            callbackExecutor.execute(() -> updatedSessions.forEach(PushSession::tryCompleteDelayedRequests));
        }
        if (!activeSessions.isEmpty()) {
            log.error("AppendRecords response {} did not contain a response for all partitions in request. Responses were missing for {}", responseData, activeSessions);
        }
    }

    /**
     * Map of sessions that are in the process of being stopped, paired with a {@code volatile} copy of
     * the map's size that is refreshed on every mutation and returned by {@link #size()}.
     */
    private static final class StoppingPushSessions {
        private final Map<PartitionReplicaKey, PushSession> sessionsByKey = new HashMap<>();
        private volatile int size = 0;

        StoppingPushSessions() {
        }

        /** Returns the stopping session stored under {@code partitionReplicaKey}, or {@code null}. */
        PushSession get(PartitionReplicaKey partitionReplicaKey) {
            return sessionsByKey.get(partitionReplicaKey);
        }

        /** Stores {@code pushSession} under the key and returns the session it replaced, if any. */
        PushSession put(PartitionReplicaKey partitionReplicaKey, PushSession pushSession) {
            PushSession replaced = sessionsByKey.put(partitionReplicaKey, pushSession);
            publishSize();
            return replaced;
        }

        /** Removes and returns the session stored under the key, or {@code null} when absent. */
        PushSession remove(PartitionReplicaKey partitionReplicaKey) {
            PushSession removed = sessionsByKey.remove(partitionReplicaKey);
            publishSize();
            return removed;
        }

        /** Removes every entry and resets the published size to zero. */
        void clear() {
            sessionsByKey.clear();
            size = 0;
        }

        /** Returns the most recently published number of stopping sessions. */
        int size() {
            return size;
        }

        /** Copies the map's current size into the volatile field. */
        private void publishSize() {
            size = sessionsByKey.size();
        }
    }

    /**
     * Immutable map key identifying one replica of one partition: topic id, partition index and replica id.
     */
    private static final class PartitionReplicaKey {
        private final Uuid topicId;
        private final int partitionId;
        private final int replicaId;

        /** Builds a key from explicit components. */
        public PartitionReplicaKey(Uuid topicId, int partitionId, int replicaId) {
            this.topicId = topicId;
            this.partitionId = partitionId;
            this.replicaId = replicaId;
        }

        /** Builds a key from a topic-id partition and a replica id. */
        public PartitionReplicaKey(TopicIdPartition topicIdPartition, int replicaId) {
            this(topicIdPartition.topicId(), topicIdPartition.partitionId(), replicaId);
        }

        /** Builds a key from the partition and replica id an event is addressed to. */
        public PartitionReplicaKey(PushReplicationEvent<?> event) {
            this(event.topicIdPartition(), event.replicaId());
        }

        @Override
        public int hashCode() {
            return Objects.hash(topicId, partitionId, replicaId);
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o instanceof PartitionReplicaKey) {
                PartitionReplicaKey other = (PartitionReplicaKey) o;
                return topicId.equals(other.topicId)
                        && partitionId == other.partitionId
                        && replicaId == other.replicaId;
            }
            return false;
        }

        @Override
        public String toString() {
            return "PartitionReplicaKey{topicId=" + topicId + ", partitionId=" + partitionId + ", replicaId=" + replicaId + "}";
        }
    }
}

