package com.azure.cosmos.implementation.batch;

import com.azure.core.util.FluxUtil;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.CosmosDaemonThreadFactory;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
import com.azure.cosmos.models.CosmosBatchOperationResult;
import com.azure.cosmos.models.CosmosBatchResponse;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkItemResponse;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosItemOperationType;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.function.Tuple2;

/* loaded from: input_file:com/azure/cosmos/implementation/batch/BulkExecutor.class */
public final class BulkExecutor<TContext> implements Disposable {
    private static final Logger logger = LoggerFactory.getLogger(BulkExecutor.class);
    // Process-wide counter; only used to derive a unique identifier per executor instance.
    private static final AtomicLong instanceCount = new AtomicLong(0);
    // Container the bulk operations are executed against.
    private final CosmosAsyncContainer container;
    // Document client resolved from the container's database in the constructor.
    private final AsyncDocumentClient docClientWrapper;
    // "<identifier>[<operation context>]" (or "[n/a]"); appended to log messages.
    private final String operationContextText;
    // Optional operation context/listener taken from the bulk options; forwarded on every batch request.
    private final OperationContextAndListenerTuple operationListener;
    // Throttling retry settings read from the client's connection policy.
    private final ThrottlingRetryOptions throttlingRetryOptions;
    // Caller-supplied stream of operations to execute.
    private final Flux<CosmosItemOperation> inputOperations;
    // Maximum micro-batch interval in milliseconds; used as flush period and as max buffer age.
    private final Long maxMicroBatchIntervalInMs;
    // Batch-scoped context used when an individual operation carries no context of its own.
    private final TContext batchContext;
    // Thresholds keyed by partition key range id; entries are created lazily in executeCore.
    private final ConcurrentMap<String, PartitionScopeThresholds> partitionScopeThresholds;
    private final CosmosBulkExecutionOptions cosmosBulkExecutionOptions;
    // Set to true once the input flux has signalled completion.
    private final AtomicBoolean mainSourceCompleted;
    // Number of input operations seen for which no response has been emitted yet.
    private final AtomicInteger totalCount;
    private final Sinks.EmitFailureHandler serializedEmitFailureHandler;
    // Side channel merged with the input flux; operations are re-emitted here to be re-routed.
    private final Sinks.Many<CosmosItemOperation> mainSink;
    // One sink per partition group; receives retries, pending operations and flush markers.
    private final List<FluxSink<CosmosItemOperation>> groupSinks;
    // Single-threaded scheduler that runs the periodic flush task.
    private final ScheduledThreadPoolExecutor executorService;
    private final CosmosAsyncClient cosmosClient;
    // Span name passed to the tracer for each batch request.
    private final String bulkSpanName;
    // Handle of the currently scheduled flush task; replaced when the input flux completes.
    private ScheduledFuture<?> scheduledFutureForFlush;
    // Guards dispose() so its body runs at most once.
    private final AtomicBoolean isDisposed = new AtomicBoolean(false);
    // Guards shutdown() so its body runs at most once.
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final String identifier = "BulkExecutor-" + instanceCount.incrementAndGet();

    /* loaded from: input_file:com/azure/cosmos/implementation/batch/BulkExecutor$SerializedEmitFailureHandler.class */
    private class SerializedEmitFailureHandler implements Sinks.EmitFailureHandler {
        private SerializedEmitFailureHandler() {
        }

        public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
            if (emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED)) {
                BulkExecutor.logger.debug("SerializedEmitFailureHandler.onEmitFailure - Signal:{}, Result: {}", signalType, emitResult);
                return true;
            }
            BulkExecutor.logger.error("SerializedEmitFailureHandler.onEmitFailure - Signal:{}, Result: {}", signalType, emitResult);
            return false;
        }
    }

    public BulkExecutor(CosmosAsyncContainer cosmosAsyncContainer, Flux<CosmosItemOperation> flux, CosmosBulkExecutionOptions cosmosBulkExecutionOptions) {
        Preconditions.checkNotNull(cosmosAsyncContainer, "expected non-null container");
        Preconditions.checkNotNull(flux, "expected non-null inputOperations");
        Preconditions.checkNotNull(cosmosBulkExecutionOptions, "expected non-null bulkOptions");
        this.cosmosBulkExecutionOptions = cosmosBulkExecutionOptions;
        this.container = cosmosAsyncContainer;
        this.bulkSpanName = "nonTransactionalBatch." + this.container.getId();
        this.inputOperations = flux;
        this.docClientWrapper = CosmosBridgeInternal.getAsyncDocumentClient(cosmosAsyncContainer.getDatabase());
        this.cosmosClient = ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.getCosmosAsyncDatabaseAccessor().getCosmosAsyncClient(cosmosAsyncContainer.getDatabase());
        this.throttlingRetryOptions = this.docClientWrapper.getConnectionPolicy().getThrottlingRetryOptions();
        this.maxMicroBatchIntervalInMs = Long.valueOf(ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.getCosmosBulkExecutionOptionsAccessor().getMaxMicroBatchInterval(this.cosmosBulkExecutionOptions).toMillis());
        this.batchContext = (TContext) ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.getCosmosBulkExecutionOptionsAccessor().getLegacyBatchScopedContext(this.cosmosBulkExecutionOptions);
        this.partitionScopeThresholds = ImplementationBridgeHelpers.CosmosBulkExecutionThresholdsStateHelper.getBulkExecutionThresholdsAccessor().getPartitionScopeThresholds(this.cosmosBulkExecutionOptions.getThresholdsState());
        this.operationListener = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.getCosmosBulkExecutionOptionsAccessor().getOperationContext(this.cosmosBulkExecutionOptions);
        if (this.operationListener == null || this.operationListener.getOperationContext() == null) {
            this.operationContextText = this.identifier + "[n/a]";
        } else {
            this.operationContextText = this.identifier + "[" + this.operationListener.getOperationContext().toString() + "]";
        }
        this.mainSourceCompleted = new AtomicBoolean(false);
        this.totalCount = new AtomicInteger(0);
        this.serializedEmitFailureHandler = new SerializedEmitFailureHandler();
        this.mainSink = Sinks.many().unicast().onBackpressureBuffer();
        this.groupSinks = new CopyOnWriteArrayList();
        this.executorService = new ScheduledThreadPoolExecutor(1, new CosmosDaemonThreadFactory(this.identifier));
        this.executorService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.executorService.setRemoveOnCancelPolicy(true);
        this.scheduledFutureForFlush = this.executorService.scheduleWithFixedDelay(this::onFlush, this.maxMicroBatchIntervalInMs.longValue(), this.maxMicroBatchIntervalInMs.longValue(), TimeUnit.MILLISECONDS);
        logger.debug("Instantiated BulkExecutor, Context: {}", this.operationContextText);
    }

    /**
     * Disposes this executor; only the first call has any effect. When no
     * operations are outstanding all sinks are completed, otherwise the
     * executor is shut down directly.
     */
    @Override
    public void dispose() {
        if (!this.isDisposed.compareAndSet(false, true)) {
            return;
        }

        if (this.totalCount.get() != 0) {
            shutdown();
        } else {
            completeAllSinks();
        }
    }

    /**
     * @return {@code true} once {@link #dispose()} has been called
     */
    @Override
    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    /**
     * Cancels the currently scheduled flush task, if there is one. A failure
     * to cancel is logged as a warning and otherwise ignored.
     */
    private void cancelFlushTask() {
        ScheduledFuture<?> flushTask = this.scheduledFutureForFlush;
        if (flushTask == null) {
            return;
        }

        try {
            flushTask.cancel(true);
            logger.debug("Cancelled all future scheduled tasks {}, Context: {}", getThreadInfo(), this.operationContextText);
        } catch (Exception e) {
            logger.warn("Failed to cancel scheduled tasks{}, Context: {}", getThreadInfo(), this.operationContextText, e);
        }
    }

    /**
     * Completes every group sink, cancels the flush task and stops the
     * scheduler. Only the first invocation does any work.
     */
    private void shutdown() {
        if (!this.isShutdown.compareAndSet(false, true)) {
            return;
        }

        logger.debug("Shutting down, Context: {}", this.operationContextText);
        this.groupSinks.forEach(FluxSink::complete);
        logger.debug("All group sinks completed, Context: {}", this.operationContextText);

        cancelFlushTask();

        try {
            logger.debug("Shutting down the executor service, Context: {}", this.operationContextText);
            this.executorService.shutdownNow();
            logger.debug("Successfully shut down the executor service, Context: {}", this.operationContextText);
        } catch (Exception e) {
            logger.warn("Failed to shut down the executor service, Context: {}", this.operationContextText, e);
        }
    }

    /**
     * Executes the input operations and returns the stream of per-operation
     * responses. Whatever way the returned flux terminates, the number of
     * outstanding operations is logged and this executor is disposed.
     *
     * @return a flux of bulk operation responses
     */
    public Flux<CosmosBulkOperationResponse<TContext>> execute() {
        return executeCore().doFinally(signal -> {
            int remaining = this.totalCount.get();

            if (signal != SignalType.ON_COMPLETE) {
                // Leftover items on a non-complete termination are worth an info-level entry.
                if (remaining > 0) {
                    logger.info("BulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}", signal, remaining, this.operationContextText, getThreadInfo());
                } else {
                    logger.debug("BulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}", signal, remaining, this.operationContextText, getThreadInfo());
                }
            } else {
                logger.debug("BulkExecutor.execute flux completed - # left items {}, Context: {}, {}", remaining, this.operationContextText, getThreadInfo());
            }

            dispose();
        });
    }

    /**
     * Builds the processing pipeline: resolves the partition-group concurrency,
     * attaches a retry policy to every input operation, resolves each
     * operation's partition key range, groups operations by their partition
     * thresholds and executes each group via {@link #executePartitionedGroup}.
     *
     * <p>{@code totalCount} is incremented for every non-flush input operation
     * and decremented for every emitted response; once it reaches zero after
     * the input has completed, all sinks are completed.
     *
     * @return a flux of bulk operation responses
     */
    private Flux<CosmosBulkOperationResponse<TContext>> executeCore() {
        Integer configuredMaxConcurrentPartitions = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
            .getCosmosBulkExecutionOptionsAccessor()
            .getMaxConcurrentCosmosPartitions(this.cosmosBulkExecutionOptions);

        // Either the configured value or twice the number of feed ranges, but never below 256.
        Mono<Integer> maxConcurrentPartitionsMono = configuredMaxConcurrentPartitions != null
            ? Mono.just(Math.max(256, configuredMaxConcurrentPartitions))
            : this.container.getFeedRanges().map(feedRanges -> Math.max(256, feedRanges.size() * 2));

        return maxConcurrentPartitionsMono
            .subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
            .flatMapMany(maxConcurrentPartitions -> {
                logger.debug("BulkExecutor.execute with MaxConcurrentPartitions: {}, Context: {}", maxConcurrentPartitions, this.operationContextText);

                return this.inputOperations
                    .publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
                    .onErrorContinue((throwable, failedItem) ->
                        logger.error("Skipping an error operation while processing {}. Cause: {}, Context: {}", failedItem, throwable.getMessage(), this.operationContextText))
                    .doOnNext(inputOperation -> {
                        BulkExecutorUtil.setRetryPolicyForBulk(this.docClientWrapper, this.container, inputOperation, this.throttlingRetryOptions);

                        // Flush markers are not real work and must not be counted.
                        if (inputOperation != FlushBuffersItemOperation.singleton()) {
                            this.totalCount.incrementAndGet();
                        }

                        logger.trace("SetupRetryPolicy, {}, TotalCount: {}, Context: {}, {}", getItemOperationDiagnostics(inputOperation), this.totalCount.get(), this.operationContextText, getThreadInfo());
                    })
                    .doOnComplete(() -> {
                        this.mainSourceCompleted.set(true);

                        int remaining = this.totalCount.get();
                        logger.debug("Main source completed - # left items {}, Context: {}", remaining, this.operationContextText);
                        if (remaining == 0) {
                            completeAllSinks();
                            return;
                        }

                        // Work is still buffered: flush right away, then keep flushing on a
                        // period capped at 100 ms.
                        cancelFlushTask();
                        onFlush();

                        long flushIntervalInMs = Math.min(this.maxMicroBatchIntervalInMs, 100L);
                        this.scheduledFutureForFlush = this.executorService.scheduleWithFixedDelay(
                            this::onFlush,
                            flushIntervalInMs,
                            flushIntervalInMs,
                            TimeUnit.MILLISECONDS);

                        logger.debug("Scheduled new flush operation {}, Context: {}", getThreadInfo(), this.operationContextText);
                    })
                    .mergeWith(this.mainSink.asFlux())
                    .subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
                    .flatMap(operation -> {
                        logger.trace("Before Resolve PkRangeId, {}, Context: {} {}", getItemOperationDiagnostics(operation), this.operationContextText, getThreadInfo());

                        return BulkExecutorUtil
                            .resolvePartitionKeyRangeId(this.docClientWrapper, this.container, operation)
                            .map(pkRangeId -> {
                                // The mapping-function parameter needs its own name: a lambda
                                // parameter may not redeclare one from an enclosing lambda.
                                PartitionScopeThresholds thresholds = this.partitionScopeThresholds.computeIfAbsent(
                                    pkRangeId,
                                    newPkRangeId -> new PartitionScopeThresholds(newPkRangeId, this.cosmosBulkExecutionOptions));

                                logger.trace("Resolved PkRangeId, {}, PKRangeId: {} Context: {} {}", getItemOperationDiagnostics(operation), pkRangeId, this.operationContextText, getThreadInfo());

                                return Pair.of(thresholds, operation);
                            });
                    })
                    .groupBy(Pair::getKey, Pair::getValue)
                    .flatMap(this::executePartitionedGroup, maxConcurrentPartitions)
                    .subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
                    .doOnNext(response -> {
                        int remaining = this.totalCount.decrementAndGet();
                        boolean sourceCompleted = this.mainSourceCompleted.get();

                        if (remaining == 0 && sourceCompleted) {
                            logger.debug("All work completed, {}, TotalCount: {}, Context: {} {}", getItemOperationDiagnostics(response.getOperation()), remaining, this.operationContextText, getThreadInfo());
                            completeAllSinks();
                        } else {
                            if (remaining == 0) {
                                logger.debug("No Work left - but mainSource not yet completed, Context: {} {}", this.operationContextText, getThreadInfo());
                            }
                            logger.trace("Work left - TotalCount after decrement: {}, main sink completed {}, {}, Context: {} {}", remaining, sourceCompleted, getItemOperationDiagnostics(response.getOperation()), this.operationContextText, getThreadInfo());
                        }
                    })
                    .doOnComplete(() -> {
                        int remaining = this.totalCount.get();
                        boolean sourceCompleted = this.mainSourceCompleted.get();

                        if (remaining == 0 && sourceCompleted) {
                            logger.debug("DoOnComplete: All work completed, Context: {}", this.operationContextText);
                            completeAllSinks();
                        } else {
                            logger.debug("DoOnComplete: Work left - TotalCount after decrement: {}, main sink completed {}, Context: {} {}", remaining, sourceCompleted, this.operationContextText, getThreadInfo());
                        }
                    });
            });
    }

    /**
     * Executes all operations of one partition group. The group's operations
     * are merged with a per-group sink (registered in {@code groupSinks}) that
     * carries retries, pending operations and flush markers, then cut into
     * micro batches which are handed to {@link #executeOperations}.
     *
     * <p>A micro batch is closed when a flush marker arrives while operations
     * are buffered, or when the buffered operation count, the age of the first
     * buffered operation, or the accumulated serialized length reaches its limit.
     *
     * @param groupedFlux the operations of one partition, keyed by its thresholds
     * @return the responses for the operations of this group
     */
    private Flux<CosmosBulkOperationResponse<TContext>> executePartitionedGroup(GroupedFlux<PartitionScopeThresholds, CosmosItemOperation> groupedFlux) {
        final PartitionScopeThresholds thresholds = groupedFlux.key();

        final FluxProcessor<CosmosItemOperation, CosmosItemOperation> groupProcessor =
            UnicastProcessor.<CosmosItemOperation>create().serialize();
        final FluxSink<CosmosItemOperation> groupSink = groupProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        this.groupSinks.add(groupSink);

        // State of the micro batch currently being buffered; -1 means "no operation buffered yet".
        final AtomicLong firstRecordTimestamp = new AtomicLong(-1L);
        final AtomicLong currentBatchSize = new AtomicLong(0L);
        final AtomicInteger currentTotalSerializedLength = new AtomicInteger(0);

        // NOTE(review): presumably the maximum batch request payload size in bytes - confirm
        // against the SDK's batch constants.
        final int maxSerializedLength = 220201;

        final int maxMicroBatchConcurrency = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
            .getCosmosBulkExecutionOptionsAccessor()
            .getMaxMicroBatchConcurrency(this.cosmosBulkExecutionOptions);

        return groupedFlux
            .mergeWith(groupProcessor)
            .onBackpressureBuffer()
            .timestamp()
            .subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
            .bufferUntil(timestampedOperation -> {
                long timestamp = timestampedOperation.getT1();
                CosmosItemOperation operation = timestampedOperation.getT2();

                logger.trace("BufferUntil - enqueued {}, {}, Context: {} {}", timestamp, getItemOperationDiagnostics(operation), this.operationContextText, getThreadInfo());

                if (operation == FlushBuffersItemOperation.singleton()) {
                    long bufferedCount = currentBatchSize.get();
                    if (bufferedCount <= 0) {
                        // Nothing buffered, so the flush marker does not close a batch.
                        return false;
                    }

                    logger.trace("Flushing PKRange {} (batch size: {}) due to FlushItemOperation, Context: {} {}", thresholds.getPartitionKeyRangeId(), bufferedCount, this.operationContextText, getThreadInfo());

                    firstRecordTimestamp.set(-1L);
                    currentBatchSize.set(0L);
                    currentTotalSerializedLength.set(0);
                    return true;
                }

                // Remember when the first operation of the current batch was enqueued.
                firstRecordTimestamp.compareAndSet(-1L, timestamp);
                long age = timestamp - firstRecordTimestamp.get();
                long batchSize = currentBatchSize.incrementAndGet();
                int totalSerializedLength = calculateTotalSerializedLength(currentTotalSerializedLength, operation);

                if (batchSize < thresholds.getTargetMicroBatchSizeSnapshot()
                    && age < this.maxMicroBatchIntervalInMs
                    && totalSerializedLength < maxSerializedLength) {
                    return false;
                }

                logger.debug("BufferUntil - Flushing PKRange {} due to BatchSize ({}), payload size ({}) or age ({}), Triggering {}, Context: {} {}", thresholds.getPartitionKeyRangeId(), batchSize, totalSerializedLength, age, getItemOperationDiagnostics(operation), this.operationContextText, getThreadInfo());

                firstRecordTimestamp.set(-1L);
                currentBatchSize.set(0L);
                currentTotalSerializedLength.set(0);
                return true;
            })
            .flatMap(timestampedOperations -> {
                // Drop the timestamps and any flush markers before executing the batch.
                List<CosmosItemOperation> operations = new ArrayList<>(timestampedOperations.size());
                for (Tuple2<Long, CosmosItemOperation> timestampedOperation : timestampedOperations) {
                    CosmosItemOperation operation = timestampedOperation.getT2();
                    if (operation != FlushBuffersItemOperation.singleton()) {
                        operations.add(operation);
                    }
                }

                logger.debug("Flushing PKRange {} micro batch with {} operations,  Context: {} {}", thresholds.getPartitionKeyRangeId(), operations.size(), this.operationContextText, getThreadInfo());

                return executeOperations(operations, thresholds, groupSink);
            }, maxMicroBatchConcurrency);
    }

    /**
     * Adds the operation's serialized length to the running total when the
     * operation is a {@code CosmosItemOperationBase}; other operations leave
     * the total unchanged.
     *
     * @param atomicInteger running total of serialized lengths for the current batch
     * @param cosmosItemOperation the operation being enqueued
     * @return the total after accounting for the operation
     */
    private int calculateTotalSerializedLength(AtomicInteger atomicInteger, CosmosItemOperation cosmosItemOperation) {
        if (!(cosmosItemOperation instanceof CosmosItemOperationBase)) {
            return atomicInteger.get();
        }

        int serializedLength = ((CosmosItemOperationBase) cosmosItemOperation).getSerializedLength();
        return atomicInteger.addAndGet(serializedLength);
    }

    /**
     * Builds a server batch request from the given operations and executes it.
     * Operations that {@code BulkExecutorUtil.createBatchRequest} reports as
     * pending are pushed back into the group sink so they are picked up by a
     * later micro batch.
     *
     * @param list the operations of one micro batch
     * @param partitionScopeThresholds thresholds of the target partition key range
     * @param fluxSink the group sink used to re-enqueue operations
     * @return the responses for the executed operations; empty when {@code list} is empty
     */
    private Flux<CosmosBulkOperationResponse<TContext>> executeOperations(List<CosmosItemOperation> list, PartitionScopeThresholds partitionScopeThresholds, FluxSink<CosmosItemOperation> fluxSink) {
        if (list.isEmpty()) {
            logger.trace("Empty operations list, Context: {}", this.operationContextText);
            return Flux.empty();
        }

        ServerOperationBatchRequest serverBatchRequest =
            BulkExecutorUtil.createBatchRequest(list, partitionScopeThresholds.getPartitionKeyRangeId());

        List<CosmosItemOperation> pendingOperations = serverBatchRequest.getBatchPendingOperations();
        if (!pendingOperations.isEmpty()) {
            pendingOperations.forEach(fluxSink::next);
        }

        return Flux.just(serverBatchRequest.getBatchRequest())
            .publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
            .flatMap(batchRequest -> executePartitionKeyRangeServerBatchRequest(batchRequest, fluxSink, partitionScopeThresholds));
    }

    /**
     * Sends one server batch request and maps the outcome to per-operation
     * responses. On success every operation result is handled individually;
     * when the request fails with an {@link Exception}, each operation of the
     * request is passed to the exception handler. Non-{@code Exception}
     * throwables are propagated.
     *
     * @param partitionKeyRangeServerBatchRequest the batch request to execute
     * @param fluxSink the group sink used to enqueue retries
     * @param partitionScopeThresholds thresholds of the target partition key range
     * @return the responses for the operations of the request
     */
    private Flux<CosmosBulkOperationResponse<TContext>> executePartitionKeyRangeServerBatchRequest(PartitionKeyRangeServerBatchRequest partitionKeyRangeServerBatchRequest, FluxSink<CosmosItemOperation> fluxSink, PartitionScopeThresholds partitionScopeThresholds) {
        return executeBatchRequest(partitionKeyRangeServerBatchRequest)
            .subscribeOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
            .flatMapMany(batchResponse ->
                Flux.fromIterable(batchResponse.getResults())
                    .publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
                    .flatMap(operationResult ->
                        handleTransactionalBatchOperationResult(batchResponse, operationResult, fluxSink, partitionScopeThresholds)))
            .onErrorResume(error -> {
                if (error instanceof Exception) {
                    Exception failure = (Exception) error;
                    return Flux.fromIterable(partitionKeyRangeServerBatchRequest.getOperations())
                        .publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC)
                        .flatMap(failedOperation ->
                            handleTransactionalBatchExecutionException(failedOperation, failure, fluxSink, partitionScopeThresholds));
                }

                throw Exceptions.propagate(error);
            });
    }

    /**
     * Turns a single operation result of a batch response into a bulk
     * operation response. Successful results are recorded on the thresholds
     * and returned. For failed results the operation's retry policy decides
     * whether the operation is re-enqueued (nothing is emitted) or the failure
     * is returned to the caller.
     *
     * @param cosmosBatchResponse the response of the whole batch
     * @param cosmosBatchOperationResult the result of one operation in the batch
     * @param fluxSink the group sink used to enqueue retries
     * @param partitionScopeThresholds thresholds of the target partition key range
     * @return the response for the operation, or empty when a retry was enqueued
     * @throws UnsupportedOperationException if a failed operation is not an {@code ItemBulkOperation}
     */
    private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchOperationResult(CosmosBatchResponse cosmosBatchResponse, CosmosBatchOperationResult cosmosBatchOperationResult, FluxSink<CosmosItemOperation> fluxSink, PartitionScopeThresholds partitionScopeThresholds) {
        CosmosBulkItemResponse itemResponse = ModelBridgeInternal.createCosmosBulkItemResponse(cosmosBatchOperationResult, cosmosBatchResponse);
        CosmosItemOperation itemOperation = cosmosBatchOperationResult.getOperation();
        TContext context = getActualContext(itemOperation);

        logger.debug("HandleTransactionalBatchOperationResult - PKRange {}, Response Status Code {}, Operation Status Code, {}, {}, Context: {} {}", partitionScopeThresholds.getPartitionKeyRangeId(), cosmosBatchResponse.getStatusCode(), cosmosBatchOperationResult.getStatusCode(), getItemOperationDiagnostics(itemOperation), this.operationContextText, getThreadInfo());

        if (cosmosBatchOperationResult.isSuccessStatusCode()) {
            partitionScopeThresholds.recordSuccessfulOperation();
            return Mono.just(ModelBridgeInternal.createCosmosBulkOperationResponse(itemOperation, itemResponse, context));
        }

        if (!(itemOperation instanceof ItemBulkOperation)) {
            throw new UnsupportedOperationException("Unknown CosmosItemOperation.");
        }

        ItemBulkOperation<?, ?> itemBulkOperation = (ItemBulkOperation<?, ?>) itemOperation;
        return itemBulkOperation.getRetryPolicy().shouldRetry(cosmosBatchOperationResult).flatMap(retryDecision -> {
            if (retryDecision.shouldRetry) {
                logger.debug("HandleTransactionalBatchOperationResult - enqueue retry, PKRange {}, Response Status Code {}, Operation Status Code, {}, {}, Context: {} {}", partitionScopeThresholds.getPartitionKeyRangeId(), cosmosBatchResponse.getStatusCode(), cosmosBatchOperationResult.getStatusCode(), getItemOperationDiagnostics(itemOperation), this.operationContextText, getThreadInfo());
                return enqueueForRetry(retryDecision.backOffTime, fluxSink, itemOperation, partitionScopeThresholds);
            }

            // Batch status codes 409 and 412 are logged at debug level, everything else as error.
            boolean debugLevelFailure = cosmosBatchResponse.getStatusCode() == 409 || cosmosBatchResponse.getStatusCode() == 412;
            Object[] logArguments = {
                partitionScopeThresholds.getPartitionKeyRangeId(),
                cosmosBatchResponse.getStatusCode(),
                cosmosBatchOperationResult.getStatusCode(),
                getItemOperationDiagnostics(itemOperation),
                this.operationContextText,
                getThreadInfo()
            };
            if (debugLevelFailure) {
                logger.debug("HandleTransactionalBatchOperationResult - Fail, PKRange {}, Response Status Code {}, Operation Status Code {}, {}, Context: {} {}", logArguments);
            } else {
                logger.error("HandleTransactionalBatchOperationResult - Fail, PKRange {}, Response Status Code {}, Operation Status Code {}, {}, Context: {} {}", logArguments);
            }

            return Mono.just(ModelBridgeInternal.createCosmosBulkOperationResponse(itemOperation, itemResponse, context));
        });
    }

    /**
     * Resolves the context to report for an operation: the operation's own
     * context when it is an {@code ItemBulkOperation} with a non-null context,
     * otherwise the batch-scoped context.
     *
     * @param cosmosItemOperation the operation to resolve the context for
     * @return the operation-level context, or the batch-level context as fallback
     */
    @SuppressWarnings("unchecked")
    private TContext getActualContext(CosmosItemOperation cosmosItemOperation) {
        if (!(cosmosItemOperation instanceof ItemBulkOperation)) {
            return this.batchContext;
        }

        TContext operationContext = (TContext) ((ItemBulkOperation<?, ?>) cosmosItemOperation).getContext();
        return operationContext != null ? operationContext : this.batchContext;
    }

    /**
     * Handles a failed batch request for one of its operations. Unless the
     * failure is a {@link CosmosException} on an {@code ItemBulkOperation}, it
     * is returned to the caller as a response. Otherwise, when the retry
     * policy's gone-check passes, the operation is re-emitted into the main
     * sink (so it runs through partition resolution again) and nothing is
     * emitted; all other cases go through {@link #retryOtherExceptions}.
     *
     * @param cosmosItemOperation the operation that was part of the failed request
     * @param exc the failure of the batch request
     * @param fluxSink the group sink used to enqueue retries
     * @param partitionScopeThresholds thresholds of the target partition key range
     * @return the response for the operation, or empty when a retry was enqueued
     */
    private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchExecutionException(CosmosItemOperation cosmosItemOperation, Exception exc, FluxSink<CosmosItemOperation> fluxSink, PartitionScopeThresholds partitionScopeThresholds) {
        logger.debug("HandleTransactionalBatchExecutionException, PKRange {}, Error: {}, {}, Context: {} {}", partitionScopeThresholds.getPartitionKeyRangeId(), exc, getItemOperationDiagnostics(cosmosItemOperation), this.operationContextText, getThreadInfo());

        if (exc instanceof CosmosException && cosmosItemOperation instanceof ItemBulkOperation) {
            CosmosException cosmosException = (CosmosException) exc;
            ItemBulkOperation<?, ?> itemBulkOperation = (ItemBulkOperation<?, ?>) cosmosItemOperation;

            return itemBulkOperation.getRetryPolicy()
                .shouldRetryForGone(cosmosException.getStatusCode(), cosmosException.getSubStatusCode())
                .flatMap(retryForGone -> {
                    if (retryForGone) {
                        logger.debug("HandleTransactionalBatchExecutionException - Retry due to split, PKRange {}, Error: {}, {}, Context: {} {}", partitionScopeThresholds.getPartitionKeyRangeId(), exc, getItemOperationDiagnostics(cosmosItemOperation), this.operationContextText, getThreadInfo());
                        this.mainSink.emitNext(cosmosItemOperation, this.serializedEmitFailureHandler);
                        return Mono.empty();
                    }

                    logger.debug("HandleTransactionalBatchExecutionException - Retry other, PKRange {}, Error: {}, {}, Context: {} {}", partitionScopeThresholds.getPartitionKeyRangeId(), exc, getItemOperationDiagnostics(cosmosItemOperation), this.operationContextText, getThreadInfo());
                    return retryOtherExceptions(cosmosItemOperation, exc, fluxSink, cosmosException, itemBulkOperation, partitionScopeThresholds);
                });
        }

        return Mono.just(ModelBridgeInternal.createCosmosBulkOperationResponse(cosmosItemOperation, exc, getActualContext(cosmosItemOperation)));
    }

    /**
     * Records a retry on the thresholds and pushes the operation back into the
     * group sink, either immediately or after the given back-off.
     *
     * @param duration the back-off before re-enqueueing; null or zero means immediately
     * @param fluxSink the group sink the operation is pushed into
     * @param cosmosItemOperation the operation to retry
     * @param partitionScopeThresholds thresholds on which the retry is recorded
     * @return an empty mono; the response is produced by the retried execution
     */
    private Mono<CosmosBulkOperationResponse<TContext>> enqueueForRetry(Duration duration, FluxSink<CosmosItemOperation> fluxSink, CosmosItemOperation cosmosItemOperation, PartitionScopeThresholds partitionScopeThresholds) {
        partitionScopeThresholds.recordEnqueuedRetry();

        boolean retryImmediately = duration == null || duration.isZero();
        if (retryImmediately) {
            fluxSink.next(cosmosItemOperation);
            return Mono.empty();
        }

        return Mono.delay(duration).flatMap(tick -> {
            fluxSink.next(cosmosItemOperation);
            return Mono.empty();
        });
    }

    /**
     * Asks the operation's retry policy whether the given Cosmos exception
     * should be retried. If so the operation is re-enqueued, otherwise the
     * original exception is returned to the caller as a response.
     *
     * @param cosmosItemOperation the failed operation
     * @param exc the original failure reported in the response
     * @param fluxSink the group sink used to enqueue the retry
     * @param cosmosException the failure as evaluated by the retry policy
     * @param itemBulkOperation the failed operation, typed as bulk operation
     * @param partitionScopeThresholds thresholds of the target partition key range
     * @return the failure response, or empty when a retry was enqueued
     */
    private Mono<CosmosBulkOperationResponse<TContext>> retryOtherExceptions(CosmosItemOperation cosmosItemOperation, Exception exc, FluxSink<CosmosItemOperation> fluxSink, CosmosException cosmosException, ItemBulkOperation<?, ?> itemBulkOperation, PartitionScopeThresholds partitionScopeThresholds) {
        TContext context = getActualContext(cosmosItemOperation);

        // The cast to Exception is kept from the original call.
        return itemBulkOperation.getRetryPolicy().shouldRetry((Exception) cosmosException).flatMap(retryDecision -> {
            if (retryDecision.shouldRetry) {
                return enqueueForRetry(retryDecision.backOffTime, fluxSink, itemBulkOperation, partitionScopeThresholds);
            }
            return Mono.just(ModelBridgeInternal.createCosmosBulkOperationResponse(cosmosItemOperation, exc, context));
        });
    }

    /**
     * Builds the request options for a server batch request and executes it
     * through the document client, wrapped by the tracer provider.
     *
     * <p>When the client does not return content on writes by default, content
     * is still requested for the whole batch as soon as one
     * {@code ItemBulkOperation} is a read or explicitly enables it.
     *
     * @param partitionKeyRangeServerBatchRequest the batch request to execute
     * @return the batch response
     */
    private Mono<CosmosBatchResponse> executeBatchRequest(PartitionKeyRangeServerBatchRequest partitionKeyRangeServerBatchRequest) {
        RequestOptions batchOptions = new RequestOptions();
        batchOptions.setThroughputControlGroupName(this.cosmosBulkExecutionOptions.getThroughputControlGroupName());

        Map<String, String> customHeaders = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
            .getCosmosBulkExecutionOptionsAccessor()
            .getCustomOptions(this.cosmosBulkExecutionOptions);
        if (customHeaders != null) {
            customHeaders.forEach((headerName, headerValue) -> batchOptions.setHeader(headerName, headerValue));
        }

        batchOptions.setOperationContextAndListenerTuple(this.operationListener);

        if (!this.docClientWrapper.isContentResponseOnWriteEnabled()) {
            for (CosmosItemOperation candidate : partitionKeyRangeServerBatchRequest.getOperations()) {
                if (!(candidate instanceof ItemBulkOperation)) {
                    continue;
                }

                ItemBulkOperation<?, ?> bulkOperation = (ItemBulkOperation<?, ?>) candidate;
                boolean isRead = bulkOperation.getOperationType() == CosmosItemOperationType.READ;
                boolean needsContent = isRead
                    || (bulkOperation.getRequestOptions() != null
                        && Boolean.TRUE.equals(bulkOperation.getRequestOptions().isContentResponseOnWriteEnabled()));

                if (needsContent) {
                    // One operation needing content is enough to enable it for the batch.
                    batchOptions.setContentResponseOnWriteEnabled(true);
                    break;
                }
            }
        }

        return FluxUtil.withContext(context ->
            BridgeInternal.getTracerProvider(this.cosmosClient).traceEnabledBatchResponsePublisher(
                this.docClientWrapper.executeBatchRequest(BridgeInternal.getLink(this.container), partitionKeyRangeServerBatchRequest, batchOptions, false),
                context,
                this.bulkSpanName,
                this.container.getId(),
                this.container.getDatabase().getId(),
                this.cosmosClient,
                batchOptions.getConsistencyLevel(),
                OperationType.Batch,
                ResourceType.Document));
    }

    /**
     * Completes the main sink, logging the outcome of the attempt, and then
     * shuts the executor down.
     */
    private void completeAllSinks() {
        logger.info("Closing all sinks, Context: {}", this.operationContextText);
        logger.debug("Executor service shut down, Context: {}", this.operationContextText);

        Sinks.EmitResult completionResult = this.mainSink.tryEmitComplete();
        boolean alreadyTerminated = completionResult == Sinks.EmitResult.FAIL_CANCELLED
            || completionResult == Sinks.EmitResult.FAIL_TERMINATED;

        if (completionResult == Sinks.EmitResult.OK) {
            logger.debug("Main sink completed, Context: {}", this.operationContextText);
        } else if (alreadyTerminated) {
            logger.debug("Main sink already completed, EmitResult: {}, Context: {}", completionResult, this.operationContextText);
        } else {
            logger.warn("Main sink completion failed. EmitResult: {}, Context: {}", completionResult, this.operationContextText);
        }

        shutdown();
    }

    /**
     * Pushes a flush marker into every group sink. Runs as the scheduled flush
     * callback, so any failure is logged instead of being rethrown.
     */
    private void onFlush() {
        try {
            for (FluxSink<CosmosItemOperation> groupSink : this.groupSinks) {
                groupSink.next(FlushBuffersItemOperation.singleton());
            }
        } catch (Throwable t) {
            logger.error("Callback invocation 'onFlush' failed. Context: {}", this.operationContextText, t);
        }
    }

    /**
     * Renders a short description of an operation for log messages.
     *
     * @param cosmosItemOperation the operation to describe
     * @return a fixed text for the flush marker, otherwise type, partition key and id
     */
    private static String getItemOperationDiagnostics(CosmosItemOperation cosmosItemOperation) {
        if (cosmosItemOperation == FlushBuffersItemOperation.singleton()) {
            return "ItemOperation[Type: Flush]";
        }

        Object partitionKey = cosmosItemOperation.getPartitionKeyValue();
        String partitionKeyText = partitionKey != null ? partitionKey.toString() : "n/a";

        return "ItemOperation[Type: " + cosmosItemOperation.getOperationType().toString()
            + ", PK: " + partitionKeyText
            + ", id: " + cosmosItemOperation.getId()
            + "]";
    }

    /**
     * Describes the calling thread (name, thread group, daemon flag, id) for
     * log messages.
     *
     * @return a textual description of the current thread
     */
    private static String getThreadInfo() {
        Thread current = Thread.currentThread();
        ThreadGroup group = current.getThreadGroup();
        String groupName = group != null ? group.getName() : "n/a";

        return "Thread[" + "Name: " + current.getName()
            + ",Group: " + groupName
            + ", isDaemon: " + current.isDaemon()
            + ", Id: " + current.getId()
            + "]";
    }
}
