/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.fetcher;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import kafka.server.DelayedOperationKey;
import kafka.server.TierFetchOperationKey;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.fetcher.MemoryTracker;
import kafka.tier.fetcher.OffsetIndexFetchRequest;
import kafka.tier.fetcher.ReclaimableMemoryRecords;
import kafka.tier.fetcher.TierAbortedTxnReader;
import kafka.tier.fetcher.TierFetchResult;
import kafka.tier.fetcher.TierFetcherMetrics;
import kafka.tier.fetcher.TierSegmentReader;
import kafka.tier.fetcher.objectcache.PrefetchCache;
import kafka.tier.fetcher.offsetcache.FetchOffsetCache;
import kafka.tier.fetcher.offsetcache.FetchOffsetMetadata;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.metadata.ObjectMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.OffsetPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PendingFetch
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(PendingFetch.class);
    private final CancellationContext cancellationContext;
    private final TierObjectStore tierObjectStore;
    private final Optional<PrefetchCache> prefetchCacheOpt;
    private final Optional<TierFetcherMetrics> tierFetcherMetrics;
    private final ObjectMetadata objectMetadata;
    private final Consumer<DelayedOperationKey> fetchCompletionCallback;
    private final Consumer<Runnable> fetchCancellationCallback;
    private final long targetOffset;
    private final int maxBytes;
    private final int segmentSize;
    private final List<TopicPartition> ignoredTopicPartitions;
    private final Uuid requestId = Uuid.randomUuid();
    private final CompletableFuture<TierFetchResult> transferPromise;
    private final IsolationLevel isolationLevel;
    private final FetchOffsetCache offsetCache;
    private final FetchOffsetMetadata fetchOffsetMetadata;
    private final TierSegmentReader reader;
    private final String logPrefix;
    private final MemoryTracker memoryTracker;
    private final Time time;
    private final long creationTimeNanos;
    private final AtomicBoolean isComplete;

    public PendingFetch(CancellationContext cancellationContext, TierObjectStore tierObjectStore, FetchOffsetCache offsetCache, Optional<TierFetcherMetrics> tierFetcherMetrics, ObjectMetadata objectMetadata, Consumer<DelayedOperationKey> fetchCompletionCallback, long targetOffset, int maxBytes, int segmentSize, IsolationLevel isolationLevel, MemoryTracker memoryTracker, List<TopicPartition> ignoredTopicPartitions, Time time) {
        this(cancellationContext, tierObjectStore, Optional.empty(), offsetCache, tierFetcherMetrics, objectMetadata, fetchCompletionCallback, targetOffset, maxBytes, segmentSize, isolationLevel, memoryTracker, ignoredTopicPartitions, time, null);
    }

    public PendingFetch(CancellationContext cancellationContext, TierObjectStore tierObjectStore, Optional<PrefetchCache> prefetchCacheOpt, FetchOffsetCache offsetCache, Optional<TierFetcherMetrics> tierFetcherMetrics, ObjectMetadata objectMetadata, Consumer<DelayedOperationKey> fetchCompletionCallback, long targetOffset, int maxBytes, int segmentSize, IsolationLevel isolationLevel, MemoryTracker memoryTracker, List<TopicPartition> ignoredTopicPartitions, Time time, Consumer<Runnable> fetchCancellationCallback) {
        this.cancellationContext = cancellationContext;
        this.tierObjectStore = tierObjectStore;
        this.prefetchCacheOpt = prefetchCacheOpt;
        this.tierFetcherMetrics = tierFetcherMetrics;
        this.objectMetadata = objectMetadata;
        this.fetchCompletionCallback = fetchCompletionCallback;
        this.targetOffset = targetOffset;
        this.maxBytes = maxBytes;
        this.segmentSize = segmentSize;
        this.offsetCache = offsetCache;
        this.ignoredTopicPartitions = ignoredTopicPartitions;
        this.transferPromise = new CompletableFuture();
        this.isolationLevel = isolationLevel;
        this.logPrefix = "PendingFetch(requestId=" + this.requestId + ")";
        this.reader = new TierSegmentReader(this.logPrefix);
        this.time = time;
        this.creationTimeNanos = time.nanoseconds();
        this.fetchOffsetMetadata = targetOffset == objectMetadata.baseOffset() ? new FetchOffsetMetadata(0, OptionalInt.empty()) : offsetCache.get(objectMetadata.objectId(), targetOffset);
        this.memoryTracker = memoryTracker;
        this.fetchCancellationCallback = fetchCancellationCallback;
        this.isComplete = new AtomicBoolean(false);
    }

    public List<DelayedOperationKey> delayedOperationKeys() {
        return Collections.singletonList(new TierFetchOperationKey(this.objectMetadata.topicIdPartition().topicPartition(), this.requestId));
    }

    public boolean isComplete() {
        return this.transferPromise.isDone();
    }

    private OffsetPosition fetchOffsetPosition() throws Exception {
        if (this.fetchOffsetMetadata != null) {
            log.debug("{} using fetch position {}", (Object)this.logPrefix, (Object)this.fetchOffsetMetadata);
            return new OffsetPosition(this.targetOffset, this.fetchOffsetMetadata.bytePosition);
        }
        log.debug("{} fetching offset index", (Object)this.logPrefix);
        return OffsetIndexFetchRequest.fetchOffsetPositionForStartingOffset((CancellationContext)this.cancellationContext, (TierObjectStore)this.tierObjectStore, (ObjectMetadata)this.objectMetadata, (long)this.targetOffset).startOffsetPosition;
    }

    private OffsetIndexFetchRequest.TierFetchOffsetIndexResult fetchOffsetPositionForCache() throws Exception {
        if (this.fetchOffsetMetadata != null && this.fetchOffsetMetadata.recordBatchSize.isPresent()) {
            return new OffsetIndexFetchRequest.TierFetchOffsetIndexResult(new OffsetPosition(this.targetOffset, this.fetchOffsetMetadata.bytePosition), new OffsetPosition(this.targetOffset, this.fetchOffsetMetadata.bytePosition + this.fetchOffsetMetadata.recordBatchSize.getAsInt() + 17));
        }
        OffsetIndexFetchRequest.TierFetchOffsetIndexResult result = OffsetIndexFetchRequest.fetchOffsetPositionForStartingOffset(this.cancellationContext, this.tierObjectStore, this.objectMetadata, this.targetOffset);
        int startPosition = this.fetchOffsetMetadata != null && this.fetchOffsetMetadata.bytePosition > result.startOffsetPosition.position ? this.fetchOffsetMetadata.bytePosition : result.startOffsetPosition.position;
        int endPosition = result.endOffsetPosition != null ? result.endOffsetPosition.position : this.segmentSize;
        return new OffsetIndexFetchRequest.TierFetchOffsetIndexResult(new OffsetPosition(this.targetOffset, startPosition), new OffsetPosition(this.targetOffset, endPosition));
    }

    private OffsetIndexFetchRequest.TierFetchOffsetIndexResult fetchOffsetPositionRange() throws Exception {
        if (this.prefetchCacheOpt.isPresent()) {
            return this.fetchOffsetPositionForCache();
        }
        return new OffsetIndexFetchRequest.TierFetchOffsetIndexResult(this.fetchOffsetPosition(), null);
    }

    private Integer getEndRange(OffsetPosition offsetPosition) {
        if (this.fetchOffsetMetadata != null && this.fetchOffsetMetadata.recordBatchSize.isPresent()) {
            int length = Math.max(this.fetchOffsetMetadata.recordBatchSize.getAsInt() + 17, this.maxBytes);
            return offsetPosition.position + length;
        }
        return null;
    }

    private TierObjectStoreResponse fetchSegment(OffsetPosition offsetPosition) throws IOException {
        Integer endRange = this.getEndRange(offsetPosition);
        if (endRange != null) {
            log.debug("{} fetching segment startPosition: {}, endPosition: {}", new Object[]{this.logPrefix, offsetPosition, endRange});
            return this.tierObjectStore.getObjectStoreFragment(this.objectMetadata, FragmentType.SEGMENT, Long.valueOf(offsetPosition.position), (long)endRange);
        }
        log.debug("{} fetching segment startPosition: {}", (Object)this.logPrefix, (Object)offsetPosition);
        return this.tierObjectStore.getObjectStoreFragment(this.objectMetadata, FragmentType.SEGMENT, Long.valueOf(offsetPosition.position));
    }

    private InputStream fetchSegmentStreamWithCache(OffsetIndexFetchRequest.TierFetchOffsetIndexResult offsetPositionRange) throws Throwable {
        int length = Math.max(offsetPositionRange.endOffsetPosition.position - offsetPositionRange.startOffsetPosition.position, this.maxBytes);
        int endRange = offsetPositionRange.startOffsetPosition.position + length;
        try {
            log.debug("{} fetching segment from cache, startPosition: {}, endPosition: {}", new Object[]{this.logPrefix, offsetPositionRange.startOffsetPosition, endRange});
            return (InputStream)((Object)this.prefetchCacheOpt.get().get(this.objectMetadata, offsetPositionRange.startOffsetPosition.position, endRange, this.segmentSize).get());
        }
        catch (Exception e) {
            log.error("{} failed to fetch segment from cache startPosition: {}, endPosition: {}", new Object[]{this.logPrefix, offsetPositionRange.startOffsetPosition, endRange, e});
            throw e.getCause() != null ? e.getCause() : e;
        }
    }

    private InputStream fetchSegmentStream(OffsetIndexFetchRequest.TierFetchOffsetIndexResult offsetPositionRange) throws Throwable {
        if (this.prefetchCacheOpt.isPresent()) {
            return this.fetchSegmentStreamWithCache(offsetPositionRange);
        }
        return this.fetchSegment(offsetPositionRange.startOffsetPosition).getInputStream();
    }

    private List<AbortedTxn> fetchAbortedTxns(ReclaimableMemoryRecords records) throws Exception {
        Long startOffset = null;
        Long lastOffset = null;
        for (RecordBatch recordBatch : records.batches()) {
            if (startOffset == null) {
                startOffset = recordBatch.baseOffset();
            }
            lastOffset = recordBatch.lastOffset();
        }
        if (startOffset == null) {
            return Collections.emptyList();
        }
        Throwable throwable = null;
        try (TierObjectStoreResponse abortedTransactionsResponse = this.tierObjectStore.getObjectStoreFragment(this.objectMetadata, FragmentType.TRANSACTION_INDEX);){
            List<AbortedTxn> list = TierAbortedTxnReader.readInto(this.cancellationContext, abortedTransactionsResponse.getInputStream(), startOffset, lastOffset);
            return list;
        }
        catch (Throwable throwable2) {
            Throwable throwable3 = throwable2;
            throw throwable2;
        }
    }

    private Optional<MemoryTracker.MemoryLease> waitOnMemoryLease() {
        if (this.memoryTracker.isDisabled()) {
            return Optional.empty();
        }
        log.debug("{} acquiring memory lease", (Object)this.logPrefix);
        if (this.fetchOffsetMetadata != null && this.fetchOffsetMetadata.recordBatchSize.isPresent()) {
            int amount = this.fetchOffsetMetadata.recordBatchSize.getAsInt();
            return Optional.of(this.memoryTracker.newLease(this.cancellationContext, amount));
        }
        return Optional.of(this.memoryTracker.newLease(this.cancellationContext, 17L));
    }

    @Override
    public void run() {
        block21: {
            long leaseTimeTakenMs = -1L;
            long fetchTimeTakenMs = -1L;
            log.debug("Starting tiered fetch. requestId={}, objectMetadata={}, targetOffset={}, maxBytes={}, isolationLevel={}.", new Object[]{this.requestId, this.objectMetadata, this.targetOffset, this.maxBytes, this.isolationLevel});
            long queuedTimeMs = this.time.hiResClockMs() - TimeUnit.NANOSECONDS.toMillis(this.creationTimeNanos);
            this.tierFetcherMetrics.ifPresent(metrics -> metrics.queuedTimeMs().record((double)queuedTimeMs));
            try {
                if (!this.cancellationContext.isCancelled()) {
                    OffsetIndexFetchRequest.TierFetchOffsetIndexResult offsetPositionRange = this.fetchOffsetPositionRange();
                    long leaseStartTimeMs = this.time.hiResClockMs();
                    Optional<MemoryTracker.MemoryLease> lease = this.waitOnMemoryLease();
                    long currentTimeMs = this.time.hiResClockMs();
                    leaseTimeTakenMs = currentTimeMs - leaseStartTimeMs;
                    long fetchStartTimeMs = currentTimeMs;
                    try (InputStream inputStream = this.fetchSegmentStream(offsetPositionRange);){
                        TierSegmentReader.RecordsAndNextBatchMetadata recordsAndNextBatchMetadata = this.reader.readRecords(this.cancellationContext, lease, inputStream, this.maxBytes, this.targetOffset, offsetPositionRange.startOffsetPosition.position, this.segmentSize);
                        ReclaimableMemoryRecords records = recordsAndNextBatchMetadata.records;
                        this.updateOffsetCache(recordsAndNextBatchMetadata.nextOffsetAndBatchMetadata);
                        if (this.objectMetadata.hasAbortedTxns() && this.isolationLevel == IsolationLevel.READ_COMMITTED) {
                            List<AbortedTxn> abortedTxns = this.fetchAbortedTxns(records);
                            fetchTimeTakenMs = this.time.hiResClockMs() - fetchStartTimeMs;
                            this.completeFetch(records, abortedTxns, null, false, leaseTimeTakenMs, fetchTimeTakenMs, queuedTimeMs);
                        } else {
                            fetchTimeTakenMs = this.time.hiResClockMs() - fetchStartTimeMs;
                            this.completeFetch(records, Collections.emptyList(), null, false, leaseTimeTakenMs, fetchTimeTakenMs, queuedTimeMs);
                        }
                        break block21;
                    }
                    catch (Throwable t) {
                        lease.ifPresent(MemoryTracker.MemoryLease::release);
                        throw t;
                    }
                }
                this.completeFetch(ReclaimableMemoryRecords.EMPTY, Collections.emptyList(), null, false, -1L, -1L, queuedTimeMs);
            }
            catch (EOFException e) {
                this.completeFetch(ReclaimableMemoryRecords.EMPTY, Collections.emptyList(), e, false, leaseTimeTakenMs, fetchTimeTakenMs, queuedTimeMs);
            }
            catch (IOException | CancellationException | TierObjectStoreRetriableException e) {
                this.completeFetch(ReclaimableMemoryRecords.EMPTY, Collections.emptyList(), (Throwable)e, true, leaseTimeTakenMs, fetchTimeTakenMs, queuedTimeMs);
            }
            catch (Throwable t) {
                this.completeFetch(ReclaimableMemoryRecords.EMPTY, Collections.emptyList(), t, false, leaseTimeTakenMs, fetchTimeTakenMs, queuedTimeMs);
            }
        }
    }

    private void updateOffsetCache(TierSegmentReader.NextOffsetAndBatchMetadata nextOffsetAndBatchMetadata) {
        if (nextOffsetAndBatchMetadata != null) {
            long nextOffset = nextOffsetAndBatchMetadata.nextOffset;
            FetchOffsetMetadata nextBatchMetadata = nextOffsetAndBatchMetadata.nextBatchMetadata;
            if (nextBatchMetadata != null) {
                log.debug("{} updating offset cache. metadata: {}", (Object)this.logPrefix, (Object)nextOffsetAndBatchMetadata);
                this.offsetCache.put(this.objectMetadata.objectId(), nextOffset, nextBatchMetadata);
            }
        }
    }

    public Map<TopicPartition, TierFetchResult> finish() {
        this.cancel();
        HashMap<TopicPartition, TierFetchResult> resultMap = new HashMap<TopicPartition, TierFetchResult>();
        try {
            TierFetchResult tierFetchResult = this.transferPromise.get();
            this.tierFetcherMetrics.ifPresent(metrics -> metrics.bytesFetched().record((double)tierFetchResult.records.sizeInBytes()));
            resultMap.put(this.objectMetadata.topicIdPartition().topicPartition(), tierFetchResult);
        }
        catch (InterruptedException e) {
            resultMap.put(this.objectMetadata.topicIdPartition().topicPartition(), TierFetchResult.emptyFetchResult());
        }
        catch (ExecutionException e) {
            log.warn("Failed exceptionally while finishing pending fetch request for partition {} from tiered storage. This exception is unexpected as the promise in not completed exceptionally ", (Object)this.objectMetadata.topicIdPartition().topicPartition(), (Object)e);
            this.tierFetcherMetrics.ifPresent(metrics -> metrics.fetchException().record());
            resultMap.put(this.objectMetadata.topicIdPartition().topicPartition(), new TierFetchResult(ReclaimableMemoryRecords.EMPTY, Collections.emptyList(), e.getCause(), 0L));
        }
        for (TopicPartition ignoredTopicPartition : this.ignoredTopicPartitions) {
            resultMap.put(ignoredTopicPartition, TierFetchResult.emptyFetchResult());
        }
        return resultMap;
    }

    public void markFetchExpired() {
        this.tierFetcherMetrics.ifPresent(metrics -> metrics.fetchCancelled().record());
    }

    public void cancel() {
        this.cancellationContext.cancel();
        if (this.complete(new TierFetchResult(ReclaimableMemoryRecords.EMPTY, Collections.emptyList(), null, 0L)) && this.fetchCancellationCallback != null) {
            this.fetchCancellationCallback.accept(this);
        }
        this.memoryTracker.wakeup();
    }

    private void completeFetch(ReclaimableMemoryRecords records, List<AbortedTxn> abortedTxns, Throwable throwable, boolean retriable, long leaseTimeTakenMs, long fetchTimeTakenMs, long queuedTimeMs) {
        this.tierFetcherMetrics.ifPresent(metrics -> metrics.fetchTotalTimeMs().record((double)fetchTimeTakenMs));
        if (throwable != null) {
            log.error("{} tier fetch objectMetadata={}, targetOffset={}, maxBytes={}, isolationLevel={} completed with exception (leaseTimeTaken:{}ms) (fetchTimeTaken:{}ms) (queuedTimeTaken:{}ms)", new Object[]{this.logPrefix, this.objectMetadata, this.targetOffset, this.maxBytes, this.isolationLevel, leaseTimeTakenMs, fetchTimeTakenMs, queuedTimeMs, throwable});
            this.tierFetcherMetrics.ifPresent(metrics -> {
                metrics.fetchException().record();
                if (!retriable) {
                    metrics.fetchNonRetriableException().record();
                }
            });
        } else {
            log.debug("{} tier fetch objectMetadata={}, targetOffset={}, maxBytes={}, isolationLevel={} completed (leaseTimeTaken:{}ms) (fetchTimeTaken:{}ms) (queuedTimeTaken:{}ms)", new Object[]{this.logPrefix, this.objectMetadata, this.targetOffset, this.maxBytes, this.isolationLevel, leaseTimeTakenMs, fetchTimeTakenMs, queuedTimeMs});
        }
        TierFetchResult tierFetchResult = new TierFetchResult(records, abortedTxns, retriable ? null : throwable, this.time.nanoseconds() - this.creationTimeNanos);
        this.complete(tierFetchResult);
    }

    private boolean complete(TierFetchResult tierFetchResult) {
        if (this.isComplete.compareAndSet(false, true)) {
            this.transferPromise.complete(tierFetchResult);
            if (this.fetchCompletionCallback != null) {
                for (DelayedOperationKey key : this.delayedOperationKeys()) {
                    this.fetchCompletionCallback.accept(key);
                }
            }
            return true;
        }
        return false;
    }
}

