/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.util.nio;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.NoopSpan;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanType;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.nio.GridDirectParser;
import org.apache.ignite.internal.util.nio.GridNioException;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
import org.apache.ignite.internal.util.nio.GridNioFilterChain;
import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
import org.apache.ignite.internal.util.nio.GridNioKeyAttachment;
import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.nio.GridNioWorker;
import org.apache.ignite.internal.util.nio.GridSelectorNioSessionImpl;
import org.apache.ignite.internal.util.nio.SelectedSelectionKeySet;
import org.apache.ignite.internal.util.nio.SessionWriteRequest;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.GridWorkerListener;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;

public class GridNioServer<T> {
    public static final String IGNITE_IO_BALANCE_RANDOM_BALANCE = "IGNITE_IO_BALANCE_RANDOM_BALANCER";
    public static final int DFLT_SES_WRITE_TIMEOUT = 5000;
    public static final int DFLT_SEND_QUEUE_LIMIT = 0;
    private static final long ERR_WAIT_TIME = 2000L;
    private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
    private static final int BUF_SSL_SYSTEM_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
    private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
    public static final int RECOVERY_DESC_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
    private static final int WORKER_IDX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
    private static final int REQUESTS_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
    private static final boolean DISABLE_KEYSET_OPTIMIZATION = IgniteSystemProperties.getBoolean("IGNITE_NO_SELECTOR_OPTS");
    public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME = "outboundMessagesQueueSize";
    public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC = "Number of messages waiting to be sent";
    public static final String RECEIVED_BYTES_METRIC_NAME = "receivedBytes";
    public static final String RECEIVED_BYTES_METRIC_DESC = "Total number of bytes received by current node";
    public static final String SENT_BYTES_METRIC_NAME = "sentBytes";
    public static final String SENT_BYTES_METRIC_DESC = "Total number of bytes sent by current node";
    private long selectorSpins;
    @GridToStringExclude
    private final GridNioAcceptWorker acceptWorker;
    private final IgniteThread[] clientThreads;
    private final List<AbstractNioClientWorker> clientWorkers;
    private final GridNioFilterChain<T> filterChain;
    private final GridNioServerListener<T> lsnr;
    @GridToStringExclude
    private final IgniteLogger log;
    private volatile boolean closed;
    private final boolean directBuf;
    @GridToStringExclude
    private int readBalanceIdx;
    @GridToStringExclude
    private int writeBalanceIdx = 1;
    private final boolean tcpNoDelay;
    private final int sockSndBuf;
    private final int sockRcvBuf;
    private volatile long writeTimeout = 5000L;
    private volatile long idleTimeout = 7000L;
    private boolean skipWrite;
    private boolean skipRead;
    private final InetSocketAddress locAddr;
    private final ByteOrder order;
    private final int sndQueueLimit;
    private final boolean directMode;
    @Nullable
    private final MetricRegistry mreg;
    @Nullable
    private final LongAdderMetric rcvdBytesCntMetric;
    @Nullable
    private final LongAdderMetric sentBytesCntMetric;
    @Nullable
    private final LongAdderMetric outboundMessagesQueueSizeMetric;
    private final GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions = new GridConcurrentHashSet();
    private GridNioSslFilter sslFilter;
    @GridToStringExclude
    private GridNioMessageWriterFactory writerFactory;
    @GridToStringExclude
    private IgnitePredicate<Message> skipRecoveryPred;
    private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
    private final AtomicLong readerMoveCnt = new AtomicLong();
    private final AtomicLong writerMoveCnt = new AtomicLong();
    private final IgniteRunnable balancer;
    private final boolean readWriteSelectorsAssign;
    private Tracing tracing;

    private GridNioServer(InetAddress addr, int port, IgniteLogger log, int selectorCnt, @Nullable String igniteInstanceName, @Nullable String srvName, long selectorSpins, boolean tcpNoDelay, boolean directBuf, ByteOrder order, GridNioServerListener<T> lsnr, int sockSndBuf, int sockRcvBuf, int sndQueueLimit, boolean directMode, boolean daemon, GridNioMessageWriterFactory writerFactory, IgnitePredicate<Message> skipRecoveryPred, IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr, boolean readWriteSelectorsAssign, @Nullable GridWorkerListener workerLsnr, @Nullable MetricRegistry mreg, Tracing tracing, GridNioFilter ... filters) throws IgniteCheckedException {
        if (port != -1) {
            A.notNull(addr, "addr");
        }
        A.notNull(lsnr, "lsnr");
        A.notNull(log, "log");
        A.notNull(order, "order");
        A.ensure(port == -1 || port > 0 && port < 65535, "port");
        A.ensure(selectorCnt > 0, "selectorCnt");
        A.ensure(sockRcvBuf >= 0, "sockRcvBuf");
        A.ensure(sockSndBuf >= 0, "sockSndBuf");
        A.ensure(sndQueueLimit >= 0, "sndQueueLimit");
        this.log = log;
        this.directBuf = directBuf;
        this.order = order;
        this.tcpNoDelay = tcpNoDelay;
        this.sockRcvBuf = sockRcvBuf;
        this.sockSndBuf = sockSndBuf;
        this.sndQueueLimit = sndQueueLimit;
        this.msgQueueLsnr = msgQueueLsnr;
        this.selectorSpins = selectorSpins;
        this.readWriteSelectorsAssign = readWriteSelectorsAssign;
        this.lsnr = lsnr;
        this.tracing = tracing == null ? new NoopTracing() : tracing;
        this.filterChain = new GridNioFilterChain<T>(log, lsnr, new HeadFilter(), filters);
        if (directMode) {
            for (GridNioFilter filter : filters) {
                if (!(filter instanceof GridNioSslFilter)) continue;
                this.sslFilter = (GridNioSslFilter)filter;
                assert (this.sslFilter.directMode());
            }
        }
        if (port != -1) {
            this.locAddr = new InetSocketAddress(addr, port);
            Selector acceptSelector = this.createSelector(this.locAddr);
            String threadName = srvName == null ? "nio-acceptor" : "nio-acceptor-" + srvName;
            this.acceptWorker = new GridNioAcceptWorker(igniteInstanceName, threadName, log, acceptSelector, workerLsnr);
        } else {
            this.locAddr = null;
            this.acceptWorker = null;
        }
        this.clientWorkers = new ArrayList<AbstractNioClientWorker>(selectorCnt);
        this.clientThreads = new IgniteThread[selectorCnt];
        for (int i = 0; i < selectorCnt; ++i) {
            String threadName = srvName == null ? "grid-nio-worker-" + i : "grid-nio-worker-" + srvName + "-" + i;
            AbstractNioClientWorker worker = directMode ? new DirectNioClientWorker(i, igniteInstanceName, threadName, log, workerLsnr) : new ByteBufferNioClientWorker(i, igniteInstanceName, threadName, log, workerLsnr);
            this.clientWorkers.add(worker);
            this.clientThreads[i] = new IgniteThread(worker);
            this.clientThreads[i].setDaemon(daemon);
        }
        this.directMode = directMode;
        this.writerFactory = writerFactory;
        this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.alwaysFalse();
        long balancePeriod = IgniteSystemProperties.getLong("IGNITE_IO_BALANCE_PERIOD", 5000L);
        IgniteRunnable balancer0 = null;
        if (balancePeriod > 0L) {
            boolean rndBalance = IgniteSystemProperties.getBoolean(IGNITE_IO_BALANCE_RANDOM_BALANCE, false);
            balancer0 = rndBalance ? new RandomBalancer() : (readWriteSelectorsAssign ? new ReadWriteSizeBasedBalancer(balancePeriod) : new SizeBasedBalancer(balancePeriod));
        }
        this.balancer = balancer0;
        this.mreg = mreg;
        this.rcvdBytesCntMetric = mreg == null ? null : mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME, RECEIVED_BYTES_METRIC_DESC);
        this.sentBytesCntMetric = mreg == null ? null : mreg.longAdderMetric(SENT_BYTES_METRIC_NAME, SENT_BYTES_METRIC_DESC);
        this.outboundMessagesQueueSizeMetric = mreg == null ? null : mreg.longAdderMetric(OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME, OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC);
    }

    public long readerMoveCount() {
        return this.readerMoveCnt.get();
    }

    public long writerMoveCount() {
        return this.writerMoveCnt.get();
    }

    public int port() {
        return this.locAddr != null ? this.locAddr.getPort() : -1;
    }

    public static <T> Builder<T> builder() {
        return new Builder();
    }

    public void start() {
        this.filterChain.start();
        if (this.acceptWorker != null) {
            new IgniteThread(this.acceptWorker).start();
        }
        for (IgniteThread thread : this.clientThreads) {
            thread.start();
        }
    }

    public void stop() {
        if (!this.closed) {
            this.closed = true;
            U.cancel(this.acceptWorker);
            U.join(this.acceptWorker, this.log);
            U.cancel(this.clientWorkers);
            U.join(this.clientWorkers, this.log);
            this.filterChain.stop();
            for (GridSelectorNioSessionImpl ses : this.sessions) {
                ses.onServerStopped();
            }
        }
    }

    public InetSocketAddress localAddress() {
        return this.locAddr;
    }

    public long selectorSpins() {
        return this.selectorSpins;
    }

    public GridNioFuture<Boolean> close(GridNioSession ses) {
        assert (ses instanceof GridSelectorNioSessionImpl) : ses;
        GridSelectorNioSessionImpl impl2 = (GridSelectorNioSessionImpl)ses;
        if (impl2.closed()) {
            return new GridNioFinishedFuture<Boolean>(false);
        }
        NioOperationFuture<Boolean> fut = new NioOperationFuture<Boolean>(impl2, NioOperation.CLOSE);
        impl2.offerStateChange(fut);
        return fut;
    }

    public void closeFromWorkerThread(GridNioSession ses) {
        assert (ses instanceof GridSelectorNioSessionImpl) : ses;
        GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
        ((AbstractNioClientWorker)ses0.worker()).close((GridSelectorNioSessionImpl)ses, null);
    }

    GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
        assert (ses instanceof GridSelectorNioSessionImpl) : ses;
        GridSelectorNioSessionImpl impl2 = (GridSelectorNioSessionImpl)ses;
        if (createFut) {
            NioOperationFuture fut = new NioOperationFuture(impl2, NioOperation.REQUIRE_WRITE, msg, ackC);
            this.send0(impl2, fut, false);
            return fut;
        }
        WriteRequestImpl req = new WriteRequestImpl(ses, msg, true, ackC);
        this.send0(impl2, req, false);
        return null;
    }

    GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
        assert (ses instanceof GridSelectorNioSessionImpl);
        GridSelectorNioSessionImpl impl2 = (GridSelectorNioSessionImpl)ses;
        if (createFut) {
            NioOperationFuture fut = new NioOperationFuture(impl2, NioOperation.REQUIRE_WRITE, msg, this.skipRecoveryPred.apply(msg), ackC);
            this.send0(impl2, fut, false);
            return fut;
        }
        WriteRequestImpl req = new WriteRequestImpl(ses, msg, this.skipRecoveryPred.apply(msg), ackC);
        this.send0(impl2, req, false);
        return null;
    }

    private void send0(GridSelectorNioSessionImpl ses, SessionWriteRequest req, boolean sys) throws IgniteCheckedException {
        AbstractNioClientWorker worker;
        int msgCnt;
        assert (ses != null);
        assert (req != null);
        int n = msgCnt = sys ? ses.offerSystemFuture(req) : ses.offerFuture(req);
        if (ses.closed()) {
            if (ses.removeFuture(req)) {
                IOException err = new IOException("Failed to send message (connection was closed): " + ses);
                req.onError(err);
                if (!(req instanceof GridNioFuture)) {
                    throw new IgniteCheckedException(err);
                }
            }
        } else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true) && (worker = (AbstractNioClientWorker)ses.worker()) != null) {
            worker.offer((SessionChangeRequest)((Object)req));
        }
        if (this.msgQueueLsnr != null) {
            this.msgQueueLsnr.apply(ses, msgCnt);
        }
    }

    public void sendSystem(GridNioSession ses, Message msg) throws IgniteCheckedException {
        this.sendSystem(ses, msg, null);
    }

    public void sendSystem(GridNioSession ses, Message msg, @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) throws IgniteCheckedException {
        assert (ses instanceof GridSelectorNioSessionImpl);
        GridSelectorNioSessionImpl impl2 = (GridSelectorNioSessionImpl)ses;
        if (lsnr != null) {
            NioOperationFuture fut = new NioOperationFuture(impl2, NioOperation.REQUIRE_WRITE, msg, this.skipRecoveryPred.apply(msg), null);
            fut.listen(lsnr);
            assert (!fut.isDone());
            this.send0(impl2, fut, true);
        } else {
            WriteRequestSystemImpl req = new WriteRequestSystemImpl(ses, msg);
            this.send0(impl2, req, true);
        }
    }

    public void resend(GridNioSession ses) {
        assert (ses instanceof GridSelectorNioSessionImpl);
        GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
        if (recoveryDesc != null && !recoveryDesc.messagesRequests().isEmpty()) {
            Deque<SessionWriteRequest> futs = recoveryDesc.messagesRequests();
            if (this.log.isDebugEnabled()) {
                this.log.debug("Resend messages [rmtNode=" + recoveryDesc.node().id() + ", msgCnt=" + futs.size() + ']');
            }
            GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
            SessionWriteRequest fut0 = futs.iterator().next();
            for (SessionWriteRequest fut : futs) {
                fut.messageThread(true);
                fut.resetSession(ses0);
            }
            ses0.resend(futs);
            ses0.offerStateChange((SessionChangeRequest)((Object)fut0));
        }
    }

    public Collection<? extends GridNioSession> sessions() {
        return this.sessions;
    }

    public List<AbstractNioClientWorker> workers() {
        return this.clientWorkers;
    }

    private void moveSession(GridNioSession ses, int from, int to) {
        assert (from >= 0 && from < this.clientWorkers.size()) : from;
        assert (to >= 0 && to < this.clientWorkers.size()) : to;
        assert (from != to);
        GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
        SessionMoveFuture fut = new SessionMoveFuture(ses0, to);
        if (!ses0.offerMove(this.clientWorkers.get(from), fut)) {
            fut.onDone(false);
        }
    }

    private GridNioFuture<?> pauseResumeReads(GridNioSession ses, NioOperation op) {
        assert (ses instanceof GridSelectorNioSessionImpl);
        assert (op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ);
        GridSelectorNioSessionImpl impl2 = (GridSelectorNioSessionImpl)ses;
        if (impl2.closed()) {
            return new GridNioFinishedFuture(new IOException("Failed to pause/resume reads (connection was closed): " + ses));
        }
        NioOperationFuture fut = new NioOperationFuture(impl2, op);
        impl2.offerStateChange(fut);
        return fut;
    }

    public IgniteInternalFuture<String> dumpStats() {
        String msg = "NIO server statistics [readerSesBalanceCnt=" + this.readerMoveCnt.get() + ", writerSesBalanceCnt=" + this.writerMoveCnt.get() + ']';
        return this.dumpStats(msg, null);
    }

    public IgniteInternalFuture<String> dumpStats(final String msg, IgnitePredicate<GridNioSession> p) {
        GridCompoundFuture<String, String> fut = new GridCompoundFuture<String, String>(new IgniteReducer<String, String>(){
            private final StringBuilder sb;
            {
                this.sb = new StringBuilder(msg);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public boolean collect(@Nullable String msg2) {
                if (!F.isEmpty(msg2)) {
                    StringBuilder stringBuilder = this.sb;
                    synchronized (stringBuilder) {
                        if (this.sb.length() > 0) {
                            this.sb.append(U.nl());
                        }
                        this.sb.append(msg2);
                    }
                }
                return true;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public String reduce() {
                StringBuilder stringBuilder = this.sb;
                synchronized (stringBuilder) {
                    return this.sb.toString();
                }
            }
        });
        for (int i = 0; i < this.clientWorkers.size(); ++i) {
            NioOperationFuture opFut = new NioOperationFuture(null, NioOperation.DUMP_STATS);
            opFut.msg = p;
            this.clientWorkers.get(i).offer(opFut);
            fut.add(opFut);
        }
        fut.markInitialized();
        return fut;
    }

    public IgniteInternalFuture<String> dumpNodeStats(final String msg, IgnitePredicate<GridNioSession> p) {
        GridCompoundFuture<String, String> fut = new GridCompoundFuture<String, String>(new IgniteReducer<String, String>(){
            private final StringBuilder sb;
            {
                this.sb = new StringBuilder(msg);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public boolean collect(@Nullable String msg2) {
                if (!F.isEmpty(msg2)) {
                    StringBuilder stringBuilder = this.sb;
                    synchronized (stringBuilder) {
                        if (this.sb.length() > 0) {
                            this.sb.append(U.nl());
                        }
                        this.sb.append(msg2);
                    }
                }
                return true;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public String reduce() {
                StringBuilder stringBuilder = this.sb;
                synchronized (stringBuilder) {
                    return this.sb.toString();
                }
            }
        });
        for (int i = 0; i < this.clientWorkers.size(); ++i) {
            NioOperationFuture opFut = new NioOperationFuture(null, NioOperation.DUMP_STATS);
            opFut.msg = p;
            this.clientWorkers.get(i).offer(opFut);
            fut.add(opFut);
        }
        fut.markInitialized();
        return fut;
    }

    public GridNioFuture<GridNioSession> createSession(SocketChannel ch, @Nullable Map<Integer, Object> meta, boolean async, @Nullable IgniteInClosure<? super IgniteInternalFuture<GridNioSession>> lsnr) {
        try {
            if (!this.closed) {
                ch.configureBlocking(false);
                NioOperationFuture<GridNioSession> req = new NioOperationFuture<GridNioSession>(ch, false, meta);
                if (async) {
                    assert (meta != null);
                    ((NioOperationFuture)req).op = NioOperation.CONNECT;
                }
                if (lsnr != null) {
                    req.listen(lsnr);
                }
                this.offerBalanced(req, meta);
                return req;
            }
            return new GridNioFinishedFuture<GridNioSession>(new GridNioException("Failed to create session, server is stopped."));
        }
        catch (IOException e) {
            return new GridNioFinishedFuture<GridNioSession>(e);
        }
    }

    public GridNioFuture<GridNioSession> cancelConnect(SocketChannel ch, Map<Integer, ?> meta) {
        if (!this.closed) {
            NioOperationFuture<GridNioSession> req = new NioOperationFuture<GridNioSession>(ch, false, meta);
            ((NioOperationFuture)req).op = NioOperation.CANCEL_CONNECT;
            Integer idx = (Integer)meta.get(WORKER_IDX_META_KEY);
            assert (idx != null) : meta;
            this.clientWorkers.get(idx).offer(req);
            return req;
        }
        return new GridNioFinishedFuture<GridNioSession>(new IgniteCheckedException("Failed to cancel connection, server is stopped."));
    }

    public long writeTimeout() {
        return this.writeTimeout;
    }

    public void writeTimeout(long writeTimeout) {
        this.writeTimeout = writeTimeout;
    }

    public long idleTimeout() {
        return this.idleTimeout;
    }

    public void idleTimeout(long idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    private Selector createSelector(@Nullable SocketAddress addr) throws IgniteCheckedException {
        AbstractSelector selector = null;
        ServerSocketChannel srvrCh = null;
        try {
            selector = SelectorProvider.provider().openSelector();
            if (addr != null) {
                srvrCh = ServerSocketChannel.open();
                srvrCh.configureBlocking(false);
                if (this.sockRcvBuf > 0) {
                    srvrCh.socket().setReceiveBufferSize(this.sockRcvBuf);
                }
                srvrCh.socket().bind(addr);
                srvrCh.register(selector, 16);
            }
            return selector;
        }
        catch (Throwable e) {
            U.close(srvrCh, this.log);
            U.close(selector, this.log);
            if (e instanceof Error) {
                throw (Error)e;
            }
            throw new IgniteCheckedException("Failed to initialize NIO selector.", e);
        }
    }

    private synchronized void offerBalanced(NioOperationFuture req, @Nullable Map<Integer, Object> meta) {
        int balanceIdx;
        assert (req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT) : req;
        assert (req.socketChannel() != null) : req;
        int workers = this.clientWorkers.size();
        if (workers > 1) {
            if (this.readWriteSelectorsAssign) {
                if (req.accepted()) {
                    balanceIdx = this.readBalanceIdx;
                    this.readBalanceIdx += 2;
                    if (this.readBalanceIdx >= workers) {
                        this.readBalanceIdx = 0;
                    }
                } else {
                    balanceIdx = this.writeBalanceIdx;
                    this.writeBalanceIdx += 2;
                    if (this.writeBalanceIdx >= workers) {
                        this.writeBalanceIdx = 1;
                    }
                }
            } else {
                balanceIdx = this.readBalanceIdx++;
                if (this.readBalanceIdx >= workers) {
                    this.readBalanceIdx = 0;
                }
            }
        } else {
            balanceIdx = 0;
        }
        if (meta != null) {
            meta.put(WORKER_IDX_META_KEY, balanceIdx);
        }
        this.clientWorkers.get(balanceIdx).offer(req);
    }

    private void stopPollingForWrite(SelectionKey key, GridSelectorNioSessionImpl ses) {
        if (ses.procWrite.get()) {
            ses.procWrite.set(false);
            if (ses.writeQueue().isEmpty()) {
                if ((key.interestOps() & 4) != 0) {
                    key.interestOps(key.interestOps() & 0xFFFFFFFB);
                }
            } else {
                ses.procWrite.set(true);
            }
        }
    }

    public String toString() {
        return S.toString(GridNioServer.class, this);
    }

    private void onRequestsWritten(GridSelectorNioSessionImpl ses, List<SessionWriteRequest> requests) {
        for (SessionWriteRequest request : requests) {
            request.onMessageWritten();
            this.onMessageWritten(ses, (Message)request.message());
        }
    }

    private void onMessageWritten(GridSelectorNioSessionImpl ses, Message msg) {
        if (this.lsnr != null) {
            this.lsnr.onMessageSent(ses, msg);
        }
    }

    public int outboundMessagesQueueSize() {
        if (this.outboundMessagesQueueSizeMetric == null) {
            return -1;
        }
        return (int)this.outboundMessagesQueueSizeMetric.value();
    }

    static interface SessionChangeRequest {
        public GridNioSession session();

        public NioOperation operation();
    }

    private class RandomBalancer
    implements IgniteRunnable {
        private static final long serialVersionUID = 0L;

        private RandomBalancer() {
        }

        @Override
        public void run() {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();
            int w1 = rnd.nextInt(GridNioServer.this.clientWorkers.size());
            if (((AbstractNioClientWorker)GridNioServer.this.clientWorkers.get(w1)).workerSessions.isEmpty()) {
                return;
            }
            int w2 = rnd.nextInt(GridNioServer.this.clientWorkers.size());
            while (w2 == w1) {
                w2 = rnd.nextInt(GridNioServer.this.clientWorkers.size());
            }
            GridNioSession ses = this.randomSession((AbstractNioClientWorker)GridNioServer.this.clientWorkers.get(w1));
            if (ses != null) {
                if (GridNioServer.this.log.isInfoEnabled()) {
                    GridNioServer.this.log.info("Move session [from=" + w1 + ", to=" + w2 + ", ses=" + ses + ']');
                }
                GridNioServer.this.moveSession(ses, w1, w2);
            }
        }

        private GridNioSession randomSession(AbstractNioClientWorker worker) {
            GridConcurrentHashSet sessions = worker.workerSessions;
            int size = sessions.size();
            if (size == 0) {
                return null;
            }
            int idx = ThreadLocalRandom.current().nextInt(size);
            Iterator it = sessions.iterator();
            int cnt = 0;
            while (it.hasNext()) {
                GridNioSession ses = (GridNioSession)it.next();
                if (cnt != idx) continue;
                return ses;
            }
            return null;
        }
    }

    private class SizeBasedBalancer
    implements IgniteRunnable {
        private static final long serialVersionUID = 0L;
        private long lastBalance;
        private final long balancePeriod;

        SizeBasedBalancer(long balancePeriod) {
            this.balancePeriod = balancePeriod;
        }

        @Override
        public void run() {
            long now = U.currentTimeMillis();
            if (this.lastBalance + this.balancePeriod < now) {
                int i;
                this.lastBalance = now;
                long maxBytes0 = -1L;
                long minBytes0 = -1L;
                int maxBytesIdx = -1;
                int minBytesIdx = -1;
                for (i = 0; i < GridNioServer.this.clientWorkers.size(); ++i) {
                    AbstractNioClientWorker worker = (AbstractNioClientWorker)GridNioServer.this.clientWorkers.get(i);
                    int sesCnt = worker.workerSessions.size();
                    long bytes0 = worker.bytesRcvd0 + worker.bytesSent0;
                    if ((maxBytes0 == -1L || bytes0 > maxBytes0) && bytes0 > 0L && sesCnt > 1) {
                        maxBytes0 = bytes0;
                        maxBytesIdx = i;
                    }
                    if (minBytes0 != -1L && bytes0 >= minBytes0) continue;
                    minBytes0 = bytes0;
                    minBytesIdx = i;
                }
                if (GridNioServer.this.log.isDebugEnabled()) {
                    GridNioServer.this.log.debug("Balancing data [min0=" + minBytes0 + ", minIdx=" + minBytesIdx + ", max0=" + maxBytes0 + ", maxIdx=" + maxBytesIdx + ']');
                }
                if (maxBytes0 != -1L && minBytes0 != -1L) {
                    long bytesDiff;
                    GridSelectorNioSessionImpl ses = null;
                    long delta = bytesDiff = maxBytes0 - minBytes0;
                    double threshold = (double)bytesDiff * 0.9;
                    GridConcurrentHashSet sessions = ((AbstractNioClientWorker)GridNioServer.this.clientWorkers.get(maxBytesIdx)).workerSessions;
                    for (GridSelectorNioSessionImpl ses0 : sessions) {
                        long bytesSent0 = ses0.bytesSent0();
                        if (!((double)bytesSent0 < threshold) || ses != null && delta <= U.safeAbs(bytesSent0 - bytesDiff / 2L)) continue;
                        ses = ses0;
                        delta = U.safeAbs(bytesSent0 - bytesDiff / 2L);
                    }
                    if (ses != null) {
                        if (GridNioServer.this.log.isDebugEnabled()) {
                            GridNioServer.this.log.debug("Will move session to less loaded worker [ses=" + ses + ", from=" + maxBytesIdx + ", to=" + minBytesIdx + ']');
                        }
                        GridNioServer.this.moveSession(ses, maxBytesIdx, minBytesIdx);
                    } else if (GridNioServer.this.log.isDebugEnabled()) {
                        GridNioServer.this.log.debug("Unable to find session to move.");
                    }
                }
                for (i = 0; i < GridNioServer.this.clientWorkers.size(); ++i) {
                    AbstractNioClientWorker worker = (AbstractNioClientWorker)GridNioServer.this.clientWorkers.get(i);
                    worker.reset0();
                }
            }
        }
    }

    private class ReadWriteSizeBasedBalancer
    implements IgniteRunnable {
        private static final long serialVersionUID = 0L;
        private long lastBalance;
        private final long balancePeriod;

        ReadWriteSizeBasedBalancer(long balancePeriod) {
            this.balancePeriod = balancePeriod;
        }

        @Override
        public void run() {
            long now = U.currentTimeMillis();
            if (this.lastBalance + this.balancePeriod < now) {
                GridConcurrentHashSet sessions;
                double threshold;
                long delta;
                int i;
                this.lastBalance = now;
                long maxRcvd0 = -1L;
                long minRcvd0 = -1L;
                long maxSent0 = -1L;
                long minSent0 = -1L;
                int maxRcvdIdx = -1;
                int minRcvdIdx = -1;
                int maxSentIdx = -1;
                int minSentIdx = -1;
                for (i = 0; i < GridNioServer.this.clientWorkers.size(); ++i) {
                    AbstractNioClientWorker worker = (AbstractNioClientWorker)GridNioServer.this.clientWorkers.get(i);
                    int sesCnt = worker.workerSessions.size();
                    if (i % 2 == 0) {
                        long bytesRcvd0 = worker.bytesRcvd0;
                        if ((maxRcvd0 == -1L || bytesRcvd0 > maxRcvd0) && bytesRcvd0 > 0L && sesCnt > 1) {
                            maxRcvd0 = bytesRcvd0;
                            maxRcvdIdx = i;
                        }
                        if (minRcvd0 != -1L && bytesRcvd0 >= minRcvd0) continue;
                        minRcvd0 = bytesRcvd0;
                        minRcvdIdx = i;
                        continue;
                    }
                    long bytesSent0 = worker.bytesSent0;
                    if ((maxSent0 == -1L || bytesSent0 > maxSent0) && bytesSent0 > 0L && sesCnt > 1) {
                        maxSent0 = bytesSent0;
                        maxSentIdx = i;
                    }
                    if (minSent0 != -1L && bytesSent0 >= minSent0) continue;
                    minSent0 = bytesSent0;
                    minSentIdx = i;
                }
                if (GridNioServer.this.log.isDebugEnabled()) {
                    GridNioServer.this.log.debug("Balancing data [minSent0=" + minSent0 + ", minSentIdx=" + minSentIdx + ", maxSent0=" + maxSent0 + ", maxSentIdx=" + maxSentIdx + ", minRcvd0=" + minRcvd0 + ", minRcvdIdx=" + minRcvdIdx + ", maxRcvd0=" + maxRcvd0 + ", maxRcvdIdx=" + maxRcvdIdx + ']');
                }
                if (maxSent0 != -1L && minSent0 != -1L) {
                    long sentDiff;
                    GridSelectorNioSessionImpl ses = null;
                    delta = sentDiff = maxSent0 - minSent0;
                    threshold = (double)sentDiff * 0.9;
                    sessions = ((AbstractNioClientWorker)GridNioServer.this.clientWorkers.get(maxSentIdx)).workerSessions;
                    for (GridSelectorNioSessionImpl ses0 : sessions) {
                        long bytesSent0 = ses0.bytesSent0();
                        if (!((double)bytesSent0 < threshold) || ses != null && delta <= U.safeAbs(bytesSent0 - sentDiff / 2L)) continue;
                        ses = ses0;
                        delta = U.safeAbs(bytesSent0 - sentDiff / 2L);
                    }
                    if (ses != null) {
                        if (GridNioServer.this.log.isDebugEnabled()) {
                            GridNioServer.this.log.debug("Will move session to less loaded writer [ses=" + ses + ", from=" + maxSentIdx + ", to=" + minSentIdx + ']');
                        }
                        GridNioServer.this.moveSession(ses, maxSentIdx, minSentIdx);
                    } else if (GridNioServer.this.log.isDebugEnabled()) {
                        GridNioServer.this.log.debug("Unable to find session to move for writers.");
                    }
                }
                if (maxRcvd0 != -1L && minRcvd0 != -1L) {
                    long rcvdDiff;
                    GridSelectorNioSessionImpl ses = null;
                    delta = rcvdDiff = maxRcvd0 - minRcvd0;
                    threshold = (double)rcvdDiff * 0.9;
                    sessions = ((AbstractNioClientWorker)GridNioServer.this.clientWorkers.get(maxRcvdIdx)).workerSessions;
                    for (GridSelectorNioSessionImpl ses0 : sessions) {
                        long bytesRcvd0 = ses0.bytesReceived0();
                        if (!((double)bytesRcvd0 < threshold) || ses != null && delta <= U.safeAbs(bytesRcvd0 - rcvdDiff / 2L)) continue;
                        ses = ses0;
                        delta = U.safeAbs(bytesRcvd0 - rcvdDiff / 2L);
                    }
                    if (ses != null) {
                        if (GridNioServer.this.log.isDebugEnabled()) {
                            GridNioServer.this.log.debug("Will move session to less loaded reader [ses=" + ses + ", from=" + maxRcvdIdx + ", to=" + minRcvdIdx + ']');
                        }
                        GridNioServer.this.moveSession(ses, maxRcvdIdx, minRcvdIdx);
                    } else if (GridNioServer.this.log.isDebugEnabled()) {
                        GridNioServer.this.log.debug("Unable to find session to move for readers.");
                    }
                }
                for (i = 0; i < GridNioServer.this.clientWorkers.size(); ++i) {
                    AbstractNioClientWorker worker = (AbstractNioClientWorker)GridNioServer.this.clientWorkers.get(i);
                    worker.reset0();
                }
            }
        }
    }

    public static class Builder<T> {
        private static final GridNioFilter[] EMPTY_FILTERS = new GridNioFilter[0];
        private InetAddress addr;
        private int port;
        private IgniteLogger log;
        private int selectorCnt;
        private String igniteInstanceName;
        private boolean tcpNoDelay;
        private boolean directBuf;
        private ByteOrder byteOrder = ByteOrder.LITTLE_ENDIAN;
        private GridNioServerListener<T> lsnr;
        private int sockSndBufSize;
        private int sockRcvBufSize;
        private int sndQueueLimit = 0;
        private boolean directMode;
        private GridNioFilter[] filters;
        private long idleTimeout = -1L;
        private long writeTimeout = -1L;
        private boolean daemon;
        private GridNioMessageWriterFactory writerFactory;
        private IgnitePredicate<Message> skipRecoveryPred;
        private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
        private String srvName;
        private long selectorSpins;
        private boolean readWriteSelectorsAssign;
        private GridWorkerListener workerLsnr;
        private MetricRegistry mreg;
        private Tracing tracing;

        public GridNioServer<T> build() throws IgniteCheckedException {
            GridNioServer ret = new GridNioServer(this.addr, this.port, this.log, this.selectorCnt, this.igniteInstanceName, this.srvName, this.selectorSpins, this.tcpNoDelay, this.directBuf, this.byteOrder, (GridNioServerListener)this.lsnr, this.sockSndBufSize, this.sockRcvBufSize, this.sndQueueLimit, this.directMode, this.daemon, this.writerFactory, (IgnitePredicate)this.skipRecoveryPred, (IgniteBiInClosure)this.msgQueueLsnr, this.readWriteSelectorsAssign, this.workerLsnr, this.mreg, this.tracing, this.filters != null ? Arrays.copyOf(this.filters, this.filters.length) : EMPTY_FILTERS);
            if (this.idleTimeout >= 0L) {
                ret.idleTimeout(this.idleTimeout);
            }
            if (this.writeTimeout >= 0L) {
                ret.writeTimeout(this.writeTimeout);
            }
            return ret;
        }

        public Builder<T> readWriteSelectorsAssign(boolean readWriteSelectorsAssign) {
            this.readWriteSelectorsAssign = readWriteSelectorsAssign;
            return this;
        }

        public Builder<T> tracing(Tracing tracing) {
            this.tracing = tracing;
            return this;
        }

        public Builder<T> address(InetAddress addr) {
            this.addr = addr;
            return this;
        }

        public Builder<T> port(int port) {
            this.port = port;
            return this;
        }

        public Builder<T> logger(IgniteLogger log) {
            this.log = log;
            return this;
        }

        public Builder<T> selectorCount(int selectorCnt) {
            this.selectorCnt = selectorCnt;
            return this;
        }

        public Builder<T> igniteInstanceName(@Nullable String igniteInstanceName) {
            this.igniteInstanceName = igniteInstanceName;
            return this;
        }

        public Builder<T> serverName(@Nullable String srvName) {
            this.srvName = srvName;
            return this;
        }

        public Builder<T> selectorSpins(long selectorSpins) {
            this.selectorSpins = selectorSpins;
            return this;
        }

        public Builder<T> tcpNoDelay(boolean tcpNoDelay) {
            this.tcpNoDelay = tcpNoDelay;
            return this;
        }

        public Builder<T> directBuffer(boolean directBuf) {
            this.directBuf = directBuf;
            return this;
        }

        public Builder<T> byteOrder(ByteOrder byteOrder) {
            this.byteOrder = byteOrder;
            return this;
        }

        public Builder<T> listener(GridNioServerListener<T> lsnr) {
            this.lsnr = lsnr;
            return this;
        }

        public Builder<T> socketSendBufferSize(int sockSndBufSize) {
            this.sockSndBufSize = sockSndBufSize;
            return this;
        }

        public Builder<T> socketReceiveBufferSize(int sockRcvBufSize) {
            this.sockRcvBufSize = sockRcvBufSize;
            return this;
        }

        public Builder<T> sendQueueLimit(int sndQueueLimit) {
            this.sndQueueLimit = sndQueueLimit;
            return this;
        }

        public Builder<T> directMode(boolean directMode) {
            this.directMode = directMode;
            return this;
        }

        public Builder<T> filters(GridNioFilter ... filters) {
            this.filters = filters;
            return this;
        }

        public Builder<T> idleTimeout(long idleTimeout) {
            this.idleTimeout = idleTimeout;
            return this;
        }

        public Builder<T> writeTimeout(long writeTimeout) {
            this.writeTimeout = writeTimeout;
            return this;
        }

        public Builder<T> daemon(boolean daemon) {
            this.daemon = daemon;
            return this;
        }

        public Builder<T> writerFactory(GridNioMessageWriterFactory writerFactory) {
            this.writerFactory = writerFactory;
            return this;
        }

        public Builder<T> skipRecoveryPredicate(IgnitePredicate<Message> skipRecoveryPred) {
            this.skipRecoveryPred = skipRecoveryPred;
            return this;
        }

        public Builder<T> messageQueueSizeListener(IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr) {
            this.msgQueueLsnr = msgQueueLsnr;
            return this;
        }

        public Builder<T> workerListener(GridWorkerListener workerLsnr) {
            this.workerLsnr = workerLsnr;
            return this;
        }

        public Builder<T> metricRegistry(MetricRegistry mreg) {
            this.mreg = mreg;
            return this;
        }
    }

    private class HeadFilter
    extends GridNioFilterAdapter {
        protected HeadFilter() {
            super("HeadFilter");
        }

        @Override
        public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
            if (GridNioServer.this.directMode && GridNioServer.this.sslFilter != null) {
                ses.addMeta(BUF_SSL_SYSTEM_META_KEY, new ConcurrentLinkedQueue());
            }
            this.proceedSessionOpened(ses);
        }

        @Override
        public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException {
            this.proceedSessionClosed(ses);
        }

        @Override
        public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
            this.proceedExceptionCaught(ses, ex);
        }

        @Override
        public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
            if (GridNioServer.this.directMode) {
                boolean sslSys;
                boolean bl = sslSys = GridNioServer.this.sslFilter != null && msg instanceof ByteBuffer;
                if (sslSys) {
                    GridNioWorker worker;
                    ConcurrentLinkedQueue queue = (ConcurrentLinkedQueue)ses.meta(BUF_SSL_SYSTEM_META_KEY);
                    assert (queue != null);
                    queue.offer((ByteBuffer)msg);
                    GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
                    if (!ses0.procWrite.get() && ses0.procWrite.compareAndSet(false, true) && (worker = ses0.worker()) != null) {
                        worker.registerWrite(ses0);
                    }
                    return null;
                }
                return GridNioServer.this.send(ses, (Message)msg, fut, ackC);
            }
            return GridNioServer.this.send(ses, (ByteBuffer)msg, fut, ackC);
        }

        @Override
        public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
            this.proceedMessageReceived(ses, msg);
        }

        @Override
        public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) {
            return GridNioServer.this.close(ses);
        }

        @Override
        public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException {
            this.proceedSessionIdleTimeout(ses);
        }

        @Override
        public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
            this.proceedSessionWriteTimeout(ses);
        }

        @Override
        public GridNioFuture<?> onPauseReads(GridNioSession ses) throws IgniteCheckedException {
            return GridNioServer.this.pauseResumeReads(ses, NioOperation.PAUSE_READ);
        }

        @Override
        public GridNioFuture<?> onResumeReads(GridNioSession ses) throws IgniteCheckedException {
            return GridNioServer.this.pauseResumeReads(ses, NioOperation.RESUME_READ);
        }
    }

    private static class SessionMoveFuture
    extends NioOperationFuture<Boolean> {
        private final int toIdx;
        @GridToStringExclude
        private SocketChannel movedSockCh;

        SessionMoveFuture(GridSelectorNioSessionImpl ses, int toIdx) {
            super(ses, NioOperation.MOVE);
            this.toIdx = toIdx;
        }

        int toIndex() {
            return this.toIdx;
        }

        SocketChannel movedSocketChannel() {
            return this.movedSockCh;
        }

        void movedSocketChannel(SocketChannel movedSockCh) {
            assert (movedSockCh != null);
            this.movedSockCh = movedSockCh;
        }

        @Override
        public String toString() {
            return S.toString(SessionMoveFuture.class, this, super.toString());
        }
    }

    private static class NioOperationFuture<R>
    extends GridNioFutureImpl<R>
    implements SessionWriteRequest,
    SessionChangeRequest,
    GridNioKeyAttachment {
        @GridToStringExclude
        private SocketChannel sockCh;
        @GridToStringExclude
        private GridSelectorNioSessionImpl ses;
        private NioOperation op;
        private Object msg;
        @GridToStringExclude
        private boolean accepted;
        @GridToStringExclude
        private Map<Integer, ?> meta;
        @GridToStringExclude
        private boolean skipRecovery;
        private Span span;

        NioOperationFuture(SocketChannel sockCh, boolean accepted, @Nullable Map<Integer, ?> meta) {
            super(null);
            this.op = NioOperation.REGISTER;
            this.sockCh = sockCh;
            this.accepted = accepted;
            this.meta = meta;
            this.span = MTC.span();
        }

        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op) {
            super(null);
            assert (ses != null || op == NioOperation.DUMP_STATS) : "Invalid params [ses=" + ses + ", op=" + (Object)((Object)op) + ']';
            assert (op != null);
            assert (op != NioOperation.REGISTER);
            this.ses = ses;
            this.op = op;
            this.span = MTC.span();
        }

        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Object msg, IgniteInClosure<IgniteException> ackC) {
            super(ackC);
            assert (ses != null);
            assert (op != null);
            assert (op != NioOperation.REGISTER);
            assert (msg != null);
            this.ses = ses;
            this.op = op;
            this.msg = msg;
            this.span = MTC.span();
        }

        NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Message commMsg, boolean skipRecovery, IgniteInClosure<IgniteException> ackC) {
            super(ackC);
            assert (ses != null);
            assert (op != null);
            assert (op != NioOperation.REGISTER);
            assert (commMsg != null);
            this.ses = ses;
            this.op = op;
            this.msg = commMsg;
            this.skipRecovery = skipRecovery;
            this.span = MTC.span();
        }

        @Override
        public boolean hasSession() {
            return this.ses != null;
        }

        @Override
        public NioOperation operation() {
            return this.op;
        }

        @Override
        public Span span() {
            return this.span;
        }

        @Override
        public Object message() {
            return this.msg;
        }

        @Override
        public void resetSession(GridNioSession ses) {
            assert (this.msg instanceof Message) : this.msg;
            this.ses = (GridSelectorNioSessionImpl)ses;
        }

        SocketChannel socketChannel() {
            return this.sockCh;
        }

        @Override
        public GridSelectorNioSessionImpl session() {
            return this.ses;
        }

        boolean accepted() {
            return this.accepted;
        }

        public Map<Integer, ?> meta() {
            return this.meta;
        }

        @Override
        public void onError(Exception e) {
            this.onDone(e);
        }

        @Override
        public void onAckReceived() {
            assert (this.msg instanceof Message) : this.msg;
            ((Message)this.msg).onAckReceived();
        }

        @Override
        public void onMessageWritten() {
            this.onDone();
        }

        @Override
        public boolean skipRecovery() {
            return this.skipRecovery;
        }

        @Override
        public String toString() {
            return S.toString(NioOperationFuture.class, this);
        }
    }

    private static final class WriteRequestImpl
    implements SessionWriteRequest,
    SessionChangeRequest {
        private GridNioSession ses;
        private final Object msg;
        private boolean msgThread;
        private final boolean skipRecovery;
        private final IgniteInClosure<IgniteException> ackC;
        private Span span;

        WriteRequestImpl(GridNioSession ses, Object msg, boolean skipRecovery, IgniteInClosure<IgniteException> ackC) {
            this.ses = ses;
            this.msg = msg;
            this.skipRecovery = skipRecovery;
            this.ackC = ackC;
            this.span = MTC.span();
        }

        @Override
        public void messageThread(boolean msgThread) {
            this.msgThread = msgThread;
        }

        @Override
        public boolean messageThread() {
            return this.msgThread;
        }

        @Override
        public boolean skipRecovery() {
            return this.skipRecovery;
        }

        @Override
        public void onAckReceived() {
            assert (this.msg instanceof Message);
            ((Message)this.msg).onAckReceived();
        }

        @Override
        public IgniteInClosure<IgniteException> ackClosure() {
            return this.ackC;
        }

        @Override
        public void onError(Exception e) {
        }

        @Override
        public Object message() {
            return this.msg;
        }

        @Override
        public void onMessageWritten() {
        }

        @Override
        public void resetSession(GridNioSession ses) {
            this.ses = ses;
        }

        @Override
        public GridNioSession session() {
            return this.ses;
        }

        @Override
        public NioOperation operation() {
            return NioOperation.REQUIRE_WRITE;
        }

        @Override
        public Span span() {
            return this.span;
        }

        public String toString() {
            return S.toString(WriteRequestImpl.class, this);
        }
    }

    private static final class WriteRequestSystemImpl
    implements SessionWriteRequest,
    SessionChangeRequest {
        private final Object msg;
        private final GridNioSession ses;
        private Span span;

        WriteRequestSystemImpl(GridNioSession ses, Object msg) {
            this.ses = ses;
            this.msg = msg;
            this.span = MTC.span();
        }

        @Override
        public void messageThread(boolean msgThread) {
        }

        @Override
        public boolean messageThread() {
            return true;
        }

        @Override
        public boolean skipRecovery() {
            return true;
        }

        @Override
        public void onAckReceived() {
            throw new UnsupportedOperationException();
        }

        @Override
        public IgniteInClosure<IgniteException> ackClosure() {
            return null;
        }

        @Override
        public void onError(Exception e) {
        }

        @Override
        public Object message() {
            return this.msg;
        }

        @Override
        public void onMessageWritten() {
        }

        @Override
        public void resetSession(GridNioSession ses) {
            throw new UnsupportedOperationException();
        }

        @Override
        public GridNioSession session() {
            return this.ses;
        }

        @Override
        public NioOperation operation() {
            return NioOperation.REQUIRE_WRITE;
        }

        @Override
        public Span span() {
            return this.span;
        }

        public String toString() {
            return S.toString(WriteRequestSystemImpl.class, this);
        }
    }

    private static enum NioOperation {
        CONNECT,
        CANCEL_CONNECT,
        REGISTER,
        MOVE,
        REQUIRE_WRITE,
        CLOSE,
        PAUSE_READ,
        RESUME_READ,
        DUMP_STATS;

    }

    private class GridNioAcceptWorker
    extends GridWorker {
        private Selector selector;

        protected GridNioAcceptWorker(String igniteInstanceName, String name, IgniteLogger log, @Nullable Selector selector, GridWorkerListener workerLsnr) {
            super(igniteInstanceName, name, log, workerLsnr);
            this.selector = selector;
        }

        @Override
        public void cancel() {
            super.cancel();
            if (this.runner() == null) {
                this.closeSelector();
            }
        }

        @Override
        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            Throwable err = null;
            try {
                boolean reset = false;
                while (!GridNioServer.this.closed && !this.isCancelled()) {
                    try {
                        if (reset) {
                            this.selector = GridNioServer.this.createSelector(GridNioServer.this.locAddr);
                        }
                        this.accept();
                    }
                    catch (IgniteCheckedException e) {
                        if (Thread.currentThread().isInterrupted()) continue;
                        U.error(this.log, "Failed to accept remote connection (will wait for 2000ms).", e);
                        U.sleep(2000L);
                        reset = true;
                    }
                }
            }
            catch (Throwable t) {
                if (!(t instanceof IgniteInterruptedCheckedException)) {
                    err = t;
                }
                throw t;
            }
            finally {
                try {
                    this.closeSelector();
                }
                catch (RuntimeException reset) {}
                if (err == null && !GridNioServer.this.closed) {
                    err = new IllegalStateException("Thread " + this.name() + " is terminated unexpectedly");
                }
                if (err instanceof OutOfMemoryError) {
                    GridNioServer.this.lsnr.onFailure(FailureType.CRITICAL_ERROR, err);
                } else if (err != null) {
                    GridNioServer.this.lsnr.onFailure(FailureType.SYSTEM_WORKER_TERMINATION, err);
                } else {
                    this.cancel();
                }
            }
        }

        private void accept() throws IgniteCheckedException {
            try {
                while (!GridNioServer.this.closed && this.selector.isOpen() && !Thread.currentThread().isInterrupted()) {
                    this.blockingSectionBegin();
                    int numKeys = this.selector.select(2000L);
                    this.blockingSectionEnd();
                    if (numKeys > 0) {
                        this.processSelectedKeys(this.selector.selectedKeys());
                        this.updateHeartbeat();
                    }
                    if (GridNioServer.this.balancer != null) {
                        GridNioServer.this.balancer.run();
                    }
                    this.onIdle();
                }
            }
            catch (ClosedByInterruptException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Closing selector due to thread interruption [srvr=" + this + ", err=" + e.getMessage() + ']');
                }
            }
            catch (ClosedSelectorException e) {
                throw new IgniteCheckedException("Selector got closed while active: " + this, e);
            }
            catch (IOException e) {
                throw new IgniteCheckedException("Failed to accept connection: " + this, e);
            }
            finally {
                this.closeSelector();
            }
        }

        private void closeSelector() {
            if (this.selector.isOpen()) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Closing all listening sockets.");
                }
                for (SelectionKey key : this.selector.keys()) {
                    U.close(key.channel(), this.log);
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Closing NIO selector.");
                }
                U.close(this.selector, this.log);
            }
        }

        private void processSelectedKeys(Set<SelectionKey> keys) throws IOException {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Processing keys in accept worker: " + keys.size());
            }
            Iterator<SelectionKey> iter = keys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (!key.isValid() || !key.isAcceptable()) continue;
                ServerSocketChannel srvrCh = (ServerSocketChannel)key.channel();
                SocketChannel sockCh = srvrCh.accept();
                sockCh.configureBlocking(false);
                sockCh.socket().setTcpNoDelay(GridNioServer.this.tcpNoDelay);
                sockCh.socket().setKeepAlive(true);
                if (GridNioServer.this.sockSndBuf > 0) {
                    sockCh.socket().setSendBufferSize(GridNioServer.this.sockSndBuf);
                }
                if (GridNioServer.this.sockRcvBuf > 0) {
                    sockCh.socket().setReceiveBufferSize(GridNioServer.this.sockRcvBuf);
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Accepted new client connection: " + sockCh.socket().getRemoteSocketAddress());
                }
                this.addRegistrationRequest(sockCh);
            }
        }

        private void addRegistrationRequest(SocketChannel sockCh) {
            GridNioServer.this.offerBalanced(new NioOperationFuture(sockCh, true, null), null);
        }
    }

    private abstract class AbstractNioClientWorker
    extends GridWorker
    implements GridNioWorker {
        @GridToStringExclude
        private final ConcurrentLinkedQueue<SessionChangeRequest> changeReqs;
        @GridToStringExclude
        private Selector selector;
        @GridToStringExclude
        private SelectedSelectionKeySet selectedKeys;
        private final int idx;
        private long bytesRcvd;
        private long bytesSent;
        private volatile long bytesRcvd0;
        private volatile long bytesSent0;
        @GridToStringExclude
        private final GridConcurrentHashSet<GridSelectorNioSessionImpl> workerSessions;
        private volatile boolean select;

        AbstractNioClientWorker(@Nullable int idx, String igniteInstanceName, String name, @Nullable IgniteLogger log, GridWorkerListener workerLsnr) throws IgniteCheckedException {
            super(igniteInstanceName, name, log, workerLsnr);
            this.changeReqs = new ConcurrentLinkedQueue();
            this.workerSessions = new GridConcurrentHashSet();
            this.createSelector();
            this.idx = idx;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            Throwable err = null;
            try {
                boolean reset = false;
                while (!GridNioServer.this.closed) {
                    this.updateHeartbeat();
                    try {
                        if (reset) {
                            this.createSelector();
                        }
                        this.bodyInternal();
                        this.onIdle();
                    }
                    catch (IgniteCheckedException e) {
                        if (Thread.currentThread().isInterrupted()) continue;
                        U.error(this.log, "Failed to read data from remote connection (will wait for 2000ms).", e);
                        U.sleep(2000L);
                        reset = true;
                    }
                }
            }
            catch (Throwable e) {
                U.error(this.log, "Caught unhandled exception in NIO worker thread (restart the node).", e);
                err = e;
                if (e instanceof Error) {
                    throw e;
                }
            }
            finally {
                if (err instanceof OutOfMemoryError) {
                    GridNioServer.this.lsnr.onFailure(FailureType.CRITICAL_ERROR, err);
                } else if (!GridNioServer.this.closed) {
                    if (err == null) {
                        GridNioServer.this.lsnr.onFailure(FailureType.SYSTEM_WORKER_TERMINATION, new IllegalStateException("Thread " + this.name() + " is terminated unexpectedly"));
                    } else {
                        GridNioServer.this.lsnr.onFailure(FailureType.SYSTEM_WORKER_TERMINATION, err);
                    }
                } else if (err != null) {
                    GridNioServer.this.lsnr.onFailure(FailureType.SYSTEM_WORKER_TERMINATION, err);
                } else {
                    this.cancel();
                }
            }
        }

        private void createSelector() throws IgniteCheckedException {
            block5: {
                this.selectedKeys = null;
                this.selector = GridNioServer.this.createSelector(null);
                if (DISABLE_KEYSET_OPTIMIZATION) {
                    return;
                }
                try {
                    SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
                    Class<?> selectorImplCls = Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader());
                    if (!selectorImplCls.isAssignableFrom(this.selector.getClass())) {
                        return;
                    }
                    Field selectedKeysField = selectorImplCls.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplCls.getDeclaredField("publicSelectedKeys");
                    selectedKeysField.setAccessible(true);
                    publicSelectedKeysField.setAccessible(true);
                    selectedKeysField.set(this.selector, selectedKeySet);
                    publicSelectedKeysField.set(this.selector, selectedKeySet);
                    this.selectedKeys = selectedKeySet;
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Instrumented an optimized java.util.Set into: " + this.selector);
                    }
                }
                catch (Exception e) {
                    this.selectedKeys = null;
                    if (!this.log.isDebugEnabled()) break block5;
                    this.log.debug("Failed to instrument an optimized java.util.Set into selector [selector=" + this.selector + ", err=" + e + ']');
                }
            }
        }

        @Override
        public void offer(SessionChangeRequest req) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("The session change request was offered [req=" + req + "]");
            }
            this.changeReqs.offer(req);
            if (this.select) {
                this.selector.wakeup();
            }
        }

        @Override
        public void offer(Collection<SessionChangeRequest> reqs) {
            if (this.log.isDebugEnabled()) {
                String strReqs = reqs.stream().map(Objects::toString).collect(Collectors.joining(","));
                this.log.debug("The session change requests were offered [reqs=" + strReqs + "]");
            }
            for (SessionChangeRequest req : reqs) {
                this.changeReqs.offer(req);
            }
            this.selector.wakeup();
        }

        @Override
        public List<SessionChangeRequest> clearSessionRequests(GridNioSession ses) {
            ArrayList<SessionChangeRequest> sesReqs = null;
            if (this.log.isDebugEnabled()) {
                this.log.debug("The session was removed [ses=" + ses + "]");
            }
            for (SessionChangeRequest changeReq : this.changeReqs) {
                if (changeReq.session() != ses || changeReq instanceof SessionMoveFuture) continue;
                boolean rmv = this.changeReqs.remove(changeReq);
                assert (rmv) : changeReq;
                if (sesReqs == null) {
                    sesReqs = new ArrayList<SessionChangeRequest>();
                }
                sesReqs.add(changeReq);
            }
            return sesReqs;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void bodyInternal() throws IgniteCheckedException, InterruptedException {
            try {
                long lastIdleCheck = U.currentTimeMillis();
                block29: while (!GridNioServer.this.closed && this.selector.isOpen()) {
                    SessionChangeRequest req0;
                    this.updateHeartbeat();
                    block30: while ((req0 = this.changeReqs.poll()) != null) {
                        this.updateHeartbeat();
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("The session request will be processed [req=" + req0 + "]");
                        }
                        switch (req0.operation()) {
                            case CONNECT: {
                                NioOperationFuture fut = (NioOperationFuture)req0;
                                SocketChannel ch = fut.socketChannel();
                                try {
                                    ch.register(this.selector, 8, fut);
                                }
                                catch (IOException e) {
                                    fut.onDone(new IgniteCheckedException("Failed to register channel on selector", e));
                                }
                                break;
                            }
                            case CANCEL_CONNECT: {
                                SessionWriteRequest req = (NioOperationFuture)req0;
                                SocketChannel ch = ((NioOperationFuture)req).socketChannel();
                                SelectionKey key = ch.keyFor(this.selector);
                                if (key != null) {
                                    key.cancel();
                                }
                                U.closeQuiet(ch);
                                ((GridFutureAdapter)((Object)req)).onDone();
                                break;
                            }
                            case REGISTER: {
                                this.register((NioOperationFuture)req0);
                                break;
                            }
                            case MOVE: {
                                SessionMoveFuture f = (SessionMoveFuture)req0;
                                GridSelectorNioSessionImpl ses = f.session();
                                if (this.idx == f.toIdx) {
                                    assert (f.movedSocketChannel() != null) : f;
                                    boolean add = this.workerSessions.add(ses);
                                    assert (add);
                                    ses.finishMoveSession(this);
                                    if (this.idx % 2 == 0) {
                                        GridNioServer.this.readerMoveCnt.incrementAndGet();
                                    } else {
                                        GridNioServer.this.writerMoveCnt.incrementAndGet();
                                    }
                                    SelectionKey key = f.movedSocketChannel().register(this.selector, 5, ses);
                                    ses.key(key);
                                    ses.procWrite.set(true);
                                    f.onDone(true);
                                    break;
                                }
                                assert (f.movedSocketChannel() == null) : f;
                                if (this.workerSessions.remove(ses)) {
                                    ses.startMoveSession(this);
                                    SelectionKey key = ses.key();
                                    assert (key.channel() != null) : key;
                                    f.movedSocketChannel((SocketChannel)key.channel());
                                    key.cancel();
                                    ((AbstractNioClientWorker)GridNioServer.this.clientWorkers.get(f.toIndex())).offer(f);
                                    break;
                                }
                                f.onDone(false);
                                break;
                            }
                            case REQUIRE_WRITE: {
                                SessionWriteRequest req = (SessionWriteRequest)((Object)req0);
                                this.registerWrite((GridSelectorNioSessionImpl)req.session());
                                break;
                            }
                            case CLOSE: {
                                SessionWriteRequest req = (NioOperationFuture)req0;
                                if (this.close(((NioOperationFuture)req).session(), null)) {
                                    ((GridFutureAdapter)((Object)req)).onDone(true);
                                    break;
                                }
                                ((GridFutureAdapter)((Object)req)).onDone(false);
                                break;
                            }
                            case PAUSE_READ: {
                                SessionWriteRequest req = (NioOperationFuture)req0;
                                SelectionKey key = ((NioOperationFuture)req).session().key();
                                if (key.isValid()) {
                                    key.interestOps(key.interestOps() & 0xFFFFFFFE);
                                    GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
                                    ses.readsPaused(true);
                                    ((GridFutureAdapter)((Object)req)).onDone(true);
                                    break;
                                }
                                ((GridFutureAdapter)((Object)req)).onDone(false);
                                break;
                            }
                            case RESUME_READ: {
                                SessionWriteRequest req = (NioOperationFuture)req0;
                                SelectionKey key = ((NioOperationFuture)req).session().key();
                                if (key.isValid()) {
                                    key.interestOps(key.interestOps() | 1);
                                    GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
                                    ses.readsPaused(false);
                                    ((GridFutureAdapter)((Object)req)).onDone(true);
                                    break;
                                }
                                ((GridFutureAdapter)((Object)req)).onDone(false);
                                break;
                            }
                            case DUMP_STATS: {
                                NioOperationFuture req = (NioOperationFuture)req0;
                                IgnitePredicate p = req.msg instanceof IgnitePredicate ? (IgnitePredicate)req.msg : null;
                                StringBuilder sb = new StringBuilder();
                                try {
                                    this.dumpStats(sb, p, p != null);
                                    continue block30;
                                }
                                finally {
                                    req.onDone(sb.toString());
                                    continue block30;
                                }
                            }
                        }
                    }
                    int res = 0;
                    for (long i = 0L; i < GridNioServer.this.selectorSpins && res == 0; ++i) {
                        res = this.selector.selectNow();
                        if (res > 0) {
                            this.updateHeartbeat();
                            if (this.selectedKeys == null) {
                                this.processSelectedKeys(this.selector.selectedKeys());
                            } else {
                                this.processSelectedKeysOptimized(this.selectedKeys.flip());
                            }
                        }
                        if (!this.changeReqs.isEmpty()) continue block29;
                        long now = U.currentTimeMillis();
                        if (now - lastIdleCheck > 2000L) {
                            lastIdleCheck = now;
                            this.checkIdle(this.selector.keys());
                        }
                        if (!this.isCancelled()) continue;
                        return;
                    }
                    this.select = true;
                    try {
                        if (!this.changeReqs.isEmpty()) continue;
                        this.blockingSectionBegin();
                        int numKeys = this.selector.select(2000L);
                        this.blockingSectionEnd();
                        if (numKeys > 0) {
                            if (this.selectedKeys == null) {
                                this.processSelectedKeys(this.selector.selectedKeys());
                            } else {
                                this.processSelectedKeysOptimized(this.selectedKeys.flip());
                            }
                            this.updateHeartbeat();
                        }
                        if (!GridNioServer.this.closed && !this.isCancelled && Thread.interrupted()) {
                            throw new InterruptedException();
                        }
                    }
                    finally {
                        this.select = false;
                        continue;
                    }
                    long now = U.currentTimeMillis();
                    if (now - lastIdleCheck <= 2000L) continue;
                    lastIdleCheck = now;
                    this.checkIdle(this.selector.keys());
                }
            }
            catch (ClosedByInterruptException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Closing selector due to thread interruption: " + e.getMessage());
                }
            }
            catch (ClosedSelectorException e) {
                throw new IgniteCheckedException("Selector got closed while active.", e);
            }
            catch (IOException e) {
                throw new IgniteCheckedException("Failed to select events on selector.", e);
            }
            finally {
                if (this.selector.isOpen()) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Closing all connected client sockets.");
                    }
                    for (SelectionKey key : this.selector.keys()) {
                        GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
                        if (attach == null || !attach.hasSession()) continue;
                        this.close(attach.session(), null);
                    }
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Closing NIO selector.");
                    }
                    U.close(this.selector, this.log);
                }
            }
        }

        @Override
        public final void registerWrite(GridSelectorNioSessionImpl ses) {
            SelectionKey key = ses.key();
            if (key.isValid()) {
                if ((key.interestOps() & 4) == 0) {
                    key.interestOps(key.interestOps() | 4);
                }
                ses.bytesSent(0);
            }
        }

        private void dumpSelectorInfo(StringBuilder sb, Set<SelectionKey> keys) {
            sb.append(">> Selector info [id=").append(this.idx).append(", keysCnt=").append(keys.size()).append(", bytesRcvd=").append(this.bytesRcvd).append(", bytesRcvd0=").append(this.bytesRcvd0).append(", bytesSent=").append(this.bytesSent).append(", bytesSent0=").append(this.bytesSent0).append("]").append(U.nl());
        }

        private void dumpStats(StringBuilder sb, @Nullable IgnitePredicate<GridNioSession> p, boolean shortInfo) {
            boolean selInfo;
            Set<SelectionKey> keys = this.selector.keys();
            boolean bl = selInfo = p == null;
            if (selInfo) {
                this.dumpSelectorInfo(sb, keys);
            }
            for (SelectionKey key : keys) {
                GridNioRecoveryDescriptor inDesc;
                GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
                if (!attach.hasSession()) continue;
                GridSelectorNioSessionImpl ses = attach.session();
                boolean sesInfo = p == null || p.apply(ses);
                if (!sesInfo) continue;
                if (!selInfo) {
                    this.dumpSelectorInfo(sb, keys);
                    selInfo = true;
                }
                sb.append("    Connection info [").append("in=").append(ses.accepted()).append(", rmtAddr=").append(ses.remoteAddress()).append(", locAddr=").append(ses.localAddress());
                GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
                if (outDesc != null) {
                    sb.append(", msgsSent=").append(outDesc.sent()).append(", msgsAckedByRmt=").append(outDesc.acked()).append(", descIdHash=").append(System.identityHashCode(outDesc));
                    if (!outDesc.messagesRequests().isEmpty()) {
                        int cnt = 0;
                        sb.append(", unackedMsgs=[");
                        for (SessionWriteRequest req : outDesc.messagesRequests()) {
                            if (cnt != 0) {
                                sb.append(", ");
                            }
                            Object msg = req.message();
                            if (shortInfo && msg instanceof GridIoMessage) {
                                msg = ((GridIoMessage)msg).message().getClass().getSimpleName();
                            }
                            sb.append(msg);
                            if (++cnt != 5) continue;
                            break;
                        }
                        sb.append(']');
                    }
                } else {
                    sb.append(", outRecoveryDesc=null");
                }
                if ((inDesc = ses.inRecoveryDescriptor()) != null) {
                    sb.append(", msgsRcvd=").append(inDesc.received()).append(", lastAcked=").append(inDesc.lastAcknowledged()).append(", descIdHash=").append(System.identityHashCode(inDesc));
                } else {
                    sb.append(", inRecoveryDesc=null");
                }
                sb.append(", bytesRcvd=").append(ses.bytesReceived()).append(", bytesRcvd0=").append(ses.bytesReceived0()).append(", bytesSent=").append(ses.bytesSent()).append(", bytesSent0=").append(ses.bytesSent0()).append(", opQueueSize=").append(ses.writeQueueSize());
                if (!shortInfo) {
                    MessageWriter writer = (MessageWriter)ses.meta(GridNioSessionMetaKey.MSG_WRITER.ordinal());
                    MessageReader reader = (MessageReader)ses.meta(GridDirectParser.READER_META_KEY);
                    sb.append(", msgWriter=").append(writer != null ? writer.toString() : "null").append(", msgReader=").append(reader != null ? reader.toString() : "null");
                }
                int cnt = 0;
                for (SessionWriteRequest req : ses.writeQueue()) {
                    Object msg = req.message();
                    if (shortInfo && msg instanceof GridIoMessage) {
                        msg = ((GridIoMessage)msg).message().getClass().getSimpleName();
                    }
                    if (cnt == 0) {
                        sb.append(",\n opQueue=[").append(msg);
                    } else {
                        sb.append(',').append(msg);
                    }
                    if (++cnt != 5) continue;
                    sb.append(']');
                    break;
                }
                sb.append("]");
            }
        }

        private void processSelectedKeysOptimized(SelectionKey[] keys) throws ClosedByInterruptException {
            SelectionKey key;
            int i = 0;
            while ((key = keys[i]) != null) {
                keys[i] = null;
                if (key.isValid()) {
                    GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
                    assert (attach != null);
                    try {
                        if (!attach.hasSession() && key.isConnectable()) {
                            this.processConnect(key);
                        } else {
                            if (key.isReadable()) {
                                this.processRead(key);
                            }
                            if (key.isValid() && key.isWritable()) {
                                this.processWrite(key);
                            }
                        }
                    }
                    catch (ClosedByInterruptException e) {
                        throw e;
                    }
                    catch (Error | Exception e) {
                        try {
                            U.sleep(1000L);
                        }
                        catch (IgniteInterruptedCheckedException igniteInterruptedCheckedException) {
                            // empty catch block
                        }
                        GridSelectorNioSessionImpl ses = attach.session();
                        if (!GridNioServer.this.closed) {
                            U.error(this.log, "Failed to process selector key [ses=" + ses + ']', e);
                        } else if (this.log.isDebugEnabled()) {
                            this.log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']');
                        }
                        if (ses != null) {
                            this.close(ses, new GridNioException(e));
                        }
                        this.closeKey(key);
                    }
                }
                ++i;
            }
        }

        private void processSelectedKeys(Set<SelectionKey> keys) throws ClosedByInterruptException {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Processing keys in client worker: " + keys.size());
            }
            if (keys.isEmpty()) {
                return;
            }
            Iterator<SelectionKey> iter = keys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (!key.isValid()) continue;
                GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
                assert (attach != null);
                try {
                    if (!attach.hasSession() && key.isConnectable()) {
                        this.processConnect(key);
                        continue;
                    }
                    if (key.isReadable()) {
                        this.processRead(key);
                    }
                    if (!key.isValid() || !key.isWritable()) continue;
                    this.processWrite(key);
                }
                catch (ClosedByInterruptException e) {
                    throw e;
                }
                catch (Error | Exception e) {
                    try {
                        U.sleep(1000L);
                    }
                    catch (IgniteInterruptedCheckedException igniteInterruptedCheckedException) {
                        // empty catch block
                    }
                    GridSelectorNioSessionImpl ses = attach.session();
                    if (!GridNioServer.this.closed) {
                        U.error(this.log, "Failed to process selector key [ses=" + ses + ']', e);
                        continue;
                    }
                    if (!this.log.isDebugEnabled()) continue;
                    this.log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']');
                }
            }
        }

        private void checkIdle(Iterable<SelectionKey> keys) {
            long now = U.currentTimeMillis();
            for (SelectionKey key : keys) {
                GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment();
                if (attach == null || !attach.hasSession()) continue;
                GridSelectorNioSessionImpl ses = attach.session();
                try {
                    boolean opWrite;
                    long writeTimeout0 = GridNioServer.this.writeTimeout;
                    boolean bl = opWrite = key.isValid() && (key.interestOps() & 4) != 0;
                    if (opWrite && now - ses.lastSendTime() > writeTimeout0) {
                        GridNioServer.this.filterChain.onSessionWriteTimeout(ses);
                        ses.bytesSent(0);
                        continue;
                    }
                    long idleTimeout0 = GridNioServer.this.idleTimeout;
                    if (opWrite || now - ses.lastReceiveTime() <= idleTimeout0 || now - ses.lastSendScheduleTime() <= idleTimeout0) continue;
                    GridNioServer.this.filterChain.onSessionIdleTimeout(ses);
                    ses.resetSendScheduleTime();
                    ses.bytesReceived(0);
                }
                catch (IgniteCheckedException e) {
                    this.close(ses, e);
                }
            }
        }

        private void register(NioOperationFuture<GridNioSession> fut) {
            assert (fut != null);
            SocketChannel sockCh = fut.socketChannel();
            assert (sockCh != null);
            Socket sock = sockCh.socket();
            try {
                SelectionKey key;
                ByteBuffer writeBuf = null;
                ByteBuffer readBuf = null;
                if (GridNioServer.this.directMode) {
                    writeBuf = GridNioServer.this.directBuf ? ByteBuffer.allocateDirect(sock.getSendBufferSize()) : ByteBuffer.allocate(sock.getSendBufferSize());
                    readBuf = GridNioServer.this.directBuf ? ByteBuffer.allocateDirect(sock.getReceiveBufferSize()) : ByteBuffer.allocate(sock.getReceiveBufferSize());
                    writeBuf.order(GridNioServer.this.order);
                    readBuf.order(GridNioServer.this.order);
                }
                GridSelectorNioSessionImpl ses = new GridSelectorNioSessionImpl(this.log, this, GridNioServer.this.filterChain, (InetSocketAddress)sockCh.getLocalAddress(), (InetSocketAddress)sockCh.getRemoteAddress(), fut.accepted(), GridNioServer.this.sndQueueLimit, GridNioServer.this.mreg, writeBuf, readBuf);
                Map<Integer, ?> meta = fut.meta();
                if (meta != null) {
                    GridNioRecoveryDescriptor desc;
                    for (Map.Entry<Integer, ?> e : meta.entrySet()) {
                        ses.addMeta(e.getKey(), e.getValue());
                    }
                    if (!ses.accepted() && (desc = (GridNioRecoveryDescriptor)meta.get(RECOVERY_DESC_META_KEY)) != null) {
                        ses.outRecoveryDescriptor(desc);
                        if (!desc.pairedConnections()) {
                            ses.inRecoveryDescriptor(desc);
                        }
                    }
                }
                if (!sockCh.isRegistered()) {
                    assert (((NioOperationFuture)fut).op == NioOperation.REGISTER) : NioOperationFuture.access$200(fut);
                    key = sockCh.register(this.selector, 1, ses);
                    ses.key(key);
                    GridNioServer.this.resend(ses);
                } else {
                    assert (((NioOperationFuture)fut).op == NioOperation.CONNECT) : NioOperationFuture.access$200(fut);
                    key = sockCh.keyFor(this.selector);
                    key.attach(ses);
                    key.interestOps(key.interestOps() & 0xFFFFFFF7);
                    key.interestOps(key.interestOps() | 1);
                    ses.key(key);
                }
                GridNioServer.this.sessions.add(ses);
                this.workerSessions.add(ses);
                try {
                    GridNioServer.this.filterChain.onSessionOpened(ses);
                    fut.onDone(ses);
                }
                catch (IgniteCheckedException e) {
                    this.close(ses, e);
                    fut.onDone(e);
                }
                if (GridNioServer.this.closed) {
                    ses.onServerStopped();
                }
            }
            catch (ClosedChannelException e) {
                U.warn(this.log, "Failed to register accepted socket channel to selector (channel was closed): " + sock.getRemoteSocketAddress(), e);
            }
            catch (IOException e) {
                U.error(this.log, "Failed to get socket addresses.", e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void closeKey(SelectionKey key) {
            Socket sock = ((SocketChannel)key.channel()).socket();
            try {
                try {
                    sock.shutdownInput();
                }
                catch (IOException iOException) {
                    // empty catch block
                }
                try {
                    sock.shutdownOutput();
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
            finally {
                U.close(key, this.log);
                U.close(sock, this.log);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected boolean close(GridSelectorNioSessionImpl ses, @Nullable IgniteCheckedException e) {
            if (e != null) {
                if (e.hasCause(IOException.class)) {
                    U.warn(this.log, "Client disconnected abruptly due to network connection loss or because the connection was left open on application shutdown. [cls=" + e.getClass() + ", msg=" + e.getMessage() + ']', e);
                } else {
                    U.error(this.log, "Closing NIO session because of unhandled exception.", e);
                }
            }
            GridNioServer.this.sessions.remove(ses);
            this.workerSessions.remove(ses);
            if (ses.setClosed()) {
                ses.onClosed();
                if (GridNioServer.this.directBuf) {
                    if (ses.writeBuffer() != null) {
                        GridUnsafe.cleanDirectBuffer(ses.writeBuffer());
                    }
                    if (ses.readBuffer() != null) {
                        GridUnsafe.cleanDirectBuffer(ses.readBuffer());
                    }
                }
                this.closeKey(ses.key());
                if (e != null) {
                    GridNioServer.this.filterChain.onExceptionCaught(ses, e);
                }
                ses.removeMeta(BUF_META_KEY);
                SessionWriteRequest req = (SessionWriteRequest)ses.removeMeta(GridNioSessionMetaKey.NIO_OPERATION.ordinal());
                GridNioRecoveryDescriptor outRecovery = ses.outRecoveryDescriptor();
                GridNioRecoveryDescriptor inRecovery = ses.inRecoveryDescriptor();
                IOException err = new IOException("Failed to send message (connection was closed): " + ses);
                if (outRecovery != null || inRecovery != null) {
                    try {
                        while ((req = ses.pollFuture()) != null) {
                            if (!req.skipRecovery()) continue;
                            req.onError(err);
                        }
                    }
                    finally {
                        if (outRecovery != null) {
                            outRecovery.release();
                        }
                        if (inRecovery != null && inRecovery != outRecovery) {
                            inRecovery.release();
                        }
                    }
                } else {
                    if (req != null) {
                        req.onError(err);
                    }
                    while ((req = ses.pollFuture()) != null) {
                        req.onError(err);
                    }
                }
                try {
                    GridNioServer.this.filterChain.onSessionClosed(ses);
                }
                catch (IgniteCheckedException e1) {
                    GridNioServer.this.filterChain.onExceptionCaught(ses, e1);
                }
                return true;
            }
            return false;
        }

        private void processConnect(SelectionKey key) throws IOException {
            SocketChannel ch = (SocketChannel)key.channel();
            NioOperationFuture sesFut = (NioOperationFuture)key.attachment();
            assert (sesFut != null);
            try {
                if (ch.finishConnect()) {
                    this.register(sesFut);
                }
            }
            catch (IOException e) {
                U.closeQuiet(ch);
                sesFut.onDone(new GridNioException("Failed to connect to node", e));
                throw e;
            }
        }

        protected abstract void processRead(SelectionKey var1) throws IOException;

        protected abstract void processWrite(SelectionKey var1) throws IOException;

        final void onRead(int cnt) {
            this.bytesRcvd += (long)cnt;
            this.bytesRcvd0 += (long)cnt;
        }

        final void onWrite(int cnt) {
            this.bytesSent += (long)cnt;
            this.bytesSent0 += (long)cnt;
        }

        final void reset0() {
            this.bytesSent0 = 0L;
            this.bytesRcvd0 = 0L;
            for (GridSelectorNioSessionImpl ses : this.workerSessions) {
                ses.reset0();
            }
        }

        @Override
        public String toString() {
            return S.toString(AbstractNioClientWorker.class, this, super.toString());
        }
    }

    private class DirectNioClientWorker
    extends AbstractNioClientWorker {
        protected DirectNioClientWorker(@Nullable int idx, String igniteInstanceName, String name, @Nullable IgniteLogger log, GridWorkerListener workerLsnr) throws IgniteCheckedException {
            super(idx, igniteInstanceName, name, log, workerLsnr);
        }

        @Override
        protected void processRead(SelectionKey key) throws IOException {
            GridSelectorNioSessionImpl ses;
            ByteBuffer readBuf;
            if (GridNioServer.this.skipRead) {
                try {
                    U.sleep(50L);
                }
                catch (IgniteInterruptedCheckedException ignored) {
                    U.warn(this.log, "Sleep has been interrupted.");
                }
                return;
            }
            ReadableByteChannel sockCh = (ReadableByteChannel)((Object)key.channel());
            int cnt = sockCh.read(readBuf = (ses = (GridSelectorNioSessionImpl)key.attachment()).readBuffer());
            if (cnt == -1) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Remote client closed connection: " + ses);
                }
                this.close(ses, null);
                return;
            }
            if (this.log.isTraceEnabled()) {
                this.log.trace("Bytes received [sockCh=" + sockCh + ", cnt=" + cnt + ']');
            }
            if (cnt == 0) {
                return;
            }
            if (GridNioServer.this.rcvdBytesCntMetric != null) {
                GridNioServer.this.rcvdBytesCntMetric.add(cnt);
            }
            ses.bytesReceived(cnt);
            this.onRead(cnt);
            readBuf.flip();
            assert (readBuf.hasRemaining());
            try {
                GridNioServer.this.filterChain.onMessageReceived(ses, readBuf);
                if (readBuf.hasRemaining()) {
                    readBuf.compact();
                } else {
                    readBuf.clear();
                }
                if (ses.hasSystemMessage() && !ses.procWrite.get()) {
                    ses.procWrite.set(true);
                    this.registerWrite(ses);
                }
            }
            catch (IgniteCheckedException e) {
                this.close(ses, e);
            }
        }

        @Override
        protected void processWrite(SelectionKey key) throws IOException {
            if (GridNioServer.this.sslFilter != null) {
                this.processWriteSsl(key);
            } else {
                this.processWrite0(key);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processWriteSsl(SelectionKey key) throws IOException {
            WritableByteChannel sockCh = (WritableByteChannel)((Object)key.channel());
            GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
            MessageWriter writer = (MessageWriter)ses.meta(GridNioSessionMetaKey.MSG_WRITER.ordinal());
            if (writer == null) {
                try {
                    writer = GridNioServer.this.writerFactory.writer(ses);
                    ses.addMeta(GridNioSessionMetaKey.MSG_WRITER.ordinal(), writer);
                }
                catch (IgniteCheckedException e) {
                    throw new IOException("Failed to create message writer.", e);
                }
            }
            boolean handshakeFinished = GridNioServer.this.sslFilter.lock(ses);
            try {
                boolean writeFinished = this.writeSslSystem(ses, sockCh);
                if (!handshakeFinished) {
                    if (writeFinished) {
                        GridNioServer.this.stopPollingForWrite(key, ses);
                    }
                    return;
                }
                ByteBuffer sslNetBuf = (ByteBuffer)ses.removeMeta(BUF_META_KEY);
                if (sslNetBuf != null) {
                    int cnt = sockCh.write(sslNetBuf);
                    if (GridNioServer.this.sentBytesCntMetric != null) {
                        GridNioServer.this.sentBytesCntMetric.add(cnt);
                    }
                    ses.bytesSent(cnt);
                    if (sslNetBuf.hasRemaining()) {
                        ses.addMeta(BUF_META_KEY, sslNetBuf);
                        return;
                    }
                    List requests = (List)ses.removeMeta(REQUESTS_META_KEY);
                    if (requests != null) {
                        GridNioServer.this.onRequestsWritten(ses, requests);
                    }
                }
                ByteBuffer buf = ses.writeBuffer();
                if (ses.meta(WRITE_BUF_LIMIT) != null) {
                    buf.limit((Integer)ses.meta(WRITE_BUF_LIMIT));
                }
                SessionWriteRequest req = (SessionWriteRequest)ses.removeMeta(GridNioSessionMetaKey.NIO_OPERATION.ordinal());
                while (true) {
                    if (req == null && (req = this.systemMessage(ses)) == null && (req = ses.pollFuture()) == null && buf.position() == 0) {
                        GridNioServer.this.stopPollingForWrite(key, ses);
                        break;
                    }
                    boolean finished = false;
                    ArrayList<SessionWriteRequest> pendingRequests = new ArrayList<SessionWriteRequest>(2);
                    if (req != null) {
                        finished = this.writeToBuffer(writer, buf, req, pendingRequests);
                    }
                    while (finished) {
                        req = this.systemMessage(ses);
                        if (req == null) {
                            req = ses.pollFuture();
                        }
                        if (req == null) break;
                        finished = this.writeToBuffer(writer, buf, req, pendingRequests);
                    }
                    int sesBufLimit = buf.limit();
                    int sesCap = buf.capacity();
                    buf.flip();
                    buf = GridNioServer.this.sslFilter.encrypt(ses, buf);
                    ByteBuffer sesBuf = ses.writeBuffer();
                    sesBuf.clear();
                    if (sesCap - buf.limit() < 0) {
                        int limit = sesBufLimit + (sesCap - buf.limit()) - 100;
                        ses.addMeta(WRITE_BUF_LIMIT, limit);
                        sesBuf.limit(limit);
                    }
                    assert (buf.hasRemaining());
                    if (!GridNioServer.this.skipWrite) {
                        int cnt = sockCh.write(buf);
                        if (this.log.isTraceEnabled()) {
                            this.log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']');
                        }
                        if (GridNioServer.this.sentBytesCntMetric != null) {
                            GridNioServer.this.sentBytesCntMetric.add(cnt);
                        }
                        ses.bytesSent(cnt);
                    } else {
                        try {
                            U.sleep(50L);
                        }
                        catch (IgniteInterruptedCheckedException e) {
                            throw new IOException("Thread has been interrupted.", e);
                        }
                    }
                    ses.addMeta(GridNioSessionMetaKey.NIO_OPERATION.ordinal(), req);
                    if (buf.hasRemaining()) {
                        ses.addMeta(BUF_META_KEY, buf);
                        ses.addMeta(REQUESTS_META_KEY, pendingRequests);
                        break;
                    }
                    GridNioServer.this.onRequestsWritten(ses, pendingRequests);
                    buf = ses.writeBuffer();
                    if (ses.meta(WRITE_BUF_LIMIT) == null) continue;
                    buf.limit((Integer)ses.meta(WRITE_BUF_LIMIT));
                }
            }
            finally {
                GridNioServer.this.sslFilter.unlock(ses);
            }
        }

        private boolean writeToBuffer(MessageWriter writer, ByteBuffer buf, SessionWriteRequest req, List<SessionWriteRequest> pendingRequests) {
            Message msg = (Message)req.message();
            Span span = GridNioServer.this.tracing.create(SpanType.COMMUNICATION_SOCKET_WRITE, req.span());
            try (MTC.TraceSurroundings ignore = span.equals(NoopSpan.INSTANCE) ? null : MTC.support(span);){
                span.addTag("message", () -> TraceableMessagesTable.traceName(msg));
                assert (msg != null);
                if (writer != null) {
                    writer.setCurrentWriteClass(msg.getClass());
                }
                int startPos = buf.position();
                boolean finished = msg.writeTo(buf, writer);
                span.addTag("socket.write.bytes", () -> Integer.toString(buf.position() - startPos));
                if (finished) {
                    pendingRequests.add(req);
                    if (writer != null) {
                        writer.reset();
                    }
                }
                boolean bl = finished;
                return bl;
            }
        }

        private boolean writeSslSystem(GridSelectorNioSessionImpl ses, WritableByteChannel sockCh) throws IOException {
            ByteBuffer buf;
            ConcurrentLinkedQueue queue = (ConcurrentLinkedQueue)ses.meta(BUF_SSL_SYSTEM_META_KEY);
            assert (queue != null);
            while ((buf = (ByteBuffer)queue.peek()) != null) {
                int cnt = sockCh.write(buf);
                if (GridNioServer.this.sentBytesCntMetric != null) {
                    GridNioServer.this.sentBytesCntMetric.add(cnt);
                }
                ses.bytesSent(cnt);
                if (!buf.hasRemaining()) {
                    queue.poll();
                    continue;
                }
                return false;
            }
            return true;
        }

        private SessionWriteRequest systemMessage(GridSelectorNioSessionImpl ses) {
            if (ses.hasSystemMessage()) {
                Object msg = ses.systemMessage();
                WriteRequestSystemImpl req = new WriteRequestSystemImpl(ses, msg);
                assert (!ses.hasSystemMessage());
                return req;
            }
            return null;
        }

        private void processWrite0(SelectionKey key) throws IOException {
            WritableByteChannel sockCh = (WritableByteChannel)((Object)key.channel());
            GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
            ByteBuffer buf = ses.writeBuffer();
            SessionWriteRequest req = (SessionWriteRequest)ses.removeMeta(GridNioSessionMetaKey.NIO_OPERATION.ordinal());
            MessageWriter writer = (MessageWriter)ses.meta(GridNioSessionMetaKey.MSG_WRITER.ordinal());
            if (writer == null) {
                try {
                    writer = GridNioServer.this.writerFactory.writer(ses);
                    ses.addMeta(GridNioSessionMetaKey.MSG_WRITER.ordinal(), writer);
                }
                catch (IgniteCheckedException e) {
                    throw new IOException("Failed to create message writer.", e);
                }
            }
            if (req == null && (req = this.systemMessage(ses)) == null && (req = ses.pollFuture()) == null && buf.position() == 0) {
                GridNioServer.this.stopPollingForWrite(key, ses);
                return;
            }
            boolean finished = false;
            if (req != null) {
                finished = this.writeToBuffer(ses, buf, req, writer);
            }
            while (finished) {
                req.onMessageWritten();
                req = this.systemMessage(ses);
                if (req == null) {
                    req = ses.pollFuture();
                }
                if (req == null) break;
                finished = this.writeToBuffer(ses, buf, req, writer);
            }
            buf.flip();
            assert (buf.hasRemaining());
            if (!GridNioServer.this.skipWrite) {
                int cnt = sockCh.write(buf);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']');
                }
                if (GridNioServer.this.sentBytesCntMetric != null) {
                    GridNioServer.this.sentBytesCntMetric.add(cnt);
                }
                ses.bytesSent(cnt);
                this.onWrite(cnt);
            } else {
                try {
                    U.sleep(50L);
                }
                catch (IgniteInterruptedCheckedException e) {
                    throw new IOException("Thread has been interrupted.", e);
                }
            }
            if (buf.hasRemaining() || !finished) {
                buf.compact();
                ses.addMeta(GridNioSessionMetaKey.NIO_OPERATION.ordinal(), req);
            } else {
                buf.clear();
            }
        }

        private boolean writeToBuffer(GridSelectorNioSessionImpl ses, ByteBuffer buf, SessionWriteRequest req, MessageWriter writer) {
            Message msg = (Message)req.message();
            assert (msg != null) : req;
            Span span = GridNioServer.this.tracing.create(SpanType.COMMUNICATION_SOCKET_WRITE, req.span());
            try (MTC.TraceSurroundings ignore = span.equals(NoopSpan.INSTANCE) ? null : MTC.support(span);){
                span.addTag("message", () -> TraceableMessagesTable.traceName(msg));
                if (writer != null) {
                    writer.setCurrentWriteClass(msg.getClass());
                }
                int startPos = buf.position();
                boolean finished = msg.writeTo(buf, writer);
                span.addTag("socket.write.bytes", () -> Integer.toString(buf.position() - startPos));
                if (finished) {
                    GridNioServer.this.onMessageWritten(ses, msg);
                    if (writer != null) {
                        writer.reset();
                    }
                }
                boolean bl = finished;
                return bl;
            }
        }

        @Override
        public String toString() {
            return S.toString(DirectNioClientWorker.class, this, super.toString());
        }
    }

    private class ByteBufferNioClientWorker
    extends AbstractNioClientWorker {
        private final ByteBuffer readBuf;

        protected ByteBufferNioClientWorker(@Nullable int idx, String igniteInstanceName, String name, @Nullable IgniteLogger log, GridWorkerListener workerLsnr) throws IgniteCheckedException {
            super(idx, igniteInstanceName, name, log, workerLsnr);
            this.readBuf = GridNioServer.this.directBuf ? ByteBuffer.allocateDirect(8192) : ByteBuffer.allocate(8192);
            this.readBuf.order(GridNioServer.this.order);
        }

        @Override
        protected void processRead(SelectionKey key) throws IOException {
            if (GridNioServer.this.skipRead) {
                try {
                    U.sleep(50L);
                }
                catch (IgniteInterruptedCheckedException ignored) {
                    U.warn(this.log, "Sleep has been interrupted.");
                }
                return;
            }
            ReadableByteChannel sockCh = (ReadableByteChannel)((Object)key.channel());
            GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
            this.readBuf.clear();
            int cnt = sockCh.read(this.readBuf);
            if (cnt == -1) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Remote client closed connection: " + ses);
                }
                this.close(ses, null);
                return;
            }
            if (cnt == 0) {
                return;
            }
            if (this.log.isTraceEnabled()) {
                this.log.trace("Bytes received [sockCh=" + sockCh + ", cnt=" + cnt + ']');
            }
            if (GridNioServer.this.rcvdBytesCntMetric != null) {
                GridNioServer.this.rcvdBytesCntMetric.add(cnt);
            }
            ses.bytesReceived(cnt);
            this.readBuf.flip();
            try {
                assert (this.readBuf.hasRemaining());
                GridNioServer.this.filterChain.onMessageReceived(ses, this.readBuf);
                if (this.readBuf.remaining() > 0) {
                    LT.warn(this.log, "Read buffer contains data after filter chain processing (will discard remaining bytes) [ses=" + ses + ", remainingCnt=" + this.readBuf.remaining() + ']');
                    this.readBuf.clear();
                }
            }
            catch (IgniteCheckedException e) {
                this.close(ses, e);
            }
        }

        @Override
        protected void processWrite(SelectionKey key) throws IOException {
            WritableByteChannel sockCh = (WritableByteChannel)((Object)key.channel());
            GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
            while (true) {
                ByteBuffer buf = (ByteBuffer)ses.removeMeta(BUF_META_KEY);
                SessionWriteRequest req = (SessionWriteRequest)ses.removeMeta(GridNioSessionMetaKey.NIO_OPERATION.ordinal());
                if (buf == null) {
                    assert (req == null);
                    req = ses.pollFuture();
                    if (req == null) {
                        GridNioServer.this.stopPollingForWrite(key, ses);
                        break;
                    }
                    buf = (ByteBuffer)req.message();
                }
                if (!GridNioServer.this.skipWrite) {
                    Span span = GridNioServer.this.tracing.create(SpanType.COMMUNICATION_SOCKET_WRITE, req.span());
                    try (MTC.TraceSurroundings ignore = span.equals(NoopSpan.INSTANCE) ? null : MTC.support(span);){
                        int cnt = sockCh.write(buf);
                        if (this.log.isTraceEnabled()) {
                            this.log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']');
                        }
                        span.addTag("socket.write.bytes", () -> Integer.toString(cnt));
                        if (GridNioServer.this.sentBytesCntMetric != null) {
                            GridNioServer.this.sentBytesCntMetric.add(cnt);
                        }
                        ses.bytesSent(cnt);
                    }
                }
                try {
                    U.sleep(50L);
                }
                catch (IgniteInterruptedCheckedException e) {
                    throw new IOException("Thread has been interrupted.", e);
                }
                if (buf.remaining() > 0) {
                    ses.addMeta(BUF_META_KEY, buf);
                    ses.addMeta(GridNioSessionMetaKey.NIO_OPERATION.ordinal(), req);
                    break;
                }
                assert (req != null);
                req.onMessageWritten();
            }
        }

        @Override
        public String toString() {
            return S.toString(ByteBufferNioClientWorker.class, this, super.toString());
        }
    }
}

