/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkAdminClient;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConnectionManager;
import kafka.server.link.ClusterLinkDestConnectionManager$;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkNetworkClient;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.server.link.ReverseClient;
import kafka.utils.CoreUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.InitiateReverseConnectionsRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.requests.InitiateReverseConnectionsRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.event.Level;
import scala.$less$colon$less;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.$colon$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\t%w!\u0002\u00192\u0011\u0003Ad!\u0002\u001e2\u0011\u0003Y\u0004\"\u0002\"\u0002\t\u0003\u0019\u0005b\u0002#\u0002\u0005\u0004%\t!\u0012\u0005\u0007%\u0006\u0001\u000b\u0011\u0002$\u0007\ti\n\u0004a\u0015\u0005\n=\u0016\u0011\t\u0011)A\u0005?\u0016D\u0001BZ\u0003\u0003\u0002\u0003\u0006Ia\u001a\u0005\nU\u0016\u0011\t\u0011)A\u0005WZD\u0001b^\u0003\u0003\u0002\u0003\u0006I\u0001\u001f\u0005\u000b\u0003\u001b)!\u0011!Q\u0001\n\u0005=\u0001BCA\u000b\u000b\t\u0005\t\u0015!\u0003\u0002\u0018!Q\u0011QE\u0003\u0003\u0002\u0003\u0006I!a\n\t\u0015\u0005eRA!A!\u0002\u0013\tY\u0004\u0003\u0006\u0002B\u0015\u0011\t\u0011)A\u0005\u0003\u0007B!\"a\u0013\u0006\u0005\u0003\u0005\u000b\u0011BA'\u0011\u0019\u0011U\u0001\"\u0001\u0002^!I\u00111O\u0003C\u0002\u0013%\u0011Q\u000f\u0005\t\u0003\u0017+\u0001\u0015!\u0003\u0002x!A\u0011QR\u0003C\u0002\u0013%Q\tC\u0004\u0002\u0010\u0016\u0001\u000b\u0011\u0002$\t\u0011\u0005EUA1A\u0005\n\u0015Cq!a%\u0006A\u0003%a\t\u0003\u0005\u0002\u0016\u0016\u0011\r\u0011\"\u0003F\u0011\u001d\t9*\u0002Q\u0001\n\u0019C\u0011\"!'\u0006\u0001\u0004%I!a'\t\u0013\u0005}U\u00011A\u0005\n\u0005\u0005\u0006\u0002CAW\u000b\u0001\u0006K!!(\t\u0013\u0005]V\u00011A\u0005\n\u0005e\u0006\"CA_\u000b\u0001\u0007I\u0011BA`\u0011!\t\u0019-\u0002Q!\n\u0005m\u0006bBAd\u000b\u0011\u0005\u0013\u0011\u001a\u0005\t\u0003K,A\u0011A\u0019\u0002h\"9!QB\u0003\u0005B\t=\u0001b\u0002B\u0013\u000b\u0011\u0005#q\u0005\u0005\b\u0005O*A\u0011\u0002B5\u0011\u001d\u0011\u0019)\u0002C\u0005\u0005\u000bCqAa'\u0006\t\u0003\u0012i\nC\u0004\u0003*\u0016!\tEa+\t\u000f\t5V\u0001\"\u0003\u0003,\"9!qV\u0003\u0005R\t-\u0006b\u0002BY\u000b\u0011E#1\u0016\u0005\b\u0005g+A\u0011\u0002BV\u0011\u001d\u0011),\u0002C\u0001\u0005oCqAa/\u0006\t\u0003\u0012i\fC\u0004\u0003@\u0016!\tE!0\t\u001b\t\u0005W\u0001%A\u0002\u0002\u0003%IAa1w\u00115\u0011)-\u0002I\u0001\u0004\u0003\u0005I\u0011\u0002BdK\u0006\u00013\t\\;ti\u0016\u0014H*\u001b8l\t\u0016\u001cHoQ8o]\u0016\u001cG/[8o\u
001b\u0006t\u0017mZ3s\u0015\t\u00114'\u0001\u0003mS:\\'B\u0001\u001b6\u0003\u0019\u0019XM\u001d<fe*\ta'A\u0003lC\u001a\\\u0017m\u0001\u0001\u0011\u0005e\nQ\"A\u0019\u0003A\rcWo\u001d;fe2Kgn\u001b#fgR\u001cuN\u001c8fGRLwN\\'b]\u0006<WM]\n\u0003\u0003q\u0002\"!\u0010!\u000e\u0003yR\u0011aP\u0001\u0006g\u000e\fG.Y\u0005\u0003\u0003z\u0012a!\u00118z%\u00164\u0017A\u0002\u001fj]&$h\bF\u00019\u0003QqU\r\u001f;SKZ,'o]3SKF,Xm\u001d;JIV\ta\t\u0005\u0002H!6\t\u0001J\u0003\u0002J\u0015\u00061\u0011\r^8nS\u000eT!a\u0013'\u0002\u0015\r|gnY;se\u0016tGO\u0003\u0002N\u001d\u0006!Q\u000f^5m\u0015\u0005y\u0015\u0001\u00026bm\u0006L!!\u0015%\u0003\u001b\u0005#x.\\5d\u0013:$XmZ3s\u0003UqU\r\u001f;SKZ,'o]3SKF,Xm\u001d;JI\u0002\u001a2!\u0002+X!\tIT+\u0003\u0002Wc\ta2\t\\;ti\u0016\u0014H*\u001b8l\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014\bC\u0001-\\\u001d\tI\u0014,\u0003\u0002[c\u0005\u00112\t\\;ti\u0016\u0014H*\u001b8l\r\u0006\u001cGo\u001c:z\u0013\taVLA\u000bEKN$8i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u000b\u0005i\u000b\u0014\u0001\u00037j].$\u0015\r^1\u0011\u0005\u0001\u001cW\"A1\u000b\u0005\t,\u0014A\u0001>l\u0013\t!\u0017MA\bDYV\u001cH/\u001a:MS:\\G)\u0019;b\u0013\tqV+A\u0007j]&$\u0018.\u00197D_:4\u0017n\u001a\t\u0003s!L!![\u0019\u0003#\rcWo\u001d;fe2Kgn[\"p]\u001aLw-A\nm_\u000e\fG\u000eT8hS\u000e\fGn\u00117vgR,'\u000f\u0005\u0002mg:\u0011Q.\u001d\t\u0003]zj\u0011a\u001c\u0006\u0003a^\na\u0001\u0010:p_Rt\u0014B\u0001:?\u0003\u0019\u0001&/\u001a3fM&\u0011A/\u001e\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005It\u0014B\u00016V\u0003E\u0019G.[3oi&sG/\u001a:dKB$xN\u001d\t\u0004{e\\\u0018B\u0001>?\u0005\u0019y\u0005\u000f^5p]B\u0019A0!\u0003\u000e\u0003uT!A`@\u0002\u000f\rd\u0017.\u001a8ug*\u0019a'!\u0001\u000b\t\u0005\r\u0011QA\u0001\u0007CB\f7\r[3\u000b\u0005\u0005\u001d\u0011aA8sO&\u0019\u00111B?\u0003#\rc\u0017.\u001a8u\u0013:$XM]2faR|'/A\u0004nKR\u0014\u0018nY:\u0011\u0007e\n\t\"C\u0002\u0002\u0014E\u0012!c\u00117vgR,'\u000fT5oW6+GO]5dg\u0006\u0011\"/Z7pi\u0016\fE-\\5o\r\u0006\u001cGo
\u001c:z!!i\u0014\u0011D4\u0002\u001e\u0005}\u0011bAA\u000e}\tIa)\u001e8di&|gN\r\t\u0003s\u0015\u00012!OA\u0011\u0013\r\t\u0019#\r\u0002\u0017\u00072,8\u000f^3s\u0019&t7.\u00113nS:\u001cE.[3oi\u0006)Bn\\2bY\u000e{gN\\!e[&tg)Y2u_JL\bCB\u001f\u0002*-\fi#C\u0002\u0002,y\u0012\u0011BR;oGRLwN\\\u0019\u0011\t\u0005=\u0012QG\u0007\u0003\u0003cQ1!a\r~\u0003\u0015\tG-\\5o\u0013\u0011\t9$!\r\u0003\u001d\r{gN\u001a7vK:$\u0018\tZ7j]\u0006yQ.\u001a;bI\u0006$\u0018-T1oC\u001e,'\u000fE\u0002:\u0003{I1!a\u00102\u0005i\u0019E.^:uKJd\u0015N\\6NKR\fG-\u0019;b\u001b\u0006t\u0017mZ3s\u00031\u0011'o\\6fe\u000e{gNZ5h!\u0011\t)%a\u0012\u000e\u0003MJ1!!\u00134\u0005-Y\u0015MZ6b\u0007>tg-[4\u0002\tQLW.\u001a\t\u0005\u0003\u001f\nI&\u0004\u0002\u0002R)!\u00111KA+\u0003\u0015)H/\u001b7t\u0015\r\t9f`\u0001\u0007G>lWn\u001c8\n\t\u0005m\u0013\u0011\u000b\u0002\u0005)&lW\r\u0006\f\u0002\u001e\u0005}\u0013\u0011MA2\u0003K\n9'!\u001b\u0002l\u00055\u0014qNA9\u0011\u0015q\u0006\u00031\u0001`\u0011\u00151\u0007\u00031\u0001h\u0011\u0015Q\u0007\u00031\u0001l\u0011\u00159\b\u00031\u0001y\u0011\u001d\ti\u0001\u0005a\u0001\u0003\u001fAq!!\u0006\u0011\u0001\u0004\t9\u0002C\u0004\u0002&A\u0001\r!a\n\t\u000f\u0005e\u0002\u00031\u0001\u0002<!9\u0011\u0011\t\tA\u0002\u0005\r\u0003bBA&!\u0001\u0007\u0011QJ\u0001\u0013G>tg.Z2uS>t'+Z9vKN$8/\u0006\u0002\u0002xAA\u0011\u0011PA>\u0003\u007f\n))D\u0001K\u0013\r\tiH\u0013\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\bcA\u001f\u0002\u0002&\u0019\u00111\u0011 
\u0003\u0007%sG\u000fE\u0002:\u0003\u000fK1!!#2\u00055\u0011VM^3sg\u0016\u001cE.[3oi\u0006\u00192m\u001c8oK\u000e$\u0018n\u001c8SKF,Xm\u001d;tA\u0005!b.\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012\fQC\\3yiJ+g/\u001a:tKJ+\u0017/^3ti&#\u0007%A\u000bqKJ\u001c\u0018n\u001d;f]R\u001cuN\u001c8fGRLwN\\:\u0002-A,'o]5ti\u0016tGoQ8o]\u0016\u001cG/[8og\u0002\n\u0001$Y2uSZ,'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t\u0003e\t7\r^5wKJ+g/\u001a:tK\u000e{gN\\3di&|gn\u001d\u0011\u0002-I,g/\u001a:tK\u000e{gN\\3di&|g.\u00113nS:,\"!!(\u0011\tuJ\u0018QQ\u0001\u001be\u00164XM]:f\u0007>tg.Z2uS>t\u0017\tZ7j]~#S-\u001d\u000b\u0005\u0003G\u000bI\u000bE\u0002>\u0003KK1!a*?\u0005\u0011)f.\u001b;\t\u0013\u0005-&$!AA\u0002\u0005u\u0015a\u0001=%c\u00059\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8BI6Lg\u000e\t\u0015\u00047\u0005E\u0006cA\u001f\u00024&\u0019\u0011Q\u0017 \u0003\u0011Y|G.\u0019;jY\u0016\fa\u0002\\8dC2\u001cuN\u001c8BI6Lg.\u0006\u0002\u0002<B!Q(_A\u0017\u0003IawnY1m\u0007>tg.\u00113nS:|F%Z9\u0015\t\u0005\r\u0016\u0011\u0019\u0005\n\u0003Wk\u0012\u0011!a\u0001\u0003w\u000bq\u0002\\8dC2\u001cuN\u001c8BI6Lg\u000e\t\u0015\u0004=\u0005E\u0016!E3oC\ndWm\u00117vgR,'\u000fT5oWR1\u00111UAf\u0003+Dq!!4 
\u0001\u0004\ty-A\u0007oKR<xN]6DY&,g\u000e\u001e\t\u0004s\u0005E\u0017bAAjc\tA2\t\\;ti\u0016\u0014H*\u001b8l\u001d\u0016$xo\u001c:l\u00072LWM\u001c;\t\u000f\u0005er\u00041\u0001\u0002XB!Q(_Am!\u0011\tY.!9\u000e\u0005\u0005u'\u0002BAp\u0003c\t\u0011\"\u001b8uKJt\u0017\r\\:\n\t\u0005\r\u0018Q\u001c\u0002\u0015\u0003\u0012l\u0017N\\'fi\u0006$\u0017\r^1NC:\fw-\u001a:\u00023I,g/\u001a:tK\u000e{gN\\3di&|g\u000e\u0015:pm&$WM\u001d\u000b\t\u0003S\fyPa\u0002\u0003\nA!Q(_Av!\u0011\ti/!?\u000f\t\u0005=\u0018Q_\u0007\u0003\u0003cTA!a=\u0002V\u00059a.\u001a;x_J\\\u0017\u0002BA|\u0003c\f1BU3wKJ\u001cXMT8eK&!\u00111`A\u007f\u0005I\u0019uN\u001c8fGRLwN\u001c)s_ZLG-\u001a:\u000b\t\u0005]\u0018\u0011\u001f\u0005\b\u0003\u001b\u0004\u0003\u0019\u0001B\u0001!\ra(1A\u0005\u0004\u0005\u000bi(!\u0004(fi^|'o[\"mS\u0016tG\u000fC\u0004\u0002:\u0001\u0002\r!a6\t\r\t-\u0001\u00051\u0001l\u0003!\u0019G.[3oi&#\u0017\u0001\u00079s_\u000e,7o\u001d*fm\u0016\u00148/Z\"p]:,7\r^5p]R1\u00111\u0015B\t\u00057AqAa\u0005\"\u0001\u0004\u0011)\"A\u0004dQ\u0006tg.\u001a7\u0011\t\u0005=(qC\u0005\u0005\u00053\t\tP\u0001\u0007LC\u001a\\\u0017m\u00115b]:,G\u000eC\u0004\u0003\u001e\u0005\u0002\rAa\b\u0002\u0017I,g/\u001a:tK:{G-\u001a\t\u0005\u0003_\u0014\t#\u0003\u0003\u0003$\u0005E(a\u0003*fm\u0016\u00148/\u001a(pI\u0016\f!$\u001b8ji&\fG/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]N$bA!\u000b\u0003N\tu\u0003C\u0002B\u0016\u0005k\u0011YD\u0004\u0003\u0003.\tEbb\u00018\u00030%\tq(C\u0002\u00034y\nq\u0001]1dW\u0006<W-\u0003\u0003\u00038\te\"aA*fc*\u0019!1\u0007 
\u0011\r\u0005e$Q\bB!\u0013\r\u0011yD\u0013\u0002\u0012\u0007>l\u0007\u000f\\3uC\ndWMR;ukJ,\u0007\u0003\u0002B\"\u0005\u0013j!A!\u0012\u000b\u0007\t\u001dc*\u0001\u0003mC:<\u0017\u0002\u0002B&\u0005\u000b\u0012AAV8jI\"9!q\n\u0012A\u0002\tE\u0013!G5oSRL\u0017\r^3D_:tWm\u0019;j_:\u0014V-];fgR\u0004BAa\u0015\u0003Z5\u0011!Q\u000b\u0006\u0005\u0005/\n)&\u0001\u0005sKF,Xm\u001d;t\u0013\u0011\u0011YF!\u0016\u0003C%s\u0017\u000e^5bi\u0016\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\:SKF,Xm\u001d;\t\u000f\t}#\u00051\u0001\u0003b\u0005q!/Z9vKN$8i\u001c8uKb$\b\u0003\u0002B*\u0005GJAA!\u001a\u0003V\tq!+Z9vKN$8i\u001c8uKb$\u0018\u0001\u0007:fcV,7\u000f\u001e*fm\u0016\u00148/Z\"p]:,7\r^5p]RQ\u00111\u0015B6\u0005_\u0012\u0019Ha \t\u000f\t54\u00051\u0001\u0002\u0000\u0005I!/Z9vKN$\u0018\n\u001a\u0005\b\u0005c\u001a\u0003\u0019AAC\u0003\u0019\u0019G.[3oi\"9!QO\u0012A\u0002\t]\u0014AC:pkJ\u001cWMT8eKB!!\u0011\u0010B>\u001b\t\t)&\u0003\u0003\u0003~\u0005U#\u0001\u0002(pI\u0016DqA!!$\u0001\u0004\ty(\u0001\u0007eKN$(I]8lKJLE-\u0001\u000eg_J<\u0018M\u001d3U_J+Wn\u001c;f\u0007>|'\u000fZ5oCR|'\u000f\u0006\u0004\u0002$\n\u001d%q\u0013\u0005\b\u0005\u0013#\u0003\u0019\u0001BF\u0003-\u0011X-];fgR$\u0015\r^1\u0011\t\t5%1S\u0007\u0003\u0005\u001fSAA!%\u0002V\u00059Q.Z:tC\u001e,\u0017\u0002\u0002BK\u0005\u001f\u0013Q%\u00138ji&\fG/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]N\u0014V-];fgR$\u0015\r^1\t\u000f\teE\u00051\u0001\u0003*\u00059a-\u001e;ve\u0016\u001c\u0018AE8o\u0007>tGO]8mY\u0016\u00148\t[1oO\u0016$B!a)\u0003 
\"9!\u0011U\u0013A\u0002\t\r\u0016AE5t\u0003\u000e$\u0018N^3D_:$(o\u001c7mKJ\u00042!\u0010BS\u0013\r\u00119K\u0010\u0002\b\u0005>|G.Z1o\u0003\rzg\u000eT5oW6+G/\u00193bi\u0006\u0004\u0016M\u001d;ji&|g\u000eT3bI\u0016\u00148\t[1oO\u0016$\"!a)\u0002;5\f\u0017PY3Qe>\u001cWm]:D_>\u0014H-\u001b8bi>\u00148\t[1oO\u0016\f1d\u00197pg\u0016\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\!e[&t\u0017\u0001H2sK\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0003\u0012l\u0017N\\\u0001\u0017[\u0006L(-Z\"sK\u0006$XMU3n_R,\u0017\tZ7j]\u00069\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8DY&,g\u000e^\u000b\u0003\u0005s\u0003B!P=\u0003\u0002\u0005I\u0002/\u001a:tSN$XM\u001c;D_:tWm\u0019;j_:\u001cu.\u001e8u+\t\ty(\u0001\fsKZ,'o]3D_:tWm\u0019;j_:\u001cu.\u001e8u\u0003e\u0019X\u000f]3sI1|7-\u00197M_\u001eL7-\u00197DYV\u001cH/\u001a:\u0016\u0003-\fab];qKJ$C.\u001b8l\t\u0006$\u0018-F\u0001`\u0001")
public class ClusterLinkDestConnectionManager
extends ClusterLinkConnectionManager
implements ClusterLinkFactory.DestConnectionManager {
    private final Option<ClientInterceptor> clientInterceptor;
    private final ClusterLinkMetrics metrics;
    private final Function2<ClusterLinkConfig, ClusterLinkDestConnectionManager, ClusterLinkAdminClient> remoteAdminFactory;
    private final Function1<String, ConfluentAdmin> localConnAdminFactory;
    private final KafkaConfig brokerConfig;
    private final Time time;
    private final ConcurrentHashMap<Object, ReverseClient> connectionRequests;
    private final AtomicInteger nextReverseRequestId;
    private final AtomicInteger persistentConnections;
    private final AtomicInteger activeReverseConnections;
    private volatile Option<ReverseClient> reverseConnectionAdmin;
    private volatile Option<ConfluentAdmin> localConnAdmin;

    public static AtomicInteger NextReverseRequestId() {
        return ClusterLinkDestConnectionManager$.MODULE$.NextReverseRequestId();
    }

    private /* synthetic */ String super$localLogicalCluster() {
        return super.localLogicalCluster();
    }

    private /* synthetic */ ClusterLinkData super$linkData() {
        return super.linkData();
    }

    private ConcurrentHashMap<Object, ReverseClient> connectionRequests() {
        return this.connectionRequests;
    }

    private AtomicInteger nextReverseRequestId() {
        return this.nextReverseRequestId;
    }

    private AtomicInteger persistentConnections() {
        return this.persistentConnections;
    }

    private AtomicInteger activeReverseConnections() {
        return this.activeReverseConnections;
    }

    private Option<ReverseClient> reverseConnectionAdmin() {
        return this.reverseConnectionAdmin;
    }

    private void reverseConnectionAdmin_$eq(Option<ReverseClient> x$1) {
        this.reverseConnectionAdmin = x$1;
    }

    private Option<ConfluentAdmin> localConnAdmin() {
        return this.localConnAdmin;
    }

    private void localConnAdmin_$eq(Option<ConfluentAdmin> x$1) {
        this.localConnAdmin = x$1;
    }

    @Override
    public void enableClusterLink(ClusterLinkNetworkClient networkClient, Option<AdminMetadataManager> metadataManager) {
        KafkaClient kafkaClient = networkClient.networkClient();
        if (kafkaClient instanceof NetworkClient) {
            NetworkClient networkClient2 = (NetworkClient)kafkaClient;
            networkClient2.enableDestinationClusterLink(super.linkData().linkId(), (ClientInterceptor)this.clientInterceptor.orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()), (ReverseNode.ConnectionProvider)this.reverseConnectionProvider(networkClient2, metadataManager, networkClient.clientId()).orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()));
            return;
        }
        ConnectionMode connectionMode = this.currentConfig().connectionMode();
        ConnectionMode$Inbound$ connectionMode$Inbound$ = ConnectionMode$Inbound$.MODULE$;
        if (connectionMode == null) {
            return;
        }
        if (connectionMode.equals(connectionMode$Inbound$)) {
            throw new IllegalStateException("Reverse connections are supported only with NetworkClient");
        }
    }

    public Option<ReverseNode.ConnectionProvider> reverseConnectionProvider(NetworkClient networkClient, Option<AdminMetadataManager> metadataManager, String clientId) {
        ConnectionMode connectionMode = this.currentConfig().connectionMode();
        ConnectionMode$Inbound$ connectionMode$Inbound$ = ConnectionMode$Inbound$.MODULE$;
        if (connectionMode != null && connectionMode.equals(connectionMode$Inbound$)) {
            None$ x$4 = None$.MODULE$;
            ReverseClient reverseClient = new ReverseClient(networkClient, metadataManager, (Option<ClusterLinkAdminClient>)x$4, clientId);
            ReverseNode.ConnectionProvider provider = node -> this.requestReverseConnection(this.nextReverseRequestId().incrementAndGet(), reverseClient, node, $this.brokerConfig.brokerId());
            return new Some((Object)provider);
        }
        return None$.MODULE$;
    }

    @Override
    public void processReverseConnection(KafkaChannel channel2, ReverseNode reverseNode) {
        boolean bl;
        Option option;
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(53).append("Process reverse connection in destination cluster : ").append(channel2).append(" ").append(reverseNode).toString());
        this.ensureReverseConnectionsEnabled();
        if (!reverseNode.requestId().isPresent()) {
            this.maybeCreateRemoteAdmin();
            option = this.reverseConnectionAdmin();
            bl = true;
        } else {
            option = Option$.MODULE$.apply((Object)this.connectionRequests().remove(reverseNode.requestId().get()));
            bl = false;
        }
        boolean bl2 = bl;
        Option option2 = option;
        if (option2 instanceof Some) {
            ReverseClient client = (ReverseClient)((Some)option2).value();
            Consumer<KafkaChannel> closeCallback = channel -> {
                this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(38).append("Reverse channel ").append(channel).append(" has been disconnected").toString());
                $this.metrics.reverseConnectionClosedSensor().record();
                this.activeReverseConnections().decrementAndGet();
                if (bl2) {
                    if (this.persistentConnections().decrementAndGet() <= 0) {
                        client.persistentConnectionSource_$eq((Option<Integer>)None$.MODULE$);
                        if (this.isLinkCoordinator()) {
                            this.info((Function0<String>)(Function0 & Serializable)() -> "Persistent connection to source link coordinator was disconnected, awaiting new connection.");
                            return;
                        }
                        return;
                    }
                    return;
                }
            };
            this.activeReverseConnections().incrementAndGet();
            if (bl2) {
                client.persistentConnectionSource_$eq((Option<Integer>)new Some((Object)Predef$.MODULE$.int2Integer(reverseNode.remoteBrokerId())));
                this.persistentConnections().incrementAndGet();
            }
            this.metrics.reverseConnectionCreatedSensor().record();
            ReverseChannel reverseChannel = new ReverseChannel(channel2, reverseNode, closeCallback);
            client.networkClient().reverseAndAdd(reverseChannel);
            client.bootstrapWithReverseChannel(reverseChannel, this.time.milliseconds());
            this.info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(64).append("Added reverse channel ").append(reverseChannel).append(" from source to network client, requestId=").append(reverseNode.requestId()).toString());
            return;
        }
        if (None$.MODULE$.equals(option2)) {
            throw new NetworkException("Reverse connection is no longer required");
        }
        throw new MatchError((Object)option2);
    }

    public Seq<CompletableFuture<Void>> initiateReverseConnections(InitiateReverseConnectionsRequest initiateConnectionRequest, RequestContext requestContext) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(70).append("Initiate or forward reverse connection request: localCluster=").append(this.super$localLogicalCluster()).append(" request=").append(initiateConnectionRequest).toString());
        this.ensureReverseConnectionsEnabled();
        InitiateReverseConnectionsRequestData connData = initiateConnectionRequest.data();
        List futures = (List)package$.MODULE$.List().fill(connData.entries().size(), (Function0 & Serializable)() -> new CompletableFuture());
        try {
            String string = super.localLogicalCluster();
            String string2 = connData.sourceClusterId();
            if (!(string != null ? !string.equals(string2) : string2 != null)) {
                throw new InvalidRequestException(new StringBuilder(70).append("Cannot initiate reverse connection from destination cluster ").append(super.localLogicalCluster()).append(" to itself").toString());
            }
            this.forwardToRemoteCoordinator(connData, (Seq<CompletableFuture<Void>>)futures);
        }
        catch (Throwable e) {
            this.error((Function0<String>)(Function0 & Serializable)() -> "Failing reverse connection request", (Function0<Throwable>)(Function0 & Serializable)() -> e);
            futures.foreach((Function1 & Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)x$2.completeExceptionally(e)));
        }
        return futures;
    }

    private void requestReverseConnection(int requestId, ReverseClient client, Node sourceNode, int destBrokerId) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(90).append("Requesting reverse connection for dest broker ").append(destBrokerId).append(" with requestId ").append(requestId).append(" to source node ").append(sourceNode).append(" for client ").append(client.clientId()).toString());
        this.ensureReverseConnectionsEnabled();
        if (this.reverseConnectionAdmin().exists((Function1 & Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDestConnectionManager.$anonfun$requestReverseConnection$2(client, x$3)))) {
            throw new NetworkException(new StringBuilder(79).append("Waiting for persistent connection to request reverse connection for request id ").append(requestId).toString());
        }
        InitiateReverseConnectionsRequestData.EntryData entry = new InitiateReverseConnectionsRequestData.EntryData().setInitiateRequestId(requestId).setSourceBrokerId(sourceNode.id()).setTargetBrokerId(destBrokerId);
        InitiateReverseConnectionsRequestData requestData = new InitiateReverseConnectionsRequestData().setClusterLinkId(this.linkId()).setForwardToBroker(true).setTimeoutMs(Predef$.MODULE$.Integer2int(this.currentConfig().reverseConnectionSetupTimeoutMs())).setSourceClusterId((String)super.linkData().clusterId().orNull((.less.colon.less)$less$colon$less$.MODULE$.refl())).setTargetClusterId(super.localLogicalCluster()).setEntries(Collections.singletonList(entry));
        if (this.isLinkCoordinator() || this.persistentConnectionCount() > 0) {
            CompletableFuture future = new CompletableFuture();
            this.forwardToRemoteCoordinator(requestData, (Seq<CompletableFuture<Void>>)new .colon.colon(future, (List)Nil$.MODULE$));
            future.whenComplete((x$4, e) -> this.onCompletion$1((Throwable)e, requestId, client, sourceNode));
        } else {
            int linkCoordinatorId = BoxesRunTime.unboxToInt((Object)this.linkCoordinator().map((Function1 & Serializable)x$5 -> BoxesRunTime.boxToInteger((int)x$5.id())).getOrElse((Function0 & Serializable)() -> {
                throw new CoordinatorNotAvailableException(new StringBuilder(43).append("Cluster link coordinator not available for ").append(this.super$linkData().linkName()).toString());
            }));
            ((KafkaFutureImpl)ConfluentAdminUtils.initiateReverseConnections((ConfluentAdmin)((ConfluentAdmin)this.localConnAdmin().getOrElse((Function0 & Serializable)() -> {
                throw new IllegalStateException("Connection admin not created");
            })), (InitiateReverseConnectionsRequestData)requestData, (Integer)Predef$.MODULE$.int2Integer(linkCoordinatorId)).get(BoxesRunTime.boxToInteger((int)requestId))).whenComplete((x$6, e) -> this.onCompletion$1((Throwable)e, requestId, client, sourceNode)).toCompletionStage().toCompletableFuture();
        }
        this.connectionRequests().put(BoxesRunTime.boxToInteger((int)requestId), client);
    }

    private void forwardToRemoteCoordinator(InitiateReverseConnectionsRequestData requestData, Seq<CompletableFuture<Void>> futures) {
        boolean isLinkCoordinator = this.isLinkCoordinator();
        ConfluentAdmin admin = (ConfluentAdmin)this.reverseConnectionAdmin().flatMap((Function1 & Serializable)x$7 -> x$7.adminClient().map((Function1 & Serializable)x$8 -> x$8.admin())).getOrElse((Function0 & Serializable)() -> {
            if (isLinkCoordinator) {
                throw new NetworkException("Request cannot be forwarded to remote link coordinator at this time.");
            }
            throw new NotControllerException("Request cannot be forwarded to remote link coordinator since this broker is not the local link coordinator.");
        });
        Integer remoteCoordinator = (Integer)this.reverseConnectionAdmin().flatMap((Function1 & Serializable)x$9 -> x$9.persistentConnectionSource()).getOrElse((Function0 & Serializable)() -> {
            throw new NetworkException("Request cannot be forwarded to remote link coordinator because persistent connection is not yet available");
        });
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(91).append("Forward initiate reverse connection request to remote link coordinator: ").append(requestData).append(" remoteCoordinator=").append(remoteCoordinator).toString());
        Map requestFutures = ConfluentAdminUtils.initiateReverseConnections((ConfluentAdmin)admin, (InitiateReverseConnectionsRequestData)requestData, (Integer)remoteCoordinator);
        ((IterableOnceOps)CollectionConverters$.MODULE$.ListHasAsScala(requestData.entries()).asScala().zip(futures)).foreach((Function1 & Serializable)x0$1 -> {
            if (x0$1 != null) {
                InitiateReverseConnectionsRequestData.EntryData entry = (InitiateReverseConnectionsRequestData.EntryData)x0$1._1();
                CompletableFuture future = (CompletableFuture)x0$1._2();
                return ((KafkaFutureImpl)requestFutures.get(BoxesRunTime.boxToInteger((int)entry.initiateRequestId()))).whenComplete((x0$2, x1$1) -> {
                    if (x1$1 != null) {
                        this.warn((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(57).append("Initiate reverse connection request failed for requestId=").append(entry.initiateRequestId()).toString(), (Function0<Throwable>)(Function0 & Serializable)() -> x1$1);
                        future.completeExceptionally((Throwable)x1$1);
                        return;
                    }
                    this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(58).append("Completed InitiateReverseConnectionsRequest for requestId=").append(entry.initiateRequestId()).toString());
                    future.complete(x0$2);
                });
            }
            throw new MatchError(null);
        });
    }

    @Override
    public void onControllerChange(boolean isActiveController) {
        this.maybeProcessCoordinatorChange();
    }

    @Override
    public void onLinkMetadataPartitionLeaderChange() {
        this.maybeProcessCoordinatorChange();
    }

    private void maybeProcessCoordinatorChange() {
        Object object = this.stateChangeLock();
        synchronized (object) {
            boolean isCoordinator = this.isLinkCoordinator();
            this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(69).append("Process controller or metadata partition leader change isCoordinator=").append(isCoordinator).toString());
            if (this.reverseConnectionAdmin().isEmpty()) {
                if (isCoordinator) {
                    this.resetReverseConnectionAdmin();
                }
            } else if (!isCoordinator) {
                this.closeReverseConnectionAdmin();
            }
            return;
        }
    }

    @Override
    public void closeReverseConnectionAdmin() {
        this.debug((Function0<String>)(Function0 & Serializable)() -> "Closing reverse connection admin");
        this.reverseConnectionAdmin().flatMap((Function1 & Serializable)x$10 -> x$10.adminClient()).foreach((Function1 & Serializable)admin -> {
            CoreUtils$.MODULE$.swallow((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> admin.close(), this, Level.WARN);
            return BoxedUnit.UNIT;
        });
        this.reverseConnectionAdmin_$eq((Option<ReverseClient>)None$.MODULE$);
        if (!this.isActive()) {
            this.localConnAdmin().foreach((Function1 & Serializable)x$11 -> {
                x$11.close(Duration.ZERO);
                return BoxedUnit.UNIT;
            });
            this.localConnAdmin_$eq((Option<ConfluentAdmin>)None$.MODULE$);
            return;
        }
    }

    @Override
    public void createReverseConnectionAdmin() {
        this.debug((Function0<String>)(Function0 & Serializable)() -> "Recreate admin client used to initiate connection reversal requests");
        if (this.localConnAdmin().isEmpty()) {
            this.localConnAdmin_$eq((Option<ConfluentAdmin>)new Some(this.localConnAdminFactory.apply((Object)super.linkData().linkName())));
        }
        if (this.isLinkCoordinator()) {
            this.maybeCreateRemoteAdmin();
            return;
        }
    }

    private void maybeCreateRemoteAdmin() {
        Object object = this.stateChangeLock();
        synchronized (object) {
            if (this.reverseConnectionAdmin().isEmpty()) {
                ClusterLinkAdminClient admin = (ClusterLinkAdminClient)this.remoteAdminFactory.apply((Object)this.currentConfig(), (Object)this);
                this.reverseConnectionAdmin_$eq((Option<ReverseClient>)new Some((Object)new ReverseClient(admin.networkClient(), (Option<AdminMetadataManager>)new Some((Object)admin.metadataManager()), (Option<ClusterLinkAdminClient>)new Some((Object)admin), admin.clientId())));
            }
            return;
        }
    }

    public Option<NetworkClient> reverseConnectionClient() {
        return this.reverseConnectionAdmin().map((Function1 & Serializable)x$12 -> x$12.networkClient());
    }

    @Override
    public int persistentConnectionCount() {
        if (this.isLinkCoordinator()) {
            return this.persistentConnections().get();
        }
        return 0;
    }

    @Override
    public int reverseConnectionCount() {
        return this.activeReverseConnections().get();
    }

    public static final /* synthetic */ boolean $anonfun$requestReverseConnection$2(ReverseClient client$2, ReverseClient x$3) {
        return x$3.networkClient() == client$2.networkClient();
    }

    private final void onCompletion$1(Throwable e, int requestId$1, ReverseClient client$2, Node sourceNode$1) {
        if (e != null) {
            this.warn((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(50).append("Failed to create reverse connection for requestId=").append(requestId$1).toString(), (Function0<Throwable>)(Function0 & Serializable)() -> e);
            this.connectionRequests().remove(BoxesRunTime.boxToInteger((int)requestId$1));
            client$2.networkClient().processReverseConnectionFailure(sourceNode$1);
            return;
        }
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(50).append("Reverse connection has been created for requestId=").append(requestId$1).toString());
    }

    public ClusterLinkDestConnectionManager(ClusterLinkData linkData, ClusterLinkConfig initialConfig, String localLogicalCluster, Option<ClientInterceptor> clientInterceptor, ClusterLinkMetrics metrics, Function2<ClusterLinkConfig, ClusterLinkDestConnectionManager, ClusterLinkAdminClient> remoteAdminFactory, Function1<String, ConfluentAdmin> localConnAdminFactory, ClusterLinkMetadataManager metadataManager, KafkaConfig brokerConfig, Time time) {
        this.clientInterceptor = clientInterceptor;
        this.metrics = metrics;
        this.remoteAdminFactory = remoteAdminFactory;
        this.localConnAdminFactory = localConnAdminFactory;
        this.brokerConfig = brokerConfig;
        this.time = time;
        super(linkData, initialConfig, localLogicalCluster, metadataManager, metrics);
        this.connectionRequests = new ConcurrentHashMap();
        this.nextReverseRequestId = ClusterLinkDestConnectionManager$.MODULE$.NextReverseRequestId();
        this.persistentConnections = new AtomicInteger();
        this.activeReverseConnections = new AtomicInteger();
        this.reverseConnectionAdmin = None$.MODULE$;
        this.localConnAdmin = None$.MODULE$;
        this.logIdent_$eq(new StringBuilder(44).append("[ClusterLinkDestConnectionManager-").append(super.linkData().linkName()).append("-broker-").append(brokerConfig.brokerId()).append("] ").toString());
    }
}

