/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException;
import javax.sql.DataSource;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ4636Test {
    // Name of the topic that every test in this class subscribes to and publishes on.
    private static final String MY_TEST_TOPIC = "MY_TEST_TOPIC";
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4636Test.class);
    // Address passed to the broker's addConnector(); createBroker() then overwrites it with
    // the connector's publishable connect string, which the tests embed in a failover URL.
    private String transportUrl = "tcp://0.0.0.0:0";
    // Broker under test; assigned in createBroker()/startBroker() and stopped in stopBroker().
    private BrokerService broker;
    // Data source backing the JDBC persistence adapter; created in createBroker() and
    // shut down in stopBroker().
    EmbeddedDataSource embeddedDataSource;
    // Fault-injection switch: while its count is above zero, TestTransactionContext.executeBatch()
    // counts it down and throws a SQLException instead of executing the batch. It starts at
    // zero (no failure injected); sendMessage() replaces it with a latch of count 1 before sending.
    // NOTE(review): the field is reassigned by sendMessage() and read by executeBatch(), presumably
    // on different threads, and it is not volatile — confirm cross-thread visibility is guaranteed.
    CountDownLatch throwSQLException = new CountDownLatch(0);

    /**
     * Per-test setup: obtains a configured broker from {@link #createBroker()}, calls
     * {@code deleteAllMessages()} on it, starts it and blocks until it reports started.
     *
     * @throws Exception if the broker cannot be created or started
     */
    @Before
    public void startBroker() throws Exception {
        final BrokerService freshBroker = createBroker();
        broker = freshBroker;
        freshBroker.deleteAllMessages();
        freshBroker.start();
        freshBroker.waitUntilStarted();
        LOG.info("Broker started...");
    }

    /**
     * Per-test teardown: stops the broker (if one was created) and waits for it to stop,
     * then shuts down the embedded data source (if one was created).
     *
     * <p>The data source shutdown runs in a {@code finally} block so that a failure while
     * stopping the broker does not leave the embedded database open for the next test.
     *
     * @throws Exception if stopping the broker or shutting down the data source fails
     */
    @After
    public void stopBroker() throws Exception {
        try {
            if (broker != null) {
                LOG.info("Stopping broker...");
                broker.stop();
                broker.waitUntilStopped();
            }
        } finally {
            if (embeddedDataSource != null) {
                DataSourceServiceSupport.shutdownDefaultDataSource(embeddedDataSource);
            }
        }
    }

    /**
     * Builds (but does not start) the broker used by the tests.
     *
     * <p>The broker is wired with:
     * <ul>
     *   <li>a {@link TestJDBCPersistenceAdapter} over an embedded data source located in the
     *       default data directory, with a lock keep-alive period of 1000 and a
     *       {@link LeaseDatabaseLocker} whose acquire sleep interval is 2000
     *       (presumably milliseconds — confirm against the setters' documentation);</li>
     *   <li>a default destination policy with the expire-messages period set to 0;</li>
     *   <li>a {@link LeaseLockerIOExceptionHandler} as the IO exception handler;</li>
     *   <li>one transport connector added at {@code transportUrl}.</li>
     * </ul>
     * As a side effect, {@code embeddedDataSource}, {@code broker} and {@code transportUrl}
     * are updated; {@code transportUrl} ends up holding the connector's publishable connect string.
     *
     * @return the configured broker (the same instance stored in the {@code broker} field)
     * @throws Exception if the data source, adapter or connector cannot be set up
     */
    protected BrokerService createBroker() throws Exception {
        // Create the data source and open/close one connection up front.
        embeddedDataSource =
                (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
        embeddedDataSource.setCreateDatabase("create");
        embeddedDataSource.getConnection().close();

        // Persistence adapter whose transaction contexts can inject a SQLException on demand.
        TestJDBCPersistenceAdapter adapter = new TestJDBCPersistenceAdapter();
        adapter.setDataSource(embeddedDataSource);
        adapter.setLockKeepAlivePeriod(1000L);
        LeaseDatabaseLocker locker = new LeaseDatabaseLocker();
        locker.setLockAcquireSleepInterval(2000L);
        adapter.setLocker(locker);

        broker = new BrokerService();

        PolicyEntry defaultPolicy = new PolicyEntry();
        PolicyMap policies = new PolicyMap();
        defaultPolicy.setExpireMessagesPeriod(0L);
        policies.setDefaultEntry(defaultPolicy);
        broker.setDestinationPolicy(policies);

        broker.setPersistenceAdapter(adapter);
        broker.setIoExceptionHandler(new LeaseLockerIOExceptionHandler());

        // Replace the configured address with the one clients should actually connect to.
        transportUrl = broker.addConnector(transportUrl).getPublishableConnectString();
        return broker;
    }

    /**
     * Registers a durable subscriber and then sends one non-transacted message over a
     * failover transport (timeout=1000) while {@link #sendMessage} has armed the injected
     * SQL failure. The test passes when no exception propagates.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testProducerWithDBShutdown() throws Exception {
        final String failoverUrl = String.format("failover:(%s)?timeout=1000", transportUrl);
        createDurableConsumer(MY_TEST_TOPIC, failoverUrl);
        sendMessage(MY_TEST_TOPIC, failoverUrl, false, false);
    }

    /**
     * Registers a durable subscriber and then sends one message in a transacted session and
     * commits it while the injected SQL failure is armed. The commit is expected to surface
     * as a {@link TransactionRolledBackException}; completing without it fails the test.
     *
     * @throws Exception on any failure other than the expected rollback
     */
    @Test
    public void testTransactedProducerCommitWithDBShutdown() throws Exception {
        final String failoverUrl = String.format("failover:(%s)?timeout=1000", transportUrl);
        createDurableConsumer(MY_TEST_TOPIC, failoverUrl);
        try {
            sendMessage(MY_TEST_TOPIC, failoverUrl, true, true);
            Assert.fail("Expect rollback after failover - inddoubt commit");
        } catch (TransactionRolledBackException rolledBack) {
            // Expected outcome: the commit was reported as rolled back.
            LOG.info("Got rollback after failover failed commit", rolledBack);
        }
    }

    /**
     * Registers a durable subscriber and then sends one message in a transacted session and
     * rolls it back while the injected SQL failure is armed. The test passes when no
     * exception propagates.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testTransactedProducerRollbackWithDBShutdown() throws Exception {
        final String brokerAddress = transportUrl;
        final String failoverUrl = "failover:(" + brokerAddress + ")?timeout=1000";
        createDurableConsumer(MY_TEST_TOPIC, failoverUrl);
        sendMessage(MY_TEST_TOPIC, failoverUrl, true, false);
    }

    /**
     * Registers the durable subscription {@code "MySub1"} on the given topic using client id
     * {@code "myconn1"}, then closes the connection.
     *
     * <p>The connection is closed in a {@code finally} block so it is released even when
     * session or subscriber creation fails.
     *
     * @param topic        name of the topic to subscribe to
     * @param transportURL broker URL handed to the {@link ActiveMQConnectionFactory}
     * @throws JMSException if connecting, subscribing or closing fails
     */
    public void createDurableConsumer(String topic, String transportURL) throws JMSException {
        LOG.info("*** createDurableConsumer() called ...");
        Connection connection = null;
        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL);
            connection = factory.createConnection();
            connection.setClientID("myconn1");
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic(topic);
            // Only the registration side effect is needed; the subscriber itself is never used.
            session.createDurableSubscriber(destination, "MySub1");
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Sends one persistent text message ({@code "testMessage"}) to the given topic with the
     * SQL fault injection armed, then closes the connection.
     *
     * <p>Immediately before the send, {@code throwSQLException} is replaced by a latch of
     * count 1, so the next {@code TestTransactionContext.executeBatch()} call throws a
     * {@link SQLException}. For a transacted session the work is then either committed or
     * rolled back according to {@code commit}.
     *
     * <p>The connection is closed in a {@code finally} block so it is released even when
     * the send, commit or rollback fails.
     *
     * @param topic        name of the topic to publish to
     * @param transportURL broker URL handed to the {@link ActiveMQConnectionFactory}
     * @param transacted   {@code true} to use a transacted session, {@code false} for an
     *                     auto-acknowledge session
     * @param commit       when {@code transacted}, {@code true} commits and {@code false}
     *                     rolls back; ignored otherwise
     * @throws JMSException if connecting, sending, committing, rolling back or closing fails
     */
    public void sendMessage(String topic, String transportURL, boolean transacted, boolean commit) throws JMSException {
        Connection connection = null;
        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL);
            connection = factory.createConnection();
            Session session = connection.createSession(
                    transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic(topic);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage m = session.createTextMessage("testMessage");
            LOG.info("*** send message to broker...");

            // Arm the fault injection: the next executeBatch() on the store will throw.
            throwSQLException = new CountDownLatch(1);
            producer.send(m);

            if (transacted) {
                if (commit) {
                    session.commit();
                } else {
                    session.rollback();
                }
            }
            LOG.info("*** Finished send message to broker");
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    public class TestTransactionContext
    extends TransactionContext {
        public TestTransactionContext(JDBCPersistenceAdapter jdbcPersistenceAdapter) throws IOException {
            super(jdbcPersistenceAdapter, -1, -1);
        }

        public void executeBatch() throws SQLException {
            if (AMQ4636Test.this.throwSQLException.getCount() > 0L) {
                AMQ4636Test.this.throwSQLException.countDown();
                throw new SQLException("TEST SQL EXCEPTION");
            }
            super.executeBatch();
        }
    }

    public class TestJDBCPersistenceAdapter
    extends JDBCPersistenceAdapter {
        public TransactionContext getTransactionContext() throws IOException {
            return new TestTransactionContext(this);
        }
    }
}

