/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.util.context.Context;

@Deprecated
final class AutoDispositionLockRenew
extends FluxOperator<ServiceBusReceivedMessage, ServiceBusReceivedMessage> {
    private final ClientLogger logger;
    private final ServiceBusReceiverAsyncClient client;
    private final boolean enableAutoDisposition;
    private final boolean enableAutoLockRenew;
    private final Semaphore dispositionLock;

    AutoDispositionLockRenew(Flux<? extends ServiceBusReceivedMessage> upstream, ServiceBusReceiverAsyncClient client, boolean enableAutoDisposition, boolean enableAutoLockRenew, Semaphore dispositionLock) {
        super(upstream);
        HashMap<String, String> loggingContext = new HashMap<String, String>(2);
        loggingContext.put("namespace", client.getFullyQualifiedNamespace());
        loggingContext.put("entityPath", client.getEntityPath());
        this.logger = new ClientLogger(AutoDispositionLockRenew.class, loggingContext);
        this.client = client;
        this.enableAutoDisposition = enableAutoDisposition;
        this.enableAutoLockRenew = enableAutoLockRenew;
        this.dispositionLock = dispositionLock;
    }

    public void subscribe(CoreSubscriber<? super ServiceBusReceivedMessage> actual) {
        Objects.requireNonNull(actual, "'actual' cannot be null.");
        this.source.subscribe((CoreSubscriber)new Subscriber(this.logger, this.client, this.enableAutoDisposition, this.enableAutoLockRenew, this.dispositionLock, actual));
    }

    private static final class Subscriber
    extends BaseSubscriber<ServiceBusReceivedMessage> {
        private final ClientLogger logger;
        private final ServiceBusReceiverAsyncClient client;
        private final boolean enableAutoDisposition;
        private final boolean enableAutoLockRenew;
        private final Semaphore dispositionLock;
        private final CoreSubscriber<? super ServiceBusReceivedMessage> downstream;

        Subscriber(ClientLogger logger, ServiceBusReceiverAsyncClient client, boolean enableAutoDisposition, boolean enableAutoLockRenew, Semaphore dispositionLock, CoreSubscriber<? super ServiceBusReceivedMessage> downstream) {
            this.logger = logger;
            this.client = client;
            this.enableAutoDisposition = enableAutoDisposition;
            this.enableAutoLockRenew = enableAutoLockRenew;
            this.dispositionLock = dispositionLock;
            this.downstream = downstream;
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.logger.atInfo().log("Subscription received. Subscribing downstream. {}", new Object[]{subscription});
            this.downstream.onSubscribe((Subscription)this);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void hookOnNext(ServiceBusReceivedMessage message) {
            Disposable lockRenewDisposable = this.enableAutoLockRenew ? this.client.beginLockRenewal(message) : Disposables.disposed();
            String seqNumber = message != null ? String.valueOf(message.getSequenceNumber()) : "n/a";
            this.logger.atVerbose().addKeyValue("sequenceNumber", seqNumber).log("onNext: Passing message downstream.");
            if (this.enableAutoDisposition) {
                try {
                    this.dispositionLock.acquire();
                }
                catch (InterruptedException e) {
                    this.logger.atInfo().addKeyValue("sequenceNumber", seqNumber).log("Unable to acquire dispositionLock.", new Object[]{e});
                }
            }
            try {
                this.downstream.onNext((Object)message);
                this.disposition(message, seqNumber, true);
            }
            catch (Exception e) {
                this.logger.atError().addKeyValue("sequenceNumber", seqNumber).log("Error occurred when downstream processing message.", new Object[]{e});
                this.disposition(message, seqNumber, false);
            }
            finally {
                if (this.enableAutoDisposition) {
                    this.dispositionLock.release();
                    lockRenewDisposable.dispose();
                }
                this.logger.atVerbose().addKeyValue("sequenceNumber", seqNumber).log("onNext: Finished.");
            }
        }

        protected void hookOnError(Throwable throwable) {
            this.logger.atInfo().log("Propagating upstream error signal to downstream.", new Object[]{throwable});
            this.downstream.onError(throwable);
        }

        protected void hookOnComplete() {
            this.logger.atInfo().log("Propagating upstream completion signal to downstream.");
            this.downstream.onComplete();
        }

        public Context currentContext() {
            return this.downstream.currentContext();
        }

        private void disposition(ServiceBusReceivedMessage message, String seqNumber, boolean isComplete) {
            if (!this.enableAutoDisposition) {
                return;
            }
            if (message != null && message.isSettled()) {
                return;
            }
            try {
                if (isComplete) {
                    this.client.complete(message).block();
                } else {
                    this.client.abandon(message).block();
                }
            }
            catch (Exception e) {
                this.logger.atWarning().addKeyValue("sequenceNumber", seqNumber).log("Unable to '{}' message, cancelling the message streaming.", new Object[]{isComplete ? "Complete" : "Abandon", e});
                this.upstream().cancel();
                this.onError(e);
            }
        }
    }
}

