package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/impl/AvgShedder.class */
public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrategy {
    private static final Logger log = LoggerFactory.getLogger(AvgShedder.class);
    private final Map<BundleData, String> bundleBrokerMap = new HashMap();
    private final Map<String, Double> brokerScoreMap = new HashMap();
    private final Map<String, MutableInt> brokerHitCountForHigh = new HashMap();
    private final Map<String, MutableInt> brokerHitCountForLow = new HashMap();
    private static final double MB = 1048576.0d;

    @Override // org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy
    public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration serviceConfiguration) {
        ArrayListMultimap create = ArrayListMultimap.create();
        double minUnloadMessageThroughput = serviceConfiguration.getMinUnloadMessageThroughput();
        double minUnloadMessage = serviceConfiguration.getMinUnloadMessage();
        double maxUnloadPercentage = serviceConfiguration.getMaxUnloadPercentage();
        double loadBalancerAvgShedderLowThreshold = serviceConfiguration.getLoadBalancerAvgShedderLowThreshold();
        double loadBalancerAvgShedderHighThreshold = serviceConfiguration.getLoadBalancerAvgShedderHighThreshold();
        int loadBalancerAvgShedderHitCountHighThreshold = serviceConfiguration.getLoadBalancerAvgShedderHitCountHighThreshold();
        int loadBalancerAvgShedderHitCountLowThreshold = serviceConfiguration.getLoadBalancerAvgShedderHitCountLowThreshold();
        if (log.isDebugEnabled()) {
            log.debug("highThreshold:{}, lowThreshold:{}, hitCountHighThreshold:{}, hitCountLowThreshold:{}, minMsgThreshold:{}, minThroughputThreshold:{}", new Object[]{Double.valueOf(loadBalancerAvgShedderHighThreshold), Double.valueOf(loadBalancerAvgShedderLowThreshold), Integer.valueOf(loadBalancerAvgShedderHitCountHighThreshold), Integer.valueOf(loadBalancerAvgShedderHitCountLowThreshold), Double.valueOf(minUnloadMessage), Double.valueOf(minUnloadMessageThroughput)});
        }
        List<String> calculateScoresAndSort = calculateScoresAndSort(loadData, serviceConfiguration);
        log.info("sorted broker list:{}", calculateScoresAndSort);
        List<Pair<String, String>> findBrokerPairs = findBrokerPairs(calculateScoresAndSort, loadBalancerAvgShedderLowThreshold, loadBalancerAvgShedderHighThreshold);
        log.info("brokerHitCountForHigh:{}, brokerHitCountForLow:{}", this.brokerHitCountForHigh, this.brokerHitCountForLow);
        if (findBrokerPairs.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("there is no any overload broker, no need to shedding bundles.");
            }
            this.brokerHitCountForHigh.clear();
            this.brokerHitCountForLow.clear();
            return create;
        }
        for (Pair<String, String> pair : findBrokerPairs) {
            String str = (String) pair.getRight();
            String str2 = (String) pair.getLeft();
            if (this.brokerHitCountForHigh.computeIfAbsent(str2, str3 -> {
                return new MutableInt(0);
            }).intValue() >= loadBalancerAvgShedderHitCountHighThreshold || this.brokerHitCountForHigh.computeIfAbsent(str, str4 -> {
                return new MutableInt(0);
            }).intValue() >= loadBalancerAvgShedderHitCountHighThreshold || this.brokerHitCountForLow.computeIfAbsent(str2, str5 -> {
                return new MutableInt(0);
            }).intValue() >= loadBalancerAvgShedderHitCountLowThreshold || this.brokerHitCountForLow.computeIfAbsent(str, str6 -> {
                return new MutableInt(0);
            }).intValue() >= loadBalancerAvgShedderHitCountLowThreshold) {
                this.brokerHitCountForHigh.remove(str2);
                this.brokerHitCountForHigh.remove(str);
                this.brokerHitCountForLow.remove(str2);
                this.brokerHitCountForLow.remove(str);
                selectBundleForUnloading(loadData, str, str2, minUnloadMessageThroughput, minUnloadMessage, maxUnloadPercentage, create);
            }
        }
        return create;
    }

    private void selectBundleForUnloading(LoadData loadData, String str, String str2, double d, double d2, double d3, Multimap<String, String> multimap) {
        boolean z;
        LocalBrokerData localData = loadData.getBrokerData().get(str2).getLocalData();
        LocalBrokerData localData2 = loadData.getBrokerData().get(str).getLocalData();
        double msgRateIn = localData.getMsgRateIn() + localData.getMsgRateOut();
        double msgRateIn2 = localData2.getMsgRateIn() + localData2.getMsgRateOut();
        double msgThroughputIn = localData.getMsgThroughputIn() + localData.getMsgThroughputOut();
        double msgThroughputIn2 = localData2.getMsgThroughputIn() + localData2.getMsgThroughputOut();
        double d4 = (msgRateIn2 - msgRateIn) * d3;
        double d5 = (msgThroughputIn2 - msgThroughputIn) * d3;
        MutableDouble mutableDouble = new MutableDouble(0.0d);
        if (d4 > d2) {
            z = true;
            mutableDouble.setValue(d4);
        } else if (d5 <= d) {
            log.info("broker:[{}] is planning to shed bundles to broker:[{}],but the throughput {} MByte/s is less than minimumThroughputThreshold {} MByte/s, and the msgRate {} rate/s is also less than minimumMsgRateThreshold {} rate/s, skipping bundle unload.", new Object[]{str, str2, Double.valueOf(d5 / MB), Double.valueOf(d / MB), Double.valueOf(d4), Double.valueOf(d2)});
            return;
        } else {
            z = false;
            mutableDouble.setValue(d5);
        }
        if (localData2.getBundles().size() == 1) {
            log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. No Load Shedding will be done on this broker", localData2.getBundles().iterator().next(), str);
        } else if (localData2.getBundles().isEmpty()) {
            log.warn("Broker {} is overloaded despite having no bundles", str);
        }
        log.info("broker:[{}] is planning to shed bundles to broker:[{}]. maxBroker stat:scores:{}, throughput:{}, msgRate:{}. minBroker stat:scores:{}, throughput:{}, msgRate:{}. isMsgRateToOffload:{},  trafficMarkedToOffload:{}", new Object[]{str, str2, this.brokerScoreMap.get(str), Double.valueOf(msgThroughputIn2), Double.valueOf(msgRateIn2), this.brokerScoreMap.get(str2), Double.valueOf(msgThroughputIn), Double.valueOf(msgRateIn), Boolean.valueOf(z), mutableDouble});
        boolean z2 = z;
        boolean z3 = z;
        loadData.getBundleDataForLoadShedding().entrySet().stream().filter(entry -> {
            return localData2.getBundles().contains(entry.getKey());
        }).filter(entry2 -> {
            return !loadData.getRecentlyUnloadedBundles().containsKey(entry2.getKey());
        }).map(entry3 -> {
            TimeAverageMessageData shortTermData = ((BundleData) entry3.getValue()).getShortTermData();
            return Pair.of(entry3, Double.valueOf(z2 ? shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut() : shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut()));
        }).sorted((pair, pair2) -> {
            return Double.compare(((Double) pair2.getRight()).doubleValue(), ((Double) pair.getRight()).doubleValue());
        }).forEach(pair3 -> {
            Map.Entry entry4 = (Map.Entry) pair3.getLeft();
            double doubleValue = ((Double) pair3.getRight()).doubleValue();
            if (doubleValue <= 0.0d || doubleValue > mutableDouble.getValue().doubleValue()) {
                return;
            }
            multimap.put(str, (String) entry4.getKey());
            this.bundleBrokerMap.put((BundleData) entry4.getValue(), str2);
            mutableDouble.add(-doubleValue);
            if (log.isDebugEnabled()) {
                log.debug("Found bundle to unload:{}, isMsgRateToOffload:{}, traffic:{}", new Object[]{entry4, Boolean.valueOf(z3), Double.valueOf(doubleValue)});
            }
        });
    }

    @Override // org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy, org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy
    public void onActiveBrokersChange(Set<String> set) {
        super.onActiveBrokersChange(set);
    }

    private List<String> calculateScoresAndSort(LoadData loadData, ServiceConfiguration serviceConfiguration) {
        this.brokerScoreMap.clear();
        for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
            LocalBrokerData localData = entry.getValue().getLocalData();
            String key = entry.getKey();
            Double calculateScores = calculateScores(localData, serviceConfiguration);
            this.brokerScoreMap.put(key, calculateScores);
            if (log.isDebugEnabled()) {
                log.info("broker:{}, scores:{}, throughput:{}, messageRate:{}", new Object[]{key, calculateScores, Double.valueOf(localData.getMsgThroughputIn() + localData.getMsgThroughputOut()), Double.valueOf(localData.getMsgRateIn() + localData.getMsgRateOut())});
            }
        }
        return this.brokerScoreMap.entrySet().stream().sorted((entry2, entry3) -> {
            return (int) (((Double) entry2.getValue()).doubleValue() - ((Double) entry3.getValue()).doubleValue());
        }).map((v0) -> {
            return v0.getKey();
        }).toList();
    }

    private Double calculateScores(LocalBrokerData localBrokerData, ServiceConfiguration serviceConfiguration) {
        return Double.valueOf(localBrokerData.getMaxResourceUsageWithWeight(serviceConfiguration.getLoadBalancerCPUResourceWeight(), serviceConfiguration.getLoadBalancerDirectMemoryResourceWeight(), serviceConfiguration.getLoadBalancerBandwithInResourceWeight(), serviceConfiguration.getLoadBalancerBandwithOutResourceWeight()) * 100.0d);
    }

    private List<Pair<String, String>> findBrokerPairs(List<String> list, double d, double d2) {
        LinkedList linkedList = new LinkedList();
        int i = 0;
        for (int size = list.size() - 1; i <= size; size--) {
            String str = list.get(size);
            String str2 = list.get(i);
            if (this.brokerScoreMap.get(str).doubleValue() - this.brokerScoreMap.get(str2).doubleValue() < d) {
                this.brokerHitCountForHigh.remove(str);
                this.brokerHitCountForHigh.remove(str2);
                this.brokerHitCountForLow.remove(str);
                this.brokerHitCountForLow.remove(str2);
            } else {
                linkedList.add(Pair.of(str2, str));
                if (this.brokerScoreMap.get(str).doubleValue() - this.brokerScoreMap.get(str2).doubleValue() < d2) {
                    this.brokerHitCountForLow.computeIfAbsent(str2, str3 -> {
                        return new MutableInt(0);
                    }).increment();
                    this.brokerHitCountForLow.computeIfAbsent(str, str4 -> {
                        return new MutableInt(0);
                    }).increment();
                    this.brokerHitCountForHigh.remove(str);
                    this.brokerHitCountForHigh.remove(str2);
                } else {
                    this.brokerHitCountForLow.computeIfAbsent(str2, str5 -> {
                        return new MutableInt(0);
                    }).increment();
                    this.brokerHitCountForLow.computeIfAbsent(str, str6 -> {
                        return new MutableInt(0);
                    }).increment();
                    this.brokerHitCountForHigh.computeIfAbsent(str2, str7 -> {
                        return new MutableInt(0);
                    }).increment();
                    this.brokerHitCountForHigh.computeIfAbsent(str, str8 -> {
                        return new MutableInt(0);
                    }).increment();
                }
            }
            i++;
        }
        return linkedList;
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy
    public Optional<String> selectBroker(Set<String> set, BundleData bundleData, LoadData loadData, ServiceConfiguration serviceConfiguration) {
        String orDefault = this.bundleBrokerMap.getOrDefault(bundleData, null);
        if (orDefault != null && set.contains(this.bundleBrokerMap.get(bundleData))) {
            return Optional.of(orDefault);
        }
        if (log.isDebugEnabled()) {
            if (this.bundleBrokerMap.containsKey(bundleData)) {
                log.debug("expected broker:{} is shutdown, candidates:{}", this.bundleBrokerMap.get(bundleData), set);
            } else {
                log.debug("cluster is initializing");
            }
        }
        String expectedBroker = getExpectedBroker(set, bundleData);
        this.bundleBrokerMap.put(bundleData, expectedBroker);
        return Optional.of(expectedBroker);
    }

    private static String getExpectedBroker(Collection<String> collection, BundleData bundleData) {
        ArrayList arrayList = new ArrayList(collection);
        Collections.sort(arrayList);
        try {
            long padToLong = Hashing.crc32().hashString(String.valueOf(new Random().nextInt()), StandardCharsets.UTF_8).padToLong();
            int abs = (int) (Math.abs(padToLong) % arrayList.size());
            if (log.isDebugEnabled()) {
                log.debug("Assignment details: brokers={}, bundle={}, hashcode={}, index={}", new Object[]{arrayList, bundleData, Long.valueOf(padToLong), Integer.valueOf(abs)});
            }
            return (String) arrayList.get(abs);
        } catch (Throwable th) {
            log.error("Bundle format of {} is invalid", bundleData, th);
            return (String) arrayList.get(Math.abs(bundleData.hashCode()) % arrayList.size());
        }
    }
}
