/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.diagnostics;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.diagnostics.ClusterState;
import com.couchbase.client.core.diagnostics.DiagnosticsResult;
import com.couchbase.client.core.diagnostics.EndpointDiagnostics;
import com.couchbase.client.core.diagnostics.HealthPinger;
import com.couchbase.client.core.diagnostics.PingResult;
import com.couchbase.client.core.error.UnambiguousTimeoutException;
import com.couchbase.client.core.service.ServiceType;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

@Stability.Internal
public class WaitUntilReadyHelper {
    @Stability.Internal
    public static CompletableFuture<Void> waitUntilReady(Core core, Set<ServiceType> serviceTypes, Duration timeout, ClusterState desiredState, Optional<String> bucketName) {
        boolean hasChance;
        boolean bl = hasChance = core.clusterConfig().hasClusterOrBucketConfig() || core.configurationProvider().globalConfigLoadInProgress() || core.configurationProvider().bucketConfigLoadInProgress();
        if (!hasChance) {
            CompletableFuture<Void> f = new CompletableFuture<Void>();
            f.completeExceptionally(new IllegalStateException("Against pre 6.5 clusters at least a bucket needs to be opened!"));
            return f;
        }
        return Flux.interval((Duration)Duration.ofMillis(10L), (Scheduler)core.context().environment().scheduler()).filter(i -> !core.configurationProvider().bucketConfigLoadInProgress() && !core.configurationProvider().globalConfigLoadInProgress() && (!bucketName.isPresent() || !core.configurationProvider().collectionMapRefreshInProgress())).take(1L).flatMap(aLong -> {
            Set<ServiceType> servicesToCheck = serviceTypes != null && !serviceTypes.isEmpty() ? serviceTypes : HealthPinger.extractPingTargets(core.clusterConfig(), bucketName).stream().map(HealthPinger.PingTarget::serviceType).collect(Collectors.toSet());
            Flux diagnostics = Flux.interval((Duration)Duration.ofMillis(10L), (Scheduler)core.context().environment().scheduler()).map(i -> WaitUntilReadyHelper.diagnosticsCurrentState(core)).takeUntil(s -> s == desiredState);
            return Flux.concat((Publisher[])new Publisher[]{WaitUntilReadyHelper.ping(core, servicesToCheck, timeout), diagnostics});
        }).timeout(timeout, (Publisher)Mono.defer(() -> Mono.error((Throwable)new UnambiguousTimeoutException("WaitUntilReady timed out", null))), core.context().environment().scheduler()).then().toFuture();
    }

    private static ClusterState diagnosticsCurrentState(Core core) {
        return DiagnosticsResult.aggregateClusterState(core.diagnostics().collect(Collectors.groupingBy(EndpointDiagnostics::type)).values());
    }

    private static Flux<PingResult> ping(Core core, Set<ServiceType> serviceTypes, Duration timeout) {
        return HealthPinger.ping(core, Optional.of(timeout), core.context().environment().retryStrategy(), serviceTypes, Optional.empty(), Optional.empty()).flux();
    }
}

