package io.confluent.kafka.multitenant;

import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import io.confluent.kafka.common.multitenant.oauth.OAuthBearerJwsToken;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.server.plugins.auth.oauth.MockBasicAuthStore;
import io.confluent.kafka.server.plugins.auth.oauth.MockTrustCache;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.security.auth.metadata.AuthCache;
import io.confluent.security.auth.metadata.AuthStore;
import io.confluent.security.auth.mtls.CertIdentityPool;
import io.confluent.security.auth.mtls.CertIdentityPoolExternalIdentifier;
import io.confluent.security.auth.store.data.CaCertificatesKey;
import io.confluent.security.authorizer.Action;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.mtls.CertificateMetadata;
import io.confluent.security.roledefinitions.Operation;
import io.confluent.security.roledefinitions.ResourceType;
import io.confluent.security.test.utils.JwtTestUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.cert.Certificate;
import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.net.ssl.SSLSession;
import javax.security.sasl.SaslServer;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.security.auth.ConfluentPrincipal;
import org.apache.kafka.common.security.auth.IdentityMetadata;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer;
import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;

/* loaded from: input_file:io/confluent/kafka/multitenant/MultiTenantPrincipalBuilderTest.class */
public class MultiTenantPrincipalBuilderTest {
    private static final String OAUTH_NEGOTIATED_TOKEN_PROPERTY_KEY = "OAUTHBEARER.token";
    private static final String NON_CONFLUENT_ISSUER = "test-issuer";
    private static final String SUB_CLAIM_VALUE = "123456789";
    private static Map<String, Object> configs;
    private static PhysicalClusterMetadata metadata;
    private static MockBasicAuthStore authStore;
    private static MockTrustCache authCache;
    private SaslAuthenticationContext context;
    private static final String BROKER_UUID = UUID.randomUUID().toString();
    public static final Path TEMP_DIR = TestUtils.tempDirectory().toPath();

    @BeforeAll
    public static void setUp() throws Exception {
        setUpPhysicalMetadata();
        clearYammerMetrics();
        createAuthStore();
    }

    @AfterAll
    public static void tearDown() throws Exception {
        metadata.close(BROKER_UUID);
        if (authStore != null) {
            authStore.close();
        }
    }

    @BeforeEach
    public void clearConfig() {
        configs.remove("multitenant.oauth.superuser.disable");
        configs.remove("confluent.metadata.kafka.enable.dataplane.rbac");
        configs.remove("confluent.calling.resource.identity.type.map");
    }

    @AfterEach
    public void cleanUp() {
        try {
            AuthStore.removeInstance(BROKER_UUID, AuthStore.getInstance(BROKER_UUID), (Logger) Mockito.mock(Logger.class));
        } catch (Exception e) {
        }
    }

    private static void setUpPhysicalMetadata() throws IOException, InterruptedException {
        configs = new HashMap();
        configs.put("principal.builder.class", MultiTenantPrincipalBuilder.class);
        configs.put("broker.session.uuid", BROKER_UUID);
        configs.put("multitenant.metadata.dir", TEMP_DIR.toRealPath(new LinkOption[0]));
        configs.put("node.id", "0");
        metadata = Utils.initiatePhysicalClusterMetadata(configs);
        Utils.createLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_ABC, TEMP_DIR);
        Utils.createLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        Utils.createLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK, TEMP_DIR);
        Utils.createLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK, TEMP_DIR);
        TestUtils.waitForCondition(() -> {
            return metadata.metadata(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK.logicalClusterId()) != null;
        }, "Expected metadata of new logical cluster to be present in metadata cache");
    }

    @Test
    public void testOauthSaslPrincipalIsSuperuserByDefault() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId());
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testOauthSaslPrincipalIsSuperuserByDefaultForInternalUsers() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testOauthSaslPrincipalIsNotSuperuserWhenMultitenantOauthSuperuserDisableIsTrue() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId());
        configs.put("multitenant.oauth.superuser.disable", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testOauthSaslPrincipalIsNotSuperuserWhenRBACEnabled() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId());
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testOauthSaslPrincipalIsNotSuperuserWhenBothFlagsEnabled() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId());
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        configs.put("multitenant.oauth.superuser.disable", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testOauthSaslPrincipalIsSuperuserWhenMultitenantOauthSuperuserDisableIsTrueForInternalUsers() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        configs.put("multitenant.oauth.superuser.disable", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testOauthSaslPrincipalIsSuperuserWhenRBACEnabledForInternalUsers() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testOauthSaslOrgMissingProps() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId());
        verifyOrgPropsMetric();
    }

    @Test
    public void testOauthSaslPrincipalUnionOfPoolsEnabledAndUseUnion() {
        mockOAuthUnionOfPoolsEnabledSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        JwtTestUtils.updateIdentityPool(authCache, "pool-A", 1, "issuer", "op-A", "https://test.com/jwks", "sub", "pool-A", "true", "org-A");
        MultiTenantPrincipalBuilder createPrincipalBuilder = ChannelBuilders.createPrincipalBuilder(configs, (KerberosShortNamer) null, (SslPrincipalMapper) null);
        createPrincipalBuilder.setAuthStore(authStore);
        MultiTenantPrincipal build = createPrincipalBuilder.build(this.context);
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), build.tenantMetadata().clusterId);
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_ABC.organizationId(), build.tenantMetadata().organizationId);
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_ABC.environmentId(), build.tenantMetadata().environmentId);
        Assertions.assertEquals(build.authorizationIds().size(), 3);
        Assertions.assertEquals(build.authorizationIds().get(2), "pool-C");
        Assertions.assertEquals(build.user(), "OAuth-ClientCredentials");
        Assertions.assertNull(build.identityMetadata().poolId());
        Assertions.assertEquals(build.identityMetadata().identity(), SUB_CLAIM_VALUE);
    }

    @Test
    public void testOauthSaslPrincipalUnionOfPoolsEnabledAndUseSinglePool() {
        mockOAuthUnionOfPoolsEnabledSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), false);
        JwtTestUtils.updateIdentityPool(authCache, "pool-A", 1, "issuer", "op-A", "https://test.com/jwks", "sub", "pool-A", "true", "org-A");
        MultiTenantPrincipalBuilder createPrincipalBuilder = ChannelBuilders.createPrincipalBuilder(configs, (KerberosShortNamer) null, (SslPrincipalMapper) null);
        createPrincipalBuilder.setAuthStore(authStore);
        MultiTenantPrincipal build = createPrincipalBuilder.build(this.context);
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), build.tenantMetadata().clusterId);
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_ABC.organizationId(), build.tenantMetadata().organizationId);
        Assertions.assertEquals(KafkaLogicalClusterUtils.LC_META_ABC.environmentId(), build.tenantMetadata().environmentId);
        Assertions.assertEquals(build.authorizationIds().size(), 1);
        Assertions.assertEquals(build.authorizationIds().get(0), "pool-A");
        Assertions.assertEquals(build.user(), "pool-A");
        Assertions.assertEquals(build.identityMetadata().poolId(), "pool-A");
        Assertions.assertEquals(build.identityMetadata().identity(), SUB_CLAIM_VALUE);
    }

    @Test
    public void testOauthExtractCallingResourceIdentityForTableFlow() {
        HashMap hashMap = new HashMap();
        hashMap.put("userResourceId", "u-1");
        hashMap.put("calling_resource_identity", "cc-unified-storage-cts-enumerator-service.cc-unified-storage-cts-enumerator-test123");
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId(), MultiTenantRequestContextTest.USERNAME, hashMap);
        configs.put("confluent.calling.resource.identity.type.map", "^cc-(unified-storage|us)-cts:CONFLUENT_TABLEFLOW");
        MultiTenantPrincipalBuilder createPrincipalBuilder = ChannelBuilders.createPrincipalBuilder(configs, (KerberosShortNamer) null, (SslPrincipalMapper) null);
        createPrincipalBuilder.setAuthStore(authStore);
        Assertions.assertEquals(CallingResourceIdentityType.CONFLUENT_TABLEFLOW, createPrincipalBuilder.build(this.context).identityMetadata().callingResourceIdentityType());
    }

    @Test
    public void testOauthExtractCallingResourceIdentityForFlink() {
        HashMap hashMap = new HashMap();
        hashMap.put("userResourceId", "u-1");
        hashMap.put("calling_resource_identity", "crn://confluent.cloud/organization=9bb441c4-edef-46ac-8a41-c49e44a3fd9a/environment=env-ab123/flink-region=aws.us-east-2/statement=statement_name");
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId(), MultiTenantRequestContextTest.USERNAME, hashMap);
        configs.put("confluent.calling.resource.identity.type.map", "^crn://confluent.cloud/organization.*flink-region:CONFLUENT_FLINK");
        MultiTenantPrincipalBuilder createPrincipalBuilder = ChannelBuilders.createPrincipalBuilder(configs, (KerberosShortNamer) null, (SslPrincipalMapper) null);
        createPrincipalBuilder.setAuthStore(authStore);
        Assertions.assertEquals(CallingResourceIdentityType.CONFLUENT_FLINK, createPrincipalBuilder.build(this.context).identityMetadata().callingResourceIdentityType());
    }

    @Test
    public void testPlainSaslOrgMissingProps() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId(), true);
        verifyOrgPropsMetric();
    }

    @Test
    public void testPlainSaslPrincipalIsSuperuserByDefaultForUserAccount() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testPlainSaslPrincipalIsNotSuperuserWhenRBACEnabledForUserAccount() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testPlainSaslPrincipalIsSuperuserWhenMultitenantOauthSuperuserDisableIsTrueForUserAccount() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        configs.put("multitenant.oauth.superuser.disable", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testPlainSaslPrincipalIsSuperuserWhenBothFlagsEnabledForUserAccount() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        configs.put("multitenant.oauth.superuser.disable", "true");
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testPlainSaslPrincipalIsSuperuserForInternalUsersWhenRBACEnabledForUserAccount() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true, true);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testPlainSaslPrincipalIsNotSuperuserByDefaultForServiceAccount() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), false);
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testPlainSaslPrincipalIsNotSuperuserWhenRBACEnabledForServiceAccount() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), false);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testPlainSaslPrincipalIsNotSuperuserWhenWhenMultitenantOauthSuperuserDisableIsTrueForServiceAccount() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), false);
        configs.put("multitenant.oauth.superuser.disable", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testPlainSaslPrincipalIsNotSuperuserWhenBothFlagsEnabledForServiceAccount() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), false);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        configs.put("multitenant.oauth.superuser.disable", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testPlainSaslPrincipalIsNotSuperuserForInternalUsersWhenRBACEnabledForServiceAccount() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), false, true);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testHCPrincipalIsSuperuser() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK.logicalClusterId(), true, true);
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK, true);
    }

    @Test
    public void testHCPrincipalIsSuperuserWhenRBACEnabled() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK.logicalClusterId(), true, true);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK, true);
    }

    @Test
    public void testHCPrincipalIsSuperuserWhenMultitenantOauthSuperuserDisableIsTrue() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK.logicalClusterId(), true, true);
        configs.put("multitenant.oauth.superuser.disable", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK, true);
    }

    @Test
    public void testHCPrincipalIsSuperuserWhenBothFlagsTrue() {
        mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK.logicalClusterId(), true, true);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        configs.put("multitenant.oauth.superuser.disable", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK, true);
    }

    @Test
    public void testHCOauthPrincipalIsSuperuser() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK.logicalClusterId());
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK, true);
    }

    @Test
    public void testHCOauthPrincipalIsSuperuserWhenRBACEnabled() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK.logicalClusterId());
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK, true);
    }

    @Test
    public void testHCOauthPrincipalIsSuperuserWhenMultitenantOauthSuperuserDisableIsTrue() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK.logicalClusterId());
        configs.put("multitenant.oauth.superuser.disable", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK, true);
    }

    @Test
    public void testHCOauthPrincipalIsSuperuserWhenBothFlagsTrue() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK.logicalClusterId());
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        configs.put("multitenant.oauth.superuser.disable", "true");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK, true);
    }

    @Test
    public void testOauthServiceAccountIsNotSuperUser() {
        mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), "12345", "sa-foo");
        verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @ParameterizedTest(name = "{displayName}.enable_dataplane_rbac={0}.generateInvalidSan={1}")
    @CsvSource({"false,false", "false,true", "true,false", "true,true"})
    public void testMTlsSaslPrincipalSuccessAndPrincipalIsNeverSuperuser(boolean z, boolean z2) throws GeneralSecurityException, IOException, InterruptedException {
        Utils.deleteLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        TestUtils.waitForCondition(() -> {
            return metadata.metadata(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId()) == null;
        }, "Expected metadata of logical cluster to be absent in metadata cache");
        String organizationId = KafkaLogicalClusterUtils.LC_META_ABC.organizationId();
        CaCertificatesKey caCertificatesKey = new CaCertificatesKey("provider1", organizationId);
        CertIdentityPool certIdentityPool = new CertIdentityPool("pool1", "provider1", organizationId, "DN", "");
        CertIdentityPool certIdentityPool2 = new CertIdentityPool("pool2", "provider1", organizationId, "SNID", "");
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", Boolean.valueOf(z));
        verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterUtils.LC_META_ABC, false, Collections.singletonList(caCertificatesKey), Arrays.asList(certIdentityPool, certIdentityPool2), CertIdentityPoolExternalIdentifier.DN, false, z2);
    }

    @Test
    public void testMTlsSaslPrincipalThrowsSSLSessionNotPresent() {
        this.context = new SaslAuthenticationContext(-1L, (SaslServer) null, SecurityProtocol.SASL_SSL, InetAddress.getLoopbackAddress(), SecurityProtocol.SASL_SSL.name(), Optional.empty(), true, false);
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            ChannelBuilders.createPrincipalBuilder(configs, (KerberosShortNamer) null, (SslPrincipalMapper) null).build(this.context);
        }, "Principal builder should throw if SSL session is not present");
    }

    @ValueSource(booleans = {false, true})
    @ParameterizedTest(name = "{displayName}.generateInvalidSan={1}")
    public void testMTlsSaslPrincipalThrowsClusterIsMultiTenant(boolean z) throws IOException, InterruptedException {
        Utils.createLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        TestUtils.waitForCondition(() -> {
            return metadata.metadata(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId()) != null;
        }, "Expected metadata of logical cluster to be absent in metadata cache");
        CertIdentityPool certIdentityPool = new CertIdentityPool("pool1", "provider1", "org1", "DN", "");
        CertIdentityPool certIdentityPool2 = new CertIdentityPool("pool2", "provider1", "org1", "SNID", "");
        Assertions.assertThrows(IllegalStateException.class, () -> {
            verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterUtils.LC_META_ABC, false, Collections.emptyList(), Arrays.asList(certIdentityPool, certIdentityPool2), CertIdentityPoolExternalIdentifier.DN, false, z);
        }, "Principal should not be built for mTLS client on MT clusters");
    }

    @ValueSource(booleans = {false, true})
    @ParameterizedTest(name = "{displayName}.generateInvalidSan={1}")
    public void testMTlsSaslPrincipalThrowsNoMatchingIdentityProviders(boolean z) throws IOException, InterruptedException {
        Utils.deleteLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        TestUtils.waitForCondition(() -> {
            return metadata.metadata(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId()) == null;
        }, "Expected metadata of logical cluster to be absent in metadata cache");
        Assertions.assertTrue(((Exception) Assertions.assertThrows(AuthenticationException.class, () -> {
            verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterUtils.LC_META_ABC, false, Collections.emptyList(), Collections.emptyList(), CertIdentityPoolExternalIdentifier.DN, false, z);
        }, "Principal should not be built if mTLS client certificate does not match any identity providers")).getMessage().matches("Must be only one matching identity provider found when building principal for mTLS authenticated client of orgId [^ ]+, found \\[\\], Certificate chain = \\[\\{subjectDN=.+,issuerDN.+,serialNumber=.+,notBefore=.+,notAfter=[^\\}]+\\};\\]"));
    }

    @ValueSource(booleans = {false, true})
    @ParameterizedTest(name = "{displayName}.generateInvalidSan={1}")
    public void testMTlsSaslPrincipalThrowsMoreThanOneIdentityProviders(boolean z) throws IOException, InterruptedException {
        Utils.deleteLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        TestUtils.waitForCondition(() -> {
            return metadata.metadata(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId()) == null;
        }, "Expected metadata of logical cluster to be absent in metadata cache");
        CaCertificatesKey caCertificatesKey = new CaCertificatesKey("provider1", "org1");
        CaCertificatesKey caCertificatesKey2 = new CaCertificatesKey("provider2", "org2");
        Assertions.assertTrue(((Exception) Assertions.assertThrows(AuthenticationException.class, () -> {
            verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterUtils.LC_META_ABC, false, Arrays.asList(caCertificatesKey, caCertificatesKey2), Collections.emptyList(), CertIdentityPoolExternalIdentifier.DN, false, z);
        }, "Principal should not be built if mTLS client certificate matches more than one identity providers")).getMessage().matches("Must be only one matching identity provider found when building principal for mTLS authenticated client of orgId [^ ]+, found \\[CaCertificatesKey\\{[^\\}]+\\}, CaCertificatesKey\\{[^\\}]+\\}\\], Certificate chain = \\[\\{subjectDN=.+,issuerDN.+,serialNumber=.+,notBefore=.+,notAfter=[^\\}]+\\};\\]"));
    }

    @ValueSource(booleans = {false, true})
    @ParameterizedTest(name = "{displayName}.generateInvalidSan={1}")
    public void testMTlsSaslPrincipalThrowsClientCertIsRevoked(boolean z) throws IOException, InterruptedException {
        Utils.deleteLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        TestUtils.waitForCondition(() -> {
            return metadata.metadata(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId()) == null;
        }, "Expected metadata of logical cluster to be absent in metadata cache");
        Assertions.assertThrows(AuthenticationException.class, () -> {
            verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterUtils.LC_META_ABC, false, Collections.singletonList((CaCertificatesKey) Mockito.mock(CaCertificatesKey.class)), Collections.emptyList(), CertIdentityPoolExternalIdentifier.DN, true, z);
        }, "Principal should not be built if mTLS client certificate has been revoked by CRL");
    }

    @ValueSource(booleans = {false, true})
    @ParameterizedTest(name = "{displayName}.generateInvalidSan={1}")
    public void testMTlsSaslPrincipalThrowsNoMatchingPools(boolean z) throws IOException, InterruptedException {
        Utils.deleteLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        TestUtils.waitForCondition(() -> {
            return metadata.metadata(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId()) == null;
        }, "Expected metadata of logical cluster to be absent in metadata cache");
        Assertions.assertThrows(AuthenticationException.class, () -> {
            verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterUtils.LC_META_ABC, false, Collections.singletonList((CaCertificatesKey) Mockito.mock(CaCertificatesKey.class)), Collections.emptyList(), CertIdentityPoolExternalIdentifier.DN, false, z);
        }, "Principal should not be built if mTLS client certificate does not match any identity pools");
    }

    @Test
    public void testMultiTenantPrincipalBuilderSerde_NonMultiTenant() {
        KafkaPrincipal kafkaPrincipal = new KafkaPrincipal("User", MultiTenantRequestContextTest.USERNAME);
        MultiTenantPrincipalBuilder multiTenantPrincipalBuilder = new MultiTenantPrincipalBuilder();
        Assertions.assertEquals(kafkaPrincipal, multiTenantPrincipalBuilder.deserialize(multiTenantPrincipalBuilder.serialize(kafkaPrincipal)));
    }

    @Test
    public void testMultiTenantPrincipalBuilderSerde_NonMultiTenantCompatibilityWithDefaultKafkaPrincipalBuilder() {
        KafkaPrincipal kafkaPrincipal = new KafkaPrincipal("User", MultiTenantRequestContextTest.USERNAME);
        Assertions.assertEquals(kafkaPrincipal, new MultiTenantPrincipalBuilder().deserialize(new DefaultKafkaPrincipalBuilder((KerberosShortNamer) null, (SslPrincipalMapper) null).serialize(kafkaPrincipal)));
    }

    @Test
    public void testMultiTenantPrincipalBuilderSerde_MultiTenant() {
        MultiTenantPrincipalBuilder multiTenantPrincipalBuilder = new MultiTenantPrincipalBuilder();
        MultiTenantPrincipal multiTenantPrincipal = new MultiTenantPrincipal(MultiTenantRequestContextTest.USERNAME, "saslAuthenticationId", new TenantMetadata("tenantName", "clusterId", "organizationId", "environmentId", "userResourceID", true, true, true));
        Assertions.assertEquals("TenantUser", multiTenantPrincipal.getPrincipalType());
        MultiTenantPrincipal deserialize = multiTenantPrincipalBuilder.deserialize(multiTenantPrincipalBuilder.serialize(multiTenantPrincipal));
        Assertions.assertEquals(multiTenantPrincipal, deserialize);
        MultiTenantPrincipal multiTenantPrincipal2 = deserialize;
        Assertions.assertEquals(multiTenantPrincipal.tenantMetadata().userResourceId, multiTenantPrincipal2.tenantMetadata().userResourceId);
        Assertions.assertEquals(Boolean.valueOf(multiTenantPrincipal.tenantMetadata().isServiceAccount), Boolean.valueOf(multiTenantPrincipal2.tenantMetadata().isServiceAccount));
        Assertions.assertEquals(Boolean.valueOf(multiTenantPrincipal.tenantMetadata().isHealthcheckTenant), Boolean.valueOf(multiTenantPrincipal2.tenantMetadata().isHealthcheckTenant));
        Assertions.assertEquals(Boolean.valueOf(multiTenantPrincipal.tenantMetadata().isApiKeyAuthenticated), Boolean.valueOf(multiTenantPrincipal2.tenantMetadata().isApiKeyAuthenticated));
        Assertions.assertEquals(multiTenantPrincipal.identityMetadata().poolId(), multiTenantPrincipal2.identityMetadata().poolId());
        Assertions.assertNull(multiTenantPrincipal.identityMetadata().poolId());
        Assertions.assertEquals(multiTenantPrincipal.authorizationIds(), multiTenantPrincipal2.authorizationIds());
        Assertions.assertTrue(multiTenantPrincipal.authorizationIds().contains("userResourceID"));
        MultiTenantPrincipalBuilder multiTenantPrincipalBuilder2 = new MultiTenantPrincipalBuilder();
        MultiTenantPrincipal multiTenantPrincipal3 = new MultiTenantPrincipal(MultiTenantRequestContextTest.USERNAME, "saslAuthenticationId", Optional.empty(), new TenantMetadata("tenantName", "clusterId", "organizationId", "environmentId", "userResourceID", true, true, true), new IdentityMetadata("poolId", "providerId", "serviceAccount", "externalIdentityId", "iss1", Collections.singletonList("aud1"), CallingResourceIdentityType.DEFAULT), Arrays.asList("userResourceId", "poolId1", "poolId2"));
        Assertions.assertEquals("TenantUser", multiTenantPrincipal3.getPrincipalType());
        MultiTenantPrincipal deserialize2 = multiTenantPrincipalBuilder2.deserialize(multiTenantPrincipalBuilder2.serialize(multiTenantPrincipal3));
        Assertions.assertEquals(multiTenantPrincipal3, deserialize2);
        MultiTenantPrincipal multiTenantPrincipal4 = deserialize2;
        Assertions.assertEquals(multiTenantPrincipal3.tenantMetadata().userResourceId, multiTenantPrincipal4.tenantMetadata().userResourceId);
        Assertions.assertEquals(Boolean.valueOf(multiTenantPrincipal3.tenantMetadata().isServiceAccount), Boolean.valueOf(multiTenantPrincipal4.tenantMetadata().isServiceAccount));
        Assertions.assertEquals(Boolean.valueOf(multiTenantPrincipal3.tenantMetadata().isHealthcheckTenant), Boolean.valueOf(multiTenantPrincipal4.tenantMetadata().isHealthcheckTenant));
        Assertions.assertEquals(Boolean.valueOf(multiTenantPrincipal3.tenantMetadata().isApiKeyAuthenticated), Boolean.valueOf(multiTenantPrincipal4.tenantMetadata().isApiKeyAuthenticated));
        Assertions.assertEquals(multiTenantPrincipal3.identityMetadata().callingResourceIdentityType(), multiTenantPrincipal4.identityMetadata().callingResourceIdentityType());
        Assertions.assertEquals(multiTenantPrincipal3.identityMetadata().poolId(), multiTenantPrincipal4.identityMetadata().poolId());
        Assertions.assertEquals("poolId", multiTenantPrincipal3.identityMetadata().poolId());
        Assertions.assertEquals(multiTenantPrincipal3.authorizationIds(), multiTenantPrincipal4.authorizationIds());
        Assertions.assertTrue(multiTenantPrincipal4.authorizationIds().containsAll(Arrays.asList("userResourceId", "poolId1", "poolId2")));
        Assertions.assertEquals("externalIdentityId", multiTenantPrincipal3.identityMetadata().externalIdentityId());
    }

    @Test
    public void testAuthorizationIdsEmptyMultiTenantPrincipalBuilder() {
        MultiTenantPrincipalBuilder multiTenantPrincipalBuilder = new MultiTenantPrincipalBuilder();
        MultiTenantPrincipal multiTenantPrincipal = new MultiTenantPrincipal(MultiTenantRequestContextTest.USERNAME, "saslAuthenticationId", Optional.empty(), new TenantMetadata("tenantName", "clusterId", "organizationId", "environmentId", "userResourceID", true, true, true), (IdentityMetadata) null, Arrays.asList("abc"));
        KafkaTestUtils.setField(multiTenantPrincipal, ConfluentPrincipal.class, "authorizationIds", new ArrayList(0));
        Assertions.assertTrue(multiTenantPrincipalBuilder.deserialize(multiTenantPrincipalBuilder.serialize(multiTenantPrincipal)).authorizationIds().contains("userResourceID"));
        KafkaTestUtils.setField(multiTenantPrincipal, ConfluentPrincipal.class, "authorizationIds", null);
        Assertions.assertTrue(multiTenantPrincipalBuilder.deserialize(multiTenantPrincipalBuilder.serialize(multiTenantPrincipal)).authorizationIds().contains("userResourceID"));
    }

    @Test
    public void testMultiTenantPrincipalBuilderSerde_RejectsBadType() throws Exception {
        MultiTenantPrincipalBuilder multiTenantPrincipalBuilder = new MultiTenantPrincipalBuilder();
        Arrays.asList("Group", "whatever").forEach(str -> {
            Assertions.assertTrue(Assertions.assertThrows(SerializationException.class, () -> {
                multiTenantPrincipalBuilder.deserialize(multiTenantPrincipalBuilder.serialize(new KafkaPrincipal(str, "foo")));
            }).getMessage().startsWith("Invalid principal type "));
        });
    }

    @Test
    public void testUserTenantPrincipalBuilderSerde() {
        UserTenantPrincipalBuilder userTenantPrincipalBuilder = new UserTenantPrincipalBuilder();
        MultiTenantPrincipal build = userTenantPrincipalBuilder.build(null);
        Assertions.assertEquals("TenantUser", build.getPrincipalType());
        MultiTenantPrincipal deserialize = userTenantPrincipalBuilder.deserialize(userTenantPrincipalBuilder.serialize(build));
        Assertions.assertEquals(build, deserialize);
        MultiTenantPrincipal multiTenantPrincipal = deserialize;
        Assertions.assertEquals(build.tenantMetadata().userResourceId, multiTenantPrincipal.tenantMetadata().userResourceId);
        Assertions.assertEquals(Boolean.valueOf(build.tenantMetadata().isServiceAccount), Boolean.valueOf(multiTenantPrincipal.tenantMetadata().isServiceAccount));
        Assertions.assertEquals(Boolean.valueOf(build.tenantMetadata().isHealthcheckTenant), Boolean.valueOf(multiTenantPrincipal.tenantMetadata().isHealthcheckTenant));
        Assertions.assertEquals(Boolean.valueOf(build.tenantMetadata().isApiKeyAuthenticated), Boolean.valueOf(multiTenantPrincipal.tenantMetadata().isApiKeyAuthenticated));
        Assertions.assertEquals(build.authorizationIds(), multiTenantPrincipal.authorizationIds());
        Assertions.assertTrue(build.authorizationIds().contains(multiTenantPrincipal.tenantMetadata().userResourceId));
    }

    @Test
    public void testSpireJwtSvidPrincipalBuilder() {
        MultiTenantPrincipalBuilder multiTenantPrincipalBuilder = new MultiTenantPrincipalBuilder();
        configs.put("confluent.spiffe.id.principal.extraction.rules", "RULE:spiffe://([^/]*?)/v1/physical/(k8s-[^/]*?)/([^/]*?)$!$3!");
        multiTenantPrincipalBuilder.configure(configs);
        mockOAuthSpireSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), "spiffe://dummy.trust.domain/test-workload", "spire.internal.confluent.cloud");
        MultiTenantPrincipal build = multiTenantPrincipalBuilder.build(this.context);
        Assertions.assertEquals("TenantUser", build.getPrincipalType());
        Assertions.assertTrue(build.authorizationIds().contains("spiffe://dummy.trust.domain/test-workload"));
        MultiTenantPrincipal deserialize = multiTenantPrincipalBuilder.deserialize(multiTenantPrincipalBuilder.serialize(build));
        Assertions.assertEquals(build, deserialize);
        MultiTenantPrincipal multiTenantPrincipal = deserialize;
        Assertions.assertEquals(build.tenantMetadata().userResourceId, multiTenantPrincipal.tenantMetadata().userResourceId);
        Assertions.assertEquals(Boolean.valueOf(build.tenantMetadata().isServiceAccount), Boolean.valueOf(multiTenantPrincipal.tenantMetadata().isServiceAccount));
        Assertions.assertEquals(Boolean.valueOf(build.tenantMetadata().isHealthcheckTenant), Boolean.valueOf(multiTenantPrincipal.tenantMetadata().isHealthcheckTenant));
        Assertions.assertEquals(Boolean.valueOf(build.tenantMetadata().isApiKeyAuthenticated), Boolean.valueOf(multiTenantPrincipal.tenantMetadata().isApiKeyAuthenticated));
        Assertions.assertEquals(build.authorizationIds(), multiTenantPrincipal.authorizationIds());
        MultiTenantPrincipalBuilder multiTenantPrincipalBuilder2 = new MultiTenantPrincipalBuilder();
        configs.put("authenticator.jwt.spire.issuers.suffix", "test.spire.suffix");
        multiTenantPrincipalBuilder2.configure(configs);
        mockOAuthSpireSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), "spiffe://dummy.trust.domain/v1/physical/k8s-abc/test-workload", "test.spire.suffix");
        MultiTenantPrincipal build2 = multiTenantPrincipalBuilder2.build(this.context);
        Assertions.assertEquals("TenantUser", build2.getPrincipalType());
        Assertions.assertTrue(build2.authorizationIds().contains("spiffe://dummy.trust.domain/v1/physical/k8s-abc/test-workload"));
        Assertions.assertTrue(build2.authorizationIds().contains("test-workload"));
        MultiTenantPrincipal deserialize2 = multiTenantPrincipalBuilder2.deserialize(multiTenantPrincipalBuilder2.serialize(build2));
        Assertions.assertEquals(build2, deserialize2);
        MultiTenantPrincipal multiTenantPrincipal2 = deserialize2;
        Assertions.assertEquals(build2.tenantMetadata().userResourceId, multiTenantPrincipal2.tenantMetadata().userResourceId);
        Assertions.assertEquals(Boolean.valueOf(build2.tenantMetadata().isServiceAccount), Boolean.valueOf(multiTenantPrincipal2.tenantMetadata().isServiceAccount));
        Assertions.assertEquals(Boolean.valueOf(build2.tenantMetadata().isHealthcheckTenant), Boolean.valueOf(multiTenantPrincipal2.tenantMetadata().isHealthcheckTenant));
        Assertions.assertEquals(Boolean.valueOf(build2.tenantMetadata().isApiKeyAuthenticated), Boolean.valueOf(multiTenantPrincipal2.tenantMetadata().isApiKeyAuthenticated));
        Assertions.assertEquals(build2.authorizationIds(), multiTenantPrincipal2.authorizationIds());
    }

    @Test
    public void testBuildPrincipalWithNullUserResourceId() {
        new MultiTenantPrincipalBuilder().serialize(new MultiTenantPrincipal("some-user", new TenantMetadata.Builder("some-cluster", (String) null).build()));
    }

    @Test
    public void testRecordAuthenticationSubtypeMetric() {
        MultiTenantPrincipalBuilder multiTenantPrincipalBuilder = new MultiTenantPrincipalBuilder();
        multiTenantPrincipalBuilder.recordAuthenticationSubtypeMetric(Arrays.asList("pool-1"), "123", "Non-Confluent");
        multiTenantPrincipalBuilder.recordAuthenticationSubtypeMetric(Arrays.asList("pool-1"), "123", (String) null);
        multiTenantPrincipalBuilder.recordAuthenticationSubtypeMetric(Arrays.asList("u-1"), "123", "Confluent");
        multiTenantPrincipalBuilder.recordAuthenticationSubtypeMetric(Arrays.asList("pool-1"), "pool-1", "Confluent");
        multiTenantPrincipalBuilder.recordAuthenticationSubtypeMetric(Arrays.asList("u-1", "pool-1"), "123", "Confluent");
        verifyAuthenticationSubtypeMetric();
    }

    @Test
    public void testExternalIdBasedOnSSLCert() throws Exception {
        CertificateMetadata certificateMetadata = new CertificateMetadata(TestSslUtils.generateSignedCertificate("CN=example.com, OU=Kafka, O=Confluent, L=Mountain View, ST=California, C=US", TestSslUtils.generateKeyPair("RSA"), 1, 1, (String) null, (KeyPair) null, "SHA256withRSA", false, true, false, new String[]{"example.com", "*.example.com"}));
        String cn = certificateMetadata.getCn();
        String dn = certificateMetadata.getDn();
        String snid = certificateMetadata.getSnid();
        String san = certificateMetadata.getSan();
        String sha1 = certificateMetadata.getSha1();
        Assertions.assertEquals(cn, MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert(certificateMetadata, CertIdentityPoolExternalIdentifier.CN));
        Assertions.assertEquals(dn, MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert(certificateMetadata, CertIdentityPoolExternalIdentifier.DN));
        Assertions.assertEquals(snid, MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert(certificateMetadata, CertIdentityPoolExternalIdentifier.SNID));
        Assertions.assertEquals(String.format("%s, %s", cn, snid), MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert(certificateMetadata, CertIdentityPoolExternalIdentifier.CN_SNID));
        Assertions.assertEquals(san, MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert(certificateMetadata, CertIdentityPoolExternalIdentifier.SAN));
        Assertions.assertEquals(String.format("%s, %s", san, snid), MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert(certificateMetadata, CertIdentityPoolExternalIdentifier.SAN_SNID));
        Assertions.assertEquals(sha1, MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert(certificateMetadata, CertIdentityPoolExternalIdentifier.SHA1));
    }

    private void verifyTenantMetadata(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata, boolean z) {
        MultiTenantPrincipal multiTenantPrincipal = (MultiTenantPrincipal) ChannelBuilders.createPrincipalBuilder(configs, (KerberosShortNamer) null, (SslPrincipalMapper) null).build(this.context);
        Assertions.assertEquals(kafkaLogicalClusterMetadata.logicalClusterId(), multiTenantPrincipal.tenantMetadata().clusterId);
        Assertions.assertEquals(kafkaLogicalClusterMetadata.organizationId(), multiTenantPrincipal.tenantMetadata().organizationId);
        Assertions.assertEquals(kafkaLogicalClusterMetadata.environmentId(), multiTenantPrincipal.tenantMetadata().environmentId);
        verifyTenantMetadataScopeAndSuperuserStatus(kafkaLogicalClusterMetadata, z, multiTenantPrincipal);
    }

    private void verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata, boolean z, Collection<CaCertificatesKey> collection, Collection<CertIdentityPool> collection2, CertIdentityPoolExternalIdentifier certIdentityPoolExternalIdentifier, boolean z2, boolean z3) throws GeneralSecurityException, IOException {
        X509Certificate generateSignedCertificate = z3 ? TestSslUtils.generateSignedCertificate("C=US, ST=CA, L=San Francisco, O=Confluent, OU=Confluent, CN=client", TestSslUtils.generateKeyPair("RSA"), 1, 1, (String) null, (KeyPair) null, "SHA256withRSA", false, false, true, new byte[]{1, 2, 3, 4}) : TestSslUtils.generateSignedCertificate("C=US, ST=CA, L=San Francisco, O=Confluent, OU=Confluent, CN=client", TestSslUtils.generateKeyPair("RSA"), 1, 1, (String) null, (KeyPair) null, "SHA256withRSA", false, false, true, new String[]{"example.com", "*.example.com"});
        CertificateMetadata certificateMetadata = new CertificateMetadata(generateSignedCertificate);
        String issuerDn = certificateMetadata.getIssuerDn();
        String dn = certificateMetadata.getDn();
        String cn = certificateMetadata.getCn();
        String snid = certificateMetadata.getSnid();
        String san = certificateMetadata.getSan();
        if (z3) {
            Objects.requireNonNull(generateSignedCertificate);
            Assertions.assertThrows(CertificateParsingException.class, generateSignedCertificate::getSubjectAlternativeNames);
            Assertions.assertEquals(0, san.length());
        } else {
            Assertions.assertEquals("DNS:example.com,DNS:*.example.com", san);
        }
        setUpMTlsMocks(generateSignedCertificate, collection, collection2, z2);
        MultiTenantPrincipalBuilder createPrincipalBuilder = ChannelBuilders.createPrincipalBuilder(configs, (KerberosShortNamer) null, (SslPrincipalMapper) null);
        MultiTenantPrincipal multiTenantPrincipal = (MultiTenantPrincipal) createPrincipalBuilder.build(this.context);
        String str = "";
        if (CertIdentityPoolExternalIdentifier.CN.equals(certIdentityPoolExternalIdentifier)) {
            str = cn;
        } else if (CertIdentityPoolExternalIdentifier.DN.equals(certIdentityPoolExternalIdentifier)) {
            str = dn;
        }
        String organizationId = kafkaLogicalClusterMetadata.organizationId();
        String str2 = "";
        if (collection != null && collection.size() == 1) {
            str2 = collection.iterator().next().providerId();
        }
        String mTlsAuthenticationId = MultiTenantPrincipal.mTlsAuthenticationId(organizationId, str2, issuerDn, snid);
        verifyPrincipalProps(kafkaLogicalClusterMetadata, str, z, mTlsAuthenticationId, multiTenantPrincipal, collection2, str2);
        MultiTenantPrincipal multiTenantPrincipal2 = (MultiTenantPrincipal) createPrincipalBuilder.deserialize(createPrincipalBuilder.serialize(multiTenantPrincipal));
        Assertions.assertEquals(multiTenantPrincipal, multiTenantPrincipal2);
        verifyPrincipalProps(kafkaLogicalClusterMetadata, str, z, mTlsAuthenticationId, multiTenantPrincipal2, collection2, str2);
    }

    private void verifyPrincipalProps(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata, String str, boolean z, String str2, MultiTenantPrincipal multiTenantPrincipal, Collection<CertIdentityPool> collection, String str3) {
        Assertions.assertEquals(String.format("%s_%s", kafkaLogicalClusterMetadata.logicalClusterId(), str), multiTenantPrincipal.getName());
        Assertions.assertEquals(str2, multiTenantPrincipal.authenticationId());
        Assertions.assertFalse(multiTenantPrincipal.networkId().isPresent());
        Assertions.assertEquals(kafkaLogicalClusterMetadata.logicalClusterId(), multiTenantPrincipal.tenantMetadata().clusterId);
        Assertions.assertNotNull(multiTenantPrincipal.tenantMetadata().userResourceId);
        Set set = (Set) collection.stream().map((v0) -> {
            return v0.poolId();
        }).collect(Collectors.toSet());
        Assertions.assertEquals(set, new HashSet(Arrays.asList(multiTenantPrincipal.tenantMetadata().userResourceId.split(","))));
        Assertions.assertEquals(kafkaLogicalClusterMetadata.organizationId(), multiTenantPrincipal.tenantMetadata().organizationId);
        Assertions.assertEquals(kafkaLogicalClusterMetadata.environmentId(), multiTenantPrincipal.tenantMetadata().environmentId);
        Assertions.assertFalse(multiTenantPrincipal.tenantMetadata().isServiceAccount);
        Assertions.assertFalse(multiTenantPrincipal.tenantMetadata().isApiKeyAuthenticated);
        Assertions.assertFalse(multiTenantPrincipal.tenantMetadata().isHealthcheckTenant);
        verifyTenantMetadataScopeAndSuperuserStatus(kafkaLogicalClusterMetadata, z, multiTenantPrincipal);
        Assertions.assertEquals(new IdentityMetadata((String) null, str3, str, (String) null), multiTenantPrincipal.identityMetadata());
        Assertions.assertEquals(set, new HashSet(multiTenantPrincipal.authorizationIds()));
    }

    private void verifyTenantMetadataScopeAndSuperuserStatus(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata, boolean z, MultiTenantPrincipal multiTenantPrincipal) {
        Assertions.assertEquals(new Scope.Builder(new String[0]).addPath("organization=" + kafkaLogicalClusterMetadata.organizationId()).addPath("environment=" + kafkaLogicalClusterMetadata.environmentId()).addPath("cloud-cluster=" + kafkaLogicalClusterMetadata.logicalClusterId()).withKafkaCluster(kafkaLogicalClusterMetadata.logicalClusterId()).build(), multiTenantPrincipal.tenantMetadata().scope());
        Assertions.assertEquals(Boolean.valueOf(z), Boolean.valueOf(MultiTenantAuthorizer.isSuperUser(multiTenantPrincipal, new Action(Scope.kafkaClusterScope("foo"), ResourceType.ALL, multiTenantPrincipal.tenantMetadata().clusterId + "_", Operation.ALL), false, "true".equals(configs.get("confluent.metadata.kafka.enable.dataplane.rbac")), "true".equals(configs.get("multitenant.oauth.superuser.disable")))));
    }

    private void mockOAuthSaslContext(String str) {
        mockOAuthSaslContext(str, false, true);
    }

    private void mockOAuthSaslContext(String str, boolean z) {
        mockOAuthSaslContext(str, z, true);
    }

    private void mockOAuthSaslContext(String str, boolean z, boolean z2) {
        String str2 = z ? "0" : MultiTenantRequestContextTest.USERNAME;
        mockOAuthSaslContext(str, str2, z2 ? str2 : null);
    }

    private void mockOAuthSaslContext(String str, String str2, String str3) {
        Map<String, Object> map = null;
        if (str3 != null) {
            map = Collections.singletonMap("userResourceId", str3);
        }
        mockOAuthSaslContext(str, str2, map);
    }

    private void mockOAuthSaslContext(String str, String str2, Map<String, Object> map) {
        SaslServer saslServer = (SaslServer) Mockito.mock(OAuthBearerSaslServer.class);
        Mockito.when(saslServer.getNegotiatedProperty(OAUTH_NEGOTIATED_TOKEN_PROPERTY_KEY)).thenReturn(new OAuthBearerJwsToken("", (Set) null, 0L, str2, 0L, map));
        Mockito.when(saslServer.getNegotiatedProperty("logicalCluster")).thenReturn(str);
        this.context = new SaslAuthenticationContext(saslServer, SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLoopbackAddress(), SecurityProtocol.SASL_PLAINTEXT.name());
    }

    private void mockOAuthUnionOfPoolsEnabledSaslContext(String str, boolean z) {
        SaslServer saslServer = (SaslServer) Mockito.mock(OAuthBearerSaslServer.class);
        Mockito.when(saslServer.getNegotiatedProperty(OAUTH_NEGOTIATED_TOKEN_PROPERTY_KEY)).thenReturn(new OAuthBearerJwsToken("", (Set) null, 0L, MultiTenantRequestContextTest.USERNAME, 0L, new HashMap(), NON_CONFLUENT_ISSUER));
        Mockito.when(saslServer.getNegotiatedProperty("logicalCluster")).thenReturn(str);
        Mockito.when(saslServer.getNegotiatedProperty("identityPoolId-azp")).thenReturn(SUB_CLAIM_VALUE);
        if (z) {
            Mockito.when(saslServer.getNegotiatedProperty("identityPoolId-identityPoolId")).thenReturn("pool-A,pool-B,pool-C");
            Mockito.when(saslServer.getNegotiatedProperty("identityPoolId-sub")).thenReturn("OAuth-ClientCredentials");
            Mockito.when(saslServer.getNegotiatedProperty("identityPoolId")).thenReturn("OAuth-ClientCredentials");
        } else {
            Mockito.when(saslServer.getNegotiatedProperty("identityPoolId-sub")).thenReturn("pool-A");
            Mockito.when(saslServer.getNegotiatedProperty("identityPoolId")).thenReturn("pool-A");
        }
        this.context = new SaslAuthenticationContext(saslServer, SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLoopbackAddress(), SecurityProtocol.SASL_PLAINTEXT.name());
    }

    private void mockOAuthSpireSaslContext(String str, String str2, String str3) {
        SaslServer saslServer = (SaslServer) Mockito.mock(OAuthBearerSaslServer.class);
        Mockito.when(saslServer.getNegotiatedProperty(OAUTH_NEGOTIATED_TOKEN_PROPERTY_KEY)).thenReturn(new OAuthBearerJwsToken("", (Set) null, 0L, str2, 0L, (Map) null, "test.prefix." + str3));
        Mockito.when(saslServer.getNegotiatedProperty("logicalCluster")).thenReturn(str);
        this.context = new SaslAuthenticationContext(saslServer, SecurityProtocol.SASL_SSL, InetAddress.getLoopbackAddress(), SecurityProtocol.SASL_SSL.name());
    }

    private void mockPlainSaslContext(String str, boolean z) {
        mockPlainSaslContext(str, z, false, true);
    }

    private void mockPlainSaslContext(String str, boolean z, boolean z2) {
        mockPlainSaslContext(str, z, z2, true);
    }

    private void mockPlainSaslContext(String str, boolean z, boolean z2, boolean z3) {
        TenantMetadata build;
        boolean z4 = KafkaLogicalClusterUtils.LC_META_HEALTHCHECK.equals(str) || KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK.equals(str);
        String str2 = z2 ? "0" : MultiTenantRequestContextTest.USERNAME;
        MultiTenantSaslServer multiTenantSaslServer = (MultiTenantSaslServer) Mockito.mock(MultiTenantSaslServer.class);
        if (z3) {
            build = new TenantMetadata.Builder(str, "u-" + str2).serviceAccount(!z).healthcheckTenant(z4).apiKeyAuthenticated(true).build();
        } else {
            build = new TenantMetadata.Builder(str, (String) null).serviceAccount(!z).healthcheckTenant(z4).apiKeyAuthenticated(true).build();
        }
        Mockito.when(multiTenantSaslServer.tenantMetadata()).thenReturn(build);
        Mockito.when(multiTenantSaslServer.getAuthorizationID()).thenReturn(str2);
        this.context = new SaslAuthenticationContext(multiTenantSaslServer, SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLoopbackAddress(), SecurityProtocol.SASL_PLAINTEXT.name());
    }

    private void setUpMTlsMocks(X509Certificate x509Certificate, Collection<CaCertificatesKey> collection, Collection<CertIdentityPool> collection2, boolean z) throws IOException {
        Certificate[] certificateArr = {x509Certificate};
        SSLSession sSLSession = (SSLSession) Mockito.mock(SSLSession.class);
        Mockito.when(sSLSession.getPeerCertificates()).thenReturn(certificateArr);
        this.context = new SaslAuthenticationContext(-1L, (SaslServer) null, SecurityProtocol.SASL_SSL, InetAddress.getLoopbackAddress(), SecurityProtocol.SASL_SSL.name(), Optional.of(sSLSession), true, false);
        AuthStore authStore2 = (AuthStore) Mockito.mock(AuthStore.class);
        AuthCache authCache2 = (AuthCache) Mockito.mock(AuthCache.class);
        AuthStore.addInstance(BROKER_UUID, authStore2, (Logger) Mockito.mock(Logger.class));
        Mockito.when(authStore2.authCache()).thenReturn(authCache2);
        Mockito.when(authCache2.findCertIdentityProviders((Certificate[]) ArgumentMatchers.any(), (String) ArgumentMatchers.any())).thenReturn(collection);
        Mockito.when(Boolean.valueOf(authCache2.isRevoked((Certificate[]) ArgumentMatchers.eq(certificateArr), (String) ArgumentMatchers.any(), (String) ArgumentMatchers.any()))).thenReturn(Boolean.valueOf(z));
        Mockito.when(authCache2.findCertIdentityPools(ArgumentMatchers.anyMap(), (String) ArgumentMatchers.any(), (String) ArgumentMatchers.any())).thenReturn(collection2);
    }

    private void verifyOrgPropsMetric() {
        for (int i = 0; i < 10; i++) {
            ChannelBuilders.createPrincipalBuilder(configs, (KerberosShortNamer) null, (SslPrincipalMapper) null).build(this.context);
        }
        List list = (List) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream().filter(entry -> {
            return ((MetricName) entry.getKey()).getName().equals("org-props-missing-rate");
        }).map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.toList());
        Assertions.assertTrue(list.size() > 0);
        Assertions.assertTrue(((int) ((Meter) list.get(0)).count()) >= 10);
    }

    private void verifyAuthenticationSubtypeMetric() {
        KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream().filter(entry -> {
            return ((MetricName) entry.getKey()).getName().equals("authentication-subtype-rate");
        }).forEach(entry2 -> {
            Assertions.assertTrue(((Meter) entry2.getValue()).count() >= 1);
        });
    }

    private static void createAuthStore() throws Exception {
        authStore = MockBasicAuthStore.create(BROKER_UUID);
        authCache = authStore.trustCache();
    }

    private static void clearYammerMetrics() {
        Iterator it = KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().iterator();
        while (it.hasNext()) {
            KafkaYammerMetrics.defaultRegistry().removeMetric((MetricName) it.next());
        }
    }
}
