/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb.perf;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KahaBulkLoadingTest
extends JmsTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(KahaBulkLoadingTest.class);
    protected int messageSize = 4096;

    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(new File("target/activemq-data/kahadb"));
        broker.setPersistenceAdapter((PersistenceAdapter)kaha);
        broker.addConnector("tcp://localhost:0");
        return broker;
    }

    @Override
    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(((TransportConnector)this.broker.getTransportConnectors().get(0)).getServer().getConnectURI());
        factory.setUseAsyncSend(true);
        return factory;
    }

    public void testQueueSendThenAddConsumer() throws Exception {
        long end;
        long start;
        ActiveMQQueue destination = new ActiveMQQueue("TEST");
        this.connection.setUseCompression(false);
        this.connection.getPrefetchPolicy().setAll(10);
        this.connection.start();
        Session session = this.connection.createSession(false, 3);
        LOG.info("Receiving messages that are in the queue");
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        BytesMessage msg = (BytesMessage)consumer.receive(2000L);
        int consumed = 0;
        if (msg != null) {
            ++consumed;
        }
        while (true) {
            int counter = 0;
            if (msg == null) break;
            end = start = System.currentTimeMillis();
            int size = 0;
            while (end - start < 5000L && (msg = (BytesMessage)consumer.receive(5000L)) != null) {
                ++counter;
                ++consumed;
                end = System.currentTimeMillis();
                size = (int)((long)size + msg.getBodyLength());
            }
            LOG.info("Consumed: " + (double)counter * 1000.0 / (double)(end - start) + "  messages/sec, " + 1.0 * (double)size / 1048576.0 * (1000.0 / (double)(end - start)) + " megs/sec ");
        }
        consumer.close();
        LOG.info("Consumed " + consumed + " messages from the queue.");
        MessageProducer producer = session.createProducer((Destination)destination);
        producer.setDeliveryMode(2);
        LOG.info("Sending messages that are " + (double)this.messageSize / 1024.0 + "k large");
        start = System.currentTimeMillis();
        final AtomicBoolean stop = new AtomicBoolean();
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                stop.set(true);
            }
        });
        int produced = 0;
        while (!stop.get()) {
            end = start = System.currentTimeMillis();
            int produceCount = 0;
            while (end - start < 5000L && !stop.get()) {
                BytesMessage bm = session.createBytesMessage();
                bm.writeBytes(new byte[this.messageSize]);
                producer.send((Message)bm);
                ++produceCount;
                ++produced;
                end = System.currentTimeMillis();
            }
            LOG.info("Produced: " + (double)produceCount * 1000.0 / (double)(end - start) + " messages/sec, " + 1.0 * (double)produceCount * (double)this.messageSize / 1048576.0 * (1000.0 / (double)(end - start)) + " megs/sec");
        }
        LOG.info("Prodcued " + produced + " messages to the queue.");
    }

    public static Test suite() {
        return KahaBulkLoadingTest.suite(KahaBulkLoadingTest.class);
    }

    public static void main(String[] args) {
        TestRunner.run((Test)KahaBulkLoadingTest.suite());
    }
}

