package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.clients.plugins.auth.oauth.OAuthBearerLoginCallbackHandler;
import io.confluent.kafka.link.integration.MultiTenantCLDefaultDataPolicyTest;
import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.DefaultDataPolicyContext;
import io.confluent.kafka.server.plugins.auth.DefaultDataPolicyValidationMode;
import io.confluent.kafka.server.plugins.auth.oauth.MockBasicAuthStore;
import io.confluent.kafka.server.plugins.auth.oauth.MockTrustCache;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.security.auth.provider.oauth.EnhancedOAuthBearerValidatorCallbackHandler;
import io.confluent.security.auth.store.data.JwtIssuerKey;
import io.confluent.security.auth.store.data.JwtIssuerValue;
import java.security.interfaces.RSAPublicKey;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import kafka.test.JarResourceLoader;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ProxyProtocolEngineFactory;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.SaslInternalConfigs;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.jose4j.jwk.JsonWebKey;
import org.jose4j.jwk.JsonWebKeySet;
import org.jose4j.jwk.RsaJsonWebKey;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

@Tags({@Tag("integration"), @Tag("bazel:shard_count:2")})
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/TopicBasedDefaultDataPolicyAuthIntegrationTest.class */
public class TopicBasedDefaultDataPolicyAuthIntegrationTest extends AbstractTopicBasedPlainSaslAuthIntegrationTest {
    public static final String TEST_WITH_PARAMETERIZED_NAMES = "{displayName}.validationMode={0}";
    public static final String TEST_WITH_PARAMETERIZED_VALIDATION_NETWORK = "{displayName}.quorum={0},validationMode={1},network={2}";
    private static final String NON_CONFLUENT_JWT_ISSUER = "non-confluent";
    private final String orgResourceId = KafkaLogicalClusterUtils.LC_META_ABC.organizationId();
    private final Properties adminProperties = new Properties();
    private DefaultDataPolicyValidationMode validationMode;
    private OAuthUtils.JwsContainer jwsContainer;
    private MockBasicAuthStore authStore;
    private String brokerSessionUUID;
    private Properties serverCertStore;
    private SaslMechanism saslMechanism;
    private String denyOrgIds;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/TopicBasedDefaultDataPolicyAuthIntegrationTest$SaslMechanism.class */
    public enum SaslMechanism {
        PLAIN,
        OAUTHBEARER
    }

    public TopicBasedDefaultDataPolicyAuthIntegrationTest() {
        this.adminProperties.put("sasl.login.callback.handler.class", OAuthBearerLoginCallbackHandler.class.getName());
    }

    @Override // io.confluent.kafka.multitenant.integration.test.AbstractTopicBasedPlainSaslAuthIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        this.numBrokers = 1;
        this.brokerSessionUUID = UUID.randomUUID().toString();
        super.setUp(testInfo);
        this.validationMode = DefaultDataPolicyValidationMode.STRICT;
        this.authStore = MockBasicAuthStore.create(this.brokerSessionUUID);
        this.saslMechanism = SaslMechanism.PLAIN;
    }

    @Override // io.confluent.kafka.multitenant.integration.test.AbstractTopicBasedPlainSaslAuthIntegrationTest
    @AfterEach
    public void tearDown() throws Exception {
        super.tearDown();
        this.authStore.close();
        MockAuditLogProvider.reset();
    }

    private void setUpOAuth() throws Exception {
        this.saslMechanism = SaslMechanism.OAUTHBEARER;
        this.jwsContainer = new OAuthUtils.Builder(100000, NON_CONFLUENT_JWT_ISSUER, "1", this.orgResourceId).jku("https://localhost/keys").withKid(true).build();
        MockTrustCache trustCache = this.authStore.trustCache();
        JsonWebKey rsaJsonWebKey = new RsaJsonWebKey((RSAPublicKey) this.jwsContainer.verificationKey());
        rsaJsonWebKey.setKeyId(this.jwsContainer.getKid());
        trustCache.put(new JwtIssuerKey(NON_CONFLUENT_JWT_ISSUER, (String) null, ""), new JwtIssuerValue(new JsonWebKeySet(new JsonWebKey[]{rsaJsonWebKey})));
    }

    @Override // io.confluent.kafka.multitenant.integration.test.AbstractTopicBasedPlainSaslAuthIntegrationTest
    protected List<String> getInitialTopics() {
        return Arrays.asList("_confluent-apikey", "_confluent-logical_clusters");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractTopicBasedPlainSaslAuthIntegrationTest
    public void startWithTopic() throws Exception {
        super.startWithTopic(Optional.of(Time.SYSTEM), SecurityProtocol.SASL_SSL);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractTopicBasedPlainSaslAuthIntegrationTest
    public Properties brokerProps(long j, boolean z, SecurityProtocol securityProtocol) throws Exception {
        Properties brokerProps = super.brokerProps(j, z, securityProtocol);
        brokerProps.put("broker.session.uuid", this.brokerSessionUUID);
        if (this.denyOrgIds != null) {
            brokerProps.put("confluent.cluster.link.intranet.connectivity.denied.org.ids", this.denyOrgIds);
        }
        String str = "default_data_policy_validation_mode=\"" + this.validationMode.name() + "\"";
        if (this.saslMechanism == SaslMechanism.PLAIN) {
            brokerProps.put("listener.name.external.plain.sasl.jaas.config", String.format("io.confluent.kafka.server.plugins.auth.TopicBasedLoginModule required %s;", str));
        } else if (this.saslMechanism == SaslMechanism.OAUTHBEARER) {
            brokerProps.putAll(IntegrationTestHarness.defaultOAuthBrokerProps());
            try {
                brokerProps.put("listener.name.external.oauthbearer.sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " + str + " publicKeyPath=\"" + this.jwsContainer.getPublicKeyFile().toPath() + "\"authenticator.jwt.config.url=\"" + JarResourceLoader.loadFileFromResourceWithClassLoader(getClass(), "AuthConfigEnhanced.yaml").toPath() + "\";");
                brokerProps.put("listener.name.external.oauthbearer.sasl.server.callback.handler.class", EnhancedOAuthBearerValidatorCallbackHandler.class.getName());
                brokerProps.put("listener.name.external.confluent.oauth.flat.networking.verification.enable", "true");
                brokerProps.put("listener.name.external.confluent.require.calling.resource.identity", "true");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        this.serverCertStore = createSslStores();
        brokerProps.putAll(this.serverCertStore);
        brokerProps.put("listener.name.external.ssl.client.auth", "requested");
        return brokerProps;
    }

    private Properties createSslStores() throws Exception {
        CertStores build = new CertStores.Builder(true).cn(MultiTenantCLDefaultDataPolicyTest.SSL_KAFKA_CN).addHostName(MultiTenantRequestContextTest.LOCALHOST).build();
        Properties properties = new Properties();
        BiConsumer biConsumer = (str, obj) -> {
            if (obj instanceof Password) {
                properties.setProperty(str, ((Password) obj).value());
            } else if (obj instanceof List) {
                properties.setProperty(str, String.join(",", (List) obj));
            } else if (obj != null) {
                properties.setProperty(str, (String) obj);
            }
        };
        build.keyStoreProps().forEach(biConsumer);
        build.trustStoreProps().forEach(biConsumer);
        TestSslUtils.convertToPemWithoutFiles(properties);
        return properties;
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_NAMES)
    public void testNoDenyOrgIds(String str) throws Exception {
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        assertAuthSuccess(new DefaultDataPolicyContext.Builder(this.organizationId, SaslInternalConfigs.NetworkType.PRIVATE, true).build());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_NAMES)
    public void testNoDenyOrgIdsOAuth(String str) throws Exception {
        setUpOAuth();
        testNoDenyOrgIds(str);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_NAMES)
    public void testDenyOrgIds(String str) throws Exception {
        this.denyOrgIds = this.organizationId + ",foo";
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        assertAuthFailure(new DefaultDataPolicyContext.Builder(this.organizationId, SaslInternalConfigs.NetworkType.PRIVATE, true).build());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_NAMES)
    public void testDenyOrgIdsOAuth(String str) throws Exception {
        setUpOAuth();
        testDenyOrgIds(str);
    }

    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_NAMES)
    @CsvSource({"zk,strict", "zk,none", "kraft,strict", "kraft,none"})
    public void testDifferentOrganizationId(String str, String str2) throws Exception {
        this.validationMode = DefaultDataPolicyValidationMode.fromString(str2);
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        if (this.saslMechanism == SaslMechanism.OAUTHBEARER || this.validationMode != DefaultDataPolicyValidationMode.NONE) {
            assertAuthFailure(new DefaultDataPolicyContext.Builder("org-diff", SaslInternalConfigs.NetworkType.PRIVATE, true).build());
        } else {
            assertAuthSuccess(new DefaultDataPolicyContext.Builder("org-diff", SaslInternalConfigs.NetworkType.PRIVATE, true).build());
        }
    }

    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_NAMES)
    @CsvSource({"zk,strict", "zk,none", "kraft,strict", "kraft,none"})
    public void testDifferentOrganizationIdOAuth(String str, String str2) throws Exception {
        setUpOAuth();
        testDifferentOrganizationId(str, str2);
    }

    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_VALIDATION_NETWORK)
    @CsvSource({"zk,strict,NOT_SET", "zk,strict,PUBLIC", "zk,strict,PRIVATE", "kraft,none,NOT_SET", "kraft,none,PUBLIC", "kraft,none,PRIVATE", "kraft,strict,NOT_SET", "kraft,strict,PUBLIC", "kraft,strict,PRIVATE"})
    public void testNetworkTypes(String str, String str2, String str3) throws Exception {
        this.validationMode = DefaultDataPolicyValidationMode.fromString(str2);
        SaslInternalConfigs.NetworkType fromString = SaslInternalConfigs.NetworkType.fromString(str3);
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        if (this.saslMechanism != SaslMechanism.OAUTHBEARER) {
            if (this.validationMode == DefaultDataPolicyValidationMode.NONE || fromString == SaslInternalConfigs.NetworkType.PRIVATE) {
                assertAuthSuccess(new DefaultDataPolicyContext.Builder(this.organizationId, fromString, true).build());
                return;
            } else {
                assertAuthFailure(new DefaultDataPolicyContext.Builder(this.organizationId, fromString, true).build());
                return;
            }
        }
        if (this.validationMode == DefaultDataPolicyValidationMode.NONE) {
            assertAuthFailure(new DefaultDataPolicyContext.Builder(this.organizationId, fromString, true).build());
        } else if (fromString == SaslInternalConfigs.NetworkType.PRIVATE) {
            assertAuthSuccess(new DefaultDataPolicyContext.Builder(this.organizationId, fromString, true).build());
        } else {
            assertAuthFailure(new DefaultDataPolicyContext.Builder(this.organizationId, fromString, true).build());
        }
    }

    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_VALIDATION_NETWORK)
    @CsvSource({"zk,strict,NOT_SET", "zk,strict,PUBLIC", "zk,strict,PRIVATE", "kraft,none,NOT_SET", "kraft,none,PUBLIC", "kraft,none,PRIVATE", "kraft,strict,NOT_SET", "kraft,strict,PUBLIC", "kraft,strict,PRIVATE"})
    public void testNetworkTypesOAuth(String str, String str2, String str3) throws Exception {
        setUpOAuth();
        testNetworkTypes(str, str2, str3);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testClientCertificate(String str) throws Exception {
        this.validationMode = DefaultDataPolicyValidationMode.STRICT;
        startWithTopic();
        loadApiKeys();
        loadLKCMetadata();
        assertAuthSuccess(new DefaultDataPolicyContext.Builder(this.organizationId, SaslInternalConfigs.NetworkType.PRIVATE, true).build());
        assertAuthFailure(new DefaultDataPolicyContext.Builder(this.organizationId, SaslInternalConfigs.NetworkType.PRIVATE, false).build());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testClientCertificateOAuth(String str) throws Exception {
        setUpOAuth();
        testClientCertificate(str);
    }

    private void assertAuthSuccess(DefaultDataPolicyContext defaultDataPolicyContext) throws Exception {
        SaslAuthenticateRequestCallback saslAuthenticateRequestCallback = new SaslAuthenticateRequestCallback(defaultDataPolicyContext);
        AdminClient createSSLAuthAdminClient = createSSLAuthAdminClient(saslAuthenticateRequestCallback, ((Boolean) defaultDataPolicyContext.hasSslPeerCertificate.get()).booleanValue());
        Throwable th = null;
        try {
            try {
                createSSLAuthAdminClient.createTopics(this.sampleTopics).all().get();
                List list = (List) this.sampleTopics.stream().map((v0) -> {
                    return v0.name();
                }).collect(Collectors.toList());
                TestUtils.retryOnExceptionWithTimeout(() -> {
                    Assertions.assertTrue(((Set) createSSLAuthAdminClient.listTopics().names().get()).containsAll(list));
                });
                Assertions.assertTrue(saslAuthenticateRequestCallback.callCount.get() > 0);
                ConfluentAuthenticationEvent lastAuthenticationEvent = getLastAuthenticationEvent();
                Assertions.assertTrue(lastAuthenticationEvent.principal().isPresent());
                Assertions.assertEquals("User", ((KafkaPrincipal) lastAuthenticationEvent.principal().get()).getPrincipalType());
                Assertions.assertEquals("1", ((KafkaPrincipal) lastAuthenticationEvent.principal().get()).getName());
                Assertions.assertEquals(AuditEventStatus.SUCCESS, lastAuthenticationEvent.status());
                assertEventInfo(defaultDataPolicyContext, lastAuthenticationEvent);
                Assertions.assertEquals("1", lastAuthenticationEvent.authenticationContext().server().getAuthorizationID());
                if (createSSLAuthAdminClient != null) {
                    if (0 == 0) {
                        createSSLAuthAdminClient.close();
                        return;
                    }
                    try {
                        createSSLAuthAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSSLAuthAdminClient != null) {
                if (th != null) {
                    try {
                        createSSLAuthAdminClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSSLAuthAdminClient.close();
                }
            }
            throw th4;
        }
    }

    private void assertEventInfo(DefaultDataPolicyContext defaultDataPolicyContext, ConfluentAuthenticationEvent confluentAuthenticationEvent) {
        Optional organizationId;
        Optional networkType;
        Optional hasSslPeerCertificate;
        if (this.saslMechanism == SaslMechanism.PLAIN) {
            organizationId = confluentAuthenticationEvent.authenticationContext().server().organizationId();
            networkType = confluentAuthenticationEvent.authenticationContext().server().networkType();
            hasSslPeerCertificate = confluentAuthenticationEvent.authenticationContext().server().hasSslPeerCertificate();
        } else {
            organizationId = confluentAuthenticationEvent.authenticationContext().server().organizationId();
            networkType = confluentAuthenticationEvent.authenticationContext().server().networkType();
            hasSslPeerCertificate = confluentAuthenticationEvent.authenticationContext().server().hasSslPeerCertificate();
        }
        Assertions.assertEquals(defaultDataPolicyContext.organizationId, organizationId);
        Assertions.assertEquals(defaultDataPolicyContext.networkType, networkType);
        Assertions.assertEquals(defaultDataPolicyContext.hasSslPeerCertificate, hasSslPeerCertificate);
    }

    private void assertAuthFailure(DefaultDataPolicyContext defaultDataPolicyContext) throws Exception {
        String str;
        SaslAuthenticateRequestCallback saslAuthenticateRequestCallback = new SaslAuthenticateRequestCallback(defaultDataPolicyContext);
        AdminClient createSSLAuthAdminClient = createSSLAuthAdminClient(saslAuthenticateRequestCallback, ((Boolean) defaultDataPolicyContext.hasSslPeerCertificate.get()).booleanValue());
        Throwable th = null;
        try {
            try {
                TestUtils.assertFutureError(createSSLAuthAdminClient.createTopics(this.sampleTopics).all(), SaslAuthenticationException.class);
                Assertions.assertTrue(saslAuthenticateRequestCallback.callCount.get() > 0);
                ConfluentAuthenticationEvent lastAuthenticationEvent = getLastAuthenticationEvent();
                Assertions.assertFalse(lastAuthenticationEvent.principal().isPresent());
                if (this.saslMechanism == SaslMechanism.OAUTHBEARER && this.validationMode == DefaultDataPolicyValidationMode.NONE) {
                    Assertions.assertEquals(AuditEventStatus.UNKNOWN_USER_DENIED, lastAuthenticationEvent.status());
                    str = "";
                } else {
                    Assertions.assertEquals(AuditEventStatus.UNAUTHENTICATED, lastAuthenticationEvent.status());
                    str = "OrganizationId: " + ((String) defaultDataPolicyContext.organizationId.get()) + ", NetworkType: " + ((SaslInternalConfigs.NetworkType) defaultDataPolicyContext.networkType.get()).name() + ", HasPeerCertificate: " + defaultDataPolicyContext.hasSslPeerCertificate.get() + " isn't allowed to communicate";
                }
                Assertions.assertTrue(lastAuthenticationEvent.authenticationException().isPresent());
                assertEventInfo(defaultDataPolicyContext, lastAuthenticationEvent);
                AuthenticationErrorInfo errorInfo = ((AuthenticationException) lastAuthenticationEvent.authenticationException().get()).errorInfo();
                Assertions.assertTrue(errorInfo.errorMessage().contains(str), errorInfo.errorMessage());
                if (createSSLAuthAdminClient != null) {
                    if (0 == 0) {
                        createSSLAuthAdminClient.close();
                        return;
                    }
                    try {
                        createSSLAuthAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSSLAuthAdminClient != null) {
                if (th != null) {
                    try {
                        createSSLAuthAdminClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSSLAuthAdminClient.close();
                }
            }
            throw th4;
        }
    }

    private AdminClient createSSLAuthAdminClient(SaslAuthenticateRequestCallback saslAuthenticateRequestCallback, boolean z) {
        Properties properties = new Properties();
        if (z) {
            properties.putAll(this.serverCertStore);
        } else {
            properties.put("ssl.truststore.certificates", this.serverCertStore.get("ssl.truststore.certificates"));
            properties.put("ssl.truststore.type", this.serverCertStore.get("ssl.truststore.type"));
        }
        if (this.saslMechanism == SaslMechanism.PLAIN) {
            return this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), (Map<String, Object>) null, properties, saslAuthenticateRequestCallback);
        }
        Assertions.assertEquals(SaslMechanism.OAUTHBEARER, this.saslMechanism);
        Properties properties2 = new Properties();
        properties2.putAll(this.adminProperties);
        properties2.putAll(properties);
        return this.testHarness.createSSLOAuthAdminClient(IntegrationTestHarness.clientOAuthJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId), properties2, saslAuthenticateRequestCallback, (ProxyProtocolEngineFactory) null);
    }
}
