/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.TimeUtils;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest
extends JmsMultipleBrokersTestSupport {
    /** Message body filler: a string decoded from a zero-filled 10240-byte array. */
    static final String payload = new String(new byte[10240]);
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.class);
    /** First TCP port; broker {@code i} listens on {@code portBase + i}. */
    final int portBase = 61600;
    /** Number of brokers (named B0, B1, ...) started by the test. */
    final int numBrokers = 4;
    /** Number of concurrent producer tasks. */
    final int numProducers = 10;
    /** Total number of messages sent to the {@code IN} queue. */
    final int numMessages = 800;
    /** Per-message delay, in milliseconds, applied by the gateway consumers. */
    final int consumerSleepTime = 20;
    /** Comma-separated broker URLs, filled in by {@link #buildUrlList()}. */
    StringBuilder brokersUrl = new StringBuilder();
    /** Received-message counter per gateway queue. */
    HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<ActiveMQQueue, AtomicInteger>();
    /**
     * Failures raised by producer tasks. Several pool threads append to this list while the
     * test thread reads it, so it must be a synchronized list rather than a bare ArrayList.
     */
    private final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());

    /**
     * Appends one {@code tcp://localhost:<port>} URL per broker to {@link #brokersUrl},
     * separating consecutive entries with a comma.
     *
     * @throws Exception declared for subclasses; nothing in this implementation throws it
     */
    protected void buildUrlList() throws Exception {
        for (int brokerId = 0; brokerId < this.numBrokers; brokerId++) {
            if (brokerId > 0) {
                // Separator goes between entries only, never after the last one.
                this.brokersUrl.append(',');
            }
            this.brokersUrl.append("tcp://localhost:").append(this.portBase + brokerId);
        }
    }

    /**
     * Builds (but does not start) broker {@code B<brokerid>} listening on
     * {@code portBase + brokerid}, attaches its network connectors and destination
     * policy, and registers it in {@code brokers} under its broker name.
     *
     * @param brokerid zero-based broker index used for the name and the port offset
     * @return the configured broker
     * @throws Exception if any configuration step fails
     */
    protected BrokerService createBroker(int brokerid) throws Exception {
        final String brokerName = "B" + brokerid;
        final URI transportUri = new URI("tcp://localhost:" + (this.portBase + brokerid));

        BrokerService service = new BrokerService();
        service.setPersistent(true);
        service.setDeleteAllMessagesOnStartup(true);
        service.getManagementContext().setCreateConnector(false);
        service.setUseJmx(true);
        service.setBrokerName(brokerName);
        service.addConnector(transportUri);
        this.addNetworkConnector(service);
        service.setSchedulePeriodForDestinationPurge(0);
        service.getSystemUsage().setSendFailIfNoSpace(true);
        // 512 MiB broker-wide memory limit.
        service.getSystemUsage().getMemoryUsage().setLimit(512L * 1024L * 1024L);
        service.setDestinationPolicy(this.createGatewayPolicyMap());

        KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) service.getPersistenceAdapter();
        kahaDb.setConcurrentStoreAndDispatchQueues(false);

        // NOTE(review): the explicit enclosing-instance argument looks like a decompiler
        // artifact - confirm against the BrokerItem constructor declared in the superclass.
        this.brokers.put(service.getBrokerName(), new JmsMultipleBrokersTestSupport.BrokerItem(this, service));
        return service;
    }

    /** Creates the policy map applied to all {@code GW.>} queues. */
    private PolicyMap createGatewayPolicyMap() {
        PolicyEntry gatewayPolicy = new PolicyEntry();
        gatewayPolicy.setExpireMessagesPeriod(0L);
        gatewayPolicy.setQueuePrefetch(1000);
        // 1 MiB per-destination memory limit.
        gatewayPolicy.setMemoryLimit(1024L * 1024L);
        gatewayPolicy.setOptimizedDispatch(false);
        gatewayPolicy.setProducerFlowControl(false);
        gatewayPolicy.setEnableAudit(true);
        gatewayPolicy.setUseCache(true);

        PolicyMap policies = new PolicyMap();
        policies.put(new ActiveMQQueue("GW.>"), gatewayPolicy);
        return policies;
    }

    /**
     * Adds two identically configured network connectors ({@code Bridge-0} and
     * {@code Bridge-1}) to the broker, each using a {@code static:(...)} discovery URI
     * built from {@link #brokersUrl} and limited to {@code GW.*} queues.
     *
     * @param broker the broker to attach the connectors to
     * @throws Exception if the URI is invalid or a connector cannot be added
     */
    private void addNetworkConnector(BrokerService broker) throws Exception {
        final String discoveryUri = "static:(" + this.brokersUrl + ")";
        int bridgeIndex = 0;
        while (bridgeIndex < 2) {
            DiscoveryNetworkConnector bridge = new DiscoveryNetworkConnector(new URI(discoveryUri));
            bridge.setName("Bridge-" + bridgeIndex);
            bridge.setNetworkTTL(1);
            bridge.setDecreaseNetworkConsumerPriority(true);
            bridge.setDynamicOnly(true);
            bridge.setPrefetchSize(100);
            bridge.setDynamicallyIncludedDestinations(Arrays.asList(new ActiveMQQueue("GW.*")));
            broker.addNetworkConnector(bridge);
            bridgeIndex++;
        }
    }

    /**
     * End-to-end scenario: starts all brokers, waits for the bridges, attaches the gateway
     * and fanout consumers, produces {@code numMessages} messages and then waits until every
     * gateway consumer has counted the expected number of messages.
     *
     * @throws Exception if setup fails or an assertion does not hold
     */
    public void testBrokers() throws Exception {
        this.buildUrlList();
        for (int brokerId = 0; brokerId < this.numBrokers; brokerId++) {
            this.createBroker(brokerId);
        }
        this.startAllBrokers();

        // Each broker has every other broker as a peer.
        final int peersPerBroker = this.numBrokers - 1;
        this.waitForBridgeFormation(peersPerBroker);
        this.verifyPeerBrokerInfos(peersPerBroker);

        final List<ConsumerState> consumerStates = this.startAllGWConsumers(this.numBrokers);
        this.startAllGWFanoutConsumers(this.numBrokers);

        LOG.info("Waiting for percolation of consumers..");
        TimeUnit.SECONDS.sleep(5L);

        LOG.info("Produce mesages..");
        final long startTime = System.currentTimeMillis();
        this.produce(this.numMessages);

        Wait.Condition everyTallyComplete = new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                for (ConsumerState tally : consumerStates) {
                    int fanoutFactor = 1;
                    if (tally.destination.isComposite()) {
                        fanoutFactor = tally.destination.getCompositeDestinations().length;
                    }
                    final int expected = numMessages * fanoutFactor;
                    LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get());
                    if (tally.accumulator.get() == expected) {
                        LOG.info("got tally on " + tally.brokerName);
                        continue;
                    }
                    LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected + ", " + tally.expected);
                    return false;
                }
                return true;
            }
        };
        // Wait budget in milliseconds (1000 * 60 * 1000).
        final boolean gotAllSent = Wait.waitFor(everyTallyComplete, 1000L * 60L * 1000L);
        assertTrue("Got all sent", gotAllSent);
        assertTrue("No exceptions:" + this.exceptions, this.exceptions.isEmpty());

        LOG.info("done");
        final long elapsedMillis = System.currentTimeMillis() - startTime;
        LOG.info("Duration:" + TimeUtils.printDuration(elapsedMillis));
    }

    /**
     * For every broker, opens a transacted session that listens on queue {@code IN} and
     * forwards each received message to the composite queue {@code GW.0,...,GW.(nBrokers-1)},
     * committing the session after every forwarded message.
     *
     * @param nBrokers number of brokers to connect to, which is also the number of
     *                 {@code GW.<k>} queues in the composite destination
     * @throws Exception if a connection, session, producer or receiver cannot be created
     */
    private void startAllGWFanoutConsumers(int nBrokers) throws Exception {
        StringBuilder gatewayNames = new StringBuilder();
        for (int gateway = 0; gateway < nBrokers; gateway++) {
            if (gateway > 0) {
                gatewayNames.append(',');
            }
            gatewayNames.append("GW.").append(gateway);
        }
        final ActiveMQQueue fanoutTarget = new ActiveMQQueue(gatewayNames.toString());

        for (int brokerId = 0; brokerId < nBrokers; brokerId++) {
            String brokerUrl = "failover:(tcp://localhost:" + (this.portBase + brokerId) + ")";
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
            factory.setWatchTopicAdvisories(false);

            QueueConnection connection = factory.createQueueConnection();
            connection.start();

            final QueueSession txSession = connection.createQueueSession(true, QueueSession.SESSION_TRANSACTED);
            final MessageProducer fanoutProducer = txSession.createProducer(fanoutTarget);
            QueueReceiver inboundReceiver = txSession.createReceiver(new ActiveMQQueue("IN"));
            inboundReceiver.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    try {
                        // Receipt and forward are committed together in one transaction.
                        fanoutProducer.send(message);
                        txSession.commit();
                    } catch (Exception e) {
                        LOG.error("Failed to fanout to GW: " + message, e);
                    }
                }
            });
        }
    }

    /**
     * Attaches one deliberately slow consumer per broker: the consumer connected to broker
     * {@code id} listens on queue {@code GW.<id>}, sleeps {@code consumerSleepTime}
     * milliseconds per message, increments the destination's counter and removes the
     * message's {@code NUM} property value from its list of outstanding sequence numbers.
     *
     * @param nBrokers number of brokers (and therefore consumers) to set up
     * @return one {@link ConsumerState} per started consumer, in broker order
     * @throws Exception if a connection, session or receiver cannot be created
     */
    private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception {
        List<ConsumerState> consumerStates = new ArrayList<ConsumerState>(nBrokers);
        for (int id = 0; id < nBrokers; ++id) {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (this.portBase + id) + ")");
            connectionFactory.setWatchTopicAdvisories(false);
            QueueConnection queueConnection = connectionFactory.createQueueConnection();
            queueConnection.start();
            QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            ActiveMQQueue destination = new ActiveMQQueue("GW." + id);
            QueueReceiver queueReceiver = queueSession.createReceiver(destination);

            final ConsumerState consumerState = new ConsumerState();
            consumerState.brokerName = ((ActiveMQConnection) queueConnection).getBrokerName();
            consumerState.receiver = queueReceiver;
            consumerState.destination = destination;

            // Computed once: the bound does not change while the list is being filled.
            int fanoutFactor = destination.isComposite() ? destination.getCompositeDestinations().length : 1;
            int expectedCount = this.numMessages * fanoutFactor;
            for (int j = 0; j < expectedCount; ++j) {
                consumerState.expected.add(j);
            }

            // Consumers on the same destination share a single counter.
            AtomicInteger accumulator = this.accumulators.get(destination);
            if (accumulator == null) {
                accumulator = new AtomicInteger(0);
                this.accumulators.put(destination, accumulator);
            }
            consumerState.accumulator = accumulator;

            queueReceiver.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(consumerSleepTime);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the delivering thread instead of discarding it.
                        Thread.currentThread().interrupt();
                        LOG.warn("Interrupted while delaying receipt of " + message, e);
                    }
                    consumerState.accumulator.incrementAndGet();
                    try {
                        // getProperty returns Object, so this removes by value rather than by index.
                        consumerState.expected.remove(((ActiveMQMessage) message).getProperty("NUM"));
                    } catch (Exception e) {
                        LOG.error("Failed to commit slow receipt of " + message, e);
                    }
                }
            });
            consumerStates.add(consumerState);
        }
        return consumerStates;
    }

    /**
     * Starts {@code numProducers} producer tasks that together send {@code numMessages}
     * text messages to queue {@code IN}. Task {@code i} connects to broker
     * {@code i % numBrokers}. The method returns once the tasks are submitted; it does not
     * wait for them to finish. Any failure inside a task is logged and recorded in
     * {@code exceptions}.
     *
     * @param numMessages total number of messages to send across all producers
     * @throws Exception declared for callers; task failures are recorded, not thrown
     */
    private void produce(int numMessages) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(this.numProducers);
        final AtomicInteger toSend = new AtomicInteger(numMessages);
        try {
            for (int i = 1; i <= this.numProducers; ++i) {
                final int id = i % this.numBrokers;
                executorService.execute(new Runnable() {

                    @Override
                    public void run() {
                        QueueConnection queueConnection = null;
                        try {
                            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
                            connectionFactory.setWatchTopicAdvisories(false);
                            queueConnection = connectionFactory.createQueueConnection();
                            queueConnection.start();
                            QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                            // Anonymous producer: the destination is supplied on each send.
                            MessageProducer producer = queueSession.createProducer(null);
                            ActiveMQQueue inbound = new ActiveMQQueue("IN");
                            int val;
                            // The shared countdown hands each sequence number to exactly one producer.
                            while ((val = toSend.decrementAndGet()) >= 0) {
                                String brokerName = ((ActiveMQConnection) queueConnection).getBrokerName();
                                LOG.info("Send to: " + brokerName + ", " + val + ", dest:" + inbound);
                                TextMessage textMessage = queueSession.createTextMessage(brokerName + "->" + val + " payload:" + payload);
                                textMessage.setIntProperty("NUM", val);
                                producer.send(inbound, textMessage);
                            }
                        } catch (Throwable throwable) {
                            LOG.error("Producer for broker index " + id + " failed", throwable);
                            exceptions.add(throwable);
                        } finally {
                            // Close on every path so a failed send does not leave the connection open.
                            if (queueConnection != null) {
                                try {
                                    queueConnection.close();
                                } catch (Throwable closeFailure) {
                                    LOG.error("Failed to close producer connection for broker index " + id, closeFailure);
                                    exceptions.add(closeFailure);
                                }
                            }
                        }
                    }
                });
            }
        } finally {
            // Submitted tasks still run to completion; this only lets the pool threads
            // terminate afterwards instead of lingering for the life of the JVM.
            executorService.shutdown();
        }
    }

    /**
     * Waits for the broker to report exactly {@code max} peer broker infos and asserts that
     * it does. When the count is off, logs the peers that are present and the names of the
     * brokers that are absent.
     *
     * @param brokerItem holder of the broker to inspect
     * @param max        expected number of peer broker infos
     * @throws Exception if waiting fails or the peer count does not match
     */
    private void verifyPeerBrokerInfo(JmsMultipleBrokersTestSupport.BrokerItem brokerItem, final int max) throws Exception {
        final BrokerService broker = brokerItem.broker;
        final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
        Wait.waitFor(new Wait.Condition() {

            @Override
            public boolean isSatisified() throws Exception {
                int peerCount = regionBroker.getPeerBrokerInfos().length;
                LOG.info("verify infos " + broker.getBrokerName() + ", len: " + peerCount);
                return max == peerCount;
            }
        });

        // Single snapshot so the log line, the diagnostics and the assertion all agree.
        BrokerInfo[] peers = regionBroker.getPeerBrokerInfos();
        LOG.info("verify infos " + broker.getBrokerName() + ", len: " + peers.length);
        if (max != peers.length) {
            // Candidate peers are all brokers created by this test except the one under
            // inspection; a broker is never its own peer.
            List<String> missing = new ArrayList<String>();
            for (int i = 0; i < this.numBrokers; ++i) {
                String candidate = "B" + i;
                if (!candidate.equals(broker.getBrokerName())) {
                    missing.add(candidate);
                }
            }
            for (BrokerInfo info : peers) {
                LOG.info(info.getBrokerName());
                missing.remove(info.getBrokerName());
            }
            LOG.info("Broker infos off.." + missing);
        }
        assertEquals(broker.getBrokerName(), max, peers.length);
    }

    /**
     * Runs {@link #verifyPeerBrokerInfo} against every registered broker.
     *
     * @param max expected number of peer broker infos per broker
     * @throws Exception if verification of any broker fails
     */
    private void verifyPeerBrokerInfos(int max) throws Exception {
        for (Object registered : this.brokers.values()) {
            this.verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem) registered, max);
        }
    }

    /**
     * Delegates to the superclass teardown; this class performs no additional cleanup.
     *
     * @throws Exception if the superclass teardown fails
     */
    @Override
    protected void tearDown() throws Exception {
        super.tearDown();
    }

    /**
     * Bookkeeping for one gateway consumer: where it is connected, what it listens on and
     * how much it has received so far.
     */
    class ConsumerState {
        /** Count of messages received on {@link #destination}. */
        AtomicInteger accumulator;
        /** Name of the broker the consumer's connection reported. */
        String brokerName;
        /** The JMS receiver delivering messages to this consumer. */
        QueueReceiver receiver;
        /** Destination the receiver is subscribed to. */
        ActiveMQDestination destination;
        /** Sequence numbers not yet seen; written by the test thread, pruned by the listener. */
        Vector<Integer> expected = new Vector<Integer>();
    }
}

