/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.metrics;

import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.metrics.HotPartitionManager;
import io.confluent.kafka.multitenant.metrics.PartitionSensorBuilder;
import io.confluent.kafka.multitenant.metrics.PartitionSensors;
import io.confluent.kafka.multitenant.metrics.TenantMetrics;
import io.confluent.kafka.multitenant.metrics.utils.MetricSampler;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link HotPartitionManager}.
 *
 * <p>The outer class owns the shared fixture (mock clock, metrics registry, manager under test)
 * and the lookup/assertion helpers. The concrete {@link Metrics} registry is created by each
 * {@link Nested} class so the same manager can be exercised with the registry's expiration flag
 * switched on ({@link SensorExpirationEnabled}) or left at its default
 * ({@link SensorExpirationDisabled}).
 */
public class HotPartitionManagerTest {
    /** Tenant id used by every test; it is also the prefix of {@link #TOPIC}. */
    private static final String TENANT = "lkc-aaaaa";
    /** Tenant-prefixed topic name used by every test. */
    private static final String TOPIC = TENANT + "_topic-1";
    /** Name of the ingress hot-partition metric looked up in the {@code tenant-metrics} group. */
    private static final String HOT_PARTITION_IN_METRIC = "hot-partition-in";
    /** Name of the egress hot-partition metric looked up in the {@code tenant-metrics} group. */
    private static final String HOT_PARTITION_OUT_METRIC = "hot-partition-out";
    /** Value supplied for {@code confluent.broker.limit.producer.bytes.per.second}. */
    private static final long BROKER_LIMIT_IN_BYTES_PER_SECOND = 10000L;
    /** Value supplied for {@code confluent.broker.limit.consumer.bytes.per.second}. */
    private static final long BROKER_LIMIT_OUT_BYTES_PER_SECOND = 30000L;
    /** Number of partitions (0..9) that receive traffic in the loop-based tests. */
    private static final int PARTITIONS_WITH_TRAFFIC = 10;
    /** Partitions below this index are asserted to have no hot-partition metric. */
    private static final int FIRST_HOT_PARTITION = 8;

    private Time time;
    private Metrics metrics;
    private HotPartitionManager hotPartitionManager;

    /**
     * Builds the configuration map handed to {@link HotPartitionManager#configure}.
     *
     * @param brokerLimitInBytesPerSecond value for {@code confluent.broker.limit.producer.bytes.per.second}
     * @param brokerLimitOutBytesPerSecond value for {@code confluent.broker.limit.consumer.bytes.per.second}
     * @param hotPartitionRatio value for {@code confluent.hot.partition.ratio}
     * @return an immutable three-entry map
     */
    private Map<String, Object> getConfig(long brokerLimitInBytesPerSecond, long brokerLimitOutBytesPerSecond, double hotPartitionRatio) {
        return Map.of(
            "confluent.broker.limit.producer.bytes.per.second", brokerLimitInBytesPerSecond,
            "confluent.broker.limit.consumer.bytes.per.second", brokerLimitOutBytesPerSecond,
            "confluent.hot.partition.ratio", hotPartitionRatio);
    }

    /**
     * Creates the mock clock and a manager configured with the default limits and a ratio of 0.8.
     * The metrics registry is intentionally NOT created here; each nested class does that.
     */
    @BeforeEach
    public void setup() {
        time = new MockTime();
        // NOTE(review): the explicit MetricSampler casts come from the original source and are
        // kept in case the constructor is overloaded -- confirm and drop if redundant.
        hotPartitionManager = new HotPartitionManager((MetricSampler) new NoOpMetricSampler(), (MetricSampler) new NoOpMetricSampler());
        hotPartitionManager.configure(getConfig(BROKER_LIMIT_IN_BYTES_PER_SECOND, BROKER_LIMIT_OUT_BYTES_PER_SECOND, 0.8));
    }

    /**
     * Closes the metrics registry created by the nested class, if any.
     *
     * <p>The registry is assigned only in the nested {@code @BeforeEach} methods. If an earlier
     * setup step throws, this method still runs with {@code metrics == null}; the guard keeps a
     * secondary {@link NullPointerException} from obscuring the original failure.
     */
    @AfterEach
    public void tearDown() {
        if (metrics != null) {
            metrics.close();
        }
    }

    /**
     * Builds the tag map used to look up a per-partition tenant metric.
     *
     * @param tenant value of the {@code tenant} tag
     * @param tp partition supplying the {@code topic} and {@code partition} tags
     * @return a mutable map that also carries an empty {@code io-confluent-jmx-ignore} tag
     */
    private Map<String, String> tags(String tenant, TopicPartition tp) {
        Map<String, String> tags = new HashMap<>();
        tags.put("io-confluent-jmx-ignore", "");
        tags.put("tenant", tenant);
        tags.put("topic", tp.topic());
        tags.put("partition", Integer.toString(tp.partition()));
        return tags;
    }

    /**
     * Looks up a metric in the {@code tenant-metrics} group of the current registry.
     *
     * @param name metric name
     * @param tags metric tags
     * @return the registered metric, or {@code null} when the registry has no such metric
     */
    private KafkaMetric metric(String name, Map<String, String> tags) {
        return metrics.metrics().get(metrics.metricName(name, "tenant-metrics", tags));
    }

    /** Creates a request context for principal {@code userA} of {@link #TENANT}. */
    private TenantMetrics.MetricsRequestContext requestContext(String clientId, ApiKeys apiKey) {
        MultiTenantPrincipal principal = new MultiTenantPrincipal("userA", new TenantMetadata(TENANT, TENANT));
        return new TenantMetrics.MetricsRequestContext(principal, clientId, apiKey);
    }

    /** Asserts that the named hot-partition metric exists for {@code tp} and reports 1.0. */
    private void assertHotPartitionMetricPresent(String metricName, TopicPartition tp) {
        KafkaMetric hotPartitionMetric = metric(metricName, tags(TENANT, tp));
        Assertions.assertNotNull(hotPartitionMetric, "Hot partition metric should exist for " + tp);
        Assertions.assertEquals(1.0, hotPartitionMetric.metricValue());
    }

    /** Asserts that the named hot-partition metric is not registered for {@code tp}. */
    private void assertHotPartitionMetricAbsent(String metricName, TopicPartition tp) {
        KafkaMetric hotPartitionMetric = metric(metricName, tags(TENANT, tp));
        Assertions.assertNull(hotPartitionMetric, "Hot partition metric should not exist for " + tp);
    }

    /** Sampler stub whose {@code shouldSample()} always answers {@code true}. */
    private static class NoOpMetricSampler
    implements MetricSampler {
        private NoOpMetricSampler() {
        }

        public boolean shouldSample() {
            return true;
        }
    }

    /** Scenarios run against a registry constructed with its expiration flag set to {@code true}. */
    @Nested
    public class SensorExpirationEnabled {
        /** Creates a registry with a one-second sample window and expiration enabled. */
        @BeforeEach
        public void setup() {
            MetricConfig config = new MetricConfig().timeWindow(1L, TimeUnit.SECONDS);
            metrics = new Metrics(config, Collections.emptyList(), time, true);
        }

        /**
         * Records hot ingress traffic, advances the mock clock by ten minutes, waits (up to 60 s)
         * for the metric to disappear from the registry, then verifies that recording the same
         * traffic again brings the metric back.
         */
        @Test
        public void testHotPartitionMetricExpiration() throws InterruptedException {
            TenantMetrics.MetricsRequestContext context = requestContext("producer-1", ApiKeys.PRODUCE);
            PartitionSensors partitionSensors = new PartitionSensorBuilder(metrics, context).build();
            TopicPartition tp = new TopicPartition(TOPIC, 0);

            recordHotIngressAndVerify(partitionSensors, context, tp);

            time.sleep(TimeUnit.MINUTES.toMillis(10L));
            TestUtils.waitForCondition(
                () -> metric(HOT_PARTITION_IN_METRIC, tags(TENANT, tp)) == null,
                60000L,
                "Hot partition metric should have been removed for " + tp);

            recordHotIngressAndVerify(partitionSensors, context, tp);
        }

        /**
         * Records 8000 bytes / 800 records of ingress for {@code tp}, notifies the manager, and
         * asserts that the ingress hot-partition metric exists with value 1.0.
         */
        private void recordHotIngressAndVerify(PartitionSensors partitionSensors, TenantMetrics.MetricsRequestContext context, TopicPartition tp) {
            partitionSensors.recordStatsIn(tp, 8000L, 800L, time.milliseconds());
            // The manager is invoked twice in a row on purpose: the original test did so both
            // before and after expiration, and that call pattern is preserved unchanged.
            hotPartitionManager.mayRecordHotPartitionIn(metrics, (TenantMetrics.TenantMetricsContext) context, tp, time.milliseconds());
            hotPartitionManager.mayRecordHotPartitionIn(metrics, (TenantMetrics.TenantMetricsContext) context, tp, time.milliseconds());
            assertHotPartitionMetricPresent(HOT_PARTITION_IN_METRIC, tp);
        }
    }

    /** Scenarios run against a registry built with the two-argument {@link Metrics} constructor. */
    @Nested
    public class SensorExpirationDisabled {
        /** Creates a registry with a one-second sample window. */
        @BeforeEach
        public void setup() {
            MetricConfig config = new MetricConfig().timeWindow(1L, TimeUnit.SECONDS);
            metrics = new Metrics(config, time);
        }

        /** With the ratio reconfigured to -1.0, no ingress hot-partition metric may be registered. */
        @Test
        public void testDoNotRecordIngressWhenHotPartitionMetricIsDisabled() {
            hotPartitionManager.configure(getConfig(BROKER_LIMIT_IN_BYTES_PER_SECOND, BROKER_LIMIT_OUT_BYTES_PER_SECOND, -1.0));
            TenantMetrics.MetricsRequestContext context = requestContext("producer-1", ApiKeys.PRODUCE);
            PartitionSensors partitionSensors = new PartitionSensorBuilder(metrics, context).build();

            for (int partition = 0; partition < PARTITIONS_WITH_TRAFFIC; ++partition) {
                TopicPartition tp = new TopicPartition(TOPIC, partition);
                recordIngress(partitionSensors, context, tp);
                assertHotPartitionMetricAbsent(HOT_PARTITION_IN_METRIC, tp);
            }
        }

        /** With the ratio reconfigured to -1.0, no egress hot-partition metric may be registered. */
        @Test
        public void testDoNotRecordEgressWhenHotPartitionMetricIsDisabled() {
            hotPartitionManager.configure(getConfig(BROKER_LIMIT_IN_BYTES_PER_SECOND, BROKER_LIMIT_OUT_BYTES_PER_SECOND, -1.0));
            TenantMetrics.MetricsRequestContext context = requestContext("consumer-1", ApiKeys.FETCH);
            PartitionSensors partitionSensors = new PartitionSensorBuilder(metrics, context).build();

            for (int partition = 0; partition < PARTITIONS_WITH_TRAFFIC; ++partition) {
                TopicPartition tp = new TopicPartition(TOPIC, partition);
                recordEgress(partitionSensors, context, tp);
                assertHotPartitionMetricAbsent(HOT_PARTITION_OUT_METRIC, tp);
            }
        }

        /**
         * Feeds partitions 0..9 with {@code 1000 * partition} ingress bytes and expects the metric
         * only on partitions 8 and 9 (presumably because 8000 is 0.8 of the 10000 limit -- the
         * threshold logic itself lives in the manager). A partition that never saw traffic must
         * stay without a metric even when the manager is asked about it.
         */
        @Test
        public void testRecordForIngressWiseHotPartitionsOnly() {
            TenantMetrics.MetricsRequestContext context = requestContext("producer-1", ApiKeys.PRODUCE);
            PartitionSensors partitionSensors = new PartitionSensorBuilder(metrics, context).build();

            for (int partition = 0; partition < PARTITIONS_WITH_TRAFFIC; ++partition) {
                recordIngress(partitionSensors, context, new TopicPartition(TOPIC, partition));
            }

            for (int partition = 0; partition < FIRST_HOT_PARTITION; ++partition) {
                assertHotPartitionMetricAbsent(HOT_PARTITION_IN_METRIC, new TopicPartition(TOPIC, partition));
            }
            for (int partition = FIRST_HOT_PARTITION; partition < PARTITIONS_WITH_TRAFFIC; ++partition) {
                assertHotPartitionMetricPresent(HOT_PARTITION_IN_METRIC, new TopicPartition(TOPIC, partition));
            }

            TopicPartition partitionWithoutTraffic = new TopicPartition(TOPIC, 11);
            hotPartitionManager.mayRecordHotPartitionIn(metrics, (TenantMetrics.TenantMetricsContext) context, partitionWithoutTraffic, time.milliseconds());
            assertHotPartitionMetricAbsent(HOT_PARTITION_IN_METRIC, partitionWithoutTraffic);
        }

        /**
         * Egress counterpart of the ingress test: partitions 0..9 receive {@code 3000 * partition}
         * bytes and only partitions 8 and 9 are expected to carry the metric; an idle partition
         * must not get one.
         */
        @Test
        public void testRecordForEgressWiseHotPartitionsOnly() {
            TenantMetrics.MetricsRequestContext context = requestContext("consumer-1", ApiKeys.FETCH);
            PartitionSensors partitionSensors = new PartitionSensorBuilder(metrics, context).build();

            for (int partition = 0; partition < PARTITIONS_WITH_TRAFFIC; ++partition) {
                recordEgress(partitionSensors, context, new TopicPartition(TOPIC, partition));
            }

            for (int partition = 0; partition < FIRST_HOT_PARTITION; ++partition) {
                assertHotPartitionMetricAbsent(HOT_PARTITION_OUT_METRIC, new TopicPartition(TOPIC, partition));
            }
            for (int partition = FIRST_HOT_PARTITION; partition < PARTITIONS_WITH_TRAFFIC; ++partition) {
                assertHotPartitionMetricPresent(HOT_PARTITION_OUT_METRIC, new TopicPartition(TOPIC, partition));
            }

            TopicPartition partitionWithoutTraffic = new TopicPartition(TOPIC, 11);
            hotPartitionManager.mayRecordHotPartitionOut(metrics, context, partitionWithoutTraffic, time.milliseconds());
            assertHotPartitionMetricAbsent(HOT_PARTITION_OUT_METRIC, partitionWithoutTraffic);
        }

        /**
         * Records {@code 1000 * partition} bytes and {@code 100 * partition} records of ingress
         * for {@code tp}, then lets the manager evaluate the partition once.
         */
        private void recordIngress(PartitionSensors partitionSensors, TenantMetrics.MetricsRequestContext context, TopicPartition tp) {
            int partition = tp.partition();
            partitionSensors.recordStatsIn(tp, 1000L * partition, 100L * partition, time.milliseconds());
            hotPartitionManager.mayRecordHotPartitionIn(metrics, (TenantMetrics.TenantMetricsContext) context, tp, time.milliseconds());
        }

        /**
         * Records {@code 3000 * partition} bytes and {@code 100 * partition} records of egress
         * for {@code tp}, then lets the manager evaluate the partition once.
         */
        private void recordEgress(PartitionSensors partitionSensors, TenantMetrics.MetricsRequestContext context, TopicPartition tp) {
            int partition = tp.partition();
            partitionSensors.recordStatsOut(tp, 3000L * partition, 100L * partition, time.milliseconds());
            hotPartitionManager.mayRecordHotPartitionOut(metrics, context, tp, time.milliseconds());
        }
    }
}

