/*
 * Decompiled with CFR 0.152.
 */
package kafka.link;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Properties;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.link.ClusterLinkTestHarness;
import kafka.link.ClusterLinkTestHarness$;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig$;
import kafka.utils.TestInfoUtils$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Map;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

@Tag(value="integration")
@ScalaSignature(bytes="\u0006\u0001\u0005ma\u0001\u0002\u0006\f\u0001AAQ!\u0006\u0001\u0005\u0002YA\u0011\u0002\u0007\u0001A\u0002\u0003\u0005\u000b\u0015B\r\t\u000b\u0015\u0002A\u0011\t\u0014\t\u000bM\u0002A\u0011\u0002\u001b\t\u000fu\u0002\u0011\u0013!C\u0005}!)\u0011\n\u0001C\u0001\u0015\")q\u000e\u0001C\u0001a\")Q\u000f\u0001C\u0005m\"9\u00111\u0001\u0001\u0005\n\u0005\u0015!\u0001G\"p[B\f7\r^3e\u001b&\u0014(o\u001c:U_BL7\rV3ti*\u0011A\"D\u0001\u0005Y&t7NC\u0001\u000f\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\t\u0011\u0005I\u0019R\"A\u0006\n\u0005QY!AI!cgR\u0014\u0018m\u0019;DYV\u001cH/\u001a:MS:\\\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cH/\u0001\u0004=S:LGO\u0010\u000b\u0002/A\u0011!\u0003A\u0001\n?R,7\u000f^%oM>\u0004\"AG\u0012\u000e\u0003mQ!\u0001H\u000f\u0002\u0007\u0005\u0004\u0018N\u0003\u0002\u001f?\u00059!.\u001e9ji\u0016\u0014(B\u0001\u0011\"\u0003\u0015QWO\\5u\u0015\u0005\u0011\u0013aA8sO&\u0011Ae\u0007\u0002\t)\u0016\u001cH/\u00138g_\u0006)1/\u001a;VaR\u0011q%\f\t\u0003Q-j\u0011!\u000b\u0006\u0002U\u0005)1oY1mC&\u0011A&\u000b\u0002\u0005+:LG\u000fC\u0003/\u0007\u0001\u0007\u0011$\u0001\u0005uKN$\u0018J\u001c4pQ\t\u0019\u0001\u0007\u0005\u0002\u001bc%\u0011!g\u0007\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017!D:fiV\u00038\t\\;ti\u0016\u00148\u000f\u0006\u0002(k!9a\u0007\u0002I\u0001\u0002\u00049\u0014aD2mK\u0006tWM]%oi\u0016\u0014h/\u00197\u0011\u0007!B$(\u0003\u0002:S\t1q\n\u001d;j_:\u0004\"\u0001K\u001e\n\u0005qJ#aA%oi\u000692/\u001a;Va\u000ecWo\u001d;feN$C-\u001a4bk2$H%M\u000b\u0002\u007f)\u0012q\u0007Q\u0016\u0002\u0003B\u0011!iR\u0007\u0002\u0007*\u0011A)R\u0001\nk:\u001c\u0007.Z2lK\u0012T!AR\u0015\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0002I\u0007\n\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u00021Q,7\u000f^\"p[B\f7\r^3e\u001b&\u0014(o\u001c:U_BL7\r\u0006\u0002(\u0017\")AJ\u0002a\u0001\u001b\u00061\u0011/^8sk6\u0004\"AT+\u000f\u0005=\u001b\u0006C\u0001)*\u001b\u0005\t&B\u0001*\u0010\u0003\u0019a$o\\8u}%\u0011A+K\u0001\u0007!J,G-\u001a4\n\u0005Y;&AB*ue&twM\u0003\u0002US!\"a!W1c!\tQv,D\u0001\\\u0015\taV,\u0001\u0005qe>4\u0018\u000eZ3s\u0015\tqV$\u0001\u0004qCJ\fWn]\u0005\u0003An\u00131BV1mk\u0016\u001cv.\u001e:dK\u000691\u000f\u001e:j]\u001e\u001cHFA2fC\u0005!\u0017A\u0001>lC\u00051\u0017!B6sC\u001a$\b\u0006\u0002\u0004iY6\u0004\"!\u001b6\u000e\u0003uK!a[/\u0003#A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0001\u0003oC6,\u0017%\u00018\u00021m$\u0017n\u001d9mCft\u0015-\\3~]E,xN];n{m\u0004T0A\u0014uKN$8i\\7qC\u000e$X\rZ'jeJ|'\u000fV8qS\u000e<\u0016\u000e\u001e5M_\u001e\u001cE.Z1oS:<GCA\u0014r\u0011\u0015au\u00011\u0001NQ\u00119\u0011,Y:-\u0005\r,\u0007\u0006B\u0004iY6\f!c^1ji\u001a{'\u000fT8h\u00072,\u0017M\\5oOR\u0019qe\u001e?\t\u000baD\u0001\u0019A=\u0002\u000f\rdWo\u001d;feB\u0011!C_\u0005\u0003w.\u0011ac\u00117vgR,'\u000fT5oWR+7\u000f\u001e%be:,7o\u001d\u0005\u0006{\"\u0001\rA`\u0001\u0007_\u001a47/\u001a;\u0011\u0005!z\u0018bAA\u0001S\t!Aj\u001c8h\u00039\t\u0007\u000f]3oI\u0006#xJ\u001a4tKR$RaJA\u0004\u0003\u0017Aa!!\u0003\n\u0001\u0004Q\u0014!\u00039beRLG/[8o\u0011\u0015i\u0018\u00021\u0001\u007fQ\u001d\u0001\u0011qBA\u000b\u0003/\u00012AGA\t\u0013\r\t\u0019b\u0007\u0002\u0004)\u0006<\u0017!\u0002<bYV,\u0017EAA\r\u0003-Ig\u000e^3he\u0006$\u0018n\u001c8")
public class CompactedMirrorTopicTest
extends AbstractClusterLinkIntegrationTest {
    private TestInfo _testInfo;

    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        this._testInfo = testInfo;
        if (TestInfoUtils$.MODULE$.isKRaft(this._testInfo) && this.sourceCluster() == null && this.destCluster() == null) {
            SecurityProtocol x$1 = SecurityProtocol.PLAINTEXT;
            int x$2 = 0;
            Option<SecurityProtocol> x$3 = ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2();
            int x$4 = ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4();
            this.sourceCluster_$eq(new ClusterLinkTestHarness(x$1, x$3, x$2, x$4));
            SecurityProtocol x$5 = SecurityProtocol.PLAINTEXT;
            int x$6 = 100;
            Option<SecurityProtocol> x$7 = ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2();
            int x$8 = ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4();
            this.destCluster_$eq(new ClusterLinkTestHarness(x$5, x$7, x$6, x$8));
            return;
        }
        if (this.sourceCluster() == null && this.destCluster() == null) {
            SecurityProtocol x$9 = SecurityProtocol.SASL_SSL;
            int x$10 = 0;
            Option<SecurityProtocol> x$11 = ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2();
            int x$12 = ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4();
            this.sourceCluster_$eq(new ClusterLinkTestHarness(x$9, x$11, x$10, x$12));
            SecurityProtocol x$13 = SecurityProtocol.SASL_PLAINTEXT;
            int x$14 = 100;
            Option<SecurityProtocol> x$15 = ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2();
            int x$16 = ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4();
            this.destCluster_$eq(new ClusterLinkTestHarness(x$13, x$15, x$14, x$16));
        }
    }

    private void setUpClusters(Option<Object> cleanerInterval) {
        this.sourceCluster().producerConfig().setProperty("batch.size", "150");
        cleanerInterval.foreach((Function1 & Serializable & scala.Serializable)intervalMs -> CompactedMirrorTopicTest.$anonfun$setUpClusters$1(this, BoxesRunTime.unboxToInt((Object)intervalMs)));
        super.setUp(this._testInfo);
        this.numPartitions_$eq(2);
        Properties topicProps = new Properties();
        topicProps.put("cleanup.policy", "compact");
        topicProps.put("segment.bytes", "1000");
        topicProps.put("min.compaction.lag.ms", "5");
        topicProps.put("max.compaction.lag.ms", "2000");
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$3 = this.replicationFactor();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$6 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$2, x$3, topicProps, x$5, x$6);
    }

    private Option<Object> setUpClusters$default$1() {
        return None$.MODULE$;
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk", "kraft"})
    public void testCompactedMirrorTopic(String quorum) {
        this.setUpClusters(this.setUpClusters$default$1());
        this.produceToSourceCluster(10);
        this.appendRecords$1(100L);
        this.appendRecords$1(0xFFFFFFFEL);
        ClusterLinkTestHarness qual$1 = this.destCluster();
        String x$1 = this.linkName();
        ClusterLinkTestHarness x$2 = this.sourceCluster();
        long x$3 = qual$1.createDestClusterLink$default$3();
        long x$4 = qual$1.createDestClusterLink$default$4();
        long x$5 = qual$1.createDestClusterLink$default$5();
        Properties x$6 = qual$1.createDestClusterLink$default$6();
        qual$1.createDestClusterLink(x$1, x$2, x$3, x$4, x$5, x$6);
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$7 = this.topic();
        short x$8 = this.replicationFactor();
        String x$9 = this.linkName();
        Map<String, String> x$10 = qual$2.linkTopic$default$4();
        String x$11 = qual$2.linkTopic$default$5();
        qual$2.linkTopic(x$7, x$8, x$9, x$10, x$11);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.appendRecords$1(10L);
        this.verifyMirror(this.topic(), this.verifyMirror$default$2(), this.verifyMirror$default$3());
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk", "kraft"})
    public void testCompactedMirrorTopicWithLogCleaning(String quorum) {
        this.setUpClusters((Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)5)));
        ClusterLinkTestHarness qual$1 = this.destCluster();
        String x$1 = this.linkName();
        ClusterLinkTestHarness x$2 = this.sourceCluster();
        long x$3 = qual$1.createDestClusterLink$default$3();
        long x$4 = qual$1.createDestClusterLink$default$4();
        long x$5 = qual$1.createDestClusterLink$default$5();
        Properties x$6 = qual$1.createDestClusterLink$default$6();
        qual$1.createDestClusterLink(x$1, x$2, x$3, x$4, x$5, x$6);
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$7 = this.topic();
        short x$8 = this.replicationFactor();
        String x$9 = this.linkName();
        Map<String, String> x$10 = qual$2.linkTopic$default$4();
        String x$11 = qual$2.linkTopic$default$5();
        qual$2.linkTopic(x$7, x$8, x$9, x$10, x$11);
        ClusterLinkTestHarness qual$3 = this.sourceCluster();
        ByteArraySerializer x$12 = qual$3.createProducer$default$1();
        ByteArraySerializer x$13 = qual$3.createProducer$default$2();
        Properties x$14 = qual$3.createProducer$default$3();
        KafkaProducer producer = qual$3.createProducer(x$12, x$13, x$14);
        this.produceRecords(producer, this.topic(), 500, (Function1<Object, String>)(Function1 & Serializable & scala.Serializable)index -> CompactedMirrorTopicTest.$anonfun$testCompactedMirrorTopicWithLogCleaning$1(BoxesRunTime.unboxToInt((Object)index)));
        this.waitForLogCleaning(this.sourceCluster(), 400L);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.waitForLogCleaning(this.destCluster(), 400L);
    }

    private void waitForLogCleaning(ClusterLinkTestHarness cluster, long offset) {
        this.partitions(this.partitions$default$1()).foreach((Function1 & Serializable & scala.Serializable)tp -> BoxesRunTime.boxToBoolean((boolean)CompactedMirrorTopicTest.$anonfun$waitForLogCleaning$1(cluster, offset, tp)));
    }

    private void appendAtOffset(int partition, long offset) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        ByteBufferOutputStream bytesOut = new ByteBufferOutputStream(buf);
        long nowMs = System.currentTimeMillis();
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bytesOut, 2, CompressionType.NONE, TimestampType.CREATE_TIME, offset, nowMs, 0L, (short)0, 0, false, false, 0, buf.capacity());
        byte[] key = new StringBuilder(4).append("key-").append(offset).toString().getBytes();
        byte[] value = new StringBuilder(6).append("value-").append(offset).toString().getBytes();
        builder.append(nowMs, key, value);
        MemoryRecords records = builder.build();
        TopicPartition tp = new TopicPartition(this.topic(), partition);
        LogManager qual$1 = this.sourceCluster().partitionLeader(tp).logManager();
        boolean x$2 = qual$1.getLog$default$2();
        ((AbstractLog)qual$1.getLog(tp, x$2).get()).appendAsFollower(records);
        this.producedRecords().$plus$eq((Object)new AbstractClusterLinkIntegrationTest.SourceRecord(this, this.topic(), partition, key, value, offset));
    }

    public static final /* synthetic */ Object $anonfun$setUpClusters$1(CompactedMirrorTopicTest $this, int intervalMs) {
        $this.sourceCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), Integer.toString(intervalMs));
        $this.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), Integer.toString(intervalMs));
        return $this.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.LogDeleteDelayMsProp(), Integer.toString(intervalMs));
    }

    private final void appendRecords$1(long offsetInterval) {
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.numPartitions()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)partition -> {
            long startOffset = this.nextOffset(partition);
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> this.appendAtOffset(partition, startOffset + (long)i * offsetInterval + (long)partition));
        });
    }

    public static final /* synthetic */ String $anonfun$testCompactedMirrorTopicWithLogCleaning$1(int index) {
        return new StringBuilder(4).append("key ").append(index % 5).toString();
    }

    public static final /* synthetic */ boolean $anonfun$waitForLogCleaning$1(ClusterLinkTestHarness cluster$1, long offset$1, TopicPartition tp) {
        return cluster$1.partitionLeader(tp).logManager().cleaner().awaitCleaned(tp, offset$1, 15000L);
    }
}

