package io.confluent.kafka.server.plugins.ssl;

import com.sun.jna.platform.win32.COM.tlb.imp.TlbConst;
import com.sun.jna.platform.win32.WinError;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslClientAuth;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.network.CCloudTrafficType;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.ProxyProtocol;
import org.apache.kafka.common.network.ProxyProtocolEngineFactory;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.apache.kafka.common.security.ssl.NettySslEngineFactory;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.test.ValuelessCallable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:io/confluent/kafka/server/plugins/ssl/ConfluentTrustManagerTransportLayerTest.class */
public class ConfluentTrustManagerTransportLayerTest {
    private static final long CONNECTIONS_MAX_REAUTH_MS_VALUE = 100;
    private static final int BUFFER_SIZE = 4096;
    private NioEchoServer server;
    private String brokerSessionUuId;
    private Selector selector;
    private Map<String, Object> saslClientConfigs;
    private Map<String, Object> saslServerConfigs;
    private CredentialCache credentialCache;
    private String clientCN;
    private static Time time = Time.SYSTEM;
    private static Map<String, Object> pp2ValidateTrafficHeader = Collections.singletonMap(CCloudTrafficType.CCLOUD_TRAFFIC_TYPE_CLIENT_CONFIG, CCloudTrafficType.PL_PUBLIC_IP_NLB);
    private static Map<String, Object> pp2NoValidateTrafficHeader = Collections.singletonMap(CCloudTrafficType.CCLOUD_TRAFFIC_TYPE_CLIENT_CONFIG, CCloudTrafficType.PL_PRIVATE_LINK_NLB);
    private static final Time TIME = Time.SYSTEM;

    private void setupAndRunTest(boolean z, SslClientAuth sslClientAuth, String str, Class<?> cls, Map<String, Object> map, ValuelessCallable valuelessCallable) throws Exception {
        this.brokerSessionUuId = Uuid.randomUuid().toString();
        this.clientCN = str;
        this.credentialCache = new CredentialCache();
        CertStores certStores = new CertStores(true, "localhost", "localhost");
        CertStores certStores2 = new CertStores(false, str, str);
        this.saslServerConfigs = certStores.getTrustingConfig(certStores2);
        this.saslClientConfigs = certStores2.getTrustingConfig(certStores);
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        String configPrefix = ListenerName.forSecurityProtocol(securityProtocol).configPrefix();
        configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
        if (z) {
            this.saslServerConfigs.put(configPrefix + "ssl.client.auth", sslClientAuth.name());
        } else {
            this.saslServerConfigs.put(configPrefix + "ssl.client.auth", SslClientAuth.REQUIRED.name());
        }
        this.saslServerConfigs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, SaslAuthenticatorTest.SaslSslPrincipalBuilder.class.getName());
        this.saslServerConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, cls);
        this.saslServerConfigs.put(ConfluentConfigs.CCLOUD_REVOKED_CERTIFICATE_IDS_CONFIG, " ");
        this.saslServerConfigs.put(ConfluentConfigs.BROKER_SESSION_ID_PROP, this.brokerSessionUuId);
        createClient(z, securityProtocol, map);
        this.server = createEchoServer(z, securityProtocol);
        valuelessCallable.call();
    }

    private NioEchoServer createEchoServer(boolean z, SecurityProtocol securityProtocol) throws Exception {
        this.saslServerConfigs.put("security.providers", ConfluentTrustProviderCreator.class.getName());
        this.saslServerConfigs.put("ssl.trustmanager.algorithm", "ConfluentTls");
        return z ? NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol, new TestSecurityConfig(this.saslServerConfigs), this.credentialCache, 100, time, new DelegationTokenCache(ScramMechanism.mechanismNames()), new ProxyProtocolEngineFactory(ProxyProtocol.V2)) : NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol, new TestSecurityConfig(this.saslServerConfigs), this.credentialCache, TIME);
    }

    private void createClient(boolean z, SecurityProtocol securityProtocol, Map<String, Object> map) {
        ProxyProtocolEngineFactory proxyProtocolEngineFactory;
        LogContext logContext = new LogContext();
        String str = (String) this.saslClientConfigs.get(SaslConfigs.SASL_MECHANISM);
        if (z) {
            HashMap hashMap = new HashMap(this.saslClientConfigs);
            hashMap.put("confluent.proxy.protocol.client.address", "1.1.1.1");
            hashMap.put("confluent.proxy.protocol.client.port", Integer.valueOf(WinError.ERROR_BUS_RESET));
            hashMap.put("confluent.proxy.protocol.client.mode", ConfluentConfigs.PROXY_PROTOCOL_CLIENT_MODE_DEFAULT);
            hashMap.putAll(map);
            proxyProtocolEngineFactory = new ProxyProtocolEngineFactory(ProxyProtocol.V2, hashMap, Mode.CLIENT, logContext);
        } else {
            proxyProtocolEngineFactory = new ProxyProtocolEngineFactory(ProxyProtocol.NONE);
        }
        this.selector = NetworkTestUtils.createSelector(ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, new TestSecurityConfig(this.saslClientConfigs), null, str, time, true, logContext, null, proxyProtocolEngineFactory), time);
    }

    private TestJaasConfig configureMechanisms(String str, List<String> list) {
        this.saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, str);
        this.saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, list);
        this.saslServerConfigs.put(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, 100L);
        return TestJaasConfig.createConfiguration(str, list);
    }

    @AfterEach
    public void teardown() throws Exception {
        closeClientConnectionIfNecessary();
        if (this.server != null) {
            this.server.close();
        }
    }

    private void closeClientConnectionIfNecessary() throws Exception {
        if (this.selector != null) {
            this.selector.close();
            this.selector = null;
        }
    }

    private void checkAuthenticationFailed(boolean z) throws IOException, InterruptedException {
        createClientConnection(TlbConst.TYPELIB_MINOR_VERSION_SHELL);
        Assertions.assertEquals(SslAuthenticationException.class, NetworkTestUtils.waitForChannelClose(this.selector, TlbConst.TYPELIB_MINOR_VERSION_SHELL, ChannelState.State.AUTHENTICATION_FAILED).exception().getClass());
        this.server.verifyAuthenticationMetrics(0, 1);
    }

    private void checkAuthenticationSucceed(boolean z) throws Exception {
        try {
            Assertions.assertEquals(Collections.emptyList(), this.server.selector().channels());
            createClientConnection(TlbConst.TYPELIB_MINOR_VERSION_SHELL);
            NetworkTestUtils.waitForChannelReady(this.selector, TlbConst.TYPELIB_MINOR_VERSION_SHELL);
            if (z) {
                Assertions.assertEquals(SaslAuthenticatorTest.SaslSslPrincipalBuilder.saslSslPrincipal("myuser", "ANONYMOUS"), this.server.selector().channels().get(0).principal());
            } else {
                Assertions.assertEquals(SaslAuthenticatorTest.SaslSslPrincipalBuilder.saslSslPrincipal("myuser", "O=A client,CN=" + this.clientCN), this.server.selector().channels().get(0).principal());
            }
            this.server.verifyAuthenticationMetrics(1, 0);
            NetworkTestUtils.checkClientConnection(this.selector, TlbConst.TYPELIB_MINOR_VERSION_SHELL, 100, 10);
            closeClientConnectionIfNecessary();
            TestUtils.waitForCondition(() -> {
                return this.server.selector().channels().isEmpty();
            }, "Channel not removed after disconnection");
        } catch (Throwable th) {
            closeClientConnectionIfNecessary();
            TestUtils.waitForCondition(() -> {
                return this.server.selector().channels().isEmpty();
            }, "Channel not removed after disconnection");
            throw th;
        }
    }

    private void createClientConnection(String str) throws IOException {
        this.selector.connect(str, new InetSocketAddress("localhost", this.server.port()), 4096, 4096);
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.{0}.{1}.{2}")
    public void testSuccessfulHandshakeWithCCloudClient(boolean z, SslClientAuth sslClientAuth, Class<?> cls) throws Exception {
        setupAndRunTest(z, sslClientAuth, "client1.us-west-1.aws.confluent.cloud", cls, pp2ValidateTrafficHeader, () -> {
            checkAuthenticationSucceed(false);
        });
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.{0}.{1}.{2}")
    public void testHandshakeFailureWithLocalClient(boolean z, SslClientAuth sslClientAuth, Class<?> cls) throws Exception {
        setupAndRunTest(z, sslClientAuth, "localhost", cls, pp2ValidateTrafficHeader, () -> {
            checkAuthenticationFailed(z);
        });
    }

    @Test
    public void testSuccessfulHandshakeWithLocalClientPP2NoValidateMutualTls() throws Exception {
        setupAndRunTest(true, SslClientAuth.REQUESTED, "localhost", DefaultSslEngineFactory.class, pp2NoValidateTrafficHeader, () -> {
            checkAuthenticationSucceed(false);
        });
    }

    @Test
    public void testSuccessfulHandshakeWithLocalClientPP2NoValidateOneWayTls() throws Exception {
        setupAndRunTest(true, SslClientAuth.NONE, "localhost", DefaultSslEngineFactory.class, pp2NoValidateTrafficHeader, () -> {
            checkAuthenticationSucceed(true);
        });
    }

    public static Collection<Object[]> allCombinations() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Object[]{true, SslClientAuth.REQUESTED, DefaultSslEngineFactory.class});
        arrayList.add(new Object[]{true, SslClientAuth.NONE, DefaultSslEngineFactory.class});
        arrayList.add(new Object[]{false, SslClientAuth.NONE, DefaultSslEngineFactory.class});
        arrayList.add(new Object[]{true, SslClientAuth.REQUESTED, NettySslEngineFactory.class});
        arrayList.add(new Object[]{true, SslClientAuth.NONE, NettySslEngineFactory.class});
        arrayList.add(new Object[]{false, SslClientAuth.NONE, NettySslEngineFactory.class});
        return arrayList;
    }
}
