/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.StateRestorer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StoreChangelogReader
implements ChangelogReader {
    private static final Logger log = LoggerFactory.getLogger(StoreChangelogReader.class);
    private final Consumer<byte[], byte[]> consumer;
    private final String logPrefix;
    private final Time time;
    private final long partitionValidationTimeoutMs;
    private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<String, List<PartitionInfo>>();
    private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<TopicPartition, StateRestorer>();

    public StoreChangelogReader(String threadId, Consumer<byte[], byte[]> consumer, Time time, long partitionValidationTimeoutMs) {
        this.time = time;
        this.consumer = consumer;
        this.partitionValidationTimeoutMs = partitionValidationTimeoutMs;
        this.logPrefix = String.format("stream-thread [%s]", threadId);
    }

    public StoreChangelogReader(Consumer<byte[], byte[]> consumer, Time time, long partitionValidationTimeoutMs) {
        this("", consumer, time, partitionValidationTimeoutMs);
    }

    @Override
    public void validatePartitionExists(TopicPartition topicPartition, String storeName) {
        long start = this.time.milliseconds();
        if (this.partitionInfo.isEmpty()) {
            try {
                this.partitionInfo.putAll(this.consumer.listTopics());
            }
            catch (TimeoutException e) {
                log.warn("{} Could not list topics so will fall back to partition by partition fetching", (Object)this.logPrefix);
            }
        }
        long endTime = this.time.milliseconds() + this.partitionValidationTimeoutMs;
        while (!this.hasPartition(topicPartition) && this.time.milliseconds() < endTime) {
            try {
                List partitions = this.consumer.partitionsFor(topicPartition.topic());
                if (partitions == null) continue;
                this.partitionInfo.put(topicPartition.topic(), partitions);
            }
            catch (TimeoutException e) {
                throw new StreamsException(String.format("Could not fetch partition info for topic: %s before expiration of the configured request timeout", topicPartition.topic()));
            }
        }
        if (!this.hasPartition(topicPartition)) {
            throw new StreamsException(String.format("Store %s's change log (%s) does not contain partition %s", storeName, topicPartition.topic(), topicPartition.partition()));
        }
        log.debug("{} Took {} ms to validate that partition {} exists", new Object[]{this.logPrefix, this.time.milliseconds() - start, topicPartition});
    }

    @Override
    public void register(StateRestorer restorer) {
        if (restorer.offsetLimit() > 0L) {
            this.stateRestorers.put(restorer.partition(), restorer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void restore() {
        long start = this.time.milliseconds();
        try {
            if (!this.consumer.subscription().isEmpty()) {
                throw new IllegalStateException(String.format("Restore consumer should have not subscribed to any partitions (%s) beforehand", this.consumer.subscription()));
            }
            Map endOffsets = this.consumer.endOffsets(this.stateRestorers.keySet());
            HashMap<TopicPartition, StateRestorer> needsRestoring = new HashMap<TopicPartition, StateRestorer>();
            for (Map.Entry entry : endOffsets.entrySet()) {
                TopicPartition topicPartition = (TopicPartition)entry.getKey();
                Long offset = (Long)entry.getValue();
                StateRestorer restorer = this.stateRestorers.get(topicPartition);
                if (restorer.checkpoint() >= offset) {
                    restorer.setRestoredOffset(restorer.checkpoint());
                    continue;
                }
                needsRestoring.put(topicPartition, restorer);
            }
            log.info("{} Starting restoring state stores from changelog topics {}", (Object)this.logPrefix, needsRestoring.keySet());
            this.consumer.assign(needsRestoring.keySet());
            ArrayList<StateRestorer> needsPositionUpdate = new ArrayList<StateRestorer>();
            for (StateRestorer restorer : needsRestoring.values()) {
                if (restorer.checkpoint() != -1L) {
                    this.consumer.seek(restorer.partition(), restorer.checkpoint());
                    this.logRestoreOffsets(restorer.partition(), restorer.checkpoint(), (Long)endOffsets.get(restorer.partition()));
                    restorer.setStartingOffset(this.consumer.position(restorer.partition()));
                    continue;
                }
                this.consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
                needsPositionUpdate.add(restorer);
            }
            for (StateRestorer restorer : needsPositionUpdate) {
                long position = this.consumer.position(restorer.partition());
                restorer.setStartingOffset(position);
                this.logRestoreOffsets(restorer.partition(), position, (Long)endOffsets.get(restorer.partition()));
            }
            HashSet partitions = new HashSet(needsRestoring.keySet());
            while (!partitions.isEmpty()) {
                ConsumerRecords allRecords = this.consumer.poll(10L);
                Iterator<TopicPartition> partitionIterator = partitions.iterator();
                while (partitionIterator.hasNext()) {
                    this.restorePartition(endOffsets, (ConsumerRecords<byte[], byte[]>)allRecords, partitionIterator);
                }
            }
        }
        finally {
            this.consumer.assign(Collections.emptyList());
            log.debug("{} Took {} ms to restore all active states", (Object)this.logPrefix, (Object)(this.time.milliseconds() - start));
        }
    }

    private void logRestoreOffsets(TopicPartition partition, long startingOffset, Long endOffset) {
        log.debug("{} Restoring partition {} from offset {} to endOffset {}", new Object[]{this.logPrefix, partition, startingOffset, endOffset});
    }

    @Override
    public Map<TopicPartition, Long> restoredOffsets() {
        HashMap<TopicPartition, Long> restoredOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, StateRestorer> entry : this.stateRestorers.entrySet()) {
            StateRestorer restorer = entry.getValue();
            if (!restorer.isPersistent()) continue;
            restoredOffsets.put(entry.getKey(), restorer.restoredOffset());
        }
        return restoredOffsets;
    }

    private void restorePartition(Map<TopicPartition, Long> endOffsets, ConsumerRecords<byte[], byte[]> allRecords, Iterator<TopicPartition> partitionIterator) {
        TopicPartition topicPartition = partitionIterator.next();
        StateRestorer restorer = this.stateRestorers.get(topicPartition);
        Long endOffset = endOffsets.get(topicPartition);
        long pos = this.processNext(allRecords.records(topicPartition), restorer, endOffset);
        if (restorer.hasCompleted(pos, endOffset)) {
            if (pos > endOffset + 1L) {
                throw new IllegalStateException(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d", topicPartition, endOffset, pos));
            }
            restorer.setRestoredOffset(pos);
            log.debug("{} Completed restoring state from changelog {} with {} records ranging from offset {} to {}", new Object[]{this.logPrefix, topicPartition, restorer.restoredNumRecords(), restorer.startingOffset(), restorer.restoredOffset()});
            partitionIterator.remove();
        }
    }

    private long processNext(List<ConsumerRecord<byte[], byte[]>> records, StateRestorer restorer, Long endOffset) {
        for (ConsumerRecord<byte[], byte[]> record : records) {
            long offset = record.offset();
            if (restorer.hasCompleted(offset, endOffset)) {
                return offset;
            }
            if (record.key() == null) continue;
            restorer.restore((byte[])record.key(), (byte[])record.value());
        }
        return this.consumer.position(restorer.partition());
    }

    private boolean hasPartition(TopicPartition topicPartition) {
        List<PartitionInfo> partitions = this.partitionInfo.get(topicPartition.topic());
        if (partitions == null) {
            return false;
        }
        for (PartitionInfo partition : partitions) {
            if (partition.partition() != topicPartition.partition()) continue;
            return true;
        }
        return false;
    }
}

