/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicCheckUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;

public class GridCacheIoManager
extends GridCacheSharedManagerAdapter {
    private static final String QUERY_TOPIC_PREFIX = "QUERY";
    private static final AtomicLong idGen = new AtomicLong();
    private static final int MAX_STORED_PENDING_MESSAGES = 100;
    private long retryDelay;
    private int retryCnt;
    private final MessageHandlers cacheHandlers = new MessageHandlers();
    private final MessageHandlers grpHandlers = new MessageHandlers();
    private boolean stopping;
    private final StripedCompositeReadWriteLock rw = new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());
    private boolean depEnabled;
    private final List<GridCacheMessage> pendingMsgs = new ArrayList<GridCacheMessage>(100);
    private GridMessageListener lsnr = new GridMessageListener(){

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onMessage(final UUID nodeId, Object msg, final byte plc) {
            if (GridCacheIoManager.this.log.isDebugEnabled()) {
                GridCacheIoManager.this.log.debug("Received unordered cache communication message [nodeId=" + nodeId + ", locId=" + GridCacheIoManager.this.cctx.localNodeId() + ", msg=" + msg + ']');
            }
            final GridCacheMessage cacheMsg = (GridCacheMessage)msg;
            AffinityTopologyVersion rmtAffVer = cacheMsg.topologyVersion();
            AffinityTopologyVersion lastAffChangedVer = cacheMsg.lastAffinityChangedTopologyVersion();
            GridCacheIoManager.this.cctx.exchange().lastAffinityChangedTopologyVersion(rmtAffVer, lastAffChangedVer);
            IgniteInternalFuture<Comparable<AffinityTopologyVersion>> fut = null;
            if (cacheMsg.partitionExchangeMessage()) {
                long rmtTopVer;
                long locTopVer;
                if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
                    GridDhtAffinityAssignmentRequest msg0 = (GridDhtAffinityAssignmentRequest)cacheMsg;
                    assert (cacheMsg.topologyVersion() != null) : cacheMsg;
                    AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(GridCacheIoManager.this.cctx.localNode().order());
                    CacheGroupDescriptor desc = GridCacheIoManager.this.cctx.cache().cacheGroupDescriptors().get(msg0.groupId());
                    if (desc != null) {
                        if (desc.startTopologyVersion() != null) {
                            startTopVer = desc.startTopologyVersion();
                        } else if (desc.receivedFromStartVersion() != null) {
                            startTopVer = desc.receivedFromStartVersion();
                        }
                    }
                    if ((fut = GridCacheIoManager.this.cctx.exchange().affinityReadyFuture(startTopVer)) != null && !fut.isDone()) {
                        if (GridCacheIoManager.this.log.isDebugEnabled()) {
                            GridCacheIoManager.this.log.debug("Wait for exchange before processing message [msg=" + msg + ", node=" + nodeId + ", waitVer=" + startTopVer + ", cacheDesc=" + GridCacheIoManager.this.descriptorForMessage(cacheMsg) + ']');
                        }
                        fut.listen(new CI1<IgniteInternalFuture<?>>(){

                            @Override
                            public void apply(IgniteInternalFuture<?> fut) {
                                GridCacheIoManager.this.cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable(){

                                    @Override
                                    public void run() {
                                        GridCacheIoManager.this.handleMessage(nodeId, cacheMsg, plc);
                                    }
                                });
                            }
                        });
                        return;
                    }
                }
                if ((locTopVer = GridCacheIoManager.this.cctx.discovery().topologyVersion()) < (rmtTopVer = cacheMsg.topologyVersion().topologyVersion())) {
                    if (GridCacheIoManager.this.log.isDebugEnabled()) {
                        GridCacheIoManager.this.log.debug("Received message has higher topology version [msg=" + msg + ", locTopVer=" + locTopVer + ", rmtTopVer=" + rmtTopVer + ']');
                    }
                    fut = GridCacheIoManager.this.cctx.discovery().topologyFuture(rmtTopVer);
                }
            } else {
                AffinityTopologyVersion locAffVer = GridCacheIoManager.this.cctx.exchange().readyAffinityVersion();
                if (locAffVer.before(lastAffChangedVer)) {
                    IgniteLogger log = cacheMsg.messageLogger(GridCacheIoManager.this.cctx);
                    if (log.isDebugEnabled()) {
                        StringBuilder msg0 = new StringBuilder("Received message has higher affinity topology version [");
                        GridCacheIoManager.this.appendMessageInfo(cacheMsg, nodeId, msg0);
                        msg0.append(", locTopVer=").append(locAffVer).append(", rmtTopVer=").append(rmtAffVer).append(", lastAffChangedVer=").append(lastAffChangedVer).append(']');
                        log.debug(msg0.toString());
                    }
                    fut = GridCacheIoManager.this.cctx.exchange().affinityReadyFuture(lastAffChangedVer);
                }
            }
            if (fut != null && !fut.isDone()) {
                List locAffVer = GridCacheIoManager.this.pendingMsgs;
                synchronized (locAffVer) {
                    if (GridCacheIoManager.this.pendingMsgs.size() < 100) {
                        GridCacheIoManager.this.pendingMsgs.add(cacheMsg);
                    }
                }
                Thread curThread = Thread.currentThread();
                final int stripe = curThread instanceof IgniteThread ? ((IgniteThread)curThread).stripe() : -1;
                fut.listen(new CI1<IgniteInternalFuture<?>>(){

                    @Override
                    public void apply(IgniteInternalFuture<?> t) {
                        Runnable c = new Runnable(){

                            /*
                             * WARNING - Removed try catching itself - possible behaviour change.
                             */
                            @Override
                            public void run() {
                                List list = GridCacheIoManager.this.pendingMsgs;
                                synchronized (list) {
                                    GridCacheIoManager.this.pendingMsgs.remove(cacheMsg);
                                }
                                IgniteLogger log = cacheMsg.messageLogger(GridCacheIoManager.this.cctx);
                                if (log.isDebugEnabled()) {
                                    StringBuilder msg0 = new StringBuilder("Process cache message after wait for affinity topology version [");
                                    GridCacheIoManager.this.appendMessageInfo(cacheMsg, nodeId, msg0).append(']');
                                    log.debug(msg0.toString());
                                }
                                GridCacheIoManager.this.handleMessage(nodeId, cacheMsg, plc);
                            }
                        };
                        if (stripe >= 0) {
                            GridCacheIoManager.this.cctx.kernalContext().getStripedExecutorService().execute(stripe, c);
                        } else {
                            try {
                                GridCacheIoManager.this.cctx.kernalContext().pools().poolForPolicy(plc).execute(c);
                            }
                            catch (IgniteCheckedException e) {
                                U.error(cacheMsg.messageLogger(GridCacheIoManager.this.cctx), "Failed to get pool for policy: " + plc, e);
                            }
                        }
                    }
                });
                return;
            }
            GridCacheIoManager.this.handleMessage(nodeId, cacheMsg, plc);
        }
    };

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void dumpPendingMessages(StringBuilder sb) {
        List<GridCacheMessage> list = this.pendingMsgs;
        synchronized (list) {
            if (this.pendingMsgs.isEmpty()) {
                return;
            }
            sb.append("Pending cache messages waiting for exchange [readyVer=").append(this.cctx.exchange().readyAffinityVersion()).append(", discoVer=").append(this.cctx.discovery().topologyVersion()).append(']');
            sb.append(IgniteUtils.nl());
            for (GridCacheMessage msg : this.pendingMsgs) {
                sb.append("Message [waitVer=").append(msg.topologyVersion()).append(", msg=").append(msg).append(']');
                sb.append(IgniteUtils.nl());
            }
        }
    }

    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, byte plc) {
        this.handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? this.grpHandlers : this.cacheHandlers, plc);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers, byte plc) {
        Lock lock = this.rw.readLock();
        lock.lock();
        try {
            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0;
            IgniteBiInClosure[] cacheClsHandlers;
            int msgIdx = cacheMsg.lookupIndex();
            IgniteBiInClosure c = null;
            if (msgIdx >= 0 && (cacheClsHandlers = (idxClsHandlers0 = msgHandlers.idxClsHandlers).get(cacheMsg.handlerId())) != null) {
                c = cacheClsHandlers[msgIdx];
            }
            if (c == null) {
                c = (IgniteBiInClosure)msgHandlers.clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass()));
            }
            if (c == null) {
                if (this.processMissedHandler(nodeId, cacheMsg)) {
                    return;
                }
                IgniteLogger log = cacheMsg.messageLogger(this.cctx);
                StringBuilder msg0 = new StringBuilder("Received message without registered handler (will ignore) [");
                this.appendMessageInfo(cacheMsg, nodeId, msg0);
                msg0.append(", locTopVer=").append(this.cctx.exchange().readyAffinityVersion()).append(", msgTopVer=").append(cacheMsg.topologyVersion()).append(", desc=").append(this.descriptorForMessage(cacheMsg)).append(']');
                msg0.append(IgniteUtils.nl()).append("Registered listeners:");
                Map<Integer, IgniteBiInClosure[]> idxClsHandlers02 = msgHandlers.idxClsHandlers;
                for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers02.entrySet()) {
                    msg0.append(IgniteUtils.nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue()));
                }
                if (this.cctx.kernalContext().isStopping()) {
                    if (log.isDebugEnabled()) {
                        log.debug(msg0.toString());
                    }
                } else {
                    U.error(log, msg0.toString());
                    try {
                        cacheMsg.onClassError(new IgniteCheckedException("Failed to find message handler for message: " + cacheMsg));
                        this.processFailedMessage(nodeId, cacheMsg, c, plc);
                    }
                    catch (Exception e) {
                        U.error(log, "Failed to process failed message: " + e, e);
                    }
                }
                return;
            }
            this.onMessage0(nodeId, cacheMsg, c, plc);
        }
        finally {
            lock.unlock();
        }
    }

    private boolean processMissedHandler(UUID nodeId, GridCacheMessage cacheMsg) {
        GridDhtAtomicAbstractUpdateRequest req;
        if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest && (req = (GridDhtAtomicAbstractUpdateRequest)cacheMsg).nearSize() > 0) {
            ArrayList<KeyCacheObject> nearEvicted = new ArrayList<KeyCacheObject>(req.nearSize());
            for (int i = 0; i < req.nearSize(); ++i) {
                nearEvicted.add(req.nearKey(i));
            }
            GridDhtAtomicUpdateResponse dhtRes = new GridDhtAtomicUpdateResponse(req.cacheId(), req.partition(), req.futureId(), false);
            dhtRes.nearEvicted(nearEvicted);
            this.sendMessageForMissedHandler(cacheMsg, nodeId, dhtRes, nodeId, (byte)2);
            if (req.nearNodeId() != null) {
                GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(), req.partition(), req.nearFutureId(), nodeId, req.flags());
                this.sendMessageForMissedHandler(cacheMsg, nodeId, nearRes, req.nearNodeId(), (byte)2);
            }
            return true;
        }
        return false;
    }

    private void sendMessageForMissedHandler(GridCacheMessage origMsg, UUID origMsgNode, GridCacheMessage msg, UUID nodeId, byte plc) {
        IgniteLogger log = msg.messageLogger(this.cctx);
        try {
            if (log.isDebugEnabled()) {
                log.debug("Received message without registered handler, send response [locTopVer=" + this.cctx.exchange().readyAffinityVersion() + ", msgTopVer=" + origMsg.topologyVersion() + ", node=" + origMsgNode + ", msg=" + origMsg + ", resNode=" + nodeId + ", res=" + msg + ']');
            }
            this.send(nodeId, msg, plc);
        }
        catch (ClusterTopologyCheckedException e) {
            if (log.isDebugEnabled()) {
                log.debug("Failed to send response, node left [nodeId=" + nodeId + ", msg=" + msg + ']');
            }
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send response [nodeId=" + nodeId + ", msg=" + msg + ", err=" + e + ']');
        }
    }

    @Override
    public void start0() throws IgniteCheckedException {
        this.retryDelay = this.cctx.gridConfig().getNetworkSendRetryDelay();
        this.retryCnt = this.cctx.gridConfig().getNetworkSendRetryCount();
        this.depEnabled = this.cctx.gridDeploy().enabled();
        this.cctx.gridIO().addMessageListener(GridTopic.TOPIC_CACHE, this.lsnr);
    }

    @Nullable
    public Lock readLock() {
        Lock lock = this.rw.readLock();
        if (!lock.tryLock()) {
            return null;
        }
        if (this.stopping) {
            lock.unlock();
            return null;
        }
        return lock;
    }

    public void writeLock() {
        boolean interrupted = false;
        while (true) {
            try {
                while (!this.rw.writeLock().tryLock(200L, TimeUnit.MILLISECONDS)) {
                    Thread.sleep(200L);
                }
            }
            catch (InterruptedException ignore) {
                interrupted = true;
                continue;
            }
            break;
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    public void writeUnlock() {
        this.rw.writeLock().unlock();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void onKernalStop0(boolean cancel) {
        this.cctx.gridIO().removeMessageListener(GridTopic.TOPIC_CACHE);
        for (Object ordTopic : this.cacheHandlers.orderedHandlers.keySet()) {
            this.cctx.gridIO().removeMessageListener(ordTopic);
        }
        for (Object ordTopic : this.grpHandlers.orderedHandlers.keySet()) {
            this.cctx.gridIO().removeMessageListener(ordTopic);
        }
        this.writeLock();
        try {
            this.stopping = true;
        }
        finally {
            this.rw.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onMessage0(UUID nodeId, GridCacheMessage cacheMsg, IgniteBiInClosure<UUID, GridCacheMessage> c, byte plc) {
        try {
            if (this.stopping) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Received cache communication message while stopping (will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']');
                }
                return;
            }
            if (this.depEnabled) {
                this.cctx.deploy().ignoreOwnership(true);
            }
            this.unmarshall(nodeId, cacheMsg);
            if (cacheMsg.classError() != null) {
                this.processFailedMessage(nodeId, cacheMsg, c, plc);
            } else {
                this.processMessage(nodeId, cacheMsg, c);
            }
        }
        catch (Throwable e) {
            String msgStr;
            try {
                msgStr = String.valueOf(cacheMsg);
            }
            catch (Throwable e0) {
                String clsName = cacheMsg.getClass().getName();
                U.error(this.log, "Failed to log message due to an error: " + clsName, e0);
                msgStr = clsName + "(failed to log message)";
            }
            U.error(this.log, "Failed to process message [senderId=" + nodeId + ", msg=" + msgStr + ']', e);
            this.cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
            if (e instanceof Error) {
                throw (Error)e;
            }
        }
        finally {
            if (this.depEnabled) {
                this.cctx.deploy().ignoreOwnership(false);
            }
        }
    }

    private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage res, GridCacheSharedContext cctx, byte plc) {
        try {
            cctx.io().send(nodeId, res, plc);
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId + ",res=" + res + ']', e);
        }
    }

    private StringBuilder appendMessageInfo(GridCacheMessage cacheMsg, UUID nodeId, StringBuilder builder) {
        if (this.txId(cacheMsg) != null) {
            builder.append("txId=").append(this.txId(cacheMsg)).append(", dhtTxId=").append(this.dhtTxId(cacheMsg)).append(", msg=").append(cacheMsg);
        } else if (this.atomicFututeId(cacheMsg) != null) {
            builder.append("futId=").append(this.atomicFututeId(cacheMsg)).append(", writeVer=").append(this.atomicWriteVersion(cacheMsg)).append(", msg=").append(cacheMsg);
        } else {
            builder.append("msg=").append(cacheMsg);
        }
        builder.append(", node=").append(nodeId);
        return builder;
    }

    @Nullable
    private GridCacheVersion txId(GridCacheMessage cacheMsg) {
        if (cacheMsg instanceof GridDhtTxPrepareRequest) {
            return ((GridDhtTxPrepareRequest)cacheMsg).nearXidVersion();
        }
        if (cacheMsg instanceof GridNearTxPrepareRequest) {
            return ((GridNearTxPrepareRequest)cacheMsg).version();
        }
        if (cacheMsg instanceof GridNearTxPrepareResponse) {
            return ((GridNearTxPrepareResponse)cacheMsg).version();
        }
        if (cacheMsg instanceof GridNearTxFinishRequest) {
            return ((GridNearTxFinishRequest)cacheMsg).version();
        }
        if (cacheMsg instanceof GridNearTxFinishResponse) {
            return ((GridNearTxFinishResponse)cacheMsg).xid();
        }
        return null;
    }

    @Nullable
    private GridCacheVersion dhtTxId(GridCacheMessage cacheMsg) {
        if (cacheMsg instanceof GridDhtTxPrepareRequest) {
            return ((GridDhtTxPrepareRequest)cacheMsg).version();
        }
        if (cacheMsg instanceof GridDhtTxPrepareResponse) {
            return ((GridDhtTxPrepareResponse)cacheMsg).version();
        }
        if (cacheMsg instanceof GridDhtTxFinishRequest) {
            return ((GridDhtTxFinishRequest)cacheMsg).version();
        }
        if (cacheMsg instanceof GridDhtTxFinishResponse) {
            return ((GridDhtTxFinishResponse)cacheMsg).xid();
        }
        return null;
    }

    @Nullable
    private Long atomicFututeId(GridCacheMessage cacheMsg) {
        if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest) {
            return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureId();
        }
        if (cacheMsg instanceof GridNearAtomicUpdateResponse) {
            return ((GridNearAtomicUpdateResponse)cacheMsg).futureId();
        }
        if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest) {
            return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureId();
        }
        if (cacheMsg instanceof GridDhtAtomicUpdateResponse) {
            return ((GridDhtAtomicUpdateResponse)cacheMsg).futureId();
        }
        if (cacheMsg instanceof GridNearAtomicCheckUpdateRequest) {
            return ((GridNearAtomicCheckUpdateRequest)cacheMsg).futureId();
        }
        return null;
    }

    @Nullable
    private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) {
        if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest) {
            return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).writeVersion();
        }
        return null;
    }

    private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c, byte plc) throws IgniteCheckedException {
        assert (msg != null);
        switch (msg.directType()) {
            case 30: {
                GridDhtLockRequest req = (GridDhtLockRequest)msg;
                GridDhtLockResponse res = new GridDhtLockResponse(req.cacheId(), req.version(), req.futureId(), req.miniId(), 0, false);
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case 34: {
                GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg;
                GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(req.partition(), req.version(), req.futureId(), req.miniId(), req.deployInfo() != null);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, req.policy());
                break;
            }
            case 38: {
                GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg;
                GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(req.cacheId(), req.partition(), req.futureId(), false);
                res.onError(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                if (req.nearNodeId() == null) break;
                GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(), req.partition(), req.nearFutureId(), nodeId, req.flags());
                nearRes.errors(new UpdateErrors(req.classError()));
                this.sendResponseOnFailedMessage(req.nearNodeId(), nearRes, this.cctx, plc);
                break;
            }
            case 40: {
                GridNearAtomicFullUpdateRequest req = (GridNearAtomicFullUpdateRequest)msg;
                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(req.cacheId(), nodeId, req.futureId(), req.partition(), false, false);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case 42: {
                GridDhtForceKeysRequest req = (GridDhtForceKeysRequest)msg;
                GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(req.cacheId(), req.futureId(), req.miniId(), false);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case 49: {
                GridNearGetRequest req = (GridNearGetRequest)msg;
                GridNearGetResponse res = new GridNearGetResponse(req.cacheId(), req.futureId(), req.miniId(), req.version(), req.deployInfo() != null);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case 50: {
                GridNearGetResponse res = (GridNearGetResponse)msg;
                CacheGetFuture fut = (CacheGetFuture)((Object)this.cctx.mvcc().future(res.futureId()));
                if (fut == null) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
                    }
                    return;
                }
                res.error(res.classError());
                fut.onResult(nodeId, res);
                break;
            }
            case 51: {
                GridNearLockRequest req = (GridNearLockRequest)msg;
                GridNearLockResponse res = new GridNearLockResponse(req.cacheId(), req.version(), req.futureId(), req.miniId(), false, 0, req.classError(), null, false, false);
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case 55: {
                GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg;
                GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(req.partition(), req.version(), req.futureId(), req.miniId(), req.version(), req.version(), null, null, null, false, req.deployInfo() != null);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, req.policy());
                break;
            }
            case 58: {
                GridCacheQueryRequest req = (GridCacheQueryRequest)msg;
                GridCacheQueryResponse res = new GridCacheQueryResponse(req.cacheId(), req.id(), req.classError(), this.cctx.deploymentEnabled());
                ClusterNode node = this.cctx.node(nodeId);
                if (node == null) {
                    U.error(this.log, "Failed to send message because node left grid [nodeId=" + nodeId + ", msg=" + msg + ']');
                    break;
                }
                this.cctx.io().sendOrderedMessage(node, GridTopic.TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()), res, plc, Long.MAX_VALUE);
                break;
            }
            case 114: 
            case 120: {
                this.processMessage(nodeId, msg, c);
                break;
            }
            case 116: {
                GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg;
                GridNearSingleGetResponse res = new GridNearSingleGetResponse(req.cacheId(), req.futureId(), req.topologyVersion(), null, false, req.deployInfo() != null);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case 117: {
                GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg;
                GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)this.cctx.mvcc().future(new IgniteUuid(IgniteUuid.VM_ID, res.futureId()));
                if (fut == null) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
                    }
                    return;
                }
                res.error(res.classError());
                fut.onResult(nodeId, res);
                break;
            }
            case 125: {
                GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(req.cacheId(), nodeId, req.futureId(), req.partition(), false, false);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case 126: {
                GridNearAtomicSingleUpdateInvokeRequest req = (GridNearAtomicSingleUpdateInvokeRequest)msg;
                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(req.cacheId(), nodeId, req.futureId(), req.partition(), false, false);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case 127: {
                GridNearAtomicSingleUpdateFilterRequest req = (GridNearAtomicSingleUpdateFilterRequest)msg;
                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(req.cacheId(), nodeId, req.futureId(), req.partition(), false, false);
                res.error(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case 151: {
                GridNearTxQueryEnlistRequest req = (GridNearTxQueryEnlistRequest)msg;
                GridNearTxQueryEnlistResponse res = new GridNearTxQueryEnlistResponse(req.cacheId(), req.futureId(), req.miniId(), req.version(), req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case 153: {
                GridNearTxQueryResultsEnlistRequest req = (GridNearTxQueryResultsEnlistRequest)msg;
                GridNearTxQueryResultsEnlistResponse res = new GridNearTxQueryResultsEnlistResponse(req.cacheId(), req.futureId(), req.miniId(), req.version(), req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case 155: 
            case 156: {
                GridDhtTxQueryEnlistRequest req = (GridDhtTxQueryEnlistRequest)msg;
                GridDhtTxQueryEnlistResponse res = new GridDhtTxQueryEnlistResponse(req.cacheId(), req.dhtFutureId(), req.batchId(), req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case 159: {
                GridNearTxEnlistRequest req = (GridNearTxEnlistRequest)msg;
                GridNearTxEnlistResponse res = new GridNearTxEnlistResponse(req.cacheId(), req.futureId(), req.miniId(), req.version(), req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                break;
            }
            case -36: {
                GridDhtAtomicSingleUpdateRequest req = (GridDhtAtomicSingleUpdateRequest)msg;
                GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(req.cacheId(), req.partition(), req.futureId(), false);
                res.onError(req.classError());
                this.sendResponseOnFailedMessage(nodeId, res, this.cctx, plc);
                if (req.nearNodeId() == null) break;
                GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(), req.partition(), req.nearFutureId(), nodeId, req.flags());
                nearRes.errors(new UpdateErrors(req.classError()));
                this.sendResponseOnFailedMessage(req.nearNodeId(), nearRes, this.cctx, plc);
                break;
            }
            default: {
                throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" + msg + "]", msg.classError());
            }
        }
    }

    private void processMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c) {
        try {
            c.apply(nodeId, msg);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Finished processing cache communication message [nodeId=" + nodeId + ", msg=" + msg + ']');
            }
        }
        catch (Throwable e) {
            try {
                U.error(this.log, "Failed processing message [senderId=" + nodeId + ", msg=" + msg + ']', e);
            }
            catch (Throwable e0) {
                U.error(this.log, "Failed processing message [senderId=" + nodeId + ", msg=(failed to log message)", e);
                U.error(this.log, "Failed to log message due to an error: ", e0);
            }
            this.cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
            throw e;
        }
        finally {
            this.onMessageProcessed(msg);
        }
    }

    public void onMessageProcessed(GridCacheMessage msg) {
        GridCacheContext ctx;
        this.cctx.tm().resetContext();
        GridCacheMvccManager mvcc = this.cctx.mvcc();
        if (mvcc != null) {
            mvcc.contextReset();
        }
        if (msg instanceof IgniteTxStateAware) {
            IgniteTxState txState = ((IgniteTxStateAware)((Object)msg)).txState();
            if (txState != null) {
                txState.unwindEvicts(this.cctx);
            }
        } else if (msg instanceof GridCacheIdMessage && (ctx = this.cctx.cacheContext(((GridCacheIdMessage)msg).cacheId())) != null && !(msg instanceof GridCacheQueryRequest)) {
            CU.unwindEvicts(ctx);
        }
    }

    private boolean onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException {
        if (msg.error() != null && this.cctx.kernalContext().isStopping()) {
            return false;
        }
        if (msg.messageId() < 0L) {
            msg.messageId(idGen.incrementAndGet());
        }
        if (destNodeId == null || !this.cctx.localNodeId().equals(destNodeId)) {
            msg.prepareMarshal(this.cctx);
            if (msg instanceof GridCacheDeployable && msg.addDeploymentInfo()) {
                this.cctx.deploy().prepare((GridCacheDeployable)((Object)msg));
            }
        }
        return true;
    }

    public boolean checkNodeLeft(UUID nodeId, IgniteCheckedException sndErr, boolean ping) throws IgniteClientDisconnectedCheckedException {
        return this.cctx.gridIO().checkNodeLeft(nodeId, sndErr, ping);
    }

    public void send(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
        assert (!node.isLocal()) : node;
        msg.lastAffinityChangedTopologyVersion(this.cctx.exchange().lastAffinityChangedTopologyVersion(msg.topologyVersion()));
        if (!this.onSend(msg, node.id())) {
            return;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sending cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
        }
        int cnt = 0;
        while (cnt <= this.retryCnt) {
            try {
                ++cnt;
                this.cctx.gridIO().sendToGridTopic(node, GridTopic.TOPIC_CACHE, (Message)msg, plc);
                return;
            }
            catch (ClusterTopologyCheckedException e) {
                throw e;
            }
            catch (IgniteCheckedException e) {
                if (!this.cctx.discovery().alive(node.id()) || !this.cctx.discovery().pingNode(node.id())) {
                    throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);
                }
                if (cnt == this.retryCnt || this.cctx.kernalContext().isStopping()) {
                    throw e;
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to send message to node (will retry): " + node.id());
                }
                U.sleep(this.retryDelay);
            }
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
        }
    }

    public void send(UUID nodeId, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
        ClusterNode n = this.cctx.discovery().node(nodeId);
        if (n == null) {
            throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" + nodeId + ", msg=" + msg + ']');
        }
        this.send(n, msg, plc);
    }

    public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage msg, byte plc, long timeout) throws IgniteCheckedException {
        if (!this.onSend(msg, node.id())) {
            return;
        }
        msg.lastAffinityChangedTopologyVersion(this.cctx.exchange().lastAffinityChangedTopologyVersion(msg.topologyVersion()));
        int cnt = 0;
        while (cnt <= this.retryCnt) {
            try {
                ++cnt;
                this.cctx.gridIO().sendOrderedMessage(node, topic, msg, plc, timeout, false);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Sent ordered cache message [topic=" + topic + ", msg=" + msg + ", nodeId=" + node.id() + ']');
                }
                return;
            }
            catch (ClusterTopologyCheckedException e) {
                throw e;
            }
            catch (IgniteCheckedException e) {
                if (this.cctx.discovery().node(node.id()) == null) {
                    throw new ClusterTopologyCheckedException("Node left grid while sending ordered message to: " + node.id(), e);
                }
                if (cnt == this.retryCnt) {
                    throw e;
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to send message to node (will retry): " + node.id());
                }
                U.sleep(this.retryDelay);
            }
        }
    }

    public long nextIoId() {
        return idGen.incrementAndGet();
    }

    void sendNoRetry(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
        assert (node != null);
        assert (msg != null);
        if (!this.onSend(msg, null)) {
            return;
        }
        msg.lastAffinityChangedTopologyVersion(this.cctx.exchange().lastAffinityChangedTopologyVersion(msg.topologyVersion()));
        try {
            this.cctx.gridIO().sendToGridTopic(node, GridTopic.TOPIC_CACHE, (Message)msg, plc);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
            }
        }
        catch (ClusterTopologyCheckedException e) {
            throw e;
        }
        catch (IgniteCheckedException e) {
            if (!this.cctx.discovery().alive(node.id())) {
                throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);
            }
            throw e;
        }
    }

    public <Msg extends GridCacheMessage> void addCacheHandler(int hndId, Class<Msg> type, IgniteBiInClosure<UUID, ? super Msg> c) {
        assert (!type.isAssignableFrom(GridCacheGroupIdMessage.class)) : type;
        this.addHandler(hndId, type, c, this.cacheHandlers);
    }

    public <Msg extends GridCacheGroupIdMessage> void addCacheGroupHandler(int hndId, Class<Msg> type, IgniteBiInClosure<UUID, ? super Msg> c) {
        assert (!type.isAssignableFrom(GridCacheIdMessage.class)) : type;
        this.addHandler(hndId, type, c, this.grpHandlers);
    }

    private <Msg extends GridCacheMessage> void addHandler(int hndId, Class<Msg> type, IgniteBiInClosure<UUID, ? super Msg> c, MessageHandlers msgHandlers) {
        int msgIdx = this.messageIndex(type);
        if (msgIdx != -1) {
            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
            IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.compute(hndId, (key, clsHandlers) -> {
                if (clsHandlers == null) {
                    clsHandlers = new IgniteBiInClosure[7];
                }
                if (clsHandlers[msgIdx] != null) {
                    return null;
                }
                clsHandlers[msgIdx] = c;
                return clsHandlers;
            });
            if (cacheClsHandlers == null) {
                throw new IgniteException("Duplicate cache message ID found [hndId=" + hndId + ", type=" + type + ']');
            }
            return;
        }
        ListenerKey key2 = new ListenerKey(hndId, type);
        if (msgHandlers.clsHandlers.putIfAbsent(key2, c) != null) assert (false) : "Handler for class already registered [hndId=" + hndId + ", cls=" + type + ", old=" + msgHandlers.clsHandlers.get(key2) + ", new=" + c + ']';
        IgniteLogger log0 = this.log;
        if (log0 != null && log0.isTraceEnabled()) {
            log0.trace("Registered cache communication handler [hndId=" + hndId + ", type=" + type + ", msgIdx=" + msgIdx + ", handler=" + c + ']');
        }
    }

    void removeCacheHandlers(int cacheId) {
        this.removeHandlers(this.cacheHandlers, cacheId);
    }

    void removeCacheGroupHandlers(int grpId) {
        this.removeHandlers(this.grpHandlers, grpId);
    }

    private void removeHandlers(MessageHandlers msgHandlers, int hndId) {
        assert (hndId != 0);
        msgHandlers.idxClsHandlers.remove(hndId);
        Iterator iter = msgHandlers.clsHandlers.keySet().iterator();
        while (iter.hasNext()) {
            ListenerKey key = (ListenerKey)iter.next();
            if (key.msgCls.equals(GridDhtAffinityAssignmentResponse.class) || key.hndId != hndId) continue;
            iter.remove();
        }
    }

    public void removeHandler(boolean cacheGrp, int hndId, Class<? extends GridCacheMessage> type) {
        MessageHandlers msgHandlers = cacheGrp ? this.grpHandlers : this.cacheHandlers;
        msgHandlers.clsHandlers.remove(new ListenerKey(hndId, type));
    }

    private int messageIndex(Class<?> msgCls) {
        try {
            Integer msgIdx = (Integer)U.field(msgCls, "CACHE_MSG_IDX");
            if (msgIdx == null || msgIdx < 0) {
                return -1;
            }
            return msgIdx;
        }
        catch (IgniteCheckedException ignored) {
            return -1;
        }
    }

    public void addOrderedCacheHandler(GridCacheSharedContext cctx, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheIdMessage> c) {
        this.addOrderedHandler(cctx, false, topic, c);
    }

    public void addOrderedCacheGroupHandler(GridCacheSharedContext cctx, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheGroupIdMessage> c) {
        this.addOrderedHandler(cctx, true, topic, c);
    }

    private void addOrderedHandler(GridCacheSharedContext cctx, boolean cacheGrp, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
        MessageHandlers msgHandlers = cacheGrp ? this.grpHandlers : this.cacheHandlers;
        IgniteLogger log0 = this.log;
        if (msgHandlers.orderedHandlers.putIfAbsent(topic, c) == null) {
            cctx.gridIO().addMessageListener(topic, (GridMessageListener)new OrderedMessageListener(c));
            if (log0 != null && log0.isTraceEnabled()) {
                log0.trace("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']');
            }
        } else if (log0 != null) {
            U.warn(log0, "Failed to register ordered cache communication handler because it is already registered for this topic [topic=" + topic + ", handler=" + c + ']');
        }
    }

    public void removeOrderedHandler(boolean cacheGrp, Object topic) {
        MessageHandlers msgHandlers;
        MessageHandlers messageHandlers = msgHandlers = cacheGrp ? this.grpHandlers : this.cacheHandlers;
        if (msgHandlers.orderedHandlers.remove(topic) != null) {
            this.cctx.gridIO().removeMessageListener(topic);
            if (this.log != null && this.log.isDebugEnabled()) {
                this.log.debug("Unregistered ordered cache communication handler for topic:" + topic);
            }
        } else if (this.log != null) {
            U.warn(this.log, "Failed to unregister ordered cache communication handler because it was not found for topic: " + topic);
        }
    }

    private void unmarshall(UUID nodeId, GridCacheMessage cacheMsg) {
        if (this.cctx.localNodeId().equals(nodeId)) {
            return;
        }
        GridDeploymentInfo bean = cacheMsg.deployInfo();
        try {
            if (bean != null) {
                assert (this.depEnabled) : "Received deployment info while peer class loading is disabled [nodeId=" + nodeId + ", msg=" + cacheMsg + ']';
                this.cctx.deploy().p2pContext(nodeId, bean.classLoaderId(), bean.userVersion(), bean.deployMode(), bean.participants());
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Set P2P context [senderId=" + nodeId + ", msg=" + cacheMsg + ']');
                }
            }
            cacheMsg.finishUnmarshal(this.cctx, this.cctx.deploy().globalLoader());
        }
        catch (IgniteCheckedException e) {
            cacheMsg.onClassError(e);
        }
        catch (BinaryObjectException e) {
            cacheMsg.onClassError(new IgniteCheckedException(e));
        }
        catch (Error e) {
            if (cacheMsg.ignoreClassErrors() && X.hasCause((Throwable)e, NoClassDefFoundError.class, UnsupportedClassVersionError.class)) {
                cacheMsg.onClassError(new IgniteCheckedException("Failed to load class during unmarshalling: " + e, e));
            }
            throw e;
        }
    }

    private Object descriptorForMessage(GridCacheMessage msg) {
        if (msg instanceof GridCacheIdMessage) {
            return this.cctx.cache().cacheDescriptor(((GridCacheIdMessage)msg).cacheId());
        }
        if (msg instanceof GridCacheGroupIdMessage) {
            return this.cctx.cache().cacheGroupDescriptors().get(((GridCacheGroupIdMessage)msg).groupId());
        }
        return null;
    }

    @Override
    public void printMemoryStats() {
        X.println(">>> ", new Object[0]);
        X.println(">>> Cache IO manager memory stats [igniteInstanceName=" + this.cctx.igniteInstanceName() + ']', new Object[0]);
        X.println(">>>   cacheClsHandlersSize: " + this.cacheHandlers.clsHandlers.size(), new Object[0]);
        X.println(">>>   cacheOrderedHandlersSize: " + this.cacheHandlers.orderedHandlers.size(), new Object[0]);
        X.println(">>>   cacheGrpClsHandlersSize: " + this.grpHandlers.clsHandlers.size(), new Object[0]);
        X.println(">>>   cacheGrpOrderedHandlersSize: " + this.grpHandlers.orderedHandlers.size(), new Object[0]);
    }

    private static class ListenerKey {
        private int hndId;
        private Class<? extends GridCacheMessage> msgCls;

        private ListenerKey(int hndId, Class<? extends GridCacheMessage> msgCls) {
            this.hndId = hndId;
            this.msgCls = msgCls;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ListenerKey)) {
                return false;
            }
            ListenerKey that = (ListenerKey)o;
            return this.hndId == that.hndId && this.msgCls.equals(that.msgCls);
        }

        public int hashCode() {
            int res = this.hndId;
            res = 31 * res + this.msgCls.hashCode();
            return res;
        }
    }

    private class OrderedMessageListener
    implements GridMessageListener {
        private final IgniteBiInClosure<UUID, GridCacheMessage> c;

        OrderedMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> c) {
            this.c = c;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onMessage(UUID nodeId, Object msg, byte plc) {
            if (GridCacheIoManager.this.log.isDebugEnabled()) {
                GridCacheIoManager.this.log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
            }
            Lock lock = GridCacheIoManager.this.rw.readLock();
            lock.lock();
            try {
                GridCacheMessage cacheMsg = (GridCacheMessage)msg;
                GridCacheIoManager.this.onMessage0(nodeId, cacheMsg, this.c, plc);
            }
            finally {
                lock.unlock();
            }
        }
    }

    static class MessageHandlers {
        volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new ConcurrentHashMap<Integer, IgniteBiInClosure[]>();
        ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> clsHandlers = new ConcurrentHashMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>();
        ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers = new ConcurrentHashMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>>();

        MessageHandlers() {
        }
    }
}

