/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import com.yammer.metrics.core.Meter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.api.ApiVersion;
import kafka.api.ApiVersion$;
import kafka.api.KAFKA_2_6_IV0$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogAppendInfo;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.BlockingSend;
import kafka.server.BrokerTopicStats;
import kafka.server.DiskUsageBasedThrottleListener;
import kafka.server.DiskUsageBasedThrottler$;
import kafka.server.FailedPartitions;
import kafka.server.Fetching$;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.OffsetAndEpoch;
import kafka.server.PartitionFetchState;
import kafka.server.PartitionFetchState$;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaAlterLogDirsManager;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaFetcherThread$;
import kafka.server.ReplicaFetcherThreadTest$Quota$1$;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.ReplicaState;
import kafka.server.ReplicationQuotaManager;
import kafka.server.Truncating$;
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.SeqLike;
import scala.collection.Set;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\t5g\u0001B\u00181\u0001UBQ\u0001\u0010\u0001\u0005\u0002uBq\u0001\u0011\u0001C\u0002\u0013%\u0011\t\u0003\u0004N\u0001\u0001\u0006IA\u0011\u0005\b\u001d\u0002\u0011\r\u0011\"\u0003B\u0011\u0019y\u0005\u0001)A\u0005\u0005\"9\u0001\u000b\u0001b\u0001\n\u0013\t\u0005BB)\u0001A\u0003%!\tC\u0004S\u0001\t\u0007I\u0011C*\t\ri\u0003\u0001\u0015!\u0003U\u0011\u001dY\u0006A1A\u0005\u0012qCa\u0001\u0019\u0001!\u0002\u0013i\u0006\"B1\u0001\t\u0013\u0011\u0007b\u00029\u0001#\u0003%I!\u001d\u0005\u0006y\u0002!\t! \u0005\b\u00033\u0001A\u0011CA\u000e\u0011%\t)\fAI\u0001\n#\t9\fC\u0005\u0002<\u0002\t\n\u0011\"\u0005\u0002>\"1\u0011\u0011\u0019\u0001\u0005\u0002uDa!a3\u0001\t\u0003i\bbBAh\u0001\u0011\u0005\u0011\u0011\u001b\u0005\u0007\u0003[\u0004A\u0011A?\t\r\u0005E\b\u0001\"\u0001~\u0011\u0019\t)\u0010\u0001C\u0001{\"9\u0011\u0011 \u0001\u0005\u0012\u0005m\b\u0002\u0003B\b\u0001E\u0005I\u0011C9\t\u000f\tE\u0001\u0001\"\u0001\u0003\u0014!A!Q\u0004\u0001\u0012\u0002\u0013\u0005\u0011\u000f\u0003\u0004\u0003 \u0001!\t! \u0005\u0007\u0005G\u0001A\u0011A?\t\r\t\u001d\u0002\u0001\"\u0001~\u0011\u0019\u0011Y\u0003\u0001C\u0001{\"1!q\u0006\u0001\u0005\u0002uDaAa\r\u0001\t\u0003i\bB\u0002B\u001c\u0001\u0011\u0005Q\u0010\u0003\u0004\u0003<\u0001!\t! 
\u0005\u0007\u0005\u007f\u0001A\u0011A?\t\r\t\r\u0003\u0001\"\u0001~\u0011\u0019\u00119\u0005\u0001C\u0001{\"1!1\n\u0001\u0005\u0002uDaAa\u0014\u0001\t\u0003i\bB\u0002B*\u0001\u0011\u0005Q\u0010C\u0004\u0003X\u0001!IA!\u0017\t\u000f\t]\u0003\u0001\"\u0003\u0003\f\"9!1\u0015\u0001\u0005\n\t\u0015\u0006b\u0002BV\u0001\u0011\u0005!Q\u0016\u0005\b\u0005\u0013\u0004A\u0011\u0002Bf\u0005a\u0011V\r\u001d7jG\u00064U\r^2iKJ$\u0006N]3bIR+7\u000f\u001e\u0006\u0003cI\naa]3sm\u0016\u0014(\"A\u001a\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001A\u000e\t\u0003oij\u0011\u0001\u000f\u0006\u0002s\u0005)1oY1mC&\u00111\b\u000f\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u0005q\u0004CA \u0001\u001b\u0005\u0001\u0014\u0001\u0002;2aB*\u0012A\u0011\t\u0003\u0007.k\u0011\u0001\u0012\u0006\u0003\u000b\u001a\u000baaY8n[>t'BA\u001aH\u0015\tA\u0015*\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0015\u0006\u0019qN]4\n\u00051#%A\u0004+pa&\u001c\u0007+\u0019:uSRLwN\\\u0001\u0006iF\u0002\b\u0007I\u0001\u0005iF\u0002\u0018'A\u0003ucA\f\u0004%\u0001\u0003ueA\f\u0014!\u0002;3aF\u0002\u0013A\u00042s_.,'/\u00128e!>Lg\u000e^\u000b\u0002)B\u0011Q\u000bW\u0007\u0002-*\u0011qKM\u0001\bG2,8\u000f^3s\u0013\tIfK\u0001\bCe>\\WM]#oIB{\u0017N\u001c;\u0002\u001f\t\u0014xn[3s\u000b:$\u0007k\\5oi\u0002\n\u0001CZ1jY\u0016$\u0007+\u0019:uSRLwN\\:\u0016\u0003u\u0003\"a\u00100\n\u0005}\u0003$\u0001\u0005$bS2,G\rU1si&$\u0018n\u001c8t\u0003E1\u0017-\u001b7fIB\u000b'\u000f^5uS>t7\u000fI\u0001\u0012S:LG/[1m\r\u0016$8\r[*uCR,GcA2gWB\u0011q\bZ\u0005\u0003KB\u0012\u0011#\u00138ji&\fGNR3uG\"\u001cF/\u0019;f\u0011\u00159G\u00021\u0001i\u0003-1W\r^2i\u001f\u001a47/\u001a;\u0011\u0005]J\u0017B\u000169\u0005\u0011auN\\4\t\u000f1d\u0001\u0013!a\u0001[\u0006YA.Z1eKJ,\u0005o\\2i!\t9d.\u0003\u0002pq\t\u0019\u0011J\u001c;\u00027%t\u0017\u000e^5bY\u001a+Go\u00195Ti\u0006$X\r\n3fM\u0006,H\u000e\u001e\u00133+\u0005\u0011(FA7tW\u0005!\bCA;{\u001b\u00051(BA<y\u0003%)hn\u00195fG.,GM\u0003\u0002zq\u0005Q\u0011M\u001c8pi\u0006$\u0018n\
u001c8\n\u0005m4(!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u000691\r\\3b]V\u0004H#\u0001@\u0011\u0005]z\u0018bAA\u0001q\t!QK\\5uQ\rq\u0011Q\u0001\t\u0005\u0003\u000f\t)\"\u0004\u0002\u0002\n)!\u00111BA\u0007\u0003\r\t\u0007/\u001b\u0006\u0005\u0003\u001f\t\t\"A\u0004kkBLG/\u001a:\u000b\u0007\u0005M\u0011*A\u0003kk:LG/\u0003\u0003\u0002\u0018\u0005%!!C!gi\u0016\u0014X)Y2i\u0003i\u0019'/Z1uKJ+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1e)i\ti\"a\t\u0002>\u0005\u0005\u0013QIA(\u0003#\nY&!\u001b\u0002z\u0005\r\u0015QTAU!\ry\u0014qD\u0005\u0004\u0003C\u0001$\u0001\u0006*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\rC\u0004\u0002&=\u0001\r!a\n\u0002\t9\fW.\u001a\t\u0005\u0003S\t9D\u0004\u0003\u0002,\u0005M\u0002cAA\u0017q5\u0011\u0011q\u0006\u0006\u0004\u0003c!\u0014A\u0002\u001fs_>$h(C\u0002\u00026a\na\u0001\u0015:fI\u00164\u0017\u0002BA\u001d\u0003w\u0011aa\u0015;sS:<'bAA\u001bq!1\u0011qH\bA\u00025\f\u0011BZ3uG\",'/\u00133\t\r\u0005\rs\u00021\u0001U\u00031\u0019x.\u001e:dK\n\u0013xn[3s\u0011\u001d\t9e\u0004a\u0001\u0003\u0013\nAB\u0019:pW\u0016\u00148i\u001c8gS\u001e\u00042aPA&\u0013\r\ti\u0005\r\u0002\f\u0017\u000647.Y\"p]\u001aLw\rC\u0003\\\u001f\u0001\u0007Q\fC\u0004\u0002T=\u0001\r!!\u0016\u0002\u0015I,\u0007\u000f\\5dC6;'\u000fE\u0002@\u0003/J1!!\u00171\u00059\u0011V\r\u001d7jG\u0006l\u0015M\\1hKJDq!!\u0018\u0010\u0001\u0004\ty&A\u0004nKR\u0014\u0018nY:\u0011\t\u0005\u0005\u0014QM\u0007\u0003\u0003GR1!!\u0018E\u0013\u0011\t9'a\u0019\u0003\u000f5+GO]5dg\"9\u00111N\bA\u0002\u00055\u0014\u0001\u0002;j[\u0016\u0004B!a\u001c\u0002v5\u0011\u0011\u0011\u000f\u0006\u0004\u0003g\"\u0015!B;uS2\u001c\u0018\u0002BA<\u0003c\u0012A\u0001V5nK\"9\u00111P\bA\u0002\u0005u\u0014!B9v_R\f\u0007cA 
\u0002\u0000%\u0019\u0011\u0011\u0011\u0019\u0003\u0019I+\u0007\u000f\\5dCF+x\u000e^1\t\u000f\u0005\u0015u\u00021\u0001\u0002\b\u0006\u0001B/[3s'R\fG/\u001a$fi\u000eDWM\u001d\t\u0006o\u0005%\u0015QR\u0005\u0004\u0003\u0017C$AB(qi&|g\u000e\u0005\u0003\u0002\u0010\u0006eUBAAI\u0015\u0011\t\u0019*!&\u0002\u000f\u0019,Go\u00195fe*\u0019\u0011q\u0013\u001a\u0002\tQLWM]\u0005\u0005\u00037\u000b\tJ\u0001\tUS\u0016\u00148\u000b^1uK\u001a+Go\u00195fe\"I\u0011qT\b\u0011\u0002\u0003\u0007\u0011\u0011U\u0001\u001bY\u0016\fG-\u001a:F]\u0012\u0004x.\u001b8u\u00052|7m[5oON+g\u000e\u001a\t\u0006o\u0005%\u00151\u0015\t\u0004\u007f\u0005\u0015\u0016bAATa\ta!\t\\8dW&twmU3oI\"I\u00111V\b\u0011\u0002\u0003\u0007\u0011QV\u0001\u000eY><7i\u001c8uKb$x\n\u001d;\u0011\u000b]\nI)a,\u0011\t\u0005=\u0014\u0011W\u0005\u0005\u0003g\u000b\tH\u0001\u0006M_\u001e\u001cuN\u001c;fqR\fQe\u0019:fCR,'+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012$C-\u001a4bk2$H%M\u0019\u0016\u0005\u0005e&fAAQg\u0006)3M]3bi\u0016\u0014V\r\u001d7jG\u00064U\r^2iKJ$\u0006N]3bI\u0012\"WMZ1vYR$\u0013GM\u000b\u0003\u0003\u007fS3!!,t\u0003!\u001a\bn\\;mIN+g\u000e\u001a'bi\u0016\u001cHOU3rk\u0016\u001cHOV3sg&|gn\u001d\"z\t\u00164\u0017-\u001e7uQ\r\u0011\u0012Q\u0019\t\u0005\u0003\u000f\t9-\u0003\u0003\u0002J\u0006%!\u0001\u0002+fgR\fa\b^3ti\u001a+Go\u00195MK\u0006$WM]#q_\u000eD'+Z9vKN$\u0018J\u001a'bgR,\u0005o\\2i\t\u00164\u0017N\\3e\r>\u00148k\\7f!\u0006\u0014H/\u001b;j_:\u001c\bfA\n\u0002F\u0006)\u0012m]:feR\u0004\u0016M\u001d;ji&|gn\u0015;bi\u0016\u001cH#\u0003@\u0002T\u0006m\u0017Q]Au\u0011\u001d\t\u0019\n\u0006a\u0001\u0003+\u00042aPAl\u0013\r\tI\u000e\r\u0002\u0016\u0003\n\u001cHO]1di\u001a+Go\u00195feRC'/Z1e\u0011\u001d\ti\u000e\u0006a\u0001\u0003?\fQc\u001d5pk2$')\u001a*fC\u0012Lhi\u001c:GKR\u001c\u0007\u000eE\u00028\u0003CL1!a99\u0005\u001d\u0011un\u001c7fC:Dq!a:\u0015\u0001\u0004\ty.A\u000btQ>,H\u000e\u001a\"f)J,hnY1uS:<Gj\\4\t\u000f\u0005-H\u00031\u0001\u0002`\u0006y1\u000f[8vY\u0012\u0014U\rR3mCf,G-A\
u0013tQ>,H\u000e\u001a%b]\u0012dW-\u0012=dKB$\u0018n\u001c8Ge>l'\t\\8dW&twmU3oI\"\u001aQ#!2\u0002\u0007NDw.\u001e7e\r\u0016$8\r\u001b'fC\u0012,'/\u00129pG\"|eNR5sgR4U\r^2i\u001f:d\u00170\u00134MK\u0006$WM]#q_\u000eD7J\\8x]R{'i\u001c;i\u0013\n\u0004(G\u000e\u0015\u0004-\u0005\u0015\u0017\u0001O:i_VdGMT8u\r\u0016$8\r\u001b'fC\u0012,'/\u00129pG\"|eNR5sgR4U\r^2i/&$\b\u000e\u0016:v]\u000e\fG/Z(o\r\u0016$8\r\u001b\u0015\u0004/\u0005\u0015\u0017A\t<fe&4\u0017PR3uG\"dU-\u00193fe\u0016\u0003xn\u00195P]\u001aK'o\u001d;GKR\u001c\u0007\u000eF\u0003\u007f\u0003{\u0014Y\u0001C\u0004\u0002\u0000b\u0001\rA!\u0001\u0002\u0007%\u0014\u0007\u000f\u0005\u0003\u0003\u0004\t\u001dQB\u0001B\u0003\u0015\r\tYAM\u0005\u0005\u0005\u0013\u0011)A\u0001\u0006Ba&4VM]:j_:D\u0001B!\u0004\u0019!\u0003\u0005\r!\\\u0001\u0010KB|7\r\u001b$fi\u000eD7i\\;oi\u0006ac/\u001a:jMf4U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"$C-\u001a4bk2$HEM\u0001\u001aKb\u0004Xm\u0019;NCJ\\'+\u001a9mS\u000e\fG\u000b\u001b:piRdW\rF\u0003\u007f\u0005+\u0011I\u0002C\u0004\u0003\u0018i\u0001\r!!\u0016\u0002\u001dI,\u0007\u000f\\5dC6\u000bg.Y4fe\"A!1\u0004\u000e\u0011\u0002\u0003\u0007Q.A\u0003uS6,7/A\u0012fqB,7\r^'be.\u0014V\r\u001d7jG\u0006$\u0006N]8ui2,G\u0005Z3gCVdG\u000f\n\u001a\u0002;MDw.\u001e7e)\"\u0014x\u000e\u001e;mK\u001a{G\u000e\\8xKJ\u0014V\r\u001d7jG\u0006D3\u0001HAc\u0003\u0001\"Xm\u001d;G_2dwn^3s\u0013N$\u0006N]8ui2,Gm\u00148M_^$\u0015n]6)\u0007u\t)-\u0001\u001btQ>,H\u000e\u001a+sk:\u001c\u0017\r^3U_>3gm]3u'B,7-\u001b4jK\u0012Le.\u00129pG\"|eMZ:fiJ+7\u000f]8og\u0016D3AHAc\u00035\u001b\bn\\;mIR\u0013XO\\2bi\u0016$vn\u00144gg\u0016$8\u000b]3dS\u001aLW\rZ%o\u000bB|7\r[(gMN,GOU3ta>t7/Z%g\r>dGn\\<fe\"\u000b7OT8N_J,W\t]8dQND3aHAc\u0003)\u001b\bn\\;mI\u001a+Go\u00195MK\u0006$WM]#q_\u000eD7+Z2p]\u0012$\u0016.\\3JM2+\u0017\rZ3s%\u0016\u0004H.[3t/&$\b.\u00129pG\"tu\u000e^&o_^tGk\u001c$pY2|w/\u001a:)\u0007\u0001\n)-A!tQ>,H\u000e\u001a+sk:\u001c\u0017\r^3JM2+\u0017\rZ3s%\u0016\u0004H.[3t/&$\b\u000eR5w
KJ<\u0017N\\4Fa>\u001c\u0007NT8u\u0017:|wO\u001c+p\r>dGn\\<fe\"\u001a\u0011%!2\u0002gMDw.\u001e7e+N,G*Z1eKJ,e\u000eZ(gMN,G/\u00134J]R,'O\u0011:pW\u0016\u0014h+\u001a:tS>t')\u001a7poJ\u0002\u0004f\u0001\u0012\u0002F\u0006\u00015\u000f[8vY\u0012$&/\u001e8dCR,Gk\\%oSRL\u0017\r\u001c$fi\u000eDwJ\u001a4tKRLe\rT3bI\u0016\u0014(+\u001a;ve:\u001cXK\u001c3fM&tW\rZ(gMN,G\u000fK\u0002$\u0003\u000b\f\u0011g\u001d5pk2$\u0007k\u001c7m\u0013:$WMZ5oSR,G._%g\u0019\u0016\fG-\u001a:SKR,(O\\:B]f,\u0005pY3qi&|g\u000eK\u0002%\u0003\u000b\f1f\u001d5pk2$Wj\u001c<f!\u0006\u0014H/\u001b;j_:\u001cx*\u001e;PMR\u0013XO\\2bi&tw\rT8h'R\fG/\u001a\u0015\u0004K\u0005\u0015\u0017\u0001O:i_VdGMR5mi\u0016\u0014\b+\u0019:uSRLwN\\:NC\u0012,G*Z1eKJ$UO]5oO2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f\u001e\u0015\u0004M\u0005\u0015\u0017\u0001S:i_VdGmQ1uG\",\u0005pY3qi&|gN\u0012:p[\ncwnY6j]\u001e\u001cVM\u001c3XQ\u0016t7\u000b[;ui&tw\rR8x]J+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1eQ\r9\u0013QY\u0001'g\"|W\u000f\u001c3Va\u0012\fG/\u001a*fCN\u001c\u0018n\u001a8nK:$()\u001f;fg&sW*\u001a;sS\u000e\u001c\bf\u0001\u0015\u0002F\u000615\u000f[8vY\u0012tu\u000e^+qI\u0006$XMU3bgNLwM\\7f]R\u0014\u0015\u0010^3t\u0013:lU\r\u001e:jGN<\u0006.\u001a8O_J+\u0017m]:jO:lWM\u001c;t\u0013:\u0004&o\\4sKN\u001c\bfA\u0015\u0002F\u0006\tc.Z<PM\u001a\u001cX\r\u001e$pe2+\u0017\rZ3s!\u0006\u0014H/\u001b;j_:\u0014Vm];miRA!1\fBA\u0005\u000b\u00139\t\u0005\u0003\u0003^\tmd\u0002\u0002B0\u0005krAA!\u0019\u0003r9!!1\rB8\u001d\u0011\u0011)G!\u001c\u000f\t\t\u001d$1\u000e\b\u0005\u0003[\u0011I'C\u0001K\u0013\tA\u0015*\u0003\u00024\u000f&\u0011QIR\u0005\u0004\u0005g\"\u0015aB7fgN\fw-Z\u0005\u0005\u0005o\u0012I(\u0001\u0011PM\u001a\u001cX\r\u001e$pe2+\u0017\rZ3s\u000bB|7\r\u001b*fgB|gn]3ECR\f'b\u0001B:\t&!!Q\u0010B@\u00059)\u0005o\\2i\u000b:$wJ\u001a4tKRTAAa\u001e\u0003z!1!1\u0011\u0016A\u0002\t\u000b!\u0001\u001e9\t\u000b1T\u0003\u0019A7\t\r\t%%\u00061\u0001i\u0003%)g\u000eZ(gMN,G\u000f\u0006\u0006\u0003\\\t5%q\u0012BP\u0005CCaAa!,\u0001\u0004\u0
011\u0005b\u0002BIW\u0001\u0007!1S\u0001\u0006KJ\u0014xN\u001d\t\u0005\u0005+\u0013Y*\u0004\u0002\u0003\u0018*\u0019!\u0011\u0014#\u0002\u0011A\u0014x\u000e^8d_2LAA!(\u0003\u0018\n1QI\u001d:peNDQ\u0001\\\u0016A\u00025DaA!#,\u0001\u0004A\u0017AH1tg\u0016\u0014H\u000f\u0015:pG\u0016\u001c8\u000fU1si&$\u0018n\u001c8ECR\fw\u000b[3o)\rq(q\u0015\u0005\b\u0005Sc\u0003\u0019AAp\u00035I7OU3bgNLwM\\5oO\u0006!1\u000f^;c)\u001dq(q\u0016B]\u0005wCqA!-.\u0001\u0004\u0011\u0019,A\u0005qCJ$\u0018\u000e^5p]B\u0019QK!.\n\u0007\t]fKA\u0005QCJ$\u0018\u000e^5p]\"9!qC\u0017A\u0002\u0005U\u0003b\u0002B_[\u0001\u0007!qX\u0001\u0004Y><\u0007\u0003\u0002Ba\u0005\u000bl!Aa1\u000b\u0007\tu&'\u0003\u0003\u0003H\n\r'aC!cgR\u0014\u0018m\u0019;M_\u001e\fAd[1gW\u0006\u001cuN\u001c4jO:{GK];oG\u0006$Xm\u00148GKR\u001c\u0007.\u0006\u0002\u0002J\u0001")
public class ReplicaFetcherThreadTest {
    // Partition 0 of topic "topic1"; exposed through the public, name-mangled accessor below.
    private final TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = new TopicPartition("topic1", 0);
    // Partition 1 of topic "topic1".
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    // Partition 1 of topic "topic2".
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    // End point handed to the fetcher threads and mock network clients created by the tests (constructed with 0, "localhost", 1000).
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);
    // FailedPartitions instance passed to every fetcher thread built by the tests in this class.
    private final FailedPartitions failedPartitions = new FailedPartitions();

    /**
     * Accessor for the "topic1" / partition 0 fixture.
     *
     * @return the {@link TopicPartition} stored in the corresponding field
     */
    public TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0() {
        return kafka$server$ReplicaFetcherThreadTest$$t1p0;
    }

    /**
     * Accessor for the "topic1" / partition 1 fixture.
     *
     * @return the {@link TopicPartition} stored in {@code t1p1}
     */
    private TopicPartition t1p1() {
        return t1p1;
    }

    /**
     * Accessor for the "topic2" / partition 1 fixture.
     *
     * @return the {@link TopicPartition} stored in {@code t2p1}
     */
    private TopicPartition t2p1() {
        return t2p1;
    }

    /**
     * Accessor for the shared broker end point fixture.
     *
     * @return the {@link BrokerEndPoint} stored in {@code brokerEndPoint}
     */
    public BrokerEndPoint brokerEndPoint() {
        return brokerEndPoint;
    }

    /**
     * Accessor for the shared {@link FailedPartitions} fixture.
     *
     * @return the instance stored in {@code failedPartitions}
     */
    public FailedPartitions failedPartitions() {
        return failedPartitions;
    }

    /**
     * Builds an {@link InitialFetchState} for use with {@code addPartitions}.
     *
     * <p>The state is created with a fresh {@code BrokerEndPoint(0, "localhost", 9092)}, which is a
     * different instance (and a different third constructor argument) than the {@code brokerEndPoint}
     * field of this class.
     *
     * @param fetchOffset offset passed as the third constructor argument of the state
     * @param leaderEpoch epoch passed as the second constructor argument of the state
     * @return the newly created state
     */
    private InitialFetchState initialFetchState(long fetchOffset, int leaderEpoch) {
        return new InitialFetchState(new BrokerEndPoint(0, "localhost", 9092), leaderEpoch, fetchOffset);
    }

    /**
     * Supplies the value callers pass as the second argument ({@code leaderEpoch}) of
     * {@link #initialFetchState(long, int)} when they do not choose one themselves.
     * Presumably the compiler-generated accessor for a Scala default argument.
     *
     * @return always 1
     */
    private int initialFetchState$default$2() {
        return 1;
    }

    /**
     * Runs after each test and delegates to {@code TestUtils.clearYammerMetrics()}.
     */
    @AfterEach
    public void cleanup() {
        TestUtils$.MODULE$.clearYammerMetrics();
    }

    /**
     * Factory used by the tests to construct the {@link ReplicaFetcherThread} under test.
     *
     * <p>All arguments except {@code tierStateFetcher} are forwarded to the thread's constructor in
     * declaration order; the constructor's 12th and 13th arguments are filled from its own defaults.
     * {@code tierStateFetcher} is accepted but not used by this implementation.
     *
     * @param name                       thread name
     * @param fetcherId                  fetcher id
     * @param sourceBroker               broker end point to fetch from
     * @param brokerConfig               broker configuration
     * @param failedPartitions           failed-partition tracker handed to the thread
     * @param replicaMgr                 replica manager (a mock in these tests)
     * @param metrics                    metrics registry
     * @param time                       clock
     * @param quota                      replication quota; one test passes {@code null}
     * @param tierStateFetcher           ignored here
     * @param leaderEndpointBlockingSend optional network client replacement
     * @param logContextOpt              optional log context
     * @return the newly constructed fetcher thread
     */
    public ReplicaFetcherThread createReplicaFetcherThread(String name, int fetcherId, BrokerEndPoint sourceBroker, KafkaConfig brokerConfig, FailedPartitions failedPartitions, ReplicaManager replicaMgr, Metrics metrics, Time time, ReplicaQuota quota, Option<TierStateFetcher> tierStateFetcher, Option<BlockingSend> leaderEndpointBlockingSend, Option<LogContext> logContextOpt) {
        return new ReplicaFetcherThread(
                name,
                fetcherId,
                sourceBroker,
                brokerConfig,
                failedPartitions,
                replicaMgr,
                metrics,
                time,
                quota,
                leaderEndpointBlockingSend,
                logContextOpt,
                ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$12(),
                ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$13());
    }

    /**
     * Default for the 11th parameter ({@code leaderEndpointBlockingSend}) of
     * {@code createReplicaFetcherThread}. Presumably the compiler-generated accessor for a
     * Scala default argument.
     *
     * @return Scala's {@code None}
     */
    public Option<BlockingSend> createReplicaFetcherThread$default$11() {
        return None$.MODULE$;
    }

    /**
     * Default for the 12th parameter ({@code logContextOpt}) of
     * {@code createReplicaFetcherThread}. Presumably the compiler-generated accessor for a
     * Scala default argument.
     *
     * @return Scala's {@code None}
     */
    public Option<LogContext> createReplicaFetcherThread$default$12() {
        return None$.MODULE$;
    }

    /**
     * Builds a fetcher thread from a broker config created with default settings and checks that
     * the request versions it reports for FETCH, OFFSET_FOR_LEADER_EPOCH and LIST_OFFSETS equal
     * {@code latestVersion()} of the corresponding {@link ApiKeys} entries.
     */
    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        Properties brokerProps = TestUtils$.MODULE$.createBrokerConfig(
                1,
                "localhost:1234",
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                TestUtils$.MODULE$.createBrokerConfig$default$5(),
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16(),
                TestUtils$.MODULE$.createBrokerConfig$default$17(),
                TestUtils$.MODULE$.createBrokerConfig$default$18(),
                TestUtils$.MODULE$.createBrokerConfig$default$19(),
                TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // The only interaction recorded on the replica manager mock is brokerTopicStats().
        ReplicaManager replicaMgr = (ReplicaManager)EasyMock.mock(ReplicaManager.class);
        EasyMock.expect((Object)replicaMgr.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay((Object[])new Object[]{replicaMgr});

        // No tier state fetcher and no replacement network client are supplied.
        ReplicaFetcherThread fetcherThread = this.createReplicaFetcherThread(
                "bob",
                0,
                this.brokerEndPoint(),
                brokerConfig,
                this.failedPartitions(),
                replicaMgr,
                new Metrics(),
                (Time)new SystemTime(),
                (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$,
                (Option<TierStateFetcher>)None$.MODULE$,
                (Option<BlockingSend>)None$.MODULE$,
                this.createReplicaFetcherThread$default$12());

        Assertions.assertEquals((short)ApiKeys.FETCH.latestVersion(), (short)fetcherThread.fetchRequestVersion());
        Assertions.assertEquals((short)ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), (short)fetcherThread.offsetForLeaderEpochRequestVersion());
        Assertions.assertEquals((short)ApiKeys.LIST_OFFSETS.latestVersion(), (short)fetcherThread.listOffsetRequestVersion());
    }

    /**
     * Adds three partitions to a fetcher thread whose mocked log reports a latest epoch for only
     * the first two {@code latestEpoch()} calls, then runs {@code doWork()} three times.
     *
     * <p>Asserted outcome: the mock network records exactly one epoch fetch in total, while the
     * regular fetch count grows by one per {@code doWork()} call (1, 2, 3).
     */
    @Test
    public void testFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions() {
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        // Collaborators are EasyMock mocks; quota and log are "nice" mocks, the rest are strict about unexpected calls.
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)0L)).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)0L)).anyTimes();
        // latestEpoch() answers Some(5) on its first two calls and None on the third.
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).once();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).once();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)None$.MODULE$).once();
        EasyMock.expect((Object)log.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect((Object)quota.lastSignalledQuotaOptRef()).andReturn(new AtomicReference<None$>(None$.MODULE$)).anyTimes();
        this.stub(partition, replicaManager, log);
        // Records an expectation for the void truncateTo call; the expect(...) on the next line applies to this last recorded call.
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).times(3);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        // The mock network's offsets map has entries for topic1-0 and topic1-1 only (none for topic2-1).
        java.util.Map offsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12());
        // All three partitions are added with fetch offset 0 and the default leader epoch.
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2()))})));
        // Before any work: expected not ready for fetch, truncating, not delayed.
        this.assertPartitionStates((AbstractFetcherThread)thread, false, true, false);
        // First iteration: one epoch fetch and one fetch; afterwards expected ready for fetch and no longer truncating.
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        this.assertPartitionStates((AbstractFetcherThread)thread, true, false, false);
        // Second iteration: epoch fetch count unchanged, fetch count rises to 2.
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        this.assertPartitionStates((AbstractFetcherThread)thread, true, false, false);
        // Third iteration: epoch fetch count still 1, fetch count rises to 3.
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)3, (int)mockNetwork.fetchCount());
        this.assertPartitionStates((AbstractFetcherThread)thread, true, false, false);
        EasyMock.verify((Object[])new Object[]{logManager});
    }

    public void assertPartitionStates(AbstractFetcherThread fetcher, boolean shouldBeReadyForFetch, boolean shouldBeTruncatingLog, boolean shouldBeDelayed) {
        new .colon.colon((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), (List)new .colon.colon((Object)this.t1p1(), (List)new .colon.colon((Object)this.t2p1(), (List)Nil$.MODULE$))).foreach((Function1 & Serializable & scala.Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$assertPartitionStates$1(fetcher, shouldBeReadyForFetch, shouldBeTruncatingLog, shouldBeDelayed, tp);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Verifies that {@code fetchEpochEndOffsets} turns an exception thrown by the network client
     * into per-partition error results instead of propagating it.
     *
     * <p>The mocked {@link BlockingSend} throws a {@link NullPointerException} from
     * {@code sendRequest}; the result for both requested partitions is expected to carry
     * {@code Errors.UNKNOWN_SERVER_ERROR} with epoch -1 and offset -1.
     */
    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        Properties brokerProps = TestUtils$.MODULE$.createBrokerConfig(
                1,
                "localhost:1234",
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                TestUtils$.MODULE$.createBrokerConfig$default$5(),
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16(),
                TestUtils$.MODULE$.createBrokerConfig$default$17(),
                TestUtils$.MODULE$.createBrokerConfig$default$18(),
                TestUtils$.MODULE$.createBrokerConfig$default$19(),
                TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // The network client fails exactly one sendRequest call with a NullPointerException.
        BlockingSend failingSend = (BlockingSend)EasyMock.createMock(BlockingSend.class);
        EasyMock.expect((Object)failingSend.sendRequest((AbstractRequest.Builder)EasyMock.anyObject())).andThrow((Throwable)new NullPointerException()).once();
        ReplicaManager replicaMgr = (ReplicaManager)EasyMock.mock(ReplicaManager.class);
        EasyMock.expect((Object)replicaMgr.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay((Object[])new Object[]{failingSend, replicaMgr});

        // The quota argument is null in this test.
        ReplicaFetcherThread fetcherThread = this.createReplicaFetcherThread(
                "bob",
                0,
                this.brokerEndPoint(),
                brokerConfig,
                this.failedPartitions(),
                replicaMgr,
                new Metrics(),
                (Time)new SystemTime(),
                null,
                (Option<TierStateFetcher>)None$.MODULE$,
                (Option<BlockingSend>)new Some((Object)failingSend),
                this.createReplicaFetcherThread$default$12());

        // Request epoch end offsets (leader epoch 0) for topic1-0 and topic1-1.
        Tuple2 firstRequest = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(this.kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLeaderEpoch(0));
        Tuple2 secondRequest = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(this.t1p1().partition()).setLeaderEpoch(0));
        Map requested = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{firstRequest, secondRequest}));
        Map result = fetcherThread.fetchEpochEndOffsets(requested);

        // Both partitions are expected to come back as UNKNOWN_SERVER_ERROR with epoch -1 and offset -1.
        Tuple2 firstExpected = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L));
        Tuple2 secondExpected = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L));
        Map expected = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{firstExpected, secondExpected}));
        Assertions.assertEquals((Object)expected, (Object)result, (String)"results from leader epoch request should have undefined offset");

        EasyMock.verify((Object[])new Object[]{failingSend});
    }

    /**
     * Runs {@code verifyFetchLeaderEpochOnFirstFetch} with the inter-broker protocol version
     * {@code KAFKA_2_6_IV0} and that helper's default expected epoch fetch count.
     */
    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBothIbp26() {
        ApiVersion interBrokerProtocol = (ApiVersion)KAFKA_2_6_IV0$.MODULE$;
        int defaultEpochFetchCount = this.verifyFetchLeaderEpochOnFirstFetch$default$2();
        this.verifyFetchLeaderEpochOnFirstFetch(interBrokerProtocol, defaultEpochFetchCount);
    }

    /**
     * Runs {@code verifyFetchLeaderEpochOnFirstFetch} with the latest inter-broker protocol
     * version and an expected epoch fetch count of 0.
     */
    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        ApiVersion latestProtocol = ApiVersion$.MODULE$.latestVersion();
        this.verifyFetchLeaderEpochOnFirstFetch(latestProtocol, 0);
    }

    /**
     * Shared driver for the "fetch leader epoch on first fetch" checks.
     *
     * <p>Builds a fetcher thread whose broker config carries the given inter-broker protocol
     * version, adds two partitions, runs {@code doWork()} three times against a mock network
     * and, after each iteration, asserts the cumulative number of epoch requests and fetch
     * requests seen by that mock network.
     *
     * @param ibp inter-broker protocol version written into the broker config
     * @param epochFetchCount value {@code mockNetwork.epochFetchCount()} is expected to have
     *                        after each of the three iterations
     */
    public void verifyFetchLeaderEpochOnFirstFetch(ApiVersion ibp, int epochFetchCount) {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        props.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), ibp.version());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        // Collaborators are EasyMock mocks; only the log is a "nice" mock.
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        // Record phase: the local log reports high watermark 0, latest epoch 5 and end offset 0 for that epoch.
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)0L)).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        // truncateTo(...) yields no value, so the next line configures that just-recorded call
        // (expected twice). The two statements must stay adjacent.
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).times(2);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, partition, log});
        // Canned leader-epoch responses for t1p0 and t1p1: epoch 5, end offset 1.
        java.util.Map offsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2()))})));
        // Iteration 1: epoch request count must equal the caller-supplied value; one fetch sent.
        thread.doWork();
        Assertions.assertEquals((int)epochFetchCount, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        // Iterations 2 and 3: epoch request count stays unchanged while the fetch count grows by one each time.
        thread.doWork();
        Assertions.assertEquals((int)epochFetchCount, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        thread.doWork();
        Assertions.assertEquals((int)epochFetchCount, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)3, (int)mockNetwork.fetchCount());
        // Only logManager is verified here; no expectations were recorded on it above.
        EasyMock.verify((Object[])new Object[]{logManager});
    }

    /**
     * Supplies the constant {@code 1}.
     *
     * <p>The name follows the Scala {@code $default$N} convention, so this is presumably the
     * default for the second parameter ({@code epochFetchCount}) of
     * {@code verifyFetchLeaderEpochOnFirstFetch}.
     *
     * @return always {@code 1}
     */
    public int verifyFetchLeaderEpochOnFirstFetch$default$2() {
        final int defaultEpochFetchCount = 1;
        return defaultEpochFetchCount;
    }

    /**
     * Records an expectation that {@code markFollowerReplicaThrottle()} is invoked on the
     * given mock exactly {@code times} times.
     *
     * <p>The mock must still be in its record phase (i.e. not yet replayed) when this is called.
     *
     * @param replicaManager EasyMock mock on which the call is recorded
     * @param times exact number of expected invocations
     */
    public void expectMarkReplicaThrottle(ReplicaManager replicaManager, int times) {
        // Record the void call first ...
        replicaManager.markFollowerReplicaThrottle();
        // ... then attach the call count to that last recorded call.
        EasyMock.expectLastCall().times(times);
    }

    /**
     * Supplies the constant {@code 1}.
     *
     * <p>The name follows the Scala {@code $default$N} convention, so this is presumably the
     * default for the second parameter ({@code times}) of {@code expectMarkReplicaThrottle}.
     *
     * @return always {@code 1}
     */
    public int expectMarkReplicaThrottle$default$2() {
        final int defaultTimes = 1;
        return defaultTimes;
    }

    /**
     * Runs a single {@code doWork()} on a fetcher thread built with the quota returned by
     * {@code Quota$2(...)} and asserts that exactly one epoch request and one fetch request
     * were sent, and that the last fetch request contains only {@code t1p1}.
     *
     * <p>NOTE(review): {@code t1p0} is presumably left out because the quota throttles it;
     * the quota object's definition is not visible in this method - confirm there.
     */
    @Test
    public void shouldThrottleFollowerReplica() {
        // Holder for the lazily created local quota object (passed to Quota$2 below).
        LazyRef Quota$module = new LazyRef();
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        // Pin the inter-broker protocol version to KAFKA_2_6_IV0.
        props.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), KAFKA_2_6_IV0$.MODULE$.version());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        // Record phase: the local log reports high watermark 0, latest epoch 5 and end offset 0 for that epoch.
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)0L)).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        // Expect markFollowerReplicaThrottle() exactly once on the replica manager.
        this.expectMarkReplicaThrottle(replicaManager, 1);
        this.stub(partition, replicaManager, log);
        // truncateTo(...) yields no value, so the next line configures that just-recorded call
        // (expected twice). The two statements must stay adjacent.
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).times(2);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, partition, log});
        // Canned leader-epoch responses: t1p0 -> partition 0, end offset 100; t1p1 -> partition 1, end offset 1.
        java.util.Map offsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setLeaderEpoch(leaderEpoch).setEndOffset(100L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(1).setLeaderEpoch(leaderEpoch).setEndOffset(1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), this.Quota$2(Quota$module), (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2()))})));
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        // The partitions named in the last fetch request must be exactly {t1p1}.
        Assertions.assertEquals((Object)new Some((Object)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.t1p1()}))), (Object)mockNetwork.lastFetchRequest().map((Function1 & Serializable & scala.Serializable)x$1 -> (scala.collection.mutable.Set)CollectionConverters$.MODULE$.asScalaSetConverter(x$1.fetchData().keySet()).asScala()));
    }

    /**
     * Calls {@code buildFetch} three times - before the mocked quota manager is registered as
     * a disk-usage throttle listener, while it is registered, and after it is de-registered -
     * and then verifies the recorded expectations on the quota manager and replica manager
     * ({@code markFollowerReplicaThrottle()} four times, {@code lastSignalledQuotaOptRef()}
     * three times).
     *
     * <p>The listener is registered on the {@code DiskUsageBasedThrottler$} singleton, so the
     * de-registration runs in a {@code finally} block: a failure in the middle
     * {@code buildFetch} must not leave the mock registered for subsequent tests.
     */
    @Test
    public void testFollowerIsThrottledOnLowDisk() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        // Void call followed by its expectation: markFollowerReplicaThrottle() is expected
        // four times in total across the three buildFetch calls below.
        replicaManager.markFollowerReplicaThrottle();
        EasyMock.expect((Object)BoxedUnit.UNIT).andVoid().times(4);
        this.stub(partition, replicaManager, log);
        // Same pattern for truncateTo(...): expected twice.
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).times(2);
        // Quota manager mock that always reports the quota as exceeded and every partition as
        // throttled; lastSignalledQuotaOptRef() yields Some(42) and is expected three times.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createMock(ReplicationQuotaManager.class);
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)quotaManager.isQuotaExceeded())).andReturn((Object)BoxesRunTime.boxToBoolean((boolean)true)).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)quotaManager.isThrottled((TopicPartition)EasyMock.anyObject(TopicPartition.class)))).andReturn((Object)BoxesRunTime.boxToBoolean((boolean)true)).anyTimes();
        EasyMock.expect((Object)quotaManager.lastSignalledQuotaOptRef()).andReturn(new AtomicReference<Some>(new Some((Object)BoxesRunTime.boxToLong((long)42L)))).times(3);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, partition, log, quotaManager});
        java.util.Map offsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(leaderEpoch).setEndOffset(100L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(leaderEpoch).setEndOffset(1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("audi", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quotaManager, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12());
        // Both partitions start in the Fetching state at offset 0 with leader epoch 5.
        Map partitionMap = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)PartitionFetchState$.MODULE$.apply(0L, (Option)new Some((Object)BoxesRunTime.boxToLong((long)0L)), leaderEpoch, (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)PartitionFetchState$.MODULE$.apply(0L, (Option)None$.MODULE$, leaderEpoch, (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$))}));
        // 1) listener not yet registered
        thread.buildFetch(partitionMap);
        // 2) listener registered; always de-register, even if buildFetch throws, so the
        //    singleton throttler does not keep a reference to this test's mock.
        DiskUsageBasedThrottler$.MODULE$.registerListener((DiskUsageBasedThrottleListener)quotaManager);
        try {
            thread.buildFetch(partitionMap);
        } finally {
            DiskUsageBasedThrottler$.MODULE$.deRegisterListener((DiskUsageBasedThrottleListener)quotaManager);
        }
        // 3) listener de-registered again
        thread.buildFetch(partitionMap);
        EasyMock.verify((Object[])new Object[]{quotaManager, replicaManager});
    }

    /**
     * Runs a single {@code doWork()} with leader-epoch responses whose end offsets are 156
     * (for {@code t1p0}) and 172 (for {@code t2p1}) and asserts that both offsets appear among
     * the arguments captured from {@code partition.truncateTo(...)}.
     *
     * <p>The mocked local log reports latest epoch 5, end offset 200 for that epoch, log end
     * offset 200 and high watermark 199.
     */
    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        // Collects every offset passed to partition.truncateTo(...).
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        int initialLEO = 200;
        // truncateTo(...) yields no value, so the next line configures that just-recorded call
        // (any number of times). The two statements must stay adjacent.
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 1))).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)initialLEO, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect((Object)quota.lastSignalledQuotaOptRef()).andReturn(new AtomicReference<None$>(None$.MODULE$)).anyTimes();
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        // Leader replies with epoch 5 and end offsets 156 (t1p0) and 172 (t2p1).
        java.util.Map offsetsReply = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), leaderEpoch, 156L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t2p1(), leaderEpoch, 172L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2()))})));
        thread.doWork();
        // Both leader-supplied end offsets must be among the captured truncation offsets.
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)156)), (String)new StringBuilder(58).append("Expected ").append(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)172)), (String)new StringBuilder(58).append("Expected ").append(this.t2p1()).append(" to truncate to offset 172 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
    }

    /**
     * Runs a single {@code doWork()} where the leader replies with epoch 4 while the mocked
     * follower log reports latest epoch 5 and has no end offset for epoch 4
     * ({@code endOffsetForEpoch(4)} returns {@code None}).
     *
     * <p>The leader's end offsets are 156 ({@code t1p0}) and 202 ({@code t2p1}); the follower's
     * log end offset is 200. The test asserts that the captured truncation offsets contain
     * 156 and 200 ({@code initialLEO}).
     */
    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        // Collects every offset passed to partition.truncateTo(...).
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpochAtFollower = 5;
        int leaderEpochAtLeader = 4;
        int initialLEO = 200;
        // truncateTo(...) yields no value, so the next line configures that just-recorded call
        // (any number of times). The two statements must stay adjacent.
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 3))).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpochAtFollower))).anyTimes();
        // The follower has no end offset recorded for the leader's epoch.
        EasyMock.expect((Object)log.endOffsetForEpoch(leaderEpochAtLeader)).andReturn((Object)None$.MODULE$).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect((Object)quota.lastSignalledQuotaOptRef()).andReturn(new AtomicReference<None$>(None$.MODULE$)).anyTimes();
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        // Leader replies with epoch 4 and end offsets 156 (t1p0) and 202 (t2p1); 202 is above the follower's LEO of 200.
        java.util.Map offsetsReply = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), leaderEpochAtLeader, 156L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t2p1(), leaderEpochAtLeader, 202L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2()))})));
        thread.doWork();
        // Expect a truncation to the leader's 156 and one to the follower's own log end offset (200).
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)156)), (String)new StringBuilder(58).append("Expected ").append(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)initialLEO)), (String)new StringBuilder(55).append("Expected ").append(this.t2p1()).append(" to truncate to offset ").append(initialLEO).append(" (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
    }

    /**
     * Drives three {@code doWork()} iterations and checks the request counts after each one.
     *
     * <ol>
     *   <li>The leader replies with epoch 4 (end offsets 155 / 143); the mocked follower log
     *       maps epoch 4 to {@code OffsetAndEpoch(120, 3)}. Expected: one epoch request, no fetch.</li>
     *   <li>The next response is switched to epoch 3 (end offsets 101 / 102). Expected: two
     *       epoch requests, one fetch, and an OffsetsForLeaderEpoch request version of at least 3.</li>
     *   <li>Expected: still two epoch requests, two fetches.</li>
     * </ol>
     *
     * Finally asserts that the captured truncation offsets contain 101 and 102.
     */
    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        // Collects every offset passed to partition.truncateTo(...).
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int initialLEO = 200;
        // truncateTo(...) yields no value, so the next line configures that just-recorded call
        // (any number of times). The two statements must stay adjacent.
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 2))).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5))).anyTimes();
        // Lookups for epochs 4 and 3 both resolve to end offset 120 in epoch 3.
        EasyMock.expect((Object)log.endOffsetForEpoch(4)).andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(3)).andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect((Object)quota.lastSignalledQuotaOptRef()).andReturn(new AtomicReference<None$>(None$.MODULE$)).anyTimes();
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        // First leader response: epoch 4, end offsets 155 (t1p0) and 143 (t1p1).
        java.util.Map offsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 155L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), 4, 143L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2()))})));
        // Iteration 1: an epoch request is sent but no fetch yet.
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)0, (int)mockNetwork.fetchCount());
        // Second leader response: epoch 3, end offsets 101 (t1p0) and 102 (t1p1).
        java.util.Map nextOffsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), 3, 101L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), 3, 102L))}))).asJava();
        mockNetwork.setOffsetsForNextResponse(nextOffsets);
        // Iteration 2: a second epoch request goes out and the first fetch follows.
        thread.doWork();
        Assertions.assertEquals((int)2, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        Assertions.assertTrue((mockNetwork.lastUsedOffsetForLeaderEpochVersion() >= 3 ? 1 : 0) != 0, (String)"OffsetsForLeaderEpochRequest version.");
        // Iteration 3: no further epoch request; only the fetch count increases.
        thread.doWork();
        Assertions.assertEquals((int)2, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        // The end offsets from the second response must be among the captured truncation offsets.
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)102)), (String)new StringBuilder(58).append("Expected ").append(this.t1p1()).append(" to truncate to offset 102 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)101)), (String)new StringBuilder(58).append("Expected ").append(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 101 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
    }

    /**
     * Drives a {@link ReplicaFetcherThread} through four {@code doWork()} rounds and checks which
     * offsets were passed to {@code Partition.truncateTo} after each one.
     *
     * <p>The mocked log reports log end offset 200 and a latest epoch that starts at 5 and is changed
     * by the test between rounds. {@code endOffsetForEpoch} is stubbed as
     * 4 -> (149, epoch 4), 3 -> (129, epoch 2), 2 -> (119, epoch 1).
     * Rounds two to four each queue a fetch response that carries an {@code EpochEndOffset} for both
     * partitions (presumably the leader's diverging-epoch field; {@code partitionData$1} is defined
     * outside this method). The expected truncation offsets are 140/141, then 129, then 119.
     * {@code epochFetchCount} is asserted to stay 0 throughout, i.e. no separate epoch request is sent.
     */
    @Test
    public void shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createNiceMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int initialLEO = 200;
        // Mutable holder: the test swaps the log's latest epoch between fetch rounds.
        ObjectRef latestLogEpoch = ObjectRef.create((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)));
        // Record phase. The void truncateTo call is recorded first, then qualified via expect(UNIT).
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)115L)).anyTimes();
        // Read the holder on every call so later assignments to latestLogEpoch.elem are observed.
        // (The decompiler left the synthetic name "latestLogEpoch$1" here, which is not declared
        // anywhere in this method; the captured variable is the local holder above.)
        EasyMock.expect((Object)log.latestEpoch()).andAnswer(() -> (Option)latestLogEpoch.elem).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(4)).andReturn((Object)new Some((Object)new OffsetAndEpoch(149L, 4))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(3)).andReturn((Object)new Some((Object)new OffsetAndEpoch(129L, 2))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(2)).andReturn((Object)new Some((Object)new OffsetAndEpoch(119L, 1))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect((Object)quota.lastSignalledQuotaOptRef()).andReturn(new AtomicReference<None$>(None$.MODULE$)).anyTimes();
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        // The mock network starts with an empty OffsetsForLeaderEpoch reply map.
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(Collections.emptyMap(), this.brokerEndPoint(), (Time)new SystemTime());
        // Fetcher whose processPartitionData is overridden to return None instead of appending.
        ReplicaFetcherThread thread = new ReplicaFetcherThread(this, config, replicaManager, quota, mockNetwork){

            public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long fetchOffset, FetchResponseData.PartitionData partitionData) {
                return None$.MODULE$;
            }
        };
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState(initialLEO, this.initialFetchState$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(initialLEO, this.initialFetchState$default$2()))})));
        scala.collection.immutable.Set partitions = (scala.collection.immutable.Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), this.t1p1()}));
        // Round 1: a plain fetch, no epoch request.
        thread.doWork();
        Assertions.assertEquals((int)0, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        // Per-partition check; the synthetic helper's body is defined elsewhere in this file.
        partitions.foreach((Function1 & Serializable & scala.Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$2(thread, tp);
            return BoxedUnit.UNIT;
        });
        // Round 2: response carries epoch 4 with end offsets 140 / 141; local latest epoch becomes 4.
        mockNetwork.setFetchPartitionDataForNextResponse((Map<TopicPartition, FetchResponseData.PartitionData>)((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)ReplicaFetcherThreadTest.partitionData$1(this.kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)ReplicaFetcherThreadTest.partitionData$1(this.t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(141L)))}))));
        latestLogEpoch.elem = new Some((Object)BoxesRunTime.boxToInteger((int)4));
        thread.doWork();
        Assertions.assertEquals((int)0, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)140)), (String)new StringBuilder(58).append("Expected ").append(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 140 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)141)), (String)new StringBuilder(58).append("Expected ").append(this.t1p1()).append(" to truncate to offset 141 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        partitions.foreach((Function1 & Serializable & scala.Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$3(thread, tp);
            return BoxedUnit.UNIT;
        });
        // Round 3: response carries epoch 3 (end offsets 130 / 131); the log's stubbed end offset
        // for epoch 3 is 129, which is the truncation point asserted below.
        mockNetwork.setFetchPartitionDataForNextResponse((Map<TopicPartition, FetchResponseData.PartitionData>)((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)ReplicaFetcherThreadTest.partitionData$1(this.kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(130L))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)ReplicaFetcherThreadTest.partitionData$1(this.t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(131L)))}))));
        thread.doWork();
        Assertions.assertEquals((int)0, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)3, (int)mockNetwork.fetchCount());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)129)), (String)new StringBuilder(57).append("Expected to truncate to offset 129 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        partitions.foreach((Function1 & Serializable & scala.Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(thread, tp);
            return BoxedUnit.UNIT;
        });
        // Round 4: response carries epoch 2 (end offsets 120 / 121) while the log reports no latest
        // epoch at all; the stubbed end offset for epoch 2 is 119.
        mockNetwork.setFetchPartitionDataForNextResponse((Map<TopicPartition, FetchResponseData.PartitionData>)((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)ReplicaFetcherThreadTest.partitionData$1(this.kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(120L))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)ReplicaFetcherThreadTest.partitionData$1(this.t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(121L)))}))));
        latestLogEpoch.elem = None$.MODULE$;
        thread.doWork();
        Assertions.assertEquals((int)0, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)4, (int)mockNetwork.fetchCount());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)119)), (String)new StringBuilder(57).append("Expected to truncate to offset 119 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        partitions.foreach((Function1 & Serializable & scala.Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(thread, tp);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Runs the fetcher with the inter-broker protocol version property set to {@code "0.11.0"}.
     *
     * <p>The mocked network answers the epoch request with end offsets 155 (t1p0) and 143 (t1p1).
     * The test asserts that exactly one epoch request is sent, that its request version is 0, and
     * that both end offsets show up among the offsets passed to {@code Partition.truncateTo}.
     */
    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        final int logEndOffset = 200;
        Capture capturedTruncations = EasyMock.newCapture((CaptureType)CaptureType.ALL);

        // Default broker properties, then pin the inter-broker protocol version.
        Properties brokerProps = TestUtils$.MODULE$.createBrokerConfig(
                1,
                "localhost:1234",
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                TestUtils$.MODULE$.createBrokerConfig$default$5(),
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16(),
                TestUtils$.MODULE$.createBrokerConfig$default$17(),
                TestUtils$.MODULE$.createBrokerConfig$default$18(),
                TestUtils$.MODULE$.createBrokerConfig$default$19(),
                TestUtils$.MODULE$.createBrokerConfig$default$20());
        brokerProps.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // Collaborator mocks.
        ReplicationQuotaManager quotaMock = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManagerMock = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsMock = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog logMock = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partitionMock = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManagerMock = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);

        // Record phase: the void truncateTo call is recorded, then qualified through expect(UNIT).
        partitionMock.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)capturedTruncations)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partitionMock.localLogOrException())
                .andReturn((Object)logMock)
                .anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)logMock.highWatermark()))
                .andReturn((Object)BoxesRunTime.boxToLong((long)(logEndOffset - 2)))
                .anyTimes();
        EasyMock.expect((Object)logMock.latestEpoch())
                .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)))
                .anyTimes();
        EasyMock.expect((Object)logMock.endOffsetForEpoch(4))
                .andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3)))
                .anyTimes();
        EasyMock.expect((Object)logMock.endOffsetForEpoch(3))
                .andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3)))
                .anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)logMock.logEndOffset()))
                .andReturn((Object)BoxesRunTime.boxToLong((long)logEndOffset))
                .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class)))
                .andReturn((Object)logMock)
                .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.logManager())
                .andReturn((Object)logManagerMock)
                .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.replicaAlterLogDirsManager())
                .andReturn((Object)alterLogDirsMock)
                .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.brokerTopicStats())
                .andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect((Object)quotaMock.lastSignalledQuotaOptRef())
                .andReturn(new AtomicReference<None$>(None$.MODULE$))
                .anyTimes();
        this.stub(partitionMock, replicaManagerMock, logMock);
        EasyMock.replay((Object[])new Object[]{replicaManagerMock, logManagerMock, quotaMock, partitionMock, logMock});

        // Canned epoch replies: end offset 155 for t1p0 and 143 for t1p1.
        Tuple2 t1p0Reply = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()),
                (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), -1, 155L));
        Tuple2 t1p1Reply = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()),
                (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), -1, 143L));
        java.util.Map epochReplies = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter(
                (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{t1p0Reply, t1p1Reply}))).asJava();
        ReplicaFetcherMockBlockingSend networkStub = new ReplicaFetcherMockBlockingSend(epochReplies, this.brokerEndPoint(), (Time)new SystemTime());

        ReplicaFetcherThread fetcher = this.createReplicaFetcherThread(
                "bob",
                0,
                this.brokerEndPoint(),
                brokerConfig,
                this.failedPartitions(),
                replicaManagerMock,
                new Metrics(),
                (Time)new SystemTime(),
                (ReplicaQuota)quotaMock,
                (Option<TierStateFetcher>)None$.MODULE$,
                (Option<BlockingSend>)new Some((Object)networkStub),
                this.createReplicaFetcherThread$default$12());

        // Both partitions start at fetch offset 0.
        Tuple2 t1p0Start = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()),
                (Object)this.initialFetchState(0L, this.initialFetchState$default$2()));
        Tuple2 t1p1Start = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()),
                (Object)this.initialFetchState(0L, this.initialFetchState$default$2()));
        fetcher.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{t1p0Start, t1p1Start})));

        // First pass: one epoch request (at version 0) and one fetch.
        fetcher.doWork();
        Assertions.assertEquals((int)1, (int)networkStub.epochFetchCount());
        Assertions.assertEquals((int)1, (int)networkStub.fetchCount());
        Assertions.assertEquals((int)0, (int)networkStub.lastUsedOffsetForLeaderEpochVersion(), (String)"OffsetsForLeaderEpochRequest version.");

        // Second pass: only another fetch; the epoch request count stays at 1.
        fetcher.doWork();
        Assertions.assertEquals((int)1, (int)networkStub.epochFetchCount());
        Assertions.assertEquals((int)2, (int)networkStub.fetchCount());

        boolean t1p0Truncated = ((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(capturedTruncations.getValues()).asScala())
                .contains((Object)BoxesRunTime.boxToInteger((int)155));
        String t1p0Failure = new StringBuilder(58)
                .append("Expected ")
                .append(this.kafka$server$ReplicaFetcherThreadTest$$t1p0())
                .append(" to truncate to offset 155 (truncation offsets: ")
                .append(capturedTruncations.getValues())
                .append(")")
                .toString();
        Assertions.assertTrue(t1p0Truncated, t1p0Failure);

        boolean t1p1Truncated = ((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(capturedTruncations.getValues()).asScala())
                .contains((Object)BoxesRunTime.boxToInteger((int)143));
        String t1p1Failure = new StringBuilder(58)
                .append("Expected ")
                .append(this.t1p1())
                .append(" to truncate to offset 143 (truncation offsets: ")
                .append(capturedTruncations.getValues())
                .append(")")
                .toString();
        Assertions.assertTrue(t1p1Truncated, t1p1Failure);
    }

    /**
     * Checks the fallback used when the mocked epoch reply for t1p0 carries -1 for both the epoch and
     * the end offset: after one {@code doWork()} the single captured {@code truncateTo} offset must
     * equal the initial fetch offset (100).
     */
    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        final int startOffset = 100;
        Capture capturedTruncation = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig brokerConfig = this.kafkaConfigNoTruncateOnFetch();

        // Collaborator mocks.
        ReplicationQuotaManager quotaMock = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManagerMock = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsMock = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog logMock = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partitionMock = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManagerMock = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);

        // Record phase: the void truncateTo call is recorded, then qualified through expect(UNIT).
        partitionMock.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)capturedTruncation)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partitionMock.localLogOrException())
                .andReturn((Object)logMock)
                .anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)logMock.highWatermark()))
                .andReturn((Object)BoxesRunTime.boxToLong((long)startOffset))
                .anyTimes();
        // latestEpoch is expected exactly twice here, unlike the anyTimes() stubs around it.
        EasyMock.expect((Object)logMock.latestEpoch())
                .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)))
                .times(2);
        EasyMock.expect((Object)replicaManagerMock.logManager())
                .andReturn((Object)logManagerMock)
                .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.replicaAlterLogDirsManager())
                .andReturn((Object)alterLogDirsMock)
                .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.brokerTopicStats())
                .andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect((Object)quotaMock.lastSignalledQuotaOptRef())
                .andReturn(new AtomicReference<None$>(None$.MODULE$))
                .anyTimes();
        this.stub(partitionMock, replicaManagerMock, logMock);
        EasyMock.replay((Object[])new Object[]{replicaManagerMock, logManagerMock, quotaMock, partitionMock, logMock});

        // Epoch reply for t1p0 with -1 / -1L as epoch and end offset.
        Tuple2 undefinedReply = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()),
                (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), -1, -1L));
        java.util.Map epochReplies = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter(
                (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{undefinedReply}))).asJava();
        ReplicaFetcherMockBlockingSend networkStub = new ReplicaFetcherMockBlockingSend(epochReplies, this.brokerEndPoint(), (Time)new SystemTime());

        ReplicaFetcherThread fetcher = this.createReplicaFetcherThread(
                "bob",
                0,
                this.brokerEndPoint(),
                brokerConfig,
                this.failedPartitions(),
                replicaManagerMock,
                new Metrics(),
                (Time)new SystemTime(),
                (ReplicaQuota)quotaMock,
                (Option<TierStateFetcher>)None$.MODULE$,
                (Option<BlockingSend>)new Some((Object)networkStub),
                this.createReplicaFetcherThread$default$12());

        Tuple2 t1p0Start = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()),
                (Object)this.initialFetchState(startOffset, this.initialFetchState$default$2()));
        fetcher.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{t1p0Start})));

        fetcher.doWork();

        long truncatedTo = BoxesRunTime.unboxToLong((Object)capturedTruncation.getValue());
        Assertions.assertEquals((long)startOffset, truncatedTo);
    }

    /**
     * Checks that the fetcher does not truncate while the mocked epoch replies carry errors.
     *
     * <p>The reply map starts with {@code NOT_LEADER_OR_FOLLOWER} for t1p0 and
     * {@code UNKNOWN_SERVER_ERROR} for t1p1. After four {@code doWork()} rounds no truncation may
     * have been captured. The t1p0 entry is then replaced by a reply with epoch 5 and end offset 156,
     * and one more round must yield a single captured truncation to 156.
     */
    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        final int epoch = 5;
        final int hwm = 100;
        final int leo = 300;
        Capture capturedTruncation = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig brokerConfig = this.kafkaConfigNoTruncateOnFetch();

        // Collaborator mocks.
        ReplicationQuotaManager quotaMock = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManagerMock = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsMock = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog logMock = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partitionMock = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManagerMock = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);

        // Record phase; the high-watermark stub comes first, exactly as recorded originally.
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)logMock.highWatermark()))
                .andReturn((Object)BoxesRunTime.boxToLong((long)hwm))
                .anyTimes();
        partitionMock.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)capturedTruncation)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partitionMock.localLogOrException())
                .andReturn((Object)logMock)
                .anyTimes();
        EasyMock.expect((Object)logMock.latestEpoch())
                .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)epoch)))
                .anyTimes();
        EasyMock.expect((Object)logMock.endOffsetForEpoch(epoch))
                .andReturn((Object)new Some((Object)new OffsetAndEpoch((long)leo, epoch)))
                .anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)logMock.logEndOffset()))
                .andReturn((Object)BoxesRunTime.boxToLong((long)leo))
                .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class)))
                .andReturn((Object)logMock)
                .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.logManager())
                .andReturn((Object)logManagerMock)
                .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.replicaAlterLogDirsManager())
                .andReturn((Object)alterLogDirsMock)
                .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.brokerTopicStats())
                .andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.expect((Object)quotaMock.lastSignalledQuotaOptRef())
                .andReturn(new AtomicReference<None$>(None$.MODULE$))
                .anyTimes();
        this.stub(partitionMock, replicaManagerMock, logMock);
        EasyMock.replay((Object[])new Object[]{replicaManagerMock, logManagerMock, quotaMock, partitionMock, logMock});

        // Error replies for both partitions. The Java view wraps a mutable Scala map, and the
        // test updates it through put(...) further down.
        Tuple2 t1p0Error = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()),
                (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), Errors.NOT_LEADER_OR_FOLLOWER, -1, -1L));
        Tuple2 t1p1Error = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()),
                (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L));
        java.util.Map epochReplies = (java.util.Map)CollectionConverters$.MODULE$.mutableMapAsJavaMapConverter(
                (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{t1p0Error, t1p1Error}))).asJava();
        ReplicaFetcherMockBlockingSend networkStub = new ReplicaFetcherMockBlockingSend(epochReplies, this.brokerEndPoint(), (Time)new SystemTime());

        ReplicaFetcherThread fetcher = this.createReplicaFetcherThread(
                "bob",
                0,
                this.brokerEndPoint(),
                brokerConfig,
                this.failedPartitions(),
                replicaManagerMock,
                new Metrics(),
                (Time)new SystemTime(),
                (ReplicaQuota)quotaMock,
                (Option<TierStateFetcher>)None$.MODULE$,
                (Option<BlockingSend>)new Some((Object)networkStub),
                this.createReplicaFetcherThread$default$12());

        // Both partitions start at fetch offset 0.
        Tuple2 t1p0Start = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()),
                (Object)this.initialFetchState(0L, this.initialFetchState$default$2()));
        Tuple2 t1p1Start = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()),
                (Object)this.initialFetchState(0L, this.initialFetchState$default$2()));
        fetcher.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{t1p0Start, t1p1Start})));

        // Four rounds (0 to 3 inclusive) against the error replies: nothing may be truncated.
        for (int round = 0; round <= 3; ++round) {
            fetcher.doWork();
        }
        Assertions.assertEquals((int)0, (int)capturedTruncation.getValues().size());

        // Swap in a successful reply for t1p0 and run one more round.
        epochReplies.put(
                this.kafka$server$ReplicaFetcherThreadTest$$t1p0(),
                this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), epoch, 156L));
        fetcher.doWork();

        long truncatedTo = BoxesRunTime.unboxToLong((Object)capturedTruncation.getValue());
        Assertions.assertEquals((long)156L, truncatedTo);
    }

    /**
     * Checks the fetch-state transition of two partitions around a single {@code doWork()} call:
     * both report {@code Truncating} right after {@code addPartitions} and {@code Fetching} after
     * the round. The partition mock expects {@code truncateTo(0L, false)} exactly twice.
     */
    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        final int epoch = 4;
        KafkaConfig brokerConfig = this.kafkaConfigNoTruncateOnFetch();

        // Collaborator mocks.
        ReplicationQuotaManager quotaMock = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManagerMock = (LogManager)EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsMock = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog logMock = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partitionMock = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManagerMock = (ReplicaManager)EasyMock.createNiceMock(ReplicaManager.class);

        // Record phase: the void truncateTo call is recorded, then qualified through expect(UNIT).
        partitionMock.truncateTo(0L, false);
        EasyMock.expect((Object)BoxedUnit.UNIT).times(2);
        EasyMock.expect((Object)partitionMock.localLogOrException())
                .andReturn((Object)logMock)
                .anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)logMock.highWatermark()))
                .andReturn((Object)BoxesRunTime.boxToLong((long)0L))
                .anyTimes();
        EasyMock.expect((Object)logMock.latestEpoch())
                .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)epoch)))
                .anyTimes();
        EasyMock.expect((Object)logMock.endOffsetForEpoch(epoch))
                .andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, epoch)))
                .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.logManager())
                .andReturn((Object)logManagerMock)
                .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.replicaAlterLogDirsManager())
                .andReturn((Object)alterLogDirsMock)
                .anyTimes();
        EasyMock.expect((Object)quotaMock.lastSignalledQuotaOptRef())
                .andReturn(new AtomicReference<None$>(None$.MODULE$))
                .anyTimes();
        this.stub(partitionMock, replicaManagerMock, logMock);
        EasyMock.replay((Object[])new Object[]{replicaManagerMock, logManagerMock, quotaMock, partitionMock, logMock});

        // Epoch replies: epoch 4 with end offset 1 for both partitions.
        Tuple2 t1p0Reply = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()),
                (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), epoch, 1L));
        Tuple2 t1p1Reply = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()),
                (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), epoch, 1L));
        java.util.Map epochReplies = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter(
                (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{t1p0Reply, t1p1Reply}))).asJava();
        ReplicaFetcherMockBlockingSend networkStub = new ReplicaFetcherMockBlockingSend(epochReplies, this.brokerEndPoint(), (Time)new SystemTime());

        ReplicaFetcherThread fetcher = this.createReplicaFetcherThread(
                "bob",
                0,
                this.brokerEndPoint(),
                brokerConfig,
                this.failedPartitions(),
                replicaManagerMock,
                new Metrics(),
                (Time)new SystemTime(),
                (ReplicaQuota)quotaMock,
                (Option<TierStateFetcher>)None$.MODULE$,
                (Option<BlockingSend>)new Some((Object)networkStub),
                this.createReplicaFetcherThread$default$12());

        // Both partitions start at fetch offset 0.
        Tuple2 t1p0Start = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()),
                (Object)this.initialFetchState(0L, this.initialFetchState$default$2()));
        Tuple2 t1p1Start = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()),
                (Object)this.initialFetchState(0L, this.initialFetchState$default$2()));
        fetcher.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{t1p0Start, t1p1Start})));

        // Before any work is done both partitions are in the Truncating state.
        Object expectedBefore = Option$.MODULE$.apply((Object)Truncating$.MODULE$);
        Assertions.assertEquals(
                expectedBefore,
                (Object)fetcher.fetchState(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).map((Function1 & Serializable & scala.Serializable)st -> st.state()));
        Assertions.assertEquals(
                expectedBefore,
                (Object)fetcher.fetchState(this.t1p1()).map((Function1 & Serializable & scala.Serializable)st -> st.state()));

        fetcher.doWork();

        // After one round both partitions are in the Fetching state.
        Object expectedAfter = Option$.MODULE$.apply((Object)Fetching$.MODULE$);
        Assertions.assertEquals(
                expectedAfter,
                (Object)fetcher.fetchState(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).map((Function1 & Serializable & scala.Serializable)st -> st.state()));
        Assertions.assertEquals(
                expectedAfter,
                (Object)fetcher.fetchState(this.t1p1()).map((Function1 & Serializable & scala.Serializable)st -> st.state()));
    }

    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        int initialLEO = 100;
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createNiceMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 2))).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(5)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)initialLEO, 5))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)quota.lastSignalledQuotaOptRef()).andReturn(new AtomicReference<None$>(None$.MODULE$)).anyTimes();
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        java.util.Map offsetsReply = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 52L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), 5, 49L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, this.initialFetchState$default$2()))})));
        TopicPartition partitionThatBecameLeader = this.kafka$server$ReplicaFetcherThreadTest$$t1p0();
        mockNetwork.setEpochRequestCallback((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> thread.removePartitions((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{partitionThatBecameLeader}))));
        thread.doWork();
        Assertions.assertEquals((long)49L, (long)BoxesRunTime.unboxToLong((Object)truncateToCapture.getValue()));
    }

    /**
     * Starts a {@link ReplicaFetcherThread} whose {@link BlockingSend} mock throws from both
     * {@code initiateClose()} and {@code close()}, shuts the thread down, and then verifies
     * that the mock saw each of those calls.
     *
     * <p>The test passes only if {@code initiateShutdown()} and {@code awaitShutdown()} return
     * without propagating the exceptions thrown by the mock.
     */
    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        Properties brokerProps = TestUtils$.MODULE$.createBrokerConfig(
            1,
            "localhost:1234",
            TestUtils$.MODULE$.createBrokerConfig$default$3(),
            TestUtils$.MODULE$.createBrokerConfig$default$4(),
            TestUtils$.MODULE$.createBrokerConfig$default$5(),
            TestUtils$.MODULE$.createBrokerConfig$default$6(),
            TestUtils$.MODULE$.createBrokerConfig$default$7(),
            TestUtils$.MODULE$.createBrokerConfig$default$8(),
            TestUtils$.MODULE$.createBrokerConfig$default$9(),
            TestUtils$.MODULE$.createBrokerConfig$default$10(),
            TestUtils$.MODULE$.createBrokerConfig$default$11(),
            TestUtils$.MODULE$.createBrokerConfig$default$12(),
            TestUtils$.MODULE$.createBrokerConfig$default$13(),
            TestUtils$.MODULE$.createBrokerConfig$default$14(),
            TestUtils$.MODULE$.createBrokerConfig$default$15(),
            TestUtils$.MODULE$.createBrokerConfig$default$16(),
            TestUtils$.MODULE$.createBrokerConfig$default$17(),
            TestUtils$.MODULE$.createBrokerConfig$default$18(),
            TestUtils$.MODULE$.createBrokerConfig$default$19(),
            TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // Record phase: each void call on the mock is followed by the expectation that it
        // throws, exactly once.
        BlockingSend failingSend = (BlockingSend)EasyMock.createMock(BlockingSend.class);
        failingSend.initiateClose();
        EasyMock.expect((Object)BoxedUnit.UNIT).andThrow((Throwable)new IllegalArgumentException()).once();
        failingSend.close();
        EasyMock.expect((Object)BoxedUnit.UNIT).andThrow((Throwable)new IllegalStateException()).once();

        ReplicaManager replicaManager = (ReplicaManager)EasyMock.mock(ReplicaManager.class);
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay(failingSend, replicaManager);

        // Constructor arguments, built in the same order as the original temporaries.
        String fetcherName = "bob";
        int fetcherId = 0;
        BrokerEndPoint sourceBroker = this.brokerEndPoint();
        FailedPartitions failed = this.failedPartitions();
        None$ noTierStateFetcher = None$.MODULE$;
        Metrics metrics = new Metrics();
        SystemTime clock = new SystemTime();
        Some sendOverride = new Some((Object)failingSend);
        Option<LogContext> logContextDefault = this.createReplicaFetcherThread$default$12();
        // The quota argument is deliberately null, exactly as in the original call.
        ReplicaFetcherThread thread = this.createReplicaFetcherThread(
            fetcherName,
            fetcherId,
            sourceBroker,
            config,
            failed,
            replicaManager,
            metrics,
            (Time)clock,
            null,
            (Option<TierStateFetcher>)noTierStateFetcher,
            (Option<BlockingSend>)sendOverride,
            logContextDefault);

        thread.start();
        thread.initiateShutdown();
        thread.awaitShutdown();

        // Only the BlockingSend mock is verified; the ReplicaManager mock is not.
        EasyMock.verify(failingSend);
    }

    /**
     * Runs the shared {@code assertProcessPartitionDataWhen} check with the reassignment flag
     * set to {@code true}.
     */
    @Test
    public void shouldUpdateReassignmentBytesInMetrics() {
        final boolean reassignmentInProgress = true;
        assertProcessPartitionDataWhen(reassignmentInProgress);
    }

    /**
     * Runs the shared {@code assertProcessPartitionDataWhen} check with the reassignment flag
     * set to {@code false}.
     */
    @Test
    public void shouldNotUpdateReassignmentBytesInMetricsWhenNoReassignmentsInProgress() {
        final boolean reassignmentInProgress = false;
        assertProcessPartitionDataWhen(reassignmentInProgress);
    }

    /**
     * Builds an epoch end-offset result carrying {@link Errors#NONE} by delegating to the
     * four-argument overload.
     *
     * @param tp          partition the result refers to
     * @param leaderEpoch leader epoch to store in the result
     * @param endOffset   end offset to store in the result
     * @return the result produced by the four-argument overload
     */
    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition tp, int leaderEpoch, long endOffset) {
        final Errors noError = Errors.NONE;
        return newOffsetForLeaderPartitionResult(tp, noError, leaderEpoch, endOffset);
    }

    /**
     * Builds an {@code EpochEndOffset} from the given values.
     *
     * @param tp          partition whose {@code partition()} index is stored in the result
     * @param error       error whose {@code code()} is stored in the result
     * @param leaderEpoch leader epoch to store in the result
     * @param endOffset   end offset to store in the result
     * @return the populated {@code EpochEndOffset}
     */
    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition tp, Errors error, int leaderEpoch, long endOffset) {
        OffsetForLeaderEpochResponseData.EpochEndOffset result =
            new OffsetForLeaderEpochResponseData.EpochEndOffset()
                .setPartition(tp.partition())
                .setErrorCode(error.code())
                .setLeaderEpoch(leaderEpoch)
                .setEndOffset(endOffset);
        return result;
    }

    /**
     * Passes a single-record fetch response through {@code processPartitionData} on a
     * {@link ReplicaFetcherThread} backed by mocks, then checks the byte meters of a fresh
     * {@link BrokerTopicStats}.
     *
     * @param isReassigning value the mocked partition returns from both {@code isReassigning()}
     *                      and {@code isAddingLocalReplica()}; when {@code true} the reassignment
     *                      bytes-in meter must equal the record batch size, otherwise it must be 0
     */
    private void assertProcessPartitionDataWhen(boolean isReassigning) {
        Properties brokerProps = TestUtils$.MODULE$.createBrokerConfig(
            1,
            "localhost:1234",
            TestUtils$.MODULE$.createBrokerConfig$default$3(),
            TestUtils$.MODULE$.createBrokerConfig$default$4(),
            TestUtils$.MODULE$.createBrokerConfig$default$5(),
            TestUtils$.MODULE$.createBrokerConfig$default$6(),
            TestUtils$.MODULE$.createBrokerConfig$default$7(),
            TestUtils$.MODULE$.createBrokerConfig$default$8(),
            TestUtils$.MODULE$.createBrokerConfig$default$9(),
            TestUtils$.MODULE$.createBrokerConfig$default$10(),
            TestUtils$.MODULE$.createBrokerConfig$default$11(),
            TestUtils$.MODULE$.createBrokerConfig$default$12(),
            TestUtils$.MODULE$.createBrokerConfig$default$13(),
            TestUtils$.MODULE$.createBrokerConfig$default$14(),
            TestUtils$.MODULE$.createBrokerConfig$default$15(),
            TestUtils$.MODULE$.createBrokerConfig$default$16(),
            TestUtils$.MODULE$.createBrokerConfig$default$17(),
            TestUtils$.MODULE$.createBrokerConfig$default$18(),
            TestUtils$.MODULE$.createBrokerConfig$default$19(),
            TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // Record phase for the mocks.
        BlockingSend sendMock = (BlockingSend)EasyMock.createNiceMock(BlockingSend.class);
        AbstractLog logMock = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
        Partition partitionMock = (Partition)EasyMock.createNiceMock(Partition.class);
        EasyMock.expect((Object)partitionMock.localLogOrException()).andReturn((Object)logMock);
        // Both reassignment-related queries answer with the flag under test.
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean(partitionMock.isReassigning())).andReturn((Object)BoxesRunTime.boxToBoolean(isReassigning));
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean(partitionMock.isAddingLocalReplica())).andReturn((Object)BoxesRunTime.boxToBoolean(isReassigning));

        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createNiceMock(ReplicaManager.class);
        EasyMock.expect((Object)replicaManager.getPartitionOrException((TopicPartition)EasyMock.anyObject())).andReturn((Object)partitionMock);
        // A real stats object is used so that the meters can be read back afterwards.
        BrokerTopicStats stats = new BrokerTopicStats();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn((Object)stats).anyTimes();
        ReplicaQuota quotaMock = (ReplicaQuota)EasyMock.createNiceMock(ReplicaQuota.class);
        EasyMock.replay(sendMock, replicaManager, partitionMock, logMock, quotaMock);

        ReplicaFetcherThread thread = this.createReplicaFetcherThread(
            "bob",
            0,
            this.brokerEndPoint(),
            config,
            this.failedPartitions(),
            replicaManager,
            new Metrics(),
            (Time)new SystemTime(),
            quotaMock,
            (Option<TierStateFetcher>)None$.MODULE$,
            (Option<BlockingSend>)new Some((Object)sendMock),
            this.createReplicaFetcherThread$default$12());

        MemoryRecords records = MemoryRecords.withRecords(
            (CompressionType)CompressionType.NONE,
            (SimpleRecord[])new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))});
        FetchResponseData.PartitionData fetched = new FetchResponseData.PartitionData()
            .setPartitionIndex(this.kafka$server$ReplicaFetcherThreadTest$$t1p0().partition())
            .setLastStableOffset(0L)
            .setLogStartOffset(0L)
            .setRecords((BaseRecords)records);
        thread.processPartitionData(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), 0L, fetched);

        // Reassignment bytes are only expected when the partition reports a reassignment.
        long expectedReassignmentBytes = isReassigning ? (long)records.sizeInBytes() : 0L;
        Assertions.assertEquals(expectedReassignmentBytes, (long)((Meter)stats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        // Replication bytes are expected in both cases.
        Assertions.assertEquals((long)records.sizeInBytes(), (long)((Meter)stats.allTopicsStats().replicationBytesInRate().get()).count());
    }

    /**
     * Records the shared EasyMock expectations used by several tests: for each of the three test
     * partitions, {@code replicaManager.localLogOrException} returns {@code log} and
     * {@code replicaManager.getPartitionOrException} returns {@code partition}, any number of
     * times. In addition, {@code partition.getLinkedLeaderEpoch()} returns {@code None}.
     *
     * <p>The mocks must still be in the record phase; this method does not call {@code replay}.
     *
     * @param partition      partition mock returned for every test partition
     * @param replicaManager replica manager mock on which the lookups are stubbed
     * @param log            log mock returned for every test partition
     */
    public void stub(Partition partition, ReplicaManager replicaManager, AbstractLog log) {
        TopicPartition[] stubbedPartitions = {
            this.kafka$server$ReplicaFetcherThreadTest$$t1p0(),
            this.t1p1(),
            this.t2p1()
        };
        // Expectations are recorded per partition: log lookup first, then partition lookup.
        for (TopicPartition tp : stubbedPartitions) {
            EasyMock.expect((Object)replicaManager.localLogOrException(tp)).andReturn((Object)log).anyTimes();
            EasyMock.expect((Object)replicaManager.getPartitionOrException(tp)).andReturn((Object)partition).anyTimes();
        }
        EasyMock.expect((Object)partition.getLinkedLeaderEpoch()).andReturn((Object)None$.MODULE$).anyTimes();
    }

    /**
     * Builds a broker {@link KafkaConfig} from the default test properties with the inter-broker
     * protocol version property set to the {@code KAFKA_2_6_IV0} version string.
     *
     * @return the resulting configuration
     */
    private KafkaConfig kafkaConfigNoTruncateOnFetch() {
        Properties brokerProps = TestUtils$.MODULE$.createBrokerConfig(
            1,
            "localhost:1234",
            TestUtils$.MODULE$.createBrokerConfig$default$3(),
            TestUtils$.MODULE$.createBrokerConfig$default$4(),
            TestUtils$.MODULE$.createBrokerConfig$default$5(),
            TestUtils$.MODULE$.createBrokerConfig$default$6(),
            TestUtils$.MODULE$.createBrokerConfig$default$7(),
            TestUtils$.MODULE$.createBrokerConfig$default$8(),
            TestUtils$.MODULE$.createBrokerConfig$default$9(),
            TestUtils$.MODULE$.createBrokerConfig$default$10(),
            TestUtils$.MODULE$.createBrokerConfig$default$11(),
            TestUtils$.MODULE$.createBrokerConfig$default$12(),
            TestUtils$.MODULE$.createBrokerConfig$default$13(),
            TestUtils$.MODULE$.createBrokerConfig$default$14(),
            TestUtils$.MODULE$.createBrokerConfig$default$15(),
            TestUtils$.MODULE$.createBrokerConfig$default$16(),
            TestUtils$.MODULE$.createBrokerConfig$default$17(),
            TestUtils$.MODULE$.createBrokerConfig$default$18(),
            TestUtils$.MODULE$.createBrokerConfig$default$19(),
            TestUtils$.MODULE$.createBrokerConfig$default$20());
        String protocolVersionKey = KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp();
        String protocolVersion = KAFKA_2_6_IV0$.MODULE$.version();
        brokerProps.setProperty(protocolVersionKey, protocolVersion);
        return KafkaConfig$.MODULE$.fromProps(brokerProps);
    }

    /**
     * Compiler-generated helper (by its name, a closure body from {@code assertPartitionStates}).
     * Asserts that {@code fetcher$1} has a fetch state for {@code tp} and that its ready-for-fetch,
     * truncating and delayed flags match the expected values.
     *
     * @param fetcher$1               fetcher thread whose state is inspected
     * @param shouldBeReadyForFetch$1 expected value of {@code isReadyForFetch()}
     * @param shouldBeTruncatingLog$1 expected value of {@code isTruncating()}
     * @param shouldBeDelayed$1       expected value of {@code isDelayed()}
     * @param tp                      partition to check
     */
    public static final /* synthetic */ void $anonfun$assertPartitionStates$1(AbstractFetcherThread fetcher$1, boolean shouldBeReadyForFetch$1, boolean shouldBeTruncatingLog$1, boolean shouldBeDelayed$1, TopicPartition tp) {
        Assertions.assertTrue((boolean)fetcher$1.fetchState(tp).isDefined());
        PartitionFetchState state = (PartitionFetchState)fetcher$1.fetchState(tp).get();

        // Each failure message reads "... should be X" or "... should NOT be X".
        String readyNegation = shouldBeReadyForFetch$1 ? "" : " NOT";
        Assertions.assertEquals(
            (Object)BoxesRunTime.boxToBoolean(shouldBeReadyForFetch$1),
            (Object)BoxesRunTime.boxToBoolean(state.isReadyForFetch()),
            "Partition " + tp + " should" + readyNegation + " be ready for fetching");

        String truncatingNegation = shouldBeTruncatingLog$1 ? "" : " NOT";
        Assertions.assertEquals(
            (Object)BoxesRunTime.boxToBoolean(shouldBeTruncatingLog$1),
            (Object)BoxesRunTime.boxToBoolean(state.isTruncating()),
            "Partition " + tp + " should" + truncatingNegation + " be truncating its log");

        String delayedNegation = shouldBeDelayed$1 ? "" : " NOT";
        Assertions.assertEquals(
            (Object)BoxesRunTime.boxToBoolean(shouldBeDelayed$1),
            (Object)BoxesRunTime.boxToBoolean(state.isDelayed()),
            "Partition " + tp + " should" + delayedNegation + " be delayed");
    }

    /**
     * Compiler-generated initialiser for the local {@code Quota} object. While holding the
     * monitor of {@code Quota$module$1}, returns its value if it is already initialised;
     * otherwise initialises it with a new instance constructed from this test object.
     *
     * @param Quota$module$1 lazy holder for the {@code Quota} instance
     * @return the held or newly created instance
     */
    private final /* synthetic */ ReplicaFetcherThreadTest$Quota$1$ Quota$lzycompute$1(LazyRef Quota$module$1) {
        synchronized (Quota$module$1) {
            if (Quota$module$1.initialized()) {
                return (ReplicaFetcherThreadTest$Quota$1$)Quota$module$1.value();
            }
            return (ReplicaFetcherThreadTest$Quota$1$)Quota$module$1.initialize((Object)new ReplicaFetcherThreadTest$Quota$1$(this));
        }
    }

    /**
     * Accessor for the local {@code Quota} object: returns the value held by
     * {@code Quota$module$1} when it is already initialised, and otherwise falls back to
     * {@code Quota$lzycompute$1}.
     *
     * @param Quota$module$1 lazy holder for the {@code Quota} instance
     * @return the {@code Quota} instance
     */
    private final ReplicaFetcherThreadTest$Quota$1$ Quota$2(LazyRef Quota$module$1) {
        return Quota$module$1.initialized()
            ? (ReplicaFetcherThreadTest$Quota$1$)Quota$module$1.value()
            : this.Quota$lzycompute$1(Quota$module$1);
    }

    /**
     * Compiler-generated helper: asserts that the fetch state {@code thread$1} holds for
     * {@code tp} is {@code Fetching}. The state is unwrapped with {@code get()}, so a fetch
     * state is assumed to be present.
     *
     * @param thread$1 fetcher thread whose state is inspected
     * @param tp       partition to check
     */
    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$2(ReplicaFetcherThread thread$1, TopicPartition tp) {
        final Object expectedState = Fetching$.MODULE$;
        PartitionFetchState current = (PartitionFetchState)thread$1.fetchState(tp).get();
        Assertions.assertEquals(expectedState, (Object)current.state());
    }

    /**
     * Builds fetch-response partition data for the given partition index with last stable offset
     * and log start offset both 0 and the supplied diverging epoch.
     *
     * @param partition      partition index to store
     * @param divergingEpoch diverging epoch to store
     * @return the populated {@code PartitionData}
     */
    private static final FetchResponseData.PartitionData partitionData$1(int partition, FetchResponseData.EpochEndOffset divergingEpoch) {
        FetchResponseData.PartitionData data = new FetchResponseData.PartitionData()
            .setPartitionIndex(partition)
            .setLastStableOffset(0L)
            .setLogStartOffset(0L)
            .setDivergingEpoch(divergingEpoch);
        return data;
    }

    /**
     * Compiler-generated helper: asserts that the fetch state {@code thread$1} holds for
     * {@code tp} is {@code Fetching}. The state is unwrapped with {@code get()}, so a fetch
     * state is assumed to be present.
     *
     * @param thread$1 fetcher thread whose state is inspected
     * @param tp       partition to check
     */
    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$3(ReplicaFetcherThread thread$1, TopicPartition tp) {
        final Object expectedState = Fetching$.MODULE$;
        PartitionFetchState current = (PartitionFetchState)thread$1.fetchState(tp).get();
        Assertions.assertEquals(expectedState, (Object)current.state());
    }

    /**
     * Compiler-generated helper: asserts that the fetch state {@code thread$1} holds for
     * {@code tp} is {@code Fetching}. The state is unwrapped with {@code get()}, so a fetch
     * state is assumed to be present.
     *
     * @param thread$1 fetcher thread whose state is inspected
     * @param tp       partition to check
     */
    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(ReplicaFetcherThread thread$1, TopicPartition tp) {
        final Object expectedState = Fetching$.MODULE$;
        PartitionFetchState current = (PartitionFetchState)thread$1.fetchState(tp).get();
        Assertions.assertEquals(expectedState, (Object)current.state());
    }

    /**
     * Compiler-generated helper: asserts that the fetch state {@code thread$1} holds for
     * {@code tp} is {@code Fetching}. The state is unwrapped with {@code get()}, so a fetch
     * state is assumed to be present.
     *
     * @param thread$1 fetcher thread whose state is inspected
     * @param tp       partition to check
     */
    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(ReplicaFetcherThread thread$1, TopicPartition tp) {
        final Object expectedState = Fetching$.MODULE$;
        PartitionFetchState current = (PartitionFetchState)thread$1.fetchState(tp).get();
        Assertions.assertEquals(expectedState, (Object)current.state());
    }
}

