/*
 * Decompiled with CFR 0.152.
 */
package kafka.link;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import kafka.cluster.Partition;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.link.ClusterLinkTestHarness;
import kafka.log.AbstractLog;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFactory;
import kafka.utils.TestInfoUtils$;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

@Tags(value={@Tag(value="integration"), @Tag(value="bazel:shard_count:3")})
@ScalaSignature(bytes="\u0006\u0005\u00055c\u0001B\u0007\u000f\u0001MAQ\u0001\u0007\u0001\u0005\u0002eAqa\u0007\u0001C\u0002\u0013\u0005C\u0004\u0003\u0004$\u0001\u0001\u0006I!\b\u0005\nI\u0001\u0001\r\u0011!Q!\n\u0015BQ!\r\u0001\u0005BIBQ\u0001\u0010\u0001\u0005\nuBq!\u0015\u0001\u0012\u0002\u0013%!\u000bC\u0003^\u0001\u0011\u0005a\fC\u0003|\u0001\u0011\u0005A\u0010C\u0004\u0002\u0006\u0001!\t!a\u0002\t\u000f\u0005M\u0001\u0001\"\u0001\u0002\u0016!9\u0011\u0011\u0005\u0001\u0005\n\u0005\r\"AE\"mkN$XM\u001d'j].L5O\u001d+fgRT!a\u0004\t\u0002\t1Lgn\u001b\u0006\u0002#\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u0015!\t)b#D\u0001\u000f\u0013\t9bB\u0001\u0012BEN$(/Y2u\u00072,8\u000f^3s\u0019&t7.\u00138uK\u001e\u0014\u0018\r^5p]R+7\u000f^\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003i\u0001\"!\u0006\u0001\u0002#I,\u0007\u000f\\5dCRLwN\u001c$bGR|'/F\u0001\u001e!\tq\u0012%D\u0001 \u0015\u0005\u0001\u0013!B:dC2\f\u0017B\u0001\u0012 \u0005\u0015\u0019\u0006n\u001c:u\u0003I\u0011X\r\u001d7jG\u0006$\u0018n\u001c8GC\u000e$xN\u001d\u0011\u0002\u0013}#Xm\u001d;J]\u001a|\u0007C\u0001\u00140\u001b\u00059#B\u0001\u0015*\u0003\r\t\u0007/\u001b\u0006\u0003U-\nqA[;qSR,'O\u0003\u0002-[\u0005)!.\u001e8ji*\ta&A\u0002pe\u001eL!\u0001M\u0014\u0003\u0011Q+7\u000f^%oM>\fQa]3u+B$\"a\r\u001c\u0011\u0005y!\u0014BA\u001b \u0005\u0011)f.\u001b;\t\u000b]*\u0001\u0019A\u0013\u0002\u0011Q,7\u000f^%oM>D#!B\u001d\u0011\u0005\u0019R\u0014BA\u001e(\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\u000eg\u0016$X\u000b]\"mkN$XM]:\u0015\u0005Mr\u0004bB \u0007!\u0003\u0005\r\u0001Q\u0001\u0018I\u0016\u001cHO\u0011:pW\u0016\u0014\bK]8q\u001fZ,'O]5eKN\u0004B!\u0011#G\r6\t!I\u0003\u0002D?\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005\u0015\u0013%aA'baB\u0011qI\u0014\b\u0003\u00112\u0003\"!S\u0010\u000e\u0003)S!a\u0013\n\u0002\rq\u0012xn\u001c;?\u0013\tiu$\u0001\u0004Qe\u0016$WMZ\u0005\u0003\u001fB\u0013aa\u0015;sS:<'BA' \u0003]\u0019X\r^+q\u00072,8\u000f^3sg\u0012\"WMZ1vYR$\u0013'F\u0001TU\t\u0001EkK\u0001V!\t16,D\u0001X\u0015\tA\u0016,A\u0005v]\u000eDWmY6fI*\u0011!lH\u0001\u000bC:tw\u000e^1uS>t\u0017B\u0001/X\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\u001bi\u0016\u001cH\u000fR3ti&t\u0017\r^5p]VsG-\u001a:NS:L5O\u001d\u000b\u0004g}\u000b\u0007\"\u00021\t\u0001\u00041\u0015AB9v_J,X\u000eC\u0003c\u0011\u0001\u00071-A\u0006d_>\u0014H-\u001b8bi>\u0014\bC\u0001\u0010e\u0013\t)wDA\u0004C_>dW-\u00198)\t!9WN\u001c\t\u0003Q.l\u0011!\u001b\u0006\u0003U&\na\u0001]1sC6\u001c\u0018B\u00017j\u0005E\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^\u0001\u0005]\u0006lW-I\u0001p\u0003!ZH-[:qY\u0006Lh*Y7f{:\nXo\u001c:v[vZ\b' \u0018d_>\u0014H-\u001b8bi>\u0014Xh_\u0019~Q\u0011A\u0011o\u001e=\u0011\u0005I,X\"A:\u000b\u0005QL\u0017\u0001\u00039s_ZLG-\u001a:\n\u0005Y\u001c(\u0001D'fi\"|GmU8ve\u000e,\u0017!\u0002<bYV,G&A=\"\u0003i\fq\"\u00197m\u0007>l'-\u001b8bi&|gn]\u0001\u0016i\u0016\u001cHOU3ti\u0006\u0014H\u000fU1vg\u0016$G*\u001b8l)\r\u0019TP \u0005\u0006A&\u0001\rA\u0012\u0005\u0006E&\u0001\ra\u0019\u0015\u0005\u0013\u001dlg\u000eK\u0003\nc^\f\u0019\u0001L\u0001z\u0003i!Xm\u001d;NSJ\u0014xN\u001d'pG\u0006d'+\u001a9mS\u000e\fG/[8o)\u0015\u0019\u0014\u0011BA\u0006\u0011\u0015\u0001'\u00021\u0001G\u0011\u0015\u0011'\u00021\u0001dQ\u0011Qq-\u001c8)\u000b)\tx/!\u0005-\u0003e\fA\u0003^3ti6K'O]8s/&$\bn\u00148f\u0013N\u0014H#B\u001a\u0002\u0018\u0005e\u0001\"\u00021\f\u0001\u00041\u0005\"\u00022\f\u0001\u0004\u0019\u0007\u0006B\u0006h[:DSaC9x\u0003?a\u0013!_\u0001%kN,W\t_2mkNLg/\u001a'fC\u0012,'OR8s\u001b&\u0014(o\u001c:QCJ$\u0018\u000e^5p]R\u00191'!\n\t\u000f\u0005\u001dB\u00021\u0001\u0002*\u0005\u0011A\u000f\u001d\t\u0005\u0003W\t9$\u0004\u0002\u0002.)!\u0011qFA\u0019\u0003\u0019\u0019w.\\7p]*\u0019\u0011#a\r\u000b\u0007\u0005UR&\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0005\u0003s\tiC\u0001\bU_BL7\rU1si&$\u0018n\u001c8)\r\u0001\tid^A\"!\r1\u0013qH\u0005\u0004\u0003\u0003:#a\u0001+bO\u0006\u0012\u0011QI\u0001\fS:$Xm\u001a:bi&|g\u000e\u000b\u0004\u0001\u0003{9\u0018\u0011J\u0011\u0003\u0003\u0017\n1CY1{K2T4\u000f[1sI~\u001bw.\u001e8uuM\u0002")
public class ClusterLinkIsrTest
extends AbstractClusterLinkIntegrationTest {
    private final short replicationFactor = (short)3;
    private TestInfo _testInfo;

    @Override
    public short replicationFactor() {
        return this.replicationFactor;
    }

    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        this._testInfo = testInfo;
    }

    private void setUpClusters(Map<String, String> destBrokerPropOverrides) {
        if (TestInfoUtils$.MODULE$.isKRaft(this._testInfo) && this.sourceCluster() == null && this.destCluster() == null) {
            None$ x$4 = None$.MODULE$;
            this.sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.PLAINTEXT, (Option<SecurityProtocol>)x$4, 0, 3));
            None$ x$8 = None$.MODULE$;
            this.destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.PLAINTEXT, (Option<SecurityProtocol>)x$8, 100, 3));
        } else if (this.sourceCluster() == null && this.destCluster() == null) {
            None$ x$12 = None$.MODULE$;
            this.sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, (Option<SecurityProtocol>)x$12, 0, 3));
            None$ x$16 = None$.MODULE$;
            this.destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, (Option<SecurityProtocol>)x$16, 100, 3));
        }
        this.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.ReplicaLagTimeMaxMsProp(), "5000");
        this.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), "2");
        destBrokerPropOverrides.foreach((Function1 & Serializable)x0$1 -> {
            if (x0$1 != null) {
                String k = (String)x0$1._1();
                String v = (String)x0$1._2();
                return this.destCluster().serverConfig().setProperty(k, v);
            }
            throw new MatchError(null);
        });
        super.setUp(this._testInfo);
    }

    private Map<String, String> setUpClusters$default$1() {
        return (Map)Map$.MODULE$.empty();
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testDestinationUnderMinIsr(String quorum, boolean coordinator) {
        this.setUpClusters((Map<String, String>)((Map)Map$.MODULE$.empty()));
        this.numPartitions_$eq(1);
        TopicPartition tp = (TopicPartition)this.partitions(this.partitions$default$1(), this.partitions$default$2(), this.partitions$default$3()).head();
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$22 = this.numPartitions();
        short x$3 = this.replicationFactor();
        Properties x$4 = qual$1.createTopic$default$4();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$6 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$22, x$3, x$4, x$5, x$6);
        Properties linkProps = new Properties();
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.LinkFetcherFlowControlProp(), "-1");
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$7 = this.linkName();
        ClusterLinkTestHarness x$8 = this.sourceCluster();
        long x$10 = qual$2.createDestClusterLink$default$3();
        long x$11 = qual$2.createDestClusterLink$default$4();
        long x$12 = qual$2.createDestClusterLink$default$5();
        qual$2.createDestClusterLink(x$7, x$8, x$10, x$11, x$12, linkProps);
        ClusterLinkTestHarness qual$3 = this.destCluster();
        String x$13 = this.topic();
        short x$14 = this.replicationFactor();
        String x$15 = this.linkName();
        Map<String, String> x$16 = qual$3.linkTopic$default$4();
        String x$17 = qual$3.linkTopic$default$5();
        qual$3.linkTopic(x$13, x$14, x$15, x$16, x$17);
        this.produceToSourceCluster(2);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        KafkaBroker destLeader = this.destCluster().partitionLeader(tp);
        scala.collection.immutable.Map destFollowers = ((IterableOnceOps)((IterableOps)this.destCluster().brokers().filterNot((Function1 & Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkIsrTest.$anonfun$testDestinationUnderMinIsr$1(destLeader, x$2)))).map((Function1 & Serializable)follower -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)follower.config().brokerId())), follower))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        destFollowers.foreach((Function1 & Serializable)follower -> {
            ClusterLinkIsrTest.$anonfun$testDestinationUnderMinIsr$3(this, follower);
            return BoxedUnit.UNIT;
        });
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkIsrTest.$anonfun$testDestinationUnderMinIsr$4(this, destLeader, tp)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Destination not under-min-isr with two brokers down");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        this.produceToSourceCluster(10);
        long destOffset = BoxesRunTime.unboxToLong((Object)this.logEndOffset(destLeader, tp).get());
        long sourceOffset = BoxesRunTime.unboxToLong((Object)this.logEndOffset(this.sourceCluster().partitionLeader(tp), tp).get());
        Assertions.assertTrue((sourceOffset > destOffset ? 1 : 0) != 0, (String)new StringBuilder(62).append("Records mirrored with under-min-isrs sourceOffset=").append(sourceOffset).append(" destOffset=").append(destOffset).toString());
        destFollowers.foreach((Function1 & Serializable)follower -> {
            ClusterLinkIsrTest.$anonfun$testDestinationUnderMinIsr$6(this, follower);
            return BoxedUnit.UNIT;
        });
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.produceToSourceCluster(10);
        String x$20 = this.topic();
        Seq<KafkaBroker> x$222 = this.verifyMirror$default$2();
        boolean x$23 = this.verifyMirror$default$3();
        this.verifyMirror(x$20, x$222, x$23, false);
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testRestartPausedLink(String quorum, boolean coordinator) {
        void var38_49;
        Tuple2 tuple2;
        this.setUpClusters((Map<String, String>)((Map)Map$.MODULE$.empty()));
        this.numPartitions_$eq(1);
        TopicPartition tp = (TopicPartition)this.partitions(this.partitions$default$1(), this.partitions$default$2(), this.partitions$default$3()).head();
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$32 = this.replicationFactor();
        Properties x$4 = qual$1.createTopic$default$4();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$62 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$2, x$32, x$4, x$5, x$62);
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$7 = this.linkName();
        ClusterLinkTestHarness x$8 = this.sourceCluster();
        long x$9 = qual$2.createDestClusterLink$default$3();
        long x$10 = qual$2.createDestClusterLink$default$4();
        long x$11 = qual$2.createDestClusterLink$default$5();
        Properties x$12 = qual$2.createDestClusterLink$default$6();
        Uuid linkId = qual$2.createDestClusterLink(x$7, x$8, x$9, x$10, x$11, x$12);
        ClusterLinkTestHarness qual$3 = this.destCluster();
        String x$13 = this.topic();
        short x$14 = this.replicationFactor();
        String x$15 = this.linkName();
        Map<String, String> x$16 = qual$3.linkTopic$default$4();
        String x$17 = qual$3.linkTopic$default$5();
        qual$3.linkTopic(x$13, x$14, x$15, x$16, x$17);
        this.produceToSourceCluster(2);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        ObjectRef destLeader = ObjectRef.create((Object)this.destCluster().partitionLeader(tp));
        Buffer buffer = (Buffer)this.destCluster().brokers().filterNot((Function1 & Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkIsrTest.$anonfun$testRestartPausedLink$1(destLeader, x$3)));
        this.destCluster().shutdownBroker(((KafkaBroker)buffer.head()).config().brokerId());
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (true) {
            void waitUntilTrue_pause;
            this.produceToSourceCluster(2);
            if (((Partition)((KafkaBroker)destLeader.elem).replicaManager().onlinePartition(tp).get()).isUnderReplicated()) break;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Destination not under-replicated with a broker down");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        this.waitForMirror((Seq<KafkaBroker>)new .colon.colon((Object)((KafkaBroker)destLeader.elem), (List)Nil$.MODULE$), this.waitForMirror$default$2());
        Option<Object> endOffset = this.logEndOffset((KafkaBroker)destLeader.elem, tp);
        ClusterLinkTestHarness qual$4 = this.destCluster();
        String x$18 = this.linkName();
        Map x$19 = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), (Object)"true")}));
        .colon.colon x$20 = new .colon.colon((Object)((KafkaBroker)destLeader.elem), (List)Nil$.MODULE$);
        Set<String> x$21 = qual$4.alterClusterLink$default$4();
        boolean x$22 = qual$4.alterClusterLink$default$5();
        qual$4.alterClusterLink(x$18, (Map<String, String>)x$19, (Seq<KafkaBroker>)x$20, x$21, x$22);
        long l2 = 100L;
        long waitUntilTrue_waitTimeMs2 = 15000L;
        long waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!ClusterLinkIsrTest.$anonfun$testRestartPausedLink$4(this, linkId)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs2) {
                Assertions.fail((String)"ClusterLink is not paused in one or more brokers.");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs2), (long)waitUntilTrue_pause));
        }
        this.produceToSourceCluster(2);
        this.destCluster().shutdownBroker(((KafkaBroker)buffer.apply(1)).config().brokerId());
        this.destCluster().shutdownBroker(((KafkaBroker)destLeader.elem).config().brokerId());
        this.destCluster().startBroker(((KafkaBroker)destLeader.elem).config().brokerId());
        destLeader.elem = this.destCluster().partitionLeader(tp);
        long l3 = 100L;
        long waitUntilTrue_waitTimeMs3 = 15000L;
        long waitUntilTrue_startTime3 = System.currentTimeMillis();
        while (true) {
            void waitUntilTrue_pause;
            boolean bl;
            try {
                bl = !((KafkaBroker)destLeader.elem).replicaManager().getPartitionOrException(tp).isBlockedOnMirrorSource();
            }
            catch (Exception exception) {
                bl = false;
            }
            if (bl) break;
            if (System.currentTimeMillis() > waitUntilTrue_startTime3 + waitUntilTrue_waitTimeMs3) {
                Assertions.fail((String)"Paused partition should not be blocked on source");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs3), (long)waitUntilTrue_pause));
        }
        Assertions.assertEquals(endOffset, this.logEndOffset((KafkaBroker)destLeader.elem, tp));
        buffer = (Buffer)buffer.map((Function1 & Serializable)follower -> this.destCluster().startBroker(follower.config().brokerId()));
        .colon.colon expectedOffsets = new .colon.colon(endOffset, (List)new .colon.colon(endOffset, (List)Nil$.MODULE$));
        long l4 = 100L;
        long computeUntilTrue_waitTime = 15000L;
        long computeUntilTrue_startTime = System.currentTimeMillis();
        while (true) {
            void computeUntilTrue_pause;
            Buffer computeUntilTrue_result;
            if (ClusterLinkIsrTest.$anonfun$testRestartPausedLink$13((scala.collection.immutable.Seq)expectedOffsets, computeUntilTrue_result = (Buffer)buffer.map((Function1 & Serializable)x$6 -> this.logEndOffset((KafkaBroker)x$6, tp$2)))) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)true));
                break;
            }
            if (System.currentTimeMillis() > computeUntilTrue_startTime + computeUntilTrue_waitTime) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue_waitTime), (long)computeUntilTrue_pause));
        }
        Object var72_47 = null;
        Tuple2 tuple22 = tuple2;
        if (tuple22 == null) {
            throw new MatchError(null);
        }
        Buffer offsets = (Buffer)tuple22._1();
        Assertions.assertEquals((Object)expectedOffsets, (Object)var38_49);
        long l5 = 100L;
        long waitUntilTrue_waitTimeMs4 = 15000L;
        long waitUntilTrue_startTime4 = System.currentTimeMillis();
        while (!(!((Partition)((KafkaBroker)destLeader.elem).replicaManager().onlinePartition(tp).get()).isUnderReplicated())) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime4 + waitUntilTrue_waitTimeMs4) {
                Assertions.fail((String)"Destination follower unable to join ISR with paused link");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs4), (long)waitUntilTrue_pause));
        }
        ClusterLinkTestHarness qual$5 = this.destCluster();
        String x$23 = this.linkName();
        Map x$24 = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), (Object)"false")}));
        Seq<KafkaBroker> x$25 = qual$5.alterClusterLink$default$3();
        Set<String> x$26 = qual$5.alterClusterLink$default$4();
        boolean x$27 = qual$5.alterClusterLink$default$5();
        qual$5.alterClusterLink(x$23, (Map<String, String>)x$24, x$25, x$26, x$27);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.produceToSourceCluster(2);
        String x$28 = this.topic();
        Seq<KafkaBroker> x$30 = this.verifyMirror$default$2();
        boolean x$31 = this.verifyMirror$default$3();
        this.verifyMirror(x$28, x$30, x$31, false);
    }

    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMirrorLocalReplication(String quorum, boolean coordinator) {
        this.setUpClusters((Map<String, String>)((Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)KafkaConfig$.MODULE$.ReplicaFetchWaitMaxMsProp()), (Object)"60000"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)KafkaConfig$.MODULE$.ReplicaLagTimeMaxMsProp()), (Object)"60000"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)KafkaConfig$.MODULE$.ReplicaSocketTimeoutMsProp()), (Object)"120000"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp()), (Object)"1"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"confluent.cluster.link.metadata.topic.partitions"), (Object)"1")}))));
        this.numPartitions_$eq(1);
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$3 = this.replicationFactor();
        Properties x$4 = qual$1.createTopic$default$4();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$6 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$2, x$3, x$4, x$5, x$6);
        Uuid linkId = this.createClusterLink(this.linkName(), this.destLinkProps(this.destLinkProps$default$1()), this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$7 = this.topic();
        short x$8 = this.replicationFactor();
        String x$9 = this.linkName();
        Map<String, String> x$10 = qual$2.linkTopic$default$4();
        String x$11 = qual$2.linkTopic$default$5();
        qual$2.linkTopic(x$7, x$8, x$9, x$10, x$11);
        this.useExclusiveLeaderForMirrorPartition(new TopicPartition(this.topic(), 0));
        this.produceToSourceCluster(20);
        this.waitAndVerifyMetricsAndMirror(this.topic(), linkId, false);
        this.destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, this.topic(), this.numPartitions());
        ClusterLinkTestHarness qual$3 = this.destCluster();
        String x$12 = this.linkName();
        boolean x$13 = qual$3.deleteClusterLink$default$2();
        Seq<KafkaBroker> x$14 = qual$3.deleteClusterLink$default$3();
        qual$3.deleteClusterLink(x$12, x$13, x$14);
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMirrorWithOneIsr(String quorum, boolean coordinator) {
        this.setUpClusters((Map<String, String>)((Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp()), (Object)"1"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"confluent.cluster.link.metadata.topic.partitions"), (Object)"1")}))));
        this.numPartitions_$eq(1);
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$3 = this.replicationFactor();
        Properties x$4 = qual$1.createTopic$default$4();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$6 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$2, x$3, x$4, x$5, x$6);
        this.createClusterLink(this.linkName(), this.destLinkProps(this.destLinkProps$default$1()), this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$7 = this.topic();
        String x$9 = this.linkName();
        Map x$10 = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"min.insync.replicas"), (Object)"1")}));
        String x$11 = qual$2.linkTopic$default$5();
        qual$2.linkTopic(x$7, (short)1, x$9, (Map<String, String>)x$10, x$11);
        TopicPartition tp = new TopicPartition(this.topic(), 0);
        this.useExclusiveLeaderForMirrorPartition(tp);
        int numRecords = 20;
        Properties consumerProps = new Properties();
        consumerProps.setProperty("fetch.max.wait.ms", "60000");
        ClusterLinkTestHarness qual$3 = this.destCluster();
        ByteArrayDeserializer x$13 = qual$3.createConsumer$default$1();
        ByteArrayDeserializer x$14 = qual$3.createConsumer$default$2();
        List<String> x$15 = qual$3.createConsumer$default$4();
        Consumer consumer = qual$3.createConsumer(x$13, x$14, consumerProps, x$15);
        consumer.assign(Collections.singleton(tp));
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkIsrTest.$anonfun$testMirrorWithOneIsr$1(this, consumer, tp)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Fetch request not sent within timeout");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        this.produceToSourceCluster(numRecords);
        this.consumeRecords(consumer, this.consumeRecords$default$2(), this.consumeRecords$default$3());
        Assertions.assertEquals((long)numRecords, (long)((AbstractLog)this.destCluster().partitionLeader(tp).replicaManager().localLog(tp).get()).highWatermark());
        consumer.close();
        ClusterLinkTestHarness qual$4 = this.destCluster();
        String x$16 = this.topic();
        String x$17 = this.linkName();
        int x$18 = this.numPartitions();
        boolean x$19 = qual$4.unlinkTopic$default$3();
        boolean x$20 = qual$4.unlinkTopic$default$4();
        boolean x$21 = qual$4.unlinkTopic$default$5();
        qual$4.unlinkTopic(x$16, x$17, x$19, x$20, x$21, x$18);
        ClusterLinkTestHarness qual$5 = this.destCluster();
        String x$22 = this.linkName();
        boolean x$23 = qual$5.deleteClusterLink$default$2();
        Seq<KafkaBroker> x$24 = qual$5.deleteClusterLink$default$3();
        qual$5.deleteClusterLink(x$22, x$23, x$24);
    }

    private void useExclusiveLeaderForMirrorPartition(TopicPartition tp) {
        this.destCluster().partitionLeader(tp).replicaManager().leaderPartitionsIterator().map((Function1 & Serializable)x$8 -> x$8.topicPartition()).filter((Function1 & Serializable)x$9 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkIsrTest.$anonfun$useExclusiveLeaderForMirrorPartition$2(tp, x$9))).foreach(arg_0 -> ClusterLinkIsrTest.$anonfun$useExclusiveLeaderForMirrorPartition$3$adapted(this.destCluster(), arg_0));
    }

    public static final /* synthetic */ boolean $anonfun$testDestinationUnderMinIsr$1(KafkaBroker destLeader$1, KafkaBroker x$2) {
        KafkaBroker kafkaBroker = x$2;
        return !(kafkaBroker != null ? !kafkaBroker.equals(destLeader$1) : destLeader$1 != null);
    }

    public static final /* synthetic */ void $anonfun$testDestinationUnderMinIsr$3(ClusterLinkIsrTest $this, Tuple2 follower) {
        $this.destCluster().killBrokerById(follower._1$mcI$sp());
    }

    public static final /* synthetic */ boolean $anonfun$testDestinationUnderMinIsr$4(ClusterLinkIsrTest $this, KafkaBroker destLeader$1, TopicPartition tp$1) {
        $this.produceToSourceCluster(2);
        return ((Partition)destLeader$1.replicaManager().onlinePartition(tp$1).get()).isUnderMinIsr();
    }

    public static final /* synthetic */ String $anonfun$testDestinationUnderMinIsr$5() {
        return "Destination not under-min-isr with two brokers down";
    }

    public static final /* synthetic */ void $anonfun$testDestinationUnderMinIsr$6(ClusterLinkIsrTest $this, Tuple2 follower) {
        ClusterLinkTestHarness qual$4 = $this.destCluster();
        int x$18 = follower._1$mcI$sp();
        boolean x$19 = qual$4.restartDeadBrokerById$default$2();
        qual$4.restartDeadBrokerById(x$18, x$19);
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$1(ObjectRef destLeader$2, KafkaBroker x$3) {
        KafkaBroker kafkaBroker = x$3;
        KafkaBroker kafkaBroker2 = (KafkaBroker)destLeader$2.elem;
        return !(kafkaBroker != null ? !kafkaBroker.equals(kafkaBroker2) : kafkaBroker2 != null);
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$2(ClusterLinkIsrTest $this, ObjectRef destLeader$2, TopicPartition tp$2) {
        $this.produceToSourceCluster(2);
        return ((Partition)((KafkaBroker)destLeader$2.elem).replicaManager().onlinePartition(tp$2).get()).isUnderReplicated();
    }

    public static final /* synthetic */ String $anonfun$testRestartPausedLink$3() {
        return "Destination not under-replicated with a broker down";
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$6(ClusterLinkFactory.ConnectionManager x$5) {
        return !x$5.active();
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$5(Uuid linkId$1, KafkaBroker x$4) {
        return x$4.clusterLinkManager().connectionManager(linkId$1).exists((Function1 & Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkIsrTest.$anonfun$testRestartPausedLink$6(x$5)));
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$4(ClusterLinkIsrTest $this, Uuid linkId$1) {
        return $this.destCluster().aliveServers().forall((Function1 & Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkIsrTest.$anonfun$testRestartPausedLink$5(linkId$1, x$4)));
    }

    public static final /* synthetic */ String $anonfun$testRestartPausedLink$7() {
        return "ClusterLink is not paused in one or more brokers.";
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$8(ObjectRef destLeader$2, TopicPartition tp$2) {
        boolean bl;
        try {
            if (((KafkaBroker)destLeader$2.elem).replicaManager().getPartitionOrException(tp$2).isBlockedOnMirrorSource()) {
                return false;
            }
            bl = true;
        }
        catch (Exception exception) {
            bl = false;
        }
        return bl;
    }

    public static final /* synthetic */ String $anonfun$testRestartPausedLink$9() {
        return "Paused partition should not be blocked on source";
    }

    public static final /* synthetic */ Buffer $anonfun$testRestartPausedLink$11(ClusterLinkIsrTest $this, ObjectRef destFollowers$1, TopicPartition tp$2) {
        return (Buffer)((Buffer)destFollowers$1.elem).map((Function1 & Serializable)x$6 -> this.logEndOffset((KafkaBroker)x$6, tp$2));
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$13(scala.collection.immutable.Seq expectedOffsets$1, Buffer x$7) {
        Buffer buffer = x$7;
        return !(buffer != null ? !buffer.equals(expectedOffsets$1) : expectedOffsets$1 != null);
    }

    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$14(ObjectRef destLeader$2, TopicPartition tp$2) {
        return !((Partition)((KafkaBroker)destLeader$2.elem).replicaManager().onlinePartition(tp$2).get()).isUnderReplicated();
    }

    public static final /* synthetic */ String $anonfun$testRestartPausedLink$15() {
        return "Destination follower unable to join ISR with paused link";
    }

    public static final /* synthetic */ boolean $anonfun$testMirrorWithOneIsr$1(ClusterLinkIsrTest $this, Consumer consumer$1, TopicPartition tp$3) {
        consumer$1.poll(Duration.ofMillis(1L));
        return $this.destCluster().partitionLeader(tp$3).replicaManager().delayedFetchPurgatory().numDelayed() > 0;
    }

    public static final /* synthetic */ String $anonfun$testMirrorWithOneIsr$2() {
        return "Fetch request not sent within timeout";
    }

    public static final /* synthetic */ boolean $anonfun$useExclusiveLeaderForMirrorPartition$2(TopicPartition tp$4, TopicPartition x$9) {
        TopicPartition topicPartition = x$9;
        return topicPartition == null ? tp$4 != null : !topicPartition.equals(tp$4);
    }

    public static final /* synthetic */ Object $anonfun$testRestartPausedLink$13$adapted(scala.collection.immutable.Seq expectedOffsets$1, Buffer x$7) {
        return BoxesRunTime.boxToBoolean((boolean)ClusterLinkIsrTest.$anonfun$testRestartPausedLink$13(expectedOffsets$1, x$7));
    }

    public static final /* synthetic */ Object $anonfun$useExclusiveLeaderForMirrorPartition$3$adapted(ClusterLinkTestHarness eta$0$1$1, TopicPartition tp) {
        return BoxesRunTime.boxToInteger((int)eta$0$1$1.changeLeader(tp));
    }
}

