/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.client;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerSupplier<K, V>
implements Supplier<Consumer<K, V>> {
    private final MockProducer<K, V> producer;
    private final String clientIdSuffix;
    private final Set<TopicPartition> topicPartitions;
    private final Map<String, TierMockConsumer<K, V>> consumers = new HashMap<String, TierMockConsumer<K, V>>();
    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> logs = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
    private int instanceId = 0;
    private int numRecordsProcessed = 0;

    public MockConsumerSupplier(String clientIdSuffix, Set<TopicPartition> topicPartitions2, MockProducer<K, V> producer) {
        this.clientIdSuffix = clientIdSuffix;
        this.topicPartitions = topicPartitions2;
        this.producer = producer;
    }

    @Override
    public synchronized Consumer<K, V> get() {
        String clientId = this.instanceId + this.clientIdSuffix;
        ++this.instanceId;
        TierMockConsumer<K, V> tierMockConsumer = new TierMockConsumer<K, V>(this.topicPartitions);
        tierMockConsumer.consumeTillEnd(this.logs);
        this.consumers.put(clientId, tierMockConsumer);
        return ((TierMockConsumer)tierMockConsumer).consumer;
    }

    public synchronized void moveRecordsFromProducer() {
        List toProcess = this.producer.history().subList(this.numRecordsProcessed, this.producer.history().size());
        for (ProducerRecord producerRecord : toProcess) {
            TopicPartition topicPartition = new TopicPartition(producerRecord.topic(), producerRecord.partition().intValue());
            this.logs.putIfAbsent(topicPartition, new LinkedList());
            List<ConsumerRecord<K, V>> log2 = this.logs.get(topicPartition);
            log2.add(new ConsumerRecord(producerRecord.topic(), producerRecord.partition().intValue(), (long)log2.size(), producerRecord.key(), producerRecord.value()));
            ++this.numRecordsProcessed;
        }
        this.removeClosed();
        for (TierMockConsumer tierMockConsumer : this.consumers.values()) {
            tierMockConsumer.consumeTillEnd(this.logs);
        }
    }

    public synchronized void setConsumerPositionException(KafkaException exception) {
        this.consumers.values().forEach(tierMockConsumer -> ((TierMockConsumer)tierMockConsumer).consumer.setPositionException(exception));
    }

    public synchronized List<Consumer<K, V>> consumers() {
        return this.consumers.values().stream().map(tierMockConsumer -> ((TierMockConsumer)tierMockConsumer).consumer).collect(Collectors.toList());
    }

    private void removeClosed() {
        this.consumers.keySet().removeIf(clientId -> ((TierMockConsumer)this.consumers.get(clientId)).consumer.closed());
    }

    private static class TierMockConsumer<K, V> {
        private final MockConsumer<K, V> consumer;

        public TierMockConsumer(Set<TopicPartition> topicPartitions2) {
            HashMap beginningOffsets = new HashMap();
            topicPartitions2.stream().forEach(topicPartition -> beginningOffsets.put(topicPartition, 0L));
            this.consumer = new MockConsumer(OffsetResetStrategy.NONE);
            this.consumer.updateBeginningOffsets(beginningOffsets);
            this.consumer.updateEndOffsets(beginningOffsets);
        }

        public void consumeTillEnd(Map<TopicPartition, List<ConsumerRecord<K, V>>> logs) {
            Set assignment2 = this.consumer.assignment();
            for (TopicPartition topicPartition : assignment2) {
                List<ConsumerRecord<K, V>> records2 = logs.get(topicPartition);
                if (records2 == null) continue;
                this.consumeTillEnd(topicPartition, records2);
            }
        }

        private void consumeTillEnd(TopicPartition topicPartition, List<ConsumerRecord<K, V>> log2) {
            Map endOffsets = this.consumer.endOffsets((Collection)this.consumer.assignment());
            long currentEndOffset = endOffsets.getOrDefault(topicPartition, 0L);
            List<ConsumerRecord<K, V>> records2 = log2.subList((int)currentEndOffset, log2.size());
            for (ConsumerRecord<K, V> record : records2) {
                this.consumer.addRecord(record);
            }
            endOffsets.put(topicPartition, Long.valueOf(log2.size()));
            this.consumer.updateEndOffsets(endOffsets);
        }
    }
}

