/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.xsite.statetransfer;

import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.configuration.cache.BackupConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.XSiteStateTransferConfiguration;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.RetryOnFailureXSiteCommand;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.XSiteBackup;
import org.infinispan.xsite.statetransfer.XSiteState;
import org.infinispan.xsite.statetransfer.XSiteStateProvider;
import org.infinispan.xsite.statetransfer.XSiteStatePushCommand;
import org.infinispan.xsite.statetransfer.XSiteStateTransferControlCommand;
import org.infinispan.xsite.statetransfer.XSiteStateTransferManager;

public class XSiteStateProviderImpl
implements XSiteStateProvider {
    private static final int DEFAULT_CHUNK_SIZE = 1024;
    private static final Log log = LogFactory.getLog(XSiteStateProviderImpl.class);
    private static final boolean trace = log.isTraceEnabled();
    private static final boolean debug = log.isDebugEnabled();
    private final ConcurrentMap<String, StatePushTask> runningStateTransfer = CollectionFactory.makeConcurrentMap();
    @Inject
    private InternalDataContainer<?, ?> dataContainer;
    @Inject
    private PersistenceManager persistenceManager;
    @Inject
    private ClusteringDependentLogic clusteringDependentLogic;
    @Inject
    private CommandsFactory commandsFactory;
    @Inject
    private RpcManager rpcManager;
    @Inject
    @ComponentName(value="org.infinispan.executors.transport")
    private ExecutorService executorService;
    @Inject
    private Configuration configuration;
    @Inject
    private ComponentRef<XSiteStateTransferManager> stateTransferManager;
    @Inject
    private StateTransferLock stateTransferLock;

    @Override
    public void startStateTransfer(String siteName, Address origin, int minTopologyId) {
        XSiteStateTransferConfiguration stateTransferConfiguration = null;
        for (BackupConfiguration backupConfiguration : this.configuration.sites().allBackups()) {
            if (!backupConfiguration.site().equals(siteName)) continue;
            stateTransferConfiguration = backupConfiguration.stateTransfer();
            break;
        }
        if (stateTransferConfiguration == null) {
            throw new CacheException("Unable to start X-Site State Transfer! Backup configuration not found for " + siteName + "!");
        }
        StatePushTask task = new StatePushTask(siteName, origin, stateTransferConfiguration, minTopologyId);
        if (this.runningStateTransfer.putIfAbsent(siteName, task) == null) {
            if (debug) {
                log.debugf("Starting state transfer to site '%s'", siteName);
            }
            this.executorService.execute(task);
        } else if (debug) {
            log.debugf("Do not start state transfer to site '%s'. It has already started!", siteName);
        }
        if (this.rpcManager.getAddress().equals(this.rpcManager.getMembers().get(0)) && !this.rpcManager.getMembers().contains(origin)) {
            this.stateTransferManager.running().becomeCoordinator(siteName);
        }
    }

    @Override
    public void cancelStateTransfer(String siteName) {
        StatePushTask task = (StatePushTask)this.runningStateTransfer.remove(siteName);
        if (task != null) {
            task.canceled = true;
        }
    }

    @Override
    public Collection<String> getCurrentStateSending() {
        ArrayList<String> sending = new ArrayList<String>(this.runningStateTransfer.size());
        for (Map.Entry entry : this.runningStateTransfer.entrySet()) {
            if (((StatePushTask)entry.getValue()).finished) continue;
            sending.add((String)entry.getKey());
        }
        return sending;
    }

    @Override
    public Collection<String> getSitesMissingCoordinator(Collection<Address> currentMembers) {
        ArrayList<String> stateTransferNeedsNewCoordinator = new ArrayList<String>(this.runningStateTransfer.size());
        for (Map.Entry entry : this.runningStateTransfer.entrySet()) {
            if (currentMembers.contains(((StatePushTask)entry.getValue()).origin)) continue;
            stateTransferNeedsNewCoordinator.add((String)entry.getKey());
        }
        return stateTransferNeedsNewCoordinator;
    }

    private void notifyStateTransferEnd(String siteName, Address origin, boolean error) {
        if (this.rpcManager.getAddress().equals(origin)) {
            this.executorService.submit(() -> {
                try {
                    this.stateTransferManager.running().notifyStatePushFinished(siteName, origin, !error);
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
                return null;
            });
        } else {
            XSiteStateTransferControlCommand command = this.commandsFactory.buildXSiteStateTransferControlCommand(XSiteStateTransferControlCommand.StateTransferControl.FINISH_SEND, siteName);
            command.setStatusOk(!error);
            this.rpcManager.invokeRemotely(Collections.singleton(origin), command, this.rpcManager.getDefaultRpcOptions(false));
        }
    }

    private boolean shouldSendKey(Object key) {
        return this.clusteringDependentLogic.getCacheTopology().getDistribution(key).isPrimary();
    }

    private void sendFromSharedBuffer(XSiteBackup xSiteBackup, List<XSiteState> sharedBuffer, StatePushTask task) throws Throwable {
        if (sharedBuffer.size() == 0) {
            return;
        }
        Object[] privateBuffer = sharedBuffer.toArray(new XSiteState[sharedBuffer.size()]);
        if (trace) {
            log.debugf("Sending chunk to site '%s'. Chunk contains %s", xSiteBackup.getSiteName(), Arrays.toString(privateBuffer));
        } else if (debug) {
            log.debugf("Sending chunk to site '%s'. Chunk has %s keys.", xSiteBackup.getSiteName(), privateBuffer.length);
        }
        XSiteStatePushCommand command = this.commandsFactory.buildXSiteStatePushCommand((XSiteState[])privateBuffer, xSiteBackup.getTimeout());
        RetryOnFailureXSiteCommand remoteSite = RetryOnFailureXSiteCommand.newInstance(xSiteBackup, command, task.retryPolicy);
        remoteSite.execute(this.rpcManager.getTransport(), task.waitTime, TimeUnit.MILLISECONDS);
    }

    private class StatePushTask
    implements Runnable {
        private final XSiteBackup xSiteBackup;
        private final int chunkSize;
        private final Address origin;
        private final RetryOnFailureXSiteCommand.RetryPolicy retryPolicy;
        private final long waitTime;
        private final int minTopologyId;
        private volatile boolean finished;
        private volatile boolean canceled;
        private boolean error;

        public StatePushTask(String siteName, Address origin, XSiteStateTransferConfiguration configuration, int minTopologyId) {
            this.minTopologyId = minTopologyId;
            this.chunkSize = configuration.chunkSize();
            this.waitTime = configuration.waitTime();
            this.retryPolicy = new RetryOnFailureXSiteCommand.MaxRetriesPolicy(configuration.maxRetries());
            this.origin = origin;
            this.xSiteBackup = new XSiteBackup(siteName, true, configuration.timeout());
            this.canceled = false;
            this.finished = false;
            this.error = false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Loose catch block
         */
        @Override
        public void run() {
            block37: {
                try {
                    AdvancedCacheLoader stProvider;
                    if (debug) {
                        log.debugf("[X-Site State Transfer - %s] wait for min topology %s", this.xSiteBackup.getSiteName(), this.minTopologyId);
                    }
                    CompletableFutures.await(XSiteStateProviderImpl.this.stateTransferLock.topologyFuture(this.minTopologyId));
                    ArrayList<XSiteState> chunk = new ArrayList<XSiteState>(this.chunkSize <= 0 ? 1024 : this.chunkSize);
                    if (debug) {
                        log.debugf("[X-Site State Transfer - %s] start DataContainer iteration", this.xSiteBackup.getSiteName());
                    }
                    for (InternalCacheEntry ice : XSiteStateProviderImpl.this.dataContainer) {
                        if (this.canceled) {
                            log.debugf("[X-Site State Transfer - %s] State transfer canceled!", this.xSiteBackup.getSiteName());
                            return;
                        }
                        if (this.chunkSize > 0 && chunk.size() == this.chunkSize) {
                            try {
                                XSiteStateProviderImpl.this.sendFromSharedBuffer(this.xSiteBackup, chunk, this);
                            }
                            catch (Throwable t) {
                                this.error = true;
                                log.unableToSendXSiteState(this.xSiteBackup.getSiteName(), t);
                                this.finished = true;
                                log.debugf("[X-Site State Transfer - %s] State transfer finished!", this.xSiteBackup.getSiteName());
                                if (!this.canceled) {
                                    XSiteStateProviderImpl.this.notifyStateTransferEnd(this.xSiteBackup.getSiteName(), this.origin, this.error);
                                }
                                return;
                            }
                            chunk.clear();
                        }
                        if (!XSiteStateProviderImpl.this.shouldSendKey(ice.getKey())) continue;
                        if (trace) {
                            log.tracef("Added key '%s' to current chunk", ice.getKey());
                        }
                        chunk.add(XSiteState.fromDataContainer(ice));
                    }
                    if (this.canceled) {
                        return;
                    }
                    if (chunk.size() > 0) {
                        try {
                            XSiteStateProviderImpl.this.sendFromSharedBuffer(this.xSiteBackup, chunk, this);
                        }
                        catch (Throwable t) {
                            this.error = true;
                            log.unableToSendXSiteState(this.xSiteBackup.getSiteName(), t);
                            this.finished = true;
                            log.debugf("[X-Site State Transfer - %s] State transfer finished!", this.xSiteBackup.getSiteName());
                            if (!this.canceled) {
                                XSiteStateProviderImpl.this.notifyStateTransferEnd(this.xSiteBackup.getSiteName(), this.origin, this.error);
                            }
                            return;
                        }
                    }
                    if (debug) {
                        log.debugf("[X-Site State Transfer - %s] finish DataContainer iteration", this.xSiteBackup.getSiteName());
                    }
                    if ((stProvider = XSiteStateProviderImpl.this.persistenceManager.getStateTransferProvider()) != null) {
                        if (debug) {
                            log.debugf("[X-Site State Transfer - %s] start Persistence iteration", this.xSiteBackup.getSiteName());
                        }
                        try {
                            Flowable.fromPublisher(stProvider.publishEntries(k -> XSiteStateProviderImpl.this.shouldSendKey(k) && !XSiteStateProviderImpl.this.dataContainer.containsKey(k), true, true)).map(XSiteState::fromCacheLoader).takeUntil(l -> this.canceled).buffer(this.chunkSize <= 0 ? Integer.MAX_VALUE : this.chunkSize).blockingForEach(l -> {
                                try {
                                    XSiteStateProviderImpl.this.sendFromSharedBuffer(this.xSiteBackup, l, this);
                                }
                                catch (Throwable throwable) {
                                    throw new CacheException(throwable);
                                }
                            });
                            if (this.canceled) {
                                log.debugf("[X-Site State Transfer - %s] State transfer canceled!", this.xSiteBackup.getSiteName());
                                return;
                            }
                        }
                        catch (CacheException e) {
                            this.error = true;
                            log.failedLoadingKeysFromCacheStore((Exception)((Object)e));
                            return;
                        }
                        catch (Throwable t) {
                            this.error = true;
                            log.unableToSendXSiteState(this.xSiteBackup.getSiteName(), t);
                            this.finished = true;
                            log.debugf("[X-Site State Transfer - %s] State transfer finished!", this.xSiteBackup.getSiteName());
                            if (!this.canceled) {
                                XSiteStateProviderImpl.this.notifyStateTransferEnd(this.xSiteBackup.getSiteName(), this.origin, this.error);
                            }
                            return;
                        }
                        if (debug) {
                            log.debugf("[X-Site State Transfer - %s] finish Persistence iteration", this.xSiteBackup.getSiteName());
                        }
                        break block37;
                    }
                    if (debug) {
                        log.debugf("[X-Site State Transfer - %s] skip Persistence iteration", this.xSiteBackup.getSiteName());
                    }
                    break block37;
                    {
                        catch (Throwable e) {
                            this.error = true;
                            log.unableToSendXSiteState(this.xSiteBackup.getSiteName(), e);
                            break block37;
                        }
                        catch (Throwable throwable) {
                            throw throwable;
                        }
                    }
                }
                finally {
                    this.finished = true;
                    log.debugf("[X-Site State Transfer - %s] State transfer finished!", this.xSiteBackup.getSiteName());
                    if (!this.canceled) {
                        XSiteStateProviderImpl.this.notifyStateTransferEnd(this.xSiteBackup.getSiteName(), this.origin, this.error);
                    }
                }
            }
        }

        public String toString() {
            return "StatePushTask{origin=" + this.origin + ", canceled=" + this.canceled + '}';
        }
    }
}

