/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.topology;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.CacheException;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.topology.CacheJoinInfo;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.CacheTopologyControlCommand;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.topology.RebalancePolicy;
import org.infinispan.util.concurrent.ConcurrentMapFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class ClusterTopologyManagerImpl
implements ClusterTopologyManager {
    /** Class-wide logger; also used by the nested {@code RebalanceInfo} helper. Never reassigned, hence final. */
    private static final Log log = LogFactory.getLog(ClusterTopologyManagerImpl.class);

    // Collaborators, all assigned in inject(...).
    private Transport transport;
    private RebalancePolicy rebalancePolicy;
    private GlobalConfiguration globalConfiguration;
    private GlobalComponentRegistry gcr;
    private CacheManagerNotifier cacheManagerNotifier;
    private ExecutorService asyncTransportExecutor;

    /** Last value read from {@code transport.isCoordinator()}; refreshed in start() and handleNewView(). */
    private volatile boolean isCoordinator;
    /** Cleared in start() and set in stop(); join/leave requests are ignored while it is set. */
    private volatile boolean isShuttingDown;
    /**
     * Id of the last view processed by handleNewView(). Starts at -1 and is forced to
     * {@code Integer.MAX_VALUE} by stop() so that threads blocked in waitForView() are released.
     */
    private volatile int viewId = -1;
    /** Monitor used to publish {@link #viewId} changes and to wake up waiters. */
    private final Object viewUpdateLock = new Object();
    /** In-progress rebalance bookkeeping, keyed by cache name. */
    private final ConcurrentMap<String, RebalanceInfo> rebalanceStatusMap = ConcurrentMapFactory.makeConcurrentMap();
    /** View listener registered with the cache manager notifier in start() and removed in stop(). */
    private ClusterViewListener listener;

    /**
     * Dependency-injection entry point: stores every collaborator in the field of the same name.
     * No other work is done here.
     */
    @Inject
    public void inject(Transport transport, RebalancePolicy rebalancePolicy, @ComponentName(value="org.infinispan.executors.transport") ExecutorService asyncTransportExecutor, GlobalConfiguration globalConfiguration, GlobalComponentRegistry gcr, CacheManagerNotifier cacheManagerNotifier) {
        // Registry and configuration.
        this.gcr = gcr;
        this.globalConfiguration = globalConfiguration;

        // Cluster communication.
        this.transport = transport;
        this.asyncTransportExecutor = asyncTransportExecutor;
        this.cacheManagerNotifier = cacheManagerNotifier;

        // Rebalancing strategy.
        this.rebalancePolicy = rebalancePolicy;
    }

    /**
     * Lifecycle start hook: resets the shutdown flag, caches the coordinator status reported by
     * the transport, registers a fresh {@link ClusterViewListener} and then processes the
     * transport's current view as a regular (non-merge) view.
     */
    @Start(priority=100)
    public void start() {
        isShuttingDown = false;
        isCoordinator = transport.isCoordinator();

        ClusterViewListener viewListener = new ClusterViewListener();
        listener = viewListener;
        cacheManagerNotifier.addListener(viewListener);

        // Seed the view state from whatever the transport currently reports.
        List<Address> currentMembers = transport.getMembers();
        int currentViewId = transport.getViewId();
        handleNewView(currentMembers, false, currentViewId);
    }

    /**
     * Lifecycle stop hook: flags the manager as shutting down, unregisters the view listener and
     * releases every thread blocked in {@code waitForView}.
     */
    @Stop(priority=100)
    public void stop() {
        isShuttingDown = true;
        cacheManagerNotifier.removeListener(listener);
        synchronized (viewUpdateLock) {
            // No requested view id can be greater than Integer.MAX_VALUE, so the wait loop's
            // "viewId < requested" condition becomes false for every waiter.
            viewId = Integer.MAX_VALUE;
            viewUpdateLock.notifyAll();
        }
    }

    /**
     * Sends a {@code CH_UPDATE} command carrying {@code cacheTopology} to the cluster (and the
     * local node) and waits for it to complete. If a rebalance is being tracked for the cache,
     * its outstanding confirmations are then restricted to the topology's members; when that
     * empties the set, the rebalance is treated as completed.
     *
     * @param cacheName     name of the cache whose consistent hash is updated
     * @param cacheTopology topology to broadcast
     * @throws Exception if the synchronous cluster invocation or the completion callback fails
     */
    @Override
    public void updateConsistentHash(String cacheName, CacheTopology cacheTopology) throws Exception {
        log.debugf("Updating cluster-wide consistent hash for cache %s, topology = %s", cacheName, cacheTopology);
        CacheTopologyControlCommand chUpdate = new CacheTopologyControlCommand(cacheName, CacheTopologyControlCommand.Type.CH_UPDATE, transport.getAddress(), cacheTopology, transport.getViewId());
        executeOnClusterSync(chUpdate, getGlobalTimeout());

        RebalanceInfo pending = rebalanceStatusMap.get(cacheName);
        if (pending == null) {
            // No rebalance in progress for this cache, nothing to prune.
            return;
        }
        List<Address> members = cacheTopology.getMembers();
        if (pending.updateMembers(members)) {
            onClusterRebalanceCompleted(cacheName, cacheTopology.getTopologyId(), pending);
        }
    }

    /**
     * Drops the rebalance bookkeeping for {@code cacheName} and then notifies the rebalance
     * policy that the rebalance with the given topology id has finished.
     *
     * @param cacheName     cache whose rebalance completed
     * @param topologyId    topology id of the completed rebalance
     * @param rebalanceInfo the tracker that reported completion (not used by this method)
     * @throws Exception if the policy callback fails
     */
    private void onClusterRebalanceCompleted(String cacheName, int topologyId, RebalanceInfo rebalanceInfo) throws Exception {
        log.debugf("Removing rebalance information for topology id %d", topologyId);
        // The entry is removed before the policy is told about the completion.
        rebalanceStatusMap.remove(cacheName);
        rebalancePolicy.onRebalanceCompleted(cacheName, topologyId);
    }

    /**
     * Starts a cluster-wide rebalance for {@code cacheName}: registers a confirmation tracker for
     * the members of the pending consistent hash and asynchronously broadcasts a
     * {@code REBALANCE_START} command.
     *
     * @param cacheName     cache to rebalance
     * @param cacheTopology topology describing the rebalance; its pending CH supplies the members
     * @throws IllegalStateException if a rebalance is already being tracked for the cache
     * @throws Exception             if the asynchronous broadcast fails
     */
    @Override
    public void rebalance(String cacheName, CacheTopology cacheTopology) throws Exception {
        log.debugf("Starting cluster-wide rebalance for cache %s, topology = %s", cacheName, cacheTopology);

        int topologyId = cacheTopology.getTopologyId();
        List<Address> members = cacheTopology.getPendingCH().getMembers();
        RebalanceInfo tracker = new RebalanceInfo(cacheName, topologyId, members);

        // Only one rebalance may be tracked per cache at any time.
        RebalanceInfo existingRebalance = rebalanceStatusMap.putIfAbsent(cacheName, tracker);
        if (existingRebalance != null) {
            throw new IllegalStateException("Aborting the current rebalance, there is another operation in progress: " + existingRebalance);
        }

        // NOTE(review): this sends the locally cached viewId field, whereas updateConsistentHash
        // and recoverClusterStatus send transport.getViewId() - confirm the difference is intended.
        CacheTopologyControlCommand rebalanceStart = new CacheTopologyControlCommand(cacheName, CacheTopologyControlCommand.Type.REBALANCE_START, transport.getAddress(), cacheTopology, viewId);
        executeOnClusterAsync(rebalanceStart);
    }

    /**
     * Handles a join request: waits until the local view id has reached {@code viewId}, then
     * initialises the cache in the rebalance policy and adds the joiner to it.
     *
     * @param cacheName cache being joined
     * @param joiner    address of the joining node
     * @param joinInfo  join parameters handed to the rebalance policy
     * @param viewId    view id the request was issued in
     * @return the topology returned by the policy, or {@code null} if this manager is shutting down
     * @throws Exception if waiting is interrupted or the policy fails
     */
    @Override
    public CacheTopology handleJoin(String cacheName, Address joiner, CacheJoinInfo joinInfo, int viewId) throws Exception {
        waitForView(viewId);

        if (!isShuttingDown) {
            rebalancePolicy.initCache(cacheName, joinInfo);
            List<Address> joiners = Collections.singletonList(joiner);
            return rebalancePolicy.addJoiners(cacheName, joiners);
        }

        log.debugf("Ignoring join request from %s for cache %s, the local cache manager is shutting down", joiner, cacheName);
        return null;
    }

    /**
     * Handles a leave request by removing the leaver from the cache in the rebalance policy.
     * The request is ignored (and logged) while this manager is shutting down. The
     * {@code viewId} argument is not consulted by this method.
     *
     * @param cacheName cache being left
     * @param leaver    address of the leaving node
     * @param viewId    view id the request was issued in
     * @throws Exception if the policy fails
     */
    @Override
    public void handleLeave(String cacheName, Address leaver, int viewId) throws Exception {
        if (!isShuttingDown) {
            List<Address> leavers = Collections.singletonList(leaver);
            rebalancePolicy.removeLeavers(cacheName, leavers);
            return;
        }
        log.debugf("Ignoring leave request from %s for cache %s, the local cache manager is shutting down", leaver, cacheName);
    }

    /**
     * Records that {@code node} finished its local rebalance for {@code cacheName}. A reported
     * {@code throwable} is logged, and the confirmation is still processed afterwards. When the
     * last outstanding confirmation arrives, the cluster-wide rebalance is completed.
     *
     * @param cacheName  cache the confirmation refers to
     * @param node       confirming node
     * @param topologyId topology id the node rebalanced to
     * @param throwable  failure reported by the node, or {@code null}
     * @param viewId     view id of the request (not consulted by this method)
     * @throws CacheException if no rebalance is tracked for the cache, or the topology id does
     *                        not match the tracked one
     * @throws Exception      if the completion callback fails
     */
    @Override
    public void handleRebalanceCompleted(String cacheName, Address node, int topologyId, Throwable throwable, int viewId) throws Exception {
        if (throwable != null) {
            log.rebalanceError(cacheName, node, throwable);
        }
        log.debugf("Finished local rebalance for cache %s on node %s, topology id = %d", cacheName, node, topologyId);

        RebalanceInfo pending = rebalanceStatusMap.get(cacheName);
        if (pending == null) {
            String message = String.format("Received invalid rebalance confirmation from %s for cache %s, we don't have a rebalance in progress", node, cacheName);
            throw new CacheException(message);
        }

        boolean allConfirmed = pending.confirmRebalance(node, topologyId);
        if (allConfirmed) {
            onClusterRebalanceCompleted(cacheName, topologyId, pending);
        }
    }

    /**
     * Blocks the caller until the locally processed view id is at least {@code viewId}.
     * The condition is re-checked whenever the view lock is notified and at least once per
     * second.
     *
     * @param viewId view id to wait for
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void waitForView(int viewId) throws InterruptedException {
        boolean behind = this.viewId < viewId;
        if (behind) {
            log.tracef("Received a cache topology command with a higher view id: %s, our view id is %s", viewId, this.viewId);
        }
        synchronized (viewUpdateLock) {
            while (this.viewId < viewId) {
                // Bounded wait: wakes on notifyAll() from handleNewView()/stop() or after 1s.
                viewUpdateLock.wait(1000L);
            }
        }
    }

    /**
     * Runs {@code command} on the remote nodes and on the local node in parallel, using the
     * async transport executor, and collects the response value of every node.
     * <p>
     * The remote result is awaited first, then the local one; each wait is bounded by
     * {@code timeout} milliseconds.
     *
     * @param command command to execute; it is wired through the component registry before the
     *                local invocation
     * @param timeout timeout in milliseconds, used for the remote invocation and for each wait
     * @return response values keyed by node address, including this node's own address
     * @throws CacheException if any node (remote or local) returns an unsuccessful response; when
     *                        the response is an {@link ExceptionResponse} its exception is the cause
     * @throws Exception      if an invocation fails, times out or is interrupted
     */
    private Map<Address, Object> executeOnClusterSync(final ReplicableCommand command, final int timeout) throws Exception {
        Future<Map<Address, Response>> remoteFuture = this.asyncTransportExecutor.submit(new Callable<Map<Address, Response>>(){

            @Override
            public Map<Address, Response> call() throws Exception {
                return ClusterTopologyManagerImpl.this.transport.invokeRemotely(null, command, ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, true, null);
            }
        });
        Future<Object> localFuture = this.asyncTransportExecutor.submit(new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                ClusterTopologyManagerImpl.this.gcr.wireDependencies(command);
                try {
                    return command.perform(null);
                }
                catch (Throwable t) {
                    // perform() may throw any Throwable; Callable can only propagate Exception.
                    throw new Exception(t);
                }
            }
        });
        Map<Address, Response> responseMap = remoteFuture.get(timeout, TimeUnit.MILLISECONDS);
        Map<Address, Object> responseValues = new HashMap<Address, Object>(this.transport.getMembers().size());
        for (Map.Entry<Address, Response> entry : responseMap.entrySet()) {
            Address address = entry.getKey();
            Response response = entry.getValue();
            if (!response.isSuccessful()) {
                throw new CacheException("Unsuccessful response received from node " + address + ": " + response, ClusterTopologyManagerImpl.failureCause(response));
            }
            responseValues.put(address, ((SuccessfulResponse)response).getResponseValue());
        }
        Response localResponse = (Response)localFuture.get(timeout, TimeUnit.MILLISECONDS);
        if (!localResponse.isSuccessful()) {
            // Report the local failure with the same detail as a remote one: include the
            // response itself and chain the underlying exception when there is one.
            throw new CacheException("Unsuccessful local response: " + localResponse, ClusterTopologyManagerImpl.failureCause(localResponse));
        }
        responseValues.put(this.transport.getAddress(), ((SuccessfulResponse)localResponse).getResponseValue());
        return responseValues;
    }

    /** Returns the exception carried by an {@link ExceptionResponse}, or {@code null} for any other response. */
    private static Exception failureCause(Response response) {
        return response instanceof ExceptionResponse ? ((ExceptionResponse)response).getException() : null;
    }

    /**
     * Broadcasts {@code command} to the remote nodes without waiting for responses, then
     * schedules the same command for execution on the local node via the async transport
     * executor. The future of the local task is not inspected here.
     *
     * @param command command to execute on every node
     * @throws Exception if the remote invocation fails
     */
    private void executeOnClusterAsync(final ReplicableCommand command) throws Exception {
        transport.invokeRemotely(null, command, ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING, -1L, true, null);

        Callable<Object> localInvocation = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                gcr.wireDependencies(command);
                Object result;
                try {
                    result = command.perform(null);
                } catch (Throwable failure) {
                    // perform() may throw any Throwable; wrap it so Callable can propagate it.
                    throw new Exception(failure);
                }
                return result;
            }
        };
        asyncTransportExecutor.submit(localInvocation);
    }

    /**
     * Processes a cluster view change.
     * <ul>
     *   <li>Views whose id is not greater than the last processed one are ignored.</li>
     *   <li>On a merge view, or when this node has just become coordinator, the cluster status
     *       is recovered from all nodes and every cache is re-initialised in the rebalance
     *       policy with the collected topologies.</li>
     *   <li>Otherwise, if this node is the coordinator, the policy's member list is updated.</li>
     * </ul>
     * Finally the new view id is published and threads blocked in {@code waitForView} are woken.
     * If recovery is interrupted, the view id is left unchanged and the method returns early.
     *
     * @param newMembers members of the new view
     * @param mergeView  whether the view is the result of a cluster merge
     * @param newViewId  id of the new view
     */
    private void handleNewView(List<Address> newMembers, boolean mergeView, int newViewId) {
        if (newViewId <= this.viewId) {
            log.tracef("Ignoring old cluster view notification: %s", newViewId);
            return;
        }
        log.tracef("Received new cluster view: %s", newViewId);
        // Compare against the cached flag before refreshing it to detect a coordinator change.
        boolean becameCoordinator = !this.isCoordinator && this.transport.isCoordinator();
        this.isCoordinator = this.transport.isCoordinator();
        if (mergeView || becameCoordinator) {
            try {
                Map<String, List<CacheTopology>> clusterCacheMap = this.recoverClusterStatus();
                for (Map.Entry<String, List<CacheTopology>> entry : clusterCacheMap.entrySet()) {
                    this.rebalancePolicy.initCache(entry.getKey(), entry.getValue());
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still see it.
                Thread.currentThread().interrupt();
                log.tracef("Cluster state recovery interrupted because the coordinator is shutting down");
                return;
            }
            catch (Exception e) {
                // Recovery failure is logged; the new view id is still published below.
                log.failedToRecoverClusterState(e);
            }
        } else if (this.isCoordinator) {
            try {
                this.rebalancePolicy.updateMembersList(newMembers);
            }
            catch (Exception e) {
                log.errorUpdatingMembersList(e);
            }
        }
        synchronized (this.viewUpdateLock) {
            this.viewId = newViewId;
            this.viewUpdateLock.notifyAll();
        }
    }

    /**
     * Asks every node (including this one) for its status with a {@code GET_STATUS} command and
     * groups the reported cache topologies by cache name.
     * <p>
     * Each node's response value is read as a map from cache name to a two-element array holding
     * a {@link CacheJoinInfo} at index 0 and a {@link CacheTopology} at index 1. The first time a
     * cache name is seen, the cache is initialised in the rebalance policy with that join info.
     *
     * @return topologies reported across the cluster, keyed by cache name
     * @throws Exception if the synchronous cluster invocation or the policy fails
     */
    private HashMap<String, List<CacheTopology>> recoverClusterStatus() throws Exception {
        log.debugf("Recovering running caches in the cluster");
        CacheTopologyControlCommand statusRequest = new CacheTopologyControlCommand(null, CacheTopologyControlCommand.Type.GET_STATUS, transport.getAddress(), transport.getViewId());
        Map<Address, Object> statusResponses = executeOnClusterSync(statusRequest, getGlobalTimeout());

        HashMap<String, List<CacheTopology>> topologiesByCache = new HashMap<String, List<CacheTopology>>();
        for (Object nodeResponse : statusResponses.values()) {
            Map<?, ?> nodeStatus = (Map<?, ?>) nodeResponse;
            for (Map.Entry<?, ?> cacheStatus : nodeStatus.entrySet()) {
                String cacheName = (String) cacheStatus.getKey();
                Object[] statusPair = (Object[]) cacheStatus.getValue();
                CacheJoinInfo joinInfo = (CacheJoinInfo) statusPair[0];
                CacheTopology cacheTopology = (CacheTopology) statusPair[1];

                List<CacheTopology> reported = topologiesByCache.get(cacheName);
                if (reported == null) {
                    // First sighting of this cache: register it with the policy.
                    rebalancePolicy.initCache(cacheName, joinInfo);
                    reported = new ArrayList<CacheTopology>();
                    topologiesByCache.put(cacheName, reported);
                }
                reported.add(cacheTopology);
            }
        }
        return topologiesByCache;
    }

    /**
     * Returns the transport's configured distributed-sync timeout as an {@code int}.
     * Values above {@code Integer.MAX_VALUE} are clamped rather than silently wrapped by the
     * narrowing conversion, which could otherwise yield a negative or arbitrary timeout.
     *
     * @return the configured timeout, capped at {@code Integer.MAX_VALUE}
     */
    private int getGlobalTimeout() {
        long configuredTimeout = this.globalConfiguration.transport().distributedSyncTimeout();
        return (int) Math.min(configuredTimeout, (long) Integer.MAX_VALUE);
    }

    /**
     * Tracks which nodes still have to confirm a rebalance of one cache.
     * Every access to the confirmation set after construction happens while holding this
     * instance's monitor.
     */
    private static class RebalanceInfo {
        private final String cacheName;
        private final int topologyId;
        /** Nodes that have not confirmed yet; guarded by {@code this}. */
        private final Set<Address> confirmationsNeeded;

        /**
         * @param cacheName  cache being rebalanced (used in error messages)
         * @param topologyId topology id confirmations must carry
         * @param members    nodes expected to confirm; copied into a private set
         */
        public RebalanceInfo(String cacheName, int topologyId, Collection<Address> members) {
            this.cacheName = cacheName;
            this.topologyId = topologyId;
            this.confirmationsNeeded = new HashSet<Address>(members);
            log.tracef("Initialized rebalance confirmation collector %d, initial list is %s", topologyId, this.confirmationsNeeded);
        }

        /**
         * Records a confirmation from {@code node}.
         *
         * @param node               confirming node
         * @param receivedTopologyId topology id reported by the node
         * @return {@code true} if this confirmation was the last one outstanding; {@code false}
         *         if confirmations remain or the node was not awaited
         * @throws CacheException if {@code receivedTopologyId} differs from the tracked id
         */
        public boolean confirmRebalance(Address node, int receivedTopologyId) {
            synchronized (this) {
                if (this.topologyId != receivedTopologyId) {
                    throw new CacheException(String.format("Received invalid rebalance confirmation from %s for cache %s, expecting topology id %d but got %d", node, this.cacheName, this.topologyId, receivedTopologyId));
                }
                boolean removed = this.confirmationsNeeded.remove(node);
                if (!removed) {
                    log.tracef("Rebalance confirmation collector %d ignored confirmation for %s, which is not a member", this.topologyId, node);
                    return false;
                }
                log.tracef("Rebalance confirmation collector %d received confirmation for %s, remaining list is %s", this.topologyId, node, this.confirmationsNeeded);
                return this.confirmationsNeeded.isEmpty();
            }
        }

        /**
         * Stops waiting for nodes that are not in {@code newMembers}.
         *
         * @param newMembers nodes that are still members
         * @return {@code true} only if this call removed at least one node and nothing remains
         *         outstanding afterwards
         */
        public boolean updateMembers(Collection<Address> newMembers) {
            synchronized (this) {
                boolean modified = this.confirmationsNeeded.retainAll(newMembers);
                return modified && this.confirmationsNeeded.isEmpty();
            }
        }

        /**
         * Renders the tracked topology id and outstanding confirmations. Takes the instance
         * monitor because formatting iterates the set, which other threads may be modifying
         * through {@link #confirmRebalance} or {@link #updateMembers}.
         */
        @Override
        public String toString() {
            synchronized (this) {
                return "RebalanceInfo{topologyId=" + this.topologyId + ", confirmationsNeeded=" + this.confirmationsNeeded + '}';
            }
        }
    }

    /**
     * Cache-manager listener that forwards view-change and merge events to
     * {@code handleNewView} of the enclosing manager.
     * NOTE(review): {@code sync=false} presumably requests asynchronous delivery of these
     * notifications - confirm against the notifier documentation.
     */
    @Listener(sync=false)
    public class ClusterViewListener {
        /**
         * Extracts the members, merge flag and view id from the event and hands them over.
         *
         * @param e the view-change (or merge) event
         */
        @Merged
        @ViewChanged
        public void handleViewChange(ViewChangedEvent e) {
            List<Address> members = e.getNewMembers();
            boolean merged = e.isMergeView();
            int eventViewId = e.getViewId();
            handleNewView(members, merged, eventViewId);
        }
    }
}

