package org.apache.kafka.streams.processor.internals;

import java.util.Iterator;
import java.util.List;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorContextImpl.class */
/**
 * Processor context bound to a single {@link StreamTask}.
 *
 * <p>It resolves state stores for the node that is currently executing, routes forwarded
 * records to that node's children, and delegates commit requests and punctuation scheduling
 * to the owning task. It also exposes the task's {@link RecordCollector}.
 */
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
    /** Routing directive used by {@link #forward(Object, Object)} to address every child. */
    private static final To SEND_TO_ALL = To.all();

    private final StreamTask task;
    private final RecordCollector collector;
    /** Reusable holder that {@link #forward(Object, Object, To)} refreshes on every call. */
    private final ToInternal toInternal;

    /**
     * Creates a context for the given task.
     *
     * <p>Note: the decompiler reported that this constructor was originally package-private;
     * it is left public here so that existing callers keep compiling.
     *
     * @param taskId                id handed to the parent context
     * @param streamTask            task that receives commit and schedule requests
     * @param streamsConfig         configuration handed to the parent context
     * @param recordCollector       collector returned by {@link #recordCollector()}
     * @param processorStateManager state manager handed to the parent context
     * @param streamsMetricsImpl    metrics handed to the parent context
     * @param threadCache           cache handed to the parent context
     */
    public ProcessorContextImpl(TaskId taskId, StreamTask streamTask, StreamsConfig streamsConfig, RecordCollector recordCollector, ProcessorStateManager processorStateManager, StreamsMetricsImpl streamsMetricsImpl, ThreadCache threadCache) {
        super(taskId, streamsConfig, streamsMetricsImpl, processorStateManager, threadCache);
        this.toInternal = new ToInternal();
        this.task = streamTask;
        this.collector = recordCollector;
    }

    /**
     * Returns the state manager of this context, narrowed to {@link ProcessorStateManager}.
     *
     * @return the state manager supplied at construction time
     */
    public ProcessorStateManager getStateMgr() {
        return (ProcessorStateManager) this.stateManager;
    }

    /**
     * Returns the record collector supplied at construction time.
     *
     * @return the record collector of this context
     */
    @Override // org.apache.kafka.streams.processor.internals.RecordCollector.Supplier
    public RecordCollector recordCollector() {
        return this.collector;
    }

    /**
     * Looks up a state store by name on behalf of the current node.
     *
     * <p>A global store with the given name is returned first, without consulting the
     * current node's store list. Otherwise the store is returned only if the current node
     * lists it among its state stores.
     *
     * @param str name of the requested store
     * @return the matching global store, or the store registered with the state manager
     * @throws StreamsException if there is no current node, or the current node is not
     *                          connected to the requested store
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public StateStore getStateStore(String str) {
        if (currentNode() == null) {
            throw new StreamsException("Accessing from an unknown node");
        }
        StateStore globalStore = this.stateManager.getGlobalStore(str);
        if (globalStore != null) {
            return globalStore;
        }
        if (currentNode().stateStores.contains(str)) {
            return this.stateManager.getStore(str);
        }
        throw new StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + str + " as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
    }

    /**
     * Forwards a record to every child of the current node.
     *
     * @param k record key
     * @param v record value
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public <K, V> void forward(K k, V v) {
        // The key and value are passed through untouched; casting them to unrelated types
        // would fail at runtime for ordinary records.
        forward(k, v, SEND_TO_ALL);
    }

    /**
     * Forwards a record to the child at the given position in the current node's child list.
     *
     * @param k record key
     * @param v record value
     * @param i index into the current node's children
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public <K, V> void forward(K k, V v, int i) {
        // Go through a typed local so the element type is known even if currentNode()
        // is declared with a raw return type.
        final List<ProcessorNode<?, ?>> children = currentNode().children();
        forward(k, v, To.child(children.get(i).name()));
    }

    /**
     * Forwards a record to the child with the given name.
     *
     * @param k   record key
     * @param v   record value
     * @param str name of the target child node
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public <K, V> void forward(K k, V v, String str) {
        forward(k, v, To.child(str));
    }

    /**
     * Forwards a record according to the given routing directive.
     *
     * <p>If the directive carries a timestamp, it replaces the record context's timestamp
     * while the children run. If it names a child, only that child receives the record;
     * otherwise every child of the current node does. The previous timestamp and the
     * previous current node are restored afterwards, even when a child throws.
     *
     * @param k  record key
     * @param v  record value
     * @param to routing directive
     * @throws StreamsException if the directive names a child that the current node does
     *                          not have
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public <K, V> void forward(K k, V v, To to) {
        final ProcessorNode previousNode = currentNode();
        final long previousTimestamp = this.recordContext.timestamp;
        try {
            this.toInternal.update(to);
            if (this.toInternal.hasTimestamp()) {
                this.recordContext.setTimestamp(this.toInternal.timestamp());
            }
            // Snapshot the child list before dispatching: each dispatch changes the
            // current node, so it must not be re-read inside the loop.
            final List<ProcessorNode<?, ?>> children = previousNode.children();
            final String sendTo = this.toInternal.child();
            if (sendTo != null) {
                final ProcessorNode target = previousNode.getChild(sendTo);
                if (target == null) {
                    throw new StreamsException("Unknown downstream node: " + sendTo + " either does not exist or is not connected to this processor.");
                }
                forward(target, k, v);
            } else {
                for (final ProcessorNode<?, ?> child : children) {
                    forward(child, k, v);
                }
            }
        } finally {
            this.recordContext.timestamp = previousTimestamp;
            setCurrentNode(previousNode);
        }
    }

    /**
     * Makes the given node the current node and lets it process the record.
     *
     * @param processorNode node that receives the record
     * @param k             record key
     * @param v             record value
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <K, V> void forward(ProcessorNode processorNode, K k, V v) {
        setCurrentNode(processorNode);
        processorNode.process(k, v);
    }

    /** Asks the owning task to commit by calling its {@code needCommit()} method. */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public void commit() {
        this.task.needCommit();
    }

    /**
     * Registers a punctuator with the owning task.
     *
     * @param j               interval passed through to the task
     * @param punctuationType punctuation type passed through to the task
     * @param punctuator      callback passed through to the task
     * @return the handle returned by the task's {@code schedule} method
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public Cancellable schedule(long j, PunctuationType punctuationType, Punctuator punctuator) {
        return this.task.schedule(j, punctuationType, punctuator);
    }
}
