package com.azure.cosmos.implementation.changefeed.pkversion;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.LeaseManager;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.util.HashSet;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/pkversion/PartitionSynchronizerImpl.class */
class PartitionSynchronizerImpl implements PartitionSynchronizer {
    private final Logger logger = LoggerFactory.getLogger(PartitionSynchronizerImpl.class);
    private final ChangeFeedContextClient documentClient;
    private final CosmosAsyncContainer collectionSelfLink;
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final int degreeOfParallelism;
    private final int maxBatchSize;
    private final String collectionResourceId;

    public PartitionSynchronizerImpl(ChangeFeedContextClient changeFeedContextClient, CosmosAsyncContainer cosmosAsyncContainer, LeaseContainer leaseContainer, LeaseManager leaseManager, int i, int i2, String str) {
        this.documentClient = changeFeedContextClient;
        this.collectionSelfLink = cosmosAsyncContainer;
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.degreeOfParallelism = i;
        this.maxBatchSize = i2;
        this.collectionResourceId = str;
    }

    @Override // com.azure.cosmos.implementation.changefeed.pkversion.PartitionSynchronizer
    public Mono<Void> createMissingLeases() {
        return enumPartitionKeyRanges().map((v0) -> {
            return v0.getId();
        }).collectList().flatMap(list -> {
            return createLeases(new HashSet(list)).then();
        }).onErrorResume(th -> {
            return Mono.empty();
        });
    }

    @Override // com.azure.cosmos.implementation.changefeed.pkversion.PartitionSynchronizer
    public Flux<Lease> splitPartition(Lease lease) {
        if (lease == null) {
            throw new IllegalArgumentException("lease");
        }
        String leaseToken = lease.getLeaseToken();
        String continuationToken = lease.getContinuationToken();
        this.logger.info("Partition {} is gone due to split; will attempt to resume using continuation token {}.", leaseToken, continuationToken);
        return enumPartitionKeyRanges().filter(partitionKeyRange -> {
            return (partitionKeyRange == null || partitionKeyRange.getParents() == null || !partitionKeyRange.getParents().contains(leaseToken)) ? false : true;
        }).map((v0) -> {
            return v0.getId();
        }).collectList().flatMapMany(list -> {
            if (list.size() != 0) {
                return Flux.fromIterable(list);
            }
            this.logger.error("Partition {} had split but we failed to find at least one child partition", leaseToken);
            throw new RuntimeException(String.format("Partition %s had split but we failed to find at least one child partition", leaseToken));
        }).flatMap(str -> {
            return this.leaseManager.createLeaseIfNotExist(str, continuationToken);
        }, this.degreeOfParallelism).map(lease2 -> {
            this.logger.info("Partition {} split into new partition and continuation token {}.", new Object[]{leaseToken, lease2.getLeaseToken(), continuationToken});
            return lease2;
        });
    }

    private Flux<PartitionKeyRange> enumPartitionKeyRanges() {
        String extractContainerSelfLink = BridgeInternal.extractContainerSelfLink(this.collectionSelfLink);
        CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
        ModelBridgeInternal.setQueryRequestOptionsContinuationTokenAndMaxItemCount(cosmosQueryRequestOptions, null, Integer.valueOf(this.maxBatchSize));
        return this.documentClient.readPartitionKeyRangeFeed(extractContainerSelfLink, cosmosQueryRequestOptions).map((v0) -> {
            return v0.getResults();
        }).flatMap((v0) -> {
            return Flux.fromIterable(v0);
        }).onErrorResume(th -> {
            return Flux.empty();
        });
    }

    private Flux<Lease> createLeases(Set<String> set) {
        HashSet hashSet = new HashSet(set);
        return this.leaseContainer.getAllLeases().map(lease -> {
            if (lease != null) {
                hashSet.remove(lease.getLeaseToken());
            }
            return lease;
        }).thenMany(Flux.fromIterable(hashSet).flatMap(str -> {
            return this.leaseManager.createLeaseIfNotExist(str, (String) null);
        }, this.degreeOfParallelism).map(lease2 -> {
            return lease2;
        }));
    }
}
