/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.storage.internals.log;

import io.confluent.kafka.storage.log.AbstractLog;
import io.confluent.kafka.storage.log.CleanParams;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.DigestException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.CleanedTransactionMetadata;
import org.apache.kafka.storage.internals.log.CleanerStats;
import org.apache.kafka.storage.internals.log.ExtendedOffsetMap;
import org.apache.kafka.storage.internals.log.LastRecord;
import org.apache.kafka.storage.internals.log.LocalLogSegmentReadAdapter;
import org.apache.kafka.storage.internals.log.LogCleanerMetrics;
import org.apache.kafka.storage.internals.log.LogCleaningAbortedException;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.apache.kafka.storage.internals.log.LogSegmentOffsetOverflowException;
import org.apache.kafka.storage.internals.log.LogSegmentReadAdapter;
import org.apache.kafka.storage.internals.log.LogToClean;
import org.apache.kafka.storage.internals.log.MergedLogUtils;
import org.apache.kafka.storage.internals.log.OffsetMap;
import org.apache.kafka.storage.internals.log.ValidateLogResult;
import org.apache.kafka.storage.internals.utils.Throttler;
import org.slf4j.Logger;

/**
 * Cleans a log by first building a key-to-latest-offset map over the dirty portion of the log and then
 * rewriting groups of segments into a single replacement segment, keeping only the records that the
 * retention checks in this class decide to keep.
 *
 * <p>An instance owns a pair of mutable I/O buffers that are reallocated by {@code growBuffers} and
 * {@code restoreBuffers}. NOTE(review): nothing in this class synchronizes access to them, so an
 * instance is presumably meant to be used by one thread at a time — confirm against the caller.
 */
public class Cleaner {
    // Passed to record filtering and to batch record iteration.
    private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
    // Logger whose context prefix is "Cleaner <id>: ".
    private final Logger logger;
    // Identifier of this cleaner; only used for the log prefix within this class.
    public final int id;
    // Key -> offset map that is cleared and refilled on every cleaning/validation pass.
    public final OffsetMap offsetMap;
    // Size, in bytes, the read/write buffers are allocated with and restored to.
    private final int ioBufferSize;
    // Lower bound on the growth ceiling of the buffers (see growBuffers, which takes the max with the requested size).
    private final int maxIoBufferSize;
    // Fraction of the offset map's slots that may be filled before the map is treated as full.
    private final double dupBufferLoadFactor;
    // Consulted with the number of bytes read/written; the returned value is recorded as throttled time.
    private final Throttler throttler;
    private final Time time;
    // Invoked with the topic partition before each chunk is read.
    // NOTE(review): presumably throws to abort an in-progress cleaning — confirm against the caller.
    private final Consumer<TopicPartition> checkDone;
    private final LogCleanerMetrics cleanerMetrics;
    // Scratch buffer that chunks of the source segment are read into.
    private ByteBuffer readBuffer;
    // Scratch buffer that retained records are written into before being appended to the destination.
    private ByteBuffer writeBuffer;

    public Cleaner(int id, OffsetMap offsetMap, int ioBufferSize, int maxIoBufferSize, double dupBufferLoadFactor, Throttler throttler, Time time, Consumer<TopicPartition> checkDone, LogCleanerMetrics cleanerMetrics) {
        this.id = id;
        this.offsetMap = offsetMap;
        this.ioBufferSize = ioBufferSize;
        this.maxIoBufferSize = maxIoBufferSize;
        this.dupBufferLoadFactor = dupBufferLoadFactor;
        this.throttler = throttler;
        this.time = time;
        this.checkDone = checkDone;
        this.cleanerMetrics = cleanerMetrics;
        this.logger = new LogContext("Cleaner " + id + ": ").logger(Cleaner.class);
        this.readBuffer = ByteBuffer.allocate(ioBufferSize);
        this.writeBuffer = ByteBuffer.allocate(ioBufferSize);
        assert ((double)offsetMap.slots() * dupBufferLoadFactor > 1.0) : "offset map is too small to fit in even a single message, so log cleaning will never make progress. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads";
    }

    /**
     * Cleans the given log if its dirty range is non-empty and records the resulting shrinkage ratio on the log.
     *
     * @param cleanable description of the log and the offset range to clean
     * @return the offset cleaning ended at (or the first dirty offset if cleaning was skipped) paired with
     *         the statistics of the run
     * @throws IOException     if cleaning fails with an I/O error
     * @throws DigestException if hashing a key for the offset map fails
     */
    public Map.Entry<Long, CleanerStats> clean(CleanParams cleanable) throws IOException, DigestException {
        boolean nothingToClean = cleanable.firstDirtyOffset() >= cleanable.firstUncleanableOffset();
        if (nothingToClean) {
            this.logger.warn("Skip cleaning for log {} because dirty range is not cleanable. firstDirtyOffset={}, firstUncleanableOffset={}", cleanable.log().name(), cleanable.firstDirtyOffset(), cleanable.firstUncleanableOffset());
            CleanerStats emptyStats = new CleanerStats(this.time, this.cleanerMetrics);
            return Map.entry(cleanable.firstDirtyOffset(), emptyStats);
        }

        Map.Entry<Long, CleanerStats> outcome = this.doClean(cleanable, this.time.milliseconds());

        // Shrinkage = share of the bytes read that did not have to be written back.
        // NOTE(review): this is NaN when nothing was read (0 / 0) — confirm updateLastShrinkageRatio tolerates that.
        double writtenFraction = (double) outcome.getValue().bytesWritten / (double) outcome.getValue().bytesRead;
        cleanable.log().updateLastShrinkageRatio(1.0 - writtenFraction);
        return outcome;
    }

    /**
     * Runs one cleaning pass for the given parameters using {@code currentTime} as "now".
     *
     * @param cleanable   description of the log and the offset range to clean
     * @param currentTime current time in milliseconds, used for delete-horizon comparisons
     * @return the end offset of the pass paired with its statistics
     * @throws IOException     if cleaning fails with an I/O error
     * @throws DigestException if hashing a key for the offset map fails
     */
    public Map.Entry<Long, CleanerStats> doClean(CleanParams cleanable, long currentTime) throws IOException, DigestException {
        Map.Entry<Long, CleanerStats> endOffsetAndStats = this.doCleanInternal(cleanable, currentTime);
        return endOffsetAndStats;
    }

    /**
     * Overload of {@code doClean} accepting a {@link LogToClean}; delegates to the same internal routine.
     *
     * @param logToClean  the log and offset range to clean
     * @param currentTime current time in milliseconds, used for delete-horizon comparisons
     * @return the end offset of the pass paired with its statistics
     * @throws IOException     if cleaning fails with an I/O error
     * @throws DigestException if hashing a key for the offset map fails
     */
    public Map.Entry<Long, CleanerStats> doClean(LogToClean logToClean, long currentTime) throws IOException, DigestException {
        Map.Entry<Long, CleanerStats> endOffsetAndStats = this.doCleanInternal(logToClean, currentTime);
        return endOffsetAndStats;
    }

    /**
     * Shared implementation of a cleaning pass: builds the offset map for the dirty range, groups the
     * segments up to the mapped end offset and cleans each group.
     *
     * @param cleanable   description of the log and the offset range to clean
     * @param currentTime current time in milliseconds, forwarded to segment cleaning
     * @return the first offset not covered by the offset map (latest mapped offset + 1) paired with the
     *         statistics of the pass
     * @throws IOException     if cleaning fails with an I/O error
     * @throws DigestException if hashing a key for the offset map fails
     */
    private Map.Entry<Long, CleanerStats> doCleanInternal(CleanParams cleanable, long currentTime) throws IOException, DigestException {
        final AbstractLog log = cleanable.log();
        this.logger.info("Beginning cleaning of log {}", log.name());

        // Legacy delete horizon: last-modified time of the newest segment below the first dirty offset,
        // minus the configured delete retention.
        List<LogSegment> alreadyClean = log.localLogSegments(0L, cleanable.firstDirtyOffset()).stream().toList();
        long legacyDeleteHorizonMs = 0L;
        if (!alreadyClean.isEmpty()) {
            LogSegment newestClean = alreadyClean.get(alreadyClean.size() - 1);
            legacyDeleteHorizonMs = newestClean.lastModified() - log.config().deleteRetentionMs;
        }

        CleanerStats stats = new CleanerStats(this.time, this.cleanerMetrics);
        this.logger.info("Building offset map for {}...", log.name());
        long upperBoundOffset = cleanable.firstUncleanableOffset();
        boolean offsetMapFull = this.buildOffsetMap(log, cleanable.firstDirtyOffset(), upperBoundOffset, this.offsetMap, stats);
        long endOffset = this.offsetMap.latestOffset() + 1L;
        stats.indexDone(offsetMapFull);

        // Only used for the log line below.
        List<LogSegment> cleanableSegments = log.localLogSegments(0L, cleanable.firstUncleanableOffset()).stream().toList();
        long cleanableHorizonMs = 0L;
        if (!cleanableSegments.isEmpty()) {
            cleanableHorizonMs = cleanableSegments.get(cleanableSegments.size() - 1).lastModified();
        }
        this.logger.info("Cleaning log {} (cleaning prior to {}, discarding tombstones prior to upper bound deletion horizon {})...", log.name(), new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs));

        CleanedTransactionMetadata transactionMetadata = new CleanedTransactionMetadata();
        List<List<LogSegment>> groups = this.groupSegmentsBySize(log.localLogSegments(cleanable.startOffset(), endOffset), log.config().segmentSize(), log.config().maxIndexSize, cleanable.firstUncleanableOffset());
        for (List<LogSegment> segmentGroup : groups) {
            this.cleanSegments(log, segmentGroup, this.offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset);
        }

        stats.bufferUtilization = this.offsetMap.utilization();
        stats.allDone();
        return Map.entry(endOffset, stats);
    }

    /**
     * Cleans a group of segments into a single new segment and swaps it into the log in place of the group.
     *
     * <p>If cleaning is aborted with a {@link LogCleaningAbortedException}, the partially written
     * replacement segment is deleted (a failure to delete is attached as a suppressed exception) and the
     * abort is rethrown.
     *
     * @param log                              the log being cleaned
     * @param segments                         the non-empty group of segments to merge and clean
     * @param map                              offset map used to decide which records are superseded
     * @param currentTime                      current time in milliseconds
     * @param stats                            statistics collector for this pass
     * @param transactionMetadata              transaction state carried across the segments of the pass
     * @param legacyDeleteHorizonMs            segments last modified after this time retain legacy deletes
     *                                         and transaction markers
     * @param upperBoundOffsetOfCleaningRound  upper bound offset of the current cleaning round
     * @throws IOException if reading, writing or swapping segments fails
     */
    public void cleanSegments(AbstractLog log, List<LogSegment> segments, OffsetMap map, long currentTime, CleanerStats stats, CleanedTransactionMetadata transactionMetadata, long legacyDeleteHorizonMs, long upperBoundOffsetOfCleaningRound) throws IOException {
        LogSegment cleaned = MergedLogUtils.createNewCleanedSegment(log.dir(), log.config(), segments.get(0).baseOffset());
        transactionMetadata.setCleanedIndex(Optional.of(cleaned.txnIndex()));
        try {
            Iterator<LogSegment> iter = segments.iterator();
            Optional<LogSegment> currentSegmentOpt = Optional.ofNullable(iter.next());
            Map<Long, LastRecord> lastOffsetOfActiveProducers = log.lastRecordsOfActiveProducers();
            while (currentSegmentOpt.isPresent()) {
                LogSegment currentSegment = currentSegmentOpt.get();
                Optional<LogSegment> nextSegmentOpt = iter.hasNext() ? Optional.of(iter.next()) : Optional.empty();
                long startOffset = currentSegment.baseOffset();
                // Only read the segment's next offset when there is no following segment; Optional.orElse
                // would evaluate readNextOffset() for every segment even though the value is discarded.
                long upperBoundOffset = nextSegmentOpt.isPresent()
                    ? nextSegmentOpt.get().baseOffset()
                    : currentSegment.readNextOffset();
                List<AbortedTxn> abortedTransactions = this.collectAbortedTransactions(log, startOffset, upperBoundOffset);
                transactionMetadata.addAbortedTransactions(abortedTransactions);
                boolean retainLegacyDeletesAndTxnMarkers = currentSegment.lastModified() > legacyDeleteHorizonMs;
                this.logger.info("Cleaning {} in log {} into {} with an upper bound deletion horizon {} computed from the segment last modified time of {}, {} deletes.", currentSegment, log.name(), cleaned.baseOffset(), legacyDeleteHorizonMs, currentSegment.lastModified(), retainLegacyDeletesAndTxnMarkers ? "retaining" : "discarding");
                try {
                    LocalLogSegmentReadAdapter segmentAdapter = new LocalLogSegmentReadAdapter(log, currentSegment);
                    this.cleanInto(log.topicPartition(), segmentAdapter, cleaned, map, retainLegacyDeletesAndTxnMarkers, log.config().deleteRetentionMs, log.config().maxMessageSize(), transactionMetadata, lastOffsetOfActiveProducers, upperBoundOffsetOfCleaningRound, stats, currentTime, log.config().confluentLogConfig().logCleanerTimestampValidationEnable);
                }
                catch (LogSegmentOffsetOverflowException e) {
                    // Split the offending segment and abort this round.
                    this.logger.info("Caught segment overflow error during cleaning: {}", e.getMessage());
                    log.splitOverflowedSegment(currentSegment);
                    throw new LogCleaningAbortedException();
                }
                currentSegmentOpt = nextSegmentOpt;
            }
            cleaned.onBecomeInactiveSegment();
            cleaned.flush();
            // The replacement segment inherits the last-modified time of the newest segment in the group.
            long modified = segments.get(segments.size() - 1).lastModified();
            cleaned.setLastModified(modified);
            this.logger.info("Swapping in cleaned segment {} for segment(s) {} in log {}", cleaned, segments, log);
            log.replaceSegments(List.of(cleaned), segments);
        }
        catch (LogCleaningAbortedException e) {
            try {
                cleaned.deleteIfExists();
            }
            catch (Exception deleteException) {
                e.addSuppressed(deleteException);
            }
            throw e;
        }
    }

    /**
     * Copies the records of {@code source} that survive the cleaning filter into {@code dest}.
     *
     * <p>The source is read chunk by chunk into the read buffer, filtered into the write buffer and any
     * retained bytes are appended to {@code dest}. When a non-empty chunk yields no consumed bytes, the
     * buffers are grown (or the operation fails) before retrying. The buffers are restored to their
     * initial size once the whole source has been processed.
     *
     * @param topicPartition                   partition being cleaned, passed to the {@code checkDone} callback
     * @param source                           segment to read from
     * @param dest                             segment retained records are appended to
     * @param map                              offset map used to decide which records are superseded
     * @param retainLegacyDeletesAndTxnMarkers whether tombstones in batches with magic below 2 are retained
     * @param deleteRetentionMs                delete retention in milliseconds; zero disables tombstone retention
     *                                         for batches with magic 2 or above
     * @param maxLogMessageSize                configured maximum message size, used when growing buffers
     * @param transactionMetadata              transaction state for the cleaning pass
     * @param lastRecordsOfActiveProducers     last record per producer id; batches matching it are kept (possibly empty)
     * @param upperBoundOffsetOfCleaningRound  a batch whose next offset equals this value is kept (possibly empty)
     * @param stats                            statistics collector
     * @param currentTime                      current time in milliseconds
     * @param validateSourceTimestamps         forwarded to {@code MemoryRecords.filterTo}
     * @throws IOException if reading the source or appending to the destination fails
     */
    public void cleanInto(TopicPartition topicPartition, LogSegmentReadAdapter source, LogSegment dest, final OffsetMap map, final boolean retainLegacyDeletesAndTxnMarkers, long deleteRetentionMs, int maxLogMessageSize, final CleanedTransactionMetadata transactionMetadata, final Map<Long, LastRecord> lastRecordsOfActiveProducers, final long upperBoundOffsetOfCleaningRound, final CleanerStats stats, long currentTime, boolean validateSourceTimestamps) throws IOException {
        MemoryRecords.RecordFilter logCleanerFilter = new MemoryRecords.RecordFilter(currentTime, deleteRetentionMs) {
            // Decision taken in checkBatchRetention for the batch whose records are filtered next.
            private boolean discardBatchRecords = false;

            @Override
            public MemoryRecords.RecordFilter.BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                boolean canDiscardBatch = Cleaner.this.shouldDiscardBatch(batch, transactionMetadata);

                if (batch.isControlBatch()) {
                    // A discardable marker is only dropped once its delete horizon has passed or,
                    // when it has no horizon, if delete retention is zero.
                    if (batch.deleteHorizonMs().isPresent()) {
                        this.discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().getAsLong() <= this.currentTime;
                    } else {
                        this.discardBatchRecords = canDiscardBatch && this.deleteRetentionMs == 0L;
                    }
                } else {
                    this.discardBatchRecords = canDiscardBatch;
                }

                // Is this the batch holding the last record of a still-active producer?
                boolean isLastBatchFromProducer = batch.hasProducerId()
                    && Optional.ofNullable(lastRecordsOfActiveProducers.get(batch.producerId()))
                        .map(lastRecord -> {
                            if (lastRecord.lastDataOffset.isPresent()) {
                                return batch.lastOffset() == lastRecord.lastDataOffset.getAsLong();
                            }
                            return batch.isControlBatch() && batch.producerEpoch() == lastRecord.producerEpoch && batch.maxTimestamp() >= lastRecord.lastTimestampMs;
                        })
                        .orElse(false);

                MemoryRecords.RecordFilter.BatchRetention batchRetention;
                if (isLastBatchFromProducer) {
                    batchRetention = MemoryRecords.RecordFilter.BatchRetention.RETAIN_EMPTY;
                } else if (batch.nextOffset() == upperBoundOffsetOfCleaningRound) {
                    // Keep the last batch of the cleaning round even if it ends up empty.
                    batchRetention = MemoryRecords.RecordFilter.BatchRetention.RETAIN_EMPTY;
                } else if (this.discardBatchRecords) {
                    batchRetention = MemoryRecords.RecordFilter.BatchRetention.DELETE;
                } else {
                    batchRetention = MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY;
                }
                return new MemoryRecords.RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch && batch.isControlBatch());
            }

            @Override
            public boolean shouldRetainRecord(RecordBatch batch, Record record) {
                if (this.discardBatchRecords) {
                    return false;
                }
                if (batch.isControlBatch()) {
                    return true;
                }
                try {
                    return Cleaner.this.shouldRetainRecord(map, retainLegacyDeletesAndTxnMarkers, batch, record, stats, this.currentTime, this.deleteRetentionMs);
                }
                catch (DigestException e) {
                    // The filter callback cannot throw checked exceptions.
                    throw new RuntimeException(e);
                }
            }

            // NOTE(review): presumably overrides a retention hook declared on RecordFilter — confirm before adding @Override.
            protected void onBatchRetained(RecordBatch batch) {
                transactionMetadata.onBatchRetained(batch);
            }
        };

        int position = source.startPosition();
        while (position < source.endPosition()) {
            this.checkDone.accept(topicPartition);
            this.readBuffer.clear();
            this.writeBuffer.clear();
            source.readBytes(this.readBuffer, position);
            MemoryRecords records = MemoryRecords.readableRecords(this.readBuffer);
            stats.recordThrottledTime(this.throttler.maybeThrottle(records.sizeInBytes()));
            MemoryRecords.FilterResult result = records.filterTo(logCleanerFilter, this.writeBuffer, this.decompressionBufferSupplier, validateSourceTimestamps);
            stats.readMessages(result.messagesRead(), result.bytesRead(), result.keyBytesRead());
            stats.recopyMessages(result.messagesRetained(), result.bytesRetained());
            position += result.bytesRead();

            ByteBuffer outputBuffer = result.outputBuffer();
            if (outputBuffer.position() > 0) {
                outputBuffer.flip();
                MemoryRecords retained = MemoryRecords.readableRecords(outputBuffer);
                dest.append(result.maxOffset(), retained);
                stats.recordThrottledTime(this.throttler.maybeThrottle(outputBuffer.limit()));
            }

            // Data was read but nothing was consumed: the next batch does not fit in the buffer.
            if (this.readBuffer.limit() > 0 && result.bytesRead() == 0) {
                this.growBuffersOrFail(source, position, maxLogMessageSize, records);
            }
        }
        this.restoreBuffers();
    }

    /**
     * Grows the I/O buffers after a read made no progress, or fails if the data cannot be valid.
     *
     * <p>While the read buffer is smaller than {@code maxLogMessageSize} the buffers are simply grown
     * towards that size. Once it is at least that large, the size of the next batch is inspected and the
     * buffers are grown towards that batch size, provided the batch is plausible.
     *
     * @param source            segment being read, used for diagnostics and the remaining byte count
     * @param position          position in the segment at which the read stalled
     * @param maxLogMessageSize configured maximum message size
     * @param memoryRecords     the records view over the stalled read
     * @throws IllegalStateException  if the next batch size is unknown, non-positive, or fits the current
     *                                buffer even though nothing was processed
     * @throws CorruptRecordException if the next batch claims more bytes than remain in the segment
     */
    private void growBuffersOrFail(LogSegmentReadAdapter source, int position, int maxLogMessageSize, MemoryRecords memoryRecords) {
        if (this.readBuffer.capacity() < maxLogMessageSize) {
            this.growBuffers(maxLogMessageSize);
            return;
        }

        Integer nextBatchSize = memoryRecords.firstBatchSize();
        String logDesc = String.format("log segment %s at position %d", source, position);
        if (nextBatchSize == null) {
            throw new IllegalStateException("Could not determine next batch size for " + logDesc);
        }
        if (nextBatchSize <= 0) {
            throw new IllegalStateException("Invalid batch size " + nextBatchSize + " for " + logDesc);
        }
        if (nextBatchSize <= this.readBuffer.capacity()) {
            throw new IllegalStateException("Batch size " + nextBatchSize + " < buffer size " + this.readBuffer.capacity() + ", but not processed for " + logDesc);
        }
        int bytesLeft = source.endPosition() - position;
        if (nextBatchSize > bytesLeft) {
            throw new CorruptRecordException("Log segment may be corrupt, batch size " + nextBatchSize + " > " + bytesLeft + " bytes left in segment for " + logDesc);
        }
        this.growBuffers(nextBatchSize);
    }

    /**
     * Feeds the batch to the transaction metadata and reports whether it may be discarded.
     *
     * @param batch               the batch that was read
     * @param transactionMetadata transaction state for the cleaning pass
     * @return the result of {@code onControlBatchRead} for control batches, of {@code onBatchRead} otherwise
     */
    private boolean shouldDiscardBatch(RecordBatch batch, CleanedTransactionMetadata transactionMetadata) {
        return batch.isControlBatch()
            ? transactionMetadata.onControlBatchRead(batch)
            : transactionMetadata.onBatchRead(batch);
    }

    /**
     * Decides whether a single record survives cleaning.
     *
     * <p>Records beyond the latest mapped offset are always kept. Records without a key are counted as
     * invalid and dropped. A keyed record is kept only if its offset is at least the offset stored for
     * its key in the map and it either has a value or its tombstone is still within retention.
     *
     * @param map                           offset map holding the latest offset per key
     * @param retainDeletesForLegacyRecords whether tombstones in batches with magic below 2 are retained
     * @param batch                         batch the record belongs to
     * @param record                        record under consideration
     * @param stats                         statistics collector (invalid messages are counted here)
     * @param currentTime                   current time in milliseconds
     * @param deleteRetentionMs             delete retention in milliseconds; zero means tombstones in
     *                                      batches with magic 2 or above are not retained
     * @return {@code true} if the record must be copied to the cleaned segment
     * @throws DigestException if hashing the key fails
     */
    private boolean shouldRetainRecord(OffsetMap map, boolean retainDeletesForLegacyRecords, RecordBatch batch, Record record, CleanerStats stats, long currentTime, long deleteRetentionMs) throws DigestException {
        if (record.offset() > map.latestOffset()) {
            // Not covered by the offset map.
            return true;
        }
        if (!record.hasKey()) {
            stats.invalidMessage();
            return false;
        }

        ByteBuffer key = record.key();
        long foundOffset = map.get(key);
        boolean latestOffsetForKey = record.offset() >= foundOffset;

        boolean shouldRetainDeletes;
        if (batch.magic() < 2) {
            shouldRetainDeletes = retainDeletesForLegacyRecords;
        } else if (deleteRetentionMs == 0L) {
            shouldRetainDeletes = false;
        } else {
            // No horizon set yet, or the horizon has not been reached.
            shouldRetainDeletes = batch.deleteHorizonMs().isEmpty() || currentTime < batch.deleteHorizonMs().getAsLong();
        }

        boolean isRetainedValue = record.hasValue() || shouldRetainDeletes;
        return latestOffsetForKey && isRetainedValue;
    }

    /**
     * Doubles the size of both I/O buffers, capped at the larger of {@code maxLogMessageSize} and the
     * configured maximum I/O buffer size. The old buffer contents are not carried over.
     *
     * @param maxLogMessageSize size the buffers must be allowed to reach
     * @throws IllegalStateException if either buffer is already at the cap
     */
    private void growBuffers(int maxLogMessageSize) {
        final int ceiling = Math.max(maxLogMessageSize, this.maxIoBufferSize);
        final int currentReadCapacity = this.readBuffer.capacity();
        if (currentReadCapacity >= ceiling || this.writeBuffer.capacity() >= ceiling) {
            throw new IllegalStateException("This log contains a message larger than maximum allowable size of " + ceiling + ".");
        }
        final int grownSize = Math.min(currentReadCapacity * 2, ceiling);
        this.logger.info("Growing cleaner I/O buffers from {} bytes to {} bytes.", currentReadCapacity, grownSize);
        this.readBuffer = ByteBuffer.allocate(grownSize);
        this.writeBuffer = ByteBuffer.allocate(grownSize);
    }

    /**
     * Replaces any I/O buffer that has grown beyond the initial size with a fresh buffer of the initial size.
     */
    private void restoreBuffers() {
        final int initialSize = this.ioBufferSize;
        if (initialSize < this.readBuffer.capacity()) {
            this.readBuffer = ByteBuffer.allocate(initialSize);
        }
        if (initialSize < this.writeBuffer.capacity()) {
            this.writeBuffer = ByteBuffer.allocate(initialSize);
        }
    }

    /**
     * Partitions the segments, in iteration order, into consecutive groups that can each be merged into
     * one cleaned segment.
     *
     * <p>A segment joins the current group only while the group's combined log size stays within
     * {@code maxSize}, its combined offset-index and time-index sizes each stay within
     * {@code maxIndexSize}, and — unless the candidate segment is empty — the distance between the
     * group's first base offset and the candidate's last offset does not exceed {@code Integer.MAX_VALUE}.
     *
     * <p>Groups are built by appending and advancing a cursor rather than by prepending to lists and
     * reversing them afterwards, which avoids repeated element shifting.
     *
     * @param segmentsCollection     segments to group
     * @param maxSize                maximum combined log size of a group, in bytes
     * @param maxIndexSize           maximum combined size of each index of a group, in bytes
     * @param firstUncleanableOffset used to derive the last offset of the final segment
     * @return the groups, in the same order as the input; each group lists its segments in input order
     * @throws IOException if a segment's index cannot be accessed
     */
    public List<List<LogSegment>> groupSegmentsBySize(Collection<LogSegment> segmentsCollection, int maxSize, int maxIndexSize, long firstUncleanableOffset) throws IOException {
        List<LogSegment> segments = new ArrayList<>(segmentsCollection);
        List<List<LogSegment>> grouped = new ArrayList<>();
        int total = segments.size();
        int next = 0;
        while (next < total) {
            // Every group starts with the next unassigned segment, regardless of its size.
            LogSegment first = segments.get(next++);
            List<LogSegment> group = new ArrayList<>();
            group.add(first);
            long logSize = first.size();
            long indexSize = first.offsetIndex().sizeInBytes();
            long timeIndexSize = first.timeIndex().sizeInBytes();

            while (next < total) {
                LogSegment candidate = segments.get(next);
                boolean fits = logSize + (long) candidate.size() <= (long) maxSize
                    && indexSize + (long) candidate.offsetIndex().sizeInBytes() <= (long) maxIndexSize
                    && timeIndexSize + (long) candidate.timeIndex().sizeInBytes() <= (long) maxIndexSize
                    // An empty candidate skips the offset-range check.
                    && (candidate.size() == 0
                        || this.lastOffsetForFirstSegment(segments.subList(next, total), firstUncleanableOffset) - first.baseOffset() <= Integer.MAX_VALUE);
                if (!fits) {
                    break;
                }
                group.add(candidate);
                logSize += (long) candidate.size();
                indexSize += (long) candidate.offsetIndex().sizeInBytes();
                timeIndexSize += (long) candidate.timeIndex().sizeInBytes();
                next++;
            }
            grouped.add(group);
        }
        return grouped;
    }

    /**
     * Returns the last offset that the first segment of {@code segs} can contain: one less than the base
     * offset of the following segment, or one less than {@code firstUncleanableOffset} when there is no
     * following segment.
     *
     * @param segs                   segments, the first of which is the one of interest
     * @param firstUncleanableOffset fallback upper bound when {@code segs} has at most one element
     * @return the inclusive last offset of the first segment
     */
    private long lastOffsetForFirstSegment(List<LogSegment> segs, long firstUncleanableOffset) {
        boolean hasFollowingSegment = segs.size() > 1;
        long exclusiveUpperBound = hasFollowingSegment ? segs.get(1).baseOffset() : firstUncleanableOffset;
        return exclusiveUpperBound - 1L;
    }

    /**
     * Builds the offset map for the local segments of {@code log} in the offset range {@code [start, end)}.
     *
     * @param log   the log to scan
     * @param start first offset to include
     * @param end   exclusive upper bound of the range
     * @param map   the map to clear and fill
     * @param stats statistics collector
     * @return {@code true} if the map filled up before the whole range was mapped
     * @throws DigestException if hashing a key fails
     */
    public boolean buildOffsetMap(AbstractLog log, long start, long end, OffsetMap map, CleanerStats stats) throws DigestException {
        return this.buildOffsetMap(log, this.logSegments(log, start, end), start, end, map, stats);
    }

    /**
     * Clears {@code map} and fills it from the given segments, one segment at a time, until all segments
     * are mapped or the map is full.
     *
     * @param log      the log the segments belong to
     * @param segments segments covering the dirty range, in offset order
     * @param start    first offset to include
     * @param end      exclusive upper bound of the range; also the upper bound for the last segment
     * @param map      the map to clear and fill
     * @param stats    statistics collector
     * @return {@code true} if the map filled up before every segment was fully mapped
     * @throws DigestException             if hashing a key fails
     * @throws LogCleaningAbortedException if a segment lies entirely below the log start offset
     */
    private boolean buildOffsetMap(AbstractLog log, List<LogSegmentReadAdapter> segments, long start, long end, OffsetMap map, CleanerStats stats) throws DigestException {
        map.clear();
        List<LogSegmentReadAdapter> dirty = new ArrayList<>(segments);
        int segmentCount = dirty.size();

        // upperBounds.get(i) is the base offset of segment i + 1, or `end` for the last segment.
        List<Long> upperBounds = new ArrayList<>();
        if (segmentCount > 0) {
            for (LogSegmentReadAdapter following : dirty.subList(1, segmentCount)) {
                upperBounds.add(following.baseOffset());
            }
            upperBounds.add(end);
        }

        this.logger.info("Building offset map for log {} for {} segments in offset range [{}, {}).", log.name(), segmentCount, start, end);
        CleanedTransactionMetadata transactionMetadata = new CleanedTransactionMetadata();
        transactionMetadata.addAbortedTransactions(this.collectAbortedTransactions(log, start, end));

        boolean full = false;
        for (int index = 0; index < segmentCount; index++) {
            LogSegmentReadAdapter segment = dirty.get(index);
            long nextSegmentStartOffset = upperBounds.get(index);
            this.checkDone.accept(log.topicPartition());

            if (segment.endOffset() < log.logStartOffset() && segment.baseOffset() < log.logStartOffset()) {
                this.logger.warn("End offset of segment: {} is smaller than log start offset {}. Abort current cleaning operation and will be retried", segment, log.logStartOffset());
                throw new LogCleaningAbortedException();
            }

            this.logger.info("Building offset map for segment {}/{}: {}", index + 1, segmentCount, segment);
            full = this.buildOffsetMapForSegment(log.topicPartition(), segment, map, start, nextSegmentStartOffset, log.config().maxMessageSize(), transactionMetadata, stats);
            if (full) {
                // `index` segments were completed before this one.
                this.logger.info("Offset map is full, {} segments fully mapped, segment with base offset {} is partially mapped", index, segment.baseOffset());
                break;
            }
        }
        return full;
    }

    /**
     * Adds the keyed records of one segment to the offset map.
     *
     * <p>The segment is read chunk by chunk. Control batches and aborted batches only update the
     * transaction metadata and statistics; for every other batch each keyed record at or after
     * {@code startOffset} is put into the map. Scanning stops once the map's latest offset reaches
     * {@code upperBoundOffset - 1}.
     *
     * @param topicPartition      partition being cleaned, passed to the {@code checkDone} callback
     * @param source              segment to read
     * @param map                 offset map to fill
     * @param startOffset         records below this offset are not added to the map
     * @param upperBoundOffset    exclusive upper bound of offsets to map for this segment
     * @param maxLogMessageSize   configured maximum message size, used when growing buffers
     * @param transactionMetadata transaction state for this map-building pass
     * @param stats               statistics collector
     * @return {@code true} if the map reached its desired maximum size before the segment was fully
     *         mapped (in which case the buffers are not restored here), {@code false} otherwise
     * @throws DigestException if hashing a key fails
     */
    private boolean buildOffsetMapForSegment(TopicPartition topicPartition, LogSegmentReadAdapter source, OffsetMap map, long startOffset, long upperBoundOffset, int maxLogMessageSize, CleanedTransactionMetadata transactionMetadata, CleanerStats stats) throws DigestException {
        int position = source.startPosition();
        int maxDesiredMapSize = (int)((double)map.slots() * this.dupBufferLoadFactor);
        while (position < source.endPosition() && map.latestOffset() < upperBoundOffset - 1L) {
            this.checkDone.accept(topicPartition);
            this.readBuffer.clear();
            source.readBytes(this.readBuffer, position);
            MemoryRecords records = MemoryRecords.readableRecords(this.readBuffer);
            stats.recordThrottledTime(this.throttler.maybeThrottle(records.sizeInBytes()));
            int startPosition = position;

            Iterator<? extends RecordBatch> batchIter = records.batches().iterator();
            while (batchIter.hasNext() && map.latestOffset() < upperBoundOffset - 1L) {
                RecordBatch batch = batchIter.next();
                if (batch.isControlBatch()) {
                    transactionMetadata.onControlBatchRead(batch);
                    stats.indexMessagesRead(1);
                } else if (transactionMetadata.onBatchRead(batch)) {
                    // Aborted batch: its records are not mapped, only counted.
                    stats.indexMessagesRead(batch.countOrNull());
                } else {
                    // Explicit iteration: a bare lambda cannot be the target of an enhanced for loop.
                    try (CloseableIterator<Record> recordsIterator = batch.streamingIterator(this.decompressionBufferSupplier)) {
                        while (recordsIterator.hasNext()) {
                            Record record = recordsIterator.next();
                            if (record.hasKey() && record.offset() >= startOffset) {
                                if (map.size() >= maxDesiredMapSize) {
                                    // Map is full; the caller treats this segment as partially mapped.
                                    return true;
                                }
                                map.put(record.key(), record.offset(), !record.hasValue());
                            }
                            stats.indexMessagesRead(1);
                        }
                    }
                }
                if (batch.lastOffset() >= startOffset) {
                    map.updateLatestOffset(batch.lastOffset());
                }
            }

            int bytesRead = records.validBytes();
            stats.indexBytesRead(bytesRead);
            position += bytesRead;
            // No progress means the next batch does not fit in the read buffer.
            if (position == startPosition) {
                this.growBuffersOrFail(source, position, maxLogMessageSize, records);
            }
        }
        // The whole range of this segment has been covered, even if its last offsets held no records.
        map.updateLatestOffset(upperBoundOffset - 1L);
        this.restoreBuffers();
        return false;
    }

    /**
     * Checks that the log data in {@code [start, end)} is consistent with {@code baseOffsetMap}.
     *
     * <p>Every segment in the range is scanned; records matching the offset stored in the base map are
     * counted, and keys that are missing from the base map or newer than it are collected in a diff map
     * backed by this cleaner's offset map. Validation passes when the number of matched records equals
     * the number of non-tombstone entries of the base map and the diff map contains only tombstones.
     *
     * @param log           the log to validate
     * @param start         first offset of the range
     * @param end           exclusive upper bound of the range
     * @param baseOffsetMap the map the log is validated against
     * @param stats         statistics collector
     * @return {@code Aborted} if the diff map filled up, {@code Failed} on inconsistency, otherwise {@code Passed}
     * @throws DigestException if hashing a key fails
     */
    public ValidateLogResult validateLog(AbstractLog log, long start, long end, ExtendedOffsetMap baseOffsetMap, CleanerStats stats) throws DigestException {
        this.offsetMap.clear();
        ExtendedOffsetMap diffOffsetMap = new ExtendedOffsetMap(this.offsetMap);
        List<LogSegmentReadAdapter> segments = this.logSegments(log, start, end);
        CleanedTransactionMetadata transactionMetadata = new CleanedTransactionMetadata();
        transactionMetadata.addAbortedTransactions(this.collectAbortedTransactions(log, start, end));

        long totalValidatedCount = 0L;
        long validatedCount = 0L;
        for (LogSegmentReadAdapter segment : segments) {
            // NOTE(review): 0 is passed as the maximum message size, so a stalled read always sizes the
            // buffers from the next batch — confirm this is intended.
            validatedCount = this.validateLogSegment(log.topicPartition(), segment, baseOffsetMap, diffOffsetMap, start, 0, transactionMetadata, stats);
            if (validatedCount < 0L) {
                // A negative count signals that the diff map is full; remaining segments are not scanned.
                break;
            }
            totalValidatedCount += validatedCount;
        }

        if (validatedCount < 0L) {
            this.logger.warn("{}: log validation terminated prematurely because diff offset map is full. baseOffsetMap.size: {}, diffOffsetMap.size={}, diffOffsetMap.tombstoneCount={}. found record count: {}", log.topicPartition(), baseOffsetMap.size(), diffOffsetMap.size(), diffOffsetMap.tombstoneCount(), totalValidatedCount);
            return ValidateLogResult.Aborted;
        }

        boolean countMatches = totalValidatedCount == (long)(baseOffsetMap.size() - baseOffsetMap.tombstoneCount());
        boolean consistent = countMatches && diffOffsetMap.size() == diffOffsetMap.tombstoneCount();
        if (!consistent) {
            this.logger.error("{}: log validation failed. Log data in offset range [{}, {}) is inconsistent with baseOffsetMap. baseOffsetMap.size: {}, baseOffsetMap.tombstoneCount={}. diffOffsetMap.size={}, diffOffsetMap.tombstoneCount={}. found record count: {}", log.topicPartition(), start, end, baseOffsetMap.size(), baseOffsetMap.tombstoneCount(), diffOffsetMap.size(), diffOffsetMap.tombstoneCount(), totalValidatedCount);
            return ValidateLogResult.Failed;
        }
        return ValidateLogResult.Passed;
    }

    /**
     * Validates the keyed records of one segment against {@code baseMap}.
     *
     * <p>For every keyed record at or after {@code startOffset} in a batch that is neither a control
     * batch nor aborted:
     * <ul>
     *   <li>if the key is unknown to {@code baseMap} (lookup returns -1), the record is added to {@code diffMap};</li>
     *   <li>if the record's offset equals the base offset, it is counted when it has a value;</li>
     *   <li>if the record's offset is greater than the base offset, it is added to {@code diffMap};</li>
     *   <li>records older than the base offset are ignored.</li>
     * </ul>
     *
     * @param topicPartition      partition being validated, passed to the {@code checkDone} callback
     * @param source              segment to read
     * @param baseMap             map the segment is validated against
     * @param diffMap             collects keys missing from, or newer than, {@code baseMap}
     * @param startOffset         records below this offset are ignored
     * @param maxLogMessageSize   maximum message size used when growing buffers
     * @param transactionMetadata transaction state for this validation pass
     * @param stats               statistics collector
     * @return the number of records matching the base map, or {@code -1} if {@code diffMap} reached its
     *         maximum size (in which case the buffers are not restored here)
     * @throws DigestException if hashing a key fails
     */
    private long validateLogSegment(TopicPartition topicPartition, LogSegmentReadAdapter source, ExtendedOffsetMap baseMap, ExtendedOffsetMap diffMap, long startOffset, int maxLogMessageSize, CleanedTransactionMetadata transactionMetadata, CleanerStats stats) throws DigestException {
        int diffMapMaxSize = (int)((double)diffMap.slots() * this.dupBufferLoadFactor);
        long validatedRecordCount = 0L;
        int position = source.startPosition();
        while (position < source.endPosition()) {
            this.checkDone.accept(topicPartition);
            this.readBuffer.clear();
            source.readBytes(this.readBuffer, position);
            MemoryRecords records = MemoryRecords.readableRecords(this.readBuffer);
            stats.recordThrottledTime(this.throttler.maybeThrottle(records.sizeInBytes()));
            int startPosition = position;

            for (RecordBatch batch : records.batches()) {
                if (batch.isControlBatch()) {
                    transactionMetadata.onControlBatchRead(batch);
                    stats.indexMessagesRead(1);
                    continue;
                }
                boolean isAborted = transactionMetadata.onBatchRead(batch);
                if (isAborted) {
                    stats.indexMessagesRead(batch.countOrNull());
                    continue;
                }
                // try-with-resources closes the iterator on every exit path, including the early returns.
                try (CloseableIterator<Record> recordsIterator = batch.streamingIterator(this.decompressionBufferSupplier)) {
                    while (recordsIterator.hasNext()) {
                        Record record = recordsIterator.next();
                        if (record.hasKey() && record.offset() >= startOffset) {
                            long off = baseMap.get(record.key());
                            if (off == -1L) {
                                // Key unknown to the base map.
                                if (diffMap.size() >= diffMapMaxSize) {
                                    return -1L;
                                }
                                diffMap.put(record.key(), record.offset(), !record.hasValue());
                            } else if (record.offset() == off) {
                                if (record.hasValue()) {
                                    ++validatedRecordCount;
                                }
                            } else if (record.offset() > off) {
                                // Newer than what the base map knows about.
                                if (diffMap.size() >= diffMapMaxSize) {
                                    return -1L;
                                }
                                diffMap.put(record.key(), record.offset(), !record.hasValue());
                            }
                        }
                        stats.indexMessagesRead(1);
                    }
                }
            }

            int bytesRead = records.validBytes();
            stats.indexBytesRead(bytesRead);
            position += bytesRead;
            // No progress means the next batch does not fit in the read buffer.
            if (position == startPosition) {
                this.growBuffersOrFail(source, position, maxLogMessageSize, records);
            }
        }
        this.restoreBuffers();
        return validatedRecordCount;
    }

    /**
     * Wraps the local segments of {@code log} in the offset range {@code [from, to)} in read adapters and
     * keeps those whose end offset is at least {@code from}.
     *
     * @param log  the log whose segments are wanted
     * @param from lower bound of the range; adapters ending before it are dropped
     * @param to   upper bound passed to {@code localLogSegments}
     * @return an unmodifiable list of adapters in segment order
     */
    public List<LogSegmentReadAdapter> logSegments(AbstractLog log, long from, long to) {
        // Typed by the interface so that the stream result matches the declared return type.
        List<LogSegmentReadAdapter> adapters = new ArrayList<>();
        for (LogSegment segment : log.localLogSegments(from, to)) {
            adapters.add(new LocalLogSegmentReadAdapter(log, segment));
        }
        return adapters.stream().filter(adapter -> adapter.endOffset() >= from).toList();
    }

    /**
     * Collects the aborted transactions of {@code log} for the offset range given by {@code start} and {@code end}.
     *
     * @param log   the log to query
     * @param start start offset of the range
     * @param end   end offset of the range
     * @return the aborted transactions reported by the log (the log is queried with its third argument set to {@code false})
     */
    public List<AbortedTxn> collectAbortedTransactions(AbstractLog log, long start, long end) {
        List<AbortedTxn> abortedTransactions = log.collectAbortedTransactions(start, end, false);
        return abortedTransactions;
    }
}

