/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import kafka.network.SocketServer;
import kafka.server.ClusterLinkRequestQuota;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkChannelContext;
import kafka.server.link.ClusterLinkClientType$OutboundConnectionAdmin$;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkConnectionManager;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkMetadata;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetadataThread;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkNetworkClient;
import kafka.server.link.ClusterLinkSelectorMetricsRegistry;
import kafka.server.link.ClusterLinkUtils$;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.server.link.CoordinatorListener;
import kafka.server.link.LazyResource;
import kafka.server.link.RemoteNetworkClient;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.message.InitiateReverseConnectionsRequestData;
import org.apache.kafka.common.message.ReverseConnectionRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.requests.InitiateReverseConnectionsRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Set;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Buffer;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0005\r\rg\u0001B\"E\u0001-C\u0011B\u0017\u0001\u0003\u0002\u0003\u0006IaW1\t\u0011\t\u0004!\u0011!Q\u0001\n\rD\u0011B\u001a\u0001\u0003\u0002\u0003\u0006Ia\u001a;\t\u0011U\u0004!\u0011!Q\u0001\nYD!\"a\u0003\u0001\u0005\u0003\u0005\u000b\u0011BA\u0007\u0011)\t\u0019\u0002\u0001B\u0001B\u0003%\u0011Q\u0003\u0005\u000b\u00037\u0001!\u0011!Q\u0001\n\u0005u\u0001BCA\u0012\u0001\t\u0005\t\u0015!\u0003\u0002&!Q\u0011\u0011\u0007\u0001\u0003\u0002\u0003\u0006I!a\r\t\u0015\u0005m\u0002A!A!\u0002\u0013\ti\u0004\u0003\u0006\u0002L\u0001\u0011\t\u0011)A\u0005\u0003\u001bB!\"a\u0015\u0001\u0005\u0003\u0005\u000b\u0011BA+\u0011)\ti\u0006\u0001B\u0001B\u0003%\u0011q\f\u0005\u000b\u0003_\u0002!\u0011!Q\u0001\n\u0005E\u0004BCA<\u0001\t\u0005\t\u0015!\u0003\u0002z!9\u0011q\u0010\u0001\u0005\u0002\u0005\u0005\u0005\"CAR\u0001\t\u0007I\u0011BAS\u0011!\t9\f\u0001Q\u0001\n\u0005\u001d\u0006\"CA]\u0001\t\u0007I\u0011BA^\u0011!\ti\u000e\u0001Q\u0001\n\u0005u\u0006\"CAp\u0001\t\u0007I\u0011BA^\u0011!\t\t\u000f\u0001Q\u0001\n\u0005u\u0006\"CAr\u0001\t\u0007I\u0011BAs\u0011!\t\u0019\u0010\u0001Q\u0001\n\u0005\u001d\bbCA{\u0001\u0001\u0007\t\u0019!C\u0005\u0003oD1B!\u0001\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0003\u0004!Y!q\u0002\u0001A\u0002\u0003\u0005\u000b\u0015BA}\u0011-\u0011I\u0002\u0001a\u0001\u0002\u0004%IAa\u0007\t\u0017\t\r\u0002\u00011AA\u0002\u0013%!Q\u0005\u0005\f\u0005S\u0001\u0001\u0019!A!B\u0013\u0011i\u0002C\u0006\u0003.\u0001\u0001\r\u00111A\u0005\n\t=\u0002b\u0003B\u001f\u0001\u0001\u0007\t\u0019!C\u0005\u0005\u007fA1Ba\u0011\u0001\u0001\u0004\u0005\t\u0015)\u0003\u00032!I!q\t\u0001A\u0002\u0013%!\u0011\n\u0005\n\u0005'\u0002\u0001\u0019!C\u0005\u0005+B\u0001B!\u0017\u0001A\u0003&!1\n\u0005\b\u0005;\u0002A\u0011\tB0\u0011!\u0011\t\u0007\u0001C!\t\n\r\u0004b\u0002B:\u0001\u0011\u0005#q\f\u0005\b\u0005k\u0002A\u0011\tB<\u0011\u001d\u0011\u0019\n\u0001C!\u0005+CqAa4\u0001\t\u0003\u0012\t\u000eC\u0004\u0003h\u0002!IA!;\t\u000f\r\u001d\u0001\u00
01\"\u0003\u0004\n!91q\u0004\u0001\u0005\n\r\u0005\u0002bBB\u0015\u0001\u0011\u000531\u0006\u0005\b\u0007o\u0001A\u0011IB\u001d\u0011\u001d\u0019y\u0004\u0001C!\u0005?Bqa!\u0011\u0001\t\u0013\u0011y\u0006C\u0004\u0004D\u0001!\tFa\u0018\t\u000f\r\u0015\u0003\u0001\"\u0015\u0003`!A1q\t\u0001\u0005\u0002\u0011\u001bI\u0005\u0003\u0005\u0004L\u0001!\t\u0001RB'\u0011\u001d\u0019y\u0005\u0001C\u0005\u0007#Bqaa\u0016\u0001\t\u0013\u0019I\u0006C\u0004\u0004b\u0001!IAa\u0018\t\u000f\r\u0005\u0004\u0001\"\u0003\u0004d!91q\r\u0001\u0005\n\t}\u0003bBB5\u0001\u0011%!q\f\u0005\b\u0007W\u0002A\u0011\u0002B0\u0011\u001d\u0019i\u0007\u0001C\u0005\u0007_Bqa!\"\u0001\t#\u00199\t\u0003\u0005\u0004\u000e\u0002!\t\u0001RBH\u0011\u001d\u0019I\n\u0001C!\u00077Cqa!(\u0001\t\u0003\u001aY\nC\u0004\u0004 \u0002!\te!)\u0003I\rcWo\u001d;fe2Kgn[(vi\n|WO\u001c3D_:tWm\u0019;j_:l\u0015M\\1hKJT!!\u0012$\u0002\t1Lgn\u001b\u0006\u0003\u000f\"\u000baa]3sm\u0016\u0014(\"A%\u0002\u000b-\fgm[1\u0004\u0001M!\u0001\u0001\u0014)X!\tie*D\u0001E\u0013\tyEI\u0001\u000fDYV\u001cH/\u001a:MS:\\7i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0011\u0005E#fBA'S\u0013\t\u0019F)\u0001\nDYV\u001cH/\u001a:MS:\\g)Y2u_JL\u0018BA+W\u0005eyU\u000f\u001e2pk:$7i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u000b\u0005M#\u0005CA'Y\u0013\tIFIA\nD_>\u0014H-\u001b8bi>\u0014H*[:uK:,'/\u0001\u0005mS:\\G)\u0019;b!\tav,D\u0001^\u0015\tq\u0006*\u0001\u0002{W&\u0011\u0001-\u0018\u0002\u0010\u00072,8\u000f^3s\u0019&t7\u000eR1uC&\u0011!LT\u0001\u000eS:LG/[1m\u0007>tg-[4\u0011\u00055#\u0017BA3E\u0005E\u0019E.^:uKJd\u0015N\\6D_:4\u0017nZ\u0001\u0014Y>\u001c\u0017\r\u001c'pO&\u001c\u0017\r\\\"mkN$XM\u001d\t\u0003QFt!![8\u0011\u0005)lW\"A6\u000b\u00051T\u0015A\u0002\u001fs_>$hHC\u0001o\u0003\u0015\u00198-\u00197b\u0013\t\u0001X.\u0001\u0004Qe\u0016$WMZ\u0005\u0003eN\u0014aa\u0015;sS:<'B\u00019n\u0013\t1g*A\tdY&,g\u000e^%oi\u0016\u00148-\u001a9u_J\u00042a\u001e={\u001b\u0005i\u0017BA=n\u0005\u0019y\u0005\u000f^5p]B\u001910a\u0002\u000e\u0003qT!!
 @\u0002\u000f\rd\u0017.\u001a8ug*\u0011\u0011j \u0006\u0005\u0003\u0003\t\u0019!\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0003\u0003\u000b\t1a\u001c:h\u0013\r\tI\u0001 \u0002\u0012\u00072LWM\u001c;J]R,'oY3qi>\u0014\u0018aB7fiJL7m\u001d\t\u0004\u001b\u0006=\u0011bAA\t\t\n\u00112\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$(/[2t\u0003]\u0019X\r\\3di>\u0014X*\u001a;sS\u000e\u001c(+Z4jgR\u0014\u0018\u0010E\u0002N\u0003/I1!!\u0007E\u0005\t\u001aE.^:uKJd\u0015N\\6TK2,7\r^8s\u001b\u0016$(/[2t%\u0016<\u0017n\u001d;ss\u0006yQ.\u001a;bI\u0006$\u0018-T1oC\u001e,'\u000fE\u0002N\u0003?I1!!\tE\u0005i\u0019E.^:uKJd\u0015N\\6NKR\fG-\u0019;b\u001b\u0006t\u0017mZ3s\u00031\u0019xnY6fiN+'O^3s!\u0011\t9#!\f\u000e\u0005\u0005%\"bAA\u0016\u0011\u00069a.\u001a;x_J\\\u0017\u0002BA\u0018\u0003S\u0011AbU8dW\u0016$8+\u001a:wKJ\fAB\u0019:pW\u0016\u00148i\u001c8gS\u001e\u0004B!!\u000e\u000285\ta)C\u0002\u0002:\u0019\u00131bS1gW\u0006\u001cuN\u001c4jO\u0006Q1/\u001a:wKJLeNZ8\u0011\t\u0005}\u0012qI\u0007\u0003\u0003\u0003RA!a\u0011\u0002F\u0005Q\u0011-\u001e;i_JL'0\u001a:\u000b\u0005\u001ds\u0018\u0002BA%\u0003\u0003\u0012A#Q;uQ>\u0014\u0018N_3s'\u0016\u0014h/\u001a:J]\u001a|\u0017!B9v_R\f\u0007\u0003BA\u001b\u0003\u001fJ1!!\u0015G\u0005]\u0019E.^:uKJd\u0015N\\6SKF,Xm\u001d;Rk>$\u0018-A\rdYV\u001cH/\u001a:MS:\\7\t[1o]\u0016d7i\u001c8uKb$\b\u0003B<y\u0003/\u00022!TA-\u0013\r\tY\u0006\u0012\u0002\u001a\u00072,8\u000f^3s\u0019&t7n\u00115b]:,GnQ8oi\u0016DH/\u0001\u0003uS6,\u0007\u0003BA1\u0003Wj!!a\u0019\u000b\t\u0005\u0015\u0014qM\u0001\u0006kRLGn\u001d\u0006\u0004\u0003Sr\u0018AB2p[6|g.\u0003\u0003\u0002n\u0005\r$\u0001\u0002+j[\u0016\f\u0001$\u001a8bE2,'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t!\r9\u00181O\u0005\u0004\u0003kj'a\u0002\"p_2,\u0017M\\\u0001\"S:$(/\u00198fi\u000e{gN\\3di&4\u0018\u000e^=EK:LW\rZ\"iK\u000e\\WM\u001d\t\u0006o\u0006m\u0014\u0011O\u0005\u0004\u0003{j'!\u0003$v]\u000e$\u0018n\u001c81\u0003\u0019a\u0014N\\5u}Q\u0001\u00131QAC\u0003\u000f\u000bI)a#\u0002\u000e\u0006=\u0
015\u0011SAJ\u0003+\u000b9*!'\u0002\u001c\u0006u\u0015qTAQ!\ti\u0005\u0001C\u0003[!\u0001\u00071\fC\u0003c!\u0001\u00071\rC\u0003g!\u0001\u0007q\rC\u0003v!\u0001\u0007a\u000fC\u0004\u0002\fA\u0001\r!!\u0004\t\u000f\u0005M\u0001\u00031\u0001\u0002\u0016!9\u00111\u0004\tA\u0002\u0005u\u0001bBA\u0012!\u0001\u0007\u0011Q\u0005\u0005\b\u0003c\u0001\u0002\u0019AA\u001a\u0011\u001d\tY\u0004\u0005a\u0001\u0003{Aq!a\u0013\u0011\u0001\u0004\ti\u0005C\u0004\u0002TA\u0001\r!!\u0016\t\u000f\u0005u\u0003\u00031\u0001\u0002`!9\u0011q\u000e\tA\u0002\u0005E\u0004bBA<!\u0001\u0007\u0011\u0011P\u0001\u0015G>tg.Z2uS>tW\u000b\u001d3bi\u0016dunY6\u0016\u0005\u0005\u001d\u0006\u0003BAU\u0003gk!!a+\u000b\t\u00055\u0016qV\u0001\u0005Y\u0006twM\u0003\u0002\u00022\u0006!!.\u0019<b\u0013\u0011\t),a+\u0003\r=\u0013'.Z2u\u0003U\u0019wN\u001c8fGRLwN\\+qI\u0006$X\rT8dW\u0002\nQ\u0003]3sg&\u001cH/\u001a8u\u0007>tg.Z2uS>t7/\u0006\u0002\u0002>BA\u0011qXAe\u0003\u001b\f\u0019.\u0004\u0002\u0002B*!\u00111YAc\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0005\u0003\u000f\fy+\u0001\u0003vi&d\u0017\u0002BAf\u0003\u0003\u0014\u0011cQ8oGV\u0014(/\u001a8u\u0011\u0006\u001c\b.T1q!\r9\u0018qZ\u0005\u0004\u0003#l'aA%oiB!\u0011Q[Am\u001b\t\t9N\u0003\u0003\u0002,\u0005\u001d\u0014\u0002BAn\u0003/\u0014aBU3wKJ\u001cXm\u00115b]:,G.\u0001\fqKJ\u001c\u0018n\u001d;f]R\u001cuN\u001c8fGRLwN\\:!\u0003a\t7\r^5wKJ+g/\u001a:tK\u000e{gN\\3di&|gn]\u0001\u001aC\u000e$\u0018N^3SKZ,'o]3D_:tWm\u0019;j_:\u001c\b%A\u0011m_\u000e\fGNU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0019&\u001cH/\u001a8fe6\u000b\u0007/\u0006\u0002\u0002hB1\u0011\u0011^AxO\u001el!!a;\u000b\u0007\u00055X.\u0001\u0006d_2dWm\u0019;j_:LA!!=\u0002l\n\u0019Q*\u00199\u0002E1|7-\u00197SKZ,'o]3D_:tWm\u0019;j_:d\u0015n\u001d;f]\u0016\u0014X*\u00199!\u0003Qa\u0017N\\6MSN$XM\\3s\u000b:$\u0007o\\5oiV\u0011\u0011\u0011 
\t\u0005\u0003w\fi0\u0004\u0002\u0002h%!\u0011q`A4\u0005!)e\u000e\u001a9pS:$\u0018\u0001\u00077j].d\u0015n\u001d;f]\u0016\u0014XI\u001c3q_&tGo\u0018\u0013fcR!!Q\u0001B\u0006!\r9(qA\u0005\u0004\u0005\u0013i'\u0001B+oSRD\u0011B!\u0004\u001b\u0003\u0003\u0005\r!!?\u0002\u0007a$\u0013'A\u000bmS:\\G*[:uK:,'/\u00128ea>Lg\u000e\u001e\u0011)\u0007m\u0011\u0019\u0002E\u0002x\u0005+I1Aa\u0006n\u0005!1x\u000e\\1uS2,\u0017\u0001\u00057j].d\u0015n\u001d;f]\u0016\u0014h*Y7f+\t\u0011i\u0002\u0005\u0003\u0002V\n}\u0011\u0002\u0002B\u0011\u0003/\u0014A\u0002T5ti\u0016tWM\u001d(b[\u0016\fA\u0003\\5oW2K7\u000f^3oKJt\u0015-\\3`I\u0015\fH\u0003\u0002B\u0003\u0005OA\u0011B!\u0004\u001e\u0003\u0003\u0005\rA!\b\u0002#1Lgn\u001b'jgR,g.\u001a:OC6,\u0007\u0005K\u0002\u001f\u0005'\t!\u0002\\8dC2\fE-\\5o+\t\u0011\t\u0004\u0005\u0003\u00034\teRB\u0001B\u001b\u0015\r\u00119\u0004`\u0001\u0006C\u0012l\u0017N\\\u0005\u0005\u0005w\u0011)D\u0001\bD_:4G.^3oi\u0006#W.\u001b8\u0002\u001d1|7-\u00197BI6Lgn\u0018\u0013fcR!!Q\u0001B!\u0011%\u0011i\u0001IA\u0001\u0002\u0004\u0011\t$A\u0006m_\u000e\fG.\u00113nS:\u0004\u0003fA\u0011\u0003\u0014\u0005\u0019\"/Z7pi\u0016tU\r^<pe.\u001cE.[3oiV\u0011!1\n\t\u0005ob\u0014i\u0005E\u0002N\u0005\u001fJ1A!\u0015E\u0005M\u0011V-\\8uK:+Go^8sW\u000ec\u0017.\u001a8u\u0003]\u0011X-\\8uK:+Go^8sW\u000ec\u0017.\u001a8u?\u0012*\u0017\u000f\u0006\u0003\u0003\u0006\t]\u0003\"\u0003B\u0007G\u0005\u0005\t\u0019\u0001B&\u0003Q\u0011X-\\8uK:+Go^8sW\u000ec\u0017.\u001a8uA!\u001aAEa\u0005\u0002\u000fM$\u0018M\u001d;vaR\u0011!QA\u0001\fe\u0016\u001cwN\u001c4jOV\u0014X\r\u0006\u0004\u0003\u0006\t\u0015$\u0011\u000e\u0005\u0007\u0005O2\u0003\u0019A2\u0002\u00139,woQ8oM&<\u0007b\u0002B6M\u0001\u0007!QN\u0001\fkB$\u0017\r^3e\u0017\u0016L8\u000fE\u0003\u0002j\n=t-\u0003\u0003\u0003r\u0005-(aA*fi\u0006a1\r\\8tK\u000ec\u0017.\u001a8ug\u0006\tRM\\1cY\u0016\u001cE.^:uKJd\u0015N\\6\u0015\r\t\u0015!\u0011\u0010BB\u0011\u001d\u0011Y\b\u000ba\u0001\u0005{\nQB\\3uo>\u00148n\u00117jK:$\bcA'\u0003\u0000
%\u0019!\u0011\u0011#\u00031\rcWo\u001d;fe2Kgn\u001b(fi^|'o[\"mS\u0016tG\u000fC\u0004\u0002\u001c!\u0002\rA!\"\u0011\t]D(q\u0011\t\u0005\u0005\u0013\u0013y)\u0004\u0002\u0003\f*!!Q\u0012B\u001b\u0003%Ig\u000e^3s]\u0006d7/\u0003\u0003\u0003\u0012\n-%\u0001F!e[&tW*\u001a;bI\u0006$\u0018-T1oC\u001e,'/\u0001\u000ej]&$\u0018.\u0019;f%\u00164XM]:f\u0007>tg.Z2uS>t7\u000f\u0006\u0004\u0003\u0018\nU&Q\u0019\t\u0007\u00053\u0013\u0019K!+\u000f\t\tm%q\u0014\b\u0004U\nu\u0015\"\u00018\n\u0007\t\u0005V.A\u0004qC\u000e\\\u0017mZ3\n\t\t\u0015&q\u0015\u0002\u0004'\u0016\f(b\u0001BQ[B1\u0011q\u0018BV\u0005_KAA!,\u0002B\n\t2i\\7qY\u0016$\u0018M\u00197f\rV$XO]3\u0011\t\u0005%&\u0011W\u0005\u0005\u0005g\u000bYK\u0001\u0003W_&$\u0007b\u0002B\\S\u0001\u0007!\u0011X\u0001\u001aS:LG/[1uK\u000e{gN\\3di&|gNU3rk\u0016\u001cH\u000f\u0005\u0003\u0003<\n\u0005WB\u0001B_\u0015\u0011\u0011y,a\u001a\u0002\u0011I,\u0017/^3tiNLAAa1\u0003>\n\t\u0013J\\5uS\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8ogJ+\u0017/^3ti\"9!qY\u0015A\u0002\t%\u0017A\u0004:fcV,7\u000f^\"p]R,\u0007\u0010\u001e\t\u0005\u0005w\u0013Y-\u0003\u0003\u0003N\nu&A\u0004*fcV,7\u000f^\"p]R,\u0007\u0010^\u0001\u0014_:\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\u001c\u000b\u0007\u0005\u000b\u0011\u0019N!8\t\u000f\tU'\u00061\u0001\u0003X\u000691\r[1o]\u0016d\u0007\u0003BAk\u00053LAAa7\u0002X\na1*\u00194lC\u000eC\u0017M\u001c8fY\"9!q\u001c\u0016A\u0002\t\u0005\u0018a\u0003:fm\u0016\u00148/\u001a(pI\u0016\u0004B!!6\u0003d&!!Q]Al\u0005-\u0011VM^3sg\u0016tu\u000eZ3\u0002#=t7i\u001c8oK\u000e$\u0018n\u001c8DY>\u001cX\r\u0006\u0006\u0003\u0006\t-(Q\u001eB\u0000\u0007\u0007AqA!6,\u0001\u0004\u00119\u000eC\u0004\u0003p.\u0002\rA!=\u0002\u0013I,\u0017/^3ti&#\u0007C\u0002Bz\u0005k\u0014I0\u0004\u0002\u0002F&!!q_Ac\u0005!y\u0005\u000f^5p]\u0006d\u0007\u0003BAU\u0005wLAA!@\u0002,\n9\u0011J\u001c;fO\u0016\u0014\bbBB\u0001W\u0001\u0007\u0011QZ\u0001\u000fe\u0016lw\u000e^3Ce>\\WM]%e\u0011\u001d\u0019)a\u000ba\u0001\u0003c\n!$\u001e9eCR,W*\u001a;bI\u0006$\u0018-\
u00134QKJ\u001c\u0018n\u001d;f]R\faCZ8so\u0006\u0014H\rV8T_V\u00148-\u001a\"s_.,'o\u001d\u000b\u0007\u0005\u000b\u0019Yaa\u0007\t\u000f\r5A\u00061\u0001\u0004\u0010\u0005Y!/Z9vKN$H)\u0019;b!\u0011\u0019\tba\u0006\u000e\u0005\rM!\u0002BB\u000b\u0003O\nq!\\3tg\u0006<W-\u0003\u0003\u0004\u001a\rM!!J%oSRL\u0017\r^3SKZ,'o]3D_:tWm\u0019;j_:\u001c(+Z9vKN$H)\u0019;b\u0011\u001d\u0019i\u0002\fa\u0001\u0005/\u000bqAZ;ukJ,7/\u0001\rde\u0016\fG/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]N$\u0002B!\u0002\u0004$\r\u00152q\u0005\u0005\b\u0007\u001bi\u0003\u0019AB\b\u0011\u001d\u00119-\fa\u0001\u0005\u0013Dqa!\b.\u0001\u0004\u00119*\u0001\u000ep]:+wOU3n_R,G*\u001b8l\u0007>|'\u000fZ5oCR|'\u000f\u0006\u0003\u0003\u0006\r5\u0002bBB\u0018]\u0001\u00071\u0011G\u0001\fG>|'\u000fZ5oCR|'\u000f\u0005\u0003\u0002|\u000eM\u0012\u0002BB\u001b\u0003O\u0012AAT8eK\u0006\u0011rN\\\"p]R\u0014x\u000e\u001c7fe\u000eC\u0017M\\4f)\u0011\u0011)aa\u000f\t\u000f\rur\u00061\u0001\u0002r\u0005A\u0011n]!di&4X-A\u0012p]2Kgn['fi\u0006$\u0017\r^1QCJ$\u0018\u000e^5p]2+\u0017\rZ3s\u0007\"\fgnZ3\u0002;5\f\u0017PY3Qe>\u001cWm]:D_>\u0014H-\u001b8bi>\u00148\t[1oO\u0016\f1d\u00197pg\u0016\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\!e[&t\u0017\u0001H2sK\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0003\u0012l\u0017N\\\u0001\u0012GJ,\u0017\r^3SK6|G/Z!e[&tGC\u0001B'\u0003A\u0019'/Z1uK2{7-\u00197BI6Lg\u000e\u0006\u0002\u00032\u0005\u0011R\u000f\u001d3bi\u0016d\u0015N\\6MSN$XM\\3s)\u0011\u0011)aa\u0015\t\r\rUc\u00071\u0001d\u0003\u0019\u0019wN\u001c4jO\u0006a!/\u001a<feN\fG\u000eR1uCV\u001111\f\t\u0005\u0007#\u0019i&\u0003\u0003\u0004`\rM!\u0001\b*fm\u0016\u00148/Z\"p]:,7\r^5p]J+\u0017/^3ti\u0012\u000bG/Y\u0001 
[\u0006L(-Z\"sK\u0006$X\rU3sg&\u001cH/\u001a8u\u0007>tg.Z2uS>tG\u0003\u0002B\u0003\u0007KBqaa\f:\u0001\u0004\u0019\t$A\u000bsKF,Xm\u001d;NKR\fG-\u0019;b+B$\u0017\r^3\u00025\rdwn]3QKJ\u001c\u0018n\u001d;f]R\u001cuN\u001c8fGRLwN\\:\u0002;\rdwn]3BGRLg/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]N\f\u0001c]8dW\u0016$8\t[1o]\u0016d7*Z=\u0015\t\u000557\u0011\u000f\u0005\b\u0007gj\u0004\u0019AB;\u00035\u0019xnY6fi\u000eC\u0017M\u001c8fYB!1qOBA\u001b\t\u0019IH\u0003\u0003\u0004|\ru\u0014\u0001C2iC:tW\r\\:\u000b\t\r}\u0014qV\u0001\u0004]&|\u0017\u0002BBB\u0007s\u0012QbU8dW\u0016$8\t[1o]\u0016d\u0017aE2m_N,'+\u001a<feN,7\t[1o]\u0016dG\u0003BA9\u0007\u0013Cqaa#?\u0001\u0004\t\u0019.\u0001\bsKZ,'o]3DQ\u0006tg.\u001a7\u0002\u001d5,G/\u00193bi\u0006$\u0006N]3bIV\u00111\u0011\u0013\t\u0005ob\u001c\u0019\nE\u0002N\u0007+K1aa&E\u0005e\u0019E.^:uKJd\u0015N\\6NKR\fG-\u0019;b)\"\u0014X-\u00193\u00023A,'o]5ti\u0016tGoQ8o]\u0016\u001cG/[8o\u0007>,h\u000e^\u000b\u0003\u0003\u001b\faC]3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0007>,h\u000e^\u0001\u000eY\u0006T\u0018PU3t_V\u00148-Z:\u0016\u0005\r\r\u0006C\u0002BM\u0005G\u001b)\u000b\r\u0003\u0004(\u000eE\u0006#B'\u0004*\u000e5\u0016bABV\t\naA*\u0019>z%\u0016\u001cx.\u001e:dKB!1qVBY\u0019\u0001!1ba-C\u0003\u0003\u0005\tQ!\u0001\u00046\n\u0019q\fJ\u0019\u0012\t\r]6Q\u0018\t\u0004o\u000ee\u0016bAB^[\n9aj\u001c;iS:<\u0007cA<\u0004@&\u00191\u0011Y7\u0003\u0007\u0005s\u0017\u0010")
public class ClusterLinkOutboundConnectionManager
extends ClusterLinkConnectionManager
implements ClusterLinkFactory.OutboundConnectionManager,
CoordinatorListener {
    // Link configuration captured at construction; startup() reads its connection mode.
    private final ClusterLinkConfig initialConfig;
    // Optional interceptor; enableClusterLink() passes it (or null) to the network client.
    private final Option<ClientInterceptor> clientInterceptor;
    // Sensors recorded when reverse connections are created, closed or fail.
    private final ClusterLinkMetrics metrics;
    private final ClusterLinkSelectorMetricsRegistry selectorMetricsRegistry;
    private final ClusterLinkMetadataManager metadataManager;
    // Receives reversed channels via reverseAndAdd() in onReverseConnection().
    private final SocketServer socketServer;
    // Supplies the local broker id used in log messages and source-broker checks.
    private final KafkaConfig brokerConfig;
    private final AuthorizerServerInfo serverInfo;
    private final ClusterLinkRequestQuota quota;
    private final Option<ClusterLinkChannelContext> clusterLinkChannelContext;
    private final Time time;
    private final boolean enableReverseConnections;
    private final Function0<Object> intranetConnectivityDeniedChecker;
    // Monitor that onReverseConnection() and onConnectionClose() synchronize on while
    // updating the two connection maps below.
    private final Object connectionUpdateLock;
    // Keyed by the boxed remote broker id (see onReverseConnection()).
    private final ConcurrentHashMap<Object, ReverseChannel> persistentConnections;
    // Keyed by the boxed identity hash code of the channel's SocketChannel.
    private final ConcurrentHashMap<Object, ReverseChannel> activeReverseConnections;
    private final Map<String, String> localReverseConnectionListenerMap;
    // The volatile fields below are mutable and are written through their *_$eq setters.
    private volatile Endpoint linkListenerEndpoint;
    private volatile ListenerName linkListenerName;
    private volatile ConfluentAdmin localAdmin;
    private volatile Option<RemoteNetworkClient> remoteNetworkClient;

    /**
     * Returns the monitor object that {@code onReverseConnection} and
     * {@code onConnectionClose} synchronize on while updating the connection maps.
     */
    private Object connectionUpdateLock() {
        return connectionUpdateLock;
    }

    /**
     * Returns the map of persistent reverse channels, keyed by the boxed remote broker id.
     */
    private ConcurrentHashMap<Object, ReverseChannel> persistentConnections() {
        return persistentConnections;
    }

    /**
     * Returns the map of currently active reverse channels, keyed by the boxed identity
     * hash code of each channel's {@code SocketChannel}.
     */
    private ConcurrentHashMap<Object, ReverseChannel> activeReverseConnections() {
        return activeReverseConnections;
    }

    /**
     * Returns the string-to-string listener map held in
     * {@code localReverseConnectionListenerMap}.
     */
    private Map<String, String> localReverseConnectionListenerMap() {
        return localReverseConnectionListenerMap;
    }

    /**
     * Returns the current value of the volatile {@code linkListenerEndpoint} field.
     */
    private Endpoint linkListenerEndpoint() {
        return linkListenerEndpoint;
    }

    /**
     * Replaces the volatile {@code linkListenerEndpoint} field.
     *
     * @param endpoint the new endpoint value
     */
    private void linkListenerEndpoint_$eq(Endpoint endpoint) {
        linkListenerEndpoint = endpoint;
    }

    /**
     * Returns the current listener name; {@code onReverseConnection} passes it to the
     * socket server when adding a reversed channel.
     */
    private ListenerName linkListenerName() {
        return linkListenerName;
    }

    /**
     * Replaces the volatile {@code linkListenerName} field.
     *
     * @param listenerName the new listener name value
     */
    private void linkListenerName_$eq(ListenerName listenerName) {
        linkListenerName = listenerName;
    }

    /**
     * Returns the current local admin client; {@code forwardToSourceBrokers} reads it
     * before forwarding requests.
     */
    private ConfluentAdmin localAdmin() {
        return localAdmin;
    }

    /**
     * Replaces the volatile {@code localAdmin} field.
     *
     * @param admin the new admin client value
     */
    private void localAdmin_$eq(ConfluentAdmin admin) {
        localAdmin = admin;
    }

    /**
     * Returns the optional remote network client; {@code createReverseConnections}
     * fails with an {@code IllegalStateException} when it is empty.
     */
    private Option<RemoteNetworkClient> remoteNetworkClient() {
        return remoteNetworkClient;
    }

    /**
     * Replaces the volatile {@code remoteNetworkClient} field.
     *
     * @param client the new optional remote network client
     */
    private void remoteNetworkClient_$eq(Option<RemoteNetworkClient> client) {
        remoteNetworkClient = client;
    }

    /**
     * Starts this connection manager after checking the configured connection mode.
     *
     * @throws IllegalStateException if the initial link configuration uses the inbound
     *         connection mode, which this outbound manager does not support
     */
    @Override
    public void startup() {
        ConnectionMode configuredMode = this.initialConfig.connectionMode();
        ConnectionMode$Inbound$ inboundMode = ConnectionMode$Inbound$.MODULE$;
        boolean isInbound = configuredMode != null && configuredMode.equals(inboundMode);
        if (isInbound) {
            throw new IllegalStateException("Outbound connection manager created in inbound connection mode");
        }
        super.startup();
    }

    /**
     * Applies a dynamic configuration update to this connection manager.
     *
     * <p>If the local listener name is among the updated keys, the link listener is
     * refreshed first. The superclass is then reconfigured, and finally the active
     * reverse connections are closed when any updated key requires a connection reset.
     *
     * @param newConfig   the configuration being applied
     * @param updatedKeys names of the configuration keys that changed
     */
    @Override
    public void reconfigure(ClusterLinkConfig newConfig, Set<String> updatedKeys) {
        if (updatedKeys.contains((Object)ClusterLinkConfig$.MODULE$.LocalListenerNameProp())) {
            // Refresh the listener from the incoming config. The previous code passed
            // currentConfig() here, which is read before super.reconfigure() has applied
            // newConfig and would therefore re-apply the old listener name.
            this.updateLinkListener(newConfig);
        }
        super.reconfigure(newConfig, updatedKeys);
        if (updatedKeys.exists((Function1 & Serializable)configName -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkConfig$.MODULE$.needsConnectionResetOnUpdate(configName)))) {
            this.closeActiveReverseConnections();
        }
    }

    /**
     * Closes the clients owned by the superclass and then closes all active reverse
     * connections.
     */
    @Override
    public void closeClients() {
        // Superclass clients go first; reverse connections are torn down afterwards.
        super.closeClients();
        this.closeActiveReverseConnections();
    }

    /**
     * Enables cluster link requests on the client wrapped by {@code networkClient}.
     *
     * <p>Nothing happens unless the wrapped client is a {@link NetworkClient}. The link id
     * and the configured interceptor (or {@code null} when absent) are passed through;
     * the third argument is always {@code null}.
     *
     * @param networkClient   wrapper whose underlying Kafka client is configured
     * @param metadataManager not used by this implementation
     */
    @Override
    public void enableClusterLink(ClusterLinkNetworkClient networkClient, Option<AdminMetadataManager> metadataManager) {
        KafkaClient underlying = networkClient.networkClient();
        if (!(underlying instanceof NetworkClient)) {
            return;
        }
        NetworkClient client = (NetworkClient)underlying;
        client.enableClusterLinkRequests(super.linkData().linkId(), (ClientInterceptor)this.clientInterceptor.orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()), null);
    }

    /**
     * Handles a request to initiate reverse connections, either by forwarding it to the
     * source brokers or by creating the connections from this broker.
     *
     * <p>One future is created per request entry. Any throwable raised while validating
     * or dispatching the request is logged and used to complete every future
     * exceptionally; the futures are returned in all cases.
     *
     * @param initiateConnectionRequest the incoming request
     * @param requestContext            context of the request, used when creating connections
     * @return one future per entry in the request
     */
    public Seq<CompletableFuture<Void>> initiateReverseConnections(InitiateReverseConnectionsRequest initiateConnectionRequest, RequestContext requestContext) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(48).append("Initiate or forward reverse connection request: ").append(initiateConnectionRequest).toString());
        this.ensureReverseConnectionsEnabled();
        InitiateReverseConnectionsRequestData requestData = initiateConnectionRequest.data();
        List pendingFutures = (List)package$.MODULE$.List().fill(requestData.entries().size(), (Function0 & Serializable)() -> new CompletableFuture());
        try {
            String requestedClusterId = requestData.sourceClusterId();
            String localClusterId = super.localLogicalCluster();
            // Null-safe equality between the requested and the local cluster id.
            boolean sameCluster = requestedClusterId == null ? localClusterId == null : requestedClusterId.equals(localClusterId);
            if (!sameCluster) {
                throw new InvalidRequestException(new StringBuilder(67).append("Initiate reverse request for cluster ").append(requestData.sourceClusterId()).append(" sent to wrong source cluster ").append(super.localLogicalCluster()).toString());
            }
            if (!requestData.forwardToBroker()) {
                this.createReverseConnections(requestData, requestContext, (Seq<CompletableFuture<Void>>)pendingFutures);
            } else {
                this.forwardToSourceBrokers(requestData, (Seq<CompletableFuture<Void>>)pendingFutures);
            }
        }
        catch (Throwable failure) {
            this.error((Function0<String>)(Function0 & Serializable)() -> "Failing reverse connection request", (Function0<Throwable>)(Function0 & Serializable)() -> failure);
            pendingFutures.foreach((Function1 & Serializable)pending -> BoxesRunTime.boxToBoolean((boolean)pending.completeExceptionally(failure)));
        }
        return pendingFutures;
    }

    /**
     * Registers a channel that has been reversed and hands it to the local socket server.
     *
     * <p>The channel is wrapped in a {@link ReverseChannel} whose close callback invokes
     * {@code onConnectionClose}. Under {@code connectionUpdateLock} it is recorded in
     * {@code activeReverseConnections} and, when the reverse node carries no request id,
     * also in {@code persistentConnections} keyed by the remote broker id. Creation
     * metrics are then recorded and the channel is added to the socket server on the
     * current link listener.
     *
     * @param channel2    the channel that was reversed
     * @param reverseNode supplies the optional request id and the remote broker id
     * @throws NotControllerException if there is no request id and this broker is not the
     *         link coordinator
     * @throws IllegalStateException if there is no request id and an existing persistent
     *         connection for the remote broker satisfies the (not visible here) predicate
     */
    public void onReverseConnection(KafkaChannel channel2, ReverseNode reverseNode) {
        Optional requestId = reverseNode.requestId();
        int remoteBrokerId = reverseNode.remoteBrokerId();
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(78).append("Destination has successfully reversed channel ").append(channel2).append(" with requestId ").append(requestId).append(" remoteBrokerId ").append(remoteBrokerId).toString());
        this.ensureReverseConnectionsEnabled();
        // A missing request id is treated as a persistent connection, which is only
        // accepted while this broker is the link coordinator.
        if (!requestId.isPresent() && !this.isLinkCoordinator()) {
            String errorMessage = new StringBuilder(88).append("Discarding persistent reverse connection since broker ").append(this.brokerConfig.brokerId()).append(" is no longer the link coordinator").toString();
            this.debug((Function0<String>)(Function0 & Serializable)() -> errorMessage);
            throw new NotControllerException(errorMessage);
        }
        SocketChannel socketChannel = channel2.socketChannel();
        // The close callback always requests a metadata update for persistent channels (last argument true).
        ReverseChannel reverseChannel = new ReverseChannel(channel2, reverseNode, channel -> this.onConnectionClose((KafkaChannel)channel, reverseNode.requestId(), reverseNode.remoteBrokerId(), true));
        Object object = this.connectionUpdateLock();
        synchronized (object) {
            // NOTE(review): this entry is added before the duplicate-persistent check below can
            // throw; whether it is cleaned up in that case depends on the caller closing the channel — confirm.
            this.activeReverseConnections().put(BoxesRunTime.boxToInteger((int)System.identityHashCode(socketChannel)), reverseChannel);
            if (!requestId.isPresent()) {
                // The predicate is a synthetic lambda body defined outside this view.
                if (Option$.MODULE$.apply((Object)this.persistentConnections().get(BoxesRunTime.boxToInteger((int)remoteBrokerId))).exists((Function1 & Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkOutboundConnectionManager.$anonfun$onReverseConnection$4(x$5)))) {
                    this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(71).append("Ignoring persistent connection because a connection already exists for ").append(remoteBrokerId).toString());
                    throw new IllegalStateException(new StringBuilder(41).append("A persistent connection is available for ").append(remoteBrokerId).toString());
                }
                this.persistentConnections().put(BoxesRunTime.boxToInteger((int)remoteBrokerId), reverseChannel);
                this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(43).append("Created persistent connection to ").append(remoteBrokerId).append(", channel=").append(channel2).toString());
            }
        }
        // Metrics and the socket-server hand-off happen outside the lock.
        this.metrics.reverseConnectionCreatedSensor().record();
        this.metrics.deprecatedReverseConnectionCreatedSensor().record();
        this.socketServer.reverseAndAdd(this.linkListenerName(), reverseChannel);
        this.info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(61).append("Added reverse connection ").append(channel2).append(" to source socket server, requestId=").append(requestId).toString());
    }

    /**
     * Cleans up bookkeeping for a reverse channel that has been disconnected.
     *
     * <p>Under {@code connectionUpdateLock} the channel is removed from
     * {@code activeReverseConnections} (recording the closed-connection metrics if an
     * entry was present) and, when no request id is present, the entry for the remote
     * broker is removed from {@code persistentConnections}. A metadata update is
     * requested afterwards, outside the lock, if a persistent entry was removed and
     * {@code updateMetadataIfPersistent} is set.
     *
     * @param channel                    the channel that was closed
     * @param requestId                  request id of the reverse node; empty for persistent connections
     * @param remoteBrokerId             id of the remote broker the channel was connected to
     * @param updateMetadataIfPersistent whether to request a metadata update when a
     *                                   persistent connection was removed
     */
    private void onConnectionClose(KafkaChannel channel, Optional<Integer> requestId, int remoteBrokerId, boolean updateMetadataIfPersistent) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(38).append("Reverse channel ").append(channel).append(" has been disconnected").toString());
        boolean persistentRemoved;
        synchronized (this.connectionUpdateLock()) {
            Object channelKey = BoxesRunTime.boxToInteger((int)System.identityHashCode(channel.socketChannel()));
            ReverseChannel closedChannel = this.activeReverseConnections().remove(channelKey);
            if (closedChannel != null) {
                this.metrics.reverseConnectionClosedSensor().record();
                this.metrics.deprecatedReverseConnectionClosedSensor().record();
            }
            // Only connections without a request id are tracked as persistent.
            persistentRemoved = !requestId.isPresent() && this.persistentConnections().remove(BoxesRunTime.boxToInteger((int)remoteBrokerId)) != null;
            if (persistentRemoved) {
                this.info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(62).append("Removed persistent connection for ").append(remoteBrokerId).append(" because channel ").append(channel.id()).append(" was closed").toString());
            }
        }
        if (updateMetadataIfPersistent && persistentRemoved) {
            this.requestMetadataUpdate();
        }
    }

    /**
     * Forwards an initiate-reverse-connections request to the source brokers.
     *
     * <p>The request entries are paired positionally with {@code futures} and indexed by
     * each entry's initiate request id. The entries are then grouped by source broker id
     * and every group is handed to a synthetic helper together with the local admin
     * client and the id-to-future map.
     *
     * @param requestData the request whose entries are forwarded
     * @param futures     one future per request entry, in entry order
     */
    private void forwardToSourceBrokers(InitiateReverseConnectionsRequestData requestData, Seq<CompletableFuture<Void>> futures) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(92).append("Forward initiate reverse connection request from source link coordinator to source brokers: ").append(requestData).toString());
        // Read the volatile admin reference once so every group uses the same client.
        ConfluentAdmin admin = this.localAdmin();
        // Map of initiateRequestId -> future, built by zipping entries with futures.
        scala.collection.immutable.Map resultFutures = ((IterableOnceOps)((IterableOps)CollectionConverters$.MODULE$.ListHasAsScala(requestData.entries()).asScala().zip(futures)).map((Function1 & Serializable)x0$1 -> {
            if (x0$1 != null) {
                InitiateReverseConnectionsRequestData.EntryData entry = (InitiateReverseConnectionsRequestData.EntryData)x0$1._1();
                CompletableFuture future = (CompletableFuture)x0$1._2();
                return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)entry.initiateRequestId())), (Object)future);
            }
            // Only reachable for a null tuple (Scala pattern-match fallthrough).
            throw new MatchError(null);
        })).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        // The per-broker handling lives in a synthetic lambda body that is defined outside this view.
        CollectionConverters$.MODULE$.ListHasAsScala(requestData.entries()).asScala().groupBy((Function1 & Serializable)x$6 -> BoxesRunTime.boxToInteger((int)x$6.sourceBrokerId())).foreach((Function1 & Serializable)x0$2 -> {
            ClusterLinkOutboundConnectionManager.$anonfun$forwardToSourceBrokers$4(this, requestData, admin, resultFutures, x0$2);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Creates reverse connections for the entries of an initiate-reverse-connections request
     * and reports each outcome through the future paired with that entry.
     *
     * <p>Entries are zipped with {@code futures}, so the i-th future reports the i-th entry.
     * For every pair:
     * <ul>
     *   <li>an entry with request id {@code -1} whose target broker already has a connected
     *       persistent channel completes its future immediately with {@code null};</li>
     *   <li>an entry whose source broker id equals this broker's id (or is {@code -1}) starts
     *       a reversible connection and forwards its outcome to the future;</li>
     *   <li>any other entry fails its future with {@link InvalidRequestException}.</li>
     * </ul>
     * Any throwable raised while handling an entry is logged and used to fail that entry's future.
     *
     * @param requestData the request whose entries are processed
     * @param requestContext context of the originating request; its listener name, principal,
     *                       principal serde and authentication context are passed to the
     *                       reverse connection manager
     * @param futures one future per entry, in entry order
     * @throws IllegalStateException if no remote network client is available
     */
    private void createReverseConnections(InitiateReverseConnectionsRequestData requestData, RequestContext requestContext, Seq<CompletableFuture<Void>> futures) {
        NetworkClient networkClient = ((RemoteNetworkClient)this.remoteNetworkClient().getOrElse((Function0 & Serializable)() -> {
            throw new IllegalStateException("Remote client connection manager not available");
        })).networkClient();
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(71).append("Create reverse connections from source brokers to destination brokers: ").append(requestData).toString());
        ((IterableOnceOps)CollectionConverters$.MODULE$.ListHasAsScala(requestData.entries()).asScala().zip(futures)).foreach((Function1 & Serializable)x0$1 -> {
            if (x0$1 != null) {
                Object object;
                block5: {
                    InitiateReverseConnectionsRequestData.EntryData entry = (InitiateReverseConnectionsRequestData.EntryData)x0$1._1();
                    CompletableFuture future = (CompletableFuture)x0$1._2();
                    try {
                        // Request id -1 with an already connected persistent channel: nothing to create.
                        if (entry.initiateRequestId() == -1 && Option$.MODULE$.apply((Object)this.persistentConnections().get(BoxesRunTime.boxToInteger((int)entry.targetBrokerId()))).exists((Function1 & Serializable)x$7 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkOutboundConnectionManager.$anonfun$createReverseConnections$4(x$7)))) {
                            object = BoxesRunTime.boxToBoolean((boolean)future.complete(null));
                            break block5;
                        }
                        // The enclosing instance and the requestContext parameter are referenced directly;
                        // the decompiler-synthetic names `$this` / `requestContext$1` are not in scope here.
                        if (entry.sourceBrokerId() == this.brokerConfig.brokerId() || entry.sourceBrokerId() == -1) {
                            this.info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(68).append("Create reverse connection from source broker to destination broker: ").append(entry).toString());
                            ReverseNode reverseNode = networkClient.reverseConnectionManager().createReversibleConnection(entry.initiateRequestId(), entry.targetBrokerId(), requestContext.listenerName, requestContext.principal, requestContext.principalSerde, requestContext.authenticationContext, this.time.milliseconds());
                            object = reverseNode.future().whenComplete((x0$2, x1$1) -> {
                                if (x1$1 != null) {
                                    // Failure: record both failure sensors, request a metadata update, fail the caller's future.
                                    this.metrics.outboundReverseConnectionFailedSensor().record();
                                    this.metrics.deprecatedSourceReverseConnectionFailedSensor().record();
                                    networkClient.requestClusterLinkMetadataUpdate();
                                    future.completeExceptionally((Throwable)x1$1);
                                    this.warn((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(33).append("Failed to reverse connection for ").append(reverseNode).toString(), (Function0<Throwable>)(Function0 & Serializable)() -> x1$1);
                                    return;
                                }
                                future.complete(x0$2);
                                this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(34).append("Completed connection reversal for ").append(reverseNode).toString());
                            });
                            break block5;
                        }
                        // The entry names a different source broker: reject it.
                        return BoxesRunTime.boxToBoolean((boolean)future.completeExceptionally(new InvalidRequestException(new StringBuilder(49).append("Incorrect source broker id, expected ").append(this.brokerConfig.brokerId()).append(", requested ").append(entry.sourceBrokerId()).toString())));
                    }
                    catch (Throwable e) {
                        future.completeExceptionally(e);
                        this.error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(41).append("Failed to reverse connection for request ").append(requestData).toString(), (Function0<Throwable>)(Function0 & Serializable)() -> e);
                        object = BoxedUnit.UNIT;
                    }
                }
                return object;
            }
            throw new MatchError(null);
        });
    }

    /**
     * Logs the reported remote coordinator and, when reverse connections are enabled,
     * attempts to create the persistent connection and updates the active link count.
     *
     * @param coordinator the remote coordinator node included in the debug log
     */
    @Override
    public void onNewRemoteLinkCoordinator(Node coordinator) {
        Function0<String> logMessage = (Function0<String>)(Function0 & Serializable)() -> new StringBuilder(63).append("Process remote metadata: isLocalCoordinator=").append(this.isLinkCoordinator()).append(" remoteCoordinator=").append(coordinator).toString();
        this.debug(logMessage);
        if (this.enableReverseConnections) {
            this.maybeCreatePersistentConnection();
            this.updateActiveLinkCount();
        }
    }

    /**
     * Logs a local controller change and re-evaluates the link coordinator state.
     *
     * @param isActive value reported in the debug log as {@code isActiveController}
     */
    @Override
    public void onControllerChange(boolean isActive) {
        Function0<String> logMessage = (Function0<String>)(Function0 & Serializable)() -> new StringBuilder(52).append("Process local controller change, isActiveController=").append(isActive).toString();
        this.debug(logMessage);
        this.maybeProcessCoordinatorChange();
    }

    /**
     * Logs a metadata partition leader change and re-evaluates the link coordinator state.
     */
    @Override
    public void onLinkMetadataPartitionLeaderChange() {
        Function0<String> logMessage = (Function0<String>)(Function0 & Serializable)() -> "Process metadata partition leader change";
        this.debug(logMessage);
        this.maybeProcessCoordinatorChange();
    }

    /**
     * Re-evaluates whether this broker is the link coordinator while holding the state change lock.
     *
     * <p>When reverse connections are enabled, persistent connections are created if this broker
     * is the coordinator and closed otherwise. When they are disabled only the debug log is emitted.
     */
    private void maybeProcessCoordinatorChange() {
        synchronized (this.stateChangeLock()) {
            boolean localIsCoordinator = this.isLinkCoordinator();
            this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(51).append("Process link coordinator change isLocalCoordinator=").append(localIsCoordinator).toString());
            if (!this.enableReverseConnections) {
                return;
            }
            if (localIsCoordinator) {
                this.maybeCreatePersistentConnection();
            } else {
                this.closePersistentConnections();
            }
        }
    }

    /**
     * Shuts down the remote network client, if one is present, and closes the local admin
     * client with a zero timeout when it is non-null.
     */
    @Override
    public void closeReverseConnectionAdmin() {
        this.remoteNetworkClient().foreach((Function1 & Serializable)client -> {
            client.shutdown();
            return BoxedUnit.UNIT;
        });
        if (this.localAdmin() == null) {
            return;
        }
        this.localAdmin().close(Duration.ZERO);
    }

    /**
     * Creates the local admin client and the remote network client used for reverse
     * connections, then attempts to create the persistent connection.
     *
     * <p>Does nothing when reverse connections are disabled.
     */
    @Override
    public void createReverseConnectionAdmin() {
        if (this.enableReverseConnections) {
            this.localAdmin_$eq(this.createLocalAdmin());
            RemoteNetworkClient remoteClient = this.createRemoteAdmin();
            this.remoteNetworkClient_$eq((Option<RemoteNetworkClient>)new Some((Object)remoteClient));
            this.maybeCreatePersistentConnection();
        }
    }

    /**
     * Builds the remote network client used for reverse connections.
     *
     * <p>Bootstraps link metadata from the configured bootstrap servers, starts a metadata
     * refresh thread that has this manager registered as coordinator listener, and enables
     * the reverse connection admin on the thread's network client.
     *
     * @return the network client together with the metadata refresh thread that owns it
     */
    public RemoteNetworkClient createRemoteAdmin() {
        ClusterLinkConfig linkConfig = this.currentConfig();
        ClusterLinkMetadata linkMetadata = new ClusterLinkMetadata(this.brokerConfig, super.linkData().linkName(), super.linkData().linkId(), linkConfig.linkMode(), Predef$.MODULE$.Long2long(linkConfig.metadataRefreshBackoffMs()), Predef$.MODULE$.Long2long(linkConfig.metadataMaxAgeMs()));
        java.util.List bootstrapAddresses = ClientUtils.parseAndValidateAddresses(linkConfig.bootstrapServersToConnect(this.intranetConnectivityDeniedChecker), (ClientDnsLookup)linkConfig.dnsLookup());
        linkMetadata.bootstrap(bootstrapAddresses);

        ClusterLinkMetadataThread refreshThread = new ClusterLinkMetadataThread(this.brokerConfig, linkConfig, (Option<ClusterLinkConnectionManager>)None$.MODULE$, linkMetadata, this.metrics.metrics(), this.selectorMetricsRegistry, ClusterLinkClientType$OutboundConnectionAdmin$.MODULE$, this.quota, this.clusterLinkChannelContext, this.time);
        // The listener is registered before the thread is started.
        refreshThread.addCoordinatorListener(this);
        refreshThread.start();

        NetworkClient client = (NetworkClient)refreshThread.clusterLinkClient().networkClient();
        client.enableClusterLinkReverseConnectionAdmin(super.linkData().linkId(), (ClientInterceptor)this.clientInterceptor.orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()), this.reversalData(), (ReverseNode.ReverseCallback)this);
        return new RemoteNetworkClient(client, refreshThread);
    }

    /**
     * Creates the local admin client used for reverse connections on the link listener.
     *
     * <p>The client id is {@code cluster-link-<linkName>-local-source-conn-admin-<brokerId>}.
     *
     * @return a newly created admin client
     */
    public ConfluentAdmin createLocalAdmin() {
        ClusterLinkConfig linkConfig = this.currentConfig();
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(77).append("Creating local admin for reverse connections from source cluster on listener ").append(this.linkListenerEndpoint()).toString());
        java.util.Map<String, Object> clientConfigs = linkConfig.localLinkClientConfigs(this.brokerConfig, this.linkListenerEndpoint(), super.linkData().tenantPrefix().nonEmpty());
        String clientId = new StringBuilder(38).append("cluster-link-").append(super.linkData().linkName()).append("-local-source-conn-admin-").append(this.brokerConfig.brokerId()).toString();
        clientConfigs.put("client.id", clientId);
        return (ConfluentAdmin)Admin.create(clientConfigs);
    }

    /**
     * Recomputes the link listener endpoint from the given config and derives the listener
     * name from that endpoint.
     *
     * @param config link configuration used to resolve the listener endpoint
     * @throws IllegalStateException if the resolved endpoint carries no listener name
     */
    private void updateLinkListener(ClusterLinkConfig config) {
        this.linkListenerEndpoint_$eq(ClusterLinkUtils$.MODULE$.linkListenerEndpoint(super.linkData().tenantPrefix(), config, this.serverInfo, this.localReverseConnectionListenerMap()));
        String resolvedName = (String)this.linkListenerEndpoint().listenerName().orElseThrow(() -> new IllegalStateException("Listener name not set"));
        this.linkListenerName_$eq(new ListenerName(resolvedName));
    }

    /**
     * Builds the reverse connection request data describing this broker: link id, target and
     * source cluster ids, this broker's id, and the host and port of the link listener endpoint.
     *
     * @return the populated request data
     * @throws IllegalStateException if the remote cluster id is not known
     */
    private ReverseConnectionRequestData reversalData() {
        Endpoint listenerEndpoint = this.linkListenerEndpoint();
        return new ReverseConnectionRequestData()
            .setClusterLinkId(this.linkId())
            .setTargetClusterId((String)super.linkData().clusterId().getOrElse((Function0 & Serializable)() -> {
                throw new IllegalStateException("Remote cluster id not known");
            }))
            .setSourceClusterId(super.localLogicalCluster())
            .setSourceBrokerId(this.brokerConfig.brokerId())
            .setSourceHost(listenerEndpoint.host())
            .setSourcePort(listenerEndpoint.port());
    }

    /**
     * When this broker is the link coordinator, runs the persistent connection setup for the
     * remote network client if one is present. Otherwise does nothing.
     */
    private void maybeCreatePersistentConnection() {
        if (!this.isLinkCoordinator()) {
            return;
        }
        this.remoteNetworkClient().foreach((Function1 & Serializable)remote -> {
            ClusterLinkOutboundConnectionManager.$anonfun$maybeCreatePersistentConnection$1(this, remote);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Creates a persistent connection to the given remote coordinator unless a connected
     * persistent channel for that node id already exists.
     *
     * <p>A single-entry initiate request (request id {@code -1}, this broker as source, the
     * coordinator as target) is handed to {@code forwardToSourceBrokers}. On failure a warning
     * is logged and a metadata update is requested; on success an info message is logged.
     *
     * @param coordinator the remote link coordinator to connect to
     * @throws IllegalStateException if the remote cluster id is not known
     */
    private void maybeCreatePersistentConnection(Node coordinator) {
        boolean alreadyConnected = Option$.MODULE$.apply((Object)this.persistentConnections().get(BoxesRunTime.boxToInteger((int)coordinator.id()))).exists((Function1 & Serializable)channel -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkOutboundConnectionManager.$anonfun$maybeCreatePersistentConnection$3(channel)));
        if (alreadyConnected) {
            this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(77).append("Not creating persistent connection, remoteController=").append(coordinator).append(", persistentConnections=").append(this.persistentConnections()).toString());
            return;
        }
        this.info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(58).append("Creating persistent connection to remote link coordinator ").append(coordinator).toString());
        InitiateReverseConnectionsRequestData request = new InitiateReverseConnectionsRequestData().setClusterLinkId(new Uuid(super.linkData().linkId().getMostSignificantBits(), super.linkData().linkId().getLeastSignificantBits())).setForwardToBroker(false).setTimeoutMs(Predef$.MODULE$.Integer2int(this.currentConfig().reverseConnectionSetupTimeoutMs())).setSourceClusterId(super.localLogicalCluster()).setTargetClusterId((String)super.linkData().clusterId().getOrElse((Function0 & Serializable)() -> {
            throw new IllegalStateException("Remote cluster id not known");
        })).setEntries(Collections.singletonList(new InitiateReverseConnectionsRequestData.EntryData().setInitiateRequestId(-1).setSourceBrokerId(this.brokerConfig.brokerId()).setTargetBrokerId(coordinator.id())));
        CompletableFuture pending = new CompletableFuture();
        this.forwardToSourceBrokers(request, (Seq<CompletableFuture<Void>>)new .colon.colon(pending, (List)Nil$.MODULE$));
        pending.whenComplete((ignored, failure) -> {
            if (failure == null) {
                this.info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(52).append("Successfully created persistent connection to ").append(coordinator).append("  for ").append(request).toString());
                return;
            }
            this.warn((Function0<String>)(Function0 & Serializable)() -> "Failed to create persistent reverse connection", (Function0<Throwable>)(Function0 & Serializable)() -> failure);
            this.requestMetadataUpdate();
        });
    }

    /**
     * Requests a metadata update through the remote network client when this manager is
     * active. Any exception raised by the request is logged as an error and not rethrown.
     */
    private void requestMetadataUpdate() {
        if (!this.isActive()) {
            return;
        }
        try {
            this.remoteNetworkClient().foreach((Function1 & Serializable)remote -> BoxesRunTime.boxToInteger((int)ClusterLinkOutboundConnectionManager.$anonfun$requestMetadataUpdate$1(remote)));
        }
        catch (Exception failure) {
            this.error((Function0<String>)(Function0 & Serializable)() -> "Failed to request metadata refresh", (Function0<Throwable>)(Function0 & Serializable)() -> failure);
        }
    }

    /**
     * Closes persistent connections while holding the connection update lock.
     *
     * <p>Each entry of the persistent connection map is passed to the close predicate; the
     * entries it keeps are then handed to the connection-close handler.
     */
    private void closePersistentConnections() {
        synchronized (this.connectionUpdateLock()) {
            this.info((Function0<String>)(Function0 & Serializable)() -> "Closing persistent connections");
            ((IterableOnceOps)CollectionConverters$.MODULE$.ConcurrentMapHasAsScala(this.persistentConnections()).asScala().filter((Function1 & Serializable)candidate -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkOutboundConnectionManager.$anonfun$closePersistentConnections$2(this, candidate)))).foreach((Function1 & Serializable)selected -> {
                ClusterLinkOutboundConnectionManager.$anonfun$closePersistentConnections$3(this, selected);
                return BoxedUnit.UNIT;
            });
        }
    }

    /**
     * Closes active reverse connections while holding the connection update lock.
     *
     * <p>Each entry of the active reverse connection map is passed to the close predicate; the
     * entries it keeps are then handed to the connection-close handler.
     */
    private void closeActiveReverseConnections() {
        synchronized (this.connectionUpdateLock()) {
            this.info((Function0<String>)(Function0 & Serializable)() -> "Closing active reverse connections");
            ((IterableOnceOps)CollectionConverters$.MODULE$.ConcurrentMapHasAsScala(this.activeReverseConnections()).asScala().filter((Function1 & Serializable)candidate -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkOutboundConnectionManager.$anonfun$closeActiveReverseConnections$2(this, candidate)))).foreach((Function1 & Serializable)selected -> {
                ClusterLinkOutboundConnectionManager.$anonfun$closeActiveReverseConnections$3(this, selected);
                return BoxedUnit.UNIT;
            });
        }
    }

    /**
     * Derives an integer key for a socket channel from its identity hash code.
     *
     * @param socketChannel the channel to key
     * @return {@link System#identityHashCode(Object)} of the channel
     */
    private int socketChannelKey(SocketChannel socketChannel) {
        final int identityKey = System.identityHashCode(socketChannel);
        return identityKey;
    }

    /**
     * Closes a reverse channel.
     *
     * <p>If {@code maybeClose()} reports that a close was initiated, the method only logs and
     * returns {@code false}. Otherwise the underlying socket channel is closed directly; a
     * failure to close it is logged as a warning and not rethrown.
     *
     * @param reverseChannel the channel to close
     * @return {@code true} when the socket channel was closed directly by this method (even if
     *         that close attempt failed), {@code false} when {@code maybeClose()} initiated the close
     */
    public boolean closeReverseChannel(ReverseChannel reverseChannel) {
        if (reverseChannel.maybeClose()) {
            this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(83).append("Close initiated for reverse channel '").append(reverseChannel).append("', the channel will be closed by SocketServer.").toString());
            return false;
        }
        // Captured up front so the warning below can name the socket even if close() throws.
        SocketChannel rawSocket = reverseChannel.socketChannel();
        try {
            this.info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(122).append("Runnable to close reverse channel '").append(reverseChannel).append("' not set. Channel may not have been added to SocketServer yet, closing socket channel.").toString());
            reverseChannel.socketChannel().close();
        }
        catch (Exception closeFailure) {
            this.warn((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(31).append("Failed to close socket channel ").append(rawSocket).toString(), (Function0<Throwable>)(Function0 & Serializable)() -> closeFailure);
        }
        return true;
    }

    /**
     * Returns the metadata refresh thread of the remote network client, if a client is present.
     *
     * @return the refresh thread wrapped in an option, or an empty option when there is no client
     */
    public Option<ClusterLinkMetadataThread> metadataThread() {
        Option<RemoteNetworkClient> remote = this.remoteNetworkClient();
        return remote.map((Function1 & Serializable)client -> client.metadataRefreshThread());
    }

    /**
     * Returns the number of entries in the persistent connection map.
     *
     * @return the current persistent connection count
     */
    @Override
    public int persistentConnectionCount() {
        final int count = this.persistentConnections().size();
        return count;
    }

    /**
     * Returns the number of entries in the active reverse connection map.
     *
     * @return the current active reverse connection count
     */
    @Override
    public int reverseConnectionCount() {
        final int count = this.activeReverseConnections().size();
        return count;
    }

    /**
     * Returns the lazy resources held by this manager, which is always an empty sequence.
     *
     * @return an empty sequence
     */
    public Seq<LazyResource<?>> lazyResources() {
        Seq<LazyResource<?>> none = (Seq)package$.MODULE$.Seq().empty();
        return none;
    }

    /**
     * Synthetic lambda body: reports whether the reverse channel's socket is connected.
     */
    public static final /* synthetic */ boolean $anonfun$onReverseConnection$4(ReverseChannel x$5) {
        SocketChannel socket = x$5.socketChannel();
        return socket.isConnected();
    }

    /**
     * Synthetic lambda body: fails the result future registered under the entry's initiate
     * request id with a {@link NetworkException} naming the unavailable source broker.
     *
     * @return the result of {@code completeExceptionally} on that future
     */
    public static final /* synthetic */ boolean $anonfun$forwardToSourceBrokers$10(int brokerId$1, scala.collection.immutable.Map resultFutures$1, InitiateReverseConnectionsRequestData.EntryData entry) {
        String reason = new StringBuilder(39).append("Source broker with id ").append(brokerId$1).append(" is not available").toString();
        NetworkException unavailable = new NetworkException(reason);
        CompletableFuture pending = (CompletableFuture)resultFutures$1.apply((Object)BoxesRunTime.boxToInteger((int)entry.initiateRequestId()));
        return pending.completeExceptionally((Throwable)unavailable);
    }

    /**
     * Synthetic lambda body: handles one (source broker id, entries) group of an
     * initiate-reverse-connections request.
     *
     * <p>If the broker id is {@code -1}, equals this broker's id, or is reported online by the
     * metadata manager, a per-broker request is sent through the admin client and every
     * response is propagated to the result future keyed by its request id. Otherwise each
     * entry's future is failed as "source broker not available".
     *
     * @throws MatchError if the tuple is {@code null}
     */
    public static final /* synthetic */ void $anonfun$forwardToSourceBrokers$4(ClusterLinkOutboundConnectionManager $this, InitiateReverseConnectionsRequestData requestData$1, ConfluentAdmin admin$1, scala.collection.immutable.Map resultFutures$1, Tuple2 x0$2) {
        if (x0$2 == null) {
            throw new MatchError(null);
        }
        int sourceBrokerId = x0$2._1$mcI$sp();
        Buffer brokerEntries = (Buffer)x0$2._2();
        boolean reachable = sourceBrokerId == -1 || $this.brokerConfig.brokerId() == sourceBrokerId || $this.metadataManager.isBrokerOnline(sourceBrokerId);
        if (!reachable) {
            brokerEntries.foreach((Function1 & Serializable)entry -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkOutboundConnectionManager.$anonfun$forwardToSourceBrokers$10(sourceBrokerId, resultFutures$1, entry)));
            return;
        }
        InitiateReverseConnectionsRequestData brokerRequest = new InitiateReverseConnectionsRequestData().setClusterLinkId(requestData$1.clusterLinkId()).setSourceClusterId(requestData$1.sourceClusterId()).setTargetClusterId(requestData$1.targetClusterId()).setForwardToBroker(false).setEntries(CollectionConverters$.MODULE$.BufferHasAsJava(brokerEntries).asJava());
        ConfluentAdminUtils.initiateReverseConnections((ConfluentAdmin)admin$1, (InitiateReverseConnectionsRequestData)brokerRequest, (Integer)Predef$.MODULE$.int2Integer(sourceBrokerId)).forEach((requestId, response) -> response.whenComplete((result, failure) -> {
            if (failure == null) {
                ((CompletableFuture)resultFutures$1.apply((Object)BoxesRunTime.boxToInteger((int)Predef$.MODULE$.Integer2int(requestId)))).complete(result);
                $this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(50).append("Completed initiate reversal request for requestId=").append(requestId).toString());
                return;
            }
            // Failure: fail the caller's future first, then record both failure sensors and log.
            ((CompletableFuture)resultFutures$1.apply((Object)BoxesRunTime.boxToInteger((int)Predef$.MODULE$.Integer2int(requestId)))).completeExceptionally((Throwable)failure);
            $this.metrics.outboundReverseConnectionFailedSensor().record();
            $this.metrics.deprecatedSourceReverseConnectionFailedSensor().record();
            $this.warn((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(65).append("Connection reversal request to local broker failed for requestId=").append(requestId).toString(), (Function0<Throwable>)(Function0 & Serializable)() -> failure);
        }));
    }

    /**
     * Synthetic lambda body: reports whether the reverse channel's socket is connected.
     */
    public static final /* synthetic */ boolean $anonfun$createReverseConnections$4(ReverseChannel x$7) {
        SocketChannel socket = x$7.socketChannel();
        return socket.isConnected();
    }

    /**
     * Synthetic lambda body: creates the persistent connection to the remote link coordinator
     * when the client's metadata thread knows one, otherwise requests a metadata update.
     *
     * @throws MatchError if the coordinator option is neither {@code Some} nor {@code None}
     */
    public static final /* synthetic */ void $anonfun$maybeCreatePersistentConnection$1(ClusterLinkOutboundConnectionManager $this, RemoteNetworkClient client) {
        Option<Node> remoteCoordinator = client.metadataRefreshThread().remoteLinkCoordinator();
        if (remoteCoordinator instanceof Some) {
            $this.maybeCreatePersistentConnection((Node)((Some)remoteCoordinator).value());
            return;
        }
        if (!None$.MODULE$.equals(remoteCoordinator)) {
            throw new MatchError(remoteCoordinator);
        }
        $this.debug((Function0<String>)(Function0 & Serializable)() -> "Remote coordinator not known, request metadata");
        $this.requestMetadataUpdate();
    }

    /**
     * Synthetic lambda body: reports whether the reverse channel's socket is connected.
     */
    public static final /* synthetic */ boolean $anonfun$maybeCreatePersistentConnection$3(ReverseChannel x$9) {
        SocketChannel socket = x$9.socketChannel();
        return socket.isConnected();
    }

    /**
     * Synthetic lambda body: requests an update on the cluster link metadata owned by the
     * client's metadata refresh thread.
     *
     * @return the value returned by {@code requestUpdate()}
     */
    public static final /* synthetic */ int $anonfun$requestMetadataUpdate$1(RemoteNetworkClient x$10) {
        ClusterLinkMetadataThread refreshThread = x$10.metadataRefreshThread();
        return refreshThread.clusterLinkMetadata().requestUpdate();
    }

    /**
     * Synthetic lambda body: closes the reverse channel held as the tuple's second element.
     *
     * @return the result of {@code closeReverseChannel} for that channel
     * @throws MatchError if the tuple is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$closePersistentConnections$2(ClusterLinkOutboundConnectionManager $this, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError(null);
        }
        return $this.closeReverseChannel((ReverseChannel)x0$1._2());
    }

    /**
     * Synthetic lambda body: notifies the manager that the reverse channel held as the tuple's
     * second element has closed, passing {@code false} as the final argument.
     *
     * @throws MatchError if the tuple is {@code null}
     */
    public static final /* synthetic */ void $anonfun$closePersistentConnections$3(ClusterLinkOutboundConnectionManager $this, Tuple2 x0$2) {
        if (x0$2 == null) {
            throw new MatchError(null);
        }
        ReverseChannel closedChannel = (ReverseChannel)x0$2._2();
        $this.onConnectionClose(closedChannel.channel(), closedChannel.reverseNode().requestId(), closedChannel.reverseNode().remoteBrokerId(), false);
    }

    /**
     * Synthetic lambda body: closes the reverse channel held as the tuple's second element.
     *
     * @return the result of {@code closeReverseChannel} for that channel
     * @throws MatchError if the tuple is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$closeActiveReverseConnections$2(ClusterLinkOutboundConnectionManager $this, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError(null);
        }
        return $this.closeReverseChannel((ReverseChannel)x0$1._2());
    }

    /**
     * Synthetic lambda body: notifies the manager that the reverse channel held as the tuple's
     * second element has closed, passing {@code false} as the final argument.
     *
     * @throws MatchError if the tuple is {@code null}
     */
    public static final /* synthetic */ void $anonfun$closeActiveReverseConnections$3(ClusterLinkOutboundConnectionManager $this, Tuple2 x0$2) {
        if (x0$2 == null) {
            throw new MatchError(null);
        }
        ReverseChannel closedChannel = (ReverseChannel)x0$2._2();
        $this.onConnectionClose(closedChannel.channel(), closedChannel.reverseNode().requestId(), closedChannel.reverseNode().remoteBrokerId(), false);
    }

    /**
     * Creates the outbound connection manager for one cluster link.
     *
     * <p>Stores the collaborators in fields, invokes the superclass constructor, initialises
     * the connection update lock and the two connection maps, and sets the log prefix. When
     * reverse connections are enabled the link listener is resolved from the initial config.
     *
     * @param linkData link data passed to the superclass; its link name is used in the log prefix
     * @param initialConfig initial link configuration, also used to resolve the link listener
     * @param localLogicalCluster passed to the superclass
     * @param clientInterceptor optional client interceptor
     * @param metrics cluster link metrics
     * @param selectorMetricsRegistry selector metrics registry
     * @param metadataManager cluster link metadata manager
     * @param socketServer the broker's socket server
     * @param brokerConfig broker configuration; supplies the broker id and the local reverse
     *                     connection listener map
     * @param serverInfo authorizer server info
     * @param quota cluster link request quota
     * @param clusterLinkChannelContext optional channel context
     * @param time time source
     * @param enableReverseConnections whether reverse connections are enabled
     * @param intranetConnectivityDeniedChecker supplier stored for later use
     */
    public ClusterLinkOutboundConnectionManager(ClusterLinkData linkData, ClusterLinkConfig initialConfig, String localLogicalCluster, Option<ClientInterceptor> clientInterceptor, ClusterLinkMetrics metrics, ClusterLinkSelectorMetricsRegistry selectorMetricsRegistry, ClusterLinkMetadataManager metadataManager, SocketServer socketServer, KafkaConfig brokerConfig, AuthorizerServerInfo serverInfo, ClusterLinkRequestQuota quota, Option<ClusterLinkChannelContext> clusterLinkChannelContext, Time time, boolean enableReverseConnections, Function0<Object> intranetConnectivityDeniedChecker) {
        // NOTE(review): these field assignments precede the super(...) call, as emitted by the
        // decompiler; the order is kept as-is — confirm before reordering.
        this.initialConfig = initialConfig;
        this.clientInterceptor = clientInterceptor;
        this.metrics = metrics;
        this.selectorMetricsRegistry = selectorMetricsRegistry;
        this.metadataManager = metadataManager;
        this.socketServer = socketServer;
        this.brokerConfig = brokerConfig;
        this.serverInfo = serverInfo;
        this.quota = quota;
        this.clusterLinkChannelContext = clusterLinkChannelContext;
        this.time = time;
        this.enableReverseConnections = enableReverseConnections;
        this.intranetConnectivityDeniedChecker = intranetConnectivityDeniedChecker;
        super(linkData, initialConfig, localLogicalCluster, metadataManager, metrics, brokerConfig);
        // State owned by this manager: a lock object and two initially empty connection maps.
        this.connectionUpdateLock = new Object();
        this.persistentConnections = new ConcurrentHashMap();
        this.activeReverseConnections = new ConcurrentHashMap();
        this.localReverseConnectionListenerMap = brokerConfig.clusterLinkLocalReverseConnectionListenerMap();
        // No remote network client exists at construction time.
        this.remoteNetworkClient = None$.MODULE$;
        // Log prefix: [ClusterLinkOutboundConnectionManager-<linkName>-broker-<brokerId>]
        this.logIdent_$eq(new StringBuilder(48).append("[ClusterLinkOutboundConnectionManager-").append(super.linkData().linkName()).append("-broker-").append(brokerConfig.brokerId()).append("] ").toString());
        // The link listener is only resolved when reverse connections are enabled.
        if (enableReverseConnections) {
            this.updateLinkListener(initialConfig);
        }
    }
}

