/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.clients.plugins.auth.oauth.OAuthBearerLoginCallbackHandler;
import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.oauth.MockBasicAuthStore;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.security.auth.oauth.mockserver.common.ServerFailedToConnectException;
import io.confluent.security.auth.oauth.mockserver.server.MockOAuthServer;
import io.confluent.security.auth.oauth.mockserver.server.TokenBuilder;
import io.confluent.security.auth.provider.oauth.EnhancedOAuthBearerValidatorCallbackHandler;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import kafka.server.KafkaBroker;
import kafka.test.JarResourceLoader;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.ResolveOffsetRangeOptions;
import org.apache.kafka.clients.admin.ResolveOffsetRangeSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicIdAndPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

@Tags(value={@Tag(value="integration"), @Tag(value="bazel:size:medium")})
public class MultiTenantOAuthIntegrationTest {
    private static final String CONSUMER_OFFSET_PLACEMENT_CONSTRAINT = "{\"version\":1,\"replicas\":[{\"count\": 1, \"constraints\":{\"rack\":\"rack-1\"}}]}";
    private static final String LOGICAL_CLUSTER_ID = KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId();
    private static final String USER_RESOURCE_ID = "UR1";
    private static final String ORG_RESOURCE_ID = KafkaLogicalClusterUtils.LC_META_ABC.organizationId();
    private static final String CONFLUENT_ISSUER = "Confluent";
    private static final String LKC_METADATA_TOPIC = "_confluent-logical_clusters";
    private static final String SUBJECT = "1";
    private final Properties adminProperties = new Properties();
    private final String testTopic = "abcd";
    private final List<NewTopic> sampleTopics = Collections.singletonList(new NewTopic("abcd", 3, 1));
    private final AtomicInteger sequenceId = new AtomicInteger();
    private static MockOAuthServer mockOAuthServer;
    private IntegrationTestHarness testHarness;
    private OAuthUtils.JwsContainer jwsContainer;
    private String brokerUUID;
    private LogicalClusterUser testUser;
    private TestInfo testInfo;
    private PhysicalCluster physicalCluster;
    private MockBasicAuthStore authStore;

    public MultiTenantOAuthIntegrationTest() {
        this.adminProperties.put("sasl.login.callback.handler.class", OAuthBearerLoginCallbackHandler.class.getName());
    }

    @BeforeAll
    public static void beforeAll() throws Exception {
        mockOAuthServer = new MockOAuthServer();
        mockOAuthServer.startServer(61234);
        try {
            mockOAuthServer.connectToServer();
        }
        catch (ServerFailedToConnectException exception) {
            Assertions.fail();
        }
    }

    @AfterAll
    public static void afterAll() {
        mockOAuthServer.stopServer();
    }

    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        this.testInfo = testInfo;
    }

    @AfterEach
    public void tearDown() throws Exception {
        try {
            this.testHarness.shutdown();
            this.authStore.close();
        }
        finally {
            KafkaTestUtils.verifyThreadCleanup();
        }
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @MethodSource(value={"differentCallingResourceIdentities"})
    public void testCallingResourceIdentityClaimMetricTagging(String quorum, TokenAndApplicationIdentity data) throws Exception {
        ApiKeys[] apiKeys;
        this.setUpMockOAuth(Collections.singletonMap("confluent.calling.resource.identity.type.map", "^cc-(unified-storage|us):CONFLUENT_TABLEFLOW"));
        AdminClient client = this.testHarness.createOAuthAdminClient(this.clientJaasConfig(data.token()), this.adminProperties);
        client.createTopics(this.sampleTopics).all().get();
        List<String> expectedTopics = this.sampleTopics.stream().map(NewTopic::name).toList();
        TestUtils.retryOnExceptionWithTimeout((long)30000L, () -> Assertions.assertTrue((boolean)((Set)client.listTopics().names().get()).containsAll(expectedTopics)));
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(this.brokerUUID);
        ConfluentAuthenticationEvent authenticationEvent = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
        Assertions.assertEquals(Optional.of("User"), authenticationEvent.principal().map(KafkaPrincipal::getPrincipalType));
        Assertions.assertEquals(Optional.of(SUBJECT), authenticationEvent.principal().map(KafkaPrincipal::getName));
        Assertions.assertEquals((Object)AuditEventStatus.SUCCESS, (Object)authenticationEvent.status());
        SaslAuthenticationContext authenticationContext = (SaslAuthenticationContext)authenticationEvent.authenticationContext();
        for (ApiKeys apiKey : apiKeys = new ApiKeys[]{ApiKeys.CREATE_TOPICS, ApiKeys.API_VERSIONS, ApiKeys.METADATA}) {
            for (KafkaBroker broker : this.physicalCluster.kafkaCluster().kafkaBrokers()) {
                Metrics metrics = broker.metrics();
                HashMap<String, String> tags = new HashMap<String, String>();
                tags.put("tenant", LOGICAL_CLUSTER_ID);
                tags.put("user", authenticationContext.server().getAuthorizationID());
                tags.put("user-resource-id", USER_RESOURCE_ID);
                tags.put("application-identity", data.applicationIdentity());
                tags.put("request", apiKey.name);
                MetricName expectedMetricName = metrics.metricName("response-byte-total", "tenant-metrics", tags);
                KafkaMetric metric = metrics.metric(expectedMetricName);
                Assertions.assertNotNull((Object)metric);
            }
        }
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @MethodSource(value={"differentCallingResourceIdentities"})
    public void testInternalApiAuthorization(String quorum, TokenAndApplicationIdentity data) throws Exception {
        this.setUpMockOAuth(Collections.singletonMap("confluent.calling.resource.identity.type.map", "^cc-(unified-storage|us):CONFLUENT_TABLEFLOW"));
        ConfluentAdmin client = (ConfluentAdmin)this.testHarness.createOAuthAdminClient(this.clientJaasConfig(data.token()), this.adminProperties);
        Uuid topicId = (Uuid)client.createTopics(this.sampleTopics).topicId("abcd").get();
        TopicIdAndPartition topicIdAndPartition = new TopicIdAndPartition(topicId, 0);
        KafkaFuture result = client.resolveOffsetRange(Map.of(topicIdAndPartition, new ResolveOffsetRangeSpec(0L)), new ResolveOffsetRangeOptions(IsolationLevel.READ_COMMITTED, Integer.MAX_VALUE)).partitionResult(topicIdAndPartition);
        if (data.applicationIdentity.equals("default")) {
            try {
                result.get();
                Assertions.fail((String)"Expected exception");
            }
            catch (Exception e) {
                Assertions.assertInstanceOf(ClusterAuthorizationException.class, (Object)e.getCause());
            }
        } else {
            Assertions.assertNotNull((Object)result.get());
        }
    }

    private static Stream<Arguments> differentCallingResourceIdentities() {
        return Stream.of(Arguments.of((Object[])new Object[]{"kraft", new TokenAndApplicationIdentity(MultiTenantOAuthIntegrationTest.buildToken(Collections.singletonMap("calling_resource_identity", "cc-unified-storage-cts-enumerator-service.cc-unified-storage-cts-enumerator-test123")), "tableflow")}), Arguments.of((Object[])new Object[]{"kraft", new TokenAndApplicationIdentity(MultiTenantOAuthIntegrationTest.buildToken(Collections.emptyMap()), "default")}));
    }

    private static String buildToken(Map<String, String> additionalClaims) {
        ArrayList<String> clustersAllowed = new ArrayList<String>();
        clustersAllowed.add(LOGICAL_CLUSTER_ID);
        String clientId = "client_id";
        String clientSecret = "client_Secret";
        TokenBuilder tokenBuilder = new TokenBuilder(clientSecret, clientId, mockOAuthServer);
        tokenBuilder.setExpiry(100000);
        tokenBuilder.setIssuer(CONFLUENT_ISSUER);
        tokenBuilder.setSubject(SUBJECT);
        tokenBuilder.addClaims("userResourceId", USER_RESOURCE_ID);
        tokenBuilder.addClaims("clusters", clustersAllowed);
        tokenBuilder.addClaims("orgResourceId", ORG_RESOURCE_ID);
        for (Map.Entry<String, String> claim : additionalClaims.entrySet()) {
            tokenBuilder.addClaims(claim.getKey(), claim.getValue());
        }
        try {
            return tokenBuilder.build();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private Properties setUpMetadata(Properties brokerProps) {
        this.brokerUUID = UUID.randomUUID().toString();
        brokerProps.put("broker.session.uuid", this.brokerUUID);
        return brokerProps;
    }

    private Properties nodeProps(Map<String, String> configOverrides) {
        Path pathToConfigFile;
        Properties props = IntegrationTestHarness.defaultOAuthBrokerProps();
        try {
            pathToConfigFile = JarResourceLoader.loadFileFromResourceWithClassLoader(this.getClass(), (String)"AuthConfigEnhanced.yaml").toPath();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        props.put("listener.name.external.oauthbearer.sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath=\"" + String.valueOf(this.jwsContainer.getPublicKeyFile().toPath()) + "\"authenticator.jwt.config.url=\"" + String.valueOf(pathToConfigFile) + "\";");
        props.put("confluent.offsets.topic.placement.constraints", CONSUMER_OFFSET_PLACEMENT_CONSTRAINT);
        props.put("listener.name.external.oauthbearer.sasl.server.callback.handler.class", EnhancedOAuthBearerValidatorCallbackHandler.class.getName());
        props.put("confluent.cdc.lkc.metadata.topic", LKC_METADATA_TOPIC);
        props.put("multitenant.metadata.class", TopicBasedPhysicalClusterMetadata.class.getName());
        if (configOverrides != null) {
            props.putAll(configOverrides);
        }
        return props;
    }

    private void addAdminAcls() {
        this.testHarness.newAclCommand().addTopicAclArgs(this.testUser.prefixedKafkaPrincipal(), this.testUser.withPrefix("abcd"), AclOperation.ALL, PatternType.LITERAL).execute();
    }

    public void loadLkcMetadata() {
        this.physicalCluster.kafkaCluster().produceLCMData(LKC_METADATA_TOPIC, this.sequenceId.incrementAndGet(), LOGICAL_CLUSTER_ID, KafkaLogicalClusterUtils.LC_META_ABC);
    }

    private void setUpMockOAuth(Map<String, String> configOverrides) throws Exception {
        MockAuditLogProvider.reset();
        this.testHarness = new IntegrationTestHarness(this.testInfo, 1, Collections.singletonList("rack-1"));
        this.jwsContainer = new OAuthUtils.Builder(0, null, SUBJECT, ORG_RESOURCE_ID).build();
        this.authStore = MockBasicAuthStore.create();
        Properties nodeProps = this.nodeProps(configOverrides);
        long topicCreateTimeout = 15000L;
        this.physicalCluster = this.testHarness.startWithTopic(LKC_METADATA_TOPIC, 1, 1, topicCreateTimeout, this.setUpMetadata(nodeProps), this.nodeProps(Collections.emptyMap()));
        int adminUserId = 100;
        LogicalCluster logicalCluster = this.physicalCluster.createLogicalCluster(LOGICAL_CLUSTER_ID, 100, 1);
        this.testUser = logicalCluster.user(1);
        this.addAdminAcls();
        this.loadLkcMetadata();
    }

    private String clientJaasConfig(String jwsToken) {
        return IntegrationTestHarness.clientOAuthJaasConfig(jwsToken, LOGICAL_CLUSTER_ID);
    }

    public record TokenAndApplicationIdentity(String token, String applicationIdentity) {
    }
}

