package com.azure.core.amqp.implementation;

import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.exception.AzureException;
import com.azure.core.util.logging.ClientLogger;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;

/* loaded from: input_file:com/azure/core/amqp/implementation/ActiveClientTokenManager.class */
public class ActiveClientTokenManager implements TokenManager {
    private final Mono<ClaimsBasedSecurityNode> cbsNode;
    private final String tokenAudience;
    private final String scopes;
    private volatile Disposable subscription;
    private final ClientLogger logger = new ClientLogger(ActiveClientTokenManager.class);
    private final AtomicBoolean hasScheduled = new AtomicBoolean();
    private final AtomicBoolean hasDisposed = new AtomicBoolean();
    private final ReplayProcessor<AmqpResponseCode> authorizationResults = ReplayProcessor.create(1);
    private final FluxSink<AmqpResponseCode> authorizationResultsSink = this.authorizationResults.sink(FluxSink.OverflowStrategy.BUFFER);
    private final EmitterProcessor<Duration> durationSource = EmitterProcessor.create();
    private final FluxSink<Duration> durationSourceSink = this.durationSource.sink();
    private final AtomicReference<Duration> lastRefreshInterval = new AtomicReference<>(Duration.ofMinutes(1));

    public ActiveClientTokenManager(Mono<ClaimsBasedSecurityNode> mono, String str, String str2) {
        this.cbsNode = mono;
        this.tokenAudience = str;
        this.scopes = str2;
    }

    @Override // com.azure.core.amqp.implementation.TokenManager
    public Flux<AmqpResponseCode> getAuthorizationResults() {
        return this.authorizationResults;
    }

    @Override // com.azure.core.amqp.implementation.TokenManager
    public Mono<Long> authorize() {
        return this.hasDisposed.get() ? Mono.error(new AzureException("Cannot authorize with CBS node when this token manager has been disposed of.")) : this.cbsNode.flatMap(claimsBasedSecurityNode -> {
            return claimsBasedSecurityNode.authorize(this.tokenAudience, this.scopes);
        }).map(offsetDateTime -> {
            long floor = ((long) Math.floor(Duration.between(OffsetDateTime.now(ZoneOffset.UTC), offsetDateTime).getSeconds() * 0.9d)) * 1000;
            if (!this.hasScheduled.getAndSet(true)) {
                this.logger.info("Scheduling refresh token task. scopes[{}]", new Object[]{this.scopes});
                Duration ofMillis = Duration.ofMillis(floor);
                this.lastRefreshInterval.set(ofMillis);
                this.authorizationResultsSink.next(AmqpResponseCode.ACCEPTED);
                this.subscription = scheduleRefreshTokenTask(ofMillis);
            }
            return Long.valueOf(floor);
        });
    }

    @Override // com.azure.core.amqp.implementation.TokenManager, java.lang.AutoCloseable
    public void close() {
        if (this.hasDisposed.getAndSet(true)) {
            return;
        }
        this.authorizationResultsSink.complete();
        this.durationSourceSink.complete();
        if (this.subscription != null) {
            this.subscription.dispose();
        }
    }

    private Disposable scheduleRefreshTokenTask(Duration duration) {
        this.durationSourceSink.next(duration);
        return Flux.switchOnNext(this.durationSource.map(Flux::interval)).flatMap(l -> {
            this.logger.info("Refreshing token. scopes[{}] ", new Object[]{this.scopes});
            return authorize();
        }).onErrorContinue(th -> {
            return (th instanceof AmqpException) && ((AmqpException) th).isTransient();
        }, (th2, obj) -> {
            this.logger.error("Error is transient. Rescheduling authorization task at interval {} ms. scopes[{}]", new Object[]{Long.valueOf(this.lastRefreshInterval.get().toMillis()), this.scopes, th2});
            this.durationSourceSink.next(this.lastRefreshInterval.get());
        }).subscribe(l2 -> {
            this.logger.info("Authorization successful. Refreshing token in {} ms. scopes[{}]", new Object[]{l2, this.scopes});
            this.authorizationResultsSink.next(AmqpResponseCode.ACCEPTED);
            this.lastRefreshInterval.set(Duration.ofMillis(l2.longValue()));
            this.durationSourceSink.next(Duration.ofMillis(l2.longValue()));
        }, th3 -> {
            this.logger.error("Error occurred while refreshing token that is not retriable. Not scheduling refresh task. Use ActiveClientTokenManager.authorize() to schedule task again. audience[{}] scopes[{}]", new Object[]{this.tokenAudience, this.scopes, th3});
            this.hasScheduled.set(false);
            this.durationSourceSink.complete();
            this.authorizationResultsSink.error(th3);
        }, () -> {
            this.logger.verbose("Completed refresh token task.");
        });
    }
}
