package io.confluent.kafka.multitenant.integration.test;

import com.google.common.collect.ImmutableMap;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import io.confluent.kafka.multitenant.BasePhysicalClusterMetadata;
import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.kafka.multitenant.TenantUtils;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.coordinator.quota.QuotaCoordinator;
import kafka.coordinator.quota.QuotaEntity;
import kafka.server.DynamicQuotaChannelManager;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.jdk.CollectionConverters;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantQuotaIntegrationTest.class */
public class MultiTenantQuotaIntegrationTest extends AbstractMultiTenantKafkaIntegrationTest {
    // Default tenant quota settings and the logical-cluster metadata topic name.
    // NOTE(review): none of these fields is referenced in the visible part of this file; the
    // tests pass equal literals (100.0, 10.0, 2.0, "_confluent-logical_clusters") directly,
    // presumably because the constants were inlined — confirm before removing or changing them.

    // Same value the tests put under "confluent.quota.tenant.default.controller.mutation.rate".
    private final double defaultControllerMutationRateQuota = 100.0d;
    // Same value the tests put under "confluent.quota.tenant.default.producer.id.rate".
    private final double defaultProducerIdRateQuota = 10.0d;
    // Same value the tests put under "confluent.quota.tenant.produce.multiplier".
    private final double defaultProduceQuotaMultiplier = 2.0d;
    // Same topic name the cluster-setup and metadata-publishing helpers use.
    private final String topicName = "_confluent-logical_clusters";

    /**
     * Returns the parent class's node properties with {@link TenantQuotaCallback} registered
     * under {@code client.quota.callback.class}.
     *
     * <p>Decompiler note: access modifiers were changed from {@code protected}.
     *
     * @return the object returned by {@code super.nodeProps()}, after the callback entry is added
     */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    public Properties nodeProps() {
        final Properties brokerProps = super.nodeProps();
        final String quotaCallbackClass = TenantQuotaCallback.class.getName();
        brokerProps.put("client.quota.callback.class", quotaCallbackClass);
        return brokerProps;
    }

    /**
     * Starts the physical cluster with the {@code _confluent-logical_clusters} topic and creates
     * the two logical clusters {@code lkc-tenant1} and {@code lkc-tenant2}.
     *
     * <p>The cluster link replication quota mode is set to {@code TOTAL_INBOUND} on the supplied
     * properties before start-up. The same {@code properties} object is passed for both
     * properties arguments of {@code startWithTopic}.
     *
     * <p>Decompiler note: access modifiers were changed from {@code protected}.
     *
     * @param properties properties to start the cluster with; mutated by this method
     * @throws RuntimeException wrapping an {@link InterruptedException} or
     *     {@link ExecutionException} raised during start-up; on interruption the calling thread's
     *     interrupt flag is restored before the exception is thrown
     */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    public void createPhysicalAndLogicalClusters(Properties properties) {
        properties.put("confluent.cluster.link.replication.quota.mode", ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND.name());
        try {
            this.physicalCluster = this.testHarness.startWithTopic("_confluent-logical_clusters", 1, 1, 60000L, properties, properties, Optional.of(Time.SYSTEM));
            this.logicalCluster1 = this.physicalCluster.createLogicalCluster("lkc-tenant1", 100, 9, 11, 12);
            this.logicalCluster2 = this.physicalCluster.createLogicalCluster("lkc-tenant2", 200, 9, 21, 22);
        } catch (InterruptedException e) {
            // Wrapping alone would swallow the interruption; restore the flag so that code further
            // up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Variant of the cluster set-up that passes two distinct property sets to
     * {@code startWithTopic} and uses a {@code 15000L} argument instead of {@code 60000L}.
     *
     * <p>Only {@code properties} receives the {@code TOTAL_INBOUND} cluster link quota mode;
     * {@code properties2} is forwarded untouched (presumably the controller properties, going by
     * the method name — confirm against the harness).
     *
     * @param properties first property set; mutated by this method
     * @param properties2 second property set, forwarded as-is
     * @throws Exception if starting the cluster or creating a logical cluster fails
     */
    protected void createPhysicalAndLogicalClustersWithControllerProperties(Properties properties, Properties properties2) throws Exception {
        final String quotaMode = ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND.name();
        properties.put("confluent.cluster.link.replication.quota.mode", quotaMode);

        this.physicalCluster = this.testHarness.startWithTopic(
                "_confluent-logical_clusters", 1, 1, 15000L, properties, properties2, Optional.of(Time.SYSTEM));

        this.logicalCluster1 = this.physicalCluster.createLogicalCluster("lkc-tenant1", 100, 9, 11, 12);
        this.logicalCluster2 = this.physicalCluster.createLogicalCluster("lkc-tenant2", 200, 9, 21, 22);
    }

    /**
     * Checks that the default tenant controller mutation rate can be changed through a broker
     * config update: the quota is verified at 100.0 after creating a topic, the config is raised
     * to 200.0, and the new limit is verified both on the callback and after creating a second
     * topic.
     *
     * @param str quorum mode supplied by the parameterized test; not read by the test body
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicTenantControllerQuota(String str) throws Exception {
        final String mutationRateConfig = "confluent.quota.tenant.default.controller.mutation.rate";
        final double initialRate = 100.0d;
        final double updatedRate = 200.0d;

        setUp();
        final Properties brokerProps = nodeProps();
        brokerProps.put(mutationRateConfig, String.valueOf(initialRate));
        brokerProps.put("confluent.quota.tenant.produce.multiplier", String.valueOf(2.0d));
        createPhysicalAndLogicalClusters(brokerProps);
        awaitMetadataPropagation();

        // NOTE(review): the admin client is never closed here; presumably the harness owns its
        // lifecycle — confirm.
        final AdminClient admin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        final Map<String, String> tenantTags = quotaTags(this.logicalCluster1);

        // Quota in effect with the start-up configuration.
        admin.createTopics(Collections.singletonList(new NewTopic("test_topic", Optional.of(5), Optional.empty())))
                .all()
                .get();
        verifyQuota(ClientQuotaType.CONTROLLER_MUTATION, tenantTags, initialRate);

        // Quota after the dynamic update.
        updateBrokerConfig(new ConfigEntry(mutationRateConfig, String.valueOf(updatedRate)));
        verifyQuotaCallbackLimit(ClientQuotaType.CONTROLLER_MUTATION, tenantTags, updatedRate);
        admin.createTopics(Collections.singletonList(new NewTopic("test_topic2", Optional.of(5), Optional.empty())))
                .all()
                .get();
        verifyQuota(ClientQuotaType.CONTROLLER_MUTATION, tenantTags, updatedRate);
    }

    /**
     * Checks that the default tenant producer-id rate can be changed through a broker config
     * update: the quota is verified at 10.0 after a produce/consume round, the config is raised
     * to 20.0, and the new limit is verified on the callback and after a second round against a
     * second topic.
     *
     * @param str quorum mode supplied by the parameterized test; not read by the test body
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicTenantProducerIdQuota(String str) throws Throwable {
        final String producerIdRateConfig = "confluent.quota.tenant.default.producer.id.rate";
        final double initialRate = 10.0d;
        final double updatedRate = 20.0d;

        setUp();
        final Properties brokerProps = nodeProps();
        brokerProps.put("confluent.producer.id.quota.manager.enable", String.valueOf(true));
        brokerProps.put(producerIdRateConfig, String.valueOf(initialRate));
        createPhysicalAndLogicalClusters(brokerProps);
        awaitMetadataPropagation();

        final AdminClient admin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        final Map<String, String> tenantTags = quotaTags(this.logicalCluster1);

        // Quota in effect with the start-up configuration.
        admin.createTopics(Collections.singletonList(new NewTopic("test_topic", Optional.of(5), Optional.empty())))
                .all()
                .get();
        this.testHarness.produceConsume(
                this.logicalCluster1.user(11), this.logicalCluster1.user(12), "test_topic", "group1", 0, false);
        verifyQuota(ClientQuotaType.PRODUCER_ID, tenantTags, initialRate);

        // Quota after the dynamic update.
        updateBrokerConfig(new ConfigEntry(producerIdRateConfig, String.valueOf(updatedRate)));
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCER_ID, tenantTags, updatedRate);
        admin.createTopics(Collections.singletonList(new NewTopic("test_topic2", Optional.of(5), Optional.empty())))
                .all()
                .get();
        this.testHarness.produceConsume(
                this.logicalCluster1.user(11), this.logicalCluster1.user(12), "test_topic2", "group1", 0, false);
        verifyQuota(ClientQuotaType.PRODUCER_ID, tenantTags, updatedRate);
    }

    /**
     * Checks that the produce and fetch quota multipliers can be changed through broker config
     * updates.
     *
     * <p>The cluster starts with a produce multiplier of 2.0 and no explicit fetch multiplier.
     * Expected quotas are computed from the byte rates of {@code LC_META_1}, each divided by 2.0.
     * The fetch multiplier is then set to 3.0 and the produce multiplier to 1.0, and the new
     * expectations are verified on the callback and after another produce/consume round.
     *
     * @param str quorum mode supplied by the parameterized test; not read by the test body
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicQuotaMultiplier(String str) throws Throwable {
        final String produceMultiplierConfig = "confluent.quota.tenant.produce.multiplier";

        setUp();
        final Properties brokerProps = nodeProps();
        brokerProps.put("confluent.quota.tenant.default.controller.mutation.rate", String.valueOf(100.0d));
        brokerProps.put(produceMultiplierConfig, String.valueOf(2.0d));
        createPhysicalAndLogicalClusters(brokerProps);
        awaitMetadataPropagation();

        final AdminClient admin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        final Map<String, String> tenantTags = quotaTags(this.logicalCluster1);
        addLkcFileAndSyncMetadata(KafkaLogicalClusterUtils.LC_META_1);
        admin.createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(5), Optional.empty())))
                .all()
                .get();
        this.testHarness.produceConsume(
                this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0, false);

        // Quotas with the start-up multipliers.
        verifyQuota(
                ClientQuotaType.PRODUCE,
                tenantTags,
                (2.0d * KafkaLogicalClusterUtils.LC_META_1.producerByteRate().longValue()) / 2.0d);
        verifyQuota(
                ClientQuotaType.FETCH,
                tenantTags,
                KafkaLogicalClusterUtils.LC_META_1.consumerByteRate().longValue() / 2.0d);

        // Change both multipliers dynamically.
        updateBrokerConfig(new ConfigEntry("confluent.quota.tenant.fetch.multiplier", String.valueOf(3.0d)));
        updateBrokerConfig(new ConfigEntry(produceMultiplierConfig, String.valueOf(1.0d)));

        final double expectedProduceQuota =
                (1.0d * KafkaLogicalClusterUtils.LC_META_1.producerByteRate().longValue()) / 2.0d;
        final double expectedFetchQuota =
                (3.0d * KafkaLogicalClusterUtils.LC_META_1.consumerByteRate().longValue()) / 2.0d;
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, tenantTags, expectedProduceQuota);
        verifyQuotaCallbackLimit(ClientQuotaType.FETCH, tenantTags, expectedFetchQuota);

        this.testHarness.produceConsume(
                this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0, false);
        verifyQuota(ClientQuotaType.PRODUCE, tenantTags, expectedProduceQuota);
        verifyQuota(ClientQuotaType.FETCH, tenantTags, expectedFetchQuota);
    }

    /**
     * Exercises the dynamic quota pipeline end to end.
     *
     * <p>Steps: start the cluster with dynamic quotas enabled; wait until every broker's channel
     * manager has started its publish and report threads and every quota coordinator is active;
     * run a produce/consume round and verify the initial quotas (byte rates of {@code LC_META_1}
     * divided by 2.0); wait for a reporting request, a quota stored for the tenant in some
     * coordinator's state manager, and a publishing request; finally verify that the produce and
     * fetch quotas no longer equal their initial values.
     *
     * @param str quorum mode supplied by the parameterized test; not read by the test body
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicQuotaPipeline(String str) throws Throwable {
        setUp();
        Properties brokerProps = nodeProps();
        brokerProps.put(KafkaConfig.DynamicQuotaEnabledProp(), String.valueOf(true));
        brokerProps.put(KafkaConfig.QuotasTopicReplicationFactorProp(), (short) 2);
        brokerProps.put("confluent.quota.dynamic.reporting.interval.ms", 1000);
        brokerProps.put("confluent.quota.dynamic.publishing.interval.ms", 4000);
        brokerProps.put("confluent.enable.broker.reporting.min.usage.mode", false);
        createPhysicalAndLogicalClusters(brokerProps);
        awaitMetadataPropagation();

        // Parameterized lists (instead of raw List) so the lambdas below see the element type.
        List<DynamicQuotaChannelManager> channelManagers = this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                .map(broker -> (DynamicQuotaChannelManager) broker.dynamicQuotaChannelManager().get())
                .collect(Collectors.toList());
        TestUtils.waitForCondition(
                () -> channelManagers.stream().allMatch(manager ->
                        manager.getPublishRequestThread().started() && manager.getReportRequestThread().started()),
                "Dynamic quota channel manager should have been started");

        List<QuotaCoordinator> coordinators = this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                .map(broker -> (QuotaCoordinator) broker.quotaCoordinatorOpt().get())
                .collect(Collectors.toList());
        TestUtils.waitForCondition(
                () -> coordinators.stream().allMatch(coordinator -> coordinator.isActive().get()),
                "Quota coordinator should have been started");

        AdminClient admin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        Map<String, String> tenantTags = quotaTags(this.logicalCluster1);
        addLkcFileAndSyncMetadata(KafkaLogicalClusterUtils.LC_META_1);
        admin.createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(5), Optional.empty())))
                .all()
                .get();
        this.testHarness.produceConsume(
                this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0, false);

        // Initial quotas, before the dynamic pipeline has adjusted anything.
        double initialProduceQuota = KafkaLogicalClusterUtils.LC_META_1.producerByteRate().longValue() / 2.0d;
        double initialFetchQuota = KafkaLogicalClusterUtils.LC_META_1.consumerByteRate().longValue() / 2.0d;
        verifyQuota(ClientQuotaType.PRODUCE, tenantTags, initialProduceQuota);
        verifyQuota(ClientQuotaType.FETCH, tenantTags, initialFetchQuota);

        TestUtils.waitForCondition(
                () -> channelManagers.stream().anyMatch(manager ->
                        manager.getReportRequestThread().queuePerNode().nonEmpty()),
                "Dynamic quota channel manager should have received a Reporting request");
        TestUtils.waitForCondition(
                () -> coordinators.stream().anyMatch(coordinator ->
                        coordinator.quotaStateManager().getQuota(new QuotaEntity(toScalaMap(tenantTags))).nonEmpty()),
                "Quota coordinator should have recomputed the quota");
        TestUtils.waitForCondition(
                () -> channelManagers.stream().anyMatch(manager ->
                        manager.getPublishRequestThread().queuePerNode().nonEmpty()),
                "Dynamic quota channel manager should send a Publishing request");

        // After publishing, the effective quotas must differ from the initial ones.
        verifyQuota(ClientQuotaType.PRODUCE, tenantTags, actual -> actual.doubleValue() != initialProduceQuota);
        verifyQuota(ClientQuotaType.FETCH, tenantTags, actual -> actual.doubleValue() != initialFetchQuota);
    }

    /**
     * Exercises the dynamic quota pipeline with the topic-based cluster set-up
     * ({@code topicBasedClusterSetup(true, false)}).
     *
     * <p>Steps: wait until every broker's channel manager has started its publish and report
     * threads and every quota coordinator is active; publish logical-cluster metadata via
     * {@code addLkcMsgsAndSyncMetadata(1000)}; run a produce/consume round and verify the initial
     * quotas (byte rates of {@code LC_META_1} divided by 2.0); wait for a reporting request, a
     * quota stored for the tenant in some coordinator's state manager, and a publishing request;
     * finally verify that the produce and fetch quotas no longer equal their initial values.
     *
     * @param str quorum mode supplied by the parameterized test; not read by the test body
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicQuotaPipelineTopicBased(String str) throws Throwable {
        topicBasedClusterSetup(true, false);

        // Parameterized lists (instead of raw List) so the lambdas below see the element type.
        List<DynamicQuotaChannelManager> channelManagers = this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                .map(broker -> (DynamicQuotaChannelManager) broker.dynamicQuotaChannelManager().get())
                .collect(Collectors.toList());
        TestUtils.waitForCondition(
                () -> channelManagers.stream().allMatch(manager ->
                        manager.getPublishRequestThread().started() && manager.getReportRequestThread().started()),
                "Dynamic quota channel manager should have been started");

        List<QuotaCoordinator> coordinators = this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                .map(broker -> (QuotaCoordinator) broker.quotaCoordinatorOpt().get())
                .collect(Collectors.toList());
        TestUtils.waitForCondition(
                () -> coordinators.stream().allMatch(coordinator -> coordinator.isActive().get()),
                "Quota coordinator should have been started");

        AdminClient admin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        Map<String, String> tenantTags = quotaTags(this.logicalCluster1);
        addLkcMsgsAndSyncMetadata(1000);
        admin.createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(5), Optional.empty())))
                .all()
                .get();
        this.testHarness.produceConsume(
                this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0, false);

        // Initial quotas, before the dynamic pipeline has adjusted anything.
        double initialProduceQuota = KafkaLogicalClusterUtils.LC_META_1.producerByteRate().longValue() / 2.0d;
        double initialFetchQuota = KafkaLogicalClusterUtils.LC_META_1.consumerByteRate().longValue() / 2.0d;
        verifyQuota(ClientQuotaType.PRODUCE, tenantTags, initialProduceQuota);
        verifyQuota(ClientQuotaType.FETCH, tenantTags, initialFetchQuota);

        TestUtils.waitForCondition(
                () -> channelManagers.stream().anyMatch(manager ->
                        manager.getReportRequestThread().queuePerNode().nonEmpty()),
                "Dynamic quota channel manager should have received a Reporting request");
        TestUtils.waitForCondition(
                () -> coordinators.stream().anyMatch(coordinator ->
                        coordinator.quotaStateManager().getQuota(new QuotaEntity(toScalaMap(tenantTags))).nonEmpty()),
                "Quota coordinator should have recomputed the quota");
        TestUtils.waitForCondition(
                () -> channelManagers.stream().anyMatch(manager ->
                        manager.getPublishRequestThread().queuePerNode().nonEmpty()),
                "Dynamic quota channel manager should send a Publishing request");

        // After publishing, the effective quotas must differ from the initial ones.
        verifyQuota(ClientQuotaType.PRODUCE, tenantTags, actual -> actual.doubleValue() != initialProduceQuota);
        verifyQuota(ClientQuotaType.FETCH, tenantTags, actual -> actual.doubleValue() != initialFetchQuota);
    }

    /**
     * Verifies the produce and fetch quotas of the third logical cluster as its max-ECKU metadata
     * changes, using {@code topicBasedClusterSetup(false, true)}.
     *
     * <p>Expected quota after each step (both PRODUCE and FETCH):
     * <ul>
     *   <li>initial metadata: 2621440.0 (2.5 * 1024 * 1024)</li>
     *   <li>after {@code updateMaxECKUTo3Fortenant}: 7864320.0 (7.5 * 1024 * 1024)</li>
     *   <li>after {@code updateMaxECKUTo2Fortenant}: 5242880.0 (5 * 1024 * 1024)</li>
     *   <li>after {@code updateInvalidEckuFortenant}: 13107200.0 (12.5 * 1024 * 1024)</li>
     * </ul>
     *
     * <p>The same user (31) is used as both producer and consumer in the initial round.
     *
     * @param str quorum mode supplied by the parameterized test; not read by the test body
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicQuotaPipelineTopicBasedWithMaxElasticCKU(String str) throws Throwable {
        topicBasedClusterSetup(false, true);

        // Parameterized list (instead of raw List) so the lambda below sees QuotaCoordinator.
        List<QuotaCoordinator> coordinators = this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                .map(broker -> (QuotaCoordinator) broker.quotaCoordinatorOpt().get())
                .collect(Collectors.toList());
        TestUtils.waitForCondition(
                () -> coordinators.stream().allMatch(coordinator -> coordinator.isActive().get()),
                "Quota coordinator should have been started");

        AdminClient admin = this.testHarness.createAdminClient(this.logicalCluster3.adminUser());
        Map<String, String> tenantTags = quotaTags(this.logicalCluster3);
        addLkcMsgsAndSyncMetadata(1000);
        admin.createTopics(Collections.singletonList(new NewTopic("testtopic1", Optional.of(2), Optional.empty())))
                .all()
                .get();
        this.testHarness.produceConsume(
                this.logicalCluster3.user(31), this.logicalCluster3.user(31), "testtopic1", "group1", 0, false);
        verifyQuota(ClientQuotaType.PRODUCE, tenantTags, 2621440.0d);
        verifyQuota(ClientQuotaType.FETCH, tenantTags, 2621440.0d);

        updateMaxECKUTo3Fortenant(1100);
        this.testHarness.produce(this.logicalCluster3.user(31), "testtopic1", 0, false);
        verifyQuota(ClientQuotaType.PRODUCE, tenantTags, 7864320.0d);
        verifyQuota(ClientQuotaType.FETCH, tenantTags, 7864320.0d);

        updateMaxECKUTo2Fortenant(1200);
        this.testHarness.produce(this.logicalCluster3.user(31), "testtopic1", 0, false);
        verifyQuota(ClientQuotaType.FETCH, tenantTags, 5242880.0d);
        verifyQuota(ClientQuotaType.PRODUCE, tenantTags, 5242880.0d);

        updateInvalidEckuFortenant(1300);
        this.testHarness.produce(this.logicalCluster3.user(31), "testtopic1", 0, false);
        verifyQuota(ClientQuotaType.FETCH, tenantTags, 1.31072E7d);
        verifyQuota(ClientQuotaType.PRODUCE, tenantTags, 1.31072E7d);
    }

    /**
     * Checks producer-id throttling: with the producer-id quota manager and throttling enabled
     * and a default producer-id rate of 1, producers are created until throttling is reported.
     * The test then verifies the throttle state and that the recorded count equals the number
     * returned by {@code createProducersUntilProduceIdThrottled} plus one.
     *
     * @param str quorum mode supplied by the parameterized test; not read by the test body
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testProducerIdThrottling(String str) throws Throwable {
        setUp(1, Collections.emptyList());

        final Properties brokerProps = nodeProps();
        brokerProps.put("confluent.producer.id.quota.manager.enable", String.valueOf(true));
        brokerProps.put("confluent.producer.id.throttle.enable", String.valueOf(true));
        brokerProps.put("confluent.producer.id.cache.limit", 100);
        brokerProps.put("confluent.producer.id.throttle.enable.threshold.percentage", 0);
        brokerProps.put("confluent.quota.tenant.default.producer.id.rate", 1);
        createPhysicalAndLogicalClusters(brokerProps);
        awaitMetadataPropagation();

        final AdminClient admin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        final Map<String, String> tenantTags = quotaTags(this.logicalCluster1);
        addLkcFileAndSyncMetadata(KafkaLogicalClusterUtils.LC_META_1);
        admin.createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(1), Optional.empty())))
                .all()
                .get();
        this.testHarness.produceConsume(
                this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0, false);
        verifyQuota(ClientQuotaType.PRODUCER_ID, tenantTags, 1);

        final int producersCreated =
                createProducersUntilProduceIdThrottled(this.logicalCluster1.user(12), "testtopic");
        verifyThrottle(ClientQuotaType.PRODUCER_ID, tenantTags, true);
        verifyCount(ClientQuotaType.PRODUCER_ID, tenantTags, producersCreated + 1);
    }

    /**
     * Starts the physical cluster with the {@code _confluent-logical_clusters} topic and creates
     * three logical clusters whose ids come from {@code LC_META_1}, {@code LC_META_XYZ} and
     * {@code LC_TENANT_WITH_1_MAX_ECKU}.
     *
     * @param properties first property set forwarded to {@code startWithTopic}
     * @param properties2 second property set forwarded to {@code startWithTopic}
     * @throws ExecutionException if cluster start-up or logical cluster creation fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected void createPhysicalAndLogicalClustersWithTopic(Properties properties, Properties properties2) throws ExecutionException, InterruptedException {
        this.physicalCluster = this.testHarness.startWithTopic(
                "_confluent-logical_clusters", 1, 1, 15000L, properties, properties2);

        // Resolve all three ids up front, then create the clusters in order.
        final String firstClusterId = KafkaLogicalClusterUtils.LC_META_1.logicalClusterId();
        final String secondClusterId = KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId();
        final String thirdClusterId = KafkaLogicalClusterUtils.LC_TENANT_WITH_1_MAX_ECKU.logicalClusterId();

        this.logicalCluster1 = this.physicalCluster.createLogicalCluster(firstClusterId, 100, 9, 11, 12);
        this.logicalCluster2 = this.physicalCluster.createLogicalCluster(secondClusterId, 200, 9, 21, 22);
        this.logicalCluster3 = this.physicalCluster.createLogicalCluster(thirdClusterId, 300, 9, 31, 32);
    }

    /**
     * Publishes metadata for {@code LC_META_1}, {@code LC_META_XYZ} and
     * {@code LC_TENANT_WITH_1_MAX_ECKU} to the {@code _confluent-logical_clusters} topic, one at
     * a time. After each publish it blocks until every cluster metadata instance returns a
     * non-null entry for that logical cluster id.
     *
     * @param i base value; {@code i + 1}, {@code i + 2} and {@code i + 3} are passed as the second
     *     argument of {@code produceLCMData} (presumably a sequence id — confirm)
     * @throws InterruptedException if interrupted while waiting for the metadata to be consumed
     */
    protected void addLkcMsgsAndSyncMetadata(int i) throws InterruptedException {
        final List<BasePhysicalClusterMetadata> metadataInstances = this.physicalCluster.clusterMetadataInstances();

        final String metaOneId = KafkaLogicalClusterUtils.LC_META_1.logicalClusterId();
        this.physicalCluster.kafkaCluster().produceLCMData(
                "_confluent-logical_clusters", i + 1, metaOneId, KafkaLogicalClusterUtils.LC_META_1);
        TestUtils.waitForCondition(
                () -> metadataInstances.stream().allMatch(instance -> instance.metadata(metaOneId) != null),
                "Expected metadata to get consumed");

        final String metaXyzId = KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId();
        this.physicalCluster.kafkaCluster().produceLCMData(
                "_confluent-logical_clusters", i + 2, metaXyzId, KafkaLogicalClusterUtils.LC_META_XYZ);
        TestUtils.waitForCondition(
                () -> metadataInstances.stream().allMatch(instance -> instance.metadata(metaXyzId) != null),
                "Expected metadata to get consumed");

        final String maxEckuTenantId = KafkaLogicalClusterUtils.LC_TENANT_WITH_1_MAX_ECKU.logicalClusterId();
        this.physicalCluster.kafkaCluster().produceLCMData(
                "_confluent-logical_clusters", i + 3, maxEckuTenantId, KafkaLogicalClusterUtils.LC_TENANT_WITH_1_MAX_ECKU);
        TestUtils.waitForCondition(
                () -> metadataInstances.stream().allMatch(instance -> instance.metadata(maxEckuTenantId) != null),
                "Expected metadata to get consumed");
    }

    /**
     * Publishes {@code LC_META_1_WITH_1_ECKU} to the {@code _confluent-logical_clusters} topic and
     * blocks until every cluster metadata instance returns a non-null entry for its id.
     *
     * @param i base value; {@code i + 1} is passed as the second argument of
     *     {@code produceLCMData} (presumably a sequence id — confirm)
     * @throws InterruptedException if interrupted while waiting for the metadata to be consumed
     */
    protected void updateLkcMeta1WithEckuMetadata(int i) throws InterruptedException {
        final List<BasePhysicalClusterMetadata> metadataInstances = this.physicalCluster.clusterMetadataInstances();
        final String clusterId = KafkaLogicalClusterUtils.LC_META_1_WITH_1_ECKU.logicalClusterId();

        this.physicalCluster.kafkaCluster().produceLCMData(
                "_confluent-logical_clusters", i + 1, clusterId, KafkaLogicalClusterUtils.LC_META_1_WITH_1_ECKU);

        TestUtils.waitForCondition(
                () -> metadataInstances.stream().allMatch(instance -> instance.metadata(clusterId) != null),
                "Expected metadata to get consumed");
    }

    /**
     * Publishes {@code LC_WITH_NULL_ECKU_DEFN} to the {@code _confluent-logical_clusters} topic
     * and blocks until every cluster metadata instance returns a non-null entry for its id.
     *
     * @param i base value; {@code i + 1} is passed as the second argument of
     *     {@code produceLCMData} (presumably a sequence id — confirm)
     * @throws InterruptedException if interrupted while waiting for the metadata to be consumed
     */
    protected void updateActiveLkcWithNullEckuMetadata(int i) throws InterruptedException {
        final List<BasePhysicalClusterMetadata> metadataInstances = this.physicalCluster.clusterMetadataInstances();
        final String clusterId = KafkaLogicalClusterUtils.LC_WITH_NULL_ECKU_DEFN.logicalClusterId();

        this.physicalCluster.kafkaCluster().produceLCMData(
                "_confluent-logical_clusters", i + 1, clusterId, KafkaLogicalClusterUtils.LC_WITH_NULL_ECKU_DEFN);

        TestUtils.waitForCondition(
                () -> metadataInstances.stream().allMatch(instance -> instance.metadata(clusterId) != null),
                "Expected metadata to get consumed");
    }

    /**
     * Publishes {@code LC_WITH_INVALID_ECKU_DEFN} to the {@code _confluent-logical_clusters} topic
     * and blocks until every cluster metadata instance returns a non-null entry for its id.
     *
     * @param i base value; {@code i + 1} is passed as the second argument of
     *     {@code produceLCMData} (presumably a sequence id — confirm)
     * @throws InterruptedException if interrupted while waiting for the metadata to be consumed
     */
    protected void updateActiveLkcWithInvalidEckuMetadata(int i) throws InterruptedException {
        final List<BasePhysicalClusterMetadata> metadataInstances = this.physicalCluster.clusterMetadataInstances();
        final String clusterId = KafkaLogicalClusterUtils.LC_WITH_INVALID_ECKU_DEFN.logicalClusterId();

        this.physicalCluster.kafkaCluster().produceLCMData(
                "_confluent-logical_clusters", i + 1, clusterId, KafkaLogicalClusterUtils.LC_WITH_INVALID_ECKU_DEFN);

        TestUtils.waitForCondition(
                () -> metadataInstances.stream().allMatch(instance -> instance.metadata(clusterId) != null),
                "Expected metadata to get consumed");
    }

    /**
     * Publishes {@code DEACTIVATED_LC_WITH_INVALID_ECKU_DEFN} to the
     * {@code _confluent-logical_clusters} topic. Unlike the sibling update helpers, this method
     * does not wait for the metadata to be consumed.
     *
     * @param i base value; {@code i + 1} is passed as the second argument of
     *     {@code produceLCMData} (presumably a sequence id — confirm)
     * @throws InterruptedException declared by the signature; nothing in the visible body waits
     */
    protected void updateInActiveLkcWithInvalidEckuMetadata(int i) throws InterruptedException {
        // Result intentionally unused; the call is retained as in the original.
        // NOTE(review): looks like a leftover — confirm it has no side effects before removing.
        this.physicalCluster.clusterMetadataInstances();

        final String clusterId = KafkaLogicalClusterUtils.DEACTIVATED_LC_WITH_INVALID_ECKU_DEFN.logicalClusterId();
        this.physicalCluster.kafkaCluster().produceLCMData(
                "_confluent-logical_clusters", i + 1, clusterId, KafkaLogicalClusterUtils.DEACTIVATED_LC_WITH_INVALID_ECKU_DEFN);
    }

    /**
     * Publishes {@code LC_META_1} under the logical cluster id of {@code LC_META_1_WITH_1_ECKU}
     * and blocks until every cluster metadata instance returns a non-null entry for that id.
     *
     * <p>NOTE(review): the id is taken from {@code LC_META_1_WITH_1_ECKU} while the payload is
     * {@code LC_META_1}; presumably both share the same id — confirm.
     *
     * @param i base value; {@code i + 1} is passed as the second argument of
     *     {@code produceLCMData} (presumably a sequence id — confirm)
     * @throws InterruptedException if interrupted while waiting for the metadata to be consumed
     */
    protected void updateLkcMeta1WithoutEckuMetadata(int i) throws InterruptedException {
        final List<BasePhysicalClusterMetadata> metadataInstances = this.physicalCluster.clusterMetadataInstances();
        final String clusterId = KafkaLogicalClusterUtils.LC_META_1_WITH_1_ECKU.logicalClusterId();

        this.physicalCluster.kafkaCluster().produceLCMData(
                "_confluent-logical_clusters", i + 1, clusterId, KafkaLogicalClusterUtils.LC_META_1);

        TestUtils.waitForCondition(
                () -> metadataInstances.stream().allMatch(instance -> instance.metadata(clusterId) != null),
                "Expected metadata to get consumed");
    }

    /**
     * Publishes {@code LC_TENANT_WITH_3_MAX_ECKU} to the {@code _confluent-logical_clusters} topic
     * and blocks until every cluster metadata instance returns a non-null entry for its id.
     *
     * @param i base value; {@code i + 5} is passed as the second argument of
     *     {@code produceLCMData} (presumably a sequence id — confirm)
     * @throws InterruptedException if interrupted while waiting for the metadata to be consumed
     */
    protected void updateMaxECKUTo3Fortenant(int i) throws InterruptedException {
        final List<BasePhysicalClusterMetadata> metadataInstances = this.physicalCluster.clusterMetadataInstances();
        final String tenantId = KafkaLogicalClusterUtils.LC_TENANT_WITH_3_MAX_ECKU.logicalClusterId();

        this.physicalCluster.kafkaCluster().produceLCMData(
                "_confluent-logical_clusters", i + 5, tenantId, KafkaLogicalClusterUtils.LC_TENANT_WITH_3_MAX_ECKU);

        TestUtils.waitForCondition(
                () -> metadataInstances.stream().allMatch(instance -> instance.metadata(tenantId) != null),
                "Expected metadata to get consumed");
    }

    /**
     * Publishes {@code LC_TENANT_WITH_2_MAX_ECKU} to the {@code _confluent-logical_clusters} topic
     * and blocks until every cluster metadata instance returns a non-null entry for its id.
     *
     * @param i base value; {@code i + 7} is passed as the second argument of
     *     {@code produceLCMData} (presumably a sequence id — confirm)
     * @throws InterruptedException if interrupted while waiting for the metadata to be consumed
     */
    protected void updateMaxECKUTo2Fortenant(int i) throws InterruptedException {
        final List<BasePhysicalClusterMetadata> metadataInstances = this.physicalCluster.clusterMetadataInstances();
        final String tenantId = KafkaLogicalClusterUtils.LC_TENANT_WITH_2_MAX_ECKU.logicalClusterId();

        this.physicalCluster.kafkaCluster().produceLCMData(
                "_confluent-logical_clusters", i + 7, tenantId, KafkaLogicalClusterUtils.LC_TENANT_WITH_2_MAX_ECKU);

        TestUtils.waitForCondition(
                () -> metadataInstances.stream().allMatch(instance -> instance.metadata(tenantId) != null),
                "Expected metadata to get consumed");
    }

    /**
     * Publishes {@code LC_TENANT_WITH_INVALID_ECKU} to the {@code _confluent-logical_clusters}
     * topic and blocks until every cluster metadata instance returns a non-null entry for its id.
     *
     * @param i base value; {@code i + 9} is passed as the second argument of
     *     {@code produceLCMData} (presumably a sequence id — confirm)
     * @throws InterruptedException if interrupted while waiting for the metadata to be consumed
     */
    protected void updateInvalidEckuFortenant(int i) throws InterruptedException {
        final List<BasePhysicalClusterMetadata> metadataInstances = this.physicalCluster.clusterMetadataInstances();
        final String tenantId = KafkaLogicalClusterUtils.LC_TENANT_WITH_INVALID_ECKU.logicalClusterId();

        this.physicalCluster.kafkaCluster().produceLCMData(
                "_confluent-logical_clusters", i + 9, tenantId, KafkaLogicalClusterUtils.LC_TENANT_WITH_INVALID_ECKU);

        TestUtils.waitForCondition(
                () -> metadataInstances.stream().allMatch(instance -> instance.metadata(tenantId) != null),
                "Expected metadata to get consumed");
    }

    /**
     * Publishes the "deleted" metadata records {@code LC_META_TENANT1_DELETED},
     * {@code LC_META_XYZ_DELETED} and {@code LC_META_1_WITH_1_MAX_ECKU_WITH_DELETED}, in that
     * order, to the {@code _confluent-logical_clusters} topic. It does not wait for them to be
     * consumed.
     *
     * @param i base value; {@code i + 2}, {@code i + 2} and {@code i + 1} respectively are passed
     *     as the second argument of {@code produceLCMData} (presumably a sequence id — confirm)
     * @throws InterruptedException declared by the signature; nothing in the visible body waits
     */
    protected void deleteLogicalClusters(int i) throws InterruptedException {
        final String metadataTopic = "_confluent-logical_clusters";

        this.physicalCluster.kafkaCluster().produceLCMData(
                metadataTopic,
                i + 2,
                KafkaLogicalClusterUtils.LC_META_TENANT1_DELETED.logicalClusterId(),
                KafkaLogicalClusterUtils.LC_META_TENANT1_DELETED);
        this.physicalCluster.kafkaCluster().produceLCMData(
                metadataTopic,
                i + 2,
                KafkaLogicalClusterUtils.LC_META_XYZ_DELETED.logicalClusterId(),
                KafkaLogicalClusterUtils.LC_META_XYZ_DELETED);
        this.physicalCluster.kafkaCluster().produceLCMData(
                metadataTopic,
                i + 1,
                KafkaLogicalClusterUtils.LC_META_1_WITH_1_MAX_ECKU_WITH_DELETED.logicalClusterId(),
                KafkaLogicalClusterUtils.LC_META_1_WITH_1_MAX_ECKU_WITH_DELETED);
    }

    /**
     * Builds the node properties shared by the topic-based cluster set-up: the tenant quota
     * callback class, the logical-cluster metadata topic, and
     * {@link TopicBasedPhysicalClusterMetadata} as the multitenant metadata class.
     *
     * @return a new map with the three entries, collected with {@code Collectors.toMap}
     */
    public Map<String, Object> eckuNodeProps() {
        // Each array is a {key, value} pair.
        return Stream.of(
                        new Object[] {"client.quota.callback.class", TenantQuotaCallback.class.getName()},
                        new Object[] {"confluent.cdc.lkc.metadata.topic", "_confluent-logical_clusters"},
                        new Object[] {"multitenant.metadata.class", TopicBasedPhysicalClusterMetadata.class.getName()})
                .collect(Collectors.toMap(pair -> pair[0].toString(), pair -> pair[1]));
    }

    /**
     * Creates a new {@code IntegrationTestHarness}, starts the physical cluster with three
     * logical clusters via {@code createPhysicalAndLogicalClustersWithTopic}, and waits for
     * metadata propagation.
     *
     * <p>The first property set carries the dynamic-quota and elastic-CKU switches, the quota
     * reporting/publishing intervals and the {@code eckuNodeProps()} entries; the second property
     * set carries only the {@code eckuNodeProps()} entries.
     *
     * @param z value stored (as a string) under {@code KafkaConfig.DynamicQuotaEnabledProp()}
     * @param z2 value stored under {@code KafkaConfig.ElasticCkuEnabledProp()}
     * @throws Exception if cluster start-up or metadata propagation fails
     */
    public void topicBasedClusterSetup(boolean z, boolean z2) throws Exception {
        final Properties primaryProps = new Properties();
        primaryProps.put(KafkaConfig.DynamicQuotaEnabledProp(), String.valueOf(z));
        primaryProps.put(KafkaConfig.QuotasTopicReplicationFactorProp(), (short) 2);
        primaryProps.put(KafkaConfig.ElasticCkuEnabledProp(), Boolean.valueOf(z2));
        primaryProps.put("confluent.quota.dynamic.reporting.interval.ms", 1000);
        primaryProps.put("confluent.quota.dynamic.publishing.interval.ms", 4000);
        primaryProps.put("confluent.enable.broker.reporting.min.usage.mode", false);
        primaryProps.putAll(eckuNodeProps());

        final Properties secondaryProps = new Properties();
        secondaryProps.putAll(eckuNodeProps());

        this.testHarness = new IntegrationTestHarness(this.testInfo, 2);
        createPhysicalAndLogicalClustersWithTopic(primaryProps, secondaryProps);
        awaitMetadataPropagation();
    }

    /**
     * Verifies when the elastic CKU metric is present for each logical cluster, using
     * {@code topicBasedClusterSetup(true, true)}.
     *
     * <p>Sequence:
     * <ol>
     *   <li>After the initial metadata: metric present for {@code LC_TENANT_WITH_1_MAX_ECKU},
     *       absent for {@code LC_META_1} and {@code LC_META_XYZ}.</li>
     *   <li>After {@code updateLkcMeta1WithEckuMetadata}: metric present for
     *       {@code LC_META_1}.</li>
     *   <li>After {@code updateLkcMeta1WithoutEckuMetadata}: metric absent for
     *       {@code LC_META_1} again.</li>
     *   <li>After {@code deleteLogicalClusters}: metric absent for all three clusters.</li>
     * </ol>
     *
     * @param str quorum mode supplied by the parameterized test; not read by the test body
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testElasticCkuCount(String str) throws Throwable {
        topicBasedClusterSetup(true, true);

        // Parameterized lists (instead of raw List) so the lambdas below see the element type.
        List<DynamicQuotaChannelManager> channelManagers = this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                .map(broker -> (DynamicQuotaChannelManager) broker.dynamicQuotaChannelManager().get())
                .collect(Collectors.toList());
        TestUtils.waitForCondition(
                () -> channelManagers.stream().allMatch(manager ->
                        manager.getPublishRequestThread().started() && manager.getReportRequestThread().started()),
                "Dynamic quota channel manager should have been started");

        List<QuotaCoordinator> coordinators = this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                .map(broker -> (QuotaCoordinator) broker.quotaCoordinatorOpt().get())
                .collect(Collectors.toList());
        TestUtils.waitForCondition(
                () -> coordinators.stream().allMatch(coordinator -> coordinator.isActive().get()),
                "Quota coordinator should have been started");

        addLkcMsgsAndSyncMetadata(1000);
        verifyElasticCkuMetric(KafkaLogicalClusterUtils.LC_TENANT_WITH_1_MAX_ECKU.logicalClusterId());
        verifyElasticCkuMetricNotPresent(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId());
        verifyElasticCkuMetricNotPresent(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId());

        updateLkcMeta1WithEckuMetadata(1500);
        verifyElasticCkuMetric(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId());

        updateLkcMeta1WithoutEckuMetadata(2000);
        verifyElasticCkuMetricNotPresent(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId());

        deleteLogicalClusters(2500);
        awaitMetadataPropagation();
        verifyElasticCkuMetricNotPresent(KafkaLogicalClusterUtils.LC_META_1.logicalClusterId());
        verifyElasticCkuMetricNotPresent(KafkaLogicalClusterUtils.LC_TENANT_WITH_1_MAX_ECKU.logicalClusterId());
        verifyElasticCkuMetricNotPresent(KafkaLogicalClusterUtils.LC_META_XYZ.logicalClusterId());
    }

    /**
     * Checks the "ElasticCku" metric and the invalid-E-CKU-metadata metric for logical clusters
     * whose E-CKU definition is valid, null, invalid, or invalid on a deactivated cluster.
     *
     * <p>The broker component lists are declared with their element types (instead of raw
     * {@code List}) so the readiness lambdas are type-checked against the real classes.
     *
     * @param str the quorum mode supplied by the parameterized test source (not read by the body)
     * @throws Throwable if setup fails or any awaited condition is not met
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testInvalidElasticCkuMetadataMetric(String str) throws Throwable {
        topicBasedClusterSetup(true, true);

        // Wait until every broker's dynamic quota channel manager has started both request threads.
        List<DynamicQuotaChannelManager> channelManagers =
            this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                .map(broker -> (DynamicQuotaChannelManager) broker.dynamicQuotaChannelManager().get())
                .collect(Collectors.toList());
        TestUtils.waitForCondition(
            () -> channelManagers.stream().allMatch(manager ->
                manager.getPublishRequestThread().started() && manager.getReportRequestThread().started()),
            "Dynamic quota channel manager should have been started");

        // Wait until every broker's quota coordinator reports itself active.
        List<QuotaCoordinator> quotaCoordinators =
            this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                .map(broker -> (QuotaCoordinator) broker.quotaCoordinatorOpt().get())
                .collect(Collectors.toList());
        TestUtils.waitForCondition(
            () -> quotaCoordinators.stream().allMatch(coordinator -> coordinator.isActive().get()),
            "Quota coordinator should have been started");

        // Valid definition: E-CKU metric present, invalid-metadata metric absent.
        addLkcMsgsAndSyncMetadata(1000);
        verifyElasticCkuMetric(KafkaLogicalClusterUtils.LC_TENANT_WITH_1_MAX_ECKU.logicalClusterId());
        Utils.verifyInvalidECkuMetadataMetricNotPresent(KafkaLogicalClusterUtils.LC_TENANT_WITH_1_MAX_ECKU.logicalClusterId());

        // Null definition: neither metric is expected.
        updateActiveLkcWithNullEckuMetadata(1500);
        verifyElasticCkuMetricNotPresent(KafkaLogicalClusterUtils.LC_WITH_NULL_ECKU_DEFN.logicalClusterId());
        Utils.verifyInvalidECkuMetadataMetricNotPresent(KafkaLogicalClusterUtils.LC_WITH_NULL_ECKU_DEFN.logicalClusterId());

        // Invalid definition on an active cluster: both metrics are expected.
        updateActiveLkcWithInvalidEckuMetadata(1600);
        verifyElasticCkuMetric(KafkaLogicalClusterUtils.LC_WITH_INVALID_ECKU_DEFN.logicalClusterId());
        Utils.verifyInvalidECkuMetadataMetricPresent(KafkaLogicalClusterUtils.LC_WITH_INVALID_ECKU_DEFN.logicalClusterId());

        // Invalid definition on a deactivated cluster: neither metric is expected.
        updateInActiveLkcWithInvalidEckuMetadata(1610);
        awaitMetadataPropagation();
        verifyElasticCkuMetricNotPresent(KafkaLogicalClusterUtils.DEACTIVATED_LC_WITH_INVALID_ECKU_DEFN.logicalClusterId());
        Utils.verifyInvalidECkuMetadataMetricNotPresent(KafkaLogicalClusterUtils.DEACTIVATED_LC_WITH_INVALID_ECKU_DEFN.logicalClusterId());
    }

    /**
     * Waits, via {@code TestUtils.waitForCondition}, until the default Yammer registry holds no
     * metric named "ElasticCku" of type "QuotaStateManager" whose MBean name contains the tenant
     * tag for the given logical cluster.
     *
     * <p>The registry map is typed (rather than raw) so metric names are inspected without casts,
     * and the tenant tag is formatted once instead of on every poll.
     *
     * @param str the logical cluster id expected to have no E-CKU metric
     * @throws InterruptedException if the wait is interrupted
     */
    private void verifyElasticCkuMetricNotPresent(String str) throws InterruptedException {
        String tenantTag = String.format("%s=%s", MultiTenantRequestContextTest.TENANT_NAME, str);
        // NOTE(review): the map is fetched once before waiting; this relies on allMetrics()
        // returning a view that reflects later registry changes - confirm.
        Map<MetricName, ?> registeredMetrics = KafkaYammerMetrics.defaultRegistry().allMetrics();
        TestUtils.waitForCondition(
            () -> registeredMetrics.keySet().stream().noneMatch(metricName ->
                metricName.getName().equals("ElasticCku")
                    && metricName.getType().equals("QuotaStateManager")
                    && metricName.getMBeanName().contains(tenantTag)),
            "E-CKU metric should not be present for " + str);
    }

    /**
     * Waits, via {@code TestUtils.waitForCondition}, until the default Yammer registry holds a
     * metric named "ElasticCku" of type "QuotaStateManager" whose MBean name contains the tenant
     * tag for the given logical cluster and whose gauge value is a non-negative double.
     *
     * <p>The registry map and gauge are typed (rather than raw), and the tenant tag is formatted
     * once instead of on every poll.
     *
     * @param str the logical cluster id expected to expose the E-CKU metric
     * @throws InterruptedException if the wait is interrupted
     */
    private void verifyElasticCkuMetric(String str) throws InterruptedException {
        String tenantTag = String.format("%s=%s", MultiTenantRequestContextTest.TENANT_NAME, str);
        // NOTE(review): the map is fetched once before waiting; this relies on allMetrics()
        // returning a view that reflects later registry changes - confirm.
        Map<MetricName, ?> registeredMetrics = KafkaYammerMetrics.defaultRegistry().allMetrics();
        TestUtils.waitForCondition(
            () -> registeredMetrics.entrySet().stream()
                .filter(entry -> {
                    MetricName metricName = entry.getKey();
                    return metricName.getName().equals("ElasticCku")
                        && metricName.getType().equals("QuotaStateManager")
                        && metricName.getMBeanName().contains(tenantTag);
                })
                .anyMatch(entry -> {
                    Gauge<?> gauge = (Gauge<?>) entry.getValue();
                    return gauge != null && ((Double) gauge.value()).doubleValue() >= 0.0d;
                }),
            "E-CKU metric is not present for " + str);
    }

    /**
     * Creates and deletes two five-partition topics for {@code logicalCluster1} and checks after
     * each step that the "PartitionCountNonCompacted" metric reports 5, 10, 5 and finally 0.
     *
     * @param str the quorum mode supplied by the parameterized test source (not read by the body)
     * @throws Throwable if setup, an admin operation, or an awaited condition fails
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testNonCompactedTenantPartitionCountMetric(String str) throws Throwable {
        setUp();
        Properties clusterProps = nodeProps();
        // Threshold of 0 ms - presumably so partition counts are refreshed without delay (confirm).
        TenantUtils.setTenantPartitionCountUpdateThresholdInMs(0L);
        clusterProps.put(KafkaConfig.ElasticCkuEnabledProp(), true);
        // The same properties object is passed for both arguments.
        createPhysicalAndLogicalClustersWithControllerProperties(clusterProps, clusterProps);
        awaitMetadataPropagation();

        AdminClient tenantAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        quotaTags(this.logicalCluster1); // return value is not used here
        addLkcFileAndSyncMetadata(KafkaLogicalClusterUtils.LC_META_1);

        final String metricName = "PartitionCountNonCompacted";
        NewTopic firstTopic = new NewTopic("testtopic1", Optional.of(5), Optional.empty());
        NewTopic secondTopic = new NewTopic("testtopic2", Optional.of(5), Optional.empty());

        tenantAdmin.createTopics(Collections.singletonList(firstTopic)).all().get();
        verifyPartitionCountMetric(this.logicalCluster1.logicalClusterId(), 5L, metricName);

        tenantAdmin.createTopics(Collections.singletonList(secondTopic)).all().get();
        verifyPartitionCountMetric(this.logicalCluster1.logicalClusterId(), 10L, metricName);

        tenantAdmin.deleteTopics(Collections.singleton("testtopic1")).all().get();
        verifyPartitionCountMetric(this.logicalCluster1.logicalClusterId(), 5L, metricName);

        tenantAdmin.deleteTopics(Collections.singleton("testtopic2")).all().get();
        verifyPartitionCountMetric(this.logicalCluster1.logicalClusterId(), 0L, metricName);
    }

    /**
     * Creates and deletes two five-partition topics configured with {@code cleanup.policy=compact}
     * for {@code logicalCluster1} and checks after each step that the "PartitionCountCompacted"
     * metric reports 5, 10, 5 and finally 0.
     *
     * @param str the quorum mode supplied by the parameterized test source (not read by the body)
     * @throws Throwable if setup, an admin operation, or an awaited condition fails
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testCompactedTenantPartitionCountMetric(String str) throws Throwable {
        setUp();
        Properties clusterProps = nodeProps();
        // Threshold of 0 ms - presumably so partition counts are refreshed without delay (confirm).
        TenantUtils.setTenantPartitionCountUpdateThresholdInMs(0L);
        clusterProps.put(KafkaConfig.ElasticCkuEnabledProp(), true);
        // The same properties object is passed for both arguments.
        createPhysicalAndLogicalClustersWithControllerProperties(clusterProps, clusterProps);
        awaitMetadataPropagation();

        Map<String, String> compactedConfig = ImmutableMap.of("cleanup.policy", "compact");
        AdminClient tenantAdmin = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        quotaTags(this.logicalCluster1); // return value is not used here
        addLkcFileAndSyncMetadata(KafkaLogicalClusterUtils.LC_META_1);

        final String metricName = "PartitionCountCompacted";
        NewTopic firstTopic = new NewTopic("testtopic1", Optional.of(5), Optional.empty()).configs(compactedConfig);
        tenantAdmin.createTopics(Collections.singletonList(firstTopic)).all().get();
        verifyPartitionCountMetric(this.logicalCluster1.logicalClusterId(), 5L, metricName);

        NewTopic secondTopic = new NewTopic("testtopic2", Optional.of(5), Optional.empty()).configs(compactedConfig);
        tenantAdmin.createTopics(Collections.singletonList(secondTopic)).all().get();
        verifyPartitionCountMetric(this.logicalCluster1.logicalClusterId(), 10L, metricName);

        tenantAdmin.deleteTopics(Collections.singleton("testtopic1")).all().get();
        verifyPartitionCountMetric(this.logicalCluster1.logicalClusterId(), 5L, metricName);

        tenantAdmin.deleteTopics(Collections.singleton("testtopic2")).all().get();
        verifyPartitionCountMetric(this.logicalCluster1.logicalClusterId(), 0L, metricName);
    }

    /**
     * Waits, via {@code TestUtils.waitForCondition}, until the default Yammer registry holds a
     * "TenantMetricsPublisher" metric with the given name whose MBean name ends with the tenant
     * tag for the given logical cluster and whose gauge value is non-negative and equal to the
     * expected count.
     *
     * <p>The registry map and gauge are typed (rather than raw). The gauge is read once per check
     * so both comparisons see the same value, and the tenant tag and expected count are computed
     * once instead of on every poll.
     *
     * @param str the logical cluster id whose metric is checked
     * @param l the expected partition count; must not be null
     * @param str2 the metric name to look up
     * @throws InterruptedException if the wait is interrupted
     */
    private void verifyPartitionCountMetric(String str, Long l, String str2) throws InterruptedException {
        String tenantTag = String.format("%s=%s", MultiTenantRequestContextTest.TENANT_NAME, str);
        long expectedCount = l.longValue();
        // NOTE(review): the map is fetched once before waiting; this relies on allMetrics()
        // returning a view that reflects later registry changes - confirm.
        Map<MetricName, ?> registeredMetrics = KafkaYammerMetrics.defaultRegistry().allMetrics();
        TestUtils.waitForCondition(
            () -> registeredMetrics.entrySet().stream()
                .filter(entry -> {
                    MetricName metricName = entry.getKey();
                    return metricName.getName().equals(str2)
                        && metricName.getType().equals("TenantMetricsPublisher")
                        && metricName.getMBeanName().endsWith(tenantTag);
                })
                .anyMatch(entry -> {
                    Gauge<?> gauge = (Gauge<?>) entry.getValue();
                    if (gauge == null) {
                        return false;
                    }
                    long observedCount = ((Long) gauge.value()).longValue();
                    return observedCount >= 0 && observedCount == expectedCount;
                }),
            "Partition count metric is not present for " + str);
    }

    /**
     * Repeatedly creates a producer for the given user, sends one record to the given topic, and
     * stops as soon as {@link #isProducerIdThrottled()} reports throttling.
     *
     * <p>Each producer is managed with try-with-resources, which replaces the hand-written
     * close/addSuppressed handling, and the record and future are typed instead of raw.
     *
     * @param logicalClusterUser the user whose credentials each producer is created with
     * @param str the topic each record is sent to
     * @return the number of producers created, including the one in use when throttling was seen
     */
    private int createProducersUntilProduceIdThrottled(LogicalClusterUser logicalClusterUser, String str) {
        int producersCreated = 0;
        boolean throttled;
        do {
            try (KafkaProducer<String, String> producer = createProducer(logicalClusterUser)) {
                producersCreated++;
                // The running count doubles as both key and value of the record.
                String payload = String.valueOf(producersCreated);
                Future<?> pendingSend = producer.send(
                    new ProducerRecord<>(str, (Integer) null, payload, payload),
                    new ErrorLoggingCallback(str, (byte[]) null, (byte[]) null, true));
                // Re-check throttling until the send completes or throttling is observed.
                do {
                    throttled = isProducerIdThrottled();
                } while (!pendingSend.isDone() && !throttled);
            }
        } while (!throttled);
        return producersCreated;
    }

    /**
     * Reports whether any broker currently has a producer-id "throttle-time" quota metric greater
     * than zero for {@code logicalCluster1}.
     *
     * <p>Brokers are inspected directly rather than first being gathered with
     * {@code Collectors.toMap}. That collector rejects null values, so a broker whose metric
     * lookup returned null raised a NullPointerException that the former blanket
     * {@code catch (Exception)} turned into "not throttled", even when another broker was already
     * throttling, and the explicit null check never took effect. Runtime exceptions from the
     * metric lookup are no longer swallowed.
     *
     * @return true if at least one broker's metric is present and its value is greater than zero
     */
    private boolean isProducerIdThrottled() {
        Map<String, String> tenantTags = quotaTags(this.logicalCluster1);
        return this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
            .map(broker -> quotaMetric(broker.metrics(), "throttle-time", ClientQuotaType.PRODUCER_ID, tenantTags))
            // A null metric is treated as "not throttled" for that broker.
            .anyMatch(metric -> metric != null && ((Double) metric.metricValue()).doubleValue() > 0.0d);
    }

    /**
     * Creates a SASL_PLAINTEXT / SCRAM-SHA-256 producer against the physical cluster's bootstrap
     * servers, authenticating with the given user's JAAS config and using the user's logical
     * cluster id as the client id.
     *
     * @param logicalClusterUser the user supplying the client id and JAAS configuration
     * @return the producer built by {@code KafkaTestUtils.createProducer}
     */
    private KafkaProducer<String, String> createProducer(LogicalClusterUser logicalClusterUser) {
        Properties clientOverrides = new Properties();
        clientOverrides.put("client.id", logicalClusterUser.logicalClusterId);

        String bootstrapServers = this.physicalCluster.bootstrapServers();
        String saslMechanism = ScramMechanism.SCRAM_SHA_256.mechanismName();
        return KafkaTestUtils.createProducer(
            bootstrapServers,
            SecurityProtocol.SASL_PLAINTEXT,
            saslMechanism,
            logicalClusterUser.saslJaasConfig(),
            clientOverrides);
    }

    /**
     * Applies the given config entry as a SET operation on the BROKER config resource with an
     * empty name, using the physical cluster's super admin client, and blocks until the change
     * completes.
     *
     * @param configEntry the config name/value to set
     * @throws Exception if the alter-configs call fails or the wait is interrupted
     */
    private void updateBrokerConfig(ConfigEntry configEntry) throws Exception {
        // NOTE(review): an empty BROKER resource name presumably targets the cluster-wide
        // default rather than a single broker - confirm against the Kafka admin docs.
        ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
        AlterConfigOp setOperation = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
        this.physicalCluster.superAdminClient()
            .incrementalAlterConfigs(
                Collections.singletonMap(brokerResource, Collections.singletonList(setOperation)))
            .all()
            .get();
    }

    /**
     * Builds the single-entry tag map that identifies a logical cluster: the tenant tag name
     * mapped to the cluster's logical cluster id.
     *
     * @param logicalCluster the logical cluster to build tags for
     * @return an immutable one-entry map from {@code TENANT_NAME} to the logical cluster id
     */
    private Map<String, String> quotaTags(LogicalCluster logicalCluster) {
        String tenantId = logicalCluster.logicalClusterId();
        return Collections.singletonMap(MultiTenantRequestContextTest.TENANT_NAME, tenantId);
    }

    /**
     * Converts a Java map into a Scala mutable map using the Scala collection converters.
     *
     * @param map the Java map to convert; keys and values share the type {@code T}
     * @param <T> the key and value type
     * @return the result of {@code asScala()} on the wrapped map
     */
    private <T> scala.collection.mutable.Map<T, T> toScalaMap(Map<T, T> map) {
        // NOTE(review): asScala() presumably returns a wrapper view over the Java map rather than a copy - confirm.
        return CollectionConverters.MapHasAsScala(map).asScala();
    }
}
