/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.exec.ExecSpec;
import ratpack.exec.Execution;
import ratpack.func.Action;
import ratpack.stream.internal.BufferedWriteStream;

public class ForkingSubscription<T>
implements Subscription {
    private final Action<? super ExecSpec> execConfig;
    private final Publisher<T> publisher;
    private final BufferedWriteStream<T> write;
    private volatile boolean started;
    private volatile Subscription upstream;
    private volatile boolean cancelled;

    public ForkingSubscription(Action<? super ExecSpec> execConfig, Publisher<T> publisher, BufferedWriteStream<T> write) {
        this.execConfig = execConfig;
        this.publisher = publisher;
        this.write = write;
    }

    public void request(long n) {
        if (!this.started) {
            this.started = true;
            try {
                this.execConfig.with(Execution.fork()).start(e -> this.publisher.subscribe(new Subscriber<T>(){

                    public void onSubscribe(Subscription s) {
                        ForkingSubscription.this.upstream = s;
                        if (ForkingSubscription.this.cancelled) {
                            ForkingSubscription.this.upstream.cancel();
                        } else {
                            s.request(Long.MAX_VALUE);
                        }
                    }

                    public void onNext(T t) {
                        ForkingSubscription.this.write.item(t);
                    }

                    public void onError(Throwable t) {
                        ForkingSubscription.this.write.error(t);
                    }

                    public void onComplete() {
                        ForkingSubscription.this.write.complete();
                    }
                }));
            }
            catch (Exception e2) {
                this.write.error(e2);
            }
        }
    }

    public void cancel() {
        this.cancelled = true;
        if (this.upstream != null) {
            this.upstream.cancel();
        }
    }
}

