package io.vertx.cassandra.impl;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import io.vertx.cassandra.CassandraClient;
import io.vertx.cassandra.CassandraClientOptions;
import io.vertx.cassandra.CassandraRowStream;
import io.vertx.cassandra.ResultSet;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collector;

/* loaded from: input_file:io/vertx/cassandra/impl/CassandraClientImpl.class */
public class CassandraClientImpl implements CassandraClient {
    static final String HOLDERS_LOCAL_MAP_NAME = "__vertx.cassandraClient.sessionHolders";
    final VertxInternal vertx;
    private final String clientName;
    private final CassandraClientOptions options;
    private final Map<String, SessionHolder> holders;
    private boolean closed;

    public CassandraClientImpl(Vertx vertx, String str, CassandraClientOptions cassandraClientOptions) {
        Objects.requireNonNull(vertx, "vertx");
        Objects.requireNonNull(str, "clientName");
        Objects.requireNonNull(cassandraClientOptions, "options");
        this.vertx = (VertxInternal) vertx;
        this.clientName = str;
        this.options = cassandraClientOptions;
        this.holders = vertx.sharedData().getLocalMap(HOLDERS_LOCAL_MAP_NAME);
        this.holders.compute(str, (str2, sessionHolder) -> {
            return sessionHolder == null ? new SessionHolder() : sessionHolder.increment();
        });
        Context currentContext = Vertx.currentContext();
        if (currentContext == null || currentContext.owner() != vertx) {
            return;
        }
        currentContext.addCloseHook(this::close);
    }

    @Override // io.vertx.cassandra.CassandraClient
    public synchronized boolean isConnected() {
        Session session;
        return (this.closed || (session = this.holders.get(this.clientName).session) == null || session.isClosed()) ? false : true;
    }

    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient executeWithFullFetch(String str, Handler<AsyncResult<List<Row>>> handler) {
        return executeWithFullFetch((Statement) new SimpleStatement(str), handler);
    }

    @Override // io.vertx.cassandra.CassandraClient
    public Future<List<Row>> executeWithFullFetch(String str) {
        Promise promise = Promise.promise();
        executeWithFullFetch(str, (Handler<AsyncResult<List<Row>>>) promise);
        return promise.future();
    }

    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient executeWithFullFetch(Statement statement, Handler<AsyncResult<List<Row>>> handler) {
        execute(statement, asyncResult -> {
            if (asyncResult.succeeded()) {
                ((ResultSet) asyncResult.result()).all(handler);
            } else {
                handler.handle(Future.failedFuture(asyncResult.cause()));
            }
        });
        return this;
    }

    @Override // io.vertx.cassandra.CassandraClient
    public Future<List<Row>> executeWithFullFetch(Statement statement) {
        Promise promise = Promise.promise();
        executeWithFullFetch(statement, (Handler<AsyncResult<List<Row>>>) promise);
        return promise.future();
    }

    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient execute(String str, Handler<AsyncResult<ResultSet>> handler) {
        return execute((Statement) new SimpleStatement(str), handler);
    }

    @Override // io.vertx.cassandra.CassandraClient
    public Future<ResultSet> execute(String str) {
        Promise promise = Promise.promise();
        execute(str, (Handler<AsyncResult<ResultSet>>) promise);
        return promise.future();
    }

    @Override // io.vertx.cassandra.CassandraClient
    public <R> CassandraClient execute(String str, Collector<Row, ?, R> collector, Handler<AsyncResult<R>> handler) {
        return execute((Statement) new SimpleStatement(str), (Collector) collector, (Handler) handler);
    }

    @Override // io.vertx.cassandra.CassandraClient
    public <R> Future<R> execute(String str, Collector<Row, ?, R> collector) {
        Promise promise = Promise.promise();
        execute(str, (Collector) collector, (Handler) promise);
        return promise.future();
    }

    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient execute(Statement statement, Handler<AsyncResult<ResultSet>> handler) {
        ContextInternal orCreateContext = this.vertx.getOrCreateContext();
        getSession(orCreateContext, asyncResult -> {
            if (asyncResult.succeeded()) {
                Util.handleOnContext(((Session) asyncResult.result()).executeAsync(statement), orCreateContext, resultSet -> {
                    return new ResultSetImpl(resultSet, this.vertx);
                }, handler);
            } else {
                handler.handle(Future.failedFuture(asyncResult.cause()));
            }
        });
        return this;
    }

    @Override // io.vertx.cassandra.CassandraClient
    public Future<ResultSet> execute(Statement statement) {
        Promise promise = Promise.promise();
        execute(statement, (Handler<AsyncResult<ResultSet>>) promise);
        return promise.future();
    }

    @Override // io.vertx.cassandra.CassandraClient
    public <R> CassandraClient execute(Statement statement, Collector<Row, ?, R> collector, Handler<AsyncResult<R>> handler) {
        executeAndCollect(statement, collector, handler);
        return this;
    }

    @Override // io.vertx.cassandra.CassandraClient
    public <R> Future<R> execute(Statement statement, Collector<Row, ?, R> collector) {
        Promise promise = Promise.promise();
        execute(statement, (Collector) collector, (Handler) promise);
        return promise.future();
    }

    private <C, R> void executeAndCollect(Statement statement, Collector<Row, C, R> collector, Handler<AsyncResult<R>> handler) {
        Promise promise = Promise.promise();
        queryStream(statement, (Handler<AsyncResult<CassandraRowStream>>) promise);
        C c = collector.supplier().get();
        BiConsumer<C, Row> accumulator = collector.accumulator();
        Function<C, R> finisher = collector.finisher();
        promise.future().compose(cassandraRowStream -> {
            Promise promise2 = Promise.promise();
            cassandraRowStream.endHandler(r6 -> {
                promise2.complete(finisher.apply(c));
            });
            cassandraRowStream.handler(row -> {
                accumulator.accept(c, row);
            });
            promise2.getClass();
            cassandraRowStream.exceptionHandler(promise2::fail);
            return promise2.future();
        }).setHandler(handler);
    }

    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient prepare(String str, Handler<AsyncResult<PreparedStatement>> handler) {
        ContextInternal orCreateContext = this.vertx.getOrCreateContext();
        getSession(orCreateContext, asyncResult -> {
            if (asyncResult.succeeded()) {
                Util.handleOnContext(((Session) asyncResult.result()).prepareAsync(str), orCreateContext, handler);
            } else {
                handler.handle(Future.failedFuture(asyncResult.cause()));
            }
        });
        return this;
    }

    @Override // io.vertx.cassandra.CassandraClient
    public Future<PreparedStatement> prepare(String str) {
        Promise promise = Promise.promise();
        prepare(str, promise);
        return promise.future();
    }

    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient queryStream(String str, Handler<AsyncResult<CassandraRowStream>> handler) {
        return queryStream((Statement) new SimpleStatement(str), handler);
    }

    @Override // io.vertx.cassandra.CassandraClient
    public Future<CassandraRowStream> queryStream(String str) {
        Promise promise = Promise.promise();
        queryStream(str, (Handler<AsyncResult<CassandraRowStream>>) promise);
        return promise.future();
    }

    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient queryStream(Statement statement, Handler<AsyncResult<CassandraRowStream>> handler) {
        ContextInternal orCreateContext = this.vertx.getOrCreateContext();
        getSession(orCreateContext, asyncResult -> {
            if (asyncResult.succeeded()) {
                Util.handleOnContext(((Session) asyncResult.result()).executeAsync(statement), orCreateContext, resultSet -> {
                    return new CassandraRowStreamImpl(orCreateContext, new ResultSetImpl(resultSet, this.vertx));
                }, handler);
            } else {
                handler.handle(Future.failedFuture(asyncResult.cause()));
            }
        });
        return this;
    }

    @Override // io.vertx.cassandra.CassandraClient
    public Future<CassandraRowStream> queryStream(Statement statement) {
        Promise promise = Promise.promise();
        queryStream(statement, (Handler<AsyncResult<CassandraRowStream>>) promise);
        return promise.future();
    }

    @Override // io.vertx.cassandra.CassandraClient
    public Future<Void> close() {
        Promise promise = Promise.promise();
        close(promise);
        return promise.future();
    }

    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient close(Handler<AsyncResult<Void>> handler) {
        if (raiseCloseFlag()) {
            while (true) {
                SessionHolder sessionHolder = this.holders.get(this.clientName);
                SessionHolder decrement = sessionHolder.decrement();
                if (decrement.refCount == 0) {
                    if (this.holders.remove(this.clientName, sessionHolder)) {
                        if (sessionHolder.session != null) {
                            Util.handleOnContext(sessionHolder.session.closeAsync(), this.vertx.getOrCreateContext(), handler);
                            return this;
                        }
                    }
                } else if (this.holders.replace(this.clientName, sessionHolder, decrement)) {
                    break;
                }
            }
        }
        if (handler != null) {
            handler.handle(Future.succeededFuture());
        }
        return this;
    }

    private synchronized boolean raiseCloseFlag() {
        if (this.closed) {
            return false;
        }
        this.closed = true;
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void getSession(ContextInternal contextInternal, Handler<AsyncResult<Session>> handler) {
        if (this.closed) {
            handler.handle(Future.failedFuture("Client is closed"));
            return;
        }
        SessionHolder sessionHolder = this.holders.get(this.clientName);
        if (sessionHolder.session != null) {
            handler.handle(Future.succeededFuture(sessionHolder.session));
        } else {
            contextInternal.executeBlocking(promise -> {
                connect(promise);
            }, sessionHolder.connectionQueue, handler);
        }
    }

    private void connect(Promise<Session> promise) {
        SessionHolder sessionHolder = this.holders.get(this.clientName);
        if (sessionHolder == null) {
            promise.fail("Client closed while connecting");
            return;
        }
        if (sessionHolder.session != null) {
            promise.complete(sessionHolder.session);
            return;
        }
        Cluster.Builder dataStaxClusterBuilder = this.options.dataStaxClusterBuilder();
        if (dataStaxClusterBuilder.getContactPoints().isEmpty()) {
            dataStaxClusterBuilder.addContactPoint(CassandraClientOptions.DEFAULT_HOST);
        }
        Session connect = dataStaxClusterBuilder.build().connect(this.options.getKeyspace());
        SessionHolder compute = this.holders.compute(this.clientName, (str, sessionHolder2) -> {
            if (sessionHolder2 == null) {
                return null;
            }
            return sessionHolder2.connected(connect);
        });
        if (compute != null) {
            promise.complete(compute.session);
        } else {
            try {
                connect.close();
            } catch (Exception e) {
            }
            promise.fail("Client closed while connecting");
        }
    }
}
