/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.client.impl;

import java.lang.invoke.MethodHandles;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiPredicate;
import org.apache.activemq.artemis.api.config.ServerLocatorConfig;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.DisconnectReason;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.Connector;
import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.sm.SecurityManagerShim;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientSessionFactoryImpl
implements ClientSessionFactoryInternal,
ClientConnectionLifeCycleListener {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final ServerLocatorInternal serverLocator;
    private final ClientProtocolManager clientProtocolManager;
    private TransportConfiguration connectorConfig;
    private TransportConfiguration previousConnectorConfig;
    private volatile TransportConfiguration currentConnectorConfig;
    private volatile TransportConfiguration backupConnectorConfig;
    private ConnectorFactory connectorFactory;
    private final long callTimeout;
    private final long callFailoverTimeout;
    private final long clientFailureCheckPeriod;
    private final long connectionTTL;
    private final Set<ClientSessionInternal> sessions = new ConcurrentHashSet<ClientSessionInternal>();
    private final Object createSessionLock = new Object();
    private final Lock newFailoverLock = new ReentrantLock();
    private final Object connectionLock = new Object();
    private final ExecutorFactory orderedExecutorFactory;
    private final Executor threadPool;
    private final ScheduledExecutorService scheduledThreadPool;
    private final ArtemisExecutor closeExecutor;
    private final Executor flowControlExecutor;
    private RemotingConnection connection;
    private final long retryInterval;
    private final double retryIntervalMultiplier;
    private volatile boolean topologyReady = false;
    private final CountDownLatch latchFinalTopology = new CountDownLatch(1);
    private final long maxRetryInterval;
    private int reconnectAttempts;
    private int failoverAttempts;
    private final Set<SessionFailureListener> listeners = new ConcurrentHashSet<SessionFailureListener>();
    private final Set<FailoverEventListener> failoverListeners = new ConcurrentHashSet<FailoverEventListener>();
    private Connector connector;
    private Future<?> pingerFuture;
    private PingRunnable pingRunnable;
    private final List<Interceptor> incomingInterceptors;
    private final List<Interceptor> outgoingInterceptors;
    private volatile boolean stopPingingAfterOne;
    private volatile boolean closed;
    public final Exception createTrace = new Exception();
    public static final Set<CloseRunnable> CLOSE_RUNNABLES = Collections.synchronizedSet(new HashSet());
    private final ConfirmationWindowWarning confirmationWindowWarning;
    private String primaryNodeID;
    private boolean connectionReadyForWrites;
    private final TransportConfiguration[] connectorConfigs;

    public ClientSessionFactoryImpl(ServerLocatorInternal serverLocator, TransportConfiguration connectorConfig, ServerLocatorConfig locatorConfig, int reconnectAttempts, Executor threadPool, ScheduledExecutorService scheduledThreadPool, Executor flowControlThreadPool, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) {
        this(serverLocator, new Pair<TransportConfiguration, Object>(connectorConfig, null), locatorConfig, reconnectAttempts, threadPool, scheduledThreadPool, flowControlThreadPool, incomingInterceptors, outgoingInterceptors);
    }

    ClientSessionFactoryImpl(ServerLocatorInternal serverLocator, Pair<TransportConfiguration, TransportConfiguration> connectorConfig, ServerLocatorConfig locatorConfig, int reconnectAttempts, Executor threadPool, ScheduledExecutorService scheduledThreadPool, Executor flowControlThreadPool, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) {
        this(serverLocator, connectorConfig, locatorConfig, reconnectAttempts, threadPool, scheduledThreadPool, flowControlThreadPool, incomingInterceptors, outgoingInterceptors, null);
    }

    ClientSessionFactoryImpl(ServerLocatorInternal serverLocator, Pair<TransportConfiguration, TransportConfiguration> connectorConfig, ServerLocatorConfig locatorConfig, int reconnectAttempts, Executor threadPool, ScheduledExecutorService scheduledThreadPool, Executor flowControlThreadPool, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, TransportConfiguration[] connectorConfigs) {
        this.serverLocator = serverLocator;
        this.clientProtocolManager = serverLocator.newProtocolManager();
        this.clientProtocolManager.setSessionFactory(this);
        this.connectorConfig = connectorConfig.getA();
        this.currentConnectorConfig = connectorConfig.getA();
        this.connectorFactory = this.instantiateConnectorFactory(connectorConfig.getA().getFactoryClassName());
        this.checkTransportKeys(this.connectorFactory, connectorConfig.getA());
        this.callTimeout = locatorConfig.callTimeout;
        this.callFailoverTimeout = locatorConfig.callFailoverTimeout;
        if (this.connectorFactory.isReliable() && locatorConfig.clientFailureCheckPeriod == ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD && locatorConfig.connectionTTL == ActiveMQClient.DEFAULT_CONNECTION_TTL) {
            this.clientFailureCheckPeriod = -1L;
            this.connectionTTL = -1L;
        } else {
            this.clientFailureCheckPeriod = locatorConfig.clientFailureCheckPeriod;
            this.connectionTTL = locatorConfig.connectionTTL;
        }
        this.retryInterval = locatorConfig.retryInterval;
        this.retryIntervalMultiplier = locatorConfig.retryIntervalMultiplier;
        this.maxRetryInterval = locatorConfig.maxRetryInterval;
        this.reconnectAttempts = reconnectAttempts;
        this.failoverAttempts = locatorConfig.failoverAttempts;
        this.scheduledThreadPool = scheduledThreadPool;
        this.threadPool = threadPool;
        this.orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
        this.flowControlExecutor = new OrderedExecutorFactory(flowControlThreadPool).getExecutor();
        this.closeExecutor = this.orderedExecutorFactory.getExecutor();
        this.incomingInterceptors = incomingInterceptors;
        this.outgoingInterceptors = outgoingInterceptors;
        this.confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0);
        this.connectionReadyForWrites = true;
        if (connectorConfig.getB() != null) {
            this.backupConnectorConfig = connectorConfig.getB();
        }
        this.connectorConfigs = connectorConfigs;
    }

    @Override
    public Lock lockFailover() {
        this.newFailoverLock.lock();
        return this.newFailoverLock;
    }

    @Override
    public void connect(int initialConnectAttempts) throws ActiveMQException {
        this.getConnectionWithRetry(initialConnectAttempts, null);
        if (this.connection == null) {
            StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(this.currentConnectorConfig);
            if (this.backupConnectorConfig != null) {
                msg.append(" and backup configuration ").append(this.backupConnectorConfig);
            }
            throw new ActiveMQNotConnectedException(msg.toString());
        }
    }

    @Override
    @Deprecated
    public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws ActiveMQException {
        this.connect(initialConnectAttempts);
    }

    @Override
    public TransportConfiguration getConnectorConfiguration() {
        return this.currentConnectorConfig;
    }

    @Override
    public void setBackupConnector(TransportConfiguration primary, TransportConfiguration backUp) {
        Connector localConnector = this.connector;
        if (localConnector == null) {
            localConnector = this.connectorFactory.createConnector(this.currentConnectorConfig.getCombinedParams(), new DelegatingBufferHandler(), this, this.closeExecutor, this.threadPool, this.scheduledThreadPool, this.clientProtocolManager);
        }
        if (!(!localConnector.isEquivalent(primary.getParams()) || backUp == null || localConnector.isEquivalent(backUp.getParams()) || this.serverLocator.getClusterTransportConfiguration() != null && this.serverLocator.getClusterTransportConfiguration().isSameParams(backUp))) {
            logger.debug("Setting up backup config = {} for primary = {}", (Object)backUp, (Object)primary);
            this.backupConnectorConfig = backUp;
        } else if (logger.isDebugEnabled()) {
            logger.debug("ClientSessionFactoryImpl received backup update for primary/backup pair = {} / {}  but it didn't belong to {}", primary, backUp, this.currentConnectorConfig);
        }
    }

    @Override
    public Object getBackupConnector() {
        return this.backupConnectorConfig;
    }

    @Override
    public ClientSession createSession(String username, String password, boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, int ackBatchSize, String clientID) throws ActiveMQException {
        return this.createSessionInternal(username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize, clientID);
    }

    @Override
    public ClientSession createSession(String username, String password, boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, int ackBatchSize) throws ActiveMQException {
        return this.createSessionInternal(username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize, null);
    }

    @Override
    public ClientSession createSession(boolean autoCommitSends, boolean autoCommitAcks, int ackBatchSize) throws ActiveMQException {
        return this.createSessionInternal(null, null, false, autoCommitSends, autoCommitAcks, this.serverLocator.isPreAcknowledge(), ackBatchSize, null);
    }

    @Override
    public ClientSession createXASession() throws ActiveMQException {
        return this.createSessionInternal(null, null, true, false, false, this.serverLocator.isPreAcknowledge(), this.serverLocator.getAckBatchSize(), null);
    }

    @Override
    public ClientSession createTransactedSession() throws ActiveMQException {
        return this.createSessionInternal(null, null, false, false, false, this.serverLocator.isPreAcknowledge(), this.serverLocator.getAckBatchSize(), null);
    }

    @Override
    public ClientSession createSession() throws ActiveMQException {
        return this.createSessionInternal(null, null, false, true, true, this.serverLocator.isPreAcknowledge(), this.serverLocator.getAckBatchSize(), null);
    }

    @Override
    public ClientSession createSession(boolean autoCommitSends, boolean autoCommitAcks) throws ActiveMQException {
        return this.createSessionInternal(null, null, false, autoCommitSends, autoCommitAcks, this.serverLocator.isPreAcknowledge(), this.serverLocator.getAckBatchSize(), null);
    }

    @Override
    public ClientSession createSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks) throws ActiveMQException {
        return this.createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, this.serverLocator.isPreAcknowledge(), this.serverLocator.getAckBatchSize(), null);
    }

    @Override
    public ClientSession createSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge) throws ActiveMQException {
        return this.createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, preAcknowledge, this.serverLocator.getAckBatchSize(), null);
    }

    @Override
    public void connectionCreated(ActiveMQComponent component, Connection connection, ClientProtocolManager protocol) {
    }

    @Override
    public void connectionDestroyed(Object connectionID, boolean failed) {
        ActiveMQNotConnectedException ex = ActiveMQClientMessageBundle.BUNDLE.channelDisconnected();
        this.closeExecutor.execute(() -> this.handleConnectionFailure(connectionID, ex));
    }

    @Override
    public void connectionException(Object connectionID, ActiveMQException me) {
        this.handleConnectionFailure(connectionID, me);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeSession(ClientSessionInternal session, boolean failingOver) {
        Set<ClientSessionInternal> set = this.sessions;
        synchronized (set) {
            this.sessions.remove(session);
        }
    }

    @Override
    public void connectionReadyForWrites(Object connectionID, boolean ready) {
    }

    @Override
    public synchronized int numConnections() {
        return this.connection != null ? 1 : 0;
    }

    @Override
    public int numSessions() {
        return this.sessions.size();
    }

    @Override
    public void addFailureListener(SessionFailureListener listener) {
        this.listeners.add(listener);
    }

    @Override
    public boolean removeFailureListener(SessionFailureListener listener) {
        return this.listeners.remove(listener);
    }

    @Override
    public ClientSessionFactoryImpl addFailoverListener(FailoverEventListener listener) {
        this.failoverListeners.add(listener);
        return this;
    }

    @Override
    public boolean removeFailoverListener(FailoverEventListener listener) {
        return this.failoverListeners.remove(listener);
    }

    @Override
    public void causeExit() {
        this.clientProtocolManager.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void interruptConnectAndCloseAllSessions(boolean close) {
        this.latchFinalTopology.countDown();
        this.clientProtocolManager.stop();
        Object object = this.createSessionLock;
        synchronized (object) {
            this.closeCleanSessions(close);
            this.closed = true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeCleanSessions(boolean close) {
        HashSet<ClientSessionInternal> sessionsToClose;
        Set<ClientSessionInternal> set = this.sessions;
        synchronized (set) {
            sessionsToClose = new HashSet<ClientSessionInternal>(this.sessions);
        }
        for (ClientSessionInternal session : sessionsToClose) {
            try {
                if (close) {
                    session.close();
                    continue;
                }
                session.cleanUp(false);
            }
            catch (Exception e1) {
                ActiveMQClientLogger.LOGGER.unableToCloseSession(e1);
            }
        }
        this.checkCloseConnection();
    }

    @Override
    public void close() {
        if (this.closed) {
            return;
        }
        this.interruptConnectAndCloseAllSessions(true);
        this.serverLocator.factoryClosed(this);
    }

    @Override
    public void cleanup() {
        if (this.closed) {
            return;
        }
        this.interruptConnectAndCloseAllSessions(false);
    }

    @Override
    public boolean waitForTopology(long timeout, TimeUnit unit) {
        try {
            return this.latchFinalTopology.await(timeout, unit) && this.topologyReady;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            if (!this.isClosed()) {
                ActiveMQClientLogger.LOGGER.unableToReceiveClusterTopology(e);
            }
            return false;
        }
    }

    @Override
    public boolean isClosed() {
        return this.closed || this.serverLocator.isClosed();
    }

    @Override
    public ServerLocator getServerLocator() {
        return this.serverLocator;
    }

    public void stopPingingAfterOne() {
        this.stopPingingAfterOne = true;
    }

    private void handleConnectionFailure(Object connectionID, ActiveMQException me) {
        this.handleConnectionFailure(connectionID, me, null);
    }

    private void handleConnectionFailure(Object connectionID, ActiveMQException me, String scaleDownTargetNodeID) {
        try {
            this.failoverOrReconnect(connectionID, me, scaleDownTargetNodeID);
        }
        catch (ActiveMQInterruptedException e1) {
            logger.debug(e1.getMessage(), e1);
        }
        catch (Throwable t) {
            ActiveMQClientLogger.LOGGER.unableToHandleConnectionFailure(t);
            this.close();
            throw t;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void failoverOrReconnect(Object connectionID, ActiveMQException me, String scaleDownTargetNodeID) {
        HashSet<ClientSessionInternal> sessionsToClose;
        block38: {
            logger.debug("Failure captured on connectionID={}, performing failover or reconnection now", connectionID, (Object)me);
            for (ClientSessionInternal session : this.sessions) {
                ActiveMQSessionContext sessionContext;
                SessionContext context = session.getSessionContext();
                if (!(context instanceof ActiveMQSessionContext) || !(sessionContext = (ActiveMQSessionContext)context).isKilled()) continue;
                this.setReconnectAttempts(0);
            }
            sessionsToClose = null;
            if (!this.clientProtocolManager.isAlive()) {
                return;
            }
            Lock localFailoverLock = this.lockFailover();
            try {
                if (this.connection == null || !this.connection.getID().equals(connectionID) || !this.clientProtocolManager.isAlive()) {
                    return;
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts={}", (Object)this.reconnectAttempts);
                }
                this.callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
                this.callSessionFailureListeners(me, false, false, scaleDownTargetNodeID);
                if (this.reconnectAttempts != 0 || this.failoverAttempts != 0) {
                    if (this.clientProtocolManager.cleanupBeforeFailover(me)) {
                        HashSet<ClientSessionInternal> sessionsToFailover;
                        RemotingConnection oldConnection = this.connection;
                        this.connection = null;
                        Connector localConnector = this.connector;
                        if (localConnector != null) {
                            try {
                                localConnector.close();
                            }
                            catch (Exception exception) {
                                // empty catch block
                            }
                        }
                        this.cancelScheduledTasks();
                        this.connector = null;
                        Set<ClientSessionInternal> set = this.sessions;
                        synchronized (set) {
                            sessionsToFailover = new HashSet<ClientSessionInternal>(this.sessions);
                        }
                        for (ClientSessionInternal session : sessionsToFailover) {
                            session.preHandleFailover(this.connection);
                        }
                        int reconnectRetries = 0;
                        boolean sessionsReconnected = false;
                        BiPredicate<Boolean, Integer> reconnectRetryPredicate = (reconnected, retries) -> this.clientProtocolManager.isAlive() && reconnected == false && (this.reconnectAttempts == -1 || retries < this.reconnectAttempts);
                        while (reconnectRetryPredicate.test(sessionsReconnected, reconnectRetries)) {
                            int remainingReconnectRetries = this.reconnectAttempts == -1 ? -1 : this.reconnectAttempts - reconnectRetries;
                            reconnectRetries += this.getConnectionWithRetry(remainingReconnectRetries, oldConnection);
                            if (this.connection != null && !(sessionsReconnected = this.reconnectSessions(sessionsToFailover, oldConnection, me))) {
                                if (oldConnection != null) {
                                    oldConnection.destroy();
                                }
                                oldConnection = this.connection;
                                this.connection = null;
                            }
                            if (!reconnectRetryPredicate.test(sessionsReconnected, ++reconnectRetries)) continue;
                            this.waitForRetry(this.retryInterval);
                        }
                        int connectorsCount = 0;
                        int failoverRetries = 0;
                        long failoverRetryInterval = this.retryInterval;
                        BiPredicate<Boolean, Integer> failoverRetryPredicate = (reconnected, retries) -> this.clientProtocolManager.isAlive() && reconnected == false && (this.failoverAttempts == -1 || retries < this.failoverAttempts);
                        while (failoverRetryPredicate.test(sessionsReconnected, failoverRetries)) {
                            ++connectorsCount;
                            Pair<TransportConfiguration, TransportConfiguration> connectorPair = this.serverLocator.selectNextConnectorPair();
                            if (connectorPair != null) {
                                this.connectorConfig = connectorPair.getA();
                                this.currentConnectorConfig = connectorPair.getA();
                                if (connectorPair.getB() != null) {
                                    this.backupConnectorConfig = connectorPair.getB();
                                }
                                this.getConnection();
                            }
                            if (this.connection != null && !(sessionsReconnected = this.reconnectSessions(sessionsToFailover, oldConnection, me))) {
                                if (oldConnection != null) {
                                    oldConnection.destroy();
                                }
                                oldConnection = this.connection;
                                this.connection = null;
                            }
                            if (connectorsCount < this.serverLocator.getConnectorsSize()) continue;
                            connectorsCount = 0;
                            if (!failoverRetryPredicate.test(false, ++failoverRetries)) continue;
                            this.waitForRetry(failoverRetryInterval);
                            failoverRetryInterval = this.serverLocator.getNextRetryInterval(failoverRetryInterval, this.retryIntervalMultiplier, this.maxRetryInterval);
                        }
                        for (ClientSessionInternal session : sessionsToFailover) {
                            session.postHandleFailover(this.connection, sessionsReconnected);
                        }
                        if (oldConnection != null) {
                            oldConnection.destroy();
                        }
                        if (this.connection != null) {
                            this.callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
                        }
                    }
                } else {
                    RemotingConnection connectionToDestory = this.connection;
                    if (connectionToDestory != null) {
                        connectionToDestory.destroy();
                    }
                    this.connection = null;
                }
                if (this.connection != null) break block38;
                Set<ClientSessionInternal> set = this.sessions;
                synchronized (set) {
                    sessionsToClose = new HashSet<ClientSessionInternal>(this.sessions);
                }
                this.callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
                this.callSessionFailureListeners(me, true, false, scaleDownTargetNodeID);
            }
            finally {
                localFailoverLock.unlock();
            }
        }
        if (this.connection != null) {
            this.callSessionFailureListeners(me, true, true);
        }
        if (sessionsToClose != null) {
            for (ClientSessionInternal session : sessionsToClose) {
                try {
                    session.cleanUp(true);
                }
                catch (Exception cause) {
                    ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ClientSession createSessionInternal(String rawUsername, String rawPassword, boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, int ackBatchSize, String clientID) throws ActiveMQException {
        String password;
        String username;
        String name = UUIDGenerator.getInstance().generateStringUUID();
        try {
            username = PasswordMaskingUtil.resolveMask(rawUsername, this.serverLocator.getPasswordCodec());
            password = PasswordMaskingUtil.resolveMask(rawPassword, this.serverLocator.getPasswordCodec());
        }
        catch (Exception e) {
            throw new ActiveMQException(e.getMessage(), e, ActiveMQExceptionType.GENERIC_EXCEPTION);
        }
        SessionContext context = this.createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);
        ClientSessionImpl session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, this.serverLocator.isBlockOnAcknowledge(), this.serverLocator.isAutoGroup(), ackBatchSize, this.serverLocator.getConsumerWindowSize(), this.serverLocator.getConsumerMaxRate(), this.serverLocator.getConfirmationWindowSize(), this.serverLocator.getProducerWindowSize(), this.serverLocator.getProducerMaxRate(), this.serverLocator.isBlockOnNonDurableSend(), this.serverLocator.isBlockOnDurableSend(), this.serverLocator.isCacheLargeMessagesClient(), this.serverLocator.getMinLargeMessageSize(), this.serverLocator.isCompressLargeMessage(), this.serverLocator.getCompressionLevel(), this.serverLocator.getInitialMessagePacketSize(), this.serverLocator.getGroupID(), this.serverLocator.getOnMessageCloseTimeout(), context, this.orderedExecutorFactory.getExecutor(), this.orderedExecutorFactory.getExecutor(), this.flowControlExecutor, this.orderedExecutorFactory.getExecutor());
        Set<ClientSessionInternal> set = this.sessions;
        synchronized (set) {
            if (this.closed || !this.clientProtocolManager.isAlive()) {
                session.close();
                throw ActiveMQClientMessageBundle.BUNDLE.unableToCreateSession();
            }
            this.sessions.add(session);
        }
        return session;
    }

    private void callSessionFailureListeners(ActiveMQException me, boolean afterReconnect, boolean failedOver) {
        this.callSessionFailureListeners(me, afterReconnect, failedOver, null);
    }

    private void callSessionFailureListeners(ActiveMQException me, boolean afterReconnect, boolean failedOver, String scaleDownTargetNodeID) {
        ArrayList<SessionFailureListener> listenersClone = new ArrayList<SessionFailureListener>(this.listeners);
        for (SessionFailureListener listener : listenersClone) {
            try {
                if (afterReconnect) {
                    listener.connectionFailed(me, failedOver, scaleDownTargetNodeID);
                    continue;
                }
                listener.beforeReconnect(me);
            }
            catch (Throwable t) {
                ActiveMQClientLogger.LOGGER.failedToExecuteListener(t);
            }
        }
    }

    private void callFailoverListeners(FailoverEventType type) {
        ArrayList<FailoverEventListener> listenersClone = new ArrayList<FailoverEventListener>(this.failoverListeners);
        for (FailoverEventListener listener : listenersClone) {
            try {
                listener.failoverEvent(type);
            }
            catch (Throwable t) {
                ActiveMQClientLogger.LOGGER.failedToExecuteListener(t);
            }
        }
    }

    private boolean reconnectSessions(Set<ClientSessionInternal> sessionsToFailover, RemotingConnection oldConnection, ActiveMQException cause) {
        if (this.connection == null) {
            if (!this.clientProtocolManager.isAlive()) {
                ActiveMQClientLogger.LOGGER.failedToConnectToServer();
            }
            return false;
        }
        List<FailureListener> oldListeners = oldConnection.getFailureListeners();
        ArrayList<FailureListener> newListeners = new ArrayList<FailureListener>(this.connection.getFailureListeners());
        for (FailureListener listener : oldListeners) {
            if (listener instanceof DelegatingFailureListener) continue;
            newListeners.add(listener);
        }
        this.connection.setFailureListeners(newListeners);
        ((CoreRemotingConnection)this.connection).syncIDGeneratorSequence(((CoreRemotingConnection)oldConnection).getIDGeneratorSequence());
        boolean sessionFailoverError = false;
        for (ClientSessionInternal session : sessionsToFailover) {
            if (sessionFailoverError) {
                session.getSessionContext().transferConnection(this.connection);
                continue;
            }
            if (session.handleFailover(this.connection, cause)) continue;
            sessionFailoverError = true;
        }
        return !sessionFailoverError;
    }

    private int getConnectionWithRetry(int reconnectAttempts, RemotingConnection oldConnection) {
        if (!this.clientProtocolManager.isAlive()) {
            return 0;
        }
        if (logger.isTraceEnabled()) {
            logger.trace("getConnectionWithRetry::{} with retryInterval = {} multiplier = {}", reconnectAttempts, this.retryInterval, this.retryIntervalMultiplier, new Exception("trace"));
        }
        long interval = this.retryInterval;
        int count = 0;
        while (this.clientProtocolManager.isAlive()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Trying reconnection attempt {}/{}", (Object)count, (Object)reconnectAttempts);
            }
            if (this.getConnection() != null) {
                if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {
                    CoreRemotingConnection oldRemotingConnection = (CoreRemotingConnection)oldConnection;
                    ((CoreRemotingConnection)this.connection).setChannelVersion(oldRemotingConnection.getChannelVersion());
                }
                logger.debug("Reconnection successful");
                return count;
            }
            if (reconnectAttempts != 0) {
                if (reconnectAttempts != -1 && ++count == reconnectAttempts) {
                    if (reconnectAttempts != 1) {
                        ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);
                    }
                    return count;
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("Waiting {} milliseconds before next retry. RetryInterval={} and multiplier={}", interval, this.retryInterval, this.retryIntervalMultiplier);
                }
                if (this.waitForRetry(interval)) {
                    return count;
                }
                interval = this.serverLocator.getNextRetryInterval(interval, this.retryIntervalMultiplier, this.maxRetryInterval);
                continue;
            }
            logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
            return count;
        }
        return count;
    }

    @Override
    public boolean waitForRetry(long interval) {
        try {
            if (this.clientProtocolManager.waitOnLatch(interval)) {
                return true;
            }
        }
        catch (InterruptedException ignore) {
            throw new ActiveMQInterruptedException(this.createTrace);
        }
        return false;
    }

    private void cancelScheduledTasks() {
        PingRunnable pingRunnableLocal;
        Future<?> pingerFutureLocal = this.pingerFuture;
        if (pingerFutureLocal != null) {
            pingerFutureLocal.cancel(false);
        }
        if ((pingRunnableLocal = this.pingRunnable) != null) {
            pingRunnableLocal.cancel();
        }
        this.pingerFuture = null;
        this.pingRunnable = null;
    }

    private void checkCloseConnection() {
        RemotingConnection connectionInUse = this.connection;
        Connector connectorInUse = this.connector;
        if (connectionInUse != null && this.sessions.isEmpty()) {
            this.cancelScheduledTasks();
            try {
                connectionInUse.destroy();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            this.connection = null;
            try {
                if (connectorInUse != null) {
                    connectorInUse.close();
                }
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            this.connector = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public RemotingConnection getConnection() {
        if (this.closed) {
            throw new IllegalStateException("ClientSessionFactory is closed!");
        }
        if (!this.clientProtocolManager.isAlive()) {
            return null;
        }
        Object object = this.connectionLock;
        synchronized (object) {
            RemotingConnection connection;
            if (this.connection != null) {
                return this.connection;
            }
            this.connection = connection = this.establishNewConnection();
            if (connection != null && this.primaryNodeID != null) {
                try {
                    if (!this.clientProtocolManager.checkForFailover(this.primaryNodeID)) {
                        connection.destroy();
                        this.connection = null;
                        return null;
                    }
                }
                catch (ActiveMQException e) {
                    connection.destroy();
                    this.connection = null;
                    return null;
                }
            }
            if (connection != null && this.serverLocator.getAfterConnectInternalListener() != null) {
                this.serverLocator.getAfterConnectInternalListener().onConnection(this);
            }
            if (this.serverLocator.getTopology() != null) {
                if (connection != null) {
                    logger.trace("{}::Subscribing Topology", (Object)this);
                    this.clientProtocolManager.sendSubscribeTopology(this.serverLocator.isClusterConnection());
                }
            } else if (logger.isDebugEnabled()) {
                logger.debug("serverLocator@{} had no topology", (Object)System.identityHashCode(this.serverLocator));
            }
            return connection;
        }
    }

    protected void schedulePing() {
        if (this.pingerFuture == null) {
            this.pingRunnable = new PingRunnable();
            if (this.clientFailureCheckPeriod != -1L) {
                this.pingerFuture = this.scheduledThreadPool.scheduleWithFixedDelay(new ActualScheduledPinger(this.pingRunnable), 0L, this.clientFailureCheckPeriod, TimeUnit.MILLISECONDS);
            }
            this.pingRunnable.send();
        } else {
            this.pingRunnable.run();
        }
    }

    protected ConnectorFactory instantiateConnectorFactory(String connectorFactoryClassName) {
        ConnectorFactory cachedFactory = this.connectorFactory;
        if (cachedFactory != null && cachedFactory.getClass().getName().equals(connectorFactoryClassName)) {
            return cachedFactory;
        }
        return SecurityManagerShim.doPrivileged(() -> (ConnectorFactory)ClassloadingUtil.newInstanceFromClassLoader(ClientSessionFactoryImpl.class, connectorFactoryClassName, ConnectorFactory.class));
    }

    @Override
    public void setReconnectAttempts(int attempts) {
        this.reconnectAttempts = attempts;
    }

    public int getReconnectAttempts() {
        return this.reconnectAttempts;
    }

    @Override
    public Object getConnector() {
        return this.connector;
    }

    @Override
    public ConfirmationWindowWarning getConfirmationWindowWarning() {
        return this.confirmationWindowWarning;
    }

    protected Connection openTransportConnection(Connector connector) {
        connector.start();
        Connection transportConnection = connector.createConnection();
        if (transportConnection == null) {
            logger.debug("Connector towards {} failed", (Object)connector);
            try {
                connector.close();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
        }
        return transportConnection;
    }

    protected Connector createConnector(ConnectorFactory connectorFactory, TransportConfiguration configuration) {
        NettyConnector nettyConnector;
        Connector connector = connectorFactory.createConnector(configuration.getCombinedParams(), new DelegatingBufferHandler(), this, this.closeExecutor, this.threadPool, this.scheduledThreadPool, this.clientProtocolManager);
        if (connector instanceof NettyConnector && (nettyConnector = (NettyConnector)connector).getConnectTimeoutMillis() < 0) {
            nettyConnector.setConnectTimeoutMillis((int)this.serverLocator.getConnectionTTL());
        }
        return connector;
    }

    private void checkTransportKeys(ConnectorFactory factory, TransportConfiguration tc) {
    }

    protected Connection createTransportConnection() {
        Connection transportConnection = null;
        try {
            transportConnection = this.createTransportConnection("current", this.currentConnectorConfig);
            if (transportConnection != null) {
                return transportConnection;
            }
            if (this.backupConnectorConfig != null) {
                String backupConnectorName = this.backupConnectorConfig.getName();
                if (backupConnectorName != null && this.connectorConfigs != null) {
                    for (TransportConfiguration connectorConfig : this.connectorConfigs) {
                        if (!backupConnectorName.equals(connectorConfig.getName()) || (transportConnection = this.createTransportConnection("backup", connectorConfig)) == null) continue;
                        return transportConnection;
                    }
                }
                if ((transportConnection = this.createTransportConnection("backup", this.backupConnectorConfig)) != null) {
                    return transportConnection;
                }
            }
            if (this.previousConnectorConfig != null && !this.currentConnectorConfig.equals(this.previousConnectorConfig) && (transportConnection = this.createTransportConnection("previous", this.previousConnectorConfig)) != null) {
                return transportConnection;
            }
            if (!this.currentConnectorConfig.equals(this.connectorConfig) && (transportConnection = this.createTransportConnection("initial", this.connectorConfig)) != null) {
                return transportConnection;
            }
            logger.debug("no connection been made, returning null");
            return null;
        }
        catch (Exception cause) {
            ActiveMQClientLogger.LOGGER.createConnectorException(cause);
            if (this.connector != null) {
                try {
                    this.connector.close();
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
            }
            this.connector = null;
            return null;
        }
    }

    private Connection createTransportConnection(String name, TransportConfiguration transportConnectorConfig) {
        Connector transportConnector;
        Connection transportConnection;
        ConnectorFactory transportConnectorFactory = this.instantiateConnectorFactory(transportConnectorConfig.getFactoryClassName());
        if (logger.isDebugEnabled()) {
            logger.debug("Trying to connect with connectorFactory={} and {}ConnectorConfig: {}", transportConnectorFactory, name, transportConnectorConfig);
        }
        if ((transportConnection = this.openTransportConnection(transportConnector = this.createConnector(transportConnectorFactory, transportConnectorConfig))) != null) {
            logger.debug("Connected with the {}ConnectorConfig={}", (Object)name, (Object)transportConnectorConfig);
            this.connector = transportConnector;
            this.connectorFactory = transportConnectorFactory;
            this.previousConnectorConfig = this.currentConnectorConfig;
            this.currentConnectorConfig = transportConnectorConfig;
        }
        return transportConnection;
    }

    protected RemotingConnection establishNewConnection() {
        Connection transportConnection = this.createTransportConnection();
        if (transportConnection == null) {
            logger.trace("Neither backup or primary were active, will just give up now");
            return null;
        }
        RemotingConnection newConnection = this.clientProtocolManager.connect(transportConnection, this.callTimeout, this.callFailoverTimeout, this.incomingInterceptors, this.outgoingInterceptors, new SessionFactoryTopologyHandler());
        newConnection.addFailureListener(new DelegatingFailureListener(newConnection.getID()));
        this.schedulePing();
        logger.trace("returning {}", (Object)newConnection);
        return newConnection;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected SessionContext createSessionChannel(String name, String username, String password, boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, String clientID) throws ActiveMQException {
        Object object = this.createSessionLock;
        synchronized (object) {
            return this.clientProtocolManager.createSessionContext(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, this.serverLocator.getMinLargeMessageSize(), this.serverLocator.getConfirmationWindowSize(), clientID);
        }
    }

    @Override
    public String getPrimaryNodeId() {
        return this.primaryNodeID;
    }

    public void flushCloseExecutor(int time, TimeUnit unit) {
        this.closeExecutor.flush(time, unit);
    }

    private class DelegatingBufferHandler
    implements BufferHandler {
        private DelegatingBufferHandler() {
        }

        @Override
        public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
            RemotingConnection theConn = ClientSessionFactoryImpl.this.connection;
            if (theConn != null && connectionID.equals(theConn.getID())) {
                try {
                    theConn.bufferReceived(connectionID, buffer);
                }
                catch (RuntimeException e) {
                    ActiveMQClientLogger.LOGGER.disconnectOnErrorDecoding(e);
                    ClientSessionFactoryImpl.this.threadPool.execute(() -> theConn.fail(new ActiveMQException(e.getMessage())));
                }
            } else {
                logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
            }
        }

        @Override
        public void endOfBatch(Object connectionID) {
            RemotingConnection theConn = ClientSessionFactoryImpl.this.connection;
            if (theConn != null && connectionID.equals(theConn.getID())) {
                try {
                    theConn.endOfBatch(connectionID);
                }
                catch (RuntimeException e) {
                    ActiveMQClientLogger.LOGGER.disconnectOnErrorDecoding(e);
                    ClientSessionFactoryImpl.this.threadPool.execute(() -> theConn.fail(new ActiveMQException(e.getMessage())));
                }
            } else {
                logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
            }
        }
    }

    private final class DelegatingFailureListener
    implements FailureListener {
        private final Object connectionID;

        DelegatingFailureListener(Object connectionID) {
            this.connectionID = connectionID;
        }

        @Override
        public void connectionFailed(ActiveMQException me, boolean failedOver) {
            this.connectionFailed(me, failedOver, null);
        }

        @Override
        public void connectionFailed(ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
            ClientSessionFactoryImpl.this.handleConnectionFailure(this.connectionID, me, scaleDownTargetNodeID);
        }

        public String toString() {
            return DelegatingFailureListener.class.getSimpleName() + "('reconnectsOrFailover', hash=" + super.hashCode() + ")";
        }
    }

    private final class PingRunnable
    implements Runnable {
        private boolean cancelled;
        private boolean first;
        private long lastCheck = System.currentTimeMillis();

        private PingRunnable() {
        }

        @Override
        public synchronized void run() {
            if (this.cancelled || ClientSessionFactoryImpl.this.stopPingingAfterOne && !this.first) {
                return;
            }
            this.first = false;
            long now = System.currentTimeMillis();
            RemotingConnection connectionInUse = ClientSessionFactoryImpl.this.connection;
            if (connectionInUse != null && ClientSessionFactoryImpl.this.clientFailureCheckPeriod != -1L && ClientSessionFactoryImpl.this.connectionTTL != -1L && now >= this.lastCheck + ClientSessionFactoryImpl.this.connectionTTL) {
                if (!connectionInUse.checkDataReceived()) {
                    ActiveMQConnectionTimedOutException me = ActiveMQClientMessageBundle.BUNDLE.connectionTimedOut(ClientSessionFactoryImpl.this.connection.getTransportConnection());
                    this.cancelled = true;
                    ClientSessionFactoryImpl.this.threadPool.execute(() -> connectionInUse.fail(me));
                    return;
                }
                this.lastCheck = now;
            }
            this.send();
        }

        public void send() {
            ClientSessionFactoryImpl.this.clientProtocolManager.ping(ClientSessionFactoryImpl.this.connectionTTL);
        }

        public synchronized void cancel() {
            this.cancelled = true;
        }
    }

    private static final class ActualScheduledPinger
    implements Runnable {
        private final WeakReference<PingRunnable> pingRunnable;

        ActualScheduledPinger(PingRunnable runnable) {
            this.pingRunnable = new WeakReference<PingRunnable>(runnable);
        }

        @Override
        public void run() {
            PingRunnable runnable = (PingRunnable)this.pingRunnable.get();
            if (runnable != null) {
                runnable.run();
            }
        }
    }

    class SessionFactoryTopologyHandler
    implements TopologyResponseHandler {
        SessionFactoryTopologyHandler() {
        }

        @Override
        public void nodeDisconnected(RemotingConnection conn, String nodeID, DisconnectReason reason, String targetNodeID, TransportConfiguration tagetConnector) {
            if (logger.isTraceEnabled()) {
                logger.trace("Disconnect being called on client: server locator = {} notifying node {} as down with reason {}", new Object[]{ClientSessionFactoryImpl.this.serverLocator, nodeID, reason, new Exception("trace")});
            }
            ClientSessionFactoryImpl.this.serverLocator.notifyNodeDown(System.currentTimeMillis(), nodeID, true);
            if (reason.isRedirect()) {
                if (ClientSessionFactoryImpl.this.serverLocator.isHA()) {
                    TopologyMemberImpl topologyMember = ClientSessionFactoryImpl.this.serverLocator.getTopology().getMember(nodeID);
                    if (topologyMember != null) {
                        if (topologyMember.getConnector().getB() != null) {
                            ClientSessionFactoryImpl.this.backupConnectorConfig = topologyMember.getConnector().getB();
                        } else {
                            logger.debug("The topology member {} with connector {} has no backup", (Object)nodeID, (Object)tagetConnector);
                        }
                    } else {
                        logger.debug("The topology member {} with connector {} not found", (Object)nodeID, (Object)tagetConnector);
                    }
                }
                ClientSessionFactoryImpl.this.currentConnectorConfig = tagetConnector;
            }
            ClientSessionFactoryImpl.this.closeExecutor.execute(new CloseRunnable(conn, reason, targetNodeID));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void notifyNodeUp(long uniqueEventID, String nodeID, String backupGroupName, String scaleDownGroupName, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean isLast) {
            try {
                if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), ClientSessionFactoryImpl.this.currentConnectorConfig)) {
                    ClientSessionFactoryImpl.this.primaryNodeID = nodeID;
                }
                ClientSessionFactoryImpl.this.serverLocator.notifyNodeUp(uniqueEventID, nodeID, backupGroupName, scaleDownGroupName, connectorPair, isLast);
            }
            finally {
                if (isLast) {
                    ClientSessionFactoryImpl.this.topologyReady = true;
                    ClientSessionFactoryImpl.this.latchFinalTopology.countDown();
                }
            }
        }

        @Override
        public void notifyNodeDown(long eventTime, String nodeID, boolean disconnect) {
            ClientSessionFactoryImpl.this.serverLocator.notifyNodeDown(eventTime, nodeID, disconnect);
        }
    }

    public class CloseRunnable
    implements Runnable {
        private final RemotingConnection conn;
        private final DisconnectReason reason;
        private final String targetNodeID;

        public CloseRunnable(RemotingConnection conn, DisconnectReason reason, String targetNodeID) {
            this.conn = conn;
            this.reason = reason;
            this.targetNodeID = targetNodeID;
        }

        @Override
        public void run() {
            try {
                CLOSE_RUNNABLES.add(this);
                if (this.reason.isRedirect()) {
                    this.conn.fail(ActiveMQClientMessageBundle.BUNDLE.redirected());
                } else if (this.reason.isScaleDown()) {
                    this.conn.fail(ActiveMQClientMessageBundle.BUNDLE.disconnected(), this.targetNodeID);
                } else {
                    this.conn.fail(ActiveMQClientMessageBundle.BUNDLE.disconnected());
                }
            }
            finally {
                CLOSE_RUNNABLES.remove(this);
            }
        }

        public ClientSessionFactoryImpl stop() {
            ClientSessionFactoryImpl.this.causeExit();
            CLOSE_RUNNABLES.remove(this);
            return ClientSessionFactoryImpl.this;
        }
    }
}

