/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.cassandra.impl;

import com.datastax.driver.core.Row;
import io.vertx.cassandra.CassandraRowStream;
import io.vertx.cassandra.ResultSet;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.streams.impl.InboundBuffer;

/**
 * {@link CassandraRowStream} implementation that pulls rows one at a time from a
 * {@link ResultSet} and pushes them through an {@link InboundBuffer}, which in turn
 * delivers them to the registered row handler.
 *
 * <p>Mutable state is guarded by the instance monitor. Once the stream reaches
 * {@code State.STOPPED}, every public method becomes a no-op that returns {@code this}.
 */
public class CassandraRowStreamImpl implements CassandraRowStream {
    private final Context context;
    private final ResultSet resultSet;
    private final InboundBuffer<Row> internalQueue;
    private State state;
    // Rows written to the buffer that have not yet been handed to handleRow.
    private int inFlight;
    private Handler<Row> handler;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;

    /**
     * Creates an idle stream; no row is requested until a row handler is registered.
     *
     * @param context   the context the buffer is bound to and on which fetching is scheduled
     * @param resultSet the result set rows are read from
     */
    public CassandraRowStreamImpl(Context context, ResultSet resultSet) {
        this.context = context;
        this.resultSet = resultSet;
        // The explicit type argument is required: a diamond (or raw type) on the receiver of a
        // method chain would not be inferred as InboundBuffer<Row>.
        this.internalQueue = new InboundBuffer<Row>(context)
            .exceptionHandler(this::handleException)
            .drainHandler(v -> this.fetchRow());
        this.state = State.IDLE;
    }

    /**
     * Sets the handler notified when fetching fails or the buffer reports an exception.
     * Ignored once the stream is stopped.
     *
     * @param handler the exception handler, may be {@code null}
     * @return this stream
     */
    @Override
    public synchronized CassandraRowStream exceptionHandler(Handler<Throwable> handler) {
        if (this.state != State.STOPPED) {
            this.exceptionHandler = handler;
        }
        return this;
    }

    /**
     * Sets the row handler. The first non-null handler moves the stream from idle to started
     * and triggers the first fetch; a {@code null} handler stops the stream and signals the end.
     * Both actions run directly when the caller is already on this stream's context and are
     * scheduled with {@code runOnContext} otherwise.
     *
     * @param handler the row handler, or {@code null} to stop the stream
     * @return this stream
     */
    @Override
    public synchronized CassandraRowStream handler(Handler<Row> handler) {
        if (this.state == State.STOPPED) {
            return this;
        }
        if (handler == null) {
            this.stop();
            if (this.context != Vertx.currentContext()) {
                this.context.runOnContext(v -> this.handleEnd());
            } else {
                this.handleEnd();
            }
        } else {
            this.handler = handler;
            this.internalQueue.handler(this::handleRow);
            if (this.state == State.IDLE) {
                this.state = State.STARTED;
                if (this.context != Vertx.currentContext()) {
                    this.context.runOnContext(v -> this.fetchRow());
                } else {
                    this.fetchRow();
                }
            }
        }
        return this;
    }

    /**
     * Pauses the underlying buffer unless the stream is stopped.
     *
     * @return this stream
     */
    @Override
    public synchronized CassandraRowStream pause() {
        if (this.state != State.STOPPED) {
            this.internalQueue.pause();
        }
        return this;
    }

    /**
     * Resumes the underlying buffer unless the stream is stopped.
     *
     * @return this stream
     */
    @Override
    public synchronized CassandraRowStream resume() {
        if (this.state != State.STOPPED) {
            this.internalQueue.resume();
        }
        return this;
    }

    /**
     * Sets the handler notified when the stream ends. Ignored once the stream is stopped.
     *
     * @param handler the end handler, may be {@code null}
     * @return this stream
     */
    @Override
    public synchronized CassandraRowStream endHandler(Handler<Void> handler) {
        if (this.state != State.STOPPED) {
            this.endHandler = handler;
        }
        return this;
    }

    /**
     * Forwards a fetch request for {@code l} elements to the underlying buffer unless the
     * stream is stopped.
     *
     * @param l the amount passed to the buffer's {@code fetch}
     * @return this stream
     */
    @Override
    public synchronized CassandraRowStream fetch(long l) {
        if (this.state != State.STOPPED) {
            this.internalQueue.fetch(l);
        }
        return this;
    }

    /** Requests the next row from the result set, unless the stream is stopped. */
    private synchronized void fetchRow() {
        if (this.state == State.STOPPED) {
            return;
        }
        this.resultSet.one((AsyncResult<Row> ar) -> {
            if (ar.succeeded()) {
                this.handleFetched(ar.result());
            } else {
                this.handleException(ar.cause());
            }
        });
    }

    /**
     * Processes the outcome of a successful fetch. A non-null row is written to the buffer and,
     * if {@code write} returns {@code true}, the next row is requested right away (otherwise the
     * buffer's drain handler requests it later). A {@code null} row marks the result set as
     * exhausted; the stream ends immediately when no row is still in flight.
     *
     * @param row the fetched row, or {@code null} when there are no more rows
     */
    private synchronized void handleFetched(Row row) {
        if (this.state == State.STOPPED) {
            return;
        }
        if (row != null) {
            ++this.inFlight;
            if (this.internalQueue.write(row)) {
                this.fetchRow();
            }
        } else {
            this.state = State.EXHAUSTED;
            if (this.inFlight == 0) {
                this.stop();
                this.handleEnd();
            }
        }
    }

    /**
     * Receives a row from the buffer and passes it to the registered row handler. After the
     * handler returns, the stream is ended if the result set is exhausted and no row remains
     * in flight.
     *
     * @param row the row delivered by the buffer
     */
    private void handleRow(Row row) {
        Handler<Row> rowHandler;
        synchronized (this) {
            if (this.state == State.STOPPED) {
                return;
            }
            --this.inFlight;
            // Capture the field while the monitor is held so the read is properly synchronized.
            rowHandler = this.handler;
        }
        rowHandler.handle(row);
        synchronized (this) {
            if (this.state == State.EXHAUSTED && this.inFlight == 0) {
                this.stop();
                this.handleEnd();
            }
        }
    }

    /**
     * Stops the stream on the first failure and reports the cause to the exception handler,
     * if one is set. Failures arriving after the stream is stopped are dropped.
     *
     * @param cause the failure to report
     */
    private void handleException(Throwable cause) {
        Handler<Throwable> h;
        synchronized (this) {
            if (this.state != State.STOPPED) {
                this.stop();
                h = this.exceptionHandler;
            } else {
                h = null;
            }
        }
        if (h != null) {
            h.handle(cause);
        }
    }

    /**
     * Invokes the end handler, if one is set. The handler reference is read under the monitor
     * and the callback is made after that block is left, so this method does not itself hold
     * the monitor during the callback. Callers that already own the monitor still hold it.
     */
    private void handleEnd() {
        Handler<Void> h;
        synchronized (this) {
            h = this.endHandler;
        }
        if (h != null) {
            h.handle(null);
        }
    }

    /** Marks the stream as stopped and detaches the row and drain handlers from the buffer. */
    private synchronized void stop() {
        this.state = State.STOPPED;
        this.internalQueue.handler(null).drainHandler(null);
    }

    /** Lifecycle of the stream. */
    private enum State {
        /** Initial state: no row handler has been registered yet. */
        IDLE,
        /** A row handler has been registered and fetching has begun. */
        STARTED,
        /** The result set returned a {@code null} row; buffered rows may still be pending. */
        EXHAUSTED,
        /** Terminal state: handlers are detached and all operations are ignored. */
        STOPPED;
    }
}

