/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.util.Optional;
import java.util.Properties;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.Log;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.BlockingSend;
import kafka.server.FailedPartitions;
import kafka.server.Fetching$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.OffsetAndEpoch;
import kafka.server.PartitionFetchState;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaAlterLogDirsManager;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.ReplicaState;
import kafka.server.ReplicationQuotaManager;
import kafka.server.Truncating$;
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.Set;
import scala.collection.immutable.List$;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005\rd\u0001B\u0001\u0003\u0001\u001d\u0011\u0001DU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$G+Z:u\u0015\t\u0019A!\u0001\u0004tKJ4XM\u001d\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\t!\tIA\"D\u0001\u000b\u0015\u0005Y\u0011!B:dC2\f\u0017BA\u0007\u000b\u0005\u0019\te.\u001f*fM\")q\u0002\u0001C\u0001!\u00051A(\u001b8jiz\"\u0012!\u0005\t\u0003%\u0001i\u0011A\u0001\u0005\b)\u0001\u0011\r\u0011\"\u0003\u0016\u0003\u0011!\u0018\u0007\u001d\u0019\u0016\u0003Y\u0001\"aF\u0010\u000e\u0003aQ!!\u0007\u000e\u0002\r\r|W.\\8o\u0015\t)1D\u0003\u0002\u001d;\u00051\u0011\r]1dQ\u0016T\u0011AH\u0001\u0004_J<\u0017B\u0001\u0011\u0019\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:DaA\t\u0001!\u0002\u00131\u0012!\u0002;2aB\u0002\u0003b\u0002\u0013\u0001\u0005\u0004%I!F\u0001\u0005iF\u0002\u0018\u0007\u0003\u0004'\u0001\u0001\u0006IAF\u0001\u0006iF\u0002\u0018\u0007\t\u0005\bQ\u0001\u0011\r\u0011\"\u0003\u0016\u0003\u0011!(\u0007]\u0019\t\r)\u0002\u0001\u0015!\u0003\u0017\u0003\u0015!(\u0007]\u0019!\u0011\u001da\u0003A1A\u0005\n5\naB\u0019:pW\u0016\u0014XI\u001c3Q_&tG/F\u0001/!\ty#'D\u00011\u0015\t\tD!A\u0004dYV\u001cH/\u001a:\n\u0005M\u0002$A\u0004\"s_.,'/\u00128e!>Lg\u000e\u001e\u0005\u0007k\u0001\u0001\u000b\u0011\u0002\u0018\u0002\u001f\t\u0014xn[3s\u000b:$\u0007k\\5oi\u0002Bqa\u000e\u0001C\u0002\u0013%\u0001(\u0001\tgC&dW\r\u001a)beRLG/[8ogV\t\u0011\b\u0005\u0002\u0013u%\u00111H\u0001\u0002\u0011\r\u0006LG.\u001a3QCJ$\u0018\u000e^5p]NDa!\u0010\u0001!\u0002\u0013I\u0014!\u00054bS2,G\rU1si&$\u0018n\u001c8tA!)q\b\u0001C\u0005\u0001\u0006qqN\u001a4tKR\fe\u000eZ#q_\u000eDGcA!E\u0013B\u0011!CQ\u0005\u0003\u0007\n\u0011ab\u00144gg\u0016$\u0018I\u001c3Fa>\u001c\u0007\u000eC\u0003F}\u0001\u0007a)A\u0006gKR\u001c\u0007n\u00144gg\u0016$\bCA\u0005H\u0013\tA%B\u0001\u0003M_:<\u0007b\u0002&?!\u0003\u0005\raS\u0001\fY\u0016\fG-\u001a:Fa>\u001c\u0007\u000e\u0005\u0002\n\u0019&\u0011QJ\u0003\u0002\u0004\u0013:$\b\"B(\u0001\t\u0003\u0001\u0016\u0001K:i_VdGmU3oI2\u000bG/Z:u%\u0016\fX/Z:u-\u0016\u00148/[8og\nKH)\u001a4bk2$H#A)\u0011\u0005%\u0011\u0016BA*\u000b\u0005\u0011)f.\u001b;)\u00059+\u0006C\u0001,Z\u001b\u00059&B\u0001-\u001e\u0003\u0015QWO\\5u\u0013\tQvK\u0001\u0003UKN$\b\"\u0002/\u0001\t\u0003\u0001\u0016\u0001Q:i_VdGMR3uG\"dU-\u00193fe\u0016\u0003xn\u00195SKF,Xm\u001d;JM2\u000b7\u000f^#q_\u000eDG)\u001a4j]\u0016$gi\u001c:T_6,\u0007+\u0019:uSRLwN\\:)\u0005m+\u0006\"B0\u0001\t\u0003\u0001\u0017!F1tg\u0016\u0014H\u000fU1si&$\u0018n\u001c8Ti\u0006$Xm\u001d\u000b\u0006#\u000647.\u001c\u0005\u0006Ez\u0003\raY\u0001\bM\u0016$8\r[3s!\t\u0011B-\u0003\u0002f\u0005\t)\u0012IY:ue\u0006\u001cGOR3uG\",'\u000f\u00165sK\u0006$\u0007\"B4_\u0001\u0004A\u0017!F:i_VdGMQ3SK\u0006$\u0017PR8s\r\u0016$8\r\u001b\t\u0003\u0013%L!A\u001b\u0006\u0003\u000f\t{w\u000e\\3b]\")AN\u0018a\u0001Q\u0006)2\u000f[8vY\u0012\u0014U\r\u0016:v]\u000e\fG/\u001b8h\u0019><\u0007\"\u00028_\u0001\u0004A\u0017aD:i_VdGMQ3EK2\f\u00170\u001a3\t\u000bA\u0004A\u0011\u0001)\u0002KMDw.\u001e7e\u0011\u0006tG\r\\3Fq\u000e,\u0007\u000f^5p]\u001a\u0013x.\u001c\"m_\u000e\\\u0017N\\4TK:$\u0007FA8V\u0011\u0015\u0019\b\u0001\"\u0001Q\u0003y\u001a\bn\\;mI\u001a+Go\u00195MK\u0006$WM]#q_\u000eDwJ\u001c$jeN$h)\u001a;dQ>sG._%g\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00138po:$vNQ8uQ\"\u0012!/\u0016\u0005\u0006m\u0002!\t\u0001U\u00015g\"|W\u000f\u001c3UeVt7-\u0019;f)>|eMZ:fiN\u0003XmY5gS\u0016$\u0017J\\#q_\u000eDwJ\u001a4tKR\u0014Vm\u001d9p]N,\u0007FA;V\u0011\u0015I\b\u0001\"\u0001Q\u00035\u001b\bn\\;mIR\u0013XO\\2bi\u0016$vn\u00144gg\u0016$8\u000b]3dS\u001aLW\rZ%o\u000bB|7\r[(gMN,GOU3ta>t7/Z%g\r>dGn\\<fe\"\u000b7OT8N_J,W\t]8dQND#\u0001_+\t\u000bq\u0004A\u0011\u0001)\u0002\u0015NDw.\u001e7e\r\u0016$8\r\u001b'fC\u0012,'/\u00129pG\"\u001cVmY8oIRKW.Z%g\u0019\u0016\fG-\u001a:SKBd\u0017.Z:XSRDW\t]8dQ:{Go\u00138po:$vNR8mY><XM\u001d\u0015\u0003wVCQa \u0001\u0005\u0002A\u000b1g\u001d5pk2$Wk]3MK\u0006$WM]#oI>3gm]3u\u0013\u001aLe\u000e^3s\u0005J|7.\u001a:WKJ\u001c\u0018n\u001c8CK2|wO\r\u0019)\u0005y,\u0006BBA\u0003\u0001\u0011\u0005\u0001+\u0001!tQ>,H\u000e\u001a+sk:\u001c\u0017\r^3U_&s\u0017\u000e^5bY\u001a+Go\u00195PM\u001a\u001cX\r^%g\u0019\u0016\fG-\u001a:SKR,(O\\:V]\u0012,g-\u001b8fI>3gm]3uQ\r\t\u0019!\u0016\u0005\u0007\u0003\u0017\u0001A\u0011\u0001)\u0002cMDw.\u001e7e!>dG.\u00138eK\u001aLg.\u001b;fYfLe\rT3bI\u0016\u0014(+\u001a;ve:\u001c\u0018I\\=Fq\u000e,\u0007\u000f^5p]\"\u001a\u0011\u0011B+\t\r\u0005E\u0001\u0001\"\u0001Q\u0003-\u001a\bn\\;mI6{g/\u001a)beRLG/[8og>+Ho\u00144UeVt7-\u0019;j]\u001edunZ*uCR,\u0007fAA\b+\"1\u0011q\u0003\u0001\u0005\u0002A\u000b\u0001h\u001d5pk2$g)\u001b7uKJ\u0004\u0016M\u001d;ji&|gn]'bI\u0016dU-\u00193fe\u0012+(/\u001b8h\u0019\u0016\fG-\u001a:Fa>\u001c\u0007NU3rk\u0016\u001cH\u000fK\u0002\u0002\u0016UCa!!\b\u0001\t\u0003\u0001\u0016\u0001S:i_VdGmQ1uG\",\u0005pY3qi&|gN\u0012:p[\ncwnY6j]\u001e\u001cVM\u001c3XQ\u0016t7\u000b[;ui&tw\rR8x]J+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1eQ\r\tY\"\u0016\u0005\b\u0003G\u0001A\u0011AA\u0013\u0003\u0011\u0019H/\u001e2\u0015\u000fE\u000b9#!\r\u0002<!A\u0011\u0011FA\u0011\u0001\u0004\tY#A\u0005qCJ$\u0018\u000e^5p]B\u0019q&!\f\n\u0007\u0005=\u0002GA\u0005QCJ$\u0018\u000e^5p]\"A\u00111GA\u0011\u0001\u0004\t)$\u0001\bsKBd\u0017nY1NC:\fw-\u001a:\u0011\u0007I\t9$C\u0002\u0002:\t\u0011aBU3qY&\u001c\u0017-T1oC\u001e,'\u000f\u0003\u0005\u0002>\u0005\u0005\u0002\u0019AA \u0003\rawn\u001a\t\u0005\u0003\u0003\n)%\u0004\u0002\u0002D)\u0019\u0011Q\b\u0003\n\t\u0005\u001d\u00131\t\u0002\u0004\u0019><\u0007\"CA&\u0001E\u0005I\u0011BA'\u0003aygMZ:fi\u0006sG-\u00129pG\"$C-\u001a4bk2$HEM\u000b\u0003\u0003\u001fR3aSA)W\t\t\u0019\u0006\u0005\u0003\u0002V\u0005}SBAA,\u0015\u0011\tI&a\u0017\u0002\u0013Ut7\r[3dW\u0016$'bAA/\u0015\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005\u0005\u0014q\u000b\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0007")
public class ReplicaFetcherThreadTest {
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);
    private final FailedPartitions failedPartitions = new FailedPartitions();

    private TopicPartition t1p0() {
        return this.t1p0;
    }

    private TopicPartition t1p1() {
        return this.t1p1;
    }

    private TopicPartition t2p1() {
        return this.t2p1;
    }

    private BrokerEndPoint brokerEndPoint() {
        return this.brokerEndPoint;
    }

    private FailedPartitions failedPartitions() {
        return this.failedPartitions;
    }

    private OffsetAndEpoch offsetAndEpoch(long fetchOffset, int leaderEpoch) {
        return new OffsetAndEpoch(fetchOffset, leaderEpoch);
    }

    private int offsetAndEpoch$default$2() {
        return 1;
    }

    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), null, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$, (Option)None$.MODULE$);
        Assert.assertEquals((long)ApiKeys.FETCH.latestVersion(), (long)thread.fetchRequestVersion());
        Assert.assertEquals((long)ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), (long)thread.offsetForLeaderEpochRequestVersion());
        Assert.assertEquals((long)ApiKeys.LIST_OFFSETS.latestVersion(), (long)thread.listOffsetRequestVersion());
    }

    @Test
    public void shouldFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log2 = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)0L)).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)0L)).anyTimes();
        EasyMock.expect((Object)log2.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).once();
        EasyMock.expect((Object)log2.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).once();
        EasyMock.expect((Object)log2.latestEpoch()).andReturn((Object)None$.MODULE$).once();
        EasyMock.expect((Object)log2.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(partition, replicaManager, log2);
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).times(3);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log2});
        java.util.Map offsets = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        this.assertPartitionStates((AbstractFetcherThread)thread, false, true, false);
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)1L, (long)mockNetwork.fetchCount());
        this.assertPartitionStates((AbstractFetcherThread)thread, true, false, false);
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)2L, (long)mockNetwork.fetchCount());
        this.assertPartitionStates((AbstractFetcherThread)thread, true, false, false);
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)3L, (long)mockNetwork.fetchCount());
        this.assertPartitionStates((AbstractFetcherThread)thread, true, false, false);
        EasyMock.verify((Object[])new Object[]{logManager});
    }

    public void assertPartitionStates(AbstractFetcherThread fetcher, boolean shouldBeReadyForFetch, boolean shouldBeTruncatingLog, boolean shouldBeDelayed) {
        List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.t1p0(), this.t1p1(), this.t2p1()})).foreach((Function1)new Serializable(this, fetcher, shouldBeReadyForFetch, shouldBeTruncatingLog, shouldBeDelayed){
            public static final long serialVersionUID = 0L;
            private final AbstractFetcherThread fetcher$1;
            private final boolean shouldBeReadyForFetch$1;
            private final boolean shouldBeTruncatingLog$1;
            private final boolean shouldBeDelayed$1;

            public final void apply(TopicPartition tp) {
                Assert.assertTrue((boolean)this.fetcher$1.fetchState(tp).isDefined());
                PartitionFetchState fetchState = (PartitionFetchState)this.fetcher$1.fetchState(tp).get();
                Assert.assertEquals((String)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Partition ", " should", " be ready for fetching"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{tp, this.shouldBeReadyForFetch$1 ? "" : " NOT"})), (Object)BoxesRunTime.boxToBoolean((boolean)this.shouldBeReadyForFetch$1), (Object)BoxesRunTime.boxToBoolean((boolean)fetchState.isReadyForFetch()));
                Assert.assertEquals((String)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Partition ", " should", " be truncating its log"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{tp, this.shouldBeTruncatingLog$1 ? "" : " NOT"})), (Object)BoxesRunTime.boxToBoolean((boolean)this.shouldBeTruncatingLog$1), (Object)BoxesRunTime.boxToBoolean((boolean)fetchState.isTruncating()));
                Assert.assertEquals((String)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Partition ", " should", " be delayed"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{tp, this.shouldBeDelayed$1 ? "" : " NOT"})), (Object)BoxesRunTime.boxToBoolean((boolean)this.shouldBeDelayed$1), (Object)BoxesRunTime.boxToBoolean((boolean)fetchState.isDelayed()));
            }
            {
                this.fetcher$1 = fetcher$1;
                this.shouldBeReadyForFetch$1 = shouldBeReadyForFetch$1;
                this.shouldBeTruncatingLog$1 = shouldBeTruncatingLog$1;
                this.shouldBeDelayed$1 = shouldBeDelayed$1;
            }
        });
    }

    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        BlockingSend mockBlockingSend = (BlockingSend)EasyMock.createMock(BlockingSend.class);
        EasyMock.expect((Object)mockBlockingSend.sendRequest((AbstractRequest.Builder)EasyMock.anyObject())).andThrow((Throwable)new NullPointerException()).once();
        EasyMock.replay((Object[])new Object[]{mockBlockingSend});
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), null, new Metrics(), (Time)new SystemTime(), null, (Option)new Some((Object)mockBlockingSend));
        Map result = thread.fetchEpochEndOffsets((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 0))})));
        Map expected = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}));
        Assert.assertEquals((String)"results from leader epoch request should have undefined offset", (Object)expected, (Object)result);
        EasyMock.verify((Object[])new Object[]{mockBlockingSend});
    }

    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBoth() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log2 = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)0L)).anyTimes();
        EasyMock.expect((Object)log2.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)log2.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(partition, replicaManager, log2);
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).times(2);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, partition, log2});
        java.util.Map offsets = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)1L, (long)mockNetwork.fetchCount());
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)2L, (long)mockNetwork.fetchCount());
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)3L, (long)mockNetwork.fetchCount());
        EasyMock.verify((Object[])new Object[]{logManager});
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Seq configs = (Seq)TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14(), TestUtils$.MODULE$.createBrokerConfigs$default$15(), TestUtils$.MODULE$.createBrokerConfigs$default$16()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final KafkaConfig apply(Properties props) {
                return KafkaConfig$.MODULE$.fromProps(props);
            }
        }, Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log2 = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 1))).anyTimes();
        EasyMock.expect((Object)log2.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)log2.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)initialLEO, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(partition, replicaManager, log2);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log2});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(leaderEpoch, 156L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)new EpochEndOffset(leaderEpoch, 172L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), (KafkaConfig)configs.head(), this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        thread.doWork();
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t1p0()).append((Object)" to truncate to offset 156 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)156)));
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t2p1()).append((Object)" to truncate to offset 172 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)172)));
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Seq configs = (Seq)TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14(), TestUtils$.MODULE$.createBrokerConfigs$default$15(), TestUtils$.MODULE$.createBrokerConfigs$default$16()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final KafkaConfig apply(Properties props) {
                return KafkaConfig$.MODULE$.fromProps(props);
            }
        }, Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log2 = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpochAtFollower = 5;
        int leaderEpochAtLeader = 4;
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 3))).anyTimes();
        EasyMock.expect((Object)log2.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpochAtFollower))).anyTimes();
        EasyMock.expect((Object)log2.endOffsetForEpoch(leaderEpochAtLeader)).andReturn((Object)None$.MODULE$).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(partition, replicaManager, log2);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log2});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(leaderEpochAtLeader, 156L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)new EpochEndOffset(leaderEpochAtLeader, 202L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), (KafkaConfig)configs.head(), this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        thread.doWork();
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t1p0()).append((Object)" to truncate to offset 156 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)156)));
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t2p1()).append((Object)" to truncate to offset ").append((Object)BoxesRunTime.boxToInteger((int)initialLEO)).append((Object)" (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)initialLEO)));
    }

    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log2 = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 2))).anyTimes();
        EasyMock.expect((Object)log2.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5))).anyTimes();
        EasyMock.expect((Object)log2.endOffsetForEpoch(4)).andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect((Object)log2.endOffsetForEpoch(3)).andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(partition, replicaManager, log2);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log2});
        java.util.Map offsets = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(4, 155L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(4, 143L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)0L, (long)mockNetwork.fetchCount());
        java.util.Map nextOffsets = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(3, 101L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(3, 102L))}))).asJava();
        mockNetwork.setOffsetsForNextResponse(nextOffsets);
        thread.doWork();
        Assert.assertEquals((long)2L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)1L, (long)mockNetwork.fetchCount());
        Assert.assertEquals((String)"OffsetsForLeaderEpochRequest version.", (long)3L, (long)mockNetwork.lastUsedOffsetForLeaderEpochVersion());
        thread.doWork();
        Assert.assertEquals((long)2L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)2L, (long)mockNetwork.fetchCount());
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t1p1()).append((Object)" to truncate to offset 102 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)102)));
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t1p0()).append((Object)" to truncate to offset 101 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)101)));
    }

    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        props.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log2 = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 2))).anyTimes();
        EasyMock.expect((Object)log2.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5))).anyTimes();
        EasyMock.expect((Object)log2.endOffsetForEpoch(4)).andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect((Object)log2.endOffsetForEpoch(3)).andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(partition, replicaManager, log2);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log2});
        java.util.Map offsets = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(-1, 155L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(-1, 143L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)1L, (long)mockNetwork.fetchCount());
        Assert.assertEquals((String)"OffsetsForLeaderEpochRequest version.", (long)0L, (long)mockNetwork.lastUsedOffsetForLeaderEpochVersion());
        thread.doWork();
        Assert.assertEquals((long)1L, (long)mockNetwork.epochFetchCount());
        Assert.assertEquals((long)2L, (long)mockNetwork.fetchCount());
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t1p0()).append((Object)" to truncate to offset 155 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)155)));
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected ").append((Object)this.t1p1()).append((Object)" to truncate to offset 143 (truncation offsets: ").append((Object)truncateToCapture.getValues()).append((Object)")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)143)));
    }

    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        Capture truncated = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Seq configs = (Seq)TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14(), TestUtils$.MODULE$.createBrokerConfigs$default$15(), TestUtils$.MODULE$.createBrokerConfigs$default$16()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final KafkaConfig apply(Properties props) {
                return KafkaConfig$.MODULE$.fromProps(props);
            }
        }, Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log2 = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int initialFetchOffset = 100;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncated)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)initialFetchOffset)).anyTimes();
        EasyMock.expect((Object)log2.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)));
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(partition, replicaManager, log2);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log2});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(-1, -1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), (KafkaConfig)configs.head(), this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(initialFetchOffset, this.offsetAndEpoch$default$2()))})));
        thread.doWork();
        Assert.assertEquals((long)initialFetchOffset, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        Capture truncated = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Seq configs = (Seq)TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14(), TestUtils$.MODULE$.createBrokerConfigs$default$15(), TestUtils$.MODULE$.createBrokerConfigs$default$16()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final KafkaConfig apply(Properties props) {
                return KafkaConfig$.MODULE$.fromProps(props);
            }
        }, Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log2 = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        int highWaterMark = 100;
        int initialLeo = 300;
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)highWaterMark)).anyTimes();
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncated)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)log2.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)log2.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)initialLeo, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLeo)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(partition, replicaManager, log2);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log2});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mutableMapAsJavaMapConverter((scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, -1, -1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), (KafkaConfig)configs.head(), this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp((Function1)new Serializable(this, thread){
            public static final long serialVersionUID = 0L;
            private final ReplicaFetcherThread thread$1;

            public final void apply(int x$1) {
                this.apply$mcVI$sp(x$1);
            }

            public void apply$mcVI$sp(int x$1) {
                this.thread$1.doWork();
            }
            {
                this.thread$1 = thread$1;
            }
        });
        Assert.assertEquals((long)0L, (long)truncated.getValues().size());
        offsetsReply.put(this.t1p0(), new EpochEndOffset(leaderEpoch, 156L));
        thread.doWork();
        Assert.assertEquals((long)156L, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log2 = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createNiceMock(ReplicaManager.class);
        int leaderEpoch = 4;
        partition.truncateTo(0L, false);
        EasyMock.expect((Object)BoxedUnit.UNIT).times(2);
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)0L)).anyTimes();
        EasyMock.expect((Object)log2.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)log2.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(partition, replicaManager, log2);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log2});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        Assert.assertEquals((Object)Option$.MODULE$.apply((Object)Truncating$.MODULE$), (Object)thread.fetchState(this.t1p0()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final ReplicaState apply(PartitionFetchState x$2) {
                return x$2.state();
            }
        }));
        Assert.assertEquals((Object)Option$.MODULE$.apply((Object)Truncating$.MODULE$), (Object)thread.fetchState(this.t1p1()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final ReplicaState apply(PartitionFetchState x$3) {
                return x$3.state();
            }
        }));
        thread.doWork();
        Assert.assertEquals((Object)Option$.MODULE$.apply((Object)Fetching$.MODULE$), (Object)thread.fetchState(this.t1p0()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final ReplicaState apply(PartitionFetchState x$4) {
                return x$4.state();
            }
        }));
        Assert.assertEquals((Object)Option$.MODULE$.apply((Object)Fetching$.MODULE$), (Object)thread.fetchState(this.t1p1()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final ReplicaState apply(PartitionFetchState x$5) {
                return x$5.state();
            }
        }));
    }

    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        int initialLEO = 100;
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log2 = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createNiceMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 2))).anyTimes();
        EasyMock.expect((Object)log2.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5))).anyTimes();
        EasyMock.expect((Object)log2.endOffsetForEpoch(5)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)initialLEO, 5))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log2.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(partition, replicaManager, log2);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log2});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(5, 52L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(5, 49L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        TopicPartition partitionThatBecameLeader = this.t1p0();
        mockNetwork.setEpochRequestCallback((Function0<BoxedUnit>)new Serializable(this, thread, partitionThatBecameLeader){
            public static final long serialVersionUID = 0L;
            private final ReplicaFetcherThread thread$2;
            private final TopicPartition partitionThatBecameLeader$1;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                this.thread$2.removePartitions((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.partitionThatBecameLeader$1})));
            }
            {
                this.thread$2 = thread$2;
                this.partitionThatBecameLeader$1 = partitionThatBecameLeader$1;
            }
        });
        thread.doWork();
        Assert.assertEquals((long)49L, (long)BoxesRunTime.unboxToLong((Object)truncateToCapture.getValue()));
    }

    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        BlockingSend mockBlockingSend = (BlockingSend)EasyMock.createMock(BlockingSend.class);
        mockBlockingSend.initiateClose();
        EasyMock.expect((Object)BoxedUnit.UNIT).andThrow((Throwable)new IllegalArgumentException()).once();
        mockBlockingSend.close();
        EasyMock.expect((Object)BoxedUnit.UNIT).andThrow((Throwable)new IllegalStateException()).once();
        EasyMock.replay((Object[])new Object[]{mockBlockingSend});
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), null, new Metrics(), (Time)new SystemTime(), null, (Option)new Some((Object)mockBlockingSend));
        thread.start();
        thread.initiateShutdown();
        thread.awaitShutdown();
        EasyMock.verify((Object[])new Object[]{mockBlockingSend});
    }

    public void stub(Partition partition, ReplicaManager replicaManager, Log log2) {
        EasyMock.expect((Object)replicaManager.localLogOrException(this.t1p0())).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)replicaManager.nonOfflinePartition(this.t1p0())).andReturn((Object)new Some((Object)partition)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException(this.t1p1())).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)replicaManager.nonOfflinePartition(this.t1p1())).andReturn((Object)new Some((Object)partition)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException(this.t2p1())).andReturn((Object)log2).anyTimes();
        EasyMock.expect((Object)replicaManager.nonOfflinePartition(this.t2p1())).andReturn((Object)new Some((Object)partition)).anyTimes();
    }
}

