/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskGenerationIdUtils;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsOptions;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Turns {@link ExecutionProposal}s into {@link ExecutionTask}s of the three task types used by
 * the executor: inter-broker replica movement, intra-broker (between log dirs) replica movement,
 * and leadership change.
 *
 * <p>Every generated task gets its execution id from the generator's
 * {@link ExecutionTaskGenerationIdUtils} instance.
 */
class ExecutionTaskGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionTaskGenerator.class);
    private final ConfluentAdmin adminClient;
    private final KafkaCruiseControlConfig config;
    private final ExecutionTaskGenerationIdUtils generatorUtils;

    /**
     * @param adminClient admin client used to look up the current log dir of replicas.
     * @param config      configuration; the {@code logdir.response.timeout.ms} value is read from it.
     */
    public ExecutionTaskGenerator(ConfluentAdmin adminClient, KafkaCruiseControlConfig config) {
        this.adminClient = adminClient;
        this.config = config;
        this.generatorUtils = new ExecutionTaskGenerationIdUtils();
    }

    /**
     * Creates one inter-broker replica movement task per proposal that still has work to do.
     *
     * <p>A proposal is skipped when its partition is not present in {@code cluster}, or when the
     * proposal reports the inter-broker movement as already completed for the partition's current
     * replicas and observers.
     *
     * @param proposals the proposals to generate tasks for.
     * @param cluster   the cluster metadata used to look up the current partition state.
     * @return the generated inter-broker replica movement tasks.
     */
    public Set<ExecutionTask> generateInterBrokerReplicaMovementTasks(Collection<ExecutionProposal> proposals, Cluster cluster) {
        Predicate<ExecutionProposal> shouldCreateInterBrokerTask = proposal -> {
            PartitionInfo partitionInfo = cluster.partition(proposal.topicPartition());
            return partitionInfo != null
                && !proposal.isInterBrokerMovementCompleted(partitionInfo.replicas(), partitionInfo.observers());
        };
        Function<ExecutionProposal, ExecutionTask> generateInterBrokerTask = proposal ->
            new ExecutionTask(this.generatorUtils.allocateExecutionId(), proposal, ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION);
        return proposals.stream()
            .filter(shouldCreateInterBrokerTask)
            .map(generateInterBrokerTask)
            .collect(Collectors.toSet());
    }

    /**
     * Creates intra-broker replica movement tasks, grouped by broker id.
     *
     * <p>The current log dir of every replica named in the proposals is fetched through the admin
     * client. A task is generated only for replicas whose current log dir was fetched successfully
     * and differs from the log dir requested by the proposal; replicas whose lookup failed are
     * skipped.
     *
     * @param proposals the proposals to generate tasks for.
     * @return the generated tasks keyed by broker id; empty if no replica needs to be moved.
     */
    public Map<Integer, SortedSet<ExecutionTask>> generateIntraBrokerReplicaMovementTasks(Collection<ExecutionProposal> proposals) {
        Set<TopicPartitionReplica> replicasToCheckLogdir = proposals.stream()
            .flatMap(p -> p.replicasToMoveBetweenDisksByBroker().keySet().stream()
                .map(broker -> new TopicPartitionReplica(p.topic(), p.partitionId(), broker)))
            .collect(Collectors.toSet());
        if (replicasToCheckLogdir.isEmpty()) {
            // Nothing to look up; hand back a fresh mutable map, as callers may add to it.
            return new HashMap<>();
        }

        Map<TopicPartitionReplica, String> currentLogdirByReplica = fetchCurrentLogdirs(replicasToCheckLogdir);
        return proposals.stream()
            .flatMap(p -> p.replicasToMoveBetweenDisksByBroker().values().stream()
                .filter(r -> {
                    String currentLogdir =
                        currentLogdirByReplica.get(new TopicPartitionReplica(p.topic(), p.partitionId(), r.brokerId()));
                    // A null log dir means the lookup failed for this replica, so no task is created.
                    return currentLogdir != null && !currentLogdir.equals(r.logdir());
                })
                .map(r -> new ExecutionTask(this.generatorUtils.allocateExecutionId(), p, r.brokerId(),
                                            ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)))
            .collect(Collectors.groupingBy(ExecutionTask::brokerId, Collectors.toCollection(TreeSet::new)));
    }

    /**
     * Creates one leadership change task per proposal that has a leader action and whose new
     * leader differs from the partition's current leader in {@code cluster}.
     *
     * <p>Proposals for partitions that currently have no known leader are skipped.
     *
     * @param proposals the proposals to generate tasks for.
     * @param cluster   the cluster metadata used to look up the current leader.
     * @return the generated leadership change tasks.
     */
    public Set<ExecutionTask> generateLeaderChangeTasks(Collection<ExecutionProposal> proposals, Cluster cluster) {
        Predicate<ExecutionProposal> proposalChangesCurrentLeaderPredicate = proposal -> {
            Node currentLeader = cluster.leaderFor(proposal.topicPartition());
            return currentLeader != null && currentLeader.id() != proposal.newLeader().brokerId().intValue();
        };
        Function<ExecutionProposal, ExecutionTask> generateLeaderTask = proposal ->
            new ExecutionTask(this.generatorUtils.allocateExecutionId(), proposal, ExecutionTask.TaskType.LEADER_ACTION);
        return proposals.stream()
            .filter(ExecutionProposal::hasLeaderAction)
            .filter(proposalChangesCurrentLeaderPredicate)
            .map(generateLeaderTask)
            .collect(Collectors.toSet());
    }

    /**
     * Looks up the current log dir of each given replica on a best-effort basis: replicas whose
     * lookup fails are logged and left out of the result.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining lookups are still
     * attempted and the thread's interrupt status is restored before returning.
     */
    private Map<TopicPartitionReplica, String> fetchCurrentLogdirs(Set<TopicPartitionReplica> replicas) {
        Map<TopicPartitionReplica, String> currentLogdirByReplica = new HashMap<>(replicas.size());
        DescribeReplicaLogDirsOptions options =
            new DescribeReplicaLogDirsOptions().timeoutMs(this.config.getInt("logdir.response.timeout.ms"));
        Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> logDirsByReplicas =
            this.adminClient.describeReplicaLogDirs(replicas, options).values();

        boolean interrupted = false;
        for (Map.Entry<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> entry
                : logDirsByReplicas.entrySet()) {
            try {
                DescribeReplicaLogDirsResult.ReplicaLogDirInfo info = entry.getValue().get();
                currentLogdirByReplica.put(entry.getKey(), info.getCurrentReplicaLogDir());
            } catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Remember the interruption so it is not lost; it is re-asserted after the loop.
                    interrupted = true;
                }
                LOG.warn("Encounter exception {} when fetching logdir information for replica {}.", e.getMessage(), entry.getKey());
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return currentLogdirByReplica;
    }
}

