/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.MultiError;
import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.SingleError;
import io.helidon.common.reactive.SubscriptionArbiter;
import io.helidon.common.reactive.SubscriptionHelper;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

final class MultiRetry<T, U>
implements Multi<T> {
    private final Multi<T> source;
    private final BiFunction<? super Throwable, ? super Long, ? extends Flow.Publisher<U>> whenFunction;
    private static final Multi<Object> NOW = Multi.singleton(1);

    MultiRetry(Multi<T> source, BiFunction<? super Throwable, ? super Long, ? extends Flow.Publisher<U>> whenFunction) {
        this.source = source;
        this.whenFunction = whenFunction;
    }

    MultiRetry(Multi<T> source, long count) {
        this(source, MultiRetry.withCount(count));
    }

    MultiRetry(Multi<T> source, BiPredicate<? super Throwable, ? super Long> predicate) {
        this(source, MultiRetry.withPredicate(predicate));
    }

    static <U> BiFunction<? super Throwable, ? super Long, ? extends Flow.Publisher<U>> withCount(long count) {
        return (e, n) -> n < count ? NOW : Single.error(e);
    }

    static <U> BiFunction<? super Throwable, ? super Long, ? extends Flow.Publisher<U>> withPredicate(BiPredicate<? super Throwable, ? super Long> predicate) {
        return (e, n) -> predicate.test((Throwable)e, (Long)n) ? NOW : Single.error(e);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        RetrySubscriber parent = new RetrySubscriber(subscriber, this.whenFunction, this.source);
        subscriber.onSubscribe(parent);
        parent.retry();
    }

    static final class RetrySubscriber<T, U>
    extends SubscriptionArbiter
    implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> downstream;
        private final AtomicInteger wip;
        private final BiFunction<? super Throwable, ? super Long, ? extends Flow.Publisher<U>> whenFunction;
        private final AtomicReference<Flow.Subscription> whenSubscription;
        private final Multi<T> source;
        private long count;
        private long produced;
        private boolean hasSubscriber;

        RetrySubscriber(Flow.Subscriber<? super T> downstream, BiFunction<? super Throwable, ? super Long, ? extends Flow.Publisher<U>> whenFunction, Multi<T> source) {
            this.downstream = downstream;
            this.whenFunction = whenFunction;
            this.source = source;
            this.whenSubscription = new AtomicReference();
            this.wip = new AtomicInteger();
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (!this.hasSubscriber) {
                this.hasSubscriber = true;
                this.setSubscription(subscription);
            } else {
                subscription.cancel();
            }
        }

        @Override
        public void onNext(T item) {
            ++this.produced;
            this.downstream.onNext(item);
        }

        @Override
        public void onError(Throwable throwable) {
            Flow.Publisher<U> when;
            long p = this.produced;
            if (p != 0L) {
                this.produced = 0L;
                this.produced(p);
            }
            this.hasSubscriber = false;
            try {
                when = Objects.requireNonNull(this.whenFunction.apply(throwable, this.count++), "The whenFunction returned a null Flow.Publisher");
            }
            catch (Throwable ex) {
                if (ex != throwable) {
                    ex.addSuppressed(throwable);
                }
                this.error(ex);
                return;
            }
            if (when == Single.empty() || when == Multi.empty()) {
                this.complete();
            } else if (when == NOW) {
                this.retry();
            } else if (when instanceof SingleError) {
                this.error(((SingleError)when).getError());
            } else if (when instanceof MultiError) {
                this.error(((MultiError)when).getError());
            } else {
                when.subscribe(new WhenSubscriber(this));
            }
        }

        @Override
        public void onComplete() {
            this.downstream.onComplete();
        }

        @Override
        public void cancel() {
            super.cancel();
            SubscriptionHelper.cancel(this.whenSubscription);
        }

        @Override
        public void request(long n) {
            if (n <= 0L) {
                this.downstream.onError(new IllegalArgumentException("Rule \u00a73.9 violated: non-positive requests are forbidden"));
            } else {
                super.request(n);
            }
        }

        void setWhenSubscription(Flow.Subscription s) {
            SubscriptionHelper.replace(this.whenSubscription, s);
        }

        void retry() {
            if (this.wip.getAndIncrement() != 0) {
                return;
            }
            while (!this.isCanceled()) {
                this.source.subscribe(this);
                if (this.wip.decrementAndGet() != 0) continue;
            }
        }

        void complete() {
            this.downstream.onComplete();
        }

        void error(Throwable ex) {
            this.downstream.onError(ex);
        }

        static final class WhenSubscriber
        implements Flow.Subscriber<Object> {
            private final RetrySubscriber<?, ?> parent;
            private Flow.Subscription upstream;

            WhenSubscriber(RetrySubscriber<?, ?> parent) {
                this.parent = parent;
            }

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                SubscriptionHelper.validate(this.upstream, subscription);
                this.upstream = subscription;
                this.parent.setWhenSubscription(subscription);
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Object item) {
                this.upstream.cancel();
                this.upstream = SubscriptionHelper.CANCELED;
                this.parent.setWhenSubscription(null);
                this.parent.retry();
            }

            @Override
            public void onError(Throwable throwable) {
                Flow.Subscription s = this.upstream;
                if (s != SubscriptionHelper.CANCELED) {
                    this.upstream = SubscriptionHelper.CANCELED;
                    this.parent.error(throwable);
                }
            }

            @Override
            public void onComplete() {
                Flow.Subscription s = this.upstream;
                if (s != SubscriptionHelper.CANCELED) {
                    this.upstream = SubscriptionHelper.CANCELED;
                    this.parent.complete();
                }
            }
        }
    }
}

