/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v1_0;

import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.SelectorParsingException;
import org.apache.qpid.server.filter.selector.ParseException;
import org.apache.qpid.server.filter.selector.TokenMgrError;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
import org.apache.qpid.server.protocol.v1_0.AbstractLinkEndpoint;
import org.apache.qpid.server.protocol.v1_0.ConsumerTarget_1_0;
import org.apache.qpid.server.protocol.v1_0.ExchangeSendingDestination;
import org.apache.qpid.server.protocol.v1_0.LinkImpl;
import org.apache.qpid.server.protocol.v1_0.Link_1_0;
import org.apache.qpid.server.protocol.v1_0.Message_1_0;
import org.apache.qpid.server.protocol.v1_0.SendingDestination;
import org.apache.qpid.server.protocol.v1_0.SequenceNumber;
import org.apache.qpid.server.protocol.v1_0.Session_1_0;
import org.apache.qpid.server.protocol.v1_0.StandardSendingDestination;
import org.apache.qpid.server.protocol.v1_0.UnknownTransactionException;
import org.apache.qpid.server.protocol.v1_0.UnsettledAction;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SendingLinkEndpoint
extends AbstractLinkEndpoint<Source, Target> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SendingLinkEndpoint.class);
    private static final Symbol PRIORITY = Symbol.valueOf("priority");
    private static final Pattern ANY_CONTAINER_ID = Pattern.compile(".*");
    private final List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
    private final List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>();
    private final Map<Binary, OutgoingDelivery> _unsettled = new ConcurrentHashMap<Binary, OutgoingDelivery>();
    private final Action<Session_1_0> _cleanUpUnsettledDeliveryTask = object -> this.cleanUpUnsettledDeliveries();
    private volatile Binary _transactionId;
    private volatile Integer _priority;
    private volatile boolean _draining = false;
    private volatile SendingDestination _destination;
    private volatile EnumSet<ConsumerOption> _consumerOptions;
    private volatile FilterManager _consumerFilters;
    private volatile ConsumerTarget_1_0 _consumerTarget;
    private volatile MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;

    public SendingLinkEndpoint(Session_1_0 session, LinkImpl<Source, Target> link) {
        super(session, link);
        this.setDeliveryCount(new SequenceNumber(0));
        this.setAvailable(UnsignedInteger.valueOf(0));
        this.setCapabilities(Collections.singletonList(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS));
    }

    @Override
    public void start() {
    }

    private void prepareConsumerOptionsAndFilters(SendingDestination destination) throws AmqpErrorException {
        this._destination = destination;
        Source source = (Source)this.getSource();
        EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
        boolean noLocal = false;
        org.apache.qpid.server.filter.JMSSelectorFilter messageFilter = null;
        if (destination instanceof ExchangeSendingDestination) {
            options.add(ConsumerOption.ACQUIRES);
            options.add(ConsumerOption.SEES_REQUEUES);
        } else if (destination instanceof StandardSendingDestination) {
            MessageSource messageSource = this._destination.getMessageSource();
            if (messageSource instanceof Queue && ((Queue)messageSource).getAvailableAttributes().contains("topic")) {
                source.setDistributionMode(StdDistMode.COPY);
            }
            Map<Symbol, Filter> filters = source.getFilter();
            HashMap<Symbol, Filter> actualFilters = new HashMap<Symbol, Filter>();
            if (filters != null) {
                for (Map.Entry<Symbol, Filter> entry : filters.entrySet()) {
                    if (entry.getValue() instanceof NoLocalFilter) {
                        actualFilters.put(entry.getKey(), entry.getValue());
                        noLocal = true;
                        continue;
                    }
                    if (messageFilter != null || !(entry.getValue() instanceof JMSSelectorFilter)) continue;
                    JMSSelectorFilter selectorFilter = (JMSSelectorFilter)entry.getValue();
                    try {
                        messageFilter = new org.apache.qpid.server.filter.JMSSelectorFilter(selectorFilter.getValue());
                        actualFilters.put(entry.getKey(), entry.getValue());
                    }
                    catch (SelectorParsingException | ParseException | TokenMgrError e) {
                        Error error = new Error();
                        error.setCondition(AmqpError.INVALID_FIELD);
                        error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
                        error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
                        throw new AmqpErrorException(error);
                    }
                }
            }
            source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
            if (source.getDistributionMode() != StdDistMode.COPY) {
                options.add(ConsumerOption.ACQUIRES);
                options.add(ConsumerOption.SEES_REQUEUES);
            }
        } else {
            throw new ConnectionScopedRuntimeException("Unknown destination type");
        }
        if (noLocal) {
            options.add(ConsumerOption.NO_LOCAL);
        }
        FilterManager filters = null;
        if (messageFilter != null) {
            filters = new FilterManager();
            filters.add(messageFilter.getName(), messageFilter);
        }
        this._consumerOptions = options;
        this._consumerFilters = filters;
    }

    private void createConsumerTarget() throws AmqpErrorException {
        Source source = (Source)this.getSource();
        this._consumerTarget = new ConsumerTarget_1_0(this, this._destination instanceof ExchangeSendingDestination || source.getDistributionMode() != StdDistMode.COPY);
        try {
            String name = ((Target)this.getTarget()).getAddress() == null ? this.getLinkName() : ((Target)this.getTarget()).getAddress();
            this._consumer = this._destination.getMessageSource().addConsumer((ConsumerTarget)this._consumerTarget, this._consumerFilters, Message_1_0.class, name, this._consumerOptions, this.getPriority());
            this._consumerTarget.updateNotifyWorkDesired();
        }
        catch (MessageSource.ExistingExclusiveConsumer e) {
            String msg = "Cannot add a consumer to the destination as there is already an exclusive consumer";
            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), (Throwable)e);
        }
        catch (MessageSource.ExistingConsumerPreventsExclusive e) {
            String msg = "Cannot add an exclusive consumer to the destination as there is already a consumer";
            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), (Throwable)e);
        }
        catch (MessageSource.ConsumerAccessRefused e) {
            String msg = "Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy";
            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), (Throwable)e);
        }
        catch (MessageSource.QueueDeleted e) {
            String msg = "Cannot add a consumer to the destination as the destination has been deleted";
            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_DELETED, msg), (Throwable)e);
        }
    }

    @Override
    protected Map<Symbol, Object> initProperties(Attach attach) {
        Map<Symbol, Object> peerProperties = attach.getProperties();
        if (peerProperties != null) {
            HashMap<Symbol, Object> actualProperties = new HashMap<Symbol, Object>();
            if (peerProperties.containsKey(PRIORITY)) {
                Object value = peerProperties.get(PRIORITY);
                if (value instanceof Number) {
                    this._priority = ((Number)value).intValue();
                } else if (value instanceof String) {
                    try {
                        this._priority = Integer.parseInt(value.toString());
                    }
                    catch (NumberFormatException numberFormatException) {
                        // empty catch block
                    }
                }
                if (this._priority != null) {
                    actualProperties.put(PRIORITY, this._priority);
                }
            }
            return actualProperties;
        }
        return Collections.emptyMap();
    }

    @Override
    protected void reattachLink(Attach attach) throws AmqpErrorException {
        if (this.getSource() == null) {
            throw new IllegalStateException("Terminus should be set when resuming a Link.");
        }
        if (attach.getSource() == null) {
            throw new IllegalStateException("Attach.getSource should not be null when resuming a Link. That would be recovering the Link.");
        }
        Source newSource = (Source)attach.getSource();
        Source oldSource = (Source)this.getSource();
        SendingDestination destination = this.getSession().getSendingDestination(this.getLink(), oldSource);
        this.prepareConsumerOptionsAndFilters(destination);
        if (this.getDestination() instanceof ExchangeSendingDestination && !Boolean.TRUE.equals(newSource.getDynamic())) {
            SendingDestination newDestination = this.getSession().getSendingDestination(this.getLink(), newSource);
            if (this.getSession().updateSourceForSubscription(this, newSource, newDestination)) {
                this.setDestination(newDestination);
            }
        }
        this.attachReceived(attach);
    }

    @Override
    protected void resumeLink(Attach attach) throws AmqpErrorException {
        if (this.getSource() == null) {
            throw new IllegalStateException("Terminus should be set when resuming a Link.");
        }
        if (attach.getSource() == null) {
            throw new IllegalStateException("Attach.getSource should not be null when resuming a Link. That would be recovering the Link.");
        }
        Source newSource = (Source)attach.getSource();
        Source oldSource = (Source)this.getSource();
        SendingDestination destination = this.getSession().getSendingDestination(this.getLink(), oldSource);
        this.prepareConsumerOptionsAndFilters(destination);
        if (this.getDestination() instanceof ExchangeSendingDestination && !Boolean.TRUE.equals(newSource.getDynamic())) {
            SendingDestination newDestination = this.getSession().getSendingDestination(this.getLink(), newSource);
            if (this.getSession().updateSourceForSubscription(this, newSource, newDestination)) {
                this.setDestination(newDestination);
            }
        }
        this.attachReceived(attach);
    }

    @Override
    protected void establishLink(Attach attach) throws AmqpErrorException {
        if (this.getSource() != null || this.getTarget() != null) {
            throw new IllegalStateException("LinkEndpoint and Termini should be null when establishing a Link.");
        }
        this.attachReceived(attach);
    }

    @Override
    protected void recoverLink(Attach attach) throws AmqpErrorException {
        List<Symbol> capabilities;
        Source source = (Source)this.getSource();
        if (source == null && attach.getDesiredCapabilities() != null && (capabilities = Arrays.asList(attach.getDesiredCapabilities())).contains(Session_1_0.GLOBAL_CAPABILITY) && capabilities.contains(Session_1_0.SHARED_CAPABILITY) && this.getLinkName().endsWith("|global")) {
            NamedAddressSpace namedAddressSpace = this.getSession().getConnection().getAddressSpace();
            Collection links = namedAddressSpace.findSendingLinks(ANY_CONTAINER_ID, Pattern.compile("^" + Pattern.quote(this.getLinkName()) + "$"));
            for (Link_1_0 link : links) {
                Object baseSource = link.getSource();
                if (!(baseSource instanceof Source)) continue;
                Source linkSource = (Source)baseSource;
                source = new Source(linkSource);
                this.getLink().setSource(source);
                break;
            }
        }
        if (source == null) {
            throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, ""));
        }
        attach.setSource(source);
        this.receiveAttach(attach);
    }

    @Override
    public Role getRole() {
        return Role.SENDER;
    }

    private Integer getPriority() {
        return this._priority;
    }

    void transfer(Transfer xfr, boolean decrementCredit) {
        Session_1_0 s = this.getSession();
        xfr.setMessageFormat(UnsignedInteger.ZERO);
        if (decrementCredit) {
            this.setLinkCredit(this.getLinkCredit().subtract(UnsignedInteger.ONE));
        }
        this.getDeliveryCount().incr();
        xfr.setHandle(this.getLocalHandle());
        s.sendTransfer(xfr, this);
    }

    boolean drained() {
        if (this._draining) {
            this.getDeliveryCount().add(this.getLinkCredit().intValue());
            this.setLinkCredit(UnsignedInteger.ZERO);
            this.sendFlow();
            this._draining = false;
            return true;
        }
        return false;
    }

    @Override
    public void receiveFlow(Flow flow) {
        UnsignedInteger receiverDeliveryCount = flow.getDeliveryCount();
        UnsignedInteger receiverLinkCredit = flow.getLinkCredit();
        this.setDrain(flow.getDrain());
        Map<Symbol, Object> properties = flow.getProperties();
        if (properties != null) {
            Binary transactionId = (Binary)properties.get(Symbol.valueOf("txn-id"));
            if (transactionId != null) {
                try {
                    this.getSession().getTransaction(transactionId);
                }
                catch (UnknownTransactionException e) {
                    this.close(new Error(TransactionError.UNKNOWN_ID, e.getMessage()));
                    return;
                }
            }
            this._transactionId = transactionId;
        }
        if (receiverDeliveryCount == null) {
            this.setLinkCredit(receiverLinkCredit);
        } else {
            UnsignedInteger limit = receiverDeliveryCount.add(receiverLinkCredit);
            if (limit.compareTo(this.getDeliveryCount().unsignedIntegerValue()) <= 0) {
                this.setLinkCredit(UnsignedInteger.valueOf(0));
            } else {
                this.setLinkCredit(limit.subtract(this.getDeliveryCount().unsignedIntegerValue()));
            }
        }
        this.flowStateChanged();
    }

    @Override
    public void flowStateChanged() {
        if (Boolean.TRUE.equals(this.getDrain())) {
            if (this.getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0) {
                this._draining = true;
                this.getSession().notifyWork((ConsumerTarget)this.getConsumerTarget());
            }
        } else {
            this._draining = false;
        }
        while (!this._resumeAcceptedTransfers.isEmpty() && this.hasCreditToSend()) {
            Accepted accepted = new Accepted();
            Transfer xfr = new Transfer();
            Binary dt = this._resumeAcceptedTransfers.remove(0);
            xfr.setDeliveryTag(dt);
            xfr.setState(accepted);
            xfr.setResume(Boolean.TRUE);
            this.transfer(xfr, true);
            xfr.dispose();
        }
        if (this._resumeAcceptedTransfers.isEmpty()) {
            this.getConsumerTarget().flowStateChanged();
        }
    }

    @Override
    protected void remoteDetachedPerformDetach(Detach detach) {
        TerminusExpiryPolicy expiryPolicy = ((Source)this.getSource()).getExpiryPolicy();
        if (Boolean.TRUE.equals(detach.getClosed()) || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy) || (expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && this.getSession().isClosing() || TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && this.getSession().getConnection().isClosing()) {
            this.cleanUpUnsettledDeliveries();
            this.close();
        } else if (detach.getError() != null) {
            this.cleanUpUnsettledDeliveries();
            this.detach();
            this.destroy();
            this.getConsumerTarget().updateNotifyWorkDesired();
        } else {
            this.detach();
            this.destroy();
            this.getConsumerTarget().updateNotifyWorkDesired();
        }
    }

    private void cleanUpUnsettledDeliveries() {
        this.getSession().removeDeleteTask(this._cleanUpUnsettledDeliveryTask);
        Modified state = new Modified();
        state.setDeliveryFailed(true);
        for (OutgoingDelivery delivery : this._unsettled.values()) {
            UnsettledAction action = delivery.getAction();
            if (action == null) continue;
            action.process(state, Boolean.TRUE);
            delivery.setAction(null);
        }
        this._unsettled.clear();
    }

    void addUnsettled(Binary tag, UnsettledAction unsettledAction, MessageInstance messageInstance) {
        this._unsettled.put(tag, new OutgoingDelivery(messageInstance, unsettledAction, null));
    }

    @Override
    protected void handleDeliveryState(Binary deliveryTag, DeliveryState state, Boolean settled) {
        UnsettledAction action;
        OutgoingDelivery outgoingDelivery = this._unsettled.get(deliveryTag);
        boolean localSettle = false;
        if (outgoingDelivery != null && outgoingDelivery.getAction() != null && (localSettle = (action = outgoingDelivery.getAction()).process(state, settled)) && !Boolean.TRUE.equals(settled)) {
            this.updateDisposition(deliveryTag, state, true);
        }
        if (Boolean.TRUE.equals(settled) || localSettle) {
            this._unsettled.remove(deliveryTag);
        }
    }

    public ServerTransaction getTransaction(Binary transactionId) {
        Session_1_0 session = this.getSession();
        return session == null ? null : session.getTransaction(transactionId);
    }

    public boolean hasCreditToSend() {
        UnsignedInteger linkCredit = this.getLinkCredit();
        return linkCredit != null && linkCredit.compareTo(UnsignedInteger.valueOf(0)) > 0 && this.getSession().hasCreditToSend();
    }

    public void updateDisposition(Binary deliveryTag, DeliveryState state, boolean settled) {
        if (settled && this._unsettled.remove(deliveryTag) != null) {
            this.getSession().updateDisposition(this.getRole(), deliveryTag, state, settled);
        }
    }

    public Binary getTransactionId() {
        return this._transactionId;
    }

    @Override
    public void attachReceived(Attach attach) throws AmqpErrorException {
        super.attachReceived(attach);
        Target target = (Target)attach.getTarget();
        Source source = (Source)this.getSource();
        if (source == null) {
            source = new Source();
            Source attachSource = (Source)attach.getSource();
            Modified defaultOutcome = new Modified();
            defaultOutcome.setDeliveryFailed(true);
            source.setDefaultOutcome(defaultOutcome);
            source.setOutcomes(Accepted.ACCEPTED_SYMBOL, Released.RELEASED_SYMBOL, Rejected.REJECTED_SYMBOL);
            source.setAddress(attachSource.getAddress());
            source.setDynamic(attachSource.getDynamic());
            if (Boolean.TRUE.equals(attachSource.getDynamic()) && attachSource.getDynamicNodeProperties() != null) {
                HashMap<Symbol, Object> dynamicNodeProperties = new HashMap<Symbol, Object>();
                if (attachSource.getDynamicNodeProperties().containsKey(Session_1_0.LIFETIME_POLICY)) {
                    dynamicNodeProperties.put(Session_1_0.LIFETIME_POLICY, attachSource.getDynamicNodeProperties().get(Session_1_0.LIFETIME_POLICY));
                }
                source.setDynamicNodeProperties(dynamicNodeProperties);
            }
            source.setDurable(TerminusDurability.min(attachSource.getDurable(), this.getLink().getHighestSupportedTerminusDurability()));
            source.setExpiryPolicy(attachSource.getExpiryPolicy());
            source.setDistributionMode(attachSource.getDistributionMode());
            source.setFilter(attachSource.getFilter());
            source.setCapabilities(attachSource.getCapabilities());
            SendingDestination destination = this.getSession().getSendingDestination(this.getLink(), source);
            source.setCapabilities(destination.getCapabilities());
            this.getLink().setSource(source);
            this.prepareConsumerOptionsAndFilters(destination);
        }
        this.getLink().setTarget(target);
        final MessageInstanceConsumer<ConsumerTarget_1_0> oldConsumer = this.getConsumer();
        this.createConsumerTarget();
        this._resumeAcceptedTransfers.clear();
        this._resumeFullTransfers.clear();
        NamedAddressSpace addressSpace = this.getSession().getConnection().getAddressSpace();
        this.cleanUpUnsettledDeliveries();
        this.getSession().addDeleteTask(this._cleanUpUnsettledDeliveryTask);
        HashMap<Binary, OutgoingDelivery> unsettledCopy = new HashMap<Binary, OutgoingDelivery>(this._unsettled);
        HashMap<Binary, DeliveryState> remoteUnsettled = attach.getUnsettled() == null ? Collections.emptyMap() : new HashMap<Binary, DeliveryState>(attach.getUnsettled());
        boolean isUnsettledComplete = !Boolean.TRUE.equals(attach.getIncompleteUnsettled());
        for (Map.Entry entry : unsettledCopy.entrySet()) {
            Binary deliveryTag = (Binary)entry.getKey();
            final MessageInstance queueEntry = ((OutgoingDelivery)entry.getValue()).getMessageInstance();
            if (!remoteUnsettled.containsKey(deliveryTag) && isUnsettledComplete) {
                queueEntry.setRedelivered();
                queueEntry.release(oldConsumer);
                this._unsettled.remove(deliveryTag);
                continue;
            }
            if (remoteUnsettled.get(deliveryTag) instanceof Outcome) {
                AutoCommitTransaction txn;
                Outcome outcome = (Outcome)remoteUnsettled.get(deliveryTag);
                if (outcome instanceof Accepted) {
                    if (oldConsumer.acquires()) {
                        txn = new AutoCommitTransaction(addressSpace.getMessageStore());
                        if (queueEntry.acquire() || queueEntry.isAcquired()) {
                            txn.dequeue(Collections.singleton(queueEntry), new ServerTransaction.Action(){

                                public void postCommit() {
                                    queueEntry.delete();
                                }

                                public void onRollback() {
                                }
                            });
                        }
                    }
                } else if (outcome instanceof Released && oldConsumer.acquires()) {
                    txn = new AutoCommitTransaction(addressSpace.getMessageStore());
                    txn.dequeue(Collections.singleton(queueEntry), new ServerTransaction.Action(){

                        public void postCommit() {
                            queueEntry.release(oldConsumer);
                        }

                        public void onRollback() {
                        }
                    });
                }
                remoteUnsettled.remove(deliveryTag);
                this._resumeAcceptedTransfers.add(deliveryTag);
                continue;
            }
            this._resumeFullTransfers.add(queueEntry);
        }
        this.getConsumerTarget().updateNotifyWorkDesired();
    }

    @Override
    protected Map<Binary, DeliveryState> getLocalUnsettled() {
        HashMap<Binary, DeliveryState> unsettled = new HashMap<Binary, DeliveryState>();
        for (Map.Entry<Binary, OutgoingDelivery> entry : this._unsettled.entrySet()) {
            unsettled.put(entry.getKey(), entry.getValue().getLocalState());
        }
        return unsettled;
    }

    private MessageInstanceConsumer<ConsumerTarget_1_0> getConsumer() {
        return this._consumer;
    }

    ConsumerTarget_1_0 getConsumerTarget() {
        return this._consumerTarget;
    }

    public SendingDestination getDestination() {
        return this._destination;
    }

    public void setDestination(SendingDestination destination) {
        this._destination = destination;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void detach(Error error, boolean close) {
        List sourceCapabilities;
        if (this._consumerTarget != null) {
            this._consumerTarget.close();
        }
        Source source = (Source)this.getSource();
        TerminusExpiryPolicy expiryPolicy = source.getExpiryPolicy();
        NamedAddressSpace addressSpace = this.getSession().getConnection().getAddressSpace();
        List<Object> list = sourceCapabilities = source.getCapabilities() == null ? Collections.emptyList() : Arrays.asList(source.getCapabilities());
        if (close || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy) || (expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && this.getSession().isClosing() || TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && this.getSession().getConnection().isClosing()) {
            Error closingError = null;
            if (this.getDestination() instanceof ExchangeSendingDestination && addressSpace instanceof QueueManagingVirtualHost) {
                this.cleanUpUnsettledDeliveries();
                try {
                    ((QueueManagingVirtualHost)addressSpace).removeSubscriptionQueue(((ExchangeSendingDestination)this.getDestination()).getQueue().getName());
                    TerminusDurability sourceDurability = source.getDurable();
                    if (sourceDurability != null && !TerminusDurability.NONE.equals(sourceDurability) && sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY) && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY)) {
                        Pattern containerIdPattern = sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY) ? ANY_CONTAINER_ID : Pattern.compile("^" + Pattern.quote(this.getSession().getConnection().getRemoteContainerId()) + "$");
                        Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(this.getLinkName()) + "\\|?\\d*$");
                        Collection links = addressSpace.findSendingLinks(containerIdPattern, linkNamePattern);
                        for (LinkModel link : links) {
                            if (!(link instanceof Link_1_0)) continue;
                            ((Link_1_0)link).linkClosed();
                        }
                    }
                }
                catch (AccessControlException e) {
                    LOGGER.error("Error unregistering subscription", (Throwable)e);
                    closingError = new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription");
                }
                catch (IllegalStateException e) {
                    closingError = new Error(AmqpError.RESOURCE_LOCKED, e.getMessage());
                }
                catch (NotFoundException e) {
                    closingError = new Error(AmqpError.NOT_FOUND, e.getMessage());
                }
            }
            if (error == null) {
                error = closingError;
            } else {
                LOGGER.warn("Unexpected error on detaching endpoint {}: {}", (Object)this.getLinkName(), (Object)error);
            }
        } else if (addressSpace instanceof QueueManagingVirtualHost && ((QueueManagingVirtualHost)addressSpace).isDiscardGlobalSharedSubscriptionLinksOnDetach() && sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY) && sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY) && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY)) {
            if (!this.getLinkName().endsWith("|global")) {
                this.getLink().linkClosed();
            } else {
                Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(this.getLinkName()) + "$");
                NamedAddressSpace namedAddressSpace = addressSpace;
                synchronized (namedAddressSpace) {
                    Collection links = addressSpace.findSendingLinks(ANY_CONTAINER_ID, linkNamePattern);
                    if (links.size() > 1) {
                        this.getLink().linkClosed();
                    }
                }
            }
        }
        super.detach(error, close);
    }

    private static class OutgoingDelivery {
        private final MessageInstance _messageInstance;
        private volatile UnsettledAction _action;
        private volatile DeliveryState _localState;

        public OutgoingDelivery(MessageInstance messageInstance, UnsettledAction action, DeliveryState localState) {
            this._messageInstance = messageInstance;
            this._action = action;
            this._localState = localState;
        }

        public MessageInstance getMessageInstance() {
            return this._messageInstance;
        }

        public UnsettledAction getAction() {
            return this._action;
        }

        public DeliveryState getLocalState() {
            return this._localState;
        }

        public void setLocalState(DeliveryState localState) {
            this._localState = localState;
        }

        public void setAction(UnsettledAction action) {
            this._action = action;
        }
    }
}

