package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNode.class */
/**
 * A node of a processor topology. It wraps an optional {@link Processor}, keeps the list of
 * child nodes and the names of the state stores attached to it, and routes the processor's
 * lifecycle and per-record calls through per-node metric sensors.
 *
 * <p>The key, value and timestamp of the call in flight are parked in instance fields so that
 * the {@link Runnable} delegates created once in the constructor can be reused for every call
 * instead of allocating a new closure per record.
 *
 * <p>NOTE(review): those fields are plain, unsynchronized state, so concurrent calls on the same
 * node would race. Presumably each node is driven by a single thread - confirm against callers.
 *
 * @param <K> key type accepted by the wrapped processor
 * @param <V> value type accepted by the wrapped processor
 */
public class ProcessorNode<K, V> {
    private final List<ProcessorNode<?, ?>> children;
    private final String name;
    /** May be {@code null}; see {@link #ProcessorNode(String)}. */
    private final Processor<K, V> processor;
    /** Created by {@link #init(ProcessorContext)}; {@code null} until then. */
    NodeMetrics nodeMetrics;
    private final Time time;
    /** Key of the record currently handed to {@link #processDelegate}. */
    private K key;
    /** Value of the record currently handed to {@link #processDelegate}. */
    private V value;
    private final Runnable processDelegate;
    /** Context passed to the last {@link #init(ProcessorContext)} call. */
    private ProcessorContext context;
    private final Runnable initDelegate;
    private final Runnable closeDelegate;
    /** Timestamp currently handed to {@link #punctuateDelegate}. */
    private long timestamp;
    private final Runnable punctuateDelegate;
    /** Names of the state stores given at construction; may be {@code null}. */
    public final Set<String> stateStores;

    /**
     * The sensors registered for one processor node, together with the metrics registry they
     * were registered with so that they can be removed again.
     */
    protected static final class NodeMetrics {
        /** First argument used for every sensor registered by this class. */
        private static final String SCOPE = "processor-node";
        /** Tag key under which the task id is attached to every sensor. */
        private static final String TASK_ID_TAG = "task-id";

        final StreamsMetricsImpl metrics;
        final Sensor nodeProcessTimeSensor;
        final Sensor nodePunctuateTimeSensor;
        final Sensor sourceNodeForwardSensor;
        final Sensor nodeCreationSensor;
        final Sensor nodeDestructionSensor;

        /**
         * Registers the "process", "punctuate", "create", "destroy" and "forward" sensors for
         * one node at DEBUG recording level.
         *
         * @param streamsMetrics   registry to register with; must be a {@link StreamsMetricsImpl},
         *                         otherwise the cast below throws {@link ClassCastException}
         * @param str              name of the node the sensors belong to
         * @param processorContext context whose task id is attached to every sensor as a tag
         */
        public NodeMetrics(StreamsMetrics streamsMetrics, String str, ProcessorContext processorContext) {
            String taskId = processorContext.taskId().toString();
            this.metrics = (StreamsMetricsImpl) streamsMetrics;
            this.nodeProcessTimeSensor = latencyAndThroughputSensor(streamsMetrics, str, "process", taskId);
            this.nodePunctuateTimeSensor = latencyAndThroughputSensor(streamsMetrics, str, "punctuate", taskId);
            this.nodeCreationSensor = latencyAndThroughputSensor(streamsMetrics, str, "create", taskId);
            this.nodeDestructionSensor = latencyAndThroughputSensor(streamsMetrics, str, "destroy", taskId);
            this.sourceNodeForwardSensor = streamsMetrics.addThroughputSensor(
                    SCOPE, str, "forward", Sensor.RecordingLevel.DEBUG, TASK_ID_TAG, taskId);
        }

        /** Registers one DEBUG-level latency-and-throughput sensor for the given operation. */
        private static Sensor latencyAndThroughputSensor(StreamsMetrics streamsMetrics, String nodeName,
                                                         String operation, String taskId) {
            return streamsMetrics.addLatencyAndThroughputSensor(
                    SCOPE, nodeName, operation, Sensor.RecordingLevel.DEBUG, TASK_ID_TAG, taskId);
        }

        /** Removes every sensor registered by the constructor from the registry. */
        public void removeAllSensors() {
            this.metrics.removeSensor(this.nodeProcessTimeSensor);
            this.metrics.removeSensor(this.nodePunctuateTimeSensor);
            this.metrics.removeSensor(this.sourceNodeForwardSensor);
            this.metrics.removeSensor(this.nodeCreationSensor);
            this.metrics.removeSensor(this.nodeDestructionSensor);
        }
    }

    /**
     * Creates a node without a processor and without state stores.
     *
     * @param str name of the node
     */
    public ProcessorNode(String str) {
        this(str, null, null);
    }

    /**
     * Creates a node and the reusable delegates that forward to the processor.
     *
     * @param str       name of the node
     * @param processor processor to wrap; may be {@code null}, in which case init and close
     *                  skip the processor call
     * @param set       names of the state stores of this node; may be {@code null}
     */
    public ProcessorNode(String str, Processor<K, V> processor, Set<String> set) {
        this.name = str;
        this.processor = processor;
        this.children = new ArrayList<ProcessorNode<?, ?>>();
        this.stateStores = set;
        this.time = new SystemTime();

        // The delegates read the enclosing node's fields at run() time, not at creation time,
        // hence the explicit ProcessorNode.this qualification (the constructor parameter
        // "processor" would otherwise shadow the field).
        this.processDelegate = new Runnable() {
            @Override
            public void run() {
                // No null check here, unlike init/close: a node without a processor must not
                // reach this delegate.
                ProcessorNode.this.processor.process(ProcessorNode.this.key, ProcessorNode.this.value);
            }
        };
        this.initDelegate = new Runnable() {
            @Override
            public void run() {
                if (ProcessorNode.this.processor != null) {
                    ProcessorNode.this.processor.init(ProcessorNode.this.context);
                }
            }
        };
        this.closeDelegate = new Runnable() {
            @Override
            public void run() {
                if (ProcessorNode.this.processor != null) {
                    ProcessorNode.this.processor.close();
                }
            }
        };
        this.punctuateDelegate = new Runnable() {
            @Override
            public void run() {
                // No null check here either; see processDelegate.
                ProcessorNode.this.processor.punctuate(ProcessorNode.this.timestamp);
            }
        };
    }

    /** @return the name given at construction */
    public final String name() {
        return this.name;
    }

    /** @return the wrapped processor, or {@code null} if the node was built without one */
    public final Processor<K, V> processor() {
        return this.processor;
    }

    /** @return the live, mutable list of child nodes (not a copy) */
    public final List<ProcessorNode<?, ?>> children() {
        return this.children;
    }

    /**
     * Appends a child node.
     *
     * @param processorNode node to append to {@link #children()}
     */
    public void addChild(ProcessorNode<?, ?> processorNode) {
        this.children.add(processorNode);
    }

    /**
     * Stores the context, registers this node's sensors and initializes the processor (if any)
     * through the creation sensor.
     *
     * @param processorContext context handed to the processor and used to register the sensors
     * @throws StreamsException wrapping any exception raised while registering the sensors or
     *                          initializing the processor
     */
    public void init(ProcessorContext processorContext) {
        this.context = processorContext;
        try {
            this.nodeMetrics = new NodeMetrics(processorContext.metrics(), this.name, processorContext);
            this.nodeMetrics.metrics.measureLatencyNs(this.time, this.initDelegate, this.nodeMetrics.nodeCreationSensor);
        } catch (Exception e) {
            throw new StreamsException(String.format("failed to initialize processor %s", this.name), e);
        }
    }

    /**
     * Closes the processor (if any) through the destruction sensor and then removes this node's
     * sensors. The sensors are removed even when closing the processor fails, so a failing
     * processor does not leave them registered.
     *
     * @throws StreamsException wrapping any exception raised while closing the processor or
     *                          removing the sensors; this includes the
     *                          {@link NullPointerException} raised when the node was never
     *                          initialized
     */
    public void close() {
        try {
            try {
                this.nodeMetrics.metrics.measureLatencyNs(this.time, this.closeDelegate, this.nodeMetrics.nodeDestructionSensor);
            } finally {
                // Guarded so that a never-initialized node still reports the failure of the
                // statement above. Should sensor removal itself throw, that exception is the
                // one that ends up wrapped below.
                if (this.nodeMetrics != null) {
                    this.nodeMetrics.removeAllSensors();
                }
            }
        } catch (Exception e) {
            throw new StreamsException(String.format("failed to close processor %s", this.name), e);
        }
    }

    /**
     * Hands one record to the processor through the process sensor. Requires a prior
     * {@link #init(ProcessorContext)}, because the sensors are created there.
     *
     * @param k key of the record
     * @param v value of the record
     */
    public void process(K k, V v) {
        this.key = k;
        this.value = v;
        this.nodeMetrics.metrics.measureLatencyNs(this.time, this.processDelegate, this.nodeMetrics.nodeProcessTimeSensor);
    }

    /**
     * Invokes the processor's punctuate through the punctuate sensor. Requires a prior
     * {@link #init(ProcessorContext)}, because the sensors are created there.
     *
     * @param j timestamp passed on to the processor
     */
    public void punctuate(long j) {
        this.timestamp = j;
        this.nodeMetrics.metrics.measureLatencyNs(this.time, this.punctuateDelegate, this.nodeMetrics.nodePunctuateTimeSensor);
    }

    /** @return the same text as {@link #toString(String)} with an empty indent */
    @Override
    public String toString() {
        return toString("");
    }

    /**
     * Renders the node name followed, when the node has state stores, by one line listing them.
     *
     * @param str prefix put in front of every produced line
     * @return {@code str + name + ":\n"}, plus a {@code states:} line with the comma-separated
     *         store names if {@link #stateStores} is neither {@code null} nor empty
     */
    public String toString(String str) {
        StringBuilder sb = new StringBuilder(str + this.name + ":\n");
        if (this.stateStores != null && !this.stateStores.isEmpty()) {
            sb.append(str).append("\tstates:\t\t[");
            for (String store : this.stateStores) {
                sb.append(store);
                sb.append(", ");
            }
            // Drop the trailing ", " left behind by the last element.
            sb.setLength(sb.length() - 2);
            sb.append("]\n");
        }
        return sb.toString();
    }
}
