/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.io.Serializable;
import java.net.Socket;
import java.util.Collections;
import java.util.Properties;
import kafka.api.IntegrationTestHarness;
import kafka.server.BaseRequestTest;
import kafka.server.KafkaServer;
import kafka.server.ReverseConnectionRequestTest$LinkInfo$;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkInboundConnectionManager;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ConnectionMode;
import kafka.utils.Logging;
import kafka.utils.NotNothing$;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.CreateClusterLinksResponseData;
import org.apache.kafka.common.message.InitiateReverseConnectionsRequestData;
import org.apache.kafka.common.message.ReverseConnectionRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.CreateClusterLinksRequest;
import org.apache.kafka.common.requests.CreateClusterLinksResponse;
import org.apache.kafka.common.requests.InitiateReverseConnectionsRequest;
import org.apache.kafka.common.requests.InitiateReverseConnectionsResponse;
import org.apache.kafka.common.requests.ReverseConnectionRequest;
import org.apache.kafka.common.requests.ReverseConnectionResponse;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Product;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOps;
import scala.collection.Iterator;
import scala.collection.Map;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\t}d\u0001B\u001a5\u0001eBQA\u0010\u0001\u0005\u0002}B\u0011\"\u0011\u0001A\u0002\u0003\u0005\u000b\u0015\u0002\"\t\u000f9\u0003!\u0019!C!\u001f\"1a\u000b\u0001Q\u0001\nACqa\u0016\u0001C\u0002\u0013%\u0001\f\u0003\u0004j\u0001\u0001\u0006I!\u0017\u0005\bU\u0002\u0011\r\u0011\"\u0003l\u0011\u0019\u0011\b\u0001)A\u0005Y\u001a!1\u000f\u0001!u\u0011)\ty!\u0003BK\u0002\u0013\u0005\u0011\u0011\u0003\u0005\u000b\u0003CI!\u0011#Q\u0001\n\u0005M\u0001BCA\u0012\u0013\tU\r\u0011\"\u0001\u0002&!Q\u0011\u0011H\u0005\u0003\u0012\u0003\u0006I!a\n\t\u0015\u0005m\u0012B!f\u0001\n\u0003\t\t\u0002\u0003\u0006\u0002>%\u0011\t\u0012)A\u0005\u0003'A\u0011\"a\u0010\n\u0005+\u0007I\u0011A(\t\u0013\u0005\u0005\u0013B!E!\u0002\u0013\u0001\u0006B\u0002 \n\t\u0003\t\u0019\u0005C\u0005\u0002R%\t\t\u0011\"\u0001\u0002T!I\u0011QL\u0005\u0012\u0002\u0013\u0005\u0011q\f\u0005\n\u0003kJ\u0011\u0013!C\u0001\u0003oB\u0011\"a\u001f\n#\u0003%\t!a\u0018\t\u0013\u0005u\u0014\"%A\u0005\u0002\u0005}\u0004\u0002CAB\u0013\u0005\u0005I\u0011I6\t\u0011\u0005\u0015\u0015\"!A\u0005\u0002=C\u0011\"a\"\n\u0003\u0003%\t!!#\t\u0013\u0005U\u0015\"!A\u0005B\u0005]\u0005\"CAQ\u0013\u0005\u0005I\u0011AAR\u0011%\ti+CA\u0001\n\u0003\ny\u000bC\u0005\u00024&\t\t\u0011\"\u0011\u00026\"I\u0011qW\u0005\u0002\u0002\u0013\u0005\u0013\u0011\u0018\u0005\n\u0003wK\u0011\u0011!C!\u0003{;\u0011\"!1\u0001\u0003\u0003E\t!a1\u0007\u0011M\u0004\u0011\u0011!E\u0001\u0003\u000bDaA\u0010\u0012\u0005\u0002\u0005u\u0007\"CA\\E\u0005\u0005IQIA]\u0011%\tyNIA\u0001\n\u0003\u000b\t\u000fC\u0005\u0002l\n\n\t\u0011\"!\u0002n\"9\u0011q 
\u0001\u0005B\t\u0005\u0001b\u0002B\u000b\u0001\u0011\u0005#q\u0003\u0005\b\u0005C\u0001A\u0011\u0001B\f\u0011\u001d\u0011Y\u0003\u0001C\u0001\u0005/AqAa\f\u0001\t\u0003\u00119\u0002C\u0004\u00034\u0001!\tAa\u0006\t\u000f\t]\u0002\u0001\"\u0003\u0003:!9!q\t\u0001\u0005\n\t%\u0003b\u0002B0\u0001\u0011%!\u0011\r\u0005\n\u0005s\u0002\u0011\u0013!C\u0005\u0003?B\u0011Ba\u001f\u0001#\u0003%I!a \t\u0013\tu\u0004!%A\u0005\n\u0005}$\u0001\b*fm\u0016\u00148/Z\"p]:,7\r^5p]J+\u0017/^3tiR+7\u000f\u001e\u0006\u0003kY\naa]3sm\u0016\u0014(\"A\u001c\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001A\u000f\t\u0003wqj\u0011\u0001N\u0005\u0003{Q\u0012qBQ1tKJ+\u0017/^3tiR+7\u000f^\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003\u0001\u0003\"a\u000f\u0001\u0002\u0013}#Xm\u001d;J]\u001a|\u0007CA\"M\u001b\u0005!%BA#G\u0003\r\t\u0007/\u001b\u0006\u0003\u000f\"\u000bqA[;qSR,'O\u0003\u0002J\u0015\u0006)!.\u001e8ji*\t1*A\u0002pe\u001eL!!\u0014#\u0003\u0011Q+7\u000f^%oM>\f1B\u0019:pW\u0016\u00148i\\;oiV\t\u0001\u000b\u0005\u0002R)6\t!KC\u0001T\u0003\u0015\u00198-\u00197b\u0013\t)&KA\u0002J]R\fAB\u0019:pW\u0016\u00148i\\;oi\u0002\nqa]8dW\u0016$8/F\u0001Z!\rQv,Y\u0007\u00027*\u0011A,X\u0001\b[V$\u0018M\u00197f\u0015\tq&+\u0001\u0006d_2dWm\u0019;j_:L!\u0001Y.\u0003\r\t+hMZ3s!\t\u0011w-D\u0001d\u0015\t!W-A\u0002oKRT\u0011AZ\u0001\u0005U\u00064\u0018-\u0003\u0002iG\n11k\\2lKR\f\u0001b]8dW\u0016$8\u000fI\u0001\tY&t7NT1nKV\tA\u000e\u0005\u0002na6\taN\u0003\u0002pK\u0006!A.\u00198h\u0013\t\thN\u0001\u0004TiJLgnZ\u0001\nY&t7NT1nK\u0002\u0012\u0001\u0002T5oW&sgm\\\n\u0005\u0013UD8\u0010\u0005\u0002Rm&\u0011qO\u0015\u0002\u0007\u0003:L(+\u001a4\u0011\u0005EK\u0018B\u0001>S\u0005\u001d\u0001&o\u001c3vGR\u00042\u0001`A\u0005\u001d\ri\u0018Q\u0001\b\u0004}\u0006\rQ\"A@\u000b\u0007\u0005\u0005\u0001(\u0001\u0004=e>|GOP\u0005\u0002'&\u0019\u0011q\u0001*\u0002\u000fA\f7m[1hK&!\u00111BA\u0007\u00051\u0019VM]5bY&T\u0018M\u00197f\u0015\r\t9AU\u0001\u0005]\u0006lW-\u0006\u0002\u0002\u0014A!\u0011QCA\u000f\u001d\u0011\t9\"
!\u0007\u0011\u0005y\u0014\u0016bAA\u000e%\u00061\u0001K]3eK\u001aL1!]A\u0010\u0015\r\tYBU\u0001\u0006]\u0006lW\rI\u0001\u0003S\u0012,\"!a\n\u0011\t\u0005%\u0012QG\u0007\u0003\u0003WQA!!\f\u00020\u000511m\\7n_:T1aNA\u0019\u0015\r\t\u0019DS\u0001\u0007CB\f7\r[3\n\t\u0005]\u00121\u0006\u0002\u0005+VLG-A\u0002jI\u0002\nqB]3n_R,7\t\\;ti\u0016\u0014\u0018\nZ\u0001\u0011e\u0016lw\u000e^3DYV\u001cH/\u001a:JI\u0002\naB]3n_R,'I]8lKJLE-A\bsK6|G/\u001a\"s_.,'/\u00133!))\t)%!\u0013\u0002L\u00055\u0013q\n\t\u0004\u0003\u000fJQ\"\u0001\u0001\t\u000f\u0005=!\u00031\u0001\u0002\u0014!9\u00111\u0005\nA\u0002\u0005\u001d\u0002bBA\u001e%\u0001\u0007\u00111\u0003\u0005\u0007\u0003\u007f\u0011\u0002\u0019\u0001)\u0002\t\r|\u0007/\u001f\u000b\u000b\u0003\u000b\n)&a\u0016\u0002Z\u0005m\u0003\"CA\b'A\u0005\t\u0019AA\n\u0011%\t\u0019c\u0005I\u0001\u0002\u0004\t9\u0003C\u0005\u0002<M\u0001\n\u00111\u0001\u0002\u0014!A\u0011qH\n\u0011\u0002\u0003\u0007\u0001+\u0001\bd_BLH\u0005Z3gCVdG\u000fJ\u0019\u0016\u0005\u0005\u0005$\u0006BA\n\u0003GZ#!!\u001a\u0011\t\u0005\u001d\u0014\u0011O\u0007\u0003\u0003SRA!a\u001b\u0002n\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003_\u0012\u0016AC1o]>$\u0018\r^5p]&!\u00111OA5\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\u000fG>\u0004\u0018\u0010\n3fM\u0006,H\u000e\u001e\u00133+\t\tIH\u000b\u0003\u0002(\u0005\r\u0014AD2paf$C-\u001a4bk2$HeM\u0001\u000fG>\u0004\u0018\u0010\n3fM\u0006,H\u000e\u001e\u00135+\t\t\tIK\u0002Q\u0003G\nQ\u0002\u001d:pIV\u001cG\u000f\u0015:fM&D\u0018\u0001\u00049s_\u0012,8\r^!sSRL\u0018A\u00049s_\u0012,8\r^#mK6,g\u000e\u001e\u000b\u0005\u0003\u0017\u000b\t\nE\u0002R\u0003\u001bK1!a$S\u0005\r\te.\u001f\u0005\t\u0003'S\u0012\u0011!a\u0001!\u0006\u0019\u0001\u0010J\u0019\u0002\u001fA\u0014x\u000eZ;di&#XM]1u_J,\"!!'\u0011\r\u0005m\u0015QTAF\u001b\u0005i\u0016bAAP;\nA\u0011\n^3sCR|'/\u0001\u0005dC:,\u0015/^1m)\u0011\t)+a+\u0011\u0007E\u000b9+C\u0002\u0002*J\u0013qAQ8pY\u0016\fg\u000eC\u0005\u0002\u0014r\t\t\u00111\u0001\u0002\f\u0006\u0011\u0002O
]8ek\u000e$X\t\\3nK:$h*Y7f)\ra\u0017\u0011\u0017\u0005\t\u0003'k\u0012\u0011!a\u0001!\u0006A\u0001.Y:i\u0007>$W\rF\u0001Q\u0003!!xn\u0015;sS:<G#\u00017\u0002\r\u0015\fX/\u00197t)\u0011\t)+a0\t\u0013\u0005M\u0005%!AA\u0002\u0005-\u0015\u0001\u0003'j].LeNZ8\u0011\u0007\u0005\u001d#eE\u0003#\u0003\u000f\f\u0019\u000eE\u0007\u0002J\u0006=\u00171CA\u0014\u0003'\u0001\u0016QI\u0007\u0003\u0003\u0017T1!!4S\u0003\u001d\u0011XO\u001c;j[\u0016LA!!5\u0002L\n\t\u0012IY:ue\u0006\u001cGOR;oGRLwN\u001c\u001b\u0011\t\u0005U\u00171\\\u0007\u0003\u0003/T1!!7f\u0003\tIw.\u0003\u0003\u0002\f\u0005]GCAAb\u0003\u0015\t\u0007\u000f\u001d7z))\t)%a9\u0002f\u0006\u001d\u0018\u0011\u001e\u0005\b\u0003\u001f)\u0003\u0019AA\n\u0011\u001d\t\u0019#\na\u0001\u0003OAq!a\u000f&\u0001\u0004\t\u0019\u0002\u0003\u0004\u0002@\u0015\u0002\r\u0001U\u0001\bk:\f\u0007\u000f\u001d7z)\u0011\ty/a?\u0011\u000bE\u000b\t0!>\n\u0007\u0005M(K\u0001\u0004PaRLwN\u001c\t\u000b#\u0006]\u00181CA\u0014\u0003'\u0001\u0016bAA}%\n1A+\u001e9mKRB\u0011\"!@'\u0003\u0003\u0005\r!!\u0012\u0002\u0007a$\u0003'A\u0003tKR,\u0006\u000f\u0006\u0003\u0003\u0004\t%\u0001cA)\u0003\u0006%\u0019!q\u0001*\u0003\tUs\u0017\u000e\u001e\u0005\u0007\u0005\u00179\u0003\u0019\u0001\"\u0002\u0011Q,7\u000f^%oM>D3a\nB\b!\r\u0019%\u0011C\u0005\u0004\u0005'!%A\u0003\"fM>\u0014X-R1dQ\u0006AA/Z1s\t><h\u000e\u0006\u0002\u0003\u0004!\u001a\u0001Fa\u0007\u0011\u0007\r\u0013i\"C\u0002\u0003 
\u0011\u0013\u0011\"\u00114uKJ,\u0015m\u00195\u0002+Q,7\u000f\u001e*fm\u0016\u00148/Z\"p]:,7\r^5p]\"\u001a\u0011F!\n\u0011\u0007\r\u00139#C\u0002\u0003*\u0011\u0013A\u0001V3ti\u0006iB/Z:u\u0013:LG/[1uKJ+g/\u001a:tK\u000e{gN\\3di&|g\u000eK\u0002+\u0005K\t\u0011\u0003^3ti:{GoQ8oiJ|G\u000e\\3sQ\rY#QE\u0001\u0018i\u0016\u001cHo\u00117vgR,'\u000fT5oW:{GOR8v]\u0012D3\u0001\fB\u0013\u0003)\u0019'/Z1uK2Kgn\u001b\u000b\u0005\u0003\u000b\u0012Y\u0004C\u0004\u0003>5\u0002\rAa\u0010\u0002\u0019M|WO]2f'\u0016\u0014h/\u001a:\u0011\u000bE\u000b\tP!\u0011\u0011\u0007m\u0012\u0019%C\u0002\u0003FQ\u00121bS1gW\u0006\u001cVM\u001d<fe\u0006I\u0012N\\5uS\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o)\u0019\u0011YEa\u0016\u0003\\A!!Q\nB*\u001b\t\u0011yE\u0003\u0003\u0003R\u0005-\u0012\u0001\u0003:fcV,7\u000f^:\n\t\tU#q\n\u0002#\u0013:LG/[1uKJ+g/\u001a:tK\u000e{gN\\3di&|gn\u001d*fgB|gn]3\t\r\tec\u00061\u0001b\u0003\u0019\u0019xnY6fi\"9!Q\f\u0018A\u0002\u0005\u0015\u0013\u0001\u00027j].\f\u0011C]3wKJ\u001cXmQ8o]\u0016\u001cG/[8o)1\u0011\u0019G!\u001b\u0003l\t5$\u0011\u000fB;!\u0011\u0011iE!\u001a\n\t\t\u001d$q\n\u0002\u001a%\u00164XM]:f\u0007>tg.Z2uS>t'+Z:q_:\u001cX\r\u0003\u0004\u0003Z=\u0002\r!\u0019\u0005\b\u0005;z\u0003\u0019AA#\u0011%\u0011yg\fI\u0001\u0002\u0004\t\u0019\"\u0001\u0006t_V\u00148-\u001a%pgRD\u0001Ba\u001d0!\u0003\u0005\r\u0001U\u0001\u000bg>,(oY3Q_J$\b\u0002\u0003B<_A\u0005\t\u0019\u0001)\u0002#%t\u0017\u000e^5bi\u0016\u0014V-];fgRLE-A\u000esKZ,'o]3D_:tWm\u0019;j_:$C-\u001a4bk2$HeM\u0001\u001ce\u00164XM]:f\u0007>tg.Z2uS>tG\u0005Z3gCVdG\u000f\n\u001b\u00027I,g/\u001a:tK\u000e{gN\\3di&|g\u000e\n3fM\u0006,H\u000e\u001e\u00136\u0001")
public class ReverseConnectionRequestTest
extends BaseRequestTest {
    // Lazily created LinkInfo companion-module instance; assigned inside LinkInfo$lzycompute$1().
    private volatile ReverseConnectionRequestTest$LinkInfo$ LinkInfo$module;
    // TestInfo captured in setUp(); reused by testInitiateReverseConnection() to set up a second cluster.
    private TestInfo _testInfo;
    // Value returned by brokerCount(); assigned 2 in the constructor.
    private final int brokerCount;
    // Sockets opened by the tests; tearDown() closes every entry.
    private final Buffer<Socket> sockets = (Buffer)Buffer$.MODULE$.empty();
    // Name of the cluster link created by createLink(); assigned "testLink1" in the constructor.
    private final String linkName;

    /**
     * Returns the {@code LinkInfo} companion-module instance, creating it on first access.
     *
     * @return the module instance stored in {@code LinkInfo$module}
     */
    public ReverseConnectionRequestTest$LinkInfo$ LinkInfo() {
        ReverseConnectionRequestTest$LinkInfo$ existing = this.LinkInfo$module;
        if (existing != null) {
            return existing;
        }
        // Not created yet: take the synchronized slow path, then re-read the field.
        this.LinkInfo$lzycompute$1();
        return this.LinkInfo$module;
    }

    /**
     * Returns the number of brokers for this test, as assigned in the constructor.
     */
    @Override
    public int brokerCount() {
        final int configuredBrokers = this.brokerCount;
        return configuredBrokers;
    }

    /**
     * Returns the buffer of sockets opened by the tests; {@code tearDown()} closes its entries.
     */
    private Buffer<Socket> sockets() {
        final Buffer<Socket> openSockets = this.sockets;
        return openSockets;
    }

    /**
     * Returns the cluster link name used by this test, as assigned in the constructor.
     */
    private String linkName() {
        final String name = this.linkName;
        return name;
    }

    /**
     * Remembers the JUnit {@link TestInfo} for later use and then delegates to the parent set-up.
     *
     * @param testInfo the test metadata supplied by JUnit
     */
    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        // Stored first so it is available to tests that set up an additional cluster.
        _testInfo = testInfo;
        super.setUp(testInfo);
    }

    /**
     * Closes every socket registered in {@code sockets()} and then runs the parent tear-down.
     *
     * <p>The parent tear-down is invoked from a {@code finally} block so that it still runs
     * when closing one of the sockets throws; the exception from the close still propagates.
     */
    @Override
    @AfterEach
    public void tearDown() {
        try {
            this.sockets().foreach((Function1 & Serializable)x$2 -> {
                x$2.close();
                return BoxedUnit.UNIT;
            });
        }
        finally {
            // Always release the parent's resources, even if a socket failed to close.
            super.tearDown();
        }
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testReverseConnection() {
        LinkInfo link = this.createLink((Option<KafkaServer>)None$.MODULE$);
        KafkaServer controllerServer = (KafkaServer)((IterableOps)this.servers().filter((Function1 & Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testReverseConnection$1(x$3)))).head();
        ClusterLinkInboundConnectionManager connectionManager = (ClusterLinkInboundConnectionManager)controllerServer.clusterLinkManager().connectionManager(link.id()).get();
        Socket socket = this.connect(controllerServer.socketServer(), this.connect$default$2());
        this.sockets().$plus$eq((Object)socket);
        String sourceHost = "127.0.0.1";
        int sourcePort = socket.getLocalPort();
        ReverseConnectionResponse response = this.reverseConnection(socket, link, sourceHost, sourcePort, -1);
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        NetworkClient networkClient = (NetworkClient)connectionManager.reverseConnectionClient().get();
        String channelId = Integer.toString(link.remoteBrokerId());
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!networkClient.hasInFlightRequests(channelId)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Channel not added to remote client");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        Selector selector = (Selector)TestUtils.fieldValue((Object)networkClient, NetworkClient.class, (String)"selector");
        long l2 = 100L;
        long waitUntilTrue_waitTimeMs2 = 15000L;
        long waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!selector.isChannelReady(channelId)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs2) {
                Assertions.fail((String)"Channel not added to remote client");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs2), (long)waitUntilTrue_pause));
        }
        KafkaChannel channel = selector.channel(channelId);
        Assertions.assertNotNull((Object)channel);
        Assertions.assertEquals((Object)channel.socketChannel().getRemoteAddress(), (Object)socket.getLocalSocketAddress());
        Assertions.assertEquals((Object)channel.socketChannel().getLocalAddress(), (Object)socket.getRemoteSocketAddress());
    }

    /**
     * Runs the initiate-reverse-connection scenario against a second, single-broker cluster.
     *
     * <p>The extra cluster is set up with the {@link TestInfo} captured in {@code setUp} and is
     * always torn down afterwards, whether or not the scenario body fails.
     */
    @Test
    public void testInitiateReverseConnection() {
        TestInfo capturedTestInfo = this._testInfo;
        IntegrationTestHarness sourceCluster = new IntegrationTestHarness(){

            public int brokerCount() {
                return 1;
            }
        };
        sourceCluster.setUp(capturedTestInfo);
        try {
            ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$1(this, sourceCluster);
        }
        finally {
            sourceCluster.tearDown();
        }
    }

    /**
     * Verifies that a server whose controller is not active answers both the
     * InitiateReverseConnections and the ReverseConnection request with {@code NOT_CONTROLLER}.
     */
    @Test
    public void testNotController() {
        LinkInfo link = this.createLink((Option<KafkaServer>)None$.MODULE$);
        // First server for which kafkaController().isActive() is false.
        KafkaServer nonController = (KafkaServer)((IterableOps)this.servers().filterNot((Function1 & Serializable)x$7 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testNotController$1(x$7)))).head();
        Socket socket = this.connect(nonController.socketServer(), this.connect$default$2());
        this.sockets().$plus$eq((Object)socket);

        InitiateReverseConnectionsResponse initiateResponse = this.initiateReverseConnection(socket, link);
        Assertions.assertEquals(Collections.singletonMap(Errors.NOT_CONTROLLER, BoxesRunTime.boxToInteger((int)1)), (Object)initiateResponse.errorCounts());

        int localPort = socket.getLocalPort();
        ReverseConnectionResponse reverseResponse = this.reverseConnection(socket, link, "127.0.0.1", localPort, -1);
        Assertions.assertEquals((Object)Errors.NOT_CONTROLLER, (Object)reverseResponse.error());
    }

    /**
     * Verifies that requests naming a link that was never created are rejected with
     * {@code CLUSTER_LINK_NOT_FOUND} by the server whose controller is active.
     */
    @Test
    public void testClusterLinkNotFound() {
        KafkaServer activeController = (KafkaServer)this.servers().find((Function1 & Serializable)x$8 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testClusterLinkNotFound$1(x$8))).get();
        Socket socket = this.connect(activeController.socketServer(), this.connect$default$2());
        this.sockets().$plus$eq((Object)socket);

        // A link description with a random id that was never registered with the cluster.
        LinkInfo unknownLink = new LinkInfo(this, "nonexistent", Uuid.randomUuid(), "somehost", 1234);

        InitiateReverseConnectionsResponse initiateResponse = this.initiateReverseConnection(socket, unknownLink);
        Assertions.assertEquals(Collections.singletonMap(Errors.CLUSTER_LINK_NOT_FOUND, BoxesRunTime.boxToInteger((int)1)), (Object)initiateResponse.errorCounts());

        ReverseConnectionResponse reverseResponse = this.reverseConnection(socket, unknownLink, "localhost", 0, -1);
        Assertions.assertEquals((Object)Errors.CLUSTER_LINK_NOT_FOUND, (Object)reverseResponse.error());
    }

    /*
     * WARNING - void declaration
     */
    private LinkInfo createLink(Option<KafkaServer> sourceServer) {
        String sourceClusterId = (String)sourceServer.map((Function1 & Serializable)x$9 -> x$9.clusterId()).getOrElse((Function0 & Serializable)() -> "sourceCluster1");
        int sourceBrokerId = BoxesRunTime.unboxToInt((Object)sourceServer.map((Function1 & Serializable)x$10 -> BoxesRunTime.boxToInteger((int)ReverseConnectionRequestTest.$anonfun$createLink$3(x$10))).getOrElse((Function0)(JFunction0.mcI.sp & Serializable)() -> 12));
        int sourcePort = BoxesRunTime.unboxToInt((Object)sourceServer.map((Function1 & Serializable)x$11 -> BoxesRunTime.boxToInteger((int)ReverseConnectionRequestTest.$anonfun$createLink$5(this, x$11))).getOrElse((Function0)(JFunction0.mcI.sp & Serializable)() -> 1234));
        scala.collection.immutable.Map linkConfigs = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"bootstrap.servers"), (Object)new StringBuilder(10).append("localhost:").append(sourcePort).toString()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ConnectionModeProp()), (Object)ConnectionMode.Inbound$.MODULE$.name())}));
        NewClusterLink newClusterLink = new NewClusterLink(this.linkName(), sourceClusterId, CollectionConverters$.MODULE$.MapHasAsJava((Map)linkConfigs).asJava());
        CreateClusterLinksRequest createRequest = (CreateClusterLinksRequest)new CreateClusterLinksRequest.Builder(Collections.singletonList(newClusterLink), false, false, 10000).build();
        KafkaServer controller = (KafkaServer)((IterableOps)this.servers().filter((Function1 & Serializable)x$12 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$createLink$7(x$12)))).head();
        CreateClusterLinksResponseData.EntryData response = (CreateClusterLinksResponseData.EntryData)((CreateClusterLinksResponse)this.connectAndReceive((AbstractRequest)createRequest, controller.socketServer(), this.connectAndReceive$default$3(), ClassTag$.MODULE$.apply(CreateClusterLinksResponse.class), NotNothing$.MODULE$.notNothingEvidence($less$colon$less$.MODULE$.refl()))).data().entries().get(0);
        Assertions.assertEquals((Object)"", (Object)response.errorMessage());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)response.errorCode());
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ReverseConnectionRequestTest.$anonfun$createLink$8(this)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Link not created");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        Uuid linkId = (Uuid)controller.clusterLinkManager().resolveLinkId(this.linkName()).get();
        return new LinkInfo(this, this.linkName(), linkId, sourceClusterId, sourceBrokerId);
    }

    /**
     * Sends an InitiateReverseConnections request for {@code link} over {@code socket} and returns
     * the response.
     *
     * <p>The request carries a single entry targeting the first server's broker id, with the link's
     * remote broker id as source and an initiate request id of 1.
     *
     * @param socket connection to the broker that should handle the request
     * @param link   link whose id, remote cluster id and remote broker id populate the request
     * @return the broker's response
     */
    private InitiateReverseConnectionsResponse initiateReverseConnection(Socket socket, LinkInfo link) {
        InitiateReverseConnectionsRequestData.EntryData entry = new InitiateReverseConnectionsRequestData.EntryData()
            .setTargetBrokerId(((KafkaServer)this.servers().head()).config().brokerId())
            .setSourceBrokerId(link.remoteBrokerId())
            .setInitiateRequestId(1);
        InitiateReverseConnectionsRequestData payload = new InitiateReverseConnectionsRequestData()
            .setClusterLinkId(new Uuid(link.id().getMostSignificantBits(), link.id().getLeastSignificantBits()))
            .setTargetClusterId(((KafkaServer)this.servers().head()).clusterId())
            .setSourceClusterId(link.remoteClusterId())
            .setForwardToBroker(true)
            .setEntries(Collections.singletonList(entry));
        InitiateReverseConnectionsRequest request = (InitiateReverseConnectionsRequest)new InitiateReverseConnectionsRequest.Builder(payload).build();
        return (InitiateReverseConnectionsResponse)this.sendAndReceive((AbstractRequest)request, socket, this.sendAndReceive$default$3(), this.sendAndReceive$default$4(), ClassTag$.MODULE$.apply(InitiateReverseConnectionsResponse.class), NotNothing$.MODULE$.notNothingEvidence($less$colon$less$.MODULE$.refl()));
    }

    /**
     * Sends a ReverseConnection request for {@code link} over {@code socket} and returns the response.
     *
     * @param socket            connection to the broker that should handle the request
     * @param link              link whose id, remote cluster id and remote broker id populate the request
     * @param sourceHost        value for the request's source host
     * @param sourcePort        value for the request's source port
     * @param initiateRequestId value for the request's initiate request id
     * @return the broker's response
     */
    private ReverseConnectionResponse reverseConnection(Socket socket, LinkInfo link, String sourceHost, int sourcePort, int initiateRequestId) {
        ReverseConnectionRequestData payload = new ReverseConnectionRequestData()
            .setClusterLinkId(new Uuid(link.id().getMostSignificantBits(), link.id().getLeastSignificantBits()))
            .setTargetClusterId(((KafkaServer)this.servers().head()).clusterId())
            .setSourceClusterId(link.remoteClusterId())
            .setSourceBrokerId(link.remoteBrokerId())
            .setSourceHost(sourceHost)
            .setSourcePort(sourcePort)
            .setInitiateRequestId(initiateRequestId);
        ReverseConnectionRequest request = (ReverseConnectionRequest)new ReverseConnectionRequest.Builder(payload).build();
        return (ReverseConnectionResponse)this.sendAndReceive((AbstractRequest)request, socket, this.sendAndReceive$default$3(), this.sendAndReceive$default$4(), ClassTag$.MODULE$.apply(ReverseConnectionResponse.class), NotNothing$.MODULE$.notNothingEvidence($less$colon$less$.MODULE$.refl()));
    }

    /**
     * Compiler-generated default for the third parameter ({@code sourceHost}) of {@code reverseConnection}.
     */
    private String reverseConnection$default$3() {
        final String defaultSourceHost = "localhost";
        return defaultSourceHost;
    }

    /**
     * Compiler-generated default for the fourth parameter ({@code sourcePort}) of {@code reverseConnection}.
     */
    private int reverseConnection$default$4() {
        final int defaultSourcePort = 0;
        return defaultSourcePort;
    }

    /**
     * Compiler-generated default for the fifth parameter ({@code initiateRequestId}) of {@code reverseConnection}.
     */
    private int reverseConnection$default$5() {
        final int defaultInitiateRequestId = -1;
        return defaultInitiateRequestId;
    }

    /**
     * Creates the {@code LinkInfo} module instance while holding this object's monitor, unless the
     * field has already been assigned.
     */
    private final void LinkInfo$lzycompute$1() {
        synchronized (this) {
            if (this.LinkInfo$module != null) {
                // Already assigned; nothing to do.
                return;
            }
            this.LinkInfo$module = new ReverseConnectionRequestTest$LinkInfo$(this);
        }
    }

    /** Lambda body: reports whether the given server's controller is active. */
    public static final /* synthetic */ boolean $anonfun$testReverseConnection$1(KafkaServer x$3) {
        final boolean controllerActive = x$3.kafkaController().isActive();
        return controllerActive;
    }

    /** Lambda body: reports whether the client has in-flight requests for the given channel id. */
    public static final /* synthetic */ boolean $anonfun$testReverseConnection$2(NetworkClient networkClient$1, String channelId$1) {
        final boolean hasInFlight = networkClient$1.hasInFlightRequests(channelId$1);
        return hasInFlight;
    }

    /** Lambda body: supplies the failure message for the in-flight-requests wait. */
    public static final /* synthetic */ String $anonfun$testReverseConnection$3() {
        final String failureMessage = "Channel not added to remote client";
        return failureMessage;
    }

    /** Lambda body: reports whether the selector considers the given channel ready. */
    public static final /* synthetic */ boolean $anonfun$testReverseConnection$4(Selector selector$1, String channelId$1) {
        final boolean channelReady = selector$1.isChannelReady(channelId$1);
        return channelReady;
    }

    /** Lambda body: supplies the failure message for the channel-ready wait. */
    public static final /* synthetic */ String $anonfun$testReverseConnection$5() {
        final String failureMessage = "Channel not added to remote client";
        return failureMessage;
    }

    /** Lambda body: reports whether the given server's controller is active. */
    public static final /* synthetic */ boolean $anonfun$testInitiateReverseConnection$2(KafkaServer x$4) {
        final boolean controllerActive = x$4.kafkaController().isActive();
        return controllerActive;
    }

    /**
     * Lambda body: sends an InitiateReverseConnections request over {@code sourceSocket$1} and
     * asserts that the response's error counts are exactly one {@code Errors.NONE}.
     */
    public static final /* synthetic */ void $anonfun$testInitiateReverseConnection$3(ReverseConnectionRequestTest $this, Socket sourceSocket$1, LinkInfo destLink$1) {
        InitiateReverseConnectionsResponse initiateResponse = $this.initiateReverseConnection(sourceSocket$1, destLink$1);
        java.util.Map<Errors, Integer> expectedCounts = Collections.singletonMap(Errors.NONE, BoxesRunTime.boxToInteger((int)1));
        Assertions.assertEquals(expectedCounts, (Object)initiateResponse.errorCounts());
    }

    /** Lambda body: reports whether the given server's controller is active. */
    public static final /* synthetic */ boolean $anonfun$testInitiateReverseConnection$4(KafkaServer x$5) {
        final boolean controllerActive = x$5.kafkaController().isActive();
        return controllerActive;
    }

    /**
     * Lambda body: sends an InitiateReverseConnections request over {@code destSocket$1} and returns
     * the response's error counts.
     */
    public static final /* synthetic */ java.util.Map $anonfun$testInitiateReverseConnection$5(ReverseConnectionRequestTest $this, Socket destSocket$1, LinkInfo destLink$1) {
        InitiateReverseConnectionsResponse initiateResponse = $this.initiateReverseConnection(destSocket$1, destLink$1);
        return initiateResponse.errorCounts();
    }

    /**
     * Lambda body: reports whether the given error counts equal a single {@code Errors.NONE} entry
     * with count 1. A {@code null} argument yields {@code false}.
     */
    public static final /* synthetic */ boolean $anonfun$testInitiateReverseConnection$6(java.util.Map x$6) {
        java.util.Map<Errors, Integer> expectedCounts = Collections.singletonMap(Errors.NONE, BoxesRunTime.boxToInteger((int)1));
        // The expected map is never null, so a null argument can never match it.
        return x$6 != null && ((Object)x$6).equals(expectedCounts);
    }

    /*
     * WARNING - void declaration
     */
    public static final /* synthetic */ void $anonfun$testInitiateReverseConnection$1(ReverseConnectionRequestTest $this, IntegrationTestHarness sourceCluster) {
        void var10_21;
        Tuple2 tuple2;
        KafkaServer sourceController = (KafkaServer)((IterableOps)sourceCluster.servers().filter((Function1 & Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$2(x$4)))).head();
        LinkInfo destLink = $this.createLink((Option<KafkaServer>)new Some((Object)sourceController));
        ClusterLinkData sourceLinkData = new ClusterLinkData($this.linkName(), destLink.id(), (Option)new Some((Object)((KafkaServer)$this.servers().head()).clusterId()), (Option)None$.MODULE$, false);
        Properties sourceLinkProps = new Properties();
        sourceLinkProps.put("bootstrap.servers", new StringBuilder(10).append("localhost:").append(((KafkaServer)$this.servers().head()).socketServer().boundPort($this.listenerName())).toString());
        sourceLinkProps.put(ClusterLinkConfig$.MODULE$.LinkModeProp(), ClusterLinkConfig.LinkMode.SOURCE.name());
        sourceLinkProps.put(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), ConnectionMode.Outbound$.MODULE$.name());
        sourceLinkProps.put(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), "PLAINTEXT");
        ClusterLinkConfig sourceLinkConfig = ClusterLinkConfig$.MODULE$.create((java.util.Map)sourceLinkProps, (Option)None$.MODULE$, true);
        ((ClusterLinkManager)sourceController.clusterLinkManager()).createSourceClusterLink(sourceLinkData, sourceLinkConfig);
        Socket sourceSocket = $this.connect(sourceController.socketServer(), $this.connect$default$2());
        $this.sockets().$plus$eq((Object)sourceSocket);
        long l = 10000L;
        TestUtils$ retry_this = TestUtils$.MODULE$;
        long l2 = 1L;
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$3($this, sourceSocket, destLink);
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    String msgWithLogIdent_msg = new StringBuilder(49).append("Attempt failed, sleeping for ").append(l2).append(", and then retrying.").toString();
                    Object var24_14 = null;
                    retry_this.logger().underlying().info(Logging.msgWithLogIdent$((Logging)retry_this, (String)msgWithLogIdent_msg));
                }
                Thread.sleep(l2);
                l2 += package$.MODULE$.min(l2, 1000L);
                continue;
            }
            break;
        }
        Object var11_9 = null;
        Object var16_13 = null;
        Socket destSocket = $this.connect(((KafkaServer)$this.servers().find((Function1 & Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$4(x$5))).get()).socketServer(), $this.connect$default$2());
        $this.sockets().$plus$eq((Object)sourceSocket);
        long l3 = 100L;
        long computeUntilTrue_waitTime = 15000L;
        long computeUntilTrue_startTime = System.currentTimeMillis();
        while (true) {
            void computeUntilTrue_pause;
            java.util.Map computeUntilTrue_result;
            if (ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$6(computeUntilTrue_result = ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$5($this, destSocket, destLink))) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)true));
                break;
            }
            if (System.currentTimeMillis() > computeUntilTrue_startTime + computeUntilTrue_waitTime) {
                tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)computeUntilTrue_result), (Object)BoxesRunTime.boxToBoolean((boolean)false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue_waitTime), (long)computeUntilTrue_pause));
        }
        Object var23_19 = null;
        Tuple2 tuple22 = tuple2;
        if (tuple22 == null) {
            throw new MatchError(null);
        }
        java.util.Map errorCounts = (java.util.Map)tuple22._1();
        Assertions.assertEquals(Collections.singletonMap(Errors.NONE, BoxesRunTime.boxToInteger((int)1)), (Object)var10_21);
    }

    /** Lambda body: reports whether the given server's controller is active. */
    public static final /* synthetic */ boolean $anonfun$testNotController$1(KafkaServer x$7) {
        final boolean controllerActive = x$7.kafkaController().isActive();
        return controllerActive;
    }

    /** Lambda body: reports whether the given server's controller is active. */
    public static final /* synthetic */ boolean $anonfun$testClusterLinkNotFound$1(KafkaServer x$8) {
        final boolean controllerActive = x$8.kafkaController().isActive();
        return controllerActive;
    }

    /** Lambda body: returns the broker id from the given server's config. */
    public static final /* synthetic */ int $anonfun$createLink$3(KafkaServer x$10) {
        final int brokerId = x$10.config().brokerId();
        return brokerId;
    }

    /** Lambda body: returns the port the given server's socket server bound for this test's listener. */
    public static final /* synthetic */ int $anonfun$createLink$5(ReverseConnectionRequestTest $this, KafkaServer x$11) {
        final int boundPort = x$11.socketServer().boundPort($this.listenerName());
        return boundPort;
    }

    /** Lambda body: reports whether the given server's controller is active. */
    public static final /* synthetic */ boolean $anonfun$createLink$7(KafkaServer x$12) {
        final boolean controllerActive = x$12.kafkaController().isActive();
        return controllerActive;
    }

    /**
     * Compiler-generated lambda body (named after {@code createLink}): checks whether the given
     * broker's cluster link manager resolves the test's link name to a non-empty result.
     *
     * @param $this the enclosing test instance, supplying {@code linkName()}
     * @param x$13 the broker whose cluster link manager is queried
     * @return {@code true} when {@code resolveLinkId(linkName)} is non-empty
     */
    public static final /* synthetic */ boolean $anonfun$createLink$9(ReverseConnectionRequestTest $this, KafkaServer x$13) {
        final boolean linkResolved = x$13.clusterLinkManager().resolveLinkId($this.linkName()).nonEmpty();
        return linkResolved;
    }

    /**
     * Compiler-generated lambda body (named after {@code createLink}): evaluates whether every
     * server of the test satisfies {@code $anonfun$createLink$9}, i.e. resolves the link name.
     *
     * @param $this the enclosing test instance, supplying {@code servers()}
     * @return {@code true} when the per-server check holds for all servers
     */
    public static final /* synthetic */ boolean $anonfun$createLink$8(ReverseConnectionRequestTest $this) {
        return $this.servers().forall((Function1 & Serializable)server -> {
            // Delegate to the per-server predicate, then box the result for the Scala Function1.
            boolean linkResolved = ReverseConnectionRequestTest.$anonfun$createLink$9($this, server);
            return BoxesRunTime.boxToBoolean(linkResolved);
        });
    }

    /**
     * Compiler-generated lambda body (named after {@code createLink}): supplies a fixed
     * message string.
     *
     * @return the literal {@code "Link not created"}
     */
    public static final /* synthetic */ String $anonfun$createLink$10() {
        final String message = "Link not created";
        return message;
    }

    /**
     * Creates the test instance, fixing the link name to {@code "testLink1"} and the broker
     * count to {@code 2}.
     */
    public ReverseConnectionRequestTest() {
        // The two assignments are independent constants, so their order is immaterial.
        this.linkName = "testLink1";
        this.brokerCount = 2;
    }

    /**
     * Adapter that runs {@code $anonfun$testInitiateReverseConnection$1} for its side effects
     * and hands back Scala's unit value so it can be used where an {@code Object} result is
     * required.
     *
     * @param $this the enclosing test instance
     * @param sourceCluster the harness forwarded unchanged to the wrapped lambda body
     * @return always {@code BoxedUnit.UNIT}
     */
    public static final /* synthetic */ Object $anonfun$testInitiateReverseConnection$1$adapted(ReverseConnectionRequestTest $this, IntegrationTestHarness sourceCluster) {
        $anonfun$testInitiateReverseConnection$1($this, sourceCluster);
        final Object unit = BoxedUnit.UNIT;
        return unit;
    }

    /**
     * Adapter that evaluates {@code $anonfun$testInitiateReverseConnection$6} on the given map
     * and boxes the primitive boolean result.
     *
     * @param x$6 the map forwarded unchanged to the wrapped predicate
     * @return the predicate's result, boxed via {@code BoxesRunTime.boxToBoolean}
     */
    public static final /* synthetic */ Object $anonfun$testInitiateReverseConnection$6$adapted(java.util.Map x$6) {
        final boolean outcome = ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$6(x$6);
        return BoxesRunTime.boxToBoolean(outcome);
    }

    /**
     * Immutable four-component product (name, id, remote cluster id, remote broker id) in the
     * shape of a Scala case class. It is a non-static inner class: every instance keeps a
     * reference to its enclosing {@code ReverseConnectionRequestTest}, and two instances are
     * only equal when they share that same enclosing instance.
     */
    public class LinkInfo
    implements Product,
    Serializable {
        private final String name;
        private final Uuid id;
        private final String remoteClusterId;
        private final int remoteBrokerId;
        public final /* synthetic */ ReverseConnectionRequestTest $outer;

        /** Returns the component names, delegating to the {@code Product} default. */
        public Iterator<String> productElementNames() {
            return Product.productElementNames$(this);
        }

        /** Returns the {@code name} component. */
        public String name() {
            return this.name;
        }

        /** Returns the {@code id} component. */
        public Uuid id() {
            return this.id;
        }

        /** Returns the {@code remoteClusterId} component. */
        public String remoteClusterId() {
            return this.remoteClusterId;
        }

        /** Returns the {@code remoteBrokerId} component. */
        public int remoteBrokerId() {
            return this.remoteBrokerId;
        }

        /**
         * Builds a new instance bound to the same enclosing test instance as this one.
         *
         * @param name value for the {@code name} component
         * @param id value for the {@code id} component
         * @param remoteClusterId value for the {@code remoteClusterId} component
         * @param remoteBrokerId value for the {@code remoteBrokerId} component
         * @return the freshly constructed copy
         */
        public LinkInfo copy(String name, Uuid id, String remoteClusterId, int remoteBrokerId) {
            ReverseConnectionRequestTest enclosing = this.kafka$server$ReverseConnectionRequestTest$LinkInfo$$$outer();
            return new LinkInfo(enclosing, name, id, remoteClusterId, remoteBrokerId);
        }

        /** Default for the first {@code copy} argument: the current {@code name}. */
        public String copy$default$1() {
            return this.name();
        }

        /** Default for the second {@code copy} argument: the current {@code id}. */
        public Uuid copy$default$2() {
            return this.id();
        }

        /** Default for the third {@code copy} argument: the current {@code remoteClusterId}. */
        public String copy$default$3() {
            return this.remoteClusterId();
        }

        /** Default for the fourth {@code copy} argument: the current {@code remoteBrokerId}. */
        public int copy$default$4() {
            return this.remoteBrokerId();
        }

        /** Returns the product prefix, {@code "LinkInfo"}. */
        public String productPrefix() {
            return "LinkInfo";
        }

        /** Returns the number of components, which is four. */
        public int productArity() {
            return 4;
        }

        /**
         * Returns the component at the given position (0 = name, 1 = id, 2 = remoteClusterId,
         * 3 = remoteBrokerId, boxed). Any other index is handed to {@code Statics.ioobe}.
         */
        public Object productElement(int x$1) {
            if (x$1 == 0) {
                return this.name();
            }
            if (x$1 == 1) {
                return this.id();
            }
            if (x$1 == 2) {
                return this.remoteClusterId();
            }
            if (x$1 == 3) {
                return BoxesRunTime.boxToInteger(this.remoteBrokerId());
            }
            return Statics.ioobe(x$1);
        }

        /** Returns an iterator over the components via {@code ScalaRunTime}. */
        public Iterator<Object> productIterator() {
            return ScalaRunTime$.MODULE$.typedProductIterator(this);
        }

        /** Reports whether the argument is a {@code LinkInfo}. */
        public boolean canEqual(Object x$1) {
            return x$1 instanceof LinkInfo;
        }

        /**
         * Returns the name of the component at the given position. Any index outside 0..3 is
         * handed to {@code Statics.ioobe}.
         */
        public String productElementName(int x$1) {
            if (x$1 == 0) {
                return "name";
            }
            if (x$1 == 1) {
                return "id";
            }
            if (x$1 == 2) {
                return "remoteClusterId";
            }
            if (x$1 == 3) {
                return "remoteBrokerId";
            }
            return (String)Statics.ioobe(x$1);
        }

        /**
         * Mixes the product prefix and the four components, in declaration order, into a
         * hash finalized with length 4.
         */
        public int hashCode() {
            int acc = -889275714;
            acc = Statics.mix(acc, this.productPrefix().hashCode());
            acc = Statics.mix(acc, Statics.anyHash(this.name()));
            acc = Statics.mix(acc, Statics.anyHash(this.id()));
            acc = Statics.mix(acc, Statics.anyHash(this.remoteClusterId()));
            acc = Statics.mix(acc, this.remoteBrokerId());
            return Statics.finalizeHash(acc, 4);
        }

        /** Renders the product through {@code ScalaRunTime._toString}. */
        public String toString() {
            return ScalaRunTime$.MODULE$._toString(this);
        }

        /**
         * Structural equality: the argument must be a {@code LinkInfo} with the same enclosing
         * instance, equal components, and must itself accept this object via {@code canEqual}.
         */
        public boolean equals(Object x$1) {
            if (this == x$1) {
                return true;
            }
            if (!(x$1 instanceof LinkInfo)) {
                return false;
            }
            LinkInfo that = (LinkInfo)x$1;
            // Instances created by different enclosing test objects never compare equal.
            if (that.kafka$server$ReverseConnectionRequestTest$LinkInfo$$$outer() != this.kafka$server$ReverseConnectionRequestTest$LinkInfo$$$outer()) {
                return false;
            }
            return this.remoteBrokerId() == that.remoteBrokerId()
                && java.util.Objects.equals(this.name(), that.name())
                && java.util.Objects.equals(this.id(), that.id())
                && java.util.Objects.equals(this.remoteClusterId(), that.remoteClusterId())
                && that.canEqual(this);
        }

        /** Returns the enclosing test instance this object was created with. */
        public /* synthetic */ ReverseConnectionRequestTest kafka$server$ReverseConnectionRequestTest$LinkInfo$$$outer() {
            return this.$outer;
        }

        /**
         * Stores the four components and the enclosing instance, then runs the
         * {@code Product} trait initializer.
         *
         * @param $outer the enclosing test instance; a {@code null} value makes the
         *               constructor fail with a {@code NullPointerException}
         * @param name value for the {@code name} component
         * @param id value for the {@code id} component
         * @param remoteClusterId value for the {@code remoteClusterId} component
         * @param remoteBrokerId value for the {@code remoteBrokerId} component
         */
        public LinkInfo(ReverseConnectionRequestTest $outer, String name, Uuid id, String remoteClusterId, int remoteBrokerId) {
            if ($outer == null) {
                // `throw null` raises a NullPointerException; kept verbatim so the exception is unchanged.
                throw null;
            }
            this.$outer = $outer;
            this.name = name;
            this.id = id;
            this.remoteClusterId = remoteClusterId;
            this.remoteBrokerId = remoteBrokerId;
            Product.$init$(this);
        }
    }
}

