/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network.jms;

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.jms.JmsConnector;
import org.apache.activemq.network.jms.OutboundQueueBridge;
import org.apache.activemq.network.jms.SimpleJmsQueueConnector;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class QueueOutboundBridgeReconnectTest {
    private BrokerService producerBroker;
    private BrokerService consumerBroker;
    private ActiveMQConnectionFactory producerConnectionFactory;
    private ActiveMQConnectionFactory consumerConnectionFactory;
    private Destination destination;
    private final ArrayList<Connection> connections = new ArrayList();

    @Test
    public void testMultipleProducerBrokerRestarts() throws Exception {
        for (int i = 0; i < 10; ++i) {
            this.testWithProducerBrokerRestart();
            this.disposeConsumerConnections();
        }
    }

    @Test
    public void testRestartProducerWithNoConsumer() throws Exception {
        this.stopConsumerBroker();
        this.startProducerBroker();
        this.sendMessage("test123");
        this.sendMessage("test456");
    }

    @Test
    public void testWithoutRestartsConsumerFirst() throws Exception {
        this.startConsumerBroker();
        this.startProducerBroker();
        this.sendMessage("test123");
        this.sendMessage("test456");
        MessageConsumer consumer = this.createConsumer();
        Message message = consumer.receive(3000L);
        Assert.assertNotNull((Object)message);
        Assert.assertEquals((Object)"test123", (Object)((TextMessage)message).getText());
        message = consumer.receive(3000L);
        Assert.assertNotNull((Object)message);
        Assert.assertEquals((Object)"test456", (Object)((TextMessage)message).getText());
        Assert.assertNull((Object)consumer.receiveNoWait());
    }

    @Test
    public void testWithoutRestartsProducerFirst() throws Exception {
        this.startProducerBroker();
        this.sendMessage("test123");
        this.startConsumerBroker();
        this.sendMessage("test456");
        MessageConsumer consumer = this.createConsumer();
        Message message = consumer.receive(5000L);
        Assert.assertNotNull((Object)message);
        Assert.assertEquals((Object)"test123", (Object)((TextMessage)message).getText());
        message = consumer.receive(5000L);
        Assert.assertNotNull((Object)message);
        Assert.assertEquals((Object)"test456", (Object)((TextMessage)message).getText());
        Assert.assertNull((Object)consumer.receiveNoWait());
    }

    @Test
    public void testWithProducerBrokerRestart() throws Exception {
        this.startProducerBroker();
        this.startConsumerBroker();
        this.sendMessage("test123");
        MessageConsumer consumer = this.createConsumer();
        Message message = consumer.receive(5000L);
        Assert.assertNotNull((Object)message);
        Assert.assertEquals((Object)"test123", (Object)((TextMessage)message).getText());
        Assert.assertNull((Object)consumer.receiveNoWait());
        this.stopProducerBroker();
        this.startProducerBroker();
        this.sendMessage("test123");
        message = consumer.receive(5000L);
        Assert.assertNotNull((Object)message);
        Assert.assertEquals((Object)"test123", (Object)((TextMessage)message).getText());
        Assert.assertNull((Object)consumer.receiveNoWait());
    }

    @Test
    public void testWithConsumerBrokerRestart() throws Exception {
        this.startProducerBroker();
        this.startConsumerBroker();
        this.sendMessage("test123");
        MessageConsumer consumer1 = this.createConsumer();
        Message message = consumer1.receive(5000L);
        Assert.assertNotNull((Object)message);
        Assert.assertEquals((Object)"test123", (Object)((TextMessage)message).getText());
        Assert.assertNull((Object)consumer1.receiveNoWait());
        consumer1.close();
        this.stopConsumerBroker();
        this.startConsumerBroker();
        this.sendMessage("test123");
        final MessageConsumer consumer2 = this.createConsumer();
        Assert.assertTrue((String)"Expected recover and delivery failed", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Message message = consumer2.receiveNoWait();
                return message != null && ((TextMessage)message).getText().equals("test123");
            }
        }));
        Assert.assertNull((Object)consumer2.receiveNoWait());
    }

    @Test
    public void testWithConsumerBrokerStartDelay() throws Exception {
        this.startConsumerBroker();
        final MessageConsumer consumer = this.createConsumer();
        TimeUnit.SECONDS.sleep(5L);
        this.startProducerBroker();
        this.sendMessage("test123");
        Assert.assertTrue((String)"Expected recover and delivery failed", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Message message = consumer.receiveNoWait();
                return message != null && ((TextMessage)message).getText().equals("test123");
            }
        }));
        Assert.assertNull((Object)consumer.receiveNoWait());
    }

    @Test
    public void testWithProducerBrokerStartDelay() throws Exception {
        this.startProducerBroker();
        TimeUnit.SECONDS.sleep(5L);
        this.startConsumerBroker();
        MessageConsumer consumer = this.createConsumer();
        this.sendMessage("test123");
        Message message = consumer.receive(5000L);
        Assert.assertNotNull((Object)message);
        Assert.assertEquals((Object)"test123", (Object)((TextMessage)message).getText());
        Assert.assertNull((Object)consumer.receiveNoWait());
    }

    @Before
    public void setUp() throws Exception {
        this.producerConnectionFactory = this.createProducerConnectionFactory();
        this.consumerConnectionFactory = this.createConsumerConnectionFactory();
        this.destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE");
    }

    @After
    public void tearDown() throws Exception {
        this.disposeConsumerConnections();
        try {
            this.stopProducerBroker();
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        try {
            this.stopConsumerBroker();
        }
        catch (Throwable throwable) {
            // empty catch block
        }
    }

    protected void disposeConsumerConnections() {
        for (Connection connection : this.connections) {
            try {
                connection.close();
            }
            catch (Throwable throwable) {}
        }
    }

    protected void startProducerBroker() throws Exception {
        if (this.producerBroker == null) {
            this.producerBroker = this.createFirstBroker();
            this.producerBroker.start();
        }
    }

    protected void stopProducerBroker() throws Exception {
        if (this.producerBroker != null) {
            this.producerBroker.stop();
            this.producerBroker = null;
        }
    }

    protected void startConsumerBroker() throws Exception {
        if (this.consumerBroker == null) {
            this.consumerBroker = this.createSecondBroker();
            this.consumerBroker.start();
        }
    }

    protected void stopConsumerBroker() throws Exception {
        if (this.consumerBroker != null) {
            this.consumerBroker.stop();
            this.consumerBroker = null;
        }
    }

    protected BrokerService createFirstBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("broker1");
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.addConnector("tcp://localhost:61616");
        broker.addConnector("vm://broker1");
        SimpleJmsQueueConnector jmsQueueConnector = new SimpleJmsQueueConnector();
        jmsQueueConnector.setOutboundQueueBridges(new OutboundQueueBridge[]{new OutboundQueueBridge("RECONNECT.TEST.QUEUE")});
        jmsQueueConnector.setOutboundQueueConnectionFactory((QueueConnectionFactory)new ActiveMQConnectionFactory("tcp://localhost:61617"));
        broker.setJmsBridgeConnectors(new JmsConnector[]{jmsQueueConnector});
        return broker;
    }

    protected BrokerService createSecondBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("broker2");
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.addConnector("tcp://localhost:61617");
        broker.addConnector("vm://broker2");
        return broker;
    }

    protected ActiveMQConnectionFactory createProducerConnectionFactory() {
        return new ActiveMQConnectionFactory("vm://broker1");
    }

    protected ActiveMQConnectionFactory createConsumerConnectionFactory() {
        return new ActiveMQConnectionFactory("vm://broker2");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendMessage(String text) throws JMSException {
        Connection connection = null;
        try {
            connection = this.producerConnectionFactory.createConnection();
            Session session = connection.createSession(false, 1);
            MessageProducer producer = session.createProducer(this.destination);
            TextMessage message = session.createTextMessage();
            message.setText(text);
            producer.send((Message)message);
        }
        finally {
            try {
                connection.close();
            }
            catch (Throwable throwable) {}
        }
    }

    protected MessageConsumer createConsumer() throws JMSException {
        Connection connection = this.consumerConnectionFactory.createConnection();
        this.connections.add(connection);
        connection.start();
        Session session = connection.createSession(false, 1);
        return session.createConsumer(this.destination);
    }
}

