/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.epoch;

import java.io.File;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Future;
import kafka.api.KAFKA_0_11_0_IV1$;
import kafka.cluster.Partition;
import kafka.log.Log;
import kafka.log.Log$;
import kafka.log.LogManager;
import kafka.log.LogSegment;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.epoch.EpochEntry;
import kafka.server.epoch.LeaderEpochFileCache;
import kafka.tools.DumpLogSegments$;
import kafka.utils.CoreUtils$;
import kafka.utils.TestUtils$;
import kafka.zk.ZooKeeperTestHarness;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set$;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.ListBuffer$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LongRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\tEf\u0001B\u0001\u0003\u0001%\u0011A&\u00129pG\"$%/\u001b<f]J+\u0007\u000f\\5dCRLwN\u001c)s_R|7m\u001c7BG\u000e,\u0007\u000f^1oG\u0016$Vm\u001d;\u000b\u0005\r!\u0011!B3q_\u000eD'BA\u0003\u0007\u0003\u0019\u0019XM\u001d<fe*\tq!A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0007\u0001Q\u0001\u0003\u0005\u0002\f\u001d5\tAB\u0003\u0002\u000e\r\u0005\u0011!p[\u0005\u0003\u001f1\u0011ACW8p\u0017\u0016,\u0007/\u001a:UKN$\b*\u0019:oKN\u001c\bCA\t\u0015\u001b\u0005\u0011\"BA\n\u0007\u0003\u0015)H/\u001b7t\u0013\t)\"CA\u0004M_\u001e<\u0017N\\4\t\u000b]\u0001A\u0011\u0001\r\u0002\rqJg.\u001b;?)\u0005I\u0002C\u0001\u000e\u0001\u001b\u0005\u0011\u0001b\u0002\u000f\u0001\u0005\u0004%\t!H\u0001\u0006i>\u0004\u0018nY\u000b\u0002=A\u0011q\u0004J\u0007\u0002A)\u0011\u0011EI\u0001\u0005Y\u0006twMC\u0001$\u0003\u0011Q\u0017M^1\n\u0005\u0015\u0002#AB*ue&tw\r\u0003\u0004(\u0001\u0001\u0006IAH\u0001\u0007i>\u0004\u0018n\u0019\u0011\t\u000f%\u0002!\u0019!C\u0001U\u0005\u0019Qn]4\u0016\u0003-\u00022\u0001L\u00182\u001b\u0005i#\"\u0001\u0018\u0002\u000bM\u001c\u0017\r\\1\n\u0005Aj#!B!se\u0006L\bC\u0001\u00173\u0013\t\u0019TF\u0001\u0003CsR,\u0007BB\u001b\u0001A\u0003%1&\u0001\u0003ng\u001e\u0004\u0003bB\u001c\u0001\u0005\u0004%\tAK\u0001\n[N<')[4hKJDa!\u000f\u0001!\u0002\u0013Y\u0013AC7tO\nKwmZ3sA!91\b\u0001a\u0001\n\u0003a\u0014a\u00022s_.,'o]\u000b\u0002{A\u0019a(Q\"\u000e\u0003}R!\u0001Q\u0017\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002C\u007f\t\u00191+Z9\u0011\u0005\u0011+U\"\u0001\u0003\n\u0005\u0019#!aC&bM.\f7+\u001a:wKJDq\u0001\u0013\u0001A\u0002\u0013\u0005\u0011*A\u0006ce>\\WM]:`I\u0015\fHC\u0001&N!\ta3*\u0003\u0002M[\t!QK\\5u\u0011\u001dqu)!AA\u0002u\n1\u0001\u001f\u00132\u0011\u0019\u0001\u0006\u0001)Q\u0005{\u0005A!M]8lKJ\u001c\b\u0005C\u0004S\u0001\u0001\u0007I\u0011A*\u0002\u0011A\u0014x\u000eZ;dKJ,\u0012\u0001\u0016\t\u0005+z[3&D\u0001W\u0015\t\u0011vK\u0003\u0002Y3\u000691\r\\5f]R\u001c(BA\u0004[\u0015\tYF,\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002;\u0006\u0019qN]4\n\u0005}3&!D&bM.\f\u0007K]8ek\u000e,'\u000fC\u0004b\u0001\u0001\u0007I\u0011\u00012\u0002\u0019A\u0014x\u000eZ;dKJ|F%Z9\u0015\u0005)\u001b\u0007b\u0002(a\u0003\u0003\u0005\r\u0001\u0016\u0005\u0007K\u0002\u0001\u000b\u0015\u0002+\u0002\u0013A\u0014x\u000eZ;dKJ\u0004\u0003bB4\u0001\u0001\u0004%\t\u0001[\u0001\tG>t7/^7feV\t\u0011\u000e\u0005\u0003kY.ZS\"A6\u000b\u0005\u001d<\u0016BA7l\u00055Y\u0015MZ6b\u0007>t7/^7fe\"9q\u000e\u0001a\u0001\n\u0003\u0001\u0018\u0001D2p]N,X.\u001a:`I\u0015\fHC\u0001&r\u0011\u001dqe.!AA\u0002%Daa\u001d\u0001!B\u0013I\u0017!C2p]N,X.\u001a:!\u0011\u001d)\bA1A\u0005\u0002Y\fqbS%Q?F\u0002\u0014gX#O\u0003\ncU\tR\u000b\u0002oB\u0011A\u0006_\u0005\u0003s6\u0012qAQ8pY\u0016\fg\u000e\u0003\u0004|\u0001\u0001\u0006Ia^\u0001\u0011\u0017&\u0003v,\r\u00192?\u0016s\u0015I\u0011'F\t\u0002BQ! \u0001\u0005By\fQa]3u+B$\u0012A\u0013\u0015\u0004y\u0006\u0005\u0001\u0003BA\u0002\u0003\u0013i!!!\u0002\u000b\u0007\u0005\u001dA,A\u0003kk:LG/\u0003\u0003\u0002\f\u0005\u0015!A\u0002\"fM>\u0014X\r\u0003\u0004\u0002\u0010\u0001!\tE`\u0001\ti\u0016\f'\u000fR8x]\"\"\u0011QBA\n!\u0011\t\u0019!!\u0006\n\t\u0005]\u0011Q\u0001\u0002\u0006\u0003\u001a$XM\u001d\u0005\u0007\u00037\u0001A\u0011\u0001@\u0002IMDw.\u001e7e\r>dGn\\<MK\u0006$WM]#q_\u000eD')Y:jG^{'o\u001b4m_^DC!!\u0007\u0002 A!\u00111AA\u0011\u0013\u0011\t\u0019#!\u0002\u0003\tQ+7\u000f\u001e\u0005\u0007\u0003O\u0001A\u0011\u0001@\u00027MDw.\u001e7e\u001d>$\u0018\t\u001c7po\u0012Kg/\u001a:hK:$Hj\\4tQ\u0011\t)#a\b\t\r\u00055\u0002\u0001\"\u0001\u007f\u0003mygMZ:fiN\u001c\u0006n\\;mI:{GoR8CC\u000e\\w/\u0019:eg\"\"\u00111FA\u0010\u0011\u0019\t\u0019\u0004\u0001C\u0001}\u0006i2\u000f[8vY\u0012\u001cVO\u001d<jm\u00164\u0015m\u001d;MK\u0006$WM]\"iC:<W\r\u000b\u0003\u00022\u0005}\u0001BBA\u001d\u0001\u0011\u0005a0\u0001\u0017m_\u001e\u001c8\u000b[8vY\u0012tu\u000e\u001e#jm\u0016\u0014x-Z(o+:\u001cG.Z1o\u0019\u0016\fG-\u001a:FY\u0016\u001cG/[8og\"\"\u0011qGA\u0010\u0011\u001d\ty\u0004\u0001C\u0005\u0003\u0003\n1\u0001\\8h)\u0015Q\u00151IA$\u0011\u001d\t)%!\u0010A\u0002\r\u000ba\u0001\\3bI\u0016\u0014\bbBA%\u0003{\u0001\raQ\u0001\tM>dGn\\<fe\"9\u0011Q\n\u0001\u0005\n\u0005=\u0013AE<bSR4uN\u001d'pON$v.T1uG\"$rASA)\u0003+\nI\u0006C\u0004\u0002T\u0005-\u0003\u0019A\"\u0002\u0005\t\f\u0004bBA,\u0003\u0017\u0002\raQ\u0001\u0003EJB!\"a\u0017\u0002LA\u0005\t\u0019AA/\u0003%\u0001\u0018M\u001d;ji&|g\u000eE\u0002-\u0003?J1!!\u0019.\u0005\rIe\u000e\u001e\u0005\u0007\u0003K\u0002A\u0011\u0002@\u0002\u001bA\u0014\u0018N\u001c;TK\u001elWM\u001c;t\u0011\u001d\tI\u0007\u0001C\u0005\u0003W\nQb\u001d;beR\u001cuN\\:v[\u0016\u0014H#A5\t\u000f\u0005=\u0004\u0001\"\u0003\u0002r\u0005IB-\u001a7fi\u0016lUm]:bO\u0016\u001chI]8n\u0019><g)\u001b7f)\u001dQ\u00151OA?\u0003\u0003C\u0001\"!\u001e\u0002n\u0001\u0007\u0011qO\u0001\u0006Ef$Xm\u001d\t\u0004Y\u0005e\u0014bAA>[\t!Aj\u001c8h\u0011\u001d\ty(!\u001cA\u0002\r\u000baA\u0019:pW\u0016\u0014\b\u0002CAB\u0003[\u0002\r!!\u0018\u0002\u0017A\f'\u000f^5uS>t\u0017\n\u001a\u0005\u0007\u0003\u000f\u0003A\u0011B*\u0002/\r\u0014X-\u0019;f\u0005V4g-\u001a:j]\u001e\u0004&o\u001c3vG\u0016\u0014\bbBAF\u0001\u0011%\u0011QR\u0001\u000bO\u0016$Hj\\4GS2,GCBAH\u00037\u000bi\n\u0005\u0003\u0002\u0012\u0006]UBAAJ\u0015\r\t)JI\u0001\u0003S>LA!!'\u0002\u0014\n!a)\u001b7f\u0011\u001d\ty(!#A\u0002\rC\u0001\"a\u0017\u0002\n\u0002\u0007\u0011Q\f\u0005\b\u0003C\u0003A\u0011BAR\u0003\u00199W\r\u001e'pOR1\u0011QUAX\u0003c\u0003B!a*\u0002,6\u0011\u0011\u0011\u0016\u0006\u0004\u0003\u007f1\u0011\u0002BAW\u0003S\u00131\u0001T8h\u0011\u001d\ty(a(A\u0002\rC\u0001\"a\u0017\u0002 \u0002\u0007\u0011Q\f\u0005\b\u0003k\u0003A\u0011BA\\\u0003\u0019\u0011w.\u001e8dKR\u0019!*!/\t\u000f\u0005%\u00131\u0017a\u0001\u0007\"9\u0011Q\u0018\u0001\u0005\n\u0005}\u0016AC3q_\u000eD7)Y2iKR!\u0011\u0011YAd!\rQ\u00121Y\u0005\u0004\u0003\u000b\u0014!\u0001\u0006'fC\u0012,'/\u00129pG\"4\u0015\u000e\\3DC\u000eDW\rC\u0004\u0002\u0000\u0005m\u0006\u0019A\"\t\u000f\u0005-\u0007\u0001\"\u0003\u0002N\u0006aA.\u0019;fgR\u0014VmY8sIRA\u0011qZAp\u0003C\f)\u000f\u0005\u0003\u0002R\u0006mWBAAj\u0015\u0011\t).a6\u0002\rI,7m\u001c:e\u0015\r\tI.W\u0001\u0007G>lWn\u001c8\n\t\u0005u\u00171\u001b\u0002\f%\u0016\u001cwN\u001d3CCR\u001c\u0007\u000eC\u0004\u0002F\u0005%\u0007\u0019A\"\t\u0015\u0005\r\u0018\u0011\u001aI\u0001\u0002\u0004\ti&\u0001\u0004pM\u001a\u001cX\r\u001e\u0005\u000b\u00037\nI\r%AA\u0002\u0005u\u0003bBAu\u0001\u0011%\u00111^\u0001\tC^\f\u0017\u000e^%T%R\u0019!*!<\t\u0011\u0005=\u0018q\u001da\u0001\u0003c\f!\u0001\u001e9\u0011\t\u0005M\u0018Q_\u0007\u0003\u0003/LA!a>\u0002X\nqAk\u001c9jGB\u000b'\u000f^5uS>t\u0007BBA~\u0001\u0011%1+\u0001\bde\u0016\fG/\u001a)s_\u0012,8-\u001a:\t\u000f\u0005\u0015\u0003\u0001\"\u0003\u0002\u0000R\t1\tC\u0004\u0002J\u0001!I!a@\t\u000f\t\u0015\u0001\u0001\"\u0003\u0003\b\u0005a1M]3bi\u0016\u0014%o\\6feR)1I!\u0003\u0003\u000e!A!1\u0002B\u0002\u0001\u0004\ti&\u0001\u0002jI\"I!q\u0002B\u0002!\u0003\u0005\ra^\u0001\u001cK:\f'\r\\3V]\u000edW-\u00198MK\u0006$WM]#mK\u000e$\u0018n\u001c8\u0007\r\tM\u0001\u0001\u0002B\u000b\u0005A\u0019F/\u001e2EKN,'/[1mSj,'o\u0005\u0004\u0003\u0012\t]!Q\u0004\t\u0004?\te\u0011b\u0001B\u000eA\t1qJ\u00196fGR\u0004RAa\b\u0003&-j!A!\t\u000b\t\t\r\u0012q[\u0001\u000eg\u0016\u0014\u0018.\u00197ju\u0006$\u0018n\u001c8\n\t\t\u001d\"\u0011\u0005\u0002\r\t\u0016\u001cXM]5bY&TXM\u001d\u0005\b/\tEA\u0011\u0001B\u0016)\t\u0011i\u0003\u0005\u0003\u00030\tEQ\"\u0001\u0001\t\u0011\tM\"\u0011\u0003C!\u0005k\t\u0011bY8oM&<WO]3\u0015\u000b)\u00139Da\u001d\t\u0011\te\"\u0011\u0007a\u0001\u0005w\tqaY8oM&<7\u000f\r\u0003\u0003>\t\u0005\u0004\u0003\u0003B \u0005\u000b\u0012IE!\u0018\u000e\u0005\t\u0005#b\u0001B\"E\u0005!Q\u000f^5m\u0013\u0011\u00119E!\u0011\u0003\u00075\u000b\u0007\u000f\u0005\u0003\u0003L\tec\u0002\u0002B'\u0005+\u00022Aa\u0014.\u001b\t\u0011\tFC\u0002\u0003T!\ta\u0001\u0010:p_Rt\u0014b\u0001B,[\u00051\u0001K]3eK\u001aL1!\nB.\u0015\r\u00119&\f\t\u0005\u0005?\u0012\t\u0007\u0004\u0001\u0005\u0019\t\r$qGA\u0001\u0002\u0003\u0015\tA!\u001a\u0003\u0007}#\u0013'\u0005\u0003\u0003h\t5\u0004c\u0001\u0017\u0003j%\u0019!1N\u0017\u0003\u000f9{G\u000f[5oOB\u0019AFa\u001c\n\u0007\tETFA\u0002B]fDqA!\u001e\u00032\u0001\u0007q/A\u0003jg.+\u0017\u0010\u0003\u0005\u0003z\tEA\u0011\tB>\u0003-!Wm]3sS\u0006d\u0017N_3\u0015\u000b-\u0012iHa \t\u000fq\u00119\b1\u0001\u0003J!9!\u0011\u0011B<\u0001\u0004Y\u0013\u0001\u00023bi\u0006DqA!\"\u0003\u0012\u0011\u0005c0A\u0003dY>\u001cX\rC\u0005\u0003\n\u0002\t\n\u0011\"\u0003\u0003\f\u000612M]3bi\u0016\u0014%o\\6fe\u0012\"WMZ1vYR$#'\u0006\u0002\u0003\u000e*\u001aqOa$,\u0005\tE\u0005\u0003\u0002BJ\u0005;k!A!&\u000b\t\t]%\u0011T\u0001\nk:\u001c\u0007.Z2lK\u0012T1Aa'.\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0005?\u0013)JA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016D\u0011Ba)\u0001#\u0003%IA!*\u0002-1\fG/Z:u%\u0016\u001cwN\u001d3%I\u00164\u0017-\u001e7uII*\"Aa*+\t\u0005u#q\u0012\u0005\n\u0005W\u0003\u0011\u0013!C\u0005\u0005K\u000ba\u0003\\1uKN$(+Z2pe\u0012$C-\u001a4bk2$He\r\u0005\n\u0005_\u0003\u0011\u0013!C\u0005\u0005K\u000bAd^1ji\u001a{'\u000fT8hgR{W*\u0019;dQ\u0012\"WMZ1vYR$3\u0007")
public class EpochDrivenReplicationProtocolAcceptanceTest
extends ZooKeeperTestHarness {
    private final String topic;
    private final byte[] msg = new byte[1000];
    private final byte[] msgBigger = new byte[10000];
    private Seq<KafkaServer> brokers = null;
    private KafkaProducer<byte[], byte[]> producer = null;
    private KafkaConsumer<byte[], byte[]> consumer = null;
    private final boolean KIP_101_ENABLED;

    public String topic() {
        return this.topic;
    }

    public byte[] msg() {
        return this.msg;
    }

    public byte[] msgBigger() {
        return this.msgBigger;
    }

    public Seq<KafkaServer> brokers() {
        return this.brokers;
    }

    public void brokers_$eq(Seq<KafkaServer> x$1) {
        this.brokers = x$1;
    }

    public KafkaProducer<byte[], byte[]> producer() {
        return this.producer;
    }

    public void producer_$eq(KafkaProducer<byte[], byte[]> x$1) {
        this.producer = x$1;
    }

    public KafkaConsumer<byte[], byte[]> consumer() {
        return this.consumer;
    }

    public void consumer_$eq(KafkaConsumer<byte[], byte[]> x$1) {
        this.consumer = x$1;
    }

    public boolean KIP_101_ENABLED() {
        return this.KIP_101_ENABLED;
    }

    @Override
    @Before
    public void setUp() {
        super.setUp();
    }

    @Override
    @After
    public void tearDown() {
        this.producer().close();
        TestUtils$.MODULE$.shutdownServers(this.brokers());
        super.tearDown();
    }

    @Test
    public void shouldFollowLeaderEpochBasicWorkflow() {
        this.brokers_$eq((Seq<KafkaServer>)((Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 101).map((Function1 & Serializable & scala.Serializable)x$1 -> this.createBroker(BoxesRunTime.unboxToInt((Object)x$1), this.createBroker$default$2()), IndexedSeq$.MODULE$.canBuildFrom())));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 101})))}))), this.brokers());
        this.producer_$eq(this.createProducer());
        TopicPartition tp = new TopicPartition(this.topic(), 0);
        this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg())).get();
        Assert.assertEquals((long)0L, (long)this.latestRecord(this.leader(), this.latestRecord$default$2(), this.latestRecord$default$3()).partitionLeaderEpoch());
        Assert.assertEquals((long)0L, (long)this.latestRecord(this.follower(), this.latestRecord$default$2(), this.latestRecord$default$3()).partitionLeaderEpoch());
        Assert.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L)})), (Object)this.epochCache(this.leader()).epochEntries());
        Assert.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L)})), (Object)this.epochCache(this.follower()).epochEntries());
        this.bounce(this.follower());
        this.awaitISR(tp);
        Assert.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L)})), (Object)this.epochCache(this.leader()).epochEntries());
        Assert.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L)})), (Object)this.epochCache(this.follower()).epochEntries());
        this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg())).get();
        Assert.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L)})), (Object)this.epochCache(this.leader()).epochEntries());
        Assert.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L)})), (Object)this.epochCache(this.follower()).epochEntries());
        Assert.assertEquals((long)1L, (long)this.latestRecord(this.leader(), this.latestRecord$default$2(), this.latestRecord$default$3()).partitionLeaderEpoch());
        Assert.assertEquals((long)1L, (long)this.latestRecord(this.follower(), this.latestRecord$default$2(), this.latestRecord$default$3()).partitionLeaderEpoch());
        this.bounce(this.leader());
        this.awaitISR(tp);
        Assert.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L), new EpochEntry(2, 2L)})), (Object)this.epochCache(this.leader()).epochEntries());
        Assert.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L)})), (Object)this.epochCache(this.follower()).epochEntries());
        this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg())).get();
        Assert.assertEquals((long)2L, (long)this.latestRecord(this.leader(), this.latestRecord$default$2(), this.latestRecord$default$3()).partitionLeaderEpoch());
        Assert.assertEquals((long)2L, (long)this.latestRecord(this.follower(), this.latestRecord$default$2(), this.latestRecord$default$3()).partitionLeaderEpoch());
        Assert.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L), new EpochEntry(2, 2L)})), (Object)this.epochCache(this.leader()).epochEntries());
        Assert.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L), new EpochEntry(2, 2L)})), (Object)this.epochCache(this.follower()).epochEntries());
    }

    @Test
    public void shouldNotAllowDivergentLogs() {
        this.brokers_$eq((Seq<KafkaServer>)((Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 101).map((Function1 & Serializable & scala.Serializable)id -> TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(BoxesRunTime.unboxToInt((Object)id), this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18())), TestUtils$.MODULE$.createServer$default$2()), IndexedSeq$.MODULE$.canBuildFrom())));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 101})))}))), this.brokers());
        this.producer_$eq(this.createProducer());
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg()));
            this.producer().flush();
        });
        this.brokers().foreach((Function1 & Serializable & scala.Serializable)b -> {
            b.shutdown();
            return BoxedUnit.UNIT;
        });
        new File((String)((KafkaServer)this.brokers().apply(0)).config().logDirs().apply(0), Log$.MODULE$.CleanShutdownFile()).delete();
        this.deleteMessagesFromLogFile(5 * this.msg().length, (KafkaServer)this.brokers().apply(0), 0);
        ((KafkaServer)this.brokers().apply(0)).startup();
        this.producer().close();
        this.producer_$eq(this.createProducer());
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)x$2 -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msgBigger()));
            this.producer().flush();
        });
        ((KafkaServer)this.brokers().apply(1)).startup();
        this.waitForLogsToMatch((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1), this.waitForLogsToMatch$default$3());
        Assert.assertEquals((String)"Log files should match Broker0 vs Broker 1", (long)this.getLogFile((KafkaServer)this.brokers().apply(0), 0).length(), (long)this.getLogFile((KafkaServer)this.brokers().apply(1), 0).length());
    }

    @Test
    public void offsetsShouldNotGoBackwards() {
        this.brokers_$eq((Seq<KafkaServer>)((Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 101).map((Function1 & Serializable & scala.Serializable)x$3 -> this.createBroker(BoxesRunTime.unboxToInt((Object)x$3), this.createBroker$default$2()), IndexedSeq$.MODULE$.canBuildFrom())));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 101})))}))), this.brokers());
        this.producer_$eq(this.createBufferingProducer());
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 100).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg()));
            this.producer().flush();
        });
        this.brokers().foreach((Function1 & Serializable & scala.Serializable)b -> {
            b.shutdown();
            return BoxedUnit.UNIT;
        });
        new File((String)((KafkaServer)this.brokers().apply(0)).config().logDirs().apply(0), Log$.MODULE$.CleanShutdownFile()).delete();
        this.deleteMessagesFromLogFile(this.getLogFile((KafkaServer)this.brokers().apply(0), 0).length() / 2L, (KafkaServer)this.brokers().apply(0), 0);
        ((KafkaServer)this.brokers().apply(0)).startup();
        this.producer().close();
        this.producer_$eq(this.createBufferingProducer());
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 77).foreach((Function1 & Serializable & scala.Serializable)x$4 -> EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$offsetsShouldNotGoBackwards$4(this, BoxesRunTime.unboxToInt((Object)x$4)));
        this.producer().flush();
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 77).foreach((Function1 & Serializable & scala.Serializable)x$5 -> EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$offsetsShouldNotGoBackwards$5(this, BoxesRunTime.unboxToInt((Object)x$5)));
        this.producer().flush();
        this.printSegments();
        ((KafkaServer)this.brokers().apply(1)).startup();
        this.waitForLogsToMatch((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1), this.waitForLogsToMatch$default$3());
        this.printSegments();
        ((KafkaServer)this.brokers().apply(0)).shutdown();
        this.startConsumer();
        Iterable records = (Iterable)JavaConverters$.MODULE$.iterableAsScalaIterableConverter((java.lang.Iterable)this.consumer().poll(1000L)).asScala();
        LongRef prevOffset = LongRef.create((long)-1L);
        records.foreach((Function1 & Serializable & scala.Serializable)r -> {
            EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$offsetsShouldNotGoBackwards$6(prevOffset, r);
            return BoxedUnit.UNIT;
        });
        Assert.assertEquals((String)"Log files should match Broker0 vs Broker 1", (long)this.getLogFile((KafkaServer)this.brokers().apply(0), 0).length(), (long)this.getLogFile((KafkaServer)this.brokers().apply(1), 0).length());
    }

    @Test
    public void shouldSurviveFastLeaderChange() {
        TopicPartition tp = new TopicPartition(this.topic(), 0);
        this.brokers_$eq((Seq<KafkaServer>)((Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 101).map((Function1 & Serializable & scala.Serializable)x$6 -> this.createBroker(BoxesRunTime.unboxToInt((Object)x$6), this.createBroker$default$2()), IndexedSeq$.MODULE$.canBuildFrom())));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 101})))}))), this.brokers());
        this.producer_$eq(this.createProducer());
        this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg())).get();
        IntRef messagesWritten = IntRef.create((int)1);
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 5).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            int leaderId = BoxesRunTime.unboxToInt((Object)this.zkClient().getLeaderForPartition(new TopicPartition(this.topic(), 0)).get());
            KafkaServer leader = (KafkaServer)((SeqLike)this.brokers().filter((Function1 & Serializable & scala.Serializable)x$7 -> BoxesRunTime.boxToBoolean((boolean)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$shouldSurviveFastLeaderChange$3(leaderId, x$7)))).apply(0);
            KafkaServer follower = (KafkaServer)((SeqLike)this.brokers().filter((Function1 & Serializable & scala.Serializable)x$8 -> BoxesRunTime.boxToBoolean((boolean)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$shouldSurviveFastLeaderChange$4(leaderId, x$8)))).apply(0);
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg())).get();
            ++messagesWritten$1.elem;
            this.bounce(follower);
            this.log(leader, follower);
            this.awaitISR(tp);
            this.bounce(leader);
            this.log(leader, follower);
            this.awaitISR(tp);
            Assert.assertTrue((boolean)this.brokers().forall((Function1 & Serializable & scala.Serializable)broker -> BoxesRunTime.boxToBoolean((boolean)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$shouldSurviveFastLeaderChange$5(this, messagesWritten, broker))));
        });
    }

    @Test
    public void logsShouldNotDivergeOnUncleanLeaderElections() {
        this.brokers_$eq((Seq<KafkaServer>)((Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 101).map((Function1 & Serializable & scala.Serializable)x$9 -> this.createBroker(BoxesRunTime.unboxToInt((Object)x$9), true), IndexedSeq$.MODULE$.canBuildFrom())));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 101})))}))), this.brokers(), CoreUtils$.MODULE$.propsWith((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)KafkaConfig$.MODULE$.MinInSyncReplicasProp(), (Object)"1")})));
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers(this.brokers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), 1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15()));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg()));
            this.producer().flush();
        });
        this.waitForLogsToMatch((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1), this.waitForLogsToMatch$default$3());
        ((KafkaServer)this.brokers().apply(0)).shutdown();
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg()));
            this.producer().flush();
        });
        ((KafkaServer)this.brokers().apply(1)).shutdown();
        ((KafkaServer)this.brokers().apply(0)).startup();
        this.producer().close();
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers(this.brokers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), 1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15()));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msgBigger()));
            this.producer().flush();
        });
        ((KafkaServer)this.brokers().apply(0)).shutdown();
        ((KafkaServer)this.brokers().apply(1)).startup();
        this.producer().close();
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers(this.brokers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), 1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15()));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg()));
            this.producer().flush();
        });
        ((KafkaServer)this.brokers().apply(1)).shutdown();
        ((KafkaServer)this.brokers().apply(0)).startup();
        this.producer().close();
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers(this.brokers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), 1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15()));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msgBigger()));
            this.producer().flush();
        });
        this.printSegments();
        ((KafkaServer)this.brokers().apply(1)).startup();
        this.waitForLogsToMatch((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1), this.waitForLogsToMatch$default$3());
        this.printSegments();
        Seq seq = this.crcSeq$1((KafkaServer)this.brokers().apply(0), EpochDrivenReplicationProtocolAcceptanceTest.crcSeq$default$2$1());
        Seq seq2 = this.crcSeq$1((KafkaServer)this.brokers().apply(1), EpochDrivenReplicationProtocolAcceptanceTest.crcSeq$default$2$1());
        Assert.assertTrue((String)"Logs on Broker 100 and Broker 101 should match", (!(seq != null ? !seq.equals(seq2) : seq2 != null) ? 1 : 0) != 0);
    }

    private void log(KafkaServer leader, KafkaServer follower) {
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(29).append("Bounce complete for follower ").append(follower.config().brokerId()).toString());
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(21).append("Leader: leo").append(leader.config().brokerId()).append(": ").append(this.getLog(leader, 0).logEndOffset()).append(" cache: ").append(this.epochCache(leader).epochEntries()).toString());
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(23).append("Follower: leo").append(follower.config().brokerId()).append(": ").append(this.getLog(follower, 0).logEndOffset()).append(" cache: ").append(this.epochCache(follower).epochEntries()).toString());
    }

    private void waitForLogsToMatch(KafkaServer b1, KafkaServer b2, int partition) {
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.getLog(b1, partition).logEndOffset() == this.getLog(b2, partition).logEndOffset(), (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Logs didn't match.", TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4(), TestUtils$.MODULE$.waitUntilTrue$default$5());
    }

    private int waitForLogsToMatch$default$3() {
        return 0;
    }

    private void printSegments() {
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Broker0:");
        DumpLogSegments$.MODULE$.main((String[])((TraversableOnce)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"--files", this.getLogFile((KafkaServer)this.brokers().apply(0), 0).getCanonicalPath()}))).toArray(ClassTag$.MODULE$.apply(String.class)));
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Broker1:");
        DumpLogSegments$.MODULE$.main((String[])((TraversableOnce)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"--files", this.getLogFile((KafkaServer)this.brokers().apply(1), 0).getCanonicalPath()}))).toArray(ClassTag$.MODULE$.apply(String.class)));
    }

    private KafkaConsumer<byte[], byte[]> startConsumer() {
        Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", TestUtils$.MODULE$.getBrokerListStrFromServers(this.brokers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()));
        consumerConfig.put("fetch.max.bytes", String.valueOf(this.getLogFile((KafkaServer)this.brokers().apply(1), 0).length() * 2L));
        consumerConfig.put("max.partition.fetch.bytes", String.valueOf(this.getLogFile((KafkaServer)this.brokers().apply(1), 0).length() * 2L));
        this.consumer_$eq((KafkaConsumer<byte[], byte[]>)new KafkaConsumer(consumerConfig, (Deserializer)new StubDeserializer(), (Deserializer)new StubDeserializer()));
        this.consumer().assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)new TopicPartition(this.topic(), 0), (List)Nil$.MODULE$)).asJava());
        this.consumer().seek(new TopicPartition(this.topic(), 0), 0L);
        return this.consumer();
    }

    private void deleteMessagesFromLogFile(long bytes, KafkaServer broker, int partitionId) {
        File logFile = this.getLogFile(broker, partitionId);
        RandomAccessFile writable = new RandomAccessFile(logFile, "rwd");
        writable.setLength(logFile.length() - bytes);
        writable.close();
    }

    private KafkaProducer<byte[], byte[]> createBufferingProducer() {
        String x$20 = TestUtils$.MODULE$.getBrokerListStrFromServers(this.brokers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2());
        int x$21 = -1;
        int x$22 = 10000;
        int x$23 = this.msg().length * 1000;
        String x$24 = "snappy";
        long x$25 = TestUtils$.MODULE$.createProducer$default$3();
        long x$26 = TestUtils$.MODULE$.createProducer$default$4();
        int x$27 = TestUtils$.MODULE$.createProducer$default$5();
        int x$28 = TestUtils$.MODULE$.createProducer$default$6();
        int x$29 = TestUtils$.MODULE$.createProducer$default$10();
        SecurityProtocol x$30 = TestUtils$.MODULE$.createProducer$default$11();
        Option<File> x$31 = TestUtils$.MODULE$.createProducer$default$12();
        Option<Properties> x$32 = TestUtils$.MODULE$.createProducer$default$13();
        ByteArraySerializer x$33 = TestUtils$.MODULE$.createProducer$default$14();
        ByteArraySerializer x$34 = TestUtils$.MODULE$.createProducer$default$15();
        return TestUtils$.MODULE$.createProducer(x$20, x$21, x$25, x$26, x$27, x$28, x$22, x$23, x$24, x$29, x$30, x$31, x$32, x$33, x$34);
    }

    private File getLogFile(KafkaServer broker, int partition) {
        Log log = this.getLog(broker, partition);
        log.flush();
        return ((File[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])log.dir().listFiles())).filter((Function1 & Serializable & scala.Serializable)x$11 -> BoxesRunTime.boxToBoolean((boolean)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$getLogFile$1(x$11))))[0];
    }

    private Log getLog(KafkaServer broker, int partition) {
        LogManager qual$2 = broker.logManager();
        TopicPartition x$35 = new TopicPartition(this.topic(), partition);
        boolean x$36 = qual$2.getLog$default$2();
        return (Log)qual$2.getLog(x$35, x$36).orNull(Predef$.MODULE$.$conforms());
    }

    private void bounce(KafkaServer follower) {
        follower.shutdown();
        follower.startup();
        this.producer().close();
        this.producer_$eq(this.createProducer());
    }

    private LeaderEpochFileCache epochCache(KafkaServer broker) {
        return (LeaderEpochFileCache)this.getLog(broker, 0).leaderEpochCache().get();
    }

    private RecordBatch latestRecord(KafkaServer leader, int offset, int partition) {
        LogSegment qual$3 = this.getLog(leader, partition).activeSegment();
        long x$37 = 0L;
        None$ x$38 = None$.MODULE$;
        int x$39 = Integer.MAX_VALUE;
        long x$40 = qual$3.read$default$4();
        boolean x$41 = qual$3.read$default$5();
        return (RecordBatch)((TraversableOnce)JavaConverters$.MODULE$.iterableAsScalaIterableConverter(qual$3.read(x$37, (Option)x$38, x$39, x$40, x$41).records().batches()).asScala()).toSeq().last();
    }

    private int latestRecord$default$2() {
        return -1;
    }

    private int latestRecord$default$3() {
        return 0;
    }

    private void awaitISR(TopicPartition tp) {
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> ((TraversableOnce)((Partition)this.leader().replicaManager().getPartition(tp).get()).inSyncReplicas().map((Function1 & Serializable & scala.Serializable)x$12 -> BoxesRunTime.boxToInteger((int)x$12.brokerId()), Set$.MODULE$.canBuildFrom())).size() == 2, (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Timed out waiting for replicas to join ISR", TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4(), TestUtils$.MODULE$.waitUntilTrue$default$5());
    }

    private KafkaProducer<byte[], byte[]> createProducer() {
        return TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromServers(this.brokers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()), -1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15());
    }

    private KafkaServer leader() {
        Assert.assertEquals((long)2L, (long)this.brokers().size());
        int leaderId = BoxesRunTime.unboxToInt((Object)this.zkClient().getLeaderForPartition(new TopicPartition(this.topic(), 0)).get());
        return (KafkaServer)((SeqLike)this.brokers().filter((Function1 & Serializable & scala.Serializable)x$13 -> BoxesRunTime.boxToBoolean((boolean)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$leader$1(leaderId, x$13)))).apply(0);
    }

    private KafkaServer follower() {
        Assert.assertEquals((long)2L, (long)this.brokers().size());
        int leader = BoxesRunTime.unboxToInt((Object)this.zkClient().getLeaderForPartition(new TopicPartition(this.topic(), 0)).get());
        return (KafkaServer)((SeqLike)this.brokers().filter((Function1 & Serializable & scala.Serializable)x$14 -> BoxesRunTime.boxToBoolean((boolean)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$follower$1(leader, x$14)))).apply(0);
    }

    private KafkaServer createBroker(int id, boolean enableUncleanLeaderElection) {
        Object object;
        Properties config = TestUtils$.MODULE$.createBrokerConfig(id, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18());
        if (!this.KIP_101_ENABLED()) {
            config.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), KAFKA_0_11_0_IV1$.MODULE$.version());
            object = config.setProperty(KafkaConfig$.MODULE$.LogMessageFormatVersionProp(), KAFKA_0_11_0_IV1$.MODULE$.version());
        } else {
            object = BoxedUnit.UNIT;
        }
        config.setProperty(KafkaConfig$.MODULE$.UncleanLeaderElectionEnableProp(), ((Object)BoxesRunTime.boxToBoolean((boolean)enableUncleanLeaderElection)).toString());
        return TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(config), TestUtils$.MODULE$.createServer$default$2());
    }

    private boolean createBroker$default$2() {
        return false;
    }

    public static final /* synthetic */ Future $anonfun$offsetsShouldNotGoBackwards$4(EpochDrivenReplicationProtocolAcceptanceTest $this, int x$4) {
        return $this.producer().send(new ProducerRecord($this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)$this.msg()));
    }

    public static final /* synthetic */ Future $anonfun$offsetsShouldNotGoBackwards$5(EpochDrivenReplicationProtocolAcceptanceTest $this, int x$5) {
        return $this.producer().send(new ProducerRecord($this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)$this.msg()));
    }

    public static final /* synthetic */ void $anonfun$offsetsShouldNotGoBackwards$6(LongRef prevOffset$1, ConsumerRecord r) {
        Assert.assertTrue((String)new StringBuilder(21).append("Offset ").append(prevOffset$1.elem).append(" came before ").append(r.offset()).append(" ").toString(), (r.offset() > prevOffset$1.elem ? 1 : 0) != 0);
        prevOffset$1.elem = r.offset();
    }

    public static final /* synthetic */ boolean $anonfun$shouldSurviveFastLeaderChange$3(int leaderId$1, KafkaServer x$7) {
        return x$7.config().brokerId() == leaderId$1;
    }

    public static final /* synthetic */ boolean $anonfun$shouldSurviveFastLeaderChange$4(int leaderId$1, KafkaServer x$8) {
        return x$8.config().brokerId() != leaderId$1;
    }

    public static final /* synthetic */ boolean $anonfun$shouldSurviveFastLeaderChange$5(EpochDrivenReplicationProtocolAcceptanceTest $this, IntRef messagesWritten$1, KafkaServer broker) {
        return $this.getLog(broker, 0).logEndOffset() == (long)messagesWritten$1.elem;
    }

    private final Seq crcSeq$1(KafkaServer broker, int partition) {
        LogSegment qual$1 = this.getLog(broker, partition).activeSegment();
        long x$15 = 0L;
        None$ x$16 = None$.MODULE$;
        int x$17 = Integer.MAX_VALUE;
        long x$18 = qual$1.read$default$4();
        boolean x$19 = qual$1.read$default$5();
        Seq batches = ((TraversableOnce)JavaConverters$.MODULE$.iterableAsScalaIterableConverter(qual$1.read(x$15, (Option)x$16, x$17, x$18, x$19).records().batches()).asScala()).toSeq();
        return (Seq)batches.map((Function1 & Serializable & scala.Serializable)x$10 -> BoxesRunTime.boxToLong((long)x$10.checksum()), Seq$.MODULE$.canBuildFrom());
    }

    private static final int crcSeq$default$2$1() {
        return 0;
    }

    public static final /* synthetic */ boolean $anonfun$getLogFile$1(File x$11) {
        return x$11.getName().endsWith(".log");
    }

    public static final /* synthetic */ boolean $anonfun$leader$1(int leaderId$2, KafkaServer x$13) {
        return x$13.config().brokerId() == leaderId$2;
    }

    public static final /* synthetic */ boolean $anonfun$follower$1(int leader$2, KafkaServer x$14) {
        return x$14.config().brokerId() != leader$2;
    }

    public EpochDrivenReplicationProtocolAcceptanceTest() {
        this.topic = "topic1";
        this.KIP_101_ENABLED = true;
    }

    public class StubDeserializer
    implements Deserializer<byte[]> {
        public Object deserialize(String x$1, Headers x$2, byte[] x$3) {
            return super.deserialize(x$1, x$2, x$3);
        }

        public void configure(java.util.Map<String, ?> configs, boolean isKey) {
        }

        public byte[] deserialize(String topic, byte[] data) {
            return data;
        }

        public void close() {
        }

        public /* synthetic */ EpochDrivenReplicationProtocolAcceptanceTest kafka$server$epoch$EpochDrivenReplicationProtocolAcceptanceTest$StubDeserializer$$$outer() {
            return EpochDrivenReplicationProtocolAcceptanceTest.this;
        }

        public StubDeserializer() {
            if (EpochDrivenReplicationProtocolAcceptanceTest.this == null) {
                throw null;
            }
        }
    }
}

