package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.Utils;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import kafka.coordinator.quota.QuotaEntity;
import kafka.server.DynamicQuotaChannelManager;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.collection.JavaConverters;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantQuotaIntegrationTest.class */
public class MultiTenantQuotaIntegrationTest extends AbstractMultiTenantKafkaIntegrationTest {
    // Initial per-tenant controller mutation rate (100.0) that the quota tests configure on the brokers.
    private final double defaultControllerMutationRateQuota = 100.0d;
    // Initial per-tenant producer-ID rate (10.0) that the producer-ID quota test configures on the brokers.
    private final double defaultProducerIdRateQuota = 10.0d;
    // Initial tenant produce quota multiplier (2.0) that the quota tests configure on the brokers.
    private final double defaultProduceQuotaMultiplier = 2.0d;

    /**
     * Builds the broker properties used by every test in this class.
     *
     * <p>Starts from the properties supplied by the parent harness and registers
     * {@link TenantQuotaCallback} as the client quota callback class.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally
     * {@code protected}; the visibility is left as found.
     *
     * @return the parent properties with the quota callback class added
     */
    @Override
    public Properties nodeProps() {
        final Properties brokerProps = super.nodeProps();
        final String callbackClassName = TenantQuotaCallback.class.getName();
        brokerProps.put(KafkaConfig.ClientQuotaCallbackClassProp(), callbackClassName);
        return brokerProps;
    }

    /**
     * Starts the physical cluster with the given broker properties and then creates the two
     * logical clusters ({@code lkc-tenant1} and {@code lkc-tenant2}) the tests operate on.
     *
     * <p>NOTE(review): the trailing numeric arguments of {@code createLogicalCluster} are not
     * explained in this file; the tests later call {@code user(11)} / {@code user(12)} on the
     * first cluster, so the last two presumably are user ids - confirm against the harness.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally
     * {@code protected}; the visibility is left as found.
     *
     * @param properties broker properties handed to the test harness when starting the cluster
     */
    @Override
    public void createPhysicalAndLogicalClusters(Properties properties) {
        physicalCluster = testHarness.start(properties, Time.SYSTEM);

        logicalCluster1 = physicalCluster.createLogicalCluster("lkc-tenant1", 100, 9, 11, 12);
        logicalCluster2 = physicalCluster.createLogicalCluster("lkc-tenant2", 200, 9, 21, 22);
    }

    /**
     * Checks that the default per-tenant controller mutation rate can be changed at runtime.
     *
     * <p>The cluster starts with a rate of 100.0; after a topic creation the quota is verified,
     * the broker config is raised to 200.0, the callback limit is verified, and a second topic
     * creation is followed by a verification of the new quota.
     *
     * @param str quorum mode injected by {@code @ValueSource}; not referenced in the body
     * @throws Exception if cluster setup, an admin call or a verification fails
     */
    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicTenantControllerQuota(String str) throws Exception {
        final double raisedMutationRate = 200.0d;

        setUp();
        final Properties brokerProps = nodeProps();
        brokerProps.put(
                ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_CONFIG,
                Double.toString(defaultControllerMutationRateQuota));
        brokerProps.put(
                ConfluentConfigs.TENANT_PRODUCE_QUOTA_MULTIPLIER_CONFIG,
                Double.toString(defaultProduceQuotaMultiplier));
        createPhysicalAndLogicalClusters(brokerProps);
        awaitMetadataPropagation();

        final AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster1.adminUser());
        final Map<String, String> tenantTags = quotaTags(logicalCluster1);

        // A controller mutation is needed before the quota can be observed.
        final NewTopic firstTopic = new NewTopic("test_topic", Optional.of(5), Optional.empty());
        tenantAdmin.createTopics(Collections.singletonList(firstTopic)).all().get();
        verifyQuota(ClientQuotaType.CONTROLLER_MUTATION, tenantTags, defaultControllerMutationRateQuota);

        // Raise the default dynamically and check the callback before generating new traffic.
        updateBrokerConfig(new ConfigEntry(
                ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_CONFIG,
                Double.toString(raisedMutationRate)));
        verifyQuotaCallbackLimit(ClientQuotaType.CONTROLLER_MUTATION, tenantTags, raisedMutationRate);

        final NewTopic secondTopic = new NewTopic("test_topic2", Optional.of(5), Optional.empty());
        tenantAdmin.createTopics(Collections.singletonList(secondTopic)).all().get();
        verifyQuota(ClientQuotaType.CONTROLLER_MUTATION, tenantTags, raisedMutationRate);
    }

    /**
     * Checks that the default per-tenant producer-ID rate can be changed at runtime.
     *
     * <p>The cluster starts with the producer-ID quota manager enabled and a rate of 10.0. After
     * a produce/consume round the quota is verified, the broker config is raised to 20.0, the
     * callback limit is verified, and a second produce/consume round on a new topic is followed
     * by a verification of the new quota.
     *
     * @param str quorum mode injected by {@code @ValueSource}; not referenced in the body
     * @throws Throwable if cluster setup, client traffic or a verification fails
     */
    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicTenantProducerIdQuota(String str) throws Throwable {
        final double raisedProducerIdRate = 20.0d;

        setUp();
        final Properties brokerProps = nodeProps();
        brokerProps.put(KafkaConfig.ProducerIdQuotaManagerEnableProp(), Boolean.toString(true));
        brokerProps.put(
                ConfluentConfigs.DEFAULT_PRODUCER_ID_RATE_PER_TENANT_CONFIG,
                Double.toString(defaultProducerIdRateQuota));
        createPhysicalAndLogicalClusters(brokerProps);
        awaitMetadataPropagation();

        final AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster1.adminUser());
        final Map<String, String> tenantTags = quotaTags(logicalCluster1);

        final NewTopic firstTopic = new NewTopic("test_topic", Optional.of(5), Optional.empty());
        tenantAdmin.createTopics(Collections.singletonList(firstTopic)).all().get();
        testHarness.produceConsume(
                logicalCluster1.user(11), logicalCluster1.user(12), "test_topic", "group1", 0, false);
        verifyQuota(ClientQuotaType.PRODUCER_ID, tenantTags, defaultProducerIdRateQuota);

        // Raise the default dynamically and check the callback before generating new traffic.
        updateBrokerConfig(new ConfigEntry(
                ConfluentConfigs.DEFAULT_PRODUCER_ID_RATE_PER_TENANT_CONFIG,
                Double.toString(raisedProducerIdRate)));
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCER_ID, tenantTags, raisedProducerIdRate);

        final NewTopic secondTopic = new NewTopic("test_topic2", Optional.of(5), Optional.empty());
        tenantAdmin.createTopics(Collections.singletonList(secondTopic)).all().get();
        testHarness.produceConsume(
                logicalCluster1.user(11), logicalCluster1.user(12), "test_topic2", "group1", 0, false);
        verifyQuota(ClientQuotaType.PRODUCER_ID, tenantTags, raisedProducerIdRate);
    }

    /**
     * Checks that the tenant produce and fetch quota multipliers can be changed at runtime.
     *
     * <p>The cluster starts with a produce multiplier of 2.0. After a produce/consume round the
     * produce and fetch quotas are verified, the fetch multiplier is set to 3.0 and the produce
     * multiplier to 1.0, the callback limits are verified, and a second produce/consume round is
     * followed by a verification of the new quotas.
     *
     * <p>NOTE(review): every expected value is the tenant byte rate from {@code Utils.LC_META_1}
     * times the multiplier, divided by 2.0; the divisor is presumably the number of brokers
     * sharing the tenant quota - confirm against the harness.
     *
     * @param str quorum mode injected by {@code @ValueSource}; not referenced in the body
     * @throws Throwable if cluster setup, client traffic or a verification fails
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicQuotaMultiplier(String str) throws Throwable {
        final double updatedFetchMultiplier = 3.0d;
        final double updatedProduceMultiplier = 1.0d;

        setUp();
        final Properties brokerProps = nodeProps();
        brokerProps.put(
                ConfluentConfigs.DEFAULT_CONTROLLER_MUTATION_RATE_PER_TENANT_CONFIG,
                Double.toString(defaultControllerMutationRateQuota));
        brokerProps.put(
                ConfluentConfigs.TENANT_PRODUCE_QUOTA_MULTIPLIER_CONFIG,
                Double.toString(defaultProduceQuotaMultiplier));
        createPhysicalAndLogicalClusters(brokerProps);
        awaitMetadataPropagation();

        final AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster1.adminUser());
        final Map<String, String> tenantTags = quotaTags(logicalCluster1);
        addLkcFileAndSyncMetadata(Utils.LC_META_1);

        final NewTopic topic = new NewTopic("testtopic", Optional.of(5), Optional.empty());
        tenantAdmin.createTopics(Collections.singletonList(topic)).all().get();
        testHarness.produceConsume(
                logicalCluster1.user(11), logicalCluster1.user(12), "testtopic", "group1", 0, false);

        final double initialProduceLimit =
                (defaultProduceQuotaMultiplier * Utils.LC_META_1.producerByteRate().longValue()) / 2.0d;
        verifyQuota(ClientQuotaType.PRODUCE, tenantTags, initialProduceLimit);
        final double initialFetchLimit = Utils.LC_META_1.consumerByteRate().longValue() / 2.0d;
        verifyQuota(ClientQuotaType.FETCH, tenantTags, initialFetchLimit);

        // Change both multipliers dynamically.
        updateBrokerConfig(new ConfigEntry(
                ConfluentConfigs.TENANT_FETCH_QUOTA_MULTIPLIER_CONFIG,
                Double.toString(updatedFetchMultiplier)));
        updateBrokerConfig(new ConfigEntry(
                ConfluentConfigs.TENANT_PRODUCE_QUOTA_MULTIPLIER_CONFIG,
                Double.toString(updatedProduceMultiplier)));

        final double updatedProduceLimit =
                (updatedProduceMultiplier * Utils.LC_META_1.producerByteRate().longValue()) / 2.0d;
        final double updatedFetchLimit =
                (updatedFetchMultiplier * Utils.LC_META_1.consumerByteRate().longValue()) / 2.0d;
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, tenantTags, updatedProduceLimit);
        verifyQuotaCallbackLimit(ClientQuotaType.FETCH, tenantTags, updatedFetchLimit);

        testHarness.produceConsume(
                logicalCluster1.user(11), logicalCluster1.user(12), "testtopic", "group1", 0, false);
        verifyQuota(ClientQuotaType.PRODUCE, tenantTags, updatedProduceLimit);
        verifyQuota(ClientQuotaType.FETCH, tenantTags, updatedFetchLimit);
    }

    /**
     * Exercises the dynamic quota pipeline end to end (currently disabled as flaky).
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>start the cluster with dynamic quotas enabled and short reporting/publishing intervals;</li>
     *   <li>wait until every broker's dynamic quota channel manager has started both request
     *       threads and every quota coordinator reports itself active;</li>
     *   <li>run a produce/consume round and verify the initial produce and fetch quotas;</li>
     *   <li>wait for a reporting request, for a coordinator to hold a quota for the tenant, and
     *       for a publishing request;</li>
     *   <li>verify that the produce and fetch quotas have moved away from their initial values.</li>
     * </ol>
     *
     * <p>The locals are typed (no raw {@code List}) so that the lambdas see the real element
     * types. The quota coordinator class is not imported by this file, so the coordinators are
     * streamed from the brokers inside each wait condition instead of being stored in a typed
     * local; consequently a missing coordinator surfaces when the wait times out rather than
     * before the wait starts.
     *
     * <p>NOTE(review): the initial expected quotas divide the tenant byte rates from
     * {@code Utils.LC_META_1} by 2.0; the divisor is presumably the number of brokers sharing
     * the tenant quota - confirm against the harness.
     *
     * @param str quorum mode injected by {@code @ValueSource}; not referenced in the body
     * @throws Throwable if cluster setup, client traffic, a wait or a verification fails
     */
    @Disabled("CPKAFKA-8672 Flaky with higher forks")
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testDynamicQuotaPipeline(String str) throws Throwable {
        setUp();
        Properties nodeProps = nodeProps();
        nodeProps.put(KafkaConfig.DynamicQuotaEnabledProp(), String.valueOf(true));
        nodeProps.put(KafkaConfig.QuotasTopicReplicationFactorProp(), (short) 2);
        nodeProps.put(ConfluentConfigs.QUOTA_DYNAMIC_REPORTING_INTERVAL_MS_CONFIG, 100);
        nodeProps.put(ConfluentConfigs.QUOTA_DYNAMIC_PUBLISHING_INTERVAL_MS_CONFIG, 2000);
        nodeProps.put(ConfluentConfigs.QUOTA_DYNAMIC_REPORTING_MIN_USAGE_CONFIG, 0);
        createPhysicalAndLogicalClusters(nodeProps);
        awaitMetadataPropagation();

        // Resolve the channel managers once; Option.get() fails fast if a broker has none.
        List<DynamicQuotaChannelManager> channelManagers =
                this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                        .map(broker -> (DynamicQuotaChannelManager) broker.dynamicQuotaChannelManager().get())
                        .collect(Collectors.toList());
        TestUtils.waitForCondition(
                () -> channelManagers.stream().allMatch(manager ->
                        manager.getPublishRequestThread().started()
                                && manager.getReportRequestThread().started()),
                "Dynamic quota channel manager should have been started");

        // Coordinators are looked up per poll because their class is not nameable here.
        TestUtils.waitForCondition(
                () -> this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                        .map(broker -> broker.quotaCoordinatorOpt().get())
                        .allMatch(coordinator -> coordinator.isActive().get()),
                "Quota coordinator should have been started");

        AdminClient adminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        Map<String, String> quotaTags = quotaTags(this.logicalCluster1);
        addLkcFileAndSyncMetadata(Utils.LC_META_1);
        adminClient.createTopics(Collections.singletonList(
                new NewTopic("testtopic", Optional.of(5), Optional.empty()))).all().get();
        this.testHarness.produceConsume(
                this.logicalCluster1.user(11), this.logicalCluster1.user(12), "testtopic", "group1", 0, false);

        double initialProduceQuota = Utils.LC_META_1.producerByteRate().longValue() / 2.0d;
        double initialFetchQuota = Utils.LC_META_1.consumerByteRate().longValue() / 2.0d;
        verifyQuota(ClientQuotaType.PRODUCE, quotaTags, initialProduceQuota);
        verifyQuota(ClientQuotaType.FETCH, quotaTags, initialFetchQuota);

        TestUtils.waitForCondition(
                () -> channelManagers.stream().anyMatch(manager ->
                        manager.getReportRequestThread().queuePerNode().nonEmpty()),
                "Dynamic quota channel manager should have received a Reporting request");
        TestUtils.waitForCondition(
                () -> this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                        .map(broker -> broker.quotaCoordinatorOpt().get())
                        .anyMatch(coordinator -> coordinator.quotaStateManager()
                                .getQuota(new QuotaEntity(toScalaMap(quotaTags))).nonEmpty()),
                "Quota coordinator should have recomputed the quota");
        TestUtils.waitForCondition(
                () -> channelManagers.stream().anyMatch(manager ->
                        manager.getPublishRequestThread().queuePerNode().nonEmpty()),
                "Dynamic quota channel manager should send a Publishing request");

        // After a publish the effective quotas must differ from the static starting values.
        verifyQuota(ClientQuotaType.PRODUCE, quotaTags, d -> {
            return d.doubleValue() != initialProduceQuota;
        });
        verifyQuota(ClientQuotaType.FETCH, quotaTags, d2 -> {
            return d2.doubleValue() != initialFetchQuota;
        });
    }

    /**
     * Checks that creating producers eventually triggers producer-ID throttling for the tenant.
     *
     * <p>The cluster is started with the producer-ID quota manager and throttling enabled, a
     * cache limit of 100, a throttle-enable threshold percentage of 0 and a per-tenant
     * producer-ID rate of 1. After one produce/consume round the quota is verified; producers
     * are then created until throttling is observed, and the throttle state and the recorded
     * count (number of producers created in the loop plus one) are verified.
     *
     * @param str quorum mode injected by {@code @ValueSource}; not referenced in the body
     * @throws Throwable if cluster setup, client traffic or a verification fails
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testProducerIdThrottling(String str) throws Throwable {
        setUp(1, Collections.emptyList());

        final Properties brokerProps = nodeProps();
        brokerProps.put(KafkaConfig.ProducerIdQuotaManagerEnableProp(), Boolean.toString(true));
        brokerProps.put(KafkaConfig.ProducerIdThrottleEnableProp(), Boolean.toString(true));
        brokerProps.put(KafkaConfig.ProducerIdCacheLimitProp(), 100);
        brokerProps.put(KafkaConfig.ProducerIdThrottleEnableThresholdPercentageProp(), 0);
        brokerProps.put(ConfluentConfigs.DEFAULT_PRODUCER_ID_RATE_PER_TENANT_CONFIG, 1);
        createPhysicalAndLogicalClusters(brokerProps);
        awaitMetadataPropagation();

        final AdminClient tenantAdmin = testHarness.createAdminClient(logicalCluster1.adminUser());
        final Map<String, String> tenantTags = quotaTags(logicalCluster1);
        addLkcFileAndSyncMetadata(Utils.LC_META_1);

        final NewTopic topic = new NewTopic("testtopic", Optional.of(1), Optional.empty());
        tenantAdmin.createTopics(Collections.singletonList(topic)).all().get();
        testHarness.produceConsume(
                logicalCluster1.user(11), logicalCluster1.user(12), "testtopic", "group1", 0, false);
        verifyQuota(ClientQuotaType.PRODUCER_ID, tenantTags, 1);

        final int producersCreated =
                createProducersUntilProduceIdThrottled(logicalCluster1.user(12), "testtopic");
        verifyThrottle(ClientQuotaType.PRODUCER_ID, tenantTags, true);
        // One more than the loop count: the produceConsume round above also used a producer.
        // NOTE(review): that explanation of the "+ 1" is an assumption - confirm.
        verifyCount(ClientQuotaType.PRODUCER_ID, tenantTags, producersCreated + 1);
    }

    /**
     * Creates producers one after another, each sending a single record, until producer-ID
     * throttling is observed for the tenant.
     *
     * <p>For every producer the method sends one record whose key and value are the running
     * producer count, then polls {@link #isProducerIdThrottled()} until either the send has
     * completed or throttling is reported. The producer is closed via try-with-resources, so a
     * failure while closing is attached as a suppressed exception instead of replacing the
     * primary failure.
     *
     * <p>NOTE(review): there is no upper bound on the number of producers; if throttling never
     * triggers, this method does not return.
     *
     * @param logicalClusterUser user whose credentials the producers authenticate with
     * @param str                topic the records are sent to
     * @return the number of producers that were created
     */
    private int createProducersUntilProduceIdThrottled(LogicalClusterUser logicalClusterUser, String str) {
        int producersCreated = 0;
        boolean throttled;
        do {
            try (KafkaProducer<String, String> producer = createProducer(logicalClusterUser)) {
                producersCreated++;
                String payload = String.valueOf(producersCreated);
                Future<RecordMetadata> pendingSend = producer.send(
                        new ProducerRecord<>(str, null, payload, payload),
                        new ErrorLoggingCallback(str, null, null, true));
                // Poll the throttle state at least once, and keep polling while the send is in flight.
                do {
                    throttled = isProducerIdThrottled();
                } while (!pendingSend.isDone() && !throttled);
            }
        } while (!throttled);
        return producersCreated;
    }

    /**
     * Reports whether any broker currently shows producer-ID throttling for the first logical
     * cluster.
     *
     * <p>For each broker the {@code throttle-time} metric of the {@code PRODUCER_ID} quota type
     * is looked up with the tenant tags; the result is {@code true} as soon as one broker has a
     * metric whose value exceeds the threshold. Brokers for which the metric lookup returns
     * {@code null} are skipped rather than aborting the whole check.
     *
     * <p>NOTE(review): the threshold constant below is named after IP connection creation rate
     * throttling, which is unrelated to this metric; it looks like a decompiler-substituted
     * constant (presumably 0.0) - confirm and replace with a literal if so.
     *
     * @return {@code true} if at least one broker reports a throttle time above the threshold;
     *         {@code false} otherwise, including when reading the metrics fails
     */
    private boolean isProducerIdThrottled() {
        Map<String, String> tenantTags = quotaTags(this.logicalCluster1);
        try {
            return this.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                    .map(broker -> quotaMetric(
                            broker.metrics(), "throttle-time", ClientQuotaType.PRODUCER_ID, tenantTags))
                    .anyMatch(metric -> metric != null
                            && ((Double) metric.metricValue()).doubleValue()
                                    > ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT);
        } catch (RuntimeException ignored) {
            // Best-effort probe used inside a polling loop: a failed read counts as "not throttled".
            return false;
        }
    }

    /**
     * Creates a producer that authenticates as the given logical cluster user.
     *
     * <p>The producer connects to the physical cluster's bootstrap servers over
     * {@code SASL_PLAINTEXT} with the SCRAM-SHA-256 mechanism and the user's JAAS config, and
     * uses the user's logical cluster id as its {@code client.id}.
     *
     * @param logicalClusterUser user supplying the JAAS config and the client id
     * @return the producer built by {@code KafkaTestUtils.createProducer}
     */
    private KafkaProducer<String, String> createProducer(LogicalClusterUser logicalClusterUser) {
        final Properties overrides = new Properties();
        overrides.put("client.id", logicalClusterUser.logicalClusterId);

        return KafkaTestUtils.createProducer(
                physicalCluster.bootstrapServers(),
                SecurityProtocol.SASL_PLAINTEXT,
                ScramMechanism.SCRAM_SHA_256.mechanismName(),
                logicalClusterUser.saslJaasConfig(),
                overrides);
    }

    /**
     * Applies a single {@code SET} operation for the given entry to the broker config resource
     * with an empty name, using the super admin client, and blocks until the change completes.
     *
     * <p>NOTE(review): an empty broker resource name is presumably the cluster-wide default -
     * confirm against the admin client documentation.
     *
     * @param configEntry config name and value to set
     * @throws Exception if the alter-configs request fails or waiting for it is interrupted
     */
    private void updateBrokerConfig(ConfigEntry configEntry) throws Exception {
        final ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
        final AlterConfigOp setOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);

        physicalCluster.superAdminClient()
                .incrementalAlterConfigs(
                        Collections.singletonMap(brokerResource, Collections.singletonList(setOp)))
                .all()
                .get();
    }

    /**
     * Builds the tag map used to look up a tenant's quota: a single {@code tenant} entry
     * holding the logical cluster's id.
     *
     * @param logicalCluster cluster whose id becomes the tag value
     * @return an immutable single-entry map from {@code "tenant"} to the cluster id
     */
    private Map<String, String> quotaTags(LogicalCluster logicalCluster) {
        final String tenantId = logicalCluster.logicalClusterId();
        return Collections.singletonMap("tenant", tenantId);
    }

    /**
     * Converts a Java map into a Scala mutable map via {@code JavaConverters}.
     *
     * @param map the Java map to convert
     * @param <T> type shared by keys and values
     * @return the Scala map produced by {@code mapAsScalaMapConverter(map).asScala()}
     */
    private <T> scala.collection.mutable.Map<T, T> toScalaMap(Map<T, T> map) {
        final scala.collection.mutable.Map<T, T> scalaView =
                JavaConverters.mapAsScalaMapConverter(map).asScala();
        return scalaView;
    }
}
