/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ConnectionCacheWrapper;
import com.azure.messaging.servicebus.LockRenewalOperation;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

final class ServiceBusSessionAcquirer {
    private static final String TRACKING_ID_KEY = "trackingId";
    private final ClientLogger logger;
    private final String identifier;
    private final String entityPath;
    private final MessagingEntityType entityType;
    private final Duration tryTimeout;
    private final boolean timeoutRetryDisabled;
    private final ServiceBusReceiveMode receiveMode;
    private final ConnectionCacheWrapper connectionCacheWrapper;
    private final Mono<ServiceBusManagementNode> sessionManagement;

    ServiceBusSessionAcquirer(ClientLogger logger, String identifier, String entityPath, MessagingEntityType entityType, ServiceBusReceiveMode receiveMode, Duration tryTimeout, boolean timeoutRetryDisabled, ConnectionCacheWrapper connectionCacheWrapper) {
        assert (connectionCacheWrapper.isV2());
        this.logger = logger;
        this.identifier = identifier;
        this.entityPath = entityPath;
        this.entityType = entityType;
        this.tryTimeout = tryTimeout;
        this.timeoutRetryDisabled = timeoutRetryDisabled;
        this.receiveMode = receiveMode;
        this.connectionCacheWrapper = connectionCacheWrapper;
        this.sessionManagement = connectionCacheWrapper.getConnection().flatMap(connection -> connection.getManagementNode(entityPath, entityType));
    }

    boolean isConnectionClosed() {
        return this.connectionCacheWrapper.isChannelClosed();
    }

    Mono<Session> acquire() {
        return this.acquireIntern(null);
    }

    Mono<Session> acquire(String sessionId) {
        Objects.requireNonNull(sessionId, "sessionId cannot be null.");
        return this.acquireIntern(sessionId);
    }

    private Mono<Session> acquireIntern(String sessionId) {
        if (this.timeoutRetryDisabled) {
            return this.acquireSession(sessionId).onErrorResume(t -> {
                if (ServiceBusSessionAcquirer.isBrokerTimeoutError(t)) {
                    Throwable e = new TimeoutException("com.microsoft:timeout").initCause((Throwable)t);
                    return this.publishError(sessionId, e, false);
                }
                return this.publishError(sessionId, (Throwable)t, true);
            });
        }
        return this.acquireSession(sessionId).timeout(this.tryTimeout).retryWhen(Retry.from(signals -> signals.flatMap(signal -> {
            Throwable t = signal.failure();
            if (ServiceBusSessionAcquirer.isTimeoutError(t)) {
                this.logger.atVerbose().addKeyValue("entityPath", this.entityPath).addKeyValue("attempt", signal.totalRetriesInARow()).log("Timeout while acquiring session '{}'.", new Object[]{ServiceBusSessionAcquirer.sessionName(sessionId), t});
                return Mono.delay((Duration)Duration.ZERO);
            }
            return this.publishError(sessionId, t, true);
        })));
    }

    private Mono<Session> acquireSession(String sessionId) {
        return Mono.defer(() -> {
            Mono createLink = this.connectionCacheWrapper.getConnection().flatMap(connection -> connection.createReceiveLink(ServiceBusSessionAcquirer.linkName(sessionId), this.entityPath, this.receiveMode, null, this.entityType, this.identifier, sessionId));
            return createLink.flatMap(link -> link.getSessionProperties().flatMap(sessionProperties -> Mono.just((Object)new Session((ServiceBusReceiveLink)link, (ServiceBusReceiveLink.SessionProperties)sessionProperties, this.sessionManagement))));
        });
    }

    private <T> Mono<T> publishError(String sessionId, Throwable t, boolean logAtInfo) {
        long id = System.nanoTime();
        if (logAtInfo) {
            this.logger.atInfo().addKeyValue("entityPath", this.entityPath).addKeyValue(TRACKING_ID_KEY, id).log("Unable to acquire session '{}'.", new Object[]{ServiceBusSessionAcquirer.sessionName(sessionId), t});
        }
        return Mono.error((Throwable)t).publishOn(Schedulers.boundedElastic()).doOnError(ignored -> this.logger.atVerbose().addKeyValue(TRACKING_ID_KEY, id).log("Emitting session acquire error" + (logAtInfo ? "." : ": " + t.getMessage())));
    }

    private static boolean isBrokerTimeoutError(Throwable t) {
        return t instanceof AmqpException && ((AmqpException)t).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR;
    }

    private static boolean isTimeoutError(Throwable t) {
        return t instanceof TimeoutException || ServiceBusSessionAcquirer.isBrokerTimeoutError(t);
    }

    private static String linkName(String sessionId) {
        return sessionId != null ? sessionId : StringUtil.getRandomString((String)"session-");
    }

    private static String sessionName(String sessionId) {
        return sessionId == null ? "unnamed" : sessionId;
    }

    static final class Session {
        private final ServiceBusReceiveLink link;
        private final ServiceBusReceiveLink.SessionProperties properties;
        private final Mono<ServiceBusManagementNode> sessionManagement;

        Session(ServiceBusReceiveLink sessionLink, ServiceBusReceiveLink.SessionProperties sessionProperties, Mono<ServiceBusManagementNode> sessionManagement) {
            this.link = Objects.requireNonNull(sessionLink, "sessionLink cannot be null.");
            this.properties = Objects.requireNonNull(sessionProperties, "sessionProperties cannot be null.");
            this.sessionManagement = Objects.requireNonNull(sessionManagement, "sessionManagement cannot be null.");
        }

        String getId() {
            return this.properties.getId();
        }

        ServiceBusReceiveLink getLink() {
            return this.link;
        }

        Disposable beginLockRenew(ServiceBusTracer tracer, Duration maxSessionLockRenew) {
            String sessionId = this.properties.getId();
            Function<String, Mono<OffsetDateTime>> lockRenewFunc = __ -> this.sessionManagement.flatMap(mgmt -> {
                Mono<OffsetDateTime> renewLock = mgmt.renewSessionLock(sessionId, this.link.getLinkName());
                return tracer.traceMono("ServiceBus.renewSessionLock", renewLock);
            });
            OffsetDateTime initialLockedUntil = this.properties.getLockedUntil();
            LockRenewalOperation recurringLockRenew = new LockRenewalOperation(sessionId, maxSessionLockRenew, true, lockRenewFunc, initialLockedUntil);
            return recurringLockRenew;
        }
    }
}

