package kafka.cluster;

import io.confluent.kafka.replication.push.PushSession;
import io.confluent.kafka.replication.push.ReplicationState;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.Defaults$;
import kafka.server.LogDeleteRecordsResult;
import kafka.server.RequestLocal$;
import kafka.server.link.TopicLinkMirror$;
import kafka.server.metadata.KRaftMetadataCache;
import kafka.utils.TestUtils;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.FencedReplicationSessionIdException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.PushReplicationStartedException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogReadInfo;
import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;

/* compiled from: PushReplicationPartitionTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\t}f\u0001\u0002\u001c8\u0001qBQ!\u0011\u0001\u0005\u0002\tCQ\u0001\u0012\u0001\u0005B\u0015Cq\u0001\u0018\u0001C\u0002\u0013\u0005S\f\u0003\u0004g\u0001\u0001\u0006IA\u0018\u0005\bO\u0002\u0011\r\u0011\"\u0001i\u0011\u0019)\b\u0001)A\u0005S\")a\u000f\u0001C\u0001o\"1\u0011Q\u0002\u0001\u0005\u0002]Da!!\u0005\u0001\t\u00039\bBBA\u000b\u0001\u0011\u0005q\u000f\u0003\u0004\u0002\u001a\u0001!\ta\u001e\u0005\u0007\u0003;\u0001A\u0011A<\t\r\u0005\u0005\u0002\u0001\"\u0001x\u0011\u001d\t)\u0003\u0001C\u0001\u0003OAa!a\u0016\u0001\t\u00039\bBBA.\u0001\u0011\u0005q\u000f\u0003\u0004\u0002`\u0001!\ta\u001e\u0005\u0007\u0003G\u0002A\u0011A<\t\r\u0005\u001d\u0004\u0001\"\u0001x\u0011\u001d\tY\u0007\u0001C\u0001\u0003[Ba!a(\u0001\t\u00039\bBBAR\u0001\u0011\u0005q\u000f\u0003\u0004\u0002(\u0002!\ta\u001e\u0005\u0007\u0003W\u0003A\u0011A<\t\r\u0005=\u0006\u0001\"\u0001x\u0011\u0019\t\u0019\f\u0001C\u0001o\"1\u0011q\u0017\u0001\u0005\u0002]Da!a/\u0001\t\u00039\bBBA`\u0001\u0011\u0005q\u000f\u0003\u0004\u0002D\u0002!\ta\u001e\u0005\u0007\u0003\u000f\u0004A\u0011A<\t\r\u0005-\u0007\u0001\"\u0001x\u0011\u0019\ty\r\u0001C\u0001o\"1\u00111\u001b\u0001\u0005\u0002]Da!a6\u0001\t\u00039\bBBAn\u0001\u0011\u0005q\u000f\u0003\u0004\u0002`\u0002!\ta\u001e\u0005\u0007\u0003G\u0004A\u0011A<\t\r\u0005\u001d\b\u0001\"\u0001x\u0011\u0019\tY\u000f\u0001C\u0001o\"9\u0011q\u001e\u0001\u0005\n\u0005E\bB\u0002B\u0001\u0001\u0011\u0005q\u000f\u0003\u0004\u0003\u0006\u0001!\ta\u001e\u0005\u0007\u0005\u0013\u0001A\u0011A<\t\r\t5\u0001\u0001\"\u0001x\u0011\u0019\u0011\t\u0002\u0001C\u0001o\"9!Q\u0003\u0001\u0005\n\t]\u0001b\u0002B\u0015\u0001\u0011%!1\u0006\u0005\b\u0005\u000f\u0002A\u0011\u0002B%\u0011\u001d\u0011i\u0007\u0001C\u0005\u0005_BqA!\u001f\u0001\t\u0013\u0011Y\bC\u0004\u0003&\u0002!IAa*\t\u000f\tM\u0006\u0001\"\u0003\u00036\na\u0002+^:i%\u0016\u0004H.[2bi&|g\u000eU1si&$\u0018n\u001c8UKN$(B\u0001\u001d:\u0003\u001d\u0019G.^:uKJT\u0011AO\u0001\u0006W\u000647.Y
\u0002\u0001'\t\u0001Q\b\u0005\u0002?\u007f5\tq'\u0003\u0002Ao\t)\u0012IY:ue\u0006\u001cG\u000fU1si&$\u0018n\u001c8UKN$\u0018A\u0002\u001fj]&$h\bF\u0001D!\tq\u0004!\u0001\u000fde\u0016\fG/\u001a)vg\"\u0014V\r\u001d7jG\u0006$\u0018n\u001c8NC:\fw-\u001a:\u0016\u0003\u0019\u00032a\u0012&M\u001b\u0005A%\"A%\u0002\u000bM\u001c\u0017\r\\1\n\u0005-C%AB(qi&|g\u000e\u0005\u0002N3:\u0011aJ\u0016\b\u0003\u001fRs!\u0001U*\u000e\u0003ES!AU\u001e\u0002\rq\u0012xn\u001c;?\u0013\u0005Q\u0014BA+:\u0003\u0015)H/\u001b7t\u0013\t9\u0006,A\u0005UKN$X\u000b^5mg*\u0011Q+O\u0005\u00035n\u0013q\"T8dWB+8\u000f['b]\u0006<WM\u001d\u0006\u0003/b\u000bQ\"\\3uC\u0012\fG/Y\"bG\",W#\u00010\u0011\u0005}#W\"\u00011\u000b\u0005\u0005\u0014\u0017\u0001C7fi\u0006$\u0017\r^1\u000b\u0005\rL\u0014AB:feZ,'/\u0003\u0002fA\n\u00112JU1gi6+G/\u00193bi\u0006\u001c\u0015m\u00195f\u00039iW\r^1eCR\f7)Y2iK\u0002\n\u0001\u0003^8qS\u000eLE\rU1si&$\u0018n\u001c8\u0016\u0003%\u0004\"A[:\u000e\u0003-T!\u0001\\7\u0002\r\r|W.\\8o\u0015\t\u0019gN\u0003\u0002;_*\u0011\u0001/]\u0001\u0007CB\f7\r[3\u000b\u0003I\f1a\u001c:h\u0013\t!8N\u0001\tU_BL7-\u00133QCJ$\u0018\u000e^5p]\u0006\tBo\u001c9jG&#\u0007+\u0019:uSRLwN\u001c\u0011\u0002mQ,7\u000f^'bW\u0016dU-\u00193feJ+7/\u001a;t%\u0016\u0004H.[2bi&|gnU3tg&|gn\u00148OK^dU-\u00193fe\u0016\u0003xn\u00195\u0015\u0003a\u0004\"aR=\n\u0005iD%\u0001B+oSRD#a\u0002?\u0011\u0007u\fI!D\u0001\u007f\u0015\ry\u0018\u0011A\u0001\u0004CBL'\u0002BA\u0002\u0003\u000b\tqA[;qSR,'OC\u0002\u0002\bE\fQA[;oSRL1!a\u0003\u007f\u0005\u0011!Vm\u001d;\u0002cQ,7\u000f^'bW\u0016dU-\u00193fe\u0016sGm\u001d)vg\"\u001cVm]:j_:\u001cx\u000b[3o\r\u0016t7-\u001b8h%\u0016\u0004H.[2bg\"\u0012\u0001\u0002`\u00012i\u0016\u001cH/T1lK2+\u0017\rZ3s\u000b:$7\u000fU;tQN+7o]5p]^CWM\u001c*fa2L7-Y:TQV$Hi\\<oQ\tIA0\u0001\u001buKN$X*Y6f\u0019\u0016\fG-\u001a:F]\u0012\u001c\b+^:i'\u0016\u001c8/[8o/\",g\u000e\u0015:p[>$\u0018N\\4MS:\\G*Z1eKJD#A\u0003?\u0002UQ,7\u000f^!mi\u0016\u0014\b+\u0019:uSRLwN\\%teNC'/\u001b8l\u000b:$7\u000fU;tQN
+7o]5p]\"\u00121\u0002`\u0001$i\u0016\u001cH\u000fR3mKR,\u0007+\u0019:uSRLwN\\#oIN\u0004Vo\u001d5TKN\u001c\u0018n\u001c8tQ\taA0\u0001\u0015uKN$X*\u0019:l!\u0006\u0014H/\u001b;j_:|eM\u001a7j]\u0016,e\u000eZ:QkND7+Z:tS>t7\u000f\u000b\u0002\u000ey\u0006IC/Z:u\u0007>t7-\u001e:sK:$X*Y6f\u0019\u0016\fG-\u001a:XSRD7)Y;hQR,\u0006OR3uG\"$2\u0001_A\u0015\u0011\u001d\tYC\u0004a\u0001\u0003[\t!\"\\1lK2+\u0017\rZ3s!\r9\u0015qF\u0005\u0004\u0003cA%a\u0002\"p_2,\u0017M\u001c\u0015\u0004\u001d\u0005U\u0002\u0003BA\u001c\u0003{i!!!\u000f\u000b\t\u0005m\u0012\u0011A\u0001\u0007a\u0006\u0014\u0018-\\:\n\t\u0005}\u0012\u0011\b\u0002\u0012!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$\bf\u0002\b\u0002D\u0005=\u0013\u0011\u000b\t\u0005\u0003\u000b\nY%\u0004\u0002\u0002H)!\u0011\u0011JA\u001d\u0003!\u0001(o\u001c<jI\u0016\u0014\u0018\u0002BA'\u0003\u000f\u00121BV1mk\u0016\u001cv.\u001e:dK\u0006A!m\\8mK\u0006t7\u000f\f\u0003\u0002T\u0005U\u0013$\u0001\u0001\u001a\u0003\u0005\ta\u0005^3ti6\u000b7.\u001a$pY2|w/\u001a:F]\u0012\u001c(+\u001a9mS\u000e\fG/[8o'\u0016\u001c8/[8oQ\tyA0\u0001\u001auKN$8i\u001c8dkJ\u0014XM\u001c;DCV<\u0007\u000e^+q\r\u0016$8\r[!oI:+wOU3qY&\u001c\u0017-\u00129pG\"4U\r^2iQ\t\u0001B0\u0001\u001buKN$8i\u001c8dkJ\u0014XM\u001c;DCV<\u0007\u000e^+q\r\u0016$8\r[!oI:+wOU3qY&\u001c\u0017mU3tg&|gNR3uG\"D#!\u0005?\u0002WQ,7\u000f\u001e$fi\u000eDGK]1og&$\u0018n\u001c8t)>\u0004Vo\u001d5XQ\u0016tg)\u001e7ms\u000e\u000bWo\u001a5u+BD#A\u0005?\u0002aQ,7\u000f\u001e$fi\u000eDGK]1og&$\u0018n\u001c8t)>\u0004Vo\u001d5P]2Lx+\u001b;i\t\u00164\u0017N\\3e)>\u0004\u0018nY%eQ\t\u0019B0\u0001\u0018uKN$h)\u001a;dQ\u0012{Wm\u001d(piR\u0013\u0018M\\:ji&|g.\u00138uKJt\u0017\r\u001c+pa&\u001c7\u000fV8QkNDGc\u0001=\u0002p!9\u0011\u0011\u000f\u000bA\u0002\u0005M\u0014!\u0002;pa&\u001c\u0007\u0003BA;\u0003{rA!a\u001e\u0002zA\u0011\u0001\u000bS\u0005\u0004\u0003wB\u0015A\u0002)sK\u0012,g-\u0003\u0003\u0002��\u0005\u0005%AB*ue&twMC\u0002\u0002|!C3\u0001FA\u001bQ\u001d!\u00121IAD\u0003\u0013\u000bqa\u001d;sS:<7\
u000f\f\u0006\u0002\f\u0006=\u00151SAL\u00037\u000b#!!$\u0002%}{6m\u001c8tk6,'oX8gMN,Go]\u0011\u0003\u0003#\u000b1cX0ue\u0006t7/Y2uS>twl\u001d;bi\u0016\f#!!&\u0002+}\u001bwN\u001c4mk\u0016tG/\f;jKJl3\u000f^1uK\u0006\u0012\u0011\u0011T\u0001\u0019?\u000e|gN\u001a7vK:$X\u0006\\5oW6jW\r^1eCR\f\u0017EAAO\u0003Ey6m\u001c8gYV,g\u000e^\u0017rk>$\u0018m]\u0001-i\u0016\u001cHOR3uG\"$&/\u00198tSRLwN\\:U_B+8\u000f[,iK:LenQ8n[&$H/\u001a3JgJD#!\u0006?\u0002iQ,7\u000f\u001e$fi\u000eDw+\u001b;i\u0013:4\u0018\r\\5e'\u0016\u001c8/[8o\u0013\u0012$u.Z:O_R$&/\u00198tSRLwN\u001c+p!V\u001c\b\u000e\u000b\u0002\u0017y\u0006iD/Z:u\r\u0016$8\r[,ji\"4\u0015N\\1m%\u0016\u0004H.[2bi&|gnU3tg&|g.\u00133E_\u0016\u001chj\u001c;Ue\u0006t7/\u001b;j_:$v\u000eU;tQ\"\u0012q\u0003`\u00012i\u0016\u001cHOR3uG\"<\u0016\u000e\u001e5pkR\u0014%o\\6fe:{G-\u001a#pKNtu\u000e\u001e+sC:\u001c\u0018\u000e^5p]R{\u0007+^:iQ\tAB0\u0001,uKN$h)\u001a;dQ^KG\u000f\u001b$j]\u0006d'+\u001a9mS\u000e\fG/[8o'\u0016\u001c8/[8o\u0013\u0012$&/\u00198tSRLwN\\:U_B+H\u000e\\!oI\u000e\u000bgNT3wKJ$&/\u00198tSRLwN\u001c\"bG.$v\u000eU;tQ\"\u0012\u0011\u0004`\u00013i\u0016\u001cHOR3uG\"<\u0016\u000e\u001e5ESZ,'oZ5oO\u0016\u0003xn\u00195E_\u0016\u001chj\u001c;Ue\u0006t7/\u001b;j_:$v\u000eU;mY\"\u0012!\u0004`\u0001.i\u0016\u001cHOR3uG\"<\u0016\u000e\u001e5OK^\u0014V\r\u001d7jG\u0006,\u0005o\\2i)J\fgn]5uS>t7\u000fV8Qk2d\u0007FA\u000e}\u0003]\"Xm\u001d;GKR\u001c\u0007nV5uQ:+wOU3qY&\u001c\u0017-\u00129pG\",\u0006\u000fZ1uKN\u0014V\r\u001d7jG\u0006$\u0018n\u001c8TKN\u001c\u0018n\u001c8JI\"\u0012A\u0004`\u00014i\u0016\u001cHOR3uG\"<\u0016\u000e\u001e5OK^\u0014V\r\u001d7jG\u0006$\u0018n\u001c8TKN\u001c\u0018n\u001c8Ue\u0006t7/\u001b;j_:\u001cHk\u001c)vY2D#!\b?\u0002\u000bR,7\u000f\u001e$pY2|w/\u001a:GKR\u001c\u0007.\u00114uKJ$&/\u00198tSRLwN\u001c+ie><8\u000fU;tQJ+\u0007\u000f\\5dCRLwN\\*uCJ$X\rZ#yG\u0016\u0004H/[8oQ\tqB0\u0001&uKN$hi\u001c7m_^,'OR3uG\"<\u0016\u000e\u001e5Ue\u0006t7/\u001b;j_:,\u0006\u000fZ1uKN4U\r^2i'R\fG/Z!oI&s7M]3nK:$8\u000fS
5hQ^\u000bG/\u001a:nCJ\\\u0007FA\u0010}\u00039\"Xm\u001d;Va\u0012\fG/\u001a$pY2|w/\u001a:GKR\u001c\u0007n\u0015;bi\u00164UM\\2fgN#\u0018\r\\3GKR\u001c\u0007.Z:)\u0005\u0001b\u0018!\n;fgR|e\u000eU;tQN+7o]5p]\u0016sG-\u001a3G_J\u001cF/\u00197f'\u0016\u001c8/[8oQ\t\tC0A\u0018uKN$xJ\u001c)vg\"\u001cVm]:j_:,e\u000eZ3e+B$\u0017\r^3t%\u0016\u0004H.[2bi&|gnU3tg&|g\u000e\u000b\u0002#y\u0006QC/Z:u\u001f:\u0004Vo\u001d5TKN\u001c\u0018n\u001c8F]\u0012,Gm\u00165f]B\u000b'\u000f^5uS>tG)\u001a7fi\u0016$\u0007FA\u0012}\u0003=\"Xm\u001d;P]\u0006\u0003\b/\u001a8e%\u0016\u001cwN\u001d3t%\u0016\u001c\bo\u001c8tKV\u0003H-\u0019;fg\"Kw\r[,bi\u0016\u0014X.\u0019:lQ\t!C0\u0001\u0018uKN$xJ\\!qa\u0016tGMU3d_J$7OU3ta>t7/Z+qI\u0006$Xm\u001d'po^\u000bG/\u001a:nCJ\\\u0007FA\u0013}\u00035\"Xm\u001d;P]\u0006\u0003\b/\u001a8e%\u0016\u001cwN\u001d3t%\u0016\u001c\bo\u001c8tK\u000e{W\u000e\u001d7fi\u0016\u001c\b+\u001e:hCR|'/\u001f\u0015\u0003Mq\fq\u0006^3ti>s\u0017\t\u001d9f]\u0012\u0014VmY8sIN\u0014Vm\u001d9p]N,w+\u001b;i'R\fG.\u001a)vg\"\u001cVm]:j_:D#a\n?\u0002_Q,7\u000f^(o\u0003B\u0004XM\u001c3SK\u000e|'\u000fZ:SKN\u0004xN\\:f/&$\b\u000eR3mKR,G\rU1si&$\u0018n\u001c8)\u0005!b\u0018aD:fK\u0012dunZ,ji\"$\u0015\r^1\u0015\u0007a\f\u0019\u0010C\u0004\u0002v&\u0002\r!a>\u0002\u00071|w\r\u0005\u0003\u0002z\u0006uXBAA~\u0015\r\t)0O\u0005\u0005\u0003\u007f\fYPA\u0006BEN$(/Y2u\u0019><\u0017a\u0010;fgRdU-\u00193fe2{w-\u00119qK:$G*[:uK:,'OT8uS\u001aLWm\u001d)vg\"\u0014V\r\u001d7jG\u0006\u001chi\u001c:MK\u0006$WM]!qa\u0016tGm\u001d\u0015\u0003Uq\fq\t^3ti2+\u0017\rZ3s\u001f\u001a47/\u001a;t\u0019&\u001cH/\u001a8fe:{G/\u001b4jKN\u0004Vo\u001d5SKBd\u0017nY1t\r>\u0014Hj\\4Ti\u0006\u0014Ho\u00144gg\u0016$\u0018J\\2sK6,g\u000e\u001e\u0015\u0003Wq\fa\t^3ti2+\u0017\rZ3s\u001f\u001a47/\u001a;t\u0019&\u001cH/\u001a8fe:{G/\u001b4jKN\u0004Vo\u001d5SKBd\u0017nY1t\r>\u0014\b*[4i/\u0006$XM]7be.Len\u0019:f[\u0016tG\u000f\u000b\u0002-y\u0006\u0001D/Z:u\u0019\u0016\fG-\u001a:M_\u001e\f\u0005\u000f]3oI2K7\u000f^3oKJLuM\\8sKN4u\u
000e\u001c7po\u0016\u0014XI^3oiND#!\f?\u0002wQ,7\u000f\u001e'fC\u0012,'\u000fT8h\u0003B\u0004XM\u001c3MSN$XM\\3s\u0013:\u001cG.\u001e3fgR\u0013\u0018M\\:bGRLwN\\'be.,'/\u00119qK:$7\u000f\u000b\u0002/y\u0006Y\u0011\r\u001d9f]\u0012$v\u000eT8h)\u001dA(\u0011\u0004B\u000e\u0005KAq!!>0\u0001\u0004\t9\u0010C\u0004\u0003\u001e=\u0002\rAa\b\u0002\u00171,\u0017\rZ3s\u000bB|7\r\u001b\t\u0004\u000f\n\u0005\u0012b\u0001B\u0012\u0011\n\u0019\u0011J\u001c;\t\u000f\t\u001dr\u00061\u0001\u0003 \u0005)1m\\;oi\u0006\t\u0012\r\u001d9f]\u0012$v\u000eU1si&$\u0018n\u001c8\u0015\r\t5\"1\bB#!\u0011\u0011yCa\u000e\u000e\u0005\tE\"\u0002\u0002B\u001a\u0005k\taA]3d_J$'B\u00017o\u0013\u0011\u0011ID!\r\u0003\u001b5+Wn\u001c:z%\u0016\u001cwN\u001d3t\u0011\u001d\u0011i\u0004\ra\u0001\u0005\u007f\t\u0011\u0002]1si&$\u0018n\u001c8\u0011\u0007y\u0012\t%C\u0002\u0003D]\u0012\u0011\u0002U1si&$\u0018n\u001c8\t\u000f\t\u001d\u0002\u00071\u0001\u0003 \u00051b-\u001a;dQ\u0006\u001b8)Y;hQR,\u0006OU3qY&\u001c\u0017\r\u0006\u0005\u0003L\tu#q\fB5!\u0011\u0011iE!\u0017\u000e\u0005\t=#\u0002BA{\u0005#RAAa\u0015\u0003V\u0005I\u0011N\u001c;fe:\fGn\u001d\u0006\u0004\u0005/r\u0017aB:u_J\fw-Z\u0005\u0005\u00057\u0012yEA\u0006M_\u001e\u0014V-\u00193J]\u001a|\u0007b\u0002B\u001fc\u0001\u0007!q\b\u0005\b\u0005C\n\u0004\u0019\u0001B2\u00031\u0011X\r\u001d7jG\u0006,\u0005o\\2i!\r9%QM\u0005\u0004\u0005OB%\u0001\u0002'p]\u001eDqAa\u001b2\u0001\u0004\u0011\u0019'\u0001\u000bsKBd\u0017nY1uS>t7+Z:tS>t\u0017\nZ\u0001\u001bCN\u001cXM\u001d;SKBd\u0017nY1uS>t7+Z:tS>t\u0017\n\u001a\u000b\u0006q\nE$Q\u000f\u0005\b\u0005g\u0012\u0004\u0019\u0001B\u0010\u0003%\u0011X\r\u001d7jG\u0006LE\rC\u0004\u0003lI\u0002\rAa\u001e\u0011\t\u001dS%1M\u0001\u001dCN\u001cXM\u001d;SKBd\u0017nY1uS>t7+Z:tS>tWj\u001c3f)\u0015A(Q\u0010B@\u0011\u001d\u0011\u0019h\ra\u0001\u0005?AqA!!4\u0001\u0004\u0011\u0019)A\bsKBd\u0017nY1uS>tWj\u001c3f!\u0011\u0011)Ia(\u000f\t\t\u001d%1T\u0007\u0003\u0005\u0013SAAa#\u0003\u000e\u0006!\u0001/^:i\u0015\u0011\u0011yI!%\u0
002\u0017I,\u0007\u000f\\5dCRLwN\u001c\u0006\u0004u\tM%\u0002\u0002BK\u0005/\u000b\u0011bY8oM2,XM\u001c;\u000b\u0005\te\u0015AA5p\u0013\u0011\u0011iJ!#\u0002!I+\u0007\u000f\\5dCRLwN\\*uCR,\u0017\u0002\u0002BQ\u0005G\u0013A!T8eK*!!Q\u0014BE\u0003]\t7\r^5wKB+8\u000f[*fgNLwN\\(s\r\u0006LG\u000e\u0006\u0004\u0003*\n=&\u0011\u0017\t\u0005\u0005\u000f\u0013Y+\u0003\u0003\u0003.\n%%a\u0003)vg\"\u001cVm]:j_:DqAa\u001d5\u0001\u0004\u0011y\u0002C\u0004\u0003lQ\u0002\rAa\u0019\u0002-Y,'/\u001b4z!V\u001c\bnU3tg&|g.\u00128eK\u0012$r\u0001\u001fB\\\u0005s\u0013Y\fC\u0004\u0003tU\u0002\rAa\b\t\u000f\t-T\u00071\u0001\u0003d!9!QX\u001bA\u0002\u00055\u0012\u0001F:i_VdGmU3oI\u0016sGmU3tg&|g\u000e")
/* loaded from: input_file:kafka/cluster/PushReplicationPartitionTest.class */
public class PushReplicationPartitionTest extends AbstractPartitionTest {
    // Mockito mock of the KRaft metadata cache; individual tests stub broker-epoch, broker-node,
    // fenced and shutting-down lookups on it.
    private final KRaftMetadataCache metadataCache = (KRaftMetadataCache) Mockito.mock(KRaftMetadataCache.class);
    // Topic-id/partition pair built from the inherited topicId() and the partition index of topicPartition().
    private final TopicIdPartition topicIdPartition = new TopicIdPartition(topicId(), topicPartition().partition());

    /**
     * Supplies the push replication manager for this test class.
     *
     * @return a defined option wrapping a newly constructed {@link TestUtils.MockPushManager}
     */
    @Override // kafka.cluster.AbstractPartitionTest
    public Option<TestUtils.MockPushManager> createPushReplicationManager() {
        final TestUtils.MockPushManager mockManager = new TestUtils.MockPushManager();
        return new Some(mockManager);
    }

    /**
     * Accessor for the mocked metadata cache held by this test instance.
     *
     * @return the {@link KRaftMetadataCache} mock stored in the {@code metadataCache} field
     */
    @Override // kafka.cluster.AbstractPartitionTest
    /* renamed from: metadataCache, reason: merged with bridge method [inline-methods] */
    public KRaftMetadataCache mo59metadataCache() {
        return metadataCache;
    }

    /**
     * Accessor for the topic-id/partition pair created when this test instance was constructed.
     *
     * @return the value of the {@code topicIdPartition} field
     */
    public TopicIdPartition topicIdPartition() {
        return topicIdPartition;
    }

    /**
     * Starts a push session (session id 2) for the remote replica, then applies two leader
     * transitions: one at the unchanged leader epoch, after which the session must still be
     * active, and one at the next leader epoch, after which the replica's replication session
     * id must be -1 and the push session must have ended.
     */
    @Test
    public void testMakeLeaderResetsReplicationSessionOnNewLeaderEpoch() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;
        final int followerId = remoteReplicaId();
        final KRaftMetadataCache cache = mo59metadataCache();

        // Stub the metadata cache lookups for the follower's broker epoch and broker node.
        Mockito.when(cache.getAliveBrokerEpoch(followerId))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(cache.getAliveBrokerNode(followerId, interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(followerId, sessionId);

        // A third replica id that must not collide with the local broker id.
        final int addedReplicaId = followerId + 1;
        Assertions.assertNotEquals(brokerId(), addedReplicaId);

        // First transition: same leader epoch, replica set extended with the added replica.
        final Partition sameEpochPartition = partition();
        final LeaderAndIsrRequestData.LeaderAndIsrPartitionState sameEpochState =
            new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(followerId), Nil$.MODULE$))).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(followerId),
                            new $colon.colon(Predef$.MODULE$.int2Integer(addedReplicaId), Nil$.MODULE$)))).asJava())
                .setIsNew(false);
        final boolean changedOnSameEpoch = sameEpochPartition.makeLeader(
            sameEpochState, offsetCheckpoints(), new Some(topicId()), sameEpochPartition.makeLeader$default$4());
        Assertions.assertFalse(changedOnSameEpoch, "Expected become leader transition not to change leader");

        // The existing session is still active; the added replica reports session id -1.
        activePushSessionOrFail(followerId, sessionId);
        Assertions.assertTrue(partition().getReplica(followerId).isDefined());
        assertReplicationSessionId(followerId, new Some(BoxesRunTime.boxToLong(sessionId)));
        assertReplicationSessionId(addedReplicaId, new Some(BoxesRunTime.boxToLong(-1L)));

        // Second transition: leader epoch incremented by one.
        final Partition newEpochPartition = partition();
        final LeaderAndIsrRequestData.LeaderAndIsrPartitionState newEpochState =
            new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch + 1)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(followerId), Nil$.MODULE$))).asJava())
                .setPartitionEpoch(3)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(followerId),
                            new $colon.colon(Predef$.MODULE$.int2Integer(addedReplicaId), Nil$.MODULE$)))).asJava())
                .setIsNew(false);
        final boolean changedOnNewEpoch = newEpochPartition.makeLeader(
            newEpochState, offsetCheckpoints(), new Some(topicId()), newEpochPartition.makeLeader$default$4());
        Assertions.assertFalse(changedOnNewEpoch, "Expected become leader transition not to change leader");

        // After the epoch bump both replicas report session id -1 and the push session is ended.
        assertReplicationSessionId(followerId, new Some(BoxesRunTime.boxToLong(-1L)));
        verifyPushSessionEnded(followerId, sessionId, false);
        assertReplicationSessionId(addedReplicaId, new Some(BoxesRunTime.boxToLong(-1L)));
    }

    /**
     * Starts a push session (session id 2) for the remote replica, marks that broker as fenced
     * in the metadata cache, applies a leader transition whose ISR contains only the local
     * broker, and verifies that the push session was ended.
     */
    @Test
    public void testMakeLeaderEndsPushSessionsWhenFencingReplicas() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;
        final int followerId = remoteReplicaId();
        final KRaftMetadataCache cache = mo59metadataCache();

        // Stub the metadata cache lookups for the follower's broker epoch and broker node.
        Mockito.when(cache.getAliveBrokerEpoch(followerId))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(cache.getAliveBrokerNode(followerId, interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(followerId, sessionId);

        final int addedReplicaId = followerId + 1;
        Assertions.assertNotEquals(brokerId(), addedReplicaId);

        // The follower keeps a broker epoch but is now reported as fenced.
        Mockito.when(cache.getAliveBrokerEpoch(followerId))
            .thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Mockito.when(BoxesRunTime.boxToBoolean(cache.isBrokerFenced(followerId)))
            .thenReturn(BoxesRunTime.boxToBoolean(true));

        // Same leader epoch; the ISR holds only the local broker.
        final Partition target = partition();
        final LeaderAndIsrRequestData.LeaderAndIsrPartitionState leaderState =
            new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), Nil$.MODULE$)).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(followerId),
                            new $colon.colon(Predef$.MODULE$.int2Integer(addedReplicaId), Nil$.MODULE$)))).asJava())
                .setIsNew(false);
        final boolean leaderChanged = target.makeLeader(
            leaderState, offsetCheckpoints(), None$.MODULE$, target.makeLeader$default$4());
        Assertions.assertFalse(leaderChanged, "Expected become leader transition not to change leader");

        // NOTE(review): the trailing `true` presumably means an end-session notification is
        // expected to be sent -- confirm against verifyPushSessionEnded.
        verifyPushSessionEnded(followerId, sessionId, true);
    }

    /**
     * Starts a push session (session id 2) for the remote replica, reports that broker as both
     * fenced and shutting down in the metadata cache, applies a leader transition whose ISR
     * contains only the local broker, and verifies that the push session was ended.
     */
    @Test
    public void testMakeLeaderEndsPushSessionWhenReplicasShutDown() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;
        final int followerId = remoteReplicaId();
        final KRaftMetadataCache cache = mo59metadataCache();

        // Stub the metadata cache lookups for the follower's broker epoch and broker node.
        Mockito.when(cache.getAliveBrokerEpoch(followerId))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(cache.getAliveBrokerNode(followerId, interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(followerId, sessionId);

        final int addedReplicaId = followerId + 1;
        Assertions.assertNotEquals(brokerId(), addedReplicaId);

        // The follower is now reported as fenced and as shutting down.
        Mockito.when(BoxesRunTime.boxToBoolean(cache.isBrokerFenced(followerId)))
            .thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(BoxesRunTime.boxToBoolean(cache.isBrokerShuttingDown(followerId)))
            .thenReturn(BoxesRunTime.boxToBoolean(true));

        // Same leader epoch; the ISR holds only the local broker.
        final Partition target = partition();
        final LeaderAndIsrRequestData.LeaderAndIsrPartitionState leaderState =
            new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), Nil$.MODULE$)).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(followerId),
                            new $colon.colon(Predef$.MODULE$.int2Integer(addedReplicaId), Nil$.MODULE$)))).asJava())
                .setIsNew(false);
        final boolean leaderChanged = target.makeLeader(
            leaderState, offsetCheckpoints(), None$.MODULE$, target.makeLeader$default$4());
        Assertions.assertFalse(leaderChanged, "Expected become leader transition not to change leader");

        // NOTE(review): the trailing `false` presumably means no end-session notification is
        // expected for a broker that is shutting down -- confirm against verifyPushSessionEnded.
        verifyPushSessionEnded(followerId, sessionId, false);
    }

    /**
     * Starts a push session (session id 2) for the remote replica, applies a leader transition
     * whose partition state carries cluster-link fields (link id, mirror topic state, linked
     * leader epoch -1), verifies that the push session was ended, and finally issues another
     * caught-up fetch with session id 3 that is expected to complete without throwing.
     */
    @Test
    public void testMakeLeaderEndsPushSessionWhenPromotingLinkLeader() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;
        final int followerId = remoteReplicaId();
        final KRaftMetadataCache cache = mo59metadataCache();

        // Stub the metadata cache lookups for the follower's broker epoch and broker node.
        Mockito.when(cache.getAliveBrokerEpoch(followerId))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(cache.getAliveBrokerNode(followerId, interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(followerId, sessionId);

        final int addedReplicaId = followerId + 1;
        Assertions.assertNotEquals(brokerId(), addedReplicaId);
        Mockito.when(cache.getAliveBrokerEpoch(followerId))
            .thenReturn(new Some(BoxesRunTime.boxToLong(1L)));

        // Same leader epoch and ISR, but the state now carries cluster-link settings.
        final Partition target = partition();
        final LeaderAndIsrRequestData.LeaderAndIsrPartitionState linkedState =
            new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(followerId), Nil$.MODULE$))).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(followerId),
                            new $colon.colon(Predef$.MODULE$.int2Integer(addedReplicaId), Nil$.MODULE$)))).asJava())
                .setClusterLinkId(clusterLinkId().toString())
                .setClusterLinkTopicState(TopicLinkMirror$.MODULE$.name())
                .setLinkedLeaderEpoch(-1)
                .setIsNew(false);
        final boolean leaderChanged = target.makeLeader(
            linkedState, offsetCheckpoints(), None$.MODULE$, target.makeLeader$default$4());
        Assertions.assertFalse(leaderChanged, "Expected become leader transition not to change leader");

        verifyPushSessionEnded(followerId, sessionId, true);

        // A later caught-up fetch with a new session id must not throw.
        fetchAsCaughtUpReplica(partition(), replicaEpoch, 3L);
    }

    /**
     * Starts a push session (session id 2) for the remote replica, appends one record to the
     * leader, advances the mock clock past {@code replicaLagTimeMaxMs}, triggers
     * {@code maybeShrinkIsr()}, completes the pending ISR update with partition epoch 2 and
     * verifies that the push session was ended.
     */
    @Test
    public void testAlterPartitionIsrShrinkEndsPushSession() {
        // Stub the metadata cache lookups for the follower's broker epoch and broker node.
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName())).thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);
        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 2L);
        });
        activePushSessionOrFail(remoteReplicaId(), 2L);
        Partition partition = partition();
        // Encode the payload with an explicit charset so the bytes do not depend on the
        // platform default encoding.
        MemoryRecords withRecords = MemoryRecords.withRecords((byte) 2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, 5, false, new SimpleRecord[]{new SimpleRecord(Long.toString(time().milliseconds()).getBytes(java.nio.charset.StandardCharsets.UTF_8))});
        partition.appendRecordsToLeader(withRecords, AppendOrigin.CLIENT, 0, RequestLocal$.MODULE$.withThreadConfinedCaching(), partition.appendRecordsToLeader$default$5(), partition.appendRecordsToLeader$default$6());
        // Move the mock clock just beyond the replica lag limit before asking for an ISR shrink.
        time().sleep(partition().replicaLagTimeMaxMs() + 1);
        partition().maybeShrinkIsr();
        alterPartitionManager().completeIsrUpdate(2);
        verifyPushSessionEnded(remoteReplicaId(), 2L, true);
    }

    /**
     * Starts a push session (session id 2) for the remote replica, deletes the partition and
     * verifies that the push session was ended.
     */
    @Test
    public void testDeletePartitionEndsPushSessions() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;
        final int followerId = remoteReplicaId();
        final KRaftMetadataCache cache = mo59metadataCache();

        // Stub the metadata cache lookups for the follower's broker epoch and broker node.
        Mockito.when(cache.getAliveBrokerEpoch(followerId))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(cache.getAliveBrokerNode(followerId, interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(followerId, sessionId);

        partition().delete();

        verifyPushSessionEnded(followerId, sessionId, false);
    }

    /**
     * Starts a push session (session id 2) for the remote replica, marks the partition offline
     * and verifies that the push session was ended.
     */
    @Test
    public void testMarkPartitionOfflineEndsPushSessions() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 2L;
        final int followerId = remoteReplicaId();
        final KRaftMetadataCache cache = mo59metadataCache();

        // Stub the metadata cache lookups for the follower's broker epoch and broker node.
        Mockito.when(cache.getAliveBrokerEpoch(followerId))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(cache.getAliveBrokerNode(followerId, interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(followerId, sessionId);

        partition().markOffline();

        verifyPushSessionEnded(followerId, sessionId, false);
    }

    /**
     * Runs a follower fetch and a second task concurrently and checks the outcome of both.
     *
     * <p>Both tasks are submitted to the shared executor; the fetch task blocks on a semaphore
     * until the main thread releases two permits. The fetch must fail with either
     * {@link FencedLeaderEpochException} or {@link PushReplicationStartedException}, and no
     * push replication session may remain registered afterwards.
     *
     * @param z parameterized flag (false, then true) forwarded to the second task.
     *          NOTE(review): presumably selects a make-leader versus make-follower transition;
     *          the task body is a synthetic method that is not visible here -- confirm.
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testConcurrentMakeLeaderWithCaughtUpFetch(boolean z) {
        int i = 5;
        setupPartitionWithMocks(5, true);
        // Starts with zero permits so the submitted work cannot proceed until released below.
        Semaphore semaphore = new Semaphore(0);
        // Task 1: wait for a permit, then issue a follower fetch at leader epoch `i`.
        Future submit = executor().submit(() -> {
            semaphore.acquire();
            return this.fetchFollower(this.partition(), this.remoteReplicaId(), 0L, this.fetchFollower$default$4(), this.fetchFollower$default$5(), new Some(BoxesRunTime.boxToInteger(i)), this.fetchFollower$default$7(), this.fetchFollower$default$8(), this.fetchFollower$default$9(), new Some(BoxesRunTime.boxToLong(1L)), 0L);
        });
        // Task 2: delegates to a synthetic helper that receives the semaphore, `z` and the epoch.
        Future submit2 = executor().submit(() -> {
            return BoxesRunTime.boxToBoolean($anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$2(this, semaphore, z, i));
        });
        // NOTE(review): these three locals are never read in this method -- likely decompiler
        // residue from an inlined wait helper.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        // Poll the readiness predicate, failing the test after 15 seconds.
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$3(semaphore)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Timed out waiting for threads to prepare.");
            }
            // min(15000, 100) evaluates to a 100 ms pause between polls.
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        // Release two permits at once so the waiting work can race.
        semaphore.release(2);
        // The second task's boolean result must differ from `z`.
        Assertions.assertNotEquals(BoxesRunTime.boxToBoolean(z), submit2.get(15L, TimeUnit.SECONDS), "Unexpected leadership change");
        // The fetch task must fail; inspect the cause wrapped by Future.get.
        Throwable cause = Assertions.assertThrows(Exception.class, () -> {
            submit.get(15L, TimeUnit.SECONDS);
        }).getCause();
        Assertions.assertTrue((cause instanceof FencedLeaderEpochException) || (cause instanceof PushReplicationStartedException));
        // Whichever task won, no push replication session may be left registered.
        Assertions.assertTrue(((IterableOnceOps) pushReplicationManager().map(mockPushManager -> {
            return mockPushManager.pushReplicationSessions();
        }).get()).isEmpty());
        // Only when the fetch got as far as starting push replication is an ended session verified.
        if (!(cause instanceof FencedLeaderEpochException)) {
            verifyPushSessionEnded(remoteReplicaId(), 2L, true);
        }
        // Expected session id: Some(-1) when `z` is true, None otherwise.
        assertReplicationSessionId(remoteReplicaId(), z ? new Some(BoxesRunTime.boxToLong(-1L)) : None$.MODULE$);
    }

    /**
     * Starts a push session (session id 0) for the remote replica, applies a follower
     * transition at the next leader epoch and verifies that the push session was ended.
     */
    @Test
    public void testMakeFollowerEndsReplicationSession() {
        final int leaderEpoch = 5;
        final long replicaEpoch = 1L;
        final long sessionId = 0L;
        final int followerId = remoteReplicaId();
        final KRaftMetadataCache cache = mo59metadataCache();

        // Stub the metadata cache lookups for the follower's broker epoch and broker node.
        Mockito.when(cache.getAliveBrokerEpoch(followerId))
            .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(cache.getAliveBrokerNode(followerId, interBrokerListenerName()))
            .thenReturn(replicaNode());
        setupPartitionWithMocks(leaderEpoch, true);

        // The caught-up fetch is expected to be rejected with PushReplicationStartedException.
        Assertions.assertThrows(
            PushReplicationStartedException.class,
            () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, sessionId));
        activePushSessionOrFail(followerId, sessionId);

        // Follower transition at leaderEpoch + 1 with an unchanged two-member replica set.
        final Partition target = partition();
        final LeaderAndIsrRequestData.LeaderAndIsrPartitionState followerState =
            new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(brokerId())
                .setLeaderEpoch(leaderEpoch + 1)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(followerId), Nil$.MODULE$))).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                    new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                        new $colon.colon(Predef$.MODULE$.int2Integer(followerId), Nil$.MODULE$))).asJava())
                .setIsNew(false);
        final boolean becameFollower = target.makeFollower(
            followerState, offsetCheckpoints(), new Some(topicId()), target.makeFollower$default$4());
        Assertions.assertTrue(becameFollower);

        verifyPushSessionEnded(followerId, sessionId, false);
    }

    /**
     * Races two follower fetches from the same remote replica: a caught-up fetch (offset 17,
     * replica epoch 1, session id 2) and a lagging fetch carrying a newer replica epoch
     * (offset 12, replica epoch 2, session id 0). Both workers block on a shared semaphore and
     * are released together.
     *
     * <p>Whichever fetch wins, the test expects the replica to end up on session id 0 with no
     * active push sessions. If the caught-up fetch did start a push session, that session
     * (id 2) must have been ended.
     *
     * @throws Exception if waiting for either worker is interrupted or times out, or if the
     *                   second fetch fails
     */
    @Test
    public void testConcurrentCaughtUpFetchAndNewReplicaEpochFetch() throws Exception {
        LogManager logs = logManager();
        AbstractLog log = logs.getOrCreateLog(
                topicPartition(),
                logs.getOrCreateLog$default$2(),
                logs.getOrCreateLog$default$3(),
                None$.MODULE$,
                logs.getOrCreateLog$default$5());
        seedLogWithData(log);
        Assertions.assertEquals(17L, log.logEndOffset());

        int leaderEpoch = 10;
        // Consecutive stubbing: the first lookup reports broker epoch 1, later lookups report 2.
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(2L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(10, true);

        Semaphore startGate = new Semaphore(0);
        Future<?> caughtUpFetch = executor().submit(() -> {
            startGate.acquire();
            return fetchFollower(
                    partition(), remoteReplicaId(), 17L,
                    fetchFollower$default$4(), fetchFollower$default$5(),
                    new Some(BoxesRunTime.boxToInteger(leaderEpoch)),
                    fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                    new Some(BoxesRunTime.boxToLong(1L)),
                    2L);
        });
        Future<?> newEpochFetch = executor().submit(() -> {
            startGate.acquire();
            return fetchFollower(
                    partition(), remoteReplicaId(), 12L,
                    fetchFollower$default$4(), fetchFollower$default$5(),
                    new Some(BoxesRunTime.boxToInteger(leaderEpoch)),
                    fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                    new Some(BoxesRunTime.boxToLong(2L)),
                    0L);
        });

        // Poll the readiness predicate (defined elsewhere in this class; presumably it reports
        // that both workers are parked on the gate) for at most 15 seconds.
        long maxWaitMs = 15000L;
        long pauseMs = 100L;
        long waitStartMs = System.currentTimeMillis();
        while (!$anonfun$testConcurrentCaughtUpFetchAndNewReplicaEpochFetch$3(startGate)) {
            if (System.currentTimeMillis() > waitStartMs + maxWaitMs) {
                Assertions.fail("Timed out waiting for threads to prepare.");
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        startGate.release(2);

        boolean pushStarted = false;
        try {
            caughtUpFetch.get(15L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // Inspect the exception that was actually thrown. Any other cause means the
            // caught-up fetch lost the race and was rejected, which this test tolerates.
            if (e.getCause() instanceof PushReplicationStartedException) {
                pushStarted = true;
            }
        }
        newEpochFetch.get(15L, TimeUnit.SECONDS);

        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));
        Assertions.assertTrue(((TestUtils.MockPushManager) pushReplicationManager().get()).pushReplicationSessions().isEmpty());
        if (pushStarted) {
            verifyPushSessionEnded(remoteReplicaId(), 2L, false);
        }
    }

    /**
     * Races two follower fetches from the same remote replica that share replica epoch 1 but
     * carry different replication session ids: a caught-up fetch (offset 17, session id 2) and
     * a lagging fetch on a newer session (offset 12, session id 3). Both workers block on a
     * shared semaphore and are released together.
     *
     * <p>Whichever fetch wins, the test expects the replica to end up on session id 3 with no
     * active push sessions. If the caught-up fetch did start a push session, that session
     * (id 2) must have been ended.
     *
     * @throws Exception if waiting for either worker is interrupted or times out, or if the
     *                   second fetch fails
     */
    @Test
    public void testConcurrentCaughtUpFetchAndNewReplicaSessionFetch() throws Exception {
        LogManager logs = logManager();
        AbstractLog log = logs.getOrCreateLog(
                topicPartition(),
                logs.getOrCreateLog$default$2(),
                logs.getOrCreateLog$default$3(),
                None$.MODULE$,
                logs.getOrCreateLog$default$5());
        seedLogWithData(log);
        Assertions.assertEquals(17L, log.logEndOffset());

        int leaderEpoch = 10;
        setupPartitionWithMocks(10, true);

        Semaphore startGate = new Semaphore(0);
        Future<?> caughtUpFetch = executor().submit(() -> {
            startGate.acquire();
            return fetchFollower(
                    partition(), remoteReplicaId(), 17L,
                    fetchFollower$default$4(), fetchFollower$default$5(),
                    new Some(BoxesRunTime.boxToInteger(leaderEpoch)),
                    fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                    new Some(BoxesRunTime.boxToLong(1L)),
                    2L);
        });
        // The metadata stubs are installed while the first worker is still parked on the gate.
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        Future<?> newSessionFetch = executor().submit(() -> {
            startGate.acquire();
            return fetchFollower(
                    partition(), remoteReplicaId(), 12L,
                    fetchFollower$default$4(), fetchFollower$default$5(),
                    new Some(BoxesRunTime.boxToInteger(leaderEpoch)),
                    fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                    new Some(BoxesRunTime.boxToLong(1L)),
                    3L);
        });

        // Poll the readiness predicate (defined elsewhere in this class; presumably it reports
        // that both workers are parked on the gate) for at most 15 seconds.
        long maxWaitMs = 15000L;
        long pauseMs = 100L;
        long waitStartMs = System.currentTimeMillis();
        while (!$anonfun$testConcurrentCaughtUpFetchAndNewReplicaSessionFetch$3(startGate)) {
            if (System.currentTimeMillis() > waitStartMs + maxWaitMs) {
                Assertions.fail("Timed out waiting for threads to prepare.");
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        startGate.release(2);

        boolean pushStarted = false;
        try {
            caughtUpFetch.get(15L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // Inspect the exception that was actually thrown. Any other cause means the
            // caught-up fetch lost the race and was rejected, which this test tolerates.
            if (e.getCause() instanceof PushReplicationStartedException) {
                pushStarted = true;
            }
        }
        newSessionFetch.get(15L, TimeUnit.SECONDS);

        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(3L)));
        Assertions.assertTrue(((TestUtils.MockPushManager) pushReplicationManager().get()).pushReplicationSessions().isEmpty());
        if (pushStarted) {
            verifyPushSessionEnded(remoteReplicaId(), 2L, false);
        }
    }

    /**
     * Drives a remote follower through fetches at offsets 5, 17, 20, 23 and 26 while the
     * leader keeps appending records. No push session may exist after any of the earlier
     * fetches. Only the final fetch, issued at the log end offset (26) once the broker node is
     * resolvable, is expected to throw {@link PushReplicationStartedException} and leave a
     * push session registered.
     */
    @Test
    public void testFetchTransitionsToPushWhenFullyCaughtUp() {
        LogManager logs = logManager();
        AbstractLog log = logs.getOrCreateLog(
                topicPartition(),
                logs.getOrCreateLog$default$2(),
                logs.getOrCreateLog$default$3(),
                None$.MODULE$,
                logs.getOrCreateLog$default$5());
        seedLogWithData(log);
        int leaderEpoch = 10;
        // Fourth fetchFollower argument; presumably the follower's log start offset.
        long startOffset = 0;

        Partition leader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState leaderState =
                new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                        .setControllerEpoch(0)
                        .setLeader(brokerId())
                        .setLeaderEpoch(10)
                        .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                                new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), Nil$.MODULE$)).asJava())
                        .setPartitionEpoch(1)
                        .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                                new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                        .setIsNew(true);
        Assertions.assertTrue(
                leader.makeLeader(leaderState, offsetCheckpoints(), new Some(topicId()), leader.makeLeader$default$4()),
                "Expected become leader transition to succeed");
        Assertions.assertEquals(10, partition().getLeaderEpoch());
        Assertions.assertEquals(
                Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerId()})),
                partition().inSyncReplicaIds());

        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Assertions.assertEquals(17L, log.logEndOffset());

        // Fetch well behind the log end: ISR stays leader-only and nothing is pushed.
        fetchFollower(
                partition(), remoteReplicaId(), 5L, 0L,
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(1L)),
                0L);
        Assertions.assertEquals(
                Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerId()})),
                partition().inSyncReplicaIds());
        IterableOnceOps sessionsAfterLaggingFetch =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertTrue(sessionsAfterLaggingFetch.isEmpty());

        appendToPartition(partition(), 3);
        Assertions.assertEquals(20L, log.logEndOffset());
        fetchFollower(
                partition(), remoteReplicaId(), 17L, 0L,
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(1L)),
                0L);

        appendToPartition(partition(), 3);
        Assertions.assertEquals(20L, log.highWatermark());
        Assertions.assertEquals(23L, log.logEndOffset());
        fetchFollower(
                partition(), remoteReplicaId(), 20L, 0L,
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(1L)),
                0L);

        // Once the pending ISR update completes, the remote replica is part of the ISR.
        alterPartitionManager().completeIsrUpdate(2);
        Assertions.assertEquals(
                Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerId(), remoteReplicaId()})),
                partition().inSyncReplicaIds());
        IterableOnceOps sessionsAfterIsrUpdate =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertTrue(sessionsAfterIsrUpdate.isEmpty());

        appendToPartition(partition(), 3);
        Assertions.assertEquals(26L, log.logEndOffset());
        fetchFollower(
                partition(), remoteReplicaId(), 23L, 0L,
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(1L)),
                0L);
        IterableOnceOps sessionsBeforeCatchUp =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertTrue(sessionsBeforeCatchUp.isEmpty());

        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchFollower(
                        partition(), remoteReplicaId(), 26L, startOffset,
                        fetchFollower$default$5(),
                        new Some(BoxesRunTime.boxToInteger(leaderEpoch)),
                        fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                        new Some(BoxesRunTime.boxToLong(1L)),
                        0L));
        IterableOnceOps sessionsAfterCatchUp =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertFalse(sessionsAfterCatchUp.isEmpty());
    }

    /**
     * Makes the partition leader without a topic id and checks that a caught-up fetch then
     * completes normally. After a second {@code makeLeader} call supplies the topic id, the
     * same fetch is expected to throw {@link PushReplicationStartedException}.
     */
    @Test
    public void testFetchTransitionsToPushOnlyWithDefinedTopicId() {
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(brokerId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        // First transition: no topic id is supplied (None).
        Partition withoutTopicId = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState initialState =
                new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                        .setControllerEpoch(0)
                        .setLeader(brokerId())
                        .setLeaderEpoch(5)
                        .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                                new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                        .setPartitionEpoch(1)
                        .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                                new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                        .setIsNew(true);
        Assertions.assertTrue(
                withoutTopicId.makeLeader(initialState, offsetCheckpoints(), None$.MODULE$, withoutTopicId.makeLeader$default$4()),
                "Expected become leader transition to succeed");
        fetchAsCaughtUpReplica(partition(), 1L, 0L);

        // Second transition: same leader, next leader epoch, topic id now present.
        Partition withTopicId = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState nextState =
                new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                        .setControllerEpoch(0)
                        .setLeader(brokerId())
                        .setLeaderEpoch(5 + 1)
                        .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                                new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                        .setPartitionEpoch(1)
                        .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                                new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                        .setIsNew(false);
        Assertions.assertFalse(
                withTopicId.makeLeader(nextState, offsetCheckpoints(), new Some(topicId()), withTopicId.makeLeader$default$4()),
                "Partition leader ID should not change");

        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
    }

    /**
     * Parameterized over a fixed list of internal topic names. Replaces the partition under
     * test with a new {@code Partition} for partition 0 of the given topic, makes it leader
     * with the remote replica in the ISR, and issues a caught-up fetch. The fetch is not
     * wrapped in {@code assertThrows}, so the test passes only if it completes without
     * throwing.
     *
     * @param str the internal topic name supplied by {@code @ValueSource}
     */
    @ValueSource(strings = {"__consumer_offsets", "__transaction_state", "_confluent-tier-state", "_confluent-link-metadata", "_confluent-quotas"})
    @ParameterizedTest
    public void testFetchDoesNotTransitionInternalTopicsToPush(String str) {
        TopicPartition topicPartition = new TopicPartition(str, 0);
        // The retention override is always applied to "_confluent-tier-state", whatever the parameter is.
        configRepository().setTopicConfig("_confluent-tier-state", "retention.ms", Integer.toString(-1));
        // Collect the constructor arguments for the replacement Partition, in this order.
        long ReplicaLagTimeMaxMs = Defaults$.MODULE$.ReplicaLagTimeMaxMs();
        MetadataVersion interBrokerProtocolVersion = interBrokerProtocolVersion();
        int brokerId = brokerId();
        JFunction0.mcJ.sp spVar = () -> {
            return this.defaultBrokerEpoch(this.brokerId());
        };
        MockTime time = time();
        TestUtils.MockAlterPartitionListener alterPartitionListener = alterPartitionListener();
        DelayedOperations delayedOperations = delayedOperations();
        KRaftMetadataCache mo59metadataCache = mo59metadataCache();
        LogManager logManager = logManager();
        Some some = new Some(tierReplicaManager());
        None$ none$ = None$.MODULE$;
        TestUtils.MockAlterPartitionManager alterPartitionManager = alterPartitionManager();
        None$ none$2 = None$.MODULE$;
        Option<TestUtils.MockPushManager> pushReplicationManager = pushReplicationManager();
        ListenerName interBrokerListenerName = interBrokerListenerName();
        // The Partition$ locals below are never read again in this method (decompiler residue).
        Partition$ partition$ = Partition$.MODULE$;
        None$ none$3 = None$.MODULE$;
        Partition$ partition$2 = Partition$.MODULE$;
        Partition$ partition$3 = Partition$.MODULE$;
        None$ none$4 = None$.MODULE$;
        Partition$ partition$4 = Partition$.MODULE$;
        // Install the new Partition as the fixture's partition under test.
        partition_$eq(new Partition(topicPartition, ReplicaLagTimeMaxMs, interBrokerProtocolVersion, interBrokerListenerName, brokerId, spVar, time, alterPartitionListener, delayedOperations, mo59metadataCache, logManager, some, none$, none$3, alterPartitionManager, none$2, false, none$4, pushReplicationManager, None$.MODULE$));
        // No checkpointed offset exists for this topic partition.
        Mockito.when(offsetCheckpoints().fetch(ArgumentMatchers.anyString(), (TopicPartition) ArgumentMatchers.eq(topicPartition))).thenReturn(None$.MODULE$);
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName())).thenReturn(replicaNode());
        Partition partition = partition();
        Assertions.assertTrue(partition.makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5).setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setPartitionEpoch(1).setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava()).setIsNew(true), offsetCheckpoints(), new Some(topicId()), partition.makeLeader$default$4()), "Expected become leader transition to succeed");
        // Same call that other tests in this class expect to throw PushReplicationStartedException.
        fetchAsCaughtUpReplica(partition(), 1L, 0L);
    }

    /**
     * Starts with a leader-only ISR and fetches at offset 17 as the remote follower. The
     * fetch leaves an in-flight ISR update containing both replicas, and no push session is
     * created while that update is pending. After the update completes and the ISR holds both
     * replicas, the same fetch is expected to throw {@link PushReplicationStartedException}.
     */
    @Test
    public void testFetchTransitionsToPushWhenInCommittedIsr() {
        LogManager logs = logManager();
        seedLogWithData(logs.getOrCreateLog(
                topicPartition(),
                logs.getOrCreateLog$default$2(),
                logs.getOrCreateLog$default$3(),
                None$.MODULE$,
                logs.getOrCreateLog$default$5()));
        int leaderEpoch = 10;
        // Fourth fetchFollower argument; presumably the follower's log start offset.
        long startOffset = 0;

        Partition leader = partition();
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState leaderState =
                new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                        .setControllerEpoch(0)
                        .setLeader(brokerId())
                        .setLeaderEpoch(10)
                        .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(
                                new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), Nil$.MODULE$)).asJava())
                        .setPartitionEpoch(1)
                        .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(
                                new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()),
                                        new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                        .setIsNew(true);
        Assertions.assertTrue(
                leader.makeLeader(leaderState, offsetCheckpoints(), new Some(topicId()), leader.makeLeader$default$4()),
                "Expected become leader transition to succeed");
        Assertions.assertEquals(10, partition().getLeaderEpoch());
        Assertions.assertEquals(
                Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerId()})),
                partition().inSyncReplicaIds());

        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        // First fetch at offset 17: an ISR update with both replicas is left in flight.
        fetchFollower(
                partition(), remoteReplicaId(), 17L, 0L,
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(1L)),
                0L);
        alterPartitionManager().assertInFlightLeaderAndIsr(
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerId(), remoteReplicaId()})),
                10,
                1);
        IterableOnceOps sessionsWhilePending =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertTrue(sessionsWhilePending.isEmpty());

        // Repeating the fetch while the update is still pending must not start a push either.
        fetchFollower(
                partition(), remoteReplicaId(), 17L, 0L,
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(1L)),
                0L);
        IterableOnceOps sessionsAfterRepeat =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertTrue(sessionsAfterRepeat.isEmpty());

        alterPartitionManager().completeIsrUpdate(2);
        Assertions.assertEquals(
                Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{brokerId(), remoteReplicaId()})),
                partition().inSyncReplicaIds());
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchFollower(
                        partition(), remoteReplicaId(), 17L, startOffset,
                        fetchFollower$default$5(),
                        new Some(BoxesRunTime.boxToInteger(leaderEpoch)),
                        fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                        new Some(BoxesRunTime.boxToLong(1L)),
                        0L));
    }

    /**
     * A caught-up fetch carrying replication session id -1 must complete without creating a
     * push session. The same fetch with session id 0 is expected to throw
     * {@link PushReplicationStartedException} and register a push session.
     */
    @Test
    public void testFetchWithInvalidSessionIdDoesNotTransitionToPush() {
        setupPartitionWithMocks(5, true);
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        fetchAsCaughtUpReplica(partition(), 1L, -1L);
        IterableOnceOps sessionsAfterInvalidId =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertTrue(sessionsAfterInvalidId.isEmpty());

        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        IterableOnceOps sessionsAfterValidId =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertFalse(sessionsAfterValidId.isEmpty());
    }

    /**
     * Fetches as a caught-up replica with session id {@code Long.MAX_VALUE} and checks that
     * the id is recorded, that no push session is created, and that the fetch can be repeated.
     * A later fetch with session id 0 is expected to throw
     * {@link FencedReplicationSessionIdException}.
     */
    @Test
    public void testFetchWithFinalReplicationSessionIdDoesNotTransitionToPush() {
        // Second fetchAsCaughtUpReplica argument; presumably the replica epoch.
        long replicaEpoch = 1;
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        setupPartitionWithMocks(5, true);

        fetchAsCaughtUpReplica(partition(), 1L, Long.MAX_VALUE);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(Long.MAX_VALUE)));
        IterableOnceOps activeSessions =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertTrue(activeSessions.isEmpty());

        fetchAsCaughtUpReplica(partition(), 1L, Long.MAX_VALUE);
        Assertions.assertThrows(
                FencedReplicationSessionIdException.class,
                () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, 0L));
    }

    /**
     * While the metadata cache returns no alive broker node for the remote replica, a
     * caught-up fetch must complete without creating a push session. Once the node lookup
     * returns {@code replicaNode()}, the same fetch is expected to throw
     * {@link PushReplicationStartedException} and register a push session.
     */
    @Test
    public void testFetchWithoutBrokerNodeDoesNotTransitionToPush() {
        setupPartitionWithMocks(5, true);
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(Option$.MODULE$.empty());

        fetchAsCaughtUpReplica(partition(), 1L, 0L);
        IterableOnceOps sessionsWithoutNode =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertTrue(sessionsWithoutNode.isEmpty());

        // Re-stub the lookup so the broker node is now resolvable.
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        IterableOnceOps sessionsWithNode =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertFalse(sessionsWithNode.isEmpty());
    }

    /**
     * Starts a push session on session id 0, then fetches with session id
     * {@code Long.MAX_VALUE}. That fetch must succeed, record the new session id and end the
     * push session for id 0. A further fetch with {@code Long.MAX_VALUE} must also succeed and
     * leave no push sessions active.
     */
    @Test
    public void testFetchWithFinalReplicationSessionIdTransitionsToPullAndCanNeverTransitionBackToPush() {
        // Second fetchAsCaughtUpReplica argument; presumably the replica epoch.
        long replicaEpoch = 1;
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), replicaEpoch, 0L));

        fetchAsCaughtUpReplica(partition(), 1L, Long.MAX_VALUE);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(Long.MAX_VALUE)));
        TestUtils.MockPushManager pushManager = (TestUtils.MockPushManager) pushReplicationManager().get();
        pushManager.verifyPushSessionEnded(
                new TestUtils.TopicIdPartitionReplica(topicIdPartition(), remoteReplicaId()),
                0L,
                false);

        fetchAsCaughtUpReplica(partition(), 1L, Long.MAX_VALUE);
        IterableOnceOps activeSessions =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertTrue(activeSessions.isEmpty());
    }

    /**
     * Issues five reads through the {@code read$1} helper with varying epoch/offset pairs and
     * the boolean flag set to {@code true}, then asserts that no push session exists. A final
     * read with the flag {@code false} (epoch 9, offset 17) is expected to throw
     * {@link PushReplicationStartedException} and register a push session.
     *
     * <p>NOTE(review): {@code read$1} is defined elsewhere in this class. Its first two
     * arguments look like last-fetched epoch and fetch offset, and the flag presumably marks
     * whether divergence is expected. Confirm against the helper.
     */
    @Test
    public void testFetchWithDivergingEpochDoesNotTransitionToPull() {
        LogManager logs = logManager();
        seedLogWithData(logs.getOrCreateLog(
                topicPartition(),
                logs.getOrCreateLog$default$2(),
                logs.getOrCreateLog$default$3(),
                None$.MODULE$,
                logs.getOrCreateLog$default$5()));
        int leaderEpoch = 10;
        long startOffset = 0;
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        Partition leader = setupPartitionWithMocks(10, true);

        read$1(2, 5L, true, leader, 0L, 10);
        read$1(0, 4L, true, leader, 0L, 10);
        read$1(6, 6L, true, leader, 0L, 10);
        read$1(8, 17L, true, leader, 0L, 10);
        read$1(10, 18L, true, leader, 0L, 10);
        IterableOnceOps sessionsAfterFlaggedReads =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertTrue(sessionsAfterFlaggedReads.isEmpty());

        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> read$1(9, 17L, false, leader, startOffset, leaderEpoch));
        IterableOnceOps sessionsAfterFinalRead =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertFalse(sessionsAfterFinalRead.isEmpty());
    }

    /**
     * Starts a push session on session id 1 with replica epoch 1, then fetches with replica
     * epoch 5 and session id 0. That fetch must succeed and end the push session for id 1. A
     * subsequent fetch with the old replica epoch 1 is expected to throw
     * {@link NotLeaderOrFollowerException}.
     */
    @Test
    public void testFetchWithNewReplicaEpochTransitionsToPull() {
        // Consecutive stubbing: the first lookup reports broker epoch 1, later lookups report 5.
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(5L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 1L));

        fetchAsCaughtUpReplica(partition(), 5L, 0L);
        verifyPushSessionEnded(remoteReplicaId(), 1L, false);

        Assertions.assertThrows(
                NotLeaderOrFollowerException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
    }

    /**
     * Follows the recorded replication session id across a series of caught-up fetches that
     * alternate between starting a push session and succeeding as plain fetches. After each
     * successful fetch, and after the second push start, the recorded session id must equal
     * the id carried by that fetch. Push sessions 0 and 1 must each be ended by the fetch that
     * follows them.
     */
    @Test
    public void testFetchWithNewReplicaEpochUpdatesReplicationSessionId() {
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        // Second argument 1, session 0: push starts.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));

        // Same second argument, session 1: push session 0 ends and session 1 is recorded.
        fetchAsCaughtUpReplica(partition(), 1L, 1L);
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));

        // Second argument 2, session 0: the recorded session id goes back to 0.
        fetchAsCaughtUpReplica(partition(), 2L, 0L);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));

        // Second argument 2, session 1: push starts again and session 1 is recorded.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 2L, 1L));
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));

        // Second argument 3, session 0: push session 1 ends and session 0 is recorded.
        fetchAsCaughtUpReplica(partition(), 3L, 0L);
        verifyPushSessionEnded(remoteReplicaId(), 1L, false);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));
    }

    /**
     * Starts a push session on session id 0, then fetches with session id 1, which must
     * succeed and end push session 0. A fetch with the old session id 0 is then expected to
     * throw {@link FencedReplicationSessionIdException} and leave no push sessions active. A
     * further fetch with session id 1 is expected to start a push session for id 1.
     */
    @Test
    public void testFetchWithNewReplicationSessionTransitionsToPull() {
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));

        fetchAsCaughtUpReplica(partition(), 1L, 1L);
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);

        Assertions.assertThrows(
                FencedReplicationSessionIdException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        IterableOnceOps sessionsAfterFencedFetch =
                (IterableOnceOps) pushReplicationManager().map(m -> m.pushReplicationSessions()).get();
        Assertions.assertTrue(sessionsAfterFencedFetch.isEmpty());

        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 1L));
        activePushSessionOrFail(remoteReplicaId(), 1L);
    }

    /**
     * Issues the same caught-up fetch (session id 0) twice and expects
     * {@link PushReplicationStartedException} both times. An active push session for
     * session id 0 must exist afterwards.
     */
    @Test
    public void testFollowerFetchAfterTransitionThrowsPushReplicationStartedException() {
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        // First fetch.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));
        // The identical fetch repeated must be rejected the same way.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(partition(), 1L, 0L));

        activePushSessionOrFail(remoteReplicaId(), 0L);
    }

    /**
     * Checks high watermark movement around the push transition. It starts at 0 and is 5
     * after a follower fetch at offset 5. After a caught-up fetch that throws
     * {@link PushReplicationStartedException} it equals the log end offset. The mock push
     * manager must also have seen a leader high-watermark event for the remote replica
     * carrying that value.
     */
    @Test
    public void testFollowerFetchWithTransitionUpdatesFetchStateAndIncrementsHighWatermark() {
        LogManager logs = logManager();
        AbstractLog log = logs.getOrCreateLog(
                topicPartition(),
                logs.getOrCreateLog$default$2(),
                logs.getOrCreateLog$default$3(),
                None$.MODULE$,
                logs.getOrCreateLog$default$5());
        seedLogWithData(log);
        Partition leader = setupPartitionWithMocks(10, true);
        Assertions.assertEquals(0L, log.highWatermark());

        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        fetchFollower(
                leader, remoteReplicaId(), 5L, 0L,
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(1L)),
                0L);
        Assertions.assertEquals(5L, log.highWatermark());

        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> fetchAsCaughtUpReplica(leader, 1L, 0L));
        Assertions.assertEquals(log.logEndOffset(), log.highWatermark());

        TestUtils.MockPushManager pushManager = (TestUtils.MockPushManager) pushReplicationManager().get();
        pushManager.verifyLeaderHwmEvent(
                topicIdPartition(),
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})),
                log.highWatermark());
    }

    /**
     * Records a follower fetch with leader epoch 10, replica epoch 3 and session id 4, then
     * calls the {@code updateFetchState$1} helper with stale values. Leader epoch 9 is
     * expected to throw {@link FencedLeaderEpochException}. The current leader epoch with
     * session id 3 (one below the recorded id) is expected to throw
     * {@link FencedReplicationSessionIdException}.
     */
    @Test
    public void testUpdateFollowerFetchStateFencesStaleFetches() {
        LogManager logs = logManager();
        AbstractLog log = logs.getOrCreateLog(
                topicPartition(),
                logs.getOrCreateLog$default$2(),
                logs.getOrCreateLog$default$3(),
                None$.MODULE$,
                logs.getOrCreateLog$default$5());
        seedLogWithData(log);
        Partition leader = setupPartitionWithMocks(10, true);
        long replicaEpoch = 3;
        long sessionId = 4;

        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(3L)));
        fetchFollower(
                leader, remoteReplicaId(), 5L, 0L,
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(3L)),
                4L);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(4L)));

        // Stale leader epoch (9 instead of 10).
        Assertions.assertThrows(
                FencedLeaderEpochException.class,
                () -> updateFetchState$1(
                        remoteReplicaId(),
                        Optional.of(Predef$.MODULE$.int2Integer(9)),
                        replicaEpoch,
                        sessionId,
                        leader,
                        log));
        // Current leader epoch but an older session id.
        Assertions.assertThrows(
                FencedReplicationSessionIdException.class,
                () -> updateFetchState$1(
                        remoteReplicaId(),
                        Optional.of(Predef$.MODULE$.int2Integer(10)),
                        replicaEpoch,
                        sessionId - 1,
                        leader,
                        log));
    }

    /**
     * Exercises {@code PushSession.onPushSessionEnded()} callbacks that arrive for sessions which
     * are no longer the replica's current one.
     *
     * <p>Three push sessions (ids 0, 1, 2) are started one after another. The assertions pin the
     * replica's replication session id after each "session ended" callback, including the final
     * case where the callback arrives after {@code makeLeader} was re-applied with a higher
     * leader epoch.
     */
    @Test
    public void testOnPushSessionEndedForStaleSession() {
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        // Session 0: a caught-up fetch is answered with PushReplicationStartedException.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L));
        PushSession sessionZero = activePushSessionOrFail(remoteReplicaId(), 0L);

        // A fetch carrying session id 1 ends session 0 and moves the replica to session id 1.
        fetchAsCaughtUpReplica(partition(), 1L, 1L);
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));

        // The late "ended" callback of session 0 must leave the session id at 1.
        sessionZero.onPushSessionEnded();
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));

        // Session 1 starts, then is superseded by a fetch carrying session id 2.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 1L, 1L));
        PushSession sessionOne = activePushSessionOrFail(remoteReplicaId(), 1L);
        fetchAsCaughtUpReplica(partition(), 1L, 2L);
        verifyPushSessionEnded(remoteReplicaId(), 1L, false);
        sessionOne.onPushSessionEnded();
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(2L)));

        // Session 2 starts.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 1L, 2L));
        PushSession sessionTwo = activePushSessionOrFail(remoteReplicaId(), 2L);

        // Re-apply leadership with leader epoch 6; makeLeader is expected to return false.
        Partition leader = partition();
        Assertions.assertFalse(
                leader.makeLeader(
                        new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                                .setControllerEpoch(0)
                                .setLeader(brokerId())
                                .setLeaderEpoch(5 + 1)
                                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                                .setPartitionEpoch(2)
                                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                                .setIsNew(false),
                        offsetCheckpoints(),
                        new Some(topicId()),
                        leader.makeLeader$default$4()),
                "Expected become leader not to change leadership");

        // The callback from the pre-bump session arrives afterwards; the session id is asserted to be -1
        // (presumably the "no session" value after the epoch change; confirm against Partition).
        sessionTwo.onPushSessionEnded();
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(-1L)));
    }

    /**
     * Checks that ending the replica's current push session advances its replication session id
     * from 0 to 1 and leaves the replica in {@code ReplicationState.Mode.PULL}.
     */
    @Test
    public void testOnPushSessionEndedUpdatesReplicationSession() {
        setupPartitionWithMocks(5, true);
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        // The caught-up fetch is answered with PushReplicationStartedException and session 0 becomes active.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L));
        PushSession currentSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));

        // Ending the active session bumps the id and switches the mode back to PULL.
        currentSession.onPushSessionEnded();
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(1L)));
        assertReplicationSessionMode(remoteReplicaId(), ReplicationState.Mode.PULL);
    }

    /**
     * Checks that {@code PushSession.onPushSessionEnded()} can be invoked after the partition has
     * been deleted. The test has no assertion after the callback; it passes as long as the
     * callback does not throw.
     */
    @Test
    public void testOnPushSessionEndedWhenPartitionDeleted() {
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        // Start push session 0 for the follower.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L));
        PushSession sessionBeforeDelete = activePushSessionOrFail(remoteReplicaId(), 0L);
        assertReplicationSessionId(remoteReplicaId(), new Some(BoxesRunTime.boxToLong(0L)));

        // Delete the partition first, then deliver the "session ended" callback.
        partition().delete();
        sessionBeforeDelete.onPushSessionEnded();
    }

    /**
     * Checks that {@code PushSession.onAppendRecordsResponse} advances the leader's high watermark,
     * and that a response arriving after an ISR shrink has completed does not queue another ISR
     * update.
     *
     * <p>The two arguments of {@code onAppendRecordsResponse} appear, from their use across this
     * class, to be the follower's log end offset and log start offset (assumption; confirm against
     * {@code PushSession}).
     */
    @Test
    public void testOnAppendRecordsResponseUpdatesHighWatermark() {
        LogManager logs = logManager();
        AbstractLog leaderLog = logs.getOrCreateLog(
                topicPartition(),
                logs.getOrCreateLog$default$2(),
                logs.getOrCreateLog$default$3(),
                None$.MODULE$,
                logs.getOrCreateLog$default$5());
        seedLogWithData(leaderLog);
        setupPartitionWithMocks(10, true);
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        // Caught-up fetch starts push session 0; the high watermark is then at the seeded end offset (17).
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L));
        PushSession pushSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, leaderLog.highWatermark());

        // Append five more records, then acknowledge up to offset 19.
        appendToPartition(partition(), 5);
        pushSession.onAppendRecordsResponse(19L, 3L);
        Assertions.assertEquals(19L, leaderLog.highWatermark());
        // The low watermark is asserted to still be 0 after this response.
        Assertions.assertEquals(0L, partition().lowWatermarkIfLeader());

        // Let the follower exceed the lag limit, shrink the ISR and complete that update.
        time().sleep(partition().replicaLagTimeMaxMs() + 1);
        partition().maybeShrinkIsr();
        alterPartitionManager().completeIsrUpdate(2);

        // A fully caught-up response must leave no pending ISR update behind.
        pushSession.onAppendRecordsResponse(leaderLog.logEndOffset(), 3L);
        Assertions.assertTrue(alterPartitionManager().isrUpdates().isEmpty());
    }

    /**
     * Checks that {@code PushSession.onAppendRecordsResponse} updates the leader's low watermark:
     * after {@code deleteRecordsOnLeader(5)} the low watermark is still 0, and it becomes 5 once the
     * push session reports 5 as its second argument (presumably the follower's log start offset).
     */
    @Test
    public void testOnAppendRecordsResponseUpdatesLowWatermark() {
        LogManager logs = logManager();
        AbstractLog leaderLog = logs.getOrCreateLog(
                topicPartition(),
                logs.getOrCreateLog$default$2(),
                logs.getOrCreateLog$default$3(),
                None$.MODULE$,
                logs.getOrCreateLog$default$5());
        seedLogWithData(leaderLog);
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        Mockito.when(BoxesRunTime.boxToBoolean(mo59metadataCache().hasAliveBroker(remoteReplicaId())))
                .thenReturn(BoxesRunTime.boxToBoolean(true));
        setupPartitionWithMocks(10, true);

        // Start push session 0; high watermark sits at the seeded end offset.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L));
        PushSession pushSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, leaderLog.highWatermark());

        // Deleting records up to offset 5 on the leader alone does not move the low watermark.
        LogDeleteRecordsResult deleteResult = partition().deleteRecordsOnLeader(5L);
        Assertions.assertEquals(5L, deleteResult.requestedOffset());
        Assertions.assertEquals(0L, deleteResult.lowWatermark());

        // Once the push session reports 5, the low watermark follows.
        pushSession.onAppendRecordsResponse(17L, 5L);
        Assertions.assertEquals(5L, partition().lowWatermarkIfLeader());
    }

    /**
     * Checks when {@code PushSession.onAppendRecordsResponse} triggers
     * {@code DelayedOperations.checkAndCompleteAll()}.
     *
     * <p>Pinned by the Mockito verifications below: no call when the response reports the current
     * high watermark (17); one call when the high watermark advances to 19; no call for the
     * response (19, 3) issued right after the mock is reset; and one call for the response (19, 5)
     * issued after a delayed delete is stubbed as pending and records are deleted on the leader.
     */
    @Test
    public void testOnAppendRecordsResponseCompletesPurgatory() {
        LogManager logs = logManager();
        AbstractLog leaderLog = logs.getOrCreateLog(
                topicPartition(),
                logs.getOrCreateLog$default$2(),
                logs.getOrCreateLog$default$3(),
                None$.MODULE$,
                logs.getOrCreateLog$default$5());
        seedLogWithData(leaderLog);
        setupPartitionWithMocks(10, true);
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Mockito.when(BoxesRunTime.boxToBoolean(mo59metadataCache().hasAliveBroker(remoteReplicaId())))
                .thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L));
        PushSession pushSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, leaderLog.highWatermark());
        appendToPartition(partition(), 5);

        // Response at the existing high watermark: nothing to complete.
        pushSession.onAppendRecordsResponse(17L, 0L);
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.never())).checkAndCompleteAll();

        // High watermark moves 17 -> 19: delayed operations are checked once.
        pushSession.onAppendRecordsResponse(19L, 0L);
        Assertions.assertEquals(19L, leaderLog.highWatermark());
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.times(1))).checkAndCompleteAll();

        // Start counting afresh; this response is verified not to trigger a check.
        Mockito.reset(new DelayedOperations[]{delayedOperations()});
        pushSession.onAppendRecordsResponse(19L, 3L);
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.times(0))).checkAndCompleteAll();

        // With one delayed delete stubbed as pending, a response reporting 5 triggers a check.
        Mockito.when(BoxesRunTime.boxToInteger(delayedOperations().numDelayedDelete()))
                .thenReturn(BoxesRunTime.boxToInteger(1));
        partition().deleteRecordsOnLeader(5L);
        Assertions.assertEquals(3L, partition().lowWatermarkIfLeader());
        pushSession.onAppendRecordsResponse(19L, 5L);
        ((DelayedOperations) Mockito.verify(delayedOperations(), Mockito.times(1))).checkAndCompleteAll();
    }

    /**
     * Checks that {@code PushSession.onAppendRecordsResponse} is ignored when it comes from a push
     * session that is no longer current.
     *
     * <p>Covered by the assertions below: the first session stops moving the high watermark after
     * the follower fetches with a new session id and again after it fetches with a new broker
     * epoch; a second session moves the high watermark until {@code makeLeader} is re-applied with
     * leader epoch 11, after which its responses are ignored as well.
     */
    @Test
    public void testOnAppendRecordsResponseWithStalePushSession() {
        LogManager logs = logManager();
        AbstractLog leaderLog = logs.getOrCreateLog(
                topicPartition(),
                logs.getOrCreateLog$default$2(),
                logs.getOrCreateLog$default$3(),
                None$.MODULE$,
                logs.getOrCreateLog$default$5());
        seedLogWithData(leaderLog);
        setupPartitionWithMocks(10, true);
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(new Some(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());

        // First push session (id 0) acknowledges up to offset 19.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L));
        PushSession firstSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, leaderLog.highWatermark());
        appendToPartition(partition(), 5);
        Assertions.assertEquals(22L, leaderLog.logEndOffset());
        firstSession.onAppendRecordsResponse(19L, 0L);
        Assertions.assertEquals(19L, leaderLog.highWatermark());

        // The follower fetches with session id 1, which ends session 0; its later response is ignored.
        fetchFollower(
                partition(),
                remoteReplicaId(),
                17L,
                fetchFollower$default$4(),
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(),
                fetchFollower$default$8(),
                fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(1L)),
                1L);
        verifyPushSessionEnded(remoteReplicaId(), 0L, false);
        firstSession.onAppendRecordsResponse(21L, 3L);
        Assertions.assertEquals(19L, leaderLog.highWatermark());

        // The follower fetches again with the tenth argument raised to 2 (presumably its broker epoch)
        // and session id 0; the old session object is still ignored.
        fetchFollower(
                partition(),
                remoteReplicaId(),
                17L,
                fetchFollower$default$4(),
                fetchFollower$default$5(),
                new Some(BoxesRunTime.boxToInteger(10)),
                fetchFollower$default$7(),
                fetchFollower$default$8(),
                fetchFollower$default$9(),
                new Some(BoxesRunTime.boxToLong(2L)),
                0L);
        firstSession.onAppendRecordsResponse(21L, 3L);
        Assertions.assertEquals(19L, leaderLog.highWatermark());

        // A second push session starts and is able to advance the high watermark.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 2L, 0L));
        PushSession secondSession = activePushSessionOrFail(remoteReplicaId(), 0L);
        appendToPartition(partition(), 5);
        Assertions.assertEquals(27L, leaderLog.logEndOffset());
        secondSession.onAppendRecordsResponse(23L, 3L);
        Assertions.assertEquals(23L, leaderLog.highWatermark());

        // Re-apply leadership with leader epoch 11; makeLeader is expected to return false.
        Partition leader = partition();
        Assertions.assertFalse(
                leader.makeLeader(
                        new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                                .setControllerEpoch(0)
                                .setLeader(brokerId())
                                .setLeaderEpoch(10 + 1)
                                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                                .setPartitionEpoch(2)
                                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                                .setIsNew(false),
                        offsetCheckpoints(),
                        new Some(topicId()),
                        leader.makeLeader$default$4()),
                "Expected become leader not to change leadership");

        // Responses from the session created under the previous leader epoch no longer count.
        secondSession.onAppendRecordsResponse(25L, 3L);
        Assertions.assertEquals(23L, leaderLog.highWatermark());
    }

    /**
     * Checks that {@code PushSession.onAppendRecordsResponse} can be invoked after the partition
     * has been deleted. There is no assertion after the callback; the test passes as long as the
     * callback does not throw.
     */
    @Test
    public void testOnAppendRecordsResponseWithDeletedPartition() {
        LogManager logs = logManager();
        AbstractLog leaderLog = logs.getOrCreateLog(
                topicPartition(),
                logs.getOrCreateLog$default$2(),
                logs.getOrCreateLog$default$3(),
                None$.MODULE$,
                logs.getOrCreateLog$default$5());
        seedLogWithData(leaderLog);
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        Mockito.when(BoxesRunTime.boxToBoolean(mo59metadataCache().hasAliveBroker(remoteReplicaId())))
                .thenReturn(BoxesRunTime.boxToBoolean(true));
        setupPartitionWithMocks(10, true);

        // Start push session 0 and grow the log to end offset 22.
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L));
        PushSession sessionBeforeDelete = activePushSessionOrFail(remoteReplicaId(), 0L);
        Assertions.assertEquals(17L, leaderLog.highWatermark());
        appendToPartition(partition(), 5);
        Assertions.assertEquals(22L, leaderLog.logEndOffset());

        // Delete the partition, then deliver a response from the now-orphaned session.
        partition().delete();
        sessionBeforeDelete.onAppendRecordsResponse(22L, 0L);
    }

    /**
     * Fills the given log with six batches (17 records in total) via {@code appendToLog} and
     * asserts that the resulting log end offset is 17.
     *
     * @param abstractLog the log to populate
     */
    private void seedLogWithData(AbstractLog abstractLog) {
        // Each row is {second appendToLog argument, record count}, listed in append order.
        int[][] batches = {
            {0, 2},
            {3, 3},
            {3, 3},
            {4, 5},
            {7, 1},
            {9, 3},
        };
        for (int[] batch : batches) {
            appendToLog(abstractLog, batch[0], batch[1]);
        }
        Assertions.assertEquals(17L, abstractLog.logEndOffset());
    }

    /**
     * Checks that appends on the leader are reported to the push manager only for the replica that
     * has an active push session.
     *
     * <p>A third replica ({@code remoteReplicaId() + 1}) is added to the ISR and replica set. Before
     * any push session exists no append events are expected for any replica; after the first
     * follower's caught-up fetch is answered with {@link PushReplicationStartedException}, append
     * events are expected for that follower only.
     */
    @Test
    public void testLeaderLogAppendListenerNotifiesPushReplicasForLeaderAppends() {
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId())).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName())).thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);
        // NOTE: this local shadows the remoteReplicaId() accessor; it is the id of the additional (third) replica.
        int remoteReplicaId = remoteReplicaId() + 1;
        Partition partition = partition();
        // Re-apply leadership with leader epoch 6 and the third replica included; makeLeader is expected to return false.
        Assertions.assertFalse(partition.makeLeader(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setControllerEpoch(0).setLeader(brokerId()).setLeaderEpoch(5 + 1).setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setPartitionEpoch(partition().getPartitionEpoch() + 1).setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId), Nil$.MODULE$)))).asJava()).setIsNew(false), offsetCheckpoints(), new Some(topicId()), partition.makeLeader$default$4()), "Expected become leader transition not to change leader");
        AbstractLog localLogOrException = partition().localLogOrException();
        // No push session yet: the append must not produce an event for any replica.
        appendToPartition(partition(), 5);
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId()), Predef$.MODULE$.int2Integer(remoteReplicaId)})));
        Assertions.assertEquals(5, localLogOrException.logEndOffset());
        // The first follower's caught-up fetch is rejected with PushReplicationStartedException.
        Assertions.assertThrows(PushReplicationStartedException.class, () -> {
            this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L);
        });
        // The append happens while the arguments of verifyLeaderAppendEvent are evaluated; the third
        // argument (5) is presumably the expected base offset of the appended batch.
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderAppendEvent(topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})), 5, appendToPartition(partition(), 5));
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId)})));
        int i = 5 + 5;
        Assertions.assertEquals(i, localLogOrException.logEndOffset());
        // The third replica does an ordinary fetch at offset 3 (all optional arguments left at their defaults).
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId)).thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(3L)));
        fetchFollower(partition(), remoteReplicaId, 3L, fetchFollower$default$4(), fetchFollower$default$5(), fetchFollower$default$6(), fetchFollower$default$7(), fetchFollower$default$8(), fetchFollower$default$9(), fetchFollower$default$10(), fetchFollower$default$11());
        Assertions.assertEquals(3L, partition().localLogOrException().highWatermark());
        // A further append is again reported for the first follower only.
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderAppendEvent(topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})), i, appendToPartition(partition(), 5));
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(topicIdPartition(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId)})));
        Assertions.assertEquals(i + 5, localLogOrException.logEndOffset());
    }

    /**
     * Checks that the push manager receives a leader "LSO" event for each of the values 1..5 for
     * the replica with an active push session, and none for the leader itself or for the
     * additional replica.
     *
     * <p>The events are produced by a compiler-generated helper invoked once per value in 1..5;
     * judging by the test name it presumably increments the log start offset (assumption; the
     * helper body is outside this method).
     */
    @Test
    public void testLeaderOffsetsListenerNotifiesPushReplicasForLogStartOffsetIncrement() {
        // Additional replica: part of the replica set below, but not of the ISR.
        int otherReplicaId = remoteReplicaId() + 1;
        Partition leader = partition();
        Assertions.assertTrue(
                leader.makeLeader(
                        new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                                .setControllerEpoch(0)
                                .setLeader(brokerId())
                                .setLeaderEpoch(5)
                                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                                .setPartitionEpoch(partition().getPartitionEpoch())
                                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(otherReplicaId), Nil$.MODULE$)))).asJava())
                                .setIsNew(true),
                        offsetCheckpoints(),
                        new Some(topicId()),
                        leader.makeLeader$default$4()),
                "Expected become leader transition to succeed");
        appendToPartition(partition(), 5);

        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(0L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 0L, 0L));

        // Run the generated helper for 1..5; the mapped result is intentionally discarded.
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 5).map(
                boxed -> $anonfun$testLeaderOffsetsListenerNotifiesPushReplicasForLogStartOffsetIncrement$2(this, BoxesRunTime.unboxToInt(boxed)));

        // For every value: one event for the push replica, none for the leader or the additional replica.
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 5).foreach$mVc$sp(offset -> {
            ((TestUtils.MockPushManager) this.pushReplicationManager().get()).verifyLeaderLsoEvent(
                    this.topicIdPartition(),
                    (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(this.remoteReplicaId())})),
                    offset);
            ((TestUtils.MockPushManager) this.pushReplicationManager().get()).verifyNoLeaderLsoEvents(
                    this.topicIdPartition(),
                    (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(this.brokerId()), Predef$.MODULE$.int2Integer(otherReplicaId)})));
        });
    }

    /**
     * Checks that each high-watermark advance on the leader is reported to the push manager for
     * the replica with an active push session, and for no other replica.
     *
     * <p>After the push session starts, an initial event with value 0 is expected. The session then
     * acknowledges offsets 1..5 one at a time; after each acknowledgement the leader's high
     * watermark must equal that offset, and a matching event must have been recorded for the push
     * replica only.
     */
    @Test
    public void testLeaderOffsetsListenerNotifiesPushReplicasForHighWatermarkIncrement() {
        // Additional replica: part of the replica set below, but not of the ISR; it never fetches in this test.
        int otherReplicaId = remoteReplicaId() + 1;
        Partition leader = partition();
        Assertions.assertTrue(
                leader.makeLeader(
                        new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                                .setControllerEpoch(0)
                                .setLeader(brokerId())
                                .setLeaderEpoch(5)
                                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), Nil$.MODULE$))).asJava())
                                .setPartitionEpoch(partition().getPartitionEpoch())
                                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(otherReplicaId), Nil$.MODULE$)))).asJava())
                                .setIsNew(true),
                        offsetCheckpoints(),
                        new Some(topicId()),
                        leader.makeLeader$default$4()),
                "Expected become leader transition to succeed");

        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(0L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 0L, 0L));

        // Starting the push session is expected to publish the current high watermark (0).
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderHwmEvent(
                topicIdPartition(),
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})),
                0L);
        PushSession pushSession = activePushSessionOrFail(remoteReplicaId(), 0L);

        appendToPartition(partition(), 5);
        // Expected value first, actual second, so a failure message reads correctly.
        Assertions.assertEquals(0L, partition().localLogOrException().highWatermark());

        // Acknowledge offsets 1..5 one by one; the high watermark must follow each step.
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 5).foreach$mVc$sp(ackedOffset -> {
            pushSession.onAppendRecordsResponse(ackedOffset, this.partition().logStartOffset());
            Assertions.assertEquals(ackedOffset, this.partition().localLogOrException().highWatermark());
        });

        // Each step must have produced an event for the push replica and none for the others.
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 5).foreach$mVc$sp(expectedHwm -> {
            ((TestUtils.MockPushManager) this.pushReplicationManager().get()).verifyLeaderHwmEvent(
                    this.topicIdPartition(),
                    (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(this.remoteReplicaId())})),
                    expectedHwm);
            ((TestUtils.MockPushManager) this.pushReplicationManager().get()).verifyNoLeaderHwmEvents(
                    this.topicIdPartition(),
                    (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(this.brokerId()), Predef$.MODULE$.int2Integer(otherReplicaId)})));
        });
    }

    /**
     * Checks that no leader append, high-watermark or LSO events reach the push manager when
     * records are appended through {@code appendRecordsToFollower} and the log start offset is
     * incremented with reason {@code LeaderOffsetIncremented}.
     *
     * <p>The partition is created with {@code setupPartitionWithMocks(5, false)}; the {@code false}
     * presumably makes the local broker a follower (assumption; the helper is outside this
     * method).
     */
    @Test
    public void testLeaderLogAppendListenerIgnoresFollowerEvents() {
        setupPartitionWithMocks(5, false);

        Partition follower = partition();
        // Five records built by a compiler-generated helper, written at base offset 0.
        MemoryRecords replicatedBatch = MemoryRecords.withRecords(
                (byte) 2,
                0L,
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
                -1L,
                (short) -1,
                -1,
                5,
                false,
                (SimpleRecord[]) RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 5)
                        .map(boxed -> $anonfun$testLeaderLogAppendListenerIgnoresFollowerEvents$1(BoxesRunTime.unboxToInt(boxed)))
                        .toArray(ClassTag$.MODULE$.apply(SimpleRecord.class)));
        follower.appendRecordsToFollower(
                0L,
                AppendOrigin.REPLICATION,
                -1L,
                Optional.of(Predef$.MODULE$.int2Integer(5)),
                replicatedBatch,
                3L,
                Optional.empty());

        // The follower-side append must not surface as leader append or high-watermark events.
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(
                topicIdPartition(),
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId())})));
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderHwmEvents(
                topicIdPartition(),
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId())})));

        // Neither must a log start offset increment surface as a leader LSO event.
        partition().localLogOrException().maybeIncrementLogStartOffset(2L, LogStartOffsetIncrementReason.LeaderOffsetIncremented);
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderLsoEvents(
                topicIdPartition(),
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(remoteReplicaId())})));
    }

    /**
     * Checks that both a transactional data batch and the following COMMIT end-transaction marker
     * are reported to the push manager as leader append events, for the replica with an active
     * push session only.
     *
     * <p>The third argument of {@code verifyLeaderAppendEvent} is presumably the expected base
     * offset of the appended batch: 0 for the data batch, and the data batch's record count for
     * the marker.
     */
    @Test
    public void testLeaderLogAppendListenerIncludesTransactionMarkerAppends() {
        Mockito.when(mo59metadataCache().getAliveBrokerEpoch(remoteReplicaId()))
                .thenReturn(Option$.MODULE$.apply(BoxesRunTime.boxToLong(1L)));
        Mockito.when(mo59metadataCache().getAliveBrokerNode(remoteReplicaId(), interBrokerListenerName()))
                .thenReturn(replicaNode());
        setupPartitionWithMocks(5, true);

        // Additional replica that is added to the ISR and replica set but never starts a push session here.
        int otherReplicaId = remoteReplicaId() + 1;
        Partition leader = partition();
        // Re-apply leadership with leader epoch 6 and the additional replica; makeLeader is expected to return false.
        Assertions.assertFalse(
                leader.makeLeader(
                        new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                                .setControllerEpoch(0)
                                .setLeader(brokerId())
                                .setLeaderEpoch(5 + 1)
                                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(otherReplicaId), Nil$.MODULE$)))).asJava())
                                .setPartitionEpoch(partition().getPartitionEpoch() + 1)
                                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(remoteReplicaId()), new $colon.colon(Predef$.MODULE$.int2Integer(otherReplicaId), Nil$.MODULE$)))).asJava())
                                .setIsNew(false),
                        offsetCheckpoints(),
                        new Some(topicId()),
                        leader.makeLeader$default$4()),
                "Expected become leader transition not to change leader");

        Assertions.assertThrows(
                PushReplicationStartedException.class,
                () -> this.fetchAsCaughtUpReplica(this.partition(), 1L, 0L));

        AbstractLog leaderLog = partition().localLogOrException();

        // Transactional data batch, appended with the verification guard obtained just before.
        VerificationGuard verificationGuard = partition().maybeStartTransactionVerification(1L, 0, (short) 0);
        // Explicit UTF-8 keeps the record bytes independent of the platform default charset.
        AbstractRecords transactionalBatch = createTransactionalRecords(
                new $colon.colon(
                        new SimpleRecord(
                                "k1".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                                "v1".getBytes(java.nio.charset.StandardCharsets.UTF_8)),
                        Nil$.MODULE$),
                0L,
                createTransactionalRecords$default$3(),
                1L);
        partition().appendRecordsToLeader(
                transactionalBatch,
                AppendOrigin.CLIENT,
                -1,
                RequestLocal$.MODULE$.withThreadConfinedCaching(),
                time().milliseconds(),
                verificationGuard);
        int dataRecordCount = CollectionConverters$.MODULE$.IterableHasAsScala(transactionalBatch.records()).asScala().size();
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderAppendEvent(
                topicIdPartition(),
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})),
                0L,
                transactionalBatch);
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(
                topicIdPartition(),
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(otherReplicaId)})));
        Assertions.assertEquals(dataRecordCount, leaderLog.logEndOffset());

        // COMMIT marker, appended with coordinator origin.
        AbstractRecords commitMarker = MemoryRecords.withEndTransactionMarker(1L, (short) 0, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
        Partition markerTarget = partition();
        markerTarget.appendRecordsToLeader(
                commitMarker,
                AppendOrigin.COORDINATOR,
                -1,
                RequestLocal$.MODULE$.withThreadConfinedCaching(),
                markerTarget.appendRecordsToLeader$default$5(),
                markerTarget.appendRecordsToLeader$default$6());
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyLeaderAppendEvent(
                topicIdPartition(),
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(remoteReplicaId())})),
                dataRecordCount,
                commitMarker);
        ((TestUtils.MockPushManager) pushReplicationManager().get()).verifyNoLeaderAppendEvents(
                topicIdPartition(),
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Integer[]{Predef$.MODULE$.int2Integer(brokerId()), Predef$.MODULE$.int2Integer(otherReplicaId)})));
        Assertions.assertEquals(
                dataRecordCount + CollectionConverters$.MODULE$.IterableHasAsScala(commitMarker.records()).asScala().size(),
                leaderLog.logEndOffset());
    }

    /**
     * Appends one batch of {@code i2} records directly to the given log as leader.
     *
     * <p>The records come from a compiler-generated helper invoked for each value in 1..{@code i2}.
     * {@code i} is passed both to {@code MemoryRecords.withRecords} (in the partition-leader-epoch
     * position) and as the second argument of {@code appendAsLeader}.
     *
     * @param abstractLog the log to append to
     * @param i the epoch value used for the batch and the append
     * @param i2 the number of records in the batch
     */
    private void appendToLog(AbstractLog abstractLog, int i, int i2) {
        IndexedSeq generated = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), i2)
                .map(boxed -> $anonfun$appendToLog$1(BoxesRunTime.unboxToInt(boxed)));
        MemoryRecords batch = MemoryRecords.withRecords(
                (byte) 2,
                0L,
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
                -1L,
                (short) -1,
                -1,
                i,
                false,
                (SimpleRecord[]) generated.toArray(ClassTag$.MODULE$.apply(SimpleRecord.class)));
        abstractLog.appendAsLeader(
                batch,
                i,
                abstractLog.appendAsLeader$default$3(),
                abstractLog.appendAsLeader$default$4(),
                abstractLog.appendAsLeader$default$5(),
                abstractLog.appendAsLeader$default$6());
    }

    /**
     * Appends one batch of {@code i} records to the partition through
     * {@code appendRecordsToLeader} with {@code AppendOrigin.CLIENT} and returns the batch that
     * was handed to the partition.
     *
     * @param partition the partition to append to
     * @param i the number of records in the batch
     * @return the {@link MemoryRecords} instance passed to {@code appendRecordsToLeader}
     */
    private MemoryRecords appendToPartition(Partition partition, int i) {
        IndexedSeq generated = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), i)
                .map(boxed -> $anonfun$appendToPartition$1(BoxesRunTime.unboxToInt(boxed)));
        SimpleRecord[] payload = (SimpleRecord[]) generated.toArray(ClassTag$.MODULE$.apply(SimpleRecord.class));
        MemoryRecords batch = MemoryRecords.withRecords(
                (byte) 2,
                0L,
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
                -1L,
                (short) -1,
                -1,
                -1,
                false,
                payload);
        partition.appendRecordsToLeader(
                batch,
                AppendOrigin.CLIENT,
                1,
                RequestLocal$.MODULE$.NoCaching(),
                partition.appendRecordsToLeader$default$5(),
                partition.appendRecordsToLeader$default$6());
        return batch;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Issues a follower fetch on behalf of {@code remoteReplicaId()} whose fetch offset equals
     * the current log end offset of the partition's local log, using the partition's current
     * leader epoch.
     *
     * @param partition the partition to fetch from
     * @param j         wrapped in {@code Some} and passed as the tenth argument of
     *                  {@code fetchFollower} (presumably the replication session id; confirm
     *                  against that helper)
     * @param j2        passed unchanged as the last argument of {@code fetchFollower}
     * @return the read result returned by {@code fetchFollower}
     */
    public LogReadInfo fetchAsCaughtUpReplica(Partition partition, long j, long j2) {
        AbstractLog leaderLog = partition.localLogOrException();
        // Fetching exactly at the log end offset is what makes the replica "caught up".
        long caughtUpOffset = leaderLog.logEndOffset();
        Some currentLeaderEpoch = new Some(BoxesRunTime.boxToInteger(partition.getLeaderEpoch()));
        Some wrappedJ = new Some(BoxesRunTime.boxToLong(j));
        return fetchFollower(
                partition,
                remoteReplicaId(),
                caughtUpOffset,
                fetchFollower$default$4(),
                fetchFollower$default$5(),
                currentLeaderEpoch,
                fetchFollower$default$7(),
                fetchFollower$default$8(),
                fetchFollower$default$9(),
                wrappedJ,
                j2);
    }

    /**
     * Asserts that the replication session id recorded for replica {@code i} matches
     * {@code option}.
     *
     * <p>The actual value is the partition's replica lookup mapped to its session id, so an
     * unknown replica yields an empty option on the actual side.
     *
     * @param i      id of the replica to inspect
     * @param option expected session id, as an option
     */
    private void assertReplicationSessionId(int i, Option<Object> option) {
        Option<Object> actualSessionId = partition().getReplica(i)
                .map(replica -> BoxesRunTime.boxToLong($anonfun$assertReplicationSessionId$1(replica)));
        Assertions.assertEquals(option, actualSessionId);
    }

    /**
     * Asserts that replica {@code i} exists on the partition and that the mode of its
     * replication session state equals {@code mode}.
     *
     * @param i    id of the replica to inspect
     * @param mode expected replication mode
     */
    private void assertReplicationSessionMode(int i, ReplicationState.Mode mode) {
        // Fail with a readable message instead of an opaque NoSuchElementException.
        Replica target = (Replica) partition().getReplica(i)
                .getOrElse(() -> (Nothing$) Assertions.fail("Expected replica to exist"));
        Assertions.assertEquals(mode, target.stateSnapshot().replicationSessionState().mode());
    }

    /**
     * Asserts that replica {@code i} is among the partition's remote push replicas, then asks
     * the mock push manager for that replica's active push session.
     *
     * @param i id of the replica expected to be push-replicated
     * @param j forwarded as the second argument of the mock manager's
     *          {@code activePushSessionOrFail} (presumably the replication session id)
     * @return the session returned by the mock push manager
     */
    private PushSession activePushSessionOrFail(int i, long j) {
        boolean isPushReplica = partition().remotePushReplicas().contains(Predef$.MODULE$.int2Integer(i));
        Assertions.assertTrue(isPushReplica);
        TestUtils.MockPushManager mockManager = (TestUtils.MockPushManager) pushReplicationManager().get();
        TestUtils.TopicIdPartitionReplica replicaKey = new TestUtils.TopicIdPartitionReplica(topicIdPartition(), i);
        return mockManager.activePushSessionOrFail(replicaKey, j);
    }

    /**
     * Asserts that replica {@code i} is no longer among the partition's remote push replicas
     * and that the mock push manager recorded the end of its push session.
     *
     * @param i id of the replica whose push session is expected to have ended
     * @param j forwarded as the second argument of the mock manager's
     *          {@code verifyPushSessionEnded} (presumably the replication session id)
     * @param z forwarded as the third argument of the mock manager's
     *          {@code verifyPushSessionEnded}
     */
    private void verifyPushSessionEnded(int i, long j, boolean z) {
        Assertions.assertFalse(partition().remotePushReplicas().contains(Predef$.MODULE$.int2Integer(i)));
        TestUtils.MockPushManager mockManager = (TestUtils.MockPushManager) pushReplicationManager().get();
        // Key the lookup by the replica under test (i), matching activePushSessionOrFail, so
        // both checks in this method refer to the same replica rather than a fixed one.
        mockManager.verifyPushSessionEnded(new TestUtils.TopicIdPartitionReplica(topicIdPartition(), i), j, z);
    }

    /**
     * Synthetic lambda body used by testConcurrentMakeLeaderWithCaughtUpFetch.
     *
     * <p>Blocks until a permit is available on {@code semaphore}, then applies a LeaderAndIsr
     * partition state with leader epoch {@code i + 1} to the test's partition. ISR and replica
     * list are both {@code [brokerId, remoteReplicaId]} in either case.
     *
     * @param pushReplicationPartitionTest the enclosing test instance
     * @param semaphore                    gate the caller releases to start the transition
     * @param z                            {@code true} to call {@code makeLeader} with the local
     *                                     broker as leader; {@code false} to call
     *                                     {@code makeFollower} with the remote replica as leader
     * @param i                            leader epoch before the transition
     * @return the result of the {@code makeLeader} / {@code makeFollower} call
     */
    public static final /* synthetic */ boolean $anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$2(PushReplicationPartitionTest pushReplicationPartitionTest, Semaphore semaphore, boolean z, int i) {
        semaphore.acquire();
        Partition target = pushReplicationPartitionTest.partition();
        int bumpedLeaderEpoch = i + 1;
        if (!z) {
            // Follower transition: the remote replica becomes the leader.
            LeaderAndIsrRequestData.LeaderAndIsrPartitionState followerState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                    .setControllerEpoch(0)
                    .setLeader(pushReplicationPartitionTest.remoteReplicaId())
                    .setLeaderEpoch(bumpedLeaderEpoch)
                    .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.remoteReplicaId()), Nil$.MODULE$))).asJava())
                    .setPartitionEpoch(2)
                    .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.remoteReplicaId()), Nil$.MODULE$))).asJava())
                    .setIsNew(false);
            return target.makeFollower(followerState, pushReplicationPartitionTest.offsetCheckpoints(), new Some(pushReplicationPartitionTest.topicId()), target.makeFollower$default$4());
        }
        // Leader transition: the local broker stays or becomes the leader at the new epoch.
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState leaderState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                .setControllerEpoch(0)
                .setLeader(pushReplicationPartitionTest.brokerId())
                .setLeaderEpoch(bumpedLeaderEpoch)
                .setIsr(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setPartitionEpoch(2)
                .setReplicas(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.brokerId()), new $colon.colon(Predef$.MODULE$.int2Integer(pushReplicationPartitionTest.remoteReplicaId()), Nil$.MODULE$))).asJava())
                .setIsNew(false);
        return target.makeLeader(leaderState, pushReplicationPartitionTest.offsetCheckpoints(), new Some(pushReplicationPartitionTest.topicId()), target.makeLeader$default$4());
    }

    /**
     * Synthetic wait condition for testConcurrentMakeLeaderWithCaughtUpFetch.
     *
     * @param semaphore the gate the test's threads block on
     * @return {@code true} when exactly two threads are reported as queued on the semaphore
     */
    public static final /* synthetic */ boolean $anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$3(Semaphore semaphore) {
        int queuedThreads = semaphore.getQueueLength();
        return queuedThreads == 2;
    }

    /**
     * Synthetic message supplier for testConcurrentMakeLeaderWithCaughtUpFetch.
     *
     * @return the failure message used when the worker threads do not reach the gate in time
     */
    public static final /* synthetic */ String $anonfun$testConcurrentMakeLeaderWithCaughtUpFetch$4() {
        final String timeoutMessage = "Timed out waiting for threads to prepare.";
        return timeoutMessage;
    }

    /**
     * Synthetic wait condition for testConcurrentCaughtUpFetchAndNewReplicaEpochFetch.
     *
     * @param semaphore the gate the test's threads block on
     * @return {@code true} when exactly two threads are reported as queued on the semaphore
     */
    public static final /* synthetic */ boolean $anonfun$testConcurrentCaughtUpFetchAndNewReplicaEpochFetch$3(Semaphore semaphore) {
        int queuedThreads = semaphore.getQueueLength();
        return queuedThreads == 2;
    }

    /**
     * Synthetic message supplier for testConcurrentCaughtUpFetchAndNewReplicaEpochFetch.
     *
     * @return the failure message used when the worker threads do not reach the gate in time
     */
    public static final /* synthetic */ String $anonfun$testConcurrentCaughtUpFetchAndNewReplicaEpochFetch$4() {
        final String timeoutMessage = "Timed out waiting for threads to prepare.";
        return timeoutMessage;
    }

    /**
     * Synthetic wait condition for testConcurrentCaughtUpFetchAndNewReplicaSessionFetch.
     *
     * @param semaphore the gate the test's threads block on
     * @return {@code true} when exactly two threads are reported as queued on the semaphore
     */
    public static final /* synthetic */ boolean $anonfun$testConcurrentCaughtUpFetchAndNewReplicaSessionFetch$3(Semaphore semaphore) {
        int queuedThreads = semaphore.getQueueLength();
        return queuedThreads == 2;
    }

    /**
     * Synthetic message supplier for testConcurrentCaughtUpFetchAndNewReplicaSessionFetch.
     *
     * @return the failure message used when the worker threads do not reach the gate in time
     */
    public static final /* synthetic */ String $anonfun$testConcurrentCaughtUpFetchAndNewReplicaSessionFetch$4() {
        final String timeoutMessage = "Timed out waiting for threads to prepare.";
        return timeoutMessage;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Local helper lifted out of a test method: performs a follower fetch for
     * {@code remoteReplicaId()} and asserts whether the result carries a diverging epoch.
     *
     * @param i         wrapped in {@code Some} and passed as the seventh argument of
     *                  {@code fetchFollower} (presumably the last fetched epoch; confirm)
     * @param j         passed as the third argument of {@code fetchFollower} (the fetch offset
     *                  position used by {@code fetchAsCaughtUpReplica})
     * @param z         whether the read result is expected to contain a diverging epoch
     * @param partition the partition to fetch from
     * @param j2        passed as the fourth argument of {@code fetchFollower}
     * @param i2        wrapped in {@code Some} and passed as the sixth argument of
     *                  {@code fetchFollower} (the leader-epoch position used by
     *                  {@code fetchAsCaughtUpReplica})
     * @return the read result returned by {@code fetchFollower}
     */
    public final LogReadInfo read$1(int i, long j, boolean z, Partition partition, long j2, int i2) {
        int followerId = remoteReplicaId();
        Some sixthArg = new Some(BoxesRunTime.boxToInteger(i2));
        // The tenth argument is fixed at 1 and the last at 0 for every call of this helper.
        Some tenthArg = new Some(BoxesRunTime.boxToLong(1L));
        LogReadInfo readResult = fetchFollower(
                partition,
                followerId,
                j,
                j2,
                fetchFollower$default$5(),
                sixthArg,
                new Some(BoxesRunTime.boxToInteger(i)),
                fetchFollower$default$8(),
                fetchFollower$default$9(),
                tenthArg,
                0L);
        boolean hasDivergingEpoch = readResult.divergingEpoch.isPresent();
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z), BoxesRunTime.boxToBoolean(hasDivergingEpoch));
        return readResult;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Local helper lifted out of a test method: updates the follower fetch state of replica
     * {@code i} on the given partition, failing the test if that replica is not defined.
     *
     * <p>The call uses a fixed {@code LogOffsetMetadata(7L)}, a fixed third argument of
     * {@code 0L}, the test clock's current milliseconds and the log end offset of
     * {@code abstractLog}.
     *
     * @param i           id of the replica whose fetch state is updated
     * @param optional    passed as the seventh argument of {@code updateFollowerFetchState}
     * @param j           passed as the sixth argument of {@code updateFollowerFetchState}
     * @param j2          passed as the last argument of {@code updateFollowerFetchState}
     * @param partition   the partition to update
     * @param abstractLog the log whose end offset is reported
     */
    public final void updateFetchState$1(int i, Optional optional, long j, long j2, Partition partition, AbstractLog abstractLog) {
        Replica follower = (Replica) partition.getReplica(i)
                .getOrElse(() -> (Nothing$) Assertions.fail("Expected replica to be defined"));
        LogOffsetMetadata fixedOffsetMetadata = new LogOffsetMetadata(7L);
        partition.updateFollowerFetchState(
                follower,
                fixedOffsetMetadata,
                0L,
                time().milliseconds(),
                abstractLog.logEndOffset(),
                j,
                optional,
                j2);
    }

    /**
     * Synthetic lambda body for
     * testLeaderOffsetsListenerNotifiesPushReplicasForLogStartOffsetIncrement: deletes records
     * on the test's partition via {@code deleteRecordsOnLeader}.
     *
     * @param pushReplicationPartitionTest the enclosing test instance
     * @param i                            the offset argument handed to
     *                                     {@code deleteRecordsOnLeader}
     * @return the result reported by the partition
     */
    public static final /* synthetic */ LogDeleteRecordsResult $anonfun$testLeaderOffsetsListenerNotifiesPushReplicasForLogStartOffsetIncrement$2(PushReplicationPartitionTest pushReplicationPartitionTest, int i) {
        LogDeleteRecordsResult deletionResult = pushReplicationPartitionTest.partition().deleteRecordsOnLeader(i);
        return deletionResult;
    }

    /**
     * Synthetic record factory for testLeaderLogAppendListenerIgnoresFollowerEvents.
     *
     * @param i the number to encode
     * @return a record whose value is the decimal string of {@code i}, encoded with the
     *         platform default charset
     */
    public static final /* synthetic */ SimpleRecord $anonfun$testLeaderLogAppendListenerIgnoresFollowerEvents$1(int i) {
        byte[] value = Integer.toString(i).getBytes();
        return new SimpleRecord(value);
    }

    /**
     * Synthetic record factory used by {@code appendToLog}.
     *
     * @param i the number to encode
     * @return a record whose value is the decimal string of {@code i}, encoded with the
     *         platform default charset
     */
    public static final /* synthetic */ SimpleRecord $anonfun$appendToLog$1(int i) {
        byte[] value = Integer.toString(i).getBytes();
        return new SimpleRecord(value);
    }

    /**
     * Synthetic record factory used by {@code appendToPartition}.
     *
     * @param i the number to encode
     * @return a record whose value is the decimal string of {@code i}, encoded with the
     *         platform default charset
     */
    public static final /* synthetic */ SimpleRecord $anonfun$appendToPartition$1(int i) {
        byte[] value = Integer.toString(i).getBytes();
        return new SimpleRecord(value);
    }

    /**
     * Synthetic mapper used by {@code assertReplicationSessionId}.
     *
     * @param replica the replica to inspect
     * @return the replication session id held in the replica's current state snapshot
     */
    public static final /* synthetic */ long $anonfun$assertReplicationSessionId$1(Replica replica) {
        long sessionId = replica.stateSnapshot()
                .replicationSessionState()
                .replicationSessionId();
        return sessionId;
    }
}
