/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.net.Socket;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import kafka.api.IntegrationTestHarness;
import kafka.server.BaseRequestTest;
import kafka.server.KafkaServer;
import kafka.server.ReverseConnectionRequestTest$LinkInfo$;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkDestConnectionManager;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ConnectionMode;
import kafka.server.link.LinkMode;
import kafka.utils.NotNothing$;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.CreateClusterLinksResponseData;
import org.apache.kafka.common.message.InitiateReverseConnectionsRequestData;
import org.apache.kafka.common.message.ReverseConnectionRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.CreateClusterLinksRequest;
import org.apache.kafka.common.requests.CreateClusterLinksResponse;
import org.apache.kafka.common.requests.InitiateReverseConnectionsRequest;
import org.apache.kafka.common.requests.InitiateReverseConnectionsResponse;
import org.apache.kafka.common.requests.ReverseConnectionRequest;
import org.apache.kafka.common.requests.ReverseConnectionResponse;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Product;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t\u0015d\u0001\u0002\u001a4\u0001aBQ!\u0010\u0001\u0005\u0002yB\u0011\u0002\u0011\u0001A\u0002\u0003\u0005\u000b\u0015B!\t\u000f5\u0003!\u0019!C!\u001d\"1Q\u000b\u0001Q\u0001\n=CqA\u0016\u0001C\u0002\u0013%q\u000b\u0003\u0004i\u0001\u0001\u0006I\u0001\u0017\u0005\bS\u0002\u0011\r\u0011\"\u0003k\u0011\u0019\t\b\u0001)A\u0005W\u001a!!\u000f\u0001!t\u0011!i\u0018B!f\u0001\n\u0003q\b\"CA\n\u0013\tE\t\u0015!\u0003\u0000\u0011)\t)\"\u0003BK\u0002\u0013\u0005\u0011q\u0003\u0005\u000b\u0003KI!\u0011#Q\u0001\n\u0005e\u0001\"CA\u0014\u0013\tU\r\u0011\"\u0001\u007f\u0011%\tI#\u0003B\tB\u0003%q\u0010C\u0005\u0002,%\u0011)\u001a!C\u0001\u001d\"I\u0011QF\u0005\u0003\u0012\u0003\u0006Ia\u0014\u0005\u0007{%!\t!a\f\t\u0013\u0005u\u0012\"!A\u0005\u0002\u0005}\u0002\"CA%\u0013E\u0005I\u0011AA&\u0011%\t\t'CI\u0001\n\u0003\t\u0019\u0007C\u0005\u0002h%\t\n\u0011\"\u0001\u0002L!I\u0011\u0011N\u0005\u0012\u0002\u0013\u0005\u00111\u000e\u0005\t\u0003_J\u0011\u0011!C!U\"A\u0011\u0011O\u0005\u0002\u0002\u0013\u0005a\nC\u0005\u0002t%\t\t\u0011\"\u0001\u0002v!I\u0011\u0011Q\u0005\u0002\u0002\u0013\u0005\u00131\u0011\u0005\n\u0003\u001bK\u0011\u0011!C\u0001\u0003\u001fC\u0011\"!'\n\u0003\u0003%\t%a'\t\u0013\u0005u\u0015\"!A\u0005B\u0005}\u0005\"CAQ\u0013\u0005\u0005I\u0011IAR\u000f%\t9\u000bAA\u0001\u0012\u0003\tIK\u0002\u0005s\u0001\u0005\u0005\t\u0012AAV\u0011\u0019i\u0014\u0005\"\u0001\u0002:\"I\u0011QT\u0011\u0002\u0002\u0013\u0015\u0013q\u0014\u0005\n\u0003w\u000b\u0013\u0011!CA\u0003{C\u0011\"a2\"\u0003\u0003%\t)!3\t\u000f\u0005m\u0007\u0001\"\u0011\u0002^\"9\u0011\u0011\u001f\u0001\u0005B\u0005M\bbBA\u007f\u0001\u0011\u0005\u00111\u001f\u0005\b\u0005\u000f\u0001A\u0011AAz\u0011\u001d\u0011Y\u0001\u0001C\u0001\u0003gDqAa\u0004\u0001\t\u0003\t\u0019\u0010C\u0004\u0003\u0014\u0001!IA!\u0006\t\u000f\t\r\u0002\u0001\"\u0003\u0003&!9!Q\t\u0001\u0005\n\t\u001d\u0003\"\u0003B0\u0001E\u0005I\u0011BA&\u0011%\u0011\t\u0007AI\u0001\n\u0013\tY\u0007C\u0005\u0003d\u0
001\t\n\u0011\"\u0003\u0002l\ta\"+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8SKF,Xm\u001d;UKN$(B\u0001\u001b6\u0003\u0019\u0019XM\u001d<fe*\ta'A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001I\u0004C\u0001\u001e<\u001b\u0005\u0019\u0014B\u0001\u001f4\u0005=\u0011\u0015m]3SKF,Xm\u001d;UKN$\u0018A\u0002\u001fj]&$h\bF\u0001@!\tQ\u0004!A\u0005`i\u0016\u001cH/\u00138g_B\u0011!iS\u0007\u0002\u0007*\u0011A)R\u0001\u0004CBL'B\u0001$H\u0003\u001dQW\u000f]5uKJT!\u0001S%\u0002\u000b),h.\u001b;\u000b\u0003)\u000b1a\u001c:h\u0013\ta5I\u0001\u0005UKN$\u0018J\u001c4p\u0003-\u0011'o\\6fe\u000e{WO\u001c;\u0016\u0003=\u0003\"\u0001U*\u000e\u0003ES\u0011AU\u0001\u0006g\u000e\fG.Y\u0005\u0003)F\u00131!\u00138u\u00031\u0011'o\\6fe\u000e{WO\u001c;!\u0003\u001d\u0019xnY6fiN,\u0012\u0001\u0017\t\u00043z\u0003W\"\u0001.\u000b\u0005mc\u0016aB7vi\u0006\u0014G.\u001a\u0006\u0003;F\u000b!bY8mY\u0016\u001cG/[8o\u0013\ty&L\u0001\u0004Ck\u001a4WM\u001d\t\u0003C\u001al\u0011A\u0019\u0006\u0003G\u0012\f1A\\3u\u0015\u0005)\u0017\u0001\u00026bm\u0006L!a\u001a2\u0003\rM{7m[3u\u0003!\u0019xnY6fiN\u0004\u0013\u0001\u00037j].t\u0015-\\3\u0016\u0003-\u0004\"\u0001\\8\u000e\u00035T!A\u001c3\u0002\t1\fgnZ\u0005\u0003a6\u0014aa\u0015;sS:<\u0017!\u00037j].t\u0015-\\3!\u0005!a\u0015N\\6J]\u001a|7\u0003B\u0005uoj\u0004\"\u0001U;\n\u0005Y\f&AB!osJ+g\r\u0005\u0002Qq&\u0011\u00110\u0015\u0002\b!J|G-^2u!\t\u000160\u0003\u0002}#\na1+\u001a:jC2L'0\u00192mK\u0006!a.Y7f+\u0005y\b\u0003BA\u0001\u0003\u001fqA!a\u0001\u0002\fA\u0019\u0011QA)\u000e\u0005\u0005\u001d!bAA\u0005o\u00051AH]8pizJ1!!\u0004R\u0003\u0019\u0001&/\u001a3fM&\u0019\u0001/!\u0005\u000b\u0007\u00055\u0011+A\u0003oC6,\u0007%\u0001\u0002jIV\u0011\u0011\u0011\u0004\t\u0005\u00037\t\t#\u0004\u0002\u0002\u001e)\u0019\u0011q\u00043\u0002\tU$\u0018\u000e\\\u0005\u0005\u0003G\tiB\u0001\u0003V+&#\u0015aA5eA\u0005y!/Z7pi\u0016\u001cE.^:uKJLE-\u0001\tsK6|G/Z\"mkN$XM]%eA\u0005q!/Z7pi\u0016\u0014%o\\6fe&#\u0017a\u0004:f[>$XM\u0011:pW\u0016\u0014\u0018\n\u001a\u
0011\u0015\u0015\u0005E\u0012QGA\u001c\u0003s\tY\u0004E\u0002\u00024%i\u0011\u0001\u0001\u0005\u0006{J\u0001\ra \u0005\b\u0003+\u0011\u0002\u0019AA\r\u0011\u0019\t9C\u0005a\u0001\u007f\"1\u00111\u0006\nA\u0002=\u000bAaY8qsRQ\u0011\u0011GA!\u0003\u0007\n)%a\u0012\t\u000fu\u001c\u0002\u0013!a\u0001\u007f\"I\u0011QC\n\u0011\u0002\u0003\u0007\u0011\u0011\u0004\u0005\t\u0003O\u0019\u0002\u0013!a\u0001\u007f\"A\u00111F\n\u0011\u0002\u0003\u0007q*\u0001\bd_BLH\u0005Z3gCVdG\u000fJ\u0019\u0016\u0005\u00055#fA@\u0002P-\u0012\u0011\u0011\u000b\t\u0005\u0003'\ni&\u0004\u0002\u0002V)!\u0011qKA-\u0003%)hn\u00195fG.,GMC\u0002\u0002\\E\u000b!\"\u00198o_R\fG/[8o\u0013\u0011\ty&!\u0016\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW-\u0001\bd_BLH\u0005Z3gCVdG\u000f\n\u001a\u0016\u0005\u0005\u0015$\u0006BA\r\u0003\u001f\nabY8qs\u0012\"WMZ1vYR$3'\u0001\bd_BLH\u0005Z3gCVdG\u000f\n\u001b\u0016\u0005\u00055$fA(\u0002P\u0005i\u0001O]8ek\u000e$\bK]3gSb\fA\u0002\u001d:pIV\u001cG/\u0011:jif\fa\u0002\u001d:pIV\u001cG/\u00127f[\u0016tG\u000f\u0006\u0003\u0002x\u0005u\u0004c\u0001)\u0002z%\u0019\u00111P)\u0003\u0007\u0005s\u0017\u0010\u0003\u0005\u0002\u0000i\t\t\u00111\u0001P\u0003\rAH%M\u0001\u0010aJ|G-^2u\u0013R,'/\u0019;peV\u0011\u0011Q\u0011\t\u0007\u0003\u000f\u000bI)a\u001e\u000e\u0003qK1!a#]\u0005!IE/\u001a:bi>\u0014\u0018\u0001C2b]\u0016\u000bX/\u00197\u0015\t\u0005E\u0015q\u0013\t\u0004!\u0006M\u0015bAAK#\n9!i\\8mK\u0006t\u0007\"CA@9\u0005\u0005\t\u0019AA<\u0003!A\u0017m\u001d5D_\u0012,G#A(\u0002\u0011Q|7\u000b\u001e:j]\u001e$\u0012a[\u0001\u0007KF,\u0018\r\\:\u0015\t\u0005E\u0015Q\u0015\u0005\n\u0003\u007fz\u0012\u0011!a\u0001\u0003o\n\u0001\u0002T5oW&sgm\u001c\t\u0004\u0003g\t3\u0003B\u0011\u0002.j\u00042\"a,\u00026~\fIb`(\u000225\u0011\u0011\u0011\u0017\u0006\u0004\u0003g\u000b\u0016a\u0002:v]RLW.Z\u0005\u0005\u0003o\u000b\tLA\tBEN$(/Y2u\rVt7\r^5p]R\"\"!!+\u0002\u000b\u0005\u0004\b\u000f\\=\u0015\u0015\u0005E\u0012qXAa\u0003\u0007\f)\rC\u0003~I\u0001\u0007q\u0010C\u0004\u0002\u0016\u0011\
u0002\r!!\u0007\t\r\u0005\u001dB\u00051\u0001\u0000\u0011\u0019\tY\u0003\na\u0001\u001f\u00069QO\\1qa2LH\u0003BAf\u0003/\u0004R\u0001UAg\u0003#L1!a4R\u0005\u0019y\u0005\u000f^5p]BA\u0001+a5\u0000\u00033yx*C\u0002\u0002VF\u0013a\u0001V;qY\u0016$\u0004\"CAmK\u0005\u0005\t\u0019AA\u0019\u0003\rAH\u0005M\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0005\u0003?\f)\u000fE\u0002Q\u0003CL1!a9R\u0005\u0011)f.\u001b;\t\r\u0005\u001dh\u00051\u0001B\u0003!!Xm\u001d;J]\u001a|\u0007f\u0001\u0014\u0002lB\u0019!)!<\n\u0007\u0005=8I\u0001\u0006CK\u001a|'/Z#bG\"\f\u0001\u0002^3be\u0012{wO\u001c\u000b\u0003\u0003?D3aJA|!\r\u0011\u0015\u0011`\u0005\u0004\u0003w\u001c%!C!gi\u0016\u0014X)Y2i\u0003U!Xm\u001d;SKZ,'o]3D_:tWm\u0019;j_:D3\u0001\u000bB\u0001!\r\u0011%1A\u0005\u0004\u0005\u000b\u0019%\u0001\u0002+fgR\fQ\u0004^3ti&s\u0017\u000e^5bi\u0016\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\u001c\u0015\u0004S\t\u0005\u0011!\u0005;fgRtu\u000e^\"p]R\u0014x\u000e\u001c7fe\"\u001a!F!\u0001\u0002/Q,7\u000f^\"mkN$XM\u001d'j].tu\u000e\u001e$pk:$\u0007fA\u0016\u0003\u0002\u0005Q1M]3bi\u0016d\u0015N\\6\u0015\t\u0005E\"q\u0003\u0005\b\u00053a\u0003\u0019\u0001B\u000e\u00031\u0019x.\u001e:dKN+'O^3s!\u0015\u0001\u0016Q\u001aB\u000f!\rQ$qD\u0005\u0004\u0005C\u0019$aC&bM.\f7+\u001a:wKJ\f\u0011$\u001b8ji&\fG/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]R1!q\u0005B\u001f\u0005\u0003\u0002BA!\u000b\u0003:5\u0011!1\u0006\u0006\u0005\u0005[\u0011y#\u0001\u0005sKF,Xm\u001d;t\u0015\u0011\u0011\tDa\r\u0002\r\r|W.\\8o\u0015\r1$Q\u0007\u0006\u0004\u0005oI\u0015AB1qC\u000eDW-\u0003\u0003\u0003<\t-\"AI%oSRL\u0017\r^3SKZ,'o]3D_:tWm\u0019;j_:\u001c(+Z:q_:\u001cX\r\u0003\u0004\u0003@5\u0002\r\u0001Y\u0001\u0007g>\u001c7.\u001a;\t\u000f\t\rS\u00061\u0001\u00022\u0005!A.\u001b8l\u0003E\u0011XM^3sg\u0016\u001cuN\u001c8fGRLwN\u001c\u000b\r\u0005\u0013\u0012yE!\u0015\u0003T\t]#1\f\t\u0005\u0005S\u0011Y%\u0003\u0003\u0003N\t-\"!\u0007*fm\u0016\u00148/Z\"p]:,7\r^5p]J+7\u000f]8og\u0016DaAa\u0010/\u0001\u0004\u0001\u0007b\u0002B\"]\u0001\u0007
\u0011\u0011\u0007\u0005\t\u0005+r\u0003\u0013!a\u0001\u007f\u0006Q1o\\;sG\u0016Dun\u001d;\t\u0011\tec\u0006%AA\u0002=\u000b!b]8ve\u000e,\u0007k\u001c:u\u0011!\u0011iF\fI\u0001\u0002\u0004y\u0015!E5oSRL\u0017\r^3SKF,Xm\u001d;JI\u0006Y\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8%I\u00164\u0017-\u001e7uIM\n1D]3wKJ\u001cXmQ8o]\u0016\u001cG/[8oI\u0011,g-Y;mi\u0012\"\u0014a\u0007:fm\u0016\u00148/Z\"p]:,7\r^5p]\u0012\"WMZ1vYR$S\u0007")
public class ReverseConnectionRequestTest
extends BaseRequestTest {
    // Holder for the LinkInfo companion object; filled in on demand by LinkInfo$lzycompute$1().
    private volatile ReverseConnectionRequestTest$LinkInfo$ LinkInfo$module;
    // TestInfo received by setUp(); read again by testInitiateReverseConnection().
    private TestInfo _testInfo;
    // Value returned by brokerCount(); assigned in the constructor.
    private final int brokerCount;
    // Sockets registered by the tests; tearDown() closes each of them.
    private final Buffer<Socket> sockets = (Buffer)Buffer$.MODULE$.empty();
    // Name passed to NewClusterLink by createLink(); assigned in the constructor.
    private final String linkName;

    /**
     * Returns the {@code LinkInfo} companion module, asking {@code LinkInfo$lzycompute$1()} to
     * create it when it has not been set yet.
     */
    public ReverseConnectionRequestTest$LinkInfo$ LinkInfo() {
        if (this.LinkInfo$module != null) {
            return this.LinkInfo$module;
        }
        this.LinkInfo$lzycompute$1();
        return this.LinkInfo$module;
    }

    /** Returns the broker count stored by the constructor. */
    @Override
    public int brokerCount() {
        final int configuredBrokerCount = this.brokerCount;
        return configuredBrokerCount;
    }

    /** Returns the buffer of sockets that {@code tearDown()} closes. */
    private Buffer<Socket> sockets() {
        final Buffer<Socket> registeredSockets = this.sockets;
        return registeredSockets;
    }

    /** Returns the cluster link name stored by the constructor. */
    private String linkName() {
        final String configuredLinkName = this.linkName;
        return configuredLinkName;
    }

    /**
     * Remembers the JUnit {@link TestInfo} for later use by the tests, then delegates to the
     * superclass set-up.
     *
     * @param testInfo the test metadata supplied by JUnit
     */
    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        final TestInfo currentTestInfo = testInfo;
        this._testInfo = currentTestInfo;
        super.setUp(currentTestInfo);
    }

    /**
     * Closes every socket registered in {@code sockets()} and then delegates to the superclass
     * teardown.
     *
     * <p>The superclass teardown runs in a {@code finally} block so that a failure while
     * closing a socket cannot skip it.
     */
    @Override
    @AfterEach
    public void tearDown() {
        try {
            this.sockets().foreach((Function1 & Serializable & scala.Serializable)openSocket -> {
                openSocket.close();
                return BoxedUnit.UNIT;
            });
        }
        finally {
            super.tearDown();
        }
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testReverseConnection() {
        LinkInfo link = this.createLink((Option<KafkaServer>)None$.MODULE$);
        KafkaServer controllerServer = (KafkaServer)((IterableLike)this.servers().filter((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testReverseConnection$1(x$2)))).head();
        ClusterLinkDestConnectionManager connectionManager = (ClusterLinkDestConnectionManager)controllerServer.clusterLinkManager().connectionManager(link.id()).get();
        Socket socket = this.connect(controllerServer.socketServer(), this.connect$default$2());
        this.sockets().$plus$eq((Object)socket);
        String sourceHost = "127.0.0.1";
        int sourcePort = socket.getLocalPort();
        ReverseConnectionResponse response = this.reverseConnection(socket, link, sourceHost, sourcePort, this.reverseConnection$default$5());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        NetworkClient networkClient = (NetworkClient)connectionManager.reverseConnectionClient().get();
        String channelId = Integer.toString(link.remoteBrokerId());
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!networkClient.hasInFlightRequests(channelId)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ReverseConnectionRequestTest.$anonfun$testReverseConnection$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        Selector selector = (Selector)TestUtils.fieldValue((Object)networkClient, NetworkClient.class, (String)"selector");
        long l3 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l4 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!selector.isChannelReady(channelId)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ReverseConnectionRequestTest.$anonfun$testReverseConnection$5());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        KafkaChannel channel = selector.channel(channelId);
        Assertions.assertNotNull((Object)channel);
        Assertions.assertEquals((Object)channel.socketChannel().getRemoteAddress(), (Object)socket.getLocalSocketAddress());
        Assertions.assertEquals((Object)channel.socketChannel().getLocalAddress(), (Object)socket.getRemoteSocketAddress());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testInitiateReverseConnection() {
        void runWithRemoteCluster_testInfo;
        TestInfo testInfo = this._testInfo;
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        IntegrationTestHarness runWithRemoteCluster_remoteCluster = new IntegrationTestHarness(){

            public int brokerCount() {
                return 1;
            }
        };
        runWithRemoteCluster_remoteCluster.setUp((TestInfo)runWithRemoteCluster_testInfo);
        try {
            IntegrationTestHarness integrationTestHarness = runWithRemoteCluster_remoteCluster;
            ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$1(this, integrationTestHarness);
        }
        finally {
            runWithRemoteCluster_remoteCluster.tearDown();
        }
    }

    /**
     * Verifies that an InitiateReverseConnections request and a ReverseConnection request sent
     * to a broker whose controller is not active both report {@code NOT_CONTROLLER}.
     */
    @Test
    public void testNotController() {
        LinkInfo link = this.createLink((Option<KafkaServer>)None$.MODULE$);
        IterableLike nonControllers = (IterableLike)this.servers().filterNot((Function1 & Serializable & scala.Serializable)server -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testNotController$1(server)));
        KafkaServer nonController = (KafkaServer)nonControllers.head();
        Socket socket = this.connect(nonController.socketServer(), this.connect$default$2());
        this.sockets().$plus$eq((Object)socket);

        InitiateReverseConnectionsResponse initiateResponse = this.initiateReverseConnection(socket, link);
        Assertions.assertEquals(Collections.singletonMap(Errors.NOT_CONTROLLER, BoxesRunTime.boxToInteger((int)1)), (Object)initiateResponse.errorCounts());

        ReverseConnectionResponse reverseResponse = this.reverseConnection(socket, link, "127.0.0.1", socket.getLocalPort(), this.reverseConnection$default$5());
        Assertions.assertEquals((Object)Errors.NOT_CONTROLLER, (Object)reverseResponse.error());
    }

    /**
     * Verifies that requests naming a link that was never created are answered with
     * {@code CLUSTER_LINK_NOT_FOUND} by the active controller.
     */
    @Test
    public void testClusterLinkNotFound() {
        KafkaServer controller = (KafkaServer)this.servers().find((Function1 & Serializable & scala.Serializable)server -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testClusterLinkNotFound$1(server))).get();
        Socket socket = this.connect(controller.socketServer(), this.connect$default$2());
        this.sockets().$plus$eq((Object)socket);

        // A link description with a random id that no broker knows about.
        LinkInfo link = new LinkInfo(this, "nonexistent", UUID.randomUUID(), "somehost", 1234);

        InitiateReverseConnectionsResponse initiateResponse = this.initiateReverseConnection(socket, link);
        Assertions.assertEquals(Collections.singletonMap(Errors.CLUSTER_LINK_NOT_FOUND, BoxesRunTime.boxToInteger((int)1)), (Object)initiateResponse.errorCounts());

        String defaultSourceHost = this.reverseConnection$default$3();
        int defaultSourcePort = this.reverseConnection$default$4();
        int defaultInitiateRequestId = this.reverseConnection$default$5();
        ReverseConnectionResponse reverseResponse = this.reverseConnection(socket, link, defaultSourceHost, defaultSourcePort, defaultInitiateRequestId);
        Assertions.assertEquals((Object)Errors.CLUSTER_LINK_NOT_FOUND, (Object)reverseResponse.error());
    }

    /*
     * WARNING - void declaration
     */
    private LinkInfo createLink(Option<KafkaServer> sourceServer) {
        String sourceClusterId = (String)sourceServer.map((Function1 & Serializable & scala.Serializable)x$8 -> x$8.clusterId()).getOrElse((Function0 & Serializable & scala.Serializable)() -> "sourceCluster1");
        int sourceBrokerId = BoxesRunTime.unboxToInt((Object)sourceServer.map((Function1 & Serializable & scala.Serializable)x$9 -> BoxesRunTime.boxToInteger((int)ReverseConnectionRequestTest.$anonfun$createLink$3(x$9))).getOrElse((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> 12));
        int sourcePort = BoxesRunTime.unboxToInt((Object)sourceServer.map((Function1 & Serializable & scala.Serializable)x$10 -> BoxesRunTime.boxToInteger((int)ReverseConnectionRequestTest.$anonfun$createLink$5(this, x$10))).getOrElse((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> 1234));
        scala.collection.immutable.Map linkConfigs = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"bootstrap.servers"), (Object)new StringBuilder(10).append("localhost:").append(sourcePort).toString()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ConnectionModeProp()), (Object)ConnectionMode.Inbound$.MODULE$.name())}));
        NewClusterLink newClusterLink = new NewClusterLink(this.linkName(), sourceClusterId, (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)linkConfigs).asJava());
        CreateClusterLinksRequest createRequest = (CreateClusterLinksRequest)new CreateClusterLinksRequest.Builder(Collections.singletonList(newClusterLink), false, false, 10000).build();
        KafkaServer controller = (KafkaServer)((IterableLike)this.servers().filter((Function1 & Serializable & scala.Serializable)x$11 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$createLink$7(x$11)))).head();
        CreateClusterLinksResponseData.EntryData response = (CreateClusterLinksResponseData.EntryData)((CreateClusterLinksResponse)this.connectAndReceive((AbstractRequest)createRequest, controller.socketServer(), this.connectAndReceive$default$3(), ClassTag$.MODULE$.apply(CreateClusterLinksResponse.class), NotNothing$.MODULE$.notNothingEvidence(Predef.$eq$colon$eq$.MODULE$.tpEquals()))).data().entries().get(0);
        Assertions.assertEquals((Object)"", (Object)response.errorMessage());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)response.errorCode());
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ReverseConnectionRequestTest.$anonfun$createLink$8(this)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ReverseConnectionRequestTest.$anonfun$createLink$10());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        UUID linkId = (UUID)controller.clusterLinkManager().resolveLinkId(this.linkName()).get();
        return new LinkInfo(this, this.linkName(), linkId, sourceClusterId, sourceBrokerId);
    }

    /**
     * Builds an InitiateReverseConnections request for {@code link}, targeting the first
     * server of this cluster, sends it over {@code socket} and returns the response.
     *
     * @param socket an open connection to the broker that should handle the request
     * @param link   the link whose id, remote cluster id and remote broker id go into the request
     * @return the broker's response
     */
    private InitiateReverseConnectionsResponse initiateReverseConnection(Socket socket, LinkInfo link) {
        int targetBrokerId = ((KafkaServer)this.servers().head()).config().brokerId();
        InitiateReverseConnectionsRequestData.EntryData connData = new InitiateReverseConnectionsRequestData.EntryData()
            .setTargetBrokerId(targetBrokerId)
            .setSourceBrokerId(link.remoteBrokerId())
            .setInitiateRequestId(1);

        Uuid clusterLinkId = new Uuid(link.id().getMostSignificantBits(), link.id().getLeastSignificantBits());
        String targetClusterId = ((KafkaServer)this.servers().head()).clusterId();
        InitiateReverseConnectionsRequestData requestData = new InitiateReverseConnectionsRequestData()
            .setClusterLinkId(clusterLinkId)
            .setTargetClusterId(targetClusterId)
            .setSourceClusterId(link.remoteClusterId())
            .setForwardToBroker(true)
            .setEntries(Collections.singletonList(connData));

        InitiateReverseConnectionsRequest.Builder requestBuilder = new InitiateReverseConnectionsRequest.Builder(requestData);
        InitiateReverseConnectionsRequest reverseRequest = (InitiateReverseConnectionsRequest)requestBuilder.build();
        return (InitiateReverseConnectionsResponse)this.sendAndReceive((AbstractRequest)reverseRequest, socket, this.sendAndReceive$default$3(), this.sendAndReceive$default$4(), ClassTag$.MODULE$.apply(InitiateReverseConnectionsResponse.class), NotNothing$.MODULE$.notNothingEvidence(Predef.$eq$colon$eq$.MODULE$.tpEquals()));
    }

    /**
     * Builds a ReverseConnection request for {@code link}, sends it over {@code socket} and
     * returns the response.
     *
     * @param socket            an open connection to the broker that should handle the request
     * @param link              the link whose id, remote cluster id and remote broker id are sent
     * @param sourceHost        value for the request's source host field
     * @param sourcePort        value for the request's source port field
     * @param initiateRequestId value for the request's initiate-request-id field
     * @return the broker's response
     */
    private ReverseConnectionResponse reverseConnection(Socket socket, LinkInfo link, String sourceHost, int sourcePort, int initiateRequestId) {
        Uuid clusterLinkId = new Uuid(link.id().getMostSignificantBits(), link.id().getLeastSignificantBits());
        String targetClusterId = ((KafkaServer)this.servers().head()).clusterId();
        ReverseConnectionRequestData reverseRequestData = new ReverseConnectionRequestData()
            .setClusterLinkId(clusterLinkId)
            .setTargetClusterId(targetClusterId)
            .setSourceClusterId(link.remoteClusterId())
            .setSourceBrokerId(link.remoteBrokerId())
            .setSourceHost(sourceHost)
            .setSourcePort(sourcePort)
            .setInitiateRequestId(initiateRequestId);

        ReverseConnectionRequest.Builder requestBuilder = new ReverseConnectionRequest.Builder(reverseRequestData);
        ReverseConnectionRequest reverseRequest = (ReverseConnectionRequest)requestBuilder.build();
        return (ReverseConnectionResponse)this.sendAndReceive((AbstractRequest)reverseRequest, socket, this.sendAndReceive$default$3(), this.sendAndReceive$default$4(), ClassTag$.MODULE$.apply(ReverseConnectionResponse.class), NotNothing$.MODULE$.notNothingEvidence(Predef.$eq$colon$eq$.MODULE$.tpEquals()));
    }

    /** Default for the third parameter ({@code sourceHost}) of {@code reverseConnection}. */
    private String reverseConnection$default$3() {
        final String defaultSourceHost = "localhost";
        return defaultSourceHost;
    }

    /** Default for the fourth parameter ({@code sourcePort}) of {@code reverseConnection}. */
    private int reverseConnection$default$4() {
        final int defaultSourcePort = 0;
        return defaultSourcePort;
    }

    /** Default for the fifth parameter ({@code initiateRequestId}) of {@code reverseConnection}. */
    private int reverseConnection$default$5() {
        final int defaultInitiateRequestId = -1;
        return defaultInitiateRequestId;
    }

    /**
     * Creates the {@code LinkInfo} companion module while holding this instance's monitor,
     * unless the field has already been set.
     */
    private final void LinkInfo$lzycompute$1() {
        synchronized (this) {
            if (this.LinkInfo$module != null) {
                return;
            }
            this.LinkInfo$module = new ReverseConnectionRequestTest$LinkInfo$(this);
        }
    }

    /** Predicate for {@code testReverseConnection}: whether the server's controller is active. */
    public static final /* synthetic */ boolean $anonfun$testReverseConnection$1(KafkaServer x$2) {
        final boolean controllerActive = x$2.kafkaController().isActive();
        return controllerActive;
    }

    /** Failure message for the first wait loop in {@code testReverseConnection}. */
    public static final /* synthetic */ String $anonfun$testReverseConnection$3() {
        final String failureMessage = "Channel not added to remote client";
        return failureMessage;
    }

    /** Failure message for the second wait loop in {@code testReverseConnection}. */
    public static final /* synthetic */ String $anonfun$testReverseConnection$5() {
        final String failureMessage = "Channel not added to remote client";
        return failureMessage;
    }

    /** Predicate selecting the source cluster's server whose controller is active. */
    public static final /* synthetic */ boolean $anonfun$testInitiateReverseConnection$2(KafkaServer x$3) {
        final boolean controllerActive = x$3.kafkaController().isActive();
        return controllerActive;
    }

    /**
     * Sends an InitiateReverseConnections request for {@code destLink$1} over
     * {@code sourceSocket$1} and asserts that the response reports exactly one {@code NONE} error.
     */
    public static final /* synthetic */ void $anonfun$testInitiateReverseConnection$3(ReverseConnectionRequestTest $this, Socket sourceSocket$1, LinkInfo destLink$1) {
        InitiateReverseConnectionsResponse responseFromSource = $this.initiateReverseConnection(sourceSocket$1, destLink$1);
        java.util.Map<Errors, Integer> expectedCounts = Collections.singletonMap(Errors.NONE, BoxesRunTime.boxToInteger((int)1));
        Assertions.assertEquals(expectedCounts, (Object)responseFromSource.errorCounts());
    }

    /** Predicate selecting this cluster's server whose controller is active. */
    public static final /* synthetic */ boolean $anonfun$testInitiateReverseConnection$4(KafkaServer x$4) {
        final boolean controllerActive = x$4.kafkaController().isActive();
        return controllerActive;
    }

    /**
     * Sends an InitiateReverseConnections request for {@code destLink$1} over
     * {@code destSocket$1} and returns the error counts of the response.
     */
    public static final /* synthetic */ java.util.Map $anonfun$testInitiateReverseConnection$5(ReverseConnectionRequestTest $this, Socket destSocket$1, LinkInfo destLink$1) {
        InitiateReverseConnectionsResponse responseFromDest = $this.initiateReverseConnection(destSocket$1, destLink$1);
        return responseFromDest.errorCounts();
    }

    /**
     * Reports whether {@code x$5} equals the singleton map {@code {NONE -> 1}}.
     *
     * @param x$5 the error counts to check; may be {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$testInitiateReverseConnection$6(java.util.Map x$5) {
        java.util.Map<Errors, Integer> expectedCounts = Collections.singletonMap(Errors.NONE, BoxesRunTime.boxToInteger((int)1));
        if (x$5 == null) {
            // Null-safe equality: a null argument only matches a null expectation.
            return expectedCounts == null;
        }
        return ((Object)x$5).equals(expectedCounts);
    }

    /*
     * WARNING - void declaration
     */
    public static final /* synthetic */ void $anonfun$testInitiateReverseConnection$1(ReverseConnectionRequestTest $this, IntegrationTestHarness sourceCluster) {
        void var9_19;
        Tuple2 tuple2;
        KafkaServer sourceController = (KafkaServer)((IterableLike)sourceCluster.servers().filter((Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$2(x$3)))).head();
        LinkInfo destLink = $this.createLink((Option<KafkaServer>)new Some((Object)sourceController));
        ClusterLinkData sourceLinkData = new ClusterLinkData($this.linkName(), destLink.id(), (Option)new Some((Object)((KafkaServer)$this.servers().head()).clusterId()), (Option)None$.MODULE$, false);
        Properties sourceLinkProps = new Properties();
        sourceLinkProps.put("bootstrap.servers", new StringBuilder(10).append("localhost:").append(((KafkaServer)$this.servers().head()).socketServer().boundPort($this.listenerName())).toString());
        sourceLinkProps.put(ClusterLinkConfig$.MODULE$.LinkModeProp(), LinkMode.Source$.MODULE$.name());
        sourceLinkProps.put(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), ConnectionMode.Outbound$.MODULE$.name());
        sourceLinkProps.put(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), "PLAINTEXT");
        ClusterLinkConfig sourceLinkConfig = ClusterLinkConfig$.MODULE$.create((java.util.Map)sourceLinkProps, ClusterLinkConfig$.MODULE$.create$default$2());
        ((ClusterLinkManager)sourceController.clusterLinkManager()).createSourceClusterLink(sourceLinkData, sourceLinkConfig);
        Socket sourceSocket = $this.connect(sourceController.socketServer(), $this.connect$default$2());
        $this.sockets().$plus$eq((Object)sourceSocket);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long l = 10000L;
        if (testUtils$ == null) {
            throw null;
        }
        TestUtils$ retry_this = testUtils$;
        LongRef retry_wait = LongRef.create((long)1L);
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$3($this, sourceSocket, destLink);
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    retry_this.logger().underlying().info(retry_this.msgWithLogIdent(TestUtils$.$anonfun$retry$1(retry_wait)));
                }
                Thread.sleep(retry_wait.elem);
                retry_wait.elem += package$.MODULE$.min(retry_wait.elem, 1000L);
                continue;
            }
            break;
        }
        Socket destSocket = $this.connect(((KafkaServer)$this.servers().find((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$4(x$4))).get()).socketServer(), $this.connect$default$2());
        $this.sockets().$plus$eq((Object)sourceSocket);
        long l2 = TestUtils$.MODULE$.computeUntilTrue$default$3();
        long l3 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long computeUntilTrue_startTime = System.currentTimeMillis();
        while (true) {
            void computeUntilTrue_pause;
            void computeUntilTrue_waitTime;
            java.util.Map computeUntilTrue_result;
            if (ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$6(computeUntilTrue_result = ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$5($this, destSocket, destLink))) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)true));
                break;
            }
            if (System.currentTimeMillis() > computeUntilTrue_startTime + computeUntilTrue_waitTime) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)computeUntilTrue_waitTime), (long)computeUntilTrue_pause));
        }
        if (tuple2 == null) {
            throw new MatchError(null);
        }
        java.util.Map errorCounts = (java.util.Map)tuple2._1();
        Assertions.assertEquals(Collections.singletonMap(Errors.NONE, BoxesRunTime.boxToInteger((int)1)), (Object)var9_19);
    }

    /** Predicate for {@code testNotController}: whether the server's controller is active. */
    public static final /* synthetic */ boolean $anonfun$testNotController$1(KafkaServer x$6) {
        final boolean controllerActive = x$6.kafkaController().isActive();
        return controllerActive;
    }

    /**
     * Compiler-generated (synthetic) predicate body; presumably backs a lambda in
     * {@code testClusterLinkNotFound} — confirm against the Scala source.
     *
     * @param x$7 the broker to inspect
     * @return the value of {@code kafkaController().isActive()} for the given broker
     */
    public static final /* synthetic */ boolean $anonfun$testClusterLinkNotFound$1(KafkaServer x$7) {
        final boolean isActiveController = x$7.kafkaController().isActive();
        return isActiveController;
    }

    /**
     * Compiler-generated (synthetic) mapper body; presumably backs a lambda in
     * {@code createLink} — confirm against the Scala source.
     *
     * @param x$9 the broker to inspect
     * @return the broker id read from the broker's configuration
     */
    public static final /* synthetic */ int $anonfun$createLink$3(KafkaServer x$9) {
        final int brokerId = x$9.config().brokerId();
        return brokerId;
    }

    /**
     * Compiler-generated (synthetic) mapper body; presumably backs a lambda in
     * {@code createLink} — confirm against the Scala source.
     *
     * @param $this the enclosing test instance, which supplies the listener name
     * @param x$10  the broker whose socket server is queried
     * @return the port the broker's socket server reports as bound for the test's listener
     */
    public static final /* synthetic */ int $anonfun$createLink$5(ReverseConnectionRequestTest $this, KafkaServer x$10) {
        final int boundPort = x$10.socketServer().boundPort($this.listenerName());
        return boundPort;
    }

    /**
     * Compiler-generated (synthetic) predicate body; presumably backs a lambda in
     * {@code createLink} — confirm against the Scala source.
     *
     * @param x$11 the broker to inspect
     * @return the value of {@code kafkaController().isActive()} for the given broker
     */
    public static final /* synthetic */ boolean $anonfun$createLink$7(KafkaServer x$11) {
        final boolean activeController = x$11.kafkaController().isActive();
        return activeController;
    }

    /**
     * Compiler-generated (synthetic) predicate body: checks whether one broker can resolve
     * the test's link name to a link id.
     *
     * @param $this the enclosing test instance, which supplies the link name
     * @param x$12  the broker whose cluster link manager is queried
     * @return {@code true} when {@code resolveLinkId} yields a non-empty result for the link name
     */
    public static final /* synthetic */ boolean $anonfun$createLink$9(ReverseConnectionRequestTest $this, KafkaServer x$12) {
        final boolean linkIdResolved = x$12.clusterLinkManager().resolveLinkId($this.linkName()).nonEmpty();
        return linkIdResolved;
    }

    /**
     * Compiler-generated (synthetic) condition body: checks that every broker of the test
     * cluster satisfies {@code $anonfun$createLink$9}, i.e. can resolve the test's link name.
     *
     * @param $this the enclosing test instance whose brokers are inspected
     * @return {@code true} when the predicate holds for all of {@code $this.servers()}
     */
    public static final /* synthetic */ boolean $anonfun$createLink$8(ReverseConnectionRequestTest $this) {
        final boolean resolvedOnAllBrokers = $this.servers().forall((Function1 & Serializable & scala.Serializable)server -> BoxesRunTime.boxToBoolean(ReverseConnectionRequestTest.$anonfun$createLink$9($this, server)));
        return resolvedOnAllBrokers;
    }

    /**
     * Compiler-generated (synthetic) message supplier body; presumably provides the failure
     * message used while waiting for link creation in {@code createLink} — confirm against
     * the Scala source.
     *
     * @return the fixed message text
     */
    public static final /* synthetic */ String $anonfun$createLink$10() {
        final String failureMessage = "Link not created";
        return failureMessage;
    }

    /**
     * Creates the test with its fixed parameters: the link name {@code "testLink1"} (the
     * name passed to {@code resolveLinkId} elsewhere in this class) and a broker count of 2.
     */
    public ReverseConnectionRequestTest() {
        // The two assignments are independent of each other.
        this.linkName = "testLink1";
        this.brokerCount = 2;
    }

    /**
     * Compiler-generated (synthetic) adapter: runs the void lambda body
     * {@code $anonfun$testInitiateReverseConnection$1} and returns {@code BoxedUnit.UNIT}
     * so the call fits an {@code Object}-returning function shape.
     *
     * @param $this         the enclosing test instance
     * @param sourceCluster the harness handed to the wrapped lambda body
     * @return always {@code BoxedUnit.UNIT}
     */
    public static final /* synthetic */ Object $anonfun$testInitiateReverseConnection$1$adapted(ReverseConnectionRequestTest $this, IntegrationTestHarness sourceCluster) {
        ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$1($this, sourceCluster);
        // The wrapped body returns nothing; hand back the unit placeholder instead.
        final Object unit = BoxedUnit.UNIT;
        return unit;
    }

    /**
     * Compiler-generated (synthetic) adapter: evaluates the primitive predicate
     * {@code $anonfun$testInitiateReverseConnection$6} and returns its result boxed.
     *
     * @param x$5 the map handed to the wrapped predicate
     * @return the predicate result, boxed via {@code BoxesRunTime.boxToBoolean}
     */
    public static final /* synthetic */ Object $anonfun$testInitiateReverseConnection$6$adapted(java.util.Map x$5) {
        final boolean predicateResult = ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$6(x$5);
        return BoxesRunTime.boxToBoolean(predicateResult);
    }

    /**
     * Immutable holder for four values describing a link: its name, its id, a remote
     * cluster id and a remote broker id.
     *
     * <p>The class implements {@code Product} and {@code scala.Serializable} and is an inner
     * (non-static) class, so every instance is bound to an enclosing
     * {@link ReverseConnectionRequestTest}. Instances bound to different enclosing tests
     * never compare equal.
     *
     * <p>NOTE(review): the member set (copy, copy$default$N, productElement, canEqual, ...)
     * looks like what scalac emits for a case class — confirm against the Scala source.
     */
    public class LinkInfo
    implements Product,
    scala.Serializable {
        private final String name;
        private final UUID id;
        private final String remoteClusterId;
        private final int remoteBrokerId;
        /** Enclosing test instance; never {@code null} (enforced by the constructor). */
        public final /* synthetic */ ReverseConnectionRequestTest $outer;

        /** @return the link name */
        public String name() {
            return this.name;
        }

        /** @return the link id */
        public UUID id() {
            return this.id;
        }

        /** @return the remote cluster id */
        public String remoteClusterId() {
            return this.remoteClusterId;
        }

        /** @return the remote broker id */
        public int remoteBrokerId() {
            return this.remoteBrokerId;
        }

        /**
         * Creates a new instance bound to the same enclosing test, using the given values.
         *
         * @return a fresh {@code LinkInfo}; this instance is left untouched
         */
        public LinkInfo copy(String name, UUID id, String remoteClusterId, int remoteBrokerId) {
            return new LinkInfo(this.kafka$server$ReverseConnectionRequestTest$LinkInfo$$$outer(), name, id, remoteClusterId, remoteBrokerId);
        }

        /** @return default for the first {@code copy} argument: the current name */
        public String copy$default$1() {
            return this.name();
        }

        /** @return default for the second {@code copy} argument: the current id */
        public UUID copy$default$2() {
            return this.id();
        }

        /** @return default for the third {@code copy} argument: the current remote cluster id */
        public String copy$default$3() {
            return this.remoteClusterId();
        }

        /** @return default for the fourth {@code copy} argument: the current remote broker id */
        public int copy$default$4() {
            return this.remoteBrokerId();
        }

        /** @return the fixed prefix {@code "LinkInfo"} */
        public String productPrefix() {
            return "LinkInfo";
        }

        /** @return the number of elements exposed through {@link #productElement(int)}, i.e. 4 */
        public int productArity() {
            return 4;
        }

        /**
         * Returns the element at the given position: 0 = name, 1 = id, 2 = remote cluster id,
         * 3 = remote broker id (boxed).
         *
         * @throws IndexOutOfBoundsException if {@code x$1} is not in {@code 0..3}
         */
        public Object productElement(int x$1) {
            switch (x$1) {
                case 0: {
                    return this.name();
                }
                case 1: {
                    return this.id();
                }
                case 2: {
                    return this.remoteClusterId();
                }
                case 3: {
                    return BoxesRunTime.boxToInteger(this.remoteBrokerId());
                }
            }
            throw new IndexOutOfBoundsException(Integer.toString(x$1));
        }

        /** @return an iterator obtained from {@code ScalaRunTime.typedProductIterator} for this instance */
        public Iterator<Object> productIterator() {
            return ScalaRunTime$.MODULE$.typedProductIterator((Product)this);
        }

        /** @return {@code true} when the argument is a {@code LinkInfo} */
        public boolean canEqual(Object x$1) {
            return x$1 instanceof LinkInfo;
        }

        /**
         * Hash over the four fields, in declaration order, using the Scala {@code Statics}
         * mixing helpers. The enclosing instance does not contribute to the hash.
         */
        @Override
        public int hashCode() {
            // Same seed, mixing order and finalization length as the original nested expression.
            int hash = -889275714;
            hash = Statics.mix(hash, Statics.anyHash(this.name()));
            hash = Statics.mix(hash, Statics.anyHash(this.id()));
            hash = Statics.mix(hash, Statics.anyHash(this.remoteClusterId()));
            hash = Statics.mix(hash, this.remoteBrokerId());
            return Statics.finalizeHash(hash, 4);
        }

        /** @return the string built by {@code ScalaRunTime._toString} for this instance */
        @Override
        public String toString() {
            return ScalaRunTime$.MODULE$._toString((Product)this);
        }

        /**
         * Two instances are equal when they share the same enclosing test instance (reference
         * comparison), all four fields are equal, and the other side accepts this instance via
         * {@link #canEqual(Object)}.
         */
        @Override
        public boolean equals(Object x$1) {
            if (this == x$1) {
                return true;
            }
            if (!(x$1 instanceof LinkInfo)) {
                return false;
            }
            LinkInfo that = (LinkInfo)x$1;
            // Checks run in the same order as before: outer instance, fields, then canEqual.
            return that.kafka$server$ReverseConnectionRequestTest$LinkInfo$$$outer() == this.kafka$server$ReverseConnectionRequestTest$LinkInfo$$$outer()
                && java.util.Objects.equals(this.name(), that.name())
                && java.util.Objects.equals(this.id(), that.id())
                && java.util.Objects.equals(this.remoteClusterId(), that.remoteClusterId())
                && this.remoteBrokerId() == that.remoteBrokerId()
                && that.canEqual(this);
        }

        /** @return the enclosing test instance this value is bound to */
        public /* synthetic */ ReverseConnectionRequestTest kafka$server$ReverseConnectionRequestTest$LinkInfo$$$outer() {
            return this.$outer;
        }

        /**
         * Creates a value bound to the given enclosing test instance.
         *
         * @param $outer          the enclosing test instance; must not be {@code null}
         * @param name            the link name
         * @param id              the link id
         * @param remoteClusterId the remote cluster id
         * @param remoteBrokerId  the remote broker id
         * @throws NullPointerException if {@code $outer} is {@code null}
         */
        public LinkInfo(ReverseConnectionRequestTest $outer, String name, UUID id, String remoteClusterId, int remoteBrokerId) {
            this.name = name;
            this.id = id;
            this.remoteClusterId = remoteClusterId;
            this.remoteBrokerId = remoteBrokerId;
            if ($outer == null) {
                // Explicit form of the original "throw null", which also raised a NullPointerException.
                throw new NullPointerException();
            }
            this.$outer = $outer;
            Product.$init$((Product)this);
        }
    }
}

