/*
 * Decompiled with CFR 0.152.
 */
package kafka.link;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.link.ClusterLinkTestHarness;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

@Tag(value="integration")
@ScalaSignature(bytes="\u0006\u0001\u00054A!\u0003\u0006\u0001\u001f!)A\u0003\u0001C\u0001+!)q\u0003\u0001C!1!)A\u0006\u0001C\u0005[!9a\u0007AI\u0001\n\u00139\u0004\"\u0002\"\u0001\t\u0003A\u0002\"B$\u0001\t\u0003A\u0002\"B%\u0001\t\u0013Q\u0005\"B+\u0001\t\u00131&\u0001G\"p[B\f7\r^3e\u001b&\u0014(o\u001c:U_BL7\rV3ti*\u00111\u0002D\u0001\u0005Y&t7NC\u0001\u000e\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\t\u0011\u0005E\u0011R\"\u0001\u0006\n\u0005MQ!AI!cgR\u0014\u0018m\u0019;DYV\u001cH/\u001a:MS:\\\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cH/\u0001\u0004=S:LGO\u0010\u000b\u0002-A\u0011\u0011\u0003A\u0001\u0006g\u0016$X\u000b\u001d\u000b\u00023A\u0011!$H\u0007\u00027)\tA$A\u0003tG\u0006d\u0017-\u0003\u0002\u001f7\t!QK\\5uQ\t\u0011\u0001\u0005\u0005\u0002\"U5\t!E\u0003\u0002$I\u0005\u0019\u0011\r]5\u000b\u0005\u00152\u0013a\u00026va&$XM\u001d\u0006\u0003O!\nQA[;oSRT\u0011!K\u0001\u0004_J<\u0017BA\u0016#\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\u000eg\u0016$X\u000b]\"mkN$XM]:\u0015\u0005eq\u0003bB\u0018\u0004!\u0003\u0005\r\u0001M\u0001\u0010G2,\u0017M\\3s\u0013:$XM\u001d<bYB\u0019!$M\u001a\n\u0005IZ\"AB(qi&|g\u000e\u0005\u0002\u001bi%\u0011Qg\u0007\u0002\u0004\u0013:$\u0018aF:fiV\u00038\t\\;ti\u0016\u00148\u000f\n3fM\u0006,H\u000e\u001e\u00132+\u0005A$F\u0001\u0019:W\u0005Q\u0004CA\u001eA\u001b\u0005a$BA\u001f?\u0003%)hn\u00195fG.,GM\u0003\u0002@7\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\u0005\u0005c$!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006AB/Z:u\u0007>l\u0007/Y2uK\u0012l\u0015N\u001d:peR{\u0007/[2)\u0005\u0015!\u0005CA\u0011F\u0013\t1%E\u0001\u0003UKN$\u0018a\n;fgR\u001cu.\u001c9bGR,G-T5se>\u0014Hk\u001c9jG^KG\u000f\u001b'pO\u000ecW-\u00198j]\u001eD#A\u0002#\u0002%]\f\u0017\u000e\u001e$pe2{wm\u00117fC:Lgn\u001a\u000b\u00043-\u0003\u0006\"\u0002'\b\u0001\u0004i\u0015aB2mkN$XM\u001d\t\u0003#9K!a\u0014\u0006\u0003-\rcWo\u001d;fe2Kgn\u001b+fgRD\u0015M\u001d8fgNDQ!U\u0004A\u0002I\u000baa\u001c4gg\u0016$\bC\u0001\u000eT\u0013\t!6D\u0001\u0003M_:<\u0017AD1
qa\u0016tG-\u0011;PM\u001a\u001cX\r\u001e\u000b\u00043]K\u0006\"\u0002-\t\u0001\u0004\u0019\u0014!\u00039beRLG/[8o\u0011\u0015\t\u0006\u00021\u0001SQ\u0011\u00011LX0\u0011\u0005\u0005b\u0016BA/#\u0005\r!\u0016mZ\u0001\u0006m\u0006dW/Z\u0011\u0002A\u0006Y\u0011N\u001c;fOJ\fG/[8o\u0001")
public class CompactedMirrorTopicTest
extends AbstractClusterLinkIntegrationTest {
    /**
     * Per-test set-up hook, deliberately left empty.
     *
     * <p>The parent set-up is not run here; each test calls {@code setUpClusters}, which
     * adjusts cluster configuration first and then invokes {@code super.setUp()} itself.
     */
    @Override
    @BeforeEach
    public void setUp() {
        // Intentionally empty: see setUpClusters, which calls super.setUp() after tweaking configs.
    }

    /**
     * Applies test-specific configuration, runs the parent set-up, and creates the compacted
     * topic on the source cluster.
     *
     * @param cleanerInterval when defined, the interval in milliseconds that is written into the
     *                        broker properties (via {@code $anonfun$setUpClusters$1}) before the
     *                        parent set-up runs; when empty, those properties are left untouched
     */
    private void setUpClusters(Option<Object> cleanerInterval) {
        // Configuration must be in place before super.setUp() is invoked below.
        this.sourceCluster().producerConfig().setProperty("batch.size", "150");
        if (cleanerInterval.isDefined()) {
            int intervalMs = BoxesRunTime.unboxToInt((Object)cleanerInterval.get());
            CompactedMirrorTopicTest.$anonfun$setUpClusters$1(this, intervalMs);
        }
        super.setUp();

        this.numPartitions_$eq(2);

        // Topic-level overrides for the compacted source topic.
        Properties compactedTopicConfig = new Properties();
        compactedTopicConfig.put("cleanup.policy", "compact");
        compactedTopicConfig.put("segment.bytes", "1000");
        compactedTopicConfig.put("min.compaction.lag.ms", "5");
        compactedTopicConfig.put("max.compaction.lag.ms", "2000");
        this.sourceCluster().createTopic(this.topic(), this.numPartitions(), this.replicationFactor(), compactedTopicConfig);
    }

    /**
     * Default value for the {@code cleanerInterval} argument of {@code setUpClusters}
     * (used by {@code testCompactedMirrorTopic}).
     *
     * <p>NOTE(review): the {@code $default$1} name suggests this is a compiler-generated
     * default-argument accessor from the original Scala source.
     *
     * @return the empty option, i.e. no cleaner interval is configured
     */
    private Option<Object> setUpClusters$default$1() {
        return None$.MODULE$;
    }

    /**
     * Mirrors a compacted topic whose source log contains records appended at non-contiguous
     * offsets, then verifies the mirror.
     *
     * <p>Records are appended directly at explicit offsets (see {@code appendAtOffset}) both
     * before the link/mirror is created and after the mirror has caught up.
     */
    @Test
    public void testCompactedMirrorTopic() {
        // No cleaner interval: setUpClusters receives the empty default.
        this.setUpClusters(this.setUpClusters$default$1());
        this.produceToSourceCluster(10);
        // Ten records per partition, spaced 100 offsets apart.
        this.appendRecords$1(100L);
        // 0xFFFFFFFE == 4294967294 == 2 * Integer.MAX_VALUE: offset spacing beyond the int range.
        // NOTE(review): presumably meant to exercise very large offset gaps — confirm intent.
        this.appendRecords$1(0xFFFFFFFEL);
        // Create the cluster link on the destination and mirror the topic; remaining arguments are defaults.
        this.destCluster().createDestClusterLink(this.linkName(), this.sourceCluster(), this.destCluster().createDestClusterLink$default$3(), this.destCluster().createDestClusterLink$default$4(), this.destCluster().createDestClusterLink$default$5(), this.destCluster().createDestClusterLink$default$6());
        this.destCluster().linkTopic(this.topic(), this.replicationFactor(), this.linkName(), this.destCluster().linkTopic$default$4());
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        // Append more records (spaced 10 offsets apart) after the mirror has been waited on.
        this.appendRecords$1(10L);
        this.verifyMirror(this.topic(), this.verifyMirror$default$2(), this.verifyMirror$default$3());
    }

    /**
     * Mirrors a compacted topic while a cleaner interval of 5 ms is configured on the clusters,
     * waiting on the log cleaner of the source and then of the destination.
     *
     * <p>NOTE(review): {@code waitForLogCleaning} discards the boolean returned by
     * {@code awaitCleaned}, so this test does not itself assert that cleaning completed.
     */
    @Test
    public void testCompactedMirrorTopicWithLogCleaning() {
        // Some(5): the interval written into the broker properties by setUpClusters.
        this.setUpClusters((Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)5)));
        // Create the cluster link and the mirror topic before any records are produced.
        this.destCluster().createDestClusterLink(this.linkName(), this.sourceCluster(), this.destCluster().createDestClusterLink$default$3(), this.destCluster().createDestClusterLink$default$4(), this.destCluster().createDestClusterLink$default$5(), this.destCluster().createDestClusterLink$default$6());
        this.destCluster().linkTopic(this.topic(), this.replicationFactor(), this.linkName(), this.destCluster().linkTopic$default$4());
        KafkaProducer producer = this.sourceCluster().createProducer(this.sourceCluster().createProducer$default$1(), this.sourceCluster().createProducer$default$2(), this.sourceCluster().createProducer$default$3());
        // Keys are "key " + (index % 5); the count argument passed to produceRecords is 500.
        this.produceRecords(producer, this.topic(), 500, (Function1<Object, String>)(Function1 & Serializable & scala.Serializable)index -> CompactedMirrorTopicTest.$anonfun$testCompactedMirrorTopicWithLogCleaning$1(BoxesRunTime.unboxToInt((Object)index)));
        // Wait on the source cleaner (offset argument 400) before waiting for the mirror.
        this.waitForLogCleaning(this.sourceCluster(), 400L);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        // Then wait on the destination cleaner with the same offset argument.
        this.waitForLogCleaning(this.destCluster(), 400L);
    }

    /**
     * Calls the log cleaner's {@code awaitCleaned} for every entry of {@code partitions()} on
     * the given cluster.
     *
     * <p>NOTE(review): the boolean result of each {@code awaitCleaned} call is boxed and then
     * discarded by {@code foreach}; a {@code false} result is not turned into a test failure here.
     *
     * @param cluster the cluster whose partition leaders' log cleaners are waited on
     * @param offset  the offset argument forwarded to {@code awaitCleaned}
     */
    private void waitForLogCleaning(ClusterLinkTestHarness cluster, long offset) {
        this.partitions().foreach((Function1 & Serializable & scala.Serializable)tp -> BoxesRunTime.boxToBoolean((boolean)CompactedMirrorTopicTest.$anonfun$waitForLogCleaning$1(cluster, offset, tp)));
    }

    /**
     * Builds a single-record batch and appends it directly to the source-cluster leader's log
     * for the given partition, then registers it in {@code producedRecords()}.
     *
     * <p>The record's key is {@code "key-<offset>"} and its value is {@code "value-<offset>"},
     * both encoded as UTF-8 so the bytes do not depend on the platform default charset.
     * The batch is written with {@code appendAsFollower}.
     * NOTE(review): presumably this keeps the offset carried by the batch instead of assigning
     * a new one — confirm against the log API.
     *
     * @param partition partition index of {@code topic()} to append to
     * @param offset    offset handed to the records builder and recorded in the
     *                  {@code SourceRecord}
     */
    private void appendAtOffset(int partition, long offset) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        ByteBufferOutputStream bytesOut = new ByteBufferOutputStream(buf);
        long nowMs = System.currentTimeMillis();
        // The second argument is the record-format magic value; the explicit byte cast is
        // needed for a byte parameter and is harmless if the parameter is wider.
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bytesOut, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, offset, nowMs, 0L, (short)0, 0, false, false, 0, buf.capacity());
        // Explicit charset: String.getBytes() without one uses the platform default.
        byte[] key = ("key-" + offset).getBytes(StandardCharsets.UTF_8);
        byte[] value = ("value-" + offset).getBytes(StandardCharsets.UTF_8);
        builder.append(nowMs, key, value);
        MemoryRecords records = builder.build();
        TopicPartition tp = new TopicPartition(this.topic(), partition);
        // Look up the leader's log for the partition, using getLog's default second argument.
        LogManager leaderLogManager = this.sourceCluster().partitionLeader(tp).logManager();
        boolean getLogDefaultArg = leaderLogManager.getLog$default$2();
        ((AbstractLog)leaderLogManager.getLog(tp, getLogDefaultArg).get()).appendAsFollower(records);
        // Track the record alongside the other produced records.
        this.producedRecords().$plus$eq((Object)new AbstractClusterLinkIntegrationTest.SourceRecord(this, this.topic(), partition, key, value, offset));
    }

    /**
     * Writes the given interval into the broker properties of the test clusters:
     * the log cleanup interval on both source and destination, and the log delete delay
     * on the destination only.
     *
     * @param $this      the test instance whose clusters are configured
     * @param intervalMs interval in milliseconds, stored as its decimal string form
     * @return whatever the final {@code setProperty} call (destination delete delay) returns
     */
    public static final /* synthetic */ Object $anonfun$setUpClusters$1(CompactedMirrorTopicTest $this, int intervalMs) {
        String interval = Integer.toString(intervalMs);

        $this.sourceCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), interval);
        $this.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), interval);

        Object previousDeleteDelay = $this.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogDeleteDelayMsProp(), interval);
        return previousDeleteDelay;
    }

    /**
     * Appends ten records to every partition, spaced {@code offsetInterval} offsets apart.
     *
     * <p>For partition {@code p} with starting offset {@code s = nextOffset(p)}, records are
     * appended at {@code s + i * offsetInterval + p} for {@code i} from 1 to 10 inclusive.
     *
     * @param offsetInterval distance between consecutive appended offsets
     */
    private final void appendRecords$1(long offsetInterval) {
        // Read the partition count once, as the original range construction did.
        int partitionCount = this.numPartitions();
        for (int partition = 0; partition < partitionCount; ++partition) {
            // The starting offset is sampled once per partition, before any append to it.
            long startOffset = this.nextOffset(partition);
            for (int i = 1; i <= 10; ++i) {
                long targetOffset = startOffset + (long)i * offsetInterval + (long)partition;
                this.appendAtOffset(partition, targetOffset);
            }
        }
    }

    /**
     * Maps a record index to its key: {@code "key "} followed by {@code index % 5}.
     *
     * @param index record index
     * @return the key string for that index
     */
    public static final /* synthetic */ String $anonfun$testCompactedMirrorTopicWithLogCleaning$1(int index) {
        int keySlot = index % 5;
        return "key " + keySlot;
    }

    /**
     * Calls {@code awaitCleaned} on the log cleaner of the partition leader in the given
     * cluster, with a fixed third argument of 15000.
     *
     * @param cluster$1 cluster in which the partition leader is looked up
     * @param offset$1  offset argument forwarded to {@code awaitCleaned}
     * @param tp        the topic partition to wait on
     * @return the result of {@code awaitCleaned}
     */
    public static final /* synthetic */ boolean $anonfun$waitForLogCleaning$1(ClusterLinkTestHarness cluster$1, long offset$1, TopicPartition tp) {
        LogManager leaderLogManager = cluster$1.partitionLeader(tp).logManager();
        boolean cleaned = leaderLogManager.cleaner().awaitCleaned(tp, offset$1, 15000L);
        return cleaned;
    }
}

