package kafka.server.share;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import kafka.cluster.PartitionListener;
import kafka.server.ReplicaManager;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/server/share/SharePartitionManager.class */
public class SharePartitionManager implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(SharePartitionManager.class);
    private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
    private final ReplicaManager replicaManager;
    private final Time time;
    private final ShareSessionCache cache;
    private final GroupConfigManager groupConfigManager;
    private final int defaultRecordLockDurationMs;
    private final Timer timer;
    private final int maxInFlightMessages;
    private final int maxDeliveryCount;
    private final Persister persister;
    private final ShareGroupMetrics shareGroupMetrics;
    private final int maxFetchRecords;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/server/share/SharePartitionManager$ShareGroupMetrics.class */
    public static class ShareGroupMetrics {
        public static final String METRICS_GROUP_NAME = "share-group-metrics";
        public static final String SHARE_ACK_SENSOR = "share-acknowledgement-sensor";
        public static final String SHARE_ACK_RATE = "share-acknowledgement-rate";
        public static final String SHARE_ACK_COUNT = "share-acknowledgement-count";
        public static final String RECORD_ACK_SENSOR_PREFIX = "record-acknowledgement";
        public static final String RECORD_ACK_RATE = "record-acknowledgement-rate";
        public static final String RECORD_ACK_COUNT = "record-acknowledgement-count";
        public static final String ACK_TYPE = "ack-type";
        public static final String PARTITION_LOAD_TIME_SENSOR = "partition-load-time-sensor";
        public static final String PARTITION_LOAD_TIME_AVG = "partition-load-time-avg";
        public static final String PARTITION_LOAD_TIME_MAX = "partition-load-time-max";
        public static final Map<Byte, String> RECORD_ACKS_MAP = new HashMap();
        private final Time time;
        private final Sensor shareAcknowledgementSensor;
        private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap();
        private final Sensor partitionLoadTimeSensor;

        public ShareGroupMetrics(Metrics metrics, Time time) {
            this.time = time;
            this.shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR);
            this.shareAcknowledgementSensor.add(new Meter(metrics.metricName(SHARE_ACK_RATE, METRICS_GROUP_NAME, "Rate of acknowledge requests."), metrics.metricName(SHARE_ACK_COUNT, METRICS_GROUP_NAME, "The number of acknowledge requests.")));
            for (Map.Entry<Byte, String> entry : RECORD_ACKS_MAP.entrySet()) {
                this.recordAcksSensorMap.put(entry.getKey(), metrics.sensor(String.format("%s-%s-sensor", RECORD_ACK_SENSOR_PREFIX, entry.getValue())));
                this.recordAcksSensorMap.get(entry.getKey()).add(new Meter(metrics.metricName(RECORD_ACK_RATE, METRICS_GROUP_NAME, "Rate of records acknowledged per acknowledgement type.", new String[]{ACK_TYPE, entry.getValue()}), metrics.metricName(RECORD_ACK_COUNT, METRICS_GROUP_NAME, "The number of records acknowledged per acknowledgement type.", new String[]{ACK_TYPE, entry.getValue()})));
            }
            this.partitionLoadTimeSensor = metrics.sensor(PARTITION_LOAD_TIME_SENSOR);
            this.partitionLoadTimeSensor.add(metrics.metricName(PARTITION_LOAD_TIME_AVG, METRICS_GROUP_NAME, "The average time in milliseconds to load the share partitions."), new Avg());
            this.partitionLoadTimeSensor.add(metrics.metricName(PARTITION_LOAD_TIME_MAX, METRICS_GROUP_NAME, "The maximum time in milliseconds to load the share partitions."), new Max());
        }

        void shareAcknowledgement() {
            this.shareAcknowledgementSensor.record();
        }

        void recordAcknowledgement(byte b) {
            if (this.recordAcksSensorMap.containsKey(Byte.valueOf(b))) {
                this.recordAcksSensorMap.get(Byte.valueOf(b)).record();
            }
        }

        void partitionLoadTime(long j) {
            this.partitionLoadTimeSensor.record(this.time.hiResClockMs() - j);
        }

        static {
            RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString());
            RECORD_ACKS_MAP.put((byte) 2, AcknowledgeType.RELEASE.toString());
            RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/server/share/SharePartitionManager$SharePartitionListener.class */
    public static class SharePartitionListener implements PartitionListener {
        private final SharePartitionKey sharePartitionKey;
        private final ReplicaManager replicaManager;
        private final Map<SharePartitionKey, SharePartition> partitionCacheMap;

        SharePartitionListener(SharePartitionKey sharePartitionKey, ReplicaManager replicaManager, Map<SharePartitionKey, SharePartition> map) {
            this.sharePartitionKey = sharePartitionKey;
            this.replicaManager = replicaManager;
            this.partitionCacheMap = map;
        }

        @Override // kafka.cluster.PartitionListener
        public void onFailed(TopicPartition topicPartition) {
            SharePartitionManager.log.debug("The share partition failed listener is invoked for the topic-partition: {}, share-partition: {}", topicPartition, this.sharePartitionKey);
            onUpdate(topicPartition);
        }

        @Override // kafka.cluster.PartitionListener
        public void onDeleted(TopicPartition topicPartition) {
            SharePartitionManager.log.debug("The share partition delete listener is invoked for the topic-partition: {}, share-partition: {}", topicPartition, this.sharePartitionKey);
            onUpdate(topicPartition);
        }

        @Override // kafka.cluster.PartitionListener
        public void onBecomingFollower(TopicPartition topicPartition) {
            SharePartitionManager.log.debug("The share partition becoming follower listener is invoked for the topic-partition: {}, share-partition: {}", topicPartition, this.sharePartitionKey);
            onUpdate(topicPartition);
        }

        private void onUpdate(TopicPartition topicPartition) {
            if (this.sharePartitionKey.topicIdPartition().topicPartition().equals(topicPartition)) {
                SharePartitionManager.removeSharePartitionFromCache(this.sharePartitionKey, this.partitionCacheMap, this.replicaManager);
            } else {
                SharePartitionManager.log.error("The share partition listener is invoked for the wrong topic-partition: {}, share-partition: {}", topicPartition, this.sharePartitionKey);
            }
        }
    }

    public SharePartitionManager(ReplicaManager replicaManager, Time time, ShareSessionCache shareSessionCache, int i, int i2, int i3, int i4, Persister persister, GroupConfigManager groupConfigManager, Metrics metrics) {
        this(replicaManager, time, shareSessionCache, new ConcurrentHashMap(), i, i2, i3, i4, persister, groupConfigManager, metrics);
    }

    private SharePartitionManager(ReplicaManager replicaManager, Time time, ShareSessionCache shareSessionCache, Map<SharePartitionKey, SharePartition> map, int i, int i2, int i3, int i4, Persister persister, GroupConfigManager groupConfigManager, Metrics metrics) {
        this(replicaManager, time, shareSessionCache, map, i, new SystemTimerReaper("share-group-lock-timeout-reaper", new SystemTimer("share-group-lock-timeout")), i2, i3, i4, persister, groupConfigManager, metrics);
    }

    SharePartitionManager(ReplicaManager replicaManager, Time time, ShareSessionCache shareSessionCache, Map<SharePartitionKey, SharePartition> map, int i, Timer timer, int i2, int i3, int i4, Persister persister, GroupConfigManager groupConfigManager, Metrics metrics) {
        this.replicaManager = replicaManager;
        this.time = time;
        this.cache = shareSessionCache;
        this.partitionCacheMap = map;
        this.defaultRecordLockDurationMs = i;
        this.timer = timer;
        this.maxDeliveryCount = i2;
        this.maxInFlightMessages = i3;
        this.persister = persister;
        this.groupConfigManager = groupConfigManager;
        this.shareGroupMetrics = new ShareGroupMetrics((Metrics) Objects.requireNonNull(metrics), time);
        this.maxFetchRecords = i4;
    }

    public CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages(String str, String str2, FetchParams fetchParams, Map<TopicIdPartition, Integer> map) {
        log.trace("Fetch request for topicIdPartitions: {} with groupId: {} fetch params: {}", new Object[]{map.keySet(), str, fetchParams});
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> completableFuture = new CompletableFuture<>();
        processShareFetch(new ShareFetch(fetchParams, str, str2, completableFuture, map, this.maxFetchRecords));
        return completableFuture;
    }

    public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> acknowledge(String str, String str2, Map<TopicIdPartition, List<ShareAcknowledgementBatch>> map) {
        log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}", map.keySet(), str2);
        this.shareGroupMetrics.shareAcknowledgement();
        HashMap hashMap = new HashMap();
        map.forEach((topicIdPartition, list) -> {
            SharePartitionKey sharePartitionKey = sharePartitionKey(str2, topicIdPartition);
            SharePartition sharePartition = this.partitionCacheMap.get(sharePartitionKey);
            if (sharePartition == null) {
                hashMap.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
                return;
            }
            CompletableFuture completableFuture = new CompletableFuture();
            sharePartition.acknowledge(str, list).whenComplete((r8, th) -> {
                if (th != null) {
                    fencedSharePartitionHandler().accept(sharePartitionKey, th);
                    completableFuture.complete(th);
                } else {
                    list.forEach(shareAcknowledgementBatch -> {
                        List acknowledgeTypes = shareAcknowledgementBatch.acknowledgeTypes();
                        ShareGroupMetrics shareGroupMetrics = this.shareGroupMetrics;
                        Objects.requireNonNull(shareGroupMetrics);
                        acknowledgeTypes.forEach((v1) -> {
                            r1.recordAcknowledgement(v1);
                        });
                    });
                    completableFuture.complete(null);
                }
            });
            this.replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(str2, topicIdPartition.topicId(), topicIdPartition.partition()));
            hashMap.put(topicIdPartition, completableFuture);
        });
        return mapAcknowledgementFutures(hashMap);
    }

    public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> releaseSession(String str, String str2) {
        log.trace("Release session request for groupId: {}, memberId: {}", str, str2);
        Uuid fromString = Uuid.fromString(str2);
        List<TopicIdPartition> cachedTopicIdPartitionsInShareSession = cachedTopicIdPartitionsInShareSession(str, fromString);
        ShareSessionKey shareSessionKey = shareSessionKey(str, fromString);
        if (this.cache.remove(shareSessionKey) == null) {
            log.error("Share session error for {}: no such share session found", shareSessionKey);
            return FutureUtils.failedFuture(Errors.SHARE_SESSION_NOT_FOUND.exception());
        }
        log.debug("Removed share session with key " + String.valueOf(shareSessionKey));
        if (cachedTopicIdPartitionsInShareSession.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        HashMap hashMap = new HashMap();
        cachedTopicIdPartitionsInShareSession.forEach(topicIdPartition -> {
            SharePartitionKey sharePartitionKey = sharePartitionKey(str, topicIdPartition);
            SharePartition sharePartition = this.partitionCacheMap.get(sharePartitionKey);
            if (sharePartition == null) {
                log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", str, topicIdPartition);
                hashMap.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
                return;
            }
            CompletableFuture completableFuture = new CompletableFuture();
            sharePartition.releaseAcquiredRecords(str2).whenComplete((r7, th) -> {
                if (th == null) {
                    completableFuture.complete(null);
                } else {
                    fencedSharePartitionHandler().accept(sharePartitionKey, th);
                    completableFuture.complete(th);
                }
            });
            this.replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(str, topicIdPartition.topicId(), topicIdPartition.partition()));
            hashMap.put(topicIdPartition, completableFuture);
        });
        return mapAcknowledgementFutures(hashMap);
    }

    private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(Map<TopicIdPartition, CompletableFuture<Throwable>> map) {
        return CompletableFuture.allOf((CompletableFuture[]) map.values().toArray(new CompletableFuture[0])).thenApply(r4 -> {
            HashMap hashMap = new HashMap();
            map.forEach((topicIdPartition, completableFuture) -> {
                ShareAcknowledgeResponseData.PartitionData partitionIndex = new ShareAcknowledgeResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition());
                Throwable th = (Throwable) completableFuture.join();
                if (th != null) {
                    partitionIndex.setErrorCode(Errors.forException(th).code()).setErrorMessage(th.getMessage());
                }
                hashMap.put(topicIdPartition, partitionIndex);
            });
            return hashMap;
        });
    }

    public ShareFetchContext newContext(String str, Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> map, List<TopicIdPartition> list, ShareRequestMetadata shareRequestMetadata, Boolean bool) {
        FinalContext shareSessionContext;
        HashMap hashMap = new HashMap();
        map.forEach((topicIdPartition, sharePartitionData) -> {
            if (sharePartitionData.maxBytes > 0) {
                hashMap.put(topicIdPartition, sharePartitionData);
            }
        });
        if (shareRequestMetadata.isFull()) {
            ShareSessionKey shareSessionKey = shareSessionKey(str, shareRequestMetadata.memberId());
            if (shareRequestMetadata.epoch() == -1) {
                if (!hashMap.isEmpty()) {
                    throw Errors.INVALID_REQUEST.exception();
                }
                if (this.cache.get(shareSessionKey) == null) {
                    log.error("Share session error for {}: no such share session found", shareSessionKey);
                    throw Errors.SHARE_SESSION_NOT_FOUND.exception();
                }
                shareSessionContext = new FinalContext();
            } else {
                if (bool.booleanValue()) {
                    log.error("Acknowledge data present in Initial Fetch Request for group {} member {}", str, shareRequestMetadata.memberId());
                    throw Errors.INVALID_REQUEST.exception();
                }
                if (this.cache.remove(shareSessionKey) != null) {
                    log.debug("Removed share session with key {}", shareSessionKey);
                }
                ImplicitLinkedHashCollection implicitLinkedHashCollection = new ImplicitLinkedHashCollection(hashMap.size());
                hashMap.forEach((topicIdPartition2, sharePartitionData2) -> {
                    implicitLinkedHashCollection.mustAdd(new CachedSharePartition(topicIdPartition2, sharePartitionData2, false));
                });
                ShareSessionKey maybeCreateSession = this.cache.maybeCreateSession(str, shareRequestMetadata.memberId(), this.time.milliseconds(), implicitLinkedHashCollection);
                if (maybeCreateSession == null) {
                    log.error("Could not create a share session for group {} member {}", str, shareRequestMetadata.memberId());
                    throw Errors.SHARE_SESSION_NOT_FOUND.exception();
                }
                shareSessionContext = new ShareSessionContext(shareRequestMetadata, hashMap);
                log.debug("Created a new ShareSessionContext with key {} isSubsequent {} returning {}. A new share session will be started.", new Object[]{maybeCreateSession, false, partitionsToLogString(hashMap.keySet())});
            }
        } else {
            synchronized (this.cache) {
                ShareSessionKey shareSessionKey2 = shareSessionKey(str, shareRequestMetadata.memberId());
                ShareSession shareSession = this.cache.get(shareSessionKey2);
                if (shareSession == null) {
                    log.error("Share session error for {}: no such share session found", shareSessionKey2);
                    throw Errors.SHARE_SESSION_NOT_FOUND.exception();
                }
                if (shareSession.epoch != shareRequestMetadata.epoch()) {
                    log.debug("Share session error for {}: expected epoch {}, but got {} instead", new Object[]{shareSessionKey2, Integer.valueOf(shareSession.epoch), Integer.valueOf(shareRequestMetadata.epoch())});
                    throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
                }
                Map update = shareSession.update(hashMap, list);
                this.cache.touch(shareSession, this.time.milliseconds());
                shareSession.epoch = ShareRequestMetadata.nextEpoch(shareSession.epoch);
                log.debug("Created a new ShareSessionContext for session key {}, epoch {}: added {}, updated {}, removed {}", new Object[]{shareSession.key(), Integer.valueOf(shareSession.epoch), partitionsToLogString((Collection) update.get(ShareSession.ModifiedTopicIdPartitionType.ADDED)), partitionsToLogString((Collection) update.get(ShareSession.ModifiedTopicIdPartitionType.UPDATED)), partitionsToLogString((Collection) update.get(ShareSession.ModifiedTopicIdPartitionType.REMOVED))});
                shareSessionContext = new ShareSessionContext(shareRequestMetadata, shareSession);
            }
        }
        return shareSessionContext;
    }

    public void acknowledgeSessionUpdate(String str, ShareRequestMetadata shareRequestMetadata) {
        if (shareRequestMetadata.epoch() == 0) {
            throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
        }
        synchronized (this.cache) {
            ShareSessionKey shareSessionKey = shareSessionKey(str, shareRequestMetadata.memberId());
            ShareSession shareSession = this.cache.get(shareSessionKey);
            if (shareSession == null) {
                log.debug("Share session error for {}: no such share session found", shareSessionKey);
                throw Errors.SHARE_SESSION_NOT_FOUND.exception();
            }
            if (shareRequestMetadata.epoch() == -1) {
                return;
            }
            if (shareSession.epoch != shareRequestMetadata.epoch()) {
                log.debug("Share session error for {}: expected epoch {}, but got {} instead", new Object[]{shareSessionKey, Integer.valueOf(shareSession.epoch), Integer.valueOf(shareRequestMetadata.epoch())});
                throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
            }
            this.cache.touch(shareSession, this.time.milliseconds());
            shareSession.epoch = ShareRequestMetadata.nextEpoch(shareSession.epoch);
        }
    }

    List<TopicIdPartition> cachedTopicIdPartitionsInShareSession(String str, Uuid uuid) {
        ShareSession shareSession = this.cache.get(shareSessionKey(str, uuid));
        if (shareSession == null) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        shareSession.partitionMap().forEach(cachedSharePartition -> {
            arrayList.add(new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition())));
        });
        return arrayList;
    }

    private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, List<DelayedShareFetchKey> list) {
        this.replicaManager.addDelayedShareFetchRequest(delayedShareFetch, list);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.timer.close();
    }

    private ShareSessionKey shareSessionKey(String str, Uuid uuid) {
        return new ShareSessionKey(str, uuid);
    }

    private static String partitionsToLogString(Collection<TopicIdPartition> collection) {
        return ShareSession.partitionsToLogString(collection, Boolean.valueOf(log.isTraceEnabled()));
    }

    void processShareFetch(ShareFetch shareFetch) {
        if (shareFetch.partitionMaxBytes().isEmpty()) {
            shareFetch.maybeComplete(Collections.emptyMap());
            return;
        }
        ArrayList arrayList = new ArrayList();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (TopicIdPartition topicIdPartition : shareFetch.partitionMaxBytes().keySet()) {
            SharePartitionKey sharePartitionKey = sharePartitionKey(shareFetch.groupId(), topicIdPartition);
            try {
                SharePartition orCreateSharePartition = getOrCreateSharePartition(sharePartitionKey);
                DelayedShareFetchGroupKey delayedShareFetchGroupKey = new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition());
                arrayList.add(delayedShareFetchGroupKey);
                arrayList.add(new DelayedShareFetchPartitionKey(topicIdPartition.topicId(), topicIdPartition.partition()));
                CompletableFuture<Void> maybeInitialize = orCreateSharePartition.maybeInitialize();
                boolean isDone = maybeInitialize.isDone();
                maybeInitialize.whenComplete((r10, th) -> {
                    if (th != null) {
                        handleInitializationException(sharePartitionKey, shareFetch, th);
                    }
                    if (isDone) {
                        return;
                    }
                    this.replicaManager.completeDelayedShareFetchRequest(delayedShareFetchGroupKey);
                });
                linkedHashMap.put(topicIdPartition, orCreateSharePartition);
            } catch (Exception e) {
                log.debug("Error processing share fetch request", e);
                shareFetch.addErroneous(topicIdPartition, e);
            }
        }
        if (shareFetch.errorInAllPartitions()) {
            shareFetch.maybeComplete(Collections.emptyMap());
        } else {
            addDelayedShareFetch(new DelayedShareFetch(shareFetch, this.replicaManager, fencedSharePartitionHandler(), linkedHashMap), arrayList);
        }
    }

    private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {
        return this.partitionCacheMap.computeIfAbsent(sharePartitionKey, sharePartitionKey2 -> {
            long hiResClockMs = this.time.hiResClockMs();
            int leaderEpoch = ShareFetchUtils.leaderEpoch(this.replicaManager, sharePartitionKey.topicIdPartition().topicPartition());
            SharePartitionListener sharePartitionListener = new SharePartitionListener(sharePartitionKey, this.replicaManager, this.partitionCacheMap);
            this.replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(), sharePartitionListener);
            SharePartition sharePartition = new SharePartition(sharePartitionKey.groupId(), sharePartitionKey.topicIdPartition(), leaderEpoch, this.maxInFlightMessages, this.maxDeliveryCount, this.defaultRecordLockDurationMs, this.timer, this.time, this.persister, this.replicaManager, this.groupConfigManager, sharePartitionListener);
            this.shareGroupMetrics.partitionLoadTime(hiResClockMs);
            return sharePartition;
        });
    }

    private void handleInitializationException(SharePartitionKey sharePartitionKey, ShareFetch shareFetch, Throwable th) {
        if (th instanceof LeaderNotAvailableException) {
            log.debug("The share partition with key {} is not initialized yet", sharePartitionKey);
            return;
        }
        removeSharePartitionFromCache(sharePartitionKey, this.partitionCacheMap, this.replicaManager);
        log.debug("Error initializing share partition with key {}", sharePartitionKey, th);
        shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), th);
    }

    private BiConsumer<SharePartitionKey, Throwable> fencedSharePartitionHandler() {
        return (sharePartitionKey, th) -> {
            if ((th instanceof NotLeaderOrFollowerException) || (th instanceof FencedStateEpochException) || (th instanceof GroupIdNotFoundException) || (th instanceof UnknownTopicOrPartitionException)) {
                log.info("The share partition with key {} is fenced: {}", sharePartitionKey, th.getMessage());
                removeSharePartitionFromCache(sharePartitionKey, this.partitionCacheMap, this.replicaManager);
            }
        };
    }

    private SharePartitionKey sharePartitionKey(String str, TopicIdPartition topicIdPartition) {
        return new SharePartitionKey(str, topicIdPartition);
    }

    private static void removeSharePartitionFromCache(SharePartitionKey sharePartitionKey, Map<SharePartitionKey, SharePartition> map, ReplicaManager replicaManager) {
        SharePartition remove = map.remove(sharePartitionKey);
        if (remove != null) {
            remove.markFenced();
            replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(), remove.listener());
        }
    }
}
