/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.Executor;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public class ExecutionMetricsReporter {
    private static final String HISTOGRAM_PROPOSALS_PER_BROKER = "proposals-per-broker";
    private static final String HISTOGRAM_PROPOSAL_SIZE = "proposal-size-per-partition-mb";
    private static final String HISTOGRAM_PROPOSALS_SIZE_PER_BROKER = "proposals-size-per-broker-mb";
    private static final String HISTOGRAM_EXECUTION_TIME = "execution-time-ms";
    private static final String GAUGE_EXECUTION_STARTED = "execution-started";
    private static final String GAUGE_EXECUTION_STOPPED = "execution-stopped";
    private static final String GAUGE_CANCELLED_REASSIGNMENTS = "cancelled-reassignments";
    private static final String GAUGE_FAILED_REASSIGNMENT_CANCELLATIONS = "failed-reassignment-cancellations";
    private final DataBalancerMetricsRegistry metricRegistry;

    public ExecutionMetricsReporter(DataBalancerMetricsRegistry metricRegistry) {
        this.metricRegistry = metricRegistry;
    }

    public void reportExecutionTime(long timeMs) {
        this.metricRegistry.newHistogram(Executor.class, HISTOGRAM_EXECUTION_TIME).update(timeMs);
    }

    public void registerExecutionStartedGauge(Supplier<Integer> startedExecutions) {
        this.metricRegistry.newGauge(Executor.class, GAUGE_EXECUTION_STARTED, startedExecutions);
    }

    public void registerExecutionStoppedGauge(Supplier<Integer> startedExecutions) {
        this.metricRegistry.newGauge(Executor.class, GAUGE_EXECUTION_STOPPED, startedExecutions);
    }

    public void registerCancelledReassignmentsGauge(Supplier<Integer> startedExecutions) {
        this.metricRegistry.newGauge(Executor.class, GAUGE_CANCELLED_REASSIGNMENTS, startedExecutions);
    }

    public void registerFailedReassignmentCancellationsGauge(Supplier<Integer> startedExecutions) {
        this.metricRegistry.newGauge(Executor.class, GAUGE_FAILED_REASSIGNMENT_CANCELLATIONS, startedExecutions);
    }

    public void reportProposals(Collection<ExecutionProposal> proposals) {
        Map<Integer, List<ExecutionProposal>> perBrokerProposals = this.perBrokerProposals(proposals);
        this.reportPerBrokerProposalsMetrics(perBrokerProposals);
        this.reportProposalsDataMovement(proposals);
    }

    private void reportPerBrokerProposalsMetrics(Map<Integer, List<ExecutionProposal>> perBrokerProposals) {
        perBrokerProposals.forEach((key, value) -> {
            this.metricRegistry.newHistogram(Executor.class, HISTOGRAM_PROPOSALS_PER_BROKER).update(value.size());
            long proposalsSize = value.stream().mapToLong(ExecutionProposal::dataToMoveInMB).sum();
            this.metricRegistry.newHistogram(Executor.class, HISTOGRAM_PROPOSALS_SIZE_PER_BROKER).update(proposalsSize);
        });
    }

    private void reportProposalsDataMovement(Collection<ExecutionProposal> proposals) {
        for (ExecutionProposal proposal : proposals) {
            if (Objects.equals(proposal.newLeader().brokerId(), proposal.oldLeader().brokerId())) continue;
            this.metricRegistry.newHistogram(Executor.class, HISTOGRAM_PROPOSAL_SIZE).update(proposal.dataToMoveInMB());
        }
    }

    private Map<Integer, List<ExecutionProposal>> perBrokerProposals(Collection<ExecutionProposal> proposals) {
        HashMap<Integer, List<ExecutionProposal>> proposalsPerBroker = new HashMap<Integer, List<ExecutionProposal>>();
        for (ExecutionProposal proposal : proposals) {
            if (proposal.dataToMoveInMB() <= 0L) continue;
            List outboundExecutionProposals = proposalsPerBroker.computeIfAbsent(proposal.oldLeader().brokerId(), x -> new ArrayList());
            outboundExecutionProposals.add(proposal);
            List inboundExecutionProposals = proposalsPerBroker.computeIfAbsent(proposal.newLeader().brokerId(), x -> new ArrayList());
            inboundExecutionProposals.add(proposal);
        }
        return proposalsPerBroker;
    }
}

