package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.messaging.servicebus.MessageLockToken;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/messaging/servicebus/implementation/ServiceBusAsyncConsumer.class */
/**
 * Consumer that exposes the items arriving through a {@link ServiceBusReceiveLinkProcessor} as a
 * {@link Flux} of {@link ServiceBusReceivedMessage}.
 *
 * <p>Each item emitted by the link processor is deserialized with the supplied {@link MessageSerializer}
 * and pushed into a {@link ServiceBusMessageProcessor}; that processor is what {@link #receive()} returns.
 * Closing the consumer completes the message processor and cancels the link processor.</p>
 */
public class ServiceBusAsyncConsumer implements AutoCloseable {
    private final String linkName;
    private final MessageSerializer messageSerializer;
    private final ServiceBusReceiveLinkProcessor amqpReceiveLinkProcessor;
    private final ServiceBusMessageProcessor processor;
    /** Flipped to {@code true} by the first call to {@link #close()} so later calls are no-ops. */
    private final AtomicBoolean isDisposed = new AtomicBoolean();

    /**
     * Wires the link processor to a new {@link ServiceBusMessageProcessor} and subscribes immediately.
     *
     * @param str name reported by {@link #getLinkName()}.
     * @param serviceBusReceiveLinkProcessor upstream source; also supplies the error context handed to the
     *     message processor, and is cancelled on {@link #close()}.
     * @param messageSerializer used to turn every upstream item into a {@link ServiceBusReceivedMessage}.
     * @param z forwarded unchanged as the 1st argument of {@link ServiceBusMessageProcessor}'s constructor
     *     (meaning not visible here - presumably an auto-complete switch; confirm against that class).
     * @param z2 forwarded unchanged as the 2nd argument of the message processor's constructor
     *     (meaning not visible here; confirm against that class).
     * @param duration forwarded unchanged to the message processor.
     * @param amqpRetryOptions forwarded unchanged to the message processor.
     * @param messageLockContainer forwarded unchanged to the message processor.
     * @param function first lock-token callback forwarded to the message processor.
     * @param function2 second lock-token callback forwarded to the message processor.
     * @param function3 lock-token callback yielding an {@link Instant}, forwarded to the message processor.
     */
    public ServiceBusAsyncConsumer(String str, ServiceBusReceiveLinkProcessor serviceBusReceiveLinkProcessor, MessageSerializer messageSerializer, boolean z, boolean z2, Duration duration, AmqpRetryOptions amqpRetryOptions, MessageLockContainer messageLockContainer, Function<MessageLockToken, Mono<Void>> function, Function<MessageLockToken, Mono<Void>> function2, Function<MessageLockToken, Mono<Instant>> function3) {
        linkName = str;
        amqpReceiveLinkProcessor = serviceBusReceiveLinkProcessor;
        this.messageSerializer = messageSerializer;

        // The mapping stage is assembled first; the downstream processor (and the error-context lookup it
        // needs) is only created afterwards, as an argument to subscribeWith.
        processor = serviceBusReceiveLinkProcessor
            .map(message -> (ServiceBusReceivedMessage) this.messageSerializer.deserialize(
                message, ServiceBusReceivedMessage.class))
            .subscribeWith(new ServiceBusMessageProcessor(
                z,
                z2,
                duration,
                amqpRetryOptions,
                messageLockContainer,
                serviceBusReceiveLinkProcessor.getErrorContext(),
                function,
                function2,
                function3));
    }

    /**
     * Gets the link name this consumer was created with.
     *
     * @return the link name passed to the constructor.
     */
    public String getLinkName() {
        return linkName;
    }

    /**
     * Completes the message processor and then cancels the link processor.
     *
     * <p>Only the first invocation does any work; subsequent calls return immediately.</p>
     */
    @Override
    public void close() {
        final boolean alreadyClosed = isDisposed.getAndSet(true);
        if (!alreadyClosed) {
            processor.onComplete();
            amqpReceiveLinkProcessor.cancel();
        }
    }

    /**
     * Gets the stream of deserialized messages.
     *
     * @return the message processor that was subscribed to the link processor in the constructor; the same
     *     instance is returned on every call.
     */
    public Flux<ServiceBusReceivedMessage> receive() {
        return processor;
    }
}
