/*
 * Decompiled with CFR 0.152.
 */
package kafka.network;

import java.io.File;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.security.auth.login.Configuration;
import kafka.cluster.EndPoint;
import kafka.network.LinkComponents;
import kafka.network.RequestChannel;
import kafka.network.SocketServer;
import kafka.network.SocketServer$;
import kafka.network.TestInterceptor;
import kafka.network.TestInterceptor$;
import kafka.network.TestPrincipal;
import kafka.network.TooManyConnectionsException;
import kafka.security.CredentialProvider;
import kafka.server.ApiVersionManager;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.SimpleApiVersionManager;
import kafka.utils.JaasTestUtils;
import kafka.utils.JaasTestUtils$;
import kafka.utils.NotNothing$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.ReverseConnectionRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ByteBufferSend;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.ReverseConnectionRequest;
import org.apache.kafka.common.requests.ReverseConnectionResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.authenticator.LoginManager;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.audit.AuditLogProvider;
import org.apache.kafka.test.TestUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map$;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0011ua\u0001\u0002\"D\u0001!CQa\u0014\u0001\u0005\u0002ACqa\u0015\u0001C\u0002\u0013%A\u000b\u0003\u0004\\\u0001\u0001\u0006I!\u0016\u0005\b9\u0002\u0011\r\u0011\"\u0003^\u0011\u0019Y\u0007\u0001)A\u0005=\"9A\u000e\u0001b\u0001\n\u0013i\u0007BB=\u0001A\u0003%a\u000eC\u0004{\u0001\t\u0007I\u0011B>\t\u000f\u0005]\u0001\u0001)A\u0005y\"I\u0011\u0011\u0004\u0001C\u0002\u0013%\u00111\u0004\u0005\t\u0003k\u0001\u0001\u0015!\u0003\u0002\u001e!I\u0011q\u0007\u0001C\u0002\u0013%\u0011\u0011\b\u0005\t\u0003\u000f\u0002\u0001\u0015!\u0003\u0002<!I\u0011\u0011\n\u0001C\u0002\u0013%\u00111\n\u0005\t\u00033\u0002\u0001\u0015!\u0003\u0002N!I\u00111\f\u0001C\u0002\u0013%\u0011Q\f\u0005\t\u0003K\u0002\u0001\u0015!\u0003\u0002`!9\u0011q\r\u0001\u0005\u0002\u0005%\u0004bBAD\u0001\u0011\u0005\u0011\u0011\u000e\u0005\b\u0003#\u0003A\u0011AA5\u0011\u001d\tY\n\u0001C\u0001\u0003SBq!a(\u0001\t\u0003\tI\u0007C\u0004\u0002$\u0002!\t!!\u001b\t\u000f\u0005\u001d\u0006\u0001\"\u0001\u0002j!9\u00111\u0016\u0001\u0005\u0002\u0005%\u0004bBAX\u0001\u0011\u0005\u0011\u0011\u000e\u0005\b\u0003g\u0003A\u0011AA5\u0011\u001d\t9\f\u0001C\u0001\u0003SBq!a/\u0001\t\u0003\tI\u0007C\u0004\u0002@\u0002!\t!!\u001b\t\u000f\u0005\r\u0007\u0001\"\u0001\u0002j!9\u0011q\u0019\u0001\u0005\u0002\u0005%\u0004bBAf\u0001\u0011%\u0011Q\u001a\u0005\b\u0003G\u0004A\u0011BAs\u0011\u001d\tY\r\u0001C\u0005\u0003gD\u0011B!\b\u0001#\u0003%IAa\b\t\u0013\tU\u0002!%A\u0005\n\t]\u0002b\u0002B\u001e\u0001\u0011%!Q\b\u0005\n\u0005g\u0002\u0011\u0013!C\u0005\u0005kB\u0011B!\u001f\u0001#\u0003%IAa\u001f\t\u000f\t}\u0004\u0001\"\u0003\u0003\u0002\"9!Q\u0011\u0001\u0005\n\t\u001d\u0005b\u0002BF\u0001\u0011%!Q\u0012\u0005\b\u0005+\u0003A\u0011\u0002BL\u0011\u001d\u0011)\u000b\u0001C\u0005\u0005OCqAa-\u0001\t\u0013\u0011)\fC\u0004\u0003:\u0002!IAa/\t\u000f\r]\u0001\u0001\"\u0003\u0004\u001a!I1Q\b\u0001\u0012\u0002\u0013%1q\b\u0005\b\u0007\u0007\u0002A\u0011BB#\u0011%\u0019i\u0006AI\u0001\n\u0013\u0011y\
u0002C\u0004\u0004`\u0001!Ia!\u0019\t\u000f\r=\u0004\u0001\"\u0003\u0004r!911\u0010\u0001\u0005\n\ru\u0004bBBH\u0001\u0011%1\u0011\u0013\u0005\b\u0007S\u0003A\u0011BBV\u0011\u001d\u0019\u0019\f\u0001C\u0005\u0007kCqaa1\u0001\t\u0013\u0019)\rC\u0005\u0004^\u0002\t\n\u0011\"\u0003\u0004`\"I1\u0011\u001e\u0001\u0012\u0002\u0013%!q\u0004\u0005\b\u0007W\u0004A\u0011BBw\u0011\u001d\u0019y\u0010\u0001C\u0005\t\u0003Aq\u0001b\u0003\u0001\t\u0013!i\u0001C\u0004\u0005\u0012\u0001!I\u0001b\u0005\t\u000f\u0011]\u0001\u0001\"\u0003\u0005\u001a\t)\"+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8UKN$(B\u0001#F\u0003\u001dqW\r^<pe.T\u0011AR\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001\u0011\n\u0005\u0002K\u001b6\t1JC\u0001M\u0003\u0015\u00198-\u00197b\u0013\tq5J\u0001\u0004B]f\u0014VMZ\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003E\u0003\"A\u0015\u0001\u000e\u0003\r\u000b!c\u0019:fI\u0016tG/[1m!J|g/\u001b3feV\tQ\u000b\u0005\u0002W36\tqK\u0003\u0002Y\u000b\u0006A1/Z2ve&$\u00180\u0003\u0002[/\n\u00112I]3eK:$\u0018.\u00197Qe>4\u0018\u000eZ3s\u0003M\u0019'/\u001a3f]RL\u0017\r\u001c)s_ZLG-\u001a:!\u0003\u0011!\u0018.\\3\u0016\u0003y\u0003\"aX5\u000e\u0003\u0001T!!\u00192\u0002\u000bU$\u0018\u000e\\:\u000b\u0005\r$\u0017AB2p[6|gN\u0003\u0002GK*\u0011amZ\u0001\u0007CB\f7\r[3\u000b\u0003!\f1a\u001c:h\u0013\tQ\u0007M\u0001\u0003US6,\u0017!\u0002;j[\u0016\u0004\u0013aB:feZ,'o]\u000b\u0002]B\u0019q\u000e\u001e<\u000e\u0003AT!!\u001d:\u0002\u000f5,H/\u00192mK*\u00111oS\u0001\u000bG>dG.Z2uS>t\u0017BA;q\u0005\u0019\u0011UO\u001a4feB\u0011!k^\u0005\u0003q\u000e\u0013AbU8dW\u0016$8+\u001a:wKJ\f\u0001b]3sm\u0016\u00148\u000fI\u0001\u000f]\u0016$xo\u001c:l\u00072LWM\u001c;t+\u0005a\b#B8~\u007f\u0006-\u0011B\u0001@q\u0005\ri\u0015\r\u001d\t\u0005\u0003\u0003\t9!\u0004\u0002\u0002\u0004)\u0019\u0011Q\u00013\u0002\u000f\rd\u0017.\u001a8ug&!\u0011\u0011BA\u0002\u00055qU\r^<pe.\u001cE.[3oiB!\u0011QBA\n\u001b\t\tyAC\u0002\u0002\u0012\t\fq!\\3ue&\u001c7/\u0003\u0003\u0002\u0016\u0005=!aB'fiJL7m]\u0001\u0010]\u
0016$xo\u001c:l\u00072LWM\u001c;tA\u0005)A.\u001b8lgV\u0011\u0011Q\u0004\t\u0007_v\fy\"a\f\u0011\t\u0005\u0005\u00121F\u0007\u0003\u0003GQA!!\n\u0002(\u0005!Q\u000f^5m\u0015\t\tI#\u0001\u0003kCZ\f\u0017\u0002BA\u0017\u0003G\u0011A!V+J\tB\u0019!+!\r\n\u0007\u0005M2I\u0001\bMS:\\7i\\7q_:,g\u000e^:\u0002\r1Lgn[:!\u0003U\u0011XM^3sg\u0016\u001cv.\u001e:dK\u000eC\u0017M\u001c8fYN,\"!a\u000f\u0011\t=$\u0018Q\b\t\u0005\u0003\u007f\t\u0019%\u0004\u0002\u0002B)\u0011AIY\u0005\u0005\u0003\u000b\n\tE\u0001\u0007LC\u001a\\\u0017m\u00115b]:,G.\u0001\fsKZ,'o]3T_V\u00148-Z\"iC:tW\r\\:!\u0003-Y\u0017MZ6b\u0019><w-\u001a:\u0016\u0005\u00055\u0003\u0003BA(\u0003+j!!!\u0015\u000b\u0007\u0005MS-A\u0003m_\u001e$$.\u0003\u0003\u0002X\u0005E#A\u0002'pO\u001e,'/\u0001\u0007lC\u001a\\\u0017\rT8hO\u0016\u0014\b%A\tm_\u001edUM^3m)>\u0014Vm\u001d;pe\u0016,\"!a\u0018\u0011\t\u0005=\u0013\u0011M\u0005\u0005\u0003G\n\tFA\u0003MKZ,G.\u0001\nm_\u001edUM^3m)>\u0014Vm\u001d;pe\u0016\u0004\u0013!B:fiV\u0003HCAA6!\rQ\u0015QN\u0005\u0004\u0003_Z%\u0001B+oSRD3AEA:!\u0011\t)(a!\u000e\u0005\u0005]$\u0002BA=\u0003w\n1!\u00199j\u0015\u0011\ti(a 
\u0002\u000f),\b/\u001b;fe*\u0019\u0011\u0011Q4\u0002\u000b),h.\u001b;\n\t\u0005\u0015\u0015q\u000f\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017\u0001\u0003;fCJ$un\u001e8)\u0007M\tY\t\u0005\u0003\u0002v\u00055\u0015\u0002BAH\u0003o\u0012\u0011\"\u00114uKJ,\u0015m\u00195\u0002#Q,7\u000f\u001e(fi^|'o[\"mS\u0016tG\u000fK\u0002\u0015\u0003+\u0003B!!\u001e\u0002\u0018&!\u0011\u0011TA<\u0005\u0011!Vm\u001d;\u0002=Q,7\u000f\u001e*fm\u0016\u00148/\u001a)mC&tG/\u001a=u\u0007>tg.Z2uS>t\u0007fA\u000b\u0002\u0016\u0006AB/Z:u%\u00164XM]:f'Nd7i\u001c8oK\u000e$\u0018n\u001c8)\u0007Y\t)*\u0001\u0012uKN$(+\u001a<feN,7+Y:m!2\f\u0017N\u001c;fqR\u001cuN\u001c8fGRLwN\u001c\u0015\u0004/\u0005U\u0015\u0001\b;fgR\u0014VM^3sg\u0016\u001c\u0016m\u001d7Tg2\u001cuN\u001c8fGRLwN\u001c\u0015\u00041\u0005U\u0015a\f;fgR\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\,ji\"\u0004F.Y5oi\u0016DHoU8ve\u000e,7k\u001d7EKN$\bfA\r\u0002\u0016\u0006\u0001D/Z:u%\u00164XM]:f\u0007>tg.Z2uS>tw+\u001b;i\t&4g-\u001a:f]R\u001c\u0016m\u001d7NK\u000eD\u0017M\\5t[ND3AGAK\u0003e!Xm\u001d;BkRDWM\u001c;jG\u0006$\u0018n\u001c8GC&dWO]3)\u0007m\t)*A\u000buKN$(I]8lKJLe\u000e^3sG\u0016\u0004Ho\u001c:)\u0007q\t)*A\u0014uKN$(+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t)>lU\u000f\u001c;ja2,7+\u001a:wKJ\u001c\bfA\u000f\u0002\u0016\u0006IC/Z:u%\u00164XM]:f\u0007>tg.Z2uS>t7O\u0012:p[6+H\u000e^5qY\u0016\u001cVM\u001d<feND3AHAK\u0003A\"Xm\u001d;Nk2$\u0018\u000e\u001d7f%\u00164XM]:f\u0007>tg.Z2uS>t7OQ3uo\u0016,gnU1nKN+'O^3sg\"\u001aq$!&\u0002'Q,7\u000f^\"p]:,7\r^5p]2KW.\u001b;)\u0007\u0001\n)*A\fwKJLg-\u001f*fm\u0016\u00148/Z\"p]:,7\r^5p]R1\u00111NAh\u0003?Dq!!5\"\u0001\u0004\t\u0019.\u0001\nt_V\u00148-Z*feZ,'oQ8oM&<\u0007\u0003BAk\u00037l!!a6\u000b\u0007\u0005eW)\u0001\u0004tKJ4XM]\u0005\u0005\u0003;\f9NA\u0006LC\u001a\\\u0017mQ8oM&<\u0007bBAqC\u0001\u0007\u00111[\u0001\u0011I\u0016\u001cHoU3sm\u0016\u00148i\u001c8gS\u001e\f\u0011c\u0019:fCR,G*\u001b8l\u00072LWM\u001c;t)!\ty#a:\u0002l\u0006=\bbBAuE\u0001\u0007\u0011qD\u0001\u0007Y&t7.\u001
33\t\r\u00055(\u00051\u0001w\u00031\u0019x.\u001e:dKN+'O^3s\u0011\u0019\t\tP\ta\u0001m\u0006QA-Z:u'\u0016\u0014h/\u001a:\u0015\u0019\u0005-\u0014Q_A|\u0003s\fYPa\u0005\t\u000f\u0005%8\u00051\u0001\u0002 !1\u0011Q^\u0012A\u0002YDa!!=$\u0001\u00041\b\"CA\u007fGA\u0005\t\u0019AA\u0000\u00031\u0001(/\u001b8dSB\fGn\u00149u!\u0015Q%\u0011\u0001B\u0003\u0013\r\u0011\u0019a\u0013\u0002\u0007\u001fB$\u0018n\u001c8\u0011\t\t\u001d!qB\u0007\u0003\u0005\u0013QAAa\u0003\u0003\u000e\u0005!\u0011-\u001e;i\u0015\tA&-\u0003\u0003\u0003\u0012\t%!AD&bM.\f\u0007K]5oG&\u0004\u0018\r\u001c\u0005\n\u0005+\u0019\u0003\u0013!a\u0001\u0005/\t1B\\;n%\u0016\fX/Z:ugB\u0019!J!\u0007\n\u0007\tm1JA\u0002J]R\f\u0011E^3sS\u001aL(+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8%I\u00164\u0017-\u001e7uIQ*\"A!\t+\t\u0005}(1E\u0016\u0003\u0005K\u0001BAa\n\u000325\u0011!\u0011\u0006\u0006\u0005\u0005W\u0011i#A\u0005v]\u000eDWmY6fI*\u0019!qF&\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u00034\t%\"!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006\tc/\u001a:jMf\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\u001c\u0013eK\u001a\fW\u000f\u001c;%kU\u0011!\u0011\b\u0016\u0005\u0005/\u0011\u0019#A\u0006lC\u001a\\\u0017mQ8oM&<GCCAj\u0005\u007f\u0011\u0019E!\u0014\u0003j!9!\u0011\t\u0014A\u0002\t]\u0011\u0001\u00032s_.,'/\u00133\t\u000f\t\u0015c\u00051\u0001\u0003H\u0005\u00012/Z2ve&$\u0018\u0010\u0015:pi>\u001cw\u000e\u001c\t\u0005\u0005\u000f\u0011I%\u0003\u0003\u0003L\t%!\u0001E*fGV\u0014\u0018\u000e^=Qe>$xnY8m\u0011%\u0011yE\nI\u0001\u0002\u0004\u0011\t&A\u0007tCNdW*Z2iC:L7/\u001c\t\u0006\u0015\n\u0005!1\u000b\t\u0005\u0005+\u0012\u0019G\u0004\u0003\u0003X\t}\u0003c\u0001B-\u00176\u0011!1\f\u0006\u0004\u0005;:\u0015A\u0002\u001fs_>$h(C\u0002\u0003b-\u000ba\u0001\u0015:fI\u00164\u0017\u0002\u0002B3\u0005O\u0012aa\u0015;sS:<'b\u0001B1\u0017\"I!1\u000e\u0014\u0011\u0002\u0003\u0007!QN\u0001\u000bKb$(/\u0019)s_B\u001c\b\u0003BA\u0011\u0005_JAA!\u001d\u0002$\tQ\u0001K]8qKJ$\u0018.Z:\u0002+-\fgm[1D_:4\u0017n\u001a\u0013eK\u001
a\fW\u000f\u001c;%gU\u0011!q\u000f\u0016\u0005\u0005#\u0012\u0019#A\u000blC\u001a\\\u0017mQ8oM&<G\u0005Z3gCVdG\u000f\n\u001b\u0016\u0005\tu$\u0006\u0002B7\u0005G\tA\u0003\u001d7bS:$X\r\u001f;TKJ4XM\u001d)s_B\u001cH\u0003\u0002B7\u0005\u0007CqA!\u0011*\u0001\u0004\u00119\"\u0001\btg2\u001cVM\u001d<feB\u0013x\u000e]:\u0015\t\t5$\u0011\u0012\u0005\b\u0005\u0003R\u0003\u0019\u0001B\f\u0003=\u0019\u0018m\u001d7TKJ4XM\u001d)s_B\u001cH\u0003\u0003B7\u0005\u001f\u0013\tJa%\t\u000f\t\u00053\u00061\u0001\u0003\u0018!9!QI\u0016A\u0002\t\u001d\u0003b\u0002B(W\u0001\u0007!\u0011K\u0001\rC\u0012$7k\u0019:b[V\u001bXM\u001d\u000b\t\u0003W\u0012IJ!(\u0003\"\"9!1\u0014\u0017A\u0002\tM\u0013!C7fG\"\fg.[:n\u0011\u001d\u0011y\n\fa\u0001\u0005'\n\u0001\"^:fe:\fW.\u001a\u0005\b\u0005Gc\u0003\u0019\u0001B*\u0003!\u0001\u0018m]:x_J$\u0017\u0001\u00047jgR,g.\u001a:OC6,G\u0003\u0002BU\u0005_\u0003B!a\u0010\u0003,&!!QVA!\u00051a\u0015n\u001d;f]\u0016\u0014h*Y7f\u0011\u001d\u0011\t,\fa\u0001\u0003'\faaY8oM&<\u0017a\u00048foN{7m[3u'\u0016\u0014h/\u001a:\u0015\u0007Y\u00149\fC\u0004\u00032:\u0002\r!a5\u0002!9,wo\u00117jK:$(+Z9vKN$HC\u0003B_\u0005\u0007\u00149Ma3\u0003|B!\u0011\u0011\u0001B`\u0013\u0011\u0011\t-a\u0001\u0003\u001b\rc\u0017.\u001a8u%\u0016\fX/Z:u\u0011\u0019\u0011)m\fa\u0001\u007f\u0006ia.\u001a;x_J\\7\t\\5f]RDqA!30\u0001\u0004\u0011\u0019&\u0001\u0004o_\u0012,\u0017\n\u001a\u0005\b\u0005\u001b|\u0003\u0019\u0001Bh\u0003\u001d\u0011W/\u001b7eKJ\u0004DA!5\u0003jB1!1\u001bBp\u0005KtAA!6\u0003\\6\u0011!q\u001b\u0006\u0004\u00053\u0014\u0017\u0001\u0003:fcV,7\u000f^:\n\t\tu'q[\u0001\u0010\u0003\n\u001cHO]1diJ+\u0017/^3ti&!!\u0011\u001dBr\u0005\u001d\u0011U/\u001b7eKJTAA!8\u0003XB!!q\u001dBu\u0019\u0001!ABa;\u0003L\u0006\u0005\t\u0011!B\u0001\u0005[\u00141a\u0018\u00132#\u0011\u0011yO!>\u0011\u0007)\u0013\t0C\u0002\u0003t.\u0013qAT8uQ&tw\rE\u0002K\u0005oL1A!?L\u0005\r\te.\u001f\u0005\b\u0005{|\u0003\u0019\u0001B\u0000\u0003\u00191W\u000f^;sKB11\u0011AB\u0004\u0007\u0017i!aa\u0001\u000b\
t\r\u0015\u00111E\u0001\u000bG>t7-\u001e:sK:$\u0018\u0002BB\u0005\u0007\u0007\u0011\u0011cQ8na2,G/\u00192mK\u001a+H/\u001e:f!\u0011\u0019iaa\u0005\u000e\u0005\r=!\u0002BB\t\u0003O\tA\u0001\\1oO&!1QCB\b\u0005\u00111v.\u001b3\u0002\u001dI,7-Z5wKJ+\u0017/^3tiR111DB\u0015\u0007g\u0001Ba!\b\u0004$9\u0019!ka\b\n\u0007\r\u00052)\u0001\bSKF,Xm\u001d;DQ\u0006tg.\u001a7\n\t\r\u00152q\u0005\u0002\b%\u0016\fX/Z:u\u0015\r\u0019\tc\u0011\u0005\b\u0007W\u0001\u0004\u0019AB\u0017\u0003\u001d\u0019\u0007.\u00198oK2\u00042AUB\u0018\u0013\r\u0019\td\u0011\u0002\u000f%\u0016\fX/Z:u\u0007\"\fgN\\3m\u0011%\u0019)\u0004\rI\u0001\u0002\u0004\u00199$A\u0004uS6,w.\u001e;\u0011\u0007)\u001bI$C\u0002\u0004<-\u0013A\u0001T8oO\u0006A\"/Z2fSZ,'+Z9vKN$H\u0005Z3gCVdG\u000f\n\u001a\u0016\u0005\r\u0005#\u0006BB\u001c\u0005G\t!\u0003\u001d:pG\u0016\u001c8OT3yiJ+\u0017/^3tiRQ\u00111NB$\u0007\u0013\u001aie!\u0017\t\r\u0005e'\u00071\u0001w\u0011\u0019\u0019YE\ra\u0001\u007f\u000611\r\\5f]RDqaa\u00143\u0001\u0004\u0019\t&\u0001\u0003o_\u0012,\u0007\u0003BB*\u0007+j\u0011AY\u0005\u0004\u0007/\u0012'\u0001\u0002(pI\u0016D\u0011ba\u00173!\u0003\u0005\r!a@\u0002#\u0015D\b/Z2uK\u0012\u0004&/\u001b8dSB\fG.\u0001\u000fqe>\u001cWm]:OKb$(+Z9vKN$H\u0005Z3gCVdG\u000f\n\u001b\u0002-5,G/\u00193bi\u0006\u0014V-];fgR\u0014U/\u001b7eKJ,\"aa\u0019\u0011\t\r\u001541\u000e\b\u0005\u0005+\u001c9'\u0003\u0003\u0004j\t]\u0017aD'fi\u0006$\u0017\r^1SKF,Xm\u001d;\n\t\t\u00058Q\u000e\u0006\u0005\u0007S\u00129.A\bnKR\fG-\u0019;b+B$\u0017\r^3s)\u0011\u0019\u0019h!\u001f\u0011\t\u0005\u00051QO\u0005\u0005\u0007o\n\u0019AA\u000bNC:,\u0018\r\\'fi\u0006$\u0017\r^1Va\u0012\fG/\u001a:\t\u000f\r=S\u00071\u0001\u0004R\u0005\u0001b.Z<OKR<xN]6DY&,g\u000e\u001e\u000b\b\u007f\u000e}41QBD\u0011\u0019\u0019\tI\u000ea\u0001m\u0006a!/Z7pi\u0016\u001cVM\u001d<fe\"11Q\u0011\u001cA\u0002Y\f1\u0002\\8dC2\u001cVM\u001d<fe\"91q\u000e\u001cA\u0002\r%\u0005\u0003BA\u0001\u0007\u0017KAa!$\u0002\u0004\tyQ*\u001a;bI\u0006$\u0018-\u00169eCR,'/A\u0007xC&$hi\u00
1c:DY&,g\u000e\u001e\u000b\t\u0003W\u001a\u0019j!&\u0004&\"111J\u001cA\u0002}Dqaa&8\u0001\u0004\u0019I*A\u0005qe\u0016$\u0017nY1uKB1!ja'\u0000\u0007?K1a!(L\u0005%1UO\\2uS>t\u0017\u0007E\u0002K\u0007CK1aa)L\u0005\u001d\u0011un\u001c7fC:Dqaa*8\u0001\u0004\u0011\u0019&\u0001\u0007feJ|'/T3tg\u0006<W-\u0001\u0007xC&$hi\u001c:SK\u0006$\u0017\u0010\u0006\u0005\u0002l\r56qVBY\u0011\u0019\u0019Y\u0005\u000fa\u0001\u007f\"1\u0011\u0011\u001c\u001dA\u0002YDqaa\u00149\u0001\u0004\u0019\t&A\bxC&$hi\u001c:SKZ,'o]1m)!\tYga.\u0004:\u000em\u0006BBB&s\u0001\u0007q\u0010\u0003\u0004\u0002Zf\u0002\rA\u001e\u0005\b\u0007\u001fJ\u0004\u0019AB_!\u0011\tyda0\n\t\r\u0005\u0017\u0011\t\u0002\f%\u00164XM]:f\u001d>$W-\u0001\btK:$\u0017I\u001c3SK\u000e,\u0017N^3\u0015\u0019\u0005-4qYBe\u0007\u0017\u001cima7\t\r\r-#\b1\u0001\u0000\u0011\u0019\tIN\u000fa\u0001m\"91q\n\u001eA\u0002\rE\u0003\"CBhuA\u0005\t\u0019ABi\u00039\u0011X-];fgR\u0014U/\u001b7eKJ\u0004Daa5\u0004XB1!1\u001bBp\u0007+\u0004BAa:\u0004X\u0012a1\u0011\\Bg\u0003\u0003\u0005\tQ!\u0001\u0003n\n\u0019q\f\n\u001a\t\u0013\rm#\b%AA\u0002\u0005}\u0018\u0001G:f]\u0012\fe\u000e\u001a*fG\u0016Lg/\u001a\u0013eK\u001a\fW\u000f\u001c;%iU\u00111\u0011\u001d\u0019\u0005\u0007G\u001c9\u000f\u0005\u0004\u0003T\n}7Q\u001d\t\u0005\u0005O\u001c9\u000fB\u0006\u0004Zn\n\t\u0011!A\u0003\u0002\t5\u0018\u0001G:f]\u0012\fe\u000e\u001a*fG\u0016Lg/\u001a\u0013eK\u001a\fW\u000f\u001c;%k\u0005a!/\u001a<feN\fG\u000eR1uCR11q^B~\u0007{\u0004Ba!=\u0004x6\u001111\u001f\u0006\u0004\u0007k\u0014\u0017aB7fgN\fw-Z\u0005\u0005\u0007s\u001c\u0019P\u0001\u000fSKZ,'o]3D_:tWm\u0019;j_:\u0014V-];fgR$\u0015\r^1\t\u000f\u0005%X\b1\u0001\u0002 
!1\u0011Q^\u001fA\u0002Y\f\u0011d\u001c8SKZ,'o]3DY&,g\u000e^\"p]:,7\r^5p]R1\u00111\u000eC\u0002\t\u000fAq\u0001\"\u0002?\u0001\u0004\ti$A\u0007t_V\u00148-Z\"iC:tW\r\u001c\u0005\b\t\u0013q\u0004\u0019AB_\u0003-\u0011XM^3sg\u0016tu\u000eZ3\u0002\u0013A\u0014\u0018N\\2ja\u0006dG\u0003\u0002B\u0003\t\u001fAa!!7@\u0001\u00041\u0018\u0001G:ikR$wn\u001e8TKJ4XM]!oI6+GO]5dgR!\u00111\u000eC\u000b\u0011\u0019\tI\u000e\u0011a\u0001m\u0006Ab/\u001a:jMftU\r^<pe.\u001cE.[3oi\u0016k\u0007\u000f^=\u0015\t\u0005-D1\u0004\u0005\u0007\u0007\u0017\n\u0005\u0019A@")
public class ReverseConnectionTest {
    // Credential provider constructed with the SCRAM mechanism names and a null second argument.
    private final CredentialProvider credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames(), null);
    // Clock used by the test; bound to the system clock.
    private final Time time = Time.SYSTEM;
    // Socket servers tracked by this test instance (initially empty); tearDown() shuts each one down.
    private final Buffer<SocketServer> servers = (Buffer)Buffer$.MODULE$.apply((Seq)Nil$.MODULE$);
    // Network clients paired with a Metrics instance (initially empty); tearDown() visits every entry.
    private final scala.collection.mutable.Map<NetworkClient, Metrics> networkClients = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
    // Link components keyed by link UUID (initially empty). The mangled name is the Scala compiler's
    // expansion of a private member that is accessed from outside the class body.
    private final scala.collection.mutable.Map<UUID, LinkComponents> kafka$network$ReverseConnectionTest$$links = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
    // Channels collected during reverse connection tests (initially empty).
    private final Buffer<KafkaChannel> reverseSourceChannels = (Buffer)Buffer$.MODULE$.apply((Seq)Nil$.MODULE$);
    // The log4j logger named "kafka"; setUp() raises it to TRACE.
    private final Logger kafkaLogger = LogManager.getLogger((String)"kafka");
    // Level of the "kafka" logger captured at construction time; must be initialised after kafkaLogger.
    private final Level logLevelToRestore = this.kafkaLogger().getLevel();

    /** Returns the credential provider held by this test instance. */
    private CredentialProvider credentialProvider() {
        return credentialProvider;
    }

    /** Returns the clock held by this test instance. */
    private Time time() {
        return time;
    }

    /** Returns the mutable buffer of socket servers tracked by this test instance. */
    private Buffer<SocketServer> servers() {
        return servers;
    }

    /** Returns the mutable map from each tracked network client to its metrics instance. */
    private scala.collection.mutable.Map<NetworkClient, Metrics> networkClients() {
        return networkClients;
    }

    /** Returns the mutable map of link components keyed by link UUID. */
    public scala.collection.mutable.Map<UUID, LinkComponents> kafka$network$ReverseConnectionTest$$links() {
        return kafka$network$ReverseConnectionTest$$links;
    }

    /** Returns the mutable buffer of channels collected by this test instance. */
    private Buffer<KafkaChannel> reverseSourceChannels() {
        return reverseSourceChannels;
    }

    /** Returns the log4j logger named "kafka". */
    private Logger kafkaLogger() {
        return kafkaLogger;
    }

    /** Returns the "kafka" logger level that was captured when this test instance was constructed. */
    private Level logLevelToRestore() {
        return logLevelToRestore;
    }

    /**
     * Per-test setup: clears the Yammer metrics via the shared test utilities and raises the
     * "kafka" logger to TRACE. The previous level is put back in {@link #tearDown()}.
     */
    @BeforeEach
    public void setUp() {
        TestUtils$.MODULE$.clearYammerMetrics();
        kafkaLogger().setLevel(Level.TRACE);
    }

    /**
     * Per-test cleanup.
     *
     * <p>Visits every tracked network client (the per-entry cleanup lives in a synthetic lambda body
     * not shown here; presumably it closes the client and its metrics), shuts down every tracked
     * server, and then restores process-wide state: the "kafka" logger level, the static
     * {@link LoginManager} instances and the JAAS {@link Configuration}.
     *
     * <p>The process-wide state is restored in {@code finally} blocks so that a failure while
     * cleaning up a client or a server cannot leak TRACE logging or a stale JAAS configuration
     * into tests that run afterwards. Any exception from the cleanup still propagates.
     */
    @AfterEach
    public void tearDown() {
        try {
            this.networkClients().foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                ReverseConnectionTest.$anonfun$tearDown$1(x0$1);
                return BoxedUnit.UNIT;
            });
        } finally {
            try {
                // Servers are shut down even if cleaning up a client failed.
                this.servers().foreach((Function1 & Serializable & scala.Serializable)server -> {
                    this.shutdownServerAndMetrics(server);
                    return BoxedUnit.UNIT;
                });
            } finally {
                // Global state shared with other tests: always restore it.
                this.kafkaLogger().setLevel(this.logLevelToRestore());
                LoginManager.closeAll();
                Configuration.setConfiguration(null);
            }
        }
    }

    /**
     * Exercises a network client against a single SASL_SSL socket server: enables a destination
     * cluster link on the client, waits for the node to become ready, performs a send/receive
     * round with the default request builder and principal, closes the connection to the node and
     * finally checks that the client holds no leftover state.
     */
    @Test
    public void testNetworkClient() {
        final int brokerId = 1;
        KafkaConfig brokerConfig = this.kafkaConfig(brokerId, SecurityProtocol.SASL_SSL, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        SocketServer socketServer = this.newSocketServer(brokerConfig);
        int port = socketServer.boundPort(this.listenerName(brokerConfig));
        Node broker = new Node(brokerId, "localhost", port);
        MetadataUpdater updater = this.metadataUpdater(broker);
        NetworkClient networkClient = this.newNetworkClient(socketServer, socketServer, updater);
        networkClient.enableDestinationClusterLink(UUID.randomUUID(), null, null);
        this.waitForReady(networkClient, socketServer, broker);
        this.sendAndReceive(networkClient, socketServer, broker, this.sendAndReceive$default$4(), this.sendAndReceive$default$5());
        networkClient.close(String.valueOf(brokerId));
        this.verifyNetworkClientEmpty(networkClient);
    }

    /** Verifies a reverse connection where both the source and destination configs use PLAINTEXT. */
    @Test
    public void testReversePlaintextConnection() {
        final SecurityProtocol protocol = SecurityProtocol.PLAINTEXT;
        final KafkaConfig source = this.kafkaConfig(1, protocol, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        final KafkaConfig destination = this.kafkaConfig(2, protocol, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        this.verifyReverseConnection(source, destination);
    }

    /** Verifies a reverse connection where both the source and destination configs use SSL. */
    @Test
    public void testReverseSslConnection() {
        final SecurityProtocol protocol = SecurityProtocol.SSL;
        final KafkaConfig source = this.kafkaConfig(1, protocol, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        final KafkaConfig destination = this.kafkaConfig(2, protocol, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        this.verifyReverseConnection(source, destination);
    }

    /** Verifies a reverse connection where both the source and destination configs use SASL_PLAINTEXT. */
    @Test
    public void testReverseSaslPlaintextConnection() {
        final SecurityProtocol protocol = SecurityProtocol.SASL_PLAINTEXT;
        final KafkaConfig source = this.kafkaConfig(1, protocol, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        final KafkaConfig destination = this.kafkaConfig(2, protocol, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        this.verifyReverseConnection(source, destination);
    }

    /** Verifies a reverse connection where both the source and destination configs use SASL_SSL. */
    @Test
    public void testReverseSaslSslConnection() {
        final SecurityProtocol protocol = SecurityProtocol.SASL_SSL;
        final KafkaConfig source = this.kafkaConfig(1, protocol, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        final KafkaConfig destination = this.kafkaConfig(2, protocol, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        this.verifyReverseConnection(source, destination);
    }

    /** Verifies a reverse connection from a PLAINTEXT source config to an SSL destination config. */
    @Test
    public void testReverseConnectionWithPlaintextSourceSslDest() {
        final SecurityProtocol sourceProtocol = SecurityProtocol.PLAINTEXT;
        final KafkaConfig source = this.kafkaConfig(1, sourceProtocol, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        final SecurityProtocol destProtocol = SecurityProtocol.SSL;
        final KafkaConfig destination = this.kafkaConfig(2, destProtocol, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        this.verifyReverseConnection(source, destination);
    }

    /**
     * Verifies a reverse connection between two SASL_PLAINTEXT configs that use different SASL
     * mechanisms: "PLAIN" for the source and "SCRAM-SHA-256" for the destination.
     */
    @Test
    public void testReverseConnectionWithDifferentSaslMechanisms() {
        final SecurityProtocol protocol = SecurityProtocol.SASL_PLAINTEXT;
        Option<String> sourceMechanism = (Option<String>)new Some((Object)"PLAIN");
        KafkaConfig source = this.kafkaConfig(1, protocol, sourceMechanism, this.kafkaConfig$default$4());
        Option<String> destMechanism = (Option<String>)new Some((Object)"SCRAM-SHA-256");
        KafkaConfig destination = this.kafkaConfig(2, protocol, destMechanism, this.kafkaConfig$default$4());
        this.verifyReverseConnection(source, destination);
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testAuthenticationFailure() {
        KafkaConfig sourceConfig = this.kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, (Option<String>)new Some((Object)"PLAIN"), this.kafkaConfig$default$4());
        KafkaConfig destConfig = this.kafkaConfig(2, SecurityProtocol.SASL_PLAINTEXT, (Option<String>)new Some((Object)"SCRAM-SHA-256"), this.kafkaConfig$default$4());
        SocketServer sourceServer = this.newSocketServer(sourceConfig);
        SocketServer destServer = this.newSocketServer(destConfig);
        this.credentialProvider().credentialCache().cache("SCRAM-SHA-256", ScramCredential.class).remove(JaasTestUtils$.MODULE$.KafkaScramUser());
        this.credentialProvider().credentialCache().cache("SCRAM-SHA-256", ScramCredential.class).remove(JaasTestUtils$.MODULE$.KafkaScramUser2());
        LinkComponents link = this.createLinkClients(UUID.randomUUID(), sourceServer, destServer);
        ListenerName sourceListenerName = ((EndPoint)sourceServer.config().advertisedListeners().head()).listenerName();
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(1), 3).foreach((Function1 & Serializable & scala.Serializable)i -> ReverseConnectionTest.$anonfun$testAuthenticationFailure$1(this, link, destServer, sourceListenerName, sourceServer, BoxesRunTime.unboxToInt((Object)i)));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long l = 15000L;
        if (testUtils$ == null) {
            throw null;
        }
        TestUtils$ retry_this = testUtils$;
        LongRef retry_wait = LongRef.create((long)1L);
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                ReverseConnectionTest.$anonfun$testAuthenticationFailure$2(this, link);
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    retry_this.logger().underlying().info(retry_this.msgWithLogIdent(TestUtils$.$anonfun$retry$1(retry_wait)));
                }
                Thread.sleep(retry_wait.elem);
                retry_wait.elem += package$.MODULE$.min(retry_wait.elem, 1000L);
                continue;
            }
            break;
        }
    }

    /**
     * Runs a reverse connection between two SASL_SSL servers where the source config additionally
     * sets the broker interceptor class property to {@link TestInterceptor}. Ten requests are sent
     * with the expected principal wrapped in a {@link TestPrincipal}, after which the interceptor is
     * asked to verify {@code numRequests + 1} against that principal.
     */
    @Test
    public void testBrokerInterceptor() {
        Properties interceptorProps = new Properties();
        TestInterceptor$.MODULE$.reset();
        interceptorProps.setProperty(KafkaConfig$.MODULE$.BrokerInterceptorClassProp(), TestInterceptor.class.getName());
        KafkaConfig sourceCfg = this.kafkaConfig(1, SecurityProtocol.SASL_SSL, this.kafkaConfig$default$3(), interceptorProps);
        KafkaConfig destCfg = this.kafkaConfig(2, SecurityProtocol.SASL_SSL, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        SocketServer source = this.newSocketServer(sourceCfg);
        SocketServer destination = this.newSocketServer(destCfg);
        final int requestCount = 10;
        TestPrincipal expectedPrincipal = new TestPrincipal(this.principal(source));
        UUID linkId = UUID.randomUUID();
        Option<KafkaPrincipal> expected = (Option<KafkaPrincipal>)new Some((Object)expectedPrincipal);
        this.verifyReverseConnection(linkId, source, destination, expected, requestCount);
        TestInterceptor$.MODULE$.verify(requestCount + 1, expectedPrincipal);
    }

    /**
     * Verifies reverse connections from one SASL_PLAINTEXT source server to two destination
     * servers in turn (SASL_SSL first, then SSL), each under a freshly generated link id.
     */
    @Test
    public void testReverseConnectionsToMultipleServers() {
        KafkaConfig sourceCfg = this.kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        KafkaConfig saslSslDestCfg = this.kafkaConfig(2, SecurityProtocol.SASL_SSL, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        KafkaConfig sslDestCfg = this.kafkaConfig(3, SecurityProtocol.SSL, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        SocketServer source = this.newSocketServer(sourceCfg);
        SocketServer saslSslDest = this.newSocketServer(saslSslDestCfg);
        SocketServer sslDest = this.newSocketServer(sslDestCfg);
        for (SocketServer destination : new SocketServer[]{saslSslDest, sslDest}) {
            this.verifyReverseConnection(UUID.randomUUID(), source, destination, this.verifyReverseConnection$default$4(), this.verifyReverseConnection$default$5());
        }
    }

    /**
     * Verifies reverse connections among three servers (SASL_PLAINTEXT, SASL_SSL and SSL) in the
     * order 1 to 2, 3 to 2, then 3 to 1, each under a freshly generated link id.
     */
    @Test
    public void testReverseConnectionsFromMultipleServers() {
        KafkaConfig saslPlaintextCfg = this.kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        KafkaConfig saslSslCfg = this.kafkaConfig(2, SecurityProtocol.SASL_SSL, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        KafkaConfig sslCfg = this.kafkaConfig(3, SecurityProtocol.SSL, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        SocketServer first = this.newSocketServer(saslPlaintextCfg);
        SocketServer second = this.newSocketServer(saslSslCfg);
        SocketServer third = this.newSocketServer(sslCfg);
        // Each row is {source, destination}; rows are processed top to bottom.
        SocketServer[][] sourceDestPairs = {
            {first, second},
            {third, second},
            {third, first},
        };
        for (SocketServer[] pair : sourceDestPairs) {
            this.verifyReverseConnection(UUID.randomUUID(), pair[0], pair[1], this.verifyReverseConnection$default$4(), this.verifyReverseConnection$default$5());
        }
    }

    /**
     * Verifies two successive reverse connections between the same SASL_PLAINTEXT source server
     * and SASL_SSL destination server, each under its own freshly generated link id.
     */
    @Test
    public void testMultipleReverseConnectionsBetweenSameServers() {
        KafkaConfig sourceCfg = this.kafkaConfig(1, SecurityProtocol.SASL_PLAINTEXT, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        KafkaConfig destCfg = this.kafkaConfig(2, SecurityProtocol.SASL_SSL, this.kafkaConfig$default$3(), this.kafkaConfig$default$4());
        SocketServer source = this.newSocketServer(sourceCfg);
        SocketServer destination = this.newSocketServer(destCfg);
        for (int round = 0; round < 2; round++) {
            this.verifyReverseConnection(UUID.randomUUID(), source, destination, this.verifyReverseConnection$default$4(), this.verifyReverseConnection$default$5());
        }
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testConnectionLimit() {
        Properties extraProps = new Properties();
        extraProps.put(KafkaConfig$.MODULE$.MaxConnectionsPerIpProp(), "1");
        int x$1 = 1;
        SecurityProtocol x$2 = SecurityProtocol.PLAINTEXT;
        Option<String> x$4 = this.kafkaConfig$default$3();
        KafkaConfig sourceConfig = this.kafkaConfig(x$1, x$2, x$4, extraProps);
        SocketServer sourceServer = this.newSocketServer(sourceConfig);
        ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT);
        try (Socket socket = new Socket("localhost", sourceServer.boundPort(listenerName), null, 0);){
            long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
            long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
            if (TestUtils$.MODULE$ == null) {
                throw null;
            }
            long waitUntilTrue_startTime = System.currentTimeMillis();
            while (!ReverseConnectionTest.$anonfun$testConnectionLimit$1(sourceServer)) {
                void waitUntilTrue_pause;
                void waitUntilTrue_waitTimeMs;
                if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                    Assertions.fail((String)ReverseConnectionTest.$anonfun$testConnectionLimit$2());
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
            }
            KafkaChannel channel = (KafkaChannel)EasyMock.createMock(KafkaChannel.class);
            EasyMock.expect((Object)channel.socketAddress()).andReturn((Object)InetAddress.getLoopbackAddress());
            EasyMock.expect((Object)channel.socketDescription()).andReturn((Object)"socket description");
            EasyMock.replay((Object[])new Object[]{channel});
            ReverseNode reverseNode = new ReverseNode(1, 1, "localhost", 0, UUID.randomUUID(), -1, listenerName, KafkaPrincipal.ANONYMOUS);
            Assertions.assertThrows(TooManyConnectionsException.class, () -> sourceServer.reverseAndAdd(listenerName, new ReverseChannel(channel, reverseNode, x$1 -> {})));
        }
    }

    /**
     * Starts one server per config, verifies a reverse connection from the source to the
     * destination over a fresh random link id, and then checks the loopback connection counts:
     * one on the source server and none on the destination server. Finally every registered
     * link is passed to the synthetic per-link check.
     *
     * @param sourceServerConfig configuration for the source server
     * @param destServerConfig configuration for the destination server
     */
    private void verifyReverseConnection(KafkaConfig sourceServerConfig, KafkaConfig destServerConfig) {
        SocketServer source = newSocketServer(sourceServerConfig);
        SocketServer destination = newSocketServer(destServerConfig);
        verifyReverseConnection(UUID.randomUUID(), source, destination, verifyReverseConnection$default$4(), verifyReverseConnection$default$5());

        int sourceConnections = (int)source.connectionCount(InetAddress.getLoopbackAddress());
        Assertions.assertEquals((int)1, sourceConnections);
        int destinationConnections = (int)destination.connectionCount(InetAddress.getLoopbackAddress());
        Assertions.assertEquals((int)0, destinationConnections);

        kafka$network$ReverseConnectionTest$$links().values().foreach((Function1 & Serializable & scala.Serializable)registeredLink -> {
            ReverseConnectionTest.$anonfun$verifyReverseConnection$1(this, registeredLink);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds the pair of network clients for a link and registers them under {@code linkId}.
     *
     * <p>The source-side client is enabled as a source cluster link with the reversal data for
     * {@code sourceServer}; the destination-side client is enabled as a destination cluster link
     * using a copy of the link id.
     *
     * @param linkId id under which the link components are stored
     * @param sourceServer server acting as the source of the link
     * @param destServer server acting as the destination of the link
     * @return the components that were created and registered
     */
    private LinkComponents createLinkClients(UUID linkId, SocketServer sourceServer, SocketServer destServer) {
        int sourcePort = sourceServer.boundPort(listenerName(sourceServer.config()));
        Node source = new Node(sourceServer.config().brokerId(), "localhost", sourcePort);
        int destPort = destServer.boundPort(listenerName(destServer.config()));
        Node destination = new Node(destServer.config().brokerId(), "localhost", destPort);

        NetworkClient sourceSideClient = newNetworkClient(destServer, sourceServer, (MetadataUpdater)metadataUpdater(destination));
        sourceSideClient.enableSourceClusterLink(linkId, null, reversalData(linkId, sourceServer), (channel, node) -> this.onReverseClientConnection(channel, node));

        NetworkClient destSideClient = newNetworkClient(sourceServer, destServer, (MetadataUpdater)metadataUpdater(source));
        UUID linkIdCopy = new UUID(linkId.getMostSignificantBits(), linkId.getLeastSignificantBits());
        destSideClient.enableDestinationClusterLink(linkIdCopy, null, ignored -> {});

        LinkComponents components = new LinkComponents(sourceServer, destServer, sourceSideClient, destSideClient);
        kafka$network$ReverseConnectionTest$$links().$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)linkId), (Object)components));
        return components;
    }

    /*
     * WARNING - void declaration
     */
    private void verifyReverseConnection(UUID linkId, SocketServer sourceServer, SocketServer destServer, Option<KafkaPrincipal> principalOpt, int numRequests) {
        void waitForClient_predicate;
        void waitForClient_predicate2;
        LinkComponents link = this.createLinkClients(linkId, sourceServer, destServer);
        KafkaPrincipal sourcePrincipal = (KafkaPrincipal)principalOpt.getOrElse((Function0 & Serializable & scala.Serializable)() -> this.principal(sourceServer));
        ListenerName sourceListenerName = ((EndPoint)sourceServer.config().advertisedListeners().head()).listenerName();
        ReverseNode reverseDestNode = link.clientFromSource().reverseConnectionManager().createReversibleConnection(123, destServer.config().brokerId(), sourceListenerName, sourcePrincipal, this.time().milliseconds());
        Node sourceNode = new Node(sourceServer.config().brokerId(), "localhost", sourceServer.boundPort(this.listenerName(sourceServer.config())));
        this.waitForReversal(link.clientFromSource(), destServer, reverseDestNode);
        String string = "Reversed node did not send ApiVersions request in destination client";
        Function1 & Serializable & scala.Serializable intersect = (Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionTest.$anonfun$verifyReverseConnection$3(link, sourceNode, x$3));
        NetworkClient waitForClient_client = link.clientFromDest();
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitForClient_waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ReverseConnectionTest.$anonfun$waitForClient$1(this, waitForClient_client, (Function1)waitForClient_predicate2)) {
            void waitForClient_waitUntilTrue_pause;
            void waitForClient_waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitForClient_waitUntilTrue_startTime + waitForClient_waitUntilTrue_waitTimeMs) {
                void waitForClient_errorMessage;
                Assertions.fail((String)((String)waitForClient_errorMessage));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitForClient_waitUntilTrue_waitTimeMs), (long)waitForClient_waitUntilTrue_pause));
        }
        this.processNextRequest(sourceServer, link.clientFromDest(), sourceNode, (Option<KafkaPrincipal>)new Some((Object)sourcePrincipal));
        String string2 = "Reversed node not ready in destination client";
        Function1 & Serializable & scala.Serializable intersect2 = (Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionTest.$anonfun$verifyReverseConnection$4(link, sourceNode, x$4));
        NetworkClient waitForClient_client2 = link.clientFromDest();
        long l3 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l4 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitForClient_waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!ReverseConnectionTest.$anonfun$waitForClient$1(this, waitForClient_client2, (Function1)waitForClient_predicate)) {
            void waitForClient_waitUntilTrue_pause;
            void waitForClient_waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitForClient_waitUntilTrue_startTime2 + waitForClient_waitUntilTrue_waitTimeMs) {
                void waitForClient_errorMessage;
                Assertions.fail((String)((String)waitForClient_errorMessage));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitForClient_waitUntilTrue_waitTimeMs), (long)waitForClient_waitUntilTrue_pause));
        }
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numRequests).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)x$5 -> {
            NetworkClient x$1 = link.clientFromDest();
            Some x$4 = new Some((Object)sourcePrincipal);
            AbstractRequest.Builder<?> x$52 = this.sendAndReceive$default$4();
            this.sendAndReceive(x$1, sourceServer, sourceNode, x$52, (Option<KafkaPrincipal>)x$4);
        });
    }

    /**
     * Supplies the value callers in this file pass as the fourth argument
     * ({@code principalOpt}) of the five-argument {@code verifyReverseConnection} when they do
     * not specify a principal. Presumably a compiler-generated default-argument accessor,
     * judging by its name.
     *
     * @return the empty option
     */
    private Option<KafkaPrincipal> verifyReverseConnection$default$4() {
        return None$.MODULE$;
    }

    /**
     * Supplies the value callers in this file pass as the fifth argument
     * ({@code numRequests}) of the five-argument {@code verifyReverseConnection}. Presumably a
     * compiler-generated default-argument accessor, judging by its name.
     *
     * @return 5
     */
    private int verifyReverseConnection$default$5() {
        return 5;
    }

    /**
     * Builds a broker configuration for the given security protocol.
     *
     * <p>Protocol-specific base properties are created first, a fixed set of network settings
     * is then applied, and finally {@code extraProps} is copied on top so that it can override
     * any of them.
     *
     * @param brokerId broker id handed to the protocol-specific property builders
     * @param securityProtocol one of PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL
     * @param saslMechanism SASL mechanism, only consulted for the SASL protocols
     * @param extraProps overrides applied last
     * @return the resulting configuration
     * @throws MatchError if {@code securityProtocol} is none of the supported values
     */
    private KafkaConfig kafkaConfig(int brokerId, SecurityProtocol securityProtocol, Option<String> saslMechanism, Properties extraProps) {
        final Properties serverProps;
        if (SecurityProtocol.PLAINTEXT.equals(securityProtocol)) {
            serverProps = plaintextServerProps(brokerId);
        } else if (SecurityProtocol.SSL.equals(securityProtocol)) {
            serverProps = sslServerProps(brokerId);
        } else if (SecurityProtocol.SASL_PLAINTEXT.equals(securityProtocol) || SecurityProtocol.SASL_SSL.equals(securityProtocol)) {
            serverProps = saslServerProps(brokerId, securityProtocol, saslMechanism);
        } else {
            throw new MatchError((Object)securityProtocol);
        }

        KafkaConfig$ configKeys = KafkaConfig$.MODULE$;
        serverProps.setProperty(configKeys.NumNetworkThreadsProp(), "1");
        serverProps.setProperty(configKeys.QueuedMaxRequestsProp(), "50");
        serverProps.setProperty(configKeys.SocketRequestMaxBytesProp(), "1000");
        serverProps.setProperty(configKeys.MaxConnectionsPerIpProp(), "5");
        serverProps.setProperty(configKeys.ConnectionsMaxIdleMsProp(), "60000");
        // Caller-supplied overrides win over the defaults set above.
        serverProps.putAll((Map<?, ?>)extraProps);
        return configKeys.fromProps(serverProps);
    }

    /**
     * Supplies the value callers in this file pass as the third argument
     * ({@code saslMechanism}) of {@code kafkaConfig} when no mechanism is specified. With an
     * empty option, {@code saslServerProps} falls back to {@code "SCRAM-SHA-256"}.
     *
     * @return the empty option
     */
    private Option<String> kafkaConfig$default$3() {
        return None$.MODULE$;
    }

    /**
     * Supplies the value callers in this file pass as the fourth argument
     * ({@code extraProps}) of {@code kafkaConfig} when there are no overrides.
     *
     * @return a new, empty {@link Properties} instance on every call
     */
    private Properties kafkaConfig$default$4() {
        return new Properties();
    }

    /**
     * Creates broker properties for a PLAINTEXT server via {@code TestUtils.createBrokerConfig}.
     *
     * <p>Only the broker id, the mock ZooKeeper connect string and the fifth argument (0) are
     * set explicitly; every other argument is the helper's own default for that position.
     *
     * @param brokerId id of the broker
     * @return the generated broker properties
     */
    private Properties plaintextServerProps(int brokerId) {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        return testUtils.createBrokerConfig(
            brokerId,
            testUtils.MockZkConnect(),
            testUtils.createBrokerConfig$default$3(),
            testUtils.createBrokerConfig$default$4(),
            0,
            testUtils.createBrokerConfig$default$6(),
            testUtils.createBrokerConfig$default$7(),
            testUtils.createBrokerConfig$default$8(),
            testUtils.createBrokerConfig$default$9(),
            testUtils.createBrokerConfig$default$10(),
            testUtils.createBrokerConfig$default$11(),
            testUtils.createBrokerConfig$default$12(),
            testUtils.createBrokerConfig$default$13(),
            testUtils.createBrokerConfig$default$14(),
            testUtils.createBrokerConfig$default$15(),
            testUtils.createBrokerConfig$default$16(),
            testUtils.createBrokerConfig$default$17(),
            testUtils.createBrokerConfig$default$18(),
            testUtils.createBrokerConfig$default$19(),
            testUtils.createBrokerConfig$default$20());
    }

    /**
     * Creates broker properties for an SSL server via {@code TestUtils.createBrokerConfig}.
     *
     * <p>A temporary {@code truststore*.jks} file is created and passed as the trust store, SSL
     * is passed as the sixth argument, and the ninth and twelfth arguments are set to
     * {@code false} and {@code true} respectively. All remaining arguments are the helper's
     * defaults for their positions.
     *
     * @param brokerId id of the broker
     * @return the generated broker properties
     */
    private Properties sslServerProps(int brokerId) {
        File trustStore = File.createTempFile("truststore", ".jks");
        TestUtils$ testUtils = TestUtils$.MODULE$;
        String zkConnect = testUtils.MockZkConnect();
        Option<SecurityProtocol> protocolOpt = (Option<SecurityProtocol>)new Some((Object)SecurityProtocol.SSL);
        Option<File> trustStoreOpt = (Option<File>)new Some((Object)trustStore);
        return testUtils.createBrokerConfig(
            brokerId,
            zkConnect,
            testUtils.createBrokerConfig$default$3(),
            testUtils.createBrokerConfig$default$4(),
            testUtils.createBrokerConfig$default$5(),
            protocolOpt,
            trustStoreOpt,
            testUtils.createBrokerConfig$default$8(),
            false,
            testUtils.createBrokerConfig$default$10(),
            testUtils.createBrokerConfig$default$11(),
            true,
            testUtils.createBrokerConfig$default$13(),
            testUtils.createBrokerConfig$default$14(),
            testUtils.createBrokerConfig$default$15(),
            testUtils.createBrokerConfig$default$16(),
            testUtils.createBrokerConfig$default$17(),
            testUtils.createBrokerConfig$default$18(),
            testUtils.createBrokerConfig$default$19(),
            testUtils.createBrokerConfig$default$20());
    }

    private Properties saslServerProps(int brokerId, SecurityProtocol securityProtocol, Option<String> saslMechanism) {
        String mechanism = (String)saslMechanism.getOrElse((Function0 & Serializable & scala.Serializable)() -> "SCRAM-SHA-256");
        Properties saslProps = JaasTestUtils$.MODULE$.saslConfigs((Option<Properties>)None$.MODULE$);
        saslProps.setProperty(KafkaConfig$.MODULE$.SaslMechanismInterBrokerProtocolProp(), mechanism);
        saslProps.setProperty("sasl.enabled.mechanisms", mechanism);
        saslProps.setProperty(new StringBuilder(16).append(ListenerName.forSecurityProtocol((SecurityProtocol)securityProtocol).saslMechanismConfigPrefix(mechanism)).append("sasl.jaas.config").toString(), ((JaasTestUtils.JaasModule)JaasTestUtils$.MODULE$.kafkaServerSection("KafkaServer", (Seq<String>)new .colon.colon((Object)mechanism, (List)Nil$.MODULE$), (Option<File>)None$.MODULE$).modules().head()).toString());
        if (ScramMechanism.isScram((String)mechanism)) {
            this.addScramUser(mechanism, JaasTestUtils$.MODULE$.KafkaScramUser(), JaasTestUtils$.MODULE$.KafkaScramPassword());
            this.addScramUser(mechanism, JaasTestUtils$.MODULE$.KafkaScramUser2(), JaasTestUtils$.MODULE$.KafkaScramPassword2());
            this.addScramUser(mechanism, JaasTestUtils$.MODULE$.KafkaScramAdmin(), JaasTestUtils$.MODULE$.KafkaScramAdminPassword());
        }
        String x$2 = TestUtils$.MODULE$.MockZkConnect();
        Some x$3 = new Some((Object)securityProtocol);
        SecurityProtocol securityProtocol2 = securityProtocol;
        SecurityProtocol securityProtocol3 = SecurityProtocol.SASL_SSL;
        None$ x$4 = !(securityProtocol2 != null ? !securityProtocol2.equals(securityProtocol3) : securityProtocol3 != null) ? new Some((Object)File.createTempFile("truststore", ".jks")) : None$.MODULE$;
        Some x$5 = new Some((Object)saslProps);
        boolean x$6 = false;
        SecurityProtocol securityProtocol4 = securityProtocol;
        SecurityProtocol securityProtocol5 = SecurityProtocol.SASL_PLAINTEXT;
        boolean x$7 = !(securityProtocol4 != null ? !securityProtocol4.equals(securityProtocol5) : securityProtocol5 != null);
        SecurityProtocol securityProtocol6 = securityProtocol;
        SecurityProtocol securityProtocol7 = SecurityProtocol.SASL_SSL;
        boolean x$8 = !(securityProtocol6 != null ? !securityProtocol6.equals(securityProtocol7) : securityProtocol7 != null);
        boolean x$9 = TestUtils$.MODULE$.createBrokerConfig$default$3();
        boolean x$10 = TestUtils$.MODULE$.createBrokerConfig$default$4();
        int x$11 = TestUtils$.MODULE$.createBrokerConfig$default$5();
        int x$12 = TestUtils$.MODULE$.createBrokerConfig$default$11();
        boolean x$13 = TestUtils$.MODULE$.createBrokerConfig$default$12();
        int x$14 = TestUtils$.MODULE$.createBrokerConfig$default$13();
        int x$15 = TestUtils$.MODULE$.createBrokerConfig$default$15();
        Option<String> x$16 = TestUtils$.MODULE$.createBrokerConfig$default$16();
        int x$17 = TestUtils$.MODULE$.createBrokerConfig$default$17();
        boolean x$18 = TestUtils$.MODULE$.createBrokerConfig$default$18();
        int x$19 = TestUtils$.MODULE$.createBrokerConfig$default$19();
        short x$20 = TestUtils$.MODULE$.createBrokerConfig$default$20();
        return TestUtils$.MODULE$.createBrokerConfig(brokerId, x$2, x$9, x$10, x$11, (Option<SecurityProtocol>)x$3, (Option<File>)x$4, (Option<Properties>)x$5, x$6, x$7, x$12, x$13, x$14, x$8, x$15, x$16, x$17, x$18, x$19, x$20);
    }

    /**
     * Registers a SCRAM credential for {@code username} with the test's credential provider.
     *
     * <p>The per-mechanism cache is created first if it does not exist yet; the credential is
     * generated from {@code password} with 4096 iterations.
     *
     * @param mechanism SCRAM mechanism name
     * @param username user to register
     * @param password password the credential is derived from
     */
    private void addScramUser(String mechanism, String username, String password) {
        boolean cacheMissing = this.credentialProvider().credentialCache().cache(mechanism, ScramCredential.class) == null;
        if (cacheMissing) {
            ScramCredentialUtils.createCache((CredentialCache)this.credentialProvider().credentialCache(), Collections.singletonList(mechanism));
        }
        ScramMechanism resolvedMechanism = ScramMechanism.forMechanismName((String)mechanism);
        ScramFormatter formatter = new ScramFormatter(resolvedMechanism);
        ScramCredential generated = formatter.generateCredential(password, 4096);
        this.credentialProvider().credentialCache().cache(resolvedMechanism.mechanismName(), ScramCredential.class).put(username, (Object)generated);
    }

    /**
     * Returns the listener name of the first listener declared in {@code config}.
     *
     * @param config broker configuration to inspect
     * @return listener name of the head of {@code config.listeners()}
     */
    private ListenerName listenerName(KafkaConfig config) {
        EndPoint firstListener = (EndPoint)config.listeners().head();
        return firstListener.listenerName();
    }

    /**
     * Creates and starts a {@link SocketServer} for {@code config} and records it in
     * {@code servers()}.
     *
     * <p>The server is given a reverse callback which, on a reverse connection, looks up the
     * link registered under the reverse node's link id and hands the channel to that link's
     * destination-side client.
     *
     * @param config configuration of the server to start
     * @return the started server
     */
    private SocketServer newSocketServer(KafkaConfig config) {
        // The anonymous class reaches the test instance through the language's own
        // enclosing-instance reference.
        ReverseNode.ReverseCallback reverseCallback = new ReverseNode.ReverseCallback() {
            @Override
            public void onReverseConnection(KafkaChannel channel, ReverseNode reverseNode) {
                LinkComponents link = (LinkComponents)ReverseConnectionTest.this.kafka$network$ReverseConnectionTest$$links().apply((Object)reverseNode.linkId());
                link.clientFromDest().reverseAndAdd(new ReverseChannel(channel, reverseNode, ignored -> {}));
            }
        };
        SimpleApiVersionManager apiVersionManager = new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER);
        Metrics metrics = new Metrics();
        Time serverTime = this.time();
        CredentialProvider credentials = this.credentialProvider();
        Some callbackOpt = new Some((Object)reverseCallback);
        AuditLogProvider auditLogProvider = SocketServer$.MODULE$.$lessinit$greater$default$6();
        Option eighthArgDefault = SocketServer$.MODULE$.$lessinit$greater$default$8();
        SocketServer server = new SocketServer(config, metrics, serverTime, credentials, (ApiVersionManager)apiVersionManager, auditLogProvider, (Option)callbackOpt, eighthArgDefault);
        server.startup(server.startup$default$1(), server.startup$default$2(), server.startup$default$3());
        this.servers().$plus$eq((Object)server);
        return server;
    }

    /**
     * Creates a client request whose completion handler completes {@code future}.
     *
     * <p>The request is created with the current test time, expects a response, and uses the
     * literal value 10000 for the argument after the expect-response flag (presumably the
     * request timeout in milliseconds).
     *
     * @param networkClient client used to build the request
     * @param nodeId id of the destination node
     * @param builder builder for the request body
     * @param future completed with {@code null} once a response arrives
     * @return the new client request
     */
    private ClientRequest newClientRequest(NetworkClient networkClient, String nodeId, AbstractRequest.Builder<?> builder, CompletableFuture<Void> future) {
        // The handler captures the effectively final 'future' parameter directly.
        RequestCompletionHandler handler = new RequestCompletionHandler() {
            @Override
            public void onComplete(ClientResponse response) {
                future.complete(null);
            }
        };
        return networkClient.newClientRequest(nodeId, builder, this.time().milliseconds(), true, 10000, handler);
    }

    /**
     * Receives the next request from {@code channel}, failing the test if none arrives.
     *
     * @param channel request channel to read from
     * @param timeout timeout handed to {@code channel.receiveRequest}
     * @return the received request
     * @throws MatchError if the channel returns an unexpected kind of base request
     */
    private RequestChannel.Request receiveRequest(RequestChannel channel, long timeout) {
        RequestChannel.BaseRequest received = channel.receiveRequest(timeout);
        if (received instanceof RequestChannel.Request) {
            return (RequestChannel.Request)received;
        }
        if (RequestChannel.ShutdownRequest$.MODULE$.equals(received)) {
            throw Assertions$.MODULE$.fail("Unexpected shutdown received", new Position("ReverseConnectionTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 431));
        }
        if (received == null) {
            // A null result means nothing was received within the timeout.
            throw Assertions$.MODULE$.fail("receiveRequest timed out", new Position("ReverseConnectionTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 432));
        }
        throw new MatchError((Object)received);
    }

    /**
     * Supplies the value {@code processNextRequest} passes as the second argument
     * ({@code timeout}) of {@code receiveRequest}.
     *
     * @return 2000 (presumably milliseconds; the unit is not visible in this file section)
     */
    private long receiveRequest$default$2() {
        return 2000L;
    }

    /*
     * WARNING - void declaration
     */
    private void processNextRequest(SocketServer server, NetworkClient client, Node node, Option<KafkaPrincipal> expectedPrincipal) {
        void var11_16;
        void var10_15;
        ApiVersionsResponse apiVersionsResponse;
        None$ none$;
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ReverseConnectionTest.$anonfun$processNextRequest$1(this, client, node, server)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ReverseConnectionTest.$anonfun$processNextRequest$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        RequestChannel.Request request = this.receiveRequest(server.dataPlaneRequestChannel(), this.receiveRequest$default$2());
        expectedPrincipal.foreach((Function1 & Serializable & scala.Serializable)principal -> {
            ReverseConnectionTest.$anonfun$processNextRequest$3(request, principal);
            return BoxedUnit.UNIT;
        });
        ApiKeys apiKeys = request.header().apiKey();
        if (ApiKeys.API_VERSIONS.equals(apiKeys)) {
            none$ = None$.MODULE$;
            apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse((ApiMessageType.ListenerType)ApiMessageType.ListenerType.ZK_BROKER);
        } else if (ApiKeys.METADATA.equals(apiKeys)) {
            MetadataResponseData data = new MetadataResponseData().setClusterId("clusterId").setControllerId(0).setTopics(new MetadataResponseData.MetadataResponseTopicCollection()).setBrokers(new MetadataResponseData.MetadataResponseBrokerCollection());
            none$ = None$.MODULE$;
            apiVersionsResponse = new MetadataResponse(data, 0);
        } else if (ApiKeys.REVERSE_CONNECTION.equals(apiKeys)) {
            ReverseConnectionRequestData data = ((ReverseConnectionRequest)request.body(ClassTag$.MODULE$.apply(ReverseConnectionRequest.class), NotNothing$.MODULE$.notNothingEvidence(Predef.$eq$colon$eq$.MODULE$.tpEquals()))).data();
            ReverseNode node2 = new ReverseNode(data.sourceBrokerId(), data.sourceBrokerId(), data.sourceHost(), data.sourcePort(), new UUID(data.clusterLinkId().getMostSignificantBits(), data.clusterLinkId().getLeastSignificantBits()), data.initiateRequestId(), request.context().listenerName, request.context().principal);
            none$ = new Some((Object)node2);
            apiVersionsResponse = new ReverseConnectionResponse(Errors.NONE, null, 0);
        } else {
            throw new IllegalArgumentException(new StringBuilder(14).append(apiKeys).append(" not supported").toString());
        }
        AbstractResponse response = (AbstractResponse)apiVersionsResponse;
        Option reverseNode = (Option)none$;
        void response2 = var10_15;
        void reverseNode2 = var11_16;
        ByteBuffer byteBuffer = RequestTestUtils.serializeResponseWithHeader((AbstractResponse)response2, (short)request.header().apiVersion(), (int)request.header().correlationId());
        byteBuffer.rewind();
        NetworkSend send = new NetworkSend(request.context().connectionId, (Send)ByteBufferSend.sizePrefixed((ByteBuffer)byteBuffer));
        RequestChannel.SendResponse responseData = reverseNode2.isEmpty() ? new RequestChannel.SendResponse(request, (Send)send, (Option)None$.MODULE$) : new RequestChannel.SendResponseAndReverse(request, (Send)send, (Option)None$.MODULE$, (ReverseNode)reverseNode2.get());
        server.dataPlaneRequestChannel().sendResponse((RequestChannel.Response)responseData);
    }

    /**
     * Supplies the value callers in this file pass as the fourth argument
     * ({@code expectedPrincipal}) of {@code processNextRequest} when no principal check is
     * wanted.
     *
     * @return the empty option
     */
    private Option<KafkaPrincipal> processNextRequest$default$4() {
        return None$.MODULE$;
    }

    /**
     * Returns the metadata request builder obtained from
     * {@code MetadataRequest.Builder.allTopics()}.
     *
     * @return a builder for an all-topics metadata request
     */
    private MetadataRequest.Builder metadataRequestBuilder() {
        return MetadataRequest.Builder.allTopics();
    }

    /*
     * WARNING - void declaration
     */
    private ManualMetadataUpdater metadataUpdater(Node node) {
        void var2_2;
        ManualMetadataUpdater metadataUpdater = new ManualMetadataUpdater();
        metadataUpdater.setNodes(Collections.singletonList(node));
        return var2_2;
    }

    /**
     * Creates a {@link NetworkClient} on behalf of {@code localServer} that is configured to
     * talk to {@code remoteServer}, and records it together with its metrics in
     * {@code networkClients()}.
     *
     * <p>Client properties start from the remote server's original configuration and are
     * completed with bootstrap servers and security protocol taken from the remote server's
     * first listener. For the two SASL protocols the inter-broker SASL mechanism and a matching
     * client JAAS configuration are added as well.
     *
     * @param remoteServer server whose first listener the client is configured for
     * @param localServer server whose broker id is used in the client id
     * @param metadataUpdater metadata updater handed to the client
     * @return the new network client
     */
    private NetworkClient newNetworkClient(SocketServer remoteServer, SocketServer localServer, MetadataUpdater metadataUpdater) {
        LogContext logContext = new LogContext();
        KafkaConfig remoteConfig = remoteServer.config();
        String clientId = "client-" + localServer.config().brokerId() + "-" + this.networkClients().size();

        HashMap<String, String> clientProps = new HashMap<String, String>();
        remoteConfig.originals().forEach((key, value) -> clientProps.put((String)key, (String)value));
        EndPoint endpoint = (EndPoint)remoteConfig.listeners().head();
        SecurityProtocol securityProtocol = endpoint.securityProtocol();
        clientProps.put("bootstrap.servers", endpoint.connectionString());
        clientProps.put("security.protocol", securityProtocol.name);

        boolean usesSasl = securityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT) || securityProtocol.equals(SecurityProtocol.SASL_SSL);
        if (usesSasl) {
            String saslMechanism = remoteConfig.saslMechanismInterBrokerProtocol();
            clientProps.put("sasl.mechanism", saslMechanism);
            clientProps.put("sasl.jaas.config", ((JaasTestUtils.JaasModule)JaasTestUtils$.MODULE$.kafkaClientSection((Option<String>)new Some((Object)saslMechanism), (Option<File>)None$.MODULE$).modules().head()).toString());
        }

        AdminClientConfig clientConfig = new AdminClientConfig(clientProps);
        ChannelBuilder channelBuilder = ChannelBuilders.clientChannelBuilder((SecurityProtocol)securityProtocol, (JaasContext.Type)JaasContext.Type.CLIENT, (AbstractConfig)clientConfig, null, (String)clientConfig.getString("sasl.mechanism"), (Time)this.time(), (boolean)true, (LogContext)logContext);
        Metrics metrics = new Metrics();
        long maxIdleMs = Predef$.MODULE$.Long2long(clientConfig.getLong("connections.max.idle.ms"));
        Selector selector = new Selector(-1, maxIdleMs, metrics, this.time(), clientId, Collections.emptyMap(), false, channelBuilder, logContext);

        int receiveBufferBytes = Predef$.MODULE$.Integer2int(clientConfig.getInt("receive.buffer.bytes"));
        int requestTimeoutMs = Predef$.MODULE$.Integer2int(clientConfig.getInt("request.timeout.ms"));
        long setupTimeoutMs = Predef$.MODULE$.Long2long(clientConfig.getLong("socket.connection.setup.timeout.ms"));
        long setupTimeoutMaxMs = Predef$.MODULE$.Long2long(clientConfig.getLong("socket.connection.setup.timeout.max.ms"));
        NetworkClient created = new NetworkClient(metadataUpdater, null, (Selectable)selector, clientId, 1, 50L, 50L, -1, receiveBufferBytes, requestTimeoutMs, setupTimeoutMs, setupTimeoutMaxMs, this.time(), true, new ApiVersions(), null, new LogContext());
        this.networkClients().$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)created), (Object)metrics));
        return created;
    }

    /*
     * WARNING - void declaration
     */
    private void waitForClient(NetworkClient client, Function1<NetworkClient, Object> predicate, String errorMessage) {
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ReverseConnectionTest.$anonfun$waitForClient$1(this, client, predicate)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)errorMessage);
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    /*
     * WARNING - void declaration
     */
    private void waitForReady(NetworkClient client, SocketServer server, Node node) {
        Assertions.assertFalse((boolean)client.ready(node, this.time().milliseconds()), (String)"Client ready before poll");
        this.processNextRequest(server, client, node, this.processNextRequest$default$4());
        String string = "Node not ready";
        Function1 & Serializable & scala.Serializable waitForClient_predicate = (Function1 & Serializable & scala.Serializable)x$8 -> BoxesRunTime.boxToBoolean((boolean)x$8.ready(node, this.time().milliseconds()));
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitForClient_waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ReverseConnectionTest.$anonfun$waitForClient$1(this, client, waitForClient_predicate)) {
            void waitForClient_waitUntilTrue_pause;
            void waitForClient_waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitForClient_waitUntilTrue_startTime + waitForClient_waitUntilTrue_waitTimeMs) {
                void waitForClient_errorMessage;
                Assertions.fail((String)((String)waitForClient_errorMessage));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitForClient_waitUntilTrue_waitTimeMs), (long)waitForClient_waitUntilTrue_pause));
        }
    }

    /*
     * WARNING - void declaration
     */
    private void waitForReversal(NetworkClient client, SocketServer server, ReverseNode node) {
        Assertions.assertFalse((boolean)client.ready((Node)node, this.time().milliseconds()), (String)"Client ready before poll");
        this.processNextRequest(server, client, (Node)node, this.processNextRequest$default$4());
        this.processNextRequest(server, client, (Node)node, this.processNextRequest$default$4());
        CompletableFuture future = node.future();
        String string = "Reversal not complete";
        Function1 & Serializable & scala.Serializable waitForClient_predicate = (Function1 & Serializable & scala.Serializable)x$9 -> BoxesRunTime.boxToBoolean((boolean)future.isDone());
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitForClient_waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ReverseConnectionTest.$anonfun$waitForClient$1(this, client, waitForClient_predicate)) {
            void waitForClient_waitUntilTrue_pause;
            void waitForClient_waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitForClient_waitUntilTrue_startTime + waitForClient_waitUntilTrue_waitTimeMs) {
                void waitForClient_errorMessage;
                Assertions.fail((String)((String)waitForClient_errorMessage));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitForClient_waitUntilTrue_waitTimeMs), (long)waitForClient_waitUntilTrue_pause));
        }
        future.get();
    }

    /*
     * WARNING - void declaration
     */
    private void sendAndReceive(NetworkClient client, SocketServer server, Node node, AbstractRequest.Builder<?> requestBuilder, Option<KafkaPrincipal> expectedPrincipal) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        ClientRequest request = this.newClientRequest(client, node.idString(), requestBuilder, future);
        client.send(request, this.time().milliseconds());
        this.processNextRequest(server, client, node, expectedPrincipal);
        String string = "Response not processed";
        Function1 & Serializable & scala.Serializable waitForClient_predicate = (Function1 & Serializable & scala.Serializable)x$10 -> BoxesRunTime.boxToBoolean((boolean)future.isDone());
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitForClient_waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ReverseConnectionTest.$anonfun$waitForClient$1(this, client, waitForClient_predicate)) {
            void waitForClient_waitUntilTrue_pause;
            void waitForClient_waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitForClient_waitUntilTrue_startTime + waitForClient_waitUntilTrue_waitTimeMs) {
                void waitForClient_errorMessage;
                Assertions.fail((String)((String)waitForClient_errorMessage));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitForClient_waitUntilTrue_waitTimeMs), (long)waitForClient_waitUntilTrue_pause));
        }
    }

    /**
     * Default value for the fourth parameter ({@code requestBuilder}) of {@code sendAndReceive}.
     *
     * @return the builder produced by {@code metadataRequestBuilder()}
     */
    private AbstractRequest.Builder<?> sendAndReceive$default$4() {
        AbstractRequest.Builder<?> defaultBuilder = this.metadataRequestBuilder();
        return defaultBuilder;
    }

    /**
     * Default value for the fifth parameter ({@code expectedPrincipal}) of
     * {@code sendAndReceive}: an empty option.
     *
     * @return the empty {@code Option}
     */
    private Option<KafkaPrincipal> sendAndReceive$default$5() {
        // Option.empty() returns None typed as Option<KafkaPrincipal>; None$.MODULE$ itself is an
        // Option<Nothing$>, which Java does not accept for this return type.
        return Option.empty();
    }

    /**
     * Builds the reverse-connection request payload for a cluster link.
     *
     * <p>The link id is converted from {@link UUID} to Kafka's {@code Uuid}; the source broker
     * id, host and port are read from {@code sourceServer}'s config and its first listener.
     * The cluster ids are the fixed strings "destClusterId" and "sourceClusterId".
     *
     * @param linkId       id of the cluster link
     * @param sourceServer server whose first listener is advertised as the source endpoint
     * @return the populated request data
     */
    private ReverseConnectionRequestData reversalData(UUID linkId, SocketServer sourceServer) {
        EndPoint firstListener = (EndPoint)sourceServer.config().listeners().head();
        Uuid clusterLinkId = new Uuid(linkId.getMostSignificantBits(), linkId.getLeastSignificantBits());
        return new ReverseConnectionRequestData()
                .setClusterLinkId(clusterLinkId)
                .setTargetClusterId("destClusterId")
                .setSourceClusterId("sourceClusterId")
                .setSourceBrokerId(sourceServer.config().brokerId())
                .setSourceHost(firstListener.host())
                .setSourcePort(firstListener.port());
    }

    /**
     * Handles a reversed client connection: looks up the link for {@code reverseNode.linkId()},
     * passes a new {@code ReverseChannel} wrapping {@code sourceChannel} to that link's source
     * server via {@code reverseAndAdd}, and records the channel in {@code reverseSourceChannels()}.
     *
     * @param sourceChannel the channel being reversed
     * @param reverseNode   the reverse node supplying the link id and local listener name
     */
    private void onReverseClientConnection(KafkaChannel sourceChannel, ReverseNode reverseNode) {
        LinkComponents link = (LinkComponents)this.kafka$network$ReverseConnectionTest$$links().apply((Object)reverseNode.linkId());
        link.sourceServer().reverseAndAdd(
                reverseNode.localListenerName(),
                // The third constructor argument is a callback; this test supplies a no-op.
                new ReverseChannel(sourceChannel, reverseNode, ignored -> {}));
        this.reverseSourceChannels().$plus$eq((Object)sourceChannel);
    }

    /**
     * Returns the principal chosen for {@code server}, based on the security protocol of its
     * first configured listener.
     *
     * <ul>
     *   <li>PLAINTEXT: {@code KafkaPrincipal.ANONYMOUS}</li>
     *   <li>SSL: {@code User:CN=test}</li>
     *   <li>otherwise: the JAAS test user for the configured inter-broker SASL mechanism,
     *       i.e. the plain user for "PLAIN" and the SCRAM user for anything else</li>
     * </ul>
     *
     * @param server the server whose first listener determines the principal
     * @return the selected principal
     */
    private KafkaPrincipal principal(SocketServer server) {
        SecurityProtocol protocol = ((EndPoint)server.config().listeners().head()).securityProtocol();
        if (SecurityProtocol.PLAINTEXT.equals(protocol)) {
            return KafkaPrincipal.ANONYMOUS;
        }
        if (SecurityProtocol.SSL.equals(protocol)) {
            return new KafkaPrincipal("User", "CN=test");
        }
        String mechanism = server.config().saslMechanismInterBrokerProtocol();
        String userName;
        // Constant-first equals also covers a null mechanism.
        if ("PLAIN".equals(mechanism)) {
            userName = JaasTestUtils$.MODULE$.KafkaPlainUser();
        } else {
            userName = JaasTestUtils$.MODULE$.KafkaScramUser();
        }
        return new KafkaPrincipal("User", userName);
    }

    /**
     * Shuts down {@code server} and then closes its metrics.
     *
     * @param server the socket server to stop
     */
    private void shutdownServerAndMetrics(SocketServer server) {
        server.shutdown();
        // Metrics are fetched only after shutdown, keeping the original call order.
        Metrics serverMetrics = server.metrics();
        serverMetrics.close();
    }

    /**
     * Polls {@code client} once with a zero timeout and then calls
     * {@code TestUtils.verifyEmptyFields} on it with the integer argument {@code 2} and the
     * field name "metadataUpdater".
     *
     * @param client the network client to inspect
     */
    private void verifyNetworkClientEmpty(NetworkClient client) {
        long now = this.time().milliseconds();
        client.poll(0L, now);
        // Keep the static type Object so the same verifyEmptyFields overload is selected.
        Object target = client;
        String[] fieldNames = new String[]{"metadataUpdater"};
        TestUtils.verifyEmptyFields(target, 2, fieldNames);
    }

    /**
     * Lambda body used by {@code tearDown}: closes the network client and the metrics held in
     * the given pair, in that order.
     *
     * @param x0$1 a tuple of ({@code NetworkClient}, {@code Metrics})
     * @throws MatchError if the tuple itself is {@code null}
     */
    public static final /* synthetic */ void $anonfun$tearDown$1(Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError(null);
        }
        NetworkClient networkClient = (NetworkClient)x0$1._1();
        Metrics clientMetrics = (Metrics)x0$1._2();
        networkClient.close();
        clientMetrics.close();
    }

    /**
     * Lambda body used by {@code testAuthenticationFailure}: asks the reverse connection manager
     * of the link's source-side client to create a reversible connection for index {@code i}.
     *
     * @param $this                the enclosing test instance
     * @param link$1               link whose source-side client owns the reverse connection manager
     * @param destServer$1         server whose broker id is passed to the manager
     * @param sourceListenerName$1 listener name passed to the manager
     * @param sourceServer$1       server used to derive the principal
     * @param i                    first argument of {@code createReversibleConnection}
     * @return the reverse node returned by the manager
     */
    public static final /* synthetic */ ReverseNode $anonfun$testAuthenticationFailure$1(ReverseConnectionTest $this, LinkComponents link$1, SocketServer destServer$1, ListenerName sourceListenerName$1, SocketServer sourceServer$1, int i) {
        return link$1.clientFromSource()
                .reverseConnectionManager()
                .createReversibleConnection(
                        i,
                        destServer$1.config().brokerId(),
                        sourceListenerName$1,
                        $this.principal(sourceServer$1),
                        $this.time().milliseconds());
    }

    /**
     * Lambda body used by {@code testAuthenticationFailure}: polls the link's source-side client
     * once with a 1 ms timeout, then checks it with {@code verifyNetworkClientEmpty}.
     *
     * @param $this  the enclosing test instance
     * @param link$1 link whose source-side client is polled and verified
     */
    public static final /* synthetic */ void $anonfun$testAuthenticationFailure$2(ReverseConnectionTest $this, LinkComponents link$1) {
        NetworkClient polledClient = link$1.clientFromSource();
        long now = $this.time().milliseconds();
        polledClient.poll(1L, now);
        // The accessor is called again, as in the original, rather than reusing the local.
        $this.verifyNetworkClientEmpty(link$1.clientFromSource());
    }

    /**
     * Lambda body used by {@code testConnectionLimit}: reports whether the server currently
     * counts at least one connection from the loopback address.
     *
     * @param sourceServer$2 the server whose connection count is queried
     * @return {@code true} if the loopback connection count is greater than zero
     */
    public static final /* synthetic */ boolean $anonfun$testConnectionLimit$1(SocketServer sourceServer$2) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        return sourceServer$2.connectionCount(loopback) > 0;
    }

    /**
     * Lambda body used by {@code testConnectionLimit}: supplies a failure message.
     *
     * @return the fixed message "Connection not created"
     */
    public static final /* synthetic */ String $anonfun$testConnectionLimit$2() {
        final String failureMessage = "Connection not created";
        return failureMessage;
    }

    /**
     * Lambda body used by {@code verifyReverseConnection}: verifies the link's source-side
     * client, closes the destination-side client's connection to its least-loaded node, and
     * then verifies the destination-side client as well.
     *
     * @param $this the enclosing test instance
     * @param link  the link whose two clients are checked
     */
    public static final /* synthetic */ void $anonfun$verifyReverseConnection$1(ReverseConnectionTest $this, LinkComponents link) {
        $this.verifyNetworkClientEmpty(link.clientFromSource());
        // Accessor calls are kept one-for-one with the original expression.
        NetworkClient closingClient = link.clientFromDest();
        Node leastLoaded = link.clientFromDest().leastLoadedNode($this.time().milliseconds());
        closingClient.close(leastLoaded.idString());
        $this.verifyNetworkClientEmpty(link.clientFromDest());
    }

    /**
     * Lambda body used by {@code verifyReverseConnection}: reports whether the link's
     * destination-side client has any in-flight request to {@code sourceNode$1}.
     *
     * @param link$2       link whose destination-side client is queried
     * @param sourceNode$1 node whose id string selects the connection
     * @param x$3          unused lambda argument
     * @return {@code true} if the in-flight request count is greater than zero
     */
    public static final /* synthetic */ boolean $anonfun$verifyReverseConnection$3(LinkComponents link$2, Node sourceNode$1, NetworkClient x$3) {
        NetworkClient destClient = link$2.clientFromDest();
        String sourceNodeId = sourceNode$1.idString();
        return destClient.inFlightRequestCount(sourceNodeId) > 0;
    }

    /**
     * Lambda body used by {@code verifyReverseConnection}: reports whether the link's
     * destination-side client considers {@code sourceNode$1} ready at the current wall-clock
     * time.
     *
     * @param link$2       link whose destination-side client is queried
     * @param sourceNode$1 node to check
     * @param x$4          unused lambda argument
     * @return the result of {@code isReady} for the node
     */
    public static final /* synthetic */ boolean $anonfun$verifyReverseConnection$4(LinkComponents link$2, Node sourceNode$1, NetworkClient x$4) {
        NetworkClient destClient = link$2.clientFromDest();
        // Uses System.currentTimeMillis() here, not the test's time() source.
        long wallClockNow = System.currentTimeMillis();
        return destClient.isReady(sourceNode$1, wallClockNow);
    }

    /**
     * Lambda body used by {@code processNextRequest}: polls the client once with a 1 ms timeout,
     * asserts that it holds no authentication exception for the node, and reports whether the
     * server's data-plane request queue is non-empty.
     *
     * @param $this    the enclosing test instance
     * @param client$1 the client to poll
     * @param node$1   the node whose authentication state is checked
     * @param server$1 the server whose request queue is inspected
     * @return {@code true} if the request queue size is greater than zero
     */
    public static final /* synthetic */ boolean $anonfun$processNextRequest$1(ReverseConnectionTest $this, NetworkClient client$1, Node node$1, SocketServer server$1) {
        long now = $this.time().milliseconds();
        client$1.poll(1L, now);
        Assertions.assertNull(client$1.authenticationException(node$1));
        boolean requestQueued = server$1.dataPlaneRequestChannel().requestQueueSize() > 0;
        return requestQueued;
    }

    /**
     * Lambda body used by {@code processNextRequest}: supplies a failure message.
     *
     * @return the fixed message "Request did not arrive on server"
     */
    public static final /* synthetic */ String $anonfun$processNextRequest$2() {
        final String failureMessage = "Request did not arrive on server";
        return failureMessage;
    }

    /**
     * Lambda body used by {@code processNextRequest}: asserts that the principal of the
     * request's session equals the expected principal.
     *
     * @param request$1 the request whose session principal is checked
     * @param principal the expected principal
     */
    public static final /* synthetic */ void $anonfun$processNextRequest$3(RequestChannel.Request request$1, KafkaPrincipal principal) {
        Object actualPrincipal = request$1.session().principal();
        Assertions.assertEquals((Object)principal, actualPrincipal);
    }

    /**
     * Lambda body used by {@code waitForClient}: polls the client with a 1000 ms timeout and
     * then evaluates the predicate against it.
     *
     * @param $this       the enclosing test instance
     * @param client$2    the client to poll and test
     * @param predicate$1 function applied to the client; its result is unboxed to a boolean
     * @return the unboxed predicate result
     */
    public static final /* synthetic */ boolean $anonfun$waitForClient$1(ReverseConnectionTest $this, NetworkClient client$2, Function1 predicate$1) {
        long now = $this.time().milliseconds();
        client$2.poll(1000L, now);
        Object outcome = predicate$1.apply((Object)client$2);
        return BoxesRunTime.unboxToBoolean(outcome);
    }
}

