/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.ResourceLeakDetector;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.kafka.common.network.netty.NettyServer;
import org.apache.kafka.common.network.netty.NettyStream;
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;

public class NettyRawBytesStreamHandlesExceptionTest {
    private static LogContext logContext;
    private static Logger log;
    private EventLoopGroup clientEventLoopGroup;
    private NettyServer nettyServer;
    private CountDownLatch latch;

    @BeforeEach
    void setUp() {
        logContext = new LogContext("[K2proxy Exception Test]");
        log = logContext.logger(NettyRawBytesStreamHandlesExceptionTest.class);
        ResourceLeakDetector.setLevel((ResourceLeakDetector.Level)ResourceLeakDetector.Level.ADVANCED);
    }

    @AfterEach
    void tearDown() {
        if (this.clientEventLoopGroup != null) {
            this.clientEventLoopGroup.shutdownGracefully();
        }
        if (this.nettyServer != null) {
            this.nettyServer.shutdown().join();
        }
    }

    @Test
    public void testHandlesException() throws Exception {
        InetSocketAddress listeningAddress;
        this.latch = new CountDownLatch(1);
        ByteBuf firstMessage = Unpooled.wrappedBuffer((byte[])"qwertyuiop".getBytes(StandardCharsets.US_ASCII));
        try {
            listeningAddress = this.startNettyServer();
        }
        catch (InterruptedException | ExecutionException e) {
            Assertions.fail((String)("Failed to start Netty server: " + e.getMessage()));
            return;
        }
        Channel channel = this.startNettyClient(listeningAddress);
        channel.writeAndFlush((Object)firstMessage);
        boolean fetchedData = this.latch.await(10L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)fetchedData, (String)"Failed to receive response from the server within the timeout period.");
    }

    private InetSocketAddress startNettyServer() throws Exception {
        this.nettyServer = NettyServer.Builder.newServerBuilder((LogContext)logContext, (String)"echoing-kafka-cluster-connections-manager", null, this.getErroringStreamHandler(), (NettyServer.NettyServerProtocol)NettyServer.NettyServerProtocol.SOCKET, (boolean)false).withBossThreads(1).withWorkerThreads(4).build();
        InetSocketAddress socketAddress = new InetSocketAddress("localhost", 0);
        InetSocketAddress actualSocketAddress = (InetSocketAddress)this.nettyServer.start((SocketAddress)socketAddress).get();
        log.info("Started Netty Echo Server on ... {}", (Object)actualSocketAddress);
        return actualSocketAddress;
    }

    BiFunction<NettyStream, Http2Headers, NettyStream.StreamHandler> getErroringStreamHandler() {
        return (stream, headers) -> new ErroringStreamHandler(this.latch);
    }

    private Channel startNettyClient(InetSocketAddress listeningAddress) throws ExecutionException, InterruptedException {
        this.clientEventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        ((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)bootstrap.group(this.clientEventLoopGroup)).channel(NioSocketChannel.class)).option(ChannelOption.SO_KEEPALIVE, (Object)true)).option(ChannelOption.TCP_NODELAY, (Object)true)).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) {
            }
        });
        ChannelFuture clientFuture = bootstrap.connect(listeningAddress.getHostName(), listeningAddress.getPort());
        log.info("Starting Netty Echo Client...");
        clientFuture.get();
        return clientFuture.channel();
    }

    private static class ErroringStreamHandler
    implements NettyStream.StreamHandler {
        private final CountDownLatch internalLatch;
        private final RuntimeException exception;

        public ErroringStreamHandler(CountDownLatch latch) {
            this.internalLatch = latch;
            this.exception = new RuntimeException("Throw in handle Data");
        }

        public void handleData(ByteBuf data) {
            throw this.exception;
        }

        public void handleReadyForSend() {
        }

        public void handleException(Throwable cause) {
            Assertions.assertEquals((Object)this.exception, (Object)cause);
            this.internalLatch.countDown();
        }

        public void handleClose() {
        }
    }
}

