package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.DefaultRequestCallbackManager;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/common/network/PlaintextTransportLayerTest.class */
public class PlaintextTransportLayerTest {
    // Value passed for both buffer-size arguments of Selector.connect in every test.
    private static final int BUFFER_SIZE = 4096;
    // Clock handed to the echo server, the server channel builder and the client selectors.
    private static final Time TIME = Time.SYSTEM;
    // Header string built by NetworkTestUtils.toTcp4ProxyV1Header; the V1 test sends it and then expects
    // the server channels to report 200.200.200.200 (presumably the header's source address; confirm in the helper).
    private static final String V1_PROXY_PROTOCOL_HEADER = NetworkTestUtils.toTcp4ProxyV1Header("200.200.200.200", "201.201.201.201", 8888, 9092);
    // Server under test; created by each test and closed in tearDown when non-null.
    private NioEchoServer server;
    // Client-side selector; closed in tearDown when non-null.
    private Selector selector;
    // Config definition used by the PROXY protocol tests: declares the protocol version key, the fallback
    // switch, and the internal SNI host name parsing flag (default false).
    private final ConfigDef proxyConfigDef = new ConfigDef().define("confluent.proxy.protocol.version", ConfigDef.Type.STRING, ConfluentConfigs.PROXY_PROTOCOL_VERSION_DEFAULT, ConfigDef.Importance.LOW, "The version of the PROXY protocol that the broker will use, or NONE if the PROXY protocol will not be used. This value can be overridden for each listener by prefixing the config with the normalized listener name. For example, to enable V1 of the protocol for the EXTERNAL listener, set <code>listener.name.external.confluent.proxy.protocol.version=V1</code>. Because the inter-broker listener and any admin listeners will likely not be accessed through a proxy, the typical configuration would be to use the default value of NONE and enable the protocol explicitly for a subset of listeners. See http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt for more information on the PROXY protocol.").define("confluent.proxy.protocol.fallback.enabled", ConfigDef.Type.BOOLEAN, ConfluentConfigs.PROXY_PROTOCOL_FALLBACK_ENABLED_DEFAULT, ConfigDef.Importance.LOW, "If true, and if <code>confluent.proxy.protocol.version</code> is not set to <code>NONE</code>, the network layer will accept both PROXY and non-PROXY protocol requests. If the PROXY header is present, it will parse it and interpret the specified client IP address and port appropriately; if not, it will fall back to a normal, non-PROXY request. This value can be overridden for each listener by prefixing the config with the normalized listener name.").defineInternal("confluent.multitenant.parse.sni.host.name.enable", ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, "When enabled, parse the SNI host name upon SASL_SSL connection establishment.");

    /**
     * Closes the echo server and the client selector left behind by a test, if any.
     *
     * <p>The selector is closed in a {@code finally} block so that a failure while closing the
     * server cannot leave the selector open.
     *
     * @throws IOException if closing the server or the selector fails
     * @throws InterruptedException if closing the server is interrupted
     */
    @AfterEach
    public void tearDown() throws IOException, InterruptedException {
        try {
            if (this.server != null) {
                this.server.close();
            }
        } finally {
            // Must run even when the server close throws; otherwise the selector leaks.
            if (this.selector != null) {
                this.selector.close();
            }
        }
    }

    /**
     * Starts a plaintext echo server whose channel builder receives a
     * {@link ProxyProtocolEngineFactory} supplying {@link TestProxyProtocolEngine} instances,
     * runs a client connection check against it, and asserts that every server-side channel
     * reports the address given to the engine.
     */
    @Test
    public void testWithProxyProtocolEngine() throws Exception {
        SecurityProtocol protocol = SecurityProtocol.PLAINTEXT;
        AbstractConfig config = new AbstractConfig(new ConfigDef(), Collections.emptyMap());
        ListenerName listener = ListenerName.forSecurityProtocol(protocol);
        InetAddress proxiedAddress = InetAddress.getByName("192.168.5.5");

        this.server = new NioEchoServer(
            listener,
            protocol,
            config,
            "localhost",
            ChannelBuilders.serverChannelBuilder(
                listener,
                false,
                protocol,
                config,
                (CredentialCache) null,
                (DelegationTokenCache) null,
                TIME,
                new LogContext(),
                () -> TestUtils.confluentCloudApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER),
                new DefaultRequestCallbackManager(),
                new ProxyProtocolEngineFactory(() -> new TestProxyProtocolEngine(proxiedAddress, 31313))),
            null,
            TIME);
        this.server.start();

        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
        this.selector = createSelector(config.originals());
        this.selector.connect(KafkaChannelTest.CHANNEL_ID, serverAddress, BUFFER_SIZE, BUFFER_SIZE);
        NetworkTestUtils.checkClientConnection(this.selector, KafkaChannelTest.CHANNEL_ID, 100, 10);

        // The server must expose the engine-provided address rather than the real peer address.
        verifyServerProxyAddress(this.server, proxiedAddress);
    }

    /**
     * Configures the server with {@code confluent.proxy.protocol.version=V1}, then performs three
     * consecutive client connections that each pass {@link #V1_PROXY_PROTOCOL_HEADER} to the
     * connection check. After every connection the server-side channels must report
     * {@code 200.200.200.200}.
     *
     * <p>Each iteration closes the selector from the previous iteration before opening a new
     * one, so at most one client selector is open at a time. The last one is closed by
     * {@link #tearDown()}.
     */
    @Test
    public void testWithProxyProtocolV1Engine() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        AbstractConfig config = new AbstractConfig(this.proxyConfigDef, Collections.singletonMap("confluent.proxy.protocol.version", "V1"));
        ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
        this.server = new NioEchoServer(
            listenerName,
            securityProtocol,
            config,
            "localhost",
            ChannelBuilders.serverChannelBuilder(
                listenerName,
                false,
                securityProtocol,
                config,
                (CredentialCache) null,
                (DelegationTokenCache) null,
                TIME,
                new LogContext(),
                () -> TestUtils.confluentCloudApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER),
                new DefaultRequestCallbackManager()),
            null,
            TIME);
        this.server.start();

        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
        InetAddress expectedClientAddress = InetAddress.getByName("200.200.200.200");
        for (int attempt = 0; attempt < 3; attempt++) {
            // Release the previous iteration's selector; tearDown only closes the last one assigned.
            if (this.selector != null) {
                this.selector.close();
            }
            this.selector = createSelector(config.originals());
            this.selector.connect(KafkaChannelTest.CHANNEL_ID, serverAddress, BUFFER_SIZE, BUFFER_SIZE);
            NetworkTestUtils.checkClientConnection(this.selector, KafkaChannelTest.CHANNEL_ID, 100, 10, V1_PROXY_PROTOCOL_HEADER);
            verifyServerProxyAddress(this.server, expectedClientAddress);
        }
    }

    /**
     * Configures the server with PROXY protocol V1 and
     * {@code confluent.proxy.protocol.fallback.enabled=true}, then runs three client connection
     * checks that do not pass a PROXY header. Each check must succeed.
     *
     * <p>Every client selector is stored in {@link #selector}, and the previous one is closed
     * before the next is created, so no selector outlives the test. The last one is closed by
     * {@link #tearDown()}, including when an assertion fails mid-loop.
     */
    @Test
    public void testProxyProtocolWithFallbackEnabled() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        Map<String, Object> configs = new HashMap<>();
        configs.put("confluent.proxy.protocol.version", "V1");
        configs.put("confluent.proxy.protocol.fallback.enabled", true);
        AbstractConfig config = new AbstractConfig(this.proxyConfigDef, configs);
        ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
        this.server = new NioEchoServer(
            listenerName,
            securityProtocol,
            config,
            "localhost",
            ChannelBuilders.serverChannelBuilder(
                listenerName,
                false,
                securityProtocol,
                config,
                (CredentialCache) null,
                (DelegationTokenCache) null,
                TIME,
                new LogContext(),
                () -> TestUtils.confluentCloudApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER),
                new DefaultRequestCallbackManager()),
            null,
            TIME);
        this.server.start();

        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
        for (int attempt = 0; attempt < 3; attempt++) {
            // Release the previous iteration's selector; tearDown only closes the last one assigned.
            if (this.selector != null) {
                this.selector.close();
            }
            this.selector = createSelector(config.originals());
            this.selector.connect(KafkaChannelTest.CHANNEL_ID, serverAddress, BUFFER_SIZE, BUFFER_SIZE);
            NetworkTestUtils.checkClientConnection(this.selector, KafkaChannelTest.CHANNEL_ID, 100, 10);
        }
    }

    /**
     * Configures the server with PROXY protocol V1 and leaves the fallback setting at its
     * default, then sends a size-prefixed payload that does not start with a PROXY header. The
     * test passes once the client selector reports exactly one disconnected channel.
     *
     * <p>The client selector is stored in {@link #selector} so that {@link #tearDown()} closes
     * it whether the wait succeeds or times out.
     */
    @Test
    public void testProxyProtocolWithFallbackDisabled() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        Map<String, Object> configs = new HashMap<>();
        configs.put("confluent.proxy.protocol.version", "V1");
        AbstractConfig config = new AbstractConfig(this.proxyConfigDef, configs);
        ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
        this.server = new NioEchoServer(
            listenerName,
            securityProtocol,
            config,
            "localhost",
            ChannelBuilders.serverChannelBuilder(
                listenerName,
                false,
                securityProtocol,
                config,
                (CredentialCache) null,
                (DelegationTokenCache) null,
                TIME,
                new LogContext(),
                () -> TestUtils.confluentCloudApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER),
                new DefaultRequestCallbackManager()),
            null,
            TIME);
        this.server.start();

        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
        this.selector = createSelector(config.originals());
        this.selector.connect(KafkaChannelTest.CHANNEL_ID, serverAddress, BUFFER_SIZE, BUFFER_SIZE);
        NetworkTestUtils.waitForChannelReady(this.selector, KafkaChannelTest.CHANNEL_ID);

        // Ordinary payload with no PROXY header in front of it.
        byte[] payload = (TestUtils.randomString(100) + "-0").getBytes(StandardCharsets.UTF_8);
        this.selector.send(new NetworkSend(KafkaChannelTest.CHANNEL_ID, ByteBufferSend.sizePrefixed(ByteBuffer.wrap(payload))));

        TestUtils.waitForCondition(() -> {
            this.selector.poll(0L);
            return this.selector.disconnected().size() == 1;
        }, "Client should have been disconnected because PROXY header was not sent");
    }

    /**
     * Queues the three bytes {@code "PRO"} on a fresh client connection and closes the client
     * selector straight away. The test then waits until the server holds no channels and
     * asserts that the shared {@link TestProxyProtocolEngine} recorded at most one process call.
     */
    @Test
    public void testDisconnectWhileProcessingHeaders() throws Exception {
        SecurityProtocol protocol = SecurityProtocol.PLAINTEXT;
        AbstractConfig config = new AbstractConfig(this.proxyConfigDef, Collections.singletonMap("confluent.proxy.protocol.version", "V1"));
        ListenerName listener = ListenerName.forSecurityProtocol(protocol);
        // A single engine instance is returned for every channel so its call counter can be inspected below.
        TestProxyProtocolEngine engine = new TestProxyProtocolEngine(InetAddress.getByName("192.168.5.5"), 31313, false);

        this.server = new NioEchoServer(
            listener,
            protocol,
            config,
            "localhost",
            ChannelBuilders.serverChannelBuilder(
                listener,
                false,
                protocol,
                config,
                (CredentialCache) null,
                (DelegationTokenCache) null,
                TIME,
                new LogContext(),
                () -> TestUtils.confluentCloudApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER),
                new DefaultRequestCallbackManager(),
                new ProxyProtocolEngineFactory(() -> engine)),
            null,
            TIME);
        this.server.start();

        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
        this.selector = createSelector(config.originals());
        this.selector.connect(KafkaChannelTest.CHANNEL_ID, serverAddress, BUFFER_SIZE, BUFFER_SIZE);

        // Only a fragment of a PROXY header is queued before the client goes away.
        byte[] partialHeader = "PRO".getBytes(StandardCharsets.US_ASCII);
        ByteBuffer[] buffers = {ByteBuffer.wrap(partialHeader)};
        this.selector.send(new NetworkSend(KafkaChannelTest.CHANNEL_ID, new ByteBufferSend(buffers)));
        this.selector.close();

        TestUtils.waitForCondition(
            () -> this.server.selector().channels().isEmpty(),
            "Server should have closed connection");
        Assertions.assertTrue(engine.processCallCount() <= 1);
    }

    /**
     * Builds a client-mode plaintext {@link Selector} with the PROXY protocol set to
     * {@code NONE}.
     *
     * @param map configuration entries passed to the channel builder's {@code configure} call
     * @return a new selector; the caller is responsible for closing it
     */
    private Selector createSelector(Map<String, Object> map) {
        PlaintextChannelBuilder clientBuilder =
            new PlaintextChannelBuilder(Mode.CLIENT, (ListenerName) null, new ProxyProtocolEngineFactory(ProxyProtocol.NONE));
        clientBuilder.configure(map);
        return new Selector(5000L, new Metrics(), TIME, "MetricGroup", clientBuilder, new LogContext());
    }

    /**
     * Asserts that the server's selector currently has at least one channel and that every
     * channel's {@code socketAddress()} equals the expected address.
     *
     * @param nioEchoServer server whose selector channels are inspected
     * @param inetAddress address each channel is expected to report
     */
    private static void verifyServerProxyAddress(NioEchoServer nioEchoServer, InetAddress inetAddress) {
        Selector serverSelector = nioEchoServer.selector();
        // Guard against a vacuous pass: with no channels the per-channel check would assert nothing.
        Assertions.assertNotEquals(Collections.emptyList(), serverSelector.channels());
        serverSelector.channels().forEach(
            channel -> Assertions.assertEquals(inetAddress, channel.socketAddress()));
    }
}
