package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.MessageLockToken;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import java.time.Duration;
import java.time.Instant;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/servicebus/implementation/ServiceBusMessageProcessor.class */
public class ServiceBusMessageProcessor extends FluxProcessor<ServiceBusReceivedMessage, ServiceBusReceivedMessage> implements Subscription {
    // Constructor-supplied configuration.

    /** When true, messages are completed after downstream handles them, or abandoned if the handler throws. */
    private final boolean isAutoComplete;
    /** Supplies the timeout used when blocking on the complete/abandon operations. */
    private final AmqpRetryOptions retryOptions;
    /** Attached to the timeout exception raised when lock renewal exceeds {@link #maxAutoLockRenewal}. */
    private final AmqpErrorContext errorContext;
    /** Invoked to complete a message when auto-complete is enabled. */
    private final Function<MessageLockToken, Mono<Void>> completeFunction;
    /** Invoked to abandon a message when the downstream handler throws and auto-complete is enabled. */
    private final Function<MessageLockToken, Mono<Void>> onAbandon;
    /** Invoked periodically to renew a message lock; emits the new lock expiry. */
    private final Function<MessageLockToken, Mono<Instant>> onRenewLock;
    /** When true, a lock-renewal task runs while downstream is handling each message. */
    private final boolean isAutoRenewLock;
    /** Delay after which the lock-renewal task is failed with a timeout error. */
    private final Duration maxAutoLockRenewal;
    /** Records the lock expiry for each lock token. */
    private final MessageLockContainer messageLockContainer;

    // Mutable state, shared between the threads that deliver signals to this processor.

    /** Set once upstream completes or errors, this processor is disposed, or an internal error is recorded. */
    private volatile boolean isDone;
    /** The single downstream subscriber; null before subscription and after termination. */
    private volatile CoreSubscriber<? super ServiceBusReceivedMessage> downstream;
    /** Set once downstream cancels its subscription. */
    private volatile boolean isCancelled;
    /** Upstream subscription; written only through {@link #UPSTREAM}. */
    volatile Subscription upstream;
    /** 0 until the first downstream subscribes, then 1; written through {@link #ONCE}. */
    volatile int once;
    /** Work-in-progress guard for the drain loop; written through {@link #WIP}. */
    volatile int wip;
    /** Outstanding downstream demand; written through {@link #REQUESTED}. */
    volatile long requested;
    /** Terminal error, if any; written through {@link #ERROR}. */
    volatile Throwable error;
    private static final AtomicReferenceFieldUpdater<ServiceBusMessageProcessor, Subscription> UPSTREAM = AtomicReferenceFieldUpdater.newUpdater(ServiceBusMessageProcessor.class, Subscription.class, "upstream");
    static final AtomicIntegerFieldUpdater<ServiceBusMessageProcessor> ONCE = AtomicIntegerFieldUpdater.newUpdater(ServiceBusMessageProcessor.class, "once");
    static final AtomicIntegerFieldUpdater<ServiceBusMessageProcessor> WIP = AtomicIntegerFieldUpdater.newUpdater(ServiceBusMessageProcessor.class, "wip");
    static final AtomicLongFieldUpdater<ServiceBusMessageProcessor> REQUESTED = AtomicLongFieldUpdater.newUpdater(ServiceBusMessageProcessor.class, "requested");
    static final AtomicReferenceFieldUpdater<ServiceBusMessageProcessor, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(ServiceBusMessageProcessor.class, Throwable.class, "error");
    private final ClientLogger logger = new ClientLogger(ServiceBusMessageProcessor.class);
    /** Messages received from upstream that have not yet been handed to downstream. */
    private final Deque<ServiceBusReceivedMessage> messageQueue = new ConcurrentLinkedDeque<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a processor that relays Service Bus messages to a single downstream subscriber.
     *
     * @param z stored as {@code isAutoComplete}; whether messages are completed/abandoned automatically.
     * @param z2 stored as {@code isAutoRenewLock}; whether message locks are renewed while being handled.
     * @param duration stored as {@code maxAutoLockRenewal}; not null-checked here.
     * @param amqpRetryOptions retry options; its try-timeout bounds the blocking complete/abandon calls.
     * @param messageLockContainer container tracking lock expiry per lock token.
     * @param amqpErrorContext context attached to the lock-renewal timeout error.
     * @param function operation that completes a message.
     * @param function2 operation that abandons a message.
     * @param function3 operation that renews a message lock.
     * @throws NullPointerException if any of the object arguments other than {@code duration} is null.
     */
    public ServiceBusMessageProcessor(boolean z, boolean z2, Duration duration, AmqpRetryOptions amqpRetryOptions, MessageLockContainer messageLockContainer, AmqpErrorContext amqpErrorContext, Function<MessageLockToken, Mono<Void>> function, Function<MessageLockToken, Mono<Void>> function2, Function<MessageLockToken, Mono<Instant>> function3) {
        // Plain flags and the (unchecked) renewal window.
        this.isAutoComplete = z;
        this.isAutoRenewLock = z2;
        this.maxAutoLockRenewal = duration;

        // Null checks run in a fixed order so the first missing argument decides the message.
        this.retryOptions = Objects.requireNonNull(amqpRetryOptions, "'retryOptions' cannot be null.");
        this.errorContext = Objects.requireNonNull(amqpErrorContext, "'errorContext' cannot be null.");
        this.completeFunction = Objects.requireNonNull(function, "'onComplete' cannot be null.");
        this.onAbandon = Objects.requireNonNull(function2, "'onAbandon' cannot be null.");
        this.onRenewLock = Objects.requireNonNull(function3, "'onRenewLock' cannot be null.");
        this.messageLockContainer = Objects.requireNonNull(messageLockContainer, "'messageLockContainer' cannot be null.");
    }

    /**
     * Accepts the upstream subscription and requests the first message from it.
     * A second upstream is rejected by routing an {@link IllegalStateException} to {@link #onError(Throwable)}.
     *
     * @param subscription the upstream subscription.
     * @throws NullPointerException if {@code subscription} is null.
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        Objects.requireNonNull(subscription, "'subscription' cannot be null.");

        if (!Operators.setOnce(UPSTREAM, this, subscription)) {
            final IllegalStateException duplicate = new IllegalStateException("Processor cannot be subscribed to with multiple upstreams.");
            onError(Operators.onOperatorError(subscription, this.logger.logExceptionAsError(duplicate), Context.empty()));
            return;
        }

        subscription.request(1L);
    }

    /**
     * Reports whether this processor has stopped relaying messages.
     *
     * @return true if the processor is done or the downstream subscription was cancelled.
     */
    @Override
    public boolean isTerminated() {
        if (this.isDone) {
            return true;
        }
        return this.isCancelled;
    }

    /**
     * Queues a message from upstream and attempts to deliver queued messages downstream.
     * If the processor has already terminated, the message is dropped through the Reactor hook instead.
     *
     * @param serviceBusReceivedMessage the message received from upstream.
     */
    @Override
    public void onNext(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        if (isTerminated()) {
            // Read the volatile field once: other methods of this class reset it to null, so a
            // null check followed by a second read could dereference null.
            final CoreSubscriber<? super ServiceBusReceivedMessage> subscriber = this.downstream;
            final Context context = subscriber == null ? currentContext() : subscriber.currentContext();
            Operators.onNextDropped(serviceBusReceivedMessage, context);
            return;
        }

        this.messageQueue.add(serviceBusReceivedMessage);
        drain();
    }

    /**
     * Records an upstream error and drains so that downstream is notified.
     * Errors arriving after termination, or that cannot be recorded, are dropped through the Reactor hook.
     *
     * @param th the error signalled by upstream.
     */
    @Override
    public void onError(Throwable th) {
        final boolean alreadyTerminated = this.isDone || this.isCancelled;
        if (alreadyTerminated) {
            this.logger.error("Exception occurred from upstream when this is already terminated.", new Object[]{th});
            Operators.onErrorDropped(th, currentContext());
            return;
        }

        if (!Exceptions.addThrowable(ERROR, this, th)) {
            Operators.onErrorDropped(th, currentContext());
        } else {
            this.isDone = true;
        }
        drain();
    }

    /**
     * Marks the processor as done when upstream completes, then drains.
     * Does nothing if the processor is already done.
     */
    @Override
    public void onComplete() {
        if (!this.isDone) {
            this.isDone = true;
            drain();
        }
    }

    /**
     * Adds downstream demand, forwards the same demand to upstream when one is attached, and drains.
     * The request is logged before it is validated; invalid amounts are otherwise ignored here.
     *
     * @param j number of additional messages requested by downstream.
     */
    @Override
    public void request(long j) {
        this.logger.info("Back-pressure request: {}", new Object[]{Long.valueOf(j)});
        if (!Operators.validate(j)) {
            return;
        }

        Operators.addCap(REQUESTED, this, j);
        if (this.upstream != null) {
            this.upstream.request(j);
        }
        drain();
    }

    /**
     * Marks the downstream subscription as cancelled and drains.
     * Repeated calls after the first have no effect.
     */
    @Override
    public void cancel() {
        if (!this.isCancelled) {
            this.logger.info("Cancelling subscription.");
            this.isCancelled = true;
            drain();
        }
    }

    /**
     * Marks the processor as done and drains so that downstream receives a terminal signal.
     * Does nothing if the processor is already done.
     */
    @Override
    public void dispose() {
        if (!this.isDone) {
            this.logger.info("Disposing subscription.");
            this.isDone = true;
            drain();
        }
    }

    /**
     * Reports whether this processor is disposed.
     *
     * @return true if the processor is done or the downstream subscription was cancelled.
     */
    @Override
    public boolean isDisposed() {
        if (this.isDone) {
            return true;
        }
        return this.isCancelled;
    }

    /**
     * Attaches the single downstream subscriber and starts draining queued messages to it.
     * Any subscriber after the first is immediately failed with an {@link IllegalStateException}.
     *
     * @param coreSubscriber the downstream subscriber.
     * @throws NullPointerException if {@code coreSubscriber} is null.
     */
    @Override
    public void subscribe(CoreSubscriber<? super ServiceBusReceivedMessage> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "'downstream' cannot be null.");

        final boolean isFirstSubscriber = this.once == 0 && ONCE.compareAndSet(this, 0, 1);
        if (!isFirstSubscriber) {
            Operators.error(coreSubscriber, new IllegalStateException("ServiceBusMessageSubscriber can only have one subscriber."));
            return;
        }

        this.downstream = coreSubscriber;
        coreSubscriber.onSubscribe(this);

        // The subscriber may cancel from inside onSubscribe; in that case release the reference.
        if (!this.isCancelled) {
            drain();
            return;
        }
        this.downstream = null;
    }

    /**
     * Runs the drain loop with at most one thread inside {@link #drainQueue()} at a time.
     *
     * <p>Every call increments the work-in-progress counter. The caller that moves it from zero
     * becomes the worker; all others return immediately. Before leaving, the worker subtracts the
     * signals it has accounted for and runs another pass if more arrived in the meantime. A plain
     * compare-and-set guard would silently lose a request or message that arrives while another
     * thread is draining, leaving queued messages undelivered until some later signal.</p>
     */
    private void drain() {
        if (WIP.getAndIncrement(this) != 0) {
            // Another thread (or an outer frame on this thread) is draining and will see the bump.
            return;
        }

        int missed = 1;
        try {
            do {
                drainQueue();
                missed = WIP.addAndGet(this, -missed);
            } while (missed != 0);
        } catch (RuntimeException | Error failure) {
            // Release the guard so later signals can drain again, then surface the failure.
            WIP.set(this, 0);
            throw failure;
        }
    }

    /**
     * Delivers queued messages while there is demand, then emits the terminal signal once done.
     *
     * <p>Does nothing until a downstream subscriber is attached. When the processor is done, any
     * messages still queued are discarded through the Reactor discard hook, and downstream then
     * receives {@code onError} if an error was recorded or {@code onComplete} otherwise. Downstream
     * always gets exactly one terminal signal, so it is not left waiting when messages remain.</p>
     */
    private void drainQueue() {
        // Single read of the volatile field; other methods may reset it to null.
        final CoreSubscriber<? super ServiceBusReceivedMessage> subscriber = this.downstream;
        if (subscriber == null) {
            return;
        }

        while (!this.messageQueue.isEmpty()) {
            final long emitted = drainRequested(REQUESTED.get(this));
            final long remainingDemand = REQUESTED.addAndGet(this, -emitted);
            if (remainingDemand == 0 || this.isDone) {
                break;
            }
        }

        if (!this.isDone) {
            return;
        }

        // No further messages are delivered once done, so release whatever is left.
        if (!this.messageQueue.isEmpty()) {
            Operators.onDiscardQueueWithClear(this.messageQueue, subscriber.currentContext(), null);
        }

        final Throwable failure = this.error;
        if (failure != null) {
            subscriber.onError(failure);
        } else {
            subscriber.onComplete();
        }
        this.downstream = null;
    }

    /**
     * Hands up to {@code j} queued messages to downstream.
     *
     * <p>Stops early when the processor is done, the queue is empty, or the subscription has been
     * cancelled (in which case the polled message and the rest of the queue are discarded). A
     * message whose handling throws is not counted and the exception is recorded as an internal
     * error.</p>
     *
     * @param j the maximum number of messages to deliver.
     * @return the number of messages delivered without an exception.
     */
    private long drainRequested(long j) {
        long delivered = 0;

        while (delivered < j && !this.isDone) {
            final ServiceBusReceivedMessage message = this.messageQueue.poll();
            if (message == null) {
                break;
            }

            if (this.isCancelled) {
                Operators.onDiscard(message, this.downstream.currentContext());
                Operators.onDiscardQueueWithClear(this.messageQueue, this.downstream.currentContext(), (Function) null);
                break;
            }

            try {
                next(message);
                delivered++;
            } catch (Exception e) {
                setInternalError(e);
            }
        }

        return delivered;
    }

    /**
     * Delivers one message to downstream, with optional lock renewal and auto-complete/abandon.
     *
     * <p>Steps: record the lock expiry when the message has a lock token; start the lock-renewal
     * task; call downstream {@code onNext}; stop the renewal task; then, if auto-complete is on and
     * no error was flagged, complete the message. If downstream throws, the message is abandoned
     * when auto-complete is on, otherwise the exception is recorded as an internal error. The
     * complete and abandon calls block for up to the configured try-timeout.</p>
     *
     * @param serviceBusReceivedMessage the message to deliver.
     * @throws IllegalStateException if auto-complete is on and the message has no lock token.
     */
    private void next(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        final long sequenceNumber = serviceBusReceivedMessage.getSequenceNumber();
        final String lockToken = serviceBusReceivedMessage.getLockToken();
        final boolean hasLockToken = !CoreUtils.isNullOrEmpty(lockToken);

        final Instant lockedUntil;
        if (hasLockToken) {
            lockedUntil = this.messageLockContainer.addOrUpdate(lockToken, serviceBusReceivedMessage.getLockedUntil());
        } else {
            lockedUntil = serviceBusReceivedMessage.getLockedUntil();
        }

        if (this.isAutoComplete && !hasLockToken) {
            throw this.logger.logExceptionAsError(new IllegalStateException("Cannot auto-complete message without a lock token on message. Sequence number: " + sequenceNumber));
        }

        // Flipped to true by a downstream failure here or by a failed lock renewal.
        final AtomicBoolean hasError = new AtomicBoolean();
        final Disposable renewal = getRenewLockOperation(serviceBusReceivedMessage, lockedUntil, hasError);

        try {
            this.downstream.onNext(serviceBusReceivedMessage);
        } catch (Exception e) {
            hasError.set(true);
            this.logger.error("Exception occurred while handling downstream onNext operation.", new Object[]{e});
            if (!this.isAutoComplete) {
                setInternalError(e);
            } else {
                this.logger.info("Abandoning message lock: {}", new Object[]{lockToken});
                this.onAbandon.apply(serviceBusReceivedMessage).onErrorContinue((th, obj) -> {
                    this.logger.warning("Could not abandon message with lock: {}", new Object[]{lockToken, th});
                    setInternalError(th);
                }).doFinally(signalType -> {
                    this.logger.info("lock[{}]. Abandon status: [{}]", new Object[]{lockToken, signalType});
                }).block(this.retryOptions.getTryTimeout());
            }
        } finally {
            // Renewal stops as soon as downstream handling is over, however it ended.
            renewal.dispose();
        }

        if (hasError.get() || !this.isAutoComplete) {
            return;
        }

        this.logger.info("sequenceNumber[{}]. lock[{}]. Completing message.", new Object[]{Long.valueOf(sequenceNumber), lockToken});
        this.completeFunction.apply(serviceBusReceivedMessage).onErrorResume(th2 -> {
            this.logger.warning("Could not complete message with lock: {}", new Object[]{lockToken, th2});
            setInternalError(th2);
            return Mono.empty();
        }).doFinally(signalType2 -> {
            this.logger.info("lock[{}]. Complete status: [{}]", new Object[]{lockToken, signalType2});
        }).block(this.retryOptions.getTryTimeout());
    }

    /**
     * Starts the background lock renewal for a message and returns a handle that stops it.
     *
     * <p>When auto-renew is off, an already-disposed handle is returned. Otherwise the first
     * renewal is scheduled from the time remaining until {@code instant}; each successful renewal
     * stores the new expiry in the lock container and schedules the next renewal from it. A second
     * timer fails the renewal stream with a timeout {@link AmqpException} once the maximum
     * auto-renew duration has elapsed.</p>
     *
     * @param serviceBusReceivedMessage the message whose lock is renewed.
     * @param instant the current lock expiry; required whenever auto-renew is on.
     * @param atomicBoolean set to true if a renewal fails.
     * @return a disposable covering both the renewal subscription and the timeout timer.
     * @throws IllegalStateException if auto-renew is on and {@code instant} is null.
     */
    private Disposable getRenewLockOperation(ServiceBusReceivedMessage serviceBusReceivedMessage, Instant instant, AtomicBoolean atomicBoolean) {
        if (!this.isAutoRenewLock) {
            return Disposables.disposed();
        }

        final long sequenceNumber = serviceBusReceivedMessage.getSequenceNumber();
        final String lockToken = serviceBusReceivedMessage.getLockToken();

        // Renewal needs an expiry whether or not auto-complete is on; without this check
        // Duration.between below would fail with a NullPointerException.
        if (instant == null) {
            throw this.logger.logExceptionAsError(new IllegalStateException("Cannot renew lock token without a value for 'message.getLockedUntil()'"));
        }

        final Duration initialInterval = Duration.between(Instant.now(), instant);
        this.logger.info("lock[{}]. lockedUntil[{}]. interval[{}]", new Object[]{lockToken, instant, initialInterval});

        // Each value pushed into the sink replaces the current renewal interval.
        final EmitterProcessor<Duration> intervals = EmitterProcessor.create();
        final FluxSink<Duration> sink = intervals.sink(FluxSink.OverflowStrategy.BUFFER);
        sink.next(MessageUtils.adjustServerTimeout(initialInterval));

        final Disposable renewals = Flux.switchOnNext(intervals.map(interval -> Flux.interval(interval)))
            .flatMap(tick -> this.onRenewLock.apply(serviceBusReceivedMessage))
            .map(renewedUntil -> {
                final Instant updated = this.messageLockContainer.addOrUpdate(lockToken, renewedUntil);
                final Duration nextInterval = Duration.between(Instant.now(), updated);
                this.logger.info("lockToken[{}]. given[{}]. updated[{}]. Next renewal: [{}]", new Object[]{lockToken, renewedUntil, updated, nextInterval});
                sink.next(MessageUtils.adjustServerTimeout(nextInterval));
                return updated;
            })
            .subscribe(lockedUntil -> {
                this.logger.verbose("seq[{}]. lockToken[{}]. lockedUntil[{}]. Lock renewal successful.", new Object[]{Long.valueOf(sequenceNumber), lockToken, lockedUntil});
            }, th -> {
                this.logger.error("Error occurred while renewing lock token.", new Object[]{th});
                atomicBoolean.set(true);
                setInternalError(th);
            }, () -> {
                this.logger.info("Renewing lock token task completed.");
            });

        // Fails the renewal stream once the maximum auto-renew duration has elapsed.
        final Disposable renewalTimeout = Mono.delay(this.maxAutoLockRenewal).subscribe(tick -> {
            if (sink.isCancelled()) {
                return;
            }
            sink.error(new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, "Could not complete within renewal time. Max renewal time: " + this.maxAutoLockRenewal, this.errorContext));
        });

        return Disposables.composite(renewals, renewalTimeout);
    }

    /**
     * Records an error raised inside this processor and marks the processor as done.
     * If the error cannot be recorded, it is dropped through the Reactor hook instead.
     *
     * @param th the error to record.
     */
    private void setInternalError(Throwable th) {
        if (Exceptions.addThrowable(ERROR, this, th)) {
            this.isDone = true;
            return;
        }

        // The downstream reference is reset to null on termination, and this method is also called
        // from lock-renewal callbacks, so fall back to this processor's own context when it is gone.
        final CoreSubscriber<? super ServiceBusReceivedMessage> subscriber = this.downstream;
        final Context context = subscriber == null ? currentContext() : subscriber.currentContext();
        Operators.onErrorDropped(th, context);
    }
}
