package org.apache.activemq.transport.failover;

import java.net.Socket;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/failover/FailoverTimeoutTest.class */
public class FailoverTimeoutTest {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTimeoutTest.class);
    private static final String QUEUE_NAME = "test.failovertimeout";
    BrokerService bs;
    URI tcpUri;

    @Before
    public void setUp() throws Exception {
        this.bs = new BrokerService();
        this.bs.setUseJmx(false);
        this.bs.addConnector(getTransportUri());
        this.bs.start();
        this.tcpUri = ((TransportConnector) this.bs.getTransportConnectors().get(0)).getConnectUri();
    }

    @After
    public void tearDown() throws Exception {
        if (this.bs != null) {
            this.bs.stop();
        }
    }

    protected String getTransportUri() {
        return JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT;
    }

    @Test
    public void testTimoutDoesNotFailConnectionAttempts() throws Exception {
        this.bs.stop();
        long currentTimeMillis = System.currentTimeMillis();
        Connection createConnection = new ActiveMQConnectionFactory("failover:(" + this.tcpUri + ")?timeout=1000&useExponentialBackOff=false&maxReconnectAttempts=5&initialReconnectDelay=1000").createConnection();
        try {
            createConnection.start();
            Assert.fail("Should have failed to connect");
        } catch (JMSException e) {
            LOG.info("Caught exception on call to start: {}", e.getMessage());
        }
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        LOG.info("Time spent waiting to connect: {} ms", Long.valueOf(currentTimeMillis2));
        Assert.assertTrue(currentTimeMillis2 > 3000);
        safeClose(createConnection);
    }

    private void safeClose(Connection connection) {
        try {
            connection.close();
        } catch (Exception e) {
        }
    }

    @Test
    public void testTimeout() throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory("failover:(" + this.tcpUri + ")?timeout=1000&useExponentialBackOff=false").createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(QUEUE_NAME));
        TextMessage createTextMessage = createSession.createTextMessage("Test message");
        createProducer.send(createTextMessage);
        this.bs.stop();
        try {
            createProducer.send(createTextMessage);
        } catch (JMSException e) {
            Assert.assertEquals("Failover timeout of 1000 ms reached.", e.getMessage());
        }
        this.bs = new BrokerService();
        this.bs.setUseJmx(false);
        this.bs.addConnector(this.tcpUri);
        this.bs.start();
        this.bs.waitUntilStarted();
        createProducer.send(createTextMessage);
        this.bs.stop();
        createConnection.close();
    }

    @Test
    public void testInterleaveAckAndException() throws Exception {
        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory("failover:(" + this.tcpUri + ")?maxReconnectAttempts=0").createConnection();
        doTestInterleaveAndException(activeMQConnection, new MessageAck());
        safeClose(activeMQConnection);
    }

    @Test
    public void testInterleaveTxAndException() throws Exception {
        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory("failover:(" + this.tcpUri + ")?maxReconnectAttempts=0").createConnection();
        TransactionInfo transactionInfo = new TransactionInfo();
        transactionInfo.setConnectionId(activeMQConnection.getConnectionInfo().getConnectionId());
        transactionInfo.setTransactionId(new LocalTransactionId(transactionInfo.getConnectionId(), 1L));
        doTestInterleaveAndException(activeMQConnection, transactionInfo);
        safeClose(activeMQConnection);
    }

    public void doTestInterleaveAndException(final ActiveMQConnection activeMQConnection, final Command command) throws Exception {
        activeMQConnection.start();
        activeMQConnection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.transport.failover.FailoverTimeoutTest.1
            public void onException(JMSException jMSException) {
                try {
                    FailoverTimeoutTest.LOG.info("Deal with exception - invoke op that may block pending outstanding oneway");
                    activeMQConnection.asyncSendPacket(command);
                } catch (Exception e) {
                }
            }
        });
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        final CountDownLatch countDownLatch = new CountDownLatch(200);
        final AtomicLong atomicLong = new AtomicLong(1000L);
        for (int i = 0; i < 200; i++) {
            newCachedThreadPool.submit(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTimeoutTest.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        TimeUnit.MILLISECONDS.sleep(Math.max(0L, atomicLong.addAndGet(-50L)));
                        activeMQConnection.asyncSendPacket(command);
                    } catch (Exception e) {
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        while (countDownLatch.getCount() > 190) {
            countDownLatch.await(20L, TimeUnit.MILLISECONDS);
        }
        ((Socket) activeMQConnection.getTransport().narrow(Socket.class)).close();
        newCachedThreadPool.shutdown();
        Assert.assertTrue("all ops finish", countDownLatch.await(15L, TimeUnit.SECONDS));
    }

    @Test
    public void testUpdateUris() throws Exception {
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory("failover:(" + this.tcpUri + ")?useExponentialBackOff=false").createConnection();
        createConnection.start();
        ((FailoverTransport) createConnection.getTransport().narrow(FailoverTransport.class)).add(false, new URI[]{new URI("tcp://unknownHost:" + this.tcpUri.getPort()), new URI("tcp://unknownHost2:" + this.tcpUri.getPort()), new URI("tcp://localhost:2222")});
    }
}
