package org.apache.kafka.storage.internals.epoch;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.class */
/**
 * In-memory cache of leader-epoch entries (epoch paired with the start offset of that epoch),
 * backed by a {@link LeaderEpochCheckpointFile}.
 *
 * <p>Entries are kept in a {@link TreeMap} keyed by epoch. Every access to the map made by the
 * public API goes through a {@link ReentrantReadWriteLock}: lookups take the read lock, mutations
 * take the write lock. Writes to the checkpoint happen either synchronously
 * ({@link #assign(int, long)}, {@link #assign(List)}, {@link #clearAndFlush()}) or through the
 * supplied {@link Scheduler} (the {@code truncate...AsyncFlush} methods).
 */
public final class LeaderEpochFileCache {
    /** Epoch returned by {@link #endOffsetFor(int, long)} when no answer can be derived from the cache. */
    private static final int UNDEFINED_EPOCH = -1;
    /** Offset returned by {@link #endOffsetFor(int, long)} when no answer can be derived from the cache. */
    private static final long UNDEFINED_EPOCH_OFFSET = -1L;

    private final TopicPartition topicPartition;
    private final LeaderEpochCheckpointFile checkpoint;
    private final Scheduler scheduler;
    private final Logger log;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeMap<Integer, EpochEntry> epochs = new TreeMap<>();

    /**
     * Creates a cache and populates it with the entries currently stored in the checkpoint.
     *
     * @param topicPartition partition this cache belongs to; used for log context and task naming
     * @param leaderEpochCheckpointFile checkpoint that is read now and written on later flushes
     * @param scheduler scheduler used for the asynchronous flushes
     */
    public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpointFile leaderEpochCheckpointFile, Scheduler scheduler) {
        this.checkpoint = leaderEpochCheckpointFile;
        this.topicPartition = topicPartition;
        this.scheduler = scheduler;
        this.log = createLogger(topicPartition);
        // Entries go through the validating assign(EpochEntry) path; nothing is written back here.
        leaderEpochCheckpointFile.read().forEach(this::assign);
    }

    /**
     * Creates a cache seeded directly from {@code entries}, bypassing validation and without
     * reading from or writing to the checkpoint. Used by {@link #withCheckpoint}.
     */
    private LeaderEpochFileCache(List<EpochEntry> entries, TopicPartition topicPartition, LeaderEpochCheckpointFile leaderEpochCheckpointFile, Scheduler scheduler) {
        this.checkpoint = leaderEpochCheckpointFile;
        this.topicPartition = topicPartition;
        this.scheduler = scheduler;
        this.log = createLogger(topicPartition);
        for (EpochEntry entry : entries) {
            this.epochs.put(entry.epoch, entry);
        }
    }

    /** Builds the logger with the per-partition prefix shared by both constructors. */
    private static Logger createLogger(TopicPartition topicPartition) {
        return new LogContext("[LeaderEpochCache " + topicPartition + "] ").logger(LeaderEpochFileCache.class);
    }

    /**
     * Records that {@code epoch} starts at {@code startOffset} and, if the cache changed,
     * writes the cache to the checkpoint.
     *
     * @param epoch leader epoch; must be non-negative
     * @param startOffset first offset of the epoch; must be non-negative
     * @throws IllegalArgumentException if either argument is negative
     */
    public void assign(int epoch, long startOffset) {
        EpochEntry entry = new EpochEntry(epoch, startOffset);
        if (assign(entry)) {
            log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
            writeToFile();
        }
    }

    /**
     * Applies every entry in {@code entries} in order, then writes the cache to the checkpoint
     * once. The write happens whenever {@code entries} is non-empty, even if no entry changed
     * the cache.
     *
     * @param entries entries to apply
     * @throws IllegalArgumentException if an entry has a negative epoch or start offset
     */
    public void assign(List<EpochEntry> entries) {
        entries.forEach(entry -> {
            if (assign(entry)) {
                log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
            }
        });
        if (!entries.isEmpty()) {
            writeToFile();
        }
    }

    /**
     * Returns whether {@code entry} would change the cache: true when the cache is empty, when
     * the latest cached epoch differs, or when the same epoch is given a smaller start offset.
     */
    private boolean isUpdateNeeded(EpochEntry entry) {
        return latestEntry()
            .map(latest -> entry.epoch != latest.epoch || entry.startOffset < latest.startOffset)
            .orElse(true);
    }

    /**
     * Validates {@code entry} and inserts it if needed, first removing cached entries that
     * conflict with it.
     *
     * @return true if the cache was modified
     * @throws IllegalArgumentException if the epoch or start offset is negative
     */
    private boolean assign(EpochEntry entry) {
        if (entry.epoch < 0 || entry.startOffset < 0) {
            throw new IllegalArgumentException("Received invalid partition leader epoch entry " + entry);
        }
        // Check once under the read lock only, so that no-op assignments never take the write lock.
        if (!isUpdateNeeded(entry)) {
            return false;
        }
        lock.writeLock().lock();
        try {
            // Check again now that the write lock is held; the cache may have changed in between.
            if (!isUpdateNeeded(entry)) {
                return false;
            }
            maybeTruncateNonMonotonicEntries(entry);
            epochs.put(entry.epoch, entry);
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Removes, starting from the highest epoch, every cached entry whose epoch or start offset is
     * greater than or equal to that of {@code newEntry}. Must be called with the write lock held.
     */
    private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
        List<EpochEntry> removed = removeWhileMatching(
            epochs.descendingMap().entrySet().iterator(),
            cached -> cached.epoch >= newEntry.epoch || cached.startOffset >= newEntry.startOffset);
        // Removing exactly one entry that has the same start offset as the new one is not warned about.
        boolean singleRemovalWithSameStartOffset =
            removed.size() == 1 && removed.get(0).startOffset == newEntry.startOffset;
        if (!removed.isEmpty() && !singleRemovalWithSameStartOffset) {
            log.warn("New epoch entry {} caused truncation of conflicting entries {}. Cache now contains {} entries.",
                newEntry, removed, epochs.size());
        }
    }

    /**
     * Walks {@code iterator}, removing entries through it for as long as {@code predicate}
     * matches, and stops at the first entry that does not match.
     *
     * @return the removed entries, in iteration order
     */
    private static List<EpochEntry> removeWhileMatching(Iterator<Map.Entry<Integer, EpochEntry>> iterator, Predicate<EpochEntry> predicate) {
        List<EpochEntry> removed = new ArrayList<>();
        while (iterator.hasNext()) {
            EpochEntry entry = iterator.next().getValue();
            if (!predicate.test(entry)) {
                break;
            }
            removed.add(entry);
            iterator.remove();
        }
        return removed;
    }

    /** Returns true if the cache holds at least one entry. */
    public boolean nonEmpty() {
        lock.readLock().lock();
        try {
            return !epochs.isEmpty();
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Returns the entry with the highest epoch, or empty if the cache is empty. */
    public Optional<EpochEntry> latestEntry() {
        lock.readLock().lock();
        try {
            return Optional.ofNullable(epochs.lastEntry()).map(Map.Entry::getValue);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Returns the highest cached epoch, or empty if the cache is empty. */
    public OptionalInt latestEpoch() {
        return latestEntry()
            .map(entry -> OptionalInt.of(entry.epoch))
            .orElseGet(OptionalInt::empty);
    }

    /** Returns the epoch immediately below the latest cached epoch, or empty if there is none. */
    public OptionalInt previousEpoch() {
        lock.readLock().lock();
        try {
            return latestEntry()
                .map(latest -> toOptionalInt(epochs.lowerKey(latest.epoch)))
                .orElseGet(OptionalInt::empty);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Returns the entry with the lowest epoch, or empty if the cache is empty. */
    public Optional<EpochEntry> earliestEntry() {
        lock.readLock().lock();
        try {
            return Optional.ofNullable(epochs.firstEntry()).map(Map.Entry::getValue);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Returns the greatest cached epoch strictly below {@code epoch}, or empty if there is none. */
    public OptionalInt previousEpoch(int epoch) {
        lock.readLock().lock();
        try {
            return toOptionalInt(epochs.lowerKey(epoch));
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Returns the entry of the greatest cached epoch strictly below {@code epoch}, or empty if there is none. */
    public Optional<EpochEntry> previousEntry(int epoch) {
        lock.readLock().lock();
        try {
            return Optional.ofNullable(epochs.lowerEntry(epoch)).map(Map.Entry::getValue);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Returns the smallest cached epoch strictly above {@code epoch}, or empty if there is none. */
    public OptionalInt nextEpoch(int epoch) {
        lock.readLock().lock();
        try {
            return toOptionalInt(epochs.higherKey(epoch));
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Converts a nullable boxed integer into an {@link OptionalInt}. */
    private static OptionalInt toOptionalInt(Integer value) {
        return value != null ? OptionalInt.of(value) : OptionalInt.empty();
    }

    /** Returns the entry cached for exactly {@code epoch}, or empty if there is none. */
    public Optional<EpochEntry> epochEntry(int epoch) {
        lock.readLock().lock();
        try {
            return Optional.ofNullable(epochs.get(epoch));
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Resolves the end offset for {@code requestedEpoch}.
     *
     * <ul>
     *   <li>If {@code requestedEpoch} is {@code -1}, or no cached epoch is higher than it (and it
     *       is not the latest epoch), returns {@code (-1, -1)}.</li>
     *   <li>If {@code requestedEpoch} equals the latest cached epoch, returns
     *       {@code (requestedEpoch, logEndOffset)}.</li>
     *   <li>Otherwise the offset is the start offset of the next higher cached epoch, and the epoch
     *       is the greatest cached epoch less than or equal to {@code requestedEpoch}, or
     *       {@code requestedEpoch} itself if no such cached epoch exists.</li>
     * </ul>
     *
     * @param requestedEpoch epoch to look up
     * @param logEndOffset offset reported when {@code requestedEpoch} is the latest cached epoch
     * @return an immutable (epoch, end offset) pair
     */
    public Map.Entry<Integer, Long> endOffsetFor(int requestedEpoch, long logEndOffset) {
        lock.readLock().lock();
        try {
            Map.Entry<Integer, Long> result = resolveEndOffset(requestedEpoch, logEndOffset);
            if (log.isTraceEnabled()) {
                log.trace("Processed end offset request for epoch {} and returning epoch {} with end offset {} from epoch cache of size {}",
                    requestedEpoch, result.getKey(), result.getValue(), epochs.size());
            }
            return result;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Computes the result of {@link #endOffsetFor(int, long)}; must be called with the read lock held. */
    private Map.Entry<Integer, Long> resolveEndOffset(int requestedEpoch, long logEndOffset) {
        if (requestedEpoch == UNDEFINED_EPOCH) {
            return epochAndOffset(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET);
        }
        OptionalInt latest = latestEpoch();
        if (latest.isPresent() && latest.getAsInt() == requestedEpoch) {
            return epochAndOffset(requestedEpoch, logEndOffset);
        }
        Map.Entry<Integer, EpochEntry> higher = epochs.higherEntry(requestedEpoch);
        if (higher == null) {
            return epochAndOffset(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET);
        }
        long endOffset = higher.getValue().startOffset;
        Map.Entry<Integer, EpochEntry> floor = epochs.floorEntry(requestedEpoch);
        int resolvedEpoch = floor == null ? requestedEpoch : floor.getValue().epoch;
        return epochAndOffset(resolvedEpoch, endOffset);
    }

    /** Builds an immutable (epoch, offset) pair. */
    private static Map.Entry<Integer, Long> epochAndOffset(int epoch, long offset) {
        return new AbstractMap.SimpleImmutableEntry<>(epoch, offset);
    }

    /**
     * Removes, from the highest epoch downwards, every entry whose start offset is at or beyond
     * {@code endOffset}. If anything was removed, a one-off checkpoint write is handed to the
     * scheduler instead of being performed here.
     *
     * @param endOffset offset to truncate to; negative values leave the cache unchanged
     */
    public void truncateFromEndAsyncFlush(long endOffset) {
        lock.writeLock().lock();
        try {
            List<EpochEntry> removed = truncateFromEnd(epochs, endOffset);
            if (!removed.isEmpty()) {
                scheduler.scheduleOnce(flushTaskName(), this::writeIfDirExists);
                log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.",
                    removed, endOffset, epochs.size());
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Removes, from the lowest epoch upwards, every entry whose start offset is at or below
     * {@code startOffset}, and re-inserts the last removed epoch with {@code startOffset} as its
     * start offset. If anything was removed, a one-off checkpoint write is handed to the scheduler
     * instead of being performed here.
     *
     * @param startOffset new earliest offset
     */
    public void truncateFromStartAsyncFlush(long startOffset) {
        lock.writeLock().lock();
        try {
            List<EpochEntry> removed = truncateFromStart(epochs, startOffset);
            if (!removed.isEmpty()) {
                scheduler.scheduleOnce(flushTaskName(), this::writeIfDirExists);
                // NOTE(review): the second argument is the last *removed* entry (with its original
                // start offset), not the rewritten entry that now sits in the cache.
                EpochEntry lastRemoved = removed.get(removed.size() - 1);
                log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.",
                    removed, lastRemoved, startOffset, epochs.size());
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Name of the scheduled task that flushes this cache to its checkpoint. */
    private String flushTaskName() {
        return "leader-epoch-cache-flush-" + topicPartition;
    }

    /**
     * Removes entries from the low end of {@code epochs} while their start offset is at or below
     * {@code startOffset}. If any were removed, puts back the epoch of the last removed entry with
     * {@code startOffset} as its start offset.
     *
     * @return the removed entries, in ascending epoch order
     */
    private static List<EpochEntry> truncateFromStart(TreeMap<Integer, EpochEntry> epochs, long startOffset) {
        List<EpochEntry> removed = removeWhileMatching(
            epochs.entrySet().iterator(),
            entry -> entry.startOffset <= startOffset);
        if (!removed.isEmpty()) {
            EpochEntry lastRemoved = removed.get(removed.size() - 1);
            EpochEntry rewrittenFirst = new EpochEntry(lastRemoved.epoch, startOffset);
            epochs.put(rewrittenFirst.epoch, rewrittenFirst);
        }
        return removed;
    }

    /**
     * Removes entries from the high end of {@code epochs} while their start offset is at or beyond
     * {@code endOffset}. Does nothing when {@code endOffset} is negative, the map is empty, or the
     * latest entry already starts below {@code endOffset}.
     *
     * @return the removed entries, in descending epoch order
     */
    private static List<EpochEntry> truncateFromEnd(TreeMap<Integer, EpochEntry> epochs, long endOffset) {
        Map.Entry<Integer, EpochEntry> latest = epochs.lastEntry();
        if (endOffset < 0 || latest == null || latest.getValue().startOffset < endOffset) {
            return Collections.emptyList();
        }
        return removeWhileMatching(
            epochs.descendingMap().entrySet().iterator(),
            entry -> entry.startOffset >= endOffset);
    }

    /**
     * Returns the epoch of the last cached entry whose start offset is at or below {@code offset},
     * scanning entries in ascending epoch order and stopping at the first entry that starts beyond
     * {@code offset}.
     *
     * @param offset offset to look up
     * @return the matching epoch, or empty if the cache is empty or its first entry starts after
     *         {@code offset}
     */
    public OptionalInt epochForOffset(long offset) {
        lock.readLock().lock();
        try {
            OptionalInt precedingEpoch = OptionalInt.empty();
            for (EpochEntry entry : epochs.values()) {
                if (entry.startOffset == offset) {
                    return OptionalInt.of(entry.epoch);
                }
                if (entry.startOffset > offset) {
                    return precedingEpoch;
                }
                precedingEpoch = OptionalInt.of(entry.epoch);
            }
            return precedingEpoch;
        } finally {
            // The read lock is released here and only here, exactly once on every path.
            lock.readLock().unlock();
        }
    }

    /**
     * Returns a new cache holding a snapshot of the current entries and using
     * {@code leaderEpochCheckpointFile} as its checkpoint. Nothing is written to the new
     * checkpoint by this call.
     */
    public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpointFile leaderEpochCheckpointFile) {
        lock.readLock().lock();
        try {
            return new LeaderEpochFileCache(epochEntries(), topicPartition, leaderEpochCheckpointFile, scheduler);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Returns the entries that would remain after truncating a copy of the cache from the start at
     * {@code startOffset} (skipped when negative) and from the end at {@code endOffset}. The cache
     * itself is not modified.
     *
     * @param startOffset lower bound; a negative value disables truncation from the start
     * @param endOffset upper bound; a negative value disables truncation from the end
     * @return a new list of the remaining entries in ascending epoch order
     */
    public List<EpochEntry> epochEntriesInRange(long startOffset, long endOffset) {
        lock.readLock().lock();
        try {
            TreeMap<Integer, EpochEntry> copy = new TreeMap<>(epochs);
            if (startOffset >= 0) {
                truncateFromStart(copy, startOffset);
            }
            truncateFromEnd(copy, endOffset);
            return new ArrayList<>(copy.values());
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Removes every entry and writes the now-empty cache to the checkpoint. */
    public void clearAndFlush() {
        lock.writeLock().lock();
        try {
            epochs.clear();
            writeToFile();
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Removes every entry without touching the checkpoint. */
    public void clear() {
        lock.writeLock().lock();
        try {
            epochs.clear();
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Returns a new list with the cached entries in ascending epoch order. */
    public List<EpochEntry> epochEntries() {
        lock.readLock().lock();
        try {
            return new ArrayList<>(epochs.values());
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Returns a new map from each cached entry's epoch to its start offset. */
    public NavigableMap<Integer, Long> epochWithOffsets() {
        lock.readLock().lock();
        try {
            NavigableMap<Integer, Long> startOffsetsByEpoch = new TreeMap<>();
            for (EpochEntry entry : epochs.values()) {
                startOffsetsByEpoch.put(entry.epoch, entry.startOffset);
            }
            return startOffsetsByEpoch;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Writes the current entries to the checkpoint while holding the read lock. */
    private void writeToFile() {
        lock.readLock().lock();
        try {
            checkpoint.write(epochs.values());
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Passes the current entries to the checkpoint's {@code writeIfDirExists} while holding the read lock. */
    private void writeIfDirExists() {
        lock.readLock().lock();
        try {
            checkpoint.writeIfDirExists(epochs.values());
        } finally {
            lock.readLock().unlock();
        }
    }
}
