/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.eventhubs.ErrorContext;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.OperationCancelledException;
import com.microsoft.azure.eventhubs.PayloadSizeExceededException;
import com.microsoft.azure.eventhubs.RetryPolicy;
import com.microsoft.azure.eventhubs.ServerBusyException;
import com.microsoft.azure.eventhubs.TimeoutException;
import com.microsoft.azure.eventhubs.impl.ActiveClientTokenManager;
import com.microsoft.azure.eventhubs.impl.AmqpException;
import com.microsoft.azure.eventhubs.impl.AmqpSender;
import com.microsoft.azure.eventhubs.impl.AmqpUtil;
import com.microsoft.azure.eventhubs.impl.ClientConstants;
import com.microsoft.azure.eventhubs.impl.ClientEntity;
import com.microsoft.azure.eventhubs.impl.DispatchHandler;
import com.microsoft.azure.eventhubs.impl.ErrorContextProvider;
import com.microsoft.azure.eventhubs.impl.ExceptionUtil;
import com.microsoft.azure.eventhubs.impl.IteratorUtil;
import com.microsoft.azure.eventhubs.impl.MessagingFactory;
import com.microsoft.azure.eventhubs.impl.OperationResult;
import com.microsoft.azure.eventhubs.impl.ReplayableWorkItem;
import com.microsoft.azure.eventhubs.impl.SendLinkHandler;
import com.microsoft.azure.eventhubs.impl.SenderContext;
import com.microsoft.azure.eventhubs.impl.TimeoutTracker;
import com.microsoft.azure.eventhubs.impl.Timer;
import com.microsoft.azure.eventhubs.impl.TrackingUtil;
import java.io.IOException;
import java.io.Serializable;
import java.nio.BufferOverflowException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MessageSender
extends ClientEntity
implements AmqpSender,
ErrorContextProvider {
    // Logger shared by all MessageSender instances.
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MessageSender.class);
    // NOTE(review): not referenced in the visible part of this file; presumably the text used when a send times out.
    private static final String SEND_TIMED_OUT = "Send operation timed out";
    // Optional hook; onOpenComplete invokes it with this sender just before scheduling a link-open retry.
    private static volatile Consumer<MessageSender> onOpenRetry = null;
    // Factory supplying the reactor scheduling, sessions, CBS channel and token provider used by this sender.
    private final MessagingFactory underlyingFactory;
    // Entity path used as the link target address and in log/error messages.
    private final String sendPath;
    // Taken from the factory at construction; bounds link-open attempts and sends created without a tracker.
    private final Duration operationTimeout;
    // Taken from the factory at construction; consulted for the delay before link-open and send retries.
    private final RetryPolicy retryPolicy;
    // Completed by onError once an error/close notification arrives while the sender is closing or closed.
    private final CompletableFuture<Void> linkClose;
    // Monitor held while pendingSends / pendingSendsData are updated together.
    private final Object pendingSendLock;
    // Delivery tag -> work item for every send that has not completed yet.
    private final ConcurrentHashMap<String, ReplayableWorkItem<Void>> pendingSendsData;
    // Delivery tags queued for (re)transmission, ordered by DeliveryTagComparator.
    private final PriorityQueue<WeightedDeliveryTag> pendingSends;
    // Reactor work item that calls processSendWork().
    private final DispatchHandler sendWork;
    // Token manager constructed with a callback that re-sends the CBS token for tokenAudience.
    private final ActiveClientTokenManager activeClientTokenManager;
    // "amqp://<host>/<sendPath>" audience string used when requesting tokens.
    private final String tokenAudience;
    // Monitor held for the creatingLink check-and-set, sendLink assignment and lastKnownLinkError updates.
    private final Object errorConditionLock;
    // Schedules the send-timeout and link-open-timeout tasks.
    private final Timer timer;
    // Starts at 262144 bytes; replaced by the remote link's max message size when the link opens.
    private volatile int maxMessageSize;
    // Current send link; null until a session has been opened and a sender created on it.
    private volatile Sender sendLink;
    // Future returned by create(); completed with this sender on first successful open, or exceptionally on failure.
    private volatile CompletableFuture<MessageSender> linkFirstOpen;
    // Tracks the time left for opening the link; its remaining time is passed to the retry policy.
    private volatile TimeoutTracker openLinkTracker;
    // True while a link creation is in flight; prevents concurrent createSendLink() runs.
    private volatile boolean creatingLink;
    // NOTE(review): assigned outside the visible part of the file; cancelled in onError while closing.
    private volatile CompletableFuture<?> closeTimer;
    // Timer task that fails linkFirstOpen if the link does not open in time.
    private volatile CompletableFuture<?> openTimer;
    // Most recent link-level error; cleared on successful open or accepted send.
    private Exception lastKnownLinkError;
    // Time at which lastKnownLinkError was last reported.
    private Instant lastKnownErrorReportedAt;
    // ISO-8601 string of the instant the latest link creation started.
    private String linkCreationTime;

    /**
     * Initialises sender state without opening a link; {@link #create} schedules the actual link creation.
     *
     * @param factory      factory providing the executor, operation timeout, retry policy and reactor access
     * @param sendLinkName identifier handed to the {@code ClientEntity} base class
     * @param senderPath   entity path this sender targets
     */
    private MessageSender(MessagingFactory factory, String sendLinkName, String senderPath) {
        super(sendLinkName, factory, factory.executor);
        this.sendPath = senderPath;
        this.underlyingFactory = factory;
        this.operationTimeout = factory.getOperationTimeout();
        this.timer = new Timer(factory);
        this.lastKnownLinkError = null;
        this.lastKnownErrorReportedAt = Instant.EPOCH;
        this.retryPolicy = factory.getRetryPolicy();
        // Initial limit; replaced by the remote link's value once the link opens.
        this.maxMessageSize = 262144;
        this.errorConditionLock = new Object();
        this.pendingSendLock = new Object();
        // Parameterised instantiations (the decompiled source used raw types here).
        this.pendingSendsData = new ConcurrentHashMap<>();
        this.pendingSends = new PriorityQueue<WeightedDeliveryTag>(1000, new DeliveryTagComparator());
        this.linkClose = new CompletableFuture<>();
        this.linkFirstOpen = new CompletableFuture<>();
        this.openLinkTracker = TimeoutTracker.create(factory.getOperationTimeout());
        this.sendWork = new DispatchHandler(){

            @Override
            public void onEvent() {
                MessageSender.this.processSendWork();
            }
        };
        this.tokenAudience = String.format("amqp://%s/%s", this.underlyingFactory.getHostName(), this.sendPath);
        // Callback handed to the token manager: re-sends the CBS token and only logs the outcome.
        this.activeClientTokenManager = new ActiveClientTokenManager(this, new Runnable(){

            @Override
            public void run() {
                MessageSender.this.underlyingFactory.getCBSChannel().sendToken(MessageSender.this.underlyingFactory.getReactorDispatcher(), MessageSender.this.underlyingFactory.getTokenProvider().getToken(MessageSender.this.tokenAudience, ClientConstants.TOKEN_VALIDITY), MessageSender.this.tokenAudience, new OperationResult<Void, Exception>(){

                    @Override
                    public void onComplete(Void result) {
                        if (TRACE_LOGGER.isDebugEnabled()) {
                            TRACE_LOGGER.debug(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s] - token renewed", MessageSender.this.getClientId(), MessageSender.this.sendPath, MessageSender.this.getSendLinkName()));
                        }
                    }

                    @Override
                    public void onError(Exception error) {
                        if (TRACE_LOGGER.isInfoEnabled()) {
                            TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s] - tokenRenewalFailure[%s]", MessageSender.this.getClientId(), MessageSender.this.sendPath, MessageSender.this.getSendLinkName(), error.getMessage()));
                        }
                    }
                }, exception -> {
                    if (TRACE_LOGGER.isWarnEnabled()) {
                        TRACE_LOGGER.warn(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s] - tokenRenewalScheduleFailure[%s]", MessageSender.this.getClientId(), MessageSender.this.sendPath, MessageSender.this.getSendLinkName(), exception.getMessage()));
                    }
                });
            }
        }, ClientConstants.TOKEN_REFRESH_INTERVAL, this.underlyingFactory);
    }

    /**
     * Creates a sender and schedules creation of its send link on the reactor thread.
     *
     * @param factory      factory the sender is bound to
     * @param sendLinkName identifier passed to the sender's constructor
     * @param senderPath   entity path to send to
     * @return the sender's first-open future; already failed if the work could not be scheduled
     */
    public static CompletableFuture<MessageSender> create(MessagingFactory factory, String sendLinkName, String senderPath) {
        final MessageSender sender = new MessageSender(factory, sendLinkName, senderPath);
        final DispatchHandler openLinkWork = new DispatchHandler(){

            @Override
            public void onEvent() {
                sender.createSendLink();
            }
        };
        try {
            sender.underlyingFactory.scheduleOnReactorThread(openLinkWork);
        }
        catch (IOException | RejectedExecutionException schedulerException) {
            // Scheduling failed, so the link will never be created: fail the open future right away.
            sender.linkFirstOpen.completeExceptionally(schedulerException);
        }
        return sender.linkFirstOpen;
    }

    /**
     * @return the entity path this sender was created for
     */
    public String getSendPath() {
        return sendPath;
    }

    /**
     * @return the current message size limit in bytes (262144 until the opened link reports its own limit)
     */
    public int getMaxMessageSize() {
        return maxMessageSize;
    }

    /**
     * Sends an already-encoded payload as a fresh (non-retry) send with a new timeout tracker.
     *
     * @param bytes         buffer holding the encoded message
     * @param arrayOffset   number of encoded bytes in {@code bytes}
     * @param messageFormat AMQP message format code
     * @return future completed when the send is settled or fails
     */
    private CompletableFuture<Void> send(byte[] bytes, int arrayOffset, int messageFormat) {
        final CompletableFuture<Void> noExistingFuture = null;
        final TimeoutTracker noTracker = null;
        return this.send(bytes, arrayOffset, messageFormat, noExistingFuture, noTracker);
    }

    /**
     * Registers a send (or a retry of an earlier send) as pending, arms its timeout and
     * schedules the send work on the reactor thread.
     *
     * @param bytes          buffer holding the encoded message
     * @param arrayOffset    number of encoded bytes in {@code bytes}
     * @param messageFormat  AMQP message format code
     * @param onSend         future of the original send when retrying; {@code null} for a new send
     * @param tracker        timeout tracker to keep using; {@code null} to start a new one from the operation timeout
     * @param lastKnownError error recorded on the work item, if any
     * @param timeoutTask    previously scheduled timeout task to cancel, if any
     * @return the future tracking this send
     */
    private CompletableFuture<Void> sendCore(byte[] bytes, int arrayOffset, int messageFormat, CompletableFuture<Void> onSend, TimeoutTracker tracker, Exception lastKnownError, CompletableFuture<?> timeoutTask) {
        this.throwIfClosed();
        final boolean retrying = onSend != null;
        final CompletableFuture<Void> resultFuture;
        if (retrying) {
            resultFuture = onSend;
        } else {
            resultFuture = new CompletableFuture<Void>();
        }
        final ReplayableWorkItem<Void> workItem;
        if (tracker != null) {
            workItem = new ReplayableWorkItem<Void>(bytes, arrayOffset, messageFormat, resultFuture, tracker);
        } else {
            workItem = new ReplayableWorkItem<Void>(bytes, arrayOffset, messageFormat, resultFuture, this.operationTimeout);
        }
        final TimeoutTracker sendTracker = workItem.getTimeoutTracker();
        // Tag = dash-less random UUID plus the seconds already elapsed on the tracker.
        final String deliveryTag = UUID.randomUUID().toString().replace("-", "") + "_" + sendTracker.elapsed().getSeconds();
        if (lastKnownError != null) {
            workItem.setLastKnownException(lastKnownError);
        }
        if (timeoutTask != null) {
            timeoutTask.cancel(false);
        }
        final CompletableFuture<?> newTimeoutTask = this.timer.schedule(new SendTimeout(deliveryTag, workItem), sendTracker.remaining());
        if (newTimeoutTask.isCompletedExceptionally()) {
            // The timer could not be armed: surface the failure (unless it was a cancellation) and do not enqueue.
            newTimeoutTask.handleAsync((unUsed, exception) -> {
                if (exception != null && !(exception instanceof CancellationException)) {
                    resultFuture.completeExceptionally(new OperationCancelledException(String.format(Locale.US, "Entity(%s): send failed while dispatching to Reactor, see cause for more details.", this.sendPath), (Throwable)exception));
                }
                return null;
            }, (Executor)this.executor);
            return resultFuture;
        }
        workItem.setTimeoutTask(newTimeoutTask);
        synchronized (this.pendingSendLock) {
            this.pendingSendsData.put(deliveryTag, workItem);
            // Retries are queued with weight 1, first-time sends with weight 0.
            this.pendingSends.offer(new WeightedDeliveryTag(deliveryTag, retrying ? 1 : 0));
        }
        try {
            this.underlyingFactory.scheduleOnReactorThread(this.sendWork);
        }
        catch (IOException | RejectedExecutionException schedulerException) {
            resultFuture.completeExceptionally(new OperationCancelledException(String.format(Locale.US, "Entity(%s): send failed while dispatching to Reactor, see cause for more details.", this.sendPath), (Throwable)schedulerException));
        }
        return resultFuture;
    }

    /**
     * Delegates to {@link #sendCore} with no previous error and no previous timeout task.
     *
     * @param bytes         buffer holding the encoded message
     * @param arrayOffset   number of encoded bytes in {@code bytes}
     * @param messageFormat AMQP message format code
     * @param onSend        future to reuse, or {@code null} to create a new one
     * @param tracker       timeout tracker to reuse, or {@code null} to start a new one
     * @return the future tracking this send
     */
    private CompletableFuture<Void> send(byte[] bytes, int arrayOffset, int messageFormat, CompletableFuture<Void> onSend, TimeoutTracker tracker) {
        final Exception noPreviousError = null;
        final CompletableFuture<?> noPreviousTimeoutTask = null;
        return this.sendCore(bytes, arrayOffset, messageFormat, onSend, tracker, noPreviousError, noPreviousTimeoutTask);
    }

    /**
     * @return the current link's name, or the literal string {@code "null"} when no link exists yet
     */
    private String getSendLinkName() {
        final Sender link = this.sendLink;
        if (link == null) {
            return "null";
        }
        return link.getName();
    }

    /**
     * Sends several messages as one batch message. A single-element batch is sent as a plain message.
     * The batch carries the first message's annotations, and every message is encoded and wrapped
     * in its own Data-bodied message appended to a shared buffer.
     *
     * @param messages messages to send; must be non-null and non-empty
     * @return future completed when the batch is settled; completed exceptionally with
     *         {@link PayloadSizeExceededException} when a message or the whole batch does not fit
     *         into the current max message size
     * @throws IllegalArgumentException if {@code messages} is null or empty
     */
    public CompletableFuture<Void> send(Iterable<Message> messages) {
        if (messages == null || IteratorUtil.sizeEquals(messages, 0)) {
            throw new IllegalArgumentException(String.format(Locale.US, "Entity[%s}: sending Empty batch of messages is not allowed.", this.sendPath));
        }
        Message firstMessage = messages.iterator().next();
        if (IteratorUtil.sizeEquals(messages, 1)) {
            return this.send(firstMessage);
        }
        Message batchMessage = Proton.message();
        batchMessage.setMessageAnnotations(firstMessage.getMessageAnnotations());
        // Snapshot the volatile limit so the buffer size and all bounds checks agree.
        final int maxMessageSizeTemp = this.maxMessageSize;
        byte[] bytes = new byte[maxMessageSizeTemp];
        int byteArrayOffset = batchMessage.encode(bytes, 0, maxMessageSizeTemp);
        for (Message amqpMessage : messages) {
            Message messageWrappedByData = Proton.message();
            int payloadSize = AmqpUtil.getDataSerializedSize(amqpMessage);
            // 512 extra bytes on top of the payload estimate — presumably headroom for AMQP headers; TODO confirm.
            int allocationSize = Math.min(payloadSize + 512, maxMessageSizeTemp);
            byte[] messageBytes = new byte[allocationSize];
            try {
                // Encoding the individual message can overflow too (a single message larger than the
                // limit), so it is inside the try: the caller gets a failed future, as in send(Message),
                // instead of a synchronous BufferOverflowException.
                int messageSizeBytes = amqpMessage.encode(messageBytes, 0, allocationSize);
                messageWrappedByData.setBody((Section)new Data(new Binary(messageBytes, 0, messageSizeBytes)));
                byteArrayOffset += messageWrappedByData.encode(bytes, byteArrayOffset, maxMessageSizeTemp - byteArrayOffset - 1);
            }
            catch (BufferOverflowException exception) {
                CompletableFuture<Void> sendTask = new CompletableFuture<Void>();
                sendTask.completeExceptionally(new PayloadSizeExceededException(String.format(Locale.US, "Entity(%s): size of the payload exceeded Maximum message size: %s kb", this.sendPath, maxMessageSizeTemp / 1024), (Throwable)exception));
                return sendTask;
            }
        }
        // -2147404032 == 0x80013700 — presumably the AMQP batch message format code; TODO confirm against AmqpConstants.
        return this.send(bytes, byteArrayOffset, -2147404032);
    }

    /**
     * Encodes a single message and sends it with message format 0.
     *
     * @param msg message to send
     * @return future completed when the send is settled; completed exceptionally with
     *         {@link PayloadSizeExceededException} when the encoded message does not fit the buffer
     */
    public CompletableFuture<Void> send(Message msg) {
        final int estimatedPayload = AmqpUtil.getDataSerializedSize(msg);
        final int sizeLimit = this.maxMessageSize;
        // Buffer is the payload estimate plus 512 bytes, capped at the current size limit.
        final int bufferSize = Math.min(estimatedPayload + 512, sizeLimit);
        final byte[] buffer = new byte[bufferSize];
        final int written;
        try {
            written = msg.encode(buffer, 0, bufferSize);
        }
        catch (BufferOverflowException exception) {
            final CompletableFuture<Void> failed = new CompletableFuture<Void>();
            failed.completeExceptionally(new PayloadSizeExceededException(String.format(Locale.US, "Entity(%s): size of the payload exceeded Maximum message size: %s kb", this.sendPath, sizeLimit / 1024), (Throwable)exception));
            return failed;
        }
        return this.send(buffer, written, 0);
    }

    /**
     * Handles the result of a link-open attempt.
     *
     * <p>On success ({@code completionException == null}): clears the last known error, resets the
     * retry count, adopts the remote max message size, cancels the open timer and either completes the
     * first-open future or, on a re-open, re-queues every pending send still waiting for an ack.
     *
     * <p>On failure before the first open completed: schedules a link re-creation when the retry policy
     * returns an interval. Otherwise it fails the open, except for a transient {@link EventHubException},
     * where nothing further is done here. On failure after the first open only the open timer is cancelled.
     *
     * @param completionException {@code null} when the link opened, otherwise the failure
     */
    @Override
    public void onOpenComplete(Exception completionException) {
        this.creatingLink = false;
        if (completionException == null) {
            if (this.getIsClosingOrClosed()) {
                this.sendLink.close();
                return;
            }
            synchronized (this.errorConditionLock) {
                this.lastKnownLinkError = null;
            }
            this.retryPolicy.resetRetryCount(this.getClientId());
            UnsignedLong remoteMaxMessageSize = this.sendLink.getRemoteMaxMessageSize();
            if (remoteMaxMessageSize != null) {
                this.maxMessageSize = remoteMaxMessageSize.intValue();
            }
            this.cancelOpenTimer();
            if (TRACE_LOGGER.isInfoEnabled()) {
                TRACE_LOGGER.info(String.format(Locale.US, "onOpenComplete - clientId[%s], sendPath[%s], linkName[%s]", this.getClientId(), this.sendPath, this.getSendLinkName()));
            }
            if (!this.linkFirstOpen.isDone()) {
                this.linkFirstOpen.complete(this);
                return;
            }
            synchronized (this.pendingSendLock) {
                // Typed snapshot of the tags (the decompiled code iterated a raw LinkedList as String).
                LinkedList<String> unacknowledgedSends = new LinkedList<>(this.pendingSendsData.keySet());
                for (String unacknowledgedSend : unacknowledgedSends) {
                    ReplayableWorkItem<Void> pendingSend = this.pendingSendsData.get(unacknowledgedSend);
                    // An entry can disappear between the snapshot and this lookup because
                    // onSendComplete removes entries without holding pendingSendLock.
                    if (pendingSend != null && pendingSend.isWaitingForAck()) {
                        this.pendingSends.offer(new WeightedDeliveryTag(unacknowledgedSend, 1));
                    }
                }
            }
            return;
        }
        if (this.linkFirstOpen.isDone()) {
            this.cancelOpenTimer();
            return;
        }
        Duration nextRetryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), completionException, this.openLinkTracker.remaining());
        if (nextRetryInterval != null) {
            if (onOpenRetry != null) {
                onOpenRetry.accept(this);
            }
            try {
                this.underlyingFactory.scheduleOnReactorThread((int)nextRetryInterval.toMillis(), new DispatchHandler(){

                    @Override
                    public void onEvent() {
                        // Recreate only if the sender is still open and the current link is absent or closed on either end.
                        if (!(MessageSender.this.getIsClosingOrClosed() || MessageSender.this.sendLink != null && MessageSender.this.sendLink.getLocalState() != EndpointState.CLOSED && MessageSender.this.sendLink.getRemoteState() != EndpointState.CLOSED)) {
                            MessageSender.this.recreateSendLink();
                        }
                    }
                });
            }
            catch (IOException | RejectedExecutionException schedulerException) {
                if (TRACE_LOGGER.isWarnEnabled()) {
                    TRACE_LOGGER.warn(String.format(Locale.US, "clientId[%s], senderPath[%s], scheduling createLink encountered error: %s", this.getClientId(), this.sendPath, schedulerException.getLocalizedMessage()));
                }
                this.cancelOpen(schedulerException);
            }
        } else if (completionException instanceof EventHubException) {
            // Transient failures are not failed here; presumably the open timer reports them — TODO confirm.
            if (!((EventHubException)completionException).getIsTransient()) {
                this.cancelOpen(completionException);
            }
        } else {
            if (TRACE_LOGGER.isErrorEnabled()) {
                TRACE_LOGGER.error("Could not open link.", (Throwable)completionException);
            }
            this.cancelOpen(completionException);
        }
    }

    /**
     * Gives up on opening the link: marks the sender closed, fails the first-open future
     * with the given error and cancels the open timer.
     *
     * @param completionException error reported through the first-open future
     */
    private void cancelOpen(Exception completionException) {
        setClosed();
        ExceptionUtil.completeExceptionally(linkFirstOpen, completionException, this);
        cancelOpenTimer();
    }

    /**
     * Cancels the pending link-open timeout task, if one exists and is not already cancelled.
     */
    private void cancelOpenTimer() {
        final CompletableFuture<?> timerTask = this.openTimer;
        if (timerTask == null || timerTask.isCancelled()) {
            return;
        }
        timerTask.cancel(false);
    }

    /**
     * Link-close callback: deregisters the current link from connection-error notifications and
     * forwards the close condition (converted to an exception when it carries one) to {@code onError}.
     *
     * @param condition    error condition reported with the close; may be {@code null}
     * @param errorContext not used by this method
     */
    @Override
    public void onClose(ErrorCondition condition, String errorContext) {
        if (this.sendLink != null) {
            this.underlyingFactory.deregisterForConnectionError((Link)this.sendLink);
        }
        Exception closeError = null;
        if (condition != null && condition.getCondition() != null) {
            closeError = ExceptionUtil.toException(condition);
        }
        this.onError(closeError, null);
    }

    /**
     * Handles a link-level error.
     *
     * <p>While closing/closed: cancels the close timer, fails every pending send and completes the
     * link-close future. Otherwise it records the error, runs the open-failure handling and, when
     * sends are pending, either schedules a link re-creation after the retry interval or fails all
     * pending sends when no retry is allowed or scheduling fails.
     *
     * @param completionException the error, or {@code null} when the cause is unknown
     * @param failingLinkName     not used by this method
     */
    @Override
    public void onError(Exception completionException, String failingLinkName) {
        if (this.getIsClosingOrClosed()) {
            if (this.closeTimer != null && !this.closeTimer.isDone()) {
                this.closeTimer.cancel(false);
            }
            synchronized (this.pendingSendLock) {
                for (ReplayableWorkItem<Void> pending : this.pendingSendsData.values()) {
                    final Exception failure = completionException != null ? completionException : new OperationCancelledException(String.format(Locale.US, "Entity(%s): send cancelled as the Sender instance is Closed before the sendOperation completed.", this.sendPath));
                    ExceptionUtil.completeExceptionally(pending.getWork(), failure, this);
                }
                this.pendingSendsData.clear();
                this.pendingSends.clear();
            }
            this.linkClose.complete(null);
            return;
        }
        synchronized (this.errorConditionLock) {
            // An unknown (null) error leaves the previously recorded error in place.
            if (completionException != null) {
                this.lastKnownLinkError = completionException;
            }
            this.lastKnownErrorReportedAt = Instant.now();
        }
        final Exception effectiveError;
        if (completionException != null) {
            effectiveError = completionException;
        } else {
            effectiveError = new EventHubException(true, String.format(Locale.US, "Entity(%s): client encountered transient error for unknown reasons, please retry the operation.", this.sendPath));
        }
        this.onOpenComplete(effectiveError);
        final Map.Entry<String, ReplayableWorkItem<Void>> firstPending = IteratorUtil.getFirst(this.pendingSendsData.entrySet());
        if (firstPending == null || firstPending.getValue() == null) {
            return;
        }
        final TimeoutTracker tracker = firstPending.getValue().getTimeoutTracker();
        if (tracker == null) {
            return;
        }
        // The retry budget is judged against the first pending send's remaining time.
        final Duration retryDelay = this.retryPolicy.getNextRetryInterval(this.getClientId(), effectiveError, tracker.remaining());
        boolean recreateScheduled = retryDelay != null;
        if (retryDelay != null) {
            try {
                this.underlyingFactory.scheduleOnReactorThread((int)retryDelay.toMillis(), new DispatchHandler(){

                    @Override
                    public void onEvent() {
                        if (!(MessageSender.this.getIsClosingOrClosed() || MessageSender.this.sendLink != null && MessageSender.this.sendLink.getLocalState() != EndpointState.CLOSED && MessageSender.this.sendLink.getRemoteState() != EndpointState.CLOSED)) {
                            MessageSender.this.recreateSendLink();
                        }
                    }
                });
            }
            catch (IOException | RejectedExecutionException schedulingFailure) {
                recreateScheduled = false;
            }
        }
        if (!recreateScheduled) {
            // No retry possible: fail everything that is still pending.
            synchronized (this.pendingSendLock) {
                for (ReplayableWorkItem<Void> pending : this.pendingSendsData.values()) {
                    this.cleanupFailedSend(pending, effectiveError);
                }
                this.pendingSendsData.clear();
                this.pendingSends.clear();
            }
        }
    }

    /**
     * Handles the remote outcome of a delivery and completes, retries or fails the matching pending send.
     *
     * <ul>
     *   <li>{@code Accepted}: clears the last known error, resets the retry count, cancels the
     *       send timeout and completes the send.</li>
     *   <li>{@code Rejected}: retries after the retry policy's interval while it returns one,
     *       otherwise fails the send with the converted error.</li>
     *   <li>{@code Released}: fails the send with {@link OperationCancelledException}.</li>
     *   <li>Any other outcome (including no remote state): fails the send with a non-transient
     *       {@link EventHubException}.</li>
     * </ul>
     * A delivery whose tag is not in the pending map is only logged.
     *
     * @param delivery the delivery whose remote state changed
     */
    @Override
    public void onSendComplete(Delivery delivery) {
        final DeliveryState outcome = delivery.getRemoteState();
        final String deliveryTag = new String(delivery.getTag(), StandardCharsets.UTF_8);
        if (TRACE_LOGGER.isTraceEnabled()) {
            TRACE_LOGGER.trace(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s]", this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag));
        }
        final ReplayableWorkItem<Void> pendingSendWorkItem = this.pendingSendsData.remove(deliveryTag);
        if (pendingSendWorkItem == null) {
            if (TRACE_LOGGER.isDebugEnabled()) {
                TRACE_LOGGER.debug(String.format(Locale.US, "clientId[%s]. path[%s], linkName[%s], delivery[%s] - mismatch (or send timed out)", this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag));
            }
            return;
        }
        if (outcome instanceof Accepted) {
            synchronized (this.errorConditionLock) {
                this.lastKnownLinkError = null;
            }
            this.retryPolicy.resetRetryCount(this.getClientId());
            // Same null-safety as cleanupFailedSend.
            final CompletableFuture<?> timeoutTask = pendingSendWorkItem.getTimeoutTask();
            if (timeoutTask != null) {
                timeoutTask.cancel(false);
            }
            pendingSendWorkItem.clearMessage();
            pendingSendWorkItem.getWork().complete(null);
        } else if (outcome instanceof Rejected) {
            final Rejected rejected = (Rejected)outcome;
            final ErrorCondition error = rejected.getError();
            final Exception exception = ExceptionUtil.toException(error);
            if (ExceptionUtil.isGeneralSendError(error.getCondition())) {
                synchronized (this.errorConditionLock) {
                    this.lastKnownLinkError = exception;
                    this.lastKnownErrorReportedAt = Instant.now();
                }
            }
            final Duration retryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), exception, pendingSendWorkItem.getTimeoutTracker().remaining());
            if (retryInterval == null) {
                this.cleanupFailedSend(pendingSendWorkItem, exception);
            } else {
                pendingSendWorkItem.setLastKnownException(exception);
                try {
                    this.underlyingFactory.scheduleOnReactorThread((int)retryInterval.toMillis(), new DispatchHandler(){

                        @Override
                        public void onEvent() {
                            MessageSender.this.sendCore(pendingSendWorkItem.getMessage(), pendingSendWorkItem.getEncodedMessageSize(), pendingSendWorkItem.getMessageFormat(), pendingSendWorkItem.getWork(), pendingSendWorkItem.getTimeoutTracker(), pendingSendWorkItem.getLastKnownException(), pendingSendWorkItem.getTimeoutTask());
                        }
                    });
                }
                catch (IOException | RejectedExecutionException schedulerException) {
                    // initCause throws IllegalStateException when a cause is already attached; that
                    // would skip the cleanup below and leave the already-removed send uncompleted.
                    if (exception.getCause() == null) {
                        exception.initCause(schedulerException);
                    }
                    this.cleanupFailedSend(pendingSendWorkItem, new EventHubException(false, String.format(Locale.US, "Entity(%s): send operation failed while scheduling a retry on Reactor, see cause for more details.", this.sendPath), schedulerException));
                }
            }
        } else if (outcome instanceof Released) {
            this.cleanupFailedSend(pendingSendWorkItem, new OperationCancelledException(outcome.toString()));
        } else {
            // outcome may be null when the peer reports no delivery state; String.valueOf avoids an
            // NPE that would leave the removed work item without a result.
            this.cleanupFailedSend(pendingSendWorkItem, new EventHubException(false, String.valueOf(outcome)));
        }
    }

    /**
     * Fails a pending send: cancels its timeout task (when one is set) and completes its future exceptionally.
     *
     * @param failedSend work item of the send that failed
     * @param exception  error to report through the send's future
     */
    private void cleanupFailedSend(ReplayableWorkItem<Void> failedSend, Exception exception) {
        final CompletableFuture<?> timeoutTask = failedSend.getTimeoutTask();
        if (timeoutTask != null) {
            timeoutTask.cancel(false);
        }
        ExceptionUtil.completeExceptionally(failedSend.getWork(), exception, this);
    }

    /**
     * Starts creating the send link unless a creation is already in flight.
     *
     * <p>Arms the link-open timeout, sends the CBS token for this sender's audience and, once the
     * token is accepted, asks the factory for a session. On session open a new sender link targeting
     * {@code sendPath} is configured, registered for connection errors, opened and stored as the
     * current link. Any failure along the way is routed to {@code onError}.
     */
    private void createSendLink() {
        synchronized (this.errorConditionLock) {
            if (this.creatingLink) {
                if (TRACE_LOGGER.isInfoEnabled()) {
                    TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s], path[%s], operationTimeout[%s], creating a send link is already in progress", this.getClientId(), this.sendPath, this.operationTimeout));
                }
                return;
            }
            this.creatingLink = true;
        }
        if (TRACE_LOGGER.isInfoEnabled()) {
            TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s], path[%s], operationTimeout[%s], creating a send link", this.getClientId(), this.sendPath, this.operationTimeout));
        }
        this.linkCreationTime = Instant.now().toString();
        this.scheduleLinkOpenTimeout(TimeoutTracker.create(this.operationTimeout));
        final Consumer<Session> onSessionOpen = session -> {
            if (this.getIsClosingOrClosed()) {
                session.close();
                return;
            }
            final Sender newLink = session.sender(TrackingUtil.getLinkName(session));
            final Target linkTarget = new Target();
            linkTarget.setAddress(this.sendPath);
            newLink.setTarget((org.apache.qpid.proton.amqp.transport.Target)linkTarget);
            final Source linkSource = new Source();
            newLink.setSource((org.apache.qpid.proton.amqp.transport.Source)linkSource);
            newLink.setSenderSettleMode(SenderSettleMode.UNSETTLED);
            final SendLinkHandler linkHandler = new SendLinkHandler(this, this.getClientId(), this.underlyingFactory.executor);
            BaseHandler.setHandler((Extendable)newLink, (Handler)linkHandler);
            // Swap the connection-error registration from the old link (if any) to the new one.
            if (this.sendLink != null) {
                this.underlyingFactory.deregisterForConnectionError((Link)this.sendLink);
            }
            this.underlyingFactory.registerForConnectionError((Link)newLink);
            newLink.open();
            synchronized (this.errorConditionLock) {
                this.sendLink = newLink;
            }
        };
        final BiConsumer<ErrorCondition, Exception> onSessionOpenError = (errorCondition, sessionError) -> {
            if (errorCondition != null) {
                final Exception converted = errorCondition.getCondition() != null ? ExceptionUtil.toException(errorCondition) : null;
                this.onError(converted, null);
            } else if (sessionError != null) {
                this.onError(sessionError, null);
            }
        };
        this.underlyingFactory.getCBSChannel().sendToken(this.underlyingFactory.getReactorDispatcher(), this.underlyingFactory.getTokenProvider().getToken(this.tokenAudience, ClientConstants.TOKEN_VALIDITY), this.tokenAudience, new OperationResult<Void, Exception>(){

            @Override
            public void onComplete(Void result) {
                if (MessageSender.this.getIsClosingOrClosed()) {
                    return;
                }
                MessageSender.this.underlyingFactory.getSession(MessageSender.this.sendPath, onSessionOpen, onSessionOpenError);
            }

            @Override
            public void onError(Exception error) {
                final Exception completionException;
                if (error instanceof AmqpException) {
                    // Convert the AMQP error and keep the original as the cause when none is attached yet.
                    completionException = ExceptionUtil.toException(((AmqpException)error).getError());
                    if (completionException != error && completionException.getCause() == null) {
                        completionException.initCause(error);
                    }
                } else {
                    completionException = error;
                }
                MessageSender.this.onError(completionException, null);
            }
        }, exception -> this.onError((Exception)exception, null));
    }

    /**
     * Schedules a timer task that fails the first link-open attempt if it is still pending
     * after {@code timeout.remaining()}.
     *
     * <p>When the task runs it resets {@code creatingLink}. If {@code linkFirstOpen} is not yet
     * done, it completes that future exceptionally with a {@link TimeoutException} built from the
     * send path and the last known link error (read under {@code errorConditionLock}), and then
     * calls {@code setClosed()}. If the timer future itself fails with an {@link Exception} other
     * than a {@link CancellationException}, that failure is forwarded to {@code linkFirstOpen}.
     *
     * @param timeout tracker whose remaining duration is used as the timer delay
     */
    private void scheduleLinkOpenTimeout(TimeoutTracker timeout) {
        final Runnable failOpenOnTimeout = new Runnable(){

            @Override
            public void run() {
                MessageSender.this.creatingLink = false;
                if (MessageSender.this.linkFirstOpen.isDone()) {
                    // The open already finished one way or the other; nothing to time out.
                    return;
                }
                Exception linkError;
                synchronized (MessageSender.this.errorConditionLock) {
                    linkError = MessageSender.this.lastKnownLinkError;
                }
                String timeoutMessage = String.format(Locale.US, "Open operation on entity(%s) timed out at %s.", MessageSender.this.getSendPath(), ZonedDateTime.now().toString());
                TimeoutException openTimedOut = new TimeoutException(timeoutMessage, (Throwable)linkError);
                if (TRACE_LOGGER.isWarnEnabled()) {
                    TRACE_LOGGER.warn(String.format(Locale.US, "clientId[%s], path[%s], open call timed out", MessageSender.this.getClientId(), MessageSender.this.sendPath), (Throwable)openTimedOut);
                }
                ExceptionUtil.completeExceptionally(MessageSender.this.linkFirstOpen, openTimedOut, MessageSender.this);
                MessageSender.this.setClosed();
            }
        };
        this.openTimer = this.timer.schedule(failOpenOnTimeout, timeout.remaining());
        this.openTimer.handleAsync((ignored, timerFailure) -> {
            // instanceof is false for null, so no separate null check is needed.
            boolean reportable = timerFailure instanceof Exception && !(timerFailure instanceof CancellationException);
            if (reportable) {
                ExceptionUtil.completeExceptionally(this.linkFirstOpen, (Exception)timerFailure, this);
            }
            return null;
        }, (Executor)this.executor);
    }

    /**
     * Builds a {@link SenderContext} describing the current state of this sender.
     *
     * <p>The reference id is the value of the link's remote {@code TRACKING_ID_PROPERTY} when the
     * remote properties contain it, otherwise the link name, or {@code null} when no link exists.
     * The link credit is passed only when {@code linkFirstOpen} is done and a link exists;
     * otherwise {@code null} is passed.
     *
     * @return a new context built from the factory host name (or {@code null} without a factory),
     *         the send path, the reference id and the link credit
     */
    @Override
    public ErrorContext getContext() {
        final Sender link;
        // Snapshot the link under the lock so all checks below see the same reference.
        synchronized (this.errorConditionLock) {
            link = this.sendLink;
        }
        final boolean isLinkOpened = this.linkFirstOpen != null && this.linkFirstOpen.isDone();

        String referenceId = null;
        if (link != null) {
            if (link.getRemoteProperties() != null && link.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)) {
                referenceId = link.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString();
            } else {
                referenceId = link.getName();
            }
        }

        Integer linkCredit = null;
        if (isLinkOpened && link != null) {
            linkCredit = Integer.valueOf(link.getCredit());
        }
        return new SenderContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null, this.sendPath, referenceId, linkCredit);
    }

    /**
     * Handles a flow notification carrying {@code creditIssued} credits.
     *
     * <p>The last known link error is always cleared (under {@code errorConditionLock}). When the
     * issued credit is positive, the pending-send counters are logged at debug level and
     * {@code sendWork} is triggered.
     *
     * @param creditIssued number of credits reported; values {@code <= 0} only clear the error
     */
    @Override
    public void onFlow(int creditIssued) {
        synchronized (this.errorConditionLock) {
            this.lastKnownLinkError = null;
        }
        if (creditIssued > 0) {
            if (TRACE_LOGGER.isDebugEnabled()) {
                int waitingForCredit = this.pendingSends.size();
                TRACE_LOGGER.debug(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], remoteLinkCredit[%s], pendingSendsWaitingForCredit[%s], pendingSendsWaitingDelivery[%s]", this.getClientId(), this.sendPath, this.getSendLinkName(), creditIssued, waitingForCredit, this.pendingSendsData.size() - waitingForCredit));
            }
            this.sendWork.onEvent();
        }
    }

    /**
     * Creates the send link again via {@code createSendLink()} and afterwards increments the
     * retry count kept by the retry policy for this client id.
     */
    private void recreateSendLink() {
        createSendLink();
        retryPolicy.incrementRetryCount(getClientId());
    }

    /**
     * Drains queued sends onto the send link for as long as the link is locally and remotely
     * active and has credit.
     *
     * <p>If there is no link, or either endpoint of it is closed, the link is re-created (unless
     * this sender is closing or closed) and nothing is sent. Otherwise each iteration polls the
     * next delivery tag from {@code pendingSends} and looks up its work item in
     * {@code pendingSendsData}:
     * <ul>
     *   <li>no work item: the loop stops (logging at debug level when a tag was polled);</li>
     *   <li>work item already done: it is removed from {@code pendingSendsData} and skipped;</li>
     *   <li>otherwise the message is written to the link; on a successful {@code advance()} the
     *       item is marked as waiting for ack, else the delivery is freed and the item's future
     *       is completed exceptionally with an {@link OperationCancelledException}.</li>
     * </ul>
     */
    private void processSendWork() {
        final boolean linkUnusable = this.sendLink == null
                || this.sendLink.getLocalState() == EndpointState.CLOSED
                || this.sendLink.getRemoteState() == EndpointState.CLOSED;
        if (linkUnusable) {
            if (!this.getIsClosingOrClosed()) {
                this.recreateSendLink();
            }
            return;
        }

        while (this.sendLink.getLocalState() == EndpointState.ACTIVE && this.sendLink.getRemoteState() == EndpointState.ACTIVE && this.sendLink.getCredit() > 0) {
            String deliveryTag = null;
            ReplayableWorkItem<Void> sendData = null;
            // Poll the queue and resolve the work item in one critical section.
            synchronized (this.pendingSendLock) {
                final WeightedDeliveryTag nextDelivery = this.pendingSends.poll();
                if (nextDelivery != null) {
                    deliveryTag = nextDelivery.getDeliveryTag();
                    sendData = this.pendingSendsData.get(deliveryTag);
                }
            }

            if (sendData == null) {
                // Either the queue is empty or the polled tag has no work item; stop draining.
                if (deliveryTag != null && TRACE_LOGGER.isDebugEnabled()) {
                    TRACE_LOGGER.debug(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s] - sendData not found for this delivery.", this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag));
                }
                break;
            }

            if (sendData.getWork() != null && sendData.getWork().isDone()) {
                // The caller's future is already completed; just forget the item.
                this.pendingSendsData.remove(deliveryTag);
                continue;
            }

            Delivery delivery = null;
            boolean linkAdvance = false;
            int sentMsgSize = 0;
            Exception sendException = null;
            try {
                delivery = this.sendLink.delivery(deliveryTag.getBytes(StandardCharsets.UTF_8));
                delivery.setMessageFormat(sendData.getMessageFormat());
                sentMsgSize = this.sendLink.send(sendData.getMessage(), 0, sendData.getEncodedMessageSize());
                assert (sentMsgSize == sendData.getEncodedMessageSize()) : "Contract of the ProtonJ library for Sender.Send API changed";
                linkAdvance = this.sendLink.advance();
            }
            catch (Exception failure) {
                // Remembered so it can be attached as the cause of the failed send below.
                sendException = failure;
            }

            if (linkAdvance) {
                sendData.setWaitingForAck();
            } else {
                if (TRACE_LOGGER.isDebugEnabled()) {
                    TRACE_LOGGER.debug(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s], sentMessageSize[%s], payloadActualSize[%s] - sendlink advance failed", this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize()));
                }
                if (delivery != null) {
                    delivery.free();
                }
                // NOTE(review): the failed item is not removed from pendingSendsData here - confirm
                // that some other path cleans it up.
                final OperationCancelledException sendFailure;
                if (sendException != null) {
                    sendFailure = new OperationCancelledException(String.format(Locale.US, "Entity(%s): send operation failed. Please see cause for more details", this.sendPath), (Throwable)sendException);
                } else {
                    sendFailure = new OperationCancelledException(String.format(Locale.US, "Entity(%s): send operation failed while advancing delivery(tag: %s).", this.sendPath, deliveryTag));
                }
                sendData.getWork().completeExceptionally(sendFailure);
            }
        }
    }

    /**
     * Completes a pending send exceptionally after it timed out, choosing the most specific
     * exception available.
     *
     * <p>When the caller supplies no exception, the last known link error is used as the cause,
     * but only if it is a {@link ServerBusyException} reported within the last 4 seconds or any
     * error reported within the last {@code operationTimeout}. If the resulting cause is an
     * {@link EventHubException} it is reported as-is; otherwise a {@link TimeoutException}
     * carrying the cause (possibly {@code null}) is reported.
     *
     * @param pendingSendWork future of the send operation to fail
     * @param lastKnownException last exception recorded for this send, or {@code null}
     */
    private void throwSenderTimeout(CompletableFuture<Void> pendingSendWork, Exception lastKnownException) {
        Exception cause = lastKnownException;
        if (cause == null) {
            final Exception linkError;
            final Instant linkErrorReportedAt;
            // Read the error and its timestamp together so they belong to the same report.
            synchronized (this.errorConditionLock) {
                linkError = this.lastKnownLinkError;
                linkErrorReportedAt = this.lastKnownErrorReportedAt;
            }
            if (linkError != null) {
                final boolean recentServerBusy = linkError instanceof ServerBusyException && linkErrorReportedAt.isAfter(Instant.now().minusSeconds(4L));
                final boolean stillRelevant = recentServerBusy || linkErrorReportedAt.isAfter(Instant.now().minusMillis(this.operationTimeout.toMillis()));
                if (stillRelevant) {
                    cause = linkError;
                }
            }
        }

        final EventHubException exception;
        if (cause instanceof EventHubException) {
            // A service-side error explains the failure better than a generic timeout.
            exception = (EventHubException)cause;
        } else {
            exception = new TimeoutException(String.format(Locale.US, "Entity(%s): %s at %s.", this.sendPath, SEND_TIMED_OUT, ZonedDateTime.now()), (Throwable)cause);
        }
        ExceptionUtil.completeExceptionally(pendingSendWork, exception, this);
    }

    /**
     * Schedules a timer task that fails the close operation if it is still pending after
     * {@code timeout.remaining()}.
     *
     * <p>When the task runs and {@code linkClose} is not yet done, it logs the timeout at info
     * level, completes {@code linkClose} exceptionally with a {@link TimeoutException} and then
     * calls {@code onError(null, null)}. If the timer future itself fails with an
     * {@link Exception} other than a {@link CancellationException}, that failure is forwarded to
     * {@code linkClose}.
     *
     * @param timeout tracker whose remaining duration is used as the timer delay
     */
    private void scheduleLinkCloseTimeout(TimeoutTracker timeout) {
        this.closeTimer = this.timer.schedule(new Runnable(){

            @Override
            public void run() {
                if (!MessageSender.this.linkClose.isDone()) {
                    Sender link;
                    Object object = MessageSender.this.errorConditionLock;
                    synchronized (object) {
                        link = MessageSender.this.sendLink;
                    }
                    TimeoutException operationTimedout = new TimeoutException(String.format(Locale.US, "Entity(%s): close operation timed out at %s", MessageSender.this.sendPath, ZonedDateTime.now()));
                    if (TRACE_LOGGER.isInfoEnabled()) {
                        // The link may never have been created; logging must not throw here,
                        // otherwise linkClose below would never be completed by this task.
                        String linkName = link != null ? link.getName() : null;
                        TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s], message sender(linkName: %s, path: %s) close call timed out", MessageSender.this.getClientId(), linkName, MessageSender.this.sendPath), (Throwable)operationTimedout);
                    }
                    ExceptionUtil.completeExceptionally(MessageSender.this.linkClose, operationTimedout, MessageSender.this);
                    MessageSender.this.onError(null, null);
                }
            }
        }, timeout.remaining());
        this.closeTimer.handleAsync((unUsed, exception) -> {
            if (exception != null && exception instanceof Exception && !(exception instanceof CancellationException)) {
                ExceptionUtil.completeExceptionally(this.linkClose, (Exception)exception, this);
            }
            return null;
        }, (Executor)this.executor);
    }

    /**
     * Starts closing this sender and returns the future that tracks the close.
     *
     * <p>Unless the sender is already closed, this cancels the active client token manager,
     * schedules the close timeout and dispatches a handler to the reactor thread. The handler
     * closes the send link when it exists and is not locally closed; otherwise, when there is no
     * link or the link is remotely closed, it cancels the close timer and completes
     * {@code linkClose}. If scheduling fails, {@code linkClose} is completed exceptionally with
     * the scheduling exception.
     *
     * @return the {@code linkClose} future
     */
    @Override
    protected CompletableFuture<Void> onClose() {
        if (this.getIsClosed()) {
            return this.linkClose;
        }
        try {
            this.activeClientTokenManager.cancel();
            this.scheduleLinkCloseTimeout(TimeoutTracker.create(this.operationTimeout));
            this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler(){

                @Override
                public void onEvent() {
                    final MessageSender owner = MessageSender.this;
                    if (owner.sendLink != null && owner.sendLink.getLocalState() != EndpointState.CLOSED) {
                        owner.sendLink.close();
                        return;
                    }
                    if (owner.sendLink == null || owner.sendLink.getRemoteState() == EndpointState.CLOSED) {
                        // Nothing left to wait for: stop the timeout and report the close as done.
                        if (owner.closeTimer != null && !owner.closeTimer.isCancelled()) {
                            owner.closeTimer.cancel(false);
                        }
                        owner.linkClose.complete(null);
                    }
                }
            });
        }
        catch (IOException | RejectedExecutionException schedulerException) {
            this.linkClose.completeExceptionally(schedulerException);
        }
        return this.linkClose;
    }

    /**
     * Returns the last known link error, read under {@code errorConditionLock}.
     *
     * @return the value of {@code lastKnownLinkError}, possibly {@code null}
     */
    @Override
    protected Exception getLastKnownError() {
        final Exception lastError;
        synchronized (this.errorConditionLock) {
            lastError = this.lastKnownLinkError;
        }
        return lastError;
    }

    /**
     * Runnable that, when run, fails one pending send if its future has not completed yet.
     *
     * <p>The send is removed from {@code pendingSendsData} by its delivery tag and its future is
     * completed through {@code throwSenderTimeout} with the last exception recorded on the work
     * item.
     */
    private class SendTimeout
    implements Runnable {
        private final String deliveryTag;
        private final ReplayableWorkItem<Void> sendWaiterData;

        /**
         * @param deliveryTag tag under which the send is stored in {@code pendingSendsData}
         * @param sendWaiterData work item whose future is failed when this task runs
         */
        SendTimeout(String deliveryTag, ReplayableWorkItem<Void> sendWaiterData) {
            this.deliveryTag = deliveryTag;
            this.sendWaiterData = sendWaiterData;
        }

        @Override
        public void run() {
            if (sendWaiterData.getWork().isDone()) {
                // Already completed; nothing to time out.
                return;
            }
            MessageSender.this.pendingSendsData.remove(deliveryTag);
            MessageSender.this.throwSenderTimeout(sendWaiterData.getWork(), sendWaiterData.getLastKnownException());
        }
    }

    /**
     * Orders {@link WeightedDeliveryTag}s by descending priority, so the tag with the highest
     * priority sorts first.
     */
    private static class DeliveryTagComparator
    implements Comparator<WeightedDeliveryTag>,
    Serializable {
        private static final long serialVersionUID = -7057500582037295635L;

        private DeliveryTagComparator() {
        }

        /**
         * Compares two tags by priority in reverse (descending) order.
         *
         * @param deliveryTag0 first tag
         * @param deliveryTag1 second tag
         * @return a negative value when {@code deliveryTag0} has the higher priority, a positive
         *         value when {@code deliveryTag1} has, and {@code 0} when both are equal
         */
        @Override
        public int compare(WeightedDeliveryTag deliveryTag0, WeightedDeliveryTag deliveryTag1) {
            // Integer.compare instead of subtraction: the difference of two ints can overflow
            // and flip the sign, which would break the ordering for extreme priorities.
            return Integer.compare(deliveryTag1.getPriority(), deliveryTag0.getPriority());
        }
    }

    /**
     * Immutable pairing of a delivery tag with an integer priority.
     */
    private static class WeightedDeliveryTag {
        private final String deliveryTag;
        private final int priority;

        /**
         * @param deliveryTag the delivery tag to carry
         * @param priority the priority associated with the tag
         */
        WeightedDeliveryTag(String deliveryTag, int priority) {
            this.priority = priority;
            this.deliveryTag = deliveryTag;
        }

        /** @return the delivery tag given at construction */
        public String getDeliveryTag() {
            return deliveryTag;
        }

        /** @return the priority given at construction */
        public int getPriority() {
            return priority;
        }
    }
}

