/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v1_0;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
import org.apache.qpid.server.protocol.v1_0.Message_1_0;
import org.apache.qpid.server.protocol.v1_0.SendingLinkEndpoint;
import org.apache.qpid.server.protocol.v1_0.Session_1_0;
import org.apache.qpid.server.protocol.v1_0.UnknownTransactionException;
import org.apache.qpid.server.protocol.v1_0.UnsettledAction;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AbstractSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConsumerTarget_1_0
extends AbstractConsumerTarget<ConsumerTarget_1_0> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_1_0.class);
    private final boolean _acquires;
    private long _deliveryTag = 0L;
    private Binary _transactionId;
    private final SendingLinkEndpoint _linkEndpoint;
    private final StateChangeListener<MessageInstance, MessageInstance.EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.EntryState>(){

        public void stateChanged(MessageInstance entry, MessageInstance.EntryState oldState, MessageInstance.EntryState newState) {
            if (this.isConsumerAcquiredStateForThis(oldState) && !this.isConsumerAcquiredStateForThis(newState)) {
                ConsumerTarget_1_0.this.removeUnacknowledgedMessage(entry);
                entry.removeStateChangeListener((StateChangeListener)this);
            }
        }

        private boolean isConsumerAcquiredStateForThis(MessageInstance.EntryState state) {
            return state instanceof MessageInstance.ConsumerAcquiredState && ((MessageInstance.ConsumerAcquiredState)state).getConsumer().getTarget() == ConsumerTarget_1_0.this;
        }
    };

    public ConsumerTarget_1_0(SendingLinkEndpoint linkEndpoint, boolean acquires) {
        super(false, linkEndpoint.getSession().getAMQPConnection());
        this._linkEndpoint = linkEndpoint;
        this._acquires = acquires;
    }

    private SendingLinkEndpoint getEndpoint() {
        return this._linkEndpoint;
    }

    public void updateNotifyWorkDesired() {
        boolean state = false;
        Session_1_0 session = this._linkEndpoint.getSession();
        if (session != null) {
            AMQPConnection amqpConnection = session.getAMQPConnection();
            state = !amqpConnection.isTransportBlockedForWriting() && this._linkEndpoint.isAttached() && this.getEndpoint().hasCreditToSend();
        }
        this.setNotifyWorkDesired(state);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, boolean batch) {
        block41: {
            Message_1_0 message;
            MessageConverter converter;
            ServerMessage serverMessage = entry.getMessage();
            if (serverMessage instanceof Message_1_0) {
                converter = null;
                message = (Message_1_0)serverMessage;
            } else {
                converter = MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
                if (converter == null) {
                    throw new ServerScopedRuntimeException(String.format("Could not find message converter from '%s' to '%s'. This is unexpected since we should not try to send if the converter is not present.", serverMessage.getClass(), Message_1_0.class));
                }
                message = (Message_1_0)converter.convert(serverMessage, this._linkEndpoint.getAddressSpace());
            }
            Transfer transfer = new Transfer();
            try {
                AbstractSection section;
                UnsignedInteger ttl;
                QpidByteBuffer bodyContent = message.getContent();
                HeaderSection headerSection = message.getHeaderSection();
                UnsignedInteger unsignedInteger = ttl = headerSection == null ? null : ((Header)headerSection.getValue()).getTtl();
                if (entry.getDeliveryCount() != 0 || ttl != null) {
                    Header header = new Header();
                    if (headerSection != null) {
                        Header oldHeader = (Header)headerSection.getValue();
                        header.setDurable(oldHeader.getDurable());
                        header.setPriority(oldHeader.getPriority());
                        if (ttl != null) {
                            long timeSpentOnBroker = System.currentTimeMillis() - message.getArrivalTime();
                            long adjustedTtl = Math.max(0L, ttl.longValue() - timeSpentOnBroker);
                            header.setTtl(UnsignedInteger.valueOf(adjustedTtl));
                        }
                        headerSection.dispose();
                    }
                    if (entry.getDeliveryCount() != 0) {
                        header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount()));
                    }
                    headerSection = header.createEncodingRetainingSection();
                }
                ArrayList<QpidByteBuffer> payload = new ArrayList<QpidByteBuffer>();
                if (headerSection != null) {
                    payload.add(headerSection.getEncodedForm());
                    headerSection.dispose();
                }
                if ((section = message.getDeliveryAnnotationsSection()) != null) {
                    payload.add(section.getEncodedForm());
                    section.dispose();
                }
                if ((section = message.getMessageAnnotationsSection()) != null) {
                    payload.add(section.getEncodedForm());
                    section.dispose();
                }
                if ((section = message.getPropertiesSection()) != null) {
                    payload.add(section.getEncodedForm());
                    section.dispose();
                }
                if ((section = message.getApplicationPropertiesSection()) != null) {
                    payload.add(section.getEncodedForm());
                    section.dispose();
                }
                payload.add(bodyContent);
                section = message.getFooterSection();
                if (section != null) {
                    payload.add(section.getEncodedForm());
                    section.dispose();
                }
                try (QpidByteBuffer combined = QpidByteBuffer.concatenate(payload);){
                    transfer.setPayload(combined);
                }
                payload.forEach(QpidByteBuffer::dispose);
                byte[] data = new byte[8];
                ByteBuffer.wrap(data).putLong(this._deliveryTag++);
                final Binary tag = new Binary(data);
                transfer.setDeliveryTag(tag);
                if (this._linkEndpoint.isAttached()) {
                    if (SenderSettleMode.SETTLED.equals(this.getEndpoint().getSendingSettlementMode())) {
                        transfer.setSettled(true);
                    } else {
                        UnsettledAction action;
                        if (this._acquires) {
                            action = new DispositionAction(tag, entry, consumer);
                            this.addUnacknowledgedMessage(entry);
                        } else {
                            action = new DoNothingAction();
                        }
                        this._linkEndpoint.addUnsettled(tag, action, entry);
                    }
                    if (this._transactionId != null) {
                        TransactionalState state = new TransactionalState();
                        state.setTxnId(this._transactionId);
                        transfer.setState(state);
                    }
                    if (this._acquires && this._transactionId != null) {
                        try {
                            ServerTransaction txn = this._linkEndpoint.getTransaction(this._transactionId);
                            txn.addPostTransactionAction(new ServerTransaction.Action(){

                                public void postCommit() {
                                }

                                public void onRollback() {
                                    entry.release(consumer);
                                    ConsumerTarget_1_0.this._linkEndpoint.updateDisposition(tag, null, true);
                                }
                            });
                        }
                        catch (UnknownTransactionException e) {
                            entry.release(consumer);
                            this.getEndpoint().close(new Error(TransactionError.UNKNOWN_ID, e.getMessage()));
                            transfer.dispose();
                            if (converter != null) {
                                converter.dispose((ServerMessage)message);
                            }
                            return;
                        }
                    }
                    this.getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
                    this.getEndpoint().transfer(transfer, false);
                    break block41;
                }
                entry.release(consumer);
            }
            finally {
                transfer.dispose();
                if (converter != null) {
                    converter.dispose((ServerMessage)message);
                }
            }
        }
    }

    public void flushBatched() {
    }

    public boolean allocateCredit(ServerMessage msg) {
        AMQPConnection_1_0<?> protocolEngine = this.getSession().getConnection();
        boolean hasCredit = this._linkEndpoint.isAttached() && this.getEndpoint().hasCreditToSend();
        this.updateNotifyWorkDesired();
        if (hasCredit) {
            this._linkEndpoint.setLinkCredit(this._linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
        }
        return hasCredit;
    }

    public void restoreCredit(ServerMessage message) {
        this._linkEndpoint.setLinkCredit(this._linkEndpoint.getLinkCredit().add(UnsignedInteger.ONE));
        this.updateNotifyWorkDesired();
    }

    public void noMessagesAvailable() {
        if (this._linkEndpoint.drained()) {
            this.updateNotifyWorkDesired();
        }
    }

    public void flowStateChanged() {
        this.updateNotifyWorkDesired();
        if (this._linkEndpoint != null) {
            this._transactionId = this._linkEndpoint.getTransactionId();
        }
    }

    public Session_1_0 getSession() {
        return this._linkEndpoint.getSession();
    }

    private void addUnacknowledgedMessage(MessageInstance entry) {
        this._unacknowledgedCount.incrementAndGet();
        this._unacknowledgedBytes.addAndGet(entry.getMessage().getSizeIncludingHeader());
        entry.addStateChangeListener(this._unacknowledgedMessageListener);
    }

    private void removeUnacknowledgedMessage(MessageInstance entry) {
        this._unacknowledgedBytes.addAndGet(-entry.getMessage().getSizeIncludingHeader());
        this._unacknowledgedCount.decrementAndGet();
    }

    public String getTargetAddress() {
        return ((Target)this._linkEndpoint.getTarget()).getAddress();
    }

    public String toString() {
        return "ConsumerTarget_1_0[linkSession=" + this._linkEndpoint.getSession().toLogString() + "]";
    }

    private class DoNothingAction
    implements UnsettledAction {
        @Override
        public boolean process(DeliveryState state, Boolean settled) {
            return true;
        }
    }

    private class DispositionAction
    implements UnsettledAction {
        private final MessageInstance _queueEntry;
        private final Binary _deliveryTag;
        private final MessageInstanceConsumer _consumer;

        public DispositionAction(Binary tag, MessageInstance queueEntry, MessageInstanceConsumer consumer) {
            this._deliveryTag = tag;
            this._queueEntry = queueEntry;
            this._consumer = consumer;
        }

        public MessageInstanceConsumer getConsumer() {
            return this._consumer;
        }

        @Override
        public boolean process(DeliveryState state, final Boolean settled) {
            ServerTransaction txn;
            Outcome outcome;
            Binary transactionId = null;
            if (state instanceof TransactionalState) {
                transactionId = ((TransactionalState)state).getTxnId();
                outcome = ((TransactionalState)state).getOutcome();
                try {
                    txn = ConsumerTarget_1_0.this._linkEndpoint.getTransaction(transactionId);
                    ConsumerTarget_1_0.this.getSession().getConnection().registerTransactedMessageDelivered();
                }
                catch (UnknownTransactionException e) {
                    ConsumerTarget_1_0.this.getEndpoint().close(new Error(TransactionError.UNKNOWN_ID, e.getMessage()));
                    this.applyModifiedOutcome();
                    return false;
                }
            } else if (state instanceof Outcome) {
                outcome = (Outcome)state;
                txn = new AutoCommitTransaction(ConsumerTarget_1_0.this.getSession().getConnection().getAddressSpace().getMessageStore());
            } else {
                outcome = null;
                txn = null;
            }
            if (outcome instanceof Accepted) {
                if (this._queueEntry.makeAcquisitionUnstealable(this.getConsumer())) {
                    txn.dequeue(this._queueEntry.getEnqueueRecord(), new ServerTransaction.Action(){

                        public void postCommit() {
                            if (DispositionAction.this._queueEntry.isAcquiredBy(DispositionAction.this.getConsumer())) {
                                DispositionAction.this._queueEntry.delete();
                            }
                        }

                        public void onRollback() {
                        }
                    });
                }
                txn.addPostTransactionAction(new ServerTransaction.Action(){

                    public void postCommit() {
                        if (Boolean.TRUE.equals(settled)) {
                            ConsumerTarget_1_0.this._linkEndpoint.settle(DispositionAction.this._deliveryTag);
                        } else {
                            ConsumerTarget_1_0.this._linkEndpoint.updateDisposition(DispositionAction.this._deliveryTag, outcome, true);
                        }
                        ConsumerTarget_1_0.this._linkEndpoint.sendFlowConditional();
                    }

                    public void onRollback() {
                        if (Boolean.TRUE.equals(settled)) {
                            DispositionAction.this.applyModifiedOutcome();
                        }
                    }
                });
            } else if (outcome instanceof Released) {
                txn.addPostTransactionAction(new ServerTransaction.Action(){

                    public void postCommit() {
                        DispositionAction.this._queueEntry.release(DispositionAction.this.getConsumer());
                        ConsumerTarget_1_0.this._linkEndpoint.settle(DispositionAction.this._deliveryTag);
                    }

                    public void onRollback() {
                        ConsumerTarget_1_0.this._linkEndpoint.settle(DispositionAction.this._deliveryTag);
                    }
                });
            } else if (outcome instanceof Modified) {
                txn.addPostTransactionAction(new ServerTransaction.Action(){

                    public void postCommit() {
                        Modified modifiedOutcome = (Modified)outcome;
                        if (Boolean.TRUE.equals(modifiedOutcome.getUndeliverableHere())) {
                            DispositionAction.this._queueEntry.reject(DispositionAction.this.getConsumer());
                        }
                        if (Boolean.TRUE.equals(modifiedOutcome.getDeliveryFailed())) {
                            DispositionAction.this.incrementDeliveryCountOrRouteToAlternateOrDiscard();
                        } else {
                            DispositionAction.this._queueEntry.release(DispositionAction.this.getConsumer());
                        }
                        ConsumerTarget_1_0.this._linkEndpoint.settle(DispositionAction.this._deliveryTag);
                    }

                    public void onRollback() {
                        if (Boolean.TRUE.equals(settled)) {
                            DispositionAction.this.applyModifiedOutcome();
                        }
                    }
                });
            } else if (outcome instanceof Rejected) {
                txn.addPostTransactionAction(new ServerTransaction.Action(){

                    public void postCommit() {
                        ConsumerTarget_1_0.this._linkEndpoint.settle(DispositionAction.this._deliveryTag);
                        DispositionAction.this.incrementDeliveryCountOrRouteToAlternateOrDiscard();
                        ConsumerTarget_1_0.this._linkEndpoint.sendFlowConditional();
                    }

                    public void onRollback() {
                        if (Boolean.TRUE.equals(settled)) {
                            DispositionAction.this.applyModifiedOutcome();
                        }
                    }
                });
            }
            return transactionId == null && outcome != null;
        }

        private void applyModifiedOutcome() {
            Modified modified = new Modified();
            modified.setDeliveryFailed(true);
            ConsumerTarget_1_0.this._linkEndpoint.updateDisposition(this._deliveryTag, modified, true);
            ConsumerTarget_1_0.this._linkEndpoint.sendFlowConditional();
            this.incrementDeliveryCountOrRouteToAlternateOrDiscard();
        }

        private void incrementDeliveryCountOrRouteToAlternateOrDiscard() {
            this._queueEntry.incrementDeliveryCount();
            if (this._queueEntry.getMaximumDeliveryCount() > 0 && this._queueEntry.getDeliveryCount() >= this._queueEntry.getMaximumDeliveryCount()) {
                this.routeToAlternateOrDiscard();
            } else {
                this._queueEntry.release(this.getConsumer());
            }
        }

        private void routeToAlternateOrDiscard() {
            TransactionLogResource owningResource;
            Session_1_0 session = ConsumerTarget_1_0.this._linkEndpoint.getSession();
            final ServerMessage message = this._queueEntry.getMessage();
            final EventLogger eventLogger = session.getEventLogger();
            final LogSubject logSubject = session.getLogSubject();
            int requeues = 0;
            if (this._queueEntry.makeAcquisitionUnstealable(this.getConsumer())) {
                requeues = this._queueEntry.routeToAlternate((Action)new Action<MessageInstance>(){

                    public void performAction(MessageInstance requeueEntry) {
                        eventLogger.message(logSubject, ChannelMessages.DEADLETTERMSG((Number)message.getMessageNumber(), (String)requeueEntry.getOwningResource().getName()));
                    }
                }, null);
            }
            if (requeues == 0 && (owningResource = this._queueEntry.getOwningResource()) instanceof Queue) {
                Queue queue = (Queue)owningResource;
                MessageDestination alternateBindingDestination = queue.getAlternateBindingDestination();
                if (alternateBindingDestination == null) {
                    eventLogger.message(logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH((Number)message.getMessageNumber(), (String)queue.getName(), (String)message.getInitialRoutingAddress()));
                } else {
                    eventLogger.message(logSubject, ChannelMessages.DISCARDMSG_NOROUTE((Number)message.getMessageNumber(), (String)alternateBindingDestination.getName()));
                }
            }
        }
    }
}

