/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.support;

import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.PrimitiveIterator;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.ObjIntConsumer;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.AbstractSegmentedStoreConfiguration;
import org.infinispan.configuration.cache.HashConfiguration;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.persistence.InitializationContextImpl;
import org.infinispan.persistence.factory.CacheStoreFactoryRegistry;
import org.infinispan.persistence.internal.PersistenceUtil;
import org.infinispan.persistence.spi.AdvancedCacheWriter;
import org.infinispan.persistence.spi.AdvancedLoadWriteStore;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.support.AbstractSegmentedAdvancedLoadWriteStore;
import org.reactivestreams.Publisher;

/**
 * Segmented store that keeps one delegate {@link AdvancedLoadWriteStore} per segment and routes each
 * operation to the delegate of the segment concerned.
 *
 * <p>Delegates live in an {@link AtomicReferenceArray} indexed by segment. A {@code null} slot means no
 * delegate is currently running for that segment; operations targeting such a segment are no-ops
 * (reads return {@code null}/{@code false}, writes and deletes are skipped).
 *
 * @param <K> key type
 * @param <V> value type
 * @param <T> configuration type used to derive the per-segment store configurations
 */
public class ComposedSegmentedLoadWriteStore<K, V, T extends AbstractSegmentedStoreConfiguration>
extends AbstractSegmentedAdvancedLoadWriteStore<K, V> {
    private final AbstractSegmentedStoreConfiguration<T> configuration;
    Cache<K, V> cache;
    ExecutorService executorService;
    CacheStoreFactoryRegistry cacheStoreFactoryRegistry;
    KeyPartitioner keyPartitioner;
    InitializationContext ctx;
    Scheduler scheduler;
    /** When {@code true}, {@link #removeSegments(IntSet)} destroys delegates instead of clearing them. */
    boolean shouldStopSegments;
    /** One delegate per segment; a {@code null} element means the segment has no running delegate. */
    AtomicReferenceArray<AdvancedLoadWriteStore<K, V>> stores;

    /**
     * @param configuration template configuration from which each per-segment configuration is derived
     */
    public ComposedSegmentedLoadWriteStore(AbstractSegmentedStoreConfiguration<T> configuration) {
        this.configuration = configuration;
    }

    /**
     * @return the key partitioner, used as the key-to-segment mapping function
     */
    @Override
    public ToIntFunction<Object> getKeyMapper() {
        return this.keyPartitioner;
    }

    /**
     * Loads the entry for {@code key} from the delegate of {@code segment}.
     *
     * @return the loaded entry, or {@code null} when the segment has no delegate
     */
    @Override
    public MarshalledEntry<K, V> load(int segment, Object key) {
        AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
        return store == null ? null : store.load(key);
    }

    /**
     * @return {@code true} only when the segment has a delegate and that delegate contains {@code key}
     */
    @Override
    public boolean contains(int segment, Object key) {
        AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
        return store != null && store.contains(key);
    }

    /**
     * Writes {@code entry} to the delegate of {@code segment}; does nothing when the segment has no delegate.
     */
    @Override
    public void write(int segment, MarshalledEntry<? extends K, ? extends V> entry) {
        AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
        if (store != null) {
            store.write(entry);
        }
    }

    /**
     * @return the delegate's result, or {@code false} when the segment has no delegate
     */
    @Override
    public boolean delete(int segment, Object key) {
        AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
        return store != null && store.delete(key);
    }

    /**
     * Sums the sizes of the delegates for the given segments.
     *
     * @return the total, saturated to {@link Integer#MAX_VALUE} if the running sum overflows
     */
    @Override
    public int size(IntSet segments) {
        int total = 0;
        PrimitiveIterator.OfInt segmentIterator = segments.iterator();
        while (segmentIterator.hasNext()) {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(segmentIterator.nextInt());
            if (store == null) {
                continue;
            }
            total += store.size();
            // A negative running sum means int overflow; report the largest representable size.
            if (total < 0) {
                return Integer.MAX_VALUE;
            }
        }
        return total;
    }

    /**
     * Sums the sizes of all delegates.
     *
     * @return the total, saturated to {@link Integer#MAX_VALUE} if the running sum overflows
     */
    @Override
    public int size() {
        int total = 0;
        for (int segment = 0; segment < this.stores.length(); ++segment) {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
            if (store == null) {
                continue;
            }
            total += store.size();
            // A negative running sum means int overflow; report the largest representable size.
            if (total < 0) {
                return Integer.MAX_VALUE;
            }
        }
        return total;
    }

    /**
     * Publishes the keys of the given segments, one delegate publisher per segment, combined through
     * {@code PersistenceUtil.parallelizePublisher} on this store's scheduler. Segments without a
     * delegate contribute an empty publisher.
     */
    @Override
    public Flowable<K> publishKeys(IntSet segments, Predicate<? super K> filter) {
        return PersistenceUtil.parallelizePublisher(segments, this.scheduler, segment -> {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
            if (store == null) {
                return Flowable.empty();
            }
            return store.publishKeys(filter);
        });
    }

    /**
     * Publishes the keys of every segment slot of this store.
     */
    @Override
    public Flowable<K> publishKeys(Predicate<? super K> filter) {
        return this.publishKeys(IntSets.immutableRangeSet(this.stores.length()), filter);
    }

    /**
     * Publishes the entries of the given segments, one delegate publisher per segment, combined through
     * {@code PersistenceUtil.parallelizePublisher} on this store's scheduler. Segments without a
     * delegate contribute an empty publisher.
     */
    @Override
    public Publisher<MarshalledEntry<K, V>> publishEntries(IntSet segments, Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata) {
        return PersistenceUtil.parallelizePublisher(segments, this.scheduler, segment -> {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
            if (store == null) {
                return Flowable.empty();
            }
            return store.publishEntries(filter, fetchValue, fetchMetadata);
        });
    }

    /**
     * Publishes the entries of every segment slot of this store.
     */
    @Override
    public Publisher<MarshalledEntry<K, V>> publishEntries(Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata) {
        return this.publishEntries(IntSets.immutableRangeSet(this.stores.length()), filter, fetchValue, fetchMetadata);
    }

    /**
     * Clears every running delegate.
     */
    @Override
    public void clear() {
        for (int segment = 0; segment < this.stores.length(); ++segment) {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
            if (store != null) {
                store.clear();
            }
        }
    }

    /**
     * Invokes {@code purge} on every running delegate, passing the same executor and listener to each.
     */
    @Override
    public void purge(Executor threadPool, AdvancedCacheWriter.PurgeListener<? super K> listener) {
        for (int segment = 0; segment < this.stores.length(); ++segment) {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
            if (store != null) {
                store.purge(threadPool, listener);
            }
        }
    }

    /**
     * Clears the running delegates of the given segments.
     */
    @Override
    public void clear(IntSet segments) {
        PrimitiveIterator.OfInt segmentIterator = segments.iterator();
        while (segmentIterator.hasNext()) {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(segmentIterator.nextInt());
            if (store != null) {
                store.clear();
            }
        }
    }

    /**
     * Deletes the given keys, grouped by segment and split into batches of at most
     * {@code configuration.maxBatchSize()} keys, blocking until every batch has been handed to its delegate.
     *
     * <p>Groups are consumed through {@code flatMap} with a concurrency of {@code stores.length()} rather
     * than one at a time: {@code groupBy} buffers items per group and can stall the upstream when some
     * groups are left unconsumed. Batches for a segment without a delegate are skipped, matching
     * {@link #delete(int, Object)}.
     */
    @Override
    public void deleteBatch(Iterable<Object> keys) {
        int maxBatchSize = this.configuration.maxBatchSize();
        Flowable.fromIterable(keys)
            .groupBy(this.keyPartitioner::getSegment)
            .flatMap(groupedFlowable -> groupedFlowable.buffer(maxBatchSize).flatMap(batch -> {
                AdvancedLoadWriteStore<K, V> store = this.stores.get(groupedFlowable.getKey());
                if (store != null) {
                    store.deleteBatch(batch);
                }
                return Flowable.empty();
            }), this.stores.length())
            .blockingSubscribe();
    }

    /**
     * Writes the given entries, grouped by the segment of their key and split into batches of at most
     * {@code configuration.maxBatchSize()} entries, blocking until every batch has been handed to its
     * delegate. Batches for a segment without a delegate are skipped, matching
     * {@link #write(int, MarshalledEntry)}.
     */
    @Override
    public void writeBatch(Iterable<MarshalledEntry<? extends K, ? extends V>> marshalledEntries) {
        int maxBatchSize = this.configuration.maxBatchSize();
        Flowable.fromIterable(marshalledEntries)
            .groupBy(me -> this.keyPartitioner.getSegment(me.getKey()))
            .flatMap(groupedFlowable -> groupedFlowable.buffer(maxBatchSize).flatMap(batch -> {
                AdvancedLoadWriteStore<K, V> store = this.stores.get(groupedFlowable.getKey());
                if (store != null) {
                    store.writeBatch(batch);
                }
                return Flowable.empty();
            }), this.stores.length())
            .blockingSubscribe();
    }

    /**
     * Captures the initialization context together with its cache and executor for later use by
     * {@link #start()}.
     */
    @Override
    public void init(InitializationContext ctx) {
        this.ctx = ctx;
        this.cache = ctx.getCache();
        this.executorService = ctx.getExecutor();
    }

    /**
     * Resolves the collaborating components from the cache's component registry, allocates one slot per
     * configured segment and starts a delegate in every slot.
     */
    public void start() {
        ComponentRegistry componentRegistry = this.cache.getAdvancedCache().getComponentRegistry();
        this.cacheStoreFactoryRegistry = componentRegistry.getComponent(CacheStoreFactoryRegistry.class);
        this.scheduler = Schedulers.from(this.executorService);
        HashConfiguration hashConfiguration = this.cache.getCacheConfiguration().clustering().hash();
        this.keyPartitioner = componentRegistry.getComponent(KeyPartitioner.class);
        this.stores = new AtomicReferenceArray<>(hashConfiguration.numSegments());
        for (int segment = 0; segment < this.stores.length(); ++segment) {
            this.startNewStoreForSegment(segment);
        }
        this.shouldStopSegments = this.cache.getCacheConfiguration().clustering().cacheMode().isDistributed();
    }

    /**
     * Creates, initializes and starts a delegate for {@code segment} unless the slot is already occupied.
     * The delegate is published into the slot only after it has been started.
     */
    private void startNewStoreForSegment(int segment) {
        if (this.stores.get(segment) != null) {
            return;
        }
        AbstractSegmentedStoreConfiguration storeConfiguration = (AbstractSegmentedStoreConfiguration)this.configuration.newConfigurationFrom(segment);
        // The factory registry's result is cast to the store type expected for this cache's K/V.
        @SuppressWarnings("unchecked")
        AdvancedLoadWriteStore<K, V> newStore = (AdvancedLoadWriteStore<K, V>)this.cacheStoreFactoryRegistry.createInstance(storeConfiguration);
        newStore.init(new InitializationContextImpl(storeConfiguration, this.cache, this.keyPartitioner, this.ctx.getMarshaller(), this.ctx.getTimeService(), this.ctx.getByteBufferFactory(), this.ctx.getMarshalledEntryFactory(), this.ctx.getExecutor()));
        newStore.start();
        this.stores.set(segment, newStore);
    }

    /**
     * Atomically empties the slot for {@code segment} and stops the delegate that was in it, if any.
     */
    private void stopStoreForSegment(int segment) {
        AdvancedLoadWriteStore<K, V> store = this.stores.getAndSet(segment, null);
        if (store != null) {
            store.stop();
        }
    }

    /**
     * Atomically empties the slot for {@code segment} and destroys the delegate that was in it, if any.
     */
    private void destroyStore(int segment) {
        AdvancedLoadWriteStore<K, V> store = this.stores.getAndSet(segment, null);
        if (store != null) {
            store.destroy();
        }
    }

    /**
     * Stops every running delegate and empties all slots.
     */
    public void stop() {
        for (int segment = 0; segment < this.stores.length(); ++segment) {
            this.stopStoreForSegment(segment);
        }
    }

    /**
     * Starts a delegate for each given segment that does not already have one.
     */
    @Override
    public void addSegments(IntSet segments) {
        segments.forEach(this::startNewStoreForSegment);
    }

    /**
     * Removes the given segments: destroys their delegates when {@link #shouldStopSegments} is set
     * (assigned in {@link #start()} from the cache mode's {@code isDistributed()}), otherwise only
     * clears their contents.
     */
    @Override
    public void removeSegments(IntSet segments) {
        if (!this.shouldStopSegments) {
            this.clear(segments);
            return;
        }
        PrimitiveIterator.OfInt segmentIterator = segments.iterator();
        while (segmentIterator.hasNext()) {
            this.destroyStore(segmentIterator.nextInt());
        }
    }

    /**
     * Invokes {@code consumer} with each running delegate and its segment index, in ascending segment order.
     */
    public void forEach(ObjIntConsumer<? super AdvancedLoadWriteStore> consumer) {
        for (int segment = 0; segment < this.stores.length(); ++segment) {
            AdvancedLoadWriteStore<K, V> store = this.stores.get(segment);
            if (store != null) {
                consumer.accept(store, segment);
            }
        }
    }
}

