/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.stream.TransformablePublisher;

/**
 * Publisher that forwards at most {@code count} items from an upstream publisher,
 * then cancels the upstream subscription and completes the downstream subscriber.
 *
 * <p>The limit is tracked per subscription: every call to {@link #subscribe(Subscriber)}
 * starts its own countdown from {@code count}, so the publisher can be subscribed to
 * more than once.
 *
 * @param <T> the type of item emitted
 */
public final class TakePublisher<T>
implements TransformablePublisher<T> {
    /** Maximum number of items to forward to each subscriber. */
    private final long count;
    private final Publisher<T> upstreamPublisher;

    /**
     * @param count the maximum number of items to forward per subscription; values
     *        less than or equal to zero produce a stream that completes without items
     * @param upstreamPublisher the publisher to take items from
     */
    public TakePublisher(long count, Publisher<T> upstreamPublisher) {
        this.count = count;
        this.upstreamPublisher = upstreamPublisher;
    }

    /**
     * Subscribes to the upstream publisher on behalf of {@code downstreamSubscriber},
     * relaying items until the limit is reached.
     *
     * @param downstreamSubscriber the subscriber that receives the limited stream
     */
    public void subscribe(final Subscriber<? super T> downstreamSubscriber) {
        this.upstreamPublisher.subscribe(new Subscriber<T>(){
            // Per-subscription countdown; keeping it here (not on the publisher) lets
            // each subscriber receive its own `count` items.
            private final AtomicLong remaining = new AtomicLong(TakePublisher.this.count);
            private Subscription subscription;
            // Set once a terminal signal has been (or is about to be) sent downstream,
            // so that late or re-entrant upstream signals are not forwarded twice.
            private volatile boolean done;

            public void onSubscribe(Subscription upstreamPublisherSubscription) {
                this.subscription = upstreamPublisherSubscription;
                if (this.remaining.get() <= 0L) {
                    // Nothing to take: stop upstream before handing the subscription on
                    // and complete right away instead of silently dropping every item.
                    // NOTE(review): relies on the upstream subscription ignoring
                    // request/cancel calls made after cancel().
                    this.done = true;
                    upstreamPublisherSubscription.cancel();
                    downstreamSubscriber.onSubscribe(upstreamPublisherSubscription);
                    downstreamSubscriber.onComplete();
                    return;
                }
                downstreamSubscriber.onSubscribe(upstreamPublisherSubscription);
            }

            public void onNext(T t) {
                if (this.done) {
                    return;
                }
                long left = this.remaining.decrementAndGet();
                if (left < 0L) {
                    // Item arrived after the limit was reached; drop it.
                    return;
                }
                if (left > 0L) {
                    downstreamSubscriber.onNext(t);
                    return;
                }
                // Final item: mark the stream finished before delivering it so that a
                // terminal signal triggered re-entrantly from within the downstream
                // onNext cannot result in a second onComplete/onError.
                this.done = true;
                downstreamSubscriber.onNext(t);
                this.subscription.cancel();
                downstreamSubscriber.onComplete();
            }

            public void onError(Throwable t) {
                if (this.done) {
                    return;
                }
                this.done = true;
                downstreamSubscriber.onError(t);
            }

            public void onComplete() {
                if (this.done) {
                    return;
                }
                this.done = true;
                downstreamSubscriber.onComplete();
            }
        });
    }
}

