/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.epoch;

import com.typesafe.scalalogging.Logger;
import java.io.File;
import java.io.Serializable;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Future;
import kafka.cluster.BrokerEndPoint;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.log.LogSegment;
import kafka.server.BlockingSend;
import kafka.server.BrokerBlockingSender$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.QuorumTestHarness;
import kafka.utils.Implicits;
import kafka.utils.Implicits$;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable$;
import scala.collection.Iterator;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.compat.MapExtensionMethods$;
import scala.collection.compat.package$;
import scala.collection.immutable.;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

@ScalaSignature(bytes="\u0006\u0001\u0005%h\u0001B\u0013'\u00015BQ\u0001\u000f\u0001\u0005\u0002eBq\u0001\u0010\u0001A\u0002\u0013\u0005Q\bC\u0004L\u0001\u0001\u0007I\u0011\u0001'\t\rM\u0003\u0001\u0015)\u0003?\u0011\u001d!\u0006A1A\u0005\u0002UCaA\u0018\u0001!\u0002\u00131\u0006bB0\u0001\u0005\u0004%\t!\u0016\u0005\u0007A\u0002\u0001\u000b\u0011\u0002,\t\u000f\u0005\u0004!\u0019!C\u0001E\"1a\u000e\u0001Q\u0001\n\rDqa\u001c\u0001C\u0002\u0013\u0005!\r\u0003\u0004q\u0001\u0001\u0006Ia\u0019\u0005\bc\u0002\u0011\r\u0011\"\u0001c\u0011\u0019\u0011\b\u0001)A\u0005G\"91\u000f\u0001b\u0001\n\u0003\u0011\u0007B\u0002;\u0001A\u0003%1\rC\u0004v\u0001\t\u0007I\u0011\u00012\t\rY\u0004\u0001\u0015!\u0003d\u0011\u001d9\bA1A\u0005\u0002\tDa\u0001\u001f\u0001!\u0002\u0013\u0019\u0007bB=\u0001\u0001\u0004%\tA\u001f\u0005\n\u0003#\u0001\u0001\u0019!C\u0001\u0003'Aq!a\u0006\u0001A\u0003&1\u0010C\u0004\u0002\u001a\u0001!\t%a\u0007\t\u000f\u0005M\u0002\u0001\"\u0001\u0002\u001c!9\u0011Q\b\u0001\u0005\u0002\u0005m\u0001bBA!\u0001\u0011\u0005\u00111\u0004\u0005\b\u0003\u000b\u0002A\u0011AA$\u0011\u001d\tY\u0006\u0001C\u0005\u0003WCq!!.\u0001\t\u0013\t9\fC\u0004\u0002R\u0002!I!a5\t\u000f\u0005\u001d\b\u0001\"\u0003\u0002\u001c\u00199\u0011\u0011\u000b\u0001\u0001M\u0005M\u0003BCA.C\t\u0005\t\u0015!\u0003\u0002^!1\u0001(\tC\u0001\u0003GBq!a\u001a\"\t\u0003\tIG\u0001\u000eMK\u0006$WM]#q_\u000eD\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cHO\u0003\u0002(Q\u0005)Q\r]8dQ*\u0011\u0011FK\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003-\nQa[1gW\u0006\u001c\u0001aE\u0002\u0001]I\u0002\"a\f\u0019\u000e\u0003!J!!\r\u0015\u0003#E+xN];n)\u0016\u001cH\u000fS1s]\u0016\u001c8\u000f\u0005\u00024m5\tAG\u0003\u00026U\u0005)Q\u000f^5mg&\u0011q\u0007\u000e\u0002\b\u0019><w-\u001b8h\u0003\u0019a\u0014N\\5u}Q\t!\b\u0005\u0002<\u00015\ta%A\u0004ce>\\WM]:\u0016\u0003y\u00022a\u0010$I\u001b\u0005\u0001%BA!C\u0003\u001diW\u000f^1cY\u0016T!a\u0011#\u0002\u0015\r|G\u000e\\3di&|gNC\u0001F\u0003\u0015\u00198-\u00197b\u0013\t9\u0005I\u0001\u0006MSN$()\u001e4gKJ\u0004\"aL%\n\u0005)C#aC&bM.\f7+\u001a:wKJ\f1B\u0019:pW\u0016\u00148o\u0018\u0013fcR\u0011Q*\u0015\t\u0003\u001d>k\u0011\u0001R\u0005\u0003!\u0012\u0013A!\u00168ji\"9!kAA\u0001\u0002\u0004q\u0014a\u0001=%c\u0005A!M]8lKJ\u001c\b%\u0001\u0004u_BL7-M\u000b\u0002-B\u0011q\u000bX\u0007\u00021*\u0011\u0011LW\u0001\u0005Y\u0006twMC\u0001\\\u0003\u0011Q\u0017M^1\n\u0005uC&AB*ue&tw-A\u0004u_BL7-\r\u0011\u0002\rQ|\u0007/[23\u0003\u001d!x\u000e]5de\u0001\nA\u0001^\u0019qaU\t1\r\u0005\u0002eY6\tQM\u0003\u0002gO\u000611m\\7n_:T!a\u000b5\u000b\u0005%T\u0017AB1qC\u000eDWMC\u0001l\u0003\ry'oZ\u0005\u0003[\u0016\u0014a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g.A\u0003ucA\u0004\u0004%\u0001\u0003ucA\f\u0014!\u0002;2aF\u0002\u0013\u0001\u0002;2aJ\nQ\u0001^\u0019qe\u0001\nA\u0001\u001e\u001aqa\u0005)AO\r91A\u0005!AO\r93\u0003\u0015!(\u0007\u001d\u001a!\u0003\t!\b/A\u0002ua\u0002\n\u0001\u0002\u001d:pIV\u001cWM]\u000b\u0002wB9A0!\u0001\u0002\u0006\u0005\u0015Q\"A?\u000b\u0005et(BA@h\u0003\u001d\u0019G.[3oiNL1!a\u0001~\u00055Y\u0015MZ6b!J|G-^2feB)a*a\u0002\u0002\f%\u0019\u0011\u0011\u0002#\u0003\u000b\u0005\u0013(/Y=\u0011\u00079\u000bi!C\u0002\u0002\u0010\u0011\u0013AAQ=uK\u0006a\u0001O]8ek\u000e,'o\u0018\u0013fcR\u0019Q*!\u0006\t\u000fI3\u0012\u0011!a\u0001w\u0006I\u0001O]8ek\u000e,'\u000fI\u0001\ti\u0016\f'\u000fR8x]R\tQ\nK\u0002\u0019\u0003?\u0001B!!\t\u000205\u0011\u00111\u0005\u0006\u0005\u0003K\t9#A\u0002ba&TA!!\u000b\u0002,\u00059!.\u001e9ji\u0016\u0014(bAA\u0017U\u0006)!.\u001e8ji&!\u0011\u0011GA\u0012\u0005%\te\r^3s\u000b\u0006\u001c\u0007.A\u001ftQ>,H\u000eZ!eI\u000e+(O]3oi2+\u0017\rZ3s\u000bB|7\r\u001b+p\u001b\u0016\u001c8/Y4fg\u0006\u001bH\u000b[3z\u0003J,wK]5ui\u0016tGk\u001c'fC\u0012,'\u000fK\u0002\u001a\u0003o\u0001B!!\t\u0002:%!\u00111HA\u0012\u0005\u0011!Vm\u001d;\u0002WMDw.\u001e7e'\u0016tG\rT3bI\u0016\u0014X\t]8dQJ+\u0017/^3ti\u0006sGmR3u\u0003J+7\u000f]8og\u0016D3AGA\u001c\u00039\u001a\bn\\;mI&s7M]3bg\u0016dU-\u00193fe\u0016\u0003xn\u00195CKR<X-\u001a8MK\u0006$WM\u001d*fgR\f'\u000f^:)\u0007m\t9$\u0001\u0017tQ>,H\u000eZ*vaB|'\u000f\u001e*fcV,7\u000f^:G_J,\u0005o\\2ig:{Go\u00148UQ\u0016dU-\u00193feR\u0019Q*!\u0013\t\u000f\u0005-C\u00041\u0001\u0002N\u00059a-\u001a;dQ\u0016\u0014\bcAA(C5\t\u0001AA\tUKN$h)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012\u001cB!IA+eA\u0019a*a\u0016\n\u0007\u0005eCI\u0001\u0004B]f\u0014VMZ\u0001\u0007g\u0016tG-\u001a:\u0011\u0007=\ny&C\u0002\u0002b!\u0012AB\u00117pG.LgnZ*f]\u0012$B!!\u0014\u0002f!9\u00111L\u0012A\u0002\u0005u\u0013\u0001\u00057fC\u0012,'o\u00144gg\u0016$8OR8s)\u0011\tY'a(\u0011\u000f\u00055\u0014qN2\u0002t5\t!)C\u0002\u0002r\t\u00131!T1q!\u0011\t)(!'\u000f\t\u0005]\u00141\u0013\b\u0005\u0003s\nyI\u0004\u0003\u0002|\u00055e\u0002BA?\u0003\u0017sA!a \u0002\n:!\u0011\u0011QAD\u001b\t\t\u0019IC\u0002\u0002\u00062\na\u0001\u0010:p_Rt\u0014\"A6\n\u0005%T\u0017BA\u0016i\u0013\t1w-C\u0002\u0002\u0012\u0016\fq!\\3tg\u0006<W-\u0003\u0003\u0002\u0016\u0006]\u0015\u0001I(gMN,GOR8s\u0019\u0016\fG-\u001a:Fa>\u001c\u0007NU3ta>t7/\u001a#bi\u0006T1!!%f\u0013\u0011\tY*!(\u0003\u001d\u0015\u0003xn\u00195F]\u0012|eMZ:fi*!\u0011QSAL\u0011\u001d\t\t\u000b\na\u0001\u0003G\u000b!\u0002]1si&$\u0018n\u001c8t!\u001d\ti'a\u001cd\u0003K\u00032ATAT\u0013\r\tI\u000b\u0012\u0002\u0004\u0013:$HCBA/\u0003[\u000b\t\f\u0003\u0004\u00020v\u0001\r\u0001S\u0001\u0005MJ|W\u000e\u0003\u0004\u00024v\u0001\r\u0001S\u0001\u0003i>\fAc^1ji\u001a{'/\u00129pG\"\u001c\u0005.\u00198hKR{GcB'\u0002:\u0006-\u0017q\u001a\u0005\b\u0003ws\u0002\u0019AA_\u0003\u0015!x\u000e]5d!\u0011\ty,a2\u000f\t\u0005\u0005\u00171\u0019\t\u0004\u0003\u0003#\u0015bAAc\t\u00061\u0001K]3eK\u001aL1!XAe\u0015\r\t)\r\u0012\u0005\b\u0003\u001bt\u0002\u0019AAS\u0003%\u0001\u0018M\u001d;ji&|g\u000e\u0003\u0004(=\u0001\u0007\u0011QU\u0001\u0018[\u0016\u001c8/Y4fg\"\u000bg/\u001a'fC\u0012,'/\u00129pG\"$\u0002\"!6\u0002\\\u0006}\u00171\u001d\t\u0004\u001d\u0006]\u0017bAAm\t\n9!i\\8mK\u0006t\u0007BBAo?\u0001\u0007\u0001*\u0001\u0004ce>\\WM\u001d\u0005\b\u0003C|\u0002\u0019AAS\u0003M)\u0007\u0010]3di\u0016$G*Z1eKJ,\u0005o\\2i\u0011\u001d\t)o\ba\u0001\u0003K\u000b\u0011\"\\5o\u001f\u001a47/\u001a;\u00027M,g\u000e\u001a$pkJlUm]:bO\u0016\u001cHk\\#bG\"$v\u000e]5d\u0001")
public class LeaderEpochIntegrationTest
extends QuorumTestHarness {
    private ListBuffer<KafkaServer> brokers = (ListBuffer)ListBuffer$.MODULE$.apply((Seq)Nil$.MODULE$);
    private final String topic1;
    private final String topic2;
    private final TopicPartition t1p0 = new TopicPartition(this.topic1(), 0);
    private final TopicPartition t1p1 = new TopicPartition(this.topic1(), 1);
    private final TopicPartition t1p2 = new TopicPartition(this.topic1(), 2);
    private final TopicPartition t2p0 = new TopicPartition(this.topic2(), 0);
    private final TopicPartition t2p2 = new TopicPartition(this.topic2(), 2);
    private final TopicPartition tp = this.t1p0();
    private KafkaProducer<byte[], byte[]> producer = null;

    public ListBuffer<KafkaServer> brokers() {
        return this.brokers;
    }

    public void brokers_$eq(ListBuffer<KafkaServer> x$1) {
        this.brokers = x$1;
    }

    public String topic1() {
        return this.topic1;
    }

    public String topic2() {
        return this.topic2;
    }

    public TopicPartition t1p0() {
        return this.t1p0;
    }

    public TopicPartition t1p1() {
        return this.t1p1;
    }

    public TopicPartition t1p2() {
        return this.t1p2;
    }

    public TopicPartition t2p0() {
        return this.t2p0;
    }

    public TopicPartition t2p2() {
        return this.t2p2;
    }

    public TopicPartition tp() {
        return this.tp;
    }

    public KafkaProducer<byte[], byte[]> producer() {
        return this.producer;
    }

    public void producer_$eq(KafkaProducer<byte[], byte[]> x$1) {
        this.producer = x$1;
    }

    @Override
    @AfterEach
    public void tearDown() {
        if (this.producer() != null) {
            this.producer().close();
        }
        TestUtils$.MODULE$.shutdownServers(this.brokers(), TestUtils$.MODULE$.shutdownServers$default$2());
        super.tearDown();
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() {
        this.brokers().$plus$plus$eq((TraversableOnce)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1).map((Function1 & Serializable & scala.Serializable)id -> TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(BoxesRunTime.unboxToInt((Object)id), this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20())), TestUtils$.MODULE$.createServer$default$2()), IndexedSeq$.MODULE$.canBuildFrom()));
        new .colon.colon((Object)this.topic1(), (List)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$)).foreach((Function1 & Serializable & scala.Serializable)topic -> TestUtils$.MODULE$.createTopic(this.zkClient(), (String)topic, (Map<Object, Seq<Object>>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0, 1})))}))), (Seq<KafkaBroker>)this.brokers()));
        this.sendFourMessagesToEachTopic();
        IntRef expectedLeaderEpoch = IntRef.create((int)0);
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!this.messagesHaveLeaderEpoch((KafkaServer)this.brokers().apply(0), expectedLeaderEpoch.elem, 0)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)LeaderEpochIntegrationTest.$anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        ((KafkaServer)this.brokers().apply(0)).shutdown();
        ((KafkaServer)this.brokers().apply(0)).startup();
        expectedLeaderEpoch.elem = 1;
        this.waitForEpochChangeTo(this.topic1(), 0, expectedLeaderEpoch.elem);
        this.waitForEpochChangeTo(this.topic2(), 0, expectedLeaderEpoch.elem);
        this.sendFourMessagesToEachTopic();
        long l3 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l4 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!this.messagesHaveLeaderEpoch((KafkaServer)this.brokers().apply(0), expectedLeaderEpoch.elem, 4)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)LeaderEpochIntegrationTest.$anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    @Test
    public void shouldSendLeaderEpochRequestAndGetAResponse() {
        this.brokers().$plus$plus$eq((TraversableOnce)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 102).map((Function1 & Serializable & scala.Serializable)id -> TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(BoxesRunTime.unboxToInt((Object)id), this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20())), TestUtils$.MODULE$.createServer$default$2()), IndexedSeq$.MODULE$.canBuildFrom()));
        Map assignment1 = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)1)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{101})))}));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic1(), (Map<Object, Seq<Object>>)assignment1, (Seq<KafkaBroker>)this.brokers());
        Map assignment2 = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100})))}));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic2(), (Map<Object, Seq<Object>>)assignment2, (Seq<KafkaBroker>)this.brokers());
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers()), -1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15(), TestUtils$.MODULE$.createProducer$default$16()));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach((Function1 & Serializable & scala.Serializable)x$1 -> LeaderEpochIntegrationTest.$anonfun$shouldSendLeaderEpochRequestAndGetAResponse$2(this, BoxesRunTime.unboxToInt((Object)x$1)));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 20).foreach((Function1 & Serializable & scala.Serializable)x$2 -> LeaderEpochIntegrationTest.$anonfun$shouldSendLeaderEpochRequestAndGetAResponse$3(this, BoxesRunTime.unboxToInt((Object)x$2)));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 30).foreach((Function1 & Serializable & scala.Serializable)x$3 -> LeaderEpochIntegrationTest.$anonfun$shouldSendLeaderEpochRequestAndGetAResponse$4(this, BoxesRunTime.unboxToInt((Object)x$3)));
        this.producer().flush();
        TestFetcherThread fetcher0 = new TestFetcherThread(this, this.sender((KafkaServer)this.brokers().apply(2), (KafkaServer)this.brokers().apply(0)));
        Map epochsRequested = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToInteger((int)0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)BoxesRunTime.boxToInteger((int)0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p0()), (Object)BoxesRunTime.boxToInteger((int)0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p2()), (Object)BoxesRunTime.boxToInteger((int)0))}));
        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> offsetsForEpochs = fetcher0.leaderOffsetsFor((Map<TopicPartition, Object>)epochsRequested);
        Assertions.assertEquals((long)10L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs.apply((Object)this.t1p0())).endOffset());
        Assertions.assertEquals((long)30L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs.apply((Object)this.t2p0())).endOffset());
        Assertions.assertEquals((short)Errors.NOT_LEADER_OR_FOLLOWER.code(), (short)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs.apply((Object)this.t1p1())).errorCode());
        Assertions.assertEquals((long)-1L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs.apply((Object)this.t1p1())).endOffset());
        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> offsetsForEpochs1 = new TestFetcherThread(this, this.sender((KafkaServer)this.brokers().apply(2), (KafkaServer)this.brokers().apply(1))).leaderOffsetsFor((Map<TopicPartition, Object>)epochsRequested);
        Assertions.assertEquals((long)20L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs1.apply((Object)this.t1p1())).endOffset());
    }

    @Test
    public void shouldIncreaseLeaderEpochBetweenLeaderRestarts() {
        this.brokers().$plus$eq((Object)TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(100, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20())), TestUtils$.MODULE$.createServer$default$2()));
        Assertions.assertEquals((int)100, (int)TestUtils$.MODULE$.waitUntilControllerElected(this.zkClient(), TestUtils$.MODULE$.waitUntilControllerElected$default$2()));
        this.brokers().$plus$eq((Object)TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(101, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20())), TestUtils$.MODULE$.createServer$default$2()));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.tp().topic(), (Map<Object, Seq<Object>>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)this.tp().partition())), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{101})))}))), (Seq<KafkaBroker>)this.brokers());
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers()), -1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15(), TestUtils$.MODULE$.createProducer$default$16()));
        this.producer().send(new ProducerRecord(this.tp().topic(), Predef$.MODULE$.int2Integer(this.tp().partition()), null, (Object)"IHeartLogs".getBytes())).get();
        TestFetcherThread fetcher = new TestFetcherThread(this, this.sender((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1)));
        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = (OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)0))})))).apply((Object)this.tp());
        Assertions.assertEquals((int)0, (int)epochEndOffset.leaderEpoch());
        Assertions.assertEquals((long)1L, (long)epochEndOffset.endOffset());
        Assertions.assertEquals((long)1L, (long)this.leo$1());
        ((KafkaServer)this.brokers().apply(1)).shutdown();
        ((KafkaServer)this.brokers().apply(1)).startup();
        this.producer().send(new ProducerRecord(this.tp().topic(), Predef$.MODULE$.int2Integer(this.tp().partition()), null, (Object)"IHeartLogs".getBytes())).get();
        fetcher = new TestFetcherThread(this, this.sender((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1)));
        epochEndOffset = (OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)0))})))).apply((Object)this.tp());
        Assertions.assertEquals((long)1L, (long)epochEndOffset.endOffset());
        Assertions.assertEquals((int)0, (int)epochEndOffset.leaderEpoch());
        epochEndOffset = (OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)1))})))).apply((Object)this.tp());
        Assertions.assertEquals((int)0, (int)epochEndOffset.leaderEpoch());
        Assertions.assertEquals((long)1L, (long)epochEndOffset.endOffset());
        epochEndOffset = (OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)2))})))).apply((Object)this.tp());
        Assertions.assertEquals((int)2, (int)epochEndOffset.leaderEpoch());
        Assertions.assertEquals((long)2L, (long)epochEndOffset.endOffset());
        Assertions.assertEquals((long)2L, (long)this.leo$1());
        ((KafkaServer)this.brokers().apply(1)).shutdown();
        ((KafkaServer)this.brokers().apply(1)).startup();
        this.producer().send(new ProducerRecord(this.tp().topic(), Predef$.MODULE$.int2Integer(this.tp().partition()), null, (Object)"IHeartLogs".getBytes())).get();
        fetcher = new TestFetcherThread(this, this.sender((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1)));
        Assertions.assertEquals((long)1L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)0))})))).apply((Object)this.tp())).endOffset());
        Assertions.assertEquals((long)2L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)2))})))).apply((Object)this.tp())).endOffset());
        Assertions.assertEquals((long)3L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)4))})))).apply((Object)this.tp())).endOffset());
        Assertions.assertEquals((long)this.leo$1(), (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)4))})))).apply((Object)this.tp())).endOffset());
        this.shouldSupportRequestsForEpochsNotOnTheLeader(fetcher);
    }

    public void shouldSupportRequestsForEpochsNotOnTheLeader(TestFetcherThread fetcher) {
        Map epoch1 = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToInteger((int)1))}));
        Assertions.assertEquals((long)1L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)epoch1).apply((Object)this.t1p0())).endOffset());
        Map epoch3 = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToInteger((int)3))}));
        Assertions.assertEquals((long)2L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)epoch3).apply((Object)this.t1p0())).endOffset());
        Map epoch5 = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToInteger((int)5))}));
        Assertions.assertEquals((long)-1L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)epoch5).apply((Object)this.t1p0())).endOffset());
    }

    private BlockingSend sender(KafkaServer from, KafkaServer to) {
        Node node = (Node)from.metadataCache().getAliveBrokerNode(to.config().brokerId(), from.config().interBrokerListenerName()).get();
        BrokerEndPoint endPoint = new BrokerEndPoint(node.id(), node.host(), node.port());
        return BrokerBlockingSender$.MODULE$.apply(endPoint, from.config(), new Metrics(), (Time)new SystemTime(), 42, "TestFetcher", new LogContext(), BrokerBlockingSender$.MODULE$.apply$default$8());
    }

    /*
     * WARNING - void declaration
     */
    private void waitForEpochChangeTo(String topic, int partition, int epoch) {
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!LeaderEpochIntegrationTest.$anonfun$waitForEpochChangeTo$1(this, topic, partition, epoch)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)LeaderEpochIntegrationTest.$anonfun$waitForEpochChangeTo$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    private boolean messagesHaveLeaderEpoch(KafkaServer broker, int expectedLeaderEpoch, int minOffset) {
        BooleanRef result = BooleanRef.create((boolean)true);
        new .colon.colon((Object)this.topic1(), (List)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$)).foreach((Function1 & Serializable & scala.Serializable)topic -> {
            LeaderEpochIntegrationTest.$anonfun$messagesHaveLeaderEpoch$1(this, broker, result, minOffset, expectedLeaderEpoch, topic);
            return BoxedUnit.UNIT;
        });
        return result.elem;
    }

    private void sendFourMessagesToEachTopic() {
        .colon.colon testMessageList1 = new .colon.colon((Object)"test1", (List)new .colon.colon((Object)"test2", (List)new .colon.colon((Object)"test3", (List)new .colon.colon((Object)"test4", (List)Nil$.MODULE$))));
        .colon.colon testMessageList2 = new .colon.colon((Object)"test5", (List)new .colon.colon((Object)"test6", (List)new .colon.colon((Object)"test7", (List)new .colon.colon((Object)"test8", (List)Nil$.MODULE$))));
        String x$12 = TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers());
        StringSerializer x$2 = new StringSerializer();
        StringSerializer x$3 = new StringSerializer();
        int x$4 = TestUtils$.MODULE$.createProducer$default$2();
        long x$5 = TestUtils$.MODULE$.createProducer$default$3();
        long x$62 = TestUtils$.MODULE$.createProducer$default$4();
        int x$7 = TestUtils$.MODULE$.createProducer$default$5();
        int x$8 = TestUtils$.MODULE$.createProducer$default$6();
        int x$9 = TestUtils$.MODULE$.createProducer$default$7();
        int x$10 = TestUtils$.MODULE$.createProducer$default$8();
        String x$11 = TestUtils$.MODULE$.createProducer$default$9();
        int x$122 = TestUtils$.MODULE$.createProducer$default$10();
        SecurityProtocol x$13 = TestUtils$.MODULE$.createProducer$default$11();
        Option<File> x$14 = TestUtils$.MODULE$.createProducer$default$12();
        Option<Properties> x$15 = TestUtils$.MODULE$.createProducer$default$13();
        boolean x$16 = TestUtils$.MODULE$.createProducer$default$16();
        KafkaProducer producer = TestUtils$.MODULE$.createProducer(x$12, x$4, x$5, x$62, x$7, x$8, x$9, x$10, x$11, x$122, x$13, x$14, x$15, x$2, x$3, x$16);
        ((List)((List)((List)testMessageList1.map((Function1 & Serializable & scala.Serializable)m -> new ProducerRecord(this.topic1(), m, m), List$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce)testMessageList2.map((Function1 & Serializable & scala.Serializable)m -> new ProducerRecord(this.topic2(), m, m), List$.MODULE$.canBuildFrom()), List$.MODULE$.canBuildFrom())).map((Function1 & Serializable & scala.Serializable)x$1 -> producer.send(x$1), List$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x$6 -> (RecordMetadata)x$6.get());
        producer.close();
    }

    public static final /* synthetic */ String $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$4() {
        return "Leader epoch should be 0";
    }

    public static final /* synthetic */ String $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$6() {
        return "Leader epoch should be 1";
    }

    public static final /* synthetic */ Future $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$2(LeaderEpochIntegrationTest $this, int x$1) {
        return $this.producer().send(new ProducerRecord($this.topic1(), Predef$.MODULE$.int2Integer(0), null, (Object)"IHeartLogs".getBytes()));
    }

    public static final /* synthetic */ Future $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$3(LeaderEpochIntegrationTest $this, int x$2) {
        return $this.producer().send(new ProducerRecord($this.topic1(), Predef$.MODULE$.int2Integer(1), null, (Object)"OhAreThey".getBytes()));
    }

    public static final /* synthetic */ Future $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$4(LeaderEpochIntegrationTest $this, int x$3) {
        return $this.producer().send(new ProducerRecord($this.topic2(), Predef$.MODULE$.int2Integer(0), null, (Object)"IReallyDo".getBytes()));
    }

    private final long leo$1() {
        return ((AbstractLog)((KafkaServer)this.brokers().apply(1)).replicaManager().localLog(this.tp()).get()).logEndOffset();
    }

    public static final /* synthetic */ boolean $anonfun$waitForEpochChangeTo$2(int epoch$1, UpdateMetadataRequestData.UpdateMetadataPartitionState x$4) {
        return x$4.leaderEpoch() == epoch$1;
    }

    public static final /* synthetic */ boolean $anonfun$waitForEpochChangeTo$1(LeaderEpochIntegrationTest $this, String topic$1, int partition$1, int epoch$1) {
        return ((KafkaServer)$this.brokers().apply(0)).metadataCache().getPartitionInfo(topic$1, partition$1).exists((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)LeaderEpochIntegrationTest.$anonfun$waitForEpochChangeTo$2(epoch$1, x$4)));
    }

    public static final /* synthetic */ String $anonfun$waitForEpochChangeTo$3() {
        return "Epoch didn't change";
    }

    public static final /* synthetic */ boolean $anonfun$messagesHaveLeaderEpoch$4(int expectedLeaderEpoch$2, RecordBatch x$5) {
        return expectedLeaderEpoch$2 == x$5.partitionLeaderEpoch();
    }

    public static final /* synthetic */ boolean $anonfun$messagesHaveLeaderEpoch$3(int minOffset$1, int expectedLeaderEpoch$2, LogSegment segment) {
        if (segment.read((long)minOffset$1, Integer.MAX_VALUE, segment.read$default$3(), segment.read$default$4()) == null) {
            return false;
        }
        return ((Iterator)CollectionConverters$.MODULE$.asScalaIteratorConverter(segment.read((long)minOffset$1, Integer.MAX_VALUE, segment.read$default$3(), segment.read$default$4()).records().batches().iterator()).asScala()).forall((Function1 & Serializable & scala.Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)LeaderEpochIntegrationTest.$anonfun$messagesHaveLeaderEpoch$4(expectedLeaderEpoch$2, x$5)));
    }

    public static final /* synthetic */ boolean $anonfun$messagesHaveLeaderEpoch$2(TopicPartition tp$1, int minOffset$1, int expectedLeaderEpoch$2, KafkaServer broker) {
        LogManager qual$2 = broker.getLogManager();
        boolean x$4 = qual$2.getLog$default$2();
        return ((AbstractLog)qual$2.getLog(tp$1, x$4).get()).localLogSegments().iterator().forall((Function1 & Serializable & scala.Serializable)segment -> BoxesRunTime.boxToBoolean((boolean)LeaderEpochIntegrationTest.$anonfun$messagesHaveLeaderEpoch$3(minOffset$1, expectedLeaderEpoch$2, segment)));
    }

    public static final /* synthetic */ void $anonfun$messagesHaveLeaderEpoch$1(LeaderEpochIntegrationTest $this, KafkaServer broker$1, BooleanRef result$1, int minOffset$1, int expectedLeaderEpoch$2, String topic) {
        TopicPartition tp = new TopicPartition(topic, 0);
        LogManager qual$1 = broker$1.getLogManager();
        boolean x$2 = qual$1.getLog$default$2();
        long leo = ((AbstractLog)qual$1.getLog(tp, x$2).get()).logEndOffset();
        result$1.elem = result$1.elem && leo > 0L && $this.brokers().forall((Function1 & Serializable & scala.Serializable)broker -> BoxesRunTime.boxToBoolean((boolean)LeaderEpochIntegrationTest.$anonfun$messagesHaveLeaderEpoch$2(tp, minOffset$1, expectedLeaderEpoch$2, broker)));
    }

    public LeaderEpochIntegrationTest() {
        this.topic1 = "foo";
        this.topic2 = "bar";
    }

    public class TestFetcherThread
    implements Logging {
        private final BlockingSend sender;
        private Logger logger;
        private String logIdent;
        private volatile boolean bitmap$0;
        public final /* synthetic */ LeaderEpochIntegrationTest $outer;

        public String loggerName() {
            return Logging.loggerName$((Logging)this);
        }

        public String msgWithLogIdent(String msg) {
            return Logging.msgWithLogIdent$((Logging)this, (String)msg);
        }

        public void trace(Function0<String> msg) {
            Logging.trace$((Logging)this, msg);
        }

        public void trace(Function0<String> msg, Function0<Throwable> e) {
            Logging.trace$((Logging)this, msg, e);
        }

        public boolean isDebugEnabled() {
            return Logging.isDebugEnabled$((Logging)this);
        }

        public boolean isTraceEnabled() {
            return Logging.isTraceEnabled$((Logging)this);
        }

        public void debug(Function0<String> msg) {
            Logging.debug$((Logging)this, msg);
        }

        public void debug(Function0<String> msg, Function0<Throwable> e) {
            Logging.debug$((Logging)this, msg, e);
        }

        public void info(Function0<String> msg) {
            Logging.info$((Logging)this, msg);
        }

        public void info(Function0<String> msg, Function0<Throwable> e) {
            Logging.info$((Logging)this, msg, e);
        }

        public void warn(Function0<String> msg) {
            Logging.warn$((Logging)this, msg);
        }

        public void warn(Function0<String> msg, Function0<Throwable> e) {
            Logging.warn$((Logging)this, msg, e);
        }

        public void error(Function0<String> msg) {
            Logging.error$((Logging)this, msg);
        }

        public void error(Function0<String> msg, Function0<Throwable> e) {
            Logging.error$((Logging)this, msg, e);
        }

        public void fatal(Function0<String> msg) {
            Logging.fatal$((Logging)this, msg);
        }

        public void fatal(Function0<String> msg, Function0<Throwable> e) {
            Logging.fatal$((Logging)this, msg, e);
        }

        private Logger logger$lzycompute() {
            synchronized (this) {
                if (!this.bitmap$0) {
                    this.logger = Logging.logger$((Logging)this);
                    this.bitmap$0 = true;
                }
            }
            return this.logger;
        }

        public Logger logger() {
            if (!this.bitmap$0) {
                return this.logger$lzycompute();
            }
            return this.logger;
        }

        public String logIdent() {
            return this.logIdent;
        }

        public void logIdent_$eq(String x$1) {
            this.logIdent = x$1;
        }

        /*
         * WARNING - void declaration
         */
        public Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> leaderOffsetsFor(Map<TopicPartition, Object> partitions) {
            void forKeyValue$extension_f;
            void forKeyValue$extension_$this;
            OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection topics = new OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection(partitions.size());
            Function2 & Serializable & scala.Serializable intersect = (Function2 & Serializable & scala.Serializable)(topicPartition, leaderEpoch) -> BoxesRunTime.boxToBoolean((boolean)TestFetcherThread.$anonfun$leaderOffsetsFor$1(topics, topicPartition, BoxesRunTime.unboxToInt((Object)leaderEpoch)));
            Map map = Implicits$.MODULE$.MapExtensionMethods(partitions);
            if (Implicits.MapExtensionMethods$.MODULE$ == null) {
                throw null;
            }
            MapExtensionMethods$.MODULE$.foreachEntry$extension(package$.MODULE$.toMapExtensionMethods((Map)forKeyValue$extension_$this), (arg_0, arg_1) -> Implicits.MapExtensionMethods$.$anonfun$forKeyValue$1((Function2)forKeyValue$extension_f, arg_0, arg_1));
            OffsetsForLeaderEpochRequest.Builder request = OffsetsForLeaderEpochRequest.Builder.forFollower((short)ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), (OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection)topics, (int)1);
            ClientResponse response = this.sender.sendRequest((AbstractRequest.Builder)request);
            return ((TraversableOnce)((TraversableLike)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)((OffsetsForLeaderEpochResponse)response.responseBody()).data().topics()).asScala()).flatMap((Function1 & Serializable & scala.Serializable)topic -> (Buffer)((TraversableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(topic.partitions()).asScala()).map((Function1 & Serializable & scala.Serializable)partition -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic.topic(), partition.partition())), partition), Buffer$.MODULE$.canBuildFrom()), Iterable$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        }

        public /* synthetic */ LeaderEpochIntegrationTest kafka$server$epoch$LeaderEpochIntegrationTest$TestFetcherThread$$$outer() {
            return this.$outer;
        }

        public static final /* synthetic */ boolean $anonfun$leaderOffsetsFor$1(OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection topics$1, TopicPartition topicPartition, int leaderEpoch) {
            OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topic = topics$1.find(topicPartition.topic());
            if (topic == null) {
                topic = new OffsetForLeaderEpochRequestData.OffsetForLeaderTopic().setTopic(topicPartition.topic());
                topics$1.add((ImplicitLinkedHashCollection.Element)topic);
            }
            return topic.partitions().add(new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(topicPartition.partition()).setLeaderEpoch(leaderEpoch));
        }

        public TestFetcherThread(LeaderEpochIntegrationTest $outer, BlockingSend sender) {
            this.sender = sender;
            if ($outer == null) {
                throw null;
            }
            this.$outer = $outer;
            Logging.$init$((Logging)this);
        }
    }
}

