package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/MetricsIntegrationTest.class */
public class MetricsIntegrationTest {
    // Number of brokers handed to the embedded cluster below.
    private static final int NUM_BROKERS = 1;

    // Embedded Kafka cluster shared by all tests of this class (JUnit class rule).
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    // Wait bound taken from IntegrationTestUtils.DEFAULT_TIMEOUT (presumably milliseconds -- confirm).
    private final long timeout = IntegrationTestUtils.DEFAULT_TIMEOUT;
    // Value configured as "application.id" and expected in the application-id metric.
    private static final String APPLICATION_ID_VALUE = "stream-metrics-test";

    // Metric group names. Note that the client and thread groups share the same value.
    private static final String STREAM_CLIENT_NODE_METRICS = "stream-metrics";
    private static final String STREAM_THREAD_NODE_METRICS = "stream-metrics";
    private static final String STREAM_TASK_NODE_METRICS = "stream-task-metrics";
    private static final String STREAM_PROCESSOR_NODE_METRICS = "stream-processor-node-metrics";
    private static final String STREAM_CACHE_NODE_METRICS = "stream-record-cache-metrics";
    private static final String STREAM_STORE_IN_MEMORY_STATE_METRICS = "stream-in-memory-state-metrics";
    private static final String STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS = "stream-in-memory-lru-state-metrics";
    private static final String STREAM_STORE_ROCKSDB_STATE_METRICS = "stream-rocksdb-state-metrics";
    private static final String STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS = "stream-rocksdb-window-state-metrics";
    private static final String STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS = "stream-rocksdb-session-state-metrics";

    // Metric names checked in the "stream-metrics" group.
    private static final String VERSION = "version";
    private static final String COMMIT_ID = "commit-id";
    private static final String APPLICATION_ID = "application-id";
    private static final String TOPOLOGY_DESCRIPTION = "topology-description";
    private static final String STATE = "state";

    // State-store metric names: latency (avg/max) per store operation.
    private static final String PUT_LATENCY_AVG = "put-latency-avg";
    private static final String PUT_LATENCY_MAX = "put-latency-max";
    private static final String PUT_IF_ABSENT_LATENCY_AVG = "put-if-absent-latency-avg";
    private static final String PUT_IF_ABSENT_LATENCY_MAX = "put-if-absent-latency-max";
    private static final String GET_LATENCY_AVG = "get-latency-avg";
    private static final String GET_LATENCY_MAX = "get-latency-max";
    private static final String DELETE_LATENCY_AVG = "delete-latency-avg";
    private static final String DELETE_LATENCY_MAX = "delete-latency-max";
    private static final String PUT_ALL_LATENCY_AVG = "put-all-latency-avg";
    private static final String PUT_ALL_LATENCY_MAX = "put-all-latency-max";
    private static final String ALL_LATENCY_AVG = "all-latency-avg";
    private static final String ALL_LATENCY_MAX = "all-latency-max";
    private static final String RANGE_LATENCY_AVG = "range-latency-avg";
    private static final String RANGE_LATENCY_MAX = "range-latency-max";
    private static final String FLUSH_LATENCY_AVG = "flush-latency-avg";
    private static final String FLUSH_LATENCY_MAX = "flush-latency-max";
    private static final String RESTORE_LATENCY_AVG = "restore-latency-avg";
    private static final String RESTORE_LATENCY_MAX = "restore-latency-max";

    // State-store metric names: rate/total per store operation (there is no "get-total" constant).
    private static final String PUT_RATE = "put-rate";
    private static final String PUT_TOTAL = "put-total";
    private static final String PUT_IF_ABSENT_RATE = "put-if-absent-rate";
    private static final String PUT_IF_ABSENT_TOTAL = "put-if-absent-total";
    private static final String GET_RATE = "get-rate";
    private static final String DELETE_RATE = "delete-rate";
    private static final String DELETE_TOTAL = "delete-total";
    private static final String PUT_ALL_RATE = "put-all-rate";
    private static final String PUT_ALL_TOTAL = "put-all-total";
    private static final String ALL_RATE = "all-rate";
    private static final String ALL_TOTAL = "all-total";
    private static final String RANGE_RATE = "range-rate";
    private static final String RANGE_TOTAL = "range-total";
    private static final String FLUSH_RATE = "flush-rate";
    private static final String FLUSH_TOTAL = "flush-total";
    private static final String RESTORE_RATE = "restore-rate";
    private static final String RESTORE_TOTAL = "restore-total";

    // Metric names for process/punctuate/create/destroy/forward; the process and punctuate
    // names are checked in both the thread-level and the processor-node-level groups.
    private static final String PROCESS_LATENCY_AVG = "process-latency-avg";
    private static final String PROCESS_LATENCY_MAX = "process-latency-max";
    private static final String PUNCTUATE_LATENCY_AVG = "punctuate-latency-avg";
    private static final String PUNCTUATE_LATENCY_MAX = "punctuate-latency-max";
    private static final String CREATE_LATENCY_AVG = "create-latency-avg";
    private static final String CREATE_LATENCY_MAX = "create-latency-max";
    private static final String DESTROY_LATENCY_AVG = "destroy-latency-avg";
    private static final String DESTROY_LATENCY_MAX = "destroy-latency-max";
    private static final String PROCESS_RATE = "process-rate";
    private static final String PROCESS_TOTAL = "process-total";
    private static final String PUNCTUATE_RATE = "punctuate-rate";
    private static final String PUNCTUATE_TOTAL = "punctuate-total";
    private static final String CREATE_RATE = "create-rate";
    private static final String CREATE_TOTAL = "create-total";
    private static final String DESTROY_RATE = "destroy-rate";
    private static final String DESTROY_TOTAL = "destroy-total";
    private static final String FORWARD_TOTAL = "forward-total";

    // Substring looked for in group names when verifying that metrics were deregistered.
    private static final String STREAM_STRING = "stream";

    // Commit/poll/task/skipped-record metric names; the commit names are checked at
    // both thread level and task level.
    private static final String COMMIT_LATENCY_AVG = "commit-latency-avg";
    private static final String COMMIT_LATENCY_MAX = "commit-latency-max";
    private static final String POLL_LATENCY_AVG = "poll-latency-avg";
    private static final String POLL_LATENCY_MAX = "poll-latency-max";
    private static final String COMMIT_RATE = "commit-rate";
    private static final String COMMIT_TOTAL = "commit-total";
    private static final String POLL_RATE = "poll-rate";
    private static final String POLL_TOTAL = "poll-total";
    private static final String TASK_CREATED_RATE = "task-created-rate";
    private static final String TASK_CREATED_TOTAL = "task-created-total";
    private static final String TASK_CLOSED_RATE = "task-closed-rate";
    private static final String TASK_CLOSED_TOTAL = "task-closed-total";
    private static final String SKIPPED_RECORDS_RATE = "skipped-records-rate";
    private static final String SKIPPED_RECORDS_TOTAL = "skipped-records-total";

    // Task-level lateness metric names.
    private static final String RECORD_LATENESS_AVG = "record-lateness-avg";
    private static final String RECORD_LATENESS_MAX = "record-lateness-max";

    // Record-cache hit-ratio metric names. The *_BEFORE_24 spellings are the ones expected
    // by checkCacheMetrics whenever the metrics version passed to it is not "latest".
    private static final String HIT_RATIO_AVG_BEFORE_24 = "hitRatio-avg";
    private static final String HIT_RATIO_MIN_BEFORE_24 = "hitRatio-min";
    private static final String HIT_RATIO_MAX_BEFORE_24 = "hitRatio-max";
    private static final String HIT_RATIO_AVG = "hit-ratio-avg";
    private static final String HIT_RATIO_MIN = "hit-ratio-min";
    private static final String HIT_RATIO_MAX = "hit-ratio-max";

    // RocksDB metric names.
    private static final String BYTES_WRITTEN_RATE = "bytes-written-rate";
    private static final String BYTES_WRITTEN_TOTAL = "bytes-written-total";
    private static final String BYTES_READ_RATE = "bytes-read-rate";
    private static final String BYTES_READ_TOTAL = "bytes-read-total";
    private static final String MEMTABLE_BYTES_FLUSHED_RATE = "memtable-bytes-flushed-rate";
    private static final String MEMTABLE_BYTES_FLUSHED_TOTAL = "memtable-bytes-flushed-total";
    private static final String MEMTABLE_HIT_RATIO = "memtable-hit-ratio";
    private static final String MEMTABLE_FLUSH_TIME_AVG = "memtable-flush-time-avg";
    private static final String MEMTABLE_FLUSH_TIME_MIN = "memtable-flush-time-min";
    private static final String MEMTABLE_FLUSH_TIME_MAX = "memtable-flush-time-max";
    private static final String WRITE_STALL_DURATION_AVG = "write-stall-duration-avg";
    private static final String WRITE_STALL_DURATION_TOTAL = "write-stall-duration-total";
    private static final String BLOCK_CACHE_DATA_HIT_RATIO = "block-cache-data-hit-ratio";
    private static final String BLOCK_CACHE_INDEX_HIT_RATIO = "block-cache-index-hit-ratio";
    private static final String BLOCK_CACHE_FILTER_HIT_RATIO = "block-cache-filter-hit-ratio";
    private static final String BYTES_READ_DURING_COMPACTION_RATE = "bytes-read-compaction-rate";
    private static final String BYTES_WRITTEN_DURING_COMPACTION_RATE = "bytes-written-compaction-rate";
    private static final String COMPACTION_TIME_AVG = "compaction-time-avg";
    private static final String COMPACTION_TIME_MIN = "compaction-time-min";
    private static final String COMPACTION_TIME_MAX = "compaction-time-max";
    private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
    private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors-total";

    // Names of the state stores materialized by the test topologies.
    private static final String TIME_WINDOWED_AGGREGATED_STREAM_STORE = "time-windowed-aggregated-stream-store";
    private static final String SESSION_AGGREGATED_STREAM_STORE = "session-aggregated-stream-store";
    private static final String MY_STORE_IN_MEMORY = "myStoreInMemory";
    private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
    private static final String MY_STORE_LRU_MAP = "myStoreLruMap";

    // Topics created before and deleted after every test.
    private static final String STREAM_INPUT = "STREAM_INPUT";
    private static final String STREAM_OUTPUT_1 = "STREAM_OUTPUT_1";
    private static final String STREAM_OUTPUT_2 = "STREAM_OUTPUT_2";
    private static final String STREAM_OUTPUT_3 = "STREAM_OUTPUT_3";
    private static final String STREAM_OUTPUT_4 = "STREAM_OUTPUT_4";

    // Per-test state: topology builder, Streams configuration and the application under test.
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;

    /**
     * Prepares a fresh topology builder, creates the input/output topics on the embedded
     * cluster and assembles the Streams configuration used by the test.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    @Before
    public void before() throws InterruptedException {
        builder = new StreamsBuilder();
        CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);

        final Properties config = new Properties();
        config.put("application.id", APPLICATION_ID_VALUE);
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("default.key.serde", Serdes.Integer().getClass());
        config.put("default.value.serde", Serdes.String().getClass());
        // DEBUG recording level; one test later lowers this to INFO.
        config.put("metrics.recording.level", Sensor.RecordingLevel.DEBUG.name);
        // 10 MiB record cache.
        config.put("cache.max.bytes.buffering", 10 * 1024 * 1024L);
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration = config;
    }

    /**
     * Tears down the per-test resources.
     *
     * <p>The Streams application is closed here as well, because a test that fails before it
     * reaches {@code closeApplication()} would otherwise leave the instance running while its
     * topics are being deleted. Closing an already closed instance is expected to be a no-op.
     *
     * @throws InterruptedException if topic deletion is interrupted
     */
    @After
    public void after() throws InterruptedException {
        try {
            if (kafkaStreams != null) {
                // Bounded wait so a stuck shutdown cannot hang the teardown forever.
                kafkaStreams.close(Duration.ofMillis(timeout));
            }
        } finally {
            CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
        }
    }

    /**
     * Builds the topology, creates the {@link KafkaStreams} instance, verifies the client-level
     * metrics that must already be present in state {@code CREATED}, then starts the
     * application and waits until it reports {@code RUNNING}.
     *
     * @throws InterruptedException if waiting for the state transition is interrupted
     */
    private void startApplication() throws InterruptedException {
        final Topology topology = builder.build();
        kafkaStreams = new KafkaStreams(topology, streamsConfiguration);

        // These metrics are checked before start(), i.e. while the client is still CREATED.
        verifyStateMetric(KafkaStreams.State.CREATED);
        verifyTopologyDescriptionMetric(topology.describe().toString());
        verifyApplicationIdMetric(APPLICATION_ID_VALUE);

        kafkaStreams.start();
        // The message is derived from the same value as the wait bound so the two cannot drift apart.
        TestUtils.waitForCondition(
            () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
            timeout,
            () -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms");
    }

    /**
     * Produces two records ("A" then "B") with the same key to the input topic, each stamped
     * with a timestamp read from a {@link MockTime}.
     *
     * <p>NOTE(review): the {@code MockTime} is constructed with
     * {@code max(duration, DEFAULT_TIMEOUT)}; presumably this is its auto-tick so the two
     * timestamps fall into different segments -- confirm against {@code MockTime}.
     *
     * @param duration the segment interval the timestamps should be spread across
     * @throws Exception if producing fails
     */
    private void produceRecordsForTwoSegments(Duration duration) throws Exception {
        final MockTime mockTime = new MockTime(Math.max(duration.toMillis(), IntegrationTestUtils.DEFAULT_TIMEOUT));
        // Plain record key; it is unrelated to the number of brokers.
        final int key = 1;
        for (final String value : new String[] {"A", "B"}) {
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                STREAM_INPUT,
                Collections.singletonList(new KeyValue<>(key, value)),
                TestUtils.producerConfig(
                    CLUSTER.bootstrapServers(),
                    IntegerSerializer.class,
                    StringSerializer.class,
                    new Properties()),
                mockTime.milliseconds());
        }
    }

    /**
     * Blocks until at least two records have been read from {@code STREAM_OUTPUT_1}.
     *
     * @throws Exception if the records do not arrive or consuming fails
     */
    private void waitUntilAllRecordsAreConsumed() throws Exception {
        final Properties consumerProps = TestUtils.consumerConfig(
            CLUSTER.bootstrapServers(),
            "consumerApp",
            LongDeserializer.class,
            LongDeserializer.class,
            new Properties());
        final int expectedRecords = 2;
        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProps, STREAM_OUTPUT_1, expectedRecords);
    }

    /**
     * Closes the Streams application, removes its local state and waits until the client
     * reports {@code NOT_RUNNING}.
     *
     * @throws Exception if cleaning up local state fails or the state is not reached in time
     */
    private void closeApplication() throws Exception {
        kafkaStreams.close();
        kafkaStreams.cleanUp();
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        // The message is derived from the same value as the wait bound so the two cannot drift apart.
        TestUtils.waitForCondition(
            () -> kafkaStreams.state() == KafkaStreams.State.NOT_RUNNING,
            timeout,
            () -> "Kafka Streams application did not reach state NOT_RUNNING in " + timeout + " ms");
    }

    /** Runs the all-levels metrics check expecting the metric names of version "0.10.0-2.3". */
    @Test
    public void shouldAddMetricsOnAllLevelsWithBuiltInMetricsVersion0100To23() throws Exception {
        final String builtInMetricsVersion = "0.10.0-2.3";
        shouldAddMetricsOnAllLevels(builtInMetricsVersion);
    }

    /**
     * Builds a topology that pipes the input topic through three cached tables (in-memory,
     * persistent key-value and LRU-map stores), starts it and verifies the metrics registered
     * on thread, task, processor-node, state-store, RocksDB and cache level. Finally closes
     * the application and verifies that the metrics were deregistered.
     *
     * @param str built-in metrics version; in this method it is only forwarded to the cache
     *            metrics check, where it selects the expected metric names and counts
     * @throws Exception if the application cannot be started or stopped
     */
    private void shouldAddMetricsOnAllLevels(String str) throws Exception {
        builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
            .to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String()));
        // Explicit <Integer, String> witnesses keep the tables typed instead of <Object, Object>.
        builder.table(
                STREAM_OUTPUT_1,
                Materialized.<Integer, String>as(Stores.inMemoryKeyValueStore(MY_STORE_IN_MEMORY)).withCachingEnabled())
            .toStream()
            .to(STREAM_OUTPUT_2);
        builder.table(
                STREAM_OUTPUT_2,
                Materialized.<Integer, String>as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled())
            .toStream()
            .to(STREAM_OUTPUT_3);
        builder.table(
                STREAM_OUTPUT_3,
                Materialized.<Integer, String>as(Stores.lruMap(MY_STORE_LRU_MAP, 10000)).withCachingEnabled())
            .toStream()
            .to(STREAM_OUTPUT_4);

        startApplication();

        verifyStateMetric(KafkaStreams.State.RUNNING);
        checkThreadLevelMetrics();
        checkTaskLevelMetrics();
        checkProcessorLevelMetrics();
        checkKeyValueStoreMetricsByGroup(STREAM_STORE_IN_MEMORY_STATE_METRICS);
        checkKeyValueStoreMetricsByGroup(STREAM_STORE_ROCKSDB_STATE_METRICS);
        checkKeyValueStoreMetricsByGroup(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS);
        checkRocksDBMetricsByTag("rocksdb-state-id");
        checkCacheMetrics(str);

        closeApplication();

        checkMetricsDeregistration();
    }

    /**
     * Aggregates the input stream into a 50 ms time-windowed store, produces two records and
     * verifies the window-store and RocksDB metrics, then checks deregistration after close.
     */
    @Test
    public void shouldAddMetricsForWindowStore() throws Exception {
        final Duration windowSize = Duration.ofMillis(50L);
        builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
            .groupByKey()
            .windowedBy(TimeWindows.of(windowSize).grace(Duration.ZERO))
            .aggregate(
                () -> 0L,
                // The aggregate is passed through unchanged; only store activity matters here.
                (aggKey, newValue, aggValue) -> aggValue,
                // Without the explicit witness V is inferred as Object and
                // withValueSerde(Serdes.Long()) does not type-check.
                Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>as(TIME_WINDOWED_AGGREGATED_STREAM_STORE)
                    .withValueSerde(Serdes.Long())
                    .withRetention(windowSize))
            .toStream()
            .map((windowedKey, value) -> KeyValue.pair(value, value))
            .to(STREAM_OUTPUT_1, Produced.with(Serdes.Long(), Serdes.Long()));

        produceRecordsForTwoSegments(windowSize);

        startApplication();

        verifyStateMetric(KafkaStreams.State.RUNNING);
        waitUntilAllRecordsAreConsumed();
        checkWindowStoreMetrics();
        checkRocksDBMetricsByTag("rocksdb-window-state-id");

        closeApplication();

        checkMetricsDeregistration();
    }

    /**
     * Aggregates the input stream into a session store with a 50 ms inactivity gap, produces
     * two records and verifies the session-store and RocksDB metrics, then checks
     * deregistration after close.
     */
    @Test
    public void shouldAddMetricsForSessionStore() throws Exception {
        final Duration inactivityGap = Duration.ofMillis(50L);
        builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
            .groupByKey()
            .windowedBy(SessionWindows.with(inactivityGap).grace(Duration.ZERO))
            .aggregate(
                () -> 0L,
                // The aggregate is passed through unchanged; only store activity matters here.
                (aggKey, newValue, aggValue) -> aggValue,
                // Session merger: keeps the first of the two aggregates.
                (aggKey, leftAggValue, rightAggValue) -> leftAggValue,
                // Without the explicit witness V is inferred as Object and
                // withValueSerde(Serdes.Long()) does not type-check.
                Materialized.<Integer, Long, SessionStore<Bytes, byte[]>>as(SESSION_AGGREGATED_STREAM_STORE)
                    .withValueSerde(Serdes.Long())
                    .withRetention(inactivityGap))
            .toStream()
            .map((windowedKey, value) -> KeyValue.pair(value, value))
            .to(STREAM_OUTPUT_1, Produced.with(Serdes.Long(), Serdes.Long()));

        produceRecordsForTwoSegments(inactivityGap);

        startApplication();

        verifyStateMetric(KafkaStreams.State.RUNNING);
        waitUntilAllRecordsAreConsumed();
        checkSessionStoreMetrics();
        checkRocksDBMetricsByTag("rocksdb-session-state-id");

        closeApplication();

        checkMetricsDeregistration();
    }

    /**
     * Starts a topology with a persistent key-value store while the metrics recording level
     * is lowered to INFO and asserts that no metric in group "stream-state-metrics" carries
     * the "rocksdb-state-id" tag.
     */
    @Test
    public void shouldNotAddRocksDBMetricsIfRecordingLevelIsInfo() throws Exception {
        builder.table(
                STREAM_INPUT,
                Materialized.<Integer, String>as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled())
            .toStream()
            .to(STREAM_OUTPUT_1);
        // Overrides the DEBUG level configured in before().
        streamsConfiguration.put("metrics.recording.level", Sensor.RecordingLevel.INFO.name);

        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
        kafkaStreams.start();
        TestUtils.waitForCondition(
            () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
            timeout,
            () -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms");

        // Typed copy: with a raw ArrayList the lambda parameter would degrade to Object.
        final List<Metric> rocksDbMetrics = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
            .filter(metric -> metric.metricName().group().equals("stream-state-metrics")
                && metric.metricName().tags().containsKey("rocksdb-state-id"))
            .collect(Collectors.toList());
        Assert.assertTrue(
            "No RocksDB metrics must be registered on recording level INFO but found " + rocksDbMetrics.size(),
            rocksDbMetrics.isEmpty());

        closeApplication();
    }

    /**
     * Asserts that the client-level "state" metric exists exactly once and reports the given state.
     *
     * @param state the expected {@link KafkaStreams.State}
     */
    private void verifyStateMetric(KafkaStreams.State state) {
        verifyClientLevelMetric(STATE, state);
    }

    /**
     * Asserts that the client-level "topology-description" metric exists exactly once and
     * equals the given description.
     *
     * @param str the expected topology description
     */
    private void verifyTopologyDescriptionMetric(String str) {
        verifyClientLevelMetric(TOPOLOGY_DESCRIPTION, str);
    }

    /**
     * Asserts that the client-level "application-id" metric exists exactly once and equals
     * the given application id.
     *
     * @param str the expected application id
     */
    private void verifyApplicationIdMetric(String str) {
        verifyClientLevelMetric(APPLICATION_ID, str);
    }

    /**
     * Looks up the metric with the given name in the client-level group and asserts that
     * there is exactly one such metric and that its value equals {@code expectedValue}.
     */
    private void verifyClientLevelMetric(final String metricName, final Object expectedValue) {
        // Typed copy: with a raw ArrayList the lambda parameter would degrade to Object.
        final List<Metric> matching = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
            .filter(metric -> metric.metricName().name().equals(metricName)
                && metric.metricName().group().equals(STREAM_CLIENT_NODE_METRICS))
            .collect(Collectors.toList());
        // Exactly one metric per client is expected; this is unrelated to the broker count.
        MatcherAssert.assertThat(matching.size(), CoreMatchers.is(1));
        MatcherAssert.assertThat(matching.get(0).metricValue(), CoreMatchers.is(expectedValue));
    }

    /**
     * Asserts that every expected metric in group "stream-metrics" is registered exactly once
     * and has a non-null value.
     */
    private void checkThreadLevelMetrics() {
        // Typed copy: with a raw ArrayList the lambda parameter would degrade to Object.
        final List<Metric> threadMetrics = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
            .filter(metric -> metric.metricName().group().equals(STREAM_THREAD_NODE_METRICS))
            .collect(Collectors.toList());

        // Client- and thread-level metrics share the same group name, so both are listed here.
        final String[] expectedNames = {
            VERSION,
            COMMIT_ID,
            APPLICATION_ID,
            TOPOLOGY_DESCRIPTION,
            STATE,
            COMMIT_LATENCY_AVG,
            COMMIT_LATENCY_MAX,
            POLL_LATENCY_AVG,
            POLL_LATENCY_MAX,
            PROCESS_LATENCY_AVG,
            PROCESS_LATENCY_MAX,
            PUNCTUATE_LATENCY_AVG,
            PUNCTUATE_LATENCY_MAX,
            COMMIT_RATE,
            COMMIT_TOTAL,
            POLL_RATE,
            POLL_TOTAL,
            PROCESS_RATE,
            PROCESS_TOTAL,
            PUNCTUATE_RATE,
            PUNCTUATE_TOTAL,
            TASK_CREATED_RATE,
            TASK_CREATED_TOTAL,
            TASK_CLOSED_RATE,
            TASK_CLOSED_TOTAL,
            SKIPPED_RECORDS_RATE,
            SKIPPED_RECORDS_TOTAL
        };
        // One instance of each metric is expected; this is unrelated to the broker count.
        final int expectedCount = 1;
        for (final String name : expectedNames) {
            checkMetricByName(threadMetrics, name, expectedCount);
        }
    }

    /**
     * Asserts the number of commit and record-lateness metrics registered in the task-level group.
     *
     * <p>NOTE(review): the expected counts (5 commit metrics, 4 lateness metrics) presumably
     * follow from the number of tasks in the all-levels topology -- confirm before changing
     * that topology.
     */
    private void checkTaskLevelMetrics() {
        // Typed copy: with a raw ArrayList the lambda parameter would degrade to Object.
        final List<Metric> taskMetrics = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
            .filter(metric -> metric.metricName().group().equals(STREAM_TASK_NODE_METRICS))
            .collect(Collectors.toList());

        final int expectedCommitMetrics = 5;
        checkMetricByName(taskMetrics, COMMIT_LATENCY_AVG, expectedCommitMetrics);
        checkMetricByName(taskMetrics, COMMIT_LATENCY_MAX, expectedCommitMetrics);
        checkMetricByName(taskMetrics, COMMIT_RATE, expectedCommitMetrics);
        checkMetricByName(taskMetrics, COMMIT_TOTAL, expectedCommitMetrics);

        final int expectedLatenessMetrics = 4;
        checkMetricByName(taskMetrics, RECORD_LATENESS_AVG, expectedLatenessMetrics);
        checkMetricByName(taskMetrics, RECORD_LATENESS_MAX, expectedLatenessMetrics);
    }

    /**
     * Asserts that each expected processor-node metric is registered 18 times and has a
     * non-null value.
     *
     * <p>NOTE(review): 18 presumably matches the number of processor nodes in the all-levels
     * topology -- confirm before changing that topology.
     */
    private void checkProcessorLevelMetrics() {
        // Typed copy: with a raw ArrayList the lambda parameter would degrade to Object.
        final List<Metric> processorMetrics = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
            .filter(metric -> metric.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS))
            .collect(Collectors.toList());

        final String[] expectedNames = {
            PROCESS_LATENCY_AVG,
            PROCESS_LATENCY_MAX,
            PUNCTUATE_LATENCY_AVG,
            PUNCTUATE_LATENCY_MAX,
            CREATE_LATENCY_AVG,
            CREATE_LATENCY_MAX,
            DESTROY_LATENCY_AVG,
            DESTROY_LATENCY_MAX,
            PROCESS_RATE,
            PROCESS_TOTAL,
            PUNCTUATE_RATE,
            PUNCTUATE_TOTAL,
            CREATE_RATE,
            CREATE_TOTAL,
            DESTROY_RATE,
            DESTROY_TOTAL,
            FORWARD_TOTAL
        };
        final int expectedCount = 18;
        for (final String name : expectedNames) {
            checkMetricByName(processorMetrics, name, expectedCount);
        }
    }

    /**
     * Asserts that each expected RocksDB metric is registered exactly once among the metrics
     * of group "stream-state-metrics" that carry the given tag.
     *
     * @param str tag key identifying the store type, e.g. "rocksdb-state-id"
     */
    private void checkRocksDBMetricsByTag(String str) {
        // Typed copy: with a raw ArrayList the lambda parameter would degrade to Object.
        final List<Metric> rocksDbMetrics = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
            .filter(metric -> metric.metricName().group().equals("stream-state-metrics")
                && metric.metricName().tags().containsKey(str))
            .collect(Collectors.toList());

        final String[] expectedNames = {
            BYTES_WRITTEN_RATE,
            BYTES_WRITTEN_TOTAL,
            BYTES_READ_RATE,
            BYTES_READ_TOTAL,
            MEMTABLE_BYTES_FLUSHED_RATE,
            MEMTABLE_BYTES_FLUSHED_TOTAL,
            MEMTABLE_HIT_RATIO,
            WRITE_STALL_DURATION_AVG,
            WRITE_STALL_DURATION_TOTAL,
            BLOCK_CACHE_DATA_HIT_RATIO,
            BLOCK_CACHE_INDEX_HIT_RATIO,
            BLOCK_CACHE_FILTER_HIT_RATIO,
            BYTES_READ_DURING_COMPACTION_RATE,
            BYTES_WRITTEN_DURING_COMPACTION_RATE,
            NUMBER_OF_OPEN_FILES,
            NUMBER_OF_FILE_ERRORS
        };
        // One instance of each metric is expected; this is unrelated to the broker count.
        final int expectedCount = 1;
        for (final String name : expectedNames) {
            checkMetricByName(rocksDbMetrics, name, expectedCount);
        }
    }

    /**
     * Asserts that every key-value store metric is registered twice in the given group and
     * has a non-null value.
     *
     * @param str the metric group of the store type to check
     */
    private void checkKeyValueStoreMetricsByGroup(String str) {
        // Typed copy: with a raw ArrayList the lambda parameter would degrade to Object.
        final List<Metric> storeMetrics = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
            .filter(metric -> metric.metricName().group().equals(str))
            .collect(Collectors.toList());

        final String[] expectedNames = {
            PUT_LATENCY_AVG,
            PUT_LATENCY_MAX,
            PUT_IF_ABSENT_LATENCY_AVG,
            PUT_IF_ABSENT_LATENCY_MAX,
            GET_LATENCY_AVG,
            GET_LATENCY_MAX,
            DELETE_LATENCY_AVG,
            DELETE_LATENCY_MAX,
            PUT_ALL_LATENCY_AVG,
            PUT_ALL_LATENCY_MAX,
            ALL_LATENCY_AVG,
            ALL_LATENCY_MAX,
            RANGE_LATENCY_AVG,
            RANGE_LATENCY_MAX,
            FLUSH_LATENCY_AVG,
            FLUSH_LATENCY_MAX,
            RESTORE_LATENCY_AVG,
            RESTORE_LATENCY_MAX,
            PUT_RATE,
            PUT_TOTAL,
            PUT_IF_ABSENT_RATE,
            PUT_IF_ABSENT_TOTAL,
            GET_RATE,
            DELETE_RATE,
            DELETE_TOTAL,
            PUT_ALL_RATE,
            PUT_ALL_TOTAL,
            ALL_RATE,
            ALL_TOTAL,
            RANGE_RATE,
            RANGE_TOTAL,
            FLUSH_RATE,
            FLUSH_TOTAL,
            RESTORE_RATE,
            RESTORE_TOTAL
        };
        final int expectedCount = 2;
        for (final String name : expectedNames) {
            checkMetricByName(storeMetrics, name, expectedCount);
        }
    }

    /**
     * Asserts that no metric whose group name contains "stream" is still registered.
     * On failure the names of the leftover metrics are included in the assertion message.
     */
    private void checkMetricsDeregistration() {
        // Typed copy: with a raw ArrayList the lambda parameter would degrade to Object.
        final List<Metric> leftovers = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
            .filter(metric -> metric.metricName().group().contains(STREAM_STRING))
            .collect(Collectors.toList());
        final String leftoverNames = leftovers.stream()
            .map(metric -> metric.metricName().toString())
            .collect(Collectors.joining(", "));
        MatcherAssert.assertThat(
            "Streams metrics still registered after close: " + leftoverNames,
            leftovers.size(),
            CoreMatchers.is(0));
    }

    /**
     * Asserts the record-cache hit-ratio metrics.
     *
     * <p>For version "latest" the dash-separated names are expected 3 times each; for any
     * other version the legacy camel-case names are expected 6 times each.
     *
     * @param str the built-in metrics version
     */
    private void checkCacheMetrics(String str) {
        // Typed copy: with a raw ArrayList the lambda parameter would degrade to Object.
        final List<Metric> cacheMetrics = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
            .filter(metric -> metric.metricName().group().equals(STREAM_CACHE_NODE_METRICS))
            .collect(Collectors.toList());

        // Decide once instead of re-evaluating the version for every argument.
        final boolean latest = "latest".equals(str);
        final int expectedCount = latest ? 3 : 6;
        checkMetricByName(cacheMetrics, latest ? HIT_RATIO_AVG : HIT_RATIO_AVG_BEFORE_24, expectedCount);
        checkMetricByName(cacheMetrics, latest ? HIT_RATIO_MIN : HIT_RATIO_MIN_BEFORE_24, expectedCount);
        checkMetricByName(cacheMetrics, latest ? HIT_RATIO_MAX : HIT_RATIO_MAX_BEFORE_24, expectedCount);
    }

    /** Asserts the metrics registered for the RocksDB window store. */
    private void checkWindowStoreMetrics() {
        checkWindowedStoreMetricsByGroup(STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS);
    }

    /** Asserts the metrics registered for the RocksDB session store. */
    private void checkSessionStoreMetrics() {
        checkWindowedStoreMetricsByGroup(STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS);
    }

    /**
     * Shared check for window and session stores: in the given group the put, flush and
     * restore metrics must each be registered twice, while every other store metric
     * must not be registered at all.
     */
    private void checkWindowedStoreMetricsByGroup(final String group) {
        // Typed copy: with a raw ArrayList the lambda parameter would degrade to Object.
        final List<Metric> storeMetrics = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
            .filter(metric -> metric.metricName().group().equals(group))
            .collect(Collectors.toList());

        final String[] registeredNames = {
            PUT_LATENCY_AVG,
            PUT_LATENCY_MAX,
            FLUSH_LATENCY_AVG,
            FLUSH_LATENCY_MAX,
            RESTORE_LATENCY_AVG,
            RESTORE_LATENCY_MAX,
            PUT_RATE,
            PUT_TOTAL,
            FLUSH_RATE,
            FLUSH_TOTAL,
            RESTORE_RATE,
            RESTORE_TOTAL
        };
        for (final String name : registeredNames) {
            checkMetricByName(storeMetrics, name, 2);
        }

        final String[] absentNames = {
            PUT_IF_ABSENT_LATENCY_AVG,
            PUT_IF_ABSENT_LATENCY_MAX,
            GET_LATENCY_AVG,
            GET_LATENCY_MAX,
            DELETE_LATENCY_AVG,
            DELETE_LATENCY_MAX,
            PUT_ALL_LATENCY_AVG,
            PUT_ALL_LATENCY_MAX,
            ALL_LATENCY_AVG,
            ALL_LATENCY_MAX,
            RANGE_LATENCY_AVG,
            RANGE_LATENCY_MAX,
            PUT_IF_ABSENT_RATE,
            PUT_IF_ABSENT_TOTAL,
            GET_RATE,
            DELETE_RATE,
            DELETE_TOTAL,
            PUT_ALL_RATE,
            PUT_ALL_TOTAL,
            ALL_RATE,
            ALL_TOTAL,
            RANGE_RATE,
            RANGE_TOTAL
        };
        for (final String name : absentNames) {
            checkMetricByName(storeMetrics, name, 0);
        }
    }

    /**
     * Asserts that {@code list} contains exactly {@code i} metrics named {@code str} and
     * that each of them reports a non-null value.
     *
     * @param list the metrics to search
     * @param str  the metric name to look for
     * @param i    the expected number of metrics with that name
     */
    private void checkMetricByName(List<Metric> list, String str, int i) {
        final List<Metric> matching = new ArrayList<>();
        for (final Metric candidate : list) {
            if (candidate.metricName().name().equals(str)) {
                matching.add(candidate);
            }
        }

        final int actualCount = matching.size();
        Assert.assertEquals(
            "Size of metrics of type:'" + str + "' must be equal to " + i + " but it's equal to " + actualCount,
            i,
            actualCount);

        matching.forEach(found ->
            Assert.assertNotNull("Metric:'" + found.metricName() + "' must be not null", found.metricValue()));
    }
}
