/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant;

import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import io.confluent.kafka.multitenant.CallingResourceIdentityType;
import io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata;
import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.MultiTenantSaslServer;
import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.UserTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.security.oauthbearer.OAuthBearerJwsToken;
import io.confluent.kafka.server.plugins.auth.oauth.MockBasicAuthStore;
import io.confluent.kafka.server.plugins.auth.oauth.MockTrustCache;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.security.auth.metadata.AuthCache;
import io.confluent.security.auth.metadata.AuthStore;
import io.confluent.security.auth.mtls.CertIdentityPool;
import io.confluent.security.auth.mtls.CertIdentityPoolExternalIdentifier;
import io.confluent.security.auth.store.data.CaCertificatesKey;
import io.confluent.security.authorizer.Action;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.mtls.CertificateMetadata;
import io.confluent.security.roledefinitions.Operation;
import io.confluent.security.roledefinitions.ResourceType;
import io.confluent.security.store.KeyValueStore;
import io.confluent.security.test.utils.JwtTestUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.cert.Certificate;
import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.net.ssl.SSLSession;
import javax.security.sasl.SaslServer;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.ConfluentPrincipal;
import org.apache.kafka.common.security.auth.IdentityMetadata;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;

public class MultiTenantPrincipalBuilderTest {
    private static final String OAUTH_NEGOTIATED_TOKEN_PROPERTY_KEY = "OAUTHBEARER.token";
    private static final String NON_CONFLUENT_ISSUER = "test-issuer";
    private static final String SUB_CLAIM_VALUE = "123456789";
    private static final String BROKER_UUID = UUID.randomUUID().toString();
    private static Map<String, Object> configs;
    private static PhysicalClusterMetadata metadata;
    private static MockBasicAuthStore authStore;
    private static MockTrustCache authCache;
    private SaslAuthenticationContext context;
    public static final Path TEMP_DIR;

    @BeforeAll
    public static void setUp() throws Exception {
        MultiTenantPrincipalBuilderTest.setUpPhysicalMetadata();
        MultiTenantPrincipalBuilderTest.clearYammerMetrics();
        MultiTenantPrincipalBuilderTest.createAuthStore();
    }

    @AfterAll
    public static void tearDown() throws Exception {
        metadata.close(BROKER_UUID);
        if (authStore != null) {
            authStore.close();
        }
    }

    @BeforeEach
    public void clearConfig() {
        configs.remove("multitenant.oauth.superuser.disable");
        configs.remove("confluent.metadata.kafka.enable.dataplane.rbac");
        configs.remove("confluent.calling.resource.identity.type.map");
    }

    @AfterEach
    public void cleanUp() {
        try {
            AuthStore.removeInstance((String)BROKER_UUID, (AuthStore)AuthStore.getInstance((String)BROKER_UUID), (Logger)((Logger)Mockito.mock(Logger.class)));
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private static void setUpPhysicalMetadata() throws IOException, InterruptedException {
        configs = new HashMap<String, Object>();
        configs.put("principal.builder.class", MultiTenantPrincipalBuilder.class);
        configs.put("broker.session.uuid", BROKER_UUID);
        configs.put("multitenant.metadata.dir", TEMP_DIR.toRealPath(new LinkOption[0]));
        configs.put("node.id", "0");
        metadata = Utils.initiatePhysicalClusterMetadata(configs);
        Utils.createLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_ABC, TEMP_DIR);
        Utils.createLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        Utils.createLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK, TEMP_DIR);
        Utils.createLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK, TEMP_DIR);
        TestUtils.waitForCondition(() -> metadata.metadata(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK.logicalClusterId()) != null, (String)"Expected metadata of new logical cluster to be present in metadata cache");
    }

    @Test
    public void testOauthSaslPrincipalIsSuperuserByDefault() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId());
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testOauthSaslPrincipalIsSuperuserByDefaultForInternalUsers() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testOauthSaslPrincipalIsNotSuperuserWhenMultitenantOauthSuperuserDisableIsTrue() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId());
        configs.put("multitenant.oauth.superuser.disable", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testOauthSaslPrincipalIsNotSuperuserWhenRBACEnabled() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId());
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testOauthSaslPrincipalIsNotSuperuserWhenBothFlagsEnabled() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId());
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        configs.put("multitenant.oauth.superuser.disable", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testOauthSaslPrincipalIsSuperuserWhenMultitenantOauthSuperuserDisableIsTrueForInternalUsers() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        configs.put("multitenant.oauth.superuser.disable", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testOauthSaslPrincipalIsSuperuserWhenRBACEnabledForInternalUsers() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testOauthSaslOrgMissingProps() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId());
        this.verifyOrgPropsMetric();
    }

    @Test
    public void testOauthSaslPrincipalUnionOfPoolsEnabledAndUseUnion() {
        this.mockOAuthUnionOfPoolsEnabledSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        JwtTestUtils.updateIdentityPool((KeyValueStore)authCache, (String)"pool-A", (int)1, (String)"issuer", (String)"op-A", (String)"https://test.com/jwks", (String)"sub", (String)"pool-A", (String)"true", (String)"org-A");
        MultiTenantPrincipalBuilder builder = (MultiTenantPrincipalBuilder)ChannelBuilders.createPrincipalBuilder(configs, null, null);
        builder.setAuthStore((AuthStore)authStore);
        MultiTenantPrincipal principal = (MultiTenantPrincipal)builder.build((AuthenticationContext)this.context);
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), (Object)principal.tenantMetadata().clusterId);
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_ABC.organizationId(), (Object)principal.tenantMetadata().organizationId);
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_ABC.environmentId(), (Object)principal.tenantMetadata().environmentId);
        Assertions.assertEquals((int)principal.authorizationIds().size(), (int)3);
        Assertions.assertEquals(principal.authorizationIds().get(2), (Object)"pool-C");
        Assertions.assertEquals((Object)principal.user(), (Object)"OAuth-ClientCredentials");
        Assertions.assertNull((Object)principal.identityMetadata().poolId());
        Assertions.assertEquals((Object)principal.identityMetadata().identity(), (Object)SUB_CLAIM_VALUE);
    }

    @Test
    public void testOauthSaslPrincipalUnionOfPoolsEnabledAndUseSinglePool() {
        this.mockOAuthUnionOfPoolsEnabledSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), false);
        JwtTestUtils.updateIdentityPool((KeyValueStore)authCache, (String)"pool-A", (int)1, (String)"issuer", (String)"op-A", (String)"https://test.com/jwks", (String)"sub", (String)"pool-A", (String)"true", (String)"org-A");
        MultiTenantPrincipalBuilder builder = (MultiTenantPrincipalBuilder)ChannelBuilders.createPrincipalBuilder(configs, null, null);
        builder.setAuthStore((AuthStore)authStore);
        MultiTenantPrincipal principal = (MultiTenantPrincipal)builder.build((AuthenticationContext)this.context);
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), (Object)principal.tenantMetadata().clusterId);
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_ABC.organizationId(), (Object)principal.tenantMetadata().organizationId);
        Assertions.assertEquals((Object)KafkaLogicalClusterUtils.LC_META_ABC.environmentId(), (Object)principal.tenantMetadata().environmentId);
        Assertions.assertEquals((int)principal.authorizationIds().size(), (int)1);
        Assertions.assertEquals(principal.authorizationIds().get(0), (Object)"pool-A");
        Assertions.assertEquals((Object)principal.user(), (Object)"pool-A");
        Assertions.assertEquals((Object)principal.identityMetadata().poolId(), (Object)"pool-A");
        Assertions.assertEquals((Object)principal.identityMetadata().identity(), (Object)SUB_CLAIM_VALUE);
    }

    @Test
    public void testOauthExtractCallingResourceIdentityForTableFlow() {
        HashMap<String, Object> jwtClaims = new HashMap<String, Object>();
        jwtClaims.put("userResourceId", "u-1");
        jwtClaims.put("calling_resource_identity", "cc-unified-storage-cts-enumerator-service.cc-unified-storage-cts-enumerator-test123");
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId(), "user", jwtClaims);
        configs.put("confluent.calling.resource.identity.type.map", "^cc-(unified-storage|us)-cts:CONFLUENT_TABLEFLOW");
        MultiTenantPrincipalBuilder builder = (MultiTenantPrincipalBuilder)ChannelBuilders.createPrincipalBuilder(configs, null, null);
        builder.setAuthStore((AuthStore)authStore);
        MultiTenantPrincipal principal = (MultiTenantPrincipal)builder.build((AuthenticationContext)this.context);
        Assertions.assertEquals((Object)CallingResourceIdentityType.CONFLUENT_TABLEFLOW, (Object)principal.identityMetadata().callingResourceIdentityType());
    }

    @Test
    public void testOauthExtractCallingResourceIdentityForFlink() {
        HashMap<String, Object> jwtClaims = new HashMap<String, Object>();
        jwtClaims.put("userResourceId", "u-1");
        jwtClaims.put("calling_resource_identity", "crn://confluent.cloud/organization=9bb441c4-edef-46ac-8a41-c49e44a3fd9a/environment=env-ab123/flink-region=aws.us-east-2/statement=statement_name");
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId(), "user", jwtClaims);
        configs.put("confluent.calling.resource.identity.type.map", "^crn://confluent.cloud/organization.*flink-region:CONFLUENT_FLINK");
        MultiTenantPrincipalBuilder builder = (MultiTenantPrincipalBuilder)ChannelBuilders.createPrincipalBuilder(configs, null, null);
        builder.setAuthStore((AuthStore)authStore);
        MultiTenantPrincipal principal = (MultiTenantPrincipal)builder.build((AuthenticationContext)this.context);
        Assertions.assertEquals((Object)CallingResourceIdentityType.CONFLUENT_FLINK, (Object)principal.identityMetadata().callingResourceIdentityType());
    }

    @Test
    public void testPlainSaslOrgMissingProps() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId(), true);
        this.verifyOrgPropsMetric();
    }

    @Test
    public void testPlainSaslPrincipalIsSuperuserByDefaultForUserAccount() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testPlainSaslPrincipalIsNotSuperuserWhenRBACEnabledForUserAccount() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testPlainSaslPrincipalIsSuperuserWhenMultitenantOauthSuperuserDisableIsTrueForUserAccount() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        configs.put("multitenant.oauth.superuser.disable", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testPlainSaslPrincipalIsSuperuserWhenBothFlagsEnabledForUserAccount() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true);
        configs.put("multitenant.oauth.superuser.disable", "true");
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testPlainSaslPrincipalIsSuperuserForInternalUsersWhenRBACEnabledForUserAccount() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), true, true);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, true);
    }

    @Test
    public void testPlainSaslPrincipalIsNotSuperuserByDefaultForServiceAccount() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), false);
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testPlainSaslPrincipalIsNotSuperuserWhenRBACEnabledForServiceAccount() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), false);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testPlainSaslPrincipalIsNotSuperuserWhenWhenMultitenantOauthSuperuserDisableIsTrueForServiceAccount() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), false);
        configs.put("multitenant.oauth.superuser.disable", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testPlainSaslPrincipalIsNotSuperuserWhenBothFlagsEnabledForServiceAccount() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), false);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        configs.put("multitenant.oauth.superuser.disable", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testPlainSaslPrincipalIsNotSuperuserForInternalUsersWhenRBACEnabledForServiceAccount() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), false, true);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @Test
    public void testHCPrincipalIsSuperuser() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK.logicalClusterId(), true, true);
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK, true);
    }

    @Test
    public void testHCPrincipalIsSuperuserWhenRBACEnabled() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK.logicalClusterId(), true, true);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK, true);
    }

    @Test
    public void testHCPrincipalIsSuperuserWhenMultitenantOauthSuperuserDisableIsTrue() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK.logicalClusterId(), true, true);
        configs.put("multitenant.oauth.superuser.disable", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK, true);
    }

    @Test
    public void testHCPrincipalIsSuperuserWhenBothFlagsTrue() {
        this.mockPlainSaslContext(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK.logicalClusterId(), true, true);
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        configs.put("multitenant.oauth.superuser.disable", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK, true);
    }

    @Test
    public void testHCOauthPrincipalIsSuperuser() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK.logicalClusterId());
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK, true);
    }

    @Test
    public void testHCOauthPrincipalIsSuperuserWhenRBACEnabled() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK.logicalClusterId());
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK, true);
    }

    @Test
    public void testHCOauthPrincipalIsSuperuserWhenMultitenantOauthSuperuserDisableIsTrue() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK.logicalClusterId());
        configs.put("multitenant.oauth.superuser.disable", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_HEALTHCHECK, true);
    }

    @Test
    public void testHCOauthPrincipalIsSuperuserWhenBothFlagsTrue() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK.logicalClusterId());
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", "true");
        configs.put("multitenant.oauth.superuser.disable", "true");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK, true);
    }

    @Test
    public void testOauthServiceAccountIsNotSuperUser() {
        this.mockOAuthSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), "12345", "sa-foo");
        this.verifyTenantMetadata(KafkaLogicalClusterUtils.LC_META_ABC, false);
    }

    @ParameterizedTest(name="{displayName}.enable_dataplane_rbac={0}.generateInvalidSan={1}")
    @CsvSource(value={"false,false", "false,true", "true,false", "true,true"})
    public void testMTlsSaslPrincipalSuccessAndPrincipalIsNeverSuperuser(boolean enableDataplaneRbacForPkc, boolean generateInvalidSan) throws GeneralSecurityException, IOException, InterruptedException {
        Utils.deleteLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        TestUtils.waitForCondition(() -> metadata.metadata(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId()) == null, (String)"Expected metadata of logical cluster to be absent in metadata cache");
        String provider = "provider1";
        String orgId = KafkaLogicalClusterUtils.LC_META_ABC.organizationId();
        CaCertificatesKey mockProvider = new CaCertificatesKey("provider1", orgId);
        CertIdentityPool mockPool1 = new CertIdentityPool("pool1", "provider1", orgId, "DN", "");
        CertIdentityPool mockPool2 = new CertIdentityPool("pool2", "provider1", orgId, "SNID", "");
        configs.put("confluent.metadata.kafka.enable.dataplane.rbac", enableDataplaneRbacForPkc);
        this.verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterUtils.LC_META_ABC, false, Collections.singletonList(mockProvider), Arrays.asList(mockPool1, mockPool2), CertIdentityPoolExternalIdentifier.DN, false, generateInvalidSan);
    }

    @Test
    public void testMTlsSaslPrincipalThrowsSSLSessionNotPresent() {
        this.context = new SaslAuthenticationContext(-1L, null, SecurityProtocol.SASL_SSL, InetAddress.getLoopbackAddress(), SecurityProtocol.SASL_SSL.name(), Optional.empty(), true, false);
        Assertions.assertThrows(IllegalArgumentException.class, () -> ChannelBuilders.createPrincipalBuilder(configs, null, null).build((AuthenticationContext)this.context), (String)"Principal builder should throw if SSL session is not present");
    }

    @ParameterizedTest(name="{displayName}.generateInvalidSan={1}")
    @ValueSource(booleans={false, true})
    public void testMTlsSaslPrincipalThrowsClusterIsMultiTenant(boolean generateInvalidSan) throws IOException, InterruptedException {
        Utils.createLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        TestUtils.waitForCondition(() -> metadata.metadata(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId()) != null, (String)"Expected metadata of logical cluster to be absent in metadata cache");
        CertIdentityPool mockPool1 = new CertIdentityPool("pool1", "provider1", "org1", "DN", "");
        CertIdentityPool mockPool2 = new CertIdentityPool("pool2", "provider1", "org1", "SNID", "");
        Assertions.assertThrows(IllegalStateException.class, () -> this.verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterUtils.LC_META_ABC, false, Collections.emptyList(), Arrays.asList(mockPool1, mockPool2), CertIdentityPoolExternalIdentifier.DN, false, generateInvalidSan), (String)"Principal should not be built for mTLS client on MT clusters");
    }

    @ParameterizedTest(name="{displayName}.generateInvalidSan={1}")
    @ValueSource(booleans={false, true})
    public void testMTlsSaslPrincipalThrowsNoMatchingIdentityProviders(boolean generateInvalidSan) throws IOException, InterruptedException {
        Utils.deleteLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        TestUtils.waitForCondition(() -> metadata.metadata(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId()) == null, (String)"Expected metadata of logical cluster to be absent in metadata cache");
        Exception exception = (Exception)Assertions.assertThrows(AuthenticationException.class, () -> this.verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterUtils.LC_META_ABC, false, Collections.emptyList(), Collections.emptyList(), CertIdentityPoolExternalIdentifier.DN, false, generateInvalidSan), (String)"Principal should not be built if mTLS client certificate does not match any identity providers");
        Assertions.assertTrue((boolean)exception.getMessage().matches("Must be only one matching identity provider found when building principal for mTLS authenticated client of orgId [^ ]+, found \\[\\], Certificate chain = \\[\\{subjectDN=.+,issuerDN.+,serialNumber=.+,notBefore=.+,notAfter=[^\\}]+\\};\\]"));
    }

    @ParameterizedTest(name="{displayName}.generateInvalidSan={1}")
    @ValueSource(booleans={false, true})
    public void testMTlsSaslPrincipalThrowsMoreThanOneIdentityProviders(boolean generateInvalidSan) throws IOException, InterruptedException {
        Utils.deleteLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        TestUtils.waitForCondition(() -> metadata.metadata(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId()) == null, (String)"Expected metadata of logical cluster to be absent in metadata cache");
        CaCertificatesKey identityProvider1 = new CaCertificatesKey("provider1", "org1");
        CaCertificatesKey identityProvider2 = new CaCertificatesKey("provider2", "org2");
        Exception exception = (Exception)Assertions.assertThrows(AuthenticationException.class, () -> this.verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterUtils.LC_META_ABC, false, Arrays.asList(identityProvider1, identityProvider2), Collections.emptyList(), CertIdentityPoolExternalIdentifier.DN, false, generateInvalidSan), (String)"Principal should not be built if mTLS client certificate matches more than one identity providers");
        Assertions.assertTrue((boolean)exception.getMessage().matches("Must be only one matching identity provider found when building principal for mTLS authenticated client of orgId [^ ]+, found \\[CaCertificatesKey\\{[^\\}]+\\}, CaCertificatesKey\\{[^\\}]+\\}\\], Certificate chain = \\[\\{subjectDN=.+,issuerDN.+,serialNumber=.+,notBefore=.+,notAfter=[^\\}]+\\};\\]"));
    }

    @ParameterizedTest(name="{displayName}.generateInvalidSan={1}")
    @ValueSource(booleans={false, true})
    public void testMTlsSaslPrincipalThrowsClientCertIsRevoked(boolean generateInvalidSan) throws IOException, InterruptedException {
        Utils.deleteLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        TestUtils.waitForCondition(() -> metadata.metadata(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId()) == null, (String)"Expected metadata of logical cluster to be absent in metadata cache");
        Assertions.assertThrows(AuthenticationException.class, () -> this.verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterUtils.LC_META_ABC, false, Collections.singletonList((CaCertificatesKey)Mockito.mock(CaCertificatesKey.class)), Collections.emptyList(), CertIdentityPoolExternalIdentifier.DN, true, generateInvalidSan), (String)"Principal should not be built if mTLS client certificate has been revoked by CRL");
    }

    @ParameterizedTest(name="{displayName}.generateInvalidSan={1}")
    @ValueSource(booleans={false, true})
    public void testMTlsSaslPrincipalThrowsNoMatchingPools(boolean generateInvalidSan) throws IOException, InterruptedException {
        Utils.deleteLogicalClusterFile(KafkaLogicalClusterUtils.LC_META_1, TEMP_DIR);
        TestUtils.waitForCondition(() -> metadata.metadata(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId()) == null, (String)"Expected metadata of logical cluster to be absent in metadata cache");
        Assertions.assertThrows(AuthenticationException.class, () -> this.verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterUtils.LC_META_ABC, false, Collections.singletonList((CaCertificatesKey)Mockito.mock(CaCertificatesKey.class)), Collections.emptyList(), CertIdentityPoolExternalIdentifier.DN, false, generateInvalidSan), (String)"Principal should not be built if mTLS client certificate does not match any identity pools");
    }

    @Test
    public void testMultiTenantPrincipalBuilderSerde_NonMultiTenant() {
        KafkaPrincipal p = new KafkaPrincipal("User", "user");
        MultiTenantPrincipalBuilder mtBuilder = new MultiTenantPrincipalBuilder();
        KafkaPrincipal copy = mtBuilder.deserialize(mtBuilder.serialize(p));
        Assertions.assertEquals((Object)p, (Object)copy);
    }

    @Test
    public void testMultiTenantPrincipalBuilderSerde_NonMultiTenantCompatibilityWithDefaultKafkaPrincipalBuilder() {
        KafkaPrincipal p = new KafkaPrincipal("User", "user");
        DefaultKafkaPrincipalBuilder defaultSerializer = new DefaultKafkaPrincipalBuilder(null, null);
        MultiTenantPrincipalBuilder mtDeserializer = new MultiTenantPrincipalBuilder();
        KafkaPrincipal copy = mtDeserializer.deserialize(defaultSerializer.serialize(p));
        Assertions.assertEquals((Object)p, (Object)copy);
    }

    @Test
    public void testMultiTenantPrincipalBuilderSerde_MultiTenant() {
        MultiTenantPrincipalBuilder builder = new MultiTenantPrincipalBuilder();
        MultiTenantPrincipal p = new MultiTenantPrincipal("user", "saslAuthenticationId", new TenantMetadata("tenantName", "clusterId", "organizationId", "environmentId", "userResourceID", true, true, true));
        Assertions.assertEquals((Object)"TenantUser", (Object)p.getPrincipalType());
        KafkaPrincipal copy = builder.deserialize(builder.serialize((KafkaPrincipal)p));
        Assertions.assertEquals((Object)p, (Object)copy);
        MultiTenantPrincipal mtp = (MultiTenantPrincipal)copy;
        Assertions.assertEquals((Object)p.tenantMetadata().userResourceId, (Object)mtp.tenantMetadata().userResourceId);
        Assertions.assertEquals((Object)p.tenantMetadata().isServiceAccount, (Object)mtp.tenantMetadata().isServiceAccount);
        Assertions.assertEquals((Object)p.tenantMetadata().isHealthcheckTenant, (Object)mtp.tenantMetadata().isHealthcheckTenant);
        Assertions.assertEquals((Object)p.tenantMetadata().isApiKeyAuthenticated, (Object)mtp.tenantMetadata().isApiKeyAuthenticated);
        Assertions.assertEquals((Object)p.identityMetadata().poolId(), (Object)mtp.identityMetadata().poolId());
        Assertions.assertNull((Object)p.identityMetadata().poolId());
        Assertions.assertEquals((Object)p.authorizationIds(), (Object)mtp.authorizationIds());
        Assertions.assertTrue((boolean)p.authorizationIds().contains("userResourceID"));
        MultiTenantPrincipalBuilder builder2 = new MultiTenantPrincipalBuilder();
        MultiTenantPrincipal p2 = new MultiTenantPrincipal("user", "saslAuthenticationId", Optional.empty(), new TenantMetadata("tenantName", "clusterId", "organizationId", "environmentId", "userResourceID", true, true, true), new IdentityMetadata("poolId", "providerId", "serviceAccount", "externalIdentityId", "iss1", Collections.singletonList("aud1"), CallingResourceIdentityType.DEFAULT), Arrays.asList("userResourceId", "poolId1", "poolId2"));
        Assertions.assertEquals((Object)"TenantUser", (Object)p2.getPrincipalType());
        KafkaPrincipal copy2 = builder2.deserialize(builder2.serialize((KafkaPrincipal)p2));
        Assertions.assertEquals((Object)p2, (Object)copy2);
        MultiTenantPrincipal mtp2 = (MultiTenantPrincipal)copy2;
        Assertions.assertEquals((Object)p2.tenantMetadata().userResourceId, (Object)mtp2.tenantMetadata().userResourceId);
        Assertions.assertEquals((Object)p2.tenantMetadata().isServiceAccount, (Object)mtp2.tenantMetadata().isServiceAccount);
        Assertions.assertEquals((Object)p2.tenantMetadata().isHealthcheckTenant, (Object)mtp2.tenantMetadata().isHealthcheckTenant);
        Assertions.assertEquals((Object)p2.tenantMetadata().isApiKeyAuthenticated, (Object)mtp2.tenantMetadata().isApiKeyAuthenticated);
        Assertions.assertEquals((Object)p2.identityMetadata().callingResourceIdentityType(), (Object)mtp2.identityMetadata().callingResourceIdentityType());
        Assertions.assertEquals((Object)p2.identityMetadata().poolId(), (Object)mtp2.identityMetadata().poolId());
        Assertions.assertEquals((Object)"poolId", (Object)p2.identityMetadata().poolId());
        Assertions.assertEquals((Object)p2.authorizationIds(), (Object)mtp2.authorizationIds());
        Assertions.assertTrue((boolean)mtp2.authorizationIds().containsAll(Arrays.asList("userResourceId", "poolId1", "poolId2")));
        Assertions.assertEquals((Object)"externalIdentityId", (Object)p2.identityMetadata().externalIdentityId());
    }

    @Test
    public void testAuthorizationIdsEmptyMultiTenantPrincipalBuilder() {
        MultiTenantPrincipalBuilder builder = new MultiTenantPrincipalBuilder();
        MultiTenantPrincipal p = new MultiTenantPrincipal("user", "saslAuthenticationId", Optional.empty(), new TenantMetadata("tenantName", "clusterId", "organizationId", "environmentId", "userResourceID", true, true, true), null, Arrays.asList("abc"));
        KafkaTestUtils.setField(p, ConfluentPrincipal.class, "authorizationIds", new ArrayList(0));
        KafkaPrincipal copy = builder.deserialize(builder.serialize((KafkaPrincipal)p));
        MultiTenantPrincipal mtp = (MultiTenantPrincipal)copy;
        Assertions.assertTrue((boolean)mtp.authorizationIds().contains("userResourceID"));
        KafkaTestUtils.setField(p, ConfluentPrincipal.class, "authorizationIds", null);
        copy = builder.deserialize(builder.serialize((KafkaPrincipal)p));
        mtp = (MultiTenantPrincipal)copy;
        Assertions.assertTrue((boolean)mtp.authorizationIds().contains("userResourceID"));
    }

    @Test
    public void testMultiTenantPrincipalBuilderSerde_RejectsBadType() throws Exception {
        MultiTenantPrincipalBuilder serde = new MultiTenantPrincipalBuilder();
        Arrays.asList("Group", "whatever").forEach(principalType -> {
            SerializationException e = (SerializationException)Assertions.assertThrows(SerializationException.class, () -> serde.deserialize(serde.serialize(new KafkaPrincipal(principalType, "foo"))));
            Assertions.assertTrue((boolean)e.getMessage().startsWith("Invalid principal type "));
        });
    }

    @Test
    public void testUserTenantPrincipalBuilderSerde() {
        UserTenantPrincipalBuilder builder = new UserTenantPrincipalBuilder();
        MultiTenantPrincipal p = (MultiTenantPrincipal)builder.build(null);
        Assertions.assertEquals((Object)"TenantUser", (Object)p.getPrincipalType());
        KafkaPrincipal copy = builder.deserialize(builder.serialize((KafkaPrincipal)p));
        Assertions.assertEquals((Object)p, (Object)copy);
        MultiTenantPrincipal mtp = (MultiTenantPrincipal)copy;
        Assertions.assertEquals((Object)p.tenantMetadata().userResourceId, (Object)mtp.tenantMetadata().userResourceId);
        Assertions.assertEquals((Object)p.tenantMetadata().isServiceAccount, (Object)mtp.tenantMetadata().isServiceAccount);
        Assertions.assertEquals((Object)p.tenantMetadata().isHealthcheckTenant, (Object)mtp.tenantMetadata().isHealthcheckTenant);
        Assertions.assertEquals((Object)p.tenantMetadata().isApiKeyAuthenticated, (Object)mtp.tenantMetadata().isApiKeyAuthenticated);
        Assertions.assertEquals((Object)p.authorizationIds(), (Object)mtp.authorizationIds());
        Assertions.assertTrue((boolean)p.authorizationIds().contains(mtp.tenantMetadata().userResourceId));
    }

    @Test
    public void testSpireJwtSvidPrincipalBuilder() {
        MultiTenantPrincipalBuilder builder = new MultiTenantPrincipalBuilder();
        configs.put("confluent.spiffe.id.principal.extraction.rules", "RULE:spiffe://([^/]*?)/v1/physical/(k8s-[^/]*?)/([^/]*?)$!$3!");
        builder.configure(configs);
        this.mockOAuthSpireSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), "spiffe://dummy.trust.domain/test-workload", "spire.internal.confluent.cloud");
        MultiTenantPrincipal p = (MultiTenantPrincipal)builder.build((AuthenticationContext)this.context);
        Assertions.assertEquals((Object)"TenantUser", (Object)p.getPrincipalType());
        Assertions.assertTrue((boolean)p.authorizationIds().contains("spiffe://dummy.trust.domain/test-workload"));
        KafkaPrincipal copy = builder.deserialize(builder.serialize((KafkaPrincipal)p));
        Assertions.assertEquals((Object)p, (Object)copy);
        MultiTenantPrincipal mtp = (MultiTenantPrincipal)copy;
        Assertions.assertEquals((Object)p.tenantMetadata().userResourceId, (Object)mtp.tenantMetadata().userResourceId);
        Assertions.assertEquals((Object)p.tenantMetadata().isServiceAccount, (Object)mtp.tenantMetadata().isServiceAccount);
        Assertions.assertEquals((Object)p.tenantMetadata().isHealthcheckTenant, (Object)mtp.tenantMetadata().isHealthcheckTenant);
        Assertions.assertEquals((Object)p.tenantMetadata().isApiKeyAuthenticated, (Object)mtp.tenantMetadata().isApiKeyAuthenticated);
        Assertions.assertEquals((Object)p.authorizationIds(), (Object)mtp.authorizationIds());
        String spireIssuerSuffix = "test.spire.suffix";
        builder = new MultiTenantPrincipalBuilder();
        configs.put("authenticator.jwt.spire.issuers.suffix", spireIssuerSuffix);
        builder.configure(configs);
        this.mockOAuthSpireSaslContext(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId(), "spiffe://dummy.trust.domain/v1/physical/k8s-abc/test-workload", spireIssuerSuffix);
        p = (MultiTenantPrincipal)builder.build((AuthenticationContext)this.context);
        Assertions.assertEquals((Object)"TenantUser", (Object)p.getPrincipalType());
        Assertions.assertTrue((boolean)p.authorizationIds().contains("spiffe://dummy.trust.domain/v1/physical/k8s-abc/test-workload"));
        Assertions.assertTrue((boolean)p.authorizationIds().contains("test-workload"));
        copy = builder.deserialize(builder.serialize((KafkaPrincipal)p));
        Assertions.assertEquals((Object)p, (Object)copy);
        mtp = (MultiTenantPrincipal)copy;
        Assertions.assertEquals((Object)p.tenantMetadata().userResourceId, (Object)mtp.tenantMetadata().userResourceId);
        Assertions.assertEquals((Object)p.tenantMetadata().isServiceAccount, (Object)mtp.tenantMetadata().isServiceAccount);
        Assertions.assertEquals((Object)p.tenantMetadata().isHealthcheckTenant, (Object)mtp.tenantMetadata().isHealthcheckTenant);
        Assertions.assertEquals((Object)p.tenantMetadata().isApiKeyAuthenticated, (Object)mtp.tenantMetadata().isApiKeyAuthenticated);
        Assertions.assertEquals((Object)p.authorizationIds(), (Object)mtp.authorizationIds());
    }

    @Test
    public void testBuildPrincipalWithNullUserResourceId() {
        MultiTenantPrincipal mtp = new MultiTenantPrincipal("some-user", new TenantMetadata.Builder("some-cluster", null).build());
        MultiTenantPrincipalBuilder builder = new MultiTenantPrincipalBuilder();
        builder.serialize((KafkaPrincipal)mtp);
    }

    @Test
    public void testRecordAuthenticationSubtypeMetric() {
        MultiTenantPrincipalBuilder multiTenantPrincipalBuilder = new MultiTenantPrincipalBuilder();
        multiTenantPrincipalBuilder.recordAuthenticationSubtypeMetric(Arrays.asList("pool-1"), "123", "Non-Confluent");
        multiTenantPrincipalBuilder.recordAuthenticationSubtypeMetric(Arrays.asList("pool-1"), "123", null);
        multiTenantPrincipalBuilder.recordAuthenticationSubtypeMetric(Arrays.asList("u-1"), "123", "Confluent");
        multiTenantPrincipalBuilder.recordAuthenticationSubtypeMetric(Arrays.asList("pool-1"), "pool-1", "Confluent");
        multiTenantPrincipalBuilder.recordAuthenticationSubtypeMetric(Arrays.asList("u-1", "pool-1"), "123", "Confluent");
        this.verifyAuthenticationSubtypeMetric();
    }

    @Test
    public void testExternalIdBasedOnSSLCert() throws Exception {
        X509Certificate clientCert = TestSslUtils.generateSignedCertificate((String)"CN=example.com, OU=Kafka, O=Confluent, L=Mountain View, ST=California, C=US", (KeyPair)TestSslUtils.generateKeyPair((String)"RSA"), (int)1, (int)1, null, null, (String)"SHA256withRSA", (boolean)false, (boolean)true, (boolean)false, (String[])new String[]{"example.com", "*.example.com"});
        CertificateMetadata certificateMetadata = new CertificateMetadata(clientCert);
        String cn = certificateMetadata.getCn();
        String dn = certificateMetadata.getDn();
        String snid = certificateMetadata.getSnid();
        String san = certificateMetadata.getSan();
        String sha1 = certificateMetadata.getSha1();
        Assertions.assertEquals((Object)cn, (Object)MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert((CertificateMetadata)certificateMetadata, (CertIdentityPoolExternalIdentifier)CertIdentityPoolExternalIdentifier.CN));
        Assertions.assertEquals((Object)dn, (Object)MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert((CertificateMetadata)certificateMetadata, (CertIdentityPoolExternalIdentifier)CertIdentityPoolExternalIdentifier.DN));
        Assertions.assertEquals((Object)snid, (Object)MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert((CertificateMetadata)certificateMetadata, (CertIdentityPoolExternalIdentifier)CertIdentityPoolExternalIdentifier.SNID));
        Assertions.assertEquals((Object)String.format("%s, %s", cn, snid), (Object)MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert((CertificateMetadata)certificateMetadata, (CertIdentityPoolExternalIdentifier)CertIdentityPoolExternalIdentifier.CN_SNID));
        Assertions.assertEquals((Object)san, (Object)MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert((CertificateMetadata)certificateMetadata, (CertIdentityPoolExternalIdentifier)CertIdentityPoolExternalIdentifier.SAN));
        Assertions.assertEquals((Object)String.format("%s, %s", san, snid), (Object)MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert((CertificateMetadata)certificateMetadata, (CertIdentityPoolExternalIdentifier)CertIdentityPoolExternalIdentifier.SAN_SNID));
        Assertions.assertEquals((Object)sha1, (Object)MultiTenantPrincipalBuilder.externalIdBasedOnSSLCert((CertificateMetadata)certificateMetadata, (CertIdentityPoolExternalIdentifier)CertIdentityPoolExternalIdentifier.SHA1));
    }

    private void verifyTenantMetadata(KafkaLogicalClusterMetadata lkcMetadata, boolean expectedSuperUser) {
        MultiTenantPrincipal principal = (MultiTenantPrincipal)ChannelBuilders.createPrincipalBuilder(configs, null, null).build((AuthenticationContext)this.context);
        Assertions.assertEquals((Object)lkcMetadata.logicalClusterId(), (Object)principal.tenantMetadata().clusterId);
        Assertions.assertEquals((Object)lkcMetadata.organizationId(), (Object)principal.tenantMetadata().organizationId);
        Assertions.assertEquals((Object)lkcMetadata.environmentId(), (Object)principal.tenantMetadata().environmentId);
        this.verifyTenantMetadataScopeAndSuperuserStatus(lkcMetadata, expectedSuperUser, principal);
    }

    private void verifySslTenantMetadataMTlsEnabled(KafkaLogicalClusterMetadata lkcMetadata, boolean expectedSuperUser, Collection<CaCertificatesKey> providers, Collection<CertIdentityPool> pools, CertIdentityPoolExternalIdentifier externalIdentifier, boolean revokeClientCert, boolean generateInvalidSan) throws GeneralSecurityException, IOException {
        X509Certificate clientCert = generateInvalidSan ? TestSslUtils.generateSignedCertificate((String)"C=US, ST=CA, L=San Francisco, O=Confluent, OU=Confluent, CN=client", (KeyPair)TestSslUtils.generateKeyPair((String)"RSA"), (int)1, (int)1, null, null, (String)"SHA256withRSA", (boolean)false, (boolean)false, (boolean)true, (byte[])new byte[]{1, 2, 3, 4}) : TestSslUtils.generateSignedCertificate((String)"C=US, ST=CA, L=San Francisco, O=Confluent, OU=Confluent, CN=client", (KeyPair)TestSslUtils.generateKeyPair((String)"RSA"), (int)1, (int)1, null, null, (String)"SHA256withRSA", (boolean)false, (boolean)false, (boolean)true, (String[])new String[]{"example.com", "*.example.com"});
        CertificateMetadata certificateMetadata = new CertificateMetadata(clientCert);
        String issuerDn = certificateMetadata.getIssuerDn();
        String dn = certificateMetadata.getDn();
        String cn = certificateMetadata.getCn();
        String snid = certificateMetadata.getSnid();
        String san = certificateMetadata.getSan();
        if (generateInvalidSan) {
            Assertions.assertThrows(CertificateParsingException.class, clientCert::getSubjectAlternativeNames);
            Assertions.assertEquals((int)0, (int)san.length());
        } else {
            Assertions.assertEquals((Object)"DNS:example.com,DNS:*.example.com", (Object)san);
        }
        this.setUpMTlsMocks(clientCert, providers, pools, revokeClientCert);
        MultiTenantPrincipalBuilder multiTenantPrincipalBuilder = (MultiTenantPrincipalBuilder)ChannelBuilders.createPrincipalBuilder(configs, null, null);
        MultiTenantPrincipal principal = (MultiTenantPrincipal)multiTenantPrincipalBuilder.build((AuthenticationContext)this.context);
        String expectedPrincipalName = "";
        if (CertIdentityPoolExternalIdentifier.CN.equals((Object)externalIdentifier)) {
            expectedPrincipalName = cn;
        } else if (CertIdentityPoolExternalIdentifier.DN.equals((Object)externalIdentifier)) {
            expectedPrincipalName = dn;
        }
        String orgId = lkcMetadata.organizationId();
        String providerId = "";
        if (providers != null && providers.size() == 1) {
            providerId = providers.iterator().next().providerId();
        }
        String expectedAuthnId = MultiTenantPrincipal.mTlsAuthenticationId((String)orgId, (String)providerId, (String)issuerDn, (String)snid);
        this.verifyPrincipalProps(lkcMetadata, expectedPrincipalName, expectedSuperUser, expectedAuthnId, principal, pools, providerId);
        byte[] serializedPrincipal = multiTenantPrincipalBuilder.serialize((KafkaPrincipal)principal);
        MultiTenantPrincipal deserializedPrincipal = (MultiTenantPrincipal)multiTenantPrincipalBuilder.deserialize(serializedPrincipal);
        Assertions.assertEquals((Object)principal, (Object)deserializedPrincipal);
        this.verifyPrincipalProps(lkcMetadata, expectedPrincipalName, expectedSuperUser, expectedAuthnId, deserializedPrincipal, pools, providerId);
    }

    private void verifyPrincipalProps(KafkaLogicalClusterMetadata lkcMetadata, String expectedPrincipalName, boolean expectedSuperUser, String expectedAuthnId, MultiTenantPrincipal principal, Collection<CertIdentityPool> pools, String expectedProviderId) {
        Assertions.assertEquals((Object)String.format("%s_%s", lkcMetadata.logicalClusterId(), expectedPrincipalName), (Object)principal.getName());
        Assertions.assertEquals((Object)expectedAuthnId, (Object)principal.authenticationId());
        Assertions.assertFalse((boolean)principal.networkId().isPresent());
        Assertions.assertEquals((Object)lkcMetadata.logicalClusterId(), (Object)principal.tenantMetadata().clusterId);
        Assertions.assertNotNull((Object)principal.tenantMetadata().userResourceId);
        Set expectPoolIds = pools.stream().map(CertIdentityPool::poolId).collect(Collectors.toSet());
        HashSet<String> poolIds = new HashSet<String>(Arrays.asList(principal.tenantMetadata().userResourceId.split(",")));
        Assertions.assertEquals(expectPoolIds, poolIds);
        Assertions.assertEquals((Object)lkcMetadata.organizationId(), (Object)principal.tenantMetadata().organizationId);
        Assertions.assertEquals((Object)lkcMetadata.environmentId(), (Object)principal.tenantMetadata().environmentId);
        Assertions.assertFalse((boolean)principal.tenantMetadata().isServiceAccount);
        Assertions.assertFalse((boolean)principal.tenantMetadata().isApiKeyAuthenticated);
        Assertions.assertFalse((boolean)principal.tenantMetadata().isHealthcheckTenant);
        this.verifyTenantMetadataScopeAndSuperuserStatus(lkcMetadata, expectedSuperUser, principal);
        Assertions.assertEquals((Object)new IdentityMetadata(null, expectedProviderId, expectedPrincipalName, null), (Object)principal.identityMetadata());
        Assertions.assertEquals(expectPoolIds, new HashSet(principal.authorizationIds()));
    }

    private void verifyTenantMetadataScopeAndSuperuserStatus(KafkaLogicalClusterMetadata lkcMetadata, boolean expectedSuperUser, MultiTenantPrincipal principal) {
        Scope expectedScope = new Scope.Builder(new String[0]).addPath("organization=" + lkcMetadata.organizationId()).addPath("environment=" + lkcMetadata.environmentId()).addPath("cloud-cluster=" + lkcMetadata.logicalClusterId()).withKafkaCluster(lkcMetadata.logicalClusterId()).build();
        Assertions.assertEquals((Object)expectedScope, (Object)principal.tenantMetadata().scope());
        String sameTenant = principal.tenantMetadata().clusterId + "_";
        Action action = new Action(Scope.kafkaClusterScope((String)"foo"), ResourceType.ALL, sameTenant, Operation.ALL);
        boolean enableDataplaneRbacForPkc = "true".equals(configs.get("confluent.metadata.kafka.enable.dataplane.rbac"));
        boolean oauthSuperUserDisabled = "true".equals(configs.get("multitenant.oauth.superuser.disable"));
        Assertions.assertEquals((Object)expectedSuperUser, (Object)MultiTenantAuthorizer.isSuperUser((MultiTenantPrincipal)principal, (Action)action, (boolean)false, (boolean)enableDataplaneRbacForPkc, (boolean)oauthSuperUserDisabled));
    }

    private void mockOAuthSaslContext(String clusterId) {
        this.mockOAuthSaslContext(clusterId, false, true);
    }

    private void mockOAuthSaslContext(String clusterId, boolean internalUser) {
        this.mockOAuthSaslContext(clusterId, internalUser, true);
    }

    private void mockOAuthSaslContext(String clusterId, boolean internalUser, boolean withUserResourceId) {
        String userId = internalUser ? "0" : "user";
        String userResourceId = withUserResourceId ? userId : null;
        this.mockOAuthSaslContext(clusterId, userId, userResourceId);
    }

    private void mockOAuthSaslContext(String clusterId, String userId, String userResourceId) {
        Map<String, Object> jwtClaims = null;
        if (userResourceId != null) {
            jwtClaims = Collections.singletonMap("userResourceId", userResourceId);
        }
        this.mockOAuthSaslContext(clusterId, userId, jwtClaims);
    }

    private void mockOAuthSaslContext(String clusterId, String userId, Map<String, Object> jwtClaims) {
        SaslServer saslServer = (SaslServer)Mockito.mock(OAuthBearerSaslServer.class);
        Mockito.when((Object)saslServer.getNegotiatedProperty(OAUTH_NEGOTIATED_TOKEN_PROPERTY_KEY)).thenReturn((Object)new OAuthBearerJwsToken("", null, 0L, userId, Long.valueOf(0L), jwtClaims));
        Mockito.when((Object)saslServer.getNegotiatedProperty("logicalCluster")).thenReturn((Object)clusterId);
        this.context = new SaslAuthenticationContext(saslServer, SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLoopbackAddress(), SecurityProtocol.SASL_PLAINTEXT.name());
    }

    private void mockOAuthUnionOfPoolsEnabledSaslContext(String clusterId, boolean useUnionofPools) {
        SaslServer saslServer = (SaslServer)Mockito.mock(OAuthBearerSaslServer.class);
        HashMap claims = new HashMap();
        Mockito.when((Object)saslServer.getNegotiatedProperty(OAUTH_NEGOTIATED_TOKEN_PROPERTY_KEY)).thenReturn((Object)new OAuthBearerJwsToken("", null, 0L, "user", Long.valueOf(0L), claims, NON_CONFLUENT_ISSUER));
        Mockito.when((Object)saslServer.getNegotiatedProperty("logicalCluster")).thenReturn((Object)clusterId);
        Mockito.when((Object)saslServer.getNegotiatedProperty("identityPoolId-azp")).thenReturn((Object)SUB_CLAIM_VALUE);
        if (useUnionofPools) {
            Mockito.when((Object)saslServer.getNegotiatedProperty("identityPoolId-identityPoolId")).thenReturn((Object)"pool-A,pool-B,pool-C");
            Mockito.when((Object)saslServer.getNegotiatedProperty("identityPoolId-sub")).thenReturn((Object)"OAuth-ClientCredentials");
            Mockito.when((Object)saslServer.getNegotiatedProperty("identityPoolId")).thenReturn((Object)"OAuth-ClientCredentials");
        } else {
            Mockito.when((Object)saslServer.getNegotiatedProperty("identityPoolId-sub")).thenReturn((Object)"pool-A");
            Mockito.when((Object)saslServer.getNegotiatedProperty("identityPoolId")).thenReturn((Object)"pool-A");
        }
        this.context = new SaslAuthenticationContext(saslServer, SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLoopbackAddress(), SecurityProtocol.SASL_PLAINTEXT.name());
    }

    private void mockOAuthSpireSaslContext(String clusterId, String spiffeId, String spireIssuerSuffix) {
        SaslServer saslServer = (SaslServer)Mockito.mock(OAuthBearerSaslServer.class);
        Mockito.when((Object)saslServer.getNegotiatedProperty(OAUTH_NEGOTIATED_TOKEN_PROPERTY_KEY)).thenReturn((Object)new OAuthBearerJwsToken("", null, 0L, spiffeId, Long.valueOf(0L), null, "test.prefix." + spireIssuerSuffix));
        Mockito.when((Object)saslServer.getNegotiatedProperty("logicalCluster")).thenReturn((Object)clusterId);
        this.context = new SaslAuthenticationContext(saslServer, SecurityProtocol.SASL_SSL, InetAddress.getLoopbackAddress(), SecurityProtocol.SASL_SSL.name());
    }

    private void mockPlainSaslContext(String clusterId, boolean userAccount) {
        this.mockPlainSaslContext(clusterId, userAccount, false, true);
    }

    private void mockPlainSaslContext(String clusterId, boolean userAccount, boolean internalUser) {
        this.mockPlainSaslContext(clusterId, userAccount, internalUser, true);
    }

    private void mockPlainSaslContext(String clusterId, boolean userAccount, boolean internalUser, boolean withUserResourceId) {
        boolean healthcheckTenant = KafkaLogicalClusterUtils.LC_META_HEALTHCHECK.equals((Object)clusterId) || KafkaLogicalClusterUtils.LC_META_LINK_HEALTHCHECK.equals((Object)clusterId);
        String user = internalUser ? "0" : "user";
        MultiTenantSaslServer saslServer = (MultiTenantSaslServer)Mockito.mock(MultiTenantSaslServer.class);
        TenantMetadata tenantMetadata = withUserResourceId ? new TenantMetadata.Builder(clusterId, "u-" + user).serviceAccount(!userAccount).healthcheckTenant(healthcheckTenant).apiKeyAuthenticated(true).build() : new TenantMetadata.Builder(clusterId, null).serviceAccount(!userAccount).healthcheckTenant(healthcheckTenant).apiKeyAuthenticated(true).build();
        Mockito.when((Object)saslServer.tenantMetadata()).thenReturn((Object)tenantMetadata);
        Mockito.when((Object)saslServer.getAuthorizationID()).thenReturn((Object)user);
        this.context = new SaslAuthenticationContext((SaslServer)saslServer, SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLoopbackAddress(), SecurityProtocol.SASL_PLAINTEXT.name());
    }

    private void setUpMTlsMocks(X509Certificate clientCert, Collection<CaCertificatesKey> providers, Collection<CertIdentityPool> pools, boolean revokeClientCert) throws IOException {
        Certificate[] certs = new Certificate[]{clientCert};
        SSLSession sslSession = (SSLSession)Mockito.mock(SSLSession.class);
        Mockito.when((Object)sslSession.getPeerCertificates()).thenReturn((Object)certs);
        this.context = new SaslAuthenticationContext(-1L, null, SecurityProtocol.SASL_SSL, InetAddress.getLoopbackAddress(), SecurityProtocol.SASL_SSL.name(), Optional.of(sslSession), true, false);
        AuthStore mockStore = (AuthStore)Mockito.mock(AuthStore.class);
        AuthCache mockCache = (AuthCache)Mockito.mock(AuthCache.class);
        AuthStore.addInstance((String)BROKER_UUID, (AuthStore)mockStore, (Logger)((Logger)Mockito.mock(Logger.class)));
        Mockito.when((Object)mockStore.authCache()).thenReturn((Object)mockCache);
        Mockito.when((Object)mockCache.findCertIdentityProviders((Certificate[])ArgumentMatchers.any(), (String)ArgumentMatchers.any())).thenReturn(providers);
        Mockito.when((Object)mockCache.isRevoked((Certificate[])ArgumentMatchers.eq((Object)certs), (String)ArgumentMatchers.any(), (String)ArgumentMatchers.any())).thenReturn((Object)revokeClientCert);
        Mockito.when((Object)mockCache.findCertIdentityPools(ArgumentMatchers.anyMap(), (String)ArgumentMatchers.any(), (String)ArgumentMatchers.any())).thenReturn(pools);
    }

    private void verifyOrgPropsMetric() {
        for (int i = 0; i < 10; ++i) {
            ChannelBuilders.createPrincipalBuilder(configs, null, null).build((AuthenticationContext)this.context);
        }
        List orgPropsMetrics = KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream().filter(e -> ((MetricName)e.getKey()).getName().equals("org-props-missing-rate")).map(Map.Entry::getValue).collect(Collectors.toList());
        Assertions.assertTrue((orgPropsMetrics.size() > 0 ? 1 : 0) != 0);
        Assertions.assertTrue(((int)((Meter)orgPropsMetrics.get(0)).count() >= 10 ? 1 : 0) != 0);
    }

    private void verifyAuthenticationSubtypeMetric() {
        KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream().filter(e -> ((MetricName)e.getKey()).getName().equals("authentication-subtype-rate")).forEach(e -> Assertions.assertTrue((((Meter)e.getValue()).count() >= 1L ? 1 : 0) != 0));
    }

    private static void createAuthStore() throws Exception {
        authStore = MockBasicAuthStore.create((String)BROKER_UUID);
        authCache = (MockTrustCache)authStore.trustCache();
    }

    private static void clearYammerMetrics() {
        for (MetricName metricName : KafkaYammerMetrics.defaultRegistry().allMetrics().keySet()) {
            KafkaYammerMetrics.defaultRegistry().removeMetric(metricName);
        }
    }

    static {
        TEMP_DIR = TestUtils.tempDirectory().toPath();
    }
}

