package kafka.log;

import java.io.File;
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.checkpoints.OffsetCheckpointFile;
import kafka.server.checkpoints.OffsetCheckpointFile$;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogAppendInfo;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import scala.$less$colon$less$;
import scala.MatchError;
import scala.None$;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.ArrayOps$;
import scala.collection.IndexedSeqOps;
import scala.collection.Iterable;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.StringOps$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.math.Numeric$IntIsIntegral$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong;
import scala.runtime.RichLong$;

/* compiled from: LogCleanerParameterizedIntegrationTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\tEb\u0001\u0002\u000e\u001c\u0001\u0001BQ!\n\u0001\u0005\u0002\u0019Bq\u0001\u000b\u0001C\u0002\u0013\u0005\u0011\u0006\u0003\u00048\u0001\u0001\u0006IA\u000b\u0005\bq\u0001\u0011\r\u0011\"\u0001:\u0011\u00191\u0005\u0001)A\u0005u!)q\t\u0001C\u0001\u0011\"9\u0011q\u000b\u0001\u0005\u0002\u0005e\u0003bBA1\u0001\u0011\u0005\u00111\r\u0005\b\u0003_\u0002A\u0011AA9\u0011\u001d\tI\b\u0001C\u0001\u0003wBq!a!\u0001\t\u0013\t)\tC\u0004\u00026\u0002!I!a.\t\u000f\u0005m\u0007\u0001\"\u0003\u0002^\"9\u0011Q\u001f\u0001\u0005\n\u0005]\bbBA~\u0001\u0011%\u0011Q \u0005\n\u00053\u0001\u0011\u0013!C\u0005\u000579Q!Z\u000e\t\u0002\u00194QAG\u000e\t\u0002\u001dDQ!\n\n\u0005\u0002-4A\u0001\u001c\n\u0001[\")Q\u0005\u0006C\u0001s\")A\u0010\u0006C!{\u001a1\u0011q\u0007\n\u0001\u0003sAa!J\f\u0005\u0002\u0005m\u0002B\u0002?\u0018\t\u0003\nyD\u0001\u0014M_\u001e\u001cE.Z1oKJ\u0004\u0016M]1nKR,'/\u001b>fI&sG/Z4sCRLwN\u001c+fgRT!\u0001H\u000f\u0002\u00071|wMC\u0001\u001f\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u0011\u0011\u0005\t\u001aS\"A\u000e\n\u0005\u0011Z\"!I!cgR\u0014\u0018m\u0019;M_\u001e\u001cE.Z1oKJLe\u000e^3he\u0006$\u0018n\u001c8UKN$\u0018A\u0002\u001fj]&$h\bF\u0001(!\t\u0011\u0003!\u0001\u0003uS6,W#\u0001\u0016\u0011\u0005-*T\"\u0001\u0017\u000b\u00055r\u0013\u0001B;uS2T!a\f\u0019\u0002\rM,'O^3s\u0015\tq\u0012G\u0003\u00023g\u00051\u0011\r]1dQ\u0016T\u0011\u0001N\u0001\u0004_J<\u0017B\u0001\u001c-\u0005!iunY6US6,\u0017!\u0002;j[\u0016\u0004\u0013a\u0004;pa&\u001c\u0007+\u0019:uSRLwN\\:\u0016\u0003i\u00022a\u000f 
A\u001b\u0005a$\"A\u001f\u0002\u000bM\u001c\u0017\r\\1\n\u0005}b$!B!se\u0006L\bCA!E\u001b\u0005\u0011%BA\"1\u0003\u0019\u0019w.\\7p]&\u0011QI\u0011\u0002\u000f)>\u0004\u0018n\u0019)beRLG/[8o\u0003A!x\u000e]5d!\u0006\u0014H/\u001b;j_:\u001c\b%A\u0006dY\u0016\fg.\u001a:UKN$HCA%M!\tY$*\u0003\u0002Ly\t!QK\\5u\u0011\u0015ie\u00011\u0001O\u0003\u0015\u0019w\u000eZ3d!\ty%+D\u0001Q\u0015\t\t&)\u0001\u0004sK\u000e|'\u000fZ\u0005\u0003'B\u0013qbQ8naJ,7o]5p]RK\b/\u001a\u0015\u0005\rU\u000b'\r\u0005\u0002W?6\tqK\u0003\u0002Y3\u0006A\u0001O]8wS\u0012,'O\u0003\u0002[7\u00061\u0001/\u0019:b[NT!\u0001X/\u0002\u000f),\b/\u001b;fe*\u0011alM\u0001\u0006UVt\u0017\u000e^\u0005\u0003A^\u0013q\"\u0011:hk6,g\u000e^:T_V\u00148-Z\u0001\u0006m\u0006dW/Z\u0012\u0002GB\u0011A\r\u0006\b\u0003EE\ta\u0005T8h\u00072,\u0017M\\3s!\u0006\u0014\u0018-\\3uKJL'0\u001a3J]R,wM]1uS>tG+Z:u!\t\u0011#c\u0005\u0002\u0013QB\u00111([\u0005\u0003Ur\u0012a!\u00118z%\u00164G#\u00014\u0003\u001f\u0005cGnQ8naJ,7o]5p]N\u001c2\u0001\u00068w!\tyG/D\u0001q\u0015\t\t(/\u0001\u0003mC:<'\"A:\u0002\t)\fg/Y\u0005\u0003kB\u0014aa\u00142kK\u000e$\bC\u0001,x\u0013\tAxKA\tBe\u001e,X.\u001a8ugB\u0013xN^5eKJ$\u0012A\u001f\t\u0003wRi\u0011AE\u0001\u0011aJ|g/\u001b3f\u0003J<W/\\3oiN$2A`A\u0012a\ry\u0018\u0011\u0003\t\u0007\u0003\u0003\tI!!\u0004\u000e\u0005\u0005\r!\u0002BA\u0003\u0003\u000f\taa\u001d;sK\u0006l'BA\u0017s\u0013\u0011\tY!a\u0001\u0003\rM#(/Z1n!\u0011\ty!!\u0005\r\u0001\u0011Y\u00111\u0003\f\u0002\u0002\u0003\u0005)\u0011AA\u000b\u0005\ryF%M\t\u0005\u0003/\ti\u0002E\u0002<\u00033I1!a\u0007=\u0005\u001dqu\u000e\u001e5j]\u001e\u00042AVA\u0010\u0013\r\t\tc\u0016\u0002\n\u0003J<W/\\3oiNDq!!\n\u0017\u0001\u0004\t9#A\u0004d_:$X\r\u001f;\u0011\t\u0005%\u00121G\u0007\u0003\u0003WQA!!\f\u00020\u0005IQ\r\u001f;f]NLwN\u001c\u0006\u0004\u0003cY\u0016aA1qS&!\u0011QGA\u0016\u0005A)\u0005\u0010^3og&|gnQ8oi\u0016DHOA\u0006Fq\u000edW\u000fZ3[gR$7cA\fomR\u0011\u0011Q\b\t\u0003w^!B!!\u0011\u0002LA\"\u00111IA$!\u0019\t\t!!\u0003\u0002FA!\u0
011qBA$\t-\tI%GA\u0001\u0002\u0003\u0015\t!!\u0006\u0003\u0007}##\u0007C\u0004\u0002&e\u0001\r!a\n)\u0007\u0019\ty\u0005\u0005\u0003\u0002R\u0005MS\"A-\n\u0007\u0005U\u0013LA\tQCJ\fW.\u001a;fe&TX\r\u001a+fgR\fq\u0005^3ti\u000ecW-\u00198t\u0007>l'-\u001b8fI\u000e{W\u000e]1di\u0006sG\rR3mKR,Gk\u001c9jGR\u0019\u0011*a\u0017\t\u000b5;\u0001\u0019\u0001()\t\u001d)\u0016M\u0019\u0015\u0004\u000f\u0005=\u0013A\b;fgR\u001cE.Z1oKJ<\u0016\u000e\u001e5NKN\u001c\u0018mZ3G_Jl\u0017\r\u001e,1)\rI\u0015Q\r\u0005\u0006\u001b\"\u0001\rA\u0014\u0015\u0006\u0011U\u000b\u0017\u0011N\u0012\u0003\u0003W\u0002\"\u0001Z\f)\u0007!\ty%A\u0013uKN$8\t\\3b]&twMT3ti\u0016$W*Z:tC\u001e,7oV5uQZ\u0003\u0014I\u001c3WcQ\u0019\u0011*a\u001d\t\u000b5K\u0001\u0019\u0001()\u000b%)\u0016-!\u001b)\u0007%\ty%A\fdY\u0016\fg.\u001a:D_:4\u0017nZ+qI\u0006$X\rV3tiR\u0019\u0011*! \t\u000b5S\u0001\u0019\u0001()\t))\u0016M\u0019\u0015\u0004\u0015\u0005=\u0013\u0001E2iK\u000e\\G*Y:u\u00072,\u0017M\\3e)\u001dI\u0015qQAQ\u0003WCq!!#\f\u0001\u0004\tY)A\u0003u_BL7\r\u0005\u0003\u0002\u000e\u0006me\u0002BAH\u0003/\u00032!!%=\u001b\t\t\u0019JC\u0002\u0002\u0016~\ta\u0001\u0010:p_Rt\u0014bAAMy\u00051\u0001K]3eK\u001aLA!!(\u0002 
\n11\u000b\u001e:j]\u001eT1!!'=\u0011\u001d\t\u0019k\u0003a\u0001\u0003K\u000b1\u0002]1si&$\u0018n\u001c8JIB\u00191(a*\n\u0007\u0005%FHA\u0002J]RDq!!,\f\u0001\u0004\ty+\u0001\u0006gSJ\u001cH\u000fR5sif\u00042aOAY\u0013\r\t\u0019\f\u0010\u0002\u0005\u0019>tw-\u0001\u000edQ\u0016\u001c7\u000eT8h\u0003\u001a$XM]!qa\u0016tG-\u001b8h\tV\u00048\u000fF\u0004J\u0003s\u000b\t-!2\t\rqa\u0001\u0019AA^!\r\u0011\u0013QX\u0005\u0004\u0003\u007f[\"AC+oS\u001aLW\r\u001a'pO\"9\u00111\u0019\u0007A\u0002\u0005=\u0016!C:uCJ$8+\u001b>f\u0011\u001d\t9\r\u0004a\u0001\u0003\u0013\fq!\u00199qK:$7\u000f\u0005\u0004\u0002L\u0006E\u0017Q[\u0007\u0003\u0003\u001bT1!a4=\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0005\u0003'\fiMA\u0002TKF\u0004\u0012bOAl\u0003K\u000bY)a,\n\u0007\u0005eGH\u0001\u0004UkBdWmM\u0001\u0006i>l\u0015\r\u001d\u000b\u0005\u0003?\fY\u000f\u0005\u0005\u0002L\u0006\u0005\u0018QUAs\u0013\u0011\t\u0019/!4\u0003\u00075\u000b\u0007\u000fE\u0004<\u0003O\fY)a,\n\u0007\u0005%HH\u0001\u0004UkBdWM\r\u0005\b\u0003[l\u0001\u0019AAx\u0003!iWm]:bO\u0016\u001c\bCBAf\u0003c\f).\u0003\u0003\u0002t\u00065'\u0001C%uKJ\f'\r\\3\u0002\u0017I,\u0017\r\u001a$s_6dun\u001a\u000b\u0005\u0003_\fI\u0010\u0003\u0004\u001d\u001d\u0001\u0007\u00111X\u0001\u001aoJLG/\u001a#vaN\u001c\u0016N\\4mK6+7o]1hKN+G\u000f\u0006\b\u0002J\u0006}(1\u0001B\u0004\u0005\u0013\u0011YAa\u0004\t\u000f\t\u0005q\u00021\u0001\u0002&\u00069a.^7LKf\u001c\bb\u0002B\u0003\u001f\u0001\u0007\u0011QU\u0001\b]VlG)\u001e9t\u0011\u0019ar\u00021\u0001\u0002<\")Qj\u0004a\u0001\u001d\"I!QB\b\u0011\u0002\u0003\u0007\u0011QU\u0001\tgR\f'\u000f^&fs\"9!\u0011C\bA\u0002\tM\u0011AC7bO&\u001cg+\u00197vKB\u00191H!\u0006\n\u0007\t]AH\u0001\u0003CsR,\u0017aI<sSR,G)\u001e9t'&tw\r\\3NKN\u001c\u0018mZ3TKR$C-\u001a4bk2$H%N\u000b\u0003\u0005;QC!!*\u0003 
-\u0012!\u0011\u0005\t\u0005\u0005G\u0011i#\u0004\u0002\u0003&)!!q\u0005B\u0015\u0003%)hn\u00195fG.,GMC\u0002\u0003,q\n!\"\u00198o_R\fG/[8o\u0013\u0011\u0011yC!\n\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW\r")
/* loaded from: input_file:kafka/log/LogCleanerParameterizedIntegrationTest.class */
public class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrationTest {
    // Mock clock owned by this test instance; exposed through time().
    private final MockTime time = new MockTime();
    // Partitions 0, 1 and 2 of topic "log"; exposed through topicPartitions() and handed to makeCleaner by the tests.
    private final TopicPartition[] topicPartitions = {new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2)};

    /* compiled from: LogCleanerParameterizedIntegrationTest.scala */
    /* loaded from: input_file:kafka/log/LogCleanerParameterizedIntegrationTest$AllCompressions.class */
    public static class AllCompressions implements ArgumentsProvider {
        public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) {
            return Arrays.stream((Object[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(CompressionType.values()), compressionType -> {
                return Arguments.of(new Object[]{compressionType});
            }, ClassTag$.MODULE$.apply(Arguments.class)));
        }
    }

    /* compiled from: LogCleanerParameterizedIntegrationTest.scala */
    /* loaded from: input_file:kafka/log/LogCleanerParameterizedIntegrationTest$ExcludeZstd.class */
    public static class ExcludeZstd implements ArgumentsProvider {
        public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) {
            return Arrays.stream((Object[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps((Object[]) ArrayOps$.MODULE$.filter$extension(Predef$.MODULE$.refArrayOps(CompressionType.values()), compressionType -> {
                return BoxesRunTime.boxToBoolean($anonfun$provideArguments$2(compressionType));
            })), compressionType2 -> {
                return Arguments.of(new Object[]{compressionType2});
            }, ClassTag$.MODULE$.apply(Arguments.class)));
        }

        public static final /* synthetic */ boolean $anonfun$provideArguments$2(CompressionType compressionType) {
            CompressionType compressionType2 = CompressionType.ZSTD;
            return compressionType == null ? compressionType2 != null : !compressionType.equals(compressionType2);
        }
    }

    /**
     * Returns the mock clock owned by this test instance.
     *
     * @return the {@link MockTime} created when this test object was constructed
     */
    @Override // kafka.log.AbstractLogCleanerIntegrationTest
    public MockTime time() {
        return time;
    }

    /**
     * Returns the partitions this test works with.
     *
     * @return the backing array itself (not a copy) holding partitions 0-2 of topic "log"
     */
    public TopicPartition[] topicPartitions() {
        return topicPartitions;
    }

    /**
     * End-to-end cleaning check, run once per compression type.
     *
     * <p>Steps: build a cleaner whose fifth argument is the byte size of a large single-message set,
     * write records with {@code writeDups}, start the cleaner and wait until it has cleaned up to the
     * active segment's base offset, then assert the summed segment sizes shrank and the log contents
     * match what was written. The large message and more {@code writeDups} output are then appended
     * and the same checks repeated. Finally partition 0 is removed from the cleaner's logs, the
     * checkpoints are updated, and the checkpoint file must no longer contain that partition.
     *
     * @param compressionType codec supplied by {@link AllCompressions}
     */
    @ArgumentsSource(AllCompressions.class)
    @ParameterizedTest
    public void cleanerTest(CompressionType compressionType) {
        final int largeMessageKey = 20;
        Tuple2<String, MemoryRecords> largeMessage = createLargeSingleMessageSet(largeMessageKey, (byte) 2, compressionType);
        if (largeMessage == null) {
            throw new MatchError((Object) null);
        }
        String largeMessageValue = (String) largeMessage._1();
        MemoryRecords largeMessageSet = (MemoryRecords) largeMessage._2();

        cleaner_$eq(makeCleaner(
                Predef$.MODULE$.wrapRefArray(topicPartitions()),
                makeCleaner$default$2(),
                makeCleaner$default$3(),
                makeCleaner$default$4(),
                largeMessageSet.sizeInBytes(),
                makeCleaner$default$6(),
                makeCleaner$default$7(),
                makeCleaner$default$8(),
                makeCleaner$default$9(),
                makeCleaner$default$10(),
                makeCleaner$default$11()));
        UnifiedLog log = (UnifiedLog) cleaner().logs().get(topicPartitions()[0]);

        Seq<Tuple3<Object, String, Object>> appends = writeDups(100, 3, log, compressionType, writeDups$default$5(), writeDups$default$6());
        long startSize = log.size();
        cleaner().startup();

        // Wait for the cleaner to reach the active segment, then compare sizes before/after.
        checkLastCleaned("log", 0, log.activeSegment().baseOffset());
        IterableOnceOps segmentSizes = (IterableOnceOps) log.logSegments().map(segment -> BoxesRunTime.boxToInteger(segment.size()));
        int compactedSize = BoxesRunTime.unboxToInt(segmentSizes.sum(Numeric$IntIsIntegral$.MODULE$));
        Assertions.assertTrue(startSize > ((long) compactedSize), "log should have been compacted: startSize=" + startSize + " compactedSize=" + compactedSize);
        checkLogAfterAppendingDups(log, startSize, appends);

        // Append the large message and advance the high watermark to the log end.
        LogAppendInfo appendInfo = log.appendAsLeader(largeMessageSet, 0, log.appendAsLeader$default$3(), log.appendAsLeader$default$4(), log.appendAsLeader$default$5(), log.appendAsLeader$default$6());
        log.updateHighWatermark(log.logEndOffset());
        long largeMessageOffset = ((LogOffsetMetadata) appendInfo.firstOffset().get()).messageOffset;
        Tuple3 largeMessageEntry = new Tuple3(BoxesRunTime.boxToInteger(largeMessageKey), largeMessageValue, BoxesRunTime.boxToLong(largeMessageOffset));

        IterableOps appendsWithLargeMessage = (IterableOps) appends.$plus$plus(new $colon.colon(largeMessageEntry, Nil$.MODULE$));
        Seq<Tuple3<Object, String, Object>> dupsAfterLargeMessage = writeDups(100, 3, log, compressionType, largeMessageKey + 1, writeDups$default$6());
        Seq<Tuple3<Object, String, Object>> allAppends = (Seq) appendsWithLargeMessage.$plus$plus(dupsAfterLargeMessage);

        checkLastCleaned("log", 0, log.activeSegment().baseOffset());
        checkLogAfterAppendingDups(log, startSize, allAppends);

        // Drop partition 0 from the cleaner and verify it vanishes from the checkpoint file.
        cleaner().logs().remove(topicPartitions()[0]);
        cleaner().updateCheckpoints(logDir(), Option$.MODULE$.apply(topicPartitions()[0]));
        File checkpointFile = new File(logDir(), cleaner().cleanerManager().offsetCheckpointFile());
        // Unused companion-object read left by the decompiler; kept so class initialisation is unchanged.
        OffsetCheckpointFile$ offsetCheckpointFile$ = OffsetCheckpointFile$.MODULE$;
        Assertions.assertFalse(new OffsetCheckpointFile(checkpointFile, (LogDirFailureChannel) null).read().contains(topicPartitions()[0]));
    }

    /**
     * Exercises a log configured with {@code cleanup.policy=compact,delete} and
     * {@code retention.ms=100000}, once per compression type.
     *
     * <p>First run: compacts via {@code runCleanerAndCheckCompacted$1}, then back-dates every
     * segment's last-modified time to twice the retention period before the mock clock's current
     * time, and polls (up to 15 s, every 100 ms) until the log start offset equals the log end offset
     * recorded beforehand. Exactly one segment must remain. The cleaner is then shut down and a
     * second, smaller run checks that the log contents equal what was written.
     *
     * @param compressionType codec supplied by {@link AllCompressions}
     */
    @ArgumentsSource(AllCompressions.class)
    @ParameterizedTest
    public void testCleansCombinedCompactAndDeleteTopic(CompressionType compressionType) {
        Properties properties = new Properties();
        Integer retentionMs = Predef$.MODULE$.int2Integer(100000);
        properties.put("retention.ms", retentionMs);
        properties.put("cleanup.policy", "compact,delete");

        Tuple2 firstRun = runCleanerAndCheckCompacted$1(100, properties, compressionType);
        if (firstRun == null) {
            throw new MatchError((Object) null);
        }
        UnifiedLog log = (UnifiedLog) firstRun._1();
        long logEndOffset = log.logEndOffset();

        // Make every segment look older than the retention period.
        log.logSegments().foreach(segment -> {
            return segment.lastModified_$eq(this.time().milliseconds() - (2 * Predef$.MODULE$.Integer2int(retentionMs)));
        });

        // Poll on the wall clock until the old segments are gone or the timeout elapses.
        final long waitTimeMs = 15000L;
        final long pauseMs = 100L;
        long waitStartMs = System.currentTimeMillis();
        while (!$anonfun$testCleansCombinedCompactAndDeleteTopic$3(log, logEndOffset)) {
            if (System.currentTimeMillis() > waitStartMs + waitTimeMs) {
                Assertions.fail("Timed out waiting for deletion of old segments");
            }
            try {
                Thread.sleep(Math.min(waitTimeMs, pauseMs));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail instead of letting a checked exception escape.
                Thread.currentThread().interrupt();
                Assertions.fail("Interrupted while waiting for deletion of old segments", e);
            }
        }
        Assertions.assertEquals(1, log.numberOfSegments());
        cleaner().shutdown();

        // Second run: contents read back from the log must equal what was written.
        Tuple2 secondRun = runCleanerAndCheckCompacted$1(20, properties, compressionType);
        if (secondRun == null) {
            throw new MatchError((Object) null);
        }
        UnifiedLog secondLog = (UnifiedLog) secondRun._1();
        Seq messages = (Seq) secondRun._2();
        Assertions.assertEquals(toMap(messages), toMap(readFromLog(secondLog)), "Contents of the map shouldn't change");
    }

    /**
     * Cleans a log that starts with {@code message.format.version} set to
     * {@code MetadataVersion.IBP_0_9_0} and is later switched to {@code IBP_0_11_0_IV0}.
     *
     * <p>Records are written with {@code writeDups} using trailing byte arguments 0, then 1 and 2
     * after the switch (presumably the record magic value; confirm against {@code writeDups}).
     * After each phase the test waits for the cleaner and checks the log contents and size.
     *
     * @param compressionType codec supplied by {@link ExcludeZstd}
     */
    @ArgumentsSource(ExcludeZstd.class)
    @ParameterizedTest
    public void testCleanerWithMessageFormatV0(CompressionType compressionType) {
        final int largeMessageKey = 20;
        Tuple2<String, MemoryRecords> largeMessage = createLargeSingleMessageSet(largeMessageKey, (byte) 0, compressionType);
        if (largeMessage == null) {
            throw new MatchError((Object) null);
        }
        String largeMessageValue = (String) largeMessage._1();
        MemoryRecords largeMessageSet = (MemoryRecords) largeMessage._2();

        // Compressed codecs get 6 extra bytes on top of the large message set's size.
        // NOTE(review): presumably headroom for size growth on re-compression; confirm.
        int maxMessageSize;
        if (CompressionType.NONE.equals(compressionType)) {
            maxMessageSize = largeMessageSet.sizeInBytes();
        } else {
            maxMessageSize = largeMessageSet.sizeInBytes() + 6;
        }

        cleaner_$eq(makeCleaner(
                Predef$.MODULE$.wrapRefArray(topicPartitions()),
                makeCleaner$default$2(),
                makeCleaner$default$3(),
                makeCleaner$default$4(),
                maxMessageSize,
                makeCleaner$default$6(),
                makeCleaner$default$7(),
                makeCleaner$default$8(),
                makeCleaner$default$9(),
                makeCleaner$default$10(),
                makeCleaner$default$11()));
        UnifiedLog log = (UnifiedLog) cleaner().logs().get(topicPartitions()[0]);

        Properties logProps = logConfigProperties(
                logConfigProperties$default$1(),
                maxMessageSize,
                logConfigProperties$default$3(),
                logConfigProperties$default$4(),
                logConfigProperties$default$5(),
                logConfigProperties$default$6(),
                logConfigProperties$default$7());
        logProps.put("message.format.version", MetadataVersion.IBP_0_9_0.version());
        log.updateConfig(new LogConfig(logProps));

        Seq<Tuple3<Object, String, Object>> firstAppends = writeDups(100, 3, log, compressionType, writeDups$default$5(), (byte) 0);
        long startSize = log.size();
        cleaner().startup();

        checkLastCleaned("log", 0, log.activeSegment().baseOffset());
        IterableOnceOps segmentSizes = (IterableOnceOps) log.logSegments().map(segment -> BoxesRunTime.boxToInteger(segment.size()));
        int compactedSize = BoxesRunTime.unboxToInt(segmentSizes.sum(Numeric$IntIsIntegral$.MODULE$));
        Assertions.assertTrue(startSize > ((long) compactedSize), "log should have been compacted: startSize=" + startSize + " compactedSize=" + compactedSize);
        checkLogAfterAppendingDups(log, startSize, firstAppends);

        Seq<Tuple3<Object, String, Object>> secondAppends = writeDups(40, 3, log, compressionType, writeDups$default$5(), (byte) 0);
        LogAppendInfo appendInfo = log.appendAsLeader(largeMessageSet, 0, log.appendAsLeader$default$3(), log.appendAsLeader$default$4(), log.appendAsLeader$default$5(), log.appendAsLeader$default$6());
        log.updateHighWatermark(log.logEndOffset());
        long largeMessageOffset = BoxesRunTime.unboxToLong(
                appendInfo.firstOffset().map(offsetMetadata -> BoxesRunTime.boxToLong(offsetMetadata.messageOffset)).get());

        // Switch the log to the newer message format before the remaining writes.
        logProps.put("message.format.version", MetadataVersion.IBP_0_11_0_IV0.version());
        log.updateConfig(new LogConfig(logProps));

        // Build the expected contents in write order; each writeDups call happens exactly where
        // the original expression evaluated it.
        IterableOps expected = (IterableOps) firstAppends.$plus$plus(secondAppends);
        expected = (IterableOps) expected.$plus$plus(new $colon.colon(new Tuple3(BoxesRunTime.boxToInteger(largeMessageKey), largeMessageValue, BoxesRunTime.boxToLong(largeMessageOffset)), Nil$.MODULE$));
        expected = (IterableOps) expected.$plus$plus(writeDups(40, 3, log, compressionType, 30, (byte) 1));
        Seq<Tuple3<Object, String, Object>> allAppends = (Seq) expected.$plus$plus(writeDups(5, 3, log, compressionType, 15, (byte) 2));

        checkLastCleaned("log", 0, log.activeSegment().baseOffset());
        checkLogAfterAppendingDups(log, startSize, allAppends);
    }

    /**
     * Cleans a log holding batches written by {@code writeDupsSingleMessageSet} under two
     * {@code message.format.version} settings ({@code IBP_0_9_0}, then {@code IBP_0_10_0_IV1}).
     *
     * <p>The test asserts that the active segment's base offset exceeds the number of entries
     * written under the first setting, waits for the cleaner to reach that offset, and then checks
     * that the log shrank and still holds the expected contents.
     *
     * @param compressionType codec supplied by {@link ExcludeZstd}
     */
    @ArgumentsSource(ExcludeZstd.class)
    @ParameterizedTest
    public void testCleaningNestedMessagesWithV0AndV1(CompressionType compressionType) {
        cleaner_$eq(makeCleaner(
                Predef$.MODULE$.wrapRefArray(topicPartitions()),
                makeCleaner$default$2(),
                makeCleaner$default$3(),
                makeCleaner$default$4(),
                192,
                makeCleaner$default$6(),
                makeCleaner$default$7(),
                256,
                makeCleaner$default$9(),
                makeCleaner$default$10(),
                makeCleaner$default$11()));
        UnifiedLog log = (UnifiedLog) cleaner().logs().get(topicPartitions()[0]);

        Properties logProps = logConfigProperties(
                logConfigProperties$default$1(),
                192,
                logConfigProperties$default$3(),
                logConfigProperties$default$4(),
                logConfigProperties$default$5(),
                256,
                logConfigProperties$default$7());
        logProps.put("message.format.version", MetadataVersion.IBP_0_9_0.version());
        log.updateConfig(new LogConfig(logProps));

        // Two batches under the first format setting (trailing byte argument 0).
        Seq<Tuple3<Object, String, Object>> v0First = writeDupsSingleMessageSet(2, 3, log, compressionType, 0, (byte) 0);
        Seq<Tuple3<Object, String, Object>> v0Second = writeDupsSingleMessageSet(2, 2, log, compressionType, 3, (byte) 0);
        Seq appendsV0 = (Seq) v0First.$plus$plus(v0Second);

        logProps.put("message.format.version", MetadataVersion.IBP_0_10_0_IV1.version());
        log.updateConfig(new LogConfig(logProps));

        // Three batches under the second format setting (trailing byte argument 1).
        Seq<Tuple3<Object, String, Object>> v1First = writeDupsSingleMessageSet(2, 2, log, compressionType, 4, (byte) 1);
        Seq<Tuple3<Object, String, Object>> v1Second = writeDupsSingleMessageSet(2, 2, log, compressionType, 4, (byte) 1);
        Seq v1FirstTwo = (Seq) v1First.$plus$plus(v1Second);
        Seq<Tuple3<Object, String, Object>> v1Third = writeDupsSingleMessageSet(2, 2, log, compressionType, 6, (byte) 1);
        Seq appendsV1 = (Seq) v1FirstTwo.$plus$plus(v1Third);
        Seq<Tuple3<Object, String, Object>> allAppends = (Seq) appendsV0.$plus$plus(appendsV1);

        long startSize = log.size();
        cleaner().startup();

        long firstDirty = log.activeSegment().baseOffset();
        Assertions.assertTrue(firstDirty > ((long) appendsV0.size()));
        checkLastCleaned("log", 0, firstDirty);

        IterableOnceOps segmentSizes = (IterableOnceOps) log.logSegments().map(segment -> BoxesRunTime.boxToInteger(segment.size()));
        int compactedSize = BoxesRunTime.unboxToInt(segmentSizes.sum(Numeric$IntIsIntegral$.MODULE$));
        Assertions.assertTrue(startSize > ((long) compactedSize), "log should have been compacted: startSize=" + startSize + " compactedSize=" + compactedSize);
        checkLogAfterAppendingDups(log, startSize, allAppends);
    }

    /**
     * Verifies that reconfiguring a running cleaner takes effect.
     *
     * <p>The cleaner is created with non-default fourth and tenth {@code makeCleaner} arguments
     * ({@code 1L} and {@code Some(1)}) and is expected to report one cleaner thread and to have no
     * checkpoints after a short {@code awaitCleaned}. It is then reconfigured with a
     * {@link CleanerConfig} whose first argument is 2 and fourth is 100000; afterwards the cleaner
     * count must be 2 and the log must get compacted.
     *
     * @param compressionType codec supplied by {@link AllCompressions}
     */
    @ArgumentsSource(AllCompressions.class)
    @ParameterizedTest
    public void cleanerConfigUpdateTest(CompressionType compressionType) {
        Tuple2<String, MemoryRecords> largeMessage = createLargeSingleMessageSet(20, (byte) 2, compressionType);
        if (largeMessage == null) {
            throw new MatchError((Object) null);
        }
        MemoryRecords largeMessageSet = (MemoryRecords) largeMessage._2();

        cleaner_$eq(makeCleaner(
                Predef$.MODULE$.wrapRefArray(topicPartitions()),
                makeCleaner$default$2(),
                makeCleaner$default$3(),
                1L,
                largeMessageSet.sizeInBytes(),
                makeCleaner$default$6(),
                makeCleaner$default$7(),
                makeCleaner$default$8(),
                makeCleaner$default$9(),
                new Some(BoxesRunTime.boxToInteger(1)),
                makeCleaner$default$11()));
        UnifiedLog log = (UnifiedLog) cleaner().logs().get(topicPartitions()[0]);

        writeDups(100, 3, log, compressionType, writeDups$default$5(), writeDups$default$6());
        long startSize = log.size();
        cleaner().startup();
        Assertions.assertEquals(1, cleaner().cleanerCount());

        // With the initial configuration nothing should have been checkpointed yet.
        long firstDirty = log.activeSegment().baseOffset();
        cleaner().awaitCleaned(new TopicPartition("log", 0), firstDirty, 10L);
        Assertions.assertTrue(cleaner().cleanerManager().allCleanerCheckpoints().isEmpty(), "Should not have cleaned");

        // Reconfigure: old config first, then a new config derived from the current one.
        KafkaConfig oldKafkaConfig = kafkaConfigWithCleanerConfig$1(cleaner().currentConfig());
        CleanerConfig updatedCleanerConfig = new CleanerConfig(
                2,
                cleaner().currentConfig().dedupeBufferSize,
                cleaner().currentConfig().dedupeBufferLoadFactor,
                100000,
                cleaner().currentConfig().maxMessageSize,
                cleaner().currentConfig().maxIoBytesPerSecond,
                cleaner().currentConfig().backoffMs,
                true);
        KafkaConfig newKafkaConfig = kafkaConfigWithCleanerConfig$1(updatedCleanerConfig);
        cleaner().reconfigure(oldKafkaConfig, newKafkaConfig);
        Assertions.assertEquals(2, cleaner().cleanerCount());

        checkLastCleaned("log", 0, firstDirty);
        IterableOnceOps segmentSizes = (IterableOnceOps) log.logSegments().map(segment -> BoxesRunTime.boxToInteger(segment.size()));
        int compactedSize = BoxesRunTime.unboxToInt(segmentSizes.sum(Numeric$IntIsIntegral$.MODULE$));
        Assertions.assertTrue(startSize > ((long) compactedSize), "log should have been compacted: startSize=" + startSize + " compactedSize=" + compactedSize);
    }

    /**
     * Waits for the cleaner to process a partition and asserts its checkpoint reached an offset.
     *
     * @param str topic name
     * @param i   partition id
     * @param j   offset the cleaner checkpoint must have reached (inclusive lower bound)
     */
    private void checkLastCleaned(String str, int i, long j) {
        final TopicPartition partition = new TopicPartition(str, i);
        final LogCleaner logCleaner = cleaner();
        // Wait using the cleaner's default for the third awaitCleaned argument.
        logCleaner.awaitCleaned(partition, j, logCleaner.awaitCleaned$default$3());

        Object checkpoint = cleaner().cleanerManager().allCleanerCheckpoints().apply(partition);
        long lastCleaned = BoxesRunTime.unboxToLong(checkpoint);
        String failureMessage = "log cleaner should have processed up to offset " + j + ", but lastCleaned=" + lastCleaned;
        Assertions.assertTrue(lastCleaned >= j, failureMessage);
    }

    /**
     * Asserts that the log holds exactly the expected key/value/offset entries and that it is
     * smaller than it was before cleaning.
     *
     * @param unifiedLog log to read back
     * @param j          log size in bytes recorded before the cleaner ran
     * @param seq        expected (key, value, offset) entries; compared after collapsing by key via
     *                   {@code toMap}
     */
    private void checkLogAfterAppendingDups(UnifiedLog unifiedLog, long j, Seq<Tuple3<Object, String, Object>> seq) {
        Assertions.assertEquals(toMap(seq), toMap(readFromLog(unifiedLog)), "Contents of the map shouldn't change");
        // Read the size once so the assertion and its message report the same value.
        long currentSize = unifiedLog.size();
        Assertions.assertTrue(j > currentSize, "log should be smaller than before cleaning: startSize=" + j + " currentSize=" + currentSize);
    }

    /**
     * Collapses (key, value, offset) triples into a map from key to (value, offset).
     *
     * @param iterable triples to convert; a {@code null} element raises {@link MatchError}
     * @return map keyed by the integer key of each triple
     */
    private Map<Object, Tuple2<String, Object>> toMap(Iterable<Tuple3<Object, String, Object>> iterable) {
        IterableOnceOps keyedEntries = (IterableOnceOps) iterable.map(entry -> {
            if (entry == null) {
                throw new MatchError((Object) null);
            }
            Integer key = BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(entry._1()));
            String value = (String) entry._2();
            Long offset = BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(entry._3()));
            // Same pair that Scala's `key -> (value, offset)` builds.
            return new Tuple2(key, new Tuple2(value, offset));
        });
        return keyedEntries.toMap($less$colon$less$.MODULE$.refl());
    }

    /**
     * Reads every record of every segment of the log as a (key, value, offset) triple.
     *
     * <p>Key and value are decoded with {@code TestUtils.readString} using its default second
     * argument; the key text is then parsed as an {@code int}.
     *
     * @param unifiedLog log to read
     * @return one triple per record, produced by flat-mapping over the log's segments
     */
    private Iterable<Tuple3<Object, String, Object>> readFromLog(UnifiedLog unifiedLog) {
        return (Iterable) unifiedLog.logSegments().flatMap(segment ->
                (Iterable) CollectionConverters$.MODULE$.IterableHasAsScala(segment.log().records()).asScala().map(rec -> {
                    String keyText = TestUtils$.MODULE$.readString(rec.key(), TestUtils$.MODULE$.readString$default$2());
                    int key = StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(keyText));
                    String value = TestUtils$.MODULE$.readString(rec.value(), TestUtils$.MODULE$.readString$default$2());
                    return new Tuple3(BoxesRunTime.boxToInteger(key), value, BoxesRunTime.boxToLong(rec.offset()));
                }));
    }

    /**
     * Appends one batch containing {@code i2} rounds of the keys {@code [i3, i3 + i)} to the log
     * and returns what was written.
     *
     * <p>Each record's value is the test's running counter rendered as text (the counter is
     * advanced once per record). After the append the high watermark is moved to the log end offset.
     *
     * @param i               number of consecutive keys per round
     * @param i2              number of rounds (duplicates per key)
     * @param unifiedLog      log to append to
     * @param compressionType codec for the batch
     * @param i3              first key
     * @param b               first argument passed to {@code MemoryRecords.withRecords}
     *                        (presumably the record magic value; confirm)
     * @return (key, value, offset) triples, pairing each written record with the offsets from the
     *         append's first offset through its last offset
     */
    private Seq<Tuple3<Object, String, Object>> writeDupsSingleMessageSet(int i, int i2, UnifiedLog unifiedLog, CompressionType compressionType, int i3, byte b) {
        IndexedSeq keyValues = (IndexedSeq) RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), i2).flatMap(round -> {
            return $anonfun$writeDupsSingleMessageSet$1(this, i3, i, BoxesRunTime.unboxToInt(round));
        });

        // keyValues is a raw collection, so each element is cast explicitly before use.
        SimpleRecord[] records = (SimpleRecord[]) ((IndexedSeq) keyValues.map(element -> {
            if (element == null) {
                throw new MatchError((Object) null);
            }
            Tuple2 keyAndValue = (Tuple2) element;
            return new SimpleRecord(Integer.toString(keyAndValue._1$mcI$sp()).getBytes(), ((String) keyAndValue._2()).getBytes());
        })).toArray(ClassTag$.MODULE$.apply(SimpleRecord.class));

        MemoryRecords batch = MemoryRecords.withRecords(b, 0L, compressionType, TimestampType.CREATE_TIME, -1L, (short) -1, -1, -1, false, records);
        LogAppendInfo appendInfo = unifiedLog.appendAsLeader(batch, 0, unifiedLog.appendAsLeader$default$3(), unifiedLog.appendAsLeader$default$4(), unifiedLog.appendAsLeader$default$5(), unifiedLog.appendAsLeader$default$6());
        unifiedLog.updateHighWatermark(unifiedLog.logEndOffset());

        long firstOffset = ((LogOffsetMetadata) appendInfo.firstOffset().get()).messageOffset;
        return (Seq) ((IndexedSeqOps) keyValues.zip(new RichLong(Predef$.MODULE$.longWrapper(firstOffset)).to(BoxesRunTime.boxToLong(appendInfo.lastOffset())))).map(element -> {
            if (element == null) {
                throw new MatchError((Object) null);
            }
            // Outer pair is ((key, value), offset); the inner pair gets its own name so the
            // lambda parameter is not redeclared.
            Tuple2 entryWithOffset = (Tuple2) element;
            Tuple2 keyAndValue = (Tuple2) entryWithOffset._1();
            if (keyAndValue == null) {
                throw new MatchError(entryWithOffset);
            }
            return new Tuple3(BoxesRunTime.boxToInteger(keyAndValue._1$mcI$sp()), keyAndValue._2(), BoxesRunTime.boxToLong(entryWithOffset._2$mcJ$sp()));
        });
    }

    // Compiler-generated accessor: by the Scala `$default$N` naming convention this supplies the
    // default value (0) for the fifth parameter of writeDupsSingleMessageSet.
    private int writeDupsSingleMessageSet$default$5() {
        return 0;
    }

    /**
     * Builds a cleaner over the first topic partition only, writes records, runs the cleaner and
     * asserts that the summed segment sizes shrank.
     *
     * @param i               first argument forwarded to {@code writeDups}
     * @param properties      passed as the last argument of {@code makeCleaner}
     * @param compressionType codec used for the written records
     * @return pair of (the partition's log, the entries returned by {@code writeDups})
     */
    private final Tuple2 runCleanerAndCheckCompacted$1(int i, Properties properties, CompressionType compressionType) {
        Object[] firstPartitionOnly = (Object[]) ArrayOps$.MODULE$.take$extension(Predef$.MODULE$.refArrayOps(topicPartitions()), 1);
        cleaner_$eq(makeCleaner(
                Predef$.MODULE$.wrapRefArray(firstPartitionOnly),
                makeCleaner$default$2(),
                makeCleaner$default$3(),
                100L,
                makeCleaner$default$5(),
                makeCleaner$default$6(),
                makeCleaner$default$7(),
                makeCleaner$default$8(),
                makeCleaner$default$9(),
                makeCleaner$default$10(),
                properties));
        UnifiedLog log = (UnifiedLog) cleaner().logs().get(topicPartitions()[0]);

        Seq<Tuple3<Object, String, Object>> messages = writeDups(i, 3, log, compressionType, writeDups$default$5(), writeDups$default$6());
        long startSize = log.size();
        log.updateHighWatermark(log.logEndOffset());

        // Capture the target offset before the cleaner starts, then wait for it.
        long firstDirty = log.activeSegment().baseOffset();
        cleaner().startup();
        checkLastCleaned("log", 0, firstDirty);

        IterableOnceOps segmentSizes = (IterableOnceOps) log.logSegments().map(segment -> BoxesRunTime.boxToInteger(segment.size()));
        int compactedSize = BoxesRunTime.unboxToInt(segmentSizes.sum(Numeric$IntIsIntegral$.MODULE$));
        Assertions.assertTrue(startSize > ((long) compactedSize), "log should have been compacted: startSize=" + startSize + " compactedSize=" + compactedSize);
        return new Tuple2(log, messages);
    }

    /**
     * Synthetic wait condition used by {@code testCleansCombinedCompactAndDeleteTopic}.
     *
     * @param unifiedLog log being polled
     * @param j          offset the log start offset is expected to reach
     * @return {@code true} once the log's start offset equals {@code j}
     */
    public static final /* synthetic */ boolean $anonfun$testCleansCombinedCompactAndDeleteTopic$3(UnifiedLog unifiedLog, long j) {
        final long logStartOffset = unifiedLog.logStartOffset();
        return logStartOffset == j;
    }

    // Synthetic lambda body returning the timeout message for the segment-deletion wait.
    // Nothing in this decompiled file calls it; testCleansCombinedCompactAndDeleteTopic uses the
    // same literal inline.
    public static final /* synthetic */ String $anonfun$testCleansCombinedCompactAndDeleteTopic$4() {
        return "Timed out waiting for deletion of old segments";
    }

    /**
     * Builds a {@link KafkaConfig} whose log-cleaner settings mirror the given {@link CleanerConfig}.
     *
     * <p>Starts from {@code TestUtils.createBrokerConfig} for broker 0 at {@code localhost:2181}
     * and overrides the cleaner thread count, dedupe buffer size and load factor, I/O buffer size,
     * max message bytes, backoff and max I/O bytes per second with the values from
     * {@code cleanerConfig}.
     *
     * @param cleanerConfig source of the cleaner settings
     * @return broker configuration parsed from the resulting properties
     */
    private static final KafkaConfig kafkaConfigWithCleanerConfig$1(CleanerConfig cleanerConfig) {
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        final KafkaConfig$ configKeys = KafkaConfig$.MODULE$;

        // Argument order and values are unchanged from the original call; only the dead
        // single-use temporaries were removed.
        Properties brokerProps = testUtils.createBrokerConfig(
                0, "localhost:2181", true, true,
                testUtils.RandomPort(),
                None$.MODULE$, None$.MODULE$, None$.MODULE$,
                true, false,
                testUtils.RandomPort(),
                false,
                testUtils.RandomPort(),
                false,
                testUtils.RandomPort(),
                None$.MODULE$,
                1, false, 1, (short) 1, false);

        brokerProps.put(configKeys.LogCleanerThreadsProp(), Integer.toString(cleanerConfig.numThreads));
        brokerProps.put(configKeys.LogCleanerDedupeBufferSizeProp(), Long.toString(cleanerConfig.dedupeBufferSize));
        brokerProps.put(configKeys.LogCleanerDedupeBufferLoadFactorProp(), Double.toString(cleanerConfig.dedupeBufferLoadFactor));
        brokerProps.put(configKeys.LogCleanerIoBufferSizeProp(), Integer.toString(cleanerConfig.ioBufferSize));
        brokerProps.put(configKeys.MessageMaxBytesProp(), Integer.toString(cleanerConfig.maxMessageSize));
        brokerProps.put(configKeys.LogCleanerBackoffMsProp(), Long.toString(cleanerConfig.backoffMs));
        brokerProps.put(configKeys.LogCleanerIoMaxBytesPerSecondProp(), Double.toString(cleanerConfig.maxIoBytesPerSecond));
        return configKeys.fromProps(brokerProps);
    }

    /**
     * Synthetic lambda body: pairs a key with the test's current counter value and advances the
     * counter.
     *
     * @param logCleanerParameterizedIntegrationTest test instance owning the counter
     * @param i                                      record key
     * @return (key, counter value as text); the counter is read before it is incremented
     */
    public static final /* synthetic */ Tuple2 $anonfun$writeDupsSingleMessageSet$2(LogCleanerParameterizedIntegrationTest logCleanerParameterizedIntegrationTest, int i) {
        final int counterBeforeIncrement = logCleanerParameterizedIntegrationTest.counter();
        final String payload = Integer.toString(counterBeforeIncrement);
        logCleanerParameterizedIntegrationTest.incCounter();
        return new Tuple2(BoxesRunTime.boxToInteger(i), payload);
    }

    /**
     * Synthetic lambda body: produces one (key, payload) pair for each key in {@code [i, i + i2)}.
     *
     * @param logCleanerParameterizedIntegrationTest test instance owning the payload counter
     * @param i                                      first key
     * @param i2                                     number of consecutive keys
     * @param i3                                     index supplied by the enclosing range; not read here
     * @return the generated pairs in ascending key order
     */
    public static final /* synthetic */ IndexedSeq $anonfun$writeDupsSingleMessageSet$1(LogCleanerParameterizedIntegrationTest logCleanerParameterizedIntegrationTest, int i, int i2, int i3) {
        final int endKeyExclusive = i + i2;
        return RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(i), endKeyExclusive).map(key ->
                $anonfun$writeDupsSingleMessageSet$2(logCleanerParameterizedIntegrationTest, BoxesRunTime.unboxToInt(key)));
    }
}
