/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.concurrent.Future;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.network.netty.NettyClient;
import org.apache.kafka.common.network.netty.NettyClientTestBase;
import org.apache.kafka.common.network.netty.NettyHttp2Connection;
import org.apache.kafka.common.network.netty.NettyHttp2Stream;
import org.apache.kafka.common.network.netty.NettyRawBytesStream;
import org.apache.kafka.common.network.netty.NettyStream;
import org.apache.kafka.common.network.netty.TestNettyStreamHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link NettyClient}: repeated shutdown, stream creation on a shared connection, and
 * message exchange over one or many streams.
 *
 * <p>Every test runs against a fresh client created in {@link #setupClient()} and shut down in
 * {@link #shutdownClient()}. The server address, event loop group, log context and SSL context
 * are inherited via {@link NettyClientTestBase}.
 */
public class NettyClientTest extends NettyClientTestBase {
    private static final Logger log = LoggerFactory.getLogger(NettyClientTest.class);

    /**
     * 1 MiB (0x100000). Passed for both numeric arguments of the stream/connection factory
     * methods. NOTE(review): presumably buffer or flow-control window sizes - confirm against
     * NettyClient.
     */
    private static final Integer ONE_MIB = 0x100000;

    /** Payload text sent by the message-exchange tests. */
    private static final String GREETING = "Hello, World!";

    /** Upper bound, in seconds, on how long a test waits for the handlers to report data. */
    private static final long RECEIVE_TIMEOUT_SECONDS = 30L;

    private NettyClient client;

    /** Creates the client under test before each test method. */
    @BeforeEach
    public void setupClient() {
        client = new NettyClient(acceptAllSslContext(), eventLoopGroup, logContext);
    }

    /** Shuts the client down after each test method and waits for the shutdown to finish. */
    @AfterEach
    public void shutdownClient() {
        client.shutdown().join();
    }

    /**
     * Verifies that shutting the client down more than once completes without throwing.
     * {@link #shutdownClient()} adds one more shutdown after this method returns.
     */
    @Test
    public void testMultipleShutdown() {
        client.shutdown().join();
        client.shutdown().join();
    }

    /**
     * Creates two streams one after the other, checks that both stream channels share the same
     * parent channel, then sends a message on the first stream and expects the first handler to
     * hold the same bytes.
     *
     * <p>NOTE(review): the streams are not closed explicitly here; presumably the client
     * shutdown in {@link #shutdownClient()} releases them - confirm.
     */
    @Test
    public void testSerialMultipleStreamsCreation() throws Exception {
        // Both handlers count down the same single-count latch; only the first stream sends.
        CountDownLatch dataReceivedLatch = new CountDownLatch(1);
        TestNettyStreamHandler firstStreamHandler = new TestNettyStreamHandler(dataReceivedLatch);
        NettyHttp2Stream firstStream = (NettyHttp2Stream) openStream(firstStreamHandler).get();
        TestNettyStreamHandler secondStreamHandler = new TestNettyStreamHandler(dataReceivedLatch);
        NettyHttp2Stream secondStream = (NettyHttp2Stream) openStream(secondStreamHandler).get();
        Assertions.assertSame(firstStream.channel().parent(), secondStream.channel().parent());

        byte[] message = GREETING.getBytes(StandardCharsets.UTF_8);
        firstStream.send(Unpooled.wrappedBuffer(message), true).sync().get();
        awaitData(dataReceivedLatch);
        Assertions.assertArrayEquals(message, ByteBufUtil.getBytes(firstStreamHandler.receivedData()));
    }

    /**
     * Requests two streams before waiting on either, then checks that both stream channels share
     * the same parent channel.
     *
     * <p>NOTE(review): the streams are not closed explicitly here; presumably the client
     * shutdown in {@link #shutdownClient()} releases them - confirm.
     */
    @Test
    public void testConcurrentMultipleStreamsCreation() throws Exception {
        CountDownLatch dataReceivedLatch = new CountDownLatch(1);
        TestNettyStreamHandler firstStreamHandler = new TestNettyStreamHandler(dataReceivedLatch);
        TestNettyStreamHandler secondStreamHandler = new TestNettyStreamHandler(dataReceivedLatch);
        // Issue both creation requests up front so they are in flight at the same time.
        CompletableFuture<?> firstStreamFuture = openStream(firstStreamHandler);
        CompletableFuture<?> secondStreamFuture = openStream(secondStreamHandler);
        NettyHttp2Stream firstStream = (NettyHttp2Stream) firstStreamFuture.get();
        NettyHttp2Stream secondStream = (NettyHttp2Stream) secondStreamFuture.get();
        Assertions.assertSame(firstStream.channel().parent(), secondStream.channel().parent());
    }

    /**
     * Sends one message over a single stream and expects the handler to hold the same bytes.
     * The handler is closed by try-with-resources before the stream is closed in {@code finally}.
     */
    @Test
    public void testSingleStreamMessageExchange() throws Exception {
        CountDownLatch dataReceivedLatch = new CountDownLatch(1);
        NettyStream stream = null;
        try (TestNettyStreamHandler handler = new TestNettyStreamHandler(dataReceivedLatch)) {
            Http2Headers headers = new DefaultHttp2Headers();
            stream = (NettyStream) client.createStream(serverAddress(), handler, headers).get();
            byte[] message = GREETING.getBytes(StandardCharsets.UTF_8);
            stream.send(Unpooled.wrappedBuffer(message), true).sync().get();
            awaitData(dataReceivedLatch);
            Assertions.assertArrayEquals(message, ByteBufUtil.getBytes(handler.receivedData()));
        } catch (Exception ex) {
            log.error("Error in testSingleStreamMessageExchange", ex);
            throw ex;
        } finally {
            if (stream != null) {
                stream.closeStream().get();
            }
        }
    }

    /**
     * Opens ten connections with one stream each, sends the same message on every stream, and
     * expects every handler to hold the same bytes. Afterwards every stream is closed, each
     * handler is checked to be closed, and every connection is closed.
     */
    @Test
    public void testMultipleStreamMessageExchanges() throws ExecutionException, InterruptedException {
        int numStreams = 10;
        CountDownLatch dataReceivedLatch = new CountDownLatch(numStreams);
        List<NettyHttp2Connection> connections = new ArrayList<>(numStreams);
        List<NettyHttp2Stream> streams = new ArrayList<>(numStreams);
        List<TestNettyStreamHandler> handlers = IntStream.range(0, numStreams)
            .mapToObj(ignored -> new TestNettyStreamHandler(dataReceivedLatch))
            .collect(Collectors.toUnmodifiableList());
        boolean exchangeCompleted = false;
        try {
            for (TestNettyStreamHandler handler : handlers) {
                NettyHttp2Connection connection =
                    (NettyHttp2Connection) client.createConnection(serverAddress(), ONE_MIB, ONE_MIB).get();
                // Register the connection before creating the stream so it is closed even if
                // stream creation fails.
                connections.add(connection);
                Http2Headers headers = new DefaultHttp2Headers();
                Future<?> streamFuture = connection.createStream(headers, handler);
                streams.add((NettyHttp2Stream) streamFuture.get());
            }
            byte[] message = GREETING.getBytes(StandardCharsets.UTF_8);
            for (NettyHttp2Stream stream : streams) {
                stream.send(Unpooled.wrappedBuffer(message), true).sync().get();
            }
            awaitData(dataReceivedLatch);
            for (TestNettyStreamHandler handler : handlers) {
                Assertions.assertArrayEquals(message, ByteBufUtil.getBytes(handler.receivedData()));
            }
            exchangeCompleted = true;
        } catch (Exception ex) {
            log.error("Error in testMultipleStreamMessageExchanges", ex);
            throw ex;
        } finally {
            try {
                for (NettyHttp2Stream stream : streams) {
                    try {
                        stream.closeStream().get();
                    } catch (Exception e) {
                        restoreInterruptIfNeeded(e);
                        log.error("Error closing stream", e);
                    }
                }
                // Only check handler closure after a successful exchange: if the body already
                // failed, a failing assertion here would replace the original exception.
                if (exchangeCompleted) {
                    for (TestNettyStreamHandler handler : handlers) {
                        Assertions.assertTrue(handler.isClosed(),
                            "Stream handler wasn't closed when stream channel got closed.");
                    }
                }
            } finally {
                // Nested finally so the connections are closed even when the assertion above fails.
                for (NettyHttp2Connection connection : connections) {
                    try {
                        connection.close().get();
                    } catch (Exception e) {
                        restoreInterruptIfNeeded(e);
                        log.error("Error closing connection", e);
                    }
                }
            }
        }
    }

    /**
     * Requests a new stream to the test server with fresh, empty HTTP/2 headers.
     *
     * @param handler the handler to attach to the new stream
     * @return the future returned by the client; callers cast its result to the stream type
     */
    private CompletableFuture<?> openStream(NettyStream.StreamHandler handler) {
        Http2Headers headers = new DefaultHttp2Headers();
        return client.createStream(serverAddress(), ONE_MIB, ONE_MIB, handler, headers);
    }

    /**
     * Waits for the latch to reach zero and fails the test if it does not within
     * {@link #RECEIVE_TIMEOUT_SECONDS} seconds.
     *
     * @param latch the latch the stream handlers were constructed with
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private static void awaitData(CountDownLatch latch) throws InterruptedException {
        if (!latch.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            Assertions.fail("Failed to receive data");
        }
    }

    /**
     * Re-asserts the thread's interrupt flag when a best-effort cleanup step was interrupted, so
     * the interruption is not lost by the broad {@code catch}.
     *
     * @param e the exception caught during cleanup
     */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}

