package kafka.tier.fetcher.objectcache;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.fetcher.TierFetcherMetrics;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.metadata.ObjectMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/fetcher/objectcache/PrefetchCache.class */
public class PrefetchCache {
    /** Class-wide logger. */
    private static final Logger log = LoggerFactory.getLogger(PrefetchCache.class);
    /**
     * Allocator used for entry buffers and for the composite buffers handed to readers. It is static, so it is
     * shared by every PrefetchCache instance. The {@code true} constructor argument is Netty's
     * {@code preferDirect} flag.
     */
    private static final ByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(true);
    /**
     * Cached entries keyed by (object id, aligned byte position). The map is created with accessOrder = false, so
     * it iterates in insertion order; eviction removes the first key of that iteration, i.e. the entry inserted
     * longest ago. Inside this class it is only touched while holding {@link #rwLock}.
     * NOTE(review): the field is public, so code outside this class could touch it without the lock - confirm
     * callers.
     */
    public final LinkedHashMap<CacheKey, CacheEntry> cacheMap = new LinkedHashMap<>(20, 0.75f, false);
    /** Guards {@link #cacheMap}: read lock for lookups, write lock for insertion and eviction. */
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    /**
     * Size budget in bytes. Eviction runs while {@code entrySizeBytes * cacheMap.size()} exceeds this value;
     * {@code close()} sets it to 0 so that everything is evicted.
     */
    public volatile long cacheMaxSizeBytes;
    /**
     * Nominal size of one entry in bytes: positions are aligned down to a multiple of it, each fetch covers at
     * most this many bytes, and size accounting charges this amount per entry regardless of its actual length.
     */
    private final long entrySizeBytes;
    /** How many bytes past the aligned start of a request {@code get} tries to have cached (capped by segment size). */
    private final long prefetchRangeBytes;
    /** Store the entry bytes are fetched from. */
    private final TierObjectStore objectStore;
    /** Optional metrics sink; replaced by {@code Optional.empty()} when the cache is closed. */
    public volatile Optional<TierFetcherMetrics> tierFetcherMetrics;

    /* loaded from: input_file:kafka/tier/fetcher/objectcache/PrefetchCache$CacheEntry.class */
    /**
     * Reference-counted handle for one cached range whose bytes may still be in flight.
     *
     * <p>The count starts at 1 (the creator's reference). Every {@link #retain()} must be paired with exactly one
     * {@link #release()}; when the count reaches zero the underlying buffer is released, either immediately or
     * as soon as the pending fetch completes.
     */
    public static class CacheEntry {
        /** Completes with the buffer holding the entry's bytes, or exceptionally when the fetch failed. */
        public final CompletableFuture<ByteBuf> future;
        /** Number of outstanding references to this entry. */
        public final AtomicInteger refCnt = new AtomicInteger(1);

        /**
         * @param completableFuture the (possibly still running) fetch that produces this entry's buffer
         */
        public CacheEntry(CompletableFuture<ByteBuf> completableFuture) {
            this.future = completableFuture;
        }

        /**
         * Drops one reference. The buffer is released only on the transition to zero.
         *
         * <p>The comparison is deliberately {@code == 0} rather than {@code <= 0}: with {@code <= 0} every
         * unbalanced extra call would issue another {@code ByteBuf.release()}, which can take away a reference
         * that belongs to a retained duplicate still being read elsewhere.
         */
        public void release() {
            if (this.refCnt.decrementAndGet() == 0) {
                // thenApply defers the release until the fetch has completed; it does not run the function at
                // all when the future completed exceptionally, in which case there is no buffer to free.
                this.future.thenApply(ByteBuf::release);
            }
        }

        /** Adds one reference; the caller becomes responsible for a matching {@link #release()}. */
        public void retain() {
            this.refCnt.incrementAndGet();
        }
    }

    /* loaded from: input_file:kafka/tier/fetcher/objectcache/PrefetchCache$CacheEntryInputStream.class */
    public static class CacheEntryInputStream extends ByteBufInputStream {
        private final List<CacheEntry> entries;

        public CacheEntryInputStream(List<CacheEntry> list, long j, long j2) {
            super(createCompositeBuf(list, j, j2), true);
            this.entries = list;
        }

        private static CompositeByteBuf createCompositeBuf(List<CacheEntry> list, long j, long j2) {
            List list2 = (List) list.stream().map(cacheEntry -> {
                return cacheEntry.future.join();
            }).collect(Collectors.toList());
            CompositeByteBuf compositeBuffer = PrefetchCache.ALLOCATOR.compositeBuffer(list2.size());
            try {
                list2.forEach(byteBuf -> {
                    compositeBuffer.addComponent(true, byteBuf.retainedDuplicate());
                });
                compositeBuffer.readerIndex((int) j);
                compositeBuffer.writerIndex((int) Math.min(compositeBuffer.writerIndex(), j2));
                return compositeBuffer;
            } catch (Exception e) {
                PrefetchCache.log.error("Failed to create InputStream from buffers", e);
                compositeBuffer.release();
                throw e;
            }
        }

        public void close() throws IOException {
            try {
                super.close();
            } finally {
                this.entries.forEach((v0) -> {
                    v0.release();
                });
            }
        }
    }

    /* loaded from: input_file:kafka/tier/fetcher/objectcache/PrefetchCache$CacheKey.class */
    /**
     * Identifies one cache entry: the object it belongs to plus the byte position at which the entry starts.
     * Instances are immutable and compare by value.
     */
    public static class CacheKey {
        public final UUID objectId;
        public final long bytePosition;

        /**
         * @param uuid id of the object the entry belongs to
         * @param j    byte position of the entry within that object
         */
        public CacheKey(UUID uuid, long j) {
            this.objectId = uuid;
            this.bytePosition = j;
        }

        /** Two keys are equal when they are of the same class and agree on both object id and byte position. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            final CacheKey other = (CacheKey) obj;
            if (other.bytePosition != this.bytePosition) {
                return false;
            }
            return Objects.equals(this.objectId, other.objectId);
        }

        /** Hash over the same two components that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return Objects.hash(this.objectId, this.bytePosition);
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("CacheKey(objectId=");
            text.append(this.objectId);
            text.append(", bytePosition=");
            text.append(this.bytePosition);
            text.append(')');
            return text.toString();
        }
    }

    /**
     * Creates an empty cache.
     *
     * @param objectCacheConfig supplies the maximum cache size, the per-entry size and the prefetch range
     * @param tierObjectStore   store that entry bytes are fetched from
     * @param optional          metrics sink, or empty when no metrics should be recorded
     */
    public PrefetchCache(ObjectCacheConfig objectCacheConfig, TierObjectStore tierObjectStore, Optional<TierFetcherMetrics> optional) {
        // Sizing parameters, all taken from the configuration object.
        cacheMaxSizeBytes = objectCacheConfig.prefetchCacheMaxSizeBytes;
        entrySizeBytes = objectCacheConfig.prefetchCacheEntrySizeBytes;
        prefetchRangeBytes = objectCacheConfig.prefetchCacheRangeBytes;

        // Collaborators.
        objectStore = tierObjectStore;
        tierFetcherMetrics = optional;
    }

    /**
     * Returns a stream over the requested byte range of an object, served from cache entries that are fetched
     * on demand, and opportunistically warms the cache for the bytes that follow.
     *
     * <p>The range actually served starts at {@code j} and ends at {@code min(j2, i)}. Entries are aligned to the
     * entry size, so the first entry may start before {@code j}; the returned stream is positioned so that the
     * leading bytes are skipped. In addition, entries covering up to
     * {@code min(alignedStart + prefetchRangeBytes, i)} are requested in the background; failures there are only
     * logged.
     *
     * @param objectMetadata object to read from
     * @param j              requested start byte position (logged as "start")
     * @param j2             requested end byte position (logged as "end")
     * @param i              size of the segment (logged as "segmentSize"); no entry reaches past it
     * @return future completing with the stream; the caller must close the stream to give the entry references
     *         back. It completes exceptionally with the underlying RuntimeException (or a RuntimeException
     *         wrapping the cause) when any entry could not be fetched or the stream could not be built.
     */
    public CompletableFuture<CacheEntryInputStream> get(ObjectMetadata objectMetadata, long j, long j2, int i) {
        log.debug("Fetching data by prefetch cache: {}, start: {}, end: {}, segmentSize: {}", objectMetadata, j, j2, i);
        final long alignedStart = alignToEntrySize(j);
        final long endPosition = Math.min(j2, i);
        final List<CacheEntry> requested = new ArrayList<>();
        log.debug("Fetching requested range by prefetch cache: {}, aligned start: {}, end position: {}", objectMetadata, alignedStart, endPosition);
        try {
            for (long position = alignedStart; position < endPosition; position += this.entrySizeBytes) {
                requested.add(getOrCreateCacheEntry(new CacheKey(objectMetadata.objectId(), position), Math.min(position + this.entrySizeBytes, i), objectMetadata, true));
            }
        } catch (RuntimeException | Error e) {
            // Nobody will ever close a stream for this request, so give back the references collected so far
            // instead of pinning those entries' buffers forever.
            requested.forEach(CacheEntry::release);
            throw e;
        }

        final long prefetchEnd = Math.min(alignedStart + this.prefetchRangeBytes, i);
        if (prefetchEnd > endPosition) {
            final long prefetchStart = alignToEntrySize(endPosition);
            log.debug("Prefetching data by prefetch cache: {}, range: {} - {}", objectMetadata, prefetchStart, prefetchEnd);
            // Prefetching is best effort. The try wraps the whole loop so that a failure ends the prefetch;
            // catching inside the loop without advancing the position would retry the same entry forever.
            try {
                for (long position = prefetchStart; position < prefetchEnd; position += this.entrySizeBytes) {
                    // Only the cache keeps a reference to a prefetched entry: drop ours right away.
                    getOrCreateCacheEntry(new CacheKey(objectMetadata.objectId(), position), Math.min(position + this.entrySizeBytes, i), objectMetadata, false).release();
                }
            } catch (Exception e) {
                log.error("Failed to prefetch data: {}, range: {} - {}", objectMetadata, prefetchStart, prefetchEnd, e);
            }
        }

        final CompletableFuture<?>[] pendingFetches = requested.stream()
            .map(cacheEntry -> cacheEntry.future)
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pendingFetches)
            .thenApply(ignored -> new CacheEntryInputStream(requested, j - alignedStart, endPosition - alignedStart))
            .exceptionally(th -> {
                // Either a fetch failed or the stream could not be built; in both cases no stream exists that
                // would release the entries on close, so do it here.
                requested.forEach(CacheEntry::release);
                final Throwable cause = th.getCause() != null ? th.getCause() : th;
                if (cause instanceof RuntimeException) {
                    throw (RuntimeException) cause;
                }
                throw new RuntimeException(cause);
            });
    }

    /**
     * Returns the entry for {@code cacheKey} with one reference added for the caller, starting a fetch when the
     * key is absent or its previous fetch failed.
     *
     * <p>A lookup under the read lock handles the common case. Only when that finds nothing usable is the write
     * lock taken; the map is then checked again because another thread may have inserted the entry in between.
     * Both locks are released in {@code finally} blocks so that an unexpected exception cannot leave one held.
     *
     * @param cacheKey       object id and aligned start position of the entry
     * @param j              end position passed to the fetch when a new entry has to be created
     * @param objectMetadata object the entry belongs to
     * @param z              whether to record access/hit metrics (false for background prefetching)
     * @return the entry, retained once on behalf of the caller, who must release it
     */
    private CacheEntry getOrCreateCacheEntry(CacheKey cacheKey, long j, ObjectMetadata objectMetadata, boolean z) {
        CacheEntry cacheEntry;
        this.rwLock.readLock().lock();
        try {
            cacheEntry = this.cacheMap.get(cacheKey);
            if (cacheEntry != null) {
                // Retain while still holding the lock so that a concurrent eviction cannot drop the last
                // reference between lookup and retain.
                cacheEntry.retain();
            }
        } finally {
            this.rwLock.readLock().unlock();
        }
        if (z) {
            this.tierFetcherMetrics.ifPresent(metrics -> metrics.objectCacheAccesses().record());
        }

        if (cacheEntry != null && !cacheEntry.future.isCompletedExceptionally()) {
            if (z) {
                this.tierFetcherMetrics.ifPresent(metrics -> metrics.objectCacheHits().record());
            }
            return cacheEntry;
        }

        // Missing or failed. The reference taken above on a failed entry is intentionally not given back:
        // releasing an entry only acts on a successfully completed future, so there is nothing it could free.
        this.rwLock.writeLock().lock();
        try {
            cacheEntry = this.cacheMap.get(cacheKey);
            if (cacheEntry == null || cacheEntry.future.isCompletedExceptionally()) {
                cacheEntry = new CacheEntry(fetchDataAsync(objectMetadata, cacheKey.bytePosition, j));
                // Remove before put: re-putting an existing key would keep its old position in the
                // insertion-ordered map and make the fresh entry look like the oldest one.
                this.cacheMap.remove(cacheKey);
                this.cacheMap.put(cacheKey, cacheEntry);
            } else if (z) {
                this.tierFetcherMetrics.ifPresent(metrics -> metrics.objectCacheHits().record());
            }
            // Retain before evicting: eviction may remove this very entry and drop the cache's reference.
            cacheEntry.retain();
            evictEntries();
            return cacheEntry;
        } finally {
            this.rwLock.writeLock().unlock();
        }
    }

    /**
     * Asynchronously fetches the byte range {@code [j, j2)} of an object's segment fragment into a newly
     * allocated buffer.
     *
     * <p>The store response is always closed. If anything goes wrong after the buffer has been allocated, the
     * buffer is released before the failure is reported, so failed fetches do not leak pooled memory.
     *
     * @param objectMetadata object to read from
     * @param j              start position passed to the object store
     * @param j2             end position passed to the object store; {@code j2 - j} bytes are expected
     * @return future completing with the filled buffer (owned by the caller), or exceptionally with a
     *         {@link TierObjectStoreRetriableException} when the bytes could not be read completely
     */
    private CompletableFuture<ByteBuf> fetchDataAsync(ObjectMetadata objectMetadata, long j, long j2) {
        return this.objectStore.getObjectStoreFragmentAsync(objectMetadata, FragmentType.SEGMENT, Long.valueOf(j), Long.valueOf(j2)).thenApply(tierObjectStoreResponse -> {
            final int expectedBytes = (int) (j2 - j);
            ByteBuf buffer = null;
            try {
                buffer = ALLOCATOR.buffer((int) this.entrySizeBytes);
                final java.io.InputStream responseStream = tierObjectStoreResponse.getInputStream();
                // A single writeBytes call returns the number of bytes actually read, which may be fewer than
                // requested, so keep reading until the expected count is reached or the stream ends.
                int totalRead = 0;
                while (totalRead < expectedBytes) {
                    final int readNow = buffer.writeBytes(responseStream, expectedBytes - totalRead);
                    if (readNow <= 0) {
                        break;
                    }
                    totalRead += readNow;
                }
                if (totalRead != expectedBytes) {
                    throw new IllegalStateException(String.format("Failed to read %d bytes from TierObjectStoreResponse, actual read bytes: %d", expectedBytes, totalRead));
                }
                this.tierFetcherMetrics.ifPresent(metrics -> metrics.bytesPrefetched().record(expectedBytes));
                return buffer;
            } catch (Exception e) {
                if (buffer != null) {
                    // The buffer never reaches a caller on this path; give it back to the allocator.
                    buffer.release();
                }
                log.error("Failed to read data from TierObjectStoreResponse", e);
                throw new TierObjectStoreRetriableException("Failed to read data from TierObjectStoreResponse", e);
            } finally {
                try {
                    tierObjectStoreResponse.close();
                } catch (Exception e2) {
                    // Closing is best effort: the outcome of the read has already been decided.
                    log.warn("failed to close TierObjectStoreResponse", e2);
                }
            }
        });
    }

    /**
     * Rounds a byte position toward zero to the nearest multiple of the entry size.
     *
     * @param j byte position to align
     * @return the largest multiple of {@code entrySizeBytes} not exceeding {@code j} (for non-negative input)
     */
    private long alignToEntrySize(long j) {
        final long offsetIntoEntry = j % this.entrySizeBytes;
        return j - offsetIntoEntry;
    }

    /**
     * Evicts entries, oldest insertion first, until the accounted cache size no longer exceeds
     * {@code cacheMaxSizeBytes}, then publishes the resulting size to the metrics sink if one is present.
     *
     * <p>Eviction drops the cache's own reference on each removed entry. Callers in this class hold the write
     * lock while invoking this method.
     */
    private void evictEntries() {
        // The emptiness check matters when cacheMaxSizeBytes is negative: the accounted size of an empty map (0)
        // would still exceed the budget and iterator().next() would throw NoSuchElementException.
        while (!this.cacheMap.isEmpty() && cacheTotalSize() > this.cacheMaxSizeBytes) {
            final CacheKey eldestKey = this.cacheMap.keySet().iterator().next();
            this.cacheMap.remove(eldestKey).release();
        }
        this.tierFetcherMetrics.ifPresent(metrics -> {
            metrics.objectCacheTotalSizeBytes = cacheTotalSize();
        });
    }

    /**
     * Accounted size of the cache in bytes: every entry is charged the full entry size, whatever its actual
     * length.
     */
    private long cacheTotalSize() {
        final long entryCount = this.cacheMap.size();
        return entryCount * this.entrySizeBytes;
    }

    /**
     * Empties the cache and detaches the metrics sink.
     *
     * <p>The size budget is set to zero first, so the eviction pass that follows, run under the write lock,
     * removes every entry. The metrics reference is cleared afterwards even if eviction throws.
     */
    public void close() {
        // With a zero budget any non-empty cache is over its limit.
        cacheMaxSizeBytes = 0;
        rwLock.writeLock().lock();
        try {
            evictEntries();
        } finally {
            rwLock.writeLock().unlock();
            tierFetcherMetrics = Optional.empty();
        }
    }
}
