/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.FileBasedPlainSaslAuthenticatorTest;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.admin.AclCommand;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecretsLogFailedException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

@Tag(value="integration")
public class TopicBasedPlainSaslAuthIntegrationTest {
    private final long topicCreateTimeout = 15000L;
    private final String logicalClusterId = Utils.LC_META_ABC.logicalClusterId();
    private final String serviceUserAPIkey = "APIKEY1";
    private final String serviceUserAPIkeyPassword = "pwd1";
    private IntegrationTestHarness testHarness;
    private PhysicalCluster physicalCluster;
    private final int numBrokers = 3;
    private final String apiKeysTopic = "_confluent-apikey";
    private final String testTopic = "abcd";
    private final List<NewTopic> sampleTopics = Collections.singletonList(new NewTopic("abcd", 3, 1));
    private static final String JSON_WITH_MULTIPLE_KEYS = String.join((CharSequence)"\n", "{", "  \"keys\": {", "    \"key1\": {", "      \"user_id\": \"user1\",", "      \"logical_cluster_id\": \"myCluster\",", "      \"sasl_mechanism\": \"PLAIN\",", "      \"hashed_secret\": \"no hash\",", "      \"hash_function\": \"none\"", "    },", "    \"key2\": {", "      \"user_id\": \"user2\",", "      \"logical_cluster_id\": \"myCluster2\",", "      \"sasl_mechanism\": \"SSL\",", "      \"hashed_secret\": \"no hash\",", "      \"hash_function\": \"none\"", "    }", "  }", "}");

    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        this.testHarness = new IntegrationTestHarness(testInfo, 3);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.testHarness.shutdown();
    }

    private void startWithTopic() throws Exception {
        long topicLoadTimeoutMs = 15000L + TimeUnit.SECONDS.toMillis(3L);
        this.physicalCluster = this.testHarness.startWithTopic("_confluent-apikey", 1, 1, 15000L, this.brokerProps(topicLoadTimeoutMs));
        int serviceUserId = 1;
        LogicalCluster logicalCluster = this.physicalCluster.createLogicalCluster(this.logicalClusterId, 100, serviceUserId);
        LogicalClusterUser serviceUser = logicalCluster.user(serviceUserId);
        AclCommand.main((String[])SecurityTestUtils.addTopicAclArgs(this.testHarness.zkConnect(), serviceUser.prefixedKafkaPrincipal(), serviceUser.withPrefix("abcd"), AclOperation.ALL, PatternType.LITERAL));
    }

    private Properties brokerProps(long topicLoadTimeoutMs) throws IOException {
        Properties props = new Properties();
        props.put("listeners", "INTERNAL://localhost:0, EXTERNAL://localhost:0");
        props.put("advertised.listeners", "INTERNAL://localhost:0, EXTERNAL://localhost:0");
        props.put("listener.security.protocol.map", "INTERNAL:PLAINTEXT, EXTERNAL:SASL_PLAINTEXT");
        props.put("inter.broker.listener.name", "INTERNAL");
        props.put("sasl.enabled.mechanisms", Collections.singletonList("PLAIN"));
        props.put("listener.name.external.principal.builder.class", MultiTenantPrincipalBuilder.class.getName());
        props.put("listener.name.external.confluent.security.event.logger.authentication.enable", "true");
        props.put(KafkaConfig$.MODULE$.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        props.put("confluent.security.event.logger.multitenant.enable", "true");
        props.put("listener.name.external.plain.sasl.jaas.config", "io.confluent.kafka.server.plugins.auth.TopicBasedLoginModule required;");
        props.put("confluent.multitenant.listener.names", "EXTERNAL");
        props.put("confluent.cdc.api.keys.topic", "_confluent-apikey");
        props.put("confluent.cdc.api.keys.load.timeout.ms", String.valueOf(topicLoadTimeoutMs));
        props.put("ce.broker.plugins.test.audit.provider.config", "TEST");
        props.put("confluent.close.connections.on.credential.delete", "true");
        return props;
    }

    @Test
    public void testSuccessfulAuthentication() throws Exception {
        this.startWithTopic();
        this.loadApiKeys();
        try (AdminClient client = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));){
            client.createTopics(this.sampleTopics).all().get();
            List expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
            Assertions.assertTrue((boolean)((Set)client.listTopics().names().get()).containsAll(expectedTopics));
        }
        ConfluentAuthenticationEvent authenticationEvent = this.getLastAuthenticationEvent();
        Assertions.assertTrue((boolean)authenticationEvent.principal().isPresent());
        Assertions.assertEquals((Object)"User", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getPrincipalType());
        Assertions.assertEquals((Object)"1", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getName());
        Assertions.assertEquals((Object)AuditEventStatus.SUCCESS, (Object)authenticationEvent.status());
        Assertions.assertFalse((boolean)((KafkaPrincipal)authenticationEvent.principal().get()).toString().contains("tenantMetadata"));
        Assertions.assertTrue((boolean)authenticationEvent.getScope().toString().contains("kafka-cluster=lkc-abc"));
        SaslAuthenticationContext authenticationContext = (SaslAuthenticationContext)authenticationEvent.authenticationContext();
        Assertions.assertEquals((Object)"1", (Object)authenticationContext.server().getAuthorizationID());
    }

    @Test
    public void testInvalidPassword() throws Exception {
        this.startWithTopic();
        this.loadApiKeys();
        try (AdminClient client = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "WrongPassword"));){
            KafkaFuture future = client.createTopics(this.sampleTopics).all();
            TestUtils.assertFutureError((Future)future, SaslAuthenticationException.class);
        }
        ConfluentAuthenticationEvent authenticationEvent = this.getLastAuthenticationEvent();
        Assertions.assertFalse((boolean)authenticationEvent.principal().isPresent());
        Assertions.assertEquals((Object)AuditEventStatus.UNAUTHENTICATED, (Object)authenticationEvent.status());
        Assertions.assertTrue((boolean)authenticationEvent.getScope().toString().contains("kafka-cluster=lkc-abc"));
        Assertions.assertTrue((boolean)authenticationEvent.authenticationException().isPresent());
        AuthenticationException authenticationException = (AuthenticationException)authenticationEvent.authenticationException().get();
        AuthenticationErrorInfo errorInfo = authenticationException.errorInfo();
        Assertions.assertTrue((boolean)errorInfo.errorMessage().contains("Bad password for user APIKEY1"));
        Assertions.assertEquals((Object)"APIKEY1", (Object)errorInfo.identifier());
        Assertions.assertEquals((Object)"lkc-abc", (Object)errorInfo.clusterId());
    }

    @Test
    public void testUnknownUser() throws Exception {
        this.startWithTopic();
        this.loadApiKeys();
        try (AdminClient client = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("UnknownUser", "WrongPassword"));){
            KafkaFuture future = client.createTopics(this.sampleTopics).all();
            TestUtils.assertFutureError((Future)future, SaslAuthenticationException.class);
        }
        ConfluentAuthenticationEvent authenticationEvent = this.getLastAuthenticationEvent();
        Assertions.assertFalse((boolean)authenticationEvent.principal().isPresent());
        Assertions.assertEquals((Object)AuditEventStatus.UNKNOWN_USER_DENIED, (Object)authenticationEvent.status());
        Assertions.assertTrue((boolean)authenticationEvent.authenticationException().isPresent());
        AuthenticationException authenticationException = (AuthenticationException)authenticationEvent.authenticationException().get();
        AuthenticationErrorInfo errorInfo = authenticationException.errorInfo();
        Assertions.assertTrue((boolean)errorInfo.errorMessage().contains("Unknown user UnknownUser"));
        Assertions.assertEquals((Object)"UnknownUser", (Object)errorInfo.identifier());
    }

    @Test
    public void testNoSecretsInTopic() throws Exception {
        this.startWithTopic();
        try (AdminClient client = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));){
            KafkaFuture future = client.createTopics(this.sampleTopics).all();
            TestUtils.assertFutureError((Future)future, SaslAuthenticationException.class);
        }
        ConfluentAuthenticationEvent authenticationEvent = this.getLastAuthenticationEvent();
        Assertions.assertFalse((boolean)authenticationEvent.principal().isPresent());
        Assertions.assertEquals((Object)AuditEventStatus.UNKNOWN_USER_DENIED, (Object)authenticationEvent.status());
        Assertions.assertTrue((boolean)authenticationEvent.authenticationException().isPresent());
        AuthenticationException authenticationException = (AuthenticationException)authenticationEvent.authenticationException().get();
        Assertions.assertTrue((boolean)authenticationException.errorInfo().errorMessage().contains("Unknown user"));
    }

    @Test
    public void testFailureToConnectWithNoTopic() {
        long topicLoadTimeoutMs = 3000L;
        CompletionException e = (CompletionException)Assertions.assertThrows(CompletionException.class, () -> this.testHarness.start(this.brokerProps(topicLoadTimeoutMs)), (String)"No topic should cause the broker to fail to start");
        Assertions.assertTrue((boolean)(e.getCause() instanceof SecretsLogFailedException));
    }

    @Test
    public void testApiKeyDelete() throws Exception {
        this.startWithTopic();
        this.loadApiKeys();
        AdminClient client = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));
        client.createTopics(this.sampleTopics).all().get();
        List expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
        Assertions.assertTrue((boolean)((Set)client.listTopics().names().get()).containsAll(expectedTopics));
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "APIKEY1", null, true);
        TestUtils.waitForCondition(() -> {
            try {
                client.listTopics().listings().get(15L, TimeUnit.SECONDS);
                return false;
            }
            catch (Exception e) {
                return true;
            }
        }, (String)"Connection for deleted API key not terminated");
    }

    @Test
    public void testInvalidSecretsHandledGracefully() throws Exception {
        this.startWithTopic();
        this.loadApiKeys();
        AdminClient client = this.testHarness.createPlainAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"));
        client.createTopics(this.sampleTopics).all().get();
        List expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
        Assertions.assertTrue((boolean)((Set)client.listTopics().names().get()).containsAll(expectedTopics));
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "some_key", JSON_WITH_MULTIPLE_KEYS, false);
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "some_other_key", "NOT A JSON MESSAGE", false);
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "APIKEY1", null, true);
        TestUtils.waitForCondition(() -> {
            try {
                client.listTopics().listings().get(15L, TimeUnit.SECONDS);
                return false;
            }
            catch (Exception e) {
                return true;
            }
        }, (String)"Connection for deleted API key not terminated");
    }

    private void loadApiKeys() throws Exception {
        BufferedInputStream path;
        try {
            path = (BufferedInputStream)FileBasedPlainSaslAuthenticatorTest.class.getResource("/file_auth_test_apikeys.json").getContent();
        }
        catch (IOException e) {
            throw new Exception("Couldn't read apikeys content");
        }
        String apiKeys = org.apache.kafka.common.utils.Utils.readFullyToString((InputStream)path);
        this.physicalCluster.kafkaCluster().produceApiKeysData("_confluent-apikey", "APIKEY1", apiKeys, true);
    }

    private ConfluentAuthenticationEvent getLastAuthenticationEvent() throws Exception {
        for (int i = 0; i < 3; ++i) {
            String brokerUuid = "uuid" + i;
            MockAuditLogProvider auditLogProvider = MockAuditLogProvider.getInstance(brokerUuid);
            try {
                ConfluentAuthenticationEvent event = (ConfluentAuthenticationEvent)auditLogProvider.lastAuthenticationEntry();
                if (event == null) continue;
                return event;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        throw new Exception("Couldn't find the last authentication event");
    }
}

