/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.util.polling;

import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.polling.AsyncPollResponse;
import com.azure.core.util.polling.DefaultSyncPoller;
import com.azure.core.util.polling.LongRunningOperationStatus;
import com.azure.core.util.polling.PollResponse;
import com.azure.core.util.polling.PollingContext;
import com.azure.core.util.polling.SyncPoller;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Function;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * A {@link Flux} that activates a long-running operation once and then polls it repeatedly,
 * emitting an {@link AsyncPollResponse} for every poll result until a poll response reports a
 * completed status.
 *
 * <p>Activation is shared across all subscribers of the same instance: the activation operation
 * is invoked by at most one subscriber at a time, and once it has succeeded it is never invoked
 * again. Each subscription polls using its own copy of the root {@link PollingContext}.</p>
 *
 * @param <T> the type of the value carried by each {@link PollResponse}
 * @param <U> the type produced by the fetch-result operation
 */
public final class PollerFlux<T, U> extends Flux<AsyncPollResponse<T, U>> {
    /**
     * CAS guard over {@link #activationGuardFlag}: 0 = nobody is activating, 1 = an activation is
     * in flight (or has succeeded). Static because a field updater is bound to the class, not to
     * an instance, so there is no reason to build one reflectively per poller.
     */
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<PollerFlux> GUARD_ACTIVATION_CALL =
        AtomicIntegerFieldUpdater.newUpdater(PollerFlux.class, "activationGuardFlag");

    private final ClientLogger logger = new ClientLogger(PollerFlux.class);
    // Context populated by activation; every subscription polls on a copy of it.
    private final PollingContext<T> rootContext = new PollingContext<>();
    private final Duration defaultPollInterval;
    private final Function<PollingContext<T>, Mono<T>> activationOperation;
    private final Function<PollingContext<T>, Mono<PollResponse<T>>> pollOperation;
    private final BiFunction<PollingContext<T>, PollResponse<T>, Mono<T>> cancelOperation;
    private final Function<PollingContext<T>, Mono<U>> fetchResultOperation;
    private final Mono<Boolean> oneTimeActivationMono;
    // Set to true only after the activation response has been stored in rootContext.
    private volatile boolean activated = false;
    // Accessed exclusively through GUARD_ACTIVATION_CALL.
    private volatile int activationGuardFlag = 0;

    /**
     * Creates a PollerFlux.
     *
     * @param defaultPollInterval the delay between polls when the latest poll response carries no
     *     positive retry-after value; must be greater than zero
     * @param activationOperation invoked once (across all subscriptions) to start the operation
     * @param pollOperation invoked for every poll attempt
     * @param cancelOperation handed to every emitted {@link AsyncPollResponse}
     * @param fetchResultOperation handed to every emitted {@link AsyncPollResponse}
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if {@code defaultPollInterval} is zero or negative
     */
    public PollerFlux(Duration defaultPollInterval,
        Function<PollingContext<T>, Mono<T>> activationOperation,
        Function<PollingContext<T>, Mono<PollResponse<T>>> pollOperation,
        BiFunction<PollingContext<T>, PollResponse<T>, Mono<T>> cancelOperation,
        Function<PollingContext<T>, Mono<U>> fetchResultOperation) {
        Objects.requireNonNull(defaultPollInterval, "'defaultPollInterval' cannot be null.");
        if (defaultPollInterval.compareTo(Duration.ZERO) <= 0) {
            throw this.logger.logExceptionAsWarning(new IllegalArgumentException(
                "Negative or zero value for 'defaultPollInterval' is not allowed."));
        }
        this.defaultPollInterval = defaultPollInterval;
        this.activationOperation =
            Objects.requireNonNull(activationOperation, "'activationOperation' cannot be null.");
        this.pollOperation = Objects.requireNonNull(pollOperation, "'pollOperation' cannot be null.");
        this.cancelOperation = Objects.requireNonNull(cancelOperation, "'cancelOperation' cannot be null.");
        this.fetchResultOperation =
            Objects.requireNonNull(fetchResultOperation, "'fetchResultOperation' cannot be null.");
        // Built last; the pipeline is lazy and only reads instance state on subscription.
        this.oneTimeActivationMono = this.oneTimeActivationMono();
    }

    /**
     * Subscribes {@code actual} to the polling sequence, running (or waiting for) the one-time
     * activation first.
     *
     * @param actual the subscriber that receives the poll responses
     */
    @Override
    public void subscribe(CoreSubscriber<? super AsyncPollResponse<T, U>> actual) {
        this.oneTimeActivationMono
            .flatMapMany(ignored -> this.pollingLoop())
            .subscribe(actual);
    }

    /**
     * Creates a synchronous poller configured with the same poll interval and operations as this
     * instance.
     *
     * @return a new {@link SyncPoller}
     */
    public SyncPoller<T, U> getSyncPoller() {
        return new DefaultSyncPoller<T, U>(this.defaultPollInterval, this.activationOperation,
            this.pollOperation, this.cancelOperation, this.fetchResultOperation);
    }

    /**
     * Builds the Mono that guarantees activation has happened before polling starts.
     *
     * <p>Each subscription runs {@link #activateOnce()}. A subscriber that loses the CAS race gets
     * an empty Mono, and {@code repeatWhenEmpty} immediately resubscribes, so it keeps retrying
     * until the winner has finished (or failed and released the guard).</p>
     *
     * @return a Mono that emits {@code true} once activation has completed
     */
    private Mono<Boolean> oneTimeActivationMono() {
        return Mono.defer(this::activateOnce)
            .repeatWhenEmpty(longFlux -> longFlux.concatMap(ignored -> Flux.just(true)));
    }

    /**
     * Performs a single activation attempt.
     *
     * @return {@code true} if activation already happened or was completed by this call; an
     *     empty Mono if another subscriber currently holds the activation guard; an error if the
     *     activation operation threw or emitted an error (the guard is released in that case so a
     *     later attempt can retry)
     */
    private Mono<Boolean> activateOnce() {
        if (this.activated) {
            return Mono.just(true);
        }
        if (!GUARD_ACTIVATION_CALL.compareAndSet(this, 0, 1)) {
            // Someone else is activating right now; signal "try again".
            return Mono.empty();
        }

        final Mono<T> activationMono;
        try {
            activationMono = this.activationOperation.apply(this.rootContext);
        } catch (RuntimeException e) {
            // Release the guard so a later subscription may retry activation.
            GUARD_ACTIVATION_CALL.compareAndSet(this, 1, 0);
            return FluxUtil.monoError(this.logger, e);
        }

        return activationMono
            .map(result -> new PollResponse<T>(LongRunningOperationStatus.NOT_STARTED, result))
            // An activation that completes without a value still counts as activated.
            .switchIfEmpty(Mono.defer(
                () -> Mono.just(new PollResponse<T>(LongRunningOperationStatus.NOT_STARTED, null))))
            .map(activationResponse -> {
                this.rootContext.setOnetimeActivationResponse(activationResponse);
                this.activated = true;
                return true;
            })
            .doOnError(throwable -> GUARD_ACTIVATION_CALL.compareAndSet(this, 1, 0));
    }

    /**
     * Creates the polling sequence for one subscription, backed by a private copy of the root
     * polling context.
     *
     * @return a Flux of poll responses for this subscription
     */
    private Flux<AsyncPollResponse<T, U>> pollingLoop() {
        return Flux.using(
            () -> this.rootContext.copy(),
            cxt -> this.pollUntilComplete(cxt),
            // Nothing to release; the per-subscription context is simply dropped.
            cxt -> { });
    }

    /**
     * Polls repeatedly on the given context until a poll response reports a completed status.
     *
     * @param cxt the per-subscription polling context; updated with every poll response
     * @return a Flux emitting one {@link AsyncPollResponse} per poll response
     */
    private Flux<AsyncPollResponse<T, U>> pollUntilComplete(PollingContext<T> cxt) {
        // The outer defer runs on every (re)subscription triggered by repeat(), so the delay is
        // recomputed from the latest response each time instead of being frozen at assembly.
        // The inner defer keeps the original ordering: wait first, then invoke pollOperation.
        final Mono<PollResponse<T>> pollOnce = Mono.defer(
            () -> Mono.defer(() -> this.pollOperation.apply(cxt))
                .delaySubscription(this.getDelay(cxt.getLatestResponse())));

        return pollOnce
            .switchIfEmpty(Mono.error(new IllegalStateException("PollOperation returned Mono.empty().")))
            .repeat()
            .takeUntil(currentPollResponse -> currentPollResponse.getStatus().isComplete())
            // NOTE(review): this operator sits after repeat(), so an error ends the sequence with
            // a normal completion rather than resuming polling, despite what the message says.
            .onErrorResume(throwable -> {
                this.logger.warning("Received an error from pollOperation. Any error from pollOperation will be ignored and polling will be continued. Error:" + throwable.getMessage());
                return Mono.empty();
            })
            .concatMap(currentPollResponse -> {
                cxt.setLatestResponse(currentPollResponse);
                return Mono.just(
                    new AsyncPollResponse<T, U>(cxt, this.cancelOperation, this.fetchResultOperation));
            });
    }

    /**
     * Chooses how long to wait before the next poll.
     *
     * @param pollResponse the most recent response recorded in the polling context
     * @return the response's retry-after value when it is present and positive, otherwise the
     *     default poll interval
     */
    private Duration getDelay(PollResponse<T> pollResponse) {
        final Duration retryAfter = pollResponse.getRetryAfter();
        if (retryAfter != null && retryAfter.compareTo(Duration.ZERO) > 0) {
            return retryAfter;
        }
        return this.defaultPollInterval;
    }
}

