/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.infinispan.Cache;
import org.infinispan.CacheException;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextContainer;
import org.infinispan.context.impl.NonTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.InboundTransferTask;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.statetransfer.TransactionInfo;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.AbstractCacheTransaction;
import org.infinispan.transaction.LocalTransaction;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.RemoteTransaction;
import org.infinispan.transaction.TransactionTable;
import org.infinispan.util.InfinispanCollections;
import org.infinispan.util.ReadOnlyDataContainerBackedKeySet;
import org.infinispan.util.concurrent.ConcurrentHashSet;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class StateConsumerImpl
implements StateConsumer {
    private static final Log log = LogFactory.getLog(StateConsumerImpl.class);
    private static final boolean trace = log.isTraceEnabled();
    private StateTransferManager stateTransferManager;
    private String cacheName;
    private Configuration configuration;
    private RpcManager rpcManager;
    private TransactionManager transactionManager;
    private CommandsFactory commandsFactory;
    private TransactionTable transactionTable;
    private DataContainer dataContainer;
    private CacheLoaderManager cacheLoaderManager;
    private InterceptorChain interceptorChain;
    private InvocationContextContainer icc;
    private StateTransferLock stateTransferLock;
    private long timeout;
    private boolean useVersionedPut;
    private boolean fetchEnabled;
    private volatile CacheTopology cacheTopology;
    private volatile Set<Object> updatedKeys;
    private final AtomicInteger activeTopologyUpdates = new AtomicInteger(0);
    private final AtomicBoolean rebalanceInProgress = new AtomicBoolean(false);
    private final Map<Address, List<InboundTransferTask>> transfersBySource = new HashMap<Address, List<InboundTransferTask>>();
    private final Map<Integer, InboundTransferTask> transfersBySegment = new HashMap<Integer, InboundTransferTask>();

    @Override
    public void stopApplyingState() {
        this.updatedKeys = null;
    }

    @Override
    public void addUpdatedKey(Object key) {
        if (this.updatedKeys != null && this.cacheTopology.getWriteConsistentHash().isKeyLocalToNode(this.rpcManager.getAddress(), key)) {
            this.updatedKeys.add(key);
        }
    }

    @Override
    public boolean isKeyUpdated(Object key) {
        return this.updatedKeys == null || this.updatedKeys.contains(key);
    }

    @Inject
    public void init(Cache cache, StateTransferManager stateTransferManager, InterceptorChain interceptorChain, InvocationContextContainer icc, Configuration configuration, RpcManager rpcManager, TransactionManager transactionManager, CommandsFactory commandsFactory, CacheLoaderManager cacheLoaderManager, DataContainer dataContainer, TransactionTable transactionTable, StateTransferLock stateTransferLock) {
        this.cacheName = cache.getName();
        this.stateTransferManager = stateTransferManager;
        this.interceptorChain = interceptorChain;
        this.icc = icc;
        this.configuration = configuration;
        this.rpcManager = rpcManager;
        this.transactionManager = transactionManager;
        this.commandsFactory = commandsFactory;
        this.cacheLoaderManager = cacheLoaderManager;
        this.dataContainer = dataContainer;
        this.transactionTable = transactionTable;
        this.stateTransferLock = stateTransferLock;
        this.useVersionedPut = configuration.transaction().transactionMode().isTransactional() && configuration.versioning().enabled() && configuration.locking().writeSkewCheck() && configuration.transaction().lockingMode() == LockingMode.OPTIMISTIC && configuration.clustering().cacheMode().isClustered();
        this.timeout = configuration.clustering().stateTransfer().timeout();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isStateTransferInProgress() {
        StateConsumerImpl stateConsumerImpl = this;
        synchronized (stateConsumerImpl) {
            return !this.transfersBySource.isEmpty();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isStateTransferInProgressForKey(Object key) {
        if (this.configuration.clustering().cacheMode().isInvalidation()) {
            return false;
        }
        StateConsumerImpl stateConsumerImpl = this;
        synchronized (stateConsumerImpl) {
            return this.cacheTopology != null && this.transfersBySegment.containsKey(this.getSegment(key));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onTopologyUpdate(CacheTopology cacheTopology, boolean isRebalance) {
        if (trace) {
            log.tracef("Received new CH %s for cache %s", cacheTopology.getWriteConsistentHash(), this.cacheName);
        }
        int numStartedTopologyUpdates = this.activeTopologyUpdates.incrementAndGet();
        if (isRebalance) {
            this.rebalanceInProgress.set(true);
        }
        ConsistentHash previousCh = this.cacheTopology != null ? this.cacheTopology.getWriteConsistentHash() : null;
        this.stateTransferLock.acquireExclusiveTopologyLock();
        this.cacheTopology = cacheTopology;
        if (numStartedTopologyUpdates == 1) {
            this.updatedKeys = new ConcurrentHashSet<Object>();
        }
        this.stateTransferLock.releaseExclusiveTopologyLock();
        this.stateTransferLock.notifyTopologyInstalled(cacheTopology.getTopologyId());
        try {
            Set<Integer> addedSegments;
            if (previousCh == null) {
                addedSegments = this.getOwnedSegments(cacheTopology.getWriteConsistentHash());
                if (trace) {
                    log.tracef("On cache %s we have: added segments: %s", this.cacheName, addedSegments);
                }
            } else {
                Set<Integer> previousSegments = this.getOwnedSegments(previousCh);
                Set<Integer> newSegments = this.getOwnedSegments(cacheTopology.getWriteConsistentHash());
                HashSet<Integer> removedSegments = new HashSet<Integer>(previousSegments);
                removedSegments.removeAll(newSegments);
                addedSegments = new HashSet<Integer>(newSegments);
                addedSegments.removeAll(previousSegments);
                if (trace) {
                    log.tracef("On cache %s we have: removed segments: %s; new segments: %s; old segments: %s; added segments: %s", new Object[]{this.cacheName, removedSegments, newSegments, previousSegments, addedSegments});
                }
                this.cancelTransfers(removedSegments);
                this.invalidateSegments(newSegments, removedSegments);
                HashSet<Address> members = new HashSet<Address>(cacheTopology.getReadConsistentHash().getMembers());
                StateConsumerImpl stateConsumerImpl = this;
                synchronized (stateConsumerImpl) {
                    Iterator<Address> it = this.transfersBySource.keySet().iterator();
                    while (it.hasNext()) {
                        Address source = it.next();
                        if (members.contains(source)) continue;
                        if (trace) {
                            log.tracef("Removing inbound transfers from source %s for cache %s", source, this.cacheName);
                        }
                        List<InboundTransferTask> inboundTransfers = this.transfersBySource.get(source);
                        it.remove();
                        for (InboundTransferTask inboundTransfer : inboundTransfers) {
                            if (trace) {
                                log.tracef("Removing inbound transfers for segments %s from source %s for cache %s", inboundTransfer.getSegments(), source, this.cacheName);
                            }
                            this.transfersBySegment.keySet().removeAll(inboundTransfer.getSegments());
                            addedSegments.addAll(inboundTransfer.getUnfinishedSegments());
                        }
                    }
                    addedSegments.removeAll(this.transfersBySegment.keySet());
                }
            }
            if (!addedSegments.isEmpty()) {
                this.addTransfers(addedSegments);
            }
        }
        finally {
            this.stateTransferLock.notifyTransactionDataReceived(cacheTopology.getTopologyId());
            if (this.activeTopologyUpdates.decrementAndGet() == 0) {
                this.notifyEndOfTopologyUpdate(cacheTopology.getTopologyId());
            }
        }
    }

    private void notifyEndOfTopologyUpdate(int topologyId) {
        if (!this.isStateTransferInProgress() && this.rebalanceInProgress.compareAndSet(true, false)) {
            log.debugf("Finished receiving of segments for cache %s for topology %d.", this.cacheName, topologyId);
            this.stopApplyingState();
            this.stateTransferManager.notifyEndOfTopologyUpdate(topologyId);
        }
    }

    private Set<Integer> getOwnedSegments(ConsistentHash consistentHash) {
        Address address = this.rpcManager.getAddress();
        return consistentHash.getMembers().contains(address) ? consistentHash.getSegmentsForOwner(address) : InfinispanCollections.emptySet();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void applyState(Address sender, int topologyId, int segmentId, Collection<InternalCacheEntry> cacheEntries, boolean isLastChunk) {
        InboundTransferTask inboundTransfer;
        if (!this.cacheTopology.getWriteConsistentHash().getSegmentsForOwner(this.rpcManager.getAddress()).contains(segmentId)) {
            if (trace) {
                log.warnf("Discarding received cache entries for segment %d of cache %s because they do not belong to this node.", segmentId, this.cacheName);
            }
            return;
        }
        StateConsumerImpl stateConsumerImpl = this;
        synchronized (stateConsumerImpl) {
            inboundTransfer = this.transfersBySegment.get(segmentId);
        }
        if (inboundTransfer != null) {
            if (trace) {
                log.tracef("Before applying the received state the data container of cache %s has %d keys", this.cacheName, this.dataContainer.size());
            }
            if (cacheEntries != null) {
                this.doApplyState(sender, segmentId, cacheEntries);
            }
            inboundTransfer.onStateReceived(segmentId, isLastChunk);
            if (trace) {
                log.tracef("After applying the received state the data container of cache %s has %d keys", this.cacheName, this.dataContainer.size());
                stateConsumerImpl = this;
                synchronized (stateConsumerImpl) {
                    log.tracef("Segments not received yet for cache %s: %s", this.cacheName, this.transfersBySource);
                }
            }
        } else {
            log.warnf("Received unsolicited state from node %s for segment %d of cache %s", sender, segmentId, this.cacheName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doApplyState(Address sender, int segmentId, Collection<InternalCacheEntry> cacheEntries) {
        log.debugf("Applying new state for segment %d of cache %s from node %s: received %d cache entries", new Object[]{segmentId, this.cacheName, sender, cacheEntries.size()});
        if (trace) {
            ArrayList<Object> keys = new ArrayList<Object>(cacheEntries.size());
            for (InternalCacheEntry e : cacheEntries) {
                keys.add(e.getKey());
            }
            log.tracef("Received keys %s for segment %d of cache %s from node %s", new Object[]{keys, segmentId, this.cacheName, sender});
        }
        EnumSet<Flag[]> flags = EnumSet.of(Flag.PUT_FOR_STATE_TRANSFER, new Flag[]{Flag.CACHE_MODE_LOCAL, Flag.IGNORE_RETURN_VALUES, Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_SHARED_CACHE_STORE, Flag.SKIP_OWNERSHIP_CHECK, Flag.SKIP_XSITE_BACKUP});
        for (InternalCacheEntry e : cacheEntries) {
            try {
                InvocationContext ctx;
                if (this.transactionManager != null) {
                    this.transactionManager.begin();
                    Transaction transaction = this.transactionManager.getTransaction();
                    ctx = this.icc.createInvocationContext(transaction);
                    ((TxInvocationContext)ctx).setImplicitTransaction(true);
                } else {
                    ctx = this.icc.createSingleKeyNonTxInvocationContext();
                }
                PutKeyValueCommand put = this.useVersionedPut ? this.commandsFactory.buildVersionedPutKeyValueCommand(e.getKey(), e.getValue(), e.getLifespan(), e.getMaxIdle(), e.getVersion(), flags) : this.commandsFactory.buildPutKeyValueCommand(e.getKey(), e.getValue(), e.getLifespan(), e.getMaxIdle(), flags);
                boolean success = false;
                try {
                    this.interceptorChain.invoke(ctx, put);
                    success = true;
                }
                finally {
                    if (!ctx.isInTxScope()) continue;
                    if (success) {
                        ((LocalTransaction)((TxInvocationContext)ctx).getCacheTransaction()).setFromStateTransfer(true);
                        try {
                            this.transactionManager.commit();
                        }
                        catch (Throwable ex) {
                            log.errorf(ex, "Could not commit transaction created by state transfer of key %s", e.getKey());
                            if (this.transactionManager.getTransaction() == null) continue;
                            this.transactionManager.rollback();
                        }
                        continue;
                    }
                    this.transactionManager.rollback();
                }
            }
            catch (Exception ex) {
                log.problemApplyingStateForKey(ex.getMessage(), e.getKey(), ex);
            }
        }
        log.debugf("Finished applying state for segment %d of cache %s", segmentId, this.cacheName);
    }

    @Override
    public void applyTransactions(Address sender, int topologyId, Collection<TransactionInfo> transactions) {
        log.debugf("Applying %d transactions for cache %s transferred from node %s", transactions.size(), this.cacheName, sender);
        if (this.configuration.transaction().transactionMode().isTransactional()) {
            for (TransactionInfo transactionInfo : transactions) {
                AbstractCacheTransaction tx = this.transactionTable.getLocalTransaction(transactionInfo.getGlobalTransaction());
                if (tx == null && (tx = this.transactionTable.getRemoteTransaction(transactionInfo.getGlobalTransaction())) == null) {
                    tx = this.transactionTable.createRemoteTransaction(transactionInfo.getGlobalTransaction(), transactionInfo.getModifications());
                    ((RemoteTransaction)tx).setMissingLookedUpEntries(true);
                }
                for (Object key : transactionInfo.getLockedKeys()) {
                    tx.addBackupLockForKey(key);
                }
            }
        }
    }

    @Start(priority=20)
    public void start() {
        this.fetchEnabled = this.configuration.clustering().stateTransfer().fetchInMemoryState() || this.cacheLoaderManager.isFetchPersistentState();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Stop(priority=20)
    public void stop() {
        if (trace) {
            log.tracef("Shutting down StateConsumer of cache %s on node %s", this.cacheName, this.rpcManager.getAddress());
        }
        try {
            StateConsumerImpl stateConsumerImpl = this;
            synchronized (stateConsumerImpl) {
                Iterator<List<InboundTransferTask>> it = this.transfersBySource.values().iterator();
                while (it.hasNext()) {
                    List<InboundTransferTask> inboundTransfers = it.next();
                    it.remove();
                    for (InboundTransferTask inboundTransfer : inboundTransfers) {
                        inboundTransfer.cancel();
                    }
                }
                this.transfersBySource.clear();
                this.transfersBySegment.clear();
            }
        }
        catch (Throwable t) {
            log.errorf(t, "Failed to stop StateConsumer of cache %s on node %s", this.cacheName, this.rpcManager.getAddress());
        }
    }

    @Override
    public CacheTopology getCacheTopology() {
        return this.cacheTopology;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addTransfers(Set<Integer> segments) {
        log.debugf("Adding inbound state transfer for segments %s of cache %s", segments, this.cacheName);
        HashSet<Integer> segmentsToProcess = new HashSet<Integer>(segments);
        HashSet<Address> faultysources = new HashSet<Address>();
        Iterator it = segmentsToProcess.iterator();
        while (it.hasNext()) {
            Integer segmentId = (Integer)it.next();
            Address source = this.pickSourceOwner(segmentId, faultysources);
            if (source != null) continue;
            it.remove();
        }
        while (!segmentsToProcess.isEmpty()) {
            Set<Integer> segmentsFromSource;
            HashMap<Address, HashSet<Integer>> segmentsBySource = new HashMap<Address, HashSet<Integer>>();
            Iterator i$ = segmentsToProcess.iterator();
            while (i$.hasNext()) {
                int segmentId = (Integer)i$.next();
                StateConsumerImpl stateConsumerImpl = this;
                synchronized (stateConsumerImpl) {
                    if (this.transfersBySegment.containsKey(segmentId)) {
                        continue;
                    }
                }
                Address source = this.pickSourceOwner(segmentId, faultysources);
                if (source == null) continue;
                segmentsFromSource = (Set)segmentsBySource.get(source);
                if (segmentsFromSource == null) {
                    segmentsFromSource = new HashSet<Integer>();
                    segmentsBySource.put(source, (HashSet<Integer>)segmentsFromSource);
                }
                segmentsFromSource.add(segmentId);
            }
            HashSet<Integer> failedSegments = new HashSet<Integer>();
            for (Address source : segmentsBySource.keySet()) {
                InboundTransferTask inboundTransfer;
                segmentsFromSource = (HashSet<Integer>)segmentsBySource.get(source);
                StateConsumerImpl stateConsumerImpl = this;
                synchronized (stateConsumerImpl) {
                    segmentsFromSource.removeAll(this.transfersBySegment.keySet());
                    if (segmentsFromSource.isEmpty()) {
                        continue;
                    }
                    inboundTransfer = new InboundTransferTask(segmentsFromSource, source, this.cacheTopology.getTopologyId(), this, this.rpcManager, this.commandsFactory, this.timeout, this.cacheName);
                    Iterator i$2 = segmentsFromSource.iterator();
                    while (i$2.hasNext()) {
                        int segmentId = (Integer)i$2.next();
                        this.transfersBySegment.put(segmentId, inboundTransfer);
                    }
                    List<InboundTransferTask> inboundTransfers = this.transfersBySource.get(inboundTransfer.getSource());
                    if (inboundTransfers == null) {
                        inboundTransfers = new ArrayList<InboundTransferTask>();
                        this.transfersBySource.put(inboundTransfer.getSource(), inboundTransfers);
                    }
                    inboundTransfers.add(inboundTransfer);
                }
                if (this.configuration.transaction().transactionMode().isTransactional() && !inboundTransfer.requestTransactions()) {
                    log.failedToRetrieveTransactionsForSegments(segmentsFromSource, this.cacheName, source);
                    failedSegments.addAll(segmentsFromSource);
                    faultysources.add(source);
                    this.removeTransfer(inboundTransfer);
                    continue;
                }
                if (this.fetchEnabled) {
                    if (inboundTransfer.requestSegments()) continue;
                    log.failedToRequestSegments(segmentsFromSource, this.cacheName, source);
                    failedSegments.addAll(segmentsFromSource);
                    faultysources.add(source);
                    this.removeTransfer(inboundTransfer);
                    continue;
                }
                this.removeTransfer(inboundTransfer);
            }
            segmentsToProcess = failedSegments;
        }
        log.debugf("Finished adding inbound state transfer for segments %s of cache %s", segments, this.cacheName);
    }

    private Address pickSourceOwner(int segmentId, Set<Address> faultySources) {
        List<Address> owners = this.cacheTopology.getReadConsistentHash().locateOwnersForSegment(segmentId);
        if (owners.size() == 1 && owners.get(0).equals(this.rpcManager.getAddress())) {
            return null;
        }
        for (int i = owners.size() - 1; i >= 0; --i) {
            Address o = owners.get(i);
            if (o.equals(this.rpcManager.getAddress()) || faultySources.contains(o)) continue;
            return o;
        }
        log.noLiveOwnersFoundForSegment(segmentId, this.cacheName, owners, faultySources);
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cancelTransfers(Set<Integer> removedSegments) {
        StateConsumerImpl stateConsumerImpl = this;
        synchronized (stateConsumerImpl) {
            ArrayList<Integer> segmentsToCancel = new ArrayList<Integer>(removedSegments);
            while (!segmentsToCancel.isEmpty()) {
                int segmentId = (Integer)segmentsToCancel.remove(0);
                InboundTransferTask inboundTransfer = this.transfersBySegment.remove(segmentId);
                if (inboundTransfer == null) continue;
                log.debugf("Cancelling inbound state transfer for segment %d of cache %s", segmentId, this.cacheName);
                HashSet<Integer> cancelledSegments = new HashSet<Integer>(segmentsToCancel);
                cancelledSegments.retainAll(inboundTransfer.getSegments());
                segmentsToCancel.removeAll(cancelledSegments);
                inboundTransfer.cancelSegments(cancelledSegments);
            }
        }
    }

    private void invalidateSegments(Set<Integer> newSegments, Set<Integer> segmentsToL1) {
        NonTxInvocationContext ctx;
        InvalidateCommand invalidateCmd;
        HashSet<Object> keysToL1 = new HashSet<Object>();
        HashSet<Object> keysToRemove = new HashSet<Object>();
        for (InternalCacheEntry ice : this.dataContainer) {
            Object key = ice.getKey();
            int keySegment = this.getSegment(key);
            if (segmentsToL1.contains(keySegment)) {
                keysToL1.add(key);
                continue;
            }
            if (newSegments.contains(keySegment)) continue;
            keysToRemove.add(key);
        }
        CacheStore cacheStore = this.getCacheStore();
        if (cacheStore != null) {
            try {
                Set<Object> storedKeys = cacheStore.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(this.dataContainer));
                for (Object key : storedKeys) {
                    int keySegment = this.getSegment(key);
                    if (segmentsToL1.contains(keySegment)) {
                        keysToL1.add(key);
                        continue;
                    }
                    if (newSegments.contains(keySegment)) continue;
                    keysToRemove.add(key);
                }
            }
            catch (CacheLoaderException e) {
                log.failedLoadingKeysFromCacheStore(e);
            }
        }
        if (this.configuration.clustering().l1().onRehash()) {
            log.debugf("Moving to L1 state for segments %s of cache %s", segmentsToL1, this.cacheName);
        } else {
            log.debugf("Removing state for segments %s of cache %s", segmentsToL1, this.cacheName);
        }
        if (!keysToL1.isEmpty()) {
            try {
                invalidateCmd = this.commandsFactory.buildInvalidateFromL1Command(true, EnumSet.of(Flag.CACHE_MODE_LOCAL, Flag.SKIP_LOCKING), keysToL1);
                ctx = this.icc.createNonTxInvocationContext();
                this.interceptorChain.invoke(ctx, invalidateCmd);
                log.debugf("Invalidated %d keys, data container now has %d keys", keysToL1.size(), this.dataContainer.size());
                if (trace) {
                    log.tracef("Invalidated keys: %s", keysToL1);
                }
            }
            catch (CacheException e) {
                log.failedToInvalidateKeys(e);
            }
        }
        log.debugf("Removing L1 state for segments not in %s or %s for cache %s", newSegments, segmentsToL1, this.cacheName);
        if (!keysToRemove.isEmpty()) {
            try {
                invalidateCmd = this.commandsFactory.buildInvalidateFromL1Command(false, EnumSet.of(Flag.CACHE_MODE_LOCAL, Flag.SKIP_LOCKING), keysToRemove);
                ctx = this.icc.createNonTxInvocationContext();
                this.interceptorChain.invoke(ctx, invalidateCmd);
                log.debugf("Invalidated %d keys, data container of cache %s now has %d keys", keysToRemove.size(), this.cacheName, this.dataContainer.size());
                if (trace) {
                    log.tracef("Invalidated keys: %s", keysToRemove);
                }
            }
            catch (CacheException e) {
                log.failedToInvalidateKeys(e);
            }
        }
    }

    private int getSegment(Object key) {
        return this.cacheTopology.getReadConsistentHash().getSegment(key);
    }

    private CacheStore getCacheStore() {
        if (this.cacheLoaderManager != null && this.cacheLoaderManager.isEnabled() && !this.cacheLoaderManager.isShared()) {
            return this.cacheLoaderManager.getCacheStore();
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeTransfer(InboundTransferTask inboundTransfer) {
        StateConsumerImpl stateConsumerImpl = this;
        synchronized (stateConsumerImpl) {
            List<InboundTransferTask> transfers = this.transfersBySource.get(inboundTransfer.getSource());
            if (transfers != null && transfers.remove(inboundTransfer)) {
                if (transfers.isEmpty()) {
                    this.transfersBySource.remove(inboundTransfer.getSource());
                }
                this.transfersBySegment.keySet().removeAll(inboundTransfer.getSegments());
            }
        }
    }

    void onTaskCompletion(InboundTransferTask inboundTransfer) {
        log.tracef("Completion of inbound transfer task: %s ", inboundTransfer);
        this.removeTransfer(inboundTransfer);
        if (this.activeTopologyUpdates.get() == 0) {
            this.notifyEndOfTopologyUpdate(this.cacheTopology.getTopologyId());
        }
    }
}

