/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import kafka.cluster.BrokerEndPoint;
import kafka.server.ClusterLinkRequestQuota;
import kafka.server.FetcherPool;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.UnboundedClusterLinkRequestQuota$;
import kafka.server.link.ClusterLinkAsyncSender;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0005\u0005Eh\u0001B\u0017/\u0001UBQ\u0001\u0010\u0001\u0005\u0002uBq\u0001\u0011\u0001C\u0002\u0013%\u0011\t\u0003\u0004P\u0001\u0001\u0006IA\u0011\u0005\b!\u0002\u0011\r\u0011\"\u0003R\u0011\u0019A\u0006\u0001)A\u0005%\"9\u0011\f\u0001b\u0001\n\u0013Q\u0006BB1\u0001A\u0003%1\fC\u0004c\u0001\t\u0007I\u0011B2\t\r!\u0004\u0001\u0015!\u0003e\u0011\u001dI\u0007A1A\u0005\n)Daa\u001c\u0001!\u0002\u0013Y\u0007b\u00029\u0001\u0005\u0004%I!\u001d\u0005\u0007k\u0002\u0001\u000b\u0011\u0002:\t\u000fY\u0004!\u0019!C\u0005c\"1q\u000f\u0001Q\u0001\nID\u0011\u0002\u001f\u0001A\u0002\u0003\u0007I\u0011B=\t\u0013u\u0004\u0001\u0019!a\u0001\n\u0013q\bBCA\u0005\u0001\u0001\u0007\t\u0011)Q\u0005u\"Y\u00111\u0002\u0001A\u0002\u0003\u0007I\u0011BA\u0007\u0011-\t)\u0002\u0001a\u0001\u0002\u0004%I!a\u0006\t\u0017\u0005m\u0001\u00011A\u0001B\u0003&\u0011q\u0002\u0005\f\u0003;\u0001\u0001\u0019!a\u0001\n\u0013\ty\u0002C\u0006\u0002:\u0001\u0001\r\u00111A\u0005\n\u0005m\u0002bCA \u0001\u0001\u0007\t\u0011)Q\u0005\u0003CAq!!\u0011\u0001\t\u0003\t\u0019\u0005C\u0004\u0002\\\u0001!\t!a\u0011\t\u000f\u0005\u0015\u0004\u0001\"\u0001\u0002D!9\u0011\u0011\u000e\u0001\u0005\u0002\u0005\r\u0003bBA7\u0001\u0011\u0005\u00111\t\u0005\b\u0003c\u0002A\u0011AA\"\u0011\u001d\t)\b\u0001C\u0001\u0003\u0007Bq!!\u001f\u0001\t\u0003\t\u0019\u0005C\u0004\u0002~\u0001!\t!a\u0011\t\u000f\u0005\u0005\u0005\u0001\"\u0001\u0002D!9\u0011Q\u0011\u0001\u0005\n\u0005\r\u0003bBAD\u0001\u0011%\u00111\t\u0005\b\u0003\u0013\u0003A\u0011BA\"\u0011\u001d\tY\t\u0001C\u0005\u0003\u001bCq!a/\u0001\t\u0013\ti\fC\u0004\u0002T\u0002!I!!6\t\u000f\u0005u\u0007\u0001\"\u0003\u0002`\"9\u0011Q\u001d\u0001\u0005\n\u0005\u001d\bbBAw\u0001\u0011%\u00111\t\u0005\b\u0003_\u0004A\u0011BA\"\u0005i\u0019E.^:uKJd\u0015N\\6Bgft7mU3oI\u0016\u0014H+Z:u\u0015\ty\u0003'\u0001\u0003mS:\\'BA\u00193\u0003\u0019\u0019XM\u001d<fe*\t1'A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u00011\u0004CA\u001c;\u001b\u0005A$\"A\u001d\u
0002\u000bM\u001c\u0017\r\\1\n\u0005mB$AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002}A\u0011q\bA\u0007\u0002]\u0005!A/[7f+\u0005\u0011\u0005CA\"N\u001b\u0005!%BA#G\u0003\u0015)H/\u001b7t\u0015\t9\u0005*\u0001\u0004d_6lwN\u001c\u0006\u0003g%S!AS&\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005a\u0015aA8sO&\u0011a\n\u0012\u0002\t\u001b>\u001c7\u000eV5nK\u0006)A/[7fA\u000511\r\\5f]R,\u0012A\u0015\t\u0003'Zk\u0011\u0001\u0016\u0006\u0003+\"\u000bqa\u00197jK:$8/\u0003\u0002X)\nQQj\\2l\u00072LWM\u001c;\u0002\u000f\rd\u0017.\u001a8uA\u0005a1o\\;sG\u0016\u0014%o\\6feV\t1\f\u0005\u0002]?6\tQL\u0003\u0002_e\u000591\r\\;ti\u0016\u0014\u0018B\u00011^\u00059\u0011%o\\6fe\u0016sG\rU8j]R\fQb]8ve\u000e,'I]8lKJ\u0004\u0013AC:pkJ\u001cWMT8eKV\tA\r\u0005\u0002fM6\ta)\u0003\u0002h\r\n!aj\u001c3f\u0003-\u0019x.\u001e:dK:{G-\u001a\u0011\u0002\u0019\t\u0014xn[3s\u0007>tg-[4\u0016\u0003-\u0004\"\u0001\\7\u000e\u0003AJ!A\u001c\u0019\u0003\u0017-\u000bgm[1D_:4\u0017nZ\u0001\u000eEJ|7.\u001a:D_:4\u0017n\u001a\u0011\u0002\u001fM|7m[3u)&lWm\\;u\u001bN,\u0012A\u001d\t\u0003oML!\u0001\u001e\u001d\u0003\u0007%sG/\u0001\tt_\u000e\\W\r\u001e+j[\u0016|W\u000f^'tA\u0005\u0001\"/Z9vKN$H+[7f_V$Xj]\u0001\u0012e\u0016\fX/Z:u)&lWm\\;u\u001bN\u0004\u0013AB:f]\u0012,'/F\u0001{!\ty40\u0003\u0002}]\t12\t\\;ti\u0016\u0014H*\u001b8l\u0003NLhnY*f]\u0012,'/\u0001\u0006tK:$WM]0%KF$2a`A\u0003!\r9\u0014\u0011A\u0005\u0004\u0003\u0007A$\u0001B+oSRD\u0001\"a\u0002\u0012\u0003\u0003\u0005\rA_\u0001\u0004q\u0012\n\u0014aB:f]\u0012,'\u000fI\u0001\rY\u0006\u001cHOU3ta>t7/Z\u000b\u0003\u0003\u001f\u00012aUA\t\u0013\r\t\u0019\u0002\u0016\u0002\u000f\u00072LWM\u001c;SKN\u0004xN\\:f\u0003Aa\u0017m\u001d;SKN\u0004xN\\:f?\u0012*\u0017\u000fF\u0002\u0000\u00033A\u0011\"a\u0002\u0015\u0003\u0003\u0005\r!a\u0004\u0002\u001b1\f7\u000f\u001e*fgB|gn]3!\u00035a\u0017m\u001d;Fq\u000e,\u0007\u000f^5p]V\u0011\u0011\u0011\u0005\t\u0005\u0003G\t\u0019D\u0004\u0003\u0002&\u0005=b\u0002BA\u0014\u0003[i!!!\u000b\u000b\u0007\u0005-B'\u0001\u0004
=e>|GOP\u0005\u0002s%\u0019\u0011\u0011\u0007\u001d\u0002\u000fA\f7m[1hK&!\u0011QGA\u001c\u0005%!\u0006N]8xC\ndWMC\u0002\u00022a\n\u0011\u0003\\1ti\u0016C8-\u001a9uS>tw\fJ3r)\ry\u0018Q\b\u0005\n\u0003\u000f9\u0012\u0011!a\u0001\u0003C\ta\u0002\\1ti\u0016C8-\u001a9uS>t\u0007%A\u0003tKR,\u0006\u000fF\u0001\u0000Q\rI\u0012q\t\t\u0005\u0003\u0013\n9&\u0004\u0002\u0002L)!\u0011QJA(\u0003\r\t\u0007/\u001b\u0006\u0005\u0003#\n\u0019&A\u0004kkBLG/\u001a:\u000b\u0007\u0005U3*A\u0003kk:LG/\u0003\u0003\u0002Z\u0005-#A\u0003\"fM>\u0014X-R1dQ\u0006iA/Z:u\u0003NLhnY*f]\u0012D3AGA0!\u0011\tI%!\u0019\n\t\u0005\r\u00141\n\u0002\u0005)\u0016\u001cH/A\u000euKN$\u0018)\u001e;iK:$\u0018nY1uS>tW\t_2faRLwN\u001c\u0015\u00047\u0005}\u0013\u0001\u0005;fgR\u0014V-\u00193z)&lWm\\;uQ\ra\u0012qL\u0001\u0013i\u0016\u001cHOU3rk\u0016\u001cH\u000fV5nK>,H\u000fK\u0002\u001e\u0003?\nA\u0003^3ti\u000e{gN\\3di&|gNR1jY\u0016$\u0007f\u0001\u0010\u0002`\u00051B/Z:u+:\u001cX\u000f\u001d9peR,GMV3sg&|g\u000eK\u0002 \u0003?\n\u0011\u0003^3ti\u0012K7oY8o]\u0016\u001cG/[8oQ\r\u0001\u0013qL\u0001\u000fi\u0016\u001cHOT8SKN\u0004xN\\:fQ\r\t\u0013qL\u0001\ri\u0016\u001cHo\u00155vi\u0012|wO\u001c\u0015\u0004E\u0005}\u0013\u0001E1ts:\u001c7+\u001a8e%\u0016\fX/Z:u\u0003A\u0011Xm\u001d9p]\u0012\fe\u000e\u001a,fe&4\u00170\u0001\u000btK:$'+Z2fSZ,\u0017I\u001c3WKJLg-_\u0001\u000em\u0016\u0014\u0018NZ=GC&dWO]3\u0015\u0007}\fy\tC\u0004\u0002\u0012\u001a\u0002\r!a%\u0002\u000b\rd\u0017M\u001f>1\t\u0005U\u0015\u0011\u0016\t\u0007\u0003/\u000by*!*\u000f\t\u0005e\u00151\u0014\t\u0004\u0003OA\u0014bAAOq\u00051\u0001K]3eK\u001aLA!!)\u0002$\n)1\t\\1tg*\u0019\u0011Q\u0014\u001d\u0011\t\u0005\u001d\u0016\u0011\u0016\u0007\u0001\t1\tY+a$\u0002\u0002\u0003\u0005)\u0011AAW\u0005\ryF%M\t\u0005\u0003_\u000b)\fE\u00028\u0003cK1!a-9\u0005\u001dqu\u000e\u001e5j]\u001e\u00042aNA\\\u0013\r\tI\f\u000f\u0002\u0004\u0003:L\u0018A\u00038foJ+\u0017/^3tiR\u0011\u0011q\u0018\t\u0005\u0003\u0003\fiM\u0004\u0003\u0002D\u0006%WBAAc\u0015\r\t9MR\u0001\
te\u0016\fX/Z:ug&!\u00111ZAc\u0003qyeMZ:fiN4uN\u001d'fC\u0012,'/\u00129pG\"\u0014V-];fgRLA!a4\u0002R\n9!)^5mI\u0016\u0014(\u0002BAf\u0003\u000b\f1B\\3x%\u0016\u001c\bo\u001c8tKR\u0011\u0011q\u001b\t\u0005\u0003\u0007\fI.\u0003\u0003\u0002\\\u0006\u0015'!H(gMN,Go\u001d$pe2+\u0017\rZ3s\u000bB|7\r\u001b*fgB|gn]3\u0002\u0015=t'+Z:q_:\u001cX\rF\u0002\u0000\u0003CDq!a9*\u0001\u0004\ty!\u0001\u0005sKN\u0004xN\\:f\u0003-yg.\u0012=dKB$\u0018n\u001c8\u0015\u0007}\fI\u000fC\u0004\u0002l*\u0002\r!!\t\u0002\u0003Q\f!b\u00197fCJ\u001cF/\u0019;f\u0003U1XM]5gs\u000e\u000bG\u000e\u001c2bG.\u0004VM\u001c3j]\u001e\u0004")
/**
 * JUnit 5 tests for {@link ClusterLinkAsyncSender}.
 *
 * <p>Each test builds a sender against a {@link MockClient} and a {@link MockTime}, queues a
 * single {@code OffsetsForLeaderEpoch} request through {@code asyncSendRequest}, and then drives
 * the sender and the mock client by hand. The outcome delivered to the callbacks is captured in
 * {@link #lastResponse} / {@link #lastException} and asserted on by the helper methods at the
 * bottom of the class.
 *
 * <p>This file was recovered by decompiling Scala bytecode; the private Scala-style accessors
 * (for example {@code sender_$eq}) are kept so the members still line up with the
 * {@code @ScalaSignature} metadata attached to the class.
 */
public class ClusterLinkAsyncSenderTest {
    private final MockTime time = new MockTime();
    private final MockClient client = new MockClient(this.time());
    private final BrokerEndPoint sourceBroker = new BrokerEndPoint(12, "localhost", 123);
    // Same id/host/port as sourceBroker, expressed as the client-side Node type.
    private final Node sourceNode = new Node(12, "localhost", 123);
    // Option.<T>empty() yields the same None instance the decompiler spelled as a cast of
    // None$.MODULE$; that cast is between distinct parameterizations and is rejected by javac.
    private final KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(
        TestUtils$.MODULE$.createBrokerConfig(
            1, "localhost:2181", true, true, TestUtils$.MODULE$.RandomPort(),
            Option.<SecurityProtocol>empty(), Option.<File>empty(), Option.<Properties>empty(),
            true, false, TestUtils$.MODULE$.RandomPort(),
            false, TestUtils$.MODULE$.RandomPort(),
            false, TestUtils$.MODULE$.RandomPort(),
            Option.<String>empty(), 1, false, 1, (short) 1, false));
    private final int socketTimeoutMs = 10000;
    private final int requestTimeoutMs = 60000;
    // Re-created before every test by setUp().
    private ClusterLinkAsyncSender sender;
    // Outcome recorded by the callbacks; at most one of the two is non-null after a request completes.
    private ClientResponse lastResponse;
    private Throwable lastException;

    /** Public no-arg constructor; all fixture state is set up by the field initializers above. */
    public ClusterLinkAsyncSenderTest() {
    }

    private MockTime time() {
        return this.time;
    }

    private MockClient client() {
        return this.client;
    }

    private BrokerEndPoint sourceBroker() {
        return this.sourceBroker;
    }

    private Node sourceNode() {
        return this.sourceNode;
    }

    private KafkaConfig brokerConfig() {
        return this.brokerConfig;
    }

    private int socketTimeoutMs() {
        return this.socketTimeoutMs;
    }

    private int requestTimeoutMs() {
        return this.requestTimeoutMs;
    }

    private ClusterLinkAsyncSender sender() {
        return this.sender;
    }

    private void sender_$eq(ClusterLinkAsyncSender x$1) {
        this.sender = x$1;
    }

    private ClientResponse lastResponse() {
        return this.lastResponse;
    }

    private void lastResponse_$eq(ClientResponse x$1) {
        this.lastResponse = x$1;
    }

    private Throwable lastException() {
        return this.lastException;
    }

    private void lastException_$eq(Throwable x$1) {
        this.lastException = x$1;
    }

    /** Creates a fresh sender wired to the mock client and mock clock before each test. */
    @BeforeEach
    public void setUp() {
        this.sender_$eq(new ClusterLinkAsyncSender(
            this.sourceBroker(),
            this.brokerConfig(),
            this.socketTimeoutMs(),
            this.requestTimeoutMs(),
            (Time) this.time(),
            (ClusterLinkRequestQuota) UnboundedClusterLinkRequestQuota$.MODULE$,
            1,
            (FetcherPool) FetcherPool.Default$.MODULE$,
            (KafkaClient) this.client()));
    }

    /**
     * Happy path: the request only reaches the client once {@code maybeSendPendingRequest} is
     * called, the callback stays pending until a response is supplied, and a second
     * send/receive round works afterwards.
     */
    @Test
    public void testAsyncSend() {
        this.asyncSendRequest();
        Assertions.assertTrue(this.client().isConnected(this.sourceNode().idString()));
        this.client().poll(0L, this.time().milliseconds());
        // Polling alone must not put the request on the wire.
        Assertions.assertEquals(0, this.client().requests().size());
        this.sender().maybeSendPendingRequest();
        Assertions.assertEquals(1, this.client().requests().size());
        this.verifyCallbackPending();
        this.client().poll(0L, this.time().milliseconds());
        this.verifyCallbackPending();
        this.respondAndVerify();
        this.sendReceiveAndVerify();
    }

    /** An authentication failure on the source node is reported as {@link SaslAuthenticationException}. */
    @Test
    public void testAuthenticationException() {
        this.client().authenticationFailed(this.sourceNode(), this.time().milliseconds());
        this.asyncSendRequest();
        this.verifyFailure(SaslAuthenticationException.class);
        this.sendReceiveAndVerify();
    }

    /**
     * The node is delayed from becoming ready for longer than the socket timeout; once the clock
     * passes the socket timeout the request fails with {@link SocketTimeoutException}.
     */
    @Test
    public void testReadyTimeout() {
        this.client().delayReady(this.sourceNode(), this.socketTimeoutMs() + 1000);
        this.asyncSendRequest();
        this.time().sleep(this.socketTimeoutMs() + 1);
        this.verifyFailure(SocketTimeoutException.class);
        this.sendReceiveAndVerify();
    }

    /**
     * Two phases. First, advancing the clock past the socket timeout while the node is connected
     * still lets the request be sent and answered successfully. Second, advancing the clock past
     * the request timeout while the request is still queued makes it fail with {@link IOException}.
     */
    @Test
    public void testRequestTimeout() {
        this.asyncSendRequest();
        Assertions.assertTrue(this.client().isConnected(this.sourceNode().idString()));
        this.client().poll(0L, this.time().milliseconds());
        Assertions.assertEquals(0, this.client().requests().size());
        this.time().sleep(this.socketTimeoutMs() + 1);
        this.sender().maybeSendPendingRequest();
        this.respondAndVerify();

        this.asyncSendRequest();
        Assertions.assertTrue(this.client().isConnected(this.sourceNode().idString()));
        this.client().poll(0L, this.time().milliseconds());
        Assertions.assertEquals(0, this.client().requests().size());
        this.time().sleep(this.requestTimeoutMs() + 1);
        this.verifyFailure(IOException.class);
        this.sendReceiveAndVerify();
    }

    /** With the source node put into backoff on the mock client, the request fails with {@link IOException}. */
    @Test
    public void testConnectionFailed() {
        this.client().backoff(this.sourceNode(), 1000L);
        this.asyncSendRequest();
        this.verifyFailure(IOException.class);
        this.sendReceiveAndVerify();
    }

    /** A prepared unsupported-version response surfaces as {@link UnsupportedVersionException}. */
    @Test
    public void testUnsupportedVersion() {
        // The matcher accepts every request.
        this.client().prepareUnsupportedVersionResponse(request -> true);
        this.asyncSendRequest();
        this.verifyFailure(UnsupportedVersionException.class);
        this.sendReceiveAndVerify();
    }

    /** A prepared response flagged as disconnected is reported as {@link IOException}. */
    @Test
    public void testDisconnection() {
        this.client().prepareResponse(this.newResponse(), true);
        this.asyncSendRequest();
        this.verifyFailure(IOException.class);
        this.sendReceiveAndVerify();
    }

    /** A prepared response with a null body is reported as {@link IllegalStateException}. */
    @Test
    public void testNoResponse() {
        this.client().prepareResponse(null, false);
        this.asyncSendRequest();
        this.verifyFailure(IllegalStateException.class);
        this.sendReceiveAndVerify();
    }

    /**
     * Initiating close while a poll is blocked on another thread fails the pending request with
     * {@link NetworkException}, rejects further {@code asyncSendRequest} calls with the same
     * exception, and lets the blocked poll finish.
     *
     * @throws Exception if waiting for the background poll is interrupted, the poll itself
     *         fails, or it does not finish within 15 seconds
     */
    @Test
    public void testShutdown() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            this.asyncSendRequest();
            this.client().enableBlockingUntilWakeup(1);
            Future<?> future = executor.submit(() -> this.client().poll(1000L, this.time().milliseconds()));
            this.sender().initiateClose();
            Assertions.assertTrue(this.sender().shutdownInitiated());
            this.sender().maybeSendPendingRequest();
            this.verifyFailure(NetworkException.class);
            Assertions.assertThrows(NetworkException.class, () -> this.sender().asyncSendRequest(
                this.newRequest(), this.responseCallback(), this.exceptionCallback()));
            // Future.get throws checked exceptions, hence the throws clause on this test.
            future.get(15L, TimeUnit.SECONDS);
        }
        finally {
            executor.shutdownNow();
        }
    }

    /**
     * Clears any recorded outcome, queues one new request on the sender and checks that nothing
     * has been handed to the client yet and the callback is still pending.
     */
    private void asyncSendRequest() {
        this.clearState();
        this.sender().asyncSendRequest(this.newRequest(), this.responseCallback(), this.exceptionCallback());
        Assertions.assertEquals(0, this.client().requests().size());
        this.verifyCallbackPending();
    }

    /**
     * Builds the success callback handed to the sender; it records the response via
     * {@link #onResponse}. The {@code Serializable} intersection mirrors the original lambda shape.
     */
    private Function1<ClientResponse, BoxedUnit> responseCallback() {
        return (Function1<ClientResponse, BoxedUnit> & Serializable) response -> {
            this.onResponse(response);
            return BoxedUnit.UNIT;
        };
    }

    /**
     * Builds the failure callback handed to the sender; it records the throwable via
     * {@link #onException}. The {@code Serializable} intersection mirrors the original lambda shape.
     */
    private Function1<Throwable, BoxedUnit> exceptionCallback() {
        return (Function1<Throwable, BoxedUnit> & Serializable) t -> {
            this.onException(t);
            return BoxedUnit.UNIT;
        };
    }

    /**
     * Supplies a successful response to the in-flight request, polls the client, and asserts that
     * the success callback (and only it) fired with a single {@code Errors.NONE} entry.
     */
    private void respondAndVerify() {
        this.client().respond(this.newResponse());
        this.client().poll(0L, this.time().milliseconds());
        Assertions.assertFalse(this.sender().hasPendingRequest());
        Assertions.assertEquals(0, this.client().requests().size());
        Assertions.assertNotNull(this.lastResponse());
        Assertions.assertNull(this.lastException());
        Assertions.assertEquals(
            Collections.singletonMap(Errors.NONE, 1),
            this.lastResponse().responseBody().errorCounts());
    }

    /** Resets the mock client and runs one complete successful request/response round. */
    private void sendReceiveAndVerify() {
        this.client().reset();
        this.asyncSendRequest();
        this.sender().maybeSendPendingRequest();
        this.respondAndVerify();
    }

    /**
     * Drives the sender and client once and asserts that the pending request failed.
     *
     * @param clazz the exact class (not a supertype) the recorded exception must have
     */
    private void verifyFailure(Class<?> clazz) {
        this.sender().maybeSendPendingRequest();
        this.client().poll(0L, this.time().milliseconds());
        Assertions.assertFalse(this.sender().hasPendingRequest());
        Assertions.assertEquals(0, this.client().requests().size());
        Assertions.assertNull(this.lastResponse());
        Assertions.assertNotNull(this.lastException());
        Assertions.assertEquals(clazz, this.lastException().getClass());
        Assertions.assertFalse(this.client().isConnected(this.sourceNode().idString()));
    }

    /**
     * Builds the request used by every test: a consumer-style OffsetsForLeaderEpoch request for
     * partition 5 of "topic" (leader epoch 6, current leader epoch 7).
     */
    private OffsetsForLeaderEpochRequest.Builder newRequest() {
        OffsetForLeaderEpochRequestData.OffsetForLeaderPartition partition =
            new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
                .setPartition(5)
                .setLeaderEpoch(6)
                .setCurrentLeaderEpoch(7);
        OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection epochs =
            new OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection();
        epochs.add(new OffsetForLeaderEpochRequestData.OffsetForLeaderTopic()
            .setTopic("topic")
            .setPartitions(Collections.singletonList(partition)));
        return OffsetsForLeaderEpochRequest.Builder.forConsumer(epochs);
    }

    /**
     * Builds the matching successful response: partition 5 of "topic", no error, leader epoch 6,
     * end offset 600.
     */
    private OffsetsForLeaderEpochResponse newResponse() {
        OffsetForLeaderEpochResponseData.EpochEndOffset endOffset =
            new OffsetForLeaderEpochResponseData.EpochEndOffset()
                .setPartition(5)
                .setErrorCode(Errors.NONE.code())
                .setLeaderEpoch(6)
                .setEndOffset(600L);
        OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult result =
            new OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult()
                .setTopic("topic")
                .setPartitions(Collections.singletonList(endOffset));
        OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData();
        data.topics().add(result);
        return new OffsetsForLeaderEpochResponse(data);
    }

    /** Success callback body: asserts nothing was delivered before, then records the response. */
    private void onResponse(ClientResponse response) {
        this.verifyCallbackPending();
        this.lastResponse_$eq(response);
    }

    /** Failure callback body: asserts nothing was delivered before, then records the throwable. */
    private void onException(Throwable t) {
        this.verifyCallbackPending();
        this.lastException_$eq(t);
    }

    /** Forgets the outcome of the previous request. */
    private void clearState() {
        this.lastResponse_$eq(null);
        this.lastException_$eq(null);
    }

    /** Asserts that no callback has fired yet and the sender still reports a pending request. */
    private void verifyCallbackPending() {
        Assertions.assertNull(this.lastResponse());
        Assertions.assertNull(this.lastException());
        Assertions.assertTrue(this.sender().hasPendingRequest());
    }
}

