/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.test.retroactive;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetroactiveConsumerBrokerRestartedTest
extends TestCase {
    private static final Logger log = LoggerFactory.getLogger(RetroactiveConsumerBrokerRestartedTest.class);
    private static final String ACTIVEMQ_BROKER_URI = "tcp://localhost:62626";
    private BrokerService broker;
    Connection connection;

    protected void setUp() throws Exception {
        this.createBroker();
        this.connection = this.getConnection();
    }

    protected void tearDown() throws Exception {
        this.broker.stop();
    }

    protected void restartBroker() throws Exception {
        if (this.connection != null) {
            this.connection.close();
        }
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
        this.createRestartedBroker();
    }

    private void createBroker() throws Exception {
        PolicyEntry policy = new PolicyEntry();
        policy.setTopic(">");
        policy.setDispatchPolicy((DispatchPolicy)new SimpleDispatchPolicy());
        FixedCountSubscriptionRecoveryPolicy fixed = new FixedCountSubscriptionRecoveryPolicy();
        fixed.setMaximumSize(10);
        policy.setSubscriptionRecoveryPolicy((SubscriptionRecoveryPolicy)fixed);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policy);
        this.broker = new BrokerService();
        this.broker.setBrokerName("durable-broker");
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.setPersistenceAdapter(this.createPersistenceAdapter());
        this.broker.setPersistent(true);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.addConnector(ACTIVEMQ_BROKER_URI);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    private void createRestartedBroker() throws Exception {
        PolicyEntry policy = new PolicyEntry();
        policy.setTopic(">");
        policy.setDispatchPolicy((DispatchPolicy)new SimpleDispatchPolicy());
        FixedCountSubscriptionRecoveryPolicy fixed = new FixedCountSubscriptionRecoveryPolicy();
        fixed.setMaximumSize(10);
        policy.setSubscriptionRecoveryPolicy((SubscriptionRecoveryPolicy)fixed);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policy);
        this.broker = new BrokerService();
        this.broker.setBrokerName("durable-broker");
        this.broker.setDeleteAllMessagesOnStartup(false);
        this.broker.setPersistenceAdapter(this.createPersistenceAdapter());
        this.broker.setPersistent(true);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.addConnector(ACTIVEMQ_BROKER_URI);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    protected PersistenceAdapter createPersistenceAdapter() throws Exception {
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        return adapter;
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
    }

    private Connection getConnection() throws Exception {
        Connection connection = this.createConnectionFactory().createConnection();
        connection.setClientID("cliId1");
        return connection;
    }

    public void testFixedCountSubscriptionRecoveryPolicy() throws Exception {
        this.connection.start();
        Session session = this.connection.createSession(false, 1);
        Topic topicSub = session.createTopic("TestTopic?consumer.retroactive=true");
        Topic topic = session.createTopic("TestTopic");
        TopicSubscriber sub1 = session.createDurableSubscriber(topicSub, "sub1");
        MessageProducer producer = session.createProducer((Destination)topic);
        producer.setDeliveryMode(2);
        producer.send((Message)session.createTextMessage("Msg:1"));
        producer.send((Message)session.createTextMessage("Msg:2"));
        producer.send((Message)session.createTextMessage("Msg:3"));
        this.restartBroker();
        this.connection = this.getConnection();
        this.connection.start();
        session = this.connection.createSession(false, 1);
        producer = session.createProducer((Destination)topic);
        producer.setDeliveryMode(2);
        producer.send((Message)session.createTextMessage("Msg:4"));
        sub1 = session.createDurableSubscriber(topicSub, "sub1");
        this.assertTextMessageEquals("Msg:1", sub1.receive(1000L));
        this.assertTextMessageEquals("Msg:2", sub1.receive(1000L));
        this.assertTextMessageEquals("Msg:3", sub1.receive(1000L));
        this.assertTextMessageEquals("Msg:4", sub1.receive(1000L));
    }

    private void assertTextMessageEquals(String string, Message message) throws JMSException {
        RetroactiveConsumerBrokerRestartedTest.assertNotNull((String)"Message was null", (Object)message);
        RetroactiveConsumerBrokerRestartedTest.assertTrue((String)"Message is not a TextMessage", (boolean)(message instanceof TextMessage));
        RetroactiveConsumerBrokerRestartedTest.assertEquals((String)string, (String)((TextMessage)message).getText());
    }
}

