package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.PartitionGroup;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/PartitionGroupTest.class */
public class PartitionGroupTest {
    private final LogContext logContext = new LogContext();
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
    private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
    private final TopicPartition randomPartition = new TopicPartition("random-partition", 0);
    private final String errMessage = "Partition " + this.randomPartition + " not found.";
    private final String[] topics = {"topic"};
    private final TopicPartition partition1 = new TopicPartition(this.topics[0], 1);
    private final TopicPartition partition2 = new TopicPartition(this.topics[0], 2);
    private final RecordQueue queue1 = new RecordQueue(this.partition1, new MockSourceNode(this.topics, this.intDeserializer, this.intDeserializer), this.timestampExtractor, new LogAndContinueExceptionHandler(), new InternalMockProcessorContext(), this.logContext);
    private final RecordQueue queue2 = new RecordQueue(this.partition2, new MockSourceNode(this.topics, this.intDeserializer, this.intDeserializer), this.timestampExtractor, new LogAndContinueExceptionHandler(), new InternalMockProcessorContext(), this.logContext);
    private final byte[] recordValue = this.intSerializer.serialize((String) null, 10);
    private final byte[] recordKey = this.intSerializer.serialize((String) null, 1);
    private final Metrics metrics = new Metrics();
    private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", Utils.mkMap(new Map.Entry[0]));
    private final PartitionGroup group = new PartitionGroup(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.partition1, this.queue1), Utils.mkEntry(this.partition2, this.queue2)}), getValueSensor(this.metrics, this.lastLatenessValue));

    private static Sensor getValueSensor(Metrics metrics, MetricName metricName) {
        Sensor sensor = metrics.sensor(metricName.name());
        sensor.add(metricName, new Value());
        return sensor;
    }

    @Test
    public void testTimeTracking() {
        Assert.assertEquals(0L, this.group.numBuffered());
        this.group.addRawRecords(this.partition1, Arrays.asList(new ConsumerRecord("topic", 1, 1L, this.recordKey, this.recordValue), new ConsumerRecord("topic", 1, 3L, this.recordKey, this.recordValue), new ConsumerRecord("topic", 1, 5L, this.recordKey, this.recordValue)));
        this.group.addRawRecords(this.partition2, Arrays.asList(new ConsumerRecord("topic", 2, 2L, this.recordKey, this.recordValue), new ConsumerRecord("topic", 2, 4L, this.recordKey, this.recordValue), new ConsumerRecord("topic", 2, 6L, this.recordKey, this.recordValue)));
        verifyBuffered(6, 3, 3);
        Assert.assertEquals(-1L, this.group.streamTime());
        Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(this.lastLatenessValue).metricValue());
        PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
        StampedRecord nextRecord = this.group.nextRecord(recordInfo);
        Assert.assertEquals(this.partition1, recordInfo.partition());
        Assert.assertEquals(3L, this.group.partitionTimestamp(this.partition1));
        Assert.assertEquals(2L, this.group.partitionTimestamp(this.partition2));
        Assert.assertEquals(1L, this.group.streamTime());
        verifyTimes(nextRecord, 1L, 1L);
        verifyBuffered(5, 2, 3);
        Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(this.lastLatenessValue).metricValue());
        StampedRecord nextRecord2 = this.group.nextRecord(recordInfo);
        Assert.assertEquals(this.partition2, recordInfo.partition());
        Assert.assertEquals(3L, this.group.partitionTimestamp(this.partition1));
        Assert.assertEquals(4L, this.group.partitionTimestamp(this.partition2));
        Assert.assertEquals(2L, this.group.streamTime());
        verifyTimes(nextRecord2, 2L, 2L);
        verifyBuffered(4, 2, 2);
        Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(this.lastLatenessValue).metricValue());
        this.group.addRawRecords(this.partition1, Arrays.asList(new ConsumerRecord("topic", 1, 2L, this.recordKey, this.recordValue), new ConsumerRecord("topic", 1, 4L, this.recordKey, this.recordValue)));
        verifyBuffered(6, 4, 2);
        Assert.assertEquals(3L, this.group.partitionTimestamp(this.partition1));
        Assert.assertEquals(4L, this.group.partitionTimestamp(this.partition2));
        Assert.assertEquals(2L, this.group.streamTime());
        Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(this.lastLatenessValue).metricValue());
        StampedRecord nextRecord3 = this.group.nextRecord(recordInfo);
        Assert.assertEquals(this.partition1, recordInfo.partition());
        Assert.assertEquals(5L, this.group.partitionTimestamp(this.partition1));
        Assert.assertEquals(4L, this.group.partitionTimestamp(this.partition2));
        Assert.assertEquals(3L, this.group.streamTime());
        verifyTimes(nextRecord3, 3L, 3L);
        verifyBuffered(5, 3, 2);
        Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(this.lastLatenessValue).metricValue());
        StampedRecord nextRecord4 = this.group.nextRecord(recordInfo);
        Assert.assertEquals(this.partition2, recordInfo.partition());
        Assert.assertEquals(5L, this.group.partitionTimestamp(this.partition1));
        Assert.assertEquals(6L, this.group.partitionTimestamp(this.partition2));
        Assert.assertEquals(4L, this.group.streamTime());
        verifyTimes(nextRecord4, 4L, 4L);
        verifyBuffered(4, 3, 1);
        Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(this.lastLatenessValue).metricValue());
        StampedRecord nextRecord5 = this.group.nextRecord(recordInfo);
        Assert.assertEquals(this.partition1, recordInfo.partition());
        Assert.assertEquals(5L, this.group.partitionTimestamp(this.partition1));
        Assert.assertEquals(6L, this.group.partitionTimestamp(this.partition2));
        Assert.assertEquals(5L, this.group.streamTime());
        verifyTimes(nextRecord5, 5L, 5L);
        verifyBuffered(3, 2, 1);
        Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(this.lastLatenessValue).metricValue());
        StampedRecord nextRecord6 = this.group.nextRecord(recordInfo);
        Assert.assertEquals(this.partition1, recordInfo.partition());
        Assert.assertEquals(5L, this.group.partitionTimestamp(this.partition1));
        Assert.assertEquals(6L, this.group.partitionTimestamp(this.partition2));
        Assert.assertEquals(5L, this.group.streamTime());
        verifyTimes(nextRecord6, 2L, 5L);
        verifyBuffered(2, 1, 1);
        Assert.assertEquals(Double.valueOf(3.0d), this.metrics.metric(this.lastLatenessValue).metricValue());
        StampedRecord nextRecord7 = this.group.nextRecord(recordInfo);
        Assert.assertEquals(this.partition1, recordInfo.partition());
        Assert.assertEquals(5L, this.group.partitionTimestamp(this.partition1));
        Assert.assertEquals(6L, this.group.partitionTimestamp(this.partition2));
        Assert.assertEquals(5L, this.group.streamTime());
        verifyTimes(nextRecord7, 4L, 5L);
        verifyBuffered(1, 0, 1);
        Assert.assertEquals(Double.valueOf(1.0d), this.metrics.metric(this.lastLatenessValue).metricValue());
        StampedRecord nextRecord8 = this.group.nextRecord(recordInfo);
        Assert.assertEquals(this.partition2, recordInfo.partition());
        Assert.assertEquals(5L, this.group.partitionTimestamp(this.partition1));
        Assert.assertEquals(6L, this.group.partitionTimestamp(this.partition2));
        Assert.assertEquals(6L, this.group.streamTime());
        verifyTimes(nextRecord8, 6L, 6L);
        verifyBuffered(0, 0, 0);
        Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(this.lastLatenessValue).metricValue());
    }

    @Test
    public void shouldChooseNextRecordBasedOnHeadTimestamp() {
        Assert.assertEquals(0L, this.group.numBuffered());
        this.group.addRawRecords(this.partition1, Arrays.asList(new ConsumerRecord("topic", 1, 1L, this.recordKey, this.recordValue), new ConsumerRecord("topic", 1, 5L, this.recordKey, this.recordValue), new ConsumerRecord("topic", 1, 3L, this.recordKey, this.recordValue)));
        verifyBuffered(3, 3, 0);
        Assert.assertEquals(-1L, this.group.streamTime());
        Assert.assertEquals(Double.valueOf(0.0d), this.metrics.metric(this.lastLatenessValue).metricValue());
        PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
        Assert.assertEquals(this.group.nextRecord(recordInfo).timestamp, 1L);
        Assert.assertEquals(this.group.nextRecord(recordInfo).timestamp, 5L);
        this.group.addRawRecords(this.partition2, Arrays.asList(new ConsumerRecord("topic", 2, 2L, this.recordKey, this.recordValue), new ConsumerRecord("topic", 2, 4L, this.recordKey, this.recordValue), new ConsumerRecord("topic", 2, 6L, this.recordKey, this.recordValue)));
        Assert.assertEquals(this.group.nextRecord(recordInfo).timestamp, 2L);
        Assert.assertEquals(this.group.nextRecord(recordInfo).timestamp, 3L);
    }

    private void verifyTimes(StampedRecord stampedRecord, long j, long j2) {
        Assert.assertEquals(j, stampedRecord.timestamp);
        Assert.assertEquals(j2, this.group.streamTime());
    }

    private void verifyBuffered(int i, int i2, int i3) {
        Assert.assertEquals(i, this.group.numBuffered());
        Assert.assertEquals(i2, this.group.numBuffered(this.partition1));
        Assert.assertEquals(i3, this.group.numBuffered(this.partition2));
    }

    @Test
    public void shouldSetPartitionTimestampAndStreamTime() {
        this.group.setPartitionTime(this.partition1, 100L);
        Assert.assertEquals(100L, this.group.partitionTimestamp(this.partition1));
        Assert.assertEquals(100L, this.group.streamTime());
        this.group.setPartitionTime(this.partition2, 50L);
        Assert.assertEquals(50L, this.group.partitionTimestamp(this.partition2));
        Assert.assertEquals(100L, this.group.streamTime());
    }

    @Test
    public void shouldThrowNullpointerUponSetPartitionTimestampFailure() {
        Assert.assertThrows(this.errMessage, NullPointerException.class, () -> {
            this.group.setPartitionTime(this.randomPartition, 0L);
        });
    }

    @Test
    public void shouldThrowNullpointerUponGetPartitionTimestampFailure() {
        Assert.assertThrows(this.errMessage, NullPointerException.class, () -> {
            this.group.partitionTimestamp(this.randomPartition);
        });
    }
}
