package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.PartitionGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamTask.class */
/**
 * A task that buffers consumer records per assigned partition, feeds them one at a time into the
 * processor topology, tracks the offsets it has consumed and commits them on request.
 *
 * <p>While a record is being processed (or a processor is being initialized, punctuated or closed)
 * the task remembers the processor node currently executing ({@link #node()}) and the record being
 * handled ({@link #record()}); both are reset to {@code null} as soon as that call returns or throws.
 *
 * <p>Only {@link #process()} synchronizes on the task instance; no other method does.
 */
public class StreamTask extends AbstractTask implements Punctuator {
    private static final Logger log = LoggerFactory.getLogger(StreamTask.class);

    /** Placeholder record wrapped into the {@link StampedRecord} that is exposed while punctuating. */
    private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1, (Object) null, (Object) null);

    /** Per-partition buffering threshold, read from {@code StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG}. */
    private final int maxBufferedSize;
    private final PartitionGroup partitionGroup;
    /** Reused holder that {@link PartitionGroup#nextRecord} fills in with the node/partition/queue of the returned record. */
    private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
    private final PunctuationQueue punctuationQueue;
    /** Offset of the most recently processed record for each partition. */
    private final Map<TopicPartition, Long> consumedOffsets;
    private final RecordCollector recordCollector;
    /** Set by {@link #needCommit()}, cleared by {@link #commit()}. */
    private boolean commitRequested;
    /** Set once a record has been processed since the last commit. */
    private boolean commitOffsetNeeded;
    private StampedRecord currRecord;
    private ProcessorNode currNode;
    private boolean requiresPoll = true;

    /**
     * Creates the task, builds one record queue per assigned partition, initializes the state
     * stores and then initializes every processor node of the topology.
     *
     * @param taskId            id of this task
     * @param str               passed through unchanged to {@link AbstractTask}
     *                          (NOTE(review): presumably the application/job id - confirm)
     * @param collection        partitions assigned to this task
     * @param processorTopology topology whose source nodes are looked up by topic name
     * @param consumer          consumer used for pausing/resuming partitions and committing offsets
     * @param producer          producer handed to the {@link RecordCollector}
     * @param consumer2         second consumer passed through to {@link AbstractTask}
     *                          (NOTE(review): presumably the restore consumer - confirm)
     * @param streamsConfig     configuration supplying the buffer threshold and timestamp extractor
     * @param streamsMetrics    metrics handed to the processor context
     */
    public StreamTask(TaskId taskId, String str, Collection<TopicPartition> collection, ProcessorTopology processorTopology, Consumer<byte[], byte[]> consumer, Producer<byte[], byte[]> producer, Consumer<byte[], byte[]> consumer2, StreamsConfig streamsConfig, StreamsMetrics streamsMetrics) {
        super(taskId, str, collection, processorTopology, consumer, consumer2, streamsConfig, false);
        this.punctuationQueue = new PunctuationQueue();
        this.maxBufferedSize = streamsConfig.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);

        // One queue per assigned partition, each bound to the source node registered for its topic.
        Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
        for (TopicPartition partition : collection) {
            SourceNode source = processorTopology.source(partition.topic());
            partitionQueues.put(partition, createRecordQueue(partition, source));
        }
        TimestampExtractor timestampExtractor = streamsConfig.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
        this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);

        this.consumedOffsets = new HashMap<>();
        this.recordCollector = new RecordCollector(producer);

        log.info("Creating restoration consumer client for stream task #" + id());

        this.processorContext = new ProcessorContextImpl(taskId, this, streamsConfig, this.recordCollector, this.stateMgr, streamsMetrics);
        initializeStateStores();

        // Expose each node as the current node while it initializes so that calls which require
        // a current node (e.g. schedule()) work from within init().
        for (ProcessorNode node : this.topology.processors()) {
            this.currNode = node;
            try {
                node.init(this.processorContext);
            } finally {
                this.currNode = null;
            }
        }
        ((ProcessorContextImpl) this.processorContext).initialized();
    }

    /**
     * Buffers newly fetched records for a partition and pauses consumption of that partition
     * once its queue has grown beyond the configured threshold.
     *
     * @param topicPartition partition the records were fetched from
     * @param iterable       raw records to buffer
     */
    public void addRecords(TopicPartition topicPartition, Iterable<ConsumerRecord<byte[], byte[]>> iterable) {
        int queueSize = this.partitionGroup.addRawRecords(topicPartition, iterable);
        if (queueSize > this.maxBufferedSize) {
            this.consumer.pause(Collections.singleton(topicPartition));
        }
    }

    /**
     * Processes at most one buffered record through the topology.
     *
     * @return {@code 0} if no record was available, otherwise the number of records still
     *         buffered in the partition group after processing
     */
    public int process() {
        synchronized (this) {
            StampedRecord record = this.partitionGroup.nextRecord(this.recordInfo);
            if (record == null) {
                this.requiresPoll = true;
                return 0;
            }
            this.requiresPoll = false;

            try {
                this.currRecord = record;
                this.currNode = this.recordInfo.node();
                TopicPartition partition = this.recordInfo.partition();

                log.debug("Start processing one record [{}]", this.currRecord);
                this.currNode.process(this.currRecord.key(), this.currRecord.value());
                log.debug("Completed processing one record [{}]", this.currRecord);

                // Remember the offset only after the record went through the topology successfully.
                this.consumedOffsets.put(partition, this.currRecord.offset());
                this.commitOffsetNeeded = true;

                // Counterpart of the pause in addRecords(): the queue has drained back to exactly
                // the threshold, so the partition may be consumed again.
                if (this.recordInfo.queue().size() == this.maxBufferedSize) {
                    this.consumer.resume(Collections.singleton(partition));
                    this.requiresPoll = true;
                }
                if (this.partitionGroup.topQueueSize() <= this.maxBufferedSize) {
                    this.requiresPoll = true;
                }
            } finally {
                this.currRecord = null;
                this.currNode = null;
            }
            return this.partitionGroup.numBuffered();
        }
    }

    /**
     * @return the poll flag last computed by {@link #process()}; {@code true} until the first call
     */
    public boolean requiresPoll() {
        return this.requiresPoll;
    }

    /**
     * Asks the punctuation queue to run punctuations against the partition group's current
     * timestamp, using this task as the {@link Punctuator}.
     *
     * @return the result of {@code PunctuationQueue.mayPunctuate}
     *         (NOTE(review): presumably whether any punctuation fired - confirm)
     */
    public boolean maybePunctuate() {
        return this.punctuationQueue.mayPunctuate(this.partitionGroup.timestamp(), this);
    }

    /**
     * Invokes {@code punctuate} on the given node's processor, exposing that node as the current
     * node and a dummy record stamped with {@code j} as the current record for the duration.
     *
     * @param processorNode node whose processor is punctuated
     * @param j             timestamp passed to the processor
     * @throws IllegalStateException if another node is currently executing
     */
    @Override // org.apache.kafka.streams.processor.internals.Punctuator
    public void punctuate(ProcessorNode processorNode, long j) {
        if (this.currNode != null) {
            throw new IllegalStateException("Current node is not null");
        }
        this.currNode = processorNode;
        this.currRecord = new StampedRecord(DUMMY_RECORD, j);
        try {
            processorNode.processor().punctuate(j);
        } finally {
            this.currNode = null;
            this.currRecord = null;
        }
    }

    /**
     * @return the record currently being processed or punctuated, or {@code null} if none
     */
    public StampedRecord record() {
        return this.currRecord;
    }

    /**
     * @return the processor node currently executing, or {@code null} if none
     */
    public ProcessorNode node() {
        return this.currNode;
    }

    /**
     * Flushes the state manager and the record collector and then, if any record has been
     * processed since the last commit, synchronously commits the consumed offsets.
     */
    @Override // org.apache.kafka.streams.processor.internals.AbstractTask
    public void commit() {
        // Flush state first, then produced records, and only then commit the input offsets.
        this.stateMgr.flush();
        this.recordCollector.flush();

        if (this.commitOffsetNeeded) {
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(this.consumedOffsets.size());
            for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
                TopicPartition partition = entry.getKey();
                // The committed position is the offset following the last processed record.
                long nextOffset = entry.getValue() + 1;
                offsetsToCommit.put(partition, new OffsetAndMetadata(nextOffset));
                this.stateMgr.putOffsetLimit(partition, nextOffset);
            }
            this.consumer.commitSync(offsetsToCommit);
            this.commitOffsetNeeded = false;
        }
        this.commitRequested = false;
    }

    /**
     * @return {@code true} if {@link #needCommit()} was called since the last {@link #commit()}
     */
    public boolean commitNeeded() {
        return this.commitRequested;
    }

    /** Flags that a commit has been requested; the flag is cleared by {@link #commit()}. */
    public void needCommit() {
        this.commitRequested = true;
    }

    /**
     * Registers a punctuation schedule for the node that is currently executing.
     *
     * @param j value passed to the {@link PunctuationSchedule}
     *          (NOTE(review): presumably the punctuation interval - confirm)
     * @throws IllegalStateException if no node is currently executing
     */
    public void schedule(long j) {
        if (this.currNode == null) {
            throw new IllegalStateException("Current node is null");
        }
        this.punctuationQueue.schedule(new PunctuationSchedule(this.currNode, j));
    }

    /**
     * Closes the partition group, every processor node and finally the parent task.
     *
     * <p>Every node is closed even if an earlier one throws a {@link RuntimeException}. The
     * exception that propagates is the one from {@code super.close()} if that fails, otherwise the
     * one thrown by the last failing node; any earlier failures are attached to it as suppressed
     * exceptions so that none is lost.
     */
    @Override // org.apache.kafka.streams.processor.internals.AbstractTask
    public void close() {
        this.partitionGroup.close();
        this.consumedOffsets.clear();

        RuntimeException closeFailure = null;
        for (ProcessorNode node : this.topology.processors()) {
            this.currNode = node;
            try {
                node.close();
            } catch (RuntimeException e) {
                // Keep going so the remaining nodes still get closed.
                closeFailure = withSuppressed(e, closeFailure);
            } finally {
                this.currNode = null;
            }
        }

        try {
            super.close();
        } catch (RuntimeException e) {
            throw withSuppressed(e, closeFailure);
        }

        if (closeFailure != null) {
            throw closeFailure;
        }
    }

    /** Attaches {@code earlier} (if any, and if distinct) to {@code primary} as suppressed and returns {@code primary}. */
    private static RuntimeException withSuppressed(RuntimeException primary, RuntimeException earlier) {
        if (earlier != null && earlier != primary) {
            primary.addSuppressed(earlier);
        }
        return primary;
    }

    /**
     * @return the offsets reported by this task's {@link RecordCollector}
     */
    @Override // org.apache.kafka.streams.processor.internals.AbstractTask
    protected Map<TopicPartition, Long> recordCollectorOffsets() {
        return this.recordCollector.offsets();
    }

    /** Creates the queue that buffers records of {@code topicPartition} for {@code sourceNode}. */
    private RecordQueue createRecordQueue(TopicPartition topicPartition, SourceNode sourceNode) {
        return new RecordQueue(topicPartition, sourceNode);
    }

    /**
     * Forwards a key/value pair to every child of the current node, in order. Each child is the
     * current node while it processes; the original node is restored afterwards, even on failure.
     *
     * @param k key to forward
     * @param v value to forward
     */
    @SuppressWarnings("unchecked") // child key/value types are not tracked statically by the topology
    public <K, V> void forward(K k, V v) {
        final ProcessorNode<?, ?> parent = this.currNode;
        try {
            for (ProcessorNode<?, ?> child : parent.children()) {
                this.currNode = child;
                ((ProcessorNode<K, V>) child).process(k, v);
            }
        } finally {
            this.currNode = parent;
        }
    }

    /**
     * Forwards a key/value pair to the child at the given index of the current node.
     *
     * @param k key to forward
     * @param v value to forward
     * @param i index into the current node's child list
     */
    @SuppressWarnings("unchecked") // child key/value types are not tracked statically by the topology
    public <K, V> void forward(K k, V v, int i) {
        final ProcessorNode<?, ?> parent = this.currNode;
        // Resolved before currNode is touched, so a bad index leaves the task state unchanged.
        final ProcessorNode<?, ?> child = parent.children().get(i);
        this.currNode = child;
        try {
            ((ProcessorNode<K, V>) child).process(k, v);
        } finally {
            this.currNode = parent;
        }
    }

    /**
     * Forwards a key/value pair to the first child of the current node whose name equals
     * {@code str}. Does nothing if no child has that name.
     *
     * @param k   key to forward
     * @param v   value to forward
     * @param str name of the child node to forward to
     */
    @SuppressWarnings("unchecked") // child key/value types are not tracked statically by the topology
    public <K, V> void forward(K k, V v, String str) {
        final ProcessorNode<?, ?> parent = this.currNode;
        for (ProcessorNode<?, ?> child : parent.children()) {
            if (child.name().equals(str)) {
                this.currNode = child;
                try {
                    ((ProcessorNode<K, V>) child).process(k, v);
                } finally {
                    this.currNode = parent;
                }
                return;
            }
        }
    }
}
