package kafka.server.share;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import kafka.server.ReplicaManager;
import kafka.server.share.SharePartitionManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.PartitionAllData;
import org.apache.kafka.server.share.persister.PartitionErrorData;
import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.apache.kafka.server.share.persister.ReadShareGroupStateParameters;
import org.apache.kafka.server.share.persister.TopicData;
import org.apache.kafka.server.share.persister.WriteShareGroupStateParameters;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/server/share/SharePartition.class */
public class SharePartition {
    private static final Logger log = LoggerFactory.getLogger(SharePartition.class);
    // String form of the all-zero UUID; used as the member id of batches rebuilt from persisted state in maybeInitialize().
    static final String EMPTY_MEMBER_ID = Uuid.ZERO_UUID.toString();
    // Group id sent to the persister when reading state, and included in log messages.
    private final String groupId;
    // Topic id and partition number this instance tracks; both are validated against the persister response.
    private final TopicIdPartition topicIdPartition;
    // Leader epoch passed to the persister together with the partition number when reading state.
    private final int leaderEpoch;
    // In-flight batches; maybeInitialize() keys each entry by the batch's first offset.
    private final NavigableMap<Long, InFlightBatch> cachedState;
    // The write lock is held by maybeInitialize()'s completion callback and by nextFetchOffset().
    private final ReadWriteLock lock;
    // When true, nextFetchOffset() scans cachedState for the first AVAILABLE offset instead of deriving the result from endOffset.
    private final AtomicBoolean findNextFetchOffset;
    // NOTE(review): usage is not visible in this part of the file - presumably serialises fetches for this partition; confirm.
    private final AtomicBoolean fetchLock;
    // NOTE(review): constructor-supplied limit; its enforcement is not visible in this part of the file.
    private final int maxInFlightMessages;
    // NOTE(review): constructor-supplied limit; presumably the max delivery count handed to the InFlightState update methods - confirm.
    private final int maxDeliveryCount;
    private final GroupConfigManager groupConfigManager;
    // NOTE(review): presumably the fallback acquisition-lock duration when no group-level override exists - confirm.
    private final int defaultRecordLockDurationMs;
    // Timer to which acquisition lock timeout tasks are added.
    private final Timer timer;
    // Clock used to compute acquisition lock expiration times (hiResClockMs()).
    private final Time time;
    // Source of the persisted share-group state read by maybeInitialize().
    private final Persister persister;
    private final SharePartitionManager.SharePartitionListener listener;
    // Set in maybeInitialize() from the persisted start offset via startOffsetDuringInitialization().
    private long startOffset;
    // Set in maybeInitialize(): startOffset when no batches were loaded, otherwise the last offset of the last cached batch.
    private long endOffset;
    private final OffsetMetadata fetchOffsetMetadata;
    // Loaded from the persisted partition data in maybeInitialize().
    private int stateEpoch;
    // Lifecycle state; maybeInitialize() sets ACTIVE on success and FAILED on error.
    private SharePartitionState partitionState;
    private final ReplicaManager replicaManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: kafka.server.share.SharePartition$1, reason: invalid class name */
    /* loaded from: input_file:kafka/server/share/SharePartition$1.class */
    // Synthetic holder for a lookup table over the Errors enum. The "$SwitchMap$" naming and the synthetic
    // markers indicate it was generated by the compiler for a switch statement; the consuming switch is
    // presumably elsewhere in this class and is not visible in this part of the file.
    public static /* synthetic */ class AnonymousClass1 {
        // Indexed by Errors.ordinal(). Mapped constants hold the values 1..7 assigned below; all other slots stay 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$Errors = new int[Errors.values().length];

        static {
            // Every constant is registered in its own try block: if a constant is missing from the Errors
            // class loaded at runtime (NoSuchFieldError), only that slot is left unmapped and the
            // remaining registrations still run. The empty catch blocks are therefore intentional.
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NOT_COORDINATOR.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.COORDINATOR_NOT_AVAILABLE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.COORDINATOR_LOAD_IN_PROGRESS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.GROUP_ID_NOT_FOUND.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.UNKNOWN_TOPIC_OR_PARTITION.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.FENCED_STATE_EPOCH.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.FENCED_LEADER_EPOCH.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/server/share/SharePartition$AcquisitionLockTimerTask.class */
    /**
     * Timer task that releases the acquisition lock held by a member on an offset range once the
     * configured delay has elapsed.
     */
    public final class AcquisitionLockTimerTask extends TimerTask {
        /** Absolute expiration time: the high-resolution clock reading at construction plus the delay. */
        private final long expirationMs;
        private final String memberId;
        private final long firstOffset;
        private final long lastOffset;

        /**
         * @param delayMs     delay, in milliseconds, handed to the underlying {@link TimerTask}
         * @param memberId    member on whose behalf the offsets are locked
         * @param firstOffset first offset covered by the lock
         * @param lastOffset  last offset covered by the lock
         */
        AcquisitionLockTimerTask(long delayMs, String memberId, long firstOffset, long lastOffset) {
            super(delayMs);
            this.memberId = memberId;
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.expirationMs = delayMs + SharePartition.this.time.hiResClockMs();
        }

        /** Returns the absolute time, in milliseconds, computed at construction for this task's expiry. */
        long expirationMs() {
            return this.expirationMs;
        }

        /** Delegates to the enclosing share partition to release the lock for the tracked range. */
        public void run() {
            SharePartition.this.releaseAcquisitionLockOnTimeout(this.memberId, this.firstOffset, this.lastOffset);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/server/share/SharePartition$InFlightBatch.class */
    /**
     * State of one contiguous range of offsets, {@code [firstOffset, lastOffset]}.
     *
     * <p>The batch starts out with a single shared {@link InFlightState} ({@code batchState}). After
     * {@link #maybeInitializeOffsetStateUpdate()} is called the state is tracked per offset in
     * {@code offsetState} and {@code batchState} becomes {@code null}; from then on every batch-level
     * accessor throws {@link IllegalStateException}.
     */
    public final class InFlightBatch {
        private final long firstOffset;
        private final long lastOffset;
        // Non-null only while the whole batch shares one state.
        private InFlightState batchState;
        // Null until per-offset tracking is initialised.
        private NavigableMap<Long, InFlightState> offsetState;

        /**
         * @param memberId                   member id recorded on the batch state
         * @param firstOffset                first offset of the batch (inclusive)
         * @param lastOffset                 last offset of the batch (inclusive)
         * @param state                      initial record state of the batch
         * @param deliveryCount              initial delivery count of the batch
         * @param acquisitionLockTimeoutTask lock timeout task for the batch, may be {@code null}
         */
        InFlightBatch(String memberId, long firstOffset, long lastOffset, RecordState state, int deliveryCount, AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.batchState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
        }

        long firstOffset() {
            return this.firstOffset;
        }

        long lastOffset() {
            return this.lastOffset;
        }

        /** @throws IllegalStateException if the state is tracked per offset */
        RecordState batchState() {
            return inFlightState().state;
        }

        /** @throws IllegalStateException if the state is tracked per offset */
        String batchMemberId() {
            if (this.batchState == null) {
                throw new IllegalStateException("The batch member id is not available as the offset state is maintained");
            }
            return this.batchState.memberId;
        }

        /** @throws IllegalStateException if the state is tracked per offset */
        int batchDeliveryCount() {
            if (this.batchState == null) {
                throw new IllegalStateException("The batch delivery count is not available as the offset state is maintained");
            }
            return this.batchState.deliveryCount;
        }

        /** @throws IllegalStateException if the state is tracked per offset */
        AcquisitionLockTimerTask batchAcquisitionLockTimeoutTask() {
            return inFlightState().acquisitionLockTimeoutTask;
        }

        /** Returns the per-offset state map, or {@code null} while the batch shares a single state. */
        NavigableMap<Long, InFlightState> offsetState() {
            return this.offsetState;
        }

        private InFlightState inFlightState() {
            if (this.batchState == null) {
                throw new IllegalStateException("The batch state is not available as the offset state is maintained");
            }
            return this.batchState;
        }

        private boolean batchHasOngoingStateTransition() {
            return inFlightState().hasOngoingStateTransition();
        }

        private void archiveBatch(String newMemberId) {
            inFlightState().archive(newMemberId);
        }

        private InFlightState tryUpdateBatchState(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
            if (this.batchState == null) {
                throw new IllegalStateException("The batch state update is not available as the offset state is maintained");
            }
            return this.batchState.tryUpdateState(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId);
        }

        private InFlightState startBatchStateTransition(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
            if (this.batchState == null) {
                throw new IllegalStateException("The batch state update is not available as the offset state is maintained");
            }
            return this.batchState.startStateTransition(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId);
        }

        /**
         * Switches the batch from one shared state to one state per offset; a no-op when per-offset
         * tracking is already in place.
         *
         * <p>Every offset gets a copy of the batch state. When the batch holds an acquisition lock
         * timeout task, each offset gets its own single-offset task scheduled for the time remaining on
         * the batch task, and the batch task is cancelled afterwards.
         */
        private void maybeInitializeOffsetStateUpdate() {
            if (this.offsetState != null) {
                return;
            }
            this.offsetState = new ConcurrentSkipListMap<>();
            for (long offset = this.firstOffset; offset <= this.lastOffset; offset++) {
                if (this.batchState.acquisitionLockTimeoutTask != null) {
                    // Remaining time is recomputed per offset from the batch task's absolute expiration.
                    long remainingMs = this.batchState.acquisitionLockTimeoutTask.expirationMs() - SharePartition.this.time.hiResClockMs();
                    AcquisitionLockTimerTask offsetTask = SharePartition.this.acquisitionLockTimerTask(this.batchState.memberId, offset, offset, remainingMs);
                    this.offsetState.put(offset, new InFlightState(this.batchState.state, this.batchState.deliveryCount, this.batchState.memberId, offsetTask));
                    SharePartition.this.timer.add(offsetTask);
                } else {
                    this.offsetState.put(offset, new InFlightState(this.batchState.state, this.batchState.deliveryCount, this.batchState.memberId));
                }
            }
            if (this.batchState.acquisitionLockTimeoutTask != null) {
                this.batchState.cancelAndClearAcquisitionLockTimeoutTask();
            }
            // From here on the per-offset map is the only source of state for this batch.
            this.batchState = null;
        }

        private void updateAcquisitionLockTimeout(AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
            inFlightState().acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
        }

        /** Renders both offsets plus whichever of the batch state / per-offset state is present. */
        @Override
        public String toString() {
            return "InFlightBatch(firstOffset=" + this.firstOffset
                + ", lastOffset=" + this.lastOffset
                + ", inFlightState=" + this.batchState
                + ", offsetState=" + (this.offsetState == null ? "null" : this.offsetState)
                + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/server/share/SharePartition$InFlightState.class */
    /**
     * Mutable delivery state (record state, delivery count, owning member) of either a whole batch or
     * a single offset, together with an optional acquisition lock timeout task.
     *
     * <p>{@link #equals(Object)} and {@link #hashCode()} consider only the state, delivery count and
     * member id; the rollback snapshot and the timeout task are ignored.
     */
    public static final class InFlightState {
        private RecordState state;
        private int deliveryCount;
        private String memberId;
        // Snapshot taken by startStateTransition(); cleared by completeStateTransition().
        private InFlightState rollbackState;
        private AcquisitionLockTimerTask acquisitionLockTimeoutTask;

        InFlightState(RecordState state, int deliveryCount, String memberId) {
            this(state, deliveryCount, memberId, null);
        }

        InFlightState(RecordState state, int deliveryCount, String memberId, AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
            this.state = state;
            this.deliveryCount = deliveryCount;
            this.memberId = memberId;
            this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
        }

        RecordState state() {
            return this.state;
        }

        String memberId() {
            return this.memberId;
        }

        TimerTask acquisitionLockTimeoutTask() {
            return this.acquisitionLockTimeoutTask;
        }

        /**
         * Installs a timeout task on a state that currently has none.
         *
         * @throws IllegalArgumentException if a timeout task is already set
         */
        void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask acquisitionLockTimeoutTask) throws IllegalArgumentException {
            if (this.acquisitionLockTimeoutTask != null) {
                throw new IllegalArgumentException("Existing acquisition lock timeout exists, cannot override.");
            }
            this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
        }

        /** Cancels and drops the timeout task; does nothing when no task is set. */
        void cancelAndClearAcquisitionLockTimeoutTask() {
            if (this.acquisitionLockTimeoutTask == null) {
                return;
            }
            this.acquisitionLockTimeoutTask.cancel();
            this.acquisitionLockTimeoutTask = null;
        }

        private boolean hasOngoingStateTransition() {
            return this.rollbackState != null && this.rollbackState.state != null;
        }

        /**
         * Attempts to move to {@code newState}.
         *
         * <p>A request for AVAILABLE is turned into ARCHIVED once the delivery count has reached
         * {@code maxDeliveryCount}. The delivery count is incremented only when requested and the
         * effective target is not ARCHIVED.
         *
         * @return this instance on success, or {@code null} when the transition is rejected by
         *         {@link RecordState#validateTransition(RecordState)} (the failure is logged)
         */
        private InFlightState tryUpdateState(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
            try {
                if (newState == RecordState.AVAILABLE && this.deliveryCount >= maxDeliveryCount) {
                    newState = RecordState.ARCHIVED;
                }
                this.state = this.state.validateTransition(newState);
                if (incrementDeliveryCount && newState != RecordState.ARCHIVED) {
                    this.deliveryCount++;
                }
                this.memberId = newMemberId;
                return this;
            } catch (IllegalStateException e) {
                SharePartition.log.error("Failed to update state of the records", e);
                return null;
            }
        }

        /** Forces the state to ARCHIVED without transition validation. */
        private void archive(String newMemberId) {
            this.state = RecordState.ARCHIVED;
            this.memberId = newMemberId;
        }

        /** Snapshots the current values for a later rollback, then behaves like {@code tryUpdateState}. */
        private InFlightState startStateTransition(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
            this.rollbackState = new InFlightState(this.state, this.deliveryCount, this.memberId, this.acquisitionLockTimeoutTask);
            return tryUpdateState(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId);
        }

        /**
         * Finishes a transition begun with {@code startStateTransition}.
         *
         * @param commit {@code true} keeps the new values; {@code false} restores state, delivery count
         *               and member id from the snapshot. The snapshot is discarded either way.
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void completeStateTransition(boolean commit) {
            if (commit) {
                this.rollbackState = null;
                return;
            }
            this.state = this.rollbackState.state;
            this.deliveryCount = this.rollbackState.deliveryCount;
            this.memberId = this.rollbackState.memberId;
            this.rollbackState = null;
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.state, this.deliveryCount, this.memberId);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            InFlightState other = (InFlightState) obj;
            // Objects.equals keeps equals() null-safe, matching hashCode() which already tolerates a null member id.
            return this.state == other.state
                && this.deliveryCount == other.deliveryCount
                && Objects.equals(this.memberId, other.memberId);
        }

        @Override
        public String toString() {
            return "InFlightState(state=" + this.state + ", deliveryCount=" + this.deliveryCount + ", memberId=" + this.memberId + ")";
        }
    }

    /* loaded from: input_file:kafka/server/share/SharePartition$OffsetMetadata.class */
    /**
     * Mutable pair of an offset and the {@link LogOffsetMetadata} recorded for it. A fresh instance
     * holds offset {@code -1} and no metadata.
     */
    static final class OffsetMetadata {
        private long offset;
        private LogOffsetMetadata offsetMetadata;

        OffsetMetadata() {
            this.offset = -1L;
        }

        /** Returns the most recently stored offset, or {@code -1} if none has been stored. */
        long offset() {
            return offset;
        }

        /** Returns the most recently stored metadata, or {@code null} if none has been stored. */
        LogOffsetMetadata offsetMetadata() {
            return offsetMetadata;
        }

        /** Replaces both the offset and its metadata. */
        void updateOffsetMetadata(long newOffset, LogOffsetMetadata newOffsetMetadata) {
            offset = newOffset;
            offsetMetadata = newOffsetMetadata;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/server/share/SharePartition$RecordState.class */
    /**
     * Delivery state of a record (or batch of records), with the byte id used to represent it.
     *
     * <p>Allowed transitions, as enforced by {@link #validateTransition(RecordState)}:
     * AVAILABLE may only move to ACQUIRED; ACQUIRED may move to any other state; ACKNOWLEDGED and
     * ARCHIVED are terminal.
     */
    public enum RecordState {
        AVAILABLE((byte) 0),
        ACQUIRED((byte) 1),
        ACKNOWLEDGED((byte) 2),
        // Id 3 is intentionally unassigned; forId(3) is rejected.
        ARCHIVED((byte) 4);

        public final byte id;

        RecordState(byte id) {
            this.id = id;
        }

        /**
         * Checks that moving from this state to {@code newState} is allowed.
         *
         * @param newState the requested target state
         * @return {@code newState} when the transition is valid
         * @throws NullPointerException  if {@code newState} is {@code null}
         * @throws IllegalStateException if the target equals the current state, the current state is
         *                               terminal, or AVAILABLE is asked to move anywhere but ACQUIRED
         */
        public RecordState validateTransition(RecordState newState) throws IllegalStateException {
            Objects.requireNonNull(newState, "newState cannot be null");
            if (this == newState) {
                throw new IllegalStateException("The state transition is invalid as the new state is the same as the current state");
            }
            if (this == ACKNOWLEDGED || this == ARCHIVED) {
                throw new IllegalStateException("The state transition is invalid from the current state: " + this);
            }
            if (this == AVAILABLE && newState != ACQUIRED) {
                throw new IllegalStateException("The state can only be transitioned to ACQUIRED from AVAILABLE");
            }
            return newState;
        }

        /**
         * Resolves a state from its byte id.
         *
         * @throws IllegalArgumentException if no state uses {@code id}
         */
        public static RecordState forId(byte id) {
            switch (id) {
                case 0:
                    return AVAILABLE;
                case 1:
                    return ACQUIRED;
                case 2:
                    return ACKNOWLEDGED;
                case 4:
                    return ARCHIVED;
                default:
                    throw new IllegalArgumentException("Unknown record state id: " + id);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/server/share/SharePartition$SharePartitionState.class */
    // Lifecycle states of a share partition, stored in the partitionState field.
    public enum SharePartitionState {
        // Initial state used by the constructor overload that takes no explicit state.
        EMPTY,
        // NOTE(review): presumably set while persisted state is being loaded; the transition is not visible in this part of the file.
        INITIALIZING,
        // Set by maybeInitialize() after the persisted state has been loaded successfully.
        ACTIVE,
        // Set by maybeInitialize() when reading or validating the persisted state fails.
        FAILED,
        // NOTE(review): no transition to this state is visible in this part of the file - confirm where it is set.
        FENCED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a share partition whose lifecycle starts in {@link SharePartitionState#EMPTY}. All
     * arguments are forwarded unchanged, in the same order, to the overload that additionally takes
     * the initial state.
     */
    public SharePartition(
        String groupId,
        TopicIdPartition topicIdPartition,
        int leaderEpoch,
        int maxInFlightMessages,
        int maxDeliveryCount,
        int defaultRecordLockDurationMs,
        Timer timer,
        Time time,
        Persister persister,
        ReplicaManager replicaManager,
        GroupConfigManager groupConfigManager,
        SharePartitionManager.SharePartitionListener listener
    ) {
        this(groupId, topicIdPartition, leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs,
            timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY, listener);
    }

    /**
     * Creates a share partition with an explicit initial lifecycle state. Stores the supplied
     * collaborators and limits, and creates an empty batch cache, the read/write lock, both atomic
     * flags (initially {@code false}) and an empty fetch offset metadata holder.
     *
     * @param groupId                     stored as the group id
     * @param topicIdPartition            stored as the tracked topic id and partition
     * @param leaderEpoch                 stored as the leader epoch
     * @param maxInFlightMessages         stored as the in-flight message limit
     * @param maxDeliveryCount            stored as the delivery count limit
     * @param defaultRecordLockDurationMs stored as the default record lock duration
     * @param timer                       timer used for acquisition lock timeout tasks
     * @param time                        clock
     * @param persister                   persister for share-group state
     * @param replicaManager              replica manager
     * @param groupConfigManager          group configuration manager
     * @param sharePartitionState         initial value of the partition state
     * @param listener                    share partition listener
     */
    SharePartition(
        String groupId,
        TopicIdPartition topicIdPartition,
        int leaderEpoch,
        int maxInFlightMessages,
        int maxDeliveryCount,
        int defaultRecordLockDurationMs,
        Timer timer,
        Time time,
        Persister persister,
        ReplicaManager replicaManager,
        GroupConfigManager groupConfigManager,
        SharePartitionState sharePartitionState,
        SharePartitionManager.SharePartitionListener listener
    ) {
        // Identity of the partition.
        this.groupId = groupId;
        this.topicIdPartition = topicIdPartition;
        this.leaderEpoch = leaderEpoch;

        // Limits and durations.
        this.maxInFlightMessages = maxInFlightMessages;
        this.maxDeliveryCount = maxDeliveryCount;
        this.defaultRecordLockDurationMs = defaultRecordLockDurationMs;

        // Internally owned state.
        this.cachedState = new ConcurrentSkipListMap<>();
        this.lock = new ReentrantReadWriteLock();
        this.findNextFetchOffset = new AtomicBoolean(false);
        this.fetchLock = new AtomicBoolean(false);
        this.fetchOffsetMetadata = new OffsetMetadata();
        this.partitionState = sharePartitionState;

        // Collaborators.
        this.timer = timer;
        this.time = time;
        this.persister = persister;
        this.replicaManager = replicaManager;
        this.groupConfigManager = groupConfigManager;
        this.listener = listener;
    }

    /**
     * Loads the persisted share-group state for this partition if that still has to be done.
     *
     * <p>Returns an already-completed future when {@code initializedOrThrowException()} reports
     * {@code true} or {@code emptyToInitialState()} reports {@code false}. Otherwise the state is read
     * from the persister and, once the read completes, validated and loaded under the write lock:
     * the start offset and state epoch are set, every persisted batch is added to the cache with
     * {@link #EMPTY_MEMBER_ID}, the end offset is derived, and the partition becomes ACTIVE.
     *
     * @return a future that completes normally once the partition is initialized (or nothing had to
     *         be done), and exceptionally when the read fails, the response is malformed, the
     *         persister reports an error, or any exception is thrown synchronously or while loading;
     *         on a failed load the partition state is set to FAILED
     */
    public CompletableFuture<Void> maybeInitialize() {
        log.debug("Maybe initialize share partition: {}-{}", this.groupId, this.topicIdPartition);
        try {
            if (initializedOrThrowException() || !emptyToInitialState()) {
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> future = new CompletableFuture<>();
            this.persister.readState(new ReadShareGroupStateParameters.Builder()
                    .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder()
                        .setGroupId(this.groupId)
                        .setTopicsData(Collections.singletonList(new TopicData(
                            this.topicIdPartition.topicId(),
                            Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(
                                this.topicIdPartition.partition(), this.leaderEpoch)))))
                        .build())
                    .build())
                .whenComplete((readResult, readError) -> {
                    // Set on every failure path; inspected once in the finally block below.
                    Throwable failure = null;
                    this.lock.writeLock().lock();
                    try {
                        if (readError != null) {
                            log.error("Failed to initialize the share partition: {}-{}", this.groupId, this.topicIdPartition, readError);
                            failure = readError;
                            return;
                        }
                        if (readResult == null || readResult.topicsData() == null || readResult.topicsData().size() != 1) {
                            log.error("Failed to initialize the share partition: {}-{}. Invalid state found: {}.", this.groupId, this.topicIdPartition, readResult);
                            failure = newInitializationFailure();
                            return;
                        }
                        TopicData topicData = (TopicData) readResult.topicsData().get(0);
                        // Topic ids are objects: compare by value, not by reference.
                        if (!Objects.equals(topicData.topicId(), this.topicIdPartition.topicId()) || topicData.partitions().size() != 1) {
                            log.error("Failed to initialize the share partition: {}-{}. Invalid topic partition response: {}.", this.groupId, this.topicIdPartition, readResult);
                            failure = newInitializationFailure();
                            return;
                        }
                        PartitionAllData partitionData = (PartitionAllData) topicData.partitions().get(0);
                        if (partitionData.partition() != this.topicIdPartition.partition()) {
                            log.error("Failed to initialize the share partition: {}-{}. Invalid partition response: {}.", this.groupId, this.topicIdPartition, partitionData);
                            failure = newInitializationFailure();
                            return;
                        }
                        if (partitionData.errorCode() != Errors.NONE.code()) {
                            KafkaException persisterError = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
                            log.error("Failed to initialize the share partition: {}-{}. Exception occurred: {}.", this.groupId, this.topicIdPartition, partitionData);
                            failure = persisterError;
                            return;
                        }

                        this.startOffset = startOffsetDuringInitialization(partitionData.startOffset());
                        this.stateEpoch = partitionData.stateEpoch();
                        for (PersisterStateBatch stateBatch : partitionData.stateBatches()) {
                            if (stateBatch.firstOffset() < this.startOffset) {
                                log.error("Invalid state batch found for the share partition: {}-{}. The base offset: {} is less than the start offset: {}.", this.groupId, this.topicIdPartition, stateBatch.firstOffset(), this.startOffset);
                                failure = newInitializationFailure();
                                return;
                            }
                            this.cachedState.put(stateBatch.firstOffset(), new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(), stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), null));
                        }
                        if (this.cachedState.isEmpty()) {
                            this.endOffset = this.startOffset;
                        } else {
                            // Loaded batches may contain AVAILABLE records, so the next fetch offset must be searched for.
                            this.findNextFetchOffset.set(true);
                            this.endOffset = this.cachedState.lastEntry().getValue().lastOffset();
                            maybeUpdateCachedStateAndOffsets();
                        }
                        this.partitionState = SharePartitionState.ACTIVE;
                    } catch (Exception e) {
                        failure = e;
                    } finally {
                        // The state is flipped to FAILED while the lock is still held; the future is
                        // completed only after the lock has been released.
                        // NOTE(review): a non-Exception Throwable leaves `failure` null, so the future
                        // completes normally before the Throwable propagates (unchanged behavior).
                        boolean failed = failure != null;
                        if (failed) {
                            this.partitionState = SharePartitionState.FAILED;
                        }
                        this.lock.writeLock().unlock();
                        if (failed) {
                            future.completeExceptionally(failure);
                        } else {
                            future.complete(null);
                        }
                    }
                });
            return future;
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /** Builds the exception used for every malformed-response failure in {@link #maybeInitialize()}. */
    private IllegalStateException newInitializationFailure() {
        return new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", this.groupId, this.topicIdPartition));
    }

    /**
     * Returns the offset from which the next fetch for this share partition should start.
     *
     * <p>While {@code findNextFetchOffset} is {@code false} the answer comes from {@code endOffset}
     * alone. While it is {@code true} the cache is scanned in offset order for the first AVAILABLE
     * batch or offset; if none is found the flag is reset and {@code endOffset + 1} is returned.
     * In both modes, an empty cache or a start offset beyond the last cached batch yields
     * {@code endOffset} itself.
     *
     * <p>The write lock is held for the whole call and released exactly once, in the
     * {@code finally} block.
     *
     * @return the next offset to fetch from
     */
    public long nextFetchOffset() {
        this.lock.writeLock().lock();
        try {
            boolean nothingInFlight = this.cachedState.isEmpty()
                || this.startOffset > this.cachedState.lastEntry().getValue().lastOffset();

            if (!this.findNextFetchOffset.get()) {
                return nothingInFlight ? this.endOffset : this.endOffset + 1;
            }

            if (nothingInFlight) {
                // Nothing to scan, so the search flag can be cleared.
                this.findNextFetchOffset.set(false);
                return this.endOffset;
            }

            long nextFetchOffset = -1;
            for (Map.Entry<Long, InFlightBatch> batchEntry : this.cachedState.entrySet()) {
                InFlightBatch batch = batchEntry.getValue();
                if (batch.offsetState() == null) {
                    // State is tracked for the batch as a whole.
                    if (batch.batchState() == RecordState.AVAILABLE) {
                        nextFetchOffset = batch.firstOffset();
                        break;
                    }
                } else {
                    // State is tracked per offset: take the first AVAILABLE one.
                    for (Map.Entry<Long, InFlightState> offsetEntry : batch.offsetState().entrySet()) {
                        if (offsetEntry.getValue().state == RecordState.AVAILABLE) {
                            nextFetchOffset = offsetEntry.getKey();
                            break;
                        }
                    }
                    if (nextFetchOffset != -1) {
                        break;
                    }
                }
            }

            if (nextFetchOffset == -1) {
                // No AVAILABLE record anywhere in the cache: stop searching on subsequent calls.
                this.findNextFetchOffset.set(false);
                nextFetchOffset = this.endOffset + 1;
            }
            return nextFetchOffset;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Acquires records from the fetched data on behalf of the given member.
     *
     * <p>Returns an empty result when the partition state is not active, when {@code i} is not
     * positive, or when the fetched data has no batches. If no cached in-flight batch overlaps
     * the fetched range, a new batch is acquired. Otherwise each overlapping cached batch is
     * acquired either as a whole (full match without per-offset state) or offset by offset, and
     * any fetched range beyond the last cached batch is acquired as a new batch.
     *
     * <p>A batch that is not a full match, has no per-offset state and is not acquirable is
     * skipped entirely. Passing it to {@code acquireSubsetBatchRecords} would dereference a
     * null offset-state map.
     *
     * @param str                the member id requesting the records
     * @param i                  the maximum number of records to acquire
     * @param fetchPartitionData the fetched data whose record batches are to be acquired
     * @return the acquired records together with their count
     */
    public ShareAcquiredRecords acquire(String str, int i, FetchPartitionData fetchPartitionData) {
        log.trace("Received acquire request for share partition: {}-{} memberId: {}", this.groupId, this.topicIdPartition, str);
        if (stateNotActive() || i <= 0) {
            return ShareAcquiredRecords.empty();
        }
        RecordBatch lastBatch = (RecordBatch) fetchPartitionData.records.lastBatch().orElse(null);
        if (lastBatch == null) {
            return ShareAcquiredRecords.empty();
        }
        RecordBatch firstBatch = (RecordBatch) fetchPartitionData.records.batches().iterator().next();

        this.lock.writeLock().lock();
        try {
            long baseOffset = firstBatch.baseOffset();
            // If the fetch starts inside a cached batch, begin the lookup from that batch's key.
            Map.Entry<Long, InFlightBatch> floorEntry = this.cachedState.floorEntry(baseOffset);
            if (floorEntry != null && floorEntry.getValue().lastOffset() >= baseOffset) {
                baseOffset = floorEntry.getKey();
            }
            NavigableMap<Long, InFlightBatch> subMap = this.cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true);
            if (subMap.isEmpty()) {
                log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}", this.groupId, this.topicIdPartition);
                return ShareAcquiredRecords.fromAcquiredRecords(
                    acquireNewBatchRecords(str, fetchPartitionData.records.batches(), firstBatch.baseOffset(), lastBatch.lastOffset(), i));
            }

            log.trace("Overlap exists with in-flight records. Acquire the records if available for the share partition: {}-{}", this.groupId, this.topicIdPartition);
            List<ShareFetchResponseData.AcquiredRecords> acquired = new ArrayList<>();
            int acquiredCount = 0;
            for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
                if (acquiredCount >= i) {
                    break;
                }
                InFlightBatch inFlightBatch = entry.getValue();
                boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset());

                if (!fullMatch || inFlightBatch.offsetState() != null) {
                    log.trace("Subset or offset tracked batch record found for share partition, batch: {} request offsets - first: {}, last: {} for the share partition: {}-{}", inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset(), this.groupId, this.topicIdPartition);
                    if (inFlightBatch.offsetState() == null) {
                        if (inFlightBatch.batchState() != RecordState.AVAILABLE || inFlightBatch.batchHasOngoingStateTransition()) {
                            log.trace("The batch is not available to acquire in share partition: {}-{}, skipping: {} skipping offset tracking for batch as well.", this.groupId, this.topicIdPartition, inFlightBatch);
                            // No per-offset state exists for this batch, so there is nothing to scan.
                            continue;
                        }
                        inFlightBatch.maybeInitializeOffsetStateUpdate();
                    }
                    acquiredCount += acquireSubsetBatchRecords(str, firstBatch.baseOffset(), lastBatch.lastOffset(), inFlightBatch, acquired);
                    continue;
                }

                if (inFlightBatch.batchState() != RecordState.AVAILABLE || inFlightBatch.batchHasOngoingStateTransition()) {
                    log.trace("The batch is not available to acquire in share partition: {}-{}, skipping: {}", this.groupId, this.topicIdPartition, inFlightBatch);
                    continue;
                }
                if (inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, true, this.maxDeliveryCount, str) == null) {
                    log.info("Unable to acquire records for the batch: {} in share partition: {}-{}", inFlightBatch, this.groupId, this.topicIdPartition);
                    continue;
                }
                inFlightBatch.updateAcquisitionLockTimeout(scheduleAcquisitionLockTimeout(str, inFlightBatch.firstOffset(), inFlightBatch.lastOffset()));
                acquired.add(new ShareFetchResponseData.AcquiredRecords()
                    .setFirstOffset(inFlightBatch.firstOffset())
                    .setLastOffset(inFlightBatch.lastOffset())
                    .setDeliveryCount((short) inFlightBatch.batchDeliveryCount()));
                acquiredCount += (int) ((inFlightBatch.lastOffset() - inFlightBatch.firstOffset()) + 1);
            }

            // The fetched data may extend beyond the last cached batch; acquire that tail as a new batch.
            if (acquiredCount < i && subMap.lastEntry().getValue().lastOffset() < lastBatch.lastOffset()) {
                log.trace("There exists another batch which needs to be acquired as well");
                ShareFetchResponseData.AcquiredRecords tail = acquireNewBatchRecords(str, fetchPartitionData.records.batches(),
                    subMap.lastEntry().getValue().lastOffset() + 1, lastBatch.lastOffset(), i - acquiredCount);
                acquired.add(tail);
                acquiredCount += (int) ((tail.lastOffset() - tail.firstOffset()) + 1);
            }
            return new ShareAcquiredRecords(acquired, acquiredCount);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Acknowledges the given batches on behalf of a member.
     *
     * <p>Batches are processed in order under the write lock. Processing stops at the first
     * failure, whether that is an invalid acknowledge type, an invalid request or record state,
     * or an error reported by {@code acknowledgeBatchRecords}. That first error is the one
     * handed to {@code rollbackOrProcessStateUpdates}. Batches whose last offset is below the
     * start offset are skipped.
     *
     * @param str  the member id sending the acknowledgement
     * @param list the acknowledgement batches to apply
     * @return the future passed to {@code rollbackOrProcessStateUpdates}
     */
    public CompletableFuture<Void> acknowledge(String str, List<ShareAcknowledgementBatch> list) {
        log.trace("Acknowledgement batch request for share partition: {}-{}", this.groupId, this.topicIdPartition);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        Throwable failure = null;
        List<InFlightState> updatedStates = new ArrayList<>();
        List<PersisterStateBatch> stateBatches = new ArrayList<>();

        this.lock.writeLock().lock();
        try {
            for (ShareAcknowledgementBatch batch : list) {
                try {
                    Map<Long, RecordState> recordStateMap = fetchRecordStateMapForAcknowledgementBatch(batch);
                    if (batch.lastOffset() < this.startOffset) {
                        log.trace("All offsets in the acknowledgement batch {} are already archived: {}-{}", batch, this.groupId, this.topicIdPartition);
                        continue;
                    }
                    try {
                        Optional<Throwable> ackFailure = acknowledgeBatchRecords(str, batch, recordStateMap,
                            fetchSubMapForAcknowledgementBatch(batch), updatedStates, stateBatches);
                        if (ackFailure.isPresent()) {
                            failure = ackFailure.get();
                            break;
                        }
                    } catch (InvalidRecordStateException | InvalidRequestException e) {
                        failure = e;
                        // Stop at the first failure so a later batch cannot replace this error.
                        break;
                    }
                } catch (IllegalArgumentException e) {
                    log.debug("Invalid acknowledge type: {} for share partition: {}-{}", batch.acknowledgeTypes(), this.groupId, this.topicIdPartition);
                    failure = new InvalidRequestException("Invalid acknowledge type: " + String.valueOf(batch.acknowledgeTypes()));
                    break;
                }
            }
            rollbackOrProcessStateUpdates(completableFuture, failure, updatedStates, stateBatches);
            return completableFuture;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Releases the records currently acquired by the given member.
     *
     * <p>Every cached batch is visited under the write lock. A batch without per-offset state
     * that is acquired by the member and straddles the start offset is first switched to
     * per-offset tracking. Each batch is then released per offset or as a whole. Iteration stops
     * at the first error reported by a helper, and the collected state changes are handed to
     * {@code rollbackOrProcessStateUpdates} together with that error (or {@code null}).
     *
     * <p>The lock is released exactly once, in the {@code finally} block, after all batches
     * have been visited.
     *
     * @param str the member id whose acquired records are released
     * @return the future passed to {@code rollbackOrProcessStateUpdates}
     */
    public CompletableFuture<Void> releaseAcquiredRecords(String str) {
        log.trace("Release acquired records request for share partition: {}-{} memberId: {}", this.groupId, this.topicIdPartition, str);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        Throwable failure = null;
        List<InFlightState> updatedStates = new ArrayList<>();
        List<PersisterStateBatch> stateBatches = new ArrayList<>();

        this.lock.writeLock().lock();
        try {
            RecordState recordState = RecordState.AVAILABLE;
            for (Map.Entry<Long, InFlightBatch> entry : this.cachedState.entrySet()) {
                InFlightBatch inFlightBatch = entry.getValue();
                if (inFlightBatch.offsetState() == null
                        && inFlightBatch.batchState() == RecordState.ACQUIRED
                        && inFlightBatch.batchMemberId().equals(str)
                        && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset())) {
                    inFlightBatch.maybeInitializeOffsetStateUpdate();
                }

                Optional<Throwable> releaseFailure = inFlightBatch.offsetState() != null
                    ? releaseAcquiredRecordsForPerOffsetBatch(str, inFlightBatch, recordState, updatedStates, stateBatches)
                    : releaseAcquiredRecordsForCompleteBatch(str, inFlightBatch, recordState, updatedStates, stateBatches);
                if (releaseFailure.isPresent()) {
                    failure = releaseFailure.get();
                    break;
                }
            }
            rollbackOrProcessStateUpdates(completableFuture, failure, updatedStates, stateBatches);
        } finally {
            this.lock.writeLock().unlock();
        }
        return completableFuture;
    }

    /**
     * Starts the release transition for every offset of a per-offset tracked batch that is in
     * the {@code ACQUIRED} state and owned by the given member (or has the empty member id).
     *
     * <p>Offsets owned by another member are skipped individually, as the log message states,
     * so that later offsets in the same batch owned by this member are still released. Offsets
     * below the start offset transition to {@code ARCHIVED}; all others transition to
     * {@code recordState}. A non-archived release sets {@code findNextFetchOffset}.
     *
     * @param str           the member id releasing its records
     * @param inFlightBatch the per-offset tracked batch
     * @param recordState   the target state for released offsets at or after the start offset
     * @param list          receives the in-flight states whose transition was started
     * @param list2         receives the corresponding persister state batches
     * @return an error if a transition could not be started, otherwise empty
     */
    private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String str, InFlightBatch inFlightBatch, RecordState recordState, List<InFlightState> list, List<PersisterStateBatch> list2) {
        log.trace("Offset tracked batch record found, batch: {} for the share partition: {}-{}", inFlightBatch, this.groupId, this.topicIdPartition);
        for (Map.Entry<Long, InFlightState> entry : inFlightBatch.offsetState.entrySet()) {
            InFlightState offsetState = entry.getValue();
            if (!offsetState.memberId().equals(str) && !offsetState.memberId().equals(EMPTY_MEMBER_ID)) {
                log.debug("Member {} is not the owner of offset: {} in batch: {} for the share partition: {}-{}. Skipping offset.", str, entry.getKey(), inFlightBatch, this.groupId, this.topicIdPartition);
                // Skip only this offset; the member may own later offsets in the batch.
                continue;
            }
            if (offsetState.state != RecordState.ACQUIRED) {
                continue;
            }
            long offset = entry.getKey().longValue();
            RecordState targetState = offset < this.startOffset ? RecordState.ARCHIVED : recordState;
            InFlightState updatedState = offsetState.startStateTransition(targetState, false, this.maxDeliveryCount, EMPTY_MEMBER_ID);
            if (updatedState == null) {
                log.debug("Unable to release records from acquired state for the offset: {} in batch: {} for the share partition: {}-{}", entry.getKey(), inFlightBatch, this.groupId, this.topicIdPartition);
                return Optional.of(new InvalidRecordStateException("Unable to release acquired records for the offset"));
            }
            list.add(updatedState);
            list2.add(new PersisterStateBatch(offset, offset, updatedState.state.id, (short) updatedState.deliveryCount));
            if (updatedState.state != RecordState.ARCHIVED) {
                this.findNextFetchOffset.set(true);
            }
        }
        return Optional.empty();
    }

    /**
     * Starts the release transition for a batch whose state is tracked as a whole.
     *
     * <p>Nothing happens when the batch is owned by a different member or is not in the
     * {@code ACQUIRED} state. A batch that ends before the start offset transitions to
     * {@code ARCHIVED}; otherwise it transitions to {@code recordState}, in which case
     * {@code findNextFetchOffset} is set.
     *
     * @param str           the member id releasing its records
     * @param inFlightBatch the batch to release
     * @param recordState   the target state when the batch is not archived
     * @param list          receives the in-flight state whose transition was started
     * @param list2         receives the corresponding persister state batch
     * @return an error if the transition could not be started, otherwise empty
     */
    private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String str, InFlightBatch inFlightBatch, RecordState recordState, List<InFlightState> list, List<PersisterStateBatch> list2) {
        boolean ownedByMember = inFlightBatch.batchMemberId().equals(str);
        if (!ownedByMember && !inFlightBatch.batchMemberId().equals(EMPTY_MEMBER_ID)) {
            log.debug("Member {} is not the owner of batch record {} for share partition: {}-{}. Skipping batch.", str, inFlightBatch, groupId, topicIdPartition);
            return Optional.empty();
        }

        log.trace("Releasing acquired records for complete batch {} for the share partition: {}-{}", inFlightBatch, groupId, topicIdPartition);
        if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
            return Optional.empty();
        }

        RecordState targetState = inFlightBatch.lastOffset() < startOffset ? RecordState.ARCHIVED : recordState;
        InFlightState updatedState = inFlightBatch.startBatchStateTransition(targetState, false, maxDeliveryCount, EMPTY_MEMBER_ID);
        if (updatedState == null) {
            log.debug("Unable to release records from acquired state for the batch: {} for the share partition: {}-{}", inFlightBatch, groupId, topicIdPartition);
            return Optional.of(new InvalidRecordStateException("Unable to release acquired records for the batch"));
        }

        list.add(updatedState);
        list2.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), updatedState.state.id, (short) updatedState.deliveryCount));
        if (updatedState.state != RecordState.ARCHIVED) {
            findNextFetchOffset.set(true);
        }
        return Optional.empty();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Moves the start offset of the share partition forward to the given log start offset.
     *
     * <p>Nothing changes (an error is logged) when {@code j} is not greater than the current
     * start offset. With an empty cache both start and end offset are set to {@code j}.
     * Otherwise available records below {@code j} are archived, the start offset is set to
     * {@code j}, and the end offset is raised to the start offset if it fell behind.
     *
     * <p>The write lock is released exactly once, in the {@code finally} block.
     *
     * @param j the new log start offset
     */
    public void updateCacheAndOffsets(long j) {
        this.lock.writeLock().lock();
        try {
            if (j <= this.startOffset) {
                log.error("The log start offset: {} is not greater than the start offset: {} for the share partition: {}-{}", j, this.startOffset, this.groupId, this.topicIdPartition);
                return;
            }
            log.debug("Updating start offset for share partition: {}-{} from: {} to: {} since LSO has moved to: {}", this.groupId, this.topicIdPartition, this.startOffset, j, j);
            if (this.cachedState.isEmpty()) {
                this.startOffset = j;
                this.endOffset = j;
                return;
            }
            if (archiveAvailableRecordsOnLsoMovement(j)) {
                this.findNextFetchOffset.set(true);
            }
            this.startOffset = j;
            if (this.endOffset < this.startOffset) {
                this.endOffset = this.startOffset;
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Archives the available records in cached batches whose key is below the given log start
     * offset.
     *
     * <p>A batch that lies entirely within {@code [startOffset, j - 1]} and has no per-offset
     * state is archived as a whole. Any other batch is archived offset by offset. A batch
     * without per-offset state is first switched to per-offset tracking if it is
     * {@code AVAILABLE}, and is skipped otherwise because it has no offset state to scan.
     *
     * <p>The archive helpers are invoked for every qualifying batch regardless of earlier
     * results, so one successful archive does not stop later batches from being archived.
     *
     * @param j the new log start offset
     * @return {@code true} if at least one batch or offset was archived
     */
    private boolean archiveAvailableRecordsOnLsoMovement(long j) {
        this.lock.writeLock().lock();
        try {
            boolean anyOffsetArchived = false;
            boolean anyBatchArchived = false;
            for (Map.Entry<Long, InFlightBatch> entry : this.cachedState.entrySet()) {
                if (entry.getKey().longValue() >= j) {
                    break;
                }
                InFlightBatch inFlightBatch = entry.getValue();
                if (checkForFullMatch(inFlightBatch, this.startOffset, j - 1) && inFlightBatch.offsetState() == null) {
                    // Call the helper first so an earlier success cannot short-circuit it away.
                    anyBatchArchived = archiveCompleteBatch(inFlightBatch) || anyBatchArchived;
                    continue;
                }

                log.debug("Subset or offset tracked batch record found while trying to update offsets and cached state map due to LSO movement, batch: {}, offsets to update - first: {}, last: {} for the share partition: {}-{}", inFlightBatch, this.startOffset, j - 1, this.groupId, this.topicIdPartition);
                if (inFlightBatch.offsetState() == null) {
                    if (inFlightBatch.batchState() != RecordState.AVAILABLE) {
                        // Nothing to archive, and no per-offset state exists to iterate over.
                        continue;
                    }
                    inFlightBatch.maybeInitializeOffsetStateUpdate();
                }
                anyOffsetArchived = archivePerOffsetBatchRecords(inFlightBatch, this.startOffset, j - 1) || anyOffsetArchived;
            }
            return anyOffsetArchived || anyBatchArchived;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Archives every {@code AVAILABLE} offset of a per-offset tracked batch that falls within
     * {@code [j, j2]}.
     *
     * @param inFlightBatch the batch whose offset states are scanned
     * @param j             the first offset (inclusive) eligible for archiving
     * @param j2            the last offset (inclusive) eligible for archiving
     * @return {@code true} if at least one offset was archived
     */
    private boolean archivePerOffsetBatchRecords(InFlightBatch inFlightBatch, long j, long j2) {
        lock.writeLock().lock();
        try {
            boolean archivedAny = false;
            log.trace("Archiving offset tracked batch: {} for the share partition: {}-{}", inFlightBatch, groupId, topicIdPartition);
            for (Map.Entry<Long, InFlightState> offsetEntry : inFlightBatch.offsetState().entrySet()) {
                long offset = offsetEntry.getKey().longValue();
                if (offset < j) {
                    continue;
                }
                if (offset > j2) {
                    break;
                }
                InFlightState offsetState = offsetEntry.getValue();
                if (offsetState.state == RecordState.AVAILABLE) {
                    offsetState.archive(EMPTY_MEMBER_ID);
                    archivedAny = true;
                }
            }
            return archivedAny;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Archives a batch tracked as a whole, provided it is in the {@code AVAILABLE} state.
     *
     * @param inFlightBatch the batch to archive
     * @return {@code true} if the batch was archived, {@code false} if it was not available
     */
    private boolean archiveCompleteBatch(InFlightBatch inFlightBatch) {
        lock.writeLock().lock();
        try {
            log.trace("Archiving complete batch: {} for the share partition: {}-{}", inFlightBatch, groupId, topicIdPartition);
            boolean available = inFlightBatch.batchState() == RecordState.AVAILABLE;
            if (available) {
                inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
            }
            return available;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether more records may be acquired from this share partition.
     *
     * <p>Records can always be acquired when the next fetch offset is not {@code endOffset + 1}.
     * Otherwise the span between start and end offset (zero for an empty cache) must be below
     * {@code maxInFlightMessages}.
     *
     * @return {@code true} if records can be acquired
     */
    public boolean canAcquireRecords() {
        long nextOffset = nextFetchOffset();
        if (nextOffset != endOffset() + 1) {
            return true;
        }
        lock.readLock().lock();
        try {
            long inFlightSpan;
            if (cachedState.isEmpty()) {
                inFlightSpan = 0L;
            } else {
                inFlightSpan = (endOffset - startOffset) + 1;
            }
            return inFlightSpan < (long) maxInFlightMessages;
        } finally {
            lock.readLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tries to take the fetch lock for this share partition.
     *
     * @return {@code true} if the partition state is active and the fetch lock flag was flipped
     *         from {@code false} to {@code true} by this call; {@code false} otherwise
     */
    public boolean maybeAcquireFetchLock() {
        return !stateNotActive() && fetchLock.compareAndSet(false, true);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Clears the fetch lock flag unconditionally.
     */
    public void releaseFetchLock() {
        fetchLock.set(false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sets the partition state to {@code FENCED} while holding the write lock.
     */
    public void markFenced() {
        lock.writeLock().lock();
        try {
            partitionState = SharePartitionState.FENCED;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the listener held by this share partition
     */
    public SharePartitionManager.SharePartitionListener listener() {
        return listener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the leader epoch held by this share partition
     */
    public int leaderEpoch() {
        return leaderEpoch;
    }

    /**
     * @return {@code true} when the current partition state is anything other than
     *         {@code ACTIVE}
     */
    private boolean stateNotActive() {
        SharePartitionState current = partitionState();
        return current != SharePartitionState.ACTIVE;
    }

    /**
     * Moves the partition to the {@code INITIALIZING} state unless it is already initialized.
     *
     * <p>Any exception thrown by {@code initializedOrThrowException} propagates to the caller.
     *
     * @return {@code true} if the state was changed to {@code INITIALIZING}, {@code false} if
     *         the partition was already initialized
     */
    private boolean emptyToInitialState() {
        lock.writeLock().lock();
        try {
            boolean alreadyInitialized = initializedOrThrowException();
            if (!alreadyInitialized) {
                partitionState = SharePartitionState.INITIALIZING;
            }
            return !alreadyInitialized;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Maps the current partition state to an "initialized" flag or an exception.
     *
     * @return {@code false} for {@code EMPTY}, {@code true} for {@code ACTIVE}
     * @throws LeaderNotAvailableException if the partition is {@code INITIALIZING}
     * @throws IllegalStateException       if the partition is {@code FAILED}
     * @throws FencedStateEpochException   if the partition is {@code FENCED}
     */
    private boolean initializedOrThrowException() {
        SharePartitionState current = partitionState();
        switch (current) {
            case ACTIVE:
                return true;
            case EMPTY:
                return false;
            case INITIALIZING:
                throw new LeaderNotAvailableException(String.format("Share partition is already initializing %s-%s", groupId, topicIdPartition));
            case FAILED:
                throw new IllegalStateException(String.format("Share partition failed to load %s-%s", groupId, topicIdPartition));
            case FENCED:
                throw new FencedStateEpochException(String.format("Share partition is fenced %s-%s", groupId, topicIdPartition));
            default:
                // Reached only if the enum gains a constant this switch does not handle.
                throw new IncompatibleClassChangeError();
        }
    }

    /**
     * Records a new in-flight batch in the {@code ACQUIRED} state for the given member and
     * returns its offsets.
     *
     * <p>The first offset is {@code j}, or {@code endOffset} when the cache is empty and the end
     * offset lies beyond {@code j}. If the range holds more than {@code i} offsets, the last
     * offset is taken from {@code lastOffsetFromBatchWithRequestOffset}. The end offset is set
     * to the batch's last offset, and the start offset is set to its first offset when the new
     * batch becomes the first entry of the cache.
     *
     * @param str      the member id acquiring the records
     * @param iterable the fetched record batches
     * @param j        the first offset requested
     * @param j2       the last offset requested
     * @param i        the maximum number of records to acquire
     * @return the acquired range with a delivery count of 1
     */
    private ShareFetchResponseData.AcquiredRecords acquireNewBatchRecords(String str, Iterable<? extends RecordBatch> iterable, long j, long j2, int i) {
        lock.writeLock().lock();
        try {
            long firstAcquired = j;
            if (cachedState.isEmpty() && endOffset > firstAcquired) {
                firstAcquired = endOffset;
            }

            long lastAcquired = j2;
            if (i < (lastAcquired - firstAcquired) + 1) {
                lastAcquired = lastOffsetFromBatchWithRequestOffset(iterable, (firstAcquired + i) - 1);
            }

            cachedState.put(firstAcquired, new InFlightBatch(str, firstAcquired, lastAcquired, RecordState.ACQUIRED, 1,
                scheduleAcquisitionLockTimeout(str, firstAcquired, lastAcquired)));
            if (cachedState.firstKey().longValue() == firstAcquired) {
                startOffset = firstAcquired;
            }
            endOffset = lastAcquired;

            return new ShareFetchResponseData.AcquiredRecords()
                .setFirstOffset(firstAcquired)
                .setLastOffset(lastAcquired)
                .setDeliveryCount((short) 1);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Acquires, offset by offset, the available records of a per-offset tracked batch that fall
     * within {@code [j, j2]}.
     *
     * <p>Offsets that are not {@code AVAILABLE}, have an ongoing state transition, or whose
     * state update fails are skipped with a trace log.
     *
     * @param str           the member id acquiring the records
     * @param j             the first requested offset (inclusive)
     * @param j2            the last requested offset (inclusive)
     * @param inFlightBatch the batch whose offsets are scanned
     * @param list          receives one single-offset entry per acquired offset
     * @return the number of offsets acquired
     */
    private int acquireSubsetBatchRecords(String str, long j, long j2, InFlightBatch inFlightBatch, List<ShareFetchResponseData.AcquiredRecords> list) {
        lock.writeLock().lock();
        int acquiredCount = 0;
        try {
            for (Map.Entry<Long, InFlightState> offsetEntry : inFlightBatch.offsetState.entrySet()) {
                long offset = offsetEntry.getKey().longValue();
                if (offset < j) {
                    continue;
                }
                if (offset > j2) {
                    break;
                }
                InFlightState offsetState = offsetEntry.getValue();
                if (offsetState.state != RecordState.AVAILABLE || offsetState.hasOngoingStateTransition()) {
                    log.trace("The offset {} is not available in share partition: {}-{}, skipping: {}", offsetEntry.getKey(), groupId, topicIdPartition, inFlightBatch);
                    continue;
                }
                if (offsetState.tryUpdateState(RecordState.ACQUIRED, true, maxDeliveryCount, str) == null) {
                    log.trace("Unable to acquire records for the offset: {} in batch: {} for the share partition: {}-{}", offsetEntry.getKey(), inFlightBatch, groupId, topicIdPartition);
                    continue;
                }
                offsetState.updateAcquisitionLockTimeoutTask(scheduleAcquisitionLockTimeout(str, offset, offset));
                list.add(new ShareFetchResponseData.AcquiredRecords()
                    .setFirstOffset(offset)
                    .setLastOffset(offset)
                    .setDeliveryCount((short) offsetState.deliveryCount));
                acquiredCount++;
            }
            return acquiredCount;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * @return {@code true} when the batch lies entirely within {@code [j, j2]}
     */
    private boolean checkForFullMatch(InFlightBatch inFlightBatch, long j, long j2) {
        return !(inFlightBatch.firstOffset() < j || inFlightBatch.lastOffset() > j2);
    }

    /**
     * @param j  the first offset of the batch
     * @param j2 the last offset of the batch
     * @return {@code true} when the current start offset is greater than {@code j} and not
     *         greater than {@code j2}
     */
    private boolean checkForStartOffsetWithinBatch(long j, long j2) {
        long currentStart = startOffset();
        if (j >= currentStart) {
            return false;
        }
        return j2 >= currentStart;
    }

    /**
     * Builds a map from offset to target record state for an acknowledgement batch.
     *
     * <p>The n-th acknowledge type is mapped to offset {@code firstOffset + n}.
     *
     * @param shareAcknowledgementBatch the acknowledgement batch to translate
     * @return the target record state keyed by offset
     * @throws IllegalArgumentException if an acknowledge type is not recognised
     */
    private Map<Long, RecordState> fetchRecordStateMapForAcknowledgementBatch(ShareAcknowledgementBatch shareAcknowledgementBatch) {
        Map<Long, RecordState> recordStates = new HashMap<>();
        int index = 0;
        while (index < shareAcknowledgementBatch.acknowledgeTypes().size()) {
            byte acknowledgeType = ((Byte) shareAcknowledgementBatch.acknowledgeTypes().get(index)).byteValue();
            recordStates.put(shareAcknowledgementBatch.firstOffset() + index, fetchRecordState(acknowledgeType));
            index++;
        }
        return recordStates;
    }

    /**
     * Translates an acknowledge type id into the record state it leads to.
     *
     * @param b the acknowledge type id
     * @return {@code ARCHIVED} for 0 and 3, {@code ACKNOWLEDGED} for 1, {@code AVAILABLE} for 2
     * @throws IllegalArgumentException for any other id
     */
    private static RecordState fetchRecordState(byte b) {
        if (b == 1) {
            return RecordState.ACKNOWLEDGED;
        }
        if (b == 2) {
            return RecordState.AVAILABLE;
        }
        if (b == 0 || b == 3) {
            return RecordState.ARCHIVED;
        }
        throw new IllegalArgumentException("Invalid acknowledge type: " + b);
    }

    /**
     * Returns the view of the cached state that covers the given acknowledgement batch.
     *
     * <p>The view starts at the cached batch at or before the request's first offset. If there
     * is none and the start offset lies within the request, it starts at the cached batch at or
     * before the start offset.
     *
     * @param shareAcknowledgementBatch the acknowledgement batch to look up
     * @return the sub-map of cached batches covering the request
     * @throws InvalidRecordStateException if no cached batch precedes the request and the start
     *                                     offset is not within the request
     * @throws InvalidRequestException     if the request's first or last offset lies beyond the
     *                                     last cached batch in the view
     */
    private NavigableMap<Long, InFlightBatch> fetchSubMapForAcknowledgementBatch(ShareAcknowledgementBatch shareAcknowledgementBatch) {
        lock.writeLock().lock();
        try {
            Map.Entry<Long, InFlightBatch> floor = cachedState.floorEntry(shareAcknowledgementBatch.firstOffset());
            if (floor == null) {
                boolean startWithinRequest = checkForStartOffsetWithinBatch(shareAcknowledgementBatch.firstOffset(), shareAcknowledgementBatch.lastOffset());
                if (!startWithinRequest) {
                    log.debug("Batch record {} not found for share partition: {}-{}", shareAcknowledgementBatch, groupId, topicIdPartition);
                    throw new InvalidRecordStateException("Batch record not found. The request batch offsets are not found in the cache.");
                }
                // NOTE(review): this lookup may also return null, which would fail on getKey() below — confirm.
                floor = cachedState.floorEntry(startOffset);
            }

            NavigableMap<Long, InFlightBatch> covering = cachedState.subMap(floor.getKey(), true, shareAcknowledgementBatch.lastOffset(), true);
            if (covering.lastEntry().getValue().lastOffset < shareAcknowledgementBatch.firstOffset()) {
                log.debug("Request batch: {} has offsets which are not found for share partition: {}-{}", shareAcknowledgementBatch, groupId, topicIdPartition);
                throw new InvalidRequestException("Batch record not found. The first offset in request is past acquired records.");
            }
            if (shareAcknowledgementBatch.lastOffset() > covering.lastEntry().getValue().lastOffset) {
                log.debug("Request batch: {} has offsets which are not found for share partition: {}-{}", shareAcknowledgementBatch, groupId, topicIdPartition);
                throw new InvalidRequestException("Batch record not found. The last offset in request is past acquired records.");
            }
            return covering;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Applies an acknowledgement batch to the cached in-flight batches it covers.
     *
     * <p>Cached batches that end before the start offset are skipped. A batch without
     * per-offset state must be owned by the member. A batch is acknowledged per offset when it
     * is not fully covered by the request, already tracks per-offset state, the request carries
     * more than one acknowledge type, or the start offset lies within the batch. Otherwise it is
     * acknowledged as a whole. Processing stops at the first error.
     *
     * <p>The write lock is released exactly once, in the {@code finally} block. Releasing it on
     * the return paths as well would drop a hold that belongs to a caller that already holds
     * the lock.
     *
     * @param str                       the member id sending the acknowledgement
     * @param shareAcknowledgementBatch the acknowledgement batch being applied
     * @param map                       the target record state keyed by offset
     * @param navigableMap              the cached batches covered by the request
     * @param list                      receives the in-flight states whose transition started
     * @param list2                     receives the corresponding persister state batches
     * @return the first error encountered, otherwise empty
     */
    private Optional<Throwable> acknowledgeBatchRecords(String str, ShareAcknowledgementBatch shareAcknowledgementBatch, Map<Long, RecordState> map, NavigableMap<Long, InFlightBatch> navigableMap, List<InFlightState> list, List<PersisterStateBatch> list2) {
        this.lock.writeLock().lock();
        try {
            for (Map.Entry<Long, InFlightBatch> entry : navigableMap.entrySet()) {
                InFlightBatch inFlightBatch = entry.getValue();
                if (inFlightBatch.lastOffset() < this.startOffset) {
                    log.trace("All offsets in the inflight batch {} are already archived: {}-{}", inFlightBatch, this.groupId, this.topicIdPartition);
                    continue;
                }
                if (inFlightBatch.offsetState() == null) {
                    Optional<Throwable> ownershipError = validateAcknowledgementBatchMemberId(str, inFlightBatch);
                    if (ownershipError.isPresent()) {
                        return ownershipError;
                    }
                }

                boolean fullMatch = checkForFullMatch(inFlightBatch, shareAcknowledgementBatch.firstOffset(), shareAcknowledgementBatch.lastOffset());
                boolean hasPerOffsetTypes = shareAcknowledgementBatch.acknowledgeTypes().size() > 1;
                boolean startOffsetWithinBatch = checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset());

                Optional<Throwable> ackError;
                if (!fullMatch || inFlightBatch.offsetState() != null || hasPerOffsetTypes || startOffsetWithinBatch) {
                    log.debug("Subset or offset tracked batch record found for acknowledgement, batch: {}, request offsets - first: {}, last: {}, client per offsetstate {} for the share partition: {}-{}", inFlightBatch, shareAcknowledgementBatch.firstOffset(), shareAcknowledgementBatch.lastOffset(), hasPerOffsetTypes, this.groupId, this.topicIdPartition);
                    if (inFlightBatch.offsetState() == null) {
                        if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
                            log.debug("The batch is not in the acquired state: {} for share partition: {}-{}", inFlightBatch, this.groupId, this.topicIdPartition);
                            return Optional.of(new InvalidRecordStateException("The batch cannot be acknowledged. The subset batch is not in the acquired state."));
                        }
                        inFlightBatch.maybeInitializeOffsetStateUpdate();
                    }
                    ackError = acknowledgePerOffsetBatchRecords(str, shareAcknowledgementBatch, inFlightBatch, map, list, list2);
                } else {
                    ackError = acknowledgeCompleteBatch(shareAcknowledgementBatch, inFlightBatch, map.get(shareAcknowledgementBatch.firstOffset()), list, list2);
                }
                if (ackError.isPresent()) {
                    return ackError;
                }
            }
            return Optional.empty();
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Verifies that the batch being acknowledged is owned by the acknowledging member.
     *
     * @param str id of the member sending the acknowledgement
     * @param inFlightBatch the cached batch that is being acknowledged
     * @return an empty optional when {@code str} matches the batch member id; otherwise an
     *         {@link InvalidRecordStateException} explaining why the acknowledgement is rejected
     */
    private Optional<Throwable> validateAcknowledgementBatchMemberId(String str, InFlightBatch inFlightBatch) {
        // A batch carrying the empty member id is rejected as "not acquired".
        if (inFlightBatch.batchMemberId().equals(EMPTY_MEMBER_ID)) {
            log.debug("The batch is not in the acquired state: {} for share partition: {}-{}. Empty member id for batch.",
                inFlightBatch, this.groupId, this.topicIdPartition);
            return Optional.of(new InvalidRecordStateException("The batch cannot be acknowledged. The batch is not in the acquired state."));
        }
        if (!inFlightBatch.batchMemberId().equals(str)) {
            log.debug("Member {} is not the owner of batch record {} for share partition: {}-{}",
                str, inFlightBatch, this.groupId, this.topicIdPartition);
            return Optional.of(new InvalidRecordStateException("Member is not the owner of batch record"));
        }
        return Optional.empty();
    }

    /**
     * Starts the acknowledgement state transition for each tracked offset of {@code inFlightBatch}
     * that lies inside the acknowledged range and at or after the partition start offset.
     *
     * <p>The write lock is acquired once and released exactly once, by the {@code finally} block,
     * on every exit path.
     *
     * @param str id of the member sending the acknowledgement
     * @param shareAcknowledgementBatch the acknowledged offset range
     * @param inFlightBatch cached batch whose per-offset state is updated
     * @param map requested record state keyed by offset; when it holds at most one entry, the state
     *            stored under the first acknowledged offset is applied to every offset
     * @param list collects every state whose transition was started
     * @param list2 collects one single-offset persister batch per started transition
     * @return empty on success, otherwise an {@link InvalidRecordStateException} for the first
     *         offset that is not acquired, is owned by another member, or cannot transition
     */
    private Optional<Throwable> acknowledgePerOffsetBatchRecords(String str, ShareAcknowledgementBatch shareAcknowledgementBatch, InFlightBatch inFlightBatch, Map<Long, RecordState> map, List<InFlightState> list, List<PersisterStateBatch> list2) {
        this.lock.writeLock().lock();
        try {
            RecordState firstOffsetState = map.get(Long.valueOf(shareAcknowledgementBatch.firstOffset()));
            for (Map.Entry<Long, InFlightState> entry : inFlightBatch.offsetState.entrySet()) {
                long offset = entry.getKey().longValue();
                // Skip offsets before the acknowledged range or before the partition start offset.
                if (offset < shareAcknowledgementBatch.firstOffset() || offset < this.startOffset) {
                    continue;
                }
                if (offset > shareAcknowledgementBatch.lastOffset()) {
                    break;
                }
                InFlightState current = entry.getValue();
                if (current.state != RecordState.ACQUIRED) {
                    log.debug("The offset is not acquired, offset: {} batch: {} for the share partition: {}-{}",
                        entry.getKey(), inFlightBatch, this.groupId, this.topicIdPartition);
                    return Optional.of(new InvalidRecordStateException("The batch cannot be acknowledged. The offset is not acquired."));
                }
                if (!current.memberId.equals(str)) {
                    log.debug("Member {} is not the owner of offset: {} in batch: {} for the share partition: {}-{}",
                        str, entry.getKey(), inFlightBatch, this.groupId, this.topicIdPartition);
                    return Optional.of(new InvalidRecordStateException("Member is not the owner of offset"));
                }
                // With several entries the map is consulted per offset; otherwise one state covers all.
                RecordState requestedState = map.size() > 1 ? map.get(entry.getKey()) : firstOffsetState;
                InFlightState updated = current.startStateTransition(requestedState, false, this.maxDeliveryCount, EMPTY_MEMBER_ID);
                if (updated == null) {
                    log.debug("Unable to acknowledge records for the offset: {} in batch: {} for the share partition: {}-{}",
                        entry.getKey(), inFlightBatch, this.groupId, this.topicIdPartition);
                    return Optional.of(new InvalidRecordStateException("Unable to acknowledge records for the batch"));
                }
                list.add(updated);
                list2.add(new PersisterStateBatch(offset, offset, updated.state.id, (short) updated.deliveryCount));
                // A release that did not end up archived makes the offset fetchable again.
                if (requestedState == RecordState.AVAILABLE && updated.state != RecordState.ARCHIVED) {
                    this.findNextFetchOffset.set(true);
                }
            }
            return Optional.empty();
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Starts the acknowledgement state transition for a batch that is tracked as a whole.
     *
     * <p>The write lock is acquired once and released exactly once, by the {@code finally} block,
     * on every exit path.
     *
     * @param shareAcknowledgementBatch the acknowledgement request (used for logging only)
     * @param inFlightBatch cached batch whose batch-level state is updated
     * @param recordState the state requested by the acknowledgement
     * @param list collects the state whose transition was started
     * @param list2 collects the persister batch covering the whole in-flight batch
     * @return empty on success, otherwise an {@link InvalidRecordStateException} when the batch is
     *         not acquired or the transition cannot be started
     */
    private Optional<Throwable> acknowledgeCompleteBatch(ShareAcknowledgementBatch shareAcknowledgementBatch, InFlightBatch inFlightBatch, RecordState recordState, List<InFlightState> list, List<PersisterStateBatch> list2) {
        this.lock.writeLock().lock();
        try {
            log.trace("Acknowledging complete batch record {} for the share partition: {}-{}",
                shareAcknowledgementBatch, this.groupId, this.topicIdPartition);
            if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
                log.debug("The batch is not in the acquired state: {} for share partition: {}-{}",
                    inFlightBatch, this.groupId, this.topicIdPartition);
                return Optional.of(new InvalidRecordStateException("The batch cannot be acknowledged. The batch is not in the acquired state."));
            }
            InFlightState updated = inFlightBatch.startBatchStateTransition(recordState, false, this.maxDeliveryCount, EMPTY_MEMBER_ID);
            if (updated == null) {
                log.debug("Unable to acknowledge records for the batch: {} with state: {} for the share partition: {}-{}",
                    inFlightBatch, recordState, this.groupId, this.topicIdPartition);
                return Optional.of(new InvalidRecordStateException("Unable to acknowledge records for the batch"));
            }
            list.add(updated);
            list2.add(new PersisterStateBatch(inFlightBatch.firstOffset, inFlightBatch.lastOffset, updated.state.id, (short) updated.deliveryCount));
            // A release that did not end up archived makes the batch fetchable again.
            if (recordState == RecordState.AVAILABLE && updated.state != RecordState.ARCHIVED) {
                this.findNextFetchOffset.set(true);
            }
            return Optional.empty();
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Passes the given offset and log offset metadata to the cached fetch offset metadata
     * holder while holding the write lock.
     *
     * @param j the offset the metadata belongs to
     * @param logOffsetMetadata the metadata to store
     */
    public void updateFetchOffsetMetadata(long j, LogOffsetMetadata logOffsetMetadata) {
        this.lock.writeLock().lock();
        try {
            this.fetchOffsetMetadata.updateOffsetMetadata(j, logOffsetMetadata);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the cached log offset metadata, provided it was recorded for the requested offset.
     *
     * @param j the offset the caller wants metadata for
     * @return the cached metadata when one is present and its offset equals {@code j};
     *         otherwise an empty optional
     */
    public Optional<LogOffsetMetadata> fetchOffsetMetadata(long j) {
        this.lock.readLock().lock();
        try {
            boolean missing = this.fetchOffsetMetadata.offsetMetadata() == null;
            if (missing || this.fetchOffsetMetadata.offset() != j) {
                return Optional.empty();
            }
            return Optional.of(this.fetchOffsetMetadata.offsetMetadata());
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Reads the current partition state under the read lock.
     *
     * @return the value of the {@code partitionState} field at the time of the call
     */
    SharePartitionState partitionState() {
        final SharePartitionState current;
        this.lock.readLock().lock();
        try {
            current = this.partitionState;
        } finally {
            this.lock.readLock().unlock();
        }
        return current;
    }

    /**
     * Finishes a state update request: rolls the started transitions back on failure, or writes
     * the new state through {@code writeShareGroupState} and commits the transitions once the
     * write succeeds.
     *
     * <p>The write lock is held only while in-memory state is inspected or changed. It is released
     * before the write is issued and re-acquired inside the completion callback. Each acquisition
     * is paired with exactly one release in a {@code finally} block.
     *
     * @param completableFuture completed normally on success (or when there is nothing to do),
     *                          exceptionally with the failure otherwise
     * @param th failure from the preceding processing step, or {@code null} when it succeeded
     * @param list states whose transition was started and must be committed or rolled back
     * @param list2 state batches to hand to the persister
     */
    void rollbackOrProcessStateUpdates(CompletableFuture<Void> completableFuture, Throwable th, List<InFlightState> list, List<PersisterStateBatch> list2) {
        this.lock.writeLock().lock();
        try {
            if (th != null) {
                log.debug("Request failed for updating state, rollback any changed state for the share partition: {}-{}", this.groupId, this.topicIdPartition);
                list.forEach(state -> state.completeStateTransition(false));
                completableFuture.completeExceptionally(th);
                return;
            }
            if (list2.isEmpty() && list.isEmpty()) {
                completableFuture.complete(null);
                return;
            }
        } finally {
            this.lock.writeLock().unlock();
        }

        // Issued without holding the lock; the callback re-acquires it before touching state.
        writeShareGroupState(list2).whenComplete((result, writeError) -> {
            this.lock.writeLock().lock();
            try {
                if (writeError != null) {
                    log.error("Failed to write state to persister for the share partition: {}-{}", this.groupId, this.topicIdPartition, writeError);
                    list.forEach(state -> state.completeStateTransition(false));
                    completableFuture.completeExceptionally(writeError);
                    return;
                }
                log.trace("State change request successful for share partition: {}-{}", this.groupId, this.topicIdPartition);
                list.forEach(state -> {
                    state.completeStateTransition(true);
                    state.cancelAndClearAcquisitionLockTimeoutTask();
                });
                maybeUpdateCachedStateAndOffsets();
                completableFuture.complete(null);
            } finally {
                this.lock.writeLock().unlock();
            }
        });
    }

    /**
     * Advances the start offset past the leading run of acknowledged/archived records and drops
     * the cached batches that are now entirely before it.
     *
     * <p>Nothing happens when {@code canMoveStartOffset()} is false or when
     * {@code findLastOffsetAcknowledged()} returns -1. The write lock is released exactly once,
     * by the {@code finally} block, on every exit path.
     */
    private void maybeUpdateCachedStateAndOffsets() {
        this.lock.writeLock().lock();
        try {
            if (!canMoveStartOffset()) {
                return;
            }
            long lastOffsetAcknowledged = findLastOffsetAcknowledged();
            if (lastOffsetAcknowledged == -1) {
                return;
            }

            // Everything cached is acknowledged: move both offsets past the cache and empty it.
            long lastCachedOffset = this.cachedState.lastEntry().getValue().lastOffset();
            if (lastOffsetAcknowledged == lastCachedOffset) {
                this.startOffset = lastCachedOffset + 1;
                this.endOffset = lastCachedOffset + 1;
                this.cachedState.clear();
                return;
            }

            long firstKeyToRemove = this.cachedState.firstKey().longValue();
            long lastKeyToRemove;
            Map.Entry<Long, InFlightBatch> floorEntry = this.cachedState.floorEntry(Long.valueOf(lastOffsetAcknowledged));
            if (lastOffsetAcknowledged == floorEntry.getValue().lastOffset()) {
                // The acknowledged run ends on a batch boundary: that batch can be removed too.
                this.startOffset = this.cachedState.higherKey(Long.valueOf(lastOffsetAcknowledged)).longValue();
                lastKeyToRemove = floorEntry.getKey().longValue();
            } else {
                // The run ends inside a batch: keep that batch, remove only the ones before it.
                this.startOffset = lastOffsetAcknowledged + 1;
                lastKeyToRemove = floorEntry.getKey().equals(this.cachedState.firstKey())
                    ? -1L
                    : this.cachedState.lowerKey(floorEntry.getKey()).longValue();
            }

            if (lastKeyToRemove != -1) {
                // The sub-map is a view backed by cachedState, so clearing it removes the range.
                this.cachedState.subMap(Long.valueOf(firstKeyToRemove), true, Long.valueOf(lastKeyToRemove), true).clear();
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Tells whether the record at the current start offset is already acknowledged or archived.
     *
     * @return {@code false} when the cache is empty or has no entry at or below the start offset;
     *         otherwise whether the state found for the start offset counts as acknowledged
     */
    private boolean canMoveStartOffset() {
        if (this.cachedState.isEmpty()) {
            return false;
        }
        Map.Entry<Long, InFlightBatch> startEntry = this.cachedState.floorEntry(Long.valueOf(this.startOffset));
        if (startEntry == null) {
            log.error("The start offset: {} is not found in the cached state for share partition: {}-{}. Cannot move the start offset.",
                Long.valueOf(this.startOffset), this.groupId, this.topicIdPartition);
            return false;
        }
        // Use the batch-level state unless the batch is tracked per offset.
        RecordState stateAtStartOffset;
        if (startEntry.getValue().offsetState == null) {
            stateAtStartOffset = startEntry.getValue().batchState();
        } else {
            stateAtStartOffset = ((InFlightState) startEntry.getValue().offsetState().get(Long.valueOf(this.startOffset))).state();
        }
        return isRecordStateAcknowledged(stateAtStartOffset);
    }

    /**
     * @param recordState the state to classify
     * @return {@code true} only for {@code ACKNOWLEDGED} and {@code ARCHIVED}
     */
    private boolean isRecordStateAcknowledged(RecordState recordState) {
        if (recordState == RecordState.ACKNOWLEDGED) {
            return true;
        }
        return recordState == RecordState.ARCHIVED;
    }

    /**
     * Walks the cached batches in key order and finds where the leading run of
     * acknowledged/archived records ends.
     *
     * <p>The read lock is released exactly once, by the {@code finally} block, on every exit path.
     *
     * @return the last offset of the leading acknowledged run, or -1 when the very first record
     *         (or batch) examined is not acknowledged or the cache is empty
     */
    private long findLastOffsetAcknowledged() {
        this.lock.readLock().lock();
        try {
            long lastOffsetAcknowledged = -1;
            for (InFlightBatch batch : this.cachedState.values()) {
                if (batch.offsetState() == null) {
                    // Batch tracked as a whole: it either extends the run entirely or ends it.
                    if (!isRecordStateAcknowledged(batch.batchState())) {
                        return lastOffsetAcknowledged;
                    }
                    lastOffsetAcknowledged = batch.lastOffset();
                    continue;
                }
                for (Map.Entry<Long, InFlightState> entry : batch.offsetState.entrySet()) {
                    if (!isRecordStateAcknowledged(entry.getValue().state())) {
                        return lastOffsetAcknowledged;
                    }
                    lastOffsetAcknowledged = entry.getKey().longValue();
                }
            }
            return lastOffsetAcknowledged;
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Finds the last offset of the batch that contains the requested offset.
     *
     * @param iterable record batches, scanned in iteration order until one starts after {@code j}
     * @param j the requested offset
     * @return the last offset of the final batch whose base offset is not greater than {@code j},
     *         when {@code j} does not exceed that last offset; otherwise {@code j} itself
     */
    private long lastOffsetFromBatchWithRequestOffset(Iterable<? extends RecordBatch> iterable, long j) {
        RecordBatch candidate = null;
        Iterator<? extends RecordBatch> batches = iterable.iterator();
        while (batches.hasNext()) {
            RecordBatch next = batches.next();
            if (next.baseOffset() > j) {
                break;
            }
            candidate = next;
        }
        if (candidate != null && j <= candidate.lastOffset()) {
            return candidate.lastOffset();
        }
        return j;
    }

    /**
     * Writes the given state batches for this share partition through the persister and
     * translates the persister's reply into the returned future.
     *
     * <p>The reply is accepted only when it carries exactly one topic entry whose topic id equals
     * this partition's topic id and exactly one partition entry with this partition's index.
     * Topic ids are compared by value ({@code equals}), not by reference.
     *
     * @param list the state batches to persist
     * @return a future completed normally when the persister reports {@code Errors.NONE};
     *         completed exceptionally with an {@link IllegalStateException} when the call fails or
     *         the reply is malformed, or with the exception mapped from the reported error code
     */
    CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch> list) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.persister.writeState(
            new WriteShareGroupStateParameters.Builder()
                .setGroupTopicPartitionData(
                    new GroupTopicPartitionData.Builder()
                        .setGroupId(this.groupId)
                        .setTopicsData(Collections.singletonList(new TopicData(
                            this.topicIdPartition.topicId(),
                            Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
                                this.topicIdPartition.partition(), this.stateEpoch, this.startOffset, this.leaderEpoch, list)))))
                        .build())
                .build()
        ).whenComplete((writeShareGroupStateResult, th) -> {
            if (th != null) {
                log.error("Failed to write the share group state for share partition: {}-{}", this.groupId, this.topicIdPartition, th);
                completableFuture.completeExceptionally(new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s", this.groupId, this.topicIdPartition), th));
                return;
            }
            if (writeShareGroupStateResult == null || writeShareGroupStateResult.topicsData() == null || writeShareGroupStateResult.topicsData().size() != 1) {
                log.error("Failed to write the share group state for share partition: {}-{}. Invalid state found: {}", this.groupId, this.topicIdPartition, writeShareGroupStateResult);
                completableFuture.completeExceptionally(new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s", this.groupId, this.topicIdPartition)));
                return;
            }
            TopicData topicData = (TopicData) writeShareGroupStateResult.topicsData().get(0);
            // Value comparison: the reply may carry a different Uuid instance for the same topic.
            boolean sameTopic = Objects.equals(topicData.topicId(), this.topicIdPartition.topicId());
            if (!sameTopic || topicData.partitions().size() != 1 || ((PartitionErrorData) topicData.partitions().get(0)).partition() != this.topicIdPartition.partition()) {
                log.error("Failed to write the share group state for share partition: {}-{}. Invalid topic partition response: {}", this.groupId, this.topicIdPartition, writeShareGroupStateResult);
                completableFuture.completeExceptionally(new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s", this.groupId, this.topicIdPartition)));
                return;
            }
            PartitionErrorData partitionErrorData = (PartitionErrorData) topicData.partitions().get(0);
            if (partitionErrorData.errorCode() == Errors.NONE.code()) {
                completableFuture.complete(null);
                return;
            }
            KafkaException persisterError = fetchPersisterError(partitionErrorData.errorCode(), partitionErrorData.errorMessage());
            log.error("Failed to write the share group state for share partition: {}-{} due to exception", this.groupId, this.topicIdPartition, persisterError);
            completableFuture.completeExceptionally(persisterError);
        });
        return completableFuture;
    }

    /**
     * Converts an error code reported by the persister into the exception used to fail the
     * write future.
     *
     * @param s the error code from the persister response
     * @param str the error message from the persister response, used as the exception message
     * @return a {@link CoordinatorNotAvailableException}, {@link GroupIdNotFoundException},
     *         {@link UnknownTopicOrPartitionException}, {@link FencedStateEpochException} or
     *         {@link NotLeaderOrFollowerException} for the recognised codes, and an
     *         {@link UnknownServerException} for every other code
     */
    private KafkaException fetchPersisterError(short s, String str) {
        // NOTE(review): the $SwitchMap$ lookup looks like a decompiled switch over the Errors enum;
        // which Errors constant each numeric case stands for is defined in AnonymousClass1 and is
        // not visible here - confirm against that table before changing any case.
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.forCode(s).ordinal()]) {
            case 1:
            case 2:
            case 3:
                // Three distinct codes share the coordinator-not-available exception.
                return new CoordinatorNotAvailableException(str);
            case 4:
                return new GroupIdNotFoundException(str);
            case 5:
                return new UnknownTopicOrPartitionException(str);
            case 6:
                return new FencedStateEpochException(str);
            case 7:
                return new NotLeaderOrFollowerException(str);
            default:
                // Any code without a dedicated mapping is surfaced as an unknown server error.
                return new UnknownServerException(str);
        }
    }

    /**
     * Schedules an acquisition lock timeout using the record lock duration configured for the
     * group, falling back to the default duration when the group has no config.
     *
     * <p>The group config is looked up once, so the presence check and the read see the same
     * value even if the config changes concurrently.
     *
     * @param str id of the member the records are acquired by
     * @param j first offset covered by the lock
     * @param j2 last offset covered by the lock
     * @return the timer task that was scheduled
     */
    AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String str, long j, long j2) {
        Optional<GroupConfig> groupConfig = this.groupConfigManager.groupConfig(this.groupId);
        long recordLockDurationMs = groupConfig.isPresent()
            ? groupConfig.get().shareRecordLockDurationMs()
            : this.defaultRecordLockDurationMs;
        return scheduleAcquisitionLockTimeout(str, j, j2, recordLockDurationMs);
    }

    /**
     * Creates an acquisition lock timer task and registers it with the timer.
     *
     * @param str id of the member the records are acquired by
     * @param j first offset covered by the lock
     * @param j2 last offset covered by the lock
     * @param j3 delay value handed to the timer task
     * @return the task that was added to the timer
     */
    private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String str, long j, long j2, long j3) {
        final AcquisitionLockTimerTask timeoutTask = acquisitionLockTimerTask(str, j, j2, j3);
        this.timer.add(timeoutTask);
        return timeoutTask;
    }

    /**
     * Builds (without scheduling) an acquisition lock timer task.
     *
     * @param str id of the member the records are acquired by
     * @param j first offset covered by the lock
     * @param j2 last offset covered by the lock
     * @param j3 delay value, passed as the task's first constructor argument
     * @return the new timer task
     */
    private AcquisitionLockTimerTask acquisitionLockTimerTask(String str, long j, long j2, long j3) {
        AcquisitionLockTimerTask timeoutTask = new AcquisitionLockTimerTask(j3, str, j, j2);
        return timeoutTask;
    }

    /**
     * Releases the acquisition lock of the records in {@code [j, j2]} after their lock timed out.
     *
     * <p>Every cached batch overlapping the range is released either as a whole or per offset.
     * When at least one record changed state, the new state is written through
     * {@code writeShareGroupState} and any delayed share fetch for this partition is completed.
     * The write lock is released exactly once, by the {@code finally} block, on every exit path.
     *
     * @param str id of the member that held the lock
     * @param j first offset whose lock timed out
     * @param j2 last offset whose lock timed out
     */
    private void releaseAcquisitionLockOnTimeout(String str, long j, long j2) {
        this.lock.writeLock().lock();
        try {
            Map.Entry<Long, InFlightBatch> floorEntry = this.cachedState.floorEntry(Long.valueOf(j));
            if (floorEntry == null) {
                log.error("Base offset {} not found for share partition: {}-{}", Long.valueOf(j), this.groupId, this.topicIdPartition);
                return;
            }

            List<PersisterStateBatch> stateBatches = new ArrayList<>();
            for (InFlightBatch batch : this.cachedState.subMap(floorEntry.getKey(), true, Long.valueOf(j2), true).values()) {
                // An acquired batch that straddles the start offset has to be handled per offset.
                if (batch.offsetState() == null && batch.batchState() == RecordState.ACQUIRED
                        && checkForStartOffsetWithinBatch(batch.firstOffset(), batch.lastOffset())) {
                    batch.maybeInitializeOffsetStateUpdate();
                }
                if (batch.offsetState() == null) {
                    releaseAcquisitionLockOnTimeoutForCompleteBatch(batch, stateBatches, str);
                } else {
                    releaseAcquisitionLockOnTimeoutForPerOffsetBatch(batch, stateBatches, str, j, j2);
                }
            }

            if (stateBatches.isEmpty()) {
                return;
            }
            writeShareGroupState(stateBatches).whenComplete((result, writeError) -> {
                if (writeError != null) {
                    log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}",
                        this.groupId, this.topicIdPartition, str, writeError);
                }
                // The cached state is refreshed whether or not the write succeeded.
                maybeUpdateCachedStateAndOffsets();
            });
            this.replicaManager.completeDelayedShareFetchRequest(
                new DelayedShareFetchGroupKey(this.groupId, this.topicIdPartition.topicId(), this.topicIdPartition.partition()));
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Releases a timed-out acquisition lock for a batch that is tracked as a whole.
     *
     * <p>Batches that are not in the acquired state are skipped. Otherwise the batch becomes
     * archived when it ends before the start offset, and available in every other case.
     *
     * @param inFlightBatch the cached batch to release
     * @param list receives the persister batch describing the new state
     * @param str id of the member that held the lock (used for logging only)
     */
    private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFlightBatch, List<PersisterStateBatch> list, String str) {
        if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
            log.debug("The batch is not in acquired state while release of acquisition lock on timeout, skipping, batch: {} for the share partition: {}-{} memberId: {}",
                inFlightBatch, this.groupId, this.topicIdPartition, str);
            return;
        }

        RecordState targetState;
        if (inFlightBatch.lastOffset() < this.startOffset) {
            targetState = RecordState.ARCHIVED;
        } else {
            targetState = RecordState.AVAILABLE;
        }

        InFlightState released = inFlightBatch.tryUpdateBatchState(targetState, false, this.maxDeliveryCount, EMPTY_MEMBER_ID);
        if (released == null) {
            log.error("Unable to release acquisition lock on timeout for the batch: {} for the share partition: {}-{} memberId: {}",
                inFlightBatch, this.groupId, this.topicIdPartition, str);
            return;
        }

        list.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), released.state.id, (short) released.deliveryCount));
        released.cancelAndClearAcquisitionLockTimeoutTask();
        // Anything that did not end up archived can be fetched again.
        if (released.state != RecordState.ARCHIVED) {
            this.findNextFetchOffset.set(true);
        }
    }

    /**
     * Releases a timed-out acquisition lock for the offsets of a per-offset tracked batch that
     * fall inside {@code [j, j2]}.
     *
     * <p>Offsets that are not acquired, or whose state cannot be updated, are logged and skipped.
     * A released offset becomes archived when it is before the start offset, available otherwise.
     *
     * @param inFlightBatch the cached batch whose offsets are released
     * @param list receives one single-offset persister batch per released offset
     * @param str id of the member that held the lock (used for logging only)
     * @param j first offset whose lock timed out
     * @param j2 last offset whose lock timed out
     */
    private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFlightBatch, List<PersisterStateBatch> list, String str, long j, long j2) {
        for (Map.Entry<Long, InFlightState> entry : inFlightBatch.offsetState().entrySet()) {
            long offset = entry.getKey().longValue();
            if (offset < j) {
                continue;
            }
            if (offset > j2) {
                // Past the timed-out range: stop scanning.
                return;
            }

            if (entry.getValue().state != RecordState.ACQUIRED) {
                log.debug("The offset is not in acquired state while release of acquisition lock on timeout, skipping, offset: {} batch: {} for the share partition: {}-{} memberId: {}",
                    entry.getKey(), inFlightBatch, this.groupId, this.topicIdPartition, str);
                continue;
            }

            RecordState targetState = offset < this.startOffset ? RecordState.ARCHIVED : RecordState.AVAILABLE;
            InFlightState released = entry.getValue().tryUpdateState(targetState, false, this.maxDeliveryCount, EMPTY_MEMBER_ID);
            if (released == null) {
                log.error("Unable to release acquisition lock on timeout for the offset: {} in batch: {} for the share partition: {}-{} memberId: {}",
                    entry.getKey(), inFlightBatch, this.groupId, this.topicIdPartition, str);
                continue;
            }

            list.add(new PersisterStateBatch(offset, offset, released.state.id, (short) released.deliveryCount));
            released.cancelAndClearAcquisitionLockTimeoutTask();
            if (released.state != RecordState.ARCHIVED) {
                this.findNextFetchOffset.set(true);
            }
        }
    }

    /**
     * Resolves the start offset to use while initializing the share partition.
     *
     * <p>A value other than -1 is returned unchanged. For -1 the offset is derived from the
     * group's auto offset reset strategy (or the default strategy when the group has no config).
     * The group config is looked up once, so the presence check and the read see the same value.
     *
     * @param j the start offset obtained during initialization, or -1 when none is known
     * @return {@code j} when it is not -1; otherwise the latest, earliest or timestamp-based
     *         offset selected by the reset strategy
     * @throws Exception propagated from the offset lookup helpers
     */
    private long startOffsetDuringInitialization(long j) throws Exception {
        if (j != -1) {
            return j;
        }
        Optional<GroupConfig> groupConfig = this.groupConfigManager.groupConfig(this.groupId);
        ShareGroupAutoOffsetResetStrategy resetStrategy = groupConfig.isPresent()
            ? groupConfig.get().shareAutoOffsetReset()
            : GroupConfig.defaultShareAutoOffsetReset();

        if (resetStrategy.type() == ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
            return ShareFetchUtils.offsetForLatestTimestamp(this.topicIdPartition, this.replicaManager, this.leaderEpoch);
        }
        if (resetStrategy.type() == ShareGroupAutoOffsetResetStrategy.StrategyType.EARLIEST) {
            return ShareFetchUtils.offsetForEarliestTimestamp(this.topicIdPartition, this.replicaManager, this.leaderEpoch);
        }
        // Any other strategy type is resolved through the strategy's timestamp.
        return ShareFetchUtils.offsetForTimestamp(this.topicIdPartition, this.replicaManager, resetStrategy.timestamp().longValue(), this.leaderEpoch);
    }

    /**
     * Returns a copy of the cached in-flight state.
     *
     * <p>The copy is a new {@link ConcurrentSkipListMap} built from the current contents, so
     * adding or removing entries on it does not affect the partition's own cache. The
     * {@code InFlightBatch} values themselves are shared, not copied.
     *
     * @return a new sorted map holding the current cache entries
     */
    NavigableMap<Long, InFlightBatch> cachedState() {
        return new ConcurrentSkipListMap<>((SortedMap<Long, InFlightBatch>) this.cachedState);
    }

    /**
     * @return the current value of the {@code findNextFetchOffset} flag
     */
    boolean findNextFetchOffset() {
        final boolean flag = this.findNextFetchOffset.get();
        return flag;
    }

    /**
     * Sets the {@code findNextFetchOffset} flag.
     *
     * @param z the new value of the flag
     */
    void findNextFetchOffset(boolean z) {
        // The previous value is not needed, so a plain set is used instead of getAndSet.
        this.findNextFetchOffset.set(z);
    }

    /**
     * Reads the start offset under the read lock.
     *
     * @return the value of the {@code startOffset} field at the time of the call
     */
    long startOffset() {
        final long current;
        this.lock.readLock().lock();
        try {
            current = this.startOffset;
        } finally {
            this.lock.readLock().unlock();
        }
        return current;
    }

    /**
     * Reads the end offset under the read lock.
     *
     * @return the value of the {@code endOffset} field at the time of the call
     */
    long endOffset() {
        final long current;
        this.lock.readLock().lock();
        try {
            current = this.endOffset;
        } finally {
            this.lock.readLock().unlock();
        }
        return current;
    }

    /**
     * @return the value of the {@code stateEpoch} field, read without taking the lock
     */
    int stateEpoch() {
        final int epoch = this.stateEpoch;
        return epoch;
    }

    /**
     * @return the timer instance this share partition adds its acquisition lock tasks to
     */
    Timer timer() {
        final Timer partitionTimer = this.timer;
        return partitionTimer;
    }
}
