/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.spi.communication.tcp.internal;

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.nio.GridShmemCommunicationClient;
import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.tcp.AttributeNames;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationMetricsListener;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.communication.tcp.internal.ClusterStateProvider;
import org.apache.ignite.spi.communication.tcp.internal.CommunicationTcpUtils;
import org.apache.ignite.spi.communication.tcp.internal.CommunicationWorker;
import org.apache.ignite.spi.communication.tcp.internal.ConnectFuture;
import org.apache.ignite.spi.communication.tcp.internal.ConnectGateway;
import org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool;
import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
import org.apache.ignite.spi.communication.tcp.internal.DisconnectedSessionInfo;
import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConfiguration;
import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.jetbrains.annotations.Nullable;

/**
 * NIO server listener that handles communication sessions: it answers newly accepted
 * sessions, processes the first (node id / handshake) message of an incoming connection,
 * dispatches subsequent messages to the {@link CommunicationListener}, maintains recovery
 * acknowledgements and reacts to session disconnects.
 */
public class InboundConnectionHandler
extends GridNioServerListenerAdapter<Message> {
    /** Minimal version of a client node for which a "need wait" reply is sent instead of "unknown node". */
    private static final IgniteProductVersion VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT = IgniteProductVersion.fromString("2.1.4");

    /** Session meta key under which the per-session {@link GridNioMessageTracker} is stored. */
    private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();

    /*
     * Status codes sent to the remote side inside a RecoveryLastReceivedMessage.
     * NOTE(review): names are derived from how this class uses the values; presumably they
     * mirror constants declared on RecoveryLastReceivedMessage - confirm and reuse those if so.
     */

    /** Sent when the incoming connection is rejected because a connection already exists or could not be reserved. */
    private static final long ALREADY_CONNECTED = -1L;

    /** Sent when the connect gateway could not be entered. */
    private static final long NODE_STOPPING = -2L;

    /** Sent when the sender is not in topology yet but is known to discovery. */
    private static final long NEED_WAIT = -3L;

    /** Sent when the sender is not known at all. */
    private static final long UNKNOWN_NODE = -4L;

    /** Logger. */
    private final IgniteLogger log;

    /** Communication configuration (timeouts, ack threshold, queue limit, paired connections flag). */
    private final TcpCommunicationConfiguration cfg;

    /** Resolves a cluster node by its id; returns {@code null} when the node is not found. */
    private final Function<UUID, ClusterNode> nodeGetter;

    /** Supplies the local node. */
    private final Supplier<ClusterNode> locNodeSupplier;

    /** Provides the node id message and the "handshake wait supported" flag. */
    private final ClusterStateProvider stateProvider;

    /** Pool of communication clients and pending client futures. Replaceable via {@link #setClientPool}. */
    private ConnectionClientPool clientPool;

    /** Gateway that must be entered before the first message of a session is processed. */
    private final ConnectGateway connectGate;

    /** Supplies the failure processor used by {@link #onFailure}. */
    private final Supplier<FailureProcessor> failureProcessorSupplier;

    /** Node attribute names (used here to read the paired-connection attribute). */
    private final AttributeNames attributeNames;

    /** Supplies the metrics listener notified about sent and received messages. */
    private final Supplier<TcpCommunicationMetricsListener> metricsLsnrSupplier;

    /** Latch consulted in {@link #onConnected}: while its count is non-zero a handshake wait message may be sent. */
    private final CountDownLatch ctxInitLatch;

    /** Supplies the Ignite instance (used to reach the configured discovery SPI). */
    private final Supplier<Ignite> igniteExSupplier;

    /** Upper-level listener that receives messages and disconnect notifications. */
    private final CommunicationListener<Message> lsnr;

    /** NIO server wrapper. Replaceable via {@link #setNioSrvWrapper}. */
    private volatile GridNioServerWrapper nioSrvWrapper;

    /** Worker that processes disconnect (reconnect) requests. Replaceable via {@link #communicationWorker}. */
    private volatile CommunicationWorker commWorker;

    /** Client flag; when {@code true} the node id message is always sent to accepted sessions. */
    private final boolean client;

    /** Set by {@link #stop()}; when {@code true} disconnects no longer trigger recovery handling. */
    private volatile boolean stopping = false;

    /**
     * @param log Logger.
     * @param cfg Communication configuration.
     * @param nodeGetter Node resolver by id.
     * @param locNodeSupplier Local node supplier.
     * @param stateProvider Cluster state provider.
     * @param clientPool Connection client pool.
     * @param commWorker Communication worker.
     * @param connectGate Connect gateway.
     * @param failureProcessorSupplier Failure processor supplier.
     * @param attributeNames Node attribute names.
     * @param metricsLsnrSupplier Metrics listener supplier.
     * @param nioSrvWrapper NIO server wrapper.
     * @param ctxInitLatch Context initialization latch.
     * @param client Client flag.
     * @param igniteExSupplier Ignite instance supplier.
     * @param lsnr Communication listener.
     */
    public InboundConnectionHandler(IgniteLogger log, TcpCommunicationConfiguration cfg, Function<UUID, ClusterNode> nodeGetter, Supplier<ClusterNode> locNodeSupplier, ClusterStateProvider stateProvider, ConnectionClientPool clientPool, CommunicationWorker commWorker, ConnectGateway connectGate, Supplier<FailureProcessor> failureProcessorSupplier, AttributeNames attributeNames, Supplier<TcpCommunicationMetricsListener> metricsLsnrSupplier, GridNioServerWrapper nioSrvWrapper, CountDownLatch ctxInitLatch, boolean client, Supplier<Ignite> igniteExSupplier, CommunicationListener<Message> lsnr) {
        this.log = log;
        this.cfg = cfg;
        this.nodeGetter = nodeGetter;
        this.locNodeSupplier = locNodeSupplier;
        this.stateProvider = stateProvider;
        this.clientPool = clientPool;
        this.commWorker = commWorker;
        this.connectGate = connectGate;
        this.failureProcessorSupplier = failureProcessorSupplier;
        this.attributeNames = attributeNames;
        this.metricsLsnrSupplier = metricsLsnrSupplier;
        this.nioSrvWrapper = nioSrvWrapper;
        this.ctxInitLatch = ctxInitLatch;
        this.client = client;
        this.igniteExSupplier = igniteExSupplier;
        this.lsnr = lsnr;
    }

    /**
     * @param nioSrvWrapper New NIO server wrapper.
     */
    public void setNioSrvWrapper(GridNioServerWrapper nioSrvWrapper) {
        this.nioSrvWrapper = nioSrvWrapper;
    }

    /**
     * @param pool New connection client pool.
     */
    public void setClientPool(ConnectionClientPool pool) {
        this.clientPool = pool;
    }

    /**
     * Logs a warning about the write timeout and closes the session.
     *
     * @param ses Session whose write timed out.
     */
    @Override
    public void onSessionWriteTimeout(GridNioSession ses) {
        LT.warn(this.log, "Communication SPI session write timed out (consider increasing 'socketWriteTimeout' configuration property) [remoteAddr=" + ses.remoteAddress() + ", writeTimeout=" + this.cfg.socketWriteTimeout() + ']');
        if (this.log.isDebugEnabled()) {
            this.log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() + ", writeTimeout=" + this.cfg.socketWriteTimeout() + ']');
        }
        ses.close();
    }

    /**
     * For an accepted session sends either the local node id message or a handshake wait
     * message; for an outgoing session only logs the fact of connection.
     *
     * @param ses Connected session.
     */
    @Override
    public void onConnected(GridNioSession ses) {
        if (!ses.accepted()) {
            if (this.log.isInfoEnabled()) {
                this.log.info("Established outgoing communication connection [locAddr=" + ses.localAddress() + ", rmtAddr=" + ses.remoteAddress() + ']');
            }
            return;
        }
        if (this.log.isInfoEnabled()) {
            this.log.info("Accepted incoming communication connection [locAddr=" + ses.localAddress() + ", rmtAddr=" + ses.remoteAddress() + ']');
        }
        try {
            // The node id is sent right away for clients, once the init latch reached zero,
            // or when handshake wait is not supported; otherwise the peer is asked to wait.
            if (this.client || this.ctxInitLatch.getCount() == 0L || !this.stateProvider.isHandshakeWaitSupported()) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Sending local node ID to newly accepted session: " + ses);
                }
                ses.sendNoFuture(this.stateProvider.nodeIdMessage(), null);
            } else {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Sending handshake wait message to newly accepted session: " + ses);
                }
                ses.sendNoFuture(new HandshakeWaitMessage(), null);
            }
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to send message: " + e, e);
        }
    }

    /**
     * Reports the sent message to the metrics listener when the session already has a
     * consistent id attached.
     *
     * @param ses Session.
     * @param msg Sent message.
     */
    @Override
    public void onMessageSent(GridNioSession ses, Message msg) {
        Object consistentId = ses.meta(TcpCommunicationSpi.CONSISTENT_ID_META);
        if (consistentId != null) {
            this.metricsLsnrSupplier.get().onMessageSent(msg, consistentId);
        }
    }

    /**
     * Handles a received message. A session without a connection key is treated as a new
     * incoming connection and the message is processed by {@link #onFirstMessage} under the
     * connect gateway. Otherwise recovery acknowledgements are processed in place and all
     * other messages are passed to the communication listener.
     *
     * @param ses Session.
     * @param msg Received message.
     */
    @Override
    public void onMessage(GridNioSession ses, Message msg) {
        Span span = MTC.span();
        span.addLog(() -> "Communication received");
        span.addTag("message", () -> TraceableMessagesTable.traceName(msg));
        ConnectionKey connKey = (ConnectionKey)ses.meta(TcpCommunicationSpi.CONN_IDX_META);
        if (connKey == null) {
            assert (ses.accepted()) : ses;
            if (!this.connectGate.tryEnter()) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Close incoming connection, failed to enter gateway.");
                }
                // Close the session only after the rejection has been sent.
                ses.send(new RecoveryLastReceivedMessage(NODE_STOPPING)).listen(sndFut -> ses.close());
                return;
            }
            try {
                this.onFirstMessage(ses, msg);
            }
            finally {
                this.connectGate.leave();
            }
        } else {
            IgniteRunnable c;
            Object consistentId = ses.meta(TcpCommunicationSpi.CONSISTENT_ID_META);
            assert (consistentId != null);
            if (msg instanceof RecoveryLastReceivedMessage) {
                // Acknowledgement from the remote side: update the outgoing recovery descriptor only.
                this.metricsLsnrSupplier.get().onMessageReceived(msg, consistentId);
                GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();
                if (recovery != null) {
                    RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg;
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() + ", connIdx=" + connKey.connectionIndex() + ", rcvCnt=" + msg0.received() + ']');
                    }
                    recovery.ackReceived(msg0.received());
                }
                return;
            }
            GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor();
            if (recovery != null) {
                long rcvCnt = recovery.onReceived();
                // Acknowledge every ackSendThreshold-th received message.
                if (rcvCnt % (long)this.cfg.ackSendThreshold() == 0L) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Send recovery acknowledgement [rmtNode=" + connKey.nodeId() + ", connIdx=" + connKey.connectionIndex() + ", rcvCnt=" + rcvCnt + ']');
                    }
                    ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt));
                    recovery.lastAcknowledged(rcvCnt);
                }
            } else if (connKey.dummy()) {
                // Dummy key: the session carries a connection-check future that is completed
                // with the remote node id, after which the session is closed.
                assert (msg instanceof NodeIdMessage) : msg;
                TcpCommunicationNodeConnectionCheckFuture checkFut = (TcpCommunicationNodeConnectionCheckFuture)ses.meta(TcpCommunicationConnectionCheckFuture.SES_FUT_META);
                assert (checkFut != null) : msg;
                checkFut.onConnected(U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0));
                this.nioSrvWrapper.nio().closeFromWorkerThread(ses);
                return;
            }
            this.metricsLsnrSupplier.get().onMessageReceived(msg, consistentId);
            if (this.cfg.messageQueueLimit() > 0) {
                // A per-session tracker is created lazily and passed to the listener as the callback.
                GridNioMessageTracker tracker = (GridNioMessageTracker)ses.meta(TRACKER_META);
                if (tracker == null) {
                    tracker = new GridNioMessageTracker(ses, this.cfg.messageQueueLimit());
                    GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker);
                    assert (old == null);
                }
                tracker.onMessageReceived();
                c = tracker;
            } else {
                c = CommunicationTcpUtils.NOOP;
            }
            this.lsnr.onMessage(connKey.nodeId(), msg, c);
        }
    }

    /**
     * Forwards the failure to the failure processor, if one is available.
     *
     * @param failureType Failure type.
     * @param failure Failure cause.
     */
    @Override
    public void onFailure(FailureType failureType, Throwable failure) {
        FailureProcessor failureProcessor = this.failureProcessorSupplier.get();
        if (failureProcessor != null) {
            failureProcessor.process(new FailureContext(failureType, failure));
        }
    }

    /**
     * Handles a closed session: removes the pool client bound to the session, schedules a
     * reconnect when unacknowledged messages remain for a node that is still alive, and
     * notifies the communication listener. Sessions without a connection key or with a
     * dummy key are ignored.
     *
     * @param ses Disconnected session.
     * @param e Disconnect cause, possibly {@code null}.
     */
    @Override
    public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
        ConnectionKey connId = (ConnectionKey)ses.meta(TcpCommunicationSpi.CONN_IDX_META);
        if (connId == null || connId.dummy()) {
            return;
        }
        UUID id = connId.nodeId();
        if (this.log.isDebugEnabled()) {
            String errMsg = e != null ? e.getMessage() : null;
            this.log.debug("The node was disconnected [nodeId=" + id + ", err=" + errMsg + "]");
        }
        GridCommunicationClient[] nodeClients = this.clientPool.clientFor(id);
        if (nodeClients != null) {
            for (GridCommunicationClient nodeClient : nodeClients) {
                // Only the client that wraps exactly this session is closed and removed.
                if (nodeClient instanceof GridTcpNioCommunicationClient && ((GridTcpNioCommunicationClient)nodeClient).session() == ses) {
                    nodeClient.close();
                    this.clientPool.removeNodeClient(id, nodeClient);
                }
            }
        }
        if (!this.stopping) {
            GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
            if (outDesc != null) {
                if (outDesc.nodeAlive(this.nodeGetter.apply(id))) {
                    if (!outDesc.messagesRequests().isEmpty()) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Session was closed but there are unacknowledged messages, will try to reconnect [rmtNode=" + outDesc.node().id() + ']');
                        }
                        DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(outDesc, connId.connectionIndex());
                        this.commWorker.addProcessDisconnectRequest(disconnectData);
                    }
                } else {
                    outDesc.onNodeLeft();
                }
            }
        }
        CommunicationListener<Message> lsnr0 = this.lsnr;
        if (lsnr0 != null) {
            lsnr0.onDisconnected(id);
        }
    }

    /**
     * Marks the handler as stopping; after that {@link #onDisconnected} no longer performs
     * recovery handling for closed sessions.
     */
    public void stop() {
        this.stopping = true;
    }

    /**
     * Processes the first message of an accepted session, which must be either a
     * {@link NodeIdMessage} or a {@link HandshakeMessage}. Resolves the sender, attaches the
     * consistent id and connection key to the session and then either accepts the
     * connection (reserving the recovery descriptor) or rejects it with a status code.
     * <p>
     * NOTE(review): this class was obtained from a decompiler which flagged a possible
     * behaviour change in the try/catch structure of this method; compare against the
     * authoritative source when available.
     *
     * @param ses Accepted session.
     * @param msg First message received on the session.
     */
    private void onFirstMessage(GridNioSession ses, Message msg) {
        ConnectionKey connKey;
        UUID sndId;
        if (msg instanceof NodeIdMessage) {
            sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0);
            connKey = new ConnectionKey(sndId, 0, -1L);
        } else {
            assert (msg instanceof HandshakeMessage) : msg;
            HandshakeMessage handshake = (HandshakeMessage)msg;
            sndId = handshake.nodeId();
            connKey = new ConnectionKey(sndId, handshake.connectionIndex(), handshake.connectCount());
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Remote node ID received: " + sndId);
        }
        ClusterNode rmtNode = this.nodeGetter.apply(sndId);
        if (rmtNode == null) {
            this.rejectSenderMissingFromTopology(ses, sndId);
            return;
        }
        ses.addMeta(TcpCommunicationSpi.CONSISTENT_ID_META, rmtNode.consistentId());
        ConnectionKey old = ses.addMeta(TcpCommunicationSpi.CONN_IDX_META, connKey);
        assert (old == null);
        ClusterNode locNode = this.locNodeSupplier.get();
        if (ses.remoteAddress() == null) {
            return;
        }
        assert (msg instanceof HandshakeMessage) : msg;
        HandshakeMessage msg0 = (HandshakeMessage)msg;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Received handshake message [locNodeId=" + locNode.id() + ", rmtNodeId=" + sndId + ", msg=" + msg0 + ']');
        }
        if (this.cfg.usePairedConnections() && CommunicationTcpUtils.usePairedConnections(rmtNode, this.attributeNames.pairedConnection())) {
            GridNioRecoveryDescriptor recoveryDesc = this.nioSrvWrapper.inRecoveryDescriptor(rmtNode, connKey);
            if (recoveryDesc.tryReserve()) {
                this.connectedNew(recoveryDesc, ses, true);
            } else {
                ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
                this.closeStaleConnections(connKey);
            }
            return;
        }
        assert (connKey.connectionIndex() >= 0) : connKey;
        GridCommunicationClient oldClient = this.clientAt(sndId, connKey.connectionIndex());
        boolean hasShmemClient = false;
        if (oldClient != null) {
            if (oldClient instanceof GridTcpNioCommunicationClient) {
                if (this.log.isInfoEnabled()) {
                    this.log.info("Received incoming connection when already connected to this node, rejecting [locNode=" + locNode.id() + ", rmtNode=" + sndId + ']');
                }
                ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
                this.closeStaleConnections(connKey);
                return;
            }
            assert (oldClient instanceof GridShmemCommunicationClient);
            hasShmemClient = true;
        }
        GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<GridCommunicationClient>();
        GridFutureAdapter<GridCommunicationClient> oldFut = this.clientPool.putIfAbsentFut(connKey, fut);
        GridNioRecoveryDescriptor recoveryDesc = this.nioSrvWrapper.inRecoveryDescriptor(rmtNode, connKey);
        if (oldFut == null) {
            try {
                // Re-check the pool now that our future is registered: a client may have
                // appeared between the first lookup and putIfAbsentFut.
                oldClient = this.clientAt(sndId, connKey.connectionIndex());
                if (oldClient != null) {
                    if (oldClient instanceof GridTcpNioCommunicationClient) {
                        assert (oldClient.connectionIndex() == connKey.connectionIndex()) : oldClient;
                        if (this.log.isInfoEnabled()) {
                            this.log.info("Received incoming connection when already connected to this node, rejecting [locNode=" + locNode.id() + ", rmtNode=" + sndId + ']');
                        }
                        ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
                        this.closeStaleConnections(connKey);
                        fut.onDone(oldClient);
                        return;
                    }
                    assert (oldClient instanceof GridShmemCommunicationClient);
                    hasShmemClient = true;
                }
                boolean reserved = recoveryDesc.tryReserve();
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Received incoming connection from remote node [rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ", recovery=" + recoveryDesc + ']');
                }
                if (reserved) {
                    GridTcpNioCommunicationClient newClient = this.connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
                    fut.onDone(newClient);
                } else {
                    // Reject only when the descriptor could not be reserved. Previously the
                    // rejection was sent unconditionally, i.e. also right after the
                    // connection had been accepted above.
                    ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
                    fut.onDone();
                }
            }
            catch (Throwable e) {
                // The failure is handed to waiters through the future; only the message is logged.
                this.log.warning("Communication connection isn't established due to exception [rmtNode=" + rmtNode.id() + ", err=" + e.getMessage() + ']');
                fut.onDone(e);
            }
            finally {
                this.clientPool.removeFut(connKey, fut);
            }
        } else if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
            // Both sides are connecting to each other: the node with the lower order keeps
            // its own outgoing attempt and rejects the incoming one.
            if (this.log.isInfoEnabled()) {
                this.log.info("Received incoming connection from remote node while connecting to this node, rejecting [locNode=" + locNode.id() + ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + ", rmtNodeOrder=" + rmtNode.order() + ']');
            }
            ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
        } else {
            try {
                if (recoveryDesc.tryReserve()) {
                    GridTcpNioCommunicationClient newClient = this.connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
                    oldFut.onDone(newClient);
                } else {
                    ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
                    oldFut.onDone();
                }
            }
            finally {
                this.clientPool.removeFut(connKey, oldFut);
            }
        }
    }

    /**
     * Replies to a sender that could not be resolved through the node getter and closes the
     * session once the reply is sent. The sender is told to wait when discovery knows it
     * (a TCP discovery client node of a sufficiently recent version, or a node reported as
     * known by an {@link IgniteDiscoverySpi}); otherwise it is reported as unknown.
     *
     * @param ses Accepted session.
     * @param sndId Sender node id.
     */
    private void rejectSenderMissingFromTopology(GridNioSession ses, UUID sndId) {
        DiscoverySpi discoverySpi = this.igniteExSupplier.get().configuration().getDiscoverySpi();
        boolean unknownNode = true;
        if (discoverySpi instanceof TcpDiscoverySpi) {
            ClusterNode node0 = ((TcpDiscoverySpi)discoverySpi).getNode0(sndId);
            if (node0 != null) {
                assert (node0.isClient()) : node0;
                if (node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0) {
                    unknownNode = false;
                }
            }
        } else if (discoverySpi instanceof IgniteDiscoverySpi) {
            unknownNode = !((IgniteDiscoverySpi)discoverySpi).knownNode(sndId);
        }
        if (unknownNode) {
            U.warn(this.log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']');
            ses.send(new RecoveryLastReceivedMessage(UNKNOWN_NODE)).listen(sndFut -> ses.close());
        } else {
            ses.send(new RecoveryLastReceivedMessage(NEED_WAIT)).listen(sndFut -> ses.close());
        }
    }

    /**
     * Looks up the pooled client of a node at the given connection index.
     *
     * @param nodeId Node id.
     * @param connIdx Connection index.
     * @return Client at that index, or {@code null} when the node has no clients, the index
     *      is beyond the client array, or the slot is empty.
     */
    @Nullable
    private GridCommunicationClient clientAt(UUID nodeId, int connIdx) {
        GridCommunicationClient[] clients = this.clientPool.clientFor(nodeId);
        return clients != null && connIdx < clients.length ? clients[connIdx] : null;
    }

    /**
     * Closes every NIO session that belongs to the same node and connection index as the
     * given key but has a lower connect count.
     *
     * @param connKey Key of the connection that supersedes the stale ones.
     */
    private void closeStaleConnections(ConnectionKey connKey) {
        for (GridNioSession ses0 : this.nioSrvWrapper.nio().sessions()) {
            ConnectionKey key0 = (ConnectionKey)ses0.meta(TcpCommunicationSpi.CONN_IDX_META);
            boolean stale = key0 != null
                && key0.nodeId().equals(connKey.nodeId())
                && key0.connectionIndex() == connKey.connectionIndex()
                && key0.connectCount() < connKey.connectCount();
            if (stale) {
                ses0.close();
            }
        }
    }

    /**
     * Completes acceptance of an incoming connection in non-paired mode: applies the
     * handshake to the recovery descriptor, binds it to the session for both directions,
     * resends pending messages, optionally sends the received-count reply and optionally
     * registers a new client in the pool.
     *
     * @param recovery Reserved recovery descriptor.
     * @param ses Accepted session.
     * @param node Remote node.
     * @param rcvCnt Received count reported by the remote side in its handshake.
     * @param sndRes Whether to send the {@link RecoveryLastReceivedMessage} reply.
     * @param createClient Whether to create and register a client for the session.
     * @return Created client, or {@code null} when {@code createClient} is {@code false}.
     */
    private GridTcpNioCommunicationClient connected(GridNioRecoveryDescriptor recovery, GridNioSession ses, ClusterNode node, long rcvCnt, boolean sndRes, boolean createClient) {
        ConnectionKey connKey = (ConnectionKey)ses.meta(TcpCommunicationSpi.CONN_IDX_META);
        assert (connKey != null && connKey.connectionIndex() >= 0) : connKey;
        assert (!this.cfg.usePairedConnections() || !CommunicationTcpUtils.usePairedConnections(node, this.attributeNames.pairedConnection()));
        recovery.onHandshake(rcvCnt);
        ses.inRecoveryDescriptor(recovery);
        ses.outRecoveryDescriptor(recovery);
        this.nioSrvWrapper.nio().resend(ses);
        try {
            if (sndRes) {
                this.nioSrvWrapper.nio().sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
            }
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to send message: " + e, e);
        }
        recovery.onConnected();
        if (!createClient) {
            return null;
        }
        GridTcpNioCommunicationClient client = new GridTcpNioCommunicationClient(connKey.connectionIndex(), ses, this.log);
        this.clientPool.addNodeClient(node, connKey.connectionIndex(), client);
        return client;
    }

    /**
     * Completes acceptance of an incoming connection in paired mode: binds the descriptor
     * as the session's incoming recovery descriptor and optionally sends the received-count
     * reply. If sending fails the error is logged and the descriptor is not marked as
     * connected.
     *
     * @param recovery Reserved recovery descriptor.
     * @param ses Accepted session.
     * @param sndRes Whether to send the {@link RecoveryLastReceivedMessage} reply.
     */
    private void connectedNew(GridNioRecoveryDescriptor recovery, GridNioSession ses, boolean sndRes) {
        try {
            ses.inRecoveryDescriptor(recovery);
            if (sndRes) {
                this.nioSrvWrapper.nio().sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
            }
            recovery.onConnected();
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to send message: " + e, e);
        }
    }

    /**
     * @param commWorker New communication worker.
     */
    public void communicationWorker(CommunicationWorker commWorker) {
        this.commWorker = commWorker;
    }
}

