/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.dr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainInClosure;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.plugin.CachePluginContext;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.gridgain.grid.cache.dr.CacheDrEntryFilter;
import org.gridgain.grid.cache.dr.CacheDrMBean;
import org.gridgain.grid.cache.dr.CacheDrPauseReason;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.cache.dr.CacheDrStateTransfer;
import org.gridgain.grid.cache.dr.CacheDrStatus;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainCacheConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.dr.DrSenderLoadBalancingMode;
import org.gridgain.grid.events.DrCacheReplicationEvent;
import org.gridgain.grid.internal.GridPluginUtils;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrEntryImpl;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrHandler;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrMBeanAdapter;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrMetrics;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrPauseInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrPauseKey;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrResultType;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrSenderHubStopInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrSenderHubStopKey;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrSenderMetricsAdapter;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferHandler;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferKey;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferResultInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferResultKey;
import org.gridgain.grid.internal.processors.cache.dr.DrSenderGroupNodeStopKey;
import org.gridgain.grid.internal.processors.cache.dr.EntryBuffer;
import org.gridgain.grid.internal.processors.cache.dr.Permit;
import org.gridgain.grid.internal.processors.dr.DrProcessor;
import org.gridgain.grid.internal.processors.dr.DrSenderAttributes;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.internal.processors.dr.fst.StateTransferTask;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class GridGainCacheDrManager
extends GridCacheManagerAdapter
implements GridCacheDrManager {
    private static final int TX_RETRIES_WITH_NO_THROTTLING = 10;
    private static final CacheDrSenderConfiguration DFLT_CACHE_SND_CFG = new CacheDrSenderConfiguration().setSenderGroup("<default>");
    private final CachePluginContext<GridGainCacheConfiguration> ggCctx;
    final DrProcessor drProc;
    private GridGainCacheConfiguration ccfg;
    private final CacheDrSenderConfiguration sndCfg;
    private final byte dataCenterId;
    private IgniteInternalCache<Object, Object> sysCache;
    private UUID sysCacheQryId;
    private CacheDrHandler drHnd;
    private CacheDrStateTransferHandler fstHnd;
    private boolean sndEnabled;
    private final GridLocalEventListener discoLsnr = new DiscoveryListener();
    private volatile CacheDrPauseInfo stopInfo;
    private volatile boolean stopping;
    private final CopyOnWriteArrayList<ClusterNode> sndHubs = new CopyOnWriteArrayList();
    private final ThreadLocalRandom sndHubsRnd = ThreadLocalRandom.current();
    private final AtomicLong sndHubIdx = new AtomicLong();
    private final ReadWriteLock sndHubLock = new ReentrantReadWriteLock();
    private volatile boolean sndHubInit;
    private final GridFutureAdapter<?> sndHubInitFut = new GridFutureAdapter();
    private volatile DrSenderAttributes sndHubAttrs;
    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
    protected volatile CacheDrMetrics metrics;
    private ObjectName cacheMBean;
    private ClusterNode locSnd;
    private final boolean useCacheNames;
    private final boolean forceOverrideCacheSndCfg;
    private DrControlTaskExecutor controlTask;

    public GridGainCacheDrManager(CachePluginContext<GridGainCacheConfiguration> ggCctx, DrProcessor drProc) {
        boolean bl = this.forceOverrideCacheSndCfg = IgniteSystemProperties.getInteger((String)"GG_DR_FORCE_DC_ID", (int)0) > 0;
        assert (ggCctx != null);
        this.ggCctx = ggCctx;
        this.drProc = drProc;
        GridGainConfiguration ggCfg = GridPluginUtils.gridPluginConfiguration(ggCctx.grid().configuration());
        if (ggCfg == null) {
            throw new CacheException("GridGain plugin configuration was not found. GridGain plugin has to be configured if DR feature is used.");
        }
        this.dataCenterId = ggCfg.getDataCenterId();
        this.useCacheNames = ggCfg.isDrUseCacheNames();
        assert (this.dataCenterId >= 0);
        this.ccfg = (GridGainCacheConfiguration)GridCacheUtils.cachePluginConfiguration((CacheConfiguration)ggCctx.igniteCacheConfiguration(), GridGainCacheConfiguration.class);
        if (this.forceOverrideCacheSndCfg && !CU.isSystemCache((String)ggCctx.igniteCacheConfiguration().getName())) {
            this.overrideCacheDrConfiguration();
        }
        this.sndCfg = this.ccfg != null ? this.ccfg.getDrSenderConfiguration() : null;
    }

    protected void start0() throws IgniteCheckedException {
        this.sndEnabled = this.sndCfg != null && !this.cctx.isNear();
        boolean isDrSndCache = this.ccfg != null && this.ccfg.getDrSenderConfiguration() != null;
        boolean isDrRcvCache = this.ccfg != null;
        this.metrics = new CacheDrMetrics(isDrSndCache, isDrRcvCache);
        if (this.sndEnabled) {
            assert (this.sndCfg != null);
            this.registerMBean();
            this.injectResources(this.sndCfg.getEntryFilter());
            U.startLifecycleAware(Collections.singleton(this.sndCfg.getEntryFilter()));
            this.controlTask = new DrControlTaskExecutor();
            this.cctx.events().addListener(this.discoLsnr, new int[]{10, 12, 11});
            this.drHnd = new CacheDrHandler(this.cctx, this, this.drProc, this.busyLock, this.sndCfg);
            this.drHnd.onStart();
            this.fstHnd = new CacheDrStateTransferHandler(this.cctx, this.sndCfg, this.busyLock, this);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Started cache data center replication manager [cache=" + this.cctx.name() + ", configuration=" + this.sndCfg + ']');
            }
            if (this.sndCfg.isPreferLocalSender() && this.hasLocalSender()) {
                this.locSnd = this.ggCctx.localNode();
            }
        }
    }

    private boolean hasLocalSender() {
        GridGainConfiguration ggCfg = GridPluginUtils.gridPluginConfiguration(this.cctx.kernalContext().config());
        if (ggCfg == null || ggCfg.getDrSenderConfiguration() == null) {
            return false;
        }
        DrSenderConfiguration sndHubCfg = ggCfg.getDrSenderConfiguration();
        if (this.useCacheNames) {
            assert (!F.isEmpty((Object[])sndHubCfg.getCacheNames()));
            for (String name : sndHubCfg.getCacheNames()) {
                if (!F.eq((Object)name, (Object)this.cctx.name())) continue;
                return true;
            }
        } else {
            assert (F.isEmpty((Object[])sndHubCfg.getCacheNames())) : "cache names are not allowed.";
            String sndGroup = DrUtils.effectiveSenderGroup(this.sndCfg);
            for (String grpName : DrUtils.effectiveSenderGroups(sndHubCfg)) {
                if (!F.eq((Object)grpName, (Object)sndGroup)) continue;
                return true;
            }
        }
        return false;
    }

    private void overrideCacheDrConfiguration() {
        if (this.ccfg == null) {
            this.ccfg = new GridGainCacheConfiguration();
            Object[] origCfgs = this.ggCctx.igniteCacheConfiguration().getPluginConfigurations();
            if (F.isEmpty((Object[])origCfgs)) {
                this.ggCctx.igniteCacheConfiguration().setPluginConfigurations(new CachePluginConfiguration[]{this.ccfg});
            } else {
                CachePluginConfiguration[] cpCfgs = new CachePluginConfiguration[origCfgs.length + 1];
                System.arraycopy(origCfgs, 0, cpCfgs, 0, origCfgs.length);
                cpCfgs[origCfgs.length] = this.ccfg;
                this.ggCctx.igniteCacheConfiguration().setPluginConfigurations(cpCfgs);
            }
        }
        if (this.ccfg.getDrSenderConfiguration() == null) {
            this.ccfg.setDrSenderConfiguration(DFLT_CACHE_SND_CFG);
        }
    }

    private void registerMBean() {
        if (U.IGNITE_MBEANS_DISABLED) {
            return;
        }
        try {
            this.cacheMBean = U.registerMBean((MBeanServer)this.cctx.gridConfig().getMBeanServer(), (String)this.cctx.igniteInstanceName(), (String)this.cctx.cache().name(), (String)"Cache data replication", (Object)new CacheDrMBeanAdapter(this), CacheDrMBean.class);
        }
        catch (JMException e) {
            U.error((IgniteLogger)this.log, (Object)"Failed to register cache MBean.", (Throwable)e);
        }
    }

    private void unregisterMBean() {
        if (this.cacheMBean == null) {
            return;
        }
        assert (!U.IGNITE_MBEANS_DISABLED);
        try {
            this.cctx.gridConfig().getMBeanServer().unregisterMBean(this.cacheMBean);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Unregistered cache MBean: " + this.cacheMBean);
            }
        }
        catch (JMException e) {
            U.error((IgniteLogger)this.log, (Object)("Failed to unregister cache MBean: " + this.cacheMBean), (Throwable)e);
        }
    }

    private void injectResources(@Nullable Object obj) throws IgniteCheckedException {
        if (obj != null) {
            this.cctx.kernalContext().resource().injectGeneric(obj);
            this.cctx.kernalContext().resource().injectCacheName(obj, this.cctx.config().getName());
        }
    }

    protected void onKernalStart0() throws IgniteCheckedException {
        this.sysCache = this.cctx.kernalContext().cache().utilityCache();
        if (this.sndEnabled) {
            assert (this.sysCache.configuration().getCacheMode() == CacheMode.REPLICATED);
            this.fstHnd.onKernalStart(this.sysCache);
            if (!this.cctx.gridConfig().isClientMode().booleanValue()) {
                this.sysCacheQryId = this.sysCache.context().continuousQueries().executeInternalQuery((CacheEntryUpdatedListener)new SystemCacheUpdatedListener(), (CacheEntryEventSerializableFilter)(this.useCacheNames ? new DrEntryEventFilter(this.ggCctx.igniteCacheConfiguration().getName()) : new DrGroupControlEventFilter(DrUtils.effectiveSenderGroup(this.sndCfg), this.ggCctx.igniteCacheConfiguration().getName())), true, true, false, false);
            }
            GridCacheSharedContext sctx = this.cctx.shared();
            GridDhtPartitionsExchangeFuture topFuture = sctx.exchange().lastTopologyFuture();
            assert (topFuture != null) : "DR Worker should start after join to topology (last exchange future is null)";
            IgniteInternalFuture affinityReadyFuture = sctx.exchange().affinityReadyFuture(topFuture.initialVersion());
            affinityReadyFuture.listen((IgniteInClosure & Serializable)f -> {
                try {
                    f.get();
                    this.drProc.submit(this.controlTask);
                }
                catch (Exception e) {
                    throw new IgniteException("Failed to wait for affinity ready future [topVer=" + topFuture.initialVersion() + "]", (Throwable)e);
                }
            });
        }
    }

    protected void onKernalStop0(boolean cancel) {
        this.unregisterMBean();
        if (this.sndEnabled) {
            this.stopping = true;
            this.cctx.events().removeListener(this.discoLsnr);
            if (this.sysCacheQryId != null) {
                this.sysCache.context().continuousQueries().cancelInternalQuery(this.sysCacheQryId);
            }
            if (this.controlTask != null) {
                this.controlTask.stop();
            }
            if (this.fstHnd != null) {
                this.fstHnd.onKernalStop();
            }
            if (this.drHnd != null) {
                this.drHnd.onKernalStop();
            }
            this.busyLock.block();
            if (!this.sndHubInitFut.isDone()) {
                this.sndHubInitFut.onDone((Throwable)new IgniteCheckedException("Failed to initialize send hubs because grid is stopping."));
            }
            if (this.fstHnd != null) {
                this.fstHnd.stop();
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Stopped cache replication manager.");
            }
        }
    }

    protected void stop0(boolean cancel, boolean destroy) {
        if (this.sndEnabled) {
            U.stopLifecycleAware((IgniteLogger)this.log, Collections.singleton(this.sndCfg.getEntryFilter()));
        }
    }

    public byte dataCenterId() {
        return this.dataCenterId;
    }

    public void replicate(KeyCacheObject key, @Nullable CacheObject val, long ttl, long expireTime, GridCacheVersion ver, GridDrType drType, AffinityTopologyVersion topVer) throws IgniteCheckedException {
        assert (this.sndEnabled);
        assert (!this.cctx.localNode().isClient());
        GridCacheRawVersionedEntry entry = new GridCacheRawVersionedEntry(key, val, ttl, expireTime, ver.conflictVersion());
        this.replicate(entry, drType, topVer);
    }

    private void replicate(GridCacheRawVersionedEntry entry, GridDrType drType, AffinityTopologyVersion topVer) throws IgniteCheckedException {
        boolean apply;
        CacheDrEntryFilter drFilter = this.sndCfg.getEntryFilter();
        boolean bl = apply = drFilter == null;
        if (!apply) {
            entry.unmarshal(this.cctx.cacheObjectContext());
            apply = drFilter.accept(new CacheDrEntryImpl(entry, (CacheObjectValueContext)this.cctx.cacheObjectContext()));
        }
        if (!apply) {
            this.metrics.onSenderCacheEntryFiltered();
            return;
        }
        if (!this.awaitSenderHubsInitialization()) {
            return;
        }
        if (!this.stopped()) {
            this.drHnd.onReplicate(entry, drType, topVer);
        }
    }

    private IgniteInternalFuture<?> stateTransfer(Collection<Byte> dataCenterIds, boolean sync) {
        if (!this.sndEnabled) {
            return new GridFinishedFuture((Throwable)new IgniteCheckedException("Failed to initiate state transfer because data center replication is disabled in cache: " + this.cctx.name()));
        }
        if (!this.awaitSenderHubsInitialization()) {
            return new GridFinishedFuture((Throwable)new IgniteCheckedException("Failed to initiate state transfer because sender hubs are not initialized."));
        }
        GridFutureAdapter fut = new GridFutureAdapter();
        this.enqueueDrMgmtTask(new StateTransferStartTask(fut, dataCenterIds, sync));
        try {
            return (IgniteInternalFuture)fut.get();
        }
        catch (IgniteCheckedException e) {
            throw new IllegalStateException("Failed to initiate state transfer.", e);
        }
    }

    public boolean enabled() {
        return this.sndEnabled;
    }

    public boolean receiveEnabled() {
        return this.ccfg != null;
    }

    public CacheDrStatus drStopState() {
        if (this.cctx.isNear()) {
            return ((GridGainCacheDrManager)this.cctx.near().dht().context().dr()).drStopState();
        }
        this.checkDrEnabled();
        this.awaitSenderHubsInitialization();
        CacheDrPauseInfo stopInfo = this.stopInfo;
        return stopInfo != null && stopInfo.reason() != null ? new CacheDrStatus(stopInfo.reason(), stopInfo.error()) : CacheDrStatus.ACTIVE;
    }

    public void onExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException {
        assert (this.cctx.isDrEnabled());
        this.drHnd.onExchange(topVer, left, this.stopped());
    }

    public void partitionEvicted(int part) {
        assert (this.cctx.isDrEnabled());
        assert (part >= 0);
        if (!this.stopped()) {
            this.drHnd.onPartitionEvicted(part);
        }
    }

    private void userStateChange(boolean stopped) throws IgniteCheckedException {
        if (!this.sndEnabled) {
            throw new IgniteCheckedException("Failed to initiate state transfer because data center replication is disabled in cache: " + this.cctx.name());
        }
        if (!this.awaitSenderHubsInitialization()) {
            throw new IgniteCheckedException("Failed to change start/stop DR because grid is stopping.");
        }
        DrStopTask task = new DrStopTask(stopped ? CacheDrPauseReason.USER_REQUEST : null, null, null);
        this.enqueueDrMgmtTask(task);
        task.fut.get();
    }

    private GridKernalContext kernalCtx() {
        return this.cctx.kernalContext();
    }

    private void recordCacheReplicationStateChangedEvt(CacheDrPauseInfo info) {
        String msg;
        int type;
        assert (info != null) : "info";
        if (info.reason() == null) {
            type = 1023;
            msg = "Replication started.";
        } else {
            type = 1022;
            msg = "Replication stopped.";
        }
        if (!this.kernalCtx().event().isUserRecordable(type)) {
            return;
        }
        ClusterNode node = this.kernalCtx().discovery().localNode();
        this.kernalCtx().event().record((Event)new DrCacheReplicationEvent(node, msg, type, this.cctx.name(), info));
    }

    public void onBatchFailed(Map<UUID, Throwable> errs) {
        assert (!F.isEmpty(errs));
        this.enqueueDrMgmtTask(new DrStopTask(CacheDrPauseReason.BATCH_FAILED, "All available sender hubs failed to process data center replication batch.", null));
    }

    IgniteInternalFuture<CacheDrResultType> fullStateTransferReplicate(Collection<Byte> dataCenterIds, Map<Byte, EntryBuffer> entryBuffers, IgniteThrowableFunction<Integer, Permit> permitFunction, boolean syncFst, @Nullable IgniteUuid fstId) throws IgniteCheckedException {
        this.awaitSenderHubsInitialization();
        if (this.stopped()) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Data center replication is stopped, ignoring full state transfer [cache=" + this.cctx.name() + ']');
            }
            return new GridFinishedFuture((Object)CacheDrResultType.IGNORED);
        }
        return this.drHnd.fullStateTransferReplicate(dataCenterIds, entryBuffers, permitFunction, syncFst, fstId);
    }

    private CacheDrPauseInfo stopReplication(@Nullable CacheDrPauseInfo oldStopInfo, @Nullable CacheDrPauseReason reason, @Nullable String errMsg) throws IgniteCheckedException {
        boolean save;
        boolean bl = save = oldStopInfo == null;
        if (!save) {
            CacheDrPauseReason oldReason = oldStopInfo.reason();
            boolean bl2 = save = oldReason == null && reason != null || oldReason != null && reason == null;
        }
        if (save) {
            DynamicCacheDescriptor cacheDesc = this.cctx.shared().cache().cacheDescriptor(this.cctx.cacheId());
            CacheDrPauseInfo newStopInfo = new CacheDrPauseInfo(this.cctx.localNodeId(), reason, errMsg, cacheDesc.receivedFrom());
            this.sysCache.put((Object)new CacheDrPauseKey(this.cctx.name()), (Object)newStopInfo);
            if (reason != null) {
                this.fstHnd.onReplicationStop(newStopInfo);
                if (this.log.isInfoEnabled()) {
                    this.log.info("Data center replication is stopped [cache=" + this.cctx.name() + ", info=" + newStopInfo + ", reason=" + (Object)((Object)reason) + "]");
                }
            } else {
                if (this.sndHubs.isEmpty()) {
                    throw new IgniteCheckedException("Failed to start replication because there are no sender hubs available.");
                }
                if (this.log.isInfoEnabled()) {
                    this.log.info("Data center replication is started [cache=" + this.cctx.name() + ']');
                }
            }
            return newStopInfo;
        }
        return oldStopInfo;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean txOp(GridPlainInClosure<CacheDrPauseInfo> clo, boolean globalSync) throws IgniteCheckedException {
        int retryCnt = 0;
        while (this.busyLock.enterBusy()) {
            try {
                if (this.stopping) {
                    boolean bl = false;
                    return bl;
                }
                this.cctx.gate().enter();
                try (GridNearTxLocal tx = this.sysCache.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);){
                    CacheDrPauseInfo info = null;
                    if (globalSync) {
                        CacheDrPauseKey stopKey = new CacheDrPauseKey(this.cctx.name());
                        info = (CacheDrPauseInfo)this.sysCache.get((Object)stopKey);
                        DynamicCacheDescriptor cacheDesc = this.cctx.shared().cache().cacheDescriptor(this.cctx.cacheId());
                        if (!(info == null || info.cacheCreatorUUID() == null || cacheDesc != null && info.cacheCreatorUUID().equals(cacheDesc.receivedFrom()))) {
                            this.sysCache.remove((Object)stopKey);
                            info = null;
                        }
                    }
                    clo.apply(info);
                    tx.commit();
                }
                finally {
                    this.cctx.gate().leave();
                }
                boolean tx = true;
                return tx;
            }
            catch (ClusterTopologyCheckedException e) {
                if (this.stopping || this.cctx.topology().stopping()) {
                    throw e;
                }
                if (retryCnt == 0) {
                    U.warn((IgniteLogger)this.log, (Object)("Failed to execute transaction on system cache, will repeat: " + (Object)((Object)e)));
                }
                if (e instanceof ClusterTopologyServerNotFoundException) {
                    retryCnt += 10;
                }
            }
            catch (NodeStoppingException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.warning("Failed to execute transaction on system cache, node is about to stop.", (Throwable)e);
                }
                boolean bl = false;
                return bl;
            }
            catch (Throwable e) {
                if (this.stopping || this.cctx.topology().stopping()) {
                    throw e;
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Error during a transaction over the system cache. " + X.getFullStackTrace((Throwable)e));
                }
                throw e;
            }
            finally {
                this.busyLock.leaveBusy();
            }
            if (++retryCnt < 10) continue;
            U.sleep((long)100L);
        }
        return false;
    }

    public int queuedKeysCount() {
        return this.sndEnabled ? this.drHnd.queuedKeysCount() : 0;
    }

    public int batchWaitingSendCount() {
        return this.sndEnabled ? this.drHnd.batchWaitingSendCount() : 0;
    }

    public int batchWaitingAcknowledgeCount() {
        return this.sndEnabled ? this.drHnd.batchWaitingAcknowledgeCount() : 0;
    }

    public int senderHubsCount() {
        return this.sndHubs.size();
    }

    @Nullable
    public String getDrSenderGroup() {
        return this.sndCfg == null || this.useCacheNames ? null : DrUtils.effectiveSenderGroup(this.sndCfg);
    }

    void setStateTransferThrottle(long stateTransferThrottle) {
        CacheDrStateTransferHandler fstHnd = this.fstHnd;
        if (fstHnd != null) {
            fstHnd.setStateTransferThrottle(stateTransferThrottle);
        }
    }

    long getStateTransferThrottle() {
        CacheDrStateTransferHandler fstHnd = this.fstHnd;
        if (fstHnd != null) {
            return fstHnd.getStateTransferThrottle();
        }
        return 0L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean initializeSenderHubs() {
        boolean hasHub = false;
        this.sndHubLock.writeLock().lock();
        try {
            assert (!this.sndHubInit);
            for (ClusterNode node : this.cctx.discovery().allNodes()) {
                if (!this.isSenderHub(node)) continue;
                this.sndHubs.add(node);
                this.sndHubAttrs = (DrSenderAttributes)node.attribute("plugins.gg.replication.snd.hub");
                hasHub = true;
            }
        }
        finally {
            this.sndHubLock.writeLock().unlock();
        }
        this.sndHubInit = true;
        this.sndHubInitFut.onDone();
        return hasHub;
    }

    private boolean awaitSenderHubsInitialization() {
        if (this.sndHubInit) {
            return true;
        }
        try {
            this.sndHubInitFut.get();
            return true;
        }
        catch (IgniteCheckedException e) {
            U.error((IgniteLogger)this.log, (Object)"Failed to wait for sender hubs initialization.", (Throwable)e);
            return false;
        }
    }

    public boolean initDr() {
        try {
            GridTuple newStopInfo;
            AtomicBoolean infoChanged;
            boolean shouldStopReplication;
            boolean res;
            if (this.cctx.gridConfig().isClientMode().booleanValue()) {
                this.sysCacheQryId = this.sysCache.context().continuousQueries().executeInternalQuery((CacheEntryUpdatedListener)new SystemCacheUpdatedListener(), (CacheEntryEventSerializableFilter)(this.useCacheNames ? new DrEntryEventFilter(this.ggCctx.igniteCacheConfiguration().getName()) : new DrGroupControlEventFilter(DrUtils.effectiveSenderGroup(this.sndCfg), this.ggCctx.igniteCacheConfiguration().getName())), false, true, false, false);
            }
            if (!(res = this.txOp((GridPlainInClosure<CacheDrPauseInfo>)((GridPlainInClosure)arg_0 -> this.lambda$initDr$0(shouldStopReplication = !this.initializeSenderHubs() && !this.cctx.localNode().isClient(), infoChanged = new AtomicBoolean(false), newStopInfo = F.t(null), arg_0)), true))) {
                return false;
            }
            this.stopInfo = (CacheDrPauseInfo)newStopInfo.get();
            assert (this.stopInfo != null);
            this.metrics.onStopStateChanged(this.stopInfo.reason(), this.stopInfo.error());
        }
        catch (Throwable e) {
            if (!this.stopping) {
                U.error((IgniteLogger)this.log, (Object)"Failed to initialize data center replication state.", (Throwable)e);
            }
            this.sndHubInitFut.onDone(e);
            if (e instanceof Error) {
                throw (Error)e;
            }
            return false;
        }
        return true;
    }

    private void addHub(ClusterNode sndHub) {
        if (this.sndHubs.addIfAbsent(sndHub)) {
            this.sndHubAttrs = (DrSenderAttributes)sndHub.attribute("plugins.gg.replication.snd.hub");
        }
    }

    private boolean removeHub(UUID nodeId) throws IgniteCheckedException {
        ClusterNode rmv = null;
        for (ClusterNode sndHub : this.sndHubs) {
            if (!sndHub.id().equals(nodeId)) continue;
            rmv = sndHub;
            break;
        }
        if (rmv != null) {
            this.sndHubs.remove(rmv);
            Boolean isStorePersistent = (Boolean)rmv.attribute("plugins.gg.replication.snd.store.persistent");
            if (isStorePersistent == null) {
                isStorePersistent = true;
            }
            if (!isStorePersistent.booleanValue()) {
                this.stopReplication(null, CacheDrPauseReason.BATCH_FAILED, "Sender with non-persistent sender store has gone.");
            }
            if (this.sndHubs.isEmpty()) {
                this.sndHubAttrs = null;
                return true;
            }
            ClusterNode lastSndHub = this.sndHubs.get(this.sndHubs.size() - 1);
            this.sndHubAttrs = (DrSenderAttributes)lastSndHub.attribute("plugins.gg.replication.snd.hub");
            assert (this.sndHubAttrs != null);
        }
        return false;
    }

    @Nullable
    ClusterNode nextHub(Collection<UUID> failedHubs) {
        int idx;
        if (!this.awaitSenderHubsInitialization()) {
            return null;
        }
        if (this.locSnd != null && !failedHubs.contains(this.locSnd.id())) {
            return this.locSnd;
        }
        ArrayList<ClusterNode> sndHubs0 = new ArrayList<ClusterNode>(this.sndHubs);
        List<Object> list = sndHubs0 = F.isEmpty(failedHubs) ? sndHubs0 : F.filterList(sndHubs0, (boolean)false, (IgnitePredicate[])new IgnitePredicate[]{(P1 & Serializable)node -> failedHubs.contains(node.id())});
        if (sndHubs0.isEmpty()) {
            return null;
        }
        assert (this.sndCfg.getLoadBalancingMode() != null);
        if (this.sndCfg.getLoadBalancingMode() == DrSenderLoadBalancingMode.DR_RANDOM) {
            idx = this.sndHubsRnd.nextInt(sndHubs0.size());
        } else {
            assert (this.sndCfg.getLoadBalancingMode() == DrSenderLoadBalancingMode.DR_ROUND_ROBIN);
            idx = (int)(this.sndHubIdx.incrementAndGet() & (long)(sndHubs0.size() - 1));
        }
        return (ClusterNode)sndHubs0.get(idx);
    }

    boolean isSenderHub(ClusterNode node) {
        if (this.useCacheNames) {
            DrSenderAttributes attr = (DrSenderAttributes)node.attribute("plugins.gg.replication.snd.hub");
            return this.isSenderHubAttribute(attr);
        }
        Object[] sndGroups = (String[])node.attribute("plugins.gg.replication.snd.groups");
        if (F.isEmpty((Object[])sndGroups)) {
            return false;
        }
        String sndGroup = DrUtils.effectiveSenderGroup(this.sndCfg);
        for (Object grp : sndGroups) {
            if (!sndGroup.equals(grp)) continue;
            return true;
        }
        return false;
    }

    private boolean isSenderHubAttribute(@Nullable DrSenderAttributes attr) {
        if (attr != null) {
            assert (attr.getCacheNames() != null);
            assert (this.sndEnabled);
            for (String cacheName : attr.getCacheNames()) {
                if (!F.eq((Object)CU.mask((String)this.cctx.name()), (Object)cacheName)) continue;
                return true;
            }
        }
        return false;
    }

    boolean stopped() {
        return this.stopping || this.stopInfo != null && this.stopInfo.reason() != null;
    }

    public CacheDrMetrics metrics() {
        CacheDrSenderMetricsAdapter drSndMetrics;
        CacheDrMetrics cp = CacheDrMetrics.copyOf(this.metrics);
        if (cp != null && (drSndMetrics = cp.drSendMetrics0()) != null) {
            drSndMetrics.backupQueueSize(this.sndEnabled ? this.drHnd.backupQueueSize() : 0);
        }
        return cp;
    }

    public IgniteInternalFuture<?> drStateTransfer(Collection<Byte> dataCenterIds, boolean sync) {
        if (this.cctx.isNear()) {
            return ((GridGainCacheDrManager)this.cctx.near().dht().context().dr()).stateTransfer(dataCenterIds, sync);
        }
        this.checkDrEnabled();
        return this.stateTransfer(dataCenterIds, sync);
    }

    public IgniteInternalFuture<?> stopDrStateTransfer(CacheDrStateTransferKey key) {
        this.checkDrEnabled();
        if (!this.sndEnabled) {
            return new GridFinishedFuture((Throwable)new IgniteCheckedException("Failed to stop state transfer because data center replication is disabled in cache: " + this.cctx.name()));
        }
        GridFutureAdapter fut = new GridFutureAdapter();
        this.enqueueDrMgmtTask(new StateTransferStopTask(fut, key, CacheDrPauseReason.USER_REQUEST));
        return fut;
    }

    public Collection<CacheDrStateTransfer> drListStateTransfers() {
        if (this.cctx.isNear()) {
            return ((GridGainCacheDrManager)this.cctx.near().dht().context().dr()).drListStateTransfers();
        }
        this.checkDrEnabled();
        try {
            return this.fstHnd.listStateTransfers();
        }
        catch (IgniteCheckedException ignored) {
            throw new IllegalStateException("Failed to list state transfers because grid is stopping.");
        }
    }

    public void drStop() {
        if (this.cctx.isNear()) {
            ((GridGainCacheDrManager)this.cctx.near().dht().context().dr()).drStop();
            return;
        }
        this.checkDrEnabled();
        try {
            this.userStateChange(true);
        }
        catch (IgniteCheckedException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    public void drStart() {
        if (this.cctx.isNear()) {
            ((GridGainCacheDrManager)this.cctx.near().dht().context().dr()).drStart();
            return;
        }
        this.checkDrEnabled();
        try {
            this.userStateChange(false);
        }
        catch (IgniteCheckedException e) {
            throw new IllegalStateException(e.getMessage());
        }
    }

    private void checkDrEnabled() {
        if (!this.enabled()) {
            throw new IllegalStateException("Data center replication is not configured for cache: " + this.cctx.name());
        }
    }

    public void onReceiveCacheEntriesReceived(int entriesCnt) {
        this.metrics.onReceiveCacheEntriesReceived(entriesCnt);
    }

    public void onReceiveCacheConflictResolved(boolean useNew, boolean useOld, boolean merge) {
        this.metrics.onReceiveCacheConflictResolved(useNew, useOld, merge);
    }

    public void resetMetrics() {
        boolean isDrSndCache = this.ccfg != null && this.ccfg.getDrSenderConfiguration() != null;
        boolean isDrRcvCache = this.ccfg != null;
        this.metrics = new CacheDrMetrics(isDrSndCache, isDrRcvCache);
        if (!this.cctx.isColocated() && this.cctx.dht().near() != null) {
            this.metrics.delegate(((GridGainCacheDrManager)this.cctx.dht().near().context().dr()).metrics());
        }
    }

    DrSenderAttributes sendHubAttributes() {
        return this.sndHubAttrs;
    }

    boolean stopKey(Object key) {
        if (this.useCacheNames && key instanceof CacheDrSenderHubStopKey) {
            CacheDrSenderHubStopKey key0 = (CacheDrSenderHubStopKey)key;
            String cacheName = this.ggCctx.igniteCacheConfiguration().getName();
            return F.eq((Object)cacheName, (Object)key0.cacheName());
        }
        if (!this.useCacheNames && key instanceof DrSenderGroupNodeStopKey) {
            DrSenderGroupNodeStopKey key0 = (DrSenderGroupNodeStopKey)key;
            String grpName = DrUtils.effectiveSenderGroup(this.sndCfg);
            return F.eq((Object)grpName, (Object)key0.groupName());
        }
        return false;
    }

    boolean pauseKey(Object key) {
        return key instanceof CacheDrPauseKey && F.eq((Object)this.ggCctx.igniteCacheConfiguration().getName(), (Object)((CacheDrPauseKey)key).cacheName());
    }

    boolean stateTransferResultKey(Object key) {
        return key instanceof CacheDrStateTransferResultKey && F.eq((Object)this.ggCctx.igniteCacheConfiguration().getName(), (Object)((CacheDrStateTransferResultKey)key).cacheName());
    }

    boolean stateTransferKey(Object key) {
        return key instanceof CacheDrStateTransferKey && F.eq((Object)this.ggCctx.igniteCacheConfiguration().getName(), (Object)((CacheDrStateTransferKey)key).cacheName());
    }

    void disableAdaptiveThrottling() {
        CacheDrHandler drHnd = this.drHnd;
        if (drHnd != null) {
            drHnd.setDisableThrottling(true);
        }
    }

    void enableAdaptiveThrottling() {
        CacheDrHandler drHnd = this.drHnd;
        if (drHnd != null) {
            drHnd.setDisableThrottling(false);
        }
    }

    private void enqueueDrMgmtTask(DrTask task) {
        this.controlTask.enqueueTask(task);
    }

    void submitStateTransferTask(StateTransferTask task) {
        this.drProc.submitStateTransferTask(task);
    }

    void runAsync(final DrTask task) {
        this.drProc.submit(new Runnable(){

            @Override
            public void run() {
                block7: {
                    try {
                        GridCacheSharedContext sctx = GridGainCacheDrManager.this.cctx.shared();
                        GridDhtPartitionsExchangeFuture topFuture = sctx.exchange().lastTopologyFuture();
                        assert (topFuture != null) : "DR Worker should start after join to topology (last exchange future is null)";
                        IgniteInternalFuture affinityReadyFuture = sctx.exchange().affinityReadyFuture(topFuture.initialVersion());
                        affinityReadyFuture.get();
                        boolean res = GridGainCacheDrManager.this.txOp((GridPlainInClosure<CacheDrPauseInfo>)((GridPlainInClosure)stopInfo -> task.run((CacheDrPauseInfo)stopInfo)), true);
                        if (!res) {
                            task.onError((Throwable)new CacheStoppedException("Dr task is cancelled due to cache stop: task=" + task + ", cache=" + GridGainCacheDrManager.this.cctx.name()));
                            return;
                        }
                        task.onDone();
                    }
                    catch (Throwable e) {
                        if (!GridGainCacheDrManager.this.cctx.topology().stopping()) {
                            U.error((IgniteLogger)GridGainCacheDrManager.this.log, (Object)"An exception occurred during DR task processing.", (Throwable)e);
                        } else if (GridGainCacheDrManager.this.log.isDebugEnabled()) {
                            GridGainCacheDrManager.this.log.debug("An exception occurred during DR task processing: " + e);
                        }
                        task.onError(e);
                        if (!(e instanceof Error)) break block7;
                        throw (Error)e;
                    }
                }
            }

            public String toString() {
                return task.toString();
            }
        });
    }

    private /* synthetic */ void lambda$initDr$0(boolean shouldStopReplication, AtomicBoolean infoChanged, GridTuple newStopInfo, CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
        CacheDrPauseInfo stopInfo = shouldStopReplication ? this.stopReplication(oldStopInfo, CacheDrPauseReason.NO_SND_HUBS, null) : (oldStopInfo == null ? this.stopReplication(null, null, null) : oldStopInfo);
        infoChanged.set(oldStopInfo != stopInfo);
        newStopInfo.set((Object)stopInfo);
    }

    private class SystemCacheUpdatedListener
    implements CacheEntryUpdatedListener<Object, Object> {
        private SystemCacheUpdatedListener() {
        }

        public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> e : evts) {
                Object key = e.getKey();
                if (GridGainCacheDrManager.this.stateTransferKey(key)) {
                    GridGainCacheDrManager.this.runAsync(new StateTransferChangeTask((CacheDrStateTransferKey)key, (CacheDrStateTransferInfo)e.getValue()));
                    continue;
                }
                if (GridGainCacheDrManager.this.stateTransferResultKey(key)) {
                    CacheDrStateTransferResultInfo oldInfo = (CacheDrStateTransferResultInfo)e.getOldValue();
                    CacheDrStateTransferResultInfo newInfo = (CacheDrStateTransferResultInfo)e.getValue();
                    boolean transferCompleted = oldInfo != null && !oldInfo.done() && newInfo != null && newInfo.done();
                    GridGainCacheDrManager.this.enqueueDrMgmtTask(new StateTransferResultTask((CacheDrStateTransferResultKey)key, transferCompleted));
                    continue;
                }
                if (GridGainCacheDrManager.this.stopKey(key)) {
                    CacheDrSenderHubStopInfo stopInfo = (CacheDrSenderHubStopInfo)e.getValue();
                    if (stopInfo == null) continue;
                    UUID stoppedNodeId = key instanceof CacheDrSenderHubStopKey ? ((CacheDrSenderHubStopKey)key).nodeId() : ((DrSenderGroupNodeStopKey)key).nodeId();
                    GridGainCacheDrManager.this.enqueueDrMgmtTask(new SenderHubStopTask(Collections.singleton(stoppedNodeId), CacheDrPauseReason.NO_SND_HUBS, stopInfo.error() != null ? stopInfo.error().getMessage() : null));
                    continue;
                }
                if (!GridGainCacheDrManager.this.pauseKey(key)) continue;
                assert (key instanceof CacheDrPauseKey) : key;
                CacheDrPauseInfo info = (CacheDrPauseInfo)e.getValue();
                if (info == null) continue;
                GridGainCacheDrManager.this.recordCacheReplicationStateChangedEvt(info);
                if (F.eq((Object)GridGainCacheDrManager.this.cctx.localNodeId(), (Object)info.nodeId())) continue;
                GridGainCacheDrManager.this.enqueueDrMgmtTask(new DrStopTask(null, null, info));
            }
        }
    }

    static abstract class DrTask {
        DrTask() {
        }

        @Nullable
        public abstract CacheDrPauseInfo run(@Nullable CacheDrPauseInfo var1) throws IgniteCheckedException;

        @Nullable
        public AffinityTopologyVersion topologyVersion() {
            return null;
        }

        public void onDone() {
        }

        public void onError(Throwable err) {
        }
    }

    private class SenderHubStopTask
    extends DrTask {
        private final Collection<UUID> sndHubIds;
        private final CacheDrPauseReason reason;
        private final String errMsg;

        private SenderHubStopTask(Collection<UUID> sndHubIds, @Nullable CacheDrPauseReason reason, String errMsg) {
            this.sndHubIds = sndHubIds;
            this.reason = reason;
            this.errMsg = errMsg;
        }

        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            assert (oldStopInfo != null);
            boolean noHubs = false;
            for (UUID sndHubId : this.sndHubIds) {
                if (!GridGainCacheDrManager.this.removeHub(sndHubId)) continue;
                noHubs = true;
            }
            GridGainCacheDrManager.this.drHnd.onSenderHubsLeave(this.sndHubIds);
            return noHubs ? GridGainCacheDrManager.this.stopReplication(oldStopInfo, this.reason, this.errMsg) : oldStopInfo;
        }
    }

    private class SenderHubStartTask
    extends DrTask {
        private final ClusterNode sndHubNode;

        private SenderHubStartTask(ClusterNode sndHubNode) {
            this.sndHubNode = sndHubNode;
        }

        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) {
            assert (oldStopInfo != null);
            GridGainCacheDrManager.this.addHub(this.sndHubNode);
            return oldStopInfo;
        }
    }

    private class NodeLeaveTask
    extends DrTask {
        private final AffinityTopologyVersion topVer;
        private final UUID nodeId;
        private final boolean dataNode;
        private final boolean sndHubNode;

        private NodeLeaveTask(long topVer, UUID nodeId, boolean dataNode, boolean sndHubNode) {
            this.topVer = new AffinityTopologyVersion(topVer);
            this.nodeId = nodeId;
            this.dataNode = dataNode;
            this.sndHubNode = sndHubNode;
        }

        @Override
        @Nullable
        public AffinityTopologyVersion topologyVersion() {
            return this.topVer;
        }

        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            boolean noHubs = false;
            if (this.sndHubNode) {
                noHubs = GridGainCacheDrManager.this.removeHub(this.nodeId);
                GridGainCacheDrManager.this.drHnd.onSenderHubsLeave(Collections.singleton(this.nodeId));
            }
            Collection cacheNodes = CU.affinityNodes((GridCacheContext)GridGainCacheDrManager.this.cctx, (AffinityTopologyVersion)this.topVer);
            if (F.eq((Object)GridGainCacheDrManager.this.cctx.localNode(), (Object)CU.oldest((Collection)cacheNodes))) {
                HashSet<GridCacheInternal> staleStopKeys = new HashSet<GridCacheInternal>();
                HashSet<CacheDrStateTransferResultKey> orphanFstResults = new HashSet<CacheDrStateTransferResultKey>();
                HashMap<CacheDrStateTransferResultKey, CacheDrStateTransferResultInfo> fstResults = new HashMap<CacheDrStateTransferResultKey, CacheDrStateTransferResultInfo>();
                Iterator it = GridGainCacheDrManager.this.sysCache.scanIterator(false, null);
                while (it.hasNext()) {
                    CacheDrStateTransferResultInfo resInfo;
                    boolean removed;
                    Cache.Entry entry = (Cache.Entry)it.next();
                    Object key = entry.getKey();
                    if (GridGainCacheDrManager.this.stopKey(key)) {
                        UUID stoppedNodeId = key instanceof CacheDrSenderHubStopKey ? ((CacheDrSenderHubStopKey)key).nodeId() : ((DrSenderGroupNodeStopKey)key).nodeId();
                        if (GridGainCacheDrManager.this.cctx.discovery().alive(stoppedNodeId) && !F.eq((Object)this.nodeId, (Object)stoppedNodeId)) continue;
                        staleStopKeys.add((GridCacheInternal)key);
                        continue;
                    }
                    if (!GridGainCacheDrManager.this.stateTransferResultKey(entry.getKey()) || !(removed = (resInfo = (CacheDrStateTransferResultInfo)entry.getValue()).listeners().removeIf(uuid -> !GridGainCacheDrManager.this.cctx.discovery().alive(uuid)))) continue;
                    if (resInfo.done() && resInfo.listeners().isEmpty()) {
                        orphanFstResults.add((CacheDrStateTransferResultKey)entry.getKey());
                        continue;
                    }
                    fstResults.put((CacheDrStateTransferResultKey)entry.getKey(), resInfo);
                }
                if (!staleStopKeys.isEmpty()) {
                    GridGainCacheDrManager.this.sysCache.removeAll(staleStopKeys);
                }
                if (!orphanFstResults.isEmpty()) {
                    GridGainCacheDrManager.this.sysCache.removeAll((Collection)orphanFstResults.stream().map(r -> new CacheDrStateTransferKey(r.cacheName(), r.id(), r.dataCenterIds())).collect(Collectors.toSet()));
                    GridGainCacheDrManager.this.sysCache.removeAll(orphanFstResults);
                }
                if (!fstResults.isEmpty()) {
                    GridGainCacheDrManager.this.sysCache.putAll(fstResults);
                }
            }
            if (noHubs) {
                return GridGainCacheDrManager.this.stopReplication(oldStopInfo, CacheDrPauseReason.NO_SND_HUBS, null);
            }
            if (this.dataNode) {
                GridGainCacheDrManager.this.fstHnd.onDataNodeLeft(this.topVer, this.nodeId);
            }
            return oldStopInfo;
        }
    }

    private class StateTransferResultTask
    extends DrTask {
        private final CacheDrStateTransferResultKey key;
        private final boolean transferCompleted;

        private StateTransferResultTask(CacheDrStateTransferResultKey key, boolean transferCompleted) {
            this.key = key;
            this.transferCompleted = transferCompleted;
        }

        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            Object info = GridGainCacheDrManager.this.sysCache.get((Object)this.key);
            if (info != null) {
                GridGainCacheDrManager.this.fstHnd.onStateTransferResultChanged(this.key, (CacheDrStateTransferResultInfo)info, this.transferCompleted);
            }
            return oldStopInfo;
        }
    }

    private class StateTransferChangeTask
    extends DrTask {
        private final CacheDrStateTransferKey key;
        private final CacheDrStateTransferInfo info;

        StateTransferChangeTask(CacheDrStateTransferKey key, CacheDrStateTransferInfo info) {
            this.key = key;
            this.info = info;
        }

        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            GridGainCacheDrManager.this.fstHnd.onStateTransferInfoChanged(this.key, this.info);
            return oldStopInfo;
        }
    }

    private class StateTransferStopTask
    extends DrTask {
        private final GridFutureAdapter<GridFutureAdapter<?>> fut;
        private final CacheDrStateTransferKey key;
        private final CacheDrPauseInfo stopInfo;

        StateTransferStopTask(GridFutureAdapter<GridFutureAdapter<?>> fut, CacheDrStateTransferKey key, CacheDrPauseReason reason) {
            this.fut = fut;
            this.key = key;
            DynamicCacheDescriptor cacheDesc = GridGainCacheDrManager.this.cctx.shared().cache().cacheDescriptor(GridGainCacheDrManager.this.cctx.cacheId());
            this.stopInfo = reason == null ? null : new CacheDrPauseInfo(GridGainCacheDrManager.this.kernalCtx().localNodeId(), reason, "State transfer cancelled.", cacheDesc.receivedFrom());
        }

        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            GridGainCacheDrManager.this.fstHnd.stopStateTransfer(this.key, this.stopInfo);
            return oldStopInfo;
        }

        @Override
        public void onDone() {
            this.fut.onDone();
        }

        @Override
        public void onError(Throwable err) {
            this.fut.onDone(err);
        }
    }

    private class StateTransferStartTask
    extends DrTask {
        private final GridFutureAdapter<GridFutureAdapter<?>> fut;
        private final GridFutureAdapter<?> innerFut = new GridFutureAdapter();
        private final Collection<Byte> dataCenterIds;
        private final boolean syncFst;

        private StateTransferStartTask(GridFutureAdapter<GridFutureAdapter<?>> fut, Collection<Byte> dataCenterIds, boolean syncFst) {
            this.fut = fut;
            this.dataCenterIds = dataCenterIds;
            this.syncFst = syncFst;
        }

        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            assert (oldStopInfo != null);
            if (GridGainCacheDrManager.this.stopInfo.reason() != null) {
                this.innerFut.onDone((Throwable)new IgniteCheckedException("Failed to initiate state transfer because data center replication is stopped: " + oldStopInfo));
            } else {
                GridGainCacheDrManager.this.fstHnd.stateTransfer(this.dataCenterIds, this.syncFst).listen(new IgniteInClosure<IgniteInternalFuture<?>>(){
                    private static final long serialVersionUID = 0L;

                    public void apply(IgniteInternalFuture<?> transferFut) {
                        try {
                            transferFut.get();
                            StateTransferStartTask.this.innerFut.onDone();
                        }
                        catch (Throwable e) {
                            StateTransferStartTask.this.innerFut.onDone(e);
                        }
                    }
                });
            }
            return oldStopInfo;
        }

        @Override
        public void onDone() {
            this.fut.onDone(this.innerFut);
        }

        @Override
        public void onError(@Nullable Throwable err) {
            this.fut.onDone(err);
        }
    }

    private class DrStopTask
    extends DrTask {
        private final CacheDrPauseReason reason;
        private final String errMsg;
        private final CacheDrPauseInfo rmtStopInfo;
        private final GridFutureAdapter<?> fut = new GridFutureAdapter();

        private DrStopTask(@Nullable CacheDrPauseReason reason, @Nullable String errMsg, CacheDrPauseInfo rmtStopInfo) {
            this.reason = reason;
            this.errMsg = errMsg;
            this.rmtStopInfo = rmtStopInfo;
        }

        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            if (GridGainCacheDrManager.this.log.isInfoEnabled()) {
                GridGainCacheDrManager.this.log.info("Data center replication executes a " + this.taskPurpose() + " task [reason=" + (Object)((Object)this.reason) + ", rmtStopInfo=" + this.rmtStopInfo + ", oldStopInfo=" + oldStopInfo + ", sndHubs=" + GridGainCacheDrManager.this.sndHubs + "]");
            }
            if (this.rmtStopInfo == null) {
                return GridGainCacheDrManager.this.stopReplication(oldStopInfo, this.reason, this.errMsg);
            }
            assert (oldStopInfo != null);
            if (oldStopInfo.reason() == null && GridGainCacheDrManager.this.sndHubs.isEmpty() && !GridGainCacheDrManager.this.cctx.localNode().isClient()) {
                GridGainCacheDrManager.this.enqueueDrMgmtTask(new DrStopTask(CacheDrPauseReason.NO_SND_HUBS, null, null));
            }
            return oldStopInfo;
        }

        @NotNull
        private String taskPurpose() {
            return this.reason == null && (this.rmtStopInfo == null || this.rmtStopInfo.reason() == null) ? "start" : "stop";
        }

        @Override
        public void onDone() {
            this.fut.onDone();
        }

        @Override
        public void onError(Throwable err) {
            this.fut.onDone(err);
        }
    }

    private class DrControlTaskExecutor
    implements Runnable {
        private final BlockingQueue<DrTask> tasksQ = new LinkedBlockingDeque<DrTask>();
        private boolean hasTasks;
        private volatile TaskExecutorState state = TaskExecutorState.NOT_STARTED;
        private volatile Thread thread;

        private DrControlTaskExecutor() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            if (this.isCancelled()) {
                return;
            }
            if (this.state == TaskExecutorState.NOT_STARTED && GridGainCacheDrManager.this.initDr()) {
                DrControlTaskExecutor drControlTaskExecutor = this;
                synchronized (drControlTaskExecutor) {
                    if (this.isCancelled()) {
                        return;
                    }
                    this.state = TaskExecutorState.INITIALIZED;
                    if (!this.hasTasks) {
                        return;
                    }
                }
            }
            this.thread = Thread.currentThread();
            try {
                while (!this.isCancelled()) {
                    DrTask task;
                    while (!this.isCancelled() && (task = (DrTask)this.tasksQ.poll()) != null) {
                        this.processDrTask(task);
                    }
                    DrControlTaskExecutor drControlTaskExecutor = this;
                    synchronized (drControlTaskExecutor) {
                        if (this.tasksQ.isEmpty()) {
                            this.hasTasks = false;
                            return;
                        }
                    }
                }
                return;
            }
            finally {
                DrControlTaskExecutor drControlTaskExecutor = this;
                synchronized (drControlTaskExecutor) {
                    this.thread = null;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void enqueueTask(DrTask drTask) {
            DrControlTaskExecutor drControlTaskExecutor = this;
            synchronized (drControlTaskExecutor) {
                if (this.isCancelled()) {
                    drTask.onError((Throwable)new CacheStoppedException("Dr task is cancelled due to cache stop: task=" + drTask + ", cache=" + GridGainCacheDrManager.this.cctx.name()));
                    return;
                }
                this.tasksQ.offer(drTask);
                if (!this.hasTasks) {
                    this.hasTasks = true;
                    if (this.isInitialized()) {
                        GridGainCacheDrManager.this.drProc.submit(this);
                    }
                }
            }
        }

        private boolean isInitialized() {
            return this.state == TaskExecutorState.INITIALIZED;
        }

        private boolean isCancelled() {
            return this.state == TaskExecutorState.CANCELLED;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void stop() {
            DrTask t;
            assert (GridGainCacheDrManager.this.stopping);
            DrControlTaskExecutor drControlTaskExecutor = this;
            synchronized (drControlTaskExecutor) {
                this.state = TaskExecutorState.CANCELLED;
                Thread thread0 = this.thread;
                if (thread0 != null) {
                    thread0.interrupt();
                }
            }
            IgniteCheckedException reason = new IgniteCheckedException("Failed to perform DR task because grid is stopping.");
            while ((t = (DrTask)this.tasksQ.poll()) != null) {
                t.onError(reason);
            }
        }

        private void processDrTask(DrTask task) {
            block10: {
                try {
                    GridTuple newStopInfo;
                    boolean res;
                    AffinityTopologyVersion topVer = task.topologyVersion();
                    if (topVer != null) {
                        GridGainCacheDrManager.this.cctx.affinity().affinityReadyFuture(topVer).get();
                    }
                    if (!(res = GridGainCacheDrManager.this.txOp((GridPlainInClosure<CacheDrPauseInfo>)((GridPlainInClosure)arg_0 -> DrControlTaskExecutor.lambda$processDrTask$0(newStopInfo = F.t(null), task, arg_0)), true))) {
                        task.onError((Throwable)new CacheStoppedException("Dr task is cancelled due to cache stop: task=" + task + ", cache=" + GridGainCacheDrManager.this.cctx.name()));
                        return;
                    }
                    CacheDrPauseInfo newInfo = (CacheDrPauseInfo)newStopInfo.get();
                    if (!F.eq((Object)GridGainCacheDrManager.this.stopInfo, (Object)newInfo)) {
                        GridGainCacheDrManager.this.stopInfo = newInfo;
                        if (newInfo != null) {
                            GridGainCacheDrManager.this.metrics.onStopStateChanged(newInfo.reason(), newInfo.error());
                        } else {
                            GridGainCacheDrManager.this.metrics.onStopStateChanged(null, null);
                        }
                    }
                    task.onDone();
                }
                catch (Throwable e) {
                    if (!GridGainCacheDrManager.this.stopping && !GridGainCacheDrManager.this.cctx.topology().stopping()) {
                        U.error((IgniteLogger)GridGainCacheDrManager.this.log, (Object)"An exception occurred during DR task processing.", (Throwable)e);
                    } else if (GridGainCacheDrManager.this.log.isDebugEnabled()) {
                        GridGainCacheDrManager.this.log.debug("An exception occurred during DR task processing: " + e);
                    }
                    task.onError(e);
                    if (!(e instanceof Error)) break block10;
                    throw (Error)e;
                }
            }
        }

        public String toString() {
            return "DrControlTaskExecutor{hasTasks=" + this.hasTasks + ", state=" + (Object)((Object)this.state) + '}';
        }

        private static /* synthetic */ void lambda$processDrTask$0(GridTuple newStopInfo, DrTask task, CacheDrPauseInfo stopInfo) throws IgniteCheckedException {
            newStopInfo.set((Object)task.run(stopInfo));
        }
    }

    private static enum TaskExecutorState {
        NOT_STARTED,
        INITIALIZED,
        CANCELLED;

    }

    private class DiscoveryListener
    implements GridLocalEventListener {
        private DiscoveryListener() {
        }

        public void onEvent(Event evt) {
            DiscoveryEvent evt0 = (DiscoveryEvent)evt;
            switch (evt0.type()) {
                case 11: 
                case 12: {
                    ClusterNode shadow = evt0.eventNode();
                    boolean cacheNode = GridGainCacheDrManager.this.cctx.discovery().cacheNode(shadow, GridGainCacheDrManager.this.cctx.name());
                    boolean sndHubNode = GridGainCacheDrManager.this.isSenderHub(shadow);
                    if (!cacheNode && !sndHubNode) break;
                    GridGainCacheDrManager.this.enqueueDrMgmtTask(new NodeLeaveTask(evt0.topologyVersion(), shadow.id(), cacheNode, sndHubNode));
                    break;
                }
                default: {
                    assert (evt0.type() == 10);
                    ClusterNode node = GridGainCacheDrManager.this.cctx.discovery().node(evt0.eventNode().id());
                    if (node == null) {
                        return;
                    }
                    if (!GridGainCacheDrManager.this.isSenderHub(node)) break;
                    GridGainCacheDrManager.this.enqueueDrMgmtTask(new SenderHubStartTask(node));
                }
            }
        }
    }

    public static class DrGroupControlEventFilter
    extends DrEntryEventFilter {
        private static final long serialVersionUID = 0L;
        private String grpName;

        public DrGroupControlEventFilter() {
        }

        public DrGroupControlEventFilter(String grpName, String cacheName) {
            super(cacheName);
            this.grpName = grpName;
        }

        @Override
        public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
            Object key = evt.getKey();
            if (key instanceof CacheDrStateTransferKey) {
                return F.eq((Object)this.cacheName, (Object)((CacheDrStateTransferKey)key).cacheName());
            }
            if (key instanceof CacheDrStateTransferResultKey) {
                return F.eq((Object)this.cacheName, (Object)((CacheDrStateTransferResultKey)key).cacheName());
            }
            if (key instanceof CacheDrPauseKey) {
                return F.eq((Object)this.cacheName, (Object)((CacheDrPauseKey)key).cacheName());
            }
            if (key instanceof DrSenderGroupNodeStopKey) {
                return F.eq((Object)this.grpName, (Object)((DrSenderGroupNodeStopKey)key).groupName());
            }
            return false;
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            super.readExternal(in);
            this.grpName = U.readString((DataInput)in);
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            super.writeExternal(out);
            U.writeString((DataOutput)out, (String)this.grpName);
        }
    }

    public static class DrEntryEventFilter
    implements CacheEntryEventSerializableFilter,
    Externalizable {
        private static final long serialVersionUID = 0L;
        protected String cacheName;

        public DrEntryEventFilter() {
        }

        public DrEntryEventFilter(String cacheName) {
            this.cacheName = cacheName;
        }

        public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
            Object key = evt.getKey();
            if (key instanceof CacheDrStateTransferKey) {
                return F.eq((Object)this.cacheName, (Object)((CacheDrStateTransferKey)key).cacheName());
            }
            if (key instanceof CacheDrStateTransferResultKey) {
                return F.eq((Object)this.cacheName, (Object)((CacheDrStateTransferResultKey)key).cacheName());
            }
            if (key instanceof CacheDrPauseKey) {
                return F.eq((Object)this.cacheName, (Object)((CacheDrPauseKey)key).cacheName());
            }
            if (key instanceof CacheDrSenderHubStopKey) {
                return F.eq((Object)this.cacheName, (Object)((CacheDrSenderHubStopKey)key).cacheName());
            }
            return false;
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            this.cacheName = U.readString((DataInput)in);
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            U.writeString((DataOutput)out, (String)this.cacheName);
        }
    }
}

