package io.confluent.kafka.server.plugins.auth.oauth;

import com.sun.jna.platform.win32.COM.tlb.imp.TlbConst;
import io.confluent.kafka.common.multitenant.oauth.OAuthBearerJwsToken;
import io.confluent.kafka.multitenant.PhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.UnaryOperator;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.authenticator.LoginManager;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.audit.AuditEvent;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuditLogProvider;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.server.audit.AuthenticationEvent;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/confluent/kafka/server/plugins/auth/oauth/OAuthSaslAuthenticatorTest.class */
/**
 * Exercises SASL/OAUTHBEARER authentication over {@code SASL_SSL} against a {@link NioEchoServer}.
 *
 * <p>Each test builds a JWS token through {@code OAuthUtils.Builder}, wires it into the client and
 * server configurations, opens one client connection and then checks both the server's
 * authentication metrics and the {@link AuthenticationEvent} captured by
 * {@link TestAuditLogProvider}.
 */
public class OAuthSaslAuthenticatorTest {
    /**
     * Connection id handed to the client {@link Selector}. The literal value is {@code "0"}; it
     * replaces an unrelated Windows COM constant with the same value that had been used here.
     */
    private static final String NODE_ID = "0";

    /** SASL mechanism used on both the client and the server side. */
    private static final String MECHANISM = "OAUTHBEARER";

    private Selector selector;
    private NioEchoServer server;
    private OAuthUtils.JwsContainer jwsContainer;
    private Map<String, Object> saslClientConfigs;
    private Map<String, Object> saslServerConfigs;
    private PhysicalClusterMetadata metadata;
    private Map<String, Object> configs;
    private String brokerUUID;
    private CredentialCache credentialCache;
    private final Path tempDir = TestUtils.tempDirectory().toPath();
    private final String allowedCluster = Utils.LC_META_ABC.logicalClusterId();
    private final String orgId = Utils.LC_META_ABC.organizationId();
    private final TestAuditLogProvider auditLogProvider = new TestAuditLogProvider();
    private final Time time = Time.SYSTEM;

    /**
     * Minimal {@link AuditLogProvider} that records every event passed to
     * {@link #logEvent(AuditEvent)} so tests can inspect it afterwards. All other callbacks are
     * no-ops.
     */
    public static class TestAuditLogProvider implements AuditLogProvider {
        /** Events received so far, in arrival order. */
        public final List<AuthenticationEvent> authenticationEvents;

        private TestAuditLogProvider() {
            this.authenticationEvents = new ArrayList<>();
        }

        @Override // org.apache.kafka.server.audit.AuditLogProvider
        public boolean providerConfigured(Map<String, ?> map) {
            return false;
        }

        /**
         * Records the event.
         *
         * @param auditEvent event to store; must be an {@link AuthenticationEvent}
         * @throws ClassCastException if any other event type is logged
         */
        @Override // org.apache.kafka.server.audit.AuditLogProvider
        public void logEvent(AuditEvent auditEvent) {
            this.authenticationEvents.add((AuthenticationEvent) auditEvent);
        }

        @Override // org.apache.kafka.server.audit.AuditLogProvider
        public void setSanitizer(UnaryOperator<AuditEvent> unaryOperator) {
        }

        @Override // org.apache.kafka.server.audit.AuditLogProvider
        public boolean usesMetadataFromThisKafkaCluster() {
            return false;
        }

        @Override // org.apache.kafka.server.audit.AuditLogProvider
        public void close(String str) throws Exception {
            close();
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
        }

        /**
         * @return an empty set; this provider has nothing to reconfigure
         */
        @Override // org.apache.kafka.common.Reconfigurable
        public Set<String> reconfigurableConfigs() {
            // An empty set instead of null, so callers can iterate without a null check.
            return Collections.emptySet();
        }

        @Override // org.apache.kafka.common.Reconfigurable
        public void validateReconfiguration(Map<String, ?> map) throws ConfigException {
        }

        @Override // org.apache.kafka.common.Reconfigurable
        public void reconfigure(Map<String, ?> map) {
        }

        @Override // org.apache.kafka.common.Configurable
        public void configure(Map<String, ?> map) {
        }
    }

    /**
     * Resets login state, builds mutually trusting client/server SSL configurations and prepares
     * the physical cluster metadata used by the tests.
     */
    @BeforeEach
    public void setUp() throws Exception {
        LoginManager.closeAll();
        CertStores serverCertStores = new CertStores(true, "localhost");
        CertStores clientCertStores = new CertStores(false, "localhost");
        this.saslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
        this.saslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
        this.credentialCache = new CredentialCache();
        setUpMetadata();
    }

    /**
     * Initialises {@link PhysicalClusterMetadata} on {@link #tempDir}, writes the logical cluster
     * file for {@code Utils.LC_META_ABC} and waits until that cluster shows up in the metadata.
     */
    private void setUpMetadata() throws IOException, InterruptedException {
        this.brokerUUID = "uuid";
        this.configs = new HashMap<>();
        this.configs.put(ConfluentConfigs.BROKER_SESSION_ID_PROP, this.brokerUUID);
        this.saslServerConfigs.put(ConfluentConfigs.BROKER_SESSION_ID_PROP, this.brokerUUID);
        this.configs.put(
                ConfluentConfigs.MULTITENANT_METADATA_DIR_CONFIG,
                this.tempDir.toRealPath(new LinkOption[0]).toString());
        this.metadata = Utils.initiatePhysicalClusterMetadata(this.configs);
        Utils.createLogicalClusterFile(Utils.LC_META_ABC, this.tempDir);
        TestUtils.waitForCondition(
                () -> this.metadata.metadata(Utils.LC_META_ABC.logicalClusterId()) != null,
                "Expected metadata of new logical cluster to be present in metadata cache");
    }

    /**
     * Closes the echo server, the client selector and the cluster metadata. Each resource is
     * released even when closing an earlier one throws, and resources that were never created
     * (for example because {@link #setUp()} failed part-way) are skipped.
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            if (this.server != null) {
                this.server.close();
            }
        } finally {
            try {
                if (this.selector != null) {
                    this.selector.close();
                }
            } finally {
                if (this.metadata != null) {
                    this.metadata.close(this.brokerUUID);
                }
            }
        }
    }

    /** A token built with matching "Confluent" values authenticates and is audited as a success. */
    @Test
    public void testValidTokenAuthorizes() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.jwsContainer = new OAuthUtils.Builder(100000, "Confluent", "Confluent", this.orgId).build();
        configureMechanisms(MECHANISM, Collections.singletonList(MECHANISM), this.allowedCluster);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnection(securityProtocol, NODE_ID);
        this.server.verifyAuthenticationMetrics(1, 0);
        AuthenticationEvent authenticationEvent = firstAuthenticationEvent();
        Assertions.assertEquals(AuditEventStatus.SUCCESS, authenticationEvent.status());
        Assertions.assertEquals("Confluent", authenticationEvent.principal().get().getName());
    }

    /**
     * A token built with "SomebodyElse" as the second builder argument (presumably the issuer,
     * going by the test name) is rejected and audited as {@code UNKNOWN_USER_DENIED}.
     */
    @Test
    public void testInvalidIssuerFailsAuthorization() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.jwsContainer = new OAuthUtils.Builder(100000, "SomebodyElse", "Confluent", this.orgId).build();
        configureMechanisms(MECHANISM, Collections.singletonList(MECHANISM), this.allowedCluster);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnectionFailure(securityProtocol, NODE_ID);
        this.server.verifyAuthenticationMetrics(0, 1);
        AuthenticationEvent authenticationEvent = firstAuthenticationEvent();
        Assertions.assertEquals(AuditEventStatus.UNKNOWN_USER_DENIED, authenticationEvent.status());
        Assertions.assertFalse(authenticationEvent.principal().isPresent());
    }

    /**
     * Overwrites the container's public key file with a freshly generated key before the server
     * starts; the connection is rejected and audited as {@code UNKNOWN_USER_DENIED}.
     */
    @Test
    public void testPublicKeyFailsAuthorization() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.jwsContainer = new OAuthUtils.Builder(100000, "SomebodyElse", "Confluent", this.orgId).build();
        OAuthUtils.writePemFile(this.jwsContainer.getPublicKeyFile(), OAuthUtils.generateKeyPair().getPublic());
        configureMechanisms(MECHANISM, Collections.singletonList(MECHANISM), this.allowedCluster);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnectionFailure(securityProtocol, NODE_ID);
        this.server.verifyAuthenticationMetrics(0, 1);
        AuthenticationEvent authenticationEvent = firstAuthenticationEvent();
        Assertions.assertEquals(AuditEventStatus.UNKNOWN_USER_DENIED, authenticationEvent.status());
        Assertions.assertFalse(authenticationEvent.principal().isPresent());
    }

    /**
     * Requesting logical cluster "cp10" instead of the one registered in {@link #setUpMetadata()}
     * fails authentication, and the audit event carries the identifier, the requested cluster and
     * an explanatory error message.
     */
    @Test
    public void testTokenWhenLogicalClusterIsNotHostedOnBroker() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        String unknownCluster = "cp10";
        this.jwsContainer = new OAuthUtils.Builder(100000, "Confluent", "Confluent", this.orgId).build();
        configureMechanisms(MECHANISM, Collections.singletonList(MECHANISM), unknownCluster);
        this.server = createEchoServer(securityProtocol);
        createAndCheckClientConnectionFailure(securityProtocol, NODE_ID);
        this.server.verifyAuthenticationMetrics(0, 1);
        AuthenticationEvent authenticationEvent = firstAuthenticationEvent();
        Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, authenticationEvent.status());
        Assertions.assertFalse(authenticationEvent.principal().isPresent());
        AuthenticationErrorInfo errorInfo = authenticationEvent.authenticationException().get().errorInfo();
        Assertions.assertEquals("Confluent", errorInfo.identifier());
        Assertions.assertEquals(
                unknownCluster,
                errorInfo.saslExtensions().get(OAuthBearerJwsToken.OAUTH_NEGOTIATED_LOGICAL_CLUSTER_PROPERTY_KEY));
        Assertions.assertTrue(errorInfo.errorMessage().contains("logical cluster cp10 is not hosted on this broker"));
    }

    /**
     * Returns the first audit event captured by {@link #auditLogProvider}, failing the test with a
     * readable message when nothing has been recorded.
     */
    private AuthenticationEvent firstAuthenticationEvent() {
        List<AuthenticationEvent> events = this.auditLogProvider.authenticationEvents;
        Assertions.assertFalse(events.isEmpty(), "Expected at least one authentication audit event");
        return events.get(0);
    }

    /**
     * Attaches the OAuth settings for the current {@link #jwsContainer} to the client and server
     * configurations and installs the matching test JAAS configuration.
     *
     * @param str client SASL mechanism
     * @param list SASL mechanisms enabled on the server
     * @param str2 logical cluster id passed to the client configuration
     */
    private void configureMechanisms(String str, List<String> list, String str2) {
        SecurityTestUtils.attachMechanisms(this.saslClientConfigs, str, this.jwsContainer, str2);
        SecurityTestUtils.attachServerOAuthConfigs(this.saslServerConfigs, list, "listener.name.sasl_ssl", this.jwsContainer);
        TestJaasConfig.createConfiguration(str, list);
    }

    /**
     * Connects to the echo server, runs {@code NetworkTestUtils.checkClientConnection} on the
     * connection and then closes the selector.
     */
    private void createAndCheckClientConnection(SecurityProtocol securityProtocol, String str) throws Exception {
        createClientConnection(securityProtocol, str);
        NetworkTestUtils.checkClientConnection(this.selector, str, 100, 10);
        this.selector.close();
        this.selector = null;
    }

    /**
     * Connects to the echo server, waits for the channel to close in state
     * {@code AUTHENTICATION_FAILED} and then closes the selector.
     */
    private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String str) throws Exception {
        createClientConnection(securityProtocol, str);
        NetworkTestUtils.waitForChannelClose(this.selector, str, ChannelState.State.AUTHENTICATION_FAILED);
        this.selector.close();
        this.selector = null;
    }

    /** Creates a fresh client selector and starts connecting it to the echo server on localhost. */
    private void createClientConnection(SecurityProtocol securityProtocol, String str) throws Exception {
        createSelector(securityProtocol, this.saslClientConfigs);
        InetSocketAddress serverAddress = new InetSocketAddress("localhost", this.server.port());
        this.selector.connect(str, serverAddress, 4096, 4096);
    }

    /**
     * Replaces {@link #selector} with a new client-side selector, closing any existing one first.
     *
     * @param securityProtocol protocol for the client channel builder
     * @param map client configuration wrapped in a {@link TestSecurityConfig}
     */
    private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> map) {
        if (this.selector != null) {
            this.selector.close();
            this.selector = null;
        }
        String saslMechanism = (String) this.saslClientConfigs.get(SaslConfigs.SASL_MECHANISM);
        this.selector = new Selector(
                25000L,
                new Metrics(),
                this.time,
                "MetricGroup",
                ChannelBuilders.clientChannelBuilder(
                        securityProtocol,
                        JaasContext.Type.CLIENT,
                        new TestSecurityConfig(map),
                        (ListenerName) null,
                        saslMechanism,
                        this.time,
                        true,
                        new LogContext()),
                new LogContext());
    }

    /** Starts an echo server for the given protocol, wired to {@link #auditLogProvider}. */
    private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
        return NetworkTestUtils.createEchoServer(
                ListenerName.forSecurityProtocol(securityProtocol),
                securityProtocol,
                new TestSecurityConfig(this.saslServerConfigs),
                this.credentialCache,
                this.time,
                Optional.of(this.auditLogProvider));
    }
}
