package com.azure.cosmos.util;

import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.IterableStream;
import com.azure.core.util.paging.ContinuablePagedFlux;
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.util.Beta;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;

/* loaded from: input_file:com/azure/cosmos/util/CosmosPagedFlux.class */
/**
 * A paged {@link Flux} of Cosmos {@link FeedResponse} pages whose continuation token is a {@code String}.
 *
 * <p>Pages are produced by a function that receives a {@link CosmosPagedFluxOptions} instance
 * (carrying the optional continuation token and max item count) and returns the page stream.
 * An optional consumer registered through {@link #handle(Consumer)} is invoked for every page
 * that flows through {@code byPage(...)}.
 *
 * <p>Subscribing to this object directly yields the individual elements of all pages.
 *
 * @param <T> the type of the elements contained in each page
 */
public final class CosmosPagedFlux<T> extends ContinuablePagedFlux<String, T, FeedResponse<T>> {
    /** Produces the page stream for a given set of paging options. */
    private final Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction;

    /** Optional per-page side-effect callback; {@code null} when no handler is registered. */
    private final Consumer<FeedResponse<T>> feedResponseConsumer;

    /**
     * Creates a paged flux without a page handler.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; visibility is kept as found here - confirm before relying on it.
     *
     * @param function produces the page stream for the supplied paging options
     */
    public CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> function) {
        this(function, null);
    }

    /**
     * Creates a paged flux with an optional page handler.
     *
     * @param function produces the page stream for the supplied paging options
     * @param consumer callback invoked for every emitted page, or {@code null} for none
     */
    CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> function, Consumer<FeedResponse<T>> consumer) {
        this.optionsFluxFunction = function;
        this.feedResponseConsumer = consumer;
    }

    /**
     * Returns a new paged flux that additionally invokes {@code consumer} for every page.
     *
     * <p>Handlers registered by earlier {@code handle} calls are preserved and run first, so
     * {@code flux.handle(a).handle(b)} invokes {@code a} and then {@code b} for each page.
     * This instance is not modified.
     *
     * @param consumer the side-effect to run for each page; {@code null} adds nothing
     * @return a new {@code CosmosPagedFlux} sharing this instance's page source
     */
    @Beta(Beta.SinceVersion.V4_6_0)
    public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> consumer) {
        if (this.feedResponseConsumer == null) {
            return new CosmosPagedFlux<>(this.optionsFluxFunction, consumer);
        }
        if (consumer == null) {
            // Nothing to add; keep the existing handler instead of failing inside andThen(null).
            return new CosmosPagedFlux<>(this.optionsFluxFunction, this.feedResponseConsumer);
        }
        // Compose rather than replace so that chained handle(...) calls do not drop handlers.
        return new CosmosPagedFlux<>(this.optionsFluxFunction, this.feedResponseConsumer.andThen(consumer));
    }

    /**
     * Streams all pages using default paging options.
     *
     * @return a flux of pages
     */
    public Flux<FeedResponse<T>> byPage() {
        return pagesFor(new CosmosPagedFluxOptions());
    }

    /**
     * Streams pages starting from the given continuation token.
     *
     * @param str the continuation token to resume from
     * @return a flux of pages
     */
    public Flux<FeedResponse<T>> byPage(String str) {
        CosmosPagedFluxOptions pagedOptions = new CosmosPagedFluxOptions();
        pagedOptions.setRequestContinuation(str);
        return pagesFor(pagedOptions);
    }

    /**
     * Streams pages using the given max item count.
     *
     * @param i the max item count set on the paging options
     * @return a flux of pages
     */
    public Flux<FeedResponse<T>> byPage(int i) {
        CosmosPagedFluxOptions pagedOptions = new CosmosPagedFluxOptions();
        pagedOptions.setMaxItemCount(Integer.valueOf(i));
        return pagesFor(pagedOptions);
    }

    /**
     * Streams pages starting from the given continuation token, using the given max item count.
     *
     * @param str the continuation token to resume from
     * @param i the max item count set on the paging options
     * @return a flux of pages
     */
    public Flux<FeedResponse<T>> byPage(String str, int i) {
        CosmosPagedFluxOptions pagedOptions = new CosmosPagedFluxOptions();
        pagedOptions.setRequestContinuation(str);
        pagedOptions.setMaxItemCount(Integer.valueOf(i));
        return pagesFor(pagedOptions);
    }

    /**
     * Subscribes to the individual elements of every page produced by {@link #byPage()}.
     * Pages whose element stream is {@code null} contribute no elements.
     *
     * @param coreSubscriber the subscriber receiving the elements
     */
    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
        byPage().flatMap(feedResponse -> {
            IterableStream<T> elements = feedResponse.getElements();
            return elements == null ? Flux.empty() : Flux.fromIterable(elements);
        }).subscribe(coreSubscriber);
    }

    /** Wraps the page stream for {@code pagedOptions} so it receives the subscriber's context. */
    private Flux<FeedResponse<T>> pagesFor(CosmosPagedFluxOptions pagedOptions) {
        return FluxUtil.fluxContext(context -> byPage(pagedOptions, context));
    }

    /**
     * Builds the page stream for the given options and decorates it with tracing and the
     * registered page handler.
     *
     * <p>When the options' tracer provider is enabled, a span is started on subscription and
     * ended on completion (status {@code OK}) or on error (status {@code 0}).
     *
     * @param cosmosPagedFluxOptions the paging options handed to the page-source function
     * @param context the context passed to the tracer when starting the span
     * @return the decorated flux of pages
     */
    private Flux<FeedResponse<T>> byPage(CosmosPagedFluxOptions cosmosPagedFluxOptions, Context context) {
        // Holds the context returned by startSpan so the terminal callbacks can end the same span.
        AtomicReference<Context> spanContext = new AtomicReference<>(Context.NONE);
        return this.optionsFluxFunction.apply(cosmosPagedFluxOptions).doOnSubscribe(subscription -> {
            // The tracer provider is read from the options lazily, at subscription time.
            if (cosmosPagedFluxOptions.getTracerProvider().isEnabled()) {
                spanContext.set(cosmosPagedFluxOptions.getTracerProvider().startSpan(cosmosPagedFluxOptions.getTracerSpanName(), cosmosPagedFluxOptions.getDatabaseId(), cosmosPagedFluxOptions.getServiceEndpoint(), context));
            }
        }).doOnComplete(() -> {
            if (cosmosPagedFluxOptions.getTracerProvider().isEnabled()) {
                cosmosPagedFluxOptions.getTracerProvider().endSpan(spanContext.get(), Signal.complete(), HttpConstants.StatusCodes.OK);
            }
        }).doOnError(th -> {
            if (cosmosPagedFluxOptions.getTracerProvider().isEnabled()) {
                cosmosPagedFluxOptions.getTracerProvider().endSpan(spanContext.get(), Signal.error(th), 0);
            }
        }).doOnNext(feedResponse -> {
            if (this.feedResponseConsumer != null) {
                this.feedResponseConsumer.accept(feedResponse);
            }
        });
    }
}
