/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ActiveTaskCreator;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StandbyTaskCreator;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;

public class TaskManager {
    // Logger obtained from a LogContext built with logPrefix (see the constructor).
    private final Logger log;
    // Queried in tryToCompleteRestoration() for the set of completed changelog partitions.
    private final ChangelogReader changelogReader;
    // Identifier supplied at construction and exposed through processId().
    private final UUID processId;
    // Prefix used to build the LogContext; also passed to the builder's subscription-update calls.
    private final String logPrefix;
    // Creates active tasks; also used to close per-task producers and the thread producer.
    private final ActiveTaskCreator activeTaskCreator;
    // Creates standby tasks, including standbys recycled from active tasks.
    private final StandbyTaskCreator standbyTaskCreator;
    // Topology builder; receives subscribed-topic updates on rebalance start and on assignment.
    private final InternalTopologyBuilder builder;
    // Not referenced in the part of the class reviewed here; presumably used together with
    // deleteRecordsResult — TODO confirm.
    private final Admin adminClient;
    // Used to list, lock and unlock task directories and to locate checkpoint files.
    private final StateDirectory stateDirectory;
    // Consulted in handleLostAll(): the thread producer is re-initialized only for EXACTLY_ONCE_BETA.
    private final StreamThread.ProcessingMode processingMode;
    // All tasks currently held by this manager, keyed by id. A TreeMap, so iteration follows TaskId ordering.
    private final Map<TaskId, Task> tasks = new TreeMap<TaskId, Task>();
    // Maps each input partition to the task it was registered for (see addNewTask / cleanupTask).
    private final Map<TopicPartition, Task> partitionToTask = new HashMap<TopicPartition, Task>();
    // Main consumer; injected after construction through setMainConsumer(), so it is null until then.
    private Consumer<byte[], byte[]> mainConsumer;
    // Not referenced in the part of the class reviewed here.
    private DeleteRecordsResult deleteRecordsResult;
    // Set to true by handleRebalanceStart() and back to false by handleRebalanceComplete().
    private boolean rebalanceInProgress = false;
    // Ids of task directories locked in tryToLockAllNonEmptyTaskDirectories(); entries for tasks that
    // are not held are unlocked and removed in releaseLockedUnassignedTaskDirectories().
    private final Set<TaskId> lockedTaskDirectories = new HashSet<TaskId>();
    // Callback invoked by closeAndRevive() with the partitions that have no committed offset.
    // It is never assigned in the part of the class reviewed here — NOTE(review): confirm it is set
    // before handleCorruption() can run.
    private java.util.function.Consumer<Set<TopicPartition>> resetter;

    /**
     * Creates a task manager wired to the given collaborators.
     *
     * <p>The logger is derived from a {@link LogContext} built with {@code logPrefix}. The main
     * consumer is not supplied here; it is injected later through {@code setMainConsumer}.
     */
    TaskManager(final ChangelogReader changelogReader,
                final UUID processId,
                final String logPrefix,
                final ActiveTaskCreator activeTaskCreator,
                final StandbyTaskCreator standbyTaskCreator,
                final InternalTopologyBuilder builder,
                final Admin adminClient,
                final StateDirectory stateDirectory,
                final StreamThread.ProcessingMode processingMode) {
        this.log = new LogContext(logPrefix).logger(getClass());
        this.logPrefix = logPrefix;
        this.processId = processId;

        this.changelogReader = changelogReader;
        this.activeTaskCreator = activeTaskCreator;
        this.standbyTaskCreator = standbyTaskCreator;

        this.builder = builder;
        this.adminClient = adminClient;
        this.stateDirectory = stateDirectory;
        this.processingMode = processingMode;
    }

    /**
     * Injects the main consumer used for pause/resume/seek and for task creation.
     *
     * @param mainConsumer the consumer to use from now on
     */
    void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
        this.mainConsumer = mainConsumer;
    }

    /**
     * @return the consumer previously passed to {@code setMainConsumer}, or {@code null} if none was set
     */
    Consumer<byte[], byte[]> mainConsumer() {
        return mainConsumer;
    }

    /**
     * @return the process id this manager was constructed with
     */
    public UUID processId() {
        return processId;
    }

    /**
     * @return the topology builder this manager was constructed with
     */
    InternalTopologyBuilder builder() {
        return builder;
    }

    /**
     * @return {@code true} between {@code handleRebalanceStart} and {@code handleRebalanceComplete}
     */
    boolean isRebalanceInProgress() {
        return rebalanceInProgress;
    }

    /**
     * Marks the beginning of a rebalance.
     *
     * <p>Forwards the subscribed topics to the topology builder, attempts to lock every non-empty
     * task directory, and only then raises the rebalance-in-progress flag.
     *
     * @param subscribedTopics topics reported by the consumer metadata
     */
    void handleRebalanceStart(final Set<String> subscribedTopics) {
        builder.addSubscribedTopicsFromMetadata(subscribedTopics, logPrefix);

        tryToLockAllNonEmptyTaskDirectories();

        rebalanceInProgress = true;
    }

    /**
     * Marks the end of a rebalance.
     *
     * <p>Pauses every partition currently assigned to the main consumer (they are resumed again by
     * {@code tryToCompleteRestoration} once all tasks are running), releases the directory locks of
     * tasks that are not held by this manager, and clears the rebalance-in-progress flag.
     */
    void handleRebalanceComplete() {
        final Set<TopicPartition> assignedPartitions = mainConsumer.assignment();
        mainConsumer.pause(assignedPartitions);

        releaseLockedUnassignedTaskDirectories();

        rebalanceInProgress = false;
    }

    /**
     * Recovers from corrupted task state.
     *
     * <p>The corrupted tasks are split into standby and active ones. Corrupted standby tasks are
     * closed and revived first, then every RUNNING or RESTORING task that is not corrupted is
     * committed, and finally the corrupted active tasks are closed and revived.
     *
     * @param tasksWithChangelogs corrupted task ids mapped to their corrupted changelog partitions;
     *                            every id is looked up in the task map and is expected to be present
     * @throws TaskMigratedException as declared; raised by the operations invoked here
     */
    void handleCorruption(final Map<TaskId, Collection<TopicPartition>> tasksWithChangelogs) throws TaskMigratedException {
        final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
        final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();

        for (final Map.Entry<TaskId, Collection<TopicPartition>> corrupted : tasksWithChangelogs.entrySet()) {
            final Task task = tasks.get(corrupted.getKey());
            final Map<Task, Collection<TopicPartition>> bucket =
                task.isActive() ? corruptedActiveTasks : corruptedStandbyTasks;
            bucket.put(task, corrupted.getValue());
        }

        // Standby tasks are handled before the commit below.
        closeAndRevive(corruptedStandbyTasks);

        final Set<Task> healthyTasks = tasks()
            .values()
            .stream()
            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
            .filter(t -> !tasksWithChangelogs.containsKey(t.id()))
            .collect(Collectors.toSet());
        commit(healthyTasks);

        closeAndRevive(corruptedActiveTasks);
    }

    /**
     * Closes each corrupted task dirty and revives it.
     *
     * <p>For every task: its changelogs are marked corrupted, a best-effort prepareCommit /
     * suspend / postCommit is attempted (failures are logged and swallowed), and the task is closed
     * dirty. For active tasks the input partitions that are still assigned to the main consumer are
     * paused; those with a committed offset are sought back to it and the rest are handed to the
     * {@code resetter} callback. Finally the task is revived.
     *
     * @param taskWithChangelogs corrupted tasks mapped to their corrupted changelog partitions
     */
    private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWithChangelogs) {
        for (final Map.Entry<Task, Collection<TopicPartition>> entry : taskWithChangelogs.entrySet()) {
            final Task task = entry.getKey();
            final Collection<TopicPartition> corruptedPartitions = entry.getValue();
            task.markChangelogAsCorrupted(corruptedPartitions);

            // Best effort only: the task is closed dirty below regardless of the outcome.
            try {
                task.prepareCommit();
            } catch (final RuntimeException swallow) {
                log.error("Error flushing cache for corrupted task {} ", task.id(), swallow);
            }

            try {
                task.suspend();
                task.postCommit(true);
            } catch (final RuntimeException swallow) {
                log.error("Error suspending corrupted task {} ", task.id(), swallow);
            }

            task.closeDirty();

            if (task.isActive()) {
                final Set<TopicPartition> currentAssignment = mainConsumer().assignment();
                final Set<TopicPartition> taskInputPartitions = task.inputPartitions();
                // Only partitions that are still assigned can be paused and repositioned.
                final Set<TopicPartition> assignedToPauseAndReset =
                    Utils.intersection(HashSet::new, currentAssignment, taskInputPartitions);
                if (!assignedToPauseAndReset.equals(taskInputPartitions)) {
                    log.warn("Expected the current consumer assignment {} to contain the input partitions {}. Will proceed to recover.", currentAssignment, taskInputPartitions);
                }

                mainConsumer().pause(assignedToPauseAndReset);

                final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer().committed(assignedToPauseAndReset);
                for (final Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committed.entrySet()) {
                    final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue();
                    if (offsetAndMetadata != null) {
                        mainConsumer().seek(committedEntry.getKey(), offsetAndMetadata);
                        assignedToPauseAndReset.remove(committedEntry.getKey());
                    }
                }

                // Whatever is left has no committed offset and is handed to the resetter.
                resetter.accept(assignedToPauseAndReset);
            }

            task.revive();
        }
    }

    /**
     * Reconciles the currently held tasks with a new assignment.
     *
     * <p>Held tasks that keep their role get their input partitions updated and are resumed; held
     * tasks whose id is assigned in the other role are recycled; all other held tasks are closed.
     * If closing or recycling produced exceptions, one of them is thrown: the first one that is not
     * a {@link TaskMigratedException} (wrapped in a {@link RuntimeException} unless it is a
     * {@link KafkaException}), otherwise the first recorded exception. Only when nothing failed are
     * the remaining active and standby tasks created.
     *
     * @param activeTasks  newly assigned active task ids and their input partitions
     * @param standbyTasks newly assigned standby task ids and their input partitions
     */
    public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                                 final Map<TaskId, Set<TopicPartition>> standbyTasks) {
        log.info("Handle new assignment with:\n\tNew active tasks: {}\n\tNew standby tasks: {}\n\tExisting active tasks: {}\n\tExisting standby tasks: {}",
                 activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());

        final List<TopicPartition> assignedActivePartitions =
            activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
        builder.addSubscribedTopicsFromAssignment(assignedActivePartitions, logPrefix);

        final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);

        final Comparator<Task> byId = Comparator.comparing(Task::id);
        final Set<Task> tasksToRecycle = new TreeSet<>(byId);
        final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
        final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);

        // Classify every task that is currently held.
        for (final Task task : tasks.values()) {
            final TaskId id = task.id();
            if (activeTasks.containsKey(id) && task.isActive()) {
                updateInputPartitionsAndResume(task, activeTasks.get(id));
                activeTasksToCreate.remove(id);
            } else if (standbyTasks.containsKey(id) && !task.isActive()) {
                updateInputPartitionsAndResume(task, standbyTasks.get(id));
                standbyTasksToCreate.remove(id);
            } else if (activeTasks.containsKey(id) || standbyTasks.containsKey(id)) {
                // Same id, other role.
                tasksToRecycle.add(task);
            } else {
                tasksToCloseClean.add(task);
            }
        }

        handleCloseAndRecycle(tasksToRecycle, tasksToCloseClean, tasksToCloseDirty, activeTasksToCreate, standbyTasksToCreate, taskCloseExceptions);

        if (!taskCloseExceptions.isEmpty()) {
            log.error("Hit exceptions while closing / recycling tasks: {}", taskCloseExceptions);

            for (final Map.Entry<TaskId, RuntimeException> failure : taskCloseExceptions.entrySet()) {
                final RuntimeException cause = failure.getValue();
                if (cause instanceof TaskMigratedException) {
                    continue;
                }
                if (cause instanceof KafkaException) {
                    throw cause;
                }
                throw new RuntimeException("Unexpected failure to close " + taskCloseExceptions.size() + " task(s) [" + taskCloseExceptions.keySet() + "]. First unexpected exception (for task " + failure.getKey() + ") follows.", cause);
            }

            // Every recorded failure was a TaskMigratedException: rethrow the first one.
            throw taskCloseExceptions.values().iterator().next();
        }

        if (!activeTasksToCreate.isEmpty()) {
            activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate).forEach(this::addNewTask);
        }

        if (!standbyTasksToCreate.isEmpty()) {
            standbyTaskCreator.createTasks(standbyTasksToCreate).forEach(this::addNewTask);
        }
    }

    /**
     * Closes and recycles tasks for a new assignment, collecting failures instead of throwing.
     *
     * <p>Runs in four phases: (1) checkpoint probe over the tasks to close and to recycle,
     * (2) clean close, (3) recycle into the other role, (4) dirty close of everything that failed an
     * earlier phase. A task that fails in any phase is added to {@code tasksToCloseDirty}, and the
     * first exception seen for it is recorded in {@code taskCloseExceptions}.
     *
     * @param tasksToRecycle       tasks to convert between active and standby; failed ones are removed
     * @param tasksToCloseClean    tasks to close cleanly; failed ones are removed
     * @param tasksToCloseDirty    must be empty on entry; filled with tasks that need a dirty close
     * @param activeTasksToCreate  pending active assignments; entries for recycled standbys are removed
     * @param standbyTasksToCreate pending standby assignments; entries for recycled actives are removed
     * @param taskCloseExceptions  output map of task id to the first exception hit for that task
     * @throws IllegalArgumentException if {@code tasksToCloseDirty} is not empty on entry
     */
    private void handleCloseAndRecycle(Set<Task> tasksToRecycle, Set<Task> tasksToCloseClean, Set<Task> tasksToCloseDirty, Map<TaskId, Set<TopicPartition>> activeTasksToCreate, Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
        String uncleanMessage;
        if (!tasksToCloseDirty.isEmpty()) {
            throw new IllegalArgumentException("Tasks to close-dirty should be empty");
        }
        // Phase 1: probe both the tasks to close and the tasks to recycle.
        ArrayList<Task> tasksToCheckpoint = new ArrayList<Task>(tasksToCloseClean);
        tasksToCheckpoint.addAll(tasksToRecycle);
        for (Task task : tasksToCheckpoint) {
            try {
                // Non-empty offsets here are treated as evidence of a failed earlier commit
                // (see the log message), so the task is scheduled for a dirty close.
                Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
                if (!offsets.isEmpty()) {
                    this.log.error("Task {} should has been committed when it was suspended, but it reports non-empty offsets {} to commit; it means it fails during last commit and hence should be closed dirty", (Object)task.id(), offsets);
                    tasksToCloseDirty.add(task);
                    continue;
                }
                // Active tasks are not suspended or post-committed here; only non-active tasks are.
                if (task.isActive()) continue;
                task.suspend();
                task.postCommit(true);
            }
            catch (RuntimeException e) {
                uncleanMessage = String.format("Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:", task.id());
                this.log.error(uncleanMessage, (Throwable)e);
                taskCloseExceptions.putIfAbsent(task.id(), e);
                tasksToCloseDirty.add(task);
            }
        }
        // Phase 2: clean close of the tasks that survived phase 1.
        tasksToCloseClean.removeAll(tasksToCloseDirty);
        for (Task task : tasksToCloseClean) {
            try {
                this.completeTaskCloseClean(task);
                // cleanUpTaskProducer records its own failures in taskCloseExceptions rather than throwing.
                this.cleanUpTaskProducer(task, taskCloseExceptions);
                this.tasks.remove(task.id());
            }
            catch (RuntimeException e) {
                uncleanMessage = String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id());
                this.log.error(uncleanMessage, (Throwable)e);
                taskCloseExceptions.putIfAbsent(task.id(), e);
                tasksToCloseDirty.add(task);
            }
        }
        // Phase 3: recycle the tasks that survived phase 1 into the other role.
        tasksToRecycle.removeAll(tasksToCloseDirty);
        for (Task task : tasksToRecycle) {
            try {
                AbstractTask newTask;
                Set<TopicPartition> partitions;
                if (task.isActive()) {
                    // Active -> standby: take the partitions from the pending standby assignments.
                    partitions = standbyTasksToCreate.remove(task.id());
                    newTask = this.standbyTaskCreator.createStandbyTaskFromActive((StreamTask)task, partitions);
                    this.cleanUpTaskProducer(task, taskCloseExceptions);
                } else {
                    // Standby -> active: take the partitions from the pending active assignments.
                    partitions = activeTasksToCreate.remove(task.id());
                    newTask = this.activeTaskCreator.createActiveTaskFromStandby((StandbyTask)task, partitions, this.mainConsumer);
                }
                // The old entry must be removed first: addNewTask throws if the id is still present.
                this.tasks.remove(task.id());
                this.addNewTask(newTask);
            }
            catch (RuntimeException e) {
                String uncleanMessage2 = String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id());
                this.log.error(uncleanMessage2, (Throwable)e);
                taskCloseExceptions.putIfAbsent(task.id(), e);
                tasksToCloseDirty.add(task);
            }
        }
        // Phase 4: dirty close of everything that failed an earlier phase.
        for (Task task : tasksToCloseDirty) {
            this.closeTaskDirty(task);
            this.cleanUpTaskProducer(task, taskCloseExceptions);
            this.tasks.remove(task.id());
        }
    }

    /**
     * Closes and removes the producer of an active task, if one needs closing.
     *
     * <p>Does nothing for non-active tasks. A failure is logged and recorded in
     * {@code taskCloseExceptions} (unless an exception is already recorded for that task) instead of
     * being thrown.
     *
     * @param task                the task whose producer should be cleaned up
     * @param taskCloseExceptions output map of task id to the first exception hit for that task
     */
    private void cleanUpTaskProducer(final Task task, final Map<TaskId, RuntimeException> taskCloseExceptions) {
        if (!task.isActive()) {
            return;
        }

        try {
            activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
        } catch (final RuntimeException e) {
            final String uncleanMessage = String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id());
            log.error(uncleanMessage, e);
            taskCloseExceptions.putIfAbsent(task.id(), e);
        }
    }

    /**
     * Re-registers a task under a new set of input partitions if they changed, then resumes it.
     *
     * <p>When the partitions differ, the old partition-to-task mappings are dropped, the new ones
     * are added, and the task is updated with the new partitions and the builder's node-to-source
     * topic mapping. The task is resumed in either case.
     *
     * @param task            the task that stays assigned in its current role
     * @param topicPartitions the input partitions from the new assignment
     */
    private void updateInputPartitionsAndResume(final Task task, final Set<TopicPartition> topicPartitions) {
        if (!task.inputPartitions().equals(topicPartitions)) {
            log.debug("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), topicPartitions);

            task.inputPartitions().forEach(stale -> partitionToTask.remove(stale));
            topicPartitions.forEach(fresh -> partitionToTask.put(fresh, task));

            task.update(topicPartitions, builder.nodeToSourceTopics());
        }

        task.resume();
    }

    /**
     * Registers a newly created task and its input partitions.
     *
     * @param task the task to add
     * @throws IllegalStateException if a task with the same id was already registered; the task map
     *                               has already been overwritten with {@code task} at that point
     */
    private void addNewTask(final Task task) {
        if (tasks.put(task.id(), task) != null) {
            throw new IllegalStateException("Attempted to create a task that we already owned: " + task.id());
        }

        task.inputPartitions().forEach(partition -> partitionToTask.put(partition, task));
    }

    /**
     * Tries to initialize all tasks and to complete restoration of the active ones.
     *
     * <p>Initialization failures caused by a {@link LockException} or {@link TimeoutException} are
     * logged and retried on a later call. Restoration is only attempted when every task initialized
     * and there is at least one active task; an active task completes restoration once all of its
     * changelog partitions are reported as completed by the changelog reader. When everything is
     * running, all partitions assigned to the main consumer are resumed.
     *
     * @return {@code true} if all tasks are initialized and all active tasks completed restoration
     */
    boolean tryToCompleteRestoration() {
        boolean allRunning = true;
        final List<Task> activeTasks = new ArrayList<>();

        for (final Task task : tasks.values()) {
            try {
                task.initializeIfNeeded();
            } catch (final LockException | TimeoutException e) {
                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
                allRunning = false;
            }

            if (task.isActive()) {
                activeTasks.add(task);
            }
        }

        if (allRunning && !activeTasks.isEmpty()) {
            final Set<TopicPartition> restored = changelogReader.completedChangelogs();

            for (final Task task : activeTasks) {
                if (!restored.containsAll(task.changelogPartitions())) {
                    // Still restoring at least one changelog.
                    allRunning = false;
                    continue;
                }

                try {
                    task.completeRestoration();
                } catch (final TimeoutException e) {
                    log.debug("Could not complete restoration for {} due to {}; will retry", task.id(), e);
                    allRunning = false;
                }
            }
        }

        if (allRunning) {
            // Counterpart of the pause issued in handleRebalanceComplete().
            mainConsumer.resume(mainConsumer.assignment());
        }

        return allRunning;
    }

    /**
     * Commits and suspends the active tasks whose input partitions are all being revoked.
     *
     * <p>An active task counts as revoked when all of its input partitions are contained in the
     * not-yet-matched revoked partitions. If any revoked task has offsets to commit, the remaining
     * active tasks that need a commit are committed together with them. Post-commit is skipped when
     * the commit itself failed. Revoked tasks are suspended in any case. The first exception
     * encountered along the way is rethrown at the end.
     *
     * @param revokedPartitions partitions being revoked from this consumer
     * @throws RuntimeException the first failure from commit, post-commit or suspend (suspend
     *                          failures are wrapped in a {@link StreamsException})
     */
    void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
        final Set<TopicPartition> remainingRevokedPartitions = new HashSet<>(revokedPartitions);
        final Set<Task> revokedActiveTasks = new HashSet<>();
        final Set<Task> commitNeededActiveTasks = new HashSet<>();
        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
        // Typed as RuntimeException so the rethrow below needs no downcast.
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);

        for (final Task task : activeTaskIterable()) {
            if (remainingRevokedPartitions.containsAll(task.inputPartitions())) {
                revokedActiveTasks.add(task);
                remainingRevokedPartitions.removeAll(task.inputPartitions());
            } else if (task.commitNeeded()) {
                commitNeededActiveTasks.add(task);
            }
        }

        if (!remainingRevokedPartitions.isEmpty()) {
            log.warn("The following partitions {} are missing from the task partitions. It could potentially due to race condition of consumer detecting the heartbeat failure, or the tasks have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions);
        }

        prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask);

        // The other tasks are only committed if the revoked tasks have something to commit.
        final boolean shouldCommitAdditionalTasks = !consumedOffsetsPerTask.isEmpty();
        if (shouldCommitAdditionalTasks) {
            prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
        }

        try {
            commitOffsetsOrTransaction(consumedOffsetsPerTask);
        } catch (final RuntimeException e) {
            log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
            firstException.compareAndSet(null, e);
        }

        // Post-commit only runs when the commit succeeded.
        if (firstException.get() == null) {
            for (final Task task : revokedActiveTasks) {
                try {
                    task.postCommit(true);
                } catch (final RuntimeException e) {
                    log.error("Exception caught while post-committing task " + task.id(), e);
                    firstException.compareAndSet(null, e);
                }
            }

            if (shouldCommitAdditionalTasks) {
                for (final Task task : commitNeededActiveTasks) {
                    try {
                        task.postCommit(false);
                    } catch (final RuntimeException e) {
                        log.error("Exception caught while post-committing task " + task.id(), e);
                        firstException.compareAndSet(null, e);
                    }
                }
            }
        }

        // Revoked tasks are suspended even if committing failed.
        for (final Task task : revokedActiveTasks) {
            try {
                task.suspend();
            } catch (final RuntimeException e) {
                log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e);
                firstException.compareAndSet(null, new StreamsException("Failed to suspend " + task.id(), e));
            }
        }

        final RuntimeException fatalException = firstException.get();
        if (fatalException != null) {
            throw fatalException;
        }
    }

    /**
     * Prepares each task for commit and records the offsets of those that have any.
     *
     * @param tasksToPrepare         tasks on which {@code prepareCommit()} is invoked
     * @param consumedOffsetsPerTask output map; receives an entry only for tasks with non-empty offsets
     */
    private void prepareCommitAndAddOffsetsToMap(final Set<Task> tasksToPrepare,
                                                 final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
        for (final Task task : tasksToPrepare) {
            final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
            if (!offsets.isEmpty()) {
                consumedOffsetsPerTask.put(task.id(), offsets);
            }
        }
    }

    /**
     * Closes every active task dirty and removes it from the task map; standby tasks are kept.
     *
     * <p>Failures while closing a task producer are logged as warnings and otherwise ignored. For
     * {@code EXACTLY_ONCE_BETA} the thread producer is re-initialized afterwards.
     */
    void handleLostAll() {
        log.debug("Closing lost active tasks as zombies.");

        for (final Iterator<Task> remaining = tasks.values().iterator(); remaining.hasNext(); ) {
            final Task task = remaining.next();
            if (task.isActive()) {
                closeTaskDirty(task);
                remaining.remove();

                try {
                    activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
                } catch (final RuntimeException e) {
                    log.warn("Error closing task producer for " + task.id() + " while handling lostAll", e);
                }
            }
        }

        if (processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_BETA) {
            activeTaskCreator.reInitializeThreadProducer();
        }
    }

    /**
     * Computes the sum of changelog offsets per task, for held tasks and locked task directories.
     *
     * <p>For a held task that is neither CREATED nor CLOSED the sum is taken from the task's own
     * changelog offsets; tasks reporting no changelog offsets are skipped. For every other id the
     * checkpoint file is read if it exists. Unreadable checkpoint files are logged and skipped.
     *
     * @return task id to offset sum; ids without offsets or without a checkpoint file are absent
     */
    public Map<TaskId, Long> getTaskOffsetSums() {
        final Map<TaskId, Long> taskOffsetSums = new HashMap<>();

        // Typed varargs call: a raw Set[] argument would erase the result to a raw Set.
        final Set<TaskId> candidateIds = Utils.union(HashSet::new, lockedTaskDirectories, tasks.keySet());

        for (final TaskId id : candidateIds) {
            final Task task = tasks.get(id);
            if (task != null && task.state() != Task.State.CREATED && task.state() != Task.State.CLOSED) {
                final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets();
                if (changelogOffsets.isEmpty()) {
                    log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id);
                } else {
                    taskOffsetSums.put(id, sumOfChangelogOffsets(id, changelogOffsets));
                }
            } else {
                // Not held, or held but CREATED/CLOSED: fall back to the checkpoint file.
                final File checkpointFile = stateDirectory.checkpointFileFor(id);
                try {
                    if (checkpointFile.exists()) {
                        taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read()));
                    }
                } catch (final IOException e) {
                    log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e);
                }
            }
        }

        return taskOffsetSums;
    }

    /**
     * Rebuilds {@code lockedTaskDirectories} by trying to lock every non-empty task directory.
     *
     * <p>Directories whose name is not a valid task id are skipped silently. I/O failures while
     * locking are logged as warnings and the directory is skipped.
     */
    private void tryToLockAllNonEmptyTaskDirectories() {
        lockedTaskDirectories.clear();

        for (final File dir : stateDirectory.listNonEmptyTaskDirectories()) {
            try {
                final TaskId id = TaskId.parse(dir.getName());
                try {
                    if (stateDirectory.lock(id)) {
                        lockedTaskDirectories.add(id);
                        if (!tasks.containsKey(id)) {
                            log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id);
                        }
                    }
                } catch (final IOException e) {
                    log.warn(String.format("Exception caught while attempting to lock task %s:", id), e);
                }
            } catch (final TaskIdFormatException ignored) {
                // Intentionally ignored: the entry's name does not parse as a task id, so it is skipped.
            }
        }
    }

    /**
     * Unlocks every locked task directory whose task is not currently held by this manager.
     *
     * <p>All directories are attempted even if one fails. Successfully unlocked ids are removed
     * from {@code lockedTaskDirectories}; ids that failed to unlock stay in the set.
     *
     * @throws StreamsException wrapping the first {@link IOException} hit while unlocking
     */
    private void releaseLockedUnassignedTaskDirectories() {
        // Typed as RuntimeException so the stored failure can be rethrown without a cast.
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);

        final Iterator<TaskId> taskIdIterator = lockedTaskDirectories.iterator();
        while (taskIdIterator.hasNext()) {
            final TaskId id = taskIdIterator.next();
            if (tasks.containsKey(id)) {
                // Still held: keep the lock.
                continue;
            }
            try {
                stateDirectory.unlock(id);
                taskIdIterator.remove();
            } catch (final IOException e) {
                log.error(String.format("Caught the following exception while trying to unlock task %s", id), e);
                firstException.compareAndSet(null, new StreamsException(String.format("Failed to unlock task directory %s", id), e));
            }
        }

        final RuntimeException fatalException = firstException.get();
        if (fatalException != null) {
            throw fatalException;
        }
    }

    /**
     * Sums the changelog offsets of one task.
     *
     * <p>An offset of {@code -2} short-circuits and is returned as-is; an offset of {@code -4} is
     * left out of the sum. NOTE(review): these literals look like inlined sentinel constants —
     * confirm which named constants they correspond to.
     *
     * @param id               task id, used for logging only
     * @param changelogOffsets changelog partition to offset
     * @return the sum, {@code -2} if any offset equals {@code -2}, or {@code Long.MAX_VALUE} if the
     *         sum overflowed
     * @throws IllegalStateException if any other negative offset is encountered
     */
    private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Long> changelogOffsets) {
        long total = 0L;

        for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
            final long offset = changelogEntry.getValue();

            if (offset == -2L) {
                return -2L;
            }

            if (offset != -4L) {
                if (offset < 0L) {
                    throw new IllegalStateException("Expected not to get a sentinel offset, but got: " + changelogEntry);
                }

                total += offset;
                // Adding non-negative values can only turn negative through overflow.
                if (total < 0L) {
                    log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
                    return Long.MAX_VALUE;
                }
            }
        }

        return total;
    }

    /**
     * Closes a task dirty after a best-effort prepare-commit and suspend.
     *
     * <p>Failures of {@code prepareCommit()} and {@code suspend()} are logged and swallowed. The
     * task's partition mappings are removed before {@code closeDirty()} is called. The task is not
     * removed from the task map here.
     *
     * @param task the task to close
     */
    private void closeTaskDirty(final Task task) {
        try {
            task.prepareCommit();
        } catch (final RuntimeException flushFailure) {
            log.error("Error flushing caches of dirty task {} ", task.id(), flushFailure);
        }

        try {
            task.suspend();
        } catch (final RuntimeException suspendFailure) {
            log.error("Error suspending dirty task {} ", task.id(), suspendFailure);
        }

        cleanupTask(task);
        task.closeDirty();
    }

    /**
     * Removes the task's partition mappings and then closes it cleanly.
     *
     * @param task the task to close; exceptions from {@code closeClean()} propagate to the caller
     */
    private void completeTaskCloseClean(final Task task) {
        cleanupTask(task);

        task.closeClean();
    }

    /**
     * Drops the partition-to-task mapping of every input partition of the given task.
     *
     * @param task the task whose input partitions are unregistered
     */
    private void cleanupTask(final Task task) {
        task.inputPartitions().forEach(partition -> partitionToTask.remove(partition));
    }

    /**
     * Closes all tasks and producers and releases the remaining task directory locks.
     *
     * <p>Active and standby tasks are first passed to the clean-close helpers; whatever those
     * return is closed dirty. Then the task producers of the active tasks and the thread producer
     * are closed, the task map is cleared, and the locked directories are released. The
     * producer-close and unlock steps go through {@code executeAndMaybeSwallow}, which is given one
     * callback that records the first failure and one that only logs a warning.
     *
     * @param clean passed to the close helpers and to {@code executeAndMaybeSwallow}
     * @throws RuntimeException wrapping the first recorded failure, if any
     */
    void shutdown(final boolean clean) {
        // Typed as RuntimeException to match the helper signatures and the rethrow below.
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);

        final Set<Task> tasksToCloseDirty = new HashSet<>();
        tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, firstException));
        tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, firstException));

        for (final Task task : tasksToCloseDirty) {
            closeTaskDirty(task);
        }

        for (final Task task : activeTaskIterable()) {
            TaskManager.executeAndMaybeSwallow(
                clean,
                () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
                (RuntimeException e) -> firstException.compareAndSet(null, e),
                (RuntimeException e) -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
            );
        }

        TaskManager.executeAndMaybeSwallow(
            clean,
            activeTaskCreator::closeThreadProducerIfNeeded,
            (RuntimeException e) -> firstException.compareAndSet(null, e),
            (RuntimeException e) -> log.warn("Ignoring an exception while closing thread producer.", e)
        );

        // Clearing first means no task is held any more, so the unlock step below skips nothing.
        tasks.clear();

        TaskManager.executeAndMaybeSwallow(
            clean,
            this::releaseLockedUnassignedTaskDirectories,
            (RuntimeException e) -> firstException.compareAndSet(null, e),
            (RuntimeException e) -> log.warn("Ignoring an exception while unlocking remaining task directories.", e)
        );

        final RuntimeException fatalException = firstException.get();
        if (fatalException != null) {
            throw new RuntimeException("Unexpected exception while closing task", fatalException);
        }
    }

    /**
     * Tries to commit and cleanly close all active tasks.
     *
     * <p>When {@code clean} is false nothing is attempted and all active tasks are returned.
     * Otherwise each task is prepared for commit; if any preparation fails, no commit is issued and
     * every prepared task is also returned for a dirty close. If the commit itself fails, the same
     * happens. Tasks that fail post-commit or the final suspend/close are returned as well.
     *
     * @param clean          whether a clean close should be attempted at all
     * @param firstException receives the first non-{@link TaskMigratedException} failure; a
     *                       {@link TaskMigratedException} from {@code prepareCommit()} is not recorded
     * @return the tasks the caller still has to close dirty
     */
    private Collection<Task> tryCloseCleanAllActiveTasks(boolean clean, AtomicReference<RuntimeException> firstException) {
        if (!clean) {
            return this.activeTaskIterable();
        }
        Comparator<Task> byId = Comparator.comparing(Task::id);
        TreeSet<Task> tasksToCommit = new TreeSet<Task>(byId);
        TreeSet<Task> tasksToCloseDirty = new TreeSet<Task>(byId);
        TreeSet<Task> tasksToCloseClean = new TreeSet<Task>(byId);
        HashMap<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<TaskId, Map<TopicPartition, OffsetAndMetadata>>();
        // Step 1: prepare every active task; a task whose preparation throws goes straight to the dirty set.
        for (Task task : this.activeTaskIterable()) {
            try {
                Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
                tasksToCommit.add(task);
                if (!committableOffsets.isEmpty()) {
                    consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                }
                tasksToCloseClean.add(task);
            }
            catch (TaskMigratedException e) {
                // Not recorded in firstException; the task is only marked for a dirty close.
                tasksToCloseDirty.add(task);
            }
            catch (RuntimeException e) {
                firstException.compareAndSet(null, e);
                tasksToCloseDirty.add(task);
            }
        }
        if (!tasksToCloseDirty.isEmpty()) {
            // Step 2a: a preparation failed, so skip the commit and close all prepared tasks dirty too.
            tasksToCloseClean.removeAll(tasksToCommit);
            tasksToCloseDirty.addAll(tasksToCommit);
        } else {
            // Step 2b: commit everything at once, then post-commit each active task.
            try {
                this.commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
                for (Task task : this.activeTaskIterable()) {
                    try {
                        task.postCommit(true);
                    }
                    catch (RuntimeException e) {
                        this.log.error("Exception caught while post-committing task " + task.id(), (Throwable)e);
                        firstException.compareAndSet(null, e);
                        tasksToCloseDirty.add(task);
                        tasksToCloseClean.remove(task);
                    }
                }
            }
            catch (RuntimeException e) {
                // Post-commit failures are caught inside the loop above, so this handles a failed commit.
                this.log.error("Exception caught while committing tasks during shutdown", (Throwable)e);
                firstException.compareAndSet(null, e);
                tasksToCloseClean.removeAll(tasksToCommit);
                tasksToCloseDirty.addAll(tasksToCommit);
            }
        }
        // Step 3: suspend and cleanly close what is left; failures fall back to a dirty close.
        for (Task task : tasksToCloseClean) {
            try {
                task.suspend();
                this.completeTaskCloseClean(task);
            }
            catch (RuntimeException e) {
                this.log.error("Exception caught while clean-closing task " + task.id(), (Throwable)e);
                firstException.compareAndSet(null, e);
                tasksToCloseDirty.add(task);
            }
        }
        return tasksToCloseDirty;
    }

    /**
     * Tries to cleanly close every standby task by running prepare-commit, post-commit, suspend
     * and the clean-close completion step on each one in turn.
     *
     * @param clean          when {@code false} nothing is attempted and all standby tasks are
     *                       returned as-is
     * @param firstException holder that receives the first failure that is not a
     *                       {@link TaskMigratedException}; later failures do not overwrite it
     * @return the standby tasks for which any of the steps threw
     */
    private Collection<Task> tryCloseCleanAllStandbyTasks(boolean clean, AtomicReference<RuntimeException> firstException) {
        if (clean) {
            HashSet<Task> dirty = new HashSet<Task>();
            for (Task standby : this.standbyTaskIterable()) {
                try {
                    standby.prepareCommit();
                    standby.postCommit(true);
                    standby.suspend();
                    this.completeTaskCloseClean(standby);
                }
                catch (RuntimeException failure) {
                    // Migration failures are not reported through firstException.
                    if (!(failure instanceof TaskMigratedException)) {
                        firstException.compareAndSet(null, failure);
                    }
                    dirty.add(standby);
                }
            }
            return dirty;
        }
        return this.standbyTaskIterable();
    }

    /**
     * @return a freshly collected set holding the id of every active task
     */
    Set<TaskId> activeTaskIds() {
        Stream<TaskId> ids = this.activeTaskStream().map(Task::id);
        return ids.collect(Collectors.toSet());
    }

    /**
     * @return a freshly collected set holding the id of every standby (non-active) task
     */
    Set<TaskId> standbyTaskIds() {
        Stream<TaskId> ids = this.standbyTaskStream().map(Task::id);
        return ids.collect(Collectors.toSet());
    }

    /**
     * @return the task map held by this manager; the field itself is returned, not a copy, so
     *         changes made through the returned reference are visible to this manager
     */
    Map<TaskId, Task> tasks() {
        return this.tasks;
    }

    /**
     * @return a newly built map from task id to task containing only the active tasks
     */
    Map<TaskId, Task> activeTaskMap() {
        Stream<Task> active = this.activeTaskStream();
        return active.collect(Collectors.toMap(Task::id, task -> task));
    }

    /**
     * @return a newly collected list of the tasks that report themselves as active
     */
    List<Task> activeTaskIterable() {
        Stream<Task> active = this.activeTaskStream();
        return active.collect(Collectors.toList());
    }

    /**
     * @return a stream over the managed tasks, keeping only those for which
     *         {@code isActive()} is {@code true}
     */
    private Stream<Task> activeTaskStream() {
        Collection<Task> all = this.tasks.values();
        return all.stream().filter(task -> task.isActive());
    }

    /**
     * @return a newly built map from task id to task containing only the standby tasks
     */
    Map<TaskId, Task> standbyTaskMap() {
        Stream<Task> standbys = this.standbyTaskStream();
        return standbys.collect(Collectors.toMap(Task::id, task -> task));
    }

    /**
     * @return a newly collected list of the tasks that do not report themselves as active
     */
    private List<Task> standbyTaskIterable() {
        Stream<Task> standbys = this.standbyTaskStream();
        return standbys.collect(Collectors.toList());
    }

    /**
     * @return a stream over the managed tasks, keeping only those for which
     *         {@code isActive()} is {@code false}
     */
    private Stream<Task> standbyTaskStream() {
        Collection<Task> all = this.tasks.values();
        return all.stream().filter(task -> !task.isActive());
    }

    /**
     * Runs {@link #commit(Collection)} over every managed task, active and standby alike.
     *
     * @return whatever {@code commit} returns for the full task collection
     */
    int commitAll() {
        Collection<Task> everyTask = this.tasks.values();
        return this.commit(everyTask);
    }

    /**
     * Hands each partition's records to the task registered for that partition.
     *
     * @param records the polled records, grouped by partition
     * @throws NullPointerException if a partition in {@code records} has no task registered in
     *                              {@code partitionToTask}; the condition is logged first
     */
    void addRecordsToTasks(ConsumerRecords<byte[], byte[]> records) {
        for (TopicPartition tp : records.partitions()) {
            Task owner = this.partitionToTask.get(tp);
            if (owner != null) {
                owner.addRecords(tp, records.records(tp));
                continue;
            }
            // No owner for this partition: log the current task layout and fail.
            this.log.error("Unable to locate active task for received-record partition {}. Current tasks: {}", (Object)tp, (Object)this.toString(">"));
            throw new NullPointerException("Task was unexpectedly missing for partition " + tp);
        }
    }

    /**
     * Commits the given tasks in three passes: prepare each task that needs a commit, commit the
     * offsets gathered from the active ones, then post-commit each task that still needs a commit.
     *
     * @param tasksToCommit the tasks to consider
     * @return {@code -1} if a rebalance is in progress (nothing is done), otherwise the number of
     *         tasks that were post-committed
     */
    int commit(Collection<Task> tasksToCommit) {
        if (this.rebalanceInProgress) {
            return -1;
        }
        Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> offsetsByTask = new HashMap<>();
        for (Task candidate : tasksToCommit) {
            if (candidate.commitNeeded()) {
                Map<TopicPartition, OffsetAndMetadata> prepared = candidate.prepareCommit();
                // Only offsets of active tasks are handed to the offset commit.
                if (candidate.isActive()) {
                    offsetsByTask.put(candidate.id(), prepared);
                }
            }
        }
        this.commitOffsetsOrTransaction(offsetsByTask);
        int committed = 0;
        for (Task candidate : tasksToCommit) {
            if (candidate.commitNeeded()) {
                committed++;
                candidate.postCommit(false);
            }
        }
        return committed;
    }

    /**
     * Commits all active tasks if at least one of them both requested a commit and needs one.
     *
     * @return {@code -1} if a rebalance is in progress, {@code 0} if no active task qualified,
     *         otherwise the result of committing the active tasks
     */
    int maybeCommitActiveTasksPerUserRequested() {
        if (this.rebalanceInProgress) {
            return -1;
        }
        // anyMatch stops at the first qualifying task, like an early-exit loop would.
        boolean commitWanted = this.activeTaskIterable().stream()
            .anyMatch(task -> task.commitRequested() && task.commitNeeded());
        if (commitWanted) {
            return this.commit(this.activeTaskIterable());
        }
        return 0;
    }

    /**
     * Commits the given per-task offsets using the mechanism selected by {@code processingMode}:
     * one producer transaction per task for {@code EXACTLY_ONCE_ALPHA}, a single thread-producer
     * transaction for {@code EXACTLY_ONCE_BETA}, and a synchronous consumer commit otherwise.
     * The offsets are logged first; an empty map results in no commit at all.
     *
     * @param offsetsPerTask offsets to commit, keyed by task id
     * @throws TaskMigratedException if the consumer commit fails with {@link CommitFailedException}
     * @throws StreamsException      if the consumer commit times out or fails with another
     *                               {@link KafkaException}
     */
    private void commitOffsetsOrTransaction(Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask) {
        this.log.info("Committing task offsets {}", offsetsPerTask);
        if (offsetsPerTask.isEmpty()) {
            return;
        }
        if (this.processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA) {
            for (Map.Entry<TaskId, Map<TopicPartition, OffsetAndMetadata>> perTask : offsetsPerTask.entrySet()) {
                this.activeTaskCreator.streamsProducerForTask(perTask.getKey()).commitTransaction(perTask.getValue(), this.mainConsumer.groupMetadata());
            }
            return;
        }
        // Every other mode commits all partitions together, so flatten the per-task maps.
        Map<TopicPartition, OffsetAndMetadata> merged = offsetsPerTask.values().stream()
            .flatMap(offsets -> offsets.entrySet().stream())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        if (this.processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_BETA) {
            this.activeTaskCreator.threadProducer().commitTransaction(merged, this.mainConsumer.groupMetadata());
            return;
        }
        try {
            this.mainConsumer.commitSync(merged);
        }
        catch (CommitFailedException error) {
            throw new TaskMigratedException("Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group", error);
        }
        catch (TimeoutException error) {
            throw new StreamsException("Timed out while committing offsets via consumer", error);
        }
        catch (KafkaException error) {
            throw new StreamsException("Error encountered committing offsets via consumer", error);
        }
    }

    /**
     * Lets each active task process records until it has processed {@code maxNumRecords} or its
     * {@code process} call returns {@code false}, and reports each task's batch duration to it.
     *
     * @param maxNumRecords upper bound on records processed per task in this call
     * @param time          clock used to timestamp the batches
     * @return the total number of records processed across all active tasks
     * @throws TaskMigratedException rethrown after logging if a task reports it was migrated
     * @throws RuntimeException      rethrown after logging for any other processing failure
     */
    int process(int maxNumRecords, Time time) {
        int total = 0;
        long now = time.milliseconds();
        for (Task task : this.activeTaskIterable()) {
            try {
                // The previous task's end timestamp doubles as this task's start timestamp.
                long batchStart = now;
                int handled = 0;
                while (handled < maxNumRecords && task.process(now)) {
                    handled++;
                }
                now = time.milliseconds();
                total += handled;
                task.recordProcessBatchTime(now - batchStart);
            }
            catch (TaskMigratedException migrated) {
                this.log.info("Failed to process stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", (Object)task.id());
                throw migrated;
            }
            catch (RuntimeException failure) {
                this.log.error("Failed to process stream task {} due to the following error:", (Object)task.id(), (Object)failure);
                throw failure;
            }
        }
        return total;
    }

    /**
     * Forwards the given total process latency and timestamp to every active task via
     * {@code recordProcessTimeRatioAndBufferSize}.
     *
     * @param totalProcessLatencyMs value passed through to each task
     * @param now                   value passed through to each task
     */
    void recordTaskProcessRatio(long totalProcessLatencyMs, long now) {
        this.activeTaskIterable().forEach(task -> task.recordProcessTimeRatioAndBufferSize(totalProcessLatencyMs, now));
    }

    /**
     * Gives every active task the chance to punctuate on stream time and then on system time.
     *
     * @return how many of those punctuation attempts returned {@code true}
     * @throws TaskMigratedException rethrown after logging if a task reports it was migrated
     * @throws KafkaException        rethrown after logging for any other Kafka failure
     */
    int punctuate() {
        int fired = 0;
        for (Task task : this.activeTaskIterable()) {
            try {
                if (task.maybePunctuateStreamTime()) {
                    fired++;
                }
                if (task.maybePunctuateSystemTime()) {
                    fired++;
                }
            }
            catch (TaskMigratedException migrated) {
                this.log.info("Failed to punctuate stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", (Object)task.id());
                throw migrated;
            }
            catch (KafkaException failure) {
                this.log.error("Failed to punctuate stream task {} due to the following error:", (Object)task.id(), (Object)failure);
                throw failure;
            }
        }
        return fired;
    }

    /**
     * Sends a delete-records request covering the purgeable offsets of all active tasks, unless a
     * previous request is still in flight. A previous request that completed exceptionally is only
     * logged at debug level; no request is sent when there is nothing to purge.
     */
    void maybePurgeCommittedRecords() {
        if (this.deleteRecordsResult != null && !this.deleteRecordsResult.all().isDone()) {
            // The previous request has not finished yet.
            return;
        }
        if (this.deleteRecordsResult != null && this.deleteRecordsResult.all().isCompletedExceptionally()) {
            this.log.debug("Previous delete-records request has failed: {}. Try sending the new request now", (Object)this.deleteRecordsResult.lowWatermarks());
        }
        Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
        for (Task task : this.activeTaskIterable()) {
            task.purgeableOffsets().forEach((partition, offset) -> toDelete.put(partition, RecordsToDelete.beforeOffset(offset)));
        }
        if (toDelete.isEmpty()) {
            return;
        }
        this.deleteRecordsResult = this.adminClient.deleteRecords(toDelete);
        this.log.trace("Sent delete-records request: {}", toDelete);
    }

    /**
     * @return the same description as {@link #toString(String)} with an empty indent
     */
    public String toString() {
        String noIndent = "";
        return this.toString(noIndent);
    }

    /**
     * Builds a textual description of this manager: a fixed header followed by one entry per
     * task giving its id, state, simple class name and whether it is active or standby.
     *
     * @param indent prefix placed before each section line and each task entry
     * @return the assembled description; task entries are not separated by line breaks
     */
    public String toString(String indent) {
        StringBuilder out = new StringBuilder("TaskManager\n");
        out.append(indent).append("\tMetadataState:\n");
        out.append(indent).append("\tTasks:\n");
        for (Task task : this.tasks.values()) {
            out.append(indent).append("\t\t");
            out.append(task.id());
            out.append(" ");
            out.append((Object)task.state());
            out.append(" ");
            out.append(task.getClass().getSimpleName());
            String role = task.isActive() ? "active" : "standby";
            out.append('(').append(role).append(')');
        }
        return out.toString();
    }

    /**
     * @return the producer metrics as reported by the active task creator
     */
    Map<MetricName, Metric> producerMetrics() {
        Map<MetricName, Metric> metrics = this.activeTaskCreator.producerMetrics();
        return metrics;
    }

    /**
     * @return the producer client ids as reported by the active task creator
     */
    Set<String> producerClientIds() {
        Set<String> clientIds = this.activeTaskCreator.producerClientIds();
        return clientIds;
    }

    /**
     * @return a read-only view of the locked task directory ids; the view is backed by the
     *         underlying set rather than being a copy
     */
    Set<TaskId> lockedTaskDirectories() {
        Set<TaskId> readOnlyView = Collections.unmodifiableSet(this.lockedTaskDirectories);
        return readOnlyView;
    }

    /**
     * Runs {@code runnable} and, if it throws a {@link RuntimeException}, passes that exception to
     * exactly one of the two handlers depending on {@code clean}.
     *
     * @param clean            selects the handler: {@code true} uses {@code actionIfClean},
     *                         {@code false} uses {@code actionIfNotClean}
     * @param runnable         the action to execute
     * @param actionIfClean    receives the failure when {@code clean} is {@code true}
     * @param actionIfNotClean receives the failure when {@code clean} is {@code false}
     */
    public static void executeAndMaybeSwallow(boolean clean, Runnable runnable, java.util.function.Consumer<RuntimeException> actionIfClean, java.util.function.Consumer<RuntimeException> actionIfNotClean) {
        try {
            runnable.run();
        }
        catch (RuntimeException e) {
            // The handlers are mutually exclusive; without the else branch a clean handler that
            // returns normally would be followed by the unclean handler for the same failure.
            if (clean) {
                actionIfClean.accept(e);
            } else {
                actionIfNotClean.accept(e);
            }
        }
    }

    /**
     * Runs {@code runnable}; a {@link RuntimeException} is rethrown when {@code clean} is
     * {@code true}, and otherwise noted at debug level and not propagated.
     *
     * @param clean    whether a failure should propagate to the caller
     * @param runnable the action to execute
     * @param name     label inserted into the debug message
     * @param log      logger that receives the debug message
     */
    public static void executeAndMaybeSwallow(boolean clean, Runnable runnable, String name, Logger log) {
        java.util.function.Consumer<RuntimeException> rethrow = error -> {
            throw error;
        };
        java.util.function.Consumer<RuntimeException> logAndIgnore = error -> log.debug("Ignoring error in unclean {}", (Object)name);
        TaskManager.executeAndMaybeSwallow(clean, runnable, rethrow, logAndIgnore);
    }

    /**
     * @return {@code true} as soon as any managed task reports that it needs initialization or
     *         restoration, {@code false} if none does
     */
    boolean needsInitializationOrRestoration() {
        for (Task task : this.tasks().values()) {
            if (task.needsInitializationOrRestoration()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Stores the given callback in the {@code resetter} field, replacing any previous value.
     *
     * @param resetter callback that accepts a set of topic partitions
     */
    public void setPartitionResetter(java.util.function.Consumer<Set<TopicPartition>> resetter) {
        this.resetter = resetter;
    }
}

