package com.microsoft.azure.servicebus;

import com.microsoft.azure.servicebus.amqp.AmqpConstants;
import com.microsoft.azure.servicebus.amqp.DispatchHandler;
import com.microsoft.azure.servicebus.amqp.IAmqpReceiver;
import com.microsoft.azure.servicebus.amqp.ReceiveLinkHandler;
import com.microsoft.azure.servicebus.amqp.SessionHandler;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnknownDescribedType;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;

/* loaded from: input_file:com/microsoft/azure/servicebus/MessageReceiver.class */
public class MessageReceiver extends ClientEntity implements IAmqpReceiver, IErrorContextProvider {
    // Trace logger shared by all receivers; obtained under the client trace logger name.
    private static final Logger TRACE_LOGGER = Logger.getLogger(ClientConstants.SERVICEBUS_CLIENT_TRACE);
    // NOTE(review): the operation-timeout runnable compares remaining time against the literal 20 ms;
    // presumably this constant is that threshold - confirm.
    private static final int MIN_TIMEOUT_DURATION_MILLIS = 20;
    // Receive calls that found no prefetched message and are waiting for a delivery or a timeout.
    private final ConcurrentLinkedQueue<ReceiveWorkItem> pendingReceives;
    // Factory that supplies the connection, reactor dispatch, retry policy and operation timeout.
    private final MessagingFactory underlyingFactory;
    // Address set as the AMQP source of the receive link.
    private final String receivePath;
    // Timer callback that completes expired pending receives with null.
    private final Runnable onOperationTimedout;
    // Timeout applied to link open and link close.
    private final Duration operationTimeout;
    // Completed when the link close finishes (or exceptionally when the close times out).
    private final CompletableFuture<Void> linkClose;
    // Monitor guarding reads/writes of prefetchCount in getPrefetchCount/setPrefetchCount.
    private final Object prefetchCountSync;
    // Target number of credits/messages kept in flight for this link.
    private int prefetchCount;
    // Messages already received from the link but not yet handed to a receive call.
    private ConcurrentLinkedQueue<Message> prefetchedMessages;
    // Current Proton receive link; replaced whenever the link is re-created.
    private Receiver receiveLink;
    // Tracks the initial link-open operation returned from create().
    private WorkItem<MessageReceiver> linkOpen;
    // Timeout applied to each receive call; initialised from the factory's operation timeout.
    private Duration receiveTimeout;
    // Epoch value attached to the link properties when isEpochReceiver is true.
    private long epoch;
    private boolean isEpochReceiver;
    // Start position used for the link filter when no offset is known.
    private Instant dateTime;
    // Whether the offset filter includes the message at lastReceivedOffset; cleared after a successful open.
    private boolean offsetInclusive;
    // Offset of the last message handed out of the prefetch queue (or the initial offset).
    private String lastReceivedOffset;
    // Most recent link error; cleared after a successful open.
    private Exception lastKnownLinkError;
    // Credits accumulated by sendFlow but not yet flowed to the link.
    private int nextCreditToFlow;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/microsoft/azure/servicebus/MessageReceiver$ReceiveWorkItem.class */
    /**
     * A pending receive call: the future to complete, its timeout, and the
     * maximum number of messages the caller asked for.
     */
    public static class ReceiveWorkItem extends WorkItem<Collection<Message>> {
        /** Upper bound on the number of messages delivered to this receive call. */
        private final int maxMessageCount;

        /**
         * @param completableFuture future completed with the received messages
         * @param duration          timeout handed to the base work item
         * @param i                 maximum number of messages to return
         */
        public ReceiveWorkItem(
                CompletableFuture<Collection<Message>> completableFuture,
                Duration duration,
                int i) {
            super(completableFuture, duration);
            maxMessageCount = i;
        }
    }

    /**
     * Initialises receiver state and the timer callback that expires pending receives.
     * The link itself is not created here; see {@code createLink()}.
     *
     * @param messagingFactory factory providing the connection, reactor dispatch and timeouts
     * @param str              value passed to the {@code ClientEntity} constructor together with the factory
     * @param str2             receive path used as the AMQP source address
     * @param str3             starting offset, or {@code null} to start from {@code instant}
     * @param z                whether the starting offset is inclusive (only used when {@code str3} is non-null)
     * @param instant          starting time, used only when {@code str3} is {@code null}
     * @param i                initial prefetch count
     * @param l                epoch value; must not be {@code null} (it is unboxed here)
     * @param z2               whether this is an epoch receiver
     */
    private MessageReceiver(MessagingFactory messagingFactory, String str, String str2, String str3, boolean z, Instant instant, int i, Long l, boolean z2) {
        super(str, messagingFactory);
        this.underlyingFactory = messagingFactory;
        this.operationTimeout = messagingFactory.getOperationTimeout();
        this.receivePath = str2;
        this.prefetchCount = i;
        this.epoch = l.longValue();
        this.isEpochReceiver = z2;
        this.prefetchedMessages = new ConcurrentLinkedQueue<>();
        this.linkClose = new CompletableFuture<>();
        this.lastKnownLinkError = null;
        this.receiveTimeout = messagingFactory.getOperationTimeout();
        this.prefetchCountSync = new Object();
        if (str3 != null) {
            this.lastReceivedOffset = str3;
            this.offsetInclusive = z;
        } else {
            this.dateTime = instant;
        }
        this.pendingReceives = new ConcurrentLinkedQueue<>();
        this.onOperationTimedout = new Runnable() {
            @Override
            public void run() {
                boolean completedAny = false;
                while (true) {
                    final ReceiveWorkItem head = MessageReceiver.this.pendingReceives.peek();
                    if (head == null) {
                        break;
                    }
                    // The oldest receive still has time left: re-arm the timer for it and stop.
                    if (head.getTimeoutTracker().remaining().toMillis() > MIN_TIMEOUT_DURATION_MILLIS) {
                        MessageReceiver.this.scheduleOperationTimer(head.getTimeoutTracker());
                        break;
                    }
                    // The queue may have been drained concurrently between peek and poll.
                    final ReceiveWorkItem expired = MessageReceiver.this.pendingReceives.poll();
                    if (expired == null) {
                        break;
                    }
                    completedAny = true;
                    // A timed-out receive completes with null rather than an exception.
                    expired.getWork().complete(null);
                }
                if (completedAny) {
                    try {
                        MessageReceiver.this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
                            @Override
                            public void onEvent() {
                                MessageReceiver.this.receiveLink.flow(0);
                            }
                        });
                    } catch (IOException e) {
                        // Best effort: the receives are already completed, so only record the failure.
                        if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                            TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "receiverPath[%s], action[flowAfterReceiveTimeout], failed to dispatch to reactor thread", MessageReceiver.this.receivePath), e);
                        }
                    }
                }
            }
        };
    }

    /**
     * Builds a receiver and starts opening its link.
     *
     * @return the future returned by {@code createLink()} for the new receiver
     */
    public static CompletableFuture<MessageReceiver> create(MessagingFactory messagingFactory, String str, String str2, String str3, boolean z, Instant instant, int i, long j, boolean z2) {
        final Long boxedEpoch = Long.valueOf(j);
        final MessageReceiver receiver =
                new MessageReceiver(messagingFactory, str, str2, str3, z, instant, i, boxedEpoch, z2);
        return receiver.createLink();
    }

    /**
     * Arms the link-open timeout and dispatches link creation to the reactor thread.
     *
     * @return the link-open future; completed exceptionally with a
     *         {@code ServiceBusException} if the dispatch itself fails
     */
    private CompletableFuture<MessageReceiver> createLink() {
        final WorkItem<MessageReceiver> openWork =
                new WorkItem<MessageReceiver>(new CompletableFuture<MessageReceiver>(), this.operationTimeout);
        this.linkOpen = openWork;
        scheduleLinkOpenTimeout(openWork.getTimeoutTracker());

        final DispatchHandler createOnReactor = new DispatchHandler() {
            @Override
            public void onEvent() {
                MessageReceiver.this.createReceiveLink();
            }
        };
        try {
            this.underlyingFactory.scheduleOnReactorThread(createOnReactor);
        } catch (IOException dispatchFailure) {
            final ServiceBusException failure =
                    new ServiceBusException(false, "Failed to create Receiver, see cause for more details.", dispatchFailure);
            this.linkOpen.getWork().completeExceptionally(failure);
        }
        return this.linkOpen.getWork();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drains up to {@code i} messages from the prefetch queue.
     *
     * @param i maximum number of messages to take; at least one message is
     *          taken when the queue is non-empty, whatever the value of {@code i}
     * @return the drained messages, or {@code null} (not an empty list) when
     *         the prefetch queue was empty
     */
    public List<Message> receiveCore(int i) {
        LinkedList<Message> batch = null;
        for (Message next = pollPrefetchQueue(); next != null; next = pollPrefetchQueue()) {
            if (batch == null) {
                // Allocate lazily so an empty queue yields null.
                batch = new LinkedList<>();
            }
            batch.add(next);
            if (batch.size() >= i) {
                // Stop before polling again so no extra message is consumed.
                break;
            }
        }
        return batch;
    }

    /**
     * @return the current prefetch count, read under {@code prefetchCountSync}
     */
    public int getPrefetchCount() {
        synchronized (this.prefetchCountSync) {
            return this.prefetchCount;
        }
    }

    /**
     * Updates the prefetch count and adjusts the pending link credit by the difference.
     *
     * <p>The adjustment is {@code new - old}: raising the prefetch count adds credits,
     * lowering it withholds credits until consumption catches up. (The previous
     * {@code old - new} computation had the sign inverted.)
     *
     * @param i the new prefetch count
     * @throws ServiceBusException if the credit adjustment cannot be dispatched to the reactor thread
     */
    public void setPrefetchCount(int i) throws ServiceBusException {
        final int creditDelta;
        synchronized (this.prefetchCountSync) {
            creditDelta = i - this.prefetchCount;
            this.prefetchCount = i;
        }
        try {
            // Link operations must run on the reactor thread.
            this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
                @Override
                public void onEvent() {
                    MessageReceiver.this.sendFlow(creditDelta);
                }
            });
        } catch (IOException e) {
            throw new ServiceBusException(false, "Setting prefetch count failed, see cause for more details", e);
        }
    }

    /**
     * @return the timeout currently applied to receive calls
     */
    public Duration getReceiveTimeout() {
        final Duration current = this.receiveTimeout;
        return current;
    }

    /**
     * Replaces the timeout applied to receive calls.
     *
     * @param duration the new receive timeout; stored as given, without validation
     */
    public void setReceiveTimeout(Duration duration) {
        receiveTimeout = duration;
    }

    /**
     * Requests up to {@code i} messages.
     *
     * <p>The work runs on the reactor thread: if prefetched messages are available the
     * returned future completes with them, otherwise the request is queued until a
     * delivery arrives or the receive timeout elapses (in which case it completes with
     * {@code null}).
     *
     * @param i maximum number of messages; must be in {@code 1..prefetchCount}
     * @return a future for the received messages; completed exceptionally with a
     *         {@code ServiceBusException} if dispatching to the reactor thread fails
     * @throws IllegalArgumentException if {@code i} is not positive or exceeds the prefetch count
     */
    public CompletableFuture<Collection<Message>> receive(final int i) {
        throwIfClosed(this.lastKnownLinkError);
        // One synchronized snapshot, so the check and the message agree and
        // concurrent setPrefetchCount calls are observed consistently.
        final int currentPrefetchCount = getPrefetchCount();
        if (i <= 0 || i > currentPrefetchCount) {
            throw new IllegalArgumentException(String.format(Locale.US, "parameter 'maxMessageCount' should be a positive number and should be less than prefetchCount(%s)", Integer.valueOf(currentPrefetchCount)));
        }
        // Only the first pending receive arms the timer; the timer callback re-arms itself for later ones.
        if (this.pendingReceives.isEmpty()) {
            scheduleOperationTimer(TimeoutTracker.create(this.receiveTimeout));
        }
        final CompletableFuture<Collection<Message>> completableFuture = new CompletableFuture<>();
        try {
            this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
                @Override
                public void onEvent() {
                    // Re-create the link if either end has closed it.
                    if (MessageReceiver.this.receiveLink.getLocalState() == EndpointState.CLOSED || MessageReceiver.this.receiveLink.getRemoteState() == EndpointState.CLOSED) {
                        MessageReceiver.this.createReceiveLink();
                    }
                    final List<Message> available = MessageReceiver.this.receiveCore(i);
                    if (available != null) {
                        completableFuture.complete(available);
                    } else {
                        MessageReceiver.this.pendingReceives.offer(new ReceiveWorkItem(completableFuture, MessageReceiver.this.receiveTimeout, i));
                    }
                }
            });
        } catch (IOException e) {
            completableFuture.completeExceptionally(new ServiceBusException(false, "Receive failed while dispatching to Reactor, see cause for more details.", e));
        }
        return completableFuture;
    }

    /**
     * Link-open callback.
     *
     * <p>On failure the pending open (if any) is failed and the error is remembered.
     * On success the pending open is completed, error/retry state is reset, and
     * credits for the free prefetch slots are flowed to the link.
     *
     * @param exc the open failure, or {@code null} when the link opened successfully
     */
    @Override
    public void onOpenComplete(Exception exc) {
        final boolean openStillPending = this.linkOpen != null && !this.linkOpen.getWork().isDone();

        if (exc == null) {
            if (openStillPending) {
                this.linkOpen.getWork().complete(this);
            }
            this.lastKnownLinkError = null;
            // After the first successful open, later re-opens use an exclusive offset filter.
            this.offsetInclusive = false;
            this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId());
            this.nextCreditToFlow = 0;
            final int freeSlots = this.prefetchCount - this.prefetchedMessages.size();
            sendFlow(freeSlots);
            if (TRACE_LOGGER.isLoggable(Level.FINE)) {
                final String trace = String.format(
                        "receiverPath[%s], linkname[%s], updated-link-credit[%s], sentCredits[%s]",
                        this.receivePath,
                        this.receiveLink.getName(),
                        Integer.valueOf(this.receiveLink.getCredit()),
                        Integer.valueOf(this.prefetchCount));
                TRACE_LOGGER.log(Level.FINE, trace);
            }
        } else {
            if (openStillPending) {
                setClosed();
                ExceptionUtil.completeExceptionally(this.linkOpen.getWork(), exc, this);
            }
            this.lastKnownLinkError = exc;
        }
    }

    /**
     * Delivery callback: decodes the message, settles the delivery, queues the
     * message in the prefetch buffer and, if a receive call is waiting, completes it.
     *
     * @param delivery the delivery whose pending bytes are read from the receive link
     */
    @Override
    public void onReceiveComplete(Delivery delivery) {
        final int pendingBytes = delivery.pending();
        final byte[] buffer = new byte[pendingBytes];
        final int bytesRead = this.receiveLink.recv(buffer, 0, pendingBytes);

        final Message decoded = Proton.message();
        decoded.decode(buffer, 0, bytesRead);
        delivery.settle();

        this.prefetchedMessages.add(decoded);
        this.underlyingFactory.getRetryPolicy().resetRetryCount(getClientId());

        // Hand the buffered messages to the oldest waiting receive, unless it has already completed.
        final ReceiveWorkItem waiter = this.pendingReceives.poll();
        if (waiter != null && !waiter.getWork().isDone()) {
            waiter.getWork().complete(receiveCore(waiter.maxMessageCount));
        }
    }

    /**
     * Converts an AMQP error condition to an exception and forwards it to
     * {@link #onError(Exception)}.
     *
     * @param errorCondition the error condition reported for the link
     */
    public void onError(ErrorCondition errorCondition) {
        final Exception translated = ExceptionUtil.toException(errorCondition);
        onError(translated);
    }

    /**
     * Link error callback.
     *
     * <ul>
     *   <li>Prefetched messages are always discarded.</li>
     *   <li>If the receiver is closing or closed, the close future is completed and every
     *       pending receive is finished: with {@code null} for a missing or transient error,
     *       exceptionally otherwise.</li>
     *   <li>Otherwise the error is recorded and passed to {@code onOpenComplete}. For a
     *       missing or transient error, link re-creation is scheduled after the retry
     *       interval (only when a receive is pending and the retry policy grants an
     *       interval); for any other error all pending receives fail.</li>
     * </ul>
     *
     * @param exc the link error; may be {@code null}
     */
    @Override
    public void onError(Exception exc) {
        this.prefetchedMessages.clear();

        final boolean transientOrAbsent =
                exc == null || ((exc instanceof ServiceBusException) && ((ServiceBusException) exc).getIsTransient());

        if (getIsClosingOrClosed()) {
            this.linkClose.complete(null);
            ReceiveWorkItem pending;
            while ((pending = this.pendingReceives.poll()) != null) {
                final CompletableFuture<Collection<Message>> work = pending.getWork();
                if (transientOrAbsent) {
                    work.complete(null);
                } else {
                    ExceptionUtil.completeExceptionally(work, exc, this);
                }
            }
            return;
        }

        this.lastKnownLinkError = exc;
        onOpenComplete(exc);

        if (!transientOrAbsent) {
            // Non-retriable failure: fail every waiting receive.
            ReceiveWorkItem pending;
            while ((pending = this.pendingReceives.poll()) != null) {
                ExceptionUtil.completeExceptionally(pending.getWork(), exc, this);
            }
            return;
        }

        // Retry only while a receive is waiting; its remaining time bounds the retry interval.
        final ReceiveWorkItem head = this.pendingReceives.peek();
        if (head == null || head.getTimeoutTracker() == null) {
            return;
        }
        final Duration nextRetryInterval = this.underlyingFactory.getRetryPolicy()
                .getNextRetryInterval(getClientId(), exc, head.getTimeoutTracker().remaining());
        if (nextRetryInterval == null) {
            return;
        }
        try {
            this.underlyingFactory.scheduleOnReactorThread((int) nextRetryInterval.toMillis(), new DispatchHandler() {
                @Override
                public void onEvent() {
                    if (MessageReceiver.this.receiveLink.getLocalState() == EndpointState.CLOSED || MessageReceiver.this.receiveLink.getRemoteState() == EndpointState.CLOSED) {
                        MessageReceiver.this.createReceiveLink();
                    }
                }
            });
        } catch (IOException e) {
            // Best effort: pending receives still finish via the operation timer; record the failure.
            if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "receiverPath[%s], action[scheduleRecreateReceiveLink], failed to dispatch to reactor thread", this.receivePath), e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules a single run of the operation-timeout callback after the
     * tracker's remaining time. Does nothing when the tracker is {@code null}.
     *
     * @param timeoutTracker tracker of the receive to time out; may be {@code null}
     */
    public void scheduleOperationTimer(TimeoutTracker timeoutTracker) {
        if (timeoutTracker == null) {
            return;
        }
        final Duration delay = timeoutTracker.remaining();
        Timer.schedule(this.onOperationTimedout, delay, TimerType.OneTimeRun);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates and opens a new receive link on a fresh session, then replaces the
     * current link with it.
     *
     * <p>The source filter starts from {@code lastReceivedOffset} when one is known
     * (inclusive or exclusive per {@code offsetInclusive}), otherwise from
     * {@code dateTime}. The link name is a random string, suffixed with the remote
     * container id when the connection reports one. Epoch receivers attach the epoch
     * as a link property.
     */
    public void createReceiveLink() {
        final Connection connection = this.underlyingFactory.getConnection();
        final Source source = new Source();
        source.setAddress(this.receivePath);

        final UnknownDescribedType filter;
        if (this.lastReceivedOffset == null) {
            long startMillis;
            try {
                startMillis = this.dateTime.toEpochMilli();
            } catch (ArithmeticException e) {
                // Instant too large for a long millisecond count: clamp to the maximum.
                startMillis = Long.MAX_VALUE;
                if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                    // No link exists yet on first creation, so the name must be read null-safely.
                    final String currentLinkName = this.receiveLink != null ? this.receiveLink.getName() : null;
                    TRACE_LOGGER.log(Level.WARNING, String.format("receiverPath[%s], linkname[%s], warning[starting receiver from epoch+Long.Max]", this.receivePath, currentLinkName));
                }
            }
            filter = new UnknownDescribedType(AmqpConstants.STRING_FILTER, String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.RECEIVED_AT_ANNOTATION_NAME, StringUtil.EMPTY, Long.valueOf(startMillis)));
        } else {
            if (TRACE_LOGGER.isLoggable(Level.FINE)) {
                TRACE_LOGGER.log(Level.FINE, String.format("receiverPath[%s], action[recreateReceiveLink], offset[%s], offsetInclusive[%s]", this.receivePath, this.lastReceivedOffset, Boolean.valueOf(this.offsetInclusive)));
            }
            filter = new UnknownDescribedType(AmqpConstants.STRING_FILTER, String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.OFFSET_ANNOTATION_NAME, this.offsetInclusive ? "=" : StringUtil.EMPTY, this.lastReceivedOffset));
        }
        source.setFilter(Collections.singletonMap(AmqpConstants.STRING_FILTER, filter));

        final Session session = connection.session();
        session.setIncomingCapacity(Integer.MAX_VALUE);
        session.open();
        BaseHandler.setHandler(session, new SessionHandler(this.receivePath));

        final String randomString = StringUtil.getRandomString();
        final String linkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer())
                ? randomString.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer())
                : randomString;

        // Declared as Receiver (not Link) so it can be stored in the receiveLink field.
        final Receiver receiver = session.receiver(linkName);
        receiver.setSource(source);
        receiver.setTarget(new Target());
        receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND);
        if (this.isEpochReceiver) {
            receiver.setProperties(Collections.singletonMap(AmqpConstants.EPOCH, Long.valueOf(this.epoch)));
        }
        BaseHandler.setHandler(receiver, new ReceiveLinkHandler(this));
        this.underlyingFactory.registerForConnectionError(receiver);
        receiver.open();

        // Register the new link before dropping the old one from connection-error notifications.
        if (this.receiveLink != null) {
            this.underlyingFactory.deregisterForConnectionError(this.receiveLink);
        }
        this.receiveLink = receiver;
    }

    /**
     * Takes the next prefetched message, if any. Taking a message records its
     * offset annotation as {@code lastReceivedOffset} and accounts for one
     * replacement credit via {@code sendFlow(1)}.
     *
     * @return the next message, or {@code null} when the prefetch queue is empty
     */
    private Message pollPrefetchQueue() {
        final Message next = this.prefetchedMessages.poll();
        if (next == null) {
            return null;
        }
        this.lastReceivedOffset = next.getMessageAnnotations().getValue().get(AmqpConstants.OFFSET).toString();
        sendFlow(1);
        return next;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Accumulates {@code i} credits and flows the accumulated total to the link
     * once it reaches the prefetch count or 100, whichever comes first.
     *
     * @param i number of credits to add to the pending total (may be negative)
     */
    public void sendFlow(int i) {
        this.nextCreditToFlow += i;

        final boolean thresholdReached =
                this.nextCreditToFlow >= this.prefetchCount || this.nextCreditToFlow >= 100;
        if (!thresholdReached) {
            return;
        }

        final int credits = this.nextCreditToFlow;
        this.receiveLink.flow(credits);
        this.nextCreditToFlow = 0;

        if (TRACE_LOGGER.isLoggable(Level.FINE)) {
            final String trace = String.format(
                    "receiverPath[%s], linkname[%s], updated-link-credit[%s], sentCredits[%s]",
                    this.receivePath,
                    this.receiveLink.getName(),
                    Integer.valueOf(this.receiveLink.getCredit()),
                    Integer.valueOf(credits));
            TRACE_LOGGER.log(Level.FINE, trace);
        }
    }

    /**
     * Schedules a one-shot timer that fails the link-open future with a
     * {@code TimeoutException} if it has not completed when the tracker expires.
     *
     * <p>The timer can fire before the reactor thread has created the link, so the
     * link name is read null-safely; otherwise the callback would throw and the open
     * future would never complete.
     *
     * @param timeoutTracker tracker whose remaining time is used as the timer delay
     */
    private void scheduleLinkOpenTimeout(TimeoutTracker timeoutTracker) {
        Timer.schedule(new Runnable() {
            @Override
            public void run() {
                if (MessageReceiver.this.linkOpen.getWork().isDone()) {
                    return;
                }
                final Receiver link = MessageReceiver.this.receiveLink;
                // A null name is rendered as "null" by String.format.
                final String linkName = link != null ? link.getName() : null;
                TimeoutException timeoutException = new TimeoutException(String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", linkName, MessageReceiver.this.receivePath, ZonedDateTime.now()), MessageReceiver.this.lastKnownLinkError);
                if (MessageReceiver.TRACE_LOGGER.isLoggable(Level.WARNING)) {
                    MessageReceiver.TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, linkName, "Open"), (Throwable) timeoutException);
                }
                ExceptionUtil.completeExceptionally(MessageReceiver.this.linkOpen.getWork(), timeoutException, MessageReceiver.this);
            }
        }, timeoutTracker.remaining(), TimerType.OneTimeRun);
    }

    /**
     * Schedules a one-shot timer that fails the link-close future with a
     * {@code TimeoutException} if the close has not completed when the tracker expires.
     *
     * @param timeoutTracker tracker whose remaining time is used as the timer delay
     */
    private void scheduleLinkCloseTimeout(TimeoutTracker timeoutTracker) {
        final Runnable failCloseOnTimeout = new Runnable() {
            @Override
            public void run() {
                final MessageReceiver self = MessageReceiver.this;
                if (!self.linkClose.isDone()) {
                    final String message = String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", self.receiveLink.getName(), ZonedDateTime.now());
                    final TimeoutException closeTimeout = new TimeoutException(message);
                    if (MessageReceiver.TRACE_LOGGER.isLoggable(Level.WARNING)) {
                        final String trace = String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", self.receivePath, self.receiveLink.getName(), "Close");
                        MessageReceiver.TRACE_LOGGER.log(Level.WARNING, trace, (Throwable) closeTimeout);
                    }
                    ExceptionUtil.completeExceptionally(self.linkClose, closeTimeout, self);
                }
            }
        };
        Timer.schedule(failCloseOnTimeout, timeoutTracker.remaining(), TimerType.OneTimeRun);
    }

    /**
     * Link-close callback. A close that carries an error condition is routed to the
     * error path as is; a close without one is reported as a transient
     * {@code ServiceBusException}.
     *
     * @param errorCondition the condition that accompanied the close, or {@code null}
     */
    @Override
    public void onClose(ErrorCondition errorCondition) {
        if (errorCondition != null) {
            onError(errorCondition);
            return;
        }
        final String description = String.format(Locale.US, "Closing the link. LinkName(%s), EntityPath(%s)", this.receiveLink.getName(), this.receivePath);
        onError(new ServiceBusException(true, description));
    }

    /**
     * Builds an error-context snapshot for this receiver.
     *
     * <p>The reference id is the link's remote tracking-id property when present,
     * otherwise the link name, otherwise {@code null}. Offset, prefetch count, link
     * credit and prefetch-queue size are reported only once the link-open operation
     * has completed; before that they are {@code null}.
     *
     * @return a new {@code ReceiverContext} describing the current state
     */
    @Override
    public ErrorContext getContext() {
        final boolean openCompleted = this.linkOpen != null && this.linkOpen.getWork().isDone();

        final String hostName = this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null;

        final String referenceId;
        if (this.receiveLink != null
                && this.receiveLink.getRemoteProperties() != null
                && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)) {
            referenceId = this.receiveLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString();
        } else if (this.receiveLink != null) {
            referenceId = this.receiveLink.getName();
        } else {
            referenceId = null;
        }

        Long lastOffset = null;
        if (openCompleted && this.lastReceivedOffset != null) {
            lastOffset = Long.valueOf(Long.parseLong(this.lastReceivedOffset));
        }

        Integer prefetch = null;
        if (openCompleted) {
            prefetch = Integer.valueOf(this.prefetchCount);
        }

        Integer linkCredit = null;
        if (openCompleted && this.receiveLink != null) {
            linkCredit = Integer.valueOf(this.receiveLink.getCredit());
        }

        Integer prefetchQueueSize = null;
        if (openCompleted && this.prefetchedMessages != null) {
            prefetchQueueSize = Integer.valueOf(this.prefetchedMessages.size());
        }

        return new ReceiverContext(hostName, this.receivePath, referenceId, lastOffset, prefetch, linkCredit, prefetchQueueSize, Boolean.valueOf(this.isEpochReceiver));
    }

    /**
     * Starts closing the receive link.
     *
     * <p>If the link is not yet closed locally it is closed and a close timeout is
     * armed. If there is no link, or both ends are already closed, the close future is
     * completed immediately. If it is closed locally but not remotely, nothing is done
     * here and the future is left pending.
     *
     * @return the link-close future
     */
    @Override
    protected CompletableFuture<Void> onClose() {
        if (getIsClosed()) {
            return this.linkClose;
        }

        final Receiver link = this.receiveLink;
        if (link == null) {
            this.linkClose.complete(null);
        } else if (link.getLocalState() != EndpointState.CLOSED) {
            link.close();
            scheduleLinkCloseTimeout(TimeoutTracker.create(this.operationTimeout));
        } else if (link.getRemoteState() == EndpointState.CLOSED) {
            this.linkClose.complete(null);
        }
        return this.linkClose;
    }
}
