/*
 * Decompiled with CFR 0.152.
 */
package kafka.link;

import java.io.Serializable;
import java.util.Properties;
import kafka.link.ClusterLinkFailureTest;
import kafka.link.ClusterLinkTestHarness;
import kafka.link.FailureType$AuthenticationFailure$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaServer;
import kafka.server.link.ActiveClusterLink$;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.FailedClusterLink$;
import kafka.server.link.LinkState;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@Tag(value="integration")
@ScalaSignature(bytes="\u0006\u0001a2AAB\u0004\u0001\u0019!)\u0011\u0003\u0001C\u0001%!9A\u0003\u0001b\u0001\n\u0003*\u0002B\u0002\u000f\u0001A\u0003%a\u0003C\u0003\u001e\u0001\u0011\u0005a\u0004C\u00030\u0001\u0011\u0005aD\u0001\u0010T_V\u00148-Z%oSRL\u0017\r^3e\u0019&t7NR1jYV\u0014X\rV3ti*\u0011\u0001\"C\u0001\u0005Y&t7NC\u0001\u000b\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u0007\u0011\u00059yQ\"A\u0004\n\u0005A9!AF\"mkN$XM\u001d'j].4\u0015-\u001b7ve\u0016$Vm\u001d;\u0002\rqJg.\u001b;?)\u0005\u0019\u0002C\u0001\b\u0001\u0003Y)8/Z*pkJ\u001cW-\u00138ji&\fG/\u001a3MS:\\W#\u0001\f\u0011\u0005]QR\"\u0001\r\u000b\u0003e\tQa]2bY\u0006L!a\u0007\r\u0003\u000f\t{w\u000e\\3b]\u00069Ro]3T_V\u00148-Z%oSRL\u0017\r^3e\u0019&t7\u000eI\u0001\u001fi\u0016\u001cH\u000fT8dC2\fU\u000f\u001e5f]RL7-\u0019;j_:4\u0015-\u001b7ve\u0016$\u0012a\b\t\u0003/\u0001J!!\t\r\u0003\tUs\u0017\u000e\u001e\u0015\u0003\t\r\u0002\"\u0001J\u0017\u000e\u0003\u0015R!AJ\u0014\u0002\u0007\u0005\u0004\u0018N\u0003\u0002)S\u00059!.\u001e9ji\u0016\u0014(B\u0001\u0016,\u0003\u0015QWO\\5u\u0015\u0005a\u0013aA8sO&\u0011a&\n\u0002\u0005)\u0016\u001cH/A\u0014uKN$8k\\;sG\u0016\u001cE.^:uKJ\u0014Vm\u001d;beR<\u0016\u000e\u001e5GC&d\u0017N\\4MS:\\\u0007FA\u0003$Q\u0011\u0001!'\u000e\u001c\u0011\u0005\u0011\u001a\u0014B\u0001\u001b&\u0005\r!\u0016mZ\u0001\u0006m\u0006dW/Z\u0011\u0002o\u0005Y\u0011N\u001c;fOJ\fG/[8o\u0001")
public class SourceInitiatedLinkFailureTest
extends ClusterLinkFailureTest {
    private final boolean useSourceInitiatedLink;

    @Override
    public boolean useSourceInitiatedLink() {
        return this.useSourceInitiatedLink;
    }

    @Test
    public void testLocalAuthenticationFailure() {
        ObjectRef newJaasConfig = ObjectRef.create(null);
        this.verifyFailureAndRecovery(FailureType$AuthenticationFailure$.MODULE$, (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            newJaasConfig$1.elem = this.updateCredentials(this.sourceCluster());
        }, (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.sourceCluster().alterClusterLink(this.linkName(), (Map<String, String>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new StringBuilder(22).append("local.").append("sasl.jaas.config").toString()), (Object)((String)newJaasConfig$1.elem))}))), this.sourceCluster().alterClusterLink$default$3()), this.verifyFailureAndRecovery$default$4());
        InvalidConfigurationException e = (InvalidConfigurationException)Assertions.assertThrows(InvalidConfigurationException.class, () -> this.sourceCluster().createClusterLink("missingDestLink", (Properties)this.sourceLinkProps(this.sourceLinkProps$default$1()).get(), (Option<String>)new Some((Object)((KafkaServer)this.destCluster().servers().head()).clusterId()), this.sourceCluster().createClusterLink$default$4()));
        Assertions.assertTrue((boolean)e.getMessage().contains("destination cluster does not have a link named 'missingDestLink'"), (String)new StringBuilder(18).append("Unexpected error: ").append(e.getMessage()).toString());
    }

    @Test
    public void testSourceClusterRestartWithFailingLink() {
        this.numPartitions_$eq(1);
        this.sourceCluster().createTopic(this.topic(), this.numPartitions(), 2, this.sourceCluster().createTopic$default$4());
        this.createClusterLink(this.linkName(), this.createClusterLink$default$2(), this.createClusterLink$default$3(), this.createClusterLink$default$4());
        this.destCluster().linkTopic(this.topic(), (short)2, this.linkName(), this.destCluster().linkTopic$default$4());
        this.produceToSourceCluster(10);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        String linkName2 = "badlink";
        this.createClusterLink(linkName2, this.createClusterLink$default$2(), this.createClusterLink$default$3(), this.createClusterLink$default$4());
        String invalidJaas = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;";
        this.sourceCluster().alterClusterLink(linkName2, (Map<String, String>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"sasl.jaas.config"), (Object)invalidJaas)}))), (Seq<KafkaServer>)Nil$.MODULE$);
        SourceInitiatedLinkFailureTest.waitForLinkState$1(this.sourceCluster(), linkName2, (LinkState)FailedClusterLink$.MODULE$);
        this.produceToSourceCluster(10);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.sourceCluster().servers().indices().foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> this.sourceCluster().killBroker(i));
        this.sourceCluster().restartDeadBrokers(this.sourceCluster().restartDeadBrokers$default$1());
        this.sourceCluster().updateBootstrapServers();
        SourceInitiatedLinkFailureTest.waitForLinkState$1(this.sourceCluster(), this.linkName(), (LinkState)ActiveClusterLink$.MODULE$);
        SourceInitiatedLinkFailureTest.waitForLinkState$1(this.destCluster(), this.linkName(), (LinkState)ActiveClusterLink$.MODULE$);
        this.produceToSourceCluster(10);
        long x$1 = 30000L;
        Seq<KafkaServer> x$2 = this.waitForMirror$default$1();
        this.waitForMirror(x$2, x$1);
    }

    public static final /* synthetic */ LinkState $anonfun$testSourceClusterRestartWithFailingLink$1(ClusterLinkTestHarness cluster$1, String linkName$1) {
        return ((ClusterLinkManager)((KafkaBroker)cluster$1.servers().head()).clusterLinkManager()).linkState(linkName$1);
    }

    public static final /* synthetic */ boolean $anonfun$testSourceClusterRestartWithFailingLink$2(LinkState expectedState$1, LinkState x$1) {
        LinkState linkState = x$1;
        return !(linkState != null ? !linkState.equals(expectedState$1) : expectedState$1 != null);
    }

    /*
     * WARNING - void declaration
     */
    private static final void waitForLinkState$1(ClusterLinkTestHarness cluster, String linkName, LinkState expectedState) {
        void var3_9;
        Tuple2 tuple2;
        long l = TestUtils$.MODULE$.computeUntilTrue$default$3();
        long l2 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long computeUntilTrue_startTime = System.currentTimeMillis();
        while (true) {
            void computeUntilTrue_pause;
            void computeUntilTrue_waitTime;
            LinkState computeUntilTrue_result;
            LinkState linkState;
            if (SourceInitiatedLinkFailureTest.$anonfun$testSourceClusterRestartWithFailingLink$2(expectedState, linkState = (computeUntilTrue_result = SourceInitiatedLinkFailureTest.$anonfun$testSourceClusterRestartWithFailingLink$1(cluster, linkName)))) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)true));
                break;
            }
            if (System.currentTimeMillis() > computeUntilTrue_startTime + computeUntilTrue_waitTime) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)computeUntilTrue_waitTime), (long)computeUntilTrue_pause));
        }
        if (tuple2 == null) {
            throw new MatchError(null);
        }
        LinkState linkState = (LinkState)tuple2._1();
        Assertions.assertEquals((Object)expectedState, (Object)var3_9);
    }

    public SourceInitiatedLinkFailureTest() {
        this.useSourceInitiatedLink = true;
    }

    public static final /* synthetic */ Object $anonfun$testSourceClusterRestartWithFailingLink$2$adapted(LinkState expectedState$1, LinkState x$1) {
        return BoxesRunTime.boxToBoolean((boolean)SourceInitiatedLinkFailureTest.$anonfun$testSourceClusterRestartWithFailingLink$2(expectedState$1, x$1));
    }
}

