package io.confluent.kafka.multitenant.integration.test;

import com.nimbusds.jose.jwk.JWK;
import io.confluent.kafka.clients.plugins.auth.oauth.OAuthBearerLoginCallbackHandler;
import io.confluent.kafka.multitenant.BasePhysicalClusterMetadata;
import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.oauth.MockBasicAuthStore;
import io.confluent.kafka.server.plugins.auth.oauth.MockTrustCache;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.security.auth.oauth.mockserver.common.ServerFailedToConnectException;
import io.confluent.security.auth.oauth.mockserver.server.MockOAuthServer;
import io.confluent.security.auth.oauth.mockserver.server.TokenBuilder;
import io.confluent.security.auth.provider.oauth.EnhancedOAuthBearerValidatorCallbackHandler;
import io.confluent.security.auth.store.data.JwtIssuerKey;
import io.confluent.security.auth.store.data.JwtIssuerValue;
import io.confluent.security.authorizer.Scope;
import java.io.IOException;
import java.nio.file.Path;
import java.security.interfaces.RSAPublicKey;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import kafka.test.JarResourceLoader;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.metadata.TopicPlacement;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.test.TestUtils;
import org.jose4j.jwk.JsonWebKey;
import org.jose4j.jwk.JsonWebKeySet;
import org.jose4j.jwk.RsaJsonWebKey;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tags({@Tag("integration"), @Tag("bazel:size:medium")})
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/OAuthEnhancedValidatorIntegrationTest.class */
public class OAuthEnhancedValidatorIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(OAuthEnhancedValidatorIntegrationTest.class);

    /** Placement constraint applied to the offsets topic: a single replica on rack "rack-1". */
    private static final String CONSUMER_OFFSET_PLACEMENT_CONSTRAINT = "{\"version\":1,\"replicas\":[{\"count\": 1, \"constraints\":{\"rack\":\"rack-1\"}}]}";

    /** Issuer value passed to {@code setUp} by the tests that need a Confluent-issued token. */
    private static final String CONFLUENT_JWT_ISSUER = "confluent";
    private static final String NON_CONFLUENT_JWT_ISSUER = "abcd";
    private static final String NON_CONFLUENT_JWKS_URI = "abcd";

    /** Issuer set on tokens minted through the mock OAuth server. */
    private static final String CONFLUENT_ISSUER = "Confluent";

    private final String logicalClusterId = KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId();
    private final String orgResourceId = KafkaLogicalClusterUtils.LC_META_ABC.organizationId();
    private final Properties adminProperties = new Properties();

    // Every final field is initialized exactly once, at its declaration. Assigning them
    // again in the constructor is a compile error ("cannot assign a value to final variable").
    private final String testTopic = "abcd";
    private final String clientId = "client_id";
    private final String clientSecret = "client_Secret";
    private final List<NewTopic> sampleTopics = Collections.singletonList(new NewTopic("abcd", 3, (short) 1));
    private final String lkcMetadataTopic = "_confluent-logical_clusters";
    private final AtomicInteger sequenceId = new AtomicInteger();

    // Passed to IntegrationTestHarness.startWithTopic; presumably milliseconds -- TODO confirm.
    long topicCreateTimeout = 15000L;

    // Per-test state, populated by the setUp* helpers that each test calls explicitly.
    private IntegrationTestHarness testHarness;
    private OAuthUtils.JwsContainer jwsContainer;
    private String brokerUUID;
    private PhysicalClusterMetadata metadata;
    private LogicalClusterUser testUser;
    private TestInfo testInfo;
    private PhysicalCluster physicalCluster;
    private MockBasicAuthStore authStore;

    /**
     * Registers the OAuth bearer login callback handler in the properties that are
     * handed to every client created by the tests.
     */
    public OAuthEnhancedValidatorIntegrationTest() {
        this.adminProperties.put("sasl.login.callback.handler.class", OAuthBearerLoginCallbackHandler.class.getName());
    }

    /**
     * Remembers the JUnit {@link TestInfo} of the test about to run; the set-up
     * helpers later pass it to the {@code IntegrationTestHarness} constructor.
     */
    @BeforeEach
    public void setUpTempDir(TestInfo info) {
        this.testInfo = info;
    }

    /**
     * Default set-up: token issued by the non-Confluent issuer {@code "abcd"} for
     * this test's organization.
     */
    public void setUp() throws Exception {
        setUp(NON_CONFLUENT_JWT_ISSUER, this.orgResourceId);
    }

    /**
     * Set-up with a chosen issuer and organization and no audience, no additional
     * claims and no broker property overrides.
     */
    private void setUp(String issuer, String orgId) throws Exception {
        setUp(issuer, orgId, null, null, null);
    }

    /**
     * Starts a one-broker cluster, creates the logical cluster and its user, and
     * registers the token's verification key in the mock trust cache.
     *
     * @param issuer          issuer passed to the token builder and used as the trust-cache key
     * @param orgId           fourth argument of the token builder (the tests pass organization ids)
     * @param audience        value given to the builder's {@code audience(...)}; may be null
     * @param extraClaims     value given to the builder's {@code additionalClaims(...)}; may be null
     * @param brokerOverrides extra broker properties merged by {@code nodeProps}; may be null
     */
    private void setUp(String issuer, String orgId, String audience, Map<String, String> extraClaims, Map<String, String> brokerOverrides) throws Exception {
        MockAuditLogProvider.reset();
        this.testHarness = new IntegrationTestHarness(this.testInfo, 1, Collections.singletonList("rack-1"));
        this.jwsContainer = new OAuthUtils.Builder(100000, issuer, "1", orgId)
                .jku("https://localhost/keys")
                .withKid(true)
                .audience(audience)
                .additionalClaims(extraClaims)
                .build();
        this.authStore = MockBasicAuthStore.create();

        // nodeProps reads this.jwsContainer, so it must only run after the token is built.
        Properties metadataProps = setUpMetadata(nodeProps(brokerOverrides));
        Properties brokerProps = nodeProps(brokerOverrides);
        this.physicalCluster = this.testHarness.startWithTopic("_confluent-logical_clusters", 1, 1, this.topicCreateTimeout, metadataProps, brokerProps);
        this.testUser = this.physicalCluster.createLogicalCluster(this.logicalClusterId, 100, 1).user(1);
        addAdminAcls();

        // Publish the token's public key so the issuer is trusted.
        MockTrustCache trustCache = this.authStore.trustCache();
        JsonWebKey verificationJwk = new RsaJsonWebKey((RSAPublicKey) this.jwsContainer.verificationKey());
        verificationJwk.setKeyId(this.jwsContainer.getKid());
        JwtIssuerKey issuerKey = new JwtIssuerKey(issuer, (String) null, "");
        JwtIssuerValue issuerValue = new JwtIssuerValue(new JsonWebKeySet(new JsonWebKey[]{verificationJwk}));
        trustCache.put(issuerKey, issuerValue);
        loadLkcMetadata();
    }

    /**
     * Set-up for the tests that mint their token through {@code MockOAuthServer}:
     * starts the cluster and logical cluster but adds nothing to the trust cache.
     */
    private void setUpMockOAuth() throws Exception {
        MockAuditLogProvider.reset();
        this.testHarness = new IntegrationTestHarness(this.testInfo, 1, Collections.singletonList("rack-1"));
        // Built because nodeProps reads the container's public key file.
        this.jwsContainer = new OAuthUtils.Builder(0, null, "1", this.orgResourceId).build();
        this.authStore = MockBasicAuthStore.create();

        Properties metadataProps = setUpMetadata(nodeProps(Collections.emptyMap()));
        Properties brokerProps = nodeProps(Collections.emptyMap());
        this.physicalCluster = this.testHarness.startWithTopic("_confluent-logical_clusters", 1, 1, this.topicCreateTimeout, metadataProps, brokerProps);
        this.testUser = this.physicalCluster.createLogicalCluster(this.logicalClusterId, 100, 1).user(1);
        addAdminAcls();
        loadLkcMetadata();
    }

    /**
     * Grants the test user's prefixed principal {@code ALL} operations on the
     * literal topic obtained from {@code testUser.withPrefix("abcd")}.
     */
    private void addAdminAcls() {
        this.testHarness.newAclCommand()
                .addTopicAclArgs(
                        this.testUser.prefixedKafkaPrincipal(),
                        this.testUser.withPrefix("abcd"),
                        AclOperation.ALL,
                        PatternType.LITERAL)
                .execute();
    }

    /**
     * Shuts down the harness and the mock auth store, then verifies thread cleanup.
     *
     * <p>Set-up runs inside each test method, so it may have failed before the
     * harness or the auth store were assigned. Both are null-checked so that
     * tear-down does not throw a {@link NullPointerException} on top of the real
     * failure. The auth store is closed even when the harness shutdown throws.
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            try {
                if (this.testHarness != null) {
                    this.testHarness.shutdown();
                }
            } finally {
                if (this.authStore != null) {
                    this.authStore.close();
                }
            }
        } finally {
            KafkaTestUtils.verifyThreadCleanup();
        }
    }

    /**
     * Fixes the broker session UUID to {@code "uuid"}, records it in
     * {@code brokerUUID} and writes it into the given properties.
     *
     * @param properties broker properties to modify in place
     * @return the same {@code properties} instance
     */
    private Properties setUpMetadata(Properties properties) throws IOException, InterruptedException {
        String sessionUuid = "uuid";
        this.brokerUUID = sessionUuid;
        properties.put("broker.session.uuid", sessionUuid);
        return properties;
    }

    /**
     * Builds the broker properties for a test: the harness's default OAuth broker
     * properties plus the enhanced validator callback handler, the JAAS config, the
     * offsets-topic placement constraint and the topic-based cluster metadata.
     *
     * <p>Loading {@code AuthConfigEnhanced.yaml} is best-effort. On failure the
     * error is logged through the class logger and the literal text {@code null}
     * ends up in the JAAS config, exactly as before.
     *
     * @param overrides extra properties applied last, so they win over the defaults; may be null
     * @return a properties object obtained from {@code IntegrationTestHarness.defaultOAuthBrokerProps()}
     */
    private Properties nodeProps(Map<String, String> overrides) {
        Properties brokerProps = IntegrationTestHarness.defaultOAuthBrokerProps();
        Path authConfigPath = null;
        try {
            authConfigPath = JarResourceLoader.loadFileFromResourceWithClassLoader(getClass(), "AuthConfigEnhanced.yaml").toPath();
        } catch (Exception e) {
            // Use the logger instead of printStackTrace so the failure appears in the test logs.
            log.error("Failed to load AuthConfigEnhanced.yaml from the test resources", e);
        }
        // NOTE(review): there is no space between the two JAAS options; kept byte-for-byte.
        brokerProps.put("listener.name.external.oauthbearer.sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath=\"" + this.jwsContainer.getPublicKeyFile().toPath() + "\"authenticator.jwt.config.url=\"" + authConfigPath + "\";");
        brokerProps.put("confluent.offsets.topic.placement.constraints", CONSUMER_OFFSET_PLACEMENT_CONSTRAINT);
        brokerProps.put("listener.name.external.oauthbearer.sasl.server.callback.handler.class", EnhancedOAuthBearerValidatorCallbackHandler.class.getName());
        brokerProps.put("confluent.cdc.lkc.metadata.topic", "_confluent-logical_clusters");
        brokerProps.put("multitenant.metadata.class", TopicBasedPhysicalClusterMetadata.class.getName());
        if (overrides != null) {
            brokerProps.putAll(overrides);
        }
        return brokerProps;
    }

    /**
     * Delegates to {@code IntegrationTestHarness.clientOAuthJaasConfig}. The tests
     * pass a token as the first argument and a logical cluster id as the second.
     */
    private String clientJaasConfig(String token, String logicalCluster) {
        String jaasConfig = IntegrationTestHarness.clientOAuthJaasConfig(token, logicalCluster);
        return jaasConfig;
    }

    /**
     * Authenticates with a token minted by the mock OAuth server under the
     * "Confluent" issuer and checks the resulting audit event.
     *
     * <p>The mock server is stopped in a {@code finally} block so that a failing
     * assertion does not leave it running on its fixed port.
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMockOAuthServerForConfluentIssuer(String quorum) throws Exception {
        setUpMockOAuth();
        MockOAuthServer mockOAuthServer = new MockOAuthServer();
        mockOAuthServer.startServer(61234);
        try {
            try {
                mockOAuthServer.connectToServer();
            } catch (ServerFailedToConnectException e) {
                log.error("Server failed to connect", e);
                Assertions.fail("Mock OAuth server failed to connect", e);
            }

            List<String> clusters = new ArrayList<>();
            clusters.add(this.logicalClusterId);
            String token = new TokenBuilder("client_Secret", "client_id", mockOAuthServer)
                    .setExpiry(100000)
                    .setIssuer(CONFLUENT_ISSUER)
                    .setSubject("1")
                    .addClaims("userResourceId", "UR1")
                    .addClaims("userId", "UI1")
                    .addClaims("clusters", clusters)
                    .addClaims("orgResourceId", this.orgResourceId)
                    .build();
            // Check the token before it is used rather than after.
            Assertions.assertNotNull(token);

            AdminClient admin = this.testHarness.createOAuthAdminClient(clientJaasConfig(token, this.logicalClusterId), this.adminProperties);
            admin.createTopics(this.sampleTopics).all().get();
            List<String> expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
            TestUtils.retryOnExceptionWithTimeout(30000L, () -> {
                Assertions.assertTrue(admin.listTopics().names().get().containsAll(expectedTopics));
            });

            ConfluentAuthenticationEvent event = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
            KafkaPrincipal principal = (KafkaPrincipal) event.principal().get();
            Assertions.assertEquals("User", principal.getPrincipalType());
            Assertions.assertEquals("1", principal.getName());
            Assertions.assertEquals(AuditEventStatus.SUCCESS, event.status());
            Assertions.assertFalse(principal.toString().contains("tenantMetadata"));
            Scope expectedScope = new Scope.Builder(new String[0])
                    .addPath("organization=" + KafkaLogicalClusterUtils.LC_META_ABC.organizationId())
                    .addPath("environment=" + KafkaLogicalClusterUtils.LC_META_ABC.environmentId())
                    .addPath("cloud-cluster=" + KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId())
                    .withKafkaCluster(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId())
                    .build();
            Assertions.assertEquals(expectedScope, event.getScope());
            Assertions.assertEquals("1", event.authenticationContext().server().getAuthorizationID());
        } finally {
            mockOAuthServer.stopServer();
        }
    }

    /**
     * Authenticates with a token minted by the mock OAuth server under the
     * non-Confluent issuer {@code "abcd"}, after registering that issuer's key in
     * the mock trust cache, and checks the resulting audit event.
     *
     * <p>The mock server is stopped in a {@code finally} block so that a failing
     * assertion does not leave it running on its fixed port.
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMockOAuthServerForNonConfluentIssuer(String quorum) throws Exception {
        setUpMockOAuth();
        MockOAuthServer mockOAuthServer = new MockOAuthServer();
        mockOAuthServer.startServer(62234, true);
        try {
            List<String> clusters = new ArrayList<>();
            clusters.add(this.logicalClusterId);
            String token = new TokenBuilder("client_Secret", "client_id", mockOAuthServer)
                    .setExpiry(100000)
                    .setIssuer("abcd")
                    .setSubject("1")
                    .addClaims("userResourceId", "UR1")
                    .addClaims("userId", "UI1")
                    .addClaims("clusters", clusters)
                    .addClaims("orgResourceId", this.orgResourceId)
                    .build();
            // Check the token before it is used rather than after.
            Assertions.assertNotNull(token);

            JWK jwk = mockOAuthServer.getKeys().get("abcd");
            MockTrustCache trustCache = this.authStore.trustCache();

            // Convert the JWK's flat JSON text into a parameter map. This assumes a
            // single-level object of quoted string members with no commas in the values.
            Map<String, Object> jwkParams = new HashMap<>();
            String jwkJson = jwk.toString();
            for (String member : jwkJson.substring(1, jwkJson.length() - 1).split(",")) {
                // Limit of 2: a value that itself contains ':' must not be truncated.
                String[] nameAndValue = member.split(":", 2);
                String quotedName = nameAndValue[0];
                String quotedValue = nameAndValue[1];
                jwkParams.put(
                        quotedName.substring(1, quotedName.length() - 1),
                        quotedValue.substring(1, quotedValue.length() - 1));
            }
            JsonWebKey rsaJsonWebKey = new RsaJsonWebKey(jwkParams, (String) null);
            rsaJsonWebKey.setKeyId("abcd");
            trustCache.put(new JwtIssuerKey("abcd", (String) null, ""), new JwtIssuerValue(new JsonWebKeySet(new JsonWebKey[]{rsaJsonWebKey})));

            AdminClient admin = this.testHarness.createOAuthAdminClient(clientJaasConfig(token, this.logicalClusterId), this.adminProperties);
            admin.createTopics(this.sampleTopics).all().get();
            List<String> expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
            TestUtils.retryOnExceptionWithTimeout(30000L, () -> {
                Assertions.assertTrue(admin.listTopics().names().get().containsAll(expectedTopics));
            });

            ConfluentAuthenticationEvent event = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
            KafkaPrincipal principal = (KafkaPrincipal) event.principal().get();
            Assertions.assertEquals("User", principal.getPrincipalType());
            Assertions.assertEquals("1", principal.getName());
            Assertions.assertEquals(AuditEventStatus.SUCCESS, event.status());
            Assertions.assertFalse(principal.toString().contains("tenantMetadata"));
            Scope expectedScope = new Scope.Builder(new String[0])
                    .addPath("organization=" + KafkaLogicalClusterUtils.LC_META_ABC.organizationId())
                    .addPath("environment=" + KafkaLogicalClusterUtils.LC_META_ABC.environmentId())
                    .addPath("cloud-cluster=" + KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId())
                    .withKafkaCluster(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId())
                    .build();
            Assertions.assertEquals(expectedScope, event.getScope());
            Assertions.assertEquals("1", event.authenticationContext().server().getAuthorizationID());
        } finally {
            mockOAuthServer.stopServer();
        }
    }

    /**
     * With the default set-up, a client presenting the generated token for this
     * logical cluster can create topics, and the last audit event records a
     * successful authentication for user "1" with the expected scope.
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCorrectConfigurationAuthenticatesSuccessfully(String quorum) throws Exception {
        setUp();
        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId);
        AdminClient admin = this.testHarness.createOAuthAdminClient(jaasConfig, this.adminProperties);
        admin.createTopics(this.sampleTopics).all().get();

        List<String> expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
        TestUtils.retryOnExceptionWithTimeout(30000L, () -> {
            Assertions.assertTrue(admin.listTopics().names().get().containsAll(expectedTopics));
        });

        ConfluentAuthenticationEvent event = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
        KafkaPrincipal principal = (KafkaPrincipal) event.principal().get();
        Assertions.assertEquals("User", principal.getPrincipalType());
        Assertions.assertEquals("1", principal.getName());
        Assertions.assertEquals(AuditEventStatus.SUCCESS, event.status());
        Assertions.assertFalse(principal.toString().contains("tenantMetadata"));

        Scope expectedScope = new Scope.Builder(new String[0])
                .addPath("organization=" + KafkaLogicalClusterUtils.LC_META_ABC.organizationId())
                .addPath("environment=" + KafkaLogicalClusterUtils.LC_META_ABC.environmentId())
                .addPath("cloud-cluster=" + KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId())
                .withKafkaCluster(KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId())
                .build();
        Assertions.assertEquals(expectedScope, event.getScope());
        Assertions.assertEquals("1", event.authenticationContext().server().getAuthorizationID());
    }

    /**
     * A token for organization "different_org" must be rejected when the SASL
     * extension names this test's logical cluster, and the audit event must carry
     * the failure details.
     *
     * <p>The cause of the thrown exception is null-checked. An unwrapped exception
     * therefore fails the test with the real exception attached instead of
     * raising a {@link NullPointerException}.
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testClusterInExtensionNotBelongToTokenOrgThrowsException(String quorum) throws Exception {
        setUp(CONFLUENT_JWT_ISSUER, "different_org");
        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId);
        Exception thrown = Assertions.assertThrows(
                Exception.class,
                () -> this.testHarness.createOAuthAdminClient(jaasConfig, this.adminProperties).createTopics(this.sampleTopics).all().get(),
                String.format("Expected admin command to throw a %s", SaslAuthenticationException.class));
        Throwable cause = thrown.getCause();
        if (cause == null || cause.getClass() != SaslAuthenticationException.class) {
            Assertions.fail(String.format("Expected admin command to throw a %s but it threw a %s", SaslAuthenticationException.class, cause == null ? thrown.getClass() : cause.getClass()), thrown);
        }
        log.info("Expected exception message: {}", cause.getMessage());

        ConfluentAuthenticationEvent event = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
        Assertions.assertFalse(event.principal().isPresent());
        Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, event.status());
        Assertions.assertTrue(event.getScope().toString().contains("kafka-cluster=" + this.logicalClusterId));
        Assertions.assertTrue(event.authenticationException().isPresent());
        AuthenticationErrorInfo errorInfo = ((AuthenticationException) event.authenticationException().get()).errorInfo();
        Assertions.assertTrue(errorInfo.errorMessage().contains("logical cluster " + this.logicalClusterId + " does not belong to the org"));
        Assertions.assertEquals("1", errorInfo.identifier());
        Assertions.assertEquals(this.logicalClusterId, errorInfo.saslExtensions().get("logicalCluster"));
    }

    /**
     * A token whose audience is "CONTROL_PLANE" must be rejected with an
     * AUD_CLAIM_MISMATCH error. Currently disabled pending conversion to kraft.
     *
     * <p>The cause of the thrown exception is null-checked. An unwrapped exception
     * therefore fails the test with the real exception attached instead of
     * raising a {@link NullPointerException}.
     */
    @Disabled("TODO: KSECURITY-2486 - Convert testAudClaimInTokenThrowsException to kraft")
    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAudClaimInTokenThrowsException(String quorum) throws Exception {
        setUp(CONFLUENT_JWT_ISSUER, this.orgResourceId, "CONTROL_PLANE", null, null);
        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId);
        Exception thrown = Assertions.assertThrows(
                Exception.class,
                () -> this.testHarness.createOAuthAdminClient(jaasConfig, this.adminProperties).createTopics(this.sampleTopics).all().get(),
                String.format("Expected admin command to throw a %s", SaslAuthenticationException.class));
        Throwable cause = thrown.getCause();
        if (cause == null || cause.getClass() != SaslAuthenticationException.class) {
            Assertions.fail(String.format("Expected admin command to throw a %s but it threw a %s", SaslAuthenticationException.class, cause == null ? thrown.getClass() : cause.getClass()), thrown);
        }
        log.info("Expected exception message: {}", cause.getMessage());

        ConfluentAuthenticationEvent event = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
        Assertions.assertFalse(event.principal().isPresent());
        Assertions.assertEquals(AuditEventStatus.UNKNOWN_USER_DENIED, event.status());
        Assertions.assertTrue(event.authenticationException().isPresent());
        Assertions.assertTrue(((AuthenticationException) event.authenticationException().get()).errorMessage().contains("AUD_CLAIM_MISMATCH"));
    }

    /**
     * With {@code confluent.require.confluent.issuer=true} on the broker, a token
     * issued by "confluent" authenticates successfully as user "1".
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testConfluentIssuerRequiredSuccessfulAuth(String quorum) throws Exception {
        Map<String, String> brokerOverrides = Collections.singletonMap("confluent.require.confluent.issuer", "true");
        setUp(CONFLUENT_JWT_ISSUER, this.orgResourceId, null, null, brokerOverrides);

        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId);
        AdminClient admin = this.testHarness.createOAuthAdminClient(jaasConfig, this.adminProperties);
        admin.createTopics(this.sampleTopics).all().get();

        List<String> expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
        TestUtils.retryOnExceptionWithTimeout(30000L, () -> {
            Assertions.assertTrue(admin.listTopics().names().get().containsAll(expectedTopics));
        });

        ConfluentAuthenticationEvent event = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
        KafkaPrincipal principal = (KafkaPrincipal) event.principal().get();
        Assertions.assertEquals("User", principal.getPrincipalType());
        Assertions.assertEquals("1", principal.getName());
        Assertions.assertEquals(AuditEventStatus.SUCCESS, event.status());
    }

    /**
     * With {@code confluent.require.confluent.issuer=true} on the broker, a token
     * issued by "abcd" must be rejected with an ISSUER_INVALID error.
     *
     * <p>The cause of the thrown exception is null-checked. An unwrapped exception
     * therefore fails the test with the real exception attached instead of
     * raising a {@link NullPointerException}.
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testConfluentIssuerRequiredFailedAuth(String quorum) throws Exception {
        setUp("abcd", this.orgResourceId, null, null, Collections.singletonMap("confluent.require.confluent.issuer", "true"));
        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId);
        Exception thrown = Assertions.assertThrows(
                Exception.class,
                () -> this.testHarness.createOAuthAdminClient(jaasConfig, this.adminProperties).createTopics(this.sampleTopics).all().get(),
                String.format("Expected admin command to throw a %s", SaslAuthenticationException.class));
        Throwable cause = thrown.getCause();
        if (cause == null || cause.getClass() != SaslAuthenticationException.class) {
            Assertions.fail(String.format("Expected admin command to throw a %s but it threw a %s", SaslAuthenticationException.class, cause == null ? thrown.getClass() : cause.getClass()), thrown);
        }
        log.info("Expected exception message: {}", cause.getMessage());

        ConfluentAuthenticationEvent event = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
        Assertions.assertFalse(event.principal().isPresent());
        Assertions.assertEquals(AuditEventStatus.UNKNOWN_USER_DENIED, event.status());
        Assertions.assertTrue(event.authenticationException().isPresent());
        Assertions.assertTrue(((AuthenticationException) event.authenticationException().get()).errorMessage().contains("ISSUER_INVALID"));
    }

    /**
     * With both the Confluent-issuer and calling-resource-identity requirements
     * enabled, a "confluent" token carrying a {@code calling_resource_identity}
     * claim authenticates successfully as user "1".
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCallingResourceIdentityRequiredSuccessfulAuth(String quorum) throws Exception {
        Map<String, String> brokerOverrides = new HashMap<>();
        brokerOverrides.put("confluent.require.confluent.issuer", "true");
        brokerOverrides.put("confluent.require.calling.resource.identity", "true");
        Map<String, String> extraClaims = Collections.singletonMap("calling_resource_identity", "cri");
        setUp(CONFLUENT_JWT_ISSUER, this.orgResourceId, null, extraClaims, brokerOverrides);

        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId);
        AdminClient admin = this.testHarness.createOAuthAdminClient(jaasConfig, this.adminProperties);
        admin.createTopics(this.sampleTopics).all().get();

        List<String> expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
        TestUtils.retryOnExceptionWithTimeout(30000L, () -> {
            Assertions.assertTrue(admin.listTopics().names().get().containsAll(expectedTopics));
        });

        ConfluentAuthenticationEvent event = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
        KafkaPrincipal principal = (KafkaPrincipal) event.principal().get();
        Assertions.assertEquals("User", principal.getPrincipalType());
        Assertions.assertEquals("1", principal.getName());
        Assertions.assertEquals(AuditEventStatus.SUCCESS, event.status());
    }

    /**
     * With the calling-resource-identity requirement enabled, a "confluent" token
     * without a {@code calling_resource_identity} claim must be rejected.
     *
     * <p>The cause of the thrown exception is null-checked. An unwrapped exception
     * therefore fails the test with the real exception attached instead of
     * raising a {@link NullPointerException}.
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCallingResourceIdentityRequiredFailedAuth(String quorum) throws Exception {
        Map<String, String> brokerOverrides = new HashMap<>();
        brokerOverrides.put("confluent.require.confluent.issuer", "true");
        brokerOverrides.put("confluent.require.calling.resource.identity", "true");
        setUp(CONFLUENT_JWT_ISSUER, this.orgResourceId, null, null, brokerOverrides);

        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId);
        Exception thrown = Assertions.assertThrows(
                Exception.class,
                () -> this.testHarness.createOAuthAdminClient(jaasConfig, this.adminProperties).createTopics(this.sampleTopics).all().get(),
                String.format("Expected admin command to throw a %s", SaslAuthenticationException.class));
        Throwable cause = thrown.getCause();
        if (cause == null || cause.getClass() != SaslAuthenticationException.class) {
            Assertions.fail(String.format("Expected admin command to throw a %s but it threw a %s", SaslAuthenticationException.class, cause == null ? thrown.getClass() : cause.getClass()), thrown);
        }
        log.info("Expected exception message: {}", cause.getMessage());

        ConfluentAuthenticationEvent event = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
        Assertions.assertFalse(event.principal().isPresent());
        Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, event.status());
        Assertions.assertTrue(event.authenticationException().isPresent());
        Assertions.assertTrue(((AuthenticationException) event.authenticationException().get()).errorInfo().errorMessage().contains(String.format("Expected %s claim, but none was found", "calling_resource_identity")));
    }

    /**
     * A client naming the logical cluster "other-cluster" in its SASL extension
     * must be rejected, and the audit event must carry the failure details.
     *
     * <p>The cause of the thrown exception is null-checked. An unwrapped exception
     * therefore fails the test with the real exception attached instead of
     * raising a {@link NullPointerException}.
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testClusterInExtensionNotHostedOnBrokerThrowsException(String quorum) throws Exception {
        setUp();
        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), "other-cluster");
        Exception thrown = Assertions.assertThrows(
                Exception.class,
                () -> this.testHarness.createOAuthAdminClient(jaasConfig, this.adminProperties).createTopics(this.sampleTopics).all().get(),
                String.format("Expected admin command to throw a %s", SaslAuthenticationException.class));
        Throwable cause = thrown.getCause();
        if (cause == null || cause.getClass() != SaslAuthenticationException.class) {
            Assertions.fail(String.format("Expected admin command to throw a %s but it threw a %s", SaslAuthenticationException.class, cause == null ? thrown.getClass() : cause.getClass()), thrown);
        }
        log.info("Expected exception message: {}", cause.getMessage());

        ConfluentAuthenticationEvent event = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
        Assertions.assertFalse(event.principal().isPresent());
        Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, event.status());
        Assertions.assertTrue(event.getScope().toString().contains("kafka-cluster=other-cluster"));
        Assertions.assertTrue(event.authenticationException().isPresent());
        AuthenticationErrorInfo errorInfo = ((AuthenticationException) event.authenticationException().get()).errorInfo();
        Assertions.assertTrue(errorInfo.errorMessage().contains("cluster other-cluster is not hosted on this broker"));
        Assertions.assertEquals("1", errorInfo.identifier());
        Assertions.assertEquals("other-cluster", errorInfo.saslExtensions().get("logicalCluster"));
    }

    /**
     * After the broker's logical-cluster metadata cache has been closed,
     * authentication must fail with a "Could not get cluster metadata" error.
     *
     * <p>Closing the cache is a set-up step, so it runs outside the
     * expected-exception scope and a failure there surfaces as itself. The cause
     * of the thrown exception is null-checked. An unwrapped exception therefore
     * fails the test with the real exception attached instead of raising a
     * {@link NullPointerException}.
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testIllegalStateExceptionOnBrokerMetadataThrowsException(String quorum) throws Exception {
        setUp();
        closeLkcMetadataCache();
        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId + "1");
        Exception thrown = Assertions.assertThrows(
                Exception.class,
                () -> this.testHarness.createOAuthAdminClient(jaasConfig, this.adminProperties).createTopics(this.sampleTopics).all().get(),
                String.format("Expected admin command to throw a %s", SaslAuthenticationException.class));
        Throwable cause = thrown.getCause();
        if (cause == null || cause.getClass() != SaslAuthenticationException.class) {
            Assertions.fail(String.format("Expected admin command to throw a %s but it threw a %s", SaslAuthenticationException.class, cause == null ? thrown.getClass() : cause.getClass()), thrown);
        }
        log.info("Expected exception message: {}", cause.getMessage());

        ConfluentAuthenticationEvent event = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
        Assertions.assertFalse(event.principal().isPresent());
        Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, event.status());
        Assertions.assertTrue(event.authenticationException().isPresent());
        AuthenticationErrorInfo errorInfo = ((AuthenticationException) event.authenticationException().get()).errorInfo();
        Assertions.assertTrue(errorInfo.errorMessage().contains("Could not get cluster metadata to validate the token"));
        Assertions.assertEquals("1", errorInfo.identifier());
        Assertions.assertTrue(errorInfo.clusterId().isEmpty());
    }

    /**
     * Produces and consumes ten records so that the consumer offsets topic is
     * created. Then checks that the topic carries the configured placement
     * constraint and that each of its partitions has exactly one replica.
     * Currently disabled (KSECURITY-1963).
     *
     * <p>The producer and the consumer are each scoped with try-with-resources,
     * so each is closed exactly once. A failure in the later admin checks no
     * longer closes them a second time.
     */
    @Disabled("KSECURITY-1963")
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testPlacementConstraintForGroupCoordinators(String quorum) throws Throwable {
        setUp();
        String jaasConfig = clientJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId);

        try (KafkaProducer<String, String> producer = this.testHarness.createOAuthProducer(jaasConfig, this.adminProperties)) {
            KafkaTestUtils.sendRecords(producer, "test-topic", 0, 10);
        }
        try (KafkaConsumer<String, String> consumer = this.testHarness.createOAuthConsumer("test-cg", jaasConfig, this.adminProperties)) {
            KafkaTestUtils.consumeRecords(consumer, "test-topic", 0, 10);
            consumer.commitSync();
        }

        // Obtained from the physical cluster and not closed here, as in the original.
        AdminClient superAdmin = this.physicalCluster.superAdminClient();
        ConfigResource offsetsTopic = new ConfigResource(ConfigResource.Type.TOPIC, "__consumer_offsets");
        Set<ConfigResource> offsetsTopicOnly = Collections.singleton(offsetsTopic);
        AtomicReference<DescribeConfigsResult> configsResult = new AtomicReference<>();
        AtomicReference<DescribeTopicsResult> topicsResult = new AtomicReference<>();

        // Poll until the offsets topic is visible; keep the results of the last poll.
        TestUtils.waitForCondition(() -> {
            DescribeConfigsResult describeConfigs = superAdmin.describeConfigs(offsetsTopicOnly);
            configsResult.set(describeConfigs);
            topicsResult.set(superAdmin.describeTopics(Collections.singleton("__consumer_offsets")));
            return describeConfigs.all().get(5L, TimeUnit.SECONDS).size() == 1;
        }, "Unable to find consumer offset topic.");

        Assertions.assertNotNull(configsResult.get(), "Unable to get consumer offset topic configuration.");
        Config offsetsTopicConfig = configsResult.get().all().get().get(offsetsTopic);
        Assertions.assertEquals(
                TopicPlacement.parse(CONSUMER_OFFSET_PLACEMENT_CONSTRAINT),
                TopicPlacement.parse(offsetsTopicConfig.get("confluent.placement.constraints").value()));

        Assertions.assertNotNull(topicsResult.get(), "Unable to get consumer offset topic description.");
        TopicDescription offsetsTopicDescription = topicsResult.get().allTopicNames().get().get("__consumer_offsets");
        offsetsTopicDescription.partitions().forEach(partition -> {
            Assertions.assertEquals(1, partition.replicas().size(), "More than one replica found.");
        });
    }

    /**
     * Produces the {@code LC_META_ABC} metadata for this test's logical cluster to
     * the "_confluent-logical_clusters" topic, using the next sequence id.
     */
    public void loadLkcMetadata() {
        this.physicalCluster.kafkaCluster().produceLCMData(
                "_confluent-logical_clusters",
                this.sequenceId.incrementAndGet(),
                this.logicalClusterId,
                KafkaLogicalClusterUtils.LC_META_ABC);
    }

    /**
     * Looks up the {@code BasePhysicalClusterMetadata} instance registered under
     * the broker session UUID and closes it for that same UUID.
     */
    public void closeLkcMetadataCache() {
        String sessionUuid = this.brokerUUID;
        BasePhysicalClusterMetadata.getInstance(sessionUuid).close(sessionUuid);
    }
}
