/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.util.ResourceLeakDetector;
import java.io.File;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.network.ByteBufferSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.network.netty.NettyRawBytesStream;
import org.apache.kafka.common.network.netty.NettyStream;
import org.apache.kafka.common.network.netty.NettyStreamChannel;
import org.apache.kafka.common.network.netty.TestNettyStream;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class NettyStreamChannelTest {
    private static final ThreadLocalRandom RANDOM = ThreadLocalRandom.current();
    private NettyStream.StreamHandler streamHandler;
    private Channel mockChannel;
    private TestNettyStream testNettyStream;
    private Send send;

    NettyStreamChannelTest() {
    }

    @BeforeEach
    void setUp() {
        this.streamHandler = (NettyStream.StreamHandler)Mockito.mock(NettyStream.StreamHandler.class);
        this.mockChannel = (Channel)Mockito.mock(Channel.class);
        Mockito.when((Object)this.mockChannel.close()).thenReturn((Object)((ChannelFuture)Mockito.mock(ChannelFuture.class)));
    }

    @AfterEach
    void tearDown() throws IOException {
        if (this.testNettyStream != null) {
            this.testNettyStream.close();
        }
        if (this.send != null) {
            this.send.release();
        }
    }

    @BeforeAll
    static void beforeAll() {
        ResourceLeakDetector.setLevel((ResourceLeakDetector.Level)ResourceLeakDetector.Level.PARANOID);
    }

    @AfterAll
    static void afterAll() {
        ResourceLeakDetector.setLevel((ResourceLeakDetector.Level)ResourceLeakDetector.Level.DISABLED);
    }

    @Test
    void testHasPendingWrites() throws IOException {
        EventLoop mockEventLoop = (EventLoop)Mockito.mock(EventLoop.class);
        Mockito.when((Object)this.mockChannel.eventLoop()).thenReturn((Object)mockEventLoop);
        Mockito.when((Object)mockEventLoop.inEventLoop()).thenReturn((Object)false);
        Mockito.when((Object)this.mockChannel.config()).thenReturn((Object)((ChannelConfig)Mockito.mock(ChannelConfig.class)));
        try (NettyRawBytesStream streamChannel = new NettyRawBytesStream(this.mockChannel, new LogContext());){
            Assertions.assertFalse((boolean)streamChannel.hasPendingWrites());
        }
    }

    @ParameterizedTest
    @CsvSource(value={"true, 256, 128", "false, 256, 128", "true, 256, 256", "false, 256, 256", "true, 512, 128", "false, 512, 128", "true, 1024, 128", "false, 1024, 128"})
    void testFileTransferCompletesInSingleOperation(boolean offsetAligned, int onReadyThreshold, int fileTransferChunkSize) throws Exception {
        ByteBuffer[] byteBuffers = NettyStreamChannelTest.generateByteBuffers(3, 8, 12);
        try (FileRecords fileRecords = this.createFileRecords(byteBuffers, offsetAligned);){
            byte[] expectedOutput = NettyStreamChannelTest.readFileToByteArray(fileRecords.file());
            this.send = fileRecords.toSend();
            Assertions.assertTrue((this.send.size() < (long)onReadyThreshold ? 1 : 0) != 0);
            this.testCompleteWrite(onReadyThreshold, fileTransferChunkSize, 1, expectedOutput);
        }
    }

    @ParameterizedTest
    @CsvSource(value={"true, 256, 128", "false, 256, 128", "true, 256, 256", "false, 256, 256", "true, 512, 128", "false, 512, 128", "true, 1024, 128", "false, 1024, 128"})
    void testFileTransferRequiresMultipleOperations(boolean offsetAligned, int onReadyThreshold, int fileTransferChunkSize) throws Exception {
        ByteBuffer[] byteBuffers = NettyStreamChannelTest.generateByteBuffers(6, fileTransferChunkSize, fileTransferChunkSize + 1);
        try (FileRecords fileRecords = this.createFileRecords(byteBuffers, offsetAligned);){
            byte[] expectedOutput = NettyStreamChannelTest.readFileToByteArray(fileRecords.file());
            this.send = fileRecords.toSend();
            Assertions.assertTrue((this.send.size() > (long)onReadyThreshold ? 1 : 0) != 0);
            this.testWriteMultipleTimes(onReadyThreshold, fileTransferChunkSize, 1, expectedOutput);
        }
    }

    @ParameterizedTest
    @CsvSource(value={"256, 64", "256, 128", "256, 256", "512, 128", "1024, 128"})
    void testByteBufferWriteCompletesInSingleCall(int onReadyThreshold, int byteBufferTransferChunkSize) throws Exception {
        ByteBuffer[] byteBuffers = NettyStreamChannelTest.generateByteBuffers(4, 60, 64);
        byte[] expectedOutput = NettyStreamChannelTest.toByteArray(byteBuffers);
        this.send = new ByteBufferSend(byteBuffers);
        Assertions.assertTrue((this.send.size() < (long)onReadyThreshold ? 1 : 0) != 0);
        this.testCompleteWrite(onReadyThreshold, 1, byteBufferTransferChunkSize, expectedOutput);
    }

    @ParameterizedTest
    @CsvSource(value={"256, 64", "256, 128", "256, 256", "512, 128", "1024, 128"})
    void testByteBufferWriteRequiresMultipleSendCalls(int onReadyThreshold, int byteBufferTransferChunkSize) throws Exception {
        ByteBuffer[] byteBuffers = NettyStreamChannelTest.generateByteBuffers(10, byteBufferTransferChunkSize, byteBufferTransferChunkSize + 1);
        byte[] expectedOutput = NettyStreamChannelTest.toByteArray(byteBuffers);
        this.send = new ByteBufferSend(byteBuffers);
        Assertions.assertTrue((this.send.size() > (long)onReadyThreshold ? 1 : 0) != 0);
        this.testWriteMultipleTimes(onReadyThreshold, 1, byteBufferTransferChunkSize, expectedOutput);
    }

    private void testCompleteWrite(int onReadyThreshold, int fileTransferChunkSize, int byteBufferTransferChunkSize, byte[] expectedOutput) throws IOException {
        this.testNettyStream = new TestNettyStream(onReadyThreshold, this.streamHandler, new LogContext(), fileTransferChunkSize, byteBufferTransferChunkSize);
        this.send.writeTo((TransferableChannel)this.testNettyStream);
        Assertions.assertTrue((this.testNettyStream.flushCount() > 0 ? 1 : 0) != 0, (String)"flushCount should be greater than 0");
        this.testNettyStream.peerReceivedAllPendingWrites();
        this.verifySendComplete(this.send, fileTransferChunkSize, byteBufferTransferChunkSize, expectedOutput, false);
    }

    private void testWriteMultipleTimes(int onReadyThreshold, int fileTransferChunkSize, int byteBufferTransferChunkSize, byte[] expectedOutput) throws IOException {
        this.testNettyStream = new TestNettyStream(onReadyThreshold, this.streamHandler, new LogContext(), fileTransferChunkSize, byteBufferTransferChunkSize);
        this.send.writeTo((TransferableChannel)this.testNettyStream);
        Assertions.assertEquals((int)0, (int)this.testNettyStream.flushCount(), (String)"flushCount should be 0");
        this.verifySendIncomplete(this.send, onReadyThreshold, fileTransferChunkSize, byteBufferTransferChunkSize);
        this.send.writeTo((TransferableChannel)this.testNettyStream);
        Assertions.assertEquals((int)0, (int)this.testNettyStream.flushCount(), (String)"flushCount should be 0");
        this.verifySendIncomplete(this.send, onReadyThreshold, fileTransferChunkSize, byteBufferTransferChunkSize);
        do {
            this.testNettyStream.flush();
            this.testNettyStream.peerReceivedAllPendingWrites();
            this.send.writeTo((TransferableChannel)this.testNettyStream);
        } while (!this.send.completed());
        this.testNettyStream.peerReceivedAllPendingWrites();
        this.verifySendComplete(this.send, fileTransferChunkSize, byteBufferTransferChunkSize, expectedOutput, true);
    }

    @Test
    void testIsOpen() throws IOException {
        EventLoop mockEventLoop = (EventLoop)Mockito.mock(EventLoop.class);
        Mockito.when((Object)this.mockChannel.eventLoop()).thenReturn((Object)mockEventLoop);
        Mockito.when((Object)mockEventLoop.inEventLoop()).thenReturn((Object)false);
        Mockito.when((Object)this.mockChannel.config()).thenReturn((Object)((ChannelConfig)Mockito.mock(ChannelConfig.class)));
        try (NettyRawBytesStream streamChannel = new NettyRawBytesStream(this.mockChannel, new LogContext());){
            Mockito.when((Object)this.mockChannel.isOpen()).thenReturn((Object)true).thenReturn((Object)false);
            Assertions.assertTrue((boolean)streamChannel.isOpen());
            Assertions.assertFalse((boolean)streamChannel.isOpen());
            ((Channel)Mockito.verify((Object)this.mockChannel, (VerificationMode)Mockito.times((int)2))).isOpen();
        }
    }

    @Test
    void testClose() throws IOException {
        EventLoop mockEventLoop = (EventLoop)Mockito.mock(EventLoop.class);
        Mockito.when((Object)this.mockChannel.eventLoop()).thenReturn((Object)mockEventLoop);
        Mockito.when((Object)mockEventLoop.inEventLoop()).thenReturn((Object)false);
        Mockito.when((Object)this.mockChannel.config()).thenReturn((Object)((ChannelConfig)Mockito.mock(ChannelConfig.class)));
        NettyRawBytesStream streamChannel = new NettyRawBytesStream(this.mockChannel, new LogContext());
        streamChannel.close();
        ((Channel)Mockito.verify((Object)this.mockChannel)).close();
    }

    @Test
    void testCloseFailsIfInEventLoop() {
        EventLoop mockEventLoop = (EventLoop)Mockito.mock(EventLoop.class);
        Mockito.when((Object)this.mockChannel.eventLoop()).thenReturn((Object)mockEventLoop);
        Mockito.when((Object)mockEventLoop.inEventLoop()).thenReturn((Object)true);
        Mockito.when((Object)this.mockChannel.config()).thenReturn((Object)((ChannelConfig)Mockito.mock(ChannelConfig.class)));
        NettyRawBytesStream streamChannel = new NettyRawBytesStream(this.mockChannel, new LogContext());
        Assertions.assertThrows(IllegalThreadStateException.class, () -> NettyStreamChannelTest.lambda$testCloseFailsIfInEventLoop$0((NettyStreamChannel)streamChannel));
    }

    private void verifySendComplete(Send send, int fileTransferChunkSize, int byteBufferTransferChunkSize, byte[] expectedOutput, boolean expectedReadyNotification) {
        Assertions.assertTrue((boolean)send.completed());
        if (expectedReadyNotification) {
            ((NettyStream.StreamHandler)Mockito.verify((Object)this.streamHandler, (VerificationMode)Mockito.atLeastOnce())).handleReadyForSend();
        } else {
            ((NettyStream.StreamHandler)Mockito.verify((Object)this.streamHandler, (VerificationMode)Mockito.never())).handleReadyForSend();
        }
        int chunkSize = send instanceof ByteBufferSend ? byteBufferTransferChunkSize : fileTransferChunkSize;
        Assertions.assertTrue((this.testNettyStream.flushCount() > 0 ? 1 : 0) != 0, (String)"flushCount should be greater than 0");
        ((NettyStream.StreamHandler)Mockito.verify((Object)this.streamHandler, (VerificationMode)Mockito.never())).handleException((Throwable)ArgumentMatchers.any());
        Assertions.assertEquals((double)Math.ceil((double)send.size() / (double)chunkSize), (double)this.testNettyStream.peerReceives().size());
        Assertions.assertArrayEquals((byte[])expectedOutput, (byte[])NettyStreamChannelTest.toByteArray(this.testNettyStream.peerReceives()));
        Assertions.assertTrue((boolean)this.testNettyStream.pendingWrites().isEmpty());
    }

    private void verifySendIncomplete(Send send, int onReadyThreshold, int fileTransferChunkSize, int byteBufferTransferChunkSize) {
        Assertions.assertFalse((boolean)send.completed());
        int chunkSize = send instanceof ByteBufferSend ? byteBufferTransferChunkSize : fileTransferChunkSize;
        ((NettyStream.StreamHandler)Mockito.verify((Object)this.streamHandler, (VerificationMode)Mockito.never())).handleException((Throwable)ArgumentMatchers.any());
        ((NettyStream.StreamHandler)Mockito.verify((Object)this.streamHandler, (VerificationMode)Mockito.never())).handleReadyForSend();
        Assertions.assertEquals((int)0, (int)this.testNettyStream.flushCount());
        Assertions.assertTrue((boolean)this.testNettyStream.peerReceives().isEmpty(), (String)"pendingSends should be empty");
        Assertions.assertTrue(((double)this.testNettyStream.pendingWrites().size() >= (double)onReadyThreshold / (double)chunkSize ? 1 : 0) != 0);
    }

    private static ByteBuffer[] generateByteBuffers(int length, int minSizeInclusive, int maxSizeExclusive) {
        ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>(length);
        for (int i = 0; i < length; ++i) {
            buffers.add(NettyStreamChannelTest.generateByteBuffer(RANDOM.nextInt(minSizeInclusive, maxSizeExclusive)));
        }
        return buffers.toArray(new ByteBuffer[0]);
    }

    private static byte[] toByteArray(Collection<ByteBuf> byteBufs) {
        CompositeByteBuf byteBuf = new CompositeByteBuf(ByteBufAllocator.DEFAULT, false, Integer.MAX_VALUE, byteBufs);
        int totalSize = byteBuf.readableBytes();
        byte[] byteArray = new byte[totalSize];
        byteBuf.readBytes(byteArray);
        return byteArray;
    }

    private static byte[] toByteArray(ByteBuffer[] buffers) {
        int totalSize = Arrays.stream(buffers).mapToInt(Buffer::remaining).sum();
        byte[] byteArray = new byte[totalSize];
        int offset = 0;
        for (ByteBuffer buffer : buffers) {
            int remaining = buffer.remaining();
            buffer.mark();
            buffer.get(byteArray, offset, remaining);
            buffer.reset();
            offset += remaining;
        }
        return byteArray;
    }

    private static ByteBuffer generateByteBuffer(int size) {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        for (int i = 0; i < size; ++i) {
            buffer.put((byte)RANDOM.nextInt(256));
        }
        buffer.flip();
        return buffer;
    }

    private FileRecords createFileRecords(ByteBuffer[] values, boolean offsetAligned) throws IOException {
        FileRecords fileRecords = FileRecords.open((File)TestUtils.tempFile());
        if (offsetAligned) {
            this.append(fileRecords, values);
        } else {
            this.appendUnaligned(fileRecords, values);
        }
        return fileRecords;
    }

    private void append(FileRecords fileRecords, ByteBuffer[] values) throws IOException {
        long offset = 0L;
        for (ByteBuffer value : values) {
            ByteBuffer buffer = ByteBuffer.allocate(128);
            try (MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)buffer, (byte)2, (Compression)Compression.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)offset);){
                builder.appendWithOffset(offset++, System.currentTimeMillis(), null, value);
                fileRecords.append(builder.build());
            }
        }
        fileRecords.flush();
    }

    private void appendUnaligned(FileRecords fileRecords, ByteBuffer[] values) throws IOException {
        for (ByteBuffer value : values) {
            fileRecords.append(MemoryRecords.withRecords((Compression)Compression.NONE, (SimpleRecord[])new SimpleRecord[]{new SimpleRecord(value)}));
        }
        fileRecords.flush();
    }

    public static byte[] readFileToByteArray(File file) throws IOException {
        Path path = file.toPath();
        return Files.readAllBytes(path);
    }

    private static /* synthetic */ void lambda$testCloseFailsIfInEventLoop$0(NettyStreamChannel streamChannel) throws Throwable {
        streamChannel.close();
    }
}

