package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.FixedOrderMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorStateManager.class */
public class ProcessorStateManager implements StateManager {
    private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
    private Logger log;
    private String logPrefix;
    private final TaskId taskId;
    private final boolean eosEnabled;
    private final ChangelogRegister changelogReader;
    private final Collection<TopicPartition> sourcePartitions;
    private final Map<String, String> storeToChangelogTopic;
    private final FixedOrderMap<String, StateStoreMetadata> stores = new FixedOrderMap<>();
    private final FixedOrderMap<String, StateStore> globalStores = new FixedOrderMap<>();
    private final File baseDir;
    private final OffsetCheckpoint checkpointFile;
    private Task.TaskType taskType;

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorStateManager$StateStoreMetadata.class */
    public static class StateStoreMetadata {
        private final StateStore stateStore;
        private final TopicPartition changelogPartition;
        private final StateRestoreCallback restoreCallback;
        private final CommitCallback commitCallback;
        private final RecordConverter recordConverter;
        private Long offset;
        private boolean corrupted;

        private StateStoreMetadata(StateStore stateStore, CommitCallback commitCallback) {
            this.stateStore = stateStore;
            this.commitCallback = commitCallback;
            this.restoreCallback = null;
            this.recordConverter = null;
            this.changelogPartition = null;
            this.corrupted = false;
            this.offset = null;
        }

        private StateStoreMetadata(StateStore stateStore, TopicPartition topicPartition, StateRestoreCallback stateRestoreCallback, CommitCallback commitCallback, RecordConverter recordConverter) {
            if (stateRestoreCallback == null) {
                throw new IllegalStateException("Log enabled store should always provide a restore callback upon registration");
            }
            this.stateStore = stateStore;
            this.changelogPartition = topicPartition;
            this.restoreCallback = stateRestoreCallback;
            this.commitCallback = commitCallback;
            this.recordConverter = recordConverter;
            this.offset = null;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setOffset(Long l) {
            this.offset = l;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Long offset() {
            return this.offset;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public TopicPartition changelogPartition() {
            return this.changelogPartition;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public StateStore store() {
            return this.stateStore;
        }

        public String toString() {
            return "StateStoreMetadata (" + this.stateStore.name() + " : " + this.changelogPartition + " @ " + this.offset;
        }
    }

    public static String storeChangelogTopic(String str, String str2, String str3) {
        return str3 == null ? str + "-" + str2 + STATE_CHANGELOG_TOPIC_SUFFIX : str + "-" + str3 + "-" + str2 + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    public ProcessorStateManager(TaskId taskId, Task.TaskType taskType, boolean z, LogContext logContext, StateDirectory stateDirectory, ChangelogRegister changelogRegister, Map<String, String> map, Collection<TopicPartition> collection) throws ProcessorStateException {
        this.storeToChangelogTopic = map;
        this.log = logContext.logger(ProcessorStateManager.class);
        this.logPrefix = logContext.logPrefix();
        this.taskId = taskId;
        this.taskType = taskType;
        this.eosEnabled = z;
        this.changelogReader = changelogRegister;
        this.sourcePartitions = collection;
        this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);
        this.checkpointFile = new OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
        this.log.debug("Created state store manager for task {}", taskId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerStateStores(List<StateStore> list, InternalProcessorContext internalProcessorContext) {
        internalProcessorContext.uninitialize();
        for (StateStore stateStore : list) {
            if (this.stores.containsKey(stateStore.name())) {
                maybeRegisterStoreWithChangelogReader(stateStore.name());
            } else {
                stateStore.init((StateStoreContext) internalProcessorContext, stateStore);
            }
            this.log.trace("Registered state store {}", stateStore.name());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerGlobalStateStores(List<StateStore> list) {
        this.log.debug("Register global stores {}", list);
        for (StateStore stateStore : list) {
            this.globalStores.put(stateStore.name(), stateStore);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public StateStore getGlobalStore(String str) {
        return (StateStore) this.globalStores.get(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initializeStoreOffsetsFromCheckpoint(boolean z) {
        try {
            Map<TopicPartition, Long> read = this.checkpointFile.read();
            this.log.trace("Loaded offsets from the checkpoint file: {}", read);
            for (StateStoreMetadata stateStoreMetadata : this.stores.values()) {
                if (stateStoreMetadata.corrupted) {
                    this.log.error("Tried to initialize store offsets for corrupted store {}", stateStoreMetadata);
                    throw new IllegalStateException("Should not initialize offsets for a corrupted task");
                }
                if (stateStoreMetadata.changelogPartition == null) {
                    this.log.info("State store {} is not logged and hence would not be restored", stateStoreMetadata.stateStore.name());
                } else if (!stateStoreMetadata.stateStore.persistent()) {
                    this.log.info("Initializing to the starting offset for changelog {} of in-memory state store {}", stateStoreMetadata.changelogPartition, stateStoreMetadata.stateStore.name());
                } else if (stateStoreMetadata.offset() != null) {
                    read.remove(stateStoreMetadata.changelogPartition);
                    this.log.debug("Skipping re-initialization of offset from checkpoint for recycled store {}", stateStoreMetadata.stateStore.name());
                } else if (read.containsKey(stateStoreMetadata.changelogPartition)) {
                    stateStoreMetadata.setOffset(changelogOffsetFromCheckpointedOffset(read.remove(stateStoreMetadata.changelogPartition).longValue()));
                    this.log.debug("State store {} initialized from checkpoint with offset {} at changelog {}", new Object[]{stateStoreMetadata.stateStore.name(), stateStoreMetadata.offset, stateStoreMetadata.changelogPartition});
                } else {
                    if (this.eosEnabled && !z) {
                        this.log.warn("State store {} did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and wipe out the local state of task {} before re-bootstrapping", stateStoreMetadata.stateStore.name(), this.taskId);
                        throw new TaskCorruptedException(Collections.singleton(this.taskId));
                    }
                    this.log.info("State store {} did not find checkpoint offset, hence would default to the starting offset at changelog {}", stateStoreMetadata.stateStore.name(), stateStoreMetadata.changelogPartition);
                }
            }
            if (!read.isEmpty()) {
                this.log.warn("Some loaded checkpoint offsets cannot find their corresponding state stores: {}", read);
            }
            if (this.eosEnabled) {
                this.checkpointFile.delete();
            }
        } catch (IOException | RuntimeException e) {
            throw new ProcessorStateException(String.format("%sError loading and deleting checkpoint file when creating the state manager", this.logPrefix), e);
        } catch (TaskCorruptedException e2) {
            throw e2;
        }
    }

    private void maybeRegisterStoreWithChangelogReader(String str) {
        if (isLoggingEnabled(str)) {
            this.changelogReader.register(getStorePartition(str), this);
        }
    }

    private List<TopicPartition> getAllChangelogTopicPartitions() {
        ArrayList arrayList = new ArrayList();
        for (StateStoreMetadata stateStoreMetadata : this.stores.values()) {
            if (stateStoreMetadata.changelogPartition != null) {
                arrayList.add(stateStoreMetadata.changelogPartition);
            }
        }
        return arrayList;
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public File baseDir() {
        return this.baseDir;
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void registerStore(StateStore stateStore, StateRestoreCallback stateRestoreCallback, CommitCallback commitCallback) {
        String name = stateStore.name();
        if (".checkpoint".equals(name)) {
            stateStore.close();
            throw new IllegalArgumentException(String.format("%sIllegal store name: %s, which collides with the pre-defined checkpoint file name", this.logPrefix, name));
        }
        if (this.stores.containsKey(name)) {
            stateStore.close();
            throw new IllegalArgumentException(String.format("%sStore %s has already been registered.", this.logPrefix, name));
        }
        if (stateRestoreCallback instanceof StateRestoreListener) {
            this.log.warn("The registered state restore callback is also implementing the state restore listener interface, which is not expected and would be ignored");
        }
        this.stores.put(name, isLoggingEnabled(name) ? new StateStoreMetadata(stateStore, getStorePartition(name), stateRestoreCallback, commitCallback, StateManagerUtil.converterForStore(stateStore)) : new StateStoreMetadata(stateStore, commitCallback));
        maybeRegisterStoreWithChangelogReader(name);
        this.log.debug("Registered state store {} to its state manager", name);
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public StateStore getStore(String str) {
        if (this.stores.containsKey(str)) {
            return ((StateStoreMetadata) this.stores.get(str)).stateStore;
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Collection<TopicPartition> changelogPartitions() {
        return changelogOffsets().keySet();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void markChangelogAsCorrupted(Collection<TopicPartition> collection) {
        for (StateStoreMetadata stateStoreMetadata : this.stores.values()) {
            if (collection.contains(stateStoreMetadata.changelogPartition)) {
                stateStoreMetadata.corrupted = true;
                collection.remove(stateStoreMetadata.changelogPartition);
            }
        }
        if (!collection.isEmpty()) {
            throw new IllegalStateException("Some partitions " + collection + " are not contained in the store list of task " + this.taskId + " marking as corrupted, this is not expected");
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public Map<TopicPartition, Long> changelogOffsets() {
        HashMap hashMap = new HashMap();
        for (StateStoreMetadata stateStoreMetadata : this.stores.values()) {
            if (stateStoreMetadata.changelogPartition != null) {
                hashMap.put(stateStoreMetadata.changelogPartition, Long.valueOf(stateStoreMetadata.offset == null ? 0L : stateStoreMetadata.offset.longValue() + 1));
            }
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TaskId taskId() {
        return this.taskId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean changelogAsSource(TopicPartition topicPartition) {
        return this.sourcePartitions.contains(topicPartition);
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public Task.TaskType taskType() {
        return this.taskType;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StateStoreMetadata storeMetadata(TopicPartition topicPartition) {
        for (StateStoreMetadata stateStoreMetadata : this.stores.values()) {
            if (topicPartition.equals(stateStoreMetadata.changelogPartition)) {
                return stateStoreMetadata;
            }
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void restore(StateStoreMetadata stateStoreMetadata, List<ConsumerRecord<byte[], byte[]>> list) {
        if (!this.stores.containsValue(stateStoreMetadata)) {
            throw new IllegalStateException("Restoring " + stateStoreMetadata + " which is not registered in this state manager, this should not happen.");
        }
        if (list.isEmpty()) {
            return;
        }
        Long valueOf = Long.valueOf(list.get(list.size() - 1).offset());
        RecordBatchingStateRestoreCallback adapt = StateRestoreCallbackAdapter.adapt(stateStoreMetadata.restoreCallback);
        Stream<ConsumerRecord<byte[], byte[]>> stream = list.stream();
        RecordConverter recordConverter = stateStoreMetadata.recordConverter;
        recordConverter.getClass();
        try {
            adapt.restoreBatch((List) stream.map(recordConverter::convert).collect(Collectors.toList()));
            stateStoreMetadata.setOffset(valueOf);
        } catch (RuntimeException e) {
            throw new ProcessorStateException(String.format("%sException caught while trying to restore state from %s", this.logPrefix, stateStoreMetadata.changelogPartition), e);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void flush() {
        KafkaException kafkaException = null;
        if (!this.stores.isEmpty()) {
            this.log.debug("Flushing all stores registered in the state manager: {}", this.stores);
            Iterator it = this.stores.values().iterator();
            while (it.hasNext()) {
                StateStore stateStore = ((StateStoreMetadata) it.next()).stateStore;
                this.log.trace("Flushing store {}", stateStore.name());
                try {
                    stateStore.flush();
                } catch (RuntimeException e) {
                    if (kafkaException == null) {
                        kafkaException = e instanceof StreamsException ? e : new ProcessorStateException(String.format("%sFailed to flush state store %s", this.logPrefix, stateStore.name()), e);
                    }
                    this.log.error("Failed to flush state store {}: ", stateStore.name(), e);
                }
            }
        }
        if (kafkaException != null) {
            throw kafkaException;
        }
    }

    public void flushCache() {
        KafkaException kafkaException = null;
        if (!this.stores.isEmpty()) {
            this.log.debug("Flushing all store caches registered in the state manager: {}", this.stores);
            Iterator it = this.stores.values().iterator();
            while (it.hasNext()) {
                StateStore stateStore = ((StateStoreMetadata) it.next()).stateStore;
                try {
                    if (stateStore instanceof TimeOrderedKeyValueBuffer) {
                        stateStore.flush();
                    } else if (stateStore instanceof CachedStateStore) {
                        ((CachedStateStore) stateStore).flushCache();
                    }
                    this.log.trace("Flushed cache or buffer {}", stateStore.name());
                } catch (RuntimeException e) {
                    if (kafkaException == null) {
                        kafkaException = e instanceof StreamsException ? e : new ProcessorStateException(String.format("%sFailed to flush cache of store %s", this.logPrefix, stateStore.name()), e);
                    }
                    this.log.error("Failed to flush cache of store {}: ", stateStore.name(), e);
                }
            }
        }
        if (kafkaException != null) {
            throw kafkaException;
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void close() throws ProcessorStateException {
        this.log.debug("Closing its state manager and all the registered state stores: {}", this.stores);
        this.changelogReader.unregister(getAllChangelogTopicPartitions());
        KafkaException kafkaException = null;
        if (!this.stores.isEmpty()) {
            Iterator it = this.stores.entrySet().iterator();
            while (it.hasNext()) {
                StateStore stateStore = ((StateStoreMetadata) ((Map.Entry) it.next()).getValue()).stateStore;
                this.log.trace("Closing store {}", stateStore.name());
                try {
                    stateStore.close();
                } catch (RuntimeException e) {
                    if (kafkaException == null) {
                        kafkaException = e instanceof StreamsException ? e : new ProcessorStateException(String.format("%sFailed to close state store %s", this.logPrefix, stateStore.name()), e);
                    }
                    this.log.error("Failed to close state store {}: ", stateStore.name(), e);
                }
            }
            this.stores.clear();
        }
        if (kafkaException != null) {
            throw kafkaException;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recycle() {
        this.log.debug("Recycling state for {} task {}.", this.taskType, this.taskId);
        this.changelogReader.unregister(getAllChangelogTopicPartitions());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void transitionTaskType(Task.TaskType taskType, LogContext logContext) {
        if (this.taskType.equals(taskType)) {
            throw new IllegalStateException("Tried to recycle state for task type conversion but new type was the same.");
        }
        Task.TaskType taskType2 = this.taskType;
        this.taskType = taskType;
        this.log = logContext.logger(ProcessorStateManager.class);
        this.logPrefix = logContext.logPrefix();
        this.log.debug("Transitioning state manager for {} task {} to {}", new Object[]{taskType2, this.taskId, taskType});
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void updateChangelogOffsets(Map<TopicPartition, Long> map) {
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            StateStoreMetadata findStore = findStore(entry.getKey());
            if (findStore != null) {
                findStore.setOffset(entry.getValue());
                this.log.debug("State store {} updated to written offset {} at changelog {}", new Object[]{findStore.stateStore.name(), findStore.offset, findStore.changelogPartition});
            }
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void checkpoint() {
        HashMap hashMap = new HashMap();
        for (StateStoreMetadata stateStoreMetadata : this.stores.values()) {
            if (stateStoreMetadata.commitCallback != null && !stateStoreMetadata.corrupted) {
                try {
                    stateStoreMetadata.commitCallback.onCommit();
                } catch (IOException e) {
                    throw new ProcessorStateException(String.format("%sException caught while trying to checkpoint store, changelog partition %s", this.logPrefix, stateStoreMetadata.changelogPartition), e);
                }
            }
            if (stateStoreMetadata.changelogPartition != null && stateStoreMetadata.stateStore.persistent() && !stateStoreMetadata.corrupted) {
                hashMap.put(stateStoreMetadata.changelogPartition, Long.valueOf(checkpointableOffsetFromChangelogOffset(stateStoreMetadata.offset)));
            }
        }
        this.log.debug("Writing checkpoint: {}", hashMap);
        try {
            this.checkpointFile.write(hashMap);
        } catch (IOException e2) {
            this.log.warn("Failed to write offset checkpoint file to [{}]. This may occur if OS cleaned the state.dir in case when it located in ${java.io.tmpdir} directory. This may also occur due to running multiple instances on the same machine using the same state dir. Changing the location of state.dir may resolve the problem.", this.checkpointFile, e2);
        }
    }

    private TopicPartition getStorePartition(String str) {
        return new TopicPartition(changelogFor(str), this.taskId.partition());
    }

    private boolean isLoggingEnabled(String str) {
        return changelogFor(str) != null;
    }

    private StateStoreMetadata findStore(TopicPartition topicPartition) {
        List list = (List) this.stores.values().stream().filter(stateStoreMetadata -> {
            return topicPartition.equals(stateStoreMetadata.changelogPartition);
        }).collect(Collectors.toList());
        if (list.size() > 1) {
            throw new IllegalStateException("Multiple state stores are found for changelog partition " + topicPartition + ", this should never happen: " + list);
        }
        if (list.isEmpty()) {
            return null;
        }
        return (StateStoreMetadata) list.get(0);
    }

    private long checkpointableOffsetFromChangelogOffset(Long l) {
        if (l != null) {
            return l.longValue();
        }
        return -4L;
    }

    private Long changelogOffsetFromCheckpointedOffset(long j) {
        if (j != -4) {
            return Long.valueOf(j);
        }
        return null;
    }

    public TopicPartition registeredChangelogPartitionFor(String str) {
        StateStoreMetadata stateStoreMetadata = (StateStoreMetadata) this.stores.get(str);
        if (stateStoreMetadata == null) {
            throw new IllegalStateException("State store " + str + " for which the registered changelog partition should be retrieved has not been registered");
        }
        if (stateStoreMetadata.changelogPartition == null) {
            throw new IllegalStateException("Registered state store " + str + " does not have a registered changelog partition. This may happen if logging is disabled for the state store.");
        }
        return stateStoreMetadata.changelogPartition;
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public String changelogFor(String str) {
        return this.storeToChangelogTopic.get(str);
    }

    public void deleteCheckPointFileIfEOSEnabled() throws IOException {
        if (this.eosEnabled) {
            this.checkpointFile.delete();
        }
    }
}
