package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.server.plugins.auth.FileBasedPlainSaslAuthenticatorTest;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.test.JarResourceLoader;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tags({@Tag("integration"), @Tag("bazel:size:medium")})
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/TopicBasedPlainSaslAuthIntegrationTest.class */
public class TopicBasedPlainSaslAuthIntegrationTest extends AbstractTopicBasedPlainSaslAuthIntegrationTest {
    protected final String pbkdf2ServiceUserAPIkey = "APIKEY2";
    private static final String JSON_WITH_MULTIPLE_KEYS = String.join("\n", "{", "  \"keys\": {", "    \"key1\": {", "      \"user_id\": \"user1\",", "      \"logical_cluster_id\": \"myCluster\",", "      \"sasl_mechanism\": \"PLAIN\",", "      \"hashed_secret\": \"no hash\",", "      \"hash_function\": \"none\"", "    },", "    \"key2\": {", "      \"user_id\": \"user2\",", "      \"logical_cluster_id\": \"myCluster2\",", "      \"sasl_mechanism\": \"SSL\",", "      \"hashed_secret\": \"no hash\",", "      \"hash_function\": \"none\"", "    }", "  }", "}");

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSuccessfulAuthentication(String str) throws Exception {
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));
        try {
            createPlainAuthAdminClient.createTopics(this.sampleTopics).all().get();
            List list = (List) this.sampleTopics.stream().map((v0) -> {
                return v0.name();
            }).collect(Collectors.toList());
            TestUtils.retryOnExceptionWithTimeout(() -> {
                Assertions.assertTrue(((Set) createPlainAuthAdminClient.listTopics().names().get()).containsAll(list));
            });
            if (createPlainAuthAdminClient != null) {
                createPlainAuthAdminClient.close();
            }
            ConfluentAuthenticationEvent lastAuthenticationEvent = getLastAuthenticationEvent();
            Assertions.assertTrue(lastAuthenticationEvent.principal().isPresent());
            Assertions.assertEquals("User", ((KafkaPrincipal) lastAuthenticationEvent.principal().get()).getPrincipalType());
            Assertions.assertEquals("1", ((KafkaPrincipal) lastAuthenticationEvent.principal().get()).getName());
            Assertions.assertEquals(AuditEventStatus.SUCCESS, lastAuthenticationEvent.status());
            Assertions.assertFalse(((KafkaPrincipal) lastAuthenticationEvent.principal().get()).toString().contains("tenantMetadata"));
            Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("kafka-cluster=lkc-abc"));
            Assertions.assertEquals("1", lastAuthenticationEvent.authenticationContext().server().getAuthorizationID());
        } catch (Throwable th) {
            if (createPlainAuthAdminClient != null) {
                try {
                    createPlainAuthAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidPassword(String str) throws Exception {
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "WrongPassword"));
        try {
            TestUtils.assertFutureError(createPlainAuthAdminClient.createTopics(this.sampleTopics).all(), SaslAuthenticationException.class);
            if (createPlainAuthAdminClient != null) {
                createPlainAuthAdminClient.close();
            }
            ConfluentAuthenticationEvent lastAuthenticationEvent = getLastAuthenticationEvent();
            Assertions.assertFalse(lastAuthenticationEvent.principal().isPresent());
            Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, lastAuthenticationEvent.status());
            Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("kafka-cluster=lkc-abc"));
            Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("organization=my-org2"));
            Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("environment=my-env2"));
            Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("cloud-cluster=lkc-abc"));
            Assertions.assertTrue(lastAuthenticationEvent.authenticationException().isPresent());
            Assertions.assertEquals(lastAuthenticationEvent.data().get("principal-resource-id"), "u-1");
            AuthenticationErrorInfo errorInfo = ((AuthenticationException) lastAuthenticationEvent.authenticationException().get()).errorInfo();
            Assertions.assertTrue(errorInfo.errorMessage().contains("Bad password for user APIKEY1"));
            Assertions.assertEquals("APIKEY1", errorInfo.identifier());
            Assertions.assertEquals("lkc-abc", errorInfo.clusterId());
        } catch (Throwable th) {
            if (createPlainAuthAdminClient != null) {
                try {
                    createPlainAuthAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSuccessfulAuthenticationPbkdf2(String str) throws Exception {
        startWithTopic();
        loadPbkdf2ApiKeys();
        loadLKCMetadata();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY2", "blah"));
        try {
            createPlainAuthAdminClient.createTopics(this.sampleTopics).all().get();
            List list = (List) this.sampleTopics.stream().map((v0) -> {
                return v0.name();
            }).collect(Collectors.toList());
            TestUtils.retryOnExceptionWithTimeout(() -> {
                Assertions.assertTrue(((Set) createPlainAuthAdminClient.listTopics().names().get()).containsAll(list));
            });
            if (createPlainAuthAdminClient != null) {
                createPlainAuthAdminClient.close();
            }
            ConfluentAuthenticationEvent lastAuthenticationEvent = getLastAuthenticationEvent();
            Assertions.assertTrue(lastAuthenticationEvent.principal().isPresent());
            Assertions.assertEquals("User", ((KafkaPrincipal) lastAuthenticationEvent.principal().get()).getPrincipalType());
            Assertions.assertEquals("1", ((KafkaPrincipal) lastAuthenticationEvent.principal().get()).getName());
            Assertions.assertEquals(AuditEventStatus.SUCCESS, lastAuthenticationEvent.status());
            Assertions.assertFalse(((KafkaPrincipal) lastAuthenticationEvent.principal().get()).toString().contains("tenantMetadata"));
            Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("kafka-cluster=lkc-abc"));
            Assertions.assertEquals("1", lastAuthenticationEvent.authenticationContext().server().getAuthorizationID());
        } catch (Throwable th) {
            if (createPlainAuthAdminClient != null) {
                try {
                    createPlainAuthAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidPbkdf2Password(String str) throws Exception {
        startWithTopic();
        loadPbkdf2ApiKeys();
        loadLKCMetadata();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY2", "WrongPassword"));
        try {
            TestUtils.assertFutureError(createPlainAuthAdminClient.createTopics(this.sampleTopics).all(), SaslAuthenticationException.class);
            if (createPlainAuthAdminClient != null) {
                createPlainAuthAdminClient.close();
            }
            ConfluentAuthenticationEvent lastAuthenticationEvent = getLastAuthenticationEvent();
            Assertions.assertFalse(lastAuthenticationEvent.principal().isPresent());
            Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, lastAuthenticationEvent.status());
            Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("kafka-cluster=lkc-abc"));
            Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("organization=my-org2"));
            Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("environment=my-env2"));
            Assertions.assertTrue(lastAuthenticationEvent.getScope().toString().contains("cloud-cluster=lkc-abc"));
            Assertions.assertTrue(lastAuthenticationEvent.authenticationException().isPresent());
            Assertions.assertEquals(lastAuthenticationEvent.data().get("principal-resource-id"), "u-1");
            AuthenticationErrorInfo errorInfo = ((AuthenticationException) lastAuthenticationEvent.authenticationException().get()).errorInfo();
            Assertions.assertTrue(errorInfo.errorMessage().contains("Bad password for user APIKEY2"));
            Assertions.assertEquals("APIKEY2", errorInfo.identifier());
            Assertions.assertEquals("lkc-abc", errorInfo.clusterId());
        } catch (Throwable th) {
            if (createPlainAuthAdminClient != null) {
                try {
                    createPlainAuthAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testUnknownUser(String str) throws Exception {
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("UnknownUser", "WrongPassword"));
        try {
            TestUtils.assertFutureError(createPlainAuthAdminClient.createTopics(this.sampleTopics).all(), SaslAuthenticationException.class);
            if (createPlainAuthAdminClient != null) {
                createPlainAuthAdminClient.close();
            }
            ConfluentAuthenticationEvent lastAuthenticationEvent = getLastAuthenticationEvent();
            Assertions.assertFalse(lastAuthenticationEvent.principal().isPresent());
            Assertions.assertEquals(AuditEventStatus.UNKNOWN_USER_DENIED, lastAuthenticationEvent.status());
            Assertions.assertTrue(lastAuthenticationEvent.authenticationException().isPresent());
            AuthenticationErrorInfo errorInfo = ((AuthenticationException) lastAuthenticationEvent.authenticationException().get()).errorInfo();
            Assertions.assertTrue(errorInfo.errorMessage().contains("Unknown user UnknownUser"));
            Assertions.assertEquals("UnknownUser", errorInfo.identifier());
        } catch (Throwable th) {
            if (createPlainAuthAdminClient != null) {
                try {
                    createPlainAuthAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testNoSecretsInTopic(String str) throws Exception {
        startWithTopic();
        loadLKCMetadata();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));
        try {
            TestUtils.assertFutureError(createPlainAuthAdminClient.createTopics(this.sampleTopics).all(), SaslAuthenticationException.class);
            if (createPlainAuthAdminClient != null) {
                createPlainAuthAdminClient.close();
            }
            ConfluentAuthenticationEvent lastAuthenticationEvent = getLastAuthenticationEvent();
            Assertions.assertFalse(lastAuthenticationEvent.principal().isPresent());
            Assertions.assertEquals(AuditEventStatus.UNKNOWN_USER_DENIED, lastAuthenticationEvent.status());
            Assertions.assertTrue(lastAuthenticationEvent.authenticationException().isPresent());
            Assertions.assertTrue(((AuthenticationException) lastAuthenticationEvent.authenticationException().get()).errorInfo().errorMessage().contains("Unknown user"));
        } catch (Throwable th) {
            if (createPlainAuthAdminClient != null) {
                try {
                    createPlainAuthAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testFailureToConnectWithNoTopic(String str) {
        long j = 3000;
        Exception exc = (Exception) Assertions.assertThrows(RuntimeException.class, () -> {
            this.testHarness.start(brokerProps(j, false, SecurityProtocol.SASL_PLAINTEXT));
        }, "No topic should cause the broker to fail to start");
        Assertions.assertTrue(exc.getMessage().startsWith("Received a fatal error while waiting for all of the authorizer futures to be completed"), "Unexpected error message " + exc.getMessage());
        String str2 = exc.getCause() == null ? "(cause is null)" : (String) Optional.ofNullable(exc.getCause().getMessage()).orElse("(null)");
        Assertions.assertTrue(str2.contains("Unable to start the consumer for MultiTenantSaslSecretsStore"), "Unexpected value of exception cause message: " + str2);
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testApiKeyDelete(String str) throws Exception {
        startWithTopic();
        loadApiKeys();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));
        createPlainAuthAdminClient.createTopics(this.sampleTopics).all().get();
        List list = (List) this.sampleTopics.stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList());
        TestUtils.retryOnExceptionWithTimeout(() -> {
            Assertions.assertTrue(((Set) createPlainAuthAdminClient.listTopics().names().get()).containsAll(list));
        });
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "APIKEY1", null, true);
        TestUtils.waitForCondition(() -> {
            try {
                createPlainAuthAdminClient.listTopics().listings().get(15L, TimeUnit.SECONDS);
                return false;
            } catch (Exception e) {
                return true;
            }
        }, "Connection for deleted API key not terminated");
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidSecretsHandledGracefully(String str) throws Exception {
        startWithTopic();
        loadApiKeys();
        AdminClient createPlainAuthAdminClient = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));
        createPlainAuthAdminClient.createTopics(this.sampleTopics).all().get();
        List list = (List) this.sampleTopics.stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList());
        TestUtils.retryOnExceptionWithTimeout(() -> {
            Assertions.assertTrue(((Set) createPlainAuthAdminClient.listTopics().names().get()).containsAll(list));
        });
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "some_key", JSON_WITH_MULTIPLE_KEYS, false);
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "some_other_key", "NOT A JSON MESSAGE", false);
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "APIKEY1", null, true);
        TestUtils.waitForCondition(() -> {
            try {
                createPlainAuthAdminClient.listTopics().listings().get(15L, TimeUnit.SECONDS);
                return false;
            } catch (Exception e) {
                return true;
            }
        }, "Connection for deleted API key not terminated");
    }

    protected void loadPbkdf2ApiKeys() throws Exception {
        try {
            this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "APIKEY2", Utils.readFullyToString((BufferedInputStream) JarResourceLoader.loadFileFromResource(FileBasedPlainSaslAuthenticatorTest.class, "/pbkdf2_auth_keys.json").toURI().toURL().getContent()), true);
        } catch (IOException e) {
            throw new Exception("Couldn't read apikeys content");
        }
    }
}
