/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.PartitionBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.EntityCombinator;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.Cell;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Partition;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.Tenant;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import io.confluent.databalancer.DatabalancerUtils;
import io.confluent.databalancer.GoalConstraints;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import kafka.common.TenantHelpers;
import org.apache.kafka.common.PartitionPlacementStrategy;

public class TenantAwareGoal
extends AbstractGoal {
    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(1, 0.0, true, false);
    }

    @Override
    public String name() {
        return TenantAwareGoal.class.getSimpleName();
    }

    @Override
    public void finish() {
        this.finished = true;
    }

    boolean isFinished() {
        return this.finished;
    }

    @Override
    public boolean isHardGoal() {
        return true;
    }

    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        return new TreeSet<Broker>(clusterModel.brokersEligibleForRebalancing());
    }

    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optimizationMetricsOpt) throws OptimizationFailureException {
    }

    @Override
    protected void updateGoalState(ClusterModel clusterModel, Set<String> excludedTopics) throws OptimizationFailureException {
        DatabalancerUtils.ensureConstraintsAreMet(GoalConstraints.VERIFY_TENANTS, clusterModel, excludedTopics);
        this.finish();
    }

    @Override
    protected void rebalanceForBroker(Broker sourceBroker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        if (!clusterModel.isCellEnabled()) {
            return;
        }
        for (Replica leaderReplica : sourceBroker.leaderReplicas()) {
            if (TenantAwareGoal.shouldExclude(leaderReplica, optimizationOptions.excludedTopics()) || TenantHelpers.extractTenantPrefix((String)leaderReplica.topicPartition().topic(), (boolean)false) == null) continue;
            Partition partition = clusterModel.partition(leaderReplica.topicPartition());
            String tenantId = DatabalancerUtils.getTenantId(leaderReplica);
            if (clusterModel.tenant(tenantId).map(t -> !t.placementPolicy().equals((Object)PartitionPlacementStrategy.TENANT_IN_CELL)).orElse(false).booleanValue()) continue;
            Optional<Cell> expectedCell = clusterModel.tenant(tenantId).map(Tenant::cell);
            Set replicaCells = partition.partitionBrokers().stream().map(Broker::cell).collect(Collectors.toSet());
            if (replicaCells.size() == 1 && expectedCell.map(c -> c.equals(leaderReplica.broker().cell())).orElse(false).booleanValue()) continue;
            Set ineligibleBrokers = partition.partitionBrokers().stream().filter(broker -> expectedCell.map(e -> !e.equals(broker.cell())).orElse(false) != false && !broker.isEligibleSource()).collect(Collectors.toSet());
            if (!ineligibleBrokers.isEmpty()) {
                throw new OptimizationFailureException(String.format("Tenant aware goal cannot be satisfied. Partition %s has its replicas on brokers that are ineligible source %s", partition, ineligibleBrokers));
            }
            List<Replica> replicasToRelocate = partition.replicas().stream().filter(replica -> expectedCell.map(e -> !e.equals(replica.broker().cell())).orElse(false)).collect(Collectors.toList());
            HashSet<Broker> partitionBrokers = new HashSet<Broker>(partition.partitionBrokers());
            List validBrokers = expectedCell.map(e -> e.brokers().stream().filter(broker -> !partitionBrokers.contains(broker)).collect(Collectors.toList())).orElse(Collections.emptyList());
            List validEligibleBrokers = validBrokers.stream().filter(Broker::isEligibleDestination).collect(Collectors.toList());
            if (validEligibleBrokers.size() < replicasToRelocate.size()) {
                throw new OptimizationFailureException(String.format("[%s] Can't meet tenant awareness requirements for the broker with id %d, tenant %s partition %s has %d replicas but tenant cell has only %d brokers", this.name(), sourceBroker.id(), tenantId, partition, partitionBrokers.size(), validBrokers.size()));
            }
            Iterable<List<Broker>> candidateBrokersIterable = EntityCombinator.singleEntityListIterable(validBrokers, replicasToRelocate.size());
            Map<Replica, Broker> replicaMoves = GoalUtils.getPartitionMoves(clusterModel, optimizedGoals, replicasToRelocate, candidateBrokersIterable);
            if (replicaMoves.isEmpty()) {
                throw new OptimizationFailureException(String.format("[%s] Violated tenant awareness requirement for broker with id %d, tenant: %s, replica: %s", this.name(), sourceBroker.id(), tenantId, leaderReplica));
            }
            replicaMoves.forEach((replicaToMove, broker) -> this.relocateReplica(clusterModel, replicaToMove.topicPartition(), replicaToMove.broker().id(), broker.id()));
        }
    }

    @Override
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction action, ClusterModel clusterModel) {
        if (!clusterModel.isCellEnabled()) {
            return ActionAcceptance.ACCEPT;
        }
        Cell destinationCell = clusterModel.broker(action.destinationBrokerId()).cell();
        String tenantId = TenantHelpers.extractTenantPrefix((String)action.topic(), (boolean)false);
        Optional<Cell> expectedCell = DatabalancerUtils.expectedCellForTenant(clusterModel, tenantId);
        if (!expectedCell.isPresent()) {
            expectedCell = Optional.of(clusterModel.broker(action.sourceBrokerId()).cell());
        }
        Optional<Cell> finalExpectedCell = expectedCell;
        return clusterModel.tenant(tenantId).map(t -> {
            PartitionPlacementStrategy strategy = t.placementPolicy();
            switch (strategy) {
                case CLUSTER_WIDE: {
                    return ActionAcceptance.ACCEPT;
                }
                case PARTITION_IN_CELL: 
                case TENANT_IN_CELL: {
                    return finalExpectedCell.map(e -> e.equals(destinationCell)).orElse(false) != false ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
                }
            }
            return ActionAcceptance.REPLICA_REJECT;
        }).orElse(ActionAcceptance.REPLICA_REJECT);
    }

    @Override
    public ActionAcceptance partitionActionAcceptance(PartitionBalancingAction action, ClusterModel clusterModel) {
        if (!clusterModel.isCellEnabled()) {
            return ActionAcceptance.ACCEPT;
        }
        Optional<String> tenantId = action.replicaMoves().keySet().stream().findAny().map(DatabalancerUtils::getTenantId);
        Optional expectedCell = tenantId.flatMap(tId -> DatabalancerUtils.expectedCellForTenant(clusterModel, tId));
        Set destinationCells = action.replicaMoves().values().stream().map(Broker::cell).collect(Collectors.toSet());
        return tenantId.flatMap(tId -> clusterModel.tenant((String)tId).map(tenant -> {
            PartitionPlacementStrategy strategy = tenant.placementPolicy();
            switch (strategy) {
                case CLUSTER_WIDE: {
                    return ActionAcceptance.ACCEPT;
                }
                case PARTITION_IN_CELL: {
                    return destinationCells.size() == 1 ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
                }
                case TENANT_IN_CELL: {
                    boolean cellMatched = destinationCells.stream().findAny().flatMap(dc -> expectedCell.map(ec -> ec.equals(dc))).orElse(false);
                    if (destinationCells.size() == 1 && cellMatched) {
                        return ActionAcceptance.ACCEPT;
                    }
                    return ActionAcceptance.REPLICA_REJECT;
                }
            }
            return ActionAcceptance.REPLICA_REJECT;
        })).orElse(ActionAcceptance.REPLICA_REJECT);
    }

    @Override
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction action) {
        return true;
    }

    @Override
    public boolean partitionActionSelfSatisfied(ClusterModel clusterModel, PartitionBalancingAction action) {
        return true;
    }
}

