package kafka.tier.fetcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import kafka.server.DelayedOperationKey;
import kafka.tier.TierUnfetchedTimestampAndOffset;
import kafka.tier.fetcher.OffsetIndexFetchRequest;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.metadata.ObjectMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:kafka/tier/fetcher/PendingOffsetForTimestampAsync.class */
/**
 * Asynchronous variant of {@link PendingOffsetForTimestamp}.
 *
 * <p>When run, it starts up to {@code parallelism} independent "chains". Each chain atomically
 * claims the next unprocessed partition, searches the offset for that partition's timestamp
 * through a sequence of asynchronous object-store requests, and then claims the next partition
 * once the current search has finished. {@link #run()} blocks until every chain is done.
 */
public class PendingOffsetForTimestampAsync extends PendingOffsetForTimestamp {
    /** Executions slower than this are logged at INFO instead of DEBUG. */
    private static final long SLOW_EXECUTION_THRESHOLD_MS = 5000L;

    /** Upper bound on the number of partitions searched concurrently. */
    private final int parallelism;
    /** Snapshot of the requested partitions, giving them a stable index for work distribution. */
    private final List<TopicPartition> partitionList;
    /** Index into {@link #partitionList} of the next partition no chain has claimed yet. */
    private final AtomicInteger nextPartitionIndex;

    /**
     * Creates the pending request. All arguments except {@code i} are forwarded unchanged to the
     * superclass constructor, together with a freshly generated random request id.
     *
     * @param cancellationContext forwarded to the superclass; also passed to every fetch issued here
     * @param tierObjectStore     object store the index and segment fragments are read from
     * @param map                 timestamp/metadata to resolve, keyed by partition; its key set is
     *                            copied into the partition work list
     * @param optional            optional metrics sink, forwarded to the superclass
     * @param consumer            forwarded to the superclass
     * @param time                clock, forwarded to the superclass
     * @param i                   maximum number of partitions to search concurrently
     * @param consumer2           forwarded to the superclass
     */
    public PendingOffsetForTimestampAsync(CancellationContext cancellationContext, TierObjectStore tierObjectStore, Map<TopicPartition, TierUnfetchedTimestampAndOffset> map, Optional<TierFetcherMetrics> optional, Consumer<DelayedOperationKey> consumer, Time time, int i, Consumer<Runnable> consumer2) {
        super(cancellationContext, tierObjectStore, map, optional, consumer, time, Uuid.randomUuid(), "PendingOffsetForTimestampAsync(%s)", consumer2);
        this.nextPartitionIndex = new AtomicInteger(0);
        this.parallelism = i;
        this.partitionList = new ArrayList<>(map.keySet());
    }

    /**
     * Records the queue time, runs the partition searches with bounded parallelism, waits for all
     * of them, records and logs the total execution time, and finally calls {@code complete()}.
     */
    @Override // java.lang.Runnable
    public void run() {
        log.debug("Starting offsetForTimestamp async, requestId={}, timestamps={}.", this.requestId, this.timestamps);
        final long startMs = this.time.hiResClockMs();
        final long queuedTimeMs = startMs - this.creationTimeMs;
        this.tierFetcherMetrics.ifPresent(metrics -> metrics.queuedTimeMs().record(queuedTimeMs));

        // Never start more chains than there are partitions to process.
        final int chainCount = Math.min(this.parallelism, this.timestamps.size());
        final List<CompletableFuture<Void>> chains = new ArrayList<>(chainCount);
        for (int chain = 0; chain < chainCount; chain++) {
            chains.add(chainOffsetForTimestamp());
        }
        // NOTE(review): if a chain completes exceptionally, join() throws and complete() below is
        // skipped. Per-partition async failures are absorbed in searchOffsetForPartition, so this
        // presumably only matters for unexpected synchronous failures - confirm.
        CompletableFuture.allOf(chains.toArray(new CompletableFuture<?>[0])).join();

        final long executionTimeMs = this.time.hiResClockMs() - startMs;
        this.tierFetcherMetrics.ifPresent(metrics -> metrics.fetchOffsetForTimestampTotalTimeMs().record(executionTimeMs));

        final String summary = String.format("Complete PendingOffsetForTimestampAsync for %d partitions, requestId=%s, queuedTimeMs=%d, executionTimeMs=%d", this.results.size(), this.requestId.toString(), queuedTimeMs, executionTimeMs);
        if (executionTimeMs > SLOW_EXECUTION_THRESHOLD_MS) {
            log.info(summary);
        } else {
            log.debug(summary);
        }
        complete();
    }

    /**
     * Claims the next unprocessed partition and searches it; when that search finishes, repeats
     * with the following unclaimed partition.
     *
     * @return a future that completes once this chain finds no partition left to claim
     */
    private CompletableFuture<Void> chainOffsetForTimestamp() {
        final int index = this.nextPartitionIndex.getAndIncrement();
        if (index >= this.partitionList.size()) {
            // Every partition has already been claimed by some chain.
            return CompletableFuture.completedFuture(null);
        }
        final TopicPartition partition = this.partitionList.get(index);
        return searchOffsetForPartition(partition, this.timestamps.get(partition))
                .thenCompose(ignored -> chainOffsetForTimestamp());
    }

    /**
     * Resolves the offset for one partition in three asynchronous steps: timestamp index lookup,
     * offset index lookup, then a search of the fetched segment range. {@code fetchable(partition)}
     * is re-checked before each step and the remaining steps are skipped once it returns false.
     *
     * <p>A failure in any step is passed to {@code handlePartitionError}; a non-null search result
     * is passed to {@code putOffsetResult}. Either way the returned future completes normally.
     *
     * @param topicPartition partition being resolved
     * @param target         metadata, timestamp and object size describing what to search
     * @return a future that completes when the partition has been processed or skipped
     */
    private CompletableFuture<Void> searchOffsetForPartition(TopicPartition topicPartition, TierUnfetchedTimestampAndOffset target) {
        final ObjectMetadata objectMetadata = target.metadata;
        final long targetTimestamp = target.timestamp;
        if (!fetchable(topicPartition)) {
            return CompletableFuture.completedFuture(null);
        }
        return TimestampIndexFetchRequest.fetchOffsetForTimestampAsync(this.cancellationContext, this.tierObjectStore, objectMetadata, targetTimestamp)
                .thenCompose(timestampIndexResult -> {
                    if (!fetchable(topicPartition)) {
                        // A null result tells the next stage to skip the segment fetch.
                        return CompletableFuture.completedFuture(null);
                    }
                    return OffsetIndexFetchRequest.fetchOffsetPositionForStartingOffsetAsync(this.cancellationContext, this.tierObjectStore, objectMetadata, timestampIndexResult.startTimestampOffset.offset, timestampIndexResult.endTimestampOffset.offset);
                })
                .thenCompose(offsetIndexResult -> {
                    if (offsetIndexResult == null || !fetchable(topicPartition)) {
                        return CompletableFuture.<Optional<Long>>completedFuture(null);
                    }
                    return fetchSegmentRangeAndSearch(objectMetadata, offsetIndexResult, targetTimestamp, target);
                })
                .handle((offsetResult, error) -> {
                    if (error != null) {
                        handlePartitionError(topicPartition, target, error);
                    } else if (offsetResult != null) {
                        putOffsetResult(topicPartition, offsetResult, targetTimestamp);
                    }
                    return null;
                });
    }

    /**
     * Fetches the segment fragment delimited by the offset index result and searches it for the
     * target timestamp. The fetch is open-ended when the index result has no end position.
     *
     * <p>The object-store response is always closed after the search; a failure to close it is
     * logged and otherwise ignored. An {@link IOException} raised while reading is rethrown
     * wrapped in a {@link RuntimeException}.
     *
     * @param objectMetadata    metadata of the object to read from
     * @param offsetIndexResult start (and optionally end) position of the byte range to fetch
     * @param targetTimestamp   timestamp to search for
     * @param target            original request entry; supplies the object size for the reader
     * @return a future holding whatever the reader's {@code offsetForTimestamp} returns
     */
    private CompletableFuture<Optional<Long>> fetchSegmentRangeAndSearch(ObjectMetadata objectMetadata, OffsetIndexFetchRequest.TierFetchOffsetIndexResult offsetIndexResult, long targetTimestamp, TierUnfetchedTimestampAndOffset target) {
        return (offsetIndexResult.endOffsetPosition == null
                ? this.tierObjectStore.getObjectStoreFragmentAsync(objectMetadata, FragmentType.SEGMENT, Long.valueOf(offsetIndexResult.startOffsetPosition.position))
                : this.tierObjectStore.getObjectStoreFragmentAsync(objectMetadata, FragmentType.SEGMENT, Long.valueOf(offsetIndexResult.startOffsetPosition.position), Long.valueOf(offsetIndexResult.endOffsetPosition.position)))
                .thenApply(response -> {
                    try {
                        try {
                            return this.reader.offsetForTimestamp(this.cancellationContext, response.getInputStream(), targetTimestamp, target.objectSize);
                        } finally {
                            // Best-effort close: a close failure must not mask the search outcome.
                            try {
                                response.close();
                            } catch (Exception closeFailure) {
                                log.warn("failed to close TierObjectStoreResponse of SEGMENT byte range", closeFailure);
                            }
                        }
                    } catch (IOException readFailure) {
                        throw new RuntimeException(readFailure);
                    }
                });
    }
}
