/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.CacheException;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.distribution.ch.DefaultConsistentHashFactory;
import org.infinispan.distribution.ch.ReplicatedConsistentHashFactory;
import org.infinispan.distribution.ch.TopologyAwareConsistentHashFactory;
import org.infinispan.distribution.group.GroupManager;
import org.infinispan.distribution.group.GroupingConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.statetransfer.StateProvider;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.topology.CacheJoinInfo;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.CacheTopologyHandler;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * {@link StateTransferManager} implementation that joins the cache through the
 * {@link LocalTopologyManager} on start, and relays every topology received afterwards to the
 * {@link StateConsumer}, the {@link StateProvider} and the {@link CacheNotifier}.
 *
 * <p>Most query methods simply delegate to the {@link StateConsumer}, which holds the
 * currently installed {@link CacheTopology}.
 */
@MBean(objectName="StateTransferManager", description="Component that handles state transfer")
public class StateTransferManagerImpl implements StateTransferManager {
    private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);
    private static final boolean trace = log.isTraceEnabled();

    // Collaborators, all assigned in init().
    private StateConsumer stateConsumer;
    private StateProvider stateProvider;
    private String cacheName;
    private CacheNotifier cacheNotifier;
    private Configuration configuration;
    private GlobalConfiguration globalConfiguration;
    private RpcManager rpcManager;
    private GroupManager groupManager;
    private LocalTopologyManager localTopologyManager;

    // Released by doTopologyUpdate() once a topology whose current CH lists the local
    // address as a member has been processed; awaited by waitForInitialStateTransferToComplete().
    private final CountDownLatch initialStateTransferComplete = new CountDownLatch(1);

    /**
     * Stores the injected collaborators. Only the name of {@code cache} is retained.
     */
    @Inject
    public void init(StateConsumer stateConsumer, StateProvider stateProvider, Cache cache, CacheNotifier cacheNotifier, Configuration configuration, GlobalConfiguration globalConfiguration, RpcManager rpcManager, GroupManager groupManager, LocalTopologyManager localTopologyManager) {
        this.stateConsumer = stateConsumer;
        this.stateProvider = stateProvider;
        this.cacheNotifier = cacheNotifier;
        this.configuration = configuration;
        this.globalConfiguration = globalConfiguration;
        this.rpcManager = rpcManager;
        this.groupManager = groupManager;
        this.localTopologyManager = localTopologyManager;
        this.cacheName = cache.getName();
    }

    /**
     * Builds the {@link CacheJoinInfo} from the clustering configuration and joins the cache
     * through the {@link LocalTopologyManager}. The handler registered with the join routes
     * both consistent hash updates and rebalances to {@link #doTopologyUpdate}.
     */
    @Override
    @Start(priority=50)
    public void start() throws Exception {
        if (trace) {
            log.tracef("Starting StateTransferManager of cache %s on node %s", cacheName, rpcManager.getAddress());
        }

        CacheJoinInfo joinInfo = new CacheJoinInfo(
                pickConsistentHashFactory(),
                configuration.clustering().hash().hash(),
                configuration.clustering().hash().numSegments(),
                configuration.clustering().hash().numOwners(),
                configuration.clustering().stateTransfer().timeout());

        CacheTopologyHandler handler = new CacheTopologyHandler() {
            @Override
            public void updateConsistentHash(CacheTopology cacheTopology) {
                doTopologyUpdate(cacheTopology, false);
            }

            @Override
            public void rebalance(CacheTopology cacheTopology) {
                doTopologyUpdate(cacheTopology, true);
            }
        };
        localTopologyManager.join(cacheName, joinInfo, handler);
    }

    /**
     * Chooses the consistent hash factory to put in the join info.
     *
     * @return the explicitly configured factory if there is one; otherwise, for a clustered
     *         cache mode, a replicated, topology-aware or default factory depending on the
     *         cache mode and on whether the transport has topology info; {@code null} when
     *         nothing is configured and the cache mode is not clustered
     */
    private ConsistentHashFactory pickConsistentHashFactory() {
        ConsistentHashFactory configured = configuration.clustering().hash().consistentHashFactory();
        if (configured != null) {
            return configured;
        }

        CacheMode mode = configuration.clustering().cacheMode();
        if (!mode.isClustered()) {
            return null;
        }
        if (!mode.isDistributed()) {
            return new ReplicatedConsistentHashFactory();
        }
        if (globalConfiguration.transport().hasTopologyInfo()) {
            return new TopologyAwareConsistentHashFactory();
        }
        return new DefaultConsistentHashFactory();
    }

    /**
     * Wraps the consistent hashes of the given topology in {@link GroupingConsistentHash}.
     *
     * @return {@code cacheTopology} itself when no group manager is present; otherwise a new
     *         topology with the same id whose current CH (and pending CH, when not
     *         {@code null}) are wrapped
     */
    private CacheTopology addGrouping(CacheTopology cacheTopology) {
        if (groupManager == null) {
            return cacheTopology;
        }

        ConsistentHash groupedCurrent = new GroupingConsistentHash(cacheTopology.getCurrentCH(), groupManager);
        ConsistentHash pending = cacheTopology.getPendingCH();
        ConsistentHash groupedPending = pending == null ? null : new GroupingConsistentHash(pending, groupManager);
        return new CacheTopology(cacheTopology.getTopologyId(), groupedCurrent, groupedPending);
    }

    /**
     * Installs a topology received from the topology handler.
     *
     * <p>The sequence is: pre-notification to the cache notifier, update of the state consumer,
     * update of the state provider, post-notification. The "old" write CH passed to both
     * notifications is read from the state consumer before it is updated.
     *
     * @param newCacheTopology the topology to install, before grouping is applied
     * @param isRebalance      {@code true} when invoked for a rebalance, {@code false} for a
     *                         plain consistent hash update
     */
    private void doTopologyUpdate(CacheTopology newCacheTopology, boolean isRebalance) {
        if (trace) {
            log.tracef("Installing new cache topology %s", newCacheTopology);
        }

        CacheTopology groupedTopology = addGrouping(newCacheTopology);
        CacheTopology previousTopology = stateConsumer.getCacheTopology();

        ConsistentHash oldCH = previousTopology == null ? null : previousTopology.getWriteConsistentHash();
        ConsistentHash newCH = groupedTopology.getWriteConsistentHash();

        cacheNotifier.notifyTopologyChanged(oldCH, newCH, true);
        stateConsumer.onTopologyUpdate(groupedTopology, isRebalance);
        stateProvider.onTopologyUpdate(groupedTopology, isRebalance);
        cacheNotifier.notifyTopologyChanged(oldCH, newCH, false);

        // Being listed in the current CH is what releases the initial-state-transfer latch.
        boolean localNodeIsMember = groupedTopology.getCurrentCH().getMembers().contains(rpcManager.getAddress());
        if (localNodeIsMember) {
            initialStateTransferComplete.countDown();
        }
    }

    /**
     * Blocks until the initial-state-transfer latch is released, waiting at most the configured
     * state transfer timeout (interpreted as milliseconds).
     *
     * @throws CacheException       if the latch is not released within the timeout
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Start(priority=1000)
    public void waitForInitialStateTransferToComplete() throws InterruptedException {
        if (trace) {
            log.tracef("Waiting for initial state transfer to finish");
        }

        long timeoutMillis = configuration.clustering().stateTransfer().timeout();
        if (initialStateTransferComplete.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return;
        }
        throw new CacheException(String.format("Initial state transfer timed out for cache %s on %s", cacheName, rpcManager.getAddress()));
    }

    /**
     * Leaves the cache through the {@link LocalTopologyManager}.
     */
    @Override
    @Stop(priority=20)
    public void stop() {
        if (trace) {
            log.tracef("Shutting down StateTransferManager of cache %s on node %s", cacheName, rpcManager.getAddress());
        }
        localTopologyManager.leave(cacheName);
    }

    /**
     * @return {@code true} once the state consumer holds a cache topology
     */
    @Override
    public boolean isJoinComplete() {
        CacheTopology installed = stateConsumer.getCacheTopology();
        return installed != null;
    }

    /**
     * Delegates to {@link StateConsumer#isStateTransferInProgress()}.
     */
    @Override
    public boolean isStateTransferInProgress() {
        return stateConsumer.isStateTransferInProgress();
    }

    /**
     * Delegates to {@link StateConsumer#isStateTransferInProgressForKey(Object)}.
     */
    @Override
    public boolean isStateTransferInProgressForKey(Object key) {
        return stateConsumer.isStateTransferInProgressForKey(key);
    }

    /**
     * @return the topology currently held by the state consumer, possibly {@code null}
     */
    @Override
    public CacheTopology getCacheTopology() {
        return stateConsumer.getCacheTopology();
    }

    /**
     * Tells whether the local address is the first entry in the member list of the current
     * topology.
     *
     * @throws IllegalStateException if no topology is installed yet or its member list is empty
     */
    @Override
    public boolean isLocalNodeFirst() {
        CacheTopology installed = stateConsumer.getCacheTopology();
        boolean hasMembers = installed != null && !installed.getMembers().isEmpty();
        if (!hasMembers) {
            throw new IllegalStateException("Can only check if the local node is the first to join after joining");
        }
        return installed.getMembers().get(0).equals(rpcManager.getAddress());
    }
}

