package kafka.server.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import kafka.api.PartitionLinkState;
import kafka.cluster.ClusterLinkState;
import kafka.cluster.Partition;
import kafka.cluster.PartitionState;
import kafka.controller.ControllerContext;
import kafka.controller.KafkaController;
import kafka.network.SocketServer;
import kafka.server.BrokerEpochManager;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerFeatures$;
import kafka.server.ClusterLinkQuotas;
import kafka.server.ClusterLinkQuotas$;
import kafka.server.FetcherPool$Default$;
import kafka.server.FetcherTag;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache;
import kafka.server.MetadataCache$;
import kafka.server.MetadataSupport;
import kafka.server.ReplicaManager;
import kafka.server.ZkAdminManager;
import kafka.server.ZkSupport;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkFailed;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkTopicState;
import kafka.server.metadata.ZkMetadataCache;
import kafka.utils.CoreUtils$;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.ClusterLinkError;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.ClusterLinkExistsException;
import org.apache.kafka.common.errors.ClusterLinkNotFoundException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.server.NodeToControllerChannelManager;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.Set;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Growable;
import scala.collection.mutable.Map$;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LongRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkManagerTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\r=d\u0001B(Q\u0001]CQA\u0018\u0001\u0005\u0002}CqA\u0019\u0001C\u0002\u0013%1\r\u0003\u0004i\u0001\u0001\u0006I\u0001\u001a\u0005\bS\u0002\u0011\r\u0011\"\u0003k\u0011\u00199\b\u0001)A\u0005W\"9\u0001\u0010\u0001b\u0001\n\u0013I\bbBA\u0001\u0001\u0001\u0006IA\u001f\u0005\n\u0003\u0007\u0001!\u0019!C\u0005\u0003\u000bA\u0001\"!\u0004\u0001A\u0003%\u0011q\u0001\u0005\n\u0003\u001f\u0001!\u0019!C\u0005\u0003#A\u0001\"!\b\u0001A\u0003%\u00111\u0003\u0005\n\u0003?\u0001!\u0019!C\u0005\u0003CA\u0001\"!\u000b\u0001A\u0003%\u00111\u0005\u0005\n\u0003W\u0001!\u0019!C\u0005\u0003[A\u0001\"a\u000f\u0001A\u0003%\u0011q\u0006\u0005\n\u0003{\u0001!\u0019!C\u0005\u0003\u007fA\u0001\"a\u0012\u0001A\u0003%\u0011\u0011\t\u0005\f\u0003\u0013\u0002\u0001\u0019!a\u0001\n\u0013\tY\u0005C\u0006\u0002T\u0001\u0001\r\u00111A\u0005\n\u0005U\u0003bCA1\u0001\u0001\u0007\t\u0011)Q\u0005\u0003\u001bB1\"a\u0019\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002f!Y\u0011Q\u000e\u0001A\u0002\u0003\u0007I\u0011BA8\u0011-\t\u0019\b\u0001a\u0001\u0002\u0003\u0006K!a\u001a\t\u0013\u0005U\u0004A1A\u0005\n\u0005]\u0004\u0002CAH\u0001\u0001\u0006I!!\u001f\t\u000f\u0005E\u0005\u0001\"\u0001\u0002\u0014\"9\u00111\u0016\u0001\u0005\u0002\u0005M\u0005bBA[\u0001\u0011\u0005\u00111\u0013\u0005\b\u0003\u007f\u0003A\u0011AAJ\u0011\u001d\t\u0019\r\u0001C\u0001\u0003'Cq!a2\u0001\t\u0003\t\u0019\nC\u0004\u0002L\u0002!\t!a%\t\u000f\u0005=\u0007\u0001\"\u0001\u0002\u0014\"9\u00111\u001b\u0001\u0005\n\u0005M\u0005bBAk\u0001\u0011%\u00111\u0013\u0005\b\u0003/\u0004A\u0011BAJ\u0011\u001d\tI\u000e\u0001C\u0005\u0003'Cq!a7\u0001\t\u0013\t\u0019\nC\u0004\u0002^\u0002!\t!a8\t\u000f\tM\u0001\u0001\"\u0001\u0002\u0014\"9!q\u0003\u0001\u0005\u0002\u0005M\u0005b\u0002B\u000e\u0001\u0011\u0005\u00111\u0013\u0005\b\u0005?\u0001A\u0011\u0002B\u0011\u0011\u001d\u0011i\u0003\u0001C\u0001\u0003'CqA!\r\u0001\t\u0003\t\u0019\nC\u0004\u00036\u0001!\t!a%\t\u000f\te\u0002\u0001\"\u0001\u0002\u0014\"9!\u0011\n\u0001\u0005\u0002\u0005M\u0005b\u0002B'\u0001\u0011\u0005\u00111\u0013\u0005\b\u0005#\u0002A\u0011AAJ\u0011\u001d\u0011)\u0006\u0001C\u0001\u0005/BqA!\u0018\u0001\t\u0003\t\u0019\nC\u0004\u0003b\u0001!\t!a%\t\u000f\t\u0015\u0004\u0001\"\u0001\u0002\u0014\"9!\u0011\u000e\u0001\u0005\u0002\u0005M\u0005b\u0002B7\u0001\u0011\u0005\u00111\u0013\u0005\b\u0005c\u0002A\u0011AAJ\u0011\u001d\u0011)\b\u0001C\u0001\u0003'CqA!\u001f\u0001\t\u0003\t\u0019\nC\u0004\u0003~\u0001!IAa \t\u000f\t%\u0007\u0001\"\u0001\u0002\u0014\"9!Q\u001a\u0001\u0005\u0002\u0005M\u0005b\u0002Bi\u0001\u0011\u0005\u00111\u0013\u0005\b\u0005+\u0004A\u0011AAJ\u0011\u001d\u0011I\u000e\u0001C\u0005\u00057DqA!:\u0001\t\u0013\u00119\u000fC\u0004\u0003j\u0002!\tAa;\t\u000f\t]\b\u0001\"\u0003\u0003z\"9!1 \u0001\u0005\n\tu\bbBB\u0003\u0001\u0011%1q\u0001\u0005\b\u0007W\u0001A\u0011BB\u0017\u0011\u001d\u0019Y\u0004\u0001C\u0005\u0007{Aqaa\u0013\u0001\t\u0013\u0019ieB\u0004\u0004TAC\ta!\u0016\u0007\r=\u0003\u0006\u0012AB,\u0011\u0019q6\n\"\u0001\u0004Z!911L&\u0005\u0002\u0005M\u0005bBB3\u0017\u0012\u0005\u00111\u0013\u0002\u0017\u00072,8\u000f^3s\u0019&t7.T1oC\u001e,'\u000fV3ti*\u0011\u0011KU\u0001\u0005Y&t7N\u0003\u0002T)\u000611/\u001a:wKJT\u0011!V\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001\u0001\f\u0005\u0002Z96\t!LC\u0001\\\u0003\u0015\u00198-\u00197b\u0013\ti&L\u0001\u0004B]f\u0014VMZ\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003\u0001\u0004\"!\u0019\u0001\u000e\u0003A\u000bAB\u0019:pW\u0016\u00148i\u001c8gS\u001e,\u0012\u0001\u001a\t\u0003K\u001al\u0011AU\u0005\u0003OJ\u00131bS1gW\u0006\u001cuN\u001c4jO\u0006i!M]8lKJ\u001cuN\u001c4jO\u0002\nq!\\3ue&\u001c7/F\u0001l!\taW/D\u0001n\u0015\tIgN\u0003\u0002pa\u000611m\\7n_:T!!V9\u000b\u0005I\u001c\u0018AB1qC\u000eDWMC\u0001u\u0003\ry'oZ\u0005\u0003m6\u0014q!T3ue&\u001c7/\u0001\u0005nKR\u0014\u0018nY:!\u0003\u0011!\u0018.\\3\u0016\u0003i\u0004\"a\u001f@\u000e\u0003qT!! 8\u0002\u000bU$\u0018\u000e\\:\n\u0005}d(\u0001C'pG.$\u0016.\\3\u0002\u000bQLW.\u001a\u0011\u0002\u001dI,\u0007\u000f\\5dC6\u000bg.Y4feV\u0011\u0011q\u0001\t\u0004K\u0006%\u0011bAA\u0006%\nq!+\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\u0018a\u0004:fa2L7-Y'b]\u0006<WM\u001d\u0011\u0002\u0015\r|g\u000e\u001e:pY2,'/\u0006\u0002\u0002\u0014A!\u0011QCA\r\u001b\t\t9BC\u0002\u0002\u0010QKA!a\u0007\u0002\u0018\ty1*\u00194lC\u000e{g\u000e\u001e:pY2,'/A\u0006d_:$(o\u001c7mKJ\u0004\u0013!E2p]R\u0014x\u000e\u001c7fe\u000e{g\u000e^3yiV\u0011\u00111\u0005\t\u0005\u0003+\t)#\u0003\u0003\u0002(\u0005]!!E\"p]R\u0014x\u000e\u001c7fe\u000e{g\u000e^3yi\u0006\u00112m\u001c8ue>dG.\u001a:D_:$X\r\u001f;!\u0003!Q8n\u00117jK:$XCAA\u0018!\u0011\t\t$a\u000e\u000e\u0005\u0005M\"bAA\u001b)\u0006\u0011!p[\u0005\u0005\u0003s\t\u0019DA\u0007LC\u001a\\\u0017MW6DY&,g\u000e^\u0001\nu.\u001cE.[3oi\u0002\nQ\"\\3uC\u0012\fG/Y\"bG\",WCAA!!\r)\u00171I\u0005\u0004\u0003\u000b\u0012&!D'fi\u0006$\u0017\r^1DC\u000eDW-\u0001\bnKR\fG-\u0019;b\u0007\u0006\u001c\u0007.\u001a\u0011\u0002%\rdWo\u001d;fe2Kgn['b]\u0006<WM]\u000b\u0003\u0003\u001b\u00022!YA(\u0013\r\t\t\u0006\u0015\u0002\u0013\u00072,8\u000f^3s\u0019&t7.T1oC\u001e,'/\u0001\fdYV\u001cH/\u001a:MS:\\W*\u00198bO\u0016\u0014x\fJ3r)\u0011\t9&!\u0018\u0011\u0007e\u000bI&C\u0002\u0002\\i\u0013A!\u00168ji\"I\u0011qL\n\u0002\u0002\u0003\u0007\u0011QJ\u0001\u0004q\u0012\n\u0014aE2mkN$XM\u001d'j].l\u0015M\\1hKJ\u0004\u0013!\u0003>l'V\u0004\bo\u001c:u+\t\t9\u0007E\u0002f\u0003SJ1!a\u001bS\u0005%Q6nU;qa>\u0014H/A\u0007{WN+\b\u000f]8si~#S-\u001d\u000b\u0005\u0003/\n\t\bC\u0005\u0002`Y\t\t\u00111\u0001\u0002h\u0005Q!p[*vaB|'\u000f\u001e\u0011\u0002;Q|\u0007/[2D_:4\u0017nZ*z]\u000eLen\u00197vI\u0016$UMZ1vYR,\"!!\u001f\u0011\t\u0005m\u0014\u0011\u0012\b\u0005\u0003{\n)\tE\u0002\u0002��ik!!!!\u000b\u0007\u0005\re+\u0001\u0004=e>|GOP\u0005\u0004\u0003\u000fS\u0016A\u0002)sK\u0012,g-\u0003\u0003\u0002\f\u00065%AB*ue&twMC\u0002\u0002\bj\u000ba\u0004^8qS\u000e\u001cuN\u001c4jONKhnY%oG2,H-\u001a#fM\u0006,H\u000e\u001e\u0011\u0002\u000bM,G/\u00169\u0015\u0005\u0005]\u0003f\u0001\u000e\u0002\u0018B!\u0011\u0011TAT\u001b\t\tYJ\u0003\u0003\u0002\u001e\u0006}\u0015aA1qS*!\u0011\u0011UAR\u0003\u001dQW\u000f]5uKJT1!!*t\u0003\u0015QWO\\5u\u0013\u0011\tI+a'\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.\u0001\u0005uK\u0006\u0014Hi\\<oQ\rY\u0012q\u0016\t\u0005\u00033\u000b\t,\u0003\u0003\u00024\u0006m%!C!gi\u0016\u0014X)Y2i\u0003A!Xm\u001d;DYV\u001cH/\u001a:MS:\\7\u000fK\u0002\u001d\u0003s\u0003B!!'\u0002<&!\u0011QXAN\u0005\u0011!Vm\u001d;\u0002/R,7\u000f^\"sK\u0006$Xm\u00117vgR,'\u000fT5oWJ+'.Z2ugJ+\u0017/^3ti&3G+\u001a8b]R\u0004&/\u001a4jq&\u001bh*\u001e7m\u0003:$7I]3bi\u0016\u001cE.^:uKJd\u0015N\\6Q_2L7-_#ySN$8\u000fK\u0002\u001e\u0003s\u000bq\u0002^3tiJ+7m\u001c8gS\u001e,(/\u001a\u0015\u0004=\u0005e\u0016!\u0006;fgR\u001cuN\u001c8fGRLwN\\'b]\u0006<WM\u001d\u0015\u0004?\u0005e\u0016a\u0006;fgR\u0014V\r^3oi&|gnQ8oM&<7+\u001f8dQ\r\u0001\u0013\u0011X\u0001(i\u0016\u001cH\u000fV8qS\u000e\u001cuN\u001c4jONKhnY%oG2,H-Z\"p[B\fG/\u001b2jY&$\u0018\u0010K\u0002\"\u0003s\u000bQB^3sS\u001aLH)\u001a4bk2$\u0018a\u0007<fe&4\u0017PW&ICNl\u0015\r\u001c4pe6,GmQ8oM&<7/A\rwKJLg-\u001f.L\u0011\u0006\u001cXK\\6o_^t7i\u001c8gS\u001e\u001c\u0018a\u0007<fe&4\u0017PW&NSN\u001cXm]!mo\u0006L8oQ8oM&<7/A\u000fwKJLg-\u001f.L\u0011\u0006\u001c\u0018J\u001c3fa\u0016tG-\u001a8u\u0007>tg-[4t\u0003E\u0019'/Z1uK\u000ecWo\u001d;fe2Kgn\u001b\u000b\t\u0003C\fy/a=\u0002��B!\u00111]Au\u001d\r\t\u0017Q]\u0005\u0004\u0003O\u0004\u0016AE\"mkN$XM\u001d'j].4\u0015m\u0019;pefLA!a;\u0002n\nqa)\u001a;dQ\u0016\u0014X*\u00198bO\u0016\u0014(bAAt!\"9\u0011\u0011_\u0014A\u0002\u0005e\u0014\u0001\u00037j].t\u0015-\\3\t\u000f\u0005Ux\u00051\u0001\u0002x\u00061A.\u001b8l\u0013\u0012\u0004B!!?\u0002|6\ta.C\u0002\u0002~:\u0014A!V;jI\"9!\u0011A\u0014A\u0002\t\r\u0011\u0001E2mkN$XM\u001d'j].\u0004&o\u001c9t!\u0011\u0011)Aa\u0004\u000e\u0005\t\u001d!\u0002\u0002B\u0005\u0005\u0017\tA!\u001e;jY*\u0011!QB\u0001\u0005U\u00064\u0018-\u0003\u0003\u0003\u0012\t\u001d!A\u0003)s_B,'\u000f^5fg\u0006AB/Z:u\r\u0006LG.\u001a3BI\u0012\u001cE.^:uKJd\u0015N\\6)\u0007!\nI,\u0001\u0012uKN$(+Z2p]\u001aLw-\u001e:f\u001d>t'+\u001a;sS\u0006\u0014G.\u001a$bS2,(/\u001a\u0015\u0004S\u0005e\u0016!\r;fgR\u0014V\r\u001e:z%\u0016\u001cwN\u001c4jOV\u0014XMU3ue&\f'\r\\3GC&dWO]3P]2Kgn\u001b*v]:Lgn\u001a\u0015\u0004U\u0005e\u0016AD2p]R\f\u0017N\\:NKR\u0014\u0018n\u0019\u000b\u0005\u0005G\u0011I\u0003E\u0002Z\u0005KI1Aa\n[\u0005\u001d\u0011un\u001c7fC:DqAa\u000b,\u0001\u0004\tI(\u0001\u0003oC6,\u0017!\r;fgR\u0014V\r\u001e:z%\u0016\u001cwN\u001c4jOV\u0014XMU3ue&\f'\r\\3GC&dWO]3P]2Kgn[*uCJ$X\u000f\u001d\u0015\u0004Y\u0005e\u0016!\u0006;fgR$U\r\\3uK\u000ecWo\u001d;fe2Kgn\u001b\u0015\u0004[\u0005e\u0016A\t;fgR\u001cuN\u001c;s_2dWM\u001d'jgR,g.\u001a:NSJ\u0014xN]*uCR,7\u000fK\u0002/\u0003s\u000b1\u0004^3ti2K7\u000f^\"mkN$XM\u001d'j].\u001cHj\\2lS:<\u0007fA\u0018\u0002:\":qFa\u0010\u0003F\t\u001d\u0003\u0003BAM\u0005\u0003JAAa\u0011\u0002\u001c\n9A+[7f_V$\u0018!\u0002<bYV,g$\u0001\u000b\u0002!Q,7\u000f\u001e*fiJLHj\\2lS:<\u0007f\u0001\u0019\u0002:\u0006)C/Z:u\u001d>tw\n\u001d;j[&TX\r\u001a(p]\ncwnY6j]\u001edunY1m\u0003\u0012l\u0017N\u001c\u0015\u0004c\u0005e\u0016A\t;fgR|\u0005\u000f^5nSj,GMT8o\u00052|7m[5oO2{7-\u00197BI6Lg\u000eK\u00023\u0003s\u000b1D^3sS\u001aLhj\u001c8CY>\u001c7.\u001b8h\u0019>\u001c\u0017\r\\!e[&tG\u0003BA,\u00053BqAa\u00174\u0001\u0004\u0011\u0019#A\u000bf]\u0006\u0014G.Z(qi&l\u0017N_3e\u00072LWM\u001c;\u00025Q,7\u000f\u001e+f[B|'/\u0019:z\u0003\u0012l\u0017N\\%t\u00072|7/\u001a3)\u0007Q\nI,\u0001\u0011uKN$hj\u001c8UK6\u0004xN]1ss\u0006#W.\u001b8Jg:{Go\u00117pg\u0016$\u0007fA\u001b\u0002:\u00069B/Z:u)\u0016l\u0007o\u001c:bef\fE-\\5o\u0013:$W\r\u001f\u0015\u0004m\u0005e\u0016\u0001\u0006;fgR$\u0015P\\1nS\u000e4U\r^2i'&TX\rK\u00028\u0003s\u000b\u0011\u0005^3ti\u000ecWo\u001d;fe2Kgn[\"p]\u001aLwMU3f]\u000e\u0014\u0018\u0010\u001d;j_:D3\u0001OA]\u0003u!Xm\u001d;TG\",G-\u001e7fe^KG\u000f\u001b'j].\feMZ5oSRL\bfA\u001d\u0002:\u0006AC/Z:u\u001bVdG/\u001b+f]\u0006tGoU2iK\u0012,H.\u001a:XSRDG*\u001b8l\u0003\u001a4\u0017N\\5us\"\u001a!(!/\u0002UQ,7\u000f^'vYRLG+\u001a8b]R\u001c6\r[3ek2,'oV5uQR+g.\u00198u\u0003\u001a4\u0017N\\5us\"\u001a1(!/\u0002'Y,'/\u001b4z\u0019&t7nU2iK\u0012,H.\u001a:\u0015\r\u0005]#\u0011\u0011BZ\u0011\u001d\u0011\u0019\t\u0010a\u0001\u0005\u000b\u000ba\u0002\u001e5sK\u0006$\u0017I\u001a4j]&$\u0018\u0010\u0005\u0003\u0003\b\n5f\u0002\u0002BE\u0005OsAAa#\u0003\":!!Q\u0012BO\u001d\u0011\u0011yIa'\u000f\t\tE%\u0011\u0014\b\u0005\u0005'\u00139J\u0004\u0003\u0002��\tU\u0015\"\u0001;\n\u0005I\u001c\u0018BA+r\u0013\ty\u0007/C\u0002\u0003 :\faaY8oM&<\u0017\u0002\u0002BR\u0005K\u000b\u0011\"\u001b8uKJt\u0017\r\\:\u000b\u0007\t}e.\u0003\u0003\u0003*\n-\u0016\u0001E\"p]\u001adW/\u001a8u\u0007>tg-[4t\u0015\u0011\u0011\u0019K!*\n\t\t=&\u0011\u0017\u0002\u001a\u00072,8\u000f^3s\u0019&t7\u000e\u00165sK\u0006$\u0017I\u001a4j]&$\u0018P\u0003\u0003\u0003*\n-\u0006b\u0002B[y\u0001\u0007!qW\u0001\u000fY&t7nU2iK\u0012,H.\u001a:t!!\tYH!/\u0003>\n\r\u0017\u0002\u0002B^\u0003\u001b\u00131!T1q!\u0011\t\tDa0\n\t\t\u0005\u00171\u0007\u0002\u0010\u00072,8\u000f^3s\u0019&t7\u000eR1uCB\u0019\u0011L!2\n\u0007\t\u001d'LA\u0002J]R\f\u0001\u0005^3ti2+\u0017\rZ3s\u0003:$\u0017j\u001d:CK\u001a|'/\u001a'j].,\u0006\u000fZ1uK\"\u001aQ(!/\u0002]Q,7\u000f^!eIJ+Wn\u001c<f!\u0006\u0014H/\u001b;j_:\u001cH*\u001b8l\u0007>|'\u000fZ5oCR|'\u000fR5tC\ndW\r\u001a\u0015\u0004}\u0005e\u0016A\n;fgR\fE\r\u001a*f[>4X\rU1si&$\u0018n\u001c8t\u0019&t7nQ8pe\u0012Lg.\u0019;pe\"\u001aq(!/\u0002SQ,7\u000f^!eIJ+Wn\u001c<f!\u0006\u0014H/\u001b;j_:\u001chj\u001c;MS:\\7i\\8sI&t\u0017\r^8sQ\r\u0001\u0015\u0011X\u0001\u001am\u0016\u0014\u0018NZ=BI\u0012\u0014V-\\8wKB\u000b'\u000f^5uS>t7\u000f\u0006\u0004\u0002X\tu'\u0011\u001d\u0005\b\u0005?\f\u0005\u0019\u0001B\u0012\u0003Ya\u0017N\\6D_>\u0014H-\u001b8bi>\u0014XI\\1cY\u0016$\u0007b\u0002Br\u0003\u0002\u0007!1E\u0001\u0012SNd\u0015N\\6D_>\u0014H-\u001b8bi>\u0014\u0018AE2sK\u0006$XM\u0011:pW\u0016\u00148i\u001c8gS\u001e$\u0012\u0001Z\u0001\u001dO\u0016$8\t\\;ti\u0016\u0014H*\u001b8l\r\u0006LG.\u001a3BiR,W\u000e\u001d;t)\u0019\u0011iOa=\u0003vB\u0019\u0011La<\n\u0007\tE(L\u0001\u0003M_:<\u0007bBA%\u0007\u0002\u0007\u0011Q\n\u0005\b\u0003k\u001c\u0005\u0019AA|\u0003i\u0019G.^:uKJd\u0015N\\6QKJ\u001c\u0018n\u001d;f]R\u0004&o\u001c9t+\t\u0011\u0019!A\tdYV\u001cH/\u001a:MS:\\7i\u001c8gS\u001e,\"Aa@\u0011\u0007\u0005\u001c\t!C\u0002\u0004\u0004A\u0013\u0011c\u00117vgR,'\u000fT5oW\u000e{gNZ5h\u0003%\u0019X\r^;q\u001b>\u001c7\u000e\u0006\u0005\u0002X\r%1\u0011DB\u0012\u0011\u001d\u0019YA\u0012a\u0001\u0007\u001b\t\u0011\u0002]1si&$\u0018n\u001c8\u0011\t\r=1QC\u0007\u0003\u0007#Q1aa\u0005U\u0003\u001d\u0019G.^:uKJLAaa\u0006\u0004\u0012\tI\u0001+\u0019:uSRLwN\u001c\u0005\b\u000771\u0005\u0019AB\u000f\u0003\t!\b\u000f\u0005\u0003\u0002z\u000e}\u0011bAB\u0011]\nqAk\u001c9jGB\u000b'\u000f^5uS>t\u0007bBA{\r\u0002\u00071Q\u0005\t\u00063\u000e\u001d\u0012q_\u0005\u0004\u0007SQ&AB(qi&|g.\u0001\rde\u0016\fG/Z\"mkN$XM\u001d'j].l\u0015M\\1hKJ$b!!\u0014\u00040\rE\u0002\"\u00022H\u0001\u0004!\u0007bBB\u001a\u000f\u0002\u00071QG\u0001\u0010[\u0016$\u0018\rZ1uCN+\b\u000f]8siB\u0019Qma\u000e\n\u0007\re\"KA\bNKR\fG-\u0019;b'V\u0004\bo\u001c:u\u0003]\u0019H/\u0019:u\u00072,8\u000f^3s\u0019&t7.T1oC\u001e,'\u000f\u0006\u0004\u0002X\r}2\u0011\n\u0005\b\u0007\u0003B\u0005\u0019AB\"\u0003\u001di\u0017M\\1hKJ\u0004B!a9\u0004F%!1qIAw\u0005-a\u0015N\\6NC:\fw-\u001a:\t\u000f\rM\u0002\n1\u0001\u00046\u0005ISM\\:ve\u0016dunY6O_R\f5-];je\u0016$')\u001a4pe\u0016LeN^8lS:<W*\u001a;i_\u0012$B!a\u0016\u0004P!91\u0011K%A\u0002\u0005e\u0014AC7fi\"|GMT1nK\u000612\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0006t\u0017mZ3s)\u0016\u001cH\u000f\u0005\u0002b\u0017N\u00111\n\u0017\u000b\u0003\u0007+\n!b]3u+B\u001cE.Y:tQ\ri5q\f\t\u0005\u00033\u001b\t'\u0003\u0003\u0004d\u0005m%!\u0003\"fM>\u0014X-\u00117m\u00035!X-\u0019:E_^t7\t\\1tg\"\u001aaj!\u001b\u0011\t\u0005e51N\u0005\u0005\u0007[\nYJ\u0001\u0005BMR,'/\u00117m\u0001")
/* loaded from: input_file:kafka/server/link/ClusterLinkManagerTest.class */
public class ClusterLinkManagerTest {
    private final KafkaConfig kafka$server$link$ClusterLinkManagerTest$$brokerConfig = createBrokerConfig();
    private final Metrics kafka$server$link$ClusterLinkManagerTest$$metrics = new Metrics();
    private final MockTime kafka$server$link$ClusterLinkManagerTest$$time = new MockTime();
    private final ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
    private final KafkaController controller = (KafkaController) Mockito.mock(KafkaController.class);
    private final ControllerContext controllerContext = (ControllerContext) Mockito.mock(ControllerContext.class);
    private final KafkaZkClient zkClient = (KafkaZkClient) Mockito.mock(KafkaZkClient.class);
    private final MetadataCache metadataCache;
    private ClusterLinkManager clusterLinkManager;
    private ZkSupport zkSupport;
    private final String topicConfigSyncIncludeDefault;

    @AfterAll
    public static void tearDownClass() {
        ClusterLinkManagerTest$ clusterLinkManagerTest$ = ClusterLinkManagerTest$.MODULE$;
        TestUtils$.MODULE$.verifyNoUnexpectedThreads("@AfterAll");
    }

    @BeforeAll
    public static void setUpClass() {
        ClusterLinkManagerTest$ clusterLinkManagerTest$ = ClusterLinkManagerTest$.MODULE$;
        TestUtils$.MODULE$.verifyNoUnexpectedThreads("@BeforeAll");
    }

    public KafkaConfig kafka$server$link$ClusterLinkManagerTest$$brokerConfig() {
        return this.kafka$server$link$ClusterLinkManagerTest$$brokerConfig;
    }

    public Metrics kafka$server$link$ClusterLinkManagerTest$$metrics() {
        return this.kafka$server$link$ClusterLinkManagerTest$$metrics;
    }

    public MockTime kafka$server$link$ClusterLinkManagerTest$$time() {
        return this.kafka$server$link$ClusterLinkManagerTest$$time;
    }

    private ReplicaManager replicaManager() {
        return this.replicaManager;
    }

    private KafkaController controller() {
        return this.controller;
    }

    private ControllerContext controllerContext() {
        return this.controllerContext;
    }

    private KafkaZkClient zkClient() {
        return this.zkClient;
    }

    private MetadataCache metadataCache() {
        return this.metadataCache;
    }

    private ClusterLinkManager clusterLinkManager() {
        return this.clusterLinkManager;
    }

    private void clusterLinkManager_$eq(ClusterLinkManager clusterLinkManager) {
        this.clusterLinkManager = clusterLinkManager;
    }

    private ZkSupport zkSupport() {
        return this.zkSupport;
    }

    private void zkSupport_$eq(ZkSupport zkSupport) {
        this.zkSupport = zkSupport;
    }

    private String topicConfigSyncIncludeDefault() {
        return this.topicConfigSyncIncludeDefault;
    }

    @BeforeEach
    public void setUp() {
        ((ReplicaManager) Mockito.doReturn(metadataCache(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(replicaManager())).metadataCache();
        ((ReplicaManager) Mockito.doReturn(new Some(zkClient()), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(replicaManager())).zkClient();
        ((ReplicaManager) Mockito.doReturn(Predef$.MODULE$.Set().empty().iterator(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(replicaManager())).leaderPartitionsIterator();
        ((KafkaController) Mockito.doReturn(BoxesRunTime.boxToBoolean(true), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(controller())).isActive();
        ((KafkaZkClient) Mockito.doReturn(package$.MODULE$.Seq().empty(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getChildren("/cluster_links");
        ((KafkaZkClient) Mockito.doReturn(Predef$.MODULE$.Map().empty(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getClusterLinks(Predef$.MODULE$.Set().empty());
        ((KafkaZkClient) Mockito.doNothing().when(zkClient())).transformEntityConfigs((String) ArgumentMatchers.any(), (String) ArgumentMatchers.any(), (Function1) ArgumentMatchers.any());
        final ClusterLinkManagerTest clusterLinkManagerTest = null;
        zkSupport_$eq(new ZkSupport((ZkAdminManager) null, controller(), zkClient(), None$.MODULE$, metadataCache(), new BrokerEpochManager(clusterLinkManagerTest) { // from class: kafka.server.link.ClusterLinkManagerTest$$anon$1
            public long get() {
                return 1L;
            }

            public boolean isBrokerEpochStale(long j, boolean z) {
                return false;
            }
        }));
        clusterLinkManager_$eq(createClusterLinkManager(kafka$server$link$ClusterLinkManagerTest$$brokerConfig(), zkSupport()));
    }

    @AfterEach
    public void tearDown() {
        clusterLinkManager().shutdown();
        kafka$server$link$ClusterLinkManagerTest$$metrics().close();
    }

    @Test
    public void testClusterLinks() {
        String str = "testLink";
        Uuid randomUuid = Uuid.randomUuid();
        UUID javaUUID = CoreUtils$.MODULE$.toJavaUUID(randomUuid);
        String str2 = "testClusterId";
        ClusterLinkData clusterLinkData = new ClusterLinkData("testLink", randomUuid, new Some("testClusterId"), None$.MODULE$, false);
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().fetcherManager(randomUuid));
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().clientManager(randomUuid));
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().resolveLinkId("testLink"));
        Assertions$.MODULE$.intercept(() -> {
            return this.clusterLinkManager().resolveLinkIdOrThrow(str);
        }, ClassTag$.MODULE$.apply(ClusterLinkNotFoundException.class), new Position("ClusterLinkManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 99));
        clusterLinkManager().ensureLinkNameDoesntExist("testLink");
        Assertions.assertEquals(package$.MODULE$.Seq().empty(), clusterLinkManager().listClusterLinks());
        setupMock(partition, topicPartition, None$.MODULE$);
        clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition})));
        setupMock(partition, topicPartition, new Some(randomUuid));
        Assertions.assertEquals(0, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition}))));
        setupMock(partition, topicPartition, new Some(randomUuid));
        ((KafkaZkClient) Mockito.doReturn(BoxesRunTime.boxToBoolean(false), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).clusterLinkExists(randomUuid);
        clusterLinkManager().createClusterLink(clusterLinkData, clusterLinkConfig(), clusterLinkPersistentProps());
        Assertions.assertNotEquals(None$.MODULE$, clusterLinkManager().fetcherManager(randomUuid));
        Assertions.assertNotEquals(None$.MODULE$, clusterLinkManager().clientManager(randomUuid));
        Assertions.assertEquals(new Some(randomUuid), clusterLinkManager().resolveLinkId("testLink"));
        Assertions.assertEquals(new $colon.colon(clusterLinkData, Nil$.MODULE$), clusterLinkManager().listClusterLinks());
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) clusterLinkManager().fetcherManager(randomUuid).get();
        ClusterLinkDestClientManager clusterLinkDestClientManager = (ClusterLinkDestClientManager) clusterLinkManager().clientManager(randomUuid).get();
        Assertions$.MODULE$.intercept(() -> {
            this.clusterLinkManager().createClusterLink(new ClusterLinkData(str, Uuid.randomUuid(), new Some(str2), None$.MODULE$, false), this.clusterLinkConfig(), this.clusterLinkPersistentProps());
        }, ClassTag$.MODULE$.apply(ClusterLinkExistsException.class), new Position("ClusterLinkManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 121));
        Assertions.assertEquals(1, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition}))));
        Assertions.assertTrue(clusterLinkFetcherManager.currentMetadata().retainTopic("testTopic", false, kafka$server$link$ClusterLinkManagerTest$$time().milliseconds()), "Topic not added to metadata");
        Assertions.assertTrue(clusterLinkDestClientManager.getTopics().contains("testTopic"), "Topic not added to client manager");
        Assertions.assertFalse(clusterLinkFetcherManager.isEmpty(), "Fetcher not recording active topic");
        ((KafkaZkClient) Mockito.verify(zkClient())).clusterLinkExists(randomUuid);
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState leaderAndIsrPartitionState = (LeaderAndIsrRequestData.LeaderAndIsrPartitionState) Mockito.mock(LeaderAndIsrRequestData.LeaderAndIsrPartitionState.class);
        ((LeaderAndIsrRequestData.LeaderAndIsrPartitionState) Mockito.doReturn(javaUUID.toString(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(leaderAndIsrPartitionState)).clusterLinkId();
        ((LeaderAndIsrRequestData.LeaderAndIsrPartitionState) Mockito.doReturn("Mirror", ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(leaderAndIsrPartitionState)).clusterLinkTopicState();
        ((LeaderAndIsrRequestData.LeaderAndIsrPartitionState) Mockito.doReturn(BoxesRunTime.boxToInteger(1), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(leaderAndIsrPartitionState)).linkedLeaderEpoch();
        ((LeaderAndIsrRequestData.LeaderAndIsrPartitionState) Mockito.doReturn(Uuid.randomUuid(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(leaderAndIsrPartitionState)).clusterLinkSourceTopicId();
        clusterLinkManager().removePartitions((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition), leaderAndIsrPartitionState)})));
        Assertions.assertTrue(clusterLinkFetcherManager.currentMetadata().retainTopic("testTopic", false, kafka$server$link$ClusterLinkManagerTest$$time().milliseconds()), "Topic removed from metadata");
        Assertions.assertFalse(clusterLinkDestClientManager.getTopics().contains("testTopic"), "Topic not removed from client manager");
        Mockito.reset(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState[]{leaderAndIsrPartitionState});
        ((LeaderAndIsrRequestData.LeaderAndIsrPartitionState) Mockito.doReturn((Object) null, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(leaderAndIsrPartitionState)).clusterLinkId();
        ((LeaderAndIsrRequestData.LeaderAndIsrPartitionState) Mockito.doReturn(Uuid.randomUuid(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(leaderAndIsrPartitionState)).clusterLinkSourceTopicId();
        clusterLinkManager().removePartitions((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition), leaderAndIsrPartitionState)})));
        Assertions.assertFalse(clusterLinkFetcherManager.currentMetadata().retainTopic("testTopic", false, kafka$server$link$ClusterLinkManagerTest$$time().milliseconds()), "Topic not removed from metadata");
        Assertions.assertFalse(clusterLinkDestClientManager.getTopics().contains("testTopic"), "Topic should not be in client manager");
        Mockito.reset(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState[]{leaderAndIsrPartitionState});
        ((LeaderAndIsrRequestData.LeaderAndIsrPartitionState) Mockito.doReturn(javaUUID.toString(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(leaderAndIsrPartitionState)).clusterLinkId();
        ((LeaderAndIsrRequestData.LeaderAndIsrPartitionState) Mockito.doReturn(Uuid.randomUuid(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(leaderAndIsrPartitionState)).clusterLinkSourceTopicId();
        ((LeaderAndIsrRequestData.LeaderAndIsrPartitionState) Mockito.doReturn("FailedMirror", ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(leaderAndIsrPartitionState)).clusterLinkTopicState();
        ((LeaderAndIsrRequestData.LeaderAndIsrPartitionState) Mockito.doReturn(BoxesRunTime.boxToInteger(-1), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(leaderAndIsrPartitionState)).linkedLeaderEpoch();
        Assertions.assertEquals(1, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition}))));
        clusterLinkManager().removePartitions((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition), leaderAndIsrPartitionState)})));
        Assertions.assertFalse(clusterLinkFetcherManager.currentMetadata().retainTopic("testTopic", false, kafka$server$link$ClusterLinkManagerTest$$time().milliseconds()), "Topic not removed from metadata for failed mirror");
        Assertions.assertFalse(clusterLinkDestClientManager.getTopics().contains("testTopic"), "Topic should not be in client manager for failed mirror");
        TopicPartition topicPartition2 = new TopicPartition("testTopic", 1);
        Partition partition2 = (Partition) Mockito.mock(Partition.class);
        setupMock(partition2, topicPartition2, new Some(randomUuid));
        Assertions.assertEquals(1, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition2}))));
        Assertions.assertTrue(clusterLinkFetcherManager.currentMetadata().retainTopic("testTopic", false, kafka$server$link$ClusterLinkManagerTest$$time().milliseconds()), "Topic not added to metadata");
        Assertions.assertFalse(clusterLinkDestClientManager.getTopics().contains("testTopic"), "Topic should not be added to client manager");
        clusterLinkManager().removePartitionsAndMetadata((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition2})));
        Assertions.assertFalse(clusterLinkFetcherManager.currentMetadata().retainTopic("testTopic", false, kafka$server$link$ClusterLinkManagerTest$$time().milliseconds()), "Topic not removed from metadata");
        Assertions.assertFalse(clusterLinkDestClientManager.getTopics().contains("testTopic"), "Topic should not be in to client manager");
        Object obj = clusterLinkManager().fetcherManager(randomUuid).get();
        Assertions.assertTrue(obj != null && obj.equals(clusterLinkFetcherManager), "Unexpected fetcher manager");
        Object obj2 = clusterLinkManager().clientManager(randomUuid).get();
        Assertions.assertTrue(obj2 != null && obj2.equals(clusterLinkDestClientManager), "Unexpected client manager");
        Mockito.reset(new KafkaZkClient[]{zkClient()});
        ClusterLinkData clusterLinkData2 = new ClusterLinkData("testLink", randomUuid, new Some("testClusterId"), None$.MODULE$, true);
        ((KafkaZkClient) Mockito.doReturn(BoxesRunTime.boxToBoolean(true), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).clusterLinkExists(randomUuid);
        ((KafkaZkClient) Mockito.doReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(javaUUID), clusterLinkData2)})), new Object[]{Predef$.MODULE$.Map().apply(Nil$.MODULE$)}).when(zkClient())).getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{javaUUID})));
        ((ControllerContext) Mockito.doReturn(BoxesRunTime.boxToInteger(0), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(controllerContext())).epochZkVersion();
        ((KafkaController) Mockito.doReturn(controllerContext(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(controller())).controllerContext();
        ((KafkaZkClient) Mockito.doNothing().when(zkClient())).setClusterLink(clusterLinkData2, 0);
        clusterLinkManager().deleteClusterLink("testLink", randomUuid);
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().resolveLinkId("testLink"));
        ((KafkaZkClient) Mockito.verify(zkClient())).clusterLinkExists(randomUuid);
        Assertions$.MODULE$.intercept(() -> {
            this.clusterLinkManager().deleteClusterLink(str, randomUuid);
        }, ClassTag$.MODULE$.apply(ClusterLinkNotFoundException.class), new Position("ClusterLinkManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 196));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testClusterLinks$4(this, randomUuid)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testClusterLinks$5(randomUuid));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        ((KafkaZkClient) Mockito.verify(zkClient(), Mockito.times(3))).getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{javaUUID})));
    }

    @Test
    public void testCreateClusterLinkRejectsRequestIfTenantPrefixIsNullAndCreateClusterLinkPolicyExists() {
        ClusterLinkData clusterLinkData = new ClusterLinkData("testLink", Uuid.randomUuid(), new Some("testClusterId"), None$.MODULE$, false);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false);
        createBrokerConfig.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        createBrokerConfig.put(KafkaConfig$.MODULE$.CreateClusterLinkPolicyClassNameProp(), TestCreateClusterLinkPolicy.class);
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        clusterLinkManager().shutdown();
        ClusterLinkManager createClusterLinkManager = createClusterLinkManager(fromProps, zkSupport());
        Assertions.assertEquals("Tenant prefix cannot be null if a CreateClusterLinkPolicy is set.", Assertions.assertThrows(InvalidRequestException.class, () -> {
            createClusterLinkManager.createClusterLink(clusterLinkData, this.clusterLinkConfig(), this.clusterLinkPersistentProps());
        }).getMessage());
    }

    @Test
    public void testReconfigure() {
        String str = "testLink";
        Uuid randomUuid = Uuid.randomUuid();
        UUID javaUUID = CoreUtils$.MODULE$.toJavaUUID(randomUuid);
        Assertions$.MODULE$.intercept(() -> {
            this.clusterLinkManager().updateClusterLinkConfig(str, properties -> {
                return BoxesRunTime.boxToBoolean($anonfun$testReconfigure$2(properties));
            });
        }, ClassTag$.MODULE$.apply(ClusterLinkNotFoundException.class), new Position("ClusterLinkManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 230));
        ((KafkaZkClient) Mockito.doReturn(BoxesRunTime.boxToBoolean(false), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).clusterLinkExists(randomUuid);
        ((KafkaZkClient) Mockito.doReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(javaUUID), new ClusterLinkData("testLink", randomUuid, None$.MODULE$, None$.MODULE$, false))})), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{javaUUID})));
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().fetcherManager(randomUuid));
        clusterLinkManager().createClusterLink(new ClusterLinkData("testLink", randomUuid, None$.MODULE$, None$.MODULE$, false), clusterLinkConfig(), clusterLinkPersistentProps());
        ClusterLinkFactory.FetcherManager fetcherManager = (ClusterLinkFactory.FetcherManager) clusterLinkManager().fetcherManager(randomUuid).get();
        Assertions.assertEquals(Collections.singletonList("localhost:1234"), fetcherManager.currentConfig().bootstrapServers());
        ((KafkaZkClient) Mockito.verify(zkClient())).clusterLinkExists(randomUuid);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:5678");
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        clusterLinkManager.processClusterLinkChanges(randomUuid, properties, clusterLinkManager.processClusterLinkChanges$default$3());
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), fetcherManager.currentConfig().bootstrapServers());
        Mockito.reset(new KafkaZkClient[]{zkClient()});
        ((KafkaZkClient) Mockito.doReturn(properties, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getEntityConfigs("cluster-links", javaUUID.toString());
        clusterLinkManager().updateClusterLinkConfig("testLink", properties2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReconfigure$3(properties2));
        });
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), fetcherManager.currentConfig().bootstrapServers());
        ((KafkaZkClient) Mockito.verify(zkClient())).getEntityConfigs("cluster-links", javaUUID.toString());
        Mockito.reset(new KafkaZkClient[]{zkClient()});
        ((KafkaZkClient) Mockito.doReturn(BoxesRunTime.boxToBoolean(true), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).clusterLinkExists(randomUuid);
        ((KafkaZkClient) Mockito.doReturn(properties, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getEntityConfigs("cluster-links", javaUUID.toString());
        ((KafkaZkClient) Mockito.doReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(javaUUID), new ClusterLinkData("testLink", randomUuid, None$.MODULE$, None$.MODULE$, false))})), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{javaUUID})));
        ((KafkaZkClient) Mockito.doNothing().when(zkClient())).setOrCreateEntityConfigs((String) ArgumentMatchers.eq("cluster-links"), (String) ArgumentMatchers.eq(randomUuid.toString()), (Properties) ArgumentMatchers.any(Properties.class));
        clusterLinkManager().updateClusterLinkConfig("testLink", properties3 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReconfigure$4(properties3));
        });
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Properties.class);
        ((KafkaZkClient) Mockito.verify(zkClient())).setOrCreateEntityConfigs((String) ArgumentMatchers.eq("cluster-links"), (String) ArgumentMatchers.eq(javaUUID.toString()), (Properties) forClass.capture());
        ClusterLinkManager clusterLinkManager2 = clusterLinkManager();
        clusterLinkManager2.processClusterLinkChanges(randomUuid, (Properties) forClass.getValue(), clusterLinkManager2.processClusterLinkChanges$default$3());
        Assertions.assertEquals(Collections.singletonList("localhost:1234"), fetcherManager.currentConfig().bootstrapServers());
        ((KafkaZkClient) Mockito.verify(zkClient())).clusterLinkExists(randomUuid);
        ((KafkaZkClient) Mockito.verify(zkClient())).getEntityConfigs("cluster-links", javaUUID.toString());
    }

    @Test
    public void testConnectionManager() {
        ((KafkaZkClient) Mockito.doReturn(BoxesRunTime.boxToBoolean(false), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).clusterLinkExists((Uuid) ArgumentMatchers.any());
        verifyConnectionManager$1(ClusterLinkConfig.LinkMode.DESTINATION, ConnectionMode$Outbound$.MODULE$, None$.MODULE$, ClusterLinkOutboundConnectionManager.class, false);
        verifyConnectionManager$1(ClusterLinkConfig.LinkMode.DESTINATION, ConnectionMode$Inbound$.MODULE$, None$.MODULE$, ClusterLinkInboundConnectionManager.class, true);
        verifyConnectionManager$1(ClusterLinkConfig.LinkMode.SOURCE, ConnectionMode$Outbound$.MODULE$, None$.MODULE$, ClusterLinkOutboundConnectionManager.class, true);
        verifyConnectionManager$1(ClusterLinkConfig.LinkMode.BIDIRECTIONAL, ConnectionMode$Outbound$.MODULE$, new Some(ConnectionMode$Outbound$.MODULE$), ClusterLinkOutboundConnectionManager.class, false);
        verifyConnectionManager$1(ClusterLinkConfig.LinkMode.BIDIRECTIONAL, ConnectionMode$Outbound$.MODULE$, new Some(ConnectionMode$Inbound$.MODULE$), ClusterLinkOutboundConnectionManager.class, true);
        verifyConnectionManager$1(ClusterLinkConfig.LinkMode.BIDIRECTIONAL, ConnectionMode$Outbound$.MODULE$, None$.MODULE$, ClusterLinkOutboundConnectionManager.class, false);
        verifyConnectionManager$1(ClusterLinkConfig.LinkMode.BIDIRECTIONAL, ConnectionMode$Inbound$.MODULE$, None$.MODULE$, ClusterLinkInboundConnectionManager.class, true);
    }

    @Test
    public void testRetentionConfigSync() {
        Uuid randomUuid = Uuid.randomUuid();
        ClusterLinkFactory.FetcherManager createClusterLink = createClusterLink("test-retention", randomUuid, clusterLinkPersistentProps());
        Assertions.assertFalse(createClusterLink.currentConfig().useIndependentRetention());
        Properties properties = new Properties();
        scala.collection.immutable.Set $plus$plus = MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().$plus$plus((IterableOnce) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"retention.ms", "retention.bytes"})));
        properties.put("bootstrap.servers", "localhost:5678");
        properties.put(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), $plus$plus.mkString(","));
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        clusterLinkManager.processClusterLinkChanges(randomUuid, properties, clusterLinkManager.processClusterLinkChanges$default$3());
        Assertions.assertEquals($plus$plus, createClusterLink.currentConfig().topicConfigSyncRules().include());
        Assertions.assertFalse(createClusterLink.currentConfig().useIndependentRetention());
        scala.collection.immutable.Set $plus$plus2 = MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().$plus$plus((IterableOnce) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"segment.index.bytes"})));
        properties.put(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), $plus$plus2.mkString(","));
        ClusterLinkManager clusterLinkManager2 = clusterLinkManager();
        clusterLinkManager2.processClusterLinkChanges(randomUuid, properties, clusterLinkManager2.processClusterLinkChanges$default$3());
        Assertions.assertEquals($plus$plus2, createClusterLink.currentConfig().topicConfigSyncRules().include());
        Assertions.assertTrue(createClusterLink.currentConfig().useIndependentRetention());
        Mockito.reset(new KafkaZkClient[]{zkClient()});
    }

    @Test
    public void testTopicConfigSyncIncludeCompatibility() {
        verifyDefault();
        verifyZKHasMalformedConfigs();
        verifyZKHasUnknownConfigs();
        verifyZKMissesAlwaysConfigs();
        verifyZKHasIndependentConfigs();
    }

    private void verifyDefault() {
        Uuid randomUuid = Uuid.randomUuid();
        ClusterLinkFactory.FetcherManager createClusterLink = createClusterLink("test-default", randomUuid, clusterLinkPersistentProps());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:5678");
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        clusterLinkManager.processClusterLinkChanges(randomUuid, properties, clusterLinkManager.processClusterLinkChanges$default$3());
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), createClusterLink.currentConfig().bootstrapServers());
        Assertions.assertEquals(CollectionConverters$.MODULE$.ListHasAsScala(MirrorTopicConfigSyncRules$.MODULE$.SyncIncludeDefault()).asScala().toSet(), createClusterLink.currentConfig().topicConfigSyncRules().include());
        Mockito.reset(new KafkaZkClient[]{zkClient()});
    }

    private void verifyZKHasMalformedConfigs() {
        Uuid randomUuid = Uuid.randomUuid();
        ClusterLinkFactory.FetcherManager createClusterLink = createClusterLink("test-malformed", randomUuid, clusterLinkPersistentProps());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:5678");
        properties.put(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), new StringBuilder(23).append(topicConfigSyncIncludeDefault()).append(",\u001fmin.compaction.lag.ms").toString());
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        clusterLinkManager.processClusterLinkChanges(randomUuid, properties, clusterLinkManager.processClusterLinkChanges$default$3());
        Assertions.assertEquals(Collections.singletonList("localhost:1234"), createClusterLink.currentConfig().bootstrapServers());
        Assertions.assertEquals(CollectionConverters$.MODULE$.ListHasAsScala(MirrorTopicConfigSyncRules$.MODULE$.SyncIncludeDefault()).asScala().toSet(), createClusterLink.currentConfig().topicConfigSyncRules().include());
        Mockito.reset(new KafkaZkClient[]{zkClient()});
    }

    private void verifyZKHasUnknownConfigs() {
        Uuid randomUuid = Uuid.randomUuid();
        ClusterLinkFactory.FetcherManager createClusterLink = createClusterLink("test-unknown", randomUuid, clusterLinkPersistentProps());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:5678");
        properties.put(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), new StringBuilder(21).append(topicConfigSyncIncludeDefault()).append(",unknown.topic.config").toString());
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        clusterLinkManager.processClusterLinkChanges(randomUuid, properties, clusterLinkManager.processClusterLinkChanges$default$3());
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), createClusterLink.currentConfig().bootstrapServers());
        Assertions.assertEquals(CollectionConverters$.MODULE$.ListHasAsScala(MirrorTopicConfigSyncRules$.MODULE$.SyncIncludeDefault()).asScala().toSet(), createClusterLink.currentConfig().topicConfigSyncRules().include());
        Mockito.reset(new KafkaZkClient[]{zkClient()});
    }

    private void verifyZKMissesAlwaysConfigs() {
        Uuid randomUuid = Uuid.randomUuid();
        ClusterLinkFactory.FetcherManager createClusterLink = createClusterLink("test-always", randomUuid, clusterLinkPersistentProps());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:5678");
        properties.put(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), "min.compaction.lag.ms");
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        clusterLinkManager.processClusterLinkChanges(randomUuid, properties, clusterLinkManager.processClusterLinkChanges$default$3());
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), createClusterLink.currentConfig().bootstrapServers());
        Assertions.assertEquals(MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().$plus$plus((IterableOnce) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"min.compaction.lag.ms"}))), createClusterLink.currentConfig().topicConfigSyncRules().include());
        Mockito.reset(new KafkaZkClient[]{zkClient()});
    }

    private void verifyZKHasIndependentConfigs() {
        Uuid randomUuid = Uuid.randomUuid();
        ClusterLinkFactory.FetcherManager createClusterLink = createClusterLink("test-independent", randomUuid, clusterLinkPersistentProps());
        Properties properties = new Properties();
        String mkString = MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().$plus$plus((IterableOnce) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"confluent.tier.enable"}))).mkString(",");
        properties.put("bootstrap.servers", "localhost:5678");
        properties.put(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), mkString);
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        clusterLinkManager.processClusterLinkChanges(randomUuid, properties, clusterLinkManager.processClusterLinkChanges$default$3());
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), createClusterLink.currentConfig().bootstrapServers());
        Assertions.assertEquals(MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs(), createClusterLink.currentConfig().topicConfigSyncRules().include());
        Mockito.reset(new KafkaZkClient[]{zkClient()});
    }

    public ClusterLinkFactory.FetcherManager createClusterLink(String str, Uuid uuid, Properties properties) {
        Assertions$.MODULE$.intercept(() -> {
            this.clusterLinkManager().updateClusterLinkConfig(str, properties2 -> {
                return BoxesRunTime.boxToBoolean($anonfun$createClusterLink$2(properties2));
            });
        }, ClassTag$.MODULE$.apply(ClusterLinkNotFoundException.class), new Position("ClusterLinkManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 468));
        UUID javaUUID = CoreUtils$.MODULE$.toJavaUUID(uuid);
        ((KafkaZkClient) Mockito.doReturn(BoxesRunTime.boxToBoolean(false), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).clusterLinkExists(uuid);
        ((KafkaZkClient) Mockito.doReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(javaUUID), new ClusterLinkData(str, uuid, None$.MODULE$, None$.MODULE$, false))})), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{javaUUID})));
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().fetcherManager(uuid));
        clusterLinkManager().createClusterLink(new ClusterLinkData(str, uuid, None$.MODULE$, None$.MODULE$, false), clusterLinkConfig(), properties);
        ((KafkaZkClient) Mockito.verify(zkClient())).clusterLinkExists(uuid);
        return (ClusterLinkFactory.FetcherManager) clusterLinkManager().fetcherManager(uuid).get();
    }

    @Test
    public void testFailedAddClusterLink() {
        Uuid randomUuid = Uuid.randomUuid();
        ClusterLinkData clusterLinkData = new ClusterLinkData("testLink", randomUuid, new Some("testClusterId"), None$.MODULE$, false);
        Assertions.assertTrue(clusterLinkManager().resolveLinkId("testLink").isEmpty());
        Assertions.assertTrue(clusterLinkManager().listClusterLinks().isEmpty());
        ((KafkaZkClient) Mockito.doReturn(BoxesRunTime.boxToBoolean(false), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).clusterLinkExists(randomUuid);
        ((KafkaZkClient) Mockito.doThrow(new Throwable[]{new RuntimeException("")}).when(zkClient())).createClusterLink(clusterLinkData);
        Assertions$.MODULE$.intercept(() -> {
            this.clusterLinkManager().createClusterLink(clusterLinkData, this.clusterLinkConfig(), this.clusterLinkPersistentProps());
        }, ClassTag$.MODULE$.apply(RuntimeException.class), new Position("ClusterLinkManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 500));
        Assertions.assertTrue(clusterLinkManager().resolveLinkId("testLink").isEmpty());
        Assertions.assertTrue(clusterLinkManager().listClusterLinks().isEmpty());
        ((KafkaZkClient) Mockito.verify(zkClient())).clusterLinkExists(randomUuid);
        ((KafkaZkClient) Mockito.verify(zkClient())).createClusterLink(clusterLinkData);
        Mockito.reset(new KafkaZkClient[]{zkClient()});
        ((KafkaZkClient) Mockito.doReturn(BoxesRunTime.boxToBoolean(false), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).clusterLinkExists(randomUuid);
        clusterLinkManager().createClusterLink(clusterLinkData, clusterLinkConfig(), clusterLinkPersistentProps());
        Assertions.assertEquals(new Some(randomUuid), clusterLinkManager().resolveLinkId("testLink"));
        Assertions.assertEquals(new $colon.colon(clusterLinkData, Nil$.MODULE$), clusterLinkManager().listClusterLinks());
        ((KafkaZkClient) Mockito.verify(zkClient())).clusterLinkExists(randomUuid);
    }

    @Test
    public void testReconfigureNonRetriableFailure() {
        Uuid randomUuid = Uuid.randomUuid();
        UUID javaUUID = CoreUtils$.MODULE$.toJavaUUID(randomUuid);
        Throwable kafkaException = new KafkaException("Test exception");
        ((KafkaZkClient) Mockito.doReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(javaUUID), new ClusterLinkData("testLink", randomUuid, new Some("testClusterId"), None$.MODULE$, false))})), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{javaUUID})));
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "");
        clusterLinkManager().configEncoder_$eq((ClusterLinkConfigEncoder) Mockito.mock(ClusterLinkConfigEncoder.class));
        ((ClusterLinkConfigEncoder) Mockito.doThrow(new Throwable[]{kafkaException}).when(clusterLinkManager().configEncoder())).encode((Properties) ArgumentMatchers.any());
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        clusterLinkManager.processClusterLinkChanges(randomUuid, properties, clusterLinkManager.processClusterLinkChanges$default$3());
        Assertions.assertEquals(FailedClusterLink$.MODULE$, clusterLinkManager().linkState("testLink"));
        ((KafkaZkClient) Mockito.verify(zkClient())).getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{javaUUID})));
    }

    @Test
    public void testRetryReconfigureRetriableFailureOnLinkRunning() {
        Uuid randomUuid = Uuid.randomUuid();
        ClusterLinkData clusterLinkData = new ClusterLinkData("testLink", randomUuid, new Some("testClusterId"), None$.MODULE$, false);
        clusterLinkManager().metadataManager().shutdown();
        MetadataCache metadataCache = (MetadataCache) Mockito.mock(MetadataCache.class);
        clusterLinkManager().metadataManager_$eq((ClusterLinkMetadataManager) Mockito.mock(ClusterLinkMetadataManager.class));
        ((ClusterLinkMetadataManager) Mockito.doReturn(Option$.MODULE$.apply(clusterLinkData), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkManager().metadataManager())).getClusterLinkData(randomUuid);
        ((ClusterLinkMetadataManager) Mockito.doReturn(metadataCache, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkManager().metadataManager())).metadataCache();
        ((MetadataCache) Mockito.doReturn(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(metadataCache)).metadataVersion();
        ((KafkaController) Mockito.doReturn(controllerContext(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(controller())).controllerContext();
        ((ControllerContext) Mockito.doReturn(CollectionConverters$.MODULE$.ConcurrentMapHasAsScala(new ConcurrentHashMap()).asScala(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(controllerContext())).linkedTopics();
        clusterLinkManager().createClusterLink(clusterLinkData, clusterLinkConfig(), clusterLinkPersistentProps());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "");
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        clusterLinkManager.processClusterLinkChanges(randomUuid, properties, clusterLinkManager.processClusterLinkChanges$default$3());
        increaseElapsedTimeAndWaitForRetry$1(2L, "", "after increasing the elapsed time", 0L, randomUuid, "testLink");
        increaseElapsedTimeAndWaitForRetry$1(3L, "", "after increasing the elapsed time", ClusterLinkRetryState$.MODULE$.RETRY_BACKOFF_MAX_MS(), randomUuid, "testLink");
        increaseElapsedTimeAndWaitForRetry$1(4L, "", "after increasing the elapsed time second time", ClusterLinkRetryState$.MODULE$.RETRY_BACKOFF_MAX_MS(), randomUuid, "testLink");
        kafka$server$link$ClusterLinkManagerTest$$time().setCurrentTimeMs(kafka$server$link$ClusterLinkManagerTest$$time().milliseconds() + 1);
        properties.put("bootstrap.servers", "invalid-bootstrap-server:9093");
        ClusterLinkManager clusterLinkManager2 = clusterLinkManager();
        clusterLinkManager2.processClusterLinkChanges(randomUuid, properties, clusterLinkManager2.processClusterLinkChanges$default$3());
        increaseElapsedTimeAndWaitForRetry$1(2L, "invalid-bootstrap-server:9093", "after updating link config", 0L, randomUuid, "testLink");
        increaseElapsedTimeAndWaitForRetry$1(3L, "invalid-bootstrap-server:9093", "after updating link config and increasing elapsed time", ClusterLinkRetryState$.MODULE$.RETRY_BACKOFF_MAX_MS(), randomUuid, "testLink");
        properties.put("bootstrap.servers", "localhost:2222");
        clusterLinkManager().updateLinkPropertiesCache(randomUuid, properties);
        kafka$server$link$ClusterLinkManagerTest$$time().setCurrentTimeMs(kafka$server$link$ClusterLinkManagerTest$$time().milliseconds() + ClusterLinkRetryState$.MODULE$.RETRY_BACKOFF_MAX_MS());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testRetryReconfigureRetriableFailureOnLinkRunning$5(this, randomUuid)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Cluster link retry candidate not removed");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        Assertions.assertEquals(ActiveClusterLink$.MODULE$, clusterLinkManager().linkState("testLink"));
        Assertions.assertFalse(containsMetric("broker-failed-link-count"));
        Assertions.assertFalse(containsMetric("link-failure-retry-attempts-rate"));
        Assertions.assertFalse(containsMetric("link-failure-retry-attempts-total"));
    }

    private boolean containsMetric(String str) {
        return CollectionConverters$.MODULE$.MapHasAsScala(kafka$server$link$ClusterLinkManagerTest$$metrics().metrics()).asScala().exists(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$containsMetric$1(str, tuple2));
        });
    }

    @Test
    public void testRetryReconfigureRetriableFailureOnLinkStartup() {
        Uuid randomUuid = Uuid.randomUuid();
        UUID javaUUID = CoreUtils$.MODULE$.toJavaUUID(randomUuid);
        ClusterLinkData clusterLinkData = new ClusterLinkData("testLink", randomUuid, new Some("testClusterId"), None$.MODULE$, false);
        MetadataCache metadataCache = (MetadataCache) Mockito.mock(MetadataCache.class);
        ClusterLinkMetadataManager metadataManager = clusterLinkManager().metadataManager();
        clusterLinkManager().metadataManager_$eq((ClusterLinkMetadataManager) Mockito.mock(ClusterLinkMetadataManager.class));
        ((KafkaZkClient) Mockito.doReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(javaUUID), clusterLinkData)})), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{javaUUID})));
        ((KafkaZkClient) Mockito.doReturn(BoxesRunTime.boxToBoolean(false), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).clusterLinkExists(randomUuid);
        ((ClusterLinkMetadataManager) Mockito.doReturn(Option$.MODULE$.apply(clusterLinkData), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkManager().metadataManager())).getClusterLinkData(randomUuid);
        ((ClusterLinkMetadataManager) Mockito.doReturn(metadataCache, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkManager().metadataManager())).metadataCache();
        ((MetadataCache) Mockito.doReturn(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(metadataCache)).metadataVersion();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "");
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        clusterLinkManager.processClusterLinkChanges(randomUuid, properties, clusterLinkManager.processClusterLinkChanges$default$3());
        Assertions.assertEquals(FailedClusterLink$.MODULE$, clusterLinkManager().linkState("testLink"));
        Assertions.assertEquals(ClusterLinkError.UNRESOLVABLE_BOOTSTRAP_ERROR, ((UnavailableLinkReason) ((ClusterLinkInfo) clusterLinkManager().clusterLinkInfo("testLink").get()).linkStateInfo().unavailableLinkReason().get()).clusterLinkError());
        Assertions.assertTrue(clusterLinkManager().linkPropertiesCache().get(randomUuid).nonEmpty());
        Properties properties2 = new Properties();
        properties2.put("bootstrap.servers", "localhost:1234");
        clusterLinkManager().updateLinkPropertiesCache(randomUuid, properties2);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testRetryReconfigureRetriableFailureOnLinkStartup$1(this, randomUuid)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Cluster link retry candidate not removed");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        Assertions.assertEquals(ActiveClusterLink$.MODULE$, clusterLinkManager().linkState("testLink"));
        clusterLinkManager().metadataManager_$eq(metadataManager);
    }

    @Test
    public void testDeleteClusterLink() {
        Uuid randomUuid = Uuid.randomUuid();
        UUID javaUUID = CoreUtils$.MODULE$.toJavaUUID(randomUuid);
        ((KafkaZkClient) Mockito.doReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(javaUUID), new ClusterLinkData("testLink", randomUuid, new Some("testClusterId"), None$.MODULE$, true))})), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{javaUUID})));
        ((KafkaZkClient) Mockito.doReturn(Predef$.MODULE$.Set().empty(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getAllTopicsInCluster(false);
        ((KafkaController) Mockito.doReturn(controllerContext(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(controller())).controllerContext();
        ((ControllerContext) Mockito.doReturn(CollectionConverters$.MODULE$.ConcurrentMapHasAsScala(new ConcurrentHashMap()).asScala(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(controllerContext())).linkedTopics();
        Assertions.assertEquals(UnavailableClusterLink$.MODULE$, clusterLinkManager().linkState("testLink"));
        Assertions.assertEquals(Predef$.MODULE$.Set().empty(), clusterLinkManager().clearMirrorTopics().keySet());
        Assertions.assertEquals(Map$.MODULE$.empty(), (scala.collection.mutable.Map) TestUtils.fieldValue(clusterLinkManager(), ClusterLinkManager.class, "managers"));
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        clusterLinkManager.processClusterLinkChanges(randomUuid, new Properties(), clusterLinkManager.processClusterLinkChanges$default$3());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Uuid[]{randomUuid})), clusterLinkManager().clearMirrorTopics().keySet());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testDeleteClusterLink$1(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Cluster link not removed");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
    }

    @Test
    public void testControllerListenerMirrorStates() {
        Uuid randomUuid = Uuid.randomUuid();
        UUID javaUUID = CoreUtils$.MODULE$.toJavaUUID(randomUuid);
        ((KafkaZkClient) Mockito.doReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(javaUUID), new ClusterLinkData("testLink", randomUuid, new Some("testClusterId"), None$.MODULE$, false))})), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{javaUUID})));
        ((KafkaZkClient) Mockito.doReturn(Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"testTopic"})), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getAllTopicsInCluster(false);
        ((KafkaController) Mockito.doReturn(controllerContext(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(controller())).controllerContext();
        Uuid randomUuid2 = Uuid.randomUuid();
        ClusterLinkTopicState$PendingStoppedMirror$ clusterLinkTopicState$PendingStoppedMirror$ = ClusterLinkTopicState$PendingStoppedMirror$.MODULE$;
        ClusterLinkTopicState.PendingStoppedMirror pendingStoppedMirror = new ClusterLinkTopicState.PendingStoppedMirror("testLink", randomUuid, randomUuid2, true, Time.SYSTEM.milliseconds());
        ((ControllerContext) Mockito.doReturn(CollectionConverters$.MODULE$.ConcurrentMapHasAsScala(new ConcurrentHashMap()).asScala(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(controllerContext())).linkedTopics();
        controllerContext().linkedTopics().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("testTopic"), pendingStoppedMirror));
        ((ClusterLinkControllerListener) clusterLinkManager().controllerListener().get()).onStateChange("testTopic", None$.MODULE$, new Some(pendingStoppedMirror));
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().clientManager(randomUuid));
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        clusterLinkManager.processClusterLinkChanges(randomUuid, clusterLinkPersistentProps(), clusterLinkManager.processClusterLinkChanges$default$3());
        Assertions.assertEquals(new Some(ClusterLinkDestClientManager.class), clusterLinkManager().clientManager(randomUuid).map(clientManager -> {
            return clientManager.getClass();
        }));
        ClusterLinkDestClientManager clusterLinkDestClientManager = (ClusterLinkDestClientManager) clusterLinkManager().clientManager(randomUuid).get();
        Assertions.assertEquals(0, clusterLinkDestClientManager.taskManager().clusterLinkStopMirrors().subTaskCount());
        clusterLinkManager().processControllerMirrorStates(controllerContext().linkedTopics());
        Assertions.assertEquals(1, clusterLinkDestClientManager.taskManager().clusterLinkStopMirrors().subTaskCount());
    }

    @Timeout(20)
    @Test
    public void testListClusterLinksLocking() {
        Object fieldValue = TestUtils.fieldValue(clusterLinkManager(), ClusterLinkManager.class, "updateLock");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        Semaphore semaphore = new Semaphore(0);
        try {
            newSingleThreadExecutor.submit(() -> {
                synchronized (fieldValue) {
                    countDownLatch.countDown();
                    semaphore.tryAcquire(30L, TimeUnit.SECONDS);
                }
            }, BoxesRunTime.boxToInteger(0));
            countDownLatch.await(10L, TimeUnit.SECONDS);
            Assertions.assertEquals(package$.MODULE$.Seq().empty(), clusterLinkManager().listClusterLinks());
            semaphore.release();
        } finally {
            newSingleThreadExecutor.shutdownNow();
        }
    }

    @Test
    public void testRetryLocking() {
        clusterLinkManager().shutdown();
        final ClusterLinkQuotas unboundedClusterLinkQuotas = ClusterLinkQuotas$.MODULE$.unboundedClusterLinkQuotas();
        clusterLinkManager_$eq(new ClusterLinkManager(this, unboundedClusterLinkQuotas) { // from class: kafka.server.link.ClusterLinkManagerTest$$anon$2
            private final /* synthetic */ ClusterLinkManagerTest $outer;

            public void processClusterLinkChanges(Uuid uuid, Properties properties, boolean z) {
                this.$outer.kafka$server$link$ClusterLinkManagerTest$$ensureLockNotAcquiredBeforeInvokingMethod("processClusterLinkChanges");
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                KafkaConfig kafka$server$link$ClusterLinkManagerTest$$brokerConfig = this.kafka$server$link$ClusterLinkManagerTest$$brokerConfig();
                Metrics kafka$server$link$ClusterLinkManagerTest$$metrics = this.kafka$server$link$ClusterLinkManagerTest$$metrics();
                MockTime kafka$server$link$ClusterLinkManagerTest$$time = this.kafka$server$link$ClusterLinkManagerTest$$time();
                None$ none$ = None$.MODULE$;
            }
        });
        ClusterLinkData clusterLinkData = new ClusterLinkData("link1", Uuid.randomUuid(), None$.MODULE$, None$.MODULE$, false);
        clusterLinkManager().linkPropertiesCache().put(clusterLinkData.linkId(), clusterLinkPersistentProps());
        clusterLinkManager().linkRetryCandidates().put(clusterLinkData.linkId(), new ClusterLinkRetryState(clusterLinkManager(), clusterLinkManager().scheduler(clusterLinkData.linkName(), None$.MODULE$), clusterLinkData, unboundedClusterLinkQuotas, kafka$server$link$ClusterLinkManagerTest$$time(), new ClusterLinkFailed.FailedClusterLinkMetrics(clusterLinkData.linkName(), kafka$server$link$ClusterLinkManagerTest$$metrics(), None$.MODULE$, UnauthorizedBootstrapFailedLinkReason$.MODULE$)));
        clusterLinkManager().retryConfigUpdate(clusterLinkData.linkId(), kafka$server$link$ClusterLinkManagerTest$$time().milliseconds());
    }

    @Test
    public void testNonOptimizedNonBlockingLocalAdmin() {
        verifyNonBlockingLocalAdmin(false);
    }

    @Test
    public void testOptimizedNonBlockingLocalAdmin() {
        verifyNonBlockingLocalAdmin(true);
    }

    public void verifyNonBlockingLocalAdmin(boolean z) {
        clusterLinkManager().shutdown();
        final ClusterLinkQuotas unboundedClusterLinkQuotas = ClusterLinkQuotas$.MODULE$.unboundedClusterLinkQuotas();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false);
        createBrokerConfig.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        createBrokerConfig.put("confluent.cluster.link.enable.local.admin", Boolean.toString(z));
        final KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        final AtomicReference atomicReference = new AtomicReference();
        final Buffer apply = Buffer$.MODULE$.apply(Nil$.MODULE$);
        clusterLinkManager_$eq(new ClusterLinkManager(this, fromProps, unboundedClusterLinkQuotas, atomicReference, apply) { // from class: kafka.server.link.ClusterLinkManagerTest$$anon$3
            private final /* synthetic */ ClusterLinkManagerTest $outer;
            private final AtomicReference lastAdmin$1;
            private final Buffer adminIndices$1;

            public ClusterLinkLocalAdmin createLocalAdmin(ClusterLinkScheduler clusterLinkScheduler, int i) {
                ClusterLinkLocalAdmin clusterLinkLocalAdmin = (ClusterLinkLocalAdmin) Mockito.mock(ClusterLinkLocalAdmin.class);
                this.lastAdmin$1.set(clusterLinkLocalAdmin);
                this.adminIndices$1.$plus$eq(BoxesRunTime.boxToInteger(i));
                return clusterLinkLocalAdmin;
            }

            public void processClusterLinkChanges(Uuid uuid, Properties properties, boolean z2) {
                this.$outer.kafka$server$link$ClusterLinkManagerTest$$ensureLockNotAcquiredBeforeInvokingMethod("processClusterLinkChanges");
                super.processClusterLinkChanges(uuid, properties, z2);
            }

            public void createClusterLink(ClusterLinkData clusterLinkData, ClusterLinkConfig clusterLinkConfig, Properties properties) {
                this.$outer.kafka$server$link$ClusterLinkManagerTest$$ensureLockNotAcquiredBeforeInvokingMethod("createClusterLink");
                super.createClusterLink(clusterLinkData, clusterLinkConfig, properties);
            }

            public void updateClusterLinkConfig(String str, Function1<Properties, Object> function1) {
                this.$outer.kafka$server$link$ClusterLinkManagerTest$$ensureLockNotAcquiredBeforeInvokingMethod("updateClusterLinkConfig");
                super.updateClusterLinkConfig(str, function1);
            }

            public void deleteClusterLink(String str, Uuid uuid) {
                this.$outer.kafka$server$link$ClusterLinkManagerTest$$ensureLockNotAcquiredBeforeInvokingMethod("deleteClusterLink");
                super.deleteClusterLink(str, uuid);
            }

            public void reportUnavailableLink(String str, UnavailableLinkReason unavailableLinkReason) {
                this.$outer.kafka$server$link$ClusterLinkManagerTest$$ensureLockNotAcquiredBeforeInvokingMethod("reportUnavailableLink");
                super.reportUnavailableLink(str, unavailableLinkReason);
            }

            public void reportAvailableLink(String str) {
                this.$outer.kafka$server$link$ClusterLinkManagerTest$$ensureLockNotAcquiredBeforeInvokingMethod("reportAvailableLink");
                super.reportAvailableLink(str);
            }

            public void purgeClusterLink(Uuid uuid, String str, boolean z2) {
                this.$outer.kafka$server$link$ClusterLinkManagerTest$$ensureLockNotAcquiredBeforeInvokingMethod("purgeClusterLink");
                super.purgeClusterLink(uuid, str, z2);
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.lastAdmin$1 = atomicReference;
                this.adminIndices$1 = apply;
                Metrics kafka$server$link$ClusterLinkManagerTest$$metrics = this.kafka$server$link$ClusterLinkManagerTest$$metrics();
                MockTime kafka$server$link$ClusterLinkManagerTest$$time = this.kafka$server$link$ClusterLinkManagerTest$$time();
                None$ none$5 = None$.MODULE$;
            }
        });
        startClusterLinkManager(clusterLinkManager(), zkSupport());
        Assertions.assertNull(atomicReference.getAndSet(null));
        ClusterLinkLocalAdmin sharedLocalAdmin = clusterLinkManager().sharedLocalAdmin();
        Assertions.assertNotNull(sharedLocalAdmin);
        Assertions.assertSame(sharedLocalAdmin, atomicReference.getAndSet(null));
        Assertions.assertSame(sharedLocalAdmin, clusterLinkManager().sharedLocalAdmin());
        Assertions.assertNull(atomicReference.getAndSet(null));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference2 = new AtomicReference();
        clusterLinkManager().withNonBlockingLocalAdmin("name", clusterLinkManager().taskManager("", None$.MODULE$), clusterLinkLocalAdmin -> {
            atomicReference2.set(clusterLinkLocalAdmin);
            return KafkaFuture.completedFuture((Object) null);
        }, kafkaFuture -> {
            countDownLatch.countDown();
            return BoxedUnit.UNIT;
        });
        countDownLatch.await(10L, TimeUnit.SECONDS);
        ClusterLinkLocalAdmin clusterLinkLocalAdmin2 = (ClusterLinkLocalAdmin) atomicReference2.get();
        Assertions.assertNotNull(clusterLinkLocalAdmin2);
        if (z) {
            Assertions.assertNull(atomicReference.get());
            Assertions.assertSame(sharedLocalAdmin, clusterLinkLocalAdmin2);
        } else {
            ClusterLinkLocalAdmin clusterLinkLocalAdmin3 = (ClusterLinkLocalAdmin) atomicReference.getAndSet(null);
            Assertions.assertNotNull(clusterLinkLocalAdmin3);
            Assertions.assertSame(clusterLinkLocalAdmin3, clusterLinkLocalAdmin2);
            Assertions.assertNotSame(sharedLocalAdmin, clusterLinkLocalAdmin2);
        }
        clusterLinkManager().withNonBlockingLocalAdmin("name", clusterLinkManager().taskManager("", None$.MODULE$), clusterLinkLocalAdmin4 -> {
            return KafkaFuture.completedFuture((Object) null);
        }, kafkaFuture2 -> {
            $anonfun$verifyNonBlockingLocalAdmin$4(kafkaFuture2);
            return BoxedUnit.UNIT;
        });
        if (z) {
            Assertions.assertEquals(package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0})), apply.toSeq());
        } else {
            Assertions.assertEquals(package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0, 2147483646, 2147483645})), apply.toSeq());
        }
    }

    @Test
    public void testTemporaryAdminIsClosed() {
        ClusterLinkLocalAdmin clusterLinkLocalAdmin = (ClusterLinkLocalAdmin) Mockito.mock(ClusterLinkLocalAdmin.class);
        KafkaFuture.completedFuture((Object) null);
        ClusterLinkManager$ clusterLinkManager$ = ClusterLinkManager$.MODULE$;
        clusterLinkLocalAdmin.close(Duration.ZERO);
        ((ClusterLinkLocalAdmin) Mockito.verify(clusterLinkLocalAdmin, Mockito.times(1))).close((Duration) ArgumentMatchers.any());
    }

    @Test
    public void testNonTemporaryAdminIsNotClosed() {
        ClusterLinkLocalAdmin clusterLinkLocalAdmin = (ClusterLinkLocalAdmin) Mockito.mock(ClusterLinkLocalAdmin.class);
        KafkaFuture.completedFuture((Object) null);
        ClusterLinkManager$ clusterLinkManager$ = ClusterLinkManager$.MODULE$;
        ((ClusterLinkLocalAdmin) Mockito.verify(clusterLinkLocalAdmin, Mockito.never())).close((Duration) ArgumentMatchers.any());
    }

    @Test
    public void testTemporaryAdminIndex() {
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp(i -> {
            RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 10).foreach$mVc$sp(i -> {
                Assertions.assertEquals(Integer.MAX_VALUE - i, this.clusterLinkManager().temporaryAdminIndex(2147483637));
            });
            Assertions.assertEquals(Integer.MAX_VALUE, this.clusterLinkManager().temporaryAdminIndex(2147483637));
        });
    }

    @Test
    public void testDynamicFetchSize() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false);
        createBrokerConfig.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        FetchResponseSize fetchResponseSize = new FetchResponseSize(Predef$.MODULE$.Integer2int(clusterLinkConfig().replicaFetchMaxBytes()), Predef$.MODULE$.Integer2int(clusterLinkConfig().replicaFetchResponseMaxBytes()));
        Assertions.assertEquals(fetchResponseSize, clusterLinkManager().fetchResponseSize(clusterLinkConfig()));
        Uuid randomUuid = Uuid.randomUuid();
        ((KafkaZkClient) Mockito.doReturn(BoxesRunTime.boxToBoolean(false), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).clusterLinkExists(randomUuid);
        clusterLinkManager().createClusterLink(new ClusterLinkData("link", randomUuid, None$.MODULE$, None$.MODULE$, false), clusterLinkConfig(), clusterLinkPersistentProps());
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) clusterLinkManager().fetcherManager(randomUuid).get();
        Assertions.assertEquals(0, clusterLinkFetcherManager.fetcherCount());
        verifyFetchSize$1(fetchResponseSize, createBrokerConfig);
        ((KafkaZkClient) Mockito.verify(zkClient())).clusterLinkExists(randomUuid);
        createBrokerConfig.setProperty("confluent.cluster.link.fetch.response.total.bytes", "10000");
        verifyFetchSize$1(new FetchResponseSize(5000, 10000), createBrokerConfig);
        createBrokerConfig.setProperty("confluent.cluster.link.fetch.response.min.bytes", "6000");
        verifyFetchSize$1(new FetchResponseSize(6000, 10000), createBrokerConfig);
        createBrokerConfig.setProperty("confluent.cluster.link.fetch.response.total.bytes", Integer.toString(Integer.MAX_VALUE));
        verifyFetchSize$1(fetchResponseSize, createBrokerConfig);
        ClusterLinkFetcherThread clusterLinkFetcherThread = (ClusterLinkFetcherThread) Mockito.mock(ClusterLinkFetcherThread.class);
        IntRef create = IntRef.create(0);
        createBrokerConfig.setProperty("confluent.cluster.link.fetch.response.total.bytes", Integer.toString(Predef$.MODULE$.Integer2int(clusterLinkConfig().replicaFetchResponseMaxBytes()) * 6));
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 5).foreach$mVc$sp(i -> {
            create.elem++;
            clusterLinkFetcherManager.fetcherThreadMap().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new FetcherTag(0, create.elem, FetcherPool$Default$.MODULE$)), clusterLinkFetcherThread));
            this.verifyFetchSize$1(new FetchResponseSize(Predef$.MODULE$.Integer2int(this.clusterLinkConfig().replicaFetchMaxBytes()), Predef$.MODULE$.Integer2int(this.clusterLinkConfig().replicaFetchResponseMaxBytes())), createBrokerConfig);
        });
        createBrokerConfig.setProperty("confluent.cluster.link.fetch.response.total.bytes", "10000");
        createBrokerConfig.setProperty("confluent.cluster.link.fetch.response.min.bytes", "1");
        verifyFetchSize$1(new FetchResponseSize(1000, 2000), createBrokerConfig);
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 5).foreach$mVc$sp(i2 -> {
            addFetcherThread$1(create, clusterLinkFetcherManager, clusterLinkFetcherThread);
        });
        verifyFetchSize$1(new FetchResponseSize(500, 1000), createBrokerConfig);
        clusterLinkFetcherManager.fetcherThreadMap().clear();
        verifyFetchSize$1(new FetchResponseSize(5000, 10000), createBrokerConfig);
    }

    @Test
    public void testClusterLinkConfigReencryption() {
        UUID randomUUID = UUID.randomUUID();
        Throwable kafkaException = new KafkaException("Test exception");
        setupZkClient$1(randomUUID, "link1");
        clusterLinkManager().shutdown();
        ((KafkaZkClient) Mockito.doThrow(new Throwable[]{kafkaException}).when(zkClient())).transformEntityConfigs((String) ArgumentMatchers.eq("cluster-links"), (String) ArgumentMatchers.eq(randomUUID.toString()), (Function1) ArgumentMatchers.any());
        ClusterLinkManager recreateClusterLinkManager$1 = recreateClusterLinkManager$1(Long.MAX_VALUE);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testClusterLinkConfigReencryption$1(recreateClusterLinkManager$1)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Retry not scheduled after failure");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        recreateClusterLinkManager$1.shutdown();
        setupZkClient$1(randomUUID, "link1");
        ((KafkaZkClient) Mockito.doNothing().when(zkClient())).transformEntityConfigs((String) ArgumentMatchers.eq("cluster-links"), (String) ArgumentMatchers.eq(randomUUID.toString()), (Function1) ArgumentMatchers.any());
        ClusterLinkManager recreateClusterLinkManager$12 = recreateClusterLinkManager$1(Long.MAX_VALUE);
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testClusterLinkConfigReencryption$3(recreateClusterLinkManager$12)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                Assertions.fail("Unnecessary retry scheduled");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        recreateClusterLinkManager$12.shutdown();
        setupZkClient$1(randomUUID, "link1");
        ((KafkaZkClient) Mockito.doNothing().when(zkClient())).transformEntityConfigs((String) ArgumentMatchers.eq("cluster-links"), (String) ArgumentMatchers.eq(randomUUID.toString()), (Function1) ArgumentMatchers.any());
        ClusterLinkManager recreateClusterLinkManager$13 = recreateClusterLinkManager$1(30000L);
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testClusterLinkConfigReencryption$5(recreateClusterLinkManager$13)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + 15000) {
                Assertions.fail("Old encoder delete not scheduled");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        recreateClusterLinkManager$13.shutdown();
        setupZkClient$1(randomUUID, "link1");
        ((KafkaZkClient) Mockito.doNothing().when(zkClient())).transformEntityConfigs((String) ArgumentMatchers.eq("cluster-links"), (String) ArgumentMatchers.eq(randomUUID.toString()), (Function1) ArgumentMatchers.any());
        ClusterLinkManager recreateClusterLinkManager$14 = recreateClusterLinkManager$1(1L);
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        long currentTimeMillis4 = System.currentTimeMillis();
        while (!$anonfun$testClusterLinkConfigReencryption$7(recreateClusterLinkManager$14)) {
            if (System.currentTimeMillis() > currentTimeMillis4 + 15000) {
                Assertions.fail("Unnecessary delete retry scheduled");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        recreateClusterLinkManager$14.shutdown();
        setupZkClient$1(randomUUID, "link1");
        ((KafkaZkClient) Mockito.doNothing().doThrow(new Throwable[]{kafkaException}).when(zkClient())).transformEntityConfigs((String) ArgumentMatchers.eq("cluster-links"), (String) ArgumentMatchers.eq(randomUUID.toString()), (Function1) ArgumentMatchers.any());
        ClusterLinkManager recreateClusterLinkManager$15 = recreateClusterLinkManager$1(1L);
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        long currentTimeMillis5 = System.currentTimeMillis();
        while (!$anonfun$testClusterLinkConfigReencryption$9(recreateClusterLinkManager$15)) {
            if (System.currentTimeMillis() > currentTimeMillis5 + 15000) {
                Assertions.fail("Delete retry not scheduled");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        recreateClusterLinkManager$15.shutdown();
    }

    @Test
    public void testSchedulerWithLinkAffinity() {
        verifyLinkScheduler(ConfluentConfigs.ClusterLinkThreadAffinity.LINK, (scala.collection.immutable.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("link1", Uuid.randomUuid(), None$.MODULE$, None$.MODULE$, false)), BoxesRunTime.boxToInteger(1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("link2", Uuid.randomUuid(), None$.MODULE$, None$.MODULE$, false)), BoxesRunTime.boxToInteger(1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("link3", Uuid.randomUuid(), None$.MODULE$, None$.MODULE$, false)), BoxesRunTime.boxToInteger(1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("link4", Uuid.randomUuid(), None$.MODULE$, None$.MODULE$, false)), BoxesRunTime.boxToInteger(2)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("link5", Uuid.randomUuid(), None$.MODULE$, None$.MODULE$, false)), BoxesRunTime.boxToInteger(0))})));
    }

    @Test
    public void testMultiTenantSchedulerWithLinkAffinity() {
        verifyLinkScheduler(ConfluentConfigs.ClusterLinkThreadAffinity.LINK, (scala.collection.immutable.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("link1", Uuid.randomUuid(), None$.MODULE$, new Some("tenant1_"), false)), BoxesRunTime.boxToInteger(1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("link2", Uuid.randomUuid(), None$.MODULE$, None$.MODULE$, false)), BoxesRunTime.boxToInteger(1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("link3", Uuid.randomUuid(), None$.MODULE$, new Some("tenant2"), false)), BoxesRunTime.boxToInteger(1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("link4", Uuid.randomUuid(), None$.MODULE$, new Some("tenant3"), false)), BoxesRunTime.boxToInteger(2)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("link5", Uuid.randomUuid(), None$.MODULE$, new Some("tenant1"), false)), BoxesRunTime.boxToInteger(0))})));
    }

    @Test
    public void testMultiTenantSchedulerWithTenantAffinity() {
        verifyLinkScheduler(ConfluentConfigs.ClusterLinkThreadAffinity.TENANT, (scala.collection.immutable.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("tenant1_link1", Uuid.randomUuid(), None$.MODULE$, new Some("tenant1_"), false)), BoxesRunTime.boxToInteger(2)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("tenant2_link2", Uuid.randomUuid(), None$.MODULE$, None$.MODULE$, false)), BoxesRunTime.boxToInteger(0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("tenant2_link3", Uuid.randomUuid(), None$.MODULE$, new Some("tenant2_"), false)), BoxesRunTime.boxToInteger(0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("tenant3_link4", Uuid.randomUuid(), None$.MODULE$, new Some("tenant3_"), false)), BoxesRunTime.boxToInteger(1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("tenant1_link5", Uuid.randomUuid(), None$.MODULE$, new Some("tenant1_"), false)), BoxesRunTime.boxToInteger(2)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("tenant1_link6", Uuid.randomUuid(), None$.MODULE$, new Some("tenant1_"), false)), BoxesRunTime.boxToInteger(2)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new ClusterLinkData("tenant1_link7", Uuid.randomUuid(), None$.MODULE$, new Some("tenant1_"), false)), BoxesRunTime.boxToInteger(2))})));
    }

    private void verifyLinkScheduler(ConfluentConfigs.ClusterLinkThreadAffinity clusterLinkThreadAffinity, scala.collection.immutable.Map<ClusterLinkData, Object> map) {
        clusterLinkManager().shutdown();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false);
        createBrokerConfig.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        createBrokerConfig.put("confluent.cluster.link.num.background.threads", "3");
        createBrokerConfig.put("confluent.cluster.link.background.thread.affinity", clusterLinkThreadAffinity.name());
        clusterLinkManager_$eq(new ClusterLinkManager(KafkaConfig$.MODULE$.fromProps(createBrokerConfig), "clusterId", ClusterLinkQuotas$.MODULE$.unboundedClusterLinkQuotas(), kafka$server$link$ClusterLinkManagerTest$$metrics(), kafka$server$link$ClusterLinkManagerTest$$time(), false, None$.MODULE$));
        startClusterLinkManager(clusterLinkManager(), zkSupport());
        Buffer apply = Buffer$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 0, 0}));
        Buffer apply2 = Buffer$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new scala.collection.mutable.Set[]{(scala.collection.mutable.Set) Set$.MODULE$.apply(Nil$.MODULE$), (scala.collection.mutable.Set) Set$.MODULE$.apply(Nil$.MODULE$), (scala.collection.mutable.Set) Set$.MODULE$.apply(Nil$.MODULE$)}));
        Assertions.assertEquals(apply, actualThreadUsage$1());
        Assertions.assertEquals(((IterableOnceOps) apply2.map(set -> {
            return BoxesRunTime.boxToInteger(set.size());
        })).toSeq(), actualThreadTenants$1());
        ClusterLinkManager.Managers managers = (ClusterLinkManager.Managers) Mockito.mock(ClusterLinkManager.Managers.class);
        ClusterLinkOutboundConnectionManager clusterLinkOutboundConnectionManager = (ClusterLinkOutboundConnectionManager) Mockito.mock(ClusterLinkOutboundConnectionManager.class);
        ((ClusterLinkManager.Managers) Mockito.doReturn(clusterLinkOutboundConnectionManager, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(managers)).connectionManager();
        ((ClusterLinkConnectionManager) Mockito.doReturn(clusterLinkConfig(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkOutboundConnectionManager)).currentConfig();
        map.foreach(tuple2 -> {
            $anonfun$verifyLinkScheduler$4(this, managers, apply, apply2, tuple2);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void testLeaderAndIsrBeforeLinkUpdate() {
        Uuid randomUuid = Uuid.randomUuid();
        ClusterLinkData clusterLinkData = new ClusterLinkData("testLink", randomUuid, new Some("testClusterId"), None$.MODULE$, false);
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        setupMock(partition, topicPartition, new Some(randomUuid));
        Mockito.reset(new ReplicaManager[]{replicaManager()});
        ((ReplicaManager) Mockito.doReturn(metadataCache(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(replicaManager())).metadataCache();
        ((ReplicaManager) Mockito.doReturn(new Some(zkClient()), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(replicaManager())).zkClient();
        ((ReplicaManager) Mockito.doReturn(((IterableOnce) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition}))).iterator(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(replicaManager())).leaderPartitionsIterator();
        Assertions.assertEquals(0, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition}))));
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) Mockito.mock(ClusterLinkFetcherManager.class);
        ((ClusterLinkFetcherManager) Mockito.doNothing().when(clusterLinkFetcherManager)).addLinkedFetcherForPartitions((Iterable) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition})));
        ((ClusterLinkFetcherManager) Mockito.doNothing().when(clusterLinkFetcherManager)).shutdown();
        ((ClusterLinkFetcherManager) Mockito.doReturn(package$.MODULE$.Seq().empty(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkFetcherManager)).lazyResources();
        ClusterLinkOutboundConnectionManager clusterLinkOutboundConnectionManager = (ClusterLinkOutboundConnectionManager) Mockito.mock(ClusterLinkOutboundConnectionManager.class);
        ((ClusterLinkConnectionManager) Mockito.doReturn(clusterLinkConfig(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkOutboundConnectionManager)).currentConfig();
        ((ClusterLinkOutboundConnectionManager) Mockito.doReturn(package$.MODULE$.Seq().empty(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkOutboundConnectionManager)).lazyResources();
        ((ClusterLinkConnectionManager) Mockito.doReturn(new ClusterLinkData("testLink", randomUuid, new Some("testClusterId"), None$.MODULE$, false), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkOutboundConnectionManager)).linkData();
        ((ClusterLinkConnectionManager) Mockito.doNothing().when(clusterLinkOutboundConnectionManager)).shutdown();
        ClusterLinkDestClientManager clusterLinkDestClientManager = (ClusterLinkDestClientManager) Mockito.mock(ClusterLinkDestClientManager.class);
        ((ClusterLinkDestClientManager) Mockito.doNothing().when(clusterLinkDestClientManager)).shutdown();
        clusterLinkManager().commitAddClusterLink(clusterLinkData, new ClusterLinkManager.Managers(new Some(clusterLinkFetcherManager), clusterLinkDestClientManager, clusterLinkOutboundConnectionManager, (ClusterLinkFactory.LinkMetrics) null), ClusterLinkConfig.LinkMode.DESTINATION);
        ((ClusterLinkFetcherManager) Mockito.verify(clusterLinkFetcherManager)).addLinkedFetcherForPartitions((Iterable) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition})));
    }

    @Test
    public void testAddRemovePartitionsLinkCoordinatorDisabled() {
        verifyAddRemovePartitions(false, false);
    }

    @Test
    public void testAddRemovePartitionsLinkCoordinator() {
        verifyAddRemovePartitions(true, true);
    }

    @Test
    public void testAddRemovePartitionsNotLinkCoordinator() {
        verifyAddRemovePartitions(true, false);
    }

    private void verifyAddRemovePartitions(boolean z, boolean z2) {
        Uuid randomUuid = Uuid.randomUuid();
        ClusterLinkData clusterLinkData = new ClusterLinkData("testLink", randomUuid, new Some("remoteClusterId"), None$.MODULE$, false);
        ((KafkaZkClient) Mockito.doReturn(BoxesRunTime.boxToBoolean(false), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).clusterLinkExists(randomUuid);
        clusterLinkManager().metadataManager().shutdown();
        MetadataCache metadataCache = (MetadataCache) Mockito.mock(MetadataCache.class);
        clusterLinkManager().metadataManager_$eq((ClusterLinkMetadataManager) Mockito.mock(ClusterLinkMetadataManager.class));
        ((ClusterLinkMetadataManager) Mockito.doReturn(BoxesRunTime.boxToBoolean(z), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkManager().metadataManager())).isLinkCoordinatorEnabled();
        ((ClusterLinkMetadataManager) Mockito.doReturn(BoxesRunTime.boxToBoolean(z2), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkManager().metadataManager())).isLinkCoordinator("testLink");
        ((ClusterLinkMetadataManager) Mockito.doReturn(Option$.MODULE$.apply(clusterLinkData), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkManager().metadataManager())).getClusterLinkData(randomUuid);
        ((ClusterLinkMetadataManager) Mockito.doReturn(metadataCache, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkManager().metadataManager())).metadataCache();
        ClusterLinkTopicState clusterLinkTopicState = (ClusterLinkTopicState) Mockito.mock(ClusterLinkTopicState.class);
        ((ClusterLinkTopicState) Mockito.doReturn(TopicLinkMirror$.MODULE$, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkTopicState)).state();
        ((ClusterLinkMetadataManager) Mockito.doReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("testTopic"), clusterLinkTopicState), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("anotherTopic"), clusterLinkTopicState)})), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkManager().metadataManager())).mirrorTopicStatesFromMetadataCache(randomUuid);
        ((MetadataCache) Mockito.doReturn(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(metadataCache)).metadataVersion();
        clusterLinkManager().createClusterLink(clusterLinkData, clusterLinkConfig(), clusterLinkPersistentProps());
        Assertions.assertEquals(new $colon.colon(clusterLinkData, Nil$.MODULE$), clusterLinkManager().listClusterLinks());
        ClusterLinkDestClientManager clusterLinkDestClientManager = (ClusterLinkDestClientManager) clusterLinkManager().clientManager(randomUuid).get();
        scala.collection.immutable.Set empty = z ? z2 ? (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"testTopic", "anotherTopic"})) : Predef$.MODULE$.Set().empty() : (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"testTopic"}));
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        setupMock(partition, topicPartition, new Some(randomUuid));
        Assertions.assertEquals(1, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition}))));
        Assertions.assertEquals(empty, clusterLinkDestClientManager.getTopics());
        TopicPartition topicPartition2 = new TopicPartition("anotherTopic", 1);
        Partition partition2 = (Partition) Mockito.mock(Partition.class);
        setupMock(partition2, topicPartition2, new Some(randomUuid));
        Assertions.assertEquals(1, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition2}))));
        Assertions.assertEquals(empty, clusterLinkDestClientManager.getTopics());
        TopicPartition topicPartition3 = new TopicPartition("anotherLinkTopic", 0);
        Partition partition3 = (Partition) Mockito.mock(Partition.class);
        setupMock(partition3, topicPartition3, new Some(Uuid.randomUuid()));
        Assertions.assertEquals(0, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition3}))));
        Assertions.assertEquals(empty, clusterLinkDestClientManager.getTopics());
        clusterLinkManager().removePartitionsAndMetadata((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition2})));
        Assertions.assertEquals(empty, clusterLinkDestClientManager.getTopics());
        clusterLinkManager().removePartitionsAndMetadata((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition})));
        if (z2) {
            Assertions.assertEquals(empty, clusterLinkDestClientManager.getTopics());
            ((ClusterLinkMetadataManager) Mockito.doReturn(Predef$.MODULE$.Map().empty(), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(clusterLinkManager().metadataManager())).mirrorTopicStatesFromMetadataCache(randomUuid);
        }
        Assertions.assertEquals(Predef$.MODULE$.Set().empty(), clusterLinkDestClientManager.getTopics());
    }

    private KafkaConfig createBrokerConfig() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false);
        createBrokerConfig.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        return KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
    }

    public long getClusterLinkFailedAttempts(ClusterLinkManager clusterLinkManager, Uuid uuid) {
        Some some = clusterLinkManager.linkRetryCandidates().get(uuid);
        if (some instanceof Some) {
            return ((ClusterLinkRetryState) some.value()).getFailedAttempts();
        }
        if (None$.MODULE$.equals(some)) {
            return 0L;
        }
        throw new MatchError(some);
    }

    private Properties clusterLinkPersistentProps() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:1234");
        return properties;
    }

    private ClusterLinkConfig clusterLinkConfig() {
        ClusterLinkConfig$ clusterLinkConfig$ = ClusterLinkConfig$.MODULE$;
        Properties clusterLinkPersistentProps = clusterLinkPersistentProps();
        None$ none$ = None$.MODULE$;
        ClusterLinkConfig$ clusterLinkConfig$2 = ClusterLinkConfig$.MODULE$;
        return clusterLinkConfig$.create(clusterLinkPersistentProps, none$, true);
    }

    private void setupMock(Partition partition, TopicPartition topicPartition, Option<Uuid> option) {
        Mockito.reset(new Partition[]{partition});
        ((Partition) Mockito.doReturn(topicPartition, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(partition)).topicPartition();
        ((Partition) Mockito.doReturn(option, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(partition)).getClusterLinkId();
        ((Partition) Mockito.doReturn(BoxesRunTime.boxToBoolean(option.nonEmpty()), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(partition)).isActiveLinkDestinationLeader();
        ((Partition) Mockito.doReturn(new Some(BoxesRunTime.boxToInteger(1)), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(partition)).getLinkedLeaderEpoch();
        PartitionState partitionState = (PartitionState) Mockito.mock(PartitionState.class);
        ((PartitionState) Mockito.doReturn(option.map(uuid -> {
            return new ClusterLinkState(uuid, TopicLinkMirror$.MODULE$, (PartitionLinkState) null);
        }), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(partitionState)).clusterLink();
        ((Partition) Mockito.doReturn(partitionState, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(partition)).partitionState();
    }

    private ClusterLinkManager createClusterLinkManager(KafkaConfig kafkaConfig, MetadataSupport metadataSupport) {
        boolean z = ConfluentConfigs.buildMultitenantMetadata(kafkaConfig.values(), kafka$server$link$ClusterLinkManagerTest$$metrics()) != null;
        ClusterLinkFactory$ clusterLinkFactory$ = ClusterLinkFactory$.MODULE$;
        ClusterLinkQuotas unboundedClusterLinkQuotas = ClusterLinkQuotas$.MODULE$.unboundedClusterLinkQuotas();
        Metrics kafka$server$link$ClusterLinkManagerTest$$metrics = kafka$server$link$ClusterLinkManagerTest$$metrics();
        MockTime kafka$server$link$ClusterLinkManagerTest$$time = kafka$server$link$ClusterLinkManagerTest$$time();
        ClusterLinkFactory$ clusterLinkFactory$2 = ClusterLinkFactory$.MODULE$;
        ClusterLinkManager createLinkManager = clusterLinkFactory$.createLinkManager(kafkaConfig, "clusterId", unboundedClusterLinkQuotas, kafka$server$link$ClusterLinkManagerTest$$metrics, kafka$server$link$ClusterLinkManagerTest$$time, z, None$.MODULE$);
        startClusterLinkManager(createLinkManager, metadataSupport);
        return createLinkManager;
    }

    private void startClusterLinkManager(ClusterLinkFactory.LinkManager linkManager, MetadataSupport metadataSupport) {
        Endpoint endpoint = new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 1234);
        AuthorizerServerInfo authorizerServerInfo = (AuthorizerServerInfo) Mockito.mock(AuthorizerServerInfo.class);
        ((AuthorizerServerInfo) Mockito.doReturn(endpoint, ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(authorizerServerInfo)).interBrokerEndpoint();
        ((AuthorizerServerInfo) Mockito.doReturn(Collections.singleton(endpoint), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(authorizerServerInfo)).endpoints();
        SocketServer socketServer = (SocketServer) Mockito.mock(SocketServer.class);
        ((SocketServer) Mockito.doReturn(BoxesRunTime.boxToInteger(1234), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(socketServer)).boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
        linkManager.initialize(authorizerServerInfo, socketServer, None$.MODULE$, metadataSupport, (NodeToControllerChannelManager) null, replicaManager(), (GroupCoordinator) null, None$.MODULE$, None$.MODULE$);
        linkManager.startup(true);
    }

    public void kafka$server$link$ClusterLinkManagerTest$$ensureLockNotAcquiredBeforeInvokingMethod(String str) {
        Assertions.assertFalse(Thread.holdsLock(TestUtils.fieldValue(clusterLinkManager(), ClusterLinkManager.class, "lock")), new StringBuilder(53).append("Invalid locking order, `lock` is held while invoking ").append(str).toString());
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinks$4(ClusterLinkManagerTest clusterLinkManagerTest, Uuid uuid) {
        return clusterLinkManagerTest.clusterLinkManager().fetcherManager(uuid).isEmpty() && clusterLinkManagerTest.clusterLinkManager().clientManager(uuid).isEmpty();
    }

    public static final /* synthetic */ String $anonfun$testClusterLinks$5(Uuid uuid) {
        return new StringBuilder(38).append("Linked fetcher/client for ").append(uuid).append(" not removed").toString();
    }

    public static final /* synthetic */ boolean $anonfun$testReconfigure$2(Properties properties) {
        return false;
    }

    public static final /* synthetic */ boolean $anonfun$testReconfigure$3(Properties properties) {
        properties.put("bootstrap.servers", "localhost:1234");
        return false;
    }

    public static final /* synthetic */ boolean $anonfun$testReconfigure$4(Properties properties) {
        properties.put("bootstrap.servers", "localhost:1234");
        return true;
    }

    private final void verifyConnectionManager$1(ClusterLinkConfig.LinkMode linkMode, ConnectionMode connectionMode, Option option, Class cls, boolean z) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:1234");
        properties.put(ClusterLinkConfig$.MODULE$.LinkModeProp(), linkMode.name());
        properties.put(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());
        option.foreach(connectionMode2 -> {
            return properties.put(ClusterLinkConfig$.MODULE$.RemoteLinkConnectionModeProp(), connectionMode2.name());
        });
        properties.put(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), "PLAINTEXT");
        ClusterLinkData clusterLinkData = new ClusterLinkData(new StringBuilder(7).append("link-").append(linkMode).append("-").append(connectionMode).append("-").append(option).toString(), Uuid.randomUuid(), new Some("testClusterId"), None$.MODULE$, false);
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        ClusterLinkConfig$ clusterLinkConfig$ = ClusterLinkConfig$.MODULE$;
        None$ none$ = None$.MODULE$;
        ClusterLinkConfig$ clusterLinkConfig$2 = ClusterLinkConfig$.MODULE$;
        clusterLinkManager.createClusterLink(clusterLinkData, clusterLinkConfig$.create(properties, none$, true), properties);
        ClusterLinkConnectionManager clusterLinkConnectionManager = (ClusterLinkFactory.ConnectionManager) clusterLinkManager().connectionManager(clusterLinkData.linkId()).get();
        Assertions.assertEquals(cls, clusterLinkConnectionManager.getClass());
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z), BoxesRunTime.boxToBoolean(clusterLinkConnectionManager.reverseConnectionEnabled()));
    }

    public static final /* synthetic */ boolean $anonfun$createClusterLink$2(Properties properties) {
        return false;
    }

    public static final /* synthetic */ boolean $anonfun$testRetryReconfigureRetriableFailureOnLinkRunning$1(ClusterLinkManagerTest clusterLinkManagerTest, LongRef longRef, Uuid uuid, long j) {
        longRef.elem = clusterLinkManagerTest.getClusterLinkFailedAttempts(clusterLinkManagerTest.clusterLinkManager(), uuid);
        return longRef.elem == j;
    }

    public static final /* synthetic */ boolean $anonfun$testRetryReconfigureRetriableFailureOnLinkRunning$3(ClusterLinkManagerTest clusterLinkManagerTest, LongRef longRef, String str) {
        longRef.elem = ((ClusterLinkInfo) clusterLinkManagerTest.clusterLinkManager().clusterLinkInfo(str).get()).linkStateInfo().stateMs();
        return longRef.elem == clusterLinkManagerTest.kafka$server$link$ClusterLinkManagerTest$$time().milliseconds();
    }

    private final void increaseElapsedTimeAndWaitForRetry$1(long j, String str, String str2, long j2, Uuid uuid, String str3) {
        kafka$server$link$ClusterLinkManagerTest$$time().setCurrentTimeMs(kafka$server$link$ClusterLinkManagerTest$$time().milliseconds() + j2);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            long clusterLinkFailedAttempts = getClusterLinkFailedAttempts(clusterLinkManager(), uuid);
            if (clusterLinkFailedAttempts == j) {
                break;
            }
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail(new StringBuilder(81).append("processing cluster link config retry count ").append(clusterLinkFailedAttempts).append(" doesn't match expected value of ").append(j).append(" for ").append(str2).toString());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            long stateMs = ((ClusterLinkInfo) clusterLinkManager().clusterLinkInfo(str3).get()).linkStateInfo().stateMs();
            if (stateMs == kafka$server$link$ClusterLinkManagerTest$$time().milliseconds()) {
                Assertions.assertEquals(FailedClusterLink$.MODULE$, clusterLinkManager().linkState(str3), String.valueOf(str2));
                Assertions.assertEquals(ClusterLinkError.UNRESOLVABLE_BOOTSTRAP_ERROR, ((UnavailableLinkReason) ((ClusterLinkInfo) clusterLinkManager().clusterLinkInfo(str3).get()).linkStateInfo().unavailableLinkReason().get()).clusterLinkError(), String.valueOf(str2));
                Option option = clusterLinkManager().linkPropertiesCache().get(uuid);
                Assertions.assertTrue(option.isDefined(), String.valueOf(str2));
                Assertions.assertEquals(str, ((Properties) option.get()).getProperty("bootstrap.servers"), String.valueOf(str2));
                Assertions.assertTrue(containsMetric("broker-failed-link-count"));
                Assertions.assertTrue(containsMetric("link-failure-retry-attempts-rate"));
                Assertions.assertTrue(containsMetric("link-failure-retry-attempts-total"));
                return;
            }
            if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                Assertions.fail(new StringBuilder(31).append("expected stateMs is ").append(kafka$server$link$ClusterLinkManagerTest$$time().milliseconds()).append(", got ").append(stateMs).append(" for ").append(str2).toString());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
    }

    public static final /* synthetic */ boolean $anonfun$testRetryReconfigureRetriableFailureOnLinkRunning$5(ClusterLinkManagerTest clusterLinkManagerTest, Uuid uuid) {
        return clusterLinkManagerTest.clusterLinkManager().linkRetryCandidates().get(uuid).isEmpty();
    }

    public static final /* synthetic */ String $anonfun$testRetryReconfigureRetriableFailureOnLinkRunning$6() {
        return "Cluster link retry candidate not removed";
    }

    public static final /* synthetic */ boolean $anonfun$containsMetric$1(String str, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        String name = ((MetricName) tuple2._1()).name();
        return name == null ? str == null : name.equals(str);
    }

    public static final /* synthetic */ boolean $anonfun$testRetryReconfigureRetriableFailureOnLinkStartup$1(ClusterLinkManagerTest clusterLinkManagerTest, Uuid uuid) {
        return clusterLinkManagerTest.clusterLinkManager().linkRetryCandidates().get(uuid).isEmpty();
    }

    public static final /* synthetic */ String $anonfun$testRetryReconfigureRetriableFailureOnLinkStartup$2() {
        return "Cluster link retry candidate not removed";
    }

    public static final /* synthetic */ boolean $anonfun$testDeleteClusterLink$1(ClusterLinkManagerTest clusterLinkManagerTest) {
        return clusterLinkManagerTest.clusterLinkManager().clearMirrorTopics().isEmpty();
    }

    public static final /* synthetic */ String $anonfun$testDeleteClusterLink$2() {
        return "Cluster link not removed";
    }

    public static final /* synthetic */ void $anonfun$verifyNonBlockingLocalAdmin$4(KafkaFuture kafkaFuture) {
    }

    public static final /* synthetic */ void $anonfun$testTemporaryAdminIsClosed$1(KafkaFuture kafkaFuture) {
    }

    public static final /* synthetic */ void $anonfun$testNonTemporaryAdminIsNotClosed$1(KafkaFuture kafkaFuture) {
    }

    private final void verifyFetchSize$1(FetchResponseSize fetchResponseSize, Properties properties) {
        kafka$server$link$ClusterLinkManagerTest$$brokerConfig().updateCurrentConfig(KafkaConfig$.MODULE$.fromProps(properties));
        clusterLinkManager().updateDynamicFetchSize();
        Assertions.assertEquals(fetchResponseSize, clusterLinkManager().fetchResponseSize(clusterLinkConfig()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void addFetcherThread$1(IntRef intRef, ClusterLinkFetcherManager clusterLinkFetcherManager, ClusterLinkFetcherThread clusterLinkFetcherThread) {
        intRef.elem++;
        clusterLinkFetcherManager.fetcherThreadMap().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new FetcherTag(0, intRef.elem, FetcherPool$Default$.MODULE$)), clusterLinkFetcherThread));
    }

    private final void setupZkClient$1(UUID uuid, String str) {
        Mockito.reset(new KafkaZkClient[]{zkClient()});
        ((KafkaZkClient) Mockito.doReturn(new $colon.colon(uuid.toString(), Nil$.MODULE$), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getChildren("/cluster_links");
        ((KafkaZkClient) Mockito.doReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(uuid), new ClusterLinkData(str, CoreUtils$.MODULE$.toKafkaUUID(uuid), None$.MODULE$, None$.MODULE$, false))})), ScalaRunTime$.MODULE$.toObjectArray(Nil$.MODULE$.toArray(ClassTag$.MODULE$.Any()))).when(zkClient())).getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{uuid})));
    }

    private final ClusterLinkManager recreateClusterLinkManager$1(long j) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false);
        createBrokerConfig.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        createBrokerConfig.put("confluent.password.encoder.old.secret.ttl.ms", Long.toString(j));
        return createClusterLinkManager(KafkaConfig$.MODULE$.fromProps(createBrokerConfig), zkSupport());
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinkConfigReencryption$1(ClusterLinkManager clusterLinkManager) {
        return clusterLinkManager.scheduledTasks().size() == 1 && BoxesRunTime.unboxToInt(clusterLinkManager.scheduledTasks().apply("Re-encryptCredentials")) == 1;
    }

    public static final /* synthetic */ String $anonfun$testClusterLinkConfigReencryption$2() {
        return "Retry not scheduled after failure";
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinkConfigReencryption$3(ClusterLinkManager clusterLinkManager) {
        return clusterLinkManager.scheduledTasks().size() == 0;
    }

    public static final /* synthetic */ String $anonfun$testClusterLinkConfigReencryption$4() {
        return "Unnecessary retry scheduled";
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinkConfigReencryption$5(ClusterLinkManager clusterLinkManager) {
        return clusterLinkManager.scheduledTasks().size() == 1 && BoxesRunTime.unboxToInt(clusterLinkManager.scheduledTasks().apply("DeleteCredentialsEncryptedUsingOldSecret")) == 1;
    }

    public static final /* synthetic */ String $anonfun$testClusterLinkConfigReencryption$6() {
        return "Old encoder delete not scheduled";
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinkConfigReencryption$7(ClusterLinkManager clusterLinkManager) {
        return clusterLinkManager.scheduledTasks().size() == 1 && BoxesRunTime.unboxToInt(clusterLinkManager.scheduledTasks().apply("DeleteCredentialsEncryptedUsingOldSecret")) == 1;
    }

    public static final /* synthetic */ String $anonfun$testClusterLinkConfigReencryption$8() {
        return "Unnecessary delete retry scheduled";
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinkConfigReencryption$9(ClusterLinkManager clusterLinkManager) {
        return clusterLinkManager.scheduledTasks().size() == 1 && BoxesRunTime.unboxToInt(clusterLinkManager.scheduledTasks().apply("DeleteCredentialsEncryptedUsingOldSecret")) == 2;
    }

    public static final /* synthetic */ String $anonfun$testClusterLinkConfigReencryption$10() {
        return "Delete retry not scheduled";
    }

    private final IndexedSeq actualThreadUsage$1() {
        Range until$extension = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 3);
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        return until$extension.map(i -> {
            return clusterLinkManager.backgroundThreadUsage(i);
        });
    }

    private final IndexedSeq actualThreadTenants$1() {
        Range until$extension = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 3);
        ClusterLinkManager clusterLinkManager = clusterLinkManager();
        return until$extension.map(i -> {
            return clusterLinkManager.backgroundThreadTenants(i);
        });
    }

    public static final /* synthetic */ void $anonfun$verifyLinkScheduler$4(ClusterLinkManagerTest clusterLinkManagerTest, ClusterLinkManager.Managers managers, Buffer buffer, Buffer buffer2, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        ClusterLinkData clusterLinkData = (ClusterLinkData) tuple2._1();
        int _2$mcI$sp = tuple2._2$mcI$sp();
        clusterLinkManagerTest.clusterLinkManager().commitAddClusterLink(clusterLinkData, managers, ClusterLinkConfig.LinkMode.DESTINATION);
        buffer.update(_2$mcI$sp, BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(buffer.apply(_2$mcI$sp)) + 1));
        Assertions.assertEquals(buffer, clusterLinkManagerTest.actualThreadUsage$1());
        clusterLinkData.tenantPrefix().foreach(str -> {
            return ((Growable) buffer2.apply(_2$mcI$sp)).$plus$eq(str);
        });
        Assertions.assertEquals(((IterableOnceOps) buffer2.map(set -> {
            return BoxesRunTime.boxToInteger(set.size());
        })).toSeq(), clusterLinkManagerTest.actualThreadTenants$1());
    }

    public ClusterLinkManagerTest() {
        MetadataCache$ metadataCache$ = MetadataCache$.MODULE$;
        MetadataVersion interBrokerProtocolVersion = kafka$server$link$ClusterLinkManagerTest$$brokerConfig().interBrokerProtocolVersion();
        MetadataCache$ metadataCache$2 = MetadataCache$.MODULE$;
        BrokerFeatures createEmpty = BrokerFeatures$.MODULE$.createEmpty();
        MetadataCache$ metadataCache$3 = MetadataCache$.MODULE$;
        Seq empty = Seq$.MODULE$.empty();
        MetadataCache$ metadataCache$4 = MetadataCache$.MODULE$;
        MetadataCache$ metadataCache$5 = MetadataCache$.MODULE$;
        this.metadataCache = new ZkMetadataCache(0, interBrokerProtocolVersion, createEmpty, empty, false, false);
        this.topicConfigSyncIncludeDefault = CollectionConverters$.MODULE$.ListHasAsScala(ClusterLinkConfigDefaults$.MODULE$.TopicConfigSyncIncludeDefault()).asScala().mkString(",");
    }

    public static final /* synthetic */ Object $anonfun$testTemporaryAdminIsClosed$1$adapted(KafkaFuture kafkaFuture) {
        $anonfun$testTemporaryAdminIsClosed$1(kafkaFuture);
        return BoxedUnit.UNIT;
    }

    public static final /* synthetic */ Object $anonfun$testNonTemporaryAdminIsNotClosed$1$adapted(KafkaFuture kafkaFuture) {
        $anonfun$testNonTemporaryAdminIsNotClosed$1(kafkaFuture);
        return BoxedUnit.UNIT;
    }
}
