package io.confluent.kafka.server.plugins.ssl;

import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.ProxyProtocol;
import org.apache.kafka.common.network.ProxyProtocolEngineFactory;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.SslChannelBuilder;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.DefaultRequestCallbackManager;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.apache.kafka.common.security.ssl.NettySslEngineFactory;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:io/confluent/kafka/server/plugins/ssl/ConfluentTrustManagerInterBrokerTest.class */
/**
 * Exercises SSL handshakes between a {@link NioEchoServer} and a client {@link Selector} when
 * both sides are configured with the {@code ConfluentTls} trust manager algorithm and a list of
 * {@code confluent.ccloud.host.suffixes}.
 *
 * <p>Each test is run once per SSL engine factory class listed in its {@code @ValueSource}.
 */
public class ConfluentTrustManagerInterBrokerTest {
    private static final int BUFFER_SIZE = 4096;
    private static final Time TIME = Time.SYSTEM;
    private NioEchoServer server;
    private Selector selector;
    private Map<String, Object> sslClientConfigs;
    private Map<String, Object> sslServerConfigs;

    /**
     * Builds the server and client SSL configs, starts the echo server and creates the client
     * selector.
     *
     * @param clientHost name passed (twice) to the client-side {@link CertStores}
     * @param serverHost name passed (twice) to the server-side {@link CertStores}
     * @param hostSuffixes value stored under {@code confluent.ccloud.host.suffixes} on both sides
     * @param sslEngineFactoryClass value stored under {@code ssl.engine.factory.class} on the server
     * @throws Exception if cert store creation, server start-up or channel builder configuration fails
     */
    private void setupInterBroker(String clientHost, String serverHost, String hostSuffixes,
                                  Class<?> sslEngineFactoryClass) throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SSL;
        String trustProviderCreator = ConfluentTrustProviderCreator.class.getName();

        // Statement order (server stores, clear, client stores) is kept exactly as in the original.
        CertStores serverCertStores = new CertStores(true, serverHost, serverHost);
        SecurityTestUtils.clearSecurityConfigs();
        CertStores clientCertStores = new CertStores(false, clientHost, clientHost);
        this.sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
        this.sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);

        this.sslServerConfigs.put("security.providers", trustProviderCreator);
        this.sslServerConfigs.put("ssl.trustmanager.algorithm", "ConfluentTls");
        this.sslServerConfigs.put("ssl.engine.factory.class", sslEngineFactoryClass);
        this.sslServerConfigs.put("ssl.client.auth", "required");
        this.sslServerConfigs.put("confluent.ccloud.host.suffixes", hostSuffixes);

        this.sslClientConfigs.put("security.providers", trustProviderCreator);
        this.sslClientConfigs.put("ssl.trustmanager.algorithm", "ConfluentTls");
        this.sslClientConfigs.put("ssl.endpoint.identification.algorithm", "");
        this.sslClientConfigs.put("confluent.ccloud.host.suffixes", hostSuffixes);

        this.sslServerConfigs.putAll(serverCertStores.keyStoreProps());
        this.sslServerConfigs.putAll(serverCertStores.trustStoreProps());
        // NOTE(review): the client config is overlaid with the *server* store's key/trust store
        // properties, not the client store's. This may be deliberate for the inter-broker
        // scenario or a copy-paste slip -- confirm before changing; behavior is preserved here.
        this.sslClientConfigs.putAll(serverCertStores.keyStoreProps());
        this.sslClientConfigs.putAll(serverCertStores.trustStoreProps());

        TestSecurityConfig serverSecurityConfig = new TestSecurityConfig(this.sslServerConfigs);
        ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
        this.server = new NioEchoServer(
                listenerName,
                securityProtocol,
                serverSecurityConfig,
                MultiTenantRequestContextTest.LOCALHOST,
                ChannelBuilders.serverChannelBuilder(
                        listenerName,
                        true,
                        securityProtocol,
                        serverSecurityConfig,
                        (CredentialCache) null,
                        (DelegationTokenCache) null,
                        TIME,
                        new LogContext(),
                        defaultApiVersionsSupplier(),
                        new DefaultRequestCallbackManager()),
                (CredentialCache) null,
                TIME);
        this.server.start();

        LogContext clientLogContext = new LogContext();
        SslChannelBuilder clientChannelBuilder = new SslChannelBuilder(
                ConnectionMode.CLIENT,
                (ListenerName) null,
                false,
                clientLogContext,
                new ProxyProtocolEngineFactory(ProxyProtocol.NONE));
        clientChannelBuilder.configure(this.sslClientConfigs);
        this.selector = new Selector(5000L, new Metrics(), TIME, "MetricGroup", clientChannelBuilder, clientLogContext);
    }

    /**
     * Returns a supplier that ignores its argument and yields the Confluent Cloud
     * {@link ApiVersionsResponse} for the {@code ZK_BROKER} listener type.
     */
    private Function<Short, ApiVersionsResponse> defaultApiVersionsSupplier() {
        return ignoredVersion ->
                TestUtils.confluentCloudApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
    }

    /**
     * Clears the security configs and closes the selector and the server if they were created.
     *
     * @throws Exception if closing either resource fails
     */
    @AfterEach
    public void teardown() throws Exception {
        SecurityTestUtils.clearSecurityConfigs();
        try {
            if (this.selector != null) {
                this.selector.close();
            }
        } finally {
            // Close the server even when closing the selector throws, so it is never leaked.
            if (this.server != null) {
                this.server.close();
            }
        }
    }

    /**
     * Connects the client selector to the server, waits for the channel to become ready and then
     * verifies the server's authentication metrics.
     *
     * @throws IOException if the connection attempt fails
     * @throws InterruptedException if interrupted while waiting or verifying
     */
    private void checkAuthenticationSucceed() throws IOException, InterruptedException {
        String node = "0";
        InetSocketAddress serverAddress =
                new InetSocketAddress(MultiTenantRequestContextTest.LOCALHOST, this.server.port());
        this.selector.connect(node, serverAddress, BUFFER_SIZE, BUFFER_SIZE);
        NetworkTestUtils.waitForChannelReady(this.selector, node);
        // Presumably (successful, failed) authentication counts -- confirm against NioEchoServer.
        this.server.verifyAuthenticationMetrics(1, 0);
    }

    /**
     * Client and server host names both end in one of the configured host suffixes; the
     * connection is expected to become ready.
     */
    @ValueSource(classes = {DefaultSslEngineFactory.class, NettySslEngineFactory.class})
    @ParameterizedTest(name = "{displayName}.sslEngineFactoryClass={0}")
    public void testSuccessfulHandshakeWithInterBroker(Class<?> sslEngineFactoryClass) throws Exception {
        setupInterBroker(
                "client1.us-west-1.aws.confluent.testhost1",
                "server1.us-west-1.aws.confluent.testhost2",
                "testhost1,testhost2",
                sslEngineFactoryClass);
        checkAuthenticationSucceed();
    }

    /**
     * Neither host name ends in a configured host suffix; the setup itself is expected to throw
     * a {@link KafkaException}.
     */
    @ValueSource(classes = {DefaultSslEngineFactory.class, NettySslEngineFactory.class})
    @ParameterizedTest(name = "{displayName}.sslEngineFactoryClass={0}")
    public void testHandshakeFailureWithInterBroker(Class<?> sslEngineFactoryClass) throws Exception {
        Assertions.assertThrows(KafkaException.class, () ->
                setupInterBroker(
                        "client1.us-west-1.aws.confluent.testhost3",
                        "server1.us-west-1.aws.confluent.testhost4",
                        "testhost1,testhost2",
                        sslEngineFactoryClass));
    }
}
