/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import kafka.cluster.AlterPartitionListener;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.DelayedOperations;
import kafka.cluster.Partition;
import kafka.cluster.Partition$;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
import kafka.server.BlockingSend;
import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.FetcherPool;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache;
import kafka.server.MetadataCache$;
import kafka.server.OffsetTruncationState;
import kafka.server.QuotaFactory;
import kafka.server.RemoteLeaderEndPoint;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaFetcherThreadTest;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.link.ClusterLinkAdminManager;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkDestConnectionManager;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.server.link.ClusterLinkFetcherManager$;
import kafka.server.link.ClusterLinkFetcherThread;
import kafka.server.link.ClusterLinkFollowerFetchThrottler;
import kafka.server.link.ClusterLinkLeaderEndPoint;
import kafka.server.link.ClusterLinkLeaderRequestBuilder;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkMetadata;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkNetworkClient;
import kafka.server.link.ClusterLinkTestUtils$;
import kafka.server.link.FetchResponseSize;
import kafka.server.link.FetchState;
import kafka.server.link.LinkMode;
import kafka.server.link.MirrorFailureType;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.utils.MockTime;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.InvalidClusterLinkException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\t}g\u0001\u0002\u0015*\u0001ABQ!\u000e\u0001\u0005\u0002YBq!\u000f\u0001C\u0002\u0013%!\b\u0003\u0004D\u0001\u0001\u0006Ia\u000f\u0005\b\t\u0002\u0011\r\u0011\"\u0003F\u0011\u0019a\u0005\u0001)A\u0005\r\"9Q\n\u0001b\u0001\n\u0013q\u0005BB+\u0001A\u0003%q\nC\u0004W\u0001\t\u0007I\u0011\u0002(\t\r]\u0003\u0001\u0015!\u0003P\u0011%A\u0006\u00011AA\u0002\u0013%\u0011\fC\u0005^\u0001\u0001\u0007\t\u0019!C\u0005=\"IA\r\u0001a\u0001\u0002\u0003\u0006KA\u0017\u0005\bK\u0002\u0001\r\u0011\"\u0003g\u0011\u001dQ\u0007\u00011A\u0005\n-Da!\u001c\u0001!B\u00139\u0007b\u00028\u0001\u0005\u0004%Ia\u001c\u0005\u0007g\u0002\u0001\u000b\u0011\u00029\t\u000fQ\u0004!\u0019!C\u0005k\"1\u0011\u0010\u0001Q\u0001\nYDQA\u001f\u0001\u0005RmD\u0011\"a\u0016\u0001#\u0003%\t\"!\u0017\t\u0013\u0005=\u0004!%A\u0005\u0012\u0005E\u0004bBA;\u0001\u0011E\u0013q\u000f\u0005\n\u0003o\u0004\u0011\u0013!C\t\u0003sD\u0011\"!@\u0001#\u0003%\t\"!?\t\u000f\u0005}\b\u0001\"\u0011\u0003\u0002!9!\u0011\u0004\u0001\u0005\n\tm\u0001b\u0002B\u0012\u0001\u0011\u0005#\u0011\u0001\u0005\b\u0005[\u0001A\u0011\u0001B\u0001\u0011\u001d\u0011\t\u0004\u0001C\u0001\u0005\u0003AqA!\u000e\u0001\t\u0013\u00119\u0004C\u0004\u0003J\u0001!\tAa\u0013\t\u000f\t\u0005\u0005\u0001\"\u0011\u0003\u0002!9!1\u0013\u0001\u0005B\tU\u0005\"\u0003BP\u0001E\u0005I\u0011\u0001BQ\u0011\u001d\u0011)\u000b\u0001C!\u0005\u0003AqA!+\u0001\t\u0003\u0011\t\u0001C\u0004\u0003.\u0002!\tA!\u0001\t\u000f\tE\u0006\u0001\"\u0015\u00034\na2\t\\;ti\u0016\u0014H*\u001b8l\r\u0016$8\r[3s)\"\u0014X-\u00193UKN$(B\u0001\u0016,\u0003\u0011a\u0017N\\6\u000b\u00051j\u0013AB:feZ,'OC\u0001/\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u0019\u0011\u0005I\u001aT\"A\u0016\n\u0005QZ#\u0001\u0007*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\rV3ti\u00061A(\u001b8jiz\"\u0012a\u000e\t\u0003q\u0001i\u0011!K\u0001\u000eG2,8\u000f^3s\u0019&t7.\u00133\u0016\u0003m\u0002\"\u0001P!\u000e\u0003uR!AP \u0002\tU$\u0018\u000e\u001c\u0006\u0002\u0001\u0006!!.\u0019<b\u0013\t\u0011UH\u0001\u0003V+&#\u0015AD2mkN$XM\u001d'j].LE\rI\u0001\u0010G2,8\u000f^3s\u0019&t7NT1nKV\ta\t\u0005\u0002H\u00156\t\u0001J\u0003\u0002J\u007f\u0005!A.\u00198h\u0013\tY\u0005J\u0001\u0004TiJLgnZ\u0001\u0011G2,8\u000f^3s\u0019&t7NT1nK\u0002\nAc\u00197vgR,'\u000fT5oW\n\u000b7m[8gM6\u001bX#A(\u0011\u0005A\u001bV\"A)\u000b\u0003I\u000bQa]2bY\u0006L!\u0001V)\u0003\u0007%sG/A\u000bdYV\u001cH/\u001a:MS:\\')Y2l_\u001a4Wj\u001d\u0011\u0002\u001b1\fwmZ5oORKW.Z't\u00039a\u0017mZ4j]\u001e$\u0016.\\3Ng\u0002\nQBZ3uG\",'\u000f\u00165sK\u0006$W#\u0001.\u0011\u0005aZ\u0016B\u0001/*\u0005a\u0019E.^:uKJd\u0015N\\6GKR\u001c\u0007.\u001a:UQJ,\u0017\rZ\u0001\u0012M\u0016$8\r[3s)\"\u0014X-\u00193`I\u0015\fHCA0c!\t\u0001\u0006-\u0003\u0002b#\n!QK\\5u\u0011\u001d\u00197\"!AA\u0002i\u000b1\u0001\u001f\u00132\u000391W\r^2iKJ$\u0006N]3bI\u0002\n\u0011\"[:EK2\f\u00170\u001a3\u0016\u0003\u001d\u0004\"\u0001\u00155\n\u0005%\f&a\u0002\"p_2,\u0017M\\\u0001\u000eSN$U\r\\1zK\u0012|F%Z9\u0015\u0005}c\u0007bB2\u000f\u0003\u0003\u0005\raZ\u0001\u000bSN$U\r\\1zK\u0012\u0004\u0013!\u00054fi\u000eD'+Z:q_:\u001cXmU5{KV\t\u0001\u000f\u0005\u00029c&\u0011!/\u000b\u0002\u0012\r\u0016$8\r\u001b*fgB|gn]3TSj,\u0017A\u00054fi\u000eD'+Z:q_:\u001cXmU5{K\u0002\n!c\u00197vgR,'\u000fT5oW6+GO]5dgV\ta\u000f\u0005\u00029o&\u0011\u00010\u000b\u0002\u0013\u00072,8\u000f^3s\u0019&t7.T3ue&\u001c7/A\ndYV\u001cH/\u001a:MS:\\W*\u001a;sS\u000e\u001c\b%\u0001\u000ede\u0016\fG/\u001a*f[>$X\rT3bI\u0016\u0014XI\u001c3Q_&tG\u000f\u0006\u0007}\u007f\u0006%\u00111CA\u000f\u0003O\tY\u0005\u0005\u00023{&\u0011ap\u000b\u0002\u0015%\u0016lw\u000e^3MK\u0006$WM]#oIB{\u0017N\u001c;\t\u000f\u0005\u0005A\u00031\u0001\u0002\u0004\u0005a!M]8lKJ\u001cuN\u001c4jOB\u0019!'!\u0002\n\u0007\u0005\u001d1FA\u0006LC\u001a\\\u0017mQ8oM&<\u0007bBA\u0006)\u0001\u0007\u0011QB\u0001\u000be\u0016\u0004H.[2b\u001b\u001e\u0014\bc\u0001\u001a\u0002\u0010%\u0019\u0011\u0011C\u0016\u0003\u001dI+\u0007\u000f\\5dC6\u000bg.Y4fe\"9\u0011Q\u0003\u000bA\u0002\u0005]\u0011!B9v_R\f\u0007c\u0001\u001a\u0002\u001a%\u0019\u00111D\u0016\u0003\u0019I+\u0007\u000f\\5dCF+x\u000e^1\t\u000f\u0005}A\u00031\u0001\u0002\"\u0005QB.Z1eKJ,e\u000e\u001a9pS:$(\t\\8dW&twmU3oIB\u0019!'a\t\n\u0007\u0005\u00152F\u0001\u0007CY>\u001c7.\u001b8h'\u0016tG\rC\u0005\u0002*Q\u0001\n\u00111\u0001\u0002,\u0005iAn\\4D_:$X\r\u001f;PaR\u0004R\u0001UA\u0017\u0003cI1!a\fR\u0005\u0019y\u0005\u000f^5p]B!\u00111GA$\u001b\t\t)D\u0003\u0003\u00028\u0005e\u0012!B;uS2\u001c(\u0002BA\u001e\u0003{\taaY8n[>t'b\u0001\u0018\u0002@)!\u0011\u0011IA\"\u0003\u0019\t\u0007/Y2iK*\u0011\u0011QI\u0001\u0004_J<\u0017\u0002BA%\u0003k\u0011!\u0002T8h\u0007>tG/\u001a=u\u0011%\ti\u0005\u0006I\u0001\u0002\u0004\ty%A\u0004uS6,w\n\u001d;\u0011\u000bA\u000bi#!\u0015\u0011\t\u0005M\u00121K\u0005\u0005\u0003+\n)D\u0001\u0003US6,\u0017\u0001J2sK\u0006$XMU3n_R,G*Z1eKJ,e\u000e\u001a)pS:$H\u0005Z3gCVdG\u000fJ\u001b\u0016\u0005\u0005m#\u0006BA\u0016\u0003;Z#!a\u0018\u0011\t\u0005\u0005\u00141N\u0007\u0003\u0003GRA!!\u001a\u0002h\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003S\n\u0016AC1o]>$\u0018\r^5p]&!\u0011QNA2\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001%GJ,\u0017\r^3SK6|G/\u001a'fC\u0012,'/\u00128e!>Lg\u000e\u001e\u0013eK\u001a\fW\u000f\u001c;%mU\u0011\u00111\u000f\u0016\u0005\u0003\u001f\ni&\u0001\u000ede\u0016\fG/\u001a*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\r\u0006\u0010\u0002z\u0005}\u0014qSAN\u0003W\u000bi+a.\u0002:\u0006\u001d\u00171ZAg\u0003\u001f\f)/a:\u0002tB\u0019!'a\u001f\n\u0007\u0005u4F\u0001\u000bSKBd\u0017nY1GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\u0005\b\u0003\u0003;\u0002\u0019AAB\u0003\u0011q\u0017-\\3\u0011\t\u0005\u0015\u00151\u0013\b\u0005\u0003\u000f\u000by\tE\u0002\u0002\nFk!!a#\u000b\u0007\u00055u&\u0001\u0004=e>|GOP\u0005\u0004\u0003#\u000b\u0016A\u0002)sK\u0012,g-C\u0002L\u0003+S1!!%R\u0011\u0019\tIj\u0006a\u0001\u001f\u0006Ia-\u001a;dQ\u0016\u0014\u0018\n\u001a\u0005\b\u0003;;\u0002\u0019AAP\u00031\u0019x.\u001e:dK\n\u0013xn[3s!\u0011\t\t+a*\u000e\u0005\u0005\r&bAAS[\u000591\r\\;ti\u0016\u0014\u0018\u0002BAU\u0003G\u0013aB\u0011:pW\u0016\u0014XI\u001c3Q_&tG\u000fC\u0004\u0002\u0002]\u0001\r!a\u0001\t\u000f\u0005=v\u00031\u0001\u00022\u0006\u0001b-Y5mK\u0012\u0004\u0016M\u001d;ji&|gn\u001d\t\u0004e\u0005M\u0016bAA[W\t\u0001b)Y5mK\u0012\u0004\u0016M\u001d;ji&|gn\u001d\u0005\b\u0003\u00179\u0002\u0019AA\u0007\u0011\u001d\tYl\u0006a\u0001\u0003{\u000bq!\\3ue&\u001c7\u000f\u0005\u0003\u0002@\u0006\rWBAAa\u0015\u0011\tY,!\u000f\n\t\u0005\u0015\u0017\u0011\u0019\u0002\b\u001b\u0016$(/[2t\u0011\u001d\tIm\u0006a\u0001\u0003#\nA\u0001^5nK\"9\u0011QC\fA\u0002\u0005]\u0001bBA\u0010/\u0001\u0007\u0011\u0011\u0005\u0005\b\u0003#<\u0002\u0019AAj\u0003A!\u0018.\u001a:Ti\u0006$XMR3uG\",'\u000fE\u0003Q\u0003[\t)\u000e\u0005\u0003\u0002X\u0006\u0005XBAAm\u0015\u0011\tY.!8\u0002\u000f\u0019,Go\u00195fe*\u0019\u0011q\\\u0017\u0002\tQLWM]\u0005\u0005\u0003G\fIN\u0001\tUS\u0016\u00148\u000b^1uK\u001a+Go\u00195fe\"I\u0011\u0011F\f\u0011\u0002\u0003\u0007\u00111\u0006\u0005\n\u0003S<\u0002\u0013!a\u0001\u0003W\f\u0011CZ3uG\"\u0014\u0015mY6PM\u001al5o\u00149u!\u0015\u0001\u0016QFAw!\r\u0001\u0016q^\u0005\u0004\u0003c\f&\u0001\u0002'p]\u001eD\u0011\"!>\u0018!\u0003\u0005\r!a;\u0002)\u0019,Go\u00195CC\u000e\\wJ\u001a4Ng6\u000b\u0007p\u00149u\u0003\u0015\u001a'/Z1uKJ+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1eI\u0011,g-Y;mi\u0012\n4'\u0006\u0002\u0002|*\"\u00111^A/\u0003\u0015\u001a'/Z1uKJ+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1eI\u0011,g-Y;mi\u0012\nD'A\u0004dY\u0016\fg.\u001e9\u0015\u0003}C3A\u0007B\u0003!\u0011\u00119A!\u0006\u000e\u0005\t%!\u0002\u0002B\u0006\u0005\u001b\t1!\u00199j\u0015\u0011\u0011yA!\u0005\u0002\u000f),\b/\u001b;fe*!!1CA\"\u0003\u0015QWO\\5u\u0013\u0011\u00119B!\u0003\u0003\u0013\u00053G/\u001a:FC\u000eD\u0017!E2mkN$XM\u001d'j].\u001cuN\u001c4jOV\u0011!Q\u0004\t\u0004q\t}\u0011b\u0001B\u0011S\t\t2\t\\;ti\u0016\u0014H*\u001b8l\u0007>tg-[4\u0002gMDw.\u001e7e+N,G*Z1eKJ,e\u000eZ(gMN,G/\u00134J]R,'O\u0011:pW\u0016\u0014h+\u001a:tS>t')\u001a7poJ\u0002\u0004f\u0001\u000f\u0003(A!!q\u0001B\u0015\u0013\u0011\u0011YC!\u0003\u0003\tQ+7\u000f^\u0001\u001ei\u0016\u001cHoU8ve\u000e,wJ\u001a4tKR\u001c\b+\u001a8eS:<7\u000b^1uK\"\u001aQDa\n\u0002MQ,7\u000f^*pkJ\u001cWm\u00144gg\u0016$8\u000fU3oI&twm\u0015;bi\u0016<\u0016\u000e\u001e5JEB\u0014d\u0007K\u0002\u001f\u0005O\tqD^3sS\u001aL8k\\;sG\u0016|eMZ:fiN\u0004VM\u001c3j]\u001e\u001cF/\u0019;f)\ry&\u0011\b\u0005\b\u0005wy\u0002\u0019\u0001B\u001f\u0003\rI'\r\u001d\t\u0005\u0005\u007f\u0011)%\u0004\u0002\u0003B)!\u00111\bB\"\u0015\ra\u0013QH\u0005\u0005\u0005\u000f\u0012\tEA\bNKR\fG-\u0019;b-\u0016\u00148/[8o\u0003\u0001\u001aX\r^;q\r\u0016$8\r[3s\u001b\u0006t\u0017mZ3s\u0003:$\u0007+\u0019:uSRLwN\\:\u0015\u0011\t5#q\u000eB>\u0005\u007f\u0002r\u0001\u0015B(\u0005'\u0012I&C\u0002\u0003RE\u0013a\u0001V;qY\u0016\u0014\u0004c\u0001\u001d\u0003V%\u0019!qK\u0015\u00033\rcWo\u001d;fe2Kgn\u001b$fi\u000eDWM]'b]\u0006<WM\u001d\t\u0007\u00057\u0012)G!\u001b\u000e\u0005\tu#\u0002\u0002B0\u0005C\nq!\\;uC\ndWMC\u0002\u0003dE\u000b!bY8mY\u0016\u001cG/[8o\u0013\u0011\u00119G!\u0018\u0003\u0007M+G\u000f\u0005\u0003\u0002\"\n-\u0014\u0002\u0002B7\u0003G\u0013\u0011\u0002U1si&$\u0018n\u001c8\t\u000f\u0005%\u0007\u00051\u0001\u0003rA!!1\u000fB<\u001b\t\u0011)HC\u0002\u000285JAA!\u001f\u0003v\tAQj\\2l)&lW\r\u0003\u0004\u0003~\u0001\u0002\raT\u0001\u000e]Vl\u0007+\u0019:uSRLwN\\:\t\u000f\tm\u0002\u00051\u0001\u0003>\u0005\u0001C/Z:u\r>dGn\\<fe&\u001bH\u000b\u001b:piRdW\rZ(o\u0019><H)[:lQ\r\t#q\u0005\u0015\bC\t\u001d%Q\u0012BH!\u0011\u00119A!#\n\t\t-%\u0011\u0002\u0002\t\t&\u001c\u0018M\u00197fI\u0006)a/\u00197vK\u0006\u0012!\u0011S\u0001\u001d\t&\u001c8\u000e\t;ie>$H\u000f\\3!SN\u0004cn\u001c;!CB\u0004H.[3e\u0003e1XM]5gs6\u000b'o\u001b*fa2L7-\u0019+ie>$H\u000f\\3\u0015\u000b}\u00139Ja'\t\u000f\te%\u00051\u0001\u0002\u000e\u0005q!/\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\b\u0002\u0003BOEA\u0005\t\u0019A(\u0002\u000bQLW.Z:\u0002GY,'/\u001b4z\u001b\u0006\u00148NU3qY&\u001c\u0017\r\u00165s_R$H.\u001a\u0013eK\u001a\fW\u000f\u001c;%eU\u0011!1\u0015\u0016\u0004\u001f\u0006u\u0013\u0001O:i_VdGMT8u\r\u0016$8\r\u001b'fC\u0012,'/\u00129pG\"|eNR5sgR4U\r^2i/&$\b\u000e\u0016:v]\u000e\fG/Z(o\r\u0016$8\r\u001b\u0015\u0004I\t\u001d\u0012\u0001\u0007;fgR4U\r^2iKJ$\u0006N]3bI\n\u000b7m[8gM\"\u001aQEa\n\u00027Q,7\u000f^!eUV\u001cH\u000fT1hO&tw\rU1si&$\u0018n\u001c8tQ\r1#qE\u0001\u001bm\u0016\u0014\u0018NZ=PM\u001a\u001cX\r\u001e*fcV,7\u000f\u001e,feNLwN\u001c\u000b\b?\nU&q\u0017Bh\u0011\u001d\u0011Yd\na\u0001\u0005{AqA!/(\u0001\u0004\u0011Y,A\u000epM\u001a\u001cX\r\u001e$pe2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f\u001e\t\u0005\u0005{\u0013IM\u0004\u0003\u0003@\n\u0015WB\u0001Ba\u0015\u0011\u0011\u0019-!\u000f\u0002\u0011I,\u0017/^3tiNLAAa2\u0003B\u0006arJ\u001a4tKR\u001chi\u001c:MK\u0006$WM]#q_\u000eD'+Z9vKN$\u0018\u0002\u0002Bf\u0005\u001b\u0014qAQ;jY\u0012,'O\u0003\u0003\u0003H\n\u0005\u0007b\u0002BiO\u0001\u0007!1[\u0001\u0013Y&\u001cHo\u00144gg\u0016$8OU3rk\u0016\u001cH\u000f\u0005\u0003\u0003V\nmg\u0002\u0002B`\u0005/LAA!7\u0003B\u0006\u0011B*[:u\u001f\u001a47/\u001a;t%\u0016\fX/Z:u\u0013\u0011\u0011YM!8\u000b\t\te'\u0011\u0019")
public class ClusterLinkFetcherThreadTest
extends ReplicaFetcherThreadTest {
    private final UUID kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId = UUID.randomUUID();
    private final String kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName;
    private final int clusterLinkBackoffMs;
    private final int laggingTimeMs;
    private ClusterLinkFetcherThread kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread;
    private boolean kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed = false;
    private final FetchResponseSize kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize = new FetchResponseSize(100, 1000);
    private final ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics = new ClusterLinkMetrics(this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName(), this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId(), (LinkMode)LinkMode.Destination$.MODULE$, null, (Option)None$.MODULE$, new Metrics(), (Option)None$.MODULE$);

    public UUID kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId;
    }

    public String kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName;
    }

    private int clusterLinkBackoffMs() {
        return this.clusterLinkBackoffMs;
    }

    private int laggingTimeMs() {
        return this.laggingTimeMs;
    }

    public ClusterLinkFetcherThread kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread;
    }

    public void kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread_$eq(ClusterLinkFetcherThread x$1) {
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread = x$1;
    }

    private boolean kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed;
    }

    public void kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed_$eq(boolean x$1) {
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed = x$1;
    }

    public FetchResponseSize kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize;
    }

    public ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics() {
        return this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics;
    }

    @Override
    public RemoteLeaderEndPoint createRemoteLeaderEndPoint(KafkaConfig brokerConfig, ReplicaManager replicaMgr, ReplicaQuota quota, BlockingSend leaderEndpointBlockingSend, Option<LogContext> logContextOpt, Option<Time> timeOpt) {
        LogContext logContext = (LogContext)logContextOpt.getOrElse((Function0 & Serializable & scala.Serializable)() -> new LogContext());
        FetchSessionHandler fetchSessionHandler = new FetchSessionHandler(logContext, leaderEndpointBlockingSend.brokerEndPoint().id());
        ClusterLinkLeaderRequestBuilder requestBuilder = new ClusterLinkLeaderRequestBuilder(this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig(), (Time)timeOpt.getOrElse((Function0 & Serializable & scala.Serializable)() -> new SystemTime()));
        ClusterLinkFollowerFetchThrottler throttler = new ClusterLinkFollowerFetchThrottler();
        return new ClusterLinkLeaderEndPoint(this, logContext, leaderEndpointBlockingSend, fetchSessionHandler, requestBuilder, throttler, brokerConfig, replicaMgr, quota){

            public boolean isReadyForFetch(TopicPartition tp) {
                return true;
            }
        };
    }

    @Override
    public Option<LogContext> createRemoteLeaderEndPoint$default$5() {
        return None$.MODULE$;
    }

    @Override
    public Option<Time> createRemoteLeaderEndPoint$default$6() {
        return None$.MODULE$;
    }

    @Override
    public ReplicaFetcherThread createReplicaFetcherThread(String name, int fetcherId, BrokerEndPoint sourceBroker, KafkaConfig brokerConfig, FailedPartitions failedPartitions, ReplicaManager replicaMgr, Metrics metrics, Time time, ReplicaQuota quota, BlockingSend leaderEndpointBlockingSend, Option<TierStateFetcher> tierStateFetcher, Option<LogContext> logContextOpt, Option<Object> fetchBackOffMsOpt, Option<Object> fetchBackOffMsMaxOpt) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)Mockito.mock(ClusterLinkFetcherManager.class);
        Mockito.when((Object)fetcherManager.partition((TopicPartition)ArgumentMatchers.any(TopicPartition.class))).thenReturn((Object)None$.MODULE$);
        ((ClusterLinkFetcherManager)Mockito.doNothing().when((Object)fetcherManager)).updatePartitionFetchState((TopicPartition)ArgumentMatchers.any(TopicPartition.class), (FetchState)ArgumentMatchers.any(FetchState.class));
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)fetcherManager.onPartitionLinkFailure((TopicPartition)ArgumentMatchers.any(TopicPartition.class), (MirrorFailureType)ArgumentMatchers.any(MirrorFailureType.class), ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));
        RemoteLeaderEndPoint leader = this.createRemoteLeaderEndPoint(brokerConfig, replicaMgr, quota, leaderEndpointBlockingSend, logContextOpt, (Option<Time>)new Some((Object)time));
        return new ClusterLinkFetcherThread(this, name, leader, brokerConfig, fetcherManager, failedPartitions, replicaMgr, quota, time){
            private final /* synthetic */ ClusterLinkFetcherThreadTest $outer;

            public void clearPartitionLinkFailure(TopicPartition tp, long fetchOffset) {
            }

            public void delayPartitions(Iterable<TopicPartition> partitions) {
                super.delayPartitions(partitions);
                this.$outer.kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed_$eq(true);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                super(name$1, 0, (ClusterLinkLeaderEndPoint)leader$1, brokerConfig$2, $outer.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig(), new ClusterLinkMetadata(brokerConfig$2, $outer.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName(), $outer.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId(), (LinkMode)LinkMode.Destination$.MODULE$, 100L, 60000L), fetcherManager$1, failedPartitions$1, replicaMgr$2, quota$2, $outer.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics(), time$1, (Function0)new scala.Serializable($outer){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ ClusterLinkFetcherThreadTest $outer;

                    public final FetchResponseSize apply() {
                        return this.$outer.kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize();
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                }, (ClusterLinkNetworkClient)Mockito.mock(ClusterLinkNetworkClient.class), (Option)None$.MODULE$, (Option)None$.MODULE$);
            }
        };
    }

    @Override
    public Option<Object> createReplicaFetcherThread$default$13() {
        return None$.MODULE$;
    }

    @Override
    public Option<Object> createReplicaFetcherThread$default$14() {
        return None$.MODULE$;
    }

    @Override
    @AfterEach
    public void cleanup() {
        if (this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread() != null) {
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().shutdown();
        }
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics().shutdown();
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed_$eq(false);
        super.cleanup();
    }

    public ClusterLinkConfig kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", new StringBuilder(1).append(this.brokerEndPoint().host()).append(":").append(this.brokerEndPoint().port()).toString());
        props.put(KafkaConfig$.MODULE$.ReplicaFetchBackoffMsProp(), Integer.toString(this.clusterLinkBackoffMs()));
        props.put(ClusterLinkConfig$.MODULE$.LinkFetcherMaxLaggingPartitionsProp(), "2");
        props.put(ClusterLinkConfig$.MODULE$.LinkFetcherLaggingPartitionMsProp(), Integer.toString(this.laggingTimeMs()));
        return ClusterLinkConfig$.MODULE$.create((java.util.Map)props, ClusterLinkConfig$.MODULE$.create$default$2());
    }

    @Override
    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(props);
        Metrics metrics = new Metrics();
        boolean isMultiTenant = ConfluentConfigs.buildMultitenantMetadata((java.util.Map)brokerConfig.values(), (Metrics)metrics) != null;
        ClusterLinkAdminManager clusterLinkAdminManager = new ClusterLinkAdminManager(brokerConfig, "clusterId", ClusterLinkTestUtils$.MODULE$.createClusterLinkManager(MetadataVersion.IBP_0_11_0_IV0), metrics, (Time)new MockTime(), isMultiTenant);
        try {
            NewClusterLink newClusterLink = new NewClusterLink("test-link", "clusterId", (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().empty()).asJava());
            Assertions.assertThrows(InvalidClusterLinkException.class, () -> clusterLinkAdminManager.createClusterLink(newClusterLink, (Option)None$.MODULE$, new ListenerName("EXTERNAL"), false, false, 1000, 1).get());
        }
        finally {
            clusterLinkAdminManager.shutdown();
        }
    }

    @Test
    public void testSourceOffsetsPendingState() {
        this.verifySourceOffsetsPendingState(MetadataVersion.latest());
    }

    @Test
    public void testSourceOffsetsPendingStateWithIbp26() {
        this.verifySourceOffsetsPendingState(MetadataVersion.IBP_2_6_IV0);
    }

    /*
     * WARNING - void declaration
     */
    private void verifySourceOffsetsPendingState(MetadataVersion ibp) {
        void var5_5;
        void var4_4;
        MockTime time = new MockTime();
        Tuple2<ClusterLinkFetcherManager, Set<Partition>> tuple2 = this.setupFetcherManagerAndPartitions(time, 1, ibp);
        if (tuple2 == null) {
            throw new MatchError(null);
        }
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)tuple2._1();
        Set partitions = (Set)tuple2._2();
        void fetcherManager2 = var4_4;
        void partitions2 = var5_5;
        TopicPartition tp = new TopicPartition("topic", 0);
        Partition partition = (Partition)partitions2.head();
        Assertions.assertNull((Object)this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), (String)"Fetcher thread created without metadata");
        MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith((String)"cluster", (int)1, Collections.singletonMap("topic", Errors.NONE), Collections.singletonMap("topic", Predef$.MODULE$.int2Integer(1)), x$2 -> Predef$.MODULE$.int2Integer(1), MetadataResponse.PartitionMetadata::new, (short)ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
        fetcherManager2.currentMetadata().update(1, metadataResponse, false, time.milliseconds());
        fetcherManager2.onNewMetadata(fetcherManager2.currentMetadata().fetch());
        Assertions.assertNotNull((Object)this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), (String)"Fetcher thread not created");
        if (ibp.isTruncationOnFetchSupported()) {
            Assertions.assertFalse((boolean)ClusterLinkFetcherThreadTest.offsetsPending$1(partition), (String)new StringBuilder(24).append("State not reset for IBP ").append(ibp).toString());
            return;
        }
        Assertions.assertTrue((boolean)ClusterLinkFetcherThreadTest.offsetsPending$1(partition), (String)"State reset before fetching offsets");
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete((Map)Predef$.MODULE$.Map().empty());
        Assertions.assertTrue((boolean)ClusterLinkFetcherThreadTest.offsetsPending$1(partition), (String)"State reset before source offsets available");
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)new OffsetTruncationState(10L, false))})));
        Assertions.assertTrue((boolean)ClusterLinkFetcherThreadTest.offsetsPending$1(partition), (String)"State reset before truncation");
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetchOffsetAndMaybeMarkTruncationComplete((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)new OffsetTruncationState(10L, true))})));
        Assertions.assertFalse((boolean)ClusterLinkFetcherThreadTest.offsetsPending$1(partition), (String)"State not reset after truncation");
    }

    public Tuple2<ClusterLinkFetcherManager, Set<Partition>> setupFetcherManagerAndPartitions(MockTime time, int numPartitions, MetadataVersion ibp) {
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        AlterPartitionListener alterPartitionListener = (AlterPartitionListener)Mockito.mock(AlterPartitionListener.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        BrokerTopicStats brokerTopicStats = (BrokerTopicStats)Mockito.mock(BrokerTopicStats.class);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn((Object)brokerTopicStats);
        Set partitions = (Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), numPartitions - 1).foreach((Function1 & Serializable & scala.Serializable)partitionNum -> ClusterLinkFetcherThreadTest.$anonfun$setupFetcherManagerAndPartitions$1(time, alterPartitionListener, logManager, log, replicaManager, partitions, BoxesRunTime.unboxToInt((Object)partitionNum)));
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        props.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), ibp.shortVersion());
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(props);
        BlockingSend blockingSend = (BlockingSend)Mockito.mock(BlockingSend.class);
        ((BlockingSend)Mockito.doNothing().when((Object)blockingSend)).close();
        Mockito.when((Object)blockingSend.brokerEndPoint()).thenReturn((Object)this.brokerEndPoint());
        ClusterLinkDestConnectionManager connManager = (ClusterLinkDestConnectionManager)Mockito.mock(ClusterLinkDestConnectionManager.class);
        Mockito.when((Object)connManager.reverseConnectionProvider((NetworkClient)ArgumentMatchers.any(), (Option)ArgumentMatchers.any(), (String)ArgumentMatchers.any())).thenReturn((Object)None$.MODULE$);
        ClusterLinkManager linkManager = (ClusterLinkManager)Mockito.mock(ClusterLinkManager.class);
        Mockito.when((Object)linkManager.fetchResponseSize((ClusterLinkConfig)ArgumentMatchers.any())).thenReturn((Object)this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetchResponseSize());
        ClusterLinkFetcherManager fetcherManager = new ClusterLinkFetcherManager(this, linkManager, connManager, brokerConfig, replicaManager, time, blockingSend, numPartitions){
            private final /* synthetic */ ClusterLinkFetcherThreadTest $outer;
            public final KafkaConfig brokerConfig$3;
            public final ReplicaManager replicaManager$1;
            private final BlockingSend blockingSend$1;
            private final MockTime time$2;
            private final int numPartitions$1;

            public ClusterLinkFetcherThread createFetcherThread(int fetcherId, BrokerEndPoint sourceBroker, FetcherPool fetcherPool) {
                ClusterLinkLeaderEndPoint leader = (ClusterLinkLeaderEndPoint)this.$outer.createRemoteLeaderEndPoint(this.brokerConfig$3, this.replicaManager$1, (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$, this.blockingSend$1, (Option<LogContext>)None$.MODULE$, (Option<Time>)new Some((Object)((Object)this.time$2)));
                this.$outer.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread_$eq(new ClusterLinkFetcherThread(this, leader){

                    public void truncate(TopicPartition tp, OffsetTruncationState offsetTruncationState) {
                    }

                    public Option<Object> latestEpoch(TopicPartition topicPartition) {
                        return new Some((Object)BoxesRunTime.boxToInteger((int)1));
                    }
                });
                return this.$outer.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread();
            }

            public int partitionCount(String topic) {
                return this.numPartitions$1;
            }

            public /* synthetic */ ClusterLinkFetcherThreadTest kafka$server$link$ClusterLinkFetcherThreadTest$$anon$$$outer() {
                return this.$outer;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.brokerConfig$3 = brokerConfig$3;
                this.replicaManager$1 = replicaManager$1;
                this.blockingSend$1 = blockingSend$1;
                this.time$2 = time$2;
                this.numPartitions$1 = numPartitions$1;
                super($outer.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName(), $outer.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkId(), $outer.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkConfig(), linkManager$1, connManager$1, brokerConfig$3, replicaManager$1, null, (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$, $outer.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics(), (Option)None$.MODULE$, (Option)None$.MODULE$, (Time)time$2, brokerConfig$3.interBrokerProtocolVersion().isTruncationOnFetchSupported(), ClusterLinkFetcherManager$.MODULE$.$lessinit$greater$default$15());
            }
        };
        fetcherManager.initializeMetadata();
        fetcherManager.addLinkedFetcherForPartitions((Iterable)partitions);
        return new Tuple2((Object)fetcherManager, (Object)partitions);
    }

    @Override
    @Disabled(value="Disk throttle is not applied")
    @Test
    public void testFollowerIsThrottledOnLowDisk() {
        super.testFollowerIsThrottledOnLowDisk();
    }

    @Override
    public void verifyMarkReplicaThrottle(ReplicaManager replicaManager, int times) {
        ((ReplicaManager)Mockito.verify((Object)replicaManager, (VerificationMode)Mockito.times((int)times))).markClusterLinkReplicaThrottle();
    }

    @Override
    public int verifyMarkReplicaThrottle$default$2() {
        return 1;
    }

    @Override
    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        this.verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latest(), 1);
    }

    @Test
    public void testFetcherThreadBackoff() {
        super.shouldPollIndefinitelyIfLeaderReturnsAnyException();
        Assertions.assertTrue((boolean)this.kafka$server$link$ClusterLinkFetcherThreadTest$$isDelayed());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testAdjustLaggingPartitions() {
        void var4_4;
        void var3_3;
        MockTime time = new MockTime();
        Tuple2<ClusterLinkFetcherManager, Set<Partition>> tuple2 = this.setupFetcherManagerAndPartitions(time, 4, MetadataVersion.latest());
        if (tuple2 == null) {
            throw new MatchError(null);
        }
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager)tuple2._1();
        Set partitions = (Set)tuple2._2();
        void fetcherManager2 = var3_3;
        void partitions2 = var4_4;
        Assertions.assertNull((Object)this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), (String)"Fetcher thread created without metadata");
        MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith((String)"cluster", (int)1, Collections.singletonMap("topic", Errors.NONE), Collections.singletonMap("topic", Predef$.MODULE$.int2Integer(4)), x$4 -> Predef$.MODULE$.int2Integer(1), MetadataResponse.PartitionMetadata::new, (short)ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
        fetcherManager2.currentMetadata().update(1, metadataResponse, false, time.milliseconds());
        fetcherManager2.onNewMetadata(fetcherManager2.currentMetadata().fetch());
        Assertions.assertNotNull((Object)this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread(), (String)"Fetcher thread not created");
        Assertions.assertEquals((int)1, (int)fetcherManager2.fetcherThreadMap().size());
        Assertions.assertEquals((int)0, (int)fetcherManager2.unassignedPartitionCount());
        int laggingPartitionCaughtUpTime = 1;
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)partitionNum -> {
            TopicPartition tp = new TopicPartition("topic", partitionNum);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updatePartitionLastCaughtUpTime(tp, (long)laggingPartitionCaughtUpTime);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(tp, (long)(partitionNum + 100));
        });
        fetcherManager2.maybeAdjustFetcherLaggingPartitions();
        Assertions.assertEquals((int)0, (int)fetcherManager2.unassignedPartitionCount());
        Assertions.assertEquals((int)2, (int)fetcherManager2.throttledPartitionCount());
        fetcherManager2.throttledPartitions().contains((Object)new TopicPartition("topic", 2));
        fetcherManager2.throttledPartitions().contains((Object)new TopicPartition("topic", 3));
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)partitionNum -> {
            TopicPartition tp = new TopicPartition("topic", partitionNum);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(tp, 0L);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updatePartitionLastCaughtUpTime(tp, Long.MAX_VALUE);
        });
        fetcherManager2.maybeAdjustFetcherLaggingPartitions();
        Assertions.assertEquals((int)0, (int)fetcherManager2.unassignedPartitionCount());
        Assertions.assertEquals((int)0, (int)fetcherManager2.throttledPartitionCount());
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(2), 3).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)partitionNum -> {
            TopicPartition tp = new TopicPartition("topic", partitionNum);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updatePartitionLastCaughtUpTime(tp, (long)laggingPartitionCaughtUpTime);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(tp, (long)(partitionNum + 100));
        });
        Assertions.assertEquals((Object)new Tuple2((Object)ClusterLinkFetcherThread.AdjustmentType$.MODULE$.NoAdjustment(), (Object)None$.MODULE$), (Object)this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().adjustLaggingPartitionsRequired((long)(laggingPartitionCaughtUpTime + this.laggingTimeMs() * 2)));
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)partitionNum -> {
            TopicPartition tp = new TopicPartition("topic", partitionNum);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updatePartitionLastCaughtUpTime(tp, (long)laggingPartitionCaughtUpTime);
            this.kafka$server$link$ClusterLinkFetcherThreadTest$$fetcherThread().updateFetcherLagStats(tp, (long)(partitionNum + 100));
        });
        fetcherManager2.maybeAdjustFetcherLaggingPartitions();
        Assertions.assertEquals((int)0, (int)fetcherManager2.unassignedPartitionCount());
        Assertions.assertEquals((int)2, (int)fetcherManager2.throttledPartitionCount());
        fetcherManager2.throttledPartitions().contains((Object)new TopicPartition("topic", 2));
        fetcherManager2.throttledPartitions().contains((Object)new TopicPartition("topic", 3));
        fetcherManager2.addLinkedFetcherForPartitions((Iterable)partitions2);
        Assertions.assertEquals((int)0, (int)fetcherManager2.unassignedPartitionCount());
        Assertions.assertEquals((int)0, (int)fetcherManager2.throttledPartitionCount());
    }

    @Override
    public void verifyOffsetRequestVersion(MetadataVersion ibp, OffsetsForLeaderEpochRequest.Builder offsetForLeaderEpochRequest, ListOffsetsRequest.Builder listOffsetsRequest) {
        Assertions.assertEquals((int)3, (int)offsetForLeaderEpochRequest.oldestAllowedVersion());
        Assertions.assertEquals((short)ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), (short)offsetForLeaderEpochRequest.latestAllowedVersion());
        Assertions.assertEquals((int)0, (int)listOffsetsRequest.oldestAllowedVersion());
        Assertions.assertEquals((short)ApiKeys.LIST_OFFSETS.latestVersion(), (short)listOffsetsRequest.latestAllowedVersion());
    }

    private static final boolean offsetsPending$1(Partition partition$1) {
        return BoxesRunTime.unboxToBoolean((Object)TestUtils.fieldValue((Object)partition$1, Partition.class, (String)"needsLinkedLeaderOffsets"));
    }

    public static final /* synthetic */ Set $anonfun$setupFetcherManagerAndPartitions$1(MockTime time$2, AlterPartitionListener alterPartitionListener$1, LogManager logManager$1, AbstractLog log$1, ReplicaManager replicaManager$1, Set partitions$1, int partitionNum) {
        TopicPartition tp = new TopicPartition("topic", partitionNum);
        Partition partition = new Partition(tp, 10000L, MetadataVersion.latest(), 0, (Time)time$2, alterPartitionListener$1, (DelayedOperations)Mockito.mock(DelayedOperations.class), (MetadataCache)MetadataCache$.MODULE$.zkMetadataCache(0, MetadataVersion.latest(), MetadataCache$.MODULE$.zkMetadataCache$default$3(), MetadataCache$.MODULE$.zkMetadataCache$default$4()), logManager$1, (Option)None$.MODULE$, (Option)None$.MODULE$, (Option)None$.MODULE$, (AlterPartitionManager)TestUtils$.MODULE$.createAlterIsrManager(), (Option)None$.MODULE$, Partition$.MODULE$.$lessinit$greater$default$15(), Partition$.MODULE$.$lessinit$greater$default$16());
        partition.log_$eq((Option)new Some((Object)log$1));
        Mockito.when((Object)replicaManager$1.localLogOrException(tp)).thenReturn((Object)log$1);
        Mockito.when((Object)replicaManager$1.onlinePartition(tp)).thenReturn((Object)new Some((Object)partition));
        TestUtils.setFieldValue((Object)partition, (String)"leaderEpoch", (Object)BoxesRunTime.boxToInteger((int)2));
        return (Set)partitions$1.$plus$eq((Object)partition);
    }

    public ClusterLinkFetcherThreadTest() {
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkName = "testCluster";
        this.clusterLinkBackoffMs = 100;
        this.laggingTimeMs = 1000;
        this.kafka$server$link$ClusterLinkFetcherThreadTest$$clusterLinkMetrics().startup();
    }
}

