/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class MessageGroupReconnectDistributionTest {
    public static final Logger LOG = LoggerFactory.getLogger(MessageGroupReconnectDistributionTest.class);
    final Random random = new Random();
    protected Connection connection;
    protected Session session;
    protected MessageProducer producer;
    protected ActiveMQQueue destination = new ActiveMQQueue("GroupQ");
    protected TransportConnector connector;
    ActiveMQConnectionFactory connFactory;
    BrokerService broker;
    int numMessages = 10000;
    int groupSize = 10;
    int batchSize = 20;
    @Parameterized.Parameter(value=0)
    public int numConsumers = 4;
    @Parameterized.Parameter(value=1)
    public boolean consumerPriority = true;

    @Parameterized.Parameters(name="numConsumers={0},consumerPriority={1}")
    public static Iterable<Object[]> combinations() {
        return Arrays.asList({4, true}, {10, true});
    }

    @Before
    public void setUp() throws Exception {
        this.broker = this.createBroker();
        this.broker.start();
        this.connFactory = new ActiveMQConnectionFactory(this.connector.getConnectUri() + "?jms.prefetchPolicy.all=200");
        this.connFactory.setWatchTopicAdvisories(false);
        this.connection = this.connFactory.createConnection();
        this.session = this.connection.createSession(false, 2);
        this.producer = this.session.createProducer((Destination)this.destination);
        this.connection.start();
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService service = new BrokerService();
        service.setAdvisorySupport(false);
        service.setPersistent(false);
        service.setUseJmx(true);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policy = new PolicyEntry();
        policy.setUseConsumerPriority(this.consumerPriority);
        policy.setMessageGroupMapFactoryType("cached?cacheSize=" + (this.numConsumers - 1));
        policyMap.setDefaultEntry(policy);
        service.setDestinationPolicy(policyMap);
        this.connector = service.addConnector("tcp://localhost:0");
        return service;
    }

    @After
    public void tearDown() throws Exception {
        this.producer.close();
        this.session.close();
        this.connection.close();
        this.broker.stop();
    }

    @Test(timeout=300000L)
    public void testReconnect() throws Exception {
        final AtomicLong totalConsumed = new AtomicLong(0L);
        ExecutorService executorService = Executors.newFixedThreadPool(this.numConsumers);
        final ArrayList<AtomicLong> consumedCounters = new ArrayList<AtomicLong>(this.numConsumers);
        final ArrayList<AtomicLong> batchCounters = new ArrayList<AtomicLong>(this.numConsumers);
        int i = 0;
        while (i < this.numConsumers) {
            consumedCounters.add(new AtomicLong(0L));
            batchCounters.add(new AtomicLong(0L));
            final int id = i++;
            executorService.submit(new Runnable(){

                int getBatchSize() {
                    return (id + 1) * MessageGroupReconnectDistributionTest.this.batchSize;
                }

                @Override
                public void run() {
                    try {
                        Session connectionSession = MessageGroupReconnectDistributionTest.this.connection.createSession(false, 1);
                        int batchSize = this.getBatchSize();
                        MessageConsumer messageConsumer = connectionSession.createConsumer(MessageGroupReconnectDistributionTest.this.destWithPrefetch(MessageGroupReconnectDistributionTest.this.destination));
                        AtomicLong consumed = (AtomicLong)consumedCounters.get(id);
                        AtomicLong batches = (AtomicLong)batchCounters.get(id);
                        LOG.info("Consumer: " + id + ", batchSize:" + batchSize + ", totalConsumed:" + totalConsumed.get() + ", consumed:" + consumed.get());
                        while (totalConsumed.get() < (long)MessageGroupReconnectDistributionTest.this.numMessages) {
                            Message message = messageConsumer.receive(10000L);
                            if (message == null) {
                                LOG.info("Consumer: " + id + ", batchSize:" + batchSize + ", null message (totalConsumed:" + totalConsumed.get() + ") consumed:" + consumed.get());
                                messageConsumer.close();
                                if (totalConsumed.get() != (long)MessageGroupReconnectDistributionTest.this.numMessages) {
                                    batchSize = this.getBatchSize();
                                    messageConsumer = connectionSession.createConsumer(MessageGroupReconnectDistributionTest.this.destWithPrefetch(MessageGroupReconnectDistributionTest.this.destination));
                                    batches.incrementAndGet();
                                    continue;
                                }
                                break;
                            }
                            consumed.incrementAndGet();
                            totalConsumed.incrementAndGet();
                            if (consumed.get() <= 0L || consumed.intValue() % batchSize != 0) continue;
                            messageConsumer.close();
                            batchSize = this.getBatchSize();
                            messageConsumer = connectionSession.createConsumer(MessageGroupReconnectDistributionTest.this.destWithPrefetch(MessageGroupReconnectDistributionTest.this.destination));
                            batches.incrementAndGet();
                        }
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            TimeUnit.MILLISECONDS.sleep(200L);
        }
        TimeUnit.SECONDS.sleep(1L);
        this.produceMessages(this.numMessages);
        executorService.shutdown();
        Assert.assertTrue((String)"threads done on time", (boolean)executorService.awaitTermination(10L, TimeUnit.MINUTES));
        Assert.assertEquals((String)"All consumed", (long)this.numMessages, (long)totalConsumed.intValue());
        LOG.info("Distribution: " + consumedCounters);
        LOG.info("Batches: " + batchCounters);
        double max = (double)((AtomicLong)consumedCounters.get(0)).longValue() * 1.5;
        double min = (double)((AtomicLong)consumedCounters.get(0)).longValue() * 0.5;
        for (AtomicLong l : consumedCounters) {
            Assert.assertTrue((String)("Even +/- 50% distribution on consumed:" + consumedCounters + ", outlier:" + l.get()), ((double)l.longValue() < max && (double)l.longValue() > min ? 1 : 0) != 0);
        }
    }

    private Destination destWithPrefetch(ActiveMQQueue destination) throws Exception {
        return destination;
    }

    private void produceMessages(int numMessages) throws JMSException {
        int groupID = 0;
        for (int i = 0; i < numMessages; ++i) {
            if (i > 0 && i % this.groupSize == 0) {
                ++groupID;
            }
            TextMessage msga = this.session.createTextMessage("hello " + i);
            msga.setStringProperty("JMSXGroupID", "Group-" + groupID);
            this.producer.send((Message)msga);
        }
    }
}

