/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.client.impl;

import java.io.File;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.CompressedLargeMessageControllerImpl;
import org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl;
import org.apache.activemq.artemis.shaded.org.jboss.logging.Logger;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.TokenBucketLimiter;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;

public final class ClientConsumerImpl
implements ClientConsumerInternal {
    private static final Logger logger = Logger.getLogger(ClientConsumerImpl.class);
    private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000L;
    private static final int NUM_PRIORITIES = 10;
    public static final SimpleString FORCED_DELIVERY_MESSAGE = new SimpleString("_hornetq.forced.delivery.seq");
    private final ClientSessionInternal session;
    private final SessionContext sessionContext;
    private final ConsumerContext consumerContext;
    private final SimpleString filterString;
    private final int priority;
    private final SimpleString queueName;
    private final boolean browseOnly;
    private final Executor sessionExecutor;
    private final Executor flowControlExecutor;
    private final ReusableLatch pendingFlowControl = new ReusableLatch(0);
    private final int initialWindow;
    private final int clientWindowSize;
    private final int ackBatchSize;
    private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<ClientMessageInternal>(10);
    private final Runner runner = new Runner();
    private LargeMessageControllerImpl currentLargeMessageController;
    private ClientMessageInternal largeMessageReceived;
    private final TokenBucketLimiter rateLimiter;
    private volatile Thread receiverThread;
    private volatile Thread onMessageThread;
    private volatile MessageHandler handler;
    private volatile boolean closing;
    private volatile boolean closed;
    private int creditsToSend;
    private volatile boolean failedOver;
    private volatile Exception lastException;
    private int ackBytes;
    private volatile ClientMessageInternal lastAckedMessage;
    private boolean stopped = false;
    private AtomicLong forceDeliveryCount = new AtomicLong(0L);
    private final ClientSession.QueueQuery queueInfo;
    private volatile boolean ackIndividually;
    private final ClassLoader contextClassLoader;

    public ClientConsumerImpl(ClientSessionInternal session, ConsumerContext consumerContext, SimpleString queueName, SimpleString filterString, int priority, boolean browseOnly, int initialWindow, int clientWindowSize, int ackBatchSize, TokenBucketLimiter rateLimiter, Executor executor, Executor flowControlExecutor, SessionContext sessionContext, ClientSession.QueueQuery queueInfo, ClassLoader contextClassLoader) {
        this.consumerContext = consumerContext;
        this.queueName = queueName;
        this.filterString = filterString;
        this.priority = priority;
        this.browseOnly = browseOnly;
        this.sessionContext = sessionContext;
        this.session = session;
        this.rateLimiter = rateLimiter;
        this.sessionExecutor = executor;
        this.initialWindow = initialWindow;
        this.clientWindowSize = clientWindowSize;
        this.ackBatchSize = ackBatchSize;
        this.queueInfo = queueInfo;
        this.contextClassLoader = contextClassLoader;
        this.flowControlExecutor = flowControlExecutor;
        if (logger.isTraceEnabled()) {
            logger.trace((Object)(this + ":: being created at"), new Exception("trace"));
        }
    }

    @Override
    public ConsumerContext getConsumerContext() {
        return this.consumerContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ClientMessage receive(long timeout, boolean forcingDelivery) throws ActiveMQException {
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ")");
        }
        this.checkClosed();
        if (this.largeMessageReceived != null) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> discard LargeMessage body for " + this.largeMessageReceived);
            }
            this.largeMessageReceived.discardBody();
            this.largeMessageReceived = null;
        }
        if (this.rateLimiter != null) {
            this.rateLimiter.limit();
        }
        if (this.handler != null) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> throwing messageHandlerSet");
            }
            throw ActiveMQClientMessageBundle.BUNDLE.messageHandlerSet();
        }
        if (this.clientWindowSize == 0) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> start slowConsumer");
            }
            this.startSlowConsumer();
        }
        this.receiverThread = Thread.currentThread();
        boolean deliveryForced = false;
        boolean callForceDelivery = false;
        long start = -1L;
        long toWait = timeout == 0L ? Long.MAX_VALUE : timeout;
        try {
            ClientConsumerImpl clientConsumerImpl;
            block37: {
                ClientMessageInternal m;
                block38: {
                    while (true) {
                        m = null;
                        clientConsumerImpl = this;
                        synchronized (clientConsumerImpl) {
                            while ((this.stopped || (m = this.buffer.poll()) == null) && !this.closed && toWait > 0L) {
                                if (start == -1L) {
                                    start = System.currentTimeMillis();
                                }
                                if (m == null && forcingDelivery) {
                                    if (this.stopped) break;
                                    if (!deliveryForced) {
                                        callForceDelivery = true;
                                        break;
                                    }
                                }
                                try {
                                    this.wait(toWait);
                                }
                                catch (InterruptedException e) {
                                    throw new ActiveMQInterruptedException(e);
                                }
                                if (m != null || this.closed) break;
                                long now = System.currentTimeMillis();
                                toWait -= now - start;
                                start = now;
                            }
                        }
                        if (this.failedOver) {
                            if (m == null) {
                                if (logger.isTraceEnabled()) {
                                    logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> m == null and failover");
                                }
                                this.failedOver = false;
                                deliveryForced = false;
                                toWait = timeout == 0L ? Long.MAX_VALUE : timeout;
                                continue;
                            }
                            if (logger.isTraceEnabled()) {
                                logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> failedOver, but m != null, being " + m);
                            }
                            this.failedOver = false;
                        }
                        if (callForceDelivery) {
                            if (logger.isTraceEnabled()) {
                                logger.trace(this + "::Forcing delivery");
                            }
                            this.sessionContext.forceDelivery(this, this.forceDeliveryCount.getAndIncrement());
                            callForceDelivery = false;
                            deliveryForced = true;
                            continue;
                        }
                        if (m == null) break block37;
                        this.session.workDone();
                        if (m.containsProperty(FORCED_DELIVERY_MESSAGE)) {
                            long seq = m.getLongProperty(FORCED_DELIVERY_MESSAGE);
                            if (forcingDelivery && deliveryForced && seq == this.forceDeliveryCount.get() - 1L) {
                                this.resetIfSlowConsumer();
                                if (logger.isTraceEnabled()) {
                                    logger.trace(this + "::There was nothing on the queue, leaving it now:: returning null");
                                }
                                ClientMessage clientMessage = null;
                                return clientMessage;
                            }
                            if (!logger.isTraceEnabled()) continue;
                            logger.trace(this + "::Ignored force delivery answer as it belonged to another call");
                            continue;
                        }
                        boolean expired = m.isExpired();
                        this.flowControlBeforeConsumption(m);
                        if (!expired) break block38;
                        m.discardBody();
                        this.session.expire(this, m);
                        if (this.clientWindowSize == 0) {
                            this.startSlowConsumer();
                        }
                        if (toWait <= 0L) break;
                    }
                    ClientMessage clientMessage = null;
                    return clientMessage;
                }
                if (m.isLargeMessage()) {
                    this.largeMessageReceived = m;
                }
                if (logger.isTraceEnabled()) {
                    logger.trace(this + "::Returning " + m);
                }
                ClientMessageInternal clientMessageInternal = m;
                return clientMessageInternal;
            }
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::Returning null");
            }
            this.resetIfSlowConsumer();
            clientConsumerImpl = null;
            return clientConsumerImpl;
        }
        finally {
            this.receiverThread = null;
        }
    }

    @Override
    public ClientMessage receive(long timeout) throws ActiveMQException {
        ClientMessage msg;
        if (logger.isTraceEnabled()) {
            logger.trace(this + ":: receive(" + timeout + ")");
        }
        if ((msg = this.receive(timeout, false)) == null && !this.closed) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + ":: receive(" + timeout + ") -> null, trying again with receive(0)");
            }
            msg = this.receive(0L, true);
        }
        if (logger.isTraceEnabled()) {
            logger.trace(this + ":: returning " + msg);
        }
        return msg;
    }

    @Override
    public ClientMessage receive() throws ActiveMQException {
        return this.receive(0L, false);
    }

    @Override
    public ClientMessage receiveImmediate() throws ActiveMQException {
        return this.receive(0L, true);
    }

    @Override
    public MessageHandler getMessageHandler() throws ActiveMQException {
        this.checkClosed();
        return this.handler;
    }

    @Override
    public Thread getCurrentThread() {
        if (this.onMessageThread != null) {
            return this.onMessageThread;
        }
        return this.receiverThread;
    }

    @Override
    public synchronized ClientConsumerImpl setMessageHandler(MessageHandler theHandler) throws ActiveMQException {
        boolean noPreviousHandler;
        this.checkClosed();
        if (this.receiverThread != null) {
            throw ActiveMQClientMessageBundle.BUNDLE.inReceive();
        }
        boolean bl = noPreviousHandler = this.handler == null;
        if (this.handler != theHandler && this.clientWindowSize == 0) {
            this.startSlowConsumer();
        }
        this.handler = theHandler;
        if (this.handler != null && noPreviousHandler) {
            this.requeueExecutors();
        } else if (this.handler == null && !noPreviousHandler) {
            this.waitForOnMessageToComplete(true);
        }
        return this;
    }

    @Override
    public void close() throws ActiveMQException {
        this.doCleanUp(true);
    }

    @Override
    public Thread prepareForClose(final FutureLatch future) throws ActiveMQException {
        this.closing = true;
        this.resetLargeMessageController();
        this.sessionExecutor.execute(new Runnable(){

            @Override
            public void run() {
                future.run();
            }
        });
        return this.onMessageThread;
    }

    @Override
    public void cleanUp() {
        try {
            this.doCleanUp(false);
        }
        catch (ActiveMQException e) {
            ActiveMQClientLogger.LOGGER.failedCleaningUp(this.toString());
        }
    }

    @Override
    public boolean isClosed() {
        return this.closed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop(boolean waitForOnMessage) throws ActiveMQException {
        if (this.browseOnly) {
            return;
        }
        ClientConsumerImpl clientConsumerImpl = this;
        synchronized (clientConsumerImpl) {
            if (this.stopped) {
                return;
            }
            this.stopped = true;
        }
        this.waitForOnMessageToComplete(waitForOnMessage);
    }

    @Override
    public void clearAtFailover() {
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::ClearAtFailover");
        }
        this.clearBuffer();
        this.stopped = true;
        this.resetLargeMessageController();
        this.lastAckedMessage = null;
        this.creditsToSend = 0;
        this.failedOver = true;
        this.ackIndividually = false;
    }

    @Override
    public synchronized void start() {
        this.stopped = false;
        this.requeueExecutors();
    }

    @Override
    public Exception getLastException() {
        return this.lastException;
    }

    @Override
    public ClientSession.QueueQuery getQueueInfo() {
        return this.queueInfo;
    }

    @Override
    public long getForceDeliveryCount() {
        return this.forceDeliveryCount.get();
    }

    @Override
    public SimpleString getFilterString() {
        return this.filterString;
    }

    @Override
    public int getPriority() {
        return this.priority;
    }

    @Override
    public SimpleString getQueueName() {
        return this.queueName;
    }

    @Override
    public boolean isBrowseOnly() {
        return this.browseOnly;
    }

    @Override
    public synchronized void handleMessage(ClientMessageInternal message) throws Exception {
        if (this.closing) {
            return;
        }
        if (message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED).booleanValue()) {
            this.handleCompressedMessage(message);
        } else {
            this.handleRegularMessage(message);
        }
    }

    private void handleRegularMessage(ClientMessageInternal message) {
        if (message.getAddress() == null) {
            message.setAddress(this.queueInfo.getAddress());
        }
        message.onReceipt(this);
        if (!this.ackIndividually && message.getPriority() != 4 && !message.containsProperty(FORCED_DELIVERY_MESSAGE)) {
            this.ackIndividually = true;
        }
        this.buffer.addTail(message, message.getPriority());
        if (this.handler != null) {
            if (!this.stopped) {
                this.queueExecutor();
            }
        } else {
            this.notify();
        }
    }

    private void handleCompressedMessage(ClientMessageInternal clMessage) throws Exception {
        ClientLargeMessageImpl largeMessage = new ClientLargeMessageImpl();
        largeMessage.retrieveExistingData(clMessage);
        File largeMessageCache = null;
        if (this.session.isCacheLargeMessageClient()) {
            largeMessageCache = File.createTempFile("tmp-large-message-" + largeMessage.getMessageID() + "-", ".tmp");
            largeMessageCache.deleteOnExit();
        }
        ClientSessionFactory sf = this.session.getSessionFactory();
        ServerLocator locator = sf.getServerLocator();
        long callTimeout = locator.getCallTimeout();
        this.currentLargeMessageController = new LargeMessageControllerImpl(this, largeMessage.getLargeMessageSize(), callTimeout, largeMessageCache);
        this.currentLargeMessageController.setLocal(true);
        ActiveMQBuffer qbuff = clMessage.toCore().getBodyBuffer();
        int bytesToRead = qbuff.writerIndex() - qbuff.readerIndex();
        byte[] body = ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer());
        largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(this.currentLargeMessageController));
        this.currentLargeMessageController.addPacket(body, body.length, false);
        this.handleRegularMessage(largeMessage);
    }

    @Override
    public synchronized void handleLargeMessage(ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception {
        if (this.closing) {
            return;
        }
        File largeMessageCache = null;
        if (this.session.isCacheLargeMessageClient()) {
            largeMessageCache = File.createTempFile("tmp-large-message-" + clientLargeMessage.getMessageID() + "-", ".tmp");
            largeMessageCache.deleteOnExit();
        }
        ClientSessionFactory sf = this.session.getSessionFactory();
        ServerLocator locator = sf.getServerLocator();
        long callTimeout = locator.getCallTimeout();
        this.currentLargeMessageController = new LargeMessageControllerImpl(this, largeMessageSize, callTimeout, largeMessageCache);
        if (clientLargeMessage.isCompressed()) {
            clientLargeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(this.currentLargeMessageController));
        } else {
            clientLargeMessage.setLargeMessageController(this.currentLargeMessageController);
        }
        this.handleRegularMessage(clientLargeMessage);
    }

    @Override
    public synchronized void handleLargeMessageContinuation(byte[] chunk, int flowControlSize, boolean isContinues) throws Exception {
        if (this.closing) {
            return;
        }
        if (this.currentLargeMessageController == null) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::Sending back credits for largeController = null " + flowControlSize);
            }
            this.flowControl(flowControlSize, false);
        } else {
            this.currentLargeMessageController.addPacket(chunk, flowControlSize, isContinues);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void clear(boolean waitForOnMessage) throws ActiveMQException {
        ClientConsumerImpl clientConsumerImpl = this;
        synchronized (clientConsumerImpl) {
            LinkedListIterator<ClientMessageInternal> iter = this.buffer.iterator();
            while (iter.hasNext()) {
                try {
                    ClientMessageInternal message = (ClientMessageInternal)iter.next();
                    if (message.isLargeMessage()) {
                        ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal)message;
                        largeMessage.getLargeMessageController().cancel();
                    }
                    this.flowControlBeforeConsumption(message);
                }
                catch (Exception e) {
                    ActiveMQClientLogger.LOGGER.errorClearingMessages(e);
                }
            }
            this.clearBuffer();
            try {
                this.resetLargeMessageController();
            }
            catch (Throwable e) {
                ActiveMQClientLogger.LOGGER.errorClearingMessages(e);
            }
        }
        this.waitForOnMessageToComplete(waitForOnMessage);
    }

    private void resetLargeMessageController() {
        LargeMessageControllerImpl controller = this.currentLargeMessageController;
        if (controller != null) {
            controller.cancel();
            this.currentLargeMessageController = null;
        }
    }

    @Override
    public int getInitialWindowSize() {
        return this.initialWindow;
    }

    @Override
    public int getClientWindowSize() {
        return this.clientWindowSize;
    }

    @Override
    public int getBufferSize() {
        return this.buffer.size();
    }

    @Override
    public void acknowledge(ClientMessage message) throws ActiveMQException {
        ClientMessageInternal cmi = (ClientMessageInternal)message;
        if (this.ackIndividually) {
            this.individualAcknowledge(message);
        } else {
            this.ackBytes += message.getEncodeSize();
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::acknowledge ackBytes=" + this.ackBytes + " and ackBatchSize=" + this.ackBatchSize + ", encodeSize=" + message.getEncodeSize());
            }
            if (this.ackBytes >= this.ackBatchSize) {
                if (logger.isTraceEnabled()) {
                    logger.trace(this + ":: acknowledge acking " + cmi);
                }
                this.doAck(cmi);
            } else {
                if (logger.isTraceEnabled()) {
                    logger.trace(this + ":: acknowledge setting lastAckedMessage = " + cmi);
                }
                this.lastAckedMessage = cmi;
            }
        }
    }

    @Override
    public void individualAcknowledge(ClientMessage message) throws ActiveMQException {
        if (this.lastAckedMessage != null) {
            this.flushAcks();
        }
        this.session.individualAcknowledge(this, message);
    }

    @Override
    public void flushAcks() throws ActiveMQException {
        if (this.lastAckedMessage != null) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::FlushACK acking lastMessage::" + this.lastAckedMessage);
            }
            this.doAck(this.lastAckedMessage);
        }
    }

    @Override
    public void flowControl(int messageBytes, boolean discountSlowConsumer) throws ActiveMQException {
        if (this.clientWindowSize >= 0) {
            this.creditsToSend += messageBytes;
            if (this.creditsToSend >= this.clientWindowSize) {
                if (this.clientWindowSize == 0 && discountSlowConsumer) {
                    if (logger.isTraceEnabled()) {
                        logger.trace(this + "::FlowControl::Sending " + this.creditsToSend + " -1, for slow consumer");
                    }
                    int credits = this.creditsToSend - 1;
                    this.creditsToSend = 0;
                    if (credits > 0) {
                        this.sendCredits(credits);
                    }
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Sending " + messageBytes + " from flow-control");
                    }
                    int credits = this.creditsToSend;
                    this.creditsToSend = 0;
                    if (credits > 0) {
                        this.sendCredits(credits);
                    }
                }
            }
        }
    }

    private void startSlowConsumer() {
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::Sending 1 credit to start delivering of one message to slow consumer");
        }
        this.sendCredits(1);
        try {
            this.pendingFlowControl.await(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void resetIfSlowConsumer() {
        if (this.clientWindowSize == 0) {
            this.sendCredits(0);
            final CountDownLatch latch = new CountDownLatch(1);
            this.flowControlExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    latch.countDown();
                }
            });
            try {
                latch.await(10L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                throw new ActiveMQInterruptedException(e);
            }
        }
    }

    private void requeueExecutors() {
        for (int i = 0; i < this.buffer.size(); ++i) {
            this.queueExecutor();
        }
    }

    private void queueExecutor() {
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::Adding Runner on Executor for delivery");
        }
        this.sessionExecutor.execute(this.runner);
    }

    private void sendCredits(final int credits) {
        this.pendingFlowControl.countUp();
        this.flowControlExecutor.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    ClientConsumerImpl.this.sessionContext.sendConsumerCredits(ClientConsumerImpl.this, credits);
                }
                finally {
                    ClientConsumerImpl.this.pendingFlowControl.countDown();
                }
            }
        });
    }

    private void waitForOnMessageToComplete(boolean waitForOnMessage) {
        if (this.handler == null) {
            return;
        }
        if (!waitForOnMessage || Thread.currentThread() == this.onMessageThread) {
            return;
        }
        FutureLatch future = new FutureLatch();
        this.sessionExecutor.execute(future);
        boolean ok = future.await(10000L);
        if (!ok) {
            ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing();
        }
    }

    private void checkClosed() throws ActiveMQException {
        if (this.closed) {
            throw ActiveMQClientMessageBundle.BUNDLE.consumerClosed();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void callOnMessage() throws Exception {
        block17: {
            block19: {
                ClientMessageInternal message;
                block18: {
                    if (this.closing || this.stopped) {
                        return;
                    }
                    this.session.workDone();
                    MessageHandler theHandler = this.handler;
                    if (theHandler == null) break block17;
                    if (this.rateLimiter != null) {
                        this.rateLimiter.limit();
                    }
                    this.failedOver = false;
                    ClientConsumerImpl clientConsumerImpl = this;
                    synchronized (clientConsumerImpl) {
                        message = this.buffer.poll();
                    }
                    if (message == null) break block17;
                    if (message.containsProperty(FORCED_DELIVERY_MESSAGE)) {
                        return;
                    }
                    boolean expired = message.isExpired();
                    this.flowControlBeforeConsumption(message);
                    if (expired) break block18;
                    if (logger.isTraceEnabled()) {
                        logger.trace(this + "::Calling handler.onMessage");
                    }
                    ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>(){

                        @Override
                        public ClassLoader run() {
                            ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
                            Thread.currentThread().setContextClassLoader(ClientConsumerImpl.this.contextClassLoader);
                            return originalLoader;
                        }
                    });
                    this.onMessageThread = Thread.currentThread();
                    try {
                        theHandler.onMessage(message);
                    }
                    catch (Throwable throwable) {
                        try {
                            AccessController.doPrivileged(new PrivilegedAction<Object>(originalLoader){
                                final /* synthetic */ ClassLoader val$originalLoader;
                                {
                                    this.val$originalLoader = classLoader;
                                }

                                @Override
                                public Object run() {
                                    Thread.currentThread().setContextClassLoader(this.val$originalLoader);
                                    return null;
                                }
                            });
                        }
                        catch (Exception e) {
                            ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
                        }
                        this.onMessageThread = null;
                        throw throwable;
                    }
                    try {
                        AccessController.doPrivileged(new /* invalid duplicate definition of identical inner class */);
                    }
                    catch (Exception e) {
                        ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
                    }
                    this.onMessageThread = null;
                    if (logger.isTraceEnabled()) {
                        logger.trace(this + "::Handler.onMessage done");
                    }
                    if (message.isLargeMessage()) {
                        message.discardBody();
                    }
                    break block19;
                }
                this.session.expire(this, message);
            }
            if (this.clientWindowSize == 0) {
                this.startSlowConsumer();
            }
        }
    }

    private void flowControlBeforeConsumption(ClientMessageInternal message) throws ActiveMQException {
        if (message.getFlowControlSize() != 0) {
            this.flowControl(message.getFlowControlSize(), !message.isLargeMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doCleanUp(boolean sendCloseMessage) throws ActiveMQException {
        try {
            if (this.closed) {
                return;
            }
            this.closing = true;
            this.waitForOnMessageToComplete(true);
            this.resetLargeMessageController();
            this.closed = true;
            ClientConsumerImpl clientConsumerImpl = this;
            synchronized (clientConsumerImpl) {
                if (this.receiverThread != null) {
                    this.notify();
                }
                this.handler = null;
                this.receiverThread = null;
            }
            this.flushAcks();
            this.clearBuffer();
            if (sendCloseMessage) {
                this.sessionContext.closeConsumer(this);
            }
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        this.session.removeConsumer(this);
    }

    private void clearBuffer() {
        this.buffer.clear();
    }

    private void doAck(ClientMessageInternal message) throws ActiveMQException {
        this.ackBytes = 0;
        this.lastAckedMessage = null;
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::Acking message " + message);
        }
        this.session.acknowledge(this, message);
    }

    public String toString() {
        return super.toString() + "{" + "consumerContext=" + this.consumerContext + ", queueName=" + this.queueName + '}';
    }

    private class Runner
    implements Runnable {
        private Runner() {
        }

        @Override
        public void run() {
            try {
                ClientConsumerImpl.this.callOnMessage();
            }
            catch (Exception e) {
                ActiveMQClientLogger.LOGGER.onMessageError(e);
                ClientConsumerImpl.this.lastException = e;
            }
        }
    }
}

