/*
 * Decompiled with CFR 0.152.
 */
package kafka.link;

import java.io.Serializable;
import java.util.Properties;
import kafka.link.ClusterLinkIntegrationTest;
import kafka.link.ClusterLinkTestHarness;
import kafka.link.ClusterLinkTestHarness$;
import kafka.log.Defaults$;
import kafka.log.LogConfig$;
import kafka.server.KafkaServer;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkConfigDefaults$;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.java8.JFunction1;

@Tag(value="integration")
@ScalaSignature(bytes="\u0006\u0001I3AAD\b\u0001)!)\u0011\u0004\u0001C\u00015!9A\u0004\u0001b\u0001\n\u0003j\u0002B\u0002\u0013\u0001A\u0003%a\u0004C\u0004&\u0001\t\u0007I\u0011\t\u0014\t\r)\u0002\u0001\u0015!\u0003(\u0011\u001dY\u0003A1A\u0005B\u0019Ba\u0001\f\u0001!\u0002\u00139\u0003\"B\u0017\u0001\t\u0003q\u0003\"B \u0001\t\u0003q\u0003\"B!\u0001\t\u0013\u0011\u0005\"B#\u0001\t\u0003q\u0003\"B$\u0001\t\u0003q\u0003\"B%\u0001\t\u0003q#AI*pkJ\u001cW-\u00138ji&\fG/\u001a3MS:\\\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cHO\u0003\u0002\u0011#\u0005!A.\u001b8l\u0015\u0005\u0011\u0012!B6bM.\f7\u0001A\n\u0003\u0001U\u0001\"AF\f\u000e\u0003=I!\u0001G\b\u00035\rcWo\u001d;fe2Kgn[%oi\u0016<'/\u0019;j_:$Vm\u001d;\u0002\rqJg.\u001b;?)\u0005Y\u0002C\u0001\f\u0001\u0003Y)8/Z*pkJ\u001cW-\u00138ji&\fG/\u001a3MS:\\W#\u0001\u0010\u0011\u0005}\u0011S\"\u0001\u0011\u000b\u0003\u0005\nQa]2bY\u0006L!a\t\u0011\u0003\u000f\t{w\u000e\\3b]\u00069Ro]3T_V\u00148-Z%oSRL\u0017\r^3e\u0019&t7\u000eI\u0001\u000eg>,(oY3DYV\u001cH/\u001a:\u0016\u0003\u001d\u0002\"A\u0006\u0015\n\u0005%z!AF\"mkN$XM\u001d'j].$Vm\u001d;ICJtWm]:\u0002\u001dM|WO]2f\u00072,8\u000f^3sA\u0005YA-Z:u\u00072,8\u000f^3s\u00031!Wm\u001d;DYV\u001cH/\u001a:!\u0003)\"Xm\u001d;D_:$(o\u001c7mKJ\u001c\u0005.\u00198hK^KG\u000f\u001b*fm\u0016\u00148/Z\"p]:,7\r^5p]N$\u0012a\f\t\u0003?AJ!!\r\u0011\u0003\tUs\u0017\u000e\u001e\u0015\u0003\u0011M\u0002\"\u0001N\u001f\u000e\u0003UR!AN\u001c\u0002\u0007\u0005\u0004\u0018N\u0003\u00029s\u00059!.\u001e9ji\u0016\u0014(B\u0001\u001e<\u0003\u0015QWO\\5u\u0015\u0005a\u0014aA8sO&\u0011a(\u000e\u0002\u0005)\u0016\u001cH/\u0001\ruKN$Hk\u001c9jG\u000e{gNZ5h'ft7MU;mKND#!C\u001a\u0002=Y,'/\u001b4z%\u00164XM]:f\u0007>tg.Z2uS>tW*\u001a;sS\u000e\u001cHCA\u0018D\u0011\u0015!%\u00021\u0001\u001f\u00039)\u0007\u0010]3di\u001a\u000b\u0017\u000e\\;sKN\f\u0001\u0004^3tiN{WO]2f\u00072,8\u000f^3s%\u0016\u001cH/\u0019:uQ\tY1'A\ruKN$Hj\\2bY2K7\u000f^3oKJ|e/\u001a:sS\u0012,\u0007F\u0001\u00074\u0003a!Xm\u001d;EKN$8i\u001c8ue
>dG.\u001a:DQ\u0006tw-\u001a\u0015\u0003\u001bMBC\u0001\u0001'P!B\u0011A'T\u0005\u0003\u001dV\u00121\u0001V1h\u0003\u00151\u0018\r\\;fC\u0005\t\u0016aC5oi\u0016<'/\u0019;j_:\u0004")
public class SourceInitiatedLinkIntegrationTest
extends ClusterLinkIntegrationTest {
    // Backing value for useSourceInitiatedLink(); assigned once, in the constructor.
    private final boolean useSourceInitiatedLink;
    // Harness returned by sourceCluster(): built with SASL_SSL, Some(PLAINTEXT) and 0 as the third argument.
    // NOTE(review): the third argument is presumably the first broker id (see firstBrokerId() usage below) — confirm against ClusterLinkTestHarness.
    private final ClusterLinkTestHarness sourceCluster = new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, (Option<SecurityProtocol>)new Some((Object)SecurityProtocol.PLAINTEXT), 0, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4());
    // Harness returned by destCluster(): built with SASL_PLAINTEXT, Some(PLAINTEXT) and 100 as the third argument.
    private final ClusterLinkTestHarness destCluster = new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, (Option<SecurityProtocol>)new Some((Object)SecurityProtocol.PLAINTEXT), 100, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4());

    /**
     * Returns the {@code useSourceInitiatedLink} flag held by this instance.
     *
     * @return the stored flag; this class's constructor sets it to {@code true}
     */
    @Override
    public boolean useSourceInitiatedLink() {
        return useSourceInitiatedLink;
    }

    /**
     * Returns the harness stored in the {@code sourceCluster} field.
     *
     * @return the source-side {@link ClusterLinkTestHarness} created when this instance was built
     */
    @Override
    public ClusterLinkTestHarness sourceCluster() {
        return sourceCluster;
    }

    /**
     * Returns the harness stored in the {@code destCluster} field.
     *
     * @return the destination-side {@link ClusterLinkTestHarness} created when this instance was built
     */
    @Override
    public ClusterLinkTestHarness destCluster() {
        return destCluster;
    }

    /**
     * Creates the link and a mirrored topic, then kills the destination controller and afterwards
     * the source controller. After each step it produces 10 more records to the source and waits
     * for the mirror on the destination servers that are not the killed controller.
     * Reverse-connection metrics are verified with {@code false} before the kills and with
     * {@code true} after them.
     */
    @Test
    public void testControllerChangeWithReverseConnections() {
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4());

        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4());

        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyReverseConnectionMetrics(false);

        // Take down the destination controller and keep mirroring on the remaining servers.
        KafkaServer killedDestController = dest.controller();
        int destControllerIndex = dest.servers().indexOf((Object)killedDestController);
        dest.killBroker(destControllerIndex);
        produceToSourceCluster(10);
        Buffer remainingDestServers = (Buffer)dest.servers().filter((Function1 & Serializable & scala.Serializable)server -> BoxesRunTime.boxToBoolean((boolean)SourceInitiatedLinkIntegrationTest.$anonfun$testControllerChangeWithReverseConnections$1(this, killedDestController, server)));
        waitForMirror((Seq<KafkaServer>)remainingDestServers, waitForMirror$default$2());

        // Now take down the source controller as well.
        KafkaServer killedSourceController = source.controller();
        int sourceControllerIndex = source.servers().indexOf((Object)killedSourceController);
        source.killBroker(sourceControllerIndex);
        produceToSourceCluster(10);
        waitForMirror((Seq<KafkaServer>)remainingDestServers, waitForMirror$default$2());

        verifyReverseConnectionMetrics(true);
        verifyMirror(topic(), (Seq<KafkaServer>)remainingDestServers, verifyMirror$default$3());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testTopicConfigSyncRules() {
        String topicConfigSyncIncludeOverride = ((TraversableOnce)((TraversableOnce)CollectionConverters$.MODULE$.asScalaBufferConverter(ClusterLinkConfigDefaults$.MODULE$.TopicConfigSyncIncludeDefault()).asScala()).toSet().$plus$plus((GenTraversableOnce)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{LogConfig$.MODULE$.MinCompactionLagMsProp(), LogConfig$.MODULE$.CompressionTypeProp()}))).$minus$minus((GenTraversableOnce)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{LogConfig$.MODULE$.MaxCompactionLagMsProp()})))).mkString(",");
        Map linkConfigsToUpdate = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp()), (Object)topicConfigSyncIncludeOverride), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp()), (Object)"100")}));
        this.createClusterLink(this.linkName(), this.createClusterLink$default$2(), this.createClusterLink$default$3(), this.createClusterLink$default$4());
        this.sourceCluster().createTopic(this.topic(), this.numPartitions(), this.replicationFactor(), new Properties(null){
            {
                this.put(LogConfig$.MODULE$.MinCompactionLagMsProp(), "142857");
                this.put(LogConfig$.MODULE$.CompressionTypeProp(), "snappy");
            }
        });
        this.destCluster().alterClusterLink(this.linkName(), (Map<String, String>)linkConfigsToUpdate, this.destCluster().alterClusterLink$default$3());
        this.destCluster().linkTopic(this.topic(), this.replicationFactor(), this.linkName(), this.destCluster().linkTopic$default$4());
        this.produceToSourceCluster(10);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.verifyReverseConnectionMetrics(false);
        Map expect = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)LogConfig$.MODULE$.MinCompactionLagMsProp()), (Object)"142857"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)LogConfig$.MODULE$.CompressionTypeProp()), (Object)"snappy"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)LogConfig$.MODULE$.MaxCompactionLagMsProp()), (Object)Long.toString(Defaults$.MODULE$.MaxCompactionLagMs()))}));
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!SourceInitiatedLinkIntegrationTest.$anonfun$testTopicConfigSyncRules$1(this, expect)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)SourceInitiatedLinkIntegrationTest.$anonfun$testTopicConfigSyncRules$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    private void verifyReverseConnectionMetrics(boolean expectFailures) {
        this.verifyMetricRange$1((Seq)new .colon.colon((Object)this.sourceCluster().controller(), (List)Nil$.MODULE$), "controller-reverse-connection-count", "source", 1.0, 2.0);
        this.verifyMetricRange$1((Seq)new .colon.colon((Object)this.destCluster().controller(), (List)Nil$.MODULE$), "controller-reverse-connection-count", "destination", 1.0, 2.0);
        this.verifyMetricRange$1(this.sourceCluster().aliveServers(), "reverse-connection-count", "source", 2.0, 10.0);
        this.verifyMetricRange$1(this.destCluster().aliveServers(), "reverse-connection-count", "destination", 2.0, 10.0);
        this.verifyMetricRange$1(this.sourceCluster().aliveServers(), "reverse-connection-created-total", "source", 2.0, 1000.0);
        this.verifyMetricRange$1(this.destCluster().aliveServers(), "reverse-connection-created-total", "destination", 2.0, 1000.0);
        this.verifyMetricRange$1(this.sourceCluster().aliveServers(), "reverse-connection-closed-total", "source", 0.0, 1000.0);
        this.verifyMetricRange$1(this.destCluster().aliveServers(), "reverse-connection-closed-total", "destination", 0.0, 1000.0);
        String x$1 = "reverse-connection-failed-total";
        Seq<KafkaServer> x$2 = this.sourceCluster().aliveServers();
        String x$4 = this.verifyKafkaMetric$default$2();
        Option<String> x$5 = this.verifyKafkaMetric$default$4();
        Map<String, String> x$6 = this.verifyKafkaMetric$default$5();
        boolean x$7 = this.verifyKafkaMetric$default$7();
        this.verifyKafkaMetric(x$1, x$4, expectFailures, x$5, x$6, x$2, x$7);
        Map sourceTag = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"mode"), (Object)"source")}));
        Map dstTag = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"mode"), (Object)"destination")}));
        double sourceLinks = this.totalKafkaMetricValue(this.sourceCluster().aliveServers(), "link-count", (Map<String, String>)sourceTag, this.totalKafkaMetricValue$default$4());
        double sourceConns = this.totalKafkaMetricValue(this.sourceCluster().aliveServers(), "reverse-connection-count", (Map<String, String>)sourceTag, this.totalKafkaMetricValue$default$4());
        double sourceCreated = this.totalKafkaMetricValue(this.sourceCluster().aliveServers(), "reverse-connection-created-total", (Map<String, String>)sourceTag, this.totalKafkaMetricValue$default$4());
        double sourceClosed = this.totalKafkaMetricValue(this.sourceCluster().aliveServers(), "reverse-connection-closed-total", (Map<String, String>)sourceTag, this.totalKafkaMetricValue$default$4());
        double destLinks = this.totalKafkaMetricValue(this.destCluster().aliveServers(), "link-count", (Map<String, String>)dstTag, this.totalKafkaMetricValue$default$4());
        double destConns = this.totalKafkaMetricValue(this.destCluster().aliveServers(), "reverse-connection-count", (Map<String, String>)dstTag, this.totalKafkaMetricValue$default$4());
        double destCreated = this.totalKafkaMetricValue(this.destCluster().aliveServers(), "reverse-connection-created-total", (Map<String, String>)dstTag, this.totalKafkaMetricValue$default$4());
        double destClosed = this.totalKafkaMetricValue(this.destCluster().aliveServers(), "reverse-connection-closed-total", (Map<String, String>)dstTag, this.totalKafkaMetricValue$default$4());
        SourceInitiatedLinkIntegrationTest.verifyRange$1(sourceLinks, 1.0, 0.0, "Source links vs source alive servers");
        SourceInitiatedLinkIntegrationTest.verifyRange$1(sourceConns, destConns, 2.0, "Dest vs source active connections");
        SourceInitiatedLinkIntegrationTest.verifyRange$1(sourceConns, sourceCreated - sourceClosed, 2.0, "Source active connections vs created-closed");
        SourceInitiatedLinkIntegrationTest.verifyRange$1(destLinks, 1.0, 0.0, "Dest links vs dest alive servers");
        SourceInitiatedLinkIntegrationTest.verifyRange$1(destConns, destCreated - destClosed, 2.0, "Dest active connections vs created-closed");
    }

    /**
     * Creates a topic on the source with 100 records, links it on the destination, then kills
     * and restarts all source brokers. Afterwards the destination topic is deleted, its deletion
     * verified, and the topic is linked again before waiting for the mirror.
     */
    @Test
    public void testSourceClusterRestart() {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), 2, source.createTopic$default$4());
        produceToSourceCluster(100);
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());

        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), (short)2, linkName(), dest.linkTopic$default$4());

        // Bounce the whole source cluster.
        shutdownSource$1();
        restartSource$1();

        // Drop the mirror topic and link it again from scratch.
        dest.deleteTopic(topic(), dest.deleteTopic$default$2());
        TestUtils$.MODULE$.verifyTopicDeletion(dest.zkClient(), topic(), numPartitions(), (Seq<KafkaServer>)dest.servers());
        dest.linkTopic(topic(), (short)2, linkName(), dest.linkTopic$default$4());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    /**
     * Creates the link with source-link properties whose {@code local.}-prefixed entries are
     * replaced by the source cluster's inter-broker listener name and security protocol, then
     * links the topic on the destination and verifies the mirror.
     */
    @Test
    public void testLocalListenerOverride() {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4());
        produceToSourceCluster(20);

        Properties linkProps = (Properties)sourceLinkProps(sourceLinkProps$default$1()).get();

        // Strip every existing "local."-prefixed property before installing the overrides.
        TraversableLike propertyNames = (TraversableLike)CollectionConverters$.MODULE$.asScalaSetConverter(linkProps.stringPropertyNames()).asScala();
        IterableLike localPropertyNames = (IterableLike)propertyNames.filter((Function1 & Serializable & scala.Serializable)key -> BoxesRunTime.boxToBoolean((boolean)key.startsWith("local.")));
        localPropertyNames.foreach((Function1 & Serializable & scala.Serializable)key -> linkProps.remove(key));

        linkProps.setProperty(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), source.interBrokerListenerName().value());
        String localSecurityProtocolKey = "local." + "security.protocol";
        linkProps.setProperty(localSecurityProtocolKey, source.interBrokerSecurityProtocol().name);

        // Explicit link name and source properties; the remaining arguments are defaults.
        String name = linkName();
        Some overriddenSourceProps = new Some((Object)linkProps);
        Properties defaultArg2 = createClusterLink$default$2();
        boolean defaultArg4 = createClusterLink$default$4();
        createClusterLink(name, defaultArg2, (Option<Properties>)overriddenSourceProps, defaultArg4);

        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4());
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    /**
     * Creates the link and the source topic, changes the destination controller, and only then
     * links the topic on the destination; 10 records are produced and the mirror is awaited.
     */
    @Test
    public void testDestControllerChange() {
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4());

        ClusterLinkTestHarness dest = destCluster();
        dest.changeController();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4());

        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
    }

    /**
     * Filter predicate for {@code testControllerChangeWithReverseConnections}.
     *
     * @param $this the running test instance
     * @param destController$1 the destination controller captured by the lambda
     * @param x$1 the candidate server
     * @return {@code true} when {@code x$1} is not equal to the server the destination cluster
     *         returns for {@code destController$1}'s broker id
     */
    public static final /* synthetic */ boolean $anonfun$testControllerChangeWithReverseConnections$1(SourceInitiatedLinkIntegrationTest $this, KafkaServer destController$1, KafkaServer x$1) {
        KafkaServer controllerServer = $this.destCluster().serverWithBrokerId(destController$1.config().brokerId());
        if (x$1 == null) {
            // A null candidate differs from anything except another null.
            return controllerServer != null;
        }
        return !x$1.equals(controllerServer);
    }

    /**
     * Wait condition for {@code testTopicConfigSyncRules}.
     *
     * @param $this the running test instance
     * @param expect$1 the expected topic configs
     * @return the result of {@code describeTopicConfigEquals} on the destination cluster for this
     *         test's topic and {@code expect$1}
     */
    public static final /* synthetic */ boolean $anonfun$testTopicConfigSyncRules$1(SourceInitiatedLinkIntegrationTest $this, Map expect$1) {
        ClusterLinkTestHarness dest = $this.destCluster();
        return dest.describeTopicConfigEquals($this.topic(), (Map<String, String>)expect$1);
    }

    /**
     * Supplies the failure message used when the wait in {@code testTopicConfigSyncRules} times out.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testTopicConfigSyncRules$2() {
        final String timeoutMessage = "min.compaction.lag.ms, compression.type should sync, max.compaction.lag.ms shouldn't sync";
        return timeoutMessage;
    }

    /**
     * Asserts that a metric tagged with the given mode lies within {@code [minValue, maxValue]}.
     *
     * @param servers servers passed to {@code kafkaMetricValue}
     * @param name metric name
     * @param mode value of the {@code mode} tag used to select the metric
     * @param minValue inclusive lower bound
     * @param maxValue inclusive upper bound
     */
    private final void verifyMetricRange$1(Seq servers, String name, String mode, double minValue, double maxValue) {
        Tuple2[] modeTagEntries = new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"mode"), (Object)mode)};
        Map modeTag = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])modeTagEntries));
        double value = this.kafkaMetricValue((Seq<KafkaServer>)servers, name, (Map<String, String>)modeTag, this.kafkaMetricValue$default$4());

        boolean atLeastMin = value >= minValue;
        Assertions.assertTrue(atLeastMin, "Metric " + name + " too low for " + mode + ": " + value);

        boolean atMostMax = value <= maxValue;
        Assertions.assertTrue(atMostMax, "Metric " + name + " too high for " + mode + ": " + value);
    }

    /**
     * Asserts that two values differ by no more than {@code maxDiff}.
     *
     * @param first first value
     * @param second second value
     * @param maxDiff largest allowed absolute difference (inclusive)
     * @param desc description placed at the start of the failure message
     */
    private static final void verifyRange$1(double first, double second, double maxDiff, String desc) {
        double difference = Math.abs(first - second);
        boolean withinRange = difference <= maxDiff;
        String failureMessage = desc + " : (" + first + ", " + second + ") not within " + maxDiff + " range";
        Assertions.assertTrue(withinRange, failureMessage);
    }

    /**
     * Kills source brokers: for every {@code i} in {@code [0, servers.length)} calls
     * {@code killBroker(i - firstBrokerId())} on the source cluster.
     */
    private final void shutdownSource$1() {
        // The upper bound is read once up front, as the original Range did.
        int serverCount = this.sourceCluster().servers().length();
        for (int i = 0; i < serverCount; i++) {
            this.sourceCluster().killBroker(i - this.sourceCluster().firstBrokerId());
        }
    }

    /**
     * Restarts the source cluster's dead brokers (with the default argument) and then refreshes
     * its bootstrap servers.
     */
    private final void restartSource$1() {
        ClusterLinkTestHarness source = sourceCluster();
        source.restartDeadBrokers(source.restartDeadBrokers$default$1());
        source.updateBootstrapServers();
    }

    /**
     * Creates the test instance and sets the {@code useSourceInitiatedLink} flag to {@code true}.
     */
    public SourceInitiatedLinkIntegrationTest() {
        super();
        useSourceInitiatedLink = true;
    }
}

