package com.azure.data.cosmos.internal.changefeed.implementation;

import com.azure.data.cosmos.internal.changefeed.CancellationToken;
import com.azure.data.cosmos.internal.changefeed.CancellationTokenSource;
import com.azure.data.cosmos.internal.changefeed.Lease;
import com.azure.data.cosmos.internal.changefeed.LeaseContainer;
import com.azure.data.cosmos.internal.changefeed.LeaseManager;
import com.azure.data.cosmos.internal.changefeed.PartitionController;
import com.azure.data.cosmos.internal.changefeed.PartitionSupervisor;
import com.azure.data.cosmos.internal.changefeed.PartitionSupervisorFactory;
import com.azure.data.cosmos.internal.changefeed.PartitionSynchronizer;
import com.azure.data.cosmos.internal.changefeed.exceptions.LeaseLostException;
import com.azure.data.cosmos.internal.changefeed.exceptions.PartitionSplitException;
import com.azure.data.cosmos.internal.changefeed.exceptions.TaskCancelledException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:com/azure/data/cosmos/internal/changefeed/implementation/PartitionControllerImpl.class */
class PartitionControllerImpl implements PartitionController {
    private static final Logger logger = LoggerFactory.getLogger(PartitionControllerImpl.class);
    private final Map<String, WorkerTask> currentlyOwnedPartitions = new ConcurrentHashMap();
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final PartitionSupervisorFactory partitionSupervisorFactory;
    private final PartitionSynchronizer synchronizer;
    private CancellationTokenSource shutdownCts;
    private final Scheduler scheduler;

    public PartitionControllerImpl(LeaseContainer leaseContainer, LeaseManager leaseManager, PartitionSupervisorFactory partitionSupervisorFactory, PartitionSynchronizer partitionSynchronizer, Scheduler scheduler) {
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.partitionSupervisorFactory = partitionSupervisorFactory;
        this.synchronizer = partitionSynchronizer;
        this.scheduler = scheduler;
    }

    @Override // com.azure.data.cosmos.internal.changefeed.PartitionController
    public Mono<Void> initialize() {
        this.shutdownCts = new CancellationTokenSource();
        return loadLeases();
    }

    @Override // com.azure.data.cosmos.internal.changefeed.PartitionController
    public synchronized Mono<Lease> addOrUpdateLease(Lease lease) {
        WorkerTask workerTask = this.currentlyOwnedPartitions.get(lease.getLeaseToken());
        return (workerTask == null || !workerTask.isRunning()) ? this.leaseManager.acquire(lease).defaultIfEmpty(lease).map(lease2 -> {
            logger.info("Partition {}: acquired.", lease2.getLeaseToken());
            this.currentlyOwnedPartitions.put(lease2.getLeaseToken(), processPartition(this.partitionSupervisorFactory.create(lease2), lease2));
            return lease2;
        }).onErrorResume(th -> {
            logger.warn("Partition {}: unexpected error; removing lease from current cache.", lease.getLeaseToken());
            return removeLease(lease).then(Mono.error(th));
        }) : this.leaseManager.updateProperties(lease).map(lease3 -> {
            logger.debug("Partition {}: updated.", lease3.getLeaseToken());
            return lease3;
        });
    }

    @Override // com.azure.data.cosmos.internal.changefeed.PartitionController
    public Mono<Void> shutdown() {
        this.shutdownCts.cancel();
        return Mono.empty();
    }

    private Mono<Void> loadLeases() {
        logger.debug("Starting renew leases assigned to this host on initialize.");
        return this.leaseContainer.getOwnedLeases().flatMap(lease -> {
            logger.info("Acquired lease for PartitionId '{}' on startup.", lease.getLeaseToken());
            return addOrUpdateLease(lease);
        }).then();
    }

    private Mono<Void> removeLease(Lease lease) {
        if (this.currentlyOwnedPartitions.get(lease.getLeaseToken()) != null) {
            WorkerTask remove = this.currentlyOwnedPartitions.remove(lease.getLeaseToken());
            if (remove.isRunning()) {
                remove.interrupt();
            }
            logger.info("Partition {}: released.", lease.getLeaseToken());
        }
        return this.leaseManager.release(lease).onErrorResume(th -> {
            if (th instanceof LeaseLostException) {
                logger.warn("Partition {}: lease already removed.", lease.getLeaseToken());
            } else {
                logger.warn("Partition {}: failed to remove lease.", lease.getLeaseToken(), th);
            }
            return Mono.empty();
        }).doOnSuccess(r5 -> {
            logger.info("Partition {}: successfully removed lease.", lease.getLeaseToken());
        });
    }

    private WorkerTask processPartition(PartitionSupervisor partitionSupervisor, Lease lease) {
        CancellationToken token = this.shutdownCts.getToken();
        WorkerTask workerTask = new WorkerTask(lease, () -> {
            partitionSupervisor.run(token).onErrorResume(th -> {
                if (th instanceof PartitionSplitException) {
                    return handleSplit(lease, ((PartitionSplitException) th).getLastContinuation());
                }
                if (th instanceof TaskCancelledException) {
                    logger.debug("Partition {}: processing canceled.", lease.getLeaseToken());
                } else {
                    logger.warn("Partition {}: processing failed.", lease.getLeaseToken(), th);
                }
                return Mono.empty();
            }).then(removeLease(lease)).subscribe();
        });
        this.scheduler.schedule(workerTask);
        return workerTask;
    }

    private Mono<Void> handleSplit(Lease lease, String str) {
        lease.setContinuationToken(str);
        return this.synchronizer.splitPartition(lease).flatMap(lease2 -> {
            lease2.setProperties(lease.getProperties());
            return addOrUpdateLease(lease2);
        }).then(this.leaseManager.delete(lease)).onErrorResume(th -> {
            logger.warn("Partition {}: failed to split", lease.getLeaseToken(), th);
            return Mono.empty();
        });
    }
}
