package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link GlobalStreamThread.StateConsumer}.
 *
 * <p>Every test wires the state consumer to a {@link MockConsumer}, a {@link MockTime} clock and a
 * recording {@link TaskStub}, then asserts on what the mock consumer or the stub observed.
 */
public class StateConsumerTest {
    /** Flush interval, in the unit expected by the {@code StateConsumer} constructor, used by every test. */
    private static final long FLUSH_INTERVAL = 1000L;
    /** Offset the stub maintainer reports for {@link #topicOne}. */
    private static final long TOPIC_ONE_OFFSET = 20L;
    /** Offset the stub maintainer reports for {@link #topicTwo}. */
    private static final long TOPIC_TWO_OFFSET = 30L;

    private final TopicPartition topicOne = new TopicPartition("topic-one", 1);
    private final TopicPartition topicTwo = new TopicPartition("topic-two", 1);
    private final MockTime time = new MockTime();
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    private final Map<TopicPartition, Long> partitionOffsets = new HashMap<>();
    private final LogContext logContext = new LogContext("test ");
    private GlobalStreamThread.StateConsumer stateConsumer;
    private TaskStub stateMaintainer;

    /**
     * Minimal {@link GlobalStateMaintainer} that records which callbacks were invoked so the tests
     * can assert on them.
     */
    private static class TaskStub implements GlobalStateMaintainer {
        private final Map<TopicPartition, Long> partitionOffsets;
        /** Number of records passed to {@link #update(ConsumerRecord)}, keyed by partition. */
        private final Map<TopicPartition, Integer> updatedPartitions = new HashMap<>();
        private boolean flushed;
        private boolean wipeStore;
        private boolean closed;

        /**
         * @param map offsets to hand back from {@link #initialize()}; the map is kept by reference
         */
        TaskStub(Map<TopicPartition, Long> map) {
            this.partitionOffsets = map;
        }

        /**
         * @return the offsets map supplied at construction time
         */
        public Map<TopicPartition, Long> initialize() {
            return partitionOffsets;
        }

        /** Records that a flush was requested. */
        public void flushState() {
            flushed = true;
        }

        /**
         * Records that the maintainer was closed and remembers the flag it was closed with.
         *
         * @param z the wipe-store flag passed by the caller
         */
        public void close(boolean z) {
            closed = true;
            wipeStore = z;
        }

        /**
         * Increments the per-partition counter for the partition the record belongs to.
         *
         * @param consumerRecord the record delivered to the maintainer
         */
        public void update(ConsumerRecord<byte[], byte[]> consumerRecord) {
            TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
            // merge() inserts 1 for a first sighting and adds 1 to the existing count otherwise.
            updatedPartitions.merge(topicPartition, 1, Integer::sum);
        }
    }

    /** Builds a fresh stub maintainer and state consumer before each test. */
    @Before
    public void setUp() {
        partitionOffsets.put(topicOne, TOPIC_ONE_OFFSET);
        partitionOffsets.put(topicTwo, TOPIC_TWO_OFFSET);
        stateMaintainer = new TaskStub(partitionOffsets);
        stateConsumer = new GlobalStreamThread.StateConsumer(
            logContext, consumer, stateMaintainer, time, Duration.ofMillis(10L), FLUSH_INTERVAL);
    }

    @Test
    public void shouldAssignPartitionsToConsumer() {
        stateConsumer.initialize();
        Assert.assertEquals(Utils.mkSet(topicOne, topicTwo), consumer.assignment());
    }

    @Test
    public void shouldSeekToInitialOffsets() {
        stateConsumer.initialize();
        Assert.assertEquals(TOPIC_ONE_OFFSET, consumer.position(topicOne));
        Assert.assertEquals(TOPIC_TWO_OFFSET, consumer.position(topicTwo));
    }

    @Test
    public void shouldUpdateStateWithReceivedRecordsForPartition() {
        stateConsumer.initialize();
        addRecord(topicOne, TOPIC_ONE_OFFSET);
        addRecord(topicOne, TOPIC_ONE_OFFSET + 1);
        stateConsumer.pollAndUpdate();
        Assert.assertEquals(2, stateMaintainer.updatedPartitions.get(topicOne).intValue());
    }

    @Test
    public void shouldUpdateStateWithReceivedRecordsForAllTopicPartition() {
        stateConsumer.initialize();
        addRecord(topicOne, TOPIC_ONE_OFFSET);
        addRecord(topicTwo, TOPIC_TWO_OFFSET + 1);
        addRecord(topicTwo, TOPIC_TWO_OFFSET + 2);
        stateConsumer.pollAndUpdate();
        Assert.assertEquals(1, stateMaintainer.updatedPartitions.get(topicOne).intValue());
        Assert.assertEquals(2, stateMaintainer.updatedPartitions.get(topicTwo).intValue());
    }

    @Test
    public void shouldFlushStoreWhenFlushIntervalHasLapsed() {
        stateConsumer.initialize();
        addRecord(topicOne, TOPIC_ONE_OFFSET);
        time.sleep(FLUSH_INTERVAL);
        stateConsumer.pollAndUpdate();
        Assert.assertTrue(stateMaintainer.flushed);
    }

    @Test
    public void shouldNotFlushOffsetsWhenFlushIntervalHasNotLapsed() {
        stateConsumer.initialize();
        addRecord(topicOne, TOPIC_ONE_OFFSET);
        // Advance the mock clock by only half of the flush interval.
        time.sleep(FLUSH_INTERVAL / 2);
        stateConsumer.pollAndUpdate();
        Assert.assertFalse(stateMaintainer.flushed);
    }

    @Test
    public void shouldCloseConsumer() throws IOException {
        stateConsumer.close(false);
        Assert.assertTrue(consumer.closed());
    }

    @Test
    public void shouldCloseStateMaintainer() throws IOException {
        stateConsumer.close(false);
        Assert.assertTrue(stateMaintainer.closed);
    }

    @Test
    public void shouldWipeStoreOnClose() throws IOException {
        stateConsumer.close(true);
        Assert.assertTrue(stateMaintainer.wipeStore);
    }

    /**
     * Queues a record with an empty key and value on the mock consumer.
     *
     * @param partition the topic-partition the record belongs to
     * @param offset    the offset to assign to the record
     */
    private void addRecord(TopicPartition partition, long offset) {
        consumer.addRecord(
            new ConsumerRecord<>(partition.topic(), partition.partition(), offset, new byte[0], new byte[0]));
    }
}
