package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.test.cluster.EmbeddedKafka;
import io.confluent.security.authorizer.AclAccessRule;
import io.confluent.security.authorizer.AuthorizeResult;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.authorizer.provider.ConfluentAuthorizationEvent;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.security.sasl.SaslServer;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.server.audit.DefaultAuthenticationEvent;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantAuditLogTest.class */
public class MultiTenantAuditLogTest {
    // Harness created by startTestHarness() and shut down in tearDown().
    protected IntegrationTestHarness testHarness;
    // Value stored under the BrokerSessionUuidProp key in brokerProps(); also the key passed to
    // MockAuditLogProvider.getInstance(...) when reading broker-side events.
    protected String brokerUUID = "broker-uuid";
    // Value stored under the BrokerSessionUuidProp key in controllerProps().
    protected String controllerUUID = "controller-uuid";
    // Topic name for the produce/consume tests.
    private final String topic = "test.topic";
    // Consumer group name for the produce/consume tests.
    private final String consumerGroup = "test.consumer.group";
    // Id of the logical cluster created for each test.
    private final String logicalClusterId = "lkc-1234";
    // Cluster returned by testHarness.start(...).
    protected PhysicalCluster physicalCluster;
    // Logical cluster created on physicalCluster in startTestHarness().
    private LogicalCluster logicalCluster;
    // logicalCluster.user(1).
    private LogicalClusterUser user1;
    // logicalCluster.user(2).
    private LogicalClusterUser user2;
    // JUnit TestInfo captured in setUp() and handed to the IntegrationTestHarness constructor.
    private TestInfo testInfo;

    /**
     * Resets the shared {@link MockAuditLogProvider} and remembers the current {@link TestInfo}
     * so that {@code startTestHarness} can pass it to the harness it creates.
     *
     * @param testInfo JUnit-supplied description of the test about to run
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        MockAuditLogProvider.reset();
        this.testInfo = testInfo;
    }

    /**
     * Shuts down the test harness after each test.
     *
     * <p>The harness is only created inside {@code startTestHarness}; if a test failed before
     * (or while) creating it, the field is still {@code null}. Guard against that so the
     * original failure is reported instead of a secondary {@link NullPointerException}.
     */
    @AfterEach
    public void tearDown() throws Exception {
        if (this.testHarness != null) {
            this.testHarness.shutdown();
        }
    }

    /**
     * Runs {@code TestUtils.verifyNoUnexpectedThreads()} once before any test in this class.
     */
    @BeforeAll
    public static void verifyUnexpectedBefore() throws Exception {
        TestUtils.verifyNoUnexpectedThreads();
    }

    /**
     * Runs {@code TestUtils.verifyNoUnexpectedThreads()} once after all tests in this class.
     */
    @AfterAll
    public static void verifyUnexpectedAfter() throws Exception {
        TestUtils.verifyNoUnexpectedThreads();
    }

    /**
     * Creates the harness, starts the physical cluster, creates the logical cluster with its two
     * users, and waits (up to 10 seconds) for {@code auditLoggerReady()} to report true.
     *
     * @param auditLogEnabled forwarded to {@code brokerProps} / {@code controllerProps} to decide
     *                        whether the multi-tenant audit logger is switched on
     */
    protected void startTestHarness(boolean auditLogEnabled) throws Exception {
        IntegrationTestHarness harness = new IntegrationTestHarness(this.testInfo);
        this.testHarness = harness;

        Properties brokerConfig = brokerProps(auditLogEnabled);
        Properties controllerConfig = controllerProps(auditLogEnabled);
        this.physicalCluster = harness.start(brokerConfig, controllerConfig);

        this.logicalCluster =
            this.physicalCluster.createLogicalCluster(this.logicalClusterId, "testOrg", "testEnv", 100, 1, 2);
        this.user1 = this.logicalCluster.user(1);
        this.user2 = this.logicalCluster.user(2);

        TestUtils.waitForCondition(this::auditLoggerReady, 10000L, "Audit Logger Ready");
    }

    /**
     * Readiness predicate polled by {@code startTestHarness}.
     *
     * <p>Returns {@code false} while the cluster has no brokers, or if a broker's authorizer is
     * not a {@link MultiTenantAuthorizer}; otherwise returns {@code true}.
     *
     * <p>NOTE(review): the loop stops at the first broker whose authorizer reports audit logging
     * as disabled, but the method still returns {@code true} in that case, so the
     * {@code isAuditLogEnabled()} result never influences the outcome. This mirrors the existing
     * behaviour exactly; confirm against the intended semantics before tightening it, since tests
     * that start the harness with audit logging disabled also wait on this predicate.
     *
     * @return {@code true} once at least one broker exists and every inspected authorizer is a
     *         {@link MultiTenantAuthorizer}
     */
    private boolean auditLoggerReady() {
        try {
            if (this.physicalCluster.kafkaCluster().kafkas().isEmpty()) {
                return false;
            }
            for (EmbeddedKafka kafka : this.physicalCluster.kafkaCluster().kafkas()) {
                MultiTenantAuthorizer authorizer =
                    (MultiTenantAuthorizer) kafka.kafkaBroker().authorizer().get();
                if (!authorizer.isAuditLogEnabled()) {
                    break;
                }
            }
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    /**
     * With the multi-tenant audit logger left disabled, a successful produce/consume round trip
     * must leave the broker's authorization log empty.
     *
     * @param quorum quorum mode label supplied by {@code @ValueSource}; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDisabled(String quorum) throws Throwable {
        startTestHarness(false);

        addProducerAcls(this.user1, this.topic, PatternType.LITERAL);
        addConsumerAcls(this.user2, this.topic, this.consumerGroup, PatternType.LITERAL);
        this.testHarness.produceConsume(this.user1, this.user2, this.topic, this.consumerGroup, 0);

        boolean nothingLogged = MockAuditLogProvider.getInstance(this.brokerUUID).authorizationLog.isEmpty();
        Assertions.assertTrue(nothingLogged);
    }

    /**
     * Grants user1 CREATE on topic "foo", creates the topic through an admin client, and checks
     * that exactly one authorization event was logged for user1 with the expected result,
     * principal, resource and request id.
     *
     * <p>Events are read from the controller's provider instance when the harness reports KRaft
     * mode, otherwise from the broker's. The session id is only asserted outside KRaft mode.
     *
     * @param quorum quorum mode label supplied by {@code @ValueSource}; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCreateTopicAuthorization(String quorum) throws Exception {
        startTestHarness(true);
        this.testHarness.newAclCommand()
            .addTopicAclArgs(this.user1.prefixedKafkaPrincipal(), this.user1.withPrefix("foo"), AclOperation.CREATE, PatternType.LITERAL)
            .execute();

        // try-with-resources closes the client exactly once, on both success and failure.
        try (AdminClient adminClient = this.testHarness.createAdminClient(this.user1)) {
            adminClient.createTopics(Collections.singleton(new NewTopic("foo", Optional.empty(), Optional.empty()))).all().get();
        }

        String auditedNodeUuid = this.testHarness.isKraft() ? this.controllerUUID : this.brokerUUID;
        List<ConfluentAuthorizationEvent> events = collectUserAuthorizationEvents(auditedNodeUuid, this.user1);
        Assertions.assertEquals(1, events.size());

        ConfluentAuthorizationEvent event = events.get(0);
        Assertions.assertEquals(AuthorizeResult.ALLOWED, event.authorizeResult());
        Assertions.assertEquals(this.user1.unprefixedKafkaPrincipal(), event.requestContext().principal());
        Assertions.assertEquals("Topic", event.action().resourceType().name());
        Assertions.assertEquals("foo", event.action().resourceName());
        Assertions.assertTrue(event.requestContext().kafkaRequestId() > 0);
        if (!this.testHarness.isKraft()) {
            Assertions.assertTrue(event.requestContext().sessionId() > 0);
        }
    }

    /**
     * Returns the authorization events recorded by the given provider instance whose request
     * principal has the same string form as the user's unprefixed Kafka principal.
     *
     * @param nodeUuid key passed to {@code MockAuditLogProvider.getInstance}
     * @param user     logical cluster user whose events are wanted
     * @return matching events, in the order the log's stream yields them
     */
    private List<ConfluentAuthorizationEvent> collectUserAuthorizationEvents(String nodeUuid, LogicalClusterUser user) {
        String wantedPrincipal = user.unprefixedKafkaPrincipal().toString();
        return MockAuditLogProvider.getInstance(nodeUuid).authorizationLog.stream()
            .filter(event -> event.requestContext().principal().toString().equals(wantedPrincipal))
            .collect(Collectors.toList());
    }

    /**
     * Runs a produce/consume round trip under LITERAL ACLs with audit logging enabled and checks
     * the broker's authorization log:
     * <ul>
     *   <li>exactly one ACL-backed Write event for User:1 on the topic;</li>
     *   <li>at least one ACL-backed Read event for User:2 on the topic and on the consumer group;</li>
     *   <li>no event principal contains "TenantUser:";</li>
     *   <li>every User:1 / User:2 event carries the logical cluster's scope as both source scope
     *       and action scope, and is backed by an ACL rule whose resource pattern name does not
     *       contain the logical cluster id and whose ACL principal does not start with
     *       "TenantUser:".</li>
     * </ul>
     *
     * @param quorum quorum mode label supplied by {@code @ValueSource}; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testLiteralAcls(String quorum) throws Throwable {
        startTestHarness(true);
        addProducerAcls(this.user1, this.topic, PatternType.LITERAL);
        addConsumerAcls(this.user2, this.topic, this.consumerGroup, PatternType.LITERAL);
        this.testHarness.produceConsume(this.user1, this.user2, this.topic, this.consumerGroup, 0);

        Assertions.assertEquals(1, aclBackedBrokerEvents("User:1", this.topic, "Write").size());
        Assertions.assertFalse(aclBackedBrokerEvents("User:2", this.topic, "Read").isEmpty());
        Assertions.assertFalse(aclBackedBrokerEvents("User:2", this.consumerGroup, "Read").isEmpty());

        Assertions.assertFalse(MockAuditLogProvider.getInstance(this.brokerUUID).authorizationLog.stream()
            .anyMatch(event -> event.requestContext().principal().toString().contains("TenantUser:")));

        List<ConfluentAuthorizationEvent> userEvents = MockAuditLogProvider.getInstance(this.brokerUUID).authorizationLog.stream()
            .filter(event -> {
                String principal = event.requestContext().principal().toString();
                return principal.equals("User:1") || principal.equals("User:2");
            })
            .collect(Collectors.toList());

        Scope expectedScope = new Scope.Builder(new String[0])
            .addPath("organization=" + this.logicalCluster.orgId())
            .addPath("environment=" + this.logicalCluster.envId())
            .addPath("cloud-cluster=" + this.logicalCluster.logicalClusterId())
            .withKafkaCluster(this.logicalClusterId)
            .build();

        Assertions.assertTrue(userEvents.stream().allMatch(event -> event.sourceScope().equals(expectedScope)));
        Assertions.assertTrue(userEvents.stream().allMatch(event -> event.action().scope().equals(expectedScope)));
        // Tenant-internal names must not leak: no cluster-id prefix on the resource pattern and
        // no TenantUser principal on the ACL binding.
        Assertions.assertTrue(userEvents.stream().allMatch(event ->
            event.authorizePolicy() instanceof AclAccessRule
                && !event.authorizePolicy().resourcePattern().name().contains(this.logicalClusterId)
                && !event.authorizePolicy().aclBinding().entry().principal().startsWith("TenantUser:")));
    }

    /**
     * Collects broker-side authorization events that match the given principal, resource name and
     * operation name and whose policy is an {@link AclAccessRule} with a resource pattern named
     * exactly like the resource.
     */
    private List<ConfluentAuthorizationEvent> aclBackedBrokerEvents(String principal, String resourceName, String operationName) {
        return MockAuditLogProvider.getInstance(this.brokerUUID).authorizationLog.stream()
            .filter(event -> event.requestContext().principal().toString().equals(principal)
                && event.action().resourceName().equals(resourceName)
                && event.action().operation().name().equals(operationName)
                && event.authorizePolicy() instanceof AclAccessRule
                && event.authorizePolicy().resourcePattern().name().equals(resourceName))
            .collect(Collectors.toList());
    }

    /**
     * Issues a broker-level describeConfigs as user1 (who has no ACLs for it) and checks that the
     * broker logged exactly one DENIED DescribeConfigs event for User:1 on "kafka-cluster".
     *
     * <p>An {@link ExecutionException} caused by {@link ClusterAuthorizationException} is the
     * tolerated outcome of the call; any other cause is rethrown.
     *
     * @param quorum quorum mode label supplied by {@code @ValueSource}; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testClusterResource(String quorum) throws Throwable {
        startTestHarness(true);

        // try-with-resources guarantees the client is closed even when the call is rejected.
        try (AdminClient adminClient = this.testHarness.createAdminClient(this.user1)) {
            adminClient.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof ClusterAuthorizationException)) {
                throw e;
            }
        }

        List<ConfluentAuthorizationEvent> deniedEvents = MockAuditLogProvider.getInstance(this.brokerUUID).authorizationLog.stream()
            .filter(event -> event.requestContext().principal().toString().equals("User:1")
                && event.action().resourceName().equals("kafka-cluster")
                && event.action().operation().name().equals("DescribeConfigs")
                && event.authorizeResult() == AuthorizeResult.DENIED)
            .collect(Collectors.toList());
        Assertions.assertEquals(1, deniedEvents.size());
    }

    /**
     * Builds the broker configuration: the common node properties plus this test's broker UUID
     * stored under the {@code BrokerSessionUuidProp} key.
     *
     * @param auditLogEnabled forwarded to {@code nodeProps}
     * @return the broker properties
     */
    protected Properties brokerProps(boolean auditLogEnabled) throws IOException {
        Properties brokerConfig = nodeProps(auditLogEnabled);
        String sessionUuidKey = KafkaConfig$.MODULE$.BrokerSessionUuidProp();
        brokerConfig.put(sessionUuidKey, this.brokerUUID);
        return brokerConfig;
    }

    /**
     * Builds the controller configuration: the common node properties plus this test's
     * controller UUID stored under the {@code BrokerSessionUuidProp} key.
     *
     * @param auditLogEnabled forwarded to {@code nodeProps}
     * @return the controller properties
     */
    protected Properties controllerProps(boolean auditLogEnabled) throws IOException {
        Properties controllerConfig = nodeProps(auditLogEnabled);
        String sessionUuidKey = KafkaConfig$.MODULE$.BrokerSessionUuidProp();
        controllerConfig.put(sessionUuidKey, this.controllerUUID);
        return controllerConfig;
    }

    /**
     * Builds the properties shared by brokers and controllers: the {@link MultiTenantAuthorizer}
     * as authorizer class, a per-tenant ACL limit of 100, the "TEST" audit provider, and, only
     * when requested, the flag enabling the multi-tenant security event logger.
     *
     * @param auditLogEnabled whether to set
     *                        {@code confluent.security.event.logger.multitenant.enable=true}
     * @return a fresh {@link Properties} instance
     */
    protected Properties nodeProps(boolean auditLogEnabled) throws IOException {
        Properties config = new Properties();

        String authorizerKey = KafkaConfig$.MODULE$.AuthorizerClassNameProp();
        config.put(authorizerKey, MultiTenantAuthorizer.class.getName());
        config.put("confluent.max.acls.per.tenant", "100");

        if (auditLogEnabled) {
            config.put("confluent.security.event.logger.multitenant.enable", "true");
        }

        config.put(MockAuditLogProvider.AUDIT_PROVIDER_CONFIG, "TEST");
        return config;
    }

    /**
     * Executes an ACL command built with {@code produceAclArgs} for the user's prefixed principal
     * on the tenant-prefixed topic name.
     *
     * @param user        user receiving the ACLs
     * @param topicName   unprefixed topic name; prefixed via {@code user.withPrefix}
     * @param patternType pattern type passed through to the ACL command
     */
    private void addProducerAcls(LogicalClusterUser user, String topicName, PatternType patternType) {
        this.testHarness.newAclCommand()
            .produceAclArgs(user.prefixedKafkaPrincipal(), user.withPrefix(topicName), patternType)
            .execute();
    }

    /**
     * Executes an ACL command built with {@code consumeAclArgs} for the user's prefixed principal
     * on the tenant-prefixed topic and consumer group names.
     *
     * @param user        user receiving the ACLs
     * @param topicName   unprefixed topic name; prefixed via {@code user.withPrefix}
     * @param groupName   unprefixed consumer group name; prefixed via {@code user.withPrefix}
     * @param patternType pattern type passed through to the ACL command
     */
    private void addConsumerAcls(LogicalClusterUser user, String topicName, String groupName, PatternType patternType) {
        this.testHarness.newAclCommand()
            .consumeAclArgs(user.prefixedKafkaPrincipal(), user.withPrefix(topicName), user.withPrefix(groupName), patternType)
            .execute();
    }

    /**
     * Feeds authentication events directly into the broker's {@link MockAuditLogProvider} and
     * checks what the provider reports back as its last authentication entry:
     * <ul>
     *   <li>a successful event for a {@link MultiTenantPrincipal} is reported with a "User"
     *       principal without tenant metadata, scoped to the tenant's cluster rather than the
     *       scope passed in;</li>
     *   <li>failed events without a principal are reported with no principal;</li>
     *   <li>a failed event whose error info names cluster "lkc123" is reported with that cluster
     *       in its scope.</li>
     * </ul>
     *
     * @param quorum quorum mode label supplied by {@code @ValueSource}; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAuthenticationEvent(String quorum) throws Throwable {
        startTestHarness(true);
        MockAuditLogProvider auditLog = MockAuditLogProvider.getInstance(this.brokerUUID);
        final long sessionId = 1111111111111L;

        MultiTenantPrincipal tenantPrincipal = new MultiTenantPrincipal("0", new TenantMetadata("lkc-12345", "lkc-12345"));
        Assertions.assertEquals("TenantUser", tenantPrincipal.getPrincipalType());
        Assertions.assertTrue(tenantPrincipal.toString().contains("tenantMetadata"));

        SaslAuthenticationContext authContext = new SaslAuthenticationContext(
            sessionId,
            Mockito.mock(SaslServer.class),
            SecurityProtocol.SASL_PLAINTEXT,
            InetAddress.getLocalHost(),
            SecurityProtocol.SASL_SSL.name());
        Scope suppliedScope = Scope.kafkaClusterScope("ABC123");

        // Successful authentication of a tenant principal.
        auditLog.logEvent(new ConfluentAuthenticationEvent(
            new DefaultAuthenticationEvent(tenantPrincipal, authContext, AuditEventStatus.SUCCESS), suppliedScope));
        ConfluentAuthenticationEvent successEntry = auditLog.lastAuthenticationEntry();
        KafkaPrincipal loggedPrincipal = (KafkaPrincipal) successEntry.principal().get();
        Assertions.assertEquals("User:0", loggedPrincipal.toString());
        Assertions.assertEquals("User", loggedPrincipal.getPrincipalType());
        Assertions.assertFalse(loggedPrincipal.toString().contains("tenantMetadata"));
        Assertions.assertTrue(successEntry.getScope().toString().contains("kafka-cluster=lkc-12345"));
        Assertions.assertFalse(successEntry.getScope().toString().contains("ABC123"));
        Assertions.assertEquals(sessionId, successEntry.authenticationContext().sessionId());

        // Failure without principal and without error info.
        auditLog.logEvent(new ConfluentAuthenticationEvent(
            new DefaultAuthenticationEvent((KafkaPrincipal) null, authContext, AuditEventStatus.UNKNOWN_USER_DENIED,
                new SslAuthenticationException("Ssl handshake failed")), suppliedScope));
        ConfluentAuthenticationEvent handshakeFailureEntry = auditLog.lastAuthenticationEntry();
        Assertions.assertFalse(handshakeFailureEntry.principal().isPresent());
        // NOTE(review): a scope check on "lkc-1234" used to sit here behind a condition that was
        // always true, so it never asserted anything. Restore the intended check if it is known.

        // Failure without principal, with the generic unknown-user error info.
        auditLog.logEvent(new ConfluentAuthenticationEvent(
            new DefaultAuthenticationEvent((KafkaPrincipal) null, authContext, AuditEventStatus.UNKNOWN_USER_DENIED,
                new SslAuthenticationException("username not specified", AuthenticationErrorInfo.UNKNOWN_USER_ERROR)), suppliedScope));
        ConfluentAuthenticationEvent unknownUserEntry = auditLog.lastAuthenticationEntry();
        Assertions.assertFalse(unknownUserEntry.principal().isPresent());
        // NOTE(review): same always-true scope check on "lkc-1234" was removed here as well.

        // Failure without principal, with error info that names a cluster.
        auditLog.logEvent(new ConfluentAuthenticationEvent(
            new DefaultAuthenticationEvent((KafkaPrincipal) null, authContext, AuditEventStatus.UNAUTHENTICATED,
                new SaslAuthenticationException("Bad password for user",
                    new AuthenticationErrorInfo(AuditEventStatus.UNAUTHENTICATED, "", "APIKEY123", "lkc123"))), suppliedScope));
        ConfluentAuthenticationEvent badPasswordEntry = auditLog.lastAuthenticationEntry();
        Assertions.assertFalse(badPasswordEntry.principal().isPresent());
        Assertions.assertTrue(badPasswordEntry.getScope().toString().contains("lkc123"));
        Assertions.assertFalse(badPasswordEntry.getScope().toString().contains("lkc-1234"));
    }
}
