/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.net.Socket;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import kafka.api.IntegrationTestHarness;
import kafka.server.BaseRequestTest;
import kafka.server.KafkaBroker;
import kafka.server.KafkaServer;
import kafka.server.ReverseConnectionRequestTest$LinkInfo$;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkDestConnectionManager;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ConnectionMode;
import kafka.server.link.LinkMode;
import kafka.utils.NotNothing$;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.CreateClusterLinksResponseData;
import org.apache.kafka.common.message.InitiateReverseConnectionsRequestData;
import org.apache.kafka.common.message.ReverseConnectionRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.CreateClusterLinksRequest;
import org.apache.kafka.common.requests.CreateClusterLinksResponse;
import org.apache.kafka.common.requests.InitiateReverseConnectionsRequest;
import org.apache.kafka.common.requests.InitiateReverseConnectionsResponse;
import org.apache.kafka.common.requests.ReverseConnectionRequest;
import org.apache.kafka.common.requests.ReverseConnectionResponse;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Product;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t%c\u0001\u0002\u00192\u0001YBQa\u000f\u0001\u0005\u0002qBqA\u0010\u0001C\u0002\u0013\u0005s\b\u0003\u0004G\u0001\u0001\u0006I\u0001\u0011\u0005\b\u000f\u0002\u0011\r\u0011\"\u0003I\u0011\u0019I\u0006\u0001)A\u0005\u0013\"9!\f\u0001b\u0001\n\u0013Y\u0006B\u00022\u0001A\u0003%AL\u0002\u0003d\u0001\u0001#\u0007\u0002\u00038\t\u0005+\u0007I\u0011A8\t\u0011iD!\u0011#Q\u0001\nAD\u0001b\u001f\u0005\u0003\u0016\u0004%\t\u0001 \u0005\n\u0003\u000fA!\u0011#Q\u0001\nuD\u0011\"!\u0003\t\u0005+\u0007I\u0011A8\t\u0013\u0005-\u0001B!E!\u0002\u0013\u0001\b\"CA\u0007\u0011\tU\r\u0011\"\u0001@\u0011%\ty\u0001\u0003B\tB\u0003%\u0001\t\u0003\u0004<\u0011\u0011\u0005\u0011\u0011\u0003\u0005\n\u0003?A\u0011\u0011!C\u0001\u0003CA\u0011\"a\u000b\t#\u0003%\t!!\f\t\u0013\u0005\r\u0003\"%A\u0005\u0002\u0005\u0015\u0003\"CA%\u0011E\u0005I\u0011AA\u0017\u0011%\tY\u0005CI\u0001\n\u0003\ti\u0005\u0003\u0005\u0002R!\t\t\u0011\"\u0011\\\u0011!\t\u0019\u0006CA\u0001\n\u0003y\u0004\"CA+\u0011\u0005\u0005I\u0011AA,\u0011%\t\u0019\u0007CA\u0001\n\u0003\n)\u0007C\u0005\u0002p!\t\t\u0011\"\u0001\u0002r!I\u00111\u0010\u0005\u0002\u0002\u0013\u0005\u0013Q\u0010\u0005\n\u0003\u007fB\u0011\u0011!C!\u0003\u0003C\u0011\"a!\t\u0003\u0003%\t%!\"\b\u0013\u0005%\u0005!!A\t\u0002\u0005-e\u0001C2\u0001\u0003\u0003E\t!!$\t\rm\u0002C\u0011AAN\u0011%\ty\bIA\u0001\n\u000b\n\t\tC\u0005\u0002\u001e\u0002\n\t\u0011\"!\u0002 \"I\u0011\u0011\u0016\u0011\u0002\u0002\u0013\u0005\u00151\u0016\u0005\b\u0003{\u0003A\u0011IA`\u0011\u001d\t\t\u000f\u0001C\u0001\u0003\u007fCq!a;\u0001\t\u0003\ty\fC\u0004\u0002p\u0002!\t!a0\t\u000f\u0005M\b\u0001\"\u0001\u0002@\"9\u0011q\u001f\u0001\u0005\n\u0005e\bb\u0002B\u0004\u0001\u0011%!\u0011\u0002\u0005\b\u0005S\u0001A\u0011\u0002B\u0016\u0011%\u0011\u0019\u0005AI\u0001\n\u0013\ti\u0003C\u0005\u0003F\u0001\t\n\u0011\"\u0003\u0002N!I!q\t\u0001\u0012\u0002\u0013%\u0011Q\n\u0002\u001d%\u00164XM]:f\u0007>tg.Z2uS>t'+Z9vKN$H+Z:u\u0015\t\u00114'\u0001\u0004tKJ4XM\u001d\u0006\u0002i\u0005)1.\u00194lC\u000e\u00011C\u0001\u00018!\tA\u0014(D\u00012\u0013\tQ\u0014GA\bCCN,'+Z9vKN$H+Z:u\u0003\u0019a\u0014N\\5u}Q\tQ\b\u0005\u00029\u0001\u0005Y!M]8lKJ\u001cu.\u001e8u+\u0005\u0001\u0005CA!E\u001b\u0005\u0011%\"A\"\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0015\u0013%aA%oi\u0006a!M]8lKJ\u001cu.\u001e8uA\u000591o\\2lKR\u001cX#A%\u0011\u0007){\u0015+D\u0001L\u0015\taU*A\u0004nkR\f'\r\\3\u000b\u00059\u0013\u0015AC2pY2,7\r^5p]&\u0011\u0001k\u0013\u0002\u0007\u0005V4g-\u001a:\u0011\u0005I;V\"A*\u000b\u0005Q+\u0016a\u00018fi*\ta+\u0001\u0003kCZ\f\u0017B\u0001-T\u0005\u0019\u0019vnY6fi\u0006A1o\\2lKR\u001c\b%\u0001\u0005mS:\\g*Y7f+\u0005a\u0006CA/a\u001b\u0005q&BA0V\u0003\u0011a\u0017M\\4\n\u0005\u0005t&AB*ue&tw-A\u0005mS:\\g*Y7fA\tAA*\u001b8l\u0013:4wn\u0005\u0003\tK\"\\\u0007CA!g\u0013\t9'I\u0001\u0004B]f\u0014VM\u001a\t\u0003\u0003&L!A\u001b\"\u0003\u000fA\u0013x\u000eZ;diB\u0011\u0011\t\\\u0005\u0003[\n\u0013AbU3sS\u0006d\u0017N_1cY\u0016\fAA\\1nKV\t\u0001\u000f\u0005\u0002rq:\u0011!O\u001e\t\u0003g\nk\u0011\u0001\u001e\u0006\u0003kV\na\u0001\u0010:p_Rt\u0014BA<C\u0003\u0019\u0001&/\u001a3fM&\u0011\u0011-\u001f\u0006\u0003o\n\u000bQA\\1nK\u0002\n!!\u001b3\u0016\u0003u\u00042A`A\u0002\u001b\u0005y(bAA\u0001+\u0006!Q\u000f^5m\u0013\r\t)a \u0002\u0005+VKE)A\u0002jI\u0002\nqB]3n_R,7\t\\;ti\u0016\u0014\u0018\nZ\u0001\u0011e\u0016lw\u000e^3DYV\u001cH/\u001a:JI\u0002\naB]3n_R,'I]8lKJLE-A\bsK6|G/\u001a\"s_.,'/\u00133!))\t\u0019\"a\u0006\u0002\u001a\u0005m\u0011Q\u0004\t\u0004\u0003+AQ\"\u0001\u0001\t\u000b9\f\u0002\u0019\u00019\t\u000bm\f\u0002\u0019A?\t\r\u0005%\u0011\u00031\u0001q\u0011\u0019\ti!\u0005a\u0001\u0001\u0006!1m\u001c9z))\t\u0019\"a\t\u0002&\u0005\u001d\u0012\u0011\u0006\u0005\b]J\u0001\n\u00111\u0001q\u0011\u001dY(\u0003%AA\u0002uD\u0001\"!\u0003\u0013!\u0003\u0005\r\u0001\u001d\u0005\t\u0003\u001b\u0011\u0002\u0013!a\u0001\u0001\u0006q1m\u001c9zI\u0011,g-Y;mi\u0012\nTCAA\u0018U\r\u0001\u0018\u0011G\u0016\u0003\u0003g\u0001B!!\u000e\u0002@5\u0011\u0011q\u0007\u0006\u0005\u0003s\tY$A\u0005v]\u000eDWmY6fI*\u0019\u0011Q\b\"\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0002B\u0005]\"!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006q1m\u001c9zI\u0011,g-Y;mi\u0012\u0012TCAA$U\ri\u0018\u0011G\u0001\u000fG>\u0004\u0018\u0010\n3fM\u0006,H\u000e\u001e\u00134\u00039\u0019w\u000e]=%I\u00164\u0017-\u001e7uIQ*\"!a\u0014+\u0007\u0001\u000b\t$A\u0007qe>$Wo\u0019;Qe\u00164\u0017\u000e_\u0001\raJ|G-^2u\u0003JLG/_\u0001\u000faJ|G-^2u\u000b2,W.\u001a8u)\u0011\tI&a\u0018\u0011\u0007\u0005\u000bY&C\u0002\u0002^\t\u00131!\u00118z\u0011!\t\t'GA\u0001\u0002\u0004\u0001\u0015a\u0001=%c\u0005y\u0001O]8ek\u000e$\u0018\n^3sCR|'/\u0006\u0002\u0002hA1\u0011\u0011NA6\u00033j\u0011!T\u0005\u0004\u0003[j%\u0001C%uKJ\fGo\u001c:\u0002\u0011\r\fg.R9vC2$B!a\u001d\u0002zA\u0019\u0011)!\u001e\n\u0007\u0005]$IA\u0004C_>dW-\u00198\t\u0013\u0005\u00054$!AA\u0002\u0005e\u0013\u0001\u00035bg\"\u001cu\u000eZ3\u0015\u0003\u0001\u000b\u0001\u0002^8TiJLgn\u001a\u000b\u00029\u00061Q-];bYN$B!a\u001d\u0002\b\"I\u0011\u0011\r\u0010\u0002\u0002\u0003\u0007\u0011\u0011L\u0001\t\u0019&t7.\u00138g_B\u0019\u0011Q\u0003\u0011\u0014\t\u0001\nyi\u001b\t\u000b\u0003#\u000b9\n]?q\u0001\u0006MQBAAJ\u0015\r\t)JQ\u0001\beVtG/[7f\u0013\u0011\tI*a%\u0003#\u0005\u00137\u000f\u001e:bGR4UO\\2uS>tG\u0007\u0006\u0002\u0002\f\u0006)\u0011\r\u001d9msRQ\u00111CAQ\u0003G\u000b)+a*\t\u000b9\u001c\u0003\u0019\u00019\t\u000bm\u001c\u0003\u0019A?\t\r\u0005%1\u00051\u0001q\u0011\u0019\tia\ta\u0001\u0001\u00069QO\\1qa2LH\u0003BAW\u0003s\u0003R!QAX\u0003gK1!!-C\u0005\u0019y\u0005\u000f^5p]B9\u0011)!.q{B\u0004\u0015bAA\\\u0005\n1A+\u001e9mKRB\u0011\"a/%\u0003\u0003\u0005\r!a\u0005\u0002\u0007a$\u0003'\u0001\u0005uK\u0006\u0014Hi\\<o)\t\t\t\rE\u0002B\u0003\u0007L1!!2C\u0005\u0011)f.\u001b;)\u0007\u0015\nI\r\u0005\u0003\u0002L\u0006uWBAAg\u0015\u0011\ty-!5\u0002\u0007\u0005\u0004\u0018N\u0003\u0003\u0002T\u0006U\u0017a\u00026va&$XM\u001d\u0006\u0005\u0003/\fI.A\u0003kk:LGO\u0003\u0002\u0002\\\u0006\u0019qN]4\n\t\u0005}\u0017Q\u001a\u0002\n\u0003\u001a$XM]#bG\"\fQ\u0003^3tiJ+g/\u001a:tK\u000e{gN\\3di&|g\u000eK\u0002'\u0003K\u0004B!a3\u0002h&!\u0011\u0011^Ag\u0005\u0011!Vm\u001d;\u0002;Q,7\u000f^%oSRL\u0017\r^3SKZ,'o]3D_:tWm\u0019;j_:D3aJAs\u0003E!Xm\u001d;O_R\u001cuN\u001c;s_2dWM\u001d\u0015\u0004Q\u0005\u0015\u0018a\u0006;fgR\u001cE.^:uKJd\u0015N\\6O_R4u.\u001e8eQ\rI\u0013Q]\u0001\u000bGJ,\u0017\r^3MS:\\G\u0003BA\n\u0003wDq!!@+\u0001\u0004\ty0\u0001\u0007t_V\u00148-Z*feZ,'\u000fE\u0003B\u0003_\u0013\t\u0001E\u00029\u0005\u0007I1A!\u00022\u0005-Y\u0015MZ6b'\u0016\u0014h/\u001a:\u00023%t\u0017\u000e^5bi\u0016\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\u001c\u000b\u0007\u0005\u0017\u0011\tC!\n\u0011\t\t5!QD\u0007\u0003\u0005\u001fQAA!\u0005\u0003\u0014\u0005A!/Z9vKN$8O\u0003\u0003\u0003\u0016\t]\u0011AB2p[6|gNC\u00025\u00053QAAa\u0007\u0002Z\u00061\u0011\r]1dQ\u0016LAAa\b\u0003\u0010\t\u0011\u0013J\\5uS\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8ogJ+7\u000f]8og\u0016DaAa\t,\u0001\u0004\t\u0016AB:pG.,G\u000fC\u0004\u0003(-\u0002\r!a\u0005\u0002\t1Lgn[\u0001\u0012e\u00164XM]:f\u0007>tg.Z2uS>tG\u0003\u0004B\u0017\u0005g\u0011)Da\u000e\u0003<\t}\u0002\u0003\u0002B\u0007\u0005_IAA!\r\u0003\u0010\tI\"+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8SKN\u0004xN\\:f\u0011\u0019\u0011\u0019\u0003\fa\u0001#\"9!q\u0005\u0017A\u0002\u0005M\u0001\u0002\u0003B\u001dYA\u0005\t\u0019\u00019\u0002\u0015M|WO]2f\u0011>\u001cH\u000f\u0003\u0005\u0003>1\u0002\n\u00111\u0001A\u0003)\u0019x.\u001e:dKB{'\u000f\u001e\u0005\t\u0005\u0003b\u0003\u0013!a\u0001\u0001\u0006\t\u0012N\\5uS\u0006$XMU3rk\u0016\u001cH/\u00133\u00027I,g/\u001a:tK\u000e{gN\\3di&|g\u000e\n3fM\u0006,H\u000e\u001e\u00134\u0003m\u0011XM^3sg\u0016\u001cuN\u001c8fGRLwN\u001c\u0013eK\u001a\fW\u000f\u001c;%i\u0005Y\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8%I\u00164\u0017-\u001e7uIU\u0002")
public class ReverseConnectionRequestTest
extends BaseRequestTest {
    private volatile ReverseConnectionRequestTest$LinkInfo$ LinkInfo$module;
    private final int brokerCount;
    private final Buffer<Socket> sockets = (Buffer)Buffer$.MODULE$.empty();
    private final String linkName;

    public ReverseConnectionRequestTest$LinkInfo$ LinkInfo() {
        if (this.LinkInfo$module == null) {
            this.LinkInfo$lzycompute$1();
        }
        return this.LinkInfo$module;
    }

    @Override
    public int brokerCount() {
        return this.brokerCount;
    }

    private Buffer<Socket> sockets() {
        return this.sockets;
    }

    private String linkName() {
        return this.linkName;
    }

    @Override
    @AfterEach
    public void tearDown() {
        this.sockets().foreach((Function1 & Serializable & scala.Serializable)x$1 -> {
            x$1.close();
            return BoxedUnit.UNIT;
        });
        super.tearDown();
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testReverseConnection() {
        LinkInfo link = this.createLink((Option<KafkaServer>)None$.MODULE$);
        ClusterLinkDestConnectionManager connectionManager = (ClusterLinkDestConnectionManager)((KafkaBroker)((IterableLike)this.servers().filter((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testReverseConnection$1(x$2)))).head()).clusterLinkManager().connectionManager(link.id()).get();
        Socket socket = this.connect(((KafkaServer)this.servers().head()).socketServer(), this.connect$default$2());
        this.sockets().$plus$eq((Object)socket);
        String sourceHost = "127.0.0.1";
        int sourcePort = socket.getLocalPort();
        ReverseConnectionResponse response = this.reverseConnection(socket, link, sourceHost, sourcePort, this.reverseConnection$default$5());
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        NetworkClient networkClient = (NetworkClient)connectionManager.reverseConnectionClient().get();
        String channelId = Integer.toString(link.remoteBrokerId());
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!networkClient.hasInFlightRequests(channelId)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ReverseConnectionRequestTest.$anonfun$testReverseConnection$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        Selector selector = (Selector)TestUtils.fieldValue((Object)networkClient, NetworkClient.class, (String)"selector");
        long l3 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l4 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!selector.isChannelReady(channelId)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ReverseConnectionRequestTest.$anonfun$testReverseConnection$5());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        KafkaChannel channel = selector.channel(channelId);
        Assertions.assertNotNull((Object)channel);
        Assertions.assertEquals((Object)channel.socketChannel().getRemoteAddress(), (Object)socket.getLocalSocketAddress());
        Assertions.assertEquals((Object)channel.socketChannel().getLocalAddress(), (Object)socket.getRemoteSocketAddress());
    }

    @Test
    public void testInitiateReverseConnection() {
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        IntegrationTestHarness runWithRemoteCluster_remoteCluster = new IntegrationTestHarness(){

            public int brokerCount() {
                return 1;
            }
        };
        runWithRemoteCluster_remoteCluster.setUp();
        try {
            IntegrationTestHarness integrationTestHarness = runWithRemoteCluster_remoteCluster;
            ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$1(this, integrationTestHarness);
        }
        finally {
            runWithRemoteCluster_remoteCluster.tearDown();
        }
    }

    @Test
    public void testNotController() {
        LinkInfo link = this.createLink((Option<KafkaServer>)None$.MODULE$);
        Socket socket = this.connect(((KafkaServer)((IterableLike)this.servers().filterNot((Function1 & Serializable & scala.Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testNotController$1(x$5)))).head()).socketServer(), this.connect$default$2());
        this.sockets().$plus$eq((Object)socket);
        InitiateReverseConnectionsResponse response = this.initiateReverseConnection(socket, link);
        Assertions.assertEquals(Collections.singletonMap(Errors.NOT_CONTROLLER, BoxesRunTime.boxToInteger((int)1)), (Object)response.errorCounts());
    }

    @Test
    public void testClusterLinkNotFound() {
        Socket socket = this.connect(((KafkaServer)this.servers().find((Function1 & Serializable & scala.Serializable)x$6 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testClusterLinkNotFound$1(x$6))).get()).socketServer(), this.connect$default$2());
        this.sockets().$plus$eq((Object)socket);
        LinkInfo link = new LinkInfo(this, "nonexistent", UUID.randomUUID(), "somehost", 1234);
        InitiateReverseConnectionsResponse initiateResponse = this.initiateReverseConnection(socket, link);
        Assertions.assertEquals(Collections.singletonMap(Errors.CLUSTER_LINK_NOT_FOUND, BoxesRunTime.boxToInteger((int)1)), (Object)initiateResponse.errorCounts());
        ReverseConnectionResponse reverseResponse = this.reverseConnection(socket, link, this.reverseConnection$default$3(), this.reverseConnection$default$4(), this.reverseConnection$default$5());
        Assertions.assertEquals((Object)Errors.CLUSTER_LINK_NOT_FOUND, (Object)reverseResponse.error());
    }

    /*
     * WARNING - void declaration
     */
    private LinkInfo createLink(Option<KafkaServer> sourceServer) {
        String sourceClusterId = (String)sourceServer.map((Function1 & Serializable & scala.Serializable)x$7 -> x$7.clusterId()).getOrElse((Function0 & Serializable & scala.Serializable)() -> "sourceCluster1");
        int sourceBrokerId = BoxesRunTime.unboxToInt((Object)sourceServer.map((Function1 & Serializable & scala.Serializable)x$8 -> BoxesRunTime.boxToInteger((int)ReverseConnectionRequestTest.$anonfun$createLink$3(x$8))).getOrElse((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> 12));
        int sourcePort = BoxesRunTime.unboxToInt((Object)sourceServer.map((Function1 & Serializable & scala.Serializable)x$9 -> BoxesRunTime.boxToInteger((int)ReverseConnectionRequestTest.$anonfun$createLink$5(this, x$9))).getOrElse((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> 1234));
        scala.collection.immutable.Map linkConfigs = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"bootstrap.servers"), (Object)new StringBuilder(10).append("localhost:").append(sourcePort).toString()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ConnectionModeProp()), (Object)ConnectionMode.Inbound$.MODULE$.name())}));
        NewClusterLink newClusterLink = new NewClusterLink(this.linkName(), sourceClusterId, (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)linkConfigs).asJava());
        CreateClusterLinksRequest createRequest = (CreateClusterLinksRequest)new CreateClusterLinksRequest.Builder(Collections.singletonList(newClusterLink), false, false, 10000).build();
        CreateClusterLinksResponseData.EntryData response = (CreateClusterLinksResponseData.EntryData)((CreateClusterLinksResponse)this.connectAndReceive((AbstractRequest)createRequest, ((KafkaServer)this.servers().head()).socketServer(), this.connectAndReceive$default$3(), ClassTag$.MODULE$.apply(CreateClusterLinksResponse.class), NotNothing$.MODULE$.notNothingEvidence(Predef.$eq$colon$eq$.MODULE$.tpEquals()))).data().entries().get(0);
        Assertions.assertNull((Object)response.errorMessage());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)response.errorCode());
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ReverseConnectionRequestTest.$anonfun$createLink$7(this)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)ReverseConnectionRequestTest.$anonfun$createLink$9());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        UUID linkId = (UUID)((KafkaBroker)this.servers().head()).clusterLinkManager().resolveLinkId(this.linkName()).get();
        return new LinkInfo(this, this.linkName(), linkId, sourceClusterId, sourceBrokerId);
    }

    private InitiateReverseConnectionsResponse initiateReverseConnection(Socket socket, LinkInfo link) {
        InitiateReverseConnectionsRequestData.EntryData connData = new InitiateReverseConnectionsRequestData.EntryData().setTargetBrokerId(((KafkaServer)this.servers().head()).config().brokerId()).setSourceBrokerId(link.remoteBrokerId()).setInitiateRequestId(1);
        InitiateReverseConnectionsRequestData requestData = new InitiateReverseConnectionsRequestData().setClusterLinkId(new Uuid(link.id().getMostSignificantBits(), link.id().getLeastSignificantBits())).setTargetClusterId(((KafkaServer)this.servers().head()).clusterId()).setSourceClusterId(link.remoteClusterId()).setForwardToBroker(true).setEntries(Collections.singletonList(connData));
        InitiateReverseConnectionsRequest reverseRequest = (InitiateReverseConnectionsRequest)new InitiateReverseConnectionsRequest.Builder(requestData).build();
        return (InitiateReverseConnectionsResponse)this.sendAndReceive((AbstractRequest)reverseRequest, socket, this.sendAndReceive$default$3(), this.sendAndReceive$default$4(), ClassTag$.MODULE$.apply(InitiateReverseConnectionsResponse.class), NotNothing$.MODULE$.notNothingEvidence(Predef.$eq$colon$eq$.MODULE$.tpEquals()));
    }

    private ReverseConnectionResponse reverseConnection(Socket socket, LinkInfo link, String sourceHost, int sourcePort, int initiateRequestId) {
        ReverseConnectionRequestData reverseRequestData = new ReverseConnectionRequestData().setClusterLinkId(new Uuid(link.id().getMostSignificantBits(), link.id().getLeastSignificantBits())).setTargetClusterId(((KafkaServer)this.servers().head()).clusterId()).setSourceClusterId(link.remoteClusterId()).setSourceBrokerId(link.remoteBrokerId()).setSourceHost(sourceHost).setSourcePort(sourcePort).setInitiateRequestId(initiateRequestId);
        ReverseConnectionRequest reverseRequest = (ReverseConnectionRequest)new ReverseConnectionRequest.Builder(reverseRequestData).build();
        return (ReverseConnectionResponse)this.sendAndReceive((AbstractRequest)reverseRequest, socket, this.sendAndReceive$default$3(), this.sendAndReceive$default$4(), ClassTag$.MODULE$.apply(ReverseConnectionResponse.class), NotNothing$.MODULE$.notNothingEvidence(Predef.$eq$colon$eq$.MODULE$.tpEquals()));
    }

    private String reverseConnection$default$3() {
        return "localhost";
    }

    private int reverseConnection$default$4() {
        return 0;
    }

    private int reverseConnection$default$5() {
        return -1;
    }

    private final void LinkInfo$lzycompute$1() {
        synchronized (this) {
            if (this.LinkInfo$module == null) {
                this.LinkInfo$module = new ReverseConnectionRequestTest$LinkInfo$(this);
            }
            return;
        }
    }

    public static final /* synthetic */ boolean $anonfun$testReverseConnection$1(KafkaServer x$2) {
        return x$2.kafkaController().isActive();
    }

    public static final /* synthetic */ String $anonfun$testReverseConnection$3() {
        return "Channel not added to remote client";
    }

    public static final /* synthetic */ String $anonfun$testReverseConnection$5() {
        return "Channel not added to remote client";
    }

    public static final /* synthetic */ boolean $anonfun$testInitiateReverseConnection$2(KafkaServer x$3) {
        return x$3.kafkaController().isActive();
    }

    public static final /* synthetic */ void $anonfun$testInitiateReverseConnection$3(ReverseConnectionRequestTest $this, Socket sourceSocket$1, LinkInfo destLink$1) {
        InitiateReverseConnectionsResponse sourceResponse = $this.initiateReverseConnection(sourceSocket$1, destLink$1);
        Assertions.assertEquals(Collections.singletonMap(Errors.NONE, BoxesRunTime.boxToInteger((int)1)), (Object)sourceResponse.errorCounts());
    }

    public static final /* synthetic */ boolean $anonfun$testInitiateReverseConnection$4(KafkaServer x$4) {
        return x$4.kafkaController().isActive();
    }

    /*
     * WARNING - void declaration
     */
    public static final /* synthetic */ void $anonfun$testInitiateReverseConnection$1(ReverseConnectionRequestTest $this, IntegrationTestHarness sourceCluster) {
        LinkInfo destLink = $this.createLink((Option<KafkaServer>)new Some(sourceCluster.servers().head()));
        ClusterLinkData sourceLinkData = new ClusterLinkData($this.linkName(), destLink.id(), (Option)new Some((Object)((KafkaServer)$this.servers().head()).clusterId()), (Option)None$.MODULE$, false);
        Properties sourceLinkProps = new Properties();
        sourceLinkProps.put("bootstrap.servers", new StringBuilder(10).append("localhost:").append(((KafkaServer)$this.servers().head()).socketServer().boundPort($this.listenerName())).toString());
        sourceLinkProps.put(ClusterLinkConfig$.MODULE$.LinkModeProp(), LinkMode.Source$.MODULE$.name());
        sourceLinkProps.put(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), ConnectionMode.Outbound$.MODULE$.name());
        sourceLinkProps.put(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), "PLAINTEXT");
        ClusterLinkConfig sourceLinkConfig = ClusterLinkConfig$.MODULE$.create((java.util.Map)sourceLinkProps);
        ((ClusterLinkManager)((KafkaBroker)sourceCluster.servers().head()).clusterLinkManager()).createSourceClusterLink(sourceLinkData, sourceLinkConfig);
        Socket sourceSocket = $this.connect(((KafkaServer)sourceCluster.servers().find((Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$2(x$3))).get()).socketServer(), $this.connect$default$2());
        $this.sockets().$plus$eq((Object)sourceSocket);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long l = 10000L;
        if (testUtils$ == null) {
            throw null;
        }
        TestUtils$ retry_this = testUtils$;
        LongRef retry_wait = LongRef.create((long)1L);
        long retry_startTime = System.currentTimeMillis();
        while (true) {
            try {
                ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$3($this, sourceSocket, destLink);
            }
            catch (AssertionError retry_e) {
                void retry_maxWaitMs;
                if (System.currentTimeMillis() - retry_startTime > retry_maxWaitMs) {
                    throw retry_e;
                }
                if (retry_this.logger().underlying().isInfoEnabled()) {
                    retry_this.logger().underlying().info(retry_this.msgWithLogIdent(TestUtils$.$anonfun$retry$1(retry_wait)));
                }
                Thread.sleep(retry_wait.elem);
                retry_wait.elem += package$.MODULE$.min(retry_wait.elem, 1000L);
                continue;
            }
            break;
        }
        Socket destSocket = $this.connect(((KafkaServer)$this.servers().find((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$4(x$4))).get()).socketServer(), $this.connect$default$2());
        $this.sockets().$plus$eq((Object)sourceSocket);
        InitiateReverseConnectionsResponse destResponse = $this.initiateReverseConnection(destSocket, destLink);
        Assertions.assertEquals(Collections.singletonMap(Errors.NONE, BoxesRunTime.boxToInteger((int)1)), (Object)destResponse.errorCounts());
    }

    public static final /* synthetic */ boolean $anonfun$testNotController$1(KafkaServer x$5) {
        return x$5.kafkaController().isActive();
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinkNotFound$1(KafkaServer x$6) {
        return x$6.kafkaController().isActive();
    }

    public static final /* synthetic */ int $anonfun$createLink$3(KafkaServer x$8) {
        return x$8.config().brokerId();
    }

    public static final /* synthetic */ int $anonfun$createLink$5(ReverseConnectionRequestTest $this, KafkaServer x$9) {
        return x$9.socketServer().boundPort($this.listenerName());
    }

    public static final /* synthetic */ boolean $anonfun$createLink$8(ReverseConnectionRequestTest $this, KafkaServer x$10) {
        return x$10.clusterLinkManager().resolveLinkId($this.linkName()).nonEmpty();
    }

    public static final /* synthetic */ boolean $anonfun$createLink$7(ReverseConnectionRequestTest $this) {
        return $this.servers().forall((Function1 & Serializable & scala.Serializable)x$10 -> BoxesRunTime.boxToBoolean((boolean)ReverseConnectionRequestTest.$anonfun$createLink$8($this, x$10)));
    }

    public static final /* synthetic */ String $anonfun$createLink$9() {
        return "Link not created";
    }

    public ReverseConnectionRequestTest() {
        this.brokerCount = 2;
        this.linkName = "testLink1";
    }

    public static final /* synthetic */ Object $anonfun$testInitiateReverseConnection$1$adapted(ReverseConnectionRequestTest $this, IntegrationTestHarness sourceCluster) {
        ReverseConnectionRequestTest.$anonfun$testInitiateReverseConnection$1($this, sourceCluster);
        return BoxedUnit.UNIT;
    }

    public class LinkInfo
    implements Product,
    scala.Serializable {
        private final String name;
        private final UUID id;
        private final String remoteClusterId;
        private final int remoteBrokerId;
        public final /* synthetic */ ReverseConnectionRequestTest $outer;

        public String name() {
            return this.name;
        }

        public UUID id() {
            return this.id;
        }

        public String remoteClusterId() {
            return this.remoteClusterId;
        }

        public int remoteBrokerId() {
            return this.remoteBrokerId;
        }

        public LinkInfo copy(String name, UUID id, String remoteClusterId, int remoteBrokerId) {
            return new LinkInfo(this.kafka$server$ReverseConnectionRequestTest$LinkInfo$$$outer(), name, id, remoteClusterId, remoteBrokerId);
        }

        public String copy$default$1() {
            return this.name();
        }

        public UUID copy$default$2() {
            return this.id();
        }

        public String copy$default$3() {
            return this.remoteClusterId();
        }

        public int copy$default$4() {
            return this.remoteBrokerId();
        }

        public String productPrefix() {
            return "LinkInfo";
        }

        public int productArity() {
            return 4;
        }

        public Object productElement(int x$1) {
            switch (x$1) {
                case 0: {
                    return this.name();
                }
                case 1: {
                    return this.id();
                }
                case 2: {
                    return this.remoteClusterId();
                }
                case 3: {
                    return BoxesRunTime.boxToInteger((int)this.remoteBrokerId());
                }
            }
            throw new IndexOutOfBoundsException(Integer.toString(x$1));
        }

        public Iterator<Object> productIterator() {
            return ScalaRunTime$.MODULE$.typedProductIterator((Product)this);
        }

        public boolean canEqual(Object x$1) {
            return x$1 instanceof LinkInfo;
        }

        public int hashCode() {
            return Statics.finalizeHash((int)Statics.mix((int)Statics.mix((int)Statics.mix((int)Statics.mix((int)-889275714, (int)Statics.anyHash((Object)this.name())), (int)Statics.anyHash((Object)this.id())), (int)Statics.anyHash((Object)this.remoteClusterId())), (int)this.remoteBrokerId()), (int)4);
        }

        public String toString() {
            return ScalaRunTime$.MODULE$._toString((Product)this);
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public boolean equals(Object x$1) {
            if (this == x$1) return true;
            if (!(x$1 instanceof LinkInfo)) return false;
            if (((LinkInfo)x$1).kafka$server$ReverseConnectionRequestTest$LinkInfo$$$outer() != this.kafka$server$ReverseConnectionRequestTest$LinkInfo$$$outer()) return false;
            boolean bl = true;
            if (!bl) return false;
            LinkInfo linkInfo = (LinkInfo)x$1;
            String string = this.name();
            String string2 = linkInfo.name();
            if (string == null) {
                if (string2 != null) {
                    return false;
                }
            } else if (!string.equals(string2)) return false;
            UUID uUID = this.id();
            UUID uUID2 = linkInfo.id();
            if (uUID == null) {
                if (uUID2 != null) {
                    return false;
                }
            } else if (!((Object)uUID).equals(uUID2)) return false;
            String string3 = this.remoteClusterId();
            String string4 = linkInfo.remoteClusterId();
            if (string3 == null) {
                if (string4 != null) {
                    return false;
                }
            } else if (!string3.equals(string4)) return false;
            if (this.remoteBrokerId() != linkInfo.remoteBrokerId()) return false;
            if (!linkInfo.canEqual(this)) return false;
            return true;
        }

        public /* synthetic */ ReverseConnectionRequestTest kafka$server$ReverseConnectionRequestTest$LinkInfo$$$outer() {
            return this.$outer;
        }

        public LinkInfo(ReverseConnectionRequestTest $outer, String name, UUID id, String remoteClusterId, int remoteBrokerId) {
            this.name = name;
            this.id = id;
            this.remoteClusterId = remoteClusterId;
            this.remoteBrokerId = remoteBrokerId;
            if ($outer == null) {
                throw null;
            }
            this.$outer = $outer;
            Product.$init$((Product)this);
        }
    }
}

