package com.microsoft.azure.servicebus.primitives;

import com.microsoft.azure.servicebus.TransactionContext;
import com.microsoft.azure.servicebus.amqp.AmqpConstants;
import com.microsoft.azure.servicebus.amqp.DispatchHandler;
import com.microsoft.azure.servicebus.amqp.IAmqpSender;
import com.microsoft.azure.servicebus.amqp.SendLinkHandler;
import com.microsoft.azure.servicebus.amqp.SessionHandler;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/servicebus/primitives/CoreMessageSender.class */
public class CoreMessageSender extends ClientEntity implements IAmqpSender, IErrorContextProvider {
    // Class logger. No initializer is visible in this view (presumably assigned in a static initializer elsewhere — confirm).
    private static final Logger TRACE_LOGGER;
    private static final String SEND_TIMED_OUT = "Send operation timed out";
    // No initializer visible in this view — presumably set in a static initializer; confirm.
    private static final Duration LINK_REOPEN_TIMEOUT;
    // Monitor guarding reads/writes of requestResponseLinkCreationFuture. NOTE: the identifier is misspelled ("Resonse").
    private final Object requestResonseLinkCreationLock;
    // Factory used to obtain connections, schedule reactor work and send security tokens.
    private final MessagingFactory underlyingFactory;
    // Entity path this sender targets (taken from SenderLinkSettings.linkPath).
    private final String sendPath;
    // "amqp://<host>/<sendPath>" audience passed when sending the security token.
    private final String sasTokenAudienceURI;
    // Per-send timeout, copied from the factory's operation timeout.
    private final Duration operationTimeout;
    private final RetryPolicy retryPolicy;
    // Completed by onError() once the sender is closing/closed.
    private final CompletableFuture<Void> linkClose;
    // Monitor held while enlisting sends, re-queueing them after a reopen, and clearing them.
    private final Object pendingSendLock;
    // Delivery tag -> in-flight send work item.
    private final ConcurrentHashMap<String, SendWorkItem<DeliveryState>> pendingSendsData;
    // Delivery tags queued for sending, ordered by DeliveryTagComparator (higher priority first).
    private final PriorityQueue<WeightedDeliveryTag> pendingSends;
    // Reactor callback that invokes processSendWork().
    private final DispatchHandler sendWork;
    private final MessagingEntityType entityType;
    private boolean isSendLoopRunning;
    // Assigned by createSendLink(); null until the first link has been created.
    private Sender sendLink;
    private RequestResponseLink requestResponseLink;
    // Completed (normally or exceptionally) when the first link open attempt finishes or times out.
    private CompletableFuture<CoreMessageSender> linkFirstOpen;
    // Credit accumulated from flow frames; reset to 0 in onError().
    private int linkCredit;
    private Exception lastKnownLinkError;
    private Instant lastKnownErrorReportedAt;
    private ScheduledFuture<?> sasTokenRenewTimerFuture;
    // Non-null while a request-response link creation is in progress or has completed; guarded by requestResonseLinkCreationLock.
    private CompletableFuture<Void> requestResponseLinkCreationFuture;
    private CompletableFuture<Void> sendLinkReopenFuture;
    private SenderLinkSettings linkSettings;
    // Set only when the link properties carry a non-empty transfer destination.
    private String transferDestinationPath;
    private String transferSasTokenAudienceURI;
    private boolean isSendVia;
    // Refreshed from the link in onOpenComplete() after a successful open.
    private int maxMessageSize;
    // One-shot retry flag: cleared in createSendLink() when a retry is scheduled, re-armed in onOpenComplete().
    private boolean shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent;
    // Compiler-generated flag backing `assert` statements.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/microsoft/azure/servicebus/primitives/CoreMessageSender$DeliveryTagComparator.class */
    /**
     * Orders {@link WeightedDeliveryTag}s by descending priority, so that the tag with the
     * highest priority is at the head of a {@link PriorityQueue} built with this comparator.
     */
    private static class DeliveryTagComparator implements Comparator<WeightedDeliveryTag>, Serializable {
        private static final long serialVersionUID = -7057500582037295636L;

        private DeliveryTagComparator() {
        }

        /**
         * Compares two tags by priority, highest first.
         *
         * @param weightedDeliveryTag  left operand
         * @param weightedDeliveryTag2 right operand
         * @return a negative value when the left tag has the higher priority, a positive value
         *         when the right one does, and zero when both priorities are equal
         */
        @Override // java.util.Comparator
        public int compare(WeightedDeliveryTag weightedDeliveryTag, WeightedDeliveryTag weightedDeliveryTag2) {
            // Operands are swapped on purpose (descending order). Integer.compare is used instead
            // of subtraction because a difference of two ints can overflow and flip the sign.
            return Integer.compare(weightedDeliveryTag2.getPriority(), weightedDeliveryTag.getPriority());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/microsoft/azure/servicebus/primitives/CoreMessageSender$WeightedDeliveryTag.class */
    /**
     * Immutable pairing of a delivery tag with the priority used to order it in the
     * pending-send queue.
     */
    public static class WeightedDeliveryTag {
        private final String deliveryTag;
        private final int priority;

        /**
         * @param str delivery tag identifying the pending send
         * @param i   priority of the tag
         */
        WeightedDeliveryTag(String str, int i) {
            deliveryTag = str;
            priority = i;
        }

        /** @return the delivery tag supplied at construction */
        public String getDeliveryTag() {
            return deliveryTag;
        }

        /** @return the priority supplied at construction */
        public int getPriority() {
            return priority;
        }
    }

    /**
     * Creates a sender without specifying the entity type.
     *
     * @param messagingFactory factory that owns the connection
     * @param str              client identifier handed to the sender
     * @param str2             path of the entity to send to
     * @param str3             transfer destination path, may be null or empty
     * @return future completed with the sender once its link is open
     * @deprecated delegates to the overload taking a {@link MessagingEntityType}, passing {@code null}
     */
    @Deprecated
    public static CompletableFuture<CoreMessageSender> create(MessagingFactory messagingFactory, String str, String str2, String str3) {
        final MessagingEntityType unspecifiedEntityType = null;
        return create(messagingFactory, str, str2, str3, unspecifiedEntityType);
    }

    /**
     * Creates a sender using the default link settings built by
     * {@code getDefaultLinkProperties}.
     *
     * @param messagingFactory    factory that owns the connection
     * @param str                 client identifier handed to the sender
     * @param str2                path of the entity to send to
     * @param str3                transfer destination path, may be null or empty
     * @param messagingEntityType entity type, may be null
     * @return future completed with the sender once its link is open
     */
    public static CompletableFuture<CoreMessageSender> create(MessagingFactory messagingFactory, String str, String str2, String str3, MessagingEntityType messagingEntityType) {
        SenderLinkSettings defaultSettings = getDefaultLinkProperties(str2, str3, messagingFactory, messagingEntityType);
        return create(messagingFactory, str, messagingEntityType, defaultSettings);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a sender for the given link settings and starts opening its send link.
     *
     * <p>The link name is "Sender" + separator + a short random string, optionally followed by
     * the connection's remote container id. When the settings require authentication a security
     * token is sent first; the link itself is created on the reactor thread afterwards.
     *
     * @param messagingFactory    factory that owns the connection
     * @param str                 client identifier handed to the sender
     * @param messagingEntityType entity type, may be null
     * @param senderLinkSettings  link settings; {@code linkName} is overwritten by this method
     * @return future completed with the sender once the link is open, or exceptionally when the
     *         token could not be sent, the reactor dispatch failed, or the open timed out
     */
    public static CompletableFuture<CoreMessageSender> create(MessagingFactory messagingFactory, String str, MessagingEntityType messagingEntityType, SenderLinkSettings senderLinkSettings) {
        TRACE_LOGGER.info("Creating core message sender to '{}'", senderLinkSettings.linkPath);
        Connection activeConnectionCreateIfNecessary = messagingFactory.getActiveConnectionCreateIfNecessary();
        String concat = "Sender".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString());
        senderLinkSettings.linkName = !StringUtil.isNullOrEmpty(activeConnectionCreateIfNecessary.getRemoteContainer()) ? concat.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(activeConnectionCreateIfNecessary.getRemoteContainer()) : concat;
        final CoreMessageSender coreMessageSender = new CoreMessageSender(messagingFactory, str, messagingEntityType, senderLinkSettings);
        // Arms the open timeout before any asynchronous work starts.
        coreMessageSender.initializeLinkOpen(TimeoutTracker.create(messagingFactory.getOperationTimeout()));
        CompletableFuture<Void> tokenFuture = senderLinkSettings.requiresAuthentication ? coreMessageSender.sendTokenAndSetRenewTimer(false) : CompletableFuture.completedFuture(null);
        tokenFuture.handleAsync((ignored, th) -> {
            if (th != null) {
                Throwable extractAsyncCompletionCause = ExceptionUtil.extractAsyncCompletionCause(th);
                TRACE_LOGGER.info("Sending SAS Token to '{}' failed.", coreMessageSender.sendPath, extractAsyncCompletionCause);
                coreMessageSender.linkFirstOpen.completeExceptionally(extractAsyncCompletionCause);
                return null;
            }
            try {
                coreMessageSender.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
                    @Override // com.microsoft.azure.servicebus.amqp.DispatchHandler
                    public void onEvent() {
                        // This is a static method, so there is no enclosing instance: the handler
                        // must go through the captured local rather than CoreMessageSender.this.
                        coreMessageSender.createSendLink(coreMessageSender.linkSettings);
                    }
                });
                return null;
            } catch (IOException e) {
                coreMessageSender.cancelSASTokenRenewTimer();
                coreMessageSender.linkFirstOpen.completeExceptionally(new ServiceBusException(false, "Failed to create Sender, see cause for more details.", e));
                return null;
            }
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
        return coreMessageSender.linkFirstOpen;
    }

    /**
     * Starts creation of the request-response link if none is in progress and returns the
     * future tracking it. On failure the tracking future is completed exceptionally and then
     * reset to null, so a later call starts a fresh attempt.
     *
     * @return the current creation future
     */
    private CompletableFuture<Void> createRequestResponseLink() {
        synchronized (this.requestResonseLinkCreationLock) {
            if (this.requestResponseLinkCreationFuture != null) {
                return this.requestResponseLinkCreationFuture;
            }
            this.requestResponseLinkCreationFuture = new CompletableFuture<>();
            this.underlyingFactory.obtainRequestResponseLinkAsync(this.sendPath, this.transferDestinationPath, this.entityType).handleAsync((obtainedLink, failure) -> {
                if (failure != null) {
                    this.requestResponseLinkCreationFuture.completeExceptionally(ExceptionUtil.extractAsyncCompletionCause(failure));
                    synchronized (this.requestResonseLinkCreationLock) {
                        this.requestResponseLinkCreationFuture = null;
                    }
                } else {
                    this.requestResponseLink = obtainedLink;
                    this.requestResponseLinkCreationFuture.complete(null);
                }
                return null;
            }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
            return this.requestResponseLinkCreationFuture;
        }
    }

    /**
     * Releases the request-response link once its creation future completes normally and
     * forgets the creation future. Does nothing when no creation was ever started.
     */
    private void closeRequestResponseLink() {
        synchronized (this.requestResonseLinkCreationLock) {
            if (this.requestResponseLinkCreationFuture == null) {
                return;
            }
            // thenRun only fires on normal completion of the creation future.
            this.requestResponseLinkCreationFuture.thenRun(() -> {
                this.underlyingFactory.releaseRequestResponseLink(this.sendPath, this.transferDestinationPath);
                this.requestResponseLink = null;
            });
            this.requestResponseLinkCreationFuture = null;
        }
    }

    /**
     * Initializes sender state from the factory and link settings. No link is opened here.
     *
     * @param messagingFactory    factory supplying host name, operation timeout and retry policy
     * @param str                 client identifier passed to the superclass
     * @param messagingEntityType entity type, may be null
     * @param senderLinkSettings  link settings; a non-empty transfer destination in its link
     *                            properties switches the sender into send-via mode
     */
    private CoreMessageSender(MessagingFactory messagingFactory, String str, MessagingEntityType messagingEntityType, SenderLinkSettings senderLinkSettings) {
        super(str);
        this.requestResonseLinkCreationLock = new Object();
        this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = true;
        this.sendPath = senderLinkSettings.linkPath;
        this.entityType = messagingEntityType;

        if (senderLinkSettings.linkProperties != null) {
            String transferDestination = (String) senderLinkSettings.linkProperties.getOrDefault(ClientConstants.LINK_TRANSFER_DESTINATION_PROPERTY, null);
            boolean hasTransferDestination = transferDestination != null && !transferDestination.isEmpty();
            if (hasTransferDestination) {
                this.transferDestinationPath = transferDestination;
                this.isSendVia = true;
                this.transferSasTokenAudienceURI = String.format("amqp://%s/%s", messagingFactory.getHostName(), this.transferDestinationPath);
            } else {
                this.transferDestinationPath = null;
            }
        }

        this.sasTokenAudienceURI = String.format("amqp://%s/%s", messagingFactory.getHostName(), senderLinkSettings.linkPath);
        this.underlyingFactory = messagingFactory;
        this.operationTimeout = messagingFactory.getOperationTimeout();
        this.linkSettings = senderLinkSettings;

        // No link error has been seen yet.
        this.lastKnownLinkError = null;
        this.lastKnownErrorReportedAt = Instant.EPOCH;
        this.retryPolicy = messagingFactory.getRetryPolicy();

        // Pending-send bookkeeping.
        this.pendingSendLock = new Object();
        this.pendingSendsData = new ConcurrentHashMap<>();
        this.pendingSends = new PriorityQueue<>(1000, new DeliveryTagComparator());
        this.linkCredit = 0;

        this.linkClose = new CompletableFuture<>();
        this.sendLinkReopenFuture = null;
        this.isSendLoopRunning = false;
        this.sendWork = new DispatchHandler() {
            @Override // com.microsoft.azure.servicebus.amqp.DispatchHandler
            public void onEvent() {
                CoreMessageSender.this.processSendWork();
            }
        };
    }

    /**
     * @return the entity path this sender was created for
     */
    public String getSendPath() {
        return sendPath;
    }

    /**
     * Builds a delivery tag from a random UUID with every "-" replaced by
     * {@code StringUtil.EMPTY}.
     *
     * @return the generated delivery tag
     */
    private static String generateRandomDeliveryTag() {
        String dashedUuid = UUID.randomUUID().toString();
        return dashedUuid.replace("-", StringUtil.EMPTY);
    }

    /**
     * Queues an already-encoded payload for sending and arms its timeout.
     *
     * @param bArr               buffer holding the encoded message
     * @param i                  passed to the work item after the buffer (presumably the encoded length — confirm)
     * @param i2                 message format code
     * @param transactionContext transaction to send under
     * @return future completed with the delivery state reported for this send
     */
    CompletableFuture<DeliveryState> sendCoreAsync(byte[] bArr, int i, int i2, TransactionContext transactionContext) {
        throwIfClosed(this.lastKnownLinkError);
        TRACE_LOGGER.debug("Sending message to '{}'", this.sendPath);

        final String deliveryTag = generateRandomDeliveryTag();
        final CompletableFuture<DeliveryState> sendResult = new CompletableFuture<>();
        final SendWorkItem<DeliveryState> workItem = new SendWorkItem<>(bArr, i, i2, deliveryTag, transactionContext, sendResult, this.operationTimeout);

        enlistSendRequest(deliveryTag, workItem, false);
        scheduleSendTimeout(workItem);
        return sendResult;
    }

    /**
     * Schedules a one-shot task that fails the send with a timeout when it has not completed
     * within the time remaining on the work item's timeout tracker.
     *
     * @param sendWorkItem the send to watch
     */
    private void scheduleSendTimeout(SendWorkItem<DeliveryState> sendWorkItem) {
        sendWorkItem.setTimeoutTask(Timer.schedule(() -> {
            if (!sendWorkItem.getWork().isDone()) {
                TRACE_LOGGER.info("Delivery '{}' to '{}' did not receive ack from service. Throwing timeout.", sendWorkItem.getDeliveryTag(), this.sendPath);
                // Note: this removal runs without holding pendingSendLock.
                this.pendingSendsData.remove(sendWorkItem.getDeliveryTag());
                throwSenderTimeout(sendWorkItem.getWork(), sendWorkItem.getLastKnownException());
            }
        }, sendWorkItem.getTimeoutTracker().remaining(), TimerType.OneTimeRun));
    }

    /**
     * Registers a send under its delivery tag, queues the tag and, when the send loop is not
     * already running, schedules the send work on the reactor thread.
     *
     * @param str          delivery tag
     * @param sendWorkItem work item to register
     * @param z            true for a retried send, which is queued with priority 1 instead of 0
     */
    private void enlistSendRequest(String str, SendWorkItem<DeliveryState> sendWorkItem, boolean z) {
        synchronized (this.pendingSendLock) {
            this.pendingSendsData.put(str, sendWorkItem);
            int priority = z ? 1 : 0;
            this.pendingSends.offer(new WeightedDeliveryTag(str, priority));

            if (this.isSendLoopRunning) {
                return;
            }
            try {
                this.underlyingFactory.scheduleOnReactorThread(this.sendWork);
            } catch (IOException dispatchFailure) {
                AsyncUtil.completeFutureExceptionally(sendWorkItem.getWork(), new ServiceBusException(false, "Send failed while dispatching to Reactor, see cause for more details.", dispatchFailure));
            }
        }
    }

    /**
     * Re-queues a send for another attempt, provided it is still pending, its timeout task
     * could be cancelled, and time remains on its timeout tracker.
     *
     * @param str          delivery tag of the previous attempt
     * @param sendWorkItem the send to retry
     * @param z            true to reuse {@code str}; false to generate and assign a new tag
     */
    private void reSendAsync(String str, SendWorkItem<DeliveryState> sendWorkItem, boolean z) {
        // cancelTimeoutTask is only attempted when the work has not completed yet.
        boolean retryable = !sendWorkItem.getWork().isDone() && sendWorkItem.cancelTimeoutTask(false);
        if (!retryable) {
            return;
        }

        Duration timeLeft = sendWorkItem.getTimeoutTracker().remaining();
        boolean hasTimeLeft = !timeLeft.isNegative() && !timeLeft.isZero();
        if (!hasTimeLeft) {
            return;
        }

        String tagForRetry = str;
        if (!z) {
            tagForRetry = generateRandomDeliveryTag();
            sendWorkItem.setDeliveryTag(tagForRetry);
        }
        enlistSendRequest(tagForRetry, sendWorkItem, true);
        scheduleSendTimeout(sendWorkItem);
    }

    /**
     * Sends several messages as one AMQP batch message.
     *
     * <p>A single-element batch is sent as an ordinary message. Otherwise an envelope message is
     * built that carries the first message's annotations plus, when present, its message id and
     * group id; every message is then encoded and appended to the envelope as a data section.
     *
     * @param iterable           messages to send; must be non-null and non-empty
     * @param transactionContext transaction to send under
     * @return future completed when the batch is acknowledged, or exceptionally with
     *         {@code PayloadSizeExceededException} when the encoded batch does not fit
     * @throws IllegalArgumentException when {@code iterable} is null or empty
     */
    public CompletableFuture<Void> sendAsync(Iterable<Message> iterable, TransactionContext transactionContext) {
        if (iterable == null || IteratorUtil.sizeEquals(iterable, 0)) {
            throw new IllegalArgumentException("Sending Empty batch of messages is not allowed.");
        }
        TRACE_LOGGER.debug("Sending a batch of messages to '{}'", this.sendPath);
        Message firstMessage = iterable.iterator().next();
        if (IteratorUtil.sizeEquals(iterable, 1)) {
            return sendAsync(firstMessage, transactionContext);
        }

        Message batchEnvelope = Proton.message();
        batchEnvelope.setMessageAnnotations(firstMessage.getMessageAnnotations());
        // Copy the identifiers only when the first message actually has them. The previous
        // conditions were inverted and copied them only when they were null or blank.
        if (!StringUtil.isNullOrWhiteSpace((String) firstMessage.getMessageId())) {
            batchEnvelope.setMessageId(firstMessage.getMessageId());
        }
        if (!StringUtil.isNullOrWhiteSpace(firstMessage.getGroupId())) {
            batchEnvelope.setGroupId(firstMessage.getGroupId());
        }

        try {
            Pair<byte[], Integer> encodedEnvelope = Util.encodeMessageToMaxSizeArray(batchEnvelope, this.maxMessageSize);
            byte[] buffer = encodedEnvelope.getFirstItem();
            int offset = encodedEnvelope.getSecondItem().intValue();
            for (Message current : iterable) {
                // Each message is encoded on its own and wrapped as the body of a carrier
                // message, which is then appended to the shared buffer at the running offset.
                Message carrier = Proton.message();
                Pair<byte[], Integer> encodedCurrent = Util.encodeMessageToOptimalSizeArray(current, this.maxMessageSize);
                carrier.setBody(new Data(new Binary(encodedCurrent.getFirstItem(), 0, encodedCurrent.getSecondItem().intValue())));
                offset += Util.encodeMessageToCustomArray(carrier, buffer, offset, (this.maxMessageSize - offset) - 1);
            }
            return sendCoreAsync(buffer, offset, AmqpConstants.AMQP_BATCH_MESSAGE_FORMAT, transactionContext).thenAccept(deliveryState -> {
            });
        } catch (PayloadSizeExceededException e) {
            TRACE_LOGGER.info("Payload size of batch of messages exceeded limit", e);
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    /**
     * Sends a single message and discards the resulting delivery state.
     *
     * @param message            message to send
     * @param transactionContext transaction to send under
     * @return future completed when the send completes
     */
    public CompletableFuture<Void> sendAsync(Message message, TransactionContext transactionContext) {
        CompletableFuture<DeliveryState> deliveryFuture = sendAndReturnDeliveryStateAsync(message, transactionContext);
        return deliveryFuture.thenAccept(ignoredState -> {
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Encodes a single message and sends it with message format 0.
     *
     * @param message            message to send
     * @param transactionContext transaction to send under
     * @return future completed with the delivery state, or exceptionally with
     *         {@code PayloadSizeExceededException} when encoding exceeds the size limit
     */
    public CompletableFuture<DeliveryState> sendAndReturnDeliveryStateAsync(Message message, TransactionContext transactionContext) {
        final Pair<byte[], Integer> encoded;
        try {
            encoded = Util.encodeMessageToOptimalSizeArray(message, this.maxMessageSize);
        } catch (PayloadSizeExceededException tooLarge) {
            TRACE_LOGGER.info("Payload size of message exceeded limit", tooLarge);
            CompletableFuture<DeliveryState> failed = new CompletableFuture<>();
            failed.completeExceptionally(tooLarge);
            return failed;
        }
        return sendCoreAsync(encoded.getFirstItem(), encoded.getSecondItem().intValue(), 0, transactionContext);
    }

    /**
     * Handles the result of a link open attempt.
     *
     * <p>On failure the SAS renew timer is cancelled and whichever of the first-open and reopen
     * futures is still pending is failed; a failed first open also marks the sender closed.
     * On success the maximum message size is refreshed, the last error and retry count are
     * reset and the pending open/reopen future is completed. After a successful reopen, every
     * pending send that is still waiting for an acknowledgement is queued again with priority 1.
     *
     * @param exc failure cause, or null when the link opened successfully
     */
    @Override // com.microsoft.azure.servicebus.amqp.IAmqpLink
    public void onOpenComplete(Exception exc) {
        this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = true;
        if (exc != null) {
            cancelSASTokenRenewTimer();
            if (!this.linkFirstOpen.isDone()) {
                TRACE_LOGGER.info("Opening send link '{}' to '{}' failed", new Object[]{this.sendLink.getName(), this.sendPath, exc});
                setClosed();
                ExceptionUtil.completeExceptionally(this.linkFirstOpen, exc, this, true);
            }
            if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
                TRACE_LOGGER.info("Opening send link '{}' to '{}' failed", new Object[]{this.sendLink.getName(), this.sendPath, exc});
                AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, exc);
            }
            return;
        }

        this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink);
        this.lastKnownLinkError = null;
        this.retryPolicy.resetRetryCount(getClientId());
        if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
            AsyncUtil.completeFuture(this.sendLinkReopenFuture, null);
        }
        if (!this.linkFirstOpen.isDone()) {
            TRACE_LOGGER.info("Opened send link to '{}'", this.sendPath);
            AsyncUtil.completeFuture(this.linkFirstOpen, this);
            return;
        }

        synchronized (this.pendingSendLock) {
            // Iterate the live entries rather than snapshotting keys and looking each one up
            // again: the send-timeout task removes entries without holding pendingSendLock, so
            // a second lookup could return null. ConcurrentHashMap entry iteration never
            // yields a null value.
            for (Map.Entry<String, SendWorkItem<DeliveryState>> pending : this.pendingSendsData.entrySet()) {
                if (pending.getValue().isWaitingForAck()) {
                    this.pendingSends.offer(new WeightedDeliveryTag(pending.getKey(), 1));
                }
            }
        }
    }

    /**
     * Translates a link close into an error notification. When no error condition is supplied
     * a transient {@code ServiceBusException} is reported instead.
     *
     * @param errorCondition condition reported for the close, may be null
     */
    @Override // com.microsoft.azure.servicebus.amqp.IAmqpLink
    public void onClose(ErrorCondition errorCondition) {
        final Exception closeCause;
        if (errorCondition == null) {
            closeCause = new ServiceBusException(true, "The entity has been closed due to transient failures (underlying link closed), please retry the operation.");
        } else {
            closeCause = ExceptionUtil.toException(errorCondition);
        }
        onError(closeCause);
    }

    /**
     * Reacts to a link error.
     *
     * <p>If the sender is closing or closed, all pending sends are failed and the close future
     * is completed. Otherwise the error is recorded and forwarded to {@code onOpenComplete};
     * a non-null error that is not a transient {@code ServiceBusException} fails all pending
     * sends, while any other error schedules a link reopen after the retry policy's interval,
     * provided a pending send exists to derive the remaining time from.
     *
     * @param exc the error, may be null
     */
    @Override // com.microsoft.azure.servicebus.amqp.IAmqpLink
    public void onError(Exception exc) {
        this.linkCredit = 0;

        if (getIsClosingOrClosed()) {
            Throwable failure = exc;
            if (failure == null) {
                failure = new OperationCancelledException("Send cancelled as the Sender instance is Closed before the sendOperation completed.");
            }
            clearAllPendingSendsWithException(failure);
            TRACE_LOGGER.info("Send link to '{}' closed", this.sendPath);
            AsyncUtil.completeFuture(this.linkClose, null);
            return;
        }

        this.underlyingFactory.deregisterForConnectionError(this.sendLink);
        this.lastKnownLinkError = exc;
        this.lastKnownErrorReportedAt = Instant.now();
        onOpenComplete(exc);

        boolean transientFailure = exc instanceof ServiceBusException && ((ServiceBusException) exc).getIsTransient();
        if (exc != null && !transientFailure) {
            TRACE_LOGGER.info("Send link '{}' to '{}' closed. Failing all pending send requests.", this.sendLink.getName(), this.sendPath);
            clearAllPendingSendsWithException(exc);
            return;
        }

        // The retry interval is computed against the time left on the first pending send.
        Map.Entry firstPending = (Map.Entry) IteratorUtil.getFirst(this.pendingSendsData.entrySet());
        if (firstPending == null || firstPending.getValue() == null) {
            return;
        }
        TimeoutTracker tracker = ((SendWorkItem) firstPending.getValue()).getTimeoutTracker();
        if (tracker == null) {
            return;
        }
        Duration retryAfter = this.retryPolicy.getNextRetryInterval(getClientId(), exc, tracker.remaining());
        if (retryAfter == null) {
            return;
        }

        TRACE_LOGGER.info("Send link '{}' to '{}' closed. Will retry link creation after '{}'.", new Object[]{this.sendLink.getName(), this.sendPath, retryAfter});
        Timer.schedule(() -> {
            ensureLinkIsOpen();
        }, retryAfter, TimerType.OneTimeRun);
    }

    /**
     * Handles the remote disposition of a delivery and completes, fails or retries the matching
     * pending send.
     *
     * <ul>
     *   <li>{@code Accepted}: resets error and retry state, cancels the timeout, completes the send.</li>
     *   <li>{@code Declared}: completes the send.</li>
     *   <li>{@code Rejected}: retries after the retry policy's interval, or fails the send when
     *       the policy returns no interval.</li>
     *   <li>{@code Released}: fails the send with {@code OperationCancelledException}.</li>
     *   <li>anything else: fails the send with a non-transient {@code ServiceBusException}.</li>
     * </ul>
     * A {@code TransactionalState} is unwrapped to its outcome first.
     *
     * @param delivery the delivery whose remote state changed
     */
    @Override // com.microsoft.azure.servicebus.amqp.IAmqpSender
    public void onSendComplete(Delivery delivery) {
        DeliveryState remoteState = delivery.getRemoteState();
        String str = new String(delivery.getTag(), StandardCharsets.UTF_8);
        TRACE_LOGGER.debug("Received ack for delivery. path:{}, linkName:{}, deliveryTag:{}, outcome:{}", new Object[]{this.sendPath, this.sendLink.getName(), str, remoteState});
        SendWorkItem<DeliveryState> remove = this.pendingSendsData.remove(str);
        if (remove == null) {
            // No pending send is registered under this tag.
            TRACE_LOGGER.info("Delivery mismatch. path:{}, linkName:{}, delivery:{}", new Object[]{this.sendPath, this.sendLink.getName(), str});
            return;
        }

        if (remoteState instanceof TransactionalState) {
            TRACE_LOGGER.trace("State of delivery is Transactional, retrieving outcome: {}", remoteState);
            // getOutcome() returns an Outcome, which is not necessarily a DeliveryState, so the
            // type has to be checked before it can be used as one.
            org.apache.qpid.proton.amqp.messaging.Outcome outcome = ((TransactionalState) remoteState).getOutcome();
            if (!(outcome instanceof DeliveryState)) {
                cleanupFailedSend(remove, new ServiceBusException(false, "Unknown delivery state: " + remoteState.toString()));
                return;
            }
            remoteState = (DeliveryState) outcome;
        }

        if (remoteState instanceof Accepted) {
            this.lastKnownLinkError = null;
            this.retryPolicy.resetRetryCount(getClientId());
            remove.cancelTimeoutTask(false);
            AsyncUtil.completeFuture(remove.getWork(), remoteState);
            return;
        }
        if (remoteState instanceof Declared) {
            AsyncUtil.completeFuture(remove.getWork(), remoteState);
            return;
        }
        if (remoteState instanceof Released) {
            cleanupFailedSend(remove, new OperationCancelledException(remoteState.toString()));
            return;
        }
        if (!(remoteState instanceof Rejected)) {
            cleanupFailedSend(remove, new ServiceBusException(false, remoteState.toString()));
            return;
        }

        ErrorCondition error = ((Rejected) remoteState).getError();
        Exception exception = ExceptionUtil.toException(error);
        if (ExceptionUtil.isGeneralError(error.getCondition())) {
            this.lastKnownLinkError = exception;
            this.lastKnownErrorReportedAt = Instant.now();
        }
        Duration nextRetryInterval = this.retryPolicy.getNextRetryInterval(getClientId(), exception, remove.getTimeoutTracker().remaining());
        if (nextRetryInterval == null) {
            cleanupFailedSend(remove, exception);
            return;
        }
        TRACE_LOGGER.info("Send failed for delivery '{}'. Will retry after '{}'", str, nextRetryInterval);
        remove.setLastKnownException(exception);
        // The retry is sent under a freshly generated delivery tag (reuse flag is false).
        Timer.schedule(() -> {
            reSendAsync(str, remove, false);
        }, nextRetryInterval, TimerType.OneTimeRun);
    }

    /**
     * Fails every pending send with the given cause, then empties both the pending-send map
     * and the pending-send queue.
     *
     * @param th cause reported to each pending send
     */
    private void clearAllPendingSendsWithException(Throwable th) {
        synchronized (this.pendingSendLock) {
            for (SendWorkItem<DeliveryState> pending : this.pendingSendsData.values()) {
                cleanupFailedSend(pending, th);
            }
            this.pendingSendsData.clear();
            this.pendingSends.clear();
        }
    }

    /**
     * Cancels the send's timeout task and completes its future exceptionally, passing this
     * sender as the error-context provider.
     *
     * @param sendWorkItem the failed send
     * @param th           failure cause
     */
    private void cleanupFailedSend(SendWorkItem<DeliveryState> sendWorkItem, Throwable th) {
        sendWorkItem.cancelTimeoutTask(false);
        CompletableFuture<DeliveryState> pendingWork = sendWorkItem.getWork();
        ExceptionUtil.completeExceptionally(pendingWork, th, this, true);
    }

    /**
     * Builds the default link settings for an entity: an unsettled, authenticated link whose
     * target address is the entity path. The link properties carry the adjusted server
     * timeout and, when supplied, the entity type and the transfer destination.
     *
     * @param str                 entity path, used as link path and target address
     * @param str2                transfer destination path; ignored when null or empty
     * @param messagingFactory    factory supplying the operation timeout
     * @param messagingEntityType entity type; omitted from the properties when null
     * @return the populated settings
     */
    private static SenderLinkSettings getDefaultLinkProperties(String str, String str2, MessagingFactory messagingFactory, MessagingEntityType messagingEntityType) {
        HashMap properties = new HashMap();
        properties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(Util.adjustServerTimeout(messagingFactory.getOperationTimeout()).toMillis()));
        if (messagingEntityType != null) {
            properties.put(ClientConstants.ENTITY_TYPE_PROPERTY, Integer.valueOf(messagingEntityType.getIntValue()));
        }
        boolean hasTransferDestination = str2 != null && !str2.isEmpty();
        if (hasTransferDestination) {
            properties.put(ClientConstants.LINK_TRANSFER_DESTINATION_PROPERTY, str2);
        }

        Target linkTarget = new Target();
        linkTarget.setAddress(str);

        SenderLinkSettings settings = new SenderLinkSettings();
        settings.linkPath = str;
        settings.target = linkTarget;
        settings.source = new Source();
        settings.settleMode = SenderSettleMode.UNSETTLED;
        settings.requiresAuthentication = true;
        settings.linkProperties = properties;
        return settings;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Opens a new session and sender link on the factory's active connection and registers the
     * link for connection errors.
     *
     * <p>When there is no active connection, the pending first-open and reopen futures are
     * failed with a transient exception. In the reopen case one immediate retry is scheduled,
     * guarded by {@code shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent}.
     *
     * @param senderLinkSettings name, target, source, properties and settle mode for the link
     */
    public void createSendLink(SenderLinkSettings senderLinkSettings) {
        TRACE_LOGGER.info("Creating send link to '{}'", this.sendPath);
        Connection connection = this.underlyingFactory.getActiveConnectionOrNothing();

        if (connection != null) {
            Session session = connection.session();
            session.setOutgoingWindow(2147483647L);
            session.open();
            BaseHandler.setHandler(session, new SessionHandler(this.sendPath));

            Sender newLink = session.sender(senderLinkSettings.linkName);
            newLink.setTarget(senderLinkSettings.target);
            newLink.setSource(senderLinkSettings.source);
            newLink.setProperties(senderLinkSettings.linkProperties);
            TRACE_LOGGER.debug("Send link settle mode '{}'", senderLinkSettings.settleMode);
            newLink.setSenderSettleMode(senderLinkSettings.settleMode);
            BaseHandler.setHandler(newLink, new SendLinkHandler(this));
            newLink.open();

            this.sendLink = newLink;
            this.underlyingFactory.registerForConnectionError(this.sendLink);
            return;
        }

        TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
        ServiceBusException connectionGone = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");

        boolean firstOpenPending = this.linkFirstOpen != null && !this.linkFirstOpen.isDone();
        if (firstOpenPending) {
            AsyncUtil.completeFutureExceptionally(this.linkFirstOpen, connectionGone);
        }

        boolean reopenPending = this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone();
        if (reopenPending) {
            AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, connectionGone);
            if (this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent) {
                // Retry only once; the flag is set back to true in onOpenComplete().
                this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = false;
                Timer.schedule(() -> {
                    ensureLinkIsOpen();
                }, Duration.ZERO, TimerType.OneTimeRun);
            }
        }
    }

    /**
     * Sends the security token for the send path and stores the renew timer handle returned by
     * the factory. When a transfer destination is configured, a token for it is sent as well
     * and the returned future waits for both.
     *
     * @param z flag forwarded to the factory; the renewal callback re-invokes this method with {@code true}
     * @return future completed when the token(s) have been sent; already completed when the
     *         sender is closing or closed
     */
    CompletableFuture<Void> sendTokenAndSetRenewTimer(boolean z) {
        if (getIsClosingOrClosed()) {
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> primaryToken = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, z, () -> {
            sendTokenAndSetRenewTimer(true);
        }).thenAccept(renewTimer -> {
            this.sasTokenRenewTimerFuture = renewTimer;
        });

        boolean hasTransferDestination = this.transferDestinationPath != null && !this.transferDestinationPath.isEmpty();
        if (!hasTransferDestination) {
            return primaryToken;
        }
        return CompletableFuture.allOf(primaryToken, this.underlyingFactory.sendSecurityToken(this.transferSasTokenAudienceURI));
    }

    /**
     * Cancels the SAS token renew timer when one exists and has not finished yet.
     */
    private void cancelSASTokenRenewTimer() {
        boolean timerActive = this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone();
        if (timerActive) {
            this.sasTokenRenewTimerFuture.cancel(true);
            TRACE_LOGGER.debug("Cancelled SAS Token renew timer");
        }
    }

    /**
     * Creates the first-open future and schedules a one-shot task that, if the link has not
     * opened within the tracker's remaining time, fails the future with a
     * {@code TimeoutException} and closes the sender.
     *
     * @param timeoutTracker tracker whose remaining time bounds the open operation
     */
    private void initializeLinkOpen(TimeoutTracker timeoutTracker) {
        this.linkFirstOpen = new CompletableFuture<>();
        Timer.schedule(() -> {
            if (this.linkFirstOpen.isDone()) {
                return;
            }
            // sendLink is only assigned once createSendLink() has run. If the timeout fires
            // before that, fall back to the configured link name (the same name the link would
            // be created with) so an NPE here cannot leave linkFirstOpen uncompleted.
            String linkName = this.sendLink != null ? this.sendLink.getName() : this.linkSettings.linkName;
            // The last link error is attached as the cause only if it was reported within the last 4 seconds.
            Exception recentLinkError = this.lastKnownErrorReportedAt.isAfter(Instant.now().minusSeconds(4L)) ? this.lastKnownLinkError : null;
            TimeoutException timeoutException = new TimeoutException(String.format(Locale.US, "Open operation on SendLink(%s) on Entity(%s) timed out at %s.", linkName, getSendPath(), ZonedDateTime.now().toString()), recentLinkError);
            TRACE_LOGGER.info(timeoutException.getMessage());
            ExceptionUtil.completeExceptionally(this.linkFirstOpen, timeoutException, this, true);
            setClosing();
            closeInternals(false);
            setClosed();
        }, timeoutTracker.remaining(), TimerType.OneTimeRun);
    }

    /**
     * Builds an error context describing this sender.
     *
     * <p>The reference id is the link's remote tracking-id property when present, otherwise the
     * link name, otherwise null when no link exists. Link credit is reported only once the
     * first open has completed and a link exists.
     *
     * @return a new {@code SenderErrorContext}
     */
    @Override // com.microsoft.azure.servicebus.primitives.IErrorContextProvider
    public ErrorContext getContext() {
        String hostName = null;
        if (this.underlyingFactory != null) {
            hostName = this.underlyingFactory.getHostName();
        }

        String referenceId = null;
        if (this.sendLink != null) {
            boolean hasTrackingId = this.sendLink.getRemoteProperties() != null
                    && this.sendLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY);
            if (hasTrackingId) {
                referenceId = this.sendLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString();
            } else {
                referenceId = this.sendLink.getName();
            }
        }

        Integer currentCredit = null;
        boolean firstOpenFinished = this.linkFirstOpen != null && this.linkFirstOpen.isDone();
        if (firstOpenFinished && this.sendLink != null) {
            currentCredit = Integer.valueOf(this.sendLink.getCredit());
        }

        return new SenderErrorContext(hostName, this.sendPath, referenceId, currentCredit);
    }

    /**
     * Handles a flow notification for the send link.
     *
     * <p>The last known link error is always cleared. When the reported credit is positive it is
     * added to the locally tracked link credit and the send work handler is triggered; a
     * non-positive value is otherwise ignored.
     *
     * @param i the credit reported with the flow
     */
    @Override
    public void onFlow(int i) {
        this.lastKnownLinkError = null;
        if (i > 0) {
            TRACE_LOGGER.debug("Received flow frame. path:{}, linkName:{}, remoteLinkCredit:{}, pendingSendsWaitingForCredit:{}, pendingSendsWaitingDelivery:{}",
                    this.sendPath,
                    this.sendLink.getName(),
                    i,
                    this.pendingSends.size(),
                    this.pendingSendsData.size() - this.pendingSends.size());
            this.linkCredit += i;
            this.sendWork.onEvent();
        }
    }

    /**
     * Returns a future that completes when the send link is usable, starting a re-open if needed.
     *
     * <p>If the link is active on both the local and remote side, an already-completed future is
     * returned. If a re-open is already in flight, its future is returned. Otherwise a new re-open
     * attempt is started: the retry count is incremented, a one-time timer is armed to fail the
     * attempt after {@code LINK_REOPEN_TIMEOUT}, the SAS renew timer is cancelled, the token is
     * re-sent when the link settings require authentication, and link creation is then scheduled
     * on the reactor thread.
     *
     * @return the future tracking the (possibly already satisfied) link-open state
     */
    private synchronized CompletableFuture<Void> ensureLinkIsOpen() {
        boolean locallyActive = this.sendLink.getLocalState() == EndpointState.ACTIVE;
        if (locallyActive && this.sendLink.getRemoteState() == EndpointState.ACTIVE) {
            return CompletableFuture.completedFuture(null);
        }

        // A re-open that has not finished yet is shared by all callers.
        if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
            return this.sendLinkReopenFuture;
        }

        TRACE_LOGGER.info("Recreating send link to '{}'", this.sendPath);
        this.retryPolicy.incrementRetryCount(getClientId());
        this.sendLinkReopenFuture = new CompletableFuture<>();

        // The timer closes over this attempt's future only; the field may be replaced later.
        final CompletableFuture<Void> reopenAttempt = this.sendLinkReopenFuture;
        Timer.schedule(() -> {
            if (!reopenAttempt.isDone()) {
                cancelSASTokenRenewTimer();
                TimeoutException reopenTimedOut = new TimeoutException(String.format(Locale.US, "%s operation on SendLink(%s) to path(%s) timed out at %s.", "Open", this.sendLink.getName(), this.sendPath, ZonedDateTime.now()));
                TRACE_LOGGER.info(reopenTimedOut.getMessage());
                reopenAttempt.completeExceptionally(reopenTimedOut);
            }
        }, LINK_REOPEN_TIMEOUT, TimerType.OneTimeRun);
        cancelSASTokenRenewTimer();

        CompletableFuture<Void> authenticationFuture;
        if (this.linkSettings.requiresAuthentication) {
            authenticationFuture = sendTokenAndSetRenewTimer(false);
        } else {
            authenticationFuture = CompletableFuture.completedFuture(null);
        }

        authenticationFuture.handleAsync((ignored, tokenFailure) -> {
            if (tokenFailure == null) {
                try {
                    this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
                        @Override
                        public void onEvent() {
                            CoreMessageSender.this.createSendLink(CoreMessageSender.this.linkSettings);
                        }
                    });
                } catch (IOException schedulingFailure) {
                    this.sendLinkReopenFuture.completeExceptionally(schedulingFailure);
                }
            } else {
                TRACE_LOGGER.info("Sending SAS Token to '{}' failed.", this.sendPath, ExceptionUtil.extractAsyncCompletionCause(tokenFailure));
                this.sendLinkReopenFuture.completeExceptionally(tokenFailure);
                clearAllPendingSendsWithException(tokenFailure);
            }
            return null;
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);

        return this.sendLinkReopenFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Code restructure failed: missing block: B:53:0x010b, code lost:
    
        if (r0.getWork() == null) goto L147;
     */
    /* JADX WARN: Code restructure failed: missing block: B:55:0x0115, code lost:
    
        if (r0.getWork().isDone() == false) goto L148;
     */
    /* JADX WARN: Code restructure failed: missing block: B:57:0x0127, code lost:
    
        r13 = null;
        r14 = false;
        r15 = 0;
        r16 = null;
     */
    /* JADX WARN: Code restructure failed: missing block: B:59:0x0133, code lost:
    
        r13 = r0.delivery(r0.getDeliveryTag().getBytes(java.nio.charset.StandardCharsets.UTF_8));
        r13.setMessageFormat(r0.getMessageFormat());
        r0 = r0.getTransaction();
     */
    /* JADX WARN: Code restructure failed: missing block: B:60:0x015b, code lost:
    
        if (r0 == com.microsoft.azure.servicebus.TransactionContext.NULL_TXN) goto L74;
     */
    /* JADX WARN: Code restructure failed: missing block: B:61:0x015e, code lost:
    
        r0 = new org.apache.qpid.proton.amqp.transaction.TransactionalState();
        r0.setTxnId(new org.apache.qpid.proton.amqp.Binary(r0.getTransactionId().array()));
        r13.disposition(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:62:0x0184, code lost:
    
        com.microsoft.azure.servicebus.primitives.CoreMessageSender.TRACE_LOGGER.debug("Sending message delivery '{}' to '{}'", r0.getDeliveryTag(), r9.sendPath);
        r15 = r0.send(r0.getMessage(), 0, r0.getEncodedMessageSize());
     */
    /* JADX WARN: Code restructure failed: missing block: B:63:0x01ab, code lost:
    
        if (com.microsoft.azure.servicebus.primitives.CoreMessageSender.$assertionsDisabled != false) goto L80;
     */
    /* JADX WARN: Code restructure failed: missing block: B:65:0x01b4, code lost:
    
        if (r15 == r0.getEncodedMessageSize()) goto L80;
     */
    /* JADX WARN: Code restructure failed: missing block: B:67:0x01c1, code lost:
    
        throw new java.lang.AssertionError("Contract of the ProtonJ library for Sender.Send API changed");
     */
    /* JADX WARN: Code restructure failed: missing block: B:70:0x01c2, code lost:
    
        r14 = r0.advance();
     */
    /* JADX WARN: Code restructure failed: missing block: B:85:0x01cd, code lost:
    
        r17 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:86:0x01cf, code lost:
    
        r16 = r17;
     */
    /* JADX WARN: Code restructure failed: missing block: B:88:0x0118, code lost:
    
        r9.pendingSendsData.remove(r0.getDeliveryTag());
     */
    /* JADX WARN: Code restructure failed: missing block: B:97:0x00a0, code lost:
    
        com.microsoft.azure.servicebus.primitives.CoreMessageSender.TRACE_LOGGER.debug("There are no pending sends to '{}'.", r9.sendPath);
        r9.isSendLoopRunning = false;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the send loop.
     *
     * <p>NOTE(review): the decompiler could not reconstruct this method (see the warnings
     * preceding it), so the body below is not the real implementation - it unconditionally throws.
     * The fragments in those warnings suggest the original creates deliveries on the send link and
     * sends pending messages, but that logic must be recovered from the bytecode or the original
     * sources before this class can be used.
     *
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    public void processSendWork() {
        /*
            Method dump skipped, instructions count: 696
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.microsoft.azure.servicebus.primitives.CoreMessageSender.processSendWork():void");
    }

    /**
     * Fails a pending send with the most specific exception available.
     *
     * <p>When no exception is supplied, the last known link error is used as the cause if it was
     * reported within the last operation-timeout interval. If the resulting cause is a
     * {@code ServiceBusException} it is used as-is; otherwise a {@code TimeoutException} wrapping
     * the cause (possibly {@code null}) is created.
     *
     * @param completableFuture the pending send to complete exceptionally
     * @param exc               the error observed for this send, or {@code null} if none
     */
    private void throwSenderTimeout(CompletableFuture<DeliveryState> completableFuture, Exception exc) {
        Exception cause = exc;
        if (exc == null && this.lastKnownLinkError != null) {
            if (this.lastKnownErrorReportedAt.isAfter(Instant.now().minusMillis(this.operationTimeout.toMillis()))) {
                cause = this.lastKnownLinkError;
            } else {
                cause = null;
            }
        }

        Throwable failure;
        if (cause instanceof ServiceBusException) {
            // instanceof is false for null, so a missing cause always takes the timeout branch.
            failure = (ServiceBusException) cause;
        } else {
            failure = new TimeoutException(String.format(Locale.US, "%s %s %s.", SEND_TIMED_OUT, " at ", ZonedDateTime.now()), cause);
        }

        TRACE_LOGGER.info("Send timed out", failure);
        ExceptionUtil.completeExceptionally(completableFuture, failure, this, true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Arms a one-time timer that fails the link-close future with a {@code TimeoutException} if
     * the close has not completed when the time remaining on {@code timeoutTracker} has elapsed.
     *
     * @param timeoutTracker supplies the delay before the close is considered timed out
     */
    public void scheduleLinkCloseTimeout(TimeoutTracker timeoutTracker) {
        Timer.schedule(() -> {
            if (!this.linkClose.isDone()) {
                String description = String.format(Locale.US, "%s operation on Send Link(%s) timed out at %s", "Close", this.sendLink.getName(), ZonedDateTime.now());
                TimeoutException closeTimedOut = new TimeoutException(description);
                TRACE_LOGGER.info(closeTimedOut.getMessage());
                ExceptionUtil.completeExceptionally(this.linkClose, closeTimedOut, this, true);
            }
        }, timeoutTracker.remaining(), TimerType.OneTimeRun);
    }

    /**
     * Starts closing this sender's internals, asking for the link close to be awaited, and hands
     * back the link-close future.
     *
     * @return the future tracking completion of the link close
     */
    @Override
    protected CompletableFuture<Void> onClose() {
        closeInternals(true);
        final CompletableFuture<Void> closeFuture = this.linkClose;
        return closeFuture;
    }

    /**
     * Closes the send link (on the reactor thread), the SAS renew timer, and the request-response
     * link. Does nothing when the sender is already closed.
     *
     * <p>If there is no link, or it is already locally closed, the link-close future is completed
     * immediately. Otherwise the close is dispatched to the reactor thread; a failure to schedule
     * it completes the link-close future exceptionally.
     *
     * @param z when {@code true}, a close timeout is scheduled after closing the link and the
     *          link-close future is left pending; when {@code false}, the link-close future is
     *          completed right after the link is closed
     */
    private void closeInternals(final boolean z) {
        if (!getIsClosed()) {
            boolean linkNeedsClosing = this.sendLink != null && this.sendLink.getLocalState() != EndpointState.CLOSED;
            if (linkNeedsClosing) {
                try {
                    this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
                        @Override
                        public void onEvent() {
                            // Re-check on the reactor thread: the link may have closed in the meantime.
                            boolean stillOpen = CoreMessageSender.this.sendLink != null
                                    && CoreMessageSender.this.sendLink.getLocalState() != EndpointState.CLOSED;
                            if (stillOpen) {
                                CoreMessageSender.TRACE_LOGGER.info("Closing send link to '{}'", CoreMessageSender.this.sendPath);
                                CoreMessageSender.this.underlyingFactory.deregisterForConnectionError(CoreMessageSender.this.sendLink);
                                CoreMessageSender.this.sendLink.close();
                                if (z) {
                                    CoreMessageSender.this.scheduleLinkCloseTimeout(TimeoutTracker.create(CoreMessageSender.this.operationTimeout));
                                } else {
                                    AsyncUtil.completeFuture(CoreMessageSender.this.linkClose, null);
                                }
                            }
                        }
                    });
                } catch (IOException schedulingFailure) {
                    AsyncUtil.completeFutureExceptionally(this.linkClose, schedulingFailure);
                }
            } else {
                AsyncUtil.completeFuture(this.linkClose, null);
            }
            cancelSASTokenRenewTimer();
            closeRequestResponseLink();
        }
    }

    /**
     * Schedules the given messages through the request-response link and returns the sequence
     * numbers assigned to them.
     *
     * <p>Each message is encoded (bounded by {@code maxMessageSize}) and added to the request
     * together with its message id and, when present and non-empty, its session id, partition key
     * and via-partition key. A message that carries no message annotations is scheduled without
     * partition keys rather than failing the whole batch with a {@code NullPointerException}.
     *
     * @param messageArr         the messages to schedule
     * @param transactionContext the transaction the request is sent under
     * @param duration           the timeout applied to the request
     * @return a future yielding the sequence numbers from a 200 response; it completes
     *         exceptionally with {@code PayloadSizeExceededException} if a message is too large,
     *         or with the exception generated from a non-200 response
     */
    public CompletableFuture<long[]> scheduleMessageAsync(Message[] messageArr, TransactionContext transactionContext, Duration duration) {
        TRACE_LOGGER.debug("Sending '{}' scheduled message(s) to '{}'", Integer.valueOf(messageArr.length), this.sendPath);
        return createRequestResponseLink().thenComposeAsync(ignored -> {
            // Raw collection types are kept on purpose: the key types of the ClientConstants
            // entries and the property-bag parameter type are declared outside this method.
            HashMap requestBody = new HashMap();
            LinkedList messageEntries = new LinkedList();
            for (Message message : messageArr) {
                HashMap entry = new HashMap();
                try {
                    Pair<byte[], Integer> encoded = Util.encodeMessageToOptimalSizeArray(message, this.maxMessageSize);
                    entry.put(ClientConstants.REQUEST_RESPONSE_MESSAGE, new Binary(encoded.getFirstItem(), 0, encoded.getSecondItem().intValue()));
                    entry.put(ClientConstants.REQUEST_RESPONSE_MESSAGE_ID, message.getMessageId());
                    String groupId = message.getGroupId();
                    if (!StringUtil.isNullOrEmpty(groupId)) {
                        entry.put("session-id", groupId);
                    }
                    // The annotations section is optional on a message; without it there are no
                    // partition keys to forward.
                    Map<?, ?> annotations = message.getMessageAnnotations() == null ? null : message.getMessageAnnotations().getValue();
                    if (annotations != null) {
                        Object partitionKey = annotations.get(Symbol.valueOf(ClientConstants.PARTITIONKEYNAME));
                        if (partitionKey != null && !((String) partitionKey).isEmpty()) {
                            entry.put(ClientConstants.REQUEST_RESPONSE_PARTITION_KEY, partitionKey);
                        }
                        Object viaPartitionKey = annotations.get(Symbol.valueOf(ClientConstants.VIAPARTITIONKEYNAME));
                        if (viaPartitionKey != null && !((String) viaPartitionKey).isEmpty()) {
                            entry.put(ClientConstants.REQUEST_RESPONSE_VIA_PARTITION_KEY, viaPartitionKey);
                        }
                    }
                    messageEntries.add(entry);
                } catch (PayloadSizeExceededException e) {
                    // One oversized message fails the whole request before anything is sent.
                    TRACE_LOGGER.info("Payload size of message exceeded limit", e);
                    CompletableFuture<long[]> tooLarge = new CompletableFuture<>();
                    tooLarge.completeExceptionally(e);
                    return tooLarge;
                }
            }
            requestBody.put(ClientConstants.REQUEST_RESPONSE_MESSAGES, messageEntries);
            return this.requestResponseLink.requestAysnc(RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, requestBody, Util.adjustServerTimeout(duration), this.sendLink.getName()), transactionContext, duration).thenComposeAsync(response -> {
                CompletableFuture<long[]> result = new CompletableFuture<>();
                if (RequestResponseUtils.getResponseStatusCode(response) == 200) {
                    long[] sequenceNumbers = (long[]) RequestResponseUtils.getResponseBody(response).get(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS);
                    if (TRACE_LOGGER.isDebugEnabled()) {
                        TRACE_LOGGER.debug("Scheduled messages sent. Received sequence numbers '{}'", Arrays.toString(sequenceNumbers));
                    }
                    result.complete(sequenceNumbers);
                } else {
                    Exception failure = RequestResponseUtils.genereateExceptionFromResponse(response);
                    TRACE_LOGGER.info("Sending scheduled messages to '{}' failed.", this.sendPath, failure);
                    result.completeExceptionally(failure);
                }
                return result;
            }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }

    /**
     * Cancels previously scheduled messages, identified by sequence number, through the
     * request-response link.
     *
     * @param lArr     the sequence numbers of the scheduled messages to cancel
     * @param duration the timeout applied to the request
     * @return a future that completes normally on a 200 response and exceptionally, with the
     *         exception generated from the response, otherwise
     */
    public CompletableFuture<Void> cancelScheduledMessageAsync(Long[] lArr, Duration duration) {
        if (TRACE_LOGGER.isDebugEnabled()) {
            TRACE_LOGGER.debug("Cancelling scheduled message(s) '{}' to '{}'", Arrays.toString(lArr), this.sendPath);
        }
        return createRequestResponseLink().thenComposeAsync(ignored -> {
            HashMap requestBody = new HashMap();
            requestBody.put(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS, lArr);
            return this.requestResponseLink.requestAysnc(RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_CANCEL_CHEDULE_MESSAGE_OPERATION, requestBody, Util.adjustServerTimeout(duration), this.sendLink.getName()), TransactionContext.NULL_TXN, duration).thenComposeAsync(response -> {
                CompletableFuture<Void> result = new CompletableFuture<>();
                if (RequestResponseUtils.getResponseStatusCode(response) != 200) {
                    Exception failure = RequestResponseUtils.genereateExceptionFromResponse(response);
                    TRACE_LOGGER.info("Cancelling scheduled messages in '{}' failed.", this.sendPath, failure);
                    result.completeExceptionally(failure);
                } else {
                    TRACE_LOGGER.debug("Cancelled scheduled messages in '{}'", this.sendPath);
                    result.complete(null);
                }
                return result;
            }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }

    /**
     * Peeks messages on this sender's entity via the request-response link, using the sender's
     * operation timeout.
     *
     * @param j the sequence number to peek from
     * @param i the number of messages to peek
     * @return a future yielding the peeked messages
     */
    public CompletableFuture<Collection<Message>> peekMessagesAsync(long j, int i) {
        TRACE_LOGGER.debug("Peeking '{}' messages in '{}' from sequence number '{}'", i, this.sendPath, j);
        return createRequestResponseLink().thenComposeAsync(
                ignored -> CommonRequestResponseOperations.peekMessagesAsync(this.requestResponseLink, this.operationTimeout, j, i, null, null),
                (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }

    // Static initialisation of class-wide constants. The field declarations are outside this view.
    static {
        // NOTE(review): looks like the compiler-synthesised assertion flag surfaced by the
        // decompiler; it is true when assertions are disabled for this class.
        $assertionsDisabled = !CoreMessageSender.class.desiredAssertionStatus();
        // Logger shared by every trace statement in this class.
        TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageSender.class);
        // Delay after which a pending send-link re-open attempt is failed: five minutes.
        LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5L);
    }
}
