package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.kafka.server.plugins.auth.DefaultUserMetaDataStore;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/UserMetaDataStoreIntegrationTest.class */
public class UserMetaDataStoreIntegrationTest {
    static final String KEY1 = String.join("\n", "{", "  \"UserResourceId\": \"r1\",", "  \"OrgResourceId\": \"org1\"", "}");
    static final String VALUE1 = String.join("\n", "{", "  \"UserId\": \"u1\",", "  \"ActiveLkcIds\": [\"lkc1\", \"lkc2\"]", "}");
    static final String KEY2 = String.join("\n", "{", "  \"UserResourceId\": \"r2\",", "  \"OrgResourceId\": \"org2\"", "}");
    static final String VALUE2 = String.join("\n", "{", "  \"UserId\": \"u2\",", "  \"ActiveLkcIds\": [\"lkc3\", \"lkc4\"]", "}");
    static final String KEY3 = String.join("\n", "{", "  \"UserResourceId\": \"r3\",", "  \"OrgResourceId\": \"org1\"", "}");
    static final String VALUE3 = String.join("\n", "{", "  \"UserId\": \"u3\",", "  \"ActiveLkcIds\": [\"lkc1\", \"lkc2\"]", "}");
    static final String INVALID_KEY1 = String.join("\n", "{", "  \"UserResourceId\": \"r3\",", "}");
    private IntegrationTestHarness testHarness;
    private PhysicalCluster physicalCluster;
    private final String userMetaDataTopic = "_confluent-user_metadata";
    private int numBrokers = 3;
    private final String apiKeysTopic = "_confluent-apikey";

    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        this.testHarness = new IntegrationTestHarness(testInfo, this.numBrokers);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.testHarness.shutdown();
    }

    protected void startWithTopic() throws Exception {
        long millis = 15000 + TimeUnit.SECONDS.toMillis(3L);
        this.physicalCluster = this.testHarness.startWithTopic(getInitialTopics(), 1, 1, 15000L, brokerProps(millis), nodeProps(millis), Optional.empty());
    }

    protected Properties brokerProps(long j) throws IOException, InterruptedException {
        Properties properties = new Properties();
        properties.put("listeners", "INTERNAL://localhost:0, EXTERNAL://localhost:0");
        properties.put("advertised.listeners", "INTERNAL://localhost:0, EXTERNAL://localhost:0");
        properties.put("listener.security.protocol.map", "INTERNAL:PLAINTEXT, EXTERNAL:SASL_PLAINTEXT");
        properties.put("inter.broker.listener.name", "INTERNAL");
        properties.putAll(nodeProps(j));
        return properties;
    }

    protected Properties nodeProps(long j) throws IOException {
        Properties properties = new Properties();
        properties.put("sasl.enabled.mechanisms", Collections.singletonList("PLAIN"));
        properties.put("listener.name.external.principal.builder.class", MultiTenantPrincipalBuilder.class.getName());
        properties.put("listener.name.external.confluent.security.event.logger.authentication.enable", "true");
        properties.put("authorizer.class.name", MultiTenantAuthorizer.class.getName());
        properties.put("confluent.security.event.logger.multitenant.enable", "true");
        properties.put("listener.name.external.plain.sasl.jaas.config", "io.confluent.kafka.server.plugins.auth.TopicBasedLoginModule required;");
        properties.put("confluent.multitenant.listener.names", "EXTERNAL");
        properties.put("confluent.cdc.api.keys.topic", "_confluent-apikey");
        properties.put("confluent.cdc.api.keys.topic.load.timeout.ms", String.valueOf(j));
        properties.put(MockAuditLogProvider.AUDIT_PROVIDER_CONFIG, "TEST");
        properties.put("confluent.close.connections.on.credential.delete", "true");
        properties.put("sasl.server.max.receive.size", "1024");
        properties.put("confluent.cdc.user.metadata.enable", "true");
        return properties;
    }

    protected List<String> getInitialTopics() {
        return Arrays.asList("_confluent-apikey", "_confluent-user_metadata");
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testUserMetaData(String str) throws Exception {
        startWithTopic();
        List<DefaultUserMetaDataStore> userMetaDataStoreInstances = this.physicalCluster.userMetaDataStoreInstances();
        List asList = Arrays.asList(KEY1, KEY2, KEY3, INVALID_KEY1);
        List asList2 = Arrays.asList(VALUE1, VALUE2, VALUE3, VALUE3);
        for (int i = 0; i < asList.size(); i++) {
            Long l = 100L;
            this.physicalCluster.kafkaCluster().produceData("_confluent-user_metadata", l.longValue() + i, (String) asList.get(i), (String) asList2.get(i));
        }
        Long l2 = 100L;
        this.physicalCluster.kafkaCluster().produceData("_confluent-user_metadata", l2.longValue(), KEY1, VALUE2);
        TestUtils.waitForCondition(() -> {
            return userMetaDataStoreInstances.stream().allMatch(defaultUserMetaDataStore -> {
                return defaultUserMetaDataStore.numOfMapping() == 3 && ((String) defaultUserMetaDataStore.userIdToUserResourceId("u1").get()).equals("r1") && TestUtils.getMetricValue(defaultUserMetaDataStore.metrics(), "user-metadata-event-rate") > 0.0d && TestUtils.getMetricValue(defaultUserMetaDataStore.metrics(), "user-metadata-event-failure-rate") > 0.0d && TestUtils.getMetricValue(defaultUserMetaDataStore.metrics(), "user-metadata-count") == 3.0d;
            });
        }, "Expected user metadata stored correctly");
    }
}
