/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.PartitionBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.EntityCombinator;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.Cell;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Partition;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;

/**
 * Hard goal that keeps every replica of a partition inside a single cell.
 *
 * <p>During optimization, replicas that sit outside the cell of their partition's leader are
 * relocated onto brokers of the leader's cell. Proposed actions from other goals are accepted
 * only under the cell conditions checked in {@link #replicaActionAcceptance} and
 * {@link #partitionActionAcceptance}.
 *
 * <p>Initialization, rebalancing and the final boundary check are all no-ops when
 * {@code clusterModel.skipInterCellBalancing()} returns {@code true}.
 */
public class CellAwareGoal
extends AbstractGoal {
    /**
     * Decides whether a single-replica action is compatible with this goal.
     *
     * @param action the proposed replica action
     * @param clusterModel the cluster model used to look up the brokers and the partition
     * @return {@link ActionAcceptance#ACCEPT} when the partition has exactly one replica or the
     *         source and destination brokers are in the same cell; otherwise
     *         {@link ActionAcceptance#REPLICA_REJECT}
     */
    @Override
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction action, ClusterModel clusterModel) {
        Cell destinationCell = clusterModel.broker(action.destinationBrokerId()).cell();
        Cell sourceCell = clusterModel.broker(action.sourceBrokerId()).cell();
        List<Replica> replicasOfPartition = clusterModel.partition(action.topicPartition()).replicas();
        if (replicasOfPartition.size() == 1 || sourceCell.equals(destinationCell)) {
            return ActionAcceptance.ACCEPT;
        }
        return ActionAcceptance.REPLICA_REJECT;
    }

    /**
     * Decides whether a multi-replica (partition) action is compatible with this goal.
     *
     * @param action the proposed partition action
     * @param clusterModel the cluster model used to look up the partition
     * @return {@link ActionAcceptance#REPLICA_REJECT} unless all destination brokers belong to
     *         exactly one cell; {@link ActionAcceptance#ACCEPT} when the number of moves equals
     *         the number of replicas of the partition, or when the single destination cell
     *         equals the cell of the partition's first replica
     */
    @Override
    public ActionAcceptance partitionActionAcceptance(PartitionBalancingAction action, ClusterModel clusterModel) {
        Set<Cell> destinationCells = action.replicaMoves().values().stream()
                .map(Broker::cell)
                .collect(Collectors.toSet());
        if (destinationCells.size() != 1) {
            return ActionAcceptance.REPLICA_REJECT;
        }
        List<Replica> partitionReplicas = clusterModel.partition(action.topicPartition()).replicas();
        // As many moves as replicas: the partition moves as a whole into the one destination cell.
        if (action.replicaMoves().size() == partitionReplicas.size()) {
            return ActionAcceptance.ACCEPT;
        }
        // Partial move: the cell of the first replica is used as the partition's current cell.
        Cell originalCell = partitionReplicas.get(0).broker().cell();
        Cell destinationCell = destinationCells.iterator().next();
        return originalCell.equals(destinationCell) ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
    }

    /**
     * @return the model completeness requirements of this goal
     */
    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(1, 0.0, true, false);
    }

    /**
     * @return the simple class name of this goal
     */
    @Override
    public String name() {
        return CellAwareGoal.class.getSimpleName();
    }

    /**
     * Marks this goal as finished.
     */
    @Override
    public void finish() {
        this.finished = true;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean canChangeReplicationFactor() {
        return false;
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean isHardGoal() {
        return true;
    }

    /**
     * @param clusterModel the cluster model
     * @return all brokers of the cluster model
     */
    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        return clusterModel.allBrokers();
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction action) {
        return true;
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean partitionActionSelfSatisfied(ClusterModel clusterModel, PartitionBalancingAction action) {
        return true;
    }

    /**
     * Verifies up front that partitions can fit into a single cell.
     *
     * <p>For every topic that is not excluded and has at least one partition, the number of
     * brokers hosting the topic's <em>first</em> partition is compared against the size of the
     * largest cell. Only the first partition of each topic is inspected.
     *
     * @param clusterModel the cluster model to inspect
     * @param optimizationOptions supplies the excluded topics
     * @param optimizationMetricsOpt not used by this method
     * @throws OptimizationFailureException if an inspected partition is hosted on more brokers
     *         than the largest cell contains
     * @throws IllegalStateException if the cluster model contains no cells
     */
    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optimizationMetricsOpt) throws OptimizationFailureException {
        if (clusterModel.skipInterCellBalancing()) {
            return;
        }
        int maxCellSize = clusterModel.cellsById().values().stream()
                .mapToInt(cell -> cell.brokers().size())
                .max()
                .orElseThrow(() -> new IllegalStateException("CellAwareGoal cannot find maxCellSize since ClusterModel doesn't have cell information"));
        for (Map.Entry<String, List<Partition>> partitionsByTopic : clusterModel.getPartitionsByTopic().entrySet()) {
            if (optimizationOptions.excludedTopics().contains(partitionsByTopic.getKey())) {
                continue;
            }
            List<Partition> topicPartitions = partitionsByTopic.getValue();
            if (topicPartitions.isEmpty()) {
                continue;
            }
            Partition firstPartition = topicPartitions.get(0);
            int replicaCount = firstPartition.partitionBrokers().size();
            if (replicaCount > maxCellSize) {
                throw new OptimizationFailureException(String.format("Partition %d of topic %s has %d replicas, which is greater than the max cell size of %d brokers", firstPartition.topicPartition().partition(), firstPartition.topicPartition().topic(), replicaCount, maxCellSize));
            }
        }
    }

    /**
     * Validates that cell boundaries hold for all non-excluded topics and then finishes the goal.
     *
     * @param clusterModel the cluster model to validate
     * @param excludedTopics topics that are skipped during validation
     * @throws OptimizationFailureException if a partition has replicas in more than one cell
     */
    @Override
    protected void updateGoalState(ClusterModel clusterModel, Set<String> excludedTopics) throws OptimizationFailureException {
        CellAwareGoal.cellBoundariesAreMet(clusterModel, excludedTopics);
        this.finish();
    }

    /**
     * Moves partitions that have a replica on {@code sourceBroker} outside the leader's cell
     * into the leader's cell.
     *
     * <p>For each non-excluded replica on the broker whose cell differs from the cell of its
     * partition's leader, all replicas of that partition located outside the leader's cell are
     * relocated to brokers of the leader's cell that do not already host the partition.
     *
     * @param sourceBroker the broker whose replicas are examined
     * @param clusterModel the cluster model that is modified by the relocations
     * @param optimizedGoals previously optimized goals, passed on when computing the moves
     * @param optimizationOptions supplies the excluded topics
     * @throws OptimizationFailureException if the leader's cell has too few eligible brokers or
     *         no set of moves could be found
     */
    @Override
    protected void rebalanceForBroker(Broker sourceBroker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        if (clusterModel.skipInterCellBalancing()) {
            return;
        }
        // Iterate over a snapshot (same iteration order): relocateReplica below is expected to
        // move the current replica off sourceBroker, and whether replicas() returns a live view
        // is not visible from this class. Destinations are always in the leader's cell, which
        // differs from sourceBroker's cell here, so nothing is added to sourceBroker meanwhile.
        List<Replica> replicasSnapshot = sourceBroker.replicas().stream().collect(Collectors.toList());
        for (Replica replica : replicasSnapshot) {
            if (CellAwareGoal.shouldExclude(replica, optimizationOptions.excludedTopics())) {
                continue;
            }
            Partition partition = clusterModel.partition(replica.topicPartition());
            Cell leaderCell = partition.leader().broker().cell();
            if (replica.broker().cell().equals(leaderCell)) {
                continue;
            }
            List<Replica> replicasToRelocate = partition.replicas().stream()
                    .filter(partitionReplica -> !partitionReplica.broker().cell().equals(leaderCell))
                    .collect(Collectors.toList());
            Set<Broker> partitionBrokers = new HashSet<Broker>(partition.partitionBrokers());
            // Candidate destinations: brokers of the leader's cell not yet hosting this partition.
            List<Broker> validBrokers = leaderCell.brokers().stream()
                    .filter(broker -> !partitionBrokers.contains(broker))
                    .collect(Collectors.toList());
            if (validBrokers.size() < replicasToRelocate.size()) {
                throw new OptimizationFailureException(String.format("[%s] Can't meet cell-awareness requirement for broker with id %d. partition %s has %d replicas but leader cell has only %d brokers", this.name(), sourceBroker.id(), partition, partitionBrokers.size(), leaderCell.brokers().size()));
            }
            Iterable<List<Broker>> candidateBrokersIterable = EntityCombinator.singleEntityListIterable(validBrokers, replicasToRelocate.size());
            Map<Replica, Broker> replicaMoves = GoalUtils.getPartitionMoves(clusterModel, optimizedGoals, replicasToRelocate, candidateBrokersIterable);
            if (replicaMoves.isEmpty()) {
                throw new OptimizationFailureException(String.format("[%s] Violated cell-awareness requirement for broker with id %d. replica: %s", this.name(), sourceBroker.id(), replica));
            }
            replicaMoves.forEach((replicaToMove, broker) -> this.relocateReplica(clusterModel, replicaToMove.topicPartition(), replicaToMove.broker().id(), broker.id()));
        }
    }

    /**
     * Checks that every partition of every non-excluded topic has all of its brokers in at most
     * one cell. Does nothing when {@code clusterModel.skipInterCellBalancing()} is {@code true}.
     *
     * @param clusterModel the cluster model to validate
     * @param excludedTopics topics that are skipped
     * @throws OptimizationFailureException if a partition is hosted by brokers of more than one cell
     */
    public static void cellBoundariesAreMet(ClusterModel clusterModel, Set<String> excludedTopics) throws OptimizationFailureException {
        if (clusterModel.skipInterCellBalancing()) {
            return;
        }
        for (Map.Entry<String, List<Partition>> partitionsByTopic : clusterModel.getPartitionsByTopic().entrySet()) {
            if (excludedTopics.contains(partitionsByTopic.getKey())) {
                continue;
            }
            for (Partition partition : partitionsByTopic.getValue()) {
                Set<Cell> cells = partition.partitionBrokers().stream()
                        .map(Broker::cell)
                        .collect(Collectors.toSet());
                if (cells.size() > 1) {
                    throw new OptimizationFailureException(String.format("Partition %d of topic %s has replicas in more than one cells: %s", partition.topicPartition().partition(), partition.topicPartition().topic(), cells));
                }
            }
        }
    }
}

