/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.fetcher;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import kafka.log.AbortedTxn;
import kafka.log.OffsetPosition;
import kafka.server.DelayedOperationKey;
import kafka.server.TierFetchOperationKey;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.fetcher.MemoryTracker;
import kafka.tier.fetcher.OffsetIndexFetchRequest;
import kafka.tier.fetcher.ReclaimableMemoryRecords;
import kafka.tier.fetcher.TierAbortedTxnReader;
import kafka.tier.fetcher.TierFetchResult;
import kafka.tier.fetcher.TierFetcherMetrics;
import kafka.tier.fetcher.TierSegmentReader;
import kafka.tier.fetcher.offsetcache.FetchOffsetCache;
import kafka.tier.fetcher.offsetcache.FetchOffsetMetadata;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A single in-flight fetch of records from one tiered log segment.
 *
 * <p>{@link #run()} performs the fetch: it resolves the byte position for the target offset,
 * obtains a memory lease, reads records from the segment object, optionally reads the aborted
 * transaction index, and completes an internal promise with a {@link TierFetchResult}. It never
 * completes the promise exceptionally; failures are carried inside the result.
 *
 * <p>{@link #finish()} cancels outstanding work, waits for the promise and returns the result
 * keyed by topic partition.
 */
public class PendingFetch implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(PendingFetch.class);

    /**
     * Extra bytes added on top of a known record batch size when sizing a ranged read, and the
     * minimum lease requested when the batch size is unknown.
     * NOTE(review): presumably mirrors Kafka's Records.HEADER_SIZE_UP_TO_MAGIC - confirm.
     */
    private static final int HEADER_SIZE_UP_TO_MAGIC = 17;

    private final CancellationContext cancellationContext;
    private final TierObjectStore tierObjectStore;
    private final Optional<TierFetcherMetrics> tierFetcherMetrics;
    private final TierObjectStore.ObjectMetadata objectMetadata;
    private final Consumer<DelayedOperationKey> fetchCompletionCallback;
    private final long targetOffset;
    private final int maxBytes;
    private final int segmentSize;
    private final List<TopicPartition> ignoredTopicPartitions;
    private final Uuid requestId = Uuid.randomUuid();
    private final CompletableFuture<TierFetchResult> transferPromise;
    private final IsolationLevel isolationLevel;
    private final FetchOffsetCache cache;
    /** Known byte position (and possibly batch size) for {@code targetOffset}; null when unknown. */
    private final FetchOffsetMetadata fetchOffsetMetadata;
    private final TierSegmentReader reader;
    private final String logPrefix;
    private final MemoryTracker memoryTracker;
    private final Time time;

    /**
     * Creates a pending fetch for {@code targetOffset} within the segment described by
     * {@code objectMetadata}.
     *
     * <p>When {@code targetOffset} equals the segment's base offset the read starts at byte
     * position 0 with an unknown batch size; otherwise the position is looked up in
     * {@code cache} (the lookup result may be null, in which case the offset index is fetched
     * later by {@link #run()}).
     *
     * @param fetchCompletionCallback invoked with each delayed operation key once the fetch
     *                                completes; may be null
     * @param ignoredTopicPartitions  partitions that receive an empty result from {@link #finish()}
     */
    PendingFetch(CancellationContext cancellationContext, TierObjectStore tierObjectStore, FetchOffsetCache cache, Optional<TierFetcherMetrics> tierFetcherMetrics, TierObjectStore.ObjectMetadata objectMetadata, Consumer<DelayedOperationKey> fetchCompletionCallback, long targetOffset, int maxBytes, int segmentSize, IsolationLevel isolationLevel, MemoryTracker memoryTracker, List<TopicPartition> ignoredTopicPartitions, Time time) {
        this.cancellationContext = cancellationContext;
        this.tierObjectStore = tierObjectStore;
        this.tierFetcherMetrics = tierFetcherMetrics;
        this.objectMetadata = objectMetadata;
        this.fetchCompletionCallback = fetchCompletionCallback;
        this.targetOffset = targetOffset;
        this.maxBytes = maxBytes;
        this.segmentSize = segmentSize;
        this.cache = cache;
        this.ignoredTopicPartitions = ignoredTopicPartitions;
        this.transferPromise = new CompletableFuture<>();
        this.isolationLevel = isolationLevel;
        this.logPrefix = "PendingFetch(requestId=" + this.requestId + ")";
        this.reader = new TierSegmentReader(this.logPrefix);
        this.time = time;
        if (targetOffset == objectMetadata.baseOffset()) {
            this.fetchOffsetMetadata = new FetchOffsetMetadata(0, OptionalInt.empty());
        } else {
            this.fetchOffsetMetadata = cache.get(objectMetadata.objectId(), targetOffset);
        }
        this.memoryTracker = memoryTracker;
    }

    /**
     * Returns the single delayed-operation key for this fetch, built from the segment's topic
     * partition and this fetch's request id.
     */
    public List<DelayedOperationKey> delayedOperationKeys() {
        return Collections.singletonList(new TierFetchOperationKey(objectMetadata.topicIdPartition().topicPartition(), requestId));
    }

    /** Returns true once the fetch result has been produced. */
    public boolean isComplete() {
        return transferPromise.isDone();
    }

    /**
     * Resolves the byte position at which to start reading: the known position when
     * {@code fetchOffsetMetadata} is available, otherwise the result of an offset index lookup
     * against the object store.
     */
    private OffsetPosition fetchOffsetPosition() throws Exception {
        if (fetchOffsetMetadata == null) {
            log.debug("{} fetching offset index", logPrefix);
            return OffsetIndexFetchRequest.fetchOffsetPositionForStartingOffset(cancellationContext, tierObjectStore, objectMetadata, targetOffset);
        }
        log.debug("{} using fetch position {}", logPrefix, fetchOffsetMetadata);
        return new OffsetPosition(targetOffset, fetchOffsetMetadata.bytePosition);
    }

    /**
     * Computes the end position for a ranged segment read.
     *
     * @return the start position plus the larger of {@code maxBytes} and the known batch size
     *         (plus header bytes); or null when the batch size is unknown or the end position
     *         does not fit in an int, in which case the caller performs an open-ended read
     */
    private Integer getEndRange(OffsetPosition offsetPosition) {
        if (fetchOffsetMetadata == null || !fetchOffsetMetadata.recordBatchSize.isPresent()) {
            return null;
        }
        // Computed in long: maxBytes may be close to Integer.MAX_VALUE, and int arithmetic would
        // silently wrap to a negative end position.
        long batchLength = (long) fetchOffsetMetadata.recordBatchSize.getAsInt() + HEADER_SIZE_UP_TO_MAGIC;
        long length = Math.max(batchLength, (long) maxBytes);
        long endPosition = (long) offsetPosition.position() + length;
        if (endPosition > Integer.MAX_VALUE) {
            // Byte positions here are ints, so such an end cannot be addressed; fall back to
            // reading without an upper bound.
            return null;
        }
        return (int) endPosition;
    }

    /**
     * Opens the segment object starting at {@code offsetPosition}, bounded by
     * {@link #getEndRange(OffsetPosition)} when an end position is available.
     */
    private TierObjectStoreResponse fetchSegment(OffsetPosition offsetPosition) throws IOException {
        Integer endRange = getEndRange(offsetPosition);
        if (endRange == null) {
            log.debug("{} fetching segment startPosition: {}", logPrefix, offsetPosition);
            return tierObjectStore.getObject(objectMetadata, TierObjectStore.FileType.SEGMENT, offsetPosition.position());
        }
        log.debug("{} fetching segment startPosition: {}, endPosition: {}", logPrefix, offsetPosition, endRange);
        return tierObjectStore.getObject(objectMetadata, TierObjectStore.FileType.SEGMENT, offsetPosition.position(), endRange);
    }

    /**
     * Reads the aborted transactions for the offset range spanned by {@code records}: from the
     * base offset of the first batch to the last offset of the last batch.
     *
     * @return the aborted transactions read from the segment's transaction index, or an empty
     *         list when {@code records} contains no batches
     */
    private List<AbortedTxn> fetchAbortedTxns(ReclaimableMemoryRecords records) throws Exception {
        Long startOffset = null;
        long lastOffset = 0L;
        for (RecordBatch batch : records.batches()) {
            if (startOffset == null) {
                startOffset = batch.baseOffset();
            }
            lastOffset = batch.lastOffset();
        }
        // Only the absence of batches means "nothing to look up". A last offset of 0 is a valid
        // offset (first record of a partition) and must not skip the transaction index.
        if (startOffset == null) {
            return Collections.emptyList();
        }
        try (TierObjectStoreResponse abortedTransactionsResponse = tierObjectStore.getObject(objectMetadata, TierObjectStore.FileType.TRANSACTION_INDEX)) {
            return TierAbortedTxnReader.readInto(cancellationContext, abortedTransactionsResponse.getInputStream(), startOffset, lastOffset);
        }
    }

    /**
     * Requests a memory lease before reading: sized to the known record batch size when
     * available, otherwise to {@link #HEADER_SIZE_UP_TO_MAGIC} bytes.
     *
     * @return the lease, or empty when the memory tracker is disabled
     */
    private Optional<MemoryTracker.MemoryLease> waitOnMemoryLease() {
        if (memoryTracker.isDisabled()) {
            return Optional.empty();
        }
        log.debug("{} acquiring memory lease", logPrefix);
        if (fetchOffsetMetadata != null && fetchOffsetMetadata.recordBatchSize.isPresent()) {
            int amount = fetchOffsetMetadata.recordBatchSize.getAsInt();
            return Optional.of(memoryTracker.newLease(cancellationContext, amount));
        }
        return Optional.of(memoryTracker.newLease(cancellationContext, (long) HEADER_SIZE_UP_TO_MAGIC));
    }

    /**
     * Executes the fetch and always completes it via {@code completeFetch}: with the records
     * read on success, with an empty result when already cancelled or when a
     * {@link CancellationException} is thrown, and with an empty result carrying the throwable
     * on any other failure.
     */
    @Override
    public void run() {
        long leaseTimeTakenMs = -1L;
        long fetchTimeTakenMs = -1L;
        log.debug("Starting tiered fetch. requestId={}, objectMetadata={}, targetOffset={}, maxBytes={}, isolationLevel={}.", requestId, objectMetadata, targetOffset, maxBytes, isolationLevel);
        try {
            if (cancellationContext.isCancelled()) {
                completeFetch(ReclaimableMemoryRecords.EMPTY, Collections.emptyList(), null, -1L, -1L);
                return;
            }
            OffsetPosition offsetPosition = fetchOffsetPosition();
            long leaseStartTimeMs = time.hiResClockMs();
            Optional<MemoryTracker.MemoryLease> lease = waitOnMemoryLease();
            // The instant the lease wait ends is also the start of the fetch timing.
            long fetchStartTimeMs = time.hiResClockMs();
            leaseTimeTakenMs = fetchStartTimeMs - leaseStartTimeMs;
            try (TierObjectStoreResponse response = fetchSegment(offsetPosition)) {
                TierSegmentReader.RecordsAndNextBatchMetadata recordsAndNextBatchMetadata = reader.readRecords(cancellationContext, lease, response.getInputStream(), maxBytes, targetOffset, offsetPosition.position(), segmentSize);
                ReclaimableMemoryRecords records = recordsAndNextBatchMetadata.records;
                updateCache(recordsAndNextBatchMetadata.nextOffsetAndBatchMetadata);
                List<AbortedTxn> abortedTxns;
                if (objectMetadata.hasAbortedTxns() && isolationLevel == IsolationLevel.READ_COMMITTED) {
                    abortedTxns = fetchAbortedTxns(records);
                } else {
                    abortedTxns = Collections.emptyList();
                }
                fetchTimeTakenMs = time.hiResClockMs() - fetchStartTimeMs;
                completeFetch(records, abortedTxns, null, leaseTimeTakenMs, fetchTimeTakenMs);
            } catch (Throwable t) {
                // Give the lease back before the failure is reported by the outer handlers.
                // NOTE(review): this handler also sees failures from closing the response after
                // completeFetch has already handed the records off - confirm that releasing the
                // lease in that case is intended.
                lease.ifPresent(MemoryTracker.MemoryLease::release);
                throw t;
            }
        } catch (CancellationException e) {
            completeFetch(ReclaimableMemoryRecords.EMPTY, Collections.emptyList(), null, leaseTimeTakenMs, fetchTimeTakenMs);
        } catch (Throwable t) {
            completeFetch(ReclaimableMemoryRecords.EMPTY, Collections.emptyList(), t, leaseTimeTakenMs, fetchTimeTakenMs);
        }
    }

    /**
     * Stores the metadata of the batch following the ones just read in the offset cache, keyed
     * by the segment's object id and the next offset. Does nothing when either piece of
     * metadata is absent.
     */
    private void updateCache(TierSegmentReader.NextOffsetAndBatchMetadata nextOffsetAndBatchMetadata) {
        if (nextOffsetAndBatchMetadata == null) {
            return;
        }
        FetchOffsetMetadata nextBatchMetadata = nextOffsetAndBatchMetadata.nextBatchMetadata;
        if (nextBatchMetadata == null) {
            return;
        }
        long nextOffset = nextOffsetAndBatchMetadata.nextOffset;
        log.debug("{} updating cache. metadata: {}", logPrefix, nextOffsetAndBatchMetadata);
        cache.put(objectMetadata.objectId(), nextOffset, nextBatchMetadata);
    }

    /**
     * Cancels the fetch, waits for its result and returns it keyed by topic partition.
     *
     * <p>If the waiting thread is interrupted, the fetched partition gets an empty result and
     * the thread's interrupt status is restored so callers can still observe it. Every ignored
     * partition is mapped to an empty result.
     *
     * @return a map holding the result for the fetched partition plus empty results for the
     *         ignored partitions
     */
    public Map<TopicPartition, TierFetchResult> finish() {
        cancel();
        TopicPartition topicPartition = objectMetadata.topicIdPartition().topicPartition();
        Map<TopicPartition, TierFetchResult> resultMap = new HashMap<>();
        try {
            TierFetchResult tierFetchResult = transferPromise.get();
            tierFetcherMetrics.ifPresent(metrics -> metrics.bytesFetched().record((double) tierFetchResult.records.sizeInBytes()));
            resultMap.put(topicPartition, tierFetchResult);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag for code further up the stack.
            Thread.currentThread().interrupt();
            resultMap.put(topicPartition, TierFetchResult.emptyFetchResult());
        } catch (ExecutionException e) {
            log.warn("Failed exceptionally while finishing pending fetch request for partition {} from tiered storage. This exception is unexpected as the promise in not completed exceptionally ", topicPartition, e);
            tierFetcherMetrics.ifPresent(metrics -> metrics.fetchException().record());
            resultMap.put(topicPartition, new TierFetchResult(ReclaimableMemoryRecords.EMPTY, Collections.emptyList(), e.getCause()));
        }
        for (TopicPartition ignoredTopicPartition : ignoredTopicPartitions) {
            resultMap.put(ignoredTopicPartition, TierFetchResult.emptyFetchResult());
        }
        return resultMap;
    }

    /** Records a cancelled fetch in the metrics, when metrics are configured. */
    public void markFetchExpired() {
        tierFetcherMetrics.ifPresent(metrics -> metrics.fetchCancelled().record());
    }

    /**
     * Cancels this fetch's cancellation context and calls {@code wakeup()} on the memory tracker
     * (presumably so a fetch blocked on a lease notices the cancellation - confirm).
     */
    public void cancel() {
        cancellationContext.cancel();
        memoryTracker.wakeup();
    }

    /**
     * Records metrics, logs the outcome, completes the promise with a {@link TierFetchResult}
     * and then notifies the completion callback (if any) for every delayed operation key.
     *
     * @param throwable the failure to report inside the result, or null on success
     */
    private void completeFetch(ReclaimableMemoryRecords records, List<AbortedTxn> abortedTxns, Throwable throwable, long leaseTimeTakenMs, long fetchTimeTakenMs) {
        tierFetcherMetrics.ifPresent(metrics -> metrics.fetchTotalTimeMs().record((double) fetchTimeTakenMs));
        if (throwable == null) {
            log.debug("{} tier fetch objectMetadata={}, targetOffset={}, maxBytes={}, isolationLevel={} completed (leaseTimeTaken:{}ms) (fetchTimeTaken:{}ms)", logPrefix, objectMetadata, targetOffset, maxBytes, isolationLevel, leaseTimeTakenMs, fetchTimeTakenMs);
        } else {
            // The trailing throwable has no placeholder, so the logger treats it as the exception.
            log.error("{} tier fetch objectMetadata={}, targetOffset={}, maxBytes={}, isolationLevel={} completed with exception (leaseTimeTaken:{}ms) (fetchTimeTaken:{}ms)", logPrefix, objectMetadata, targetOffset, maxBytes, isolationLevel, leaseTimeTakenMs, fetchTimeTakenMs, throwable);
            tierFetcherMetrics.ifPresent(metrics -> metrics.fetchException().record());
        }
        transferPromise.complete(new TierFetchResult(records, abortedTxns, throwable));
        if (fetchCompletionCallback != null) {
            for (DelayedOperationKey key : delayedOperationKeys()) {
                fetchCompletionCallback.accept(key);
            }
        }
    }
}

