/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.executor.BrokerExecutionTaskTracker;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskPlanner;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskTracker;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.concurrent.GuardedBy;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutionTaskManager {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionTaskManager.class);
    @GuardedBy(value="this")
    private final BrokerExecutionTaskTracker inProgressInterBrokerReplicaMovementTracker;
    @GuardedBy(value="this")
    private final BrokerExecutionTaskTracker inProgressIntraBrokerReplicaMovementTracker;
    private final Set<TopicPartition> inProgressPartitionsForInterBrokerMovement;
    private final ExecutionTaskTracker executionTaskTracker;
    private final ExecutionTaskPlanner executionTaskPlanner;
    private final int defaultInterBrokerPartitionMovementConcurrency;
    private final int defaultIntraBrokerPartitionMovementConcurrency;
    private final int defaultLeadershipMovementConcurrency;
    private final Set<Integer> brokersToSkipConcurrencyCheck;
    private final Set<String> throttledTopics;

    public ExecutionTaskManager(int defaultInterBrokerPartitionMovementConcurrency, int defaultIntraBrokerPartitionMovementConcurrency, int defaultLeadershipMovementConcurrency, List<String> replicaMovementStrategies, ConfluentAdmin adminClient, DataBalancerMetricsRegistry metricRegistry, Time time, KafkaCruiseControlConfig config) {
        this.inProgressInterBrokerReplicaMovementTracker = new BrokerExecutionTaskTracker(defaultInterBrokerPartitionMovementConcurrency);
        this.inProgressIntraBrokerReplicaMovementTracker = new BrokerExecutionTaskTracker(defaultIntraBrokerPartitionMovementConcurrency);
        this.inProgressPartitionsForInterBrokerMovement = new HashSet<TopicPartition>();
        this.executionTaskTracker = new ExecutionTaskTracker(metricRegistry, time);
        this.executionTaskPlanner = new ExecutionTaskPlanner(adminClient, replicaMovementStrategies, config);
        this.defaultInterBrokerPartitionMovementConcurrency = defaultInterBrokerPartitionMovementConcurrency;
        this.defaultIntraBrokerPartitionMovementConcurrency = defaultIntraBrokerPartitionMovementConcurrency;
        this.defaultLeadershipMovementConcurrency = defaultLeadershipMovementConcurrency;
        this.brokersToSkipConcurrencyCheck = new HashSet<Integer>();
        this.throttledTopics = new HashSet<String>();
    }

    public synchronized int interBrokerPartitionMovementConcurrency() {
        return this.defaultInterBrokerPartitionMovementConcurrency;
    }

    public synchronized int intraBrokerPartitionMovementConcurrency() {
        return this.defaultIntraBrokerPartitionMovementConcurrency;
    }

    public synchronized int leadershipMovementConcurrency() {
        return this.defaultLeadershipMovementConcurrency;
    }

    public synchronized List<ExecutionTask> drainInterBrokerReplicaMovementTasks() {
        return this.executionTaskPlanner.drainInterBrokerTasks(BrokerExecutionTaskTracker.duplicate(this.inProgressInterBrokerReplicaMovementTracker), this.inProgressPartitionsForInterBrokerMovement);
    }

    public synchronized List<ExecutionTask> getIntraBrokerReplicaMovementTasks() {
        return this.executionTaskPlanner.drainIntraBrokerTasks(BrokerExecutionTaskTracker.duplicate(this.inProgressIntraBrokerReplicaMovementTracker));
    }

    public synchronized List<ExecutionTask> getLeadershipMovementTasks() {
        return this.executionTaskPlanner.getLeadershipMovementTasks(this.leadershipMovementConcurrency());
    }

    public Set<String> throttledTopics() {
        return this.throttledTopics;
    }

    public synchronized void addExecutionProposals(Collection<ExecutionProposal> proposals, Collection<Integer> brokersToSkipConcurrencyCheck, Cluster cluster) {
        this.executionTaskPlanner.addExecutionProposals(proposals, cluster);
        for (ExecutionProposal p : proposals) {
            if (p.replicaMovementParallelism() > this.interBrokerPartitionMovementConcurrency()) {
                LOG.warn("Proposal required concurrency ({}) is greater than the max allowed ({}).", (Object)p.replicaMovementParallelism(), (Object)this.interBrokerPartitionMovementConcurrency());
            }
            p.replicasToMoveBetweenDisksByBroker().keySet().forEach(this.inProgressIntraBrokerReplicaMovementTracker::maybeAddBroker);
            this.inProgressInterBrokerReplicaMovementTracker.maybeAddBroker(p.oldLeader().brokerId());
            p.replicasToAdd().forEach(r -> this.inProgressInterBrokerReplicaMovementTracker.maybeAddBroker(r.brokerId()));
        }
        this.executionTaskTracker.addTasksToTrace(this.executionTaskPlanner.remainingInterBrokerReplicaMovements(), ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION);
        this.executionTaskTracker.addTasksToTrace(this.executionTaskPlanner.remainingIntraBrokerReplicaMovements(), ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION);
        this.executionTaskTracker.addTasksToTrace(this.executionTaskPlanner.remainingLeadershipMovements(), ExecutionTask.TaskType.LEADER_ACTION);
        this.brokersToSkipConcurrencyCheck.clear();
        if (brokersToSkipConcurrencyCheck != null) {
            this.brokersToSkipConcurrencyCheck.addAll(brokersToSkipConcurrencyCheck);
        }
    }

    public synchronized void reloadInterBrokerTasksToBeRetried(Cluster cluster) {
        ArrayList<ExecutionTask> interBrokerTasks = new ArrayList<ExecutionTask>(this.executionTaskTracker.tasksToBeRetried(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION));
        this.executionTaskPlanner.overrideInterBrokerTasksWithOrdering(interBrokerTasks, cluster);
        this.markRetriableTasksBackToPending(interBrokerTasks);
    }

    public synchronized void markTasksInProgress(List<ExecutionTask> tasks) {
        for (ExecutionTask task : tasks) {
            this.executionTaskTracker.markTaskState(task, ExecutionTask.State.IN_PROGRESS);
            this.throttledTopics.add(task.proposal().topic());
            switch (task.type()) {
                case INTER_BROKER_REPLICA_ACTION: {
                    this.inProgressPartitionsForInterBrokerMovement.add(task.proposal().topicPartition());
                    int oldLeader = task.proposal().oldLeader().brokerId();
                    int replicasInMotion = task.requiredParallelism();
                    this.inProgressInterBrokerReplicaMovementTracker.addTaskForBroker(oldLeader, replicasInMotion);
                    task.proposal().replicasToAdd().forEach(r -> this.inProgressInterBrokerReplicaMovementTracker.addTaskForBroker(r.brokerId(), 1));
                    break;
                }
                case INTRA_BROKER_REPLICA_ACTION: {
                    this.inProgressIntraBrokerReplicaMovementTracker.addTaskForBroker(task.brokerId(), 1);
                    break;
                }
            }
        }
    }

    public synchronized void markTasksToBeRetried(List<ExecutionTask> tasks) {
        for (ExecutionTask task : tasks) {
            this.executionTaskTracker.markTaskState(task, ExecutionTask.State.TO_RETRY);
            this.clearOngoingTask(task);
        }
    }

    private synchronized void markRetriableTasksBackToPending(List<ExecutionTask> retriableTasks) {
        for (ExecutionTask task : retriableTasks) {
            this.executionTaskTracker.markTaskState(task, ExecutionTask.State.PENDING);
        }
    }

    public synchronized void markTasksAborting(List<ExecutionTask> tasks) {
        tasks.forEach(task -> this.markTaskAborting((ExecutionTask)task));
    }

    public synchronized void markTaskDone(ExecutionTask task) {
        if (task.state() == ExecutionTask.State.IN_PROGRESS) {
            this.executionTaskTracker.markTaskState(task, ExecutionTask.State.COMPLETED);
            this.clearOngoingTask(task);
        } else if (task.state() == ExecutionTask.State.ABORTING) {
            this.executionTaskTracker.markTaskState(task, ExecutionTask.State.ABORTED);
            this.clearOngoingTask(task);
        }
    }

    public synchronized void markTaskAborting(ExecutionTask task) {
        if (task.state() == ExecutionTask.State.IN_PROGRESS) {
            this.executionTaskTracker.markTaskState(task, ExecutionTask.State.ABORTING);
        }
    }

    public synchronized void markTaskDead(ExecutionTask task) {
        if (task.state() != ExecutionTask.State.DEAD) {
            this.executionTaskTracker.markTaskState(task, ExecutionTask.State.DEAD);
            this.clearOngoingTask(task);
        }
    }

    private void clearOngoingTask(ExecutionTask task) {
        switch (task.type()) {
            case INTER_BROKER_REPLICA_ACTION: {
                this.inProgressPartitionsForInterBrokerMovement.remove(task.proposal().topicPartition());
                int oldLeader = task.proposal().oldLeader().brokerId();
                int replicasInMotion = task.requiredParallelism();
                this.inProgressInterBrokerReplicaMovementTracker.removeTaskForBroker(oldLeader, replicasInMotion);
                task.proposal().replicasToAdd().forEach(r -> this.inProgressInterBrokerReplicaMovementTracker.removeTaskForBroker(r.brokerId(), 1));
                break;
            }
            case INTRA_BROKER_REPLICA_ACTION: {
                this.inProgressIntraBrokerReplicaMovementTracker.removeTaskForBroker(task.brokerId(), 1);
                break;
            }
        }
    }

    public synchronized int numInterBrokerPartitionMovementsToBeRetried() {
        return this.executionTaskTracker.numInterBrokerPartitionMovementsToBeRetried();
    }

    public synchronized int numPendingInterBrokerPartitionMovements() {
        return this.executionTaskTracker.numPendingInterBrokerPartitionMovements();
    }

    public synchronized long remainingInterBrokerDataToMoveInMB() {
        return this.executionTaskTracker.remainingInterBrokerDataToMoveInMB();
    }

    public synchronized Map<Integer, Long> remainingInterBrokerDataToMoveByDestinationBroker() {
        return this.executionTaskTracker.remainingInterBrokerDataToMoveByDestinationBroker();
    }

    public synchronized int numFinishedInterBrokerPartitionMovements() {
        return this.executionTaskTracker.numFinishedInterBrokerPartitionMovements();
    }

    public synchronized long finishedInterBrokerDataMovementInMB() {
        return this.executionTaskTracker.finishedInterBrokerDataMovementInMB();
    }

    public synchronized Set<ExecutionTask> inExecutionTasks() {
        return this.inExecutionTasks(ExecutionTask.TaskType.cachedValues());
    }

    public synchronized Set<ExecutionTask> inExecutionTasks(Collection<ExecutionTask.TaskType> types) {
        return this.executionTaskTracker.inExecutionTasks(types);
    }

    public synchronized long inExecutionInterBrokerDataToMoveInMB() {
        return this.executionTaskTracker.inExecutionInterBrokerDataMovementInMB();
    }

    public synchronized int numPendingLeadershipMovements() {
        return this.executionTaskTracker.numPendingLeadershipMovements();
    }

    public synchronized int numFinishedLeadershipMovements() {
        return this.executionTaskTracker.numFinishedLeadershipMovements();
    }

    public synchronized int numPendingIntraBrokerPartitionMovements() {
        return this.executionTaskTracker.numPendingIntraBrokerPartitionMovements();
    }

    public synchronized int numFinishedIntraBrokerPartitionMovements() {
        return this.executionTaskTracker.numFinishedIntraBrokerPartitionMovements();
    }

    Map<Integer, Integer> inProgressInterBrokerMovementsByBrokerId() {
        return this.inProgressInterBrokerReplicaMovementTracker.readonlyView();
    }

    public synchronized void clear() {
        this.brokersToSkipConcurrencyCheck.clear();
        this.inProgressInterBrokerReplicaMovementTracker.clear();
        this.inProgressIntraBrokerReplicaMovementTracker.clear();
        this.inProgressPartitionsForInterBrokerMovement.clear();
        this.executionTaskPlanner.clear();
        this.executionTaskTracker.clear();
        this.throttledTopics.clear();
    }

    public synchronized void setStopRequested() {
        this.executionTaskTracker.setStopRequested();
    }

    public synchronized ExecutionTaskTracker.ExecutionTasksSummary getExecutionTasksSummary(Set<ExecutionTask.TaskType> taskTypesToGetFullList) {
        return this.executionTaskTracker.getExecutionTasksSummary(taskTypesToGetFullList);
    }
}

