/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import com.nimbusds.jose.jwk.JWK;
import io.confluent.kafka.clients.plugins.auth.oauth.OAuthBearerLoginCallbackHandler;
import io.confluent.kafka.multitenant.BasePhysicalClusterMetadata;
import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.oauth.MockBasicAuthStore;
import io.confluent.kafka.server.plugins.auth.oauth.MockTrustCache;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.security.auth.oauth.mockserver.common.ServerFailedToConnectException;
import io.confluent.security.auth.oauth.mockserver.server.MockOAuthServer;
import io.confluent.security.auth.oauth.mockserver.server.TokenBuilder;
import io.confluent.security.auth.provider.oauth.EnhancedOAuthBearerValidatorCallbackHandler;
import io.confluent.security.auth.store.data.AuthKey;
import io.confluent.security.auth.store.data.AuthValue;
import io.confluent.security.auth.store.data.JwtIssuerKey;
import io.confluent.security.auth.store.data.JwtIssuerValue;
import io.confluent.security.authorizer.Scope;
import java.io.IOException;
import java.nio.file.Path;
import java.security.interfaces.RSAPublicKey;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import kafka.test.JarResourceLoader;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.metadata.TopicPlacement;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.test.TestUtils;
import org.jose4j.jwk.JsonWebKey;
import org.jose4j.jwk.JsonWebKeySet;
import org.jose4j.jwk.RsaJsonWebKey;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tags(value={@Tag(value="integration"), @Tag(value="bazel:size:medium")})
public class OAuthEnhancedValidatorIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(OAuthEnhancedValidatorIntegrationTest.class);
    private static final String CONSUMER_OFFSET_PLACEMENT_CONSTRAINT = "{\"version\":1,\"replicas\":[{\"count\": 1, \"constraints\":{\"rack\":\"rack-1\"}}]}";
    private static final String CONFLUENT_JWT_ISSUER = "confluent";
    private final String logicalClusterId = KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId();
    private final String orgResourceId = KafkaLogicalClusterUtils.LC_META_ABC.organizationId();
    private final Properties adminProperties = new Properties();
    private static final String NON_CONFLUENT_JWT_ISSUER = "abcd";
    private static final String NON_CONFLUENT_JWKS_URI = "abcd";
    private static final String CONFLUENT_ISSUER = "Confluent";
    private IntegrationTestHarness testHarness;
    private OAuthUtils.JwsContainer jwsContainer;
    private String brokerUUID;
    private PhysicalClusterMetadata metadata;
    private final String testTopic = "abcd";
    private final String clientId = "client_id";
    private final String clientSecret = "client_Secret";
    private final List<NewTopic> sampleTopics;
    private LogicalClusterUser testUser;
    private TestInfo testInfo;
    private PhysicalCluster physicalCluster;
    private MockBasicAuthStore authStore;
    private final String lkcMetadataTopic = "_confluent-logical_clusters";
    private final AtomicInteger sequenceId;
    long topicCreateTimeout;

    public OAuthEnhancedValidatorIntegrationTest() {
        this.adminProperties.put("sasl.login.callback.handler.class", OAuthBearerLoginCallbackHandler.class.getName());
        this.testTopic = "abcd";
        this.clientId = "client_id";
        this.clientSecret = "client_Secret";
        this.sampleTopics = Collections.singletonList(new NewTopic("abcd", 3, 1));
        this.lkcMetadataTopic = "_confluent-logical_clusters";
        this.sequenceId = new AtomicInteger();
        this.topicCreateTimeout = 15000L;
    }

    @BeforeEach
    public void setUpTempDir(TestInfo testInfo) {
        this.testInfo = testInfo;
    }

    public void setUp() throws Exception {
        this.setUp("abcd", this.orgResourceId);
    }

    private void setUp(String issuer, String orgResourceId) throws Exception {
        this.setUp(issuer, orgResourceId, null, null, null);
    }

    private void setUp(String issuer, String orgResourceId, String aud, Map<String, String> additionalClaims, Map<String, String> configOverrides) throws Exception {
        MockAuditLogProvider.reset();
        this.testHarness = new IntegrationTestHarness(this.testInfo, 1, Collections.singletonList("rack-1"));
        boolean serviceUserId = true;
        String subject = "1";
        this.jwsContainer = new OAuthUtils.Builder(100000, issuer, subject, orgResourceId).jku("https://localhost/keys").withKid(true).audience(aud).additionalClaims(additionalClaims).build();
        this.authStore = MockBasicAuthStore.create();
        this.physicalCluster = this.testHarness.startWithTopic("_confluent-logical_clusters", 1, 1, this.topicCreateTimeout, this.setUpMetadata(this.nodeProps(configOverrides)), this.nodeProps(configOverrides));
        int adminUserId = 100;
        LogicalCluster logicalCluster = this.physicalCluster.createLogicalCluster(this.logicalClusterId, 100, 1);
        this.testUser = logicalCluster.user(1);
        this.addAdminAcls();
        MockTrustCache cache = (MockTrustCache)this.authStore.trustCache();
        RsaJsonWebKey key = new RsaJsonWebKey((RSAPublicKey)this.jwsContainer.verificationKey());
        key.setKeyId(this.jwsContainer.getKid());
        cache.put((AuthKey)new JwtIssuerKey(issuer, null, ""), (AuthValue)new JwtIssuerValue(new JsonWebKeySet(new JsonWebKey[]{key})));
        this.loadLkcMetadata();
    }

    private void setUpMockOAuth() throws Exception {
        MockAuditLogProvider.reset();
        this.testHarness = new IntegrationTestHarness(this.testInfo, 1, Collections.singletonList("rack-1"));
        boolean serviceUserId = true;
        String subject = "1";
        this.jwsContainer = new OAuthUtils.Builder(0, null, subject, this.orgResourceId).build();
        this.authStore = MockBasicAuthStore.create();
        this.physicalCluster = this.testHarness.startWithTopic("_confluent-logical_clusters", 1, 1, this.topicCreateTimeout, this.setUpMetadata(this.nodeProps(Collections.emptyMap())), this.nodeProps(Collections.emptyMap()));
        int adminUserId = 100;
        LogicalCluster logicalCluster = this.physicalCluster.createLogicalCluster(this.logicalClusterId, 100, 1);
        this.testUser = logicalCluster.user(1);
        this.addAdminAcls();
        this.loadLkcMetadata();
    }

    private void addAdminAcls() {
        this.testHarness.newAclCommand().addTopicAclArgs(this.testUser.prefixedKafkaPrincipal(), this.testUser.withPrefix("abcd"), AclOperation.ALL, PatternType.LITERAL).execute();
    }

    @AfterEach
    public void tearDown() throws Exception {
        try {
            this.testHarness.shutdown();
            this.authStore.close();
        }
        finally {
            KafkaTestUtils.verifyThreadCleanup();
        }
    }

    private Properties setUpMetadata(Properties brokerProps) throws IOException, InterruptedException {
        this.brokerUUID = "uuid";
        brokerProps.put("broker.session.uuid", this.brokerUUID);
        return brokerProps;
    }

    private Properties nodeProps(Map<String, String> configOverrides) {
        Properties props = IntegrationTestHarness.defaultOAuthBrokerProps();
        Path pathToConfigFile = null;
        try {
            pathToConfigFile = JarResourceLoader.loadFileFromResourceWithClassLoader(this.getClass(), (String)"AuthConfigEnhanced.yaml").toPath();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        props.put("listener.name.external.oauthbearer.sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath=\"" + String.valueOf(this.jwsContainer.getPublicKeyFile().toPath()) + "\"authenticator.jwt.config.url=\"" + String.valueOf(pathToConfigFile) + "\";");
        props.put("confluent.offsets.topic.placement.constraints", CONSUMER_OFFSET_PLACEMENT_CONSTRAINT);
        props.put("listener.name.external.oauthbearer.sasl.server.callback.handler.class", EnhancedOAuthBearerValidatorCallbackHandler.class.getName());
        props.put("confluent.cdc.lkc.metadata.topic", "_confluent-logical_clusters");
        props.put("multitenant.metadata.class", TopicBasedPhysicalClusterMetadata.class.getName());
        if (configOverrides != null) {
            props.putAll(configOverrides);
        }
        return props;
    }

    private String clientJaasConfig(String jwsToken, String allowedCluster) {
        return IntegrationTestHarness.clientOAuthJaasConfig(jwsToken, allowedCluster);
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testMockOAuthServerForConfluentIssuer(String quorum) throws Exception {
        this.setUpMockOAuth();
        boolean serviceUserId = true;
        String subject = "1";
        MockOAuthServer mockOAuthServer = new MockOAuthServer();
        mockOAuthServer.startServer(61234);
        try {
            mockOAuthServer.connectToServer();
        }
        catch (ServerFailedToConnectException exception) {
            log.error("Server failed to connect");
            Assertions.fail();
        }
        ArrayList<String> clustersAllowed = new ArrayList<String>();
        clustersAllowed.add(this.logicalClusterId);
        String token = new TokenBuilder("client_Secret", "client_id", mockOAuthServer).setExpiry(100000).setIssuer(CONFLUENT_ISSUER).setSubject(subject).addClaims("userResourceId", "UR1").addClaims("userId", "UI1").addClaims("clusters", clustersAllowed).addClaims("orgResourceId", this.orgResourceId).build();
        AdminClient client = this.testHarness.createOAuthAdminClient(this.clientJaasConfig(token, this.logicalClusterId), this.adminProperties);
        client.createTopics(this.sampleTopics).all().get();
        Assertions.assertNotNull((Object)token);
        List expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
        TestUtils.retryOnExceptionWithTimeout((long)30000L, () -> Assertions.assertTrue((boolean)((Set)client.listTopics().names().get()).containsAll(expectedTopics)));
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        ConfluentAuthenticationEvent authenticationEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assertions.assertEquals((Object)"User", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getPrincipalType());
        Assertions.assertEquals((Object)"1", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getName());
        Assertions.assertEquals((Object)AuditEventStatus.SUCCESS, (Object)authenticationEvent.status());
        Assertions.assertFalse((boolean)((KafkaPrincipal)authenticationEvent.principal().get()).toString().contains("tenantMetadata"));
        Scope scope = new Scope.Builder(new String[0]).addPath("organization=" + KafkaLogicalClusterUtils.LC_META_ABC.organizationId()).addPath("environment=" + KafkaLogicalClusterUtils.LC_META_ABC.environmentId()).addPath("cloud-cluster=" + KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()).withKafkaCluster(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()).build();
        Assertions.assertEquals((Object)scope, (Object)authenticationEvent.getScope());
        SaslAuthenticationContext authenticationContext = (SaslAuthenticationContext)authenticationEvent.authenticationContext();
        Assertions.assertEquals((Object)"1", (Object)authenticationContext.server().getAuthorizationID());
        mockOAuthServer.stopServer();
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testMockOAuthServerForNonConfluentIssuer(String quorum) throws Exception {
        String[] arr;
        this.setUpMockOAuth();
        boolean serviceUserId = true;
        String subject = "1";
        MockOAuthServer mockOAuthServer = new MockOAuthServer();
        mockOAuthServer.startServer(62234, true);
        ArrayList<String> clustersAllowed = new ArrayList<String>();
        clustersAllowed.add(this.logicalClusterId);
        String token = new TokenBuilder("client_Secret", "client_id", mockOAuthServer).setExpiry(100000).setIssuer("abcd").setSubject(subject).addClaims("userResourceId", "UR1").addClaims("userId", "UI1").addClaims("clusters", clustersAllowed).addClaims("orgResourceId", this.orgResourceId).build();
        JWK key = mockOAuthServer.getKeys().get("abcd");
        MockTrustCache cache = (MockTrustCache)this.authStore.trustCache();
        HashMap<String, String> params = new HashMap<String, String>();
        String keyNew = key.toString();
        keyNew = keyNew.substring(1, keyNew.length() - 1);
        for (String s : arr = keyNew.split(",")) {
            String[] arrNew = s.split(":");
            String a1 = arrNew[0];
            String a2 = arrNew[1];
            params.put(a1.substring(1, a1.length() - 1), a2.substring(1, a2.length() - 1));
        }
        RsaJsonWebKey key1 = new RsaJsonWebKey(params, null);
        key1.setKeyId("abcd");
        cache.put((AuthKey)new JwtIssuerKey("abcd", null, ""), (AuthValue)new JwtIssuerValue(new JsonWebKeySet(new JsonWebKey[]{key1})));
        AdminClient client = this.testHarness.createOAuthAdminClient(this.clientJaasConfig(token, this.logicalClusterId), this.adminProperties);
        client.createTopics(this.sampleTopics).all().get();
        Assertions.assertNotNull((Object)token);
        List expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
        TestUtils.retryOnExceptionWithTimeout((long)30000L, () -> Assertions.assertTrue((boolean)((Set)client.listTopics().names().get()).containsAll(expectedTopics)));
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        ConfluentAuthenticationEvent authenticationEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assertions.assertEquals((Object)"User", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getPrincipalType());
        Assertions.assertEquals((Object)"1", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getName());
        Assertions.assertEquals((Object)AuditEventStatus.SUCCESS, (Object)authenticationEvent.status());
        Assertions.assertFalse((boolean)((KafkaPrincipal)authenticationEvent.principal().get()).toString().contains("tenantMetadata"));
        Scope scope = new Scope.Builder(new String[0]).addPath("organization=" + KafkaLogicalClusterUtils.LC_META_ABC.organizationId()).addPath("environment=" + KafkaLogicalClusterUtils.LC_META_ABC.environmentId()).addPath("cloud-cluster=" + KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()).withKafkaCluster(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()).build();
        Assertions.assertEquals((Object)scope, (Object)authenticationEvent.getScope());
        SaslAuthenticationContext authenticationContext = (SaslAuthenticationContext)authenticationEvent.authenticationContext();
        Assertions.assertEquals((Object)"1", (Object)authenticationContext.server().getAuthorizationID());
        mockOAuthServer.stopServer();
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testCorrectConfigurationAuthenticatesSuccessfully(String quorum) throws Exception {
        this.setUp();
        AdminClient client = this.testHarness.createOAuthAdminClient(this.clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId), this.adminProperties);
        client.createTopics(this.sampleTopics).all().get();
        List expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
        TestUtils.retryOnExceptionWithTimeout((long)30000L, () -> Assertions.assertTrue((boolean)((Set)client.listTopics().names().get()).containsAll(expectedTopics)));
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        ConfluentAuthenticationEvent authenticationEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assertions.assertEquals((Object)"User", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getPrincipalType());
        Assertions.assertEquals((Object)"1", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getName());
        Assertions.assertEquals((Object)AuditEventStatus.SUCCESS, (Object)authenticationEvent.status());
        Assertions.assertFalse((boolean)((KafkaPrincipal)authenticationEvent.principal().get()).toString().contains("tenantMetadata"));
        Scope scope = new Scope.Builder(new String[0]).addPath("organization=" + KafkaLogicalClusterUtils.LC_META_ABC.organizationId()).addPath("environment=" + KafkaLogicalClusterUtils.LC_META_ABC.environmentId()).addPath("cloud-cluster=" + KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()).withKafkaCluster(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId()).build();
        Assertions.assertEquals((Object)scope, (Object)authenticationEvent.getScope());
        SaslAuthenticationContext authenticationContext = (SaslAuthenticationContext)authenticationEvent.authenticationContext();
        Assertions.assertEquals((Object)"1", (Object)authenticationContext.server().getAuthorizationID());
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testClusterInExtensionNotBelongToTokenOrgThrowsException(String quorum) throws Exception {
        this.setUp(CONFLUENT_JWT_ISSUER, "different_org");
        Class<SaslAuthenticationException> expectedException = SaslAuthenticationException.class;
        try {
            AdminClient client = this.testHarness.createOAuthAdminClient(this.clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId), this.adminProperties);
            client.createTopics(this.sampleTopics).all().get();
            Assertions.fail((String)String.format("Expected admin command to throw a %s", expectedException));
        }
        catch (Exception e) {
            if (e.getCause().getClass() != expectedException) {
                Assertions.fail((String)String.format("Expected admin command to throw a %s but it threw a %s", expectedException, e.getCause().getClass()));
            }
            log.info("Expected exception message: {}", (Object)e.getCause().getMessage());
        }
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        ConfluentAuthenticationEvent authenticationEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assertions.assertFalse((boolean)authenticationEvent.principal().isPresent());
        Assertions.assertEquals((Object)AuditEventStatus.UNAUTHENTICATED, (Object)authenticationEvent.status());
        Assertions.assertTrue((boolean)authenticationEvent.getScope().toString().contains("kafka-cluster=" + this.logicalClusterId));
        Assertions.assertTrue((boolean)authenticationEvent.authenticationException().isPresent());
        AuthenticationException authenticationException = (AuthenticationException)authenticationEvent.authenticationException().get();
        AuthenticationErrorInfo errorInfo = authenticationException.errorInfo();
        Assertions.assertTrue((boolean)errorInfo.errorMessage().contains("logical cluster " + this.logicalClusterId + " does not belong to the org"));
        Assertions.assertEquals((Object)"1", (Object)errorInfo.identifier());
        Assertions.assertEquals((Object)this.logicalClusterId, errorInfo.saslExtensions().get("logicalCluster"));
    }

    @Disabled(value="TODO: KSECURITY-2486 - Convert testAudClaimInTokenThrowsException to kraft")
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk"})
    public void testAudClaimInTokenThrowsException(String quorum) throws Exception {
        this.setUp(CONFLUENT_JWT_ISSUER, this.orgResourceId, "CONTROL_PLANE", null, null);
        Class<SaslAuthenticationException> expectedException = SaslAuthenticationException.class;
        try {
            AdminClient client = this.testHarness.createOAuthAdminClient(this.clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId), this.adminProperties);
            client.createTopics(this.sampleTopics).all().get();
            Assertions.fail((String)String.format("Expected admin command to throw a %s", expectedException));
        }
        catch (Exception e) {
            if (e.getCause().getClass() != expectedException) {
                Assertions.fail((String)String.format("Expected admin command to throw a %s but it threw a %s", expectedException, e.getCause().getClass()));
            }
            log.info("Expected exception message: {}", (Object)e.getCause().getMessage());
        }
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        ConfluentAuthenticationEvent authenticationEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assertions.assertFalse((boolean)authenticationEvent.principal().isPresent());
        Assertions.assertEquals((Object)AuditEventStatus.UNKNOWN_USER_DENIED, (Object)authenticationEvent.status());
        Assertions.assertTrue((boolean)authenticationEvent.authenticationException().isPresent());
        AuthenticationException authenticationException = (AuthenticationException)authenticationEvent.authenticationException().get();
        Assertions.assertTrue((boolean)authenticationException.errorMessage().contains("AUD_CLAIM_MISMATCH"));
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testConfluentIssuerRequiredSuccessfulAuth(String quorum) throws Exception {
        this.setUp(CONFLUENT_JWT_ISSUER, this.orgResourceId, null, null, Collections.singletonMap("confluent.require.confluent.issuer", "true"));
        AdminClient client = this.testHarness.createOAuthAdminClient(this.clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId), this.adminProperties);
        client.createTopics(this.sampleTopics).all().get();
        List expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
        TestUtils.retryOnExceptionWithTimeout((long)30000L, () -> Assertions.assertTrue((boolean)((Set)client.listTopics().names().get()).containsAll(expectedTopics)));
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        ConfluentAuthenticationEvent authenticationEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assertions.assertEquals((Object)"User", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getPrincipalType());
        Assertions.assertEquals((Object)"1", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getName());
        Assertions.assertEquals((Object)AuditEventStatus.SUCCESS, (Object)authenticationEvent.status());
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testConfluentIssuerRequiredFailedAuth(String quorum) throws Exception {
        this.setUp("abcd", this.orgResourceId, null, null, Collections.singletonMap("confluent.require.confluent.issuer", "true"));
        Class<SaslAuthenticationException> expectedException = SaslAuthenticationException.class;
        try {
            AdminClient client = this.testHarness.createOAuthAdminClient(this.clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId), this.adminProperties);
            client.createTopics(this.sampleTopics).all().get();
            Assertions.fail((String)String.format("Expected admin command to throw a %s", expectedException));
        }
        catch (Exception e) {
            if (e.getCause().getClass() != expectedException) {
                Assertions.fail((String)String.format("Expected admin command to throw a %s but it threw a %s", expectedException, e.getCause().getClass()));
            }
            log.info("Expected exception message: {}", (Object)e.getCause().getMessage());
        }
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        ConfluentAuthenticationEvent authenticationEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assertions.assertFalse((boolean)authenticationEvent.principal().isPresent());
        Assertions.assertEquals((Object)AuditEventStatus.UNKNOWN_USER_DENIED, (Object)authenticationEvent.status());
        Assertions.assertTrue((boolean)authenticationEvent.authenticationException().isPresent());
        AuthenticationException authenticationException = (AuthenticationException)authenticationEvent.authenticationException().get();
        Assertions.assertTrue((boolean)authenticationException.errorMessage().contains("ISSUER_INVALID"));
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testCallingResourceIdentityRequiredSuccessfulAuth(String quorum) throws Exception {
        HashMap<String, String> configOverrides = new HashMap<String, String>();
        configOverrides.put("confluent.require.confluent.issuer", "true");
        configOverrides.put("confluent.require.calling.resource.identity", "true");
        this.setUp(CONFLUENT_JWT_ISSUER, this.orgResourceId, null, Collections.singletonMap("calling_resource_identity", "cri"), configOverrides);
        AdminClient client = this.testHarness.createOAuthAdminClient(this.clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId), this.adminProperties);
        client.createTopics(this.sampleTopics).all().get();
        List expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
        TestUtils.retryOnExceptionWithTimeout((long)30000L, () -> Assertions.assertTrue((boolean)((Set)client.listTopics().names().get()).containsAll(expectedTopics)));
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        ConfluentAuthenticationEvent authenticationEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assertions.assertEquals((Object)"User", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getPrincipalType());
        Assertions.assertEquals((Object)"1", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getName());
        Assertions.assertEquals((Object)AuditEventStatus.SUCCESS, (Object)authenticationEvent.status());
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testCallingResourceIdentityRequiredFailedAuth(String quorum) throws Exception {
        HashMap<String, String> configOverrides = new HashMap<String, String>();
        configOverrides.put("confluent.require.confluent.issuer", "true");
        configOverrides.put("confluent.require.calling.resource.identity", "true");
        this.setUp(CONFLUENT_JWT_ISSUER, this.orgResourceId, null, null, configOverrides);
        Class<SaslAuthenticationException> expectedException = SaslAuthenticationException.class;
        try {
            AdminClient client = this.testHarness.createOAuthAdminClient(this.clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId), this.adminProperties);
            client.createTopics(this.sampleTopics).all().get();
            Assertions.fail((String)String.format("Expected admin command to throw a %s", expectedException));
        }
        catch (Exception e) {
            if (e.getCause().getClass() != expectedException) {
                Assertions.fail((String)String.format("Expected admin command to throw a %s but it threw a %s", expectedException, e.getCause().getClass()));
            }
            log.info("Expected exception message: {}", (Object)e.getCause().getMessage());
        }
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        ConfluentAuthenticationEvent authenticationEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assertions.assertFalse((boolean)authenticationEvent.principal().isPresent());
        Assertions.assertEquals((Object)AuditEventStatus.UNAUTHENTICATED, (Object)authenticationEvent.status());
        Assertions.assertTrue((boolean)authenticationEvent.authenticationException().isPresent());
        AuthenticationException authenticationException = (AuthenticationException)authenticationEvent.authenticationException().get();
        AuthenticationErrorInfo errorInfo = authenticationException.errorInfo();
        Assertions.assertTrue((boolean)errorInfo.errorMessage().contains(String.format("Expected %s claim, but none was found", "calling_resource_identity")));
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testClusterInExtensionNotHostedOnBrokerThrowsException(String quorum) throws Exception {
        this.setUp();
        Class<SaslAuthenticationException> expectedException = SaslAuthenticationException.class;
        try {
            AdminClient client = this.testHarness.createOAuthAdminClient(this.clientJaasConfig(this.jwsContainer.getJwsToken(), "other-cluster"), this.adminProperties);
            client.createTopics(this.sampleTopics).all().get();
            Assertions.fail((String)String.format("Expected admin command to throw a %s", expectedException));
        }
        catch (Exception e) {
            if (e.getCause().getClass() != expectedException) {
                Assertions.fail((String)String.format("Expected admin command to throw a %s but it threw a %s", expectedException, e.getCause().getClass()));
            }
            log.info("Expected exception message: {}", (Object)e.getCause().getMessage());
        }
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        ConfluentAuthenticationEvent authenticationEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assertions.assertFalse((boolean)authenticationEvent.principal().isPresent());
        Assertions.assertEquals((Object)AuditEventStatus.UNAUTHENTICATED, (Object)authenticationEvent.status());
        Assertions.assertTrue((boolean)authenticationEvent.getScope().toString().contains("kafka-cluster=other-cluster"));
        Assertions.assertTrue((boolean)authenticationEvent.authenticationException().isPresent());
        AuthenticationException authenticationException = (AuthenticationException)authenticationEvent.authenticationException().get();
        AuthenticationErrorInfo errorInfo = authenticationException.errorInfo();
        Assertions.assertTrue((boolean)errorInfo.errorMessage().contains("cluster other-cluster is not hosted on this broker"));
        Assertions.assertEquals((Object)"1", (Object)errorInfo.identifier());
        Assertions.assertEquals((Object)"other-cluster", errorInfo.saslExtensions().get("logicalCluster"));
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testIllegalStateExceptionOnBrokerMetadataThrowsException(String quorum) throws Exception {
        this.setUp();
        Class<SaslAuthenticationException> expectedException = SaslAuthenticationException.class;
        try {
            this.closeLkcMetadataCache();
            AdminClient client = this.testHarness.createOAuthAdminClient(this.clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId + "1"), this.adminProperties);
            client.createTopics(this.sampleTopics).all().get();
            Assertions.fail((String)String.format("Expected admin command to throw a %s", expectedException));
        }
        catch (Exception e) {
            if (e.getCause().getClass() != expectedException) {
                Assertions.fail((String)String.format("Expected admin command to throw a %s but it threw a %s", expectedException, e.getCause().getClass()));
            }
            log.info("Expected exception message: {}", (Object)e.getCause().getMessage());
        }
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        ConfluentAuthenticationEvent authenticationEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assertions.assertFalse((boolean)authenticationEvent.principal().isPresent());
        Assertions.assertEquals((Object)AuditEventStatus.UNAUTHENTICATED, (Object)authenticationEvent.status());
        Assertions.assertTrue((boolean)authenticationEvent.authenticationException().isPresent());
        AuthenticationException authenticationException = (AuthenticationException)authenticationEvent.authenticationException().get();
        AuthenticationErrorInfo errorInfo = authenticationException.errorInfo();
        Assertions.assertTrue((boolean)errorInfo.errorMessage().contains("Could not get cluster metadata to validate the token"));
        Assertions.assertEquals((Object)"1", (Object)errorInfo.identifier());
        Assertions.assertTrue((boolean)errorInfo.clusterId().isEmpty());
    }

    @Disabled(value="KSECURITY-1963")
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testPlacementConstraintForGroupCoordinators(String quorum) throws Throwable {
        this.setUp();
        String jaasConfig = this.clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId);
        String topic = "test-topic";
        String consumerGroup = "test-cg";
        try (KafkaProducer<String, String> producer = this.testHarness.createOAuthProducer(jaasConfig, this.adminProperties);){
            KafkaTestUtils.sendRecords(producer, topic, 0, 10);
        }
        try (KafkaConsumer<String, String> consumer = this.testHarness.createOAuthConsumer(consumerGroup, jaasConfig, this.adminProperties);){
            KafkaTestUtils.consumeRecords(consumer, topic, 0, 10);
            consumer.commitSync();
        }
        AdminClient adminClient = this.physicalCluster.superAdminClient();
        ConfigResource topicConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, "__consumer_offsets");
        Set<ConfigResource> configsToDescribe = Collections.singleton(topicConfigResource);
        AtomicReference topicConfig = new AtomicReference();
        AtomicReference topicDescription = new AtomicReference();
        TestUtils.waitForCondition(() -> {
            DescribeConfigsResult describeConfigResult = adminClient.describeConfigs((Collection)configsToDescribe);
            topicConfig.set(describeConfigResult);
            DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton("__consumer_offsets"));
            topicDescription.set(describeTopicsResult);
            Map configResourceConfigMap = (Map)describeConfigResult.all().get(5L, TimeUnit.SECONDS);
            return configResourceConfigMap.size() == 1;
        }, (String)"Unable to find consumer offset topic.");
        Assertions.assertNotNull(topicConfig.get(), (String)"Unable to get consumer offset topic configuration.");
        Config config = (Config)((Map)((DescribeConfigsResult)topicConfig.get()).all().get()).get(topicConfigResource);
        Assertions.assertEquals((Object)TopicPlacement.parse((String)CONSUMER_OFFSET_PLACEMENT_CONSTRAINT), (Object)TopicPlacement.parse((String)config.get("confluent.placement.constraints").value()));
        Assertions.assertNotNull(topicDescription.get(), (String)"Unable to get consumer offset topic description.");
        TopicDescription offsetTopicDescription = (TopicDescription)((Map)((DescribeTopicsResult)topicDescription.get()).allTopicNames().get()).get("__consumer_offsets");
        List offsetTopicPartitions = offsetTopicDescription.partitions();
        offsetTopicPartitions.forEach(tpi -> Assertions.assertEquals((int)1, (int)tpi.replicas().size(), (String)"More than one replica found."));
    }

    public void loadLkcMetadata() {
        this.physicalCluster.kafkaCluster().produceLCMData("_confluent-logical_clusters", this.sequenceId.incrementAndGet(), this.logicalClusterId, KafkaLogicalClusterUtils.LC_META_ABC);
    }

    public void closeLkcMetadataCache() {
        BasePhysicalClusterMetadata.getInstance((String)this.brokerUUID).close(this.brokerUUID);
    }
}

