/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl;

import io.reactivex.Flowable;
import java.util.PrimitiveIterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.SmallIntSet;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.impl.AbstractCacheStream;
import org.infinispan.stream.impl.PublisherDecorator;
import org.infinispan.util.logging.Log;
import org.reactivestreams.Publisher;

/**
 * Base {@link PublisherDecorator} that wraps a local publisher so that, once it completes, the
 * segments owned by the local node are reported either as completed or as lost, depending on
 * whether the read consistent hash is still equal to the one supplied by the caller.
 *
 * @param <S> the type of element emitted by the decorated publishers
 */
public abstract class AbstractRehashPublisherDecorator<S> implements PublisherDecorator<S> {
    final AbstractCacheStream.IteratorOperation iteratorOperation;
    final DistributionManager dm;
    final Address localAddress;
    /** Notified with the local segments when the consistent hash changed before completion. */
    final Consumer<? super Supplier<PrimitiveIterator.OfInt>> lostSegments;
    final Consumer<Object> keyConsumer;
    final Function<S, ?> toKeyFunction;

    AbstractRehashPublisherDecorator(AbstractCacheStream.IteratorOperation iteratorOperation, DistributionManager dm, Address localAddress, Consumer<? super Supplier<PrimitiveIterator.OfInt>> lostSegments, Consumer<Object> keyConsumer, Function<S, ?> toKeyFunction) {
        this.iteratorOperation = iteratorOperation;
        this.dm = dm;
        this.localAddress = localAddress;
        this.lostSegments = lostSegments;
        this.keyConsumer = keyConsumer;
        this.toKeyFunction = toKeyFunction;
    }

    /**
     * @return the logger used for the trace messages emitted by this decorator
     */
    abstract Log getLog();

    /**
     * Decorates the local publisher with a completion hook that classifies the local segments.
     * <p>
     * When the publisher completes, the segments of {@code localAddress} in {@code beginningCh}
     * (all owned segments if {@code onlyLocal}, otherwise only primary-owned ones) are intersected
     * with {@code segmentsToFilter}. If the current read consistent hash still equals
     * {@code beginningCh} they are handed to {@code completedSegments}; otherwise they are handed
     * to {@link #lostSegments}.
     *
     * @param completedSegments receives a supplier of the segment iterator when the hash is unchanged
     * @param beginningCh       the consistent hash the segments are computed from and compared against
     * @param onlyLocal         {@code true} to use every segment owned by the local address,
     *                          {@code false} to use only the primary-owned segments
     * @param segmentsToFilter  the segments to retain from the locally owned set
     * @param localPublisher    the publisher to decorate
     * @return the publisher returned by {@code iteratorOperation.handlePublisher} for the decorated publisher
     */
    Publisher<S> decorateLocal(Consumer<? super Supplier<PrimitiveIterator.OfInt>> completedSegments, ConsistentHash beginningCh, boolean onlyLocal, IntSet segmentsToFilter, Publisher<S> localPublisher) {
        Flowable<S> convertedPublisher = Flowable.fromPublisher(localPublisher).doOnComplete(() -> {
            IntSet ourSegments = onlyLocal
                    ? SmallIntSet.from(beginningCh.getSegmentsForOwner(this.localAddress))
                    : SmallIntSet.from(beginningCh.getPrimarySegmentsForOwner(this.localAddress));
            ourSegments.retainAll(segmentsToFilter);
            // The consumers take "? super Supplier<...>", so a bare lambda has no functional
            // interface target type there; bind the supplier to an explicit type first.
            Supplier<PrimitiveIterator.OfInt> segmentIterator = ourSegments::iterator;
            if (this.dm.getReadConsistentHash().equals(beginningCh)) {
                this.getLog().tracef("Local iterator has completed segments %s", ourSegments);
                completedSegments.accept(segmentIterator);
            } else {
                this.getLog().tracef("Local iterator segments %s are all suspect as consistent hash has changed", ourSegments);
                this.lostSegments.accept(segmentIterator);
            }
        });
        return this.iteratorOperation.handlePublisher(convertedPublisher, this.keyConsumer, this.toKeyFunction);
    }
}

