package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.class */
class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
    /** Tasks placed here by {@code suspendRunningTasks} after a successful {@code suspend()}, keyed by task id. */
    private final Map<TaskId, StreamTask> suspended;
    /** Tasks registered through {@code addTaskToRestoring} that have not yet been transitioned to running, keyed by task id. */
    private final Map<TaskId, StreamTask> restoring;
    /** Partitions handed to {@code updateRestored} so far that have not been cleared again. */
    private final Set<TopicPartition> restoredPartitions;
    /** Maps every partition returned by {@code partitions()} and {@code changelogPartitions()} of a restoring task to that task. */
    private final Map<TopicPartition, StreamTask> restoringByPartition;
    /** Snapshot of {@code runningTaskIds()} taken by {@code suspendOrCloseTasks}, minus the tasks whose suspension failed. */
    private final Set<TaskId> prevActiveTasks;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a container with no suspended, restoring or previously-active tasks.
     *
     * @param logContext forwarded to the superclass constructor together with the literal {@code "stream task"}
     */
    public AssignedStreamsTasks(LogContext logContext) {
        super(logContext, "stream task");
        suspended = new HashMap<>();
        restoring = new HashMap<>();
        restoredPartitions = new HashSet<>();
        restoringByPartition = new HashMap<>();
        prevActiveTasks = new HashSet<>();
    }

    /**
     * Looks up the restoring task registered for a partition.
     *
     * @param topicPartition the partition to look up in the restoring-by-partition index
     * @return the task mapped to {@code topicPartition}, or {@code null} when the index has no entry for it
     */
    @Override // org.apache.kafka.streams.processor.internals.RestoringTasks
    public StreamTask restoringTaskFor(TopicPartition topicPartition) {
        final StreamTask owner = restoringByPartition.get(topicPartition);
        return owner;
    }

    /**
     * Returns the superclass's task list extended with the restoring and the suspended tasks.
     *
     * @return the list obtained from {@code super.allTasks()}, with restoring tasks appended first and
     *         suspended tasks appended after them
     */
    @Override // org.apache.kafka.streams.processor.internals.AssignedTasks
    List<StreamTask> allTasks() {
        final List<StreamTask> tasks = super.allTasks();
        tasks.addAll(restoring.values());
        tasks.addAll(suspended.values());
        return tasks;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the superclass's assigned task ids extended with the ids of restoring and suspended tasks.
     *
     * @return the set obtained from {@code super.allAssignedTaskIds()} after adding the keys of the
     *         restoring and suspended maps
     */
    @Override // org.apache.kafka.streams.processor.internals.AssignedTasks
    public Set<TaskId> allAssignedTaskIds() {
        final Set<TaskId> ids = super.allAssignedTaskIds();
        ids.addAll(restoring.keySet());
        ids.addAll(suspended.keySet());
        return ids;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports whether every task counts as running.
     *
     * @return {@code true} only when the superclass check passes, nothing is restoring, and either
     *         no task is suspended or at least one task is running
     */
    @Override // org.apache.kafka.streams.processor.internals.AssignedTasks
    public boolean allTasksRunning() {
        if (!super.allTasksRunning() || !restoring.isEmpty()) {
            return false;
        }
        // Suspended tasks are tolerated as long as something is actually running.
        return suspended.isEmpty() || !running.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes a task, choosing the close variant by whether the task is currently in the suspended map.
     *
     * @param streamTask the task to close
     * @param z          forwarded as the first argument of {@code closeSuspended} or {@code close}
     */
    @Override // org.apache.kafka.streams.processor.internals.AssignedTasks
    public void closeTask(StreamTask streamTask, boolean z) {
        final boolean isSuspended = suspended.containsKey(streamTask.id());
        if (!isSuspended) {
            streamTask.close(z, false);
            return;
        }
        streamTask.closeSuspended(z, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return {@code true} when at least one task is in the restoring map
     */
    public boolean hasRestoringTasks() {
        final boolean noneRestoring = restoring.isEmpty();
        return !noneRestoring;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Empties the restored-partition set and the restoring-by-partition index.
     *
     * @throws IllegalStateException if any task is still in the restoring map; nothing is cleared in that case
     */
    public void clearRestoringPartitions() {
        final boolean stillRestoring = !restoring.isEmpty();
        if (stillRestoring) {
            log.error("Tried to clear restoring partitions but was still restoring the stream tasks {}", restoring);
            throw new IllegalStateException("Should not clear restoring partitions while set of restoring tasks is non-empty");
        }
        restoredPartitions.clear();
        restoringByPartition.clear();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the key set of the suspended map; this is the map's own view, not a copy
     */
    public Set<TaskId> suspendedTaskIds() {
        final Set<TaskId> suspendedIds = suspended.keySet();
        return suspendedIds;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the internal set of previously active task ids (the field itself, not a copy), as last
     *         populated by {@code suspendOrCloseTasks}
     */
    public Set<TaskId> previousRunningTaskIds() {
        return prevActiveTasks;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles a set of revoked task ids: running tasks are suspended, while created and restoring
     * tasks are closed. Ids that are already suspended are ignored; ids found in no map only
     * produce a warning.
     *
     * <p>Before anything else, the previously-active id set is reset to the current
     * {@code runningTaskIds()}.
     *
     * @param set  ids of the tasks to suspend or close
     * @param list receives the changelog partitions of the tasks that were handled
     * @return the first exception reported by the three sub-steps (suspend, close created,
     *         close restoring — in that order), or {@code null} if none failed
     */
    public RuntimeException suspendOrCloseTasks(Set<TaskId> set, List<TopicPartition> list) {
        final Set<TaskId> runningToSuspend = new HashSet<>();
        final Set<TaskId> createdToClose = new HashSet<>();
        final Set<TaskId> restoringToClose = new HashSet<>();

        prevActiveTasks.clear();
        prevActiveTasks.addAll(runningTaskIds());

        // Bucket every revoked id by the state map that currently holds it.
        for (final TaskId revokedId : set) {
            if (running.containsKey(revokedId)) {
                runningToSuspend.add(revokedId);
                continue;
            }
            if (created.containsKey(revokedId)) {
                createdToClose.add(revokedId);
                continue;
            }
            if (restoring.containsKey(revokedId)) {
                restoringToClose.add(revokedId);
                continue;
            }
            if (!suspended.containsKey(revokedId)) {
                log.warn("Stream task {} was revoked but cannot be found in the assignment, may have been closed due to error", revokedId);
            }
        }

        // All three steps always run; only the earliest failure is reported.
        RuntimeException firstFailure = suspendRunningTasks(runningToSuspend, list);
        final RuntimeException createdFailure = closeNonRunningTasks(createdToClose, list);
        if (firstFailure == null) {
            firstFailure = createdFailure;
        }
        final RuntimeException restoringFailure = closeRestoringTasks(restoringToClose, list);
        if (firstFailure == null) {
            firstFailure = restoringFailure;
        }
        return firstFailure;
    }

    /**
     * Suspends each of the given running tasks and records it in the suspended map.
     *
     * <p>Failure handling per task:
     * <ul>
     *   <li>{@link TaskMigratedException}: the task is closed through {@code tryCloseZombieTask},
     *       dropped from the previously-active ids, and the exception is swallowed.</li>
     *   <li>any other {@link RuntimeException}: it is logged and remembered as the first failure,
     *       the task is dropped from the previously-active ids and closed with
     *       {@code close(false, false)}; a failure of that close is only logged.</li>
     * </ul>
     * In every case the task is removed from all state maps other than the suspended map and its
     * changelog partitions are appended to {@code list}.
     *
     * @param set  ids of running tasks to suspend
     * @param list receives the changelog partitions of every processed task
     * @return the first non-migration exception encountered, or {@code null} if there was none
     */
    private RuntimeException suspendRunningTasks(Set<TaskId> set, List<TopicPartition> list) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        log.debug("Suspending the running stream tasks {}", running.keySet());

        for (final TaskId taskId : set) {
            final StreamTask task = running.get(taskId);
            try {
                task.suspend();
                suspended.put(taskId, task);
            } catch (final TaskMigratedException migrated) {
                // Must be caught before RuntimeException: TaskMigratedException is itself unchecked,
                // so the broader handler would otherwise shadow this one (and javac rejects that order).
                log.info("Failed to suspend stream task {} since it got migrated to another thread already. Closing it as zombie and moving on.", taskId);
                tryCloseZombieTask(task);
                prevActiveTasks.remove(taskId);
            } catch (final RuntimeException e) {
                log.error("Suspending stream task {} failed due to the following error:", taskId, e);
                firstException.compareAndSet(null, e);
                try {
                    prevActiveTasks.remove(taskId);
                    task.close(false, false);
                } catch (final RuntimeException closeFailure) {
                    // Best effort: the original suspend failure is the one reported to the caller.
                    log.error("After suspending failed, closing the same stream task {} failed again due to the following error:", taskId, closeFailure);
                }
            } finally {
                // Passing `suspended` keeps a successfully suspended task in that map while
                // purging it from every other state map.
                removeTaskFromAllStateMaps(task, suspended);
                list.addAll(task.changelogPartitions());
            }
        }

        log.trace("Successfully suspended the running stream task {}", suspended.keySet());
        return firstException.get();
    }

    /**
     * Closes the given tasks, each looked up in the created map, via {@code closeNonRunning(false, ...)}.
     *
     * @param set  ids of the created tasks to close
     * @param list receives the changelog partitions of the closed tasks
     * @return the first exception returned by a close, or {@code null} if all succeeded
     */
    private RuntimeException closeNonRunningTasks(Set<TaskId> set, List<TopicPartition> list) {
        log.debug("Closing the created but not initialized stream tasks {}", set);
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>();
        for (final TaskId id : set) {
            final StreamTask task = (StreamTask) created.get(id);
            firstException.compareAndSet(null, closeNonRunning(false, task, list));
        }
        return firstException.get();
    }

    /**
     * Closes the given tasks, each looked up in the restoring map, via {@code closeRestoring(false, ...)}.
     *
     * @param set  ids of the restoring tasks to close
     * @param list receives the changelog partitions of the closed tasks
     * @return the first exception returned by a close, or {@code null} if all succeeded
     */
    RuntimeException closeRestoringTasks(Set<TaskId> set, List<TopicPartition> list) {
        log.debug("Closing restoring stream tasks {}", set);
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>();
        for (final TaskId id : set) {
            final StreamTask task = restoring.get(id);
            firstException.compareAndSet(null, closeRestoring(false, task, list));
        }
        return firstException.get();
    }

    /**
     * Removes a task from every state map and then closes it with {@code close(!z, z)}.
     *
     * @param z          passed inverted as the first and unchanged as the second argument of
     *                   {@code close}; callers in this class pass {@code true} when closing zombies
     * @param streamTask the task to close
     * @return the exception thrown by {@code close}, or {@code null} on success
     */
    private RuntimeException closeRunning(boolean z, StreamTask streamTask) {
        removeTaskFromAllStateMaps(streamTask, Collections.emptyMap());

        RuntimeException failure = null;
        try {
            streamTask.close(!z, z);
        } catch (final RuntimeException e) {
            log.error("Failed to close the stream task {}", streamTask.id(), e);
            failure = e;
        }
        return failure;
    }

    /**
     * Removes a task from every state map, records its changelog partitions, and then closes it
     * with {@code close(false, z)}.
     *
     * @param z          forwarded as the second argument of {@code close}
     * @param streamTask the task to close
     * @param list       receives the task's changelog partitions (added before the close is attempted)
     * @return the exception thrown by {@code close}, or {@code null} on success
     */
    private RuntimeException closeNonRunning(boolean z, StreamTask streamTask, List<TopicPartition> list) {
        removeTaskFromAllStateMaps(streamTask, Collections.emptyMap());
        list.addAll(streamTask.changelogPartitions());

        RuntimeException failure = null;
        try {
            streamTask.close(false, z);
        } catch (final RuntimeException e) {
            log.error("Failed to close the stream task {}", streamTask.id(), e);
            failure = e;
        }
        return failure;
    }

    /**
     * Removes a task from every state map, records its changelog partitions, and then calls
     * {@code closeStateManager(!z)} on it.
     *
     * @param z          passed inverted to {@code closeStateManager}
     * @param streamTask the restoring task to close
     * @param list       receives the task's changelog partitions (added before the close is attempted)
     * @return the exception thrown by {@code closeStateManager}, or {@code null} on success
     */
    private RuntimeException closeRestoring(boolean z, StreamTask streamTask, List<TopicPartition> list) {
        removeTaskFromAllStateMaps(streamTask, Collections.emptyMap());
        list.addAll(streamTask.changelogPartitions());

        RuntimeException failure = null;
        try {
            streamTask.closeStateManager(!z);
        } catch (final RuntimeException e) {
            log.error("Failed to close the restoring stream task {} due to the following error:", streamTask.id(), e);
            failure = e;
        }
        return failure;
    }

    /**
     * Removes a task from every state map and then calls {@code closeSuspended(!z, null)} on it.
     *
     * @param z          passed inverted as the first argument of {@code StreamTask.closeSuspended}
     * @param streamTask the suspended task to close
     * @return the exception thrown by the close, or {@code null} on success
     */
    private RuntimeException closeSuspended(boolean z, StreamTask streamTask) {
        removeTaskFromAllStateMaps(streamTask, Collections.emptyMap());

        RuntimeException failure = null;
        try {
            streamTask.closeSuspended(!z, null);
        } catch (final RuntimeException e) {
            log.error("Failed to close the suspended stream task {} due to the following error:", streamTask.id(), e);
            failure = e;
        }
        return failure;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the suspended tasks with the given ids; ids that are not in the suspended map are
     * only logged at debug level.
     *
     * @param set ids of the tasks to close
     * @return the first exception returned by a close, or {@code null} if none failed
     */
    public RuntimeException closeNotAssignedSuspendedTasks(Set<TaskId> set) {
        log.debug("Closing the revoked active stream tasks {}", set);
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);

        for (final TaskId revokedId : set) {
            final StreamTask task = suspended.get(revokedId);
            if (task == null) {
                log.debug("Revoked stream task {} could not be found in suspended, may have already been closed", revokedId);
                continue;
            }
            firstException.compareAndSet(null, closeSuspended(false, task));
        }
        return firstException.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes every assigned task with the zombie flag set, using the close helper that matches
     * the state map the task is found in, and finally calls {@code clear()}.
     *
     * @return the first exception returned by any close helper, or {@code null} if none failed
     */
    public RuntimeException closeAllTasksAsZombies() {
        log.debug("Closing all active tasks as zombies, current state of active tasks: {}", toString());

        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        // Collects changelog partitions reported by the helpers; the list is not used afterwards.
        final List<TopicPartition> changelogs = new ArrayList<>();

        for (final TaskId id : allAssignedTaskIds()) {
            final RuntimeException failure;
            if (running.containsKey(id)) {
                log.debug("Closing the zombie running stream task {}.", id);
                failure = closeRunning(true, running.get(id));
            } else if (created.containsKey(id)) {
                log.debug("Closing the zombie created stream task {}.", id);
                failure = closeNonRunning(true, (StreamTask) created.get(id), changelogs);
            } else if (restoring.containsKey(id)) {
                log.debug("Closing the zombie restoring stream task {}.", id);
                failure = closeRestoring(true, restoring.get(id), changelogs);
            } else if (suspended.containsKey(id)) {
                log.debug("Closing the zombie suspended stream task {}.", id);
                failure = closeSuspended(true, suspended.get(id));
            } else {
                continue;
            }
            firstException.compareAndSet(null, failure);
        }

        clear();
        return firstException.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tries to resume a suspended task for a new assignment.
     *
     * <p>If the task is suspended it is first removed from all state maps. When its partitions
     * equal {@code set} it is resumed and transitioned to running; otherwise it is closed with
     * {@code closeSuspended(true, null)}.
     *
     * @param taskId id of the task to resume
     * @param set    the partitions the task must own to be resumable
     * @return {@code true} if the task was resumed; {@code false} if it was not suspended or its
     *         partitions did not match
     * @throws TaskMigratedException rethrown (after an info log) when the transition to running reports it
     */
    public boolean maybeResumeSuspendedTask(TaskId taskId, Set<TopicPartition> set) {
        if (!suspended.containsKey(taskId)) {
            return false;
        }

        final StreamTask task = suspended.get(taskId);
        log.trace("Found suspended stream task {}", taskId);
        removeTaskFromAllStateMaps(task, Collections.emptyMap());

        if (task.partitions().equals(set)) {
            task.resume();
            try {
                transitionToRunning(task);
            } catch (final TaskMigratedException e) {
                log.info("Failed to resume stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                throw e;
            }
            log.trace("Resuming the suspended stream task {}", task.id());
            return true;
        }

        log.warn("Couldn't resume stream task {} assigned partitions {}, task partitions {}", taskId, set, task.partitions());
        task.closeSuspended(true, null);
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records newly restored partitions and promotes every restoring task whose changelog
     * partitions are now all restored.
     *
     * <p>A promoted task is transitioned to running, removed from the restoring map, and its
     * partitions are removed from the restored set and from the restoring-by-partition index.
     * Once {@code allTasksRunning()} holds, the restored set is cleared.
     *
     * @param collection partitions that have finished restoring; an empty collection is a no-op
     * @throws IllegalStateException if all tasks are running but the restoring-by-partition index
     *                               still has entries
     */
    public void updateRestored(Collection<TopicPartition> collection) {
        if (collection.isEmpty()) {
            return;
        }
        log.trace("Stream task changelog partitions that have completed restoring so far: {}", collection);
        restoredPartitions.addAll(collection);

        for (final Iterator<StreamTask> it = restoring.values().iterator(); it.hasNext(); ) {
            final StreamTask task = it.next();

            if (!restoredPartitions.containsAll(task.changelogPartitions())) {
                if (log.isTraceEnabled()) {
                    final Set<TopicPartition> outstanding = new HashSet<>(task.changelogPartitions());
                    outstanding.removeAll(restoredPartitions);
                    log.trace("Stream task {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}", task.id(), outstanding);
                }
                continue;
            }

            // Transition first: if it throws, the task stays registered as restoring.
            transitionToRunning(task);
            it.remove();
            removeFromRestoredPartitions(task);
            removeFromRestoringByPartition(task);
            log.debug("Stream task {} completed restoration as all its changelog partitions {} have been applied to restore state", task.id(), task.changelogPartitions());
        }

        if (!allTasksRunning()) {
            return;
        }
        restoredPartitions.clear();
        if (!restoringByPartition.isEmpty()) {
            log.error("Finished restoring all tasks but found leftover partitions in restoringByPartition: {}", restoringByPartition);
            throw new IllegalStateException("Restoration is complete but not all partitions were cleared.");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes a task from every state map except the one it currently belongs to.
     *
     * <p>After delegating to the superclass, the task is dropped from the restoring map (together
     * with its entries in the restoring-by-partition index and the restored-partition set) and
     * from the suspended map, skipping whichever of those two maps is {@code map}.
     *
     * @param streamTask the task to remove
     * @param map        the state map the task should stay in, compared by identity; pass a map that
     *                   is none of the state maps (e.g. an empty map) to remove the task everywhere
     */
    @Override // org.apache.kafka.streams.processor.internals.AssignedTasks
    public void removeTaskFromAllStateMaps(StreamTask streamTask, Map<TaskId, StreamTask> map) {
        // Arguments are forwarded as-is: casting a StreamTask to AssignedStreamsTasks (as the
        // decompiled code did) is an inconvertible-types error and could never succeed at runtime.
        super.removeTaskFromAllStateMaps(streamTask, map);

        final TaskId id = streamTask.id();
        final Set<TopicPartition> taskPartitions = new HashSet<>(streamTask.partitions());
        taskPartitions.addAll(streamTask.changelogPartitions());

        if (map != restoring) {
            restoring.remove(id);
            restoringByPartition.keySet().removeAll(taskPartitions);
            restoredPartitions.removeAll(taskPartitions);
        }

        if (map != suspended) {
            suspended.remove(id);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a task as restoring and indexes it under each of its partitions and changelog
     * partitions.
     *
     * @param streamTask the task to register
     */
    public void addTaskToRestoring(StreamTask streamTask) {
        restoring.put(streamTask.id(), streamTask);
        for (final TopicPartition partition : streamTask.partitions()) {
            restoringByPartition.put(partition, streamTask);
        }
        for (final TopicPartition changelog : streamTask.changelogPartitions()) {
            restoringByPartition.put(changelog, streamTask);
        }
    }

    /**
     * Drops the index entries for all partitions and changelog partitions of the given task.
     *
     * @param streamTask the task whose partitions are removed from the restoring-by-partition index
     */
    private void removeFromRestoringByPartition(StreamTask streamTask) {
        // keySet() is a view, so removals here remove the mappings themselves.
        final Set<TopicPartition> indexedPartitions = restoringByPartition.keySet();
        indexedPartitions.removeAll(streamTask.partitions());
        indexedPartitions.removeAll(streamTask.changelogPartitions());
    }

    /**
     * Removes all partitions and changelog partitions of the given task from the restored set.
     *
     * @param streamTask the task whose partitions are forgotten
     */
    private void removeFromRestoredPartitions(StreamTask streamTask) {
        final Set<TopicPartition> restored = restoredPartitions;
        restored.removeAll(streamTask.partitions());
        restored.removeAll(streamTask.changelogPartitions());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Commits every running task for which both {@code commitRequested()} and
     * {@code commitNeeded()} return {@code true}.
     *
     * <p>A {@link TaskMigratedException} aborts the loop immediately. Any other
     * {@link RuntimeException} is logged, the remaining tasks are still visited, and the first
     * such exception is thrown once the loop has finished.
     *
     * @return the number of tasks committed
     * @throws TaskMigratedException as soon as a task reports it
     * @throws RuntimeException      the first other failure, after all tasks were visited
     */
    public int maybeCommitPerUserRequested() {
        int committed = 0;
        RuntimeException firstException = null;

        for (final StreamTask task : running.values()) {
            try {
                if (task.commitRequested() && task.commitNeeded()) {
                    task.commit();
                    committed++;
                    log.debug("Committed stream task {} per user request in", task.id());
                }
            } catch (final TaskMigratedException e) {
                // Must precede the RuntimeException handler: TaskMigratedException is unchecked, so
                // the broader handler would shadow it and defer it instead of failing fast.
                log.info("Failed to commit stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                throw e;
            } catch (final RuntimeException e) {
                log.error("Failed to commit stream task {} due to the following error:", task.id(), e);
                if (firstException == null) {
                    firstException = e;
                }
            }
        }

        if (firstException != null) {
            throw firstException;
        }
        return committed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Merges the {@code purgableOffsets()} of all running tasks into one map.
     *
     * @return a new map containing every entry reported by the running tasks; for a partition
     *         reported by more than one task, the entry of the task visited last is kept
     */
    public Map<TopicPartition, Long> recordsToDelete() {
        final Map<TopicPartition, Long> purgable = new HashMap<>();
        for (final StreamTask task : running.values()) {
            purgable.putAll(task.purgableOffsets());
        }
        return purgable;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Lets every running task that is processable at the given time process once.
     *
     * @param j passed to {@code StreamTask.isProcessable}
     * @return the number of tasks whose {@code process()} call returned {@code true}
     * @throws TaskMigratedException rethrown after an info log when a task reports it
     * @throws RuntimeException      any other processing failure, rethrown after an error log
     */
    public int process(long j) {
        int processed = 0;
        for (final StreamTask task : running.values()) {
            try {
                if (task.isProcessable(j) && task.process()) {
                    processed++;
                }
            } catch (final TaskMigratedException e) {
                // Must precede the RuntimeException handler: TaskMigratedException is unchecked, so
                // the broader handler would otherwise shadow it and log it as a processing error.
                log.info("Failed to process stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                throw e;
            } catch (final RuntimeException e) {
                log.error("Failed to process stream task {} due to the following error:", task.id(), e);
                throw e;
            }
        }
        return processed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Invokes stream-time and then system-time punctuation on every running task.
     *
     * @return how many of those punctuation calls returned {@code true} (up to two per task)
     * @throws TaskMigratedException rethrown after an info log when a task reports it
     * @throws KafkaException        any other Kafka failure, rethrown after an error log
     */
    public int punctuate() {
        int punctuated = 0;
        for (final StreamTask task : running.values()) {
            try {
                final boolean streamTimeFired = task.maybePunctuateStreamTime();
                if (streamTimeFired) {
                    punctuated += 1;
                }
                final boolean systemTimeFired = task.maybePunctuateSystemTime();
                if (systemTimeFired) {
                    punctuated += 1;
                }
            } catch (final TaskMigratedException migrated) {
                log.info("Failed to punctuate stream task {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", task.id());
                throw migrated;
            } catch (final KafkaException failure) {
                log.error("Failed to punctuate stream task {} due to the following error:", task.id(), failure);
                throw failure;
            }
        }
        return punctuated;
    }

    /**
     * Clears the superclass state and then all bookkeeping held by this class: restoring and
     * suspended tasks, the partition index, the restored set and the previously-active ids.
     */
    @Override // org.apache.kafka.streams.processor.internals.AssignedTasks
    void clear() {
        super.clear();

        // Task maps.
        restoring.clear();
        suspended.clear();

        // Partition bookkeeping and the previous-assignment snapshot.
        restoringByPartition.clear();
        restoredPartitions.clear();
        prevActiveTasks.clear();
    }

    /**
     * Logs the ids of the tasks in each state map at debug level and delegates the shutdown to
     * the superclass.
     *
     * @param z selects the "Clean" / "Unclean" label in the log line and is forwarded to
     *          {@code super.shutdown}
     */
    @Override // org.apache.kafka.streams.processor.internals.AssignedTasks
    public void shutdown(boolean z) {
        final String mode = z ? "Clean" : "Unclean";
        log.debug(
            "{} shutdown of all active tasks\n"
                + "non-initialized stream tasks to close: {}\n"
                + "restoring tasks to close: {}\n"
                + "running stream tasks to close: {}\n"
                + "suspended stream tasks to close: {}",
            mode,
            created.keySet(),
            restoring.keySet(),
            running.keySet(),
            suspended.keySet()
        );
        super.shutdown(z);
    }

    /**
     * Builds a description consisting of the superclass description followed by the restoring
     * tasks, the restoring partitions, the restored partitions and the suspended tasks.
     *
     * @param str forwarded to {@code super.toString} and to each describe helper
     * @return the assembled description
     */
    @Override // org.apache.kafka.streams.processor.internals.AssignedTasks
    public String toString(String str) {
        final StringBuilder description = new StringBuilder();
        description.append(super.toString(str));

        describeTasks(description, restoring.values(), str, "Restoring:");
        describePartitions(description, restoringByPartition.keySet(), str, "Restoring Partitions:");
        describePartitions(description, restoredPartitions, str, "Restored Partitions:");
        describeTasks(description, suspended.values(), str, "Suspended:");

        return description.toString();
    }

    /**
     * @return an unmodifiable view of the tasks in the restoring map
     */
    Collection<StreamTask> restoringTasks() {
        final Collection<StreamTask> tasks = restoring.values();
        return Collections.unmodifiableCollection(tasks);
    }

    /**
     * @return a new set holding a copy of the ids currently in the restoring map
     */
    Set<TaskId> restoringTaskIds() {
        final Set<TaskId> snapshot = new HashSet<>(restoring.keySet());
        return snapshot;
    }
}
