/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.network.netty.NettyClient;
import org.apache.kafka.common.network.netty.NettyClientTestBase;
import org.apache.kafka.common.network.netty.NettyHttp2Stream;
import org.apache.kafka.common.network.netty.NettyStream;
import org.apache.kafka.common.network.netty.TestNettyStreamHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class NettyStreamHandlerTest
extends NettyClientTestBase {
    private static NettyClient client;
    private NettyStream stream;
    private TestNettyStreamHandler handler;
    private CountDownLatch dataReceivedLatch;

    @BeforeEach
    public void setup() throws Exception {
        client = new NettyClient(NettyStreamHandlerTest.acceptAllSslContext(), this.eventLoopGroup, logContext);
        this.dataReceivedLatch = new CountDownLatch(1);
        this.handler = new TestNettyStreamHandler(this.dataReceivedLatch);
        this.stream = (NettyStream)client.createStream(NettyStreamHandlerTest.serverAddress(), Integer.valueOf(0x100000), Integer.valueOf(0x100000), (NettyStream.StreamHandler)this.handler, (Http2Headers)new DefaultHttp2Headers()).get();
    }

    @AfterEach
    public void closeStream() {
        if (this.stream.isOpen()) {
            this.stream.closeStream().awaitUninterruptibly();
        }
        client.shutdown().join();
    }

    @Test
    public void testIfStreamHandlerClosesWhenStreamCloses() {
        this.stream.closeStream().awaitUninterruptibly();
        Assertions.assertFalse((boolean)this.stream.isOpen(), (String)"Stream is not closed.");
        Assertions.assertTrue((boolean)this.handler.isStreamChannelClosed(), (String)"Stream handler did not get closed notification.");
        Assertions.assertTrue((boolean)this.handler.isClosed(), (String)"Stream handler is not closed.");
    }

    @Test
    public void testExceptionInChannelPipelineGetsHandled() throws InterruptedException {
        this.handler.throwOnDataReceived();
        byte[] message = "Hello, World!".getBytes();
        ByteBuf dataSent = Unpooled.wrappedBuffer((byte[])message);
        this.stream.send(dataSent, true).awaitUninterruptibly();
        boolean fetchedData = this.dataReceivedLatch.await(5L, TimeUnit.MINUTES);
        if (!fetchedData) {
            Assertions.fail((String)"handleData callback not invoked");
        }
        Assertions.assertTrue((boolean)this.handler.exceptionHandlerInvoked(), (String)"Exception handler not invoked on error case.");
    }

    @Test
    public void testWritabilityChangeEvent() throws InterruptedException {
        Channel channel = ((NettyHttp2Stream)this.stream).channel();
        ChannelPipeline pipeline = channel.pipeline();
        Semaphore waitForEventFire = new Semaphore(0);
        channel.eventLoop().execute(() -> {
            pipeline.fireChannelWritabilityChanged();
            waitForEventFire.release();
        });
        waitForEventFire.tryAcquire(1L, TimeUnit.MINUTES);
        Assertions.assertTrue((boolean)this.handler.isWritabilityChangedEventInvoked(), (String)"Writability changed event not invoked on error case.");
    }
}

