/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateProviderImpl;
import org.infinispan.statetransfer.StateResponseCommand;
import org.infinispan.util.ReadOnlyDataContainerBackedKeySet;
import org.infinispan.util.concurrent.AggregatingNotifyingFutureBuilder;
import org.infinispan.util.concurrent.ConcurrentMapFactory;
import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class OutboundTransferTask
implements Runnable {
    private static final Log log = LogFactory.getLog(OutboundTransferTask.class);
    private final boolean trace = log.isTraceEnabled();
    private StateProviderImpl stateProvider;
    private final int topologyId;
    private final Address destination;
    private final Set<Integer> segments = new CopyOnWriteArraySet<Integer>();
    private final int stateTransferChunkSize;
    private final Configuration configuration;
    private final ConsistentHash readCh;
    private final DataContainer dataContainer;
    private final CacheLoaderManager cacheLoaderManager;
    private final RpcManager rpcManager;
    private final CommandsFactory commandsFactory;
    private final long timeout;
    private final Map<Integer, List<InternalCacheEntry>> entriesBySegment = ConcurrentMapFactory.makeConcurrentMap();
    private int accumulatedEntries;
    private final NotifyingNotifiableFuture<Object> sendFuture = new AggregatingNotifyingFutureBuilder(null);
    private FutureTask runnableFuture;

    public OutboundTransferTask(Address destination, Set<Integer> segments, int stateTransferChunkSize, int topologyId, ConsistentHash readCh, StateProviderImpl stateProvider, DataContainer dataContainer, CacheLoaderManager cacheLoaderManager, RpcManager rpcManager, Configuration configuration, CommandsFactory commandsFactory, long timeout) {
        if (segments == null || segments.isEmpty()) {
            throw new IllegalArgumentException("Segments must not be null or empty");
        }
        if (destination == null) {
            throw new IllegalArgumentException("Destination address cannot be null");
        }
        if (stateTransferChunkSize <= 0) {
            throw new IllegalArgumentException("stateTransferChunkSize must be greater than 0");
        }
        this.stateProvider = stateProvider;
        this.destination = destination;
        this.segments.addAll(segments);
        this.stateTransferChunkSize = stateTransferChunkSize;
        this.topologyId = topologyId;
        this.readCh = readCh;
        this.dataContainer = dataContainer;
        this.cacheLoaderManager = cacheLoaderManager;
        this.rpcManager = rpcManager;
        this.configuration = configuration;
        this.commandsFactory = commandsFactory;
        this.timeout = timeout;
    }

    public void execute(ExecutorService executorService) {
        if (this.runnableFuture != null) {
            throw new IllegalStateException("This task was already submitted");
        }
        this.runnableFuture = new FutureTask<Void>((Runnable)this, null){

            @Override
            protected void done() {
                OutboundTransferTask.this.stateProvider.onTaskCompletion(OutboundTransferTask.this);
            }
        };
        executorService.submit(this.runnableFuture);
    }

    public Address getDestination() {
        return this.destination;
    }

    public Set<Integer> getSegments() {
        return this.segments;
    }

    @Override
    public void run() {
        block12: {
            try {
                for (InternalCacheEntry ice : this.dataContainer) {
                    Object key = ice.getKey();
                    int segmentId = this.readCh.getSegment(key);
                    if (!this.segments.contains(segmentId)) continue;
                    this.sendEntry(ice, segmentId);
                }
                CacheStore cacheStore = this.getCacheStore();
                if (cacheStore != null) {
                    try {
                        Set<Object> storedKeys = cacheStore.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(this.dataContainer));
                        for (Object key : storedKeys) {
                            int segmentId = this.readCh.getSegment(key);
                            if (!this.segments.contains(segmentId)) continue;
                            try {
                                InternalCacheEntry ice = cacheStore.load(key);
                                if (ice == null) continue;
                                this.sendEntry(ice, segmentId);
                            }
                            catch (CacheLoaderException e) {
                                log.failedLoadingValueFromCacheStore(key, e);
                            }
                        }
                    }
                    catch (CacheLoaderException e) {
                        log.failedLoadingKeysFromCacheStore(e);
                    }
                } else if (this.trace) {
                    log.tracef("No cache store or the cache store is shared, no need to send any stored cache entries for segments: %s", this.segments);
                }
                this.sendEntries(true);
            }
            catch (Throwable t) {
                if (this.runnableFuture.isCancelled()) break block12;
                log.error("Failed to execute outbound transfer", t);
            }
        }
        if (this.trace) {
            log.tracef("Outbound transfer of segments %s to %s is complete", this.segments, this.destination);
        }
    }

    private CacheStore getCacheStore() {
        if (this.cacheLoaderManager != null && this.cacheLoaderManager.isEnabled() && !this.cacheLoaderManager.isShared() && this.cacheLoaderManager.isFetchPersistentState()) {
            return this.cacheLoaderManager.getCacheStore();
        }
        return null;
    }

    private void sendEntry(InternalCacheEntry ice, int segmentId) {
        List<InternalCacheEntry> entries;
        if (this.accumulatedEntries >= this.stateTransferChunkSize) {
            this.sendEntries(false);
            this.entriesBySegment.clear();
            this.accumulatedEntries = 0;
        }
        if ((entries = this.entriesBySegment.get(segmentId)) == null) {
            entries = new ArrayList<InternalCacheEntry>();
            this.entriesBySegment.put(segmentId, entries);
        }
        entries.add(ice);
        ++this.accumulatedEntries;
    }

    private void sendEntries(boolean isLast) {
        ArrayList<StateChunk> chunks = new ArrayList<StateChunk>();
        if (isLast) {
            Iterator<Object> i$ = this.segments.iterator();
            while (i$.hasNext()) {
                int n = (Integer)i$.next();
                List<InternalCacheEntry> entries = this.entriesBySegment.get(n);
                if (entries == null) {
                    entries = Collections.emptyList();
                }
                chunks.add(new StateChunk(n, entries, isLast));
            }
        } else {
            for (Map.Entry entry : this.entriesBySegment.entrySet()) {
                List entries = (List)entry.getValue();
                if (entries.isEmpty()) continue;
                chunks.add(new StateChunk((Integer)entry.getKey(), entries, isLast));
            }
        }
        if (!chunks.isEmpty() || isLast) {
            if (this.trace) {
                log.tracef("Sending %d cache entries from segments %s to node %s", this.accumulatedEntries, this.entriesBySegment.keySet(), this.destination);
            }
            StateResponseCommand cmd = this.commandsFactory.buildStateResponseCommand(this.rpcManager.getAddress(), this.topologyId, chunks);
            this.rpcManager.invokeRemotelyInFuture(Collections.singleton(this.destination), cmd, false, this.sendFuture, this.timeout);
        }
    }

    public void cancelSegments(Set<Integer> cancelledSegments) {
        if (this.trace) {
            log.tracef("Cancelling outbound transfer of segments %s to %s", cancelledSegments, this.destination);
        }
        if (this.segments.removeAll(cancelledSegments)) {
            this.entriesBySegment.keySet().removeAll(cancelledSegments);
            if (this.segments.isEmpty()) {
                this.cancel();
            }
        }
    }

    public void cancel() {
        if (this.runnableFuture != null && !this.runnableFuture.isCancelled()) {
            this.runnableFuture.cancel(true);
            this.sendFuture.cancel(true);
        }
    }

    public boolean isCancelled() {
        return this.runnableFuture != null && this.runnableFuture.isCancelled();
    }
}

