package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/common/network/ProxyProtocolV1EngineTest.class */
/**
 * Tests for {@link ProxyProtocolV1Engine}, driven through the text-based
 * {@code "PROXY ..."} header line.
 *
 * <p>The buffer helpers ({@code putStream}, {@code putStreamAndFlip}) and
 * {@code assertValidEngineState} are inherited from
 * {@link AbstractProxyProtocolEngineTest}; their exact contracts are not visible here.
 * The tests below presumably rely on them returning/leaving the buffer ready for
 * reading -- verify against the base class.
 */
public class ProxyProtocolV1EngineTest extends AbstractProxyProtocolEngineTest {
    /** Config key that the fallback tests set to {@code true} before feeding non-PROXY data. */
    private static final String FALLBACK_ENABLED_CONFIG = "confluent.proxy.protocol.fallback.enabled";

    /** Source (client) address used by the IPv4 headers in these tests. */
    private static final String IPV4_CLIENT_ADDRESS = "200.200.200.200";

    /** Source (client) address used by the IPv6 header in these tests. */
    private static final String IPV6_CLIENT_ADDRESS = "2001:db8:85a3:8d3:1319:8a2e:370:7348";

    /** Source (client) port used by every valid header in these tests. */
    private static final int CLIENT_PORT = 8888;

    /** A complete TCP4 header in a single buffer yields the client address and port. */
    @Test
    public void testProxyProtocolV1EngineBasic() throws IOException {
        ProxyProtocolEngine engine = newServerPPE();
        // State check on a freshly created engine, before any bytes are processed.
        assertValidEngineState(engine, false, false, null, -1, null);

        engine.processHeaders(putStream("PROXY TCP4 200.200.200.200 201.201.201.201 8888 9092\r\n"));

        Assertions.assertTrue(engine.hasClientInformation());
        Assertions.assertTrue(engine.ready());
        // JUnit convention: expected value first, actual value second.
        Assertions.assertEquals(InetAddress.getByName(IPV4_CLIENT_ADDRESS), engine.clientAddress());
        Assertions.assertEquals(CLIENT_PORT, engine.clientPort());
    }

    /** A complete TCP6 header in a single buffer yields the IPv6 client address and port. */
    @Test
    public void testProxyProtocolV1EngineIpv6() throws IOException {
        ProxyProtocolEngine engine = newServerPPE();
        // State check on a freshly created engine, before any bytes are processed.
        assertValidEngineState(engine, false, false, null, -1, null);

        engine.processHeaders(putStream("PROXY TCP6 2001:db8:85a3:8d3:1319:8a2e:370:7348 2001:db8:85a3:8d3:1319:8a2e:370:7349 8888 9092\r\n"));

        Assertions.assertTrue(engine.hasClientInformation());
        Assertions.assertTrue(engine.ready());
        Assertions.assertEquals(InetAddress.getByName(IPV6_CLIENT_ADDRESS), engine.clientAddress());
        Assertions.assertEquals(CLIENT_PORT, engine.clientPort());
    }

    /**
     * A header delivered in three fragments is only reported as ready once the
     * terminating CRLF has arrived; the split deliberately cuts through the
     * {@code PROXY} keyword and the source port.
     */
    @Test
    public void testProxyProtocolV1EngineWithMultipleSegments() throws IOException {
        ProxyProtocolEngine engine = newServerPPE();
        ByteBuffer buffer = ByteBuffer.allocate(1000);

        // Fragment 1: only part of the "PROXY" keyword.
        putStreamAndFlip(buffer, "PRO");
        engine.processHeaders(buffer);
        // compact() keeps any unread bytes and switches the buffer back to write mode.
        buffer.compact();
        Assertions.assertFalse(engine.hasClientInformation());
        Assertions.assertFalse(engine.ready());

        // Fragment 2: ends in the middle of the source port.
        putStreamAndFlip(buffer, "XY TCP4 200.200.200.200 201.201.201.201 88");
        engine.processHeaders(buffer);
        buffer.compact();
        Assertions.assertFalse(engine.hasClientInformation());
        Assertions.assertFalse(engine.ready());

        // Fragment 3: rest of the source port, destination port and the CRLF terminator.
        putStreamAndFlip(buffer, "88 9092\r\n");
        engine.processHeaders(buffer);
        buffer.compact();
        Assertions.assertTrue(engine.hasClientInformation());
        Assertions.assertTrue(engine.ready());
        Assertions.assertEquals(InetAddress.getByName(IPV4_CLIENT_ADDRESS), engine.clientAddress());
        Assertions.assertEquals(CLIENT_PORT, engine.clientPort());
    }

    /** Payload bytes that follow the header in the same buffer are left unread for the caller. */
    @Test
    public void TestProxyProtocolV1EngineVerifyRemaining() throws IOException {
        ProxyProtocolEngine engine = newServerPPE();
        ByteBuffer buffer = putStream("PROXY TCP4 200.200.200.200 201.201.201.201 8888 9092\r\nhello");

        engine.processHeaders(buffer);

        Assertions.assertTrue(engine.hasClientInformation());
        Assertions.assertTrue(engine.ready());
        Assertions.assertEquals(InetAddress.getByName(IPV4_CLIENT_ADDRESS), engine.clientAddress());
        Assertions.assertEquals(CLIENT_PORT, engine.clientPort());
        // 5 == "hello".length(): only the trailing payload may remain in the buffer.
        Assertions.assertEquals(5, buffer.remaining());
    }

    /**
     * With fallback enabled, non-PROXY input makes the engine ready without client
     * information, and none of the input bytes are consumed.
     */
    @Test
    public void testNonProxyHeaderFallbackEnabled() throws IOException {
        ProxyProtocolEngine engine = newServerPPE();
        engine.configure(Collections.singletonMap(FALLBACK_ENABLED_CONFIG, true));
        ByteBuffer buffer = putStream("NOT A PROXY HEADER");

        engine.processHeaders(buffer);
        buffer.compact();

        Assertions.assertFalse(engine.hasClientInformation());
        Assertions.assertTrue(engine.ready());
        // compact() + flip() exposes exactly the bytes the engine left unread.
        buffer.flip();
        // 18 == "NOT A PROXY HEADER".length(): the whole input is still available.
        Assertions.assertEquals(18, buffer.remaining());
    }

    /**
     * With fallback enabled, the engine is not yet ready after a 3-byte first fragment;
     * after the second fragment it is ready without client information and all 30
     * accumulated bytes remain unread.
     */
    @Test
    public void testNonProxyHeaderWithMultipleSegments() throws IOException {
        ProxyProtocolEngine engine = newServerPPE();
        engine.configure(Collections.singletonMap(FALLBACK_ENABLED_CONFIG, true));
        ByteBuffer buffer = ByteBuffer.allocate(1000);

        // Fragment 1: three bytes, after which the engine is expected to still be undecided.
        putStreamAndFlip(buffer, "NOT");
        engine.processHeaders(buffer);
        buffer.compact();
        Assertions.assertFalse(engine.hasClientInformation());
        Assertions.assertFalse(engine.ready());

        // Fragment 2: the remainder of the non-PROXY input.
        putStreamAndFlip(buffer, " ENOUGH BYTES IN FIRST CALL");
        engine.processHeaders(buffer);
        buffer.compact();
        Assertions.assertFalse(engine.hasClientInformation());
        Assertions.assertTrue(engine.ready());
        buffer.flip();
        // 30 == 3 + 27: both fragments are still in the buffer, untouched.
        Assertions.assertEquals(30, buffer.remaining());
    }

    /** Creates the server-mode V1 engine used by the tests in this class. */
    @Override // org.apache.kafka.common.network.AbstractProxyProtocolEngineTest
    protected ProxyProtocolEngine newServerPPE() {
        return new ProxyProtocolV1Engine(ConnectionMode.SERVER, new LogContext());
    }

    /** Creates a client-mode V1 engine. */
    @Override // org.apache.kafka.common.network.AbstractProxyProtocolEngineTest
    protected ProxyProtocolEngine newClientPPE() {
        return new ProxyProtocolV1Engine(ConnectionMode.CLIENT, new LogContext());
    }

    /**
     * Encodes the given header text as US-ASCII bytes.
     *
     * @param str header text
     * @return the US-ASCII encoding of {@code str}
     */
    @Override // org.apache.kafka.common.network.AbstractProxyProtocolEngineTest
    protected byte[] generateHeader(String str) {
        return str.getBytes(StandardCharsets.US_ASCII);
    }

    /**
     * Returns header lines that are malformed for the V1 engine; how they are exercised
     * is decided by the base class (not visible here).
     *
     * @return a fixed-size list of invalid header strings
     */
    @Override // org.apache.kafka.common.network.AbstractProxyProtocolEngineTest
    protected List<String> getInvalidProxyHeaders() {
        return Arrays.asList(
                // No addresses or ports at all.
                "PROXY TCP4 \r\n",
                // Only one address and one port.
                "PROXY TCP4 201.201.201.201 8888\r\n",
                // Terminated by "\r\r" instead of "\r\n".
                "PROXY TCP4 200.200.200.200 201.201.201.201 8888 9092\r\r",
                // "SMTP" in the protocol field.
                "PROXY SMTP 200.200.200.200 201.201.201.201 8888 9092\r\n",
                // "TCP" without a 4/6 suffix, and a host name in place of the source address.
                "PROXY TCP hello.world 201.201.201.201 8888 9092\r\n",
                // Long run of padding spaces and a bare "\n" terminator.
                "PROXY TCP4                                                                                         200.200.200.200 201.201.201.201 8888 9092\n",
                // Does not start with the "PROXY" keyword.
                "NOT A PROXY HEADER\n");
    }
}
