package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import kafka.network.SocketServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
import org.apache.kafka.network.TenantQuotaEntity;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Integration tests for per-tenant connection throttling.
 *
 * <p>Each test configures the broker with either a per-tenant connection-creation-rate limit
 * ({@link #rateThrottlingProps()}) or a per-tenant connection-count limit
 * ({@link #countThrottlingProps()}), optionally overrides the limit through
 * {@code confluent-tenant} client quotas, opens a number of admin clients for a tenant and then
 * inspects the broker metrics {@code tenant-connection-accept-throttle-time} and
 * {@code tenant-excess-connections} of the first broker.
 *
 * <p>Every test is parameterized with the single value {@code "kraft"}; the test bodies do not
 * read that parameter (presumably it is consumed by the harness via the display name — TODO
 * confirm).
 */
@Tag("integration")
class MultiTenantThrottlingIntegrationTest extends AbstractMultiTenantKafkaIntegrationTest {

    /** Client-quota entity type used for tenant quotas; a {@code null} name means the default entity. */
    private static final String TENANT_ENTITY_TYPE = "confluent-tenant";

    /** Client-quota key for the per-tenant connection creation rate. */
    private static final String RATE_QUOTA_KEY = "connection_creation_rate";

    /** Client-quota key for the per-tenant connection count. */
    private static final String COUNT_QUOTA_KEY = "connection_creation_count";

    /** Broker property holding the default per-tenant connection creation rate. */
    private static final String MAX_RATE_PER_TENANT_CONFIG =
            "confluent.max.connection.creation.rate.per.tenant";

    /** Prefix of the topics created by the helper assertions; the loop index is appended. */
    private static final String TOPIC_NAME_PREFIX = "testtopicname";

    /** Failure message used whenever throttling was expected but not observed. */
    private static final String THROTTLE_EXPECTED_MESSAGE = "Expected it to throttle";

    MultiTenantThrottlingIntegrationTest() {
    }

    /**
     * Delegates to the base-class temp-dir setup; re-declared so that it carries
     * {@code @BeforeEach} on this class.
     *
     * @param testInfo JUnit metadata of the test about to run
     */
    @Override
    @BeforeEach
    public void setUpTempDir(TestInfo testInfo) {
        super.setUpTempDir(testInfo);
    }

    /**
     * Creates the clusters via the base class and then calls {@code awaitMetadataPropagation()}.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally {@code protected};
     * the visibility is kept as found.
     */
    @Override
    public void createPhysicalAndLogicalClusters() {
        super.createPhysicalAndLogicalClusters();
        awaitMetadataPropagation();
    }

    /**
     * Creates the clusters with the given broker properties via the base class and then calls
     * {@code awaitMetadataPropagation()}.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally {@code protected};
     * the visibility is kept as found.
     *
     * @param properties broker properties forwarded to the base class
     */
    @Override
    public void createPhysicalAndLogicalClusters(Properties properties) {
        super.createPhysicalAndLogicalClusters(properties);
        awaitMetadataPropagation();
    }

    /** With the 0.1 default rate limit, two new connections of a tenant are throttled. */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    void testTenantThrottling(String str) throws Exception {
        setUp(1, Collections.emptyList());
        createPhysicalAndLogicalClusters(rateThrottlingProps());
        assertTenantThrottled(tenant2User(), 2, true);
    }

    /** Changing the default tenant rate quota (100, then 0.1) toggles throttling accordingly. */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    void testTenantThrottlingDefaultChanged(String str) throws Exception {
        setUp(1, Collections.emptyList());
        createPhysicalAndLogicalClusters(rateThrottlingProps());
        setTenantQuota(null, RATE_QUOTA_KEY, 100.0d);
        assertTenantNotThrottled(tenant2User(), 10, true);
        setTenantQuota(null, RATE_QUOTA_KEY, 0.1d);
        assertTenantThrottled(tenant2User(), 2, true);
    }

    /** A low per-tenant rate override throttles only the overridden tenant. */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    void testTenantThrottlingOverride(String str) throws Exception {
        setUp(1, Collections.emptyList());
        createPhysicalAndLogicalClusters(highRateLimitProps());
        setTenantQuota(tenant2User().logicalClusterId, RATE_QUOTA_KEY, 0.1d);
        assertTenantNotThrottled(tenant1User(), 10, true);
        assertTenantThrottled(tenant2User(), 2, true);
    }

    /** A high per-tenant rate override exempts that tenant while the default still throttles others. */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    void testTenantThrottlingOverrideHigherThanDefault(String str) throws Exception {
        setUp(1, Collections.emptyList());
        createPhysicalAndLogicalClusters(rateThrottlingProps());
        setTenantQuota(tenant2User().logicalClusterId, RATE_QUOTA_KEY, 100.0d);
        assertTenantNotThrottled(tenant2User(), 10, true);
        assertTenantThrottled(tenant1User(), 2, true);
    }

    /**
     * A low per-tenant rate override throttles only the overridden tenant.
     *
     * <p>NOTE(review): the body is identical to {@code testTenantThrottlingOverride}; the name
     * suggests a second quota change was intended — confirm against the original source.
     */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    void testTenantThrottlingOverrideChanged(String str) throws Exception {
        setUp(1, Collections.emptyList());
        createPhysicalAndLogicalClusters(highRateLimitProps());
        setTenantQuota(tenant2User().logicalClusterId, RATE_QUOTA_KEY, 0.1d);
        assertTenantNotThrottled(tenant1User(), 10, true);
        assertTenantThrottled(tenant2User(), 2, true);
    }

    /** After a per-tenant rate override is deleted, the tenant is no longer throttled. */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    void testTenantThrottlingOverrideIfDeleted(String str) throws Exception {
        setUp(1, Collections.emptyList());
        createPhysicalAndLogicalClusters(highRateLimitProps());
        setTenantQuota(tenant2User().logicalClusterId, RATE_QUOTA_KEY, 0.1d);
        deleteTenantQuota(tenant2User().logicalClusterId, RATE_QUOTA_KEY);
        assertTenantNotThrottled(tenant2User(), 2, true);
    }

    /** After the default tenant rate quota is deleted, tenants are no longer throttled. */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    void testTenantThrottlingOverrideDefaultIfDeleted(String str) throws Exception {
        setUp(1, Collections.emptyList());
        createPhysicalAndLogicalClusters(highRateLimitProps());
        setTenantQuota(null, RATE_QUOTA_KEY, 0.1d);
        deleteTenantQuota(null, RATE_QUOTA_KEY);
        assertTenantNotThrottled(tenant2User(), 2, true);
    }

    /** With the broker-level rate limit set to 100, ten connections are not throttled. */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    void testNoTenantThrottlingIfHighMaxConnectionRatePerTenant(String str) throws Exception {
        setUp(1, Collections.emptyList());
        createPhysicalAndLogicalClusters(highRateLimitProps());
        assertTenantNotThrottled(tenant2User(), 10, true);
    }

    /** With tenant-id and API-key tracking disabled, the low rate limit does not throttle. */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    void testNoTenantThrottlingIfNotTrackingTenantOrApiKey(String str) throws Exception {
        setUp(1, Collections.emptyList());
        Properties brokerProps = rateThrottlingProps();
        brokerProps.put("confluent.track.api.key.per.ip", "false");
        brokerProps.put("confluent.track.tenant.id.per.ip", "false");
        createPhysicalAndLogicalClusters(brokerProps);
        assertTenantNotThrottled(tenant2User(), 10, true);
    }

    /** A count override of 100 exempts that tenant; another tenant exceeds the default of 10. */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    void testTenantCountThrottlingOverrideHigherThanDefault(String str) throws Exception {
        setUp(1, Collections.emptyList());
        createPhysicalAndLogicalClusters(countThrottlingProps());
        setTenantQuota(tenant2User().logicalClusterId, COUNT_QUOTA_KEY, 100.0d);
        assertTenantNotThrottledForConnectionCount(tenant2User(), 11);
        assertTenantThrottledForConnectionCount(tenant1User(), 11);
    }

    /** A count override of 5 throttles that tenant at 8 clients; another tenant stays below the default. */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    void testTenantCountThrottlingOverrideLowerThanDefault(String str) throws Exception {
        setUp(1, Collections.emptyList());
        createPhysicalAndLogicalClusters(countThrottlingProps());
        setTenantQuota(tenant2User().logicalClusterId, COUNT_QUOTA_KEY, 5.0d);
        assertTenantThrottledForConnectionCount(tenant2User(), 8);
        assertTenantNotThrottledForConnectionCount(tenant1User(), 8);
    }

    /** Deleting a tenant-specific and then a default count quota leaves tenants unthrottled at 8 clients. */
    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    void testTenantCountThrottlingDeletingOverrides(String str) throws Exception {
        setUp(1, Collections.emptyList());
        createPhysicalAndLogicalClusters(countThrottlingProps());
        setTenantQuota(tenant2User().logicalClusterId, COUNT_QUOTA_KEY, 5.0d);
        deleteTenantQuota(tenant2User().logicalClusterId, COUNT_QUOTA_KEY);
        assertTenantNotThrottledForConnectionCount(tenant2User(), 8);
        setTenantQuota(null, COUNT_QUOTA_KEY, 3.0d);
        deleteTenantQuota(null, COUNT_QUOTA_KEY);
        assertTenantNotThrottledForConnectionCount(tenant1User(), 8);
    }

    /** Returns user 9 of {@code logicalCluster1}. */
    private LogicalClusterUser tenant1User() {
        return this.logicalCluster1.user(9);
    }

    /** Returns user 22 of {@code logicalCluster2}. */
    private LogicalClusterUser tenant2User() {
        return this.logicalCluster2.user(22);
    }

    /**
     * Reads the {@code tenant-connection-accept-throttle-time} metric of the first broker.
     *
     * @param tenantId tenant whose metric tags are used for the lookup
     * @return the metric value, or {@code 0.0} if the metric is not registered
     */
    private double throttlingTimeMs(String tenantId) {
        return tenantMetricValue(
                "tenant-connection-accept-throttle-time",
                "Tracking average throttle-time, out of non-zero throttle times, per tenant",
                tenantId);
    }

    /**
     * Reads the {@code tenant-excess-connections} metric of the first broker.
     *
     * @param tenantId tenant whose metric tags are used for the lookup
     * @return the metric value, or {@code 0.0} if the metric is not registered
     */
    private double throttledConnections(String tenantId) {
        return tenantMetricValue(
                "tenant-excess-connections",
                "Tracking number of connections being throttled due to exceeding connection count quota, per tenant",
                tenantId);
    }

    /**
     * Looks up a per-tenant socket-server metric on the first broker of the physical cluster.
     *
     * @param name metric name
     * @param description metric description passed to {@code Metrics.metricName}
     * @param tenantId tenant whose {@code TenantQuotaEntity} metric tags are used
     * @return the metric value as a double, or {@code 0.0} if no such metric is registered
     */
    private double tenantMetricValue(String name, String description, String tenantId) {
        Metrics metrics = this.physicalCluster.kafkaCluster().kafkaBrokers().get(0).metrics();
        MetricName metricName = metrics.metricName(
                name, SocketServer.MetricsGroup(), description, new TenantQuotaEntity(tenantId).metricTags());
        if (metrics.metric(metricName) == null) {
            return 0.0d;
        }
        return ((Double) metrics.metric(metricName).metricValue()).doubleValue();
    }

    /**
     * Opens {@code clientCount} admin clients for the user, fires a topic creation on each without
     * waiting for the result, and waits until the tenant's throttle metric becomes positive.
     * All clients are closed afterwards, also when the wait fails.
     *
     * @param user user whose tenant is expected to be throttled
     * @param clientCount number of admin clients to open
     * @param checkThrottleTime {@code true} to watch the throttle-time metric, {@code false} to
     *     watch the excess-connections metric
     * @throws InterruptedException if interrupted while waiting for the metric
     */
    private void assertTenantThrottled(LogicalClusterUser user, int clientCount, boolean checkThrottleTime)
            throws InterruptedException {
        List<AdminClient> clients = new ArrayList<>();
        try {
            for (int index = 0; index < clientCount; index++) {
                AdminClient client = this.testHarness.createAdminClient(user);
                clients.add(client);
                NewTopic topic = new NewTopic(TOPIC_NAME_PREFIX + index, 3, (short) 1);
                CreateTopicsOptions options =
                        new CreateTopicsOptions().timeoutMs(1000).retryOnQuotaViolation(false);
                // The result is deliberately not awaited; the request only has to trigger a connection.
                client.createTopics(Collections.singletonList(topic), options).all();
            }
            if (checkThrottleTime) {
                TestUtils.waitForCondition(
                        () -> throttlingTimeMs(user.logicalClusterId) > 0.0d, THROTTLE_EXPECTED_MESSAGE);
            } else {
                TestUtils.waitForCondition(
                        () -> throttledConnections(user.logicalClusterId) > 0.0d, THROTTLE_EXPECTED_MESSAGE);
            }
        } finally {
            closeAll(clients);
        }
    }

    /**
     * Sets a {@code confluent-tenant} client quota and waits until describing it returns exactly
     * one entity carrying the requested value.
     *
     * @param tenantId tenant to configure, or {@code null} for the default tenant entity
     * @param quotaKey quota key to set
     * @param value quota value to set
     * @throws ExecutionException if the alter request fails
     * @throws InterruptedException if interrupted while altering or waiting
     */
    private void setTenantQuota(String tenantId, String quotaKey, double value)
            throws ExecutionException, InterruptedException {
        alterTenantQuota(tenantId, quotaKey, Double.valueOf(value));
        ClientQuotaFilter filter = tenantQuotaFilter(tenantId);
        TestUtils.waitForCondition(() -> {
            // One describe call per poll so that size and value come from the same response.
            Map<ClientQuotaEntity, Map<String, Double>> quotas = describeQuotas(filter);
            if (quotas.size() != 1) {
                return false;
            }
            Double current = quotas.values().iterator().next().get(quotaKey);
            return current != null && current.doubleValue() == value;
        }, "Could not describe confluent tenant client quota with correct quota value");
    }

    /**
     * Removes a {@code confluent-tenant} client quota and waits until describing the entity
     * returns nothing.
     *
     * @param tenantId tenant to configure, or {@code null} for the default tenant entity
     * @param quotaKey quota key to remove
     * @throws ExecutionException if the alter request fails
     * @throws InterruptedException if interrupted while altering or waiting
     */
    private void deleteTenantQuota(String tenantId, String quotaKey)
            throws ExecutionException, InterruptedException {
        alterTenantQuota(tenantId, quotaKey, null);
        ClientQuotaFilter filter = tenantQuotaFilter(tenantId);
        TestUtils.waitForCondition(
                () -> describeQuotas(filter).isEmpty(),
                "Could not describe confluent tenant client quota");
    }

    /**
     * Sends a single quota alteration for the tenant entity and blocks until it completes.
     *
     * @param tenantId tenant name, or {@code null} for the default tenant entity
     * @param quotaKey quota key to alter
     * @param value new value, or {@code null} to remove the quota
     */
    private void alterTenantQuota(String tenantId, String quotaKey, Double value)
            throws ExecutionException, InterruptedException {
        ClientQuotaEntity entity =
                new ClientQuotaEntity(Collections.singletonMap(TENANT_ENTITY_TYPE, tenantId));
        ClientQuotaAlteration.Op op = new ClientQuotaAlteration.Op(quotaKey, value);
        ClientQuotaAlteration alteration =
                new ClientQuotaAlteration(entity, Collections.singletonList(op));
        this.physicalCluster.superConfluentAdmin()
                .alterClientQuotas(Collections.singletonList(alteration))
                .all()
                .get();
    }

    /**
     * Builds a filter matching only the given tenant entity.
     *
     * @param tenantId tenant name, or {@code null} to match the default tenant entity
     */
    private ClientQuotaFilter tenantQuotaFilter(String tenantId) {
        ClientQuotaFilterComponent component = tenantId == null
                ? ClientQuotaFilterComponent.ofDefaultEntity(TENANT_ENTITY_TYPE)
                : ClientQuotaFilterComponent.ofEntity(TENANT_ENTITY_TYPE, tenantId);
        return ClientQuotaFilter.containsOnly(Collections.singletonList(component));
    }

    /** Describes client quotas matching the filter via the super-user admin client. */
    private Map<ClientQuotaEntity, Map<String, Double>> describeQuotas(ClientQuotaFilter filter)
            throws ExecutionException, InterruptedException {
        return this.physicalCluster.superConfluentAdmin().describeClientQuotas(filter).entities().get();
    }

    /**
     * Opens {@code clientCount} admin clients for the user, creates one topic per client, waits
     * until all topics are listed through one further client, and asserts that the tenant's
     * throttle metric is zero. All clients opened here are closed afterwards, matching the other
     * helpers in this class.
     *
     * @param user user whose tenant is expected not to be throttled
     * @param clientCount number of topic-creating admin clients to open
     * @param checkThrottleTime {@code true} to check the throttle-time metric, {@code false} to
     *     check the excess-connections metric
     * @throws InterruptedException if interrupted while waiting for the topics
     */
    private void assertTenantNotThrottled(LogicalClusterUser user, int clientCount, boolean checkThrottleTime)
            throws InterruptedException {
        List<AdminClient> clients = new ArrayList<>();
        try {
            List<String> topicNames = new ArrayList<>();
            for (int index = 0; index < clientCount; index++) {
                AdminClient client = this.testHarness.createAdminClient(user);
                clients.add(client);
                String topicName = TOPIC_NAME_PREFIX + index;
                topicNames.add(topicName);
                client.createTopics(Collections.singletonList(new NewTopic(topicName, 3, (short) 1))).all();
            }
            AdminClient listingClient = this.testHarness.createAdminClient(user);
            clients.add(listingClient);
            TestUtils.waitForCondition(() -> {
                Set<String> existing = listingClient.listTopics().names().get();
                return existing.containsAll(topicNames);
            }, String.format("Could not list topic %s in time", topicNames));
            if (checkThrottleTime) {
                Assertions.assertEquals(0.0d, throttlingTimeMs(user.logicalClusterId));
            } else {
                Assertions.assertEquals(0.0d, throttledConnections(user.logicalClusterId));
            }
        } finally {
            closeAll(clients);
        }
    }

    /**
     * Opens {@code clientCount} admin clients for the user and asserts that the tenant's
     * excess-connections metric is zero. The clients are closed afterwards, also on failure.
     *
     * <p>NOTE(review): the metric is read immediately after the clients are created, without
     * waiting for their connections — confirm this is sufficient to detect throttling.
     *
     * @param user user whose tenant is expected not to be throttled
     * @param clientCount number of admin clients to open
     */
    private void assertTenantNotThrottledForConnectionCount(LogicalClusterUser user, int clientCount)
            throws InterruptedException {
        List<AdminClient> clients = new ArrayList<>();
        try {
            for (int index = 0; index < clientCount; index++) {
                clients.add(this.testHarness.createAdminClient(user));
            }
            Assertions.assertEquals(0.0d, throttledConnections(user.logicalClusterId));
        } finally {
            closeAll(clients);
        }
    }

    /**
     * Opens {@code clientCount} admin clients for the user and waits until the tenant's
     * excess-connections metric becomes positive. The clients are closed afterwards, also on
     * failure.
     *
     * @param user user whose tenant is expected to be throttled
     * @param clientCount number of admin clients to open
     * @throws InterruptedException if interrupted while waiting for the metric
     */
    private void assertTenantThrottledForConnectionCount(LogicalClusterUser user, int clientCount)
            throws InterruptedException {
        List<AdminClient> clients = new ArrayList<>();
        try {
            for (int index = 0; index < clientCount; index++) {
                clients.add(this.testHarness.createAdminClient(user));
            }
            TestUtils.waitForCondition(
                    () -> throttledConnections(user.logicalClusterId) > 0.0d, THROTTLE_EXPECTED_MESSAGE);
        } finally {
            closeAll(clients);
        }
    }

    /** Closes every client with a zero timeout, i.e. without waiting for pending requests. */
    private void closeAll(List<AdminClient> clients) {
        for (AdminClient client : clients) {
            client.close(Duration.ZERO);
        }
    }

    /**
     * Builds broker properties for the rate-based tests: the per-tenant connection creation rate
     * is set to 0.1 with an enable threshold of 0.0, tenant-id and API-key tracking are switched
     * on, and replication factors are set to 1.
     */
    private Properties rateThrottlingProps() {
        Properties brokerProps = nodeProps();
        brokerProps.put("max.connection.creation.rate.per.tenant.enable.threshold", "0.0");
        brokerProps.put(MAX_RATE_PER_TENANT_CONFIG, "0.1");
        brokerProps.put("confluent.plugins.topic.policy.replication.factor", (short) 1);
        brokerProps.put("confluent.track.api.key.per.ip", "true");
        brokerProps.put("confluent.track.tenant.id.per.ip", "true");
        brokerProps.put("default.replication.factor", 1);
        return brokerProps;
    }

    /** Returns {@link #rateThrottlingProps()} with the per-tenant rate limit raised to 100. */
    private Properties highRateLimitProps() {
        Properties brokerProps = rateThrottlingProps();
        brokerProps.put(MAX_RATE_PER_TENANT_CONFIG, "100");
        return brokerProps;
    }

    /**
     * Builds broker properties for the count-based tests: {@code max.connections.per.tenant} is
     * set to 10, tenant-id and API-key tracking are switched on, and replication factors are set
     * to 1.
     */
    private Properties countThrottlingProps() {
        Properties brokerProps = nodeProps();
        brokerProps.put("max.connections.per.tenant", "10");
        brokerProps.put("confluent.plugins.topic.policy.replication.factor", (short) 1);
        brokerProps.put("confluent.track.api.key.per.ip", "true");
        brokerProps.put("confluent.track.tenant.id.per.ip", "true");
        brokerProps.put("default.replication.factor", 1);
        return brokerProps;
    }
}
