/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.ext.reactivestreams.impl;

import io.vertx.core.Handler;
import io.vertx.ext.reactivestreams.ReactiveReadStream;
import java.util.ArrayDeque;
import java.util.Queue;
import org.reactivestreams.Subscription;

/**
 * A {@link ReactiveReadStream} that receives items from a Reactive Streams
 * {@link Subscription} and hands them to a data handler.
 *
 * <p>Items are requested from the subscription in batches of {@code batchSize}.
 * Items that arrive while there is no demand (the stream is paused) are kept in
 * an internal queue and delivered, in arrival order, once demand is added again
 * through {@link #fetch(long)} or {@link #resume()}.
 *
 * <p>All state-changing methods synchronize on this instance.
 *
 * @param <T> the type of the items flowing through the stream
 */
public class ReactiveReadStreamImpl<T>
implements ReactiveReadStream<T> {
    /** Number of items requested from the subscription each time a new batch is needed. */
    private final long batchSize;
    /** Receives every delivered item; may be {@code null}. */
    private Handler<T> dataHandler;
    /** Invoked from {@link #onComplete()}; may be {@code null}. */
    private Handler<Void> endHandler;
    /** Invoked from {@link #onError(Throwable)}; may be {@code null}. */
    private Handler<Throwable> exceptionHandler;
    /** The first subscription handed to {@link #onSubscribe(Subscription)}; {@code null} until then. */
    private Subscription subscription;
    /** Items received while {@link #demand} was zero, in arrival order. */
    private final Queue<T> pending = new ArrayDeque<T>();
    /**
     * Number of items the consumer currently accepts. {@code Long.MAX_VALUE} is
     * treated as "unbounded" and is never decremented; {@code 0} means paused.
     */
    private long demand = Long.MAX_VALUE;
    /**
     * Set to {@link #batchSize} whenever a batch is requested and decremented each
     * time an item is passed to the data handler. A new batch is only requested
     * when this reaches zero.
     */
    private long tokens;

    /**
     * Creates a stream that requests {@code batchSize} items at a time.
     *
     * @param batchSize the amount passed to {@link Subscription#request(long)} for each batch
     */
    public ReactiveReadStreamImpl(long batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * Sets the data handler and, if a handler is now present and there is demand,
     * requests a batch from the subscription when none is outstanding.
     *
     * @param handler the handler receiving items, or {@code null} to remove it
     * @return this stream
     */
    @Override
    public synchronized ReactiveReadStream<T> handler(Handler<T> handler) {
        this.dataHandler = handler;
        if (this.dataHandler != null && this.demand > 0L) {
            this.checkRequestTokens();
        }
        return this;
    }

    /**
     * Pauses delivery by resetting the demand to zero; items received afterwards
     * are queued.
     *
     * @return this stream
     */
    @Override
    public synchronized ReactiveReadStream<T> pause() {
        this.demand = 0L;
        return this;
    }

    /**
     * Adds {@code amount} to the current demand, delivers queued items while demand
     * remains, and then requests a new batch if needed. A non-positive
     * {@code amount} is a no-op.
     *
     * <p>This method is synchronized because it updates {@code demand} and drains
     * {@code pending}, the same state {@link #onNext(Object)} and {@link #pause()}
     * modify under the instance monitor.
     *
     * @param amount the number of additional items the consumer accepts
     * @return this stream
     */
    public synchronized ReactiveReadStream<T> fetch(long amount) {
        if (amount > 0L) {
            T data;
            this.demand += amount;
            if (this.demand < 0L) {
                // The addition overflowed: saturate to "unbounded".
                this.demand = Long.MAX_VALUE;
            }
            while (this.demand > 0L && (data = this.pending.poll()) != null) {
                if (this.demand != Long.MAX_VALUE) {
                    --this.demand;
                }
                this.handleData(data);
            }
            this.checkRequestTokens();
        }
        return this;
    }

    /**
     * Switches the stream to unbounded demand; equivalent to
     * {@code fetch(Long.MAX_VALUE)}.
     *
     * @return this stream
     */
    @Override
    public synchronized ReactiveReadStream<T> resume() {
        return this.fetch(Long.MAX_VALUE);
    }

    /**
     * Sets the handler invoked when the upstream completes.
     *
     * @param endHandler the handler, or {@code null} to remove it
     * @return this stream
     */
    @Override
    public synchronized ReactiveReadStream<T> endHandler(Handler<Void> endHandler) {
        this.endHandler = endHandler;
        return this;
    }

    /**
     * Sets the handler invoked when the upstream signals an error.
     *
     * @param handler the handler, or {@code null} to remove it
     * @return this stream
     */
    @Override
    public synchronized ReactiveReadStream<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /**
     * Stores the subscription. If a subscription is already stored, the new one is
     * cancelled and the existing one is kept.
     *
     * @param subscription the subscription offered by the publisher
     * @throws NullPointerException if {@code subscription} is {@code null}
     */
    public synchronized void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("subscription");
        }
        if (this.subscription != null) {
            subscription.cancel();
        } else {
            this.subscription = subscription;
        }
    }

    /**
     * Receives an item from upstream. With demand available the item (or, if older
     * items are still queued, the oldest queued item) is delivered to the data
     * handler; otherwise the item is queued.
     *
     * @param data the received item
     * @throws NullPointerException  if {@code data} is {@code null}
     * @throws IllegalStateException if no requested items are outstanding
     */
    public synchronized void onNext(T data) {
        if (data == null) {
            throw new NullPointerException("data");
        }
        this.checkUnsolicitedTokens();
        if (this.demand > 0L) {
            if (this.demand != Long.MAX_VALUE) {
                --this.demand;
            }
            if (this.pending.size() > 0) {
                // Preserve arrival order: enqueue the new item and deliver the oldest one.
                this.pending.add(data);
                data = this.pending.poll();
            }
            this.handleData(data);
        } else {
            this.pending.add(data);
        }
    }

    /**
     * Forwards an upstream error to the exception handler, if one is set.
     *
     * @param throwable the error signalled by upstream
     * @throws NullPointerException if {@code throwable} is {@code null}
     */
    public synchronized void onError(Throwable throwable) {
        if (throwable == null) {
            throw new NullPointerException("throwable");
        }
        if (this.exceptionHandler != null) {
            this.exceptionHandler.handle(throwable);
        }
    }

    /**
     * Notifies the end handler, if one is set, that upstream has completed.
     */
    public synchronized void onComplete() {
        if (this.endHandler != null) {
            this.endHandler.handle(null);
        }
    }

    /**
     * Rejects items that arrive while no requested items are outstanding.
     *
     * @throws IllegalStateException if {@code tokens} is zero
     */
    protected void checkUnsolicitedTokens() {
        if (this.tokens == 0L) {
            throw new IllegalStateException("Data received but wasn't requested");
        }
    }

    /**
     * Passes {@code data} to the data handler, consumes one token and requests the
     * next batch once the current one is used up.
     *
     * <p>NOTE(review): when no data handler is set the item is discarded and
     * {@code tokens} is left untouched, so no further batch is requested for it;
     * confirm this is the intended behaviour.
     */
    private synchronized void handleData(T data) {
        if (this.dataHandler != null) {
            this.dataHandler.handle(data);
            --this.tokens;
            this.checkRequestTokens();
        }
    }

    /**
     * Requests a new batch of {@code batchSize} items when there is demand, a
     * subscription is present and no tokens remain from the previous batch.
     */
    private void checkRequestTokens() {
        if (this.demand > 0L && this.subscription != null && this.tokens == 0L) {
            // Record the batch before requesting, in case items arrive during request().
            this.tokens = this.batchSize;
            this.subscription.request(this.batchSize);
        }
    }
}

