/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.reactive.publisher.impl;

import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.fuseable.SimplePlainQueue;
import io.reactivex.internal.queue.SpscArrayQueue;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.subscriptions.EmptySubscription;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commons.util.IntSet;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.commands.batch.KeyPublisherResponse;
import org.infinispan.reactive.publisher.impl.commands.batch.PublisherResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * A single-subscriber {@link Publisher} that is also its own {@link Subscription}.
 * <p>
 * Values are obtained in batches by asking the parent handler to send commands to one
 * target (an address plus its segments) at a time; when a target reports completion the
 * next target is taken from the supplier, and a {@code null} target completes the subscriber.
 * <p>
 * State kept by this class:
 * <ul>
 *    <li>the inherited {@link AtomicLong} value is the outstanding demand, increased in
 *    {@link #request(long)} and decreased via {@code BackpressureHelper.produced};</li>
 *    <li>{@code requestors} is a work-in-progress counter: only the caller that moves it from
 *    zero starts {@code sendRequest}; {@code continueWithRemaining} decrements it again;</li>
 *    <li>{@code queue} holds values received in a response beyond the demand that was
 *    outstanding when the response was processed.</li>
 * </ul>
 *
 * @param <K> key type of the excluded-keys map
 * @param <I> first type parameter of the parent {@code SubscriberHandler}
 * @param <R> type of the values delivered to the subscriber
 */
class InnerPublisherSubscription<K, I, R> extends AtomicLong implements Publisher<R>, Subscription {
    protected static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    protected static final boolean trace = log.isTraceEnabled();

    private final ClusterPublisherManagerImpl.SubscriberHandler<I, R> parent;
    private final SimplePlainQueue<R> queue;
    private final Supplier<Map.Entry<Address, IntSet>> supplier;
    private final int batchSize;
    private final Map<Address, Set<K>> excludedKeys;
    private final int topologyId;
    // Set exactly once by subscribe(); every later subscriber is rejected.
    private final AtomicReference<Subscriber<? super R>> subscriber = new AtomicReference<>();
    // Work-in-progress counter, see class documentation.
    private final AtomicInteger requestors = new AtomicInteger();
    // Target currently being read from; null means "ask the supplier for the next one".
    private volatile Map.Entry<Address, IntSet> currentTarget;
    private volatile boolean cancelled;
    // True once the initial command has been issued for the current target.
    private volatile boolean alreadyCreated;

    /**
     * Creates a subscription whose first target is taken from {@code supplier}.
     *
     * @param parent       handler used to send commands and to receive segment notifications
     * @param batchSize    batch size passed with the initial command; also the queue capacity hint
     * @param supplier     source of targets; a {@code null} result completes the subscriber
     * @param excludedKeys per-address keys; the entry for an address is removed and passed along
     *                     with the initial command sent to that address
     * @param topologyId   topology id passed with every command
     */
    InnerPublisherSubscription(ClusterPublisherManagerImpl.SubscriberHandler<I, R> parent, int batchSize, Supplier<Map.Entry<Address, IntSet>> supplier, Map<Address, Set<K>> excludedKeys, int topologyId) {
        this.parent = parent;
        this.queue = queueToUse(parent.publisher.shouldTrackKeys, batchSize);
        this.supplier = supplier;
        this.batchSize = batchSize;
        this.excludedKeys = excludedKeys;
        this.topologyId = topologyId;
    }

    /**
     * Creates a subscription that starts with {@code specificTarget} instead of consulting the
     * supplier first.
     *
     * @param specificTarget the first target to use
     * @throws NullPointerException if {@code specificTarget} is {@code null}
     */
    InnerPublisherSubscription(ClusterPublisherManagerImpl.SubscriberHandler<I, R> parent, int batchSize, Supplier<Map.Entry<Address, IntSet>> supplier, Map<Address, Set<K>> excludedKeys, int topologyId, Map.Entry<Address, IntSet> specificTarget) {
        this(parent, batchSize, supplier, excludedKeys, topologyId);
        this.currentTarget = Objects.requireNonNull(specificTarget);
    }

    /**
     * Picks the queue implementation: a linked (growable) queue when {@code infiniteSize} is set,
     * otherwise a fixed array queue; both are created with {@code batchSize}.
     */
    private static <R> SimplePlainQueue<R> queueToUse(boolean infiniteSize, int batchSize) {
        return infiniteSize ? new SpscLinkedArrayQueue<>(batchSize) : new SpscArrayQueue<>(batchSize);
    }

    /**
     * Registers the one and only subscriber and hands it this object as its subscription.
     * Any further subscriber is signalled an {@link IllegalStateException}.
     */
    @Override
    public void subscribe(Subscriber<? super R> s) {
        if (subscriber.compareAndSet(null, s)) {
            if (trace) {
                log.tracef("Subscribed to %s via %s", parent.requestId, s);
            }
            s.onSubscribe(this);
        } else {
            EmptySubscription.error(new IllegalStateException("This processor allows only a single Subscriber"), s);
        }
    }

    /**
     * Adds {@code n} to the outstanding demand. The caller that finds {@code requestors} at
     * zero starts the request loop; other callers only register their demand.
     */
    @Override
    public void request(long n) {
        if (SubscriptionHelper.validate(n)) {
            BackpressureHelper.add(this, n);
            if (requestors.getAndIncrement() == 0) {
                sendRequest(get());
            }
        }
    }

    /**
     * Marks the subscription cancelled and, when a command was already issued for the current
     * target, asks the parent to send a cancel command to that target's address.
     */
    @Override
    public void cancel() {
        cancelled = true;
        if (alreadyCreated) {
            // Read the volatile once so the null check and the use see the same value.
            Map.Entry<Address, IntSet> target = currentTarget;
            if (target != null) {
                parent.sendCancelCommand(target.getKey());
            }
        }
    }

    /**
     * Drains queued values against the demand and, if demand is still outstanding, issues the
     * next command (initial or follow-up) to the current target.
     *
     * @param remaining outstanding demand as seen by the caller; expected to be positive
     */
    @GuardedBy("requestors")
    private void sendRequest(long remaining) {
        assert remaining > 0L;
        // Serve leftovers of a previous response before going remote again.
        while (!queue.isEmpty()) {
            Subscriber<? super R> localSubscriber = subscriber.get();
            long produced = 0L;
            R queuedValue;
            while (produced < remaining && (queuedValue = queue.poll()) != null) {
                localSubscriber.onNext(queuedValue);
                ++produced;
            }
            remaining = BackpressureHelper.produced(this, produced);
            if (remaining == 0L) {
                remaining = continueWithRemaining(0L);
                if (remaining == 0L) {
                    // No demand left and the requestors counter reached zero.
                    return;
                }
            }
        }
        assert remaining > 0L;
        if (checkCancelled()) {
            return;
        }
        Map.Entry<Address, IntSet> target = currentTarget;
        if (target == null) {
            // A new target always starts with an initial command.
            alreadyCreated = false;
            target = supplier.get();
            if (target == null) {
                if (trace) {
                    log.tracef("Completing subscription %s", this);
                }
                subscriber.get().onComplete();
                return;
            }
            currentTarget = target;
        }
        Address address = target.getKey();
        IntSet segments = target.getValue();
        CompletionStage<PublisherResponse> stage;
        if (alreadyCreated) {
            stage = parent.sendNextCommand(address, topologyId);
        } else {
            alreadyCreated = true;
            stage = parent.sendInitialCommand(address, segments, batchSize, excludedKeys.remove(address), topologyId);
        }
        stage.whenComplete((values, t) -> {
            if (t != null) {
                handleThrowableInResponse(t, address, segments);
                return;
            }
            try {
                handleResponse(values, address, segments);
            }
            catch (Throwable innerT) {
                handleThrowableInResponse(innerT, address, segments);
            }
        });
    }

    /**
     * Processes one successful response: updates segment bookkeeping, emits values up to the
     * demand read at the start, queues the rest, and then tries to continue requesting.
     *
     * @param values   the response received from {@code address}
     * @param address  the address the command was sent to
     * @param segments the segments of the current target; completed and lost segments are
     *                 removed from this set
     */
    @GuardedBy("requestors")
    private void handleResponse(PublisherResponse values, Address address, IntSet segments) {
        if (trace) {
            log.tracef("Received %s for id %s from %s", values, parent.requestId, address);
        }
        IntSet completedSegments = values.getCompletedSegments();
        if (completedSegments != null) {
            if (trace) {
                log.tracef("Completed segments %s for id %s from %s", completedSegments, parent.requestId, address);
            }
            // Explicitly typed int lambdas select the primitive forEach overload unambiguously.
            completedSegments.forEach((int segment) -> parent.completeSegment(segment));
            completedSegments.forEach((int segment) -> segments.remove(segment));
        }
        IntSet lostSegments = values.getLostSegments();
        if (lostSegments != null) {
            if (trace) {
                log.tracef("Lost segments %s for id %s from %s", lostSegments, parent.requestId, address);
            }
            lostSegments.forEach((int segment) -> segments.remove(segment));
        }
        if (values.isComplete()) {
            // Forces the next sendRequest to obtain a new target from the supplier.
            currentTarget = null;
        } else {
            // Uses the first remaining segment of the target for the per-segment value callback.
            int segment = segments.iterator().nextInt();
            values.forEachSegmentValue(parent, segment);
        }
        long requested = get();
        assert requested > 0L;
        long produced = 0L;
        R lastValue = null;
        Subscriber<? super R> localSubscriber = subscriber.get();
        Object[] valueArray = values.getResults();
        if (values instanceof KeyPublisherResponse) {
            KeyPublisherResponse kpr = (KeyPublisherResponse) values;
            int extraSize = kpr.getExtraSize();
            if (extraSize > 0) {
                // Append the extra objects so they are emitted/queued after the regular results.
                int arrayLength = valueArray.length;
                Object[] newArray = new Object[arrayLength + extraSize];
                System.arraycopy(valueArray, 0, newArray, 0, arrayLength);
                System.arraycopy(kpr.getExtraObjects(), 0, newArray, arrayLength, extraSize);
                valueArray = newArray;
            }
        }
        for (Object value : valueArray) {
            if (value == null) {
                // A null entry is treated as the end of the results.
                break;
            }
            if (checkCancelled()) {
                return;
            }
            // The response carries untyped objects; the values are handed on as R.
            @SuppressWarnings("unchecked")
            R typedValue = (R) value;
            if (produced >= requested) {
                // Demand exhausted: park the value for a later request.
                if (!queue.offer(typedValue)) {
                    throw new MissingBackpressureException("Inner queue full?!");
                }
            } else {
                localSubscriber.onNext(typedValue);
                ++produced;
            }
            lastValue = typedValue;
        }
        if (completedSegments != null) {
            parent.notifySegmentsComplete(completedSegments, lastValue);
        }
        trySendRequest(produced);
    }

    /**
     * @return {@code true} when {@link #cancel()} has been called
     */
    private boolean checkCancelled() {
        if (cancelled) {
            if (trace) {
                log.tracef("Subscription %s was cancelled, terminating early", this);
            }
            return true;
        }
        return false;
    }

    /**
     * Delegates a failure to the parent; when the parent returns {@code true} the current target
     * is dropped and requesting continues with the next target.
     */
    @GuardedBy("requestors")
    private void handleThrowableInResponse(Throwable t, Address address, IntSet segments) {
        if (parent.handleThrowable(t, address, segments)) {
            currentTarget = null;
            trySendRequest(0L);
        }
    }

    /**
     * Accounts for {@code produced} values and issues another request when demand remains.
     */
    @GuardedBy("requestors")
    private void trySendRequest(long produced) {
        long innerRemaining = continueWithRemaining(produced);
        if (innerRemaining > 0L) {
            sendRequest(innerRemaining);
        }
    }

    /**
     * Subtracts {@code produced} from the outstanding demand and decides whether the current
     * caller keeps running the request loop.
     *
     * @return the outstanding demand when the caller should continue, or {@code 0} when the
     *         {@code requestors} counter was decremented to zero and the caller must stop
     */
    @GuardedBy("requestors")
    private long continueWithRemaining(long produced) {
        long remaining = BackpressureHelper.produced(this, produced);
        if (remaining > 0L) {
            return remaining;
        }
        // Each decrement consumes one registered request() call; as long as others are still
        // registered, re-read the demand they may have added.
        while (requestors.decrementAndGet() != 0) {
            remaining = get();
            if (remaining != 0L) {
                return remaining;
            }
        }
        return 0L;
    }

    @Override
    public String toString() {
        return "InnerPublisher-" + System.identityHashCode(this) + "{requestId=" + parent.requestId + ", topologyId=" + topologyId + '}';
    }
}

