/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.reactive.publisher.impl;

import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.lang.invoke.MethodHandles;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.IntConsumer;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.CacheSet;
import org.infinispan.cache.impl.AbstractDelegatingCache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.ClusteringConfiguration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.reactive.RxJavaInterop;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.reactive.publisher.impl.PublisherResult;
import org.infinispan.reactive.publisher.impl.SegmentPublisherResult;
import org.infinispan.stream.StreamMarshalling;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.util.rxjava.FlowableFromIntSetFunction;
import org.reactivestreams.Publisher;

/**
 * {@link LocalPublisherManager} implementation that runs key/entry reductions against the
 * locally held data of a cache and reports, per invocation, which of the requested segments
 * were flagged as lost while the reduction was running.
 *
 * <p>Segment loss is tracked through {@link SegmentListener} instances that are registered in
 * {@link #changeListener} for the duration of an {@code AT_LEAST_ONCE} or {@code EXACTLY_ONCE}
 * reduction and notified from {@link #segmentsLost(IntSet)}.
 *
 * @param <K> cache key type
 * @param <V> cache value type
 */
public class LocalPublisherManagerImpl<K, V>
implements LocalPublisherManager<K, V> {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private static final boolean trace = log.isTraceEnabled();

    /** Number of keys handed to each window when specific keys are processed in parallel. */
    private static final long PARALLEL_KEY_WINDOW_SIZE = 16L;

    /**
     * Shared, stateless function wrapping a value in a {@link SegmentPublisherResult} that carries
     * an empty set of segments. Exposed with the caller's type through {@link #ignoreSegmentsFunction()}.
     */
    private static final Function<Object, PublisherResult<Object>> IGNORE_SEGMENTS_FUNCTION =
            value -> new SegmentPublisherResult<Object>(IntSets.immutableEmptySet(), value);

    @Inject
    private ComponentRef<Cache<K, V>> cacheComponentRef;
    @Inject
    private DistributionManager distributionManager;
    /** Unwrapped cache without extra flags; used for specific-key lookups unless the guarantee is AT_MOST_ONCE. */
    private AdvancedCache<K, V> remoteCache;
    /** {@link #remoteCache} decorated with {@code CACHE_MODE_LOCAL} and {@code REMOTE_ITERATION}. */
    private AdvancedCache<K, V> cache;
    /** RxJava scheduler backed by the injected async operations executor. */
    private Scheduler asyncScheduler;
    /** Value of {@code clustering().hash().numSegments()} read in {@link #start()}. */
    private int maxSegment;
    /** Whether the cache configuration reports {@code persistence().usingStores()}. */
    private boolean hasLoader;
    /** Listeners of the reductions currently in flight; each is told about every lost segment. */
    private final Set<SegmentListener> changeListener = ConcurrentHashMap.newKeySet();

    /**
     * Wraps the injected executor in an RxJava {@link Scheduler} used for parallel processing.
     *
     * @param asyncOperationsExecutor executor registered under {@code org.infinispan.executors.async}
     */
    @Inject
    public void inject(@ComponentName(value="org.infinispan.executors.async") ExecutorService asyncOperationsExecutor) {
        this.asyncScheduler = Schedulers.from(asyncOperationsExecutor);
    }

    /**
     * Resolves the running cache, derives the flagged local view of it and caches the
     * configuration values (store usage, number of segments) needed by the reductions.
     */
    @Start
    public void start() {
        this.remoteCache = AbstractDelegatingCache.unwrapCache(this.cacheComponentRef.running()).getAdvancedCache();
        this.cache = this.remoteCache.withFlags(Flag.CACHE_MODE_LOCAL, Flag.REMOTE_ITERATION);
        this.hasLoader = this.cache.getCacheConfiguration().persistence().usingStores();
        ClusteringConfiguration clusteringConfiguration = this.cache.getCacheConfiguration().clustering();
        this.maxSegment = clusteringConfiguration.hash().numSegments();
    }

    /**
     * Reduces the keys of the local cache.
     *
     * <p>When {@code keysToInclude} is non-null only those keys are considered and
     * {@code segments} / {@code includeLoader} are not consulted. Otherwise the key set of the
     * flagged local cache is published for {@code segments} according to {@code deliveryGuarantee}.
     *
     * @param parallelPublisher whether work should be spread over the async scheduler
     * @param segments          segments to publish (ignored when {@code keysToInclude} is non-null)
     * @param keysToInclude     explicit keys to process, or {@code null} to process by segment
     * @param keysToExclude     keys to skip, or {@code null} for none
     * @param includeLoader     when {@code false} and stores are in use, {@code SKIP_CACHE_LOAD} is applied
     * @param deliveryGuarantee guarantee selecting the processing strategy
     * @param transformer       turns a publisher of keys into a stage producing an intermediate result
     * @param finalizer         combines the intermediate results into the final result
     * @return stage completing with the result and the segments flagged as lost, if any
     * @throws UnsupportedOperationException if the guarantee is not one of the handled constants
     */
    @Override
    public <R> CompletionStage<PublisherResult<R>> keyReduction(boolean parallelPublisher, IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude, boolean includeLoader, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        if (keysToInclude != null) {
            return this.handleSpecificKeys(parallelPublisher, keysToInclude, keysToExclude, deliveryGuarantee, transformer, finalizer);
        }
        AdvancedCache<K, V> cache = this.getCacheWithFlags(includeLoader);
        Function<K, K> toKeyFunction = Function.identity();
        switch (deliveryGuarantee) {
            case AT_MOST_ONCE: {
                CompletionStage<R> stage = this.atMostOnce(parallelPublisher, cache.keySet(), keysToExclude, toKeyFunction, segments, transformer, finalizer);
                return stage.thenApply(LocalPublisherManagerImpl.ignoreSegmentsFunction());
            }
            case AT_LEAST_ONCE: {
                return this.atLeastOnce(parallelPublisher, cache.keySet(), keysToExclude, toKeyFunction, segments, transformer, finalizer);
            }
            case EXACTLY_ONCE: {
                return this.exactlyOnce(parallelPublisher, cache.keySet(), keysToExclude, toKeyFunction, segments, transformer, finalizer);
            }
            default: {
                throw new UnsupportedOperationException("Unsupported delivery guarantee: " + deliveryGuarantee);
            }
        }
    }

    /**
     * Reduces the entries of the local cache. Mirrors
     * {@link #keyReduction(boolean, IntSet, Set, Set, boolean, DeliveryGuarantee, Function, Function)}
     * but publishes {@link CacheEntry} instances and derives the key of each entry for exclusion checks.
     *
     * @param parallelPublisher whether work should be spread over the async scheduler
     * @param segments          segments to publish (ignored when {@code keysToInclude} is non-null)
     * @param keysToInclude     explicit keys to process, or {@code null} to process by segment
     * @param keysToExclude     keys whose entries are skipped, or {@code null} for none
     * @param includeLoader     when {@code false} and stores are in use, {@code SKIP_CACHE_LOAD} is applied
     * @param deliveryGuarantee guarantee selecting the processing strategy
     * @param transformer       turns a publisher of entries into a stage producing an intermediate result
     * @param finalizer         combines the intermediate results into the final result
     * @return stage completing with the result and the segments flagged as lost, if any
     * @throws UnsupportedOperationException if the guarantee is not one of the handled constants
     */
    @Override
    public <R> CompletionStage<PublisherResult<R>> entryReduction(boolean parallelPublisher, IntSet segments, Set<K> keysToInclude, Set<K> keysToExclude, boolean includeLoader, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        if (keysToInclude != null) {
            return this.handleSpecificEntries(parallelPublisher, keysToInclude, keysToExclude, deliveryGuarantee, transformer, finalizer);
        }
        AdvancedCache<K, V> cache = this.getCacheWithFlags(includeLoader);
        // The marshalling helper is typed on a more general entry type; it is only ever fed CacheEntry<K, V> here.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Function<CacheEntry<K, V>, K> toKeyFunction = (Function) StreamMarshalling.entryToKeyFunction();
        switch (deliveryGuarantee) {
            case AT_MOST_ONCE: {
                CompletionStage<R> stage = this.atMostOnce(parallelPublisher, cache.cacheEntrySet(), keysToExclude, toKeyFunction, segments, transformer, finalizer);
                return stage.thenApply(LocalPublisherManagerImpl.ignoreSegmentsFunction());
            }
            case AT_LEAST_ONCE: {
                return this.atLeastOnce(parallelPublisher, cache.cacheEntrySet(), keysToExclude, toKeyFunction, segments, transformer, finalizer);
            }
            case EXACTLY_ONCE: {
                return this.exactlyOnce(parallelPublisher, cache.cacheEntrySet(), keysToExclude, toKeyFunction, segments, transformer, finalizer);
            }
            default: {
                throw new UnsupportedOperationException("Unsupported delivery guarantee: " + deliveryGuarantee);
            }
        }
    }

    /**
     * Forwards every segment in {@code lostSegments} to every registered listener; each
     * listener decides on its own whether the segment is relevant to it.
     *
     * @param lostSegments segments reported as lost
     */
    @Override
    public void segmentsLost(IntSet lostSegments) {
        if (trace) {
            log.tracef("Notifying listeners of lost segments %s", lostSegments);
        }
        this.changeListener.forEach(listener -> lostSegments.forEach(listener));
    }

    /**
     * Returns the shared "no lost segments" wrapper function typed for the caller.
     * The cast is safe because the function never inspects the value it wraps.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static <R> Function<R, PublisherResult<R>> ignoreSegmentsFunction() {
        return (Function) IGNORE_SEGMENTS_FUNCTION;
    }

    /**
     * Processes each requested segment on its own so that the result of a segment flagged as
     * lost can be dropped, then combines the surviving per-segment results with {@code finalizer}.
     *
     * <p>The listener watches a concurrent copy of {@code segments}; a segment is removed from
     * that copy once its publisher completes, so a loss reported afterwards is not recorded.
     */
    private <I, R> CompletionStage<PublisherResult<R>> exactlyOnce(boolean parallelPublisher, CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        IntSet concurrentSegments = IntSets.concurrentCopyFrom(segments, this.maxSegment);
        SegmentListener listener = new SegmentListener(concurrentSegments, this.maxSegment);
        this.changeListener.add(listener);
        try {
            listener.verifyTopology(this.distributionManager.getCacheTopology());
            Flowable<CompletionStage<R>> stageFlowable;
            if (parallelPublisher) {
                stageFlowable = Flowable.fromIterable(segments)
                        .parallel()
                        .runOn(this.asyncScheduler)
                        .<CompletionStage<R>>map(segment -> this.exactlyOnceSegment(segment, set, keysToExclude, toKeyFunction, transformer, concurrentSegments, listener))
                        .sequential();
            } else {
                stageFlowable = new FlowableFromIntSetFunction<>(segments, segment -> this.exactlyOnceSegment(segment, set, keysToExclude, toKeyFunction, transformer, concurrentSegments, listener));
            }
            CompletionStage<R> combinedStage = LocalPublisherManagerImpl.combineStages(stageFlowable, finalizer);
            return this.handleLostSegments(combinedStage, listener);
        } catch (RuntimeException | Error e) {
            // The stage that normally unregisters the listener was never created; do it here to avoid a leak.
            this.changeListener.remove(listener);
            throw e;
        }
    }

    /**
     * Transforms a single segment for {@link #exactlyOnce}. Returns the shared
     * {@code CompletableFutures.completedNull()} instance, or a stage completing with {@code null},
     * when the segment has been flagged as lost by the time its result is available.
     */
    private <I, R> CompletionStage<R> exactlyOnceSegment(int segment, CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, Function<? super Publisher<I>, ? extends CompletionStage<R>> transformer, IntSet concurrentSegments, SegmentListener listener) {
        Flowable<I> segmentFlowable = Flowable.fromPublisher(set.localPublisher(segment))
                // Once fully iterated the segment no longer needs to be watched for loss.
                .doOnComplete(() -> concurrentSegments.remove(segment));
        segmentFlowable = LocalPublisherManagerImpl.excludeKeys(segmentFlowable, keysToExclude, toKeyFunction);
        CompletionStage<R> stage = transformer.apply(segmentFlowable);
        CompletableFuture<R> future = stage.toCompletableFuture();
        if (future.isDone()) {
            if (listener.segmentsLost.contains(segment)) {
                return CompletableFutures.completedNull();
            }
            return future;
        }
        return stage.thenCompose(value -> listener.segmentsLost.contains(segment)
                ? CompletableFutures.<R>completedNull()
                : CompletableFuture.completedFuture(value));
    }

    /**
     * Reduces an explicit set of keys, keeping only those the lookup cache reports as present.
     * The local-only cache is used for {@code AT_MOST_ONCE}; every other guarantee uses {@link #remoteCache}.
     */
    private <R> CompletionStage<PublisherResult<R>> handleSpecificKeys(boolean parallelPublisher, Set<K> keysToInclude, Set<K> keysToExclude, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        AdvancedCache<K, V> lookupCache = deliveryGuarantee == DeliveryGuarantee.AT_MOST_ONCE ? this.cache : this.remoteCache;
        Function<Flowable<K>, Flowable<K>> presentKeysOnly = keyFlowable -> keyFlowable.filter(lookupCache::containsKey);
        return this.handleSpecificObjects(parallelPublisher, keysToInclude, keysToExclude, presentKeysOnly, transformer, finalizer);
    }

    /**
     * Reduces the entries of an explicit set of keys, skipping keys without an entry.
     * The local-only cache is used for {@code AT_MOST_ONCE}; every other guarantee uses {@link #remoteCache}.
     */
    private <R> CompletionStage<PublisherResult<R>> handleSpecificEntries(boolean parallelPublisher, Set<K> keysToInclude, Set<K> keysToExclude, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        AdvancedCache<K, V> lookupCache = deliveryGuarantee == DeliveryGuarantee.AT_MOST_ONCE ? this.cache : this.remoteCache;
        Function<Flowable<K>, Flowable<CacheEntry<K, V>>> keysToEntries = keyFlowable -> keyFlowable
                // A Flowable mapper may not emit null, so a missing entry is mapped to a sentinel and filtered out next.
                .<CacheEntry<K, V>>map(key -> {
                    CacheEntry<K, V> entry = lookupCache.getCacheEntry(key);
                    return entry != null ? entry : NullCacheEntry.<K, V>getInstance();
                })
                .filter(entry -> entry != NullCacheEntry.getInstance());
        return this.handleSpecificObjects(parallelPublisher, keysToInclude, keysToExclude, keysToEntries, transformer, finalizer);
    }

    /**
     * Shared implementation for explicit-key reductions.
     *
     * <p>In parallel mode the keys are split into windows of {@value #PARALLEL_KEY_WINDOW_SIZE},
     * each window is transformed on the async scheduler and the per-window results are combined with
     * {@code finalizer}. In sequential mode {@code transformer} is applied once to all keys and
     * {@code finalizer} is not invoked. The result never reports lost segments.
     *
     * @param keyTransformer converts the (already exclusion-filtered) keys into the published items
     */
    private <I, R> CompletionStage<PublisherResult<R>> handleSpecificObjects(boolean parallelPublisher, Set<K> keysToInclude, Set<K> keysToExclude, Function<? super Flowable<K>, ? extends Flowable<I>> keyTransformer, Function<? super Publisher<I>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        Flowable<K> keyFlowable = Flowable.fromIterable(keysToInclude);
        if (keysToExclude != null) {
            keyFlowable = keyFlowable.filter(key -> !keysToExclude.contains(key));
        }
        if (parallelPublisher) {
            Flowable<R> stageFlowable = keyFlowable.window(PARALLEL_KEY_WINDOW_SIZE).flatMap(keys -> {
                CompletionStage<R> windowStage = keyTransformer.apply(keys).subscribeOn(this.asyncScheduler).to(transformer::apply);
                return RxJavaInterop.<R>completionStageToPublisher().apply(windowStage);
            });
            return finalizer.apply(stageFlowable).thenApply(LocalPublisherManagerImpl.ignoreSegmentsFunction());
        }
        CompletionStage<R> stage = keyTransformer.apply(keyFlowable).to(transformer::apply);
        return stage.thenApply(LocalPublisherManagerImpl.ignoreSegmentsFunction());
    }

    /**
     * Transforms every segment independently on the async scheduler and combines the
     * per-segment results with {@code finalizer}. No segment-loss tracking is performed here.
     */
    private <I, R> CompletionStage<R> parallelAtMostOnce(CacheSet<I> cacheSet, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        Flowable<CompletionStage<R>> stageFlowable = Flowable.fromIterable(segments)
                .parallel()
                .runOn(this.asyncScheduler)
                .<CompletionStage<R>>map(segment -> {
                    Flowable<I> segmentFlowable = Flowable.fromPublisher(cacheSet.localPublisher(segment));
                    return transformer.apply(LocalPublisherManagerImpl.excludeKeys(segmentFlowable, keysToExclude, toKeyFunction));
                })
                .sequential();
        return LocalPublisherManagerImpl.combineStages(stageFlowable, finalizer);
    }

    /**
     * Publishes the requested segments without tracking segment loss. In sequential mode all
     * segments are published through a single publisher and only {@code transformer} is applied.
     */
    private <I, R> CompletionStage<R> atMostOnce(boolean parallel, CacheSet<I> set, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        if (parallel) {
            return this.parallelAtMostOnce(set, keysToExclude, toKeyFunction, segments, transformer, finalizer);
        }
        Flowable<I> flowable = Flowable.fromPublisher(set.localPublisher(segments));
        return transformer.apply(LocalPublisherManagerImpl.excludeKeys(flowable, keysToExclude, toKeyFunction));
    }

    /**
     * Runs the {@link #atMostOnce} pipeline while a listener records which of the requested
     * segments were lost; the recorded segments are attached to the result.
     */
    private <I, R> CompletionStage<PublisherResult<R>> atLeastOnce(boolean parallel, CacheSet<I> cacheSet, Set<K> keysToExclude, Function<I, K> toKeyFunction, IntSet segments, Function<? super Publisher<I>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        SegmentListener listener = new SegmentListener(segments, this.maxSegment);
        this.changeListener.add(listener);
        try {
            listener.verifyTopology(this.distributionManager.getCacheTopology());
            CompletionStage<R> stage = this.atMostOnce(parallel, cacheSet, keysToExclude, toKeyFunction, segments, transformer, finalizer);
            return this.handleLostSegments(stage, listener);
        } catch (RuntimeException | Error e) {
            // The stage that normally unregisters the listener was never created; do it here to avoid a leak.
            this.changeListener.remove(listener);
            throw e;
        }
    }

    /**
     * Attaches the segments recorded by {@code segmentListener} to the value of {@code stage}
     * and unregisters the listener once the resulting stage completes, normally or exceptionally.
     */
    private <R> CompletionStage<PublisherResult<R>> handleLostSegments(CompletionStage<R> stage, SegmentListener segmentListener) {
        CompletionStage<PublisherResult<R>> resultStage = stage.thenApply(value -> {
            IntSet lostSegments = segmentListener.segmentsLost;
            if (lostSegments.isEmpty()) {
                return LocalPublisherManagerImpl.<R>ignoreSegmentsFunction().apply(value);
            }
            return new SegmentPublisherResult<>(lostSegments, value);
        });
        return resultStage.whenComplete((result, throwable) -> this.changeListener.remove(segmentListener));
    }

    /**
     * Flattens the values of the given stages into a single publisher and hands it to {@code finalizer}.
     *
     * <p>The shared {@code CompletableFutures.completedNull()} instance marks a segment whose result
     * must be dropped. A stage that has already completed with {@code null} is treated the same way,
     * because a {@code null} item cannot be emitted by a {@link Flowable}.
     */
    private static <R> CompletionStage<R> combineStages(Flowable<? extends CompletionStage<R>> stagePublisher, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        Flowable<R> resultPublisher = stagePublisher.flatMap(stage -> {
            if (stage == CompletableFutures.completedNull()) {
                return Flowable.<R>empty();
            }
            CompletableFuture<R> future = stage.toCompletableFuture();
            if (future.isDone()) {
                R value = future.join();
                // Flowable.just rejects null; a null result contributes nothing to the combined publisher.
                return value != null ? Flowable.just(value) : Flowable.<R>empty();
            }
            return RxJavaInterop.<R>completionStageToPublisher().apply(stage);
        });
        return finalizer.apply(resultPublisher);
    }

    /**
     * Drops every item whose key is contained in {@code keysToExclude}; returns {@code flowable}
     * unchanged when {@code keysToExclude} is {@code null}.
     */
    private static <I, K> Flowable<I> excludeKeys(Flowable<I> flowable, Set<K> keysToExclude, Function<I, K> toKeyFunction) {
        if (keysToExclude == null) {
            return flowable;
        }
        return flowable.filter(item -> !keysToExclude.contains(toKeyFunction.apply(item)));
    }

    /**
     * Returns the local cache, additionally flagged with {@code SKIP_CACHE_LOAD} when stores are
     * configured but the caller asked not to include them.
     */
    private AdvancedCache<K, V> getCacheWithFlags(boolean includeLoader) {
        if (this.hasLoader && !includeLoader) {
            return this.cache.withFlags(Flag.SKIP_CACHE_LOAD);
        }
        return this.cache;
    }

    /**
     * Records which of a fixed set of watched segments have been reported as lost.
     * Instances are notified once per lost segment through {@link #accept(int)}.
     */
    private static final class SegmentListener
    implements IntConsumer {
        /** Segments this listener cares about; may be shrunk externally as segments complete. */
        private final IntSet segments;
        /** Watched segments that were reported lost or were not read-owned at verification time. */
        private final IntSet segmentsLost;

        /**
         * @param segments   segments to watch
         * @param maxSegment sizing argument passed to {@code IntSets.concurrentSet} for the lost-segment set
         */
        SegmentListener(IntSet segments, int maxSegment) {
            this.segments = segments;
            this.segmentsLost = IntSets.concurrentSet(maxSegment);
        }

        /** Marks {@code segment} as lost if it is one of the watched segments. */
        @Override
        public void accept(int segment) {
            if (this.segments.contains(segment)) {
                if (trace) {
                    log.tracef("Listener %s lost segment %d", this, segment);
                }
                this.segmentsLost.set(segment);
            }
        }

        /**
         * Marks as lost every watched segment for which the given topology does not report
         * this node as a read owner.
         *
         * @param localizedCacheTopology topology to check the watched segments against
         */
        public void verifyTopology(LocalizedCacheTopology localizedCacheTopology) {
            PrimitiveIterator.OfInt segmentIterator = this.segments.iterator();
            while (segmentIterator.hasNext()) {
                int segment = segmentIterator.nextInt();
                if (!localizedCacheTopology.isSegmentReadOwner(segment)) {
                    this.segmentsLost.set(segment);
                }
            }
        }
    }
}

