/*
 * Decompiled with CFR 0.152.
 */
package kafka.link;

import java.io.Serializable;
import java.util.Properties;
import kafka.cluster.Partition;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.link.ClusterLinkTestHarness;
import kafka.link.ClusterLinkTestHarness$;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.link.ClusterLinkConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.$colon$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

@Tag(value="integration")
@ScalaSignature(bytes="\u0006\u0001)3Aa\u0003\u0007\u0001#!)a\u0003\u0001C\u0001/!9\u0011\u0004\u0001b\u0001\n\u0003R\u0002B\u0002\u0010\u0001A\u0003%1\u0004C\u0004 \u0001\t\u0007I\u0011\t\u000e\t\r\u0001\u0002\u0001\u0015!\u0003\u001c\u0011\u001d\t\u0003A1A\u0005B\tBa!\u000b\u0001!\u0002\u0013\u0019\u0003\"\u0002\u0016\u0001\t\u0003Z\u0003\"\u0002\u001f\u0001\t\u0003Y\u0003\"B!\u0001\t\u0003Y#AE\"mkN$XM\u001d'j].L5O\u001d+fgRT!!\u0004\b\u0002\t1Lgn\u001b\u0006\u0002\u001f\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u0013!\t\u0019B#D\u0001\r\u0013\t)BB\u0001\u0012BEN$(/Y2u\u00072,8\u000f^3s\u0019&t7.\u00138uK\u001e\u0014\u0018\r^5p]R+7\u000f^\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003a\u0001\"a\u0005\u0001\u0002\u001bM|WO]2f\u00072,8\u000f^3s+\u0005Y\u0002CA\n\u001d\u0013\tiBB\u0001\fDYV\u001cH/\u001a:MS:\\G+Z:u\u0011\u0006\u0014h.Z:t\u00039\u0019x.\u001e:dK\u000ecWo\u001d;fe\u0002\n1\u0002Z3ti\u000ecWo\u001d;fe\u0006aA-Z:u\u00072,8\u000f^3sA\u0005\t\"/\u001a9mS\u000e\fG/[8o\r\u0006\u001cGo\u001c:\u0016\u0003\r\u0002\"\u0001J\u0014\u000e\u0003\u0015R\u0011AJ\u0001\u0006g\u000e\fG.Y\u0005\u0003Q\u0015\u0012Qa\u00155peR\f!C]3qY&\u001c\u0017\r^5p]\u001a\u000b7\r^8sA\u0005)1/\u001a;VaR\tA\u0006\u0005\u0002%[%\u0011a&\n\u0002\u0005+:LG\u000f\u000b\u0002\taA\u0011\u0011GO\u0007\u0002e)\u00111\u0007N\u0001\u0004CBL'BA\u001b7\u0003\u001dQW\u000f]5uKJT!a\u000e\u001d\u0002\u000b),h.\u001b;\u000b\u0003e\n1a\u001c:h\u0013\tY$G\u0001\u0006CK\u001a|'/Z#bG\"\f!\u0004^3ti\u0012+7\u000f^5oCRLwN\\+oI\u0016\u0014X*\u001b8JgJD#!\u0003 \u0011\u0005Ez\u0014B\u0001!3\u0005\u0011!Vm\u001d;\u0002+Q,7\u000f\u001e*fgR\f'\u000f\u001e)bkN,G\rT5oW\"\u0012!B\u0010\u0015\u0005\u0001\u0011;\u0005\n\u0005\u00022\u000b&\u0011aI\r\u0002\u0004)\u0006<\u0017!\u0002<bYV,\u0017%A%\u0002\u0017%tG/Z4sCRLwN\u001c")
public class ClusterLinkIsrTest
extends AbstractClusterLinkIntegrationTest {
    private final ClusterLinkTestHarness sourceCluster;
    private final ClusterLinkTestHarness destCluster;
    private final short replicationFactor;

    /** Returns the source-cluster harness created in the constructor. */
    @Override
    public ClusterLinkTestHarness sourceCluster() {
        return sourceCluster;
    }

    /** Returns the destination-cluster harness created in the constructor. */
    @Override
    public ClusterLinkTestHarness destCluster() {
        return destCluster;
    }

    /** Returns the replication factor used for the topics created by these tests. */
    @Override
    public short replicationFactor() {
        return replicationFactor;
    }

    /**
     * Adjusts the destination cluster's broker configuration and then runs the
     * parent set-up.
     *
     * <p>The replica-lag-time-max property is set to "5000" and the
     * min-in-sync-replicas property to "2"; both property names come from
     * {@code KafkaConfig}.
     */
    @Override
    @BeforeEach
    public void setUp() {
        final String lagTimeKey = KafkaConfig$.MODULE$.ReplicaLagTimeMaxMsProp();
        destCluster().serverConfig().setProperty(lagTimeKey, "5000");

        final String minIsrKey = KafkaConfig$.MODULE$.MinInSyncReplicasProp();
        destCluster().serverConfig().setProperty(minIsrKey, "2");

        super.setUp();
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testDestinationUnderMinIsr() {
        this.numPartitions_$eq(1);
        TopicPartition tp = (TopicPartition)this.partitions().head();
        this.sourceCluster().createTopic(this.topic(), this.numPartitions(), this.replicationFactor(), this.sourceCluster().createTopic$default$4());
        Properties linkProps = new Properties();
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.LinkFetcherFlowControlProp(), "-1");
        String x$12 = this.linkName();
        ClusterLinkTestHarness x$22 = this.sourceCluster();
        long x$4 = this.destCluster().createDestClusterLink$default$3();
        long x$5 = this.destCluster().createDestClusterLink$default$4();
        long x$6 = this.destCluster().createDestClusterLink$default$5();
        this.destCluster().createDestClusterLink(x$12, x$22, x$4, x$5, x$6, linkProps);
        this.destCluster().linkTopic(this.topic(), this.replicationFactor(), this.linkName(), this.destCluster().linkTopic$default$4());
        this.produceToSourceCluster(2);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        KafkaServer destLeader = this.destCluster().partitionLeader(tp);
        Buffer destFollowers = (Buffer)this.destCluster().servers().filterNot((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkIsrTest.$anonfun$testDestinationUnderMinIsr$1(destLeader, x$1)));
        destFollowers.foreach((Function1 & Serializable & scala.Serializable)x$2 -> {
            x$2.shutdown();
            return BoxedUnit.UNIT;
        });
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkIsrTest.$anonfun$testDestinationUnderMinIsr$3(this, destLeader, tp)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ClusterLinkIsrTest.$anonfun$testDestinationUnderMinIsr$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        this.produceToSourceCluster(10);
        long destOffset = BoxesRunTime.unboxToLong((Object)this.logEndOffset(destLeader, tp).get());
        long sourceOffset = BoxesRunTime.unboxToLong((Object)this.logEndOffset(this.sourceCluster().partitionLeader(tp), tp).get());
        Assertions.assertTrue((sourceOffset > destOffset ? 1 : 0) != 0, (String)new StringBuilder(62).append("Records mirrored with under-min-isrs sourceOffset=").append(sourceOffset).append(" destOffset=").append(destOffset).toString());
        destFollowers.foreach((Function1 & Serializable & scala.Serializable)x$3 -> {
            x$3.startup();
            return BoxedUnit.UNIT;
        });
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.produceToSourceCluster(10);
        this.verifyMirror(this.topic(), this.verifyMirror$default$2(), this.verifyMirror$default$3());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testRestartPausedLink() {
        void var6_18;
        Tuple2 tuple2;
        this.numPartitions_$eq(1);
        TopicPartition tp = (TopicPartition)this.partitions().head();
        this.sourceCluster().createTopic(this.topic(), this.numPartitions(), this.replicationFactor(), this.sourceCluster().createTopic$default$4());
        this.destCluster().createDestClusterLink(this.linkName(), this.sourceCluster(), this.destCluster().createDestClusterLink$default$3(), this.destCluster().createDestClusterLink$default$4(), this.destCluster().createDestClusterLink$default$5(), this.destCluster().createDestClusterLink$default$6());
        this.destCluster().linkTopic(this.topic(), this.replicationFactor(), this.linkName(), this.destCluster().linkTopic$default$4());
        this.produceToSourceCluster(2);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        KafkaServer destLeader = this.destCluster().partitionLeader(tp);
        Buffer destFollowers = (Buffer)this.destCluster().servers().filterNot((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkIsrTest.$anonfun$testRestartPausedLink$1(destLeader, x$4)));
        ((KafkaServer)destFollowers.head()).shutdown();
        this.destCluster().updateBootstrapServers();
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkIsrTest.$anonfun$testRestartPausedLink$2(this, destLeader, tp)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ClusterLinkIsrTest.$anonfun$testRestartPausedLink$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        this.waitForMirror((Seq<KafkaServer>)new .colon.colon((Object)destLeader, (List)Nil$.MODULE$), this.waitForMirror$default$2());
        Option<Object> endOffset = this.logEndOffset(destLeader, tp);
        this.destCluster().alterClusterLink(this.linkName(), (Map<String, String>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), (Object)"true")}))), (Seq<KafkaServer>)new .colon.colon((Object)destLeader, (List)Nil$.MODULE$));
        this.produceToSourceCluster(2);
        destLeader.shutdown();
        ((KafkaServer)destFollowers.apply(1)).shutdown();
        this.destCluster().startBroker(destLeader.config().brokerId());
        long l3 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l4 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!ClusterLinkIsrTest.$anonfun$testRestartPausedLink$4(destLeader, tp)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ClusterLinkIsrTest.$anonfun$testRestartPausedLink$5());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        Assertions.assertEquals(endOffset, this.logEndOffset(destLeader, tp));
        destFollowers.foreach((Function1 & Serializable & scala.Serializable)follower -> {
            ClusterLinkIsrTest.$anonfun$testRestartPausedLink$6(this, follower);
            return BoxedUnit.UNIT;
        });
        .colon.colon expectedOffsets = new .colon.colon(endOffset, (List)new .colon.colon(endOffset, (List)Nil$.MODULE$));
        long l5 = TestUtils$.MODULE$.computeUntilTrue$default$3();
        long l6 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long computeUntilTrue_startTime = System.currentTimeMillis();
        while (true) {
            void computeUntilTrue_pause;
            void computeUntilTrue_waitTime;
            Buffer computeUntilTrue_result;
            Buffer buffer;
            if (ClusterLinkIsrTest.$anonfun$testRestartPausedLink$9((Seq)expectedOffsets, buffer = (computeUntilTrue_result = ClusterLinkIsrTest.$anonfun$testRestartPausedLink$7(this, destFollowers, tp)))) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)true));
                break;
            }
            if (System.currentTimeMillis() > computeUntilTrue_startTime + computeUntilTrue_waitTime) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)computeUntilTrue_waitTime), (long)computeUntilTrue_pause));
        }
        if (tuple2 == null) {
            throw new MatchError(null);
        }
        Buffer offsets = (Buffer)tuple2._1();
        Assertions.assertEquals((Object)expectedOffsets, (Object)var6_18);
        long l7 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l8 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime3 = System.currentTimeMillis();
        while (!ClusterLinkIsrTest.$anonfun$testRestartPausedLink$10(destLeader, tp)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime3 + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ClusterLinkIsrTest.$anonfun$testRestartPausedLink$11());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        this.destCluster().alterClusterLink(this.linkName(), (Map<String, String>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), (Object)"false")}))), this.destCluster().alterClusterLink$default$3());
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.produceToSourceCluster(2);
        this.verifyMirror(this.topic(), this.verifyMirror$default$2(), this.verifyMirror$default$3());
    }

    /**
     * Null-safe equality between a destination broker and the partition leader; used
     * to filter the leader out of the broker list.
     *
     * @return {@code true} when both are {@code null} or {@code x$1.equals(destLeader$1)}
     */
    public static final /* synthetic */ boolean $anonfun$testDestinationUnderMinIsr$1(KafkaServer destLeader$1, KafkaServer x$1) {
        if (x$1 == null) {
            return destLeader$1 == null;
        }
        return x$1.equals(destLeader$1);
    }

    /**
     * Polling condition: produces two records to the source cluster, then reports
     * whether the leader's online partition is under min-ISR.
     */
    public static final /* synthetic */ boolean $anonfun$testDestinationUnderMinIsr$3(ClusterLinkIsrTest $this, KafkaServer destLeader$1, TopicPartition tp$1) {
        // Produce before checking, on every poll.
        $this.produceToSourceCluster(2);
        final Partition leaderPartition = (Partition) destLeader$1.replicaManager().onlinePartition(tp$1).get();
        return leaderPartition.isUnderMinIsr();
    }

    /** Failure message used when the destination partition never becomes under min-ISR. */
    public static final /* synthetic */ String $anonfun$testDestinationUnderMinIsr$4() {
        final String failureMessage = "Destination not under-min-isr with two brokers down";
        return failureMessage;
    }

    /**
     * Null-safe equality between a destination broker and the partition leader; used
     * to filter the leader out of the broker list.
     *
     * @return {@code true} when both are {@code null} or {@code x$4.equals(destLeader$2)}
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$1(KafkaServer destLeader$2, KafkaServer x$4) {
        if (x$4 == null) {
            return destLeader$2 == null;
        }
        return x$4.equals(destLeader$2);
    }

    /**
     * Polling condition: produces two records to the source cluster, then reports
     * whether the leader's online partition is under-replicated.
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$2(ClusterLinkIsrTest $this, KafkaServer destLeader$2, TopicPartition tp$2) {
        // Produce before checking, on every poll.
        $this.produceToSourceCluster(2);
        final Partition leaderPartition = (Partition) destLeader$2.replicaManager().onlinePartition(tp$2).get();
        return leaderPartition.isUnderReplicated();
    }

    /** Failure message used when the destination partition never becomes under-replicated. */
    public static final /* synthetic */ String $anonfun$testRestartPausedLink$3() {
        final String failureMessage = "Destination not under-replicated with a broker down";
        return failureMessage;
    }

    /**
     * Polling condition: {@code true} once the leader's partition can be looked up and
     * is not blocked on the mirror source.
     *
     * <p>Any exception from the lookup is treated as "not ready yet" and yields
     * {@code false}, so the caller keeps polling.
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$4(KafkaServer destLeader$2, TopicPartition tp$2) {
        try {
            return !destLeader$2.replicaManager().getPartitionOrException(tp$2).isBlockedOnMirrorSource();
        }
        catch (Exception ignored) {
            // Deliberately swallowed: a failed lookup just means the condition is not met yet.
            return false;
        }
    }

    /** Failure message used when the paused partition stays blocked on the mirror source. */
    public static final /* synthetic */ String $anonfun$testRestartPausedLink$5() {
        final String failureMessage = "Paused partition should not be blocked on source";
        return failureMessage;
    }

    /** Starts the given follower on the destination cluster, addressed by its broker id. */
    public static final /* synthetic */ void $anonfun$testRestartPausedLink$6(ClusterLinkIsrTest $this, KafkaServer follower) {
        final ClusterLinkTestHarness dest = $this.destCluster();
        dest.startBroker(follower.config().brokerId());
    }

    /**
     * Maps every follower to the result of {@code logEndOffset(follower, tp$2)} and
     * returns the results as a new buffer.
     */
    public static final /* synthetic */ Buffer $anonfun$testRestartPausedLink$7(ClusterLinkIsrTest $this, Buffer destFollowers$1, TopicPartition tp$2) {
        // Raw Function1: the element arrives as Object and is cast back to KafkaServer.
        final Function1 toLogEndOffset = (Function1 & Serializable & scala.Serializable) server -> $this.logEndOffset((KafkaServer) server, tp$2);
        final Object mapped = destFollowers$1.map(toLogEndOffset, Buffer$.MODULE$.canBuildFrom());
        return (Buffer) mapped;
    }

    /**
     * Null-safe equality between the computed follower offsets and the expected ones.
     *
     * @return {@code true} when both are {@code null} or {@code x$6.equals(expectedOffsets$1)}
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$9(Seq expectedOffsets$1, Buffer x$6) {
        if (x$6 == null) {
            return expectedOffsets$1 == null;
        }
        return x$6.equals(expectedOffsets$1);
    }

    /** Polling condition: {@code true} once the leader's online partition is no longer under-replicated. */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$10(KafkaServer destLeader$2, TopicPartition tp$2) {
        final Partition leaderPartition = (Partition) destLeader$2.replicaManager().onlinePartition(tp$2).get();
        final boolean underReplicated = leaderPartition.isUnderReplicated();
        return !underReplicated;
    }

    /** Failure message used when the partition stays under-replicated after the followers restart. */
    public static final /* synthetic */ String $anonfun$testRestartPausedLink$11() {
        final String failureMessage = "Destination follower unable to join ISR with paused link";
        return failureMessage;
    }

    /**
     * Creates the two cluster harnesses and fixes the replication factor at 3.
     *
     * <p>The source harness uses SASL_SSL and the destination SASL_PLAINTEXT; the
     * second constructor argument keeps the harness default for both.
     * NOTE(review): the trailing int pairs (0, 3) and (100, 3) look like a starting
     * broker id and a broker count - confirm against ClusterLinkTestHarness.
     */
    public ClusterLinkIsrTest() {
        this.sourceCluster = new ClusterLinkTestHarness(
                SecurityProtocol.SASL_SSL,
                ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(),
                0,
                3);
        this.destCluster = new ClusterLinkTestHarness(
                SecurityProtocol.SASL_PLAINTEXT,
                ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$2(),
                100,
                3);
        this.replicationFactor = 3;
    }

    /** Boxing adapter: returns the result of {@code $anonfun$testRestartPausedLink$9} as a boxed Boolean. */
    public static final /* synthetic */ Object $anonfun$testRestartPausedLink$9$adapted(Seq expectedOffsets$1, Buffer x$6) {
        final boolean offsetsMatch = ClusterLinkIsrTest.$anonfun$testRestartPausedLink$9(expectedOffsets$1, x$6);
        return BoxesRunTime.boxToBoolean(offsetsMatch);
    }
}

