/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.XASession;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnection;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.Wait;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class AMQ7067Test {
    protected static Random r = new Random();
    static final String WIRE_LEVEL_ENDPOINT = "tcp://localhost:61616";
    protected BrokerService broker;
    protected ActiveMQXAConnection connection;
    protected XASession xaSession;
    protected XAResource xaRes;
    private final String xbean = "xbean:";
    private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq7067";
    private static final ActiveMQXAConnectionFactory ACTIVE_MQ_CONNECTION_FACTORY = new ActiveMQXAConnectionFactory("tcp://localhost:61616");
    private static final ActiveMQConnectionFactory ACTIVE_MQ_NON_XA_CONNECTION_FACTORY = new ActiveMQConnectionFactory("tcp://localhost:61616");

    @Before
    public void setup() throws Exception {
        this.deleteData(new File("target/data"));
        this.createBroker();
    }

    @After
    public void shutdown() throws Exception {
        this.broker.stop();
    }

    public void setupXAConnection() throws Exception {
        this.connection = (ActiveMQXAConnection)ACTIVE_MQ_CONNECTION_FACTORY.createXAConnection();
        this.connection.start();
        this.xaSession = this.connection.createXASession();
        this.xaRes = this.xaSession.getXAResource();
    }

    private void createBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker = BrokerFactory.createBroker((String)"xbean:src/test/resources/org/apache/activemq/bugs/amq7067/activemq.xml");
        this.broker.start();
    }

    @Test
    public void testXAPrepare() throws Exception {
        this.setupXAConnection();
        Queue holdKahaDb = this.xaSession.createQueue("holdKahaDb");
        MessageProducer holdKahaDbProducer = this.xaSession.createProducer((Destination)holdKahaDb);
        XATransactionId txid = AMQ7067Test.createXATransaction();
        System.out.println("****** create new txid = " + txid);
        this.xaRes.start((Xid)txid, 0);
        TextMessage helloMessage = this.xaSession.createTextMessage(StringUtils.repeat((String)"a", (int)10));
        holdKahaDbProducer.send((Message)helloMessage);
        this.xaRes.end((Xid)txid, 0x4000000);
        final Queue queue = this.xaSession.createQueue("test");
        AMQ7067Test.produce(this.xaRes, this.xaSession, queue, 100, 524288);
        this.xaRes.prepare((Xid)txid);
        AMQ7067Test.produce(this.xaRes, this.xaSession, queue, 100, 524288);
        ((org.apache.activemq.broker.region.Queue)this.broker.getRegionBroker().getDestinationMap().get(queue)).purge();
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return 0L == AMQ7067Test.this.getQueueSize(queue.getQueueName());
            }
        });
        this.broker.getPersistenceAdapter().checkpoint(true);
        Xid[] xids = this.xaRes.recover(0x1000000);
        Assert.assertEquals((long)1L, (long)xids.length);
        this.connection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.createBroker();
        this.setupXAConnection();
        xids = this.xaRes.recover(0x1000000);
        System.out.println("****** recovered = " + xids);
        Assert.assertEquals((long)1L, (long)xids.length);
    }

    @Test
    public void testXAPrepareWithAckCompactionDoesNotLooseInflight() throws Exception {
        Logger.getLogger(MessageDatabase.class).setLevel(Level.TRACE);
        this.setupXAConnection();
        Queue holdKahaDb = this.xaSession.createQueue("holdKahaDb");
        MessageProducer holdKahaDbProducer = this.xaSession.createProducer((Destination)holdKahaDb);
        XATransactionId txid = AMQ7067Test.createXATransaction();
        System.out.println("****** create new txid = " + txid);
        this.xaRes.start((Xid)txid, 0);
        TextMessage helloMessage = this.xaSession.createTextMessage(StringUtils.repeat((String)"a", (int)10));
        holdKahaDbProducer.send((Message)helloMessage);
        this.xaRes.end((Xid)txid, 0x4000000);
        final Queue queue = this.xaSession.createQueue("test");
        AMQ7067Test.produce(this.xaRes, this.xaSession, queue, 100, 524288);
        this.xaRes.prepare((Xid)txid);
        AMQ7067Test.produce(this.xaRes, this.xaSession, queue, 100, 524288);
        ((org.apache.activemq.broker.region.Queue)this.broker.getRegionBroker().getDestinationMap().get(queue)).purge();
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return 0L == AMQ7067Test.this.getQueueSize(queue.getQueueName());
            }
        });
        int limit = ((KahaDBPersistenceAdapter)this.broker.getPersistenceAdapter()).getCompactAcksAfterNoGC() + 1;
        for (int i = 0; i < limit * 2; ++i) {
            this.broker.getPersistenceAdapter().checkpoint(true);
        }
        TimeUnit.SECONDS.sleep(5L);
        Xid[] xids = this.xaRes.recover(0x1000000);
        Assert.assertEquals((long)1L, (long)xids.length);
        this.connection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.createBroker();
        this.setupXAConnection();
        xids = this.xaRes.recover(0x1000000);
        System.out.println("****** recovered = " + xids);
        Assert.assertEquals((long)1L, (long)xids.length);
    }

    @Test
    public void testXACommitWithAckCompactionDoesNotLooseOutcomeOnFullRecovery() throws Exception {
        this.doTestXACompletionWithAckCompactionDoesNotLooseOutcomeOnFullRecovery(true);
    }

    @Test
    public void testXARollbackWithAckCompactionDoesNotLooseOutcomeOnFullRecovery() throws Exception {
        this.doTestXACompletionWithAckCompactionDoesNotLooseOutcomeOnFullRecovery(false);
    }

    protected void doTestXACompletionWithAckCompactionDoesNotLooseOutcomeOnFullRecovery(boolean commit) throws Exception {
        ((KahaDBPersistenceAdapter)this.broker.getPersistenceAdapter()).setCompactAcksAfterNoGC(2);
        Logger.getLogger(MessageDatabase.class).setLevel(Level.TRACE);
        this.setupXAConnection();
        Queue holdKahaDb = this.xaSession.createQueue("holdKahaDb");
        MessageProducer holdKahaDbProducer = this.xaSession.createProducer((Destination)holdKahaDb);
        XATransactionId txid = AMQ7067Test.createXATransaction();
        System.out.println("****** create new txid = " + txid);
        this.xaRes.start((Xid)txid, 0);
        TextMessage helloMessage = this.xaSession.createTextMessage(StringUtils.repeat((String)"a", (int)10));
        holdKahaDbProducer.send((Message)helloMessage);
        this.xaRes.end((Xid)txid, 0x4000000);
        final Queue queue = this.xaSession.createQueue("test");
        AMQ7067Test.produce(this.xaRes, this.xaSession, queue, 100, 524288);
        ((org.apache.activemq.broker.region.Queue)this.broker.getRegionBroker().getDestinationMap().get(queue)).purge();
        this.xaRes.prepare((Xid)txid);
        AMQ7067Test.produce(this.xaRes, this.xaSession, holdKahaDb, 1, 10);
        AMQ7067Test.produce(this.xaRes, this.xaSession, queue, 50, 524288);
        ((org.apache.activemq.broker.region.Queue)this.broker.getRegionBroker().getDestinationMap().get(queue)).purge();
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return 0L == AMQ7067Test.this.getQueueSize(queue.getQueueName());
            }
        });
        if (commit) {
            this.xaRes.commit((Xid)txid, false);
        } else {
            this.xaRes.rollback((Xid)txid);
        }
        AMQ7067Test.produce(this.xaRes, this.xaSession, queue, 50, 524288);
        ((org.apache.activemq.broker.region.Queue)this.broker.getRegionBroker().getDestinationMap().get(queue)).purge();
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return 0L == AMQ7067Test.this.getQueueSize(queue.getQueueName());
            }
        });
        int limit = ((KahaDBPersistenceAdapter)this.broker.getPersistenceAdapter()).getCompactAcksAfterNoGC() + 1;
        for (int dataFilesToMove = 0; dataFilesToMove < 4; ++dataFilesToMove) {
            for (int i = 0; i < limit; ++i) {
                this.broker.getPersistenceAdapter().checkpoint(true);
            }
            TimeUnit.SECONDS.sleep(2L);
        }
        Xid[] xids = this.xaRes.recover(0x1000000);
        Assert.assertEquals((long)0L, (long)xids.length);
        this.connection.close();
        AMQ7067Test.curruptIndexFile(this.getDataDirectory());
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.createBroker();
        this.setupXAConnection();
        xids = this.xaRes.recover(0x1000000);
        System.out.println("****** recovered = " + xids);
        Assert.assertEquals((long)0L, (long)xids.length);
    }

    @Test
    public void testXAcommit() throws Exception {
        this.setupXAConnection();
        Queue holdKahaDb = this.xaSession.createQueue("holdKahaDb");
        AMQ7067Test.createDanglingTransaction(this.xaRes, this.xaSession, holdKahaDb);
        MessageProducer holdKahaDbProducer = this.xaSession.createProducer((Destination)holdKahaDb);
        XATransactionId txid = AMQ7067Test.createXATransaction();
        System.out.println("****** create new txid = " + txid);
        this.xaRes.start((Xid)txid, 0);
        TextMessage helloMessage = this.xaSession.createTextMessage(StringUtils.repeat((String)"a", (int)10));
        holdKahaDbProducer.send((Message)helloMessage);
        this.xaRes.end((Xid)txid, 0x4000000);
        this.xaRes.prepare((Xid)txid);
        final Queue queue = this.xaSession.createQueue("test");
        AMQ7067Test.produce(this.xaRes, this.xaSession, queue, 100, 524288);
        this.xaRes.commit((Xid)txid, false);
        AMQ7067Test.produce(this.xaRes, this.xaSession, queue, 100, 524288);
        ((org.apache.activemq.broker.region.Queue)this.broker.getRegionBroker().getDestinationMap().get(queue)).purge();
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return 0L == AMQ7067Test.this.getQueueSize(queue.getQueueName());
            }
        });
        this.broker.getPersistenceAdapter().checkpoint(true);
        Xid[] xids = this.xaRes.recover(0x1000000);
        Assert.assertEquals((long)1L, (long)xids.length);
        this.connection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.createBroker();
        this.setupXAConnection();
        xids = this.xaRes.recover(0x1000000);
        Assert.assertEquals((long)1L, (long)xids.length);
    }

    @Test
    public void testXArollback() throws Exception {
        this.setupXAConnection();
        Queue holdKahaDb = this.xaSession.createQueue("holdKahaDb");
        AMQ7067Test.createDanglingTransaction(this.xaRes, this.xaSession, holdKahaDb);
        MessageProducer holdKahaDbProducer = this.xaSession.createProducer((Destination)holdKahaDb);
        XATransactionId txid = AMQ7067Test.createXATransaction();
        System.out.println("****** create new txid = " + txid);
        this.xaRes.start((Xid)txid, 0);
        TextMessage helloMessage = this.xaSession.createTextMessage(StringUtils.repeat((String)"a", (int)10));
        holdKahaDbProducer.send((Message)helloMessage);
        this.xaRes.end((Xid)txid, 0x4000000);
        this.xaRes.prepare((Xid)txid);
        Queue queue = this.xaSession.createQueue("test");
        AMQ7067Test.produce(this.xaRes, this.xaSession, queue, 100, 524288);
        this.xaRes.rollback((Xid)txid);
        AMQ7067Test.produce(this.xaRes, this.xaSession, queue, 100, 524288);
        ((org.apache.activemq.broker.region.Queue)this.broker.getRegionBroker().getDestinationMap().get(queue)).purge();
        Xid[] xids = this.xaRes.recover(0x1000000);
        Assert.assertEquals((long)1L, (long)xids.length);
        this.connection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.createBroker();
        this.setupXAConnection();
        xids = this.xaRes.recover(0x1000000);
        Assert.assertEquals((long)1L, (long)xids.length);
    }

    @Test
    public void testCommit() throws Exception {
        Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
        connection.start();
        Session session = connection.createSession(true, 0);
        Queue holdKahaDb = session.createQueue("holdKahaDb");
        MessageProducer holdKahaDbProducer = session.createProducer((Destination)holdKahaDb);
        TextMessage helloMessage = session.createTextMessage(StringUtils.repeat((String)"a", (int)10));
        holdKahaDbProducer.send((Message)helloMessage);
        final Queue queue = session.createQueue("test");
        AMQ7067Test.produce(connection, queue, 100, 524288);
        session.commit();
        AMQ7067Test.produce(connection, queue, 100, 524288);
        System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), this.getQueueSize(holdKahaDb.getQueueName())));
        this.purgeQueue(queue.getQueueName());
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return 0L == AMQ7067Test.this.getQueueSize(queue.getQueueName());
            }
        });
        this.broker.getPersistenceAdapter().checkpoint(true);
        connection.close();
        AMQ7067Test.curruptIndexFile(this.getDataDirectory());
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.createBroker();
        this.broker.waitUntilStarted();
        try {
            TimeUnit.SECONDS.sleep(1L);
            System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), this.getQueueSize(holdKahaDb.getQueueName())));
        }
        catch (Exception ex) {
            System.out.println(ex.getMessage());
        }
        Assert.assertEquals((long)1L, (long)this.getQueueSize(holdKahaDb.getQueueName()));
    }

    @Test
    public void testRollback() throws Exception {
        Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
        connection.start();
        Session session = connection.createSession(true, 0);
        Queue holdKahaDb = session.createQueue("holdKahaDb");
        MessageProducer holdKahaDbProducer = session.createProducer((Destination)holdKahaDb);
        TextMessage helloMessage = session.createTextMessage(StringUtils.repeat((String)"a", (int)10));
        holdKahaDbProducer.send((Message)helloMessage);
        final Queue queue = session.createQueue("test");
        AMQ7067Test.produce(connection, queue, 100, 524288);
        session.rollback();
        AMQ7067Test.produce(connection, queue, 100, 524288);
        System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), this.getQueueSize(holdKahaDb.getQueueName())));
        this.purgeQueue(queue.getQueueName());
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return 0L == AMQ7067Test.this.getQueueSize(queue.getQueueName());
            }
        });
        this.broker.getPersistenceAdapter().checkpoint(true);
        connection.close();
        AMQ7067Test.curruptIndexFile(this.getDataDirectory());
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.createBroker();
        this.broker.waitUntilStarted();
        try {
            this.getQueueSize(holdKahaDb.getQueueName());
            Assert.fail((String)"expect InstanceNotFoundException");
        }
        catch (UndeclaredThrowableException expected) {
            Assert.assertTrue((boolean)(expected.getCause() instanceof InstanceNotFoundException));
        }
    }

    protected static void createDanglingTransaction(XAResource xaRes, XASession xaSession, Queue queue) throws JMSException, IOException, XAException {
        MessageProducer producer = xaSession.createProducer((Destination)queue);
        XATransactionId txId = AMQ7067Test.createXATransaction();
        xaRes.start((Xid)txId, 0);
        TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat((String)"dangler", (int)10));
        producer.send((Message)helloMessage);
        xaRes.end((Xid)txId, 0x4000000);
        xaRes.prepare((Xid)txId);
        System.out.println("****** createDanglingTransaction txId = " + txId);
    }

    protected static void produce(XAResource xaRes, XASession xaSession, Queue queue, int messageCount, int messageSize) throws JMSException, IOException, XAException {
        MessageProducer producer = xaSession.createProducer((Destination)queue);
        for (int i = 0; i < messageCount; ++i) {
            XATransactionId txid = AMQ7067Test.createXATransaction();
            xaRes.start((Xid)txid, 0);
            TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat((String)"a", (int)messageSize));
            producer.send((Message)helloMessage);
            xaRes.end((Xid)txid, 0x4000000);
            xaRes.commit((Xid)txid, true);
        }
    }

    protected static void produce(Connection connection, Queue queue, int messageCount, int messageSize) throws JMSException, IOException, XAException {
        Session session = connection.createSession(true, 0);
        MessageProducer producer = session.createProducer((Destination)queue);
        for (int i = 0; i < messageCount; ++i) {
            TextMessage helloMessage = session.createTextMessage(StringUtils.repeat((String)"a", (int)messageSize));
            producer.send((Message)helloMessage);
            session.commit();
        }
    }

    protected static XATransactionId createXATransaction() throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream os = new DataOutputStream(baos);
        os.writeLong(r.nextInt());
        os.close();
        byte[] bs = baos.toByteArray();
        XATransactionId xid = new XATransactionId();
        xid.setBranchQualifier(bs);
        xid.setGlobalTransactionId(bs);
        xid.setFormatId(55);
        return xid;
    }

    private RecoveredXATransactionViewMBean getProxyToPreparedTransactionViewMBean(TransactionId xid) throws MalformedObjectNameException, JMSException {
        ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,xid=" + JMXSupport.encodeObjectNamePart((String)xid.toString()));
        RecoveredXATransactionViewMBean proxy = (RecoveredXATransactionViewMBean)this.broker.getManagementContext().newProxyInstance(objectName, RecoveredXATransactionViewMBean.class, true);
        return proxy;
    }

    private PersistenceAdapterViewMBean getProxyToPersistenceAdapter(String name) throws MalformedObjectNameException, JMSException {
        return (PersistenceAdapterViewMBean)this.broker.getManagementContext().newProxyInstance(BrokerMBeanSupport.createPersistenceAdapterName((String)this.broker.getBrokerObjectName().toString(), (String)name), PersistenceAdapterViewMBean.class, true);
    }

    private void deleteData(File file) throws Exception {
        String[] entries = file.list();
        if (entries == null) {
            return;
        }
        for (String s : entries) {
            File currentFile = new File(file.getPath(), s);
            if (currentFile.isDirectory()) {
                this.deleteData(currentFile);
            }
            currentFile.delete();
        }
        file.delete();
    }

    private long getQueueSize(String queueName) throws MalformedObjectNameException {
        ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + JMXSupport.encodeObjectNamePart((String)queueName));
        DestinationViewMBean proxy = (DestinationViewMBean)this.broker.getManagementContext().newProxyInstance(objectName, DestinationViewMBean.class, true);
        return proxy.getQueueSize();
    }

    private void purgeQueue(String queueName) throws MalformedObjectNameException, Exception {
        ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + JMXSupport.encodeObjectNamePart((String)queueName));
        QueueViewMBean proxy = (QueueViewMBean)this.broker.getManagementContext().newProxyInstance(objectName, QueueViewMBean.class, true);
        proxy.purge();
    }

    private String getDataDirectory() throws MalformedObjectNameException {
        ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
        BrokerViewMBean proxy = (BrokerViewMBean)this.broker.getManagementContext().newProxyInstance(objectName, BrokerViewMBean.class, true);
        return proxy.getDataDirectory();
    }

    protected static void curruptIndexFile(String dataPath) throws FileNotFoundException, UnsupportedEncodingException {
        PrintWriter writer = new PrintWriter(String.format("%s/kahadb/db.data", dataPath), "UTF-8");
        writer.println("asdasdasd");
        writer.close();
    }
}

