/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.clients.plugins.auth.oauth.OAuthBearerLoginCallbackHandler;
import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.MultiTenantSaslServer;
import io.confluent.kafka.multitenant.integration.test.AbstractTopicBasedPlainSaslAuthIntegrationTest;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.multitenant.integration.test.SaslAuthenticateRequestCallback;
import io.confluent.kafka.security.audit.event.ConfluentAuthenticationEvent;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.DefaultDataPolicyContext;
import io.confluent.kafka.server.plugins.auth.DefaultDataPolicyValidationMode;
import io.confluent.kafka.server.plugins.auth.oauth.MockBasicAuthStore;
import io.confluent.kafka.server.plugins.auth.oauth.MockTrustCache;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.security.auth.provider.oauth.EnhancedOAuthBearerValidatorCallbackHandler;
import io.confluent.security.auth.store.data.AuthKey;
import io.confluent.security.auth.store.data.AuthValue;
import io.confluent.security.auth.store.data.JwtIssuerKey;
import io.confluent.security.auth.store.data.JwtIssuerValue;
import java.io.IOException;
import java.nio.file.Path;
import java.security.interfaces.RSAPublicKey;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import kafka.test.JarResourceLoader;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.SaslInternalConfigs;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.jose4j.jwk.JsonWebKey;
import org.jose4j.jwk.JsonWebKeySet;
import org.jose4j.jwk.RsaJsonWebKey;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

@Tags(value={@Tag(value="integration"), @Tag(value="bazel:shard_count:2")})
public class TopicBasedDefaultDataPolicyAuthIntegrationTest
extends AbstractTopicBasedPlainSaslAuthIntegrationTest {
    public static final String TEST_WITH_PARAMETERIZED_NAMES = "{displayName}.validationMode={0}";
    public static final String TEST_WITH_PARAMETERIZED_VALIDATION_NETWORK = "{displayName}.quorum={0},validationMode={1},network={2}";
    private static final String NON_CONFLUENT_JWT_ISSUER = "non-confluent";
    private static final String LKC_CLIENT = "lkc-client";
    private final String orgResourceId = KafkaLogicalClusterUtils.LC_META_ABC.organizationId();
    private final Properties adminProperties = new Properties();
    private DefaultDataPolicyValidationMode validationMode;
    private OAuthUtils.JwsContainer jwsContainer;
    private MockBasicAuthStore authStore;
    private String brokerSessionUUID;
    private Properties serverCertStore;
    private SaslMechanism saslMechanism;
    private String denyOrgIds;

    public TopicBasedDefaultDataPolicyAuthIntegrationTest() {
        this.adminProperties.put("sasl.login.callback.handler.class", OAuthBearerLoginCallbackHandler.class.getName());
    }

    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        this.numBrokers = 1;
        this.brokerSessionUUID = UUID.randomUUID().toString();
        super.setUp(testInfo);
        this.validationMode = DefaultDataPolicyValidationMode.STRICT;
        this.authStore = MockBasicAuthStore.create((String)this.brokerSessionUUID);
        this.saslMechanism = SaslMechanism.PLAIN;
    }

    @Override
    @AfterEach
    public void tearDown() throws Exception {
        super.tearDown();
        this.authStore.close();
        MockAuditLogProvider.reset();
    }

    private void setUpOAuth() throws Exception {
        this.saslMechanism = SaslMechanism.OAUTHBEARER;
        int serviceUserId = 1;
        String subject = "" + serviceUserId;
        this.jwsContainer = new OAuthUtils.Builder(100000, NON_CONFLUENT_JWT_ISSUER, subject, this.orgResourceId).jku("https://localhost/keys").withKid(true).build();
        MockTrustCache cache = (MockTrustCache)this.authStore.trustCache();
        RsaJsonWebKey key = new RsaJsonWebKey((RSAPublicKey)this.jwsContainer.verificationKey());
        key.setKeyId(this.jwsContainer.getKid());
        cache.put((AuthKey)new JwtIssuerKey(NON_CONFLUENT_JWT_ISSUER, null, ""), (AuthValue)new JwtIssuerValue(new JsonWebKeySet(new JsonWebKey[]{key})));
    }

    @Override
    protected List<String> getInitialTopics() {
        return Arrays.asList("_confluent-apikey", "_confluent-logical_clusters");
    }

    @Override
    protected void startWithTopic() throws Exception {
        super.startWithTopic(Optional.of(Time.SYSTEM), SecurityProtocol.SASL_SSL);
    }

    @Override
    protected Properties nodeProps(long topicLoadTimeoutMs) throws IOException {
        Properties props = super.nodeProps(topicLoadTimeoutMs);
        if (this.saslMechanism == SaslMechanism.OAUTHBEARER) {
            props.put("multitenant.oauth.superuser.disable", "false");
        }
        return props;
    }

    @Override
    protected Properties brokerProps(long topicLoadTimeoutMs, boolean enableLKCMetadata, SecurityProtocol externalListenerSecurityProtocol) throws Exception {
        Properties props = super.brokerProps(topicLoadTimeoutMs, enableLKCMetadata, externalListenerSecurityProtocol);
        props.put("broker.session.uuid", this.brokerSessionUUID);
        if (this.denyOrgIds != null) {
            props.put("confluent.cluster.link.intranet.connectivity.denied.org.ids", this.denyOrgIds);
        }
        String defaultDataPolicyJassConfigEntry = "default_data_policy_validation_mode=\"" + this.validationMode.name() + "\"";
        if (this.saslMechanism == SaslMechanism.PLAIN) {
            props.put("listener.name.external.plain.sasl.jaas.config", String.format("io.confluent.kafka.server.plugins.auth.TopicBasedLoginModule required %s;", defaultDataPolicyJassConfigEntry));
        } else if (this.saslMechanism == SaslMechanism.OAUTHBEARER) {
            Path pathToConfigFile;
            props.putAll((Map<?, ?>)IntegrationTestHarness.defaultOAuthBrokerProps());
            try {
                pathToConfigFile = JarResourceLoader.loadFileFromResourceWithClassLoader(this.getClass(), (String)"AuthConfigEnhanced.yaml").toPath();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            props.put("listener.name.external.oauthbearer.sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " + defaultDataPolicyJassConfigEntry + " publicKeyPath=\"" + String.valueOf(this.jwsContainer.getPublicKeyFile().toPath()) + "\"authenticator.jwt.config.url=\"" + String.valueOf(pathToConfigFile) + "\";");
            props.put("listener.name.external.oauthbearer.sasl.server.callback.handler.class", EnhancedOAuthBearerValidatorCallbackHandler.class.getName());
            props.put("listener.name.external.confluent.oauth.flat.networking.verification.enable", "true");
            props.put("listener.name.external.confluent.require.calling.resource.identity", "true");
        }
        this.serverCertStore = this.createSslStores();
        props.putAll((Map<?, ?>)this.serverCertStore);
        props.put("listener.name.external.ssl.client.auth", "requested");
        return props;
    }

    private Properties createSslStores() throws Exception {
        CertStores certStores = new CertStores.Builder(true).cn("kafka").addHostName("localhost").build();
        Properties props = new Properties();
        BiConsumer<String, Object> copyEntry = (k, v) -> {
            if (v instanceof Password) {
                props.setProperty((String)k, ((Password)v).value());
            } else if (v instanceof List) {
                List listOfString = (List)v;
                props.setProperty((String)k, String.join((CharSequence)",", listOfString));
            } else if (v != null) {
                props.setProperty((String)k, (String)v);
            }
        };
        certStores.keyStoreProps().forEach(copyEntry);
        certStores.trustStoreProps().forEach(copyEntry);
        TestSslUtils.convertToPemWithoutFiles((Properties)props);
        return props;
    }

    @ParameterizedTest(name="{displayName}.validationMode={0}")
    @ValueSource(strings={"kraft"})
    public void testNoDenyOrgIds(String quorum) throws Exception {
        this.startWithTopic();
        this.loadApiKeys();
        this.loadLKCMetadata();
        this.assertAuthSuccess(new DefaultDataPolicyContext.Builder(this.organizationId, SaslInternalConfigs.NetworkType.PRIVATE, Boolean.valueOf(true)).build());
    }

    @ParameterizedTest(name="{displayName}.validationMode={0}")
    @ValueSource(strings={"kraft"})
    public void testNoDenyOrgIdsOAuth(String quorum) throws Exception {
        this.setUpOAuth();
        this.testNoDenyOrgIds(quorum);
    }

    @ParameterizedTest(name="{displayName}.validationMode={0}")
    @ValueSource(strings={"kraft"})
    public void testDenyOrgIds(String quorum) throws Exception {
        this.denyOrgIds = this.organizationId + ",foo";
        this.startWithTopic();
        this.loadApiKeys();
        this.loadLKCMetadata();
        this.assertAuthFailure(new DefaultDataPolicyContext.Builder(this.organizationId, SaslInternalConfigs.NetworkType.PRIVATE, Boolean.valueOf(true)).build());
    }

    @ParameterizedTest(name="{displayName}.validationMode={0}")
    @ValueSource(strings={"kraft"})
    public void testDenyOrgIdsOAuth(String quorum) throws Exception {
        this.setUpOAuth();
        this.testDenyOrgIds(quorum);
    }

    @ParameterizedTest(name="{displayName}.validationMode={0}")
    @CsvSource(value={"kraft,strict", "kraft,none"})
    public void testDifferentOrganizationId(String quorum, String validationMode) throws Exception {
        this.validationMode = DefaultDataPolicyValidationMode.fromString((String)validationMode);
        this.startWithTopic();
        this.loadApiKeys();
        this.loadLKCMetadata();
        if (this.saslMechanism != SaslMechanism.OAUTHBEARER && this.validationMode == DefaultDataPolicyValidationMode.NONE) {
            this.assertAuthSuccess(new DefaultDataPolicyContext.Builder("org-diff", SaslInternalConfigs.NetworkType.PRIVATE, Boolean.valueOf(true)).build());
        } else {
            this.assertAuthFailure(new DefaultDataPolicyContext.Builder("org-diff", SaslInternalConfigs.NetworkType.PRIVATE, Boolean.valueOf(true)).build());
        }
    }

    @ParameterizedTest(name="{displayName}.validationMode={0}")
    @CsvSource(value={"kraft,strict", "kraft,none"})
    public void testDifferentOrganizationIdOAuth(String quorum, String validationMode) throws Exception {
        this.setUpOAuth();
        this.testDifferentOrganizationId(quorum, validationMode);
    }

    @ParameterizedTest(name="{displayName}.quorum={0},validationMode={1},network={2}")
    @CsvSource(value={"kraft,none,NOT_SET", "kraft,none,PUBLIC", "kraft,none,PRIVATE", "kraft,strict,NOT_SET", "kraft,strict,PUBLIC", "kraft,strict,PRIVATE"})
    public void testNetworkTypes(String quorum, String validationMode, String networkTypeStr) throws Exception {
        this.validationMode = DefaultDataPolicyValidationMode.fromString((String)validationMode);
        SaslInternalConfigs.NetworkType networkType = SaslInternalConfigs.NetworkType.fromString((String)networkTypeStr);
        this.startWithTopic();
        this.loadApiKeys();
        this.loadLKCMetadata();
        if (this.saslMechanism == SaslMechanism.OAUTHBEARER) {
            if (this.validationMode == DefaultDataPolicyValidationMode.NONE) {
                this.assertAuthFailure(new DefaultDataPolicyContext.Builder(this.organizationId, networkType, Boolean.valueOf(true)).build());
            } else if (networkType == SaslInternalConfigs.NetworkType.PRIVATE) {
                this.assertAuthSuccess(new DefaultDataPolicyContext.Builder(this.organizationId, networkType, Boolean.valueOf(true)).build());
            } else {
                this.assertAuthFailure(new DefaultDataPolicyContext.Builder(this.organizationId, networkType, Boolean.valueOf(true)).build());
            }
        } else if (this.validationMode == DefaultDataPolicyValidationMode.NONE || networkType == SaslInternalConfigs.NetworkType.PRIVATE) {
            this.assertAuthSuccess(new DefaultDataPolicyContext.Builder(this.organizationId, networkType, Boolean.valueOf(true)).build());
        } else {
            this.assertAuthFailure(new DefaultDataPolicyContext.Builder(this.organizationId, networkType, Boolean.valueOf(true)).build());
        }
    }

    @ParameterizedTest(name="{displayName}.quorum={0},validationMode={1},network={2}")
    @CsvSource(value={"kraft,none,NOT_SET", "kraft,none,PUBLIC", "kraft,none,PRIVATE", "kraft,strict,NOT_SET", "kraft,strict,PUBLIC", "kraft,strict,PRIVATE"})
    public void testNetworkTypesOAuth(String quorum, String validationMode, String networkTypeStr) throws Exception {
        this.setUpOAuth();
        this.testNetworkTypes(quorum, validationMode, networkTypeStr);
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testClientCertificate(String quorum) throws Exception {
        this.validationMode = DefaultDataPolicyValidationMode.STRICT;
        this.startWithTopic();
        this.loadApiKeys();
        this.loadLKCMetadata();
        this.assertAuthSuccess(new DefaultDataPolicyContext.Builder(this.organizationId, SaslInternalConfigs.NetworkType.PRIVATE, Boolean.valueOf(true)).build());
        this.assertAuthFailure(new DefaultDataPolicyContext.Builder(this.organizationId, SaslInternalConfigs.NetworkType.PRIVATE, Boolean.valueOf(false)).build());
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"kraft"})
    public void testClientCertificateOAuth(String quorum) throws Exception {
        this.setUpOAuth();
        this.testClientCertificate(quorum);
    }

    private void assertAuthSuccess(DefaultDataPolicyContext policyContext) throws Exception {
        SaslAuthenticateRequestCallback authenticateRequestCallback = new SaslAuthenticateRequestCallback(policyContext, LKC_CLIENT);
        try (AdminClient client = this.createSSLAuthAdminClient(authenticateRequestCallback, (Boolean)policyContext.hasSslPeerCertificate.get());){
            client.createTopics((Collection)this.sampleTopics).all().get();
            List expectedTopics = this.sampleTopics.stream().map(NewTopic::name).collect(Collectors.toList());
            TestUtils.retryOnExceptionWithTimeout(() -> Assertions.assertTrue((boolean)((Set)client.listTopics().names().get()).containsAll(expectedTopics)));
            Assertions.assertTrue((authenticateRequestCallback.callCount.get() > 0 ? 1 : 0) != 0);
            ConfluentAuthenticationEvent authenticationEvent = this.getLastAuthenticationEvent();
            Assertions.assertTrue((boolean)authenticationEvent.principal().isPresent());
            Assertions.assertEquals((Object)"User", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getPrincipalType());
            Assertions.assertEquals((Object)"1", (Object)((KafkaPrincipal)authenticationEvent.principal().get()).getName());
            Assertions.assertEquals((Object)AuditEventStatus.SUCCESS, (Object)authenticationEvent.status());
            this.assertEventInfo(policyContext, authenticationEvent);
            SaslAuthenticationContext authenticationContext = (SaslAuthenticationContext)authenticationEvent.authenticationContext();
            Assertions.assertEquals((Object)"1", (Object)authenticationContext.server().getAuthorizationID());
        }
    }

    private void assertEventInfo(DefaultDataPolicyContext expectedPolicyContext, ConfluentAuthenticationEvent authenticationEvent) {
        Optional clientLogicalClusterId;
        Optional hasSslPeerCertificate;
        Optional networkType;
        Optional organizationId;
        if (this.saslMechanism == SaslMechanism.PLAIN) {
            organizationId = ((MultiTenantSaslServer)((SaslAuthenticationContext)authenticationEvent.authenticationContext()).server()).organizationId();
            networkType = ((MultiTenantSaslServer)((SaslAuthenticationContext)authenticationEvent.authenticationContext()).server()).networkType();
            hasSslPeerCertificate = ((MultiTenantSaslServer)((SaslAuthenticationContext)authenticationEvent.authenticationContext()).server()).hasSslPeerCertificate();
            clientLogicalClusterId = ((MultiTenantSaslServer)((SaslAuthenticationContext)authenticationEvent.authenticationContext()).server()).clientLogicalClusterId();
        } else {
            organizationId = ((OAuthBearerSaslServer)((SaslAuthenticationContext)authenticationEvent.authenticationContext()).server()).organizationId();
            networkType = ((OAuthBearerSaslServer)((SaslAuthenticationContext)authenticationEvent.authenticationContext()).server()).networkType();
            hasSslPeerCertificate = ((OAuthBearerSaslServer)((SaslAuthenticationContext)authenticationEvent.authenticationContext()).server()).hasSslPeerCertificate();
            clientLogicalClusterId = ((OAuthBearerSaslServer)((SaslAuthenticationContext)authenticationEvent.authenticationContext()).server()).clientLogicalClusterId();
        }
        Assertions.assertEquals((Object)expectedPolicyContext.organizationId, (Object)organizationId);
        Assertions.assertEquals((Object)expectedPolicyContext.networkType, (Object)networkType);
        Assertions.assertEquals((Object)expectedPolicyContext.hasSslPeerCertificate, (Object)hasSslPeerCertificate);
        Assertions.assertEquals(Optional.of(LKC_CLIENT), (Object)clientLogicalClusterId);
    }

    private void assertAuthFailure(DefaultDataPolicyContext expectedPolicyContext) throws Exception {
        SaslAuthenticateRequestCallback authenticateRequestCallback = new SaslAuthenticateRequestCallback(expectedPolicyContext, LKC_CLIENT);
        try (AdminClient client = this.createSSLAuthAdminClient(authenticateRequestCallback, (Boolean)expectedPolicyContext.hasSslPeerCertificate.get());){
            Object expectedMessagePart;
            KafkaFuture future = client.createTopics((Collection)this.sampleTopics).all();
            TestUtils.assertFutureThrows(SaslAuthenticationException.class, (Future)future);
            Assertions.assertTrue((authenticateRequestCallback.callCount.get() > 0 ? 1 : 0) != 0);
            ConfluentAuthenticationEvent authenticationEvent = this.getLastAuthenticationEvent();
            Assertions.assertFalse((boolean)authenticationEvent.principal().isPresent());
            if (this.saslMechanism == SaslMechanism.OAUTHBEARER && this.validationMode == DefaultDataPolicyValidationMode.NONE) {
                Assertions.assertEquals((Object)AuditEventStatus.UNKNOWN_USER_DENIED, (Object)authenticationEvent.status());
                expectedMessagePart = "";
            } else {
                Assertions.assertEquals((Object)AuditEventStatus.UNAUTHENTICATED, (Object)authenticationEvent.status());
                expectedMessagePart = "OrganizationId: " + (String)expectedPolicyContext.organizationId.get() + ", NetworkType: " + ((SaslInternalConfigs.NetworkType)expectedPolicyContext.networkType.get()).name() + ", HasPeerCertificate: " + String.valueOf(expectedPolicyContext.hasSslPeerCertificate.get()) + " is not allowed to communicate";
            }
            Assertions.assertTrue((boolean)authenticationEvent.authenticationException().isPresent());
            this.assertEventInfo(expectedPolicyContext, authenticationEvent);
            AuthenticationException authenticationException = (AuthenticationException)authenticationEvent.authenticationException().get();
            AuthenticationErrorInfo errorInfo = authenticationException.errorInfo();
            Assertions.assertTrue((boolean)errorInfo.errorMessage().contains((CharSequence)expectedMessagePart), (String)errorInfo.errorMessage());
        }
    }

    private AdminClient createSSLAuthAdminClient(SaslAuthenticateRequestCallback authenticateRequestCallback, boolean includeClientCertificate) {
        Properties clientCertStore = new Properties();
        if (includeClientCertificate) {
            clientCertStore.putAll((Map<?, ?>)this.serverCertStore);
        } else {
            clientCertStore.put("ssl.truststore.certificates", this.serverCertStore.get("ssl.truststore.certificates"));
            clientCertStore.put("ssl.truststore.type", this.serverCertStore.get("ssl.truststore.type"));
        }
        if (this.saslMechanism == SaslMechanism.PLAIN) {
            return this.testHarness.createSSLAuthAdminClient(IntegrationTestHarness.clientPlainJaasConfig("APIKEY1", "pwd1"), null, clientCertStore, authenticateRequestCallback);
        }
        Assertions.assertEquals((Object)((Object)SaslMechanism.OAUTHBEARER), (Object)((Object)this.saslMechanism));
        Properties clientProperties = new Properties();
        clientProperties.putAll((Map<?, ?>)this.adminProperties);
        clientProperties.putAll((Map<?, ?>)clientCertStore);
        return this.testHarness.createSSLOAuthAdminClient(IntegrationTestHarness.clientOAuthJaasConfig(this.jwsContainer.getJwsToken(), this.logicalClusterId), clientProperties, authenticateRequestCallback, null);
    }

    private static enum SaslMechanism {
        PLAIN,
        OAUTHBEARER;

    }
}

