package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.clients.plugins.auth.oauth.OAuthBearerLoginCallbackHandler;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.security.auth.oauth.mockserver.common.ServerFailedToConnectException;
import io.confluent.security.auth.oauth.mockserver.server.MockOAuthServer;
import io.confluent.security.auth.oauth.mockserver.server.TokenBuilder;
import io.confluent.security.auth.provider.oauth.EnhancedOAuthBearerValidatorCallbackHandler;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import kafka.test.JarResourceLoader;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/JkuOAuthIntegrationTest.class */
public class JkuOAuthIntegrationTest {
    private static final String CONFLUENT_ISSUER = "Confluent";
    private static final String NON_CONFLUENT_ISSUER = "nonConfluent";
    private final String testTopic = "abcd3";
    private final List<NewTopic> sampleTopics = Collections.singletonList(new NewTopic("abcd3", 1, 1));
    private final String lkcMetadataTopic = "_confluent-logical_clusters";
    private final String logicalClusterId = Utils.LC_META_ABC.logicalClusterId();
    private final String orgResourceId = Utils.LC_META_ABC.organizationId();
    private final Properties adminProperties = new Properties();
    private final String adminUserId = "100";
    private IntegrationTestHarness testHarness;
    private OAuthUtils.JwsContainer jwsContainer;
    private final AtomicInteger sequenceId;
    private TestInfo testInfo;
    private PhysicalCluster physicalCluster;
    private MockOAuthServer mockOAuthServer;
    private String token;
    private int authServerPort;

    public JkuOAuthIntegrationTest() {
        this.adminProperties.put("sasl.login.callback.handler.class", OAuthBearerLoginCallbackHandler.class.getName());
        this.adminUserId = "100";
        this.sequenceId = new AtomicInteger();
        this.authServerPort = 61578;
    }

    @BeforeEach
    public void setUpTempDir(TestInfo testInfo) throws Exception {
        this.testInfo = testInfo;
    }

    private void setUp(String str) throws Exception {
        this.testHarness = new IntegrationTestHarness(this.testInfo, 1);
        this.jwsContainer = new OAuthUtils.Builder(0, null, "sub", this.orgResourceId).build();
        this.physicalCluster = this.testHarness.startWithTopic(Arrays.asList("_confluent-logical_clusters"), 1, 1, 15000L, nodeProps(), nodeProps(), Optional.empty());
        this.physicalCluster.createLogicalCluster(this.logicalClusterId, Integer.parseInt("100"), new Integer[0]);
        setUpMockOauthServer(str);
        loadLkcMetadata();
    }

    private void setUpMockOauthServer(String str) throws Exception {
        this.mockOAuthServer = new MockOAuthServer();
        this.mockOAuthServer.startServer(this.authServerPort);
        this.mockOAuthServer.connectToServer();
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.logicalClusterId);
        this.token = new TokenBuilder("clientSecret", "clientId", this.mockOAuthServer).setExpiry(100000).setIssuer(str).setSubject("sub_1").addClaims("userResourceId", "UR1").addClaims("userId", "UI1").addClaims("clusters", arrayList).addClaims("orgResourceId", this.orgResourceId).build();
    }

    @AfterEach
    public void tearDown() throws Exception {
        try {
            this.testHarness.shutdown();
            this.mockOAuthServer.stopServer();
        } finally {
            KafkaTestUtils.verifyThreadCleanup();
        }
    }

    private Properties nodeProps() {
        Properties defaultOAuthBrokerProps = IntegrationTestHarness.defaultOAuthBrokerProps();
        Path path = null;
        try {
            path = JarResourceLoader.loadFileFromResourceWithClassLoader(getClass(), "AuthConfigJku.yaml").toPath();
        } catch (Exception e) {
            e.printStackTrace();
        }
        defaultOAuthBrokerProps.put("listener.name.external.oauthbearer.sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath=\"" + this.jwsContainer.getPublicKeyFile().toPath() + "\"authenticator.jwt.config.url=\"" + path + "\";");
        defaultOAuthBrokerProps.put("listener.name.external.oauthbearer.sasl.server.callback.handler.class", EnhancedOAuthBearerValidatorCallbackHandler.class.getName());
        defaultOAuthBrokerProps.put("confluent.cdc.lkc.metadata.topic", "_confluent-logical_clusters");
        defaultOAuthBrokerProps.put("multitenant.metadata.class", TopicBasedPhysicalClusterMetadata.class.getName());
        return defaultOAuthBrokerProps;
    }

    private String clientJaasConfig(String str, String str2) {
        return "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required token=\"" + str + "\" cluster=\"" + str2 + "\";";
    }

    @Disabled
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testValidDomainAuthenticatesSuccessfully(String str) throws Exception {
        try {
            setUp(CONFLUENT_ISSUER);
        } catch (ServerFailedToConnectException e) {
            Assertions.fail();
        }
        Assertions.assertNotNull(this.token);
        AdminClient createOAuthAdminClient = this.testHarness.createOAuthAdminClient(clientJaasConfig(this.token, this.logicalClusterId), this.adminProperties);
        createOAuthAdminClient.createTopics(this.sampleTopics).all().get();
        Assertions.assertNotNull(this.token);
        List list = (List) this.sampleTopics.stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList());
        TestUtils.retryOnExceptionWithTimeout(30000L, () -> {
            Assertions.assertTrue(((Set) createOAuthAdminClient.listTopics().names().get()).containsAll(list));
        });
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidDomainAuthenticatesFailure(String str) throws Exception {
        try {
            setUp(NON_CONFLUENT_ISSUER);
        } catch (ServerFailedToConnectException e) {
            Assertions.fail();
        }
        Assertions.assertNotNull(this.token);
        try {
            this.testHarness.createOAuthAdminClient(clientJaasConfig(this.token, this.logicalClusterId), this.adminProperties).createTopics(this.sampleTopics).all().get();
            Assertions.fail(String.format("Expected admin command to throw a %s", SaslAuthenticationException.class));
        } catch (Exception e2) {
            if (e2.getCause().getClass() != SaslAuthenticationException.class) {
                Assertions.fail(String.format("Expected admin command to throw a %s but it threw a %s", SaslAuthenticationException.class, e2.getCause().getClass()));
            }
        }
    }

    public void loadLkcMetadata() {
        this.physicalCluster.kafkaCluster().produceLCMData("_confluent-logical_clusters", this.sequenceId.incrementAndGet(), this.logicalClusterId, Utils.LC_META_ABC);
    }
}
