package io.confluent.kafka.multitenant.integration.test;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantAuthorizerTest.class */
public class MultiTenantAuthorizerTest {
    // JUnit-provided metadata for the running test; stored by setUp and passed to the harness constructor.
    private TestInfo testInfo;
    // Created by startTestHarness(Properties) and shut down in tearDown.
    protected IntegrationTestHarness testHarness;
    // NOTE(review): not referenced in the visible part of this file; the tests spell out "test.topic" literally.
    private final String topic = "test.topic";
    // NOTE(review): not referenced in the visible part of this file; the tests spell out the group name literally.
    private final String consumerGroup = "test.consumer.group";
    // Cluster returned by the harness when it is started.
    protected PhysicalCluster physicalCluster;
    // Logical cluster "tenantA" created on the physical cluster by startTestHarness(Properties).
    private LogicalCluster logicalCluster;
    // User 1 of the "tenantA" logical cluster.
    private LogicalClusterUser user1;
    // User 2 of the "tenantA" logical cluster.
    private LogicalClusterUser user2;

    /* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantAuthorizerTest$ConsumerAcls.class */
    private class ConsumerAcls {
        private final AdminClient adminClient;
        private final Set<AclBinding> acls = new HashSet();
        private final boolean tenantOnly;

        ConsumerAcls(AdminClient adminClient, boolean z) throws Exception {
            this.adminClient = adminClient;
            this.tenantOnly = z;
            this.acls.addAll(describeAcls(null, PatternType.ANY));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addAcls(LogicalClusterUser logicalClusterUser, String str, String str2, PatternType patternType) throws Exception {
            List consumerAcls = MultiTenantAuthorizerTest.this.consumerAcls(logicalClusterUser, str, str2, patternType);
            this.adminClient.createAcls(consumerAcls).all().get();
            this.acls.addAll(consumerAcls);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void deleteAcls(ResourceType resourceType, String str, PatternType patternType, LogicalClusterUser logicalClusterUser) throws Exception {
            this.acls.removeAll((Collection) this.adminClient.deleteAcls(Collections.singletonList(new AclBindingFilter(new ResourcePatternFilter(resourceType, str, patternType), new AccessControlEntryFilter(logicalClusterUser == null ? null : logicalClusterUser.unprefixedKafkaPrincipal().toString(), (String) null, AclOperation.ANY, AclPermissionType.ANY)))).all().get());
        }

        private Set<AclBinding> describeAcls(String str, PatternType patternType) throws Exception {
            return new HashSet((Collection) this.adminClient.describeAcls(new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, str, patternType), new AccessControlEntryFilter((String) null, (String) null, AclOperation.ANY, AclPermissionType.ANY))).values().get());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void verifyAllAcls(String str, PatternType patternType) throws Exception {
            Assertions.assertEquals(this.acls, describeAcls(str, patternType));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void verifyAcls(ResourceType resourceType, String str, PatternType patternType, LogicalClusterUser logicalClusterUser, String... strArr) throws Exception {
            Collection collection = (Collection) this.adminClient.describeAcls(new AclBindingFilter(new ResourcePatternFilter(resourceType, str, patternType), new AccessControlEntryFilter(logicalClusterUser == null ? null : logicalClusterUser.unprefixedKafkaPrincipal().toString(), (String) null, AclOperation.ANY, AclPermissionType.ANY))).values().get();
            Assertions.assertEquals(Utils.mkSet(strArr), (Set) collection.stream().map(aclBinding -> {
                return aclBinding.pattern().name();
            }).collect(Collectors.toSet()));
            if (this.tenantOnly) {
                collection.forEach(aclBinding2 -> {
                    Assertions.assertFalse(aclBinding2.entry().principal().contains(PhysicalCluster.BROKER_PRINCIPAL.getName()), "Unexpected acl " + aclBinding2);
                });
            }
        }
    }

    /**
     * Stores the JUnit {@link TestInfo} for the running test so that
     * {@code startTestHarness(Properties)} can pass it to the harness it creates.
     * The harness itself is not started here; each test calls {@code startTestHarness}.
     *
     * @param testInfo metadata about the current test, injected by JUnit
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        this.testInfo = testInfo;
    }

    /**
     * Clears the Yammer metrics and shuts the test harness down after each test.
     *
     * <p>The shutdown runs in a {@code finally} block so the harness is still released when
     * clearing metrics throws, and it is skipped when no harness was created (for example
     * when a test failed before {@code startTestHarness} could construct one), so that the
     * original failure is not masked by a {@link NullPointerException}.
     *
     * @throws Exception if clearing metrics or shutting down the harness fails
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            clearYammerMetrics();
        } finally {
            if (this.testHarness != null) {
                this.testHarness.shutdown();
            }
        }
    }

    /** Starts the harness using the configuration returned by {@link #nodeConfigOverrides()}. */
    protected void startTestHarness() {
        Properties defaultOverrides = nodeConfigOverrides();
        startTestHarness(defaultOverrides);
    }

    /**
     * Creates and starts the integration harness, then creates the "tenantA" logical cluster
     * and caches its users 1 and 2.
     *
     * @param properties overrides handed to the harness (the same object is passed for both
     *                   of the harness's property arguments)
     */
    protected void startTestHarness(Properties properties) {
        // Assign the field before starting so tearDown can shut the harness down even if start fails.
        IntegrationTestHarness harness = new IntegrationTestHarness(testInfo);
        this.testHarness = harness;

        PhysicalCluster startedCluster = harness.start(properties, properties, true, Optional.empty(), ignoredCluster -> {
        });
        this.physicalCluster = startedCluster;

        LogicalCluster tenant = startedCluster.createLogicalCluster("tenantA", "testOrg", "testEnv", 100, 1, 2);
        this.logicalCluster = tenant;
        user1 = tenant.user(1);
        user2 = tenant.user(2);
    }

    /**
     * Grants literal produce ACLs to user1 and literal consume ACLs to user2, checks that
     * produce/consume works and that user1 is refused on another topic and another group.
     * The ACL count metric is checked before (0) and after (6), and the Confluent license is
     * verified.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testLiteralAcls(String str) throws Throwable {
        startTestHarness();
        verifyAclCountMetric(0);

        addProducerAcls(user1, "test.topic", PatternType.LITERAL);
        addConsumerAcls(user2, "test.topic", "test.consumer.group", PatternType.LITERAL);
        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);

        verifyTopicAuthorizationFailure(user1, "sometopic");
        verifyConsumerGroupAuthorizationFailure(user1, "test.topic", "somegroup");

        SecurityTestUtils.verifyConfluentLicense(physicalCluster.kafkaCluster(), null);
        verifyAclCountMetric(6);
    }

    /**
     * Grants prefixed ("test") produce ACLs to user1 and consume ACLs to user2, checks that
     * produce/consume works on a matching topic and group, and that user1 is refused on a
     * non-matching topic and group.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testPrefixedAcls(String str) throws Throwable {
        startTestHarness();

        addProducerAcls(user1, "test", PatternType.PREFIXED);
        addConsumerAcls(user2, "test", "test", PatternType.PREFIXED);
        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);

        verifyTopicAuthorizationFailure(user1, "sometopic");
        verifyConsumerGroupAuthorizationFailure(user1, "test.topic", "somegroup");
    }

    /**
     * Grants wildcard ("*") literal produce ACLs to user1 and consume ACLs to user2 through
     * the ACL command (resource names are passed without a tenant prefix) and checks that
     * produce/consume works.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testWildcardAcls(String str) throws Throwable {
        startTestHarness();

        physicalCluster.newAclCommand()
            .produceAclArgs(user1.prefixedKafkaPrincipal(), "*", PatternType.LITERAL)
            .execute();
        physicalCluster.newAclCommand()
            .consumeAclArgs(user2.prefixedKafkaPrincipal(), "*", "*", PatternType.LITERAL)
            .execute();

        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);
    }

    /**
     * Checks that the logical cluster's admin user can produce and consume without any ACLs
     * having been added by the test.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSuperUsers(String str) throws Throwable {
        startTestHarness();
        testHarness.produceConsume(
            logicalCluster.adminUser(),
            logicalCluster.adminUser(),
            "test.topic",
            "test.consumer.group",
            0);
    }

    /**
     * Checks that ACLs added through the ACL command after a consumer was created become
     * effective: the consumer is initially not authorized for the topic, and is expected to
     * become authorized once DESCRIBE ACLs for the topic and group have been added.
     *
     * <p>The consumer is managed with try-with-resources so it is closed on every path.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     * @throws Throwable if any harness, ACL or client call fails
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAclUpdateInZooKeeper(String str) throws Throwable {
        startTestHarness();
        final String topicName = "test.topic";
        final String groupName = "test.group";
        physicalCluster.kafkaCluster().createTopic(user1.withPrefix(topicName), 3, 1);

        try (KafkaConsumer<String, String> consumer = testHarness.createConsumer(user1, groupName, SecurityProtocol.SASL_PLAINTEXT)) {
            Assertions.assertFalse(checkAuthorized(consumer, topicName));

            physicalCluster.newAclCommand().addTopicAclArgs(user1.prefixedKafkaPrincipal(), user1.withPrefix(topicName), AclOperation.DESCRIBE, PatternType.LITERAL).execute();
            physicalCluster.newAclCommand().addConsumerGroupAclArgs(user1.prefixedKafkaPrincipal(), user1.withPrefix(groupName), AclOperation.DESCRIBE, PatternType.LITERAL).execute();

            // ACL propagation is asynchronous, so poll until the consumer sees the new ACLs.
            TestUtils.waitForCondition(() -> checkAuthorized(consumer, topicName), "ACL not applied within timeout");
        }
    }

    /**
     * Checks that ACLs are scoped to a logical cluster: ACLs granted to user1 in the default
     * logical cluster do not authorize a user with the same user id in a second logical
     * cluster ("anotherCluster") until ACLs are added for that user as well.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testLogicalClusterScope(String str) throws Throwable {
        startTestHarness();

        addProducerAcls(user1, "test.topic", PatternType.LITERAL);
        addConsumerAcls(user1, "test.topic", "test.consumer.group", PatternType.LITERAL);
        testHarness.produceConsume(user1, user1, "test.topic", "test.consumer.group", 0);

        // Same user id, different logical cluster.
        int sharedUserId = user1.userMetadata.userId();
        LogicalCluster otherCluster = physicalCluster.createLogicalCluster("anotherCluster", 100, sharedUserId);
        LogicalClusterUser otherClusterUser = otherCluster.user(sharedUserId);

        verifyTopicAuthorizationFailure(otherClusterUser, "sometopic");
        verifyConsumerGroupAuthorizationFailure(otherClusterUser, "test.topic", "somegroup");

        addProducerAcls(otherClusterUser, "test.topic", PatternType.LITERAL);
        addConsumerAcls(otherClusterUser, "test.topic", "test.consumer.group", PatternType.LITERAL);
        testHarness.produceConsume(otherClusterUser, otherClusterUser, "test.topic", "test.consumer.group", 0);
    }

    /**
     * Same scenario as the literal-ACL test, but the ACLs are created through the
     * admin-client helpers instead of the ACL command.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testLiteralAclsUsingAdminClient(String str) throws Throwable {
        startTestHarness();

        addProducerAclsUsingAdminClient(user1, "test.topic", PatternType.LITERAL);
        addConsumerAclsUsingAdminClient(user2, "test.topic", "test.consumer.group", PatternType.LITERAL);
        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);

        verifyTopicAuthorizationFailure(user1, "sometopic");
        verifyConsumerGroupAuthorizationFailure(user1, "test.topic", "somegroup");
    }

    /**
     * Creates prefixed ("test.") produce and consume ACLs through the admin-client helpers,
     * checks that produce/consume works on a matching topic and group, and that user1 is
     * refused on a non-matching topic and group.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testPrefixAclsUsingAdminClient(String str) throws Throwable {
        startTestHarness();

        addProducerAclsUsingAdminClient(user1, "test.", PatternType.PREFIXED);
        addConsumerAclsUsingAdminClient(user2, "test.", "test.", PatternType.PREFIXED);
        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);

        verifyTopicAuthorizationFailure(user1, "sometopic");
        verifyConsumerGroupAuthorizationFailure(user1, "test.topic", "somegroup");
    }

    /**
     * Creates wildcard ("*") literal produce and consume ACLs through the admin-client
     * helpers and checks that produce/consume works.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testWildcardAclsUsingAdminClient(String str) throws Throwable {
        startTestHarness();

        addProducerAclsUsingAdminClient(user1, "*", PatternType.LITERAL);
        addConsumerAclsUsingAdminClient(user2, "*", "*", PatternType.LITERAL);

        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);
    }

    /**
     * End-to-end check of ACL create, describe and delete through admin clients of two
     * tenants ("tenantA" and "tenantB") and the physical cluster's super admin client.
     *
     * <p>Tenant admin clients are expected to see unprefixed resource names of their own
     * tenant only, while the super admin client sees tenant-prefixed names of both tenants
     * plus the broker ACLs. Statement order matters: each {@code ConsumerAcls} snapshots the
     * ACLs visible at construction time.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAclCreateDescribeDeleteUsingAdminClient(String str) throws Throwable {
        startTestHarness();
        physicalCluster.newAclCommand().topicBrokerReadAclArgs(PhysicalCluster.BROKER_PRINCIPAL).execute();

        AdminClient superAdmin = physicalCluster.superAdminClient();
        LogicalCluster tenantB = physicalCluster.createLogicalCluster("tenantB", 100, 11);
        LogicalClusterUser tenantBUser = tenantB.user(11);
        AdminClient tenantAAdmin = testHarness.createAdminClient(logicalCluster.adminUser());
        AdminClient tenantBAdmin = testHarness.createAdminClient(tenantB.adminUser());

        // --- create ACLs through the tenant admin clients ---
        ConsumerAcls tenantAAcls = new ConsumerAcls(tenantAAdmin, true);
        ConsumerAcls tenantBAcls = new ConsumerAcls(tenantBAdmin, true);
        tenantAAcls.addAcls(user1, "test1.topic", "test1.group", PatternType.LITERAL);
        tenantAAcls.addAcls(user2, "prefixed.test2", "prefixed.test2", PatternType.PREFIXED);
        tenantBAcls.addAcls(tenantBUser, "*", "*", PatternType.LITERAL);

        physicalCluster.kafkaCluster().createTopic("tenantA_test1.topic", 1, 1);
        physicalCluster.kafkaCluster().createTopic("tenantA_prefixed.test2.topic", 2, 1);
        physicalCluster.kafkaCluster().createTopic("tenantB_test1.topic", 1, 1);

        KafkaConsumer<String, String> user1Consumer = testHarness.createConsumer(user1, "test1.group", SecurityProtocol.SASL_PLAINTEXT);
        KafkaConsumer<String, String> user2Consumer = testHarness.createConsumer(user2, "prefixed.test2.group", SecurityProtocol.SASL_PLAINTEXT);
        KafkaConsumer<String, String> tenantBConsumer = testHarness.createConsumer(tenantBUser, "test1.group", SecurityProtocol.SASL_PLAINTEXT);

        Assertions.assertTrue(checkAuthorized(user1Consumer, "test1.topic"));
        Assertions.assertFalse(checkAuthorized(user2Consumer, "test1.topic"));
        Assertions.assertTrue(checkAuthorized(user2Consumer, "prefixed.test2.topic"));
        Assertions.assertFalse(checkAuthorized(user1Consumer, "prefixed.test2.topic"));
        Assertions.assertTrue(checkAuthorized(tenantBConsumer, "test1.topic"));

        // --- describe: super admin view (tenant-prefixed names) ---
        ConsumerAcls superAcls = new ConsumerAcls(superAdmin, false);
        superAcls.verifyAllAcls(null, PatternType.ANY);
        superAcls.verifyAllAcls(null, PatternType.MATCH);
        superAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.LITERAL, null, "*", "tenantA_test1.topic");
        superAcls.verifyAcls(ResourceType.GROUP, null, PatternType.PREFIXED, null, "tenantA_prefixed.test2", "tenantB_");
        superAcls.verifyAcls(ResourceType.ANY, null, PatternType.LITERAL, null, "*", "tenantA_test1.topic", "tenantA_test1.group", "tenantA_kafka-cluster", "tenantB_kafka-cluster");
        superAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.ANY, null, "*", "tenantA_test1.topic", "tenantA_prefixed.test2", "tenantB_");

        // --- describe: tenantA view ---
        tenantAAcls.verifyAllAcls(null, PatternType.ANY);
        tenantAAcls.verifyAllAcls(null, PatternType.MATCH);
        tenantAAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.LITERAL, null, "test1.topic");
        tenantAAcls.verifyAcls(ResourceType.GROUP, null, PatternType.PREFIXED, null, "prefixed.test2");
        tenantAAcls.verifyAcls(ResourceType.ANY, null, PatternType.LITERAL, null, "test1.topic", "test1.group", "kafka-cluster");
        tenantAAcls.verifyAcls(ResourceType.ANY, null, PatternType.LITERAL, user1, "test1.topic", "test1.group", "kafka-cluster");
        tenantAAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.ANY, null, "test1.topic", "prefixed.test2");

        // --- describe: tenantB view ---
        tenantBAcls.verifyAllAcls(null, PatternType.ANY);
        tenantBAcls.verifyAcls(ResourceType.TOPIC, "*", PatternType.LITERAL, tenantBUser, "*");
        tenantBAcls.verifyAcls(ResourceType.TOPIC, "*", PatternType.ANY, tenantBUser, "*");
        tenantBAcls.verifyAcls(ResourceType.TOPIC, "*", PatternType.PREFIXED, tenantBUser);
        tenantBAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.LITERAL, tenantBUser, "*");
        tenantBAcls.verifyAcls(ResourceType.ANY, null, PatternType.LITERAL, null, "*", "kafka-cluster");
        tenantBAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.PREFIXED, tenantBUser);
        tenantBAcls.verifyAcls(ResourceType.ANY, "kafka-cluster", PatternType.LITERAL, tenantBUser, "kafka-cluster");

        // --- describe with MATCH pattern type ---
        superAcls.verifyAcls(ResourceType.ANY, "tenantA_prefixed.test2.topic", PatternType.MATCH, null, "tenantA_prefixed.test2", "*");
        superAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.MATCH, null, "tenantA_test1.topic", "tenantA_prefixed.test2", "tenantB_", "*");
        tenantAAcls.verifyAcls(ResourceType.ANY, "prefixed.test2.topic", PatternType.MATCH, null, "prefixed.test2");
        tenantAAcls.verifyAcls(ResourceType.ANY, "prefixed.test2.topic", PatternType.MATCH, user2, "prefixed.test2");
        tenantAAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.MATCH, user1, "test1.topic");
        tenantBAcls.verifyAcls(ResourceType.TOPIC, "test", PatternType.MATCH, tenantBUser, "*");
        tenantBAcls.verifyAcls(ResourceType.TOPIC, "*", PatternType.MATCH, tenantBUser, "*");
        tenantBAcls.verifyAcls(ResourceType.TOPIC, null, PatternType.MATCH, null, "*");

        // --- delete: literal topic ACL of tenantA ---
        Assertions.assertTrue(checkAuthorized(user1Consumer, "test1.topic"));
        tenantAAcls.deleteAcls(ResourceType.TOPIC, "test1.topic", PatternType.LITERAL, null);
        Assertions.assertFalse(checkAuthorized(user1Consumer, "test1.topic"));
        tenantAAcls.verifyAllAcls(null, PatternType.ANY);

        // --- delete: prefixed topic ACL of tenantA, selected via MATCH for user2 ---
        Assertions.assertTrue(checkAuthorized(user2Consumer, "prefixed.test2.topic"));
        tenantAAcls.deleteAcls(ResourceType.TOPIC, "prefixed.test2.topic", PatternType.MATCH, user2);
        Assertions.assertFalse(checkAuthorized(user2Consumer, "prefixed.test2.topic"));
        tenantAAcls.verifyAllAcls(null, PatternType.ANY);

        // --- delete: all topic ACLs of the tenantB user ---
        Assertions.assertTrue(checkAuthorized(tenantBConsumer, "test1.topic"));
        tenantBAcls.deleteAcls(ResourceType.TOPIC, null, PatternType.MATCH, tenantBUser);
        Assertions.assertFalse(checkAuthorized(tenantBConsumer, "test1.topic"));
        tenantBAcls.verifyAllAcls(null, PatternType.MATCH);
    }

    /**
     * Checks the per-tenant ACL limit: after filling the default tenant up to 100 ACLs, one
     * more create is expected to fail with {@link InvalidRequestException}, while another
     * tenant ("anotherCluster") can still create ACLs and the original produce/consume ACLs
     * keep working.
     *
     * <p>Each admin client lives in its own try-with-resources block, so it is closed exactly
     * once on every path, and the ACL factory is typed so that {@code createAcls} receives a
     * collection of {@link AclBinding}.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     * @throws Throwable if any harness, ACL or client call fails
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAclLimit(String str) throws Throwable {
        startTestHarness();
        addProducerAcls(user1, "test.topic", PatternType.LITERAL);
        addConsumerAcls(user2, "test.topic", "test.consumer.group", PatternType.LITERAL);

        // Builds a literal WRITE/ALLOW topic ACL for user1 on the given topic name.
        Function<String, AclBinding> writeAclFor = topicName -> new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.LITERAL),
            new AccessControlEntry(user1.unprefixedKafkaPrincipal().toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW));

        try (AdminClient adminClient = testHarness.createAdminClient(logicalCluster.adminUser())) {
            AclBindingFilter anyAcl = new AclBindingFilter(
                new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY),
                new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
            int existingAcls = adminClient.describeAcls(anyAcl).values().get().size();

            // Fill the tenant up to 100 ACLs in total, counting those that already exist.
            for (int i = 0; i < 100 - existingAcls; i++) {
                adminClient.createAcls(Collections.singleton(writeAclFor.apply("topic" + i))).all().get();
            }
            TestUtils.assertFutureThrows(adminClient.createAcls(Collections.singleton(writeAclFor.apply("othertopic"))).all(), InvalidRequestException.class);
        }

        // A different tenant is not affected by the first tenant having reached its limit.
        try (AdminClient otherAdminClient = testHarness.createAdminClient(physicalCluster.createLogicalCluster("anotherCluster", 100, new Integer[0]).adminUser())) {
            otherAdminClient.createAcls(Collections.singleton(writeAclFor.apply("sometopic"))).all().get();
        }

        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);
    }

    /**
     * Starts the cluster with {@code confluent.max.acls.per.tenant} set to "0" and checks
     * that any {@link ExecutionException} from create/describe/delete ACL calls is caused by
     * an {@link InvalidRequestException}, and that produce/consume works without any ACLs.
     *
     * <p>The admin client is scoped to the ACL calls with try-with-resources so it is closed
     * exactly once, including when the later produce/consume step fails.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     * @throws Throwable if any harness or client call fails unexpectedly
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAuthorizerDisabledUsingAclLimit(String str) throws Throwable {
        Properties overrides = nodeConfigOverrides();
        overrides.put("confluent.max.acls.per.tenant", "0");
        startTestHarness(overrides);

        AclBinding writeAcl = new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, "test.topic", PatternType.LITERAL),
            new AccessControlEntry(user1.unprefixedKafkaPrincipal().toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW));
        AclBindingFilter anyAcl = new AclBindingFilter(
            new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY),
            new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));

        try (AdminClient adminClient = testHarness.createAdminClient(logicalCluster.adminUser())) {
            // NOTE(review): as in the original, a call that succeeds is not treated as a
            // failure; only the type of a thrown exception is verified. Confirm this is intended.
            try {
                adminClient.createAcls(Collections.singleton(writeAcl)).all().get();
            } catch (ExecutionException e) {
                verifyAclsDisabledException(e);
            }
            try {
                adminClient.describeAcls(anyAcl).values().get();
            } catch (ExecutionException e) {
                verifyAclsDisabledException(e);
            }
            try {
                adminClient.deleteAcls(Collections.singleton(anyAcl)).all().get();
            } catch (ExecutionException e) {
                verifyAclsDisabledException(e);
            }
        }

        testHarness.produceConsume(user1, user2, "test.topic", "test.consumer.group", 0);
    }

    /**
     * Asserts that the cause of the given exception is an {@link InvalidRequestException}.
     *
     * @param executionException exception thrown by an admin-client future
     */
    private void verifyAclsDisabledException(ExecutionException executionException) {
        final Throwable rootCause = executionException.getCause();
        final boolean isInvalidRequest = rootCause instanceof InvalidRequestException;
        Assertions.assertTrue(isInvalidRequest, "Unexpected exception: " + rootCause);
    }

    /**
     * Checks that creating an ACL with an invalid principal (an empty string, or a name
     * without a principal type) fails with an {@link ExecutionException} caused by an
     * {@link InvalidRequestException}.
     *
     * <p>The admin client is managed with try-with-resources so it is closed on every path.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidAcl(String str) {
        startTestHarness();
        try (AdminClient adminClient = testHarness.createAdminClient(logicalCluster.adminUser())) {
            for (String invalidPrincipal : Arrays.asList("", "userWithoutPrincipalType")) {
                try {
                    // Built inside the try so that a failure while constructing the binding is
                    // reported through the same assertion as a failure of the request.
                    AclBinding binding = new AclBinding(
                        new ResourcePattern(ResourceType.TOPIC, "test.topic", PatternType.LITERAL),
                        new AccessControlEntry(invalidPrincipal, "*", AclOperation.WRITE, AclPermissionType.ALLOW));
                    adminClient.createAcls(Collections.singleton(binding)).all().get();
                    Assertions.fail("createAcls didn't fail with invalid principal");
                } catch (Exception e) {
                    Assertions.assertTrue((e instanceof ExecutionException) && (e.getCause() instanceof InvalidRequestException), "Invalid exception: " + e);
                }
            }
        }
    }

    /**
     * Steps through a sequence of topic ACL changes for user1 and, after each step, checks
     * which exception (if any) an idempotent producer for "test.topic" is expected to hit:
     * {@link ClusterAuthorizationException} while user1 has no usable topic-level grant,
     * {@link TopicAuthorizationException} once an allow ACL on another prefix exists, and no
     * exception once WRITE on the topic itself is granted.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the method body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAuthorizeByResourceType(String str) throws Throwable {
        final String notApplied = "ACL not applied within timeout";

        startTestHarness();
        physicalCluster.kafkaCluster().createTopic(user1.withPrefix("test.topic"), 1, 1);
        AdminClient user1Admin = testHarness.createAdminClient(user1);

        // Step 1: DESCRIBE only on the topic.
        physicalCluster.newAclCommand()
            .addTopicAclArgs(user1.prefixedKafkaPrincipal(), user1.withPrefix("test.topic"), AclOperation.DESCRIBE, PatternType.LITERAL)
            .execute();
        TestUtils.waitForCondition(() -> checkDescribeAuthorized(user1Admin, "test.topic"), notApplied);
        verifyIdempotentProducer(user1, "test.topic", ClusterAuthorizationException.class);

        // Step 2: allow ALL on prefix "prefix", then deny ALL on the same prefix.
        physicalCluster.newAclCommand()
            .addTopicAclArgs(user1.prefixedKafkaPrincipal(), user1.withPrefix("prefix"), AclOperation.ALL, PatternType.PREFIXED)
            .execute();
        TestUtils.waitForCondition(() -> checkDescribeAuthorized(user1Admin, "prefix.topic1"), notApplied);
        physicalCluster.newAclCommand()
            .addTopicDenyAclArgs(user1.prefixedKafkaPrincipal(), user1.withPrefix("prefix"), AclOperation.ALL, PatternType.PREFIXED)
            .execute();
        TestUtils.waitForCondition(() -> !checkDescribeAuthorized(user1Admin, "prefix.topic2"), notApplied);
        verifyIdempotentProducer(user1, "test.topic", ClusterAuthorizationException.class);

        // Step 3: a wildcard grant for a different user must not help user1.
        physicalCluster.newAclCommand()
            .addTopicAclArgs(user2.prefixedKafkaPrincipal(), "*", AclOperation.ALL, PatternType.LITERAL)
            .execute();
        verifyIdempotentProducer(user1, "test.topic", ClusterAuthorizationException.class);

        // Step 4: allow ALL on an unrelated prefix for user1.
        physicalCluster.newAclCommand()
            .addTopicAclArgs(user1.prefixedKafkaPrincipal(), user1.withPrefix("another"), AclOperation.ALL, PatternType.PREFIXED)
            .execute();
        TestUtils.waitForCondition(() -> checkDescribeAuthorized(user1Admin, "another.topic"), notApplied);
        verifyIdempotentProducer(user1, "test.topic", TopicAuthorizationException.class);

        // Step 5: WRITE on the topic itself.
        physicalCluster.newAclCommand()
            .addTopicAclArgs(user1.prefixedKafkaPrincipal(), user1.withPrefix("test.topic"), AclOperation.WRITE, PatternType.LITERAL)
            .execute();
        verifyIdempotentProducer(user1, "test.topic", null);
    }

    /**
     * Builds the configuration overrides used by this test.
     *
     * <p>Two entries are set: the authorizer class name property is pointed at
     * {@link MultiTenantAuthorizer}, and {@code confluent.max.acls.per.tenant} is set to
     * {@code "100"}.
     *
     * @return a new {@link Properties} instance holding the two overrides
     */
    protected Properties nodeConfigOverrides() {
        String authorizerClassName = MultiTenantAuthorizer.class.getName();
        Properties overrides = new Properties();
        overrides.put(KafkaConfig$.MODULE$.AuthorizerClassNameProp(), authorizerClassName);
        overrides.put("confluent.max.acls.per.tenant", "100");
        return overrides;
    }

    /**
     * Adds producer ACLs for a user by executing the physical cluster's ACL command.
     *
     * <p>The command receives the user's prefixed principal and the resource name after it
     * has been passed through {@code withPrefix}.
     *
     * @param logicalClusterUser user the ACLs are created for
     * @param str unprefixed resource name
     * @param patternType pattern type forwarded to the ACL command
     */
    protected void addProducerAcls(LogicalClusterUser logicalClusterUser, String str, PatternType patternType) {
        String prefixedResource = logicalClusterUser.withPrefix(str);
        this.physicalCluster.newAclCommand()
                .produceAclArgs(logicalClusterUser.prefixedKafkaPrincipal(), prefixedResource, patternType)
                .execute();
    }

    /**
     * Adds consumer ACLs for a user by executing the physical cluster's ACL command.
     *
     * <p>Both resource names are passed through {@code withPrefix} before being handed to the
     * command, together with the user's prefixed principal.
     *
     * @param logicalClusterUser user the ACLs are created for
     * @param str first unprefixed resource name forwarded to {@code consumeAclArgs}
     * @param str2 second unprefixed resource name forwarded to {@code consumeAclArgs}
     * @param patternType pattern type forwarded to the ACL command
     */
    protected void addConsumerAcls(LogicalClusterUser logicalClusterUser, String str, String str2, PatternType patternType) {
        String firstPrefixed = logicalClusterUser.withPrefix(str);
        String secondPrefixed = logicalClusterUser.withPrefix(str2);
        this.physicalCluster.newAclCommand()
                .consumeAclArgs(logicalClusterUser.prefixedKafkaPrincipal(), firstPrefixed, secondPrefixed, patternType)
                .execute();
    }

    /**
     * Adds a topic deny ACL covering {@link AclOperation#ALL} for a user by executing the
     * physical cluster's ACL command.
     *
     * @param logicalClusterUser user the deny ACL is created for
     * @param str unprefixed topic name (or name prefix); passed through {@code withPrefix}
     * @param patternType pattern type forwarded to the ACL command
     */
    protected void addTopicDenyAcls(LogicalClusterUser logicalClusterUser, String str, PatternType patternType) {
        String prefixedTopic = logicalClusterUser.withPrefix(str);
        this.physicalCluster.newAclCommand()
                .addTopicDenyAclArgs(logicalClusterUser.prefixedKafkaPrincipal(), prefixedTopic, AclOperation.ALL, patternType)
                .execute();
    }

    /**
     * Adds producer ACLs for a user through a short-lived {@link AdminClient}.
     *
     * <p>The client is created for the logical cluster's admin user, passed to
     * {@link #addProducerAcls(AdminClient, LogicalClusterUser, String, PatternType)}, and
     * closed when that call completes, whether it returns normally or throws.
     *
     * @param logicalClusterUser user the ACLs are created for
     * @param str resource name forwarded unchanged
     * @param patternType pattern type forwarded unchanged
     * @throws Exception if creating the ACLs fails
     */
    private void addProducerAclsUsingAdminClient(LogicalClusterUser logicalClusterUser, String str, PatternType patternType) throws Exception {
        // try-with-resources closes the client on every path and attaches any close()
        // failure as a suppressed exception on the primary one.
        try (AdminClient adminClient = this.testHarness.createAdminClient(this.logicalCluster.adminUser())) {
            addProducerAcls(adminClient, logicalClusterUser, str, patternType);
        }
    }

    /**
     * Creates two ALLOW ACL bindings for the user's unprefixed principal (host {@code "*"})
     * through the given admin client and waits for the request to complete.
     *
     * <ul>
     *   <li>{@link AclOperation#WRITE} on the topic resource named {@code str}</li>
     *   <li>{@link AclOperation#DESCRIBE} on the cluster resource {@code "kafka-cluster"}</li>
     * </ul>
     *
     * <p>Both resource patterns use the supplied {@code patternType}.
     *
     * @param adminClient client used to submit the ACLs
     * @param logicalClusterUser user whose unprefixed principal the ACLs are bound to
     * @param str topic resource name
     * @param patternType pattern type used for both resource patterns
     * @throws Exception if the ACL creation fails or the wait is interrupted
     */
    private void addProducerAcls(AdminClient adminClient, LogicalClusterUser logicalClusterUser, String str, PatternType patternType) throws Exception {
        AclBinding topicWrite = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, str, patternType),
                new AccessControlEntry(logicalClusterUser.unprefixedKafkaPrincipal().toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW));
        AclBinding clusterDescribe = new AclBinding(
                new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", patternType),
                new AccessControlEntry(logicalClusterUser.unprefixedKafkaPrincipal().toString(), "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
        List<AclBinding> bindings = Arrays.asList(topicWrite, clusterDescribe);
        adminClient.createAcls(bindings).all().get();
    }

    /**
     * Adds consumer ACLs for a user through a short-lived {@link AdminClient}.
     *
     * <p>The client is created for the logical cluster's admin user, passed to
     * {@link #addConsumerAcls(AdminClient, LogicalClusterUser, String, String, PatternType)},
     * and closed when that call completes, whether it returns normally or throws.
     *
     * @param logicalClusterUser user the ACLs are created for
     * @param str first resource name, forwarded unchanged
     * @param str2 second resource name, forwarded unchanged
     * @param patternType pattern type forwarded unchanged
     * @throws Exception if creating the ACLs fails
     */
    private void addConsumerAclsUsingAdminClient(LogicalClusterUser logicalClusterUser, String str, String str2, PatternType patternType) throws Exception {
        // try-with-resources closes the client on every path and attaches any close()
        // failure as a suppressed exception on the primary one.
        try (AdminClient adminClient = this.testHarness.createAdminClient(this.logicalCluster.adminUser())) {
            addConsumerAcls(adminClient, logicalClusterUser, str, str2, patternType);
        }
    }

    /**
     * Submits the bindings produced by {@code consumerAcls} through the given admin client and
     * waits for the request to complete.
     *
     * @param adminClient client used to submit the ACLs
     * @param logicalClusterUser user the ACLs are bound to
     * @param str topic resource name
     * @param str2 group resource name
     * @param patternType pattern type used for the resource patterns
     * @throws Exception if the ACL creation fails or the wait is interrupted
     */
    private void addConsumerAcls(AdminClient adminClient, LogicalClusterUser logicalClusterUser, String str, String str2, PatternType patternType) throws Exception {
        List<AclBinding> bindings = consumerAcls(logicalClusterUser, str, str2, patternType);
        adminClient.createAcls(bindings).all().get();
    }

    /**
     * Builds the three ALLOW ACL bindings used for a consumer, all bound to the user's
     * unprefixed principal with host {@code "*"}.
     *
     * <ul>
     *   <li>{@link AclOperation#READ} on the topic resource named {@code str}</li>
     *   <li>{@link AclOperation#ALL} on the group resource named {@code str2}</li>
     *   <li>{@link AclOperation#DESCRIBE} on the cluster resource {@code "kafka-cluster"}</li>
     * </ul>
     *
     * <p>All three resource patterns use the supplied {@code patternType}.
     *
     * @param logicalClusterUser user whose unprefixed principal the ACLs are bound to
     * @param str topic resource name
     * @param str2 group resource name
     * @param patternType pattern type used for every resource pattern
     * @return the bindings in topic, group, cluster order
     */
    // Decompiler note: the access modifier of this method was changed from private.
    public List<AclBinding> consumerAcls(LogicalClusterUser logicalClusterUser, String str, String str2, PatternType patternType) {
        AclBinding topicRead = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, str, patternType),
                new AccessControlEntry(logicalClusterUser.unprefixedKafkaPrincipal().toString(), "*", AclOperation.READ, AclPermissionType.ALLOW));
        AclBinding groupAll = new AclBinding(
                new ResourcePattern(ResourceType.GROUP, str2, patternType),
                new AccessControlEntry(logicalClusterUser.unprefixedKafkaPrincipal().toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding clusterDescribe = new AclBinding(
                new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", patternType),
                new AccessControlEntry(logicalClusterUser.unprefixedKafkaPrincipal().toString(), "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
        return Arrays.asList(topicRead, groupAll, clusterDescribe);
    }

    /**
     * Asserts that the given user is not authorized to look up partitions of a topic.
     *
     * <p>A producer is created for the user over {@code SASL_PLAINTEXT} and
     * {@code partitionsFor} is called. The check passes only when that call throws an
     * {@link AuthorizationException}; if it returns normally the test is failed. The producer
     * is closed on every path.
     *
     * @param logicalClusterUser user whose access is checked
     * @param str topic name passed to {@code partitionsFor}
     */
    protected void verifyTopicAuthorizationFailure(LogicalClusterUser logicalClusterUser, String str) {
        try (KafkaProducer<String, String> producer = this.testHarness.createProducer(logicalClusterUser, SecurityProtocol.SASL_PLAINTEXT)) {
            try {
                producer.partitionsFor(str);
                Assertions.fail("Authorization should have failed");
            } catch (AuthorizationException expected) {
                // Expected outcome: the lookup was rejected, so there is nothing to do.
            }
        }
    }

    /**
     * Asserts that the given user is not authorized to consume from a topic with a group.
     *
     * <p>A consumer is created for the user over {@code SASL_PLAINTEXT}, subscribed to the
     * topic and polled for up to five seconds. The check passes only when an
     * {@link AuthorizationException} is thrown; if the poll returns normally the test is
     * failed.
     *
     * <p>The consumer is managed by try-with-resources so it is closed on every path,
     * including the expected authorization failure and the assertion failure. It is closed
     * before the catch clause runs.
     *
     * @param logicalClusterUser user whose access is checked
     * @param str topic to subscribe to
     * @param str2 second argument handed to {@code createConsumer}
     *     (NOTE(review): presumably the consumer group id; confirm against the test harness)
     */
    protected void verifyConsumerGroupAuthorizationFailure(LogicalClusterUser logicalClusterUser, String str, String str2) {
        try (KafkaConsumer<String, String> consumer = this.testHarness.createConsumer(logicalClusterUser, str2, SecurityProtocol.SASL_PLAINTEXT)) {
            consumer.subscribe(Collections.singleton(str));
            consumer.poll(Duration.ofSeconds(5L));
            Assertions.fail("Authorization should have failed");
        } catch (AuthorizationException expected) {
            // Expected outcome: the consumer was rejected, so there is nothing to do.
        }
    }

    /**
     * Reports whether a partition lookup for the topic succeeds on the given consumer.
     *
     * @param kafkaConsumer consumer used for the lookup
     * @param str topic name passed to {@code partitionsFor}
     * @return {@code true} if the lookup completed, {@code false} if it threw an
     *     {@link AuthorizationException}; any other exception propagates
     */
    private boolean checkAuthorized(KafkaConsumer<?, ?> kafkaConsumer, String str) {
        boolean authorized;
        try {
            // size() is invoked on the result exactly as before, so a null result still fails here.
            kafkaConsumer.partitionsFor(str).size();
            authorized = true;
        } catch (AuthorizationException denied) {
            authorized = false;
        }
        return authorized;
    }

    /**
     * Single-topic convenience overload: wraps the topic name in a list and delegates to
     * {@link #checkDescribeAuthorized(AdminClient, Collection)}.
     *
     * @param adminClient client used to describe the topic
     * @param str topic name to describe
     * @return the result of the collection-based overload
     */
    protected boolean checkDescribeAuthorized(AdminClient adminClient, String str) {
        List<String> singleTopic = Arrays.asList(str);
        return checkDescribeAuthorized(adminClient, singleTopic);
    }

    /**
     * Reports whether describing the given topics is authorized for the admin client.
     *
     * <p>The describe request is awaited for up to 15 seconds. An {@link ExecutionException}
     * is unwrapped to its cause before the outcome is classified:
     * <ul>
     *   <li>normal completion: {@code true}</li>
     *   <li>{@link AuthorizationException}: {@code false}</li>
     *   <li>{@link UnknownTopicOrPartitionException}: {@code true}</li>
     *   <li>anything else: rethrown wrapped in a {@link RuntimeException}</li>
     * </ul>
     *
     * @param adminClient client used to describe the topics
     * @param collection topic names to describe
     * @return whether the describe call counts as authorized per the rules above
     */
    protected boolean checkDescribeAuthorized(AdminClient adminClient, Collection<String> collection) {
        boolean authorized = true;
        try {
            try {
                adminClient.describeTopics(collection).allTopicNames().get(15L, TimeUnit.SECONDS);
            } catch (ExecutionException wrapped) {
                // Classify by the underlying failure, not by the future's wrapper.
                throw wrapped.getCause();
            }
        } catch (AuthorizationException denied) {
            authorized = false;
        } catch (UnknownTopicOrPartitionException missingTopic) {
            // Treated the same as success: the result stays true.
        } catch (Throwable unexpected) {
            throw new RuntimeException(unexpected);
        }
        return authorized;
    }

    /**
     * Sends a record with an idempotent producer and verifies the expected outcome.
     *
     * <p>A producer is built for the user over {@code SASL_PLAINTEXT} with the
     * {@code SCRAM_SHA_256} mechanism and {@code enable.idempotence=true}. When {@code cls} is
     * {@code null} the send is expected to succeed. Otherwise the send must throw a
     * {@link KafkaException}, and either that exception itself (if it is an
     * {@link AuthorizationException}) or its cause must be an instance of {@code cls}. The
     * producer is closed on every path.
     *
     * @param logicalClusterUser user the producer authenticates as
     * @param str topic the record is sent to
     * @param cls expected authorization failure type, or {@code null} if the send should succeed
     * @throws Throwable if the send fails unexpectedly or an assertion fails
     */
    protected void verifyIdempotentProducer(LogicalClusterUser logicalClusterUser, String str, Class<? extends AuthorizationException> cls) throws Throwable {
        Properties producerProps = KafkaTestUtils.producerProps(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), logicalClusterUser.saslJaasConfig());
        producerProps.setProperty("enable.idempotence", "true");
        // NOTE(review): the raw KafkaProducer type is kept because the generic signature of
        // KafkaTestUtils.sendRecords is not visible here; parameterize once it is confirmed.
        try (KafkaProducer kafkaProducer = new KafkaProducer(producerProps)) {
            if (cls == null) {
                KafkaTestUtils.sendRecords(kafkaProducer, str, 0, 1);
                return;
            }
            KafkaException thrown = Assertions.assertThrows(KafkaException.class, () -> {
                KafkaTestUtils.sendRecords(kafkaProducer, str, 0, 1);
            });
            // The authorization failure may be thrown directly or wrapped as the cause.
            Throwable authorizationFailure = thrown instanceof AuthorizationException ? thrown : thrown.getCause();
            Assertions.assertTrue(cls.isInstance(authorizationFailure));
        }
    }

    /**
     * Asserts that the {@code AclCount} metrics in the default Yammer registry add up to the
     * expected value.
     *
     * <p>All metrics whose name is exactly {@code "AclCount"} are collected into a set. The
     * set must be non-empty, and the sum of their values, each read as a {@link Gauge} holding
     * a {@link Number}, must equal {@code i}.
     *
     * @param i expected total across all {@code AclCount} gauges
     */
    public static void verifyAclCountMetric(int i) {
        Set<Object> aclCountMetrics = new HashSet<>();
        KafkaYammerMetrics.defaultRegistry().allMetrics().forEach((metricName, metric) -> {
            if (metricName.getName().equals("AclCount")) {
                aclCountMetrics.add(metric);
            }
        });
        Assertions.assertFalse(aclCountMetrics.isEmpty());

        // The Gauge cast happens only after the non-empty check, as in the original pipeline.
        long total = 0L;
        for (Object metric : aclCountMetrics) {
            total += ((Number) ((Gauge<?>) metric).value()).longValue();
        }
        Assertions.assertEquals(i, total);
    }

    /**
     * Removes every metric currently registered in the default Yammer metrics registry.
     *
     * <p>The key set of {@code allMetrics()} is obtained once and iterated; each name is
     * removed through a fresh {@code defaultRegistry()} lookup.
     */
    private void clearYammerMetrics() {
        for (MetricName metricName : KafkaYammerMetrics.defaultRegistry().allMetrics().keySet()) {
            KafkaYammerMetrics.defaultRegistry().removeMetric(metricName);
        }
    }
}
