package kafka.link;

import java.util.Map;
import java.util.Properties;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.resource.ResourceType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: SourceInitiatedLinkAuthorizationTest.scala */
@Tags({@Tag("integration"), @Tag("bazel:shard_count:4")})
@ScalaSignature(bytes = "\u0006\u0005\u0005]a\u0001B\u0005\u000b\u0001=AQ\u0001\u0006\u0001\u0005\u0002UAQa\u0006\u0001\u0005BaAQ!\r\u0001\u0005BIBQA\u0018\u0001\u0005B}CQa\u001a\u0001\u0005B!DQA\u001c\u0001\u0005B=DQ!\u001e\u0001\u0005BYDQ\u0001 \u0001\u0005\u0002u\u0014AeU8ve\u000e,\u0017J\\5uS\u0006$X\r\u001a'j].\fU\u000f\u001e5pe&T\u0018\r^5p]R+7\u000f\u001e\u0006\u0003\u00171\tA\u0001\\5oW*\tQ\"A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001\u0001\u0002CA\t\u0013\u001b\u0005Q\u0011BA\n\u000b\u0005q\u0019E.^:uKJd\u0015N\\6BkRDwN]5{CRLwN\u001c+fgR\fa\u0001P5oSRtD#\u0001\f\u0011\u0005E\u0001\u0011!B:fiV\u0003HCA\r !\tQR$D\u0001\u001c\u0015\u0005a\u0012!B:dC2\f\u0017B\u0001\u0010\u001c\u0005\u0011)f.\u001b;\t\u000b\u0001\u0012\u0001\u0019A\u0011\u0002\u0011Q,7\u000f^%oM>\u0004\"AI\u0016\u000e\u0003\rR!\u0001J\u0013\u0002\u0007\u0005\u0004\u0018N\u0003\u0002'O\u00059!.\u001e9ji\u0016\u0014(B\u0001\u0015*\u0003\u0015QWO\\5u\u0015\u0005Q\u0013aA8sO&\u0011Af\t\u0002\t)\u0016\u001cH/\u00138g_\"\u0012!A\f\t\u0003E=J!\u0001M\u0012\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.A\u0018uKN$()\u001b#je\u0016\u001cG/[8oC2d\u0015N\\6EKN\u001c'/\u001b2f\u0007>tg-[4t!\u0016\u0014X.[:tS>t7\u000f\u0006\u0002\u001ag!)Ag\u0001a\u0001k\u00051\u0011/^8sk6\u0004\"AN\u001f\u000f\u0005]Z\u0004C\u0001\u001d\u001c\u001b\u0005I$B\u0001\u001e\u000f\u0003\u0019a$o\\8u}%\u0011AhG\u0001\u0007!J,G-\u001a4\n\u0005yz$AB*ue&twM\u0003\u0002=7!\"1!\u0011#F!\t\u0011#)\u0003\u0002DG\tAA)[:bE2,G-A\u0003wC2,X-I\u0001G\u0003\r\"\u0015n]1cY\u0016$\u0007EZ8sAM|WO]2fA%t\u0017\u000e^5bi\u0016$\u0007\u0005\\5oWNDCa\u0001%O\u001fB\u0011\u0011\nT\u0007\u0002\u0015*\u00111*J\u0001\u0007a\u0006\u0014\u0018-\\:\n\u00055S%!\u0005)be\u0006lW\r^3sSj,G\rV3ti\u0006!a.Y7fC\u0005\u0001\u0016AI>eSN\u0004H.Y=OC6,WPL>be\u001e,X.\u001a8ug^KG\u000f\u001b(b[\u0016\u001cX\u0010\u000b\u0003\u0004%bK\u0006CA*W\u001b\u0005!&BA+K\u0003!\u0001(o\u001c<jI\u0016\u0014\u0018BA,U\u0005-1\u0016\r\\;f'>,(oY3\u0002\u000fM$(/\u001b8hg2\u0012!\fX\u0011\u00027\u0006\u0011!p[\u0011\u0002;\u0006)1N]1gi\u0006yB/Z:u%\u00164XM]:f\u0003:$7k^1q\u0003V$\bn\u001c:ju\u0006$\u0018n\u001c8\u0015\u0005e\u0001\u0007\"\u0002\u001b\u0005\u0001\u0004)\u0004\u0006\u0002\u0003B\t\n\f\u0013aY\u0001*\u001d>$\b%\u00199qY&\u001c\u0017M\u00197fA\u0019|'\u000fI:pkJ\u001cW\rI5oSRL\u0017\r^3eA1Lgn[:)\t\u0011Aej\u0014\u0015\u0005\tICf\r\f\u0002[9\u0006qB/Z:u\u0003\u000ed7+\u001f8d)\u0006\u001c8n\u0015;bi\u0016l\u0015M\\1hK6,g\u000e\u001e\u000b\u00033%DQ\u0001N\u0003A\u0002UBC!B!E\u000b\"\"Q\u0001\u0013(PQ\u0011)!\u000bW7-\u0005ic\u0016\u0001\u000f;fgR\f5\r\\*z]\u000e$\u0016m]6Ti\u0006$X-T1oC\u001e,W.\u001a8u\u0011\u0006tG\r\\3t\u0005J|7.\u001a:BkRDwN]5{CRLwN\u001c\u000b\u00033ADQ\u0001\u000e\u0004A\u0002UBCAB!E\u000b\"\"a\u0001\u0013(PQ\u00111!\u000b\u0017;-\u0003i\u000b\u0001\u0005^3ti\u0006+H\u000f[8sSj\fG/[8o\r>\u0014\u0018i\u00197NS\u001e\u0014\u0018\r^5p]R\u0011\u0011d\u001e\u0005\u0006i\u001d\u0001\r!\u000e\u0015\u0005\u000f\u0005#U\t\u000b\u0003\b\u0011:{\u0005\u0006B\u0004S1nd#A\u0017/\u0002SQ,7\u000f\u001e*fm\u0016\u00148/Z\"p]:,7\r^5p]\u0006+H\u000f[8sSj\fG/[8o\r\u0006LG.\u001e:f)\tIb\u0010C\u00035\u0011\u0001\u0007Q\u0007\u000b\u0003\t\u0011:{\u0005&\u0002\u0005S1\u0006\rA&\u0001.)\r\u0001\t9\u0001RA\u0007!\r\u0011\u0013\u0011B\u0005\u0004\u0003\u0017\u0019#a\u0001+bO\u0006\u0012\u0011qB\u0001\fS:$Xm\u001a:bi&|g\u000e\u000b\u0004\u0001\u0003\u000f!\u00151C\u0011\u0003\u0003+\t1CY1{K2T4\u000f[1sI~\u001bw.\u001e8uuQ\u0002")
/* loaded from: input_file:kafka/link/SourceInitiatedLinkAuthorizationTest.class */
public class SourceInitiatedLinkAuthorizationTest extends ClusterLinkAuthorizationTest {
    @Override // kafka.link.ClusterLinkAuthorizationTest, kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        super.setUp(testInfo);
        AclBinding aclBinding = aclBinding(brokerUser(), ResourceType.TOPIC, "_confluent-link-metadata", AclOperation.CREATE);
        destCluster().addAcls((Seq) destReverseConnectionAcls().$plus$plus(new $colon.colon(aclBinding, Nil$.MODULE$)));
        sourceCluster().addAcls((Seq) sourceReverseConnectionAcls().$plus$plus(new $colon.colon(aclBinding, Nil$.MODULE$)));
    }

    @Override // kafka.link.ClusterLinkAuthorizationTest
    @Disabled("Disabled for source initiated links")
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testBiDirectionalLinkDescribeConfigsPermissions(String str) {
    }

    @Override // kafka.link.ClusterLinkAuthorizationTest
    @Disabled("Not applicable for source initiated links")
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testReverseAndSwapAuthorization(String str) {
    }

    @Override // kafka.link.ClusterLinkAuthorizationTest
    @Disabled("Disabled for source initiated links")
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testAclSyncTaskStateManagement(String str) {
    }

    @Override // kafka.link.ClusterLinkAuthorizationTest
    @Disabled("Disabled for source initiated links")
    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testAclSyncTaskStateManagementHandlesBrokerAuthorization(String str) {
    }

    @Override // kafka.link.ClusterLinkAuthorizationTest
    @Disabled("Disabled for source initiated links")
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testAuthorizationForAclMigration(String str) {
    }

    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testReverseConnectionAuthorizationFailure(String str) {
        addAcls();
        sourceCluster().deleteAcls(new $colon.colon(sourceLinkUserClusterAlterAcl(), Nil$.MODULE$));
        destCluster().deleteAcls(new $colon.colon(destLinkUserClusterAlterAcl(), Nil$.MODULE$));
        prepareSourceTopic();
        ClusterLinkTestHarness destCluster = destCluster();
        Uuid createClusterLink = destCluster.createClusterLink(linkName(), destLinkProps(destLinkProps$default$1()), new Some(((KafkaBroker) sourceCluster().brokers().head()).clusterId()), destCluster.createClusterLink$default$4());
        Assertions.assertThrows(ClusterAuthorizationException.class, () -> {
            ClusterLinkTestHarness sourceCluster = this.sourceCluster();
            sourceCluster.createClusterLink(this.linkName(), (Properties) this.sourceLinkProps(this.sourceLinkProps$default$1()).get(), new Some(((KafkaBroker) this.destCluster().brokers().head()).clusterId()), sourceCluster.createClusterLink$default$4());
        });
        ClusterLinkData clusterLinkData = new ClusterLinkData(linkName(), createClusterLink, new Some(((KafkaBroker) destCluster().brokers().head()).clusterId()), None$.MODULE$, false);
        Properties properties = new Properties();
        ConfigDef.convertToStringMapWithPasswordValues((Map) sourceLinkProps(sourceLinkProps$default$1()).get()).forEach((str2, str3) -> {
            properties.setProperty(str2, str3);
        });
        if (useBidirectionalLink()) {
            properties.setProperty(ClusterLinkConfig$.MODULE$.RemoteLinkConnectionModeProp(), ConnectionMode$Inbound$.MODULE$.name());
        }
        ClusterLinkConfig$ clusterLinkConfig$ = ClusterLinkConfig$.MODULE$;
        None$ none$ = None$.MODULE$;
        ClusterLinkConfig$ clusterLinkConfig$2 = ClusterLinkConfig$.MODULE$;
        ClusterLinkConfig create = clusterLinkConfig$.create(properties, none$, true);
        ClusterLinkFactory.LinkManager clusterLinkManager = sourceCluster().controller().clusterLinkManager();
        clusterLinkManager.createClusterLink(clusterLinkData, create, clusterLinkManager.configEncoder().encode(properties));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testReverseConnectionAuthorizationFailure$3(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Connections not failed");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        Assertions.assertEquals(0.0d, kafkaMetricMaxValue(sourceCluster().brokers(), "reverse-connection-count", "cluster-link-metrics", new Some(linkName()), (scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), sourceLinkMode().lowerCaseName())})), kafkaMetricMaxValue$default$6(), kafkaMetricMaxValue$default$7(), kafkaMetricMaxValue$default$8()), 0.001d);
        sourceCluster().addAcls(new $colon.colon(sourceLinkUserClusterAlterAcl(), Nil$.MODULE$));
        destCluster().addAcls(new $colon.colon(destLinkUserClusterAlterAcl(), Nil$.MODULE$));
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testReverseConnectionAuthorizationFailure$5(this)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                Assertions.fail("Connections not created on source");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testReverseConnectionAuthorizationFailure$7(this)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + 15000) {
                Assertions.fail("Connections not created on destination");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(topic(), replicationFactor(), linkName(), destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
    }

    public static final /* synthetic */ boolean $anonfun$testReverseConnectionAuthorizationFailure$3(SourceInitiatedLinkAuthorizationTest sourceInitiatedLinkAuthorizationTest) {
        return sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue(sourceInitiatedLinkAuthorizationTest.sourceCluster().brokers(), "reverse-connection-failed-total", "cluster-link-metrics", new Some(sourceInitiatedLinkAuthorizationTest.linkName()), (scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), sourceInitiatedLinkAuthorizationTest.sourceLinkMode().lowerCaseName())})), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$6(), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$7(), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$8()) > ((double) 0);
    }

    public static final /* synthetic */ String $anonfun$testReverseConnectionAuthorizationFailure$4() {
        return "Connections not failed";
    }

    public static final /* synthetic */ boolean $anonfun$testReverseConnectionAuthorizationFailure$5(SourceInitiatedLinkAuthorizationTest sourceInitiatedLinkAuthorizationTest) {
        return sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue(sourceInitiatedLinkAuthorizationTest.sourceCluster().brokers(), "reverse-connection-count", "cluster-link-metrics", new Some(sourceInitiatedLinkAuthorizationTest.linkName()), (scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), sourceInitiatedLinkAuthorizationTest.sourceLinkMode().lowerCaseName())})), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$6(), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$7(), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$8()) > ((double) 0);
    }

    public static final /* synthetic */ String $anonfun$testReverseConnectionAuthorizationFailure$6() {
        return "Connections not created on source";
    }

    public static final /* synthetic */ boolean $anonfun$testReverseConnectionAuthorizationFailure$7(SourceInitiatedLinkAuthorizationTest sourceInitiatedLinkAuthorizationTest) {
        return sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue(sourceInitiatedLinkAuthorizationTest.destCluster().brokers(), "reverse-connection-count", "cluster-link-metrics", new Some(sourceInitiatedLinkAuthorizationTest.linkName()), (scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), sourceInitiatedLinkAuthorizationTest.destinationLinkMode().lowerCaseName())})), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$6(), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$7(), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$8()) > ((double) 0);
    }

    public static final /* synthetic */ String $anonfun$testReverseConnectionAuthorizationFailure$8() {
        return "Connections not created on destination";
    }

    public SourceInitiatedLinkAuthorizationTest() {
        useSourceInitiatedLink_$eq(true);
    }
}
