/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.dr;

import java.lang.annotation.Annotation;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.net.ssl.SSLContext;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.gridgain.grid.GridDr;
import org.gridgain.grid.cache.dr.CacheDrStateTransfer;
import org.gridgain.grid.configuration.DrReceiverConfiguration;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainCacheConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.dr.DrSenderConnectionConfiguration;
import org.gridgain.grid.dr.store.DrSenderStore;
import org.gridgain.grid.dr.store.DurableStore;
import org.gridgain.grid.dr.store.fs.DrSenderFsStore;
import org.gridgain.grid.internal.GridPluginProcessorAdapter;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrPauseKey;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferKey;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferResultInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferResultKey;
import org.gridgain.grid.internal.processors.cache.dr.GridGainCacheDrManager;
import org.gridgain.grid.internal.processors.dr.DrImpl;
import org.gridgain.grid.internal.processors.dr.DrMetricsManager;
import org.gridgain.grid.internal.processors.dr.DrReceiver;
import org.gridgain.grid.internal.processors.dr.DrSenderAttributes;
import org.gridgain.grid.internal.processors.dr.DrSenderImpl;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.internal.processors.dr.fst.Batch;
import org.gridgain.grid.internal.processors.dr.fst.DrStateTransferWorkerPool;
import org.gridgain.grid.internal.processors.dr.fst.StateTransferTask;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalResponse;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class DrProcessor
extends GridPluginProcessorAdapter
implements PartitionsExchangeAware,
IgniteChangeGlobalStateSupport {
    /** Replication protocol version string. NOTE(review): not referenced in the visible part of this class. */
    public static final String DR_PROTO_VER = "1.0-20140117";
    /** DR facade instance; created in the constructor. */
    private final DrImpl dr;
    /** Sender hub; created in {@code start()} only when a sender configuration is present. */
    private DrSenderImpl sndHub;
    /** Receiver hub; created in {@code start()} only when a receiver configuration is present. */
    private DrReceiver rcvHub;
    /** Management thread pool; created in {@code onIgniteStart()} and shut down in {@code stop()}. */
    private IgniteThreadPoolExecutor drMgmtPool;
    /** State transfer worker pool; created in {@code onIgniteStart()} on non-client nodes when DR is enabled. */
    private DrStateTransferWorkerPool stateTransferWorkerPool;
    /** Queue handed to the state transfer worker pool; cleared in {@code stop()}. */
    private final BlockingQueue<StateTransferTask<Batch>> stateTransferQ = new LinkedBlockingQueue<StateTransferTask<Batch>>();
    /** Metrics manager; created in {@code start()} when a sender or a receiver hub is configured, otherwise {@code null}. */
    private volatile DrMetricsManager metrics;
    /** Set in {@code onIgniteStart()} and reset in {@code onIgniteStop()}; makes repeated calls of either a no-op. */
    private boolean started;
    /** Latch created in {@code onIgniteStart()}; counted down in {@code onActivate()} (non-client nodes) and in {@code onIgniteStop()}. */
    private CountDownLatch activeLatch;

    /**
     * Creates the processor and its {@link DrImpl} facade.
     *
     * @param ctx Plugin context, forwarded to the parent adapter.
     * @param cfg GridGain plugin configuration, forwarded to the parent adapter.
     */
    public DrProcessor(PluginContext ctx, GridGainConfiguration cfg) {
        super(ctx, cfg);

        dr = new DrImpl(this, igniteCtx);
    }

    /**
     * Validates the DR configuration, registers this processor as an exchange-aware component,
     * publishes DR-related node attributes and creates/starts the receiver and sender hubs
     * that are configured.
     *
     * @throws IgniteCheckedException Declared by the overridden method; may be raised while starting a hub.
     */
    @Override
    public void start() throws IgniteCheckedException {
        if (log.isDebugEnabled()) {
            log.debug("Initialize DR Processor.");
        }

        validate();

        igniteCtx.cache().context().exchange().registerExchangeAwareComponent((PartitionsExchangeAware)this);

        DrReceiverConfiguration rcvHubCfg = cfg.getDrReceiverConfiguration();
        DrSenderConfiguration sndHubCfg = cfg.getDrSenderConfiguration();
        boolean rcvConfigured = rcvHubCfg != null;
        boolean sndConfigured = sndHubCfg != null;

        // Metrics are only needed when this node acts as a hub.
        if (rcvConfigured || sndConfigured) {
            metrics = new DrMetricsManager(igniteCtx, cfg);
        }

        if (sndConfigured) {
            DrSenderAttributes sndAttrs = new DrSenderAttributes(sndHubCfg);

            igniteCtx.addNodeAttribute("plugins.gg.replication.snd.hub", (Object)sndAttrs);
            igniteCtx.addNodeAttribute("plugins.gg.replication.snd.store.persistent", (Object)isDurableStoreConfigured(sndHubCfg));
            igniteCtx.addNodeAttribute("plugins.gg.replication.snd.fst.buffer.supported", (Object)Boolean.TRUE);

            // Sender groups are published only when caches are not addressed by name.
            if (!cfg.isDrUseCacheNames()) {
                igniteCtx.addNodeAttribute("plugins.gg.replication.snd.groups", (Object)DrUtils.effectiveSenderGroups(sndHubCfg));
            }
        }

        // Collect names of statically configured caches that have a DR sender configuration.
        ArrayList<String> drCacheNames = new ArrayList<String>();
        for (CacheConfiguration cacheCfg : config().getCacheConfiguration()) {
            GridGainCacheConfiguration ggCacheCfg = (GridGainCacheConfiguration)GridCacheUtils.cachePluginConfiguration((CacheConfiguration)cacheCfg, GridGainCacheConfiguration.class);

            if (ggCacheCfg != null && ggCacheCfg.getDrSenderConfiguration() != null) {
                drCacheNames.add(cacheCfg.getName());
            }
        }
        igniteCtx.addNodeAttribute("plugins.gg.replication.caches", drCacheNames);

        if (rcvConfigured) {
            rcvHub = new DrReceiver(this);
            rcvHub.start();
        }

        if (sndConfigured) {
            sndHub = new DrSenderImpl(this);
            sndHub.start();
        }

        if (log.isDebugEnabled()) {
            log.debug("Started data center replication processor.");
        }
    }

    /**
     * Checks whether the configured sender store class(es) carry the {@link DurableStore} annotation.
     *
     * @param sndHubCfg Sender hub configuration.
     * @return Result of {@code storeHasAnnotation(sndHubCfg, DurableStore.class)}.
     */
    private boolean isDurableStoreConfigured(DrSenderConfiguration sndHubCfg) {
        return storeHasAnnotation(sndHubCfg, DurableStore.class);
    }

    /**
     * Checks whether the sender store class is annotated with the given annotation.
     * If a hub-level store is set, only its class is inspected; otherwise every connection
     * configuration must have a store whose class carries the annotation.
     *
     * @param sndHubCfg Sender hub configuration.
     * @param annCls Annotation class to look for.
     * @return {@code true} if the annotation is present as described above.
     */
    private boolean storeHasAnnotation(DrSenderConfiguration sndHubCfg, Class<? extends Annotation> annCls) {
        if (sndHubCfg.getStore() != null) {
            return sndHubCfg.getStore().getClass().isAnnotationPresent(annCls);
        }

        return Arrays.stream(sndHubCfg.getConnectionConfiguration())
            .allMatch(conn -> conn.getStore() != null && conn.getStore().getClass().isAnnotationPresent(annCls));
    }

    /**
     * @return Kernal context this processor works with.
     */
    GridKernalContext kernalContext() {
        return igniteCtx;
    }

    /**
     * Activation callback: runs the start routine (a no-op if already started) and, on
     * non-client nodes, releases the activation latch.
     *
     * @param kctx Kernal context (not used here).
     * @throws IgniteCheckedException If the start routine fails.
     */
    public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
        onIgniteStart();

        if (kernalContext().clientNode()) {
            return;
        }

        activeLatch.countDown();
    }

    /**
     * Starts the runtime part of the processor. Does nothing if it is already started.
     * <p>
     * Unless the {@code IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK} system property is set, a locally
     * configured sender hub is checked against every remote node first. Then the activation latch is
     * created; on non-client nodes with DR enabled the state transfer worker pool is started; finally
     * the management pool is created and the configured hubs receive their kernal-start callback.
     * <p>
     * NOTE(review): the pools created here are shut down only in {@code stop()}; a second call after
     * {@code onIgniteStop()} replaces the references without shutting the previous pools down -
     * confirm that this is intended.
     *
     * @throws IgniteCheckedException If the sender hub consistency check fails.
     */
    @Override
    public void onIgniteStart() throws IgniteCheckedException {
        if (started) {
            return;
        }
        started = true;

        if (log.isDebugEnabled()) {
            log.debug("Starting DR Processor.");
        }

        boolean skipCheck = IgniteSystemProperties.getBoolean((String)"IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK");

        if (!skipCheck && cfg.getDrSenderConfiguration() != null) {
            for (ClusterNode node : ctx.nodes()) {
                if (!node.isLocal()) {
                    checkSenderHub(node);
                }
            }
        }

        activeLatch = new CountDownLatch(1);

        if (!ctx.localNode().isClient()) {
            // Server nodes without DR need neither the pools nor the hubs.
            if (!DrUtils.isDrEnabled(cfg)) {
                return;
            }

            stateTransferWorkerPool = new DrStateTransferWorkerPool(igniteCtx, cfg.getDrStateTransferThreadPoolSize(), log, activeLatch, stateTransferQ);
            stateTransferWorkerPool.start();
        }

        int mgmtPoolSize = cfg.getDrManagementThreadPoolSize();

        // The policy argument is passed as an explicit byte: an int literal is not implicitly
        // narrowed in a method invocation.
        drMgmtPool = new IgniteThreadPoolExecutor(
            "dr-mgmt-pool",
            igniteCtx.igniteInstanceName(),
            mgmtPoolSize,
            mgmtPoolSize,
            60000L,
            new LinkedBlockingQueue<Runnable>(),
            (byte)-1,
            igniteCtx.uncaughtExceptionHandler());

        if (rcvHub != null) {
            rcvHub.onKernalStart();
        }
        if (sndHub != null) {
            sndHub.onKernalStart();
        }
    }

    /**
     * Deactivation callback: delegates to {@code onIgniteStop(false)}.
     *
     * @param kctx Kernal context (not used here).
     */
    public void onDeActivate(GridKernalContext kctx) {
        onIgniteStop(false);
    }

    /**
     * Stops the runtime part of the processor. Does nothing if it is not started.
     * Notifies the hubs, releases the activation latch and stops metrics, in that order.
     *
     * @param cancel Cancel flag, forwarded to the receiver hub.
     */
    @Override
    public void onIgniteStop(boolean cancel) {
        if (!started) {
            return;
        }
        started = false;

        if (log.isDebugEnabled()) {
            log.debug("Stopping DR Processor.");
        }

        if (rcvHub != null) {
            rcvHub.onKernalStop(cancel);
        }

        if (sndHub != null) {
            sndHub.onKernalStop();
        }

        // Release whoever is still waiting on the latch.
        if (activeLatch != null) {
            activeLatch.countDown();
        }

        if (metrics != null) {
            metrics.stop();
        }

        if (log.isDebugEnabled()) {
            log.debug("DR processor has been stopped.");
        }
    }

    /**
     * Final stop: shuts down the state transfer pool (clearing its queue and awaiting termination),
     * stops the sender hub, shuts down the management pool and unregisters this processor from
     * partition exchange notifications.
     *
     * @param cancel Cancel flag, forwarded to the parent implementation.
     */
    @Override
    public void stop(boolean cancel) {
        super.stop(cancel);

        if (stateTransferWorkerPool != null) {
            stateTransferWorkerPool.shutdown();

            // Queue is cleared between shutdown and awaiting termination, as the pool consumes it.
            stateTransferQ.clear();

            stateTransferWorkerPool.awaitTermination();
        }

        if (sndHub != null) {
            sndHub.stop();
        }

        if (drMgmtPool != null) {
            drMgmtPool.shutdownNow();
        }

        igniteCtx.cache().context().exchange().unregisterExchangeAwareComponent((PartitionsExchangeAware)this);
    }

    /**
     * Exchange callback. On cluster activation all stored DR state is removed. Otherwise, if the
     * exchange stops caches, DR state of destroyed DR-sender caches is removed and metrics are
     * notified about every destroyed cache.
     *
     * @param fut Exchange future being completed.
     */
    public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
        if (fut.activateCluster()) {
            cleanObsoleteState();

            return;
        }

        ExchangeActions actions = fut.exchangeActions();

        if (actions == null || !actions.hasStop()) {
            return;
        }

        // Groups that are destroyed as a whole.
        Set<Integer> destroyedGrpIds = actions.cacheGroupsToStop().stream()
            .filter(grp -> grp.destroy())
            .map(grp -> grp.descriptor().groupId())
            .collect(Collectors.toSet());

        // A cache is destroyed either explicitly or together with its group.
        Set<DynamicCacheDescriptor> destroyedCaches = actions.cacheStopRequests().stream()
            .filter(req -> req.request().destroy() || destroyedGrpIds.contains(req.descriptor().groupId()))
            .map(ExchangeActions.CacheActionData::descriptor)
            .collect(Collectors.toSet());

        Set<String> destroyedDrCaches = destroyedCaches.stream()
            .filter(desc -> DrUtils.isDrSenderEnabled(desc.cacheConfiguration()))
            .map(DynamicCacheDescriptor::cacheName)
            .collect(Collectors.toSet());

        if (!destroyedDrCaches.isEmpty()) {
            clearDrCacheState(destroyedDrCaches);
        }

        if (metrics != null && !destroyedCaches.isEmpty()) {
            for (DynamicCacheDescriptor desc : destroyedCaches) {
                metrics.onCacheDestroy(desc.cacheName());
            }
        }
    }

    /**
     * Removes DR pause / state transfer / state transfer result entries of the given caches from
     * the local partitions of the utility cache. Runs under the checkpoint read lock; any failure
     * is logged and not propagated.
     *
     * @param cachesToDestroy Names of the caches whose DR state must be removed.
     */
    private void clearDrCacheState(Set<String> cachesToDestroy) {
        GridCacheContext sysCtx = igniteCtx.cache().utilityCache().context();

        igniteCtx.cache().context().database().checkpointReadLock();

        try {
            for (GridDhtLocalPartition part : sysCtx.topology().localPartitions()) {
                GridCursor<? extends CacheDataRow> rows = part.dataStore().cursor();

                while (rows.next()) {
                    KeyCacheObject rawKey = rows.get().key();
                    Object key = sysCtx.unwrapBinaryIfNeeded((Object)rawKey, false);

                    // Only the three DR key types carry a cache name; everything else is skipped.
                    String ownerCache =
                        key instanceof CacheDrPauseKey ? ((CacheDrPauseKey)key).cacheName() :
                        key instanceof CacheDrStateTransferKey ? ((CacheDrStateTransferKey)key).cacheName() :
                        key instanceof CacheDrStateTransferResultKey ? ((CacheDrStateTransferResultKey)key).cacheName() :
                        null;

                    if (ownerCache != null && cachesToDestroy.contains(ownerCache)) {
                        if (log.isDebugEnabled()) {
                            log.debug("Clearing DR state for a destroyed cache [key=" + key + ']');
                        }

                        sysCtx.offheap().remove(sysCtx, rawKey, part.id(), part);
                    }
                }
            }
        }
        catch (Exception e) {
            log.error("Failed to cleanup outdated DR state.", (Throwable)e);
        }
        finally {
            igniteCtx.cache().context().database().checkpointReadUnlock();
        }
    }

    /**
     * Removes every DR pause / state transfer / state transfer result entry from the local
     * partitions of the utility cache. Runs under the checkpoint read lock; any failure is logged
     * and not propagated.
     */
    private void cleanObsoleteState() {
        GridCacheContext sysCtx = igniteCtx.cache().utilityCache().context();

        igniteCtx.cache().context().database().checkpointReadLock();

        try {
            for (GridDhtLocalPartition part : sysCtx.topology().localPartitions()) {
                GridCursor<? extends CacheDataRow> rows = part.dataStore().cursor();

                while (rows.next()) {
                    KeyCacheObject rawKey = rows.get().key();
                    Object key = sysCtx.unwrapBinaryIfNeeded((Object)rawKey, false);

                    boolean drStateKey = key instanceof CacheDrPauseKey
                        || key instanceof CacheDrStateTransferKey
                        || key instanceof CacheDrStateTransferResultKey;

                    if (drStateKey) {
                        if (log.isDebugEnabled()) {
                            log.debug("Clearing obsolete DR state [key=" + key + ']');
                        }

                        sysCtx.offheap().remove(sysCtx, rawKey, part.id(), part);
                    }
                }
            }
        }
        catch (Exception e) {
            log.error("Failed to cleanup outdated DR full transfer states.", (Throwable)e);
        }
        finally {
            igniteCtx.cache().context().database().checkpointReadUnlock();
        }
    }

    /**
     * Registers a listener on the replication send topic that forwards internal replication
     * requests to the given sender hub. Any other message type trips an assertion.
     *
     * @param sndHub Sender hub that handles the requests.
     */
    public void registerSenderHub(DrSenderImpl sndHub) {
        igniteCtx.io().addMessageListener((Object)CU.replicationTopicSend(), (nodeId, msg, plc) -> {
            if (!(msg instanceof DrInternalRequest)) {
                assert false : "Unexpected message type.";

                return;
            }

            if (log.isDebugEnabled()) {
                log.debug("Received internal replication request message [sourceNodeId=" + nodeId + ", msg=" + msg + ']');
            }

            sndHub.onReplicationRequest(nodeId, (DrInternalRequest)msg);
        });
    }

    /**
     * Sends an internal replication response to the given node on the receive topic of the cache.
     * The error is marshalled only when the destination is not the local node. A send failure is
     * logged unless the node is stopping.
     *
     * @param nodeId Destination node ID.
     * @param cacheName Cache name used to build the response topic.
     * @param seq Sequence number passed to the response.
     * @param err Error to report, if any.
     * @param code Optional code set on the response.
     */
    public void sendReplicationResponse(UUID nodeId, String cacheName, long seq, @Nullable Throwable err, @Nullable Byte code) {
        try {
            boolean marshalErr = err != null && !localNodeId().equals(nodeId);

            byte[] errBytes = marshalErr
                ? U.marshal((Marshaller)ctx.grid().configuration().getMarshaller(), (Object)err)
                : null;

            DrInternalResponse rsp = new DrInternalResponse(seq, err, errBytes);

            if (code != null) {
                rsp.code(code);
            }

            igniteCtx.io().sendToCustomTopic(nodeId, (Object)CU.replicationTopicReceive((String)cacheName), (Message)rsp, (byte)33);
        }
        catch (IgniteCheckedException e) {
            // Failures during shutdown are not reported.
            if (!igniteCtx.isStopping()) {
                U.error((IgniteLogger)log, (Object)("Failed to send replication response message to the node: " + nodeId), (Throwable)e);
            }
        }
    }

    /**
     * @return Sender hub, or {@code null} if none was created in {@code start()}.
     */
    DrSenderImpl senderHub() {
        return sndHub;
    }

    /**
     * @return Receiver hub, or {@code null} if none was created in {@code start()}.
     */
    DrReceiver receiverHub() {
        return rcvHub;
    }

    /**
     * @return Metrics manager, or {@code null} if neither a sender nor a receiver hub is configured.
     */
    public DrMetricsManager metrics() {
        return metrics;
    }

    /**
     * Resets DR metrics; does nothing when no metrics manager exists.
     */
    public void resetMetrics() {
        if (metrics == null) {
            return;
        }

        metrics.reset();
    }

    /**
     * Validates the DR part of the plugin configuration: the data center ID must be in
     * {@code [0, 32)} and strictly positive when a sender or receiver hub is configured;
     * hub configurations are then validated by their dedicated methods.
     *
     * @throws IgniteException If the configuration is invalid.
     */
    private void validate() {
        byte dcId = cfg.getDataCenterId();

        if (!(dcId >= 0 && dcId < 32)) {
            throw new IgniteException("Data center ID should be non-negative and < 32.");
        }

        DrSenderConfiguration sndHubCfg = cfg.getDrSenderConfiguration();

        if (sndHubCfg != null) {
            if (dcId == 0) {
                throw new IgniteException("Data center ID should be positive if sender hub is configured [dataCenterId=0]");
            }

            validateSender(sndHubCfg);
        }

        DrReceiverConfiguration rcvHubCfg = cfg.getDrReceiverConfiguration();

        if (rcvHubCfg != null) {
            if (dcId == 0) {
                throw new IgniteException("Data center ID should be positive if receiver hub is configured [dataCenterId=0]");
            }

            validateReceiver(rcvHubCfg);
        }
    }

    /**
     * Validates the receiver hub configuration.
     *
     * @param cfg Receiver hub configuration.
     * @throws IgniteException If the local inbound host cannot be resolved (the resolution failure
     *      is attached as the cause) or, presumably via {@code assertParameter}, if a numeric
     *      parameter is out of range.
     */
    private void validateReceiver(DrReceiverConfiguration cfg) {
        String locHost = cfg.getLocalInboundHost();

        if (locHost != null) {
            try {
                InetAddress.getByName(locHost);
            }
            catch (UnknownHostException e) {
                throw new IgniteException("Configuration parameter 'localInboundHost' cannot be resolved to local address.", e);
            }
        }

        int locPort = cfg.getLocalInboundPort();

        // Both bounds must hold: with '||' the condition was always true and never rejected a port.
        this.assertParameter(locPort >= 0 && locPort <= 65535, "localInboundPort >= 0 && localInboundPort <= 65535");
        this.assertParameter(cfg.getWriteTimeout() >= 0L, "writeTimeout >= 0");
        this.assertParameter(cfg.getIdleTimeout() >= 0L, "idleTimeout >= 0");
        this.assertParameter(cfg.getSelectorCount() > 0, "selectorCount > 0");
        this.assertParameter(cfg.getWorkerThreads() > 0, "workerThreads > 0");
        this.assertParameter(cfg.getMessageQueueLimit() >= 0, "messageQueueLimit >= 0");
        this.assertParameter(cfg.getFlushFrequency() >= 0L, "flushFrequency >= 0");
        this.assertParameter(cfg.getPerNodeBufferSize() > 0, "cfg.getPerNodeBufferSize() > 0");
        this.assertParameter(cfg.getPerNodeParallelLoadOperations() > 0, "cfg.getPerNodeParallelLoadOperations() > 0");
        this.assertParameter(cfg.getSocketSendBufferSize() >= 0, "socketSendBufferSize >= 0");
        this.assertParameter(cfg.getSocketReceiveBufferSize() >= 0, "socketReceiveBufferSize >= 0");
    }

    /**
     * Validates the sender hub configuration: replicated cache names or sender groups, scalar
     * parameters, every replica (connection) configuration and the store setup. If neither the hub
     * nor any replica defines a store, a default {@link DrSenderFsStore} is set on the hub
     * configuration.
     *
     * @param sndCfg Sender hub configuration.
     * @throws IgniteException If the configuration is invalid.
     */
    private void validateSender(DrSenderConfiguration sndCfg) {
        validateSenderTargets(sndCfg);

        this.assertParameter(sndCfg.getHealthCheckFrequency() > 0L, "healthCheckFrequency > 0");
        this.assertParameter(sndCfg.getReadTimeout() > 0L, "readTimeout > 0");
        this.assertParameter(sndCfg.getSystemRequestTimeout() > 0L, "systemRequestTimeout > 0");
        this.assertParameter(sndCfg.getReconnectOnFailureTimeout() > 0L, "reconnectOnFailureTimeout > 0");
        this.assertParameter(sndCfg.getMaxQueueSize() > 0, "maxQueueSize > 0");
        this.assertParameter(sndCfg.getMaxErrors() > 0, "maxErrors > 0");
        this.assertParameter(sndCfg.getMaxFailedConnectAttempts() > 0, "maxFailedConnectAttempts > 0");
        this.assertParameter(sndCfg.getSocketSendBufferSize() >= 0, "socketSendBufferSize >= 0");
        this.assertParameter(sndCfg.getSocketReceiveBufferSize() >= 0, "socketReceiveBufferSize >= 0");

        DrSenderConnectionConfiguration[] replicas = sndCfg.getConnectionConfiguration();

        this.assertParameter(!F.isEmpty((Object[])replicas), "'connectionConfiguration' cannot be null or empty");

        Set<Byte> seenDataCenterIds = new HashSet<Byte>(replicas.length, 1.0f);

        for (DrSenderConnectionConfiguration replica : replicas) {
            validateReplica(replica, seenDataCenterIds);
        }

        validateSenderStores(sndCfg, replicas);

        this.assertParameter(sndCfg.getFullStateTransferBufferSize() >= 0L, "fullStateTransferBufferSize >= 0");
    }

    /**
     * Validates what the sender hub replicates: unique cache names when cache names are used,
     * otherwise unique sender groups (and no cache names at all).
     *
     * @param sndCfg Sender hub configuration.
     */
    private void validateSenderTargets(DrSenderConfiguration sndCfg) {
        if (this.cfg.isDrUseCacheNames()) {
            this.assertParameter(sndCfg.getCacheNames() != null, "'cacheNames' cannot be null");
            this.assertParameter(sndCfg.getCacheNames().length > 0, "'cacheNames' must contain at least one entry");

            Set<String> seenNames = new HashSet<String>(sndCfg.getCacheNames().length, 1.0f);

            for (String cacheName : sndCfg.getCacheNames()) {
                if (!seenNames.add(cacheName)) {
                    throw new IgniteException("Configuration parameter 'cacheNames' cannot have duplicates: " + cacheName);
                }
            }

            return;
        }

        this.assertParameter(F.isEmpty((Object[])sndCfg.getCacheNames()), "'cacheNames' are not allowed (either set GridGainConfiguration.drUseCacheNames to true or use 'senderGroups' instead)");

        Object[] sndGroups = DrUtils.effectiveSenderGroups(sndCfg);

        assert !F.isEmpty((Object[])sndGroups);

        Set<Object> seenGroups = new HashSet<Object>(sndGroups.length, 1.0f);

        for (Object grpName : sndGroups) {
            if (!seenGroups.add(grpName)) {
                throw new IgniteException("Configuration parameter 'senderGroups' cannot have duplicates: " + grpName);
            }
        }
    }

    /**
     * Validates a single replica (connection) configuration.
     *
     * @param replica Replica configuration.
     * @param seenDataCenterIds Data center IDs of the replicas validated so far; updated by this call.
     */
    private void validateReplica(DrSenderConnectionConfiguration replica, Set<Byte> seenDataCenterIds) {
        byte[] ignoredIds = replica.getIgnoredDataCenterIds();

        if (!F.isEmpty((byte[])ignoredIds)) {
            for (byte ignoredId : ignoredIds) {
                assertReplicaParameter(isValidDatacenterID(ignoredId), "Ignored data center id must be between 1 and 31 inclusively");
            }
        }

        byte dcId = replica.getDataCenterId();

        assertReplicaParameter(isValidDatacenterID(dcId), "'dataCenterId' must be between 1 and 31 inclusively.");
        assertReplicaParameter(dcId != this.cfg.getDataCenterId(), "'dataCenterId' cannot be the same as send hub data center ID.");
        assertReplicaParameter(seenDataCenterIds.add(dcId), "'dataCenterId' is not unique across all replicas defined within the sender hub: " + dcId);

        String locAddr = replica.getLocalOutboundAddress();

        if (locAddr != null) {
            validateLocalOutboundAddress(locAddr);
        }
        else {
            String locHost = replica.getLocalOutboundHost();

            if (locHost != null) {
                try {
                    InetAddress.getByName(locHost);
                }
                catch (UnknownHostException e) {
                    throw invalidReplicaParameter("'localOutboundHost' cannot be resolved to local address: " + locHost, e);
                }
            }
        }

        assertReplicaParameter(replica.getLoadBalancingMode() != null, "'loadBalancingMode' cannot be null.");

        String[] rcvAddrs = replica.getReceiverAddresses();

        assertReplicaParameter(rcvAddrs != null, "'receiverAddresses' cannot be null.");
        assertReplicaParameter(rcvAddrs.length > 0, "'receiverAddresses' must have at least one network address defined.");

        validateReceiverAddresses(rcvAddrs);
    }

    /**
     * Validates a local outbound address of the form {@code host}, {@code host:port} or
     * {@code host:from..to} and checks that the host resolves.
     *
     * @param addrStr Configured local outbound address.
     */
    private static void validateLocalOutboundAddress(String addrStr) {
        String host;
        int colIdx = addrStr.indexOf(':');

        // A colon at index 0 is deliberately not treated as a host/port separator.
        if (colIdx > 0) {
            host = addrStr.substring(0, colIdx);

            String portStr = addrStr.substring(colIdx + 1);

            assertReplicaParameter(!F.isEmpty((String)portStr), "'localOutboundAddress' has empty port: " + addrStr);
            assertReplicaParameter(validPortOrRange(portStr), "'localOutboundAddress' has invalid port value: " + addrStr);
        }
        else {
            host = addrStr;
        }

        assertReplicaParameter(!F.isEmpty((String)host), "'localOutboundAddress' has empty host name: " + addrStr);

        try {
            InetAddress.getByName(host);
        }
        catch (UnknownHostException e) {
            throw invalidReplicaParameter("'localOutboundAddress' cannot be resolved to local address: " + addrStr, e);
        }
    }

    /**
     * Checks a single port or a {@code from..to} port range.
     *
     * @param portStr Port part of an address.
     * @return {@code true} if every port is within 0..65535 and, for a range, {@code from <= to}.
     */
    private static boolean validPortOrRange(String portStr) {
        int rangeIdx = portStr.indexOf("..");

        if (rangeIdx < 0) {
            return validPort(portStr);
        }

        String fromPort = portStr.substring(0, rangeIdx);
        String toPort = portStr.substring(rangeIdx + 2);

        return validPort(fromPort) && validPort(toPort) && Integer.parseInt(fromPort) <= Integer.parseInt(toPort);
    }

    /**
     * Validates receiver addresses of a replica: each is {@code host} or {@code host:port}
     * (a single trailing colon is ignored) and must be unique within the replica.
     *
     * @param rcvAddrs Non-empty array of receiver addresses.
     */
    private static void validateReceiverAddresses(String[] rcvAddrs) {
        Set<String> uniqueAddrs = new HashSet<String>(rcvAddrs.length, 1.0f);

        for (String rawAddr : rcvAddrs) {
            // Report a null entry as a configuration error instead of failing with a bare NPE.
            assertReplicaParameter(rawAddr != null, "'receiverAddresses' cannot contain null entries.");

            String addr = rawAddr.endsWith(":") ? rawAddr.substring(0, rawAddr.length() - 1) : rawAddr;

            if (addr.indexOf(':') >= 0) {
                StringTokenizer st = new StringTokenizer(addr, ":");

                if (st.countTokens() != 2) {
                    throw new IgniteException("Replica address cannot be parsed: " + addr);
                }

                // First token is the host, second one is the port.
                st.nextToken();

                if (!validPort(st.nextToken())) {
                    throw new IgniteException("Replica address has invalid port: " + addr);
                }
            }

            if (!uniqueAddrs.add(addr)) {
                throw new IgniteException("Replica address is not unique within the replica: " + addr);
            }
        }
    }

    /**
     * Validates store setup: a store is defined either on the hub or on every replica, never on
     * both. When no store is defined anywhere, a default {@link DrSenderFsStore} is set on the hub.
     *
     * @param sndCfg Sender hub configuration (may be modified as described).
     * @param replicas Replica configurations.
     */
    private static void validateSenderStores(DrSenderConfiguration sndCfg, DrSenderConnectionConfiguration[] replicas) {
        BitSet noStoreReplicas = new BitSet();

        for (DrSenderConnectionConfiguration replica : replicas) {
            DrSenderStore replicaStore = replica.getStore();

            if (sndCfg.getStore() != null && replicaStore != null) {
                throw new IgniteException("Must be either set global store for sender hub or per replica. Currently set store for hub and data center: " + replica.getDataCenterId());
            }

            if (replicaStore == null) {
                noStoreReplicas.set(replica.getDataCenterId());
            }
        }

        if (sndCfg.getStore() != null) {
            return;
        }

        if (noStoreReplicas.cardinality() == replicas.length) {
            sndCfg.setStore(new DrSenderFsStore());
        }
        else if (noStoreReplicas.cardinality() > 0) {
            throw new IgniteException("Must be either set global store for sender hub or per replica. Currently missed stores for data centers: " + noStoreReplicas.toString().replace('{', '[').replace('}', ']'));
        }
    }

    /**
     * Builds the exception for an invalid replica parameter, keeping the original failure as cause.
     *
     * @param condDesc Description of the violated condition.
     * @param cause Underlying failure.
     * @return Exception to throw.
     */
    private static IgniteException invalidReplicaParameter(String condDesc, Throwable cause) {
        return new IgniteException("Replica configuration parameter invalid: " + condDesc, cause);
    }

    /**
     * @param dcID Data center ID.
     * @return {@code true} if the ID is between 1 and 31 inclusively.
     */
    private static boolean isValidDatacenterID(byte dcID) {
        return 1 <= dcID && dcID <= 31;
    }

    /**
     * @param portStr Port as a string.
     * @return {@code true} if the string parses as an integer within 0..65535.
     */
    private static boolean validPort(String portStr) {
        int port;

        try {
            port = Integer.parseInt(portStr);
        }
        catch (NumberFormatException ignored) {
            // Not a number at all, hence not a port.
            return false;
        }

        return 0 <= port && port <= 65535;
    }

    /**
     * Fails with a configuration error if the given replica parameter condition does not hold.
     *
     * @param cond Condition that must be {@code true}.
     * @param condDesc Description appended to the error message.
     * @throws IgniteException If {@code cond} is {@code false}.
     */
    private static void assertReplicaParameter(boolean cond, String condDesc) {
        if (cond) {
            return;
        }

        throw new IgniteException("Replica configuration parameter invalid: " + condDesc);
    }

    /**
     * Checks that the local sender hub configuration is consistent with the sender hub of a remote
     * node. Hubs that share at least one cache name (cache-names mode) or sender group (groups
     * mode) must use the same store class, the same data centers and the same ignored data centers.
     * Nodes without a sender hub, or working in the other addressing mode, are skipped.
     *
     * @param rmtNode Remote node.
     * @throws IgniteCheckedException If the configurations are inconsistent.
     */
    private void checkSenderHub(ClusterNode rmtNode) throws IgniteCheckedException {
        DrSenderAttributes rmtCfg = (DrSenderAttributes)rmtNode.attribute("plugins.gg.replication.snd.hub");
        DrSenderAttributes locCfg = (DrSenderAttributes)this.ctx.localNode().attribute("plugins.gg.replication.snd.hub");

        assert locCfg != null;

        if (rmtCfg == null) {
            return;
        }

        Set<String> commonCaches = Collections.emptySet();
        Set<String> commonGroups = Collections.emptySet();

        if (this.cfg.isDrUseCacheNames()) {
            assert this.ctx.localNode().attribute("plugins.gg.replication.snd.groups") == null;

            // Remote hub works with sender groups: nothing to compare.
            if (rmtNode.attribute("plugins.gg.replication.snd.groups") != null) {
                return;
            }

            Set<String> locCaches = new HashSet<String>(locCfg.getCacheNames());

            commonCaches = new HashSet<String>();

            // Plain intersection: a name is common only if the local hub has it as well.
            for (String cacheName : rmtCfg.getCacheNames()) {
                if (locCaches.contains(cacheName)) {
                    commonCaches.add(cacheName);
                }
            }
        }
        else {
            String[] rmtGroups = (String[])rmtNode.attribute("plugins.gg.replication.snd.groups");
            String[] locGroups = (String[])this.ctx.localNode().attribute("plugins.gg.replication.snd.groups");

            assert !F.isEmpty((Object[])locGroups) && F.isEmpty(locCfg.getCacheNames());

            // Remote hub works with cache names: nothing to compare.
            if (rmtGroups == null) {
                return;
            }

            Set<String> locGroupSet = new HashSet<String>(Arrays.asList(locGroups));

            commonGroups = new HashSet<String>();

            for (String grpName : rmtGroups) {
                if (locGroupSet.contains(grpName)) {
                    commonGroups.add(grpName);
                }
            }
        }

        if (commonGroups.isEmpty() && commonCaches.isEmpty()) {
            return;
        }

        // NOTE(review): the messages below report only 'commonCaches', which is empty in groups mode.
        if (!F.eq((Object)locCfg.getStoreClassName(), (Object)rmtCfg.getStoreClassName())) {
            throw new IgniteCheckedException("Sender hubs with common caches or belonging to common group must have the same store implementation (fix configuration or set -DIGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK=true system property) [commonCaches=" + commonCaches + ", localStore=" + locCfg.getStoreClassName() + ", remoteStore=" + rmtCfg.getStoreClassName() + ", remoteNode=" + rmtNode.id() + ']');
        }

        if (!F.eq(locCfg.getDataCenterIds(), rmtCfg.getDataCenterIds())) {
            throw new IgniteCheckedException("Sender hubs with common caches or belonging to common group must work with the same set of receiver data centers (fix configuration or set -DIGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK=true system property) [commonCaches=" + commonCaches + ", localDataCenters=" + locCfg.getDataCenterIds() + ", remoteDataCenters=" + rmtCfg.getDataCenterIds() + ", remoteNode=" + rmtNode.id() + ']');
        }

        if (!F.eq(locCfg.getReplicasIgnore(), rmtCfg.getReplicasIgnore())) {
            throw new IgniteCheckedException("Sender hubs with common caches or belonging to common group must have the same ignored data centers (fix configuration or set -DIGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK=true system property)[commonCaches=" + commonCaches + ", locIgnore=" + locCfg.getReplicasIgnore() + ", rmtIgnore=" + rmtCfg.getReplicasIgnore() + ", rmtNodeId=" + rmtNode.id() + ']');
        }
    }

    /**
     * Prints a header for the replication processor and then delegates to the receiver hub
     * and the sender hub, for whichever of them is set.
     */
    @Override
    public void printMemoryStats() {
        final Object[] noArgs = new Object[0];

        X.println(">>>", noArgs);

        String header = ">>> Replication processor memory stats [igniteInstanceName=" + this.igniteInstanceName() + ']';
        X.println(header, noArgs);

        // Either hub may be absent on this node.
        if (this.rcvHub != null) {
            this.rcvHub.printMemoryStats();
        }

        if (this.sndHub != null) {
            this.sndHub.printMemoryStats();
        }
    }

    /**
     * Returns the {@link GridDr} instance held by this processor.
     *
     * @return Current value of the {@code dr} field; may be {@code null}.
     */
    @Nullable
    public GridDr dr() {
        return dr;
    }

    /**
     * Hands the given task over to the DR management pool.
     *
     * @param task Task to submit.
     */
    public void submit(Runnable task) {
        // The pool is expected to have been created before anything is submitted.
        assert drMgmtPool != null;

        drMgmtPool.submit(task);
    }

    /**
     * Enqueues a state transfer task into the state transfer queue.
     *
     * @param task State transfer task to enqueue.
     */
    public void submitStateTransferTask(StateTransferTask task) {
        // The worker pool is expected to exist before tasks are queued.
        assert stateTransferWorkerPool != null;

        // NOTE(review): the result of offer() is not inspected; presumably the queue is
        // unbounded so the offer cannot be rejected - confirm against the queue's construction.
        stateTransferQ.offer(task);
    }

    /**
     * Resolves the SSL context factory to use for the given sender configuration.
     * <p>
     * The factory set on the sender configuration wins. If none is set and
     * {@code cfg.isUseIgniteSslContextFactory()} is {@code true}, the factory from
     * {@code this.config()} is used instead.
     *
     * @param cfg Sender configuration.
     * @return Resolved SSL context factory, or {@code null} if none applies.
     */
    @Nullable
    public Factory<SSLContext> getSslContextFactory(DrSenderConfiguration cfg) {
        // Typed local (instead of a raw Factory) avoids an unchecked conversion on return.
        Factory<SSLContext> res = cfg.getSslContextFactory();
        if (res == null && cfg.isUseIgniteSslContextFactory()) {
            res = this.config().getSslContextFactory();
        }
        return res;
    }

    /**
     * Resolves the SSL context factory to use for the given receiver configuration.
     * <p>
     * The factory set on the receiver configuration wins. If none is set and
     * {@code cfg.isUseIgniteSslContextFactory()} is {@code true}, the factory from
     * {@code this.config()} is used instead.
     *
     * @param cfg Receiver configuration.
     * @return Resolved SSL context factory, or {@code null} if none applies.
     */
    @Nullable
    public Factory<SSLContext> getSslContextFactory(DrReceiverConfiguration cfg) {
        // Typed local (instead of a raw Factory) avoids an unchecked conversion on return.
        Factory<SSLContext> res = cfg.getSslContextFactory();
        if (res == null && cfg.isUseIgniteSslContextFactory()) {
            res = this.config().getSslContextFactory();
        }
        return res;
    }

    /**
     * Lists state transfers that are recorded in the utility cache and not yet reported as done.
     * <p>
     * The utility cache is scanned for {@link CacheDrStateTransferKey} entries. An entry is
     * reported only if its cache descriptor still exists, its value is not {@code null}, and a
     * matching {@link CacheDrStateTransferResultInfo} exists whose {@code done()} is {@code false}.
     *
     * @return Collection of state transfers that are not done; never {@code null}.
     * @throws IgniteCheckedException If scanning or reading the utility cache fails.
     */
    public Collection<CacheDrStateTransfer> listStateTransfers() throws IgniteCheckedException {
        Collection<CacheDrStateTransfer> transfers = new LinkedList<CacheDrStateTransfer>();
        IgniteInternalCache<Object, Object> utilCache = this.kernalContext().cache().utilityCache();

        for (Iterator<Cache.Entry<Object, Object>> entries = utilCache.scanIterator(false, null); entries.hasNext(); ) {
            Cache.Entry<Object, Object> entry = entries.next();
            Object rawKey = entry.getKey();

            // The utility cache holds other kinds of entries as well; skip them.
            if (!(rawKey instanceof CacheDrStateTransferKey)) {
                continue;
            }

            CacheDrStateTransferKey transferKey = (CacheDrStateTransferKey)rawKey;
            CacheDrStateTransferInfo transferInfo = (CacheDrStateTransferInfo)entry.getValue();
            DynamicCacheDescriptor desc = this.kernalContext().cache().cacheDescriptor(transferKey.cacheName());

            if (desc == null || transferInfo == null) {
                continue;
            }

            int totalParts = desc.cacheConfiguration().getAffinity().partitions();
            CacheDrStateTransferResultInfo result =
                (CacheDrStateTransferResultInfo)utilCache.get(CacheDrStateTransferResultKey.fromStateTransferKey(transferKey));

            if (result == null || result.done()) {
                continue;
            }

            // NOTE(review): the last argument is the total partition count minus the size of
            // transferInfo.partitions(); its exact meaning depends on CacheDrStateTransfer - confirm.
            transfers.add(new CacheDrStateTransfer(
                transferKey.id(),
                transferKey.dataCenterIds(),
                transferKey.cacheName(),
                transferInfo.nodeId(),
                transferInfo.startTime(),
                transferInfo.isSyncTransfer(),
                totalParts - transferInfo.partitions().size()));
        }

        return transfers;
    }

    /**
     * Cancels the state transfer identified by the given task ID.
     * <p>
     * The utility cache is scanned for a {@link CacheDrStateTransferKey} whose {@code id()} equals
     * {@code taskUuid}. For the first match, the stop request is delegated to the DR manager of
     * the cache named by that key. If no key matches, a warning is logged and an already
     * finished future is returned.
     *
     * @param taskUuid ID of the state transfer task to cancel.
     * @return Future returned by the DR manager's stop call, or a finished future if no matching
     *      task was found.
     * @throws IgniteCheckedException If scanning the utility cache fails.
     * @throws IllegalArgumentException If the cache referenced by the matching key is not available.
     */
    public IgniteInternalFuture<?> cancelStateTransfer(@NotNull IgniteUuid taskUuid) throws IgniteCheckedException {
        IgniteInternalCache<Object, Object> utilCache = this.kernalContext().cache().utilityCache();

        for (Iterator<Cache.Entry<Object, Object>> entries = utilCache.scanIterator(false, null); entries.hasNext(); ) {
            Object rawKey = entries.next().getKey();

            if (!(rawKey instanceof CacheDrStateTransferKey)) {
                continue;
            }

            CacheDrStateTransferKey transferKey = (CacheDrStateTransferKey)rawKey;

            if (!transferKey.id().equals(taskUuid)) {
                continue;
            }

            // First matching key wins; the scan stops here.
            String targetName = transferKey.cacheName();
            GridCacheAdapter<?, ?> targetCache = this.kernalContext().cache().internalCache(targetName);

            if (targetCache == null) {
                throw new IllegalArgumentException("Cache is not configured: " + targetName);
            }

            GridGainCacheDrManager mgr = DrUtils.drManagerSafe(targetCache.context().dr(), targetName);

            return mgr.stopDrStateTransfer(transferKey);
        }

        this.log.warning("State transfer task with requested uuid has already been cancelled or never been started: taskUid=" + taskUuid);

        return new GridFinishedFuture<Object>();
    }
}

