package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.LinkedHashMap;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest.class */
public class ProcessorNodeTest {

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest$ExceptionalProcessor.class */
    private static class ExceptionalProcessor implements Processor {
        private ExceptionalProcessor() {
        }

        public void init(ProcessorContext processorContext) {
            throw new RuntimeException();
        }

        public void process(Object obj, Object obj2) {
            throw new RuntimeException();
        }

        public void close() {
            throw new RuntimeException();
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest$NoOpProcessor.class */
    private static class NoOpProcessor implements Processor<Object, Object> {
        private NoOpProcessor() {
        }

        public void init(ProcessorContext processorContext) {
        }

        public void process(Object obj, Object obj2) {
        }

        public void close() {
        }
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
        new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet()).init((InternalProcessorContext) null);
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
        new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet()).close();
    }

    @Test
    public void testMetrics() {
        StateSerdes withBuiltinTypes = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
        Metrics metrics = new Metrics();
        InternalMockProcessorContext internalMockProcessorContext = new InternalMockProcessorContext(withBuiltinTypes, new RecordCollectorImpl((String) null, new LogContext("processnode-test "), new DefaultProductionExceptionHandler(), metrics.sensor("skipped-records")), metrics);
        ProcessorNode processorNode = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet());
        processorNode.init(internalMockProcessorContext);
        String[] strArr = {"process", "punctuate", "create", "destroy"};
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("processor-node-id", processorNode.name());
        linkedHashMap.put("task-id", internalMockProcessorContext.taskId().toString());
        linkedHashMap.put("client-id", "mock");
        for (String str : strArr) {
            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), str + "-latency-avg", "stream-processor-node-metrics", linkedHashMap);
            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), str + "-latency-max", "stream-processor-node-metrics", linkedHashMap);
            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), str + "-rate", "stream-processor-node-metrics", linkedHashMap);
            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), str + "-total", "stream-processor-node-metrics", linkedHashMap);
        }
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("forward-rate", "stream-processor-node-metrics", "The average number of occurrence of forward operation per second.", linkedHashMap)));
        linkedHashMap.put("processor-node-id", "all");
        for (String str2 : strArr) {
            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), str2 + "-latency-avg", "stream-processor-node-metrics", linkedHashMap);
            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), str2 + "-latency-max", "stream-processor-node-metrics", linkedHashMap);
            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), str2 + "-rate", "stream-processor-node-metrics", linkedHashMap);
            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), str2 + "-total", "stream-processor-node-metrics", linkedHashMap);
        }
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("forward-rate", "stream-processor-node-metrics", "The average number of occurrence of forward operation per second.", linkedHashMap)));
        JmxReporter jmxReporter = new JmxReporter("kafka.streams");
        metrics.addReporter(jmxReporter);
        Assert.assertTrue(jmxReporter.containsMbean(String.format("kafka.streams:type=%s,client-id=mock,task-id=%s,processor-node-id=%s", "stream-processor-node-metrics", internalMockProcessorContext.taskId().toString(), processorNode.name())));
    }
}
