package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.FileBasedPlainSaslAuthenticatorTest;
import io.confluent.kafka.test.cluster.EmbeddedKafkaCluster;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.common.DeletionEventHandler;
import kafka.test.JarResourceLoader;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.acl.AclState;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Integration test for the user deletion handler running on the KRaft controller.
 *
 * <p>Each test creates topic ACLs for one or more user principals, produces a null-valued record
 * for the user's key to the user-metadata topic, and then verifies that
 * <ul>
 *   <li>the {@code user-deletion-event-count} metric of the handler advances,</li>
 *   <li>the ACLs are no longer returned by a regular {@code describeAcls} call, and</li>
 *   <li>the ACLs are still returned when describing with {@link AclState#DELETED}.</li>
 * </ul>
 */
@Tag("integration")
public class UserDeletionHandlerIntegrationTest {
    static final String KEY2 = String.join("\n", "{", "  \"UserResourceId\": \"u-2\",", "  \"OrgResourceId\": \"org1\",", "  \"PkcId\": \"pkc1\"", "}");
    static final String VALUE2 = String.join("\n", "{", "  \"UserId\": \"2\",", "  \"ActiveLkcIds\": [\"lkc-abc\", \"lkc-xyz\"]", "}");
    static final String KEY3 = String.join("\n", "{", "  \"UserResourceId\": \"u-3\",", "  \"OrgResourceId\": \"org3\"", "}");
    static final String VALUE3 = String.join("\n", "{", "  \"UserId\": \"3\",", "  \"ActiveLkcIds\": [\"lkc-abc\", \"lkc-xyz\"]", "}");
    static final String KEY4 = String.join("\n", "{", "  \"UserResourceId\": \"u-4\",", "  \"OrgResourceId\": \"org1\"", "}");
    static final String VALUE4 = String.join("\n", "{", "  \"UserId\": \"4\",", "  \"ActiveLkcIds\": [\"lkc-abc\", \"lkc-xyz\"]", "}");

    /** Name of the handler metric that the tests poll. */
    private static final String USER_DELETION_EVENT_COUNT = "user-deletion-event-count";

    private IntegrationTestHarness testHarness;
    private PhysicalCluster physicalCluster;
    /** Last sequence id passed to {@code produceData} for the user-metadata topic. */
    private int userMetadataSequenceId;
    private final String testTopic1 = "testTopic1";
    private final String logicalClusterId = KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId();
    private final String apiKeysTopic = "_confluent-apikey";
    private final String userMetaDataTopic = "_confluent-user_metadata";
    private final String adminUserAPIkey = "APIKEY1";
    private final String adminUserAPIkeyPassword = "pwd1";
    private final String userResourceId2 = "u-2";
    private final String userId2 = "2";
    private final String userId3 = "3";
    private final String userId4 = "4";

    /**
     * Starts the physical cluster with the API-key and user-metadata topics, creates the logical
     * cluster, and loads the API keys and the initial user metadata records.
     */
    @BeforeEach
    public void setup(TestInfo testInfo) throws Exception {
        this.testHarness = new IntegrationTestHarness(testInfo, 3);
        // Passed to the brokers/controllers as "confluent.cdc.api.keys.topic.load.timeout.ms".
        long apiKeysLoadTimeoutMs = 15000 + TimeUnit.SECONDS.toMillis(3L);
        this.physicalCluster = this.testHarness.startWithTopic(
                Arrays.asList(this.apiKeysTopic, this.userMetaDataTopic),
                1,
                1,
                15000L,
                brokerProps(apiKeysLoadTimeoutMs, "0"),
                controllerProps(apiKeysLoadTimeoutMs, "1001"),
                Optional.empty());
        this.physicalCluster.createLogicalCluster(this.logicalClusterId, 100, 1, 2);
        loadApiKeys(this.physicalCluster, "/file_auth_test_apikeys.json", this.adminUserAPIkey);
        loadApiKeys(this.physicalCluster, "/service_account_apikey_2.json", "APIKEY2");
        loadUserMetadata();
    }

    /** Resets the sequence counter and produces the metadata records for users 2, 3 and 4. */
    private void loadUserMetadata() {
        this.userMetadataSequenceId = 0;
        List<String> keys = Arrays.asList(KEY2, KEY3, KEY4);
        List<String> values = Arrays.asList(VALUE2, VALUE3, VALUE4);
        for (int i = 0; i < keys.size(); i++) {
            produceUserMetadata(keys.get(i), values.get(i));
        }
    }

    /** Produces one record to the user-metadata topic using the next sequence id. */
    private void produceUserMetadata(String key, String value) {
        this.physicalCluster.kafkaCluster()
                .produceData(this.userMetaDataTopic, ++this.userMetadataSequenceId, key, value);
    }

    /** Produces a record with a {@code null} value for {@code key} to the user-metadata topic. */
    private void produceUserDeletion(String key) {
        produceUserMetadata(key, null);
    }

    @AfterEach
    public void tearDown() throws Exception {
        // The harness is null if its construction failed in setup(); avoid masking that failure
        // with a NullPointerException here.
        if (this.testHarness != null) {
            this.testHarness.shutdown();
        }
        KafkaTestUtils.verifyThreadCleanup();
    }

    /** Builds the broker configuration: listener setup, broker id and the shared properties. */
    private Properties brokerProps(long apiKeysLoadTimeoutMs, String brokerId) throws IOException {
        Properties props = new Properties();
        props.put("listeners", "INTERNAL://localhost:0, EXTERNAL://localhost:0");
        props.put("advertised.listeners", "INTERNAL://localhost:0, EXTERNAL://localhost:0");
        props.put("listener.security.protocol.map", "INTERNAL:PLAINTEXT, EXTERNAL:SASL_PLAINTEXT");
        props.put("broker.id", brokerId);
        props.putAll(commonProps(apiKeysLoadTimeoutMs));
        return props;
    }

    /** Builds the controller configuration: node id and the shared properties. */
    private Properties controllerProps(long apiKeysLoadTimeoutMs, String nodeId) throws IOException {
        Properties props = new Properties();
        props.put("node.id", nodeId);
        props.putAll(commonProps(apiKeysLoadTimeoutMs));
        return props;
    }

    /** Properties shared by brokers and controllers (multi-tenant auth, CDC topics, ACL state). */
    private Properties commonProps(long apiKeysLoadTimeoutMs) throws IOException {
        Properties props = new Properties();
        props.put("sasl.enabled.mechanisms", Collections.singletonList("PLAIN"));
        props.put("listener.name.external.principal.builder.class", MultiTenantPrincipalBuilder.class.getName());
        props.put("listener.name.external.confluent.security.event.logger.authentication.enable", "true");
        props.put("authorizer.class.name", MultiTenantAuthorizer.class.getName());
        props.put("confluent.security.event.logger.multitenant.enable", "true");
        props.put("listener.name.external.plain.sasl.jaas.config", "io.confluent.kafka.server.plugins.auth.TopicBasedLoginModule required;");
        props.put("confluent.multitenant.listener.names", "EXTERNAL");
        props.put("confluent.cdc.api.keys.topic", this.apiKeysTopic);
        props.put("confluent.cdc.api.keys.topic.load.timeout.ms", String.valueOf(apiKeysLoadTimeoutMs));
        props.put(MockAuditLogProvider.AUDIT_PROVIDER_CONFIG, "TEST");
        props.put("confluent.close.connections.on.credential.delete", "true");
        props.put("confluent.cdc.user.metadata.enable", "true");
        props.put("confluent.cdc.user.metadata.topic", this.userMetaDataTopic);
        props.put("confluent.cdc.user.deletion.handler.enable", "true");
        props.put("confluent.multitenant.authorizer.enable.acl.state", "true");
        return props;
    }

    /**
     * Reads the given classpath resource and produces its content to the API-keys topic.
     *
     * @param physicalCluster cluster to produce to
     * @param resourcePath    resource resolved relative to {@link FileBasedPlainSaslAuthenticatorTest}
     * @param apiKey          API key passed to {@code produceApiKeysData}
     * @throws Exception if the resource cannot be read; the underlying {@link IOException} is the cause
     */
    private void loadApiKeys(PhysicalCluster physicalCluster, String resourcePath, String apiKey) throws Exception {
        // try-with-resources so the stream is closed even when reading or producing fails.
        try (BufferedInputStream in = (BufferedInputStream) JarResourceLoader
                .loadFileFromResource(FileBasedPlainSaslAuthenticatorTest.class, resourcePath)
                .toURI()
                .toURL()
                .getContent()) {
            String apiKeysJson = Utils.readFullyToString(in);
            physicalCluster.kafkaCluster().produceApiKeysData(this.apiKeysTopic, apiKey, apiKeysJson, true);
        } catch (IOException e) {
            // Keep the original failure as the cause so the root problem stays visible.
            throw new Exception("Couldn't read apikeys content", e);
        }
    }

    /**
     * Deleting a single user removes both of its ACLs (resource-id principal and numeric-id
     * principal) from the regular view while keeping them visible in the DELETED state.
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSoftACLDelete(String quorum) throws Exception {
        try (ConfluentAdmin admin = createAdminClient()) {
            Metrics metrics = userDeletionMetrics();
            assertDeletionEventCount(0.0d, metrics);

            AclBinding resourceIdAcl = topicAcl(this.userResourceId2, this.testTopic1);
            AclBinding userIdAcl = topicAcl(this.userId2, this.testTopic1);
            admin.createAcls(Arrays.asList(resourceIdAcl, userIdAcl)).all().get();
            awaitActiveAcls(admin, resourceIdAcl, userIdAcl);

            produceUserDeletion(KEY2);
            awaitDeletionEventCount(1.0d, metrics);
            awaitActiveAclCount(admin, resourceIdAcl.toFilter(), 0);

            Collection<AclBinding> deletedResourceIdAcls = describeDeletedAcls(admin, resourceIdAcl);
            Collection<AclBinding> deletedUserIdAcls = describeDeletedAcls(admin, userIdAcl);
            Assertions.assertEquals(1, deletedResourceIdAcls.size());
            Assertions.assertEquals(1, deletedUserIdAcls.size());
            Assertions.assertEquals(Collections.singletonList(resourceIdAcl), deletedResourceIdAcls);
        }
    }

    /**
     * Deleting several users removes only the ACLs of the deleted users, and producing further
     * null-valued records for users that were already deleted does not advance the metric.
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSoftACLDeleteForMultipleUsers(String quorum) throws Exception {
        try (ConfluentAdmin admin = createAdminClient()) {
            Metrics metrics = userDeletionMetrics();
            assertDeletionEventCount(0.0d, metrics);

            AclBinding user2Acl = topicAcl(this.userResourceId2, this.testTopic1);
            AclBinding user4Acl = topicAcl(this.userId4, this.testTopic1);
            AclBinding user3Acl = topicAcl(this.userId3, this.testTopic1);
            admin.createAcls(Arrays.asList(user2Acl, user4Acl, user3Acl)).all().get();
            awaitActiveAcls(admin, user2Acl, user4Acl, user3Acl);

            // Delete users 2 and 4: only the ACL of user 3 remains active.
            produceUserDeletion(KEY2);
            produceUserDeletion(KEY4);
            awaitActiveAclCount(admin, AclBindingFilter.ANY, 1);
            awaitDeletionEventCount(2.0d, metrics);

            // Repeat the deletions for users 2 and 4: neither the ACL count nor the metric changes.
            produceUserDeletion(KEY2);
            produceUserDeletion(KEY2);
            produceUserDeletion(KEY4);
            produceUserDeletion(KEY4);
            awaitActiveAclCount(admin, AclBindingFilter.ANY, 1);
            awaitDeletionEventCount(2.0d, metrics);

            // Delete user 3: no active ACL is left.
            produceUserDeletion(KEY3);
            awaitActiveAclCount(admin, AclBindingFilter.ANY, 0);
            awaitDeletionEventCount(3.0d, metrics);

            Collection<AclBinding> deletedUser2Acls = describeDeletedAcls(admin, user2Acl);
            Collection<AclBinding> deletedUser4Acls = describeDeletedAcls(admin, user4Acl);
            Collection<AclBinding> deletedUser3Acls = describeDeletedAcls(admin, user3Acl);
            Assertions.assertEquals(1, deletedUser2Acls.size());
            Assertions.assertEquals(1, deletedUser4Acls.size());
            Assertions.assertEquals(1, deletedUser3Acls.size());
            Assertions.assertEquals(Collections.singletonList(user2Acl), deletedUser2Acls);
            Assertions.assertEquals(Collections.singletonList(user4Acl), deletedUser4Acls);
            Assertions.assertEquals(Collections.singletonList(user3Acl), deletedUser3Acls);
        }
    }

    /** Creates an admin client authenticated with the admin user's API key over SASL/PLAIN. */
    private ConfluentAdmin createAdminClient() throws Exception {
        return this.testHarness.createPlainAuthAdminClient(
                IntegrationTestHarness.clientPlainJaasConfig(this.adminUserAPIkey, this.adminUserAPIkeyPassword));
    }

    /** Returns the metrics registry of the KRaft controller's user deletion handler. */
    private Metrics userDeletionMetrics() throws Exception {
        DeletionEventHandler handler =
                (DeletionEventHandler) this.physicalCluster.kafkaCluster().kraftController().userDeletionHandler().get();
        return handler.metrics();
    }

    /** Asserts the current value of the {@code user-deletion-event-count} metric. */
    private static void assertDeletionEventCount(double expected, Metrics metrics) throws Exception {
        Assertions.assertEquals(expected, TestUtils.getMetricValue(metrics, USER_DELETION_EVENT_COUNT));
    }

    /** Retries until the {@code user-deletion-event-count} metric equals {@code expected}. */
    private static void awaitDeletionEventCount(double expected, Metrics metrics) throws Exception {
        TestUtils.retryOnExceptionWithTimeout(() -> assertDeletionEventCount(expected, metrics));
    }

    /** Retries until each binding is returned, alone, when describing ACLs with its own filter. */
    private static void awaitActiveAcls(ConfluentAdmin admin, AclBinding... bindings) throws Exception {
        TestUtils.retryOnExceptionWithTimeout(() -> {
            for (AclBinding binding : bindings) {
                Assertions.assertEquals(
                        Collections.singletonList(binding),
                        admin.describeAcls(binding.toFilter()).values().get());
            }
        });
    }

    /** Retries until a regular {@code describeAcls} with {@code filter} returns {@code expected} ACLs. */
    private static void awaitActiveAclCount(ConfluentAdmin admin, AclBindingFilter filter, int expected) throws Exception {
        TestUtils.retryOnExceptionWithTimeout(
                () -> Assertions.assertEquals(expected, admin.describeAcls(filter).values().get().size()));
    }

    /** Describes the ACLs matching {@code binding} that are in the {@link AclState#DELETED} state. */
    private static Collection<AclBinding> describeDeletedAcls(ConfluentAdmin admin, AclBinding binding) throws Exception {
        return ConfluentAdminUtils
                .describeAcls(admin, binding.toFilter(), new DescribeAclsOptions(), AclState.DELETED)
                .values()
                .get();
    }

    /** Builds an ALLOW-ALL literal topic ACL for principal {@code User:<principalName>} from any host. */
    private AclBinding topicAcl(String principalName, String topicName) {
        ResourcePattern topic = new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.LITERAL);
        AccessControlEntry allowAll = new AccessControlEntry(
                new KafkaPrincipal("User", principalName).toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW);
        return new AclBinding(topic, allowAll);
    }
}
