/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.controller.ControllerContext;
import kafka.controller.KafkaController;
import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkMetadata;
import kafka.server.link.ClusterLinkMetadataThread;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkSourceConnectionManager;
import kafka.server.link.ClusterLinkSourceConnectionManagerTest$;
import kafka.server.link.ConnectionMode;
import kafka.server.link.LinkMode;
import kafka.server.link.RemoteNetworkClient;
import kafka.utils.MockTime;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.SourceReverseConnectionManager;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

@ScalaSignature(bytes="\u0006\u0001\r\u001da\u0001B$I\u0001=CQA\u0016\u0001\u0005\u0002]CqA\u0017\u0001C\u0002\u0013%1\f\u0003\u0004a\u0001\u0001\u0006I\u0001\u0018\u0005\bC\u0002\u0011\r\u0011\"\u0003c\u0011\u0019y\u0007\u0001)A\u0005G\"9\u0001\u000f\u0001b\u0001\n\u0013\t\bB\u0002>\u0001A\u0003%!\u000fC\u0004|\u0001\t\u0007I\u0011\u0002?\t\u000f\u0005\u001d\u0001\u0001)A\u0005{\"A\u0011\u0011\u0002\u0001C\u0002\u0013%\u0011\u000fC\u0004\u0002\f\u0001\u0001\u000b\u0011\u0002:\t\u0011\u00055\u0001A1A\u0005\nEDq!a\u0004\u0001A\u0003%!\u000fC\u0005\u0002\u0012\u0001\u0011\r\u0011\"\u0003\u0002\u0014!A\u0011\u0011\u0005\u0001!\u0002\u0013\t)\u0002C\u0005\u0002$\u0001\u0011\r\u0011\"\u0003\u0002&!A\u0011Q\u0006\u0001!\u0002\u0013\t9\u0003C\u0005\u00020\u0001\u0011\r\u0011\"\u0003\u00022!A\u0011Q\b\u0001!\u0002\u0013\t\u0019\u0004C\u0005\u0002@\u0001\u0011\r\u0011\"\u0003\u0002B!A\u0011q\n\u0001!\u0002\u0013\t\u0019\u0005C\u0005\u0002R\u0001\u0011\r\u0011\"\u0003\u0002T!A\u00111\f\u0001!\u0002\u0013\t)\u0006C\u0005\u0002^\u0001\u0011\r\u0011\"\u0003\u0002`!A\u0011Q\u000e\u0001!\u0002\u0013\t\t\u0007C\u0005\u0002p\u0001\u0011\r\u0011\"\u0003\u0002r!A\u0011\u0011\u0011\u0001!\u0002\u0013\t\u0019\bC\u0005\u0002\u0004\u0002\u0011\r\u0011\"\u0003\u0002\u0006\"A\u0011\u0011\u0013\u0001!\u0002\u0013\t9\tC\u0005\u0002\u0014\u0002\u0011\r\u0011\"\u0003\u0002\u0016\"A\u0011q\u0015\u0001!\u0002\u0013\t9\nC\u0005\u0002*\u0002\u0011\r\u0011\"\u0003\u0002,\"A\u0011Q\u001a\u0001!\u0002\u0013\ti\u000bC\u0006\u0002P\u0002\u0001\r\u00111A\u0005\n\u0005E\u0007bCAm\u0001\u0001\u0007\t\u0019!C\u0005\u00037D1\"a:\u0001\u0001\u0004\u0005\t\u0015)\u0003\u0002T\"Y\u0011\u0011\u001e\u0001A\u0002\u0003\u0007I\u0011BAv\u0011-\t\u0019\u0010\u0001a\u0001\u0002\u0004%I!!>\t\u0017\u0005e\b\u00011A\u0001B\u0003&\u0011Q\u001e\u0005\f\u0003w\u0004\u0001\u0019!a\u0001\n\u0013\ti\u0010C\u0006\u0003\u0006\u0001\u0001\r\u00111A\u0005\n\t\u001d\u0001b\u0003B\u0006\u0001\u0001\u0007\t\u0011)Q\u0005\u0003\u007fD\u0011B!\u0004\u0001\u0005\u0004%IAa\u0004\t\u0011\tu\u0001\u0001)A\u0005\u0005#A1Ba\b\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0003\"!Y!q\u0006\u0001A\u0002\u0003\u0007I\u0011\u0002B\u0019\u0011-\u0011)\u0004\u0001a\u0001\u0002\u0003\u0006KAa\t\t\u0017\t]\u0002\u00011AA\u0002\u0013%!\u0011\b\u0005\f\u0005\u0003\u0002\u0001\u0019!a\u0001\n\u0013\u0011\u0019\u0005C\u0006\u0003H\u0001\u0001\r\u0011!Q!\n\tm\u0002\"\u0003B%\u0001\u0001\u0007I\u0011\u0002B&\u0011%\u0011I\u0006\u0001a\u0001\n\u0013\u0011Y\u0006\u0003\u0005\u0003`\u0001\u0001\u000b\u0015\u0002B'\u0011\u001d\u0011\t\u0007\u0001C\u0001\u0005GBqAa\u001f\u0001\t\u0003\u0011\u0019\u0007C\u0004\u0003\u0006\u0002!\tAa\u0019\t\u000f\t%\u0005\u0001\"\u0001\u0003d!9!Q\u0012\u0001\u0005\u0002\t\r\u0004b\u0002BI\u0001\u0011\u0005!1\r\u0005\b\u0005+\u0003A\u0011\u0001B2\u0011\u001d\u0011I\n\u0001C\u0001\u0005GBqA!(\u0001\t\u0003\u0011\u0019\u0007C\u0004\u0003\"\u0002!IAa)\t\u000f\t\u0015\u0006\u0001\"\u0003\u0003(\"I!Q\u0018\u0001\u0012\u0002\u0013%!q\u0018\u0005\b\u0005+\u0004A\u0011\u0002Bl\u0011\u001d\u0011y\u000e\u0001C\u0005\u0005CD\u0011B!=\u0001#\u0003%IAa=\t\u000f\t]\b\u0001\"\u0003\u0003z\"911\u0001\u0001\u0005\n\r\u0015!AJ\"mkN$XM\u001d'j].\u001cv.\u001e:dK\u000e{gN\\3di&|g.T1oC\u001e,'\u000fV3ti*\u0011\u0011JS\u0001\u0005Y&t7N\u0003\u0002L\u0019\u000611/\u001a:wKJT\u0011!T\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001\u0001\u000b\u0005\u0002R)6\t!KC\u0001T\u0003\u0015\u00198-\u00197b\u0013\t)&K\u0001\u0004B]f\u0014VMZ\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003a\u0003\"!\u0017\u0001\u000e\u0003!\u000bAB\u0019:pW\u0016\u00148i\u001c8gS\u001e,\u0012\u0001\u0018\t\u0003;zk\u0011AS\u0005\u0003?*\u00131bS1gW\u0006\u001cuN\u001c4jO\u0006i!M]8lKJ\u001cuN\u001c4jO\u0002\n!b]3sm\u0016\u0014\u0018J\u001c4p+\u0005\u0019\u0007C\u00013n\u001b\u0005)'B\u00014h\u0003)\tW\u000f\u001e5pe&TXM\u001d\u0006\u0003\u0017\"T!!T5\u000b\u0005)\\\u0017AB1qC\u000eDWMC\u0001m\u0003\ry'oZ\u0005\u0003]\u0016\u0014A#Q;uQ>\u0014\u0018N_3s'\u0016\u0014h/\u001a:J]\u001a|\u0017aC:feZ,'/\u00138g_\u0002\n\u0001\u0002\\5oW:\u000bW.Z\u000b\u0002eB\u00111\u000f_\u0007\u0002i*\u0011QO^\u0001\u0005Y\u0006twMC\u0001x\u0003\u0011Q\u0017M^1\n\u0005e$(AB*ue&tw-A\u0005mS:\\g*Y7fA\u00051A.\u001b8l\u0013\u0012,\u0012! \t\u0004}\u0006\rQ\"A@\u000b\u0007\u0005\u0005a/\u0001\u0003vi&d\u0017bAA\u0003\u007f\n!Q+V%E\u0003\u001da\u0017N\\6JI\u0002\nqb]8ve\u000e,7\t\\;ti\u0016\u0014\u0018\nZ\u0001\u0011g>,(oY3DYV\u001cH/\u001a:JI\u0002\nQ\u0002Z3ti\u000ecWo\u001d;fe&#\u0017A\u00043fgR\u001cE.^:uKJLE\rI\u0001\tY&t7\u000eR1uCV\u0011\u0011Q\u0003\t\u0005\u0003/\ti\"\u0004\u0002\u0002\u001a)\u0019\u00111\u0004'\u0002\u0005i\\\u0017\u0002BA\u0010\u00033\u0011qb\u00117vgR,'\u000fT5oW\u0012\u000bG/Y\u0001\nY&t7\u000eR1uC\u0002\n\u0011\u0002\\5oWB\u0013x\u000e]:\u0016\u0005\u0005\u001d\u0002c\u0001@\u0002*%\u0019\u00111F@\u0003\u0015A\u0013x\u000e]3si&,7/\u0001\u0006mS:\\\u0007K]8qg\u0002\n!bY8oiJ|G\u000e\\3s+\t\t\u0019\u0004\u0005\u0003\u00026\u0005eRBAA\u001c\u0015\r\ty\u0003T\u0005\u0005\u0003w\t9DA\bLC\u001a\\\u0017mQ8oiJ|G\u000e\\3s\u0003-\u0019wN\u001c;s_2dWM\u001d\u0011\u0002\u0019M|7m[3u'\u0016\u0014h/\u001a:\u0016\u0005\u0005\r\u0003\u0003BA#\u0003\u0017j!!a\u0012\u000b\u0007\u0005%C*A\u0004oKR<xN]6\n\t\u00055\u0013q\t\u0002\r'>\u001c7.\u001a;TKJ4XM]\u0001\u000eg>\u001c7.\u001a;TKJ4XM\u001d\u0011\u0002\u00171Lgn['b]\u0006<WM]\u000b\u0003\u0003+\u00022!WA,\u0013\r\tI\u0006\u0013\u0002\u0013\u00072,8\u000f^3s\u0019&t7.T1oC\u001e,'/\u0001\u0007mS:\\W*\u00198bO\u0016\u0014\b%\u0001\u0003uS6,WCAA1!\u0011\t\u0019'!\u001b\u000e\u0005\u0005\u0015$bAA4\u0019\u0006)Q\u000f^5mg&!\u00111NA3\u0005!iunY6US6,\u0017!\u0002;j[\u0016\u0004\u0013aB7fiJL7m]\u000b\u0003\u0003g\u0002B!!\u001e\u0002~5\u0011\u0011q\u000f\u0006\u0005\u0003_\nIHC\u0002\u0002|!\faaY8n[>t\u0017\u0002BA@\u0003o\u0012q!T3ue&\u001c7/\u0001\u0005nKR\u0014\u0018nY:!\u0003\u001d\u0019\u0007.\u00198oK2,\"!a\"\u0011\t\u0005%\u0015QR\u0007\u0003\u0003\u0017SA!!\u0013\u0002z%!\u0011qRAF\u00051Y\u0015MZ6b\u0007\"\fgN\\3m\u0003!\u0019\u0007.\u00198oK2\u0004\u0013\u0001E7fi\u0006$\u0017\r^1SKF,Xm\u001d;t+\t\t9\n\u0005\u0003\u0002\u001a\u0006\rVBAAN\u0015\u0011\ti*a(\u0002\r\u0005$x.\\5d\u0015\r\t\tk`\u0001\u000bG>t7-\u001e:sK:$\u0018\u0002BAS\u00037\u0013Q\"\u0011;p[&\u001c\u0017J\u001c;fO\u0016\u0014\u0018!E7fi\u0006$\u0017\r^1SKF,Xm\u001d;tA\u0005q1\r\\8tK\u0012\u001c\u0005.\u00198oK2\u001cXCAAW!\u0019\ty+!/\u0002>6\u0011\u0011\u0011\u0017\u0006\u0005\u0003g\u000b),A\u0004nkR\f'\r\\3\u000b\u0007\u0005]&+\u0001\u0006d_2dWm\u0019;j_:LA!a/\u00022\n\u00191+\u001a;\u0011\t\u0005}\u0016\u0011Z\u0007\u0003\u0003\u0003TA!a1\u0002F\u0006A1\r[1o]\u0016d7OC\u0002\u0002HZ\f1A\\5p\u0013\u0011\tY-!1\u0003\u001bM{7m[3u\u0007\"\fgN\\3m\u0003=\u0019Gn\\:fI\u000eC\u0017M\u001c8fYN\u0004\u0013A\u00037j].\u001cuN\u001c4jOV\u0011\u00111\u001b\t\u00043\u0006U\u0017bAAl\u0011\n\t2\t\\;ti\u0016\u0014H*\u001b8l\u0007>tg-[4\u0002\u001d1Lgn[\"p]\u001aLwm\u0018\u0013fcR!\u0011Q\\Ar!\r\t\u0016q\\\u0005\u0004\u0003C\u0014&\u0001B+oSRD\u0011\"!:$\u0003\u0003\u0005\r!a5\u0002\u0007a$\u0013'A\u0006mS:\\7i\u001c8gS\u001e\u0004\u0013a\u00037j].lU\r\u001e:jGN,\"!!<\u0011\u0007e\u000by/C\u0002\u0002r\"\u0013!c\u00117vgR,'\u000fT5oW6+GO]5dg\u0006yA.\u001b8l\u001b\u0016$(/[2t?\u0012*\u0017\u000f\u0006\u0003\u0002^\u0006]\b\"CAsM\u0005\u0005\t\u0019AAw\u00031a\u0017N\\6NKR\u0014\u0018nY:!\u0003-\u0019wN\u001c8NC:\fw-\u001a:\u0016\u0005\u0005}\bcA-\u0003\u0002%\u0019!1\u0001%\u0003E\rcWo\u001d;fe2Kgn[*pkJ\u001cWmQ8o]\u0016\u001cG/[8o\u001b\u0006t\u0017mZ3s\u0003=\u0019wN\u001c8NC:\fw-\u001a:`I\u0015\fH\u0003BAo\u0005\u0013A\u0011\"!:*\u0003\u0003\u0005\r!a@\u0002\u0019\r|gN\\'b]\u0006<WM\u001d\u0011\u0002\u001f1|7-\u00197N_\u000e\\7\t\\5f]R,\"A!\u0005\u0011\t\tM!\u0011D\u0007\u0003\u0005+Q1Aa\u0006i\u0003\u001d\u0019G.[3oiNLAAa\u0007\u0003\u0016\tQQj\\2l\u00072LWM\u001c;\u0002!1|7-\u00197N_\u000e\\7\t\\5f]R\u0004\u0013A\u00037pG\u0006d\u0017\tZ7j]V\u0011!1\u0005\t\u0005\u0005K\u0011Y#\u0004\u0002\u0003()!!\u0011\u0006B\u000b\u0003\u0015\tG-\\5o\u0013\u0011\u0011iCa\n\u0003\u001d\r{gN\u001a7vK:$\u0018\tZ7j]\u0006qAn\\2bY\u0006#W.\u001b8`I\u0015\fH\u0003BAo\u0005gA\u0011\"!:/\u0003\u0003\u0005\rAa\t\u0002\u00171|7-\u00197BI6Lg\u000eI\u0001\fe\u0016lw\u000e^3BI6Lg.\u0006\u0002\u0003<A\u0019\u0011L!\u0010\n\u0007\t}\u0002JA\nSK6|G/\u001a(fi^|'o[\"mS\u0016tG/A\bsK6|G/Z!e[&tw\fJ3r)\u0011\tiN!\u0012\t\u0013\u0005\u0015\u0018'!AA\u0002\tm\u0012\u0001\u0004:f[>$X-\u00113nS:\u0004\u0013A\u0006:f[>$XmQ8oiJ|G\u000e\\3s\u001d>$W-\u00133\u0016\u0005\t5\u0003#B)\u0003P\tM\u0013b\u0001B)%\n1q\n\u001d;j_:\u00042!\u0015B+\u0013\r\u00119F\u0015\u0002\u0004\u0013:$\u0018A\u0007:f[>$XmQ8oiJ|G\u000e\\3s\u001d>$W-\u00133`I\u0015\fH\u0003BAo\u0005;B\u0011\"!:5\u0003\u0003\u0005\rA!\u0014\u0002/I,Wn\u001c;f\u0007>tGO]8mY\u0016\u0014hj\u001c3f\u0013\u0012\u0004\u0013\u0001\u0003;fCJ$un\u001e8\u0015\u0005\u0005u\u0007f\u0001\u001c\u0003hA!!\u0011\u000eB<\u001b\t\u0011YG\u0003\u0003\u0003n\t=\u0014aA1qS*!!\u0011\u000fB:\u0003\u001dQW\u000f]5uKJT1A!\u001el\u0003\u0015QWO\\5u\u0013\u0011\u0011IHa\u001b\u0003\u0013\u00053G/\u001a:FC\u000eD\u0017!\u0007;fgR\u001cuN\u001c8fGRLwN\\'pI\u0016LeNY8v]\u0012D3a\u000eB@!\u0011\u0011IG!!\n\t\t\r%1\u000e\u0002\u0005)\u0016\u001cH/\u0001\u000euKN$8i\u001c8oK\u000e$\u0018n\u001c8N_\u0012,w*\u001e;c_VtG\rK\u00029\u0005\u007f\n\u0001\u0004^3tiB+'o]5ti\u0016tGoQ8o]\u0016\u001cG/[8oQ\rI$qP\u00011i\u0016\u001cH\u000fU3sg&\u001cH/\u001a8u\u0007>tg.Z2uS>t'+Z7pi\u0016\u001cuN\u001c;s_2dWM\u001d(pi.swn\u001e8)\u0007i\u0012y(\u0001\fuKN$hj\u001c;M_\u000e\fGnQ8oiJ|G\u000e\\3sQ\rY$qP\u0001\ri\u0016\u001cH/T3uC\u0012\fG/\u0019\u0015\u0004y\t}\u0014a\u0004;fgR\u0014VmY8oM&<WO]3)\u0007u\u0012y(\u0001\u0012uKN$(+Z2p]\u001aLw-\u001e:f/&$\bnQ8o]\u0016\u001cG/[8o%\u0016\u001cX\r\u001e\u0015\u0004}\t}\u0014AE2sK\u0006$XM\u0011:pW\u0016\u00148i\u001c8gS\u001e$\u0012\u0001X\u0001\u0017g\u0016$X\u000f]\"p]:,7\r^5p]6\u000bg.Y4feR1\u0011Q\u001cBU\u0005gCqAa+A\u0001\u0004\u0011i+\u0001\bd_:tWm\u0019;j_:lu\u000eZ3\u0011\u0007e\u0013y+C\u0002\u00032\"\u0013abQ8o]\u0016\u001cG/[8o\u001b>$W\rC\u0005\u00036\u0002\u0003\n\u00111\u0001\u00038\u0006\t\u0012n\u001d'pG\u0006d7i\u001c8ue>dG.\u001a:\u0011\u0007E\u0013I,C\u0002\u0003<J\u0013qAQ8pY\u0016\fg.\u0001\u0011tKR,\boQ8o]\u0016\u001cG/[8o\u001b\u0006t\u0017mZ3sI\u0011,g-Y;mi\u0012\u0012TC\u0001BaU\u0011\u00119La1,\u0005\t\u0015\u0007\u0003\u0002Bd\u0005#l!A!3\u000b\t\t-'QZ\u0001\nk:\u001c\u0007.Z2lK\u0012T1Aa4S\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0005'\u0014IMA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\fAb]3u+B\u001c\u0005.\u00198oK2$b!!8\u0003Z\nm\u0007bBAB\u0005\u0002\u0007\u0011q\u0011\u0005\b\u0005;\u0014\u0005\u0019\u0001B*\u0003%\u0019\u0007.\u00198oK2LE-A\u0006sKZ,'o]3O_\u0012,GC\u0002Br\u0005S\u0014i\u000f\u0005\u0003\u0002\n\n\u0015\u0018\u0002\u0002Bt\u0003\u0017\u00131BU3wKJ\u001cXMT8eK\"9!1^\"A\u0002\tM\u0013A\u00028pI\u0016LE\rC\u0005\u0003p\u000e\u0003\n\u00111\u0001\u0003T\u0005I!/Z9vKN$\u0018\nZ\u0001\u0016e\u00164XM]:f\u001d>$W\r\n3fM\u0006,H\u000e\u001e\u00133+\t\u0011)P\u000b\u0003\u0003T\t\r\u0017!\u0004:f[>$Xm\u00117vgR,'/\u0006\u0002\u0003|B!!Q B\u0000\u001b\t\tI(\u0003\u0003\u0004\u0002\u0005e$aB\"mkN$XM]\u0001&S:LG/[1uKJ+g/\u001a:tK\u000e{gN\\3di&|gNU3rk\u0016\u001cHoQ8v]R,\"Aa\u0015")
public class ClusterLinkSourceConnectionManagerTest {
    private final KafkaConfig kafka$server$link$ClusterLinkSourceConnectionManagerTest$$brokerConfig = this.createBrokerConfig();
    private final AuthorizerServerInfo kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo = (AuthorizerServerInfo)EasyMock.createNiceMock(AuthorizerServerInfo.class);
    private final String linkName;
    private final UUID linkId = UUID.randomUUID();
    private final String kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId;
    private final String destClusterId;
    private final ClusterLinkData kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkData = new ClusterLinkData(this.linkName(), this.linkId(), (Option)new Some((Object)this.destClusterId()), (Option)None$.MODULE$, false);
    private final Properties linkProps = new Properties();
    private final KafkaController kafka$server$link$ClusterLinkSourceConnectionManagerTest$$controller = (KafkaController)EasyMock.createNiceMock(KafkaController.class);
    private final SocketServer kafka$server$link$ClusterLinkSourceConnectionManagerTest$$socketServer = (SocketServer)EasyMock.createNiceMock(SocketServer.class);
    private final ClusterLinkManager linkManager = (ClusterLinkManager)EasyMock.createNiceMock(ClusterLinkManager.class);
    private final MockTime kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time = new MockTime();
    private final Metrics metrics = new Metrics();
    private final KafkaChannel channel = (KafkaChannel)EasyMock.createNiceMock(KafkaChannel.class);
    private final AtomicInteger kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests = new AtomicInteger();
    private final scala.collection.mutable.Set<SocketChannel> kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels = (scala.collection.mutable.Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
    private ClusterLinkConfig kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig;
    private ClusterLinkMetrics kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics;
    private ClusterLinkSourceConnectionManager connManager;
    private final MockClient kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient = new MockClient((Time)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time());
    private ConfluentAdmin kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin;
    private RemoteNetworkClient kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin;
    private Option<Object> remoteControllerNodeId = new Some((Object)BoxesRunTime.boxToInteger((int)20));

    public KafkaConfig kafka$server$link$ClusterLinkSourceConnectionManagerTest$$brokerConfig() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$brokerConfig;
    }

    public AuthorizerServerInfo kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo;
    }

    private String linkName() {
        return this.linkName;
    }

    private UUID linkId() {
        return this.linkId;
    }

    public String kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId;
    }

    private String destClusterId() {
        return this.destClusterId;
    }

    public ClusterLinkData kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkData() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkData;
    }

    private Properties linkProps() {
        return this.linkProps;
    }

    public KafkaController kafka$server$link$ClusterLinkSourceConnectionManagerTest$$controller() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$controller;
    }

    public SocketServer kafka$server$link$ClusterLinkSourceConnectionManagerTest$$socketServer() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$socketServer;
    }

    private ClusterLinkManager linkManager() {
        return this.linkManager;
    }

    public MockTime kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time;
    }

    private Metrics metrics() {
        return this.metrics;
    }

    private KafkaChannel channel() {
        return this.channel;
    }

    public AtomicInteger kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests;
    }

    public scala.collection.mutable.Set<SocketChannel> kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels;
    }

    public ClusterLinkConfig kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig;
    }

    private void kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig_$eq(ClusterLinkConfig x$1) {
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig = x$1;
    }

    public ClusterLinkMetrics kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics;
    }

    private void kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics_$eq(ClusterLinkMetrics x$1) {
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics = x$1;
    }

    private ClusterLinkSourceConnectionManager connManager() {
        return this.connManager;
    }

    private void connManager_$eq(ClusterLinkSourceConnectionManager x$1) {
        this.connManager = x$1;
    }

    public MockClient kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient;
    }

    public ConfluentAdmin kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin;
    }

    public void kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin_$eq(ConfluentAdmin x$1) {
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin = x$1;
    }

    public RemoteNetworkClient kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin() {
        return this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin;
    }

    public void kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin_$eq(RemoteNetworkClient x$1) {
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin = x$1;
    }

    private Option<Object> remoteControllerNodeId() {
        return this.remoteControllerNodeId;
    }

    private void remoteControllerNodeId_$eq(Option<Object> x$1) {
        this.remoteControllerNodeId = x$1;
    }

    @AfterEach
    public void tearDown() {
        if (this.connManager() != null) {
            this.connManager().shutdown();
        }
        this.metrics().close();
    }

    @Test
    public void testConnectionModeInbound() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Inbound$.MODULE$, this.setupConnectionManager$default$2());
        Assertions.assertThrows(IllegalStateException.class, () -> this.connManager().startup());
        Assertions.assertThrows(InvalidRequestException.class, () -> this.connManager().onReverseConnection(this.channel(), this.reverseNode(1, -1)));
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)0, (int)this.connManager().reverseConnectionCount());
        Assertions.assertNull((Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertNull((Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin());
    }

    @Test
    public void testConnectionModeOutbound() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, this.setupConnectionManager$default$2());
        this.connManager().startup();
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertThrows(NotControllerException.class, () -> this.connManager().onReverseConnection(this.channel(), this.reverseNode(1, -1)));
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)0, (int)this.connManager().reverseConnectionCount());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testPersistentConnection() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkSourceConnectionManagerTest.$anonfun$testPersistentConnection$1(this)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ClusterLinkSourceConnectionManagerTest.$anonfun$testPersistentConnection$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(BoxesRunTime.unboxToInt((Object)this.remoteControllerNodeId().get()), this.reverseNode$default$2()));
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)1, (int)this.connManager().reverseConnectionCount());
    }

    @Test
    public void testPersistentConnectionRemoteControllerNotKnown() {
        this.remoteControllerNodeId_$eq((Option<Object>)None$.MODULE$);
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        Assertions.assertEquals((int)0, (int)this.initiateReverseConnectionRequestCount());
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
    }

    @Test
    public void testNotLocalController() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, false);
        this.connManager().startup();
        Assertions.assertEquals((int)0, (int)this.initiateReverseConnectionRequestCount());
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertThrows(NotControllerException.class, () -> this.connManager().onReverseConnection(this.channel(), this.reverseNode(1, -1)));
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(1, 5));
        KafkaChannel channel2 = (KafkaChannel)EasyMock.createNiceMock(KafkaChannel.class);
        this.setUpChannel(channel2, 2);
        this.connManager().onReverseConnection(channel2, this.reverseNode(2, 10));
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)2, (int)this.connManager().reverseConnectionCount());
    }

    @Test
    public void testMetadata() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.remoteControllerNodeId_$eq((Option<Object>)None$.MODULE$);
        this.connManager().startup();
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());
        this.connManager().onControllerChange(true);
        Assertions.assertEquals((int)2, (int)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());
        this.connManager().onControllerChange(false);
        Assertions.assertEquals((int)2, (int)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());
        this.remoteControllerNodeId_$eq((Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)25)));
        this.connManager().onNewMetadata(this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteCluster());
        Assertions.assertEquals((int)2, (int)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());
        this.remoteControllerNodeId_$eq((Option<Object>)None$.MODULE$);
        this.connManager().onNewMetadata(this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteCluster());
        Assertions.assertEquals((int)3, (int)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().get());
    }

    @Test
    public void testReconfigure() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        SocketChannel socketChannel = this.channel().socketChannel();
        EasyMock.replay((Object[])new Object[]{socketChannel});
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(BoxesRunTime.unboxToInt((Object)this.remoteControllerNodeId().get()), this.reverseNode$default$2()));
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)1, (int)this.connManager().reverseConnectionCount());
        RemoteNetworkClient oldAdmin = this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin();
        this.linkProps().setProperty("metadata.max.age.ms", "1000");
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps()));
        this.connManager().reconfigure(this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig(), (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"metadata.max.age.ms"})));
        Assertions.assertEquals((Object)this.connManager().currentConfig().originals(), (Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig().originals());
        Assertions.assertNotNull((Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertNotSame((Object)oldAdmin, (Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)1, (int)this.connManager().reverseConnectionCount());
    }

    @Test
    public void testReconfigureWithConnectionReset() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        KafkaChannel channel2 = (KafkaChannel)EasyMock.createNiceMock(KafkaChannel.class);
        this.setUpChannel(channel2, 2);
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(BoxesRunTime.unboxToInt((Object)this.remoteControllerNodeId().get()), this.reverseNode$default$2()));
        this.connManager().onReverseConnection(channel2, this.reverseNode(2, 5));
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)2, (int)this.connManager().reverseConnectionCount());
        RemoteNetworkClient oldAdmin = this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin();
        this.linkProps().setProperty("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name);
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps()));
        this.connManager().reconfigure(this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig(), (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"security.protocol"})));
        Assertions.assertEquals((Object)this.connManager().currentConfig().originals(), (Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig().originals());
        Assertions.assertNotNull((Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertNotSame((Object)oldAdmin, (Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)0, (int)this.connManager().reverseConnectionCount());
        Assertions.assertEquals((Object)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new SocketChannel[]{this.channel().socketChannel(), channel2.socketChannel()})), this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels());
    }

    private KafkaConfig createBrokerConfig() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        props.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        return KafkaConfig$.MODULE$.fromProps(props);
    }

    private void setupConnectionManager(ConnectionMode connectionMode, boolean isLocalController) {
        this.linkProps().put(ClusterLinkConfig$.MODULE$.LinkModeProp(), LinkMode.Source$.MODULE$.name());
        this.linkProps().put(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());
        this.linkProps().put(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), "EXTERNAL");
        this.linkProps().put("bootstrap.servers", "localhost:123");
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps()));
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics_$eq(new ClusterLinkMetrics(this.linkName(), this.linkId(), (LinkMode)LinkMode.Source$.MODULE$, this.linkManager(), (Option)None$.MODULE$, this.metrics(), (Option)None$.MODULE$));
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics().startup();
        Endpoint endpoint = new Endpoint("EXTERNAL", SecurityProtocol.PLAINTEXT, "host", 123);
        EasyMock.expect((Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo().endpoints()).andReturn(Collections.singletonList(endpoint));
        ControllerContext controllerContext = (ControllerContext)EasyMock.createNiceMock(ControllerContext.class);
        EasyMock.expect((Object)controllerContext.liveBrokerIds()).andReturn((Object)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 3}))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$controller().isActive())).andReturn((Object)BoxesRunTime.boxToBoolean((boolean)isLocalController)).anyTimes();
        EasyMock.expect((Object)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$controller().controllerContext()).andReturn((Object)controllerContext).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo(), controllerContext, this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$controller()});
        this.setUpChannel(this.channel(), 123);
        this.connManager_$eq(new ClusterLinkSourceConnectionManager(this){
            private final /* synthetic */ ClusterLinkSourceConnectionManagerTest $outer;

            public ConfluentAdmin createLocalAdmin() {
                Map<String, String> configs = Collections.singletonMap("bootstrap.servers", "localhost:9092");
                AdminClientConfig config = new AdminClientConfig(configs);
                AdminMetadataManager metadataManager = new AdminMetadataManager(new LogContext(), 1000L, 300000L);
                List<Node> nodes = Collections.singletonList(new Node(1, "host1", 123));
                Cluster cluster = new Cluster(this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId(), nodes, Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), nodes.get(0));
                metadataManager.update(cluster, this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time().milliseconds());
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin_$eq(ConfluentAdminUtils.createConfluentAdmin((AdminClientConfig)config, (AdminMetadataManager)metadataManager, (KafkaClient)this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient(), (Time)this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time()));
                return this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin();
            }

            public RemoteNetworkClient createRemoteAdmin() {
                NetworkClient networkClient = (NetworkClient)EasyMock.createNiceMock(NetworkClient.class);
                SourceReverseConnectionManager reverseConnectionManager = (SourceReverseConnectionManager)EasyMock.createNiceMock(SourceReverseConnectionManager.class);
                ClusterLinkMetadata metadata = (ClusterLinkMetadata)EasyMock.createNiceMock(ClusterLinkMetadata.class);
                ClusterLinkMetadataThread metadataThread = (ClusterLinkMetadataThread)EasyMock.createNiceMock(ClusterLinkMetadataThread.class);
                EasyMock.expect((Object)networkClient.reverseConnectionManager()).andReturn((Object)reverseConnectionManager).anyTimes();
                EasyMock.expect((Object)metadataThread.clusterLinkMetadata()).andReturn((Object)metadata).anyTimes();
                EasyMock.expect((Object)metadata.fetch()).andAnswer(() -> $this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteCluster()).anyTimes();
                EasyMock.expect((Object)BoxesRunTime.boxToInteger((int)metadata.requestUpdate())).andAnswer(() -> BoxesRunTime.boxToInteger((int)$anon$1.$anonfun$createRemoteAdmin$2(this))).anyTimes();
                EasyMock.replay((Object[])new Object[]{networkClient, metadataThread, metadata});
                Assertions.assertNull((Object)this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin());
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin_$eq(new RemoteNetworkClient(networkClient, metadataThread));
                return this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin();
            }

            public void closeReverseConnectionAdmin() {
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localAdmin_$eq(null);
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteAdmin_$eq(null);
            }

            public void closeSocketChannel(SocketChannel socketChannel) {
                super.closeSocketChannel(socketChannel);
                this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$closedChannels().$plus$eq((Object)socketChannel);
            }

            public static final /* synthetic */ int $anonfun$createRemoteAdmin$2($anon$1 $this) {
                return $this.$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$metadataRequests().incrementAndGet();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                super($outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkData(), $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkConfig(), $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId(), (Option)None$.MODULE$, $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$linkMetrics(), $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$controller(), $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$socketServer(), $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$brokerConfig(), $outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$serverInfo(), (Time)$outer.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$time());
            }
        });
    }

    private boolean setupConnectionManager$default$2() {
        return false;
    }

    private void setUpChannel(KafkaChannel channel, int channelId) {
        EasyMock.expect((Object)channel.id()).andReturn((Object)Integer.toString(channelId)).anyTimes();
        SocketChannel socketChannel = (SocketChannel)EasyMock.createNiceMock(SocketChannel.class);
        EasyMock.expect((Object)channel.socketChannel()).andReturn((Object)socketChannel).anyTimes();
        EasyMock.replay((Object[])new Object[]{channel});
    }

    private ReverseNode reverseNode(int nodeId, int requestId) {
        return new ReverseNode(nodeId, nodeId, new StringBuilder(4).append("host").append(nodeId).toString(), 1234, this.linkId(), requestId, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), KafkaPrincipal.ANONYMOUS);
    }

    private int reverseNode$default$2() {
        return -1;
    }

    public Cluster kafka$server$link$ClusterLinkSourceConnectionManagerTest$$remoteCluster() {
        ArrayList<Node> nodes = new ArrayList<Node>();
        nodes.add(new Node(1, "host1", 123));
        Option remoteControllerNode = this.remoteControllerNodeId().map((Function1 & Serializable & scala.Serializable)id -> ClusterLinkSourceConnectionManagerTest.$anonfun$remoteCluster$1(BoxesRunTime.unboxToInt((Object)id)));
        remoteControllerNode.foreach((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)nodes.add(x$1)));
        return new Cluster(this.destClusterId(), nodes, Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), (Node)remoteControllerNode.orNull(Predef$.MODULE$.$conforms()));
    }

    private int initiateReverseConnectionRequestCount() {
        return ((TraversableOnce)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$localMockClient().requests()).asScala()).count((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkSourceConnectionManagerTest.$anonfun$initiateReverseConnectionRequestCount$1(x$1)));
    }

    public static final /* synthetic */ boolean $anonfun$testPersistentConnection$1(ClusterLinkSourceConnectionManagerTest $this) {
        return $this.initiateReverseConnectionRequestCount() > 0;
    }

    public static final /* synthetic */ String $anonfun$testPersistentConnection$2() {
        return "Persistent connection not initiated";
    }

    public static final /* synthetic */ Node $anonfun$remoteCluster$1(int id) {
        return new Node(id, new StringBuilder(4).append("host").append(id).toString(), 123);
    }

    public static final /* synthetic */ boolean $anonfun$initiateReverseConnectionRequestCount$1(ClientRequest x$1) {
        ApiKeys apiKeys = x$1.apiKey();
        ApiKeys apiKeys2 = ApiKeys.INITIATE_REVERSE_CONNECTIONS;
        return !(apiKeys != null ? !apiKeys.equals(apiKeys2) : apiKeys2 != null);
    }

    public ClusterLinkSourceConnectionManagerTest() {
        this.linkName = "testLink";
        this.kafka$server$link$ClusterLinkSourceConnectionManagerTest$$sourceClusterId = "sourceCluster";
        this.destClusterId = "destCluster";
    }
}

