package io.nats.client;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import io.nats.client.Constants;
import io.nats.client.Parser;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import java.net.URI;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/nats/client/ConnectionImpl.class */
public class ConnectionImpl implements Connection {
    final Logger logger;
    String version;
    private static final String inboxPrefix = "_INBOX.";
    public Constants.ConnState status;
    protected static final String STALE_CONNECTION = "Stale Connection";
    protected static final String NATS_EXEC = "jnats-exec";
    protected static final String NATS_SUB = "jnats-sub";
    protected static final String NATS_CB = "jnats-callbacks";
    protected static final String LANG_STRING = "java";
    protected static final int DEFAULT_BUF_SIZE = 65536;
    protected static final int DEFAULT_STREAM_BUF_SIZE = 65536;
    protected static final int FLUSH_CHAN_SIZE = 1;
    protected long flushTimerInterval;
    protected TimeUnit flushTimerUnit;
    public static final String _CRLF_ = "\r\n";
    public static final String _EMPTY_ = "";
    public static final String _SPC_ = " ";
    public static final String _PUB_P_ = "PUB ";
    public static final String _OK_OP_ = "+OK";
    public static final String _ERR_OP_ = "-ERR";
    public static final String _MSG_OP_ = "MSG";
    public static final String _PING_OP_ = "PING";
    public static final String _PONG_OP_ = "PONG";
    public static final String _INFO_OP_ = "INFO";
    public static final String CONN_PROTO = "CONNECT %s\r\n";
    public static final String PING_PROTO = "PING\r\n";
    public static final String PONG_PROTO = "PONG\r\n";
    public static final String PUB_PROTO = "PUB %s %s %d\r\n";
    public static final String SUB_PROTO = "SUB %s%s %d\r\n";
    public static final String UNSUB_PROTO = "UNSUB %d %s\r\n";
    public static final String OK_PROTO = "+OK\r\n";
    private ConnectionImpl nc;
    protected final Lock mu;
    private AtomicLong sidCounter;
    private URI url;
    protected Options opts;
    private TcpConnectionFactory tcf;
    TcpConnection conn;
    ByteBuffer pubProtoBuf;
    private OutputStream bw;
    private InputStream br;
    private ByteArrayOutputStream pending;
    protected Map<Long, SubscriptionImpl> subs;
    protected List<Srv> srvPool;
    protected Map<String, URI> urls;
    private Exception lastEx;
    private ServerInfo info;
    private int pout;
    protected Parser parser;
    protected Parser.ParseState ps;
    protected byte[] pingProtoBytes;
    protected int pingProtoBytesLen;
    protected byte[] pongProtoBytes;
    protected int pongProtoBytesLen;
    protected byte[] pubPrimBytes;
    protected int pubPrimBytesLen;
    protected byte[] crlfProtoBytes;
    protected int crlfProtoBytesLen;
    protected Statistics stats;
    private ArrayList<BlockingQueue<Boolean>> pongs;
    ExecutorService cbexec;
    ExecutorService exec;
    ExecutorService subexec;
    ScheduledExecutorService scheduler;
    private ScheduledFuture<?> ftmr;
    private ScheduledFuture<?> ptmr;
    private List<Future<?>> tasks;
    private static final int NUM_WATCHER_THREADS = 2;
    private CountDownLatch socketWatchersStartLatch;
    private CountDownLatch socketWatchersDoneLatch;
    private BlockingQueue<Boolean> fch;
    static final byte[] digits = {48, 49, 50, 51, 52, 53, 54, 55, 56, 57};

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/nats/client/ConnectionImpl$ClientProto.class */
    public enum ClientProto {
        CLIENT_PROTO_ZERO(0),
        CLIENT_PROTO_INFO(ConnectionImpl.FLUSH_CHAN_SIZE);

        private final int value;

        ClientProto(int i) {
            this.value = i;
        }

        public int getValue() {
            return this.value;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/nats/client/ConnectionImpl$ConnectInfo.class */
    public class ConnectInfo {

        @SerializedName("verbose")
        private Boolean verbose;

        @SerializedName("pedantic")
        private Boolean pedantic;

        @SerializedName("user")
        private String user;

        @SerializedName("pass")
        private String pass;

        @SerializedName("auth_token")
        private String token;

        @SerializedName("tls_required")
        private Boolean tlsRequired;

        @SerializedName("name")
        private String name;

        @SerializedName("lang")
        private String lang;

        @SerializedName("version")
        private String version;

        @SerializedName("protocol")
        private int protocol;
        private transient Gson gson = new GsonBuilder().create();

        public ConnectInfo(boolean z, boolean z2, String str, String str2, String str3, boolean z3, String str4, String str5, String str6, ClientProto clientProto) {
            this.lang = ConnectionImpl.LANG_STRING;
            this.version = ConnectionImpl.this.version;
            this.verbose = new Boolean(z);
            this.pedantic = new Boolean(z2);
            this.user = str;
            this.pass = str2;
            this.token = str3;
            this.tlsRequired = new Boolean(z3);
            this.name = str4;
            this.lang = str5;
            this.version = str6;
            this.protocol = clientProto.getValue();
        }

        public String toString() {
            return this.gson.toJson(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/nats/client/ConnectionImpl$Control.class */
    public class Control {
        String op;
        String args;

        protected Control(String str) {
            this.op = null;
            this.args = null;
            if (str == null) {
                return;
            }
            String[] split = str.split(ConnectionImpl._SPC_, 2);
            switch (split.length) {
                case ConnectionImpl.FLUSH_CHAN_SIZE /* 1 */:
                    this.op = split[0].trim();
                    return;
                case 2:
                    this.op = split[0].trim();
                    this.args = split[ConnectionImpl.FLUSH_CHAN_SIZE].trim();
                    if (this.args.isEmpty()) {
                        this.args = null;
                        return;
                    }
                    return;
                default:
                    return;
            }
        }

        public String toString() {
            return "{op=" + this.op + ", args=" + this.args + "}";
        }
    }

    /* loaded from: input_file:io/nats/client/ConnectionImpl$FlushTimerTask.class */
    class FlushTimerTask extends TimerTask {
        FlushTimerTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            ConnectionImpl.this.flushSocket();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/nats/client/ConnectionImpl$PingTimerTask.class */
    public class PingTimerTask extends TimerTask {
        PingTimerTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            ConnectionImpl.this.mu.lock();
            if (ConnectionImpl.this.status != Constants.ConnState.CONNECTED) {
                ConnectionImpl.this.mu.unlock();
                return;
            }
            ConnectionImpl.this.setActualPingsOutstanding(ConnectionImpl.this.getActualPingsOutstanding() + ConnectionImpl.FLUSH_CHAN_SIZE);
            if (ConnectionImpl.this.getActualPingsOutstanding() > ConnectionImpl.this.opts.getMaxPingsOut()) {
                ConnectionImpl.this.mu.unlock();
                ConnectionImpl.this.processOpError(new IOException(Constants.ERR_STALE_CONNECTION));
            } else {
                ConnectionImpl.this.sendPing(null);
                ConnectionImpl.this.mu.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/nats/client/ConnectionImpl$Srv.class */
    public class Srv {
        URI url;
        int reconnects = 0;
        long lastAttempt = 0;
        long lastAttemptNanos = 0;
        boolean secure;

        protected Srv(URI uri) {
            this.url = null;
            this.secure = false;
            this.url = uri;
            if (uri.getScheme().equals("tls")) {
                this.secure = true;
            }
        }

        void updateLastAttempt() {
            this.lastAttemptNanos = System.nanoTime();
            this.lastAttempt = System.currentTimeMillis();
        }

        long timeSinceLastAttempt() {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.lastAttemptNanos);
        }

        public String toString() {
            return String.format("{url=%s, reconnects=%d, lastAttempt=%s, timeSinceLastAttempt=%dms}", this.url.toString(), Integer.valueOf(this.reconnects), new SimpleDateFormat("MM/dd/yyyy hh:mm:ss").format(new Date(this.lastAttempt)), Long.valueOf(timeSinceLastAttempt()));
        }
    }

    ConnectionImpl() {
        this.logger = LoggerFactory.getLogger(ConnectionImpl.class);
        this.version = null;
        this.status = Constants.ConnState.DISCONNECTED;
        this.flushTimerInterval = 1L;
        this.flushTimerUnit = TimeUnit.MICROSECONDS;
        this.nc = null;
        this.mu = new ReentrantLock();
        this.sidCounter = new AtomicLong();
        this.url = null;
        this.opts = null;
        this.tcf = null;
        this.conn = null;
        this.pubProtoBuf = null;
        this.bw = null;
        this.br = null;
        this.pending = null;
        this.subs = new ConcurrentHashMap();
        this.srvPool = null;
        this.urls = null;
        this.lastEx = null;
        this.info = null;
        this.parser = new Parser(this);
        Parser parser = this.parser;
        parser.getClass();
        this.ps = new Parser.ParseState();
        this.pingProtoBytes = null;
        this.pingProtoBytesLen = 0;
        this.pongProtoBytes = null;
        this.pongProtoBytesLen = 0;
        this.pubPrimBytes = null;
        this.pubPrimBytesLen = 0;
        this.crlfProtoBytes = null;
        this.crlfProtoBytesLen = 0;
        this.stats = null;
        this.ftmr = null;
        this.ptmr = null;
        this.tasks = new ArrayList();
        this.socketWatchersStartLatch = new CountDownLatch(2);
        this.socketWatchersDoneLatch = null;
    }

    ConnectionImpl(Options options) {
        this(options, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConnectionImpl(Options options, TcpConnectionFactory tcpConnectionFactory) {
        this.logger = LoggerFactory.getLogger(ConnectionImpl.class);
        this.version = null;
        this.status = Constants.ConnState.DISCONNECTED;
        this.flushTimerInterval = 1L;
        this.flushTimerUnit = TimeUnit.MICROSECONDS;
        this.nc = null;
        this.mu = new ReentrantLock();
        this.sidCounter = new AtomicLong();
        this.url = null;
        this.opts = null;
        this.tcf = null;
        this.conn = null;
        this.pubProtoBuf = null;
        this.bw = null;
        this.br = null;
        this.pending = null;
        this.subs = new ConcurrentHashMap();
        this.srvPool = null;
        this.urls = null;
        this.lastEx = null;
        this.info = null;
        this.parser = new Parser(this);
        Parser parser = this.parser;
        parser.getClass();
        this.ps = new Parser.ParseState();
        this.pingProtoBytes = null;
        this.pingProtoBytesLen = 0;
        this.pongProtoBytes = null;
        this.pongProtoBytesLen = 0;
        this.pubPrimBytes = null;
        this.pubPrimBytesLen = 0;
        this.crlfProtoBytes = null;
        this.crlfProtoBytesLen = 0;
        this.stats = null;
        this.ftmr = null;
        this.ptmr = null;
        this.tasks = new ArrayList();
        this.socketWatchersStartLatch = new CountDownLatch(2);
        this.socketWatchersDoneLatch = null;
        this.version = getProperties("jnats.properties").getProperty("client.version");
        this.nc = this;
        this.opts = options;
        this.stats = new Statistics();
        if (tcpConnectionFactory != null) {
            this.tcf = tcpConnectionFactory;
        } else {
            this.tcf = new TcpConnectionFactory();
        }
        setTcpConnection(this.tcf.createConnection());
        this.sidCounter.set(0L);
        this.pingProtoBytes = PING_PROTO.getBytes();
        this.pingProtoBytesLen = this.pingProtoBytes.length;
        this.pongProtoBytes = PONG_PROTO.getBytes();
        this.pongProtoBytesLen = this.pongProtoBytes.length;
        this.pubPrimBytes = _PUB_P_.getBytes();
        this.pubPrimBytesLen = this.pubPrimBytes.length;
        this.crlfProtoBytes = _CRLF_.getBytes();
        this.crlfProtoBytesLen = this.crlfProtoBytes.length;
        buildPublishProtocolBuffer(1024);
        setupServerPool();
    }

    void setup() {
        this.scheduler = Executors.newScheduledThreadPool(2);
        this.cbexec = Executors.newSingleThreadExecutor(new NatsThreadFactory(NATS_CB));
        this.exec = Executors.newCachedThreadPool(new NatsThreadFactory(NATS_EXEC));
        this.subexec = Executors.newCachedThreadPool(new NatsThreadFactory(NATS_SUB));
        this.fch = createFlushChannel();
        this.pongs = createPongs();
        this.subs.clear();
    }

    protected Properties getProperties(InputStream inputStream) {
        Properties properties = new Properties();
        if (inputStream == null) {
            properties = null;
        } else {
            try {
                properties.load(inputStream);
            } catch (IOException e) {
                this.logger.warn("nats: error loading properties from InputStream", e);
                properties = null;
            }
        }
        return properties;
    }

    protected Properties getProperties(String str) {
        return getProperties(getClass().getClassLoader().getResourceAsStream(str));
    }

    private void buildPublishProtocolBuffer(int i) {
        this.pubProtoBuf = ByteBuffer.allocate(i);
        this.pubProtoBuf.put(this.pubPrimBytes, 0, this.pubPrimBytesLen);
        this.pubProtoBuf.mark();
    }

    protected void setupServerPool() {
        URI url = this.opts.getUrl();
        List<URI> servers = this.opts.getServers();
        this.srvPool = new ArrayList();
        this.urls = new ConcurrentHashMap();
        if (servers != null) {
            Iterator<URI> it = servers.iterator();
            while (it.hasNext()) {
                addUrlToPool(it.next());
            }
        }
        if (!this.opts.isNoRandomize()) {
            Collections.shuffle(this.srvPool, new Random(System.nanoTime()));
        }
        if (url != null) {
            this.srvPool.add(0, new Srv(url));
            this.urls.put(url.getAuthority(), url);
        }
        if (this.srvPool.isEmpty()) {
            addUrlToPool("nats://localhost:4222");
        }
        setUrl(this.srvPool.get(0).url);
    }

    void addUrlToPool(String str) {
        URI create = URI.create(str);
        this.srvPool.add(new Srv(create));
        this.urls.put(create.getAuthority(), create);
    }

    void addUrlToPool(URI uri) {
        this.srvPool.add(new Srv(uri));
        this.urls.put(uri.getAuthority(), uri);
    }

    protected Srv currentServer() {
        Srv srv = null;
        Iterator<Srv> it = this.srvPool.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Srv next = it.next();
            if (next.url.equals(getUrl())) {
                srv = next;
                break;
            }
        }
        return srv;
    }

    protected Srv selectNextServer() throws IOException {
        this.logger.trace("In selectNextServer()");
        Srv currentServer = currentServer();
        if (currentServer == null) {
            throw new IOException(Constants.ERR_NO_SERVERS);
        }
        this.logger.trace("selectNextServer, removing {}", currentServer);
        this.srvPool.remove(currentServer);
        int maxReconnect = this.opts.getMaxReconnect();
        if (maxReconnect < 0 || currentServer.reconnects < maxReconnect) {
            this.logger.trace("selectNextServer: adding {}, maxReconnect: {}", currentServer, Integer.valueOf(maxReconnect));
            this.srvPool.add(currentServer);
        }
        if (!this.srvPool.isEmpty()) {
            return this.srvPool.get(0);
        }
        setUrl(null);
        throw new IOException(Constants.ERR_NO_SERVERS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void connect() throws IOException, TimeoutException {
        IOException iOException = null;
        this.mu.lock();
        for (int i = 0; i < this.srvPool.size(); i += FLUSH_CHAN_SIZE) {
            try {
                Srv srv = this.srvPool.get(i);
                setUrl(srv.url);
                try {
                    this.logger.debug("Connecting to {}", getUrl());
                    createConn();
                    this.logger.debug("Connected to {}", getUrl());
                    setup();
                    try {
                        processConnectInit();
                        this.logger.trace("connect() Resetting reconnects for {}", srv);
                        srv.reconnects = 0;
                        iOException = null;
                        break;
                    } catch (IOException e) {
                        iOException = e;
                        this.logger.trace("{} Exception: {}", getUrl(), e.getMessage());
                        this.mu.unlock();
                        close(Constants.ConnState.DISCONNECTED, false);
                        this.mu.lock();
                        setUrl(null);
                    }
                } catch (IOException e2) {
                    if ((e2 instanceof SocketException) && e2.getMessage() != null && e2.getMessage().contains("Connection refused")) {
                        setLastError(null);
                    }
                }
            } finally {
                this.mu.unlock();
            }
        }
        if (iOException == null && this.status != Constants.ConnState.CONNECTED) {
            iOException = new IOException(Constants.ERR_NO_SERVERS);
        }
        if (iOException != null) {
            throw iOException;
        }
    }

    protected void createConn() throws IOException {
        Srv currentServer = currentServer();
        if (currentServer == null) {
            throw new IOException(Constants.ERR_NO_SERVERS);
        }
        currentServer.updateLastAttempt();
        this.logger.trace("createConn(): {}", currentServer.url);
        try {
            this.logger.trace("Opening {}", currentServer.url);
            this.conn = this.tcf.createConnection();
            this.conn.open(currentServer.url.getHost(), currentServer.url.getPort(), this.opts.getConnectionTimeout());
            this.logger.trace("Opened {}", currentServer.url);
            if (this.pending != null && this.bw != null) {
                this.logger.trace("Flushing old outputstream to pending");
                try {
                    this.bw.flush();
                } catch (IOException e) {
                    this.logger.warn(Constants.ERR_TCP_FLUSH_FAILED);
                }
            }
            this.bw = this.conn.getOutputStream(65536);
            this.br = this.conn.getInputStream(65536);
        } catch (IOException e2) {
            this.logger.debug("Couldn't establish connection to {}: {}", currentServer.url, e2.getMessage());
            throw e2;
        }
    }

    BlockingQueue<Message> createMsgChannel() {
        return createMsgChannel(Integer.MAX_VALUE);
    }

    BlockingQueue<Message> createMsgChannel(int i) {
        int i2 = i;
        if (i2 <= 0) {
            i2 = FLUSH_CHAN_SIZE;
        }
        return new LinkedBlockingQueue(i2);
    }

    BlockingQueue<Boolean> createBooleanChannel() {
        return new LinkedBlockingQueue();
    }

    BlockingQueue<Boolean> createBooleanChannel(int i) {
        int i2 = i;
        if (i2 <= 0) {
            i2 = FLUSH_CHAN_SIZE;
        }
        return new LinkedBlockingQueue(i2);
    }

    BlockingQueue<Boolean> createFlushChannel() {
        return new LinkedBlockingQueue(FLUSH_CHAN_SIZE);
    }

    void clearPendingFlushCalls() {
        if (this.pongs == null) {
            return;
        }
        Iterator<BlockingQueue<Boolean>> it = this.pongs.iterator();
        while (it.hasNext()) {
            BlockingQueue<Boolean> next = it.next();
            if (next != null) {
                next.clear();
                next.add(false);
            }
        }
        this.pongs.clear();
        this.pongs = null;
    }

    @Override // io.nats.client.AbstractConnection, java.lang.AutoCloseable
    public void close() {
        close(Constants.ConnState.CLOSED, true);
    }

    private void close(Constants.ConnState connState, boolean z) {
        this.logger.debug("close({}, {})", connState, String.valueOf(z));
        this.mu.lock();
        try {
            if (_isClosed()) {
                this.status = connState;
                return;
            }
            this.status = Constants.ConnState.CLOSED;
            kickFlusher();
            this.mu.unlock();
            this.mu.lock();
            try {
                clearPendingFlushCalls();
                if (this.ptmr != null) {
                    this.ptmr.cancel(true);
                }
                if (this.ftmr != null) {
                    this.ftmr.cancel(true);
                }
                if (this.conn != null) {
                    try {
                        if (this.bw != null) {
                            this.bw.flush();
                        }
                    } catch (IOException e) {
                    }
                }
                this.logger.trace("Closing subscriptions");
                Iterator<Map.Entry<Long, SubscriptionImpl>> it = this.subs.entrySet().iterator();
                while (it.hasNext()) {
                    SubscriptionImpl value = it.next().getValue();
                    value.lock();
                    try {
                        value.closeChannel();
                        value.closed = true;
                        value.connClosed = true;
                        value.close();
                        value.unlock();
                    } catch (Throwable th) {
                        value.unlock();
                        throw th;
                    }
                }
                this.subs.clear();
                if (z) {
                    if (this.opts.getDisconnectedCallback() != null && this.conn != null) {
                        this.cbexec.execute(new Runnable() { // from class: io.nats.client.ConnectionImpl.1
                            @Override // java.lang.Runnable
                            public void run() {
                                ConnectionImpl.this.opts.getDisconnectedCallback().onDisconnect(new ConnectionEvent(this));
                                ConnectionImpl.this.logger.trace("executed DisconnectedCB");
                            }
                        });
                    }
                    if (this.opts.getClosedCallback() != null) {
                        this.cbexec.execute(new Runnable() { // from class: io.nats.client.ConnectionImpl.2
                            @Override // java.lang.Runnable
                            public void run() {
                                ConnectionImpl.this.opts.getClosedCallback().onClose(new ConnectionEvent(this));
                                ConnectionImpl.this.logger.trace("executed ClosedCB");
                            }
                        });
                    }
                }
                this.status = connState;
                if (this.conn != null) {
                    this.conn.close();
                }
                if (this.exec != null) {
                    this.exec.shutdownNow();
                }
                if (this.subexec != null) {
                    this.subexec.shutdownNow();
                }
                if (this.scheduler != null) {
                    this.scheduler.shutdownNow();
                }
                if (this.cbexec != null) {
                    this.cbexec.shutdown();
                }
                this.mu.unlock();
            } finally {
            }
        } finally {
            this.mu.unlock();
        }
    }

    protected void processConnectInit() throws IOException {
        this.logger.trace("processConnectInit(): {}", getUrl());
        this.status = Constants.ConnState.CONNECTING;
        processExpectedInfo();
        sendConnect();
        setActualPingsOutstanding(0);
        spinUpSocketWatchers();
    }

    private void checkForSecure() throws IOException {
        if (this.opts.isSecure() && !this.info.isTlsRequired()) {
            throw new IOException(Constants.ERR_SECURE_CONN_WANTED);
        }
        if (this.info.isTlsRequired() && !this.opts.isSecure()) {
            throw new IOException(Constants.ERR_SECURE_CONN_REQUIRED);
        }
        if (this.opts.isSecure() || "tls".equals(getUrl().getScheme())) {
            makeTLSConn();
        }
    }

    void makeTLSConn() throws IOException {
        this.conn.setTlsDebug(this.opts.isTlsDebug());
        this.conn.makeTLS(this.opts.getSslContext());
        this.bw = this.conn.getOutputStream(65536);
        this.br = this.conn.getInputStream(65536);
    }

    protected void processExpectedInfo() throws IOException {
        try {
            Control readOp = readOp();
            if (!readOp.op.equals(_INFO_OP_)) {
                throw new IOException(Constants.ERR_NO_INFO_RECEIVED);
            }
            processInfo(readOp.args);
            checkForSecure();
        } catch (IOException e) {
            processOpError(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processPing() {
        try {
            sendProto(this.pongProtoBytes, this.pongProtoBytesLen);
        } catch (IOException e) {
            setLastError(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processPong() {
        this.logger.trace("Processing PONG");
        BlockingQueue<Boolean> createBooleanChannel = createBooleanChannel(FLUSH_CHAN_SIZE);
        this.mu.lock();
        try {
            if (this.pongs != null && this.pongs.size() > 0) {
                createBooleanChannel = this.pongs.get(0);
                this.pongs.remove(0);
            }
            setActualPingsOutstanding(0);
            if (createBooleanChannel != null) {
                try {
                    createBooleanChannel.put(true);
                } catch (InterruptedException e) {
                    this.logger.warn("processPong interrupted", e);
                    Thread.currentThread().interrupt();
                }
            }
            this.logger.trace("Processed PONG");
        } finally {
            this.mu.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processOk() {
    }

    protected void processInfo(String str) {
        if (str == null || str.isEmpty()) {
            return;
        }
        this.info = ServerInfo.createFromWire(str);
        boolean z = false;
        if (this.info.getConnectUrls() != null) {
            String[] connectUrls = this.info.getConnectUrls();
            int length = connectUrls.length;
            for (int i = 0; i < length; i += FLUSH_CHAN_SIZE) {
                String str2 = connectUrls[i];
                if (!this.urls.containsKey(str2)) {
                    addUrlToPool(String.format("nats://%s", str2));
                    z = FLUSH_CHAN_SIZE;
                }
            }
            if (!z || this.opts.isNoRandomize()) {
                return;
            }
            Collections.shuffle(this.srvPool);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void processAsyncInfo(String str) {
        this.mu.lock();
        try {
            processInfo(str);
        } finally {
            this.mu.unlock();
        }
    }

    void processOpError(Exception exc) {
        this.mu.lock();
        try {
            if (isConnecting() || _isClosed() || _isReconnecting()) {
                return;
            }
            if (!this.opts.isReconnectAllowed() || this.status != Constants.ConnState.CONNECTED) {
                this.logger.trace("\t\tcalling processDisconnect() in state {}", this.status);
                processDisconnect();
                setLastError(exc);
                close();
                return;
            }
            this.status = Constants.ConnState.RECONNECTING;
            if (this.ptmr != null) {
                this.ptmr.cancel(true);
            }
            if (this.ftmr != null) {
                this.ftmr.cancel(true);
            }
            if (this.conn != null) {
                try {
                    this.bw.flush();
                } catch (IOException e) {
                    this.logger.error("I/O error during flush", e);
                }
                this.conn.close();
            }
            this.logger.trace("processOpError: redirecting output to pending buffer");
            setPending(new ByteArrayOutputStream(this.opts.getReconnectBufSize()));
            setOutputStream(getPending());
            this.logger.trace("\t\tspawning doReconnect() in state {}", this.status);
            if (this.exec.isShutdown()) {
                this.exec = Executors.newCachedThreadPool(new NatsThreadFactory(NATS_EXEC));
            }
            this.exec.submit(new Runnable() { // from class: io.nats.client.ConnectionImpl.3
                @Override // java.lang.Runnable
                public void run() {
                    Thread.currentThread().setName("reconnect");
                    ConnectionImpl.this.doReconnect();
                }
            });
            if (this.cbexec.isShutdown()) {
                this.cbexec = Executors.newSingleThreadExecutor(new NatsThreadFactory(NATS_EXEC));
            }
            this.logger.trace("\t\tspawned doReconnect() in state {}", this.status);
        } finally {
            this.mu.unlock();
        }
    }

    protected void processDisconnect() {
        this.logger.trace("processDisconnect()");
        this.status = Constants.ConnState.DISCONNECTED;
    }

    @Override // io.nats.client.AbstractConnection
    public boolean isReconnecting() {
        this.mu.lock();
        try {
            return _isReconnecting();
        } finally {
            this.mu.unlock();
        }
    }

    boolean _isReconnecting() {
        return this.status == Constants.ConnState.RECONNECTING;
    }

    @Override // io.nats.client.AbstractConnection
    public boolean isConnected() {
        this.mu.lock();
        try {
            return _isConnected();
        } finally {
            this.mu.unlock();
        }
    }

    private boolean _isConnected() {
        return this.status == Constants.ConnState.CONNECTING || this.status == Constants.ConnState.CONNECTED;
    }

    @Override // io.nats.client.AbstractConnection
    public boolean isClosed() {
        this.mu.lock();
        try {
            return _isClosed();
        } finally {
            this.mu.unlock();
        }
    }

    boolean _isClosed() {
        return this.status == Constants.ConnState.CLOSED;
    }

    protected void flushReconnectPendingItems() {
        this.logger.trace("flushReconnectPendingItems()");
        if (this.pending == null) {
            return;
        }
        if (this.pending.size() > 0) {
            try {
                this.logger.trace("flushReconnectPendingItems() writing {} bytes.", Integer.valueOf(this.pending.size()));
                this.bw.write(this.pending.toByteArray(), 0, this.pending.size());
                this.bw.flush();
            } catch (IOException e) {
                this.logger.error("Error flushing pending items", e);
            }
        }
        this.pending = null;
        this.logger.trace("flushReconnectPendingItems() DONE");
    }

    /* JADX WARN: Code restructure failed: missing block: B:51:0x01f9, code lost:
    
        setPending(null);
        r5.status = io.nats.client.Constants.ConnState.CONNECTED;
     */
    /* JADX WARN: Code restructure failed: missing block: B:52:0x020c, code lost:
    
        if (r5.opts.getReconnectedCallback() == null) goto L42;
     */
    /* JADX WARN: Code restructure failed: missing block: B:53:0x020f, code lost:
    
        r5.logger.trace("Spawning reconnectedCb from doReconnect()");
        r5.cbexec.execute(new io.nats.client.ConnectionImpl.AnonymousClass5(r5));
        r5.logger.trace("Spawned reconnectedCb from doReconnect()");
     */
    /* JADX WARN: Code restructure failed: missing block: B:54:0x0238, code lost:
    
        r5.mu.unlock();
     */
    /* JADX WARN: Code restructure failed: missing block: B:56:0x0241, code lost:
    
        flush();
     */
    /* JADX WARN: Code restructure failed: missing block: B:57:0x0246, code lost:
    
        r5.mu.lock();
     */
    /* JADX WARN: Code restructure failed: missing block: B:63:0x026d, code lost:
    
        r13 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:66:0x027a, code lost:
    
        throw r13;
     */
    /* JADX WARN: Code restructure failed: missing block: B:67:0x0251, code lost:
    
        r9 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:68:0x0253, code lost:
    
        r5.logger.warn("Error flushing connection", r9);
     */
    /* JADX WARN: Code restructure failed: missing block: B:69:0x0262, code lost:
    
        r5.mu.lock();
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    void doReconnect() {
        /*
            Method dump skipped, instructions count: 744
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.nats.client.ConnectionImpl.doReconnect():void");
    }

    boolean isConnecting() {
        return this.status == Constants.ConnState.CONNECTING;
    }

    static String normalizeErr(String str) {
        return str.replaceFirst("-ERR\\s+", _EMPTY_).toLowerCase().replaceAll("^'|'$", _EMPTY_);
    }

    static String normalizeErr(ByteBuffer byteBuffer) {
        return normalizeErr(Parser.bufToString(byteBuffer).trim());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processErr(ByteBuffer byteBuffer) {
        String normalizeErr = normalizeErr(byteBuffer);
        this.logger.trace("processErr(error={})", normalizeErr);
        if (STALE_CONNECTION.equalsIgnoreCase(normalizeErr)) {
            processOpError(new IOException(Constants.ERR_STALE_CONNECTION));
            return;
        }
        if (normalizeErr.startsWith("permissions violation")) {
            processPermissionsViolation(normalizeErr);
            return;
        }
        NATSException nATSException = new NATSException("nats: " + normalizeErr);
        nATSException.setConnection(this);
        this.mu.lock();
        try {
            setLastError(nATSException);
            this.mu.unlock();
            close();
        } catch (Throwable th) {
            this.mu.unlock();
            throw th;
        }
    }

    protected void sendConnect() throws IOException {
        this.logger.trace("sendConnect()");
        this.bw.write(connectProto().getBytes());
        this.logger.trace("=> {}", connectProto().trim());
        this.bw.flush();
        if (this.opts.isVerbose()) {
            String readLine = readLine();
            if (!_OK_OP_.equals(readLine)) {
                throw new IOException(String.format("nats: expected '%s', got '%s'", _OK_OP_, readLine));
            }
        }
        this.bw.write(this.pingProtoBytes, 0, this.pingProtoBytesLen);
        this.logger.trace("=> {}", new String(this.pingProtoBytes).trim());
        this.bw.flush();
        try {
            this.logger.trace("Awaiting PONG...");
            String readLine2 = readLine();
            if (PONG_PROTO.trim().equals(readLine2)) {
                this.status = Constants.ConnState.CONNECTED;
            } else {
                if (!readLine2.startsWith(_ERR_OP_)) {
                    throw new IOException(String.format("nats: expected '%s', got '%s'", _PONG_OP_, readLine2));
                }
                throw new IOException("nats: " + normalizeErr(readLine2));
            }
        } catch (IOException e) {
            throw new IOException(Constants.ERR_CONNECTION_READ, e);
        }
    }

    protected String readLine() throws IOException {
        BufferedReader bufferedReader = this.conn.getBufferedReader();
        this.logger.trace("readLine() Reading from input stream");
        String readLine = bufferedReader.readLine();
        if (readLine == null) {
            throw new EOFException(Constants.ERR_CONNECTION_CLOSED);
        }
        this.logger.trace("<= {}", readLine != null ? readLine.trim() : "null");
        return readLine;
    }

    protected void sendProto(byte[] bArr, int i) throws IOException {
        this.logger.trace("in sendProto()");
        this.mu.lock();
        try {
            this.logger.trace("in sendProto(), writing");
            this.bw.write(bArr, 0, i);
            this.logger.trace("=> {}", new String(bArr).trim());
            kickFlusher();
        } finally {
            this.mu.unlock();
        }
    }

    String connectProto() {
        String userInfo = getUrl().getUserInfo();
        String str = null;
        String str2 = null;
        String str3 = null;
        if (userInfo != null) {
            String[] split = userInfo.split(":");
            if (split[0].length() > 0) {
                switch (split.length) {
                    case FLUSH_CHAN_SIZE /* 1 */:
                        str3 = split[0];
                        break;
                    case 2:
                        str = split[0];
                        str2 = split[FLUSH_CHAN_SIZE];
                        break;
                }
            }
        } else {
            str = this.opts.getUsername();
            str2 = this.opts.getPassword();
            str3 = this.opts.getToken();
        }
        return String.format(CONN_PROTO, new ConnectInfo(this.opts.isVerbose(), this.opts.isPedantic(), str, str2, str3, this.opts.isSecure(), this.opts.getConnectionName(), LANG_STRING, this.version, ClientProto.CLIENT_PROTO_INFO));
    }

    protected Control readOp() throws IOException {
        Control control = new Control(readLine());
        this.logger.trace("readOp returning: " + control);
        return control;
    }

    private void waitForExits() {
        kickFlusher();
        if (this.socketWatchersDoneLatch != null) {
            try {
                this.socketWatchersDoneLatch.await();
            } catch (InterruptedException e) {
                this.logger.warn("nats: interrupted waiting for threads to exit");
                Thread.currentThread().interrupt();
            }
        }
    }

    protected void spinUpSocketWatchers() {
        this.logger.trace("Spinning up threads");
        waitForExits();
        this.socketWatchersDoneLatch = new CountDownLatch(2);
        this.socketWatchersStartLatch = new CountDownLatch(2);
        this.tasks.add(this.exec.submit(new Runnable() { // from class: io.nats.client.ConnectionImpl.6
            @Override // java.lang.Runnable
            public void run() {
                ConnectionImpl.this.logger.debug("readloop starting...");
                Thread.currentThread().setName("readloop");
                ConnectionImpl.this.socketWatchersStartLatch.countDown();
                try {
                    ConnectionImpl.this.socketWatchersStartLatch.await();
                    ConnectionImpl.this.readLoop();
                } catch (InterruptedException e) {
                    ConnectionImpl.this.logger.debug("Interrupted", e);
                }
                ConnectionImpl.this.socketWatchersDoneLatch.countDown();
                ConnectionImpl.this.logger.debug("readloop exiting");
            }
        }));
        this.tasks.add(this.exec.submit(new Runnable() { // from class: io.nats.client.ConnectionImpl.7
            @Override // java.lang.Runnable
            public void run() {
                ConnectionImpl.this.logger.debug("flusher starting...");
                Thread.currentThread().setName("flusher");
                ConnectionImpl.this.socketWatchersStartLatch.countDown();
                try {
                    ConnectionImpl.this.socketWatchersStartLatch.await();
                    ConnectionImpl.this.flusher();
                } catch (InterruptedException e) {
                    ConnectionImpl.this.logger.debug("Interrupted", e);
                }
                ConnectionImpl.this.socketWatchersDoneLatch.countDown();
                ConnectionImpl.this.logger.debug("flusher exiting");
            }
        }));
        this.socketWatchersStartLatch.countDown();
        resetPingTimer();
    }

    /* JADX WARN: Code restructure failed: missing block: B:24:0x00a1, code lost:
    
        throw new java.io.IOException(io.nats.client.Constants.ERR_STALE_CONNECTION);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    protected void readLoop() {
        /*
            Method dump skipped, instructions count: 238
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.nats.client.ConnectionImpl.readLoop():void");
    }

    protected void waitForMsgs(AsyncSubscriptionImpl asyncSubscriptionImpl) throws InterruptedException {
        long j = 0;
        while (true) {
            asyncSubscriptionImpl.lock();
            try {
                BlockingQueue<Message> channel = asyncSubscriptionImpl.getChannel();
                while (channel.size() == 0 && !asyncSubscriptionImpl.isClosed()) {
                    asyncSubscriptionImpl.pCond.await();
                }
                Message poll = channel.poll();
                if (poll != null) {
                    asyncSubscriptionImpl.pMsgs -= FLUSH_CHAN_SIZE;
                    asyncSubscriptionImpl.pBytes -= poll.getData() == null ? 0 : poll.getData().length;
                }
                MessageHandler messageHandler = asyncSubscriptionImpl.msgHandler;
                long j2 = asyncSubscriptionImpl.max;
                boolean isClosed = asyncSubscriptionImpl.isClosed();
                if (!isClosed) {
                    asyncSubscriptionImpl.delivered++;
                    j = asyncSubscriptionImpl.delivered;
                }
                if (isClosed) {
                    return;
                }
                if (poll != null && (j2 <= 0 || j <= j2)) {
                    messageHandler.onMessage(poll);
                }
                if (j2 > 0 && j >= j2) {
                    this.mu.lock();
                    try {
                        removeSub(asyncSubscriptionImpl);
                        this.mu.unlock();
                        return;
                    } catch (Throwable th) {
                        this.mu.unlock();
                        throw th;
                    }
                }
            } finally {
                asyncSubscriptionImpl.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processMsg(byte[] bArr, int i, int i2) {
        this.mu.lock();
        try {
            this.stats.incrementInMsgs();
            this.stats.incrementInBytes(i2);
            SubscriptionImpl subscriptionImpl = this.subs.get(Long.valueOf(this.ps.ma.sid));
            if (subscriptionImpl == null) {
                return;
            }
            Message message = new Message(this.ps.ma, subscriptionImpl, bArr, i, i2);
            subscriptionImpl.lock();
            try {
                subscriptionImpl.pMsgs += FLUSH_CHAN_SIZE;
                if (subscriptionImpl.pMsgs > subscriptionImpl.pMsgsMax) {
                    subscriptionImpl.pMsgsMax = subscriptionImpl.pMsgs;
                }
                subscriptionImpl.pBytes += message.getData() == null ? 0 : message.getData().length;
                if (subscriptionImpl.pBytes > subscriptionImpl.pBytesMax) {
                    subscriptionImpl.pBytesMax = subscriptionImpl.pBytes;
                }
                if ((subscriptionImpl.pMsgsLimit > 0 && subscriptionImpl.pMsgs > subscriptionImpl.pMsgsLimit) || (subscriptionImpl.pBytesLimit > 0 && subscriptionImpl.pBytes > subscriptionImpl.pBytesLimit)) {
                    handleSlowConsumer(subscriptionImpl, message);
                    subscriptionImpl.unlock();
                    this.mu.unlock();
                    return;
                }
                if (subscriptionImpl.getChannel() != null) {
                    if (!subscriptionImpl.getChannel().add(message)) {
                        handleSlowConsumer(subscriptionImpl, message);
                        subscriptionImpl.unlock();
                        this.mu.unlock();
                        return;
                    }
                    subscriptionImpl.pCond.signal();
                    subscriptionImpl.setSlowConsumer(false);
                }
                subscriptionImpl.unlock();
                this.mu.unlock();
            } catch (Throwable th) {
                subscriptionImpl.unlock();
                throw th;
            }
        } finally {
            this.mu.unlock();
        }
    }

    protected void handleSlowConsumer(SubscriptionImpl subscriptionImpl, Message message) {
        subscriptionImpl.dropped += FLUSH_CHAN_SIZE;
        processSlowConsumer(subscriptionImpl);
        subscriptionImpl.pMsgs -= FLUSH_CHAN_SIZE;
        if (message.getData() != null) {
            subscriptionImpl.pBytes -= message.getData().length;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeSub(SubscriptionImpl subscriptionImpl) {
        this.subs.remove(Long.valueOf(subscriptionImpl.getSid()));
        subscriptionImpl.lock();
        try {
            if (subscriptionImpl.getChannel() != null) {
                subscriptionImpl.mch.clear();
                subscriptionImpl.mch = null;
            }
            subscriptionImpl.setConnection(null);
            subscriptionImpl.closed = true;
        } finally {
            subscriptionImpl.unlock();
        }
    }

    void processSlowConsumer(SubscriptionImpl subscriptionImpl) {
        IOException iOException = new IOException(Constants.ERR_SLOW_CONSUMER);
        final NATSException nATSException = new NATSException(iOException, this, subscriptionImpl);
        setLastError(iOException);
        if (this.opts.getExceptionHandler() != null && !subscriptionImpl.isSlowConsumer()) {
            this.cbexec.execute(new Runnable() { // from class: io.nats.client.ConnectionImpl.8
                @Override // java.lang.Runnable
                public void run() {
                    ConnectionImpl.this.opts.getExceptionHandler().onException(nATSException);
                }
            });
        }
        subscriptionImpl.setSlowConsumer(true);
    }

    void processPermissionsViolation(String str) {
        IOException iOException = new IOException("nats: " + str);
        final NATSException nATSException = new NATSException(iOException);
        nATSException.setConnection(this);
        setLastError(iOException);
        if (this.opts.getExceptionHandler() != null) {
            this.cbexec.execute(new Runnable() { // from class: io.nats.client.ConnectionImpl.9
                @Override // java.lang.Runnable
                public void run() {
                    ConnectionImpl.this.opts.getExceptionHandler().onException(nATSException);
                }
            });
        }
    }

    protected boolean removeFlushEntry(BlockingQueue<Boolean> blockingQueue) {
        this.mu.lock();
        try {
            if (this.pongs == null) {
                return false;
            }
            Iterator<BlockingQueue<Boolean>> it = this.pongs.iterator();
            while (it.hasNext()) {
                BlockingQueue<Boolean> next = it.next();
                if (next == blockingQueue) {
                    next.clear();
                    this.pongs.remove(next);
                    this.mu.unlock();
                    return true;
                }
            }
            this.mu.unlock();
            return false;
        } finally {
            this.mu.unlock();
        }
    }

    protected void sendPing(BlockingQueue<Boolean> blockingQueue) {
        if (this.pongs == null) {
            this.pongs = createPongs();
        }
        if (blockingQueue != null) {
            this.pongs.add(blockingQueue);
        }
        try {
            this.bw.write(this.pingProtoBytes, 0, this.pingProtoBytesLen);
            this.logger.trace("=> {}", new String(this.pingProtoBytes).trim());
            this.bw.flush();
        } catch (IOException e) {
            setLastError(e);
        }
    }

    ArrayList<BlockingQueue<Boolean>> createPongs() {
        return new ArrayList<>();
    }

    ScheduledFuture<?> createPingTimer() {
        return this.scheduler.scheduleWithFixedDelay(new PingTimerTask(), this.opts.getPingInterval(), this.opts.getPingInterval(), TimeUnit.MILLISECONDS);
    }

    protected void resetPingTimer() {
        this.mu.lock();
        try {
            if (this.ptmr != null) {
                this.ptmr.cancel(true);
            }
            if (this.opts.getPingInterval() > 0) {
                this.ptmr = createPingTimer();
            }
        } finally {
            this.mu.unlock();
        }
    }

    protected void writeUnsubProto(SubscriptionImpl subscriptionImpl, long j) throws IOException {
        Object[] objArr = new Object[2];
        objArr[0] = Long.valueOf(subscriptionImpl.getSid());
        objArr[FLUSH_CHAN_SIZE] = j > 0 ? Long.toString(j) : _EMPTY_;
        String replaceAll = String.format(UNSUB_PROTO, objArr).replaceAll(" +\r\n", _CRLF_);
        this.bw.write(replaceAll.getBytes());
        this.logger.trace("=> {}", replaceAll.trim());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void unsubscribe(SubscriptionImpl subscriptionImpl, int i) throws IOException {
        unsubscribe(subscriptionImpl, i);
    }

    protected void unsubscribe(SubscriptionImpl subscriptionImpl, long j) throws IOException {
        this.mu.lock();
        try {
            if (isClosed()) {
                throw new IllegalStateException(Constants.ERR_CONNECTION_CLOSED);
            }
            SubscriptionImpl subscriptionImpl2 = this.subs.get(Long.valueOf(subscriptionImpl.getSid()));
            if (subscriptionImpl2 == null) {
                return;
            }
            if (j > 0) {
                subscriptionImpl2.setMax(j);
            } else {
                removeSub(subscriptionImpl2);
            }
            if (!_isReconnecting()) {
                writeUnsubProto(subscriptionImpl2, j);
            }
            kickFlusher();
            this.mu.unlock();
        } finally {
            this.mu.unlock();
        }
    }

    protected void kickFlusher() {
        if (this.bw == null || this.fch == null) {
            return;
        }
        this.fch.offer(true);
    }

    protected void flushSocket() {
        this.mu.lock();
        try {
            OutputStream outputStream = this.bw;
            TcpConnection tcpConnection = this.conn;
            BlockingQueue<Boolean> blockingQueue = this.fch;
            this.mu.unlock();
            if (tcpConnection == null || outputStream == null || blockingQueue.poll() == null) {
                return;
            }
            this.mu.lock();
            try {
                try {
                    if (_isConnected() && !isConnecting() && outputStream == this.bw && tcpConnection == this.conn) {
                        outputStream.flush();
                        this.stats.incrementFlushes();
                        this.mu.unlock();
                    }
                } catch (IOException e) {
                    this.logger.debug("I/O exception encountered during flush");
                    setLastError(e);
                    this.mu.unlock();
                }
            } finally {
                this.mu.unlock();
            }
        } finally {
            this.mu.unlock();
        }
    }

    protected void flusher() throws InterruptedException {
        this.mu.lock();
        OutputStream outputStream = this.bw;
        TcpConnection tcpConnection = this.conn;
        BlockingQueue<Boolean> blockingQueue = this.fch;
        this.mu.unlock();
        if (tcpConnection == null || outputStream == null) {
            return;
        }
        this.logger.trace("entering flusher loop...");
        while (blockingQueue.take().booleanValue()) {
            this.mu.lock();
            try {
                try {
                } catch (IOException e) {
                    this.logger.debug("I/O exception encountered during flush");
                    setLastError(e);
                    this.mu.unlock();
                }
                if (!_isConnected() || isConnecting() || outputStream != this.bw || tcpConnection != this.conn) {
                    this.mu.unlock();
                    return;
                }
                outputStream.flush();
                this.stats.incrementFlushes();
                this.mu.unlock();
                sleepInterval(this.flushTimerInterval, this.flushTimerUnit);
            } catch (Throwable th) {
                this.mu.unlock();
                throw th;
            }
        }
    }

    @Override // io.nats.client.AbstractConnection
    public void flush(int i) throws Exception {
        Throwable th = null;
        if (i <= 0) {
            throw new IllegalArgumentException(Constants.ERR_BAD_TIMEOUT);
        }
        this.mu.lock();
        try {
            if (_isClosed()) {
                throw new IllegalStateException(Constants.ERR_CONNECTION_CLOSED);
            }
            BlockingQueue<Boolean> createBooleanChannel = createBooleanChannel(FLUSH_CHAN_SIZE);
            sendPing(createBooleanChannel);
            this.mu.unlock();
            while (true) {
                if (Thread.currentThread().isInterrupted()) {
                    break;
                }
                try {
                    Boolean poll = createBooleanChannel.poll(i, TimeUnit.MILLISECONDS);
                    if (poll == null) {
                        th = new TimeoutException(Constants.ERR_TIMEOUT);
                    } else if (poll.booleanValue() == FLUSH_CHAN_SIZE) {
                        createBooleanChannel.clear();
                    } else {
                        th = new IllegalStateException(Constants.ERR_CONNECTION_CLOSED);
                    }
                } catch (InterruptedException e) {
                    this.logger.debug("flush was interrupted while waiting for PONG", e);
                    Thread.currentThread().interrupt();
                }
            }
            if (th != null) {
                removeFlushEntry(createBooleanChannel);
                throw th;
            }
        } catch (Throwable th2) {
            this.mu.unlock();
            throw th2;
        }
    }

    @Override // io.nats.client.AbstractConnection
    public void flush() throws Exception {
        flush(60000);
    }

    protected void resendSubscriptions() {
        long j = 0;
        Iterator<Long> it = this.subs.keySet().iterator();
        while (it.hasNext()) {
            SubscriptionImpl subscriptionImpl = this.subs.get(it.next());
            if (subscriptionImpl instanceof AsyncSubscription) {
                ((AsyncSubscriptionImpl) subscriptionImpl).start();
            }
            this.logger.trace("Resending subscriptions:");
            subscriptionImpl.lock();
            try {
                this.logger.trace("Sub = {}", subscriptionImpl);
                if (subscriptionImpl.max > 0) {
                    if (subscriptionImpl.delivered < subscriptionImpl.max) {
                        j = subscriptionImpl.max - subscriptionImpl.delivered;
                    }
                    if (j == 0) {
                        try {
                            unsubscribe(subscriptionImpl, 0);
                        } catch (Exception e) {
                        }
                    }
                }
                subscriptionImpl.unlock();
                sendSubscriptionMessage(subscriptionImpl);
                if (j > 0) {
                    try {
                        writeUnsubProto(subscriptionImpl, j);
                    } catch (Exception e2) {
                        this.logger.debug("nats: exception while writing UNSUB proto");
                    }
                }
            } finally {
                subscriptionImpl.unlock();
            }
        }
    }

    SubscriptionImpl subscribe(String str, String str2, MessageHandler messageHandler, BlockingQueue<Message> blockingQueue) {
        final SubscriptionImpl syncSubscriptionImpl;
        this.mu.lock();
        try {
            if (_isClosed()) {
                throw new IllegalStateException(Constants.ERR_CONNECTION_CLOSED);
            }
            if (messageHandler == null && blockingQueue == null) {
                throw new IllegalArgumentException(Constants.ERR_BAD_SUBSCRIPTION);
            }
            if (messageHandler != null) {
                syncSubscriptionImpl = new AsyncSubscriptionImpl(this, str, str2, messageHandler);
                this.logger.debug("Starting subscription for subject '{}'", str);
                this.subexec.submit(new Runnable() { // from class: io.nats.client.ConnectionImpl.10
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            ConnectionImpl.this.waitForMsgs((AsyncSubscriptionImpl) syncSubscriptionImpl);
                        } catch (InterruptedException e) {
                            ConnectionImpl.this.logger.debug("Interrupted in waitForMsgs");
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            } else {
                syncSubscriptionImpl = new SyncSubscriptionImpl(this, str, str2);
                syncSubscriptionImpl.setChannel(blockingQueue);
            }
            addSubscription(syncSubscriptionImpl);
            if (!_isReconnecting()) {
                sendSubscriptionMessage(syncSubscriptionImpl);
            }
            kickFlusher();
            SubscriptionImpl subscriptionImpl = syncSubscriptionImpl;
            this.mu.unlock();
            return subscriptionImpl;
        } catch (Throwable th) {
            this.mu.unlock();
            throw th;
        }
    }

    @Override // io.nats.client.AbstractConnection
    public SyncSubscription subscribe(String str) {
        return subscribeSync(str, null);
    }

    @Override // io.nats.client.AbstractConnection
    public SyncSubscription subscribe(String str, String str2) {
        return subscribeSync(str, str2);
    }

    @Override // io.nats.client.AbstractConnection
    public AsyncSubscription subscribe(String str, MessageHandler messageHandler) {
        return (AsyncSubscriptionImpl) subscribe(str, null, messageHandler);
    }

    @Override // io.nats.client.AbstractConnection
    public AsyncSubscription subscribe(String str, String str2, MessageHandler messageHandler) {
        return (AsyncSubscriptionImpl) subscribe(str, str2, messageHandler, null);
    }

    @Override // io.nats.client.AbstractConnection
    @Deprecated
    public AsyncSubscription subscribeAsync(String str, String str2, MessageHandler messageHandler) {
        return (AsyncSubscriptionImpl) subscribe(str, str2, messageHandler, null);
    }

    @Override // io.nats.client.AbstractConnection
    @Deprecated
    public AsyncSubscription subscribeAsync(String str, MessageHandler messageHandler) {
        return (AsyncSubscriptionImpl) subscribe(str, null, messageHandler);
    }

    private void addSubscription(SubscriptionImpl subscriptionImpl) {
        subscriptionImpl.setSid(this.sidCounter.incrementAndGet());
        this.subs.put(Long.valueOf(subscriptionImpl.getSid()), subscriptionImpl);
        this.logger.trace("Successfully added subscription to {} [{}]", subscriptionImpl.getSubject(), Long.valueOf(subscriptionImpl.getSid()));
    }

    @Override // io.nats.client.AbstractConnection
    public SyncSubscription subscribeSync(String str, String str2) {
        return (SyncSubscription) subscribe(str, str2, (MessageHandler) null, createMsgChannel());
    }

    @Override // io.nats.client.AbstractConnection
    public SyncSubscription subscribeSync(String str) {
        return (SyncSubscription) subscribe(str, null, (MessageHandler) null, createMsgChannel());
    }

    void writePublishProto(ByteBuffer byteBuffer, byte[] bArr, byte[] bArr2, int i) {
        this.pubProtoBuf.put(bArr, 0, bArr.length);
        if (bArr2 != null) {
            this.pubProtoBuf.put((byte) 32);
            this.pubProtoBuf.put(bArr2, 0, bArr2.length);
        }
        this.pubProtoBuf.put((byte) 32);
        byte[] bArr3 = new byte[12];
        int length = bArr3.length;
        if (i > 0) {
            int i2 = i;
            while (true) {
                int i3 = i2;
                if (i3 <= 0) {
                    break;
                }
                length--;
                bArr3[length] = digits[i3 % 10];
                i2 = i3 / 10;
            }
        } else {
            length--;
            bArr3[length] = digits[0];
        }
        this.pubProtoBuf.put(bArr3, length, bArr3.length - length);
        this.pubProtoBuf.put(this.crlfProtoBytes, 0, this.crlfProtoBytesLen);
    }

    void publish(byte[] bArr, byte[] bArr2, byte[] bArr3, boolean z) throws IOException {
        int length = bArr3 != null ? bArr3.length : 0;
        this.mu.lock();
        try {
            if (length > this.info.getMaxPayload()) {
                throw new IllegalArgumentException(Constants.ERR_MAX_PAYLOAD);
            }
            if (_isClosed()) {
                throw new IllegalStateException(Constants.ERR_CONNECTION_CLOSED);
            }
            if (_isReconnecting()) {
                try {
                    this.bw.flush();
                } catch (IOException e) {
                    this.logger.error("I/O exception during flush", e);
                }
                if (this.pending.size() >= this.opts.getReconnectBufSize()) {
                    throw new IOException(Constants.ERR_RECONNECT_BUF_EXCEEDED);
                }
            }
            try {
                writePublishProto(this.pubProtoBuf, bArr, bArr2, length);
            } catch (BufferOverflowException e2) {
                this.logger.warn("nats: reallocating publish buffer due to overflow");
                buildPublishProtocolBuffer(1024 + bArr.length + (bArr2 != null ? bArr2.length : 0));
                writePublishProto(this.pubProtoBuf, bArr, bArr2, length);
            }
            try {
                this.bw.write(this.pubProtoBuf.array(), 0, this.pubProtoBuf.position());
                this.pubProtoBuf.position(this.pubPrimBytesLen);
                if (length > 0) {
                    this.bw.write(bArr3, 0, length);
                }
                this.bw.write(this.crlfProtoBytes, 0, this.crlfProtoBytesLen);
                this.stats.incrementOutMsgs();
                this.stats.incrementOutBytes(length);
                if (z) {
                    this.bw.flush();
                    this.stats.incrementFlushes();
                } else if (this.fch.isEmpty()) {
                    kickFlusher();
                }
                this.mu.unlock();
            } catch (IOException e3) {
                setLastError(e3);
                this.mu.unlock();
            }
        } catch (Throwable th) {
            this.mu.unlock();
            throw th;
        }
    }

    @Override // io.nats.client.Connection
    public void publish(String str, String str2, byte[] bArr, boolean z) throws IOException {
        if (str == null) {
            throw new NullPointerException(Constants.ERR_BAD_SUBJECT);
        }
        if (str.isEmpty()) {
            throw new IllegalArgumentException(Constants.ERR_BAD_SUBJECT);
        }
        byte[] bytes = str.getBytes();
        byte[] bArr2 = null;
        if (str2 != null) {
            bArr2 = str2.getBytes();
        }
        publish(bytes, bArr2, bArr, z);
    }

    @Override // io.nats.client.Connection
    public void publish(String str, String str2, byte[] bArr) throws IOException {
        publish(str, str2, bArr, false);
    }

    @Override // io.nats.client.Connection
    public void publish(String str, byte[] bArr) throws IOException {
        publish(str, (String) null, bArr);
    }

    @Override // io.nats.client.Connection
    public void publish(Message message) throws IOException {
        publish(message.getSubjectBytes(), message.getReplyToBytes(), message.getData(), false);
    }

    @Override // io.nats.client.Connection
    public Message request(String str, byte[] bArr, long j, TimeUnit timeUnit) throws TimeoutException, IOException {
        String newInbox = newInbox();
        BlockingQueue<Message> createMsgChannel = createMsgChannel(8);
        Message message = null;
        if (Thread.currentThread().isInterrupted()) {
            Thread.interrupted();
        }
        SyncSubscription syncSubscription = (SyncSubscription) subscribe(newInbox, null, null, createMsgChannel);
        Throwable th = null;
        try {
            syncSubscription.autoUnsubscribe(FLUSH_CHAN_SIZE);
            publish(str, newInbox, bArr);
            try {
                message = syncSubscription.nextMessage(j, timeUnit);
            } catch (InterruptedException e) {
                this.logger.debug("request() interrupted (and cleared)", e);
                Thread.interrupted();
            }
            return message;
        } finally {
            if (syncSubscription != null) {
                if (0 != 0) {
                    try {
                        syncSubscription.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    syncSubscription.close();
                }
            }
        }
    }

    @Override // io.nats.client.Connection
    public Message request(String str, byte[] bArr, long j) throws TimeoutException, IOException {
        return request(str, bArr, j, TimeUnit.MILLISECONDS);
    }

    @Override // io.nats.client.Connection
    public Message request(String str, byte[] bArr) throws IOException, TimeoutException {
        return request(str, bArr, -1L, TimeUnit.MILLISECONDS);
    }

    @Override // io.nats.client.AbstractConnection
    public String newInbox() {
        return String.format("%s%s", inboxPrefix, NUID.nextGlobal());
    }

    @Override // io.nats.client.AbstractConnection
    public synchronized Statistics getStats() {
        return new Statistics(this.stats);
    }

    @Override // io.nats.client.AbstractConnection
    public synchronized void resetStats() {
        this.stats.clear();
    }

    @Override // io.nats.client.AbstractConnection
    public synchronized long getMaxPayload() {
        return this.info.getMaxPayload();
    }

    protected void sendSubscriptionMessage(SubscriptionImpl subscriptionImpl) {
        String queue = subscriptionImpl.getQueue();
        Object[] objArr = new Object[3];
        objArr[0] = subscriptionImpl.getSubject();
        objArr[FLUSH_CHAN_SIZE] = (queue == null || queue.isEmpty()) ? _EMPTY_ : _SPC_ + queue;
        objArr[2] = Long.valueOf(subscriptionImpl.getSid());
        try {
            this.bw.write(String.format(SUB_PROTO, objArr).getBytes());
        } catch (IOException e) {
            this.logger.warn("nats: I/O exception while sending subscription message");
        }
    }

    @Override // io.nats.client.AbstractConnection
    public ClosedCallback getClosedCallback() {
        return this.opts.getClosedCallback();
    }

    @Override // io.nats.client.AbstractConnection
    public void setClosedCallback(ClosedCallback closedCallback) {
        this.mu.lock();
        try {
            this.opts.setClosedCallback(closedCallback);
        } finally {
            this.mu.unlock();
        }
    }

    @Override // io.nats.client.AbstractConnection
    public DisconnectedCallback getDisconnectedCallback() {
        return this.opts.getDisconnectedCallback();
    }

    @Override // io.nats.client.AbstractConnection
    public void setDisconnectedCallback(DisconnectedCallback disconnectedCallback) {
        this.mu.lock();
        try {
            this.opts.setDisconnectedCallback(disconnectedCallback);
        } finally {
            this.mu.unlock();
        }
    }

    @Override // io.nats.client.AbstractConnection
    public ReconnectedCallback getReconnectedCallback() {
        return this.opts.getReconnectedCallback();
    }

    @Override // io.nats.client.AbstractConnection
    public void setReconnectedCallback(ReconnectedCallback reconnectedCallback) {
        this.mu.lock();
        try {
            this.opts.setReconnectedCallback(reconnectedCallback);
        } finally {
            this.mu.unlock();
        }
    }

    @Override // io.nats.client.AbstractConnection
    public ExceptionHandler getExceptionHandler() {
        return this.opts.getExceptionHandler();
    }

    @Override // io.nats.client.AbstractConnection
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.mu.lock();
        try {
            this.opts.setExceptionHandler(exceptionHandler);
        } finally {
            this.mu.unlock();
        }
    }

    @Override // io.nats.client.AbstractConnection
    public String getConnectedUrl() {
        this.mu.lock();
        try {
            if (this.status != Constants.ConnState.CONNECTED) {
                return null;
            }
            return getUrl().toString();
        } finally {
            this.mu.unlock();
        }
    }

    @Override // io.nats.client.AbstractConnection
    public String getConnectedServerId() {
        this.mu.lock();
        try {
            if (this.status != Constants.ConnState.CONNECTED) {
                return null;
            }
            return this.info.getId();
        } finally {
            this.mu.unlock();
        }
    }

    @Override // io.nats.client.AbstractConnection
    public Constants.ConnState getState() {
        this.mu.lock();
        try {
            return this.status;
        } finally {
            this.mu.unlock();
        }
    }

    @Override // io.nats.client.AbstractConnection
    public ServerInfo getConnectedServerInfo() {
        return this.info;
    }

    void setConnectedServerInfo(ServerInfo serverInfo) {
        this.info = serverInfo;
    }

    void setConnectedServerInfo(String str) {
        processInfo(str);
    }

    @Override // io.nats.client.AbstractConnection
    public Exception getLastException() {
        return this.lastEx;
    }

    void setLastError(Exception exc) {
        this.lastEx = exc;
    }

    protected Options getOptions() {
        return this.opts;
    }

    void setPending(ByteArrayOutputStream byteArrayOutputStream) {
        this.pending = byteArrayOutputStream;
    }

    ByteArrayOutputStream getPending() {
        return this.pending;
    }

    protected void sleepInterval(long j, TimeUnit timeUnit) throws InterruptedException {
        timeUnit.sleep(j);
    }

    void setOutputStream(OutputStream outputStream) {
        this.mu.lock();
        try {
            this.bw = outputStream;
        } finally {
            this.mu.unlock();
        }
    }

    OutputStream getOutputStream() {
        return this.bw;
    }

    void setInputStream(InputStream inputStream) {
        this.mu.lock();
        try {
            this.br = inputStream;
        } finally {
            this.mu.unlock();
        }
    }

    InputStream getInputStream() {
        return this.br;
    }

    protected ArrayList<BlockingQueue<Boolean>> getPongs() {
        return this.pongs;
    }

    protected void setPongs(ArrayList<BlockingQueue<Boolean>> arrayList) {
        this.pongs = arrayList;
    }

    protected Map<Long, SubscriptionImpl> getSubs() {
        return this.subs;
    }

    protected void setSubs(Map<Long, SubscriptionImpl> map) {
        this.subs = map;
    }

    protected List<Srv> getServerPool() {
        return this.srvPool;
    }

    protected void setServerPool(List<Srv> list) {
        this.srvPool = list;
    }

    @Override // io.nats.client.AbstractConnection
    public int getPendingByteCount() {
        int i = 0;
        if (getPending() != null) {
            i = getPending().size();
        }
        return i;
    }

    protected void setFlushChannel(BlockingQueue<Boolean> blockingQueue) {
        this.fch = blockingQueue;
    }

    protected BlockingQueue<Boolean> getFlushChannel() {
        return this.fch;
    }

    protected void setTcpConnection(TcpConnection tcpConnection) {
        this.conn = tcpConnection;
    }

    protected TcpConnection getTcpConnection() {
        return this.conn;
    }

    protected void setTcpConnectionFactory(TcpConnectionFactory tcpConnectionFactory) {
        this.tcf = tcpConnectionFactory;
    }

    protected TcpConnectionFactory getTcpConnectionFactory() {
        return this.tcf;
    }

    URI getUrl() {
        return this.url;
    }

    void setUrl(URI uri) {
        this.url = uri;
    }

    int getActualPingsOutstanding() {
        return this.pout;
    }

    void setActualPingsOutstanding(int i) {
        this.pout = i;
    }

    ScheduledFuture<?> getPingTimer() {
        return this.ptmr;
    }

    void setPingTimer(ScheduledFuture<?> scheduledFuture) {
        this.ptmr = scheduledFuture;
    }
}
