package org.apache.kafka.streams.processor.internals;

import java.util.ArrayDeque;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/RecordQueue.class */
/**
 * FIFO buffer of raw consumer records for a single topic partition.
 *
 * <p>Raw {@code byte[]} records are appended to an internal deque. The queue keeps at most one
 * deserialized, timestamped "head" record ready at any time; records that deserialize to
 * {@code null} or whose extracted timestamp is negative are skipped while looking for the next
 * head. The queue also tracks a "partition time": the largest timestamp among records returned
 * by {@link #poll()} (or whatever was last passed to {@link #setPartitionTime(long)}).
 *
 * <p>No synchronization is performed in this class.
 */
public class RecordQueue {
    /** Sentinel meaning "no timestamp known": used for an absent head and an unset partition time. */
    public static final long UNKNOWN = -1;

    private final Logger log;
    private final SourceNode<?, ?> source;
    private final TopicPartition partition;
    private final ProcessorContext processorContext;
    private final TimestampExtractor timestampExtractor;
    private final RecordDeserializer recordDeserializer;
    private final Sensor droppedRecordsSensor;

    /** Raw records that have not been deserialized yet, in arrival order. */
    private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue = new ArrayDeque<>();

    /** The next record to hand out, already deserialized and timestamped; {@code null} if none. */
    private StampedRecord headRecord = null;

    private long partitionTime = UNKNOWN;

    /**
     * Creates an empty queue for the given partition.
     *
     * @param topicPartition                  the partition this queue buffers records for
     * @param sourceNode                      the source node associated with this queue; also passed to the deserializer
     * @param timestampExtractor              extractor applied to every deserialized record
     * @param deserializationExceptionHandler handler passed through to the {@link RecordDeserializer}
     * @param internalProcessorContext        context used for metrics lookup and passed to the deserializer
     * @param logContext                      factory for this queue's logger
     */
    public RecordQueue(final TopicPartition topicPartition,
                       final SourceNode<?, ?> sourceNode,
                       final TimestampExtractor timestampExtractor,
                       final DeserializationExceptionHandler deserializationExceptionHandler,
                       final InternalProcessorContext internalProcessorContext,
                       final LogContext logContext) {
        this.source = sourceNode;
        this.partition = topicPartition;
        this.timestampExtractor = timestampExtractor;
        this.processorContext = internalProcessorContext;
        this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
            Thread.currentThread().getName(),
            internalProcessorContext.taskId().toString(),
            internalProcessorContext.metrics()
        );
        this.recordDeserializer = new RecordDeserializer(
            sourceNode,
            deserializationExceptionHandler,
            logContext,
            this.droppedRecordsSensor
        );
        this.log = logContext.logger(RecordQueue.class);
    }

    /**
     * Overwrites the tracked partition time.
     *
     * @param j the new partition time
     */
    public void setPartitionTime(final long j) {
        this.partitionTime = j;
    }

    /**
     * @return the source node this queue was created with
     */
    public SourceNode<?, ?> source() {
        return source;
    }

    /**
     * @return the topic partition this queue was created with
     */
    public TopicPartition partition() {
        return partition;
    }

    /**
     * Appends a batch of raw records and, if there is currently no head record, advances to the
     * next valid one.
     *
     * @param iterable the raw records to append, in order
     * @return the size of the queue after the records were added (see {@link #size()})
     */
    public int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> iterable) {
        for (final ConsumerRecord<byte[], byte[]> rawRecord : iterable) {
            fifoQueue.addLast(rawRecord);
        }
        updateHead();
        return size();
    }

    /**
     * Removes and returns the current head record, advancing the partition time to the record's
     * timestamp if that is larger, and then prepares the next head.
     *
     * @return the head record, or {@code null} if the queue currently has no head record
     */
    public StampedRecord poll() {
        final StampedRecord polled = headRecord;
        if (polled == null) {
            // Nothing buffered: leave partition time untouched instead of failing with an NPE.
            return null;
        }

        headRecord = null;
        partitionTime = Math.max(partitionTime, polled.timestamp);
        updateHead();
        return polled;
    }

    /**
     * @return the number of raw records still buffered, plus one if a head record is present
     */
    public int size() {
        return fifoQueue.size() + (headRecord == null ? 0 : 1);
    }

    /**
     * @return {@code true} if there is neither a head record nor any buffered raw record
     */
    public boolean isEmpty() {
        return fifoQueue.isEmpty() && headRecord == null;
    }

    /**
     * @return the timestamp of the head record, or {@link #UNKNOWN} if there is no head record
     */
    public long headRecordTimestamp() {
        return headRecord == null ? UNKNOWN : headRecord.timestamp;
    }

    /**
     * @return the offset of the head record, or {@code null} if there is no head record
     */
    public Long headRecordOffset() {
        return headRecord == null ? null : headRecord.offset();
    }

    /**
     * Drops all buffered records and the head record, and resets the partition time to
     * {@link #UNKNOWN}.
     */
    public void clear() {
        fifoQueue.clear();
        headRecord = null;
        partitionTime = UNKNOWN;
    }

    /**
     * Fills {@link #headRecord} with the next valid record, if it is empty.
     *
     * <p>Raw records are taken from the front of the deque one at a time until one both
     * deserializes to a non-null record and yields a non-negative timestamp, or the deque runs
     * out. Records with a negative timestamp are logged and counted on the dropped-records sensor.
     */
    private void updateHead() {
        while (headRecord == null && !fifoQueue.isEmpty()) {
            final ConsumerRecord<byte[], byte[]> raw = fifoQueue.pollFirst();
            final ConsumerRecord<Object, Object> deserialized = recordDeserializer.deserialize(processorContext, raw);

            if (deserialized == null) {
                // The deserializer produced nothing for this record; move on to the next one.
                continue;
            }

            final long timestamp = extractTimestamp(deserialized);
            log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, deserialized);

            if (timestamp < 0) {
                log.warn(
                    "Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]",
                    deserialized.topic(),
                    deserialized.partition(),
                    deserialized.offset(),
                    timestamp,
                    timestampExtractor.getClass().getCanonicalName()
                );
                droppedRecordsSensor.record();
                continue;
            }

            headRecord = new StampedRecord(deserialized, timestamp);
        }
    }

    /**
     * Runs the timestamp extractor on one record, passing the current partition time.
     *
     * <p>A {@link StreamsException} thrown by the extractor propagates unchanged; any other
     * exception is wrapped in a {@link StreamsException} that names the offending record. The
     * more specific type must be caught first, otherwise it would be shadowed by the generic
     * handler and wrapped a second time.
     */
    private long extractTimestamp(final ConsumerRecord<Object, Object> deserialized) {
        try {
            return timestampExtractor.extract(deserialized, partitionTime);
        } catch (final StreamsException fatalStreamsException) {
            throw fatalStreamsException;
        } catch (final Exception fatalUserException) {
            throw new StreamsException(
                String.format("Fatal user code error in TimestampExtractor callback for record %s.", deserialized),
                fatalUserException
            );
        }
    }

    /**
     * @return the current partition time, or {@link #UNKNOWN} if none has been set or observed
     */
    public long partitionTime() {
        return partitionTime;
    }
}
