/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.dr;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.configuration.Factory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.net.ssl.SSLContext;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
import org.apache.ignite.internal.util.future.CountDownFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.nio.GridBufferedParser;
import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioParser;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.GridWorkerListener;
import org.apache.ignite.internal.util.worker.GridWorkerListenerAdapter;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.dr.DrSender;
import org.gridgain.grid.dr.DrSenderConnection;
import org.gridgain.grid.dr.DrSenderConnectionConfiguration;
import org.gridgain.grid.dr.DrSenderMBean;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrSenderHubStopInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrSenderHubStopKey;
import org.gridgain.grid.internal.processors.cache.dr.DrSenderGroupNodeStopKey;
import org.gridgain.grid.internal.processors.dr.DrProcessor;
import org.gridgain.grid.internal.processors.dr.DrSenderHealthCheckScheduler;
import org.gridgain.grid.internal.processors.dr.DrSenderMBeanImpl;
import org.gridgain.grid.internal.processors.dr.DrSenderRemoteDataCenter;
import org.gridgain.grid.internal.processors.dr.DrSenderRemoteDataCenterNode;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalBatchRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalRequestEntry;
import org.gridgain.grid.internal.processors.dr.store.DrCommonStoreManager;
import org.gridgain.grid.internal.processors.dr.store.DrReplicaStoreManager;
import org.gridgain.grid.internal.processors.dr.store.DrStoreManager;
import org.jetbrains.annotations.Nullable;

class DrSenderImpl
implements DrSender {
    /** MBean name used when the sender hub is registered in {@code registerMBean()}. */
    static final String MBEAN_NAME = "Sender hub";
    /** MBean group used when the sender hub is registered in {@code registerMBean()}. */
    static final String MBEAN_GROUP_NAME = "Data center replication";
    /** Counter supplying the local part of external batch request IDs (see {@code storeIfNeeded}). */
    private static final AtomicLong ID_GEN = new AtomicLong();
    /** Kernal context obtained from the DR processor. */
    private final GridKernalContext kctx;
    /** Logger for this class. */
    private final IgniteLogger log;
    /** Owning DR processor. */
    private final DrProcessor proc;
    /** Sender hub configuration taken from the GridGain configuration. */
    private final DrSenderConfiguration sndCfg;
    /** Remote data centers; filled in {@code onKernalStart()} and cleared in {@code onKernalStop()}. */
    private final List<DrSenderRemoteDataCenter> rmtDataCenters = new ArrayList<DrSenderRemoteDataCenter>();
    /** Store manager; the concrete implementation is selected in the constructor. */
    private final DrStoreManager storeMgr;
    /** Lock protecting {@link #state} transitions. */
    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
    /**
     * Requests replayed by {@code PendingRequestsClient} after start.
     * NOTE(review): no visible code adds to this queue; presumably the (non-decompiled)
     * {@code onReplicationRequest} does - confirm.
     */
    private final Queue<IgniteBiTuple<UUID, DrInternalRequest>> pendingReqs = new ConcurrentLinkedDeque<IgniteBiTuple<UUID, DrInternalRequest>>();
    /** Error recorded by {@code stopOnError}. */
    private Throwable err;
    /** NIO server built and started in {@code onKernalStart()}. */
    private GridNioServer<byte[]> nioSrv;
    /** Worker draining {@link #pendingReqs}; created only if the queue is non-empty at start. */
    private GridWorker pendingClient;
    /** Object name of the registered MBean, or {@code null} if none was registered. */
    private ObjectName sndHubMBean;
    /** Current lifecycle state. */
    private SenderHubState state = SenderHubState.STARTING;
    /** Tasks awaiting execution by {@link #taskExecWorker}. */
    private final BlockingQueue<GridRunnableFuture> taskQueue = new LinkedBlockingDeque<GridRunnableFuture>();
    /** Worker executing queued tasks; created in {@code onKernalStart()}. */
    private GridWorker taskExecWorker;
    /** Health check scheduler; started in {@code onKernalStart()} and stopped in {@code onKernalStop()}. */
    private DrSenderHealthCheckScheduler healthCheckScheduler;
    /** NIO listener: hands every inbound message over to the task worker via {@code submit}. */
    private final GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>(){

        public void onConnected(GridNioSession ses) {
            // Only non-accepted sessions are expected on this server.
            assert (!ses.accepted());
        }

        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
            // Nothing to do on disconnect.
        }

        public void onMessage(GridNioSession ses, byte[] msg) {
            DrSenderRemoteDataCenterNode rmtNode = (DrSenderRemoteDataCenterNode)ses.meta(DrSenderRemoteDataCenterNode.DR_SENDER_NODE);
            assert (rmtNode != null);
            // Processing is deferred to the task worker thread.
            Runnable handleMsg = () -> rmtNode.processInMessage(ses, msg);
            DrSenderImpl.this.submit(handleMsg);
        }
    };
    /** Lock around the task queue: {@code submit} takes the read side, the stop handler the write side. */
    private final ReadWriteLock taskQueueLock = new ReentrantReadWriteLock();
    /** Worker listener that cancels and discards every still-queued task once the worker has stopped. */
    private final GridWorkerListener workerLsnr = new GridWorkerListenerAdapter(){

        public void onStopped(GridWorker w) {
            Lock queueWriteLock = DrSenderImpl.this.taskQueueLock.writeLock();
            queueWriteLock.lock();
            try {
                for (GridRunnableFuture queued : DrSenderImpl.this.taskQueue) {
                    try {
                        queued.cancel();
                    }
                    catch (Throwable cancelErr) {
                        // A failing cancellation must not prevent the remaining tasks from being cancelled.
                        U.warn((IgniteLogger)DrSenderImpl.this.log, (Object)("Error cancelling dr task: " + cancelErr));
                    }
                }
                DrSenderImpl.this.taskQueue.clear();
            }
            finally {
                queueWriteLock.unlock();
            }
        }
    };

    /**
     * Creates the sender hub for the given DR processor.
     *
     * @param drProc DR processor; must not be {@code null} and must carry a sender configuration.
     */
    DrSenderImpl(DrProcessor drProc) {
        assert (drProc != null);
        this.proc = drProc;
        this.kctx = drProc.kernalContext();
        this.log = drProc.context().log(DrSenderImpl.class);
        this.sndCfg = drProc.ggConfig().getDrSenderConfiguration();
        assert (this.sndCfg != null);
        long fstBufSize = this.sndCfg.getFullStateTransferBufferSize();
        assert (fstBufSize > 0L);
        // Without a configured store a replica store manager is used, otherwise a common one.
        if (this.sndCfg.getStore() == null) {
            this.storeMgr = new DrReplicaStoreManager(this.kctx, this.proc, fstBufSize, this.sndCfg.getConnectionConfiguration());
        } else {
            this.storeMgr = new DrCommonStoreManager(this.kctx, this.proc, fstBufSize, this.sndCfg.getStore());
        }
        this.healthCheckScheduler = new DrSenderHealthCheckScheduler(drProc);
    }

    /**
     * Registers this hub with the processor and starts the store manager.
     * Does nothing unless the hub is in the {@code STARTING} state.
     *
     * @throws IgniteCheckedException If thrown by a callee.
     */
    void start() throws IgniteCheckedException {
        Lock stateWriteLock = this.stateLock.writeLock();
        stateWriteLock.lock();
        try {
            if (this.state != SenderHubState.STARTING) {
                return;
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Starting DR sender hub: " + this.getConfiguration());
            }
            this.proc.registerSenderHub(this);
            this.storeMgr.start();
        }
        finally {
            stateWriteLock.unlock();
        }
    }

    /** @return Whatever the store manager reports for {@code isGlobalStore()}. */
    @Override
    public boolean isGlobalStore() {
        boolean globalStore = this.storeMgr.isGlobalStore();
        return globalStore;
    }

    /**
     * Pauses all remote data centers, clears the global store, resets every remote data center
     * and resumes replication.
     * <p>
     * NOTE(review): if clearing or resetting fails, {@code resume()} is not reached and the hub
     * stays paused - confirm this is intended.
     *
     * @throws IllegalStateException If no global store is configured.
     * @throws IgniteException If any step fails.
     */
    @Override
    public void clearGlobalStore() {
        boolean globalStore = this.isGlobalStore();
        if (!globalStore) {
            throw new IllegalStateException("Global store is not configured for this sender hub.");
        }
        try {
            this.pause();
            this.storeMgr.clearGlobalStore();
            for (DrSenderRemoteDataCenter rmtDc : this.rmtDataCenters) {
                rmtDc.reset();
            }
            this.resume();
        }
        catch (Exception failure) {
            throw new IgniteException("Failed to clear global sender hubs store: " + this.storeMgr, failure);
        }
    }

    /**
     * Pauses all remote data centers, clears the full state transfer buffer, resets every
     * remote data center and resumes replication.
     * <p>
     * NOTE(review): on failure {@code resume()} is not reached and the hub stays paused - confirm
     * this is intended.
     *
     * @throws IgniteException If any step fails.
     */
    @Override
    public void clearFullStateTransferBuffer() {
        try {
            this.pause();
            this.storeMgr.clearFullStateTransferBuffer();
            for (DrSenderRemoteDataCenter rmtDc : this.rmtDataCenters) {
                rmtDc.reset();
            }
            this.resume();
        }
        catch (Exception failure) {
            throw new IgniteException("Failed to clear full state transfer buffer: " + this.storeMgr, failure);
        }
    }

    /**
     * Brings the sender hub up: starts the task worker and health check scheduler and, when the
     * hub is in the {@code STARTING} state, starts the store manager, creates the remote data
     * centers, registers the MBean, starts the NIO server and finally marks the hub
     * {@code STARTED}. Everything runs under the state write lock.
     * <p>
     * Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @throws IgniteCheckedException If thrown by a callee.
     */
    void onKernalStart() throws IgniteCheckedException {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Starting DR sender hub: " + this.sndCfg);
        }
        this.stateLock.writeLock().lock();
        try {
            // A previously stopped hub is moved back to STARTING so it can be started again.
            if (this.state == SenderHubState.STOPPED) {
                this.state = SenderHubState.STARTING;
            }
            // The worker and scheduler are started regardless of the state check below.
            // NOTE(review): nothing in this method stops them if a later step throws - confirm callers handle that.
            this.taskExecWorker = new DrTaskExecWorker(this.log, this.workerLsnr);
            new IgniteThread(this.taskExecWorker).start();
            this.healthCheckScheduler.start();
            if (this.state == SenderHubState.STARTING) {
                this.storeMgr.onKernalStart();
                // One remote data center object per configured connection.
                for (DrSenderConnectionConfiguration connCfg : this.sndCfg.getConnectionConfiguration()) {
                    this.rmtDataCenters.add(new DrSenderRemoteDataCenter(this.kctx, this.proc, connCfg, this.storeMgr));
                }
                this.registerMBean();
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Starting DR sender hub TCP client [dataCenterId=" + this.proc.ggConfig().getDataCenterId() + "]");
                }
                // Filter chain: codec, connection bytes verification and, if configured, SSL.
                ArrayList<Object> filters = new ArrayList<Object>();
                filters.add(new GridNioCodecFilter((GridNioParser)new GridBufferedParser(DrUtils.DR_SND_DIRECT_BUF, DrUtils.DR_BYTE_ORDER), this.log, false));
                filters.add(new GridConnectionBytesVerifyFilter(this.log));
                Factory<SSLContext> sslCtxFactory = this.proc.getSslContextFactory(this.sndCfg);
                if (sslCtxFactory != null) {
                    GridNioSslFilter sslFilter = new GridNioSslFilter((SSLContext)sslCtxFactory.create(), DrUtils.DR_SND_DIRECT_BUF, DrUtils.DR_BYTE_ORDER, this.log);
                    filters.add(sslFilter);
                }
                // NOTE(review): serverName("dr-snd") appears twice in the builder chain; the second call looks redundant.
                this.nioSrv = GridNioServer.builder().port(-1).listener(this.lsnr).logger(this.log).selectorCount(1).igniteInstanceName(this.proc.igniteInstanceName()).serverName("dr-snd").byteOrder(DrUtils.DR_BYTE_ORDER).tcpNoDelay(true).directBuffer(DrUtils.DR_SND_DIRECT_BUF).idleTimeout(Long.MAX_VALUE).filters(filters.toArray(new GridNioFilter[filters.size()])).writeTimeout(Long.MAX_VALUE).socketSendBufferSize(this.sndCfg.getSocketSendBufferSize()).socketReceiveBufferSize(this.sndCfg.getSocketReceiveBufferSize()).serverName("dr-snd").build();
                this.nioSrv.start();
                // Requests queued before start are replayed by a dedicated worker.
                if (!this.pendingReqs.isEmpty()) {
                    this.pendingClient = new PendingRequestsClient();
                    new IgniteThread(this.pendingClient).start();
                }
                for (DrSenderRemoteDataCenter dc : this.rmtDataCenters) {
                    dc.start(this.nioSrv, this.healthCheckScheduler);
                }
                this.state = SenderHubState.STARTED;
            }
        }
        finally {
            this.stateLock.writeLock().unlock();
        }
    }

    /**
     * Stops the sender hub. The state is flipped to {@code STOPPED} under the state write lock;
     * the remaining teardown (MBean, NIO server, workers, scheduler, remote data centers,
     * store manager) runs after the lock has been released. A second call is a no-op.
     */
    void onKernalStop() {
        Lock stateWriteLock = this.stateLock.writeLock();
        stateWriteLock.lock();
        try {
            if (this.state == SenderHubState.STOPPED) {
                return;
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Stopping DR sender hub ...");
            }
            this.state = SenderHubState.STOPPED;
        }
        finally {
            stateWriteLock.unlock();
        }

        this.unregisterMBean();

        if (this.nioSrv != null) {
            this.nioSrv.stop();
        }

        // The pending queue is only cleared when a replay worker was actually created.
        if (this.pendingClient != null) {
            U.cancel((GridWorker)this.pendingClient);
            U.join((GridWorker)this.pendingClient, (IgniteLogger)this.log);
            this.pendingReqs.clear();
        }

        try {
            this.healthCheckScheduler.stop();
        }
        catch (IgniteCheckedException stopErr) {
            // Best effort: a scheduler failure must not abort the rest of the shutdown.
            U.warn((IgniteLogger)this.log, (Object)("Error stopping health checker scheduler: " + (Object)((Object)stopErr)));
        }

        U.cancel((GridWorker)this.taskExecWorker);
        U.join((GridWorker)this.taskExecWorker, (IgniteLogger)this.log);

        for (DrSenderRemoteDataCenter rmtDc : this.rmtDataCenters) {
            rmtDc.stop();
        }
        this.rmtDataCenters.clear();

        this.storeMgr.onKernalStop();
    }

    /** Stops the store manager and removes the message listener for the replication send topic. */
    void stop() {
        this.storeMgr.stop();
        this.kctx.io().removeMessageListener((Object)CU.replicationTopicSend());
    }

    /**
     * Registers the sender hub MBean unless MBeans are disabled. A registration failure is
     * logged and not propagated.
     */
    private void registerMBean() {
        if (!U.IGNITE_MBEANS_DISABLED) {
            try {
                this.sndHubMBean = U.registerMBean((MBeanServer)this.proc.config().getMBeanServer(), (String)this.proc.igniteInstanceName(), (String)MBEAN_GROUP_NAME, (String)MBEAN_NAME, (Object)new DrSenderMBeanImpl(this.proc, this), DrSenderMBean.class);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Registered DR sender hub MBean: " + this.sndHubMBean);
                }
            }
            catch (JMException regErr) {
                U.error((IgniteLogger)this.log, (Object)"Failed to register DR sender hub MBean.", (Throwable)regErr);
            }
        }
    }

    /**
     * Unregisters the sender hub MBean if one was registered. A failure is logged and not
     * propagated.
     */
    private void unregisterMBean() {
        ObjectName registered = this.sndHubMBean;
        if (registered == null) {
            return;
        }
        // A non-null name implies registration happened, hence MBeans cannot be disabled.
        assert (!U.IGNITE_MBEANS_DISABLED);
        try {
            this.proc.config().getMBeanServer().unregisterMBean(registered);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Unregistered DR sender hub MBean: " + registered);
            }
        }
        catch (JMException unregErr) {
            U.error((IgniteLogger)this.log, (Object)("Failed to unregister DR sender hub MBean: " + registered), (Throwable)unregErr);
        }
    }

    /**
     * Entry point for a replication request; it is invoked by {@code PendingRequestsClient}
     * for queued requests.
     * <p>
     * WARNING: the real body could not be recovered - the decompiler (CFR) failed on this
     * method. The placeholder below unconditionally throws {@link IllegalStateException};
     * the original logic must be restored from the class file or original sources.
     *
     * @param nodeId ID passed as the first element of a queued request tuple.
     * @param req Replication request.
     */
    void onReplicationRequest(UUID nodeId, DrInternalRequest req) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * @param req Request to inspect.
     * @return {@code true} if the request carries a state transfer ID.
     */
    private boolean isFstRequest(DrInternalRequest req) {
        Object fstId = req.stateTransferId();
        return fstId != null;
    }

    /**
     * @param node Cluster node, possibly {@code null}.
     * @return {@code true} if the node is non-null and supports the
     *      {@code DR_DATA_NODE_SMART_THROTTLING} feature.
     */
    private static boolean isThrottlingSupported(ClusterNode node) {
        if (node == null) {
            return false;
        }
        return IgniteFeatures.nodeSupports(null, node, IgniteFeatures.DR_DATA_NODE_SMART_THROTTLING);
    }

    /**
     * Decides whether a request should be rescheduled because at least one remote data center
     * it targets currently has no available permits.
     *
     * @param sender ID of the node that sent the request.
     * @param req Request.
     * @return {@code true} if the request targets a remote data center with zero permits;
     *      always {@code false} for forced requests and for senders without throttling support.
     */
    private boolean reschedule(UUID sender, DrInternalRequest req) {
        if (req.force() || !DrSenderImpl.isThrottlingSupported(this.kctx.discovery().node(sender))) {
            return false;
        }

        // Remote data centers that have run out of permits.
        List<DrSenderRemoteDataCenter> exhausted = new ArrayList<DrSenderRemoteDataCenter>(this.rmtDataCenters.size());
        for (DrSenderRemoteDataCenter candidate : this.rmtDataCenters) {
            if (candidate.availablePermits() == 0) {
                exhausted.add(candidate);
            }
        }
        if (exhausted.isEmpty()) {
            return false;
        }

        // Explicit targets: reschedule only if one of them is exhausted.
        Collection<Byte> explicitTargets = req.dataCenterIds();
        if (!F.isEmpty(explicitTargets)) {
            boolean targetsExhausted = false;
            for (DrSenderRemoteDataCenter rmtDc : exhausted) {
                if (explicitTargets.contains(rmtDc.dataCenterId())) {
                    targetsExhausted = true;
                    break;
                }
            }
            return targetsExhausted;
        }

        // No explicit targets: an exhausted data center matters unless it ignores every source DC of the request.
        List<Byte> srcDcIds = new ArrayList<Byte>(req.entries().size());
        for (DrInternalRequestEntry reqEntry : req.entries()) {
            srcDcIds.add(reqEntry.dataCenterId());
        }
        for (DrSenderRemoteDataCenter rmtDc : exhausted) {
            if (!rmtDc.ignoreList().containsAll(srcDcIds)) {
                return true;
            }
        }
        return false;
    }

    /** Prints a header followed by the memory stats of every remote data center. */
    void printMemoryStats() {
        String hdr = ">>> DR sender hub memory stats [igniteInstanceName=" + this.proc.igniteInstanceName() + ']';
        X.println(">>>");
        X.println(hdr);
        for (DrSenderRemoteDataCenter rmtDc : this.rmtDataCenters) {
            rmtDc.printMemoryStats();
        }
    }

    /**
     * Stores each entry of the request for the remote data centers it should be replicated to.
     * Entries with no eligible target are counted as filtered in the metrics and skipped.
     * For full state transfer (FST) requests a future is created whose completion sends the
     * replication response back to {@code nodeId}.
     *
     * @param req Internal replication request.
     * @param nodeId ID of the node the response is sent to for FST requests.
     * @throws IgniteCheckedException If thrown by a callee.
     */
    private void storeIfNeeded(DrInternalRequest req, final UUID nodeId) throws IgniteCheckedException {
        CountDownFuture reqFut;
        assert (!this.isFstRequest(req) || req.dataCenterIds() != null) : "Target DCs must be set for full state transfer batch";
        boolean fstRequest = this.isFstRequest(req);
        final String cacheName = req.cacheName();
        if (fstRequest) {
            final long reqId = req.id();
            // NOTE(review): the future is sized to the total entry count, but filtered entries below
            // `continue` without touching it; whether it still completes depends on
            // CountDownFuture/storeFSTBatch semantics not visible here - confirm.
            reqFut = new CountDownFuture(req.entries().size()){

                protected void afterDone() {
                    DrSenderImpl.this.proc.sendReplicationResponse(nodeId, cacheName, reqId, this.error(), null);
                }
            };
        } else {
            reqFut = null;
        }
        // Reused per entry to collect the eligible targets.
        ArrayList<DrSenderRemoteDataCenter> targets = new ArrayList<DrSenderRemoteDataCenter>(this.rmtDataCenters.size());
        for (DrInternalRequestEntry entry : req.entries()) {
            byte srcDataCenterId = entry.dataCenterId();
            targets.clear();
            for (DrSenderRemoteDataCenter replica : this.rmtDataCenters) {
                if (!this.shouldReplicateTo(srcDataCenterId, replica, req.dataCenterIds())) continue;
                targets.add(replica);
            }
            // No target wants this entry: record it as filtered and move on.
            if (targets.isEmpty()) {
                if (this.log.isTraceEnabled()) {
                    Object[] targetDCs = req.dataCenterIds() != null ? req.dataCenterIds().toArray() : null;
                    this.log.trace("A DR send request has been filtered: reqID=" + req.id() + ", fstID=" + req.stateTransferId() + ", targetDC=" + Arrays.toString(targetDCs) + ", entries=" + entry.entryCount());
                }
                this.proc.metrics().onSenderHubEntryFiltered(cacheName, entry.entryCount(), entry.dataBytes().length);
                continue;
            }
            // Each stored batch gets its own external request ID (local node ID + counter).
            IgniteUuid extRequestId = new IgniteUuid(this.kctx.localNodeId(), ID_GEN.incrementAndGet());
            byte[] extReq = this.prepareExternalRequest(extRequestId, cacheName, srcDataCenterId, entry.entryCount(), entry.dataBytes(), entry.dataLength());
            byte[] targetArr = this.dataCenterIds(targets);
            if (fstRequest) {
                this.storeMgr.storeFSTBatch(targetArr, extReq, entry.entryCount(), req.stateTransferId(), (GridFutureAdapter<Void>)reqFut);
            } else {
                this.storeMgr.storeRegularBatch(targetArr, extReq, entry.entryCount());
            }
            if (!this.log.isTraceEnabled()) continue;
            this.log.trace("A DR send request has been saved: reqID=" + extRequestId + ", fstID=" + req.stateTransferId() + ", targetDC=" + Arrays.toString(targetArr) + ", entries=" + entry.entryCount() + ", dataLen=" + extReq.length);
        }
    }

    /**
     * @param targets Remote data centers.
     * @return Their data center IDs, in list order.
     */
    private byte[] dataCenterIds(List<DrSenderRemoteDataCenter> targets) {
        byte[] dcIds = new byte[targets.size()];
        int pos = 0;
        for (DrSenderRemoteDataCenter target : targets) {
            dcIds[pos++] = target.dataCenterId();
        }
        return dcIds;
    }

    /** Pauses replication to every remote data center. */
    @Override
    public void pause() {
        this.rmtDataCenters.forEach(this::stopRemoteDC);
    }

    /** Resumes replication to every remote data center. */
    @Override
    public void resume() {
        this.rmtDataCenters.forEach(this::resumeRemoteDC);
    }

    /**
     * Pauses replication to a single remote data center; logs a warning if it is not configured.
     *
     * @param dcId Data center ID, 1..31.
     * @throws IllegalArgumentException If {@code dcId} is out of range.
     */
    @Override
    public void pause(byte dcId) {
        DrSenderImpl.validateDcId(dcId);
        DrSenderRemoteDataCenter match = null;
        for (DrSenderRemoteDataCenter candidate : this.rmtDataCenters) {
            if (candidate.dataCenterId() == dcId) {
                match = candidate;
                break;
            }
        }
        if (match == null) {
            this.log.warning("Can't pause replication to remote DC. Data center is not configured: dcId=" + dcId);
            return;
        }
        this.stopRemoteDC(match);
    }

    /**
     * @param dcId Data center ID, 1..31.
     * @return Paused flag of the matching remote data center; {@code false} (with a warning)
     *      if the data center is not configured.
     * @throws IllegalArgumentException If {@code dcId} is out of range.
     */
    @Override
    public boolean paused(byte dcId) {
        DrSenderImpl.validateDcId(dcId);
        DrSenderRemoteDataCenter match = null;
        for (DrSenderRemoteDataCenter candidate : this.rmtDataCenters) {
            if (candidate.dataCenterId() == dcId) {
                match = candidate;
                break;
            }
        }
        if (match != null) {
            return match.paused();
        }
        this.log.warning("Can't acquire state for remote DC. Data center is not configured: dcId=" + dcId);
        return false;
    }

    /**
     * Resumes replication to a single remote data center; logs a warning if it is not configured.
     *
     * @param dcId Data center ID, 1..31.
     * @throws IllegalArgumentException If {@code dcId} is out of range.
     */
    @Override
    public void resume(byte dcId) {
        DrSenderImpl.validateDcId(dcId);
        DrSenderRemoteDataCenter match = null;
        for (DrSenderRemoteDataCenter candidate : this.rmtDataCenters) {
            if (candidate.dataCenterId() == dcId) {
                match = candidate;
                break;
            }
        }
        if (match == null) {
            this.log.warning("Can't resume replication to remote DC. Data center is not configured: dcId=" + dcId);
            return;
        }
        this.resumeRemoteDC(match);
    }

    /**
     * Pauses the given remote data center and logs the fact at info level.
     *
     * @param rmtDc Remote data center.
     */
    private void stopRemoteDC(DrSenderRemoteDataCenter rmtDc) {
        rmtDc.pause();
        if (!this.log.isInfoEnabled()) {
            return;
        }
        this.log.info("Pause replication to remote DC: remoteDcId=" + rmtDc.dataCenterId());
    }

    /**
     * Resumes the given remote data center and logs the fact at info level.
     *
     * @param rmtDc Remote data center.
     */
    private void resumeRemoteDC(DrSenderRemoteDataCenter rmtDc) {
        rmtDc.resume();
        if (!this.log.isInfoEnabled()) {
            return;
        }
        this.log.info("Resume replication to remote DC: remoteDcId=" + rmtDc.dataCenterId());
    }

    /**
     * @param srcDataCenterId Data center the entry originates from.
     * @param replica Candidate remote data center.
     * @param destDataCenterIds Explicit target data center IDs, or {@code null}/empty for none.
     * @return With explicit targets: whether the replica is one of them. Otherwise: whether
     *      the replica's ignore list does not contain the source data center.
     */
    private boolean shouldReplicateTo(byte srcDataCenterId, DrSenderRemoteDataCenter replica, @Nullable Collection<Byte> destDataCenterIds) {
        if (!F.isEmpty(destDataCenterIds)) {
            return destDataCenterIds.contains(replica.dataCenterId());
        }
        boolean ignored = replica.ignoreList().contains(srcDataCenterId);
        return !ignored;
    }

    /**
     * Builds an external batch request from the given parts and marshals it.
     *
     * @return Marshalled request bytes.
     * @throws IgniteCheckedException If thrown by a callee.
     */
    private byte[] prepareExternalRequest(IgniteUuid reqId, String cacheName, byte dataCenterId, int entryCnt, byte[] dataBytes, int dataSize) throws IgniteCheckedException {
        return DrUtils.marshal(new DrExternalBatchRequest(reqId, cacheName, dataCenterId, entryCnt, dataBytes, dataSize));
    }

    /**
     * Moves a {@code STARTED} hub into the {@code ERROR} state: records the error and writes
     * one stop entry per cache name or sender group into the utility cache. A failure to write
     * the entries is logged as a warning; the state still becomes {@code ERROR}. Does nothing
     * if the hub is not {@code STARTED}.
     * <p>
     * Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param e Error that caused the stop.
     */
    void stopOnError(Throwable e) {
        Lock stateWriteLock = this.stateLock.writeLock();
        stateWriteLock.lock();
        try {
            assert (this.state != SenderHubState.STARTING);
            if (this.state != SenderHubState.STARTED) {
                return;
            }
            this.err = e;
            try {
                CacheDrSenderHubStopInfo stopInfo = new CacheDrSenderHubStopInfo(this.err);
                Map<Object, Object> stopEntries;
                if (this.proc.ggConfig().isDrUseCacheNames()) {
                    // One stop entry per configured cache name.
                    String[] cacheNames = this.sndCfg.getCacheNames();
                    stopEntries = U.newHashMap((int)cacheNames.length);
                    for (String cacheName : cacheNames) {
                        stopEntries.put(new CacheDrSenderHubStopKey(cacheName, this.proc.localNodeId()), stopInfo);
                    }
                } else {
                    // One stop entry per effective sender group.
                    String[] sndGroups = DrUtils.effectiveSenderGroups(this.sndCfg);
                    stopEntries = U.newHashMap((int)sndGroups.length);
                    for (String sndGrp : sndGroups) {
                        stopEntries.put(new DrSenderGroupNodeStopKey(sndGrp, this.proc.localNodeId()), stopInfo);
                    }
                }
                this.log.error("Stop DR sender due to an error: ", e);
                this.kctx.cache().utilityCache().putAll((Map)stopEntries);
            }
            catch (IgniteCheckedException putErr) {
                U.warn((IgniteLogger)this.log, (Object)("Failed to create DR sender hub stop entries: " + (Object)((Object)putErr)));
            }
            this.state = SenderHubState.ERROR;
        }
        finally {
            stateWriteLock.unlock();
        }
    }

    /**
     * Queues a task for execution by the task worker.
     * <p>
     * The read side of the task queue lock is taken without blocking; the stop handler holds
     * the write side while it cancels queued tasks, so a failed {@code tryLock()} is reported
     * as a stopped worker.
     *
     * @param task Task to run on the task worker thread.
     * @return Future completed when the task finishes, fails or is cancelled.
     * @throws IgniteException If the lock cannot be acquired, the worker is missing or done,
     *      or the queue rejects the task.
     */
    IgniteInternalFuture<Void> submit(Runnable task) {
        Lock readLock = this.taskQueueLock.readLock();
        // Acquire outside the try block: unlocking a lock that was never acquired would throw
        // IllegalMonitorStateException from finally and mask the real failure.
        if (!readLock.tryLock()) {
            throw new IgniteException("Failed to submit task to the queue: task execution worker is stopped.");
        }
        try {
            GridWorker worker = this.taskExecWorker;
            // A worker that was never created is treated like a stopped one instead of raising an NPE.
            if (worker == null || worker.isDone()) {
                throw new IgniteException("Failed to submit task to the queue: task execution worker is stopped.");
            }
            GridRunnableFuture fut = new GridRunnableFuture(task);
            if (!this.taskQueue.offer(fut)) {
                assert (false) : "Should never happen.";
                throw new IgniteException("Failed to submit task to the queue.");
            }
            return fut;
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the configured sender connections.
     * <p>
     * The result is an unmodifiable snapshot taken under the state read lock. A live view of
     * the internal list would be iterated by callers after the lock is released, while
     * start/stop add to and clear that list.
     *
     * @return Unmodifiable snapshot of the remote data center connections.
     */
    @Override
    public Collection<DrSenderConnection> connections() {
        Lock stateReadLock = this.stateLock.readLock();
        stateReadLock.lock();
        try {
            return Collections.unmodifiableList(new ArrayList<DrSenderConnection>(this.rmtDataCenters));
        }
        finally {
            stateReadLock.unlock();
        }
    }

    /**
     * @param dcId Data center ID.
     * @return {@code true} if a connection to that data center is configured.
     */
    @Override
    public boolean hasConnection(byte dcId) {
        DrSenderConnection conn = this.getConnection(dcId);
        return conn != null;
    }

    /**
     * @param dcId Data center ID.
     * @return Connection to that data center.
     * @throws IllegalStateException If no such connection is configured.
     */
    @Override
    public DrSenderConnection connection(byte dcId) throws IllegalStateException {
        DrSenderConnection found = this.getConnection(dcId);
        if (found != null) {
            return found;
        }
        throw new IllegalStateException("Data center replication sender connection is not configured for data center ID: " + dcId);
    }

    /** @return Sender configuration this hub was created with. */
    @Override
    public DrSenderConfiguration getConfiguration() {
        return this.sndCfg;
    }

    /**
     * @param dcId Data center ID.
     * @return First remote data center with that ID, or {@code null} if there is none.
     */
    @Nullable
    private DrSenderConnection getConnection(byte dcId) {
        DrSenderConnection found = null;
        for (DrSenderRemoteDataCenter candidate : this.rmtDataCenters) {
            if (candidate.dataCenterId() == dcId) {
                found = candidate;
                break;
            }
        }
        return found;
    }

    /**
     * Checks that a data center ID lies in the range 1..31 (inclusive).
     *
     * @param dcId Data center ID to check.
     * @throws IllegalArgumentException If the ID is outside the range.
     */
    private static void validateDcId(byte dcId) {
        boolean inRange = dcId > 0 && dcId < 32;
        if (inRange) {
            return;
        }
        throw new IllegalArgumentException("Argument is invalid: dcId should be in range 1..31 (inclusive) but was " + dcId);
    }

    private class DrTaskExecWorker
    extends GridWorker {
        DrTaskExecWorker(IgniteLogger log, GridWorkerListener workerLsnr) {
            super(DrSenderImpl.this.kctx.igniteInstanceName(), "dr-sender-task-exec-worker", log, workerLsnr);
        }

        protected void body() throws InterruptedException {
            while (!this.isCancelled()) {
                GridRunnableFuture task = (GridRunnableFuture)DrSenderImpl.this.taskQueue.take();
                try {
                    task.run();
                }
                catch (Throwable e) {
                    if (this.isCancelled || DrSenderImpl.this.kctx.isStopping()) {
                        return;
                    }
                    if (X.hasCause((Throwable)e, (Class[])new Class[]{CacheStoppedException.class})) continue;
                    this.log.error("Failed to process DR sender task, sender hub will be stopped.", e);
                    DrSenderImpl.this.stopOnError(e);
                    throw e;
                }
            }
        }
    }

    /**
     * Future wrapping a {@link Runnable}: running it completes the future with success or with
     * the thrown error, which is then rethrown. A future that is already done is not run.
     */
    private static class GridRunnableFuture
    extends GridFutureAdapter<Void>
    implements Runnable {
        /** Wrapped task. */
        private final Runnable delegate;

        private GridRunnableFuture(Runnable task) {
            this.delegate = task;
        }

        public boolean cancel() {
            return this.onCancelled();
        }

        @Override
        public void run() {
            try {
                if (!this.isDone()) {
                    this.delegate.run();
                    this.onDone();
                }
            }
            catch (Throwable failure) {
                // Complete the future first so waiters see the error, then let the worker see it too.
                this.onDone(failure);
                throw failure;
            }
        }
    }

    private static enum SenderHubState {
        STARTING,
        STARTED,
        ERROR,
        STOPPED;

    }

    /** Worker that replays queued requests until the queue is empty or the worker is cancelled. */
    private class PendingRequestsClient
    extends GridWorker {
        private PendingRequestsClient() {
            super(DrSenderImpl.this.proc.igniteInstanceName(), "dr-pending-requests-client", DrSenderImpl.this.log);
        }

        protected void body() {
            while (!this.isCancelled()) {
                IgniteBiTuple<UUID, DrInternalRequest> queued = DrSenderImpl.this.pendingReqs.poll();
                if (queued == null) {
                    break;
                }
                DrSenderImpl.this.onReplicationRequest(queued.get1(), queued.get2());
            }
        }
    }
}

