package io.vertx.cassandra.impl;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import io.vertx.cassandra.CassandraClient;
import io.vertx.cassandra.CassandraClientOptions;
import io.vertx.cassandra.CassandraRowStream;
import io.vertx.cassandra.ResultSet;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.TaskQueue;
import io.vertx.core.impl.VertxInternal;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collector;

/* loaded from: input_file:io/vertx/cassandra/impl/CassandraClientImpl.class */
/**
 * {@link CassandraClient} implementation that runs statements on a DataStax {@link Session}.
 *
 * <p>Clients created with the same name on the same Vert.x instance share one {@code SessionHolder},
 * stored in a Vert.x local shared-data map and reference counted: the constructor increments the
 * count, {@link #close(Handler)} decrements it, and the underlying driver resources are released
 * when the count drops below one.
 *
 * <p>The session is opened lazily, on the first operation that needs it, through
 * {@code executeBlocking} ordered by the holder's connection queue.
 */
public class CassandraClientImpl implements CassandraClient {
    private static final String HOLDERS_LOCAL_MAP_NAME = "__vertx.cassandraClient.sessionHolders";

    final VertxInternal vertx;
    private final String clientName;
    private final CassandraClientOptions options;
    private final Map<String, SessionHolder> holders;
    private final TaskQueue connectionQueue;
    // Both fields below are guarded by the monitor of this instance.
    private Session cachedSession;
    private boolean closed;

    /**
     * Creates a client and registers it with the shared holder for {@code clientName}.
     *
     * <p>If called from a context owned by {@code vertx}, the client registers itself as a close
     * hook on that context.
     *
     * @param vertx      the Vert.x instance; must be a {@link VertxInternal}
     * @param clientName the name under which the session holder is shared
     * @param options    the client options used to build the cluster when connecting
     * @throws NullPointerException if any argument is {@code null}
     */
    public CassandraClientImpl(Vertx vertx, String clientName, CassandraClientOptions options) {
        Objects.requireNonNull(vertx, "vertx");
        Objects.requireNonNull(clientName, "clientName");
        Objects.requireNonNull(options, "options");
        this.vertx = (VertxInternal) vertx;
        this.clientName = clientName;
        this.options = options;
        this.holders = vertx.sharedData().getLocalMap(HOLDERS_LOCAL_MAP_NAME);
        // Create the holder for the first client with this name, otherwise take one more reference.
        SessionHolder registered = this.holders.compute(clientName,
            (name, holder) -> holder == null ? new SessionHolder() : holder.increment());
        this.connectionQueue = registered.connectionQueue;
        Context creatingContext = Vertx.currentContext();
        if (creatingContext != null && creatingContext.owner() == vertx) {
            creatingContext.addCloseHook(this::close);
        }
    }

    /**
     * @return {@code true} if this client is not closed and a session (cached here or stored in
     *         the shared holder) exists and is not closed
     */
    @Override // io.vertx.cassandra.CassandraClient
    public synchronized boolean isConnected() {
        if (this.closed) {
            return false;
        }
        Session session = this.cachedSession != null
            ? this.cachedSession
            : this.holders.get(this.clientName).session;
        return session != null && !session.isClosed();
    }

    /**
     * Executes the query and passes all rows of the result to the handler.
     *
     * @param query   the query string, wrapped in a {@link SimpleStatement}
     * @param handler receives the rows or the failure
     * @return this client
     */
    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient executeWithFullFetch(String query, Handler<AsyncResult<List<Row>>> handler) {
        return executeWithFullFetch(new SimpleStatement(query), handler);
    }

    /**
     * Executes the statement and passes all rows of the result to the handler.
     *
     * @param statement the statement to execute
     * @param handler   receives the rows or the failure
     * @return this client
     */
    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient executeWithFullFetch(Statement statement, Handler<AsyncResult<List<Row>>> handler) {
        execute(statement, executed -> {
            if (executed.succeeded()) {
                executed.result().all(handler);
            } else {
                handler.handle(Future.failedFuture(executed.cause()));
            }
        });
        return this;
    }

    /**
     * Executes the query.
     *
     * @param query   the query string, wrapped in a {@link SimpleStatement}
     * @param handler receives the result set or the failure
     * @return this client
     */
    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient execute(String query, Handler<AsyncResult<ResultSet>> handler) {
        return execute(new SimpleStatement(query), handler);
    }

    /**
     * Executes the query and reduces the streamed rows with the collector.
     *
     * @param query     the query string, wrapped in a {@link SimpleStatement}
     * @param collector the collector applied to the rows
     * @param handler   receives the collected value or the failure
     * @return this client
     */
    @Override // io.vertx.cassandra.CassandraClient
    public <R> CassandraClient execute(String query, Collector<Row, ?, R> collector, Handler<AsyncResult<R>> handler) {
        return execute(new SimpleStatement(query), collector, handler);
    }

    /**
     * Executes the statement on the session and reports the wrapped result set through
     * {@code Util.handleOnContext} with the context obtained when this method was called.
     *
     * @param statement the statement to execute
     * @param handler   receives the result set or the failure
     * @return this client
     */
    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient execute(Statement statement, Handler<AsyncResult<ResultSet>> handler) {
        ContextInternal context = this.vertx.getOrCreateContext();
        getSession(context, sessionResult -> {
            if (sessionResult.succeeded()) {
                Util.handleOnContext(sessionResult.result().executeAsync(statement), context,
                    resultSet -> new ResultSetImpl(resultSet, this.vertx), handler);
            } else {
                handler.handle(Future.failedFuture(sessionResult.cause()));
            }
        });
        return this;
    }

    /**
     * Executes the statement and reduces the streamed rows with the collector.
     *
     * @param statement the statement to execute
     * @param collector the collector applied to the rows
     * @param handler   receives the collected value or the failure
     * @return this client
     */
    @Override // io.vertx.cassandra.CassandraClient
    public <R> CassandraClient execute(Statement statement, Collector<Row, ?, R> collector, Handler<AsyncResult<R>> handler) {
        executeAndCollect(statement, collector, handler);
        return this;
    }

    /**
     * Streams the rows of the statement into the collector's container and completes the handler
     * with the finished value when the stream ends, or with the failure reported by the stream.
     */
    private <C, R> void executeAndCollect(Statement statement, Collector<Row, C, R> collector, Handler<AsyncResult<R>> handler) {
        Promise<CassandraRowStream> streamPromise = Promise.promise();
        queryStream(statement, streamPromise);
        C container = collector.supplier().get();
        BiConsumer<C, Row> accumulator = collector.accumulator();
        Function<C, R> finisher = collector.finisher();
        streamPromise.future().compose(stream -> {
            Promise<R> resultPromise = Promise.promise();
            stream.endHandler(end -> resultPromise.complete(finisher.apply(container)));
            stream.handler(row -> accumulator.accept(container, row));
            stream.exceptionHandler(resultPromise::fail);
            return resultPromise.future();
        }).setHandler(handler);
    }

    /**
     * Prepares the query on the session.
     *
     * @param query   the query string to prepare
     * @param handler receives the prepared statement or the failure
     * @return this client
     */
    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient prepare(String query, Handler<AsyncResult<PreparedStatement>> handler) {
        ContextInternal context = this.vertx.getOrCreateContext();
        getSession(context, sessionResult -> {
            if (sessionResult.succeeded()) {
                Util.handleOnContext(sessionResult.result().prepareAsync(query), context, handler);
            } else {
                handler.handle(Future.failedFuture(sessionResult.cause()));
            }
        });
        return this;
    }

    /**
     * Executes the query and exposes the result as a row stream.
     *
     * @param query   the query string, wrapped in a {@link SimpleStatement}
     * @param handler receives the row stream or the failure
     * @return this client
     */
    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient queryStream(String query, Handler<AsyncResult<CassandraRowStream>> handler) {
        return queryStream(new SimpleStatement(query), handler);
    }

    /**
     * Executes the statement and exposes the result as a row stream bound to the context obtained
     * when this method was called.
     *
     * @param statement the statement to execute
     * @param handler   receives the row stream or the failure
     * @return this client
     */
    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient queryStream(Statement statement, Handler<AsyncResult<CassandraRowStream>> handler) {
        ContextInternal context = this.vertx.getOrCreateContext();
        getSession(context, sessionResult -> {
            if (sessionResult.succeeded()) {
                Util.handleOnContext(sessionResult.result().executeAsync(statement), context,
                    resultSet -> new CassandraRowStreamImpl(context, new ResultSetImpl(resultSet, this.vertx)),
                    handler);
            } else {
                handler.handle(Future.failedFuture(sessionResult.cause()));
            }
        });
        return this;
    }

    /**
     * Closes this client without a completion handler.
     *
     * @return this client
     */
    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient close() {
        return close(null);
    }

    /**
     * Closes this client and releases its reference on the shared holder. Calling it again is a
     * no-op that still notifies the handler with success.
     *
     * <p>When the last reference is released and a session was opened, the {@link Cluster} owning
     * that session is closed. Closing only the session would leave the cluster created while
     * connecting (and the resources it holds) alive.
     *
     * @param handler notified when closing is done; may be {@code null}
     * @return this client
     */
    @Override // io.vertx.cassandra.CassandraClient
    public CassandraClient close(Handler<AsyncResult<Void>> handler) {
        SessionHolder released = null;
        // Only the state transition happens under the lock; the handler is called outside of it.
        synchronized (this) {
            if (!this.closed) {
                this.closed = true;
                // NOTE(review): the decremented holder stays in the map even at zero references;
                // confirm SessionHolder.increment() cannot hand its closed session to a later
                // client created with the same name.
                released = this.holders.compute(this.clientName, (name, holder) -> holder.decrement());
            }
        }
        if (released != null && released.refCount < 1 && released.session != null) {
            Util.handleOnContext(released.session.getCluster().closeAsync(), this.vertx.getOrCreateContext(), handler);
        } else if (handler != null) {
            handler.handle(Future.succeededFuture());
        }
        return this;
    }

    /**
     * Provides the session to the handler, connecting first if no session is cached on this client.
     *
     * <p>The handler is never invoked while the monitor of this client is held.
     *
     * @param contextInternal the context used to run the blocking connection attempt
     * @param handler         receives the session, or a failure if the client is closed or the
     *                        connection attempt failed
     */
    public void getSession(ContextInternal contextInternal, Handler<AsyncResult<Session>> handler) {
        final boolean alreadyClosed;
        final Session cached;
        synchronized (this) {
            alreadyClosed = this.closed;
            cached = this.cachedSession;
        }
        if (alreadyClosed) {
            handler.handle(Future.failedFuture("Client is closed"));
            return;
        }
        if (cached != null) {
            handler.handle(Future.succeededFuture(cached));
            return;
        }
        contextInternal.<Session>executeBlocking(
            promise -> connect(promise, this.holders, this.clientName, this.options),
            this.connectionQueue,
            connected -> {
                if (!connected.succeeded()) {
                    handler.handle(Future.failedFuture(connected.cause()));
                    return;
                }
                Session session = connected.result();
                synchronized (this) {
                    this.cachedSession = session;
                }
                handler.handle(Future.succeededFuture(session));
            });
    }

    /**
     * Completes the promise with the session stored in the holder for {@code clientName}, opening
     * a new session (and storing it in the holder) when there is none yet.
     *
     * <p>Fails the promise, and closes any cluster created here, when the holder is no longer in
     * the map. If opening the session throws, the cluster is closed and the exception propagates.
     */
    private static void connect(Promise<Session> promise, Map<String, SessionHolder> holders, String clientName, CassandraClientOptions options) {
        SessionHolder current = holders.get(clientName);
        if (current == null) {
            // Same outcome as the post-connect check below, without building a cluster first.
            promise.fail("Client closed while connecting");
            return;
        }
        if (current.session != null) {
            promise.complete(current.session);
            return;
        }
        Cluster.Builder builder = options.dataStaxClusterBuilder();
        if (builder.getContactPoints().isEmpty()) {
            builder.addContactPoint(CassandraClientOptions.DEFAULT_HOST);
        }
        Cluster cluster = builder.build();
        Session session = connectOrClose(cluster, options.getKeyspace());
        SessionHolder connected = holders.compute(clientName,
            (name, holder) -> holder == null ? null : holder.connected(session));
        if (connected != null) {
            promise.complete(connected.session);
        } else {
            // Nobody will ever use this connection: release the cluster together with its session.
            closeQuietly(cluster);
            promise.fail("Client closed while connecting");
        }
    }

    /** Opens a session on the cluster; closes the cluster before rethrowing if that fails. */
    private static Session connectOrClose(Cluster cluster, String keyspace) {
        try {
            return cluster.connect(keyspace);
        } catch (RuntimeException e) {
            closeQuietly(cluster);
            throw e;
        }
    }

    /** Closes the cluster as a best effort, ignoring any exception raised while closing. */
    private static void closeQuietly(Cluster cluster) {
        try {
            cluster.close();
        } catch (Exception ignored) {
            // Best-effort cleanup: the caller reports the primary failure.
        }
    }
}
