package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.MultiTenantRequestContextTest;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import kafka.metrics.BrokerWithMember;
import kafka.metrics.ConsumerLagEmitter;
import kafka.metrics.ConsumerLagEmitterIntegrationTest;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
/* loaded from: input_file:io/confluent/kafka/multitenant/integration/test/MultiTenantConsumerLagEmitterIntegrationTest.class */
public class MultiTenantConsumerLagEmitterIntegrationTest extends AbstractMultiTenantKafkaIntegrationTest {
    private final SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
    private final String groupId = "test-group";
    private final String topic = "test-topic";

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.kafka.multitenant.integration.test.AbstractMultiTenantKafkaIntegrationTest
    public Properties nodeProps() {
        Properties nodeProps = super.nodeProps();
        nodeProps.put("confluent.consumer.lag.emitter.enabled", "true");
        nodeProps.put("confluent.consumer.lag.emitter.interval.ms", "1000");
        return nodeProps;
    }

    @ValueSource(strings = {"CLASSIC", "CONSUMER"})
    @ParameterizedTest
    public void testMultiTenantConsumerLagMetrics(String str) throws Throwable {
        setUp();
        createPhysicalAndLogicalClusters();
        LogicalClusterUser user = this.logicalCluster1.user(11);
        LogicalClusterUser user2 = this.logicalCluster2.user(21);
        KafkaProducer<String, String> createProducer = this.testHarness.createProducer(user, this.securityProtocol);
        KafkaProducer<String, String> createProducer2 = this.testHarness.createProducer(user2, this.securityProtocol);
        Properties properties = new Properties();
        properties.put("group.protocol", str);
        KafkaConsumer<String, String> createConsumer = this.testHarness.createConsumer(user, "test-group", this.securityProtocol, Optional.empty(), properties);
        KafkaConsumer<String, String> createConsumer2 = this.testHarness.createConsumer(user2, "test-group", this.securityProtocol);
        String str2 = user.tenantPrefix() + "test-topic";
        String str3 = user2.tenantPrefix() + "test-topic";
        this.physicalCluster.kafkaCluster().createTopic(str2, 1, 1);
        this.physicalCluster.kafkaCluster().createTopic(str3, 1, 1);
        KafkaTestUtils.sendRecords(createProducer, "test-topic", 0, 100);
        KafkaTestUtils.sendRecords(createProducer2, "test-topic", 0, 200);
        KafkaTestUtils.consumeRecords(createConsumer, "test-topic", 0, 100);
        KafkaTestUtils.consumeRecords(createConsumer2, "test-topic", 0, 200);
        createConsumer.commitSync();
        createConsumer2.commitSync();
        BrokerWithMember findBrokerWithMember = ConsumerLagEmitterIntegrationTest.findBrokerWithMember(this.physicalCluster.kafkaCluster().kafkaBrokers(), user.tenantPrefix() + "test-group", GroupProtocol.of(str));
        BrokerWithMember findBrokerWithMember2 = ConsumerLagEmitterIntegrationTest.findBrokerWithMember(this.physicalCluster.kafkaCluster().kafkaBrokers(), user2.tenantPrefix() + "test-group", GroupProtocol.CLASSIC);
        assertConsumerLag(findBrokerWithMember, user, str, 0L);
        assertConsumerLag(findBrokerWithMember2, user2, GroupProtocol.CLASSIC.toString(), 0L);
        KafkaTestUtils.sendRecords(createProducer, "test-topic", 100, 400);
        KafkaTestUtils.sendRecords(createProducer2, "test-topic", 200, 500);
        assertConsumerLag(findBrokerWithMember, user, str, 400L);
        assertConsumerLag(findBrokerWithMember2, user2, GroupProtocol.CLASSIC.toString(), 500L);
        assertConsumerLagEmitterLatency(findBrokerWithMember);
        KafkaTestUtils.consumeRecords(createConsumer, "test-topic", 100, 400);
        KafkaTestUtils.consumeRecords(createConsumer2, "test-topic", 200, 500);
        createConsumer.commitSync();
        createConsumer2.commitSync();
        assertConsumerLag(findBrokerWithMember, user, str, 0L);
        assertConsumerLag(findBrokerWithMember2, user2, GroupProtocol.CLASSIC.toString(), 0L);
        assertConsumerLagEmitterLatency(findBrokerWithMember);
        assertConsumerLagEmitterLatency(findBrokerWithMember2);
    }

    private void assertConsumerLag(BrokerWithMember brokerWithMember, LogicalClusterUser logicalClusterUser, String str, long j) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return verifyConsumerLagMetrics(brokerWithMember, logicalClusterUser, str, Long.valueOf(j));
        }, String.format("consumer lag should be %d", Long.valueOf(j)));
    }

    private void assertConsumerLagEmitterLatency(BrokerWithMember brokerWithMember) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return verifyConsumerLagEmitterLatency(brokerWithMember);
        }, String.format("consumer lag emitter run should take 0 - 1 seconds.", new Object[0]));
        Assertions.assertTrue(verifyConsumerLagEmitterTimeSinceLastSuccessfulRun(brokerWithMember));
    }

    private boolean verifyConsumerLagMetrics(BrokerWithMember brokerWithMember, LogicalClusterUser logicalClusterUser, String str, Long l) {
        DescribeGroupsResponseData.DescribedGroupMember member = brokerWithMember.member();
        HashMap hashMap = new HashMap();
        hashMap.put(MultiTenantRequestContextTest.TENANT_NAME, logicalClusterUser.logicalClusterId);
        hashMap.put("consumer-group", "test-group");
        hashMap.put("member", member.memberId());
        hashMap.put("client-id", member.clientId());
        hashMap.put("group-protocol", str);
        hashMap.put("topic", "test-topic");
        hashMap.put("partition", "0");
        Metrics metrics = brokerWithMember.broker().metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.metrics().get(metrics.metricName(ConsumerLagEmitter.ConsumerLagMetricName(), "tenant-metrics", hashMap));
        if (kafkaMetric == null) {
            return false;
        }
        return l.equals(kafkaMetric.metricValue());
    }

    private boolean verifyConsumerLagEmitterLatency(BrokerWithMember brokerWithMember) {
        Metrics metrics = brokerWithMember.broker().metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.metrics().get(metrics.metricName(ConsumerLagEmitter.ExecutionTimeMetricName(), ConsumerLagEmitter.MetricGroupName()));
        if (kafkaMetric == null) {
            return false;
        }
        return isMetricValueInRange(kafkaMetric);
    }

    private boolean verifyConsumerLagEmitterTimeSinceLastSuccessfulRun(BrokerWithMember brokerWithMember) {
        Metrics metrics = brokerWithMember.broker().metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.metrics().get(metrics.metricName(ConsumerLagEmitter.TimeSinceLastSuccessfulRunMsMetricName(), ConsumerLagEmitter.MetricGroupName()));
        if (kafkaMetric == null) {
            return false;
        }
        return isMetricValueInRange(kafkaMetric);
    }

    private boolean isMetricValueInRange(KafkaMetric kafkaMetric) {
        return ((Long) kafkaMetric.metricValue()).longValue() >= 0 && ((Long) kafkaMetric.metricValue()).longValue() < 1000;
    }
}
