/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.scattered.impl;

import io.reactivex.Flowable;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.write.InvalidateVersionsCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.SimpleClusteredVersion;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.metadata.Metadata;
import org.infinispan.persistence.manager.OrderedUpdatesManager;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.scattered.ScatteredVersionManager;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

public class ScatteredVersionManagerImpl<K>
implements ScatteredVersionManager<K> {
    /** Used by {@code setTopologyId} to compare-and-set the volatile {@code topologyId} field. */
    private static final AtomicIntegerFieldUpdater<ScatteredVersionManagerImpl> topologyIdUpdater = AtomicIntegerFieldUpdater.newUpdater(ScatteredVersionManagerImpl.class, "topologyId");
    protected static final Log log = LogFactory.getLog(ScatteredVersionManagerImpl.class);
    /** Trace flag captured once at class initialisation. */
    protected static final boolean trace = log.isTraceEnabled();
    @Inject
    private Configuration configuration;
    @Inject
    private ComponentRegistry componentRegistry;
    @Inject
    @ComponentName(value="org.infinispan.executors.transport")
    private ExecutorService executorService;
    @Inject
    private CommandsFactory commandsFactory;
    @Inject
    private RpcManager rpcManager;
    @Inject
    private InternalDataContainer<K, ?> dataContainer;
    @Inject
    private PersistenceManager persistenceManager;
    @Inject
    private DistributionManager distributionManager;
    @Inject
    private ClusterTopologyManager clusterTopologyManager;
    @Inject
    private OrderedUpdatesManager orderedUpdatesManager;
    /** Initial capacity of the invalidation maps and the size threshold that triggers sending a batch. */
    private int invalidationBatchSize;
    private int numSegments;
    /** Highest topology id passed to {@code updatePreloadedEntryVersion} so far. */
    private int preloadedTopologyId = 0;
    private volatile int topologyId = 0;
    // Per-segment bookkeeping; every array is indexed by segment id and allocated in start().
    private AtomicReferenceArray<ScatteredVersionManager.SegmentState> segmentStates;
    private AtomicReferenceArray<CompletableFuture<Void>> blockedFutures;
    private AtomicLongArray segmentVersions;
    private AtomicIntegerArray ownerTopologyIds;
    // Writers into the maps below take the read lock; swapping a map for a fresh one takes the write lock.
    private ReadWriteLock scheduledKeysLock = new ReentrantReadWriteLock();
    private ConcurrentMap<K, InvalidationInfo> scheduledKeys;
    private ReadWriteLock removedKeysLock = new ReentrantReadWriteLock();
    private ConcurrentMap<K, InvalidationInfo> removedKeys;
    private volatile boolean transferringValues = false;
    private volatile int valuesTopology = -1;
    // Must start out incomplete: valuesFuture(int) chains thenCompose(nil -> valuesFuture(topologyId))
    // on this future, and on an already-completed future that callback runs inline and would recurse
    // without bound. An incomplete, privately owned future also lets stop() fail pending waiters.
    private CompletableFuture<Void> valuesFuture = new CompletableFuture<Void>();
    /** Guards {@code valuesFuture} replacement and the transitions of {@code transferringValues}/{@code valuesTopology}. */
    private final Object valuesLock = new Object();

    /**
     * Allocates the per-segment arrays, derives the initial state of every segment from the
     * current cache topology and creates the (empty) invalidation maps.
     * <p>
     * A segment starts as {@code OWNED} only when the topology is connected and the current
     * consistent hash reports the segment as local to this node; otherwise it is {@code NOT_OWNED}.
     */
    @Start(priority=15)
    public void start() {
        this.numSegments = this.configuration.clustering().hash().numSegments();
        this.segmentVersions = new AtomicLongArray(this.numSegments);
        this.segmentStates = new AtomicReferenceArray<ScatteredVersionManager.SegmentState>(this.numSegments);
        this.blockedFutures = new AtomicReferenceArray<CompletableFuture<Void>>(this.numSegments);
        this.ownerTopologyIds = new AtomicIntegerArray(this.numSegments);
        LocalizedCacheTopology cacheTopology = this.distributionManager.getCacheTopology();
        ConsistentHash ch = cacheTopology.getCurrentCH();
        for (int i = 0; i < this.numSegments; ++i) {
            ScatteredVersionManager.SegmentState state = ScatteredVersionManager.SegmentState.NOT_OWNED;
            if (cacheTopology.isConnected() && ch.isSegmentLocalToNode(this.rpcManager.getAddress(), i)) {
                state = ScatteredVersionManager.SegmentState.OWNED;
            }
            this.segmentStates.set(i, state);
        }
        this.printTable();
        // The batch size must be read BEFORE it is used as the initial capacity of either map;
        // otherwise the first map would be sized from the field's default value of 0.
        this.invalidationBatchSize = this.configuration.clustering().invalidationBatchSize();
        this.scheduledKeys = new ConcurrentHashMap<K, InvalidationInfo>(this.invalidationBatchSize);
        this.removedKeys = new ConcurrentHashMap<K, InvalidationInfo>(this.invalidationBatchSize);
    }

    /**
     * Seeds the cluster topology manager with an initial topology id that is one higher than the
     * highest topology id found in already-stored entry versions.
     * <p>
     * When the persistence manager reports that preload happened, the value collected through
     * {@code updatePreloadedEntryVersion} is used. Otherwise all persisted entries are scanned and
     * the maximum topology id of their {@link SimpleClusteredVersion}s is taken. Nothing is set
     * when the maximum is not positive.
     */
    @Start(priority=57)
    public void initTopologyId() {
        if (this.persistenceManager.isPreloaded()) {
            // Preload already fed us every version; no need to scan the store again.
            if (this.preloadedTopologyId > 0) {
                this.clusterTopologyManager.setInitialCacheTopologyId(this.componentRegistry.getCacheName(), this.preloadedTopologyId + 1);
            }
            return;
        }
        AtomicInteger highestSeen = new AtomicInteger(this.preloadedTopologyId);
        Publisher publisher = this.persistenceManager.publishEntries(false, true);
        Flowable.fromPublisher(publisher).blockingForEach(me -> {
            EntryVersion storedVersion = me.getMetadata().version();
            if (!(storedVersion instanceof SimpleClusteredVersion)) {
                return;
            }
            int storedTopologyId = ((SimpleClusteredVersion)storedVersion).topologyId;
            // Cheap pre-check avoids the atomic update for entries that cannot raise the maximum.
            if (highestSeen.get() < storedTopologyId) {
                highestSeen.updateAndGet(current -> Math.max(current, storedTopologyId));
            }
        });
        int highest = highestSeen.get();
        if (highest > 0) {
            this.clusterTopologyManager.setInitialCacheTopologyId(this.componentRegistry.getCacheName(), highest + 1);
        }
    }

    /**
     * Stops the component: under {@code valuesLock}, raises {@code valuesTopology} to
     * {@link Integer#MAX_VALUE} and completes the current values future exceptionally with a
     * {@link CacheException}.
     */
    @Stop
    public void stop() {
        log.trace("Stopping " + this + " on " + this.rpcManager.getAddress());
        synchronized (this.valuesLock) {
            // With valuesTopology at MAX_VALUE no topology id can satisfy "topologyId > valuesTopology"
            // in valuesFuture(int) any more.
            this.valuesTopology = Integer.MAX_VALUE;
            CacheException stopping = new CacheException("Cache is stopping");
            this.valuesFuture.completeExceptionally(stopping);
        }
        log.trace("Stopped " + this + " on " + this.rpcManager.getAddress());
    }

    /**
     * Produces the next version for the given segment.
     *
     * @param segment segment whose version counter is incremented
     * @return a {@link SimpleClusteredVersion} built from the current topology id and the
     *         incremented per-segment counter
     * @throws CacheException if the segment is {@code NOT_OWNED} or {@code BLOCKED}
     * @throws IllegalStateException if the segment is in a state not handled here
     */
    @Override
    public EntryVersion incrementVersion(int segment) {
        ScatteredVersionManager.SegmentState state = this.segmentStates.get(segment);
        switch (state) {
            case NOT_OWNED:
                throw new CacheException("Segment " + segment + " is not owned by " + this.rpcManager.getAddress());
            case BLOCKED:
                throw new CacheException("Segment " + segment + " is currently blocked");
            case KEY_TRANSFER:
            case VALUE_TRANSFER:
            case OWNED: {
                long nextVersion = this.segmentVersions.addAndGet(segment, 1L);
                return new SimpleClusteredVersion(this.topologyId, nextVersion);
            }
            default:
                throw new IllegalStateException();
        }
    }

    /**
     * Records that the given key/version should be invalidated, and triggers sending of the
     * accumulated batch once the number of scheduled keys reaches {@code invalidationBatchSize}.
     * <p>
     * If the key is already scheduled, the new record replaces the existing one only when its
     * version number is higher, or when the version numbers are equal and the new record is a removal.
     *
     * @param key     key to invalidate
     * @param version version of the entry; must be a {@link SimpleClusteredVersion}
     * @param removal whether the invalidation is for a removed entry
     */
    @Override
    public void scheduleKeyInvalidation(K key, EntryVersion version, boolean removal) {
        final InvalidationInfo candidate = new InvalidationInfo((SimpleClusteredVersion)version, removal);
        final boolean batchFull;
        // Read lock: many threads may add concurrently; the write lock is only taken to swap the map.
        this.scheduledKeysLock.readLock().lock();
        try {
            // NOTE(review): only the version number is compared here, not the topology id
            // (regularInvalidationFinished compares both) - confirm this is intended.
            this.scheduledKeys.compute(key, (ignored, current) -> {
                if (current == null) {
                    return candidate;
                }
                if (candidate.version > current.version) {
                    return candidate;
                }
                if (candidate.removal && candidate.version == current.version) {
                    return candidate;
                }
                return current;
            });
            batchFull = this.scheduledKeys.size() >= this.invalidationBatchSize;
        }
        finally {
            this.scheduledKeysLock.readLock().unlock();
        }
        if (batchFull) {
            this.tryRegularInvalidations(false);
        }
    }

    /**
     * Starts sending whatever invalidations are pending. Scheduled (regular) invalidations take
     * precedence; removed-key invalidations are only started when nothing is scheduled.
     *
     * @return {@code true} if a send was started, {@code false} if both maps were empty
     */
    protected boolean startFlush() {
        boolean hasScheduled = !this.scheduledKeys.isEmpty();
        if (hasScheduled) {
            this.tryRegularInvalidations(true);
            return true;
        }
        boolean hasRemoved = !this.removedKeys.isEmpty();
        if (hasRemoved) {
            this.tryRemovedInvalidations();
        }
        return hasRemoved;
    }

    /**
     * Marks a previously not-owned segment as {@code BLOCKED}: records the current topology id as
     * the owner topology, resets the segment's version counter and installs a fresh blocking future.
     *
     * @param segment segment to register
     * @throws IllegalStateException if the segment was not in {@code NOT_OWNED} state
     */
    @Override
    public synchronized void registerSegment(int segment) {
        this.ownerTopologyIds.set(segment, this.topologyId);
        this.segmentVersions.set(segment, 0L);
        this.blockedFutures.set(segment, new CompletableFuture());
        boolean blocked = this.segmentStates.compareAndSet(segment, ScatteredVersionManager.SegmentState.NOT_OWNED, ScatteredVersionManager.SegmentState.BLOCKED);
        if (!blocked) {
            throw new IllegalStateException("Segment " + segment + " is in state " + this.segmentStates.get(segment));
        }
        log.tracef("Node %s blocks access to segment %d", this.rpcManager.getAddress(), segment);
    }

    /**
     * Unconditionally moves the segment to {@code NOT_OWNED} and, if a blocking future exists for
     * it, completes that future exceptionally.
     *
     * @param segment segment to unregister
     */
    @Override
    public synchronized void unregisterSegment(int segment) {
        ScatteredVersionManager.SegmentState previous = this.segmentStates.getAndSet(segment, ScatteredVersionManager.SegmentState.NOT_OWNED);
        if (trace) {
            log.tracef("Unregistered segment %d (previous=%s)", segment, previous);
        }
        CompletableFuture<Void> pending = this.blockedFutures.get(segment);
        if (pending == null) {
            return;
        }
        // No effect if the future has already been completed.
        pending.completeExceptionally(new CacheException("The segment is no longer owned."));
    }

    /**
     * Checks whether a version was created in, or after, the topology recorded as the owner
     * topology of the segment.
     *
     * @param segment segment the entry belongs to
     * @param version version to check; must be a {@link SimpleClusteredVersion}
     * @return {@code true} if the version's topology id is not lower than the segment's owner topology id
     */
    @Override
    public boolean isVersionActual(int segment, EntryVersion version) {
        int versionTopologyId = ((SimpleClusteredVersion)version).topologyId;
        int ownerTopologyId = this.ownerTopologyIds.get(segment);
        return versionTopologyId >= ownerTopologyId;
    }

    /**
     * Records the end of key transfer for a segment and moves it to its next state:
     * {@code NOT_OWNED} when cancelled, {@code VALUE_TRANSFER} when values are still expected,
     * {@code OWNED} otherwise. Any blocking future registered for the segment is completed
     * exceptionally (a no-op if it was already completed).
     *
     * @param segment      segment whose key transfer ended
     * @param expectValues whether a value transfer follows; must be {@code false} when cancelled
     * @param cancelled    whether the transfer was cancelled
     */
    @Override
    public void notifyKeyTransferFinished(int segment, boolean expectValues, boolean cancelled) {
        final ScatteredVersionManager.SegmentState next;
        if (cancelled) {
            assert (!expectValues);
            next = ScatteredVersionManager.SegmentState.NOT_OWNED;
        } else if (expectValues) {
            next = ScatteredVersionManager.SegmentState.VALUE_TRANSFER;
        } else {
            next = ScatteredVersionManager.SegmentState.OWNED;
        }
        // The state is overwritten regardless of what it was before.
        ScatteredVersionManager.SegmentState before = this.segmentStates.getAndSet(segment, next);
        if (trace) {
            log.tracef("Finished transfer for segment %d = %s -> %s", segment, before, next);
        }
        CompletableFuture<Void> pending = this.blockedFutures.get(segment);
        if (pending != null) {
            pending.completeExceptionally(new CacheException("Segment state transition did not complete correctly."));
        }
        if (!trace) {
            return;
        }
        if (expectValues) {
            log.tracef("Node %s, segment %d has all keys in, expects value transfer", this.rpcManager.getAddress(), segment);
        } else {
            log.tracef("Node %s, segment %d did not transfer any keys, segment is owned now", this.rpcManager.getAddress(), segment);
        }
    }

    /**
     * @param segment segment id
     * @return the state currently recorded for the segment
     */
    @Override
    public ScatteredVersionManager.SegmentState getSegmentState(int segment) {
        ScatteredVersionManager.SegmentState current = this.segmentStates.get(segment);
        return current;
    }

    /**
     * Flags that a value transfer is in progress. The flag is cleared again by
     * {@code notifyValueTransferFinished}.
     *
     * @param topologyId topology of the transfer; only used for the trace message
     */
    @Override
    public void setValuesTransferTopology(int topologyId) {
        log.tracef("Node will transfer value for topology %d", topologyId);
        synchronized (this.valuesLock) {
            this.transferringValues = true;
        }
    }

    /**
     * Marks the end of value transfer: every segment that is not {@code NOT_OWNED} is moved to
     * {@code OWNED}, then the current values future is completed and replaced by a fresh one.
     * <p>
     * Segments still found in {@code BLOCKED} or {@code KEY_TRANSFER} have their blocking future
     * completed exceptionally and a warning logged before being promoted. The promotion uses
     * compare-and-set and is retried from a fresh read of the state when it loses a race.
     */
    @Override
    public void notifyValueTransferFinished() {
        for (int segment = 0; segment < this.numSegments; ++segment) {
            settle:
            while (true) {
                ScatteredVersionManager.SegmentState state = this.segmentStates.get(segment);
                switch (state) {
                    case NOT_OWNED:
                    case OWNED:
                        break settle;
                    case BLOCKED:
                    case KEY_TRANSFER:
                        this.blockedFutures.get(segment).completeExceptionally(new CacheException("Failed to request versions"));
                        log.warnf("Stopped applying state for segment %d in topology %d but the segment is in state %s", segment, this.topologyId, state);
                        // intentional fall through: the segment is promoted to OWNED as well
                    case VALUE_TRANSFER:
                        if (this.segmentStates.compareAndSet(segment, state, ScatteredVersionManager.SegmentState.OWNED)) {
                            break settle;
                        }
                        // Lost the race with a concurrent state change: re-read and retry.
                        break;
                    default:
                        // Unknown state: keep polling, as before.
                        break;
                }
            }
        }
        synchronized (this.valuesLock) {
            this.valuesTopology = Math.max(this.topologyId, this.valuesTopology);
            this.transferringValues = false;
            // Release current waiters, then install a new future for the next transfer.
            this.valuesFuture.complete(null);
            this.valuesFuture = new CompletableFuture<Void>();
        }
        log.debugf("Node %s received values for all segments in topology %d", this.rpcManager.getAddress(), this.topologyId);
    }

    /**
     * @param segment segment id
     * @return the blocking future stored for the segment, or {@code null} if none has been set
     */
    @Override
    public CompletableFuture<Void> getBlockingFuture(int segment) {
        CompletableFuture<Void> blocking = this.blockedFutures.get(segment);
        return blocking;
    }

    /**
     * Advances the topology id. The new id must be strictly greater than the current one.
     *
     * @param topologyId new topology id
     * @throws IllegalArgumentException if {@code topologyId} is not greater than the current id
     * @throws IllegalStateException    if another thread changed the id concurrently
     */
    @Override
    public void setTopologyId(int topologyId) {
        final int observed = this.topologyId;
        if (topologyId <= observed) {
            throw new IllegalArgumentException("Updating to topology " + topologyId + " but current is " + observed);
        }
        boolean swapped = topologyIdUpdater.compareAndSet(this, observed, topologyId);
        if (!swapped) {
            throw new IllegalStateException("Concurrent update to topology " + topologyId + ", current was " + observed + " but now it's " + this.topologyId);
        }
    }

    /**
     * Raises {@code preloadedTopologyId} to the topology id of the given version if that is higher.
     * Versions that are not {@link SimpleClusteredVersion}s are ignored.
     *
     * @param version version of a preloaded entry
     */
    @Override
    public void updatePreloadedEntryVersion(EntryVersion version) {
        if (!(version instanceof SimpleClusteredVersion)) {
            return;
        }
        int seenTopologyId = ((SimpleClusteredVersion)version).topologyId;
        this.preloadedTopologyId = Math.max(this.preloadedTopologyId, seenTopologyId);
    }

    /**
     * Returns a future that completes once values for the given topology are no longer being
     * awaited.
     * <p>
     * While a transfer is in progress and {@code topologyId} is above {@code valuesTopology}, the
     * returned future is chained on the current values future and re-evaluates this method when
     * that future completes. In every other case an already-completed future is returned.
     *
     * @param topologyId topology the caller is interested in
     * @return a future as described above
     */
    @Override
    public CompletableFuture<Void> valuesFuture(int topologyId) {
        // Unsynchronized fast path on the two volatile fields.
        if (!this.transferringValues || topologyId <= this.valuesTopology) {
            return CompletableFutures.completedNull();
        }
        synchronized (this.valuesLock) {
            // Re-check under the lock: the transfer may have finished in the meantime.
            boolean stillWaiting = this.transferringValues && topologyId > this.valuesTopology;
            if (stillWaiting) {
                return this.valuesFuture.thenCompose(nil -> this.valuesFuture(topologyId));
            }
        }
        return CompletableFutures.completedNull();
    }

    /**
     * Moves each given segment directly from {@code NOT_OWNED} to {@code OWNED}, resetting its
     * version counter and recording the current topology id as its owner topology.
     *
     * @param segments segments this node now owns
     * @throws IllegalStateException if any of the segments is not in {@code NOT_OWNED} state
     */
    @Override
    public void setOwnedSegments(IntSet segments) {
        for (PrimitiveIterator.OfInt it = segments.iterator(); it.hasNext(); ) {
            int segment = it.nextInt();
            this.segmentVersions.set(segment, 0L);
            this.ownerTopologyIds.set(segment, this.topologyId);
            boolean owned = this.segmentStates.compareAndSet(segment, ScatteredVersionManager.SegmentState.NOT_OWNED, ScatteredVersionManager.SegmentState.OWNED);
            if (!owned) {
                throw new IllegalStateException(String.format("Segment %d is in state %s", segment, this.segmentStates.get(segment)));
            }
        }
        if (!log.isDebugEnabled()) {
            return;
        }
        log.debugf("Node %s is now owner of segments %s", this.rpcManager.getAddress(), segments);
        this.printTable();
    }

    /**
     * Moves each given segment from {@code BLOCKED} to {@code KEY_TRANSFER} and completes its
     * blocking future normally.
     *
     * @param segments segments for which key transfer starts
     * @throws IllegalStateException if any of the segments is not in {@code BLOCKED} state
     */
    @Override
    public void startKeyTransfer(IntSet segments) {
        for (PrimitiveIterator.OfInt it = segments.iterator(); it.hasNext(); ) {
            int segment = it.nextInt();
            boolean unblocked = this.segmentStates.compareAndSet(segment, ScatteredVersionManager.SegmentState.BLOCKED, ScatteredVersionManager.SegmentState.KEY_TRANSFER);
            if (!unblocked) {
                throw new IllegalStateException(String.format("Segment %d is in state %s", segment, this.segmentStates.get(segment)));
            }
            this.blockedFutures.get(segment).complete(null);
            log.tracef("Node %s, segment %d expects key transfer", this.rpcManager.getAddress(), segment);
        }
    }

    /**
     * Logs, at debug level, the state of every segment as {@code index=char} pairs, sixteen
     * segments per line.
     */
    private void printTable() {
        StringBuilder table = new StringBuilder("Segments for node ");
        table.append(this.rpcManager.getAddress());
        table.append(':');
        for (int segment = 0; segment < this.numSegments; ++segment) {
            // A new row starts at every multiple of 16.
            if (segment % 16 == 0) {
                table.append('\n');
            }
            table.append(String.format("%4d=%c ", segment, Character.valueOf(this.segmentStates.get(segment).singleChar())));
        }
        log.debug(table.toString());
    }

    /**
     * Detaches the current map of scheduled invalidations (replacing it with an empty one) and,
     * on the executor, flattens it into parallel arrays, builds an {@link InvalidateVersionsCommand}
     * and hands it to {@code sendRegularInvalidations}.
     *
     * @param force passed through to {@code sendRegularInvalidations}
     */
    private void tryRegularInvalidations(boolean force) {
        final ConcurrentMap<K, InvalidationInfo> batch;
        // Write lock: excludes writers holding the read lock while the map is swapped.
        this.scheduledKeysLock.writeLock().lock();
        try {
            batch = this.scheduledKeys;
            this.scheduledKeys = new ConcurrentHashMap<K, InvalidationInfo>(this.invalidationBatchSize);
        }
        finally {
            this.scheduledKeysLock.writeLock().unlock();
        }
        this.executorService.execute(() -> {
            int size = batch.size();
            Object[] keys = new Object[size];
            int[] topologyIds = new int[size];
            long[] versions = new long[size];
            boolean[] removedFlags = new boolean[size];
            int removedCount = 0;
            int index = 0;
            for (Map.Entry<K, InvalidationInfo> entry : batch.entrySet()) {
                InvalidationInfo info = entry.getValue();
                keys[index] = entry.getKey();
                topologyIds[index] = info.topologyId;
                versions[index] = info.version;
                removedFlags[index] = info.removal;
                if (info.removal) {
                    ++removedCount;
                }
                ++index;
            }
            InvalidateVersionsCommand command = this.commandsFactory.buildInvalidateVersionsCommand(-1, keys, topologyIds, versions, false);
            this.sendRegularInvalidations(command, keys, topologyIds, versions, removedCount, removedFlags, force);
        });
    }

    /**
     * Invokes the invalidation command on all nodes. On failure the error is logged and the same
     * command is sent again; on success {@code regularInvalidationFinished} is called, but only
     * if the batch contained removals or {@code force} is set.
     *
     * @param command     command to send
     * @param keys        keys in the batch
     * @param topologyIds topology id per key
     * @param versions    version number per key
     * @param numRemoved  number of {@code true} entries in {@code isRemoved}
     * @param isRemoved   removal flag per key
     * @param force       whether follow-up processing happens even without removals
     */
    private void sendRegularInvalidations(InvalidateVersionsCommand command, Object[] keys, int[] topologyIds, long[] versions, int numRemoved, boolean[] isRemoved, boolean force) {
        CompletionStage<Map<Address, Response>> pending = this.rpcManager.invokeCommandOnAll(command, MapResponseCollector.ignoreLeavers(), this.rpcManager.getSyncRpcOptions());
        pending.whenComplete((responses, failure) -> {
            if (failure != null) {
                log.failedInvalidatingRemoteCache(failure);
                // Retry with exactly the same payload.
                this.sendRegularInvalidations(command, keys, topologyIds, versions, numRemoved, isRemoved, force);
                return;
            }
            boolean followUp = numRemoved > 0 || force;
            if (followUp) {
                this.regularInvalidationFinished(keys, topologyIds, versions, isRemoved, force);
            }
        });
    }

    /**
     * Called after a regular invalidation batch was sent successfully. Every key flagged as
     * removed is merged into {@code removedKeys}, keeping for each key the record with the highest
     * (topology id, version) pair. A removed-keys send is then triggered when the map has grown
     * beyond {@code invalidationBatchSize}, or when {@code force} is set and the map is non-empty.
     *
     * @param keys        keys of the finished batch
     * @param topologyIds topology id per key
     * @param versions    version number per key
     * @param isRemoved   removal flag per key
     * @param force       whether to send removed-key invalidations regardless of batch size
     */
    protected void regularInvalidationFinished(Object[] keys, int[] topologyIds, long[] versions, boolean[] isRemoved, boolean force) {
        final boolean sendNow;
        // Read lock: concurrent merges are fine; the write lock is only taken to swap the map.
        this.removedKeysLock.readLock().lock();
        try {
            for (int index = 0; index < isRemoved.length; ++index) {
                if (isRemoved[index]) {
                    final int removedTopologyId = topologyIds[index];
                    final long removedVersion = versions[index];
                    @SuppressWarnings("unchecked")
                    K key = (K)keys[index];
                    this.removedKeys.compute(key, (ignored, existing) -> {
                        boolean newer = existing == null
                                || existing.topologyId < removedTopologyId
                                || existing.topologyId == removedTopologyId && existing.version < removedVersion;
                        return newer ? new InvalidationInfo(removedTopologyId, removedVersion) : existing;
                    });
                }
            }
            boolean overThreshold = this.removedKeys.size() > this.invalidationBatchSize;
            sendNow = overThreshold || force && !this.removedKeys.isEmpty();
        }
        finally {
            this.removedKeysLock.readLock().unlock();
        }
        if (sendNow) {
            this.tryRemovedInvalidations();
        }
    }

    /**
     * Detaches the current map of removed keys (replacing it with an empty one) and, on the
     * executor, flattens it into parallel arrays, builds a removal
     * {@link InvalidateVersionsCommand} and hands it to {@code sendRemoveInvalidations}.
     */
    private void tryRemovedInvalidations() {
        final ConcurrentMap<K, InvalidationInfo> removedKeys;
        Lock writeLock = this.removedKeysLock.writeLock();
        // Write lock: excludes writers holding the read lock while the map is swapped.
        writeLock.lock();
        try {
            removedKeys = this.removedKeys;
            this.removedKeys = new ConcurrentHashMap<K, InvalidationInfo>(this.invalidationBatchSize);
        }
        finally {
            writeLock.unlock();
        }
        this.executorService.execute(() -> {
            int numKeys = removedKeys.size();
            Object[] keys = new Object[numKeys];
            int[] topologyIds = new int[numKeys];
            long[] versions = new long[numKeys];
            int i = 0;
            for (Map.Entry<K, InvalidationInfo> entry : removedKeys.entrySet()) {
                InvalidationInfo info = entry.getValue();
                keys[i] = entry.getKey();
                topologyIds[i] = info.topologyId;
                versions[i] = info.version;
                // Advance to the next slot; without this every entry would overwrite index 0
                // and the remaining slots would stay null/0.
                ++i;
            }
            InvalidateVersionsCommand removeCommand = this.commandsFactory.buildInvalidateVersionsCommand(-1, keys, topologyIds, versions, true);
            this.sendRemoveInvalidations(removeCommand);
        });
    }

    /**
     * Invokes the removal command on all nodes and then also initialises and invokes it on this
     * node. If the remote invocation fails, the error is logged and this method is called again
     * with the same command; on success {@code removeInvalidationsFinished} is called.
     *
     * @param removeCommand command to send and to apply locally
     */
    private void sendRemoveInvalidations(InvalidateVersionsCommand removeCommand) {
        this.rpcManager
                .invokeCommandOnAll(removeCommand, MapResponseCollector.ignoreLeavers(), this.rpcManager.getSyncRpcOptions())
                .whenComplete((responses, failure) -> {
                    if (failure == null) {
                        this.removeInvalidationsFinished();
                        return;
                    }
                    log.failedInvalidatingRemoteCache(failure);
                    this.sendRemoveInvalidations(removeCommand);
                });
        // Apply the same command on the local node.
        removeCommand.init(this.dataContainer, this.orderedUpdatesManager, null, null, null);
        removeCommand.invokeAsync();
    }

    /**
     * Hook invoked by {@code sendRemoveInvalidations} after the removal command completed on all
     * nodes without error. Does nothing here; subclasses may override it.
     */
    protected void removeInvalidationsFinished() {
        // intentionally empty
    }

    /**
     * Discards all pending invalidations by replacing both the scheduled-keys map and the
     * removed-keys map with empty maps, each under its own write lock.
     */
    @Override
    public void clearInvalidations() {
        final Lock scheduledWriteLock = this.scheduledKeysLock.writeLock();
        scheduledWriteLock.lock();
        try {
            this.scheduledKeys = new ConcurrentHashMap<K, InvalidationInfo>(this.invalidationBatchSize);
        }
        finally {
            scheduledWriteLock.unlock();
        }
        // The first lock is released before the second is taken, so the two are never held together.
        final Lock removedWriteLock = this.removedKeysLock.writeLock();
        removedWriteLock.lock();
        try {
            this.removedKeys = new ConcurrentHashMap<K, InvalidationInfo>(this.invalidationBatchSize);
        }
        finally {
            removedWriteLock.unlock();
        }
    }

    /**
     * Immutable record of a pending invalidation: the (topology id, version) pair of the entry
     * and whether the invalidation is for a removal.
     */
    private static class InvalidationInfo {
        public final int topologyId;
        public final long version;
        public final boolean removal;

        /** Canonical constructor; the other two delegate here. */
        private InvalidationInfo(int topologyId, long version, boolean removal) {
            this.topologyId = topologyId;
            this.version = version;
            this.removal = removal;
        }

        /** Copies topology id and version number out of the given clustered version. */
        private InvalidationInfo(SimpleClusteredVersion version, boolean removal) {
            this(version.topologyId, version.version, removal);
        }

        /** Creates a record that is always flagged as a removal. */
        private InvalidationInfo(int topologyId, long version) {
            this(topologyId, version, true);
        }
    }
}

