/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.scheduler;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSchedulerTestSupport;
import org.apache.activemq.broker.scheduler.SchedulerBroker;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsSchedulerTest
extends JobSchedulerTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(JmsSchedulerTest.class);

    @Test
    public void testCron() throws Exception {
        int COUNT = 10;
        final AtomicInteger count = new AtomicInteger();
        Connection connection = this.createConnection();
        Session session = connection.createSession(false, 1);
        MessageConsumer consumer = session.createConsumer((Destination)this.destination);
        final CountDownLatch latch = new CountDownLatch(10);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                count.incrementAndGet();
                latch.countDown();
                LOG.info("Received scheduled message, waiting for {} more", (Object)latch.getCount());
            }
        });
        connection.start();
        MessageProducer producer = session.createProducer((Destination)this.destination);
        TextMessage message = session.createTextMessage("test msg");
        long time = 1000L;
        message.setStringProperty("AMQ_SCHEDULED_CRON", "* * * * *");
        message.setLongProperty("AMQ_SCHEDULED_DELAY", time);
        message.setLongProperty("AMQ_SCHEDULED_PERIOD", 500L);
        message.setIntProperty("AMQ_SCHEDULED_REPEAT", 9);
        producer.send((Message)message);
        producer.close();
        Thread.sleep(500L);
        SchedulerBroker sb = (SchedulerBroker)this.broker.getBroker().getAdaptor(SchedulerBroker.class);
        JobScheduler js = sb.getJobScheduler();
        List list = js.getAllJobs();
        Assert.assertEquals((long)1L, (long)list.size());
        latch.await(240L, TimeUnit.SECONDS);
        Assert.assertEquals((long)10L, (long)count.get());
        connection.close();
    }

    @Test
    public void testSchedule() throws Exception {
        boolean COUNT = true;
        Connection connection = this.createConnection();
        Session session = connection.createSession(false, 1);
        MessageConsumer consumer = session.createConsumer((Destination)this.destination);
        final CountDownLatch latch = new CountDownLatch(1);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                latch.countDown();
            }
        });
        connection.start();
        long time = 5000L;
        MessageProducer producer = session.createProducer((Destination)this.destination);
        TextMessage message = session.createTextMessage("test msg");
        message.setLongProperty("AMQ_SCHEDULED_DELAY", time);
        producer.send((Message)message);
        producer.close();
        Thread.sleep(2000L);
        Assert.assertEquals((long)latch.getCount(), (long)1L);
        latch.await(5L, TimeUnit.SECONDS);
        Assert.assertEquals((long)latch.getCount(), (long)0L);
        connection.close();
    }

    @Test
    public void testTransactedSchedule() throws Exception {
        boolean COUNT = true;
        Connection connection = this.createConnection();
        final Session session = connection.createSession(true, 0);
        MessageConsumer consumer = session.createConsumer((Destination)this.destination);
        final CountDownLatch latch = new CountDownLatch(1);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                try {
                    session.commit();
                }
                catch (JMSException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            }
        });
        connection.start();
        long time = 5000L;
        MessageProducer producer = session.createProducer((Destination)this.destination);
        TextMessage message = session.createTextMessage("test msg");
        message.setLongProperty("AMQ_SCHEDULED_DELAY", time);
        producer.send((Message)message);
        session.commit();
        producer.close();
        Thread.sleep(2000L);
        Assert.assertEquals((long)latch.getCount(), (long)1L);
        latch.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((long)latch.getCount(), (long)0L);
        connection.close();
    }

    @Test
    public void testScheduleRepeated() throws Exception {
        int NUMBER = 10;
        final AtomicInteger count = new AtomicInteger();
        Connection connection = this.createConnection();
        Session session = connection.createSession(false, 1);
        MessageConsumer consumer = session.createConsumer((Destination)this.destination);
        final CountDownLatch latch = new CountDownLatch(10);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                count.incrementAndGet();
                latch.countDown();
                LOG.info("Received scheduled message, waiting for {} more", (Object)latch.getCount());
            }
        });
        connection.start();
        MessageProducer producer = session.createProducer((Destination)this.destination);
        TextMessage message = session.createTextMessage("test msg");
        long time = 1000L;
        message.setLongProperty("AMQ_SCHEDULED_DELAY", time);
        message.setLongProperty("AMQ_SCHEDULED_PERIOD", 500L);
        message.setIntProperty("AMQ_SCHEDULED_REPEAT", 9);
        producer.send((Message)message);
        producer.close();
        Assert.assertEquals((long)latch.getCount(), (long)10L);
        latch.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals((long)0L, (long)latch.getCount());
        Thread.sleep(1000L);
        Assert.assertEquals((long)10L, (long)count.get());
        connection.close();
    }

    @Test
    public void testScheduleRestart() throws Exception {
        this.testScheduleRestart(JobSchedulerTestSupport.RestartType.NORMAL);
    }

    @Test
    public void testScheduleFullRecoveryRestart() throws Exception {
        this.testScheduleRestart(JobSchedulerTestSupport.RestartType.FULL_RECOVERY);
    }

    @Test
    public void testUpdatesAppliedToIndexBeforeJournalShouldBeDiscarded() throws Exception {
        int NUMBER_OF_MESSAGES = 1000;
        final AtomicInteger numberOfDiscardedJobs = new AtomicInteger();
        JobSchedulerStoreImpl jobSchedulerStore = (JobSchedulerStoreImpl)this.broker.getJobSchedulerStore();
        Location middleLocation = null;
        DefaultTestAppender appender = new DefaultTestAppender(){

            public void doAppend(LoggingEvent event) {
                if (event.getMessage().toString().contains("Removed Job past last appened in the journal")) {
                    numberOfDiscardedJobs.incrementAndGet();
                }
            }
        };
        this.registerLogAppender((Appender)appender);
        Connection connection = this.createConnection();
        Session session = connection.createSession(false, 1);
        connection.start();
        MessageProducer producer = session.createProducer((Destination)this.destination);
        for (int i = 0; i < 1000; ++i) {
            TextMessage message = session.createTextMessage("test msg");
            long time = 5000L;
            message.setLongProperty("AMQ_SCHEDULED_DELAY", time);
            producer.send((Message)message);
            if (500 != i) continue;
            middleLocation = jobSchedulerStore.getJournal().getLastAppendLocation();
        }
        producer.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        jobSchedulerStore.getJournal().setLastAppendLocation(middleLocation);
        jobSchedulerStore.load();
        Assert.assertEquals((long)numberOfDiscardedJobs.get(), (long)500L);
    }

    private void registerLogAppender(Appender appender) {
        org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class);
        log4jLogger.addAppender(appender);
        log4jLogger.setLevel(Level.TRACE);
    }

    private void testScheduleRestart(JobSchedulerTestSupport.RestartType restartType) throws Exception {
        Connection connection = this.createConnection();
        Session session = connection.createSession(false, 1);
        connection.start();
        MessageProducer producer = session.createProducer((Destination)this.destination);
        TextMessage message = session.createTextMessage("test msg");
        long time = 5000L;
        message.setLongProperty("AMQ_SCHEDULED_DELAY", time);
        producer.send((Message)message);
        producer.close();
        this.restartBroker(restartType);
        connection = this.createConnection();
        connection.start();
        session = connection.createSession(false, 1);
        MessageConsumer consumer = session.createConsumer((Destination)this.destination);
        Message msg = consumer.receive(10000L);
        Assert.assertNotNull((String)"Didn't receive the message", (Object)msg);
        producer = session.createProducer((Destination)this.destination);
        message.setLongProperty("AMQ_SCHEDULED_DELAY", time);
        producer.send((Message)message);
        producer.close();
        connection.close();
    }

    @Test
    public void testJobSchedulerStoreUsage() throws Exception {
        this.broker.getSystemUsage().getJobSchedulerUsage().setLimit(10240L);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        Connection connection = factory.createConnection();
        connection.start();
        Session sess = connection.createSession(false, 1);
        long time = 5000L;
        final ProducerThread producer = new ProducerThread(sess, (Destination)this.destination){

            protected Message createMessage(int i) throws Exception {
                Message message = super.createMessage(i);
                message.setLongProperty("AMQ_SCHEDULED_DELAY", 5000L);
                return message;
            }
        };
        producer.setMessageCount(100);
        producer.start();
        MessageConsumer consumer = sess.createConsumer((Destination)this.destination);
        final CountDownLatch latch = new CountDownLatch(100);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                latch.countDown();
            }
        });
        Thread.sleep(10000L);
        Assert.assertEquals((long)100L, (long)latch.getCount());
        this.broker.getSystemUsage().getJobSchedulerUsage().setLimit(0x2100000L);
        Thread.sleep(10000L);
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return producer.getSentCount() == producer.getMessageCount();
            }
        }, (long)20000L);
        Assert.assertEquals((String)"Producer didn't send all messages", (long)producer.getMessageCount(), (long)producer.getSentCount());
        latch.await(20000L, TimeUnit.MILLISECONDS);
        Assert.assertEquals((String)"Consumer did not receive all messages.", (long)0L, (long)latch.getCount());
        connection.close();
    }
}

