/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.common.SbkAdminUtils;
import com.linkedin.kafka.cruisecontrol.executor.AbstractExecutorReplicaMovement;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskManager;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskTracker;
import com.linkedin.kafka.cruisecontrol.executor.Executor;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.executor.ReplicationThrottleHelper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.kafka.clients.admin.ConfluentAdmin;

@NotThreadSafe
public class ExecutorIntraBrokerReplicaMovement
extends AbstractExecutorReplicaMovement {
    public ExecutorIntraBrokerReplicaMovement(String executionUuid, ExecutionTaskManager executionTaskManager, Set<Integer> recentlyRemovedBrokers, ReplicationThrottleHelper throttleHelper, ConfluentAdmin adminClient, SbkAdminUtils adminUtils, AtomicBoolean stopRequested) {
        super(executionUuid, executionTaskManager, recentlyRemovedBrokers, throttleHelper, adminClient, adminUtils, stopRequested);
    }

    @Override
    public void move(Executor.ExecutionTaskWaiter taskWaiter) throws InterruptedException {
        int numTotalPartitionMovements = this.executionTaskManager.numPendingIntraBrokerPartitionMovements();
        LOG.info("Starting {} intra-broker partition movements.", (Object)numTotalPartitionMovements);
        int partitionsToMove = numTotalPartitionMovements;
        while (!(partitionsToMove <= 0 && this.executionTaskManager.inExecutionTasks().isEmpty() || this.stopRequested.get())) {
            List<ExecutionTask> tasksToExecute = this.executionTaskManager.getIntraBrokerReplicaMovementTasks();
            LOG.info("Executor will execute {} task(s): {} as part of operation {}", new Object[]{tasksToExecute.size(), tasksToExecute, this.uuid});
            if (!tasksToExecute.isEmpty()) {
                this.executionTaskManager.markTasksInProgress(tasksToExecute);
                this.adminUtils.executeIntraBrokerReplicaMovements(tasksToExecute, this.executionTaskManager);
            }
            taskWaiter.waitForAnyTaskToFinish(this);
            partitionsToMove = this.executionTaskManager.numPendingIntraBrokerPartitionMovements();
            int numFinishedPartitionMovements = this.executionTaskManager.numFinishedIntraBrokerPartitionMovements();
            LOG.info("{}/{} ({}%) intra-broker partition movements completed.", new Object[]{numFinishedPartitionMovements, numTotalPartitionMovements, String.format(Locale.US, "%.2f", (double)numFinishedPartitionMovements * 100.0 / (double)numTotalPartitionMovements)});
        }
        Set<ExecutionTask> inExecutionTasks = this.executionTaskManager.inExecutionTasks();
        while (!inExecutionTasks.isEmpty()) {
            LOG.info("Waiting for {} tasks to finish: {}", (Object)inExecutionTasks.size(), inExecutionTasks);
            taskWaiter.waitForAnyTaskToFinish(this);
            inExecutionTasks = this.executionTaskManager.inExecutionTasks();
        }
        if (this.executionTaskManager.inExecutionTasks().isEmpty()) {
            LOG.info("Intra-broker partition movements finished.");
        } else if (this.stopRequested.get()) {
            ExecutionTaskTracker.ExecutionTasksSummary executionTasksSummary = this.executionTaskManager.getExecutionTasksSummary(Collections.emptySet());
            LOG.info("Intra-broker partition movements stopped. {}; for leadership movements {} task cancelled.", (Object)executionTasksSummary.summarize(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION), (Object)executionTasksSummary.taskStat().get((Object)ExecutionTask.TaskType.LEADER_ACTION).get((Object)ExecutionTask.State.PENDING));
        }
    }

    @Override
    public void maybeReexecuteTasks() {
        ArrayList<ExecutionTask> intraBrokerReplicaActionsToReexecute = new ArrayList<ExecutionTask>(this.executionTaskManager.inExecutionTasks(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)));
        this.adminUtils.getLogdirInfoForExecutionTask(intraBrokerReplicaActionsToReexecute).forEach((k, v) -> {
            String targetLogdir = k.proposal().replicasToMoveBetweenDisksByBroker().get(k.brokerId()).logdir();
            if (targetLogdir.equals(v.getCurrentReplicaLogDir()) || targetLogdir.equals(v.getFutureReplicaLogDir())) {
                intraBrokerReplicaActionsToReexecute.remove(k);
            }
        });
        if (!intraBrokerReplicaActionsToReexecute.isEmpty()) {
            LOG.info("Reexecuting tasks {}", intraBrokerReplicaActionsToReexecute);
            this.adminUtils.executeIntraBrokerReplicaMovements(intraBrokerReplicaActionsToReexecute, this.executionTaskManager);
        }
    }

    @Override
    ExecutionTask.TaskType taskType() {
        return ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION;
    }

    @Override
    ExecutorState.State state() {
        return ExecutorState.State.INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS;
    }

    @Override
    ExecutorState executorState() {
        return ExecutorState.operationInProgress(this.state(), this.executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)), this.executionTaskManager.interBrokerPartitionMovementConcurrency(), this.executionTaskManager.intraBrokerPartitionMovementConcurrency(), this.executionTaskManager.leadershipMovementConcurrency(), this.uuid, this.recentlyRemovedBrokers);
    }
}

