package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StoreChangelogReader.class */
public class StoreChangelogReader implements ChangelogReader {
    private static final Logger log = LoggerFactory.getLogger(StoreChangelogReader.class);
    private final String logPrefix;
    private final Consumer<byte[], byte[]> consumer;
    private final Map<TopicPartition, Long> endOffsets;
    private final Map<String, List<PartitionInfo>> partitionInfo;
    private final Map<TopicPartition, StateRestorer> stateRestorers;
    private final Map<TopicPartition, StateRestorer> needsRestoring;
    private final Map<TopicPartition, StateRestorer> needsInitializing;

    public StoreChangelogReader(String str, Consumer<byte[], byte[]> consumer) {
        this.endOffsets = new HashMap();
        this.partitionInfo = new HashMap();
        this.stateRestorers = new HashMap();
        this.needsRestoring = new HashMap();
        this.needsInitializing = new HashMap();
        this.consumer = consumer;
        this.logPrefix = String.format("stream-thread [%s]", str);
    }

    public StoreChangelogReader(Consumer<byte[], byte[]> consumer) {
        this("", consumer);
    }

    @Override // org.apache.kafka.streams.processor.internals.ChangelogReader
    public void register(StateRestorer stateRestorer) {
        if (!this.stateRestorers.containsKey(stateRestorer.partition())) {
            this.stateRestorers.put(stateRestorer.partition(), stateRestorer);
            log.trace("Added restorer for changelog {}", stateRestorer.partition());
        }
        this.needsInitializing.put(stateRestorer.partition(), stateRestorer);
    }

    @Override // org.apache.kafka.streams.processor.internals.ChangelogReader
    public Collection<TopicPartition> restore(Collection<StreamTask> collection) {
        if (!this.needsInitializing.isEmpty()) {
            initialize(collection);
        }
        if (this.needsRestoring.isEmpty()) {
            this.consumer.assign(Collections.emptyList());
            return completed();
        }
        HashSet hashSet = new HashSet(this.needsRestoring.keySet());
        ConsumerRecords<byte[], byte[]> poll = this.consumer.poll(10L);
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            restorePartition(poll, (TopicPartition) it.next());
        }
        if (this.needsRestoring.isEmpty()) {
            this.consumer.assign(Collections.emptyList());
        }
        return completed();
    }

    private void initialize(Collection<StreamTask> collection) {
        if (!this.consumer.subscription().isEmpty()) {
            throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + this.consumer.subscription() + ")");
        }
        refreshChangelogInfo();
        Map<TopicPartition, StateRestorer> hashMap = new HashMap<>();
        for (Map.Entry<TopicPartition, StateRestorer> entry : this.needsInitializing.entrySet()) {
            if (hasPartition(entry.getKey())) {
                hashMap.put(entry.getKey(), entry.getValue());
            }
        }
        try {
            this.endOffsets.putAll(this.consumer.endOffsets(hashMap.keySet()));
            Iterator<TopicPartition> it = hashMap.keySet().iterator();
            while (it.hasNext()) {
                TopicPartition next = it.next();
                Long l = this.endOffsets.get(next);
                if (l != null) {
                    StateRestorer stateRestorer = this.needsInitializing.get(next);
                    if (stateRestorer.checkpoint() >= l.longValue()) {
                        stateRestorer.setRestoredOffset(stateRestorer.checkpoint());
                        it.remove();
                    } else if (stateRestorer.offsetLimit() == 0 || l.longValue() == 0) {
                        stateRestorer.setRestoredOffset(0L);
                        it.remove();
                    }
                    this.needsInitializing.remove(next);
                } else {
                    log.info("{} End offset cannot be found form the returned metadata; removing this partition from the current run loop", this.logPrefix);
                    it.remove();
                }
            }
            if (hashMap.isEmpty()) {
                return;
            }
            startRestoration(hashMap, collection);
        } catch (TimeoutException e) {
            log.debug("{} Could not fetch end offset for {}; will fall back to partition by partition fetching", this.logPrefix, hashMap);
        }
    }

    private void startRestoration(Map<TopicPartition, StateRestorer> map, Collection<StreamTask> collection) {
        log.debug("{} Start restoring state stores from changelog topics {}", this.logPrefix, map.keySet());
        HashSet hashSet = new HashSet(this.consumer.assignment());
        hashSet.addAll(map.keySet());
        this.consumer.assign(hashSet);
        ArrayList<StateRestorer> arrayList = new ArrayList();
        for (StateRestorer stateRestorer : map.values()) {
            TopicPartition partition = stateRestorer.partition();
            if (stateRestorer.checkpoint() != -1) {
                this.consumer.seek(partition, stateRestorer.checkpoint());
                logRestoreOffsets(partition, stateRestorer.checkpoint(), this.endOffsets.get(partition));
                stateRestorer.setStartingOffset(this.consumer.position(partition));
            } else {
                this.consumer.seekToBeginning(Collections.singletonList(partition));
                arrayList.add(stateRestorer);
            }
        }
        for (StateRestorer stateRestorer2 : arrayList) {
            TopicPartition partition2 = stateRestorer2.partition();
            for (StreamTask streamTask : collection) {
                if (streamTask.changelogPartitions().contains(partition2) || streamTask.partitions().contains(partition2)) {
                    if (streamTask.eosEnabled) {
                        log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. Reinitializing the task and restore its state from the beginning.", new Object[]{streamTask.id, stateRestorer2.storeName(), stateRestorer2.partition()});
                        this.needsInitializing.remove(partition2);
                        stateRestorer2.setCheckpointOffset(this.consumer.position(partition2));
                        streamTask.reinitializeStateStoresForPartitions(partition2);
                    } else {
                        log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", new Object[]{streamTask.id, stateRestorer2.storeName(), stateRestorer2.partition()});
                        long position = this.consumer.position(partition2);
                        logRestoreOffsets(partition2, position, this.endOffsets.get(partition2));
                        stateRestorer2.setStartingOffset(position);
                    }
                }
            }
        }
        this.needsRestoring.putAll(map);
    }

    private void logRestoreOffsets(TopicPartition topicPartition, long j, Long l) {
        log.debug("{} Restoring partition {} from offset {} to endOffset {}", new Object[]{this.logPrefix, topicPartition, Long.valueOf(j), l});
    }

    private Collection<TopicPartition> completed() {
        HashSet hashSet = new HashSet(this.stateRestorers.keySet());
        hashSet.removeAll(this.needsRestoring.keySet());
        log.trace("{} completed partitions {}", this.logPrefix, hashSet);
        return hashSet;
    }

    private void refreshChangelogInfo() {
        try {
            this.partitionInfo.putAll(this.consumer.listTopics());
        } catch (TimeoutException e) {
            log.debug("{} Could not fetch topic metadata within the timeout, will retry in the next run loop", this.logPrefix);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.ChangelogReader
    public Map<TopicPartition, Long> restoredOffsets() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, StateRestorer> entry : this.stateRestorers.entrySet()) {
            StateRestorer value = entry.getValue();
            if (value.isPersistent()) {
                hashMap.put(entry.getKey(), value.restoredOffset());
            }
        }
        return hashMap;
    }

    @Override // org.apache.kafka.streams.processor.internals.ChangelogReader
    public void reset() {
        this.partitionInfo.clear();
        this.stateRestorers.clear();
        this.needsRestoring.clear();
        this.endOffsets.clear();
        this.needsInitializing.clear();
    }

    private void restorePartition(ConsumerRecords<byte[], byte[]> consumerRecords, TopicPartition topicPartition) {
        StateRestorer stateRestorer = this.stateRestorers.get(topicPartition);
        Long l = this.endOffsets.get(topicPartition);
        long processNext = processNext(consumerRecords.records(topicPartition), stateRestorer, l);
        stateRestorer.setRestoredOffset(processNext);
        if (stateRestorer.hasCompleted(processNext, l.longValue())) {
            if (processNext > l.longValue() + 1) {
                throw new IllegalStateException(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d", topicPartition, l, Long.valueOf(processNext)));
            }
            log.debug("{} Completed restoring state from changelog {} with {} records ranging from offset {} to {}", new Object[]{this.logPrefix, topicPartition, Long.valueOf(stateRestorer.restoredNumRecords()), Long.valueOf(stateRestorer.startingOffset()), stateRestorer.restoredOffset()});
            this.needsRestoring.remove(topicPartition);
        }
    }

    private long processNext(List<ConsumerRecord<byte[], byte[]>> list, StateRestorer stateRestorer, Long l) {
        for (ConsumerRecord<byte[], byte[]> consumerRecord : list) {
            long offset = consumerRecord.offset();
            if (stateRestorer.hasCompleted(offset, l.longValue())) {
                return offset;
            }
            if (consumerRecord.key() != null) {
                stateRestorer.restore((byte[]) consumerRecord.key(), (byte[]) consumerRecord.value());
            }
        }
        return this.consumer.position(stateRestorer.partition());
    }

    private boolean hasPartition(TopicPartition topicPartition) {
        List<PartitionInfo> list = this.partitionInfo.get(topicPartition.topic());
        if (list == null) {
            return false;
        }
        Iterator<PartitionInfo> it = list.iterator();
        while (it.hasNext()) {
            if (it.next().partition() == topicPartition.partition()) {
                return true;
            }
        }
        return false;
    }
}
