package com.azure.cosmos.implementation.caches;

import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.Exceptions;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/cosmos/implementation/caches/AsyncCacheNonBlocking.class */
public class AsyncCacheNonBlocking<TKey, TValue> {
    private static final Logger logger = LoggerFactory.getLogger(AsyncCacheNonBlocking.class);
    private final ConcurrentHashMap<TKey, AsyncLazyWithRefresh<TValue>> values = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/cosmos/implementation/caches/AsyncCacheNonBlocking$AsyncLazyWithRefresh.class */
    public static class AsyncLazyWithRefresh<TValue> {
        private final AtomicBoolean removeFromCache = new AtomicBoolean(false);
        private final AtomicReference<Mono<TValue>> value = new AtomicReference<>();
        private final AtomicReference<Mono<TValue>> refreshInProgress;

        public AsyncLazyWithRefresh(TValue tvalue) {
            this.value.set(Mono.just(tvalue));
            this.refreshInProgress = new AtomicReference<>(null);
        }

        public AsyncLazyWithRefresh(Function<TValue, Mono<TValue>> function) {
            this.value.set(function.apply(null).cache());
            this.refreshInProgress = new AtomicReference<>(null);
        }

        public Mono<TValue> getValueAsync() {
            return this.value.get();
        }

        public Mono<TValue> value() {
            return this.value.get();
        }

        public Mono<TValue> getOrCreateBackgroundRefreshTaskAsync(Function<TValue, Mono<TValue>> function) {
            Mono<TValue> updateAndGet = this.refreshInProgress.updateAndGet(mono -> {
                if (mono == null) {
                    AsyncCacheNonBlocking.logger.debug("Started a new background task");
                    return createBackgroundRefreshTask(function);
                }
                AsyncCacheNonBlocking.logger.debug("Background refresh task is already in progress");
                return mono;
            });
            return updateAndGet == null ? this.value.get() : updateAndGet;
        }

        private Mono<TValue> createBackgroundRefreshTask(Function<TValue, Mono<TValue>> function) {
            return this.value.get().flatMap(function).flatMap(obj -> {
                this.refreshInProgress.set(null);
                return this.value.updateAndGet(mono -> {
                    return Mono.just(obj);
                });
            }).doOnError(th -> {
                this.refreshInProgress.set(null);
                AsyncCacheNonBlocking.logger.debug("Background refresh task failed", th);
            }).cache();
        }

        public Mono<TValue> refresh(Function<TValue, Mono<TValue>> function) {
            if (this.refreshInProgress.compareAndSet(null, createBackgroundRefreshTask(function))) {
                AsyncCacheNonBlocking.logger.debug("Started a new background task");
                return this.refreshInProgress.get();
            }
            AsyncCacheNonBlocking.logger.debug("Background refresh task is already in progress, skip creating a new one");
            return null;
        }

        public boolean shouldRemoveFromCache() {
            return this.removeFromCache.compareAndSet(false, true);
        }
    }

    private Boolean removeNotFoundFromCacheException(CosmosException cosmosException) {
        return Exceptions.isNotFound(cosmosException);
    }

    public Mono<TValue> getAsync(TKey tkey, Function<TValue, Mono<TValue>> function, Function<TValue, Boolean> function2) {
        AsyncLazyWithRefresh<TValue> asyncLazyWithRefresh = this.values.get(tkey);
        if (asyncLazyWithRefresh != null) {
            logger.debug("cache[{}] exists", tkey);
            return asyncLazyWithRefresh.getValueAsync().flatMap(obj -> {
                return !((Boolean) function2.apply(obj)).booleanValue() ? Mono.just(obj) : asyncLazyWithRefresh.getOrCreateBackgroundRefreshTaskAsync(function).onErrorResume(th -> {
                    if ((th instanceof CosmosException) && removeNotFoundFromCacheException((CosmosException) th).booleanValue() && asyncLazyWithRefresh.shouldRemoveFromCache()) {
                        remove(tkey);
                    }
                    logger.debug("refresh cache [{}] resulted in error", tkey, th);
                    return Mono.error(th);
                });
            }).onErrorResume(th -> {
                if (asyncLazyWithRefresh.shouldRemoveFromCache()) {
                    remove(tkey);
                }
                logger.debug("cache[{}] resulted in error", tkey, th);
                return Mono.error(th);
            });
        }
        logger.debug("cache[{}] doesn't exist, computing new value", tkey);
        AsyncLazyWithRefresh<TValue> asyncLazyWithRefresh2 = new AsyncLazyWithRefresh<>((Function) function);
        AsyncLazyWithRefresh<TValue> putIfAbsent = this.values.putIfAbsent(tkey, asyncLazyWithRefresh2);
        if (putIfAbsent == null) {
            putIfAbsent = asyncLazyWithRefresh2;
        }
        AsyncLazyWithRefresh<TValue> asyncLazyWithRefresh3 = putIfAbsent;
        return asyncLazyWithRefresh3.getValueAsync().onErrorResume(th2 -> {
            if (asyncLazyWithRefresh3.shouldRemoveFromCache()) {
                remove(tkey);
            }
            logger.debug("cache[{}] resulted in error", tkey, th2);
            return Mono.error(th2);
        });
    }

    public void refresh(TKey tkey, Function<TValue, Mono<TValue>> function) {
        Mono<TValue> refresh;
        logger.debug("refreshing cache[{}]", tkey);
        AsyncLazyWithRefresh<TValue> asyncLazyWithRefresh = this.values.get(tkey);
        if (asyncLazyWithRefresh == null || (refresh = asyncLazyWithRefresh.refresh(function)) == null) {
            return;
        }
        refresh.subscribeOn(CosmosSchedulers.ASYNC_CACHE_BACKGROUND_REFRESH_BOUNDED_ELASTIC).onErrorResume(th -> {
            logger.debug("Background address refresh task failed for {}", tkey, th);
            return Mono.empty();
        }).subscribe();
    }

    public void set(TKey tkey, TValue tvalue) {
        logger.debug("set cache[{}]={}", tkey, tvalue);
        this.values.put(tkey, new AsyncLazyWithRefresh<>(tvalue));
    }

    public void remove(TKey tkey) {
        this.values.remove(tkey);
    }
}
