package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.link.integration.MultiTenantClusterLinkTest;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import java.io.File;
import java.io.IOException;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestInfoUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/SslCertificateIntegrationTest.class */
public class SslCertificateIntegrationTest {
    private static final Long TEST_CACHE_RELOAD_DELAY_MS = Long.valueOf(TimeUnit.SECONDS.toMillis(5));
    private static final String SSL_CERTS_DIR = "mnt/sslcerts/";
    private Path tempDir;
    private File validCertDir;
    private File invalidCertDir;
    private IntegrationTestHarness testHarness;
    private String brokerUUID;
    private LogicalCluster logicalCluster1;
    private LogicalCluster logicalCluster2;
    private Properties clientProps;
    protected Time time;

    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        this.tempDir = TestUtils.tempDirectory().toPath();
        MockAuditLogProvider.reset();
        this.validCertDir = TestUtils.tempDirectory(this.tempDir, "validCert");
        Map buildProperties = new TestSslUtils.SslConfigsBuilder(Mode.SERVER).useClientCert(false).createNewTrustStore(this.validCertDir).certAlias(MultiTenantClusterLinkTest.SSL_KAFKA_CN).cn("localhost.cpdev.cloud").usePem(true).ccloudSpecDir(this.validCertDir).tlsProtocol("TLSv1.2").certBuilder(new TestSslUtils.CertificateBuilder().sanDnsNames(new String[]{"localhost.cpdev.cloud"})).buildProperties();
        TestUtils.writeToFile(new File(this.validCertDir, "spec.json"), "{\"ssl_certificate_encoding\":\"PKCS12\",\"ssl_keystore_password\":\"ServerPassword\",\"ssl_keystore_filename\":\"pkcs.p12\",\"secret_id\":1,\"ssl_pem_fullchain_filename\":\"fullchain.pem\",\"ssl_pem_privkey_filename\":\"privkey.pem\"}");
        TestUtils.writeToFile(new File(this.validCertDir, "truststore.pem"), (String) buildProperties.get("ssl.truststore.certificates"));
        this.invalidCertDir = TestUtils.tempDirectory(this.tempDir, "invalidCert");
        new TestSslUtils.SslConfigsBuilder(Mode.SERVER).useClientCert(false).createNewTrustStore(this.invalidCertDir).certAlias(MultiTenantClusterLinkTest.SSL_KAFKA_CN).cn("localhost.bad.domain").usePem(true).ccloudSpecDir(this.invalidCertDir).tlsProtocol("TLSv1.2").certBuilder(new TestSslUtils.CertificateBuilder().sanDnsNames(new String[]{"localhost.bad.domain"})).buildProperties();
        TestUtils.writeToFile(new File(this.invalidCertDir, "spec.json"), "{\"ssl_certificate_encoding\":\"PKCS12\",\"ssl_keystore_password\":\"ServerPassword\",\"ssl_keystore_filename\":\"pkcs.p12\",\"secret_id\":1,\"ssl_pem_fullchain_filename\":\"fullchain.pem\",\"ssl_pem_privkey_filename\":\"privkey.pem\"}");
        TestUtils.writeToFile(new File(this.invalidCertDir, "truststore.pem"), (String) buildProperties.get("ssl.truststore.certificates"));
        Utils.createLogicalClusterFile(Utils.LC_META_ABC, this.tempDir);
        Utils.createLogicalClusterFile(Utils.LC_META_XYZ, this.tempDir);
        Utils.syncCerts(this.tempDir, this.validCertDir.toURI().toURL(), SSL_CERTS_DIR);
        this.testHarness = new IntegrationTestHarness(testInfo);
        HashMap hashMap = new HashMap();
        hashMap.put("ssl.truststore.certificates", buildProperties.get("ssl.truststore.certificates"));
        hashMap.put("ssl.truststore.type", buildProperties.get("ssl.truststore.type"));
        hashMap.put("confluent.require.compatible.keystore.updates", "false");
        PhysicalCluster start = this.testHarness.start(brokerProps(hashMap, TestInfoUtils.isKRaft(testInfo)), controllerProps(hashMap));
        this.logicalCluster1 = start.createLogicalCluster("tenantA", 100, 9, 11, 12);
        this.logicalCluster2 = start.createLogicalCluster("tenantB", 200, 9, 21, 22);
        this.clientProps = new Properties();
        this.clientProps.put("ssl.truststore.certificates", buildProperties.get("ssl.truststore.certificates"));
        this.clientProps.put("ssl.truststore.type", buildProperties.get("ssl.truststore.type"));
        this.clientProps.put("ssl.trustmanager.algorithm", "ConfluentTls");
        this.clientProps.put("security.providers", "io.confluent.kafka.server.plugins.ssl.ConfluentTrustProviderCreator");
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.testHarness.shutdown();
    }

    private Properties brokerProps(Map<String, String> map, boolean z) throws IOException {
        this.brokerUUID = "uuid";
        Properties properties = new Properties();
        properties.put(KafkaConfig$.MODULE$.BrokerSessionUuidProp(), this.brokerUUID);
        properties.put("multitenant.metadata.dir", this.tempDir.toRealPath(new LinkOption[0]).toString());
        properties.put("multitenant.metadata.class", "io.confluent.kafka.multitenant.PhysicalClusterMetadata");
        properties.put("multitenant.metadata.reload.delay.ms", TEST_CACHE_RELOAD_DELAY_MS);
        properties.put("multitenant.metadata.ssl.certs.path", this.tempDir.toRealPath(new LinkOption[0]) + "/" + SSL_CERTS_DIR + "spec.json");
        properties.put("listeners", "INTERNAL://localhost:0,SSL://localhost:0");
        properties.put("advertised.listeners", "INTERNAL://localhost:0,SSL://localhost:0");
        properties.put("listener.security.protocol.map", "CONTROLLER:PLAINTEXT,CONTROLLER_SSL:SSL,INTERNAL:PLAINTEXT,SSL:SSL");
        if (z) {
            properties.put("controller.listener.names", "CONTROLLER_SSL,CONTROLLER");
        }
        properties.put("ssl.keystore.location", this.tempDir.toRealPath(new LinkOption[0]) + "/" + SSL_CERTS_DIR + "/pkcs.p12");
        properties.put("ssl.keystore.password", "ServerPassword");
        properties.put("ssl.keystore.type", "PKCS12");
        properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put("ssl.trustmanager.algorithm", "ConfluentTls");
        properties.put("security.providers", "io.confluent.kafka.server.plugins.ssl.ConfluentTrustProviderCreator");
        properties.put("ssl.protocol", "TLSv1.2");
        properties.put("confluent.security.event.logger.authentication.enable", "true");
        properties.put(MockAuditLogProvider.AUDIT_PROVIDER_CONFIG, "TEST");
        properties.putAll(map);
        return properties;
    }

    private Properties controllerProps(Map<String, String> map) throws IOException {
        Properties properties = new Properties();
        properties.put("multitenant.metadata.dir", this.tempDir.toRealPath(new LinkOption[0]).toString());
        properties.put("multitenant.metadata.class", "io.confluent.kafka.multitenant.PhysicalClusterMetadata");
        properties.put("multitenant.metadata.reload.delay.ms", TEST_CACHE_RELOAD_DELAY_MS);
        properties.put("multitenant.metadata.ssl.certs.path", this.tempDir.toRealPath(new LinkOption[0]) + "/" + SSL_CERTS_DIR + "spec.json");
        properties.put("controller.listener.names", "CONTROLLER_SSL,CONTROLLER");
        properties.put("listener.security.protocol.map", "CONTROLLER:PLAINTEXT,CONTROLLER_SSL:SSL,INTERNAL:PLAINTEXT,SSL:SSL");
        properties.put(KafkaConfig.ListenersProp(), "CONTROLLER_SSL://localhost:0,CONTROLLER://localhost:0");
        properties.put("ssl.keystore.location", this.tempDir.toRealPath(new LinkOption[0]) + "/" + SSL_CERTS_DIR + "/pkcs.p12");
        properties.put("ssl.keystore.password", "ServerPassword");
        properties.put("ssl.keystore.type", "PKCS12");
        properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put("ssl.trustmanager.algorithm", "ConfluentTls");
        properties.put("security.providers", "io.confluent.kafka.server.plugins.ssl.ConfluentTrustProviderCreator");
        properties.put("ssl.protocol", "TLSv1.2");
        properties.putAll(map);
        return properties;
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testProduceConsumeFailsOnInvalidCertificateSync(String str) throws Throwable {
        this.testHarness.produceConsume(this.logicalCluster1.user(11), this.logicalCluster2.user(21), Collections.singletonList("testtopic1"), "group1", 0, SecurityProtocol.SSL, true, Optional.of("SSL"), this.clientProps);
        Utils.syncCerts(this.tempDir, this.invalidCertDir.toURI().toURL(), SSL_CERTS_DIR);
        TestUtils.waitForCondition(() -> {
            try {
                this.testHarness.produceConsume(this.logicalCluster1.user(11), this.logicalCluster2.user(21), Collections.singletonList("testtopic2"), "group2", 0, SecurityProtocol.SSL, false, Optional.of("SSL"), this.clientProps);
                return false;
            } catch (Throwable th) {
                System.out.println(th);
                Throwable th2 = th;
                if (th instanceof ExecutionException) {
                    th2 = th.getCause();
                }
                return th2 instanceof SslAuthenticationException;
            }
        }, 30000L, "Produce/consume invocation did not throw SslAuthenticationException");
        ConfluentAuthenticationEvent lastAuthenticationEntry = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
        Assertions.assertFalse(lastAuthenticationEntry.principal().isPresent());
        Assertions.assertEquals(AuditEventStatus.SSL_HANDSHAKE_FAILED, lastAuthenticationEntry.status());
        Assertions.assertTrue(lastAuthenticationEntry.authenticationException().isPresent());
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testControllerOperationFailsOnInvalidCertificateSync(String str) throws IOException, InterruptedException {
        try {
            Admin controllerAdminClient = this.testHarness.controllerAdminClient("ssl-cert-integration-test");
            Throwable th = null;
            try {
                try {
                    controllerAdminClient.describeCluster().clusterId().get();
                    if (controllerAdminClient != null) {
                        if (0 != 0) {
                            try {
                                controllerAdminClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            controllerAdminClient.close();
                        }
                    }
                    Utils.syncCerts(this.tempDir, this.invalidCertDir.toURI().toURL(), SSL_CERTS_DIR);
                    TestUtils.waitForCondition(() -> {
                        try {
                            Admin controllerAdminClient2 = this.testHarness.controllerAdminClient("ssl-cert-integration-test");
                            Throwable th3 = null;
                            try {
                                controllerAdminClient2.describeCluster().clusterId().get();
                                if (controllerAdminClient2 != null) {
                                    if (0 != 0) {
                                        try {
                                            controllerAdminClient2.close();
                                        } catch (Throwable th4) {
                                            th3.addSuppressed(th4);
                                        }
                                    } else {
                                        controllerAdminClient2.close();
                                    }
                                }
                                return false;
                            } finally {
                            }
                        } catch (Exception e) {
                            Exception exc = e;
                            if (e instanceof ExecutionException) {
                                exc = e.getCause();
                            }
                            return exc instanceof SslAuthenticationException;
                        }
                    }, "Admin client invocation did not throw SslAuthenticationException");
                } finally {
                }
            } finally {
            }
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
