/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.statistics;

import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;

public abstract class AbstractInflightMessageSizeTest {
    protected BrokerService brokerService;
    protected Connection connection;
    protected String brokerUrlString;
    protected Session session;
    protected Destination dest;
    protected org.apache.activemq.broker.region.Destination amqDestination;
    protected MessageConsumer consumer;
    protected int prefetch = 100;
    protected final int ackType;
    protected final boolean optimizeAcknowledge;
    protected final String destName = "testDest";

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList({0, true}, {1, true}, {4, true}, {2, true}, {0, false}, {1, false}, {4, false}, {2, false});
    }

    public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
        this.ackType = ackType;
        this.optimizeAcknowledge = optimizeAcknowledge;
    }

    @Before
    public void setUp() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        TransportConnector tcp = this.brokerService.addConnector("tcp://localhost:0");
        this.brokerService.start();
        String optAckString = this.optimizeAcknowledge ? "?jms.optimizeAcknowledge=true&jms.optimizedAckScheduledAckInterval=2000" : "";
        this.brokerUrlString = tcp.getPublishableConnectString() + optAckString;
        this.connection = this.createConnectionFactory().createConnection();
        this.connection.setClientID("client1");
        this.connection.start();
        this.session = this.connection.createSession(this.ackType == 0, this.ackType);
        this.dest = this.getDestination();
        this.consumer = this.getMessageConsumer();
        this.amqDestination = TestSupport.getDestination(this.brokerService, this.getActiveMQDestination());
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.brokerUrlString);
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setTopicPrefetch(this.prefetch);
        prefetchPolicy.setQueuePrefetch(this.prefetch);
        prefetchPolicy.setOptimizeDurableTopicPrefetch(this.prefetch);
        factory.setPrefetchPolicy(prefetchPolicy);
        return factory;
    }

    @After
    public void tearDown() throws Exception {
        if (this.connection != null) {
            this.connection.close();
        }
        this.brokerService.stop();
    }

    @Test(timeout=15000L)
    public void testInflightMessageSize() throws Exception {
        final long size = this.sendMessages(10);
        Assert.assertTrue((String)"Inflight message size should be greater than the content length sent", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() > size;
            }
        }));
        this.receiveMessages(10);
        Assert.assertTrue((String)"Inflight message size should be 0", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() == 0L;
            }
        }));
    }

    @Test(timeout=15000L)
    public void testInflightMessageSizePrefetchFilled() throws Exception {
        final long size = this.sendMessages(this.prefetch);
        Assert.assertTrue((String)"Inflight message size should be greater than content length", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() > size;
            }
        }));
        long inFlightSize = this.getSubscription().getInFlightMessageSize();
        this.sendMessages(10);
        Assert.assertEquals((String)"Inflight message size should not change", (long)inFlightSize, (long)this.getSubscription().getInFlightMessageSize());
        this.receiveMessages(this.prefetch + 10);
        Assert.assertTrue((String)"Inflight message size should be 0", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() == 0L;
            }
        }));
    }

    @Test(timeout=15000L)
    public void testInflightMessageSizePrefetchNotFilled() throws Exception {
        final long size = this.sendMessages(this.prefetch - 10);
        Assert.assertTrue((String)"Inflight message size should be greater than content length", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() > size;
            }
        }));
        final long inFlightSize = this.getSubscription().getInFlightMessageSize();
        this.sendMessages(10);
        Assert.assertTrue((String)"Inflight message size should be greater than previous inlight size", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() > inFlightSize;
            }
        }));
        this.receiveMessages(this.prefetch);
        Assert.assertTrue((String)"Inflight message size should be 0", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() == 0L;
            }
        }));
    }

    @Test(timeout=15000L)
    public void testInflightMessageSizeRollback() throws Exception {
        Assume.assumeTrue((this.ackType == 0 ? 1 : 0) != 0);
        final long size = this.sendMessages(10);
        Assert.assertTrue((String)"Inflight message size should be greater than the content length sent", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AbstractInflightMessageSizeTest.this.getSubscription().getInFlightMessageSize() > size;
            }
        }));
        long inFlightSize = this.getSubscription().getInFlightMessageSize();
        for (int i = 0; i < 10; ++i) {
            this.consumer.receive();
        }
        this.session.rollback();
        Assert.assertEquals((String)"Inflight message size should not change on rollback", (long)inFlightSize, (long)this.getSubscription().getInFlightMessageSize());
    }

    protected long sendMessages(int count) throws JMSException {
        MessageProducer producer = this.session.createProducer(this.dest);
        long totalSize = 0L;
        for (int i = 0; i < count; ++i) {
            Random r = new Random();
            int size = r.nextInt(150000);
            totalSize += (long)size;
            byte[] bytes = new byte[size > 0 ? size : 1];
            r.nextBytes(bytes);
            BytesMessage bytesMessage = this.session.createBytesMessage();
            bytesMessage.writeBytes(bytes);
            producer.send((Message)bytesMessage);
        }
        if (this.session.getTransacted()) {
            this.session.commit();
        }
        return totalSize;
    }

    protected void receiveMessages(int count) throws JMSException {
        for (int i = 0; i < count; ++i) {
            Message m = this.consumer.receive();
            if (this.ackType == 0) {
                this.session.commit();
                continue;
            }
            if (this.ackType == 1) continue;
            m.acknowledge();
        }
    }

    protected abstract Subscription getSubscription();

    protected abstract ActiveMQDestination getActiveMQDestination();

    protected abstract MessageConsumer getMessageConsumer() throws JMSException;

    protected abstract Destination getDestination() throws JMSException;
}

