package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.BasePhysicalClusterMetadata;
import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.FileBasedPlainSaslAuthenticatorTest;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/TopicBasedPlainSaslAuthIntegrationTest.class */
public class TopicBasedPlainSaslAuthIntegrationTest {
    private IntegrationTestHarness testHarness;
    private PhysicalCluster physicalCluster;
    private static final String JSON_WITH_MULTIPLE_KEYS = String.join("\n", "{", "  \"keys\": {", "    \"key1\": {", "      \"user_id\": \"user1\",", "      \"logical_cluster_id\": \"myCluster\",", "      \"sasl_mechanism\": \"PLAIN\",", "      \"hashed_secret\": \"no hash\",", "      \"hash_function\": \"none\"", "    },", "    \"key2\": {", "      \"user_id\": \"user2\",", "      \"logical_cluster_id\": \"myCluster2\",", "      \"sasl_mechanism\": \"SSL\",", "      \"hashed_secret\": \"no hash\",", "      \"hash_function\": \"none\"", "    }", "  }", "}");
    private final String logicalClusterId = Utils.LC_META_ABC.logicalClusterId();
    private final String serviceUserAPIkey = "APIKEY1";
    private final String serviceUserAPIkeyPassword = "pwd1";
    private final int numBrokers = 3;
    private final String apiKeysTopic = "_confluent-apikey";
    private final String lkcMetadataTopic = "_confluent-logical_clusters";
    private final String testTopic = "abcd";
    private final List<NewTopic> sampleTopics = Collections.singletonList(new NewTopic("abcd", 3, 1));
    private long baseSequenceId = 1000;

    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        this.testHarness = new IntegrationTestHarness(testInfo, 3);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.testHarness.shutdown();
    }

    private void startWithTopic() throws Exception {
        long millis = 15000 + TimeUnit.SECONDS.toMillis(3L);
        this.physicalCluster = this.testHarness.startWithTopic(Arrays.asList("_confluent-apikey", "_confluent-logical_clusters"), 1, 1, 15000L, brokerProps(millis, true), nodeProps(millis), Optional.empty());
        LogicalClusterUser user = this.physicalCluster.createLogicalCluster(this.logicalClusterId, 100, 1).user(1);
        this.testHarness.newAclCommand().addTopicAclArgs(user.prefixedKafkaPrincipal(), user.withPrefix("abcd"), AclOperation.ALL, PatternType.LITERAL).execute();
    }

    private Properties brokerProps(long j, boolean z) throws IOException, InterruptedException {
        Properties properties = new Properties();
        properties.put("listeners", "INTERNAL://localhost:0, EXTERNAL://localhost:0");
        properties.put("advertised.listeners", "INTERNAL://localhost:0, EXTERNAL://localhost:0");
        properties.put("listener.security.protocol.map", "INTERNAL:PLAINTEXT, EXTERNAL:SASL_PLAINTEXT");
        properties.put("inter.broker.listener.name", "INTERNAL");
        properties.putAll(nodeProps(j));
        if (z) {
            setupClusterMetadata(properties);
        }
        return properties;
    }

    private Properties nodeProps(long j) throws IOException {
        Properties properties = new Properties();
        properties.put("sasl.enabled.mechanisms", Collections.singletonList("PLAIN"));
        properties.put("listener.name.external.principal.builder.class", MultiTenantPrincipalBuilder.class.getName());
        properties.put("listener.name.external.confluent.security.event.logger.authentication.enable", "true");
        properties.put(KafkaConfig$.MODULE$.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        properties.put("confluent.security.event.logger.multitenant.enable", "true");
        properties.put("listener.name.external.plain.sasl.jaas.config", "io.confluent.kafka.server.plugins.auth.TopicBasedLoginModule required;");
        properties.put("confluent.multitenant.listener.names", "EXTERNAL");
        properties.put("confluent.cdc.api.keys.topic", "_confluent-apikey");
        properties.put("confluent.cdc.api.keys.load.timeout.ms", String.valueOf(j));
        properties.put(MockAuditLogProvider.AUDIT_PROVIDER_CONFIG, "TEST");
        properties.put("confluent.close.connections.on.credential.delete", "true");
        properties.put("sasl.server.max.receive.size", "1024");
        return properties;
    }

    private void setupClusterMetadata(Properties properties) {
        properties.put("confluent.cdc.lkc.metadata.topic", "_confluent-logical_clusters");
        properties.put("confluent.cdc.api.keys.load.timeout.ms", Long.valueOf(15000 + TimeUnit.SECONDS.toMillis(3L)));
        properties.put("multitenant.metadata.class", TopicBasedPhysicalClusterMetadata.class.getName());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSuccessfulAuthentication(String str) throws Exception {
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));
        Throwable th = null;
        try {
            try {
                createPlainAuthAdminClient.createTopics(this.sampleTopics).all().get();
                Assertions.assertTrue(((Set) createPlainAuthAdminClient.listTopics().names().get()).containsAll((List) this.sampleTopics.stream().map((v0) -> {
                    return v0.name();
                }).collect(Collectors.toList())));
                if (createPlainAuthAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createPlainAuthAdminClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createPlainAuthAdminClient.close();
                    }
                }
                ConfluentAuthenticationEvent lastAuthenticationEvent = getLastAuthenticationEvent();
                Assertions.assertTrue(lastAuthenticationEvent.principal().isPresent());
                Assertions.assertEquals("User", ((KafkaPrincipal) lastAuthenticationEvent.principal().get()).getPrincipalType());
                Assertions.assertEquals("1", ((KafkaPrincipal) lastAuthenticationEvent.principal().get()).getName());
                Assertions.assertEquals(AuditEventStatus.SUCCESS, lastAuthenticationEvent.status());
                Assertions.assertFalse(((KafkaPrincipal) lastAuthenticationEvent.principal().get()).toString().contains("tenantMetadata"));
                Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("kafka-cluster=lkc-abc"));
                Assertions.assertEquals("1", lastAuthenticationEvent.authenticationContext().server().getAuthorizationID());
            } finally {
            }
        } catch (Throwable th3) {
            if (createPlainAuthAdminClient != null) {
                if (th != null) {
                    try {
                        createPlainAuthAdminClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createPlainAuthAdminClient.close();
                }
            }
            throw th3;
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidPassword(String str) throws Exception {
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "WrongPassword"));
        Throwable th = null;
        try {
            try {
                TestUtils.assertFutureError(createPlainAuthAdminClient.createTopics(this.sampleTopics).all(), SaslAuthenticationException.class);
                if (createPlainAuthAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createPlainAuthAdminClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createPlainAuthAdminClient.close();
                    }
                }
                ConfluentAuthenticationEvent lastAuthenticationEvent = getLastAuthenticationEvent();
                Assertions.assertFalse(lastAuthenticationEvent.principal().isPresent());
                Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, lastAuthenticationEvent.status());
                Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("kafka-cluster=lkc-abc"));
                Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("organization=my-org2"));
                Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("environment=my-env2"));
                Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("cloud-cluster=lkc-abc"));
                Assertions.assertTrue(lastAuthenticationEvent.authenticationException().isPresent());
                Assertions.assertEquals(lastAuthenticationEvent.data().get("principal-resource-id"), "u-1");
                AuthenticationErrorInfo errorInfo = ((AuthenticationException) lastAuthenticationEvent.authenticationException().get()).errorInfo();
                Assertions.assertTrue(errorInfo.errorMessage().contains("Bad password for user APIKEY1"));
                Assertions.assertEquals("APIKEY1", errorInfo.identifier());
                Assertions.assertEquals("lkc-abc", errorInfo.clusterId());
            } finally {
            }
        } catch (Throwable th3) {
            if (createPlainAuthAdminClient != null) {
                if (th != null) {
                    try {
                        createPlainAuthAdminClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createPlainAuthAdminClient.close();
                }
            }
            throw th3;
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testUnknownUser(String str) throws Exception {
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("UnknownUser", "WrongPassword"));
        Throwable th = null;
        try {
            try {
                TestUtils.assertFutureError(createPlainAuthAdminClient.createTopics(this.sampleTopics).all(), SaslAuthenticationException.class);
                if (createPlainAuthAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createPlainAuthAdminClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createPlainAuthAdminClient.close();
                    }
                }
                ConfluentAuthenticationEvent lastAuthenticationEvent = getLastAuthenticationEvent();
                Assertions.assertFalse(lastAuthenticationEvent.principal().isPresent());
                Assertions.assertEquals(AuditEventStatus.UNKNOWN_USER_DENIED, lastAuthenticationEvent.status());
                Assertions.assertTrue(lastAuthenticationEvent.authenticationException().isPresent());
                AuthenticationErrorInfo errorInfo = ((AuthenticationException) lastAuthenticationEvent.authenticationException().get()).errorInfo();
                Assertions.assertTrue(errorInfo.errorMessage().contains("Unknown user UnknownUser"));
                Assertions.assertEquals("UnknownUser", errorInfo.identifier());
            } finally {
            }
        } catch (Throwable th3) {
            if (createPlainAuthAdminClient != null) {
                if (th != null) {
                    try {
                        createPlainAuthAdminClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createPlainAuthAdminClient.close();
                }
            }
            throw th3;
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testNoSecretsInTopic(String str) throws Exception {
        startWithTopic();
        loadLKCMetadata();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));
        Throwable th = null;
        try {
            try {
                TestUtils.assertFutureError(createPlainAuthAdminClient.createTopics(this.sampleTopics).all(), SaslAuthenticationException.class);
                if (createPlainAuthAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createPlainAuthAdminClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createPlainAuthAdminClient.close();
                    }
                }
                ConfluentAuthenticationEvent lastAuthenticationEvent = getLastAuthenticationEvent();
                Assertions.assertFalse(lastAuthenticationEvent.principal().isPresent());
                Assertions.assertEquals(AuditEventStatus.UNKNOWN_USER_DENIED, lastAuthenticationEvent.status());
                Assertions.assertTrue(lastAuthenticationEvent.authenticationException().isPresent());
                Assertions.assertTrue(((AuthenticationException) lastAuthenticationEvent.authenticationException().get()).errorInfo().errorMessage().contains("Unknown user"));
            } finally {
            }
        } catch (Throwable th3) {
            if (createPlainAuthAdminClient != null) {
                if (th != null) {
                    try {
                        createPlainAuthAdminClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createPlainAuthAdminClient.close();
                }
            }
            throw th3;
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testFailureToConnectWithNoTopic(String str) {
        long j = 3000;
        Exception exc = (Exception) Assertions.assertThrows(RuntimeException.class, () -> {
            this.testHarness.start(brokerProps(j, false));
        }, "No topic should cause the broker to fail to start");
        Assertions.assertEquals("Received a fatal error while waiting for all of the authorizer futures to be completed.", exc.getMessage());
        String str2 = exc.getCause() == null ? "(cause is null)" : (String) Optional.ofNullable(exc.getCause().getMessage()).orElse("(null)");
        Assertions.assertTrue(str2.contains("Unable to start the consumer for MultiTenantSaslSecretsStore"), "Unexpected value of exception cause message: " + str2);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testApiKeyDelete(String str) throws Exception {
        startWithTopic();
        loadApiKeys();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));
        createPlainAuthAdminClient.createTopics(this.sampleTopics).all().get();
        Assertions.assertTrue(((Set) createPlainAuthAdminClient.listTopics().names().get()).containsAll((List) this.sampleTopics.stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList())));
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "APIKEY1", null, true);
        TestUtils.waitForCondition(() -> {
            try {
                createPlainAuthAdminClient.listTopics().listings().get(15L, TimeUnit.SECONDS);
                return false;
            } catch (Exception e) {
                return true;
            }
        }, "Connection for deleted API key not terminated");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidSecretsHandledGracefully(String str) throws Exception {
        startWithTopic();
        loadApiKeys();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));
        createPlainAuthAdminClient.createTopics(this.sampleTopics).all().get();
        Assertions.assertTrue(((Set) createPlainAuthAdminClient.listTopics().names().get()).containsAll((List) this.sampleTopics.stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList())));
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "some_key", JSON_WITH_MULTIPLE_KEYS, false);
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "some_other_key", "NOT A JSON MESSAGE", false);
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "APIKEY1", null, true);
        TestUtils.waitForCondition(() -> {
            try {
                createPlainAuthAdminClient.listTopics().listings().get(15L, TimeUnit.SECONDS);
                return false;
            } catch (Exception e) {
                return true;
            }
        }, "Connection for deleted API key not terminated");
    }

    private void loadApiKeys() throws Exception {
        try {
            this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "APIKEY1", org.apache.kafka.common.utils.Utils.readFullyToString((BufferedInputStream) FileBasedPlainSaslAuthenticatorTest.class.getResource("/file_auth_test_apikeys.json").getContent()), true);
        } catch (IOException e) {
            throw new Exception("Couldn't read apikeys content");
        }
    }

    private void loadLKCMetadata() throws InterruptedException {
        List<BasePhysicalClusterMetadata> clusterMetadataInstances = this.physicalCluster.clusterMetadataInstances();
        String logicalClusterId = Utils.LC_META_ABC.logicalClusterId();
        this.physicalCluster.kafkaCluster().produceLCMData("_confluent-logical_clusters", this.baseSequenceId + 1, logicalClusterId, Utils.LC_META_ABC);
        TestUtils.waitForCondition(() -> {
            return clusterMetadataInstances.stream().allMatch(basePhysicalClusterMetadata -> {
                return basePhysicalClusterMetadata.metadata(logicalClusterId) != null;
            });
        }, "Expected metadata to get consumed");
    }

    private ConfluentAuthenticationEvent getLastAuthenticationEvent() throws Exception {
        ConfluentAuthenticationEvent lastAuthenticationEntry;
        for (int i = 0; i < 3; i++) {
            try {
                lastAuthenticationEntry = MockAuditLogProvider.getInstance("uuid" + i).lastAuthenticationEntry();
            } catch (Exception e) {
            }
            if (lastAuthenticationEntry != null) {
                return lastAuthenticationEntry;
            }
        }
        throw new Exception("Couldn't find the last authentication event");
    }
}
