package io.confluent.kafka.multitenant.integration.test;

import com.sun.jna.platform.win32.COM.tlb.imp.TlbConst;
import io.confluent.kafka.multitenant.TenantUtils;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import kafka.metrics.BrokerWithMember;
import kafka.metrics.ConsumerLagEmitter;
import kafka.metrics.ConsumerLagEmitterIntegrationTest;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantConsumerLagEmitterIntegrationTest.class */
public class MultiTenantConsumerLagEmitterIntegrationTest extends AbstractMultiTenantKafkaIntegrationTest {
    private List<KafkaProducer<String, String>> producers = null;
    private List<KafkaConsumer<String, String>> consumers = null;
    private final SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
    private final String groupId = "test-group";
    private final String topic = "test-topic";

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    public Properties nodeProps() {
        Properties nodeProps = super.nodeProps();
        nodeProps.put(ConfluentConfigs.CONSUMER_LAG_EMITTER_ENABLED_CONFIG, "true");
        nodeProps.put(ConfluentConfigs.CONSUMER_LAG_EMITTER_INTERVAL_MS_CONFIG, "1000");
        return nodeProps;
    }

    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    @BeforeEach
    public void setUpTempDir(TestInfo testInfo) {
        super.setUpTempDir(testInfo);
        this.producers = new ArrayList(2);
        this.consumers = new ArrayList(2);
    }

    @AfterEach
    void shutdown() {
        this.producers.forEach((v0) -> {
            v0.close();
        });
        this.consumers.forEach((v0) -> {
            v0.close();
        });
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testMultiTenantConsumerLagMetrics(String str) throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        LogicalClusterUser user = this.logicalCluster1.user(11);
        LogicalClusterUser user2 = this.logicalCluster2.user(21);
        this.producers.add(this.testHarness.createProducer(user, this.securityProtocol));
        this.producers.add(this.testHarness.createProducer(user2, this.securityProtocol));
        this.consumers.add(this.testHarness.createConsumer(user, "test-group", this.securityProtocol));
        this.consumers.add(this.testHarness.createConsumer(user2, "test-group", this.securityProtocol));
        String str2 = user.tenantPrefix() + "test-topic";
        String str3 = user2.tenantPrefix() + "test-topic";
        this.physicalCluster.kafkaCluster().createTopic(str2, 1, 1);
        this.physicalCluster.kafkaCluster().createTopic(str3, 1, 1);
        KafkaTestUtils.sendRecords(this.producers.get(0), "test-topic", 0, 100);
        KafkaTestUtils.sendRecords(this.producers.get(1), "test-topic", 0, 200);
        KafkaTestUtils.consumeRecords(this.consumers.get(0), "test-topic", 0, 100);
        KafkaTestUtils.consumeRecords(this.consumers.get(1), "test-topic", 0, 200);
        this.consumers.forEach((v0) -> {
            v0.commitSync();
        });
        BrokerWithMember findBrokerWithMember = ConsumerLagEmitterIntegrationTest.findBrokerWithMember(this.physicalCluster.kafkaCluster().kafkaBrokers(), user.tenantPrefix() + "test-group");
        BrokerWithMember findBrokerWithMember2 = ConsumerLagEmitterIntegrationTest.findBrokerWithMember(this.physicalCluster.kafkaCluster().kafkaBrokers(), user2.tenantPrefix() + "test-group");
        assertConsumerLag(findBrokerWithMember, user, 0L);
        assertConsumerLag(findBrokerWithMember2, user2, 0L);
        KafkaTestUtils.sendRecords(this.producers.get(0), "test-topic", 100, 400);
        KafkaTestUtils.sendRecords(this.producers.get(1), "test-topic", 200, 500);
        assertConsumerLag(findBrokerWithMember, user, 400L);
        assertConsumerLag(findBrokerWithMember2, user2, 500L);
        assertConsumerLagEmitterLatency(findBrokerWithMember);
        KafkaTestUtils.consumeRecords(this.consumers.get(0), "test-topic", 100, 400);
        KafkaTestUtils.consumeRecords(this.consumers.get(1), "test-topic", 200, 500);
        this.consumers.forEach((v0) -> {
            v0.commitSync();
        });
        assertConsumerLag(findBrokerWithMember, user, 0L);
        assertConsumerLag(findBrokerWithMember2, user2, 0L);
        assertConsumerLagEmitterLatency(findBrokerWithMember);
        assertConsumerLagEmitterLatency(findBrokerWithMember2);
    }

    private void assertConsumerLag(BrokerWithMember brokerWithMember, LogicalClusterUser logicalClusterUser, long j) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return verifyConsumerLagMetrics(brokerWithMember, logicalClusterUser, Long.valueOf(j));
        }, String.format("consumer lag should be %d", Long.valueOf(j)));
    }

    private void assertConsumerLagEmitterLatency(BrokerWithMember brokerWithMember) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return verifyConsumerLagEmitterLatency(brokerWithMember);
        }, String.format("consumer lag emitter run should take 0 - 1 seconds.", new Object[0]));
        Assertions.assertTrue(verifyConsumerLagEmitterTimeSinceLastSuccessfulRun(brokerWithMember));
    }

    private boolean verifyConsumerLagMetrics(BrokerWithMember brokerWithMember, LogicalClusterUser logicalClusterUser, Long l) {
        DescribeGroupsResponseData.DescribedGroupMember member = brokerWithMember.member();
        HashMap hashMap = new HashMap();
        hashMap.put("tenant", logicalClusterUser.logicalClusterId);
        hashMap.put(TenantUtils.CONSUMER_GROUP_TAG, "test-group");
        hashMap.put("member", member.memberId());
        hashMap.put("client-id", member.clientId());
        hashMap.put("topic", "test-topic");
        hashMap.put("partition", TlbConst.TYPELIB_MINOR_VERSION_SHELL);
        Metrics metrics = brokerWithMember.broker().metrics();
        KafkaMetric kafkaMetric = metrics.metrics().get(metrics.metricName(ConsumerLagEmitter.ConsumerLagMetricName(), "tenant-metrics", hashMap));
        if (kafkaMetric == null) {
            return false;
        }
        return l.equals(kafkaMetric.metricValue());
    }

    private boolean verifyConsumerLagEmitterLatency(BrokerWithMember brokerWithMember) {
        Metrics metrics = brokerWithMember.broker().metrics();
        KafkaMetric kafkaMetric = metrics.metrics().get(metrics.metricName(ConsumerLagEmitter.ExecutionTimeMetricName(), ConsumerLagEmitter.MetricGroupName()));
        if (kafkaMetric == null) {
            return false;
        }
        return isMetricValueInRange(kafkaMetric);
    }

    private boolean verifyConsumerLagEmitterTimeSinceLastSuccessfulRun(BrokerWithMember brokerWithMember) {
        Metrics metrics = brokerWithMember.broker().metrics();
        KafkaMetric kafkaMetric = metrics.metrics().get(metrics.metricName(ConsumerLagEmitter.TimeSinceLastSuccessfulRunMsMetricName(), ConsumerLagEmitter.MetricGroupName()));
        if (kafkaMetric == null) {
            return false;
        }
        return isMetricValueInRange(kafkaMetric);
    }

    private boolean isMetricValueInRange(KafkaMetric kafkaMetric) {
        return ((Long) kafkaMetric.metricValue()).longValue() >= 0 && ((Long) kafkaMetric.metricValue()).longValue() < 1000;
    }
}
