package org.apache.activemq.store.journal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:META-INF/lib/activemq-core-5.4.2.jar:org/apache/activemq/store/journal/JournalMessageStore.class */
public class JournalMessageStore extends AbstractMessageStore {
    private static final Log LOG = LogFactory.getLog(JournalMessageStore.class);
    protected final JournalPersistenceAdapter peristenceAdapter;
    protected final JournalTransactionStore transactionStore;
    protected final MessageStore longTermStore;
    protected final TransactionTemplate transactionTemplate;
    protected RecordLocation lastLocation;
    protected Set<RecordLocation> inFlightTxLocations;
    private Map<MessageId, Message> messages;
    private List<MessageAck> messageAcks;
    private Map<MessageId, Message> cpAddedMessageIds;
    private MemoryUsage memoryUsage;

    public JournalMessageStore(JournalPersistenceAdapter journalPersistenceAdapter, MessageStore messageStore, ActiveMQDestination activeMQDestination) {
        super(activeMQDestination);
        this.inFlightTxLocations = new HashSet();
        this.messages = new LinkedHashMap();
        this.messageAcks = new ArrayList();
        this.peristenceAdapter = journalPersistenceAdapter;
        this.transactionStore = journalPersistenceAdapter.getTransactionStore();
        this.longTermStore = messageStore;
        this.transactionTemplate = new TransactionTemplate(journalPersistenceAdapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
    }

    @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
    public void setMemoryUsage(MemoryUsage memoryUsage) {
        this.memoryUsage = memoryUsage;
        this.longTermStore.setMemoryUsage(memoryUsage);
    }

    @Override // org.apache.activemq.store.MessageStore
    public void addMessage(ConnectionContext connectionContext, final Message message) throws IOException {
        final MessageId messageId = message.getMessageId();
        final boolean isDebugEnabled = LOG.isDebugEnabled();
        message.incrementReferenceCount();
        final RecordLocation writeCommand = this.peristenceAdapter.writeCommand(message, message.isResponseRequired());
        if (!connectionContext.isInTransaction()) {
            if (isDebugEnabled) {
                LOG.debug("Journalled message add for: " + messageId + ", at: " + writeCommand);
            }
            addMessage(message, writeCommand);
        } else {
            if (isDebugEnabled) {
                LOG.debug("Journalled transacted message add for: " + messageId + ", at: " + writeCommand);
            }
            synchronized (this) {
                this.inFlightTxLocations.add(writeCommand);
            }
            this.transactionStore.addMessage(this, message, writeCommand);
            connectionContext.getTransaction().addSynchronization(new Synchronization() { // from class: org.apache.activemq.store.journal.JournalMessageStore.1
                @Override // org.apache.activemq.transaction.Synchronization
                public void afterCommit() throws Exception {
                    if (isDebugEnabled) {
                        JournalMessageStore.LOG.debug("Transacted message add commit for: " + messageId + ", at: " + writeCommand);
                    }
                    synchronized (JournalMessageStore.this) {
                        JournalMessageStore.this.inFlightTxLocations.remove(writeCommand);
                        JournalMessageStore.this.addMessage(message, writeCommand);
                    }
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterRollback() throws Exception {
                    if (isDebugEnabled) {
                        JournalMessageStore.LOG.debug("Transacted message add rollback for: " + messageId + ", at: " + writeCommand);
                    }
                    synchronized (JournalMessageStore.this) {
                        JournalMessageStore.this.inFlightTxLocations.remove(writeCommand);
                    }
                    message.decrementReferenceCount();
                }
            });
        }
    }

    void addMessage(Message message, RecordLocation recordLocation) {
        synchronized (this) {
            this.lastLocation = recordLocation;
            this.messages.put(message.getMessageId(), message);
        }
    }

    public void replayAddMessage(ConnectionContext connectionContext, Message message) {
        try {
            if (this.longTermStore.getMessage(message.getMessageId()) == null) {
                this.longTermStore.addMessage(connectionContext, message);
            }
        } catch (Throwable th) {
            LOG.warn("Could not replay add for message '" + message.getMessageId() + "'.  Message may have already been added. reason: " + th);
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public void removeMessage(ConnectionContext connectionContext, final MessageAck messageAck) throws IOException {
        final boolean isDebugEnabled = LOG.isDebugEnabled();
        JournalQueueAck journalQueueAck = new JournalQueueAck();
        journalQueueAck.setDestination(this.destination);
        journalQueueAck.setMessageAck(messageAck);
        final RecordLocation writeCommand = this.peristenceAdapter.writeCommand(journalQueueAck, messageAck.isResponseRequired());
        if (!connectionContext.isInTransaction()) {
            if (isDebugEnabled) {
                LOG.debug("Journalled message remove for: " + messageAck.getLastMessageId() + ", at: " + writeCommand);
            }
            removeMessage(messageAck, writeCommand);
        } else {
            if (isDebugEnabled) {
                LOG.debug("Journalled transacted message remove for: " + messageAck.getLastMessageId() + ", at: " + writeCommand);
            }
            synchronized (this) {
                this.inFlightTxLocations.add(writeCommand);
            }
            this.transactionStore.removeMessage(this, messageAck, writeCommand);
            connectionContext.getTransaction().addSynchronization(new Synchronization() { // from class: org.apache.activemq.store.journal.JournalMessageStore.2
                @Override // org.apache.activemq.transaction.Synchronization
                public void afterCommit() throws Exception {
                    if (isDebugEnabled) {
                        JournalMessageStore.LOG.debug("Transacted message remove commit for: " + messageAck.getLastMessageId() + ", at: " + writeCommand);
                    }
                    synchronized (JournalMessageStore.this) {
                        JournalMessageStore.this.inFlightTxLocations.remove(writeCommand);
                        JournalMessageStore.this.removeMessage(messageAck, writeCommand);
                    }
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterRollback() throws Exception {
                    if (isDebugEnabled) {
                        JournalMessageStore.LOG.debug("Transacted message remove rollback for: " + messageAck.getLastMessageId() + ", at: " + writeCommand);
                    }
                    synchronized (JournalMessageStore.this) {
                        JournalMessageStore.this.inFlightTxLocations.remove(writeCommand);
                    }
                }
            });
        }
    }

    final void removeMessage(MessageAck messageAck, RecordLocation recordLocation) {
        synchronized (this) {
            this.lastLocation = recordLocation;
            Message remove = this.messages.remove(messageAck.getLastMessageId());
            if (remove == null) {
                this.messageAcks.add(messageAck);
            } else {
                remove.decrementReferenceCount();
            }
        }
    }

    public void replayRemoveMessage(ConnectionContext connectionContext, MessageAck messageAck) {
        try {
            if (this.longTermStore.getMessage(messageAck.getLastMessageId()) != null) {
                this.longTermStore.removeMessage(connectionContext, messageAck);
            }
        } catch (Throwable th) {
            LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + th);
        }
    }

    public RecordLocation checkpoint() throws IOException {
        return checkpoint(null);
    }

    public RecordLocation checkpoint(final Callback callback) throws IOException {
        final List<MessageAck> list;
        ArrayList arrayList;
        RecordLocation recordLocation;
        final int maxCheckpointMessageAddSize = this.peristenceAdapter.getMaxCheckpointMessageAddSize();
        synchronized (this) {
            this.cpAddedMessageIds = this.messages;
            list = this.messageAcks;
            arrayList = new ArrayList(this.inFlightTxLocations);
            this.messages = new LinkedHashMap();
            this.messageAcks = new ArrayList();
        }
        this.transactionTemplate.run(new Callback() { // from class: org.apache.activemq.store.journal.JournalMessageStore.3
            @Override // org.apache.activemq.util.Callback
            public void execute() throws Exception {
                int i = 0;
                PersistenceAdapter persistenceAdapter = JournalMessageStore.this.transactionTemplate.getPersistenceAdapter();
                ConnectionContext context = JournalMessageStore.this.transactionTemplate.getContext();
                synchronized (JournalMessageStore.this) {
                    for (Message message : JournalMessageStore.this.cpAddedMessageIds.values()) {
                        try {
                            JournalMessageStore.this.longTermStore.addMessage(context, message);
                        } catch (Throwable th) {
                            JournalMessageStore.LOG.warn("Message could not be added to long term store: " + th.getMessage(), th);
                        }
                        i += message.getSize();
                        message.decrementReferenceCount();
                        if (i >= maxCheckpointMessageAddSize) {
                            persistenceAdapter.commitTransaction(context);
                            persistenceAdapter.beginTransaction(context);
                            i = 0;
                        }
                    }
                }
                persistenceAdapter.commitTransaction(context);
                persistenceAdapter.beginTransaction(context);
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    try {
                        JournalMessageStore.this.longTermStore.removeMessage(JournalMessageStore.this.transactionTemplate.getContext(), (MessageAck) it.next());
                    } catch (Throwable th2) {
                        JournalMessageStore.LOG.debug("Message could not be removed from long term store: " + th2.getMessage(), th2);
                    }
                }
                if (callback != null) {
                    callback.execute();
                }
            }
        });
        synchronized (this) {
            this.cpAddedMessageIds = null;
        }
        if (arrayList.size() > 0) {
            Collections.sort(arrayList);
            return (RecordLocation) arrayList.get(0);
        }
        synchronized (this) {
            recordLocation = this.lastLocation;
        }
        return recordLocation;
    }

    @Override // org.apache.activemq.store.MessageStore
    public Message getMessage(MessageId messageId) throws IOException {
        Message message;
        synchronized (this) {
            message = this.messages.get(messageId);
            if (message == null && this.cpAddedMessageIds != null) {
                message = this.cpAddedMessageIds.get(messageId);
            }
        }
        return message != null ? message : this.longTermStore.getMessage(messageId);
    }

    @Override // org.apache.activemq.store.MessageStore
    public void recover(MessageRecoveryListener messageRecoveryListener) throws Exception {
        this.peristenceAdapter.checkpoint(true, true);
        this.longTermStore.recover(messageRecoveryListener);
    }

    @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.Service
    public void start() throws Exception {
        if (this.memoryUsage != null) {
            this.memoryUsage.addUsageListener(this.peristenceAdapter);
        }
        this.longTermStore.start();
    }

    @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.Service
    public void stop() throws Exception {
        this.longTermStore.stop();
        if (this.memoryUsage != null) {
            this.memoryUsage.removeUsageListener(this.peristenceAdapter);
        }
    }

    public MessageStore getLongTermMessageStore() {
        return this.longTermStore;
    }

    @Override // org.apache.activemq.store.MessageStore
    public void removeAllMessages(ConnectionContext connectionContext) throws IOException {
        this.peristenceAdapter.checkpoint(true, true);
        this.longTermStore.removeAllMessages(connectionContext);
    }

    public void addMessageReference(ConnectionContext connectionContext, MessageId messageId, long j, String str) throws IOException {
        throw new IOException("The journal does not support message references.");
    }

    public String getMessageReference(MessageId messageId) throws IOException {
        throw new IOException("The journal does not support message references.");
    }

    @Override // org.apache.activemq.store.MessageStore
    public int getMessageCount() throws IOException {
        this.peristenceAdapter.checkpoint(true, true);
        return this.longTermStore.getMessageCount();
    }

    @Override // org.apache.activemq.store.MessageStore
    public void recoverNextMessages(int i, MessageRecoveryListener messageRecoveryListener) throws Exception {
        this.peristenceAdapter.checkpoint(true, true);
        this.longTermStore.recoverNextMessages(i, messageRecoveryListener);
    }

    @Override // org.apache.activemq.store.MessageStore
    public void resetBatching() {
        this.longTermStore.resetBatching();
    }

    @Override // org.apache.activemq.store.AbstractMessageStore, org.apache.activemq.store.MessageStore
    public void setBatch(MessageId messageId) throws Exception {
        this.peristenceAdapter.checkpoint(true, true);
        this.longTermStore.setBatch(messageId);
    }
}
