/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import kafka.api.ApiVersion;
import kafka.api.ApiVersion$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.server.BlockingSend;
import kafka.server.FailedPartitions;
import kafka.server.FetcherPool;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.PartitionFetchState;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkDestConnectionManager;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.server.link.ClusterLinkFetcherManager$;
import kafka.server.link.ClusterLinkFetcherThread;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkMetadata;
import kafka.server.link.ClusterLinkMetadataThread;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkNetworkClient;
import kafka.server.link.FetchResponseSize;
import kafka.server.link.LinkMode;
import kafka.server.link.MetadataListener;
import kafka.server.link.MirrorFailureType;
import kafka.server.link.PartitionAndState;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.SetLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichLong$;

@ScalaSignature(bytes="\u0006\u0001\rMc\u0001\u0002%J\u0001ACQa\u0016\u0001\u0005\u0002aCqa\u0017\u0001C\u0002\u0013EA\f\u0003\u0004d\u0001\u0001\u0006I!\u0018\u0005\bI\u0002\u0011\r\u0011\"\u0003f\u0011\u0019q\u0007\u0001)A\u0005M\"9q\u000e\u0001b\u0001\n\u0013\u0001\bBB<\u0001A\u0003%\u0011\u000fC\u0004y\u0001\t\u0007I\u0011B=\t\u000f\u0005-\u0001\u0001)A\u0005u\"I\u0011Q\u0002\u0001C\u0002\u0013%\u0011q\u0002\u0005\t\u0003/\u0001\u0001\u0015!\u0003\u0002\u0012!I\u0011\u0011\u0004\u0001C\u0002\u0013%\u00111\u0004\u0005\t\u0003S\u0001\u0001\u0015!\u0003\u0002\u001e!I\u00111\u0006\u0001C\u0002\u0013%\u0011Q\u0006\u0005\t\u0003o\u0001\u0001\u0015!\u0003\u00020!I\u0011\u0011\b\u0001C\u0002\u0013%\u00111\b\u0005\t\u0003\u000f\u0002\u0001\u0015!\u0003\u0002>!Y\u0011\u0011\n\u0001A\u0002\u0003\u0007I\u0011BA&\u0011-\t\u0019\u0006\u0001a\u0001\u0002\u0004%I!!\u0016\t\u0017\u0005\u0005\u0004\u00011A\u0001B\u0003&\u0011Q\n\u0005\n\u0003G\u0002!\u0019!C\u0005\u0003KB\u0001\"!\u001c\u0001A\u0003%\u0011q\r\u0005\n\u0003_\u0002!\u0019!C\u0005\u0003cB\u0001\"!\u001f\u0001A\u0003%\u00111\u000f\u0005\f\u0003w\u0002\u0001\u0019!a\u0001\n\u0013\ti\bC\u0006\u0002\u0006\u0002\u0001\r\u00111A\u0005\n\u0005\u001d\u0005bCAF\u0001\u0001\u0007\t\u0011)Q\u0005\u0003\u007fB1\"!$\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002\u0010\"Y\u0011\u0011\u0015\u0001A\u0002\u0003\u0007I\u0011BAR\u0011-\t9\u000b\u0001a\u0001\u0002\u0003\u0006K!!%\t\u0013\u0005%\u0006\u00011A\u0005\n\u0005-\u0006\"CAZ\u0001\u0001\u0007I\u0011BA[\u0011!\tI\f\u0001Q!\n\u00055\u0006bCA^\u0001\u0001\u0007\t\u0019!C\u0005\u0003{C1\"!2\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002H\"Y\u00111\u001a\u0001A\u0002\u0003\u0005\u000b\u0015BA`\u0011\u001d\ti\r\u0001C\u0001\u0003\u001fDq!!:\u0001\t\u0003\ty\rC\u0004\u0002p\u0002!\t!a4\t\u000f\u0005e\b\u0001\"\u0001\u0002P\"9\u0011Q 
\u0001\u0005\u0002\u0005=\u0007b\u0002B\u0001\u0001\u0011\u0005\u0011q\u001a\u0005\b\u0005\u000b\u0001A\u0011AAh\u0011\u001d\u0011I\u0001\u0001C\u0001\u0003\u001fDqA!\u0004\u0001\t\u0003\ty\rC\u0004\u0003\u0012\u0001!\t!a4\t\u000f\tU\u0001\u0001\"\u0001\u0002P\"9!\u0011\u0004\u0001\u0005\u0002\u0005=\u0007b\u0002B\u000f\u0001\u0011\u0005\u0011q\u001a\u0005\b\u0005C\u0001A\u0011AAh\u0011\u001d\u0011)\u0003\u0001C\u0001\u0003\u001fDqA!\u000b\u0001\t\u0013\u0011Y\u0003C\u0004\u0003`\u0001!\t!a4\t\u000f\t\r\u0004\u0001\"\u0001\u0002P\"9!q\r\u0001\u0005\u0002\u0005=\u0007b\u0002B6\u0001\u0011%!Q\u000e\u0005\b\u0005\u0007\u0003A\u0011AAh\u0011\u001d\u00119\t\u0001C\u0001\u0003\u001fDqAa#\u0001\t\u0003\ty\rC\u0004\u0003\u0010\u0002!IA!%\t\u000f\tU\u0005\u0001\"\u0003\u0003\u0018\"9!1\u0015\u0001\u0005\n\t\u0015\u0006b\u0002Bh\u0001\u0011%!\u0011\u001b\u0005\n\u0005w\u0004\u0011\u0013!C\u0005\u0005{D\u0011ba\u0005\u0001#\u0003%IA!@\t\u0013\rU\u0001!%A\u0005\n\r]\u0001\"CB\u000e\u0001E\u0005I\u0011BB\f\u0011\u001d\u0019i\u0002\u0001C\u0005\u0007?A\u0011b!\u000e\u0001#\u0003%Iaa\u000e\t\u000f\rm\u0002\u0001\"\u0003\u0004>!91\u0011\n\u0001\u0005\n\r-#!H\"mkN$XM\u001d'j].4U\r^2iKJl\u0015M\\1hKJ$Vm\u001d;\u000b\u0005)[\u0015\u0001\u00027j].T!\u0001T'\u0002\rM,'O^3s\u0015\u0005q\u0015!B6bM.\f7\u0001A\n\u0003\u0001E\u0003\"AU+\u000e\u0003MS\u0011\u0001V\u0001\u0006g\u000e\fG.Y\u0005\u0003-N\u0013a!\u00118z%\u00164\u0017A\u0002\u001fj]&$h\bF\u0001Z!\tQ\u0006!D\u0001J\u0003\rI'\r]\u000b\u0002;B\u0011a,Y\u0007\u0002?*\u0011\u0001-T\u0001\u0004CBL\u0017B\u00012`\u0005)\t\u0005/\u001b,feNLwN\\\u0001\u0005S\n\u0004\b%\u0001\u0004mS:\\\u0017\nZ\u000b\u0002MB\u0011q\r\\\u0007\u0002Q*\u0011\u0011N[\u0001\u0005kRLGNC\u0001l\u0003\u0011Q\u0017M^1\n\u00055D'\u0001B+V\u0013\u0012\u000bq\u0001\\5oW&#\u0007%\u0001\u0005mS:\\g*Y7f+\u0005\t\bC\u0001:v\u001b\u0005\u0019(B\u0001;k\u0003\u0011a\u0017M\\4\n\u0005Y\u001c(AB*ue&tw-A\u0005mS:\\g*Y7fA\u0005i1o\\;sG\u0016$v\u000e]5d\u0013\u0012,\u0012A\u00
1f\t\u0004w\u0006\u001dQ\"\u0001?\u000b\u0005ut\u0018AB2p[6|gN\u0003\u0002O\u007f*!\u0011\u0011AA\u0002\u0003\u0019\t\u0007/Y2iK*\u0011\u0011QA\u0001\u0004_J<\u0017bAA\u0005y\n!Q+^5e\u00039\u0019x.\u001e:dKR{\u0007/[2JI\u0002\nq!\\3ue&\u001c7/\u0006\u0002\u0002\u0012A\u0019!,a\u0005\n\u0007\u0005U\u0011J\u0001\nDYV\u001cH/\u001a:MS:\\W*\u001a;sS\u000e\u001c\u0018\u0001C7fiJL7m\u001d\u0011\u0002\tQLW.Z\u000b\u0003\u0003;\u0001B!a\b\u0002&5\u0011\u0011\u0011\u0005\u0006\u0004\u0003Ga\u0018!B;uS2\u001c\u0018\u0002BA\u0014\u0003C\u0011\u0001\"T8dWRKW.Z\u0001\u0006i&lW\rI\u0001\u000fe\u0016\u0004H.[2b\u001b\u0006t\u0017mZ3s+\t\ty\u0003\u0005\u0003\u00022\u0005MR\"A&\n\u0007\u0005U2J\u0001\bSKBd\u0017nY1NC:\fw-\u001a:\u0002\u001fI,\u0007\u000f\\5dC6\u000bg.Y4fe\u0002\n1\u0001\\8h+\t\ti\u0004\u0005\u0003\u0002@\u0005\rSBAA!\u0015\r\tI$T\u0005\u0005\u0003\u000b\n\tEA\u0006BEN$(/Y2u\u0019><\u0017\u0001\u00027pO\u0002\nAB\u0019:pW\u0016\u00148i\u001c8gS\u001e,\"!!\u0014\u0011\t\u0005E\u0012qJ\u0005\u0004\u0003#Z%aC&bM.\f7i\u001c8gS\u001e\f\u0001C\u0019:pW\u0016\u00148i\u001c8gS\u001e|F%Z9\u0015\t\u0005]\u0013Q\f\t\u0004%\u0006e\u0013bAA.'\n!QK\\5u\u0011%\tyfEA\u0001\u0002\u0004\ti%A\u0002yIE\nQB\u0019:pW\u0016\u00148i\u001c8gS\u001e\u0004\u0013a\u00037j].l\u0015M\\1hKJ,\"!a\u001a\u0011\u0007i\u000bI'C\u0002\u0002l%\u0013!c\u00117vgR,'\u000fT5oW6\u000bg.Y4fe\u0006aA.\u001b8l\u001b\u0006t\u0017mZ3sA\u0005Y1m\u001c8o\u001b\u0006t\u0017mZ3s+\t\t\u0019\bE\u0002[\u0003kJ1!a\u001eJ\u0005\u0001\u001aE.^:uKJd\u0015N\\6EKN$8i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0002\u0019\r|gN\\'b]\u0006<WM\u001d\u0011\u0002\u001d\u0019,Go\u00195fe6\u000bg.Y4feV\u0011\u0011q\u0010\t\u00045\u0006\u0005\u0015bAAB\u0013\nI2\t\\;ti\u0016\u0014H*\u001b8l\r\u0016$8\r[3s\u001b\u0006t\u0017mZ3s\u0003I1W\r^2iKJl\u0015M\\1hKJ|F%Z9\u0015\t\u0005]\u0013\u0011\u0012\u0005\n\u0003?R\u0012\u0011!a\u0001\u0003\u007f\nqBZ3uG\",'/T1oC\u001e,'\u000fI\u0001\u0010I\u0016\u001cH/\u00113nS:\u001cE.[3oiV\u0011\u0011\
u0011\u0013\t\u0005\u0003'\u000bi*\u0004\u0002\u0002\u0016*!\u0011qSAM\u0003\u0015\tG-\\5o\u0015\r\tYJ`\u0001\bG2LWM\u001c;t\u0013\u0011\ty*!&\u0003\u000b\u0005#W.\u001b8\u0002'\u0011,7\u000f^!e[&t7\t\\5f]R|F%Z9\u0015\t\u0005]\u0013Q\u0015\u0005\n\u0003?j\u0012\u0011!a\u0001\u0003#\u000b\u0001\u0003Z3ti\u0006#W.\u001b8DY&,g\u000e\u001e\u0011\u0002\u001b9,X\u000eU1si&$\u0018n\u001c8t+\t\ti\u000bE\u0002S\u0003_K1!!-T\u0005\rIe\u000e^\u0001\u0012]Vl\u0007+\u0019:uSRLwN\\:`I\u0015\fH\u0003BA,\u0003oC\u0011\"a\u0018!\u0003\u0003\u0005\r!!,\u0002\u001d9,X\u000eU1si&$\u0018n\u001c8tA\u0005\t2\r\\;ti\u0016\u0014H*\u001b8l\u0007>tg-[4\u0016\u0005\u0005}\u0006c\u0001.\u0002B&\u0019\u00111Y%\u0003#\rcWo\u001d;fe2Kgn[\"p]\u001aLw-A\u000bdYV\u001cH/\u001a:MS:\\7i\u001c8gS\u001e|F%Z9\u0015\t\u0005]\u0013\u0011\u001a\u0005\n\u0003?\u001a\u0013\u0011!a\u0001\u0003\u007f\u000b!c\u00197vgR,'\u000fT5oW\u000e{gNZ5hA\u0005)1/\u001a;VaR\u0011\u0011q\u000b\u0015\u0004K\u0005M\u0007\u0003BAk\u0003Cl!!a6\u000b\u0007\u0001\fIN\u0003\u0003\u0002\\\u0006u\u0017a\u00026va&$XM\u001d\u0006\u0005\u0003?\f\u0019!A\u0003kk:LG/\u0003\u0003\u0002d\u0006]'A\u0003\"fM>\u0014X-R1dQ\u0006AA/Z1s\t><h\u000eK\u0002'\u0003S\u0004B!!6\u0002l&!\u0011Q^Al\u0005%\te\r^3s\u000b\u0006\u001c\u0007.\u0001\nuKN$X*\u001a;bI\u0006$\u0018\rV8qS\u000e\u001c\bfA\u0014\u0002tB!\u0011Q[A{\u0013\u0011\t90a6\u0003\tQ+7\u000f^\u0001\u0013i\u0016\u001cHOR3uG\",'\u000f\u00165sK\u0006$7\u000fK\u0002)\u0003g\fq\u0003^3ti\u0006#GmU8ve\u000e,\u0007+\u0019:uSRLwN\\:)\u0007%\n\u00190A\buKN$(+Z2p]\u001aLw-\u001e:fQ\rQ\u00131_\u0001\u0019i\u0016\u001cH\u000fU1vg\u0016$g)\u001a;dQ\u0016\u00148\u000b^1siV\u0004\bfA\u0016\u0002t\u00069B/Z:u\u001d>$\u0018NZ=SK\u0006$\u0017PR8s\r\u0016$8\r\u001b\u0015\u0004Y\u0005M\u0018A\u0006;fgR\u001cv.\u001e:dK:{G/\u0011<bS2\f'\r\\3)\u00075\n\u00190\u0001\u0013uKN$\b+\u0019:uSRLwN\\*uCR,7k\\;sG\u0016tu\u000e^!wC&d\u0017M\u00197fQ\rq\u00131_\u0001;i\u0016\u001cH\u000fU1si&$\u0018n\u001c8Ti\u0006$XmU8ve\u00
0e,gj\u001c;Bm\u0006LG.\u00192mKR{\u0007/[2O_RLe.T3uC\u0012\fG/Y(oG\u0016D3aLAz\u0003\u0001#Xm\u001d;QCJ$\u0018\u000e^5p]N#\u0018\r^3T_V\u00148-\u001a(pi\u00063\u0018-\u001b7bE2,Gk\u001c9jG:{G/\u00138NKR\fG-\u0019;b\r>\u0014H+[7f_V$\bf\u0001\u0019\u0002t\u0006!C/Z:u!\u0006\u0014H/\u001b;j_:\u001cF/\u0019;f)>\u0004\u0018n\u0019(pi&sW*\u001a;bI\u0006$\u0018\rK\u00022\u0003g\fA\u0005^3tiB\u000b'\u000f^5uS>t7\u000b^1uKN{WO]2f)>\u0004\u0018n\u0019#fY\u0016$X\r\u001a\u0015\u0004e\u0005M\u0018A\u000e;fgR\u0004\u0016M\u001d;ji&|gn\u0015;bi\u0016\u001cv.\u001e:dK:{G/\u0011<bS2\f'\r\\3T_V\u00148-\u001a+pa&\u001cG)\u001a7fi\u0016$\u0007fA\u001a\u0002t\u0006Yb/\u001a:jMf\u0004\u0016M\u001d;ji&|gNR1jYV\u0014Xm\u0015;bi\u0016$bA!\f\u00034\tU\u0003c\u0001.\u00030%\u0019!\u0011G%\u0003#A\u000b'\u000f^5uS>t\u0017I\u001c3Ti\u0006$X\rC\u0004\u00036Q\u0002\rAa\u000e\u0002\u0011\u0019\f\u0017\u000e\\;sKN\u0004bA!\u000f\u0003J\t=c\u0002\u0002B\u001e\u0005\u000brAA!\u0010\u0003D5\u0011!q\b\u0006\u0004\u0005\u0003z\u0015A\u0002\u001fs_>$h(C\u0001U\u0013\r\u00119eU\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\u0011YE!\u0014\u0003\t1K7\u000f\u001e\u0006\u0004\u0005\u000f\u001a\u0006c\u0001.\u0003R%\u0019!1K%\u0003#5K'O]8s\r\u0006LG.\u001e:f)f\u0004X\rC\u0004\u0003XQ\u0002\rA!\u0017\u0002/\u0015D\b/Z2u!\u0016\u00148/[:uK:$h)Y5mkJ,\u0007c\u0001*\u0003\\%\u0019!QL*\u0003\u000f\t{w\u000e\\3b]\u0006)C/Z:u'V\u001c7-Z:tMVdG*\u001b8lK\u0012dU-\u00193fe\u0016\u0003xn\u00195Va\u0012\fG/\u001a\u0015\u0004k\u0005M\u0018!\t;fgR4\u0015-\u001b7fI2Kgn[3e\u0019\u0016\fG-\u001a:Fa>\u001c\u0007.\u00169eCR,\u0007f\u0001\u001c\u0002t\u0006aC/Z:u+:,\u0007\u0010]3di\u0016$WI\u001d:pe&sG*\u001b8lK\u0012dU-\u00193fe\u0016\u0003xn\u00195Va\u0012\fG/\u001a\u0015\u0004o\u0005M\u0018!\b<fe&4\u0017\u0010T5oW\u0016$G*Z1eKJ,\u0005o\\2i+B$\u0017\r^3\u0015\r\u0005]#q\u000eB@\u0011\u001d\u0011\t\b\u000fa\u0001\u0005g\n1\"\u001e9eCR,WI\u001d:peB!!Q\u000fB>\u001b\t\u00119HC\u0002\u0003zq\f\u0001\u0002\u001d:pi>\u001cw\u000e\\\u0005\u0
005\u0005{\u00129H\u0001\u0004FeJ|'o\u001d\u0005\b\u0005\u0003C\u0004\u0019\u0001B-\u0003E)\u0007\u0010]3di2Kgn\u001b$bS2,(/Z\u0001 i\u0016\u001cHoU;dG\u0016\u001c8OZ;m\u0019&t7NR1jYV\u0014X-\u00169eCR,\u0007fA\u001d\u0002t\u0006YB/Z:u\r\u0006LG.\u001a3MS:\\g)Y5mkJ,W\u000b\u001d3bi\u0016D3AOAz\u0003\u0019\"Xm\u001d;V]\u0016D\b/Z2uK\u0012,%O]8s\u0013:d\u0015N\\6GC&dWO]3Va\u0012\fG/\u001a\u0015\u0004w\u0005M\u0018a\u0006<fe&4\u0017\u0010T5oW\u001a\u000b\u0017\u000e\\;sKV\u0003H-\u0019;f)\u0011\t9Fa%\t\u000f\tED\b1\u0001\u0003t\u0005A2M]3bi\u0016d\u0015N\\6GKR\u001c\u0007.\u001a:NC:\fw-\u001a:\u0015\t\u0005}$\u0011\u0014\u0005\b\u00057k\u0004\u0019\u0001BO\u0003\u0015\u0001(o\u001c9t!\r9'qT\u0005\u0004\u0005CC'A\u0003)s_B,'\u000f^5fg\u0006qQ\u000f\u001d3bi\u0016lU\r^1eCR\fGCBA,\u0005O\u0013Y\rC\u0004\u0003*z\u0002\rAa+\u0002\rQ|\u0007/[2t!!\u0011iKa-\u00038\n\u0015WB\u0001BX\u0015\r\u0011\tlU\u0001\u000bG>dG.Z2uS>t\u0017\u0002\u0002B[\u0005_\u00131!T1q!\u0011\u0011IL!1\u000f\t\tm&Q\u0018\t\u0004\u0005{\u0019\u0016b\u0001B`'\u00061\u0001K]3eK\u001aL1A\u001eBb\u0015\r\u0011yl\u0015\t\u0004e\n\u001d\u0017b\u0001Beg\n9\u0011J\u001c;fO\u0016\u0014\bb\u0002Bg}\u0001\u0007\u0011QV\u0001\u0012Y&t7.\u001a3MK\u0006$WM]#q_\u000eD\u0017!C:fiV\u0004Xj\\2l)9\t9Fa5\u0003d\n5(q\u001eBz\u0005oDqA!6@\u0001\u0004\u00119.A\u0005qCJ$\u0018\u000e^5p]B!!\u0011\u001cBp\u001b\t\u0011YNC\u0002\u0003^6\u000bqa\u00197vgR,'/\u0003\u0003\u0003b\nm'!\u0003)beRLG/[8o\u0011\u001d\u0011)o\u0010a\u0001\u0005O\f!\u0001\u001e9\u0011\u0007m\u0014I/C\u0002\u0003lr\u0014a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g\u000eC\u0005\u0003N~\u0002\n\u00111\u0001\u0002.\"I!\u0011_ \u0011\u0002\u0003\u0007\u0011QV\u0001\u0010]VlW\t]8dQV\u0003H-\u0019;fg\"I!Q_ 
\u0011\u0002\u0003\u0007!\u0011L\u0001\u0014G2,\u0017M](gMN,Go\u001d)f]\u0012Lgn\u001a\u0005\n\u0005s|\u0004\u0013!a\u0001\u00053\n!B]3qY\u0006LXj\\2l\u0003M\u0019X\r^;q\u001b>\u001c7\u000e\n3fM\u0006,H\u000e\u001e\u00134+\t\u0011yP\u000b\u0003\u0002.\u000e\u00051FAB\u0002!\u0011\u0019)aa\u0004\u000e\u0005\r\u001d!\u0002BB\u0005\u0007\u0017\t\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\r51+\u0001\u0006b]:|G/\u0019;j_:LAa!\u0005\u0004\b\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0002'M,G/\u001e9N_\u000e\\G\u0005Z3gCVdG\u000f\n\u001b\u0002'M,G/\u001e9N_\u000e\\G\u0005Z3gCVdG\u000fJ\u001b\u0016\u0005\re!\u0006\u0002B-\u0007\u0003\t1c]3ukBlunY6%I\u00164\u0017-\u001e7uIY\nac]3ukB4U\r^2iKJ$\u0006N]3bI6{7m\u001b\u000b\u0007\u0003/\u001a\tca\u000b\t\u000f\r\rB\t1\u0001\u0004&\u0005ia-\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012\u00042AWB\u0014\u0013\r\u0019I#\u0013\u0002\u0019\u00072,8\u000f^3s\u0019&t7NR3uG\",'\u000f\u00165sK\u0006$\u0007\"CB\u0017\tB\u0005\t\u0019AB\u0018\u0003)\u0001\u0018M\u001d;ji&|gn\u001d\t\u0007\u0005s\u001b\tDa:\n\t\rM\"1\u0019\u0002\u0004'\u0016$\u0018\u0001I:fiV\u0004h)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012lunY6%I\u00164\u0017-\u001e7uII*\"a!\u000f+\t\r=2\u0011A\u0001\u000f[\u0016$\u0018\rZ1uCR{\u0007/[2t+\t\u0019y\u0004E\u0003\u0004B\r\u001d\u0013/\u0004\u0002\u0004D)!1Q\tBX\u0003%IW.\\;uC\ndW-\u0003\u0003\u00044\r\r\u0013!F7fi\u0006$\u0017\r^1SK\u001a\u0014Xm\u001d5UQJ,\u0017\rZ\u000b\u0003\u0007\u001b\u00022AWB(\u0013\r\u0019\t&\u0013\u0002\u001a\u00072,8\u000f^3s\u0019&t7.T3uC\u0012\fG/\u0019+ie\u0016\fG\r")
public class ClusterLinkFetcherManagerTest {
    // Fixture state of the test class. Most names carry the compiled-Scala
    // `kafka$server$link$ClusterLinkFetcherManagerTest$$` prefix; each has an accessor below.

    // Value of ApiVersion.latestVersion(); setUp() writes its shortVersion() into the broker properties.
    private final ApiVersion ibp = ApiVersion$.MODULE$.latestVersion();
    // Random id handed to the ClusterLinkMetrics constructor below.
    private final UUID kafka$server$link$ClusterLinkFetcherManagerTest$$linkId = UUID.randomUUID();
    // No initializer is visible here.
    // NOTE(review): the metrics initializer below reads this through its accessor, so it presumably
    // has to be assigned earlier (e.g. in a constructor outside this view) — confirm.
    private final String kafka$server$link$ClusterLinkFetcherManagerTest$$linkName;
    // Random topic id; its uses are outside the section shown here.
    private final Uuid sourceTopicId = Uuid.randomUuid();
    // Link metrics built in Destination link mode with a fresh Metrics registry;
    // started in setUp() and shut down in tearDown().
    private final ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherManagerTest$$metrics = new ClusterLinkMetrics(this.kafka$server$link$ClusterLinkFetcherManagerTest$$linkName(), this.kafka$server$link$ClusterLinkFetcherManagerTest$$linkId(), (LinkMode)LinkMode.Destination$.MODULE$, null, (Option)None$.MODULE$, new Metrics(), (Option)None$.MODULE$);
    // Mock clock; tests read its milliseconds() when querying metadata refresh timing.
    private final MockTime kafka$server$link$ClusterLinkFetcherManagerTest$$time = new MockTime();
    // EasyMock mock of the replica manager (created with EasyMock.mock, not createNiceMock).
    private final ReplicaManager kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager = (ReplicaManager)EasyMock.mock(ReplicaManager.class);
    // Nice mock of the log; setUp() stubs localLogEndOffset() to return 0.
    private final AbstractLog log = (AbstractLog)EasyMock.createNiceMock(AbstractLog.class);
    // Broker configuration, assigned in setUp().
    private KafkaConfig kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig;
    // Nice mock; setUp() stubs fetchResponseSize(...) to return FetchResponseSize(10, 10).
    private final ClusterLinkManager kafka$server$link$ClusterLinkFetcherManagerTest$$linkManager = (ClusterLinkManager)EasyMock.createNiceMock(ClusterLinkManager.class);
    // Nice mock; setUp() stubs reverseConnectionProvider(...) to return None.
    private final ClusterLinkDestConnectionManager kafka$server$link$ClusterLinkFetcherManagerTest$$connManager = (ClusterLinkDestConnectionManager)EasyMock.createNiceMock(ClusterLinkDestConnectionManager.class);
    // Fetcher manager created in setUp(); tearDown() shuts it down unless a test has already set it to null.
    private ClusterLinkFetcherManager kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager;
    // Nice mock of the Admin client, created in setUp().
    private Admin kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient;
    // Starts at 2; tests may change it through the setter.
    private int kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions = 2;
    // No assignment is visible in this section of the file.
    private ClusterLinkConfig kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig;

    /** Accessor for the {@code ibp} field (the protocol version written into the broker properties by setUp). */
    public ApiVersion ibp() {
        return ibp;
    }

    /** Accessor for the link id field. */
    public UUID kafka$server$link$ClusterLinkFetcherManagerTest$$linkId() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$linkId;
    }

    /** Accessor for the link name field. */
    public String kafka$server$link$ClusterLinkFetcherManagerTest$$linkName() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$linkName;
    }

    /** Accessor for the {@code sourceTopicId} field. */
    private Uuid sourceTopicId() {
        return sourceTopicId;
    }

    /** Accessor for the cluster link metrics field. */
    public ClusterLinkMetrics kafka$server$link$ClusterLinkFetcherManagerTest$$metrics() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$metrics;
    }

    /** Accessor for the mock clock field. */
    public MockTime kafka$server$link$ClusterLinkFetcherManagerTest$$time() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$time;
    }

    /** Accessor for the mocked replica manager field. */
    public ReplicaManager kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager;
    }

    /** Accessor for the mocked {@code log} field. */
    private AbstractLog log() {
        return log;
    }

    /** Accessor for the broker config field; the field is assigned in setUp. */
    public KafkaConfig kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig;
    }

    /** Setter for the broker config field. */
    private void kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig_$eq(KafkaConfig x$1) {
        kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig = x$1;
    }

    /** Accessor for the mocked cluster link manager field. */
    public ClusterLinkManager kafka$server$link$ClusterLinkFetcherManagerTest$$linkManager() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$linkManager;
    }

    /** Accessor for the mocked destination connection manager field. */
    public ClusterLinkDestConnectionManager kafka$server$link$ClusterLinkFetcherManagerTest$$connManager() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$connManager;
    }

    /** Accessor for the fetcher manager field; may be {@code null} once a test has cleared it. */
    public ClusterLinkFetcherManager kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager;
    }

    /** Setter for the fetcher manager field. */
    private void kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager_$eq(ClusterLinkFetcherManager x$1) {
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager = x$1;
    }

    /** Accessor for the Admin client field; the field is assigned in setUp. */
    public Admin kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient;
    }

    /** Setter for the Admin client field. */
    private void kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient_$eq(Admin x$1) {
        kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient = x$1;
    }

    /** Accessor for the partition count field (initialized to 2). */
    public int kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions;
    }

    /** Setter for the partition count field. */
    private void kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions_$eq(int x$1) {
        kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions = x$1;
    }

    /** Accessor for the cluster link config field. */
    public ClusterLinkConfig kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig() {
        return kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig;
    }

    /** Setter for the cluster link config field. */
    private void kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig_$eq(ClusterLinkConfig x$1) {
        kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig = x$1;
    }

    /**
     * Per-test fixture setup: builds the broker config, starts the link metrics, creates the
     * Admin mock and the fetcher manager, then programs and replays the shared nice mocks.
     */
    @BeforeEach
    public void setUp() {
        // Broker id 1 with "localhost:1234"; every remaining argument is the Scala default,
        // evaluated in parameter order exactly as the compiler emitted it.
        TestUtils$ testUtils = TestUtils$.MODULE$;
        Properties brokerProperties = testUtils.createBrokerConfig(
            1,
            "localhost:1234",
            testUtils.createBrokerConfig$default$3(),
            testUtils.createBrokerConfig$default$4(),
            testUtils.createBrokerConfig$default$5(),
            testUtils.createBrokerConfig$default$6(),
            testUtils.createBrokerConfig$default$7(),
            testUtils.createBrokerConfig$default$8(),
            testUtils.createBrokerConfig$default$9(),
            testUtils.createBrokerConfig$default$10(),
            testUtils.createBrokerConfig$default$11(),
            testUtils.createBrokerConfig$default$12(),
            testUtils.createBrokerConfig$default$13(),
            testUtils.createBrokerConfig$default$14(),
            testUtils.createBrokerConfig$default$15(),
            testUtils.createBrokerConfig$default$16(),
            testUtils.createBrokerConfig$default$17(),
            testUtils.createBrokerConfig$default$18(),
            testUtils.createBrokerConfig$default$19(),
            testUtils.createBrokerConfig$default$20());
        brokerProperties.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), ibp().shortVersion());
        KafkaConfig parsedBrokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProperties);
        kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig_$eq(parsedBrokerConfig);

        Properties linkProperties = new Properties();
        linkProperties.put("bootstrap.servers", "localhost:1234");

        kafka$server$link$ClusterLinkFetcherManagerTest$$metrics().startup();

        Admin adminMock = (Admin)EasyMock.createNiceMock(Admin.class);
        kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient_$eq(adminMock);

        ClusterLinkFetcherManager createdManager = createLinkFetcherManager(linkProperties);
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager_$eq(createdManager);
        createdManager.initializeMetadata();

        // The three mocks below live in final fields, so reading them once is equivalent.
        AbstractLog logMock = log();
        ClusterLinkManager linkManagerMock = kafka$server$link$ClusterLinkFetcherManagerTest$$linkManager();
        ClusterLinkDestConnectionManager connManagerMock = kafka$server$link$ClusterLinkFetcherManagerTest$$connManager();

        // Stub: log end offset is always 0.
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)logMock.localLogEndOffset()))
            .andReturn((Object)BoxesRunTime.boxToLong((long)0L))
            .anyTimes();
        // Stub: any link config yields FetchResponseSize(10, 10).
        EasyMock.expect((Object)linkManagerMock.fetchResponseSize((ClusterLinkConfig)EasyMock.anyObject()))
            .andReturn((Object)new FetchResponseSize(10, 10))
            .anyTimes();
        // Stub: no reverse connection provider.
        EasyMock.expect((Object)connManagerMock.reverseConnectionProvider(
                (NetworkClient)EasyMock.anyObject(), (Option)EasyMock.anyObject(), (String)EasyMock.anyObject()))
            .andReturn((Object)None$.MODULE$)
            .anyTimes();

        EasyMock.replay((Object[])new Object[]{logMock, linkManagerMock, connManagerMock});
    }

    /**
     * Per-test cleanup: shuts down the fetcher manager (when a test has not already cleared it)
     * and then the link metrics started in setUp.
     *
     * <p>The metrics shutdown sits in a {@code finally} block so that it still runs when the
     * fetcher manager's shutdown throws; the original exception continues to propagate to the
     * test framework unless the metrics shutdown itself throws.
     */
    @AfterEach
    public void tearDown() {
        try {
            ClusterLinkFetcherManager manager = this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager();
            // Tests that shut the manager down themselves set the field to null.
            if (manager != null) {
                manager.shutdown();
            }
        } finally {
            this.kafka$server$link$ClusterLinkFetcherManagerTest$$metrics().shutdown();
        }
    }

    /**
     * Adds and removes linked partitions on the fetcher manager and checks, after each step,
     * the topic set reported by {@code metadataTopics()} or by the metadata request builder.
     */
    @Test
    public void testMetadataTopics() {
        Predef$ predef = Predef$.MODULE$;

        // --- first topic, partition 0 ---
        String firstTopic = "testTopic1";
        TopicPartition firstTopicP0 = new TopicPartition(firstTopic, 0);
        Partition firstTopicP0Mock = (Partition)EasyMock.mock(Partition.class);
        setupMock(firstTopicP0Mock, firstTopicP0, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());

        // Immutable Scala sets that are reused by several steps below.
        Iterable firstTopicP0Partitions = (Iterable)predef.Set().apply((Seq)predef.wrapRefArray((Object[])new Partition[]{firstTopicP0Mock}));
        Set firstTopicP0Keys = (Set)predef.Set().apply((Seq)predef.wrapRefArray((Object[])new TopicPartition[]{firstTopicP0}));
        Object onlyFirstTopic = predef.Set().apply((Seq)predef.wrapRefArray((Object[])new String[]{firstTopic}));

        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(firstTopicP0Partitions);
        Assertions.assertEquals(onlyFirstTopic, metadataTopics());
        // A metadata update is expected to be due immediately.
        long nowMs = kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds();
        Assertions.assertEquals(
            (long)0L,
            (long)kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().timeToNextUpdate(nowMs));

        // Removal with the flag set to true: the topic is still expected in the metadata set.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().removeLinkedFetcherForPartitions(firstTopicP0Keys, true);
        Assertions.assertEquals(onlyFirstTopic, metadataTopics());

        // --- second topic, partition 4 ---
        String secondTopic = "testTopic2";
        TopicPartition secondTopicP4 = new TopicPartition(secondTopic, 4);
        Partition secondTopicP4Mock = (Partition)EasyMock.mock(Partition.class);
        setupMock(secondTopicP4Mock, secondTopicP4, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(
            (Iterable)predef.Set().apply((Seq)predef.wrapRefArray((Object[])new Partition[]{secondTopicP4Mock})));
        Object bothTopics = predef.Set().apply((Seq)predef.wrapRefArray((Object[])new String[]{firstTopic, secondTopic}));
        Assertions.assertEquals(bothTopics, metadataTopics());

        // Removal with the flag set to false: only the second topic is expected in the request builder.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().removeLinkedFetcherForPartitions(firstTopicP0Keys, false);
        Assertions.assertEquals(
            Collections.singletonList(secondTopic),
            (Object)kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().newMetadataRequestBuilder().topics());

        // --- first topic, partition 1 ---
        TopicPartition firstTopicP1 = new TopicPartition(firstTopic, 1);
        Partition firstTopicP1Mock = (Partition)EasyMock.mock(Partition.class);
        setupMock(firstTopicP1Mock, firstTopicP1, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(
            (Iterable)predef.Set().apply((Seq)predef.wrapRefArray((Object[])new Partition[]{firstTopicP1Mock})));
        Assertions.assertEquals(bothTopics, metadataTopics());

        // Re-adding partition 0 of the first topic must not produce a duplicate topic entry.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(firstTopicP0Partitions);
        Assertions.assertEquals(
            (int)2,
            (int)kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().newMetadataRequestBuilder().topics().size());
        Assertions.assertEquals(bothTopics, metadataTopics());

        // Removing partition 0 again: both topics are still expected.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().removeLinkedFetcherForPartitions(firstTopicP0Keys, false);
        Assertions.assertEquals(bothTopics, metadataTopics());
    }

    /**
     * Walks one linked partition through several metadata updates and checks the size of the
     * fetcher thread map, the fetcher lookup and the partition mock's expectations after each step.
     */
    @Test
    public void testFetcherThreads() {
        Predef$ predef = Predef$.MODULE$;

        String topicName = "testTopic";
        TopicPartition topicPartition = new TopicPartition(topicName, 0);
        Partition partitionMock = (Partition)EasyMock.mock(Partition.class);
        setupMock(partitionMock, topicPartition, setupMock$default$3(), setupMock$default$4(), setupMock$default$5(), setupMock$default$6());

        // Reused below: the single-partition set and the array handed to EasyMock.verify.
        Iterable linkedPartitions = (Iterable)predef.Set().apply((Seq)predef.wrapRefArray((Object[])new Partition[]{partitionMock}));
        Object[] verifiedMocks = new Object[]{partitionMock};

        // Before any metadata update there is no fetcher for the partition.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(linkedPartitions);
        Assertions.assertEquals(
            (Object)None$.MODULE$,
            (Object)kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(topicPartition));
        Assertions.assertEquals(
            (Object)predef.Set().apply((Seq)predef.wrapRefArray((Object[])new String[]{topicName})),
            metadataTopics());

        // topic -> 2, passed to every updateMetadata call below.
        Map topicPartitionCounts = (Map)Map$.MODULE$.apply((Seq)predef.wrapRefArray((Object[])new Tuple2[]{
            Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(predef.ArrowAssoc((Object)topicName), (Object)predef.int2Integer(2))}));

        // setupMock(..., 1, 1, ...) followed by a metadata update with 5.
        setupMock(partitionMock, topicPartition, 1, 1, setupMock$default$5(), setupMock$default$6());
        updateMetadata((Map<String, Integer>)topicPartitionCounts, 5);
        Assertions.assertEquals((int)1, (int)kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        EasyMock.verify(verifiedMocks);

        // setupMock(..., 5, 0, ...) followed by the same metadata update with 5.
        setupMock(partitionMock, topicPartition, 5, 0, setupMock$default$5(), setupMock$default$6());
        updateMetadata((Map<String, Integer>)topicPartitionCounts, 5);
        Assertions.assertEquals((int)1, (int)kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        EasyMock.verify(verifiedMocks);

        // setupMock(..., 5, 1, ...) followed by a metadata update with 6.
        setupMock(partitionMock, topicPartition, 5, 1, setupMock$default$5(), setupMock$default$6());
        updateMetadata((Map<String, Integer>)topicPartitionCounts, 6);
        Assertions.assertEquals((int)1, (int)kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        EasyMock.verify(verifiedMocks);

        // Removal with the flag set to true: the topic is still expected in the request builder,
        // and shutting down idle fetcher threads is expected to empty the thread map.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().removeLinkedFetcherForPartitions(
            (Set)predef.Set().apply((Seq)predef.wrapRefArray((Object[])new TopicPartition[]{topicPartition})), true);
        Assertions.assertEquals(
            Collections.singletonList(topicName),
            (Object)kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().newMetadataRequestBuilder().topics());
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().shutdownIdleFetcherThreads();
        Assertions.assertEquals((int)0, (int)kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        EasyMock.verify(verifiedMocks);

        // Re-add the partition; after the next metadata update a fetcher is expected again.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(linkedPartitions);
        setupMock(partitionMock, topicPartition, 6, 0, setupMock$default$5(), setupMock$default$6());
        updateMetadata((Map<String, Integer>)topicPartitionCounts, 6);
        Assertions.assertTrue((boolean)kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(topicPartition).nonEmpty());
        EasyMock.verify(verifiedMocks);

        // Shut down here and clear the field so tearDown() skips the manager.
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().shutdown();
        Assertions.assertEquals((int)0, (int)kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager_$eq(null);
    }

    /**
     * Checks that growing the source topic's partition count triggers
     * {@code createPartitions} requests on the destination admin client.
     *
     * <p>Sequence exercised: metadata reporting one source partition starts one fetcher
     * thread; metadata reporting four source partitions captures one request with a total
     * count of four; repeating that metadata captures a second identical request; after the
     * fixture's {@code numPartitions} is set to four, a further update captures nothing new.
     */
    @Test
    public void testAddSourcePartitions() {
        String topic = "testTopic";
        TopicPartition tp = new TopicPartition(topic, 0);
        Partition partition = (Partition)EasyMock.mock(Partition.class);
        int mockArg3 = this.setupMock$default$3();
        int mockArg4 = this.setupMock$default$4();
        boolean mockArg5 = this.setupMock$default$5();
        boolean mockArg6 = this.setupMock$default$6();
        this.setupMock(partition, tp, mockArg3, mockArg4, mockArg5, mockArg6);

        // Every createPartitions call succeeds immediately; all request maps are recorded.
        CreatePartitionsResult createResult = (CreatePartitionsResult)EasyMock.createNiceMock(CreatePartitionsResult.class);
        EasyMock.expect((Object)createResult.values()).andReturn(Collections.singletonMap(topic, KafkaFuture.completedFuture(null))).anyTimes();
        Capture createRequests = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        EasyMock.expect((Object)this.kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient().createPartitions((java.util.Map)EasyMock.capture((Capture)createRequests))).andReturn((Object)createResult).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager(), this.kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient(), createResult});

        this.kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions_$eq(1);
        int sourceEpoch = 1;

        // Source and fixture both at one partition: a fetcher thread starts.
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions((Iterable)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Partition[]{partition})));
        Map singlePartitionSource = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)Predef$.MODULE$.int2Integer(1))}));
        this.updateMetadata((Map<String, Integer>)singlePartitionSource, sourceEpoch);
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());

        // Source grows to four partitions: exactly one request asking for four in total.
        Map fourPartitionSource = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)Predef$.MODULE$.int2Integer(4))}));
        this.updateMetadata((Map<String, Integer>)fourPartitionSource, sourceEpoch);
        Assertions.assertEquals((int)1, (int)createRequests.getValues().size());
        java.util.Map firstRequest = (java.util.Map)createRequests.getValues().get(0);
        Assertions.assertEquals((int)1, (int)firstRequest.size());
        Assertions.assertEquals((int)4, (int)((NewPartitions)firstRequest.get(topic)).totalCount());

        // Same metadata again while the fixture still reports one partition: request is repeated.
        this.updateMetadata((Map<String, Integer>)fourPartitionSource, sourceEpoch);
        Assertions.assertEquals((int)2, (int)createRequests.getValues().size());
        java.util.Map secondRequest = (java.util.Map)createRequests.getValues().get(1);
        Assertions.assertEquals((int)1, (int)secondRequest.size());
        Assertions.assertEquals((int)4, (int)((NewPartitions)secondRequest.get(topic)).totalCount());

        // Once the fixture reports four partitions, no additional request is issued.
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions_$eq(4);
        this.updateMetadata((Map<String, Integer>)fourPartitionSource, sourceEpoch);
        Assertions.assertEquals((int)2, (int)createRequests.getValues().size());
    }

    /**
     * Walks the fetcher manager through a series of {@code reconfigure} calls and checks
     * which components are kept and which are replaced.
     *
     * <ol>
     *   <li>A dynamic client property ({@code ssl.truststore.location}) reconfigures the
     *       existing fetcher client once; fetcher thread and metadata are kept.</li>
     *   <li>A periodic-migration property ({@code AclSyncMsProp}) keeps fetcher thread and
     *       metadata and does not touch the fetcher client again.</li>
     *   <li>A non-dynamic property ({@code bootstrap.servers}) closes the fetcher client,
     *       drops the fetcher thread and replaces metadata and the metadata thread.</li>
     *   <li>Pausing the link closes the fetcher client and the metadata client.</li>
     *   <li>Changing {@code bootstrap.servers} while paused starts no fetcher.</li>
     *   <li>Unpausing creates a new, active metadata thread; no fetcher thread exists until
     *       metadata is updated.</li>
     * </ol>
     */
    @Test
    public void testReconfigure() {
        String topic = "testTopic";
        TopicPartition tp = new TopicPartition(topic, 0);
        Partition partition = (Partition)EasyMock.mock(Partition.class);
        this.setupMock(partition, tp, this.setupMock$default$3(), this.setupMock$default$4(), this.setupMock$default$5(), this.setupMock$default$6());
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().startMetadataThread();
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions((Iterable)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Partition[]{partition})));
        // Before any metadata arrives the partition has no fetcher, but its topic is tracked.
        Assertions.assertEquals((Object)None$.MODULE$, (Object)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(tp));
        Assertions.assertEquals((Object)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{topic})), this.metadataTopics());
        Map topics = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)Predef$.MODULE$.int2Integer(2))}));
        this.setupMock(partition, tp, 2, 1, this.setupMock$default$5(), this.setupMock$default$6());
        this.updateMetadata((Map<String, Integer>)topics, 2);
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());

        // Remember the first generation of components so later identity checks can tell
        // whether a reconfigure kept or replaced them.
        ClusterLinkFetcherThread fetcherThread1 = (ClusterLinkFetcherThread)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().values().head();
        ClusterLinkMetadata metadata1 = this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata();
        ClusterLinkMetadataThread metadataThread1 = this.metadataRefreshThread();
        ClusterLinkNetworkClient metadataClient1 = metadataThread1.clusterLinkClient();
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock(fetcherThread1, (scala.collection.immutable.Set<TopicPartition>)((scala.collection.immutable.Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{new TopicPartition(topic, 0)}))));
        ClusterLinkNetworkClient fetcherClient1 = fetcherThread1.clusterLinkClient();
        // Expect exactly one reconfigure call on the first fetcher client.
        fetcherClient1.reconfigure((java.util.Map)EasyMock.anyObject());
        EasyMock.expect((Object)BoxedUnit.UNIT).times(1);
        EasyMock.replay((Object[])new Object[]{fetcherClient1});

        // 1. Dynamic client property: components are kept, the client is reconfigured.
        HashMap<String, String> newDynamicProps = new HashMap<String, String>();
        newDynamicProps.putAll(this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentConfig().originalsStrings());
        newDynamicProps.put("ssl.truststore.location", "truststore.jks");
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().reconfigure(ClusterLinkConfig$.MODULE$.create(newDynamicProps), (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"ssl.truststore.location"})));
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        Assertions.assertSame((Object)fetcherThread1, (Object)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().values().head());
        Assertions.assertSame((Object)metadata1, (Object)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata());
        EasyMock.verify((Object[])new Object[]{fetcherClient1});

        // 2. Periodic-migration property. Populate this step's own map so the config handed
        //    to reconfigure really carries the current settings plus the new ACL sync interval.
        HashMap<String, String> newPeriodicMigrationProps = new HashMap<String, String>();
        newPeriodicMigrationProps.putAll(this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentConfig().originalsStrings());
        newPeriodicMigrationProps.put(ClusterLinkConfig$.MODULE$.AclSyncMsProp(), "120000");
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().reconfigure(ClusterLinkConfig$.MODULE$.create(newPeriodicMigrationProps), (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{ClusterLinkConfig$.MODULE$.AclSyncMsProp()})));
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        Assertions.assertSame((Object)fetcherThread1, (Object)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().values().head());
        Assertions.assertSame((Object)metadata1, (Object)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata());
        EasyMock.verify((Object[])new Object[]{fetcherClient1});

        // 3. Non-dynamic property: the old fetcher client must be closed exactly once and
        //    fetcher thread, metadata and metadata thread are all replaced.
        HashMap<String, String> newNonDynamicProps = new HashMap<String, String>();
        newNonDynamicProps.putAll(this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentConfig().originalsStrings());
        newNonDynamicProps.put("bootstrap.servers", "localhost:5678");
        EasyMock.reset((Object[])new Object[]{fetcherThread1.clusterLinkClient()});
        fetcherClient1.close();
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.replay((Object[])new Object[]{fetcherClient1});
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().reconfigure(ClusterLinkConfig$.MODULE$.create(newNonDynamicProps), (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"bootstrap.servers"})));
        Assertions.assertEquals((int)0, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        Assertions.assertNotSame((Object)metadata1, (Object)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata());
        Assertions.assertEquals((Object)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{topic})), this.metadataTopics());
        // A fresh metadata update brings up a new fetcher thread.
        this.updateMetadata((Map<String, Integer>)topics, 2);
        Assertions.assertNotSame((Object)fetcherThread1, (Object)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().values().head());
        Assertions.assertFalse((boolean)metadataClient1.networkClient().active(), (String)"Metadata client not closed");
        ClusterLinkMetadataThread metadataThread2 = this.metadataRefreshThread();
        Assertions.assertNotSame((Object)metadataThread1, (Object)metadataThread2);
        Assertions.assertNotSame((Object)metadataClient1, (Object)metadataThread2.clusterLinkClient());
        Assertions.assertTrue((boolean)metadataThread2.clusterLinkClient().networkClient().active(), (String)"Metadata client not active");
        EasyMock.verify((Object[])new Object[]{fetcherClient1});

        // Second generation of components.
        ClusterLinkFetcherThread fetcherThread2 = (ClusterLinkFetcherThread)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().values().head();
        ClusterLinkNetworkClient metadataClient2 = metadataThread2.clusterLinkClient();
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock(fetcherThread2, (scala.collection.immutable.Set<TopicPartition>)((scala.collection.immutable.Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{new TopicPartition(topic, 0)}))));
        ClusterLinkNetworkClient fetcherClient2 = fetcherThread2.clusterLinkClient();
        fetcherClient2.reconfigure((java.util.Map)EasyMock.anyObject());
        EasyMock.expect((Object)BoxedUnit.UNIT).times(1);
        EasyMock.replay((Object[])new Object[]{fetcherClient2});

        // 4. Pause the link. The reset below discards the reconfigure expectation recorded
        //    just above, so the only call expected on the second fetcher client is close().
        HashMap<String, String> pausedProps = new HashMap<String, String>();
        pausedProps.putAll(this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentConfig().originalsStrings());
        pausedProps.put(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "true");
        EasyMock.reset((Object[])new Object[]{fetcherThread2.clusterLinkClient()});
        fetcherClient2.close();
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.replay((Object[])new Object[]{fetcherClient2});
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().reconfigure(ClusterLinkConfig$.MODULE$.create(pausedProps), (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()})));
        Assertions.assertEquals((int)0, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        Assertions.assertFalse((boolean)metadataClient2.networkClient().active(), (String)"Metadata client not closed");
        EasyMock.verify((Object[])new Object[]{fetcherClient2});

        // 5. Non-dynamic change while still paused: no fetcher threads appear.
        HashMap<String, String> newNonDynamicButPausedProps = new HashMap<String, String>();
        newNonDynamicButPausedProps.putAll(this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentConfig().originalsStrings());
        newNonDynamicButPausedProps.put("bootstrap.servers", "localhost:6789");
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().reconfigure(ClusterLinkConfig$.MODULE$.create(newNonDynamicButPausedProps), (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"bootstrap.servers"})));
        Assertions.assertEquals((int)0, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        EasyMock.verify((Object[])new Object[]{fetcherClient2});

        // 6. Unpause: a new metadata thread with an active client is created and the topic
        //    is tracked again; fetcher threads are still absent at this point.
        HashMap<String, String> unpausedProps = new HashMap<String, String>();
        unpausedProps.putAll(this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentConfig().originalsStrings());
        unpausedProps.put(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "false");
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().reconfigure(ClusterLinkConfig$.MODULE$.create(unpausedProps), (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()})));
        Assertions.assertEquals((int)0, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        Assertions.assertEquals((Object)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{topic})), this.metadataTopics());
        ClusterLinkMetadataThread metadataThread3 = this.metadataRefreshThread();
        Assertions.assertNotSame((Object)metadataThread2, (Object)metadataThread3);
        Assertions.assertTrue((boolean)metadataThread3.clusterLinkClient().networkClient().active(), (String)"Metadata client not active");
        EasyMock.verify((Object[])new Object[]{fetcherClient2});
    }

    /**
     * Recreates the fetcher manager with the link-paused property set to {@code "true"}
     * and asserts that adding a linked partition leaves the fetcher thread map empty.
     */
    @Test
    public void testPausedFetcherStartup() {
        // Swap the fixture's manager for one built from a paused-link configuration.
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().shutdown();
        Properties pausedLinkProps = new Properties();
        pausedLinkProps.put("bootstrap.servers", "localhost:1234");
        pausedLinkProps.put(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "true");
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager_$eq(this.createLinkFetcherManager(pausedLinkProps));
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().startup();

        String topic = "testTopic";
        TopicPartition tp = new TopicPartition(topic, 0);
        Partition partition = (Partition)EasyMock.mock(Partition.class);

        // Only the fifth setupMock argument is overridden (to true); the rest use defaults.
        boolean fifthArgOverride = true;
        int thirdArgDefault = this.setupMock$default$3();
        int fourthArgDefault = this.setupMock$default$4();
        boolean sixthArgDefault = this.setupMock$default$6();
        this.setupMock(partition, tp, thirdArgDefault, fourthArgDefault, fifthArgOverride, sixthArgDefault);

        Set partitionsToRemove = (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{tp}));
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().removeLinkedFetcherForPartitions(partitionsToRemove, false);

        // Destination admin client accepts any createPartitions call and completes it at once.
        CreatePartitionsResult createResult = (CreatePartitionsResult)EasyMock.createNiceMock(CreatePartitionsResult.class);
        EasyMock.expect((Object)createResult.values()).andReturn(Collections.singletonMap(topic, KafkaFuture.completedFuture(null))).anyTimes();
        Capture createRequests = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        EasyMock.expect((Object)this.kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient().createPartitions((java.util.Map)EasyMock.capture((Capture)createRequests))).andReturn((Object)createResult).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager(), this.kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient(), createResult});

        Iterable partitionsToAdd = (Iterable)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Partition[]{partition}));
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions(partitionsToAdd);

        int fetcherThreadCount = this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size();
        Assertions.assertEquals((int)0, (int)fetcherThreadCount);
    }

    /**
     * Exercises {@code maybeNotifyReadyForFetch} and counts how many times the fetcher
     * thread's {@code notifyReadyForFetch} is invoked.
     *
     * <p>The call is made before the partition is linked, after it is linked but before a
     * fetcher exists, and then twice with a counting fetcher thread installed: once after
     * {@code setupLog$1(10L, 5L)} (count stays 0) and once after {@code setupLog$1(10L, 10L)}
     * (count becomes 1).
     */
    @Test
    public void testNotifyReadyForFetch() {
        String topic = "testTopic";
        TopicPartition tp = new TopicPartition(topic, 0);
        Partition partition = (Partition)EasyMock.mock(Partition.class);
        this.setupMock(partition, tp, this.setupMock$default$3(), this.setupMock$default$4(), this.setupMock$default$5(), this.setupMock$default$6());
        // Called before the partition has been added to the manager.
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().maybeNotifyReadyForFetch(partition);
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions((Iterable)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Partition[]{partition})));
        // Partition is linked but no fetcher has been assigned yet.
        Assertions.assertEquals((Object)None$.MODULE$, (Object)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(tp));
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().maybeNotifyReadyForFetch(partition);
        // Mutable counter incremented by the anonymous fetcher thread below.
        IntRef notificationCount = IntRef.create((int)0);
        // Fetcher thread stub that reports tp as its only partition and counts notifications.
        // NOTE(review): the constructor-style arguments and initializer block are decompiler
        // output of a Scala anonymous class and are not valid Java as written.
        ClusterLinkFetcherThread fetcherThread = new ClusterLinkFetcherThread(this, tp, notificationCount){
            private final TopicPartition tp$1;
            private final IntRef notificationCount$1;

            public Set<TopicPartition> partitions() {
                return (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.tp$1}));
            }

            public void notifyReadyForFetch() {
                super.notifyReadyForFetch();
                ++this.notificationCount$1.elem;
            }
            {
                this.tp$1 = tp$1;
                this.notificationCount$1 = notificationCount$1;
                super("thread-0", 0, $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig(), $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig(), new ClusterLinkMetadata($outer.kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig(), $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$linkName(), $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$linkId(), (LinkMode)LinkMode.Destination$.MODULE$, 100L, 60000L), $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager(), new BrokerEndPoint(0, "localhost", 1000), new FailedPartitions(), $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager(), null, $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$metrics(), (Time)new MockTime(), (Function0)new scala.Serializable(null){
                    public static final long serialVersionUID = 0L;

                    public final FetchResponseSize apply() {
                        return new FetchResponseSize(100, 1000);
                    }
                }, (ClusterLinkNetworkClient)EasyMock.mock(ClusterLinkNetworkClient.class), (BlockingSend)EasyMock.mock(BlockingSend.class), (Option)None$.MODULE$, (Option)None$.MODULE$);
            }
        };
        Map topics = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)Predef$.MODULE$.int2Integer(2))}));
        this.setupMock(partition, tp, this.setupMock$default$3(), this.setupMock$default$4(), this.setupMock$default$5(), this.setupMock$default$6());
        // Metadata update makes the manager create its own fetcher thread for the partition.
        this.updateMetadata((Map<String, Integer>)topics, 1);
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        // Replace that thread with the counting stub, reusing the existing map key.
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().put(this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().keySet().head(), (Object)fetcherThread);
        // NOTE(review): setupLog$1 is defined outside this block; its two arguments
        // presumably describe log offsets — confirm against its definition.
        this.setupLog$1(10L, 5L);
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().maybeNotifyReadyForFetch(partition);
        // With differing arguments (10, 5) no notification is delivered.
        Assertions.assertEquals((int)0, (int)notificationCount.elem);
        this.setupLog$1(10L, 10L);
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().maybeNotifyReadyForFetch(partition);
        // With equal arguments (10, 10) exactly one notification is delivered.
        Assertions.assertEquals((int)1, (int)notificationCount.elem);
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testSourceNotAvailable() {
        String topic = "testTopic";
        TopicPartition tp = new TopicPartition(topic, 0);
        Partition partition = (Partition)EasyMock.mock(Partition.class);
        this.setupMock(partition, tp, this.setupMock$default$3(), this.setupMock$default$4(), this.setupMock$default$5(), this.setupMock$default$6());
        AtomicReference currentCluster = new AtomicReference();
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().startMetadataThread();
        this.metadataRefreshThread().addListener(new MetadataListener(this, currentCluster){
            private final /* synthetic */ ClusterLinkFetcherManagerTest $outer;
            private final AtomicReference currentCluster$1;

            public void onMetadataFailure(Exception e) {
                MetadataListener.onMetadataFailure$((MetadataListener)this, (Exception)e);
            }

            public final void onNewMetadata(Cluster newCluster) {
                ClusterLinkFetcherManagerTest.kafka$server$link$ClusterLinkFetcherManagerTest$$$anonfun$testSourceNotAvailable$1(newCluster, this.currentCluster$1);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.currentCluster$1 = currentCluster$1;
                MetadataListener.$init$((MetadataListener)this);
            }
        });
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions((Iterable)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Partition[]{partition})));
        Assertions.assertEquals((Object)None$.MODULE$, (Object)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().getFetcher(tp));
        Assertions.assertEquals((Object)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{topic})), this.metadataTopics());
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkFetcherManagerTest.$anonfun$testSourceNotAvailable$3(currentCluster)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ClusterLinkFetcherManagerTest.$anonfun$testSourceNotAvailable$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        Map topics = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)Predef$.MODULE$.int2Integer(2))}));
        this.setupMock(partition, tp, 2, 1, this.setupMock$default$5(), this.setupMock$default$6());
        this.updateMetadata((Map<String, Integer>)topics, 2);
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        long l3 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l4 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!ClusterLinkFetcherManagerTest.$anonfun$testSourceNotAvailable$5(currentCluster)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ClusterLinkFetcherManagerTest.$anonfun$testSourceNotAvailable$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().onAvailabilityChange(false);
        Assertions.assertEquals((int)0, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());
        this.updateMetadata((Map<String, Integer>)topics, 2);
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().onAvailabilityChange(true);
        long l5 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l6 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime3 = System.currentTimeMillis();
        while (!ClusterLinkFetcherManagerTest.$anonfun$testSourceNotAvailable$7(currentCluster)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime3 + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ClusterLinkFetcherManagerTest.$anonfun$testSourceNotAvailable$8());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    /**
     * Two consecutive {@code LinkNotAvailable} failures (persistent failure not expected):
     * the reported API failure type is {@code LinkNotAvailable} and the time since the
     * first failure is at least the configured retry timeout.
     */
    @Test
    public void testPartitionStateSourceNotAvailable() {
        List<MirrorFailureType> failureSequence = (List<MirrorFailureType>)new .colon.colon((Object)MirrorFailureType.LinkNotAvailable$.MODULE$, (List)new .colon.colon((Object)MirrorFailureType.LinkNotAvailable$.MODULE$, (List)Nil$.MODULE$));
        PartitionAndState result = this.verifyPartitionFailureState(failureSequence, false);

        Object expectedFailure = new Some((Object)MirrorFailureType.LinkNotAvailable$.MODULE$);
        Assertions.assertEquals(expectedFailure, (Object)result.apiFailureType());
        Assertions.assertNotEquals((long)0L, (long)result.failureStartMs().get());

        long nowMs = this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds();
        long timeSinceFirstFailure = nowMs - result.failureStartMs().get();
        long retryTimeoutMs = (long)this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs();
        String failureMessage = "Incorrect timeSinceFirstFailure:  " + timeSinceFirstFailure;
        Assertions.assertTrue(timeSinceFirstFailure >= retryTimeoutMs, failureMessage);
    }

    /**
     * {@code LinkNotAvailable} followed by a single {@code SourceTopicUnavailable}
     * (persistent failure not expected): no API failure type is reported and the time
     * since the first failure is still below the configured retry timeout.
     */
    @Test
    public void testPartitionStateSourceNotAvailableTopicNotInMetadataOnce() {
        List<MirrorFailureType> failureSequence = (List<MirrorFailureType>)new .colon.colon((Object)MirrorFailureType.LinkNotAvailable$.MODULE$, (List)new .colon.colon((Object)MirrorFailureType.SourceTopicUnavailable$.MODULE$, (List)Nil$.MODULE$));
        PartitionAndState result = this.verifyPartitionFailureState(failureSequence, false);

        Assertions.assertEquals((Object)None$.MODULE$, (Object)result.apiFailureType());
        Assertions.assertNotEquals((long)0L, (long)result.failureStartMs().get());

        long nowMs = this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds();
        long timeSinceFirstFailure = nowMs - result.failureStartMs().get();
        long retryTimeoutMs = (long)this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs();
        String failureMessage = "Incorrect timeSinceFirstFailure:  " + timeSinceFirstFailure;
        Assertions.assertTrue(timeSinceFirstFailure < retryTimeoutMs, failureMessage);
    }

    /**
     * {@code LinkNotAvailable} followed by two {@code SourceTopicUnavailable} failures
     * (persistent failure expected): the reported API failure type is
     * {@code SourceTopicUnavailable} and the time since the first failure is at least the
     * configured retry timeout.
     */
    @Test
    public void testPartitionStateSourceNotAvailableTopicNotInMetadataForTimeout() {
        List<MirrorFailureType> failureSequence = (List<MirrorFailureType>)new .colon.colon((Object)MirrorFailureType.LinkNotAvailable$.MODULE$, (List)new .colon.colon((Object)MirrorFailureType.SourceTopicUnavailable$.MODULE$, (List)new .colon.colon((Object)MirrorFailureType.SourceTopicUnavailable$.MODULE$, (List)Nil$.MODULE$)));
        PartitionAndState result = this.verifyPartitionFailureState(failureSequence, true);

        Object expectedFailure = new Some((Object)MirrorFailureType.SourceTopicUnavailable$.MODULE$);
        Assertions.assertEquals(expectedFailure, (Object)result.apiFailureType());
        Assertions.assertNotEquals((long)0L, (long)result.failureStartMs().get());

        long nowMs = this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds();
        long timeSinceFirstFailure = nowMs - result.failureStartMs().get();
        long retryTimeoutMs = (long)this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs();
        String failureMessage = "Incorrect timeSinceFirstFailure:  " + timeSinceFirstFailure;
        Assertions.assertTrue(timeSinceFirstFailure >= retryTimeoutMs, failureMessage);
    }

    /**
     * Two consecutive {@code SourceTopicUnavailable} failures (persistent failure
     * expected): the reported API failure type is {@code SourceTopicUnavailable} and the
     * time since the first failure is at least the configured retry timeout.
     */
    @Test
    public void testPartitionStateTopicNotInMetadata() {
        List<MirrorFailureType> failureSequence = (List<MirrorFailureType>)new .colon.colon((Object)MirrorFailureType.SourceTopicUnavailable$.MODULE$, (List)new .colon.colon((Object)MirrorFailureType.SourceTopicUnavailable$.MODULE$, (List)Nil$.MODULE$));
        PartitionAndState result = this.verifyPartitionFailureState(failureSequence, true);

        Object expectedFailure = new Some((Object)MirrorFailureType.SourceTopicUnavailable$.MODULE$);
        Assertions.assertEquals(expectedFailure, (Object)result.apiFailureType());
        Assertions.assertNotEquals((long)0L, (long)result.failureStartMs().get());

        long nowMs = this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds();
        long timeSinceFirstFailure = nowMs - result.failureStartMs().get();
        long retryTimeoutMs = (long)this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs();
        String failureMessage = "Incorrect timeSinceFirstFailure:  " + timeSinceFirstFailure;
        Assertions.assertTrue(timeSinceFirstFailure >= retryTimeoutMs, failureMessage);
    }

    /**
     * A single {@code SourceTopicDeleted} failure (persistent failure expected): the
     * reported API failure type is {@code SourceTopicDeleted} and the time since the first
     * failure is below the configured retry timeout.
     */
    @Test
    public void testPartitionStateSourceTopicDeleted() {
        List<MirrorFailureType> failureSequence = (List<MirrorFailureType>)new .colon.colon((Object)MirrorFailureType.SourceTopicDeleted$.MODULE$, (List)Nil$.MODULE$);
        PartitionAndState result = this.verifyPartitionFailureState(failureSequence, true);

        Object expectedFailure = new Some((Object)MirrorFailureType.SourceTopicDeleted$.MODULE$);
        Assertions.assertEquals(expectedFailure, (Object)result.apiFailureType());
        Assertions.assertNotEquals((long)0L, (long)result.failureStartMs().get());

        long nowMs = this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds();
        long timeSinceFirstFailure = nowMs - result.failureStartMs().get();
        long retryTimeoutMs = (long)this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs();
        String failureMessage = "Incorrect timeSinceFirstFailure:  " + timeSinceFirstFailure;
        Assertions.assertTrue(timeSinceFirstFailure < retryTimeoutMs, failureMessage);
    }

    /**
     * Applies {@code SourceTopicUnavailable} followed by {@code SourceTopicDeleted} and checks
     * that the partition state ends up reporting {@code SourceTopicDeleted}, has a non-zero
     * failure start time, and that at least {@code retryTimeoutMs} has elapsed since the failure
     * started.
     */
    @Test
    public void testPartitionStateSourceNotAvailableSourceTopicDeleted() {
        List<MirrorFailureType> failures = (List<MirrorFailureType>)new .colon.colon((Object)MirrorFailureType.SourceTopicUnavailable$.MODULE$, (List)new .colon.colon((Object)MirrorFailureType.SourceTopicDeleted$.MODULE$, (List)Nil$.MODULE$));
        PartitionAndState state = this.verifyPartitionFailureState(failures, true);

        Assertions.assertEquals((Object)new Some((Object)MirrorFailureType.SourceTopicDeleted$.MODULE$), (Object)state.apiFailureType());
        Assertions.assertNotEquals((long)0L, (long)state.failureStartMs().get());

        long nowMs = this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds();
        long timeSinceFirstFailure = nowMs - state.failureStartMs().get();
        long retryTimeoutMs = (long)this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs();
        Assertions.assertTrue(timeSinceFirstFailure >= retryTimeoutMs, "Incorrect timeSinceFirstFailure:  " + timeSinceFirstFailure);
    }

    /*
     * WARNING - void declaration
     */
    private PartitionAndState verifyPartitionFailureState(List<MirrorFailureType> failures, boolean expectPersistentFailure) {
        String topic = "testTopic";
        TopicPartition tp = new TopicPartition(topic, 0);
        Partition partition = (Partition)EasyMock.mock(Partition.class);
        boolean x$3 = false;
        int x$4 = this.setupMock$default$3();
        int x$5 = this.setupMock$default$4();
        boolean x$6 = this.setupMock$default$5();
        this.setupMock(partition, tp, x$4, x$5, x$6, x$3);
        if (expectPersistentFailure) {
            partition.failClusterLink((Function1)EasyMock.anyObject());
            EasyMock.expect((Object)BoxedUnit.UNIT).once();
        }
        EasyMock.replay((Object[])new Object[]{partition});
        ConcurrentHashMap linkedPartitions = (ConcurrentHashMap)TestUtils.fieldValue((Object)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager(), ClusterLinkFetcherManager.class, (String)"linkedPartitions");
        AtomicReference currentCluster = new AtomicReference();
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().startMetadataThread();
        Assertions.assertEquals(Collections.emptyMap(), (Object)linkedPartitions);
        this.metadataRefreshThread().addListener(new MetadataListener(this, currentCluster){
            private final /* synthetic */ ClusterLinkFetcherManagerTest $outer;
            private final AtomicReference currentCluster$2;

            public void onMetadataFailure(Exception e) {
                MetadataListener.onMetadataFailure$((MetadataListener)this, (Exception)e);
            }

            public final void onNewMetadata(Cluster newCluster) {
                ClusterLinkFetcherManagerTest.kafka$server$link$ClusterLinkFetcherManagerTest$$$anonfun$verifyPartitionFailureState$1(newCluster, this.currentCluster$2);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.currentCluster$2 = currentCluster$2;
                MetadataListener.$init$((MetadataListener)this);
            }
        });
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions((Iterable)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Partition[]{partition})));
        Assertions.assertEquals((Object)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{topic})), this.metadataTopics());
        Assertions.assertEquals(Collections.singleton(tp), (Object)linkedPartitions.keySet());
        Assertions.assertEquals((Object)None$.MODULE$, (Object)((PartitionAndState)linkedPartitions.get(tp)).apiFailureType());
        Assertions.assertEquals((long)0L, (long)((PartitionAndState)linkedPartitions.get(tp)).failureStartMs().get());
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkFetcherManagerTest.$anonfun$verifyPartitionFailureState$3(currentCluster)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ClusterLinkFetcherManagerTest.$anonfun$verifyPartitionFailureState$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        failures.foreach((Function1 & Serializable & scala.Serializable)failureType -> {
            ClusterLinkFetcherManagerTest.$anonfun$verifyPartitionFailureState$5(this, tp, linkedPartitions, failureType);
            return BoxedUnit.UNIT;
        });
        return (PartitionAndState)linkedPartitions.get(tp);
    }

    /** Runs the linked-leader-epoch scenario with {@code Errors.NONE} and no expected link failure. */
    @Test
    public void testSuccessfulLinkedLeaderEpochUpdate() {
        final boolean expectLinkFailure = false;
        verifyLinkedLeaderEpochUpdate(Errors.NONE, expectLinkFailure);
    }

    /**
     * Runs the linked-leader-epoch scenario with {@code Errors.OPERATION_NOT_ATTEMPTED} and no
     * expected link failure.
     */
    @Test
    public void testFailedLinkedLeaderEpochUpdate() {
        final boolean expectLinkFailure = false;
        verifyLinkedLeaderEpochUpdate(Errors.OPERATION_NOT_ATTEMPTED, expectLinkFailure);
    }

    /**
     * Runs the linked-leader-epoch scenario with {@code Errors.UNKNOWN_SERVER_ERROR}, expecting
     * the link to be failed.
     */
    @Test
    public void testUnexpectedErrorInLinkedLeaderEpochUpdate() {
        final boolean expectLinkFailure = true;
        verifyLinkedLeaderEpochUpdate(Errors.UNKNOWN_SERVER_ERROR, expectLinkFailure);
    }

    /**
     * Shared scenario for the linked-leader-epoch tests.
     *
     * <p>Adds one mocked partition, publishes metadata with source epoch 1 and then 5, completes
     * the last captured {@code updateLinkedLeaderEpoch} callback with {@code updateError}, and
     * checks the time until the next metadata update before and after that completion.
     *
     * @param updateError error passed to the captured epoch-update callback
     * @param expectLinkFailure when {@code true}, the mock additionally expects one
     *        {@code failClusterLink} call and no early metadata refresh is asserted
     */
    private void verifyLinkedLeaderEpochUpdate(Errors updateError, boolean expectLinkFailure) {
        final String topicName = "testTopic";
        final TopicPartition topicPartition = new TopicPartition(topicName, 0);
        final Partition partition = (Partition)EasyMock.mock(Partition.class);
        final Capture epochUpdateCallbacks = EasyMock.newCapture((CaptureType)CaptureType.ALL);

        // Record phase: base expectations first (not replayed yet), then the test-specific ones.
        this.setupMock(partition, topicPartition, this.setupMock$default$3(), this.setupMock$default$4(), this.setupMock$default$5(), false);
        partition.updateLinkedLeaderEpoch(EasyMock.anyInt(), (Function1)EasyMock.capture((Capture)epochUpdateCallbacks));
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        // Two separate single-call expectations for linkedLeaderOffsetsPending(true).
        for (int i = 0; i < 2; i++) {
            partition.linkedLeaderOffsetsPending(true);
            EasyMock.expect((Object)BoxedUnit.UNIT).once();
        }
        if (expectLinkFailure) {
            partition.failClusterLink((Function1)EasyMock.anyObject());
            EasyMock.expect((Object)BoxedUnit.UNIT).once();
        }
        EasyMock.replay((Object[])new Object[]{partition});

        this.kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions_$eq(1);
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions((Iterable)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Partition[]{partition})));

        // First metadata update at source epoch 1.
        Map<String, Integer> initialTopics = (Map<String, Integer>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicName), (Object)Predef$.MODULE$.int2Integer(this.kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions()))})));
        this.updateMetadata(initialTopics, 1);
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());

        // Second metadata update at source epoch 5.
        Map<String, Integer> refreshedTopics = (Map<String, Integer>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicName), (Object)Predef$.MODULE$.int2Integer(this.kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions()))})));
        this.updateMetadata(refreshedTopics, 5);

        long timeToNextUpdate = this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().timeToNextUpdate(this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds());
        Assertions.assertTrue(timeToNextUpdate > 1000L, "Unnecessary metadata update after " + timeToNextUpdate);

        // Complete the most recently captured callback with the error under test.
        ((Function1)epochUpdateCallbacks.getValue()).apply((Object)updateError);
        timeToNextUpdate = this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().timeToNextUpdate(this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds());

        final boolean updateSucceeded = updateError != null && updateError.equals(Errors.NONE);
        if (updateSucceeded || expectLinkFailure) {
            Assertions.assertTrue(timeToNextUpdate > 1000L, "Unnecessary metadata update after " + timeToNextUpdate);
        } else {
            Assertions.assertEquals((Long)this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().metadataRefreshBackoffMs(), (long)timeToNextUpdate);
        }
    }

    /** Runs the link-failure scenario, completing the failure callback with {@code Errors.NONE}. */
    @Test
    public void testSuccessfulLinkFailureUpdate() {
        final Errors callbackResult = Errors.NONE;
        verifyLinkFailureUpdate(callbackResult);
    }

    /**
     * Runs the link-failure scenario, completing the failure callback with
     * {@code Errors.OPERATION_NOT_ATTEMPTED}.
     */
    @Test
    public void testFailedLinkFailureUpdate() {
        final Errors callbackResult = Errors.OPERATION_NOT_ATTEMPTED;
        verifyLinkFailureUpdate(callbackResult);
    }

    /**
     * Runs the link-failure scenario, completing the failure callback with
     * {@code Errors.UNKNOWN_SERVER_ERROR}.
     */
    @Test
    public void testUnexpectedErrorInLinkFailureUpdate() {
        final Errors callbackResult = Errors.UNKNOWN_SERVER_ERROR;
        verifyLinkFailureUpdate(callbackResult);
    }

    /**
     * Shared scenario for the link-failure tests.
     *
     * <p>Adds one mocked partition, publishes metadata containing the topic, then publishes empty
     * metadata twice: once immediately (no {@code failClusterLink} call expected yet) and once
     * after advancing the clock by {@code retryTimeoutMs} (exactly one call expected). The
     * captured callback is then completed with {@code updateError}. After both empty updates and
     * after the callback, the time to the next metadata update must equal
     * {@code metadataRefreshBackoffMs}.
     *
     * @param updateError error passed to the captured {@code failClusterLink} callback
     */
    private void verifyLinkFailureUpdate(Errors updateError) {
        final String topicName = "testTopic";
        final TopicPartition topicPartition = new TopicPartition(topicName, 0);
        final Partition partition = (Partition)EasyMock.mock(Partition.class);
        final Capture failLinkCallbacks = EasyMock.newCapture((CaptureType)CaptureType.ALL);

        // Record phase: base expectations (not replayed yet) plus a single failClusterLink call.
        this.setupMock(partition, topicPartition, this.setupMock$default$3(), this.setupMock$default$4(), this.setupMock$default$5(), false);
        partition.failClusterLink((Function1)EasyMock.capture((Capture)failLinkCallbacks));
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.replay((Object[])new Object[]{partition});

        this.kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions_$eq(1);
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().addLinkedFetcherForPartitions((Iterable)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Partition[]{partition})));

        // Metadata that contains the topic, at source epoch 1.
        Map<String, Integer> topicsPresent = (Map<String, Integer>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicName), (Object)Predef$.MODULE$.int2Integer(this.kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions()))})));
        this.updateMetadata(topicsPresent, 1);
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().fetcherThreadMap().size());

        // First empty metadata: backoff is scheduled, but the link is not failed yet.
        this.updateMetadata((Map<String, Integer>)Map$.MODULE$.empty(), -1);
        long timeToNextUpdate = this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().timeToNextUpdate(this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds());
        Assertions.assertEquals((Long)this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().metadataRefreshBackoffMs(), (long)timeToNextUpdate);
        Assertions.assertEquals(Collections.emptyList(), (Object)failLinkCallbacks.getValues());

        // Advance the clock by the retry timeout; the next empty metadata must fail the link once.
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().sleep((long)this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs());
        this.updateMetadata((Map<String, Integer>)Map$.MODULE$.empty(), -1);
        Assertions.assertEquals((int)1, (int)failLinkCallbacks.getValues().size());

        ((Function1)failLinkCallbacks.getValue()).apply((Object)updateError);
        timeToNextUpdate = this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata().timeToNextUpdate(this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds());
        Assertions.assertEquals((Long)this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().metadataRefreshBackoffMs(), (long)timeToNextUpdate);
    }

    /**
     * Builds the {@code ClusterLinkFetcherManager} under test from the given properties.
     *
     * <p>The properties are first turned into a {@code ClusterLinkConfig} that is stored on the
     * test instance. The returned manager is an anonymous subclass that overrides
     * {@code createFetcherThread} to return a nice mock and {@code partitionCount} to return the
     * test's {@code numPartitions}.
     *
     * @param props properties used to create the cluster link config
     * @return the fetcher manager wired to this test's fixtures
     */
    private ClusterLinkFetcherManager createLinkFetcherManager(Properties props) {
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig_$eq(ClusterLinkConfig$.MODULE$.create((java.util.Map)props));
        return new ClusterLinkFetcherManager(this){
            // Decompiler rendering of the captured enclosing test instance.
            private final /* synthetic */ ClusterLinkFetcherManagerTest $outer;

            // Returns a nice mock configured by setupFetcherThreadMock with that method's default partition set.
            public ClusterLinkFetcherThread createFetcherThread(int fetcherId, BrokerEndPoint sourceBroker, FetcherPool fetcherPool) {
                ClusterLinkFetcherThread thread = (ClusterLinkFetcherThread)EasyMock.createNiceMock(ClusterLinkFetcherThread.class);
                this.$outer.kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock(thread, this.$outer.kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock$default$2());
                return thread;
            }

            // Ignores the topic name and reports the test's current numPartitions value.
            public int partitionCount(String topic) {
                return this.$outer.kafka$server$link$ClusterLinkFetcherManagerTest$$numPartitions();
            }
            {
                // NOTE(review): decompiler artifact; this initializer shape (field assignment before super(...)) is not valid Java source.
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                super($outer.kafka$server$link$ClusterLinkFetcherManagerTest$$linkName(), $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$linkId(), $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig(), $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$linkManager(), $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$connManager(), $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$brokerConfig(), $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$replicaManager(), $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$destAdminClient(), (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$, $outer.kafka$server$link$ClusterLinkFetcherManagerTest$$metrics(), (Option)None$.MODULE$, (Time)$outer.kafka$server$link$ClusterLinkFetcherManagerTest$$time(), ClusterLinkFetcherManager$.MODULE$.$lessinit$greater$default$13());
            }
        };
    }

    /**
     * Publishes a new metadata snapshot to the fetcher manager.
     *
     * <p>Builds a metadata response for cluster {@code "sourceCluster"} with one node, in which
     * every partition reports {@code linkedLeaderEpoch}, applies it to the manager's current
     * metadata, and then notifies the manager through {@code onNewMetadata}.
     *
     * @param topics topic name to partition count
     * @param linkedLeaderEpoch epoch returned for every partition in the response
     */
    private void updateMetadata(Map<String, Integer> topics, int linkedLeaderEpoch) {
        final ClusterLinkMetadata linkMetadata = this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata();

        final java.util.Map responseTopics = (java.util.Map)((java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter(topics).asJava());
        final MetadataResponse response = RequestTestUtils.metadataUpdateWith((String)"sourceCluster", (int)1, Collections.emptyMap(), responseTopics, ignoredPartition -> Predef$.MODULE$.int2Integer(linkedLeaderEpoch), MetadataResponse.PartitionMetadata::new, (short)ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
        linkMetadata.update(linkMetadata.updateVersion(), response, false, this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().milliseconds());

        final ClusterLinkFetcherManager manager = this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager();
        final java.util.Map clusterTopics = (java.util.Map)((java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter(topics).asJava());
        manager.onNewMetadata(TestUtils.clusterWith((int)1, clusterTopics));
    }

    /**
     * Resets the partition mock and records the baseline expectations shared by the tests.
     *
     * @param partition mock to reset and program
     * @param tp value returned by {@code topicPartition()}
     * @param linkedLeaderEpoch value returned (wrapped in {@code Some}) by {@code getLinkedLeaderEpoch()}
     * @param numEpochUpdates when positive, the exact number of {@code updateLinkedLeaderEpoch} calls expected
     * @param clearOffsetsPending when truncation-on-fetch is unsupported and no epoch updates are
     *        expected, whether a single {@code linkedLeaderOffsetsPending(false)} call is expected
     * @param replayMock whether to switch the mock to replay mode before returning
     */
    private void setupMock(Partition partition, TopicPartition tp, int linkedLeaderEpoch, int numEpochUpdates, boolean clearOffsetsPending, boolean replayMock) {
        final boolean expectsEpochUpdates = numEpochUpdates > 0;

        EasyMock.reset((Object[])new Object[]{partition});

        // Stubbed getters, callable any number of times.
        EasyMock.expect((Object)partition.topicPartition()).andReturn((Object)tp).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)partition.isActiveLinkDestinationLeader())).andReturn((Object)BoxesRunTime.boxToBoolean((boolean)true)).anyTimes();
        EasyMock.expect((Object)partition.getLinkedTopicId()).andReturn((Object)this.sourceTopicId()).anyTimes();
        EasyMock.expect((Object)partition.getLinkedLeaderEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)linkedLeaderEpoch))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToInteger((int)partition.getLeaderEpoch())).andReturn((Object)BoxesRunTime.boxToInteger((int)10)).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)this.log()).anyTimes();
        EasyMock.expect((Object)partition.leaderLogIfLocal()).andReturn((Object)new Some((Object)this.log())).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)partition.isUnderMinIsr())).andReturn((Object)BoxesRunTime.boxToBoolean((boolean)false)).anyTimes();

        partition.truncateTo(0L, false);
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();

        if (expectsEpochUpdates) {
            partition.updateLinkedLeaderEpoch(EasyMock.anyInt(), (Function1)EasyMock.anyObject());
            EasyMock.expect((Object)BoxedUnit.UNIT).times(numEpochUpdates);
        }

        // linkedLeaderOffsetsPending expectations depend on whether the IBP supports truncation on fetch.
        final boolean truncationOnFetch = ApiVersion$.MODULE$.isTruncationOnFetchSupported(this.ibp());
        if (truncationOnFetch) {
            partition.linkedLeaderOffsetsPending(false);
            EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        } else if (expectsEpochUpdates) {
            partition.linkedLeaderOffsetsPending(true);
            EasyMock.expect((Object)BoxedUnit.UNIT).times(numEpochUpdates);
        } else if (clearOffsetsPending) {
            partition.linkedLeaderOffsetsPending(false);
            EasyMock.expect((Object)BoxedUnit.UNIT).once();
        }

        if (!replayMock) {
            return;
        }
        EasyMock.replay((Object[])new Object[]{partition});
    }

    /** Default value for the third parameter of {@code setupMock} ({@code linkedLeaderEpoch}). */
    private int setupMock$default$3() {
        final int defaultLinkedLeaderEpoch = 1;
        return defaultLinkedLeaderEpoch;
    }

    /** Default value for the fourth parameter of {@code setupMock} ({@code numEpochUpdates}). */
    private int setupMock$default$4() {
        final int defaultNumEpochUpdates = 0;
        return defaultNumEpochUpdates;
    }

    /** Default value for the fifth parameter of {@code setupMock} ({@code clearOffsetsPending}). */
    private boolean setupMock$default$5() {
        final boolean defaultClearOffsetsPending = false;
        return defaultClearOffsetsPending;
    }

    /**
     * Default value for the sixth parameter of {@code setupMock}; by position in the
     * {@code $default$N} naming this is {@code replayMock}.
     */
    private boolean setupMock$default$6() {
        final boolean defaultReplayMock = true;
        return defaultReplayMock;
    }

    /**
     * Resets the fetcher thread mock, programs it, and switches it to replay mode.
     *
     * <p>Programmed behaviour: {@code partitionsAndOffsets()} maps each given partition to one
     * shared nice-mock {@code InitialFetchState}; {@code fetchState(any)} returns
     * {@code Some} of a nice-mock {@code PartitionFetchState}; {@code clusterLinkClient()}
     * returns a nice-mock client; {@code shutdown()} closes that client; and
     * {@code removePartitions(...)} answers with a map from each removed partition to the mock
     * fetch state.
     *
     * @param fetcherThread mock to program
     * @param partitions partitions reported by {@code partitionsAndOffsets()}
     */
    public void kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock(ClusterLinkFetcherThread fetcherThread, scala.collection.immutable.Set<TopicPartition> partitions) {
        EasyMock.reset((Object[])new Object[]{fetcherThread});

        // partitionsAndOffsets(): every partition shares the same mock initial fetch state.
        final InitialFetchState sharedInitialState = (InitialFetchState)EasyMock.createNiceMock(InitialFetchState.class);
        final scala.collection.immutable.Map offsetsByPartition = ((TraversableOnce)partitions.map((Function1 & Serializable & scala.Serializable)assigned -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(assigned), (Object)sharedInitialState), Set$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        EasyMock.expect((Object)fetcherThread.partitionsAndOffsets()).andReturn((Object)offsetsByPartition).anyTimes();

        // fetchState(any): always the same mock fetch state.
        final PartitionFetchState sharedFetchState = (PartitionFetchState)EasyMock.createNiceMock(PartitionFetchState.class);
        EasyMock.expect((Object)fetcherThread.fetchState((TopicPartition)EasyMock.anyObject())).andReturn((Object)new Some((Object)sharedFetchState)).anyTimes();

        // clusterLinkClient() and shutdown(): shutting the thread down closes the mock client.
        final ClusterLinkNetworkClient networkClient = (ClusterLinkNetworkClient)EasyMock.createNiceMock(ClusterLinkNetworkClient.class);
        EasyMock.expect((Object)fetcherThread.clusterLinkClient()).andReturn((Object)networkClient).anyTimes();
        fetcherThread.shutdown();
        EasyMock.expect((Object)BoxedUnit.UNIT).andAnswer(() -> {
            networkClient.close();
            return BoxedUnit.UNIT;
        }).anyTimes();

        // removePartitions(...): echo the captured partitions back, each mapped to the mock fetch state.
        final Capture removedCapture = EasyMock.newCapture();
        EasyMock.expect((Object)fetcherThread.removePartitions((Set)EasyMock.capture((Capture)removedCapture))).andAnswer(() -> ((TraversableOnce)((SetLike)removedCapture.getValue()).map((Function1 & Serializable & scala.Serializable)removed -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(removed), (Object)sharedFetchState), Set$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));

        EasyMock.replay((Object[])new Object[]{fetcherThread});
    }

    /** Default value for the second parameter of {@code setupFetcherThreadMock}: an empty set. */
    public scala.collection.immutable.Set<TopicPartition> kafka$server$link$ClusterLinkFetcherManagerTest$$setupFetcherThreadMock$default$2() {
        final scala.collection.immutable.Set<TopicPartition> noPartitions = Predef$.MODULE$.Set().empty();
        return noPartitions;
    }

    /**
     * Returns the topics that the fetcher manager's current metadata would put in a new metadata
     * request, as a Scala set.
     */
    private scala.collection.immutable.Set<String> metadataTopics() {
        final ClusterLinkMetadata linkMetadata = this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager().currentMetadata();
        return ((TraversableOnce)CollectionConverters$.MODULE$.asScalaBufferConverter(linkMetadata.newMetadataRequestBuilder().topics()).asScala()).toSet();
    }

    /**
     * Reads the fetcher manager's {@code metadataRefreshThread} field via
     * {@code TestUtils.fieldValue}.
     */
    private ClusterLinkMetadataThread metadataRefreshThread() {
        final ClusterLinkFetcherManager manager = this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager();
        final Object fieldValue = TestUtils.fieldValue((Object)manager, ClusterLinkFetcherManager.class, (String)"metadataRefreshThread");
        return (ClusterLinkMetadataThread)fieldValue;
    }

    /**
     * Resets the log mock and stubs its end offset and high watermark, then replays it.
     *
     * @param leo value returned by {@code logEndOffset()}
     * @param hwm value returned by {@code highWatermark()}
     */
    private final void setupLog$1(long leo, long hwm) {
        EasyMock.reset((Object[])new Object[]{this.log()});

        final Object boxedLeo = (Object)BoxesRunTime.boxToLong((long)leo);
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)this.log().logEndOffset())).andReturn(boxedLeo).anyTimes();

        final Object boxedHwm = (Object)BoxesRunTime.boxToLong((long)hwm);
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)this.log().highWatermark())).andReturn(boxedHwm).anyTimes();

        EasyMock.replay((Object[])new Object[]{this.log()});
    }

    /**
     * Synthetic lambda body: stores the given cluster into the shared reference.
     */
    public static final /* synthetic */ void kafka$server$link$ClusterLinkFetcherManagerTest$$$anonfun$testSourceNotAvailable$1(Cluster cluster, AtomicReference currentCluster$1) {
        final Cluster latestCluster = cluster;
        currentCluster$1.set(latestCluster);
    }

    /**
     * Synthetic lambda body: clears the reference and reports whether it held a non-null value.
     */
    public static final /* synthetic */ boolean $anonfun$testSourceNotAvailable$3(AtomicReference currentCluster$1) {
        final Object previous = currentCluster$1.getAndSet(null);
        return previous != null;
    }

    /** Synthetic lambda body: failure message used when the metadata wait times out. */
    public static final /* synthetic */ String $anonfun$testSourceNotAvailable$4() {
        final String message = "Metadata not updated";
        return message;
    }

    /**
     * Synthetic lambda body: clears the reference and reports whether it held a non-null value.
     */
    public static final /* synthetic */ boolean $anonfun$testSourceNotAvailable$5(AtomicReference currentCluster$1) {
        final Object previous = currentCluster$1.getAndSet(null);
        return previous != null;
    }

    /** Synthetic lambda body: failure message used when the metadata wait times out. */
    public static final /* synthetic */ String $anonfun$testSourceNotAvailable$6() {
        final String message = "Metadata not updated";
        return message;
    }

    /**
     * Synthetic lambda body: clears the reference and reports whether it held a non-null value.
     */
    public static final /* synthetic */ boolean $anonfun$testSourceNotAvailable$7(AtomicReference currentCluster$1) {
        final Object previous = currentCluster$1.getAndSet(null);
        return previous != null;
    }

    /** Synthetic lambda body: failure message used when the metadata wait times out. */
    public static final /* synthetic */ String $anonfun$testSourceNotAvailable$8() {
        final String message = "Metadata not updated";
        return message;
    }

    /**
     * Synthetic lambda body: stores the given cluster into the shared reference.
     */
    public static final /* synthetic */ void kafka$server$link$ClusterLinkFetcherManagerTest$$$anonfun$verifyPartitionFailureState$1(Cluster cluster, AtomicReference currentCluster$2) {
        final Cluster latestCluster = cluster;
        currentCluster$2.set(latestCluster);
    }

    /**
     * Synthetic lambda body: clears the reference and reports whether it held a non-null value.
     */
    public static final /* synthetic */ boolean $anonfun$verifyPartitionFailureState$3(AtomicReference currentCluster$2) {
        final Object previous = currentCluster$2.getAndSet(null);
        return previous != null;
    }

    /** Synthetic lambda body: failure message used when the metadata wait times out. */
    public static final /* synthetic */ String $anonfun$verifyPartitionFailureState$4() {
        final String message = "Metadata not updated";
        return message;
    }

    /**
     * Synthetic lambda body applied to each failure type: advances the test clock by
     * {@code retryTimeoutMs}, reports the failure for the partition to the fetcher manager (using
     * the failure type's string form as the third argument and the default fourth argument), and
     * asserts that the partition's failure start time is non-zero.
     */
    public static final /* synthetic */ void $anonfun$verifyPartitionFailureState$5(ClusterLinkFetcherManagerTest $this, TopicPartition tp$2, ConcurrentHashMap linkedPartitions$1, MirrorFailureType failureType) {
        $this.kafka$server$link$ClusterLinkFetcherManagerTest$$time().sleep((long)$this.kafka$server$link$ClusterLinkFetcherManagerTest$$clusterLinkConfig().retryTimeoutMs());

        final ClusterLinkFetcherManager manager = $this.kafka$server$link$ClusterLinkFetcherManagerTest$$fetcherManager();
        final String failureText = failureType.toString();
        final boolean defaultFourthArg = manager.onPartitionLinkFailure$default$4();
        manager.onPartitionLinkFailure(tp$2, failureType, failureText, defaultFourthArg);

        final PartitionAndState trackedState = (PartitionAndState)linkedPartitions$1.get(tp$2);
        Assertions.assertNotEquals((long)0L, (long)trackedState.failureStartMs().get());
    }

    public ClusterLinkFetcherManagerTest() {
        this.kafka$server$link$ClusterLinkFetcherManagerTest$$linkName = "testLink";
    }
}

