/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.cursors.AbstractStoreCursor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableSubCacheTest {
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubCacheTest.class);
    private final ActiveMQTopic topic = new ActiveMQTopic("T1");
    private BrokerService broker;

    @Before
    public void setUp() throws Exception {
        this.broker = this.createAndStartBroker();
        this.broker.waitUntilStarted();
    }

    private BrokerService createAndStartBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setUseJmx(false);
        broker.setAdvisorySupport(false);
        broker.getSystemUsage().getMemoryUsage().setLimit(102400L);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policy = new PolicyEntry();
        policy.setCursorMemoryHighWaterMark(20);
        policyMap.put((ActiveMQDestination)this.topic, (Object)policy);
        broker.setDestinationPolicy(policyMap);
        broker.start();
        return broker;
    }

    @After
    public void tearDown() throws Exception {
        this.broker.stop();
    }

    @Test
    public void testCacheExhaustion() throws Exception {
        this.doTestCacheExhaustion(1000);
    }

    @Test
    public void testCacheExhaustionPrefetch0() throws Exception {
        this.doTestCacheExhaustion(0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void doTestCacheExhaustion(int prefetch) throws Exception {
        this.createDurableSub(this.topic, "my_sub_1");
        this.publishMesssages(this.topic, 20);
        org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger((String)AbstractStoreCursor.class.getCanonicalName());
        final AtomicBoolean failed = new AtomicBoolean(false);
        DefaultTestAppender appender = new DefaultTestAppender(){

            public void doAppend(LoggingEvent event) {
                if (event.getLevel() == Level.WARN) {
                    LOG.info("Got warn event:" + event.getRenderedMessage());
                    failed.set(true);
                }
            }
        };
        log4jLogger.addAppender((Appender)appender);
        try {
            this.consumeDurableSub(this.topic, "my_sub_1", 20, prefetch);
        }
        finally {
            log4jLogger.removeAppender((Appender)appender);
        }
        Assert.assertFalse((String)"no warning from the cursor", (boolean)failed.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void publishMesssages(ActiveMQTopic topic, int messageCount) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getVmConnectorURI());
        connectionFactory.setWatchTopicAdvisories(false);
        Connection con = connectionFactory.createConnection();
        con.start();
        Session session = con.createSession(false, 1);
        MessageProducer producer = session.createProducer((Destination)topic);
        try {
            String textMessage = new String(new byte[1024]);
            TextMessage msg = session.createTextMessage(textMessage);
            for (int i = 0; i < messageCount; ++i) {
                producer.send((Message)msg);
            }
        }
        finally {
            con.close();
        }
    }

    private void createDurableSub(ActiveMQTopic topic, String subID) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getVmConnectorURI());
        connectionFactory.setWatchTopicAdvisories(false);
        Connection con = connectionFactory.createConnection();
        con.setClientID("CONNECTION-" + subID);
        con.start();
        Session session = con.createSession(false, 1);
        session.createDurableSubscriber((Topic)topic, subID, null, true);
        session.close();
        con.close();
    }

    private void consumeDurableSub(ActiveMQTopic topic, String subID, int messageCount) throws Exception {
        this.consumeDurableSub(topic, subID, messageCount, 1000);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void consumeDurableSub(ActiveMQTopic topic, String subID, int messageCount, int prefetch) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getVmConnectorURI());
        ActiveMQConnection con = (ActiveMQConnection)connectionFactory.createConnection();
        con.setClientID("CONNECTION-" + subID);
        con.getPrefetchPolicy().setAll(prefetch);
        con.start();
        Session session = con.createSession(false, 1);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber((Topic)topic, subID, null, true);
        try {
            for (int i = 0; i < messageCount; ++i) {
                Message message = topicSubscriber.receive(4000L);
                if (message != null) continue;
                Assert.fail((String)"should have received a message");
            }
        }
        finally {
            con.close();
        }
    }
}

