/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.topology;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.topology.CacheJoinInfo;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.topology.RebalancePolicy;
import org.infinispan.util.concurrent.ConcurrentMapFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class DefaultRebalancePolicy
implements RebalancePolicy {
    private static Log log = LogFactory.getLog(DefaultRebalancePolicy.class);
    private Transport transport;
    private ClusterTopologyManager clusterTopologyManager;
    private ExecutorService asyncTransportExecutor;
    private GlobalConfiguration globalConfiguration;
    private volatile List<Address> clusterMembers;
    private final ConcurrentMap<String, CacheStatus> cacheStatusMap = ConcurrentMapFactory.makeConcurrentMap();

    @Inject
    public void inject(Transport transport, ClusterTopologyManager clusterTopologyManager, @ComponentName(value="org.infinispan.executors.transport") ExecutorService asyncTransportExecutor, GlobalConfiguration globalConfiguration) {
        this.transport = transport;
        this.clusterTopologyManager = clusterTopologyManager;
        this.asyncTransportExecutor = asyncTransportExecutor;
        this.globalConfiguration = globalConfiguration;
    }

    @Start(priority=99)
    public void start() {
        this.clusterMembers = this.transport.getMembers();
    }

    @Override
    public void initCache(String cacheName, CacheJoinInfo joinInfo) throws Exception {
        log.tracef("Initializing rebalance policy for cache %s", cacheName);
        this.cacheStatusMap.putIfAbsent(cacheName, new CacheStatus(joinInfo));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void initCache(String cacheName, List<CacheTopology> partitionTopologies) throws Exception {
        log.tracef("Initializing rebalance policy for cache %s, pre-existing partitions are %s", cacheName, partitionTopologies);
        CacheStatus cacheStatus = (CacheStatus)this.cacheStatusMap.get(cacheName);
        if (partitionTopologies.isEmpty()) {
            return;
        }
        int unionTopologyId = 0;
        ConsistentHash currentCHUnion = null;
        ConsistentHash pendingCHUnion = null;
        ConsistentHashFactory chFactory = cacheStatus.getJoinInfo().getConsistentHashFactory();
        for (CacheTopology topology : partitionTopologies) {
            if (topology.getTopologyId() > unionTopologyId) {
                unionTopologyId = topology.getTopologyId();
            }
            currentCHUnion = currentCHUnion == null ? topology.getCurrentCH() : chFactory.union(currentCHUnion, topology.getCurrentCH());
            if (pendingCHUnion == null) {
                pendingCHUnion = topology.getPendingCH();
                continue;
            }
            if (topology.getPendingCH() == null) continue;
            pendingCHUnion = chFactory.union(pendingCHUnion, topology.getPendingCH());
        }
        CacheStatus cacheStatus2 = cacheStatus;
        synchronized (cacheStatus2) {
            CacheTopology cacheTopology = new CacheTopology(unionTopologyId, currentCHUnion, pendingCHUnion);
            this.updateConsistentHash(cacheName, cacheStatus, cacheTopology, true);
        }
    }

    private void updateConsistentHash(String cacheName, CacheStatus cacheStatus, CacheTopology cacheTopology, boolean broadcast) throws Exception {
        log.tracef("Updating cache %s topology: %s", cacheName, cacheTopology);
        cacheStatus.setCacheTopology(cacheTopology);
        ConsistentHash currentCH = cacheTopology.getCurrentCH();
        if (currentCH != null) {
            cacheStatus.getJoiners().removeAll(currentCH.getMembers());
            log.tracef("Updated joiners list for cache %s: %s", cacheName, cacheStatus.getJoiners());
        }
        if (broadcast) {
            this.clusterTopologyManager.updateConsistentHash(cacheName, cacheStatus.getCacheTopology());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void updateMembersList(List<Address> newClusterMembers) throws Exception {
        this.clusterMembers = newClusterMembers;
        log.tracef("Updating cluster members for all the caches. New list is %s", newClusterMembers);
        for (Map.Entry e : this.cacheStatusMap.entrySet()) {
            CacheStatus cacheStatus;
            String cacheName = (String)e.getKey();
            CacheStatus cacheStatus2 = cacheStatus = (CacheStatus)e.getValue();
            synchronized (cacheStatus2) {
                boolean pendingMembersValid;
                ConsistentHash currentCH = cacheStatus.getCacheTopology().getCurrentCH();
                if (currentCH == null) {
                    continue;
                }
                ConsistentHash pendingCH = cacheStatus.getCacheTopology().getPendingCH();
                boolean currentMembersValid = newClusterMembers.containsAll(currentCH.getMembers());
                boolean bl = pendingMembersValid = pendingCH == null || newClusterMembers.containsAll(pendingCH.getMembers());
                if (!currentMembersValid || !pendingMembersValid) {
                    ArrayList<Address> newCurrentMembers = new ArrayList<Address>(currentCH.getMembers());
                    newCurrentMembers.retainAll(newClusterMembers);
                    this.updateCacheMembers(cacheName, cacheStatus, newCurrentMembers);
                }
                if (!this.isBalanced(cacheStatus.getCacheTopology().getCurrentCH()) || !cacheStatus.getJoiners().isEmpty()) {
                    this.triggerRebalance(cacheName, cacheStatus);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CacheTopology addJoiners(String cacheName, List<Address> joiners) throws Exception {
        CacheStatus cacheStatus = (CacheStatus)this.cacheStatusMap.get(cacheName);
        if (cacheStatus == null) {
            log.tracef("Ignoring members update for cache %s, as we haven't initialized it yet", cacheName);
            return null;
        }
        CacheStatus cacheStatus2 = cacheStatus;
        synchronized (cacheStatus2) {
            this.addUniqueJoiners(cacheStatus.getJoiners(), joiners);
            ConsistentHash currentCH = cacheStatus.getCacheTopology().getCurrentCH();
            if (currentCH == null) {
                this.installInitialTopology(cacheName, cacheStatus);
            } else {
                this.triggerRebalance(cacheName, cacheStatus);
            }
            return cacheStatus.getCacheTopology();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeLeavers(String cacheName, List<Address> leavers) throws Exception {
        CacheStatus cacheStatus = (CacheStatus)this.cacheStatusMap.get(cacheName);
        if (cacheStatus == null) {
            log.tracef("Ignoring members update for cache %s, as we haven't initialized it yet", cacheName);
            return;
        }
        CacheStatus cacheStatus2 = cacheStatus;
        synchronized (cacheStatus2) {
            ArrayList<Address> newMembers = new ArrayList<Address>(this.clusterMembers);
            newMembers.removeAll(leavers);
            this.updateCacheMembers(cacheName, cacheStatus, newMembers);
        }
    }

    private void updateCacheMembers(String cacheName, CacheStatus cacheStatus, List<Address> newMembers) throws Exception {
        ConsistentHash newCurrentCH;
        CacheJoinInfo joinInfo = cacheStatus.getJoinInfo();
        int topologyId = cacheStatus.getCacheTopology().getTopologyId();
        ConsistentHash currentCH = cacheStatus.getCacheTopology().getCurrentCH();
        ConsistentHash pendingCH = cacheStatus.getCacheTopology().getPendingCH();
        ConsistentHash newPendingCH = null;
        if (pendingCH != null) {
            newMembers.retainAll(pendingCH.getMembers());
            if (!newMembers.isEmpty()) {
                newPendingCH = joinInfo.getConsistentHashFactory().updateMembers(pendingCH, newMembers);
            } else {
                log.tracef("Zero new members remaining for cache %s", cacheName);
            }
        }
        newMembers.retainAll(currentCH.getMembers());
        if (!newMembers.isEmpty()) {
            newCurrentCH = joinInfo.getConsistentHashFactory().updateMembers(currentCH, newMembers);
        } else {
            log.tracef("Zero old members remaining for cache %s", cacheName);
            newCurrentCH = newPendingCH;
        }
        boolean hasMembers = newCurrentCH != null;
        CacheTopology cacheTopology = new CacheTopology(topologyId, newCurrentCH, newPendingCH);
        this.updateConsistentHash(cacheName, cacheStatus, cacheTopology, hasMembers);
        if (hasMembers) {
            this.triggerRebalance(cacheName, cacheStatus);
        }
    }

    private void installInitialTopology(String cacheName, CacheStatus cacheStatus) throws Exception {
        CacheJoinInfo joinInfo = cacheStatus.getJoinInfo();
        int topologyId = cacheStatus.getCacheTopology().getTopologyId();
        Object balancedCH = joinInfo.getConsistentHashFactory().create(joinInfo.getHashFunction(), joinInfo.getNumOwners(), joinInfo.getNumSegments(), cacheStatus.getJoiners());
        int newTopologyId = topologyId + 1;
        CacheTopology cacheTopology = new CacheTopology(newTopologyId, (ConsistentHash)balancedCH, null);
        log.tracef("Installing initial topology for cache %s: %s", cacheName, cacheTopology);
        this.updateConsistentHash(cacheName, cacheStatus, cacheTopology, false);
    }

    private void addUniqueJoiners(List<Address> members, List<Address> joiners) {
        for (Address joiner : joiners) {
            if (members.contains(joiner)) continue;
            members.add(joiner);
        }
    }

    private void triggerRebalance(final String cacheName, final CacheStatus cacheStatus) throws Exception {
        this.asyncTransportExecutor.submit(new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                DefaultRebalancePolicy.this.doRebalance(cacheName, cacheStatus);
                return null;
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doRebalance(String cacheName, CacheStatus cacheStatus) throws Exception {
        CacheTopology newCacheTopology;
        CacheTopology cacheTopology = cacheStatus.getCacheTopology();
        CacheStatus cacheStatus2 = cacheStatus;
        synchronized (cacheStatus2) {
            ConsistentHash updatedMembersCH;
            boolean isRebalanceInProgress;
            boolean bl = isRebalanceInProgress = cacheTopology.getPendingCH() != null;
            if (isRebalanceInProgress) {
                log.tracef("Ignoring request to rebalance cache %s, there's already a rebalance in progress: %s", cacheName, cacheTopology);
                return;
            }
            ArrayList<Address> newMembers = new ArrayList<Address>(cacheTopology.getMembers());
            if (newMembers.isEmpty()) {
                log.tracef("Ignoring request to rebalance cache %s, it doesn't have any member", cacheName);
                return;
            }
            this.addUniqueJoiners(newMembers, cacheStatus.getJoiners());
            newMembers.retainAll(this.clusterMembers);
            log.tracef("Rebalancing consistent hash for cache %s, members are %s", cacheName, newMembers);
            int newTopologyId = cacheTopology.getTopologyId() + 1;
            ConsistentHash currentCH = cacheTopology.getCurrentCH();
            if (currentCH == null) {
                this.installInitialTopology(cacheName, cacheStatus);
                return;
            }
            ConsistentHashFactory chFactory = cacheStatus.getJoinInfo().getConsistentHashFactory();
            ConsistentHash balancedCH = chFactory.rebalance(updatedMembersCH = chFactory.updateMembers(currentCH, newMembers));
            if (balancedCH.equals(currentCH)) {
                log.tracef("The balanced CH is the same as the current CH, not rebalancing", new Object[0]);
                return;
            }
            newCacheTopology = new CacheTopology(newTopologyId, currentCH, balancedCH);
            log.tracef("Updating cache %s topology for rebalance: %s", cacheName, newCacheTopology);
            cacheStatus.setCacheTopology(newCacheTopology);
        }
        this.clusterTopologyManager.rebalance(cacheName, newCacheTopology);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onRebalanceCompleted(String cacheName, int topologyId) throws Exception {
        CacheStatus cacheStatus;
        log.debugf("Finished cluster-wide rebalance for cache %s, topology id = %d", cacheName, topologyId);
        CacheStatus cacheStatus2 = cacheStatus = (CacheStatus)this.cacheStatusMap.get(cacheName);
        synchronized (cacheStatus2) {
            if (topologyId != cacheStatus.getCacheTopology().getTopologyId()) {
                throw new IllegalStateException(String.format("Invalid cluster-wide rebalance confirmation: received topology id %d, expected %d", topologyId, cacheStatus.getCacheTopology().getTopologyId()));
            }
            int newTopologyId = topologyId + 1;
            ConsistentHash newCurrentCH = cacheStatus.getCacheTopology().getPendingCH();
            CacheTopology cacheTopology = new CacheTopology(newTopologyId, newCurrentCH, null);
            this.updateConsistentHash(cacheName, cacheStatus, cacheTopology, true);
            cacheStatus.getJoiners().removeAll(newCurrentCH.getMembers());
            log.tracef("After rebalance, joiners without state are %s", cacheStatus.getJoiners());
            if (cacheStatus.getJoiners().isEmpty() && this.isBalanced(newCurrentCH)) {
                log.tracef("Consistent hash is now balanced for cache %s", cacheName);
            } else {
                this.triggerRebalance(cacheName, cacheStatus);
            }
        }
    }

    @Override
    public CacheTopology getTopology(String cacheName) {
        return ((CacheStatus)this.cacheStatusMap.get(cacheName)).cacheTopology;
    }

    public boolean isBalanced(ConsistentHash ch) {
        int numSegments = ch.getNumSegments();
        for (int i = 0; i < numSegments; ++i) {
            int actualNumOwners = Math.min(ch.getMembers().size(), ch.getNumOwners());
            if (ch.locateOwnersForSegment(i).size() == actualNumOwners) continue;
            return false;
        }
        return true;
    }

    private static class CacheStatus {
        private final CacheJoinInfo joinInfo;
        private final List<Address> joiners;
        private CacheTopology cacheTopology;

        public CacheStatus(CacheJoinInfo joinInfo) {
            this.joinInfo = joinInfo;
            this.cacheTopology = new CacheTopology(-1, null, null);
            this.joiners = new ArrayList<Address>();
        }

        public CacheJoinInfo getJoinInfo() {
            return this.joinInfo;
        }

        public List<Address> getJoiners() {
            return this.joiners;
        }

        public CacheTopology getCacheTopology() {
            return this.cacheTopology;
        }

        public void setCacheTopology(CacheTopology cacheTopology) {
            this.cacheTopology = cacheTopology;
        }

        public String toString() {
            return "CacheStatus{joinInfo=" + this.joinInfo + ", cacheTopology=" + this.cacheTopology + ", joiners=" + this.joiners + '}';
        }
    }
}

