package io.vertx.cassandra.impl;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import io.vertx.cassandra.CassandraRowStream;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.impl.InboundBuffer;
import java.util.Iterator;

/* loaded from: input_file:io/vertx/cassandra/impl/CassandraRowStreamImpl.class */
/**
 * {@link CassandraRowStream} implementation that moves the rows of a Datastax {@link ResultSet}
 * into an {@link InboundBuffer} and asks the result set for more pages while the buffer
 * remains writable.
 *
 * <p>The pumping logic ({@link #fire()}) and the end-of-stream bookkeeping are guarded by this
 * object's monitor.
 */
public class CassandraRowStreamImpl implements CassandraRowStream {
    private final ResultSet datastaxResultSet;
    private final Iterator<Row> resultSetIterator;
    private final InboundBuffer<Row> internalQueue;
    private final Context context;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;
    /** {@code true} once an end handler has been invoked, so the end is signalled only once. */
    private boolean ended;

    /**
     * Creates a stream over the given result set.
     *
     * @param resultSet the result set whose rows are streamed
     * @param context the context handed to the internal buffer and used for fetch callbacks
     */
    public CassandraRowStreamImpl(ResultSet resultSet, Context context) {
        this.datastaxResultSet = resultSet;
        this.resultSetIterator = resultSet.iterator();
        this.context = context;
        this.internalQueue = new InboundBuffer<>(context);
        // Resume pumping rows whenever the buffer reports that it has been drained.
        this.internalQueue.drainHandler(ignored -> fire());
    }

    /**
     * Sets the handler notified of failures, both for the internal buffer and for failed
     * page fetches.
     *
     * @param handler the exception handler, may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    public synchronized CassandraRowStream exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        this.internalQueue.exceptionHandler(handler);
        return this;
    }

    /**
     * Sets the row handler and, when a handler is actually provided, starts pumping rows.
     *
     * @param handler the row handler; {@code null} unregisters the handler without pumping
     * @return this stream
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    public CassandraRowStream handler(Handler<Row> handler) {
        this.internalQueue.handler(handler);
        // Without a consumer there is nobody to deliver rows to, so do not pull rows
        // out of the result set when the handler is being cleared.
        if (handler != null) {
            fire();
        }
        return this;
    }

    /**
     * Pauses the internal buffer.
     *
     * @return this stream
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: pause */
    public CassandraRowStream mo3pause() {
        this.internalQueue.pause();
        return this;
    }

    /**
     * Resumes the internal buffer.
     *
     * @return this stream
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: resume */
    public CassandraRowStream mo2resume() {
        this.internalQueue.resume();
        return this;
    }

    /**
     * Sets the end handler and invokes it immediately if the result set is already exhausted
     * and the end has not been signalled before.
     *
     * @param handler the end handler, may be {@code null}
     * @return this stream
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    public synchronized CassandraRowStream endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        tryToTriggerEndOfTheStream();
        return this;
    }

    /**
     * Requests {@code j} more elements from the internal buffer.
     *
     * @param j the amount passed to the internal buffer's {@code fetch}
     * @return this stream
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: fetch */
    public synchronized CassandraRowStream mo1fetch(long j) {
        this.internalQueue.fetch(j);
        return this;
    }

    /**
     * Writes the rows that are available without fetching into the internal buffer, then either
     * signals the end of the stream, requests another page, or stops until the buffer drains.
     */
    private synchronized void fire() {
        int availableWithoutFetching = this.datastaxResultSet.getAvailableWithoutFetching();
        if (availableWithoutFetching == 0) {
            if (this.datastaxResultSet.isFullyFetched()) {
                tryToTriggerEndOfTheStream();
            } else {
                fetchAndCallOneMoreTime();
            }
            return;
        }
        // Stop as soon as write() reports the buffer is no longer writable; the drain handler
        // registered in the constructor calls fire() again later.
        for (int i = 0; i < availableWithoutFetching; i++) {
            if (!this.internalQueue.write(this.resultSetIterator.next())) {
                break;
            }
        }
        if (this.internalQueue.isWritable()) {
            fetchAndCallOneMoreTime();
        }
    }

    /**
     * Calls {@link #fire()} again, directly when the result set is fully fetched, otherwise
     * after the next page has been fetched. A failed fetch is reported to the exception handler
     * and then to the end handler (the latter at most once).
     */
    private void fetchAndCallOneMoreTime() {
        if (this.datastaxResultSet.isFullyFetched()) {
            fire();
            return;
        }
        Util.handleOnContext(this.datastaxResultSet.fetchMoreResults(), this.context, asyncResult -> {
            if (asyncResult.succeeded()) {
                fire();
                return;
            }
            Handler<Throwable> onError;
            Handler<Void> onEnd;
            // Snapshot the handlers under the monitor, then invoke them outside of it.
            synchronized (this) {
                onError = this.exceptionHandler;
                onEnd = this.ended ? null : this.endHandler;
                if (onEnd != null) {
                    this.ended = true;
                }
            }
            if (onError != null) {
                onError.handle(asyncResult.cause());
            }
            if (onEnd != null) {
                onEnd.handle(null);
            }
        });
    }

    /**
     * Invokes the end handler when one is set, the result set is fully fetched, no rows remain
     * in the iterator and the end has not been signalled yet. Both call sites (endHandler and
     * fire) are synchronized, so this runs with the monitor held.
     */
    private void tryToTriggerEndOfTheStream() {
        if (this.ended || this.endHandler == null) {
            return;
        }
        // isFullyFetched() is checked first so hasNext() is only consulted once no pages remain.
        if (!this.datastaxResultSet.isFullyFetched() || this.resultSetIterator.hasNext()) {
            return;
        }
        this.ended = true;
        this.endHandler.handle(null);
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: endHandler */
    public /* bridge */ /* synthetic */ ReadStream mo0endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: handler */
    public /* bridge */ /* synthetic */ ReadStream mo4handler(Handler handler) {
        return handler((Handler<Row>) handler);
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ ReadStream mo5exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo6exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
