/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.storage.internals.log;

import io.confluent.kafka.availability.FilesWrapper;
import io.confluent.kafka.availability.ThreadCountersManager;
import io.confluent.kafka.storage.checksum.ChecksumParams;
import io.confluent.kafka.storage.checksum.E2EChecksumProtectedObjectType;
import io.confluent.kafka.storage.checksum.E2EChecksumStore;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Crc32C;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.generated.ProducerSnapshot;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.BatchMetadata;
import org.apache.kafka.storage.internals.log.CompletedTxn;
import org.apache.kafka.storage.internals.log.CorruptSnapshotException;
import org.apache.kafka.storage.internals.log.LogFileUtils;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.ProducerAppendInfo;
import org.apache.kafka.storage.internals.log.ProducerIdQuotaRecorder;
import org.apache.kafka.storage.internals.log.ProducerStateEntry;
import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
import org.apache.kafka.storage.internals.log.SnapshotFile;
import org.apache.kafka.storage.internals.log.TxnMetadata;
import org.apache.kafka.storage.internals.log.VerificationStateEntry;
import org.slf4j.Logger;

public class ProducerStateManager {
    /** Grace period (ms) added to the maximum transaction timeout before a transaction counts as late. */
    public static final long LATE_TRANSACTION_BUFFER_MS = 300000L;
    /** Byte position of the version field in a serialized snapshot (the version short is read first). */
    private static final int VERSION_OFFSET = 0;
    /** Byte position at which the CRC is written into a serialized snapshot. */
    private static final int CRC_OFFSET = 2;
    /** Byte position from which the CRC-covered region of a serialized snapshot starts. */
    private static final int PRODUCER_ENTRIES_OFFSET = 6;
    private final Logger log;
    private final TopicPartition topicPartition;
    /** Tenant prefix derived from the topic name, or empty when the topic has none. */
    private final Optional<String> tenantIdOpt;
    private final int maxTransactionTimeoutMs;
    private final ProducerStateManagerConfig producerStateManagerConfig;
    private final Time time;
    /** Recorder notified when producer ids are added or removed; cleared by {@code clearProducerIdQuotaRecorder()}. */
    private Optional<ProducerIdQuotaRecorder> producerIdQuotaRecorder;
    /** Producer id to latest known producer state. */
    private final Map<Long, ProducerStateEntry> producers = new HashMap<>();
    /** Producer id to verification state. */
    private final Map<Long, VerificationStateEntry> verificationStates = new HashMap<>();
    /** Transactions that have started but not completed, keyed by first offset. */
    private final TreeMap<Long, TxnMetadata> ongoingTxns = new TreeMap<>();
    /** Completed transactions kept until the supplied offset passes their last offset, keyed by first offset. */
    private final TreeMap<Long, TxnMetadata> unreplicatedTxns = new TreeMap<>();
    private volatile File logDir;
    /** Mirror of {@code producers.size()}, refreshed whenever the map is modified by this class. */
    private volatile int producerIdCount = 0;
    /** Start-time upper bound of the first ongoing transaction, or -1 when there is none. */
    private volatile long oldestTxnStartTimestamp = -1L;
    /** Snapshot files known to this manager, keyed by snapshot offset. */
    private ConcurrentSkipListMap<Long, SnapshotFile> snapshots;
    private long lastMapOffset = 0L;
    private long lastSnapOffset = 0L;
    private final Optional<E2EChecksumStore> checksumStoreOpt;
    private final boolean e2eChecksumEnabledForTopic;

    /**
     * Creates the producer state manager for one partition and indexes the snapshot files
     * currently present in {@code logDir}.
     *
     * @param topicPartition partition whose producer state is tracked
     * @param logDir directory that is scanned for snapshot files
     * @param maxTransactionTimeoutMs maximum transaction timeout, used by {@link #hasLateTransaction(long)}
     * @param producerStateManagerConfig source of the producer id expiration settings
     * @param time clock used for timestamps
     * @param producerIdQuotaRecorder optional recorder notified of producer id count changes
     * @param checksumParams supplies the checksum store and the per-topic checksum flag
     * @throws IOException if listing the snapshot files fails
     */
    public ProducerStateManager(TopicPartition topicPartition, File logDir, int maxTransactionTimeoutMs, ProducerStateManagerConfig producerStateManagerConfig, Time time, Optional<ProducerIdQuotaRecorder> producerIdQuotaRecorder, ChecksumParams checksumParams) throws IOException {
        this.topicPartition = topicPartition;
        this.tenantIdOpt = Optional.ofNullable(ProducerStateManager.extractTenantPrefix(topicPartition.topic()));
        this.logDir = logDir;
        this.maxTransactionTimeoutMs = maxTransactionTimeoutMs;
        this.producerStateManagerConfig = producerStateManagerConfig;
        this.time = time;
        this.log = new LogContext("[ProducerStateManager partition=" + String.valueOf(topicPartition) + "] ").logger(ProducerStateManager.class);
        this.producerIdQuotaRecorder = producerIdQuotaRecorder;
        // Assign the checksum settings BEFORE loading snapshots: loadSnapshots() reads
        // e2eChecksumEnabledForTopic, which would otherwise still hold its default value (false).
        this.checksumStoreOpt = checksumParams.checksumStoreOpt();
        this.e2eChecksumEnabledForTopic = checksumParams.e2eChecksumEnabledForTopic();
        this.snapshots = this.loadSnapshots();
    }

    /** @return the maximum transaction timeout, in milliseconds, given at construction */
    public int maxTransactionTimeoutMs() {
        return maxTransactionTimeoutMs;
    }

    /** @return the configuration object given at construction */
    public ProducerStateManagerConfig producerStateManagerConfig() {
        return producerStateManagerConfig;
    }

    /** @return the current quota recorder; empty after {@link #clearProducerIdQuotaRecorder()} */
    public Optional<ProducerIdQuotaRecorder> producerIdQuotaRecorder() {
        return producerIdQuotaRecorder;
    }

    /**
     * Reports whether the oldest ongoing transaction has been open for longer than the maximum
     * transaction timeout plus {@link #LATE_TRANSACTION_BUFFER_MS}.
     *
     * @param currentTimeMs current time in milliseconds
     * @return {@code true} if such a transaction exists
     */
    public boolean hasLateTransaction(long currentTimeMs) {
        // Read the volatile field once so both checks observe the same value.
        long oldestStartMs = oldestTxnStartTimestamp;
        if (oldestStartMs <= 0L) {
            return false;
        }
        long allowedAgeMs = (long) maxTransactionTimeoutMs + LATE_TRANSACTION_BUFFER_MS;
        return currentTimeMs - oldestStartMs > allowedAgeMs;
    }

    /**
     * Clears all in-memory state and tracked snapshots (restarting at offset 0), then rebuilds
     * the snapshot index from the log directory.
     *
     * @throws IOException if deleting or listing snapshot files fails
     */
    public void truncateFullyAndReloadSnapshots() throws IOException {
        log.info("Reloading the producer state snapshots");
        truncateFullyAndStartAt(0L);
        ConcurrentSkipListMap<Long, SnapshotFile> reloaded = loadSnapshots();
        this.snapshots = reloaded;
    }

    /** @return the last published number of tracked producer ids */
    public int producerIdCount() {
        return producerIdCount;
    }

    /** Stores (or replaces) the state for {@code producerId} and republishes the producer count. */
    private void addProducerId(long producerId, ProducerStateEntry entry) {
        producers.put(producerId, entry);
        producerIdCount = producers.size();
    }

    /** Drops every tracked producer and resets the published producer count to zero. */
    private void clearProducerIds() {
        producers.clear();
        producerIdCount = 0;
    }

    /**
     * Reports the removal of all currently tracked producer ids to the quota recorder (if any)
     * and then detaches the recorder.
     */
    public void clearProducerIdQuotaRecorder() {
        int trackedProducers = producers.size();
        maybeRecordProducerIdQuota(topicPartition.topic(), -trackedProducers, time.milliseconds());
        producerIdQuotaRecorder = Optional.empty();
    }

    /**
     * Returns the verification state for {@code producerId}, creating it when absent, and offers
     * the given sequence and epoch to the entry as a possible new lowest value.
     *
     * @return the existing or newly created entry
     */
    public VerificationStateEntry maybeCreateVerificationStateEntry(long producerId, int sequence, short epoch, boolean supportsEpochBump) {
        VerificationStateEntry state = verificationStates.get(producerId);
        if (state == null) {
            state = new VerificationStateEntry(time.milliseconds(), sequence, epoch, supportsEpochBump);
            verificationStates.put(producerId, state);
        }
        state.maybeUpdateLowestSequenceAndEpoch(sequence, epoch);
        return state;
    }

    /** @return the verification state for {@code producerId}, or {@code null} if none is tracked */
    public VerificationStateEntry verificationStateEntry(long producerId) {
        return verificationStates.get(producerId);
    }

    /** Forgets the verification state tracked for {@code producerId}, if any. */
    public void clearVerificationStateEntry(long producerId) {
        verificationStates.remove(producerId);
    }

    /**
     * Lists the snapshot files in the current log directory and indexes them by offset.
     *
     * @return a new map from snapshot offset to snapshot file
     * @throws IOException if listing the directory fails
     */
    private ConcurrentSkipListMap<Long, SnapshotFile> loadSnapshots() throws IOException {
        // Typed list instead of a raw List: iterating a raw List as SnapshotFile does not type-check.
        @SuppressWarnings("unchecked")
        List<SnapshotFile> snapshotFiles = (List<SnapshotFile>) ThreadCountersManager.wrapIOChecked(() -> ProducerStateManager.listSnapshotFiles(this.logDir, this.e2eChecksumEnabledForTopic));
        ConcurrentSkipListMap<Long, SnapshotFile> offsetToSnapshots = new ConcurrentSkipListMap<>();
        for (SnapshotFile snapshotFile : snapshotFiles) {
            offsetToSnapshots.put(snapshotFile.offset, snapshotFile);
        }
        return offsetToSnapshots;
    }

    /**
     * Reloads the snapshot index from disk and deletes snapshots whose offset is not a segment
     * base offset. The stray snapshot with the highest offset is kept unless its offset is below
     * the largest segment base offset.
     *
     * @param segmentBaseOffsets base offsets of the segments that currently exist
     * @throws IOException if listing or deleting snapshot files fails
     */
    public void removeStraySnapshots(Collection<Long> segmentBaseOffsets) throws IOException {
        OptionalLong maxSegmentBaseOffset = segmentBaseOffsets.isEmpty() ? OptionalLong.empty() : OptionalLong.of(segmentBaseOffsets.stream().max(Long::compare).get());
        HashSet<Long> knownBaseOffsets = new HashSet<>(segmentBaseOffsets);
        ConcurrentSkipListMap<Long, SnapshotFile> reloaded = loadSnapshots();

        // Walk the snapshots in ascending offset order; whenever a newer stray shows up, the
        // previously remembered stray is deleted and the newer one is remembered instead.
        SnapshotFile latestStray = null;
        for (SnapshotFile candidate : reloaded.values()) {
            if (knownBaseOffsets.contains(candidate.offset)) {
                continue;
            }
            if (latestStray != null) {
                latestStray.deleteIfExists(checksumStoreOpt);
                reloaded.remove(latestStray.offset);
            }
            latestStray = candidate;
        }

        // The surviving stray is only dropped when it lies below the largest segment base offset.
        if (latestStray != null && maxSegmentBaseOffset.isPresent()) {
            long strayOffset = latestStray.offset;
            if (strayOffset < maxSegmentBaseOffset.getAsLong()) {
                SnapshotFile removed = reloaded.remove(strayOffset);
                if (removed != null) {
                    removed.deleteIfExists(checksumStoreOpt);
                }
            }
        }
        this.snapshots = reloaded;
    }

    /**
     * Flushes and closes every tracked snapshot whose offset lies in {@code [fromOffset, toOffset]}.
     *
     * @param fromOffset lower bound, inclusive
     * @param toOffset upper bound, inclusive
     * @throws IllegalArgumentException if {@code toOffset < fromOffset}
     * @throws IOException if flushing or closing a snapshot file fails
     */
    public void flushSnapshots(long fromOffset, long toOffset) throws IOException {
        if (toOffset < fromOffset) {
            throw new IllegalArgumentException("Invalid producer snapshot range: requested snapshots in " + String.valueOf(this.topicPartition) + " from offset " + fromOffset + " which is greater than limit offset " + toOffset);
        }
        // Keys are passed as Long (auto-boxed); casting them to Object does not type-check
        // against the map's key type.
        for (SnapshotFile snapshotFile : this.snapshots.subMap(fromOffset, true, toOffset, true).values()) {
            snapshotFile.flush();
            snapshotFile.close();
        }
    }

    /**
     * Flushes and closes every tracked snapshot whose offset is at least {@code fromOffset}.
     *
     * @param fromOffset lower bound, inclusive
     * @throws IOException if flushing or closing a snapshot file fails
     */
    public void flushSnapshots(long fromOffset) throws IOException {
        // The key is passed as Long (auto-boxed); casting it to Object does not type-check
        // against the map's key type.
        for (SnapshotFile snapshotFile : this.snapshots.tailMap(fromOffset).values()) {
            snapshotFile.flush();
            snapshotFile.close();
        }
    }

    /**
     * Returns the smaller of the first offsets of the earliest unreplicated transaction and the
     * earliest ongoing transaction.
     *
     * @return that offset metadata, or empty when neither kind of transaction exists
     */
    public Optional<LogOffsetMetadata> firstUnstableOffset() {
        Map.Entry<Long, TxnMetadata> firstUnreplicated = unreplicatedTxns.firstEntry();
        Map.Entry<Long, TxnMetadata> firstOngoing = ongoingTxns.firstEntry();
        Optional<LogOffsetMetadata> unreplicated = firstUnreplicated == null
                ? Optional.empty()
                : Optional.ofNullable(firstUnreplicated.getValue().firstOffset);
        Optional<LogOffsetMetadata> undecided = firstOngoing == null
                ? Optional.empty()
                : Optional.ofNullable(firstOngoing.getValue().firstOffset);

        if (!unreplicated.isPresent()) {
            return undecided;
        }
        if (!undecided.isPresent()) {
            return unreplicated;
        }
        // Both exist: the earlier one wins; ties go to the unreplicated transaction.
        boolean undecidedIsEarlier = undecided.get().messageOffset < unreplicated.get().messageOffset;
        return undecidedIsEarlier ? undecided : unreplicated;
    }

    /** Drops completed transactions whose last offset is below the new high watermark. */
    public void onHighWatermarkUpdated(long highWatermark) {
        removeUnreplicatedTransactions(highWatermark);
    }

    /** @return the first offset of the earliest ongoing transaction, or empty when there is none */
    public OptionalLong firstUndecidedOffset() {
        Map.Entry<Long, TxnMetadata> earliest = ongoingTxns.firstEntry();
        if (earliest == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(earliest.getValue().firstOffset.messageOffset);
    }

    /** @return the offset up to which the in-memory producer map is considered current */
    public long mapEndOffset() {
        return lastMapOffset;
    }

    /** @return a read-only view of the tracked producers, keyed by producer id */
    public Map<Long, ProducerStateEntry> activeProducers() {
        return Collections.unmodifiableMap(producers);
    }

    /** @return a read-only view of the ongoing transactions, keyed by first offset */
    public Map<Long, TxnMetadata> ongoingTxns() {
        return Collections.unmodifiableMap(ongoingTxns);
    }

    /** @return {@code true} when there are no tracked producers and no unreplicated transactions */
    public boolean isEmpty() {
        if (!producers.isEmpty()) {
            return false;
        }
        return unreplicatedTxns.isEmpty();
    }

    /**
     * Restores producer state from the newest readable snapshot. Corrupt snapshots are deleted and
     * the next newest one is tried; if none can be read, both offsets fall back to
     * {@code logStartOffset}.
     *
     * @param logStartOffset offset used when no snapshot could be loaded
     * @param currentTimeMs time used for expiring and loading producer entries
     * @throws IOException if reading or deleting a snapshot file fails
     */
    private void loadFromSnapshot(long logStartOffset, long currentTimeMs) throws IOException {
        int expirationMs = producerStateManagerConfig.producerIdExpirationMs(tenantIdOpt);
        for (Optional<SnapshotFile> latest = latestSnapshotFile(); latest.isPresent(); latest = latestSnapshotFile()) {
            SnapshotFile snapshot = latest.get();
            try {
                log.info("Loading producer state from snapshot file '{}'", (Object) snapshot);
                for (ProducerStateEntry loaded : ProducerStateManager.readSnapshot(snapshot.file())) {
                    if (!isProducerExpired(currentTimeMs, expirationMs, loaded)) {
                        loadProducerEntry(loaded, currentTimeMs);
                    }
                }
                lastSnapOffset = snapshot.offset;
                lastMapOffset = snapshot.offset;
                updateOldestTxnTimestamp();
                return;
            } catch (CorruptSnapshotException e) {
                // Discard the unreadable snapshot; the loop then retries with the next newest one.
                log.warn("Failed to load producer snapshot from '{}': {}", (Object) snapshot.file(), (Object) e.getMessage());
                removeAndDeleteSnapshot(snapshot.offset);
            }
        }
        lastSnapOffset = logStartOffset;
        lastMapOffset = logStartOffset;
    }

    /**
     * Creates a queue ordered by descending {@code lastTimeStamp}, so its head is the entry with
     * the largest timestamp.
     *
     * @param max initial capacity of the queue
     */
    public static PriorityQueue<LruProducerInfo> newLruProducerInfoQueue(int max) {
        Comparator<LruProducerInfo> newestFirst = (left, right) -> Long.compare(right.lastTimeStamp, left.lastTimeStamp);
        return new PriorityQueue<>(max, newestFirst);
    }

    /**
     * Offers {@code producer} as a least-recently-used candidate. Producers with an open
     * transaction, or with a last timestamp after {@code minimalExpirationTimestamp}, are ignored.
     * While the queue holds fewer than {@code max} entries the producer is simply added; after
     * that it replaces the queue head only if the head has a larger timestamp.
     *
     * @param lruProducers candidate queue; with a queue from {@link #newLruProducerInfoQueue(int)}
     *                     the head is the entry with the largest timestamp
     * @param max maximum number of candidates to keep
     * @param producer producer state to consider
     * @param minimalExpirationTimestamp producers last seen after this timestamp are not candidates
     */
    void maybeAddLruProducer(PriorityQueue<LruProducerInfo> lruProducers, int max, ProducerStateEntry producer, long minimalExpirationTimestamp) {
        if (producer.currentTxnFirstOffset().isPresent() || producer.lastTimestamp() > minimalExpirationTimestamp) {
            return;
        }
        if (lruProducers.size() < max) {
            lruProducers.offer(new LruProducerInfo(producer.producerId(), producer.lastTimestamp(), this.topicPartition));
            return;
        }
        // peek() returns null on an empty queue (reachable when max <= 0); skip instead of
        // failing with a NullPointerException.
        LruProducerInfo head = lruProducers.peek();
        if (head != null && head.lastTimeStamp > producer.lastTimestamp()) {
            lruProducers.poll();
            lruProducers.offer(new LruProducerInfo(producer.producerId(), producer.lastTimestamp(), this.topicPartition));
        }
    }

    /** Offers every tracked producer to {@code lruProducers} as a least-recently-used candidate. */
    public void getLruProducers(PriorityQueue<LruProducerInfo> lruProducers, int max, long minimalExpirationTimestamp) {
        for (ProducerStateEntry state : producers.values()) {
            maybeAddLruProducer(lruProducers, max, state, minimalExpirationTimestamp);
        }
    }

    /**
     * Removes the listed producers, skipping any that are gone, whose last timestamp changed since
     * they were selected, or that now have an open transaction.
     *
     * @param lruProducers candidates selected earlier
     * @param currentTimeMs time passed to the quota recorder
     * @return the number of producers actually removed
     */
    public int removeLruProducers(List<LruProducerInfo> lruProducers, long currentTimeMs) {
        int removed = 0;
        for (LruProducerInfo candidate : lruProducers) {
            ProducerStateEntry current = producers.get(candidate.producerId);
            boolean stillEvictable = current != null
                    && current.lastTimestamp() == candidate.lastTimeStamp
                    && !current.currentTxnFirstOffset().isPresent();
            if (stillEvictable) {
                removed++;
                producers.remove(candidate.producerId);
            }
        }
        producerIdCount = producers.size();
        if (removed > 0) {
            maybeRecordProducerIdQuota(topicPartition.topic(), -removed, currentTimeMs);
        }
        return removed;
    }

    /**
     * Installs {@code entry} as the state for its producer id. A producer id not seen before is
     * reported to the quota recorder, and an open transaction on the entry is registered as ongoing.
     *
     * @param entry producer state to install
     * @param loadTimeMs time used for quota recording and for the transaction metadata
     */
    public void loadProducerEntry(ProducerStateEntry entry, long loadTimeMs) {
        long producerId = entry.producerId();
        boolean alreadyTracked = producers.containsKey(producerId);
        if (!alreadyTracked) {
            maybeRecordProducerIdQuota(topicPartition.topic(), 1.0, loadTimeMs);
        }
        addProducerId(producerId, entry);
        entry.currentTxnFirstOffset().ifPresent(firstOffset ->
                ongoingTxns.put(firstOffset, new TxnMetadata(producerId, firstOffset, loadTimeMs)));
    }

    /**
     * A producer is expired when it has no open transaction and has been idle for at least
     * {@code producerIdExpirationMs}.
     */
    private boolean isProducerExpired(long currentTimeMs, int producerIdExpirationMs, ProducerStateEntry producerState) {
        if (producerState.currentTxnFirstOffset().isPresent()) {
            return false;
        }
        long idleMs = currentTimeMs - producerState.lastTimestamp();
        return idleMs >= (long) producerIdExpirationMs;
    }

    /** Forwards a producer id count change to the quota recorder, when one is attached. */
    private void maybeRecordProducerIdQuota(String topicName, double value, long timeMs) {
        producerIdQuotaRecorder.ifPresent(recorder -> recorder.maybeRecord(topicName, value, timeMs));
    }

    /**
     * Removes expired producers, reports the (non-positive) count change to the quota recorder,
     * and removes verification states that have reached the configured expiration age.
     *
     * @param currentTimeMs current time in milliseconds
     */
    public void removeExpiredProducers(long currentTimeMs) {
        int expirationMs = producerStateManagerConfig.producerIdExpirationMs(tenantIdOpt);
        int countBefore = producers.size();
        producers.values().removeIf(state -> isProducerExpired(currentTimeMs, expirationMs, state));
        producerIdCount = producers.size();
        maybeRecordProducerIdQuota(topicPartition.topic(), producerIdCount - countBefore, currentTimeMs);
        // Verification states use the no-argument expiration setting rather than the tenant-specific one.
        verificationStates.values().removeIf(state ->
                currentTimeMs - state.timestamp() >= (long) producerStateManagerConfig.producerIdExpirationMs());
    }

    /**
     * Deletes snapshots outside {@code (logStartOffset, logEndOffset]}. If the in-memory map does
     * not end at {@code logEndOffset}, all state is cleared and rebuilt from the newest remaining
     * snapshot; otherwise only the log start offset advance is applied.
     *
     * @throws IOException if deleting or reading snapshot files fails
     */
    public void truncateAndReload(long logStartOffset, long logEndOffset, long loadTimeMs) throws IOException {
        for (SnapshotFile snapshot : snapshots.values()) {
            boolean outOfRange = snapshot.offset > logEndOffset || snapshot.offset <= logStartOffset;
            if (outOfRange) {
                removeAndDeleteSnapshot(snapshot.offset);
            }
        }
        if (logEndOffset == mapEndOffset()) {
            onLogStartOffsetIncremented(logStartOffset);
            return;
        }
        maybeRecordProducerIdQuota(topicPartition.topic(), -producers.size(), loadTimeMs);
        clearProducerIds();
        ongoingTxns.clear();
        updateOldestTxnTimestamp();
        unreplicatedTxns.clear();
        loadFromSnapshot(logStartOffset, loadTimeMs);
    }

    /**
     * Restores producer state from an in-memory snapshot buffer. The manager must hold no active
     * producers when this is called.
     *
     * @param currentTimeMs time used for expiring and loading producer entries
     * @param snapshotBuffer serialized snapshot contents
     * @param snapshotOffset offset the snapshot corresponds to; becomes the map end offset
     * @throws IllegalStateException if producers are still tracked
     * @throws CorruptSnapshotException if the buffer cannot be parsed (logged, then rethrown)
     * @throws IOException declared for reading the snapshot
     */
    public void reloadFromTieredSnapshot(long currentTimeMs, ByteBuffer snapshotBuffer, long snapshotOffset) throws IOException {
        if (!this.activeProducers().isEmpty()) {
            throw new IllegalStateException("expected producer state to be fully truncated before reloading tiered snapshot");
        }
        int producerIdExpirationMs = this.producerStateManagerConfig.producerIdExpirationMs(this.tenantIdOpt);
        try {
            List<ProducerStateEntry> loadedProducers = ProducerStateManager.readSnapshot(snapshotBuffer).stream().filter(producerEntry -> !this.isProducerExpired(currentTimeMs, producerIdExpirationMs, producerEntry)).toList();
            this.log.info("Restored state for {} producers from tiered storage", (Object)loadedProducers.size());
            loadedProducers.forEach(producer -> this.loadProducerEntry(producer, currentTimeMs));
            this.lastMapOffset = snapshotOffset;
            // loadProducerEntry may have registered ongoing transactions; refresh the cached
            // oldest start timestamp so hasLateTransaction() sees them.
            this.updateOldestTxnTimestamp();
        }
        catch (CorruptSnapshotException e) {
            this.log.warn("Failed to load producer snapshot from buffer {}", (Object)e.getMessage());
            throw e;
        }
    }

    /**
     * Builds the append context for {@code producerId}, seeded with its current state or with an
     * empty entry when the producer is not tracked.
     */
    public ProducerAppendInfo prepareUpdate(long producerId, AppendOrigin origin, long currentTimeMs) {
        Optional<ProducerStateEntry> known = lastEntry(producerId);
        ProducerStateEntry fallback = ProducerStateEntry.empty(producerId);
        ProducerStateEntry startingState = known.orElse(fallback);
        VerificationStateEntry verificationState = verificationStateEntry(producerId);
        return new ProducerAppendInfo(topicPartition, producerId, startingState, origin, currentTimeMs, verificationState);
    }

    /**
     * Applies the outcome of an append: updates (or creates) the producer's entry, registers any
     * transactions it started, and refreshes the oldest transaction timestamp.
     *
     * @throws IllegalArgumentException if the append carries producer id -1
     */
    public void update(ProducerAppendInfo appendInfo) {
        long producerId = appendInfo.producerId();
        if (producerId == -1L) {
            throw new IllegalArgumentException("Invalid producer id " + producerId + " passed to update for partition" + String.valueOf(topicPartition));
        }
        log.trace("Updated producer {} state to {}", (Object) producerId, (Object) appendInfo);
        ProducerStateEntry appended = appendInfo.toEntry();
        ProducerStateEntry existing = producers.get(producerId);
        if (existing == null) {
            // First time this producer id is seen: count it against the quota.
            maybeRecordProducerIdQuota(topicPartition.topic(), 1.0, time.milliseconds());
            addProducerId(producerId, appended);
        } else {
            existing.update(appended);
        }
        appendInfo.startedTransactions().forEach(started -> ongoingTxns.put(started.firstOffset.messageOffset, started));
        updateOldestTxnTimestamp();
    }

    /** Recomputes the cached start-time bound of the first ongoing transaction (-1 when none). */
    private void updateOldestTxnTimestamp() {
        Map.Entry<Long, TxnMetadata> earliest = ongoingTxns.firstEntry();
        if (earliest == null) {
            oldestTxnStartTimestamp = -1L;
        } else {
            oldestTxnStartTimestamp = earliest.getValue().startTimeUpperBoundMs;
        }
    }

    /** Sets the offset up to which the in-memory producer map is considered current. */
    public void updateMapEndOffset(long lastOffset) {
        lastMapOffset = lastOffset;
    }

    /** @return the tracked state for {@code producerId}, or empty if it is not tracked */
    public Optional<ProducerStateEntry> lastEntry(long producerId) {
        ProducerStateEntry state = producers.get(producerId);
        return Optional.ofNullable(state);
    }

    /**
     * Writes a snapshot of the current producer state at the map end offset, unless a snapshot
     * already covers that offset.
     *
     * @throws IOException if creating or writing the snapshot file fails
     */
    public void takeSnapshot() throws IOException {
        if (lastMapOffset <= lastSnapOffset) {
            return;
        }
        SnapshotFile snapshotFile = (SnapshotFile) ThreadCountersManager.wrapIO(() -> new SnapshotFile(LogFileUtils.producerSnapshotFile(this.logDir, this.lastMapOffset), this.e2eChecksumEnabledForTopic));
        long startMs = time.hiResClockMs();
        ThreadCountersManager.wrapIOChecked(() -> this.writeSnapshot(snapshotFile, this.producers));
        Object[] logArgs = {lastMapOffset, producers.size(), time.hiResClockMs() - startMs};
        log.info("Wrote producer snapshot at offset {} with {} producer ids in {} ms.", logArgs);
        snapshots.put(snapshotFile.offset, snapshotFile);
        lastSnapOffset = lastMapOffset;
    }

    /** Points this manager and every tracked snapshot file at a new parent directory. */
    public void updateParentDir(File parentDir) {
        logDir = parentDir;
        for (SnapshotFile snapshot : snapshots.values()) {
            snapshot.updateParentDir(parentDir, checksumStoreOpt);
        }
    }

    /** @return the offset of the newest tracked snapshot, or empty when there is none */
    public OptionalLong latestSnapshotOffset() {
        Optional<SnapshotFile> newest = latestSnapshotFile();
        if (newest.isPresent()) {
            return OptionalLong.of(newest.get().offset);
        }
        return OptionalLong.empty();
    }

    /** @return the offset of the oldest tracked snapshot, or empty when there is none */
    public OptionalLong oldestSnapshotOffset() {
        Optional<SnapshotFile> oldest = oldestSnapshotFile();
        if (oldest.isPresent()) {
            return OptionalLong.of(oldest.get().offset);
        }
        return OptionalLong.empty();
    }

    /**
     * Reacts to an advanced log start offset: drops completed transactions that end before it,
     * raises the map end offset to at least that offset, and resets the last snapshot offset to
     * the newest snapshot (or to the log start offset when no snapshot exists).
     */
    public void onLogStartOffsetIncremented(long logStartOffset) {
        removeUnreplicatedTransactions(logStartOffset);
        lastMapOffset = Math.max(lastMapOffset, logStartOffset);
        OptionalLong newestSnapshot = latestSnapshotOffset();
        lastSnapOffset = newestSnapshot.isPresent() ? newestSnapshot.getAsLong() : logStartOffset;
    }

    /** Removes completed transactions whose last offset is known and lies strictly below {@code offset}. */
    private void removeUnreplicatedTransactions(long offset) {
        unreplicatedTxns.values().removeIf(txn -> txn.lastOffset.isPresent() && txn.lastOffset.getAsLong() < offset);
    }

    /**
     * Discards all producers, transactions and tracked snapshot files, then restarts the
     * in-memory map at {@code offset}.
     *
     * @throws IOException if deleting a snapshot file fails
     */
    public void truncateFullyAndStartAt(long offset) throws IOException {
        int droppedProducers = producers.size();
        maybeRecordProducerIdQuota(topicPartition.topic(), -droppedProducers, time.milliseconds());
        clearProducerIds();
        ongoingTxns.clear();
        unreplicatedTxns.clear();
        for (SnapshotFile tracked : snapshots.values()) {
            removeAndDeleteSnapshot(tracked.offset);
        }
        lastSnapOffset = 0L;
        lastMapOffset = offset;
        updateOldestTxnTimestamp();
    }

    /**
     * Computes the last stable offset that would result from completing {@code completedTxn}: the
     * first offset of the earliest other ongoing transaction, or the offset right after the
     * completed transaction when no other transaction is ongoing.
     */
    public long proposedLastStableOffset(CompletedTxn completedTxn) {
        for (Map.Entry<Long, TxnMetadata> candidate : ongoingTxns.entrySet()) {
            boolean isCompletedTxn = candidate.getKey().longValue() == completedTxn.firstOffset;
            if (!isCompletedTxn) {
                return candidate.getValue().firstOffset.messageOffset;
            }
        }
        return completedTxn.lastOffset + 1L;
    }

    /**
     * Moves a transaction from the ongoing set to the unreplicated set, recording its last offset.
     *
     * @throws IllegalArgumentException if no ongoing transaction starts at the completed
     *                                  transaction's first offset
     */
    public void completeTxn(CompletedTxn completedTxn) {
        TxnMetadata finished = ongoingTxns.remove(completedTxn.firstOffset);
        if (finished == null) {
            throw new IllegalArgumentException("Attempted to complete transaction " + String.valueOf(completedTxn) + " on partition " + String.valueOf(topicPartition) + " which was not started");
        }
        finished.lastOffset = OptionalLong.of(completedTxn.lastOffset);
        unreplicatedTxns.put(completedTxn.firstOffset, finished);
        updateOldestTxnTimestamp();
    }

    /**
     * Deletes every tracked snapshot whose offset lies in {@code [0, offset)}.
     *
     * @param offset exclusive upper bound
     * @throws IOException if deleting a snapshot file fails
     */
    public void deleteSnapshotsBefore(long offset) throws IOException {
        // Keys are passed as Long (auto-boxed); casting them to Object does not type-check
        // against the map's key type.
        for (SnapshotFile snapshot : this.snapshots.subMap(0L, offset).values()) {
            this.removeAndDeleteSnapshot(snapshot.offset);
        }
    }

    /** @return the file backing the snapshot at exactly {@code offset}, or empty if none is tracked */
    public Optional<File> fetchSnapshot(long offset) {
        SnapshotFile tracked = snapshots.get(offset);
        if (tracked == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(tracked.file());
    }

    /** @return the tracked snapshot with the smallest offset, or empty when none is tracked */
    private Optional<SnapshotFile> oldestSnapshotFile() {
        Map.Entry<Long, SnapshotFile> first = snapshots.firstEntry();
        return first == null ? Optional.empty() : Optional.ofNullable(first.getValue());
    }

    /** @return the tracked snapshot with the largest offset, or empty when none is tracked */
    private Optional<SnapshotFile> latestSnapshotFile() {
        Map.Entry<Long, SnapshotFile> last = snapshots.lastEntry();
        return last == null ? Optional.empty() : Optional.ofNullable(last.getValue());
    }

    /** @return the snapshot tracked at exactly {@code offset}, or empty if there is none */
    public Optional<SnapshotFile> snapshotFileForOffset(long offset) {
        SnapshotFile tracked = snapshots.get(offset);
        return Optional.ofNullable(tracked);
    }

    /**
     * Stops tracking the snapshot at {@code snapshotOffset} and deletes its file if it exists.
     * Does nothing when no snapshot is tracked at that offset.
     */
    private void removeAndDeleteSnapshot(long snapshotOffset) throws IOException {
        SnapshotFile untracked = snapshots.remove(snapshotOffset);
        if (untracked == null) {
            return;
        }
        untracked.deleteIfExists(checksumStoreOpt);
    }

    /**
     * Stops tracking the snapshot at {@code snapshotOffset}, closes it and renames it for
     * deletion.
     *
     * @return the renamed snapshot file; empty when no snapshot was tracked at that offset or the
     *         file had already been deleted
     * @throws IOException if closing or renaming fails for any other reason
     */
    public Optional<SnapshotFile> removeAndMarkSnapshotForDeletion(long snapshotOffset) throws IOException {
        SnapshotFile untracked = snapshots.remove(snapshotOffset);
        if (untracked == null) {
            return Optional.empty();
        }
        try {
            untracked.close();
            untracked.renameToDelete(checksumStoreOpt);
        } catch (NoSuchFileException ex) {
            // The file is already gone, so there is nothing left to mark for deletion.
            log.info("Failed to rename producer state snapshot {} with deletion suffix because it was already deleted", (Object) untracked.file().getAbsoluteFile());
            return Optional.empty();
        }
        return Optional.of(untracked);
    }

    /**
     * Reads the whole snapshot file into memory and parses it.
     *
     * @throws IOException if the file cannot be read
     */
    public static List<ProducerStateEntry> readSnapshot(File file) throws IOException {
        Path snapshotPath = file.toPath();
        byte[] contents = FilesWrapper.readAllBytes(snapshotPath);
        ByteBuffer wrapped = ByteBuffer.wrap(contents);
        return ProducerStateManager.readSnapshot(wrapped);
    }

    /**
     * Parses a serialized producer snapshot and verifies its CRC.
     *
     * @param buffer snapshot bytes; reading advances this buffer
     * @return one entry per producer stored in the snapshot
     * @throws CorruptSnapshotException if the version is not 1, the contents cannot be parsed
     *                                  (the original failure is attached as the cause), or the
     *                                  stored CRC does not match the computed one
     * @throws IOException declared for callers; not thrown directly by this method
     */
    public static List<ProducerStateEntry> readSnapshot(ByteBuffer buffer) throws IOException {
        ProducerSnapshot producerSnapshot;
        // Independent view taken before parsing, so the CRC is computed from the original position.
        ByteBuffer crcBuffer = buffer.duplicate();
        int crcBufferSize = buffer.remaining();
        try {
            short version = buffer.getShort();
            if (version != 1) {
                throw new CorruptSnapshotException("Snapshot contained an unknown file version " + version);
            }
            producerSnapshot = new ProducerSnapshot((Readable)new ByteBufferAccessor(buffer), version);
        }
        catch (Exception e) {
            // Same message as before, but keep the original failure (and its stack trace) as the cause.
            CorruptSnapshotException corrupt = new CorruptSnapshotException("Snapshot failed schema validation: " + e.getMessage());
            corrupt.initCause(e);
            throw corrupt;
        }
        long crc = producerSnapshot.crc();
        long computedCrc = Crc32C.compute(crcBuffer, PRODUCER_ENTRIES_OFFSET, crcBufferSize - PRODUCER_ENTRIES_OFFSET);
        if (crc != computedCrc) {
            throw new CorruptSnapshotException("Snapshot is corrupt (CRC is no longer valid). Stored crc: " + crc + ". Computed crc: " + computedCrc);
        }
        List<ProducerSnapshot.ProducerEntry> producerEntries = producerSnapshot.producerEntries();
        List<ProducerStateEntry> entries = new ArrayList<>(producerEntries.size());
        for (ProducerSnapshot.ProducerEntry producerEntry : producerEntries) {
            long lastOffset = producerEntry.lastOffset();
            long timestamp = producerEntry.timestamp();
            long currentTxnFirstOffset = producerEntry.currentTxnFirstOffset();
            // Negative offsets are the stored form of "no open transaction" / "no batch recorded".
            OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset >= 0L ? OptionalLong.of(currentTxnFirstOffset) : OptionalLong.empty();
            Optional<BatchMetadata> batchMetadata = lastOffset >= 0L ? Optional.of(new BatchMetadata(producerEntry.lastSequence(), lastOffset, producerEntry.offsetDelta(), timestamp)) : Optional.empty();
            entries.add(new ProducerStateEntry(producerEntry.producerId(), producerEntry.epoch(), producerEntry.coordinatorEpoch(), timestamp, currentTxnFirstOffsetVal, batchMetadata));
        }
        return entries;
    }

    /**
     * Serializes {@code entries} as a version-1 snapshot, stamps the CRC into the buffer, lets the
     * checksum store (if present) record the contents, and writes the buffer to {@code snapshotFile}.
     *
     * @param snapshotFile destination; opened for writing if necessary
     * @param entries producer id to state, as it should appear in the snapshot
     * @throws IOException if opening or writing the snapshot file fails
     */
    public void writeSnapshot(SnapshotFile snapshotFile, Map<Long, ProducerStateEntry> entries) throws IOException {
        ArrayList<ProducerSnapshot.ProducerEntry> serialized = new ArrayList<>(entries.size());
        entries.forEach((producerId, state) -> {
            ProducerSnapshot.ProducerEntry record = new ProducerSnapshot.ProducerEntry();
            record.setProducerId(producerId);
            record.setEpoch(state.producerEpoch());
            record.setLastSequence(state.lastSeq());
            record.setLastOffset(state.lastDataOffset());
            record.setOffsetDelta(state.lastOffsetDelta());
            record.setTimestamp(state.lastTimestamp());
            record.setCoordinatorEpoch(state.coordinatorEpoch());
            // -1 marks "no open transaction".
            record.setCurrentTxnFirstOffset(state.currentTxnFirstOffset().orElse(-1L));
            serialized.add(record);
        });
        ProducerSnapshot snapshot = new ProducerSnapshot();
        snapshot.setProducerEntries(serialized);

        ByteBuffer buffer = MessageUtil.toVersionPrefixedByteBuffer((short) 1, (Message) snapshot);
        // The CRC covers everything from the producer entries onwards and is stored at CRC_OFFSET.
        long crc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, buffer.limit() - PRODUCER_ENTRIES_OFFSET);
        ByteUtils.writeUnsignedInt(buffer, CRC_OFFSET, crc);

        snapshotFile.maybeOpenForWrite();
        String checksumKey = null;
        if (checksumStoreOpt.isPresent()) {
            checksumKey = snapshotFile.file().getAbsolutePath();
            mayUpdateChecksumStoreEntry(checksumStoreOpt.get(), buffer.duplicate(), checksumKey);
        }
        snapshotFile.write(buffer);
    }

    /**
     * Initializes the checksum store entry for {@code key} and feeds it {@code buffer}, but only
     * when checksums are enabled for this topic and the store protects producer state.
     */
    private void mayUpdateChecksumStoreEntry(E2EChecksumStore checksumStore, ByteBuffer buffer, String key) {
        if (!e2eChecksumEnabledForTopic) {
            return;
        }
        if (!checksumStore.checksumProtectionEnabled(E2EChecksumProtectedObjectType.PRODUCER_STATE)) {
            return;
        }
        checksumStore.store().initializeEntry(key);
        checksumStore.store().update(key, buffer);
    }

    /** @return {@code true} if {@code path} is a regular file whose name ends with ".snapshot" */
    private static boolean isSnapshotFile(Path path) {
        if (!Files.isRegularFile(path)) {
            return false;
        }
        String fileName = path.getFileName().toString();
        return fileName.endsWith(".snapshot");
    }

    /**
     * Lists the snapshot files in {@code dir} with the checksum flag set to {@code true}.
     *
     * @throws IOException if listing the directory fails
     */
    public static List<SnapshotFile> listSnapshotFiles(File dir) throws IOException {
        final boolean e2eChecksumEnabled = true;
        return listSnapshotFiles(dir, e2eChecksumEnabled);
    }

    /**
     * Lists the snapshot files directly inside {@code dir}.
     *
     * @param dir directory to scan
     * @param e2eChecksumEnabledForTopic flag passed to every created {@link SnapshotFile}
     * @return the snapshot files found; an empty list when {@code dir} does not exist or is not a directory
     * @throws IOException if listing the directory fails
     */
    public static List<SnapshotFile> listSnapshotFiles(File dir, boolean e2eChecksumEnabledForTopic) throws IOException {
        if (!dir.exists() || !dir.isDirectory()) {
            return List.of();
        }
        // Files.list holds an open directory handle, so the stream must be closed.
        try (Stream<Path> children = Files.list(dir.toPath())) {
            return children
                    .filter(ProducerStateManager::isSnapshotFile)
                    .map(child -> new SnapshotFile(child.toFile(), e2eChecksumEnabledForTopic))
                    .toList();
        }
    }

    /**
     * Extracts the tenant prefix from a topic name: the text before the first underscore, provided
     * the name starts with "lkc-".
     *
     * @return the prefix, or {@code null} when the name does not start with "lkc-" or has no underscore
     */
    private static String extractTenantPrefix(String name) {
        if (name.startsWith("lkc-")) {
            int underscoreAt = name.indexOf('_');
            if (underscoreAt != -1) {
                return name.substring(0, underscoreAt);
            }
        }
        return null;
    }

    /** Immutable record of a producer considered for least-recently-used eviction. */
    public static class LruProducerInfo {
        /** Id of the candidate producer. */
        public final long producerId;
        /** The producer's last timestamp at the moment it was selected. */
        public final long lastTimeStamp;
        /** Partition whose state manager selected the producer. */
        public final TopicPartition topicPartition;

        /** Captures the given values as-is. */
        public LruProducerInfo(long producerId, long lastTimeStamp, TopicPartition topicPartition) {
            this.topicPartition = topicPartition;
            this.lastTimeStamp = lastTimeStamp;
            this.producerId = producerId;
        }
    }
}

