package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.FileBasedPlainSaslAuthenticatorTest;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DeleteAclsOptions;
import org.apache.kafka.clients.admin.DeleteAclsResult;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.acl.AclState;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
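/*
 * Integration tests for MultiTenantAuthorizer running with the
 * "confluent.multitenant.authorizer.enable.acl.state" option switched on (see commonProps).
 * Decompiled from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantAuthorizerAclStateTest.class
 */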
public class MultiTenantAuthorizerAclStateTest {
    // Harness created in setUp and shut down in tearDown; also hands out the admin clients used by the tests.
    private IntegrationTestHarness testHarness;
    // Cluster handle returned by the harness in setUp; used there to create the logical cluster and load API keys.
    private PhysicalCluster physicalCluster;
    // ID of the logical cluster that setUp registers on the physical cluster.
    private final String logicalClusterId = KafkaLogicalClusterUtils.LC_META_ABC.logicalClusterId();
    // NOTE(review): the string fields below are not referenced by name anywhere in this file as shown;
    // the methods repeat the same literal values inline (presumably constant inlining by the compiler —
    // confirm against the original source before removing any of them).
    private final String apiKeysTopic = "_confluent-apikey";
    private final String adminUserAPIkey = "APIKEY1";
    private final String adminUserAPIkeyPassword = "pwd1";
    private final String serviceUserAPIkey2 = "APIKEY2";
    private final String serviceUserAPIkeyPassword2 = "pwd2";
    private final String testTopic = "topic1";
    private final String testTopic2 = "topic2";
    private final String userId2 = "2";

    /**
     * Brings up the test cluster before every test: starts the harness with the API-key topic,
     * registers the logical cluster and publishes the credentials for APIKEY1 and APIKEY2.
     *
     * @param testInfo JUnit test metadata, forwarded to the harness
     * @throws Exception if the cluster cannot be started or the API keys cannot be loaded
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        KafkaTestUtils.verifyThreadCleanup();
        testHarness = new IntegrationTestHarness(testInfo, 3);

        // Value handed to "confluent.cdc.api.keys.topic.load.timeout.ms" on brokers and controllers.
        final long apiKeyLoadTimeoutMs = 15000 + TimeUnit.SECONDS.toMillis(3L);
        final List<String> initialTopics = Arrays.asList("_confluent-apikey");
        final Properties brokerOverrides = brokerProps(apiKeyLoadTimeoutMs, "0");
        final Properties controllerOverrides = controllerProps(apiKeyLoadTimeoutMs, "0");

        physicalCluster = testHarness.startWithTopic(
                initialTopics, 1, 1, 15000L, brokerOverrides, controllerOverrides, Optional.empty());
        physicalCluster.createLogicalCluster(logicalClusterId, 100, 1, 2);

        loadApiKeys(physicalCluster, "/file_auth_test_apikeys.json", "APIKEY1");
        loadApiKeys(physicalCluster, "/service_account_apikey_2.json", "APIKEY2");
    }

    /**
     * Shuts the harness down after every test and verifies that no threads were left behind.
     *
     * @throws Exception if the shutdown or the thread-cleanup verification fails
     */
    @AfterEach
    public void tearDown() throws Exception {
        // JUnit still runs @AfterEach methods when @BeforeEach failed; if setUp threw before the
        // harness was assigned, skip the shutdown instead of adding a NullPointerException on top
        // of the original failure.
        if (this.testHarness != null) {
            this.testHarness.shutdown();
        }
        KafkaTestUtils.verifyThreadCleanup();
    }

    /**
     * Builds the broker configuration: two listeners (INTERNAL plaintext, EXTERNAL SASL plaintext),
     * the broker id, and everything from {@link #commonProps(long)}.
     *
     * @param j   value forwarded to {@code commonProps} (the API-key topic load timeout in ms)
     * @param str value stored under {@code broker.id}
     * @return a new {@link Properties} instance holding the broker settings
     * @throws IOException declared by {@code commonProps}
     */
    private Properties brokerProps(long j, String str) throws IOException {
        final String listenerEndpoints = "INTERNAL://localhost:0, EXTERNAL://localhost:0";

        Properties brokerConfig = new Properties();
        brokerConfig.put("listeners", listenerEndpoints);
        brokerConfig.put("advertised.listeners", listenerEndpoints);
        brokerConfig.put("listener.security.protocol.map", "INTERNAL:PLAINTEXT, EXTERNAL:SASL_PLAINTEXT");
        brokerConfig.put("broker.id", str);

        Properties shared = commonProps(j);
        brokerConfig.putAll(shared);
        return brokerConfig;
    }

    /**
     * Builds the controller configuration: the node id plus everything from
     * {@link #commonProps(long)}.
     *
     * @param j   value forwarded to {@code commonProps} (the API-key topic load timeout in ms)
     * @param str value stored under {@code node.id}
     * @return a new {@link Properties} instance holding the controller settings
     * @throws IOException declared by {@code commonProps}
     */
    private Properties controllerProps(long j, String str) throws IOException {
        Properties controllerConfig = new Properties();
        controllerConfig.put("node.id", str);

        Properties shared = commonProps(j);
        controllerConfig.putAll(shared);
        return controllerConfig;
    }

    /**
     * Settings shared by brokers and controllers: SASL/PLAIN on the external listener, the
     * multi-tenant principal builder and authorizer, the API-key topic, audit logging, and the
     * ACL-state feature with a per-tenant limit of 100 ACLs.
     *
     * @param j value stored (as a string) under {@code confluent.cdc.api.keys.topic.load.timeout.ms}
     * @return a new {@link Properties} instance with the shared settings
     * @throws IOException never thrown by the visible code; kept for signature compatibility
     */
    private Properties commonProps(long j) throws IOException {
        Properties shared = new Properties();

        // Authentication on the EXTERNAL listener.
        shared.put("sasl.enabled.mechanisms", Collections.singletonList("PLAIN"));
        shared.put("listener.name.external.principal.builder.class", MultiTenantPrincipalBuilder.class.getName());
        shared.put("listener.name.external.plain.sasl.jaas.config", "io.confluent.kafka.server.plugins.auth.TopicBasedLoginModule required;");
        shared.put("confluent.multitenant.listener.names", "EXTERNAL");

        // API-key topic used as the credential source.
        shared.put("confluent.cdc.api.keys.topic", "_confluent-apikey");
        shared.put("confluent.cdc.api.keys.topic.load.timeout.ms", Long.toString(j));
        shared.put("confluent.close.connections.on.credential.delete", "true");
        shared.put("password.encoder.secret", "link-secret");

        // Audit / security event logging.
        shared.put("listener.name.external.confluent.security.event.logger.authentication.enable", "true");
        shared.put("confluent.security.event.logger.multitenant.enable", "true");
        shared.put(MockAuditLogProvider.AUDIT_PROVIDER_CONFIG, "TEST");

        // Authorizer under test, with ACL state tracking switched on.
        shared.put("authorizer.class.name", MultiTenantAuthorizer.class.getName());
        shared.put("confluent.multitenant.authorizer.enable.acl.state", "true");
        shared.put("confluent.max.acls.per.tenant", "100");
        return shared;
    }

    /**
     * Walks one ACL through its states: created (active), deleted with {@link AclState#ACTIVE}
     * (after which it is reported under {@link AclState#DELETED}), and finally deleted with
     * {@link AclState#DELETED} (after which it is no longer reported under {@link AclState#ANY}).
     * Along the way the APIKEY2 client is expected to gain and then lose access to "topic1".
     *
     * @param str quorum name supplied by {@code @ValueSource}; not used by the test body
     * @throws Exception if any admin call fails unexpectedly
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testLifecycleAclState(String str) throws Exception {
        // Both clients are closed by try-with-resources, including on assertion failure.
        try (ConfluentAdmin adminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));
             AdminClient serviceClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY2", "pwd2"))) {

            // No ACL yet: topic creation through the APIKEY2 client must be rejected.
            TestUtils.assertFutureError(serviceClient.createTopics(topicsList("topic1")).all(), TopicAuthorizationException.class);

            AclBinding topicBinding = topicAcl("2", "topic1");
            adminClient.createAcls(Collections.singleton(topicBinding)).all().get();

            // The ACL may take a moment to become effective, hence the retries.
            TestUtils.retryOnExceptionWithTimeout(() -> {
                serviceClient.createTopics(topicsList("topic1")).all().get();
            });
            List<String> expectedTopics = topicsList("topic1").stream().map(NewTopic::name).collect(Collectors.toList());
            TestUtils.retryOnExceptionWithTimeout(() -> {
                Assertions.assertTrue(serviceClient.listTopics().names().get().containsAll(expectedTopics));
            });

            AclBindingFilter userFilter = topicFilter("2");
            List<AclBindingFilter> filters = Arrays.asList(userFilter);

            // Delete the active ACL; access through the APIKEY2 client must go away again.
            ConfluentAdminUtils.deleteAcls(adminClient, filters, new DeleteAclsOptions(), AclState.ACTIVE).values().get(userFilter).get();
            TestUtils.retryOnExceptionWithTimeout(() -> {
                TestUtils.assertFutureThrows(serviceClient.deleteTopics(Arrays.asList("topic1")).all(), TopicAuthorizationException.class);
            });

            // The binding is now expected under DELETED. The describe call is issued inside the
            // retry so that every attempt sees fresh data (retrying a stale result is pointless).
            TestUtils.retryOnExceptionWithTimeout(() -> {
                Collection<AclBinding> deletedBindings = ConfluentAdminUtils.describeAcls(adminClient, userFilter, new DescribeAclsOptions(), AclState.DELETED).values().get();
                Assertions.assertEquals(Arrays.asList(topicBinding), deletedBindings);
            });

            // A second ACTIVE delete has nothing left to match. The result is already complete,
            // so it is asserted directly rather than inside a retry loop.
            DeleteAclsResult.FilterResults repeatedActiveDelete = ConfluentAdminUtils.deleteAcls(adminClient, filters, new DeleteAclsOptions(), AclState.ACTIVE).values().get(userFilter).get();
            Assertions.assertEquals(0, repeatedActiveDelete.values().size());

            // Deleting with DELETED state reports exactly the previously deleted binding.
            DeleteAclsResult.FilterResults purged = ConfluentAdminUtils.deleteAcls(adminClient, filters, new DeleteAclsOptions(), AclState.DELETED).values().get(userFilter).get();
            Assertions.assertEquals(1, purged.values().size());
            DeleteAclsResult.FilterResult purgedEntry = purged.values().get(0);
            Assertions.assertEquals(topicBinding, purgedEntry.binding());
            Assertions.assertNull(purgedEntry.exception());

            // Nothing is left in any state.
            TestUtils.retryOnExceptionWithTimeout(() -> {
                Collection<AclBinding> remaining = ConfluentAdminUtils.describeAcls(adminClient, userFilter, new DescribeAclsOptions(), AclState.ANY).values().get();
                Assertions.assertEquals(0, remaining.size());
            });
        }
    }

    /**
     * Exercises {@code ConfluentAdminUtils.deleteAcls} with the different {@link AclState} values
     * against ten ACLs on topics "topic10" .. "topic19".
     *
     * @param str quorum name supplied by {@code @ValueSource}; not used by the test body
     * @throws Exception if any admin call fails unexpectedly
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDeleteAcls(String str) throws Exception {
        try (ConfluentAdmin adminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"))) {
            List<AclBinding> allBindings = new ArrayList<>();
            List<AclBinding> firstTwoBindings = new ArrayList<>();   // topic10, topic11
            List<AclBinding> untouchedBindings = new ArrayList<>();  // topic13 .. topic19 (topic12 is in neither list)
            for (int i = 0; i < 10; i++) {
                AclBinding binding = topicAcl("2", "topic1" + i);
                allBindings.add(binding);
                if (i < 2) {
                    firstTwoBindings.add(binding);
                } else if (i > 2) {
                    untouchedBindings.add(binding);
                }
            }
            adminClient.createAcls(allBindings).all().get();

            AclBindingFilter userFilter = topicFilter("2");
            List<AclBindingFilter> firstTwoFilters = Arrays.asList(filterWithTopicName("2", "topic10"), filterWithTopicName("2", "topic11"));

            // Wait until all ten bindings are visible.
            TestUtils.retryOnExceptionWithTimeout(() -> {
                Collection<AclBinding> described = ConfluentAdminUtils.describeAcls(adminClient, userFilter, new DescribeAclsOptions(), AclState.ANY).values().get();
                Assertions.assertTrue(described.containsAll(allBindings));
                Assertions.assertEquals(allBindings.size(), described.size());
            });

            // Nothing is in DELETED state yet, so a DELETED-state delete matches nothing.
            Assertions.assertEquals(0, ConfluentAdminUtils.deleteAcls(adminClient, firstTwoFilters, new DeleteAclsOptions(), AclState.DELETED).all().get().size());

            // Delete the two active bindings and wait for each per-filter future.
            Map<AclBindingFilter, KafkaFuture<DeleteAclsResult.FilterResults>> activeDeletes = ConfluentAdminUtils.deleteAcls(adminClient, firstTwoFilters, new DeleteAclsOptions(), AclState.ACTIVE).values();
            for (AclBindingFilter filter : firstTwoFilters) {
                activeDeletes.get(filter).get();
            }

            Collection<AclBinding> deletedBindings = ConfluentAdminUtils.describeAcls(adminClient, userFilter, new DescribeAclsOptions(), AclState.DELETED).values().get();
            Assertions.assertTrue(deletedBindings.containsAll(firstTwoBindings));
            Assertions.assertEquals(2, deletedBindings.size());

            // Repeating the ACTIVE delete finds nothing.
            Assertions.assertEquals(0, ConfluentAdminUtils.deleteAcls(adminClient, firstTwoFilters, new DeleteAclsOptions(), AclState.ACTIVE).all().get().size());

            // ANY-state delete over three filters. This only checks that one future per filter was
            // returned; completion is observed through the describe retry below.
            List<AclBindingFilter> threeFilters = Arrays.asList(filterWithTopicName("2", "topic10"), filterWithTopicName("2", "topic11"), filterWithTopicName("2", "topic12"));
            Assertions.assertEquals(3, ConfluentAdminUtils.deleteAcls(adminClient, threeFilters, new DeleteAclsOptions(), AclState.ANY).values().size());

            TestUtils.retryOnExceptionWithTimeout(() -> {
                Collection<AclBinding> described = ConfluentAdminUtils.describeAcls(adminClient, userFilter, new DescribeAclsOptions(), AclState.ANY).values().get();
                Assertions.assertEquals(untouchedBindings.size(), described.size());
                Assertions.assertTrue(described.containsAll(untouchedBindings));
            });
        }
    }

    /**
     * Checks what the describe calls report after two of ten ACLs were deleted with
     * {@link AclState#ACTIVE}: the plain {@code describeAcls} is expected to list the eight
     * remaining bindings, {@link AclState#DELETED} the two deleted ones, and
     * {@link AclState#ANY} all ten.
     *
     * @param str quorum name supplied by {@code @ValueSource}; not used by the test body
     * @throws Exception if any admin call fails unexpectedly
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDescribeAcls(String str) throws Exception {
        try (ConfluentAdmin adminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"))) {
            List<AclBinding> allBindings = new ArrayList<>();
            List<AclBinding> deletedBindings = new ArrayList<>();  // topic10, topic11
            List<AclBinding> activeBindings = new ArrayList<>();   // topic12 .. topic19
            for (int i = 0; i < 10; i++) {
                AclBinding binding = topicAcl("2", "topic1" + i);
                allBindings.add(binding);
                if (i < 2) {
                    deletedBindings.add(binding);
                } else {
                    activeBindings.add(binding);
                }
            }
            adminClient.createAcls(allBindings).all().get();

            AclBindingFilter userFilter = topicFilter("2");
            List<AclBindingFilter> firstTwoFilters = Arrays.asList(filterWithTopicName("2", "topic10"), filterWithTopicName("2", "topic11"));

            // Wait until all ten bindings are visible.
            TestUtils.retryOnExceptionWithTimeout(() -> {
                Collection<AclBinding> described = ConfluentAdminUtils.describeAcls(adminClient, userFilter, new DescribeAclsOptions(), AclState.ANY).values().get();
                Assertions.assertTrue(described.containsAll(allBindings));
                Assertions.assertEquals(allBindings.size(), described.size());
            });

            // Delete the first two bindings while they are active.
            Map<AclBindingFilter, KafkaFuture<DeleteAclsResult.FilterResults>> activeDeletes = ConfluentAdminUtils.deleteAcls(adminClient, firstTwoFilters, new DeleteAclsOptions(), AclState.ACTIVE).values();
            for (AclBindingFilter filter : firstTwoFilters) {
                activeDeletes.get(filter).get();
            }

            // Plain describe: only the bindings that were not deleted.
            TestUtils.retryOnExceptionWithTimeout(() -> {
                Collection<AclBinding> described = adminClient.describeAcls(userFilter).values().get();
                Assertions.assertTrue(described.containsAll(activeBindings));
                Assertions.assertEquals(activeBindings.size(), described.size());
            });
            // DELETED: exactly the two deleted bindings.
            TestUtils.retryOnExceptionWithTimeout(() -> {
                Collection<AclBinding> described = ConfluentAdminUtils.describeAcls(adminClient, userFilter, new DescribeAclsOptions(), AclState.DELETED).values().get();
                Assertions.assertTrue(described.containsAll(deletedBindings));
                Assertions.assertEquals(2, described.size());
            });
            // ANY: all ten bindings, regardless of state.
            TestUtils.retryOnExceptionWithTimeout(() -> {
                Collection<AclBinding> described = ConfluentAdminUtils.describeAcls(adminClient, userFilter, new DescribeAclsOptions(), AclState.ANY).values().get();
                Assertions.assertEquals(allBindings.size(), described.size());
                Assertions.assertTrue(described.containsAll(allBindings));
            });
        }
    }

    /**
     * Verifies the per-tenant ACL limit (configured as 100 in {@code commonProps}): after 100 ACLs
     * one more is rejected; deleting two with {@link AclState#ACTIVE} makes room for exactly two
     * new ones, and a third is rejected again.
     *
     * @param str quorum name supplied by {@code @ValueSource}; not used by the test body
     * @throws Exception if any admin call fails unexpectedly
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAclLimit(String str) throws Exception {
        // The whole test body sits inside try-with-resources so the client is closed even when
        // one of the assertions after the fill loop fails.
        try (ConfluentAdmin adminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"))) {
            // Fill up to the limit: topic10, topic11, ..., topic199.
            for (int i = 0; i < 100; i++) {
                adminClient.createAcls(Collections.singleton(topicAcl("2", "topic1" + i))).all().get();
            }

            // ACL number 101 must be rejected.
            TestUtils.assertFutureThrows(adminClient.createAcls(Collections.singleton(topicAcl("2", "topic2"))).all(), InvalidRequestException.class);

            // Free two slots.
            List<AclBindingFilter> twoFilters = Arrays.asList(filterWithTopicName("2", "topic10"), filterWithTopicName("2", "topic11"));
            ConfluentAdminUtils.deleteAcls(adminClient, twoFilters, new DeleteAclsOptions(), AclState.ACTIVE).all().get();

            // Two new ACLs fit, the third one exceeds the limit again.
            adminClient.createAcls(Arrays.asList(topicAcl("2", "topic21"), topicAcl("2", "topic22"))).all().get();
            TestUtils.assertFutureThrows(adminClient.createAcls(Collections.singleton(topicAcl("2", "topic23"))).all(), InvalidRequestException.class);
        }
    }

    /**
     * Checks the response of the plain {@code deleteAcls} call when the filter matches one ACL
     * that was already deleted with {@link AclState#ACTIVE} ("topic1") and one that is still
     * active ("topic12"): afterwards nothing is described under {@link AclState#ANY}, while the
     * delete response lists only the "topic12" binding.
     *
     * @param str quorum name supplied by {@code @ValueSource}; not used by the test body
     * @throws Exception if any admin call fails unexpectedly
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAclDeleteResponse(String str) throws Exception {
        try (ConfluentAdmin adminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"))) {
            // First binding: created, then deleted while active.
            adminClient.createAcls(Collections.singleton(topicAcl("2", "topic1"))).all().get();
            AclBindingFilter userFilter = topicFilter("2");
            ConfluentAdminUtils.deleteAcls(adminClient, Collections.singletonList(userFilter), new DeleteAclsOptions(), AclState.ACTIVE).all().get();

            // Second binding: created and left active.
            AclBinding activeBinding = topicAcl("2", "topic12");
            adminClient.createAcls(Collections.singleton(activeBinding)).all().get();

            // Plain delete with a filter matching both bindings.
            DeleteAclsResult plainDelete = adminClient.deleteAcls(Collections.singletonList(userFilter));

            TestUtils.retryOnExceptionWithTimeout(() -> {
                Collection<AclBinding> remaining = ConfluentAdminUtils.describeAcls(adminClient, userFilter, new DescribeAclsOptions(), AclState.ANY).values().get();
                Assertions.assertEquals(0, remaining.size());
            });

            DeleteAclsResult.FilterResults reported = plainDelete.values().get(userFilter).get();
            Assertions.assertEquals(1, reported.values().size());
            Assertions.assertEquals(activeBinding, reported.values().get(0).binding());
        }
    }

    /**
     * Creates an ALLOW-ALL binding from any host ("*") on a literal topic for a "User" principal.
     *
     * @param str  principal name (combined with the principal type "User")
     * @param str2 topic name
     * @return the ACL binding
     */
    private AclBinding topicAcl(String str, String str2) {
        ResourcePattern literalTopic = new ResourcePattern(ResourceType.TOPIC, str2, PatternType.LITERAL);
        String principal = new KafkaPrincipal("User", str).toString();
        AccessControlEntry allowEverything = new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW);
        return new AclBinding(literalTopic, allowEverything);
    }

    /**
     * Convenience overload of {@link #topicFilter(String, String)} using the principal type "User".
     *
     * @param str principal name
     * @return a filter for all ACL entries of that principal
     */
    private AclBindingFilter topicFilter(String str) {
        final String principalType = "User";
        return topicFilter(str, principalType);
    }

    /**
     * Creates a filter for the given principal with every other criterion left open: resource
     * type ANY, no resource name, pattern type ANY, no host, operation ANY and permission ANY.
     *
     * @param str  principal name
     * @param str2 principal type
     * @return the ACL binding filter
     */
    private AclBindingFilter topicFilter(String str, String str2) {
        final String unrestricted = null;
        ResourcePatternFilter anyResource = new ResourcePatternFilter(ResourceType.ANY, unrestricted, PatternType.ANY);
        String principal = new KafkaPrincipal(str2, str).toString();
        AccessControlEntryFilter anyEntryOfPrincipal = new AccessControlEntryFilter(principal, unrestricted, AclOperation.ANY, AclPermissionType.ANY);
        return new AclBindingFilter(anyResource, anyEntryOfPrincipal);
    }

    /**
     * Creates a filter for a "User" principal restricted to one resource name; resource type,
     * pattern type, host, operation and permission are left open.
     *
     * @param str  principal name (combined with the principal type "User")
     * @param str2 resource name to match
     * @return the ACL binding filter
     */
    private AclBindingFilter filterWithTopicName(String str, String str2) {
        ResourcePatternFilter namedResource = new ResourcePatternFilter(ResourceType.ANY, str2, PatternType.ANY);
        String principal = new KafkaPrincipal("User", str).toString();
        final String anyHost = null;
        AccessControlEntryFilter anyEntryOfPrincipal = new AccessControlEntryFilter(principal, anyHost, AclOperation.ANY, AclPermissionType.ANY);
        return new AclBindingFilter(namedResource, anyEntryOfPrincipal);
    }

    /**
     * Wraps a single topic definition (3 partitions, replication factor 1) in an immutable list.
     *
     * @param str topic name
     * @return a one-element list with the topic definition
     */
    private List<NewTopic> topicsList(String str) {
        final int partitions = 3;
        final short replicationFactor = (short) 1;
        NewTopic topic = new NewTopic(str, partitions, replicationFactor);
        return Collections.singletonList(topic);
    }

    /**
     * Reads an API-key file from the classpath (resolved relative to
     * {@code FileBasedPlainSaslAuthenticatorTest}) and produces its content to the
     * "_confluent-apikey" topic through the cluster.
     *
     * @param physicalCluster cluster whose Kafka cluster receives the API-key data
     * @param str             classpath resource name of the API-key file
     * @param str2            second argument passed to {@code produceApiKeysData} (the callers in
     *                        this file pass the API key name)
     * @throws Exception with the underlying failure attached as cause if reading or producing fails
     */
    private void loadApiKeys(PhysicalCluster physicalCluster, String str, String str2) throws Exception {
        // try-with-resources closes the resource stream on both the success and the failure path.
        try (BufferedInputStream content = new BufferedInputStream(FileBasedPlainSaslAuthenticatorTest.class.getResourceAsStream(str))) {
            physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", str2, Utils.readFullyToString(content), true);
        } catch (Exception e) {
            // Keep the original exception as the cause so the real reason is not lost.
            throw new Exception("Couldn't read apikeys content", e);
        }
    }
}
