/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Exercises a durable topic subscription across a broker restart in which the
 * persistence directory's {@code db.*} files are deleted before the broker is
 * started again (without clearing the remaining data).
 *
 * <p>Two scenarios are covered: one where the subscriber leaves exactly one
 * message unconsumed before the restart, and one where it consumes everything.
 * In both, the subscription must be reported as inactive after the restart and
 * become active again once the client re-subscribes.
 */
public class AMQ6131Test {
    private static final String TOPIC_NAME = "durable.sub";
    private static final String CLIENT_ID = "myId";
    private static final String SUBSCRIPTION_NAME = "sub";
    private static final String JOURNAL_FILE_PATTERN = "*.log";
    private static final String INDEX_FILE_PATTERN = "db.*";
    private static final int PAYLOAD_SIZE = 100000;
    private static final int SEND_BATCH_SIZE = 100;

    protected BrokerService broker;
    protected URI brokerConnectURI;

    /** Connections opened by the current test; closed in {@link #stopBroker()}. */
    private final List<ActiveMQConnection> openConnections = new ArrayList<ActiveMQConnection>();

    /**
     * Raises the {@link MessageDatabase} logger to TRACE and starts a broker
     * with its messages deleted on startup.
     *
     * @throws Exception if the broker cannot be started
     */
    @Before
    public void startBroker() throws Exception {
        Logger.getLogger(MessageDatabase.class).setLevel(Level.TRACE);
        this.setUpBroker(true);
    }

    /**
     * Creates and starts a persistent broker listening on an ephemeral TCP
     * port, then records the URI clients should connect to.
     *
     * @param clearDataDir value passed to
     *        {@link BrokerService#setDeleteAllMessagesOnStartup(boolean)}
     * @throws Exception if the broker cannot be configured or started
     */
    protected void setUpBroker(boolean clearDataDir) throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(true);
        this.broker.setDeleteAllMessagesOnStartup(clearDataDir);
        TransportConnector connector = this.broker.addConnector(new TransportConnector());
        connector.setUri(new URI("tcp://0.0.0.0:0"));
        connector.setName("tcp");
        this.broker.start();
        this.broker.waitUntilStarted();
        this.brokerConnectURI = this.broker.getConnectorByName("tcp").getConnectUri();
    }

    /**
     * Closes every connection opened through {@link #openSession()} and then
     * stops the broker, waiting until it has fully stopped.
     *
     * <p>The broker is stopped even when closing a connection fails, and the
     * method is a no-op for the broker part when no broker was ever created.
     *
     * @throws Exception if closing a connection or stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        try {
            for (ActiveMQConnection connection : this.openConnections) {
                connection.close();
            }
        } finally {
            this.openConnections.clear();
            // JUnit runs @After even when @Before failed, so the broker may be missing.
            if (this.broker != null) {
                this.broker.stop();
                this.broker.waitUntilStopped();
            }
        }
    }

    /**
     * @return the broker currently under test
     */
    protected BrokerService getBroker() {
        return this.broker;
    }

    /**
     * @return the directory reported by the broker's persistence adapter
     * @throws IOException if the persistence adapter cannot be obtained
     */
    public File getPersistentDir() throws IOException {
        return this.getBroker().getPersistenceAdapter().getDirectory();
    }

    /**
     * Leaves exactly one message unconsumed, verifies the journal is not
     * shrunk back to its original file count, restarts without the
     * {@code db.*} files and expects the re-created subscriber to receive a
     * message.
     */
    @Test(timeout=300000L)
    public void testDurableWithOnePendingAfterRestartAndIndexRecovery() throws Exception {
        final File persistentDir = this.getPersistentDir();
        this.createTopicOnBroker();
        Session session = this.openSession();
        TopicSubscriber durable = this.createDurableSubscriber(session);
        MessageProducer producer = session.createProducer(new ActiveMQTopic(TOPIC_NAME));
        final int originalJournalFiles = countJournalFiles(persistentDir);

        int sent = this.publishUntilJournalGrows(producer, persistentDir, originalJournalFiles);
        // Consume all but the last message so one stays pending for the subscription.
        consume(durable, sent - 1);
        this.closeAndAwaitInactive(durable);

        this.getBroker().getPersistenceAdapter().checkpoint(true);
        Assert.assertFalse("Should not have garbage collected",
                Wait.waitFor(journalFileCountIs(persistentDir, originalJournalFiles), 5000L, 500L));

        this.restartBrokerWithoutIndex(persistentDir);
        TopicSubscriber recovered = this.resubscribeAfterRestart();
        Assert.assertNotNull(recovered.receive(5000L));
    }

    /**
     * Consumes every message, verifies the journal returns to its original
     * file count, restarts without the {@code db.*} files and expects the
     * re-created subscriber to receive nothing.
     */
    @Test(timeout=300000L)
    public void testDurableWithNoMessageAfterRestartAndIndexRecovery() throws Exception {
        final File persistentDir = this.getPersistentDir();
        this.createTopicOnBroker();
        Session session = this.openSession();
        TopicSubscriber durable = this.createDurableSubscriber(session);
        MessageProducer producer = session.createProducer(new ActiveMQTopic(TOPIC_NAME));
        final int originalJournalFiles = countJournalFiles(persistentDir);

        int sent = this.publishUntilJournalGrows(producer, persistentDir, originalJournalFiles);
        consume(durable, sent);
        this.closeAndAwaitInactive(durable);

        this.getBroker().getPersistenceAdapter().checkpoint(true);
        Assert.assertTrue("Should have garbage collected",
                Wait.waitFor(journalFileCountIs(persistentDir, originalJournalFiles)));

        this.restartBrokerWithoutIndex(persistentDir);
        TopicSubscriber recovered = this.resubscribeAfterRestart();
        Assert.assertNull(recovered.receive(500L));
    }

    /** Registers the test topic with the broker using the admin connection context. */
    private void createTopicOnBroker() throws Exception {
        this.broker.getBroker().addDestination(
                this.broker.getAdminConnectionContext(), new ActiveMQTopic(TOPIC_NAME), false);
    }

    /** Opens a started, tracked connection with the test client id and returns an auto-ack session on it. */
    private Session openSession() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.brokerConnectURI);
        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
        // Track before further setup so the connection is closed even if setup fails.
        this.openConnections.add(connection);
        connection.setClientID(CLIENT_ID);
        connection.start();
        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /** Creates the durable subscriber on the test topic under the test subscription name. */
    private TopicSubscriber createDurableSubscriber(Session session) throws Exception {
        return session.createDurableSubscriber(new ActiveMQTopic(TOPIC_NAME), SUBSCRIPTION_NAME);
    }

    /**
     * Sends batches of random-payload messages until the number of journal
     * files exceeds {@code originalJournalFiles}, failing the test if that
     * does not happen within the default wait.
     *
     * @return the total number of messages sent
     */
    private int publishUntilJournalGrows(final MessageProducer producer, final File persistentDir,
            final int originalJournalFiles) throws Exception {
        final byte[] payload = new byte[PAYLOAD_SIZE];
        new Random().nextBytes(payload);
        final AtomicInteger sent = new AtomicInteger();
        Assert.assertTrue("Should have added a journal file", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                ActiveMQBytesMessage message = new ActiveMQBytesMessage();
                message.setContent(new ByteSequence(payload));
                for (int i = 0; i < SEND_BATCH_SIZE; ++i) {
                    producer.send(message);
                    sent.getAndIncrement();
                }
                return countJournalFiles(persistentDir) > originalJournalFiles;
            }
        }));
        return sent.get();
    }

    /** Performs {@code count} blocking receives on the subscriber, discarding the messages. */
    private static void consume(TopicSubscriber subscriber, int count) throws Exception {
        for (int i = 0; i < count; ++i) {
            subscriber.receive();
        }
    }

    /** Closes the subscriber and waits for the broker to report one inactive durable subscriber. */
    private void closeAndAwaitInactive(TopicSubscriber subscriber) throws Exception {
        subscriber.close();
        Assert.assertTrue("Subscription should go inactive", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return AMQ6131Test.this.broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
            }
        }));
    }

    /** Stops the broker, deletes the {@code db.*} files under the directory, and starts a fresh broker on the remaining data. */
    private void restartBrokerWithoutIndex(File persistentDir) throws Exception {
        this.stopBroker();
        for (File index : FileUtils.listFiles(persistentDir, new WildcardFileFilter(INDEX_FILE_PATTERN), TrueFileFilter.INSTANCE)) {
            FileUtils.deleteQuietly(index);
        }
        this.setUpBroker(false);
    }

    /**
     * Asserts the subscription is reported inactive, re-subscribes with the
     * same client id and subscription name, and asserts it is now active.
     *
     * @return the newly created durable subscriber
     */
    private TopicSubscriber resubscribeAfterRestart() throws Exception {
        Assert.assertEquals(1L, this.broker.getAdminView().getInactiveDurableTopicSubscribers().length);
        Assert.assertEquals(0L, this.broker.getAdminView().getDurableTopicSubscribers().length);
        TopicSubscriber subscriber = this.createDurableSubscriber(this.openSession());
        Assert.assertEquals(0L, this.broker.getAdminView().getInactiveDurableTopicSubscribers().length);
        Assert.assertEquals(1L, this.broker.getAdminView().getDurableTopicSubscribers().length);
        return subscriber;
    }

    /** Counts the {@code *.log} files found recursively under the given directory. */
    private static int countJournalFiles(File persistentDir) {
        return FileUtils.listFiles(persistentDir, new WildcardFileFilter(JOURNAL_FILE_PATTERN), TrueFileFilter.INSTANCE).size();
    }

    /** Builds a condition that holds when the journal file count equals {@code expected}. */
    private static Wait.Condition journalFileCountIs(final File persistentDir, final int expected) {
        return new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return countJournalFiles(persistentDir) == expected;
            }
        };
    }
}

