/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.File;
import java.io.Serializable;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkMetadata;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetadataThread;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkSourceConnectionManager;
import kafka.server.link.ClusterLinkSourceConnectionManagerTest$;
import kafka.server.link.ConnectionMode;
import kafka.server.link.LinkMode;
import kafka.server.link.RemoteNetworkClient;
import kafka.utils.MockTime;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.SourceReverseConnectionManager;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import scala.;
import scala.$less$colon$less$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.Set;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0005\reb\u0001B(Q\u0001]CQA\u0018\u0001\u0005\u0002}CqA\u0019\u0001C\u0002\u0013%1\r\u0003\u0004i\u0001\u0001\u0006I\u0001\u001a\u0005\bS\u0002\u0011\r\u0011\"\u0003k\u0011\u00199\b\u0001)A\u0005W\"9\u0001\u0010\u0001b\u0001\n\u0013I\bbBA\u0003\u0001\u0001\u0006IA\u001f\u0005\n\u0003\u000f\u0001!\u0019!C\u0005\u0003\u0013A\u0001\"a\u0006\u0001A\u0003%\u00111\u0002\u0005\t\u00033\u0001!\u0019!C\u0005s\"9\u00111\u0004\u0001!\u0002\u0013Q\b\u0002CA\u000f\u0001\t\u0007I\u0011B=\t\u000f\u0005}\u0001\u0001)A\u0005u\"I\u0011\u0011\u0005\u0001C\u0002\u0013%\u00111\u0005\u0005\t\u0003c\u0001\u0001\u0015!\u0003\u0002&!I\u00111\u0007\u0001C\u0002\u0013%\u0011Q\u0007\u0005\t\u0003{\u0001\u0001\u0015!\u0003\u00028!I\u0011q\b\u0001C\u0002\u0013%\u0011\u0011\t\u0005\t\u0003\u0013\u0002\u0001\u0015!\u0003\u0002D!I\u00111\n\u0001C\u0002\u0013%\u0011Q\n\u0005\t\u00037\u0002\u0001\u0015!\u0003\u0002P!I\u0011Q\f\u0001C\u0002\u0013%\u0011q\f\u0005\t\u0003O\u0002\u0001\u0015!\u0003\u0002b!I\u0011\u0011\u000e\u0001C\u0002\u0013%\u00111\u000e\u0005\t\u0003s\u0002\u0001\u0015!\u0003\u0002n!I\u00111\u0010\u0001C\u0002\u0013%\u0011Q\u0010\u0005\t\u0003\u001b\u0003\u0001\u0015!\u0003\u0002\u0000!I\u0011q\u0012\u0001C\u0002\u0013%\u0011\u0011\u0013\u0005\t\u0003;\u0003\u0001\u0015!\u0003\u0002\u0014\"I\u0011q\u0014\u0001C\u0002\u0013%\u0011\u0011\u0015\u0005\t\u0003g\u0003\u0001\u0015!\u0003\u0002$\"I\u0011Q\u0017\u0001C\u0002\u0013%\u0011q\u0017\u0005\t\u00033\u0004\u0001\u0015!\u0003\u0002:\"Y\u00111\u001c\u0001A\u0002\u0003\u0007I\u0011BAo\u0011-\t)\u000f\u0001a\u0001\u0002\u0004%I!a:\t\u0017\u0005M\b\u00011A\u0001B\u0003&\u0011q\u001c\u0005\f\u0003k\u0004\u0001\u0019!a\u0001\n\u0013\t9\u0010C\u0006\u0002\u0000\u0002\u0001\r\u00111A\u0005\n\t\u0005\u0001b\u0003B\u0003\u0001\u0001\u0007\t\u0011)Q\u0005\u0003sD1Ba\u0002\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0003\n!Y!\u0011\u0003\u0001A\u0002\u0003\u0007I\u0011\u0002B\n\u0011-\u00119\u0002\u0001a\u0001\u0002\u0003\u0006KAa\u0003\t\u0013\te\u0001A1A\u0005\n\tm\u0001\u0002\u0003B\u0015\u0001\u0001\u0006IA!\b\t\u0017\t-\u0002\u00011AA\u0002\u0013%!Q\u0006\u0005\f\u0005w\u0001\u0001\u0019!a\u0001\n\u0013\u0011i\u0004C\u0006\u0003B\u0001\u0001\r\u0011!Q!\n\t=\u0002b\u0003B\"\u0001\u0001\u0007\t\u0019!C\u0005\u0005\u000bB1B!\u0014\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0003P!Y!1\u000b\u0001A\u0002\u0003\u0005\u000b\u0015\u0002B$\u0011%\u0011)\u0006\u0001a\u0001\n\u0013\u00119\u0006C\u0005\u0003f\u0001\u0001\r\u0011\"\u0003\u0003h!A!1\u000e\u0001!B\u0013\u0011I\u0006C\u0005\u0003n\u0001\u0001\r\u0011\"\u0003\u0003p!I!q\u000f\u0001A\u0002\u0013%!\u0011\u0010\u0005\t\u0005{\u0002\u0001\u0015)\u0003\u0003r!9!q\u0010\u0001\u0005\u0002\t\u0005\u0005b\u0002BM\u0001\u0011\u0005!\u0011\u0011\u0005\b\u0005G\u0003A\u0011\u0001BA\u0011\u001d\u00119\u000b\u0001C\u0001\u0005\u0003CqAa+\u0001\t\u0003\u0011\t\tC\u0004\u00030\u0002!\tA!!\t\u000f\tM\u0006\u0001\"\u0001\u0003\u0002\"9!q\u0017\u0001\u0005\u0002\t\u0005\u0005b\u0002B^\u0001\u0011\u0005!\u0011\u0011\u0005\b\u0005\u007f\u0003A\u0011\u0002Ba\u0011\u001d\u0011\u0019\r\u0001C\u0005\u0005\u000bD\u0011Ba5\u0001#\u0003%IA!6\t\u000f\t-\b\u0001\"\u0003\u0003n\"9!Q\u001f\u0001\u0005\n\t]\b\"CB\u0004\u0001E\u0005I\u0011BB\u0005\u0011\u001d\u0019i\u0001\u0001C\u0005\u0007\u001fAqa!\u0007\u0001\t\u0013\u0019YbB\u0004\u0004\u001eAC\taa\b\u0007\r=\u0003\u0006\u0012AB\u0011\u0011\u0019q6\n\"\u0001\u0004$!91QE&\u0005\u0002\t\u0005\u0005bBB\u0018\u0017\u0012\u0005!\u0011\u0011\u0002'\u00072,8\u000f^3s\u0019&t7nU8ve\u000e,7i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:UKN$(BA)S\u0003\u0011a\u0017N\\6\u000b\u0005M#\u0016AB:feZ,'OC\u0001V\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001-\u0011\u0005ecV\"\u0001.\u000b\u0003m\u000bQa]2bY\u0006L!!\u0018.\u0003\r\u0005s\u0017PU3g\u0003\u0019a\u0014N\\5u}Q\t\u0001\r\u0005\u0002b\u00015\t\u0001+\u0001\u0007ce>\\WM]\"p]\u001aLw-F\u0001e!\t)g-D\u0001S\u0013\t9'KA\u0006LC\u001a\\\u0017mQ8oM&<\u0017!\u00042s_.,'oQ8oM&<\u0007%\u0001\u0006tKJ4XM]%oM>,\u0012a\u001b\t\u0003YVl\u0011!\u001c\u0006\u0003]>\f!\"Y;uQ>\u0014\u0018N_3s\u0015\t\u0019\u0006O\u0003\u0002Vc*\u0011!o]\u0001\u0007CB\f7\r[3\u000b\u0003Q\f1a\u001c:h\u0013\t1XN\u0001\u000bBkRDwN]5{KJ\u001cVM\u001d<fe&sgm\\\u0001\fg\u0016\u0014h/\u001a:J]\u001a|\u0007%\u0001\u0005mS:\\g*Y7f+\u0005Q\bcA>\u0002\u00025\tAP\u0003\u0002~}\u0006!A.\u00198h\u0015\u0005y\u0018\u0001\u00026bm\u0006L1!a\u0001}\u0005\u0019\u0019FO]5oO\u0006IA.\u001b8l\u001d\u0006lW\rI\u0001\u0007Y&t7.\u00133\u0016\u0005\u0005-\u0001\u0003BA\u0007\u0003'i!!a\u0004\u000b\u0007\u0005Ea0\u0001\u0003vi&d\u0017\u0002BA\u000b\u0003\u001f\u0011A!V+J\t\u00069A.\u001b8l\u0013\u0012\u0004\u0013aD:pkJ\u001cWm\u00117vgR,'/\u00133\u0002!M|WO]2f\u00072,8\u000f^3s\u0013\u0012\u0004\u0013!\u00043fgR\u001cE.^:uKJLE-\u0001\beKN$8\t\\;ti\u0016\u0014\u0018\n\u001a\u0011\u0002\u00111Lgn\u001b#bi\u0006,\"!!\n\u0011\t\u0005\u001d\u0012QF\u0007\u0003\u0003SQ1!a\u000bU\u0003\tQ8.\u0003\u0003\u00020\u0005%\"aD\"mkN$XM\u001d'j].$\u0015\r^1\u0002\u00131Lgn\u001b#bi\u0006\u0004\u0013!\u00037j].\u0004&o\u001c9t+\t\t9\u0004\u0005\u0003\u0002\u000e\u0005e\u0012\u0002BA\u001e\u0003\u001f\u0011!\u0002\u0015:pa\u0016\u0014H/[3t\u0003)a\u0017N\\6Qe>\u00048\u000fI\u0001\u0010[\u0016$\u0018\rZ1uC6\u000bg.Y4feV\u0011\u00111\t\t\u0004C\u0006\u0015\u0013bAA$!\nQ2\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$\u0018\rZ1uC6\u000bg.Y4fe\u0006\u0001R.\u001a;bI\u0006$\u0018-T1oC\u001e,'\u000fI\u0001\rg>\u001c7.\u001a;TKJ4XM]\u000b\u0003\u0003\u001f\u0002B!!\u0015\u0002X5\u0011\u00111\u000b\u0006\u0004\u0003+\"\u0016a\u00028fi^|'o[\u0005\u0005\u00033\n\u0019F\u0001\u0007T_\u000e\\W\r^*feZ,'/A\u0007t_\u000e\\W\r^*feZ,'\u000fI\u0001\fY&t7.T1oC\u001e,'/\u0006\u0002\u0002bA\u0019\u0011-a\u0019\n\u0007\u0005\u0015\u0004K\u0001\nDYV\u001cH/\u001a:MS:\\W*\u00198bO\u0016\u0014\u0018\u0001\u00047j].l\u0015M\\1hKJ\u0004\u0013\u0001\u0002;j[\u0016,\"!!\u001c\u0011\t\u0005=\u0014QO\u0007\u0003\u0003cR1!a\u001dU\u0003\u0015)H/\u001b7t\u0013\u0011\t9(!\u001d\u0003\u00115{7m\u001b+j[\u0016\fQ\u0001^5nK\u0002\nq!\\3ue&\u001c7/\u0006\u0002\u0002\u0000A!\u0011\u0011QAE\u001b\t\t\u0019I\u0003\u0003\u0002|\u0005\u0015%bAADa\u000611m\\7n_:LA!a#\u0002\u0004\n9Q*\u001a;sS\u000e\u001c\u0018\u0001C7fiJL7m\u001d\u0011\u0002\u000f\rD\u0017M\u001c8fYV\u0011\u00111\u0013\t\u0005\u0003+\u000bI*\u0004\u0002\u0002\u0018*!\u0011QKAC\u0013\u0011\tY*a&\u0003\u0019-\u000bgm[1DQ\u0006tg.\u001a7\u0002\u0011\rD\u0017M\u001c8fY\u0002\n\u0001#\\3uC\u0012\fG/\u0019*fcV,7\u000f^:\u0016\u0005\u0005\r\u0006\u0003BAS\u0003_k!!a*\u000b\t\u0005%\u00161V\u0001\u0007CR|W.[2\u000b\t\u00055\u0016qB\u0001\u000bG>t7-\u001e:sK:$\u0018\u0002BAY\u0003O\u0013Q\"\u0011;p[&\u001c\u0017J\u001c;fO\u0016\u0014\u0018!E7fi\u0006$\u0017\r^1SKF,Xm\u001d;tA\u0005q1\r\\8tK\u0012\u001c\u0005.\u00198oK2\u001cXCAA]!\u0019\tY,!2\u0002J6\u0011\u0011Q\u0018\u0006\u0005\u0003\u007f\u000b\t-A\u0004nkR\f'\r\\3\u000b\u0007\u0005\r',\u0001\u0006d_2dWm\u0019;j_:LA!a2\u0002>\n\u00191+\u001a;\u0011\t\u0005-\u0017Q[\u0007\u0003\u0003\u001bTA!a4\u0002R\u0006A1\r[1o]\u0016d7OC\u0002\u0002Tz\f1A\\5p\u0013\u0011\t9.!4\u0003\u001bM{7m[3u\u0007\"\fgN\\3m\u0003=\u0019Gn\\:fI\u000eC\u0017M\u001c8fYN\u0004\u0013A\u00037j].\u001cuN\u001c4jOV\u0011\u0011q\u001c\t\u0004C\u0006\u0005\u0018bAAr!\n\t2\t\\;ti\u0016\u0014H*\u001b8l\u0007>tg-[4\u0002\u001d1Lgn[\"p]\u001aLwm\u0018\u0013fcR!\u0011\u0011^Ax!\rI\u00161^\u0005\u0004\u0003[T&\u0001B+oSRD\u0011\"!=$\u0003\u0003\u0005\r!a8\u0002\u0007a$\u0013'A\u0006mS:\\7i\u001c8gS\u001e\u0004\u0013a\u00037j].lU\r\u001e:jGN,\"!!?\u0011\u0007\u0005\fY0C\u0002\u0002~B\u0013!c\u00117vgR,'\u000fT5oW6+GO]5dg\u0006yA.\u001b8l\u001b\u0016$(/[2t?\u0012*\u0017\u000f\u0006\u0003\u0002j\n\r\u0001\"CAyM\u0005\u0005\t\u0019AA}\u00031a\u0017N\\6NKR\u0014\u0018nY:!\u0003-\u0019wN\u001c8NC:\fw-\u001a:\u0016\u0005\t-\u0001cA1\u0003\u000e%\u0019!q\u0002)\u0003E\rcWo\u001d;fe2Kgn[*pkJ\u001cWmQ8o]\u0016\u001cG/[8o\u001b\u0006t\u0017mZ3s\u0003=\u0019wN\u001c8NC:\fw-\u001a:`I\u0015\fH\u0003BAu\u0005+A\u0011\"!=*\u0003\u0003\u0005\rAa\u0003\u0002\u0019\r|gN\\'b]\u0006<WM\u001d\u0011\u0002\u001f1|7-\u00197N_\u000e\\7\t\\5f]R,\"A!\b\u0011\t\t}!QE\u0007\u0003\u0005CQ1Aa\tq\u0003\u001d\u0019G.[3oiNLAAa\n\u0003\"\tQQj\\2l\u00072LWM\u001c;\u0002!1|7-\u00197N_\u000e\\7\t\\5f]R\u0004\u0013A\u00037pG\u0006d\u0017\tZ7j]V\u0011!q\u0006\t\u0005\u0005c\u00119$\u0004\u0002\u00034)!!Q\u0007B\u0011\u0003\u0015\tG-\\5o\u0013\u0011\u0011IDa\r\u0003\u001d\r{gN\u001a7vK:$\u0018\tZ7j]\u0006qAn\\2bY\u0006#W.\u001b8`I\u0015\fH\u0003BAu\u0005\u007fA\u0011\"!=/\u0003\u0003\u0005\rAa\f\u0002\u00171|7-\u00197BI6Lg\u000eI\u0001\fe\u0016lw\u000e^3BI6Lg.\u0006\u0002\u0003HA\u0019\u0011M!\u0013\n\u0007\t-\u0003KA\nSK6|G/\u001a(fi^|'o[\"mS\u0016tG/A\bsK6|G/Z!e[&tw\fJ3r)\u0011\tIO!\u0015\t\u0013\u0005E\u0018'!AA\u0002\t\u001d\u0013\u0001\u0004:f[>$X-\u00113nS:\u0004\u0013A\u0006:f[>$XmQ8oiJ|G\u000e\\3s\u001d>$W-\u00133\u0016\u0005\te\u0003#B-\u0003\\\t}\u0013b\u0001B/5\n1q\n\u001d;j_:\u00042!\u0017B1\u0013\r\u0011\u0019G\u0017\u0002\u0004\u0013:$\u0018A\u0007:f[>$XmQ8oiJ|G\u000e\\3s\u001d>$W-\u00133`I\u0015\fH\u0003BAu\u0005SB\u0011\"!=5\u0003\u0003\u0005\rA!\u0017\u0002/I,Wn\u001c;f\u0007>tGO]8mY\u0016\u0014hj\u001c3f\u0013\u0012\u0004\u0013!E5t\u0019>\u001c\u0017\r\\\"p]R\u0014x\u000e\u001c7feV\u0011!\u0011\u000f\t\u00043\nM\u0014b\u0001B;5\n9!i\\8mK\u0006t\u0017!F5t\u0019>\u001c\u0017\r\\\"p]R\u0014x\u000e\u001c7fe~#S-\u001d\u000b\u0005\u0003S\u0014Y\bC\u0005\u0002r^\n\t\u00111\u0001\u0003r\u0005\u0011\u0012n\u001d'pG\u0006d7i\u001c8ue>dG.\u001a:!\u0003!!X-\u0019:E_^tGCAAuQ\rI$Q\u0011\t\u0005\u0005\u000f\u0013)*\u0004\u0002\u0003\n*!!1\u0012BG\u0003\r\t\u0007/\u001b\u0006\u0005\u0005\u001f\u0013\t*A\u0004kkBLG/\u001a:\u000b\u0007\tM5/A\u0003kk:LG/\u0003\u0003\u0003\u0018\n%%!C!gi\u0016\u0014X)Y2i\u0003e!Xm\u001d;D_:tWm\u0019;j_:lu\u000eZ3J]\n|WO\u001c3)\u0007i\u0012i\n\u0005\u0003\u0003\b\n}\u0015\u0002\u0002BQ\u0005\u0013\u0013A\u0001V3ti\u0006QB/Z:u\u0007>tg.Z2uS>tWj\u001c3f\u001fV$(m\\;oI\"\u001a1H!(\u00021Q,7\u000f\u001e)feNL7\u000f^3oi\u000e{gN\\3di&|g\u000eK\u0002=\u0005;\u000b\u0001\u0007^3tiB+'o]5ti\u0016tGoQ8o]\u0016\u001cG/[8o%\u0016lw\u000e^3D_:$(o\u001c7mKJtu\u000e^&o_^t\u0007fA\u001f\u0003\u001e\u00061B/Z:u\u001d>$Hj\\2bY\u000e{g\u000e\u001e:pY2,'\u000fK\u0002?\u0005;\u000bA\u0002^3ti6+G/\u00193bi\u0006D3a\u0010BO\u0003=!Xm\u001d;SK\u000e|gNZ5hkJ,\u0007f\u0001!\u0003\u001e\u0006\u0011C/Z:u%\u0016\u001cwN\u001c4jOV\u0014XmV5uQ\u000e{gN\\3di&|gNU3tKRD3!\u0011BO\u0003I\u0019'/Z1uK\n\u0013xn[3s\u0007>tg-[4\u0015\u0003\u0011\fac]3ukB\u001cuN\u001c8fGRLwN\\'b]\u0006<WM\u001d\u000b\u0007\u0003S\u00149M!5\t\u000f\t%7\t1\u0001\u0003L\u0006q1m\u001c8oK\u000e$\u0018n\u001c8N_\u0012,\u0007cA1\u0003N&\u0019!q\u001a)\u0003\u001d\r{gN\\3di&|g.T8eK\"I!QN\"\u0011\u0002\u0003\u0007!\u0011O\u0001!g\u0016$X\u000f]\"p]:,7\r^5p]6\u000bg.Y4fe\u0012\"WMZ1vYR$#'\u0006\u0002\u0003X*\"!\u0011\u000fBmW\t\u0011Y\u000e\u0005\u0003\u0003^\n\u001dXB\u0001Bp\u0015\u0011\u0011\tOa9\u0002\u0013Ut7\r[3dW\u0016$'b\u0001Bs5\u0006Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\t%(q\u001c\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017\u0001D:fiV\u00038\t[1o]\u0016dGCBAu\u0005_\u0014\t\u0010C\u0004\u0002\u0010\u0016\u0003\r!a%\t\u000f\tMX\t1\u0001\u0003`\u0005I1\r[1o]\u0016d\u0017\nZ\u0001\fe\u00164XM]:f\u001d>$W\r\u0006\u0004\u0003z\n}81\u0001\t\u0005\u0003+\u0013Y0\u0003\u0003\u0003~\u0006]%a\u0003*fm\u0016\u00148/\u001a(pI\u0016Dqa!\u0001G\u0001\u0004\u0011y&\u0001\u0004o_\u0012,\u0017\n\u001a\u0005\n\u0007\u000b1\u0005\u0013!a\u0001\u0005?\n\u0011B]3rk\u0016\u001cH/\u00133\u0002+I,g/\u001a:tK:{G-\u001a\u0013eK\u001a\fW\u000f\u001c;%eU\u001111\u0002\u0016\u0005\u0005?\u0012I.A\u0007sK6|G/Z\"mkN$XM]\u000b\u0003\u0007#\u0001Baa\u0005\u0004\u00165\u0011\u0011QQ\u0005\u0005\u0007/\t)IA\u0004DYV\u001cH/\u001a:\u0002K%t\u0017\u000e^5bi\u0016\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\u001c*fcV,7\u000f^\"pk:$XC\u0001B0\u0003\u0019\u001aE.^:uKJd\u0015N\\6T_V\u00148-Z\"p]:,7\r^5p]6\u000bg.Y4feR+7\u000f\u001e\t\u0003C.\u001b\"a\u0013-\u0015\u0005\r}\u0011AC:fiV\u00038\t\\1tg\"\u001aQj!\u000b\u0011\t\t\u001d51F\u0005\u0005\u0007[\u0011IIA\u0005CK\u001a|'/Z!mY\u0006iA/Z1s\t><hn\u00117bgND3ATB\u001a!\u0011\u00119i!\u000e\n\t\r]\"\u0011\u0012\u0002\t\u0003\u001a$XM]!mY\u0002")
public class ClusterLinkSourceConnectionManagerTest {
    private final KafkaConfig kafka$server$link$ClusterLinkSourceConnectionManagerTest$$brokerConfig = this.createBrokerConfig();
    private final AuthorizerServerInfo kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo = (AuthorizerServerInfo)EasyMock.createNiceMock(AuthorizerServerInfo.class);
    private final String linkName;
    private final UUID linkId = UUID.randomUUID();
    private final String kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId;
    private final String destClusterId;
    private final ClusterLinkData kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkData = new ClusterLinkData(this.linkName(), this.linkId(), (Option)new Some((Object)this.destClusterId()), (Option)None$.MODULE$, false);
    private final Properties linkProps = new Properties();
    private final ClusterLinkMetadataManager kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataManager = (ClusterLinkMetadataManager)EasyMock.createNiceMock(ClusterLinkMetadataManager.class);
    private final SocketServer kafka$server$link$ClusterLinkSourceConnectionManagerTest$$socketServer = (SocketServer)EasyMock.createNiceMock(SocketServer.class);
    private final ClusterLinkManager linkManager = (ClusterLinkManager)EasyMock.createNiceMock(ClusterLinkManager.class);
    private final MockTime kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time = new MockTime();
    private final Metrics metrics = new Metrics();
    private final KafkaChannel channel = (KafkaChannel)EasyMock.createNiceMock(KafkaChannel.class);
    private final AtomicInteger kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests = new AtomicInteger();
    private final scala.collection.mutable.Set<SocketChannel> kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels = (scala.collection.mutable.Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
    private ClusterLinkConfig kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig;
    private ClusterLinkMetrics kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics;
    private ClusterLinkSourceConnectionManager connManager;
    private final MockClient kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient = new MockClient((Time)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time());
    private ConfluentAdmin kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin;
    private RemoteNetworkClient kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin;
    private Option<Object> remoteControllerNodeId = new Some((Object)BoxesRunTime.boxToInteger((int)20));
    private boolean isLocalController = false;

    @AfterAll
    public static void tearDownClass() {
        TestUtils$.MODULE$.verifyNoUnexpectedThreads("@AfterAll");
    }

    @BeforeAll
    public static void setUpClass() {
        TestUtils$.MODULE$.verifyNoUnexpectedThreads("@BeforeAll");
    }

    public KafkaConfig kafka$server$link$ClusterLinkSourceConnectionManagerTest$$brokerConfig() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$brokerConfig;
    }

    public AuthorizerServerInfo kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo;
    }

    private String linkName() {
        return this.linkName;
    }

    private UUID linkId() {
        return this.linkId;
    }

    public String kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId;
    }

    private String destClusterId() {
        return this.destClusterId;
    }

    public ClusterLinkData kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkData() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkData;
    }

    private Properties linkProps() {
        return this.linkProps;
    }

    public ClusterLinkMetadataManager kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataManager() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataManager;
    }

    public SocketServer kafka$server$link$ClusterLinkSourceConnectionManagerTest$$socketServer() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$socketServer;
    }

    private ClusterLinkManager linkManager() {
        return this.linkManager;
    }

    public MockTime kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time;
    }

    private Metrics metrics() {
        return this.metrics;
    }

    private KafkaChannel channel() {
        return this.channel;
    }

    public AtomicInteger kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests;
    }

    public scala.collection.mutable.Set<SocketChannel> kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels;
    }

    public ClusterLinkConfig kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig;
    }

    private void linkConfig_$eq(ClusterLinkConfig x$1) {
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig = x$1;
    }

    public ClusterLinkMetrics kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics;
    }

    private void linkMetrics_$eq(ClusterLinkMetrics x$1) {
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics = x$1;
    }

    private ClusterLinkSourceConnectionManager connManager() {
        return this.connManager;
    }

    private void connManager_$eq(ClusterLinkSourceConnectionManager x$1) {
        this.connManager = x$1;
    }

    public MockClient kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient;
    }

    public ConfluentAdmin kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin;
    }

    public void kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin_$eq(ConfluentAdmin x$1) {
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin = x$1;
    }

    public RemoteNetworkClient kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin;
    }

    public void kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin_$eq(RemoteNetworkClient x$1) {
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin = x$1;
    }

    private Option<Object> remoteControllerNodeId() {
        return this.remoteControllerNodeId;
    }

    private void remoteControllerNodeId_$eq(Option<Object> x$1) {
        this.remoteControllerNodeId = x$1;
    }

    private boolean isLocalController() {
        return this.isLocalController;
    }

    private void isLocalController_$eq(boolean x$1) {
        this.isLocalController = x$1;
    }

    @AfterEach
    public void tearDown() {
        if (this.connManager() != null) {
            this.connManager().shutdown();
        }
        this.metrics().close();
    }

    @Test
    public void testConnectionModeInbound() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Inbound$.MODULE$, false);
        Assertions.assertThrows(IllegalStateException.class, () -> this.connManager().startup());
        Assertions.assertThrows(InvalidRequestException.class, () -> this.connManager().onReverseConnection(this.channel(), this.reverseNode(1, -1)));
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)0, (int)this.connManager().reverseConnectionCount());
        Assertions.assertNull((Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertNull((Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin());
    }

    @Test
    public void testConnectionModeOutbound() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, false);
        this.connManager().startup();
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertThrows(NotControllerException.class, () -> this.connManager().onReverseConnection(this.channel(), this.reverseNode(1, -1)));
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)0, (int)this.connManager().reverseConnectionCount());
    }

    @Test
    public void testPersistentConnection() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        long l = 15000L;
        long l2 = 100L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkSourceConnectionManagerTest.$anonfun$testPersistentConnection$1(this)) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime + l) {
                Assertions.fail((String)"Persistent connection not initiated");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(l), l2));
        }
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(BoxesRunTime.unboxToInt((Object)this.remoteControllerNodeId().get()), -1));
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)1, (int)this.connManager().reverseConnectionCount());
    }

    @Test
    public void testPersistentConnectionRemoteControllerNotKnown() {
        this.remoteControllerNodeId_$eq((Option<Object>)None$.MODULE$);
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        Assertions.assertEquals((int)0, (int)this.initiateReverseConnectionRequestCount());
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
    }

    @Test
    public void testNotLocalController() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, false);
        this.connManager().startup();
        Assertions.assertEquals((int)0, (int)this.initiateReverseConnectionRequestCount());
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertThrows(NotControllerException.class, () -> this.connManager().onReverseConnection(this.channel(), this.reverseNode(1, -1)));
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(1, 5));
        KafkaChannel channel2 = (KafkaChannel)EasyMock.createNiceMock(KafkaChannel.class);
        this.setUpChannel(channel2, 2);
        this.connManager().onReverseConnection(channel2, this.reverseNode(2, 10));
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)2, (int)this.connManager().reverseConnectionCount());
    }

    @Test
    public void testMetadata() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.remoteControllerNodeId_$eq((Option<Object>)None$.MODULE$);
        this.connManager().startup();
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());
        this.isLocalController_$eq(true);
        this.connManager().onControllerChange(true);
        Assertions.assertEquals((int)2, (int)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());
        this.isLocalController_$eq(false);
        this.connManager().onControllerChange(false);
        Assertions.assertEquals((int)2, (int)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());
        this.isLocalController_$eq(true);
        this.remoteControllerNodeId_$eq((Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)25)));
        Node coordinator = new Node(25, "", 0);
        this.connManager().onNewRemoteLinkCoordinator(coordinator);
        Assertions.assertEquals((int)2, (int)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());
        this.remoteControllerNodeId_$eq((Option<Object>)None$.MODULE$);
        this.connManager().onNewRemoteLinkCoordinator(coordinator);
        Assertions.assertEquals((int)3, (int)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());
    }

    @Test
    public void testReconfigure() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        SocketChannel socketChannel = this.channel().socketChannel();
        EasyMock.replay((Object[])new Object[]{socketChannel});
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(BoxesRunTime.unboxToInt((Object)this.remoteControllerNodeId().get()), -1));
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)1, (int)this.connManager().reverseConnectionCount());
        RemoteNetworkClient oldAdmin = this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin();
        this.linkProps().setProperty("metadata.max.age.ms", "1000");
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), true));
        this.connManager().reconfigure(this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig(), (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"metadata.max.age.ms"})));
        Assertions.assertEquals((Object)this.connManager().currentConfig().originals(), (Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig().originals());
        Assertions.assertNotNull((Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertNotSame((Object)oldAdmin, (Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)1, (int)this.connManager().reverseConnectionCount());
    }

    @Test
    public void testReconfigureWithConnectionReset() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        KafkaChannel channel2 = (KafkaChannel)EasyMock.createNiceMock(KafkaChannel.class);
        this.setUpChannel(channel2, 2);
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(BoxesRunTime.unboxToInt((Object)this.remoteControllerNodeId().get()), -1));
        this.connManager().onReverseConnection(channel2, this.reverseNode(2, 5));
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)2, (int)this.connManager().reverseConnectionCount());
        RemoteNetworkClient oldAdmin = this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin();
        this.linkProps().setProperty("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name);
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), true));
        this.connManager().reconfigure(this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig(), (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"security.protocol"})));
        Assertions.assertEquals((Object)this.connManager().currentConfig().originals(), (Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig().originals());
        Assertions.assertNotNull((Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertNotSame((Object)oldAdmin, (Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)0, (int)this.connManager().reverseConnectionCount());
        Assertions.assertEquals((Object)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new SocketChannel[]{this.channel().socketChannel(), channel2.socketChannel()})), this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels());
    }

    private KafkaConfig createBrokerConfig() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1);
        props.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        return KafkaConfig$.MODULE$.fromProps(props);
    }

    private void setupConnectionManager(ConnectionMode connectionMode, boolean isLocalController) {
        this.isLocalController_$eq(isLocalController);
        this.linkProps().put(ClusterLinkConfig$.MODULE$.LinkModeProp(), LinkMode.Source$.MODULE$.name());
        this.linkProps().put(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());
        this.linkProps().put(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), "EXTERNAL");
        this.linkProps().put("bootstrap.servers", "localhost:123");
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), true));
        this.linkMetrics_$eq(new ClusterLinkMetrics(this.linkName(), this.linkId(), (LinkMode)LinkMode.Source$.MODULE$, this.linkManager(), (Option)None$.MODULE$, this.metrics(), (Option)None$.MODULE$));
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics().startup();
        Endpoint endpoint = new Endpoint("EXTERNAL", SecurityProtocol.PLAINTEXT, "host", 123);
        EasyMock.expect((Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo().endpoints()).andReturn(Collections.singletonList(endpoint));
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataManager().isLinkCoordinator(EasyMock.anyString(), EasyMock.anyBoolean()))).andAnswer(() -> BoxesRunTime.boxToBoolean((boolean)this.isLocalController())).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo(), this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataManager()});
        this.setUpChannel(this.channel(), 123);
        this.connManager_$eq(new ClusterLinkSourceConnectionManager(this){
            private final /* synthetic */ ClusterLinkSourceConnectionManagerTest $outer;

            public ConfluentAdmin createLocalAdmin() {
                Map<String, String> configs = Collections.singletonMap("bootstrap.servers", "localhost:9092");
                AdminClientConfig config = new AdminClientConfig(configs);
                AdminMetadataManager adminMetadataManager = new AdminMetadataManager(new LogContext(), 1000L, 300000L);
                List<Node> nodes = Collections.singletonList(new Node(1, "host1", 123));
                Cluster cluster = new Cluster(this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId(), nodes, Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), nodes.get(0));
                adminMetadataManager.update(cluster, this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time().milliseconds());
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin_$eq(ConfluentAdminUtils.createConfluentAdmin((AdminClientConfig)config, (AdminMetadataManager)adminMetadataManager, (KafkaClient)this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient(), (Time)this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time()));
                return this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin();
            }

            public RemoteNetworkClient createRemoteAdmin() {
                NetworkClient networkClient = (NetworkClient)EasyMock.createNiceMock(NetworkClient.class);
                SourceReverseConnectionManager reverseConnectionManager = (SourceReverseConnectionManager)EasyMock.createNiceMock(SourceReverseConnectionManager.class);
                ClusterLinkMetadata metadata = (ClusterLinkMetadata)EasyMock.createNiceMock(ClusterLinkMetadata.class);
                ClusterLinkMetadataThread metadataThread = (ClusterLinkMetadataThread)EasyMock.createNiceMock(ClusterLinkMetadataThread.class);
                EasyMock.expect((Object)networkClient.reverseConnectionManager()).andReturn((Object)reverseConnectionManager).anyTimes();
                EasyMock.expect((Object)metadataThread.clusterLinkMetadata()).andReturn((Object)metadata).anyTimes();
                EasyMock.expect((Object)metadataThread.remoteLinkCoordinator()).andAnswer(() -> Option$.MODULE$.apply((Object)$this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteCluster().controller())).anyTimes();
                EasyMock.expect((Object)metadata.fetch()).andAnswer(() -> $this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteCluster()).anyTimes();
                EasyMock.expect((Object)BoxesRunTime.boxToInteger((int)metadata.requestUpdate())).andAnswer(() -> BoxesRunTime.boxToInteger((int)$anon$1.$anonfun$createRemoteAdmin$3(this))).anyTimes();
                EasyMock.replay((Object[])new Object[]{networkClient, metadataThread, metadata});
                Assertions.assertNull((Object)this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin_$eq(new RemoteNetworkClient(networkClient, metadataThread));
                return this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin();
            }

            public void closeReverseConnectionAdmin() {
                if (this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin() != null) {
                    this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin().close(Duration.ZERO);
                }
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin_$eq(null);
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin_$eq(null);
            }

            public void closeSocketChannel(SocketChannel socketChannel) {
                super.closeSocketChannel(socketChannel);
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels().$plus$eq((Object)socketChannel);
            }

            public static final /* synthetic */ int $anonfun$createRemoteAdmin$3($anon$1 $this) {
                return $this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().incrementAndGet();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                super($outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkData(), $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig(), $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId(), (Option)None$.MODULE$, $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics(), $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataManager(), $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$socketServer(), $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$brokerConfig(), $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo(), (Time)$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time());
            }
        });
    }

    private boolean setupConnectionManager$default$2() {
        return false;
    }

    private void setUpChannel(KafkaChannel channel, int channelId) {
        EasyMock.expect((Object)channel.id()).andReturn((Object)Integer.toString(channelId)).anyTimes();
        SocketChannel socketChannel = (SocketChannel)EasyMock.createNiceMock(SocketChannel.class);
        EasyMock.expect((Object)channel.socketChannel()).andReturn((Object)socketChannel).anyTimes();
        EasyMock.replay((Object[])new Object[]{channel});
    }

    private ReverseNode reverseNode(int nodeId, int requestId) {
        return new ReverseNode(nodeId, nodeId, new StringBuilder(4).append("host").append(nodeId).toString(), 1234, this.linkId(), requestId, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), KafkaPrincipal.ANONYMOUS, Optional.empty(), null);
    }

    private int reverseNode$default$2() {
        return -1;
    }

    public Cluster kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteCluster() {
        ArrayList<Node> nodes = new ArrayList<Node>();
        nodes.add(new Node(1, "host1", 123));
        Option remoteControllerNode = this.remoteControllerNodeId().map((Function1 & Serializable)id -> ClusterLinkSourceConnectionManagerTest.$anonfun$remoteCluster$1(BoxesRunTime.unboxToInt((Object)id)));
        remoteControllerNode.foreach((Function1 & Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)nodes.add(x$1)));
        return new Cluster(this.destClusterId(), nodes, Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), (Node)remoteControllerNode.orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()));
    }

    private int initiateReverseConnectionRequestCount() {
        return CollectionConverters$.MODULE$.CollectionHasAsScala((Collection)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient().requests()).asScala().count((Function1 & Serializable)x$6 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkSourceConnectionManagerTest.$anonfun$initiateReverseConnectionRequestCount$1(x$6)));
    }

    public static final /* synthetic */ boolean $anonfun$testPersistentConnection$1(ClusterLinkSourceConnectionManagerTest $this) {
        return $this.initiateReverseConnectionRequestCount() > 0;
    }

    public static final /* synthetic */ String $anonfun$testPersistentConnection$2() {
        return "Persistent connection not initiated";
    }

    public static final /* synthetic */ Node $anonfun$remoteCluster$1(int id) {
        return new Node(id, new StringBuilder(4).append("host").append(id).toString(), 123);
    }

    public static final /* synthetic */ boolean $anonfun$initiateReverseConnectionRequestCount$1(ClientRequest x$6) {
        ApiKeys apiKeys = x$6.apiKey();
        ApiKeys apiKeys2 = ApiKeys.INITIATE_REVERSE_CONNECTIONS;
        return !(apiKeys != null ? !apiKeys.equals(apiKeys2) : apiKeys2 != null);
    }

    public ClusterLinkSourceConnectionManagerTest() {
        this.linkName = "testLink";
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId = "sourceCluster";
        this.destClusterId = "destCluster";
    }
}

