/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.servicebus;

import com.microsoft.azure.servicebus.ClientConstants;
import com.microsoft.azure.servicebus.ClientEntity;
import com.microsoft.azure.servicebus.ErrorContext;
import com.microsoft.azure.servicebus.ExceptionUtil;
import com.microsoft.azure.servicebus.IErrorContextProvider;
import com.microsoft.azure.servicebus.IteratorUtil;
import com.microsoft.azure.servicebus.MessagingFactory;
import com.microsoft.azure.servicebus.OperationCancelledException;
import com.microsoft.azure.servicebus.PayloadSizeExceededException;
import com.microsoft.azure.servicebus.ReplayableWorkItem;
import com.microsoft.azure.servicebus.RetryPolicy;
import com.microsoft.azure.servicebus.SenderContext;
import com.microsoft.azure.servicebus.ServerBusyException;
import com.microsoft.azure.servicebus.ServiceBusException;
import com.microsoft.azure.servicebus.StringUtil;
import com.microsoft.azure.servicebus.TimeoutException;
import com.microsoft.azure.servicebus.TimeoutTracker;
import com.microsoft.azure.servicebus.Timer;
import com.microsoft.azure.servicebus.TimerType;
import com.microsoft.azure.servicebus.Util;
import com.microsoft.azure.servicebus.amqp.DispatchHandler;
import com.microsoft.azure.servicebus.amqp.IAmqpSender;
import com.microsoft.azure.servicebus.amqp.SendLinkHandler;
import com.microsoft.azure.servicebus.amqp.SessionHandler;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;

public class MessageSender
extends ClientEntity
implements IAmqpSender,
IErrorContextProvider {
    // Shared trace logger; every diagnostic message in this class goes through it.
    private static final Logger TRACE_LOGGER = Logger.getLogger("servicebus.trace");
    // Message prefix used when a send is failed with a client-side timeout.
    private static final String SEND_TIMED_OUT = "Send operation timed out";
    // Factory that owns the connection; also used to dispatch work onto the reactor thread.
    private final MessagingFactory underlyingFactory;
    // Address set on the link target when the send link is created.
    private final String sendPath;
    // Operation timeout taken from the factory; bounds individual sends and the close operation.
    private final Duration operationTimeout;
    // Retry policy taken from the factory; consulted on link errors and rejected deliveries.
    private final RetryPolicy retryPolicy;
    // Future returned by onClose(); completed once the close finishes or times out.
    private final CompletableFuture<Void> linkClose;
    // Monitor guarding compound updates of pendingSendsData and pendingSends.
    private final Object pendingSendLock;
    // Delivery tag -> work item for every send that has been queued and not yet finished.
    private final ConcurrentHashMap<String, ReplayableWorkItem<Void>> pendingSendsData;
    // Delivery tags waiting to be written to the link, ordered by DeliveryTagComparator.
    private final PriorityQueue<WeightedDeliveryTag> pendingSends;
    // Reactor callback that drains pendingSends via processSendWork().
    private final DispatchHandler sendWork;
    // Current proton send link; replaced each time createSendLink() runs.
    private Sender sendLink;
    // Future returned by create(); completed on the first successful or failed link open.
    private CompletableFuture<MessageSender> linkFirstOpen;
    // Credit accumulated in onFlow(), decremented per send, reset to 0 in onError().
    private int linkCredit;
    // Tracker for the initial open; set in create() and cleared on a successful open.
    private TimeoutTracker openLinkTracker;
    // Most recent link-level error (null when none) and the instant it was recorded.
    private Exception lastKnownLinkError;
    private Instant lastKnownErrorReportedAt;

    /**
     * Builds a sender and asks the reactor thread to open its send link.
     *
     * @param factory      factory supplying the connection, operation timeout and retry policy
     * @param sendLinkName name passed to the {@code ClientEntity} constructor
     * @param senderPath   address the link will target
     * @return a future completed with the sender once the link opens, or exceptionally when
     *         the open fails, times out, or the work cannot be dispatched to the reactor
     */
    public static CompletableFuture<MessageSender> create(MessagingFactory factory, String sendLinkName, String senderPath) {
        final MessageSender sender = new MessageSender(factory, sendLinkName, senderPath);
        final TimeoutTracker openTracker = TimeoutTracker.create(factory.getOperationTimeout());
        sender.openLinkTracker = openTracker;
        // Arms the open-timeout timer and creates the linkFirstOpen future.
        sender.initializeLinkOpen(openTracker);

        final DispatchHandler openLinkWork = new DispatchHandler() {
            @Override
            public void onEvent() {
                sender.createSendLink();
            }
        };
        try {
            sender.underlyingFactory.scheduleOnReactorThread(openLinkWork);
        } catch (IOException dispatchFailure) {
            sender.linkFirstOpen.completeExceptionally(
                    new ServiceBusException(false, "Failed to create Sender, see cause for more details.", dispatchFailure));
        }
        return sender.linkFirstOpen;
    }

    /**
     * Initialises sender state; the link itself is created later by {@code createSendLink()}.
     *
     * @param factory      owning factory (source of timeout and retry policy)
     * @param sendLinkName client id handed to the base class
     * @param senderPath   address the link will target
     */
    private MessageSender(MessagingFactory factory, String sendLinkName, String senderPath) {
        super(sendLinkName, factory);

        // Configuration inherited from the factory.
        this.sendPath = senderPath;
        this.underlyingFactory = factory;
        this.operationTimeout = factory.getOperationTimeout();
        this.retryPolicy = factory.getRetryPolicy();

        // No error has been observed yet.
        this.lastKnownLinkError = null;
        this.lastKnownErrorReportedAt = Instant.EPOCH;

        // Bookkeeping for sends that are queued or awaiting acknowledgement.
        this.pendingSendLock = new Object();
        this.pendingSendsData = new ConcurrentHashMap<String, ReplayableWorkItem<Void>>();
        this.pendingSends = new PriorityQueue<WeightedDeliveryTag>(1000, new DeliveryTagComparator());
        this.linkCredit = 0;

        this.linkClose = new CompletableFuture<Void>();
        this.sendWork = new DispatchHandler() {
            @Override
            public void onEvent() {
                MessageSender.this.processSendWork();
            }
        };
    }

    /**
     * @return the address this sender's link targets
     */
    public String getSendPath() {
        return sendPath;
    }

    /**
     * Queues a first-attempt send: no existing future and no timeout tracker are supplied.
     *
     * @param bytes         buffer holding the encoded message
     * @param arrayOffset   value forwarded as the encoded message size
     * @param messageFormat AMQP message format to stamp on the delivery
     * @return future completed when the send is settled
     */
    private CompletableFuture<Void> send(byte[] bytes, int arrayOffset, int messageFormat) {
        final CompletableFuture<Void> noExistingFuture = null;
        final TimeoutTracker noTracker = null;
        return this.send(bytes, arrayOffset, messageFormat, noExistingFuture, noTracker);
    }

    /**
     * Registers an encoded message as a pending send and schedules the send work on the reactor.
     *
     * @param bytes          buffer holding the encoded message
     * @param arrayOffset    number of encoded bytes in {@code bytes}
     * @param messageFormat  AMQP message format for the delivery
     * @param onSend         future of a previous attempt, or {@code null} for a first attempt
     * @param tracker        timeout tracker of a previous attempt, or {@code null}
     * @param deliveryTag    tag to reuse, or {@code null} to generate a fresh one
     * @param lastKnownError last error seen for this send, or {@code null}
     * @param timeoutTask    timeout task of a previous attempt, cancelled if the retry has expired
     * @return the future tracking this send
     */
    private CompletableFuture<Void> sendCore(byte[] bytes, int arrayOffset, int messageFormat, CompletableFuture<Void> onSend, TimeoutTracker tracker, String deliveryTag, Exception lastKnownError, ScheduledFuture<?> timeoutTask) {
        this.throwIfClosed(this.lastKnownLinkError);

        final boolean isRetrySend = onSend != null;

        // A retry whose time budget is already spent is failed right away instead of being queued.
        if (isRetrySend && tracker != null && (tracker.remaining().isNegative() || tracker.remaining().isZero())) {
            if (TRACE_LOGGER.isLoggable(Level.FINE)) {
                TRACE_LOGGER.log(Level.FINE, String.format(Locale.US, "path[%s], linkName[%s], deliveryTag[%s] - timed out at sendCore", this.sendPath, this.sendLink.getName(), deliveryTag));
            }
            if (timeoutTask != null) {
                timeoutTask.cancel(false);
            }
            this.throwSenderTimeout(onSend, null);
            return onSend;
        }

        final String tag;
        if (deliveryTag != null) {
            tag = deliveryTag;
        } else {
            tag = UUID.randomUUID().toString().replace("-", "");
        }

        final CompletableFuture<Void> sendFuture = isRetrySend ? onSend : new CompletableFuture<Void>();

        final ReplayableWorkItem<Void> workItem;
        if (tracker != null) {
            workItem = new ReplayableWorkItem<Void>(bytes, arrayOffset, messageFormat, sendFuture, tracker);
        } else {
            workItem = new ReplayableWorkItem<Void>(bytes, arrayOffset, messageFormat, sendFuture, this.operationTimeout);
        }
        if (lastKnownError != null) {
            workItem.setLastKnownException(lastKnownError);
        }

        // Retries are queued with weight 1, first attempts with weight 0.
        synchronized (this.pendingSendLock) {
            this.pendingSendsData.put(tag, workItem);
            this.pendingSends.offer(new WeightedDeliveryTag(tag, isRetrySend ? 1 : 0));
        }

        try {
            this.underlyingFactory.scheduleOnReactorThread(this.sendWork);
        } catch (IOException dispatchFailure) {
            sendFuture.completeExceptionally(new ServiceBusException(false, "Send failed while dispatching to Reactor, see cause for more details.", dispatchFailure));
        }
        return sendFuture;
    }

    /**
     * Delegates to {@code sendCore} with no delivery tag, last-known error or timeout task.
     *
     * @param bytes         buffer holding the encoded message
     * @param arrayOffset   number of encoded bytes in {@code bytes}
     * @param messageFormat AMQP message format for the delivery
     * @param onSend        future of a previous attempt, or {@code null}
     * @param tracker       timeout tracker of a previous attempt, or {@code null}
     * @return the future tracking this send
     */
    private CompletableFuture<Void> send(byte[] bytes, int arrayOffset, int messageFormat, CompletableFuture<Void> onSend, TimeoutTracker tracker) {
        final String freshDeliveryTag = null;
        final Exception noLastKnownError = null;
        final ScheduledFuture<?> noTimeoutTask = null;
        return this.sendCore(bytes, arrayOffset, messageFormat, onSend, tracker, freshDeliveryTag, noLastKnownError, noTimeoutTask);
    }

    /**
     * Returns the length in bytes of the message's binary {@link Data} body.
     *
     * <p>Only a {@code Data} body can be measured here. Any other body section contributes 0
     * instead of failing with a {@code ClassCastException}, as the previous unconditional
     * cast did.
     *
     * @param msg message to inspect; may be {@code null}
     * @return payload length, or 0 when there is no message, no body, a non-Data body,
     *         or a Data body without a value
     */
    private int getPayloadSize(Message msg) {
        if (msg == null) {
            return 0;
        }
        final Section body = msg.getBody();
        // instanceof is also false for a null body, so this covers "no body" too.
        if (!(body instanceof Data)) {
            return 0;
        }
        final Binary payloadBytes = ((Data) body).getValue();
        return payloadBytes == null ? 0 : payloadBytes.getLength();
    }

    /**
     * Estimates the size of a message as payload size plus the sizes of all keys and values
     * in its message annotations and application properties.
     *
     * @param amqpMessage message to measure; may be {@code null}
     * @return the estimate, or 0 for a {@code null} message
     */
    private int getDataSerializedSize(Message amqpMessage) {
        if (amqpMessage == null) {
            return 0;
        }
        int total = this.getPayloadSize(amqpMessage);
        final MessageAnnotations annotations = amqpMessage.getMessageAnnotations();
        final ApplicationProperties properties = amqpMessage.getApplicationProperties();
        if (annotations != null) {
            total += sizeOfKeysAndValues(annotations.getValue());
        }
        if (properties != null) {
            total += sizeOfKeysAndValues(properties.getValue());
        }
        return total;
    }

    /** Sums {@code Util.sizeof} over every key, then every value, of the given map. */
    private static int sizeOfKeysAndValues(Map<?, ?> entries) {
        int size = 0;
        for (Object key : entries.keySet()) {
            size += Util.sizeof(key);
        }
        for (Object value : entries.values()) {
            size += Util.sizeof(value);
        }
        return size;
    }

    /**
     * Sends several messages as one batch.
     *
     * <p>A single-element batch is forwarded to {@link #send(Message)}. Otherwise each message
     * is encoded, wrapped in a {@code Data} section, and appended to one 262144-byte buffer
     * that starts with an envelope carrying the first message's annotations.
     *
     * <p>Both encode steps for each message now run inside the same {@code try}. A buffer
     * overflow while encoding an individual message therefore fails the returned future with
     * {@code PayloadSizeExceededException}, as {@link #send(Message)} does, rather than
     * escaping as an unchecked exception.
     *
     * @param messages messages to send; must contain at least one element
     * @return future completed when the batch is settled, or exceptionally on failure
     * @throws IllegalArgumentException when {@code messages} is {@code null} or empty
     */
    public CompletableFuture<Void> send(Iterable<Message> messages) {
        if (messages == null || IteratorUtil.sizeEquals(messages, 0)) {
            throw new IllegalArgumentException("Sending Empty batch of messages is not allowed.");
        }
        final Message firstMessage = messages.iterator().next();
        if (IteratorUtil.sizeEquals(messages, 1)) {
            return this.send(firstMessage);
        }

        final int maxMessageLength = 262144;
        final Message batchMessage = Proton.message();
        batchMessage.setMessageAnnotations(firstMessage.getMessageAnnotations());
        final byte[] bytes = new byte[maxMessageLength];
        int byteArrayOffset = batchMessage.encode(bytes, 0, maxMessageLength);

        for (Message amqpMessage : messages) {
            final Message messageWrappedByData = Proton.message();
            final int payloadSize = this.getDataSerializedSize(amqpMessage);
            // 512 bytes of slack on top of the estimate, capped at the maximum message size.
            final int allocationSize = Math.min(payloadSize + 512, maxMessageLength);
            final byte[] messageBytes = new byte[allocationSize];
            try {
                final int messageSizeBytes = amqpMessage.encode(messageBytes, 0, allocationSize);
                messageWrappedByData.setBody((Section) new Data(new Binary(messageBytes, 0, messageSizeBytes)));
                byteArrayOffset += messageWrappedByData.encode(bytes, byteArrayOffset, maxMessageLength - byteArrayOffset - 1);
            } catch (BufferOverflowException exception) {
                final CompletableFuture<Void> sendTask = new CompletableFuture<Void>();
                sendTask.completeExceptionally(new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", 256), (Throwable) exception));
                return sendTask;
            }
        }
        // -2147404032 (0x80013700) is the message format used for batched sends in this file.
        return this.send(bytes, byteArrayOffset, -2147404032);
    }

    /**
     * Encodes a single message and queues it for sending with message format 0.
     *
     * @param msg message to send
     * @return future completed when the send is settled; completed exceptionally with
     *         {@code PayloadSizeExceededException} when the message does not fit the buffer
     */
    public CompletableFuture<Void> send(Message msg) {
        // Estimated size plus 512 bytes of slack, never more than 262144 bytes.
        final int bufferSize = Math.min(this.getDataSerializedSize(msg) + 512, 262144);
        final byte[] buffer = new byte[bufferSize];

        int written;
        try {
            written = msg.encode(buffer, 0, bufferSize);
        } catch (BufferOverflowException overflow) {
            final CompletableFuture<Void> failed = new CompletableFuture<Void>();
            failed.completeExceptionally(new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", 256), (Throwable) overflow));
            return failed;
        }
        return this.send(buffer, written, 0);
    }

    /**
     * Link-open callback.
     *
     * <p>On success the open tracker and last-known error are cleared and the retry count is
     * reset. The first successful open completes {@code linkFirstOpen}. Every later open
     * re-queues, with retry weight 1, each pending send that was still waiting for an
     * acknowledgement.
     *
     * <p>On failure, if the first open has not completed yet, the sender is marked closed and
     * {@code linkFirstOpen} is failed. Later failures are ignored here.
     *
     * @param completionException {@code null} when the link opened, otherwise the failure
     */
    @Override
    public void onOpenComplete(Exception completionException) {
        if (completionException != null) {
            if (!this.linkFirstOpen.isDone()) {
                this.setClosed();
                ExceptionUtil.completeExceptionally(this.linkFirstOpen, completionException, this);
            }
            return;
        }

        this.openLinkTracker = null;
        this.lastKnownLinkError = null;
        this.retryPolicy.resetRetryCount(this.getClientId());

        if (!this.linkFirstOpen.isDone()) {
            this.linkFirstOpen.complete(this);
            return;
        }

        synchronized (this.pendingSendLock) {
            // Iterating entries directly avoids a key copy followed by a get() that may return
            // null if the entry is removed in between.
            for (Map.Entry<String, ReplayableWorkItem<Void>> pendingSend : this.pendingSendsData.entrySet()) {
                if (pendingSend.getValue().isWaitingForAck()) {
                    this.pendingSends.offer(new WeightedDeliveryTag(pendingSend.getKey(), 1));
                }
            }
        }
    }

    /**
     * Link-close callback: turns the remote condition into an exception and routes it to
     * {@link #onError(Exception)}.
     *
     * @param condition error condition reported for the close, or {@code null} when none
     */
    @Override
    public void onClose(ErrorCondition condition) {
        final Exception closeReason;
        if (condition == null) {
            // With no condition supplied, a transient ServiceBusException is used.
            closeReason = new ServiceBusException(true, "The entity has been close due to transient failures (underlying link closed), please retry the operation.");
        } else {
            closeReason = ExceptionUtil.toException(condition);
        }
        this.onError(closeReason);
    }

    /**
     * Link-error callback.
     *
     * <ul>
     *   <li>When the sender is closing or closed, every pending send is failed, the pending
     *       collections are cleared and {@code linkClose} is completed.</li>
     *   <li>Otherwise the error is recorded and forwarded to {@code onOpenComplete}. A
     *       non-transient error fails and clears all pending sends.</li>
     *   <li>If sends are still pending and the retry policy yields an interval, re-creation of
     *       the link is scheduled on the reactor after that interval.</li>
     * </ul>
     *
     * <p>A failure to schedule the re-creation is logged at WARNING. It was previously
     * swallowed silently.
     *
     * @param completionException the error reported for the link; may be {@code null}
     */
    @Override
    public void onError(Exception completionException) {
        this.linkCredit = 0;

        if (this.getIsClosingOrClosed()) {
            synchronized (this.pendingSendLock) {
                for (Map.Entry<String, ReplayableWorkItem<Void>> pendingSend : this.pendingSendsData.entrySet()) {
                    ExceptionUtil.completeExceptionally(pendingSend.getValue().getWork(), completionException == null ? new OperationCancelledException("Send cancelled as the Sender instance is Closed before the sendOperation completed.") : completionException, this);
                }
                this.pendingSendsData.clear();
                this.pendingSends.clear();
            }
            this.linkClose.complete(null);
            return;
        }

        this.lastKnownLinkError = completionException;
        this.lastKnownErrorReportedAt = Instant.now();
        this.onOpenComplete(completionException);

        final boolean isTransient = completionException instanceof ServiceBusException
                && ((ServiceBusException) completionException).getIsTransient();
        if (completionException != null && !isTransient) {
            synchronized (this.pendingSendLock) {
                for (Map.Entry<String, ReplayableWorkItem<Void>> pendingSend : this.pendingSendsData.entrySet()) {
                    this.cleanupFailedSend(pendingSend.getValue(), completionException);
                }
                this.pendingSendsData.clear();
                this.pendingSends.clear();
            }
        }

        // The first remaining pending send supplies the time budget for the retry decision.
        final Map.Entry<String, ReplayableWorkItem<Void>> pendingSendEntry = IteratorUtil.getFirst(this.pendingSendsData.entrySet());
        if (pendingSendEntry == null || pendingSendEntry.getValue() == null) {
            return;
        }
        final TimeoutTracker tracker = pendingSendEntry.getValue().getTimeoutTracker();
        if (tracker == null) {
            return;
        }
        final Duration nextRetryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), completionException, tracker.remaining());
        if (nextRetryInterval == null) {
            return;
        }

        try {
            this.underlyingFactory.scheduleOnReactorThread((int) nextRetryInterval.toMillis(), new DispatchHandler() {
                @Override
                public void onEvent() {
                    if (MessageSender.this.sendLink.getLocalState() == EndpointState.CLOSED || MessageSender.this.sendLink.getRemoteState() == EndpointState.CLOSED) {
                        MessageSender.this.createSendLink();
                    }
                }
            });
        } catch (IOException ioException) {
            // Best effort: pending sends still have their own timeouts, but leave a trace.
            if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "path[%s] - failed to schedule send link re-creation on the Reactor", this.sendPath), ioException);
            }
        }
    }

    /**
     * Delivery-settled callback: resolves the pending send matching the delivery's tag
     * according to the remote outcome.
     *
     * <ul>
     *   <li>{@code Accepted}: clears error state, cancels the timeout task, completes the send.</li>
     *   <li>{@code Rejected}: retried or failed as decided by the retry policy.</li>
     *   <li>{@code Released}: failed with {@code OperationCancelledException}.</li>
     *   <li>Anything else, including a missing outcome: failed with a non-transient
     *       {@code ServiceBusException}.</li>
     * </ul>
     *
     * <p>A delivery whose tag matches no pending send is logged at WARNING and ignored.
     *
     * @param delivery the settled delivery
     */
    @Override
    public void onSendComplete(Delivery delivery) {
        final DeliveryState outcome = delivery.getRemoteState();
        final String deliveryTag = new String(delivery.getTag());
        if (TRACE_LOGGER.isLoggable(Level.FINEST)) {
            TRACE_LOGGER.log(Level.FINEST, String.format(Locale.US, "path[%s], linkName[%s], deliveryTag[%s]", this.sendPath, this.sendLink.getName(), deliveryTag));
        }

        final ReplayableWorkItem<Void> pendingSendWorkItem = this.pendingSendsData.remove(deliveryTag);
        if (pendingSendWorkItem == null) {
            if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "path[%s], linkName[%s], delivery[%s] - mismatch", this.sendPath, this.sendLink.getName(), deliveryTag));
            }
            return;
        }

        if (outcome instanceof Accepted) {
            this.lastKnownLinkError = null;
            this.retryPolicy.resetRetryCount(this.getClientId());
            // Same null guard as cleanupFailedSend: the task may not have been set.
            final ScheduledFuture<?> timeoutTask = pendingSendWorkItem.getTimeoutTask();
            if (timeoutTask != null) {
                timeoutTask.cancel(false);
            }
            pendingSendWorkItem.getWork().complete(null);
        } else if (outcome instanceof Rejected) {
            this.handleRejectedSend(deliveryTag, pendingSendWorkItem, (Rejected) outcome);
        } else if (outcome instanceof Released) {
            this.cleanupFailedSend(pendingSendWorkItem, new OperationCancelledException(outcome.toString()));
        } else {
            // String.valueOf tolerates a delivery that carries no remote state at all.
            this.cleanupFailedSend(pendingSendWorkItem, new ServiceBusException(false, String.valueOf(outcome)));
        }
    }

    /** Retries a rejected send after the policy's interval, or fails it when no retry is allowed. */
    private void handleRejectedSend(final String deliveryTag, final ReplayableWorkItem<Void> pendingSendWorkItem, Rejected rejected) {
        final ErrorCondition error = rejected.getError();
        final Exception exception = ExceptionUtil.toException(error);
        if (error != null && ExceptionUtil.isGeneralSendError(error.getCondition())) {
            this.lastKnownLinkError = exception;
            this.lastKnownErrorReportedAt = Instant.now();
        }

        final Duration retryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), exception, pendingSendWorkItem.getTimeoutTracker().remaining());
        if (retryInterval == null) {
            this.cleanupFailedSend(pendingSendWorkItem, exception);
            return;
        }

        pendingSendWorkItem.setLastKnownException(exception);
        try {
            this.underlyingFactory.scheduleOnReactorThread((int) retryInterval.toMillis(), new DispatchHandler() {
                @Override
                public void onEvent() {
                    MessageSender.this.reSend(deliveryTag, pendingSendWorkItem, false);
                }
            });
        } catch (IOException ioException) {
            // The IOException travels as the cause of the new exception. The earlier
            // initCause() call on the rejection exception was dropped because it can throw
            // IllegalStateException and leave the send future uncompleted.
            this.cleanupFailedSend(pendingSendWorkItem, new ServiceBusException(false, "Send operation failed while scheduling a retry on Reactor, see cause for more details.", ioException));
        }
    }

    /**
     * Re-queues a previously attempted send, carrying over its future, tracker, last error
     * and timeout task.
     *
     * @param deliveryTag      tag of the earlier attempt
     * @param pendingSend      work item to replay; nothing happens when {@code null}
     * @param reuseDeliveryTag whether the retry keeps {@code deliveryTag} or gets a new tag
     */
    private void reSend(String deliveryTag, ReplayableWorkItem<Void> pendingSend, boolean reuseDeliveryTag) {
        if (pendingSend == null) {
            return;
        }
        final String tagForRetry = reuseDeliveryTag ? deliveryTag : null;
        this.sendCore(
                pendingSend.getMessage(),
                pendingSend.getEncodedMessageSize(),
                pendingSend.getMessageFormat(),
                pendingSend.getWork(),
                pendingSend.getTimeoutTracker(),
                tagForRetry,
                pendingSend.getLastKnownException(),
                pendingSend.getTimeoutTask());
    }

    /**
     * Cancels the send's timeout task, when it has one, and fails its future.
     *
     * @param failedSend work item of the send that failed
     * @param exception  failure to report through the send's future
     */
    private void cleanupFailedSend(ReplayableWorkItem<Void> failedSend, Exception exception) {
        final boolean hasTimeoutTask = failedSend.getTimeoutTask() != null;
        if (hasTimeoutTask) {
            failedSend.getTimeoutTask().cancel(false);
        }
        ExceptionUtil.completeExceptionally(failedSend.getWork(), exception, this);
    }

    /**
     * Opens a new session and send link on the factory's connection and makes it the current
     * link. A previously held link is deregistered from connection-error notifications.
     */
    private void createSendLink() {
        final Connection connection = this.underlyingFactory.getConnection();

        final Session newSession = connection.session();
        newSession.setOutgoingWindow(Integer.MAX_VALUE);
        newSession.open();
        BaseHandler.setHandler((Extendable)newSession, (Handler)new SessionHandler(this.sendPath));

        // Link name: random prefix, suffixed with the remote container id when one is known.
        final String namePrefix = StringUtil.getRandomString();
        final String linkName;
        if (StringUtil.isNullOrEmpty(connection.getRemoteContainer())) {
            linkName = namePrefix;
        } else {
            linkName = namePrefix.concat("_").concat(connection.getRemoteContainer());
        }

        final Sender newLink = newSession.sender(linkName);

        final Target linkTarget = new Target();
        linkTarget.setAddress(this.sendPath);
        newLink.setTarget((org.apache.qpid.proton.amqp.transport.Target)linkTarget);

        final Source linkSource = new Source();
        newLink.setSource((org.apache.qpid.proton.amqp.transport.Source)linkSource);

        newLink.setSenderSettleMode(SenderSettleMode.UNSETTLED);

        final SendLinkHandler linkHandler = new SendLinkHandler(this);
        BaseHandler.setHandler((Extendable)newLink, (Handler)linkHandler);

        this.underlyingFactory.registerForConnectionError((Link)newLink);
        newLink.open();

        final Sender previousLink = this.sendLink;
        if (previousLink != null) {
            this.underlyingFactory.deregisterForConnectionError((Link)previousLink);
        }
        this.sendLink = newLink;
    }

    /**
     * Creates the {@code linkFirstOpen} future and arms a one-shot timer that fails it with a
     * {@code TimeoutException} if the link has not opened within the tracker's remaining time.
     *
     * <p>The timer can fire before the reactor has created the link, so the link name is read
     * null-safely. Dereferencing a missing link used to throw on the timer thread and leave
     * {@code linkFirstOpen} uncompleted forever.
     *
     * @param timeout tracker bounding the open operation
     */
    private void initializeLinkOpen(TimeoutTracker timeout) {
        this.linkFirstOpen = new CompletableFuture<MessageSender>();
        Timer.schedule(new Runnable() {
            @Override
            public void run() {
                if (MessageSender.this.linkFirstOpen.isDone()) {
                    return;
                }
                final Sender link = MessageSender.this.sendLink;
                final String linkName = link != null ? link.getName() : null;
                // Only an error reported within the last 4 seconds is attached as the cause.
                final Exception recentError = MessageSender.this.lastKnownErrorReportedAt.isAfter(Instant.now().minusSeconds(4L)) ? MessageSender.this.lastKnownLinkError : null;
                final TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "Open operation on SendLink(%s) on Entity(%s) timed out at %s.", linkName, MessageSender.this.getSendPath(), ZonedDateTime.now().toString()), recentError);
                if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                    TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "path[%s], linkName[%s], open call timedout", MessageSender.this.sendPath, linkName), operationTimedout);
                }
                ExceptionUtil.completeExceptionally(MessageSender.this.linkFirstOpen, operationTimedout, MessageSender.this);
            }
        }, timeout.remaining(), TimerType.OneTimeRun);
    }

    /**
     * Builds the error context attached to exceptions raised by this sender.
     *
     * <p>The reference id is the link's remote tracking-id property when present, otherwise
     * the link name, otherwise {@code null}. Link credit is reported only once the first open
     * has completed and a link exists.
     *
     * @return a {@code SenderContext} describing host, path, reference id and credit
     */
    @Override
    public ErrorContext getContext() {
        final boolean isLinkOpened = this.linkFirstOpen != null && this.linkFirstOpen.isDone();

        String referenceId = null;
        if (this.sendLink != null) {
            if (this.sendLink.getRemoteProperties() != null && this.sendLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)) {
                referenceId = this.sendLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString();
            } else {
                referenceId = this.sendLink.getName();
            }
        }

        String hostName = null;
        if (this.underlyingFactory != null) {
            hostName = this.underlyingFactory.getHostName();
        }

        Integer currentCredit = null;
        if (isLinkOpened && this.sendLink != null) {
            currentCredit = Integer.valueOf(this.sendLink.getCredit());
        }

        return new SenderContext(hostName, this.sendPath, referenceId, currentCredit);
    }

    /**
     * Flow callback: clears the last-known link error and, for a positive credit grant, adds
     * the credit and immediately runs the send work.
     *
     * @param creditIssued credit granted by the remote peer
     */
    @Override
    public void onFlow(int creditIssued) {
        this.lastKnownLinkError = null;
        if (creditIssued > 0) {
            if (TRACE_LOGGER.isLoggable(Level.FINE)) {
                final int waitingForCredit = this.pendingSends.size();
                final int waitingForDelivery = this.pendingSendsData.size() - waitingForCredit;
                TRACE_LOGGER.log(Level.FINE, String.format(Locale.US, "path[%s], linkName[%s], remoteLinkCredit[%s], pendingSendsWaitingForCredit[%s], pendingSendsWaitingDelivery[%s]", this.sendPath, this.sendLink.getName(), creditIssued, waitingForCredit, waitingForDelivery));
            }
            this.linkCredit += creditIssued;
            this.sendWork.onEvent();
        }
    }

    /**
     * Creates a replacement send link, then records the attempt with the retry policy.
     */
    private void recreateSendLink() {
        createSendLink();
        final String clientId = getClientId();
        retryPolicy.incrementRetryCount(clientId);
    }

    /**
     * Writes queued sends to the link while it is active and credit is available.
     *
     * <p>A closed link is re-created instead, and the method returns. For every queued tag the
     * matching work item is looked up and then:
     * <ul>
     *   <li>tags without a work item are logged and skipped;</li>
     *   <li>work items whose future is already done are dropped;</li>
     *   <li>otherwise the message is written; on success a timeout task is armed and the item
     *       is marked as waiting for acknowledgement, on failure the delivery is freed and the
     *       send is failed.</li>
     * </ul>
     *
     * <p>Fixes relative to the earlier version:
     * <ul>
     *   <li>Entries are removed from {@code pendingSendsData} by their String tag. Removal
     *       used to be attempted with the work item or the weighted-tag wrapper as key, which
     *       never matched, so entries leaked.</li>
     *   <li>The entry of a send that failed to advance is removed.</li>
     *   <li>A stale tag no longer stops the loop; the remaining queued sends are processed.</li>
     *   <li>Log and error messages print the tag string, and the advance-failure message gets
     *       tag and path in the right order.</li>
     *   <li>A missing link is tolerated instead of causing a {@code NullPointerException}.</li>
     * </ul>
     */
    private void processSendWork() {
        final Sender sendLinkCurrent = this.sendLink;
        if (sendLinkCurrent == null) {
            // No link has been created yet; nothing can be sent.
            return;
        }
        if (sendLinkCurrent.getLocalState() == EndpointState.CLOSED || sendLinkCurrent.getRemoteState() == EndpointState.CLOSED) {
            this.recreateSendLink();
            return;
        }

        while (sendLinkCurrent.getLocalState() == EndpointState.ACTIVE && sendLinkCurrent.getRemoteState() == EndpointState.ACTIVE && this.linkCredit > 0) {
            final WeightedDeliveryTag weightedTag;
            final ReplayableWorkItem<Void> sendData;
            synchronized (this.pendingSendLock) {
                weightedTag = this.pendingSends.poll();
                sendData = weightedTag != null ? this.pendingSendsData.get(weightedTag.getDeliveryTag()) : null;
            }
            if (weightedTag == null) {
                // Queue drained.
                break;
            }

            final String deliveryTag = weightedTag.getDeliveryTag();
            if (sendData == null) {
                if (TRACE_LOGGER.isLoggable(Level.SEVERE)) {
                    TRACE_LOGGER.log(Level.SEVERE, String.format(Locale.US, "path[%s], linkName[%s], deliveryTag[%s] - sendData not found for this delivery.", this.sendPath, sendLinkCurrent.getName(), deliveryTag));
                }
                // The send was already completed or timed out; move on to the next queued tag.
                continue;
            }
            if (sendData.getWork() != null && sendData.getWork().isDone()) {
                this.pendingSendsData.remove(deliveryTag);
                continue;
            }

            Delivery delivery = null;
            boolean linkAdvance = false;
            int sentMsgSize = 0;
            Exception sendException = null;
            try {
                delivery = sendLinkCurrent.delivery(deliveryTag.getBytes());
                delivery.setMessageFormat(sendData.getMessageFormat());
                sentMsgSize = sendLinkCurrent.send(sendData.getMessage(), 0, sendData.getEncodedMessageSize());
                assert (sentMsgSize == sendData.getEncodedMessageSize()) : "Contract of the ProtonJ library for Sender.Send API changed";
                linkAdvance = sendLinkCurrent.advance();
            } catch (Exception exception) {
                sendException = exception;
            }

            if (linkAdvance) {
                --this.linkCredit;
                final ScheduledFuture<?> timeoutTask = Timer.schedule(new Runnable() {
                    @Override
                    public void run() {
                        if (!sendData.getWork().isDone()) {
                            MessageSender.this.pendingSendsData.remove(deliveryTag);
                            MessageSender.this.throwSenderTimeout(sendData.getWork(), sendData.getLastKnownException());
                        }
                    }
                }, this.operationTimeout, TimerType.OneTimeRun);
                sendData.setTimeoutTask(timeoutTask);
                sendData.setWaitingForAck();
                continue;
            }

            if (TRACE_LOGGER.isLoggable(Level.FINE)) {
                TRACE_LOGGER.log(Level.FINE, String.format(Locale.US, "path[%s], linkName[%s], deliveryTag[%s], sentMessageSize[%s], payloadActualSize[%s] - sendlink advance failed", this.sendPath, sendLinkCurrent.getName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize()));
            }
            if (delivery != null) {
                delivery.free();
            }
            // The send is being failed, so its bookkeeping entry must not linger.
            this.pendingSendsData.remove(deliveryTag);
            sendData.getWork().completeExceptionally(sendException != null
                    ? new OperationCancelledException("Send operation failed. Please see cause for more details", (Throwable) sendException)
                    : new OperationCancelledException(String.format(Locale.US, "Send operation failed while advancing delivery(tag: %s) on SendLink(path: %s).", deliveryTag, this.sendPath)));
        }
    }

    /**
     * Fails a pending send because it timed out.
     *
     * <p>The cause is the send's own last-known exception when there is one. Otherwise it is
     * the last link error, provided that error is recent: a {@code ServerBusyException} within
     * the last 4 seconds, or any error within one operation timeout. A cause that is a
     * {@code ServiceBusException} is reported as-is. Anything else results in a
     * {@code TimeoutException}.
     *
     * <p>The cause is now passed to the {@code TimeoutException} constructor. It was
     * previously handed to {@code String.format} as a surplus argument and silently dropped.
     *
     * @param pendingSendWork      future of the send to fail
     * @param lastKnownException   last error recorded for this send, or {@code null}
     */
    private void throwSenderTimeout(CompletableFuture<Void> pendingSendWork, Exception lastKnownException) {
        Exception cause = lastKnownException;
        if (lastKnownException == null && this.lastKnownLinkError != null) {
            final boolean isServerBusy = this.lastKnownLinkError instanceof ServerBusyException && this.lastKnownErrorReportedAt.isAfter(Instant.now().minusSeconds(4L));
            final boolean isRecent = this.lastKnownErrorReportedAt.isAfter(Instant.now().minusMillis(this.operationTimeout.toMillis()));
            cause = isServerBusy || isRecent ? this.lastKnownLinkError : null;
        }

        final ServiceBusException exception;
        if (cause instanceof ServiceBusException) {
            exception = (ServiceBusException) cause;
        } else {
            // Client-side timeout: no cause, or a cause that is not a service exception.
            exception = new TimeoutException(String.format(Locale.US, "%s %s %s.", SEND_TIMED_OUT, " at ", ZonedDateTime.now()), cause);
        }
        ExceptionUtil.completeExceptionally(pendingSendWork, exception, this);
    }

    /**
     * Arms a one-shot timer that fails {@code linkClose} with a {@code TimeoutException} if
     * the close has not completed within the tracker's remaining time.
     *
     * <p>The timeout and log messages now name the send link and the message sender. They had
     * been copied from the receiver and referred to a "Receive Link".
     *
     * @param timeout tracker bounding the close operation
     */
    private void scheduleLinkCloseTimeout(TimeoutTracker timeout) {
        Timer.schedule(new Runnable() {
            @Override
            public void run() {
                if (MessageSender.this.linkClose.isDone()) {
                    return;
                }
                final TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Send Link(%s) timed out at %s", "Close", MessageSender.this.sendLink.getName(), ZonedDateTime.now()));
                if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                    TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "message sender(linkName: %s, path: %s) %s call timedout", MessageSender.this.sendLink.getName(), MessageSender.this.sendPath, "Close"), operationTimedout);
                }
                ExceptionUtil.completeExceptionally(MessageSender.this.linkClose, operationTimedout, MessageSender.this);
            }
        }, timeout.remaining(), TimerType.OneTimeRun);
    }

    /**
     * Starts closing the sender.
     *
     * <p>A link that is not yet locally closed is closed and a close timeout is armed. When
     * there is no link, or the remote side is already closed, the close future is completed
     * immediately. Nothing is done when the entity is already closed.
     *
     * @return the future tracking the close
     */
    @Override
    protected CompletableFuture<Void> onClose() {
        if (this.getIsClosed()) {
            return this.linkClose;
        }
        final boolean hasLink = this.sendLink != null;
        if (hasLink && this.sendLink.getLocalState() != EndpointState.CLOSED) {
            this.sendLink.close();
            this.scheduleLinkCloseTimeout(TimeoutTracker.create(this.operationTimeout));
        } else if (!hasLink || this.sendLink.getRemoteState() == EndpointState.CLOSED) {
            this.linkClose.complete(null);
        }
        return this.linkClose;
    }

    /**
     * Orders delivery tags by descending priority, so that higher-priority tags are polled
     * first from the pending-sends queue.
     */
    private static class DeliveryTagComparator
    implements Comparator<WeightedDeliveryTag> {
        private DeliveryTagComparator() {
        }

        /**
         * @return a negative value when {@code deliveryTag0} has the higher priority, a
         *         positive value when {@code deliveryTag1} has, and 0 when they are equal
         */
        @Override
        public int compare(WeightedDeliveryTag deliveryTag0, WeightedDeliveryTag deliveryTag1) {
            // Integer.compare cannot overflow, unlike subtracting the two priorities.
            return Integer.compare(deliveryTag1.getPriority(), deliveryTag0.getPriority());
        }
    }

    /**
     * Immutable pairing of a delivery tag with the priority used to order it in the
     * pending-sends queue.
     */
    private static class WeightedDeliveryTag {
        private final String deliveryTag;
        private final int priority;

        /**
         * @param deliveryTag tag identifying the pending send
         * @param priority    queue priority of the tag
         */
        WeightedDeliveryTag(String deliveryTag, int priority) {
            this.deliveryTag = deliveryTag;
            this.priority = priority;
        }

        /** @return the delivery tag */
        public String getDeliveryTag() {
            return this.deliveryTag;
        }

        /** @return the queue priority */
        public int getPriority() {
            return this.priority;
        }

        /**
         * Returns the delivery tag itself, so that trace messages which format this object
         * with {@code %s} show the tag rather than an identity hash.
         */
        @Override
        public String toString() {
            return this.deliveryTag;
        }
    }
}

