package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorStateManager.class */
public class ProcessorStateManager {
    private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
    public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
    public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
    public static final String LOCK_FILE_NAME = ".lock";
    private final String applicationId;
    private final int defaultPartition;
    private final Map<String, TopicPartition> partitionForTopic = new HashMap();
    private final File baseDir;
    private final FileLock directoryLock;
    private final Map<String, StateStore> stores;
    private final Set<String> loggingEnabled;
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final Map<TopicPartition, Long> restoredOffsets;
    private final Map<TopicPartition, Long> checkpointedOffsets;
    private final Map<TopicPartition, Long> offsetLimits;
    private final boolean isStandby;
    private final Map<String, StateRestoreCallback> restoreCallbacks;

    public ProcessorStateManager(String str, int i, Collection<TopicPartition> collection, File file, Consumer<byte[], byte[]> consumer, boolean z) throws IOException {
        this.applicationId = str;
        this.defaultPartition = i;
        for (TopicPartition topicPartition : collection) {
            this.partitionForTopic.put(topicPartition.topic(), topicPartition);
        }
        this.baseDir = file;
        this.stores = new HashMap();
        this.loggingEnabled = new HashSet();
        this.restoreConsumer = consumer;
        this.restoredOffsets = new HashMap();
        this.isStandby = z;
        this.restoreCallbacks = z ? new HashMap() : null;
        this.offsetLimits = new HashMap();
        createStateDirectory(file);
        this.directoryLock = lockStateDirectory(file, 5);
        if (this.directoryLock == null) {
            throw new IOException("Failed to lock the state directory: " + file.getCanonicalPath());
        }
        OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
        this.checkpointedOffsets = new HashMap(offsetCheckpoint.read());
        offsetCheckpoint.delete();
    }

    private static void createStateDirectory(File file) throws IOException {
        if (file.exists()) {
            return;
        }
        file.mkdir();
    }

    public static String storeChangelogTopic(String str, String str2) {
        return str + "-" + str2 + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    public static FileLock lockStateDirectory(File file) throws IOException {
        return lockStateDirectory(file, 0);
    }

    private static FileLock lockStateDirectory(File file, int i) throws IOException {
        FileLock fileLock;
        FileChannel channel = new RandomAccessFile(new File(file, LOCK_FILE_NAME), "rw").getChannel();
        FileLock lockStateDirectory = lockStateDirectory(channel);
        while (true) {
            fileLock = lockStateDirectory;
            if (fileLock != null || i <= 0) {
                break;
            }
            try {
                Thread.sleep(200L);
            } catch (Exception e) {
            }
            i--;
            lockStateDirectory = lockStateDirectory(channel);
        }
        if (fileLock == null) {
            channel.close();
        }
        return fileLock;
    }

    private static FileLock lockStateDirectory(FileChannel fileChannel) throws IOException {
        try {
            return fileChannel.tryLock();
        } catch (OverlappingFileLockException e) {
            return null;
        }
    }

    public File baseDir() {
        return this.baseDir;
    }

    public void register(StateStore stateStore, boolean z, StateRestoreCallback stateRestoreCallback) {
        if (stateStore.name().equals(CHECKPOINT_FILE_NAME)) {
            throw new IllegalArgumentException("Illegal store name: .checkpoint");
        }
        if (this.stores.containsKey(stateStore.name())) {
            throw new IllegalArgumentException("Store " + stateStore.name() + " has already been registered.");
        }
        if (z) {
            this.loggingEnabled.add(stateStore.name());
        }
        String storeChangelogTopic = z ? storeChangelogTopic(this.applicationId, stateStore.name()) : stateStore.name();
        int partition = getPartition(storeChangelogTopic);
        boolean z2 = true;
        long currentTimeMillis = System.currentTimeMillis();
        do {
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
            }
            List partitionsFor = this.restoreConsumer.partitionsFor(storeChangelogTopic);
            if (partitionsFor != null) {
                Iterator it = partitionsFor.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    } else if (((PartitionInfo) it.next()).partition() == partition) {
                        z2 = false;
                        break;
                    }
                }
                if (!z2) {
                    break;
                }
            } else {
                throw new StreamsException("Could not find partition info for topic: " + storeChangelogTopic);
            }
        } while (System.currentTimeMillis() < currentTimeMillis + 5000);
        if (z2) {
            throw new StreamsException("Store " + stateStore.name() + "'s change log (" + storeChangelogTopic + ") does not contain partition " + partition);
        }
        this.stores.put(stateStore.name(), stateStore);
        if (!this.isStandby) {
            restoreActiveState(storeChangelogTopic, stateRestoreCallback);
        } else if (stateStore.persistent()) {
            this.restoreCallbacks.put(storeChangelogTopic, stateRestoreCallback);
        }
    }

    private void restoreActiveState(String str, StateRestoreCallback stateRestoreCallback) {
        if (!this.restoreConsumer.subscription().isEmpty()) {
            throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
        }
        TopicPartition topicPartition = new TopicPartition(str, getPartition(str));
        this.restoreConsumer.assign(Collections.singletonList(topicPartition));
        try {
            this.restoreConsumer.seekToEnd(Collections.singleton(topicPartition));
            long position = this.restoreConsumer.position(topicPartition);
            if (this.checkpointedOffsets.containsKey(topicPartition)) {
                this.restoreConsumer.seek(topicPartition, this.checkpointedOffsets.get(topicPartition).longValue());
            } else {
                this.restoreConsumer.seekToBeginning(Collections.singleton(topicPartition));
            }
            long offsetLimit = offsetLimit(topicPartition);
            do {
                long j = 0;
                for (ConsumerRecord consumerRecord : this.restoreConsumer.poll(100L).records(topicPartition)) {
                    j = consumerRecord.offset();
                    if (j >= offsetLimit) {
                        break;
                    } else {
                        stateRestoreCallback.restore((byte[]) consumerRecord.key(), (byte[]) consumerRecord.value());
                    }
                }
                if (j < offsetLimit && this.restoreConsumer.position(topicPartition) != position) {
                }
                this.restoredOffsets.put(topicPartition, Long.valueOf(Math.min(offsetLimit, this.restoreConsumer.position(topicPartition))));
                this.restoreConsumer.assign(Collections.emptyList());
                return;
            } while (this.restoreConsumer.position(topicPartition) <= position);
            throw new IllegalStateException("Log end offset should not change while restoring");
        } catch (Throwable th) {
            this.restoreConsumer.assign(Collections.emptyList());
            throw th;
        }
    }

    public Map<TopicPartition, Long> checkpointedOffsets() {
        HashMap hashMap = new HashMap();
        Iterator<Map.Entry<String, StateRestoreCallback>> it = this.restoreCallbacks.entrySet().iterator();
        while (it.hasNext()) {
            String key = it.next().getKey();
            TopicPartition topicPartition = new TopicPartition(key, getPartition(key));
            if (this.checkpointedOffsets.containsKey(topicPartition)) {
                hashMap.put(topicPartition, this.checkpointedOffsets.get(topicPartition));
            } else {
                hashMap.put(topicPartition, -1L);
            }
        }
        return hashMap;
    }

    public List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition topicPartition, List<ConsumerRecord<byte[], byte[]>> list) {
        long offsetLimit = offsetLimit(topicPartition);
        ArrayList arrayList = null;
        StateRestoreCallback stateRestoreCallback = this.restoreCallbacks.get(topicPartition.topic());
        long j = -1;
        int i = 0;
        for (ConsumerRecord<byte[], byte[]> consumerRecord : list) {
            if (consumerRecord.offset() < offsetLimit) {
                stateRestoreCallback.restore((byte[]) consumerRecord.key(), (byte[]) consumerRecord.value());
                j = consumerRecord.offset();
            } else {
                if (arrayList == null) {
                    arrayList = new ArrayList(list.size() - i);
                }
                arrayList.add(consumerRecord);
            }
            i++;
        }
        this.restoredOffsets.put(topicPartition, Long.valueOf(j + 1));
        return arrayList;
    }

    public void putOffsetLimit(TopicPartition topicPartition, long j) {
        this.offsetLimits.put(topicPartition, Long.valueOf(j));
    }

    private long offsetLimit(TopicPartition topicPartition) {
        Long l = this.offsetLimits.get(topicPartition);
        if (l != null) {
            return l.longValue();
        }
        return Long.MAX_VALUE;
    }

    public StateStore getStore(String str) {
        return this.stores.get(str);
    }

    public void flush() {
        if (this.stores.isEmpty()) {
            return;
        }
        log.debug("Flushing stores.");
        Iterator<StateStore> it = this.stores.values().iterator();
        while (it.hasNext()) {
            it.next().flush();
        }
    }

    public void close(Map<TopicPartition, Long> map) throws IOException {
        try {
            if (!this.stores.isEmpty()) {
                log.debug("Closing stores.");
                for (Map.Entry<String, StateStore> entry : this.stores.entrySet()) {
                    log.debug("Closing storage engine {}", entry.getKey());
                    entry.getValue().flush();
                    entry.getValue().close();
                }
                HashMap hashMap = new HashMap();
                for (String str : this.stores.keySet()) {
                    TopicPartition topicPartition = this.loggingEnabled.contains(str) ? new TopicPartition(storeChangelogTopic(this.applicationId, str), getPartition(str)) : new TopicPartition(str, getPartition(str));
                    if (this.stores.get(str).persistent()) {
                        Long l = map.get(topicPartition);
                        if (l != null) {
                            hashMap.put(topicPartition, Long.valueOf(l.longValue() + 1));
                        } else {
                            Long l2 = this.restoredOffsets.get(topicPartition);
                            if (l2 != null) {
                                hashMap.put(topicPartition, l2);
                            }
                        }
                    }
                }
                new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)).write(hashMap);
            }
        } finally {
            this.directoryLock.release();
            this.directoryLock.channel().close();
        }
    }

    private int getPartition(String str) {
        TopicPartition topicPartition = this.partitionForTopic.get(str);
        return topicPartition == null ? this.defaultPartition : topicPartition.partition();
    }
}
