/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tags(value={@Tag(value="integration"), @Tag(value="bazel:size:medium")})
public class MultiTenantAuthorizerTest {
    // JUnit metadata of the running test; stored by setUp and handed to the harness constructor.
    private TestInfo testInfo;
    // Created by startTestHarness(Properties) and shut down in tearDown.
    protected IntegrationTestHarness testHarness;
    // Id of the logical cluster that startTestHarness(Properties) creates.
    private final String tenantA = "tenantA";
    // Topic and consumer group names used by the produce/consume tests.
    private final String topic = "test.topic";
    private final String consumerGroup = "test.consumer.group";
    // Per-tenant ACL limit the tests expect; kept as a String and parsed with Integer.parseInt.
    private final String maxAclsPerTenant = "100";
    // ACL count the tests expect after addProducerAcls followed by addConsumerAcls.
    private final int aclsAddedByProducerConsumer = 6;
    // Assigned from testHarness.start(...) in startTestHarness(Properties).
    protected PhysicalCluster physicalCluster;
    // Logical cluster created on physicalCluster in startTestHarness(Properties).
    private LogicalCluster logicalCluster;
    // Users 1 and 2 of logicalCluster.
    private LogicalClusterUser user1;
    private LogicalClusterUser user2;

    /**
     * Stores the JUnit {@link TestInfo} of the running test so that
     * {@code startTestHarness} can pass it to the harness it creates.
     *
     * @param testInfo metadata of the test about to run
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        this.testInfo = testInfo;
    }

    /**
     * Releases per-test state: clears the Yammer metrics, shuts the harness down and
     * finally verifies thread cleanup.
     *
     * <p>The harness is only assigned inside {@code startTestHarness}, so it is still
     * {@code null} when a test fails before reaching that point. The null guard keeps
     * such a failure from being masked by a {@link NullPointerException} thrown here,
     * and the {@code finally} block makes sure the harness is shut down even if
     * clearing the metrics throws.
     *
     * @throws Exception if clearing metrics, shutting down or the cleanup check fails
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            this.clearYammerMetrics();
        } finally {
            if (this.testHarness != null) {
                this.testHarness.shutdown();
            }
        }
        KafkaTestUtils.verifyThreadCleanup();
    }

    /**
     * Runs once after all tests of the class and delegates to
     * {@code KafkaTestUtils.verifyThreadCleanup()}.
     */
    @AfterAll
    public static void verifyCleanup() {
        KafkaTestUtils.verifyThreadCleanup();
    }

    /**
     * Starts the harness with the node config overrides produced by
     * {@code nodeConfigOverrides(useExternalAuthStore)}.
     *
     * @param useExternalAuthStore flag forwarded to {@code nodeConfigOverrides}
     */
    protected void startTestHarness(boolean useExternalAuthStore) {
        Properties overrides = nodeConfigOverrides(useExternalAuthStore);
        startTestHarness(overrides);
    }

    /**
     * Creates and starts the integration harness, then creates the "tenantA" logical
     * cluster and caches its users 1 and 2.
     *
     * @param nodeConfigOverrides overrides passed twice (first and second argument) to
     *                            {@code IntegrationTestHarness.start}
     */
    protected void startTestHarness(Properties nodeConfigOverrides) {
        IntegrationTestHarness harness = new IntegrationTestHarness(testInfo);
        // Publish the harness before starting it so tearDown can shut it down.
        testHarness = harness;

        PhysicalCluster physical =
            harness.start(nodeConfigOverrides, nodeConfigOverrides, true, Optional.empty(), ignored -> {});
        physicalCluster = physical;

        LogicalCluster logical = physical.createLogicalCluster("tenantA", "testOrg", "testEnv", 100, 1, 2);
        logicalCluster = logical;
        user1 = logical.user(1);
        user2 = logical.user(2);
    }

    /**
     * Grants LITERAL producer ACLs to user1 and consumer ACLs to user2, then checks that
     * produce/consume works, that another topic and another group are rejected, that the
     * Confluent license check passes, and that the ACL count metrics go from 0 to 6.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testLiteralAcls(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);
        final int aclLimit = Integer.parseInt("100");

        // Before any ACL is added both metrics are expected to report zero.
        verifyAclCountMetric(0);
        verifyAclCountPerTenantMetric(0, aclLimit, "tenantA");

        addProducerAcls(user1, "test.topic", PatternType.LITERAL);
        addConsumerAcls(user2, "test.topic", "test.consumer.group", PatternType.LITERAL);
        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);

        verifyTopicAuthorizationFailure(user1, "sometopic");
        verifyConsumerGroupAuthorizationFailure(user1, "test.topic", "somegroup");
        SecurityTestUtils.verifyConfluentLicense(physicalCluster.kafkaCluster(), null);

        verifyAclCountMetric(6);
        verifyAclCountPerTenantMetric(6, aclLimit, "tenantA");
    }

    /**
     * Grants PREFIXED ACLs on the prefix "test" (producer for user1, consumer for user2)
     * and checks that "test.topic"/"test.consumer.group" are usable while an unrelated
     * topic and group are rejected.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testPrefixedAcls(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);

        addProducerAcls(user1, "test", PatternType.PREFIXED);
        addConsumerAcls(user2, "test", "test", PatternType.PREFIXED);
        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);

        verifyTopicAuthorizationFailure(user1, "sometopic");
        verifyConsumerGroupAuthorizationFailure(user1, "test.topic", "somegroup");
    }

    /**
     * Adds PREFIXED produce/consume ACLs whose resource name is each user's tenant
     * prefix, using the physical cluster's ACL command, and checks that produce/consume
     * on "test.topic" works.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testWildcardAcls(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);

        physicalCluster.newAclCommand()
            .produceAclArgs(prefixedKafkaPrincipal(user1), user1.tenantPrefix(), PatternType.PREFIXED)
            .execute();
        physicalCluster.newAclCommand()
            .consumeAclArgs(prefixedKafkaPrincipal(user2), user2.tenantPrefix(), user2.tenantPrefix(), PatternType.PREFIXED)
            .execute();

        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);
    }

    /**
     * Checks that the logical cluster's admin user can produce and consume without any
     * ACLs having been added by the test.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testSuperUsers(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);

        // adminUser() is looked up once per role, exactly as many times as before.
        LogicalClusterUser producerUser = logicalCluster.adminUser();
        LogicalClusterUser consumerUser = logicalCluster.adminUser();
        testHarness.produceConsume(producerUser, consumerUser, "test.topic", "test.consumer.group", 0);
    }

    /**
     * Checks that ACLs added while a consumer is already connected take effect: the
     * consumer is initially not authorized for the topic, and becomes authorized once
     * DESCRIBE ACLs for the topic and the group have been added.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAclUpdate(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);
        final String topicName = "test.topic";
        final String groupName = "test.group";
        physicalCluster.kafkaCluster().createTopic(user1.withPrefix(topicName), 3, 1);

        try (KafkaConsumer<String, String> consumer =
                 testHarness.createConsumer(user1, groupName, SecurityProtocol.SASL_PLAINTEXT)) {
            Assertions.assertFalse(checkAuthorized(consumer, topicName));

            physicalCluster.newAclCommand()
                .addTopicAclArgs(prefixedKafkaPrincipal(user1), user1.withPrefix(topicName), AclOperation.DESCRIBE, PatternType.LITERAL)
                .execute();
            physicalCluster.newAclCommand()
                .addConsumerGroupAclArgs(prefixedKafkaPrincipal(user1), user1.withPrefix("test.group"), AclOperation.DESCRIBE, PatternType.LITERAL)
                .execute();

            // Poll until the new ACLs are visible to the already-open consumer.
            TestUtils.waitForCondition(() -> checkAuthorized(consumer, topicName), "ACL not applied within timeout");
        }
    }

    /**
     * Checks that ACLs granted to a user in one logical cluster do not apply to a user
     * with the same user id in a second logical cluster ("anotherCluster") until that
     * second user is given its own ACLs.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testLogicalClusterScope(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);

        addProducerAcls(user1, "test.topic", PatternType.LITERAL);
        addConsumerAcls(user1, "test.topic", "test.consumer.group", PatternType.LITERAL);
        testHarness.produceConsume(user1, user1, "test.topic", "test.consumer.group", 0);

        // Same user id, different logical cluster.
        int sharedUserId = user1.userMetadata.userId();
        LogicalCluster otherCluster = physicalCluster.createLogicalCluster("anotherCluster", 100, sharedUserId);
        LogicalClusterUser otherClusterUser = otherCluster.user(sharedUserId);

        verifyTopicAuthorizationFailure(otherClusterUser, "sometopic");
        verifyConsumerGroupAuthorizationFailure(otherClusterUser, "test.topic", "somegroup");

        addProducerAcls(otherClusterUser, "test.topic", PatternType.LITERAL);
        addConsumerAcls(otherClusterUser, "test.topic", "test.consumer.group", PatternType.LITERAL);
        testHarness.produceConsume(otherClusterUser, otherClusterUser, "test.topic", "test.consumer.group", 0);
    }

    /**
     * Same scenario as the literal ACL test, but the ACLs are added through the
     * {@code ...UsingAdminClient} helpers: produce/consume must work and an unrelated
     * topic and group must be rejected.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testLiteralAclsUsingAdminClient(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);

        addProducerAclsUsingAdminClient(user1, "test.topic", PatternType.LITERAL);
        addConsumerAclsUsingAdminClient(user2, "test.topic", "test.consumer.group", PatternType.LITERAL);
        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);

        verifyTopicAuthorizationFailure(user1, "sometopic");
        verifyConsumerGroupAuthorizationFailure(user1, "test.topic", "somegroup");
    }

    /**
     * Adds PREFIXED ACLs for the prefix "test." through the {@code ...UsingAdminClient}
     * helpers and checks that matching resources are usable while an unrelated topic
     * and group are rejected.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testPrefixAclsUsingAdminClient(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);

        addProducerAclsUsingAdminClient(user1, "test.", PatternType.PREFIXED);
        addConsumerAclsUsingAdminClient(user2, "test.", "test.", PatternType.PREFIXED);
        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);

        verifyTopicAuthorizationFailure(user1, "sometopic");
        verifyConsumerGroupAuthorizationFailure(user1, "test.topic", "somegroup");
    }

    /**
     * Adds LITERAL ACLs on the resource name "*" through the {@code ...UsingAdminClient}
     * helpers and checks that produce/consume on "test.topic" works.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testWildcardAclsUsingAdminClient(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);

        addProducerAclsUsingAdminClient(user1, "*", PatternType.LITERAL);
        addConsumerAclsUsingAdminClient(user2, "*", "*", PatternType.LITERAL);

        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);
    }

    /**
     * End-to-end create / describe / delete scenario across two tenants.
     *
     * <p>Steps, in order: add ACLs for tenantA (literal and prefixed) and tenantB
     * (wildcard) through tenant admin clients; check which consumers are authorized;
     * verify describe results through the super admin client and through each tenant's
     * admin client for ANY, LITERAL, PREFIXED and MATCH pattern types; finally delete
     * ACLs step by step and check that authorization is revoked only for the affected
     * consumer.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAclCreateDescribeDeleteUsingAdminClient(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);
        physicalCluster.newAclCommand().topicBrokerReadAclArgs(PhysicalCluster.BROKER_PRINCIPAL).execute();

        // --- clients and tenants ---
        AdminClient superAdmin = physicalCluster.superAdminClient();
        LogicalCluster tenantBCluster = physicalCluster.createLogicalCluster("tenantB", 100, 11);
        LogicalClusterUser tenantBUser = tenantBCluster.user(11);
        AdminClient tenantAAdmin = testHarness.createAdminClient(logicalCluster.adminUser());
        AdminClient tenantBAdmin = testHarness.createAdminClient(tenantBCluster.adminUser());
        ConsumerAcls tenantAAcls = new ConsumerAcls(tenantAAdmin, true);
        ConsumerAcls tenantBAcls = new ConsumerAcls(tenantBAdmin, true);

        // --- create ACLs ---
        tenantAAcls.addAcls(user1, "test1.topic", "test1.group", PatternType.LITERAL);
        tenantAAcls.addAcls(user2, "prefixed.test2", "prefixed.test2", PatternType.PREFIXED);
        tenantBAcls.addAcls(tenantBUser, "*", "*", PatternType.LITERAL);

        physicalCluster.kafkaCluster().createTopic("tenantA_test1.topic", 1, 1);
        physicalCluster.kafkaCluster().createTopic("tenantA_prefixed.test2.topic", 2, 1);
        physicalCluster.kafkaCluster().createTopic("tenantB_test1.topic", 1, 1);

        KafkaConsumer<String, String> user1Consumer =
            testHarness.createConsumer(user1, "test1.group", SecurityProtocol.SASL_PLAINTEXT);
        KafkaConsumer<String, String> user2Consumer =
            testHarness.createConsumer(user2, "prefixed.test2.group", SecurityProtocol.SASL_PLAINTEXT);
        KafkaConsumer<String, String> tenantBConsumer =
            testHarness.createConsumer(tenantBUser, "test1.group", SecurityProtocol.SASL_PLAINTEXT);

        // --- authorization as seen by the consumers ---
        Assertions.assertTrue(checkAuthorized(user1Consumer, "test1.topic"));
        Assertions.assertFalse(checkAuthorized(user2Consumer, "test1.topic"));
        Assertions.assertTrue(checkAuthorized(user2Consumer, "prefixed.test2.topic"));
        Assertions.assertFalse(checkAuthorized(user1Consumer, "prefixed.test2.topic"));
        Assertions.assertTrue(checkAuthorized(tenantBConsumer, "test1.topic"));

        // --- describe through the super admin client (tenant-prefixed names) ---
        ConsumerAcls clusterWideAcls = new ConsumerAcls(superAdmin, false);
        clusterWideAcls.verifyAllAcls(null, PatternType.ANY);
        clusterWideAcls.verifyAllAcls(null, PatternType.MATCH);
        clusterWideAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.LITERAL, null, "*", "tenantA_test1.topic");
        clusterWideAcls.verifyAcls(ResourceType.GROUP, null, PatternType.PREFIXED, null, "tenantA_prefixed.test2", "tenantB_");
        clusterWideAcls.verifyAcls(ResourceType.ANY, null, PatternType.LITERAL, null, "*", "tenantA_test1.topic", "tenantA_test1.group", "tenantA_kafka-cluster", "tenantB_kafka-cluster");
        clusterWideAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.ANY, null, "*", "tenantA_test1.topic", "tenantA_prefixed.test2", "tenantB_");

        // --- describe through tenantA's admin client ---
        tenantAAcls.verifyAllAcls(null, PatternType.ANY);
        tenantAAcls.verifyAllAcls(null, PatternType.MATCH);
        tenantAAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.LITERAL, null, "test1.topic");
        tenantAAcls.verifyAcls(ResourceType.GROUP, null, PatternType.PREFIXED, null, "prefixed.test2");
        tenantAAcls.verifyAcls(ResourceType.ANY, null, PatternType.LITERAL, null, "test1.topic", "test1.group", "kafka-cluster");
        tenantAAcls.verifyAcls(ResourceType.ANY, null, PatternType.LITERAL, user1, "test1.topic", "test1.group", "kafka-cluster");
        tenantAAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.ANY, null, "test1.topic", "prefixed.test2");

        // --- describe through tenantB's admin client ---
        tenantBAcls.verifyAllAcls(null, PatternType.ANY);
        tenantBAcls.verifyAcls(ResourceType.TOPIC, "*", PatternType.LITERAL, tenantBUser, "*");
        tenantBAcls.verifyAcls(ResourceType.TOPIC, "*", PatternType.ANY, tenantBUser, "*");
        tenantBAcls.verifyAcls(ResourceType.TOPIC, "*", PatternType.PREFIXED, tenantBUser, new String[0]);
        tenantBAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.LITERAL, tenantBUser, "*");
        tenantBAcls.verifyAcls(ResourceType.ANY, null, PatternType.LITERAL, null, "*", "kafka-cluster");
        tenantBAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.PREFIXED, tenantBUser, new String[0]);
        tenantBAcls.verifyAcls(ResourceType.ANY, "kafka-cluster", PatternType.LITERAL, tenantBUser, "kafka-cluster");

        // --- MATCH pattern filters ---
        clusterWideAcls.verifyAcls(ResourceType.ANY, "tenantA_prefixed.test2.topic", PatternType.MATCH, null, "tenantA_prefixed.test2", "*");
        clusterWideAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.MATCH, null, "tenantA_test1.topic", "tenantA_prefixed.test2", "tenantB_", "*");
        tenantAAcls.verifyAcls(ResourceType.ANY, "prefixed.test2.topic", PatternType.MATCH, null, "prefixed.test2");
        tenantAAcls.verifyAcls(ResourceType.ANY, "prefixed.test2.topic", PatternType.MATCH, user2, "prefixed.test2");
        tenantAAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.MATCH, user1, "test1.topic");
        tenantBAcls.verifyAcls(ResourceType.TOPIC, "test", PatternType.MATCH, tenantBUser, "*");
        tenantBAcls.verifyAcls(ResourceType.TOPIC, "*", PatternType.MATCH, tenantBUser, "*");
        tenantBAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.MATCH, null, "*");

        // --- delete: literal topic ACL of tenantA ---
        Assertions.assertTrue(checkAuthorized(user1Consumer, "test1.topic"));
        tenantAAcls.deleteAcls(ResourceType.TOPIC, "test1.topic", PatternType.LITERAL, null);
        TestUtils.retryOnExceptionWithTimeout(() -> {
            Assertions.assertFalse(checkAuthorized(user1Consumer, "test1.topic"));
            tenantAAcls.verifyAllAcls(null, PatternType.ANY);
            Assertions.assertTrue(checkAuthorized(user2Consumer, "prefixed.test2.topic"));
        });

        // --- delete: ACLs of user2 matching the prefixed topic ---
        tenantAAcls.deleteAcls(ResourceType.TOPIC, "prefixed.test2.topic", PatternType.MATCH, user2);
        TestUtils.retryOnExceptionWithTimeout(() -> {
            Assertions.assertFalse(checkAuthorized(user2Consumer, "prefixed.test2.topic"));
            tenantAAcls.verifyAllAcls(null, PatternType.ANY);
            Assertions.assertTrue(checkAuthorized(tenantBConsumer, "test1.topic"));
        });

        // --- delete: all topic ACLs of tenantB's user ---
        tenantBAcls.deleteAcls(ResourceType.TOPIC, null, PatternType.MATCH, tenantBUser);
        TestUtils.retryOnExceptionWithTimeout(() -> {
            Assertions.assertFalse(checkAuthorized(tenantBConsumer, "test1.topic"));
            tenantBAcls.verifyAllAcls(null, PatternType.MATCH);
        });
    }

    /**
     * Fills tenantA up to the limit of 100 ACLs, checks that one more ACL is rejected
     * with the expected message, and checks that a second logical cluster can still
     * create ACLs and that produce/consume for tenantA keeps working.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAclLimit(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);
        addProducerAcls(user1, "test.topic", PatternType.LITERAL);
        addConsumerAcls(user2, "test.topic", "test.consumer.group", PatternType.LITERAL);

        // Builds an ALLOW WRITE binding for user1 on the given literal topic name.
        Function<String, AclBinding> writeAclFor = name -> new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, name, PatternType.LITERAL),
            new AccessControlEntry(unprefixedKafkaPrincipal(user1), "*", AclOperation.WRITE, AclPermissionType.ALLOW));

        try (AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster.adminUser())) {
            TestUtils.waitForCondition(() -> getAclCount(tenantAdmin) == 6, "ACLs are not fully created");
            int existing = getAclCount(tenantAdmin);
            int limit = Integer.parseInt("100");
            verifyAclCountPerTenantMetric(6, limit, "tenantA");

            // Create exactly as many ACLs as are still allowed.
            int remaining = limit - existing;
            for (int n = 0; n < remaining; n++) {
                tenantAdmin.createAcls(Collections.singleton(writeAclFor.apply("topic" + n))).all().get();
            }

            InvalidRequestException limitError = TestUtils.assertFutureThrows(
                InvalidRequestException.class,
                tenantAdmin.createAcls(Collections.singleton(writeAclFor.apply("othertopic"))).all());
            Assertions.assertEquals("ACLs not created since it will exceed the limit 100", limitError.getMessage());
        }

        LogicalCluster otherCluster = physicalCluster.createLogicalCluster("anotherCluster", 100, new Integer[0]);
        try (AdminClient otherAdmin = testHarness.createAdminClient(otherCluster.adminUser())) {
            otherAdmin.createAcls(Collections.singleton(writeAclFor.apply("sometopic"))).all().get();
        }

        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);
    }

    /**
     * Counts the ACL bindings returned by {@code describeAcls} for a filter that
     * matches any resource and any access control entry.
     *
     * @param adminClient client used to describe the ACLs
     * @return number of bindings in the describe result
     * @throws InterruptedException if waiting for the result is interrupted
     * @throws ExecutionException if the describe request fails
     */
    private static int getAclCount(AdminClient adminClient) throws InterruptedException, ExecutionException {
        ResourcePatternFilter anyResource = new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY);
        AccessControlEntryFilter anyEntry =
            new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY);
        Collection<AclBinding> bindings =
            adminClient.describeAcls(new AclBindingFilter(anyResource, anyEntry)).values().get();
        return bindings.size();
    }

    /**
     * Checks that the per-tenant ACL limit can be raised at runtime through a client
     * quota alteration ("acl_count" on the "confluent-tenant" entity).
     *
     * <p>The test fills the tenant up to the configured limit, verifies that one more
     * ACL is rejected, raises the limit, and then verifies that one more ACL can be
     * created and deleted again while the per-tenant metric follows along.
     *
     * <p>Review notes on this version:
     * <ul>
     *   <li>The initial ACL count is only read after the producer/consumer ACLs are
     *       fully visible, matching {@code testAclLimit}; otherwise the fill loop could
     *       be sized from a stale count.</li>
     *   <li>{@code createAcls}/{@code deleteAcls} futures are awaited so a failure is
     *       reported (or retried) with its real cause instead of being dropped.</li>
     *   <li>{@code assertEquals} receives the expected value first, so failure messages
     *       are not inverted.</li>
     * </ul>
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAclLimitDynamicallyUpdated(boolean useExternalAuthStore) throws Throwable {
        this.startTestHarness(useExternalAuthStore);
        this.addProducerAcls(this.user1, this.topic, PatternType.LITERAL);
        this.addConsumerAcls(this.user2, this.topic, this.consumerGroup, PatternType.LITERAL);

        // Builds an ALLOW WRITE binding for user1 on the given literal topic name.
        Function<String, AclBinding> topicAcl = name -> new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, name, PatternType.LITERAL),
            new AccessControlEntry(this.unprefixedKafkaPrincipal(this.user1), "*", AclOperation.WRITE, AclPermissionType.ALLOW));
        String tenant = this.logicalCluster.logicalClusterId();

        try (AdminClient adminClient = this.testHarness.createAdminClient(this.logicalCluster.adminUser())) {
            // Same guard as testAclLimit: do not size the fill loop from a partial ACL set.
            TestUtils.waitForCondition(
                () -> MultiTenantAuthorizerTest.getAclCount(adminClient) == this.aclsAddedByProducerConsumer,
                "ACLs are not fully created");
            int aclCount = MultiTenantAuthorizerTest.getAclCount(adminClient);
            int maxAcls = Integer.parseInt(this.maxAclsPerTenant);
            for (int i = 0; i < maxAcls - aclCount; ++i) {
                adminClient.createAcls(Collections.singleton(topicAcl.apply("topic" + i))).all().get();
            }
            TestUtils.waitForCondition(
                () -> MultiTenantAuthorizerTest.getAclCount(adminClient) == maxAcls,
                "ACLs are not fully created");

            // At the limit: one more ACL must be rejected.
            TestUtils.assertFutureThrows(
                InvalidRequestException.class,
                adminClient.createAcls(Collections.singleton(topicAcl.apply("othertopic"))).all());
            this.verifyAclCountPerTenantMetric(maxAcls, maxAcls, this.tenantA);

            // Raise the tenant's limit dynamically.
            double newAclsLimit = 100000.0;
            String topicName = "topic100";
            this.physicalCluster.superConfluentAdmin().alterClientQuotas(
                Collections.singletonList(new ClientQuotaAlteration(
                    new ClientQuotaEntity(Collections.singletonMap("confluent-tenant", tenant)),
                    Collections.singletonList(new ClientQuotaAlteration.Op("acl_count", newAclsLimit))))).all().get();

            // Retried because the new limit may not be applied yet; a rejected create now
            // fails the attempt via get() and is retried like an assertion failure.
            TestUtils.retryOnExceptionWithTimeout(() -> {
                adminClient.createAcls(Collections.singleton(topicAcl.apply(topicName))).all().get();
                Assertions.assertEquals(maxAcls + 1, MultiTenantAuthorizerTest.getAclCount(adminClient));
                this.verifyAclCountPerTenantMetric(maxAcls + 1, (int) newAclsLimit, this.tenantA);
            });

            adminClient.deleteAcls(Collections.singleton(new AclBindingFilter(
                new ResourcePatternFilter(ResourceType.TOPIC, topicName, PatternType.LITERAL),
                new AccessControlEntryFilter(this.unprefixedKafkaPrincipal(this.user1), "*", AclOperation.WRITE, AclPermissionType.ALLOW)))).all().get();
            TestUtils.waitForCondition(
                () -> MultiTenantAuthorizerTest.getAclCount(adminClient) == maxAcls,
                "ACLs are not deleted yet");
            this.verifyAclCountPerTenantMetric(maxAcls, (int) newAclsLimit, this.tenantA);
        }
    }

    /**
     * Checks that the super admin client can create, in one request, a topic ACL and a
     * consumer group ACL that both belong to user1 (tenant-prefixed principal and
     * resource names): two results are returned and both futures complete normally.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAclMultiTenantValidation(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);
        AdminClient superAdmin = physicalCluster.superAdminClient();

        AclBinding topicBinding = new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, user1.withPrefix("test.topic"), PatternType.LITERAL),
            new AccessControlEntry(prefixedKafkaPrincipal(user1).toString(), "*", AclOperation.READ, AclPermissionType.ALLOW));
        AclBinding groupBinding = new AclBinding(
            new ResourcePattern(ResourceType.GROUP, user1.withPrefix("test.consumer.group"), PatternType.LITERAL),
            new AccessControlEntry(prefixedKafkaPrincipal(user1).toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW));

        Map<AclBinding, KafkaFuture<Void>> results =
            superAdmin.createAcls(Arrays.asList(topicBinding, groupBinding)).values();
        Assertions.assertEquals(2, results.size());

        for (KafkaFuture<Void> outcome : results.values()) {
            // get() rethrows any failure before the state checks below.
            outcome.get();
            Assertions.assertTrue(outcome.isDone());
            Assertions.assertFalse(outcome.isCompletedExceptionally());
        }
    }

    /**
     * Checks that a single create request mixing ACLs of two different tenants
     * (tenantA's user1 and a user of "tenantB") is rejected with an
     * {@link InvalidRequestException} whose message reports differing scopes.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAclMultiTenantValidationFailureMultipleTenants(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);
        AdminClient superAdmin = physicalCluster.superAdminClient();

        AclBinding tenantABinding = new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, user1.withPrefix("test.topic"), PatternType.LITERAL),
            new AccessControlEntry(prefixedKafkaPrincipal(user1).toString(), "*", AclOperation.READ, AclPermissionType.ALLOW));

        LogicalCluster tenantBCluster = physicalCluster.createLogicalCluster("tenantB", "testOrg", "testEnv", 100, 1, 2);
        LogicalClusterUser tenantBUser = tenantBCluster.user(1);
        AclBinding tenantBBinding = new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, tenantBUser.withPrefix("test.topic"), PatternType.LITERAL),
            new AccessControlEntry(prefixedKafkaPrincipal(tenantBUser).toString(), "*", AclOperation.READ, AclPermissionType.ALLOW));

        InvalidRequestException rejection = TestUtils.assertFutureThrows(
            InvalidRequestException.class,
            superAdmin.createAcls(Arrays.asList(tenantABinding, tenantBBinding)).all());
        String message = rejection.getMessage();
        Assertions.assertTrue(message.startsWith("Internal error: Could not create ACLs because all principals are not in the same scope"));
    }

    /**
     * Checks that a single create request mixing a tenant ACL (user1, prefixed names)
     * with an ACL for the plain principal {@code User:Bob} on an unprefixed topic is
     * rejected with an {@link InvalidRequestException} reporting differing scopes.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAclMultiTenantValidationFailureMultiTenantsAndNonMultiTenants(boolean useExternalAuthStore) throws Throwable {
        startTestHarness(useExternalAuthStore);
        AdminClient superAdmin = physicalCluster.superAdminClient();

        AclBinding tenantBinding = new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, user1.withPrefix("test.topic"), PatternType.LITERAL),
            new AccessControlEntry(prefixedKafkaPrincipal(user1).toString(), "*", AclOperation.READ, AclPermissionType.ALLOW));
        AclBinding plainBinding = new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, "test.topic", PatternType.LITERAL),
            new AccessControlEntry(new KafkaPrincipal("User", "Bob").toString(), "*", AclOperation.READ, AclPermissionType.ALLOW));

        // The non-tenant binding is deliberately listed first, as in the request under test.
        InvalidRequestException rejection = TestUtils.assertFutureThrows(
            InvalidRequestException.class,
            superAdmin.createAcls(Arrays.asList(plainBinding, tenantBinding)).all());
        String message = rejection.getMessage();
        Assertions.assertTrue(message.startsWith("Internal error: Could not create ACLs because all principals are not in the same scope"));
    }

    /**
     * Starts the cluster with "confluent.max.acls.per.tenant" set to 0 and exercises
     * create, describe and delete of ACLs through the tenant admin client. Whenever one
     * of those calls fails with an {@link ExecutionException}, the cause must be an
     * {@code InvalidRequestException} and the per-tenant metric must report 0 of 0.
     * Finally user1 and user2 must be able to produce and consume although no ACLs
     * were added.
     *
     * <p>NOTE(review): {@code useExternalAuthStore} is not used here; the overrides are
     * always built with {@code nodeConfigOverrides(false)}, so both parameterizations
     * run the same configuration. Confirm whether that is intentional.
     *
     * <p>NOTE(review): an ACL call that succeeds is not treated as a failure (there is
     * no {@code fail()} after the calls). Confirm whether the exceptions are meant to
     * be mandatory.
     *
     * @param useExternalAuthStore currently unused, see note above
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAuthorizerDisabledUsingAclLimit(boolean useExternalAuthStore) throws Throwable {
        final int maxAclsPerTenantZero = 0;
        Properties overrides = nodeConfigOverrides(false);
        overrides.put("confluent.max.acls.per.tenant", String.valueOf(maxAclsPerTenantZero));
        startTestHarness(overrides);

        AclBinding writeBinding = new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, "test.topic", PatternType.LITERAL),
            new AccessControlEntry(unprefixedKafkaPrincipal(user1), "*", AclOperation.WRITE, AclPermissionType.ALLOW));
        AclBindingFilter matchAll = new AclBindingFilter(
            new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY),
            new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));

        try (AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster.adminUser())) {
            // create
            try {
                tenantAdmin.createAcls(Collections.singleton(writeBinding)).all().get();
            } catch (ExecutionException createFailure) {
                verifyAclsDisabledException(createFailure);
                verifyAclCountPerTenantMetric(0, 0, "tenantA");
            }
            // describe
            try {
                tenantAdmin.describeAcls(matchAll).values().get();
            } catch (ExecutionException describeFailure) {
                verifyAclsDisabledException(describeFailure);
                verifyAclCountPerTenantMetric(0, 0, "tenantA");
            }
            // delete
            try {
                tenantAdmin.deleteAcls(Collections.singleton(matchAll)).all().get();
            } catch (ExecutionException deleteFailure) {
                verifyAclsDisabledException(deleteFailure);
                verifyAclCountPerTenantMetric(0, 0, "tenantA");
            }
        }

        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);
    }

    /**
     * Asserts that the cause of the given exception is an
     * {@code InvalidRequestException}; the failure message includes the actual cause.
     *
     * @param e exception thrown while waiting for an ACL admin request
     */
    private void verifyAclsDisabledException(ExecutionException e) {
        Throwable cause = e.getCause();
        boolean isInvalidRequest = cause instanceof InvalidRequestException;
        Assertions.assertTrue(isInvalidRequest, "Unexpected exception: " + cause);
    }

    /**
     * Checks that ACLs with an invalid principal (empty, or without a principal type)
     * and ACLs with an invalid resource name are rejected with an
     * {@code InvalidRequestException}; for the invalid resource name the message
     * prefix is checked as well.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness(boolean)}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testInvalidAcl(boolean useExternalAuthStore) {
        startTestHarness(useExternalAuthStore);
        try (AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster.adminUser())) {
            List<String> invalidPrincipals = Arrays.asList("", "userWithoutPrincipalType");
            for (String principal : invalidPrincipals) {
                AclBinding binding = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "test.topic", PatternType.LITERAL),
                    new AccessControlEntry(principal, "*", AclOperation.WRITE, AclPermissionType.ALLOW));
                try {
                    tenantAdmin.createAcls(Collections.singleton(binding)).all().get();
                    // fail() throws an Error, so the catch below does not swallow it.
                    Assertions.fail("createAcls didn't fail with invalid principal");
                } catch (Exception failure) {
                    boolean rejectedAsInvalid =
                        failure instanceof ExecutionException && failure.getCause() instanceof InvalidRequestException;
                    Assertions.assertTrue(rejectedAsInvalid, "Invalid exception: " + failure);
                }
            }

            AclBinding badResourceBinding = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "invalidTopic$!", PatternType.LITERAL),
                new AccessControlEntry(unprefixedKafkaPrincipal(user1), "*", AclOperation.WRITE, AclPermissionType.ALLOW));
            InvalidRequestException rejection = TestUtils.assertFutureThrows(
                InvalidRequestException.class,
                tenantAdmin.createAcls(Collections.singleton(badResourceBinding)).all());
            String message = rejection.getMessage();
            Assertions.assertTrue(
                message.startsWith("Internal error: Could not create ACLs because following resource names are invalid :"),
                "Unexpected error message: " + message);
        }
    }

    /**
     * Checks which error an idempotent producer for {@code user1} receives on {@code test.topic}
     * as topic ACLs are added one step at a time.
     *
     * <p>The admin client used for polling ACL propagation is opened in a try-with-resources
     * block so that it is closed whether the test passes or fails.
     *
     * @param useExternalAuthStore forwarded to {@code startTestHarness}
     * @throws Throwable if any step of the scenario fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testAuthorizeByResourceType(boolean useExternalAuthStore) throws Throwable {
        this.startTestHarness(useExternalAuthStore);
        this.physicalCluster.kafkaCluster().createTopic(this.user1.withPrefix("test.topic"), 1, 1);
        try (AdminClient adminClient = this.testHarness.createAdminClient(this.user1)) {
            // Step 1: literal DESCRIBE on the topic only; the producer is expected to get a cluster authorization error.
            this.physicalCluster.newAclCommand().addTopicAclArgs(this.prefixedKafkaPrincipal(this.user1), this.user1.withPrefix("test.topic"), AclOperation.DESCRIBE, PatternType.LITERAL).execute();
            TestUtils.waitForCondition(() -> this.checkDescribeAuthorized(adminClient, "test.topic"), "ACL not applied within timeout");
            this.verifyIdempotentProducer(this.user1, "test.topic", ClusterAuthorizationException.class);

            // Step 2: prefixed ALLOW ALL followed by a prefixed DENY ALL on the same prefix; same expected error.
            this.physicalCluster.newAclCommand().addTopicAclArgs(this.prefixedKafkaPrincipal(this.user1), this.user1.withPrefix("prefix"), AclOperation.ALL, PatternType.PREFIXED).execute();
            TestUtils.waitForCondition(() -> this.checkDescribeAuthorized(adminClient, "prefix.topic1"), "ACL not applied within timeout");
            this.physicalCluster.newAclCommand().addTopicDenyAclArgs(this.prefixedKafkaPrincipal(this.user1), this.user1.withPrefix("prefix"), AclOperation.ALL, PatternType.PREFIXED).execute();
            TestUtils.waitForCondition(() -> !this.checkDescribeAuthorized(adminClient, "prefix.topic2"), "ACL not applied within timeout");
            this.verifyIdempotentProducer(this.user1, "test.topic", ClusterAuthorizationException.class);

            // Step 3: an ACL granted to user2 over user2's tenant prefix; user1 still expects the same error.
            this.physicalCluster.newAclCommand().addTopicAclArgs(this.prefixedKafkaPrincipal(this.user2), this.user2.tenantPrefix(), AclOperation.ALL, PatternType.PREFIXED).execute();
            this.verifyIdempotentProducer(this.user1, "test.topic", ClusterAuthorizationException.class);

            // Step 4: prefixed ALLOW ALL on a different prefix; the expected error becomes a topic authorization error.
            this.physicalCluster.newAclCommand().addTopicAclArgs(this.prefixedKafkaPrincipal(this.user1), this.user1.withPrefix("another"), AclOperation.ALL, PatternType.PREFIXED).execute();
            TestUtils.waitForCondition(() -> this.checkDescribeAuthorized(adminClient, "another.topic"), "ACL not applied within timeout");
            this.verifyIdempotentProducer(this.user1, "test.topic", TopicAuthorizationException.class);

            // Step 5: literal WRITE on the topic itself; the producer is expected to succeed.
            this.physicalCluster.newAclCommand().addTopicAclArgs(this.prefixedKafkaPrincipal(this.user1), this.user1.withPrefix("test.topic"), AclOperation.WRITE, PatternType.LITERAL).execute();
            this.verifyIdempotentProducer(this.user1, "test.topic", null);
        }
    }

    /**
     * Builds the broker configuration overrides used by this test.
     *
     * <p>Always sets the authorizer class, the per-tenant ACL limit, a fresh temporary log
     * directory and disables {@code confluent.broker.type.topic.enabled}. When
     * {@code useExternalAuthStore} is true, the two {@code confluent.metadata.rbac_auth.*}
     * controller flags are additionally set to {@code "true"}.
     *
     * @param useExternalAuthStore whether to add the external auth store flags
     * @return a new {@link Properties} instance holding the overrides
     */
    protected Properties nodeConfigOverrides(boolean useExternalAuthStore) {
        Properties overrides = new Properties();
        overrides.setProperty("authorizer.class.name", MultiTenantAuthorizer.class.getName());
        overrides.setProperty("confluent.max.acls.per.tenant", "100");
        overrides.setProperty("log.dir", TestUtils.tempDirectory().getAbsolutePath());
        overrides.setProperty("confluent.broker.type.topic.enabled", "false");
        if (!useExternalAuthStore) {
            return overrides;
        }
        for (String controllerFlag : Arrays.asList(
                "confluent.metadata.rbac_auth.read.controller.enable",
                "confluent.metadata.rbac_auth.update.controller.enable")) {
            overrides.setProperty(controllerFlag, "true");
        }
        return overrides;
    }

    /**
     * Adds producer ACLs for {@code user} by running the physical cluster's ACL command.
     *
     * <p>The command receives the user's prefixed Kafka principal and the topic name after it
     * has been passed through {@code user.withPrefix}.
     *
     * @param user        the logical cluster user the ACLs are granted to
     * @param topic       the topic (or topic prefix) name before {@code withPrefix} is applied
     * @param patternType the resource pattern type handed to {@code produceAclArgs}
     */
    protected void addProducerAcls(LogicalClusterUser user, String topic, PatternType patternType) {
        this.physicalCluster.newAclCommand().produceAclArgs(this.prefixedKafkaPrincipal(user), user.withPrefix(topic), patternType).execute();
    }

    /**
     * Adds consumer ACLs for {@code user} by running the physical cluster's ACL command.
     *
     * <p>Both the topic and the consumer group names are passed through {@code user.withPrefix}
     * before being handed to {@code consumeAclArgs}, together with the user's prefixed principal.
     *
     * @param user          the logical cluster user the ACLs are granted to
     * @param topic         the topic name before {@code withPrefix} is applied
     * @param consumerGroup the consumer group name before {@code withPrefix} is applied
     * @param patternType   the resource pattern type handed to {@code consumeAclArgs}
     */
    protected void addConsumerAcls(LogicalClusterUser user, String topic, String consumerGroup, PatternType patternType) {
        this.physicalCluster.newAclCommand().consumeAclArgs(this.prefixedKafkaPrincipal(user), user.withPrefix(topic), user.withPrefix(consumerGroup), patternType).execute();
    }

    /**
     * Adds a topic deny ACL for {@link AclOperation#ALL} for {@code user} by running the
     * physical cluster's ACL command.
     *
     * @param user        the logical cluster user the deny ACL applies to
     * @param topic       the topic (or topic prefix) name before {@code user.withPrefix} is applied
     * @param patternType the resource pattern type handed to {@code addTopicDenyAclArgs}
     */
    protected void addTopicDenyAcls(LogicalClusterUser user, String topic, PatternType patternType) {
        this.physicalCluster.newAclCommand().addTopicDenyAclArgs(this.prefixedKafkaPrincipal(user), user.withPrefix(topic), AclOperation.ALL, patternType).execute();
    }

    /**
     * Creates producer ACLs for {@code user} through an {@link AdminClient} opened for the
     * logical cluster's admin user; the client is closed when the call returns.
     *
     * @param user        the user the ACLs are granted to
     * @param topic       the topic name used as-is in the ACL resource pattern
     * @param patternType the resource pattern type
     * @throws Exception if creating the ACLs fails
     */
    private void addProducerAclsUsingAdminClient(LogicalClusterUser user, String topic, PatternType patternType) throws Exception {
        try (AdminClient adminClient = this.testHarness.createAdminClient(this.logicalCluster.adminUser());){
            this.addProducerAcls(adminClient, user, topic, patternType);
        }
    }

    /**
     * Creates the two ACL bindings a producer needs and waits for the request to complete:
     * ALLOW WRITE on the given topic pattern and ALLOW DESCRIBE on the {@code kafka-cluster}
     * resource, both for the user's unprefixed principal and any host ({@code "*"}).
     *
     * @param adminClient the client used to submit the ACLs
     * @param user        the user the ACLs are granted to
     * @param topic       the topic name used in the topic resource pattern
     * @param patternType the pattern type applied to both resource patterns
     * @throws Exception if the create request fails
     */
    private void addProducerAcls(AdminClient adminClient, LogicalClusterUser user, String topic, PatternType patternType) throws Exception {
        ResourcePattern topicPattern = new ResourcePattern(ResourceType.TOPIC, topic, patternType);
        AccessControlEntry allowWrite = new AccessControlEntry(this.unprefixedKafkaPrincipal(user), "*", AclOperation.WRITE, AclPermissionType.ALLOW);
        AclBinding writeBinding = new AclBinding(topicPattern, allowWrite);

        ResourcePattern clusterPattern = new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", patternType);
        AccessControlEntry allowDescribe = new AccessControlEntry(this.unprefixedKafkaPrincipal(user), "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW);
        AclBinding describeBinding = new AclBinding(clusterPattern, allowDescribe);

        List<AclBinding> producerBindings = Arrays.asList(writeBinding, describeBinding);
        adminClient.createAcls(producerBindings).all().get();
    }

    /**
     * Creates consumer ACLs for {@code user} through an {@link AdminClient} opened for the
     * logical cluster's admin user; the client is closed when the call returns.
     *
     * @param user          the user the ACLs are granted to
     * @param topic         the topic name used as-is in the ACL resource pattern
     * @param consumerGroup the consumer group name used as-is in the ACL resource pattern
     * @param patternType   the resource pattern type
     * @throws Exception if creating the ACLs fails
     */
    private void addConsumerAclsUsingAdminClient(LogicalClusterUser user, String topic, String consumerGroup, PatternType patternType) throws Exception {
        try (AdminClient adminClient = this.testHarness.createAdminClient(this.logicalCluster.adminUser());){
            this.addConsumerAcls(adminClient, user, topic, consumerGroup, patternType);
        }
    }

    /**
     * Submits the bindings produced by {@code consumerAcls} through {@code adminClient} and
     * blocks until the create request has completed.
     *
     * @param adminClient   the client used to submit the ACLs
     * @param user          the user the ACLs are granted to
     * @param topic         the topic name for the topic binding
     * @param consumerGroup the group name for the group binding
     * @param patternType   the pattern type applied to every binding
     * @throws Exception if the create request fails
     */
    private void addConsumerAcls(AdminClient adminClient, LogicalClusterUser user, String topic, String consumerGroup, PatternType patternType) throws Exception {
        adminClient.createAcls(this.consumerAcls(user, topic, consumerGroup, patternType)).all().get();
    }

    /**
     * Builds the three ACL bindings used for a consumer, all for the user's unprefixed
     * principal and any host ({@code "*"}): ALLOW READ on the topic, ALLOW ALL on the consumer
     * group and ALLOW DESCRIBE on the {@code kafka-cluster} resource.
     *
     * @param user          the user the bindings are for
     * @param topic         the topic resource name
     * @param consumerGroup the group resource name
     * @param patternType   the pattern type applied to all three resource patterns
     * @return the topic, group and cluster bindings, in that order
     */
    private List<AclBinding> consumerAcls(LogicalClusterUser user, String topic, String consumerGroup, PatternType patternType) {
        return Arrays.asList(
            new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, topic, patternType),
                new AccessControlEntry(this.unprefixedKafkaPrincipal(user), "*", AclOperation.READ, AclPermissionType.ALLOW)),
            new AclBinding(
                new ResourcePattern(ResourceType.GROUP, consumerGroup, patternType),
                new AccessControlEntry(this.unprefixedKafkaPrincipal(user), "*", AclOperation.ALL, AclPermissionType.ALLOW)),
            new AclBinding(
                new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", patternType),
                new AccessControlEntry(this.unprefixedKafkaPrincipal(user), "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
    }

    /**
     * Returns the prefixed Kafka principal of {@code user}, as reported by
     * {@code user.prefixedKafkaPrincipal()}.
     *
     * @param user the logical cluster user
     * @return the user's prefixed principal
     */
    protected KafkaPrincipal prefixedKafkaPrincipal(LogicalClusterUser user) {
        return user.prefixedKafkaPrincipal();
    }

    /**
     * Returns the string form ({@code toString()}) of the user's unprefixed Kafka principal.
     *
     * @param user the logical cluster user
     * @return the unprefixed principal rendered as a string
     */
    protected String unprefixedKafkaPrincipal(LogicalClusterUser user) {
        return user.unprefixedKafkaPrincipal().toString();
    }

    /**
     * Asserts that fetching partition metadata for {@code topic} with a producer created for
     * {@code user} is rejected with an {@link AuthorizationException}.
     *
     * <p>Any other exception propagates to the caller; a successful lookup fails the test.
     *
     * @param user  the user whose producer is used
     * @param topic the topic passed to {@code partitionsFor}
     */
    protected void verifyTopicAuthorizationFailure(LogicalClusterUser user, String topic) {
        try (KafkaProducer<String, String> deniedProducer = this.testHarness.createProducer(user, SecurityProtocol.SASL_PLAINTEXT)) {
            boolean rejected;
            try {
                deniedProducer.partitionsFor(topic);
                rejected = false;
            }
            catch (AuthorizationException expected) {
                // This is the outcome the test is looking for.
                rejected = true;
            }
            if (!rejected) {
                Assertions.fail("Authorization should have failed");
            }
        }
    }

    /**
     * Asserts that subscribing to {@code topic} and polling for five seconds with a consumer
     * created for {@code user} and {@code group} ends in an {@link AuthorizationException}.
     *
     * <p>If the poll returns normally the test fails; other exception types propagate.
     *
     * @param user  the user whose consumer is used
     * @param topic the topic to subscribe to
     * @param group the consumer group passed to {@code createConsumer}
     */
    protected void verifyConsumerGroupAuthorizationFailure(LogicalClusterUser user, String topic, String group) {
        Set<String> subscription = Collections.singleton(topic);
        Duration pollTimeout = Duration.ofSeconds(5L);
        try (KafkaConsumer<String, String> deniedConsumer = this.testHarness.createConsumer(user, group, SecurityProtocol.SASL_PLAINTEXT)) {
            deniedConsumer.subscribe(subscription);
            deniedConsumer.poll(pollTimeout);
            Assertions.fail("Authorization should have failed");
        }
        catch (AuthorizationException expected) {
            // This is the outcome the test is looking for.
        }
    }

    /**
     * Reports whether {@code consumer} can look up partitions for {@code topic} without an
     * {@link AuthorizationException}.
     *
     * @param consumer the consumer used for the lookup
     * @param topic    the topic passed to {@code partitionsFor}
     * @return {@code true} if the lookup completed, {@code false} if it was rejected with an
     *         authorization error
     */
    private boolean checkAuthorized(KafkaConsumer<?, ?> consumer, String topic) {
        boolean authorized;
        try {
            // The size itself is irrelevant; the call only has to complete.
            consumer.partitionsFor(topic).size();
            authorized = true;
        }
        catch (AuthorizationException denied) {
            authorized = false;
        }
        return authorized;
    }

    /**
     * Single-topic convenience overload: wraps {@code topic} in a one-element list and
     * delegates to the collection-based {@code checkDescribeAuthorized}.
     *
     * @param adminClient the client used to describe the topic
     * @param topic       the topic to describe
     * @return the result of the collection-based overload
     */
    protected boolean checkDescribeAuthorized(AdminClient adminClient, String topic) {
        List<String> singleTopic = Collections.singletonList(topic);
        return this.checkDescribeAuthorized(adminClient, singleTopic);
    }

    /**
     * Describes {@code topics} and classifies the outcome.
     *
     * <p>Returns {@code true} when the describe completes or fails with
     * {@link UnknownTopicOrPartitionException}, and {@code false} when it fails with an
     * {@link AuthorizationException}. Anything else is rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param adminClient the client used to describe the topics
     * @param topics      the topic names to describe
     * @return whether the describe was not rejected for authorization reasons
     */
    protected boolean checkDescribeAuthorized(AdminClient adminClient, Collection<String> topics) {
        try {
            this.describeTopicsUnwrapped(adminClient, topics);
            return true;
        }
        catch (AuthorizationException denied) {
            return false;
        }
        catch (UnknownTopicOrPartitionException missingTopic) {
            return true;
        }
        catch (Throwable unexpected) {
            throw new RuntimeException(unexpected);
        }
    }

    /**
     * Waits up to 15 seconds for the describe of {@code topics} and, if the future fails with
     * an {@link ExecutionException}, rethrows that exception's cause instead.
     */
    private void describeTopicsUnwrapped(AdminClient adminClient, Collection<String> topics) throws Throwable {
        try {
            adminClient.describeTopics(topics).allTopicNames().get(15L, TimeUnit.SECONDS);
        }
        catch (ExecutionException wrapped) {
            throw wrapped.getCause();
        }
    }

    /**
     * Retries {@code verifyIdempotentProducer(user, topic, null)} until it completes without
     * throwing an {@link Exception}, for at most {@code retryTimes} attempts.
     *
     * <p>Attempts are separated by a 500 ms pause. When every attempt fails (or
     * {@code retryTimes} is not positive) the test fails, and the exception from the last
     * attempt is attached as the cause so the reason for the failure is visible in the report.
     * Errors such as assertion failures are not retried and propagate immediately.
     *
     * @param user       the user the producer is created for
     * @param topic      the topic records are sent to
     * @param retryTimes the maximum number of attempts
     * @throws Throwable if an attempt throws a non-{@link Exception} throwable, the sleep is
     *                   interrupted, or all attempts are exhausted
     */
    protected void verifySuccessfulIdempotentProducerWithRetry(LogicalClusterUser user, String topic, int retryTimes) throws Throwable {
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= retryTimes; ++attempt) {
            try {
                this.verifyIdempotentProducer(user, topic, null);
                return;
            }
            catch (Exception e) {
                lastFailure = e;
            }
            // No point in waiting once there is no further attempt to wait for.
            if (attempt < retryTimes) {
                Thread.sleep(500L);
            }
        }
        Assertions.fail("retry time exceeded, could not verify idempotent producer", lastFailure);
    }

    /**
     * Sends one batch of records to {@code topic} with an idempotent producer created for
     * {@code user} and checks the outcome.
     *
     * <p>When {@code exceptionClass} is {@code null} the send is simply executed. Otherwise the
     * send must throw a {@link KafkaException}; if that exception is itself an
     * {@link AuthorizationException} it must be an instance of {@code exceptionClass}, else its
     * cause must be.
     *
     * @param user           the user whose SASL JAAS configuration is used
     * @param topic          the topic records are sent to
     * @param exceptionClass the expected authorization failure, or {@code null} for success
     * @throws Throwable if sending fails when success was expected
     */
    protected void verifyIdempotentProducer(LogicalClusterUser user, String topic, Class<? extends AuthorizationException> exceptionClass) throws Throwable {
        Properties idempotentProps = KafkaTestUtils.producerProps(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), user.saslJaasConfig());
        idempotentProps.setProperty("enable.idempotence", "true");
        try (KafkaProducer<String, String> idempotentProducer = new KafkaProducer<>(idempotentProps)) {
            if (exceptionClass == null) {
                KafkaTestUtils.sendRecords(idempotentProducer, topic, 0, 1);
                return;
            }
            KafkaException thrown = Assertions.assertThrows(KafkaException.class, () -> KafkaTestUtils.sendRecords(idempotentProducer, topic, 0, 1));
            // The authorization failure is either the exception itself or its direct cause.
            Throwable candidate = thrown instanceof AuthorizationException ? thrown : thrown.getCause();
            Assertions.assertTrue(exceptionClass.isInstance(candidate));
        }
    }

    /**
     * Asserts that at least one metric named {@code AclCount} is registered in the default
     * Yammer registry and that the gauge values of all such metrics add up to
     * {@code expectedValue}.
     *
     * @param expectedValue the expected sum of the {@code AclCount} gauges
     */
    public static void verifyAclCountMetric(int expectedValue) {
        Set<Object> aclCountGauges = new HashSet<>();
        for (Map.Entry<MetricName, ?> registered : KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet()) {
            if (registered.getKey().getName().equals("AclCount")) {
                aclCountGauges.add(registered.getValue());
            }
        }
        Assertions.assertFalse(aclCountGauges.isEmpty());

        long total = 0L;
        for (Object gauge : aclCountGauges) {
            total += ((Number) ((Gauge<?>) gauge).value()).longValue();
        }
        Assertions.assertEquals((long) expectedValue, total);
    }

    /**
     * Checks the {@code created-acls-count-per-tenant} metric of the first broker for the
     * given tenant.
     *
     * <p>If no such metric exists, {@code expectedAcls} must be 0. Otherwise exactly one metric
     * must exist, its {@code max-acls} tag must parse to {@code expectedAclLimit}, and its
     * value must equal {@code expectedAcls}.
     *
     * @param expectedAcls     the expected metric value
     * @param expectedAclLimit the expected value of the {@code max-acls} tag
     * @param tenantId         the value of the {@code tenant} tag to look up
     */
    private void verifyAclCountPerTenantMetric(int expectedAcls, int expectedAclLimit, String tenantId) {
        KafkaBroker firstBroker = this.physicalCluster.kafkaCluster().kafkaBrokers().get(0);
        List<?> tenantMetrics = TestUtils.getKafkaMetrics((Metrics)firstBroker.metrics(), "created-acls-count-per-tenant", Collections.singletonMap("tenant", tenantId));
        if (tenantMetrics == null || tenantMetrics.isEmpty()) {
            // No metric registered for the tenant counts as zero created ACLs.
            Assertions.assertEquals(expectedAcls, 0);
            return;
        }
        Assertions.assertEquals(1, tenantMetrics.size());
        KafkaMetric tenantMetric = (KafkaMetric) tenantMetrics.get(0);
        int reportedLimit = Integer.parseInt((String) tenantMetric.metricName().tags().get("max-acls"));
        Assertions.assertEquals(expectedAclLimit, reportedLimit);
        int reportedAcls = (Integer) tenantMetric.metricValue();
        Assertions.assertEquals(expectedAcls, reportedAcls);
    }

    /**
     * Removes every metric currently listed by the default Yammer registry, one metric name
     * at a time.
     */
    private void clearYammerMetrics() {
        KafkaYammerMetrics.defaultRegistry().allMetrics().keySet()
            .forEach(registeredName -> KafkaYammerMetrics.defaultRegistry().removeMetric(registeredName));
    }

    /**
     * Test helper that creates, deletes and describes ACLs through one {@link AdminClient}
     * while keeping a local set of the bindings it expects to exist.
     *
     * <p>The expected set is seeded in the constructor with every binding returned by an
     * unrestricted describe, extended by {@code addAcls} and reduced by {@code deleteAcls}.
     * All admin results are handled as {@code Collection<AclBinding>} so that no raw types or
     * unchecked casts are needed.
     */
    private class ConsumerAcls {
        private final AdminClient adminClient;
        private final Set<AclBinding> acls;
        private final boolean tenantOnly;

        /**
         * @param adminClient the client used for all ACL operations
         * @param tenantOnly  when true, {@code verifyAcls} additionally asserts that no returned
         *                    binding's principal contains the broker principal name
         * @throws Exception if the initial describe fails
         */
        ConsumerAcls(AdminClient adminClient, boolean tenantOnly) throws Exception {
            this.adminClient = adminClient;
            this.tenantOnly = tenantOnly;
            // describeAcls returns a fresh mutable HashSet, so it can be used as the tracked set directly.
            this.acls = this.describeAcls(null, PatternType.ANY);
        }

        /**
         * Creates the consumer ACL bindings for {@code user} and records them as expected.
         *
         * @throws Exception if the create request fails
         */
        private void addAcls(LogicalClusterUser user, String topic, String consumerGroup, PatternType patternType) throws Exception {
            List<AclBinding> consumerAcls = MultiTenantAuthorizerTest.this.consumerAcls(user, topic, consumerGroup, patternType);
            this.adminClient.createAcls(consumerAcls).all().get();
            this.acls.addAll(consumerAcls);
        }

        /**
         * Deletes the ACLs matching the given resource filter and principal, then removes the
         * bindings reported as deleted from the expected set.
         *
         * @param user the user whose principal is matched, or {@code null} to match any principal
         * @throws Exception if the delete request fails
         */
        private void deleteAcls(ResourceType resourceType, String resourceName, PatternType patternType, LogicalClusterUser user) throws Exception {
            AclBindingFilter filter = new AclBindingFilter(new ResourcePatternFilter(resourceType, resourceName, patternType), this.anyEntryFor(this.principalName(user)));
            Collection<AclBinding> deletedAcls = this.adminClient.deleteAcls(Collections.singletonList(filter)).all().get();
            this.acls.removeAll(deletedAcls);
        }

        /**
         * Describes ACLs of any resource type matching {@code resourceName} and
         * {@code patternType}, for any principal, host, operation and permission.
         *
         * @return a new mutable set holding the described bindings
         * @throws Exception if the describe request fails
         */
        private Set<AclBinding> describeAcls(String resourceName, PatternType patternType) throws Exception {
            AclBindingFilter filter = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, resourceName, patternType), this.anyEntryFor(null));
            Collection<AclBinding> describedAcls = this.adminClient.describeAcls(filter).values().get();
            return new HashSet<AclBinding>(describedAcls);
        }

        /**
         * Asserts that the bindings described for the given filter equal the expected set.
         *
         * @throws Exception if the describe request fails
         */
        private void verifyAllAcls(String resourceName, PatternType patternType) throws Exception {
            Set<AclBinding> describedAcls = this.describeAcls(resourceName, patternType);
            Assertions.assertEquals(this.acls, describedAcls);
        }

        /**
         * Describes the ACLs matching the given resource filter and principal and asserts that
         * the set of their resource names equals {@code expectedResources}.
         *
         * @param user              the user whose principal is matched, or {@code null} for any
         * @param expectedResources the exact resource names expected among the matches
         * @throws Exception if the describe request fails
         */
        private void verifyAcls(ResourceType resourceType, String resourceName, PatternType patternType, LogicalClusterUser user, String ... expectedResources) throws Exception {
            AclBindingFilter filter = new AclBindingFilter(new ResourcePatternFilter(resourceType, resourceName, patternType), this.anyEntryFor(this.principalName(user)));
            Collection<AclBinding> matchingAcls = this.adminClient.describeAcls(filter).values().get();
            Set<String> aclResources = matchingAcls.stream().map(acl -> acl.pattern().name()).collect(Collectors.toSet());
            Assertions.assertEquals(Set.of(expectedResources), aclResources);
            if (this.tenantOnly) {
                matchingAcls.forEach(acl -> Assertions.assertFalse(acl.entry().principal().contains(PhysicalCluster.BROKER_PRINCIPAL.getName()), "Unexpected acl " + acl));
            }
        }

        /** Returns the unprefixed principal string of {@code user}, or {@code null} when no user is given. */
        private String principalName(LogicalClusterUser user) {
            return user == null ? null : MultiTenantAuthorizerTest.this.unprefixedKafkaPrincipal(user);
        }

        /** Builds an entry filter for {@code principal} with no host restriction, any operation and any permission type. */
        private AccessControlEntryFilter anyEntryFor(String principal) {
            return new AccessControlEntryFilter(principal, null, AclOperation.ANY, AclPermissionType.ANY);
        }
    }
}

