/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.implementation.WindowedSubscriber;
import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ReceiverOptions;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;

final class SynchronousReceiver {
    private static final String TERMINAL_MESSAGE = "The receiver client is terminated. Re-create the client to continue receive attempt.";
    private static final WindowedSubscriber<ServiceBusReceivedMessage> DISPOSED = (WindowedSubscriber)Flux.error((Throwable)new RuntimeException("Disposed.")).subscribeWith((Subscriber)new WindowedSubscriber(new HashMap(0), TERMINAL_MESSAGE, new WindowedSubscriber.WindowedSubscriberOptions()));
    private static final String ENTITY_PATH_KEY = "entityPath";
    private static final String SYNC_RECEIVE_SPAN_NAME = "ServiceBus.receiveMessages";
    private static final Duration TIMEOUT_BETWEEN_MESSAGES = Duration.ofMillis(1000L);
    private final ClientLogger logger;
    private final ServiceBusReceiverAsyncClient asyncClient;
    private final ServiceBusTracer tracer;
    private final AtomicReference<WindowedSubscriber<ServiceBusReceivedMessage>> subscriber = new AtomicReference<Object>(null);

    SynchronousReceiver(ClientLogger logger, ServiceBusReceiverAsyncClient asyncClient) {
        this.logger = Objects.requireNonNull(logger, "'logger' cannot be null.");
        this.asyncClient = Objects.requireNonNull(asyncClient, "'asyncClient' cannot be null.");
        this.tracer = asyncClient.getInstrumentation().getTracer();
    }

    IterableStream<ServiceBusReceivedMessage> receive(int maxMessages, Duration maxWaitTime) {
        WindowedSubscriber<ServiceBusReceivedMessage> s = this.subscriber.get();
        if (s != null) {
            return s.enqueueRequest(maxMessages, maxWaitTime);
        }
        return this.subscribeOnce().enqueueRequest(maxMessages, maxWaitTime);
    }

    void dispose() {
        WindowedSubscriber<ServiceBusReceivedMessage> s = this.subscriber.getAndSet(DISPOSED);
        if (s != null) {
            s.dispose();
        }
    }

    private WindowedSubscriber<ServiceBusReceivedMessage> subscribeOnce() {
        if (!this.asyncClient.isV2()) {
            throw this.logger.logExceptionAsError((RuntimeException)new UnsupportedOperationException("SynchronousReceiver requires v2 mode."));
        }
        WindowedSubscriber<ServiceBusReceivedMessage> s = this.createSubscriber();
        if (this.subscriber.compareAndSet(null, s)) {
            Flux<ServiceBusReceivedMessage> upstream = this.asyncClient.isSessionEnabled() ? this.asyncClient.sessionSyncReceiveV2() : this.asyncClient.nonSessionSyncReceiveV2();
            upstream.subscribeWith(s);
        }
        return this.subscriber.get();
    }

    private WindowedSubscriber<ServiceBusReceivedMessage> createSubscriber() {
        String entityPath = this.asyncClient.getEntityPath();
        ReceiverOptions receiverOptions = this.asyncClient.getReceiverOptions();
        boolean isPeekLockMode = receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK;
        boolean isPrefetchDisabled = receiverOptions.getPrefetchCount() == 0;
        WindowedSubscriber.WindowedSubscriberOptions options = new WindowedSubscriber.WindowedSubscriberOptions();
        if (isPeekLockMode && isPrefetchDisabled) {
            options.setReleaser(this::messageReleaser);
        }
        options.setWindowDecorator(this::traceDecorator);
        options.setNextItemTimeout(TIMEOUT_BETWEEN_MESSAGES);
        return new WindowedSubscriber(Collections.singletonMap(ENTITY_PATH_KEY, entityPath), TERMINAL_MESSAGE, options);
    }

    private void messageReleaser(ServiceBusReceivedMessage message) {
        this.asyncClient.release(message).subscribe(__ -> {}, error -> this.logger.atWarning().addKeyValue("lockToken", message.getLockToken()).log("couldn't release the message.", new Object[]{error}), () -> this.logger.atVerbose().addKeyValue("lockToken", message.getLockToken()).log("message successfully released."));
    }

    private Flux<ServiceBusReceivedMessage> traceDecorator(Flux<ServiceBusReceivedMessage> toDecorate) {
        Flux<ServiceBusReceivedMessage> decorated = this.tracer.traceSyncReceive(SYNC_RECEIVE_SPAN_NAME, toDecorate);
        MessageUtils.subscribe(decorated);
        return decorated;
    }
}

