/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl;

import io.reactivex.internal.subscriptions.EmptySubscription;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.RangeSet;
import org.infinispan.commons.util.SmallIntSet;
import org.infinispan.commons.util.concurrent.ConcurrentHashSet;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.responses.ValidResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.stream.impl.ClusterStreamManager;
import org.infinispan.stream.impl.IteratorResponse;
import org.infinispan.stream.impl.KeyTrackingTerminalOperation;
import org.infinispan.stream.impl.SegmentAwareOperation;
import org.infinispan.stream.impl.StreamIteratorNextCommand;
import org.infinispan.stream.impl.StreamRequestCommand;
import org.infinispan.stream.impl.TerminalOperation;
import org.infinispan.stream.impl.intops.IntermediateOperation;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class ClusterStreamManagerImpl<K>
implements ClusterStreamManager<K> {
    /** Trackers of terminal operations that are still registered, keyed by the request id generated in this class. */
    protected final Map<String, RequestTracker> currentlyRunning = new ConcurrentHashMap<String, RequestTracker>();
    /** Subscribers of remote iterations: added when a subscription is created, removed when it is cancelled. */
    protected final Set<Subscriber> iteratorsRunning = new ConcurrentHashSet();
    /** Counter combined with the local address to build request identifiers. */
    protected final AtomicInteger requestId = new AtomicInteger();
    /** Injected RPC manager used to send every stream command. */
    @Inject
    protected RpcManager rpc;
    /** Injected factory used to build the stream request / iterator commands. */
    @Inject
    protected CommandsFactory factory;
    /** Options used for the iterator commands; assigned in {@link #start()}. */
    protected RpcOptions rpcOptions;
    /** Address of this node as reported by {@link #rpc}; assigned in {@link #start()}. */
    protected Address localAddress;
    protected static final Log log = LogFactory.getLog(ClusterStreamManagerImpl.class);
    /** Trace flag sampled once when the class is initialised. */
    protected static final boolean trace = log.isTraceEnabled();

    /**
     * Start hook: caches the local node address and creates the RPC options
     * ({@link DeliverOrder#NONE} with the largest expressible timeout) that the
     * iterator commands of this manager are sent with.
     */
    @Start
    public void start() {
        Address self = rpc.getAddress();
        localAddress = self;
        rpcOptions = new RpcOptions(DeliverOrder.NONE, Long.MAX_VALUE, TimeUnit.DAYS);
    }

    /**
     * Submits a terminal operation to the remote primary owners of the given segments
     * using the {@code TERMINAL} request type.
     *
     * @return the request identifier, or {@code null} when there is no remote target
     */
    @Override
    public <R> Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, TerminalOperation<R> operation, ClusterStreamManager.ResultsCallback<R> callback, Predicate<? super R> earlyTerminatePredicate) {
        StreamRequestCommand.Type requestType = StreamRequestCommand.Type.TERMINAL;
        return commonRemoteStreamOperation(parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude, includeLoader, operation, callback, requestType, earlyTerminatePredicate);
    }

    /**
     * Submits a terminal operation to the remote primary owners of the given segments
     * using the {@code TERMINAL_REHASH} request type.
     *
     * @return the request identifier, or {@code null} when there is no remote target
     */
    @Override
    public <R> Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, TerminalOperation<R> operation, ClusterStreamManager.ResultsCallback<R> callback, Predicate<? super R> earlyTerminatePredicate) {
        StreamRequestCommand.Type requestType = StreamRequestCommand.Type.TERMINAL_REHASH;
        return commonRemoteStreamOperation(parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude, includeLoader, operation, callback, requestType, earlyTerminatePredicate);
    }

    /**
     * Shared implementation of the terminal operations: works out which remote nodes
     * own the requested segments, registers a {@link RequestTracker} for the request
     * and sends one {@link StreamRequestCommand} per target node.
     * <p>
     * With {@code parallelDistribution} the commands are sent asynchronously; otherwise
     * each command is sent and waited for in turn.
     * <p>
     * The request id is {@code localAddress + "-" + counter}, the same format used by the
     * other request kinds of this class. The separator keeps the address and the counter
     * from running into each other, which could otherwise make two distinct
     * address/counter pairs produce the same id.
     *
     * @param type                    request type placed in the command
     * @param earlyTerminatePredicate optional predicate handed to the tracker, may be {@code null}
     * @return the generated request id, or {@code null} when no remote target exists
     */
    private <R> Object commonRemoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, SegmentAwareOperation operation, ClusterStreamManager.ResultsCallback<R> callback, StreamRequestCommand.Type type, Predicate<? super R> earlyTerminatePredicate) {
        Map<Address, Set<Integer>> targets = this.determineTargets(ch, segments, callback);
        if (targets.isEmpty()) {
            log.tracef("Not performing remote operation for request as no valid targets for segments %s found", segments);
            return null;
        }
        String id = this.localAddress.toString() + "-" + this.requestId.getAndIncrement();
        log.tracef("Performing remote operations %s for id %s", targets, id);
        // Register the tracker before anything is sent so that a fast response can find it.
        RequestTracker<R> tracker = new RequestTracker<R>(callback, targets, earlyTerminatePredicate);
        this.currentlyRunning.put(id, tracker);
        if (parallelDistribution) {
            this.submitAsyncTasks(id, targets, keysToExclude, parallelStream, keysToInclude, includeLoader, type, operation);
            return id;
        }
        for (Map.Entry<Address, Set<Integer>> targetInfo : targets.entrySet()) {
            Set<Integer> targetSegments = targetInfo.getValue();
            Set<K> keysExcluded = this.determineExcludedKeys(keysToExclude, targetSegments);
            StreamRequestCommand<K> command = this.factory.buildStreamRequestCommand(id, parallelStream, type, targetSegments, keysToInclude, keysExcluded, includeLoader, operation);
            command.setTopologyId(this.rpc.getTopologyId());
            // Sequential distribution: wait for this node before contacting the next one.
            this.rpc.blocking(this.rpc.invokeCommand(targetInfo.getKey(), command, VoidResponseCollector.validOnly(), this.rpc.getSyncRpcOptions()));
        }
        return id;
    }

    /**
     * Submits a key tracking terminal operation to the remote primary owners of the
     * given segments using the {@code TERMINAL_KEY} request type. No early termination
     * predicate is used.
     *
     * @return the request identifier, or {@code null} when there is no remote target
     */
    @Override
    public <R> Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, KeyTrackingTerminalOperation<K, R, ?> operation, ClusterStreamManager.ResultsCallback<Collection<R>> callback) {
        StreamRequestCommand.Type requestType = StreamRequestCommand.Type.TERMINAL_KEY;
        return commonRemoteStreamOperation(parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude, includeLoader, operation, callback, requestType, null);
    }

    /**
     * Submits a rehash aware, key tracking terminal operation
     * ({@code TERMINAL_KEY_REHASH}) to the remote primary owners of the given segments.
     * <p>
     * When sent sequentially, an unsuccessful response or an exception whose cause chain
     * contains a {@link SuspectException} marks all segments of that target as suspect
     * through {@link #receiveResponse}; any other exception is rethrown to the caller.
     *
     * @return the generated request id, or {@code null} when no remote target exists
     */
    @Override
    public <R2> Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, KeyTrackingTerminalOperation<K, ?, R2> operation, ClusterStreamManager.ResultsCallback<Map<K, R2>> callback) {
        Map<Address, Set<Integer>> targets = determineTargets(ch, segments, callback);
        if (targets.isEmpty()) {
            log.tracef("Not performing remote rehash key aware operation for request as no valid targets for segments %s found", segments);
            return null;
        }

        String id = localAddress.toString() + "-" + requestId.getAndIncrement();
        log.tracef("Performing remote rehash key aware operations %s for id %s", targets, id);
        RequestTracker<Map<K, R2>> tracker = new RequestTracker<Map<K, R2>>(callback, targets, null);
        currentlyRunning.put(id, tracker);

        if (parallelDistribution) {
            submitAsyncTasks(id, targets, keysToExclude, parallelStream, keysToInclude, includeLoader, StreamRequestCommand.Type.TERMINAL_KEY_REHASH, operation);
            return id;
        }

        for (Map.Entry<Address, Set<Integer>> entry : targets.entrySet()) {
            Address dest = entry.getKey();
            Set<Integer> targetSegments = entry.getValue();
            try {
                Set<K> keysExcluded = determineExcludedKeys(keysToExclude, targetSegments);
                log.tracef("Submitting task to %s for %s excluding keys %s", dest, id, keysExcluded);
                StreamRequestCommand<K> command = factory.buildStreamRequestCommand(id, parallelStream, StreamRequestCommand.Type.TERMINAL_KEY_REHASH, targetSegments, keysToInclude, keysExcluded, includeLoader, operation);
                command.setTopologyId(rpc.getTopologyId());
                Response response = rpc.blocking(rpc.invokeCommand(dest, command, SingleResponseCollector.validOnly(), rpc.getSyncRpcOptions()));
                if (!response.isSuccessful()) {
                    log.tracef("Unsuccessful response for %s from %s - making segments %s suspect", id, dest, targetSegments);
                    receiveResponse(id, dest, true, targetSegments, null);
                }
            }
            catch (Exception e) {
                if (!containedSuspectException(e)) {
                    log.tracef(e, "Encountered exception for %s from %s", id, dest);
                    throw e;
                }
                log.tracef("Exception from %s contained a SuspectException, making all segments %s suspect", dest, targetSegments);
                receiveResponse(id, dest, true, targetSegments, null);
            }
        }
        return id;
    }

    /**
     * Sends one {@link StreamRequestCommand} to every target without waiting for the
     * responses, and attaches a completion callback to each invocation:
     * <ul>
     *   <li>a failure whose cause chain contains a {@link SuspectException}, or an
     *       unsuccessful response, marks all segments of that target as suspect via
     *       {@link #receiveResponse};</li>
     *   <li>any other failure is stored on the request tracker via
     *       {@link #markTrackerWithException}; if the tracker is already gone the
     *       failure is logged as a warning together with the throwable.</li>
     * </ul>
     *
     * @param id        request identifier under which the tracker is registered
     * @param targets   target node to the segments requested from it
     * @param type      request type placed in the command
     * @param operation operation shipped to the remote nodes
     */
    private void submitAsyncTasks(String id, Map<Address, Set<Integer>> targets, Map<Integer, Set<K>> keysToExclude, boolean parallelStream, Set<K> keysToInclude, boolean includeLoader, StreamRequestCommand.Type type, Object operation) {
        for (Map.Entry<Address, Set<Integer>> targetInfo : targets.entrySet()) {
            Address dest = targetInfo.getKey();
            Set<Integer> targetSegments = targetInfo.getValue();
            Set<K> keysExcluded = this.determineExcludedKeys(keysToExclude, targetSegments);
            log.tracef("Submitting async task to %s for %s excluding keys %s", dest, id, keysExcluded);
            StreamRequestCommand<K> command = this.factory.buildStreamRequestCommand(id, parallelStream, type, targetSegments, keysToInclude, keysExcluded, includeLoader, operation);
            command.setTopologyId(this.rpc.getTopologyId());
            CompletionStage<ValidResponse> completableFuture = this.rpc.invokeCommand(dest, command, SingleResponseCollector.validOnly(), this.rpc.getSyncRpcOptions());
            completableFuture.whenComplete((response, e) -> {
                if (e != null) {
                    if (this.containedSuspectException(e)) {
                        log.tracef("Exception contained a SuspectException, making all segments %s suspect", targetSegments);
                        this.receiveResponse(id, dest, true, targetSegments, null);
                        return;
                    }
                    log.tracef(e, "Encountered exception for %s from %s", id, dest);
                    RequestTracker tracker = this.currentlyRunning.get(id);
                    if (tracker != null) {
                        ClusterStreamManagerImpl.markTrackerWithException(tracker, dest, e, id);
                    } else {
                        // Pass the throwable as the cause argument; as a plain format
                        // parameter it would be dropped because the message has no placeholder.
                        log.warnf(e, "Unhandled remote stream exception encountered");
                    }
                } else if (response != null && !response.isSuccessful()) {
                    log.tracef("Unsuccessful response for %s from %s - making segments suspect", id, dest);
                    this.receiveResponse(id, dest, true, targetSegments, null);
                }
            });
        }
    }

    /**
     * Tells whether the given throwable, or any throwable in its cause chain, is a
     * {@link SuspectException}.
     *
     * @param e throwable to inspect, expected to be non-null
     * @return {@code true} if a {@link SuspectException} is found in the chain
     */
    private boolean containedSuspectException(Throwable e) {
        Throwable current = e;
        while (!(current instanceof SuspectException)) {
            current = current.getCause();
            if (current == null) {
                return false;
            }
        }
        return true;
    }

    /**
     * Stores the given throwable on the tracker and, when the tracker is thereby
     * finished, wakes every thread waiting on its completion condition.
     * <p>
     * The tracker counts as finished when {@code dest} is {@code null} or when
     * {@link RequestTracker#lastResult} reports that no further response is awaited.
     *
     * @param tracker tracker of the failed request
     * @param dest    node the failure is attributed to, or {@code null}
     * @param e       the failure to record
     * @param uuid    request id, only used for logging; may be {@code null}
     */
    protected static void markTrackerWithException(RequestTracker<?> tracker, Address dest, Throwable e, Object uuid) {
        log.tracef("Marking tracker to have exception", new Object[0]);
        tracker.throwable = e;
        boolean finished = dest == null || tracker.lastResult(dest, null);
        if (!finished) {
            return;
        }
        if (uuid == null) {
            log.trace("Tracker completed due to outside cause, waking sleepers! ");
        } else {
            log.tracef("Tracker %s completed with exception, waking sleepers!", uuid);
        }
        Lock lock = tracker.completionLock;
        lock.lock();
        try {
            tracker.completionCondition.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Collects the excluded keys of all given segments into a single set.
     *
     * @param keysToExclude excluded keys per segment; segments without a mapping contribute nothing
     * @param segmentsToUse segments whose excluded keys are wanted
     * @return the union of the excluded keys, or an empty set when {@code keysToExclude} is empty
     */
    private Set<K> determineExcludedKeys(Map<Integer, Set<K>> keysToExclude, Set<Integer> segmentsToUse) {
        if (keysToExclude.isEmpty()) {
            return Collections.emptySet();
        }
        return segmentsToUse.stream()
                .map(keysToExclude::get)
                .filter(Objects::nonNull)
                .flatMap(Set::stream)
                .collect(Collectors.toSet());
    }

    /**
     * Collects the excluded keys of all given segments into a single set, looking the
     * keys of each segment up through a function.
     *
     * @param keysToExclude lookup of the excluded keys of a segment, may be {@code null};
     *                      a {@code null} result for a segment contributes nothing
     * @param segmentsToUse segments whose excluded keys are wanted
     * @return the union of the excluded keys, or an empty set when {@code keysToExclude} is {@code null}
     */
    private Set<K> determineExcludedKeys(IntFunction<Set<K>> keysToExclude, IntSet segmentsToUse) {
        if (keysToExclude == null) {
            return Collections.emptySet();
        }
        return segmentsToUse.intStream()
                .mapToObj(keysToExclude)
                .filter(Objects::nonNull)
                .flatMap(Set::stream)
                .collect(Collectors.toSet());
    }

    /**
     * Groups the requested segments by their primary owner, leaving out the segments
     * owned by the local node.
     * <p>
     * A segment without a primary owner is reported to the callback as lost and a
     * future topology is requested for it.
     *
     * @param ch       consistent hash used to locate the primary owners
     * @param segments segments to look up; {@code null} means every segment of {@code ch}
     * @param callback receives the lost segment notifications
     * @return remote owner to the segments it is the primary owner of
     */
    private Map<Address, Set<Integer>> determineTargets(ConsistentHash ch, Set<Integer> segments, ClusterStreamManager.ResultsCallback<?> callback) {
        Set<Integer> wanted = segments;
        if (wanted == null) {
            wanted = new RangeSet(ch.getNumSegments());
        }
        ConcurrentHashMap<Address, Set<Integer>> byOwner = new ConcurrentHashMap<Address, Set<Integer>>();
        for (Integer segment : wanted) {
            Address owner = ch.locatePrimaryOwnerForSegment(segment);
            if (owner == null) {
                callback.onSegmentsLost(Collections.singleton(segment));
                callback.requestFutureTopology();
            } else if (!owner.equals(this.localAddress)) {
                byOwner.computeIfAbsent(owner, ignored -> new SmallIntSet()).add(segment);
            }
        }
        return byOwner;
    }

    /**
     * Reports whether the request is no longer registered as running.
     *
     * @param id request identifier
     * @return {@code true} when no tracker is registered under {@code id}
     */
    @Override
    public boolean isComplete(Object id) {
        boolean stillRunning = currentlyRunning.containsKey(id);
        return !stillRunning;
    }

    /**
     * Blocks until the request is no longer registered, a failure has been recorded on
     * its tracker, or the given time has elapsed.
     *
     * @param id   request identifier, must not be {@code null}
     * @param time maximum time to wait, must be greater than 0
     * @param unit unit of {@code time}
     * @return {@code true} if the request was found to be no longer registered,
     *         {@code false} if the time elapsed first
     * @throws InterruptedException     if the thread is interrupted while waiting
     * @throws IllegalArgumentException if {@code time} is not positive
     * @throws CacheException           wrapping a recorded failure that is not a
     *                                  {@link RuntimeException}; runtime failures are rethrown as is
     */
    @Override
    public boolean awaitCompletion(Object id, long time, TimeUnit unit) throws InterruptedException {
        if (time <= 0L) {
            throw new IllegalArgumentException("Time must be greater than 0");
        }
        Objects.requireNonNull(id, "Identifier must be non null");
        log.tracef("Awaiting completion of %s", id);
        boolean completed = false;
        // Absolute deadline; the remaining time is recomputed on every pass.
        long target = System.nanoTime() + unit.toNanos(time);
        Throwable throwable = null;
        while (target - System.nanoTime() > 0L) {
            RequestTracker tracker = this.currentlyRunning.get(id);
            if (tracker == null) {
                // Tracker already removed: the request finished or was forgotten.
                completed = true;
                break;
            }
            throwable = tracker.throwable;
            if (throwable != null) break;
            tracker.completionLock.lock();
            try {
                // Re-check under the lock: the removal in receiveResponse happens while
                // holding this lock, so a completion cannot slip in before the await.
                if (!this.currentlyRunning.containsKey(id)) {
                    completed = true;
                    throwable = tracker.throwable;
                    break;
                }
                // await returns true when woken before the timeout; loop to re-evaluate the state.
                if (tracker.completionCondition.await(target - System.nanoTime(), TimeUnit.NANOSECONDS)) continue;
                // Timed out: report any failure recorded in the meantime.
                throwable = tracker.throwable;
                completed = false;
                break;
            }
            finally {
                tracker.completionLock.unlock();
            }
        }
        log.tracef("Returning back to caller due to %s being completed: %s", id, completed);
        if (throwable != null) {
            if (throwable instanceof RuntimeException) {
                throw (RuntimeException)throwable;
            }
            throw new CacheException(throwable);
        }
        return completed;
    }

    /**
     * Unregisters the request and wakes every thread waiting for its completion.
     * Does nothing when {@code id} is {@code null} or unknown.
     *
     * @param id request identifier, may be {@code null}
     */
    @Override
    public void forgetOperation(Object id) {
        if (id == null) {
            return;
        }
        RequestTracker removed = currentlyRunning.remove(id);
        if (removed == null) {
            return;
        }
        removed.completionLock.lock();
        try {
            removed.completionCondition.signalAll();
        }
        finally {
            removed.completionLock.unlock();
        }
    }

    /**
     * Feeds a response of a remote node into the tracker of the request.
     * <p>
     * Responses from nodes the tracker no longer awaits are ignored. When the response
     * completes the whole request, the tracker is unregistered and waiting threads are
     * woken.
     *
     * @param id              request identifier
     * @param origin          node the response came from
     * @param complete        whether this is the last response of {@code origin}
     * @param missingSegments segments reported as lost by {@code origin}
     * @param response        the response value, passed on to the tracker
     * @return {@code false} when no tracker is registered for {@code id} or this response
     *         completed the request, {@code true} otherwise
     */
    @Override
    public <R1> boolean receiveResponse(Object id, Address origin, boolean complete, Set<Integer> missingSegments, R1 response) {
        log.tracef("Received response from %s with a completed response %s for id %s with %s suspected segments.", new Object[]{origin, complete, id, missingSegments});
        RequestTracker tracker = currentlyRunning.get(id);
        if (tracker == null) {
            log.tracef("Ignoring response as we already received a completed response for %s from %s", id, origin);
            return false;
        }

        boolean requestFinished = false;
        synchronized (tracker) {
            if (tracker.awaitingResponse.containsKey(origin)) {
                if (!missingSegments.isEmpty()) {
                    tracker.missingSegments(missingSegments);
                }
                if (complete) {
                    requestFinished = tracker.lastResult(origin, response);
                } else {
                    tracker.intermediateResults(origin, response);
                }
            }
        }

        if (requestFinished) {
            log.tracef("Marking %s as completed!", id);
            tracker.completionLock.lock();
            try {
                currentlyRunning.remove(id);
                tracker.completionCondition.signalAll();
            }
            finally {
                tracker.completionLock.unlock();
            }
        }
        return !requestFinished;
    }

    /**
     * Creates a publisher that iterates remote entries, one target at a time, as
     * provided by the {@code targets} supplier.
     *
     * @return a new publisher; nothing is sent until it is subscribed to
     */
    @Override
    public <E> ClusterStreamManager.RemoteIteratorPublisher<E> remoteIterationPublisher(boolean parallelStream, Supplier<Map.Entry<Address, IntSet>> targets, Set<K> keysToInclude, IntFunction<Set<K>> keysToExclude, boolean includeLoader, Iterable<IntermediateOperation> intermediateOperations) {
        RemoteIteratorPublisherImpl<E> publisher = new RemoteIteratorPublisherImpl<E>(parallelStream, targets, keysToInclude, keysToExclude, includeLoader, intermediateOperations);
        return publisher;
    }

    /**
     * Book-keeping of one running terminal operation: which nodes still owe a response,
     * which segments were reported lost, and the lock/condition used to wake threads
     * waiting for the operation to finish.
     *
     * @param <R> type of the results handed to the callback
     */
    static class RequestTracker<R> {
        final ClusterStreamManager.ResultsCallback<R> callback;
        /** Nodes that have not sent their last response yet, with the segments requested from them. */
        final Map<Address, Set<Integer>> awaitingResponse;
        final Lock completionLock = new ReentrantLock();
        final Condition completionCondition = this.completionLock.newCondition();
        /** Optional predicate; when it accepts a last result, no further response is awaited. */
        final Predicate<? super R> earlyTerminatePredicate;
        /** Private accumulation of every segment reported lost so far; {@code null} until the first report. */
        Set<Integer> missingSegments;
        /** Failure recorded for this request, if any. */
        volatile Throwable throwable;

        RequestTracker(ClusterStreamManager.ResultsCallback<R> callback, Map<Address, Set<Integer>> awaitingResponse, Predicate<? super R> earlyTerminatePredicate) {
            this.callback = callback;
            this.awaitingResponse = awaitingResponse;
            this.earlyTerminatePredicate = earlyTerminatePredicate;
        }

        /** Forwards an intermediate result of {@code origin} to the callback. */
        public void intermediateResults(Address origin, R intermediateResult) {
            this.callback.onIntermediateResult(origin, intermediateResult);
        }

        /**
         * Handles the last response of a node: reports its completed segments (those
         * requested minus the ones known to be lost) to the callback and stops awaiting
         * the node, or every node when the early termination predicate accepts the result.
         *
         * @param origin node whose last response arrived
         * @param result result of that node, may be {@code null}
         * @return {@code true} when no further response is awaited
         */
        public boolean lastResult(Address origin, R result) {
            Set<Integer> completedSegments = this.awaitingResponse.get(origin);
            // The origin may no longer be awaited (e.g. after an early termination), in
            // which case there is no segment set to prune.
            if (completedSegments != null && this.missingSegments != null) {
                completedSegments.removeAll(this.missingSegments);
            }
            this.callback.onCompletion(origin, completedSegments, result);
            synchronized (this) {
                if (this.earlyTerminatePredicate != null && this.earlyTerminatePredicate.test(result)) {
                    this.awaitingResponse.clear();
                } else {
                    this.awaitingResponse.remove(origin);
                }
                return this.awaitingResponse.isEmpty();
            }
        }

        /**
         * Records the given segments as lost and notifies the callback.
         *
         * @param segments segments reported lost; never modified by this method
         */
        public void missingSegments(Set<Integer> segments) {
            synchronized (this) {
                if (this.missingSegments == null) {
                    // Copy instead of keeping the caller's set: it may be immutable, or be
                    // the very set stored in awaitingResponse, and later reports are added to it.
                    this.missingSegments = new HashSet<Integer>(segments);
                } else {
                    this.missingSegments.addAll(segments);
                }
            }
            this.callback.onSegmentsLost(segments);
        }
    }

    private class ClusterStreamSubscription<V>
    implements Subscription {
        /** Subscriber receiving the remote entries. */
        private final Subscriber<? super V> s;
        /** Publisher that created this subscription; source of the request parameters and further targets. */
        private final RemoteIteratorPublisherImpl<V> publisher;
        /** Notified with the segments that were fully iterated. */
        private final Consumer<? super Supplier<PrimitiveIterator.OfInt>> onSegmentsComplete;
        /** Notified with the segments that were lost. */
        private final Consumer<? super Supplier<PrimitiveIterator.OfInt>> onSegmentsLost;
        /** Identifier of this iteration, sent with every command. */
        private final String id;
        /** Outstanding demand: incremented by request(n), decremented by the number of entries delivered. */
        private final AtomicLong requestedAmount = new AtomicLong();
        /** Set while a remote request is in flight so that only one is outstanding at a time. */
        private final AtomicBoolean pendingRequest = new AtomicBoolean();
        // The reference object is only assigned in the constructor; its content is the node
        // (and segments) currently iterated, and is set to null on cancel / completion.
        private volatile AtomicReference<Map.Entry<Address, IntSet>> currentTarget;
        /** Whether the iterator was already created on the current target (initial request command sent). */
        private volatile boolean alreadyCreated;

        /**
         * @param s                  subscriber to deliver entries to
         * @param publisher          publisher this subscription belongs to
         * @param onSegmentsComplete consumer of completed segments
         * @param onSegmentsLost     consumer of lost segments
         * @param id                 identifier of the iteration
         * @param currentTarget      first node, with its segments, to iterate
         */
        ClusterStreamSubscription(Subscriber<? super V> s, RemoteIteratorPublisherImpl<V> publisher, Consumer<? super Supplier<PrimitiveIterator.OfInt>> onSegmentsComplete, Consumer<? super Supplier<PrimitiveIterator.OfInt>> onSegmentsLost, String id, Map.Entry<Address, IntSet> currentTarget) {
            this.s = s;
            this.publisher = publisher;
            this.onSegmentsComplete = onSegmentsComplete;
            this.onSegmentsLost = onSegmentsLost;
            this.id = id;
            this.currentTarget = new AtomicReference<Map.Entry<Address, IntSet>>(currentTarget);
        }

        /**
         * Adds {@code n} to the outstanding demand and, when no remote request is in
         * flight, sends one for the accumulated amount.
         * <p>
         * Once the subscription has no current target (it was cancelled or has
         * completed) the call is a no-op.
         *
         * @param n number of additional entries requested, must be greater than 0
         * @throws IllegalArgumentException if {@code n} is not positive
         */
        @Override
        public void request(long n) {
            // Test the referenced target: the AtomicReference itself is never null, so
            // checking the field would never detect a cancelled subscription.
            if (this.currentTarget.get() == null) {
                return;
            }
            if (n <= 0L) {
                throw new IllegalArgumentException("request amount must be greater than 0");
            }
            long batchAmount = this.requestedAmount.addAndGet(n);
            // Only the caller that flips the flag sends; the in-flight request picks up
            // demand added in the meantime when its response arrives.
            if (!this.pendingRequest.getAndSet(true)) {
                this.sendRequest(batchAmount);
            }
        }

        /**
         * Builds the command for the next batch: the initial request command (carrying
         * segments, keys and intermediate operations) the first time it is called for a
         * target, and a plain "next" command afterwards.
         *
         * @param segments    segments iterated on the current target
         * @param batchAmount number of entries to ask for
         * @return the command to send
         */
        StreamIteratorNextCommand getCommand(IntSet segments, long batchAmount) {
            CommandsFactory commandsFactory = ClusterStreamManagerImpl.this.factory;
            if (!alreadyCreated) {
                alreadyCreated = true;
                Set<K> excludedKeys = ClusterStreamManagerImpl.this.determineExcludedKeys(publisher.keysToExclude, segments);
                return commandsFactory.buildStreamIteratorRequestCommand(id, publisher.parallelStream, segments, publisher.keysToInclude, excludedKeys, publisher.includeLoader, publisher.intermediateOperations, batchAmount);
            }
            return commandsFactory.buildStreamIteratorNextCommand(id, batchAmount);
        }

        /**
         * Asks the current target for up to {@code batchAmount} entries and processes
         * the response asynchronously: entries are delivered to the subscriber, segment
         * completion / loss is reported when the target is exhausted, the next target is
         * selected, and a follow-up request is sent while demand remains.
         * Does nothing when there is no current target.
         *
         * @param batchAmount number of entries to request
         */
        private void sendRequest(long batchAmount) {
            Map.Entry<Address, IntSet> target = this.currentTarget.get();
            if (target != null) {
                IntSet segments = target.getValue();
                if (trace) {
                    log.tracef("Request: %s is requesting %d more entries from %s in segments %s", new Object[]{this.id, batchAmount, target, segments});
                }
                Address sendee = target.getKey();
                StreamIteratorNextCommand command = this.getCommand(segments, batchAmount);
                command.setTopologyId(ClusterStreamManagerImpl.this.rpc.getTopologyId());
                CompletionStage<ValidResponse> rpcStage = ClusterStreamManagerImpl.this.rpc.invokeCommand(sendee, (ReplicableCommand)command, SingleResponseCollector.validOnly(), ClusterStreamManagerImpl.this.rpcOptions);
                rpcStage.whenComplete((r, t) -> {
                    if (t != null) {
                        this.handleThrowable((Throwable)t, target);
                    } else {
                        try {
                            if (r instanceof SuccessfulResponse) {
                                long remaining;
                                IteratorResponse iteratorResponse = (IteratorResponse)r.getResponseValue();
                                if (trace) {
                                    log.tracef("Received valid response %s for id %s from node %s", iteratorResponse, this.id, target.getKey());
                                }
                                // Deliver every returned entry and count them to settle the demand below.
                                long returnedAmount = 0L;
                                Iterator iter = iteratorResponse.getIterator();
                                while (iter.hasNext()) {
                                    ++returnedAmount;
                                    this.s.onNext(iter.next());
                                }
                                log.tracef("Received %d entries for id %s from %s", returnedAmount, this.id, sendee);
                                if (iteratorResponse.isComplete()) {
                                    // The target is exhausted: report which of its segments completed and which were lost.
                                    Set<Integer> lostSegments = iteratorResponse.getSuspectedSegments();
                                    if (lostSegments.isEmpty()) {
                                        this.onSegmentsComplete.accept(() -> ((IntSet)segments).iterator());
                                    } else {
                                        this.onSegmentsLost.accept(() -> lostSegments.stream().mapToInt(Integer::intValue).iterator());
                                        if (lostSegments.size() != segments.size()) {
                                            this.onSegmentsComplete.accept(() -> segments.intStream().filter(s -> !lostSegments.contains(s)).iterator());
                                        }
                                    }
                                    Map.Entry nextTarget = (Map.Entry)((RemoteIteratorPublisherImpl)this.publisher).targets.get();
                                    if (nextTarget != null) {
                                        // New target: its first command must be the initial request again.
                                        this.alreadyCreated = false;
                                        // CAS so that a concurrent cancel (which nulls the target) is not overwritten.
                                        this.currentTarget.compareAndSet(target, nextTarget);
                                    } else {
                                        // No target left: the whole iteration is done.
                                        this.currentTarget.set(null);
                                        this.completed();
                                        return;
                                    }
                                }
                                if ((remaining = this.requestedAmount.addAndGet(-returnedAmount)) > 0L) {
                                    // Demand still outstanding: keep the pending flag and ask again.
                                    this.sendRequest(remaining);
                                } else {
                                    // Release the flag, then re-check: request(n) may have added demand
                                    // between the decrement above and the release.
                                    this.pendingRequest.set(false);
                                    remaining = this.requestedAmount.get();
                                    if (remaining > 0L && !this.pendingRequest.getAndSet(true)) {
                                        this.sendRequest(remaining);
                                    }
                                }
                            } else {
                                this.handleThrowable(new IllegalArgumentException("Unsupported response received: " + r), target);
                            }
                        }
                        catch (Throwable throwable) {
                            // Any failure while processing the response ends the subscription with an error.
                            this.cancel();
                            this.s.onError(throwable);
                        }
                    }
                });
            }
        }

        /**
         * Ends the subscription after a failed request. A {@link SuspectException}
         * (directly or as the immediate cause) reports the segments of the target as
         * lost and completes the subscriber normally; any other failure is signalled
         * through {@code onError}.
         *
         * @param t      the failure
         * @param target the node and segments the failed request was sent for
         */
        private void handleThrowable(Throwable t, Map.Entry<Address, IntSet> target) {
            cancel();
            boolean suspected = t instanceof SuspectException || t.getCause() instanceof SuspectException;
            if (!suspected) {
                if (trace) {
                    log.tracef(t, "Received exception for id %s from node %s when requesting segments %s", this.id, target.getKey(), target.getValue());
                }
                s.onError(t);
                return;
            }
            if (trace) {
                log.tracef("Received suspect exception for id %s from node %s when requesting segments %s", this.id, target.getKey(), target.getValue());
            }
            Supplier<PrimitiveIterator.OfInt> lostSegments = () -> target.getValue().iterator();
            onSegmentsLost.accept(lostSegments);
            s.onComplete();
        }

        /**
         * Cancels the subscription: clears the current target, asks the target node to
         * close its iterator if one was created there, and unregisters the subscriber.
         * Calling it again after the target was cleared only repeats the unregistration.
         */
        @Override
        public void cancel() {
            Map.Entry<Address, IntSet> target = this.currentTarget.getAndSet(null);
            if (target != null && this.alreadyCreated) {
                Address targetNode = target.getKey();
                CompletionStage<ValidResponse> rpcStage = ClusterStreamManagerImpl.this.rpc.invokeCommand(targetNode, (ReplicableCommand)ClusterStreamManagerImpl.this.factory.buildStreamIteratorCloseCommand(this.id), SingleResponseCollector.validOnly(), ClusterStreamManagerImpl.this.rpcOptions);
                if (trace) {
                    // The close is best effort; a failure is only traced. Log this
                    // subscription's id, not the manager-wide request counter.
                    rpcStage.exceptionally(t -> {
                        log.tracef(t, "Unable to close iterator on %s for requestId %s", targetNode, this.id);
                        return null;
                    });
                }
            }
            ClusterStreamManagerImpl.this.iteratorsRunning.remove(this.s);
        }

        /**
         * Finishes the subscription normally: releases it through {@link #cancel()} and
         * then signals {@code onComplete} to the subscriber.
         */
        private void completed() {
            if (trace) {
                String requestIdentifier = id;
                log.tracef("Processor completed for request: %s", requestIdentifier);
            }
            cancel();
            s.onComplete();
        }
    }

    /**
     * Publisher of remote entries. It only holds the parameters of the iteration; each
     * subscription pulls its targets from the shared {@code targets} supplier.
     *
     * @param <V> type of the published entries
     */
    private class RemoteIteratorPublisherImpl<V>
    implements ClusterStreamManager.RemoteIteratorPublisher<V> {
        private final boolean parallelStream;
        /** Supplies the next node (with its segments) to iterate, or {@code null} when none is left. */
        private final Supplier<Map.Entry<Address, IntSet>> targets;
        private final Set<K> keysToInclude;
        private final IntFunction<Set<K>> keysToExclude;
        private final boolean includeLoader;
        private final Iterable<IntermediateOperation> intermediateOperations;

        RemoteIteratorPublisherImpl(boolean parallelStream, Supplier<Map.Entry<Address, IntSet>> targets, Set<K> keysToInclude, IntFunction<Set<K>> keysToExclude, boolean includeLoader, Iterable<IntermediateOperation> intermediateOperations) {
            this.intermediateOperations = intermediateOperations;
            this.includeLoader = includeLoader;
            this.keysToExclude = keysToExclude;
            this.keysToInclude = keysToInclude;
            this.targets = targets;
            this.parallelStream = parallelStream;
        }

        /**
         * Subscribes {@code s}. When the supplier has no target the subscriber is
         * completed immediately; otherwise it is registered as a running iterator and
         * handed a new subscription bound to the first target.
         *
         * @param s                  subscriber of the entries
         * @param onSegmentsComplete consumer of completed segments
         * @param onLostSegments     consumer of lost segments
         */
        @Override
        public void subscribe(Subscriber<? super V> s, Consumer<? super Supplier<PrimitiveIterator.OfInt>> onSegmentsComplete, Consumer<? super Supplier<PrimitiveIterator.OfInt>> onLostSegments) {
            Map.Entry<Address, IntSet> firstTarget = targets.get();
            if (firstTarget == null) {
                EmptySubscription.complete(s);
                return;
            }
            String id = ClusterStreamManagerImpl.this.localAddress.toString() + "-" + ClusterStreamManagerImpl.this.requestId.getAndIncrement();
            if (trace) {
                log.tracef("Starting request: %s", id);
            }
            ClusterStreamManagerImpl.this.iteratorsRunning.add(s);
            ClusterStreamSubscription<V> subscription = new ClusterStreamSubscription<V>(s, this, onSegmentsComplete, onLostSegments, id, firstTarget);
            s.onSubscribe(subscription);
        }
    }
}

