package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.metrics.utils.PartitionMetricUtils;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.metrics.utils.MetricUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantHotPartitionIntegrationTest.class */
public class MultiTenantHotPartitionIntegrationTest extends AbstractMultiTenantKafkaIntegrationTest {
    private List<KafkaProducer<String, String>> producers = null;
    private List<KafkaConsumer<String, String>> consumers = null;
    private final SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    public Properties nodeProps() {
        Properties nodeProps = super.nodeProps();
        nodeProps.put("confluent.hot.partition.ratio", Double.valueOf(0.1d));
        nodeProps.put("confluent.broker.limit.producer.bytes.per.second", 10);
        nodeProps.put("confluent.broker.limit.consumer.bytes.per.second", 10);
        return nodeProps;
    }

    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    @BeforeEach
    public void setUpTempDir(TestInfo testInfo) {
        super.setUpTempDir(testInfo);
        this.producers = new ArrayList(1);
        this.consumers = new ArrayList(1);
    }

    @AfterEach
    void shutdown() {
        this.producers.forEach((v0) -> {
            v0.close();
        });
        this.consumers.forEach((v0) -> {
            v0.close();
        });
    }

    @ValueSource(strings = {"kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMultiTenantHotPartitionMetrics(String str) throws Throwable {
        setUp(1, Collections.emptyList());
        createPhysicalAndLogicalClusters();
        LogicalClusterUser user = this.logicalCluster1.user(11);
        this.producers.add(this.testHarness.createProducer(user, this.securityProtocol));
        this.consumers.add(this.testHarness.createConsumer(user, "test-group", this.securityProtocol));
        String str2 = user.tenantPrefix() + "test-topic";
        this.physicalCluster.kafkaCluster().createTopic(str2, 1, 1);
        KafkaTestUtils.sendRecords(this.producers.get(0), "test-topic", 0, 10);
        KafkaTestUtils.consumeRecords(this.consumers.get(0), "test-topic", 0, 10);
        this.consumers.forEach((v0) -> {
            v0.commitSync();
        });
        List<KafkaBroker> kafkaBrokers = this.physicalCluster.kafkaCluster().kafkaBrokers();
        long currentTimeMillis = System.currentTimeMillis();
        for (KafkaBroker kafkaBroker : kafkaBrokers) {
            Map tenantPartitionMetricTags = PartitionMetricUtils.tenantPartitionMetricTags(this.logicalCluster1.logicalClusterId(), new TopicPartition(str2, 0));
            Metrics metrics = kafkaBroker.metrics();
            Assertions.assertNotNull(metrics.metric(MetricUtils.rateMetricName(metrics, "tenant-metrics", tenantPartitionMetricTags, "partition-bytes-in", "partition-bytes-in")));
            KafkaMetric metric = metrics.metric(metrics.metricName("hot-partition-in", "tenant-metrics", tenantPartitionMetricTags));
            Assertions.assertNotNull(metric);
            Assertions.assertEquals(1.0d, metric.measurableValue(currentTimeMillis));
            Assertions.assertNotNull(metrics.metric(MetricUtils.rateMetricName(metrics, "tenant-metrics", tenantPartitionMetricTags, "partition-bytes-out", "partition-bytes-out")));
            KafkaMetric metric2 = metrics.metric(metrics.metricName("hot-partition-out", "tenant-metrics", tenantPartitionMetricTags));
            Assertions.assertNotNull(metric2);
            Assertions.assertEquals(1.0d, metric2.measurableValue(currentTimeMillis));
        }
    }
}
