/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.io.File;
import java.io.Serializable;
import java.lang.invoke.LambdaMetafactory;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.link.ClusterLinkAdminClient;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkInboundConnectionManager;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkScheduler;
import kafka.server.link.ConnectionMode;
import kafka.utils.LogCaptureAppender;
import kafka.utils.LogCaptureAppender$;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.MockTime;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.ArrayOps$;
import scala.collection.Set;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0005\tEh\u0001B A\u0001\u001dCQA\u0014\u0001\u0005\u0002=CqA\u0015\u0001C\u0002\u0013%1\u000b\u0003\u0004Y\u0001\u0001\u0006I\u0001\u0016\u0005\b3\u0002\u0011\r\u0011\"\u0003[\u0011\u0019\u0019\u0007\u0001)A\u00057\"9A\r\u0001b\u0001\n\u0013)\u0007BB9\u0001A\u0003%a\rC\u0004s\u0001\t\u0007I\u0011\u0002.\t\rM\u0004\u0001\u0015!\u0003\\\u0011\u001d!\bA1A\u0005\niCa!\u001e\u0001!\u0002\u0013Y\u0006b\u0002<\u0001\u0005\u0004%Ia\u001e\u0005\u0007}\u0002\u0001\u000b\u0011\u0002=\t\u0011}\u0004!\u0019!C\u0005\u0003\u0003A\u0001\"a\u0004\u0001A\u0003%\u00111\u0001\u0005\n\u0003#\u0001!\u0019!C\u0005\u0003'A\u0001\"a\u0007\u0001A\u0003%\u0011Q\u0003\u0005\n\u0003;\u0001!\u0019!C\u0005\u0003?A\u0001\"a\n\u0001A\u0003%\u0011\u0011\u0005\u0005\n\u0003S\u0001!\u0019!C\u0005\u0003WA\u0001\"a\r\u0001A\u0003%\u0011Q\u0006\u0005\n\u0003k\u0001!\u0019!C\u0005\u0003oA\u0001\"!\u0012\u0001A\u0003%\u0011\u0011\b\u0005\n\u0003\u000f\u0002!\u0019!C\u0005\u0003\u0013B\u0001\"a\u0017\u0001A\u0003%\u00111\n\u0005\n\u0003;\u0002!\u0019!C\u0005\u0003?B\u0001\"a\u001b\u0001A\u0003%\u0011\u0011\r\u0005\n\u0003[\u0002!\u0019!C\u0005\u0003_B\u0001\"a\"\u0001A\u0003%\u0011\u0011\u000f\u0005\f\u0003\u0013\u0003\u0001\u0019!a\u0001\n\u0013\tY\tC\u0006\u0002\u0014\u0002\u0001\r\u00111A\u0005\n\u0005U\u0005bCAQ\u0001\u0001\u0007\t\u0011)Q\u0005\u0003\u001bC1\"a)\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002&\"Y\u0011Q\u0016\u0001A\u0002\u0003\u0007I\u0011BAX\u0011-\t\u0019\f\u0001a\u0001\u0002\u0003\u0006K!a*\t\u0017\u0005U\u0006\u00011AA\u0002\u0013%\u0011q\u0017\u0005\f\u0003\u007f\u0003\u0001\u0019!a\u0001\n\u0013\t\t\rC\u0006\u0002F\u0002\u0001\r\u0011!Q!\n\u0005e\u0006bCAd\u0001\u0001\u0007\t\u0019!C\u0005\u0003\u0013D1\"a3\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002N\"Y\u0011\u0011\u001b\u0001A\u0002\u0003\u0005\u000b\u0015BAA\u0011-\t\u0019\u000e\u0001a\u0001\u0002\u0004%I!!6\t\u0017\u0005}\u0007\u00011AA\u0002\u0013%\u0011\u0011\u001d\u0005\f\u0003K\u0004\u0001\
u0019!A!B\u0013\t9\u000eC\u0004\u0002h\u0002!\t!!;\t\u000f\t\u0005\u0001\u0001\"\u0001\u0002j\"9!1\u0002\u0001\u0005\u0002\u0005%\bb\u0002B\b\u0001\u0011\u0005\u0011\u0011\u001e\u0005\b\u0005'\u0001A\u0011AAu\u0011\u001d\u00119\u0002\u0001C\u0001\u0003SDqAa\u0007\u0001\t\u0003\tI\u000fC\u0004\u0003 \u0001!\t!!;\t\u000f\t\r\u0002\u0001\"\u0001\u0002j\"9!q\u0005\u0001\u0005\n\t%\u0002b\u0002BD\u0001\u0011%!\u0011\u0012\u0005\b\u0005\u0017\u0003A\u0011\u0002BG\u0011%\u0011\u0019\u000bAI\u0001\n\u0013\u0011)\u000bC\u0004\u0003<\u0002!IA!0\t\u000f\t}\u0006\u0001\"\u0003\u0003B\"9!1\u001a\u0001\u0005\n\t5\u0007\"\u0003Bu\u0001E\u0005I\u0011\u0002Bv\u0011\u001d\u0011y\u000f\u0001C\u0005\u0003S\u0014qe\u00117vgR,'\u000fT5oW&s'm\\;oI\u000e{gN\\3di&|g.T1oC\u001e,'\u000fV3ti*\u0011\u0011IQ\u0001\u0005Y&t7N\u0003\u0002D\t\u000611/\u001a:wKJT\u0011!R\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001\u0001\n\u0005\u0002J\u00196\t!JC\u0001L\u0003\u0015\u00198-\u00197b\u0013\ti%J\u0001\u0004B]f\u0014VMZ\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\u0003A\u0003\"!\u0015\u0001\u000e\u0003\u0001\u000bAB\u0019:pW\u0016\u00148i\u001c8gS\u001e,\u0012\u0001\u0016\t\u0003+Zk\u0011AQ\u0005\u0003/\n\u00131bS1gW\u0006\u001cuN\u001c4jO\u0006i!M]8lKJ\u001cuN\u001c4jO\u0002\n\u0001\u0002\\5oW:\u000bW.Z\u000b\u00027B\u0011A,Y\u0007\u0002;*\u0011alX\u0001\u0005Y\u0006twMC\u0001a\u0003\u0011Q\u0017M^1\n\u0005\tl&AB*ue&tw-A\u0005mS:\\g*Y7fA\u00051A.\u001b8l\u0013\u0012,\u0012A\u001a\t\u0003O>l\u0011\u0001\u001b\u0006\u0003S*\faaY8n[>t'BA#l\u0015\taW.\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002]\u0006\u0019qN]4\n\u0005AD'\u0001B+vS\u0012\fq\u0001\\5oW&#\u0007%A\bt_V\u00148-Z\"mkN$XM]%e\u0003A\u0019x.\u001e:dK\u000ecWo\u001d;fe&#\u0007%A\u0007eKN$8\t\\;ti\u0016\u0014\u0018\nZ\u0001\u000fI\u0016\u001cHo\u00117vgR,'/\u00133!\u0003!a\u0017N\\6ECR\fW#\u0001=\u0011\u0005edX\"\u0001>\u000b\u0005m$\u0015A\u0001>l\u0013\ti(PA\bDYV\u001cH/\u001a:MS:\\G)\u0019;b\u0003%a\u0017N\\6ECR\f\u0007%A\u0005mS:\\\u0007K]8qgV\u0011\u00111\u0001\t\u0005\u0003\u000b\tY!\u0004\u0002\u0002\b)\u0019\u0011\u0011B0\u0002\tU$\u0018\u000e\\\u0005\u0005\u0003\u001b\t9A\u0001\u0006Qe>\u0004XM\u001d;jKN\f!\u0002\\5oWB\u0013x\u000e]:!\u0003=iW\r^1eCR\fW*\u00198bO\u0016\u0014XCAA\u000b!\r\t\u0016qC\u0005\u0004\u00033\u0001%AG\"mkN$XM\u001d'j].lU\r^1eCR\fW*\u00198bO\u0016\u0014\u0018\u0001E7fi\u0006$\u0017\r^1NC:\fw-\u001a:!\u0003%\u00198\r[3ek2,'/\u0006\u0002\u0002\"A\u0019\u0011+a\t\n\u0007\u0005\u0015\u0002I\u0001\u000bDYV\u001cH/\u001a:MS:\\7k\u00195fIVdWM]\u0001\u000bg\u000eDW\rZ;mKJ\u0004\u0013a\u00037j].l\u0015M\\1hKJ,\"!!\f\u0011\u0007E\u000by#C\u0002\u00022\u0001\u0013!c\u00117vgR,'\u000fT5oW6\u000bg.Y4fe\u0006aA.\u001b8l\u001b\u0006t\u0017mZ3sA\u0005ia.\u001a;x_J\\7\t\\5f]R,\"!!\u000f\u0011\t\u0005m\u0012\u0011I\u0007\u0003\u0003{Q1!a\u0010k\u0003\u001d\u0019G.[3oiNLA!a\u0011\u0002>\tia*\u001a;x_J\\7\t\\5f]R\faB\\3uo>\u00148n\u00117jK:$\b%\u0001\u000bbI6Lg.T3uC\u0012\fG/Y'b]\u0006<WM]\u000b\u0003\u0003\u0017\u0002B!!\u0014\u0002X5\u0011\u00
11q\n\u0006\u0005\u0003#\n\u0019&A\u0005j]R,'O\\1mg*!\u0011QKA\u001f\u0003\u0015\tG-\\5o\u0013\u0011\tI&a\u0014\u0003)\u0005#W.\u001b8NKR\fG-\u0019;b\u001b\u0006t\u0017mZ3s\u0003U\tG-\\5o\u001b\u0016$\u0018\rZ1uC6\u000bg.Y4fe\u0002\nq!\\3ue&\u001c7/\u0006\u0002\u0002bA!\u00111MA4\u001b\t\t)GC\u0002\u0002^!LA!!\u001b\u0002f\t9Q*\u001a;sS\u000e\u001c\u0018\u0001C7fiJL7m\u001d\u0011\u0002\u001b\rdwn]3e\u00072LWM\u001c;t+\t\t\t\b\u0005\u0004\u0002t\u0005u\u0014\u0011Q\u0007\u0003\u0003kRA!a\u001e\u0002z\u00059Q.\u001e;bE2,'bAA>\u0015\u0006Q1m\u001c7mK\u000e$\u0018n\u001c8\n\t\u0005}\u0014Q\u000f\u0002\f\u0003J\u0014\u0018-\u001f\"vM\u001a,'\u000fE\u0002R\u0003\u0007K1!!\"A\u0005Y\u0019E.^:uKJd\u0015N\\6BI6Lgn\u00117jK:$\u0018AD2m_N,Gm\u00117jK:$8\u000fI\u0001\u000bY&t7nQ8oM&<WCAAG!\r\t\u0016qR\u0005\u0004\u0003#\u0003%!E\"mkN$XM\u001d'j].\u001cuN\u001c4jO\u0006qA.\u001b8l\u0007>tg-[4`I\u0015\fH\u0003BAL\u0003;\u00032!SAM\u0013\r\tYJ\u0013\u0002\u0005+:LG\u000fC\u0005\u0002 ~\t\t\u00111\u0001\u0002\u000e\u0006\u0019\u0001\u0010J\u0019\u0002\u00171Lgn[\"p]\u001aLw\rI\u0001\fY&t7.T3ue&\u001c7/\u0006\u0002\u0002(B\u0019\u0011+!+\n\u0007\u0005-\u0006I\u0001\nDYV\u001cH/\u001a:MS:\\W*\u001a;sS\u000e\u001c\u0018a\u00047j].lU\r\u001e:jGN|F%Z9\u0015\t\u0005]\u0015\u0011\u0017\u0005\n\u0003?\u0013\u0013\u0011!a\u0001\u0003O\u000bA\u0002\\5oW6+GO]5dg\u0002\n1bY8o]6\u000bg.Y4feV\u0011\u0011\u0011\u0018\t\u0004#\u0006m\u0016bAA_\u0001\n\u00193\t\\;ti\u0016\u0014H*\u001b8l\u0013:\u0014w.\u001e8e\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014\u0018aD2p]:l\u0015M\\1hKJ|F%Z9\u0015\t\u0005]\u00151\u0019\u0005\n\u0003?+\u0013\u0011!a\u0001\u0003s\u000bAbY8o]6\u000bg.Y4fe\u0002\n1B]3n_R,\u0017\tZ7j]V\u0011\u0011\u0011Q\u0001\u0010e\u0016lw\u000e^3BI6Lgn\u0018\u0013fcR!\u0011qSAh\u0011%\ty\nKA\u0001\u0002\u0004\t\t)\u0001\u0007sK6|G/Z!e[&t\u0007%\u0001\u0006m_\u000e\fG.\u00113nS:,\"!a6\u0011\t\u0005e\u00171\\\u0007\u0003\u0003'JA!!8\u0002T\t\u00012*\u00194lC\u0006#W.\u001b8DY&,g\u000e^\u0001\u000fY
>\u001c\u0017\r\\!e[&tw\fJ3r)\u0011\t9*a9\t\u0013\u0005}5&!AA\u0002\u0005]\u0017a\u00037pG\u0006d\u0017\tZ7j]\u0002\n\u0001\u0002^3be\u0012{wO\u001c\u000b\u0003\u0003/C3!LAw!\u0011\ty/!@\u000e\u0005\u0005E(\u0002BAz\u0003k\f1!\u00199j\u0015\u0011\t90!?\u0002\u000f),\b/\u001b;fe*\u0019\u00111`7\u0002\u000b),h.\u001b;\n\t\u0005}\u0018\u0011\u001f\u0002\n\u0003\u001a$XM]#bG\"\f!\u0004^3ti\u000e{gN\\3di&|g.T8eK>+HOY8v]\u0012D3A\fB\u0003!\u0011\tyOa\u0002\n\t\t%\u0011\u0011\u001f\u0002\u0005)\u0016\u001cH/A\ruKN$8i\u001c8oK\u000e$\u0018n\u001c8N_\u0012,\u0017J\u001c2pk:$\u0007fA\u0018\u0003\u0006\u0005YC/Z:u\u0007>tg.Z2uS>tWj\u001c3f\u0013:\u0014w.\u001e8e\u001d>$H*\u001b8l\u0007>|'\u000fZ5oCR|'\u000fK\u00021\u0005\u000b\tq\u0004^3tiB+'o]5ti\u0016tGoQ8o]\u0016\u001cG/[8o\r\u0006LG.\u001e:fQ\r\t$QA\u0001\u0010i\u0016\u001cHOU3d_:4\u0017nZ;sK\"\u001a!G!\u0002\u0002\u0013Q,7\u000f\u001e)bkN,\u0007fA\u001a\u0003\u0006\u0005YB/Z:u!\u0006,8/\u001a(pi2Kgn[\"p_J$\u0017N\\1u_JD3\u0001\u000eB\u0003\u00031!Xm\u001d;M_\u001edUM^3mQ\r)$QA\u0001\u000fm\u0016\u0014\u0018NZ=M_\u001edUM^3m)!\t9Ja\u000b\u0003^\t5\u0004b\u0002B\u0017m\u0001\u0007!qF\u0001\u0006G2\f'P\u001f\u0019\u0005\u0005c\u0011Y\u0005\u0005\u0004\u00034\t\u0005#q\t\b\u0005\u0005k\u0011i\u0004E\u0002\u00038)k!A!\u000f\u000b\u0007\tmb)\u0001\u0004=e>|GOP\u0005\u0004\u0005\u007fQ\u0015A\u0002)sK\u0012,g-\u0003\u0003\u0003D\t\u0015#!B\"mCN\u001c(b\u0001B 
\u0015B!!\u0011\nB&\u0019\u0001!AB!\u0014\u0003,\u0005\u0005\t\u0011!B\u0001\u0005\u001f\u00121a\u0018\u00132#\u0011\u0011\tFa\u0016\u0011\u0007%\u0013\u0019&C\u0002\u0003V)\u0013qAT8uQ&tw\rE\u0002J\u00053J1Aa\u0017K\u0005\r\te.\u001f\u0005\b\u0005?2\u0004\u0019\u0001B1\u0003!\t\u0007\u000f]3oI\u0016\u0014\b\u0003\u0002B2\u0005Sj!A!\u001a\u000b\u0007\t\u001dD)A\u0003vi&d7/\u0003\u0003\u0003l\t\u0015$A\u0005'pO\u000e\u000b\u0007\u000f^;sK\u0006\u0003\b/\u001a8eKJDqAa\u001c7\u0001\u0004\u0011\t(A\u0006nCbdun\u001a'fm\u0016d\u0007#B%\u0003t\t]\u0014b\u0001B;\u0015\n1q\n\u001d;j_:\u0004BA!\u001f\u0003\u00046\u0011!1\u0010\u0006\u0005\u0005{\u0012y(A\u0003fm\u0016tGOC\u0002\u0003\u00026\fQa\u001d7gi)LAA!\"\u0003|\t)A*\u001a<fY\u0006\u00112M]3bi\u0016\u0014%o\\6fe\u000e{gNZ5h)\u0005!\u0016AF:fiV\u00048i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0015\r\u0005]%q\u0012BM\u0011\u001d\u0011\t\n\u000fa\u0001\u0005'\u000babY8o]\u0016\u001cG/[8o\u001b>$W\rE\u0002R\u0005+K1Aa&A\u00059\u0019uN\u001c8fGRLwN\\'pI\u0016D\u0011Ba'9!\u0003\u0005\rA!(\u0002\u001f1Lgn[\"p_J$\u0017N\\1u_J\u00042!\u0013BP\u0013\r\u0011\tK\u0013\u0002\b\u0005>|G.Z1o\u0003\u0001\u001aX\r^;q\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014H\u0005Z3gCVdG\u000f\n\u001a\u0016\u0005\t\u001d&\u0006\u0002BO\u0005S[#Aa+\u0011\t\t5&qW\u0007\u0003\u0005_SAA!-\u00034\u0006IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0005kS\u0015AC1o]>$\u0018\r^5p]&!!\u0011\u0018BX\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\u0011GJ,\u0017\r^3M_\u000e\fG.\u00113nS:$\"!a6\u0002#\r\u0014X-\u0019;f%\u0016lw\u000e^3BI6Lg\u000e\u0006\u0004\u0002\u0002\n\r'q\u0019\u0005\b\u0005\u000b\\\u0004\u0019AAG\u0003\u0019\u0019wN\u001c4jO\"9!\u0011Z\u001eA\u0002\u0005e\u0016aB7b]\u0006<WM]\u0001\fe\u00164XM]:f\u001d>$W\r\u0006\u0004\u0003P\nm'Q\u001d\t\u0005\u0005#\u00149.\u0004\u0002\u0003T*\u0019!Q\u001b5\u0002\u000f9,Go^8sW&!!\u0011\u001cBj\u0005-\u0011VM^3sg\u0016tu\u000eZ3\t\u000f\tuG\b1\u0001\u0003`\u00061an\u001c3f\u0013\u0012\u00042!\u0013Bq\u0013\r\u0011\u00
19O\u0013\u0002\u0004\u0013:$\b\"\u0003BtyA\u0005\t\u0019\u0001Bp\u0003%\u0011X-];fgRLE-A\u000bsKZ,'o]3O_\u0012,G\u0005Z3gCVdG\u000f\n\u001a\u0016\u0005\t5(\u0006\u0002Bp\u0005S\u000baCZ8so\u0006\u0014H-\u00138ji&\fG/Z\"p]:,7\r\u001e")
public class ClusterLinkInboundConnectionManagerTest {
    // Broker configuration produced by createBrokerConfig().
    private final KafkaConfig brokerConfig = this.createBrokerConfig();
    // NOTE(review): linkName, sourceClusterId and destClusterId have no initializer in this
    // decompiled view; presumably they are assigned in a constructor that is not shown here.
    // linkData below reads linkName and sourceClusterId, so confirm the initialization order.
    private final String linkName;
    // Random id of the link under test.
    private final Uuid linkId = Uuid.randomUuid();
    private final String sourceClusterId;
    private final String destClusterId;
    // Link metadata built from linkName, linkId and sourceClusterId.
    private final ClusterLinkData linkData = new ClusterLinkData(this.linkName(), this.linkId(), (Option)new Some((Object)this.sourceClusterId()), (Option)None$.MODULE$, false);
    // Link properties; mutated by the tests and by setupConnectionManager() before each config is created.
    private final Properties linkProps = new Properties();
    // Mockito mock; reset and stubbed in setupConnectionManager().
    private final ClusterLinkMetadataManager metadataManager = (ClusterLinkMetadataManager)Mockito.mock(ClusterLinkMetadataManager.class);
    // Shut down in tearDown().
    private final ClusterLinkScheduler scheduler = new ClusterLinkScheduler(0, 100);
    private final ClusterLinkManager linkManager = (ClusterLinkManager)Mockito.mock(ClusterLinkManager.class);
    // Mockito mock handed to reverseConnectionProvider() and returned by the remote admin mock.
    private final NetworkClient networkClient = (NetworkClient)Mockito.mock(NetworkClient.class);
    private final AdminMetadataManager adminMetadataManager = new AdminMetadataManager(new LogContext(), 100L, 10000L, false);
    // Closed in tearDown().
    private final Metrics metrics = new Metrics();
    // Compared against the expected closed remote admin clients in testReconfigure()/testPause().
    // NOTE(review): presumably appended to by the close() answer installed in createRemoteAdmin - confirm.
    private final ArrayBuffer<ClusterLinkAdminClient> closedClients = ArrayBuffer$.MODULE$.empty();
    // Assigned by setupConnectionManager() and reassigned by tests that reconfigure the link.
    private ClusterLinkConfig linkConfig;
    // Assigned by setupConnectionManager().
    private ClusterLinkMetrics linkMetrics;
    // Manager under test; assigned by setupConnectionManager() and shut down in tearDown() when non-null.
    private ClusterLinkInboundConnectionManager connManager;
    // Most recent mock created by createRemoteAdmin(); null until that factory is invoked.
    private ClusterLinkAdminClient remoteAdmin;
    // Most recent mock created by createLocalAdmin(); reset to null when its close(Duration) is called.
    private KafkaAdminClient localAdmin;

    /** Returns the {@code brokerConfig} field. */
    private KafkaConfig brokerConfig() {
        return brokerConfig;
    }

    /** Returns the {@code linkName} field. */
    private String linkName() {
        return linkName;
    }

    /** Returns the {@code linkId} field. */
    private Uuid linkId() {
        return linkId;
    }

    /** Returns the {@code sourceClusterId} field. */
    private String sourceClusterId() {
        return sourceClusterId;
    }

    /** Returns the {@code destClusterId} field. */
    private String destClusterId() {
        return destClusterId;
    }

    /** Returns the {@code linkData} field. */
    private ClusterLinkData linkData() {
        return linkData;
    }

    /** Returns the mutable {@code linkProps} instance (not a copy). */
    private Properties linkProps() {
        return linkProps;
    }

    /** Returns the mocked {@code metadataManager} field. */
    private ClusterLinkMetadataManager metadataManager() {
        return metadataManager;
    }

    /** Returns the {@code scheduler} field. */
    private ClusterLinkScheduler scheduler() {
        return scheduler;
    }

    /** Returns the mocked {@code linkManager} field. */
    private ClusterLinkManager linkManager() {
        return linkManager;
    }

    /** Returns the mocked {@code networkClient} field. */
    private NetworkClient networkClient() {
        return networkClient;
    }

    /** Returns the {@code adminMetadataManager} field. */
    private AdminMetadataManager adminMetadataManager() {
        return adminMetadataManager;
    }

    /** Returns the {@code metrics} field. */
    private Metrics metrics() {
        return metrics;
    }

    /** Returns the mutable {@code closedClients} buffer (not a copy). */
    private ArrayBuffer<ClusterLinkAdminClient> closedClients() {
        return closedClients;
    }

    /** Returns the current {@code linkConfig}; {@code null} until it has been assigned. */
    private ClusterLinkConfig linkConfig() {
        return linkConfig;
    }

    /** Replaces the {@code linkConfig} field with the given value. */
    private void linkConfig_$eq(ClusterLinkConfig x$1) {
        linkConfig = x$1;
    }

    /** Returns the current {@code linkMetrics}; {@code null} until it has been assigned. */
    private ClusterLinkMetrics linkMetrics() {
        return linkMetrics;
    }

    /** Replaces the {@code linkMetrics} field with the given value. */
    private void linkMetrics_$eq(ClusterLinkMetrics x$1) {
        linkMetrics = x$1;
    }

    /** Returns the connection manager under test; {@code null} until it has been assigned. */
    private ClusterLinkInboundConnectionManager connManager() {
        return connManager;
    }

    /** Replaces the {@code connManager} field with the given value. */
    private void connManager_$eq(ClusterLinkInboundConnectionManager x$1) {
        connManager = x$1;
    }

    /** Returns the current {@code remoteAdmin}; {@code null} when none is set. */
    private ClusterLinkAdminClient remoteAdmin() {
        return remoteAdmin;
    }

    /** Replaces the {@code remoteAdmin} field with the given value. */
    private void remoteAdmin_$eq(ClusterLinkAdminClient x$1) {
        remoteAdmin = x$1;
    }

    /** Returns the current {@code localAdmin}; {@code null} when none is set. */
    private KafkaAdminClient localAdmin() {
        return localAdmin;
    }

    /** Replaces the {@code localAdmin} field with the given value. */
    private void localAdmin_$eq(KafkaAdminClient x$1) {
        localAdmin = x$1;
    }

    /**
     * Releases the per-test resources: shuts down the connection manager (when one was created),
     * then the scheduler, then closes the metrics registry.
     *
     * <p>Each step runs in a {@code finally} so that a failure in an earlier shutdown step cannot
     * leak the scheduler or the metrics registry into later tests; the original exception
     * still propagates to JUnit.
     */
    @AfterEach
    public void tearDown() {
        try {
            // connManager stays null if a test failed before setupConnectionManager() assigned it.
            if (this.connManager() != null) {
                this.connManager().shutdown();
            }
        } finally {
            try {
                this.scheduler().shutdown();
            } finally {
                this.metrics().close();
            }
        }
    }

    /**
     * With an outbound connection mode, {@code startup()} is expected to fail with
     * {@link IllegalStateException}, a reverse connection is expected to be rejected with
     * {@link InvalidRequestException}, and neither the remote nor the local admin client is created.
     */
    @Test
    public void testConnectionModeOutbound() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        ClusterLinkInboundConnectionManager manager = this.connManager();

        // No remote admin before or after the failed startup.
        Assertions.assertNull((Object)this.remoteAdmin());
        Assertions.assertThrows(IllegalStateException.class, () -> manager.startup());
        Assertions.assertNull((Object)this.remoteAdmin());

        KafkaChannel mockChannel = (KafkaChannel)Mockito.mock(KafkaChannel.class);
        Assertions.assertThrows(
            InvalidRequestException.class,
            () -> manager.processReverseConnection(mockChannel, this.reverseNode(1, -1)));
        Assertions.assertNull(
            (Object)this.localAdmin(),
            (String)"Local admin client created unnecessarily for outbound dest connection manager");

        manager.shutdown();
    }

    /**
     * Inbound mode on the link coordinator: startup creates the remote admin client, a reverse
     * connection provider is available, the first reverse connection is counted as both a
     * persistent and a reverse connection, a second one is rejected with {@link NetworkException}
     * without changing the counts, and no local admin client is ever created.
     */
    @Test
    public void testConnectionModeInbound() {
        final String localAdminOnCoordinatorMsg =
            "Local admin client created for inbound dest connection manager on link coordinator";

        this.setupConnectionManager((ConnectionMode)ConnectionMode.Inbound$.MODULE$, true);
        ClusterLinkInboundConnectionManager manager = this.connManager();

        Assertions.assertNull((Object)this.remoteAdmin());
        manager.startup();
        Assertions.assertNotNull((Object)this.remoteAdmin());
        Assertions.assertTrue((boolean)manager.reverseConnectionProvider(this.networkClient(), (Option)new Some((Object)this.adminMetadataManager()), "").nonEmpty());

        // First reverse connection is accepted.
        KafkaChannel mockChannel = (KafkaChannel)Mockito.mock(KafkaChannel.class);
        ReverseNode firstNode = this.reverseNode(1, -1);
        manager.processReverseConnection(mockChannel, firstNode);
        Assertions.assertEquals((int)1, (int)manager.persistentConnectionCount());
        Assertions.assertEquals((int)1, (int)manager.reverseConnectionCount());

        // Second reverse connection is rejected and leaves the counts untouched.
        ReverseNode secondNode = this.reverseNode(2, 5);
        Assertions.assertThrows(NetworkException.class, () -> manager.processReverseConnection(mockChannel, secondNode));
        Assertions.assertEquals((int)1, (int)manager.persistentConnectionCount());
        Assertions.assertEquals((int)1, (int)manager.reverseConnectionCount());
        Assertions.assertNull((Object)this.localAdmin(), (String)localAdminOnCoordinatorMsg);

        // The provider is looked up inside the lambda so any exception it raises is seen by assertThrows.
        Assertions.assertThrows(
            NetworkException.class,
            () -> ((ReverseNode.ConnectionProvider)manager.reverseConnectionProvider(this.networkClient(), (Option)new Some((Object)this.adminMetadataManager()), "").get()).initiateConnect(new Node(0, "localhost", 1234)));
        Assertions.assertNull((Object)this.localAdmin(), (String)localAdminOnCoordinatorMsg);

        manager.shutdown();
        Assertions.assertNull((Object)this.localAdmin(), (String)"Local admin client was not shutdown");
    }

    /**
     * Inbound mode on a broker that is not the link coordinator: startup creates neither admin
     * client, and the local admin client only appears once a connection is initiated through the
     * reverse connection provider (that call itself is expected to throw).
     */
    @Test
    public void testConnectionModeInboundNotLinkCoordinator() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Inbound$.MODULE$, false);
        Assertions.assertNull((Object)this.remoteAdmin());
        this.connManager().startup();
        Assertions.assertNull((Object)this.remoteAdmin());
        Assertions.assertNull((Object)this.localAdmin(), (String)"Local admin client created for inbound dest connection manager");
        Assertions.assertThrows(Exception.class, () -> ((ReverseNode.ConnectionProvider)this.connManager().reverseConnectionProvider(this.networkClient(), (Option)new Some((Object)this.adminMetadataManager()), "").get()).initiateConnect(new Node(0, "localhost", 1234)));
        // This assertion fails when the client is missing, so the message must say "was not created"
        // (it previously reused the "created" wording of the assertNull above).
        Assertions.assertNotNull((Object)this.localAdmin(), (String)"Local admin client was not created for inbound dest connection manager");
    }

    /**
     * Checks that an exception thrown by {@code NetworkClient.reverseAndAdd} while processing a
     * reverse connection propagates out of {@code processReverseConnection}, and that
     * {@code reverseAndAdd} was invoked on the network client mock.
     */
    @Test
    public void testPersistentConnectionFailure() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Inbound$.MODULE$, true);
        this.connManager().startup();
        KafkaChannel channel = (KafkaChannel)Mockito.mock(KafkaChannel.class);
        ((KafkaChannel)Mockito.doNothing().when((Object)channel)).close();
        ReverseNode node = this.reverseNode(1, -1);
        // The next two statements form ONE stubbing and must stay adjacent and in this order:
        // the call on the mock registers the invocation, and when(...) then attaches the throw to it.
        // NOTE(review): this looks like the decompiled form of when(networkClient.reverseAndAdd(any())).thenThrow(...) - confirm.
        this.networkClient().reverseAndAdd((ReverseChannel)ArgumentMatchers.any());
        Mockito.when((Object)BoxedUnit.UNIT).thenThrow(new Throwable[]{new RuntimeException("Test exception")});
        Assertions.assertThrows(RuntimeException.class, () -> this.connManager().processReverseConnection(channel, node));
        ((NetworkClient)Mockito.verify((Object)this.networkClient())).reverseAndAdd((ReverseChannel)ArgumentMatchers.any());
    }

    @Test
    public void testReconfigure() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Inbound$.MODULE$, true);
        this.connManager().startup();
        KafkaAdminClient oldLocalAdmin = this.localAdmin();
        ClusterLinkAdminClient oldAdmin = this.remoteAdmin();
        this.linkProps().setProperty("metadata.max.age.ms", "1000");
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), (Option)None$.MODULE$, true));
        this.connManager().reconfigure(this.linkConfig(), (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"metadata.max.age.ms"})));
        Assertions.assertEquals((Object)this.connManager().currentConfig().originals(), (Object)this.linkConfig().originals());
        Assertions.assertNotNull((Object)this.remoteAdmin());
        Assertions.assertNotSame((Object)oldAdmin, (Object)this.remoteAdmin());
        Assertions.assertEquals((Object)new .colon.colon((Object)oldAdmin, (List)Nil$.MODULE$), (Object)this.closedClients().toSeq());
        Assertions.assertSame((Object)oldLocalAdmin, (Object)this.localAdmin());
    }

    @Test
    public void testPause() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Inbound$.MODULE$, true);
        this.connManager().startup();
        Assertions.assertNull((Object)this.localAdmin());
        Assertions.assertNotNull((Object)this.remoteAdmin(), (String)"Remote admin client was not recreated");
        ClusterLinkAdminClient oldAdmin = this.remoteAdmin();
        this.linkProps().setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "true");
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), (Option)None$.MODULE$, true));
        this.connManager().reconfigure(this.linkConfig(), (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()})));
        Assertions.assertEquals((Object)this.connManager().currentConfig().originals(), (Object)this.linkConfig().originals());
        Assertions.assertEquals((Object)new .colon.colon((Object)oldAdmin, (List)Nil$.MODULE$), (Object)this.closedClients().toSeq());
        Assertions.assertNull((Object)this.localAdmin(), (String)"Local admin client created unnecessarily");
        Assertions.assertNull((Object)this.remoteAdmin(), (String)"Remote admin client was not shutdown");
        this.linkProps().setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "false");
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), (Option)None$.MODULE$, true));
        this.connManager().reconfigure(this.linkConfig(), (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()})));
        Assertions.assertNull((Object)this.localAdmin(), (String)"Local admin client created unnecessarily");
        Assertions.assertNotNull((Object)this.remoteAdmin(), (String)"Remote admin client was not recreated");
    }

    /**
     * Pauses and then resumes the link on a broker that is not the link coordinator: the local
     * admin client appears after {@code forwardInitiateConnect()}, is gone after pausing, is not
     * recreated by resuming alone, and appears again after the next
     * {@code forwardInitiateConnect()}. The remote admin client stays unset throughout.
     */
    @Test
    public void testPauseNotLinkCoordinator() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Inbound$.MODULE$, false);
        ClusterLinkInboundConnectionManager manager = this.connManager();
        manager.startup();
        Assertions.assertNull((Object)this.remoteAdmin());
        Assertions.assertNull((Object)this.localAdmin());

        this.forwardInitiateConnect();
        Assertions.assertNotNull((Object)this.localAdmin(), (String)"Local admin client was not created");

        String pausedProp = ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp();
        Set pausedKeys = (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{pausedProp}));

        // Pause the link.
        this.linkProps().setProperty(pausedProp, "true");
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), (Option)None$.MODULE$, true));
        manager.reconfigure(this.linkConfig(), pausedKeys);
        Assertions.assertEquals((Object)manager.currentConfig().originals(), (Object)this.linkConfig().originals());
        Assertions.assertNull((Object)this.localAdmin(), (String)"Local admin client was not shutdown");
        Assertions.assertNull((Object)this.remoteAdmin());

        // Resume the link.
        this.linkProps().setProperty(pausedProp, "false");
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), (Option)None$.MODULE$, true));
        manager.reconfigure(this.linkConfig(), pausedKeys);
        Assertions.assertNull((Object)this.localAdmin(), (String)"Local admin client recreated unnecessarily");

        this.forwardInitiateConnect();
        Assertions.assertNotNull((Object)this.localAdmin(), (String)"Local admin client was not created");
        Assertions.assertNull((Object)this.remoteAdmin());
    }

    /**
     * Runs {@code verifyLogLevel} with no maximum level, with a {@code Some(null)} maximum level,
     * and once for every {@code org.slf4j.event.Level} value, while a capture appender is
     * registered and the class logger is set to {@code Level.ALL}. The appender and the previous
     * logger level are restored afterwards.
     */
    @Test
    public void testLogLevel() {
        Class<ClusterLinkInboundConnectionManager> loggedClass = ClusterLinkInboundConnectionManager.class;
        Level previousLevel = LogCaptureAppender$.MODULE$.setClassLoggerLevel(loggedClass, Level.ALL);
        LogCaptureAppender captureAppender = LogCaptureAppender$.MODULE$.createAndRegister();
        try {
            this.verifyLogLevel(loggedClass, captureAppender, (Option<org.slf4j.event.Level>)None$.MODULE$);
            this.verifyLogLevel(loggedClass, captureAppender, (Option<org.slf4j.event.Level>)new Some(null));
            for (org.slf4j.event.Level level : org.slf4j.event.Level.values()) {
                this.verifyLogLevel(loggedClass, captureAppender, (Option<org.slf4j.event.Level>)new Some((Object)level));
            }
        }
        finally {
            LogCaptureAppender$.MODULE$.unregister(captureAppender);
            LogCaptureAppender$.MODULE$.setClassLoggerLevel(loggedClass, previousLevel);
            captureAppender.close();
        }
    }

    /*
     * Unable to fully structure code
     *
     * NOTE(review): the decompiler could not fully structure this method, so the body below is not
     * valid Java as written (untyped locals, a "** GOTO" pseudo-statement, a raw invokedynamic
     * call). Treat it as a description of the bytecode rather than as source to edit.
     *
     * As far as the visible branches show, the method:
     *  - builds a LogContext with prefix "[TEST] ", passing currentMaxLevel as well when
     *    maxLogLevel is a Some, and only the prefix when it is None;
     *  - resolves maxLevel to ERROR for Some(null) and for None, otherwise to the wrapped level;
     *  - logs one message at each of trace/debug/info/warn/error and calls assertLastLog$1 after
     *    each with that level, the prefixed message text, the appender and maxLevel.
     * assertLastLog$1 is defined outside this view, so what it actually asserts is not shown here.
     */
    private void verifyLogLevel(Class<?> clazz, LogCaptureAppender appender, Option<org.slf4j.event.Level> maxLogLevel) {
        currentMaxLevel = new AtomicReference<V>();
        // Choose the LogContext constructor based on whether a maximum level was supplied.
        if (maxLogLevel instanceof Some) {
            v0 = new LogContext("[TEST] ", currentMaxLevel);
        } else if (None$.MODULE$.equals(maxLogLevel)) {
            v0 = new LogContext("[TEST] ");
        } else {
            throw new MatchError(maxLogLevel);
        }
        logger = v0.logger(clazz);
        // Resolve the level passed to assertLastLog$1 (decompiled pattern match over maxLogLevel).
        var7_6 = false;
        var8_7 = null;
        if (!(maxLogLevel instanceof Some)) ** GOTO lbl-1000
        var7_6 = true;
        var8_7 = (Some)maxLogLevel;
        if ((org.slf4j.event.Level)var8_7.value() == null) {
            v1 = org.slf4j.event.Level.ERROR;
        } else if (var7_6) {
            v1 = (org.slf4j.event.Level)var8_7.value();
        } else if (None$.MODULE$.equals(maxLogLevel)) {
            v1 = org.slf4j.event.Level.ERROR;
        } else {
            throw new MatchError(maxLogLevel);
        }
        maxLevel = v1;
        // NOTE(review): presumably stores the supplied level into currentMaxLevel; the lambda body
        // ($anonfun$verifyLogLevel$1) is not visible here - confirm.
        maxLogLevel.foreach((Function1)(Function1 & Serializable)LambdaMetafactory.altMetafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, $anonfun$verifyLogLevel$1$adapted(java.util.concurrent.atomic.AtomicReference org.slf4j.event.Level ), (Lorg/slf4j/event/Level;)Ljava/lang/Object;)(currentMaxLevel));
        // Emit one message per level and check the capture appender after each.
        logger.trace("trace message");
        ClusterLinkInboundConnectionManagerTest.assertLastLog$1(org.slf4j.event.Level.TRACE, "[TEST] trace message", appender, maxLevel);
        logger.debug("debug message");
        ClusterLinkInboundConnectionManagerTest.assertLastLog$1(org.slf4j.event.Level.DEBUG, "[TEST] debug message", appender, maxLevel);
        logger.info("info message");
        ClusterLinkInboundConnectionManagerTest.assertLastLog$1(org.slf4j.event.Level.INFO, "[TEST] info message", appender, maxLevel);
        logger.warn("warn message");
        ClusterLinkInboundConnectionManagerTest.assertLastLog$1(org.slf4j.event.Level.WARN, "[TEST] warn message", appender, maxLevel);
        logger.error("error message");
        ClusterLinkInboundConnectionManagerTest.assertLastLog$1(org.slf4j.event.Level.ERROR, "[TEST] error message", appender, maxLevel);
    }

    /**
     * Builds the broker configuration used by this test class: the properties returned by
     * {@code TestUtils.createBrokerConfig} for broker 1 at {@code localhost:1234}, with the
     * cluster-link enable property set to {@code "true"}.
     */
    private KafkaConfig createBrokerConfig() {
        Properties brokerProps = TestUtils$.MODULE$.createBrokerConfig(
            1,
            "localhost:1234",
            true,
            true,
            TestUtils$.MODULE$.RandomPort(),
            (Option<SecurityProtocol>)None$.MODULE$,
            (Option<File>)None$.MODULE$,
            (Option<Properties>)None$.MODULE$,
            true,
            false,
            TestUtils$.MODULE$.RandomPort(),
            false,
            TestUtils$.MODULE$.RandomPort(),
            false,
            TestUtils$.MODULE$.RandomPort(),
            (Option<String>)None$.MODULE$,
            1,
            false,
            1,
            (short)1,
            false);
        brokerProps.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(brokerProps);
        return config;
    }

    /**
     * Creates the link config, link metrics and the connection manager under test, then resets
     * and stubs the metadata manager mock.
     *
     * @param connectionMode  connection mode written into the link properties; for outbound mode
     *                        a {@code bootstrap.servers} entry is added as well
     * @param linkCoordinator value that the mocked {@code isLinkCoordinator(linkName)} returns;
     *                        when {@code false}, {@code linkCoordinator(...)} is additionally
     *                        stubbed to return node 0 at {@code localhost:1234}
     */
    private void setupConnectionManager(ConnectionMode connectionMode, boolean linkCoordinator) {
        Properties props = this.linkProps();
        props.put(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());
        if (ConnectionMode.Outbound$.MODULE$.equals(connectionMode)) {
            props.put("bootstrap.servers", "localhost:123");
        }
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)props, (Option)None$.MODULE$, true));

        this.linkMetrics_$eq(new ClusterLinkMetrics(this.linkName(), this.linkId(), ClusterLinkConfig.LinkMode.DESTINATION, (ConnectionMode)ConnectionMode.Outbound$.MODULE$, (ConnectionMode)ConnectionMode.Inbound$.MODULE$, false, this.linkManager(), (Option)None$.MODULE$, this.metrics(), (Option)None$.MODULE$, false));
        this.linkMetrics().startup();

        // The two factories route admin-client creation through this test so it can observe it.
        this.connManager_$eq(new ClusterLinkInboundConnectionManager(this.linkData(), this.linkConfig(), this.destClusterId(), (Option)None$.MODULE$, this.linkMetrics(), (Function2 & Serializable)(cfg, mgr) -> this.createRemoteAdmin((ClusterLinkConfig)cfg, (ClusterLinkInboundConnectionManager)mgr), (Function1 & Serializable)ignored -> this.createLocalAdmin(), this.metadataManager(), this.brokerConfig(), (Time)new MockTime()));

        // Reset only after the manager has been constructed, then stub the coordinator lookups.
        Mockito.reset((Object[])new ClusterLinkMetadataManager[]{this.metadataManager()});
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)this.metadataManager().isLinkCoordinator(this.linkName()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)linkCoordinator));
        if (!linkCoordinator) {
            Mockito.when((Object)this.metadataManager().linkCoordinator((String)ArgumentMatchers.any(), (ListenerName)ArgumentMatchers.any())).thenReturn((Object)new Some((Object)new Node(0, "localhost", 1234)));
        }
    }

    /**
     * Compiler-generated accessor; by its name this appears to supply the default value of the
     * second parameter ({@code linkCoordinator}) of {@code setupConnectionManager}.
     *
     * @return {@code true}
     */
    private boolean setupConnectionManager$default$2() {
        final boolean defaultLinkCoordinator = true;
        return defaultLinkCoordinator;
    }

    /**
     * Creates a mocked {@link KafkaAdminClient}, stores it as this test's local admin, and stubs
     * {@code close(Duration)} so that closing the mock clears the stored reference.
     *
     * @return the newly created mock
     */
    private KafkaAdminClient createLocalAdmin() {
        KafkaAdminClient admin = (KafkaAdminClient)Mockito.mock(KafkaAdminClient.class);
        this.localAdmin_$eq(admin);
        // Closing the mock resets the field to null so tests can observe the close.
        ((KafkaAdminClient)Mockito.doAnswer(invocation -> {
            this.localAdmin_$eq(null);
            return BoxedUnit.UNIT;
        }).when((Object)admin)).close((Duration)ArgumentMatchers.any());
        return admin;
    }

    /**
     * Factory for the remote admin client handed to the connection manager under test.
     *
     * <p>Asserts that it was called with this test's link config (compared via
     * {@code originals()}) and with the very connection manager instance under test, then creates
     * a mocked {@link ClusterLinkAdminClient}, stores it as the remote admin, and stubs it:
     * {@code close()} runs the close callback, while {@code networkClient()} and
     * {@code metadataManager()} return this test's network client and admin metadata manager.
     *
     * @param config  link config passed by the caller of the factory
     * @param manager connection manager passed by the caller of the factory
     * @return the newly created mock
     */
    private ClusterLinkAdminClient createRemoteAdmin(ClusterLinkConfig config, ClusterLinkInboundConnectionManager manager) {
        Assertions.assertEquals((Object)this.linkConfig().originals(), (Object)config.originals());
        Assertions.assertSame((Object)this.connManager(), (Object)manager);

        ClusterLinkAdminClient admin = (ClusterLinkAdminClient)Mockito.mock(ClusterLinkAdminClient.class);
        this.remoteAdmin_$eq(admin);
        ((ClusterLinkAdminClient)Mockito.doAnswer(invocation -> {
            ClusterLinkInboundConnectionManagerTest.$anonfun$createRemoteAdmin$1(this, invocation);
            return BoxedUnit.UNIT;
        }).when((Object)admin)).close();
        Mockito.when((Object)admin.networkClient()).thenReturn((Object)this.networkClient());
        Mockito.when((Object)admin.metadataManager()).thenReturn((Object)this.adminMetadataManager());
        return admin;
    }

    /**
     * Builds a {@link ReverseNode} for this test's link.
     *
     * <p>{@code nodeId} is passed for both of the first two constructor arguments, the host is
     * {@code "host" + nodeId}, the port is 1234, the listener is the one for
     * {@code SecurityProtocol.PLAINTEXT} and the principal is {@code KafkaPrincipal.ANONYMOUS}.
     *
     * @param nodeId    id used for the node and for its host name suffix
     * @param requestId request id passed through to the {@code ReverseNode} constructor
     * @return the constructed node
     */
    private ReverseNode reverseNode(int nodeId, int requestId) {
        String host = "host" + nodeId;
        ListenerName plaintextListener = ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT);
        return new ReverseNode(
                nodeId,
                nodeId,
                host,
                1234,
                this.linkId(),
                requestId,
                plaintextListener,
                KafkaPrincipal.ANONYMOUS,
                Optional.empty(),
                null);
    }

    /**
     * Compiler-generated accessor; by its name this appears to supply the default value of the
     * second parameter ({@code requestId}) of {@code reverseNode}.
     *
     * @return {@code -1}
     */
    private int reverseNode$default$2() {
        final int defaultRequestId = -1;
        return defaultRequestId;
    }

    /**
     * Asserts that looking up the reverse connection provider from the connection manager and
     * calling {@code initiateConnect} on it for node 0 at {@code localhost:1234} throws an
     * {@link Exception}.
     */
    private void forwardInitiateConnect() {
        Node target = new Node(0, "localhost", 1234);
        Assertions.assertThrows(Exception.class, () -> {
            // The provider lookup stays inside the asserted block so a failure there also counts.
            ReverseNode.ConnectionProvider provider = (ReverseNode.ConnectionProvider)this.connManager()
                    .reverseConnectionProvider(this.networkClient(), (Option)new Some((Object)this.adminMetadataManager()), "")
                    .get();
            provider.initiateConnect(target);
        });
    }

    /**
     * Asserts the level and message of the last event captured by the appender.
     *
     * <p>The expected level is whichever of {@code logLevel} and {@code maxLevel$1} has the
     * smaller {@code toInt()} value ({@code logLevel} on a tie); its {@code name()} is compared
     * with the string form of the event's level.
     *
     * @param logLevel        level the message was logged at
     * @param expectedMessage message the last captured event must carry
     * @param appender$2      appender whose captured messages are inspected
     * @param maxLevel$1      level compared against {@code logLevel} to pick the expected level
     */
    private static final void assertLastLog$1(org.slf4j.event.Level logLevel, String expectedMessage, LogCaptureAppender appender$2, org.slf4j.event.Level maxLevel$1) {
        LoggingEvent lastEvent = (LoggingEvent)appender$2.getMessages().last();

        org.slf4j.event.Level expectedLevel;
        if (maxLevel$1.toInt() < logLevel.toInt()) {
            expectedLevel = maxLevel$1;
        } else {
            expectedLevel = logLevel;
        }
        String expectedLevelName = expectedLevel.name();
        String actualLevelName = lastEvent.getLevel().toString();

        Assertions.assertEquals((Object)expectedLevelName, (Object)actualLevelName);
        Assertions.assertEquals((Object)expectedMessage, (Object)lastEvent.getMessage());
    }

    /**
     * Close callback for the mocked remote admin: appends the test's current remote admin to
     * {@code closedClients} and then clears the remote admin reference.
     *
     * @param $this      test instance whose state is updated
     * @param invocation the intercepted mock invocation (unused)
     */
    public static final /* synthetic */ void $anonfun$createRemoteAdmin$1(ClusterLinkInboundConnectionManagerTest $this, InvocationOnMock invocation) {
        // Record the client before the reference is nulled out.
        ClusterLinkAdminClient closedAdmin = $this.remoteAdmin();
        $this.closedClients().$plus$eq((Object)closedAdmin);
        $this.remoteAdmin_$eq(null);
    }

    /**
     * Initializes the fixed identifiers used by the tests: the link name and the source and
     * destination cluster ids.
     */
    public ClusterLinkInboundConnectionManagerTest() {
        // Cluster identifiers.
        this.destClusterId = "destCluster";
        this.sourceClusterId = "sourceCluster";
        // Name of the link under test.
        this.linkName = "testLink";
    }

    /**
     * Compiler-generated adapter: stores the given level in the supplied reference and returns
     * the boxed unit value.
     *
     * @param currentMaxLevel$1 reference that receives the level
     * @param x$1               level to store
     * @return {@code BoxedUnit.UNIT}
     */
    public static final /* synthetic */ Object $anonfun$verifyLogLevel$1$adapted(AtomicReference currentMaxLevel$1, org.slf4j.event.Level x$1) {
        currentMaxLevel$1.set(x$1);
        final BoxedUnit unit = BoxedUnit.UNIT;
        return unit;
    }
}

