/*
 * Decompiled with CFR 0.152.
 */
package kafka.metrics;

import java.io.File;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import kafka.integration.KafkaServerTestHarness;
import kafka.metrics.BrokerWithMember;
import kafka.metrics.ConsumerLagEmitter$;
import kafka.metrics.ConsumerLagEmitterIntegrationTest$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Map$;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction1;

@Tag(value="bazel:size:large")
@ScalaSignature(bytes="\u0006\u0005\t%t!\u0002\u0011\"\u0011\u00031c!\u0002\u0015\"\u0011\u0003I\u0003\"\u0002\u0019\u0002\t\u0003\t\u0004\"\u0002\u001a\u0002\t\u0013\u0019\u0004\"\u0002\u001a\u0002\t\u0003!f\u0001\u0002\u0015\"\u0001}CQ\u0001M\u0003\u0005\u00021DqA\\\u0003C\u0002\u0013\u0005q\u000e\u0003\u0004t\u000b\u0001\u0006I\u0001\u001d\u0005\bi\u0016\u0011\r\u0011\"\u0001p\u0011\u0019)X\u0001)A\u0005a\"9a/\u0002b\u0001\n\u00039\bBB>\u0006A\u0003%\u0001\u0010C\u0004}\u000b\u0001\u0007I\u0011A?\t\u0013\u0005%R\u00011A\u0005\u0002\u0005-\u0002bBA\u001c\u000b\u0001\u0006KA \u0005\n\u0003s)\u0001\u0019!C\u0001\u0003wA\u0011\"a\u0013\u0006\u0001\u0004%\t!!\u0014\t\u0011\u0005ES\u0001)Q\u0005\u0003{Aq!a\u0015\u0006\t\u0003\t)\u0006C\u0004\u0002f\u0015!\t!a\u001a\t\u000f\u0005}T\u0001\"\u0001\u0002h!9\u0011\u0011R\u0003\u0005\u0002\u0005-\u0005bBA]\u000b\u0011\u0005\u00111\u0018\u0005\b\u0003\u000b,A\u0011BAd\u0011\u001d\t\t/\u0002C\u0005\u0003GDq!a:\u0006\t\u0013\tI\u000fC\u0005\u0003\u0012\u0015\t\n\u0011\"\u0003\u0003\u0014!I!\u0011F\u0003\u0012\u0002\u0013%!1\u0003\u0005\b\u0005W)A\u0011\u0002B\u0017\u0011%\u0011i$BI\u0001\n\u0013\u0011y\u0004C\u0004\u0003D\u0015!IA!\u0012\u0002C\r{gn];nKJd\u0015mZ#nSR$XM]%oi\u0016<'/\u0019;j_:$Vm\u001d;\u000b\u0005\t\u001a\u0013aB7fiJL7m\u001d\u0006\u0002I\u0005)1.\u00194lC\u000e\u0001\u0001CA\u0014\u0002\u001b\u0005\t#!I\"p]N,X.\u001a:MC\u001e,U.\u001b;uKJLe\u000e^3he\u0006$\u0018n\u001c8UKN$8CA\u0001+!\tYc&D\u0001-\u0015\u0005i\u0013!B:dC2\f\u0017BA\u0018-\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012AJ\u0001\u0015M&tGM\u0011:pW\u0016\u0014x+\u001b;i\u001b\u0016l'-\u001a:\u0015\u0007Q:t\t\u0005\u0002(k%\u0011a'\t\u0002\u0011\u0005J|7.\u001a:XSRDW*Z7cKJDQ\u0001O\u0002A\u0002e\nqA\u0019:pW\u0016\u00148\u000fE\u0002;\u007f\u0005k\u0011a\u000f\u0006\u0003yu\nq!\\;uC\ndWM\u0003\u0002?Y\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005\u0001[$A\u0002\"vM\u001a,'\u000f\u0005\u0002C\u000b6\t1I\u0003\u0002EG\u000511/\u001a:wKJL!AR\"
\u0003\u0017-\u000bgm[1Ce>\\WM\u001d\u0005\u0006\u0011\u000e\u0001\r!S\u0001\u0006OJ|W\u000f\u001d\t\u0003\u0015Fs!aS(\u0011\u00051cS\"A'\u000b\u00059+\u0013A\u0002\u001fs_>$h(\u0003\u0002QY\u00051\u0001K]3eK\u001aL!AU*\u0003\rM#(/\u001b8h\u0015\t\u0001F\u0006F\u00025+zCQ\u0001\u000f\u0003A\u0002Y\u00032a\u0016/B\u001b\u0005A&BA-[\u0003\u0011)H/\u001b7\u000b\u0003m\u000bAA[1wC&\u0011Q\f\u0017\u0002\u0005\u0019&\u001cH\u000fC\u0003I\t\u0001\u0007\u0011jE\u0002\u0006A\u001a\u0004\"!\u00193\u000e\u0003\tT!aY\u0012\u0002\u0017%tG/Z4sCRLwN\\\u0005\u0003K\n\u0014acS1gW\u0006\u001cVM\u001d<feR+7\u000f\u001e%be:,7o\u001d\t\u0003O*l\u0011\u0001\u001b\u0006\u0003S\u000e\nQ!\u001e;jYNL!a\u001b5\u0003\u000f1{wmZ5oOR\tQ\u000e\u0005\u0002(\u000b\u0005Aa.^7O_\u0012,7/F\u0001q!\tY\u0013/\u0003\u0002sY\t\u0019\u0011J\u001c;\u0002\u00139,XNT8eKN\u0004\u0013\u0001\u00038v[B\u000b'\u000f^:\u0002\u00139,X\u000eU1siN\u0004\u0013aD8wKJ\u0014\u0018\u000eZ5oOB\u0013x\u000e]:\u0016\u0003a\u0004\"aV=\n\u0005iD&A\u0003)s_B,'\u000f^5fg\u0006\u0001rN^3se&$\u0017N\\4Qe>\u00048\u000fI\u0001\nG>t7/^7feN,\u0012A 
\t\u0005u}\f\u0019!C\u0002\u0002\u0002m\u00121aU3u!!\t)!!\u0007\u0002\u001e\u0005uQBAA\u0004\u0015\u0011\tI!a\u0003\u0002\u0011\r|gn];nKJTA!!\u0004\u0002\u0010\u000591\r\\5f]R\u001c(b\u0001\u0013\u0002\u0012)!\u00111CA\u000b\u0003\u0019\t\u0007/Y2iK*\u0011\u0011qC\u0001\u0004_J<\u0017\u0002BA\u000e\u0003\u000f\u0011\u0001bQ8ogVlWM\u001d\t\u0006W\u0005}\u00111E\u0005\u0004\u0003Ca#!B!se\u0006L\bcA\u0016\u0002&%\u0019\u0011q\u0005\u0017\u0003\t\tKH/Z\u0001\u000eG>t7/^7feN|F%Z9\u0015\t\u00055\u00121\u0007\t\u0004W\u0005=\u0012bAA\u0019Y\t!QK\\5u\u0011!\t)DDA\u0001\u0002\u0004q\u0018a\u0001=%c\u0005Q1m\u001c8tk6,'o\u001d\u0011\u0002\r\u0005$W.\u001b8t+\t\ti\u0004\u0005\u0003;\u007f\u0006}\u0002\u0003BA!\u0003\u000fj!!a\u0011\u000b\t\u0005\u0015\u00131B\u0001\u0006C\u0012l\u0017N\\\u0005\u0005\u0003\u0013\n\u0019EA\u0003BI6Lg.\u0001\u0006bI6Lgn]0%KF$B!!\f\u0002P!I\u0011QG\t\u0002\u0002\u0003\u0007\u0011QH\u0001\bC\u0012l\u0017N\\:!\u0003=9WM\\3sCR,7i\u001c8gS\u001e\u001cXCAA,!\u0019\tI&a\u0017\u0002`5\tQ(C\u0002\u0002^u\u00121aU3r!\r\u0011\u0015\u0011M\u0005\u0004\u0003G\u001a%aC&bM.\f7i\u001c8gS\u001e\f!\"\u001b8ji&\fG.\u001b>f)\t\ti\u0003K\u0002\u0015\u0003W\u0002B!!\u001c\u0002|5\u0011\u0011q\u000e\u0006\u0005\u0003c\n\u0019(A\u0002ba&TA!!\u001e\u0002x\u00059!.\u001e9ji\u0016\u0014(\u0002BA=\u0003+\tQA[;oSRLA!! 
\u0002p\tQ!)\u001a4pe\u0016,\u0015m\u00195\u0002\u0011MDW\u000f\u001e3po:D3!FAB!\u0011\ti'!\"\n\t\u0005\u001d\u0015q\u000e\u0002\n\u0003\u001a$XM]#bG\"\fa\u0003^3ti\u000e{gn];nKJd\u0015mZ'fiJL7m\u001d\u000b\u0005\u0003[\ti\t\u0003\u0004\u0002\u0010Z\u0001\r!S\u0001\u0007cV|'/^7)\u0007Y\t\u0019\n\u0005\u0003\u0002\u0016\u0006mUBAAL\u0015\u0011\tI*a\u001d\u0002\rA\f'/Y7t\u0013\u0011\ti*a&\u0003#A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH\u000fK\u0004\u0017\u0003C\u000bi+a,\u0011\t\u0005\r\u0016\u0011V\u0007\u0003\u0003KSA!a*\u0002\u0018\u0006A\u0001O]8wS\u0012,'/\u0003\u0003\u0002,\u0006\u0015&a\u0003,bYV,7k\\;sG\u0016\fqa\u001d;sS:<7\u000f\f\u0003\u00022\u0006U\u0016EAAZ\u0003\tQ8.\t\u0002\u00028\u0006)1N]1gi\u0006AB/Z:u\u0011\u0006tG\r\\3He>,\b/T5he\u0006$\u0018n\u001c8\u0015\t\u00055\u0012Q\u0018\u0005\u0007\u0003\u001f;\u0002\u0019A%)\u0007]\t\u0019\nK\u0004\u0018\u0003C\u000bi+a1-\t\u0005E\u0016QW\u0001\u0012m\u0016\u0014\u0018NZ=D_:\u001cX/\\3s\u0019\u0006<G\u0003CA\u0017\u0003\u0013\fi-a6\t\r\u0005-\u0007\u00041\u00015\u0003A\u0011'o\\6fe^KG\u000f['f[\n,'\u000fC\u0004\u0002Pb\u0001\r!!5\u0002\tQ\fwm\u001d\t\u0006u\u0005M\u0017*S\u0005\u0004\u0003+\\$aA'ba\"9\u0011\u0011\u001c\rA\u0002\u0005m\u0017aE3ya\u0016\u001cG/\u001a3D_:\u001cX/\\3s\u0019\u0006<\u0007cA\u0016\u0002^&\u0019\u0011q\u001c\u0017\u0003\t1{gnZ\u0001 m\u0016\u0014\u0018NZ=D_:\u001cX/\\3s\u0019\u0006<W)\\5ui\u0016\u0014H*\u0019;f]\u000eLH\u0003BA\u0017\u0003KDa!a3\u001a\u0001\u0004!\u0014\u0001F5t\u001b\u0016$(/[2WC2,X-\u00138SC:<W\r\u0006\u0005\u0002l\u0006E(\u0011\u0002B\u0007!\rY\u0013Q^\u0005\u0004\u0003_d#a\u0002\"p_2,\u0017M\u001c\u0005\b\u0003gT\u0002\u0019AA{\u0003-Y\u0017MZ6b\u001b\u0016$(/[2\u0011\u000b-\n90a?\n\u0007\u0005eHF\u0001\u0004PaRLwN\u001c\t\u0005\u0003{\u0014)!\u0004\u0002\u0002\u0000*\u0019!E!\u0001\u000b\t\t\r\u0011qB\u0001\u0007G>lWn\u001c8\n\t\t\u001d\u0011q 
\u0002\f\u0017\u000647.Y'fiJL7\rC\u0005\u0003\fi\u0001\n\u00111\u0001\u0002\\\u0006QAn\\<fe\n{WO\u001c3\t\u0013\t=!\u0004%AA\u0002\u0005m\u0017AC;qa\u0016\u0014(i\\;oI\u0006q\u0012n]'fiJL7MV1mk\u0016LeNU1oO\u0016$C-\u001a4bk2$HEM\u000b\u0003\u0005+QC!a7\u0003\u0018-\u0012!\u0011\u0004\t\u0005\u00057\u0011)#\u0004\u0002\u0003\u001e)!!q\u0004B\u0011\u0003%)hn\u00195fG.,GMC\u0002\u0003$1\n!\"\u00198o_R\fG/[8o\u0013\u0011\u00119C!\b\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW-\u0001\u0010jg6+GO]5d-\u0006dW/Z%o%\u0006tw-\u001a\u0013eK\u001a\fW\u000f\u001c;%g\u0005q1M]3bi\u0016\u001cuN\\:v[\u0016\u0014H\u0003CA\u0002\u0005_\u0011\u0019Da\u000e\t\r\tER\u00041\u0001J\u0003)\u0011'o\\6fe2K7\u000f\u001e\u0005\u0007\u0005ki\u0002\u0019A%\u0002\u000f\u001d\u0014x.\u001e9JI\"I!\u0011H\u000f\u0011\u0002\u0003\u0007!1H\u0001\u0010OJ|W\u000f]%ogR\fgnY3JIB!1&a>J\u0003a\u0019'/Z1uK\u000e{gn];nKJ$C-\u001a4bk2$HeM\u000b\u0003\u0005\u0003RCAa\u000f\u0003\u0018\u0005\t2M]3bi\u0016\fE-\\5o\u00072LWM\u001c;\u0015\t\u0005}\"q\t\u0005\b\u0005\u0013z\u0002\u0019\u0001B&\u0003)\tG-\\5o!J|\u0007o\u001d\t\u0007\u0015\n5\u0013Ja\u0014\n\u0007\u0005U7\u000b\u0005\u0003\u0003R\t]SB\u0001B*\u0015\r\u0011)FW\u0001\u0005Y\u0006tw-\u0003\u0003\u0003Z\tM#AB(cU\u0016\u001cG\u000fK\u0004\u0006\u0005;\u0012\u0019G!\u001a\u0011\t\u00055$qL\u0005\u0005\u0005C\nyGA\u0002UC\u001e\fQA^1mk\u0016\f#Aa\u001a\u0002!\t\f'0\u001a7;g&TXM\u000f7be\u001e,\u0007")
public class ConsumerLagEmitterIntegrationTest
extends KafkaServerTestHarness {
    // Broker count; assigned 2 in the constructor, passed to createBrokerConfigs in generateConfigs()
    // and written to the OffsetsTopicReplicationFactorProp override.
    private final int numNodes;
    // Partition count; assigned 2 in the constructor and written to the NumPartitionsProp and
    // OffsetsTopicPartitionsProp overrides.
    private final int numParts;
    // Broker property overrides; filled in by the constructor and applied to every config in generateConfigs().
    private final Properties overridingProps = new Properties();
    // Consumers registered by createConsumer(); reset in initialize() and closed in shutdown().
    private scala.collection.mutable.Set<Consumer<byte[], byte[]>> consumers;
    // Admin clients registered by createAdminClient(); reset in initialize() and closed in shutdown().
    private scala.collection.mutable.Set<Admin> admins;

    /**
     * Static forwarder to the companion object's {@code findBrokerWithMember}.
     *
     * @param brokers brokers to search
     * @param group   consumer group id to look for
     * @return whatever the companion object's lookup returns for the given arguments
     */
    public static BrokerWithMember findBrokerWithMember(java.util.List<KafkaBroker> brokers, String group) {
        ConsumerLagEmitterIntegrationTest$ companion = ConsumerLagEmitterIntegrationTest$.MODULE$;
        return companion.findBrokerWithMember(brokers, group);
    }

    /** @return the number of brokers used by this test (the {@code numNodes} field) */
    public int numNodes() {
        return numNodes;
    }

    /** @return the partition count used by this test (the {@code numParts} field) */
    public int numParts() {
        return numParts;
    }

    /** @return the mutable broker property overrides held by this test instance */
    public Properties overridingProps() {
        return overridingProps;
    }

    /** @return the current set of tracked consumers; {@code null} until {@link #initialize()} has run */
    public scala.collection.mutable.Set<Consumer<byte[], byte[]>> consumers() {
        return consumers;
    }

    /**
     * Scala-generated setter for the {@code consumers} field.
     *
     * @param x$1 the new consumer set
     */
    public void consumers_$eq(scala.collection.mutable.Set<Consumer<byte[], byte[]>> x$1) {
        consumers = x$1;
    }

    /** @return the current set of tracked admin clients; {@code null} until {@link #initialize()} has run */
    public scala.collection.mutable.Set<Admin> admins() {
        return admins;
    }

    /**
     * Scala-generated setter for the {@code admins} field.
     *
     * @param x$1 the new admin client set
     */
    public void admins_$eq(scala.collection.mutable.Set<Admin> x$1) {
        admins = x$1;
    }

    /**
     * Produces the broker configurations for the harness: asks {@code TestUtils.createBrokerConfigs}
     * for {@link #numNodes()} property sets and converts each into a {@link KafkaConfig} with
     * {@link #overridingProps()} applied as overrides.
     *
     * @return one {@link KafkaConfig} per generated broker property set
     */
    @Override
    public Seq<KafkaConfig> generateConfigs() {
        // Conversion applied to every generated property set.
        Function1 toKafkaConfig = (Function1 & Serializable)brokerProps ->
            KafkaConfig$.MODULE$.fromProps(brokerProps, this.overridingProps(), true);
        // Positional arguments are kept exactly as emitted by the Scala compiler; do not reorder.
        return (Seq)TestUtils$.MODULE$.createBrokerConfigs(
            this.numNodes(),
            this.zkConnectOrNull(),
            false,
            true,
            (Option<SecurityProtocol>)None$.MODULE$,
            (Option<File>)None$.MODULE$,
            (Option<Properties>)None$.MODULE$,
            true,
            false,
            false,
            false,
            (Map<Object, String>)((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)Nil$.MODULE$)),
            1,
            false,
            1,
            (short)1,
            0,
            false).map(toKafkaConfig);
    }

    /**
     * Per-test setup: replaces the tracked consumer and admin collections with fresh empty
     * mutable sets so that {@link #shutdown()} only closes clients created by the current test.
     */
    @BeforeEach
    public void initialize() {
        scala.collection.mutable.Set freshConsumers = (scala.collection.mutable.Set)Set$.MODULE$.empty();
        this.consumers_$eq((scala.collection.mutable.Set<Consumer<byte[], byte[]>>)freshConsumers);

        scala.collection.mutable.Set freshAdmins = (scala.collection.mutable.Set)Set$.MODULE$.empty();
        this.admins_$eq((scala.collection.mutable.Set<Admin>)freshAdmins);
    }

    /**
     * Per-test teardown: closes every tracked consumer first, then every tracked admin client.
     */
    @AfterEach
    public void shutdown() {
        scala.collection.mutable.Set<Consumer<byte[], byte[]>> trackedConsumers = this.consumers();
        trackedConsumers.foreach((Function1 & Serializable)trackedConsumer -> {
            trackedConsumer.close();
            return BoxedUnit.UNIT;
        });

        scala.collection.mutable.Set<Admin> trackedAdmins = this.admins();
        trackedAdmins.foreach((Function1 & Serializable)trackedAdmin -> {
            trackedAdmin.close();
            return BoxedUnit.UNIT;
        });
    }

    @ParameterizedTest
    @ValueSource(strings={"zk", "kraft"})
    public void testConsumerLagMetrics(String quorum) {
        .colon.colon tenants = new .colon.colon((Object)"lkc-xxxxx", (List)new .colon.colon((Object)"lkc-yyyyy", (List)Nil$.MODULE$));
        List topics = tenants.map((Function1 & Serializable)tenant -> new StringBuilder(20).append((String)tenant).append("_external.test.topic").toString());
        List groups = tenants.map((Function1 & Serializable)tenant -> new StringBuilder(11).append((String)tenant).append("_test-group").toString());
        List groupInstances = tenants.map((Function1 & Serializable)tenant -> new StringBuilder(15).append("group-instance-").append((String)tenant).toString());
        List numMessagesList = (List)package$.MODULE$.List().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{300, 600}));
        List expectedConsumerLags = (List)package$.MODULE$.List().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{500, 1000}));
        int numPartitions = 1;
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 1).foreach((Function1 & Serializable)index -> ConsumerLagEmitterIntegrationTest.$anonfun$testConsumerLagMetrics$4(this, topics, numPartitions, numMessagesList, BoxesRunTime.unboxToInt((Object)index)));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)index -> {
            Consumer<byte[], byte[]> consumer = this.createConsumer(this.bootstrapServers(this.bootstrapServers$default$1()), (String)groups.apply(index), (Option<String>)new Some(groupInstances.apply(index)));
            consumer.subscribe(Collections.singleton(topics.apply(index)));
            TestUtils$.MODULE$.consumeRecords(consumer, BoxesRunTime.unboxToInt((Object)numMessagesList.apply(index)), 15000L);
            consumer.commitSync();
        });
        ArrayBuffer brokersWithMembers = ArrayBuffer$.MODULE$.empty();
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 1).foreach((Function1 & Serializable)index -> ConsumerLagEmitterIntegrationTest.$anonfun$testConsumerLagMetrics$6(this, brokersWithMembers, groups, BoxesRunTime.unboxToInt((Object)index)));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp((Function1)((JFunction1.mcVI.sp & Serializable)arg_0 -> ConsumerLagEmitterIntegrationTest.$anonfun$testConsumerLagMetrics$7(this, (List)tenants, groupInstances, brokersWithMembers, topics, expectedConsumerLags, arg_0)));
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest
    @ValueSource(strings={"zk", "kraft"})
    public void testHandleGroupMigration(String quorum) {
        String topic = "test-topic";
        String group = "test-group";
        int numMessages = 1000;
        int expectedConsumerLag = 1500;
        this.createTopic(topic, this.createTopic$default$2(), this.createTopic$default$3(), this.createTopic$default$4(), this.createTopic$default$5(), this.createTopic$default$6());
        TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), topic, numMessages, -1);
        Consumer<byte[], byte[]> consumer = this.createConsumer(this.bootstrapServers(this.bootstrapServers$default$1()), group, (Option<String>)None$.MODULE$);
        Admin admin = this.createAdminClient((scala.collection.immutable.Map<String, Object>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"bootstrap.servers"), (Object)this.bootstrapServers(this.bootstrapServers$default$1()))}))));
        consumer.subscribe(Collections.singleton(topic));
        TestUtils$.MODULE$.consumeRecords(consumer, numMessages, 15000L);
        consumer.commitSync();
        TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), topic, expectedConsumerLag, -1);
        BrokerWithMember brokerWithMember = ConsumerLagEmitterIntegrationTest$.MODULE$.kafka$metrics$ConsumerLagEmitterIntegrationTest$$findBrokerWithMember(this.brokers(), group);
        scala.collection.mutable.Map tags = (scala.collection.mutable.Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"consumer-group"), (Object)"test-group"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"member"), (Object)brokerWithMember.member().memberId()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"client-id"), (Object)brokerWithMember.member().clientId()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"topic"), (Object)topic), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"partition"), (Object)"0")}));
        this.verifyConsumerLag(brokerWithMember, (scala.collection.mutable.Map<String, String>)tags, expectedConsumerLag);
        this.verifyConsumerLagEmitterLatency(brokerWithMember);
        scala.collection.mutable.Map describedTopics = CollectionConverters$.MODULE$.MapHasAsScala((java.util.Map)admin.describeTopics((Collection)CollectionConverters$.MODULE$.SeqHasAsJava((Seq)new .colon.colon((Object)"__consumer_offsets", (List)Nil$.MODULE$)).asJava()).allTopicNames().get()).asScala();
        Assertions.assertEquals((int)1, (int)describedTopics.keySet().size());
        java.util.List topicPartitionInfoList = (java.util.List)((IterableOps)describedTopics.map((Function1 & Serializable)x0$1 -> {
            if (x0$1 != null) {
                return ((TopicDescription)x0$1._2()).partitions();
            }
            throw new MatchError(null);
        })).head();
        java.util.Map reversedReplicasReassignment = CollectionConverters$.MODULE$.MapHasAsJava((Map)((IterableOnceOps)CollectionConverters$.MODULE$.ListHasAsScala(topicPartitionInfoList).asScala().map((Function1 & Serializable)partitionInfo -> {
            java.util.List reversedReplicas = CollectionConverters$.MODULE$.BufferHasAsJava((Buffer)((IterableOps)CollectionConverters$.MODULE$.ListHasAsScala(partitionInfo.replicas()).asScala().reverse()).map((Function1 & Serializable)node -> node.id())).asJava();
            return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition("__consumer_offsets", partitionInfo.partition())), Optional.of(new NewPartitionReassignment(reversedReplicas)));
        })).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl())).asJava();
        admin.alterPartitionReassignments(reversedReplicasReassignment).all().get();
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ConsumerLagEmitterIntegrationTest.$anonfun$testHandleGroupMigration$4(admin)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"reassignment did not complete.");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        java.util.Set topicPartitions = CollectionConverters$.MODULE$.SetHasAsJava((Set)((IterableOnceOps)CollectionConverters$.MODULE$.ListHasAsScala(topicPartitionInfoList).asScala().map((Function1 & Serializable)partitionInfo -> new TopicPartition("__consumer_offsets", partitionInfo.partition()))).toSet()).asJava();
        admin.electLeaders(ElectionType.PREFERRED, topicPartitions).all().get();
        long l2 = 100L;
        long waitUntilTrue_waitTimeMs2 = 15000L;
        long waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!ConsumerLagEmitterIntegrationTest.$anonfun$testHandleGroupMigration$7(brokerWithMember, tags)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs2) {
                Assertions.fail((String)"consumer lag metrics for the test group should no longer exist.");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs2), (long)waitUntilTrue_pause));
        }
        TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), topic, expectedConsumerLag, -1);
        BrokerWithMember brokerWithMigratedMember = ConsumerLagEmitterIntegrationTest$.MODULE$.kafka$metrics$ConsumerLagEmitterIntegrationTest$$findBrokerWithMember(this.brokers(), group);
        Assertions.assertNotEquals((int)brokerWithMember.broker().config().brokerId(), (int)brokerWithMigratedMember.broker().config().brokerId());
        tags.$plus$eq((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"member"), (Object)brokerWithMigratedMember.member().memberId()));
        tags.$plus$eq((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"client-id"), (Object)brokerWithMigratedMember.member().clientId()));
        this.verifyConsumerLag(brokerWithMigratedMember, (scala.collection.mutable.Map<String, String>)tags, 2 * expectedConsumerLag);
        this.verifyConsumerLagEmitterLatency(brokerWithMigratedMember);
    }

    /*
     * WARNING - void declaration
     */
    private void verifyConsumerLag(BrokerWithMember brokerWithMember, scala.collection.mutable.Map<String, String> tags, long expectedConsumerLag) {
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ConsumerLagEmitterIntegrationTest.$anonfun$verifyConsumerLag$1(brokerWithMember, tags, expectedConsumerLag)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"consumer lag metric should exist");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    /*
     * WARNING - void declaration
     */
    private void verifyConsumerLagEmitterLatency(BrokerWithMember brokerWithMember) {
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ConsumerLagEmitterIntegrationTest.$anonfun$verifyConsumerLagEmitterLatency$1(this, brokerWithMember)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"consumer lag emitter should take 0 - 1 seconds.");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        Metrics metrics = brokerWithMember.broker().metrics();
        Option kafkaMetric = CollectionConverters$.MODULE$.MapHasAsScala(metrics.metrics()).asScala().get((Object)metrics.metricName(ConsumerLagEmitter$.MODULE$.TimeSinceLastSuccessfulRunMsMetricName(), ConsumerLagEmitter$.MODULE$.MetricGroupName()));
        Assertions.assertTrue((boolean)this.isMetricValueInRange((Option<KafkaMetric>)kafkaMetric, -1L, 1000L));
    }

    /**
     * Tells whether the optional metric is present and its value lies strictly between the bounds.
     *
     * @param kafkaMetric metric to inspect; an empty option yields {@code false}
     * @param lowerBound  exclusive lower bound
     * @param upperBound  exclusive upper bound
     * @return {@code true} only when the metric exists and {@code lowerBound < value < upperBound}
     */
    private boolean isMetricValueInRange(Option<KafkaMetric> kafkaMetric, long lowerBound, long upperBound) {
        Function1 withinBounds = (Function1 & Serializable)metric ->
            BoxesRunTime.boxToBoolean((boolean)ConsumerLagEmitterIntegrationTest.$anonfun$isMetricValueInRange$1(lowerBound, upperBound, metric));
        return kafkaMetric.exists(withinBounds);
    }

    /** @return the Scala default for {@code lowerBound} of {@code isMetricValueInRange}: 0 */
    private long isMetricValueInRange$default$2() {
        final long defaultLowerBound = 0L;
        return defaultLowerBound;
    }

    /** @return the Scala default for {@code upperBound} of {@code isMetricValueInRange}: 1000 */
    private long isMetricValueInRange$default$3() {
        final long defaultUpperBound = 1000L;
        return defaultUpperBound;
    }

    /**
     * Creates a byte-array consumer through {@code TestUtils.createConsumer} and registers it in
     * {@link #consumers()} so that {@link #shutdown()} closes it.
     *
     * @param brokerList      bootstrap server list handed to the factory
     * @param groupId         consumer group id
     * @param groupInstanceId optional static group instance id
     * @return the newly created, tracked consumer
     */
    private Consumer<byte[], byte[]> createConsumer(String brokerList, String groupId, Option<String> groupInstanceId) {
        // Remaining positional arguments are kept exactly as compiled; do not reorder.
        Consumer created = TestUtils$.MODULE$.createConsumer(
            brokerList,
            groupId,
            groupInstanceId,
            "earliest",
            true,
            false,
            500,
            SecurityProtocol.PLAINTEXT,
            (Option<File>)None$.MODULE$,
            (Option<Properties>)None$.MODULE$,
            new ByteArrayDeserializer(),
            new ByteArrayDeserializer(),
            (String)null);
        this.consumers().$plus$eq(created);
        return created;
    }

    /** @return the Scala default for {@code groupInstanceId} of {@code createConsumer}: {@code None} */
    private Option<String> createConsumer$default$3() {
        None$ noInstanceId = None$.MODULE$;
        return noInstanceId;
    }

    /**
     * Creates an {@link Admin} client from the given properties and registers it in
     * {@link #admins()} so that {@link #shutdown()} closes it.
     *
     * @param adminProps admin client configuration as a Scala immutable map
     * @return the newly created, tracked admin client
     */
    private Admin createAdminClient(scala.collection.immutable.Map<String, Object> adminProps) {
        java.util.Map javaProps = (java.util.Map)CollectionConverters$.MODULE$.MapHasAsJava(adminProps).asJava();
        Admin created = Admin.create(javaProps);
        this.admins().$plus$eq((Object)created);
        return created;
    }

    /**
     * Lambda body for step 1 of {@code testConsumerLagMetrics}: creates the topic at {@code index}
     * and produces that index's initial number of messages to it.
     *
     * @return whatever {@code generateAndProduceMessages} returns for the produced batch
     */
    public static final /* synthetic */ Seq $anonfun$testConsumerLagMetrics$4(ConsumerLagEmitterIntegrationTest $this, List topics$1, int numPartitions$1, List numMessagesList$1, int index) {
        String topicName = (String)topics$1.apply(index);
        $this.createTopic(topicName, numPartitions$1, $this.createTopic$default$3(), $this.createTopic$default$4(), $this.createTopic$default$5(), $this.createTopic$default$6());
        int messageCount = BoxesRunTime.unboxToInt((Object)numMessagesList$1.apply(index));
        return TestUtils$.MODULE$.generateAndProduceMessages($this.brokers(), topicName, messageCount, -1);
    }

    /**
     * Lambda body for step 3 of {@code testConsumerLagMetrics}: looks up the broker holding the
     * member of the group at {@code index} and appends it to {@code brokersWithMembers$1}.
     *
     * @return the buffer after the append
     */
    public static final /* synthetic */ ArrayBuffer $anonfun$testConsumerLagMetrics$6(ConsumerLagEmitterIntegrationTest $this, ArrayBuffer brokersWithMembers$1, List groups$1, int index) {
        String groupName = (String)groups$1.apply(index);
        BrokerWithMember located = ConsumerLagEmitterIntegrationTest$.MODULE$.kafka$metrics$ConsumerLagEmitterIntegrationTest$$findBrokerWithMember($this.brokers(), groupName);
        return (ArrayBuffer)brokersWithMembers$1.$plus$eq((Object)located);
    }

    /**
     * Lambda body for step 4 of {@code testConsumerLagMetrics}: for the tenant at {@code index},
     * verifies a lag of 0 and the emitter latency, produces that tenant's additional messages, and
     * verifies that the lag equals the number produced, followed by the emitter latency again.
     */
    public static final /* synthetic */ void $anonfun$testConsumerLagMetrics$7(ConsumerLagEmitterIntegrationTest $this, List tenants$1, List groupInstances$1, ArrayBuffer brokersWithMembers$1, List topics$1, List expectedConsumerLags$1, int index) {
        BrokerWithMember target = (BrokerWithMember)brokersWithMembers$1.apply(index);
        // Metric tags; the group and topic values are the un-prefixed names.
        Tuple2[] tagEntries = new Tuple2[]{
            new Tuple2((Object)"tenant", tenants$1.apply(index)),
            new Tuple2((Object)"consumer-group", (Object)"test-group"),
            new Tuple2((Object)"group-instance-id", groupInstances$1.apply(index)),
            new Tuple2((Object)"member", (Object)target.member().memberId()),
            new Tuple2((Object)"client-id", (Object)target.member().clientId()),
            new Tuple2((Object)"topic", (Object)"external.test.topic"),
            new Tuple2((Object)"partition", (Object)"0")
        };
        scala.collection.mutable.Map tags = (scala.collection.mutable.Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])tagEntries));

        // Everything produced so far has been consumed and committed, so lag starts at zero.
        $this.verifyConsumerLag(target, (scala.collection.mutable.Map<String, String>)tags, 0L);
        $this.verifyConsumerLagEmitterLatency(target);

        int additionalMessages = BoxesRunTime.unboxToInt((Object)expectedConsumerLags$1.apply(index));
        TestUtils$.MODULE$.generateAndProduceMessages($this.brokers(), (String)topics$1.apply(index), additionalMessages, -1);
        $this.verifyConsumerLag(target, (scala.collection.mutable.Map<String, String>)tags, additionalMessages);
        $this.verifyConsumerLagEmitterLatency(target);
    }

    /**
     * Wait condition: reports whether the admin client currently lists no partition reassignments.
     *
     * @return {@code true} when the reassignment map returned by the admin client is empty
     */
    public static final /* synthetic */ boolean $anonfun$testHandleGroupMigration$4(Admin admin$1) {
        java.util.Map ongoing = (java.util.Map)admin$1.listPartitionReassignments().reassignments().get();
        return ongoing.isEmpty();
    }

    /** @return the failure message used when the partition reassignment wait times out */
    public static final /* synthetic */ String $anonfun$testHandleGroupMigration$5() {
        final String message = "reassignment did not complete.";
        return message;
    }

    /**
     * Wait condition: reports whether the broker's metrics registry no longer contains the
     * consumer lag metric (group "tenant-metrics") for the given tags.
     *
     * @return {@code true} when no such metric is registered
     */
    public static final /* synthetic */ boolean $anonfun$testHandleGroupMigration$7(BrokerWithMember brokerWithMember$2, scala.collection.mutable.Map tags$1) {
        Metrics brokerMetrics = brokerWithMember$2.broker().metrics();
        scala.collection.mutable.Map registry = CollectionConverters$.MODULE$.MapHasAsScala(brokerMetrics.metrics()).asScala();
        Option lagMetric = registry.get((Object)brokerMetrics.metricName(ConsumerLagEmitter$.MODULE$.ConsumerLagMetricName(), "tenant-metrics", CollectionConverters$.MODULE$.MutableMapHasAsJava(tags$1).asJava()));
        return lagMetric.isEmpty();
    }

    /** @return the failure message used when the old broker keeps exposing the group's lag metric */
    public static final /* synthetic */ String $anonfun$testHandleGroupMigration$8() {
        final String message = "consumer lag metrics for the test group should no longer exist.";
        return message;
    }

    /**
     * Wait condition for {@code verifyConsumerLag}: reports whether the consumer lag metric
     * (group "tenant-metrics") for the given tags exists and equals the expected lag.
     *
     * @return {@code true} when the metric is registered and its value equals {@code expectedConsumerLag$1}
     */
    public static final /* synthetic */ boolean $anonfun$verifyConsumerLag$1(BrokerWithMember brokerWithMember$3, scala.collection.mutable.Map tags$2, long expectedConsumerLag$1) {
        Metrics brokerMetrics = brokerWithMember$3.broker().metrics();
        scala.collection.mutable.Map registry = CollectionConverters$.MODULE$.MapHasAsScala(brokerMetrics.metrics()).asScala();
        Option lagMetric = registry.get((Object)brokerMetrics.metricName(ConsumerLagEmitter$.MODULE$.ConsumerLagMetricName(), "tenant-metrics", CollectionConverters$.MODULE$.MutableMapHasAsJava(tags$2).asJava()));
        if (!lagMetric.isDefined()) {
            return false;
        }
        Object reported = (Object)((KafkaMetric)lagMetric.get()).metricValue();
        Object expected = (Object)BoxesRunTime.boxToLong((long)expectedConsumerLag$1);
        return BoxesRunTime.equals(reported, expected);
    }

    /** @return the failure message used when the expected consumer lag metric never shows up */
    public static final /* synthetic */ String $anonfun$verifyConsumerLag$2() {
        final String message = "consumer lag metric should exist";
        return message;
    }

    /**
     * Wait condition for {@code verifyConsumerLagEmitterLatency}: reports whether the emitter's
     * execution-time metric exists and lies strictly between 0 and 1000.
     */
    public static final /* synthetic */ boolean $anonfun$verifyConsumerLagEmitterLatency$1(ConsumerLagEmitterIntegrationTest $this, BrokerWithMember brokerWithMember$4) {
        Metrics brokerMetrics = brokerWithMember$4.broker().metrics();
        scala.collection.mutable.Map registry = CollectionConverters$.MODULE$.MapHasAsScala(brokerMetrics.metrics()).asScala();
        Option executionTime = registry.get((Object)brokerMetrics.metricName(ConsumerLagEmitter$.MODULE$.ExecutionTimeMetricName(), ConsumerLagEmitter$.MODULE$.MetricGroupName()));
        return $this.isMetricValueInRange((Option<KafkaMetric>)executionTime, 0L, 1000L);
    }

    /** @return the failure message used when the emitter execution time never falls in range */
    public static final /* synthetic */ String $anonfun$verifyConsumerLagEmitterLatency$2() {
        final String message = "consumer lag emitter should take 0 - 1 seconds.";
        return message;
    }

    /**
     * Range predicate used by {@code isMetricValueInRange}.
     *
     * <p>The metric is sampled exactly once so that both bound checks apply to the same reading;
     * sampling it separately for each comparison could compare two different values of a metric
     * that changes over time.
     *
     * @param lowerBound$1 exclusive lower bound
     * @param upperBound$1 exclusive upper bound
     * @param metric       metric whose value is unboxed as a {@code long}
     * @return {@code true} when {@code lowerBound$1 < value < upperBound$1}
     */
    public static final /* synthetic */ boolean $anonfun$isMetricValueInRange$1(long lowerBound$1, long upperBound$1, KafkaMetric metric) {
        long sampledValue = BoxesRunTime.unboxToLong((Object)metric.metricValue());
        return sampledValue > lowerBound$1 && sampledValue < upperBound$1;
    }

    public ConsumerLagEmitterIntegrationTest() {
        this.numNodes = 2;
        this.numParts = 2;
        this.overridingProps().put(KafkaConfig$.MODULE$.NumPartitionsProp(), Integer.toString(this.numParts()));
        this.overridingProps().put(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), Integer.toString(this.numParts()));
        this.overridingProps().put(KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), Integer.toString(this.numNodes()));
        this.overridingProps().put("confluent.consumer.lag.emitter.enabled", "true");
        this.overridingProps().put("confluent.consumer.lag.emitter.interval.ms", "1000");
        this.consumers = null;
        this.admins = null;
    }
}

