/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.executor.BrokerExecutionTaskTracker;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskGenerator;
import com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps the pending {@link ExecutionTask}s of an execution and hands them out in order.
 *
 * <p>Three pools of pending tasks are maintained:
 * <ul>
 *   <li>inter-broker replica movements, grouped by broker id in the order produced by the
 *       configured {@link ReplicaMovementStrategy};</li>
 *   <li>intra-broker replica movements, grouped by broker id;</li>
 *   <li>leadership movements, keyed by execution id.</li>
 * </ul>
 * Tasks are created from {@link ExecutionProposal}s by an {@link ExecutionTaskGenerator} and are
 * removed from the pools as callers drain them.
 *
 * <p>This class performs no synchronization of its own.
 */
public class ExecutionTaskPlanner {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionTaskPlanner.class);
    private final ExecutionTaskGenerator taskGenerator;
    // Not final: both maps are replaced wholesale whenever new proposals are added.
    private Map<Integer, SortedSet<ExecutionTask>> interPartMoveTaskByBrokerId;
    private Map<Integer, SortedSet<ExecutionTask>> intraPartMoveTaskByBrokerId;
    private final Map<Long, ExecutionTask> remainingLeadershipMovements;
    private final ReplicaMovementStrategy defaultReplicaMovementTaskStrategy;

    /**
     * Creates a planner whose tasks are generated by a new {@link ExecutionTaskGenerator} built
     * from the given admin client and configuration.
     *
     * @param adminClient admin client passed to the task generator
     * @param defaultReplicaMovementStrategies names of the strategies to chain into the default
     *        inter-broker ordering strategy
     * @param config configuration passed to the task generator
     */
    public ExecutionTaskPlanner(ConfluentAdmin adminClient, List<String> defaultReplicaMovementStrategies, KafkaCruiseControlConfig config) {
        this(new ExecutionTaskGenerator(adminClient, config), defaultReplicaMovementStrategies);
    }

    /**
     * Creates a planner around an existing task generator (package-private).
     *
     * @param taskGenerator generator used to turn proposals into tasks
     * @param defaultReplicaMovementStrategies names of the strategies to chain into the default
     *        inter-broker ordering strategy
     */
    ExecutionTaskPlanner(ExecutionTaskGenerator taskGenerator, List<String> defaultReplicaMovementStrategies) {
        this.taskGenerator = taskGenerator;
        this.interPartMoveTaskByBrokerId = new HashMap<>();
        this.intraPartMoveTaskByBrokerId = new HashMap<>();
        this.defaultReplicaMovementTaskStrategy = ReplicaMovementStrategy.generateChainedReplicaMovementStrategies(defaultReplicaMovementStrategies);
        this.remainingLeadershipMovements = new HashMap<>();
    }

    /**
     * Generates tasks for the given proposals and installs them in the planner.
     *
     * <p>The inter-broker and intra-broker task maps are <em>replaced</em> by the newly generated
     * ones, while leadership tasks are added to those already pending. If any intra-broker task
     * is pending afterwards, the inter-broker tasks are validated and then discarded.
     *
     * @param proposals proposals to convert into tasks
     * @param cluster cluster snapshot handed to the task generator and ordering strategy
     * @throws IllegalStateException if intra-broker tasks are pending and some inter-broker task
     *         adds replicas
     */
    public void addExecutionProposals(Collection<ExecutionProposal> proposals, Cluster cluster) {
        LOG.trace("Cluster state before adding proposals: {}.", cluster);
        this.maybeAddInterBrokerReplicaMovementTasks(proposals, cluster);
        this.maybeAddIntraBrokerReplicaMovementTasks(proposals);
        this.maybeAddLeaderChangeTasks(proposals, cluster);
        this.sanityCheckExecutionTasks();
        this.maybeDropReplicaSwapTasks();
    }

    /**
     * Rejects a plan in which intra-broker tasks coexist with inter-broker tasks that add
     * replicas to a broker.
     */
    private void sanityCheckExecutionTasks() {
        if (!this.anyRemainingIntraBrokerTasks()) {
            return;
        }
        for (ExecutionTask task : this.remainingInterBrokerReplicaMovements()) {
            if (!task.proposal().replicasToAdd().isEmpty()) {
                throw new IllegalStateException("Intra-broker partition movement should not mingle with inter-broker partition movement.");
            }
        }
    }

    /**
     * Discards all inter-broker tasks when intra-broker tasks are pending. Runs after
     * {@link #sanityCheckExecutionTasks()}, so whatever is dropped here adds no replicas.
     */
    private void maybeDropReplicaSwapTasks() {
        if (this.anyRemainingIntraBrokerTasks()) {
            this.interPartMoveTaskByBrokerId.clear();
        }
    }

    /** Generates inter-broker tasks for the proposals and replaces the current ordered map with them. */
    private void maybeAddInterBrokerReplicaMovementTasks(Collection<ExecutionProposal> proposals, Cluster cluster) {
        Set<ExecutionTask> generatedTasks = this.taskGenerator.generateInterBrokerReplicaMovementTasks(proposals, cluster);
        this.interPartMoveTaskByBrokerId = this.orderInterBrokerReplicaMovementTasks(generatedTasks, cluster);
    }

    /**
     * Replaces the inter-broker plan with the given tasks, ordered by the default strategy.
     *
     * @param tasks tasks that make up the new inter-broker plan; copied into a set first
     * @param cluster cluster snapshot handed to the ordering strategy
     * @throws IllegalStateException if inter-broker tasks are still pending
     */
    public void overrideInterBrokerTasksWithOrdering(Collection<ExecutionTask> tasks, Cluster cluster) {
        if (this.anyRemainingInterBrokerTasks()) {
            throw new IllegalStateException("Cannot re-plan ordering of inter-broker execution tasks while existing inter-broker tasks remain unexecuted as they would be overridden.");
        }
        this.interPartMoveTaskByBrokerId = this.orderInterBrokerReplicaMovementTasks(new HashSet<>(tasks), cluster);
    }

    /** Generates intra-broker tasks for the proposals and replaces the current map with them. */
    private void maybeAddIntraBrokerReplicaMovementTasks(Collection<ExecutionProposal> proposals) {
        this.intraPartMoveTaskByBrokerId = this.taskGenerator.generateIntraBrokerReplicaMovementTasks(proposals);
    }

    /** Generates leadership tasks for the proposals and adds them to the pending ones by execution id. */
    private void maybeAddLeaderChangeTasks(Collection<ExecutionProposal> proposals, Cluster cluster) {
        for (ExecutionTask leaderTask : this.taskGenerator.generateLeaderChangeTasks(proposals, cluster)) {
            this.remainingLeadershipMovements.put(leaderTask.executionId(), leaderTask);
        }
    }

    /**
     * @return a new set holding every pending inter-broker task across all brokers; changes to
     *         the returned set do not affect the planner
     */
    public Set<ExecutionTask> remainingInterBrokerReplicaMovements() {
        Set<ExecutionTask> pendingTasks = new HashSet<>();
        for (SortedSet<ExecutionTask> brokerTasks : this.interPartMoveTaskByBrokerId.values()) {
            pendingTasks.addAll(brokerTasks);
        }
        return pendingTasks;
    }

    /**
     * @return a new set holding every pending intra-broker task across all brokers; changes to
     *         the returned set do not affect the planner
     */
    public Set<ExecutionTask> remainingIntraBrokerReplicaMovements() {
        Set<ExecutionTask> pendingTasks = new HashSet<>();
        for (SortedSet<ExecutionTask> brokerTasks : this.intraPartMoveTaskByBrokerId.values()) {
            pendingTasks.addAll(brokerTasks);
        }
        return pendingTasks;
    }

    /** @return {@code true} if at least one broker still has a pending inter-broker task */
    boolean anyRemainingInterBrokerTasks() {
        return this.interPartMoveTaskByBrokerId.values().stream().anyMatch(tasks -> !tasks.isEmpty());
    }

    /** @return {@code true} if at least one broker still has a pending intra-broker task */
    boolean anyRemainingIntraBrokerTasks() {
        return this.intraPartMoveTaskByBrokerId.values().stream().anyMatch(tasks -> !tasks.isEmpty());
    }

    /**
     * @return a live view of the pending leadership tasks, backed by the planner's internal map
     */
    public Collection<ExecutionTask> remainingLeadershipMovements() {
        return this.remainingLeadershipMovements.values();
    }

    /**
     * Removes and returns up to {@code numTasks} pending leadership tasks.
     *
     * @param numTasks maximum number of tasks to return; values of zero or less yield an empty list
     * @return the removed tasks, in the iteration order of the backing map
     */
    public List<ExecutionTask> getLeadershipMovementTasks(int numTasks) {
        List<ExecutionTask> drained = new ArrayList<>();
        Iterator<ExecutionTask> pending = this.remainingLeadershipMovements.values().iterator();
        while (drained.size() < numTasks && pending.hasNext()) {
            drained.add(pending.next());
            // Removing through the iterator also removes the entry from the backing map.
            pending.remove();
        }
        return drained;
    }

    /**
     * Removes and returns the inter-broker tasks that can start now.
     *
     * <p>Brokers are visited in repeated rounds. Within a round each broker (source or
     * destination) takes part in at most one selected task; rounds continue until one completes
     * without selecting anything. A task is selected only if the tracker reports that neither its
     * source nor any destination would be overloaded, and its partition is neither in
     * {@code inProgressPartitions} nor already selected during this call. Each selected task is
     * recorded on the tracker: {@code requiredParallelism()} slots on the source broker and one
     * slot on every destination broker.
     *
     * @param brokerExecutionTracker tracker supplying the known brokers and capacity checks;
     *        updated for every selected task
     * @param inProgressPartitions partitions that must not receive another task
     * @return the selected tasks, in selection order
     */
    public List<ExecutionTask> drainInterBrokerTasks(BrokerExecutionTaskTracker brokerExecutionTracker, Set<TopicPartition> inProgressPartitions) {
        LOG.trace("Generating inter-broker replica movement tasks for brokers with concurrency {}", brokerExecutionTracker);
        List<ExecutionTask> executableReplicaMovements = new ArrayList<>();
        Set<Integer> brokerInvolved = new HashSet<>();
        Set<TopicPartition> partitionsInvolved = new HashSet<>();
        boolean newTaskAdded = true;
        while (newTaskAdded) {
            newTaskAdded = false;
            // Broker exclusivity only holds within a single round.
            brokerInvolved.clear();
            for (Integer brokerId : brokerExecutionTracker.knownBrokers()) {
                if (brokerInvolved.contains(brokerId)) {
                    continue;
                }
                SortedSet<ExecutionTask> proposalsForBroker = this.interPartMoveTaskByBrokerId.get(brokerId);
                LOG.trace("Execution task for broker {} are {}", brokerId, proposalsForBroker);
                if (proposalsForBroker == null) {
                    continue;
                }
                for (ExecutionTask task : proposalsForBroker) {
                    LOG.trace("Considering execution task {} for broker {}", task, brokerId);
                    int sourceBroker = task.proposal().oldLeader().brokerId();
                    Set<Integer> destinationBrokers = task.proposal().replicasToAdd().stream()
                            .mapToInt(ReplicaPlacementInfo::brokerId)
                            .boxed()
                            .collect(Collectors.toSet());
                    if (brokerInvolved.contains(sourceBroker) || KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers)) {
                        LOG.trace("Skipping execution task {} because either the source or the destination brokers were already involved in the generated proposals", task);
                        continue;
                    }
                    TopicPartition tp = task.proposal().topicPartition();
                    boolean isExecutable = this.isExecutableTask(task, brokerExecutionTracker);
                    boolean isPartitionInProgress = inProgressPartitions.contains(tp);
                    boolean isPartitionInvolved = partitionsInvolved.contains(tp);
                    if (!isExecutable || isPartitionInProgress || isPartitionInvolved) {
                        LOG.trace("Skipped execution task {} - isExecutable: {}, isPartitionInProgress: {}, isPartitionInvolved: {}", task, isExecutable, isPartitionInProgress, isPartitionInvolved);
                        continue;
                    }
                    partitionsInvolved.add(tp);
                    executableReplicaMovements.add(task);
                    brokerInvolved.add(sourceBroker);
                    brokerInvolved.addAll(destinationBrokers);
                    this.removeInterBrokerReplicaActionForExecution(task);
                    brokerExecutionTracker.addTaskForBroker(sourceBroker, task.requiredParallelism());
                    for (int destinationBroker : destinationBrokers) {
                        brokerExecutionTracker.addTaskForBroker(destinationBroker, 1);
                    }
                    newTaskAdded = true;
                    LOG.debug("Found ready task {} for broker {}. Broker concurrency state: {}", task, brokerId, brokerExecutionTracker);
                    // The removal above may have modified the set being iterated, so the iterator
                    // must not be advanced again; move on to the next broker.
                    break;
                }
            }
        }
        LOG.trace("Generated {} inter-broker replica movement tasks for brokers with concurrency {}", executableReplicaMovements.size(), brokerExecutionTracker);
        return executableReplicaMovements;
    }

    /**
     * Removes and returns intra-broker tasks for every known broker that the tracker reports can
     * accept one more task.
     *
     * <p>NOTE(review): unlike {@link #drainInterBrokerTasks}, drained tasks are not recorded on
     * the tracker here, so unless the tracker is updated elsewhere while this loop runs, a broker
     * with any spare capacity has all of its intra-broker tasks drained in one call. Confirm this
     * is intended.
     *
     * @param brokerExecutionTracker tracker supplying the known brokers and capacity checks
     * @return the drained tasks, grouped by broker in the tracker's iteration order
     */
    public List<ExecutionTask> drainIntraBrokerTasks(BrokerExecutionTaskTracker brokerExecutionTracker) {
        LOG.trace("Getting intra-broker replica movement tasks for brokers with concurrency {}", brokerExecutionTracker);
        List<ExecutionTask> executableReplicaMovements = new ArrayList<>();
        for (int brokerId : brokerExecutionTracker.knownBrokers()) {
            SortedSet<ExecutionTask> brokerTasks = this.intraPartMoveTaskByBrokerId.get(brokerId);
            if (brokerTasks == null) {
                continue;
            }
            Iterator<ExecutionTask> tasksForBroker = brokerTasks.iterator();
            while (tasksForBroker.hasNext() && !brokerExecutionTracker.wouldOverloadBroker(brokerId, 1)) {
                executableReplicaMovements.add(tasksForBroker.next());
                tasksForBroker.remove();
            }
        }
        return executableReplicaMovements;
    }

    /** Drops every pending intra-broker, inter-broker and leadership task. */
    public void clear() {
        this.intraPartMoveTaskByBrokerId.clear();
        this.interPartMoveTaskByBrokerId.clear();
        this.remainingLeadershipMovements.clear();
    }

    /**
     * Checks whether the tracker has room for the task: {@code requiredParallelism()} slots on the
     * source broker and one slot on each destination broker.
     */
    private boolean isExecutableTask(ExecutionTask task, BrokerExecutionTaskTracker brokerExecutionTracker) {
        int sourceBroker = task.proposal().oldLeader().brokerId();
        if (brokerExecutionTracker.wouldOverloadBroker(sourceBroker, task.requiredParallelism())) {
            return false;
        }
        for (ReplicaPlacementInfo destinationBroker : task.proposal().replicasToAdd()) {
            if (brokerExecutionTracker.wouldOverloadBroker(destinationBroker.brokerId(), 1)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Removes the task from the pending sets of its source broker and of every destination
     * broker. Each of those brokers is expected to have an entry in the inter-broker map; a
     * missing entry results in a {@link NullPointerException}.
     */
    private void removeInterBrokerReplicaActionForExecution(ExecutionTask task) {
        int sourceBroker = task.proposal().oldLeader().brokerId();
        this.interPartMoveTaskByBrokerId.get(sourceBroker).remove(task);
        for (ReplicaPlacementInfo destinationBroker : task.proposal().replicasToAdd()) {
            this.interPartMoveTaskByBrokerId.get(destinationBroker.brokerId()).remove(task);
        }
    }

    /** Groups and orders the tasks per broker using the default replica movement strategy. */
    private Map<Integer, SortedSet<ExecutionTask>> orderInterBrokerReplicaMovementTasks(Set<ExecutionTask> tasks, Cluster cluster) {
        return this.defaultReplicaMovementTaskStrategy.applyStrategy(tasks, cluster);
    }
}

