package kafka.server.share;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import kafka.server.LogReadResult;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:kafka/server/share/DelayedShareFetch.class */
public class DelayedShareFetch extends DelayedOperation {
    private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);
    private final ShareFetch shareFetch;
    private final ReplicaManager replicaManager;
    private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
    private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
    private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
    private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a delayed share fetch operation for the given request.
     *
     * <p>The superclass is initialised with the request's {@code maxWaitMs} and an empty
     * {@link Optional}; the two hand-over maps start out empty.
     *
     * @param shareFetch the share fetch request this operation completes
     * @param replicaManager used for log reads and for scheduling follow-up completions
     * @param biConsumer callback invoked with a share partition key and the error raised for it
     * @param linkedHashMap share partitions taking part in this request, keyed by topic-id partition
     */
    public DelayedShareFetch(ShareFetch shareFetch, ReplicaManager replicaManager, BiConsumer<SharePartitionKey, Throwable> biConsumer, LinkedHashMap<TopicIdPartition, SharePartition> linkedHashMap) {
        super(shareFetch.fetchParams().maxWaitMs, Optional.empty());

        // Collaborators supplied by the caller.
        this.shareFetch = shareFetch;
        this.replicaManager = replicaManager;
        this.exceptionHandler = biConsumer;
        this.sharePartitions = linkedHashMap;

        // State written by tryComplete() and read by onComplete().
        this.partitionsAcquired = new LinkedHashMap<>();
        this.partitionsAlreadyFetched = new LinkedHashMap<>();
    }

    /**
     * Expiration hook. This implementation performs no work when it is invoked.
     */
    public void onExpiration() {
    }

    /**
     * Completes the share fetch request while holding this operation's lock.
     *
     * <p>If {@link #tryComplete()} already stored acquired partitions in {@code partitionsAcquired},
     * those are used; otherwise a fresh attempt is made via {@link #acquirablePartitions()}. When no
     * partition is available the request is completed with an empty map; otherwise the work is
     * delegated to {@code completeShareFetchRequest}.
     *
     * <p>Everything after {@code lock()} runs inside the {@code try}, so the lock is released even
     * if evaluating a log argument throws.
     */
    public void onComplete() {
        this.lock.lock();
        try {
            log.trace("Completing the delayed share fetch request for group {}, member {}, topic partitions {}",
                this.shareFetch.groupId(), this.shareFetch.memberId(), this.partitionsAcquired.keySet());

            LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
            if (this.partitionsAcquired.isEmpty()) {
                // Nothing was handed over by tryComplete(); try to acquire partitions now.
                topicPartitionData = acquirablePartitions();
            } else {
                topicPartitionData = this.partitionsAcquired;
            }

            if (topicPartitionData.isEmpty()) {
                // No partition could be acquired: answer with an empty result.
                this.shareFetch.maybeComplete(Collections.emptyMap());
                return;
            }

            log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
                topicPartitionData, this.shareFetch.groupId(), this.shareFetch.fetchParams());
            completeShareFetchRequest(topicPartitionData);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Reads data for the given partitions, completes the share fetch with the processed result and
     * then releases the partitions' fetch locks.
     *
     * <p>If {@code partitionsAlreadyFetched} is non-empty, its results are combined with a read of
     * the partitions it does not cover; otherwise all partitions are read from the log. Any
     * {@link Exception} raised while reading or processing is logged and forwarded to
     * {@code handleFetchException}.
     *
     * <p>Cleanup lives in a single {@code finally} block, so it runs exactly once on every exit
     * path, and a failure inside the cleanup is not reported as a fetch failure.
     *
     * @param linkedHashMap partitions whose fetch locks are held by this operation, with their fetch data
     */
    private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> linkedHashMap) {
        try {
            LinkedHashMap<TopicIdPartition, LogReadResult> readResults;
            if (this.partitionsAlreadyFetched.isEmpty()) {
                readResults = readFromLog(linkedHashMap);
            } else {
                readResults = combineLogReadResponse(linkedHashMap, this.partitionsAlreadyFetched);
            }

            // Raw type kept on purpose: the value type produced by toFetchPartitionData(...) is not
            // imported by this file.
            LinkedHashMap fetchPartitionsData = new LinkedHashMap();
            for (Map.Entry<TopicIdPartition, LogReadResult> entry : readResults.entrySet()) {
                fetchPartitionsData.put(entry.getKey(), entry.getValue().toFetchPartitionData(false));
            }

            this.shareFetch.maybeComplete(ShareFetchUtils.processFetchResponse(this.shareFetch, fetchPartitionsData, this.sharePartitions, this.replicaManager, this.exceptionHandler));
        } catch (Exception e) {
            log.error("Error processing delayed share fetch request", e);
            handleFetchException(this.shareFetch, linkedHashMap.keySet(), e);
        } finally {
            // Release the fetch locks first, then enqueue an action that asks the replica manager to
            // complete delayed share fetch requests registered under each partition's group key.
            releasePartitionLocks(linkedHashMap.keySet());
            this.replicaManager.addToActionQueue(() -> {
                linkedHashMap.keySet().forEach(topicIdPartition -> {
                    this.replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(this.shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
                });
            });
        }
    }

    /**
     * Attempts to complete the share fetch without waiting any longer.
     *
     * <p>Steps:
     * <ol>
     *   <li>Acquire fetch locks on every partition that can currently be fetched.</li>
     *   <li>For acquired partitions lacking cached fetch offset metadata, read from the log and
     *       store the resulting metadata on the share partition.</li>
     *   <li>If any of those reads reported an error, or the min-bytes check passes, publish the
     *       acquired partitions and read results to the instance fields and call
     *       {@code forceComplete()}.</li>
     *   <li>Otherwise release the locks and report that the operation is not complete.</li>
     * </ol>
     *
     * @return {@code false} if nothing could be acquired or min bytes is not yet satisfied;
     *         otherwise the result of {@code forceComplete()}
     */
    public boolean tryComplete() {
        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
        try {
            if (topicPartitionData.isEmpty()) {
                log.trace("Can't acquire records for any partition in the share fetch request for group {}, member {}, topic partitions {}",
                    this.shareFetch.groupId(), this.shareFetch.memberId(), this.sharePartitions.keySet());
                return false;
            }

            LinkedHashMap<TopicIdPartition, LogReadResult> readResponse = maybeReadFromLog(topicPartitionData);
            maybeUpdateFetchOffsetMetadata(topicPartitionData, readResponse);

            if (!anyPartitionHasLogReadError(readResponse) && !isMinBytesSatisfied(topicPartitionData)) {
                log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, topic partitions {}",
                    this.shareFetch.groupId(), this.shareFetch.memberId(), this.sharePartitions.keySet());
                releasePartitionLocks(topicPartitionData.keySet());
                return false;
            }

            // Hand the acquired partitions and any data already read over to onComplete().
            this.partitionsAcquired = topicPartitionData;
            this.partitionsAlreadyFetched = readResponse;
            boolean completedByMe = forceComplete();
            if (!completedByMe) {
                // NOTE(review): presumably a false result means this call did not run the
                // completion, so the locks taken above still have to be released here.
                releasePartitionLocks(this.partitionsAcquired.keySet());
            }
            return completedByMe;
        } catch (Exception e) {
            log.error("Error processing delayed share fetch request", e);
            // Release BEFORE clearing: once the fields were assigned above, partitionsAcquired is
            // the very same map instance as topicPartitionData. Clearing it first would empty the
            // key set and leave every acquired fetch lock held.
            releasePartitionLocks(topicPartitionData.keySet());
            this.partitionsAcquired.clear();
            this.partitionsAlreadyFetched.clear();
            return forceComplete();
        }
    }

    /**
     * Collects the share partitions that can be fetched right now.
     *
     * <p>For every share partition the fetch lock is attempted. A partition is included only when
     * the lock was obtained and {@code canAcquireRecords()} is true; in every other case where the
     * lock was obtained it is released again. Fetch locks of the returned partitions remain held.
     *
     * @return fetch data for each acquired partition, in the iteration order of {@code sharePartitions}
     */
    LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> fetchable = new LinkedHashMap<>();
        for (Map.Entry<TopicIdPartition, SharePartition> candidate : this.sharePartitions.entrySet()) {
            TopicIdPartition topicIdPartition = candidate.getKey();
            SharePartition sharePartition = candidate.getValue();
            // Max bytes requested for this partition; 0 when the request carries no entry for it.
            int partitionMaxBytes = (Integer) this.shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);

            if (!sharePartition.maybeAcquireFetchLock()) {
                continue;
            }
            try {
                if (!sharePartition.canAcquireRecords()) {
                    sharePartition.releaseFetchLock();
                    log.trace("Record lock partition limit exceeded for SharePartition {}, cannot acquire more records", sharePartition);
                    continue;
                }
                FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
                    topicIdPartition.topicId(), sharePartition.nextFetchOffset(), 0L, partitionMaxBytes, Optional.empty());
                fetchable.put(topicIdPartition, partitionData);
            } catch (Exception e) {
                log.error("Error checking condition for SharePartition: {}", sharePartition, e);
                // The lock was taken above; give it back on failure.
                sharePartition.releaseFetchLock();
            }
        }
        return fetchable;
    }

    /**
     * Reads from the log only for partitions whose share partition has no fetch offset metadata
     * for the requested fetch offset.
     *
     * @param linkedHashMap acquired partitions and their fetch data
     * @return log read results for the partitions lacking metadata, or a new empty map when every
     *         partition already has it
     */
    private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> linkedHashMap) {
        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingMetadata = new LinkedHashMap<>();
        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> requested : linkedHashMap.entrySet()) {
            SharePartition sharePartition = this.sharePartitions.get(requested.getKey());
            Optional<LogOffsetMetadata> cached = sharePartition.fetchOffsetMetadata(requested.getValue().fetchOffset);
            if (cached.isEmpty()) {
                missingMetadata.put(requested.getKey(), requested.getValue());
            }
        }
        if (missingMetadata.isEmpty()) {
            return new LinkedHashMap<>();
        }
        return readFromLog(missingMetadata);
    }

    /**
     * Stores the fetch offset metadata from each successful log read on its share partition.
     * Results whose error is not {@code Errors.NONE} are logged at debug level and skipped.
     *
     * @param linkedHashMap acquired partitions and their fetch data (supplies the fetch offset)
     * @param linkedHashMap2 log read results keyed by partition
     */
    private void maybeUpdateFetchOffsetMetadata(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> linkedHashMap, LinkedHashMap<TopicIdPartition, LogReadResult> linkedHashMap2) {
        linkedHashMap2.forEach((topicIdPartition, readResult) -> {
            SharePartition sharePartition = this.sharePartitions.get(topicIdPartition);
            boolean readFailed = readResult.error().code() != Errors.NONE.code();
            if (readFailed) {
                log.debug("Replica manager read log result {} errored out for topic partition {}", readResult, topicIdPartition);
                return;
            }
            long fetchOffset = linkedHashMap.get(topicIdPartition).fetchOffset;
            sharePartition.updateFetchOffsetMetadata(fetchOffset, readResult.info().fetchOffsetMetadata);
        });
    }

    /**
     * Decides whether enough data is available to answer the share fetch.
     *
     * <p>For each partition the end offset metadata (chosen by fetch isolation) is compared with
     * the share partition's cached fetch offset metadata:
     * <ul>
     *   <li>unknown end offset, or missing/unknown fetch offset metadata: the partition is skipped;</li>
     *   <li>fetch offset beyond the end offset: satisfied immediately;</li>
     *   <li>fetch offset equal to the end offset: contributes nothing;</li>
     *   <li>fetch offset on an older segment than the end offset: satisfied immediately;</li>
     *   <li>fetch offset on the same segment: contributes the position difference, capped at the
     *       partition's {@code maxBytes}.</li>
     * </ul>
     * An exception raised while evaluating a partition marks it erroneous on the share fetch and
     * is passed to the exception handler; evaluation then moves on to the next partition.
     *
     * @param linkedHashMap acquired partitions and their fetch data
     * @return {@code true} if satisfied immediately or if the accumulated bytes reach {@code minBytes}
     */
    private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> linkedHashMap) {
        long accumulatedBytes = 0L;
        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> partitionEntry : linkedHashMap.entrySet()) {
            TopicIdPartition topicIdPartition = partitionEntry.getKey();
            FetchRequest.PartitionData partitionData = partitionEntry.getValue();
            try {
                LogOffsetMetadata endOffset = endOffsetMetadataForTopicPartition(topicIdPartition);
                if (endOffset == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
                    continue;
                }

                Optional<LogOffsetMetadata> cached = this.sharePartitions.get(topicIdPartition).fetchOffsetMetadata(partitionData.fetchOffset);
                if (cached.isEmpty() || cached.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
                    continue;
                }
                LogOffsetMetadata fetchOffset = cached.get();

                if (fetchOffset.messageOffset > endOffset.messageOffset) {
                    log.debug("Satisfying delayed share fetch request for group {}, member {} since it is fetching later segments of topicIdPartition {}",
                        this.shareFetch.groupId(), this.shareFetch.memberId(), topicIdPartition);
                    return true;
                }
                if (fetchOffset.messageOffset == endOffset.messageOffset) {
                    // Caught up with the end offset: nothing to add for this partition.
                    continue;
                }

                // From here on the fetch offset is strictly behind the end offset.
                if (fetchOffset.onOlderSegment(endOffset)) {
                    log.debug("Satisfying delayed share fetch request for group {}, member {} immediately since it is fetching older segments of topicIdPartition {}",
                        this.shareFetch.groupId(), this.shareFetch.memberId(), topicIdPartition);
                    return true;
                }
                if (fetchOffset.onSameSegment(endOffset)) {
                    // The partition's maxBytes is the upper bound on what it may contribute.
                    accumulatedBytes += Math.min(endOffset.positionDiff(fetchOffset), partitionData.maxBytes);
                }
            } catch (Exception e) {
                this.shareFetch.addErroneous(topicIdPartition, e);
                this.exceptionHandler.accept(new SharePartitionKey(this.shareFetch.groupId(), topicIdPartition), e);
            }
        }
        long minBytes = this.shareFetch.fetchParams().minBytes;
        return accumulatedBytes >= minBytes;
    }

    /**
     * Returns the end offset metadata of a partition according to the request's fetch isolation:
     * the log end offset for {@code LOG_END}, the high watermark for {@code HIGH_WATERMARK}, and
     * the last stable offset for any other isolation value.
     *
     * @param topicIdPartition partition to look up
     * @return the offset metadata selected from the partition's fetch offset snapshot
     */
    private LogOffsetMetadata endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
        LogOffsetSnapshot offsetSnapshot = ShareFetchUtils.partition(this.replicaManager, topicIdPartition.topicPartition()).fetchOffsetSnapshot(Optional.empty(), true);
        FetchIsolation isolation = this.shareFetch.fetchParams().isolation;
        if (isolation == FetchIsolation.LOG_END) {
            return offsetSnapshot.logEndOffset;
        }
        if (isolation == FetchIsolation.HIGH_WATERMARK) {
            return offsetSnapshot.highWatermark;
        }
        return offsetSnapshot.lastStableOffset;
    }

    /**
     * Reads the given partitions from the log through the replica manager.
     *
     * <p>Partitions filtered out by {@code shareFetch.filterErroneousTopicPartitions} are not read.
     * The read uses the request's fetch params and {@code QuotaFactory.UNBOUNDED_QUOTA}.
     *
     * @param linkedHashMap partitions to read and their fetch data
     * @return read results in the order returned by the replica manager; a new empty map when no
     *         partition is left after filtering
     */
    private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> linkedHashMap) {
        Set<TopicIdPartition> partitionsToFetch = this.shareFetch.filterErroneousTopicPartitions(linkedHashMap.keySet());
        if (partitionsToFetch.isEmpty()) {
            return new LinkedHashMap<>();
        }

        // Pair each remaining partition with its fetch data, as expected by ReplicaManager.readFromLog.
        List<Tuple2<TopicIdPartition, FetchRequest.PartitionData>> fetchInfos = partitionsToFetch.stream()
            .map(topicIdPartition -> new Tuple2<>(topicIdPartition, linkedHashMap.get(topicIdPartition)))
            .collect(Collectors.toList());

        Seq<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = this.replicaManager.readFromLog(
            this.shareFetch.fetchParams(), CollectionConverters.asScala(fetchInfos), QuotaFactory.UNBOUNDED_QUOTA, true);

        LinkedHashMap<TopicIdPartition, LogReadResult> responseData = new LinkedHashMap<>();
        logReadResults.foreach(partitionAndResult -> {
            responseData.put(partitionAndResult._1(), partitionAndResult._2());
            return BoxedUnit.UNIT;
        });
        log.trace("Data successfully retrieved by replica manager: {}", responseData);
        return responseData;
    }

    /**
     * Tells whether at least one log read result carries an error other than {@code Errors.NONE}.
     *
     * @param linkedHashMap log read results keyed by partition
     * @return {@code true} if any result has a non-NONE error code; {@code false} for an empty map
     */
    private boolean anyPartitionHasLogReadError(LinkedHashMap<TopicIdPartition, LogReadResult> linkedHashMap) {
        for (LogReadResult readResult : linkedHashMap.values()) {
            if (readResult.error().code() != Errors.NONE.code()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Reports a failure for every given partition to the exception handler, then completes the
     * share fetch exceptionally for those partitions.
     *
     * @param shareFetch the share fetch to complete with the error
     * @param set partitions affected by the failure
     * @param th the failure cause
     */
    private void handleFetchException(ShareFetch shareFetch, Set<TopicIdPartition> set, Throwable th) {
        for (TopicIdPartition topicIdPartition : set) {
            SharePartitionKey partitionKey = new SharePartitionKey(shareFetch.groupId(), topicIdPartition);
            this.exceptionHandler.accept(partitionKey, th);
        }
        shareFetch.maybeCompleteWithException(set, th);
    }

    /**
     * Merges previously fetched read results with fresh reads for the partitions not yet covered.
     *
     * <p>When every requested partition already has a result, the existing map instance is
     * returned untouched. Otherwise the missing partitions are read from the log and the existing
     * results are added to that new map, so newly read entries come first in iteration order.
     *
     * @param linkedHashMap acquired partitions and their fetch data
     * @param linkedHashMap2 read results already available
     * @return read results covering both the existing and the newly read partitions
     */
    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> linkedHashMap, LinkedHashMap<TopicIdPartition, LogReadResult> linkedHashMap2) {
        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> notYetRead = new LinkedHashMap<>();
        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> requested : linkedHashMap.entrySet()) {
            if (!linkedHashMap2.containsKey(requested.getKey())) {
                notYetRead.put(requested.getKey(), requested.getValue());
            }
        }
        if (notYetRead.isEmpty()) {
            return linkedHashMap2;
        }
        LinkedHashMap<TopicIdPartition, LogReadResult> combined = readFromLog(notYetRead);
        combined.putAll(linkedHashMap2);
        return combined;
    }

    /**
     * Releases the fetch lock of the share partition registered for each given partition.
     *
     * @param set partitions whose fetch locks should be released; each is looked up in
     *            {@code sharePartitions}
     */
    void releasePartitionLocks(Set<TopicIdPartition> set) {
        for (TopicIdPartition topicIdPartition : set) {
            SharePartition sharePartition = this.sharePartitions.get(topicIdPartition);
            sharePartition.releaseFetchLock();
        }
    }

    /**
     * Exposes the lock that {@link #onComplete()} holds while it runs.
     *
     * @return this operation's {@code lock} field
     */
    Lock lock() {
        return this.lock;
    }
}
