/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import kafka.coordinator.quota.QuotaCoordinator;
import kafka.coordinator.quota.QuotaEntity;
import kafka.server.DynamicQuotaChannelManager;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.collection.JavaConverters;
import scala.collection.mutable.Map;

@Tag(value="integration")
public class MultiTenantQuotaIntegrationTest
extends AbstractMultiTenantKafkaIntegrationTest {
    private final double defaultControllerMutationRateQuota = 100.0;
    private final double defaultProduceQuotaMultiplier = 2.0;

    @Override
    protected Properties brokerProps() {
        Properties props = super.brokerProps();
        props.put(KafkaConfig.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        return props;
    }

    @Override
    protected void createPhysicalAndLogicalClusters(Properties brokerProperties) {
        this.physicalCluster = this.testHarness.start(brokerProperties, Time.SYSTEM);
        this.logicalCluster1 = this.physicalCluster.createLogicalCluster("lkc-tenant1", 100, 9, 11, 12);
        this.logicalCluster2 = this.physicalCluster.createLogicalCluster("lkc-tenant2", 200, 9, 21, 22);
    }

    @Test
    public void testDynamicTenantControllerQuota() throws Exception {
        this.setUp();
        Properties brokerProps = this.brokerProps();
        brokerProps.put("confluent.quota.tenant.default.controller.mutation.rate", String.valueOf(100.0));
        brokerProps.put("confluent.quota.tenant.produce.multiplier", String.valueOf(2.0));
        this.createPhysicalAndLogicalClusters(brokerProps);
        this.awaitMetadataPropagation();
        AdminClient tenantAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        java.util.Map<String, String> quotaTags = this.quotaTags(this.logicalCluster1);
        tenantAdminClient.createTopics(Collections.singletonList(new NewTopic("test_topic", Optional.of(5), Optional.empty()))).all().get();
        this.verifyExpectedTenantQuota(ClientQuotaType.CONTROLLER_MUTATION, quotaTags, 100.0);
        double updatedControllerMutationRateQuota = 200.0;
        this.updateBrokerConfig(new ConfigEntry("confluent.quota.tenant.default.controller.mutation.rate", String.valueOf(updatedControllerMutationRateQuota)));
        this.verifyQuotaCallbackLimit(ClientQuotaType.CONTROLLER_MUTATION, quotaTags, updatedControllerMutationRateQuota);
        tenantAdminClient.createTopics(Collections.singletonList(new NewTopic("test_topic2", Optional.of(5), Optional.empty()))).all().get();
        this.verifyExpectedTenantQuota(ClientQuotaType.CONTROLLER_MUTATION, quotaTags, updatedControllerMutationRateQuota);
    }

    @Test
    public void testDynamicQuotaMultiplier() throws Throwable {
        this.setUp();
        Properties brokerProps = this.brokerProps();
        brokerProps.put("confluent.quota.tenant.default.controller.mutation.rate", String.valueOf(100.0));
        brokerProps.put("confluent.quota.tenant.produce.multiplier", String.valueOf(2.0));
        this.createPhysicalAndLogicalClusters(brokerProps);
        this.awaitMetadataPropagation();
        AdminClient tenantAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        java.util.Map<String, String> quotaTags = this.quotaTags(this.logicalCluster1);
        this.addLkcFileAndSyncMetadata(Utils.LC_META_1);
        tenantAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(5), Optional.empty()))).all().get();
        this.testHarness.produceConsume(this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0, false);
        double expectedProduceQuota = 2.0 * (double)Utils.LC_META_1.producerByteRate().longValue() / 2.0;
        double expectedFetchQuota = (double)Utils.LC_META_1.consumerByteRate().longValue() / 2.0;
        this.verifyExpectedTenantQuota(ClientQuotaType.PRODUCE, quotaTags, expectedProduceQuota);
        this.verifyExpectedTenantQuota(ClientQuotaType.FETCH, quotaTags, expectedFetchQuota);
        double updatedFetchMultiplier = 3.0;
        double updatedProduceMultiplier = 1.0;
        this.updateBrokerConfig(new ConfigEntry("confluent.quota.tenant.fetch.multiplier", String.valueOf(updatedFetchMultiplier)));
        this.updateBrokerConfig(new ConfigEntry("confluent.quota.tenant.produce.multiplier", String.valueOf(updatedProduceMultiplier)));
        double updatedProduceQuota = updatedProduceMultiplier * (double)Utils.LC_META_1.producerByteRate().longValue() / 2.0;
        double updatedFetchQuota = updatedFetchMultiplier * (double)Utils.LC_META_1.consumerByteRate().longValue() / 2.0;
        this.verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, quotaTags, updatedProduceQuota);
        this.verifyQuotaCallbackLimit(ClientQuotaType.FETCH, quotaTags, updatedFetchQuota);
        this.testHarness.produceConsume(this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0, false);
        this.verifyExpectedTenantQuota(ClientQuotaType.PRODUCE, quotaTags, updatedProduceQuota);
        this.verifyExpectedTenantQuota(ClientQuotaType.FETCH, quotaTags, updatedFetchQuota);
    }

    @Test
    public void testDynamicQuotaPipeline() throws Throwable {
        this.setUp();
        Properties brokerProps = this.brokerProps();
        brokerProps.put(KafkaConfig.DynamicQuotaEnabledProp(), String.valueOf(true));
        brokerProps.put(KafkaConfig.QuotasTopicReplicationFactorProp(), (Object)2);
        brokerProps.put("confluent.quota.dynamic.reporting.interval.ms", (Object)100);
        brokerProps.put("confluent.quota.dynamic.publishing.interval.ms", (Object)2000);
        this.createPhysicalAndLogicalClusters(brokerProps);
        this.awaitMetadataPropagation();
        List channelManagers = this.physicalCluster.kafkaCluster().brokers().stream().map(kafkaServer -> (DynamicQuotaChannelManager)kafkaServer.dynamicQuotaChannelManager().get()).collect(Collectors.toList());
        TestUtils.waitForCondition(() -> channelManagers.stream().allMatch(channelManager -> channelManager.getPublishRequestThread().started() && channelManager.getReportRequestThread().started()), (String)"Dynamic quota channel manager should have been started");
        List quotaCoordinators = this.physicalCluster.kafkaCluster().brokers().stream().map(kafkaServer -> (QuotaCoordinator)kafkaServer.quotaCoordinatorOpt().get()).collect(Collectors.toList());
        TestUtils.waitForCondition(() -> quotaCoordinators.stream().allMatch(quotaCoordinator -> quotaCoordinator.isActive().get()), (String)"Quota coordinator should have been started");
        AdminClient tenantAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        java.util.Map<String, String> quotaTags = this.quotaTags(this.logicalCluster1);
        this.addLkcFileAndSyncMetadata(Utils.LC_META_1);
        tenantAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(5), Optional.empty()))).all().get();
        this.testHarness.produceConsume(this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0, false);
        double expectedProduceQuota = (double)Utils.LC_META_1.producerByteRate().longValue() / 2.0;
        double expectedFetchQuota = (double)Utils.LC_META_1.consumerByteRate().longValue() / 2.0;
        this.verifyExpectedTenantQuota(ClientQuotaType.PRODUCE, quotaTags, expectedProduceQuota);
        this.verifyExpectedTenantQuota(ClientQuotaType.FETCH, quotaTags, expectedFetchQuota);
        TestUtils.waitForCondition(() -> channelManagers.stream().anyMatch(channelManager -> channelManager.getReportRequestThread().queuePerNode().nonEmpty()), (String)"Dynamic quota channel manager should have received a Reporting request");
        TestUtils.waitForCondition(() -> quotaCoordinators.stream().anyMatch(quotaCoordinator -> quotaCoordinator.quotaStateManager().getQuota(new QuotaEntity(this.toScalaMap(quotaTags))).nonEmpty()), (String)"Quota coordinator should have recomputed the quota");
        TestUtils.waitForCondition(() -> channelManagers.stream().anyMatch(channelManager -> channelManager.getPublishRequestThread().queuePerNode().nonEmpty()), (String)"Dynamic quota channel manager should send a Publishing request");
        TestUtils.waitForCondition(() -> this.physicalCluster.kafkaCluster().brokers().stream().map(KafkaServer::metrics).allMatch(metric -> !this.verifyTenantMetricQuotaValue((Metrics)metric, ClientQuotaType.PRODUCE, quotaTags, expectedProduceQuota) && !this.verifyTenantMetricQuotaValue((Metrics)metric, ClientQuotaType.FETCH, quotaTags, expectedFetchQuota)), (String)"Produce and fetch quotas should be updated based on their consumption");
    }

    private void updateBrokerConfig(ConfigEntry configToSet) throws Exception {
        AdminClient internalAdminClient = this.physicalCluster.superAdminClient();
        internalAdminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singletonList(new AlterConfigOp(configToSet, AlterConfigOp.OpType.SET)))).all().get();
    }

    private java.util.Map<String, String> quotaTags(LogicalCluster logicalCluster) {
        return Collections.singletonMap("tenant", logicalCluster.logicalClusterId());
    }

    private <T> Map<T, T> toScalaMap(java.util.Map<T, T> map) {
        return (Map)JavaConverters.mapAsScalaMapConverter(map).asScala();
    }
}

