/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.reactive.publisher.impl;

import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.PartitionStatusChanged;
import org.infinispan.notifications.cachelistener.event.PartitionStatusChangedEvent;
import org.infinispan.partitionhandling.AvailabilityException;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManagerImpl;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.SegmentCompletionPublisher;
import org.infinispan.util.logging.Log;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

@Scope(value=Scopes.NAMED_CACHE)
public class PartitionAwareClusterPublisherManager<K, V>
extends ClusterPublisherManagerImpl<K, V> {
    volatile AvailabilityMode currentMode = AvailabilityMode.AVAILABLE;
    protected final PartitionListener listener = new PartitionListener();
    @Inject
    protected ComponentRef<Cache<?, ?>> cache;
    private final Set<CompletableFuture<?>> pendingCompletableFutures = ConcurrentHashMap.newKeySet();
    private final Set<FlowableProcessor<?>> pendingProcessors = ConcurrentHashMap.newKeySet();

    @Override
    public void start() {
        super.start();
        this.cache.running().addListener(this.listener);
    }

    @Override
    public <R> CompletionStage<R> keyReduction(boolean parallelPublisher, IntSet segments, Set<K> keysToInclude, InvocationContext ctx, boolean includeLoader, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<K>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        this.checkPartitionStatus();
        CompletionStage original = super.keyReduction(parallelPublisher, segments, keysToInclude, ctx, includeLoader, deliveryGuarantee, transformer, finalizer);
        return this.registerStage(original);
    }

    @Override
    public <R> CompletionStage<R> entryReduction(boolean parallelPublisher, IntSet segments, Set<K> keysToInclude, InvocationContext ctx, boolean includeLoader, DeliveryGuarantee deliveryGuarantee, Function<? super Publisher<CacheEntry<K, V>>, ? extends CompletionStage<R>> transformer, Function<? super Publisher<R>, ? extends CompletionStage<R>> finalizer) {
        this.checkPartitionStatus();
        CompletionStage original = super.entryReduction(parallelPublisher, segments, keysToInclude, ctx, includeLoader, deliveryGuarantee, transformer, finalizer);
        return this.registerStage(original);
    }

    private <R> CompletionStage<R> registerStage(CompletionStage<R> original) {
        CompletableFuture future = new CompletableFuture();
        this.pendingCompletableFutures.add(future);
        if (this.isPartitionDegraded()) {
            this.pendingCompletableFutures.remove(future);
            future.completeExceptionally((Throwable)((Object)Log.CLUSTER.partitionDegraded()));
        } else {
            original.whenComplete((value, t) -> {
                if (t != null) {
                    future.completeExceptionally((Throwable)t);
                } else {
                    future.complete(value);
                }
                this.pendingCompletableFutures.remove(future);
            });
        }
        return future;
    }

    @Override
    public <R> SegmentCompletionPublisher<R> keyPublisher(IntSet segments, Set<K> keysToInclude, InvocationContext invocationContext, boolean includeLoader, DeliveryGuarantee deliveryGuarantee, int batchSize, Function<? super Publisher<K>, ? extends Publisher<R>> transformer) {
        this.checkPartitionStatus();
        SegmentCompletionPublisher original = super.keyPublisher(segments, keysToInclude, invocationContext, includeLoader, deliveryGuarantee, batchSize, transformer);
        return this.registerPublisher(original);
    }

    @Override
    public <R> SegmentCompletionPublisher<R> entryPublisher(IntSet segments, Set<K> keysToInclude, InvocationContext invocationContext, boolean includeLoader, DeliveryGuarantee deliveryGuarantee, int batchSize, Function<? super Publisher<CacheEntry<K, V>>, ? extends Publisher<R>> transformer) {
        this.checkPartitionStatus();
        SegmentCompletionPublisher original = super.entryPublisher(segments, keysToInclude, invocationContext, includeLoader, deliveryGuarantee, batchSize, transformer);
        return this.registerPublisher(original);
    }

    private <R> SegmentCompletionPublisher<R> registerPublisher(SegmentCompletionPublisher<R> original) {
        return (subscriber, segmentsComplete) -> {
            FlowableProcessor earlyTerminatingProcessor = PublishProcessor.create().toSerialized();
            this.pendingProcessors.add(earlyTerminatingProcessor);
            if (this.isPartitionDegraded()) {
                this.pendingProcessors.remove(earlyTerminatingProcessor);
                earlyTerminatingProcessor.onError((Throwable)((Object)Log.CLUSTER.partitionDegraded()));
                original.subscribe((Subscriber)earlyTerminatingProcessor);
            } else {
                earlyTerminatingProcessor.doOnTerminate(() -> this.pendingProcessors.remove(earlyTerminatingProcessor)).subscribe(subscriber);
                original.subscribe((Subscriber)earlyTerminatingProcessor, segmentsComplete);
            }
        };
    }

    private void checkPartitionStatus() {
        if (this.isPartitionDegraded()) {
            throw Log.CLUSTER.partitionDegraded();
        }
    }

    private boolean isPartitionDegraded() {
        return this.currentMode != AvailabilityMode.AVAILABLE;
    }

    @Listener
    private class PartitionListener {
        volatile AvailabilityMode currentMode = AvailabilityMode.AVAILABLE;

        private PartitionListener() {
        }

        @PartitionStatusChanged
        public void onPartitionChange(PartitionStatusChangedEvent<K, ?> event) {
            if (!event.isPre()) {
                AvailabilityMode newMode = event.getAvailabilityMode();
                if (newMode == AvailabilityMode.DEGRADED_MODE) {
                    AvailabilityException ae = Log.CLUSTER.partitionDegraded();
                    PartitionAwareClusterPublisherManager.this.pendingProcessors.forEach(pp -> pp.onError((Throwable)((Object)ae)));
                    PartitionAwareClusterPublisherManager.this.pendingCompletableFutures.forEach(cf -> cf.completeExceptionally((Throwable)((Object)ae)));
                }
                this.currentMode = newMode;
            }
        }
    }
}

