package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.RecordConverters;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StateManagerUtil.class */
public final class StateManagerUtil {
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10000;

    private StateManagerUtil() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static RecordConverter converterForStore(StateStore stateStore) {
        return WrappedStateStore.isTimestamped(stateStore) ? RecordConverters.rawValueToTimestampedValue() : RecordConverters.identity();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean checkpointNeeded(boolean z, Map<TopicPartition, Long> map, Map<TopicPartition, Long> map2) {
        if (map == null) {
            return false;
        }
        if (z) {
            return true;
        }
        long j = 0;
        for (Map.Entry<TopicPartition, Long> entry : map2.entrySet()) {
            j += entry.getValue().longValue() - map.getOrDefault(entry.getKey(), 0L).longValue();
        }
        return j > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void registerStateStores(Logger logger, String str, ProcessorTopology processorTopology, ProcessorStateManager processorStateManager, StateDirectory stateDirectory, InternalProcessorContext internalProcessorContext) {
        if (processorTopology.stateStores().isEmpty()) {
            return;
        }
        TaskId taskId = processorStateManager.taskId();
        if (!stateDirectory.lock(taskId)) {
            throw new LockException(String.format("%sFailed to lock the state directory for task %s", str, taskId));
        }
        logger.debug("Acquired state directory lock");
        boolean directoryForTaskIsEmpty = stateDirectory.directoryForTaskIsEmpty(taskId);
        processorStateManager.registerStateStores(processorTopology.stateStores(), internalProcessorContext);
        logger.debug("Registered state stores");
        processorStateManager.initializeStoreOffsetsFromCheckpoint(directoryForTaskIsEmpty);
        logger.debug("Initialized state stores");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void closeStateManager(Logger logger, String str, boolean z, boolean z2, ProcessorStateManager processorStateManager, StateDirectory stateDirectory, Task.TaskType taskType) {
        boolean z3 = !z && z2;
        TaskId taskId = processorStateManager.taskId();
        logger.trace("Closing state manager for {} task {}", taskType, taskId);
        AtomicReference atomicReference = new AtomicReference(null);
        try {
            try {
                if (stateDirectory.lock(taskId)) {
                    try {
                        processorStateManager.close();
                        if (z3) {
                            try {
                                logger.debug("Wiping state stores for {} task {}", taskType, taskId);
                                Utils.delete(processorStateManager.baseDir());
                            } finally {
                            }
                        }
                        stateDirectory.unlock(taskId);
                    } catch (ProcessorStateException e) {
                        atomicReference.compareAndSet(null, e);
                        if (z3) {
                            try {
                                logger.debug("Wiping state stores for {} task {}", taskType, taskId);
                                Utils.delete(processorStateManager.baseDir());
                            } finally {
                            }
                        }
                        stateDirectory.unlock(taskId);
                    }
                } else {
                    logger.error("Failed to acquire lock while closing the state store for {} task {}", taskType, taskId);
                }
            } catch (Throwable th) {
                if (z3) {
                    try {
                        logger.debug("Wiping state stores for {} task {}", taskType, taskId);
                        Utils.delete(processorStateManager.baseDir());
                    } finally {
                        stateDirectory.unlock(taskId);
                    }
                }
                stateDirectory.unlock(taskId);
                throw th;
            }
        } catch (IOException e2) {
            atomicReference.compareAndSet(null, new ProcessorStateException(String.format("%sFatal error while trying to close the state manager for task %s", str, taskId), e2));
        }
        KafkaException kafkaException = (ProcessorStateException) atomicReference.get();
        if (kafkaException != null) {
            throw kafkaException;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static TaskId parseTaskDirectoryName(String str, String str2) {
        int indexOf = str.indexOf(95);
        if (indexOf <= 0 || indexOf + 1 >= str.length()) {
            throw new TaskIdFormatException(str);
        }
        try {
            return new TaskId(Integer.parseInt(str.substring(0, indexOf)), Integer.parseInt(str.substring(indexOf + 1)), str2);
        } catch (Exception e) {
            throw new TaskIdFormatException(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static String toTaskDirString(TaskId taskId) {
        return taskId.subtopology() + "_" + taskId.partition();
    }
}
