/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.util.Optional;
import kafka.api.Request$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.FetchPartitionData;
import kafka.server.Fetching$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.OffsetAndEpoch;
import kafka.server.PartitionFetchState;
import kafka.server.PartitionFetchState$;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaAlterLogDirsThread;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.ReplicaState;
import kafka.server.ReplicationQuotaManager;
import kafka.server.Truncating$;
import kafka.utils.DelayedItem;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.Set;
import scala.collection.mutable.StringBuilder;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005-f\u0001B\u0001\u0003\u0001\u001d\u0011QDU3qY&\u001c\u0017-\u00117uKJdun\u001a#jeN$\u0006N]3bIR+7\u000f\u001e\u0006\u0003\u0007\u0011\taa]3sm\u0016\u0014(\"A\u0003\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0003\t\u0003\u00131i\u0011A\u0003\u0006\u0002\u0017\u0005)1oY1mC&\u0011QB\u0003\u0002\u0007\u0003:L(+\u001a4\t\u000b=\u0001A\u0011\u0001\t\u0002\rqJg.\u001b;?)\u0005\t\u0002C\u0001\n\u0001\u001b\u0005\u0011\u0001b\u0002\u000b\u0001\u0005\u0004%I!F\u0001\u0005iF\u0002\b'F\u0001\u0017!\t9r$D\u0001\u0019\u0015\tI\"$\u0001\u0004d_6lwN\u001c\u0006\u0003\u000bmQ!\u0001H\u000f\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005q\u0012aA8sO&\u0011\u0001\u0005\u0007\u0002\u000f)>\u0004\u0018n\u0019)beRLG/[8o\u0011\u0019\u0011\u0003\u0001)A\u0005-\u0005)A/\r91A!9A\u0005\u0001b\u0001\n\u0013)\u0012\u0001\u0002;2aFBaA\n\u0001!\u0002\u00131\u0012!\u0002;2aF\u0002\u0003b\u0002\u0015\u0001\u0005\u0004%I!K\u0001\u0011M\u0006LG.\u001a3QCJ$\u0018\u000e^5p]N,\u0012A\u000b\t\u0003%-J!\u0001\f\u0002\u0003!\u0019\u000b\u0017\u000e\\3e!\u0006\u0014H/\u001b;j_:\u001c\bB\u0002\u0018\u0001A\u0003%!&A\tgC&dW\r\u001a)beRLG/[8og\u0002BQ\u0001\r\u0001\u0005\nE\nab\u001c4gg\u0016$\u0018I\u001c3Fa>\u001c\u0007\u000eF\u00023ki\u0002\"AE\u001a\n\u0005Q\u0012!AD(gMN,G/\u00118e\u000bB|7\r\u001b\u0005\u0006m=\u0002\raN\u0001\fM\u0016$8\r[(gMN,G\u000f\u0005\u0002\nq%\u0011\u0011H\u0003\u0002\u0005\u0019>tw\rC\u0004<_A\u0005\t\u0019\u0001\u001f\u0002\u00171,\u0017\rZ3s\u000bB|7\r\u001b\t\u0003\u0013uJ!A\u0010\u0006\u0003\u0007%sG\u000fC\u0003A\u0001\u0011\u0005\u0011)\u0001\u0017tQ>,H\u000e\u001a(pi\u0006#G\rU1si&$\u0018n\u001c8JM\u001a+H/\u001e:f\u0019><\u0017j\u001d(pi\u0012+g-\u001b8fIR\t!\t\u0005\u0002\n\u0007&\u0011AI\u0003\u0002\u0005+:LG\u000f\u000b\u0002@\rB\u0011qIS\u0007\u0002\u0011*\u0011\u0011*H\u0001\u0006UVt\u0017\u000e^\u0005\u0003\u0017\"\u0013A\u0001V3ti\")Q\n\u0001C\u0001\u0003\u0006a3\u000f[8vY\u0012,\u0006\u000fZ1uK2+\u0017\rZ3s\u000bB|7\
r[!gi\u0016\u0014h)\u001a8dK\u0012,\u0005o\\2i\u000bJ\u0014xN\u001d\u0015\u0003\u0019\u001aCQ\u0001\u0015\u0001\u0005\u0002\u0005\u000bae\u001d5pk2$'+\u001a9mC\u000e,7)\u001e:sK:$Hj\\4ESJ<\u0006.\u001a8DCV<\u0007\u000e^+qQ\tye\tC\u0003T\u0001\u0011%A+A\fn_\u000e\\g)\u001a;dQ\u001a\u0013x.\\\"veJ,g\u000e\u001e'pOR1!)V,dQ6DQA\u0016*A\u0002Y\ta\u0002^8qS\u000e\u0004\u0016M\u001d;ji&|g\u000eC\u0003Y%\u0002\u0007\u0011,A\u0006sKF,Xm\u001d;ECR\f\u0007C\u0001.a\u001d\tYf,D\u0001]\u0015\ti\u0006$\u0001\u0005sKF,Xm\u001d;t\u0013\tyF,\u0001\u0007GKR\u001c\u0007NU3rk\u0016\u001cH/\u0003\u0002bE\ni\u0001+\u0019:uSRLwN\u001c#bi\u0006T!a\u0018/\t\u000b\u0011\u0014\u0006\u0019A3\u0002\r\r|gNZ5h!\t\u0011b-\u0003\u0002h\u0005\tY1*\u00194lC\u000e{gNZ5h\u0011\u0015I'\u000b1\u0001k\u00039\u0011X\r\u001d7jG\u0006l\u0015M\\1hKJ\u0004\"AE6\n\u00051\u0014!A\u0004*fa2L7-Y'b]\u0006<WM\u001d\u0005\u0006]J\u0003\ra\\\u0001\re\u0016\u001c\bo\u001c8tK\u0012\u000bG/\u0019\t\u0003%AL!!\u001d\u0002\u0003%\u0019+Go\u00195QCJ$\u0018\u000e^5p]\u0012\u000bG/\u0019\u0005\u0006g\u0002!\t!Q\u0001#SN\u001cX/Z:Fa>\u001c\u0007NU3rk\u0016\u001cHO\u0012:p[2{7-\u00197SKBd\u0017nY1)\u0005I4\u0005\"\u0002<\u0001\t\u0003\t\u0015!\u00104fi\u000eDW\t]8dQN4%o\\7MK\u0006$WM]*i_VdG\rS1oI2,W\t_2faRLwN\u001c$s_6<U\r\u001e'pG\u0006d'+\u001a9mS\u000e\f\u0007FA;G\u0011\u0015I\b\u0001\"\u0001B\u0003u\u0019\bn\\;mIR\u0013XO\\2bi\u0016$vNU3qY&\u001c\u0017m\u00144gg\u0016$\bF\u0001=G\u0011\u0015a\b\u0001\"\u0001B\u00035\u001a\bn\\;mIR\u0013XO\\2bi\u0016$v.\u00128e\u001f\u001a47/\u001a;PM2\u000b'oZ3ti\u000e{W.\\8o\u000bB|7\r\u001b\u0015\u0003w\u001aCQa 
\u0001\u0005\u0002\u0005\u000b\u0011i\u001d5pk2$GK];oG\u0006$X\rV8J]&$\u0018.\u00197GKR\u001c\u0007n\u00144gg\u0016$\u0018J\u001a*fa2L7-\u0019*fiV\u0014hn]+oI\u00164\u0017N\\3e\u001f\u001a47/\u001a;)\u0005y4\u0005BBA\u0003\u0001\u0011\u0005\u0011)A\u0016tQ>,H\u000e\u001a)pY2Le\u000eZ3gS:LG/\u001a7z\u0013\u001a\u0014V\r\u001d7jG\u0006tu\u000e^!wC&d\u0017M\u00197fQ\r\t\u0019A\u0012\u0005\u0007\u0003\u0017\u0001A\u0011A!\u0002MMDw.\u001e7e\r\u0016$8\r\u001b'fC\u0012,'/\u00129pG\"|eNR5sgR4U\r^2i\u001f:d\u0017\u0010K\u0002\u0002\n\u0019Ca!!\u0005\u0001\t\u0003\t\u0015\u0001H:i_VdGMR3uG\"|e.\u001a*fa2L7-Y!u\u0003RKW.\u001a\u0015\u0004\u0003\u001f1\u0005BBA\f\u0001\u0011\u0005\u0011)A\u0017tQ>,H\u000e\u001a$fi\u000eDgj\u001c8EK2\f\u00170\u001a3B]\u0012tuN\u001c+sk:\u001c\u0017\r^5oOJ+\u0007\u000f\\5dCND3!!\u0006G\u0011\u001d\ti\u0002\u0001C\u0001\u0003?\tAa\u001d;vERa\u0011\u0011EA \u0003\u001f\n\u0019&a\u0016\u0002\\A1\u00111EA\u0015\u0003[i!!!\n\u000b\u0007\u0005\u001dR$\u0001\u0005fCNLXn\\2l\u0013\u0011\tY#!\n\u0003'%+\u0005\u0010]3di\u0006$\u0018n\u001c8TKR$XM]:\u0011\u000b%\ty#a\r\n\u0007\u0005E\"B\u0001\u0004PaRLwN\u001c\t\u0005\u0003k\tY$\u0004\u0002\u00028)\u0019\u0011\u0011\b\u0003\u0002\u000f\rdWo\u001d;fe&!\u0011QHA\u001c\u0005%\u0001\u0016M\u001d;ji&|g\u000e\u0003\u0005\u0002B\u0005m\u0001\u0019AA\"\u0003\u001dawn\u001a+2aB\u0002B!!\u0012\u0002L5\u0011\u0011q\t\u0006\u0004\u0003\u0013\"\u0011a\u00017pO&!\u0011QJA$\u0005-\t%m\u001d;sC\u000e$Hj\\4\t\u0011\u0005E\u00131\u0004a\u0001\u0003\u0007\nq\u0001\\8h)F\u0002\u0018\u0007\u0003\u0005\u0002V\u0005m\u0001\u0019AA\"\u0003%1W\u000f^;sK2{w\r\u0003\u0005\u0002Z\u0005m\u0001\u0019AA\u001a\u0003%\u0001\u0018M\u001d;ji&|g\u000e\u0003\u0004j\u00037\u0001\rA\u001b\u0005\b\u0003?\u0002A\u0011AA1\u0003U\u0019H/\u001e2XSRDg)\u001a;dQ6+7o]1hKN$b\"a\u0019\u0002f\u0005\u001d\u0014\u0011NA6\u0003[\ny\u0007E\u0003\u0002$\u0005%\"\t\u0003\u0005\u0002B\u0005u\u0003\u0019AA\"\u0011!\t\t&!\u0018A\u0002\u0005\r\u0003\u0002CA+\u0003;
\u0002\r!a\u0011\t\u0011\u0005e\u0013Q\fa\u0001\u0003gAa![A/\u0001\u0004Q\u0007\u0002CA9\u0003;\u0002\r!a\u001d\u0002!I,7\u000f]8og\u0016\u001c\u0015\r\u001c7cC\u000e\\\u0007CBA\u0012\u0003k\nI(\u0003\u0003\u0002x\u0005\u0015\"aB\"baR,(/\u001a\t\u0007\u0013\u0005m\u0014q\u0010\"\n\u0007\u0005u$BA\u0005Gk:\u001cG/[8ocA1\u0011\u0011QAD\u0003\u0017k!!a!\u000b\u0007\u0005\u0015%\"\u0001\u0006d_2dWm\u0019;j_:LA!!#\u0002\u0004\n\u00191+Z9\u0011\u000b%\tiIF8\n\u0007\u0005=%B\u0001\u0004UkBdWM\r\u0005\n\u0003'\u0003\u0011\u0013!C\u0005\u0003+\u000b\u0001d\u001c4gg\u0016$\u0018I\u001c3Fa>\u001c\u0007\u000e\n3fM\u0006,H\u000e\u001e\u00133+\t\t9JK\u0002=\u00033[#!a'\u0011\t\u0005u\u0015qU\u0007\u0003\u0003?SA!!)\u0002$\u0006IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003KS\u0011AC1o]>$\u0018\r^5p]&!\u0011\u0011VAP\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a")
public class ReplicaAlterLogDirsThreadTest {
    // Partition 0 of topic "topic1"; the partition most of the visible tests operate on.
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    // Partition 1 of topic "topic1"; used by the tests that exercise two partitions at once.
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    // FailedPartitions instance handed to each ReplicaAlterLogDirsThread constructed in the visible tests.
    private final FailedPartitions failedPartitions = new FailedPartitions();

    /**
     * Accessor for the {@code t1p0} field.
     *
     * @return the {@link TopicPartition} stored in {@code t1p0}
     */
    private TopicPartition t1p0() {
        return t1p0;
    }

    /**
     * Accessor for the {@code t1p1} field.
     *
     * @return the {@link TopicPartition} stored in {@code t1p1}
     */
    private TopicPartition t1p1() {
        return t1p1;
    }

    /**
     * Accessor for the {@code failedPartitions} field.
     *
     * @return the {@link FailedPartitions} instance held by this test object
     */
    private FailedPartitions failedPartitions() {
        return failedPartitions;
    }

    /**
     * Builds an {@link OffsetAndEpoch} from the given values.
     *
     * @param fetchOffset offset passed as the first constructor argument
     * @param leaderEpoch leader epoch passed as the second constructor argument
     * @return a new {@link OffsetAndEpoch} wrapping both values
     */
    private OffsetAndEpoch offsetAndEpoch(long fetchOffset, int leaderEpoch) {
        final OffsetAndEpoch pair = new OffsetAndEpoch(fetchOffset, leaderEpoch);
        return pair;
    }

    /**
     * Supplies the leader epoch used when a caller of {@code offsetAndEpoch}
     * does not choose one itself.
     *
     * NOTE(review): the name suggests this is the compiler-generated default for
     * the second parameter of {@code offsetAndEpoch} - confirm against the Scala source.
     *
     * @return the constant {@code 1}
     */
    private int offsetAndEpoch$default$2() {
        final int defaultLeaderEpoch = 1;
        return defaultLeaderEpoch;
    }

    /**
     * Checks that {@code addPartitions} accepts nothing when the replica manager
     * reports that no future log exists for the partition: the returned set is
     * empty, the thread's partition count stays at zero, and no fetch state is
     * recorded for topic1-0.
     */
    @Test
    public void shouldNotAddPartitionIfFutureLogIsNotDefined() {
        final int nodeId = 1;
        final KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(
            TestUtils$.MODULE$.createBrokerConfig(
                nodeId,
                "localhost:1234",
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                TestUtils$.MODULE$.createBrokerConfig$default$5(),
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16(),
                TestUtils$.MODULE$.createBrokerConfig$default$17(),
                TestUtils$.MODULE$.createBrokerConfig$default$18(),
                TestUtils$.MODULE$.createBrokerConfig$default$19(),
                TestUtils$.MODULE$.createBrokerConfig$default$20()));
        final ReplicaManager replicaMgr = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        final ReplicationQuotaManager quotaMgr = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);

        // Stub: the replica manager reports that topic1-0 has no future log.
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaMgr.futureLogExists(this.t1p0())))
            .thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));

        final BrokerEndPoint sourceBroker = new BrokerEndPoint(0, "localhost", 1000);
        final ReplicaAlterLogDirsThread alterDirsThread = new ReplicaAlterLogDirsThread(
            "alter-logs-dirs-thread",
            sourceBroker,
            brokerConfig,
            this.failedPartitions(),
            replicaMgr,
            quotaMgr,
            new BrokerTopicStats());

        // Request topic1-0 starting at offset 0 with the default leader epoch.
        final Map requested = (Map)Map$.MODULE$.apply(
            (Seq)Predef$.MODULE$.wrapRefArray(
                (Object[])new Tuple2[]{
                    Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()),
                        (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))
                }));
        final Set accepted = alterDirsThread.addPartitions(requested);

        Assert.assertEquals((Object)Predef$.MODULE$.Set().empty(), (Object)accepted);
        Assert.assertEquals((long)0L, (long)alterDirsThread.partitionCount());
        Assert.assertEquals((Object)None$.MODULE$, (Object)alterDirsThread.fetchState(this.t1p0()));
    }

    /**
     * Exercises recovery from a FENCED_LEADER_EPOCH fetch error.
     *
     * Sequence, as visible in the body:
     * 1. topic1-0 is added at epoch {@code leaderEpoch - 1} and the stubbed fetch
     *    answers with {@code Errors.FENCED_LEADER_EPOCH}; after one {@code doWork()}
     *    the partition is in {@code failedPartitions} and no longer tracked by the thread.
     * 2. The partition is re-added at {@code leaderEpoch}; its fetch state reports that epoch.
     * 3. The stubbed fetch now answers with {@code Errors.NONE}; after another
     *    {@code doWork()} the partition is no longer marked failed and is again
     *    absent from the thread (fetch state {@code None}, partition count 0).
     */
    @Test
    public void shouldUpdateLeaderEpochAfterFencedEpochError() {
        int brokerId = 1;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(brokerId, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        AbstractLog futureLog = (AbstractLog)Mockito.mock(AbstractLog.class);
        int leaderEpoch = 5;
        int logEndOffset2 = 0;
        // Stub the replica manager, partition and future log so that topic1-0 has an
        // empty future log (start/end offset 0, no latest epoch) and quota is never exceeded.
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p0())).thenReturn((Object)futureLog);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)replicaManager.nonOfflinePartition(this.t1p0())).thenReturn((Object)new Some((Object)partition));
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).thenReturn((Object)partition);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)quotaManager.isQuotaExceeded())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));
        Mockito.when((Object)partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false)).thenReturn((Object)new EpochEndOffset(leaderEpoch, (long)logEndOffset2));
        Mockito.when((Object)partition.futureLocalLogOrException()).thenReturn((Object)futureLog);
        ((Partition)Mockito.doNothing().when((Object)partition)).truncateTo(0L, true);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)partition.maybeReplaceCurrentWithFutureReplica())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logStartOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)futureLog.latestEpoch()).thenReturn((Object)None$.MODULE$);
        // First fetch: request carries the stale epoch (leaderEpoch - 1) and is answered with FENCED_LEADER_EPOCH.
        FetchRequest.PartitionData fencedRequestData = new FetchRequest.PartitionData(0L, 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(leaderEpoch - 1)));
        FetchPartitionData fencedResponseData = new FetchPartitionData(Errors.FENCED_LEADER_EPOCH, -1L, -1L, (Records)MemoryRecords.EMPTY, (Option)None$.MODULE$, (Option)None$.MODULE$, (Option)None$.MODULE$, false);
        this.mockFetchFromCurrentLog(this.t1p0(), fencedRequestData, config, replicaManager, fencedResponseData);
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, new BrokerTopicStats());
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, leaderEpoch - 1))})));
        Assert.assertTrue((boolean)thread.fetchState(this.t1p0()).isDefined());
        Assert.assertEquals((long)1L, (long)thread.partitionCount());
        thread.doWork();
        // After the fenced response the partition must be marked failed and dropped from the thread.
        Assert.assertTrue((boolean)this.failedPartitions().contains(this.t1p0()));
        Assert.assertEquals((Object)None$.MODULE$, (Object)thread.fetchState(this.t1p0()));
        Assert.assertEquals((long)0L, (long)thread.partitionCount());
        // Re-add the partition with the current epoch and check the recorded fetch state uses it.
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, leaderEpoch))})));
        // NOTE(review): the anonymous class below appears to be the decompiler's rendering of a
        // Scala function literal mapping a PartitionFetchState to its currentLeaderEpoch; as
        // written it is not valid Java (Serializable with a constructor argument) - confirm
        // against the Scala source before editing.
        Assert.assertEquals((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch)), (Object)thread.fetchState(this.t1p0()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final int apply(PartitionFetchState x$1) {
                return x$1.currentLeaderEpoch();
            }
        }));
        Assert.assertEquals((long)1L, (long)thread.partitionCount());
        // Second fetch: request carries the current epoch and is answered with NONE and empty records.
        FetchRequest.PartitionData requestData2 = new FetchRequest.PartitionData(0L, 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(leaderEpoch)));
        FetchPartitionData responseData = new FetchPartitionData(Errors.NONE, 0L, 0L, (Records)MemoryRecords.EMPTY, (Option)None$.MODULE$, (Option)None$.MODULE$, (Option)None$.MODULE$, false);
        this.mockFetchFromCurrentLog(this.t1p0(), requestData2, config, replicaManager, responseData);
        thread.doWork();
        // The partition is no longer failed and is again untracked by the thread
        // (presumably because maybeReplaceCurrentWithFutureReplica is stubbed to return true - TODO confirm).
        Assert.assertFalse((boolean)this.failedPartitions().contains(this.t1p0()));
        Assert.assertEquals((Object)None$.MODULE$, (Object)thread.fetchState(this.t1p0()));
        Assert.assertEquals((long)0L, (long)thread.partitionCount());
    }

    /**
     * Adds topic1-0 at {@code leaderEpoch} with an empty future log, answers the
     * stubbed fetch with {@code Errors.NONE} and empty records, and asserts that
     * after a single {@code doWork()} the thread no longer tracks the partition
     * (fetch state {@code None}, partition count 0).
     *
     * NOTE(review): the removal is presumably driven by
     * {@code maybeReplaceCurrentWithFutureReplica} being stubbed to return
     * {@code true} - confirm against ReplicaAlterLogDirsThread.
     */
    @Test
    public void shouldReplaceCurrentLogDirWhenCaughtUp() {
        int brokerId = 1;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(brokerId, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        AbstractLog futureLog = (AbstractLog)Mockito.mock(AbstractLog.class);
        int leaderEpoch = 5;
        int logEndOffset2 = 0;
        // Stub the replica manager, partition and future log: the future log exists, is empty
        // (start/end offset 0, no latest epoch), and the quota is never exceeded.
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p0())).thenReturn((Object)futureLog);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)replicaManager.nonOfflinePartition(this.t1p0())).thenReturn((Object)new Some((Object)partition));
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).thenReturn((Object)partition);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)quotaManager.isQuotaExceeded())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));
        Mockito.when((Object)partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false)).thenReturn((Object)new EpochEndOffset(leaderEpoch, (long)logEndOffset2));
        Mockito.when((Object)partition.futureLocalLogOrException()).thenReturn((Object)futureLog);
        ((Partition)Mockito.doNothing().when((Object)partition)).truncateTo(0L, true);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)partition.maybeReplaceCurrentWithFutureReplica())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logStartOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)futureLog.latestEpoch()).thenReturn((Object)None$.MODULE$);
        // The expected fetch request (offset 0, current epoch) is answered with NONE and empty records.
        FetchRequest.PartitionData requestData2 = new FetchRequest.PartitionData(0L, 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(leaderEpoch)));
        FetchPartitionData responseData = new FetchPartitionData(Errors.NONE, 0L, 0L, (Records)MemoryRecords.EMPTY, (Option)None$.MODULE$, (Option)None$.MODULE$, (Option)None$.MODULE$, false);
        this.mockFetchFromCurrentLog(this.t1p0(), requestData2, config, replicaManager, responseData);
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, new BrokerTopicStats());
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, leaderEpoch))})));
        // The partition is tracked before the work cycle...
        Assert.assertTrue((boolean)thread.fetchState(this.t1p0()).isDefined());
        Assert.assertEquals((long)1L, (long)thread.partitionCount());
        thread.doWork();
        // ...and gone after it.
        Assert.assertEquals((Object)None$.MODULE$, (Object)thread.fetchState(this.t1p0()));
        Assert.assertEquals((long)0L, (long)thread.partitionCount());
    }

    /**
     * Stubs {@code replicaManager.fetchMessages} so that a fetch for exactly one
     * partition is answered by invoking the captured response callback with a
     * single {@code (topicPartition, responseData)} pair.
     *
     * The stub matches only calls made with: timeout 0, replica id
     * {@code Request.FutureLocalReplicaId}, min bytes 0, max bytes
     * {@code config.replicaFetchResponseMaxBytes()}, the fifth (boolean) argument
     * {@code false}, a one-element sequence {@code topicPartition -> requestData2},
     * {@code UnboundedQuota}, {@code READ_UNCOMMITTED} isolation and {@code None}
     * as the last argument.
     *
     * NOTE(review): {@code Mockito.when(BoxedUnit.UNIT)} and the anonymous
     * {@code Answer} (constructor arguments, {@code void answer}, self-assigning
     * initializer) appear to be decompiler renderings of a Scala
     * {@code when(...).thenAnswer(...)} expression and are not valid Java as
     * written - confirm against the Scala source before editing.
     *
     * @param topicPartition partition the stubbed fetch is for
     * @param requestData2   fetch request data the stub must be called with
     * @param config         broker config supplying the response max-bytes matcher value
     * @param replicaManager Mockito mock to stub
     * @param responseData   data delivered to the captured callback
     */
    private void mockFetchFromCurrentLog(TopicPartition topicPartition, FetchRequest.PartitionData requestData2, KafkaConfig config, ReplicaManager replicaManager, FetchPartitionData responseData) {
        // Captures the response callback (eighth argument) passed to fetchMessages.
        ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Function1.class);
        replicaManager.fetchMessages(ArgumentMatchers.eq((long)0L), ArgumentMatchers.eq((int)Request$.MODULE$.FutureLocalReplicaId()), ArgumentMatchers.eq((int)0), BoxesRunTime.unboxToInt((Object)ArgumentMatchers.eq((Object)BoxesRunTime.boxToInteger((int)Predef$.MODULE$.Integer2int(config.replicaFetchResponseMaxBytes())))), ArgumentMatchers.eq((boolean)false), (Seq)ArgumentMatchers.eq((Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicPartition), (Object)requestData2)}))), (ReplicaQuota)ArgumentMatchers.eq((Object)QuotaFactory.UnboundedQuota$.MODULE$), (Function1)callbackCaptor.capture(), (IsolationLevel)ArgumentMatchers.eq((Object)IsolationLevel.READ_UNCOMMITTED), (Option)ArgumentMatchers.eq((Object)None$.MODULE$));
        Mockito.when((Object)BoxedUnit.UNIT).thenAnswer((Answer)new Answer<BoxedUnit>(this, topicPartition, responseData, callbackCaptor){
            private final TopicPartition topicPartition$1;
            private final FetchPartitionData responseData$1;
            private final ArgumentCaptor callbackCaptor$1;

            // Feeds the canned response to whatever callback the code under test supplied.
            public void answer(InvocationOnMock invocation) {
                ((Function1)this.callbackCaptor$1.getValue()).apply((Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)this.topicPartition$1, (Object)this.responseData$1)})));
            }
            {
                this.topicPartition$1 = topicPartition$1;
                this.responseData$1 = responseData$1;
                this.callbackCaptor$1 = callbackCaptor$1;
            }
        });
    }

    /**
     * Verifies that {@code fetchEpochEndOffsets} answers from the local partitions:
     * for topic1-0 (epoch 2, end offset 13) and topic1-1 (epoch 5, end offset 232)
     * the result must equal a map of {@code EpochEndOffset(Errors.NONE, epoch, offset)}
     * built from the values the mocked partitions return.
     *
     * The thread is constructed with {@code null} for its last two constructor
     * arguments (the positions where the other visible tests pass a quota manager
     * and a {@code BrokerTopicStats}).
     */
    @Test
    public void issuesEpochRequestFromLocalReplica() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        Partition partitionT1p0 = (Partition)EasyMock.createMock(Partition.class);
        Partition partitionT1p1 = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpochT1p0 = 2;
        int leaderEpochT1p1 = 5;
        int leoT1p0 = 13;
        int leoT1p1 = 232;
        // Record phase: each partition lookup returns its mock, and each mock reports
        // its own (epoch, end offset) pair for the requested leader epoch.
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partitionT1p0);
        EasyMock.expect((Object)partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpochT1p0, false)).andReturn((Object)new EpochEndOffset(leaderEpochT1p0, (long)leoT1p0)).anyTimes();
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p1(), false)).andStubReturn((Object)partitionT1p1);
        EasyMock.expect((Object)partitionT1p1.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpochT1p1, false)).andReturn((Object)new EpochEndOffset(leaderEpochT1p1, (long)leoT1p1)).anyTimes();
        // Switch the mocks from record to replay mode before exercising the thread.
        EasyMock.replay((Object[])new Object[]{partitionT1p0, partitionT1p1, replicaManager});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, null, null);
        Map result = thread.fetchEpochEndOffsets((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpochT1p0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpochT1p1))})));
        Map expected = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(Errors.NONE, leaderEpochT1p0, (long)leoT1p0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(Errors.NONE, leaderEpochT1p1, (long)leoT1p1))}));
        Assert.assertEquals((String)"results from leader epoch request should have offset from local replica", (Object)expected, (Object)result);
    }

    /**
     * Verifies that {@code fetchEpochEndOffsets} reports a per-partition error
     * instead of failing when a partition lookup throws: topic1-0 resolves
     * normally (epoch 2, end offset 13, {@code Errors.NONE}) while the lookup of
     * topic1-1 throws {@link KafkaStorageException}, and the expected entry for
     * it is {@code EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, -1, -1L)}.
     */
    @Test
    public void fetchEpochsFromLeaderShouldHandleExceptionFromGetLocalReplica() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        Partition partitionT1p0 = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 2;
        int leo = 13;
        // Record phase: topic1-0 resolves to a working partition mock...
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partitionT1p0);
        EasyMock.expect((Object)partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false)).andReturn((Object)new EpochEndOffset(leaderEpoch, (long)leo)).anyTimes();
        // ...while the lookup of topic1-1 is expected exactly once and throws.
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p1(), false)).andThrow((Throwable)new KafkaStorageException()).once();
        EasyMock.replay((Object[])new Object[]{partitionT1p0, replicaManager});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, null, null);
        Map result = thread.fetchEpochEndOffsets((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpoch)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpoch))})));
        Map expected = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(Errors.NONE, leaderEpoch, (long)leo)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, -1, -1L))}));
        Assert.assertEquals((Object)expected, (Object)result);
    }

    /**
     * Adds partitions t1p0 and t1p1 to a {@link ReplicaAlterLogDirsThread}, runs a single
     * {@code doWork()} pass and checks the offset passed to {@code Partition.truncateTo} for each.
     *
     * <p>Mock setup: both future logs report a log end offset of 191 and a latest epoch of 2;
     * the current replica of t1p0 answers the epoch lookup with end offset 190 and the current
     * replica of t1p1 with end offset 192. The assertions expect 190 for t1p0 and 191 for t1p1,
     * i.e. the smaller of the two offsets in each case.
     */
    @Test
    public void shouldTruncateToReplicaOffset() {
        // CaptureType.ALL keeps every value passed to truncateTo, not just the last one.
        Capture truncateCaptureT1p0 = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Capture truncateCaptureT1p1 = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        AbstractLog logT1p0 = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog logT1p1 = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLogT1p0 = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLogT1p1 = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partitionT1p0 = (Partition)EasyMock.createMock(Partition.class);
        Partition partitionT1p1 = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        Capture responseCallback = EasyMock.newCapture();
        int leaderEpoch = 2;
        int futureReplicaLEO = 191;
        int replicaT1p0LEO = 190;
        int replicaT1p1LEO = 192;
        // Route partition and future-log lookups for both topic partitions to the mocks above.
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partitionT1p0);
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p1(), false)).andStubReturn((Object)partitionT1p1);
        EasyMock.expect((Object)replicaManager.futureLocalLogOrException(this.t1p0())).andStubReturn((Object)futureLogT1p0);
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).andStubReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        EasyMock.expect((Object)replicaManager.futureLocalLogOrException(this.t1p1())).andStubReturn((Object)futureLogT1p1);
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p1()))).andStubReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        // Record truncateTo on each partition, capturing the offset argument. The mock call and the
        // expect(UNIT) that follows form one recorded expectation and must stay adjacent
        // (presumably the decompiled form of expect(partition.truncateTo(...)).anyTimes()).
        partitionT1p0.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateCaptureT1p0)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        partitionT1p1.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateCaptureT1p1)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        // Future replicas: LEO 191 and latest epoch 2 for both partitions.
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)futureLogT1p0.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)futureReplicaLEO)).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)futureLogT1p1.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)futureReplicaLEO)).anyTimes();
        EasyMock.expect((Object)futureLogT1p0.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)futureLogT1p0.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, leaderEpoch))).anyTimes();
        // Current replica of t1p0 ends epoch 2 at offset 190 (below the future replica's 191).
        EasyMock.expect((Object)partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false)).andReturn((Object)new EpochEndOffset(leaderEpoch, (long)replicaT1p0LEO)).anyTimes();
        EasyMock.expect((Object)futureLogT1p1.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)futureLogT1p1.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, leaderEpoch))).anyTimes();
        // Current replica of t1p1 ends epoch 2 at offset 192 (above the future replica's 191).
        EasyMock.expect((Object)partitionT1p1.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false)).andReturn((Object)new EpochEndOffset(leaderEpoch, (long)replicaT1p1LEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        this.stubWithFetchMessages(logT1p0, logT1p1, futureLogT1p0, partitionT1p0, replicaManager, (Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>>)responseCallback);
        // Switch every mock from record mode to replay mode before exercising the thread.
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quotaManager, partitionT1p0, partitionT1p1, logT1p0, logT1p1, futureLogT1p0, futureLogT1p1});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        thread.doWork();
        // t1p0 is expected to be truncated to 190, t1p1 to 191.
        Assert.assertEquals((long)replicaT1p0LEO, (long)BoxesRunTime.unboxToLong((Object)truncateCaptureT1p0.getValue()));
        Assert.assertEquals((long)futureReplicaLEO, (long)BoxesRunTime.unboxToLong((Object)truncateCaptureT1p1.getValue()));
    }

    /**
     * Runs two {@code doWork()} passes for partition t1p0 and asserts that the offsets passed to
     * {@code Partition.truncateTo} include {@code replicaEpochEndOffset} (190).
     *
     * <p>Mock setup: the future log reports latest epoch 5 on the first query and epoch 3 on the
     * second. The current replica answers the epoch-5 lookup with (epoch 4, offset 200) and the
     * epoch-3 lookup with (epoch 3, offset 190). The future log answers
     * {@code endOffsetForEpoch(4)} with (offset 195, epoch 3) and {@code endOffsetForEpoch(3)}
     * with (offset 191, epoch 3).
     */
    @Test
    public void shouldTruncateToEndOffsetOfLargestCommonEpoch() {
        // CaptureType.ALL records every truncation offset across both doWork() passes.
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        AbstractLog log2 = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLog = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        Capture responseCallback = EasyMock.newCapture();
        int leaderEpoch = 5;
        int futureReplicaLEO = 195;
        int replicaLEO = 200;
        int replicaEpochEndOffset = 190;
        int futureReplicaEpochEndOffset = 191;
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partition);
        EasyMock.expect((Object)replicaManager.futureLocalLogOrException(this.t1p0())).andStubReturn((Object)futureLog);
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).andStubReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        // Record truncateTo(<captured offset>, true); the mock call and the following expect(UNIT)
        // form a single recorded expectation and must stay adjacent.
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.eq((boolean)true));
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)futureLog.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)futureReplicaLEO)).anyTimes();
        // Order matters: the first latestEpoch() call yields epoch 5, the second yields epoch 3.
        EasyMock.expect((Object)futureLog.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).once();
        EasyMock.expect((Object)futureLog.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)(leaderEpoch - 2)))).once();
        // Epoch-5 lookup on the current replica: answered with epoch 4, end offset 200.
        EasyMock.expect((Object)partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false)).andReturn((Object)new EpochEndOffset(leaderEpoch - 1, (long)replicaLEO)).anyTimes();
        // Future log lookup for epoch 4: answered with offset 195 under epoch 3.
        EasyMock.expect((Object)futureLog.endOffsetForEpoch(leaderEpoch - 1)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, leaderEpoch - 2))).anyTimes();
        // Epoch-3 lookup on the current replica: answered with epoch 3, end offset 190.
        EasyMock.expect((Object)partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch - 2, false)).andReturn((Object)new EpochEndOffset(leaderEpoch - 2, (long)replicaEpochEndOffset)).anyTimes();
        // Future log lookup for epoch 3: answered with offset 191 under epoch 3.
        EasyMock.expect((Object)futureLog.endOffsetForEpoch(leaderEpoch - 2)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaEpochEndOffset, leaderEpoch - 2))).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        this.stubWithFetchMessages(log2, null, futureLog, partition, replicaManager, (Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>>)responseCallback);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quotaManager, partition, log2, futureLog});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        // Two passes, matching the two latestEpoch() expectations recorded above.
        thread.doWork();
        thread.doWork();
        // The captured offsets are checked with Scala's contains on the converted buffer; the
        // expected value is passed as a boxed Integer while truncateTo takes a long.
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected offset ").append((Object)BoxesRunTime.boxToInteger((int)replicaEpochEndOffset)).append((Object)" in captured truncation offsets ").append((Object)truncateToCapture.getValues()).toString(), (boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)replicaEpochEndOffset)));
    }

    /**
     * Adds partition t1p0 with an initial fetch offset of 100 while the future log reports no
     * latest epoch ({@code None}), runs one {@code doWork()} pass and asserts that the offset
     * passed to {@code Partition.truncateTo} equals that initial fetch offset.
     */
    @Test
    public void shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset() {
        Capture truncated = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        AbstractLog log2 = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLog = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        Capture responseCallback = EasyMock.newCapture();
        int initialFetchOffset = 100;
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partition);
        // Record truncateTo(<captured offset>, true); the mock call and the following expect(UNIT)
        // form a single recorded expectation and must stay adjacent.
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncated)), EasyMock.eq((boolean)true));
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)replicaManager.futureLocalLogOrException(this.t1p0())).andStubReturn((Object)futureLog);
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).andStubReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        // The future log has no latest epoch; no epoch lookups are recorded on the partition mock.
        EasyMock.expect((Object)futureLog.latestEpoch()).andReturn((Object)None$.MODULE$).anyTimes();
        this.stubWithFetchMessages(log2, null, futureLog, partition, replicaManager, (Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>>)responseCallback);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quotaManager, partition, log2, futureLog});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);
        // The partition is registered with offset 100 rather than 0 so the assertion below is meaningful.
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(initialFetchOffset, this.offsetAndEpoch$default$2()))})));
        thread.doWork();
        Assert.assertEquals((String)"Expected future replica to truncate to initial fetch offset if replica returns UNDEFINED_EPOCH_OFFSET", (long)initialFetchOffset, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    /**
     * Checks that no truncation is issued while the current replica answers the epoch lookup
     * with {@code REPLICA_NOT_AVAILABLE}, and that truncation happens once a valid answer arrives.
     *
     * <p>Mock setup: {@code lastOffsetForLeaderEpoch} returns the error three times and then
     * (epoch 1, offset 300); the future log reports LEO 290 and latest epoch 1. The test runs
     * {@code doWork()} over the range 0 to 2, asserts that nothing was captured, runs one more
     * pass and asserts the captured truncation offset is 290.
     */
    @Test
    public void shouldPollIndefinitelyIfReplicaNotAvailable() {
        Capture truncated = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        AbstractLog log2 = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLog = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        Capture responseCallback = EasyMock.newCapture();
        int futureReplicaLeaderEpoch = 1;
        int futureReplicaLEO = 290;
        int replicaLEO = 300;
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partition);
        // Record exactly one truncateTo(<captured offset>, true); the mock call and the following
        // expect(UNIT) form a single recorded expectation and must stay adjacent.
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncated)), EasyMock.eq((boolean)true));
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.expect((Object)replicaManager.futureLocalLogOrException(this.t1p0())).andStubReturn((Object)futureLog);
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).andStubReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)futureLog.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)futureReplicaLEO)).anyTimes();
        EasyMock.expect((Object)futureLog.latestEpoch()).andStubReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)futureReplicaLeaderEpoch)));
        EasyMock.expect((Object)futureLog.endOffsetForEpoch(futureReplicaLeaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, futureReplicaLeaderEpoch)));
        EasyMock.expect((Object)replicaManager.localLog(this.t1p0())).andReturn((Object)new Some((Object)log2)).anyTimes();
        // Three REPLICA_NOT_AVAILABLE answers, then a successful one with end offset 300.
        EasyMock.expect((Object)partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), futureReplicaLeaderEpoch, false)).andReturn((Object)new EpochEndOffset(Errors.REPLICA_NOT_AVAILABLE, -1, -1L)).times(3).andReturn((Object)new EpochEndOffset(futureReplicaLeaderEpoch, (long)replicaLEO));
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        // Any fetchMessages call captures the response callback and, via the answer below,
        // immediately invokes it with an empty sequence.
        replicaManager.fetchMessages(EasyMock.anyLong(), EasyMock.anyInt(), EasyMock.anyInt(), EasyMock.anyInt(), BoxesRunTime.unboxToBoolean((Object)EasyMock.anyObject()), (Seq)EasyMock.anyObject(), (ReplicaQuota)EasyMock.anyObject(), (Function1)EasyMock.capture((Capture)responseCallback), (IsolationLevel)EasyMock.anyObject(), (Option)EasyMock.anyObject());
        EasyMock.expect((Object)BoxedUnit.UNIT).andAnswer((IAnswer)new IAnswer<BoxedUnit>(this, responseCallback){
            private final Capture responseCallback$2;

            public void answer() {
                ((Function1)this.responseCallback$2.getValue()).apply((Object)Seq$.MODULE$.empty());
            }
            {
                this.responseCallback$2 = responseCallback$2;
            }
        }).anyTimes();
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quotaManager, partition, log2, futureLog});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        // Decompiled Scala range loop: calls doWork() once per element of the range 0 to 2
        // (three passes if the range is inclusive, which matches the three error answers above).
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp((Function1)new Serializable(this, thread){
            public static final long serialVersionUID = 0L;
            private final ReplicaAlterLogDirsThread thread$1;

            public final void apply(int x$2) {
                this.apply$mcVI$sp(x$2);
            }

            public void apply$mcVI$sp(int x$2) {
                this.thread$1.doWork();
            }
            {
                this.thread$1 = thread$1;
            }
        });
        // No truncation may have happened while the replica was reported as unavailable.
        Assert.assertEquals((long)0L, (long)truncated.getValues().size());
        // One more pass consumes the successful epoch answer; truncation to 290 is expected.
        thread.doWork();
        Assert.assertEquals((long)futureReplicaLEO, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    /**
     * Runs {@code doWork()} repeatedly (range 0 to 3) for partition t1p0 and then verifies the
     * strict {@code Partition} mock, on which a single {@code lastOffsetForLeaderEpoch} lookup
     * and a single {@code truncateTo(190, true)} call were recorded.
     *
     * <p>The epoch lookup is recorded without an explicit call count (presumably EasyMock's
     * default of exactly once - confirm against the EasyMock documentation), so extra lookups
     * on later passes would be unexpected calls on the mock.
     */
    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnly() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        AbstractLog log2 = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLog = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        Capture responseCallback = EasyMock.newCapture();
        int leaderEpoch = 5;
        int futureReplicaLEO = 190;
        int replicaLEO = 213;
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partition);
        // Epoch lookup on the current replica: answered with epoch 5, end offset 213.
        EasyMock.expect((Object)partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false)).andReturn((Object)new EpochEndOffset(leaderEpoch, (long)replicaLEO));
        // Record exactly one truncateTo(190, true); the mock call and the following expect(UNIT)
        // form a single recorded expectation and must stay adjacent.
        partition.truncateTo((long)futureReplicaLEO, true);
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.expect((Object)replicaManager.futureLocalLogOrException(this.t1p0())).andStubReturn((Object)futureLog);
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).andStubReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        EasyMock.expect((Object)futureLog.latestEpoch()).andStubReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch)));
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)futureLog.logEndOffset())).andStubReturn((Object)BoxesRunTime.boxToLong((long)futureReplicaLEO));
        EasyMock.expect((Object)futureLog.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, leaderEpoch)));
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        this.stubWithFetchMessages(log2, null, futureLog, partition, replicaManager, (Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>>)responseCallback);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quotaManager, partition, log2, futureLog});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        // Decompiled Scala range loop: calls doWork() once per element of the range 0 to 3.
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp((Function1)new Serializable(this, thread){
            public static final long serialVersionUID = 0L;
            private final ReplicaAlterLogDirsThread thread$2;

            public final void apply(int x$3) {
                this.apply$mcVI$sp(x$3);
            }

            public void apply$mcVI$sp(int x$3) {
                this.thread$2.doWork();
            }
            {
                this.thread$2 = thread$2;
            }
        });
        // Only the partition mock is verified: its recorded expectations must all have been met.
        EasyMock.verify((Object[])new Object[]{partition});
    }

    /**
     * Registers partitions t1p0 and t1p1, calls {@code buildFetch} with both in the
     * {@code Fetching} state (offsets 150 and 160) and asserts that the resulting fetch request
     * is defined, reports no partitions with errors, has {@code minBytes} 0 and contains exactly
     * one partition: t1p0 at fetch offset 150.
     *
     * <p>{@code futureLog} is included in the {@code EasyMock.replay} call so that it is in
     * replay state like every other mock handed to the code under test, consistent with the
     * sibling tests in this class. No expectations are recorded on it in this method, so as a
     * nice mock it answers with default values.
     *
     * @throws MatchError if {@code buildFetch} returns {@code null}
     */
    @Test
    public void shouldFetchOneReplicaAtATime() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        AbstractLog log2 = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLog = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        this.stub(log2, null, futureLog, partition, replicaManager);
        // futureLog was previously left out here and therefore stayed in record state.
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quotaManager, partition, log2, futureLog});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        int leaderEpoch = 1;
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, leaderEpoch)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.offsetAndEpoch(0L, leaderEpoch))})));
        // Both partitions are offered in the Fetching state; only one is expected in the request.
        AbstractFetcherThread.ResultWithPartitions resultWithPartitions = thread.buildFetch((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new PartitionFetchState(150L, (Option)None$.MODULE$, leaderEpoch, (Option)None$.MODULE$, (ReplicaState)Fetching$.MODULE$)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new PartitionFetchState(160L, (Option)None$.MODULE$, leaderEpoch, (Option)None$.MODULE$, (ReplicaState)Fetching$.MODULE$))})));
        // Guard kept from the decompiled pattern match: a null result is a match failure.
        if (resultWithPartitions == null) {
            throw new MatchError((Object)resultWithPartitions);
        }
        Option fetchRequestOpt = (Option)resultWithPartitions.result();
        Set partitionsWithError = resultWithPartitions.partitionsWithError();
        Assert.assertTrue((boolean)fetchRequestOpt.isDefined());
        FetchRequest.Builder fetchRequest = ((AbstractFetcherThread.ReplicaFetch)fetchRequestOpt.get()).fetchRequest();
        Assert.assertFalse((boolean)fetchRequest.fetchData().isEmpty());
        Assert.assertFalse((boolean)partitionsWithError.nonEmpty());
        FetchRequest request = (FetchRequest)fetchRequest.build();
        Assert.assertEquals((long)0L, (long)request.minBytes());
        Seq fetchInfos = ((MapLike)CollectionConverters$.MODULE$.mapAsScalaMapConverter(request.fetchData()).asScala()).toSeq();
        // Exactly one partition in the request: t1p0 at its fetch offset of 150.
        Assert.assertEquals((long)1L, (long)fetchInfos.length());
        Assert.assertEquals((String)"Expected fetch request for first partition", (Object)this.t1p0(), (Object)((Tuple2)fetchInfos.head())._1());
        Assert.assertEquals((long)150L, (long)((FetchRequest.PartitionData)((Tuple2)fetchInfos.head())._2()).fetchOffset);
    }

    @Test
    public void shouldFetchNonDelayedAndNonTruncatingReplicas() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        AbstractLog log2 = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        AbstractLog futureLog = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int startOffset = 123;
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)futureLog.logStartOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)startOffset)).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        this.stub(log2, null, futureLog, partition, replicaManager);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quotaManager, partition, log2, futureLog});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        int leaderEpoch = 1;
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, leaderEpoch)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.offsetAndEpoch(0L, leaderEpoch))})));
        AbstractFetcherThread.ResultWithPartitions resultWithPartitions = thread.buildFetch((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)PartitionFetchState$.MODULE$.apply(150L, (Option)None$.MODULE$, leaderEpoch, (ReplicaState)Fetching$.MODULE$)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)PartitionFetchState$.MODULE$.apply(160L, (Option)None$.MODULE$, leaderEpoch, (ReplicaState)Truncating$.MODULE$))})));
        if (resultWithPartitions != null) {
            Tuple2 tuple2;
            Option fetchRequestOpt = (Option)resultWithPartitions.result();
            Set partitionsWithError = resultWithPartitions.partitionsWithError();
            Tuple2 tuple22 = tuple2 = new Tuple2((Object)fetchRequestOpt, (Object)partitionsWithError);
            Option fetchRequestOpt2 = (Option)tuple22._1();
            Set partitionsWithError2 = (Set)tuple22._2();
            Assert.assertTrue((boolean)fetchRequestOpt2.isDefined());
            AbstractFetcherThread.ReplicaFetch fetchRequest = (AbstractFetcherThread.ReplicaFetch)fetchRequestOpt2.get();
            Assert.assertFalse((boolean)fetchRequest.partitionData().isEmpty());
            Assert.assertFalse((boolean)partitionsWithError2.nonEmpty());
            Seq fetchInfos = ((MapLike)CollectionConverters$.MODULE$.mapAsScalaMapConverter(((FetchRequest)fetchRequest.fetchRequest().build()).fetchData()).asScala()).toSeq();
            Assert.assertEquals((long)1L, (long)fetchInfos.length());
            Assert.assertEquals((String)"Expected fetch request for non-truncating partition", (Object)this.t1p0(), (Object)((Tuple2)fetchInfos.head())._1());
            Assert.assertEquals((long)150L, (long)((FetchRequest.PartitionData)((Tuple2)fetchInfos.head())._2()).fetchOffset);
            AbstractFetcherThread.ResultWithPartitions resultWithPartitions2 = thread.buildFetch((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)PartitionFetchState$.MODULE$.apply(140L, (Option)None$.MODULE$, leaderEpoch, (ReplicaState)Fetching$.MODULE$)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new PartitionFetchState(160L, (Option)None$.MODULE$, leaderEpoch, (Option)new Some((Object)new DelayedItem(5000L)), (ReplicaState)Fetching$.MODULE$))})));
            if (resultWithPartitions2 != null) {
                Tuple2 tuple23;
                Option fetchRequest2Opt = (Option)resultWithPartitions2.result();
                Set partitionsWithError22 = resultWithPartitions2.partitionsWithError();
                Tuple2 tuple24 = tuple23 = new Tuple2((Object)fetchRequest2Opt, (Object)partitionsWithError22);
                Option fetchRequest2Opt2 = (Option)tuple24._1();
                Set partitionsWithError23 = (Set)tuple24._2();
                Assert.assertTrue((boolean)fetchRequest2Opt2.isDefined());
                AbstractFetcherThread.ReplicaFetch fetchRequest2 = (AbstractFetcherThread.ReplicaFetch)fetchRequest2Opt2.get();
                Assert.assertFalse((boolean)fetchRequest2.partitionData().isEmpty());
                Assert.assertFalse((boolean)partitionsWithError23.nonEmpty());
                Seq fetchInfos2 = ((MapLike)CollectionConverters$.MODULE$.mapAsScalaMapConverter(((FetchRequest)fetchRequest2.fetchRequest().build()).fetchData()).asScala()).toSeq();
                Assert.assertEquals((long)1L, (long)fetchInfos2.length());
                Assert.assertEquals((String)"Expected fetch request for non-delayed partition", (Object)this.t1p0(), (Object)((Tuple2)fetchInfos2.head())._1());
                Assert.assertEquals((long)140L, (long)((FetchRequest.PartitionData)((Tuple2)fetchInfos2.head())._2()).fetchOffset);
                AbstractFetcherThread.ResultWithPartitions resultWithPartitions3 = thread.buildFetch((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new PartitionFetchState(140L, (Option)None$.MODULE$, leaderEpoch, (Option)new Some((Object)new DelayedItem(5000L)), (ReplicaState)Fetching$.MODULE$)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new PartitionFetchState(160L, (Option)None$.MODULE$, leaderEpoch, (Option)new Some((Object)new DelayedItem(5000L)), (ReplicaState)Fetching$.MODULE$))})));
                if (resultWithPartitions3 != null) {
                    Tuple2 tuple25;
                    Option fetchRequest3Opt = (Option)resultWithPartitions3.result();
                    Set partitionsWithError3 = resultWithPartitions3.partitionsWithError();
                    Tuple2 tuple26 = tuple25 = new Tuple2((Object)fetchRequest3Opt, (Object)partitionsWithError3);
                    Option fetchRequest3Opt2 = (Option)tuple26._1();
                    Set partitionsWithError32 = (Set)tuple26._2();
                    Assert.assertTrue((String)"Expected no fetch requests since all partitions are delayed", (boolean)fetchRequest3Opt2.isEmpty());
                    Assert.assertFalse((boolean)partitionsWithError32.nonEmpty());
                    return;
                }
                throw new MatchError((Object)resultWithPartitions3);
            }
            throw new MatchError((Object)resultWithPartitions2);
        }
        throw new MatchError((Object)resultWithPartitions);
    }

    /**
     * Records, on the given {@code replicaManager} mock, the log and partition
     * lookups for the two test partitions ({@code t1p0} and {@code t1p1}).
     *
     * <p>For each partition the mock is told to answer, any number of times:
     * {@code localLog} with {@code Some(log)}, {@code localLogOrException} with
     * the log itself, {@code futureLocalLogOrException} with {@code futureLog},
     * {@code futureLogExists} with {@code true}, and {@code nonOfflinePartition}
     * with {@code Some(partition)}.
     *
     * <p>Expectations are only recorded here; the caller is responsible for
     * switching the mock to replay mode afterwards.
     *
     * @param logT1p0        log returned for {@code t1p0}
     * @param logT1p1        log returned for {@code t1p1}; a {@code null} is
     *                       wrapped as {@code Some(null)}, not turned into {@code None}
     * @param futureLog      future log returned for both partitions
     * @param partition      partition returned for both partitions
     * @param replicaManager mock on which the expectations are recorded
     * @return the expectation setters of the last recorded call
     *         ({@code nonOfflinePartition} for {@code t1p1})
     */
    public IExpectationSetters<Option<Partition>> stub(AbstractLog logT1p0, AbstractLog logT1p1, AbstractLog futureLog, Partition partition, ReplicaManager replicaManager) {
        TopicPartition firstPartition = this.t1p0();
        TopicPartition secondPartition = this.t1p1();

        // Lookups for the first partition.
        EasyMock.expect((Object)replicaManager.localLog(firstPartition))
                .andReturn((Object)new Some((Object)logT1p0))
                .anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException(firstPartition))
                .andReturn((Object)logT1p0)
                .anyTimes();
        EasyMock.expect((Object)replicaManager.futureLocalLogOrException(firstPartition))
                .andReturn((Object)futureLog)
                .anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(firstPartition)))
                .andStubReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        EasyMock.expect((Object)replicaManager.nonOfflinePartition(firstPartition))
                .andReturn((Object)new Some((Object)partition))
                .anyTimes();

        // The same lookups for the second partition, backed by its own log.
        EasyMock.expect((Object)replicaManager.localLog(secondPartition))
                .andReturn((Object)new Some((Object)logT1p1))
                .anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException(secondPartition))
                .andReturn((Object)logT1p1)
                .anyTimes();
        EasyMock.expect((Object)replicaManager.futureLocalLogOrException(secondPartition))
                .andReturn((Object)futureLog)
                .anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(secondPartition)))
                .andStubReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));

        // The final expectation doubles as the return value.
        return EasyMock.expect((Object)replicaManager.nonOfflinePartition(secondPartition))
                .andReturn((Object)new Some((Object)partition))
                .anyTimes();
    }

    /**
     * Records the common {@link #stub} expectations and, in addition, a
     * catch-all {@code fetchMessages} expectation on {@code replicaManager}.
     *
     * <p>The {@code fetchMessages} expectation accepts any arguments, captures
     * the response-callback argument into {@code responseCallback}, and, each
     * time it is hit, immediately invokes that captured callback with an empty
     * sequence. It may be matched any number of times.
     *
     * @param logT1p0          log returned for {@code t1p0}
     * @param logT1p1          log returned for {@code t1p1}
     * @param futureLog        future log returned for both partitions
     * @param partition        partition returned for both partitions
     * @param replicaManager   mock on which the expectations are recorded
     * @param responseCallback capture that receives the callback passed to
     *                         {@code fetchMessages}
     * @return the expectation setters of the recorded {@code fetchMessages} call
     */
    public IExpectationSetters<BoxedUnit> stubWithFetchMessages(AbstractLog logT1p0, AbstractLog logT1p1, AbstractLog futureLog, Partition partition, ReplicaManager replicaManager, Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>> responseCallback) {
        this.stub(logT1p0, logT1p1, futureLog, partition, replicaManager);

        // Record the call first; the EasyMock.expect(...) below then attaches to this last recorded call.
        replicaManager.fetchMessages(EasyMock.anyLong(), EasyMock.anyInt(), EasyMock.anyInt(), EasyMock.anyInt(), BoxesRunTime.unboxToBoolean((Object)EasyMock.anyObject()), (Seq)EasyMock.anyObject(), (ReplicaQuota)EasyMock.anyObject(), (Function1)EasyMock.capture(responseCallback), (IsolationLevel)EasyMock.anyObject(), (Option)EasyMock.anyObject());

        // Final local so the anonymous class can capture it on any Java language level.
        final Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>> capturedCallback = responseCallback;

        return EasyMock.expect(BoxedUnit.UNIT).andAnswer(new IAnswer<BoxedUnit>(){

            @Override
            public BoxedUnit answer() {
                // Complete the fetch right away with no partition data.
                ((Function1)capturedCallback.getValue()).apply((Object)Seq$.MODULE$.empty());
                return BoxedUnit.UNIT;
            }
        }).anyTimes();
    }
}

