/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.topology;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.infinispan.CacheException;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.topology.CacheJoinInfo;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.CacheTopologyControlCommand;
import org.infinispan.topology.CacheTopologyHandler;
import org.infinispan.topology.LocalCacheStatus;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.util.concurrent.ConcurrentMapFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class LocalTopologyManagerImpl
implements LocalTopologyManager {
    private static Log log = LogFactory.getLog(LocalTopologyManagerImpl.class);
    private Transport transport;
    private ExecutorService asyncTransportExecutor;
    private GlobalComponentRegistry gcr;
    private ConcurrentMap<String, LocalCacheStatus> runningCaches = ConcurrentMapFactory.makeConcurrentMap();
    private volatile boolean running;

    @Inject
    public void inject(Transport transport, @ComponentName(value="org.infinispan.executors.transport") ExecutorService asyncTransportExecutor, GlobalComponentRegistry gcr) {
        this.transport = transport;
        this.asyncTransportExecutor = asyncTransportExecutor;
        this.gcr = gcr;
    }

    @Start(priority=100)
    public void start() {
        this.running = true;
    }

    @Stop(priority=9)
    public void stop() {
        this.running = false;
    }

    @Override
    public CacheTopology join(String cacheName, CacheJoinInfo joinInfo, CacheTopologyHandler stm) throws Exception {
        log.debugf("Node %s joining cache %s", this.transport.getAddress(), cacheName);
        LocalCacheStatus cacheStatus = new LocalCacheStatus(joinInfo, stm);
        this.runningCaches.put(cacheName, cacheStatus);
        int viewId = this.transport.getViewId();
        CacheTopologyControlCommand command = new CacheTopologyControlCommand(cacheName, CacheTopologyControlCommand.Type.JOIN, this.transport.getAddress(), joinInfo, viewId);
        long timeout = joinInfo.getTimeout();
        long endTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
        while (true) {
            try {
                CacheTopology initialTopology;
                while ((initialTopology = (CacheTopology)this.executeOnCoordinator(command, timeout)) == null) {
                }
                this.handleConsistentHashUpdate(cacheName, initialTopology, viewId);
                return initialTopology;
            }
            catch (Exception e) {
                log.debugf(e, "Error sending join request for cache %s to coordinator", cacheName);
                if (endTime <= System.nanoTime()) {
                    throw e;
                }
                Thread.sleep(1000L);
                continue;
            }
            break;
        }
    }

    @Override
    public void leave(String cacheName) {
        log.debugf("Node %s leaving cache %s", this.transport.getAddress(), cacheName);
        LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.remove(cacheName);
        CacheTopologyControlCommand command = new CacheTopologyControlCommand(cacheName, CacheTopologyControlCommand.Type.LEAVE, this.transport.getAddress(), this.transport.getViewId());
        try {
            this.executeOnCoordinator(command, cacheStatus.getJoinInfo().getTimeout());
        }
        catch (Exception e) {
            log.debugf(e, "Error sending the leave request for cache %s to coordinator", cacheName);
        }
    }

    @Override
    public void confirmRebalance(String cacheName, int topologyId, Throwable throwable) {
        CacheTopologyControlCommand command = new CacheTopologyControlCommand(cacheName, CacheTopologyControlCommand.Type.REBALANCE_CONFIRM, this.transport.getAddress(), topologyId, throwable, this.transport.getViewId());
        try {
            this.executeOnCoordinatorAsync(command);
        }
        catch (Exception e) {
            log.debugf(e, "Error sending the rebalance completed notification for cache %s to the coordinator", cacheName);
        }
    }

    @Override
    public Map<String, Object[]> handleStatusRequest(int viewId) {
        HashMap<String, Object[]> response = new HashMap<String, Object[]>();
        for (Map.Entry e : this.runningCaches.entrySet()) {
            String cacheName = (String)e.getKey();
            LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.get(cacheName);
            response.put((String)e.getKey(), new Object[]{cacheStatus.getJoinInfo(), cacheStatus.getTopology()});
        }
        return response;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handleConsistentHashUpdate(String cacheName, CacheTopology cacheTopology, int viewId) throws InterruptedException {
        if (!this.running) {
            log.debugf("Ignoring consistent hash update %s for cache %s, the local cache manager is not running", cacheTopology.getTopologyId(), cacheName);
            return;
        }
        this.waitForView(viewId);
        LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.get(cacheName);
        if (cacheStatus == null) {
            log.tracef("Ignoring consistent hash update %s for cache %s that doesn't exist locally", cacheTopology.getTopologyId(), cacheName);
            return;
        }
        LocalCacheStatus localCacheStatus = cacheStatus;
        synchronized (localCacheStatus) {
            if (cacheStatus.getTopology() != null && cacheStatus.getTopology().getTopologyId() > cacheTopology.getTopologyId()) {
                log.tracef("Ignoring consistent hash update %s for cache %s, we have already received a newer topology %s", cacheTopology.getTopologyId(), cacheName, cacheStatus.getTopology().getTopologyId());
                return;
            }
            log.debugf("Updating local consistent hash(es) for cache %s: new topology = %s", cacheName, cacheTopology);
            cacheStatus.setTopology(cacheTopology);
            ConsistentHash unionCH = null;
            if (cacheTopology.getPendingCH() != null) {
                unionCH = cacheStatus.getJoinInfo().getConsistentHashFactory().union(cacheTopology.getCurrentCH(), cacheTopology.getPendingCH());
            }
            CacheTopologyHandler handler = cacheStatus.getHandler();
            CacheTopology unionTopology = new CacheTopology(cacheTopology.getTopologyId(), cacheTopology.getCurrentCH(), unionCH);
            handler.updateConsistentHash(unionTopology);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handleRebalance(String cacheName, CacheTopology cacheTopology, int viewId) throws InterruptedException {
        if (!this.running) {
            log.debugf("Ignoring rebalance request %s for cache %s, the local cache manager is not running", cacheTopology.getTopologyId(), cacheName);
            return;
        }
        this.waitForView(viewId);
        LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.get(cacheName);
        if (cacheStatus == null) {
            log.tracef("Ignoring rebalance %s for cache %s that doesn't exist locally", cacheTopology.getTopologyId(), cacheName);
            return;
        }
        LocalCacheStatus localCacheStatus = cacheStatus;
        synchronized (localCacheStatus) {
            CacheTopology existingTopology = cacheStatus.getTopology();
            if (existingTopology != null && cacheTopology.getTopologyId() < existingTopology.getTopologyId()) {
                log.debugf("Ignoring old rebalance for cache %s: %s", cacheName, cacheTopology.getTopologyId());
                return;
            }
            log.debugf("Starting local rebalance for cache %s, topology = %s", cacheName, cacheTopology);
            cacheStatus.setTopology(cacheTopology);
        }
        ConsistentHash unionCH = cacheStatus.getJoinInfo().getConsistentHashFactory().union(cacheTopology.getCurrentCH(), cacheTopology.getPendingCH());
        CacheTopologyHandler handler = cacheStatus.getHandler();
        handler.rebalance(new CacheTopology(cacheTopology.getTopologyId(), cacheTopology.getCurrentCH(), unionCH));
    }

    @Override
    public CacheTopology getCacheTopology(String cacheName) {
        LocalCacheStatus cacheStatus = (LocalCacheStatus)this.runningCaches.get(cacheName);
        return cacheStatus.getTopology();
    }

    private void waitForView(int viewId) throws InterruptedException {
        if (this.transport.getViewId() < viewId) {
            log.tracef("Received a cache topology command with a higher view id: %s, our view id is %s", viewId, this.transport.getViewId());
        }
        while (this.transport.getViewId() < viewId) {
            Thread.sleep(100L);
        }
    }

    private Object executeOnCoordinator(ReplicableCommand command, long timeout) throws Exception {
        Response response;
        if (this.transport.isCoordinator()) {
            try {
                this.gcr.wireDependencies(command);
                response = (Response)command.perform(null);
            }
            catch (Throwable t) {
                throw new CacheException("Error handling join request", t);
            }
        } else {
            Address coordinator = this.transport.getCoordinator();
            Map<Address, Response> responseMap = this.transport.invokeRemotely(Collections.singleton(coordinator), command, ResponseMode.SYNCHRONOUS, timeout, true, null);
            response = responseMap.get(coordinator);
        }
        if (response == null || !response.isSuccessful()) {
            Exception exception = response instanceof ExceptionResponse ? ((ExceptionResponse)response).getException() : null;
            throw new CacheException("Bad response received from coordinator: " + response, exception);
        }
        return ((SuccessfulResponse)response).getResponseValue();
    }

    private void executeOnCoordinatorAsync(final ReplicableCommand command) throws Exception {
        if (this.transport.isCoordinator()) {
            this.asyncTransportExecutor.submit(new Callable<Object>(){

                @Override
                public Object call() throws Exception {
                    LocalTopologyManagerImpl.this.gcr.wireDependencies(command);
                    try {
                        return command.perform(null);
                    }
                    catch (Throwable t) {
                        log.errorf(t, "Failed to execute ReplicableCommand %s on coordinator async: %s", command, t.getMessage());
                        throw new Exception(t);
                    }
                }
            });
        } else {
            Address coordinator = this.transport.getCoordinator();
            this.transport.invokeRemotely(Collections.singleton(coordinator), command, ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING, 0L, true, null);
        }
    }
}

