/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.io.File;
import java.io.Serializable;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.network.SocketServer;
import kafka.server.ClusterLinkRequestQuota;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkMetadata;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetadataThread;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkOutboundConnectionManager;
import kafka.server.link.ClusterLinkOutboundConnectionManagerTest$;
import kafka.server.link.ConnectionMode;
import kafka.server.link.RemoteNetworkClient;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.SourceReverseConnectionManager;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import scala.;
import scala.$less$colon$less$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.Set;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0005\r\rc\u0001\u0002*T\u0001iCQ!\u0019\u0001\u0005\u0002\tDq!\u001a\u0001C\u0002\u0013%a\r\u0003\u0004l\u0001\u0001\u0006Ia\u001a\u0005\bY\u0002\u0011\r\u0011\"\u0003n\u0011\u0019Q\b\u0001)A\u0005]\"91\u0010\u0001b\u0001\n\u0013a\bbBA\u0001\u0001\u0001\u0006I! \u0005\n\u0003\u0007\u0001!\u0019!C\u0005\u0003\u000bA\u0001\"a\u0006\u0001A\u0003%\u0011q\u0001\u0005\n\u00033\u0001!\u0019!C\u0005\u00037A\u0001\"!\u000b\u0001A\u0003%\u0011Q\u0004\u0005\n\u0003W\u0001!\u0019!C\u0005\u0003\u000bA\u0001\"!\f\u0001A\u0003%\u0011q\u0001\u0005\n\u0003_\u0001!\u0019!C\u0005\u0003\u000bA\u0001\"!\r\u0001A\u0003%\u0011q\u0001\u0005\n\u0003g\u0001!\u0019!C\u0005\u0003kA\u0001\"a\u0011\u0001A\u0003%\u0011q\u0007\u0005\n\u0003\u000b\u0002!\u0019!C\u0005\u0003\u000fB\u0001\"!\u0016\u0001A\u0003%\u0011\u0011\n\u0005\n\u0003/\u0002!\u0019!C\u0005\u00033B\u0001\"!\u0019\u0001A\u0003%\u00111\f\u0005\n\u0003G\u0002!\u0019!C\u0005\u0003KB\u0001\"a\u001d\u0001A\u0003%\u0011q\r\u0005\n\u0003k\u0002!\u0019!C\u0005\u0003oB\u0001\"a \u0001A\u0003%\u0011\u0011\u0010\u0005\n\u0003\u0003\u0003!\u0019!C\u0005\u0003\u0007C\u0001\"a$\u0001A\u0003%\u0011Q\u0011\u0005\n\u0003#\u0003!\u0019!C\u0005\u0003'C\u0001\"a(\u0001A\u0003%\u0011Q\u0013\u0005\n\u0003C\u0003!\u0019!C\u0005\u0003GC\u0001\"a,\u0001A\u0003%\u0011Q\u0015\u0005\n\u0003c\u0003!\u0019!C\u0005\u0003gC\u0001\"!2\u0001A\u0003%\u0011Q\u0017\u0005\n\u0003\u000f\u0004!\u0019!C\u0005\u0003\u0013D\u0001\"!9\u0001A\u0003%\u00111\u001a\u0005\f\u0003G\u0004\u0001\u0019!a\u0001\n\u0013\t)\u000fC\u0006\u0002n\u0002\u0001\r\u00111A\u0005\n\u0005=\bbCA~\u0001\u0001\u0007\t\u0011)Q\u0005\u0003OD1\"!@\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002\u0000\"Y!q\u0001\u0001A\u0002\u0003\u0007I\u0011\u0002B\u0005\u0011-\u0011i\u0001\u0001a\u0001\u0002\u0003\u0006KA!\u0001\t\u0017\t=\u0001\u00011AA\u0002\u0013%!\u0011\u0003\u0005\f\u00053\u0001\u0001\u0019!a\u0001\n\u0013\u0011Y\u0002C\u0006\u0003 \u0001\u0001\r\u0011!Q!\n\tM\u0001\"\u0003B\u0011\u0001\t\u0007I\u0011\u0002B\u0012\u0011!\u0011\t\u0004\u0001Q\u0001\n\t\u0015\u0002b\u0003B\u001a\u0001\u0001\u0007\t\u0019!C\u0005\u0005kA1Ba\u0011\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0003F!Y!\u0011\n\u0001A\u0002\u0003\u0005\u000b\u0015\u0002B\u001c\u0011-\u0011Y\u0005\u0001a\u0001\u0002\u0004%IA!\u0014\t\u0017\tU\u0003\u00011AA\u0002\u0013%!q\u000b\u0005\f\u00057\u0002\u0001\u0019!A!B\u0013\u0011y\u0005C\u0005\u0003^\u0001\u0001\r\u0011\"\u0003\u0003`!I!Q\u000e\u0001A\u0002\u0013%!q\u000e\u0005\t\u0005g\u0002\u0001\u0015)\u0003\u0003b!I!Q\u000f\u0001A\u0002\u0013%!q\u000f\u0005\n\u0005\u007f\u0002\u0001\u0019!C\u0005\u0005\u0003C\u0001B!\"\u0001A\u0003&!\u0011\u0010\u0005\b\u0005\u000f\u0003A\u0011\u0001BE\u0011\u001d\u0011\t\u000b\u0001C\u0001\u0005\u0013CqAa+\u0001\t\u0003\u0011I\tC\u0004\u00030\u0002!\tA!#\t\u000f\tM\u0006\u0001\"\u0001\u0003\n\"9!q\u0017\u0001\u0005\u0002\t%\u0005b\u0002B^\u0001\u0011\u0005!\u0011\u0012\u0005\b\u0005\u007f\u0003A\u0011\u0001BE\u0011\u001d\u0011\u0019\r\u0001C\u0001\u0005\u0013CqAa2\u0001\t\u0003\u0011I\tC\u0004\u0003L\u0002!IA!4\t\u000f\t=\u0007\u0001\"\u0003\u0003R\"I!q\u001c\u0001\u0012\u0002\u0013%!\u0011\u001d\u0005\b\u0005o\u0004A\u0011\u0002B}\u0011\u001d\u0019\t\u0001\u0001C\u0005\u0007\u0007A\u0011ba\u0005\u0001#\u0003%Ia!\u0006\t\u000f\re\u0001\u0001\"\u0003\u0004\u001c!911\u0005\u0001\u0005\n\r\u0015raBB\u0014'\"\u00051\u0011\u0006\u0004\u0007%NC\taa\u000b\t\r\u0005tE\u0011AB\u0017\u0011\u001d\u0019yC\u0014C\u0001\u0005\u0013Cqa!\u000fO\t\u0003\u0011II\u0001\u0015DYV\u001cH/\u001a:MS:\\w*\u001e;c_VtGmQ8o]\u0016\u001cG/[8o\u001b\u0006t\u0017mZ3s)\u0016\u001cHO\u0003\u0002U+\u0006!A.\u001b8l\u0015\t1v+\u0001\u0004tKJ4XM\u001d\u0006\u00021\u0006)1.\u00194lC\u000e\u00011C\u0001\u0001\\!\tav,D\u0001^\u0015\u0005q\u0016!B:dC2\f\u0017B\u00011^\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012a\u0019\t\u0003I\u0002i\u0011aU\u0001\rEJ|7.\u001a:D_:4\u0017nZ\u000b\u0002OB\u0011\u0001.[\u0007\u0002+&\u0011!.\u0016\u0002\f\u0017\u000647.Y\"p]\u001aLw-A\u0007ce>\\WM]\"p]\u001aLw\rI\u0001\u000bg\u0016\u0014h/\u001a:J]\u001a|W#\u00018\u0011\u0005=DX\"\u00019\u000b\u0005E\u0014\u0018AC1vi\"|'/\u001b>fe*\u0011ak\u001d\u0006\u00031RT!!\u001e<\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u00059\u0018aA8sO&\u0011\u0011\u0010\u001d\u0002\u0015\u0003V$\bn\u001c:ju\u0016\u00148+\u001a:wKJLeNZ8\u0002\u0017M,'O^3s\u0013:4w\u000eI\u0001\u0006cV|G/Y\u000b\u0002{B\u0011\u0001N`\u0005\u0003\u007fV\u0013qc\u00117vgR,'\u000fT5oWJ+\u0017/^3tiF+x\u000e^1\u0002\rE,x\u000e^1!\u0003!a\u0017N\\6OC6,WCAA\u0004!\u0011\tI!a\u0005\u000e\u0005\u0005-!\u0002BA\u0007\u0003\u001f\tA\u0001\\1oO*\u0011\u0011\u0011C\u0001\u0005U\u00064\u0018-\u0003\u0003\u0002\u0016\u0005-!AB*ue&tw-A\u0005mS:\\g*Y7fA\u00051A.\u001b8l\u0013\u0012,\"!!\b\u0011\t\u0005}\u0011QE\u0007\u0003\u0003CQ1!a\tt\u0003\u0019\u0019w.\\7p]&!\u0011qEA\u0011\u0005\u0011)V/\u001b3\u0002\u000f1Lgn[%eA\u0005y1o\\;sG\u0016\u001cE.^:uKJLE-\u0001\tt_V\u00148-Z\"mkN$XM]%eA\u0005iA-Z:u\u00072,8\u000f^3s\u0013\u0012\fa\u0002Z3ti\u000ecWo\u001d;fe&#\u0007%\u0001\u0005mS:\\G)\u0019;b+\t\t9\u0004\u0005\u0003\u0002:\u0005}RBAA\u001e\u0015\r\tidV\u0001\u0003u.LA!!\u0011\u0002<\ty1\t\\;ti\u0016\u0014H*\u001b8l\t\u0006$\u0018-A\u0005mS:\\G)\u0019;bA\u0005IA.\u001b8l!J|\u0007o]\u000b\u0003\u0003\u0013\u0002B!a\u0013\u0002R5\u0011\u0011Q\n\u0006\u0005\u0003\u001f\ny!\u0001\u0003vi&d\u0017\u0002BA*\u0003\u001b\u0012!\u0002\u0015:pa\u0016\u0014H/[3t\u0003)a\u0017N\\6Qe>\u00048\u000fI\u0001\u0010[\u0016$\u0018\rZ1uC6\u000bg.Y4feV\u0011\u00111\f\t\u0004I\u0006u\u0013bAA0'\nQ2\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$\u0018\rZ1uC6\u000bg.Y4fe\u0006\u0001R.\u001a;bI\u0006$\u0018-T1oC\u001e,'\u000fI\u0001\rg>\u001c7.\u001a;TKJ4XM]\u000b\u0003\u0003O\u0002B!!\u001b\u0002p5\u0011\u00111\u000e\u0006\u0004\u0003[:\u0016a\u00028fi^|'o[\u0005\u0005\u0003c\nYG\u0001\u0007T_\u000e\\W\r^*feZ,'/A\u0007t_\u000e\\W\r^*feZ,'\u000fI\u0001\fY&t7.T1oC\u001e,'/\u0006\u0002\u0002zA\u0019A-a\u001f\n\u0007\u0005u4K\u0001\nDYV\u001cH/\u001a:MS:\\W*\u00198bO\u0016\u0014\u0018\u0001\u00047j].l\u0015M\\1hKJ\u0004\u0013\u0001\u0002;j[\u0016,\"!!\"\u0011\t\u0005\u001d\u00151R\u0007\u0003\u0003\u0013S1!a\u0014s\u0013\u0011\ti)!#\u0003\u00115{7m\u001b+j[\u0016\fQ\u0001^5nK\u0002\nq!\\3ue&\u001c7/\u0006\u0002\u0002\u0016B!\u0011qSAN\u001b\t\tIJ\u0003\u0003\u0002\u0012\u0006\u0005\u0012\u0002BAO\u00033\u0013q!T3ue&\u001c7/\u0001\u0005nKR\u0014\u0018nY:!\u0003\u001d\u0019\u0007.\u00198oK2,\"!!*\u0011\t\u0005\u001d\u00161V\u0007\u0003\u0003SSA!!\u001c\u0002\"%!\u0011QVAU\u00051Y\u0015MZ6b\u0007\"\fgN\\3m\u0003!\u0019\u0007.\u00198oK2\u0004\u0013\u0001E7fi\u0006$\u0017\r^1SKF,Xm\u001d;t+\t\t)\f\u0005\u0003\u00028\u0006\u0005WBAA]\u0015\u0011\tY,!0\u0002\r\u0005$x.\\5d\u0015\u0011\ty,!\u0014\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0003\u0002D\u0006e&!D!u_6L7-\u00138uK\u001e,'/A\tnKR\fG-\u0019;b%\u0016\fX/Z:ug\u0002\nab\u00197pg\u0016$7\t[1o]\u0016d7/\u0006\u0002\u0002LB1\u0011QZAl\u00037l!!a4\u000b\t\u0005E\u00171[\u0001\b[V$\u0018M\u00197f\u0015\r\t).X\u0001\u000bG>dG.Z2uS>t\u0017\u0002BAm\u0003\u001f\u00141aU3u!\u0011\t9+!8\n\t\u0005}\u0017\u0011\u0016\u0002\u000f%\u00164XM]:f\u0007\"\fgN\\3m\u0003=\u0019Gn\\:fI\u000eC\u0017M\u001c8fYN\u0004\u0013A\u00037j].\u001cuN\u001c4jOV\u0011\u0011q\u001d\t\u0004I\u0006%\u0018bAAv'\n\t2\t\\;ti\u0016\u0014H*\u001b8l\u0007>tg-[4\u0002\u001d1Lgn[\"p]\u001aLwm\u0018\u0013fcR!\u0011\u0011_A|!\ra\u00161_\u0005\u0004\u0003kl&\u0001B+oSRD\u0011\"!?&\u0003\u0003\u0005\r!a:\u0002\u0007a$\u0013'A\u0006mS:\\7i\u001c8gS\u001e\u0004\u0013a\u00037j].lU\r\u001e:jGN,\"A!\u0001\u0011\u0007\u0011\u0014\u0019!C\u0002\u0003\u0006M\u0013!c\u00117vgR,'\u000fT5oW6+GO]5dg\u0006yA.\u001b8l\u001b\u0016$(/[2t?\u0012*\u0017\u000f\u0006\u0003\u0002r\n-\u0001\"CA}Q\u0005\u0005\t\u0019\u0001B\u0001\u00031a\u0017N\\6NKR\u0014\u0018nY:!\u0003-\u0019wN\u001c8NC:\fw-\u001a:\u0016\u0005\tM\u0001c\u00013\u0003\u0016%\u0019!qC*\u0003I\rcWo\u001d;fe2Kgn[(vi\n|WO\u001c3D_:tWm\u0019;j_:l\u0015M\\1hKJ\fqbY8o]6\u000bg.Y4fe~#S-\u001d\u000b\u0005\u0003c\u0014i\u0002C\u0005\u0002z.\n\t\u00111\u0001\u0003\u0014\u0005a1m\u001c8o\u001b\u0006t\u0017mZ3sA\u0005yAn\\2bY6{7m[\"mS\u0016tG/\u0006\u0002\u0003&A!!q\u0005B\u0017\u001b\t\u0011ICC\u0002\u0003,M\fqa\u00197jK:$8/\u0003\u0003\u00030\t%\"AC'pG.\u001cE.[3oi\u0006\u0001Bn\\2bY6{7m[\"mS\u0016tG\u000fI\u0001\u000bY>\u001c\u0017\r\\!e[&tWC\u0001B\u001c!\u0011\u0011IDa\u0010\u000e\u0005\tm\"\u0002\u0002B\u001f\u0005S\tQ!\u00193nS:LAA!\u0011\u0003<\tq1i\u001c8gYV,g\u000e^!e[&t\u0017A\u00047pG\u0006d\u0017\tZ7j]~#S-\u001d\u000b\u0005\u0003c\u00149\u0005C\u0005\u0002zB\n\t\u00111\u0001\u00038\u0005YAn\\2bY\u0006#W.\u001b8!\u0003-\u0011X-\\8uK\u0006#W.\u001b8\u0016\u0005\t=\u0003c\u00013\u0003R%\u0019!1K*\u0003'I+Wn\u001c;f\u001d\u0016$xo\u001c:l\u00072LWM\u001c;\u0002\u001fI,Wn\u001c;f\u0003\u0012l\u0017N\\0%KF$B!!=\u0003Z!I\u0011\u0011`\u001a\u0002\u0002\u0003\u0007!qJ\u0001\re\u0016lw\u000e^3BI6Lg\u000eI\u0001\u0017e\u0016lw\u000e^3D_:$(o\u001c7mKJtu\u000eZ3JIV\u0011!\u0011\r\t\u00069\n\r$qM\u0005\u0004\u0005Kj&AB(qi&|g\u000eE\u0002]\u0005SJ1Aa\u001b^\u0005\rIe\u000e^\u0001\u001be\u0016lw\u000e^3D_:$(o\u001c7mKJtu\u000eZ3JI~#S-\u001d\u000b\u0005\u0003c\u0014\t\bC\u0005\u0002zZ\n\t\u00111\u0001\u0003b\u00059\"/Z7pi\u0016\u001cuN\u001c;s_2dWM\u001d(pI\u0016LE\rI\u0001\u0012SNdunY1m\u0007>tGO]8mY\u0016\u0014XC\u0001B=!\ra&1P\u0005\u0004\u0005{j&a\u0002\"p_2,\u0017M\\\u0001\u0016SNdunY1m\u0007>tGO]8mY\u0016\u0014x\fJ3r)\u0011\t\tPa!\t\u0013\u0005e\u0018(!AA\u0002\te\u0014AE5t\u0019>\u001c\u0017\r\\\"p]R\u0014x\u000e\u001c7fe\u0002\n\u0001\u0002^3be\u0012{wO\u001c\u000b\u0003\u0003cD3a\u000fBG!\u0011\u0011yI!(\u000e\u0005\tE%\u0002\u0002BJ\u0005+\u000b1!\u00199j\u0015\u0011\u00119J!'\u0002\u000f),\b/\u001b;fe*\u0019!1\u0014<\u0002\u000b),h.\u001b;\n\t\t}%\u0011\u0013\u0002\n\u0003\u001a$XM]#bG\"\f\u0011\u0004^3ti\u000e{gN\\3di&|g.T8eK&s'm\\;oI\"\u001aAH!*\u0011\t\t=%qU\u0005\u0005\u0005S\u0013\tJ\u0001\u0003UKN$\u0018A\u0007;fgR\u001cuN\u001c8fGRLwN\\'pI\u0016|U\u000f\u001e2pk:$\u0007fA\u001f\u0003&\u0006AB/Z:u!\u0016\u00148/[:uK:$8i\u001c8oK\u000e$\u0018n\u001c8)\u0007y\u0012)+\u0001\u0019uKN$\b+\u001a:tSN$XM\u001c;D_:tWm\u0019;j_:\u0014V-\\8uK\u000e{g\u000e\u001e:pY2,'OT8u\u0017:|wO\u001c\u0015\u0004\u007f\t\u0015\u0016A\u0006;fgRtu\u000e\u001e'pG\u0006d7i\u001c8ue>dG.\u001a:)\u0007\u0001\u0013)+\u0001\u0007uKN$X*\u001a;bI\u0006$\u0018\rK\u0002B\u0005K\u000bq\u0002^3tiJ+7m\u001c8gS\u001e,(/\u001a\u0015\u0004\u0005\n\u0015\u0016A\t;fgR\u0014VmY8oM&<WO]3XSRD7i\u001c8oK\u000e$\u0018n\u001c8SKN,G\u000fK\u0002D\u0005K\u000bQ\u0004^3tiJ+7m\u001c8gS\u001e,(/Z,ji\"\f5/\u001f8d\u00072|7/\u001a\u0015\u0004\t\n\u0015\u0016AE2sK\u0006$XM\u0011:pW\u0016\u00148i\u001c8gS\u001e$\u0012aZ\u0001\u0017g\u0016$X\u000f]\"p]:,7\r^5p]6\u000bg.Y4feR1\u0011\u0011\u001fBj\u0005;DqA!6G\u0001\u0004\u00119.\u0001\bd_:tWm\u0019;j_:lu\u000eZ3\u0011\u0007\u0011\u0014I.C\u0002\u0003\\N\u0013abQ8o]\u0016\u001cG/[8o\u001b>$W\rC\u0005\u0003v\u0019\u0003\n\u00111\u0001\u0003z\u0005\u00013/\u001a;va\u000e{gN\\3di&|g.T1oC\u001e,'\u000f\n3fM\u0006,H\u000e\u001e\u00133+\t\u0011\u0019O\u000b\u0003\u0003z\t\u00158F\u0001Bt!\u0011\u0011IOa=\u000e\u0005\t-(\u0002\u0002Bw\u0005_\f\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\tEX,\u0001\u0006b]:|G/\u0019;j_:LAA!>\u0003l\n\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0002\u0019M,G/\u00169DQ\u0006tg.\u001a7\u0015\r\u0005E(1 B\u007f\u0011\u001d\t\t\u000b\u0013a\u0001\u0003KCqAa@I\u0001\u0004\u00119'A\u0005dQ\u0006tg.\u001a7JI\u0006Y!/\u001a<feN,gj\u001c3f)\u0019\u0019)aa\u0003\u0004\u0010A!\u0011qUB\u0004\u0013\u0011\u0019I!!+\u0003\u0017I+g/\u001a:tK:{G-\u001a\u0005\b\u0007\u001bI\u0005\u0019\u0001B4\u0003\u0019qw\u000eZ3JI\"I1\u0011C%\u0011\u0002\u0003\u0007!qM\u0001\ne\u0016\fX/Z:u\u0013\u0012\fQC]3wKJ\u001cXMT8eK\u0012\"WMZ1vYR$#'\u0006\u0002\u0004\u0018)\"!q\rBs\u00035\u0011X-\\8uK\u000ecWo\u001d;feV\u00111Q\u0004\t\u0005\u0003?\u0019y\"\u0003\u0003\u0004\"\u0005\u0005\"aB\"mkN$XM]\u0001&S:LG/[1uKJ+g/\u001a:tK\u000e{gN\\3di&|gNU3rk\u0016\u001cHoQ8v]R,\"Aa\u001a\u0002Q\rcWo\u001d;fe2Kgn[(vi\n|WO\u001c3D_:tWm\u0019;j_:l\u0015M\\1hKJ$Vm\u001d;\u0011\u0005\u0011t5C\u0001(\\)\t\u0019I#\u0001\u0006tKR,\u0006o\u00117bgND3\u0001UB\u001a!\u0011\u0011yi!\u000e\n\t\r]\"\u0011\u0013\u0002\n\u0005\u00164wN]3BY2\fQ\u0002^3be\u0012{wO\\\"mCN\u001c\bfA)\u0004>A!!qRB \u0013\u0011\u0019\tE!%\u0003\u0011\u00053G/\u001a:BY2\u0004")
public class ClusterLinkOutboundConnectionManagerTest {
    private final KafkaConfig kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$brokerConfig = this.createBrokerConfig();
    private final AuthorizerServerInfo kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$serverInfo = (AuthorizerServerInfo)Mockito.mock(AuthorizerServerInfo.class);
    private final ClusterLinkRequestQuota kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$quota = (ClusterLinkRequestQuota)Mockito.mock(ClusterLinkRequestQuota.class);
    private final String linkName;
    private final Uuid linkId = Uuid.randomUuid();
    private final String kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$sourceClusterId;
    private final String destClusterId;
    private final ClusterLinkData kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkData = new ClusterLinkData(this.linkName(), this.linkId(), (Option)new Some((Object)this.destClusterId()), (Option)None$.MODULE$, false);
    private final Properties linkProps = new Properties();
    private final ClusterLinkMetadataManager kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataManager = (ClusterLinkMetadataManager)Mockito.mock(ClusterLinkMetadataManager.class);
    private final SocketServer kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$socketServer = (SocketServer)Mockito.mock(SocketServer.class);
    private final ClusterLinkManager kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkManager = (ClusterLinkManager)Mockito.mock(ClusterLinkManager.class);
    private final MockTime kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time = new MockTime();
    private final Metrics metrics = new Metrics();
    private final KafkaChannel channel = (KafkaChannel)Mockito.mock(KafkaChannel.class);
    private final AtomicInteger kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests = new AtomicInteger();
    private final scala.collection.mutable.Set<ReverseChannel> kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$closedChannels = (scala.collection.mutable.Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
    private ClusterLinkConfig kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig;
    private ClusterLinkMetrics kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkMetrics;
    private ClusterLinkOutboundConnectionManager connManager;
    private final MockClient kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localMockClient = new MockClient((Time)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time());
    private ConfluentAdmin kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin;
    private RemoteNetworkClient kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin;
    private Option<Object> remoteControllerNodeId = new Some((Object)BoxesRunTime.boxToInteger((int)20));
    private boolean isLocalController = false;

    @AfterAll
    public static void tearDownClass() {
        TestUtils$.MODULE$.verifyNoUnexpectedThreads("@AfterAll");
    }

    @BeforeAll
    public static void setUpClass() {
        TestUtils$.MODULE$.verifyNoUnexpectedThreads("@BeforeAll");
    }

    public KafkaConfig kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$brokerConfig() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$brokerConfig;
    }

    public AuthorizerServerInfo kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$serverInfo() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$serverInfo;
    }

    public ClusterLinkRequestQuota kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$quota() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$quota;
    }

    private String linkName() {
        return this.linkName;
    }

    private Uuid linkId() {
        return this.linkId;
    }

    public String kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$sourceClusterId() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$sourceClusterId;
    }

    private String destClusterId() {
        return this.destClusterId;
    }

    public ClusterLinkData kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkData() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkData;
    }

    private Properties linkProps() {
        return this.linkProps;
    }

    public ClusterLinkMetadataManager kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataManager() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataManager;
    }

    public SocketServer kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$socketServer() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$socketServer;
    }

    public ClusterLinkManager kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkManager() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkManager;
    }

    public MockTime kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time;
    }

    private Metrics metrics() {
        return this.metrics;
    }

    private KafkaChannel channel() {
        return this.channel;
    }

    public AtomicInteger kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests;
    }

    public scala.collection.mutable.Set<ReverseChannel> kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$closedChannels() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$closedChannels;
    }

    public ClusterLinkConfig kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig;
    }

    private void linkConfig_$eq(ClusterLinkConfig x$1) {
        this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig = x$1;
    }

    public ClusterLinkMetrics kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkMetrics() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkMetrics;
    }

    private void linkMetrics_$eq(ClusterLinkMetrics x$1) {
        this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkMetrics = x$1;
    }

    private ClusterLinkOutboundConnectionManager connManager() {
        return this.connManager;
    }

    private void connManager_$eq(ClusterLinkOutboundConnectionManager x$1) {
        this.connManager = x$1;
    }

    public MockClient kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localMockClient() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localMockClient;
    }

    public ConfluentAdmin kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin;
    }

    public void kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin_$eq(ConfluentAdmin x$1) {
        this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin = x$1;
    }

    public RemoteNetworkClient kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin() {
        return this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin;
    }

    public void kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin_$eq(RemoteNetworkClient x$1) {
        this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin = x$1;
    }

    private Option<Object> remoteControllerNodeId() {
        return this.remoteControllerNodeId;
    }

    private void remoteControllerNodeId_$eq(Option<Object> x$1) {
        this.remoteControllerNodeId = x$1;
    }

    private boolean isLocalController() {
        return this.isLocalController;
    }

    private void isLocalController_$eq(boolean x$1) {
        this.isLocalController = x$1;
    }

    @AfterEach
    public void tearDown() {
        if (this.connManager() != null) {
            this.connManager().shutdown();
        }
        this.metrics().close();
    }

    @Test
    public void testConnectionModeInbound() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Inbound$.MODULE$, false);
        Assertions.assertThrows(IllegalStateException.class, () -> this.connManager().startup());
        Assertions.assertThrows(InvalidRequestException.class, () -> this.connManager().onReverseConnection(this.channel(), this.reverseNode(1, -1)));
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)0, (int)this.connManager().reverseConnectionCount());
        Assertions.assertNull((Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
        Assertions.assertNull((Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin());
    }

    @Test
    public void testConnectionModeOutbound() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, false);
        this.connManager().startup();
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertThrows(NotControllerException.class, () -> this.connManager().onReverseConnection(this.channel(), this.reverseNode(1, -1)));
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)0, (int)this.connManager().reverseConnectionCount());
    }

    @Test
    public void testPersistentConnection() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        long waitUntilTrue_pause = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ClusterLinkOutboundConnectionManagerTest.$anonfun$testPersistentConnection$1(this)) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Persistent connection not initiated");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), waitUntilTrue_pause));
        }
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(BoxesRunTime.unboxToInt((Object)this.remoteControllerNodeId().get()), -1));
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)1, (int)this.connManager().reverseConnectionCount());
    }

    @Test
    public void testPersistentConnectionRemoteControllerNotKnown() {
        this.remoteControllerNodeId_$eq((Option<Object>)None$.MODULE$);
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        Assertions.assertEquals((int)0, (int)this.initiateReverseConnectionRequestCount());
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
    }

    @Test
    public void testNotLocalController() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, false);
        this.connManager().startup();
        Assertions.assertEquals((int)0, (int)this.initiateReverseConnectionRequestCount());
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertThrows(NotControllerException.class, () -> this.connManager().onReverseConnection(this.channel(), this.reverseNode(1, -1)));
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(1, 5));
        KafkaChannel channel2 = (KafkaChannel)Mockito.mock(KafkaChannel.class);
        this.setUpChannel(channel2, 2);
        this.connManager().onReverseConnection(channel2, this.reverseNode(2, 10));
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)2, (int)this.connManager().reverseConnectionCount());
    }

    @Test
    public void testMetadata() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.remoteControllerNodeId_$eq((Option<Object>)None$.MODULE$);
        this.connManager().startup();
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
        this.isLocalController_$eq(true);
        this.connManager().onControllerChange(true);
        Assertions.assertEquals((int)2, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
        this.isLocalController_$eq(false);
        this.connManager().onControllerChange(false);
        Assertions.assertEquals((int)2, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
        this.isLocalController_$eq(true);
        this.remoteControllerNodeId_$eq((Option<Object>)new Some((Object)BoxesRunTime.boxToInteger((int)25)));
        Node coordinator = new Node(25, "", 0);
        this.connManager().onNewRemoteLinkCoordinator(coordinator);
        Assertions.assertEquals((int)2, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
        this.remoteControllerNodeId_$eq((Option<Object>)None$.MODULE$);
        this.connManager().onNewRemoteLinkCoordinator(coordinator);
        Assertions.assertEquals((int)3, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
    }

    @Test
    public void testReconfigure() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(BoxesRunTime.unboxToInt((Object)this.remoteControllerNodeId().get()), -1));
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)1, (int)this.connManager().reverseConnectionCount());
        RemoteNetworkClient oldAdmin = this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin();
        this.linkProps().setProperty("metadata.max.age.ms", "1000");
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), (Option)None$.MODULE$, true));
        this.connManager().reconfigure(this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig(), (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"metadata.max.age.ms"})));
        Assertions.assertEquals((Object)this.connManager().currentConfig().originals(), (Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig().originals());
        Assertions.assertNotNull((Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
        Assertions.assertNotSame((Object)oldAdmin, (Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)1, (int)this.connManager().reverseConnectionCount());
    }

    @Test
    public void testReconfigureWithConnectionReset() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        KafkaChannel channel2 = (KafkaChannel)Mockito.mock(KafkaChannel.class);
        this.setUpChannel(channel2, 2);
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(BoxesRunTime.unboxToInt((Object)this.remoteControllerNodeId().get()), -1));
        this.connManager().onReverseConnection(channel2, this.reverseNode(2, 5));
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)2, (int)this.connManager().reverseConnectionCount());
        Assertions.assertEquals((int)0, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
        RemoteNetworkClient oldAdmin = this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin();
        this.linkProps().setProperty("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name);
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), (Option)None$.MODULE$, true));
        this.connManager().reconfigure(this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig(), (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"security.protocol"})));
        Assertions.assertEquals((Object)this.connManager().currentConfig().originals(), (Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig().originals());
        Assertions.assertNotNull((Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
        Assertions.assertNotSame((Object)oldAdmin, (Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)0, (int)this.connManager().reverseConnectionCount());
        Assertions.assertEquals((Object)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new SocketChannel[]{this.channel().socketChannel(), channel2.socketChannel()})), (Object)((IterableOnceOps)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$closedChannels().map((Function1 & Serializable)x$6 -> x$6.socketChannel())).toSet());
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
    }

    @Test
    public void testReconfigureWithAsyncClose() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();
        KafkaChannel channel2 = (KafkaChannel)Mockito.mock(KafkaChannel.class);
        this.setUpChannel(channel2, 2);
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(BoxesRunTime.unboxToInt((Object)this.remoteControllerNodeId().get()), -1));
        this.connManager().onReverseConnection(channel2, this.reverseNode(2, 5));
        scala.collection.mutable.Set closingChannels = (scala.collection.mutable.Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)2, (int)this.connManager().reverseConnectionCount());
        ((ConcurrentHashMap)TestUtils.fieldValue((Object)this.connManager(), ClusterLinkOutboundConnectionManager.class, (String)"activeReverseConnections")).values().forEach(c -> c.closeRunnable(() -> closingChannels.$plus$eq(c)));
        Assertions.assertEquals((int)0, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
        Assertions.assertNotNull((Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
        RemoteNetworkClient oldAdmin = this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin();
        this.linkProps().setProperty("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name);
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), (Option)None$.MODULE$, true));
        this.connManager().reconfigure(this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig(), (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"security.protocol"})));
        Assertions.assertEquals((Object)this.connManager().currentConfig().originals(), (Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig().originals());
        Assertions.assertNotNull((Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
        Assertions.assertNotSame((Object)oldAdmin, (Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
        Assertions.assertEquals((int)1, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
        Assertions.assertEquals((int)1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)2, (int)this.connManager().reverseConnectionCount());
        closingChannels.foreach((Function1 & Serializable)c -> {
            ClusterLinkOutboundConnectionManagerTest.$anonfun$testReconfigureWithAsyncClose$3(c);
            return BoxedUnit.UNIT;
        });
        Assertions.assertEquals((int)0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals((int)0, (int)this.connManager().reverseConnectionCount());
        Assertions.assertEquals((Object)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new SocketChannel[]{this.channel().socketChannel(), channel2.socketChannel()})), (Object)((IterableOnceOps)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$closedChannels().map((Function1 & Serializable)x$7 -> x$7.socketChannel())).toSet());
        Assertions.assertEquals((int)2, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
    }

    private KafkaConfig createBrokerConfig() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false);
        props.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        return KafkaConfig$.MODULE$.fromProps(props);
    }

    private void setupConnectionManager(ConnectionMode connectionMode, boolean isLocalController) {
        this.isLocalController_$eq(isLocalController);
        this.linkProps().put(ClusterLinkConfig$.MODULE$.LinkModeProp(), ClusterLinkConfig.LinkMode.SOURCE.name());
        this.linkProps().put(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());
        this.linkProps().put(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), "EXTERNAL");
        this.linkProps().put("bootstrap.servers", "localhost:123");
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), (Option)None$.MODULE$, true));
        this.linkMetrics_$eq(new ClusterLinkMetrics(this.linkName(), this.linkId(), ClusterLinkConfig.LinkMode.SOURCE, (ConnectionMode)ConnectionMode.Outbound$.MODULE$, (ConnectionMode)ConnectionMode.Inbound$.MODULE$, false, this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkManager(), (Option)None$.MODULE$, this.metrics(), (Option)None$.MODULE$, false));
        this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkMetrics().startup();
        Endpoint endpoint = new Endpoint("EXTERNAL", SecurityProtocol.PLAINTEXT, "host", 123);
        Mockito.when((Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$serverInfo().endpoints()).thenReturn(Collections.singletonList(endpoint));
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataManager().isLinkCoordinator(ArgumentMatchers.anyString()))).thenAnswer(invocation -> BoxesRunTime.boxToBoolean((boolean)this.isLocalController()));
        this.setUpChannel(this.channel(), 123);
        this.connManager_$eq(new ClusterLinkOutboundConnectionManager(this){
            private final /* synthetic */ ClusterLinkOutboundConnectionManagerTest $outer;

            public ConfluentAdmin createLocalAdmin() {
                Map<String, String> configs = Collections.singletonMap("bootstrap.servers", "localhost:9092");
                AdminClientConfig config = new AdminClientConfig(configs);
                AdminMetadataManager adminMetadataManager = new AdminMetadataManager(new LogContext(), 1000L, 300000L, false);
                List<Node> nodes = Collections.singletonList(new Node(1, "host1", 123));
                Cluster cluster = new Cluster(this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$sourceClusterId(), nodes, Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), nodes.get(0));
                adminMetadataManager.update(cluster, this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time().milliseconds());
                this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin_$eq(ConfluentAdminUtils.createConfluentAdmin((AdminClientConfig)config, (AdminMetadataManager)adminMetadataManager, (KafkaClient)this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localMockClient(), (Time)this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time()));
                return this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin();
            }

            public RemoteNetworkClient createRemoteAdmin() {
                NetworkClient networkClient = (NetworkClient)Mockito.mock(NetworkClient.class);
                SourceReverseConnectionManager reverseConnectionManager = (SourceReverseConnectionManager)Mockito.mock(SourceReverseConnectionManager.class);
                ClusterLinkMetadata metadata = (ClusterLinkMetadata)Mockito.mock(ClusterLinkMetadata.class);
                ClusterLinkMetadataThread metadataThread = (ClusterLinkMetadataThread)Mockito.mock(ClusterLinkMetadataThread.class);
                Mockito.when((Object)networkClient.reverseConnectionManager()).thenReturn((Object)reverseConnectionManager);
                Mockito.when((Object)metadataThread.clusterLinkMetadata()).thenReturn((Object)metadata);
                Mockito.when((Object)metadataThread.remoteLinkCoordinator()).thenAnswer(invocation -> Option$.MODULE$.apply((Object)$this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteCluster().controller()));
                Mockito.when((Object)metadata.fetch()).thenAnswer(invocation -> $this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteCluster());
                Mockito.when((Object)BoxesRunTime.boxToInteger((int)metadata.requestUpdate())).thenAnswer(invocation -> BoxesRunTime.boxToInteger((int)$anon$1.$anonfun$createRemoteAdmin$3(this, invocation)));
                Assertions.assertNull((Object)this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
                this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin_$eq(new RemoteNetworkClient(networkClient, metadataThread));
                return this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin();
            }

            public void closeReverseConnectionAdmin() {
                super.closeReverseConnectionAdmin();
                this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin_$eq(null);
                this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin_$eq(null);
            }

            public boolean closeReverseChannel(ReverseChannel reverseChannel) {
                this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$closedChannels().$plus$eq((Object)reverseChannel);
                return super.closeReverseChannel(reverseChannel);
            }

            public static final /* synthetic */ int $anonfun$createRemoteAdmin$3($anon$1 $this, InvocationOnMock invocation) {
                return $this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().incrementAndGet();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                super($outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkData(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$sourceClusterId(), (Option)None$.MODULE$, $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkMetrics(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkManager().selectorMetricsRegistry(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataManager(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$socketServer(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$brokerConfig(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$serverInfo(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$quota(), (Time)$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time(), true);
            }
        });
    }

    private boolean setupConnectionManager$default$2() {
        return false;
    }

    private void setUpChannel(KafkaChannel channel, int channelId) {
        Mockito.when((Object)channel.id()).thenReturn((Object)Integer.toString(channelId));
        SocketChannel socketChannel = (SocketChannel)Mockito.mock(SocketChannel.class);
        Mockito.when((Object)channel.socketChannel()).thenReturn((Object)socketChannel);
    }

    private ReverseNode reverseNode(int nodeId, int requestId) {
        return new ReverseNode(nodeId, nodeId, new StringBuilder(4).append("host").append(nodeId).toString(), 1234, this.linkId(), requestId, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), KafkaPrincipal.ANONYMOUS, Optional.empty(), null);
    }

    private int reverseNode$default$2() {
        return -1;
    }

    public Cluster kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteCluster() {
        ArrayList<Node> nodes = new ArrayList<Node>();
        nodes.add(new Node(1, "host1", 123));
        Option remoteControllerNode = this.remoteControllerNodeId().map((Function1 & Serializable)id -> ClusterLinkOutboundConnectionManagerTest.$anonfun$remoteCluster$1(BoxesRunTime.unboxToInt((Object)id)));
        remoteControllerNode.foreach((Function1 & Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)nodes.add(x$1)));
        return new Cluster(this.destClusterId(), nodes, Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), (Node)remoteControllerNode.orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()));
    }

    private int initiateReverseConnectionRequestCount() {
        return CollectionConverters$.MODULE$.CollectionHasAsScala((Collection)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localMockClient().requests()).asScala().count((Function1 & Serializable)x$8 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkOutboundConnectionManagerTest.$anonfun$initiateReverseConnectionRequestCount$1(x$8)));
    }

    public static final /* synthetic */ boolean $anonfun$testPersistentConnection$1(ClusterLinkOutboundConnectionManagerTest $this) {
        return $this.initiateReverseConnectionRequestCount() > 0;
    }

    public static final /* synthetic */ String $anonfun$testPersistentConnection$2() {
        return "Persistent connection not initiated";
    }

    public static final /* synthetic */ void $anonfun$testReconfigureWithAsyncClose$3(ReverseChannel c) {
        c.closeListener().accept(c.channel());
    }

    public static final /* synthetic */ Node $anonfun$remoteCluster$1(int id) {
        return new Node(id, new StringBuilder(4).append("host").append(id).toString(), 123);
    }

    public static final /* synthetic */ boolean $anonfun$initiateReverseConnectionRequestCount$1(ClientRequest x$8) {
        ApiKeys apiKeys = x$8.apiKey();
        ApiKeys apiKeys2 = ApiKeys.INITIATE_REVERSE_CONNECTIONS;
        return !(apiKeys != null ? !apiKeys.equals(apiKeys2) : apiKeys2 != null);
    }

    public ClusterLinkOutboundConnectionManagerTest() {
        this.linkName = "testLink";
        this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$sourceClusterId = "sourceCluster";
        this.destClusterId = "destCluster";
    }
}

