/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.func.Action;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.internal.ManagedSubscription;

public class FlattenPublisher<T>
implements TransformablePublisher<T> {
    private final Publisher<? extends Publisher<T>> publisher;
    private final Action<? super T> disposer;

    public FlattenPublisher(Publisher<? extends Publisher<T>> publisher, Action<? super T> disposer) {
        this.publisher = publisher;
        this.disposer = disposer;
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        subscriber.onSubscribe((Subscription)new ManagedSubscription<T>(subscriber, this.disposer){
            private Subscription outerSubscription;
            private Subscription innerSubscription;
            private final AtomicReference<State> state;
            volatile boolean pendingComplete;
            {
                this.state = new AtomicReference<State>(State.INIT);
            }

            @Override
            protected void onRequest(long n) {
                if (this.state.compareAndSet(State.INIT, State.SUBSCRIBE)) {
                    if (this.outerSubscription == null) {
                        this.subscribeUpstream();
                    }
                } else if (this.innerSubscription != null) {
                    this.innerSubscription.request(n);
                } else {
                    this.nextPublisher();
                }
            }

            private void subscribeUpstream() {
                FlattenPublisher.this.publisher.subscribe(new Subscriber<Publisher<T>>(){

                    public void onSubscribe(Subscription subscription) {
                        outerSubscription = subscription;
                        outerSubscription.request(1L);
                    }

                    public void onNext(Publisher<T> next) {
                        next.subscribe(new Subscriber<T>(){

                            public void onSubscribe(Subscription s) {
                                innerSubscription = s;
                                state.set(State.EMITTING);
                                innerSubscription.request(this.getDemand());
                            }

                            public void onNext(T t) {
                                this.emitNext(t);
                            }

                            public void onError(Throwable t) {
                                outerSubscription.cancel();
                                this.emitError(t);
                            }

                            public void onComplete() {
                                innerSubscription = null;
                                state.set(State.IDLE);
                                this.nextPublisher();
                            }
                        });
                    }

                    public void onError(Throwable t) {
                        if (innerSubscription != null) {
                            innerSubscription.cancel();
                            innerSubscription = null;
                        }
                        this.emitError(t);
                    }

                    public void onComplete() {
                        pendingComplete = true;
                        this.nextPublisher();
                    }
                });
            }

            @Override
            protected void onCancel() {
                if (this.innerSubscription != null) {
                    this.innerSubscription.cancel();
                    this.innerSubscription = null;
                }
                if (this.outerSubscription != null) {
                    this.outerSubscription.cancel();
                    this.outerSubscription = null;
                }
            }

            private void nextPublisher() {
                if (this.state.compareAndSet(State.IDLE, State.PENDING)) {
                    if (this.pendingComplete) {
                        this.emitComplete();
                    } else if (this.hasDemand()) {
                        this.outerSubscription.request(1L);
                    } else {
                        this.state.set(State.IDLE);
                        if (this.hasDemand()) {
                            this.nextPublisher();
                        }
                    }
                } else if (this.state.get() == State.PENDING && this.pendingComplete) {
                    this.emitComplete();
                }
            }
        });
    }

    static enum State {
        INIT,
        SUBSCRIBE,
        IDLE,
        PENDING,
        EMITTING;

    }
}

