package org.apache.kafka.streams;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.ThreadCache;

@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriver.class */
public class TopologyTestDriver implements Closeable {
    private final Time mockTime;
    private final InternalTopologyBuilder internalTopologyBuilder;
    private static final int PARTITION_ID = 0;
    private static final TaskId TASK_ID = new TaskId(PARTITION_ID, PARTITION_ID);
    private final StreamTask task;
    private final GlobalStateUpdateTask globalStateTask;
    private final GlobalStateManager globalStateManager;
    private final StateDirectory stateDirectory;
    private final ProcessorTopology processorTopology;
    private final MockProducer<byte[], byte[]> producer;
    private final Set<String> internalTopics;
    private final Map<String, TopicPartition> partitionsByTopic;
    private final Map<String, TopicPartition> globalPartitionsByTopic;
    private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition;
    private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic;
    private final boolean eosEnabled;

    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriver$MockTime.class */
    static class MockTime implements Time {
        private final AtomicLong timeMs;
        private final AtomicLong highResTimeNs;

        MockTime(long j) {
            this.timeMs = new AtomicLong(j);
            this.highResTimeNs = new AtomicLong(j * 1000 * 1000);
        }

        public long milliseconds() {
            return this.timeMs.get();
        }

        public long nanoseconds() {
            return this.highResTimeNs.get();
        }

        public long hiResClockMs() {
            return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
        }

        public void sleep(long j) {
            if (j < 0) {
                throw new IllegalArgumentException("Sleep ms cannot be negative.");
            }
            this.timeMs.addAndGet(j);
            this.highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(j));
        }
    }

    /**
     * Creates a test driver whose mocked wall-clock starts at the current system time
     * ({@link System#currentTimeMillis()}).
     *
     * @param topology   the topology to be tested
     * @param properties configuration passed on to the three-argument constructor
     */
    public TopologyTestDriver(Topology topology, Properties properties) {
        this(topology, properties, System.currentTimeMillis());
    }

    /**
     * Creates a test driver for the given topology whose mocked wall-clock starts at {@code j} milliseconds.
     *
     * <p>Construction builds the processor topology and the (optional) global-state topology, registers
     * partition {@code PARTITION_ID} for every source topic, and wires mock consumer/producer clients into
     * the global state task and the stream task. Either task is left {@code null} when the topology has no
     * global-state topology or no source topics, respectively.
     *
     * @param topology   the topology to be tested
     * @param properties configuration used to build the {@link StreamsConfig}
     * @param j          initial value of the mocked clock, in milliseconds
     */
    public TopologyTestDriver(Topology topology, Properties properties, long j) {
        this.internalTopics = new HashSet<>();
        this.partitionsByTopic = new HashMap<>();
        this.globalPartitionsByTopic = new HashMap<>();
        this.offsetsByTopicPartition = new HashMap<>();
        this.outputRecordsByTopic = new HashMap<>();
        StreamsConfig streamsConfig = new StreamsConfig(properties);
        this.mockTime = new MockTime(j);
        this.internalTopologyBuilder = topology.internalTopologyBuilder;
        this.internalTopologyBuilder.setApplicationId(streamsConfig.getString("application.id"));
        this.processorTopology = this.internalTopologyBuilder.build((Integer) null);
        ProcessorTopology buildGlobalStateTopology = this.internalTopologyBuilder.buildGlobalStateTopology();

        // Producer that auto-completes sends and reports a single partition for every topic.
        ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
        this.producer = new MockProducer<byte[], byte[]>(true, byteArraySerializer, byteArraySerializer) { // from class: org.apache.kafka.streams.TopologyTestDriver.1
            @Override
            public List<PartitionInfo> partitionsFor(String str) {
                return Collections.singletonList(new PartitionInfo(str, TopologyTestDriver.PARTITION_ID, (Node) null, (Node[]) null, (Node[]) null));
            }
        };
        MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        this.stateDirectory = new StateDirectory(streamsConfig, this.mockTime);
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(new Metrics(), "topology-test-driver-stream-metrics", Collections.emptyMap());
        // A negative configured cache size is clamped to zero.
        ThreadCache threadCache = new ThreadCache(new LogContext("topology-test-driver "), Math.max(0L, streamsConfig.getLong("cache.max.bytes.buffering").longValue()), streamsMetricsImpl);

        // Restore callbacks are intentionally no-ops.
        StateRestoreListener stateRestoreListener = new StateRestoreListener() { // from class: org.apache.kafka.streams.TopologyTestDriver.2
            @Override
            public void onRestoreStart(TopicPartition topicPartition, String str, long j2, long j3) {
            }

            @Override
            public void onBatchRestored(TopicPartition topicPartition, String str, long j2, long j3) {
            }

            @Override
            public void onRestoreEnd(TopicPartition topicPartition, String str, long j2) {
            }
        };

        // Remember every repartition source topic so produced records can be looped back into the topology.
        for (InternalTopologyBuilder.TopicsInfo topicsInfo : this.internalTopologyBuilder.topicGroups().values()) {
            this.internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
        }

        // One partition and one offset counter (starting at 0) per source topic.
        for (String str : this.processorTopology.sourceTopics()) {
            TopicPartition topicPartition = new TopicPartition(str, PARTITION_ID);
            this.partitionsByTopic.put(str, topicPartition);
            this.offsetsByTopicPartition.put(topicPartition, new AtomicLong());
        }
        mockConsumer.assign(this.partitionsByTopic.values());

        if (buildGlobalStateTopology != null) {
            for (String str2 : buildGlobalStateTopology.sourceTopics()) {
                TopicPartition topicPartition2 = new TopicPartition(str2, PARTITION_ID);
                this.globalPartitionsByTopic.put(str2, topicPartition2);
                this.offsetsByTopicPartition.put(topicPartition2, new AtomicLong());
                // Present each global topic to the consumer as an empty single-partition topic.
                mockConsumer.updatePartitions(str2, Collections.singletonList(new PartitionInfo(str2, PARTITION_ID, (Node) null, (Node[]) null, (Node[]) null)));
                mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition2, 0L));
                mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition2, 0L));
            }
            this.globalStateManager = new GlobalStateManagerImpl(new LogContext("mock "), buildGlobalStateTopology, mockConsumer, this.stateDirectory, stateRestoreListener, streamsConfig);
            GlobalProcessorContextImpl globalProcessorContextImpl = new GlobalProcessorContextImpl(streamsConfig, this.globalStateManager, streamsMetricsImpl, threadCache);
            this.globalStateManager.setGlobalProcessorContext(globalProcessorContextImpl);
            this.globalStateTask = new GlobalStateUpdateTask(buildGlobalStateTopology, globalProcessorContextImpl, this.globalStateManager, new LogAndContinueExceptionHandler(), new LogContext());
            this.globalStateTask.initialize();
        } else {
            this.globalStateManager = null;
            this.globalStateTask = null;
        }

        if (this.partitionsByTopic.isEmpty()) {
            // No source topics: there is nothing for a stream task to consume.
            this.task = null;
        } else {
            this.task = new StreamTask(TASK_ID, this.partitionsByTopic.values(), this.processorTopology, mockConsumer, new StoreChangelogReader(createRestoreConsumer(this.processorTopology.storeToChangelogTopic()), stateRestoreListener, new LogContext("topology-test-driver ")), streamsConfig, streamsMetricsImpl, this.stateDirectory, threadCache, this.mockTime, new StreamTask.ProducerSupplier() { // from class: org.apache.kafka.streams.TopologyTestDriver.3
                @Override
                public Producer<byte[], byte[]> get() {
                    return TopologyTestDriver.this.producer;
                }
            });
            this.task.initializeStateStores();
            this.task.initializeTopology();
        }
        this.eosEnabled = streamsConfig.getString("processing.guarantee").equals("exactly_once");
    }

    /**
     * Sends one record into the topology and processes it immediately.
     *
     * <p>The record is re-created with the driver's own partition and the next offset for its topic. Records
     * for a regular source topic are added to the stream task, processed, followed by a stream-time
     * punctuation check and a commit, after which produced output is captured. Records for a global topic
     * are applied to the global state task and flushed.
     *
     * @param consumerRecord the record to process; its topic must be a source topic or a global topic
     * @throws IllegalArgumentException if the record's topic is neither a source topic nor a global topic
     */
    public void pipeInput(ConsumerRecord<byte[], byte[]> consumerRecord) {
        final String topicName = consumerRecord.topic();

        final TopicPartition sourcePartition = partitionsByTopic.get(topicName);
        if (sourcePartition != null) {
            // incrementAndGet() - 1 yields the pre-increment value, i.e. offsets start at 0.
            final long offset = offsetsByTopicPartition.get(sourcePartition).incrementAndGet() - 1;
            final ConsumerRecord<byte[], byte[]> positioned = new ConsumerRecord<>(
                topicName,
                sourcePartition.partition(),
                offset,
                consumerRecord.timestamp(),
                consumerRecord.timestampType(),
                -1L,
                consumerRecord.serializedKeySize(),
                consumerRecord.serializedValueSize(),
                consumerRecord.key(),
                consumerRecord.value());
            task.addRecords(sourcePartition, Collections.singleton(positioned));
            task.context().setRecordContext(new ProcessorRecordContext(consumerRecord.timestamp(), offset, sourcePartition.partition(), topicName));
            task.process();
            task.maybePunctuateStreamTime();
            task.commit();
            captureOutputRecords();
            return;
        }

        final TopicPartition globalPartition = globalPartitionsByTopic.get(topicName);
        if (globalPartition == null) {
            throw new IllegalArgumentException("Unknown topic: " + topicName);
        }
        final long globalOffset = offsetsByTopicPartition.get(globalPartition).incrementAndGet() - 1;
        final ConsumerRecord<byte[], byte[]> globalRecord = new ConsumerRecord<>(
            globalPartition.topic(),
            globalPartition.partition(),
            globalOffset,
            consumerRecord.timestamp(),
            consumerRecord.timestampType(),
            -1L,
            consumerRecord.serializedKeySize(),
            consumerRecord.serializedValueSize(),
            consumerRecord.key(),
            consumerRecord.value());
        globalStateTask.update(globalRecord);
        globalStateTask.flushState();
    }

    /**
     * Moves every record recorded by the mock producer into the per-topic output queues and loops records
     * addressed to an internal, source or global topic back into the topology via
     * {@link #pipeInput(ConsumerRecord)}.
     *
     * <p>The producer history is read and then cleared before any record is re-piped, so the recursive
     * {@code pipeInput} calls start from an empty history. When exactly-once is enabled and the producer is
     * still open, a new transaction is started after clearing.
     */
    private void captureOutputRecords() {
        final List<ProducerRecord<byte[], byte[]>> produced = producer.history();
        producer.clear();
        if (eosEnabled && !producer.closed()) {
            producer.initTransactions();
            producer.beginTransaction();
        }

        for (final ProducerRecord<byte[], byte[]> outputRecord : produced) {
            final String topicName = outputRecord.topic();

            Queue<ProducerRecord<byte[], byte[]>> topicQueue = outputRecordsByTopic.get(topicName);
            if (topicQueue == null) {
                topicQueue = new LinkedList<>();
                outputRecordsByTopic.put(topicName, topicQueue);
            }
            topicQueue.add(outputRecord);

            final boolean feedsTopology = internalTopics.contains(topicName)
                || processorTopology.sourceTopics().contains(topicName)
                || globalPartitionsByTopic.containsKey(topicName);
            if (feedsTopology) {
                final byte[] serializedKey = outputRecord.key();
                final byte[] serializedValue = outputRecord.value();
                // A null key/value has serialized size 0. Partition (-1) and offset (-1L) are placeholders;
                // pipeInput assigns the real ones.
                final int keySize = serializedKey == null ? 0 : serializedKey.length;
                final int valueSize = serializedValue == null ? 0 : serializedValue.length;
                pipeInput(new ConsumerRecord<>(
                    topicName,
                    -1,
                    -1L,
                    outputRecord.timestamp().longValue(),
                    TimestampType.CREATE_TIME,
                    0L,
                    keySize,
                    valueSize,
                    serializedKey,
                    serializedValue));
            }
        }
    }

    /**
     * Sends each record of the list into the topology, one at a time and in list order, by delegating to
     * {@link #pipeInput(ConsumerRecord)}.
     *
     * @param list the records to process
     */
    public void pipeInput(List<ConsumerRecord<byte[], byte[]>> list) {
        for (final ConsumerRecord<byte[], byte[]> next : list) {
            pipeInput(next);
        }
    }

    /**
     * Moves the mocked wall-clock forward by {@code j} milliseconds. If a stream task exists, it then gets
     * a system-time punctuation check and a commit. Produced output is captured afterwards in either case.
     *
     * @param j number of milliseconds to advance the mocked clock by
     */
    public void advanceWallClockTime(long j) {
        mockTime.sleep(j);
        if (task != null) {
            task.maybePunctuateSystemTime();
            task.commit();
        }
        captureOutputRecords();
    }

    /**
     * Removes and returns the oldest captured output record for the given topic.
     *
     * @param str name of the output topic
     * @return the next record, or {@code null} if nothing was ever captured for the topic or its queue is
     *         empty
     */
    public ProducerRecord<byte[], byte[]> readOutput(String str) {
        final Queue<ProducerRecord<byte[], byte[]>> pending = outputRecordsByTopic.get(str);
        return pending == null ? null : pending.poll();
    }

    /**
     * Removes the oldest captured output record for the given topic and returns it with key and value
     * deserialized.
     *
     * @param str           name of the output topic
     * @param deserializer  deserializer applied to the record key
     * @param deserializer2 deserializer applied to the record value
     * @return the deserialized record (topic, partition and timestamp copied from the raw record), or
     *         {@code null} if no record is available for the topic
     */
    public <K, V> ProducerRecord<K, V> readOutput(String str, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        final ProducerRecord<byte[], byte[]> raw = readOutput(str);
        if (raw == null) {
            return null;
        }
        final K key = deserializer.deserialize(raw.topic(), raw.key());
        final V value = deserializer2.deserialize(raw.topic(), raw.value());
        return new ProducerRecord<>(raw.topic(), raw.partition(), raw.timestamp(), key, value);
    }

    /**
     * Looks up every state store name known to the topology builder via {@link #getStateStore(String)}.
     *
     * @return a new map from store name to store; a value is {@code null} when the lookup finds no store
     *         under that name
     */
    public Map<String, StateStore> getAllStateStores() {
        final Map<String, StateStore> storesByName = new HashMap<>();
        for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
            storesByName.put(storeName, getStateStore(storeName));
        }
        return storesByName;
    }

    /**
     * Finds a state store by name, asking the stream task's state manager first and the global state
     * manager second.
     *
     * @param str name of the store
     * @return the store, or {@code null} if neither manager exists or knows the name
     */
    public StateStore getStateStore(String str) {
        if (task != null) {
            final StateStore taskStore = task.context().getStateMgr().getStore(str);
            if (taskStore != null) {
                return taskStore;
            }
        }
        // Fall back to the global stores, if the topology has any.
        return globalStateManager == null ? null : globalStateManager.getGlobalStore(str);
    }

    /**
     * Returns the store registered under the given name if it is a {@link KeyValueStore}.
     *
     * <p>The key and value types are chosen by the caller and are not checked at runtime.
     *
     * @param str name of the store
     * @return the key-value store, or {@code null} if no store has that name or it is of a different kind
     */
    @SuppressWarnings("unchecked") // guarded by instanceof; K and V are erased and cannot be verified
    public <K, V> KeyValueStore<K, V> getKeyValueStore(String str) {
        final StateStore store = getStateStore(str);
        return store instanceof KeyValueStore ? (KeyValueStore<K, V>) store : null;
    }

    /**
     * Returns the store registered under the given name if it is a {@link WindowStore}.
     *
     * <p>The key and value types are chosen by the caller and are not checked at runtime.
     *
     * @param str name of the store
     * @return the window store, or {@code null} if no store has that name or it is of a different kind
     */
    @SuppressWarnings("unchecked") // guarded by instanceof; K and V are erased and cannot be verified
    public <K, V> WindowStore<K, V> getWindowStore(String str) {
        final StateStore store = getStateStore(str);
        return store instanceof WindowStore ? (WindowStore<K, V>) store : null;
    }

    /**
     * Returns the store registered under the given name if it is a {@link SessionStore}.
     *
     * <p>The key and value types are chosen by the caller and are not checked at runtime.
     *
     * @param str name of the store
     * @return the session store, or {@code null} if no store has that name or it is of a different kind
     */
    @SuppressWarnings("unchecked") // guarded by instanceof; K and V are erased and cannot be verified
    public <K, V> SessionStore<K, V> getSessionStore(String str) {
        final StateStore store = getStateStore(str);
        return store instanceof SessionStore ? (SessionStore<K, V>) store : null;
    }

    /**
     * Shuts the driver down: closes the stream task and the global state task (when present), captures any
     * remaining output, closes the producer unless exactly-once is enabled, and finally cleans the state
     * directory.
     */
    @Override
    public void close() {
        if (task != null) {
            task.close(true, false);
        }
        if (globalStateTask != null) {
            try {
                globalStateTask.close();
            } catch (IOException ignored) {
                // Best effort: a failure to close the global state task does not stop the rest of shutdown.
            }
        }
        captureOutputRecords();
        if (!eosEnabled) {
            producer.close();
        }
        stateDirectory.clean();
    }

    /**
     * @return the mock producer owned by this driver
     */
    private Producer<byte[], byte[]> get() {
        return producer;
    }

    /**
     * Builds the mock consumer handed to the changelog reader.
     *
     * <p>Seeking is a no-op and the reported position is always 0. Every changelog topic in the given map
     * is registered with a single partition ({@code PARTITION_ID}) whose end offset is 0.
     *
     * @param map changelog topic names, taken from the map's values; the keys are not used here
     * @return the configured mock consumer
     */
    private MockConsumer<byte[], byte[]> createRestoreConsumer(Map<String, String> map) {
        final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) { // from class: org.apache.kafka.streams.TopologyTestDriver.4
            @Override
            public synchronized void seekToEnd(Collection<TopicPartition> collection) {
            }

            @Override
            public synchronized void seekToBeginning(Collection<TopicPartition> collection) {
            }

            @Override
            public synchronized long position(TopicPartition topicPartition) {
                return 0L;
            }
        };

        // Only the changelog topic names (the map values) matter.
        for (final String changelogTopic : map.values()) {
            final List<PartitionInfo> partitionInfos = new ArrayList<>();
            partitionInfos.add(new PartitionInfo(changelogTopic, PARTITION_ID, (Node) null, (Node[]) null, (Node[]) null));
            restoreConsumer.updatePartitions(changelogTopic, partitionInfos);
            restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(changelogTopic, PARTITION_ID), 0L));
        }
        return restoreConsumer;
    }
}
