/*
 * Decompiled with CFR 0.152.
 */
package kafka.link;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.link.AbstractClusterLinkIntegrationTest;
import kafka.link.ClusterLinkTestHarness;
import kafka.server.AbstractFetcherManager;
import kafka.server.DynamicConfig;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.LeaderEndPoint;
import kafka.server.RemoteLeaderEndPoint;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFetcher;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.server.link.ClusterLinkLeaderEndPoint;
import kafka.server.link.ClusterLinkLeaderRequestBuilder;
import kafka.server.link.FetchResponseSize;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.HashMap;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@Tag(value="integration")
@ScalaSignature(bytes="\u0006\u0005\u0005Ed\u0001B\u0006\r\u0001EAQA\u0006\u0001\u0005\u0002]AQ!\u0007\u0001\u0005\u0002iAQA\u0014\u0001\u0005\u0002=CQ!\u0016\u0001\u0005\u0002YCQ\u0001\u0018\u0001\u0005\u0002uCQa\u0019\u0001\u0005\u0002\u0011DQA\u001b\u0001\u0005\u0002-Dq!!\u000e\u0001\t\u0003\t9\u0004C\u0004\u0002<\u0001!I!!\u0010\t\u000f\u0005}\u0002\u0001\"\u0003\u0002B\ty2\t\\;ti\u0016\u0014H*\u001b8l#V|G/Y%oi\u0016<'/\u0019;j_:$Vm\u001d;\u000b\u00055q\u0011\u0001\u00027j].T\u0011aD\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001!\u0003\u0005\u0002\u0014)5\tA\"\u0003\u0002\u0016\u0019\t\u0011\u0013IY:ue\u0006\u001cGo\u00117vgR,'\u000fT5oW&sG/Z4sCRLwN\u001c+fgR\fa\u0001P5oSRtD#\u0001\r\u0011\u0005M\u0001\u0011A\u000b;fgR$Um\u001d;j]\u0006$\u0018n\u001c8DYV\u001cH/\u001a:MS:\\'I]8lKJdUM^3m#V|G/\u0019\u000b\u00047\u0005r\u0003C\u0001\u000f \u001b\u0005i\"\"\u0001\u0010\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0001j\"\u0001B+oSRDQA\t\u0002A\u0002\r\na!];peVl\u0007C\u0001\u0013,\u001d\t)\u0013\u0006\u0005\u0002';5\tqE\u0003\u0002)!\u00051AH]8pizJ!AK\u000f\u0002\rA\u0013X\rZ3g\u0013\taSF\u0001\u0004TiJLgn\u001a\u0006\u0003UuAQa\f\u0002A\u0002A\n1bY8pe\u0012Lg.\u0019;peB\u0011A$M\u0005\u0003eu\u0011qAQ8pY\u0016\fg\u000e\u000b\u0003\u0003i\u0001\u000b\u0005CA\u001b?\u001b\u00051$BA\u001c9\u0003\u0019\u0001\u0018M]1ng*\u0011\u0011HO\u0001\bUV\u0004\u0018\u000e^3s\u0015\tYD(A\u0003kk:LGOC\u0001>\u0003\ry'oZ\u0005\u0003\u007fY\u0012\u0011\u0003U1sC6,G/\u001a:ju\u0016$G+Z:u\u0003\u0011q\u0017-\\3\"\u0003\t\u000b\u0001f\u001f3jgBd\u0017-\u001f(b[\u0016lh&];peVlWh\u001f\u0019~]\r|wN\u001d3j]\u0006$xN]\u001f|cuDCA\u0001#K\u0017B\u0011Q\tS\u0007\u0002\r*\u0011qIN\u0001\taJ|g/\u001b3fe&\u0011\u0011J\u0012\u0002\r\u001b\u0016$\bn\u001c3T_V\u00148-Z\u0001\u0006m\u0006dW/\u001a\u0017\u0002\u0019\u0006\nQ*A\bbY2\u001cu.\u001c2j]\u0006$\u0018n\u001c8t\u0003}!Xm\u001d;EKN$\u0018N\\1uS>t7\t\\;ti\u0016\u0014H*\u001b8l#V|G/\u0019\u000b\u00047A\u000b\u0006\"\u0002\u0012\u0004\u0001\u0004\u0019
\u0003\"B\u0018\u0004\u0001\u0004\u0001\u0004\u0006B\u00025\u0001\u0006CCa\u0001#K)2\nA*\u0001\u0019uKN$H)Z:uS:\fG/[8o\u00072,8\u000f^3s\u0019&t7.U;pi\u0006<\u0016\u000e\u001e5Ce>\\WM\u001d*fgR\f'\u000f\u001e\u000b\u00047]C\u0006\"\u0002\u0012\u0005\u0001\u0004\u0019\u0003\"B\u0018\u0005\u0001\u0004\u0001\u0004\u0006\u0002\u00035\u0001\u0006CC\u0001\u0002#K72\nA*A\u0013uKN$H)Z:uS:\fG/[8o\u0019\u0006<G*\u001b8l\r\u0016$8\r[3s)\"\u0014x\u000e\u001e;mKR\u00191DX0\t\u000b\t*\u0001\u0019A\u0012\t\u000b=*\u0001\u0019\u0001\u0019)\t\u0015!\u0004)\u0011\u0015\u0005\u000b\u0011S%\rL\u0001M\u0003Y!Xm\u001d;T_V\u00148-Z\"mkN$XM])v_R\fGcA\u000efM\")!E\u0002a\u0001G!)qF\u0002a\u0001a!\"a\u0001\u000e!BQ\u00111AIS5-\u00031\u000b\u0011E^3sS\u001aLH)Z:uS:\fG/[8o\u00072,8\u000f^3s\u0019&t7.U;pi\u0006$B\u0001\\;\u0002\bA\u0011Qn]\u0007\u0002]*\u0011q\u000e]\u0001\u0007G>lWn\u001c8\u000b\u0005=\t(B\u0001:=\u0003\u0019\t\u0007/Y2iK&\u0011AO\u001c\u0002\u0005+VLG\rC\u0003w\u000f\u0001\u0007q/A\u0005sKN|WO]2fgB\u0019\u0001p_?\u000e\u0003eT!A_\u000f\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002}s\n\u00191+Z9\u0011\u0007y\f\u0019!D\u0001\u0000\u0015\r\t\tA\\\u0001\u0007G>tg-[4\n\u0007\u0005\u0015qP\u0001\bD_:4\u0017n\u001a*fg>,(oY3\t\u000f\u0005%q\u00011\u0001\u0002\f\u0005I\u0011/^8uC6{G-\u001a\t\u0005\u0003\u001b\tyC\u0004\u0003\u0002\u0010\u0005%b\u0002BA\t\u0003KqA!a\u0005\u0002$9!\u0011QCA\u0011\u001d\u0011\t9\"a\b\u000f\t\u0005e\u0011Q\u0004\b\u0004M\u0005m\u0011\"A\u001f\n\u0005Id\u0014BA\br\u0013\ty\u0007/C\u0002\u0002\u00029L1!a\n\u0000\u0003%Ig\u000e^3s]\u0006d7/\u0003\u0003\u0002,\u00055\u0012\u0001E\"p]\u001adW/\u001a8u\u0007>tg-[4t\u0015\r\t9c`\u0005\u0005\u0003c\t\u0019D\u0001\u000bDYV\u001cH/\u001a:MS:\\\u0017+^8uC6{G-\u001a\u0006\u0005\u0003W\ti#A\bwKJLg-_)v_R\fWj\u001c3f)\rY\u0012\u0011\b\u0005\b\u0003\u0013A\u0001\u0019AA\u0006\u0003\u0001\"Wm\u001d;DYV\u001cH/\u001a:MS:\\'+\u001a9mS\u000e\f7\u000f\u00165s_R$H.\u001a3\u0015\u0003A\nqC^3sS\u001aLh)\u001a;dQJ+7\u000f]8og\u0016\
u001c\u0016N_3\u0015\u000bm\t\u0019%a\u0012\t\r\u0005\u0015#\u00021\u0001m\u0003\u0019a\u0017N\\6JI\"9\u0011\u0011\n\u0006A\u0002\u0005-\u0013\u0001D3ya\u0016\u001cG/\u001a3TSj,\u0007#\u0002\u000f\u0002N\u0005E\u0013bAA(;\t1q\n\u001d;j_:\u0004B!a\u0015\u0002\\5\u0011\u0011Q\u000b\u0006\u0004\u001b\u0005]#bAA-\u001d\u000511/\u001a:wKJLA!!\u0018\u0002V\t\tb)\u001a;dQJ+7\u000f]8og\u0016\u001c\u0016N_3)\r\u0001\t\tGSA7!\u0011\t\u0019'!\u001b\u000e\u0005\u0005\u0015$bAA4q\u0005\u0019\u0011\r]5\n\t\u0005-\u0014Q\r\u0002\u0004)\u0006<\u0017EAA8\u0003-Ig\u000e^3he\u0006$\u0018n\u001c8")
public class ClusterLinkQuotaIntegrationTest
extends AbstractClusterLinkIntegrationTest {
    /**
     * Runs the destination cluster link quota verification with the quota configs
     * targeted at one {@link ConfigResource} per destination broker id, using the
     * {@code CLUSTER_LINK_ONLY} quota mode.
     *
     * @param quorum      test parameter supplied by {@code allCombinations}; not referenced in this body
     * @param coordinator test parameter supplied by {@code allCombinations}; not referenced in this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testDestinationClusterLinkBrokerLevelQuota(String quorum, boolean coordinator) {
        // Step 1: destination brokers -> boxed broker ids.
        IterableOps brokerIds = (IterableOps)this.destCluster().brokers().map((Function1 & Serializable)broker -> BoxesRunTime.boxToInteger((int)ClusterLinkQuotaIntegrationTest.$anonfun$testDestinationClusterLinkBrokerLevelQuota$1(broker)));
        // Step 2: broker ids -> one config resource per broker.
        IterableOnceOps perBrokerResources = (IterableOnceOps)brokerIds.map((Function1 & Serializable)id -> ClusterLinkQuotaIntegrationTest.$anonfun$testDestinationClusterLinkBrokerLevelQuota$2(BoxesRunTime.unboxToInt((Object)id)));
        scala.collection.immutable.Seq resources = perBrokerResources.toSeq();
        this.verifyDestinationClusterLinkQuota((Seq<ConfigResource>)resources, ConfluentConfigs.ClusterLinkQuotaMode.CLUSTER_LINK_ONLY);
    }

    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testDestinationClusterLinkQuota(String quorum, boolean coordinator) {
        .colon.colon resources = new .colon.colon((Object)new ConfigResource(ConfigResource.Type.BROKER, ""), (List)Nil$.MODULE$);
        Uuid linkId = this.verifyDestinationClusterLinkQuota((Seq<ConfigResource>)resources, ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND);
        this.verifyFetchResponseSize(linkId, (Option<FetchResponseSize>)None$.MODULE$);
        ClusterLinkTestHarness qual$1 = this.destCluster();
        Properties x$1 = qual$1.createConfluentAdminClient$default$1();
        ConfluentAdmin destAdmin = qual$1.createConfluentAdminClient(x$1);
        AlterConfigOp alterOp = new AlterConfigOp(new ConfigEntry("confluent.cluster.link.fetch.response.total.bytes", "10000"), AlterConfigOp.OpType.SET);
        java.util.Map configs = CollectionConverters$.MODULE$.MapHasAsJava((Map)((IterableOnceOps)resources.map((Function1 & Serializable)x$2 -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(x$2), (Object)CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)Predef$.MODULE$.Set().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new AlterConfigOp[]{alterOp}))).asJavaCollection()))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl())).asJava();
        destAdmin.incrementalAlterConfigs(configs).all().get();
        this.verifyFetchResponseSize(linkId, (Option<FetchResponseSize>)new Some((Object)new FetchResponseSize(5000, 10000)));
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testDestinationClusterLinkQuotaWithBrokerRestart(String quorum, boolean coordinator) {
        void var26_27;
        void var25_26;
        void var28_33;
        Tuple2 tuple2;
        void var23_24;
        void var22_23;
        this.numPartitions_$eq(1);
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$3 = this.replicationFactor();
        Properties x$4 = qual$1.createTopic$default$4();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$6 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$2, x$3, x$4, x$5, x$6);
        this.createClusterLink(this.linkName(), this.createClusterLink$default$2(), this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$7 = this.topic();
        short x$8 = this.replicationFactor();
        String x$9 = this.linkName();
        Map<String, String> x$10 = qual$2.linkTopic$default$4();
        String x$11 = qual$2.linkTopic$default$5();
        qual$2.linkTopic(x$7, x$8, x$9, x$10, x$11);
        ClusterLinkTestHarness qual$3 = this.destCluster();
        Properties x$12 = qual$3.createConfluentAdminClient$default$1();
        ConfluentAdmin confluentAdmin = qual$3.createConfluentAdminClient(x$12);
        AlterConfigOp alterOp = new AlterConfigOp(new ConfigEntry(KafkaConfig$.MODULE$.ClusterLinkIoMaxBytesPerSecondProp(), "100"), AlterConfigOp.OpType.SET);
        java.util.Map<ConfigResource, Collection> configs = Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)Predef$.MODULE$.Set().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new AlterConfigOp[]{alterOp}))).asJavaCollection());
        confluentAdmin.incrementalAlterConfigs(configs).all().get();
        confluentAdmin.close();
        TopicPartition tp = new TopicPartition(this.topic(), 0);
        Tuple2<Object, Object> tuple22 = this.destCluster().shutdownLeader(tp);
        if (tuple22 == null) {
            throw new MatchError(null);
        }
        int oldLeaderId = tuple22._1$mcI$sp();
        int oldLeaderEpoch = tuple22._2$mcI$sp();
        Tuple2<Object, Object> tuple23 = this.destCluster().waitForLeaderChange(tp, (int)var22_23, (int)var23_24);
        if (tuple23 == null) {
            throw new MatchError(null);
        }
        int newLeaderId = tuple23._1$mcI$sp();
        int newLeaderEpoch = tuple23._2$mcI$sp();
        this.destCluster().startBroker((int)var22_23);
        long l = 100L;
        long computeUntilTrue_waitTime = 15000L;
        long computeUntilTrue_startTime = System.currentTimeMillis();
        while (true) {
            void computeUntilTrue_pause;
            Set computeUntilTrue_result;
            if ((computeUntilTrue_result = ClusterLinkQuotaIntegrationTest.$anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$1(this, tp)).contains((Object)BoxesRunTime.boxToInteger((int)var22_23))) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)true));
                break;
            }
            if (System.currentTimeMillis() > computeUntilTrue_startTime + computeUntilTrue_waitTime) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue_waitTime), (long)computeUntilTrue_pause));
        }
        Object var41_31 = null;
        Tuple2 tuple24 = tuple2;
        if (tuple24 == null) {
            throw new MatchError(null);
        }
        Set isr = (Set)tuple24._1();
        Assertions.assertTrue((boolean)tuple24._2$mcZ$sp(), (String)new StringBuilder(42).append("Broker ").append((int)var22_23).append(" is not part of ISR ").append(var28_33).append(" for partition ").append(tp).toString());
        this.destCluster().updateBootstrapServers();
        ClusterLinkTestHarness qual$4 = this.destCluster();
        Properties x$13 = qual$4.createConfluentAdminClient$default$1();
        confluentAdmin = qual$4.createConfluentAdminClient(x$13);
        long l2 = 15000L;
        TestUtils$ retry_this = TestUtils$.MODULE$;
        long l3 = 1L;
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                try {
                    confluentAdmin.electLeaders(ElectionType.PREFERRED, Collections.singleton(tp)).all().get(15L, TimeUnit.SECONDS);
                }
                catch (Throwable throwable) {
                    Assertions.fail((String)"Preferred leader election failed");
                }
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    String msgWithLogIdent_msg = new StringBuilder(49).append("Attempt failed, sleeping for ").append(l3).append(", and then retrying.").toString();
                    Object var48_42 = null;
                    retry_this.logger().underlying().info(Logging.msgWithLogIdent$((Logging)retry_this, (String)msgWithLogIdent_msg));
                }
                Thread.sleep(l3);
                l3 += package$.MODULE$.min(l3, 1000L);
                continue;
            }
            break;
        }
        Object var42_37 = null;
        Object var47_41 = null;
        Assertions.assertEquals((int)this.destCluster().waitForLeaderChange(tp, (int)var25_26, (int)var26_27)._1$mcI$sp(), (int)var22_23, (String)"Preferred leader not elected");
        ClusterLinkTestHarness qual$5 = this.sourceCluster();
        ByteArraySerializer x$14 = qual$5.createProducer$default$1();
        ByteArraySerializer x$15 = qual$5.createProducer$default$2();
        Properties x$16 = qual$5.createProducer$default$3();
        this.produceUntil(qual$5.createProducer(x$14, x$15, x$16), (Function0<Object>)(JFunction0.mcZ.sp & Serializable)() -> this.destClusterLinkReplicasThrottled(), "Destination quota not applied after broker restart");
    }

    /**
     * Exercises the link fetcher flow-control setting on a two-partition mirrored topic.
     *
     * <p>For the values {@code "-2"}, {@code "-1"} and {@code "10485760"} the
     * {@code destination-lag-link-fetcher-throttle-total} metric must stay at zero after
     * producing and mirroring 30 records. After switching to {@code "0"} the metric must
     * become positive, and {@code link-fetcher-count} must total 2.
     *
     * @param quorum      test parameter supplied by {@code allCombinations}; not referenced in this body
     * @param coordinator test parameter supplied by {@code allCombinations}; not referenced in this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testDestinationLagLinkFetcherThrottle(String quorum, boolean coordinator) {
        this.numPartitions_$eq(2);

        // Source topic.
        ClusterLinkTestHarness source = this.sourceCluster();
        String sourceTopic = this.topic();
        int partitionCount = this.numPartitions();
        short sourceReplication = this.replicationFactor();
        Properties createTopicArg4 = source.createTopic$default$4();
        ListenerName createTopicArg5 = source.createTopic$default$5();
        Properties createTopicArg6 = source.createTopic$default$6();
        source.createTopic(sourceTopic, partitionCount, sourceReplication, createTopicArg4, createTopicArg5, createTopicArg6);

        // Link and mirror topic.
        this.createClusterLink(this.linkName(), this.createClusterLink$default$2(), this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness mirrorCluster = this.destCluster();
        String mirrorTopic = this.topic();
        short mirrorReplication = this.replicationFactor();
        String mirrorLink = this.linkName();
        Map<String, String> linkTopicArg4 = mirrorCluster.linkTopic$default$4();
        String linkTopicArg5 = mirrorCluster.linkTopic$default$5();
        mirrorCluster.linkTopic(mirrorTopic, mirrorReplication, mirrorLink, linkTopicArg4, linkTopicArg5);

        // Settings under which no lag throttling may be recorded.
        String[] unthrottledSettings = new String[]{"-2", "-1", "10485760"};
        for (String flowControl : unthrottledSettings) {
            ClusterLinkTestHarness dest = this.destCluster();
            String link = this.linkName();
            Map linkConfig = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.LinkFetcherFlowControlProp()), (Object)flowControl)}));
            Seq<KafkaBroker> alterArg3 = dest.alterClusterLink$default$3();
            Set<String> alterArg4 = dest.alterClusterLink$default$4();
            boolean alterArg5 = dest.alterClusterLink$default$5();
            dest.alterClusterLink(link, (Map<String, String>)linkConfig, alterArg3, alterArg4, alterArg5);
            this.produceToSourceCluster(30);
            this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
            Assertions.assertTrue(this.totalKafkaMetricValue(this.destCluster().aliveServers(), "destination-lag-link-fetcher-throttle-total", this.totalKafkaMetricValue$default$3(), this.totalKafkaMetricValue$default$4(), this.totalKafkaMetricValue$default$5()) == 0.0);
        }

        // With the setting at "0" the throttle metric must become positive.
        ClusterLinkTestHarness throttledDest = this.destCluster();
        String throttledLink = this.linkName();
        Map throttledConfig = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.LinkFetcherFlowControlProp()), (Object)"0")}));
        Seq<KafkaBroker> throttledArg3 = throttledDest.alterClusterLink$default$3();
        Set<String> throttledArg4 = throttledDest.alterClusterLink$default$4();
        boolean throttledArg5 = throttledDest.alterClusterLink$default$5();
        throttledDest.alterClusterLink(throttledLink, (Map<String, String>)throttledConfig, throttledArg3, throttledArg4, throttledArg5);
        this.produceToSourceCluster(30);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        Assertions.assertTrue(this.totalKafkaMetricValue(this.destCluster().aliveServers(), "destination-lag-link-fetcher-throttle-total", this.totalKafkaMetricValue$default$3(), this.totalKafkaMetricValue$default$4(), this.totalKafkaMetricValue$default$5()) > 0.0);
        Assertions.assertEquals((double)2.0, (double)this.totalKafkaMetricValue(this.destCluster().aliveServers(), "link-fetcher-count", this.totalKafkaMetricValue$default$3(), this.totalKafkaMetricValue$default$4(), this.totalKafkaMetricValue$default$5()));
    }

    /**
     * Verifies quota enforcement on the source cluster side of a cluster link.
     *
     * <p>Creates the source topic, creates the link with the replica fetch max bytes
     * property set to {@code "100"} in the destination link properties, mirrors the
     * topic, and then runs {@code verifyQuota} with callbacks that set the quota through
     * a source-cluster admin client and check whether throttling occurred. Finishes by
     * waiting for the mirror and checking the basic link metrics.
     *
     * @param quorum      test parameter supplied by {@code allCombinations}; not referenced in this body
     * @param coordinator test parameter supplied by {@code allCombinations}; not referenced in this body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testSourceClusterQuota(String quorum, boolean coordinator) {
        // Source topic.
        ClusterLinkTestHarness source = this.sourceCluster();
        String sourceTopic = this.topic();
        int partitionCount = this.numPartitions();
        short sourceReplication = this.replicationFactor();
        Properties createTopicArg4 = source.createTopic$default$4();
        ListenerName createTopicArg5 = source.createTopic$default$5();
        Properties createTopicArg6 = source.createTopic$default$6();
        source.createTopic(sourceTopic, partitionCount, sourceReplication, createTopicArg4, createTopicArg5, createTopicArg6);

        // Cluster link with an overridden replica fetch max bytes.
        String newLinkName = this.linkName();
        Map fetchOverrides = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)KafkaConfig$.MODULE$.ReplicaFetchMaxBytesProp()), (Object)"100")}));
        Uuid linkId = this.createClusterLink(newLinkName, this.destLinkProps((Map<String, String>)fetchOverrides), this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());

        // Mirror topic on the destination.
        ClusterLinkTestHarness dest = this.destCluster();
        String mirrorTopic = this.topic();
        short mirrorReplication = this.replicationFactor();
        String mirrorLink = this.linkName();
        Map<String, String> linkTopicArg4 = dest.linkTopic$default$4();
        String linkTopicArg5 = dest.linkTopic$default$5();
        dest.linkTopic(mirrorTopic, mirrorReplication, mirrorLink, linkTopicArg4, linkTopicArg5);

        // Quota verification driven through a source-cluster admin client.
        ClusterLinkTestHarness adminCluster = this.sourceCluster();
        Properties adminProps = adminCluster.createConfluentAdminClient$default$1();
        ConfluentAdmin sourceAdmin = adminCluster.createConfluentAdminClient(adminProps);
        Function1<Object, BoxedUnit> applyQuota = (Function1<Object, BoxedUnit>)(JFunction1.mcVJ.sp & Serializable)byteRate -> this.setQuota$1(byteRate, sourceAdmin);
        Function0<Object> wasThrottled = (Function0<Object>)(JFunction0.mcZ.sp & Serializable)() -> this.throttled$1();
        this.verifyQuota(applyQuota, wasThrottled, false);

        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.verifyBasicLinkMetrics(linkId, this.verifyBasicLinkMetrics$default$2(), this.verifyBasicLinkMetrics$default$3());
    }

    public Uuid verifyDestinationClusterLinkQuota(Seq<ConfigResource> resources, ConfluentConfigs.ClusterLinkQuotaMode quotaMode) {
        this.numPartitions_$eq(1);
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$3 = this.replicationFactor();
        Properties x$4 = qual$1.createTopic$default$4();
        ListenerName x$5 = qual$1.createTopic$default$5();
        Properties x$6 = qual$1.createTopic$default$6();
        qual$1.createTopic(x$1, x$2, x$3, x$4, x$5, x$6);
        Uuid linkId = this.createClusterLink(this.linkName(), this.createClusterLink$default$2(), this.createClusterLink$default$3(), this.createClusterLink$default$4(), this.createClusterLink$default$5());
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$7 = this.topic();
        short x$82 = this.replicationFactor();
        String x$9 = this.linkName();
        Map<String, String> x$10 = qual$2.linkTopic$default$4();
        String x$11 = qual$2.linkTopic$default$5();
        qual$2.linkTopic(x$7, x$82, x$9, x$10, x$11);
        ClusterLinkTestHarness qual$3 = this.destCluster();
        Properties x$12 = qual$3.createConfluentAdminClient$default$1();
        ConfluentAdmin destAdmin = qual$3.createConfluentAdminClient(x$12);
        AlterConfigOp alterModeOp = new AlterConfigOp(new ConfigEntry("confluent.cluster.link.replication.quota.mode", quotaMode.toString()), AlterConfigOp.OpType.SET);
        java.util.Map configs = CollectionConverters$.MODULE$.MapHasAsJava((Map)((IterableOnceOps)resources.map((Function1 & Serializable)x$8 -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(x$8), (Object)CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)Predef$.MODULE$.Set().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new AlterConfigOp[]{alterModeOp}))).asJavaCollection()))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl())).asJava();
        destAdmin.incrementalAlterConfigs(configs).all().get();
        this.verifyQuota((Function1<Object, BoxedUnit>)(JFunction1.mcVJ.sp & Serializable)byteRate -> ClusterLinkQuotaIntegrationTest.setQuota$2(byteRate, resources, destAdmin), (Function0<Object>)(JFunction0.mcZ.sp & Serializable)() -> this.destClusterLinkReplicasThrottled(), true);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.verifyBasicLinkMetrics(linkId, this.verifyBasicLinkMetrics$default$2(), this.verifyBasicLinkMetrics$default$3());
        this.verifyQuotaMode(quotaMode);
        return linkId;
    }

    /*
     * WARNING - void declaration
     */
    public void verifyQuotaMode(ConfluentConfigs.ClusterLinkQuotaMode quotaMode) {
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        Properties x$1 = qual$1.createConfluentAdminClient$default$1();
        ConfluentAdmin admin = qual$1.createConfluentAdminClient(x$1);
        AlterConfigOp alterModeOp = new AlterConfigOp(new ConfigEntry("confluent.cluster.link.replication.quota.mode", quotaMode.toString()), AlterConfigOp.OpType.SET);
        AlterConfigOp alterProduceQuotaOp = new AlterConfigOp(new ConfigEntry("producer_byte_rate", "100000"), AlterConfigOp.OpType.SET);
        java.util.Map<ConfigResource, Collection> configs = Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)Predef$.MODULE$.Set().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new AlterConfigOp[]{alterModeOp, alterProduceQuotaOp}))).asJavaCollection());
        admin.incrementalAlterConfigs(configs).all().get();
        KafkaBroker broker = this.sourceCluster().partitionLeader(new TopicPartition(this.topic(), 0));
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkQuotaIntegrationTest.$anonfun$verifyQuotaMode$1(broker, quotaMode)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Quota mode not updated");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        this.produceToSourceCluster(20);
        this.verifyClusterLinkQuotaMetrics((Seq<KafkaBroker>)new .colon.colon((Object)broker, (List)Nil$.MODULE$), quotaMode.equals(ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND));
    }

    /**
     * Reports whether the {@code ThrottledClusterLinkReplicasPerSec} ReplicaManager
     * metric has a maximum value greater than zero.
     *
     * @return {@code true} when the metric's maximum value is positive
     */
    private boolean destClusterLinkReplicasThrottled() {
        String throttledMetric = "kafka.server:type=ReplicaManager,name=ThrottledClusterLinkReplicasPerSec";
        double maxObserved = this.yammerMetricMaxValue(throttledMetric, (Option<String>)None$.MODULE$);
        return maxObserved > 0.0;
    }

    /*
     * WARNING - void declaration
     */
    private void verifyFetchResponseSize(Uuid linkId, Option<FetchResponseSize> expectedSize) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)((IterableOps)((IterableOps)this.destCluster().brokers().map((Function1 & Serializable)x$10 -> (ClusterLinkFetcherManager)x$10.clusterLinkManager().fetcherManager(linkId).get())).filter((Function1 & Serializable)x$11 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkQuotaIntegrationTest.$anonfun$verifyFetchResponseSize$2(x$11)))).head();
        LeaderEndPoint fetcherThreadLeaderEndPoint = ((ClusterLinkFetcher)((HashMap)TestUtils.fieldValue((Object)fetcherManager, AbstractFetcherManager.class, (String)"fetcherThreadMap")).values().head()).leader();
        Object expectedFetchSize = expectedSize.map((Function1 & Serializable)x$12 -> BoxesRunTime.boxToInteger((int)x$12.perPartitionSize())).getOrElse((Function0 & Serializable)() -> fetcherManager.currentConfig().replicaFetchMaxBytes());
        Object expectedFetchResponseSize = expectedSize.map((Function1 & Serializable)x$13 -> BoxesRunTime.boxToInteger((int)x$13.responseSize())).getOrElse((Function0 & Serializable)() -> fetcherManager.currentConfig().replicaFetchResponseMaxBytes());
        long l = 100L;
        long computeUntilTrue_waitTime = 15000L;
        long computeUntilTrue_startTime = System.currentTimeMillis();
        while (true) {
            void computeUntilTrue_pause;
            int n = BoxesRunTime.unboxToInt((Object)TestUtils.fieldValue((Object)fetcherThreadLeaderEndPoint, RemoteLeaderEndPoint.class, (String)"fetchSize"));
            Integer computeUntilTrue_result = BoxesRunTime.boxToInteger((int)n);
            if (ClusterLinkQuotaIntegrationTest.$anonfun$verifyFetchResponseSize$8(expectedFetchSize, n)) {
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)true));
                break;
            }
            if (System.currentTimeMillis() > computeUntilTrue_startTime + computeUntilTrue_waitTime) {
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue_waitTime), (long)computeUntilTrue_pause));
        }
        Object var13_11 = null;
        Assertions.assertEquals((Object)expectedFetchSize, (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)TestUtils.fieldValue((Object)fetcherThreadLeaderEndPoint, RemoteLeaderEndPoint.class, (String)"fetchSize"))));
        long l2 = 100L;
        long computeUntilTrue_waitTime2 = 15000L;
        long computeUntilTrue_startTime2 = System.currentTimeMillis();
        while (true) {
            void computeUntilTrue_pause;
            int n = ClusterLinkQuotaIntegrationTest.fetchResponseSize$1(fetcherThreadLeaderEndPoint);
            Integer computeUntilTrue_result = BoxesRunTime.boxToInteger((int)n);
            if (ClusterLinkQuotaIntegrationTest.$anonfun$verifyFetchResponseSize$10(expectedFetchResponseSize, n)) {
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)true));
                break;
            }
            if (System.currentTimeMillis() > computeUntilTrue_startTime2 + computeUntilTrue_waitTime2) {
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue_waitTime2), (long)computeUntilTrue_pause));
        }
        Object var20_15 = null;
        Assertions.assertEquals((Object)expectedFetchResponseSize, (Object)BoxesRunTime.boxToInteger((int)ClusterLinkQuotaIntegrationTest.fetchResponseSize$1(fetcherThreadLeaderEndPoint)));
    }

    /**
     * Compiler-generated helper that reads the broker id out of a broker's configuration.
     *
     * @param x$1 broker whose configuration is consulted
     * @return the value of {@code x$1.config().brokerId()}
     */
    public static final /* synthetic */ int $anonfun$testDestinationClusterLinkBrokerLevelQuota$1(KafkaBroker x$1) {
        int brokerId = x$1.config().brokerId();
        return brokerId;
    }

    /**
     * Compiler-generated helper that builds the {@code BROKER} config resource for one broker id.
     *
     * @param brokerId numeric broker id; its decimal string form becomes the resource name
     * @return a new {@link ConfigResource} of type {@code BROKER} named after {@code brokerId}
     */
    public static final /* synthetic */ ConfigResource $anonfun$testDestinationClusterLinkBrokerLevelQuota$2(int brokerId) {
        ConfigResource.Type resourceType = ConfigResource.Type.BROKER;
        String resourceName = String.valueOf(brokerId);
        return new ConfigResource(resourceType, resourceName);
    }

    /**
     * Compiler-generated helper that returns the ids of the replicas currently listed in the ISR
     * of one partition of the test topic, as described by the destination cluster.
     *
     * @param $this test instance providing the destination cluster and the topic name
     * @param tp$1 partition whose index selects the entry in the topic description
     * @return a Scala set holding the boxed ids of the ISR members
     */
    public static final /* synthetic */ Set $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$1(ClusterLinkQuotaIntegrationTest $this, TopicPartition tp$1) {
        // Look up the description of the requested partition on the destination cluster.
        TopicPartitionInfo partitionInfo = (TopicPartitionInfo)$this.destCluster().describeTopic($this.topic()).partitions().get(tp$1.partition());
        // Convert the Java ISR list to a Scala collection and project each member to its id.
        IterableOnceOps isrIds = (IterableOnceOps)CollectionConverters$.MODULE$.ListHasAsScala(partitionInfo.isr()).asScala().map((Function1 & Serializable)replica -> BoxesRunTime.boxToInteger((int)replica.id()));
        return isrIds.toSet();
    }

    /**
     * Compiler-generated helper that requests a preferred leader election for one partition and
     * waits up to 15 seconds for the result.
     *
     * <p>Any throwable raised while issuing or awaiting the election is turned into a test
     * failure. The caught throwable is attached as the cause so the reason for the failed
     * election shows up in the test report instead of being discarded.
     *
     * @param destAdmin$1 mutable reference whose {@code elem} is the admin client used for the election
     * @param tp$1 partition for which the preferred leader is elected
     */
    public static final /* synthetic */ void $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$4(ObjectRef destAdmin$1, TopicPartition tp$1) {
        try {
            ((ConfluentAdmin)destAdmin$1.elem).electLeaders(ElectionType.PREFERRED, Collections.singleton(tp$1)).all().get(15L, TimeUnit.SECONDS);
        }
        catch (Throwable throwable) {
            // Preserve the root cause; the bare message alone hides why the election failed.
            Assertions.fail((String)"Preferred leader election failed", throwable);
        }
    }

    /**
     * Applies a consumer byte-rate quota override for the link's user via the given admin client
     * and waits up to 15 seconds for the alteration to complete.
     *
     * <p>The quota entity has a single {@code "user"} entry whose value is
     * {@code linkUserName(linkName())}.
     *
     * @param byteRate quota value; converted to a {@code Double} before being sent
     * @param sourceAdmin$1 admin client on which {@code alterClientQuotas} is invoked
     */
    private final void setQuota$1(long byteRate, ConfluentAdmin sourceAdmin$1) {
        // Build the single-entry Scala map ("user" -> link user name) and expose it as a Java map.
        Tuple2 userEntry = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"user"), (Object)this.linkUserName(this.linkName()));
        Map entityEntries = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{userEntry}));
        ClientQuotaEntity linkUser = new ClientQuotaEntity(CollectionConverters$.MODULE$.MapHasAsJava(entityEntries).asJava());

        ClientQuotaAlteration.Op consumerRateOp = new ClientQuotaAlteration.Op(DynamicConfig.Client$.MODULE$.ConsumerByteRateOverrideProp(), Predef$.MODULE$.double2Double((double)byteRate));
        ClientQuotaAlteration alteration = new ClientQuotaAlteration(linkUser, Collections.singleton(consumerRateOp));

        sourceAdmin$1.alterClientQuotas(Collections.singleton(alteration)).all().get(15L, TimeUnit.SECONDS);
    }

    /**
     * Reports whether the maximum {@code fetch-throttle-time-max} metric in the
     * {@code cluster-link} group, looked up for this link's name across the destination brokers,
     * is greater than zero.
     *
     * @return {@code true} when the metric maximum is strictly positive
     */
    private final boolean throttled$1() {
        double maxThrottleTime = this.kafkaMetricMaxValue((Seq<KafkaBroker>)this.destCluster().brokers(), "fetch-throttle-time-max", "cluster-link", (Option<String>)new Some((Object)this.linkName()), this.kafkaMetricMaxValue$default$5(), this.kafkaMetricMaxValue$default$6(), this.kafkaMetricMaxValue$default$7(), this.kafkaMetricMaxValue$default$8());
        return maxThrottleTime > 0.0;
    }

    private static final void setQuota$2(long byteRate, Seq resources$1, ConfluentAdmin destAdmin$2) {
        AlterConfigOp alterOp = new AlterConfigOp(new ConfigEntry(KafkaConfig$.MODULE$.ClusterLinkIoMaxBytesPerSecondProp(), Long.toString(byteRate)), AlterConfigOp.OpType.SET);
        java.util.Map configs = CollectionConverters$.MODULE$.MapHasAsJava((Map)((IterableOnceOps)resources$1.map((Function1 & Serializable)x$9 -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(x$9), (Object)CollectionConverters$.MODULE$.IterableHasAsJava((Iterable)Predef$.MODULE$.Set().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new AlterConfigOp[]{alterOp}))).asJavaCollection()))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl())).asJava();
        destAdmin$2.incrementalAlterConfigs(configs).all().get();
    }

    /**
     * Compiler-generated predicate that checks whether a broker's configured cluster-link quota
     * mode equals the expected one, treating two {@code null} values as equal.
     *
     * @param broker$1 broker whose configuration is read
     * @param quotaMode$1 expected quota mode
     * @return {@code true} when the broker's mode equals {@code quotaMode$1}
     */
    public static final /* synthetic */ boolean $anonfun$verifyQuotaMode$1(KafkaBroker broker$1, ConfluentConfigs.ClusterLinkQuotaMode quotaMode$1) {
        ConfluentConfigs.ClusterLinkQuotaMode currentMode = broker$1.config().clusterLinkQuotaMode();
        if (currentMode == null) {
            // No mode configured: only a null expectation matches.
            return quotaMode$1 == null;
        }
        return currentMode.equals(quotaMode$1);
    }

    /**
     * Compiler-generated supplier of the fixed failure message for the quota-mode check.
     *
     * @return the constant message text
     */
    public static final /* synthetic */ String $anonfun$verifyQuotaMode$2() {
        final String failureMessage = "Quota mode not updated";
        return failureMessage;
    }

    /**
     * Compiler-generated predicate that accepts fetcher managers reporting at least one fetcher.
     *
     * @param x$11 fetcher manager to inspect
     * @return {@code true} when {@code x$11.fetcherCount()} is greater than zero
     */
    public static final /* synthetic */ boolean $anonfun$verifyFetchResponseSize$2(ClusterLinkFetcherManager x$11) {
        boolean hasFetchers = x$11.fetcherCount() > 0;
        return hasFetchers;
    }

    /**
     * Reads the {@code fetchSize} field declared on {@link RemoteLeaderEndPoint} from the given
     * endpoint via {@code TestUtils.fieldValue} and unboxes it to an {@code int}.
     *
     * @param fetcherThreadLeaderEndPoint$1 endpoint whose field is read
     * @return the unboxed field value
     */
    private static final int fetchSize$1(LeaderEndPoint fetcherThreadLeaderEndPoint$1) {
        Object rawFetchSize = TestUtils.fieldValue((Object)fetcherThreadLeaderEndPoint$1, RemoteLeaderEndPoint.class, (String)"fetchSize");
        return BoxesRunTime.unboxToInt(rawFetchSize);
    }

    /**
     * Reads the {@code requestBuilder} field declared on {@link ClusterLinkLeaderEndPoint} from
     * the given endpoint via {@code TestUtils.fieldValue}.
     *
     * @param fetcherThreadLeaderEndPoint$1 endpoint whose field is read
     * @return the field value cast to {@link ClusterLinkLeaderRequestBuilder}
     */
    private static final ClusterLinkLeaderRequestBuilder fetcherThreadLeaderRequestBuilder$1(LeaderEndPoint fetcherThreadLeaderEndPoint$1) {
        Object rawBuilder = TestUtils.fieldValue((Object)fetcherThreadLeaderEndPoint$1, ClusterLinkLeaderEndPoint.class, (String)"requestBuilder");
        return (ClusterLinkLeaderRequestBuilder)rawBuilder;
    }

    /**
     * Reads the endpoint's {@code requestBuilder} field, then that builder's
     * {@code fetchResponseSize} field, both via {@code TestUtils.fieldValue}, and unboxes the
     * result to an {@code int}.
     *
     * @param fetcherThreadLeaderEndPoint$1 endpoint whose request builder is inspected
     * @return the unboxed {@code fetchResponseSize} value
     */
    private static final int fetchResponseSize$1(LeaderEndPoint fetcherThreadLeaderEndPoint$1) {
        // Step 1: pull the request builder out of the endpoint.
        Object rawBuilder = TestUtils.fieldValue((Object)fetcherThreadLeaderEndPoint$1, ClusterLinkLeaderEndPoint.class, (String)"requestBuilder");
        ClusterLinkLeaderRequestBuilder requestBuilder = (ClusterLinkLeaderRequestBuilder)rawBuilder;
        // Step 2: pull the response size out of the builder.
        Object rawResponseSize = TestUtils.fieldValue((Object)requestBuilder, ClusterLinkLeaderRequestBuilder.class, (String)"fetchResponseSize");
        return BoxesRunTime.unboxToInt(rawResponseSize);
    }

    /**
     * Compiler-generated forwarder to {@code fetchSize$1}.
     *
     * @param fetcherThreadLeaderEndPoint$1 endpoint passed through unchanged
     * @return whatever {@code fetchSize$1} returns for the endpoint
     */
    public static final /* synthetic */ int $anonfun$verifyFetchResponseSize$7(LeaderEndPoint fetcherThreadLeaderEndPoint$1) {
        int currentFetchSize = ClusterLinkQuotaIntegrationTest.fetchSize$1(fetcherThreadLeaderEndPoint$1);
        return currentFetchSize;
    }

    /**
     * Compiler-generated predicate comparing an observed fetch size with the expected value
     * using {@code BoxesRunTime.equals}.
     *
     * @param expectedFetchSize$1 expected value
     * @param x$14 observed value; boxed before the comparison
     * @return the result of {@code BoxesRunTime.equals(boxed x$14, expectedFetchSize$1)}
     */
    public static final /* synthetic */ boolean $anonfun$verifyFetchResponseSize$8(Object expectedFetchSize$1, int x$14) {
        Object observed = BoxesRunTime.boxToInteger((int)x$14);
        return BoxesRunTime.equals(observed, (Object)expectedFetchSize$1);
    }

    /**
     * Compiler-generated forwarder to {@code fetchResponseSize$1}.
     *
     * @param fetcherThreadLeaderEndPoint$1 endpoint passed through unchanged
     * @return whatever {@code fetchResponseSize$1} returns for the endpoint
     */
    public static final /* synthetic */ int $anonfun$verifyFetchResponseSize$9(LeaderEndPoint fetcherThreadLeaderEndPoint$1) {
        int currentResponseSize = ClusterLinkQuotaIntegrationTest.fetchResponseSize$1(fetcherThreadLeaderEndPoint$1);
        return currentResponseSize;
    }

    /**
     * Compiler-generated predicate comparing an observed fetch response size with the expected
     * value using {@code BoxesRunTime.equals}.
     *
     * @param expectedFetchResponseSize$1 expected value
     * @param x$15 observed value; boxed before the comparison
     * @return the result of {@code BoxesRunTime.equals(boxed x$15, expectedFetchResponseSize$1)}
     */
    public static final /* synthetic */ boolean $anonfun$verifyFetchResponseSize$10(Object expectedFetchResponseSize$1, int x$15) {
        Object observed = BoxesRunTime.boxToInteger((int)x$15);
        return BoxesRunTime.equals(observed, (Object)expectedFetchResponseSize$1);
    }

    /**
     * Compiler-generated boxing adapter that tests whether a set contains the given leader id.
     *
     * @param oldLeaderId$1 id looked up in the set; boxed before the lookup
     * @param x$6 set that is queried
     * @return the boxed boolean result of {@code x$6.contains(boxed oldLeaderId$1)}
     */
    public static final /* synthetic */ Object $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$3$adapted(int oldLeaderId$1, Set x$6) {
        Object boxedLeaderId = BoxesRunTime.boxToInteger((int)oldLeaderId$1);
        boolean containsOldLeader = x$6.contains(boxedLeaderId);
        return BoxesRunTime.boxToBoolean((boolean)containsOldLeader);
    }
}

