package com.amazon.sqs.javamessaging;

import com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch;
import com.amazon.sqs.javamessaging.SQSSession;
import com.amazon.sqs.javamessaging.acknowledge.AcknowledgeMode;
import com.amazon.sqs.javamessaging.acknowledge.Acknowledger;
import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger;
import com.amazon.sqs.javamessaging.acknowledge.SQSMessageIdentifier;
import com.amazon.sqs.javamessaging.message.SQSMessage;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:BOOT-INF/lib/amazon-sqs-java-messaging-lib-1.0.4.jar:com/amazon/sqs/javamessaging/SQSSessionCallbackScheduler.class */
public class SQSSessionCallbackScheduler implements Runnable {
    private static final Log LOG = LogFactory.getLog(SQSSessionCallbackScheduler.class);
    private AcknowledgeMode acknowledgeMode;
    private SQSSession session;
    private NegativeAcknowledger negativeAcknowledger;
    private final Acknowledger acknowledger;
    private SQSMessageConsumer consumerCloseAfterCallback;
    private volatile boolean closed = false;
    protected ArrayDeque<SQSSession.CallbackEntry> callbackQueue = new ArrayDeque<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public SQSSessionCallbackScheduler(SQSSession sQSSession, AcknowledgeMode acknowledgeMode, Acknowledger acknowledger, NegativeAcknowledger negativeAcknowledger) {
        this.session = sQSSession;
        this.acknowledgeMode = acknowledgeMode;
        this.acknowledger = acknowledger;
        this.negativeAcknowledger = negativeAcknowledger;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        this.closed = true;
        synchronized (this.callbackQueue) {
            this.callbackQueue.notify();
        }
    }

    /* JADX WARN: Finally extract failed */
    @Override // java.lang.Runnable
    public void run() {
        SQSSession.CallbackEntry callbackEntry = null;
        while (!this.closed) {
            try {
                try {
                    synchronized (this.callbackQueue) {
                        callbackEntry = this.callbackQueue.pollFirst();
                        if (callbackEntry == null) {
                            try {
                                this.callbackQueue.wait();
                            } catch (InterruptedException e) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("wait on empty callback queue interrupted: " + e.getMessage());
                                }
                            }
                        } else {
                            MessageListener messageListener = callbackEntry.getMessageListener();
                            SQSMessageConsumerPrefetch.MessageManager messageManager = callbackEntry.getMessageManager();
                            SQSMessage sQSMessage = (SQSMessage) messageManager.getMessage();
                            SQSMessageConsumer messageConsumer = messageManager.getPrefetchManager().getMessageConsumer();
                            if (messageConsumer.isClosed()) {
                                nackReceivedMessage(sQSMessage);
                            } else {
                                try {
                                    this.session.startingCallback(messageConsumer);
                                    try {
                                        messageManager.getPrefetchManager().messageDispatched();
                                        int originalAcknowledgeMode = this.acknowledgeMode.getOriginalAcknowledgeMode();
                                        boolean z = true;
                                        if (messageListener != null) {
                                            if (originalAcknowledgeMode != 1) {
                                                try {
                                                    try {
                                                        this.acknowledger.notifyMessageReceived(sQSMessage);
                                                    } catch (JMSException e2) {
                                                        LOG.warn("Unable to complete message dispatch for the message " + sQSMessage.getSQSMessageId(), e2);
                                                        if (z) {
                                                            nackReceivedMessage(sQSMessage);
                                                        }
                                                    }
                                                } catch (Throwable th) {
                                                    if (1 != 0) {
                                                        nackReceivedMessage(sQSMessage);
                                                    }
                                                    throw th;
                                                }
                                            }
                                            try {
                                                try {
                                                    messageListener.onMessage(sQSMessage);
                                                    if (0 == 0) {
                                                        if (originalAcknowledgeMode == 1) {
                                                            sQSMessage.acknowledge();
                                                        }
                                                        z = false;
                                                    }
                                                } catch (Throwable th2) {
                                                    LOG.info("Exception thrown from onMessage callback for message " + sQSMessage.getSQSMessageId(), th2);
                                                    if (1 == 0) {
                                                        if (originalAcknowledgeMode == 1) {
                                                            sQSMessage.acknowledge();
                                                        }
                                                        z = false;
                                                    }
                                                }
                                            } catch (Throwable th3) {
                                                if (0 == 0) {
                                                    if (originalAcknowledgeMode == 1) {
                                                        sQSMessage.acknowledge();
                                                    }
                                                    z = false;
                                                }
                                                throw th3;
                                                break;
                                            }
                                        }
                                        if (z) {
                                            nackReceivedMessage(sQSMessage);
                                        }
                                        if (this.consumerCloseAfterCallback != null) {
                                            this.consumerCloseAfterCallback.doClose();
                                            this.consumerCloseAfterCallback = null;
                                        }
                                        this.session.finishedCallback();
                                        messageManager.getPrefetchManager().messageListenerReady();
                                    } catch (Throwable th4) {
                                        this.session.finishedCallback();
                                        messageManager.getPrefetchManager().messageListenerReady();
                                        throw th4;
                                    }
                                } catch (JMSException e3) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug("Not running callback: " + e3.getMessage());
                                    }
                                }
                            }
                        }
                    }
                } catch (Throwable th5) {
                    LOG.error("Unexpected exception thrown during the run of the scheduled callback", th5);
                }
            } finally {
                if (callbackEntry != null) {
                    nackReceivedMessage((SQSMessage) callbackEntry.getMessageManager().getMessage());
                }
                nackQueuedMessages();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setConsumerCloseAfterCallback(SQSMessageConsumer sQSMessageConsumer) {
        this.consumerCloseAfterCallback = sQSMessageConsumer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Finally extract failed */
    public void scheduleCallBacks(MessageListener messageListener, List<SQSMessageConsumerPrefetch.MessageManager> list) {
        synchronized (this.callbackQueue) {
            try {
                Iterator<SQSMessageConsumerPrefetch.MessageManager> it = list.iterator();
                while (it.hasNext()) {
                    this.callbackQueue.addLast(new SQSSession.CallbackEntry(messageListener, it.next()));
                }
                this.callbackQueue.notify();
            } catch (Throwable th) {
                this.callbackQueue.notify();
                throw th;
            }
        }
    }

    void nackQueuedMessages() {
        synchronized (this.callbackQueue) {
            try {
                ArrayList arrayList = new ArrayList();
                while (!this.callbackQueue.isEmpty()) {
                    arrayList.add(SQSMessageIdentifier.fromSQSMessage((SQSMessage) this.callbackQueue.pollFirst().getMessageManager().getMessage()));
                }
                if (!arrayList.isEmpty()) {
                    this.negativeAcknowledger.bulkAction(arrayList, arrayList.size());
                }
            } catch (JMSException e) {
                LOG.warn("Caught exception while nacking the remaining messages on session callback queue", e);
            }
        }
    }

    private void nackReceivedMessage(SQSMessage sQSMessage) {
        try {
            SQSMessageIdentifier fromSQSMessage = SQSMessageIdentifier.fromSQSMessage(sQSMessage);
            ArrayList arrayList = new ArrayList();
            arrayList.add(fromSQSMessage);
            if (fromSQSMessage.getGroupId() != null) {
                arrayList.addAll(purgeScheduledCallbacksForQueuesAndGroups(Collections.singletonMap(fromSQSMessage.getQueueUrl(), Collections.singleton(fromSQSMessage.getGroupId()))));
            }
            this.negativeAcknowledger.bulkAction(arrayList, arrayList.size());
        } catch (JMSException e) {
            LOG.warn("Unable to nack the message " + sQSMessage.getSQSMessageId(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<SQSMessageIdentifier> purgeScheduledCallbacksForQueuesAndGroups(Map<String, Set<String>> map) throws JMSException {
        ArrayList arrayList = new ArrayList();
        synchronized (this.callbackQueue) {
            Iterator<SQSSession.CallbackEntry> it = this.callbackQueue.iterator();
            while (it.hasNext()) {
                SQSSession.CallbackEntry next = it.next();
                SQSMessageIdentifier fromSQSMessage = SQSMessageIdentifier.fromSQSMessage((SQSMessage) next.getMessageManager().getMessage());
                Set<String> set = map.get(fromSQSMessage.getQueueUrl());
                if (set != null && set.contains(fromSQSMessage.getGroupId())) {
                    arrayList.add(fromSQSMessage);
                    it.remove();
                    next.getMessageManager().getPrefetchManager().messageDispatched();
                }
            }
        }
        return arrayList;
    }
}
