/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

public class ProcessorNode<K, V> {
    private final List<ProcessorNode<?, ?>> children;
    private final Map<String, ProcessorNode<?, ?>> childByName;
    private NodeMetrics nodeMetrics;
    private final Processor<K, V> processor;
    private final String name;
    private final Time time;
    public final Set<String> stateStores;

    public ProcessorNode(String name) {
        this(name, null, null);
    }

    public ProcessorNode(String name, Processor<K, V> processor, Set<String> stateStores) {
        this.name = name;
        this.processor = processor;
        this.children = new ArrayList();
        this.childByName = new HashMap();
        this.stateStores = stateStores;
        this.time = new SystemTime();
    }

    public final String name() {
        return this.name;
    }

    public final Processor<K, V> processor() {
        return this.processor;
    }

    public List<ProcessorNode<?, ?>> children() {
        return this.children;
    }

    ProcessorNode getChild(String childName) {
        return this.childByName.get(childName);
    }

    public void addChild(ProcessorNode<?, ?> child) {
        this.children.add(child);
        this.childByName.put(child.name, child);
    }

    public void init(InternalProcessorContext context) {
        try {
            this.nodeMetrics = new NodeMetrics(context.metrics(), this.name, context);
            long startNs = this.time.nanoseconds();
            if (this.processor != null) {
                this.processor.init(context);
            }
            this.nodeMetrics.nodeCreationSensor.record((double)(this.time.nanoseconds() - startNs));
        }
        catch (Exception e) {
            throw new StreamsException(String.format("failed to initialize processor %s", this.name), e);
        }
    }

    public void close() {
        try {
            long startNs = this.time.nanoseconds();
            if (this.processor != null) {
                this.processor.close();
            }
            this.nodeMetrics.nodeDestructionSensor.record((double)(this.time.nanoseconds() - startNs));
            this.nodeMetrics.removeAllSensors();
        }
        catch (Exception e) {
            throw new StreamsException(String.format("failed to close processor %s", this.name), e);
        }
    }

    public void process(K key, V value) {
        long startNs = this.time.nanoseconds();
        try {
            this.processor.process(key, value);
        }
        catch (ClassCastException e) {
            String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
            String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
            throw new StreamsException(String.format("ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: %s, and value: %s.%nNote that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.", keyClass, valueClass), e);
        }
        this.nodeMetrics.nodeProcessTimeSensor.record((double)(this.time.nanoseconds() - startNs));
    }

    public void punctuate(long timestamp, Punctuator punctuator) {
        long startNs = this.time.nanoseconds();
        punctuator.punctuate(timestamp);
        this.nodeMetrics.nodePunctuateTimeSensor.record((double)(this.time.nanoseconds() - startNs));
    }

    public String toString() {
        return this.toString("");
    }

    public String toString(String indent) {
        StringBuilder sb = new StringBuilder(indent + this.name + ":\n");
        if (this.stateStores != null && !this.stateStores.isEmpty()) {
            sb.append(indent).append("\tstates:\t\t[");
            for (String store : this.stateStores) {
                sb.append(store);
                sb.append(", ");
            }
            sb.setLength(sb.length() - 2);
            sb.append("]\n");
        }
        return sb.toString();
    }

    Sensor sourceNodeForwardSensor() {
        return this.nodeMetrics.sourceNodeForwardSensor;
    }

    private static final class NodeMetrics {
        private final StreamsMetricsImpl metrics;
        private final Sensor nodeProcessTimeSensor;
        private final Sensor nodePunctuateTimeSensor;
        private final Sensor sourceNodeForwardSensor;
        private final Sensor nodeCreationSensor;
        private final Sensor nodeDestructionSensor;
        private final String taskName;
        private final String processorNodeName;

        private NodeMetrics(StreamsMetricsImpl metrics, String processorNodeName, ProcessorContext context) {
            this.metrics = metrics;
            String threadId = Thread.currentThread().getName();
            String taskName = context.taskId().toString();
            Map<String, String> tagMap = metrics.nodeLevelTagMap(threadId, context.taskId().toString(), processorNodeName);
            Map<String, String> allTagMap = metrics.nodeLevelTagMap(threadId, context.taskId().toString(), "all");
            this.nodeProcessTimeSensor = NodeMetrics.createTaskAndNodeLatencyAndThroughputSensors("process", metrics, threadId, taskName, processorNodeName, allTagMap, tagMap);
            this.nodePunctuateTimeSensor = NodeMetrics.createTaskAndNodeLatencyAndThroughputSensors("punctuate", metrics, threadId, taskName, processorNodeName, allTagMap, tagMap);
            this.nodeCreationSensor = NodeMetrics.createTaskAndNodeLatencyAndThroughputSensors("create", metrics, threadId, taskName, processorNodeName, allTagMap, tagMap);
            this.nodeDestructionSensor = NodeMetrics.createTaskAndNodeLatencyAndThroughputSensors("destroy", metrics, threadId, taskName, processorNodeName, allTagMap, tagMap);
            this.sourceNodeForwardSensor = NodeMetrics.createTaskAndNodeLatencyAndThroughputSensors("forward", metrics, threadId, taskName, processorNodeName, allTagMap, tagMap);
            this.taskName = taskName;
            this.processorNodeName = processorNodeName;
        }

        private void removeAllSensors() {
            this.metrics.removeAllNodeLevelSensors(Thread.currentThread().getName(), this.taskName, this.processorNodeName);
        }

        private static Sensor createTaskAndNodeLatencyAndThroughputSensors(String operation, StreamsMetricsImpl metrics, String threadId, String taskName, String processorNodeName, Map<String, String> taskTags, Map<String, String> nodeTags) {
            Sensor parent = metrics.taskLevelSensor(threadId, taskName, operation, Sensor.RecordingLevel.DEBUG, new Sensor[0]);
            StreamsMetricsImpl.addAvgAndMaxLatencyToSensor(parent, "stream-processor-node-metrics", taskTags, operation);
            StreamsMetricsImpl.addInvocationRateAndCountToSensor(parent, "stream-processor-node-metrics", taskTags, operation);
            Sensor sensor = metrics.nodeLevelSensor(threadId, taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
            StreamsMetricsImpl.addAvgAndMaxLatencyToSensor(sensor, "stream-processor-node-metrics", nodeTags, operation);
            StreamsMetricsImpl.addInvocationRateAndCountToSensor(sensor, "stream-processor-node-metrics", nodeTags, operation);
            return sensor;
        }
    }
}

