/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbit.core;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIllegalStateException;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.AmqpMessageReturnedException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.ReceiveAndReplyCallback;
import org.springframework.amqp.core.ReceiveAndReplyMessageCallback;
import org.springframework.amqp.core.ReplyToAddressCallback;
import org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ChannelProxy;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannelConnectionFactory;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitOperations;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.ListenerContainerAware;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.PendingConfirm;
import org.springframework.amqp.rabbit.support.PublisherCallbackChannel;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.amqp.rabbit.support.ValueExpression;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.context.expression.BeanFactoryResolver;
import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.PropertyAccessor;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class RabbitTemplate
extends RabbitAccessor
implements BeanFactoryAware,
RabbitOperations,
MessageListener,
ListenerContainerAware,
PublisherCallbackChannel.Listener {
    // Compile-time constants; the defaults below are repeated as literals in the
    // field initializers (the decompiler inlined them).
    private static final String RETURN_CORRELATION_KEY = "spring_request_return_correlation";
    private static final String DEFAULT_EXCHANGE = "";
    private static final String DEFAULT_ROUTING_KEY = "";
    private static final long DEFAULT_REPLY_TIMEOUT = 5000L;
    private static final String DEFAULT_ENCODING = "UTF-8";
    // Shared parser used by setUserIdExpressionString(String).
    private static final SpelExpressionParser PARSER = new SpelExpressionParser();
    // Channels iterated by getUnconfirmed(long); also used there as the monitor.
    private final ConcurrentMap<Channel, RabbitTemplate> publisherConfirmChannels = new ConcurrentHashMap<Channel, RabbitTemplate>();
    private final Map<String, PendingReply> replyHolder = new ConcurrentHashMap<String, PendingReply>();
    private final String uuid = UUID.randomUUID().toString();
    private final AtomicInteger messageTagProvider = new AtomicInteger();
    // Context against which the mandatory and connection-factory selector expressions are evaluated.
    private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext();
    // Reply-address strategy used by receiveAndReply(String, ReceiveAndReplyCallback):
    // it ignores the reply object and derives the address from the request alone.
    private final ReplyToAddressCallback<?> defaultReplyToAddressCallback = new ReplyToAddressCallback<Object>(){

        public Address getReplyToAddress(Message request, Object reply) {
            return RabbitTemplate.this.getReplyToAddress(request);
        }
    };
    // Mutable configuration; every field is volatile and written by the setters below.
    private volatile String exchange = "";
    private volatile String routingKey = "";
    private volatile String queue;
    // 0 selects the no-wait receive path; see receive(String).
    private volatile long receiveTimeout = 0L;
    private volatile long replyTimeout = 5000L;
    private volatile MessageConverter messageConverter = new SimpleMessageConverter();
    private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    private volatile String encoding = "UTF-8";
    private volatile String replyAddress;
    private volatile ConfirmCallback confirmCallback;
    private volatile ReturnCallback returnCallback;
    private volatile Boolean confirmsOrReturnsCapable;
    private volatile Expression mandatoryExpression = new ValueExpression<Boolean>(false);
    // null means the standard correlationId message property is used; see setCorrelationKey(String).
    private volatile String correlationKey = null;
    private volatile RetryTemplate retryTemplate;
    private volatile RecoveryCallback<?> recoveryCallback;
    private volatile Expression sendConnectionFactorySelectorExpression;
    private volatile Expression receiveConnectionFactorySelectorExpression;
    // Result of useDirectReplyTo(), cached by evaluateFastReplyTo().
    private volatile boolean usingFastReplyTo;
    // Reset to false whenever the reply address changes; see setReplyAddress(String).
    private volatile boolean evaluatedFastReplyTo;
    private volatile boolean useTemporaryReplyQueues;
    private volatile Collection<MessagePostProcessor> beforePublishPostProcessors;
    private volatile Collection<MessagePostProcessor> afterReceivePostProcessors;
    // Set to true by expectedQueueNames().
    private volatile boolean isListener;
    private volatile Expression userIdExpression;

    /**
     * Creates a template with no connection factory set; only
     * {@link #initDefaultStrategies()} is invoked.
     */
    public RabbitTemplate() {
        this.initDefaultStrategies();
    }

    /**
     * Creates a template with default strategies and the given connection factory.
     *
     * @param connectionFactory passed straight to {@code setConnectionFactory}
     */
    public RabbitTemplate(ConnectionFactory connectionFactory) {
        this();
        this.setConnectionFactory(connectionFactory);
    }

    /**
     * Installs a fresh {@link SimpleMessageConverter} as the message converter.
     * Called from the no-argument constructor.
     */
    protected void initDefaultStrategies() {
        this.setMessageConverter((MessageConverter)new SimpleMessageConverter());
    }

    /**
     * Sets the exchange used by operations that do not name one explicitly.
     *
     * @param exchange the exchange name; {@code null} is stored as the empty string
     */
    public void setExchange(String exchange) {
        if (exchange == null) {
            this.exchange = "";
            return;
        }
        this.exchange = exchange;
    }

    /**
     * @return the currently configured default exchange name
     */
    public String getExchange() {
        return this.exchange;
    }

    /**
     * Sets the routing key used by operations that do not name one explicitly.
     * Unlike {@link #setExchange(String)}, a {@code null} value is stored as-is.
     *
     * @param routingKey the default routing key
     */
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    /**
     * @return the currently configured default routing key
     */
    public String getRoutingKey() {
        return this.routingKey;
    }

    /**
     * Stores the default queue name.
     * NOTE(review): presumably this is the queue returned by getRequiredQueue() for
     * the receive methods that take no queue argument; that method is outside this view.
     *
     * @param queue the default queue name
     */
    public void setQueue(String queue) {
        this.queue = queue;
    }

    /**
     * Sets the charset name used when a message id is turned into correlation bytes
     * in the receive-and-reply path.
     *
     * @param encoding the charset name; the initial value is "UTF-8"
     */
    public void setEncoding(String encoding) {
        this.encoding = encoding;
    }

    /**
     * Sets the reply address to the name of the given queue.
     *
     * @param replyQueue the queue whose name becomes the reply address; must not be
     *                   {@code null} (it is dereferenced without a check)
     * @deprecated delegates to {@link #setReplyAddress(String)}
     */
    @Deprecated
    public void setReplyQueue(Queue replyQueue) {
        this.setReplyAddress(replyQueue.getName());
    }

    /**
     * Sets the reply address and clears the "fast reply-to evaluated" flag so the
     * direct reply-to decision is no longer marked as made.
     *
     * @param replyAddress the reply address, or {@code null}
     */
    public void setReplyAddress(String replyAddress) {
        this.replyAddress = replyAddress;
        this.evaluatedFastReplyTo = false;
    }

    /**
     * Sets the timeout applied by receive operations that do not take one explicitly.
     * In the visible receive paths, 0 selects a no-wait get and a negative value
     * waits for a delivery without a time limit.
     *
     * @param receiveTimeout the timeout (handed to the consumer as milliseconds by the callers)
     */
    public void setReceiveTimeout(long receiveTimeout) {
        this.receiveTimeout = receiveTimeout;
    }

    /**
     * Stores the reply timeout (initial value 5000). The code that consumes this
     * value is outside this view.
     *
     * @param replyTimeout the reply timeout
     */
    public void setReplyTimeout(long replyTimeout) {
        this.replyTimeout = replyTimeout;
    }

    /**
     * Replaces the message converter. No null check is performed here.
     *
     * @param messageConverter the converter to use
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    /**
     * Replaces the message-properties converter.
     *
     * @param messagePropertiesConverter the converter to use; rejected by the
     *                                   assertion when {@code null}
     */
    public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter) {
        Assert.notNull((Object)messagePropertiesConverter, (String)"messagePropertiesConverter must not be null");
        this.messagePropertiesConverter = messagePropertiesConverter;
    }

    /**
     * @return the currently configured message converter (may be {@code null} if one was set)
     */
    public MessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    /**
     * Registers the confirm callback. Only one distinct callback may ever be set;
     * re-setting the same instance is allowed.
     *
     * @param confirmCallback the callback to register
     * @throws IllegalStateException if a different callback is already registered
     */
    public void setConfirmCallback(ConfirmCallback confirmCallback) {
        boolean unsetOrSame = this.confirmCallback == null || this.confirmCallback == confirmCallback;
        Assert.state(unsetOrSame, "Only one ConfirmCallback is supported by each RabbitTemplate");
        this.confirmCallback = confirmCallback;
    }

    /**
     * Registers the return callback. Only one distinct callback may ever be set;
     * re-setting the same instance is allowed.
     *
     * @param returnCallback the callback to register
     * @throws IllegalStateException if a different callback is already registered
     */
    public void setReturnCallback(ReturnCallback returnCallback) {
        boolean unsetOrSame = this.returnCallback == null || this.returnCallback == returnCallback;
        Assert.state(unsetOrSame, "Only one ReturnCallback is supported by each RabbitTemplate");
        this.returnCallback = returnCallback;
    }

    /**
     * Sets the mandatory flag by replacing the mandatory expression with a constant.
     * In the visible send paths the flag only takes effect when a return callback is set.
     *
     * @param mandatory the constant value the expression will yield
     */
    public void setMandatory(boolean mandatory) {
        this.mandatoryExpression = new ValueExpression<Boolean>(mandatory);
    }

    /**
     * Sets the expression evaluated per message (as a {@code Boolean}, with the
     * message as root object) to decide the mandatory flag.
     *
     * @param mandatoryExpression the expression; rejected by the assertion when {@code null}
     */
    public void setMandatoryExpression(Expression mandatoryExpression) {
        Assert.notNull((Object)mandatoryExpression, (String)"'mandatoryExpression' must not be null");
        this.mandatoryExpression = mandatoryExpression;
    }

    /**
     * Sets the expression passed to the connection-factory lookup on send
     * operations, with the outgoing message as root object.
     *
     * @param sendConnectionFactorySelectorExpression the selector expression, or {@code null}
     */
    public void setSendConnectionFactorySelectorExpression(Expression sendConnectionFactorySelectorExpression) {
        this.sendConnectionFactorySelectorExpression = sendConnectionFactorySelectorExpression;
    }

    /**
     * Sets the expression passed to the connection-factory lookup on receive
     * operations, with the queue name as root object.
     *
     * @param receiveConnectionFactorySelectorExpression the selector expression, or {@code null}
     */
    public void setReceiveConnectionFactorySelectorExpression(Expression receiveConnectionFactorySelectorExpression) {
        this.receiveConnectionFactorySelectorExpression = receiveConnectionFactorySelectorExpression;
    }

    /**
     * Sets the name of the message header that carries the correlation value.
     * The value is trimmed; the name "correlationId" is ignored and leaves the
     * current setting untouched.
     *
     * @param correlationKey the header name; must contain text
     */
    public void setCorrelationKey(String correlationKey) {
        Assert.hasText(correlationKey, "'correlationKey' must not be null or empty");
        String trimmedKey = correlationKey.trim();
        if (trimmedKey.equals("correlationId")) {
            return;
        }
        this.correlationKey = trimmedKey;
    }

    /**
     * Stores the retry template. The code that applies it is outside this view.
     *
     * @param retryTemplate the retry template, or {@code null}
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    /**
     * Stores the recovery callback. The code that applies it is outside this view.
     *
     * @param recoveryCallback the recovery callback, or {@code null}
     */
    public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    /**
     * Configures the shared evaluation context: installs a bean resolver backed by
     * the given factory and adds a {@link MapAccessor}. Each call adds another
     * accessor instance.
     *
     * @param beanFactory the factory wrapped by the bean resolver
     */
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.evaluationContext.setBeanResolver((BeanResolver)new BeanFactoryResolver(beanFactory));
        this.evaluationContext.addPropertyAccessor((PropertyAccessor)new MapAccessor());
    }

    /**
     * Replaces the before-publish post-processors with the given ones, ordered by
     * {@code MessagePostProcessorUtils.sort}.
     *
     * @param beforePublishPostProcessors the processors; neither the array nor any
     *                                    element may be {@code null}
     */
    public void setBeforePublishPostProcessors(MessagePostProcessor ... beforePublishPostProcessors) {
        Assert.notNull((Object)beforePublishPostProcessors, (String)"'beforePublishPostProcessors' cannot be null");
        Assert.noNullElements((Object[])beforePublishPostProcessors, (String)"'beforePublishPostProcessors' cannot have null elements");
        this.beforePublishPostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(beforePublishPostProcessors));
    }

    /**
     * @param afterReceivePostProcessors the processors to install
     * @deprecated singular-named alias; delegates to
     *             {@link #setAfterReceivePostProcessors(MessagePostProcessor...)}
     */
    @Deprecated
    public void setAfterReceivePostProcessor(MessagePostProcessor ... afterReceivePostProcessors) {
        this.setAfterReceivePostProcessors(afterReceivePostProcessors);
    }

    /**
     * Replaces the after-receive post-processors with the given ones, ordered by
     * {@code MessagePostProcessorUtils.sort}.
     *
     * @param afterReceivePostProcessors the processors; neither the array nor any
     *                                   element may be {@code null}
     */
    public void setAfterReceivePostProcessors(MessagePostProcessor ... afterReceivePostProcessors) {
        Assert.notNull((Object)afterReceivePostProcessors, (String)"'afterReceivePostProcessors' cannot be null");
        Assert.noNullElements((Object[])afterReceivePostProcessors, (String)"'afterReceivePostProcessors' cannot have null elements");
        this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(afterReceivePostProcessors));
    }

    /**
     * Sets whether temporary reply queues are preferred over direct reply-to.
     * {@link #useDirectReplyTo()} ignores this flag (and logs an error) when a
     * reply address is configured.
     *
     * @param value {@code true} to prefer temporary reply queues
     */
    public void setUseTemporaryReplyQueues(boolean value) {
        this.useTemporaryReplyQueues = value;
    }

    /**
     * Stores the user-id expression. The code that evaluates it is outside this view.
     *
     * @param userIdExpression the expression, or {@code null}
     */
    public void setUserIdExpression(Expression userIdExpression) {
        this.userIdExpression = userIdExpression;
    }

    /**
     * Parses the given text with the shared SpEL parser and stores the result as
     * the user-id expression.
     *
     * @param userIdExpression the expression source text
     */
    public void setUserIdExpressionString(String userIdExpression) {
        this.userIdExpression = PARSER.parseExpression(userIdExpression);
    }

    /**
     * Marks this template as a listener and reports the reply queue it expects to
     * be attached to.
     *
     * @return a single-element collection holding the reply queue name when the
     *         reply address has an empty exchange part; {@code null} when no reply
     *         address is set or when it has the 'exchange/routingKey' form
     */
    @Override
    public Collection<String> expectedQueueNames() {
        this.isListener = true;
        if (this.replyAddress == null) {
            return null;
        }
        Address parsedReplyAddress = new Address(this.replyAddress);
        if (!"".equals(parsedReplyAddress.getExchangeName())) {
            this.logger.debug((Object)"Cannot verify reply queue because it has the form 'exchange/routingKey'");
            return null;
        }
        return Collections.singletonList(parsedReplyAddress.getRoutingKey());
    }

    /**
     * Expires pending confirms on every publisher-confirm channel and collects
     * their correlation data. The cutoff passed to each channel is the current
     * time minus {@code age}.
     *
     * @param age subtracted from {@code System.currentTimeMillis()} to form the cutoff
     * @return the correlation data of the expired confirms, or {@code null} when
     *         there are none
     */
    public Collection<CorrelationData> getUnconfirmed(long age) {
        HashSet<CorrelationData> expired = new HashSet<CorrelationData>();
        synchronized (this.publisherConfirmChannels) {
            long cutoffTime = System.currentTimeMillis() - age;
            for (Channel channel : this.publisherConfirmChannels.keySet()) {
                PublisherCallbackChannel callbackChannel = (PublisherCallbackChannel)channel;
                for (PendingConfirm pending : callbackChannel.expire(this, cutoffTime)) {
                    expired.add(pending.getCorrelationData());
                }
            }
        }
        if (expired.size() > 0) {
            return expired;
        }
        return null;
    }

    /**
     * Caches the outcome of {@link #useDirectReplyTo()} and records that the
     * evaluation has been done.
     */
    private void evaluateFastReplyTo() {
        this.usingFastReplyTo = this.useDirectReplyTo();
        this.evaluatedFastReplyTo = true;
    }

    /**
     * Decides whether replies can use the broker's 'amq.rabbitmq.reply-to'
     * pseudo-queue.
     * <p>
     * Temporary reply queues win when requested and no reply address is set. When
     * the reply address is unset or names the pseudo-queue, a passive declare
     * probes the broker; on failure the reply address is cleared.
     *
     * @return {@code true} only when the passive declare of the pseudo-queue succeeds
     */
    protected boolean useDirectReplyTo() {
        if (this.useTemporaryReplyQueues) {
            if (this.replyAddress == null) {
                return false;
            }
            this.logger.error((Object)"'useTemporaryReplyQueues' is ignored when a 'replyAddress' is provided");
        }
        // An explicit reply address other than the pseudo-queue rules out direct reply-to.
        if (this.replyAddress != null && !"amq.rabbitmq.reply-to".equals(this.replyAddress)) {
            return false;
        }
        try {
            this.execute(new ChannelCallback<Void>(){

                @Override
                public Void doInRabbit(Channel channel) throws Exception {
                    channel.queueDeclarePassive("amq.rabbitmq.reply-to");
                    return null;
                }
            });
            return true;
        }
        catch (Exception probeFailure) {
            // An explicitly configured pseudo-queue that the broker rejects is an error;
            // the implicit case is only worth a debug line.
            if (this.replyAddress != null) {
                this.logger.error((Object)("Broker does not support fast replies via 'amq.rabbitmq.reply-to', temporary queues will be used:" + probeFailure.getMessage() + "."));
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Broker does not support fast replies via 'amq.rabbitmq.reply-to', temporary queues will be used:" + probeFailure.getMessage() + "."));
            }
            this.replyAddress = null;
            return false;
        }
    }

    /**
     * Sends the message to the default exchange with the default routing key.
     *
     * @param message the message to send
     */
    public void send(Message message) throws AmqpException {
        this.send(this.exchange, this.routingKey, message);
    }

    /**
     * Sends the message to the default exchange with the given routing key.
     *
     * @param routingKey the routing key
     * @param message    the message to send
     */
    public void send(String routingKey, Message message) throws AmqpException {
        this.send(this.exchange, routingKey, message);
    }

    /**
     * Sends the message to the given exchange and routing key without correlation data.
     *
     * @param exchange   the exchange name
     * @param routingKey the routing key
     * @param message    the message to send
     */
    public void send(String exchange, String routingKey, Message message) throws AmqpException {
        this.send(exchange, routingKey, message, null);
    }

    /**
     * Sends the message on a channel obtained from the connection factory selected
     * by the send selector expression (evaluated against the message).
     * <p>
     * The mandatory flag is only set when a return callback is registered and the
     * mandatory expression evaluates to {@code true} for this message.
     *
     * @param exchange        the exchange name
     * @param routingKey      the routing key
     * @param message         the message to send
     * @param correlationData correlation data handed to {@code doSend}; may be {@code null}
     */
    public void send(final String exchange, final String routingKey, final Message message, final CorrelationData correlationData) throws AmqpException {
        ChannelCallback<Object> sendAction = new ChannelCallback<Object>(){

            @Override
            public Object doInRabbit(Channel channel) throws Exception {
                boolean mandatory = RabbitTemplate.this.returnCallback != null
                        && RabbitTemplate.this.mandatoryExpression.getValue((EvaluationContext)RabbitTemplate.this.evaluationContext, (Object)message, Boolean.class);
                RabbitTemplate.this.doSend(channel, exchange, routingKey, message, mandatory, correlationData);
                return null;
            }
        };
        this.execute(sendAction, this.obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
    }

    /**
     * Resolves the connection factory an operation should use.
     * <p>
     * When a selector expression is supplied and the configured factory is an
     * {@link AbstractRoutingConnectionFactory}, the expression is evaluated to a
     * lookup key and the matching target factory is returned. The supplied
     * {@code expression} is the one evaluated, so send and receive callers each
     * get their own selector applied.
     *
     * @param expression the selector expression for this operation; {@code null}
     *                   disables routing
     * @param rootObject the evaluation root (the message for sends, the queue name
     *                   for receives); when {@code null} the expression is evaluated
     *                   without a root object
     * @return the selected target factory, or the configured factory when no
     *         routing applies, the key is {@code null}, or lenient fallback is enabled
     * @throws IllegalStateException if the key maps to no factory and the routing
     *                               factory is not lenient
     */
    private ConnectionFactory obtainTargetConnectionFactory(Expression expression, Object rootObject) {
        if (expression != null && this.getConnectionFactory() instanceof AbstractRoutingConnectionFactory) {
            AbstractRoutingConnectionFactory routingConnectionFactory = (AbstractRoutingConnectionFactory)this.getConnectionFactory();
            // Evaluate the expression passed in, not a fixed field: receive paths hand in
            // the receive selector, which may be set while the send selector is null.
            Object lookupKey = rootObject != null
                    ? expression.getValue((EvaluationContext)this.evaluationContext, rootObject)
                    : expression.getValue((EvaluationContext)this.evaluationContext);
            if (lookupKey != null) {
                ConnectionFactory connectionFactory = routingConnectionFactory.getTargetConnectionFactory(lookupKey);
                if (connectionFactory != null) {
                    return connectionFactory;
                }
                if (!routingConnectionFactory.isLenientFallback()) {
                    throw new IllegalStateException("Cannot determine target ConnectionFactory for lookup key [" + lookupKey + "]");
                }
            }
        }
        return this.getConnectionFactory();
    }

    /**
     * Converts the object and sends it to the default exchange with the default
     * routing key, without correlation data.
     *
     * @param object the payload to convert
     */
    public void convertAndSend(Object object) throws AmqpException {
        this.convertAndSend(this.exchange, this.routingKey, object, (CorrelationData)null);
    }

    /**
     * @param object          the payload to convert
     * @param correlationData correlation data for the send
     * @deprecated lower-case-named alias; delegates to
     *             {@link #correlationConvertAndSend(Object, CorrelationData)}
     */
    @Deprecated
    public void correlationconvertAndSend(Object object, CorrelationData correlationData) throws AmqpException {
        this.correlationConvertAndSend(object, correlationData);
    }

    /**
     * Converts the object and sends it to the default exchange with the default
     * routing key, attaching the given correlation data.
     *
     * @param object          the payload to convert
     * @param correlationData correlation data for the send
     */
    public void correlationConvertAndSend(Object object, CorrelationData correlationData) throws AmqpException {
        this.convertAndSend(this.exchange, this.routingKey, object, correlationData);
    }

    /**
     * Converts the object and sends it to the default exchange with the given
     * routing key, without correlation data.
     *
     * @param routingKey the routing key
     * @param object     the payload to convert
     */
    public void convertAndSend(String routingKey, Object object) throws AmqpException {
        this.convertAndSend(this.exchange, routingKey, object, (CorrelationData)null);
    }

    /**
     * Converts the object and sends it to the default exchange with the given
     * routing key and correlation data.
     *
     * @param routingKey      the routing key
     * @param object          the payload to convert
     * @param correlationData correlation data for the send
     */
    public void convertAndSend(String routingKey, Object object, CorrelationData correlationData) throws AmqpException {
        this.convertAndSend(this.exchange, routingKey, object, correlationData);
    }

    /**
     * Converts the object and sends it to the given exchange and routing key,
     * without correlation data.
     *
     * @param exchange   the exchange name
     * @param routingKey the routing key
     * @param object     the payload to convert
     */
    public void convertAndSend(String exchange, String routingKey, Object object) throws AmqpException {
        this.convertAndSend(exchange, routingKey, object, (CorrelationData)null);
    }

    /**
     * Converts the object via {@code convertMessageIfNecessary} and sends the
     * result to the given exchange and routing key.
     *
     * @param exchange        the exchange name
     * @param routingKey      the routing key
     * @param object          the payload to convert
     * @param correlationData correlation data for the send; may be {@code null}
     */
    public void convertAndSend(String exchange, String routingKey, Object object, CorrelationData correlationData) throws AmqpException {
        this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
    }

    /**
     * Converts, post-processes and sends the message to the default exchange with
     * the default routing key.
     *
     * @param message              the payload to convert
     * @param messagePostProcessor applied to the converted message before sending
     */
    public void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException {
        this.convertAndSend(this.exchange, this.routingKey, message, messagePostProcessor);
    }

    /**
     * Converts, post-processes and sends the message to the default exchange with
     * the given routing key, without correlation data.
     *
     * @param routingKey           the routing key
     * @param message              the payload to convert
     * @param messagePostProcessor applied to the converted message before sending
     */
    public void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException {
        this.convertAndSend(this.exchange, routingKey, message, messagePostProcessor, null);
    }

    /**
     * Converts, post-processes and sends the message to the default exchange with
     * the given routing key and correlation data.
     *
     * @param routingKey           the routing key
     * @param message              the payload to convert
     * @param messagePostProcessor applied to the converted message before sending
     * @param correlationData      correlation data for the send
     */
    public void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
        this.convertAndSend(this.exchange, routingKey, message, messagePostProcessor, correlationData);
    }

    /**
     * Converts, post-processes and sends the message to the given exchange and
     * routing key, without correlation data.
     *
     * @param exchange             the exchange name
     * @param routingKey           the routing key
     * @param message              the payload to convert
     * @param messagePostProcessor applied to the converted message before sending
     */
    public void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException {
        this.convertAndSend(exchange, routingKey, message, messagePostProcessor, null);
    }

    /**
     * Converts the payload, runs it through the post-processor and sends whatever
     * the post-processor returns.
     *
     * @param exchange             the exchange name
     * @param routingKey           the routing key
     * @param message              the payload to convert
     * @param messagePostProcessor applied to the converted message; dereferenced
     *                             without a null check
     * @param correlationData      correlation data for the send; may be {@code null}
     */
    public void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
        Message converted = this.convertMessageIfNecessary(message);
        Message processed = messagePostProcessor.postProcessMessage(converted);
        this.send(exchange, routingKey, processed, correlationData);
    }

    /**
     * Receives one message from the queue returned by {@code getRequiredQueue()},
     * using the configured receive timeout.
     *
     * @return the message, or {@code null} if none was obtained
     */
    public Message receive() throws AmqpException {
        return this.receive(this.getRequiredQueue());
    }

    /**
     * Receives one message from the named queue using the configured receive
     * timeout; a timeout of 0 takes the no-wait path.
     *
     * @param queueName the queue to read from
     * @return the message, or {@code null} if none was obtained
     */
    public Message receive(String queueName) {
        return this.receiveTimeout == 0L
                ? this.doReceiveNoWait(queueName)
                : this.receive(queueName, this.receiveTimeout);
    }

    /**
     * Fetches at most one message with a single {@code basicGet}, on a channel from
     * the connection factory chosen by the receive selector expression.
     * <p>
     * The get auto-acknowledges unless the template is transacted. With a locally
     * transacted channel the message is acked and committed here; with any other
     * transacted channel the delivery tag is registered via
     * {@code ConnectionFactoryUtils.registerDeliveryTag}.
     *
     * @param queueName the queue to read from
     * @return the message, or {@code null} when the get returned nothing
     */
    protected Message doReceiveNoWait(final String queueName) {
        ChannelCallback<Message> getAction = new ChannelCallback<Message>(){

            @Override
            public Message doInRabbit(Channel channel) throws IOException {
                boolean autoAck = !RabbitTemplate.this.isChannelTransacted();
                GetResponse response = channel.basicGet(queueName, autoAck);
                if (response == null) {
                    return null;
                }
                long deliveryTag = response.getEnvelope().getDeliveryTag();
                if (RabbitTemplate.this.isChannelLocallyTransacted(channel)) {
                    channel.basicAck(deliveryTag, false);
                    channel.txCommit();
                } else if (RabbitTemplate.this.isChannelTransacted()) {
                    ConnectionFactoryUtils.registerDeliveryTag(RabbitTemplate.this.getConnectionFactory(), channel, deliveryTag);
                }
                return RabbitTemplate.this.buildMessageFromResponse(response);
            }
        };
        return this.execute(getAction, this.obtainTargetConnectionFactory(this.receiveConnectionFactorySelectorExpression, queueName));
    }

    /**
     * Receives one message from the queue returned by {@code getRequiredQueue()}
     * with an explicit timeout; 0 takes the no-wait path.
     *
     * @param timeoutMillis the timeout handed to the timed receive
     * @return the message, or {@code null} if none was obtained
     */
    public Message receive(long timeoutMillis) throws AmqpException {
        String requiredQueue = this.getRequiredQueue();
        return timeoutMillis == 0L
                ? this.doReceiveNoWait(requiredQueue)
                : this.receive(requiredQueue, timeoutMillis);
    }

    /**
     * Receives one message by attaching a short-lived consumer to the queue and
     * waiting for a delivery; the consumer is cancelled afterwards in every case.
     * <p>
     * A negative timeout waits without a limit. A delivery is acked and committed
     * on a locally transacted channel, registered via
     * {@code ConnectionFactoryUtils.registerDeliveryTag} on any other transacted
     * channel, and simply acked otherwise.
     *
     * @param queueName     the queue to consume from
     * @param timeoutMillis the wait limit passed to the consumer; negative means unbounded
     * @return the message, or {@code null} when no delivery arrived
     */
    public Message receive(final String queueName, final long timeoutMillis) {
        return this.execute(new ChannelCallback<Message>(){

            @Override
            public Message doInRabbit(Channel channel) throws Exception {
                QueueingConsumer consumer = RabbitTemplate.this.createQueueingConsumer(queueName, channel);
                QueueingConsumer.Delivery delivery;
                if (timeoutMillis < 0L) {
                    delivery = consumer.nextDelivery();
                } else {
                    delivery = consumer.nextDelivery(timeoutMillis);
                }
                channel.basicCancel(consumer.getConsumerTag());
                if (delivery == null) {
                    return null;
                }
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                if (RabbitTemplate.this.isChannelLocallyTransacted(channel)) {
                    channel.basicAck(deliveryTag, false);
                    channel.txCommit();
                } else if (RabbitTemplate.this.isChannelTransacted()) {
                    ConnectionFactoryUtils.registerDeliveryTag(RabbitTemplate.this.getConnectionFactory(), channel, deliveryTag);
                } else {
                    channel.basicAck(deliveryTag, false);
                }
                return RabbitTemplate.this.buildMessageFromDelivery(delivery);
            }
        });
    }

    /**
     * Receives from the queue returned by {@code getRequiredQueue()} and converts
     * the message.
     *
     * @return the converted payload, or {@code null} if no message was obtained
     */
    public Object receiveAndConvert() throws AmqpException {
        return this.receiveAndConvert(this.getRequiredQueue());
    }

    /**
     * Receives from the named queue using the configured receive timeout and
     * converts the message.
     *
     * @param queueName the queue to read from
     * @return the converted payload, or {@code null} if no message was obtained
     */
    public Object receiveAndConvert(String queueName) throws AmqpException {
        return this.receiveAndConvert(queueName, this.receiveTimeout);
    }

    /**
     * Receives from the queue returned by {@code getRequiredQueue()} with an
     * explicit timeout and converts the message.
     *
     * @param timeoutMillis the receive timeout
     * @return the converted payload, or {@code null} if no message was obtained
     */
    public Object receiveAndConvert(long timeoutMillis) throws AmqpException {
        return this.receiveAndConvert(this.getRequiredQueue(), timeoutMillis);
    }

    /**
     * Receives one message from the named queue and converts it with the required
     * message converter. A timeout of 0 takes the no-wait path.
     *
     * @param queueName     the queue to read from
     * @param timeoutMillis the receive timeout
     * @return the converted payload, or {@code null} if no message was obtained
     */
    public Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException {
        Message received;
        if (timeoutMillis == 0L) {
            received = this.doReceiveNoWait(queueName);
        } else {
            received = this.receive(queueName, timeoutMillis);
        }
        if (received == null) {
            return null;
        }
        return this.getRequiredMessageConverter().fromMessage(received);
    }

    /**
     * Receive-and-reply on the queue returned by {@code getRequiredQueue()}, using
     * the default reply-address strategy.
     *
     * @param callback produces the reply for a received message
     * @return {@code true} if a message was received
     */
    public <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback) throws AmqpException {
        return this.receiveAndReply(this.getRequiredQueue(), callback);
    }

    /**
     * Receive-and-reply on the named queue, using the default reply-address
     * strategy (which derives the address from the request message).
     *
     * @param queueName the queue to read from
     * @param callback  produces the reply for a received message
     * @return {@code true} if a message was received
     */
    public <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException {
        return this.receiveAndReply(queueName, callback, this.defaultReplyToAddressCallback);
    }

    /**
     * Receive-and-reply on the queue returned by {@code getRequiredQueue()},
     * sending the reply to a fixed exchange and routing key.
     *
     * @param callback   produces the reply for a received message
     * @param exchange   the exchange the reply is sent to
     * @param routingKey the routing key the reply is sent with
     * @return {@code true} if a message was received
     */
    public <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, String exchange, String routingKey) throws AmqpException {
        return this.receiveAndReply(this.getRequiredQueue(), callback, exchange, routingKey);
    }

    /**
     * Receive-and-reply on the named queue, sending every reply to the same fixed
     * exchange and routing key regardless of request or reply content.
     *
     * @param queueName       the queue to read from
     * @param callback        produces the reply for a received message
     * @param replyExchange   the exchange the reply is sent to
     * @param replyRoutingKey the routing key the reply is sent with
     * @return {@code true} if a message was received
     */
    public <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, final String replyExchange, final String replyRoutingKey) throws AmqpException {
        ReplyToAddressCallback<S> fixedAddress = new ReplyToAddressCallback<S>(){

            public Address getReplyToAddress(Message request, S reply) {
                return new Address(replyExchange, replyRoutingKey);
            }
        };
        return this.receiveAndReply(queueName, callback, fixedAddress);
    }

    /**
     * Receive-and-reply on the queue returned by {@code getRequiredQueue()}, with a
     * caller-supplied reply-address strategy.
     *
     * @param callback               produces the reply for a received message
     * @param replyToAddressCallback chooses where the reply is sent
     * @return {@code true} if a message was received
     */
    public <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
        return this.receiveAndReply(this.getRequiredQueue(), callback, replyToAddressCallback);
    }

    /**
     * Receive-and-reply on the named queue with a caller-supplied reply-address
     * strategy; all other overloads end up here.
     *
     * @param queueName              the queue to read from
     * @param callback               produces the reply for a received message
     * @param replyToAddressCallback chooses where the reply is sent
     * @return {@code true} if a message was received
     */
    public <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
        return this.doReceiveAndReply(queueName, callback, replyToAddressCallback);
    }

    /**
     * Receives at most one message, hands it to the callback and, if the callback
     * returns a non-null reply, sends that reply on the same channel.
     * <p>
     * With a receive timeout of 0 the message is fetched with {@code basicGet};
     * otherwise a short-lived consumer waits for a delivery (a negative timeout
     * waits without a limit). The request's correlation value is copied to the
     * reply: from the correlationId property when no correlation key is
     * configured, otherwise from the header of that name, falling back to the
     * request's message id bytes when no correlation value is present.
     *
     * @param queueName              the queue to read from
     * @param callback               produces the reply; receives the raw message when it
     *                               is a {@link ReceiveAndReplyMessageCallback}, else the
     *                               converted payload
     * @param replyToAddressCallback chooses where the reply is sent
     * @return {@code true} if a message was received, {@code false} otherwise
     * @throws IllegalArgumentException if the callback cannot accept the received
     *                                  object's type
     */
    private <R, S> boolean doReceiveAndReply(final String queueName, final ReceiveAndReplyCallback<R, S> callback, final ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
        return this.execute(new ChannelCallback<Boolean>(){

            @Override
            @SuppressWarnings("unchecked")
            public Boolean doInRabbit(Channel channel) throws Exception {
                boolean channelTransacted = RabbitTemplate.this.isChannelTransacted();
                Message receiveMessage = null;
                boolean channelLocallyTransacted = RabbitTemplate.this.isChannelLocallyTransacted(channel);
                if (RabbitTemplate.this.receiveTimeout == 0L) {
                    GetResponse response = channel.basicGet(queueName, !channelTransacted);
                    if (response != null) {
                        long deliveryTag = response.getEnvelope().getDeliveryTag();
                        if (channelLocallyTransacted) {
                            // The commit happens later: either in the reply send or explicitly below.
                            channel.basicAck(deliveryTag, false);
                        } else if (channelTransacted) {
                            ConnectionFactoryUtils.registerDeliveryTag(RabbitTemplate.this.getConnectionFactory(), channel, deliveryTag);
                        }
                        receiveMessage = RabbitTemplate.this.buildMessageFromResponse(response);
                    }
                } else {
                    QueueingConsumer consumer = RabbitTemplate.this.createQueueingConsumer(queueName, channel);
                    QueueingConsumer.Delivery delivery = RabbitTemplate.this.receiveTimeout < 0L ? consumer.nextDelivery() : consumer.nextDelivery(RabbitTemplate.this.receiveTimeout);
                    channel.basicCancel(consumer.getConsumerTag());
                    if (delivery != null) {
                        long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                        if (channelTransacted && !channelLocallyTransacted) {
                            ConnectionFactoryUtils.registerDeliveryTag(RabbitTemplate.this.getConnectionFactory(), channel, deliveryTag);
                        } else {
                            channel.basicAck(deliveryTag, false);
                        }
                        receiveMessage = RabbitTemplate.this.buildMessageFromDelivery(delivery);
                    }
                }
                if (receiveMessage != null) {
                    S reply;
                    Object receive = receiveMessage;
                    if (!ReceiveAndReplyMessageCallback.class.isAssignableFrom(callback.getClass())) {
                        receive = RabbitTemplate.this.getRequiredMessageConverter().fromMessage(receiveMessage);
                    }
                    try {
                        // Unchecked by design: a type mismatch surfaces as the ClassCastException handled below.
                        reply = callback.handle((R)receive);
                    }
                    catch (ClassCastException e) {
                        StackTraceElement[] trace = e.getStackTrace();
                        // Translate only a cast failure raised at the callback entry point. The trace
                        // may be shorter than two frames and frames may lack a file name, so guard
                        // both rather than let a secondary exception hide the original one.
                        if (trace.length > 1 && "handle".equals(trace[0].getMethodName()) && "RabbitTemplate.java".equals(trace[1].getFileName())) {
                            throw new IllegalArgumentException("ReceiveAndReplyCallback '" + callback + "' can't handle received object '" + receive + "'", e);
                        }
                        throw e;
                    }
                    if (reply != null) {
                        Address replyTo = replyToAddressCallback.getReplyToAddress(receiveMessage, reply);
                        Message replyMessage = RabbitTemplate.this.convertMessageIfNecessary(reply);
                        MessageProperties receiveMessageProperties = receiveMessage.getMessageProperties();
                        MessageProperties replyMessageProperties = replyMessage.getMessageProperties();
                        Object correlation = RabbitTemplate.this.correlationKey == null ? receiveMessageProperties.getCorrelationId() : receiveMessageProperties.getHeaders().get(RabbitTemplate.this.correlationKey);
                        if (RabbitTemplate.this.correlationKey == null || correlation == null) {
                            String messageId;
                            if (correlation == null && (messageId = receiveMessageProperties.getMessageId()) != null) {
                                correlation = messageId.getBytes(RabbitTemplate.this.encoding);
                            }
                            // In this branch the value is null, the request's correlationId, or the
                            // message-id bytes, so the cast cannot fail.
                            replyMessageProperties.setCorrelationId((byte[])correlation);
                        } else {
                            replyMessageProperties.setHeader(RabbitTemplate.this.correlationKey, correlation);
                        }
                        RabbitTemplate.this.doSend(channel, replyTo.getExchangeName(), replyTo.getRoutingKey(), replyMessage, RabbitTemplate.this.returnCallback != null && (Boolean)RabbitTemplate.this.mandatoryExpression.getValue((EvaluationContext)RabbitTemplate.this.evaluationContext, (Object)replyMessage, Boolean.class) != false, null);
                    } else if (channelLocallyTransacted) {
                        // No reply is sent, so commit the ack here.
                        channel.txCommit();
                    }
                    return true;
                }
                return false;
            }
        }, this.obtainTargetConnectionFactory(this.receiveConnectionFactorySelectorExpression, queueName));
    }

    /**
     * Sends the message with the default exchange and routing key and returns the
     * reply, without correlation data.
     *
     * @param message the request message
     * @return the result of the delegated call
     */
    public Message sendAndReceive(Message message) throws AmqpException {
        return this.sendAndReceive(message, null);
    }

    /**
     * Sends the message with the default exchange and routing key and returns the
     * reply.
     *
     * @param message         the request message
     * @param correlationData correlation data for the send; may be {@code null}
     * @return whatever {@code doSendAndReceive} returns (defined outside this view)
     */
    public Message sendAndReceive(Message message, CorrelationData correlationData) throws AmqpException {
        return this.doSendAndReceive(this.exchange, this.routingKey, message, correlationData);
    }

    /**
     * Sends {@code message} with the given routing key and waits for a reply,
     * supplying no correlation data.
     *
     * @param routingKey the routing key to publish with
     * @param message the request message
     * @return the result of {@link #sendAndReceive(String, Message, CorrelationData)}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Message sendAndReceive(String routingKey, Message message) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return sendAndReceive(routingKey, message, noCorrelationData);
    }

    /**
     * Sends {@code message} to this template's configured exchange with the given
     * routing key and waits for a reply.
     *
     * @param routingKey the routing key to publish with
     * @param message the request message
     * @param correlationData correlation data handed through to the send; may be {@code null}
     * @return the result of {@code doSendAndReceive}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Message sendAndReceive(String routingKey, Message message, CorrelationData correlationData) throws AmqpException {
        return doSendAndReceive(this.exchange, routingKey, message, correlationData);
    }

    /**
     * Sends {@code message} to the given exchange and routing key and waits for a
     * reply, supplying no correlation data.
     *
     * @param exchange the exchange to publish to
     * @param routingKey the routing key to publish with
     * @param message the request message
     * @return the result of {@link #sendAndReceive(String, String, Message, CorrelationData)}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Message sendAndReceive(String exchange, String routingKey, Message message) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return sendAndReceive(exchange, routingKey, message, noCorrelationData);
    }

    /**
     * Sends {@code message} to the given exchange and routing key and waits for a reply.
     *
     * @param exchange the exchange to publish to
     * @param routingKey the routing key to publish with
     * @param message the request message
     * @param correlationData correlation data handed through to the send; may be {@code null}
     * @return the result of {@code doSendAndReceive}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Message sendAndReceive(String exchange, String routingKey, Message message, CorrelationData correlationData) throws AmqpException {
        return doSendAndReceive(exchange, routingKey, message, correlationData);
    }

    /**
     * Converts and sends {@code message}, then waits for and converts the reply,
     * supplying no correlation data.
     *
     * @param message the request payload (or a {@link Message})
     * @return the result of {@link #convertSendAndReceive(Object, CorrelationData)}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Object convertSendAndReceive(Object message) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return convertSendAndReceive(message, noCorrelationData);
    }

    /**
     * Converts and sends {@code message} using this template's configured exchange
     * and routing key, with no message post-processor.
     *
     * @param message the request payload (or a {@link Message})
     * @param correlationData correlation data handed through to the send; may be {@code null}
     * @return the result of the five-argument {@code convertSendAndReceive}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Object convertSendAndReceive(Object message, CorrelationData correlationData) throws AmqpException {
        final MessagePostProcessor noPostProcessor = null;
        return convertSendAndReceive(this.exchange, this.routingKey, message, noPostProcessor, correlationData);
    }

    /**
     * Converts and sends {@code message} with the given routing key, supplying no
     * correlation data.
     *
     * @param routingKey the routing key to publish with
     * @param message the request payload (or a {@link Message})
     * @return the result of {@link #convertSendAndReceive(String, Object, CorrelationData)}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Object convertSendAndReceive(String routingKey, Object message) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return convertSendAndReceive(routingKey, message, noCorrelationData);
    }

    /**
     * Converts and sends {@code message} to this template's configured exchange with
     * the given routing key, with no message post-processor.
     *
     * @param routingKey the routing key to publish with
     * @param message the request payload (or a {@link Message})
     * @param correlationData correlation data handed through to the send; may be {@code null}
     * @return the result of the five-argument {@code convertSendAndReceive}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Object convertSendAndReceive(String routingKey, Object message, CorrelationData correlationData) throws AmqpException {
        final MessagePostProcessor noPostProcessor = null;
        return convertSendAndReceive(this.exchange, routingKey, message, noPostProcessor, correlationData);
    }

    /**
     * Converts and sends {@code message} to the given exchange and routing key,
     * supplying no correlation data.
     *
     * @param exchange the exchange to publish to
     * @param routingKey the routing key to publish with
     * @param message the request payload (or a {@link Message})
     * @return the result of {@link #convertSendAndReceive(String, String, Object, CorrelationData)}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Object convertSendAndReceive(String exchange, String routingKey, Object message) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return convertSendAndReceive(exchange, routingKey, message, noCorrelationData);
    }

    /**
     * Converts and sends {@code message} to the given exchange and routing key, with
     * no message post-processor.
     *
     * @param exchange the exchange to publish to
     * @param routingKey the routing key to publish with
     * @param message the request payload (or a {@link Message})
     * @param correlationData correlation data handed through to the send; may be {@code null}
     * @return the result of the five-argument {@code convertSendAndReceive}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Object convertSendAndReceive(String exchange, String routingKey, Object message, CorrelationData correlationData) throws AmqpException {
        final MessagePostProcessor noPostProcessor = null;
        return convertSendAndReceive(exchange, routingKey, message, noPostProcessor, correlationData);
    }

    /**
     * Converts, post-processes and sends {@code message}, supplying no correlation data.
     *
     * @param message the request payload (or a {@link Message})
     * @param messagePostProcessor processor applied to the request before sending
     * @return the result of
     *         {@link #convertSendAndReceive(Object, MessagePostProcessor, CorrelationData)}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Object convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return convertSendAndReceive(message, messagePostProcessor, noCorrelationData);
    }

    /**
     * Converts, post-processes and sends {@code message} using this template's
     * configured exchange and routing key.
     *
     * @param message the request payload (or a {@link Message})
     * @param messagePostProcessor processor applied to the request before sending
     * @param correlationData correlation data handed through to the send; may be {@code null}
     * @return the result of the five-argument {@code convertSendAndReceive}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Object convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
        return convertSendAndReceive(this.exchange, this.routingKey, message, messagePostProcessor, correlationData);
    }

    /**
     * Converts, post-processes and sends {@code message} with the given routing key,
     * supplying no correlation data.
     *
     * @param routingKey the routing key to publish with
     * @param message the request payload (or a {@link Message})
     * @param messagePostProcessor processor applied to the request before sending
     * @return the result of
     *         {@link #convertSendAndReceive(String, Object, MessagePostProcessor, CorrelationData)}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Object convertSendAndReceive(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return convertSendAndReceive(routingKey, message, messagePostProcessor, noCorrelationData);
    }

    /**
     * Converts, post-processes and sends {@code message} to this template's configured
     * exchange with the given routing key.
     *
     * @param routingKey the routing key to publish with
     * @param message the request payload (or a {@link Message})
     * @param messagePostProcessor processor applied to the request before sending
     * @param correlationData correlation data handed through to the send; may be {@code null}
     * @return the result of the five-argument {@code convertSendAndReceive}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Object convertSendAndReceive(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
        return convertSendAndReceive(this.exchange, routingKey, message, messagePostProcessor, correlationData);
    }

    /**
     * Converts, post-processes and sends {@code message} to the given exchange and
     * routing key, supplying no correlation data.
     *
     * @param exchange the exchange to publish to
     * @param routingKey the routing key to publish with
     * @param message the request payload (or a {@link Message})
     * @param messagePostProcessor processor applied to the request before sending
     * @return the result of the five-argument {@code convertSendAndReceive}
     * @throws AmqpException declared for failures raised by the delegate
     */
    public Object convertSendAndReceive(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException {
        final CorrelationData noCorrelationData = null;
        return convertSendAndReceive(exchange, routingKey, message, messagePostProcessor, noCorrelationData);
    }

    /**
     * Converts {@code message} to a {@link Message} if necessary, applies the optional
     * post-processor, performs the send-and-receive exchange and converts the reply
     * with the required message converter.
     *
     * @param exchange the exchange to publish to
     * @param routingKey the routing key to publish with
     * @param message the request payload (or a {@link Message})
     * @param messagePostProcessor processor applied to the request before sending;
     *        skipped when {@code null}
     * @param correlationData correlation data handed through to the send; may be {@code null}
     * @return the converted reply, or {@code null} when no reply message was obtained
     * @throws AmqpException declared for failures raised by the delegates
     */
    public Object convertSendAndReceive(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
        Message request = convertMessageIfNecessary(message);
        if (messagePostProcessor != null) {
            request = messagePostProcessor.postProcessMessage(request);
        }
        final Message reply = doSendAndReceive(exchange, routingKey, request, correlationData);
        return reply != null ? getRequiredMessageConverter().fromMessage(reply) : null;
    }

    /**
     * Returns {@code object} unchanged when it already is a {@link Message}; otherwise
     * converts it with the required message converter and fresh
     * {@link MessageProperties}.
     *
     * @param object the payload or message to send
     * @return a {@link Message} ready to be sent
     */
    protected Message convertMessageIfNecessary(Object object) {
        return object instanceof Message
                ? (Message) object
                : getRequiredMessageConverter().toMessage(object, new MessageProperties());
    }

    /**
     * Chooses the reply strategy and performs the send-and-receive exchange.
     *
     * <p>On first use, {@code evaluateFastReplyTo()} is run while holding this
     * template's monitor; the flag is checked again inside the lock so the evaluation
     * is not repeated by a thread that lost the race. A fixed reply address is used
     * only when {@code replyAddress} is set and fast reply-to is not in use; otherwise
     * a temporary reply consumer is used.
     *
     * @param exchange the exchange to publish to
     * @param routingKey the routing key to publish with
     * @param message the request message
     * @param correlationData correlation data handed through to the send; may be {@code null}
     * @return the reply returned by the selected strategy
     */
    protected Message doSendAndReceive(String exchange, String routingKey, Message message, CorrelationData correlationData) {
        if (!this.evaluatedFastReplyTo) {
            synchronized (this) {
                if (!this.evaluatedFastReplyTo) {
                    evaluateFastReplyTo();
                }
            }
        }
        final boolean useFixedReplyAddress = this.replyAddress != null && !this.usingFastReplyTo;
        return useFixedReplyAddress
                ? doSendAndReceiveWithFixed(exchange, routingKey, message, correlationData)
                : doSendAndReceiveWithTemporary(exchange, routingKey, message, correlationData);
    }

    /**
     * Performs a send-and-receive exchange using a reply consumer that lives only for
     * the duration of the call.
     *
     * <p>The reply destination is {@code amq.rabbitmq.reply-to} when fast reply-to is
     * in use, otherwise the queue returned by {@code channel.queueDeclare()}. A
     * consumer on that destination hands the reply to the pending-reply holder, the
     * request is published, and the caller blocks in {@code exchangeMessages} until a
     * reply (or timeout) occurs.
     *
     * <p>The pending reply is registered in {@code replyHolder} only immediately
     * before the guarded send. Registering it earlier leaked the map entry whenever the
     * reply-to assertion, the queue declaration or the consumer registration threw,
     * because none of those steps were covered by the cleanup {@code finally}.
     *
     * @param exchange the exchange to publish to
     * @param routingKey the routing key to publish with
     * @param message the request message; must not already carry a reply-to property
     * @param correlationData correlation data handed through to the send; may be {@code null}
     * @return the reply returned by {@code exchangeMessages}
     */
    protected Message doSendAndReceiveWithTemporary(final String exchange, final String routingKey, final Message message, final CorrelationData correlationData) {
        return this.execute(new ChannelCallback<Message>(){

            @Override
            public Message doInRabbit(Channel channel) throws Exception {
                String replyTo;
                final PendingReply pendingReply = new PendingReply();
                String messageTag = String.valueOf(RabbitTemplate.this.messageTagProvider.incrementAndGet());
                Assert.isNull((Object)message.getMessageProperties().getReplyTo(), (String)"Send-and-receive methods can only be used if the Message does not already have a replyTo property.");
                if (RabbitTemplate.this.usingFastReplyTo) {
                    replyTo = "amq.rabbitmq.reply-to";
                } else {
                    AMQP.Queue.DeclareOk queueDeclaration = channel.queueDeclare();
                    replyTo = queueDeclaration.getQueue();
                }
                message.getMessageProperties().setReplyTo(replyTo);
                String consumerTag = UUID.randomUUID().toString();
                DefaultConsumer consumer = new DefaultConsumer(channel){

                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        MessageProperties messageProperties = RabbitTemplate.this.messagePropertiesConverter.toMessageProperties(properties, envelope, RabbitTemplate.this.encoding);
                        Message reply = new Message(body, messageProperties);
                        if (RabbitTemplate.this.logger.isTraceEnabled()) {
                            RabbitTemplate.this.logger.trace((Object)("Message received " + reply));
                        }
                        pendingReply.reply(reply);
                    }
                };
                channel.basicConsume(replyTo, true, consumerTag, true, true, null, (Consumer)consumer);
                // Register last: the holder is only consulted for returned requests, which
                // cannot occur before the publish inside the try block, and from here on the
                // finally block guarantees removal.
                RabbitTemplate.this.replyHolder.put(messageTag, pendingReply);
                try {
                    return RabbitTemplate.this.exchangeMessages(exchange, routingKey, message, correlationData, channel, pendingReply, messageTag);
                }
                finally {
                    RabbitTemplate.this.replyHolder.remove(messageTag);
                    try {
                        channel.basicCancel(consumerTag);
                    }
                    catch (Exception ignored) {
                        // Best effort only: a cancel failure must not mask the reply or the
                        // original exception.
                    }
                }
            }
        }, this.obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
    }

    /**
     * Performs a send-and-receive exchange using this template's configured
     * {@code replyAddress}; the reply is delivered through {@link #onMessage(Message)}.
     *
     * <p>The request's original reply-to and correlation values are saved on the
     * pending reply so {@code onMessage} can restore them, then replaced with the
     * template's reply address and a generated message tag. The correlation is carried
     * in the standard correlation-id property when {@code correlationKey} is
     * {@code null}, otherwise in the header named by {@code correlationKey}.
     *
     * <p>The pending reply is registered in {@code replyHolder} only immediately
     * before the guarded send. Registering it earlier leaked the map entry whenever a
     * charset conversion or the header cast threw, because those steps were not
     * covered by the cleanup {@code finally}.
     *
     * @param exchange the exchange to publish to
     * @param routingKey the routing key to publish with
     * @param message the request message; its reply-to and correlation are overwritten
     * @param correlationData correlation data handed through to the send; may be {@code null}
     * @return the reply returned by {@code exchangeMessages}
     * @throws IllegalStateException if this template is not registered as a listener
     */
    protected Message doSendAndReceiveWithFixed(final String exchange, final String routingKey, final Message message, final CorrelationData correlationData) {
        Assert.state((boolean)this.isListener, (String)("RabbitTemplate is not configured as MessageListener - cannot use a 'replyAddress': " + this.replyAddress));
        return this.execute(new ChannelCallback<Message>(){

            @Override
            public Message doInRabbit(Channel channel) throws Exception {
                PendingReply pendingReply = new PendingReply();
                String messageTag = String.valueOf(RabbitTemplate.this.messageTagProvider.incrementAndGet());
                String savedReplyTo = message.getMessageProperties().getReplyTo();
                pendingReply.setSavedReplyTo(savedReplyTo);
                if (StringUtils.hasLength((String)savedReplyTo) && RabbitTemplate.this.logger.isDebugEnabled()) {
                    RabbitTemplate.this.logger.debug((Object)("Replacing replyTo header:" + savedReplyTo + " in favor of template's configured reply-queue:" + RabbitTemplate.this.replyAddress));
                }
                message.getMessageProperties().setReplyTo(RabbitTemplate.this.replyAddress);
                String savedCorrelation = null;
                if (RabbitTemplate.this.correlationKey == null) {
                    byte[] correlationId = message.getMessageProperties().getCorrelationId();
                    if (correlationId != null) {
                        savedCorrelation = new String(correlationId, RabbitTemplate.this.encoding);
                    }
                } else {
                    savedCorrelation = (String)message.getMessageProperties().getHeaders().get(RabbitTemplate.this.correlationKey);
                }
                pendingReply.setSavedCorrelation(savedCorrelation);
                if (RabbitTemplate.this.correlationKey == null) {
                    message.getMessageProperties().setCorrelationId(messageTag.getBytes(RabbitTemplate.this.encoding));
                } else {
                    message.getMessageProperties().setHeader(RabbitTemplate.this.correlationKey, (Object)messageTag);
                }
                if (RabbitTemplate.this.logger.isDebugEnabled()) {
                    RabbitTemplate.this.logger.debug((Object)("Sending message with tag " + messageTag));
                }
                // Register the fully initialised pending reply last: no reply or return can
                // refer to this tag before the publish inside the try block, and from here
                // on the finally block guarantees removal.
                RabbitTemplate.this.replyHolder.put(messageTag, pendingReply);
                try {
                    return RabbitTemplate.this.exchangeMessages(exchange, routingKey, message, correlationData, channel, pendingReply, messageTag);
                }
                finally {
                    RabbitTemplate.this.replyHolder.remove(messageTag);
                }
            }
        }, this.obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
    }

    /**
     * Publishes the request and blocks until the pending reply is completed.
     *
     * <p>When the message is mandatory and no return callback is configured, the
     * message tag is stored under {@code RETURN_CORRELATION_KEY} in the request headers
     * before sending. A negative {@code replyTimeout} waits without a time limit;
     * otherwise the wait is bounded by {@code replyTimeout} milliseconds.
     *
     * @return the reply, or {@code null} if the bounded wait elapsed without one
     * @throws Exception any failure from sending or waiting
     */
    private Message exchangeMessages(String exchange, String routingKey, Message message, CorrelationData correlationData, Channel channel, PendingReply pendingReply, String messageTag) throws Exception {
        final boolean mandatory = (Boolean)this.mandatoryExpression.getValue((EvaluationContext)this.evaluationContext, (Object)message, Boolean.class);
        if (mandatory) {
            if (this.returnCallback == null) {
                message.getMessageProperties().getHeaders().put(RETURN_CORRELATION_KEY, messageTag);
            }
        }
        doSend(channel, exchange, routingKey, message, mandatory, correlationData);
        if (this.replyTimeout < 0L) {
            return pendingReply.get();
        }
        return pendingReply.get(this.replyTimeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Runs {@code action} against a channel obtained from this template's own
     * connection factory.
     *
     * @param action the callback to run
     * @param <T> the callback's result type
     * @return the callback's result
     */
    @Override
    public <T> T execute(ChannelCallback<T> action) {
        final ConnectionFactory defaultFactory = getConnectionFactory();
        return execute(action, defaultFactory);
    }

    /**
     * Runs {@code action} against the given connection factory, going through the
     * retry template when one is configured.
     *
     * <p>Runtime exceptions escaping the retry template are rethrown unchanged; any
     * other exception is translated by {@code RabbitExceptionTranslator}.
     *
     * @param action the callback to run
     * @param connectionFactory the factory to obtain the connection/channel from
     * @param <T> the callback's result type
     * @return the callback's (or recovery callback's) result
     */
    private <T> T execute(final ChannelCallback<T> action, final ConnectionFactory connectionFactory) {
        if (this.retryTemplate == null) {
            return doExecute(action, connectionFactory);
        }
        try {
            return (T)this.retryTemplate.execute(new RetryCallback<T, Exception>(){

                public T doWithRetry(RetryContext context) throws Exception {
                    return RabbitTemplate.this.doExecute(action, connectionFactory);
                }
            }, this.recoveryCallback);
        }
        catch (RuntimeException unchecked) {
            throw unchecked;
        }
        catch (Exception checked) {
            throw RabbitExceptionTranslator.convertRabbitAccessException(checked);
        }
    }

    /**
     * Obtains a channel, runs {@code action} on it and releases the resources.
     *
     * <p>When the template is channel-transacted the channel comes from the
     * transactional resource holder; otherwise a connection and a non-transactional
     * channel are created directly and closed in the {@code finally} block. If the
     * callback throws and the channel is locally transacted, the resource holder is
     * rolled back before the translated exception is thrown.
     *
     * @param action the callback to run; must not be {@code null}
     * @param connectionFactory the factory to obtain the connection/channel from
     * @param <T> the callback's result type
     * @return the callback's result
     * @throws IllegalStateException if no connection or channel could be obtained
     */
    private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionFactory) {
        Assert.notNull(action, "Callback object must not be null");
        RabbitResourceHolder holder = null;
        Connection directConnection = null;
        Channel channel;
        if (!isChannelTransacted()) {
            directConnection = connectionFactory.createConnection();
            if (directConnection == null) {
                throw new IllegalStateException("Connection factory returned a null connection");
            }
            try {
                channel = directConnection.createChannel(false);
                if (channel == null) {
                    throw new IllegalStateException("Connection returned a null channel");
                }
            }
            catch (RuntimeException failure) {
                // The connection is not yet covered by the finally block below.
                RabbitUtils.closeConnection(directConnection);
                throw failure;
            }
        } else {
            holder = ConnectionFactoryUtils.getTransactionalResourceHolder(connectionFactory, true);
            channel = holder.getChannel();
            if (channel == null) {
                ConnectionFactoryUtils.releaseResources(holder);
                throw new IllegalStateException("Resource holder returned a null channel");
            }
        }
        try {
            if (this.confirmsOrReturnsCapable == null) {
                determineConfirmsReturnsCapability(connectionFactory);
            }
            if (this.confirmsOrReturnsCapable.booleanValue()) {
                addListener(channel);
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Executing callback on RabbitMQ Channel: " + channel);
            }
            return action.doInRabbit(channel);
        }
        catch (Exception ex) {
            if (isChannelLocallyTransacted(channel)) {
                holder.rollbackAll();
            }
            throw convertRabbitAccessException(ex);
        }
        finally {
            if (holder == null) {
                RabbitUtils.closeChannel(channel);
                RabbitUtils.closeConnection(directConnection);
            } else {
                ConnectionFactoryUtils.releaseResources(holder);
            }
        }
    }

    /**
     * Records in {@code confirmsOrReturnsCapable} whether the given factory is a
     * {@code PublisherCallbackChannelConnectionFactory} with publisher confirms or
     * publisher returns enabled; any other factory type yields {@code false}.
     *
     * @param connectionFactory the factory to inspect
     */
    public void determineConfirmsReturnsCapability(ConnectionFactory connectionFactory) {
        if (!(connectionFactory instanceof PublisherCallbackChannelConnectionFactory)) {
            this.confirmsOrReturnsCapable = Boolean.FALSE;
            return;
        }
        final PublisherCallbackChannelConnectionFactory callbackFactory = (PublisherCallbackChannelConnectionFactory)((Object)connectionFactory);
        this.confirmsOrReturnsCapable = Boolean.valueOf(callbackFactory.isPublisherConfirms() || callbackFactory.isPublisherReturns());
    }

    /**
     * Publishes {@code message} on {@code channel}.
     *
     * <p>A {@code null} exchange or routing key falls back to the template's
     * configured value. For mandatory messages this template's UUID is stored in the
     * {@code spring_listener_return_correlation} header. The before-publish
     * post-processors are then applied in order, the user id is filled in from
     * {@code userIdExpression} when it is not already set, and the message is
     * published. A locally transacted channel is committed afterwards.
     *
     * <p>The properties converted for publishing are read from the message returned by
     * the last post-processor. Previously they were captured before post-processing,
     * so a processor that returned a message with a different
     * {@link MessageProperties} instance had its properties ignored while its body was
     * still used.
     *
     * @param channel the channel to publish on
     * @param exchange the exchange, or {@code null} for the template default
     * @param routingKey the routing key, or {@code null} for the template default
     * @param message the message to publish
     * @param mandatory the AMQP mandatory flag
     * @param correlationData correlation data for publisher confirms; may be {@code null}
     * @throws Exception any failure from post-processing, conversion or publishing
     */
    protected void doSend(Channel channel, String exchange, String routingKey, Message message, boolean mandatory, CorrelationData correlationData) throws Exception {
        String userId;
        if (exchange == null) {
            exchange = this.exchange;
        }
        if (routingKey == null) {
            routingKey = this.routingKey;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Publishing message on exchange [" + exchange + "], routingKey = [" + routingKey + "]"));
        }
        this.setupConfirm(channel, correlationData);
        Message messageToUse = message;
        MessageProperties originalProperties = messageToUse.getMessageProperties();
        if (mandatory) {
            originalProperties.getHeaders().put("spring_listener_return_correlation", this.uuid);
        }
        if (this.beforePublishPostProcessors != null) {
            for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
                messageToUse = processor.postProcessMessage(messageToUse);
            }
        }
        // Publish with the properties of the post-processed message, not the stale ones.
        MessageProperties messageProperties = messageToUse.getMessageProperties();
        if (mandatory && messageProperties != originalProperties) {
            // A processor swapped the properties object; carry the return-correlation
            // header over so a returned message can still be routed to this template.
            messageProperties.getHeaders().put("spring_listener_return_correlation", this.uuid);
        }
        if (this.userIdExpression != null && messageProperties.getUserId() == null && (userId = (String)this.userIdExpression.getValue((EvaluationContext)this.evaluationContext, (Object)messageToUse, String.class)) != null) {
            messageProperties.setUserId(userId);
        }
        AMQP.BasicProperties convertedMessageProperties = this.messagePropertiesConverter.fromMessageProperties(messageProperties, this.encoding);
        channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, messageToUse.getBody());
        if (this.isChannelLocallyTransacted(channel)) {
            RabbitUtils.commitIfNecessary(channel);
        }
    }

    /**
     * Registers a pending confirm for the next publish on {@code channel}.
     *
     * <p>Does nothing unless a confirm callback is configured and the channel is a
     * {@code PublisherCallbackChannel}.
     *
     * @param channel the channel about to publish
     * @param correlationData correlation data stored with the pending confirm
     */
    private void setupConfirm(Channel channel, CorrelationData correlationData) {
        if (this.confirmCallback == null || !(channel instanceof PublisherCallbackChannel)) {
            return;
        }
        final PublisherCallbackChannel callbackChannel = (PublisherCallbackChannel)channel;
        final long nextSequence = channel.getNextPublishSeqNo();
        final PendingConfirm pending = new PendingConfirm(correlationData, System.currentTimeMillis());
        callbackChannel.addPendingConfirm(this, nextSequence, pending);
    }

    /**
     * Tells whether {@code channel} is transacted by this template itself.
     *
     * @param channel the channel to check
     * @return {@code true} when the template is channel-transacted and
     *         {@code ConnectionFactoryUtils.isChannelTransactional} reports
     *         {@code false} for the channel and this template's connection factory
     */
    protected boolean isChannelLocallyTransacted(Channel channel) {
        if (!isChannelTransacted()) {
            return false;
        }
        return !ConnectionFactoryUtils.isChannelTransactional(channel, getConnectionFactory());
    }

    /**
     * Builds a {@link Message} from a consumer delivery; no message count is recorded
     * because a negative count is passed to {@code buildMessage}.
     *
     * @param delivery the delivery taken from a queueing consumer
     * @return the assembled message
     */
    private Message buildMessageFromDelivery(QueueingConsumer.Delivery delivery) {
        final int noMessageCount = -1;
        return buildMessage(delivery.getEnvelope(), delivery.getProperties(), delivery.getBody(), noMessageCount);
    }

    /**
     * Builds a {@link Message} from a {@code GetResponse}, passing along the
     * response's message count.
     *
     * @param response the response to convert
     * @return the assembled message
     */
    private Message buildMessageFromResponse(GetResponse response) {
        final Envelope envelope = response.getEnvelope();
        final AMQP.BasicProperties props = response.getProps();
        final byte[] payload = response.getBody();
        return buildMessage(envelope, props, payload, response.getMessageCount());
    }

    /**
     * Assembles a {@link Message} from raw AMQP parts and runs the after-receive
     * post-processors over it, in iteration order.
     *
     * @param envelope the delivery envelope
     * @param properties the raw AMQP properties
     * @param body the message body
     * @param msgCount the message count to record; ignored when negative
     * @return the message returned by the last post-processor, or the assembled
     *         message when none are configured
     */
    private Message buildMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, int msgCount) {
        final MessageProperties converted = this.messagePropertiesConverter.toMessageProperties(properties, envelope, this.encoding);
        if (msgCount >= 0) {
            converted.setMessageCount(Integer.valueOf(msgCount));
        }
        Message result = new Message(body, converted);
        if (this.afterReceivePostProcessors == null) {
            return result;
        }
        for (MessagePostProcessor postProcessor : this.afterReceivePostProcessors) {
            result = postProcessor.postProcessMessage(result);
        }
        return result;
    }

    /**
     * Returns the configured message converter.
     *
     * @return the converter, never {@code null}
     * @throws IllegalStateException (as {@code AmqpIllegalStateException}) when no
     *         converter is configured
     */
    private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
        final MessageConverter configured = getMessageConverter();
        if (configured != null) {
            return configured;
        }
        throw new AmqpIllegalStateException("No 'messageConverter' specified. Check configuration of RabbitTemplate.");
    }

    /**
     * Returns the configured default queue name.
     *
     * @return the queue name, never {@code null}
     * @throws IllegalStateException (as {@code AmqpIllegalStateException}) when no
     *         queue is configured
     */
    private String getRequiredQueue() throws IllegalStateException {
        final String configured = this.queue;
        if (configured != null) {
            return configured;
        }
        throw new AmqpIllegalStateException("No 'queue' specified. Check configuration of RabbitTemplate.");
    }

    /**
     * Determines where a reply to {@code request} should be sent.
     *
     * <p>The request's own reply-to address wins; otherwise an address is built from
     * the template's configured exchange and routing key.
     *
     * @param request the received request message
     * @return the reply address, never {@code null}
     * @throws AmqpException when the request has no reply-to and no default exchange
     *         is configured
     */
    private Address getReplyToAddress(Message request) throws AmqpException {
        final Address requested = request.getMessageProperties().getReplyToAddress();
        if (requested != null) {
            return requested;
        }
        if (this.exchange == null) {
            throw new AmqpException("Cannot determine ReplyTo message property value: Request message does not contain reply-to property, and no default Exchange was set.");
        }
        return new Address(this.exchange, this.routingKey);
    }

    /**
     * Registers this template as a listener on a publisher-callback channel, once per
     * underlying channel.
     *
     * <p>The bookkeeping map is keyed by the proxy's target channel when
     * {@code channel} is a {@code ChannelProxy}, otherwise by the channel itself.
     *
     * @param channel the channel about to be used
     * @throws IllegalStateException when the channel is not a
     *         {@code PublisherCallbackChannel}
     */
    private void addListener(Channel channel) {
        if (!(channel instanceof PublisherCallbackChannel)) {
            throw new IllegalStateException("Channel does not support confirms or returns; is the connection factory configured for confirms or returns?");
        }
        final PublisherCallbackChannel callbackChannel = (PublisherCallbackChannel)channel;
        final Channel key = channel instanceof ChannelProxy ? ((ChannelProxy)channel).getTargetChannel() : channel;
        if (this.publisherConfirmChannels.putIfAbsent(key, this) != null) {
            // Already registered for this underlying channel.
            return;
        }
        callbackChannel.addListener(this);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Added pubsub channel: " + channel + " to map, size now " + this.publisherConfirmChannels.size());
        }
    }

    /**
     * Forwards a publisher confirm to the configured confirm callback.
     *
     * @param pendingConfirm the pending confirm being completed; supplies the
     *        correlation data and cause passed to the callback
     * @param ack the acknowledgement flag passed to the callback
     */
    @Override
    public void handleConfirm(PendingConfirm pendingConfirm, boolean ack) {
        if (this.confirmCallback == null) {
            // NOTE(review): the guard tests the DEBUG level but the message is written at
            // WARN, so it only appears when debug logging is on - confirm which level
            // was intended.
            if (this.logger.isDebugEnabled()) {
                this.logger.warn("Confirm received but no callback available");
            }
            return;
        }
        this.confirmCallback.confirm(pendingConfirm.getCorrelationData(), ack, pendingConfirm.getCause());
    }

    /**
     * Handles a message returned by the broker.
     *
     * <p>With a configured return callback, the returned message is handed to it.
     * Without one, the {@code RETURN_CORRELATION_KEY} header is removed and used to
     * look up a caller still waiting in {@code replyHolder}; if found, that caller is
     * completed with an {@code AmqpMessageReturnedException}. In all other cases a
     * warning is logged and the return is dropped. The
     * {@code spring_listener_return_correlation} header is removed before the message
     * is converted for a callback.
     *
     * @param replyCode the broker's reply code
     * @param replyText the broker's reply text
     * @param exchange the exchange the message was published to
     * @param routingKey the routing key the message was published with
     * @param properties the returned message's raw properties; headers are modified
     * @param body the returned message's body
     * @throws IOException declared by the listener contract
     */
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        ReturnCallback callback = this.returnCallback;
        if (callback == null) {
            final Object tagHeader = properties.getHeaders().remove(RETURN_CORRELATION_KEY);
            if (tagHeader == null) {
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn("Returned message but no callback available");
                }
            } else {
                final PendingReply waiter = this.replyHolder.get(tagHeader.toString());
                if (waiter == null) {
                    if (this.logger.isWarnEnabled()) {
                        this.logger.warn("Returned request message but caller has timed out");
                    }
                } else {
                    callback = new ReturnCallback(){

                        @Override
                        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                            waiter.returned(new AmqpMessageReturnedException("Message returned", message, replyCode, replyText, exchange, routingKey));
                        }
                    };
                }
            }
        }
        if (callback == null) {
            return;
        }
        properties.getHeaders().remove("spring_listener_return_correlation");
        final MessageProperties converted = this.messagePropertiesConverter.toMessageProperties(properties, null, this.encoding);
        callback.returnedMessage(new Message(body, converted), replyCode, replyText, exchange, routingKey);
    }

    /**
     * Reports whether a confirm callback is configured on this template.
     *
     * @return {@code true} when {@code confirmCallback} is set
     */
    @Override
    public boolean isConfirmListener() {
        final boolean hasConfirmCallback = this.confirmCallback != null;
        return hasConfirmCallback;
    }

    /**
     * Always reports this template as a return listener, regardless of whether a
     * return callback is configured.
     *
     * @return {@code true}
     */
    @Override
    public boolean isReturnListener() {
        return true;
    }

    /**
     * Removes {@code channel} from the map of channels this template is registered on.
     *
     * @param channel the channel being revoked
     */
    @Override
    public void revoke(Channel channel) {
        this.publisherConfirmChannels.remove(channel);
        if (!this.logger.isDebugEnabled()) {
            return;
        }
        this.logger.debug("Removed pubsub channel: " + channel + " from map, size now " + this.publisherConfirmChannels.size());
    }

    /**
     * Returns this template's UUID, the value stored in the
     * {@code spring_listener_return_correlation} header of mandatory messages.
     *
     * @return the UUID string
     */
    @Override
    public String getUUID() {
        final String templateId = this.uuid;
        return templateId;
    }

    /**
     * Receives a reply from the fixed reply address and hands it to the waiting caller.
     *
     * <p>The message tag is read from the correlation-id property when
     * {@code correlationKey} is {@code null}, otherwise from the header named by
     * {@code correlationKey}. The matching pending reply is looked up in
     * {@code replyHolder}; the caller's saved correlation and reply-to values are
     * restored on the reply before it is delivered.
     *
     * <p>A reply without a correlation id is logged and dropped. Previously a missing
     * correlation-id property caused a {@code NullPointerException} while decoding it,
     * so the "No correlation header in reply" branch could never be reached in that
     * mode.
     *
     * @param message the reply message
     * @throws AmqpRejectAndDontRequeueException when no caller is waiting for the tag
     * @throws AmqpIllegalStateException when the configured encoding is unsupported
     */
    public void onMessage(Message message) {
        try {
            String messageTag;
            if (this.correlationKey == null) {
                // Decode only when present; new String(null, ...) would throw.
                byte[] correlationId = message.getMessageProperties().getCorrelationId();
                messageTag = correlationId == null ? null : new String(correlationId, this.encoding);
            } else {
                messageTag = (String)message.getMessageProperties().getHeaders().get(this.correlationKey);
            }
            if (messageTag == null) {
                this.logger.error((Object)"No correlation header in reply");
                return;
            }
            PendingReply pendingReply = this.replyHolder.get(messageTag);
            if (pendingReply == null) {
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn((Object)("Reply received after timeout for " + messageTag));
                }
                throw new AmqpRejectAndDontRequeueException("Reply received after timeout");
            }
            // Put back the correlation value the caller's request originally carried.
            String savedCorrelation = pendingReply.getSavedCorrelation();
            if (this.correlationKey == null) {
                if (savedCorrelation == null) {
                    message.getMessageProperties().setCorrelationId(null);
                } else {
                    message.getMessageProperties().setCorrelationId(savedCorrelation.getBytes(this.encoding));
                }
            } else if (savedCorrelation != null) {
                message.getMessageProperties().setHeader(this.correlationKey, (Object)savedCorrelation);
            } else {
                message.getMessageProperties().getHeaders().remove(this.correlationKey);
            }
            // Likewise restore the caller's original reply-to.
            String savedReplyTo = pendingReply.getSavedReplyTo();
            message.getMessageProperties().setReplyTo(savedReplyTo);
            pendingReply.reply(message);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Reply received for " + messageTag));
                if (savedReplyTo != null) {
                    this.logger.debug((Object)("Restored replyTo to " + savedReplyTo));
                }
            }
        }
        catch (UnsupportedEncodingException e) {
            throw new AmqpIllegalStateException("Invalid Character Set:" + this.encoding, (Throwable)e);
        }
    }

    /**
     * Starts a {@code QueueingConsumer} on {@code queueName} with a prefetch of one and
     * waits up to ten seconds for the broker to signal consume-ok or cancel.
     *
     * <p>If neither signal arrives in time, the target channel of a
     * {@code ChannelProxy} is closed and an {@link AmqpException} is thrown.
     *
     * @param queueName the queue to consume from
     * @param channel the channel to consume on
     * @return the started consumer
     * @throws Exception any failure from the channel operations or the wait
     */
    private QueueingConsumer createQueueingConsumer(String queueName, Channel channel) throws Exception {
        channel.basicQos(1);
        final CountDownLatch signalled = new CountDownLatch(1);
        final QueueingConsumer queueingConsumer = new QueueingConsumer(channel){

            public void handleConsumeOk(String consumerTag) {
                super.handleConsumeOk(consumerTag);
                signalled.countDown();
            }

            public void handleCancel(String consumerTag) throws IOException {
                super.handleCancel(consumerTag);
                signalled.countDown();
            }
        };
        channel.basicConsume(queueName, (Consumer)queueingConsumer);
        if (signalled.await(10L, TimeUnit.SECONDS)) {
            return queueingConsumer;
        }
        if (channel instanceof ChannelProxy) {
            ((ChannelProxy)channel).getTargetChannel().close();
        }
        throw new AmqpException("Blocking receive, consumer failed to consume");
    }

    /**
     * Callback invoked by {@code handleReturn} for a message the broker returned.
     */
    public static interface ReturnCallback {
        /**
         * Receives a returned message. As invoked in this class, the arguments are, in
         * order: the returned message, the reply code, the reply text, the exchange and
         * the routing key.
         */
        public void returnedMessage(Message var1, int var2, String var3, String var4, String var5);
    }

    /**
     * Callback invoked by {@code handleConfirm} when a publisher confirm is received.
     */
    public static interface ConfirmCallback {
        /**
         * Receives a confirm. As invoked in this class, the arguments are, in order: the
         * pending confirm's correlation data, the ack flag and the pending confirm's
         * cause.
         */
        public void confirm(CorrelationData var1, boolean var2, String var3);
    }

    /**
     * Holder a request/reply caller blocks on until a reply message or a
     * returned-message exception is handed over through a one-slot queue. It also
     * carries the request's original reply-to and correlation values so they can be
     * restored on the reply.
     */
    private static class PendingReply {
        /** Single-slot hand-off; holds either a {@link Message} or an {@link AmqpException}. */
        private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
        private volatile String savedReplyTo;
        private volatile String savedCorrelation;

        private PendingReply() {
        }

        /** @return the reply-to value saved from the request, possibly {@code null} */
        public String getSavedReplyTo() {
            return this.savedReplyTo;
        }

        /** @param savedReplyTo the request's original reply-to value */
        public void setSavedReplyTo(String savedReplyTo) {
            this.savedReplyTo = savedReplyTo;
        }

        /** @return the correlation value saved from the request, possibly {@code null} */
        public String getSavedCorrelation() {
            return this.savedCorrelation;
        }

        /** @param savedCorrelation the request's original correlation value */
        public void setSavedCorrelation(String savedCorrelation) {
            this.savedCorrelation = savedCorrelation;
        }

        /**
         * Blocks until an outcome is available.
         *
         * @return the reply message
         * @throws InterruptedException if interrupted while waiting
         */
        public Message get() throws InterruptedException {
            return processReply(this.queue.take());
        }

        /**
         * Waits up to the given time for an outcome.
         *
         * @return the reply message, or {@code null} if the wait elapsed
         * @throws InterruptedException if interrupted while waiting
         */
        public Message get(long timeout, TimeUnit unit) throws InterruptedException {
            final Object outcome = this.queue.poll(timeout, unit);
            if (outcome == null) {
                return null;
            }
            return processReply(outcome);
        }

        /** Returns a message outcome, rethrows an exception outcome, rejects anything else. */
        private Message processReply(Object reply) {
            if (reply instanceof AmqpException) {
                throw (AmqpException)((Object)reply);
            }
            if (reply instanceof Message) {
                return (Message)reply;
            }
            throw new AmqpException("Unexpected reply type " + reply.getClass().getName());
        }

        /** Completes the wait with a reply message. */
        public void reply(Message reply) {
            this.queue.add(reply);
        }

        /** Completes the wait with a returned-message exception, rethrown to the caller. */
        public void returned(AmqpMessageReturnedException e) {
            this.queue.add(e);
        }
    }
}

