/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.epoch;

import com.typesafe.scalalogging.Logger;
import java.io.File;
import java.io.Serializable;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Future;
import kafka.cluster.BrokerEndPoint;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.log.LogSegment;
import kafka.server.BlockingSend;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.ReplicaFetcherBlockingSend$;
import kafka.utils.Implicits;
import kafka.utils.Implicits$;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import kafka.zk.ZooKeeperTestHarness;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable$;
import scala.collection.Iterator;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.compat.MapExtensionMethods$;
import scala.collection.compat.package$;
import scala.collection.immutable.;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

@ScalaSignature(bytes="\u0006\u0001\u0005=h\u0001B\u0013'\u00015BQA\u000f\u0001\u0005\u0002mBqA\u0010\u0001A\u0002\u0013\u0005q\bC\u0004O\u0001\u0001\u0007I\u0011A(\t\rY\u0003\u0001\u0015)\u0003A\u0011\u001d9\u0006A1A\u0005\u0002aCa!\u0019\u0001!\u0002\u0013I\u0006b\u00022\u0001\u0005\u0004%\t\u0001\u0017\u0005\u0007G\u0002\u0001\u000b\u0011B-\t\u000f\u0011\u0004!\u0019!C\u0001K\"1\u0011\u000f\u0001Q\u0001\n\u0019DqA\u001d\u0001C\u0002\u0013\u0005Q\r\u0003\u0004t\u0001\u0001\u0006IA\u001a\u0005\bi\u0002\u0011\r\u0011\"\u0001f\u0011\u0019)\b\u0001)A\u0005M\"9a\u000f\u0001b\u0001\n\u0003)\u0007BB<\u0001A\u0003%a\rC\u0004y\u0001\t\u0007I\u0011A3\t\re\u0004\u0001\u0015!\u0003g\u0011\u001dQ\bA1A\u0005\u0002\u0015Daa\u001f\u0001!\u0002\u00131\u0007b\u0002?\u0001\u0001\u0004%\t! \u0005\n\u0003/\u0001\u0001\u0019!C\u0001\u00033Aq!!\b\u0001A\u0003&a\u0010C\u0004\u0002 \u0001!\t%!\t\t\u000f\u0005e\u0002\u0001\"\u0001\u0002\"!9\u00111\t\u0001\u0005\u0002\u0005\u0005\u0002bBA$\u0001\u0011\u0005\u0011\u0011\u0005\u0005\b\u0003\u0017\u0002A\u0011AA'\u0011\u001d\t\t\u0007\u0001C\u0005\u0003cCq!a/\u0001\t\u0013\ti\fC\u0004\u0002X\u0002!I!!7\t\u000f\u00055\b\u0001\"\u0003\u0002\"\u00199\u0011q\u000b\u0001\u0001M\u0005e\u0003BCA1C\t\u0005\t\u0015!\u0003\u0002d!1!(\tC\u0001\u0003SBq!!\u001c\"\t\u0003\tyG\u0001\u000eMK\u0006$WM]#q_\u000eD\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cHO\u0003\u0002(Q\u0005)Q\r]8dQ*\u0011\u0011FK\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003-\nQa[1gW\u0006\u001c\u0001aE\u0002\u0001]Q\u0002\"a\f\u001a\u000e\u0003AR!!\r\u0016\u0002\u0005i\\\u0017BA\u001a1\u0005QQvn\\&fKB,'\u000fV3ti\"\u000b'O\\3tgB\u0011Q\u0007O\u0007\u0002m)\u0011qGK\u0001\u0006kRLGn]\u0005\u0003sY\u0012q\u0001T8hO&tw-\u0001\u0004=S:LGO\u0010\u000b\u0002yA\u0011Q\bA\u0007\u0002M\u00059!M]8lKJ\u001cX#\u0001!\u0011\u0007\u0005C%*D\u0001C\u0015\t\u0019E)A\u0004nkR\f'\r\\3\u000b\u0005\u00153\u0015AC2pY2,7\r^5p]*\tq)A\u0003tG\u0006d\u0017-\u0003\u0002J\u0005\nQA*[:u\u0005V4g-\u001a:\u0011\u0005-cU\"\u0
001\u0015\n\u00055C#aC&bM.\f7+\u001a:wKJ\f1B\u0019:pW\u0016\u00148o\u0018\u0013fcR\u0011\u0001\u000b\u0016\t\u0003#Jk\u0011AR\u0005\u0003'\u001a\u0013A!\u00168ji\"9QkAA\u0001\u0002\u0004\u0001\u0015a\u0001=%c\u0005A!M]8lKJ\u001c\b%\u0001\u0004u_BL7-M\u000b\u00023B\u0011!lX\u0007\u00027*\u0011A,X\u0001\u0005Y\u0006twMC\u0001_\u0003\u0011Q\u0017M^1\n\u0005\u0001\\&AB*ue&tw-A\u0004u_BL7-\r\u0011\u0002\rQ|\u0007/[23\u0003\u001d!x\u000e]5de\u0001\nA\u0001^\u0019qaU\ta\r\u0005\u0002h_6\t\u0001N\u0003\u0002jU\u000611m\\7n_:T!aK6\u000b\u00051l\u0017AB1qC\u000eDWMC\u0001o\u0003\ry'oZ\u0005\u0003a\"\u0014a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g.A\u0003ucA\u0004\u0004%\u0001\u0003ucA\f\u0014!\u0002;2aF\u0002\u0013\u0001\u0002;2aJ\nQ\u0001^\u0019qe\u0001\nA\u0001\u001e\u001aqa\u0005)AO\r91A\u0005!AO\r93\u0003\u0015!(\u0007\u001d\u001a!\u0003\t!\b/A\u0002ua\u0002\n\u0001\u0002\u001d:pIV\u001cWM]\u000b\u0002}B9q0a\u0002\u0002\f\u0005-QBAA\u0001\u0015\ra\u00181\u0001\u0006\u0004\u0003\u000bQ\u0017aB2mS\u0016tGo]\u0005\u0005\u0003\u0013\t\tAA\u0007LC\u001a\\\u0017\r\u0015:pIV\u001cWM\u001d\t\u0006#\u00065\u0011\u0011C\u0005\u0004\u0003\u001f1%!B!se\u0006L\bcA)\u0002\u0014%\u0019\u0011Q\u0003$\u0003\t\tKH/Z\u0001\raJ|G-^2fe~#S-\u001d\u000b\u0004!\u0006m\u0001bB+\u0017\u0003\u0003\u0005\rA`\u0001\naJ|G-^2fe\u0002\n\u0001\u0002^3be\u0012{wO\u001c\u000b\u0002!\"\u001a\u0001$!\n\u0011\t\u0005\u001d\u0012QG\u0007\u0003\u0003SQA!a\u000b\u0002.\u0005\u0019\u0011\r]5\u000b\t\u0005=\u0012\u0011G\u0001\bUV\u0004\u0018\u000e^3s\u0015\r\t\u0019$\\\u0001\u0006UVt\u0017\u000e^\u0005\u0005\u0003o\tICA\u0005BMR,'/R1dQ\u0006i4\u000f[8vY\u0012\fE\rZ\"veJ,g\u000e\u001e'fC\u0012,'/\u00129pG\"$v.T3tg\u0006<Wm]!t)\",\u00170\u0011:f/JLG\u000f^3o)>dU-\u00193fe\"\u001a\u0011$!\u0010\u0011\t\u0005\u001d\u0012qH\u0005\u0005\u0003\u0003\nIC\u0001\u0003UKN$\u0018aK:i_VdGmU3oI2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f^!oI\u001e+G/\u0011*fgB|gn]3)\u0007i\ti$\u0001\u0018tQ>,H\u000eZ%oGJ,\u0017m]3MK\u0006$WM]#q_\
u000eD')\u001a;xK\u0016tG*Z1eKJ\u0014Vm\u001d;beR\u001c\bfA\u000e\u0002>\u0005a3\u000f[8vY\u0012\u001cV\u000f\u001d9peR\u0014V-];fgR\u001chi\u001c:Fa>\u001c\u0007n\u001d(pi>sG\u000b[3MK\u0006$WM\u001d\u000b\u0004!\u0006=\u0003bBA)9\u0001\u0007\u00111K\u0001\bM\u0016$8\r[3s!\r\t)&I\u0007\u0002\u0001\t\tB+Z:u\r\u0016$8\r[3s)\"\u0014X-\u00193\u0014\t\u0005\nY\u0006\u000e\t\u0004#\u0006u\u0013bAA0\r\n1\u0011I\\=SK\u001a\faa]3oI\u0016\u0014\bcA&\u0002f%\u0019\u0011q\r\u0015\u0003\u0019\tcwnY6j]\u001e\u001cVM\u001c3\u0015\t\u0005M\u00131\u000e\u0005\b\u0003C\u001a\u0003\u0019AA2\u0003AaW-\u00193fe>3gm]3ug\u001a{'\u000f\u0006\u0003\u0002r\u0005\u0015\u0006cBA:\u0003k2\u0017\u0011P\u0007\u0002\t&\u0019\u0011q\u000f#\u0003\u00075\u000b\u0007\u000f\u0005\u0003\u0002|\u0005}e\u0002BA?\u00033sA!a \u0002\u0016:!\u0011\u0011QAJ\u001d\u0011\t\u0019)!%\u000f\t\u0005\u0015\u0015q\u0012\b\u0005\u0003\u000f\u000bi)\u0004\u0002\u0002\n*\u0019\u00111\u0012\u0017\u0002\rq\u0012xn\u001c;?\u0013\u0005q\u0017B\u00017n\u0013\tY3.\u0003\u0002jU&\u0019\u0011q\u00135\u0002\u000f5,7o]1hK&!\u00111TAO\u0003\u0001zeMZ:fi\u001a{'\u000fT3bI\u0016\u0014X\t]8dQJ+7\u000f]8og\u0016$\u0015\r^1\u000b\u0007\u0005]\u0005.\u0003\u0003\u0002\"\u0006\r&AD#q_\u000eDWI\u001c3PM\u001a\u001cX\r\u001e\u0006\u0005\u00037\u000bi\nC\u0004\u0002(\u0012\u0002\r!!+\u0002\u0015A\f'\u000f^5uS>t7\u000fE\u0004\u0002t\u0005Ud-a+\u0011\u0007E\u000bi+C\u0002\u00020\u001a\u00131!\u00138u)\u0019\t\u0019'a-\u00028\"1\u0011QW\u000fA\u0002)\u000bAA\u001a:p[\"1\u0011\u0011X\u000fA\u0002)\u000b!\u0001^8\u0002)]\f\u0017\u000e\u001e$pe\u0016\u0003xn\u00195DQ\u0006tw-\u001a+p)\u001d\u0001\u0016qXAi\u0003+Dq!!1\u001f\u0001\u0004\t\u0019-A\u0003u_BL7\r\u0005\u0003\u0002F\u00065g\u0002BAd\u0003\u0013\u00042!a\"G\u0013\r\tYMR\u0001\u0007!J,G-\u001a4\n\u0007\u0001\fyMC\u0002\u0002L\u001aCq!a5\u001f\u0001\u0004\tY+A\u0005qCJ$\u0018\u000e^5p]\"1qE\ba\u0001\u0003W\u000bq#\\3tg\u0006<Wm\u001d%bm\u0016dU-\u00193fe\u0016\u0003xn\u00195\u0015\u0011\u
0005m\u0017\u0011]As\u0003S\u00042!UAo\u0013\r\tyN\u0012\u0002\b\u0005>|G.Z1o\u0011\u0019\t\u0019o\ba\u0001\u0015\u00061!M]8lKJDq!a: \u0001\u0004\tY+A\nfqB,7\r^3e\u0019\u0016\fG-\u001a:Fa>\u001c\u0007\u000eC\u0004\u0002l~\u0001\r!a+\u0002\u00135Lgn\u00144gg\u0016$\u0018aG:f]\u00124u.\u001e:NKN\u001c\u0018mZ3t)>,\u0015m\u00195U_BL7\r")
public class LeaderEpochIntegrationTest
extends ZooKeeperTestHarness {
    // Brokers started by the running test; tearDown() shuts all of them down.
    private ListBuffer<KafkaServer> brokers = (ListBuffer)ListBuffer$.MODULE$.apply((Seq)Nil$.MODULE$);
    // Topic names. No initializer is visible in this part of the file;
    // presumably they are assigned in the constructor — TODO confirm.
    // NOTE(review): the TopicPartition initializers below call topic1()/topic2().
    // If the names are only assigned in the constructor body, these initializers
    // would observe null. Likely a decompiler artifact — confirm against the
    // original source before relying on this ordering.
    private final String topic1;
    private final String topic2;
    // Partitions 0, 1 and 2 of topic1.
    private final TopicPartition t1p0 = new TopicPartition(this.topic1(), 0);
    private final TopicPartition t1p1 = new TopicPartition(this.topic1(), 1);
    private final TopicPartition t1p2 = new TopicPartition(this.topic1(), 2);
    // Partitions 0 and 2 of topic2.
    private final TopicPartition t2p0 = new TopicPartition(this.topic2(), 0);
    private final TopicPartition t2p2 = new TopicPartition(this.topic2(), 2);
    // Alias for t1p0.
    private final TopicPartition tp = this.t1p0();
    // Created on demand by individual tests; stays null until then (tearDown() checks for null).
    private KafkaProducer<byte[], byte[]> producer = null;

    /**
     * Returns the mutable buffer of brokers started by the current test.
     *
     * @return the broker buffer held by this test instance
     */
    public ListBuffer<KafkaServer> brokers() {
        return brokers;
    }

    /**
     * Replaces the broker buffer (setter counterpart of {@link #brokers()}).
     *
     * @param x$1 the new broker buffer
     */
    public void brokers_$eq(ListBuffer<KafkaServer> x$1) {
        brokers = x$1;
    }

    /**
     * Returns the name of the first test topic.
     *
     * @return the value of the {@code topic1} field
     */
    public String topic1() {
        return topic1;
    }

    /**
     * Returns the name of the second test topic.
     *
     * @return the value of the {@code topic2} field
     */
    public String topic2() {
        return topic2;
    }

    /**
     * Returns partition 0 of {@code topic1}.
     *
     * @return the {@code t1p0} topic partition
     */
    public TopicPartition t1p0() {
        return t1p0;
    }

    /**
     * Returns partition 1 of {@code topic1}.
     *
     * @return the {@code t1p1} topic partition
     */
    public TopicPartition t1p1() {
        return t1p1;
    }

    /**
     * Returns partition 2 of {@code topic1}.
     *
     * @return the {@code t1p2} topic partition
     */
    public TopicPartition t1p2() {
        return t1p2;
    }

    /**
     * Returns partition 0 of {@code topic2}.
     *
     * @return the {@code t2p0} topic partition
     */
    public TopicPartition t2p0() {
        return t2p0;
    }

    /**
     * Returns partition 2 of {@code topic2}.
     *
     * @return the {@code t2p2} topic partition
     */
    public TopicPartition t2p2() {
        return t2p2;
    }

    /**
     * Returns the default topic partition used by the single-partition tests
     * (initialised from {@code t1p0()}).
     *
     * @return the {@code tp} topic partition
     */
    public TopicPartition tp() {
        return tp;
    }

    /**
     * Returns the producer created by the current test, if any.
     *
     * @return the producer, or {@code null} when no test has created one yet
     */
    public KafkaProducer<byte[], byte[]> producer() {
        return producer;
    }

    /**
     * Stores the producer used by the current test (setter counterpart of
     * {@link #producer()}).
     *
     * @param x$1 the producer to keep; closed by {@code tearDown()} when non-null
     */
    public void producer_$eq(KafkaProducer<byte[], byte[]> x$1) {
        producer = x$1;
    }

    /**
     * Releases everything a test may have started: the producer (when one was
     * created), every broker in {@link #brokers()}, and finally whatever the
     * superclass tears down.
     *
     * <p>The steps are chained with {@code try/finally} so that a failure in an
     * earlier step cannot skip the later ones; otherwise a throwing
     * {@code close()} would leave brokers and the harness running for the
     * following tests. The first exception still propagates unless a later
     * cleanup step throws as well.
     */
    @Override
    @AfterEach
    public void tearDown() {
        try {
            KafkaProducer<byte[], byte[]> activeProducer = this.producer();
            if (activeProducer != null) {
                activeProducer.close();
            }
        } finally {
            try {
                TestUtils$.MODULE$.shutdownServers((Seq<KafkaServer>)this.brokers());
            } finally {
                // Always give the harness its chance to clean up, even if broker shutdown failed.
                super.tearDown();
            }
        }
    }

    /**
     * Checks that messages carry the leader epoch that was current when they were written.
     *
     * <p>Flow: start brokers 0 and 1, create both topics with partition 0 on
     * replicas [0, 1], send messages, and poll until
     * {@code messagesHaveLeaderEpoch(broker 0, 0, 0)} holds. Then restart
     * broker 0, wait for partition 0 of both topics to reach epoch 1, send
     * again, and poll until {@code messagesHaveLeaderEpoch(broker 0, 1, 4)} holds.
     *
     * <p>The two polling loops are the inlined form of
     * {@code TestUtils.waitUntilTrue}. The decompiler had left their timeout and
     * pause as untyped, unassigned locals; they are now bound to the
     * {@code waitUntilTrue$default$3} (max wait) and {@code waitUntilTrue$default$4}
     * (pause) values that the original code fetched but never used.
     */
    @Test
    public void shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() {
        // Start brokers with ids 0 and 1.
        this.brokers().$plus$plus$eq((TraversableOnce)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1).map((Function1 & Serializable & scala.Serializable)id -> TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(BoxesRunTime.unboxToInt((Object)id), this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20())), TestUtils$.MODULE$.createServer$default$2()), IndexedSeq$.MODULE$.canBuildFrom()));
        // Create topic1 and topic2, each with partition 0 assigned to replicas [0, 1].
        // NOTE(review): ".colon.colon" looks like the decompiler's rendering of Scala's
        // list-cell class "$colon$colon"; left untouched because the matching import at
        // the top of the file is truncated as well — fix both together.
        new .colon.colon((Object)this.topic1(), (List)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$)).foreach((Function1 & Serializable & scala.Serializable)topic -> TestUtils$.MODULE$.createTopic(this.zkClient(), (String)topic, (Map<Object, Seq<Object>>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0, 1})))}))), (Seq<KafkaServer>)this.brokers()));
        this.sendFourMessagesToEachTopic();
        IntRef expectedLeaderEpoch = IntRef.create((int)0);

        // Poll until broker 0 reports epoch 0 from offset 0 onwards.
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long waitTimeMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long startTimeMs = System.currentTimeMillis();
        while (!this.messagesHaveLeaderEpoch((KafkaServer)this.brokers().apply(0), expectedLeaderEpoch.elem, 0)) {
            if (System.currentTimeMillis() > startTimeMs + waitTimeMs) {
                Assertions.fail((String)LeaderEpochIntegrationTest.$anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$4());
            }
            // Sleep for the pause, but never longer than the overall wait budget.
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitTimeMs), (long)pauseMs));
        }

        // Bounce broker 0 and wait for the epoch of partition 0 of both topics to move to 1.
        ((KafkaServer)this.brokers().apply(0)).shutdown();
        ((KafkaServer)this.brokers().apply(0)).startup();
        expectedLeaderEpoch.elem = 1;
        this.waitForEpochChangeTo(this.topic1(), 0, expectedLeaderEpoch.elem);
        this.waitForEpochChangeTo(this.topic2(), 0, expectedLeaderEpoch.elem);
        this.sendFourMessagesToEachTopic();

        // Poll until broker 0 reports epoch 1 from offset 4 onwards.
        long restartPauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long restartWaitTimeMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long restartStartTimeMs = System.currentTimeMillis();
        while (!this.messagesHaveLeaderEpoch((KafkaServer)this.brokers().apply(0), expectedLeaderEpoch.elem, 4)) {
            if (System.currentTimeMillis() > restartStartTimeMs + restartWaitTimeMs) {
                Assertions.fail((String)LeaderEpochIntegrationTest.$anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)restartWaitTimeMs), (long)restartPauseMs));
        }
    }

    /**
     * Sends leader-epoch offset requests from one broker to two others and checks
     * the returned end offsets and error codes.
     *
     * <p>Expected results, as asserted below: the first target answers with end
     * offset 10 for t1p0 and 30 for t2p0, and with NOT_LEADER_OR_FOLLOWER and end
     * offset -1 for t1p1; the second target answers with end offset 20 for t1p1.
     */
    @Test
    public void shouldSendLeaderEpochRequestAndGetAResponse() {
        // Start brokers with ids 100, 101 and 102.
        this.brokers().$plus$plus$eq((TraversableOnce)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 102).map((Function1 & Serializable & scala.Serializable)id -> TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(BoxesRunTime.unboxToInt((Object)id), this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20())), TestUtils$.MODULE$.createServer$default$2()), IndexedSeq$.MODULE$.canBuildFrom()));
        // topic1: partition 0 -> broker 100, partition 1 -> broker 101.
        Map assignment1 = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)1)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{101})))}));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic1(), (Map<Object, Seq<Object>>)assignment1, (Seq<KafkaServer>)this.brokers());
        // topic2: partition 0 -> broker 100.
        Map assignment2 = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100})))}));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic2(), (Map<Object, Seq<Object>>)assignment2, (Seq<KafkaServer>)this.brokers());
        // Producer against all started brokers; the explicit -1 is presumably the acks setting — TODO confirm against TestUtils.createProducer.
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)this.brokers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), -1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15(), TestUtils$.MODULE$.createProducer$default$16()));
        // Three loops of 10, 20 and 30 iterations. The loop bodies are synthetic lambdas defined
        // outside this view; judging by the end offsets asserted below they presumably send one
        // record per iteration to t1p0, t1p1 and t2p0 respectively — TODO confirm.
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach((Function1 & Serializable & scala.Serializable)x$1 -> LeaderEpochIntegrationTest.$anonfun$shouldSendLeaderEpochRequestAndGetAResponse$2(this, BoxesRunTime.unboxToInt((Object)x$1)));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 20).foreach((Function1 & Serializable & scala.Serializable)x$2 -> LeaderEpochIntegrationTest.$anonfun$shouldSendLeaderEpochRequestAndGetAResponse$3(this, BoxesRunTime.unboxToInt((Object)x$2)));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 30).foreach((Function1 & Serializable & scala.Serializable)x$3 -> LeaderEpochIntegrationTest.$anonfun$shouldSendLeaderEpochRequestAndGetAResponse$4(this, BoxesRunTime.unboxToInt((Object)x$3)));
        this.producer().flush();
        // Fetcher whose sender goes from the third broker (102) to the first broker (100).
        TestFetcherThread fetcher0 = new TestFetcherThread(this, this.sender((KafkaServer)this.brokers().apply(2), (KafkaServer)this.brokers().apply(0)));
        // Ask for epoch 0 on t1p0, t1p1, t2p0 and t2p2.
        Map epochsRequested = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToInteger((int)0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)BoxesRunTime.boxToInteger((int)0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p0()), (Object)BoxesRunTime.boxToInteger((int)0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p2()), (Object)BoxesRunTime.boxToInteger((int)0))}));
        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> offsetsForEpochs = fetcher0.leaderOffsetsFor((Map<TopicPartition, Object>)epochsRequested);
        // Partitions assigned to broker 100 report their end offsets.
        Assertions.assertEquals((long)10L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs.apply((Object)this.t1p0())).endOffset());
        Assertions.assertEquals((long)30L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs.apply((Object)this.t2p0())).endOffset());
        // t1p1 is assigned to broker 101, so broker 100 answers with an error and end offset -1.
        Assertions.assertEquals((short)Errors.NOT_LEADER_OR_FOLLOWER.code(), (short)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs.apply((Object)this.t1p1())).errorCode());
        Assertions.assertEquals((long)-1L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs.apply((Object)this.t1p1())).endOffset());
        // Repeat the same request against the second broker (101), which holds t1p1.
        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> offsetsForEpochs1 = new TestFetcherThread(this, this.sender((KafkaServer)this.brokers().apply(2), (KafkaServer)this.brokers().apply(1))).leaderOffsetsFor((Map<TopicPartition, Object>)epochsRequested);
        Assertions.assertEquals((long)20L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)offsetsForEpochs1.apply((Object)this.t1p1())).endOffset());
    }

    /**
     * Restarts the only replica of {@code tp} twice and, after each restart,
     * checks what leader-epoch offset requests return for various epochs.
     *
     * <p>{@code leo$1()} is a synthetic helper defined outside this view;
     * presumably it returns the partition's log end offset — TODO confirm.
     */
    @Test
    public void shouldIncreaseLeaderEpochBetweenLeaderRestarts() {
        // Start broker 100 first and make sure it is the elected controller before adding broker 101.
        this.brokers().$plus$eq((Object)TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(100, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20())), TestUtils$.MODULE$.createServer$default$2()));
        Assertions.assertEquals((int)100, (int)TestUtils$.MODULE$.waitUntilControllerElected(this.zkClient(), TestUtils$.MODULE$.waitUntilControllerElected$default$2()));
        this.brokers().$plus$eq((Object)TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(101, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20())), TestUtils$.MODULE$.createServer$default$2()));
        // The partition is assigned to broker 101 only, so 101 is its sole replica.
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.tp().topic(), (Map<Object, Seq<Object>>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)this.tp().partition())), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{101})))}))), (Seq<KafkaServer>)this.brokers());
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)this.brokers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), -1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15(), TestUtils$.MODULE$.createProducer$default$16()));
        // First message; get() blocks until the send completes.
        this.producer().send(new ProducerRecord(this.tp().topic(), Predef$.MODULE$.int2Integer(this.tp().partition()), null, (Object)"IHeartLogs".getBytes())).get();
        // Fetcher sending from broker 100 (index 0) to broker 101 (index 1).
        TestFetcherThread fetcher = new TestFetcherThread(this, this.sender((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1)));
        // Before any restart: epoch 0 ends at offset 1.
        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = (OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)0))})))).apply((Object)this.tp());
        Assertions.assertEquals((int)0, (int)epochEndOffset.leaderEpoch());
        Assertions.assertEquals((long)1L, (long)epochEndOffset.endOffset());
        Assertions.assertEquals((long)1L, (long)this.leo$1());
        // First restart of broker 101, followed by a second message.
        ((KafkaServer)this.brokers().apply(1)).shutdown();
        ((KafkaServer)this.brokers().apply(1)).startup();
        this.producer().send(new ProducerRecord(this.tp().topic(), Predef$.MODULE$.int2Integer(this.tp().partition()), null, (Object)"IHeartLogs".getBytes())).get();
        // A fresh fetcher/sender is built after each restart.
        fetcher = new TestFetcherThread(this, this.sender((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1)));
        // Epoch 0 still ends at offset 1.
        epochEndOffset = (OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)0))})))).apply((Object)this.tp());
        Assertions.assertEquals((long)1L, (long)epochEndOffset.endOffset());
        Assertions.assertEquals((int)0, (int)epochEndOffset.leaderEpoch());
        // A request for epoch 1 is answered with epoch 0 and its end offset 1.
        epochEndOffset = (OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)1))})))).apply((Object)this.tp());
        Assertions.assertEquals((int)0, (int)epochEndOffset.leaderEpoch());
        Assertions.assertEquals((long)1L, (long)epochEndOffset.endOffset());
        // Epoch 2 ends at offset 2, which is also the value of leo$1().
        epochEndOffset = (OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)2))})))).apply((Object)this.tp());
        Assertions.assertEquals((int)2, (int)epochEndOffset.leaderEpoch());
        Assertions.assertEquals((long)2L, (long)epochEndOffset.endOffset());
        Assertions.assertEquals((long)2L, (long)this.leo$1());
        // Second restart of broker 101, followed by a third message.
        ((KafkaServer)this.brokers().apply(1)).shutdown();
        ((KafkaServer)this.brokers().apply(1)).startup();
        this.producer().send(new ProducerRecord(this.tp().topic(), Predef$.MODULE$.int2Integer(this.tp().partition()), null, (Object)"IHeartLogs".getBytes())).get();
        fetcher = new TestFetcherThread(this, this.sender((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1)));
        // Epochs 0, 2 and 4 end at offsets 1, 2 and 3; the last one equals leo$1().
        Assertions.assertEquals((long)1L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)0))})))).apply((Object)this.tp())).endOffset());
        Assertions.assertEquals((long)2L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)2))})))).apply((Object)this.tp())).endOffset());
        Assertions.assertEquals((long)3L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)4))})))).apply((Object)this.tp())).endOffset());
        Assertions.assertEquals((long)this.leo$1(), (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.tp()), (Object)BoxesRunTime.boxToInteger((int)4))})))).apply((Object)this.tp())).endOffset());
        // Finish with the checks for epochs 1, 3 and 5.
        this.shouldSupportRequestsForEpochsNotOnTheLeader(fetcher);
    }

    /**
     * Queries the given fetcher for epochs 1, 3 and 5 of {@code t1p0} and checks
     * the returned end offsets: 1 for epoch 1, 2 for epoch 3, and -1 for epoch 5.
     *
     * @param fetcher the fetcher used to issue the leader-epoch offset requests
     */
    public void shouldSupportRequestsForEpochsNotOnTheLeader(TestFetcherThread fetcher) {
        // Epoch 1 -> end offset 1.
        Map requestForEpoch1 = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToInteger((int)1))}));
        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> replyForEpoch1 = fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)requestForEpoch1);
        Assertions.assertEquals((long)1L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)replyForEpoch1.apply((Object)this.t1p0())).endOffset());

        // Epoch 3 -> end offset 2.
        Map requestForEpoch3 = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToInteger((int)3))}));
        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> replyForEpoch3 = fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)requestForEpoch3);
        Assertions.assertEquals((long)2L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)replyForEpoch3.apply((Object)this.t1p0())).endOffset());

        // Epoch 5 -> end offset -1.
        Map requestForEpoch5 = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)BoxesRunTime.boxToInteger((int)5))}));
        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> replyForEpoch5 = fetcher.leaderOffsetsFor((Map<TopicPartition, Object>)requestForEpoch5);
        Assertions.assertEquals((long)-1L, (long)((OffsetForLeaderEpochResponseData.EpochEndOffset)replyForEpoch5.apply((Object)this.t1p0())).endOffset());
    }

    /**
     * Builds a blocking sender that lets {@code from} talk to {@code to}.
     *
     * The destination node is looked up in {@code from}'s metadata cache using
     * {@code to}'s broker id and {@code from}'s inter-broker listener name; the
     * lookup result is unwrapped with {@code get()}, so an absent node throws.
     *
     * @param from broker whose config and metadata cache are used
     * @param to   broker to connect to
     * @return a sender created with fresh {@link Metrics}, a {@link SystemTime}
     *         clock and a default {@link LogContext}
     */
    private BlockingSend sender(KafkaServer from, KafkaServer to) {
        Node target = (Node)from.metadataCache().getAliveBrokerNode(to.config().brokerId(), from.config().interBrokerListenerName()).get();
        // NOTE(review): 42 and "TestFetcher" look like a fetcher id and client id - confirm against ReplicaFetcherBlockingSend.apply.
        return ReplicaFetcherBlockingSend$.MODULE$.apply(new BrokerEndPoint(target.id(), target.host(), target.port()), from.config(), new Metrics(), (Time)new SystemTime(), 42, "TestFetcher", new LogContext(), ReplicaFetcherBlockingSend$.MODULE$.apply$default$8());
    }

    /*
     * WARNING - void declaration
     */
    private void waitForEpochChangeTo(String topic, int partition, int epoch) {
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!LeaderEpochIntegrationTest.$anonfun$waitForEpochChangeTo$1(this, topic, partition, epoch)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)LeaderEpochIntegrationTest.$anonfun$waitForEpochChangeTo$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    /**
     * Runs the per-topic leader-epoch check for {@code topic1()} and
     * {@code topic2()} and reports whether both passed.
     *
     * @param broker              broker handed to the per-topic check
     * @param expectedLeaderEpoch epoch every inspected batch must carry
     * @param minOffset           offset from which log segments are read
     * @return {@code true} only if the per-topic check left the flag set for both topics
     */
    private boolean messagesHaveLeaderEpoch(KafkaServer broker, int expectedLeaderEpoch, int minOffset) {
        // Starts true; each per-topic check ANDs its own verdict into this holder.
        BooleanRef verdict = BooleanRef.create((boolean)true);
        List topics = (List)new .colon.colon((Object)this.topic1(), (List)new .colon.colon((Object)this.topic2(), (List)Nil$.MODULE$));
        topics.foreach((Function1 & Serializable & scala.Serializable)topic -> {
            LeaderEpochIntegrationTest.$anonfun$messagesHaveLeaderEpoch$1(this, broker, verdict, minOffset, expectedLeaderEpoch, topic);
            return BoxedUnit.UNIT;
        });
        return verdict.elem;
    }

    /**
     * Produces "test1".."test4" to {@code topic1()} and "test5".."test8" to
     * {@code topic2()}, using each message as both key and value, and waits for
     * every send to complete.
     *
     * The producer is created with {@code TestUtils.createProducer}'s default
     * arguments (apart from the broker list and the two string serializers) and
     * is always closed, even when a send or its {@code get()} throws.
     */
    private void sendFourMessagesToEachTopic() {
        .colon.colon testMessageList1 = new .colon.colon((Object)"test1", (List)new .colon.colon((Object)"test2", (List)new .colon.colon((Object)"test3", (List)new .colon.colon((Object)"test4", (List)Nil$.MODULE$))));
        .colon.colon testMessageList2 = new .colon.colon((Object)"test5", (List)new .colon.colon((Object)"test6", (List)new .colon.colon((Object)"test7", (List)new .colon.colon((Object)"test8", (List)Nil$.MODULE$))));
        // Default arguments are evaluated in the order the Scala compiler emitted them.
        String x$12 = TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)this.brokers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2());
        StringSerializer x$2 = new StringSerializer();
        StringSerializer x$3 = new StringSerializer();
        int x$4 = TestUtils$.MODULE$.createProducer$default$2();
        long x$5 = TestUtils$.MODULE$.createProducer$default$3();
        long x$62 = TestUtils$.MODULE$.createProducer$default$4();
        int x$7 = TestUtils$.MODULE$.createProducer$default$5();
        int x$8 = TestUtils$.MODULE$.createProducer$default$6();
        int x$9 = TestUtils$.MODULE$.createProducer$default$7();
        int x$10 = TestUtils$.MODULE$.createProducer$default$8();
        String x$11 = TestUtils$.MODULE$.createProducer$default$9();
        int x$122 = TestUtils$.MODULE$.createProducer$default$10();
        SecurityProtocol x$13 = TestUtils$.MODULE$.createProducer$default$11();
        Option<File> x$14 = TestUtils$.MODULE$.createProducer$default$12();
        Option<Properties> x$15 = TestUtils$.MODULE$.createProducer$default$13();
        boolean x$16 = TestUtils$.MODULE$.createProducer$default$16();
        KafkaProducer producer = TestUtils$.MODULE$.createProducer(x$12, x$4, x$5, x$62, x$7, x$8, x$9, x$10, x$11, x$122, x$13, x$14, x$15, x$2, x$3, x$16);
        try {
            // Build all records, fire every send, then block on each returned future.
            ((List)((List)((List)testMessageList1.map((Function1 & Serializable & scala.Serializable)m -> new ProducerRecord(this.topic1(), m, m), List$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce)testMessageList2.map((Function1 & Serializable & scala.Serializable)m -> new ProducerRecord(this.topic2(), m, m), List$.MODULE$.canBuildFrom()), List$.MODULE$.canBuildFrom())).map((Function1 & Serializable & scala.Serializable)x$1 -> producer.send(x$1), List$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x$6 -> (RecordMetadata)x$6.get());
        } finally {
            // Release the producer's resources even if a send failed.
            producer.close();
        }
    }

    /**
     * Supplies the lazily evaluated assertion message for the epoch-0 check.
     *
     * @return the literal {@code "Leader epoch should be 0"}
     */
    public static final /* synthetic */ String $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$4() {
        final String message = "Leader epoch should be 0";
        return message;
    }

    /**
     * Supplies the lazily evaluated assertion message for the epoch-1 check.
     *
     * @return the literal {@code "Leader epoch should be 1"}
     */
    public static final /* synthetic */ String $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$6() {
        final String message = "Leader epoch should be 1";
        return message;
    }

    /**
     * Sends one keyless record with payload "IHeartLogs" to partition 0 of
     * {@code topic1()} through the test's shared producer.
     *
     * @param $this enclosing test instance
     * @param x$1   ignored (loop index of the calling iteration)
     * @return the future returned by the producer's {@code send}
     */
    public static final /* synthetic */ Future $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$2(LeaderEpochIntegrationTest $this, int x$1) {
        ProducerRecord record = new ProducerRecord($this.topic1(), Predef$.MODULE$.int2Integer(0), null, (Object)"IHeartLogs".getBytes());
        return $this.producer().send(record);
    }

    /**
     * Sends one keyless record with payload "OhAreThey" to partition 1 of
     * {@code topic1()} through the test's shared producer.
     *
     * @param $this enclosing test instance
     * @param x$2   ignored (loop index of the calling iteration)
     * @return the future returned by the producer's {@code send}
     */
    public static final /* synthetic */ Future $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$3(LeaderEpochIntegrationTest $this, int x$2) {
        ProducerRecord record = new ProducerRecord($this.topic1(), Predef$.MODULE$.int2Integer(1), null, (Object)"OhAreThey".getBytes());
        return $this.producer().send(record);
    }

    /**
     * Sends one keyless record with payload "IReallyDo" to partition 0 of
     * {@code topic2()} through the test's shared producer.
     *
     * @param $this enclosing test instance
     * @param x$3   ignored (loop index of the calling iteration)
     * @return the future returned by the producer's {@code send}
     */
    public static final /* synthetic */ Future $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$4(LeaderEpochIntegrationTest $this, int x$3) {
        ProducerRecord record = new ProducerRecord($this.topic2(), Predef$.MODULE$.int2Integer(0), null, (Object)"IReallyDo".getBytes());
        return $this.producer().send(record);
    }

    /**
     * Reads the current log end offset of {@code tp()} on the broker at index 1.
     *
     * @return the log end offset; the local-log lookup is unwrapped with
     *         {@code get()}, so a missing log throws
     */
    private final long leo$1() {
        KafkaServer brokerOne = (KafkaServer)this.brokers().apply(1);
        AbstractLog localLog = (AbstractLog)brokerOne.replicaManager().localLog(this.tp()).get();
        return localLog.logEndOffset();
    }

    /**
     * Tests whether a partition state carries the wanted leader epoch.
     *
     * @param epoch$1 leader epoch being waited for
     * @param x$4     partition state taken from the metadata cache
     * @return {@code true} when the state's leader epoch equals {@code epoch$1}
     */
    public static final /* synthetic */ boolean $anonfun$waitForEpochChangeTo$2(int epoch$1, UpdateMetadataRequestData.UpdateMetadataPartitionState x$4) {
        int observedEpoch = x$4.leaderEpoch();
        return epoch$1 == observedEpoch;
    }

    /**
     * Polling condition: does the broker at index 0 have partition info for
     * {@code topic$1}/{@code partition$1} whose leader epoch equals {@code epoch$1}?
     *
     * @param $this       enclosing test instance
     * @param topic$1     topic to look up
     * @param partition$1 partition to look up
     * @param epoch$1     leader epoch being waited for
     * @return {@code false} when the cache has no info for the partition or the
     *         epoch differs, {@code true} otherwise
     */
    public static final /* synthetic */ boolean $anonfun$waitForEpochChangeTo$1(LeaderEpochIntegrationTest $this, String topic$1, int partition$1, int epoch$1) {
        KafkaServer firstBroker = (KafkaServer)$this.brokers().apply(0);
        return firstBroker.metadataCache().getPartitionInfo(topic$1, partition$1).exists((Function1 & Serializable & scala.Serializable)x$4 -> {
            boolean matches = LeaderEpochIntegrationTest.$anonfun$waitForEpochChangeTo$2(epoch$1, x$4);
            return BoxesRunTime.boxToBoolean((boolean)matches);
        });
    }

    /**
     * Supplies the failure message used when the epoch wait times out.
     *
     * @return the literal {@code "Epoch didn't change"}
     */
    public static final /* synthetic */ String $anonfun$waitForEpochChangeTo$3() {
        final String message = "Epoch didn't change";
        return message;
    }

    /**
     * Tests whether a record batch was stamped with the expected leader epoch.
     *
     * @param expectedLeaderEpoch$2 epoch the batch must carry
     * @param x$5                   batch under inspection
     * @return {@code true} when the batch's partition leader epoch equals the expected one
     */
    public static final /* synthetic */ boolean $anonfun$messagesHaveLeaderEpoch$4(int expectedLeaderEpoch$2, RecordBatch x$5) {
        int batchEpoch = x$5.partitionLeaderEpoch();
        return batchEpoch == expectedLeaderEpoch$2;
    }

    /**
     * Checks one log segment: every batch read from {@code minOffset$1} onwards
     * must carry {@code expectedLeaderEpoch$2}.
     *
     * The segment is read twice with identical arguments, once to test for a
     * {@code null} result and once to iterate the batches.
     *
     * @param minOffset$1           offset to start reading from
     * @param expectedLeaderEpoch$2 epoch every batch must carry
     * @param segment               segment to inspect
     * @return {@code false} when the read yields {@code null}; otherwise whether
     *         all batches match the expected epoch
     */
    public static final /* synthetic */ boolean $anonfun$messagesHaveLeaderEpoch$3(int minOffset$1, int expectedLeaderEpoch$2, LogSegment segment) {
        boolean nothingRead = segment.read((long)minOffset$1, Integer.MAX_VALUE, segment.read$default$3(), segment.read$default$4()) == null;
        if (nothingRead) {
            return false;
        }
        Iterator batches = (Iterator)CollectionConverters$.MODULE$.asScalaIteratorConverter(segment.read((long)minOffset$1, Integer.MAX_VALUE, segment.read$default$3(), segment.read$default$4()).records().batches().iterator()).asScala();
        return batches.forall((Function1 & Serializable & scala.Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)LeaderEpochIntegrationTest.$anonfun$messagesHaveLeaderEpoch$4(expectedLeaderEpoch$2, x$5)));
    }

    /**
     * Checks one broker: every local log segment of {@code tp$1} must pass the
     * per-segment leader-epoch check.
     *
     * @param tp$1                  partition whose log is inspected
     * @param minOffset$1           offset from which each segment is read
     * @param expectedLeaderEpoch$2 epoch every batch must carry
     * @param broker                broker whose log manager supplies the log
     * @return {@code true} when all local segments pass; the log lookup is
     *         unwrapped with {@code get()}, so a missing log throws
     */
    public static final /* synthetic */ boolean $anonfun$messagesHaveLeaderEpoch$2(TopicPartition tp$1, int minOffset$1, int expectedLeaderEpoch$2, KafkaServer broker) {
        LogManager logManager = broker.getLogManager();
        // Default value of getLog's second parameter, fetched before the call as in the original.
        boolean defaultSecondArg = logManager.getLog$default$2();
        AbstractLog log = (AbstractLog)logManager.getLog(tp$1, defaultSecondArg).get();
        return log.localLogSegments().iterator().forall((Function1 & Serializable & scala.Serializable)segment -> BoxesRunTime.boxToBoolean((boolean)LeaderEpochIntegrationTest.$anonfun$messagesHaveLeaderEpoch$3(minOffset$1, expectedLeaderEpoch$2, segment)));
    }

    /**
     * Per-topic step of {@code messagesHaveLeaderEpoch}: folds this topic's
     * verdict for partition 0 into {@code result$1}.
     *
     * The flag stays {@code true} only if it was already {@code true}, the log
     * end offset on {@code broker$1} is positive, and every broker in
     * {@code $this.brokers()} passes the per-broker check.
     *
     * @param $this                 enclosing test instance
     * @param broker$1              broker whose log end offset is read
     * @param result$1              accumulator shared across topics
     * @param minOffset$1           offset from which segments are read
     * @param expectedLeaderEpoch$2 epoch every batch must carry
     * @param topic                 topic to check
     */
    public static final /* synthetic */ void $anonfun$messagesHaveLeaderEpoch$1(LeaderEpochIntegrationTest $this, KafkaServer broker$1, BooleanRef result$1, int minOffset$1, int expectedLeaderEpoch$2, String topic) {
        TopicPartition partitionZero = new TopicPartition(topic, 0);
        LogManager logManager = broker$1.getLogManager();
        boolean defaultSecondArg = logManager.getLog$default$2();
        // Read unconditionally, even if the accumulator is already false.
        long endOffset = ((AbstractLog)logManager.getLog(partitionZero, defaultSecondArg).get()).logEndOffset();
        boolean verdict = result$1.elem;
        if (verdict) {
            verdict = endOffset > 0L;
        }
        if (verdict) {
            verdict = $this.brokers().forall((Function1 & Serializable & scala.Serializable)broker -> BoxesRunTime.boxToBoolean((boolean)LeaderEpochIntegrationTest.$anonfun$messagesHaveLeaderEpoch$2(partitionZero, minOffset$1, expectedLeaderEpoch$2, broker)));
        }
        result$1.elem = verdict;
    }

    /**
     * Creates the test instance and fixes the two topic names it works with:
     * {@code "foo"} for topic1 and {@code "bar"} for topic2.
     */
    public LeaderEpochIntegrationTest() {
        this.topic2 = "bar";
        this.topic1 = "foo";
    }

    public class TestFetcherThread
    implements Logging {
        private final BlockingSend sender;
        private Logger logger;
        private String logIdent;
        private volatile boolean bitmap$0;
        public final /* synthetic */ LeaderEpochIntegrationTest $outer;

        public String loggerName() {
            return Logging.loggerName$((Logging)this);
        }

        public String msgWithLogIdent(String msg) {
            return Logging.msgWithLogIdent$((Logging)this, (String)msg);
        }

        public void trace(Function0<String> msg) {
            Logging.trace$((Logging)this, msg);
        }

        public void trace(Function0<String> msg, Function0<Throwable> e) {
            Logging.trace$((Logging)this, msg, e);
        }

        public boolean isDebugEnabled() {
            return Logging.isDebugEnabled$((Logging)this);
        }

        public boolean isTraceEnabled() {
            return Logging.isTraceEnabled$((Logging)this);
        }

        public void debug(Function0<String> msg) {
            Logging.debug$((Logging)this, msg);
        }

        public void debug(Function0<String> msg, Function0<Throwable> e) {
            Logging.debug$((Logging)this, msg, e);
        }

        public void info(Function0<String> msg) {
            Logging.info$((Logging)this, msg);
        }

        public void info(Function0<String> msg, Function0<Throwable> e) {
            Logging.info$((Logging)this, msg, e);
        }

        public void warn(Function0<String> msg) {
            Logging.warn$((Logging)this, msg);
        }

        public void warn(Function0<String> msg, Function0<Throwable> e) {
            Logging.warn$((Logging)this, msg, e);
        }

        public void error(Function0<String> msg) {
            Logging.error$((Logging)this, msg);
        }

        public void error(Function0<String> msg, Function0<Throwable> e) {
            Logging.error$((Logging)this, msg, e);
        }

        public void fatal(Function0<String> msg) {
            Logging.fatal$((Logging)this, msg);
        }

        public void fatal(Function0<String> msg, Function0<Throwable> e) {
            Logging.fatal$((Logging)this, msg, e);
        }

        private Logger logger$lzycompute() {
            synchronized (this) {
                if (!this.bitmap$0) {
                    this.logger = Logging.logger$((Logging)this);
                    this.bitmap$0 = true;
                }
            }
            return this.logger;
        }

        public Logger logger() {
            if (!this.bitmap$0) {
                return this.logger$lzycompute();
            }
            return this.logger;
        }

        public String logIdent() {
            return this.logIdent;
        }

        public void logIdent_$eq(String x$1) {
            this.logIdent = x$1;
        }

        /*
         * WARNING - void declaration
         */
        public Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> leaderOffsetsFor(Map<TopicPartition, Object> partitions) {
            void forKeyValue$extension_f;
            void forKeyValue$extension_$this;
            OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection topics = new OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection(partitions.size());
            Function2 & Serializable & scala.Serializable intersect = (Function2 & Serializable & scala.Serializable)(topicPartition, leaderEpoch) -> BoxesRunTime.boxToBoolean((boolean)TestFetcherThread.$anonfun$leaderOffsetsFor$1(topics, topicPartition, BoxesRunTime.unboxToInt((Object)leaderEpoch)));
            Map map = Implicits$.MODULE$.MapExtensionMethods(partitions);
            if (Implicits.MapExtensionMethods$.MODULE$ == null) {
                throw null;
            }
            MapExtensionMethods$.MODULE$.foreachEntry$extension(package$.MODULE$.toMapExtensionMethods((Map)forKeyValue$extension_$this), (arg_0, arg_1) -> Implicits.MapExtensionMethods$.$anonfun$forKeyValue$1((Function2)forKeyValue$extension_f, arg_0, arg_1));
            OffsetsForLeaderEpochRequest.Builder request = OffsetsForLeaderEpochRequest.Builder.forFollower((short)ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), (OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection)topics, (int)1);
            ClientResponse response = this.sender.sendRequest((AbstractRequest.Builder)request);
            return ((TraversableOnce)((TraversableLike)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)((OffsetsForLeaderEpochResponse)response.responseBody()).data().topics()).asScala()).flatMap((Function1 & Serializable & scala.Serializable)topic -> (Buffer)((TraversableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(topic.partitions()).asScala()).map((Function1 & Serializable & scala.Serializable)partition -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic.topic(), partition.partition())), partition), Buffer$.MODULE$.canBuildFrom()), Iterable$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        }

        public /* synthetic */ LeaderEpochIntegrationTest kafka$server$epoch$LeaderEpochIntegrationTest$TestFetcherThread$$$outer() {
            return this.$outer;
        }

        public static final /* synthetic */ boolean $anonfun$leaderOffsetsFor$1(OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection topics$1, TopicPartition topicPartition, int leaderEpoch) {
            OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topic = topics$1.find(topicPartition.topic());
            if (topic == null) {
                topic = new OffsetForLeaderEpochRequestData.OffsetForLeaderTopic().setTopic(topicPartition.topic());
                topics$1.add((ImplicitLinkedHashCollection.Element)topic);
            }
            return topic.partitions().add(new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(topicPartition.partition()).setLeaderEpoch(leaderEpoch));
        }

        public TestFetcherThread(LeaderEpochIntegrationTest $outer, BlockingSend sender) {
            this.sender = sender;
            if ($outer == null) {
                throw null;
            }
            this.$outer = $outer;
            Logging.$init$((Logging)this);
        }
    }
}

