/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.quota;

import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.quota.MultiTenantQuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import kafka.coordinator.quota.QuotaEntity;
import kafka.server.ClientQuotaManager;
import kafka.server.FlexFanoutQuotaManager;
import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory;
import kafka.server.ReplicationQuotaManager;
import kafka.utils.TestUtils;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.network.Session;
import org.apache.kafka.server.config.FlexFanoutMode;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.Scheduler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Option;
import scala.collection.IterableOnce;
import scala.collection.immutable.Map;
import scala.jdk.CollectionConverters;

/**
 * Tests for {@link FlexFanoutQuotaManager}.
 *
 * <p>Each test builds the broker quota managers from {@link #quotaProps()}, feeds produce/fetch
 * usage for one or two tenants through the client quota managers using a {@link MockTime} clock,
 * and then checks the replication throttle upper bounds, the broker quota limits and the
 * per-tenant dynamic quotas.
 *
 * <p>In the flex fanout tests the "ratio" passed to
 * {@link #expectedBrokerProduceLimitByRatio(double)} is the recorded fetch volume divided by the
 * recorded produce volume.
 */
public class FlexFanoutQuotaManagerTest {
    private static final long MAX_REPLICATION_THROTTLED_RATE = 5L;
    private static final long MAX_BROKER_PRODUCER_CAPACITY = 100L;
    private static final long MAX_BROKER_CONSUMER_CAPACITY = 300L;
    private static final long MIN_BROKER_PRODUCER_QUOTA = 10L;
    private static final long MIN_BROKER_CONSUMER_QUOTA = 10L;
    private static final String TENANT_1 = "tenant1";
    private static final String TENANT_2 = "tenant2";
    private static final long TENANT_1_PRODUCE_BYTE_RATE = 500L;
    private static final long TENANT_1_CONSUME_BYTE_RATE = 1000L;
    private static final long TENANT_2_PRODUCE_BYTE_RATE = 10L;
    private static final long TENANT_2_CONSUME_BYTE_RATE = 20L;
    private static final long FLEXIBLE_FANOUT_BROKER_NETWORK_OUT_BANDWIDTH = 620L;
    private static final long FLEXIBLE_FANOUT_BROKER_STORAGE_BANDWIDTH = 512L;
    private static final double FLEXIBLE_FANOUT_BROKER_MIN_PRODUCER_PERCENTAGE = 10.0;
    private static final long FLEXIBLE_FANOUT_BROKER_MAX_PRODUCE_BANDWIDTH = 500L;
    private static final long FLEXIBLE_FANOUT_BROKER_MAX_FETCH_BANDWIDTH = 1500L;

    private final Time time = new MockTime();
    private Metrics metrics;
    private QuotaFactory.QuotaManagers quotaManagers;
    private ClientQuotaManager produce;
    private ClientQuotaManager fetch;
    private ReplicationQuotaManager leaderQuotaManager;
    private ReplicationQuotaManager followerQuotaManager;
    private FlexFanoutQuotaManager flexFanoutManager;
    private KafkaConfig config;
    private KafkaScheduler scheduler;

    /** Creates the shared metrics registry and the default (broker-quota mode) fixture. */
    @BeforeEach
    public void setup() throws InterruptedException {
        metrics = new Metrics(time);
        configure(quotaProps());
    }

    /**
     * (Re)builds the quota managers, scheduler and flex fanout manager from {@code props} and
     * installs the tenant quotas.
     *
     * <p>Several tests call this a second time with modified properties, so whatever the previous
     * call created is released first; otherwise its scheduler and quota managers would be leaked.
     *
     * @param props broker properties used to create the {@link KafkaConfig}
     */
    private void configure(Properties props) throws InterruptedException {
        releaseBrokerResources();
        quotaManagers = createQuotaManagers(props);
        produce = quotaManagers.produce();
        fetch = quotaManagers.fetch();
        leaderQuotaManager = quotaManagers.leader();
        followerQuotaManager = quotaManagers.follower();
        scheduler = new KafkaScheduler(1);
        scheduler.startup();
        flexFanoutManager = new FlexFanoutQuotaManager(
            FlexFanoutQuotaManager.flexFanoutManagerConfig(config),
            metrics,
            scheduler,
            quotaManagers.clientQuotaCallback(),
            quotaManagers);
        flexFanoutManager.start();
        createTenantQuotas();
    }

    /** Releases everything created by {@link #setup()} and {@link #configure(Properties)}. */
    @AfterEach
    public void tearDown() throws InterruptedException {
        try {
            releaseBrokerResources();
        } finally {
            TenantQuotaCallback.closeAll();
            metrics.close();
        }
    }

    /**
     * Shuts down the current scheduler and quota managers, if any, and clears the references so a
     * second call is a no-op. The scheduler goes first so that nothing it runs can touch quota
     * managers that are already shut down.
     */
    private void releaseBrokerResources() throws InterruptedException {
        try {
            if (scheduler != null) {
                scheduler.shutdown();
                scheduler = null;
            }
        } finally {
            if (quotaManagers != null) {
                quotaManagers.shutdown();
                quotaManagers = null;
            }
        }
    }

    /**
     * Computes the broker produce limit the tests expect for a given fetch-to-produce ratio:
     * the smaller of {@code storage / 3} and {@code networkOut / (3 + ratio)}, raised to the
     * manager's minimum produce limit and capped at the configured maximum produce bandwidth.
     *
     * @param ratio fetch volume divided by produce volume
     * @return the expected limit, truncated to a whole number of bytes
     */
    private long expectedBrokerProduceLimitByRatio(double ratio) {
        // Integer division is intentional here; the final result is truncated to long anyway.
        double storageBound = FLEXIBLE_FANOUT_BROKER_STORAGE_BANDWIDTH / 3L;
        double networkBound = (double) FLEXIBLE_FANOUT_BROKER_NETWORK_OUT_BANDWIDTH / (3.0 + ratio);
        double calculatedProduceLimit = Math.min(storageBound, networkBound);
        double flooredProduceLimit = Math.max(calculatedProduceLimit, flexFanoutManager.getMinProduceLimit());
        return (long) Math.min(flooredProduceLimit, (double) FLEXIBLE_FANOUT_BROKER_MAX_PRODUCE_BANDWIDTH);
    }

    /** Fetch limit the tests expect once {@code 3 * produceLimit} is taken from the network-out bandwidth. */
    private static long expectedBrokerFetchLimit(long produceLimit) {
        return FLEXIBLE_FANOUT_BROKER_NETWORK_OUT_BANDWIDTH - 3L * produceLimit;
    }

    /** Walks one flex-fanout tenant through fetch:produce ratios of 100, 10, 3, 1 and 0.5. */
    @Test
    public void testAutoTuningWithFlexibleFanout() {
        MultiTenantPrincipal tenantPrincipal = tenant1Principal();
        setFlexFanoutEnabled(TENANT_1, true);

        // ratio 100: produce 5, fetch 500
        long expectedBrokerProduce = expectedBrokerProduceLimitByRatio(100.0);
        recordUsage(tenantPrincipal, 5.0, 500.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertReplicationUpperBounds(3L * expectedBrokerProduce);
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals((double) expectedBrokerProduce, produceBound(tenantPrincipal), 1.0);
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals((double) expectedBrokerFetchLimit(expectedBrokerProduce), fetchBound(tenantPrincipal), 1.0);
        time.sleep(2000L);

        // ratio 10: produce 50, fetch 500
        expectedBrokerProduce = expectedBrokerProduceLimitByRatio(10.0);
        recordUsage(tenantPrincipal, 50.0, 500.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertReplicationUpperBounds(3L * expectedBrokerProduce, 3.0f);
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals((double) expectedBrokerProduce, produceBound(tenantPrincipal), 1.0);
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals((double) expectedBrokerFetchLimit(expectedBrokerProduce), fetchBound(tenantPrincipal), 3.0);
        time.sleep(2000L);

        // ratio 3: produce 100, fetch 300
        expectedBrokerProduce = expectedBrokerProduceLimitByRatio(3.0);
        recordUsage(tenantPrincipal, 100.0, 300.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertReplicationUpperBounds(3L * expectedBrokerProduce, 1.0f);
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals((double) expectedBrokerProduce, produceBound(tenantPrincipal), 1.0);
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals((double) expectedBrokerFetchLimit(expectedBrokerProduce), fetchBound(tenantPrincipal), 1.0);
        time.sleep(2000L);

        // ratio 1: produce 250, fetch 250 (exact comparisons from here on)
        expectedBrokerProduce = expectedBrokerProduceLimitByRatio(1.0);
        recordUsage(tenantPrincipal, 250.0, 250.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertReplicationUpperBounds(3L * expectedBrokerProduce);
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals((double) expectedBrokerProduce, produceBound(tenantPrincipal));
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals((double) expectedBrokerFetchLimit(expectedBrokerProduce), fetchBound(tenantPrincipal));
        time.sleep(2000L);

        // ratio 0.5: produce 250, fetch 125
        expectedBrokerProduce = expectedBrokerProduceLimitByRatio(0.5);
        recordUsage(tenantPrincipal, 250.0, 125.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertReplicationUpperBounds(3L * expectedBrokerProduce);
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals((double) expectedBrokerProduce, produceBound(tenantPrincipal));
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals((double) expectedBrokerFetchLimit(expectedBrokerProduce), fetchBound(tenantPrincipal));
        time.sleep(2000L);
    }

    /** With fetch traffic only, the produce side is expected to sit at the manager's minimum produce limit. */
    @Test
    public void testAutoTuningWithFlexibleFanoutAndNoProduce() {
        MultiTenantPrincipal tenantPrincipal = tenant1Principal();
        setFlexFanoutEnabled(TENANT_1, true);

        recordUsage(tenantPrincipal, 0.0, 250.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        Assertions.assertEquals(3.0 * flexFanoutManager.getMinProduceLimit(), (double) leaderQuotaManager.upperBound());
        Assertions.assertEquals(3.0 * flexFanoutManager.getMinProduceLimit(), (double) followerQuotaManager.upperBound());
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals((double) flexFanoutManager.getMinProduceLimit(), produceBound(tenantPrincipal));
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals(
            (double) FLEXIBLE_FANOUT_BROKER_NETWORK_OUT_BANDWIDTH - 3.0 * flexFanoutManager.getMinProduceLimit(),
            fetchBound(tenantPrincipal),
            1.0);
    }

    /** With produce traffic only, the limits are expected to match a fetch:produce ratio of zero. */
    @Test
    public void testAutoTuningWithFlexibleFanoutAndNoFetch() {
        MultiTenantPrincipal tenantPrincipal = tenant1Principal();
        setFlexFanoutEnabled(TENANT_1, true);

        long expectedBrokerProduce = expectedBrokerProduceLimitByRatio(0.0);
        recordUsage(tenantPrincipal, 250.0, 0.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertReplicationUpperBounds(3L * expectedBrokerProduce);
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals((double) expectedBrokerProduce, produceBound(tenantPrincipal));
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals((double) expectedBrokerFetchLimit(expectedBrokerProduce), fetchBound(tenantPrincipal));
    }

    /** Disabling flex fanout for the tenant is expected to restore the statically configured limits. */
    @Test
    public void testAutoTuningWithFlexibleFanoutEnabledAndDisabled() {
        MultiTenantPrincipal tenantPrincipal = tenant1Principal();
        setFlexFanoutEnabled(TENANT_1, true);

        // Equal produce and fetch: expected produce limit 155, replication bound 3 * 155 = 465.
        recordUsage(tenantPrincipal, 250.0, 250.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertReplicationUpperBounds(465L);
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals(155.0, produceBound(tenantPrincipal));
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals((double) (FLEXIBLE_FANOUT_BROKER_NETWORK_OUT_BANDWIDTH - 465L), fetchBound(tenantPrincipal));
        time.sleep(2000L);

        setFlexFanoutEnabled(TENANT_1, false);
        recordUsage(tenantPrincipal, 250.0, 250.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertReplicationUpperBounds(MAX_REPLICATION_THROTTLED_RATE);
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals(MIN_BROKER_PRODUCER_QUOTA, (long) produceBound(tenantPrincipal));
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals(MIN_BROKER_CONSUMER_QUOTA, (long) fetchBound(tenantPrincipal));
    }

    /**
     * Low usage (40/40) is expected to leave the static limits in place, additional usage in the
     * same window (200/200) to switch to flex fanout limits, and low usage in a later window to
     * switch back.
     */
    @Test
    public void testAutoTuningWithFlexibleFanoutOnLazyEvaluation() {
        MultiTenantPrincipal tenantPrincipal = tenant1Principal();
        setFlexFanoutEnabled(TENANT_1, true);

        recordUsage(tenantPrincipal, 40.0, 40.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertStaticBrokerLimits(tenantPrincipal);

        // No clock advance here: this usage is recorded at the same mock time as the usage above.
        recordUsage(tenantPrincipal, 200.0, 200.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertReplicationUpperBounds(465L);
        Assertions.assertEquals(155L, (long) produce.getBrokerQuotaLimit());
        Assertions.assertEquals(FLEXIBLE_FANOUT_BROKER_NETWORK_OUT_BANDWIDTH - 465L, (long) fetch.getBrokerQuotaLimit());
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals(155L, (long) produceBound(tenantPrincipal));
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals(FLEXIBLE_FANOUT_BROKER_NETWORK_OUT_BANDWIDTH - 465L, (long) fetchBound(tenantPrincipal));
        time.sleep(2000L);

        recordUsage(tenantPrincipal, 40.0, 40.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertStaticBrokerLimits(tenantPrincipal);
    }

    /** Two active tenants, only one with flex fanout enabled. */
    @Test
    public void testAutoTuningWithFlexibleFanoutEnabledOnOneTenant() {
        MultiTenantPrincipal tenantPrincipal1 = tenant1Principal();
        MultiTenantPrincipal tenantPrincipal2 = tenant2Principal();
        setFlexFanoutEnabled(TENANT_1, true);

        recordUsage(tenantPrincipal1, 240.0, 240.0);
        recordUsage(tenantPrincipal2, 10.0, 10.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertReplicationUpperBounds(465L);

        produce.maybeAutoTuneQuota();
        Assertions.assertEquals(145.0, produceBound(tenantPrincipal1), 1.0);
        Assertions.assertEquals(10.0, produceBound(tenantPrincipal2), 1.0);
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals(145.0, fetchBound(tenantPrincipal1), 1.0);
        Assertions.assertEquals(10.0, fetchBound(tenantPrincipal2), 1.0);
    }

    /**
     * After low usage, the dynamic quotas of the flex-fanout tenant are expected to fall back to
     * the minimum quotas while the other tenant keeps the dynamic quotas that were set explicitly.
     */
    @Test
    public void testCleanUpDynamicQuotaOnFlexibleFanoutEnabledTenantWhenBelowLazyEvaluation() {
        MultiTenantPrincipal tenantPrincipal = tenant1Principal();
        MultiTenantPrincipal tenantPrincipal2 = tenant2Principal();
        setFlexFanoutEnabled(TENANT_1, true);

        java.util.Map<String, String> metricTags = Collections.singletonMap("tenant", TENANT_1);
        Assertions.assertEquals(MIN_BROKER_PRODUCER_QUOTA, quotaLimit(ClientQuotaType.PRODUCE, metricTags));
        Assertions.assertEquals(MIN_BROKER_CONSUMER_QUOTA, quotaLimit(ClientQuotaType.FETCH, metricTags));
        java.util.Map<String, String> metricTags2 = Collections.singletonMap("tenant", TENANT_2);
        Assertions.assertEquals(MIN_BROKER_PRODUCER_QUOTA, quotaLimit(ClientQuotaType.PRODUCE, metricTags2));
        Assertions.assertEquals(MIN_BROKER_CONSUMER_QUOTA, quotaLimit(ClientQuotaType.FETCH, metricTags2));

        HashMap<String, Long> quotas = new HashMap<>();
        quotas.put("PRODUCE", 200L);
        quotas.put("FETCH", 400L);
        flexFanoutManager.quotaCallback().updateDynamicQuotas(Collections.singletonMap(metricTags, quotas));
        flexFanoutManager.quotaCallback().updateDynamicQuotas(Collections.singletonMap(metricTags2, quotas));
        Assertions.assertEquals(200L, quotaLimit(ClientQuotaType.PRODUCE, metricTags));
        Assertions.assertEquals(400L, quotaLimit(ClientQuotaType.FETCH, metricTags));
        Assertions.assertEquals(200L, quotaLimit(ClientQuotaType.PRODUCE, metricTags2));
        Assertions.assertEquals(400L, quotaLimit(ClientQuotaType.FETCH, metricTags2));

        recordUsage(tenantPrincipal, 20.0, 40.0);
        recordUsage(tenantPrincipal2, 20.0, 40.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        Assertions.assertEquals(MIN_BROKER_PRODUCER_QUOTA, quotaLimit(ClientQuotaType.PRODUCE, metricTags));
        Assertions.assertEquals(MIN_BROKER_CONSUMER_QUOTA, quotaLimit(ClientQuotaType.FETCH, metricTags));
        Assertions.assertEquals(200L, quotaLimit(ClientQuotaType.PRODUCE, metricTags2));
        Assertions.assertEquals(400L, quotaLimit(ClientQuotaType.FETCH, metricTags2));
    }

    /** In tenant-quota mode, starting the manager is expected to leave the scheduler empty. */
    @Test
    public void testTenantQuotaModeEnabled() throws InterruptedException {
        Properties props = quotaProps();
        props.put("confluent.flexible.fanout.mode", FlexFanoutMode.TENANT_QUOTA.name);
        configure(props);
        Assertions.assertEquals(0, scheduler.size());
    }

    /** In broker-quota mode, starting the manager is expected to leave one entry in the scheduler. */
    @Test
    public void testBrokerQuotaModeEnabled() throws InterruptedException {
        Properties props = quotaProps();
        props.put("confluent.flexible.fanout.mode", FlexFanoutMode.BROKER_QUOTA.name);
        configure(props);
        TestUtils.waitUntilTrue(() -> this.scheduler.size() == 1, () -> "Timed out waiting for scheduler size to be 1", 15000L, 100L);
    }

    /** Checks {@code isFlexFanoutEnabled} for tenant-only tags and for tags that also carry a user resource id. */
    @Test
    public void testIsFlexFanoutEnabled() {
        setFlexFanoutEnabled(TENANT_1, true);
        java.util.Map<String, String> tags1 = java.util.Map.of("tenant", TENANT_1);
        Assertions.assertTrue(flexFanoutManager.isFlexFanoutEnabled(toImmutableScalaMap(tags1)), "Expect Flex Fanout to be enabled");
        java.util.Map<String, String> tags2 = java.util.Map.of("tenant", TENANT_1, "user-resource-id", "sa-123");
        Assertions.assertFalse(flexFanoutManager.isFlexFanoutEnabled(toImmutableScalaMap(tags2)), "Expect Flex Fanout to be disabled when enabled in metadata but not tenant only tag");

        setFlexFanoutEnabled(TENANT_1, false);
        java.util.Map<String, String> tags3 = java.util.Map.of("tenant", TENANT_1);
        Assertions.assertFalse(flexFanoutManager.isFlexFanoutEnabled(toImmutableScalaMap(tags3)), "Expect Flex Fanout to be disabled when disabled in metadata");
        java.util.Map<String, String> tags4 = java.util.Map.of("tenant", TENANT_1, "user-resource-id", "sa-123");
        Assertions.assertFalse(flexFanoutManager.isFlexFanoutEnabled(toImmutableScalaMap(tags4)), "Expect Flex Fanout to be disabled when disabled in metadata and not tenant only tag");
    }

    /**
     * Checks {@code isDynamicQuotaComputedByFlexFanout} with the tenant enabled, disabled,
     * enabled with low usage, and finally after reconfiguring to tenant-quota mode.
     */
    @Test
    public void testIsDynamicQuotaComputeByFlexFanout() throws InterruptedException {
        MultiTenantPrincipal tenantPrincipal = tenant1Principal();
        QuotaEntity quotaEntity = new QuotaEntity(toImmutableScalaMap(java.util.Map.of("tenant", TENANT_1)));

        setFlexFanoutEnabled(TENANT_1, true);
        recordProduce(tenantPrincipal, 100.0);
        Assertions.assertTrue(flexFanoutManager.isDynamicQuotaComputedByFlexFanout(quotaEntity));
        time.sleep(2000L);

        setFlexFanoutEnabled(TENANT_1, false);
        recordProduce(tenantPrincipal, 100.0);
        Assertions.assertFalse(flexFanoutManager.isDynamicQuotaComputedByFlexFanout(quotaEntity));
        time.sleep(2000L);

        setFlexFanoutEnabled(TENANT_1, true);
        recordProduce(tenantPrincipal, 10.0);
        Assertions.assertFalse(flexFanoutManager.isDynamicQuotaComputedByFlexFanout(quotaEntity));
        time.sleep(2000L);

        Properties props = quotaProps();
        props.put("confluent.flexible.fanout.mode", FlexFanoutMode.TENANT_QUOTA.name);
        configure(props);
        recordProduce(tenantPrincipal, 100.0);
        Assertions.assertFalse(flexFanoutManager.isDynamicQuotaComputedByFlexFanout(quotaEntity));
    }

    /** Lowers the configured maximum produce/fetch bandwidths and expects the tuned quotas to be capped by them. */
    @Test
    public void testMaxBrokerProduceAndFetchInFlexibleFanout() throws InterruptedException {
        long brokerMaxProduceLimit = 50L;
        long brokerMaxFetchLimit = 150L;
        Properties props = quotaProps();
        props.put("confluent.flexible.fanout.broker.max.produce.bytes.per.second", Long.toString(brokerMaxProduceLimit));
        props.put("confluent.flexible.fanout.broker.max.fetch.bytes.per.second", Long.toString(brokerMaxFetchLimit));
        configure(props);

        MultiTenantPrincipal tenantPrincipal = tenant1Principal();
        setFlexFanoutEnabled(TENANT_1, true);
        recordUsage(tenantPrincipal, 100.0, 200.0);
        flexFanoutManager.updateBrokerLimitWithFlexFanout();
        assertReplicationUpperBounds(3L * brokerMaxProduceLimit);
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals((double) brokerMaxProduceLimit, produceBound(tenantPrincipal), 0.0);
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals((double) brokerMaxFetchLimit, fetchBound(tenantPrincipal), 0.0);
    }

    /** Principal "userA" belonging to {@link #TENANT_1}. */
    private static MultiTenantPrincipal tenant1Principal() {
        return new MultiTenantPrincipal("userA", new TenantMetadata.Builder(TENANT_1, "sa-a").build());
    }

    /** Principal "userB" belonging to {@link #TENANT_2}. */
    private static MultiTenantPrincipal tenant2Principal() {
        return new MultiTenantPrincipal("userB", new TenantMetadata.Builder(TENANT_2, "sa-b").build());
    }

    /** Turns flex fanout on or off for {@code tenant} through the quota callback. */
    private void setFlexFanoutEnabled(String tenant, boolean enabled) {
        flexFanoutManager.quotaCallback().updateTenantFlexFanoutEnabled(tenant, enabled);
    }

    /** Records {@code value} against the produce quota manager at the current mock time. */
    private void recordProduce(KafkaPrincipal principal, double value) {
        produce.maybeRecordAndGetThrottleTimeMs(createSession(principal), "", value, time.milliseconds());
    }

    /** Records {@code value} against the fetch quota manager at the current mock time. */
    private void recordFetch(KafkaPrincipal principal, double value) {
        fetch.maybeRecordAndGetThrottleTimeMs(createSession(principal), "", value, time.milliseconds());
    }

    /** Records produce usage followed by fetch usage for {@code principal}. */
    private void recordUsage(KafkaPrincipal principal, double produceValue, double fetchValue) {
        recordProduce(principal, produceValue);
        recordFetch(principal, fetchValue);
    }

    /** Current dynamic produce quota bound for {@code principal}; does not trigger auto-tuning. */
    private double produceBound(KafkaPrincipal principal) {
        return produce.dynamicQuota(principal, "").bound();
    }

    /** Current dynamic fetch quota bound for {@code principal}; does not trigger auto-tuning. */
    private double fetchBound(KafkaPrincipal principal) {
        return fetch.dynamicQuota(principal, "").bound();
    }

    /** Quota limit reported by the quota callback for the given type and metric tags, as a long. */
    private long quotaLimit(ClientQuotaType quotaType, java.util.Map<String, String> metricTags) {
        return flexFanoutManager.quotaCallback().quotaLimit(quotaType, metricTags).longValue();
    }

    /** Asserts that the leader and follower replication upper bounds both equal {@code expected}. */
    private void assertReplicationUpperBounds(long expected) {
        Assertions.assertEquals(expected, leaderQuotaManager.upperBound());
        Assertions.assertEquals(expected, followerQuotaManager.upperBound());
    }

    /** Asserts that the leader and follower replication upper bounds are within {@code delta} of {@code expected}. */
    private void assertReplicationUpperBounds(long expected, float delta) {
        Assertions.assertEquals((float) expected, (float) leaderQuotaManager.upperBound(), delta);
        Assertions.assertEquals((float) expected, (float) followerQuotaManager.upperBound(), delta);
    }

    /**
     * Asserts that the statically configured limits are in force: replication throttle, broker
     * capacities, and (after auto-tuning) the minimum per-tenant quotas for {@code principal}.
     */
    private void assertStaticBrokerLimits(KafkaPrincipal principal) {
        assertReplicationUpperBounds(MAX_REPLICATION_THROTTLED_RATE);
        Assertions.assertEquals(MAX_BROKER_PRODUCER_CAPACITY, (long) produce.getBrokerQuotaLimit());
        Assertions.assertEquals(MAX_BROKER_CONSUMER_CAPACITY, (long) fetch.getBrokerQuotaLimit());
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals(MIN_BROKER_PRODUCER_QUOTA, (long) produceBound(principal));
        fetch.maybeAutoTuneQuota();
        Assertions.assertEquals(MIN_BROKER_CONSUMER_QUOTA, (long) fetchBound(principal));
    }

    /** Installs the quota configuration for both test tenants on every live {@link TenantQuotaCallback}. */
    private void createTenantQuotas() {
        HashMap<String, MultiTenantQuotaConfig> tenantQuotas = new HashMap<>();
        tenantQuotas.put(TENANT_1, quotaConfig(TENANT_1_PRODUCE_BYTE_RATE, TENANT_1_CONSUME_BYTE_RATE, 300.0, 200.0));
        tenantQuotas.put(TENANT_2, quotaConfig(TENANT_2_PRODUCE_BYTE_RATE, TENANT_2_CONSUME_BYTE_RATE, 300.0, 200.0));
        TenantQuotaCallback.updateQuotas(tenantQuotas, (MultiTenantQuotaConfig) MultiTenantQuotaConfig.UNLIMITED_QUOTA);
    }

    /** Default broker properties: flex fanout enabled in broker-quota mode plus the quota callback settings. */
    private Properties quotaProps() {
        Properties props = new Properties();
        props.setProperty("process.roles", "broker");
        props.setProperty("controller.listener.names", "SSL");
        props.setProperty("broker.id", "1");
        props.setProperty("controller.quorum.voters", "2@localhost:9093");
        props.put("throughput.quota.window.num", String.valueOf(2));
        props.put("leader.replication.throttled.rate", Long.toString(MAX_REPLICATION_THROTTLED_RATE));
        props.put("follower.replication.throttled.rate", Long.toString(MAX_REPLICATION_THROTTLED_RATE));
        props.put("confluent.flexible.fanout.enabled", String.valueOf(true));
        props.put("confluent.flexible.fanout.mode", FlexFanoutMode.BROKER_QUOTA.name);
        props.put("confluent.flexible.fanout.broker.network.out.bytes.per.second", Long.toString(FLEXIBLE_FANOUT_BROKER_NETWORK_OUT_BANDWIDTH));
        props.put("confluent.flexible.fanout.broker.storage.bytes.per.second", Long.toString(FLEXIBLE_FANOUT_BROKER_STORAGE_BANDWIDTH));
        props.put("confluent.flexible.fanout.broker.min.producer.percentage", Double.toString(FLEXIBLE_FANOUT_BROKER_MIN_PRODUCER_PERCENTAGE));
        props.put("confluent.flexible.fanout.broker.max.produce.bytes.per.second", Long.toString(FLEXIBLE_FANOUT_BROKER_MAX_PRODUCE_BANDWIDTH));
        props.put("confluent.flexible.fanout.broker.max.fetch.bytes.per.second", Long.toString(FLEXIBLE_FANOUT_BROKER_MAX_FETCH_BANDWIDTH));
        props.putAll(quotaCallbackProps());
        return props;
    }

    /** Parses {@code props} into {@link #config} and instantiates the quota managers from it. */
    private QuotaFactory.QuotaManagers createQuotaManagers(Properties props) {
        config = KafkaConfig.fromProps(props);
        return QuotaFactory.instantiate(config, metrics, time, "", Option.empty(), Option.empty());
    }

    /** Settings that plug in {@link TenantQuotaCallback} with the broker capacities and minimum quotas used by the tests. */
    private java.util.Map<String, Object> quotaCallbackProps() {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("client.quota.callback.class", TenantQuotaCallback.class.getName());
        configs.put("confluent.broker.limit.producer.bytes.per.second", Long.toString(MAX_BROKER_PRODUCER_CAPACITY));
        configs.put("confluent.broker.limit.consumer.bytes.per.second", Long.toString(MAX_BROKER_CONSUMER_CAPACITY));
        configs.put("confluent.quota.tenant.follower.broker.min.producer.rate", Long.toString(MIN_BROKER_PRODUCER_QUOTA));
        configs.put("confluent.quota.tenant.follower.broker.min.consumer.rate", Long.toString(MIN_BROKER_CONSUMER_QUOTA));
        configs.put(KafkaConfig.DynamicQuotaEnabledProp(), String.valueOf(true));
        return configs;
    }

    /** Builds a tenant quota config from the given rates and percentages; the two {@code null} arguments are left unset. */
    private MultiTenantQuotaConfig quotaConfig(long producerByteRate, long consumerByteRate, double requestPercentage, double linkRequestPercentage) {
        return new MultiTenantQuotaConfig(Long.valueOf(producerByteRate), Long.valueOf(consumerByteRate), Double.valueOf(requestPercentage), null, null, Double.valueOf(linkRequestPercentage), MultiTenantQuotaConfig.UNLIMITED_QUOTA);
    }

    /** Wraps {@code principal} in a {@link Session} whose second constructor argument is {@code null}. */
    private Session createSession(KafkaPrincipal principal) {
        return new Session(principal, null);
    }

    /** Copies a Java map into an immutable Scala map. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <T> Map<T, T> toImmutableScalaMap(java.util.Map<T, T> map) {
        // The raw cast sidesteps Java's view of the Scala collection's generic signature.
        return Map.from((IterableOnce) CollectionConverters.MapHasAsScala(map).asScala());
    }
}

