/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.PartitionGroup;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.PunctuationQueue;
import org.apache.kafka.streams.processor.internals.PunctuationSchedule;
import org.apache.kafka.streams.processor.internals.Punctuator;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StampedRecord;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A task that buffers consumer records per partition, feeds them one at a time into
 * the processor topology, triggers punctuation, and commits consumed offsets.
 *
 * <p>The task tracks the node that is currently being initialized, processed,
 * punctuated or closed in {@code currNode}; {@link #schedule(long)} relies on it to
 * know which node a punctuation schedule belongs to.
 */
public class StreamTask
extends AbstractTask
implements Punctuator {
    private static final Logger log = LoggerFactory.getLogger(StreamTask.class);

    /** Placeholder record used to build a record context in {@link #punctuate}, where no input record exists. */
    private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<Object, Object>("__null_topic__", -1, -1L, null, null);

    /** Prefix ({@code "task [<id>]"}) prepended to this task's log and error messages. */
    private final String logPrefix;
    private final PartitionGroup partitionGroup;
    /** Reused holder that {@code partitionGroup.nextRecord} fills in for each processed record. */
    private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
    private final PunctuationQueue punctuationQueue = new PunctuationQueue();
    /** Offset of the last processed record, per partition. */
    private final Map<TopicPartition, Long> consumedOffsets;
    private final RecordCollector recordCollector;
    /** Per-partition queue size threshold, read from the "buffered.records.per.partition" config. */
    private final int maxBufferedSize;
    private boolean commitRequested = false;
    private boolean commitOffsetNeeded = false;
    /** Node currently being initialized, processed, punctuated or closed; {@code null} otherwise. */
    private ProcessorNode currNode = null;
    private boolean requiresPoll = true;

    /**
     * Creates the task: builds one record queue per assigned partition, creates the
     * record collector and processor context, initializes the state stores and then
     * initializes every processor node of the topology.
     *
     * @param id              task id
     * @param applicationId   application id, passed to the superclass
     * @param partitions      partitions assigned to this task
     * @param topology        processor topology to run
     * @param consumer        consumer used for pausing/resuming partitions and committing offsets
     * @param producer        producer handed to the record collector
     * @param restoreConsumer restore consumer, passed to the superclass
     * @param config          streams configuration
     * @param metrics         streams metrics, passed to the processor context
     * @param stateDirectory  state directory, passed to the superclass
     * @param cache           thread cache, passed to the superclass and the processor context
     */
    public StreamTask(TaskId id, String applicationId, Collection<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> consumer, Producer<byte[], byte[]> producer, Consumer<byte[], byte[]> restoreConsumer, StreamsConfig config, StreamsMetrics metrics, StateDirectory stateDirectory, ThreadCache cache) {
        super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache);
        this.maxBufferedSize = config.getInt("buffered.records.per.partition");

        // One queue per assigned partition, bound to the source node of the partition's topic.
        Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<TopicPartition, RecordQueue>();
        for (TopicPartition partition : partitions) {
            SourceNode source = topology.source(partition.topic());
            partitionQueues.put(partition, this.createRecordQueue(partition, source));
        }

        this.logPrefix = String.format("task [%s]", id);
        TimestampExtractor timestampExtractor = (TimestampExtractor)config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);
        this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
        this.consumedOffsets = new HashMap<TopicPartition, Long>();
        this.recordCollector = new RecordCollector(producer, this.id().toString());
        this.processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, this.stateMgr, metrics, cache);

        log.info("{} Initializing state stores", this.logPrefix);
        this.initializeStateStores();

        log.info("{} Initializing processor nodes of the topology", this.logPrefix);
        for (ProcessorNode node : this.topology.processors()) {
            // Expose the node as current so that schedule() works during init().
            this.currNode = node;
            try {
                node.init(this.processorContext);
            }
            finally {
                this.currNode = null;
            }
        }
        ((ProcessorContextImpl)this.processorContext).initialized();
    }

    /**
     * Adds raw records to the buffered queue of the given partition and pauses
     * consumption of that partition once its queue exceeds {@code maxBufferedSize}.
     *
     * @param partition the partition the records belong to
     * @param records   the raw records to buffer
     */
    public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
        int queueSize = this.partitionGroup.addRawRecords(partition, records);
        log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", new Object[]{this.logPrefix, partition, queueSize});
        if (queueSize > this.maxBufferedSize) {
            this.consumer.pause(Collections.singleton(partition));
        }
    }

    /**
     * Processes the next buffered record, if any, by passing it to the node reported
     * by the partition group.
     *
     * @return {@code 0} when there was no record to process; otherwise the value of
     *         {@code partitionGroup.numBuffered()} after processing
     * @throws StreamsException wrapping any {@link KafkaException} raised while processing
     */
    public int process() {
        StampedRecord record = this.partitionGroup.nextRecord(this.recordInfo);
        if (record == null) {
            this.requiresPoll = true;
            return 0;
        }
        this.requiresPoll = false;
        try {
            this.currNode = this.recordInfo.node();
            TopicPartition partition = this.recordInfo.partition();
            log.trace("{} Start processing one record [{}]", this.logPrefix, record);
            ProcessorRecordContext recordContext = this.createRecordContext(record);
            this.updateProcessorContext(recordContext, this.currNode);
            this.currNode.process(record.key(), record.value());
            log.trace("{} Completed processing one record [{}]", this.logPrefix, record);

            // Remember the offset only after the record has been processed.
            this.consumedOffsets.put(partition, record.offset());
            this.commitOffsetNeeded = true;

            // The queue has just dropped back to the threshold: resume the partition
            // that addRecords() paused when the queue grew beyond it.
            if (this.recordInfo.queue().size() == this.maxBufferedSize) {
                this.consumer.resume(Collections.singleton(partition));
                this.requiresPoll = true;
            }
            if (this.partitionGroup.topQueueSize() <= this.maxBufferedSize) {
                this.requiresPoll = true;
            }
        }
        catch (KafkaException ke) {
            throw new StreamsException(String.format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d", this.id.toString(), this.currNode.name(), record.topic(), record.partition(), record.offset()), ke);
        }
        finally {
            this.processorContext.setCurrentNode(null);
            this.currNode = null;
        }
        return this.partitionGroup.numBuffered();
    }

    /** Installs the given record context and current node on the processor context. */
    private void updateProcessorContext(ProcessorRecordContext recordContext, ProcessorNode currNode) {
        this.processorContext.setRecordContext(recordContext);
        this.processorContext.setCurrentNode(currNode);
    }

    /**
     * @return the poll flag as last set by {@link #process()} ({@code true} initially)
     */
    public boolean requiresPoll() {
        return this.requiresPoll;
    }

    /**
     * Asks the punctuation queue to punctuate using the partition group's current timestamp.
     *
     * @return {@code false} when the partition group reports timestamp {@code -1};
     *         otherwise the result of {@code punctuationQueue.mayPunctuate}
     */
    public boolean maybePunctuate() {
        long timestamp = this.partitionGroup.timestamp();
        if (timestamp == -1L) {
            return false;
        }
        return this.punctuationQueue.mayPunctuate(timestamp, this);
    }

    /**
     * Punctuates the processor of the given node with the given timestamp.
     *
     * @param node      node whose processor is punctuated
     * @param timestamp timestamp passed to the processor
     * @throws IllegalStateException if another node is already current
     * @throws StreamsException      wrapping any {@link KafkaException} raised by the processor
     */
    @Override
    public void punctuate(ProcessorNode node, long timestamp) {
        if (this.currNode != null) {
            throw new IllegalStateException(String.format("%s Current node is not null", this.logPrefix));
        }
        this.currNode = node;
        try {
            // Setup happens inside the try so that a failure here still resets the
            // current node; otherwise every later punctuate() would be rejected above.
            StampedRecord stampedRecord = new StampedRecord(DUMMY_RECORD, timestamp);
            this.updateProcessorContext(this.createRecordContext(stampedRecord), node);
            log.trace("{} Punctuating processor {} with timestamp {}", new Object[]{this.logPrefix, node.name(), timestamp});
            try {
                node.processor().punctuate(timestamp);
            }
            catch (KafkaException ke) {
                throw new StreamsException(String.format("Exception caught in punctuate. taskId=%s processor=%s", this.id, node.name()), ke);
            }
        }
        finally {
            this.processorContext.setCurrentNode(null);
            this.currNode = null;
        }
    }

    /**
     * @return the node currently being initialized, processed, punctuated or closed,
     *         or {@code null} if there is none
     */
    public ProcessorNode node() {
        return this.currNode;
    }

    /**
     * Commits the task: flushes the state manager, then the record collector, then
     * commits the consumed offsets.
     */
    @Override
    public void commit() {
        log.debug("{} Committing its state", this.logPrefix);
        this.stateMgr.flush(this.processorContext);
        this.recordCollector.flush();
        this.commitOffsets();
    }

    /**
     * Synchronously commits, for every partition with a processed record, the offset
     * one past the last processed record, and records the same value as the state
     * manager's offset limit. Does nothing offset-related if no record was processed
     * since the last commit. Always clears the commit-requested flag.
     */
    @Override
    public void commitOffsets() {
        if (this.commitOffsetNeeded) {
            Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<TopicPartition, OffsetAndMetadata>(this.consumedOffsets.size());
            for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
                TopicPartition partition = entry.getKey();
                long offset = entry.getValue() + 1L;
                consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset));
                this.stateMgr.putOffsetLimit(partition, offset);
            }
            this.consumer.commitSync(consumedOffsetsAndMetadata);
            this.commitOffsetNeeded = false;
        }
        this.commitRequested = false;
    }

    /**
     * @return {@code true} if {@link #needCommit()} was called since the last {@link #commitOffsets()}
     */
    public boolean commitNeeded() {
        return this.commitRequested;
    }

    /** Requests a commit; the flag is cleared by {@link #commitOffsets()}. */
    public void needCommit() {
        this.commitRequested = true;
    }

    /**
     * Schedules punctuation for the current node.
     *
     * @param interval punctuation interval passed to the {@link PunctuationSchedule}
     * @throws IllegalStateException if no node is current
     */
    public void schedule(long interval) {
        if (this.currNode == null) {
            throw new IllegalStateException(String.format("%s Current node is null", this.logPrefix));
        }
        this.punctuationQueue.schedule(new PunctuationSchedule(this.currNode, interval));
    }

    /**
     * Closes the partition group, clears the consumed offsets and closes every
     * processor node of the topology. {@code close()} is attempted on every node even
     * if an earlier one fails.
     *
     * @throws RuntimeException the first failure raised by a node; failures of later
     *                          nodes are attached to it as suppressed exceptions
     */
    @Override
    public void close() {
        log.debug("{} Closing processor topology", this.logPrefix);
        this.partitionGroup.close();
        this.consumedOffsets.clear();

        RuntimeException firstFailure = null;
        for (ProcessorNode node : this.topology.processors()) {
            this.currNode = node;
            try {
                node.close();
            }
            catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else if (e != firstFailure) {
                    // addSuppressed rejects self-suppression, hence the identity check.
                    firstFailure.addSuppressed(e);
                }
            }
            finally {
                this.currNode = null;
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * @return the offsets reported by this task's record collector
     */
    @Override
    protected Map<TopicPartition, Long> recordCollectorOffsets() {
        return this.recordCollector.offsets();
    }

    /** Creates the record queue for one partition and its source node. */
    private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) {
        return new RecordQueue(partition, source);
    }

    /** Builds a record context from the record's timestamp, offset, partition and topic. */
    private ProcessorRecordContext createRecordContext(StampedRecord currRecord) {
        return new ProcessorRecordContext(currRecord.timestamp, currRecord.offset(), currRecord.partition(), currRecord.topic());
    }

    @Override
    public String toString() {
        return super.toString();
    }
}

