/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.amq;

import java.io.IOException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.amq.AMQMessageStore;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.amq.RecoveryListenerAdapter;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class AMQTopicMessageStore
extends AMQMessageStore
implements TopicMessageStore {
    private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class);
    private TopicReferenceStore topicReferenceStore;

    public AMQTopicMessageStore(AMQPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
        super(adapter, topicReferenceStore, destinationName);
        this.topicReferenceStore = topicReferenceStore;
    }

    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
        this.flush();
        this.topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
    }

    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
        RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
        this.topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
        if (recoveryListener.size() == 0) {
            this.flush();
            this.topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
        }
    }

    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
        return this.topicReferenceStore.lookupSubscription(clientId, subscriptionName);
    }

    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
        this.peristenceAdapter.writeCommand(subscriptionInfo, false);
        this.topicReferenceStore.addSubsciption(subscriptionInfo, retroactive);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, final MessageId messageId) throws IOException {
        final boolean debug = LOG.isDebugEnabled();
        JournalTopicAck ack = new JournalTopicAck();
        ack.setDestination(this.destination);
        ack.setMessageId(messageId);
        ack.setMessageSequenceId(messageId.getBrokerSequenceId());
        ack.setSubscritionName(subscriptionName);
        ack.setClientId(clientId);
        ack.setTransactionId(context.getTransaction() != null ? context.getTransaction().getTransactionId() : null);
        final Location location = this.peristenceAdapter.writeCommand(ack, false);
        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
        if (!context.isInTransaction()) {
            if (debug) {
                LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
            }
            this.acknowledge(context, messageId, location, clientId, subscriptionName);
        } else {
            if (debug) {
                LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
            }
            this.lock.lock();
            try {
                this.inFlightTxLocations.add(location);
            }
            finally {
                this.lock.unlock();
            }
            this.transactionStore.acknowledge(this, ack, location);
            context.getTransaction().addSynchronization(new Synchronization(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void afterCommit() throws Exception {
                    if (debug) {
                        LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
                    }
                    AMQTopicMessageStore.this.lock.lock();
                    try {
                        AMQTopicMessageStore.this.inFlightTxLocations.remove(location);
                        AMQTopicMessageStore.this.acknowledge(context, messageId, location, clientId, subscriptionName);
                    }
                    finally {
                        AMQTopicMessageStore.this.lock.unlock();
                    }
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void afterRollback() throws Exception {
                    if (debug) {
                        LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
                    }
                    AMQTopicMessageStore.this.lock.lock();
                    try {
                        AMQTopicMessageStore.this.inFlightTxLocations.remove(location);
                    }
                    finally {
                        AMQTopicMessageStore.this.lock.unlock();
                    }
                }
            });
        }
    }

    public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
        try {
            SubscriptionInfo sub = this.topicReferenceStore.lookupSubscription(clientId, subscritionName);
            if (sub != null) {
                this.topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
                return true;
            }
        }
        catch (Throwable e) {
            LOG.debug("Could not replay acknowledge for message '" + messageId + "'.  Message may have already been acknowledged. reason: " + e);
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void acknowledge(ConnectionContext context, MessageId messageId, Location location, String clientId, String subscriptionName) throws IOException {
        MessageAck ack = null;
        this.lock.lock();
        try {
            this.lastLocation = location;
        }
        finally {
            this.lock.unlock();
        }
        if (this.topicReferenceStore.acknowledgeReference(context, clientId, subscriptionName, messageId)) {
            ack = new MessageAck();
            ack.setLastMessageId(messageId);
        }
        if (ack != null) {
            this.removeMessage(context, ack);
        }
    }

    public TopicReferenceStore getTopicReferenceStore() {
        return this.topicReferenceStore;
    }

    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
        this.topicReferenceStore.deleteSubscription(clientId, subscriptionName);
    }

    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        return this.topicReferenceStore.getAllSubscriptions();
    }

    public int getMessageCount(String clientId, String subscriberName) throws IOException {
        this.flush();
        return this.topicReferenceStore.getMessageCount(clientId, subscriberName);
    }

    public void resetBatching(String clientId, String subscriptionName) {
        this.topicReferenceStore.resetBatching(clientId, subscriptionName);
    }
}

