/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkAdminClient;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConnectionManager;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkInboundConnectionManager$;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkNetworkClient;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.server.link.ConnectionMode$Outbound$;
import kafka.server.link.LazyResource;
import kafka.server.link.ReverseClient;
import kafka.server.link.ReverseClient$;
import kafka.utils.CoreUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.InitiateReverseConnectionsRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.requests.InitiateReverseConnectionsRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t}x!\u0002\u001b6\u0011\u0003ad!\u0002 6\u0011\u0003y\u0004\"\u0002$\u0002\t\u00039\u0005b\u0002%\u0002\u0005\u0004%\t!\u0013\u0005\u0007-\u0006\u0001\u000b\u0011\u0002&\u0007\ty*\u0004a\u0016\u0005\nE\u0016\u0011\t\u0011)A\u0005G&D\u0001B[\u0003\u0003\u0002\u0003\u0006Ia\u001b\u0005\n]\u0016\u0011\t\u0011)A\u0005_jD\u0001b_\u0003\u0003\u0002\u0003\u0006I\u0001 \u0005\u000b\u0003+)!\u0011!Q\u0001\n\u0005]\u0001BCA\u000f\u000b\t\u0005\t\u0015!\u0003\u0002 !Q\u0011QF\u0003\u0003\u0002\u0003\u0006I!a\f\t\u0015\u0005\u0005SA!A!\u0002\u0013\t\u0019\u0005\u0003\u0006\u0002J\u0015\u0011\t\u0011)A\u0005\u0003\u0017B!\"a\u0015\u0006\u0005\u0003\u0005\u000b\u0011BA+\u0011\u00191U\u0001\"\u0001\u0002f!I\u00111P\u0003C\u0002\u0013%\u0011Q\u0010\u0005\t\u0003'+\u0001\u0015!\u0003\u0002\u0000!A\u0011QS\u0003C\u0002\u0013%\u0011\nC\u0004\u0002\u0018\u0016\u0001\u000b\u0011\u0002&\t\u0011\u0005eUA1A\u0005\n%Cq!a'\u0006A\u0003%!\n\u0003\u0005\u0002\u001e\u0016\u0011\r\u0011\"\u0003J\u0011\u001d\ty*\u0002Q\u0001\n)C\u0011\"!)\u0006\u0001\u0004%I!a)\t\u0013\u0005\u001dV\u00011A\u0005\n\u0005%\u0006\u0002CA[\u000b\u0001\u0006K!!*\t\u0013\u0005}VA1A\u0005\n\u0005\u0005\u0007\u0002CAh\u000b\u0001\u0006I!a1\t\u0013\u0005EWA1A\u0005\n\u0005M\u0007\u0002CAq\u000b\u0001\u0006I!!6\t\u000f\u0005\rX\u0001\"\u0011\u0002f\"9\u0011q]\u0003\u0005B\u0005%\b\u0002\u0003B\u0003\u000b\u0011\u0005QGa\u0002\t\u000f\t5R\u0001\"\u0011\u00030!9!QI\u0003\u0005B\t\u001d\u0003b\u0002B>\u000b\u0011%!Q\u0010\u0005\b\u0005/+A\u0011\u0002BM\u0011\u001d\u0011y+\u0002C!\u0005cCqA!0\u0006\t\u0003\n)\u000fC\u0004\u0003@\u0016!I!!:\t\u000f\t\u0005W\u0001\"\u0015\u0002f\"9!1Y\u0003\u0005R\u0005\u0015\bb\u0002Bc\u000b\u0011%!q\u0019\u0005\b\u0005\u0013,A\u0011BAs\u0011\u001d\u0011Y-\u0002C\u0001\u0005\u001bDqA!5\u0006\t\u0003\u0012\u0019\u000eC\u0004\u0003V\u0016!\tEa5\t\u000f\t]W\u0001\"\u0011\u0003Z\"i!q_\u0003\u0011\u0002\u0007\u0005\t\u0011\"\u0003\u0003zjDQBa?\u0006!\u0003\r\t\u00
11!C\u0005\u0005{L\u0017aI\"mkN$XM\u001d'j].LeNY8v]\u0012\u001cuN\u001c8fGRLwN\\'b]\u0006<WM\u001d\u0006\u0003m]\nA\u0001\\5oW*\u0011\u0001(O\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003i\nQa[1gW\u0006\u001c\u0001\u0001\u0005\u0002>\u00035\tQGA\u0012DYV\u001cH/\u001a:MS:\\\u0017J\u001c2pk:$7i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0014\u0005\u0005\u0001\u0005CA!E\u001b\u0005\u0011%\"A\"\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0015\u0013%AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002y\u0005!b*\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012,\u0012A\u0013\t\u0003\u0017Rk\u0011\u0001\u0014\u0006\u0003\u001b:\u000ba!\u0019;p[&\u001c'BA(Q\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003#J\u000bA!\u001e;jY*\t1+\u0001\u0003kCZ\f\u0017BA+M\u00055\tEo\\7jG&sG/Z4fe\u0006)b*\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012\u00043cA\u0003Y7B\u0011Q(W\u0005\u00035V\u0012Ad\u00117vgR,'\u000fT5oW\u000e{gN\\3di&|g.T1oC\u001e,'\u000f\u0005\u0002]?:\u0011Q(X\u0005\u0003=V\n!c\u00117vgR,'\u000fT5oW\u001a\u000b7\r^8ss&\u0011\u0001-\u0019\u0002\u0019\u0013:\u0014w.\u001e8e\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014(B\u000106\u0003!a\u0017N\\6ECR\f\u0007C\u00013h\u001b\u0005)'B\u00014:\u0003\tQ8.\u0003\u0002iK\ny1\t\\;ti\u0016\u0014H*\u001b8l\t\u0006$\u0018-\u0003\u0002c3\u0006i\u0011N\\5uS\u0006d7i\u001c8gS\u001e\u0004\"!\u00107\n\u00055,$!E\"mkN$XM\u001d'j].\u001cuN\u001c4jO\u0006\u0019Bn\\2bY2{w-[2bY\u000ecWo\u001d;feB\u0011\u0001o\u001e\b\u0003cV\u0004\"A\u001d\"\u000e\u0003MT!\u0001^\u001e\u0002\rq\u0012xn\u001c;?\u0013\t1()\u0001\u0004Qe\u0016$WMZ\u0005\u0003qf\u0014aa\u0015;sS:<'B\u0001<C\u0013\tq\u0017,A\tdY&,g\u000e^%oi\u0016\u00148-\u001a9u_J\u00042!Q?\u0000\u0013\tq(I\u0001\u0004PaRLwN\u001c\t\u0005\u0003\u0003\t\t\"\u0004\u0002\u0002\u0004)!\u0011QAA\u0004\u0003\u001d\u0019G.[3oiNT1AOA\u0005\u0015\u0011\tY!!\u0004\u0002\r\u0005\u0004\u0018m\u00195f\u0015\t\ty!A\u0002pe\u001eLA!a\u0005\u0002\u0004\t\t2\t\\5f]RLe\u000e^3sG\u0016\u0004Ho\u001c:\u0002\u000f5,GO]5dgB\u0019Q(!\u0007\n\
u0007\u0005mQG\u0001\nDYV\u001cH/\u001a:MS:\\W*\u001a;sS\u000e\u001c\u0018A\u0005:f[>$X-\u00113nS:4\u0015m\u0019;pef\u0004\u0002\"QA\u0011W\u0006\u0015\u0012qE\u0005\u0004\u0003G\u0011%!\u0003$v]\u000e$\u0018n\u001c83!\tiT\u0001E\u0002>\u0003SI1!a\u000b6\u0005Y\u0019E.^:uKJd\u0015N\\6BI6Lgn\u00117jK:$\u0018!\u00067pG\u0006d7i\u001c8o\u0003\u0012l\u0017N\u001c$bGR|'/\u001f\t\u0007\u0003\u0006Er.!\u000e\n\u0007\u0005M\"IA\u0005Gk:\u001cG/[8ocA!\u0011qGA\u001f\u001b\t\tID\u0003\u0003\u0002<\u0005\r\u0011!B1e[&t\u0017\u0002BA \u0003s\u0011\u0001cS1gW\u0006\fE-\\5o\u00072LWM\u001c;\u0002\u001f5,G/\u00193bi\u0006l\u0015M\\1hKJ\u00042!PA#\u0013\r\t9%\u000e\u0002\u001b\u00072,8\u000f^3s\u0019&t7.T3uC\u0012\fG/Y'b]\u0006<WM]\u0001\rEJ|7.\u001a:D_:4\u0017n\u001a\t\u0005\u0003\u001b\ny%D\u00018\u0013\r\t\tf\u000e\u0002\f\u0017\u000647.Y\"p]\u001aLw-\u0001\u0003uS6,\u0007\u0003BA,\u0003Cj!!!\u0017\u000b\t\u0005m\u0013QL\u0001\u0006kRLGn\u001d\u0006\u0005\u0003?\n9!\u0001\u0004d_6lwN\\\u0005\u0005\u0003G\nIF\u0001\u0003US6,GCFA\u0013\u0003O\nI'a\u001b\u0002n\u0005=\u0014\u0011OA:\u0003k\n9(!\u001f\t\u000b\t\u0004\u0002\u0019A2\t\u000b)\u0004\u0002\u0019A6\t\u000b9\u0004\u0002\u0019A8\t\u000bm\u0004\u0002\u0019\u0001?\t\u000f\u0005U\u0001\u00031\u0001\u0002\u0018!9\u0011Q\u0004\tA\u0002\u0005}\u0001bBA\u0017!\u0001\u0007\u0011q\u0006\u0005\b\u0003\u0003\u0002\u0002\u0019AA\"\u0011\u001d\tI\u0005\u0005a\u0001\u0003\u0017Bq!a\u0015\u0011\u0001\u0004\t)&\u0001\nd_:tWm\u0019;j_:\u0014V-];fgR\u001cXCAA@!!\t\t)a!\u0002\b\u00065U\"\u0001(\n\u0007\u0005\u0015eJA\tD_:\u001cWO\u001d:f]RD\u0015m\u001d5NCB\u00042!QAE\u0013\r\tYI\u0011\u0002\u0004\u0013:$\bcA\u001f\u0002\u0010&\u0019\u0011\u0011S\u001b\u0003\u001bI+g/\u001a:tK\u000ec\u0017.\u001a8u\u0003M\u0019wN\u001c8fGRLwN\u001c*fcV,7\u000f^:!\u0003QqW\r\u001f;SKZ,'o]3SKF,Xm\u001d;JI\u0006)b.\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012\u0004\u0013!\u00069feNL7\u000f^3oi\u000e{gN\\3di&|gn]\u0001\u0017a\u0016\u00148/[:uK:$8i\u001c8oK\u000
e$\u0018n\u001c8tA\u0005A\u0012m\u0019;jm\u0016\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\:\u00023\u0005\u001cG/\u001b<f%\u00164XM]:f\u0007>tg.Z2uS>t7\u000fI\u0001\u0017e\u00164XM]:f\u0007>tg.Z2uS>t\u0017\tZ7j]V\u0011\u0011Q\u0015\t\u0005\u0003v\fi)\u0001\u000esKZ,'o]3D_:tWm\u0019;j_:\fE-\\5o?\u0012*\u0017\u000f\u0006\u0003\u0002,\u0006E\u0006cA!\u0002.&\u0019\u0011q\u0016\"\u0003\tUs\u0017\u000e\u001e\u0005\n\u0003gS\u0012\u0011!a\u0001\u0003K\u000b1\u0001\u001f\u00132\u0003]\u0011XM^3sg\u0016\u001cuN\u001c8fGRLwN\\!e[&t\u0007\u0005K\u0002\u001c\u0003s\u00032!QA^\u0013\r\tiL\u0011\u0002\tm>d\u0017\r^5mK\u0006qAn\\2bY\u000e{gN\\!e[&tWCAAb!\u0015i\u0014QYAe\u0013\r\t9-\u000e\u0002\r\u0019\u0006T\u0018PU3t_V\u00148-\u001a\t\u0005\u0003o\tY-\u0003\u0003\u0002N\u0006e\"AD\"p]\u001adW/\u001a8u\u0003\u0012l\u0017N\\\u0001\u0010Y>\u001c\u0017\r\\\"p]:\fE-\\5oA\u0005q\u0002/\u001a:tSN$XM\u001c;D_:tWm\u0019;j_:\u001cv.\u001e:dK2{7m[\u000b\u0003\u0003+\u0004B!a6\u0002^6\u0011\u0011\u0011\u001c\u0006\u0004\u00037\u0014\u0016\u0001\u00027b]\u001eLA!a8\u0002Z\n1qJ\u00196fGR\fq\u0004]3sg&\u001cH/\u001a8u\u0007>tg.Z2uS>t7k\\;sG\u0016dunY6!\u0003\u001d\u0019H/\u0019:ukB$\"!a+\u0002#\u0015t\u0017M\u00197f\u00072,8\u000f^3s\u0019&t7\u000e\u0006\u0004\u0002,\u0006-\u0018Q\u001f\u0005\b\u0003[\f\u0003\u0019AAx\u00035qW\r^<pe.\u001cE.[3oiB\u0019Q(!=\n\u0007\u0005MXG\u0001\rDYV\u001cH/\u001a:MS:\\g*\u001a;x_J\\7\t\\5f]RDq!!\u0011\"\u0001\u0004\t9\u0010\u0005\u0003B{\u0006e\b\u0003BA~\u0005\u0003i!!!@\u000b\t\u0005}\u0018\u0011H\u0001\nS:$XM\u001d8bYNLAAa\u0001\u0002~\n!\u0012\tZ7j]6+G/\u00193bi\u0006l\u0015M\\1hKJ\f\u0011D]3wKJ\u001cXmQ8o]\u0016\u001cG/[8o!J|g/\u001b3feRA!\u0011\u0002B\u0010\u0005O\u0011I\u0003\u0005\u0003B{\n-\u0001\u0003\u0002B\u0007\u00053qAAa\u0004\u0003\u00165\u0011!\u0011\u0003\u0006\u0005\u0005'\ti&A\u0004oKR<xN]6\n\t\t]!\u0011C\u0001\f%\u00164XM]:f\u001d>$W-\u0003\u0003\u0003\u001c\tu!AE\"p]:,7\r^5p]B\u0013xN^5eKJTAAa\u0006\u0003\u0012!9\u0011Q\u001e\u0012A\u0002\t
\u0005\u0002\u0003BA\u0001\u0005GIAA!\n\u0002\u0004\tia*\u001a;x_J\\7\t\\5f]RDq!!\u0011#\u0001\u0004\t9\u0010\u0003\u0004\u0003,\t\u0002\ra\\\u0001\tG2LWM\u001c;JI\u0006A\u0002O]8dKN\u001c(+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8\u0015\r\u0005-&\u0011\u0007B\u001e\u0011\u001d\u0011\u0019d\ta\u0001\u0005k\tqa\u00195b]:,G\u000e\u0005\u0003\u0003\u0010\t]\u0012\u0002\u0002B\u001d\u0005#\u0011AbS1gW\u0006\u001c\u0005.\u00198oK2DqA!\u0010$\u0001\u0004\u0011y$A\u0006sKZ,'o]3O_\u0012,\u0007\u0003\u0002B\b\u0005\u0003JAAa\u0011\u0003\u0012\tY!+\u001a<feN,gj\u001c3f\u0003iIg.\u001b;jCR,'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t)\u0019\u0011IE!\u0019\u0003rA1!1\nB)\u0005+j!A!\u0014\u000b\u0007\t=#)\u0001\u0006d_2dWm\u0019;j_:LAAa\u0015\u0003N\t\u00191+Z9\u0011\r\u0005\u0005%q\u000bB.\u0013\r\u0011IF\u0014\u0002\u0012\u0007>l\u0007\u000f\\3uC\ndWMR;ukJ,\u0007\u0003BAl\u0005;JAAa\u0018\u0002Z\n!ak\\5e\u0011\u001d\u0011\u0019\u0007\na\u0001\u0005K\n\u0011$\u001b8ji&\fG/Z\"p]:,7\r^5p]J+\u0017/^3tiB!!q\rB7\u001b\t\u0011IG\u0003\u0003\u0003l\u0005u\u0013\u0001\u0003:fcV,7\u000f^:\n\t\t=$\u0011\u000e\u0002\"\u0013:LG/[1uKJ+g/\u001a:tK\u000e{gN\\3di&|gn\u001d*fcV,7\u000f\u001e\u0005\b\u0005g\"\u0003\u0019\u0001B;\u00039\u0011X-];fgR\u001cuN\u001c;fqR\u0004BAa\u001a\u0003x%!!\u0011\u0010B5\u00059\u0011V-];fgR\u001cuN\u001c;fqR\f\u0001D]3rk\u0016\u001cHOU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o))\tYKa 
\u0003\u0004\n\u001d%1\u0013\u0005\b\u0005\u0003+\u0003\u0019AAD\u0003%\u0011X-];fgRLE\rC\u0004\u0003\u0006\u0016\u0002\r!!$\u0002\r\rd\u0017.\u001a8u\u0011\u001d\u0011I)\na\u0001\u0005\u0017\u000b!b]8ve\u000e,gj\u001c3f!\u0011\u0011iIa$\u000e\u0005\u0005u\u0013\u0002\u0002BI\u0003;\u0012AAT8eK\"9!QS\u0013A\u0002\u0005\u001d\u0015\u0001\u00043fgR\u0014%o\\6fe&#\u0017A\u00074pe^\f'\u000f\u001a+p%\u0016lw\u000e^3D_>\u0014H-\u001b8bi>\u0014HCBAV\u00057\u0013Y\u000bC\u0004\u0003\u001e\u001a\u0002\rAa(\u0002\u0017I,\u0017/^3ti\u0012\u000bG/\u0019\t\u0005\u0005C\u00139+\u0004\u0002\u0003$*!!QUA/\u0003\u001diWm]:bO\u0016LAA!+\u0003$\n)\u0013J\\5uS\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8ogJ+\u0017/^3ti\u0012\u000bG/\u0019\u0005\b\u0005[3\u0003\u0019\u0001B%\u0003\u001d1W\u000f^;sKN\f!c\u001c8D_:$(o\u001c7mKJ\u001c\u0005.\u00198hKR!\u00111\u0016BZ\u0011\u001d\u0011)l\na\u0001\u0005o\u000b!#[:BGRLg/Z\"p]R\u0014x\u000e\u001c7feB\u0019\u0011I!/\n\u0007\tm&IA\u0004C_>dW-\u00198\u0002G=tG*\u001b8l\u001b\u0016$\u0018\rZ1uCB\u000b'\u000f^5uS>tG*Z1eKJ\u001c\u0005.\u00198hK\u0006iR.Y=cKB\u0013xnY3tg\u000e{wN\u001d3j]\u0006$xN]\"iC:<W-A\u000edY>\u001cXMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0003\u0012l\u0017N\\\u0001\u001dGJ,\u0017\r^3SKZ,'o]3D_:tWm\u0019;j_:\fE-\\5o\u0003Q\u0019'/Z1uK2{7-\u00197D_:t\u0017\tZ7j]R\u0011\u0011\u0011Z\u0001\u0017[\u0006L(-Z\"sK\u0006$XMU3n_R,\u0017\tZ7j]\u00069\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8DY&,g\u000e^\u000b\u0003\u0005\u001f\u0004B!Q?\u0003\"\u0005I\u0002/\u001a:tSN$XM\u001c;D_:tWm\u0019;j_:\u001cu.\u001e8u+\t\t9)\u0001\fsKZ,'o]3D_:tWm\u0019;j_:\u001cu.\u001e8u\u00035a\u0017M_=SKN|WO]2fgV\u0011!1\u001c\t\u0007\u0005\u0017\u0012\tF!81\t\t}'Q\u001d\t\u0006{\u0005\u0015'\u0011\u001d\t\u0005\u0005G\u0014)\u000f\u0004\u0001\u0005\u0017\t\u001d\u0018'!A\u0001\u0002\u000b\u0005!\u0011\u001e\u0002\u0004?\u0012\n\u0014\u0003\u0002Bv\u0005c\u00042!\u0011Bw\u0013\r\u0011yO\u0011\u0002\b\u001d>$\b.\u001b8h!\r\t%1_\u0005\u0004\u0005k\u0014%aA!os\u0006
I2/\u001e9fe\u0012bwnY1m\u0019><\u0017nY1m\u00072,8\u000f^3s+\u0005y\u0017AD:va\u0016\u0014H\u0005\\5oW\u0012\u000bG/Y\u000b\u0002G\u0002")
public class ClusterLinkInboundConnectionManager
extends ClusterLinkConnectionManager
implements ClusterLinkFactory.InboundConnectionManager {
    // Link configuration supplied at construction; startup() rejects it when its connection mode is outbound.
    private final ClusterLinkConfig initialConfig;
    // Optional interceptor passed (as null when absent) to NetworkClient.enableClusterLinkRequests.
    private final Option<ClientInterceptor> clientInterceptor;
    // Sensors recorded whenever a reverse connection is created or closed.
    private final ClusterLinkMetrics metrics;
    // Builds the ClusterLinkAdminClient wrapped by reverseConnectionAdmin from the current config and this manager.
    private final Function2<ClusterLinkConfig, ClusterLinkInboundConnectionManager, ClusterLinkAdminClient> remoteAdminFactory;
    // Builds an admin client from the link name (see createLocalConnAdmin).
    private final Function1<String, KafkaAdminClient> localConnAdminFactory;
    // Broker configuration; its brokerId() is used as the target broker of reverse connection requests.
    private final KafkaConfig brokerConfig;
    // Clock whose milliseconds() value is passed to bootstrapWithReverseChannel.
    private final Time time;
    // Outstanding reverse connection requests: request id -> client waiting for the reversed channel.
    private final ConcurrentHashMap<Object, ReverseClient> connectionRequests;
    // Counter incremented to obtain the id of each reverse connection request issued by this manager.
    private final AtomicInteger nextReverseRequestId;
    // Count of open reverse channels that arrived without a request id; updated under persistentConnectionSourceLock.
    private final AtomicInteger persistentConnections;
    // Count of all currently open reverse channels.
    private final AtomicInteger activeReverseConnections;
    // Client wrapping the remote admin; set by maybeCreateRemoteAdmin and cleared by closeReverseConnectionAdmin.
    private volatile Option<ReverseClient> reverseConnectionAdmin;
    // Local admin (obtained through getOrCreate) used to send initiate requests to the local link coordinator.
    private final LazyResource<ConfluentAdmin> localConnAdmin;
    // Monitor held while updating persistentConnections together with the client's persistentConnectionSource.
    private final Object persistentConnectionSourceLock;

    /**
     * Static forwarder to the {@code NextReverseRequestId} member of the companion object.
     *
     * @return the counter exposed by {@code ClusterLinkInboundConnectionManager$.MODULE$}
     */
    public static AtomicInteger NextReverseRequestId() {
        ClusterLinkInboundConnectionManager$ companion = ClusterLinkInboundConnectionManager$.MODULE$;
        return companion.NextReverseRequestId();
    }

    /**
     * Bridge that lets lambdas in this class reach the superclass
     * {@code localLogicalCluster()} implementation.
     *
     * @return the value returned by {@code super.localLogicalCluster()}
     */
    private /* synthetic */ String super$localLogicalCluster() {
        String logicalCluster = super.localLogicalCluster();
        return logicalCluster;
    }

    /**
     * Bridge that lets lambdas in this class reach the superclass
     * {@code linkData()} implementation.
     *
     * @return the value returned by {@code super.linkData()}
     */
    private /* synthetic */ ClusterLinkData super$linkData() {
        ClusterLinkData data = super.linkData();
        return data;
    }

    /**
     * @return the map of outstanding reverse connection requests, keyed by request id
     */
    private ConcurrentHashMap<Object, ReverseClient> connectionRequests() {
        return connectionRequests;
    }

    /**
     * @return the counter used to allocate reverse connection request ids
     */
    private AtomicInteger nextReverseRequestId() {
        return nextReverseRequestId;
    }

    /**
     * @return the counter of open reverse channels that were accepted without a request id
     */
    private AtomicInteger persistentConnections() {
        return persistentConnections;
    }

    /**
     * @return the counter of all currently open reverse channels
     */
    private AtomicInteger activeReverseConnections() {
        return activeReverseConnections;
    }

    /**
     * @return the current (volatile) reverse connection admin client, or an empty option when none is set
     */
    private Option<ReverseClient> reverseConnectionAdmin() {
        return reverseConnectionAdmin;
    }

    /**
     * Replaces the (volatile) reverse connection admin client.
     *
     * @param x$1 the new value to store
     */
    private void reverseConnectionAdmin_$eq(Option<ReverseClient> x$1) {
        reverseConnectionAdmin = x$1;
    }

    /**
     * @return the holder of the local admin client
     */
    private LazyResource<ConfluentAdmin> localConnAdmin() {
        return localConnAdmin;
    }

    /**
     * @return the monitor object synchronized on while persistent connection state is updated
     */
    private Object persistentConnectionSourceLock() {
        return persistentConnectionSourceLock;
    }

    /**
     * Starts the manager after verifying that the initial configuration is not
     * in outbound connection mode.
     *
     * @throws IllegalStateException if the initial connection mode equals the outbound mode
     */
    @Override
    public void startup() {
        ConnectionMode configuredMode = this.initialConfig.connectionMode();
        ConnectionMode$Outbound$ outbound = ConnectionMode$Outbound$.MODULE$;
        // Null-safe equality, equivalent to the Scala `==` the original was compiled from.
        boolean isOutbound = configuredMode == null ? outbound == null : configuredMode.equals(outbound);
        if (isOutbound) {
            throw new IllegalStateException("Inbound connection manager created in outbound connection mode");
        }
        super.startup();
    }

    /**
     * Enables cluster link requests on the given client.
     *
     * <p>When the wrapped client is a {@link NetworkClient}, it is handed the link id,
     * the optional interceptor and the optional reverse connection provider (each passed
     * as {@code null} when absent). Any other client type is accepted only if the current
     * connection mode is not inbound.
     *
     * @param networkClient   wrapper around the client to enable
     * @param metadataManager optional admin metadata manager forwarded to the reverse connection provider
     * @throws IllegalStateException if the client is not a {@link NetworkClient} and the
     *                               current connection mode is inbound
     */
    @Override
    public void enableClusterLink(ClusterLinkNetworkClient networkClient, Option<AdminMetadataManager> metadataManager) {
        KafkaClient underlying = networkClient.networkClient();
        if (underlying instanceof NetworkClient) {
            NetworkClient client = (NetworkClient)underlying;
            client.enableClusterLinkRequests(
                super.linkData().linkId(),
                (ClientInterceptor)this.clientInterceptor.orNull(Predef$.MODULE$.$conforms()),
                (ReverseNode.ConnectionProvider)this.reverseConnectionProvider(client, metadataManager, networkClient.clientId()).orNull(Predef$.MODULE$.$conforms()));
            return;
        }
        // Not a NetworkClient: reverse connections cannot be served, which is only a problem in inbound mode.
        ConnectionMode currentMode = this.currentConfig().connectionMode();
        ConnectionMode$Inbound$ inbound = ConnectionMode$Inbound$.MODULE$;
        boolean isInbound = currentMode == null ? inbound == null : currentMode.equals(inbound);
        if (isInbound) {
            throw new IllegalStateException("Reverse connections are supported only with NetworkClient");
        }
    }

    /**
     * Builds the provider used by a {@link NetworkClient} to ask for a reverse connection.
     *
     * <p>A provider is only returned when the current connection mode is inbound. Each
     * invocation of the provider allocates a new request id from {@code nextReverseRequestId}
     * and calls {@code requestReverseConnection} with this broker's id as the destination.
     *
     * @param networkClient   client that will receive the reversed channels
     * @param metadataManager optional admin metadata manager passed to the {@link ReverseClient}
     * @param clientId        client id passed to the {@link ReverseClient}
     * @return the provider wrapped in {@code Some}, or {@code None} when the mode is not inbound
     */
    public Option<ReverseNode.ConnectionProvider> reverseConnectionProvider(NetworkClient networkClient, Option<AdminMetadataManager> metadataManager, String clientId) {
        ConnectionMode currentMode = this.currentConfig().connectionMode();
        ConnectionMode$Inbound$ inbound = ConnectionMode$Inbound$.MODULE$;
        // Null-safe equality, equivalent to the Scala `==` the original was compiled from.
        boolean isInbound = currentMode == null ? inbound == null : currentMode.equals(inbound);
        if (!isInbound) {
            return None$.MODULE$;
        }
        // Default value of ReverseClient's third constructor argument.
        Option<ClusterLinkAdminClient> defaultAdminClient = ReverseClient$.MODULE$.$lessinit$greater$default$3();
        ReverseClient reverseClient = new ReverseClient(networkClient, metadataManager, defaultAdminClient, clientId);
        // The decompiler emitted `$this.brokerConfig`; inside a lambda the enclosing instance is plain `this`.
        ReverseNode.ConnectionProvider provider = node -> this.requestReverseConnection(this.nextReverseRequestId().incrementAndGet(), reverseClient, node, this.brokerConfig.brokerId());
        return new Some((Object)provider);
    }

    /**
     * Accepts a reverse channel and attaches it to the client that is waiting for it.
     *
     * <p>A node without a request id is treated as a persistent connection: the remote admin
     * is created if needed and the channel goes to {@code reverseConnectionAdmin}. A node with
     * a request id is matched against (and removed from) {@code connectionRequests}.
     *
     * <p>The channel's close callback records the closed-connection sensors, decrements the
     * active connection count and, for persistent connections, clears the client's persistent
     * connection source once the persistent connection count drops to zero or below.
     *
     * @param channel2    the channel to reverse
     * @param reverseNode the node description received with the channel
     * @throws NetworkException if no client is waiting for this connection
     */
    @Override
    public void processReverseConnection(KafkaChannel channel2, ReverseNode reverseNode) {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("Process reverse connection in destination cluster : ").append(channel2).append(" ").append(reverseNode).toString());
        this.ensureReverseConnectionsEnabled();

        // No request id means the source opened this connection as a persistent one.
        final boolean persistent = !reverseNode.requestId().isPresent();
        Option pendingClient;
        if (persistent) {
            this.maybeCreateRemoteAdmin();
            pendingClient = this.reverseConnectionAdmin();
        } else {
            pendingClient = Option$.MODULE$.apply((Object)this.connectionRequests().remove(reverseNode.requestId().get()));
        }

        if (!(pendingClient instanceof Some)) {
            if (None$.MODULE$.equals(pendingClient)) {
                throw new NetworkException("Reverse connection is no longer required");
            }
            throw new MatchError((Object)pendingClient);
        }

        ReverseClient client = (ReverseClient)((Some)pendingClient).value();
        Consumer<KafkaChannel> closeCallback = channel -> {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(38).append("Reverse channel ").append(channel).append(" has been disconnected").toString());
            // The decompiler emitted `$this.metrics`; inside a lambda the enclosing instance is plain `this`.
            this.metrics.reverseConnectionClosedSensor().record();
            this.metrics.deprecatedReverseConnectionClosedSensor().record();
            this.activeReverseConnections().decrementAndGet();
            if (persistent) {
                synchronized (this.persistentConnectionSourceLock()) {
                    if (this.persistentConnections().decrementAndGet() <= 0) {
                        client.persistentConnectionSource_$eq((Option<Integer>)None$.MODULE$);
                        if (this.isLinkCoordinator()) {
                            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Persistent connection to source link coordinator was disconnected, awaiting new connection.");
                        }
                    }
                }
            }
        };

        this.activeReverseConnections().incrementAndGet();
        if (persistent) {
            // Count and source id are updated together so the close callback sees a consistent pair.
            synchronized (this.persistentConnectionSourceLock()) {
                this.persistentConnections().incrementAndGet();
                client.persistentConnectionSource_$eq((Option<Integer>)new Some((Object)Predef$.MODULE$.int2Integer(reverseNode.remoteBrokerId())));
            }
        }
        this.metrics.reverseConnectionCreatedSensor().record();
        this.metrics.deprecatedReverseConnectionCreatedSensor().record();

        ReverseChannel reverseChannel = new ReverseChannel(channel2, reverseNode, closeCallback);
        client.networkClient().reverseAndAdd(reverseChannel);
        client.bootstrapWithReverseChannel(reverseChannel, this.time.milliseconds());
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(64).append("Added reverse channel ").append(reverseChannel).append(" from source to network client, requestId=").append(reverseNode.requestId()).toString());
    }

    /**
     * Handles an initiate-reverse-connections request by forwarding it to the remote
     * link coordinator.
     *
     * <p>One future is created per request entry. Any failure raised while validating or
     * forwarding the request is logged and used to complete every future exceptionally;
     * it is not rethrown.
     *
     * @param initiateConnectionRequest the request to process
     * @param requestContext            the context of the request (not read by this method)
     * @return one future per entry of the request
     */
    @Override
    public Seq<CompletableFuture<Void>> initiateReverseConnections(InitiateReverseConnectionsRequest initiateConnectionRequest, RequestContext requestContext) {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(70).append("Initiate or forward reverse connection request: localCluster=").append(this.super$localLogicalCluster()).append(" request=").append(initiateConnectionRequest).toString());
        this.ensureReverseConnectionsEnabled();
        InitiateReverseConnectionsRequestData requestData = initiateConnectionRequest.data();
        List pendingFutures = (List)List$.MODULE$.fill(requestData.entries().size(), (Function0 & Serializable & scala.Serializable)() -> new CompletableFuture());
        try {
            String localCluster = super.localLogicalCluster();
            String sourceCluster = requestData.sourceClusterId();
            // Null-safe equality, equivalent to the Scala `==` the original was compiled from.
            boolean sourceIsLocal = localCluster == null ? sourceCluster == null : localCluster.equals(sourceCluster);
            if (sourceIsLocal) {
                throw new InvalidRequestException(new StringBuilder(70).append("Cannot initiate reverse connection from destination cluster ").append(super.localLogicalCluster()).append(" to itself").toString());
            }
            this.forwardToRemoteCoordinator(requestData, (Seq<CompletableFuture<Void>>)pendingFutures);
        }
        catch (Throwable failure) {
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Failing reverse connection request", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> failure);
            pendingFutures.foreach((Function1 & Serializable & scala.Serializable)pending -> BoxesRunTime.boxToBoolean((boolean)pending.completeExceptionally(failure)));
        }
        return pendingFutures;
    }

    private void requestReverseConnection(int requestId, ReverseClient client, Node sourceNode, int destBrokerId) {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(90).append("Requesting reverse connection for dest broker ").append(destBrokerId).append(" with requestId ").append(requestId).append(" to source node ").append(sourceNode).append(" for client ").append(client.clientId()).toString());
        this.ensureReverseConnectionsEnabled();
        if (this.reverseConnectionAdmin().exists((Function1 & Serializable & scala.Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkInboundConnectionManager.$anonfun$requestReverseConnection$2(client, x$5)))) {
            throw new NetworkException(new StringBuilder(79).append("Waiting for persistent connection to request reverse connection for request id ").append(requestId).toString());
        }
        InitiateReverseConnectionsRequestData.EntryData entry = new InitiateReverseConnectionsRequestData.EntryData().setInitiateRequestId(requestId).setSourceBrokerId(sourceNode.id()).setTargetBrokerId(destBrokerId);
        InitiateReverseConnectionsRequestData requestData = new InitiateReverseConnectionsRequestData().setClusterLinkId(this.linkId()).setForwardToBroker(true).setTimeoutMs(Predef$.MODULE$.Integer2int(this.currentConfig().reverseConnectionSetupTimeoutMs())).setSourceClusterId((String)super.linkData().clusterId().orNull(Predef$.MODULE$.$conforms())).setTargetClusterId(super.localLogicalCluster()).setEntries(Collections.singletonList(entry));
        this.connectionRequests().put(BoxesRunTime.boxToInteger((int)requestId), client);
        try {
            if (!this.isLinkCoordinator() && this.persistentConnectionCount() <= 0) {
                int linkCoordinatorId = BoxesRunTime.unboxToInt((Object)this.linkCoordinatorId().getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                    throw new CoordinatorNotAvailableException(new StringBuilder(43).append("Cluster link coordinator not available for ").append(this.super$linkData().linkName()).toString());
                }));
                ((KafkaFutureImpl)ConfluentAdminUtils.initiateReverseConnections((ConfluentAdmin)this.localConnAdmin().getOrCreate(true), (InitiateReverseConnectionsRequestData)requestData, (Integer)Predef$.MODULE$.int2Integer(linkCoordinatorId)).get(BoxesRunTime.boxToInteger((int)requestId))).whenComplete((x$7, e) -> this.onCompletion$1((Throwable)e, requestId, client, sourceNode));
                return;
            }
            CompletableFuture future = new CompletableFuture();
            this.forwardToRemoteCoordinator(requestData, (Seq<CompletableFuture<Void>>)new .colon.colon(future, (List)Nil$.MODULE$));
            future.whenComplete((x$6, e) -> this.onCompletion$1((Throwable)e, requestId, client, sourceNode));
        }
        catch (Throwable e2) {
            this.connectionRequests().remove(BoxesRunTime.boxToInteger((int)requestId));
            throw e2;
        }
    }

    /**
     * Sends the request to the remote link coordinator over the reverse connection admin
     * and wires each per-entry result into the matching caller-supplied future.
     *
     * <p>Entries and futures are paired by position; the result for an entry is looked up
     * by its initiate request id.
     *
     * @param requestData the request to forward
     * @param futures     futures to complete, in the same order as {@code requestData.entries()}
     * @throws NetworkException       if no remote admin is available while this broker is the link
     *                                coordinator, or if no persistent connection source is known yet
     * @throws NotControllerException if no remote admin is available and this broker is not the
     *                                link coordinator
     */
    private void forwardToRemoteCoordinator(InitiateReverseConnectionsRequestData requestData, Seq<CompletableFuture<Void>> futures) {
        boolean localIsCoordinator = this.isLinkCoordinator();
        ConfluentAdmin remoteAdmin = (ConfluentAdmin)this.reverseConnectionAdmin().flatMap((Function1 & Serializable & scala.Serializable)reverseClient -> reverseClient.adminClient().map((Function1 & Serializable & scala.Serializable)adminClient -> adminClient.admin())).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            if (!localIsCoordinator) {
                throw new NotControllerException("Request cannot be forwarded to remote link coordinator since this broker is not the local link coordinator.");
            }
            throw new NetworkException("Request cannot be forwarded to remote link coordinator at this time.");
        });
        Integer coordinatorId = (Integer)this.reverseConnectionAdmin().flatMap((Function1 & Serializable & scala.Serializable)reverseClient -> reverseClient.persistentConnectionSource()).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            throw new NetworkException("Request cannot be forwarded to remote link coordinator because persistent connection is not yet available");
        });
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(91).append("Forward initiate reverse connection request to remote link coordinator: ").append(requestData).append(" remoteCoordinator=").append(coordinatorId).toString());
        Map resultsByRequestId = ConfluentAdminUtils.initiateReverseConnections((ConfluentAdmin)remoteAdmin, (InitiateReverseConnectionsRequestData)requestData, (Integer)coordinatorId);
        ((IterableLike)((IterableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(requestData.entries()).asScala()).zip(futures, Buffer$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)pair -> {
            if (pair == null) {
                throw new MatchError(null);
            }
            InitiateReverseConnectionsRequestData.EntryData entry = (InitiateReverseConnectionsRequestData.EntryData)pair._1();
            CompletableFuture callerFuture = (CompletableFuture)pair._2();
            return ((KafkaFutureImpl)resultsByRequestId.get(BoxesRunTime.boxToInteger((int)entry.initiateRequestId()))).whenComplete((result, failure) -> {
                Void value = result;
                Throwable cause = failure;
                if (cause == null) {
                    this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(58).append("Completed InitiateReverseConnectionsRequest for requestId=").append(entry.initiateRequestId()).toString());
                    callerFuture.complete(value);
                    return;
                }
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(57).append("Initiate reverse connection request failed for requestId=").append(entry.initiateRequestId()).toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> cause);
                callerFuture.completeExceptionally(cause);
            });
        });
    }

    /**
     * Re-evaluates the reverse connection admin after a controller change.
     *
     * @param isActiveController not read; the coordinator state is re-queried instead
     */
    @Override
    public void onControllerChange(boolean isActiveController) {
        maybeProcessCoordinatorChange();
    }

    /**
     * Re-evaluates the reverse connection admin after the link metadata partition
     * leader changes.
     */
    @Override
    public void onLinkMetadataPartitionLeaderChange() {
        maybeProcessCoordinatorChange();
    }

    /**
     * Brings the reverse connection admin in line with this broker's coordinator role,
     * while holding {@code stateChangeLock()}.
     *
     * <p>If this broker is the link coordinator and has no admin, the admin is reset;
     * if it is not the coordinator but still has an admin, the admin is closed.
     * Otherwise nothing changes.
     */
    private void maybeProcessCoordinatorChange() {
        synchronized (this.stateChangeLock()) {
            boolean isCoordinator = this.isLinkCoordinator();
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(69).append("Process controller or metadata partition leader change isCoordinator=").append(isCoordinator).toString());
            boolean hasAdmin = !this.reverseConnectionAdmin().isEmpty();
            if (isCoordinator && !hasAdmin) {
                this.resetReverseConnectionAdmin();
            } else if (!isCoordinator && hasAdmin) {
                this.closeReverseConnectionAdmin();
            }
        }
    }

    /**
     * Closes the admin client held by the current reverse connection admin (if any), clears the
     * reverse connection admin reference, and, when this manager is not active, also closes the
     * lazily created local connection admin resource.
     *
     * <p>The admin client's {@code close()} is routed through {@code CoreUtils.swallow}
     * (presumably so a failure while closing is logged rather than propagated; confirm against
     * {@code CoreUtils}).
     */
    @Override
    public void closeReverseConnectionAdmin() {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Closing reverse connection admin");
        this.reverseConnectionAdmin()
            .flatMap((Function1 & Serializable & scala.Serializable)reverseClient -> reverseClient.adminClient())
            .foreach((Function1 & Serializable & scala.Serializable)adminClient -> {
                CoreUtils$.MODULE$.swallow((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> adminClient.close(), this, CoreUtils$.MODULE$.swallow$default$3());
                return BoxedUnit.UNIT;
            });
        this.reverseConnectionAdmin_$eq((Option<ReverseClient>)None$.MODULE$);
        if (this.isActive()) {
            // Still active: the local connection admin resource is left untouched.
            return;
        }
        this.localConnAdmin().closeResource();
    }

    /**
     * Logs the (re)creation request and delegates to {@code maybeCreateRemoteAdmin()}, which
     * only creates the admin when none is present and this instance is the link coordinator.
     */
    @Override
    public void createReverseConnectionAdmin() {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Recreate admin client used to initiate connection reversal requests");
        maybeCreateRemoteAdmin();
    }

    /**
     * Builds the local connection admin by applying {@code localConnAdminFactory} to this link's
     * name, after first calling {@code ensureReverseConnectionsEnabled()}.
     *
     * @return the admin produced by the factory, viewed as a {@link ConfluentAdmin}
     */
    private ConfluentAdmin createLocalConnAdmin() {
        this.ensureReverseConnectionsEnabled();
        final Object linkName = super.linkData().linkName();
        return (ConfluentAdmin)this.localConnAdminFactory.apply(linkName);
    }

    /**
     * Creates the remote admin and publishes it as the reverse connection admin, but only when
     * no reverse connection admin is present and this instance is the link coordinator.
     *
     * <p>Runs while holding the monitor of {@code stateChangeLock()}. The new
     * {@link ReverseClient} wraps the admin's network client, metadata manager, the admin
     * itself, and its client id.
     */
    private void maybeCreateRemoteAdmin() {
        synchronized (this.stateChangeLock()) {
            // Coordinator status is only queried when no admin is present (short-circuit).
            if (!this.reverseConnectionAdmin().isEmpty() || !this.isLinkCoordinator()) {
                return;
            }
            final ClusterLinkAdminClient remoteAdmin = (ClusterLinkAdminClient)this.remoteAdminFactory.apply((Object)this.currentConfig(), (Object)this);
            final ReverseClient reverseClient = new ReverseClient(remoteAdmin.networkClient(), (Option<AdminMetadataManager>)new Some((Object)remoteAdmin.metadataManager()), (Option<ClusterLinkAdminClient>)new Some((Object)remoteAdmin), remoteAdmin.clientId());
            this.reverseConnectionAdmin_$eq((Option<ReverseClient>)new Some((Object)reverseClient));
        }
    }

    /**
     * Exposes the network client of the current reverse connection admin.
     *
     * @return the admin's network client wrapped in an {@code Option}, or an empty
     *         {@code Option} when no reverse connection admin is set
     */
    public Option<NetworkClient> reverseConnectionClient() {
        return this.reverseConnectionAdmin()
            .map((Function1 & Serializable & scala.Serializable)reverseClient -> reverseClient.networkClient());
    }

    /**
     * Reports the persistent connection counter, but only on the link coordinator.
     *
     * @return the current value of {@code persistentConnections()} when this instance is the
     *         link coordinator, otherwise {@code 0}
     */
    @Override
    public int persistentConnectionCount() {
        return this.isLinkCoordinator() ? this.persistentConnections().get() : 0;
    }

    /**
     * Reports the active reverse connection counter.
     *
     * @return the current value of {@code activeReverseConnections()}
     */
    @Override
    public int reverseConnectionCount() {
        final int activeCount = this.activeReverseConnections().get();
        return activeCount;
    }

    /**
     * Returns the lazy resources exposed by this manager.
     *
     * @return a sequence built from a single cons cell whose head is {@code localConnAdmin()}
     *         and whose tail is {@code Nil}, i.e. one element
     */
    @Override
    public Seq<LazyResource<?>> lazyResources() {
        // NOTE(review): ".colon.colon" looks like the decompiler's rendering of Scala's list
        // cons class ("::"); it is not valid Java as written — confirm against the Scala source.
        return new .colon.colon(this.localConnAdmin(), (List)Nil$.MODULE$);
    }

    /**
     * Synthetic predicate: tests whether two reverse clients share the very same network client
     * instance (reference identity, not {@code equals}).
     *
     * @param client$2 the reverse client whose network client is the comparison target
     * @param x$5      the candidate reverse client being tested
     * @return {@code true} when both expose the identical network client object
     */
    public static final /* synthetic */ boolean $anonfun$requestReverseConnection$2(ReverseClient client$2, ReverseClient x$5) {
        final Object candidateNetworkClient = x$5.networkClient();
        final Object targetNetworkClient = client$2.networkClient();
        return candidateNetworkClient == targetNetworkClient;
    }

    /**
     * Completion handler for a reverse connection attempt.
     *
     * <p>On success ({@code e == null}) only a debug message is logged. On failure a warning is
     * logged with the cause, the request is removed from {@code connectionRequests()}, and the
     * client's network client is told about the failure for the given source node.
     *
     * @param e            failure cause, or {@code null} when the connection was created
     * @param requestId$1  id of the reverse connection request
     * @param client$2     reverse client whose network client is notified on failure
     * @param sourceNode$1 node passed to {@code processReverseConnectionFailure} on failure
     */
    private final void onCompletion$1(Throwable e, int requestId$1, ReverseClient client$2, Node sourceNode$1) {
        if (e == null) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(50).append("Reverse connection has been created for requestId=").append(requestId$1).toString());
            return;
        }
        this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(50).append("Failed to create reverse connection for requestId=").append(requestId$1).toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
        // Drop the pending request before notifying the network client of the failure.
        this.connectionRequests().remove(BoxesRunTime.boxToInteger((int)requestId$1));
        client$2.networkClient().processReverseConnectionFailure(sourceNode$1);
    }

    /**
     * Creates the inbound connection manager for one cluster link.
     *
     * <p>Stores the constructor arguments, passes the link data, initial config, local logical
     * cluster, metadata manager, metrics and broker config to the superclass constructor, and
     * then initialises the request map, counters, the (initially empty) reverse connection
     * admin, the lazily created local connection admin, and the log prefix.
     *
     * @param linkData              link data; its {@code linkName()} is used in the log prefix
     * @param initialConfig         initial link configuration, stored and passed to the superclass
     * @param localLogicalCluster   passed through to the superclass constructor
     * @param clientInterceptor     optional client interceptor, stored for later use
     * @param metrics               link metrics, stored and passed to the superclass
     * @param remoteAdminFactory    factory used to build the remote admin client
     * @param localConnAdminFactory factory used to build the local connection admin from a link name
     * @param metadataManager       passed through to the superclass constructor
     * @param brokerConfig          broker configuration; its {@code brokerId()} is used in the log prefix
     * @param time                  time source handed to the {@code LazyResource}
     */
    public ClusterLinkInboundConnectionManager(ClusterLinkData linkData, ClusterLinkConfig initialConfig, String localLogicalCluster, Option<ClientInterceptor> clientInterceptor, ClusterLinkMetrics metrics, Function2<ClusterLinkConfig, ClusterLinkInboundConnectionManager, ClusterLinkAdminClient> remoteAdminFactory, Function1<String, KafkaAdminClient> localConnAdminFactory, ClusterLinkMetadataManager metadataManager, KafkaConfig brokerConfig, Time time) {
        // NOTE(review): the decompiler emitted these field stores ahead of super(...); the
        // ordering mirrors the compiled bytecode — verify against the original Scala source.
        this.initialConfig = initialConfig;
        this.clientInterceptor = clientInterceptor;
        this.metrics = metrics;
        this.remoteAdminFactory = remoteAdminFactory;
        this.localConnAdminFactory = localConnAdminFactory;
        this.brokerConfig = brokerConfig;
        this.time = time;
        super(linkData, initialConfig, localLogicalCluster, metadataManager, metrics, brokerConfig);
        // Request map; onCompletion$1 removes entries keyed by boxed request id on failure.
        this.connectionRequests = new ConcurrentHashMap();
        // Seeded from the companion object's NextReverseRequestId value.
        this.nextReverseRequestId = ClusterLinkInboundConnectionManager$.MODULE$.NextReverseRequestId();
        // Both counters start at zero.
        this.persistentConnections = new AtomicInteger();
        this.activeReverseConnections = new AtomicInteger();
        // No reverse connection admin until one is created later.
        this.reverseConnectionAdmin = None$.MODULE$;
        // Lazy local admin: first function creates it via createLocalConnAdmin(), second closes it
        // with a zero timeout; the last supplier reads clientsMaxIdleMs from the current config
        // (presumably the idle limit after which the resource is released — confirm in LazyResource).
        this.localConnAdmin = new LazyResource((Function0 & Serializable & scala.Serializable)() -> this.createLocalConnAdmin(), (Function1 & Serializable & scala.Serializable)x$2 -> {
            x$2.close(Duration.ZERO);
            return BoxedUnit.UNIT;
        }, time, (Function0<Object>)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> this.currentConfig().clientsMaxIdleMs());
        this.persistentConnectionSourceLock = new Object();
        // Log prefix of the form "[ClusterLinkInboundConnectionManager-<linkName>-broker-<brokerId>] ".
        this.logIdent_$eq(new StringBuilder(47).append("[ClusterLinkInboundConnectionManager-").append(super.linkData().linkName()).append("-broker-").append(brokerConfig.brokerId()).append("] ").toString());
    }
}

