package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNode.class */
public class ProcessorNode<KIn, VIn, KOut, VOut> {
    private final List<ProcessorNode<KOut, VOut, ?, ?>> children;
    private final Map<String, ProcessorNode<KOut, VOut, ?, ?>> childByName;
    private final Processor<KIn, VIn, KOut, VOut> processor;
    private final String name;
    private final Time time;
    public final Set<String> stateStores;
    private InternalProcessorContext internalProcessorContext;
    private String threadId;
    private boolean closed;

    public ProcessorNode(String str) {
        this(str, (Processor) null, (Set<String>) null);
    }

    public ProcessorNode(String str, Processor<KIn, VIn, KOut, VOut> processor, Set<String> set) {
        this.closed = true;
        this.name = str;
        this.processor = processor;
        this.children = new ArrayList();
        this.childByName = new HashMap();
        this.stateStores = set;
        this.time = new SystemTime();
    }

    public ProcessorNode(String str, org.apache.kafka.streams.processor.Processor<KIn, VIn> processor, Set<String> set) {
        this.closed = true;
        this.name = str;
        this.processor = ProcessorAdapter.adapt(processor);
        this.children = new ArrayList();
        this.childByName = new HashMap();
        this.stateStores = set;
        this.time = new SystemTime();
    }

    public final String name() {
        return this.name;
    }

    public final Processor<KIn, VIn, KOut, VOut> processor() {
        return this.processor;
    }

    public List<ProcessorNode<KOut, VOut, ?, ?>> children() {
        return this.children;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ProcessorNode<KOut, VOut, ?, ?> getChild(String str) {
        return this.childByName.get(str);
    }

    public void addChild(ProcessorNode<KOut, VOut, ?, ?> processorNode) {
        this.children.add(processorNode);
        this.childByName.put(processorNode.name, processorNode);
    }

    public void init(InternalProcessorContext<KOut, VOut> internalProcessorContext) {
        if (!this.closed) {
            throw new IllegalStateException("The processor is not closed");
        }
        try {
            this.threadId = Thread.currentThread().getName();
            this.internalProcessorContext = internalProcessorContext;
            if (this.processor != null) {
                this.processor.init(internalProcessorContext);
            }
            this.closed = false;
        } catch (Exception e) {
            throw new StreamsException(String.format("failed to initialize processor %s", this.name), e);
        }
    }

    public void close() {
        throwIfClosed();
        try {
            if (this.processor != null) {
                this.processor.close();
            }
            this.internalProcessorContext.metrics().removeAllNodeLevelSensors(this.threadId, this.internalProcessorContext.taskId().toString(), this.name);
            this.closed = true;
        } catch (Exception e) {
            throw new StreamsException(String.format("failed to close processor %s", this.name), e);
        }
    }

    protected void throwIfClosed() {
        if (this.closed) {
            throw new IllegalStateException("The processor is already closed");
        }
    }

    public void process(Record<KIn, VIn> record) {
        throwIfClosed();
        try {
            this.processor.process(record);
        } catch (ClassCastException e) {
            throw new StreamsException(String.format("ClassCastException invoking processor: %s. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: %s, and value: %s.%nNote that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.", name(), record.key() == null ? "unknown because key is null" : record.key().getClass().getName(), record.value() == null ? "unknown because value is null" : record.value().getClass().getName()), e);
        }
    }

    public void punctuate(long j, Punctuator punctuator) {
        punctuator.punctuate(j);
    }

    public boolean isTerminalNode() {
        return this.children.isEmpty();
    }

    public String toString() {
        return toString("");
    }

    public String toString(String str) {
        StringBuilder sb = new StringBuilder(str + this.name + ":\n");
        if (this.stateStores != null && !this.stateStores.isEmpty()) {
            sb.append(str).append("\tstates:\t\t[");
            Iterator<String> it = this.stateStores.iterator();
            while (it.hasNext()) {
                sb.append(it.next());
                sb.append(", ");
            }
            sb.setLength(sb.length() - 2);
            sb.append("]\n");
        }
        return sb.toString();
    }
}
