/*
 * Decompiled with CFR 0.152.
 */
package kafka.metrics;

import java.io.File;
import java.io.Serializable;
import java.util.Optional;
import java.util.Properties;
import kafka.integration.KafkaServerTestHarness;
import kafka.log.LogManager;
import kafka.server.BrokerTopicStats$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestInfoUtils$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.Set$;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction1;

@Timeout(value=120L)
@ScalaSignature(bytes="\u0006\u0005\u0005=b\u0001B\b\u0011\u0001UAQA\t\u0001\u0005\u0002\rB\u0011B\n\u0001A\u0002\u0003\u0005\u000b\u0015B\u0014\t\u000fM\u0002!\u0019!C\u0001i!11\b\u0001Q\u0001\nUBq\u0001\u0010\u0001C\u0002\u0013\u0005A\u0007\u0003\u0004>\u0001\u0001\u0006I!\u000e\u0005\b}\u0001\u0011\r\u0011\"\u0001@\u0011\u0019A\u0005\u0001)A\u0005\u0001\")\u0011\n\u0001C\u0001\u0015\")\u0011\u000b\u0001C!%\"9q\f\u0001b\u0001\n\u0003!\u0004B\u00021\u0001A\u0003%Q\u0007C\u0003b\u0001\u0011\u0005#\rC\u0003m\u0001\u0011\u0005QN\u0001\u000fGKR\u001c\u0007N\u0012:p[\u001a{G\u000e\\8xKJlU\r\u001e:jGN$Vm\u001d;\u000b\u0005E\u0011\u0012aB7fiJL7m\u001d\u0006\u0002'\u0005)1.\u00194lC\u000e\u00011c\u0001\u0001\u00179A\u0011qCG\u0007\u00021)\u0011\u0011DE\u0001\fS:$Xm\u001a:bi&|g.\u0003\u0002\u001c1\t12*\u00194lCN+'O^3s)\u0016\u001cH\u000fS1s]\u0016\u001c8\u000f\u0005\u0002\u001eA5\taD\u0003\u0002 %\u0005)Q\u000f^5mg&\u0011\u0011E\b\u0002\b\u0019><w-\u001b8h\u0003\u0019a\u0014N\\5u}Q\tA\u0005\u0005\u0002&\u00015\t\u0001#A\u0005`i\u0016\u001cH/\u00138g_B\u0011\u0001&M\u0007\u0002S)\u0011!fK\u0001\u0004CBL'B\u0001\u0017.\u0003\u001dQW\u000f]5uKJT!AL\u0018\u0002\u000b),h.\u001b;\u000b\u0003A\n1a\u001c:h\u0013\t\u0011\u0014F\u0001\u0005UKN$\u0018J\u001c4p\u0003!qW/\u001c(pI\u0016\u001cX#A\u001b\u0011\u0005YJT\"A\u001c\u000b\u0003a\nQa]2bY\u0006L!AO\u001c\u0003\u0007%sG/A\u0005ok6tu\u000eZ3tA\u0005Aa.^7QCJ$8/A\u0005ok6\u0004\u0016M\u001d;tA\u0005I\"/Z9vSJ,GmS1gW\u0006\u001cVM\u001d<feB\u0013XMZ5y+\u0005\u0001\u0005CA!G\u001b\u0005\u0011%BA\"E\u0003\u0011a\u0017M\\4\u000b\u0003\u0015\u000bAA[1wC&\u0011qI\u0011\u0002\u0007'R\u0014\u0018N\\4\u00025I,\u0017/^5sK\u0012\\\u0015MZ6b'\u0016\u0014h/\u001a:Qe\u00164\u0017\u000e\u001f\u0011\u0002\u001f=4XM\u001d:jI&tw\r\u0015:paN,\u0012a\u0013\t\u0003\u0019>k\u0011!\u0014\u0006\u0003\u001d\u0012\u000bA!\u001e;jY&\u0011\u0001+\u0014\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\u0018aD4f]\u0016\u0014\u0018\r^3D_:4\u0017nZ:\u0016\u0003M\u00032\u0
001V,Z\u001b\u0005)&B\u0001,8\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u00031V\u00131aU3r!\tQV,D\u0001\\\u0015\ta&#\u0001\u0004tKJ4XM]\u0005\u0003=n\u00131bS1gW\u0006\u001cuN\u001c4jO\u0006Ia.T3tg\u0006<Wm]\u0001\u000b]6+7o]1hKN\u0004\u0013!B:fiV\u0003HCA2g!\t1D-\u0003\u0002fo\t!QK\\5u\u0011\u00159W\u00021\u0001(\u0003!!Xm\u001d;J]\u001a|\u0007FA\u0007j!\tA#.\u0003\u0002lS\tQ!)\u001a4pe\u0016,\u0015m\u00195\u0002IQ,7\u000f\u001e$fi\u000eDgI]8n\r>dGn\\<fe6+GO]5dg\nKH/Z:PkR$\"a\u00198\t\u000b=t\u0001\u0019\u00019\u0002\rE,xN];n!\t\t\bP\u0004\u0002smB\u00111oN\u0007\u0002i*\u0011Q\u000fF\u0001\u0007yI|w\u000e\u001e \n\u0005]<\u0014A\u0002)sK\u0012,g-\u0003\u0002Hs*\u0011qo\u000e\u0015\u0007\u001dm\f\u0019!!\u0002\u0011\u0005q|X\"A?\u000b\u0005y\\\u0013A\u00029be\u0006l7/C\u0002\u0002\u0002u\u0014\u0011\u0003U1sC6,G/\u001a:ju\u0016$G+Z:u\u0003\u0011q\u0017-\\3\"\u0005\u0005\u001d\u0011AI>eSN\u0004H.Y=OC6,WPL>be\u001e,X.\u001a8ug^KG\u000f\u001b(b[\u0016\u001cX\u0010K\u0004\u000f\u0003\u0017\t9\"!\u0007\u0011\t\u00055\u00111C\u0007\u0003\u0003\u001fQ1!!\u0005~\u0003!\u0001(o\u001c<jI\u0016\u0014\u0018\u0002BA\u000b\u0003\u001f\u00111BV1mk\u0016\u001cv.\u001e:dK\u000691\u000f\u001e:j]\u001e\u001cH\u0006BA\u000e\u0003?\t#!!\b\u0002\u0005i\\\u0017EAA\u0011\u0003\u0015Y'/\u00194uQ\u001d\u0001\u0011QEA\u0016\u0003[\u00012\u0001KA\u0014\u0013\r\tI#\u000b\u0002\b)&lWm\\;u\u0003\u00151\u0018\r\\;f=\u0005A\b")
public class FetchFromFollowerMetricsTest
extends KafkaServerTestHarness {
    // JUnit TestInfo of the currently running test; assigned in setUp(TestInfo)
    // and read by overridingProps() for the KRaft check.
    private TestInfo _testInfo;
    // Passed to createBrokerConfigs as the first argument, used as the upper bound
    // of the rackInfo loop and as the third argument of createTopic; the constructor sets it to 2.
    private final int numNodes;
    // Value written to the NumPartitionsProp broker override; the constructor sets it to 2.
    private final int numParts;
    // Prefix used to build the "metrics.jmx.exclude" override (prefix + "=ClusterId").
    private final String requiredKafkaServerPrefix;
    // Message count handed to generateAndProduceMessages and consumeTopicRecords; the constructor sets it to 2.
    private final int nMessages;

    /**
     * Accessor for the {@code numNodes} field.
     *
     * @return the value assigned to {@code numNodes} by the constructor
     */
    public int numNodes() {
        return numNodes;
    }

    /**
     * Accessor for the {@code numParts} field.
     *
     * @return the value assigned to {@code numParts} by the constructor
     */
    public int numParts() {
        return numParts;
    }

    /**
     * Accessor for the {@code requiredKafkaServerPrefix} field.
     *
     * @return the MBean name prefix assigned by the constructor
     */
    public String requiredKafkaServerPrefix() {
        return requiredKafkaServerPrefix;
    }

    /**
     * Builds the property overrides that are applied to every generated broker config.
     *
     * <p>Always sets the partition-count property, the {@code metrics.jmx.exclude}
     * filter and the replica selector class; additionally sets the cluster-link
     * enable property to {@code "false"} when the current test info reports KRaft.
     *
     * @return a fresh {@link Properties} instance holding the overrides
     */
    public Properties overridingProps() {
        Properties overrides = new Properties();
        overrides.put(KafkaConfig$.MODULE$.NumPartitionsProp(), String.valueOf(numParts()));
        overrides.put("metrics.jmx.exclude", requiredKafkaServerPrefix() + "=ClusterId");
        overrides.put(KafkaConfig$.MODULE$.ReplicaSelectorClassProp(), "org.apache.kafka.common.replica.RackAwareReplicaSelector");
        boolean kraft = TestInfoUtils$.MODULE$.isKRaft(_testInfo);
        if (kraft) {
            overrides.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "false");
        }
        return overrides;
    }

    /**
     * Produces the broker configurations for this test.
     *
     * <p>Fills a mutable Scala map with one entry per node index, asks
     * {@code TestUtils.createBrokerConfigs} for {@code numNodes()} property sets and
     * converts each of them to a {@link KafkaConfig} with {@link #overridingProps()}
     * applied on top.
     *
     * @return one {@code KafkaConfig} per property set returned by {@code createBrokerConfigs}
     */
    @Override
    public Seq<KafkaConfig> generateConfigs() {
        // Starts empty; the loop below adds i -> Integer.toString(i) for every i in [0, numNodes()).
        scala.collection.mutable.Map rackInfo = (scala.collection.mutable.Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)Nil$.MODULE$);
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.numNodes()).foreach((Function1 & Serializable)i -> FetchFromFollowerMetricsTest.$anonfun$generateConfigs$1(rackInfo, BoxesRunTime.unboxToInt((Object)i)));
        // The x$N locals are synthetic names produced by the decompiler; they are passed
        // positionally to createBrokerConfigs below.
        // NOTE(review): the meaning of each position is not visible in this file — confirm
        // against the TestUtils.createBrokerConfigs signature before changing any value.
        int x$1 = this.numNodes();
        String x$22 = this.zkConnectOrNull();
        boolean x$5 = true;
        None$ x$6 = None$.MODULE$;
        None$ x$7 = None$.MODULE$;
        None$ x$8 = None$.MODULE$;
        boolean x$9 = true;
        boolean x$10 = false;
        boolean x$11 = false;
        boolean x$12 = false;
        int x$13 = 1;
        boolean x$14 = false;
        int x$15 = 1;
        short x$16 = 1;
        int x$17 = 0;
        boolean x$18 = false;
        // Each returned property set is merged with overridingProps(); a new overrides
        // object is built for every element.
        return (Seq)TestUtils$.MODULE$.createBrokerConfigs(x$1, x$22, false, x$5, (Option<SecurityProtocol>)x$6, (Option<File>)x$7, (Option<Properties>)x$8, x$9, x$10, x$11, x$12, (Map<Object, String>)rackInfo, x$13, x$14, x$15, x$16, x$17, x$18).map((Function1 & Serializable)x$2 -> {
            Properties fromProps_overrides = this.overridingProps();
            return KafkaConfig$.MODULE$.fromProps(x$2, fromProps_overrides, true);
        });
    }

    /**
     * Accessor for the {@code nMessages} field.
     *
     * @return the value assigned to {@code nMessages} by the constructor
     */
    public int nMessages() {
        return nMessages;
    }

    /**
     * Remembers the JUnit {@link TestInfo} for the running test and then delegates
     * to the parent harness set-up.
     *
     * @param testInfo test metadata supplied by JUnit
     */
    @BeforeEach
    @Override
    public void setUp(TestInfo testInfo) {
        // overridingProps() reads this field, so it is stored before delegating
        // (presumably the parent set-up triggers config generation — confirm).
        _testInfo = testInfo;
        super.setUp(testInfo);
    }

    @ParameterizedTest(name="{displayName}.{argumentsWithNames}")
    @ValueSource(strings={"zk", "kraft"})
    public void testFetchFromFollowerMetricsBytesOut(String quorum) {
        String topic = "test-fetch-from-follower-bytes-out";
        int leaderBrokerId = 0;
        int followerBrokerId = 1;
        String totalBytesOut = new StringBuilder(12).append("name=").append(BrokerTopicStats$.MODULE$.BytesOutPerSec()).append(",topic=").append(topic).toString();
        String fetchFromFollowerBytesOut = new StringBuilder(12).append("name=").append(BrokerTopicStats$.MODULE$.FetchFromFollowerBytesOutPerSec()).append(",topic=").append(topic).toString();
        Properties topicConfig = new Properties();
        topicConfig.setProperty("min.insync.replicas", "2");
        this.createTopic(topic, 1, this.numNodes(), topicConfig, this.createTopic$default$5(), this.createTopic$default$6());
        TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), topic, this.nMessages(), -1);
        Admin admin = TestUtils$.MODULE$.createAdminClient(this.brokers(), this.listenerName(), new Properties());
        TestUtils$.MODULE$.describeTopic(admin, topic).partitions().forEach(partition -> {
            if (partition.leader().id() != leaderBrokerId) {
                java.util.Map reassignment = CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), Optional.of(new NewPartitionReassignment(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)new .colon.colon((Object)Predef$.MODULE$.int2Integer(leaderBrokerId), (List)new .colon.colon((Object)Predef$.MODULE$.int2Integer(followerBrokerId), (List)Nil$.MODULE$))).asJava())))}))).asJava();
                admin.alterPartitionReassignments(reassignment).all().get();
                long l = 100L;
                long waitUntilTrue_waitTimeMs = 15000L;
                long waitUntilTrue_startTime = System.currentTimeMillis();
                while (!FetchFromFollowerMetricsTest.$anonfun$testFetchFromFollowerMetricsBytesOut$2(admin)) {
                    void waitUntilTrue_pause;
                    if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                        Assertions.fail((String)"reassignment did not complete.");
                    }
                    Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
                }
                admin.electLeaders(ElectionType.PREFERRED, CollectionConverters$.MODULE$.SetHasAsJava((Set)Set$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{new TopicPartition(topic, 0)}))).asJava()).all().get();
                return;
            }
        });
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        this.brokers().foreach((Function1 & Serializable)broker -> {
            FetchFromFollowerMetricsTest.$anonfun$testFetchFromFollowerMetricsBytesOut$4(topicPartition, broker);
            return BoxedUnit.UNIT;
        });
        Buffer<KafkaBroker> x$3 = this.brokers();
        int x$5 = this.nMessages();
        String x$6 = Integer.toString(followerBrokerId);
        String x$7 = "group";
        SecurityProtocol x$8 = SecurityProtocol.PLAINTEXT;
        None$ x$9 = None$.MODULE$;
        long x$10 = 15000L;
        TestUtils$.MODULE$.consumeTopicRecords(x$3, topic, x$5, x$7, x$8, (Option<File>)x$9, x$10, x$6);
        long totalBytesOut0 = TestUtils$.MODULE$.meterCount(totalBytesOut);
        long fetchFromFollowerBytesOut0 = TestUtils$.MODULE$.meterCount(fetchFromFollowerBytesOut);
        long consumeFromLeaderBytesOut0 = totalBytesOut0 - fetchFromFollowerBytesOut0;
        TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), topic, this.nMessages(), -1);
        Buffer<KafkaBroker> x$11 = this.brokers();
        int x$13 = this.nMessages();
        String x$14 = Integer.toString(followerBrokerId);
        String x$15 = "group";
        SecurityProtocol x$16 = SecurityProtocol.PLAINTEXT;
        None$ x$17 = None$.MODULE$;
        long x$18 = 15000L;
        TestUtils$.MODULE$.consumeTopicRecords(x$11, topic, x$13, x$15, x$16, (Option<File>)x$17, x$18, x$14);
        long totalBytesOut1 = TestUtils$.MODULE$.meterCount(totalBytesOut);
        Assertions.assertTrue((totalBytesOut1 > totalBytesOut0 ? 1 : 0) != 0);
        long fetchFromFollowerBytesOut1 = TestUtils$.MODULE$.meterCount(fetchFromFollowerBytesOut);
        Assertions.assertTrue((fetchFromFollowerBytesOut1 > fetchFromFollowerBytesOut0 ? 1 : 0) != 0);
        Assertions.assertTrue((fetchFromFollowerBytesOut1 == TestUtils$.MODULE$.meterCount(totalBytesOut) ? 1 : 0) != 0);
        long consumeFromLeaderBytesOut1 = totalBytesOut1 - fetchFromFollowerBytesOut1;
        Assertions.assertTrue((consumeFromLeaderBytesOut1 == consumeFromLeaderBytesOut0 ? 1 : 0) != 0);
        Assertions.assertTrue((consumeFromLeaderBytesOut1 < fetchFromFollowerBytesOut1 ? 1 : 0) != 0);
        TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), topic, this.nMessages(), -1);
        Buffer<KafkaBroker> x$19 = this.brokers();
        int x$21 = this.nMessages();
        String x$22 = Integer.toString(leaderBrokerId);
        String x$23 = "group";
        SecurityProtocol x$24 = SecurityProtocol.PLAINTEXT;
        None$ x$25 = None$.MODULE$;
        long x$26 = 15000L;
        TestUtils$.MODULE$.consumeTopicRecords(x$19, topic, x$21, x$23, x$24, (Option<File>)x$25, x$26, x$22);
        long totalBytesOut2 = TestUtils$.MODULE$.meterCount(totalBytesOut);
        Assertions.assertTrue((totalBytesOut2 > totalBytesOut1 ? 1 : 0) != 0);
        long fetchFromFollowerBytesOut2 = TestUtils$.MODULE$.meterCount(fetchFromFollowerBytesOut);
        Assertions.assertTrue((fetchFromFollowerBytesOut2 == fetchFromFollowerBytesOut1 ? 1 : 0) != 0);
        Assertions.assertTrue((fetchFromFollowerBytesOut2 < totalBytesOut2 ? 1 : 0) != 0);
        Assertions.assertTrue((totalBytesOut2 - fetchFromFollowerBytesOut2 > consumeFromLeaderBytesOut1 ? 1 : 0) != 0);
        admin.close();
    }

    /**
     * Synthetic lambda body used by {@code generateConfigs}: adds the pair
     * {@code i -> Integer.toString(i)} to the given mutable map.
     *
     * @param rackInfo$1 map to add the entry to
     * @param i key of the new entry; its decimal string form is the value
     * @return the result of {@code $plus$eq}, cast to a mutable map
     */
    public static final /* synthetic */ scala.collection.mutable.Map $anonfun$generateConfigs$1(scala.collection.mutable.Map rackInfo$1, int i) {
        Object key = Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)i));
        Object value = (Object)Integer.toString(i);
        Object entry = (Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(key, value);
        return (scala.collection.mutable.Map)rackInfo$1.$plus$eq(entry);
    }

    /**
     * Synthetic predicate polled by the reassignment wait loop.
     *
     * @param admin$1 admin client to query
     * @return {@code true} when the map returned by {@code listPartitionReassignments()} is empty
     */
    public static final /* synthetic */ boolean $anonfun$testFetchFromFollowerMetricsBytesOut$2(Admin admin$1) {
        Object ongoing = admin$1.listPartitionReassignments().reassignments().get();
        return ((java.util.Map)ongoing).isEmpty();
    }

    /**
     * Synthetic supplier of the failure message for the reassignment wait.
     *
     * @return the fixed message text
     */
    public static final /* synthetic */ String $anonfun$testFetchFromFollowerMetricsBytesOut$3() {
        final String message = "reassignment did not complete.";
        return message;
    }

    /**
     * Synthetic lambda body run for every broker: asserts that the broker's log
     * manager has a log for the partition and that the log's size is greater than zero.
     *
     * @param topicPartition$1 partition whose log is looked up
     * @param broker broker under inspection
     */
    public static final /* synthetic */ void $anonfun$testFetchFromFollowerMetricsBytesOut$4(TopicPartition topicPartition$1, KafkaBroker broker) {
        LogManager logManager = broker.logManager();
        // Second argument is the Scala default value for getLog's second parameter.
        Option maybeLog = logManager.getLog(topicPartition$1, logManager.getLog$default$2());
        int id = broker.config().brokerId();
        Option maybeSize = maybeLog.map((Function1 & Serializable)log -> BoxesRunTime.boxToLong((long)log.size()));
        boolean hasData = (boolean)maybeSize.exists((Function1)(JFunction1.mcZJ.sp & Serializable)size -> size > 0L);
        String failureMessage = "Expected broker " + id + " to have a Log for " + topicPartition$1 + " with positive size, actual: " + maybeSize;
        Assertions.assertTrue(hasData, failureMessage);
    }

    /**
     * Sets the fixed parameters of the test: two nodes, two partitions, two messages
     * per produce round, and the MBean name prefix used for the JMX exclude filter.
     */
    public FetchFromFollowerMetricsTest() {
        super();
        numNodes = 2;
        numParts = 2;
        requiredKafkaServerPrefix = "kafka.server:type=KafkaServer,name";
        nMessages = 2;
    }
}

