package org.apache.kafka.streams.kstream.internals.suppress;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
import org.apache.kafka.test.MockInternalProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.class */
public class KTableSuppressProcessorMetricsTest {
    private static final long ARBITRARY_LONG = 5;
    private static final TaskId TASK_ID = new TaskId(0, 0);
    private final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
    private final String threadId = Thread.currentThread().getName();
    private final MetricName evictionTotalMetricLatest = new MetricName("suppression-emit-total", "stream-processor-node-metrics", "The total number of emitted records from the suppression buffer", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", this.threadId), Utils.mkEntry("task-id", TASK_ID.toString()), Utils.mkEntry("processor-node-id", "testNode")}));
    private final MetricName evictionRateMetricLatest = new MetricName("suppression-emit-rate", "stream-processor-node-metrics", "The average number of emitted records from the suppression buffer per second", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", this.threadId), Utils.mkEntry("task-id", TASK_ID.toString()), Utils.mkEntry("processor-node-id", "testNode")}));
    private final MetricName bufferSizeAvgMetricLatest = new MetricName("suppression-buffer-size-avg", "stream-state-metrics", "The average size of buffered records", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", this.threadId), Utils.mkEntry("task-id", TASK_ID.toString()), Utils.mkEntry("in-memory-suppression-state-id", "test-store")}));
    private final MetricName bufferSizeMaxMetricLatest = new MetricName("suppression-buffer-size-max", "stream-state-metrics", "The maximum size of buffered records", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", this.threadId), Utils.mkEntry("task-id", TASK_ID.toString()), Utils.mkEntry("in-memory-suppression-state-id", "test-store")}));
    private final MetricName bufferCountAvgMetricLatest = new MetricName("suppression-buffer-count-avg", "stream-state-metrics", "The average count of buffered records", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", this.threadId), Utils.mkEntry("task-id", TASK_ID.toString()), Utils.mkEntry("in-memory-suppression-state-id", "test-store")}));
    private final MetricName bufferCountMaxMetricLatest = new MetricName("suppression-buffer-count-max", "stream-state-metrics", "The maximum count of buffered records", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", this.threadId), Utils.mkEntry("task-id", TASK_ID.toString()), Utils.mkEntry("in-memory-suppression-state-id", "test-store")}));

    @Test
    public void shouldRecordMetricsWithBuiltInMetricsVersionLatest() {
        StateStore build = new InMemoryTimeOrderedKeyValueBuffer.Builder("test-store", Serdes.String(), Serdes.Long()).withLoggingDisabled().build();
        Processor processor = new KTableSuppressProcessorSupplier(Suppressed.untilTimeLimit(Duration.ofDays(100L), Suppressed.BufferConfig.maxRecords(1L)), "test-store", (KTableImpl) EasyMock.mock(KTableImpl.class)).get();
        this.streamsConfig.setProperty("built.in.metrics.version", "latest");
        MockInternalProcessorContext mockInternalProcessorContext = new MockInternalProcessorContext(this.streamsConfig, TASK_ID, TestUtils.tempDirectory());
        SystemTime systemTime = new SystemTime();
        mockInternalProcessorContext.setCurrentNode(new ProcessorNode("testNode"));
        mockInternalProcessorContext.setSystemTimeMs(systemTime.milliseconds());
        build.init(mockInternalProcessorContext, build);
        processor.init(mockInternalProcessorContext);
        mockInternalProcessorContext.setRecordMetadata("", 0, 0L, new RecordHeaders(), 100L);
        Change change = new Change((Object) null, Long.valueOf(ARBITRARY_LONG));
        processor.process("longKey", change);
        MetricName metricName = this.evictionRateMetricLatest;
        MetricName metricName2 = this.evictionTotalMetricLatest;
        MetricName metricName3 = this.bufferSizeAvgMetricLatest;
        MetricName metricName4 = this.bufferSizeMaxMetricLatest;
        MetricName metricName5 = this.bufferCountAvgMetricLatest;
        MetricName metricName6 = this.bufferCountMaxMetricLatest;
        Map metrics = mockInternalProcessorContext.m176metrics().metrics();
        verifyMetric(metrics, metricName, Is.is(Double.valueOf(0.0d)));
        verifyMetric(metrics, metricName2, Is.is(Double.valueOf(0.0d)));
        verifyMetric(metrics, metricName3, Is.is(Double.valueOf(21.5d)));
        verifyMetric(metrics, metricName4, Is.is(Double.valueOf(43.0d)));
        verifyMetric(metrics, metricName5, Is.is(Double.valueOf(0.5d)));
        verifyMetric(metrics, metricName6, Is.is(Double.valueOf(1.0d)));
        mockInternalProcessorContext.setRecordMetadata("", 0, 1L, new RecordHeaders(), 101L);
        processor.process("key", change);
        Map metrics2 = mockInternalProcessorContext.m176metrics().metrics();
        verifyMetric(metrics2, metricName, Matchers.greaterThan(Double.valueOf(0.0d)));
        verifyMetric(metrics2, metricName2, Is.is(Double.valueOf(1.0d)));
        verifyMetric(metrics2, metricName3, Is.is(Double.valueOf(41.0d)));
        verifyMetric(metrics2, metricName4, Is.is(Double.valueOf(82.0d)));
        verifyMetric(metrics2, metricName5, Is.is(Double.valueOf(1.0d)));
        verifyMetric(metrics2, metricName6, Is.is(Double.valueOf(2.0d)));
    }

    private static <T> void verifyMetric(Map<MetricName, ? extends Metric> map, MetricName metricName, Matcher<T> matcher) {
        MatcherAssert.assertThat(map.get(metricName).metricName().description(), Is.is(metricName.description()));
        MatcherAssert.assertThat(map.get(metricName).metricValue(), matcher);
    }
}
