/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
import org.infinispan.IllegalLifecycleStateException;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.conflict.impl.InternalConflictManager;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.context.impl.NonTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.TriangleOrderManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.executors.LimitedExecutor;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.metadata.impl.InternalMetadataImpl;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.cluster.ClusterListenerReplicateCallable;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.statetransfer.CommitManager;
import org.infinispan.statetransfer.InboundTransferTask;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.statetransfer.TransactionInfo;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.transaction.impl.AbstractCacheTransaction;
import org.infinispan.transaction.impl.RemoteTransaction;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.totalorder.TotalOrderLatch;
import org.infinispan.transaction.totalorder.TotalOrderManager;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.BlockingTaskAwareExecutorService;
import org.infinispan.util.concurrent.CommandAckCollector;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

public class StateConsumerImpl
implements StateConsumer {
    // Class-wide logger; the trace flag is sampled once when the class is initialized.
    private static final Log log = LogFactory.getLog(StateConsumerImpl.class);
    private static final boolean trace = log.isTraceEnabled();
    // Value of stateTransferTopologyId while no state transfer is in progress
    // (the methods below compare against the literal -1).
    protected static final int NO_STATE_TRANSFER_IN_PROGRESS = -1;
    // Flag bit set passed to the put commands that doApplyState builds for received entries.
    protected static final long STATE_TRANSFER_FLAGS = EnumUtil.bitSetOf((Enum)Flag.PUT_FOR_STATE_TRANSFER, (Enum)Flag.CACHE_MODE_LOCAL, (Enum[])new Enum[]{Flag.IGNORE_RETURN_VALUES, Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_SHARED_CACHE_STORE, Flag.SKIP_OWNERSHIP_CHECK, Flag.SKIP_XSITE_BACKUP});

    // ---- Collaborators supplied through @Inject ----
    @Inject
    protected ComponentRef<Cache<Object, Object>> cache;
    @Inject
    protected LocalTopologyManager localTopologyManager;
    @Inject
    protected Configuration configuration;
    @Inject
    protected RpcManager rpcManager;
    // Null-checked by doApplyState: state is applied inside a transaction only when this is non-null.
    @Inject
    protected TransactionManager transactionManager;
    @Inject
    protected CommandsFactory commandsFactory;
    // Null-checked by onTopologyUpdate before cleaning up leaver transactions.
    @Inject
    protected TransactionTable transactionTable;
    @Inject
    protected InternalDataContainer<Object, Object> dataContainer;
    @Inject
    protected PersistenceManager persistenceManager;
    @Inject
    protected AsyncInterceptorChain interceptorChain;
    @Inject
    protected InvocationContextFactory icf;
    @Inject
    protected StateTransferLock stateTransferLock;
    @Inject
    protected CacheNotifier cacheNotifier;
    @Inject
    protected TotalOrderManager totalOrderManager;
    @Inject
    @ComponentName(value="org.infinispan.executors.remote")
    protected BlockingTaskAwareExecutorService remoteCommandsExecutor;
    @Inject
    protected CommitManager commitManager;
    // Executor on which applyState submits the per-chunk work; also backs stateRequestExecutor.
    @Inject
    @ComponentName(value="org.infinispan.executors.stateTransferExecutor")
    protected ExecutorService stateTransferExecutor;
    @Inject
    protected CommandAckCollector commandAckCollector;
    @Inject
    protected TriangleOrderManager triangleOrderManager;
    @Inject
    protected DistributionManager distributionManager;
    @Inject
    protected KeyPartitioner keyPartitioner;
    @Inject
    private InternalConflictManager conflictManager;
    @Inject
    private LocalPublisherManager<Object, Object> localPublisherManager;

    // ---- Settings assigned in start() from the cache and its configuration ----
    protected String cacheName;
    // State transfer timeout; applyState passes it to the latch await in milliseconds.
    protected long timeout;
    protected boolean isFetchEnabled;
    protected boolean isTransactional;
    protected boolean isInvalidationMode;
    protected boolean isTotalOrder;

    // ---- Mutable state ----
    // Assigned through setKeyInvalidationListener.
    protected volatile KeyInvalidationListener keyInvalidationListener;
    // Most recent topology installed by onTopologyUpdate; null until the first update.
    protected volatile CacheTopology cacheTopology;
    private volatile int firstTopologyAsMember = Integer.MAX_VALUE;
    // Topology id of the state transfer in progress, or -1 when there is none.
    protected final AtomicInteger stateTransferTopologyId = new AtomicInteger(-1);
    // Set by onTopologyUpdate while a state transfer is in progress on a member node;
    // cleared by notifyEndOfStateTransferIfNeeded once no transfers remain.
    protected final AtomicBoolean waitingForState = new AtomicBoolean(false);
    // Replaced on every topology update and returned to the caller of onTopologyUpdate.
    protected CompletableFuture<Void> stateTransferFuture = CompletableFutures.completedNull();
    // Monitor guarding the two transfer maps below.
    protected final Object transferMapsLock = new Object();
    @GuardedBy(value="transferMapsLock")
    private final Map<Address, List<InboundTransferTask>> transfersBySource = new HashMap<Address, List<InboundTransferTask>>();
    @GuardedBy(value="transferMapsLock")
    protected final Map<Integer, List<InboundTransferTask>> transfersBySegment = new HashMap<Integer, List<InboundTransferTask>>();
    // Created in start() on top of stateTransferExecutor with a limit of 1; shut down in stop().
    protected LimitedExecutor stateRequestExecutor;
    // Tracks whether the local node is a member of the latest topology (see onTopologyUpdate).
    private volatile boolean ownsData = false;
    // Created in start() with DeliverOrder.NONE and the state transfer timeout.
    protected RpcOptions rpcOptions;
    // True between start() and stop().
    private volatile boolean running;

    /**
     * Ends the tracking of keys written with {@link Flag#PUT_FOR_STATE_TRANSFER} in the commit manager.
     *
     * @param topologyId id of the topology whose state transfer ended; only used in the trace message
     */
    @Override
    public void stopApplyingState(int topologyId) {
        if (trace) {
            log.tracef("Stop keeping track of changed keys for state transfer in topology %d", topologyId);
        }
        commitManager.stopTrack(Flag.PUT_FOR_STATE_TRANSFER);
    }

    /**
     * Tells whether at least one inbound transfer is still registered.
     *
     * @return {@code true} when {@code transfersBySource} is not empty; the map is read while holding
     *         {@code transferMapsLock}
     */
    public boolean hasActiveTransfers() {
        synchronized (transferMapsLock) {
            boolean noTransfers = transfersBySource.isEmpty();
            return !noTransfers;
        }
    }

    /**
     * @return {@code true} while {@code stateTransferTopologyId} holds a topology id rather than
     *         {@link #NO_STATE_TRANSFER_IN_PROGRESS}
     */
    @Override
    public boolean isStateTransferInProgress() {
        int currentTopologyId = stateTransferTopologyId.get();
        return currentTopologyId != NO_STATE_TRANSFER_IN_PROGRESS;
    }

    /**
     * Checks whether the local node is a write owner of {@code key} without being a read owner yet,
     * according to the distribution manager's current topology.
     *
     * @param key the key to look up
     * @return {@code false} in invalidation mode; otherwise {@code true} exactly when the local node is a
     *         write owner but not a read owner of {@code key}
     */
    @Override
    public boolean isStateTransferInProgressForKey(Object key) {
        if (!isInvalidationMode) {
            DistributionInfo ownership = distributionManager.getCacheTopology().getDistribution(key);
            return ownership.isWriteOwner() && !ownership.isReadOwner();
        }
        return false;
    }

    /**
     * @return the {@code ownsData} flag, which {@code onTopologyUpdate} keeps equal to the local node's
     *         membership in the latest topology
     */
    @Override
    public boolean ownsData() {
        return ownsData;
    }

    /**
     * Installs {@code cacheTopology} as the current topology and drives the inbound side of state
     * transfer for it.
     *
     * <p>In order, the method:
     * <ol>
     *   <li>updates {@code ownsData} to match the local node's membership;</li>
     *   <li>decides whether this update starts a rebalance (explicitly, or because a pending CH appeared
     *       where the previously installed topology had none) or a conflict resolution;</li>
     *   <li>installs the topology, adds the owned write segments to the data container and persistence
     *       manager, and restarts key tracking, all under the exclusive topology lock;</li>
     *   <li>cancels transfers for segments no longer owned and requests newly owned segments;</li>
     *   <li>ends the state transfer when the new topology's phase is no longer a rebalance phase;</li>
     *   <li>in all cases unblocks waiting commands, cleans up leaver transactions and, in the
     *       {@code NO_REBALANCE} phase, removes data for segments the node does not own.</li>
     * </ol>
     *
     * @param cacheTopology the topology to install
     * @param isRebalance   {@code true} if the caller is starting a rebalance with this topology
     * @return the future created for this update; it is completed here for the
     *         {@code READ_ALL_WRITE_ALL} and {@code READ_NEW_WRITE_ALL} phases, and by
     *         {@link #notifyEndOfStateTransferIfNeeded()} once no inbound transfers remain
     * @throws CacheException if the thread is interrupted while waiting for total order transactions or
     *                        while removing stale data
     */
    @Override
    public CompletableFuture<Void> onTopologyUpdate(CacheTopology cacheTopology, boolean isRebalance) {
        final boolean isMember = cacheTopology.getMembers().contains(this.rpcManager.getAddress());
        final boolean startConflictResolution = !isRebalance && cacheTopology.getPhase() == CacheTopology.Phase.CONFLICT_RESOLUTION;
        if (trace) {
            log.tracef("Received new topology for cache %s, isRebalance = %b, isMember = %b, topology = %s", this.cacheName, isRebalance, isMember, cacheTopology);
        }
        if (!this.ownsData && isMember) {
            this.ownsData = true;
        } else if (this.ownsData && !isMember) {
            this.ownsData = false;
        }

        // A pending CH that appears without an explicit rebalance request still has to start the rebalance.
        // The installed topology is null before the first update; that is treated as "no pending CH"
        // instead of dereferencing null.
        final CacheTopology installedTopology = this.cacheTopology;
        final boolean forceRebalance = !isRebalance && !startConflictResolution
                && cacheTopology.getPendingCH() != null
                && (installedTopology == null || installedTopology.getPendingCH() == null);
        if (forceRebalance && trace) {
            log.tracef("Forcing startRebalance = true");
        }
        final boolean startRebalance = isRebalance || forceRebalance;

        if (startRebalance) {
            // Only the first rebalance topology is recorded; later updates leave the id untouched.
            this.stateTransferTopologyId.compareAndSet(NO_STATE_TRANSFER_IN_PROGRESS, cacheTopology.getTopologyId());
            this.conflictManager.cancelVersionRequests();
            if (this.cacheNotifier.hasListener(DataRehashed.class)) {
                CompletionStages.join(this.cacheNotifier.notifyDataRehashed(cacheTopology.getCurrentCH(), cacheTopology.getPendingCH(), cacheTopology.getUnionCH(), cacheTopology.getTopologyId(), true));
            }
        }
        if (startConflictResolution) {
            this.stateTransferTopologyId.set(NO_STATE_TRANSFER_IN_PROGRESS);
        }
        this.awaitTotalOrderTransactions(cacheTopology, startRebalance);
        this.waitingForState.set(false);
        this.stateTransferFuture = new CompletableFuture<>();

        final ConsistentHash newWriteCh = cacheTopology.getWriteConsistentHash();
        final CacheTopology previousCacheTopology = this.cacheTopology;
        final ConsistentHash previousReadCh = previousCacheTopology != null ? previousCacheTopology.getCurrentCH() : null;
        final ConsistentHash previousWriteCh = previousCacheTopology != null ? previousCacheTopology.getWriteConsistentHash() : null;

        // Install the topology under the exclusive lock. The lock is released in a finally block so that
        // a failure in the hook, the data container or the persistence manager cannot leave it held.
        final IntSet newWriteSegments;
        this.stateTransferLock.acquireExclusiveTopologyLock();
        try {
            this.beforeTopologyInstalled(cacheTopology.getTopologyId(), startRebalance, previousWriteCh, newWriteCh);
            this.cacheTopology = cacheTopology;
            this.distributionManager.setCacheTopology(cacheTopology);
            newWriteSegments = this.getOwnedSegments(newWriteCh);
            this.dataContainer.addSegments(newWriteSegments);
            this.persistenceManager.addSegments(newWriteSegments);
            if (startRebalance || startConflictResolution) {
                if (trace) {
                    log.tracef("Start keeping track of keys for rebalance");
                }
                // Restart tracking from a clean slate.
                this.commitManager.stopTrack(Flag.PUT_FOR_STATE_TRANSFER);
                this.commitManager.startTrack(Flag.PUT_FOR_STATE_TRANSFER);
            }
        } finally {
            this.stateTransferLock.releaseExclusiveTopologyLock();
        }
        this.stateTransferLock.notifyTopologyInstalled(cacheTopology.getTopologyId());
        this.remoteCommandsExecutor.checkForReadyTasks();

        final boolean wasMember = previousWriteCh != null && previousWriteCh.getMembers().contains(this.rpcManager.getAddress());
        try {
            if (!wasMember && isMember) {
                this.fetchClusterListeners(cacheTopology);
            }
            if (!startConflictResolution && (this.isTransactional || this.isFetchEnabled)) {
                IntSet addedSegments;
                IntSet removedSegments;
                if (previousWriteCh == null) {
                    // First topology: nothing was owned before, and nothing is requested here.
                    removedSegments = IntSets.immutableEmptySet();
                    addedSegments = IntSets.immutableEmptySet();
                    if (trace) {
                        log.tracef("On cache %s we have: added segments: %s", this.cacheName, addedSegments);
                    }
                } else {
                    IntSet previousSegments = this.getOwnedSegments(previousWriteCh);
                    if (newWriteSegments.size() == newWriteCh.getNumSegments()) {
                        // The node owns every segment, so none can have been removed.
                        removedSegments = IntSets.immutableEmptySet();
                    } else {
                        removedSegments = IntSets.mutableCopyFrom(previousSegments);
                        removedSegments.removeAll(newWriteSegments);
                    }
                    addedSegments = IntSets.mutableCopyFrom(newWriteSegments);
                    addedSegments.removeAll(previousSegments);
                    if (trace) {
                        log.tracef("On cache %s we have: new segments: %s; old segments: %s", this.cacheName, newWriteSegments, previousSegments);
                        log.tracef("On cache %s we have: added segments: %s; removed segments: %s", this.cacheName, addedSegments, removedSegments);
                    }
                    this.cancelTransfers(removedSegments);
                    if (!(startRebalance || addedSegments.isEmpty() || this.configuration.clustering().cacheMode().isScattered())) {
                        log.debugf("Not requesting segments %s because the last owner left the cluster", addedSegments);
                        addedSegments.clear();
                    }
                    this.restartBrokenTransfers(cacheTopology, addedSegments);
                }
                this.handleSegments(startRebalance, addedSegments, removedSegments);
            }

            final int currentStateTransferTopologyId = this.stateTransferTopologyId.get();
            if (trace) {
                log.tracef("Topology update processed, stateTransferTopologyId = %d, startRebalance = %s, pending CH = %s", currentStateTransferTopologyId, startRebalance, cacheTopology.getPendingCH());
            }
            // The state transfer ends when a non-rebalance topology arrives while one was in progress.
            // The compareAndSet makes sure only one caller performs the end-of-transfer work.
            if (currentStateTransferTopologyId != NO_STATE_TRANSFER_IN_PROGRESS
                    && !startRebalance
                    && !cacheTopology.getPhase().isRebalance()
                    && this.stateTransferTopologyId.compareAndSet(currentStateTransferTopologyId, NO_STATE_TRANSFER_IN_PROGRESS)) {
                this.stopApplyingState(currentStateTransferTopologyId);
                ConsistentHash nextConsistentHash = cacheTopology.getPendingCH();
                if (nextConsistentHash == null) {
                    nextConsistentHash = cacheTopology.getCurrentCH();
                }
                if (this.cacheNotifier.hasListener(DataRehashed.class)) {
                    CompletionStages.join(this.cacheNotifier.notifyDataRehashed(previousReadCh, nextConsistentHash, previousWriteCh, cacheTopology.getTopologyId(), false));
                }
                if (trace) {
                    log.tracef("Unlock State Transfer in Progress for topology ID %s", cacheTopology.getTopologyId());
                }
                if (this.isTotalOrder) {
                    this.totalOrderManager.notifyStateTransferEnd();
                }
            }
        }
        finally {
            this.stateTransferLock.notifyTransactionDataReceived(cacheTopology.getTopologyId());
            this.remoteCommandsExecutor.checkForReadyTasks();
            // Set the flag only now, after handleSegments had the chance to register the transfers.
            if (this.stateTransferTopologyId.get() != NO_STATE_TRANSFER_IN_PROGRESS && isMember) {
                this.waitingForState.set(true);
            }
            this.notifyEndOfStateTransferIfNeeded();
            try {
                if (this.transactionTable != null) {
                    this.transactionTable.cleanupLeaverTransactions(this.rpcManager.getTransport().getMembers());
                }
            }
            catch (Exception e) {
                // A failed cleanup is logged and must not fail the topology update.
                log.transactionCleanupError(e);
            }
            this.commandAckCollector.onMembersChange(newWriteCh.getMembers());
            switch (cacheTopology.getPhase()) {
                case READ_ALL_WRITE_ALL:
                case READ_NEW_WRITE_ALL: {
                    this.stateTransferFuture.complete(null);
                    break;
                }
                default: {
                    // Other phases are not completed here.
                    break;
                }
            }
            if ((isMember || wasMember) && cacheTopology.getPhase() == CacheTopology.Phase.NO_REBALANCE) {
                // Drop data of every segment the node does not own in the new write CH.
                final int numSegments = newWriteCh.getNumSegments();
                IntSet unownedSegments = IntSets.mutableEmptySet(numSegments);
                IntSet ownedSegments = this.getOwnedSegments(newWriteCh);
                for (int segment = 0; segment < numSegments; ++segment) {
                    if (!ownedSegments.contains(segment)) {
                        unownedSegments.add(segment);
                    }
                }
                try {
                    this.removeStaleData(unownedSegments);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CacheException(e);
                }
                this.conflictManager.restartVersionRequests();
            }
        }
        return this.stateTransferFuture;
    }

    /**
     * For distributed and scattered caches, obtains the cluster listener callables via
     * {@code getClusterListeners} and runs each of them against the local cache. A failure of one
     * callable is logged and does not prevent the remaining ones from running.
     *
     * @param cacheTopology the topology passed on to {@code getClusterListeners}
     */
    private void fetchClusterListeners(CacheTopology cacheTopology) {
        CacheMode mode = configuration.clustering().cacheMode();
        if (!mode.isDistributed() && !mode.isScattered()) {
            return;
        }
        Collection<ClusterListenerReplicateCallable<Object, Object>> listenerCallables = getClusterListeners(cacheTopology);
        Cache<Object, Object> wiredCache = cache.wired();
        for (ClusterListenerReplicateCallable<Object, Object> listenerCallable : listenerCallables) {
            try {
                listenerCallable.accept(wiredCache.getCacheManager(), wiredCache);
            }
            catch (Exception e) {
                log.clusterListenerInstallationFailure(e);
            }
        }
    }

    /**
     * Extension hook that {@code onTopologyUpdate} calls right after acquiring the exclusive topology
     * lock and before the new topology is stored. This implementation does nothing.
     *
     * @param topologyId      id of the topology about to be installed
     * @param startRebalance  whether this update starts a rebalance
     * @param previousWriteCh write consistent hash of the previously installed topology, or {@code null}
     *                        if none was installed
     * @param newWriteCh      write consistent hash of the topology about to be installed
     */
    protected void beforeTopologyInstalled(int topologyId, boolean startRebalance, ConsistentHash previousWriteCh, ConsistentHash newWriteCh) {
    }

    /**
     * Registers inbound transfers for the newly owned segments, if there are any.
     *
     * @param startRebalance  whether the topology update starts a rebalance (not used here)
     * @param addedSegments   segments the node owns now but did not own before
     * @param removedSegments segments the node no longer owns (not used here)
     */
    protected void handleSegments(boolean startRebalance, IntSet addedSegments, IntSet removedSegments) {
        if (addedSegments.isEmpty()) {
            return;
        }
        addTransfers(addedSegments);
    }

    /**
     * In total order caches, notifies the total order manager that a state transfer starts and blocks
     * until every latch it returns is released. Does nothing for other caches.
     *
     * @param cacheTopology the topology being installed
     * @param isRebalance   whether the topology update starts a rebalance
     * @throws CacheException if the thread is interrupted while waiting; the interrupt flag is restored
     */
    private void awaitTotalOrderTransactions(CacheTopology cacheTopology, boolean isRebalance) {
        if (!isTotalOrder) {
            return;
        }
        if (trace) {
            log.trace("State Transfer in Total Order cache. Waiting for remote transactions to finish");
        }
        try {
            int topologyId = cacheTopology.getTopologyId();
            for (TotalOrderLatch pendingLatch : totalOrderManager.notifyStateTransferStart(topologyId, isRebalance)) {
                pendingLatch.awaitUntilUnBlock();
            }
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new CacheException((Throwable)interrupted);
        }
        if (trace) {
            log.trace("State Transfer in Total Order cache. All remote transactions are finished. Moving on...");
        }
    }

    /**
     * Completes the current state transfer if the node was waiting for state and no inbound transfers
     * remain. The {@code waitingForState} flag is cleared with a compare-and-set, so only one caller
     * stops the key tracking and completes {@code stateTransferFuture}.
     *
     * <p>NOTE(review): the method returns {@code false} even when this call delivered the notification,
     * and {@code true} only when the node was not waiting for state on entry. That is preserved from the
     * existing behavior; confirm against the callers before relying on the return value.
     *
     * @return {@code true} if {@code waitingForState} was already {@code false} on entry, {@code false}
     *         otherwise
     */
    protected boolean notifyEndOfStateTransferIfNeeded() {
        if (!this.waitingForState.get()) {
            if (trace) {
                log.tracef("No end of state transfer notification, waitingForState already set to false by another thread");
            }
            return true;
        }
        if (this.hasActiveTransfers()) {
            if (trace) {
                log.tracef("No end of state transfer notification, active transfers still exist");
            }
            return false;
        }
        if (this.waitingForState.compareAndSet(true, false)) {
            int topologyId = this.stateTransferTopologyId.get();
            log.debugf("Finished receiving of segments for cache %s for topology %d.", this.cacheName, topologyId);
            this.stopApplyingState(topologyId);
            this.stateTransferFuture.complete(null);
            // This thread delivered the notification, so the "another thread" trace below does not apply.
            return false;
        }
        if (trace) {
            log.tracef("No end of state transfer notification, waitingForState already set to false by another thread");
        }
        return false;
    }

    /**
     * Returns the segments that the local node owns in {@code consistentHash}.
     *
     * @param consistentHash the consistent hash to inspect
     * @return the local node's segments, or an immutable empty set if the local node is not a member of
     *         {@code consistentHash}
     */
    protected IntSet getOwnedSegments(ConsistentHash consistentHash) {
        Address localAddress = rpcManager.getAddress();
        if (!consistentHash.getMembers().contains(localAddress)) {
            return IntSets.immutableEmptySet();
        }
        return IntSets.from(consistentHash.getSegmentsForOwner(localAddress));
    }

    /**
     * Applies state chunks received from {@code sender}. Each chunk is handed to the state transfer
     * executor, and the calling thread waits until all chunks are processed or the state transfer
     * timeout elapses.
     *
     * <p>The state is discarded without being applied when the local node is not a member of the current
     * write consistent hash, when no state transfer is in progress (unless {@code pushTransfer} is set),
     * or when {@code topologyId} is older than the topology of the state transfer in progress.
     *
     * @param sender       the node that sent the state
     * @param topologyId   the topology id the state was sent for
     * @param pushTransfer {@code true} if the sender pushed the state without a prior request; such
     *                     chunks are applied without looking up an inbound transfer task
     * @param stateChunks  the chunks to apply; pushed chunks without cache entries are skipped
     * @throws TimeoutException if the chunks are not all processed within the configured timeout
     * @throws CacheException   if the thread is interrupted while waiting; the interrupt flag is restored
     */
    @Override
    public void applyState(Address sender, int topologyId, boolean pushTransfer, Collection<StateChunk> stateChunks) {
        ConsistentHash wCh = this.cacheTopology.getWriteConsistentHash();
        if (!wCh.getMembers().contains(this.rpcManager.getAddress())) {
            if (trace) {
                log.tracef("Ignoring received state because we are no longer a member of cache %s", this.cacheName);
            }
            return;
        }
        int rebalanceTopologyId = this.stateTransferTopologyId.get();
        if (rebalanceTopologyId == NO_STATE_TRANSFER_IN_PROGRESS && !pushTransfer) {
            log.debugf("Discarding state response with topology id %d for cache %s, we don't have a state transfer in progress", topologyId, this.cacheName);
            return;
        }
        if (topologyId < rebalanceTopologyId) {
            // Report the topology id of the state transfer in progress. The previous message formatted
            // the waitingForState AtomicBoolean with %b, which prints "true" for any non-null object.
            log.debugf("Discarding state response with old topology id %d for cache %s, state transfer request topology was %d", topologyId, this.cacheName, rebalanceTopologyId);
            return;
        }
        if (trace) {
            log.tracef("Before applying the received state the data container of cache %s has %d keys", this.cacheName, this.dataContainer.sizeIncludingExpired());
        }
        // One count per chunk: every chunk must count down exactly once, whether it is skipped,
        // applied successfully, or fails.
        final CountDownLatch countDownLatch = new CountDownLatch(stateChunks.size());
        if (pushTransfer) {
            for (StateChunk stateChunk : stateChunks) {
                if (stateChunk.getCacheEntries() == null) {
                    // Nothing to apply; still account for the chunk so the await below can finish.
                    countDownLatch.countDown();
                    continue;
                }
                this.stateTransferExecutor.submit(() -> {
                    try {
                        this.doApplyState(sender, stateChunk.getSegmentId(), stateChunk.getCacheEntries());
                    }
                    catch (Throwable e) {
                        log.error("Failed applying state", e);
                    }
                    finally {
                        countDownLatch.countDown();
                    }
                });
            }
        } else {
            IntSet mySegments = IntSets.from(wCh.getSegmentsForOwner(this.rpcManager.getAddress()));
            for (StateChunk stateChunk : stateChunks) {
                this.stateTransferExecutor.submit(() -> {
                    try {
                        this.applyChunk(sender, mySegments, stateChunk);
                    }
                    catch (Throwable e) {
                        log.error("Failed applying state", e);
                    }
                    finally {
                        countDownLatch.countDown();
                    }
                    log.tracef("Latch %d", countDownLatch.getCount());
                });
            }
        }
        try {
            boolean allChunksProcessed = countDownLatch.await(this.timeout, TimeUnit.MILLISECONDS);
            if (!allChunksProcessed) {
                throw new TimeoutException("Timed out applying state");
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CacheException(e);
        }
        if (trace) {
            log.tracef("After applying the received state the data container of cache %s has %d keys", this.cacheName, this.dataContainer.sizeIncludingExpired());
            synchronized (this.transferMapsLock) {
                log.tracef("Segments not received yet for cache %s: %s", this.cacheName, this.transfersBySource);
            }
        }
    }

    /**
     * Applies a single requested state chunk. The chunk is dropped with a warning when its segment is
     * not in {@code mySegments}. Otherwise the inbound transfer task registered for the chunk's segment
     * and {@code sender} is looked up under {@code transferMapsLock}; if one exists the entries (when
     * present) are applied and the task is told that state was received. If none exists the chunk is
     * reported as unsolicited, provided the cache still allows invocations.
     *
     * @param sender     the node that sent the chunk
     * @param mySegments the segments the local node owns in the write consistent hash
     * @param stateChunk the chunk to apply
     */
    private void applyChunk(Address sender, IntSet mySegments, StateChunk stateChunk) {
        int segmentId = stateChunk.getSegmentId();
        if (!mySegments.contains(segmentId)) {
            log.warnf("Discarding received cache entries for segment %d of cache %s because they do not belong to this node.", segmentId, cacheName);
            return;
        }

        InboundTransferTask matchingTransfer = null;
        synchronized (transferMapsLock) {
            List<InboundTransferTask> segmentTransfers = transfersBySegment.get(segmentId);
            if (segmentTransfers != null) {
                // First task whose source is the sender, if any.
                for (InboundTransferTask candidate : segmentTransfers) {
                    if (candidate.getSource().equals(sender)) {
                        matchingTransfer = candidate;
                        break;
                    }
                }
            }
        }

        if (matchingTransfer == null) {
            if (cache.wired().getStatus().allowInvocations()) {
                log.ignoringUnsolicitedState(sender, segmentId, cacheName);
            }
            return;
        }
        if (stateChunk.getCacheEntries() != null) {
            doApplyState(sender, segmentId, stateChunk.getCacheEntries());
        }
        matchingTransfer.onStateReceived(segmentId, stateChunk.isLastChunk());
    }

    /**
     * Writes the received entries of one segment into the local cache. Each entry is written with a
     * {@code PutKeyValueCommand} carrying {@link #STATE_TRANSFER_FLAGS}, invoked through the interceptor
     * chain. When a transaction manager is available, every entry is written in its own transaction.
     *
     * <p>A failure on one entry is logged and the next entry is processed, unless the cache no longer
     * allows invocations, in which case the remaining entries are skipped.
     *
     * @param sender       the node the entries came from; only used for trace logging
     * @param segmentId    the segment the entries belong to
     * @param cacheEntries the entries to write
     */
    private void doApplyState(Address sender, int segmentId, Collection<InternalCacheEntry> cacheEntries) {
        if (trace) {
            log.tracef("Applying new state chunk for segment %d of cache %s from node %s: received %d cache entries", segmentId, this.cacheName, sender, cacheEntries.size());
        }
        final boolean transactional = this.transactionManager != null;
        for (InternalCacheEntry e : cacheEntries) {
            try {
                InvocationContext ctx;
                if (transactional) {
                    this.transactionManager.begin();
                    ctx = this.icf.createInvocationContext(this.transactionManager.getTransaction(), true);
                    ((AbstractCacheTransaction)((TxInvocationContext)ctx).getCacheTransaction()).setStateTransferFlag(Flag.PUT_FOR_STATE_TRANSFER);
                } else {
                    ctx = this.icf.createSingleKeyNonTxInvocationContext();
                }
                InternalMetadataImpl metadata = new InternalMetadataImpl(e);
                PutKeyValueCommand put = this.commandsFactory.buildPutKeyValueCommand(e.getKey(), e.getValue(), segmentId, metadata, STATE_TRANSFER_FLAGS);
                ctx.setLockOwner(put.getKeyLockOwner());
                this.interceptorChain.invoke(ctx, put);
                if (transactional) {
                    this.transactionManager.commit();
                }
            }
            catch (Exception ex) {
                if (!this.cache.wired().getStatus().allowInvocations()) {
                    log.debugf("Cache %s is shutting down, stopping state transfer", this.cacheName);
                    // The finally block below completes normally, so this break really leaves the loop.
                    break;
                }
                log.problemApplyingStateForKey(ex.getMessage(), e.getKey(), ex);
            }
            finally {
                // Roll back a transaction that is still associated with the thread (the commit did not
                // happen or failed). No continue/break/return here: an abrupt exit from a finally block
                // would discard the break issued by the catch block.
                try {
                    if (transactional && this.transactionManager.getTransaction() != null) {
                        this.transactionManager.rollback();
                    }
                }
                catch (SystemException rollbackFailure) {
                    // Best effort: a failed rollback must not stop the remaining entries.
                    if (trace) {
                        log.tracef(rollbackFailure, "Failed to roll back state transfer transaction for segment %d of cache %s", segmentId, this.cacheName);
                    }
                }
            }
        }
        if (trace) {
            log.tracef("Finished applying chunk of segment %d of cache %s", segmentId, this.cacheName);
        }
    }

    /**
     * Registers backup locks for the transactions transferred from {@code sender}. Transactions that
     * originated on the local node are skipped. For the others, the local or remote transaction is looked
     * up in the transaction table, or a remote transaction is created with its looked-up-entries topology
     * set to {@code topologyId - 1}. A creation failure is only traced. Nothing is applied unless the
     * cache is transactional.
     *
     * @param sender       the node that transferred the transactions
     * @param transactions the transferred transaction descriptions
     * @param topologyId   the topology id the transactions were transferred for
     */
    private void applyTransactions(Address sender, Collection<TransactionInfo> transactions, int topologyId) {
        log.debugf("Applying %d transactions for cache %s transferred from node %s", transactions.size(), this.cacheName, sender);
        if (!isTransactional) {
            return;
        }
        for (TransactionInfo info : transactions) {
            GlobalTransaction gtx = info.getGlobalTransaction();
            if (rpcManager.getAddress().equals(gtx.getAddress())) {
                // Locally originated transaction.
                continue;
            }
            gtx.setRemote(true);

            AbstractCacheTransaction tx = transactionTable.getLocalTransaction(gtx);
            if (tx == null) {
                tx = transactionTable.getRemoteTransaction(gtx);
            }
            if (tx == null) {
                try {
                    tx = transactionTable.getOrCreateRemoteTransaction(gtx, info.getModifications());
                    ((RemoteTransaction)tx).setLookedUpEntriesTopology(topologyId - 1);
                }
                catch (Throwable t) {
                    if (trace) {
                        log.tracef(t, "Failed to create remote transaction %s", gtx);
                    }
                }
            }
            if (tx != null) {
                info.getLockedKeys().forEach(tx::addBackupLockForKey);
            }
        }
    }

    /**
     * Caches the configuration-derived settings, creates the RPC options and the single-slot state
     * request executor, and marks the consumer as running.
     */
    @Start(priority=20)
    public void start() {
        cacheName = cache.wired().getName();

        CacheMode mode = configuration.clustering().cacheMode();
        isInvalidationMode = mode.isInvalidation();
        isTransactional = configuration.transaction().transactionMode().isTransactional();
        isTotalOrder = configuration.transaction().transactionProtocol().isTotalOrder();
        timeout = configuration.clustering().stateTransfer().timeout();
        // State is fetched only if the cache mode needs state transfer and either in-memory or
        // persistent state fetching is enabled.
        isFetchEnabled = mode.needsStateTransfer()
                && (configuration.clustering().stateTransfer().fetchInMemoryState()
                        || configuration.persistence().fetchPersistentState());

        rpcOptions = new RpcOptions(DeliverOrder.NONE, timeout, TimeUnit.MILLISECONDS);
        stateRequestExecutor = new LimitedExecutor("StateRequest-" + cacheName, stateTransferExecutor, 1);
        running = true;
    }

    /**
     * Marks the consumer as stopped, clears both transfer maps and cancels every registered inbound
     * transfer while holding {@code transferMapsLock}, then shuts down the state request executor. Any
     * failure is logged rather than propagated.
     */
    @Override
    @Stop(priority=0)
    public void stop() {
        if (trace) {
            log.tracef("Shutting down StateConsumer of cache %s on node %s", cacheName, rpcManager.getAddress());
        }
        running = false;
        try {
            synchronized (transferMapsLock) {
                // Copy the task lists first so they can still be cancelled after the maps are cleared.
                List<List<InboundTransferTask>> pendingTransfers = new ArrayList<>(transfersBySource.values());
                transfersBySource.clear();
                transfersBySegment.clear();
                for (List<InboundTransferTask> tasksOfSource : pendingTransfers) {
                    for (InboundTransferTask task : tasksOfSource) {
                        task.cancel();
                    }
                }
            }
            stateRequestExecutor.shutdownNow();
        }
        catch (Throwable t) {
            log.errorf(t, "Failed to stop StateConsumer of cache %s on node %s", cacheName, rpcManager.getAddress());
        }
    }

    /**
     * Returns the value currently held in the {@code cacheTopology} field.
     *
     * @return the current cache topology reference
     */
    @Override
    public CacheTopology getCacheTopology() {
        return cacheTopology;
    }

    /**
     * Installs the listener that {@code removeStaleData} notifies (when non-null) before it
     * removes entries of segments that are no longer owned.
     *
     * @param keyInvalidationListener the listener to store; replaces any previous one
     */
    public void setKeyInvalidationListener(KeyInvalidationListener keyInvalidationListener) {
        this.keyInvalidationListener = keyInvalidationListener;
    }

    /**
     * Starts inbound state transfer for the given segments.
     *
     * <p>For transactional caches that do not use total order, transaction information is
     * requested first. If fetching is enabled, the segments themselves are requested
     * afterwards. Both steps share the same {@code sources} map and {@code excludedSources}
     * set, so sources chosen or excluded by the first step are visible to the second.
     *
     * @param segments the segments to start transferring
     */
    private void addTransfers(IntSet segments) {
        log.debugf("Adding inbound state transfer for segments %s", segments);
        Set<Address> excludedSources = new HashSet<>();
        Map<Address, IntSet> sources = new HashMap<>();
        if (isTransactional && !isTotalOrder) {
            requestTransactions(segments, sources, excludedSources);
        }
        if (isFetchEnabled) {
            requestSegments(segments, sources, excludedSources);
        }
        if (trace) {
            // The cache name was passed without a matching placeholder and was silently dropped.
            log.tracef("Finished adding inbound state transfer for segments %s of cache %s", segments, cacheName);
        }
    }

    /**
     * Picks a source node for each of the given segments and records the choice in
     * {@code sources} (source address to the set of segments to fetch from it).
     *
     * <p>Does nothing if the cache status is terminated. Segments for which
     * {@link #findSource(int, Set)} returns {@code null} are collected and reported through
     * a single log call.
     *
     * @param segments        the segments that need a source
     * @param sources         output map, updated in place
     * @param excludedSources nodes that must not be chosen
     */
    private void findSources(IntSet segments, Map<Address, IntSet> sources, Set<Address> excludedSources) {
        if (cache.wired().getStatus().isTerminated()) {
            return;
        }
        final int segmentCount = configuration.clustering().hash().numSegments();
        IntSet orphanedSegments = IntSets.mutableEmptySet(segmentCount);
        for (PrimitiveIterator.OfInt it = segments.iterator(); it.hasNext(); ) {
            int segmentId = it.nextInt();
            Address source = findSource(segmentId, excludedSources);
            if (source == null) {
                orphanedSegments.set(segmentId);
            } else {
                sources.computeIfAbsent(source, k -> IntSets.mutableEmptySet(segmentCount)).set(segmentId);
            }
        }
        if (!orphanedSegments.isEmpty()) {
            log.noLiveOwnersFoundForSegments((Collection<Integer>)orphanedSegments, cacheName, excludedSources);
        }
    }

    /**
     * Chooses a node to fetch the given segment from.
     *
     * <p>Looks up the segment's owners in the read consistent hash of the current topology.
     * If the local node is already among them, no source is returned. Otherwise the first
     * owner, in the order returned by the consistent hash, that is neither the local node
     * nor excluded is chosen.
     *
     * @param segmentId       the segment to locate
     * @param excludedSources nodes that must not be chosen
     * @return the chosen source, or {@code null} if none qualifies
     */
    private Address findSource(int segmentId, Set<Address> excludedSources) {
        List<Address> owners = cacheTopology.getReadConsistentHash().locateOwnersForSegment(segmentId);
        if (owners.contains(rpcManager.getAddress())) {
            return null;
        }
        for (Address candidate : owners) {
            boolean usable = !candidate.equals(rpcManager.getAddress()) && !excludedSources.contains(candidate);
            if (usable) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Requests transaction information for the given segments and applies it, retrying
     * failed segments against newly selected sources.
     *
     * <p>Sources are first selected with {@link #findSources}. Each round asks every source
     * for its segments. Segments whose request failed are collected, the {@code sources}
     * map is cleared and re-populated for just those segments, and another round starts.
     * The loop ends when a round has no failures. A source is added to
     * {@code excludedSources} only when it answers with a {@code CacheNotFoundResponse} or
     * a {@code SuspectException} is thrown. If any round failed, {@code sources} is left
     * empty on return. If the cache is terminated when a request throws, {@code sources}
     * is cleared and the method returns immediately.
     *
     * <p>NOTE(review): a source that keeps failing without being excluded can be selected
     * again by the next round; confirm this retry behaviour is intended.
     *
     * @param segments        the segments to request transactions for
     * @param sources         source-to-segments map, updated in place
     * @param excludedSources nodes that must not be used, updated in place
     */
    private void requestTransactions(IntSet segments, Map<Address, IntSet> sources, Set<Address> excludedSources) {
        findSources(segments, sources, excludedSources);
        boolean anyRoundFailed = false;
        for (;;) {
            IntSet segmentsToRetry = IntSets.mutableEmptySet(configuration.clustering().hash().numSegments());
            int topologyId = cacheTopology.getTopologyId();
            for (Map.Entry<Address, IntSet> entry : sources.entrySet()) {
                Address source = entry.getKey();
                IntSet requestedSegments = entry.getValue();
                boolean failed = false;
                boolean exclude = false;
                try {
                    Response response = getTransactions(source, requestedSegments, topologyId);
                    if (response instanceof SuccessfulResponse) {
                        List transactions = (List)((SuccessfulResponse)response).getResponseValue();
                        applyTransactions(source, transactions, topologyId);
                    } else if (response instanceof CacheNotFoundResponse) {
                        log.debugf("Cache %s was stopped on node %s before sending transaction information", cacheName, source);
                        failed = true;
                        exclude = true;
                    } else {
                        log.unsuccessfulResponseRetrievingTransactionsForSegments(source, response);
                        failed = true;
                    }
                }
                catch (SuspectException e) {
                    log.debugf("Node %s left the cluster before sending transaction information", source);
                    failed = true;
                    exclude = true;
                }
                catch (Exception e) {
                    if (cache.wired().getStatus().isTerminated()) {
                        log.debugf("Cache %s has stopped while requesting transactions", cacheName);
                        sources.clear();
                        return;
                    }
                    log.failedToRetrieveTransactionsForSegments(cacheName, source, (Collection<Integer>)segments, e);
                    failed = true;
                }
                if (failed) {
                    segmentsToRetry.addAll(requestedSegments);
                }
                if (exclude) {
                    excludedSources.add(source);
                }
            }
            if (segmentsToRetry.isEmpty()) {
                break;
            }
            // Look for other sources for the failed segments only.
            anyRoundFailed = true;
            sources.clear();
            findSources(segmentsToRetry, sources, excludedSources);
        }
        if (anyRoundFailed) {
            sources.clear();
        }
    }

    /**
     * Asks the other members of the given topology, one at a time, for their cluster
     * listeners and returns the first successful answer.
     *
     * <p>The local node is skipped. An unsuccessful response or a {@link CacheException}
     * is logged and the next member is tried. If no member answers successfully, an empty
     * set is returned.
     *
     * @param topology the topology whose members are queried
     * @return the response value of the first successful reply, or an empty collection
     */
    private Collection<ClusterListenerReplicateCallable<Object, Object>> getClusterListeners(CacheTopology topology) {
        for (Address member : topology.getMembers()) {
            if (member.equals(rpcManager.getAddress())) {
                continue;
            }
            if (trace) {
                log.tracef("Requesting cluster listeners of cache %s from node %s", cacheName, member);
            }
            try {
                StateRequestCommand request = commandsFactory.buildStateRequestCommand(
                        StateRequestCommand.Type.GET_CACHE_LISTENERS, rpcManager.getAddress(), topology.getTopologyId(), null);
                Response response = rpcManager.blocking(
                        rpcManager.invokeCommand(member, (ReplicableCommand)request, SingleResponseCollector.validOnly(), rpcOptions));
                if (!(response instanceof SuccessfulResponse)) {
                    log.unsuccessfulResponseForClusterListeners(member, response);
                    continue;
                }
                return (Collection)((SuccessfulResponse)response).getResponseValue();
            }
            catch (CacheException e) {
                log.exceptionDuringClusterListenerRetrieval(member, e);
            }
        }
        if (trace) {
            log.trace("Unable to acquire cluster listeners from other members, assuming none are present");
        }
        return Collections.emptySet();
    }

    /**
     * Sends a {@code GET_TRANSACTIONS} state request to the given node and blocks for the
     * response.
     *
     * @param source     the node to ask
     * @param segments   the segments whose transactions are requested
     * @param topologyId the topology id to put in the request
     * @return the response returned by the blocking RPC call
     */
    private Response getTransactions(Address source, IntSet segments, int topologyId) {
        if (trace) {
            log.tracef("Requesting transactions from node %s for segments %s", source, segments);
        }
        StateRequestCommand request = commandsFactory.buildStateRequestCommand(
                StateRequestCommand.Type.GET_TRANSACTIONS, rpcManager.getAddress(), topologyId, segments);
        return rpcManager.blocking(
                rpcManager.invokeCommand(source, (ReplicableCommand)request, SingleResponseCollector.validOnly(), rpcOptions));
    }

    /**
     * Starts one inbound transfer per source for the given segments.
     *
     * <p>If {@code sources} is empty on entry, it is first populated through
     * {@link #findSources}; otherwise the existing source assignment is used as is.
     *
     * @param segments        the segments to request
     * @param sources         source-to-segments map, possibly pre-populated
     * @param excludedSources nodes that must not be used
     */
    private void requestSegments(IntSet segments, Map<Address, IntSet> sources, Set<Address> excludedSources) {
        if (sources.isEmpty()) {
            findSources(segments, sources, excludedSources);
        }
        sources.forEach((source, segmentsFromSource) -> addTransfer(source, segmentsFromSource));
    }

    /**
     * Cancels the inbound transfers of the given segments.
     *
     * <p>Runs entirely under {@code transferMapsLock}. The removed segments are processed
     * as a work list. For each segment that still has transfers registered, every such
     * transfer has its overlap with {@code removedSegments} cancelled. Those overlapping
     * segments are dropped from the work list and from {@code transfersBySegment}. A
     * transfer that reports itself cancelled afterwards is removed from the bookkeeping
     * through {@link #removeTransfer}.
     *
     * <p>NOTE(review): the decompiler flagged this method with "Removed try catching itself -
     * possible behaviour change"; compare with the upstream source.
     *
     * @param removedSegments the segments whose transfers must be cancelled
     */
    protected void cancelTransfers(IntSet removedSegments) {
        synchronized (transferMapsLock) {
            // Typed work list instead of a raw ArrayList with downcasts.
            List<Integer> segmentsToCancel = new ArrayList<>(removedSegments);
            while (!segmentsToCancel.isEmpty()) {
                int segmentId = segmentsToCancel.remove(0);
                List<InboundTransferTask> inboundTransfers = transfersBySegment.get(segmentId);
                if (inboundTransfers == null) {
                    continue;
                }
                for (InboundTransferTask inboundTransfer : inboundTransfers) {
                    // Segments of this transfer that are among the removed ones.
                    IntSet cancelledSegments = IntSets.mutableCopyFrom((Set<Integer>)removedSegments);
                    cancelledSegments.retainAll(inboundTransfer.getSegments());
                    segmentsToCancel.removeAll(cancelledSegments);
                    transfersBySegment.keySet().removeAll(cancelledSegments);
                    inboundTransfer.cancelSegments(cancelledSegments);
                    if (inboundTransfer.isCancelled()) {
                        removeTransfer(inboundTransfer);
                    }
                }
            }
        }
    }

    /**
     * Removes the entries of segments that are no longer owned.
     *
     * <p>Steps, in order: notify the key invalidation listener (if one is set); tell the
     * local publisher manager the segments were lost; remove the segments from the data
     * container. The method then returns early if {@code removedSegments} is empty or if
     * {@code persistenceManager.removeSegments} returns {@code false}. Otherwise it collects
     * from the persistence manager every key that is not in the data container and whose
     * segment is in {@code removedSegments}, and invalidates those keys locally through the
     * interceptor chain.
     *
     * @param removedSegments the segments whose data should be removed
     * @throws InterruptedException declared by the signature; not thrown explicitly in this body
     */
    protected void removeStaleData(IntSet removedSegments) throws InterruptedException {
        log.debugf("Removing no longer owned entries for cache %s", cacheName);
        if (keyInvalidationListener != null) {
            keyInvalidationListener.beforeInvalidation(removedSegments, IntSets.immutableEmptySet());
        }
        localPublisherManager.segmentsLost(removedSegments);
        dataContainer.removeSegments(removedSegments);
        if (removedSegments.isEmpty()) {
            return;
        }
        if (!persistenceManager.removeSegments(removedSegments)) {
            return;
        }

        Set<Object> staleKeys = ConcurrentHashMap.newKeySet();
        try {
            // Keep only keys absent from the data container that map to a removed segment.
            Predicate<Object> filter = key ->
                    !dataContainer.containsKey(key) && removedSegments.contains(getSegment(key));
            Publisher<Object> publisher = persistenceManager.publishKeys(filter, PersistenceManager.AccessMode.PRIVATE);
            Flowable.fromPublisher(publisher).blockingForEach(staleKeys::add);
        }
        catch (CacheException e) {
            log.failedLoadingKeysFromCacheStore((Exception)((Object)e));
        }

        if (staleKeys.isEmpty()) {
            return;
        }
        try {
            InvalidateCommand invalidateCmd = commandsFactory.buildInvalidateCommand(
                    EnumUtil.bitSetOf((Enum)Flag.CACHE_MODE_LOCAL, (Enum)Flag.SKIP_LOCKING), staleKeys.toArray());
            NonTxInvocationContext ctx = icf.createNonTxInvocationContext();
            ctx.setLockOwner(invalidateCmd.getKeyLockOwner());
            interceptorChain.invoke(ctx, invalidateCmd);
            if (trace) {
                log.tracef("Removed %d keys, data container now has %d keys", staleKeys.size(), dataContainer.sizeIncludingExpired());
            }
        }
        catch (IllegalLifecycleStateException ignored) {
            // Deliberately ignored. NOTE(review): presumably raised while the cache is shutting down - confirm.
        }
        catch (CacheException e) {
            log.failedToInvalidateKeys((Exception)((Object)e));
        }
    }

    /**
     * Cancels inbound transfers whose source is no longer a member of the given topology's
     * read consistent hash, and adds their unfinished segments to {@code addedSegments}.
     *
     * <p>Runs under {@code transferMapsLock}. For every departed source, its entry is
     * removed from {@code transfersBySource} and each of its transfers is cancelled. The
     * unfinished segments of those transfers are added to {@code addedSegments} and removed
     * from {@code transfersBySegment}. At the end, any segment that still has a transfer
     * registered in {@code transfersBySegment} is removed from {@code addedSegments}.
     *
     * <p>NOTE(review): the decompiler flagged this method with "Removed try catching itself -
     * possible behaviour change"; compare with the upstream source.
     *
     * @param cacheTopology the topology whose read consistent hash members are considered live
     * @param addedSegments set of segments to (re)request, updated in place
     */
    private void restartBrokenTransfers(CacheTopology cacheTopology, IntSet addedSegments) {
        Set<Address> liveMembers = new HashSet<>(cacheTopology.getReadConsistentHash().getMembers());
        synchronized (transferMapsLock) {
            for (Iterator<Map.Entry<Address, List<InboundTransferTask>>> it = transfersBySource.entrySet().iterator(); it.hasNext(); ) {
                Map.Entry<Address, List<InboundTransferTask>> entry = it.next();
                Address source = entry.getKey();
                if (!liveMembers.contains(source)) {
                    if (trace) {
                        log.tracef("Removing inbound transfers from source %s for cache %s", source, cacheName);
                    }
                    // Read the value before removing the entry through the iterator.
                    List<InboundTransferTask> brokenTransfers = entry.getValue();
                    it.remove();
                    for (InboundTransferTask brokenTransfer : brokenTransfers) {
                        if (trace) {
                            log.tracef("Removing inbound transfers from node %s for segments %s", source, brokenTransfer.getSegments());
                        }
                        IntSet unfinishedSegments = brokenTransfer.getUnfinishedSegments();
                        brokenTransfer.cancel();
                        addedSegments.addAll(unfinishedSegments);
                        transfersBySegment.keySet().removeAll((Collection<?>)unfinishedSegments);
                    }
                }
            }
            // Segments that still have a transfer registered do not need a new request.
            addedSegments.removeAll(transfersBySegment.keySet());
        }
    }

    /**
     * Returns the segment of the given key as computed by the {@code keyPartitioner}.
     *
     * @param key the key to look up
     * @return the segment id reported by the key partitioner
     */
    private int getSegment(Object key) {
        return keyPartitioner.getSegment(key);
    }

    /**
     * Registers and starts an inbound transfer from the given source.
     *
     * <p>Under {@code transferMapsLock}, segments that already have a transfer registered
     * are removed from {@code segmentsFromSource} (the argument is modified in place). If
     * nothing is left, no transfer is created. Otherwise a new task is created for the
     * current topology id and registered. Outside the lock, the segment request is submitted
     * to the state request executor, and {@link #onTaskCompletion} is invoked when the
     * returned future completes, normally or exceptionally.
     *
     * <p>NOTE(review): the decompiler flagged this method with "Removed try catching itself -
     * possible behaviour change"; compare with the upstream source.
     *
     * @param source             the node to fetch from
     * @param segmentsFromSource the segments to fetch; modified in place
     * @return the new task, or {@code null} if all segments were already in progress
     */
    private InboundTransferTask addTransfer(Address source, IntSet segmentsFromSource) {
        final InboundTransferTask task;
        synchronized (transferMapsLock) {
            if (trace) {
                log.tracef("Adding transfer from %s for segments %s", source, segmentsFromSource);
            }
            segmentsFromSource.removeAll(transfersBySegment.keySet());
            if (segmentsFromSource.isEmpty()) {
                if (trace) {
                    log.tracef("All segments are already in progress, skipping");
                }
                return null;
            }
            task = new InboundTransferTask(segmentsFromSource, source, cacheTopology.getTopologyId(),
                    rpcManager, commandsFactory, timeout, cacheName, true);
            addTransfer(task, segmentsFromSource);
        }
        stateRequestExecutor.executeAsync(() -> {
            CompletableFuture<Void> transferStarted = task.requestSegments();
            if (trace) {
                log.tracef("Waiting for inbound transfer to finish: %s", task);
            }
            return transferStarted.whenComplete((result, failure) -> onTaskCompletion(task));
        });
        return task;
    }

    /**
     * Records the given transfer in both bookkeeping maps: once per segment in
     * {@code transfersBySegment} and once under its source in {@code transfersBySource}.
     *
     * @param inboundTransfer the transfer to register
     * @param segments        the segments the transfer is registered for
     * @throws IllegalLifecycleStateException if this state consumer is not running
     */
    @GuardedBy(value="transferMapsLock")
    protected void addTransfer(InboundTransferTask inboundTransfer, IntSet segments) {
        if (!running) {
            throw new IllegalLifecycleStateException("State consumer is not running for cache " + cacheName);
        }
        for (PrimitiveIterator.OfInt it = segments.iterator(); it.hasNext(); ) {
            int segmentId = it.nextInt();
            transfersBySegment.computeIfAbsent(segmentId, s -> new ArrayList<>()).add(inboundTransfer);
        }
        transfersBySource.computeIfAbsent(inboundTransfer.getSource(), s -> new ArrayList<>()).add(inboundTransfer);
    }

    /**
     * Removes the given transfer from both bookkeeping maps.
     *
     * <p>Runs under {@code transferMapsLock}. The transfer is removed from its source's list
     * in {@code transfersBySource}, and from the list of every segment it reports in
     * {@code transfersBySegment}. A list that becomes empty has its map entry removed too.
     *
     * <p>NOTE(review): the decompiler flagged this method with "Removed try catching itself -
     * possible behaviour change"; compare with the upstream source.
     *
     * @param inboundTransfer the transfer to remove
     * @return {@code true} if the transfer was present in its source's list
     */
    protected boolean removeTransfer(InboundTransferTask inboundTransfer) {
        boolean found = false;
        synchronized (transferMapsLock) {
            Address source = inboundTransfer.getSource();
            if (trace) {
                // Arguments follow the placeholder order: node, segments, cache.
                log.tracef("Removing inbound transfers from node %s for segments %s of cache %s", source, inboundTransfer.getSegments(), cacheName);
            }
            List<InboundTransferTask> transfers = transfersBySource.get(source);
            if (transfers != null) {
                found = transfers.remove(inboundTransfer);
                if (found && transfers.isEmpty()) {
                    transfersBySource.remove(source);
                }
            }
            for (Integer segment : inboundTransfer.getSegments()) {
                List<InboundTransferTask> innerTransfers = transfersBySegment.get(segment);
                if (innerTransfers != null && innerTransfers.remove(inboundTransfer) && innerTransfers.isEmpty()) {
                    transfersBySegment.remove(segment);
                }
            }
        }
        return found;
    }

    /**
     * Callback invoked when an inbound transfer has finished.
     *
     * <p>Only a transfer that reports successful completion is removed from the
     * bookkeeping, followed by a check whether the end of state transfer should be
     * notified. Otherwise nothing is done here.
     *
     * @param inboundTransfer the transfer that finished
     */
    protected void onTaskCompletion(InboundTransferTask inboundTransfer) {
        if (trace) {
            log.tracef("Inbound transfer finished: %s", inboundTransfer);
        }
        if (!inboundTransfer.isCompletedSuccessfully()) {
            return;
        }
        removeTransfer(inboundTransfer);
        notifyEndOfStateTransferIfNeeded();
    }

    /**
     * Callback notified before entries of segments that are no longer owned are removed.
     */
    public interface KeyInvalidationListener {
        /**
         * Invoked before invalidation starts.
         *
         * @param var1 the segments about to be removed (the enclosing class passes its
         *             {@code removedSegments} argument here)
         * @param var2 a second segment set; the only call visible in the enclosing class
         *             passes an immutable empty set. NOTE(review): the parameter names were
         *             lost in decompilation - confirm its meaning against the upstream source
         */
        void beforeInvalidation(IntSet var1, IntSet var2);
    }
}

