/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TwoBrokerQueueClientsReconnectTest
extends JmsMultipleBrokersTestSupport {
    /** Number of messages per scenario; the multi-message tests in this class send and account for 100. */
    protected static final int MESSAGE_COUNT = 100;
    /** Client prefetch size; setUp() applies a prefetch of 1 to both connection factories (presumably this value). */
    protected static final int PREFETCH_COUNT = 1;
    /** Network connector prefetch; the bridgeBrokers override sets a prefetch of 1 (presumably this value). */
    protected static final int NETWORK_PREFETCH = 1;
    /** Logger for test progress and receive failures. */
    private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerQueueClientsReconnectTest.class);
    /** Running count of messages received by the client attached to {@link #broker1}; reset in setUp(). */
    protected int msgsClient1;
    /** Running count of messages received by the client attached to {@link #broker2}; reset in setUp(). */
    protected int msgsClient2;
    /** Name of the broker that "client 1" connects to; assigned by each test before the scenario runs. */
    protected String broker1;
    /** Name of the broker that "client 2" connects to; assigned by each test before the scenario runs. */
    protected String broker2;

    /**
     * Runs the "one client receives everything" scenario with client 1 attached
     * to BrokerA and client 2 attached to BrokerB.
     *
     * @throws Exception if the scenario fails
     */
    public void testClientAReceivesOnly() throws Exception {
        broker1 = "BrokerA";
        broker2 = "BrokerB";
        doOneClientReceivesOnly();
    }

    /**
     * Runs the "one client receives everything" scenario with the roles swapped:
     * client 1 attached to BrokerB and client 2 attached to BrokerA.
     *
     * @throws Exception if the scenario fails
     */
    public void testClientBReceivesOnly() throws Exception {
        broker1 = "BrokerB";
        broker2 = "BrokerA";
        doOneClientReceivesOnly();
    }

    /**
     * Shared scenario: both brokers are bridged in each direction, one consumer
     * is attached to each broker, and 100 messages are sent to BrokerA. The
     * consumer on {@link #broker2} is closed without receiving anything, and the
     * consumer on {@link #broker1} must then drain all 100 messages.
     *
     * @throws Exception if broker setup, messaging, or the final assertion fails
     */
    public void doOneClientReceivesOnly() throws Exception {
        // This scenario passes a rate limit of 0 to the bridge filter factory.
        applyRateLimitNetworkFilter(0.0);

        // Bridge in both directions before anything is started.
        bridgeBrokers(broker1, broker2);
        bridgeBrokers(broker2, broker1);
        startAllBrokers();

        final ActiveMQDestination destination = createDestination("TEST.FOO", false);
        final MessageConsumer receiver = createConsumer(broker1, destination);
        final MessageConsumer idleClient = createConsumer(broker2, destination);

        // Presumably gives the new subscriptions time to settle across the bridge.
        Thread.sleep(500L);

        sendMessages("BrokerA", destination, MESSAGE_COUNT);

        // The second client leaves without consuming; the first one takes the lot.
        idleClient.close();
        msgsClient1 += receiveAllMessages(receiver);
        receiver.close();

        assertEquals("Client for " + broker1 + " should have receive all messages.", MESSAGE_COUNT, msgsClient1);
    }

    /**
     * Runs the "single receiver reconnects" scenario with client 1 attached to
     * BrokerA and client 2 attached to BrokerB.
     *
     * @throws Exception if the scenario fails
     */
    public void testClientAReceivesOnlyAfterReconnect() throws Exception {
        broker1 = "BrokerA";
        broker2 = "BrokerB";
        doOneClientReceivesOnlyAfterReconnect();
    }

    /**
     * Runs the "single receiver reconnects" scenario with the roles swapped:
     * client 1 attached to BrokerB and client 2 attached to BrokerA.
     *
     * @throws Exception if the scenario fails
     */
    public void testClientBReceivesOnlyAfterReconnect() throws Exception {
        broker1 = "BrokerB";
        broker2 = "BrokerA";
        doOneClientReceivesOnlyAfterReconnect();
    }

    /**
     * Shared scenario: with the brokers bridged both ways and 100 messages sent
     * to BrokerA, the consumer on {@link #broker1} takes 20 messages, disconnects
     * and is replaced by a fresh consumer on the same broker. The consumer on
     * {@link #broker2} is then closed without receiving, and the replacement
     * consumer must collect the remainder so that client 1 totals 100.
     *
     * @throws Exception if broker setup, messaging, or the final assertion fails
     */
    public void doOneClientReceivesOnlyAfterReconnect() throws Exception {
        // This scenario passes a rate limit of 0 to the bridge filter factory.
        applyRateLimitNetworkFilter(0.0);

        bridgeBrokers(broker1, broker2);
        bridgeBrokers(broker2, broker1);
        startAllBrokers();

        final ActiveMQDestination destination = createDestination("TEST.FOO", false);
        final MessageConsumer firstAttach = createConsumer(broker1, destination);
        final MessageConsumer idleClient = createConsumer(broker2, destination);

        // Presumably gives the new subscriptions time to settle across the bridge.
        Thread.sleep(500L);

        sendMessages("BrokerA", destination, MESSAGE_COUNT);

        // First connection of client 1: take 20 messages, then drop off.
        msgsClient1 += receiveExactMessages(firstAttach, 20);
        firstAttach.close();

        // Client 1 comes back on the same broker before client 2 goes away.
        final MessageConsumer reattached = createConsumer(broker1, destination);
        Thread.sleep(500L);
        idleClient.close();

        msgsClient1 += receiveAllMessages(reattached);
        reattached.close();

        assertEquals("Client for " + broker1 + " should have received all messages.", MESSAGE_COUNT, msgsClient1);
    }

    /**
     * Runs the "two receivers, one disconnects" scenario where the disconnecting
     * client (client 1) is attached to BrokerA.
     *
     * @throws Exception if the scenario fails
     */
    public void testTwoClientsReceiveClientADisconnects() throws Exception {
        broker1 = "BrokerA";
        broker2 = "BrokerB";
        doTwoClientsReceiveOneClientDisconnects();
    }

    /**
     * Runs the "two receivers, one disconnects" scenario where the disconnecting
     * client (client 1) is attached to BrokerB.
     *
     * @throws Exception if the scenario fails
     */
    public void testTwoClientsReceiveClientBDisconnects() throws Exception {
        broker1 = "BrokerB";
        broker2 = "BrokerA";
        doTwoClientsReceiveOneClientDisconnects();
    }

    /**
     * Shared scenario: with the brokers bridged both ways and 100 messages sent
     * to BrokerA, each client first receives 20 messages. Client 1 then
     * disconnects and client 2 drains whatever is left. Expected split: 20 for
     * the client on {@link #broker1}, 80 for the client on {@link #broker2}.
     *
     * @throws Exception if broker setup, messaging, or an assertion fails
     */
    public void doTwoClientsReceiveOneClientDisconnects() throws Exception {
        // This scenario passes a rate limit of 80 to the bridge filter factory.
        applyRateLimitNetworkFilter(80.0);

        bridgeBrokers(broker1, broker2);
        bridgeBrokers(broker2, broker1);
        startAllBrokers();

        final ActiveMQDestination destination = createDestination("TEST.FOO", false);
        final MessageConsumer firstClient = createConsumer(broker1, destination);
        final MessageConsumer secondClient = createConsumer(broker2, destination);

        // Presumably gives the new subscriptions time to settle across the bridge.
        Thread.sleep(500L);

        sendMessages("BrokerA", destination, MESSAGE_COUNT);

        LOG.info("Let each client receive 20% of the messages - 40% total");
        msgsClient1 += receiveExactMessages(firstClient, 20);
        msgsClient2 += receiveExactMessages(secondClient, 20);

        firstClient.close();

        LOG.info("Let the second client receive the rest of the messages");
        msgsClient2 += receiveAllMessages(secondClient);
        secondClient.close();

        assertEquals("Client for " + broker1 + " should have received 20% of the messages.", 20, msgsClient1);
        assertEquals("Client for " + broker2 + " should have received 80% of the messages.", 80, msgsClient2);
    }

    /**
     * Runs the "two receivers, one reconnects" scenario where the reconnecting
     * client (client 1) is attached to BrokerA.
     *
     * @throws Exception if the scenario fails
     */
    public void testTwoClientsReceiveClientAReconnects() throws Exception {
        broker1 = "BrokerA";
        broker2 = "BrokerB";
        doTwoClientsReceiveOneClientReconnects();
    }

    /**
     * Runs the "two receivers, one reconnects" scenario where the reconnecting
     * client (client 1) is attached to BrokerB.
     *
     * @throws Exception if the scenario fails
     */
    public void testTwoClientsReceiveClientBReconnects() throws Exception {
        broker1 = "BrokerB";
        broker2 = "BrokerA";
        doTwoClientsReceiveOneClientReconnects();
    }

    /**
     * Shared scenario: with the brokers bridged both ways and 100 messages sent
     * to BrokerA, the clients consume in 20-message rounds. Both take 20, client
     * 1 disconnects, client 2 takes 20 more, client 1 reconnects and takes 20,
     * and finally client 2 takes another 20. Expected totals: 40 for the client
     * on {@link #broker1} and 60 for the client on {@link #broker2}.
     *
     * @throws Exception if broker setup, messaging, or an assertion fails
     */
    public void doTwoClientsReceiveOneClientReconnects() throws Exception {
        // This scenario passes a rate limit of 20 to the bridge filter factory.
        applyRateLimitNetworkFilter(20.0);

        bridgeBrokers(broker1, broker2);
        bridgeBrokers(broker2, broker1);
        startAllBrokers();

        final ActiveMQDestination destination = createDestination("TEST.FOO", false);
        final MessageConsumer firstAttach = createConsumer(broker1, destination);
        final MessageConsumer secondClient = createConsumer(broker2, destination);

        // Presumably gives the new subscriptions time to settle across the bridge.
        Thread.sleep(500L);

        sendMessages("BrokerA", destination, MESSAGE_COUNT);

        // Round 1: both clients take 20 messages each.
        msgsClient1 += receiveExactMessages(firstAttach, 20);
        msgsClient2 += receiveExactMessages(secondClient, 20);
        LOG.info("msgsClient1=" + msgsClient1);
        LOG.info("msgsClient2=" + msgsClient2);

        Thread.sleep(1000L);
        LOG.info("Disconnect the first client");
        firstAttach.close();

        // Round 2: only client 2 is connected.
        LOG.info("Let the second client receive 20% more of the total messages");
        msgsClient2 += receiveExactMessages(secondClient, 20);
        LOG.info("msgsClient2=" + msgsClient2);

        // Round 3: client 1 returns on its original broker and takes another 20.
        final MessageConsumer reattached = createConsumer(broker1, destination);
        Thread.sleep(1000L);
        msgsClient1 += receiveExactMessages(reattached, 20);
        reattached.close();
        LOG.info("new consumer addition, msgsClient1=" + msgsClient1);

        // Round 4: client 2 takes the final 20.
        Thread.sleep(2000L);
        msgsClient2 += receiveExactMessages(secondClient, 20);
        secondClient.close();
        LOG.info("msgsClient2=" + msgsClient2);

        assertEquals("Client for " + broker1 + " should have received 40% of the messages.", 40, msgsClient1);
        assertEquals("Client for " + broker2 + " should have received 60% of the messages.", 60, msgsClient2);
    }

    /**
     * Installs one shared {@link ConditionalNetworkBridgeFilterFactory} on the
     * default destination policy entry of every broker currently registered in
     * {@code brokers}. The factory is configured with replay-when-no-consumers
     * enabled, the given rate limit and a rate duration of 1000 (units are not
     * visible here; presumably milliseconds).
     *
     * @param rateLimit the rate limit to apply; truncated to an {@code int}
     */
    private void applyRateLimitNetworkFilter(double rateLimit) {
        final ConditionalNetworkBridgeFilterFactory limiter = new ConditionalNetworkBridgeFilterFactory();
        limiter.setReplayWhenNoConsumers(true);
        limiter.setRateLimit((int) rateLimit);
        limiter.setRateDuration(1000);

        // The same factory instance is shared by every broker's default policy entry.
        for (Object entry : brokers.values()) {
            final BrokerService service = ((JmsMultipleBrokersTestSupport.BrokerItem) entry).broker;
            service.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(limiter);
        }
    }

    /**
     * With the brokers bridged both ways and 100 messages sent to BrokerA, both
     * clients receive 20 messages and then disconnect. Replacement consumers are
     * then created one after the other (BrokerA first, then BrokerB); each takes
     * 30 messages and closes. Each side is expected to end up with 50 messages.
     *
     * @throws Exception if broker setup, messaging, or an assertion fails
     */
    public void testTwoClientsReceiveTwoClientReconnects() throws Exception {
        // This test passes a rate limit of 50 to the bridge filter factory.
        applyRateLimitNetworkFilter(50.0);

        broker1 = "BrokerA";
        broker2 = "BrokerB";
        bridgeBrokers(broker1, broker2);
        bridgeBrokers(broker2, broker1);
        startAllBrokers();

        final ActiveMQDestination destination = createDestination("TEST.FOO", false);
        final MessageConsumer initialClient1 = createConsumer(broker1, destination);
        final MessageConsumer initialClient2 = createConsumer(broker2, destination);

        // Presumably gives the new subscriptions time to settle across the bridge.
        Thread.sleep(500L);

        sendMessages("BrokerA", destination, MESSAGE_COUNT);

        msgsClient1 += receiveExactMessages(initialClient1, 20);
        msgsClient2 += receiveExactMessages(initialClient2, 20);

        LOG.info("Disconnect both clients");
        initialClient1.close();
        initialClient2.close();

        LOG.info("Serially create another two clients for each broker and consume in turn");
        final MessageConsumer replacementClient1 = createConsumer(broker1, destination);
        msgsClient1 += receiveExactMessages(replacementClient1, 30);
        replacementClient1.close();

        final MessageConsumer replacementClient2 = createConsumer(broker2, destination);
        msgsClient2 += receiveExactMessages(replacementClient2, 30);
        replacementClient2.close();

        assertEquals("Client for " + broker1 + " should have received 50% of the messages.", 50, msgsClient1);
        assertEquals("Client for " + broker2 + " should have received 50% of the messages.", 50, msgsClient2);
    }

    /**
     * Checks that a message whose send reply is suppressed on BrokerB does not
     * show up twice once the producing connection is stopped.
     *
     * <p>Setup: network-producer auditing is enabled on BrokerB's first transport
     * connector, BrokerA is bridged to BrokerB, and BrokerB is made persistent
     * with its store wiped on startup. A plugin on BrokerB passes every send
     * through; for the first send only it suppresses the reply and schedules a
     * background task that waits (up to 60s) for the test's latch and then stops
     * the producer's connection.
     *
     * <p>Flow: a consumer on BrokerB receives the single message and closes, the
     * latch is released, BrokerA must still report one message, a new consumer on
     * BrokerB must receive nothing, BrokerB must report zero messages, and
     * BrokerA's message count must eventually drop to zero.
     *
     * @throws Exception if broker setup, messaging, or an assertion fails
     */
    public void testDuplicateSend() throws Exception {
        broker1 = "BrokerA";
        broker2 = "BrokerB";

        final BrokerService originBroker = ((JmsMultipleBrokersTestSupport.BrokerItem) brokers.get(broker1)).broker;
        final BrokerService remoteBroker = ((JmsMultipleBrokersTestSupport.BrokerItem) brokers.get(broker2)).broker;

        ((TransportConnector) remoteBroker.getTransportConnectors().get(0)).setAuditNetworkProducers(true);
        bridgeBrokers(broker1, broker2);

        final AtomicBoolean first = new AtomicBoolean();
        final CountDownLatch gotMessageLatch = new CountDownLatch(1);

        remoteBroker.setPersistent(true);
        remoteBroker.setDeleteAllMessagesOnStartup(true);
        remoteBroker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {

            public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
                super.send(producerExchange, messageSend);
                if (first.compareAndSet(false, true)) {
                    // Only the very first send has its reply suppressed.
                    producerExchange.getConnectionContext().setDontSendReponse(true);
                    final ExecutorService connectionStopper = Executors.newSingleThreadExecutor();
                    connectionStopper.execute(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                LOG.info("Waiting for recepit");
                                TestCase.assertTrue("message received on time", gotMessageLatch.await(60L, TimeUnit.SECONDS));
                                LOG.info("Stopping connection post send and receive and multiple producers");
                                producerExchange.getConnectionContext().getConnection().stop();
                            } catch (Exception e) {
                                LOG.error("Failed to stop the producer connection after the suppressed send", e);
                            }
                        }
                    });
                    // shutdown() still runs the task queued above but lets the worker
                    // thread terminate afterwards instead of leaking it.
                    connectionStopper.shutdown();
                }
            }
        }});

        startAllBrokers();
        waitForBridgeFormation();

        final ActiveMQDestination dest = createDestination("TEST.FOO", false);
        MessageConsumer client2 = createConsumer(broker2, dest);
        sendMessages("BrokerA", dest, 1);
        assertEquals("Client got message", 1, receiveExactMessages(client2, 1));
        client2.close();

        // Release the background task so it stops the producer connection.
        gotMessageLatch.countDown();

        assertEquals("messages message still there", 1L, originBroker.getAdminView().getTotalMessageCount());

        client2 = createConsumer(broker2, dest);
        LOG.info("Let the second client receive the rest of the messages");
        assertEquals("no duplicate message", 0, receiveAllMessages(client2));
        assertEquals("no duplicate message", 0, receiveAllMessages(client2));

        assertEquals("no messages enqueued", 0L, remoteBroker.getAdminView().getTotalMessageCount());
        assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                return 0L == originBroker.getAdminView().getTotalMessageCount();
            }
        }));
    }

    /**
     * Variant of the duplicate-send check that enables auditing on BrokerB's
     * default destination policy entry instead of on the transport connector.
     *
     * <p>Setup: BrokerA is bridged to BrokerB and BrokerB is made persistent with
     * its store wiped on startup. A plugin on BrokerB passes every send through;
     * for the first send only it suppresses the reply and schedules a background
     * task that waits (up to 60s) for the test's latch and then stops the
     * producer's connection.
     *
     * <p>Flow: a consumer on BrokerB receives the single message and closes, the
     * latch is released, BrokerA must still report one message, a new consumer on
     * BrokerB must receive nothing, BrokerB must report exactly one message (the
     * assertion message attributes it to the DLQ), and BrokerA's message count
     * must eventually drop to zero.
     *
     * @throws Exception if broker setup, messaging, or an assertion fails
     */
    public void testDuplicateSendWithCursorAudit() throws Exception {
        broker1 = "BrokerA";
        broker2 = "BrokerB";

        final BrokerService originBroker = ((JmsMultipleBrokersTestSupport.BrokerItem) brokers.get(broker1)).broker;
        final BrokerService remoteBroker = ((JmsMultipleBrokersTestSupport.BrokerItem) brokers.get(broker2)).broker;

        remoteBroker.getDestinationPolicy().getDefaultEntry().setEnableAudit(true);
        bridgeBrokers(broker1, broker2);

        final AtomicBoolean first = new AtomicBoolean();
        final CountDownLatch gotMessageLatch = new CountDownLatch(1);

        remoteBroker.setPersistent(true);
        remoteBroker.setDeleteAllMessagesOnStartup(true);
        remoteBroker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {

            public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
                super.send(producerExchange, messageSend);
                if (first.compareAndSet(false, true)) {
                    // Only the very first send has its reply suppressed.
                    producerExchange.getConnectionContext().setDontSendReponse(true);
                    final ExecutorService connectionStopper = Executors.newSingleThreadExecutor();
                    connectionStopper.execute(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                LOG.info("Waiting for recepit");
                                TestCase.assertTrue("message received on time", gotMessageLatch.await(60L, TimeUnit.SECONDS));
                                LOG.info("Stopping connection post send and receive and multiple producers");
                                producerExchange.getConnectionContext().getConnection().stop();
                            } catch (Exception e) {
                                LOG.error("Failed to stop the producer connection after the suppressed send", e);
                            }
                        }
                    });
                    // shutdown() still runs the task queued above but lets the worker
                    // thread terminate afterwards instead of leaking it.
                    connectionStopper.shutdown();
                }
            }
        }});

        startAllBrokers();
        waitForBridgeFormation();

        final ActiveMQDestination dest = createDestination("TEST.FOO", false);
        MessageConsumer client2 = createConsumer(broker2, dest);
        sendMessages("BrokerA", dest, 1);
        assertEquals("Client got message", 1, receiveExactMessages(client2, 1));
        client2.close();

        // Release the background task so it stops the producer connection.
        gotMessageLatch.countDown();

        assertEquals("messages message still there", 1L, originBroker.getAdminView().getTotalMessageCount());

        client2 = createConsumer(broker2, dest);
        LOG.info("Let the second client receive the rest of the messages");
        assertEquals("no duplicate message", 0, receiveAllMessages(client2));
        assertEquals("no duplicate message", 0, receiveAllMessages(client2));

        assertEquals("1 messages enqueued on dlq", 1L, remoteBroker.getAdminView().getTotalMessageCount());
        assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                return 0L == originBroker.getAdminView().getTotalMessageCount();
            }
        }));
    }

    /**
     * Checks BrokerB's message and enqueue statistics after a send whose reply
     * was suppressed, with KahaDB's concurrent store-and-dispatch for queues
     * switched off and no consumer attached.
     *
     * <p>Setup: BrokerA is bridged to BrokerB with the test destination added as
     * statically included, and BrokerB is made persistent with its store wiped on
     * startup. A plugin on BrokerB passes every send through; for the first send
     * only it suppresses the reply and schedules a background task that waits (up
     * to 60s) for the test's latch and then stops the producer's connection.
     *
     * <p>Flow: one message is sent to BrokerA; the test waits for BrokerB to
     * report one message, asserts that BrokerA still reports one, releases the
     * latch, waits for BrokerA to reach zero, and finally asserts that BrokerB
     * reports one message in total and one enqueue on the destination.
     *
     * @throws Exception if broker setup, messaging, or an assertion fails
     */
    public void testDuplicateSendWithNoAuditEnqueueCountStat() throws Exception {
        broker1 = "BrokerA";
        broker2 = "BrokerB";
        final NetworkConnector networkConnector = bridgeBrokers(broker1, broker2);

        final BrokerService originBroker = ((JmsMultipleBrokersTestSupport.BrokerItem) brokers.get(broker1)).broker;
        final BrokerService remoteBroker = ((JmsMultipleBrokersTestSupport.BrokerItem) brokers.get(broker2)).broker;

        final AtomicBoolean first = new AtomicBoolean();
        final CountDownLatch gotMessageLatch = new CountDownLatch(1);

        remoteBroker.setPersistent(true);
        remoteBroker.setDeleteAllMessagesOnStartup(true);
        ((KahaDBPersistenceAdapter) remoteBroker.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
        remoteBroker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {

            public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
                super.send(producerExchange, messageSend);
                if (first.compareAndSet(false, true)) {
                    // Only the very first send has its reply suppressed.
                    producerExchange.getConnectionContext().setDontSendReponse(true);
                    final ExecutorService connectionStopper = Executors.newSingleThreadExecutor();
                    connectionStopper.execute(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                LOG.info("Waiting for recepit");
                                TestCase.assertTrue("message received on time", gotMessageLatch.await(60L, TimeUnit.SECONDS));
                                LOG.info("Stopping connection post send and receive and multiple producers");
                                producerExchange.getConnectionContext().getConnection().stop();
                            } catch (Exception e) {
                                LOG.error("Failed to stop the producer connection after the suppressed send", e);
                            }
                        }
                    });
                    // shutdown() still runs the task queued above but lets the worker
                    // thread terminate afterwards instead of leaking it.
                    connectionStopper.shutdown();
                }
            }
        }});

        final ActiveMQDestination dest = createDestination("TEST.FOO", false);
        networkConnector.addStaticallyIncludedDestination(dest);
        startAllBrokers();
        waitForBridgeFormation();

        sendMessages("BrokerA", dest, 1);

        // The outcome of this wait is not asserted; the assertions below decide the result.
        Wait.waitFor(new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                return remoteBroker.getAdminView().getTotalMessageCount() == 1L;
            }
        });

        assertEquals("messages message still there", 1L, originBroker.getAdminView().getTotalMessageCount());

        // Release the background task so it stops the producer connection.
        gotMessageLatch.countDown();

        assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                return 0L == originBroker.getAdminView().getTotalMessageCount();
            }
        }));

        assertEquals("one messages pending", 1L, remoteBroker.getAdminView().getTotalMessageCount());
        assertEquals("one messages enqueued", 1L, remoteBroker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount());
    }

    /**
     * Checks BrokerB's statistics after a send whose reply was suppressed,
     * leaving the persistence adapter's store-and-dispatch setting untouched
     * (the sibling "EnqueueCountStat" test turns it off), and then verifies what
     * survives a restart of BrokerB.
     *
     * <p>Setup: BrokerA is bridged to BrokerB with the test destination added as
     * statically included, and BrokerB is made persistent with its store wiped on
     * startup. A plugin on BrokerB passes every send through; for the first send
     * only it suppresses the reply and schedules a background task that waits (up
     * to 60s) for the test's latch and then stops the producer's connection.
     *
     * <p>Flow:
     * <ol>
     *   <li>One message is sent to BrokerA; after the latch is released BrokerA
     *       must drain to zero while BrokerB reports two messages in total, two
     *       enqueues and two messages on the destination.</li>
     *   <li>A client-acknowledge consumer on BrokerB receives one message without
     *       acknowledging it; the destination's message count must become one.</li>
     *   <li>All brokers are destroyed and BrokerB is recreated; a new consumer
     *       must receive one message, the destination must drain to zero and
     *       {@code ActiveMQ.DLQ} must hold one message.</li>
     * </ol>
     *
     * @throws Exception if broker setup, messaging, or an assertion fails
     */
    public void testDuplicateSendWithNoAuditEnqueueCountStatConcurrentStoreAndDispatch() throws Exception {
        broker1 = "BrokerA";
        broker2 = "BrokerB";
        final NetworkConnector networkConnector = bridgeBrokers(broker1, broker2);

        final BrokerService originBroker = ((JmsMultipleBrokersTestSupport.BrokerItem) brokers.get(broker1)).broker;
        final BrokerService remoteBroker = ((JmsMultipleBrokersTestSupport.BrokerItem) brokers.get(broker2)).broker;

        final AtomicBoolean first = new AtomicBoolean();
        final CountDownLatch gotMessageLatch = new CountDownLatch(1);

        remoteBroker.setPersistent(true);
        remoteBroker.setDeleteAllMessagesOnStartup(true);
        remoteBroker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {

            public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
                super.send(producerExchange, messageSend);
                if (first.compareAndSet(false, true)) {
                    // Only the very first send has its reply suppressed.
                    producerExchange.getConnectionContext().setDontSendReponse(true);
                    final ExecutorService connectionStopper = Executors.newSingleThreadExecutor();
                    connectionStopper.execute(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                LOG.info("Waiting for recepit");
                                TestCase.assertTrue("message received on time", gotMessageLatch.await(60L, TimeUnit.SECONDS));
                                LOG.info("Stopping connection post send and receive by local queue over bridge");
                                producerExchange.getConnectionContext().getConnection().stop();
                            } catch (Exception e) {
                                LOG.error("Failed to stop the producer connection after the suppressed send", e);
                            }
                        }
                    });
                    // shutdown() still runs the task queued above but lets the worker
                    // thread terminate afterwards instead of leaking it.
                    connectionStopper.shutdown();
                }
            }
        }});

        final ActiveMQDestination dest = createDestination("TEST.FOO", false);
        networkConnector.addStaticallyIncludedDestination(dest);
        startAllBrokers();
        waitForBridgeFormation();

        sendMessages("BrokerA", dest, 1);

        // The outcome of this wait is not asserted; the assertions below decide the result.
        Wait.waitFor(new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                return remoteBroker.getAdminView().getTotalMessageCount() == 1L;
            }
        });

        assertEquals("messages message still there", 1L, originBroker.getAdminView().getTotalMessageCount());

        // Release the background task so it stops the producer connection.
        gotMessageLatch.countDown();

        assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                return 0L == originBroker.getAdminView().getTotalMessageCount();
            }
        }));

        // Note: the assertion labels say "one" but the expected value in each case is 2.
        assertEquals("one messages pending", 2L, remoteBroker.getAdminView().getTotalMessageCount());
        assertEquals("one messages enqueued", 2L, remoteBroker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount());
        assertEquals("one messages", 2L, remoteBroker.getDestination(dest).getDestinationStatistics().getMessages().getCount());

        // Client-acknowledge session: the message is received but never acknowledged.
        final Connection conn = createConnection(broker2);
        conn.start();
        final Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        final MessageConsumer messageConsumer = sess.createConsumer(dest);
        assertEquals("Client got message", 1, receiveExactMessages(messageConsumer, 1));
        messageConsumer.close();

        assertTrue("1 messages enqueued on origin", Wait.waitFor(new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                return 1L == remoteBroker.getDestination(dest).getDestinationStatistics().getMessages().getCount();
            }
        }));

        // Restart: tear everything down and bring back only BrokerB.
        destroyAllBrokers();
        createBroker(new URI("broker:(tcp://localhost:0)/BrokerB?useJmx=true&advisorySupport=false")).start();

        assertEquals("Receive after restart and previous receive unacked", 1, receiveExactMessages(createConsumer(broker2, dest), 1));

        // Look the broker up again: the instance registered under broker2 was recreated above.
        final BrokerService restartedBroker = ((JmsMultipleBrokersTestSupport.BrokerItem) brokers.get(broker2)).broker;

        assertTrue("no messages enqueued", Wait.waitFor(new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                return 0L == restartedBroker.getDestination(dest).getDestinationStatistics().getMessages().getCount();
            }
        }));

        final ActiveMQDestination dlq = createDestination("ActiveMQ.DLQ", false);
        assertTrue("one message still on dlq", Wait.waitFor(new Wait.Condition() {

            public boolean isSatisified() throws Exception {
                return 1L == restartedBroker.getDestination(dlq).getDestinationStatistics().getMessages().getCount();
            }
        }));
    }

    /**
     * Receives up to {@code msgCount} messages from the consumer, waiting at most
     * 4000 ms for each one. Stops early, logging an error, as soon as a receive
     * times out.
     *
     * @param consumer the consumer to read from
     * @param msgCount the number of messages to try to receive
     * @return the number of messages actually received (at most {@code msgCount})
     * @throws Exception if the underlying receive fails
     */
    protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception {
        int received = 0;
        while (received < msgCount) {
            if (consumer.receive(4000L) == null) {
                LOG.error("Consumer failed to receive exactly " + msgCount + " messages. Actual messages received is: " + received);
                return received;
            }
            received++;
        }
        return received;
    }

    /**
     * Drains the consumer: keeps receiving with a 1000 ms timeout per message
     * until a receive returns {@code null}.
     *
     * @param consumer the consumer to read from
     * @return the number of messages received before the first timeout
     * @throws Exception if the underlying receive fails
     */
    protected int receiveAllMessages(MessageConsumer consumer) throws Exception {
        int received = 0;
        while (consumer.receive(1000L) != null) {
            received++;
        }
        return received;
    }

    /**
     * Opens and starts a new connection to the named broker and returns a
     * consumer for {@code dest} on a non-transacted, auto-acknowledge session.
     *
     * @param brokerName name of the broker to connect to
     * @param dest destination to consume from
     * @return a consumer on a freshly created connection and session
     * @throws Exception if the connection, session, or consumer cannot be created
     */
    @Override
    protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
        final Connection connection = createConnection(brokerName);
        connection.start();
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        return session.createConsumer(dest);
    }

    /**
     * Gives the broker a destination policy map whose default entry has the
     * expire-messages period set to 0 and auditing disabled.
     *
     * @param broker the broker being configured
     */
    @Override
    protected void configureBroker(BrokerService broker) {
        final PolicyEntry fallbackPolicy = new PolicyEntry();
        fallbackPolicy.setExpireMessagesPeriod(0L);
        fallbackPolicy.setEnableAudit(false);

        final PolicyMap policies = new PolicyMap();
        policies.setDefaultEntry(fallbackPolicy);
        broker.setDestinationPolicy(policies);
    }

    /**
     * Creates the network connector through the superclass and then sets its
     * prefetch size to 1 and enables decreased network consumer priority.
     *
     * @return the connector created by the superclass, with the extra settings applied
     * @throws Exception if the superclass fails to create the bridge
     */
    @Override
    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception {
        final NetworkConnector connector =
                super.bridgeBrokers(localBroker, remoteBroker, dynamicOnly, networkTTL, conduit, failover);
        connector.setPrefetchSize(NETWORK_PREFETCH);
        connector.setDecreaseNetworkConsumerPriority(true);
        return connector;
    }

    /**
     * Enables auto-fail, creates the non-persistent, JMX-enabled brokers BrokerA
     * and BrokerB, gives both connection factories a prefetch of 1 for all
     * consumer types with topic-advisory watching turned off, and resets the
     * per-client receive counters.
     *
     * @throws Exception if the superclass setup or broker creation fails
     */
    @Override
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();

        createBroker(new URI("broker:(tcp://localhost:0)/BrokerA?persistent=false&useJmx=true"));
        createBroker(new URI("broker:(tcp://localhost:0)/BrokerB?persistent=false&useJmx=true"));

        final ActiveMQConnectionFactory factoryForA = (ActiveMQConnectionFactory) getConnectionFactory("BrokerA");
        final ActiveMQConnectionFactory factoryForB = (ActiveMQConnectionFactory) getConnectionFactory("BrokerB");

        // One prefetch policy instance is shared by both factories.
        final ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setAll(PREFETCH_COUNT);
        factoryForA.setPrefetchPolicy(prefetch);
        factoryForB.setPrefetchPolicy(prefetch);
        factoryForA.setWatchTopicAdvisories(false);
        factoryForB.setWatchTopicAdvisories(false);

        msgsClient1 = 0;
        msgsClient2 = 0;
    }
}

