/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest;
import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.server.ConfigEntityName;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

@Tag(value="integration")
public class MultiTenantUserQuotaIntegrationTest
extends AbstractMultiTenantKafkaIntegrationTest {
    // Client id given to producers built by createProducer; isProducerThrottled looks up the
    // producer throttle metric under a "client-id" tag with this same value.
    private final String producerClientId = "producer-client";
    // Client id given to consumers built by createConsumer; isConsumerThrottled looks up the
    // consumer throttle metric under a "client-id" tag with this same value.
    private final String consumerClientId = "consumer-client";

    /**
     * Per-test fixture setup: runs the inherited temp-dir setup, then {@code setUp()}, creates
     * the physical and logical clusters and finally waits for metadata propagation.
     *
     * @param testInfo JUnit test descriptor, forwarded unchanged to the superclass
     */
    @Override
    @BeforeEach
    public void setUpTempDir(TestInfo testInfo) {
        super.setUpTempDir(testInfo);
        // Order matters: the clusters must exist before we wait for their metadata.
        setUp();
        createPhysicalAndLogicalClusters();
        awaitMetadataPropagation();
    }

    /**
     * Extends the inherited broker configuration with the tenant quota callback and turns on
     * the {@code confluent.quota.tenant.user.quotas.enable} flag.
     *
     * @return the broker properties returned by the superclass with the two extra entries added
     */
    @Override
    protected Properties brokerProps() {
        final Properties brokerConfig = super.brokerProps();
        brokerConfig.put(
            KafkaConfig.ClientQuotaCallbackClassProp(),
            TenantQuotaCallback.class.getName());
        brokerConfig.put("confluent.quota.tenant.user.quotas.enable", Boolean.toString(true));
        return brokerConfig;
    }

    /**
     * Starts the physical cluster with the given broker properties and creates the two logical
     * clusters ({@code lkc-tenant1} and {@code lkc-tenant2}) used by the tests.
     *
     * <p>NOTE(review): the numeric arguments are presumably an admin user id followed by
     * regular user ids (the tests look up {@code logicalCluster1.user(9)}) — confirm against
     * {@code createLogicalCluster}.
     *
     * @param brokerProperties broker configuration handed to the test harness
     */
    @Override
    protected void createPhysicalAndLogicalClusters(Properties brokerProperties) {
        physicalCluster = testHarness.start(brokerProperties, Time.SYSTEM);
        logicalCluster1 = physicalCluster.createLogicalCluster("lkc-tenant1", 100, 9, 11, 12);
        logicalCluster2 = physicalCluster.createLogicalCluster("lkc-tenant2", 200, 9, 21, 22);
    }

    /**
     * A tenant's own admin client must not be able to alter client quotas: the call is expected
     * to fail with an {@link ExecutionException} whose cause is exactly
     * {@link ClusterAuthorizationException}.
     */
    @Test
    public void testAlterQuotasDisabledForTenants() {
        AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster1.adminUser());
        Map<String, Double> produceQuota = Collections.singletonMap("producer_byte_rate", 5000.0);

        ExecutionException failure = Assertions.assertThrows(
            ExecutionException.class,
            () -> alterClientQuotas(tenantAdmin, "u-9", produceQuota));

        Throwable cause = failure.getCause();
        Assertions.assertEquals(ClusterAuthorizationException.class, cause.getClass());
    }

    /**
     * The physical cluster's super admin client can set a produce quota for a tenant user; the
     * quota callback is then expected to report half of the configured value.
     *
     * <p>NOTE(review): the division by 2 is presumably the per-broker share in a two-broker
     * cluster — confirm against the test harness.
     */
    @Test
    public void testAlterQuotasEnabledForAdminUsers() throws Exception {
        AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster1.adminUser());
        NewTopic topic = new NewTopic("testtopic", Optional.of(5), Optional.empty());
        tenantAdmin.createTopics(Collections.singletonList(topic)).all().get();

        AdminClient superAdmin = physicalCluster.superAdminClient();
        double overrideBytesPerSec = 5000.0;
        Map<String, Double> produceQuota =
            Collections.singletonMap("producer_byte_rate", overrideBytesPerSec);
        LogicalClusterUser tenantUser = logicalCluster1.user(9);

        alterClientQuotas(superAdmin, getPrefixedUser(tenantUser), produceQuota);
        verifyQuotaCallbackLimit(
            ClientQuotaType.PRODUCE, quotaTags(tenantUser), overrideBytesPerSec / 2.0);
    }

    /**
     * Sets a produce quota for a single tenant user, checks that producing is throttled while
     * fetching is not, then removes the quota and checks that all records can be produced.
     *
     * <p>NOTE(review): callback limits are compared against half of the configured quota,
     * presumably the per-broker share in a two-broker cluster — confirm against the harness.
     */
    @Test
    public void testSetAndRemoveUserProduceQuota() throws Throwable {
        final String topic = "testtopic";
        final int maxRecords = 2000;

        addLkcFileAndSyncMetadata(Utils.LC_META_1);
        AdminClient tenantAdminClient = testHarness.createAdminClient(logicalCluster1.adminUser());
        LogicalClusterUser user9 = logicalCluster1.user(9);
        Map<String, String> user9Tags = quotaTags(user9);
        tenantAdminClient.createTopics(
            Collections.singletonList(new NewTopic(topic, Optional.of(10), Optional.empty()))).all().get();

        // Apply the per-user produce quota through the super admin client.
        double quotaOverride = 5000.0;
        Map<String, Double> produceQuotaOverride = Collections.singletonMap("producer_byte_rate", quotaOverride);
        alterClientQuotas(physicalCluster.superAdminClient(), getPrefixedUser(user9), produceQuotaOverride);
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, user9Tags, quotaOverride / 2.0);

        // Producing must hit the throttle; consuming the same records must not.
        int recordsProduced = produceUntilThrottled(user9, topic, maxRecords);
        verifyBrokerThrottleMetric(ClientQuotaType.PRODUCE, user9, true);
        int recordsConsumed = consumeUntilThrottled(user9, topic, "test-group", recordsProduced);
        Assertions.assertEquals(recordsProduced, recordsConsumed);
        verifyBrokerThrottleMetric(ClientQuotaType.FETCH, user9, false);

        // A null value removes the override. The map must be Map<String, Double> to match
        // alterClientQuotas' parameter type.
        Map<String, Double> deleteProduceQuotaOverride = Collections.singletonMap("producer_byte_rate", null);
        alterClientQuotas(physicalCluster.superAdminClient(), getPrefixedUser(user9), deleteProduceQuotaOverride);
        verifyQuotaUpdate(ClientQuotaType.PRODUCE, user9Tags, QuotaConfig.UNLIMITED_QUOTA.quota(ClientQuotaType.PRODUCE));

        // With the quota gone, every record is expected to be produced.
        recordsProduced = produceUntilThrottled(user9, topic, maxRecords);
        Assertions.assertEquals(maxRecords, recordsProduced);
    }

    /**
     * Sets a fetch quota for a single tenant user, checks that consuming is throttled while
     * producing is not, then removes the quota and checks that a fresh consumer group can read
     * every produced record.
     *
     * <p>NOTE(review): callback limits are compared against half of the configured quota,
     * presumably the per-broker share in a two-broker cluster — confirm against the harness.
     */
    @Test
    public void testSetAndRemoveUserFetchQuota() throws Throwable {
        final String topic = "testtopic";

        addLkcFileAndSyncMetadata(Utils.LC_META_1);
        AdminClient tenantAdminClient = testHarness.createAdminClient(logicalCluster1.adminUser());
        LogicalClusterUser user9 = logicalCluster1.user(9);
        Map<String, String> user9Tags = quotaTags(user9);
        tenantAdminClient.createTopics(
            Collections.singletonList(new NewTopic(topic, Optional.of(10), Optional.empty()))).all().get();

        // Apply the per-user fetch quota through the super admin client.
        double quotaOverride = 5000.0;
        Map<String, Double> fetchQuotaOverride = Collections.singletonMap("consumer_byte_rate", quotaOverride);
        alterClientQuotas(physicalCluster.superAdminClient(), getPrefixedUser(user9), fetchQuotaOverride);
        verifyQuotaCallbackLimit(ClientQuotaType.FETCH, user9Tags, quotaOverride / 2.0);

        // Producing is unrestricted; consuming must be throttled before everything is read.
        int recordsProduced = produceUntilThrottled(user9, topic, 2000);
        verifyBrokerThrottleMetric(ClientQuotaType.PRODUCE, user9, false);
        int recordsConsumed = consumeUntilThrottled(user9, topic, "test-group", recordsProduced);
        Assertions.assertTrue(recordsProduced > recordsConsumed);
        verifyBrokerThrottleMetric(ClientQuotaType.FETCH, user9, true);

        // A null value removes the override. The map must be Map<String, Double> to match
        // alterClientQuotas' parameter type.
        Map<String, Double> deleteFetchQuotaOverride = Collections.singletonMap("consumer_byte_rate", null);
        alterClientQuotas(physicalCluster.superAdminClient(), getPrefixedUser(user9), deleteFetchQuotaOverride);
        verifyQuotaUpdate(ClientQuotaType.FETCH, user9Tags, QuotaConfig.UNLIMITED_QUOTA.quota(ClientQuotaType.FETCH));

        // A new consumer group starts from scratch and should now read every record.
        recordsConsumed = consumeUntilThrottled(user9, topic, "test-group-2", recordsProduced);
        Assertions.assertEquals(recordsProduced, recordsConsumed);
    }

    /**
     * Sets a produce quota on the tenant's default-user entity, checks that a tenant user is
     * throttled when producing but not when fetching, then removes the quota and checks that
     * all records can be produced.
     *
     * <p>NOTE(review): callback limits are compared against half of the configured quota,
     * presumably the per-broker share in a two-broker cluster — confirm against the harness.
     */
    @Test
    public void testSetAndRemoveDefaultUserProduceQuota() throws Throwable {
        final String topic = "testtopic";
        final int maxRecords = 2000;

        addLkcFileAndSyncMetadata(Utils.LC_META_1);
        AdminClient tenantAdminClient = testHarness.createAdminClient(logicalCluster1.adminUser());
        LogicalClusterUser user9 = logicalCluster1.user(9);
        Map<String, String> user9Tags = quotaTags(user9);
        tenantAdminClient.createTopics(
            Collections.singletonList(new NewTopic(topic, Optional.of(10), Optional.empty()))).all().get();

        // Apply the produce quota to the tenant-wide default user entity.
        double quotaOverride = 5000.0;
        Map<String, Double> produceQuotaOverride = Collections.singletonMap("producer_byte_rate", quotaOverride);
        alterClientQuotas(physicalCluster.superAdminClient(), getPrefixedDefaultUser(logicalCluster1), produceQuotaOverride);
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, user9Tags, quotaOverride / 2.0);

        // Producing must hit the throttle; consuming the same records must not.
        int recordsProduced = produceUntilThrottled(user9, topic, maxRecords);
        verifyBrokerThrottleMetric(ClientQuotaType.PRODUCE, user9, true);
        int recordsConsumed = consumeUntilThrottled(user9, topic, "test-group", recordsProduced);
        Assertions.assertEquals(recordsProduced, recordsConsumed);
        verifyBrokerThrottleMetric(ClientQuotaType.FETCH, user9, false);

        // A null value removes the override. The map must be Map<String, Double> to match
        // alterClientQuotas' parameter type.
        Map<String, Double> deleteProduceQuotaOverride = Collections.singletonMap("producer_byte_rate", null);
        alterClientQuotas(physicalCluster.superAdminClient(), getPrefixedDefaultUser(logicalCluster1), deleteProduceQuotaOverride);
        verifyQuotaUpdate(ClientQuotaType.PRODUCE, user9Tags, QuotaConfig.UNLIMITED_QUOTA.quota(ClientQuotaType.PRODUCE));

        // With the quota gone, every record is expected to be produced.
        recordsProduced = produceUntilThrottled(user9, topic, maxRecords);
        Assertions.assertEquals(maxRecords, recordsProduced);
    }

    /**
     * Sets a fetch quota on the tenant's default-user entity, checks that a tenant user is
     * throttled when consuming but not when producing, then removes the quota and checks that a
     * fresh consumer group can read every produced record.
     *
     * <p>NOTE(review): callback limits are compared against half of the configured quota,
     * presumably the per-broker share in a two-broker cluster — confirm against the harness.
     */
    @Test
    public void testSetAndRemoveDefaultUserFetchQuota() throws Throwable {
        final String topic = "testtopic";

        addLkcFileAndSyncMetadata(Utils.LC_META_1);
        AdminClient tenantAdminClient = testHarness.createAdminClient(logicalCluster1.adminUser());
        LogicalClusterUser user9 = logicalCluster1.user(9);
        Map<String, String> user9Tags = quotaTags(user9);
        tenantAdminClient.createTopics(
            Collections.singletonList(new NewTopic(topic, Optional.of(10), Optional.empty()))).all().get();

        // Apply the fetch quota to the tenant-wide default user entity.
        double quotaOverride = 5000.0;
        Map<String, Double> fetchQuotaOverride = Collections.singletonMap("consumer_byte_rate", quotaOverride);
        alterClientQuotas(physicalCluster.superAdminClient(), getPrefixedDefaultUser(logicalCluster1), fetchQuotaOverride);
        verifyQuotaCallbackLimit(ClientQuotaType.FETCH, user9Tags, quotaOverride / 2.0);

        // Producing is unrestricted; consuming must be throttled before everything is read.
        int recordsProduced = produceUntilThrottled(user9, topic, 2000);
        verifyBrokerThrottleMetric(ClientQuotaType.PRODUCE, user9, false);
        int recordsConsumed = consumeUntilThrottled(user9, topic, "test-group", recordsProduced);
        Assertions.assertTrue(recordsProduced > recordsConsumed);
        verifyBrokerThrottleMetric(ClientQuotaType.FETCH, user9, true);

        // A null value removes the override. The map must be Map<String, Double> to match
        // alterClientQuotas' parameter type.
        Map<String, Double> deleteFetchQuotaOverride = Collections.singletonMap("consumer_byte_rate", null);
        alterClientQuotas(physicalCluster.superAdminClient(), getPrefixedDefaultUser(logicalCluster1), deleteFetchQuotaOverride);
        verifyQuotaUpdate(ClientQuotaType.FETCH, user9Tags, QuotaConfig.UNLIMITED_QUOTA.quota(ClientQuotaType.FETCH));

        // A new consumer group starts from scratch and should now read every record.
        recordsConsumed = consumeUntilThrottled(user9, topic, "test-group-2", recordsProduced);
        Assertions.assertEquals(recordsProduced, recordsConsumed);
    }

    /**
     * Sets a tenant-wide default produce quota, verifies it throttles a tenant user, then sets
     * user-level produce and fetch quotas for that user and verifies that the user-level values
     * are the ones reported and enforced (produce no longer throttled, fetch throttled).
     */
    @Test
    public void testOverrideDefaultQuota() throws Throwable {
        addLkcFileAndSyncMetadata(Utils.LC_META_1);
        AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster1.adminUser());
        Map<String, String> tags = quotaTags(logicalCluster1.user(9));
        NewTopic topic = new NewTopic("testtopic", Optional.of(10), Optional.empty());
        tenantAdmin.createTopics(Collections.singletonList(topic)).all().get();

        // Phase 1: default-user produce quota throttles the producer only.
        double smallQuota = 5000.0;
        Map<String, Double> defaultProduceQuota =
            Collections.singletonMap("producer_byte_rate", smallQuota);
        LogicalClusterUser tenantUser = logicalCluster1.user(9);
        alterClientQuotas(
            physicalCluster.superAdminClient(),
            getPrefixedDefaultUser(logicalCluster1),
            defaultProduceQuota);
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, tags, smallQuota / 2.0);

        int produced = produceUntilThrottled(tenantUser, "testtopic", 2000);
        verifyBrokerThrottleMetric(ClientQuotaType.PRODUCE, tenantUser, true);
        int consumed = consumeUntilThrottled(tenantUser, "testtopic", "test-group", produced);
        Assertions.assertEquals(produced, consumed);
        verifyBrokerThrottleMetric(ClientQuotaType.FETCH, tenantUser, false);

        // Phase 2: user-level quotas - a large produce quota and a small fetch quota.
        double largeProduceQuota = 2097152.0;
        Map<String, Double> userQuotas = new HashMap<String, Double>();
        userQuotas.put("producer_byte_rate", largeProduceQuota);
        userQuotas.put("consumer_byte_rate", smallQuota);
        alterClientQuotas(
            physicalCluster.superAdminClient(), getPrefixedUser(tenantUser), userQuotas);
        verifyQuotaUpdate(ClientQuotaType.PRODUCE, tags, largeProduceQuota);
        verifyQuotaUpdate(ClientQuotaType.FETCH, tags, smallQuota);

        produced = produceUntilThrottled(tenantUser, "testtopic", 2000);
        Assertions.assertEquals(2000, produced);
        consumed = consumeUntilThrottled(tenantUser, "testtopic", "test-group-2", 2000);
        Assertions.assertTrue(produced > consumed);
    }

    /**
     * Verifies both the quota callback limit and the expected tenant quota for the given tags.
     * The unlimited-quota sentinel is checked as-is; any other value is halved first.
     *
     * <p>NOTE(review): the halving is presumably the per-broker share in a two-broker
     * cluster — confirm against the test harness.
     *
     * @param type quota type being verified
     * @param quotaTags metric tags identifying the tenant user
     * @param expectedQuota quota value as configured (before halving)
     */
    private void verifyQuotaUpdate(ClientQuotaType type, Map<String, String> quotaTags, double expectedQuota) throws Exception {
        double perBrokerQuota;
        if (expectedQuota == QuotaConfig.UNLIMITED_QUOTA.quota(type)) {
            perBrokerQuota = expectedQuota;
        } else {
            perBrokerQuota = expectedQuota / 2.0;
        }
        verifyQuotaCallbackLimit(type, quotaTags, perBrokerQuota);
        verifyExpectedTenantQuota(type, quotaTags, perBrokerQuota);
    }

    /**
     * Delegates to {@code verifyTenantThrottled} or {@code verifyTenantNotThrottled} for the
     * user's quota tags, depending on {@code expectThrottle}.
     *
     * @param quotaType quota type whose throttle state is checked
     * @param user tenant user whose quota tags are used
     * @param expectThrottle {@code true} to assert throttling, {@code false} to assert none
     */
    private void verifyBrokerThrottleMetric(ClientQuotaType quotaType, LogicalClusterUser user, Boolean expectThrottle) throws Exception {
        if (!expectThrottle) {
            verifyTenantNotThrottled(quotaType, quotaTags(user));
            return;
        }
        verifyTenantThrottled(quotaType, quotaTags(user));
    }

    /**
     * Reports whether the consumer's {@code fetch-throttle-time-avg} client metric is positive.
     *
     * @param consumer consumer created with {@code consumerClientId} as its client id
     * @return {@code true} if the average fetch throttle time is greater than zero
     */
    private boolean isConsumerThrottled(KafkaConsumer<?, ?> consumer) {
        // Use the declared client id rather than repeating the literal.
        Map<String, String> tags = Collections.singletonMap("client-id", consumerClientId);
        MetricName metricName = new MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", tags);
        Metric avgMetric = consumer.metrics().get(metricName);
        // Fail with a readable message instead of a bare NullPointerException if the lookup misses.
        Assertions.assertNotNull(avgMetric, "Consumer metric not found: " + metricName);
        return (Double) avgMetric.metricValue() > 0.0;
    }

    /**
     * Reports whether the producer's {@code produce-throttle-time-avg} client metric is positive.
     *
     * @param producer producer created with {@code producerClientId} as its client id
     * @return {@code true} if the average produce throttle time is greater than zero
     */
    private boolean isProducerThrottled(KafkaProducer<?, ?> producer) {
        // Use the declared client id rather than repeating the literal.
        Map<String, String> tags = Collections.singletonMap("client-id", producerClientId);
        MetricName metricName = new MetricName("produce-throttle-time-avg", "producer-metrics", "", tags);
        Metric avgMetric = producer.metrics().get(metricName);
        // Fail with a readable message instead of a bare NullPointerException if the lookup misses.
        Assertions.assertNotNull(avgMetric, "Producer metric not found: " + metricName);
        return (Double) avgMetric.metricValue() > 0.0;
    }

    /**
     * Sends records one at a time to {@code topic} as {@code user} until the producer reports
     * throttling or {@code numRecords} records have been sent. At least {@code minProduce}
     * records are always sent, even if throttling is observed earlier.
     *
     * @param user tenant user whose credentials the producer uses
     * @param topic topic to produce to
     * @param numRecords upper bound on records to send (subject to the minimum above)
     * @return the number of records handed to the producer
     */
    private int produceUntilThrottled(LogicalClusterUser user, String topic, int numRecords) {
        final int minProduce = 42;
        try (KafkaProducer<String, String> producer = createProducer(user)) {
            boolean throttled;
            int numProduced = 0;
            do {
                // The record index doubles as both key and value.
                String payload = String.valueOf(numProduced);
                Future<?> future = producer.send(
                    new ProducerRecord<String, String>(topic, null, payload, payload),
                    new ErrorLoggingCallback(topic, null, null, true));
                ++numProduced;
                // Spin until this send completes or the throttle metric turns positive.
                do {
                    throttled = isProducerThrottled(producer);
                } while (!future.isDone() && !throttled);
            } while (numProduced < minProduce || (numProduced < numRecords && !throttled));
            return numProduced;
        }
    }

    /**
     * Polls {@code topic} as {@code user} in the given consumer group until {@code numRecords}
     * records have been read, the consumer reports throttling, or one minute has elapsed.
     *
     * <p>The throttle metric is sampled before each poll, so the decision to stop reflects the
     * state observed just before the most recent poll.
     *
     * @param user tenant user whose credentials the consumer uses
     * @param topic topic to subscribe to
     * @param consumerGroup consumer group id
     * @param numRecords number of records after which polling stops
     * @return the number of records consumed
     */
    private int consumeUntilThrottled(LogicalClusterUser user, String topic, String consumerGroup, int numRecords) {
        try (KafkaConsumer<String, String> consumer = createConsumer(user, consumerGroup)) {
            final long timeoutMs = TimeUnit.MINUTES.toMillis(1L);
            consumer.subscribe(Collections.singleton(topic));
            int total = 0;
            final long deadlineMs = System.currentTimeMillis() + timeoutMs;
            while (true) {
                boolean throttledBeforePoll = isConsumerThrottled(consumer);
                total += consumer.poll(Duration.ofMillis(100L)).count();
                if (total >= numRecords || throttledBeforePoll || System.currentTimeMillis() >= deadlineMs) {
                    break;
                }
            }
            return total;
        }
    }

    /**
     * Builds a SASL_PLAINTEXT / SCRAM-SHA-256 producer against the physical cluster that
     * authenticates as {@code user} and uses {@code producerClientId} as its client id.
     *
     * @param user tenant user supplying the JAAS configuration
     * @return a new producer; the caller is responsible for closing it
     */
    private KafkaProducer<String, String> createProducer(LogicalClusterUser user) {
        Properties overrideProps = new Properties();
        // Use the declared client id so it stays in sync with the metric lookup.
        overrideProps.put("client.id", producerClientId);
        return KafkaTestUtils.createProducer(
            physicalCluster.bootstrapServers(),
            SecurityProtocol.SASL_PLAINTEXT,
            ScramMechanism.SCRAM_SHA_256.mechanismName(),
            user.saslJaasConfig(),
            overrideProps);
    }

    /**
     * Builds a SASL_PLAINTEXT / SCRAM-SHA-256 consumer against the physical cluster that
     * authenticates as {@code user}, joins {@code consumerGroup} and uses
     * {@code consumerClientId} as its client id.
     *
     * @param user tenant user supplying the JAAS configuration
     * @param consumerGroup consumer group id
     * @return a new consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<String, String> createConsumer(LogicalClusterUser user, String consumerGroup) {
        Properties overrideProps = new Properties();
        // Use the declared client id so it stays in sync with the metric lookup.
        overrideProps.put("client.id", consumerClientId);
        return KafkaTestUtils.createConsumer(
            physicalCluster.bootstrapServers(),
            SecurityProtocol.SASL_PLAINTEXT,
            ScramMechanism.SCRAM_SHA_256.mechanismName(),
            user.saslJaasConfig(),
            consumerGroup,
            overrideProps);
    }

    /**
     * Applies the given quota alterations to a single {@code user} entity and blocks until the
     * admin request completes.
     *
     * @param adminClient admin client used to issue the request
     * @param user value of the {@code "user"} entity entry (already tenant-prefixed by callers)
     * @param quotaAlterations quota key to new value; a {@code null} value is passed through
     *     to {@link ClientQuotaAlteration.Op} and requests removal of that quota
     * @throws Exception if the admin request fails; failures surface from {@code get()}
     */
    private void alterClientQuotas(Admin adminClient, String user, Map<String, Double> quotaAlterations) throws Exception {
        Collection<ClientQuotaAlteration.Op> ops = quotaAlterations.entrySet().stream()
            .map(entry -> new ClientQuotaAlteration.Op(entry.getKey(), entry.getValue()))
            .collect(Collectors.toList());
        ClientQuotaEntity quotaEntity = new ClientQuotaEntity(Collections.singletonMap("user", user));
        adminClient.alterClientQuotas(Collections.singletonList(new ClientQuotaAlteration(quotaEntity, ops))).all().get();
    }

    /**
     * Builds the quota entity name for a tenant user: the user's tenant prefix followed by its
     * user resource id.
     *
     * @param user tenant user
     * @return {@code tenantPrefix + userResourceId}
     */
    private String getPrefixedUser(LogicalClusterUser user) {
        StringBuilder prefixed = new StringBuilder();
        prefixed.append(user.tenantPrefix());
        prefixed.append(user.userResourceId());
        return prefixed.toString();
    }

    /**
     * Builds the quota entity name for a tenant's default user: the logical cluster id, an
     * underscore, and the broker's default config-entity name.
     *
     * @param logicalCluster tenant whose default-user entity is addressed
     * @return {@code logicalClusterId + '_' + ConfigEntityName.Default()}
     */
    private String getPrefixedDefaultUser(LogicalCluster logicalCluster) {
        return logicalCluster.logicalClusterId()
            + '_'
            + ConfigEntityName.Default();
    }

    /**
     * Builds the metric tag map identifying a tenant user: {@code tenant} maps to the user's
     * logical cluster id and {@code user-resource-id} to its user resource id.
     *
     * @param user tenant user
     * @return a new mutable map holding the two tags
     */
    private Map<String, String> quotaTags(LogicalClusterUser user) {
        Map<String, String> userTags = new HashMap<String, String>();
        userTags.put("tenant", user.logicalClusterId);
        userTags.put("user-resource-id", user.userResourceId());
        return userTags;
    }
}

