package org.apache.kafka.server.share.persister;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManager.class */
public class PersisterStateManager {
    private SendThread sender;
    public static final long REQUEST_BACKOFF_MS = 1000;
    public static final long REQUEST_BACKOFF_MAX_MS = 30000;
    private static final int MAX_FIND_COORD_ATTEMPTS = 5;
    private final Time time;
    private final Timer timer;
    private final ShareCoordinatorMetadataCacheHelper cacheHelper;
    private Runnable generateCallback;
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private final Map<RPCType, Set<Node>> inFlight = new HashMap();
    private final Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> nodeRPCMap = new HashMap();
    private final Object nodeMapLock = new Object();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.server.share.persister.PersisterStateManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManager$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$Errors;
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$server$share$persister$PersisterStateManager$RPCType = new int[RPCType.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$server$share$persister$PersisterStateManager$RPCType[RPCType.WRITE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$server$share$persister$PersisterStateManager$RPCType[RPCType.READ.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$org$apache$kafka$common$protocol$Errors = new int[Errors.values().length];
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.COORDINATOR_NOT_AVAILABLE.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.COORDINATOR_LOAD_IN_PROGRESS.ordinal()] = 3;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NOT_COORDINATOR.ordinal()] = 4;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManager$BackoffManager.class */
    public static class BackoffManager {
        private final int maxAttempts;
        private int attempts;
        private final ExponentialBackoff backoff;

        BackoffManager(int i, long j, long j2) {
            this.maxAttempts = i;
            this.backoff = new ExponentialBackoff(j, 2, j2, 0.2d);
        }

        void incrementAttempt() {
            this.attempts++;
        }

        void resetAttempts() {
            this.attempts = 0;
        }

        boolean canAttempt() {
            return this.attempts < this.maxAttempts;
        }

        long backOff() {
            return this.backoff.backoff(this.attempts);
        }
    }

    /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManager$PersisterStateManagerHandler.class */
    public abstract class PersisterStateManagerHandler implements RequestCompletionHandler {
        protected Node coordinatorNode;
        private final BackoffManager findCoordBackoff;
        protected final Logger log = LoggerFactory.getLogger(getClass());
        private Consumer<ClientResponse> onCompleteCallback = clientResponse -> {
        };
        protected final SharePartitionKey partitionKey;

        public PersisterStateManagerHandler(String str, Uuid uuid, int i, long j, long j2, int i2) {
            this.findCoordBackoff = new BackoffManager(i2, j, j2);
            this.partitionKey = SharePartitionKey.getInstance(str, uuid, i);
        }

        protected abstract AbstractRequest.Builder<? extends AbstractRequest> requestBuilder();

        protected abstract void handleRequestResponse(ClientResponse clientResponse);

        protected abstract boolean isResponseForRequest(ClientResponse clientResponse);

        protected abstract void findCoordinatorErrorResponse(Errors errors, Exception exc);

        protected abstract String name();

        protected abstract RPCType rpcType();

        protected abstract CompletableFuture<? extends AbstractResponse> result();

        protected AbstractRequest.Builder<FindCoordinatorRequest> findShareCoordinatorBuilder() {
            return new FindCoordinatorRequest.Builder(new FindCoordinatorRequestData().setKeyType(FindCoordinatorRequest.CoordinatorType.SHARE.id()).setKey(partitionKey().asCoordinatorKey()));
        }

        public void addRequestToNodeMap(Node node, PersisterStateManagerHandler persisterStateManagerHandler) {
            if (persisterStateManagerHandler.isBatchable()) {
                synchronized (PersisterStateManager.this.nodeMapLock) {
                    PersisterStateManager.this.nodeRPCMap.computeIfAbsent(node, node2 -> {
                        return new HashMap();
                    }).computeIfAbsent(persisterStateManagerHandler.rpcType(), rPCType -> {
                        return new HashMap();
                    }).computeIfAbsent(partitionKey().groupId(), str -> {
                        return new LinkedList();
                    }).add(persisterStateManagerHandler);
                }
                PersisterStateManager.this.sender.wakeup();
            }
        }

        protected boolean lookupNeeded() {
            if (this.coordinatorNode != null) {
                return false;
            }
            if (!PersisterStateManager.this.cacheHelper.containsTopic("__share_group_state")) {
                return true;
            }
            this.log.debug("{} internal topic already exists.", "__share_group_state");
            Node shareCoordinator = PersisterStateManager.this.cacheHelper.getShareCoordinator(partitionKey(), "__share_group_state");
            if (shareCoordinator == Node.noNode()) {
                return true;
            }
            this.log.debug("Found coordinator node in cache: {}", shareCoordinator);
            this.coordinatorNode = shareCoordinator;
            addRequestToNodeMap(shareCoordinator, this);
            return false;
        }

        protected SharePartitionKey partitionKey() {
            return this.partitionKey;
        }

        protected boolean isFindCoordinatorResponse(ClientResponse clientResponse) {
            return clientResponse != null && clientResponse.requestHeader().apiKey() == ApiKeys.FIND_COORDINATOR;
        }

        public void onComplete(ClientResponse clientResponse) {
            if (this.onCompleteCallback != null) {
                this.onCompleteCallback.accept(clientResponse);
            }
            if (clientResponse != null && clientResponse.hasResponse()) {
                if (isFindCoordinatorResponse(clientResponse)) {
                    handleFindCoordinatorResponse(clientResponse);
                } else if (isResponseForRequest(clientResponse)) {
                    handleRequestResponse(clientResponse);
                }
            }
            PersisterStateManager.this.sender.wakeup();
        }

        protected void resetCoordinatorNode() {
            this.coordinatorNode = null;
        }

        protected void handleFindCoordinatorResponse(ClientResponse clientResponse) {
            this.log.debug("Find coordinator response received - {}", clientResponse);
            this.findCoordBackoff.incrementAttempt();
            List coordinators = clientResponse.responseBody().coordinators();
            if (coordinators.size() != 1) {
                this.log.error("Find coordinator response for {} is invalid", partitionKey());
                findCoordinatorErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new IllegalStateException("Invalid response with multiple coordinators."));
                return;
            }
            FindCoordinatorResponseData.Coordinator coordinator = (FindCoordinatorResponseData.Coordinator) coordinators.get(0);
            Errors forCode = Errors.forCode(coordinator.errorCode());
            switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$Errors[forCode.ordinal()]) {
                case 1:
                    this.log.debug("Find coordinator response valid. Enqueuing actual request.");
                    this.findCoordBackoff.resetAttempts();
                    this.coordinatorNode = new Node(coordinator.nodeId(), coordinator.host(), coordinator.port());
                    if (isBatchable()) {
                        addRequestToNodeMap(this.coordinatorNode, this);
                        return;
                    } else {
                        PersisterStateManager.this.enqueue(this);
                        return;
                    }
                case ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT /* 2 */:
                case 3:
                case 4:
                    this.log.warn("Received retriable error in find coordinator for {} using key {}: {}", new Object[]{name(), partitionKey(), forCode.message()});
                    if (this.findCoordBackoff.canAttempt()) {
                        resetCoordinatorNode();
                        PersisterStateManager.this.timer.add(new PersisterTimerTask(this.findCoordBackoff.backOff(), this));
                        return;
                    } else {
                        this.log.error("Exhausted max retries to find coordinator for {} using key {} without success.", name(), partitionKey());
                        findCoordinatorErrorResponse(forCode, new Exception("Exhausted max retries to find coordinator without success."));
                        return;
                    }
                default:
                    this.log.error("Unable to find coordinator for {} using key {}.", name(), partitionKey());
                    findCoordinatorErrorResponse(forCode, null);
                    return;
            }
        }

        public Node getCoordinatorNode() {
            return this.coordinatorNode;
        }

        protected abstract boolean isBatchable();

        protected void setOnCompleteCallback(Consumer<ClientResponse> consumer) {
            this.onCompleteCallback = consumer;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManager$PersisterTimerTask.class */
    public final class PersisterTimerTask extends TimerTask {
        private final PersisterStateManagerHandler handler;

        PersisterTimerTask(long j, PersisterStateManagerHandler persisterStateManagerHandler) {
            super(j);
            this.handler = persisterStateManagerHandler;
        }

        @Override // java.lang.Runnable
        public void run() {
            PersisterStateManager.this.enqueue(this.handler);
            PersisterStateManager.this.sender.wakeup();
        }
    }

    /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManager$RPCType.class */
    public enum RPCType {
        READ,
        WRITE,
        DELETE,
        SUMMARY,
        UNKNOWN
    }

    /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManager$ReadStateHandler.class */
    public class ReadStateHandler extends PersisterStateManagerHandler {
        private final int leaderEpoch;
        private final CompletableFuture<ReadShareGroupStateResponse> result;
        private final BackoffManager readStateBackoff;

        public ReadStateHandler(String str, Uuid uuid, int i, int i2, CompletableFuture<ReadShareGroupStateResponse> completableFuture, long j, long j2, int i3, Consumer<ClientResponse> consumer) {
            super(str, uuid, i, j, j2, i3);
            this.leaderEpoch = i2;
            this.result = completableFuture;
            this.readStateBackoff = new BackoffManager(i3, j, j2);
        }

        public ReadStateHandler(PersisterStateManager persisterStateManager, String str, Uuid uuid, int i, int i2, CompletableFuture<ReadShareGroupStateResponse> completableFuture, Consumer<ClientResponse> consumer) {
            this(str, uuid, i, i2, completableFuture, 1000L, 30000L, PersisterStateManager.MAX_FIND_COORD_ATTEMPTS, consumer);
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected String name() {
            return "ReadStateHandler";
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected AbstractRequest.Builder<ReadShareGroupStateRequest> requestBuilder() {
            throw new RuntimeException("Read requests are batchable, hence individual requests not needed.");
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected boolean isResponseForRequest(ClientResponse clientResponse) {
            return clientResponse.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE;
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected void handleRequestResponse(ClientResponse clientResponse) {
            this.log.debug("Read state response received - {}", clientResponse);
            this.readStateBackoff.incrementAttempt();
            for (ReadShareGroupStateResponseData.ReadStateResult readStateResult : clientResponse.responseBody().data().results()) {
                if (readStateResult.topicId().equals(partitionKey().topicId())) {
                    Optional findFirst = readStateResult.partitions().stream().filter(partitionResult -> {
                        return partitionResult.partition() == partitionKey().partition();
                    }).findFirst();
                    if (findFirst.isPresent()) {
                        Errors forCode = Errors.forCode(((ReadShareGroupStateResponseData.PartitionResult) findFirst.get()).errorCode());
                        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$Errors[forCode.ordinal()]) {
                            case 1:
                                this.readStateBackoff.resetAttempts();
                                this.result.complete(new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(ReadShareGroupStateResponse.toResponseReadStateResult(partitionKey().topicId(), Collections.singletonList((ReadShareGroupStateResponseData.PartitionResult) findFirst.get()))))));
                                return;
                            case ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT /* 2 */:
                            case 3:
                            case 4:
                                this.log.warn("Received retriable error in read state RPC for key {}: {}", partitionKey(), forCode.message());
                                if (this.readStateBackoff.canAttempt()) {
                                    super.resetCoordinatorNode();
                                    PersisterStateManager.this.timer.add(new PersisterTimerTask(this.readStateBackoff.backOff(), this));
                                    return;
                                } else {
                                    this.log.error("Exhausted max retries for read state RPC for key {} without success.", partitionKey());
                                    readStateErrorReponse(forCode, new Exception("Exhausted max retries to complete read state RPC without success."));
                                    return;
                                }
                            default:
                                this.log.error("Unable to perform read state RPC for key {}: {}", partitionKey(), forCode.message());
                                readStateErrorReponse(forCode, null);
                                return;
                        }
                    }
                }
            }
            IllegalStateException illegalStateException = new IllegalStateException("Failed to read state for share partition " + String.valueOf(partitionKey()));
            readStateErrorReponse(Errors.forException(illegalStateException), illegalStateException);
        }

        protected void readStateErrorReponse(Errors errors, Exception exc) {
            this.result.complete(new ReadShareGroupStateResponse(ReadShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), errors, "Error in find coordinator. " + (exc == null ? errors.message() : exc.getMessage()))));
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected void findCoordinatorErrorResponse(Errors errors, Exception exc) {
            this.result.complete(new ReadShareGroupStateResponse(ReadShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), errors, "Error in read state RPC. " + (exc == null ? errors.message() : exc.getMessage()))));
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        public CompletableFuture<ReadShareGroupStateResponse> result() {
            return this.result;
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected boolean isBatchable() {
            return true;
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected RPCType rpcType() {
            return RPCType.READ;
        }
    }

    /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManager$RequestCoalescerHelper.class */
    private static class RequestCoalescerHelper {
        static final /* synthetic */ boolean $assertionsDisabled;

        private RequestCoalescerHelper() {
        }

        public static AbstractRequest.Builder<? extends AbstractRequest> coalesceRequests(String str, RPCType rPCType, List<? extends PersisterStateManagerHandler> list) {
            switch (AnonymousClass1.$SwitchMap$org$apache$kafka$server$share$persister$PersisterStateManager$RPCType[rPCType.ordinal()]) {
                case 1:
                    return coalesceWrites(str, list);
                case ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT /* 2 */:
                    return coalesceReads(str, list);
                default:
                    throw new RuntimeException("Unknown rpc type: " + String.valueOf(rPCType));
            }
        }

        private static AbstractRequest.Builder<? extends AbstractRequest> coalesceWrites(String str, List<? extends PersisterStateManagerHandler> list) {
            HashMap hashMap = new HashMap();
            list.forEach(persisterStateManagerHandler -> {
                if (!$assertionsDisabled && !(persisterStateManagerHandler instanceof WriteStateHandler)) {
                    throw new AssertionError();
                }
                WriteStateHandler writeStateHandler = (WriteStateHandler) persisterStateManagerHandler;
                ((List) hashMap.computeIfAbsent(writeStateHandler.partitionKey().topicId(), uuid -> {
                    return new LinkedList();
                })).add(new WriteShareGroupStateRequestData.PartitionData().setPartition(writeStateHandler.partitionKey().partition()).setStateEpoch(writeStateHandler.stateEpoch).setLeaderEpoch(writeStateHandler.leaderEpoch).setStartOffset(writeStateHandler.startOffset).setStateBatches((List) writeStateHandler.batches.stream().map(persisterStateBatch -> {
                    return new WriteShareGroupStateRequestData.StateBatch().setFirstOffset(persisterStateBatch.firstOffset()).setLastOffset(persisterStateBatch.lastOffset()).setDeliveryState(persisterStateBatch.deliveryState()).setDeliveryCount(persisterStateBatch.deliveryCount());
                }).collect(Collectors.toList())));
            });
            return new WriteShareGroupStateRequest.Builder(new WriteShareGroupStateRequestData().setGroupId(str).setTopics((List) hashMap.entrySet().stream().map(entry -> {
                return new WriteShareGroupStateRequestData.WriteStateData().setTopicId((Uuid) entry.getKey()).setPartitions((List) entry.getValue());
            }).collect(Collectors.toList())));
        }

        private static AbstractRequest.Builder<? extends AbstractRequest> coalesceReads(String str, List<? extends PersisterStateManagerHandler> list) {
            HashMap hashMap = new HashMap();
            list.forEach(persisterStateManagerHandler -> {
                if (!$assertionsDisabled && !(persisterStateManagerHandler instanceof ReadStateHandler)) {
                    throw new AssertionError();
                }
                ReadStateHandler readStateHandler = (ReadStateHandler) persisterStateManagerHandler;
                ((List) hashMap.computeIfAbsent(readStateHandler.partitionKey().topicId(), uuid -> {
                    return new LinkedList();
                })).add(new ReadShareGroupStateRequestData.PartitionData().setPartition(readStateHandler.partitionKey().partition()).setLeaderEpoch(readStateHandler.leaderEpoch));
            });
            return new ReadShareGroupStateRequest.Builder(new ReadShareGroupStateRequestData().setGroupId(str).setTopics((List) hashMap.entrySet().stream().map(entry -> {
                return new ReadShareGroupStateRequestData.ReadStateData().setTopicId((Uuid) entry.getKey()).setPartitions((List) entry.getValue());
            }).collect(Collectors.toList())));
        }

        static {
            $assertionsDisabled = !PersisterStateManager.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManager$SendThread.class */
    public class SendThread extends InterBrokerSendThread {
        private final ConcurrentLinkedQueue<PersisterStateManagerHandler> queue;
        private final Random random;

        public SendThread(String str, KafkaClient kafkaClient, int i, Time time, boolean z, Random random) {
            super(str, kafkaClient, i, time, z);
            this.queue = new ConcurrentLinkedQueue<>();
            this.random = random;
        }

        private Node randomNode() {
            List<Node> clusterNodes = PersisterStateManager.this.cacheHelper.getClusterNodes();
            return (clusterNodes == null || clusterNodes.isEmpty()) ? Node.noNode() : clusterNodes.get(this.random.nextInt(clusterNodes.size()));
        }

        @Override // org.apache.kafka.server.util.InterBrokerSendThread
        public Collection<RequestAndCompletionHandler> generateRequests() {
            if (PersisterStateManager.this.generateCallback != null) {
                PersisterStateManager.this.generateCallback.run();
            }
            ArrayList arrayList = new ArrayList();
            if (!this.queue.isEmpty()) {
                PersisterStateManagerHandler peek = this.queue.peek();
                this.queue.poll();
                if (peek.lookupNeeded()) {
                    Node randomNode = randomNode();
                    if (randomNode != Node.noNode()) {
                        this.log.debug("Sending find coordinator RPC");
                        return Collections.singletonList(new RequestAndCompletionHandler(PersisterStateManager.this.time.milliseconds(), randomNode, peek.findShareCoordinatorBuilder(), peek));
                    }
                    this.log.error("Unable to find node to use for coordinator lookup.");
                    peek.findCoordinatorErrorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Errors.COORDINATOR_NOT_AVAILABLE.exception());
                    return Collections.emptyList();
                }
                if (!peek.isBatchable()) {
                    arrayList.add(new RequestAndCompletionHandler(PersisterStateManager.this.time.milliseconds(), peek.coordinatorNode, peek.requestBuilder(), peek));
                }
            }
            HashMap hashMap = new HashMap();
            synchronized (PersisterStateManager.this.nodeMapLock) {
                PersisterStateManager.this.nodeRPCMap.forEach((node, map) -> {
                    map.forEach((rPCType, map) -> {
                        map.forEach((str, list) -> {
                            if (PersisterStateManager.this.inFlight.containsKey(rPCType) && PersisterStateManager.this.inFlight.get(rPCType).contains(node)) {
                                return;
                            }
                            arrayList.add(new RequestAndCompletionHandler(PersisterStateManager.this.time.milliseconds(), node, RequestCoalescerHelper.coalesceRequests(str, rPCType, list), clientResponse -> {
                                PersisterStateManager.this.inFlight.computeIfPresent(rPCType, (rPCType, set) -> {
                                    set.remove(node);
                                    return set;
                                });
                                list.forEach(persisterStateManagerHandler -> {
                                    persisterStateManagerHandler.onComplete(clientResponse);
                                });
                                wakeup();
                            }));
                            ((Set) hashMap.computeIfAbsent(rPCType, rPCType -> {
                                return new HashSet();
                            })).add(node);
                        });
                    });
                });
                hashMap.forEach((rPCType, set) -> {
                    PersisterStateManager.this.inFlight.computeIfAbsent(rPCType, rPCType -> {
                        return new HashSet();
                    }).addAll(set);
                    set.forEach(node2 -> {
                        PersisterStateManager.this.nodeRPCMap.computeIfPresent(node2, (node2, map2) -> {
                            map2.remove(rPCType);
                            return map2;
                        });
                    });
                });
            }
            return arrayList;
        }

        public void enqueue(PersisterStateManagerHandler persisterStateManagerHandler) {
            this.queue.add(persisterStateManagerHandler);
            wakeup();
        }
    }

    /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManager$WriteStateHandler.class */
    public class WriteStateHandler extends PersisterStateManagerHandler {
        private final int stateEpoch;
        private final int leaderEpoch;
        private final long startOffset;
        private final List<PersisterStateBatch> batches;
        private final CompletableFuture<WriteShareGroupStateResponse> result;
        private final BackoffManager writeStateBackoff;

        public WriteStateHandler(String str, Uuid uuid, int i, int i2, int i3, long j, List<PersisterStateBatch> list, CompletableFuture<WriteShareGroupStateResponse> completableFuture, long j2, long j3, int i4) {
            super(str, uuid, i, j2, j3, i4);
            this.stateEpoch = i2;
            this.leaderEpoch = i3;
            this.startOffset = j;
            this.batches = list;
            this.result = completableFuture;
            this.writeStateBackoff = new BackoffManager(i4, j2, j3);
        }

        public WriteStateHandler(PersisterStateManager persisterStateManager, String str, Uuid uuid, int i, int i2, int i3, long j, List<PersisterStateBatch> list, CompletableFuture<WriteShareGroupStateResponse> completableFuture, Consumer<ClientResponse> consumer) {
            this(str, uuid, i, i2, i3, j, list, completableFuture, 1000L, 30000L, PersisterStateManager.MAX_FIND_COORD_ATTEMPTS);
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected String name() {
            return "WriteStateHandler";
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
            throw new RuntimeException("Write requests are batchable, hence individual requests not needed.");
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected boolean isResponseForRequest(ClientResponse clientResponse) {
            return clientResponse.requestHeader().apiKey() == ApiKeys.WRITE_SHARE_GROUP_STATE;
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected void handleRequestResponse(ClientResponse clientResponse) {
            this.log.debug("Write state response received - {}", clientResponse);
            this.writeStateBackoff.incrementAttempt();
            for (WriteShareGroupStateResponseData.WriteStateResult writeStateResult : clientResponse.responseBody().data().results()) {
                if (writeStateResult.topicId().equals(partitionKey().topicId())) {
                    Optional findFirst = writeStateResult.partitions().stream().filter(partitionResult -> {
                        return partitionResult.partition() == partitionKey().partition();
                    }).findFirst();
                    if (findFirst.isPresent()) {
                        Errors forCode = Errors.forCode(((WriteShareGroupStateResponseData.PartitionResult) findFirst.get()).errorCode());
                        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$Errors[forCode.ordinal()]) {
                            case 1:
                                this.writeStateBackoff.resetAttempts();
                                this.result.complete(new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(WriteShareGroupStateResponse.toResponseWriteStateResult(partitionKey().topicId(), Collections.singletonList((WriteShareGroupStateResponseData.PartitionResult) findFirst.get()))))));
                                return;
                            case ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT /* 2 */:
                            case 3:
                            case 4:
                                this.log.warn("Received retriable error in write state RPC for key {}: {}", partitionKey(), forCode.message());
                                if (this.writeStateBackoff.canAttempt()) {
                                    super.resetCoordinatorNode();
                                    PersisterStateManager.this.timer.add(new PersisterTimerTask(this.writeStateBackoff.backOff(), this));
                                    return;
                                } else {
                                    this.log.error("Exhausted max retries for write state RPC for key {} without success.", partitionKey());
                                    writeStateErrorResponse(forCode, new Exception("Exhausted max retries to complete write state RPC without success."));
                                    return;
                                }
                            default:
                                this.log.error("Unable to perform write state RPC for key {}: {}", partitionKey(), forCode.message());
                                writeStateErrorResponse(forCode, null);
                                return;
                        }
                    }
                }
            }
            IllegalStateException illegalStateException = new IllegalStateException("Failed to write state for share partition: " + String.valueOf(partitionKey()));
            writeStateErrorResponse(Errors.forException(illegalStateException), illegalStateException);
        }

        private void writeStateErrorResponse(Errors errors, Exception exc) {
            this.result.complete(new WriteShareGroupStateResponse(WriteShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), errors, "Error in write state RPC. " + (exc == null ? errors.message() : exc.getMessage()))));
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected void findCoordinatorErrorResponse(Errors errors, Exception exc) {
            this.result.complete(new WriteShareGroupStateResponse(WriteShareGroupStateResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), errors, "Error in find coordinator. " + (exc == null ? errors.message() : exc.getMessage()))));
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        public CompletableFuture<WriteShareGroupStateResponse> result() {
            return this.result;
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected boolean isBatchable() {
            return true;
        }

        @Override // org.apache.kafka.server.share.persister.PersisterStateManager.PersisterStateManagerHandler
        protected RPCType rpcType() {
            return RPCType.WRITE;
        }
    }

    public PersisterStateManager(KafkaClient kafkaClient, ShareCoordinatorMetadataCacheHelper shareCoordinatorMetadataCacheHelper, Time time, Timer timer) {
        if (kafkaClient == null) {
            throw new IllegalArgumentException("Kafkaclient must not be null.");
        }
        if (time == null) {
            throw new IllegalArgumentException("Time must not be null.");
        }
        if (timer == null) {
            throw new IllegalArgumentException("Timer must not be null.");
        }
        if (shareCoordinatorMetadataCacheHelper == null) {
            throw new IllegalArgumentException("CacheHelper must not be null.");
        }
        this.time = time;
        this.timer = timer;
        this.cacheHelper = shareCoordinatorMetadataCacheHelper;
        this.sender = new SendThread("PersisterStateManager", kafkaClient, Math.toIntExact(CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS.longValue()), this.time, true, new Random(this.time.milliseconds()));
    }

    public void enqueue(PersisterStateManagerHandler persisterStateManagerHandler) {
        this.sender.enqueue(persisterStateManagerHandler);
    }

    public void start() {
        if (this.isStarted.compareAndSet(false, true)) {
            this.sender.start();
            this.isStarted.set(true);
        }
    }

    public void stop() throws Exception {
        if (this.isStarted.compareAndSet(true, false)) {
            this.sender.shutdown();
            Utils.closeQuietly(this.timer, "PersisterStateManager timer");
        }
    }

    Map<Node, Map<RPCType, Map<String, List<PersisterStateManagerHandler>>>> nodeRPCMap() {
        return this.nodeRPCMap;
    }

    public void setGenerateCallback(Runnable runnable) {
        this.generateCallback = runnable;
    }
}
