/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;

public class UsageBlockedDispatchTest
extends TestSupport {
    private static final int MESSAGES_COUNT = 100;
    private static byte[] buf = new byte[2048];
    private BrokerService broker;
    protected long messageReceiveTimeout = 4000L;
    private String connectionUri;

    public void setUp() throws Exception {
        this.broker = this.createBroker();
        this.broker.setDataDirectory("target" + File.separator + "activemq-data");
        this.broker.setPersistent(true);
        this.broker.setUseJmx(true);
        this.broker.setAdvisorySupport(false);
        this.broker.setDeleteAllMessagesOnStartup(true);
        SystemUsage sysUsage = this.broker.getSystemUsage();
        sysUsage.getMemoryUsage().setLimit(102400L);
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setProducerFlowControl(false);
        defaultPolicy.setCursorMemoryHighWaterMark(100);
        defaultPolicy.setMemoryLimit(51200L);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.setSystemUsage(sysUsage);
        this.broker.addConnector("tcp://localhost:0").setName("Default");
        this.broker.start();
        this.connectionUri = ((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
    }

    protected BrokerService createBroker() throws IOException {
        BrokerService broker = new BrokerService();
        this.setDefaultPersistenceAdapter(broker);
        return broker;
    }

    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testFillMemToBlockConsumer() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.connectionUri);
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setTopicPrefetch(10);
        factory.setPrefetchPolicy(prefetch);
        Connection producerConnection = factory.createConnection();
        producerConnection.start();
        Session producerSession = producerConnection.createSession(false, 1);
        MessageProducer producer = producerSession.createProducer(null);
        BytesMessage message = producerSession.createBytesMessage();
        message.writeBytes(buf);
        int numFillers = 4;
        ArrayList<ActiveMQQueue> fillers = new ArrayList<ActiveMQQueue>();
        for (int i = 0; i < numFillers; ++i) {
            fillers.add(new ActiveMQQueue("Q" + i));
        }
        for (int idx = 0; idx < 100; ++idx) {
            for (ActiveMQQueue q : fillers) {
                producer.send((Destination)q, (Message)message);
            }
        }
        ActiveMQQueue willGetAPage = new ActiveMQQueue("Q" + numFillers++);
        for (int idx = 0; idx < 100; ++idx) {
            producer.send((Destination)willGetAPage, (Message)message);
        }
        ActiveMQQueue shouldBeStuckForDispatch = new ActiveMQQueue("Q" + numFillers);
        for (int idx = 0; idx < 100; ++idx) {
            producer.send((Destination)shouldBeStuckForDispatch, (Message)message);
        }
        Connection consumerConnection = factory.createConnection();
        consumerConnection.start();
        Session consumerSession = consumerConnection.createSession(false, 1);
        MessageConsumer consumer = consumerSession.createConsumer((Destination)willGetAPage);
        consumer.receive(this.messageReceiveTimeout);
        final AtomicBoolean gotExpectedLogEvent = new AtomicBoolean(false);
        DefaultTestAppender appender = new DefaultTestAppender(){

            public void doAppend(LoggingEvent event) {
                if (event.getLevel() == Level.WARN && event.getRenderedMessage().contains("cursor blocked")) {
                    gotExpectedLogEvent.set(true);
                }
            }
        };
        try {
            Logger.getLogger(Queue.class).addAppender((Appender)appender);
            MessageConsumer noDispatchConsumer = consumerSession.createConsumer((Destination)shouldBeStuckForDispatch);
            Message m = noDispatchConsumer.receive(this.messageReceiveTimeout);
            UsageBlockedDispatchTest.assertNull((String)"did not get a message", (Object)m);
            UsageBlockedDispatchTest.assertTrue((String)"Got the new warning about the blocked cursor", (boolean)gotExpectedLogEvent.get());
        }
        finally {
            Logger.getLogger(Queue.class).removeAppender((Appender)appender);
            Logger.getRootLogger().removeAppender((Appender)appender);
        }
    }
}

