package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.clients.plugins.auth.oauth.OAuthBearerLoginCallbackHandler;
import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.metrics.TenantMetricsTestUtils;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.oauth.MockBasicAuthStore;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.security.auth.oauth.mockserver.common.ServerFailedToConnectException;
import io.confluent.security.auth.oauth.mockserver.server.MockOAuthServer;
import io.confluent.security.auth.oauth.mockserver.server.TokenBuilder;
import io.confluent.security.auth.provider.oauth.EnhancedOAuthBearerValidatorCallbackHandler;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import kafka.server.KafkaBroker;
import kafka.test.JarResourceLoader;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.ResolveOffsetRangeOptions;
import org.apache.kafka.clients.admin.ResolveOffsetRangeSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicIdAndPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

@Tags({@Tag("integration"), @Tag("bazel:size:medium")})
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantOAuthIntegrationTest.class */
public class MultiTenantOAuthIntegrationTest {
    private static final String CONSUMER_OFFSET_PLACEMENT_CONSTRAINT = "{\"version\":1,\"replicas\":[{\"count\": 1, \"constraints\":{\"rack\":\"rack-1\"}}]}";
    private static final String USER_RESOURCE_ID = "UR1";
    private static final String CONFLUENT_ISSUER = "Confluent";
    private static final String LKC_METADATA_TOPIC = "_confluent-logical_clusters";
    private static final String SUBJECT = "1";
    private final Properties adminProperties = new Properties();
    private final String testTopic = "abcd";
    private final List<NewTopic> sampleTopics = Collections.singletonList(new NewTopic("abcd", 3, 1));
    private final AtomicInteger sequenceId = new AtomicInteger();
    private static MockOAuthServer mockOAuthServer;
    private IntegrationTestHarness testHarness;
    private OAuthUtils.JwsContainer jwsContainer;
    private String brokerUUID;
    private LogicalClusterUser testUser;
    private TestInfo testInfo;
    private PhysicalCluster physicalCluster;
    private MockBasicAuthStore authStore;
    private static final String LOGICAL_CLUSTER_ID = KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId();
    private static final String ORG_RESOURCE_ID = KafkaLogicalClusterUtils.LC_META_ABC.organizationId();

    /* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantOAuthIntegrationTest$TokenAndApplicationIdentity.class */
    public static final class TokenAndApplicationIdentity extends Record {
        private final String token;
        private final String applicationIdentity;

        public TokenAndApplicationIdentity(String str, String str2) {
            this.token = str;
            this.applicationIdentity = str2;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, TokenAndApplicationIdentity.class), TokenAndApplicationIdentity.class, "token;applicationIdentity", "FIELD:Lio/confluent/kafka/multitenant/integration/test/MultiTenantOAuthIntegrationTest$TokenAndApplicationIdentity;->token:Ljava/lang/String;", "FIELD:Lio/confluent/kafka/multitenant/integration/test/MultiTenantOAuthIntegrationTest$TokenAndApplicationIdentity;->applicationIdentity:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, TokenAndApplicationIdentity.class), TokenAndApplicationIdentity.class, "token;applicationIdentity", "FIELD:Lio/confluent/kafka/multitenant/integration/test/MultiTenantOAuthIntegrationTest$TokenAndApplicationIdentity;->token:Ljava/lang/String;", "FIELD:Lio/confluent/kafka/multitenant/integration/test/MultiTenantOAuthIntegrationTest$TokenAndApplicationIdentity;->applicationIdentity:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, TokenAndApplicationIdentity.class, Object.class), TokenAndApplicationIdentity.class, "token;applicationIdentity", "FIELD:Lio/confluent/kafka/multitenant/integration/test/MultiTenantOAuthIntegrationTest$TokenAndApplicationIdentity;->token:Ljava/lang/String;", "FIELD:Lio/confluent/kafka/multitenant/integration/test/MultiTenantOAuthIntegrationTest$TokenAndApplicationIdentity;->applicationIdentity:Ljava/lang/String;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public String token() {
            return this.token;
        }

        public String applicationIdentity() {
            return this.applicationIdentity;
        }
    }

    public MultiTenantOAuthIntegrationTest() {
        this.adminProperties.put("sasl.login.callback.handler.class", OAuthBearerLoginCallbackHandler.class.getName());
    }

    @BeforeAll
    public static void beforeAll() throws Exception {
        mockOAuthServer = new MockOAuthServer();
        mockOAuthServer.startServer(61234);
        try {
            mockOAuthServer.connectToServer();
        } catch (ServerFailedToConnectException e) {
            Assertions.fail();
        }
    }

    @AfterAll
    public static void afterAll() {
        mockOAuthServer.stopServer();
    }

    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        this.testInfo = testInfo;
    }

    @AfterEach
    public void tearDown() throws Exception {
        try {
            this.testHarness.shutdown();
            this.authStore.close();
        } finally {
            KafkaTestUtils.verifyThreadCleanup();
        }
    }

    @MethodSource({"differentCallingResourceIdentities"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCallingResourceIdentityClaimMetricTagging(String str, TokenAndApplicationIdentity tokenAndApplicationIdentity) throws Exception {
        setUpMockOAuth(Collections.singletonMap("confluent.calling.resource.identity.type.map", "^cc-(unified-storage|us):CONFLUENT_TABLEFLOW"));
        AdminClient createOAuthAdminClient = this.testHarness.createOAuthAdminClient(clientJaasConfig(tokenAndApplicationIdentity.token()), this.adminProperties);
        createOAuthAdminClient.createTopics(this.sampleTopics).all().get();
        List list = this.sampleTopics.stream().map((v0) -> {
            return v0.name();
        }).toList();
        TestUtils.retryOnExceptionWithTimeout(30000L, () -> {
            Assertions.assertTrue(((Set) createOAuthAdminClient.listTopics().names().get()).containsAll(list));
        });
        ConfluentAuthenticationEvent lastAuthenticationEntry = MockAuditLogProvider.getInstance(this.brokerUUID).lastAuthenticationEntry();
        Assertions.assertEquals(Optional.of("User"), lastAuthenticationEntry.principal().map((v0) -> {
            return v0.getPrincipalType();
        }));
        Assertions.assertEquals(Optional.of(SUBJECT), lastAuthenticationEntry.principal().map((v0) -> {
            return v0.getName();
        }));
        Assertions.assertEquals(AuditEventStatus.SUCCESS, lastAuthenticationEntry.status());
        SaslAuthenticationContext authenticationContext = lastAuthenticationEntry.authenticationContext();
        for (ApiKeys apiKeys : new ApiKeys[]{ApiKeys.CREATE_TOPICS, ApiKeys.API_VERSIONS, ApiKeys.METADATA}) {
            Iterator<KafkaBroker> it = this.physicalCluster.kafkaCluster().kafkaBrokers().iterator();
            while (it.hasNext()) {
                Metrics metrics = it.next().metrics();
                HashMap hashMap = new HashMap();
                hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, LOGICAL_CLUSTER_ID);
                hashMap.put(MultiTenantRequestContextTest.USERNAME, authenticationContext.server().getAuthorizationID());
                hashMap.put("user-resource-id", USER_RESOURCE_ID);
                hashMap.put("application-identity", tokenAndApplicationIdentity.applicationIdentity());
                hashMap.put(TenantMetricsTestUtils.REQUEST_TAG, apiKeys.name);
                Assertions.assertNotNull(metrics.metric(metrics.metricName(TenantMetricsTestUtils.RESPONSE_BYTE_TOTAL, "tenant-metrics", hashMap)));
            }
        }
    }

    @MethodSource({"differentCallingResourceIdentities"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInternalApiAuthorization(String str, TokenAndApplicationIdentity tokenAndApplicationIdentity) throws Exception {
        setUpMockOAuth(Collections.singletonMap("confluent.calling.resource.identity.type.map", "^cc-(unified-storage|us):CONFLUENT_TABLEFLOW"));
        ConfluentAdmin createOAuthAdminClient = this.testHarness.createOAuthAdminClient(clientJaasConfig(tokenAndApplicationIdentity.token()), this.adminProperties);
        TopicIdAndPartition topicIdAndPartition = new TopicIdAndPartition((Uuid) createOAuthAdminClient.createTopics(this.sampleTopics).topicId("abcd").get(), 0);
        KafkaFuture partitionResult = createOAuthAdminClient.resolveOffsetRange(Map.of(topicIdAndPartition, new ResolveOffsetRangeSpec(0L)), new ResolveOffsetRangeOptions(IsolationLevel.READ_COMMITTED, Integer.MAX_VALUE)).partitionResult(topicIdAndPartition);
        if (!tokenAndApplicationIdentity.applicationIdentity.equals("default")) {
            Assertions.assertNotNull(partitionResult.get());
            return;
        }
        try {
            partitionResult.get();
            Assertions.fail("Expected exception");
        } catch (Exception e) {
            Assertions.assertInstanceOf(ClusterAuthorizationException.class, e.getCause());
        }
    }

    private static Stream<Arguments> differentCallingResourceIdentities() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{"kraft", new TokenAndApplicationIdentity(buildToken(Collections.singletonMap("calling_resource_identity", "cc-unified-storage-cts-enumerator-service.cc-unified-storage-cts-enumerator-test123")), "tableflow")}), Arguments.of(new Object[]{"kraft", new TokenAndApplicationIdentity(buildToken(Collections.emptyMap()), "default")})});
    }

    private static String buildToken(Map<String, String> map) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(LOGICAL_CLUSTER_ID);
        TokenBuilder tokenBuilder = new TokenBuilder("client_Secret", "client_id", mockOAuthServer);
        tokenBuilder.setExpiry(100000);
        tokenBuilder.setIssuer(CONFLUENT_ISSUER);
        tokenBuilder.setSubject(SUBJECT);
        tokenBuilder.addClaims("userResourceId", USER_RESOURCE_ID);
        tokenBuilder.addClaims("clusters", arrayList);
        tokenBuilder.addClaims("orgResourceId", ORG_RESOURCE_ID);
        for (Map.Entry<String, String> entry : map.entrySet()) {
            tokenBuilder.addClaims(entry.getKey(), entry.getValue());
        }
        try {
            return tokenBuilder.build();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private Properties setUpMetadata(Properties properties) {
        this.brokerUUID = UUID.randomUUID().toString();
        properties.put("broker.session.uuid", this.brokerUUID);
        return properties;
    }

    private Properties nodeProps(Map<String, String> map) {
        Properties defaultOAuthBrokerProps = IntegrationTestHarness.defaultOAuthBrokerProps();
        try {
            defaultOAuthBrokerProps.put("listener.name.external.oauthbearer.sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath=\"" + String.valueOf(this.jwsContainer.getPublicKeyFile().toPath()) + "\"authenticator.jwt.config.url=\"" + String.valueOf(JarResourceLoader.loadFileFromResourceWithClassLoader(getClass(), "AuthConfigEnhanced.yaml").toPath()) + "\";");
            defaultOAuthBrokerProps.put("confluent.offsets.topic.placement.constraints", CONSUMER_OFFSET_PLACEMENT_CONSTRAINT);
            defaultOAuthBrokerProps.put("listener.name.external.oauthbearer.sasl.server.callback.handler.class", EnhancedOAuthBearerValidatorCallbackHandler.class.getName());
            defaultOAuthBrokerProps.put("confluent.cdc.lkc.metadata.topic", LKC_METADATA_TOPIC);
            defaultOAuthBrokerProps.put("multitenant.metadata.class", TopicBasedPhysicalClusterMetadata.class.getName());
            if (map != null) {
                defaultOAuthBrokerProps.putAll(map);
            }
            return defaultOAuthBrokerProps;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void addAdminAcls() {
        this.testHarness.newAclCommand().addTopicAclArgs(this.testUser.prefixedKafkaPrincipal(), this.testUser.withPrefix("abcd"), AclOperation.ALL, PatternType.LITERAL).execute();
    }

    public void loadLkcMetadata() {
        this.physicalCluster.kafkaCluster().produceLCMData(LKC_METADATA_TOPIC, this.sequenceId.incrementAndGet(), LOGICAL_CLUSTER_ID, KafkaLogicalClusterUtils.LC_META_ABC);
    }

    private void setUpMockOAuth(Map<String, String> map) throws Exception {
        MockAuditLogProvider.reset();
        this.testHarness = new IntegrationTestHarness(this.testInfo, 1, Collections.singletonList("rack-1"));
        this.jwsContainer = new OAuthUtils.Builder(0, null, SUBJECT, ORG_RESOURCE_ID).build();
        this.authStore = MockBasicAuthStore.create();
        this.physicalCluster = this.testHarness.startWithTopic(LKC_METADATA_TOPIC, 1, 1, 15000L, setUpMetadata(nodeProps(map)), nodeProps(Collections.emptyMap()));
        this.testUser = this.physicalCluster.createLogicalCluster(LOGICAL_CLUSTER_ID, 100, 1).user(1);
        addAdminAcls();
        loadLkcMetadata();
    }

    private String clientJaasConfig(String str) {
        return IntegrationTestHarness.clientOAuthJaasConfig(str, LOGICAL_CLUSTER_ID);
    }
}
