/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.epoch;

import java.io.File;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Future;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogLoader$;
import kafka.log.LogManager;
import kafka.log.LogSegment;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.QuorumTestHarness;
import kafka.server.epoch.EpochEntry;
import kafka.server.epoch.LeaderEpochFileCache;
import kafka.tools.DumpLogSegments$;
import kafka.utils.CoreUtils$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.ListBuffer$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LongRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\tEc\u0001B\u0017/\u0001UBQ\u0001\u0011\u0001\u0005\u0002\u0005CQ\u0001\u0012\u0001\u0005B\u0015CqA\u0015\u0001C\u0002\u0013\u00051\u000b\u0003\u0004]\u0001\u0001\u0006I\u0001\u0016\u0005\b;\u0002\u0011\r\u0011\"\u0001_\u0011\u0019A\u0007\u0001)A\u0005?\"9\u0011\u000e\u0001b\u0001\n\u0003q\u0006B\u00026\u0001A\u0003%q\fC\u0004l\u0001\u0001\u0007I\u0011\u00017\t\u000fY\u0004\u0001\u0019!C\u0001o\"1Q\u0010\u0001Q!\n5DqA \u0001A\u0002\u0013\u0005q\u0010C\u0005\u0002\u0010\u0001\u0001\r\u0011\"\u0001\u0002\u0012!A\u0011Q\u0003\u0001!B\u0013\t\t\u0001C\u0005\u0002\u0018\u0001\u0001\r\u0011\"\u0001\u0002\u001a!I\u0011Q\u0005\u0001A\u0002\u0013\u0005\u0011q\u0005\u0005\t\u0003W\u0001\u0001\u0015)\u0003\u0002\u001c!9\u0011Q\u0006\u0001\u0005B\u0005=\u0002bBA)\u0001\u0011\u0005\u00131\u000b\u0005\b\u0003;\u0002A\u0011AA*\u0011\u001d\t9\u0007\u0001C\u0001\u0003'Bq!a\u001b\u0001\t\u0003\t\u0019\u0006C\u0004\u0002p\u0001!\t!a\u0015\t\u000f\u0005M\u0004\u0001\"\u0001\u0002T!9\u0011q\u000f\u0001\u0005\n\u0005e\u0004bBAB\u0001\u0011%\u0011Q\u0011\u0005\n\u00033\u0003\u0011\u0013!C\u0005\u00037Cq!!-\u0001\t\u0013\t\u0019\u0006C\u0004\u00024\u0002!I!!.\t\u000f\u0005]\u0006\u0001\"\u0003\u0002:\"1\u0011Q\u001a\u0001\u0005\n}Dq!a4\u0001\t\u0013\t\t\u000eC\u0004\u0002d\u0002!I!!:\t\u000f\u0005U\b\u0001\"\u0003\u0002x\"9\u00111 \u0001\u0005\n\u0005u\bb\u0002B\u0004\u0001\u0011%!\u0011\u0002\u0005\n\u0005C\u0001\u0011\u0013!C\u0005\u00037C\u0011Ba\t\u0001#\u0003%I!a'\t\u000f\t\u0015\u0002\u0001\"\u0003\u0003(!1!Q\u0007\u0001\u0005\n}Dq!! \u0001\t\u0013\u00119\u0004C\u0004\u0002\u0002\u0002!IAa\u000e\t\u000f\te\u0002\u0001\"\u0003\u0003<!I!1\n\u0001\u0012\u0002\u0013%!Q\n\u0002-\u000bB|7\r\u001b#sSZ,gNU3qY&\u001c\u0017\r^5p]B\u0013x\u000e^8d_2\f5mY3qi\u0006t7-\u001a+fgRT!a\f\u0019\u0002\u000b\u0015\u0004xn\u00195\u000b\u0005E\u0012\u0014AB:feZ,'OC\u00014\u0003\u0015Y\u0017MZ6b\u0007\u0001\u00192\u0001\u0001\u001c;!\t9\u0004(D\u00011\u0013\tI\u0004GA\tRk>\u0014X/\u001c+fgRD\u0015M\u001d8fgN\u0004\"a\u000f \u000e\u0003qR!!\u0010\u001a\u0002\u000bU$\u0018\u000e\\:\n\u0005}b$a\u0002'pO\u001eLgnZ\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003\t\u0003\"a\u0011\u0001\u000e\u00039\nq\"\\3uC\u0012\fG/\u0019,feNLwN\\\u000b\u0002\rB\u0011q\tU\u0007\u0002\u0011*\u0011\u0011JS\u0001\u0007G>lWn\u001c8\u000b\u0005EZ%BA\u001aM\u0015\tie*\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u001f\u0006\u0019qN]4\n\u0005EC%aD'fi\u0006$\u0017\r^1WKJ\u001c\u0018n\u001c8\u0002\u000bQ|\u0007/[2\u0016\u0003Q\u0003\"!\u0016.\u000e\u0003YS!a\u0016-\u0002\t1\fgn\u001a\u0006\u00023\u0006!!.\u0019<b\u0013\tYfK\u0001\u0004TiJLgnZ\u0001\u0007i>\u0004\u0018n\u0019\u0011\u0002\u00075\u001cx-F\u0001`!\r\u00017-Z\u0007\u0002C*\t!-A\u0003tG\u0006d\u0017-\u0003\u0002eC\n)\u0011I\u001d:bsB\u0011\u0001MZ\u0005\u0003O\u0006\u0014AAQ=uK\u0006!Qn]4!\u0003%i7o\u001a\"jO\u001e,'/\u0001\u0006ng\u001e\u0014\u0015nZ4fe\u0002\nqA\u0019:pW\u0016\u00148/F\u0001n!\rq\u0017o]\u0007\u0002_*\u0011\u0001/Y\u0001\u000bG>dG.Z2uS>t\u0017B\u0001:p\u0005\r\u0019V-\u001d\t\u0003oQL!!\u001e\u0019\u0003\u0017-\u000bgm[1TKJ4XM]\u0001\fEJ|7.\u001a:t?\u0012*\u0017\u000f\u0006\u0002ywB\u0011\u0001-_\u0005\u0003u\u0006\u0014A!\u00168ji\"9APCA\u0001\u0002\u0004i\u0017a\u0001=%c\u0005A!M]8lKJ\u001c\b%\u0001\u0005qe>$WoY3s+\t\t\t\u0001\u0005\u0004\u0002\u0004\u0005-qlX\u0007\u0003\u0003\u000bQ1A`A\u0004\u0015\r\tIaS\u0001\bG2LWM\u001c;t\u0013\u0011\ti!!\u0002\u0003\u001b-\u000bgm[1Qe>$WoY3s\u00031\u0001(o\u001c3vG\u0016\u0014x\fJ3r)\rA\u00181\u0003\u0005\ty6\t\t\u00111\u0001\u0002\u0002\u0005I\u0001O]8ek\u000e,'\u000fI\u0001\tG>t7/^7feV\u0011\u00111\u0004\t\u0007\u0003;\t\tcX0\u000e\u0005\u0005}!\u0002BA\f\u0003\u000fIA!a\t\u0002 \ti1*\u00194lC\u000e{gn];nKJ\fAbY8ogVlWM]0%KF$2\u0001_A\u0015\u0011!a\b#!AA\u0002\u0005m\u0011!C2p]N,X.\u001a:!\u0003\u0015\u0019X\r^+q)\rA\u0018\u0011\u0007\u0005\b\u0003g\u0011\u0002\u0019AA\u001b\u0003!!Xm\u001d;J]\u001a|\u0007\u0003BA\u001c\u0003\u000bj!!!\u000f\u000b\t\u0005m\u0012QH\u0001\u0004CBL'\u0002BA \u0003\u0003\nqA[;qSR,'OC\u0002\u0002D9\u000bQA[;oSRLA!a\u0012\u0002:\tAA+Z:u\u0013:4w\u000eK\u0002\u0013\u0003\u0017\u0002B!a\u000e\u0002N%!\u0011qJA\u001d\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\ti\u0016\f'\u000fR8x]R\t\u0001\u0010K\u0002\u0014\u0003/\u0002B!a\u000e\u0002Z%!\u00111LA\u001d\u0005%\te\r^3s\u000b\u0006\u001c\u0007.\u0001\u0013tQ>,H\u000e\u001a$pY2|w\u000fT3bI\u0016\u0014X\t]8dQ\n\u000b7/[2X_J\\g\r\\8xQ\r!\u0012\u0011\r\t\u0005\u0003o\t\u0019'\u0003\u0003\u0002f\u0005e\"\u0001\u0002+fgR\f1d\u001d5pk2$gj\u001c;BY2|w\u000fR5wKJ<WM\u001c;M_\u001e\u001c\bfA\u000b\u0002b\u0005YrN\u001a4tKR\u001c8\u000b[8vY\u0012tu\u000e^$p\u0005\u0006\u001c7n^1sIND3AFA1\u0003u\u0019\bn\\;mIN+(O^5wK\u001a\u000b7\u000f\u001e'fC\u0012,'o\u00115b]\u001e,\u0007fA\f\u0002b\u0005aCn\\4t'\"|W\u000f\u001c3O_R$\u0015N^3sO\u0016|e.\u00168dY\u0016\fg\u000eT3bI\u0016\u0014X\t\\3di&|gn\u001d\u0015\u00041\u0005\u0005\u0014a\u00017pOR)\u00010a\u001f\u0002\u0000!1\u0011QP\rA\u0002M\fa\u0001\\3bI\u0016\u0014\bBBAA3\u0001\u00071/\u0001\u0005g_2dwn^3s\u0003I9\u0018-\u001b;G_JdunZ:U_6\u000bGo\u00195\u0015\u000fa\f9)a#\u0002\u0010\"1\u0011\u0011\u0012\u000eA\u0002M\f!AY\u0019\t\r\u00055%\u00041\u0001t\u0003\t\u0011'\u0007C\u0005\u0002\u0012j\u0001\n\u00111\u0001\u0002\u0014\u0006I\u0001/\u0019:uSRLwN\u001c\t\u0004A\u0006U\u0015bAALC\n\u0019\u0011J\u001c;\u00029]\f\u0017\u000e\u001e$pe2{wm\u001d+p\u001b\u0006$8\r\u001b\u0013eK\u001a\fW\u000f\u001c;%gU\u0011\u0011Q\u0014\u0016\u0005\u0003'\u000byj\u000b\u0002\u0002\"B!\u00111UAW\u001b\t\t)K\u0003\u0003\u0002(\u0006%\u0016!C;oG\",7m[3e\u0015\r\tY+Y\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BAX\u0003K\u0013\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u00035\u0001(/\u001b8u'\u0016<W.\u001a8ug\u0006i1\u000f^1si\u000e{gn];nKJ$\"!a\u0007\u00023\u0011,G.\u001a;f\u001b\u0016\u001c8/Y4fg\u001a\u0013x.\u001c'pO\u001aKG.\u001a\u000b\bq\u0006m\u0016QYAe\u0011\u001d\tiL\ba\u0001\u0003\u007f\u000bQAY=uKN\u00042\u0001YAa\u0013\r\t\u0019-\u0019\u0002\u0005\u0019>tw\r\u0003\u0004\u0002Hz\u0001\ra]\u0001\u0007EJ|7.\u001a:\t\u000f\u0005-g\u00041\u0001\u0002\u0014\u0006Y\u0001/\u0019:uSRLwN\\%e\u0003]\u0019'/Z1uK\n+hMZ3sS:<\u0007K]8ek\u000e,'/\u0001\u0006hKRdun\u001a$jY\u0016$b!a5\u0002`\u0006\u0005\b\u0003BAk\u00037l!!a6\u000b\u0007\u0005e\u0007,\u0001\u0002j_&!\u0011Q\\Al\u0005\u00111\u0015\u000e\\3\t\r\u0005\u001d\u0007\u00051\u0001t\u0011\u001d\t\t\n\ta\u0001\u0003'\u000baaZ3u\u0019><GCBAt\u0003c\f\u0019\u0010\u0005\u0003\u0002j\u00065XBAAv\u0015\r\t9HM\u0005\u0005\u0003_\fYOA\u0006BEN$(/Y2u\u0019><\u0007BBAdC\u0001\u00071\u000fC\u0004\u0002\u0012\u0006\u0002\r!a%\u0002\r\t|WO\\2f)\rA\u0018\u0011 \u0005\u0007\u0003\u0003\u0013\u0003\u0019A:\u0002\u0015\u0015\u0004xn\u00195DC\u000eDW\r\u0006\u0003\u0002\u0000\n\u0015\u0001cA\"\u0003\u0002%\u0019!1\u0001\u0018\u0003)1+\u0017\rZ3s\u000bB|7\r\u001b$jY\u0016\u001c\u0015m\u00195f\u0011\u0019\t9m\ta\u0001g\u0006aA.\u0019;fgR\u0014VmY8sIRA!1\u0002B\r\u00057\u0011y\u0002\u0005\u0003\u0003\u000e\tUQB\u0001B\b\u0015\u0011\u0011\tBa\u0005\u0002\rI,7m\u001c:e\u0015\tI5*\u0003\u0003\u0003\u0018\t=!a\u0003*fG>\u0014HMQ1uG\"Da!! %\u0001\u0004\u0019\b\"\u0003B\u000fIA\u0005\t\u0019AAJ\u0003\u0019ygMZ:fi\"I\u0011\u0011\u0013\u0013\u0011\u0002\u0003\u0007\u00111S\u0001\u0017Y\u0006$Xm\u001d;SK\u000e|'\u000f\u001a\u0013eK\u001a\fW\u000f\u001c;%e\u00051B.\u0019;fgR\u0014VmY8sI\u0012\"WMZ1vYR$3'\u0001\u0005bo\u0006LG/S*S)\rA(\u0011\u0006\u0005\b\u0005W9\u0003\u0019\u0001B\u0017\u0003\t!\b\u000f\u0005\u0003\u00030\tERB\u0001B\n\u0013\u0011\u0011\u0019Da\u0005\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\u0006q1M]3bi\u0016\u0004&o\u001c3vG\u0016\u0014X#A:\u0002%\r\u0014X-\u0019;f\u0005J|7.\u001a:XSRD\u0017\n\u001a\u000b\u0006g\nu\"\u0011\t\u0005\b\u0005\u007fY\u0003\u0019AAJ\u0003\tIG\rC\u0005\u0003D-\u0002\n\u00111\u0001\u0003F\u0005YRM\\1cY\u0016,fn\u00197fC:dU-\u00193fe\u0016cWm\u0019;j_:\u00042\u0001\u0019B$\u0013\r\u0011I%\u0019\u0002\b\u0005>|G.Z1o\u0003q\u0019'/Z1uK\n\u0013xn[3s/&$\b.\u00133%I\u00164\u0017-\u001e7uII*\"Aa\u0014+\t\t\u0015\u0013q\u0014")
public class EpochDrivenReplicationProtocolAcceptanceTest
extends QuorumTestHarness {
    private final String topic;
    private final byte[] msg = new byte[1000];
    private final byte[] msgBigger = new byte[10000];
    private Seq<KafkaServer> brokers = null;
    private KafkaProducer<byte[], byte[]> producer = null;
    private KafkaConsumer<byte[], byte[]> consumer = null;

    @Override
    public MetadataVersion metadataVersion() {
        return MetadataVersion.latest();
    }

    public String topic() {
        return this.topic;
    }

    public byte[] msg() {
        return this.msg;
    }

    public byte[] msgBigger() {
        return this.msgBigger;
    }

    public Seq<KafkaServer> brokers() {
        return this.brokers;
    }

    public void brokers_$eq(Seq<KafkaServer> x$1) {
        this.brokers = x$1;
    }

    public KafkaProducer<byte[], byte[]> producer() {
        return this.producer;
    }

    public void producer_$eq(KafkaProducer<byte[], byte[]> x$1) {
        this.producer = x$1;
    }

    public KafkaConsumer<byte[], byte[]> consumer() {
        return this.consumer;
    }

    public void consumer_$eq(KafkaConsumer<byte[], byte[]> x$1) {
        this.consumer = x$1;
    }

    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        super.setUp(testInfo);
    }

    @Override
    @AfterEach
    public void tearDown() {
        this.producer().close();
        TestUtils$.MODULE$.shutdownServers(this.brokers(), TestUtils$.MODULE$.shutdownServers$default$2());
        super.tearDown();
    }

    @Test
    public void shouldFollowLeaderEpochBasicWorkflow() {
        this.brokers_$eq((Seq<KafkaServer>)((Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 101).map((Function1 & Serializable & scala.Serializable)x$1 -> this.createBrokerWithId(BoxesRunTime.unboxToInt((Object)x$1), this.createBrokerWithId$default$2()), IndexedSeq$.MODULE$.canBuildFrom())));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 101})))}))), this.brokers());
        this.producer_$eq(this.createProducer());
        TopicPartition tp = new TopicPartition(this.topic(), 0);
        this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg())).get();
        Assertions.assertEquals((int)0, (int)this.latestRecord(this.leader(), this.latestRecord$default$2(), this.latestRecord$default$3()).partitionLeaderEpoch());
        Assertions.assertEquals((int)0, (int)this.latestRecord(this.follower(), this.latestRecord$default$2(), this.latestRecord$default$3()).partitionLeaderEpoch());
        Assertions.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L)})), (Object)this.epochCache(this.leader()).epochEntries());
        Assertions.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L)})), (Object)this.epochCache(this.follower()).epochEntries());
        this.bounce(this.follower());
        this.awaitISR(tp);
        Assertions.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L)})), (Object)this.epochCache(this.leader()).epochEntries());
        Assertions.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L)})), (Object)this.epochCache(this.follower()).epochEntries());
        this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg())).get();
        Assertions.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L)})), (Object)this.epochCache(this.leader()).epochEntries());
        Assertions.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L)})), (Object)this.epochCache(this.follower()).epochEntries());
        Assertions.assertEquals((int)1, (int)this.latestRecord(this.leader(), this.latestRecord$default$2(), this.latestRecord$default$3()).partitionLeaderEpoch());
        Assertions.assertEquals((int)1, (int)this.latestRecord(this.follower(), this.latestRecord$default$2(), this.latestRecord$default$3()).partitionLeaderEpoch());
        this.bounce(this.leader());
        this.awaitISR(tp);
        Assertions.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L), new EpochEntry(2, 2L)})), (Object)this.epochCache(this.leader()).epochEntries());
        Assertions.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L)})), (Object)this.epochCache(this.follower()).epochEntries());
        this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg())).get();
        Assertions.assertEquals((int)2, (int)this.latestRecord(this.leader(), this.latestRecord$default$2(), this.latestRecord$default$3()).partitionLeaderEpoch());
        Assertions.assertEquals((int)2, (int)this.latestRecord(this.follower(), this.latestRecord$default$2(), this.latestRecord$default$3()).partitionLeaderEpoch());
        Assertions.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L), new EpochEntry(2, 2L)})), (Object)this.epochCache(this.leader()).epochEntries());
        Assertions.assertEquals((Object)ListBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new EpochEntry[]{new EpochEntry(0, 0L), new EpochEntry(1, 1L), new EpochEntry(2, 2L)})), (Object)this.epochCache(this.follower()).epochEntries());
    }

    @Test
    public void shouldNotAllowDivergentLogs() {
        this.brokers_$eq((Seq<KafkaServer>)((Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 101).map((Function1 & Serializable & scala.Serializable)id -> TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(BoxesRunTime.unboxToInt((Object)id), this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20())), TestUtils$.MODULE$.createServer$default$2()), IndexedSeq$.MODULE$.canBuildFrom())));
        KafkaServer broker100 = (KafkaServer)this.brokers().apply(0);
        KafkaServer broker101 = (KafkaServer)this.brokers().apply(1);
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 101})))}))), this.brokers());
        this.producer_$eq(this.createProducer());
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach((Function1 & Serializable & scala.Serializable)i -> EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$shouldNotAllowDivergentLogs$2(this, BoxesRunTime.unboxToInt((Object)i)));
        broker101.shutdown();
        broker100.shutdown();
        new File((String)broker100.config().logDirs().head(), LogLoader$.MODULE$.CleanShutdownFile()).delete();
        this.deleteMessagesFromLogFile(5 * this.msg().length, broker100, 0);
        broker100.startup();
        this.producer().close();
        this.producer_$eq(this.createProducer());
        ((IterableLike)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(11), 20).map((Function1 & Serializable & scala.Serializable)i -> EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$shouldNotAllowDivergentLogs$3(this, BoxesRunTime.unboxToInt((Object)i)), IndexedSeq$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x$2 -> (RecordMetadata)x$2.get());
        broker101.startup();
        this.waitForLogsToMatch(broker100, broker101, this.waitForLogsToMatch$default$3());
        Assertions.assertEquals((long)this.getLogFile((KafkaServer)this.brokers().apply(0), 0).length(), (long)this.getLogFile((KafkaServer)this.brokers().apply(1), 0).length(), (String)"Log files should match Broker0 vs Broker 1");
    }

    @Test
    public void offsetsShouldNotGoBackwards() {
        this.brokers_$eq((Seq<KafkaServer>)((Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 101).map((Function1 & Serializable & scala.Serializable)x$3 -> this.createBrokerWithId(BoxesRunTime.unboxToInt((Object)x$3), this.createBrokerWithId$default$2()), IndexedSeq$.MODULE$.canBuildFrom())));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 101})))}))), this.brokers());
        this.producer_$eq(this.createBufferingProducer());
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 100).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)x$4 -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg()));
            this.producer().flush();
        });
        this.brokers().foreach((Function1 & Serializable & scala.Serializable)b -> {
            b.shutdown();
            return BoxedUnit.UNIT;
        });
        new File((String)((KafkaServer)this.brokers().apply(0)).config().logDirs().apply(0), LogLoader$.MODULE$.CleanShutdownFile()).delete();
        this.deleteMessagesFromLogFile(this.getLogFile((KafkaServer)this.brokers().apply(0), 0).length() / 2L, (KafkaServer)this.brokers().apply(0), 0);
        ((KafkaServer)this.brokers().apply(0)).startup();
        this.producer().close();
        this.producer_$eq(this.createBufferingProducer());
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 77).foreach((Function1 & Serializable & scala.Serializable)x$5 -> EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$offsetsShouldNotGoBackwards$4(this, BoxesRunTime.unboxToInt((Object)x$5)));
        this.producer().flush();
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 77).foreach((Function1 & Serializable & scala.Serializable)x$6 -> EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$offsetsShouldNotGoBackwards$5(this, BoxesRunTime.unboxToInt((Object)x$6)));
        this.producer().flush();
        this.printSegments();
        ((KafkaServer)this.brokers().apply(1)).startup();
        this.waitForLogsToMatch((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1), this.waitForLogsToMatch$default$3());
        this.printSegments();
        ((KafkaServer)this.brokers().apply(0)).shutdown();
        this.startConsumer();
        Seq<ConsumerRecord<byte[], byte[]>> records = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(this.consumer(), 100, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
        LongRef prevOffset = LongRef.create((long)-1L);
        records.foreach((Function1 & Serializable & scala.Serializable)r -> {
            EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$offsetsShouldNotGoBackwards$6(prevOffset, r);
            return BoxedUnit.UNIT;
        });
        Assertions.assertEquals((long)this.getLogFile((KafkaServer)this.brokers().apply(0), 0).length(), (long)this.getLogFile((KafkaServer)this.brokers().apply(1), 0).length(), (String)"Log files should match Broker0 vs Broker 1");
    }

    @Test
    public void shouldSurviveFastLeaderChange() {
        TopicPartition tp = new TopicPartition(this.topic(), 0);
        this.brokers_$eq((Seq<KafkaServer>)((Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 101).map((Function1 & Serializable & scala.Serializable)x$7 -> this.createBrokerWithId(BoxesRunTime.unboxToInt((Object)x$7), this.createBrokerWithId$default$2()), IndexedSeq$.MODULE$.canBuildFrom())));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 101})))}))), this.brokers());
        this.producer_$eq(this.createProducer());
        this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg())).get();
        IntRef messagesWritten = IntRef.create((int)1);
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 5).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            int leaderId = BoxesRunTime.unboxToInt((Object)this.zkClient().getLeaderForPartition(new TopicPartition(this.topic(), 0)).get());
            KafkaServer leader = (KafkaServer)((SeqLike)this.brokers().filter((Function1 & Serializable & scala.Serializable)x$8 -> BoxesRunTime.boxToBoolean((boolean)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$shouldSurviveFastLeaderChange$3(leaderId, x$8)))).apply(0);
            KafkaServer follower = (KafkaServer)((SeqLike)this.brokers().filter((Function1 & Serializable & scala.Serializable)x$9 -> BoxesRunTime.boxToBoolean((boolean)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$shouldSurviveFastLeaderChange$4(leaderId, x$9)))).apply(0);
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg())).get();
            ++messagesWritten$1.elem;
            this.bounce(follower);
            this.log(leader, follower);
            this.awaitISR(tp);
            this.bounce(leader);
            this.log(leader, follower);
            this.awaitISR(tp);
            Assertions.assertTrue((boolean)this.brokers().forall((Function1 & Serializable & scala.Serializable)broker -> BoxesRunTime.boxToBoolean((boolean)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$shouldSurviveFastLeaderChange$5(this, messagesWritten, broker))));
        });
    }

    @Test
    public void logsShouldNotDivergeOnUncleanLeaderElections() {
        this.brokers_$eq((Seq<KafkaServer>)((Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 101).map((Function1 & Serializable & scala.Serializable)x$10 -> this.createBrokerWithId(BoxesRunTime.unboxToInt((Object)x$10), true), IndexedSeq$.MODULE$.canBuildFrom())));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 101})))}))), this.brokers(), CoreUtils$.MODULE$.propsWith((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)KafkaConfig$.MODULE$.MinInSyncReplicasProp(), (Object)"1")})));
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers()), 1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15(), TestUtils$.MODULE$.createProducer$default$16()));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg()));
            this.producer().flush();
        });
        this.waitForLogsToMatch((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1), this.waitForLogsToMatch$default$3());
        ((KafkaServer)this.brokers().apply(0)).shutdown();
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg()));
            this.producer().flush();
        });
        ((KafkaServer)this.brokers().apply(1)).shutdown();
        ((KafkaServer)this.brokers().apply(0)).startup();
        this.producer().close();
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers()), 1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15(), TestUtils$.MODULE$.createProducer$default$16()));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msgBigger()));
            this.producer().flush();
        });
        ((KafkaServer)this.brokers().apply(0)).shutdown();
        ((KafkaServer)this.brokers().apply(1)).startup();
        this.producer().close();
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers()), 1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15(), TestUtils$.MODULE$.createProducer$default$16()));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg()));
            this.producer().flush();
        });
        ((KafkaServer)this.brokers().apply(1)).shutdown();
        ((KafkaServer)this.brokers().apply(0)).startup();
        this.producer().close();
        this.producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers()), 1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15(), TestUtils$.MODULE$.createProducer$default$16()));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            this.producer().send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msgBigger()));
            this.producer().flush();
        });
        this.printSegments();
        ((KafkaServer)this.brokers().apply(1)).startup();
        this.waitForLogsToMatch((KafkaServer)this.brokers().apply(0), (KafkaServer)this.brokers().apply(1), this.waitForLogsToMatch$default$3());
        this.printSegments();
        Seq seq = this.crcSeq$1((KafkaServer)this.brokers().apply(0), EpochDrivenReplicationProtocolAcceptanceTest.crcSeq$default$2$1());
        Seq seq2 = this.crcSeq$1((KafkaServer)this.brokers().apply(1), EpochDrivenReplicationProtocolAcceptanceTest.crcSeq$default$2$1());
        Assertions.assertTrue((!(seq != null ? !seq.equals(seq2) : seq2 != null) ? 1 : 0) != 0, (String)"Logs on Broker 100 and Broker 101 should match");
    }

    private void log(KafkaServer leader, KafkaServer follower) {
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(29).append("Bounce complete for follower ").append(follower.config().brokerId()).toString());
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(21).append("Leader: leo").append(leader.config().brokerId()).append(": ").append(this.getLog(leader, 0).logEndOffset()).append(" cache: ").append(this.epochCache(leader).epochEntries()).toString());
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(23).append("Follower: leo").append(follower.config().brokerId()).append(": ").append(this.getLog(follower, 0).logEndOffset()).append(" cache: ").append(this.epochCache(follower).epochEntries()).toString());
    }

    /*
     * WARNING - void declaration
     */
    private void waitForLogsToMatch(KafkaServer b1, KafkaServer b2, int partition) {
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$waitForLogsToMatch$1(this, b1, partition, b2)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$waitForLogsToMatch$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    private int waitForLogsToMatch$default$3() {
        return 0;
    }

    private void printSegments() {
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Broker0:");
        DumpLogSegments$.MODULE$.main((String[])new .colon.colon((Object)"--files", (List)new .colon.colon((Object)this.getLogFile((KafkaServer)this.brokers().apply(0), 0).getCanonicalPath(), (List)Nil$.MODULE$)).toArray(ClassTag$.MODULE$.apply(String.class)));
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Broker1:");
        DumpLogSegments$.MODULE$.main((String[])new .colon.colon((Object)"--files", (List)new .colon.colon((Object)this.getLogFile((KafkaServer)this.brokers().apply(1), 0).getCanonicalPath(), (List)Nil$.MODULE$)).toArray(ClassTag$.MODULE$.apply(String.class)));
    }

    private KafkaConsumer<byte[], byte[]> startConsumer() {
        Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers()));
        consumerConfig.put("fetch.max.bytes", String.valueOf(this.getLogFile((KafkaServer)this.brokers().apply(1), 0).length() * 2L));
        consumerConfig.put("max.partition.fetch.bytes", String.valueOf(this.getLogFile((KafkaServer)this.brokers().apply(1), 0).length() * 2L));
        this.consumer_$eq((KafkaConsumer<byte[], byte[]>)new KafkaConsumer(consumerConfig, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()));
        this.consumer().assign((Collection)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)new TopicPartition(this.topic(), 0), (List)Nil$.MODULE$)).asJava());
        this.consumer().seek(new TopicPartition(this.topic(), 0), 0L);
        return this.consumer();
    }

    private void deleteMessagesFromLogFile(long bytes, KafkaServer broker, int partitionId) {
        File logFile = this.getLogFile(broker, partitionId);
        RandomAccessFile writable = new RandomAccessFile(logFile, "rwd");
        writable.setLength(logFile.length() - bytes);
        writable.close();
    }

    private KafkaProducer<byte[], byte[]> createBufferingProducer() {
        String x$1 = TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers());
        int x$2 = -1;
        int x$3 = 10000;
        int x$4 = this.msg().length * 1000;
        String x$5 = "snappy";
        long x$6 = TestUtils$.MODULE$.createProducer$default$3();
        long x$7 = TestUtils$.MODULE$.createProducer$default$4();
        int x$8 = TestUtils$.MODULE$.createProducer$default$5();
        int x$9 = TestUtils$.MODULE$.createProducer$default$6();
        int x$10 = TestUtils$.MODULE$.createProducer$default$10();
        SecurityProtocol x$11 = TestUtils$.MODULE$.createProducer$default$11();
        Option<File> x$12 = TestUtils$.MODULE$.createProducer$default$12();
        Option<Properties> x$13 = TestUtils$.MODULE$.createProducer$default$13();
        ByteArraySerializer x$14 = TestUtils$.MODULE$.createProducer$default$14();
        ByteArraySerializer x$15 = TestUtils$.MODULE$.createProducer$default$15();
        boolean x$16 = TestUtils$.MODULE$.createProducer$default$16();
        return TestUtils$.MODULE$.createProducer(x$1, x$2, x$6, x$7, x$8, x$9, x$3, x$4, x$5, x$10, x$11, x$12, x$13, x$14, x$15, x$16);
    }

    private File getLogFile(KafkaServer broker, int partition) {
        AbstractLog log = this.getLog(broker, partition);
        log.flush(false);
        return ((File[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])log.dir().listFiles())).filter((Function1 & Serializable & scala.Serializable)x$12 -> BoxesRunTime.boxToBoolean((boolean)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$getLogFile$1(x$12))))[0];
    }

    private AbstractLog getLog(KafkaServer broker, int partition) {
        LogManager qual$1 = broker.logManager();
        TopicPartition x$1 = new TopicPartition(this.topic(), partition);
        boolean x$2 = qual$1.getLog$default$2();
        return (AbstractLog)qual$1.getLog(x$1, x$2).orNull(Predef$.MODULE$.$conforms());
    }

    private void bounce(KafkaServer follower) {
        follower.shutdown();
        follower.startup();
        this.producer().close();
        this.producer_$eq(this.createProducer());
    }

    private LeaderEpochFileCache epochCache(KafkaServer broker) {
        return (LeaderEpochFileCache)this.getLog(broker, 0).leaderEpochCache().get();
    }

    private RecordBatch latestRecord(KafkaServer leader, int offset, int partition) {
        LogSegment qual$1 = this.getLog(leader, partition).activeSegment();
        long x$1 = 0L;
        int x$2 = Integer.MAX_VALUE;
        long x$3 = qual$1.read$default$3();
        boolean x$4 = qual$1.read$default$4();
        return (RecordBatch)((TraversableOnce)CollectionConverters$.MODULE$.iterableAsScalaIterableConverter(qual$1.read(x$1, x$2, x$3, x$4).records().batches()).asScala()).toSeq().last();
    }

    private int latestRecord$default$2() {
        return -1;
    }

    private int latestRecord$default$3() {
        return 0;
    }

    /*
     * WARNING - void declaration
     */
    private void awaitISR(TopicPartition tp) {
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$awaitISR$1(this, tp)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$awaitISR$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    private KafkaProducer<byte[], byte[]> createProducer() {
        return TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.plaintextBootstrapServers(this.brokers()), -1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15(), TestUtils$.MODULE$.createProducer$default$16());
    }

    private KafkaServer leader() {
        Assertions.assertEquals((int)2, (int)this.brokers().size());
        int leaderId = BoxesRunTime.unboxToInt((Object)this.zkClient().getLeaderForPartition(new TopicPartition(this.topic(), 0)).get());
        return (KafkaServer)((IterableLike)this.brokers().filter((Function1 & Serializable & scala.Serializable)x$13 -> BoxesRunTime.boxToBoolean((boolean)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$leader$1(leaderId, x$13)))).head();
    }

    private KafkaServer follower() {
        Assertions.assertEquals((int)2, (int)this.brokers().size());
        int leader = BoxesRunTime.unboxToInt((Object)this.zkClient().getLeaderForPartition(new TopicPartition(this.topic(), 0)).get());
        return (KafkaServer)((IterableLike)this.brokers().filter((Function1 & Serializable & scala.Serializable)x$14 -> BoxesRunTime.boxToBoolean((boolean)EpochDrivenReplicationProtocolAcceptanceTest.$anonfun$follower$1(leader, x$14)))).head();
    }

    private KafkaServer createBrokerWithId(int id, boolean enableUncleanLeaderElection) {
        Properties config = TestUtils$.MODULE$.createBrokerConfig(id, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        TestUtils$.MODULE$.setIbpAndMessageFormatVersions(config, this.metadataVersion());
        config.setProperty(KafkaConfig$.MODULE$.UncleanLeaderElectionEnableProp(), Boolean.toString(enableUncleanLeaderElection));
        return TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(config), TestUtils$.MODULE$.createServer$default$2());
    }

    private boolean createBrokerWithId$default$2() {
        return false;
    }

    public static final /* synthetic */ RecordMetadata $anonfun$shouldNotAllowDivergentLogs$2(EpochDrivenReplicationProtocolAcceptanceTest $this, int i) {
        return (RecordMetadata)$this.producer().send(new ProducerRecord($this.topic(), Predef$.MODULE$.int2Integer(0), (Object)String.valueOf(BoxesRunTime.boxToInteger((int)i)).getBytes(), (Object)$this.msg())).get();
    }

    public static final /* synthetic */ Future $anonfun$shouldNotAllowDivergentLogs$3(EpochDrivenReplicationProtocolAcceptanceTest $this, int i) {
        return $this.producer().send(new ProducerRecord($this.topic(), Predef$.MODULE$.int2Integer(0), (Object)String.valueOf(BoxesRunTime.boxToInteger((int)i)).getBytes(), (Object)$this.msg()));
    }

    public static final /* synthetic */ Future $anonfun$offsetsShouldNotGoBackwards$4(EpochDrivenReplicationProtocolAcceptanceTest $this, int x$5) {
        return $this.producer().send(new ProducerRecord($this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)$this.msg()));
    }

    public static final /* synthetic */ Future $anonfun$offsetsShouldNotGoBackwards$5(EpochDrivenReplicationProtocolAcceptanceTest $this, int x$6) {
        return $this.producer().send(new ProducerRecord($this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)$this.msg()));
    }

    public static final /* synthetic */ void $anonfun$offsetsShouldNotGoBackwards$6(LongRef prevOffset$1, ConsumerRecord r) {
        Assertions.assertTrue((r.offset() > prevOffset$1.elem ? 1 : 0) != 0, (String)new StringBuilder(21).append("Offset ").append(prevOffset$1.elem).append(" came before ").append(r.offset()).append(" ").toString());
        prevOffset$1.elem = r.offset();
    }

    public static final /* synthetic */ boolean $anonfun$shouldSurviveFastLeaderChange$3(int leaderId$1, KafkaServer x$8) {
        return x$8.config().brokerId() == leaderId$1;
    }

    public static final /* synthetic */ boolean $anonfun$shouldSurviveFastLeaderChange$4(int leaderId$1, KafkaServer x$9) {
        return x$9.config().brokerId() != leaderId$1;
    }

    public static final /* synthetic */ boolean $anonfun$shouldSurviveFastLeaderChange$5(EpochDrivenReplicationProtocolAcceptanceTest $this, IntRef messagesWritten$1, KafkaServer broker) {
        return $this.getLog(broker, 0).logEndOffset() == (long)messagesWritten$1.elem;
    }

    private final Seq crcSeq$1(KafkaServer broker, int partition) {
        LogSegment qual$1 = this.getLog(broker, partition).activeSegment();
        long x$1 = 0L;
        int x$2 = Integer.MAX_VALUE;
        long x$3 = qual$1.read$default$3();
        boolean x$4 = qual$1.read$default$4();
        return (Seq)((TraversableOnce)CollectionConverters$.MODULE$.iterableAsScalaIterableConverter(qual$1.read(x$1, x$2, x$3, x$4).records().batches()).asScala()).toSeq().map((Function1 & Serializable & scala.Serializable)x$11 -> BoxesRunTime.boxToLong((long)x$11.checksum()), Seq$.MODULE$.canBuildFrom());
    }

    private static final int crcSeq$default$2$1() {
        return 0;
    }

    public static final /* synthetic */ boolean $anonfun$waitForLogsToMatch$1(EpochDrivenReplicationProtocolAcceptanceTest $this, KafkaServer b1$1, int partition$1, KafkaServer b2$1) {
        return $this.getLog(b1$1, partition$1).logEndOffset() == $this.getLog(b2$1, partition$1).logEndOffset();
    }

    public static final /* synthetic */ String $anonfun$waitForLogsToMatch$2() {
        return "Logs didn't match.";
    }

    public static final /* synthetic */ boolean $anonfun$getLogFile$1(File x$12) {
        return x$12.getName().endsWith(".log");
    }

    public static final /* synthetic */ boolean $anonfun$awaitISR$1(EpochDrivenReplicationProtocolAcceptanceTest $this, TopicPartition tp$2) {
        return ((Partition)$this.leader().replicaManager().onlinePartition(tp$2).get()).inSyncReplicaIds().size() == 2;
    }

    public static final /* synthetic */ String $anonfun$awaitISR$2() {
        return "Timed out waiting for replicas to join ISR";
    }

    public static final /* synthetic */ boolean $anonfun$leader$1(int leaderId$2, KafkaServer x$13) {
        return x$13.config().brokerId() == leaderId$2;
    }

    public static final /* synthetic */ boolean $anonfun$follower$1(int leader$2, KafkaServer x$14) {
        return x$14.config().brokerId() != leader$2;
    }

    public EpochDrivenReplicationProtocolAcceptanceTest() {
        this.topic = "topic1";
    }
}

