package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.KafkaLogicalClusterUtils;
import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tags({@Tag("integration"), @Tag("bazel:shard_count:2")})
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantUserQuotaIntegrationTest.class */
public class MultiTenantUserQuotaIntegrationTest extends AbstractMultiTenantKafkaIntegrationTest {
    private final String producerClientId = "producer-client";
    private final String consumerClientId = "consumer-client";

    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    @BeforeEach
    public void setUpTempDir(TestInfo testInfo) {
        super.setUpTempDir(testInfo);
        setUp();
        createPhysicalAndLogicalClusters();
        awaitMetadataPropagation();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    public Properties nodeProps() {
        Properties nodeProps = super.nodeProps();
        nodeProps.put("client.quota.callback.class", TenantQuotaCallback.class.getName());
        nodeProps.put("confluent.quota.tenant.user.quotas.enable", String.valueOf(true));
        return nodeProps;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    public void createPhysicalAndLogicalClusters(Properties properties) {
        properties.put("confluent.cluster.link.replication.quota.mode", ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND.name());
        try {
            this.physicalCluster = this.testHarness.startWithTopic("_confluent-logical_clusters", 1, 1, 60000L, properties, properties, Optional.of(Time.SYSTEM));
            this.logicalCluster1 = this.physicalCluster.createLogicalCluster("lkc-tenant1", 100, 9, 11, 12);
            this.logicalCluster2 = this.physicalCluster.createLogicalCluster("lkc-tenant2", 200, 9, 21, 22);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAlterQuotasDisabledForTenants(String str) {
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        Map singletonMap = Collections.singletonMap("producer_byte_rate", Double.valueOf(5000.0d));
        Assertions.assertEquals(ClusterAuthorizationException.class, ((Exception) Assertions.assertThrows(ExecutionException.class, () -> {
            alterClientQuotas(createAdminClient, "u-9", singletonMap);
        })).getCause().getClass());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAlterQuotasEnabledForAdminUsers(String str) throws Exception {
        this.testHarness.createAdminClient(this.logicalCluster1.adminUser()).createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(5), Optional.empty()))).all().get();
        AdminClient superAdminClient = this.physicalCluster.superAdminClient();
        Map<String, Double> singletonMap = Collections.singletonMap("producer_byte_rate", Double.valueOf(5000.0d));
        LogicalClusterUser user = this.logicalCluster1.user(9);
        alterClientQuotas(superAdminClient, getPrefixedUser(user), singletonMap);
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, quotaTags(user), 5000.0d / 2.0d);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSetAndRemoveUserProduceQuota(String str) throws Throwable {
        addLkcFileAndSyncMetadata(KafkaLogicalClusterUtils.LC_META_1);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        Map<String, String> quotaTags = quotaTags(this.logicalCluster1.user(9));
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(10), Optional.empty()))).all().get();
        Map<String, Double> singletonMap = Collections.singletonMap("producer_byte_rate", Double.valueOf(5000.0d));
        LogicalClusterUser user = this.logicalCluster1.user(9);
        alterClientQuotas(this.physicalCluster.superAdminClient(), getPrefixedUser(user), singletonMap);
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, quotaTags, 5000.0d / 2.0d);
        int produceUntilThrottled = produceUntilThrottled(user, "testtopic", 2000);
        verifyBrokerThrottleMetric(ClientQuotaType.PRODUCE, user, true);
        Assertions.assertEquals(produceUntilThrottled, consumeUntilThrottled(user, "testtopic", "test-group", produceUntilThrottled));
        verifyBrokerThrottleMetric(ClientQuotaType.FETCH, user, false);
        alterClientQuotas(this.physicalCluster.superAdminClient(), getPrefixedUser(user), Collections.singletonMap("producer_byte_rate", null));
        verifyQuotaUpdate(ClientQuotaType.PRODUCE, quotaTags, QuotaConfig.UNLIMITED_QUOTA.quota(ClientQuotaType.PRODUCE));
        Assertions.assertEquals(2000, produceUntilThrottled(user, "testtopic", 2000));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSetAndRemoveUserFetchQuota(String str) throws Throwable {
        addLkcFileAndSyncMetadata(KafkaLogicalClusterUtils.LC_META_1);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        Map<String, String> quotaTags = quotaTags(this.logicalCluster1.user(9));
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(10), Optional.empty()))).all().get();
        Map<String, Double> singletonMap = Collections.singletonMap("consumer_byte_rate", Double.valueOf(5000.0d));
        LogicalClusterUser user = this.logicalCluster1.user(9);
        alterClientQuotas(this.physicalCluster.superAdminClient(), getPrefixedUser(user), singletonMap);
        verifyQuotaCallbackLimit(ClientQuotaType.FETCH, quotaTags, 5000.0d / 2.0d);
        int produceUntilThrottled = produceUntilThrottled(user, "testtopic", 2000);
        verifyBrokerThrottleMetric(ClientQuotaType.PRODUCE, user, false);
        Assertions.assertTrue(produceUntilThrottled > consumeUntilThrottled(user, "testtopic", "test-group", produceUntilThrottled));
        verifyBrokerThrottleMetric(ClientQuotaType.FETCH, user, true);
        alterClientQuotas(this.physicalCluster.superAdminClient(), getPrefixedUser(user), Collections.singletonMap("consumer_byte_rate", null));
        verifyQuotaUpdate(ClientQuotaType.FETCH, quotaTags, QuotaConfig.UNLIMITED_QUOTA.quota(ClientQuotaType.FETCH));
        Assertions.assertEquals(produceUntilThrottled, consumeUntilThrottled(user, "testtopic", "test-group-2", produceUntilThrottled));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSetAndRemoveDefaultUserProduceQuota(String str) throws Throwable {
        addLkcFileAndSyncMetadata(KafkaLogicalClusterUtils.LC_META_1);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        Map<String, String> quotaTags = quotaTags(this.logicalCluster1.user(9));
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(10), Optional.empty()))).all().get();
        Map<String, Double> singletonMap = Collections.singletonMap("producer_byte_rate", Double.valueOf(5000.0d));
        LogicalClusterUser user = this.logicalCluster1.user(9);
        alterClientQuotas(this.physicalCluster.superAdminClient(), getPrefixedDefaultUser(this.logicalCluster1), singletonMap);
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, quotaTags, 5000.0d / 2.0d);
        int produceUntilThrottled = produceUntilThrottled(user, "testtopic", 2000);
        verifyBrokerThrottleMetric(ClientQuotaType.PRODUCE, user, true);
        Assertions.assertEquals(produceUntilThrottled, consumeUntilThrottled(user, "testtopic", "test-group", produceUntilThrottled));
        verifyBrokerThrottleMetric(ClientQuotaType.FETCH, user, false);
        alterClientQuotas(this.physicalCluster.superAdminClient(), getPrefixedDefaultUser(this.logicalCluster1), Collections.singletonMap("producer_byte_rate", null));
        verifyQuotaUpdate(ClientQuotaType.PRODUCE, quotaTags, QuotaConfig.UNLIMITED_QUOTA.quota(ClientQuotaType.PRODUCE));
        Assertions.assertEquals(2000, produceUntilThrottled(user, "testtopic", 2000));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testSetAndRemoveDefaultUserFetchQuota(String str) throws Throwable {
        addLkcFileAndSyncMetadata(KafkaLogicalClusterUtils.LC_META_1);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        Map<String, String> quotaTags = quotaTags(this.logicalCluster1.user(9));
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(10), Optional.empty()))).all().get();
        Map<String, Double> singletonMap = Collections.singletonMap("consumer_byte_rate", Double.valueOf(5000.0d));
        LogicalClusterUser user = this.logicalCluster1.user(9);
        alterClientQuotas(this.physicalCluster.superAdminClient(), getPrefixedDefaultUser(this.logicalCluster1), singletonMap);
        verifyQuotaCallbackLimit(ClientQuotaType.FETCH, quotaTags, 5000.0d / 2.0d);
        int produceUntilThrottled = produceUntilThrottled(user, "testtopic", 2000);
        verifyBrokerThrottleMetric(ClientQuotaType.PRODUCE, user, false);
        Assertions.assertTrue(produceUntilThrottled > consumeUntilThrottled(user, "testtopic", "test-group", produceUntilThrottled));
        verifyBrokerThrottleMetric(ClientQuotaType.FETCH, user, true);
        alterClientQuotas(this.physicalCluster.superAdminClient(), getPrefixedDefaultUser(this.logicalCluster1), Collections.singletonMap("consumer_byte_rate", null));
        verifyQuotaUpdate(ClientQuotaType.FETCH, quotaTags, QuotaConfig.UNLIMITED_QUOTA.quota(ClientQuotaType.FETCH));
        Assertions.assertEquals(produceUntilThrottled, consumeUntilThrottled(user, "testtopic", "test-group-2", produceUntilThrottled));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testOverrideDefaultQuota(String str) throws Throwable {
        addLkcFileAndSyncMetadata(KafkaLogicalClusterUtils.LC_META_1);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        Map<String, String> quotaTags = quotaTags(this.logicalCluster1.user(9));
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(10), Optional.empty()))).all().get();
        Map<String, Double> singletonMap = Collections.singletonMap("producer_byte_rate", Double.valueOf(5000.0d));
        LogicalClusterUser user = this.logicalCluster1.user(9);
        alterClientQuotas(this.physicalCluster.superAdminClient(), getPrefixedDefaultUser(this.logicalCluster1), singletonMap);
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, quotaTags, 5000.0d / 2.0d);
        int produceUntilThrottled = produceUntilThrottled(user, "testtopic", 2000);
        verifyBrokerThrottleMetric(ClientQuotaType.PRODUCE, user, true);
        Assertions.assertEquals(produceUntilThrottled, consumeUntilThrottled(user, "testtopic", "test-group", produceUntilThrottled));
        verifyBrokerThrottleMetric(ClientQuotaType.FETCH, user, false);
        HashMap hashMap = new HashMap();
        hashMap.put("producer_byte_rate", Double.valueOf(2097152.0d));
        hashMap.put("consumer_byte_rate", Double.valueOf(5000.0d));
        alterClientQuotas(this.physicalCluster.superAdminClient(), getPrefixedUser(user), hashMap);
        verifyQuotaUpdate(ClientQuotaType.PRODUCE, quotaTags, 2097152.0d);
        verifyQuotaUpdate(ClientQuotaType.FETCH, quotaTags, 5000.0d);
        int produceUntilThrottled2 = produceUntilThrottled(user, "testtopic", 2000);
        Assertions.assertEquals(2000, produceUntilThrottled2);
        Assertions.assertTrue(produceUntilThrottled2 > consumeUntilThrottled(user, "testtopic", "test-group-2", 2000));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testProduceThrottlePreCheck(String str) throws Throwable {
        setUp();
        Properties nodeProps = nodeProps();
        nodeProps.put("confluent.produce.throttle.pre.check.enable", String.valueOf(true));
        createPhysicalAndLogicalClusters(nodeProps);
        awaitMetadataPropagation();
        addLkcFileAndSyncMetadata(KafkaLogicalClusterUtils.LC_META_1);
        AdminClient createAdminClient = this.testHarness.createAdminClient(this.logicalCluster1.adminUser());
        LogicalClusterUser user = this.logicalCluster1.user(9);
        Map<String, String> quotaTags = quotaTags(user);
        createAdminClient.createTopics(Collections.singletonList(new NewTopic("testtopic", Optional.of(10), Optional.empty()))).all().get();
        alterClientQuotas(this.physicalCluster.superAdminClient(), getPrefixedDefaultUser(this.logicalCluster1), Collections.singletonMap("producer_byte_rate", Double.valueOf(50.0d)));
        verifyQuotaCallbackLimit(ClientQuotaType.PRODUCE, quotaTags, 50.0d / 2.0d);
        produceUntilThrottled(user, "testtopic", 2000, 100, 0);
        verifyBrokerThrottleMetric(ClientQuotaType.PRODUCE, user, true);
        int produceUntilThrottled = produceUntilThrottled(user, "testtopic", 2000, 1, 0);
        verifyBrokerThrottleMetric(ClientQuotaType.PRODUCE, user, true);
        Assertions.assertEquals(1, produceUntilThrottled);
    }

    private void verifyQuotaUpdate(ClientQuotaType clientQuotaType, Map<String, String> map, double d) throws Exception {
        double d2 = d == QuotaConfig.UNLIMITED_QUOTA.quota(clientQuotaType) ? d : d / 2.0d;
        verifyQuotaCallbackLimit(clientQuotaType, map, d2);
        verifyQuota(clientQuotaType, map, d2);
    }

    private void verifyBrokerThrottleMetric(ClientQuotaType clientQuotaType, LogicalClusterUser logicalClusterUser, boolean z) {
        verifyThrottle(clientQuotaType, quotaTags(logicalClusterUser), z);
    }

    private boolean isConsumerThrottled(KafkaConsumer<?, ?> kafkaConsumer) {
        return ((Double) ((Metric) kafkaConsumer.metrics().get(new MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", Collections.singletonMap("client-id", "consumer-client")))).metricValue()).doubleValue() > 0.0d;
    }

    private boolean isProducerThrottled(KafkaProducer<?, ?> kafkaProducer) {
        return ((Double) ((Metric) kafkaProducer.metrics().get(new MetricName("produce-throttle-time-avg", "producer-metrics", "", Collections.singletonMap("client-id", "producer-client")))).metricValue()).doubleValue() > 0.0d;
    }

    private int produceUntilThrottled(LogicalClusterUser logicalClusterUser, String str, int i) {
        return produceUntilThrottled(logicalClusterUser, str, i, 1, 42);
    }

    private int produceUntilThrottled(LogicalClusterUser logicalClusterUser, String str, int i, int i2, int i3) {
        boolean isProducerThrottled;
        KafkaProducer<String, String> createProducer = createProducer(logicalClusterUser);
        Throwable th = null;
        int i4 = 0;
        while (true) {
            try {
                try {
                    String valueOf = String.valueOf(i4 * i2);
                    Future send = createProducer.send(new ProducerRecord(str, (Integer) null, valueOf, valueOf), new ErrorLoggingCallback(str, (byte[]) null, (byte[]) null, true));
                    i4++;
                    do {
                        isProducerThrottled = isProducerThrottled(createProducer);
                        if (send.isDone()) {
                            break;
                        }
                    } while (!isProducerThrottled);
                    if (i4 < i3 || (i4 < i && !isProducerThrottled)) {
                    }
                } finally {
                }
            } catch (Throwable th2) {
                if (createProducer != null) {
                    if (th != null) {
                        try {
                            createProducer.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        createProducer.close();
                    }
                }
                throw th2;
            }
        }
        if (createProducer != null) {
            if (0 != 0) {
                try {
                    createProducer.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                createProducer.close();
            }
        }
        return i4;
    }

    private int consumeUntilThrottled(LogicalClusterUser logicalClusterUser, String str, String str2, int i) {
        KafkaConsumer<String, String> createConsumer = createConsumer(logicalClusterUser, str2);
        Throwable th = null;
        try {
            try {
                long millis = TimeUnit.MINUTES.toMillis(1L);
                createConsumer.subscribe(Collections.singleton(str));
                int i2 = 0;
                long currentTimeMillis = System.currentTimeMillis();
                do {
                    i2 += createConsumer.poll(Duration.ofMillis(100L)).count();
                    boolean isConsumerThrottled = isConsumerThrottled(createConsumer);
                    if (i2 >= i || isConsumerThrottled) {
                        break;
                    }
                } while (System.currentTimeMillis() < currentTimeMillis + millis);
                if (createConsumer != null) {
                    if (0 != 0) {
                        try {
                            createConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConsumer.close();
                    }
                }
                return i2;
            } finally {
            }
        } catch (Throwable th3) {
            if (createConsumer != null) {
                if (th != null) {
                    try {
                        createConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createConsumer.close();
                }
            }
            throw th3;
        }
    }

    private KafkaProducer<String, String> createProducer(LogicalClusterUser logicalClusterUser) {
        Properties properties = new Properties();
        properties.put("client.id", "producer-client");
        return KafkaTestUtils.createProducer(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), logicalClusterUser.saslJaasConfig(), properties);
    }

    private KafkaConsumer<String, String> createConsumer(LogicalClusterUser logicalClusterUser, String str) {
        Properties properties = new Properties();
        properties.put("client.id", "consumer-client");
        return KafkaTestUtils.createConsumer(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), logicalClusterUser.saslJaasConfig(), str, properties);
    }

    private void alterClientQuotas(Admin admin, String str, Map<String, Double> map) throws Exception {
        admin.alterClientQuotas(Collections.singletonList(new ClientQuotaAlteration(new ClientQuotaEntity(Collections.singletonMap(MultiTenantRequestContextTest.USERNAME, str)), (Collection) map.entrySet().stream().map(entry -> {
            return new ClientQuotaAlteration.Op((String) entry.getKey(), (Double) entry.getValue());
        }).collect(Collectors.toList())))).all().get();
    }

    private String getPrefixedUser(LogicalClusterUser logicalClusterUser) {
        return logicalClusterUser.tenantPrefix() + logicalClusterUser.userResourceId();
    }

    private String getPrefixedDefaultUser(LogicalCluster logicalCluster) {
        return logicalCluster.logicalClusterId() + "_<default>";
    }

    private Map<String, String> quotaTags(LogicalClusterUser logicalClusterUser) {
        HashMap hashMap = new HashMap();
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, logicalClusterUser.logicalClusterId);
        hashMap.put("user-resource-id", logicalClusterUser.userResourceId());
        return hashMap;
    }
}
