package com.azure.messaging.servicebus;

import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.exception.SessionErrorContext;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageLockContainer;
import com.azure.messaging.servicebus.implementation.Messages;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusAsyncConsumer;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor;
import com.azure.messaging.servicebus.models.ReceiveAsyncOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.class */
/**
 * Asynchronous client that receives, peeks and settles messages for a single Service Bus entity.
 *
 * <p>Peek, deferred-receive, lock-renewal and settlement operations are sent through the management node obtained
 * from the shared {@link ServiceBusConnectionProcessor}. Streaming receives go through a
 * {@link ServiceBusAsyncConsumer} that is created on demand and cached per link name.</p>
 *
 * <p>Once {@link #close()} has been called, every operation returns an error publisher carrying an
 * {@link IllegalStateException}.</p>
 */
public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
    /** Options used by {@link #deadLetter(MessageLockToken)} when the caller supplies none. */
    private static final DeadLetterOptions DEFAULT_DEAD_LETTER_OPTIONS = new DeadLetterOptions();

    private final String fullyQualifiedNamespace;
    private final String entityPath;
    private final MessagingEntityType entityType;
    private final boolean isSessionEnabled;
    private final ReceiverOptions receiverOptions;
    private final ServiceBusConnectionProcessor connectionProcessor;
    private final TracerProvider tracerProvider;
    private final MessageSerializer messageSerializer;
    private final int prefetch;
    private final ReceiveMode receiveMode;
    /** Tracks lock tokens (and their expiration) for messages this receiver may settle. */
    private final MessageLockContainer messageLockContainer;
    /** Options applied by the no-argument {@link #receive()}. */
    private final ReceiveAsyncOptions defaultReceiveOptions;
    /** Callback run once when this client is closed; may be {@code null}. */
    private final Runnable onClientClose;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final ClientLogger logger = new ClientLogger(ServiceBusReceiverAsyncClient.class);
    /** Consumers that are currently open, keyed by link name. */
    private final ConcurrentHashMap<String, ServiceBusAsyncConsumer> openConsumers = new ConcurrentHashMap<>();

    /**
     * Creates a receiver for the given entity.
     *
     * @param fullyQualifiedNamespace Namespace of the Service Bus resource.
     * @param entityPath Path of the entity to receive from.
     * @param entityType Type of the entity, passed along when resolving management nodes and links.
     * @param isSessionEnabled Whether receive links are created as session-enabled.
     * @param receiverOptions Supplies the prefetch count and the receive mode.
     * @param connectionProcessor Source of AMQP connections and retry options.
     * @param tracerProvider Tracer provider.
     * @param messageSerializer Serializer handed to the consumers created by this client.
     * @param messageLockContainer Container tracking message lock tokens.
     * @param onClientClose Callback to run when this client is closed.
     * @throws NullPointerException if {@code fullyQualifiedNamespace}, {@code entityPath}, {@code receiverOptions},
     *     {@code connectionProcessor}, {@code tracerProvider} or {@code messageSerializer} is null.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath,
        MessagingEntityType entityType, boolean isSessionEnabled, ReceiverOptions receiverOptions,
        ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider,
        MessageSerializer messageSerializer, MessageLockContainer messageLockContainer, Runnable onClientClose) {
        this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
            "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        this.receiverOptions = Objects.requireNonNull(receiverOptions, "'receiveMessageOptions' cannot be null.");
        this.connectionProcessor = Objects.requireNonNull(connectionProcessor,
            "'connectionProcessor' cannot be null.");
        this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null.");
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.prefetch = receiverOptions.getPrefetchCount();
        this.receiveMode = receiverOptions.getReceiveMode();
        this.entityType = entityType;
        this.isSessionEnabled = isSessionEnabled;
        this.messageLockContainer = messageLockContainer;
        this.onClientClose = onClientClose;
        // Default receive: auto-complete on, lock auto-renewal bounded by the configured try timeout.
        this.defaultReceiveOptions = new ReceiveAsyncOptions()
            .setEnableAutoComplete(true)
            .setMaxAutoRenewDuration(connectionProcessor.getRetryOptions().getTryTimeout());
    }

    /**
     * Gets the fully qualified namespace this client was created with.
     *
     * @return The fully qualified namespace.
     */
    public String getFullyQualifiedNamespace() {
        return fullyQualifiedNamespace;
    }

    /**
     * Gets the path of the entity this client receives from.
     *
     * @return The entity path.
     */
    public String getEntityPath() {
        return entityPath;
    }

    /**
     * Abandons a message without modifying any of its properties.
     *
     * @param messageLockToken Lock token of the message to abandon.
     * @return A Mono that completes when the disposition update finishes.
     */
    public Mono<Void> abandon(MessageLockToken messageLockToken) {
        return abandon(messageLockToken, null);
    }

    /**
     * Abandons a message, optionally modifying its properties.
     *
     * @param messageLockToken Lock token of the message to abandon.
     * @param propertiesToModify Properties to modify on the message; may be null.
     * @return A Mono that completes when the disposition update finishes.
     */
    public Mono<Void> abandon(MessageLockToken messageLockToken, Map<String, Object> propertiesToModify) {
        return updateDisposition(messageLockToken, DispositionStatus.ABANDONED, null, null, propertiesToModify);
    }

    /**
     * Completes a message.
     *
     * @param messageLockToken Lock token of the message to complete.
     * @return A Mono that completes when the disposition update finishes.
     */
    public Mono<Void> complete(MessageLockToken messageLockToken) {
        return updateDisposition(messageLockToken, DispositionStatus.COMPLETED, null, null, null);
    }

    /**
     * Defers a message without modifying any of its properties.
     *
     * @param messageLockToken Lock token of the message to defer.
     * @return A Mono that completes when the disposition update finishes.
     */
    public Mono<Void> defer(MessageLockToken messageLockToken) {
        return defer(messageLockToken, null);
    }

    /**
     * Defers a message, optionally modifying its properties.
     *
     * @param messageLockToken Lock token of the message to defer.
     * @param propertiesToModify Properties to modify on the message; may be null.
     * @return A Mono that completes when the disposition update finishes.
     */
    public Mono<Void> defer(MessageLockToken messageLockToken, Map<String, Object> propertiesToModify) {
        return updateDisposition(messageLockToken, DispositionStatus.DEFERRED, null, null, propertiesToModify);
    }

    /**
     * Dead-letters a message using default {@link DeadLetterOptions}.
     *
     * @param messageLockToken Lock token of the message to dead-letter.
     * @return A Mono that completes when the disposition update finishes.
     */
    public Mono<Void> deadLetter(MessageLockToken messageLockToken) {
        return deadLetter(messageLockToken, DEFAULT_DEAD_LETTER_OPTIONS);
    }

    /**
     * Dead-letters a message with the reason, error description and property modifications carried by
     * {@code deadLetterOptions}.
     *
     * @param messageLockToken Lock token of the message to dead-letter.
     * @param deadLetterOptions Options describing why the message is dead-lettered.
     * @return A Mono that completes when the disposition update finishes, or errors with a
     *     {@link NullPointerException} if {@code deadLetterOptions} is null.
     */
    public Mono<Void> deadLetter(MessageLockToken messageLockToken, DeadLetterOptions deadLetterOptions) {
        if (Objects.isNull(deadLetterOptions)) {
            return FluxUtil.monoError(logger, new NullPointerException("'deadLetterOptions' cannot be null."));
        }
        return updateDisposition(messageLockToken, DispositionStatus.SUSPENDED,
            deadLetterOptions.getDeadLetterReason(), deadLetterOptions.getDeadLetterErrorDescription(),
            deadLetterOptions.getPropertiesToModify());
    }

    /**
     * Peeks a message through the entity's management node.
     *
     * @return A Mono emitting the peeked message, or an {@link IllegalStateException} error if this client is closed.
     */
    public Mono<ServiceBusReceivedMessage> peek() {
        if (isDisposed.get()) {
            return FluxUtil.monoError(logger, disposedReceiverError("peek"));
        }
        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(node -> node.peek());
    }

    /**
     * Peeks a message through the entity's management node, passing the given sequence number.
     *
     * @param sequenceNumber Sequence number handed to the management node's peek operation.
     * @return A Mono emitting the peeked message, or an {@link IllegalStateException} error if this client is closed.
     */
    public Mono<ServiceBusReceivedMessage> peekAt(long sequenceNumber) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(logger, disposedReceiverError("peekAt"));
        }
        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(node -> node.peek(sequenceNumber));
    }

    /**
     * Peeks a batch of messages through the entity's management node.
     *
     * @param maxMessages Message count handed to the management node's batch peek operation.
     * @return A Flux of peeked messages, or an {@link IllegalStateException} error if this client is closed.
     */
    public Flux<ServiceBusReceivedMessage> peekBatch(int maxMessages) {
        if (isDisposed.get()) {
            return FluxUtil.fluxError(logger, disposedReceiverError("peekBatch"));
        }
        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMapMany(node -> node.peekBatch(maxMessages));
    }

    /**
     * Peeks a batch of messages through the entity's management node, passing the given sequence number.
     *
     * @param maxMessages Message count handed to the management node's batch peek operation.
     * @param sequenceNumber Sequence number handed to the management node's batch peek operation.
     * @return A Flux of peeked messages, or an {@link IllegalStateException} error if this client is closed.
     */
    public Flux<ServiceBusReceivedMessage> peekBatchAt(int maxMessages, long sequenceNumber) {
        if (isDisposed.get()) {
            return FluxUtil.fluxError(logger, disposedReceiverError("peekBatchAt"));
        }
        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMapMany(node -> node.peekBatch(maxMessages, sequenceNumber));
    }

    /**
     * Receives messages using the default options (auto-complete enabled, lock auto-renewal bounded by the
     * connection's try timeout).
     *
     * @return A Flux of received messages.
     */
    public Flux<ServiceBusReceivedMessage> receive() {
        return receive(defaultReceiveOptions);
    }

    /**
     * Receives messages from the entity using the given options.
     *
     * <p>The consumer is obtained (or created) when the returned Flux is subscribed to, and is removed from the
     * open-consumer cache and closed when the Flux terminates or is cancelled.</p>
     *
     * @param receiveAsyncOptions Options controlling auto-complete and lock auto-renewal.
     * @return A Flux of received messages. The Flux errors with {@link IllegalStateException} if this client is
     *     closed, {@link NullPointerException} if {@code receiveAsyncOptions} is null,
     *     {@link IllegalArgumentException} if the max auto-renew duration is negative, or
     *     {@link UnsupportedOperationException} if auto-complete is requested on a receiver that is not in
     *     {@link ReceiveMode#PEEK_LOCK} mode.
     */
    public Flux<ServiceBusReceivedMessage> receive(ReceiveAsyncOptions receiveAsyncOptions) {
        if (isDisposed.get()) {
            return FluxUtil.fluxError(logger, disposedReceiverError("receive"));
        }
        if (Objects.isNull(receiveAsyncOptions)) {
            return FluxUtil.fluxError(logger, new NullPointerException("'options' cannot be null"));
        }
        if (receiveAsyncOptions.getMaxAutoRenewDuration() != null
            && receiveAsyncOptions.getMaxAutoRenewDuration().isNegative()) {
            return FluxUtil.fluxError(logger,
                new IllegalArgumentException("'maxAutoRenewDuration' cannot be negative."));
        }
        if (receiveMode != ReceiveMode.PEEK_LOCK && receiveAsyncOptions.isEnableAutoComplete()) {
            return Flux.error(logger.logExceptionAsError(new UnsupportedOperationException(
                "Auto-complete is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.")));
        }

        return Flux.usingWhen(
            // The entity path doubles as the link name / cache key for the consumer.
            Mono.fromCallable(() -> getOrCreateConsumer(entityPath, receiveAsyncOptions)),
            consumer -> consumer.receive(),
            consumer -> {
                final String linkName = consumer.getLinkName();
                logger.info("{}: Receiving completed. Disposing", linkName);

                final ServiceBusAsyncConsumer removed = openConsumers.remove(linkName);
                if (removed == null) {
                    logger.warning("Could not find consumer to remove for: {}", linkName);
                } else {
                    removed.close();
                }
                return Mono.empty();
            });
    }

    /**
     * Receives a single deferred message by its sequence number.
     *
     * <p>In {@link ReceiveMode#PEEK_LOCK} mode, a message with a non-empty lock token is registered in the lock
     * container so that it can be settled through this client.</p>
     *
     * @param sequenceNumber Sequence number of the deferred message.
     * @return A Mono emitting the deferred message, or an {@link IllegalStateException} error if this client is
     *     closed.
     */
    public Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber) {
        // Guard added for consistency with every other operation on this client.
        if (isDisposed.get()) {
            return FluxUtil.monoError(logger, disposedReceiverError("receiveDeferredMessage"));
        }
        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(node -> node.receiveDeferredMessage(receiveMode, sequenceNumber))
            .map(this::trackMessageLock);
    }

    /**
     * Receives a batch of deferred messages by their sequence numbers.
     *
     * <p>In {@link ReceiveMode#PEEK_LOCK} mode, each message with a non-empty lock token is registered in the lock
     * container, exactly as {@link #receiveDeferredMessage(long)} does; otherwise the settlement methods would
     * treat those messages as not owned by this receiver.</p>
     *
     * @param sequenceNumbers Sequence numbers of the deferred messages.
     * @return A Flux of deferred messages, or an {@link IllegalStateException} error if this client is closed.
     */
    public Flux<ServiceBusReceivedMessage> receiveDeferredMessageBatch(long... sequenceNumbers) {
        if (isDisposed.get()) {
            return FluxUtil.fluxError(logger, disposedReceiverError("receiveDeferredMessageBatch"));
        }
        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMapMany(node -> node.receiveDeferredMessageBatch(receiveMode, sequenceNumbers))
            .map(this::trackMessageLock);
    }

    /**
     * Renews the lock on a message.
     *
     * <p>If {@code messageLockToken} is a {@link ServiceBusReceivedMessage}, its locked-until value is updated
     * with the renewed expiration.</p>
     *
     * @param messageLockToken Lock token of the message whose lock should be renewed.
     * @return A Mono emitting the new lock expiration. The Mono errors with {@link IllegalStateException} if this
     *     client is closed, {@link NullPointerException} if the token (or its lock token string) is null, or
     *     {@link IllegalArgumentException} if the lock token string is empty or is not a valid UUID.
     */
    public Mono<Instant> renewMessageLock(MessageLockToken messageLockToken) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(logger, disposedReceiverError("renewMessageLock"));
        }
        if (Objects.isNull(messageLockToken)) {
            return FluxUtil.monoError(logger, new NullPointerException("'receivedMessage' cannot be null."));
        }
        if (Objects.isNull(messageLockToken.getLockToken())) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'receivedMessage.lockToken' cannot be null."));
        }
        if (messageLockToken.getLockToken().isEmpty()) {
            return FluxUtil.monoError(logger, new IllegalArgumentException("'message.lockToken' cannot be empty."));
        }

        final UUID lockTokenId;
        try {
            lockTokenId = UUID.fromString(messageLockToken.getLockToken());
        } catch (IllegalArgumentException e) {
            // Surface the malformed token to the caller instead of continuing with a null UUID.
            return FluxUtil.monoError(logger, e);
        }

        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(node -> node.renewMessageLock(lockTokenId))
            .map(renewedUntil -> {
                if (messageLockToken instanceof ServiceBusReceivedMessage) {
                    ((ServiceBusReceivedMessage) messageLockToken).setLockedUntil(renewedUntil);
                }
                return renewedUntil;
            });
    }

    /**
     * Closes every open consumer and runs the close callback. Calls after the first one are no-ops.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (isDisposed.getAndSet(true)) {
            return;
        }

        logger.info("Removing receiver links.");
        // Remove each entry before closing it: whoever wins the removal (this method or the receive() cleanup)
        // is the only one that closes the consumer, and no consumer is discarded without being closed.
        for (String linkName : openConsumers.keySet()) {
            final ServiceBusAsyncConsumer consumer = openConsumers.remove(linkName);
            if (consumer != null) {
                consumer.close();
            }
        }

        // The constructor does not require a callback, so tolerate its absence.
        if (onClientClose != null) {
            onClientClose.run();
        }
    }

    /**
     * Checks whether the lock token is tracked by this receiver and has not expired.
     *
     * @param lockToken Lock token to check.
     * @return A Mono emitting {@code true} if the token is tracked and unexpired, {@code false} if it is not
     *     tracked, or an {@link AmqpException} error if the tracked lock has already expired.
     */
    private Mono<Boolean> isLockTokenValid(String lockToken) {
        final Instant lockedUntil = messageLockContainer.getLockTokenExpiration(lockToken);
        if (lockedUntil == null) {
            logger.warning("lockToken[{}] is not owned by this receiver.", lockToken);
            return Mono.just(false);
        }

        final Instant now = Instant.now();
        if (lockedUntil.isBefore(now)) {
            return Mono.error(logger.logExceptionAsError(new AmqpException(false, String.format(
                "Lock already expired for the lock token. Expiration: '%s'. Now: '%s'", lockedUntil, now),
                getErrorContext())));
        }
        return Mono.just(true);
    }

    /**
     * Settles a message with the given disposition through the entity's management node and, on success, stops
     * tracking its lock token.
     *
     * @param messageLockToken Lock token of the message to settle.
     * @param dispositionStatus Disposition to apply.
     * @param deadLetterReason Dead-letter reason; may be null.
     * @param deadLetterErrorDescription Dead-letter error description; may be null.
     * @param propertiesToModify Properties to modify on the message; may be null.
     * @return A Mono that completes when the update finishes. The Mono errors with {@link IllegalStateException}
     *     if this client is closed, {@link NullPointerException} or {@link IllegalArgumentException} if the lock
     *     token is missing or empty, or {@link UnsupportedOperationException} if the receiver is not in
     *     {@link ReceiveMode#PEEK_LOCK} mode or the lock token is not tracked by this receiver.
     */
    private Mono<Void> updateDisposition(MessageLockToken messageLockToken, DispositionStatus dispositionStatus,
        String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(logger, disposedReceiverError(dispositionStatus.getValue()));
        }
        if (Objects.isNull(messageLockToken)) {
            return FluxUtil.monoError(logger, new NullPointerException("'receivedMessage' cannot be null."));
        }
        if (Objects.isNull(messageLockToken.getLockToken())) {
            return FluxUtil.monoError(logger,
                new NullPointerException("'receivedMessage.lockToken' cannot be null."));
        }
        if (messageLockToken.getLockToken().isEmpty()) {
            return FluxUtil.monoError(logger, new IllegalArgumentException("'message.lockToken' cannot be empty."));
        }
        if (receiveMode != ReceiveMode.PEEK_LOCK) {
            return Mono.error(logger.logExceptionAsError(new UnsupportedOperationException(String.format(
                "'%s' is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.",
                dispositionStatus))));
        }

        final String lockToken = messageLockToken.getLockToken();
        logger.info("{}: Update started. Disposition: {}. Lock: {}. Expiration: {}", entityPath, dispositionStatus,
            lockToken, messageLockContainer.getLockTokenExpiration(lockToken));

        return isLockTokenValid(lockToken)
            .flatMap(isLockOwned -> connectionProcessor
                .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
                .flatMap(node -> {
                    return isLockOwned
                        ? node.updateDisposition(lockToken, dispositionStatus, deadLetterReason,
                            deadLetterErrorDescription, propertiesToModify)
                        : Mono.error(new UnsupportedOperationException(
                            "Cannot complete a message that is not locked. lockToken: " + lockToken));
                }))
            .then(Mono.fromRunnable(() -> {
                logger.info("{}: Update completed. Disposition: {}. Lock: {}.", entityPath, dispositionStatus,
                    lockToken);
                // The message is settled, so its lock no longer needs tracking.
                messageLockContainer.remove(lockToken);
            }));
    }

    /**
     * Returns the cached consumer for {@code linkName}, creating one (and its receive link processor) if none is
     * open yet.
     *
     * @param linkName Name of the link; also the key in the open-consumer cache.
     * @param receiveAsyncOptions Options controlling auto-complete and lock auto-renewal of a new consumer.
     * @return The existing or newly created consumer.
     */
    private ServiceBusAsyncConsumer getOrCreateConsumer(String linkName, ReceiveAsyncOptions receiveAsyncOptions) {
        return openConsumers.computeIfAbsent(linkName, key -> {
            logger.info("{}: Creating consumer for link '{}'", entityPath, linkName);

            final ServiceBusReceiveLinkProcessor linkProcessor = connectionProcessor
                .flatMap(connection -> connection.createReceiveLink(linkName, entityPath, receiveMode,
                    isSessionEnabled, null, entityType))
                .doOnNext(link -> logger.verbose(
                    "Created consumer for Service Bus resource: [{}] mode: [{}] sessionEnabled? {} "
                        + "transferEntityPath: [{}], entityType: [{}]",
                    link.getEntityPath(), receiveMode, isSessionEnabled, "N/A", entityType))
                .repeat()
                .subscribeWith(new ServiceBusReceiveLinkProcessor(prefetch,
                    RetryUtil.getRetryPolicy(connectionProcessor.getRetryOptions()), connectionProcessor,
                    new LinkErrorContext(fullyQualifiedNamespace, entityPath, linkName, null)));

            // Auto-renewal is on only for a non-null, non-zero duration.
            final boolean isAutoLockRenewal = receiveAsyncOptions.getMaxAutoRenewDuration() != null
                && !receiveAsyncOptions.getMaxAutoRenewDuration().isZero();

            return new ServiceBusAsyncConsumer(linkName, linkProcessor, messageSerializer,
                receiveAsyncOptions.isEnableAutoComplete(), isAutoLockRenewal,
                receiveAsyncOptions.getMaxAutoRenewDuration(), connectionProcessor.getRetryOptions(),
                messageLockContainer, this::complete, this::abandon, this::renewMessageLock);
        });
    }

    /**
     * Registers the message's lock in the lock container when the receiver is in {@link ReceiveMode#PEEK_LOCK}
     * mode and the message carries a non-empty lock token, and stores the resulting expiration on the message.
     *
     * @param message Message received through the management node.
     * @return The same message instance.
     */
    private ServiceBusReceivedMessage trackMessageLock(ServiceBusReceivedMessage message) {
        if (receiveMode == ReceiveMode.PEEK_LOCK && !CoreUtils.isNullOrEmpty(message.getLockToken())) {
            message.setLockedUntil(
                messageLockContainer.addOrUpdate(message.getLockToken(), message.getLockedUntil()));
        }
        return message;
    }

    /**
     * Builds the exception reported when {@code operation} is invoked on a closed receiver.
     *
     * @param operation Name of the operation that was attempted.
     * @return The exception to propagate to the caller.
     */
    private IllegalStateException disposedReceiverError(String operation) {
        return new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, operation));
    }

    /**
     * Creates the error context attached to exceptions raised by this client.
     *
     * @return An error context carrying this client's namespace and entity path.
     */
    private AmqpErrorContext getErrorContext() {
        return new SessionErrorContext(getFullyQualifiedNamespace(), getEntityPath());
    }
}
