/*
 * Decompiled with CFR 0.152.
 */
package org.mariadb.r2dbc.client;

import io.r2dbc.spi.R2dbcNonTransientResourceException;
import java.net.SocketAddress;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.mariadb.r2dbc.MariadbConnectionConfiguration;
import org.mariadb.r2dbc.client.Client;
import org.mariadb.r2dbc.client.ClientBase;
import org.mariadb.r2dbc.client.CmdElement;
import org.mariadb.r2dbc.client.DecoderState;
import org.mariadb.r2dbc.message.client.ClientMessage;
import org.mariadb.r2dbc.message.client.ExecutePacket;
import org.mariadb.r2dbc.message.client.PreparePacket;
import org.mariadb.r2dbc.message.client.QueryPacket;
import org.mariadb.r2dbc.message.server.ServerMessage;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;

public final class ClientImpl
extends ClientBase {
    private static final Logger logger = Loggers.getLogger(ClientImpl.class);
    protected final Queue<ClientMessage> sendingQueue = (Queue)Queues.unbounded().get();

    public ClientImpl(Connection connection, MariadbConnectionConfiguration configuration) {
        super(connection, configuration);
    }

    public static Mono<Client> connect(ConnectionProvider connectionProvider, SocketAddress socketAddress, MariadbConnectionConfiguration configuration) {
        TcpClient tcpClient = TcpClient.create((ConnectionProvider)connectionProvider).remoteAddress(() -> socketAddress);
        tcpClient = ClientImpl.setSocketOption(configuration, tcpClient);
        return tcpClient.connect().flatMap(it -> Mono.just((Object)new ClientImpl((Connection)it, configuration)));
    }

    @Override
    public void sendCommandWithoutResult(ClientMessage message) {
        try {
            this.lock.lock();
            if (this.responseReceivers.isEmpty()) {
                this.connection.channel().writeAndFlush((Object)message);
            } else {
                this.sendingQueue.add(message);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public Flux<ServerMessage> sendCommand(PreparePacket preparePacket, ExecutePacket executePacket) {
        return Flux.error((Throwable)new R2dbcNonTransientResourceException("Cannot pipeline"));
    }

    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage message, DecoderState initialState, String sql) {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        return Flux.create(sink -> {
            if (!this.isConnected()) {
                sink.error((Throwable)new R2dbcNonTransientResourceException("Connection is close. Cannot send anything"));
                return;
            }
            if (atomicBoolean.compareAndSet(false, true)) {
                try {
                    this.lock.lock();
                    if (this.responseReceivers.isEmpty()) {
                        this.responseReceivers.add(new CmdElement((FluxSink<ServerMessage>)sink, initialState, sql));
                        this.connection.channel().writeAndFlush((Object)message);
                    } else {
                        this.responseReceivers.add(new CmdElement((FluxSink<ServerMessage>)sink, initialState, sql));
                        this.sendingQueue.add(message);
                    }
                }
                finally {
                    this.lock.unlock();
                }
            }
        });
    }

    @Override
    protected void begin(FluxSink<ServerMessage> sink) {
        if (this.responseReceivers.isEmpty()) {
            if ((this.context.getServerStatus() & 1) == 0) {
                this.responseReceivers.add(new CmdElement(sink, DecoderState.QUERY_RESPONSE, "BEGIN"));
                this.connection.channel().writeAndFlush((Object)new QueryPacket("BEGIN"));
            } else {
                logger.debug("Skipping begin transaction because already in transaction");
                sink.complete();
            }
        } else {
            this.responseReceivers.add(new CmdElement(sink, DecoderState.QUERY_RESPONSE, "BEGIN"));
            this.sendingQueue.add(new QueryPacket("BEGIN"));
        }
    }

    @Override
    protected void executeAutoCommit(FluxSink<ServerMessage> sink, boolean autoCommit) {
        String cmd = "SET autocommit=" + (autoCommit ? (char)'1' : '0');
        if (this.responseReceivers.isEmpty()) {
            if (autoCommit != this.isAutoCommit()) {
                this.responseReceivers.add(new CmdElement(sink, DecoderState.QUERY_RESPONSE, cmd));
                this.connection.channel().writeAndFlush((Object)new QueryPacket(cmd));
            } else {
                logger.debug("Skipping autocommit since already in that state");
                sink.complete();
            }
        } else {
            this.responseReceivers.add(new CmdElement(sink, DecoderState.QUERY_RESPONSE, cmd));
            this.sendingQueue.add(new QueryPacket(cmd));
        }
    }

    @Override
    protected void executeWhenTransaction(FluxSink<ServerMessage> sink, String cmd) {
        if (this.responseReceivers.isEmpty()) {
            if ((this.context.getServerStatus() & 1) > 0) {
                this.responseReceivers.add(new CmdElement(sink, DecoderState.QUERY_RESPONSE, cmd));
                this.connection.channel().writeAndFlush((Object)new QueryPacket(cmd));
            } else {
                logger.debug(String.format("Skipping '%s' because no active transaction", cmd));
                sink.complete();
            }
        } else {
            this.responseReceivers.add(new CmdElement(sink, DecoderState.QUERY_RESPONSE, cmd));
            this.sendingQueue.add(new QueryPacket(cmd));
        }
    }

    @Override
    public void sendNext() {
        this.lock.lock();
        try {
            ClientMessage next = this.sendingQueue.poll();
            if (next != null) {
                this.connection.channel().writeAndFlush((Object)next);
            }
        }
        finally {
            this.lock.unlock();
        }
    }
}

