/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.Future;
import kafka.log.LogConfig$;
import kafka.log.LogManager;
import kafka.server.DynamicConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.QuotaType;
import kafka.utils.CoreUtils$;
import kafka.utils.TestUtils$;
import kafka.zk.ZooKeeperTestHarness;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import scala.Function0;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.math.Numeric;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005\u001de\u0001B\u0001\u0003\u0001\u001d\u0011QCU3qY&\u001c\u0017\r^5p]F+x\u000e^1t)\u0016\u001cHO\u0003\u0002\u0004\t\u000511/\u001a:wKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001\u0001\u0002\u0005\u0002\n\u00195\t!B\u0003\u0002\f\t\u0005\u0011!p[\u0005\u0003\u001b)\u0011ACW8p\u0017\u0016,\u0007/\u001a:UKN$\b*\u0019:oKN\u001c\b\"B\b\u0001\t\u0003\u0001\u0012A\u0002\u001fj]&$h\bF\u0001\u0012!\t\u0011\u0002!D\u0001\u0003\u0011\u0015!\u0002\u0001\"\u0001\u0016\u00031\u0001XM]2f]R,%O]8s)\r1B$\t\t\u0003/ii\u0011\u0001\u0007\u0006\u00023\u0005)1oY1mC&\u00111\u0004\u0007\u0002\u0005\u0019>tw\rC\u0003\u001e'\u0001\u0007a$A\u0004qKJ\u001cWM\u001c;\u0011\u0005]y\u0012B\u0001\u0011\u0019\u0005\rIe\u000e\u001e\u0005\u0006EM\u0001\rAF\u0001\u0006m\u0006dW/\u001a\u0005\bI\u0001\u0011\r\u0011\"\u0001&\u0003!i7oZ\u00191a-\u0013U#\u0001\u0014\u0011\u0007]9\u0013&\u0003\u0002)1\t)\u0011I\u001d:bsB\u0011qCK\u0005\u0003Wa\u0011AAQ=uK\"1Q\u0006\u0001Q\u0001\n\u0019\n\u0011\"\\:hcA\u00024J\u0011\u0011\t\u000f=\u0002\u0001\u0019!C\u0001a\u00059!M]8lKJ\u001cX#A\u0019\u0011\u0007IRTH\u0004\u00024q9\u0011AgN\u0007\u0002k)\u0011aGB\u0001\u0007yI|w\u000e\u001e \n\u0003eI!!\u000f\r\u0002\u000fA\f7m[1hK&\u00111\b\u0010\u0002\u0004'\u0016\f(BA\u001d\u0019!\t\u0011b(\u0003\u0002@\u0005\tY1*\u00194lCN+'O^3s\u0011\u001d\t\u0005\u00011A\u0005\u0002\t\u000b1B\u0019:pW\u0016\u00148o\u0018\u0013fcR\u00111I\u0012\t\u0003/\u0011K!!\u0012\r\u0003\tUs\u0017\u000e\u001e\u0005\b\u000f\u0002\u000b\t\u00111\u00012\u0003\rAH%\r\u0005\u0007\u0013\u0002\u0001\u000b\u0015B\u0019\u0002\u0011\t\u0014xn[3sg\u0002Bqa\u0013\u0001C\u0002\u0013\u0005A*A\u0003u_BL7-F\u0001N!\tq5+D\u0001P\u0015\t\u0001\u0016+\u0001\u0003mC:<'\"\u0001*\u0002\t)\fg/Y\u0005\u0003)>\u0013aa\u0015;sS:<\u0007B\u0002,\u0001A\u0003%Q*\u0001\u0004u_BL7\r\t\u0005\b1\u0002\u0001\r\u0011\"\u0001Z\u0003!\u0001(o\u001c3vG\u0016\u0014X#\u0001.\u0011\tm#gEJ\u0007\u00029*\u0011\u0001,\u0018\u0006\u0003=~\u000bqa\u00197jK:$8O\u0003\u0002\u0006A*\u0011\u0011MY\u0001\u0007CB\f7\r[3\u000b\u0003\r\f1a\u001c:h\u0013\t)GLA\u0007LC\u001a\\\u0017\r\u0015:pIV\u001cWM\u001d\u0005\bO\u0002\u0001\r\u0011\"\u0001i\u00031\u0001(o\u001c3vG\u0016\u0014x\fJ3r)\t\u0019\u0015\u000eC\u0004HM\u0006\u0005\t\u0019\u0001.\t\r-\u0004\u0001\u0015)\u0003[\u0003%\u0001(o\u001c3vG\u0016\u0014\b\u0005C\u0003n\u0001\u0011\u0005c.\u0001\u0005uK\u0006\u0014Hi\\<o)\u0005\u0019\u0005F\u00017q!\t\tH/D\u0001s\u0015\t\u0019(-A\u0003kk:LG/\u0003\u0002ve\n)\u0011I\u001a;fe\")q\u000f\u0001C\u0001]\u0006Y3\u000f[8vY\u0012\u0014un\u001c;tiJ\f\u0007\u000fV<p\u0005J|7.\u001a:t/&$\b\u000eT3bI\u0016\u0014H\u000b\u001b:piRdW\r\u000b\u0002wsB\u0011\u0011O_\u0005\u0003wJ\u0014A\u0001V3ti\")Q\u0010\u0001C\u0001]\u0006i3\u000f[8vY\u0012\u0014un\u001c;tiJ\f\u0007\u000fV<p\u0005J|7.\u001a:t/&$\bNR8mY><XM\u001d+ie>$H\u000f\\3)\u0005qL\bbBA\u0001\u0001\u0011\u0005\u00111A\u00017g\"|W\u000f\u001c3NCR\u001c\u0007.U;pi\u0006\u0014V\r\u001d7jG\u0006$\u0018N\\4UQJ|Wo\u001a5B]\u0006\u001b\u00180\\7fiJL7\rV8q_2|w-\u001f\u000b\u0004\u0007\u0006\u0015\u0001bBA\u0004\u007f\u0002\u0007\u0011\u0011B\u0001\u000fY\u0016\fG-\u001a:UQJ|G\u000f\u001e7f!\r9\u00121B\u0005\u0004\u0003\u001bA\"a\u0002\"p_2,\u0017M\u001c\u0005\b\u0003#\u0001A\u0011AA\n\u0003\t!\b\u000f\u0006\u0003\u0002\u0016\u0005\u0005\u0002\u0003BA\f\u0003;i!!!\u0007\u000b\u0007\u0005mq,\u0001\u0004d_6lwN\\\u0005\u0005\u0003?\tIB\u0001\bU_BL7\rU1si&$\u0018n\u001c8\t\u000f\u0005\r\u0012q\u0002a\u0001=\u0005I\u0001/\u0019:uSRLwN\u001c\u0005\u0007\u0003O\u0001A\u0011\u00018\u00023MDw.\u001e7e)\"\u0014x\u000e\u001e;mK>cGmU3h[\u0016tGo\u001d\u0015\u0004\u0003KI\bbBA\u0017\u0001\u0011\u0005\u0011qF\u0001\bC\u0012$G)\u0019;b)\u0015\u0019\u0015\u0011GA\u001b\u0011\u001d\t\u0019$a\u000bA\u0002y\t\u0001\"\\:h\u0007>,h\u000e\u001e\u0005\b\u0003o\tY\u00031\u0001'\u0003\ri7o\u001a\u0005\b\u0003w\u0001A\u0011BA\u001f\u0003U9\u0018-\u001b;G_J|eMZ:fiN$v.T1uG\"$raQA \u0003\u0007\n9\u0005C\u0004\u0002B\u0005e\u0002\u0019\u0001\u0010\u0002\r=4gm]3u\u0011\u001d\t)%!\u000fA\u0002y\t1\u0002]1si&$\u0018n\u001c8JI\"9\u0011\u0011JA\u001d\u0001\u0004q\u0012\u0001\u00032s_.,'/\u00133\t\u000f\u00055\u0003\u0001\"\u0003\u0002P\u0005I!M]8lKJ4uN\u001d\u000b\u0004{\u0005E\u0003bBA*\u0003\u0017\u0002\rAH\u0001\u0003S\u0012Dq!a\u0016\u0001\t\u0003\tI&A\u0007de\u0016\fG/\u001a\"s_.,'o\u001d\u000b\u0004\u0007\u0006m\u0003\u0002CA/\u0003+\u0002\r!a\u0018\u0002\u0013\t\u0014xn[3s\u0013\u0012\u001c\bc\u0001\u001a;=!9\u00111\r\u0001\u0005\n\u0005\u0015\u0014AB1w%\u0006$X\r\u0006\u0004\u0002h\u00055\u0014q\u000f\t\u0004/\u0005%\u0014bAA61\t1Ai\\;cY\u0016D\u0001\"a\u001c\u0002b\u0001\u0007\u0011\u0011O\u0001\u0010e\u0016\u0004H.[2bi&|g\u000eV=qKB\u0019!#a\u001d\n\u0007\u0005U$AA\u0005Rk>$\u0018\rV=qK\"9q&!\u0019A\u0002\u0005}\u0003bBA>\u0001\u0011%\u0011QP\u0001\r[\u0016\f7/\u001e:fIJ\u000bG/\u001a\u000b\u0007\u0003O\ny(a!\t\u000f\u0005\u0005\u0015\u0011\u0010a\u0001{\u00051!M]8lKJD\u0001\"!\"\u0002z\u0001\u0007\u0011\u0011O\u0001\be\u0016\u0004H+\u001f9f\u0001")
public class ReplicationQuotasTest
extends ZooKeeperTestHarness {
    private final byte[] msg100KB = new byte[100000];
    private Seq<KafkaServer> brokers = null;
    private final String topic;
    private KafkaProducer<byte[], byte[]> producer = null;

    public long percentError(int percent, long value) {
        return Math.round(value * (long)percent / 100L);
    }

    public byte[] msg100KB() {
        return this.msg100KB;
    }

    public Seq<KafkaServer> brokers() {
        return this.brokers;
    }

    public void brokers_$eq(Seq<KafkaServer> x$1) {
        this.brokers = x$1;
    }

    public String topic() {
        return this.topic;
    }

    public KafkaProducer<byte[], byte[]> producer() {
        return this.producer;
    }

    public void producer_$eq(KafkaProducer<byte[], byte[]> x$1) {
        this.producer = x$1;
    }

    @Override
    @After
    public void tearDown() {
        this.producer().close();
        TestUtils$.MODULE$.shutdownServers(this.brokers());
        super.tearDown();
    }

    @Test
    public void shouldBootstrapTwoBrokersWithLeaderThrottle() {
        this.shouldMatchQuotaReplicatingThroughAnAsymmetricTopology(true);
    }

    @Test
    public void shouldBootstrapTwoBrokersWithFollowerThrottle() {
        this.shouldMatchQuotaReplicatingThroughAnAsymmetricTopology(false);
    }

    public void shouldMatchQuotaReplicatingThroughAnAsymmetricTopology(boolean leaderThrottle) {
        this.brokers_$eq((Seq<KafkaServer>)((Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 105).map((Function1 & Serializable & scala.Serializable)id -> TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(BoxesRunTime.unboxToInt((Object)id), this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18())), TestUtils$.MODULE$.createServer$default$2()), IndexedSeq$.MODULE$.canBuildFrom())));
        scala.collection.immutable.Map assignment = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 106}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)1)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{101, 106}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)2)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{102, 106}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)3)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{103, 107}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)4)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{104, 107}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)5)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{105, 107}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)6)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 106}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)7)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{101, 107})))}));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), (Map<Object, Seq<Object>>)assignment, this.brokers());
        byte[] msg = this.msg100KB();
        int msgCount = 100;
        int expectedDuration = 10;
        LongRef throttle = LongRef.create((long)(msgCount * msg.length / expectedDuration));
        if (!leaderThrottle) {
            throttle.elem *= 3L;
        }
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 107).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)brokerId -> this.adminZkClient().changeBrokerConfig((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{brokerId})), CoreUtils$.MODULE$.propsWith((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)DynamicConfig.Broker$.MODULE$.LeaderReplicationThrottledRateProp(), (Object)((Object)BoxesRunTime.boxToLong((long)throttle$1.elem)).toString()), new Tuple2((Object)DynamicConfig.Broker$.MODULE$.FollowerReplicationThrottledRateProp(), (Object)((Object)BoxesRunTime.boxToLong((long)throttle$1.elem)).toString())}))));
        if (leaderThrottle) {
            this.adminZkClient().changeTopicConfig(this.topic(), CoreUtils$.MODULE$.propsWith(LogConfig$.MODULE$.LeaderReplicationThrottledReplicasProp(), "0:100,1:101,2:102,3:103,4:104,5:105"));
        } else {
            this.adminZkClient().changeTopicConfig(this.topic(), CoreUtils$.MODULE$.propsWith(LogConfig$.MODULE$.FollowerReplicationThrottledReplicasProp(), "0:106,1:106,2:106,3:107,4:107,5:107"));
        }
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers(this.brokers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), 1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15()));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), msgCount).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)x$1 -> RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 7).foreach((Function1 & Serializable & scala.Serializable)partition -> ReplicationQuotasTest.$anonfun$shouldMatchQuotaReplicatingThroughAnAsymmetricTopology$4(this, msg, BoxesRunTime.unboxToInt((Object)partition))));
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 5).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)id -> this.waitForOffsetsToMatch(msgCount, id, 100 + id));
        this.waitForOffsetsToMatch(msgCount, 6, 100);
        this.waitForOffsetsToMatch(msgCount, 7, 101);
        long start = System.currentTimeMillis();
        this.createBrokers((Seq<Object>)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(106), 107));
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(106), 107).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)brokerId -> Assert.assertEquals((long)throttle$1.elem, (long)this.brokerFor(brokerId).quotaManagers().follower().upperBound()));
        if (!leaderThrottle) {
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)partition -> Assert.assertTrue((boolean)this.brokerFor(106).quotaManagers().follower().isThrottled(this.tp(partition))));
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(3), 5).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)partition -> Assert.assertTrue((boolean)this.brokerFor(107).quotaManagers().follower().isThrottled(this.tp(partition))));
        }
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(6), 7).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)id -> this.waitForOffsetsToMatch(msgCount, id, 100 + id));
        long unthrottledTook = System.currentTimeMillis() - start;
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)id -> this.waitForOffsetsToMatch(msgCount, id, 106));
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(3), 5).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)id -> this.waitForOffsetsToMatch(msgCount, id, 107));
        long throttledTook = System.currentTimeMillis() - start;
        double throttledLowerBound = (double)(expectedDuration * 1000) * 0.9;
        int throttledUpperBound = expectedDuration * 1000 * 3;
        Assert.assertTrue((String)new StringBuilder(12).append("Expected ").append(unthrottledTook).append(" < ").append(throttledLowerBound).toString(), ((double)unthrottledTook < throttledLowerBound ? 1 : 0) != 0);
        Assert.assertTrue((String)new StringBuilder(12).append("Expected ").append(throttledTook).append(" > ").append(throttledLowerBound).toString(), ((double)throttledTook > throttledLowerBound ? 1 : 0) != 0);
        Assert.assertTrue((String)new StringBuilder(12).append("Expected ").append(throttledTook).append(" < ").append(throttledUpperBound).toString(), (throttledTook < (long)throttledUpperBound ? 1 : 0) != 0);
        double rateUpperBound = (double)throttle.elem * 1.1;
        double rateLowerBound = (double)throttle.elem * 0.5;
        double rate = leaderThrottle ? this.avRate((QuotaType)QuotaType.LeaderReplication$.MODULE$, (Seq<Object>)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 105)) : this.avRate((QuotaType)QuotaType.FollowerReplication$.MODULE$, (Seq<Object>)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(106), 107));
        Assert.assertTrue((String)new StringBuilder(12).append("Expected ").append(rate).append(" < ").append(rateUpperBound).toString(), (rate < rateUpperBound ? 1 : 0) != 0);
        Assert.assertTrue((String)new StringBuilder(12).append("Expected ").append(rate).append(" > ").append(rateLowerBound).toString(), (rate > rateLowerBound ? 1 : 0) != 0);
    }

    public TopicPartition tp(int partition) {
        return new TopicPartition(this.topic(), partition);
    }

    @Test
    public void shouldThrottleOldSegments() {
        Properties config = TestUtils$.MODULE$.createBrokerConfig(100, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18());
        config.put("log.segment.bytes", ((Object)BoxesRunTime.boxToInteger((int)0x100000)).toString());
        this.brokers_$eq((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(config), TestUtils$.MODULE$.createServer$default$2())}))));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 101})))}))), this.brokers());
        byte[] msg = this.msg100KB();
        int msgCount = 200;
        int expectedDuration = 4;
        long throttle = msg.length * msgCount / expectedDuration;
        this.adminZkClient().changeBrokerConfig((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100})), CoreUtils$.MODULE$.propsWith(DynamicConfig.Broker$.MODULE$.LeaderReplicationThrottledRateProp(), ((Object)BoxesRunTime.boxToLong((long)throttle)).toString()));
        this.adminZkClient().changeTopicConfig(this.topic(), CoreUtils$.MODULE$.propsWith(LogConfig$.MODULE$.LeaderReplicationThrottledReplicasProp(), "0:100"));
        this.addData(msgCount, msg);
        long start = System.currentTimeMillis();
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Starting new broker");
        this.brokers_$eq((Seq<KafkaServer>)((Seq)this.brokers().$colon$plus((Object)TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(101, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18())), TestUtils$.MODULE$.createServer$default$2()), Seq$.MODULE$.canBuildFrom())));
        this.waitForOffsetsToMatch(msgCount, 0, 101);
        long throttledTook = System.currentTimeMillis() - start;
        Assert.assertTrue((String)new StringBuilder(42).append("Throttled replication of ").append(throttledTook).append("ms should be > ").append((double)(expectedDuration * 1000) * 0.9).append("ms").toString(), ((double)throttledTook > (double)(expectedDuration * 1000) * 0.9 ? 1 : 0) != 0);
        Assert.assertTrue((String)new StringBuilder(42).append("Throttled replication of ").append(throttledTook).append("ms should be < ").append(expectedDuration * 1500).append("ms").toString(), ((double)throttledTook < (double)(expectedDuration * 1000) * 1.5 ? 1 : 0) != 0);
    }

    public void addData(int msgCount, byte[] msg) {
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers(this.brokers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), 0, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15()));
        ((IterableLike)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), msgCount).map((Function1 & Serializable & scala.Serializable)x$2 -> ReplicationQuotasTest.$anonfun$addData$1(this, msg, BoxesRunTime.unboxToInt((Object)x$2)), IndexedSeq$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x$3 -> (RecordMetadata)x$3.get());
        this.waitForOffsetsToMatch(msgCount, 0, 100);
    }

    private void waitForOffsetsToMatch(int offset, int partitionId, int brokerId) {
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
            LogManager qual$1 = this.brokerFor(brokerId).getLogManager();
            TopicPartition x$7 = new TopicPartition(this.topic(), partitionId);
            boolean x$8 = qual$1.getLog$default$2();
            return BoxesRunTime.equals((Object)BoxesRunTime.boxToInteger((int)offset), (Object)qual$1.getLog(x$7, x$8).map((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToLong((long)x$4.logEndOffset())).getOrElse((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> 0));
        }, (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(47).append("Offsets did not match for partition ").append(partitionId).append(" on broker ").append(brokerId).toString(), 60000L, TestUtils$.MODULE$.waitUntilTrue$default$4(), TestUtils$.MODULE$.waitUntilTrue$default$5());
    }

    private KafkaServer brokerFor(int id) {
        return (KafkaServer)((IterableLike)this.brokers().filter((Function1 & Serializable & scala.Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)ReplicationQuotasTest.$anonfun$brokerFor$1(id, x$5)))).head();
    }

    public void createBrokers(Seq<Object> brokerIds) {
        brokerIds.foreach((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)id -> this.brokers_$eq((Seq<KafkaServer>)((Seq)this.brokers().$colon$plus((Object)TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(id, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18())), TestUtils$.MODULE$.createServer$default$2()), Seq$.MODULE$.canBuildFrom()))));
    }

    private double avRate(QuotaType replicationType, Seq<Object> brokers) {
        return BoxesRunTime.unboxToDouble((Object)((TraversableOnce)((TraversableLike)brokers.map((Function1 & Serializable & scala.Serializable)id -> this.brokerFor(BoxesRunTime.unboxToInt((Object)id)), Seq$.MODULE$.canBuildFrom())).map((Function1 & Serializable & scala.Serializable)x$6 -> BoxesRunTime.boxToDouble((double)this.measuredRate(x$6, replicationType)), Seq$.MODULE$.canBuildFrom())).sum((Numeric)Numeric.DoubleIsFractional$.MODULE$)) / (double)brokers.length();
    }

    private double measuredRate(KafkaServer broker, QuotaType repType) {
        MetricName metricName = broker.metrics().metricName("byte-rate", repType.toString());
        return BoxesRunTime.unboxToDouble((Object)((KafkaMetric)((MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(broker.metrics().metrics()).asScala()).apply((Object)metricName)).metricValue());
    }

    public static final /* synthetic */ Future $anonfun$shouldMatchQuotaReplicatingThroughAnAsymmetricTopology$4(ReplicationQuotasTest $this, byte[] msg$1, int partition) {
        return $this.producer().send(new ProducerRecord($this.topic(), Predef$.MODULE$.int2Integer(partition), null, (Object)msg$1));
    }

    public static final /* synthetic */ Future $anonfun$addData$1(ReplicationQuotasTest $this, byte[] msg$2, int x$2) {
        return $this.producer().send(new ProducerRecord($this.topic(), (Object)msg$2));
    }

    public static final /* synthetic */ boolean $anonfun$brokerFor$1(int id$1, KafkaServer x$5) {
        return x$5.config().brokerId() == id$1;
    }

    public ReplicationQuotasTest() {
        this.topic = "topic1";
    }
}

