/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.distribution;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.MetadataAwareCommand;
import org.infinispan.commands.TopologyAffectedCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.functional.ReadOnlyKeyCommand;
import org.infinispan.commands.functional.ReadOnlyManyCommand;
import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.commands.functional.ReadWriteKeyValueCommand;
import org.infinispan.commands.functional.ReadWriteManyCommand;
import org.infinispan.commands.functional.ReadWriteManyEntriesCommand;
import org.infinispan.commands.functional.WriteOnlyKeyCommand;
import org.infinispan.commands.functional.WriteOnlyKeyValueCommand;
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.read.AbstractDataCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetAllCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.remote.GetKeysInGroupCommand;
import org.infinispan.commands.write.ApplyDeltaCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.ComputeCommand;
import org.infinispan.commands.write.ComputeIfAbsentCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.ValueMatcher;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.ByRef;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.container.entries.RemoteMetadata;
import org.infinispan.container.entries.metadata.MetadataImmortalCacheValue;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.InequalVersionComparisonResult;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.distribution.group.impl.GroupManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.functional.impl.FunctionalNotifier;
import org.infinispan.interceptors.InvocationSuccessAction;
import org.infinispan.interceptors.distribution.ConcurrentChangeException;
import org.infinispan.interceptors.distribution.VersionedResult;
import org.infinispan.interceptors.impl.ClusteringInterceptor;
import org.infinispan.metadata.EmbeddedMetadata;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.NotifyHelper;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.responses.UnsuccessfulResponse;
import org.infinispan.remoting.responses.UnsureResponse;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.scattered.ScatteredVersionManager;
import org.infinispan.statetransfer.AllOwnersLostException;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.TimeService;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class ScatteredDistributionInterceptor
extends ClusteringInterceptor {
    private static final Log log = LogFactory.getLog(ScatteredDistributionInterceptor.class);
    private static final boolean trace = log.isTraceEnabled();
    protected ScatteredVersionManager<Object> svm;
    protected GroupManager groupManager;
    protected TimeService timeService;
    protected CacheNotifier cacheNotifier;
    protected FunctionalNotifier functionalNotifier;
    protected KeyPartitioner keyPartitioner;
    protected DistributionManager distributionManager;
    private volatile Address cachedNextMember;
    private volatile int cachedNextMemberTopology = -1;
    private final InvocationSuccessAction dataWriteCommandNoReadHandler = (rCtx, rCommand, rv) -> {
        DataWriteCommand dataWriteCommand = (DataWriteCommand)rCommand;
        CacheEntry entry = rCtx.lookupEntry(dataWriteCommand.getKey());
        boolean committed = this.commitSingleEntryIfNewer(entry, rCtx, dataWriteCommand);
        if (committed && rCtx.isOriginLocal() && !dataWriteCommand.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
            this.svm.scheduleKeyInvalidation(dataWriteCommand.getKey(), entry.getMetadata().version(), entry.isRemoved());
        }
    };
    private final InvocationSuccessAction putMapCommandHandler = (rCtx, rCommand, rv) -> {
        PutMapCommand putMapCommand = (PutMapCommand)rCommand;
        for (Object key : putMapCommand.getAffectedKeys()) {
            this.commitSingleEntryIfNewer(rCtx.lookupEntry(key), rCtx, rCommand);
        }
    };
    private final InvocationSuccessAction clearHandler = (rCtx, rCommand, rv) -> {
        ArrayList copyEntries = new ArrayList(this.dataContainer.entrySet());
        this.dataContainer.clear();
        for (InternalCacheEntry internalCacheEntry : copyEntries) {
            this.cacheNotifier.notifyCacheEntryRemoved(internalCacheEntry.getKey(), internalCacheEntry.getValue(), internalCacheEntry.getMetadata(), false, rCtx, (ClearCommand)rCommand);
        }
    };

    @Inject
    public void injectDependencies(GroupManager groupManager, ScatteredVersionManager svm, TimeService timeService, CacheNotifier cacheNotifier, FunctionalNotifier functionalNotifier, KeyPartitioner keyPartitioner, DistributionManager distributionManager) {
        this.groupManager = groupManager;
        this.svm = svm;
        this.timeService = timeService;
        this.cacheNotifier = cacheNotifier;
        this.functionalNotifier = functionalNotifier;
        this.keyPartitioner = keyPartitioner;
        this.distributionManager = distributionManager;
    }

    private <T extends DataWriteCommand & MetadataAwareCommand> Object handleWriteCommand(InvocationContext ctx, T command) throws Throwable {
        CacheEntry contextEntry;
        CacheEntry cacheEntry = ctx.lookupEntry(command.getKey());
        EntryVersion seenVersion = this.getVersionOrNull(cacheEntry);
        LocalizedCacheTopology cacheTopology = this.checkTopology(command);
        DistributionInfo info = cacheTopology.getDistribution(command.getKey());
        if (info.primary() == null) {
            throw AllOwnersLostException.INSTANCE;
        }
        if (this.isLocalModeForced(command)) {
            CacheEntry contextEntry2 = cacheEntry;
            if (cacheEntry == null) {
                this.entryFactory.wrapExternalEntry(ctx, command.getKey(), null, false, true);
                contextEntry2 = ctx.lookupEntry(command.getKey());
            }
            if (command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                contextEntry2.setMetadata(((MetadataAwareCommand)command).getMetadata());
            } else if (info.isPrimary()) {
                if (cacheTopology.getTopologyId() == 0) {
                    contextEntry2.setMetadata(((MetadataAwareCommand)command).getMetadata());
                    this.svm.updatePreloadedEntryVersion(((MetadataAwareCommand)command).getMetadata().version());
                } else {
                    EntryVersion nextVersion = this.svm.incrementVersion(info.segmentId());
                    contextEntry2.setMetadata(ScatteredDistributionInterceptor.addVersion(((MetadataAwareCommand)command).getMetadata(), nextVersion));
                }
            }
            return this.commitSingleEntryOnReturn(ctx, command, contextEntry2, contextEntry2.getValue(), seenVersion);
        }
        if (ctx.isOriginLocal()) {
            if (info.isPrimary()) {
                Object seenValue = cacheEntry.getValue();
                return this.invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> this.handleWriteOnOriginPrimary(rCtx, (DataWriteCommand)rCommand, rv, cacheEntry, seenValue, seenVersion, cacheTopology, info));
            }
            CompletableFuture<Map<Address, Response>> rpcFuture = this.rpcManager.invokeRemotelyAsync(info.writeOwners(), command, this.defaultSyncOptions);
            return ScatteredDistributionInterceptor.asyncValue(rpcFuture.thenApply(responseMap -> this.handleWritePrimaryResponse(ctx, command, (Map<Address, Response>)responseMap)));
        }
        if (info.isPrimary()) {
            Object seenValue = cacheEntry.getValue();
            return this.invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> {
                DataWriteCommand cmd = (DataWriteCommand)rCommand;
                if (!cmd.isSuccessful()) {
                    if (trace) {
                        log.tracef("Skipping the replication of the command as it did not succeed on primary owner (%s).", cmd);
                    }
                    return rv;
                }
                EntryVersion nextVersion = this.svm.incrementVersion(info.segmentId());
                Metadata metadata = ScatteredDistributionInterceptor.addVersion(((MetadataAwareCommand)((Object)cmd)).getMetadata(), nextVersion);
                cacheEntry.setMetadata(metadata);
                ((MetadataAwareCommand)((Object)cmd)).setMetadata(metadata);
                if (cmd.loadType() != VisitableCommand.LoadType.DONT_LOAD) {
                    this.commitSingleEntryIfNoChange(seenValue, seenVersion, cacheEntry, rCtx, cmd);
                } else {
                    this.commitSingleEntryIfNewer(cacheEntry, rCtx, cmd);
                }
                if (cmd.isReturnValueExpected()) {
                    return new MetadataImmortalCacheValue(rv, metadata);
                }
                command.setFlagsBitSet(command.getFlagsBitSet() & ((FlagBitSets.IGNORE_RETURN_VALUES | FlagBitSets.SKIP_REMOTE_LOOKUP) ^ 0xFFFFFFFFFFFFFFFFL));
                return metadata.version();
            });
        }
        assert (cacheEntry == null || command.hasAnyFlag(FlagBitSets.SKIP_OWNERSHIP_CHECK));
        if (cacheEntry == null) {
            this.entryFactory.wrapExternalEntry(ctx, command.getKey(), null, false, true);
            contextEntry = ctx.lookupEntry(command.getKey());
        } else {
            contextEntry = cacheEntry;
        }
        contextEntry.setMetadata(((MetadataAwareCommand)command).getMetadata());
        return this.invokeNextThenApply(ctx, command, (rCtx, rCommand, rv) -> {
            this.commitSingleEntryIfNewer(contextEntry, rCtx, rCommand);
            return null;
        });
    }

    private <T extends DataWriteCommand & MetadataAwareCommand> Object handleWriteOnOriginPrimary(InvocationContext ctx, T command, Object rv, CacheEntry cacheEntry, Object seenValue, EntryVersion seenVersion, CacheTopology cacheTopology, DistributionInfo info) {
        if (!command.isSuccessful()) {
            if (trace) {
                log.tracef("Skipping the replication of the command as it did not succeed on primary owner (%s).", command);
            }
            return rv;
        }
        EntryVersion nextVersion = this.svm.incrementVersion(info.segmentId());
        Metadata metadata = ScatteredDistributionInterceptor.addVersion(((MetadataAwareCommand)command).getMetadata(), nextVersion);
        cacheEntry.setMetadata(metadata);
        ((MetadataAwareCommand)command).setMetadata(metadata);
        boolean committed = command.loadType() != VisitableCommand.LoadType.DONT_LOAD ? this.commitSingleEntryIfNoChange(seenValue, seenVersion, cacheEntry, ctx, command) : this.commitSingleEntryIfNewer(cacheEntry, ctx, command);
        command.setValueMatcher(ValueMatcher.MATCH_ALWAYS);
        command.addFlags(FlagBitSets.SKIP_OWNERSHIP_CHECK);
        Address backup = this.getNextMember(cacheTopology);
        if (backup != null) {
            CompletableFuture<Map<Address, Response>> rpcFuture = this.rpcManager.invokeRemotelyAsync(Collections.singletonList(backup), command, this.defaultSyncOptions);
            rpcFuture.thenRun(() -> {
                if (committed && !command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                    this.svm.scheduleKeyInvalidation(command.getKey(), cacheEntry.getMetadata().version(), cacheEntry.isRemoved());
                }
            });
            return ScatteredDistributionInterceptor.asyncValue(rpcFuture.thenApply(ignore -> rv));
        }
        return rv;
    }

    private <T extends DataWriteCommand & MetadataAwareCommand> Object handleWritePrimaryResponse(InvocationContext ctx, T command, Map<Address, Response> responseMap) {
        Object value;
        EntryVersion version;
        Response response = ScatteredDistributionInterceptor.getSingleResponse(responseMap);
        if (!response.isSuccessful()) {
            command.fail();
            if (response instanceof UnsuccessfulResponse) {
                return ((UnsuccessfulResponse)response).getResponseValue();
            }
            throw new CacheException("Unexpected response " + response);
        }
        Object responseValue = ((SuccessfulResponse)response).getResponseValue();
        if (command.isReturnValueExpected()) {
            if (!(responseValue instanceof MetadataImmortalCacheValue)) {
                throw new CacheException("Expected MetadataImmortalCacheValue as response but it is " + responseValue);
            }
            MetadataImmortalCacheValue micv = (MetadataImmortalCacheValue)responseValue;
            version = micv.getMetadata().version();
            value = micv.getValue();
        } else {
            if (!(responseValue instanceof EntryVersion)) {
                throw new CacheException("Expected NumericVersion as response but it is " + responseValue);
            }
            version = (EntryVersion)responseValue;
            value = null;
        }
        Metadata metadata = ScatteredDistributionInterceptor.addVersion(((MetadataAwareCommand)command).getMetadata(), version);
        this.entryFactory.wrapExternalEntry(ctx, command.getKey(), null, false, true);
        CacheEntry cacheEntry = ctx.lookupEntry(command.getKey());
        cacheEntry.setMetadata(metadata);
        command.setValueMatcher(ValueMatcher.MATCH_ALWAYS);
        return this.invokeNextThenApply(ctx, command, (ctx1, command1, rv) -> {
            DataWriteCommand cmd = (DataWriteCommand)command1;
            boolean committed = this.commitSingleEntryIfNewer(cacheEntry, ctx1, cmd);
            if (committed && !cmd.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                this.svm.scheduleKeyInvalidation(cmd.getKey(), cacheEntry.getMetadata().version(), cacheEntry.isRemoved());
            }
            return value;
        });
    }

    private <T extends FlagAffectedCommand & TopologyAffectedCommand> LocalizedCacheTopology checkTopology(T command) {
        LocalizedCacheTopology cacheTopology = this.distributionManager.getCacheTopology();
        if (!command.hasAnyFlag(FlagBitSets.SKIP_OWNERSHIP_CHECK) && ((TopologyAffectedCommand)command).getTopologyId() >= 0 && ((TopologyAffectedCommand)command).getTopologyId() != cacheTopology.getTopologyId()) {
            throw new OutdatedTopologyException(((TopologyAffectedCommand)command).getTopologyId());
        }
        if (trace) {
            log.tracef("%s has topology %d (current is %d)", command, ((TopologyAffectedCommand)command).getTopologyId(), cacheTopology.getTopologyId());
        }
        return cacheTopology;
    }

    private Object commitSingleEntryOnReturn(InvocationContext ctx, DataWriteCommand command, CacheEntry cacheEntry, Object prevValue, EntryVersion prevVersion) {
        if (command.loadType() != VisitableCommand.LoadType.DONT_LOAD) {
            return this.invokeNextThenAccept(ctx, command, (rCtx, rCommand, rv) -> {
                DataWriteCommand dataWriteCommand = (DataWriteCommand)rCommand;
                boolean committed = this.commitSingleEntryIfNoChange(prevValue, prevVersion, cacheEntry, rCtx, rCommand);
                if (committed && rCtx.isOriginLocal() && !dataWriteCommand.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                    this.svm.scheduleKeyInvalidation(dataWriteCommand.getKey(), cacheEntry.getMetadata().version(), cacheEntry.isRemoved());
                }
            });
        }
        return this.invokeNextThenAccept(ctx, command, this.dataWriteCommandNoReadHandler);
    }

    private boolean commitSingleEntryIfNewer(CacheEntry entry, InvocationContext ctx, VisitableCommand command) {
        if (!entry.isChanged() && trace) {
            log.tracef("Entry has not changed, not committing", new Object[0]);
            return false;
        }
        if (entry.isRemoved()) {
            entry.setValue(null);
        }
        ByRef previousValue = new ByRef(null);
        ByRef previousMetadata = new ByRef(null);
        ByRef.Boolean successful = new ByRef.Boolean(false);
        this.dataContainer.compute(entry.getKey(), (key, oldEntry, factory) -> {
            InequalVersionComparisonResult comparisonResult;
            Metadata newMetadata = entry.getMetadata();
            if (oldEntry == null) {
                if (entry.getValue() == null && newMetadata == null) {
                    if (trace) {
                        log.trace("No previous record and this is a removal, not committing anything.");
                    }
                    return null;
                }
                if (trace) {
                    log.trace("Committing new entry " + entry);
                }
                successful.set(true);
                return factory.create(entry);
            }
            Metadata oldMetadata = oldEntry.getMetadata();
            if (oldMetadata == null || oldMetadata.version() == null || newMetadata == null || newMetadata.version() == null || (comparisonResult = oldMetadata.version().compareTo(newMetadata.version())) == InequalVersionComparisonResult.BEFORE || oldMetadata instanceof RemoteMetadata && comparisonResult == InequalVersionComparisonResult.EQUAL) {
                previousValue.set(oldEntry.getValue());
                previousValue.set((Object)oldMetadata);
                if (trace) {
                    log.tracef("Committing entry %s, replaced %s", entry, oldEntry);
                }
                successful.set(true);
                if (entry.getValue() != null || newMetadata != null) {
                    return factory.create(entry);
                }
                return null;
            }
            if (trace) {
                log.tracef("Not committing %s, current entry is %s", entry, oldEntry);
            }
            return oldEntry;
        });
        boolean created = entry.isCreated();
        boolean removed = entry.isRemoved();
        boolean expired = false;
        if (removed && entry instanceof MVCCEntry) {
            expired = ((MVCCEntry)entry).isExpired();
        }
        if (successful.get()) {
            NotifyHelper.entryCommitted(this.cacheNotifier, this.functionalNotifier, created, removed, expired, entry, ctx, (FlagAffectedCommand)command, previousValue.get(), (Metadata)previousMetadata.get());
            return true;
        }
        return false;
    }

    private boolean commitSingleEntryIfNoChange(Object seenValue, EntryVersion seenVersion, CacheEntry entry, InvocationContext ctx, VisitableCommand command) {
        if (!entry.isChanged() && trace) {
            log.tracef("Entry has not changed, not committing", new Object[0]);
            return false;
        }
        if (entry.isRemoved()) {
            entry.setValue(null);
        }
        ByRef previousValue = new ByRef(null);
        ByRef previousMetadata = new ByRef(null);
        ByRef.Boolean successful = new ByRef.Boolean(false);
        this.dataContainer.compute(entry.getKey(), (key, oldEntry, factory) -> {
            InequalVersionComparisonResult comparisonResult;
            EntryVersion oldVersion;
            Metadata oldMetadata;
            Metadata newMetadata;
            block19: {
                block20: {
                    block18: {
                        newMetadata = entry.getMetadata();
                        if (oldEntry == null) {
                            if (seenValue != null) {
                                if (trace) {
                                    log.trace("Non-null value in context, not committing");
                                }
                                throw new ConcurrentChangeException();
                            }
                            if (entry.getValue() == null && newMetadata == null) {
                                if (trace) {
                                    log.trace("No previous record and this is a removal, not committing anything.");
                                }
                                return null;
                            }
                            if (trace) {
                                log.trace("Committing new entry " + entry);
                            }
                            successful.set(true);
                            return factory.create(entry);
                        }
                        oldMetadata = oldEntry.getMetadata();
                        EntryVersion entryVersion = oldVersion = oldMetadata == null ? null : oldMetadata.version();
                        if (oldVersion != null) break block18;
                        if (seenVersion != null) {
                            if (trace) {
                                log.tracef("Current version is null but seen version is %s, throwing", seenVersion);
                            }
                            throw new ConcurrentChangeException();
                        }
                        break block19;
                    }
                    if (seenVersion != null) break block20;
                    if (oldEntry.canExpire() && oldEntry.isExpired(this.timeService.wallClockTime())) {
                        if (trace) {
                            log.trace("Current entry is expired and therefore we haven't seen it");
                        }
                        break block19;
                    } else {
                        if (trace) {
                            log.tracef("Current version is %s but seen version is null, throwing", oldVersion);
                        }
                        throw new ConcurrentChangeException();
                    }
                }
                if (seenVersion.compareTo(oldVersion) != InequalVersionComparisonResult.EQUAL) {
                    if (trace) {
                        log.tracef("Current version is %s but seen version is %s, throwing", oldVersion, seenVersion);
                    }
                    throw new ConcurrentChangeException();
                }
            }
            if (oldVersion == null || newMetadata == null || newMetadata.version() == null || (comparisonResult = oldMetadata.version().compareTo(newMetadata.version())) == InequalVersionComparisonResult.BEFORE || oldMetadata instanceof RemoteMetadata && comparisonResult == InequalVersionComparisonResult.EQUAL) {
                previousValue.set(oldEntry.getValue());
                previousValue.set((Object)oldMetadata);
                if (trace) {
                    log.tracef("Committing entry %s, replaced %s", entry, oldEntry);
                }
                successful.set(true);
                if (entry.getValue() == null && newMetadata == null) {
                    return null;
                }
                return factory.create(entry);
            }
            if (trace) {
                log.tracef("Not committing %s, current entry is %s", entry, oldEntry);
            }
            return oldEntry;
        });
        boolean created = entry.isCreated();
        boolean removed = entry.isRemoved();
        boolean expired = false;
        if (removed && entry instanceof MVCCEntry) {
            expired = ((MVCCEntry)entry).isExpired();
        }
        if (successful.get()) {
            NotifyHelper.entryCommitted(this.cacheNotifier, this.functionalNotifier, created, removed, expired, entry, ctx, (FlagAffectedCommand)command, previousValue.get(), (Metadata)previousMetadata.get());
            return true;
        }
        return false;
    }

    private EntryVersion getVersionOrNull(CacheEntry cacheEntry) {
        if (cacheEntry == null) {
            return null;
        }
        Metadata metadata = cacheEntry.getMetadata();
        if (metadata != null) {
            return metadata.version();
        }
        return null;
    }

    private static Metadata addVersion(Metadata metadata, EntryVersion nextVersion) {
        Metadata.Builder builder = metadata == null ? new EmbeddedMetadata.Builder() : metadata.builder();
        metadata = builder.version(nextVersion).build();
        return metadata;
    }

    private Address getNextMember(CacheTopology cacheTopology) {
        if (cacheTopology.getTopologyId() == this.cachedNextMemberTopology) {
            return this.cachedNextMember;
        }
        ConsistentHash ch = cacheTopology.getWriteConsistentHash();
        List<Address> members = ch.getMembers();
        Address address = this.rpcManager.getAddress();
        Address nextMember = null;
        if (members.size() > 1) {
            for (int i = 0; i < members.size(); ++i) {
                Address member = members.get(i);
                if (!member.equals(address)) continue;
                if (i + 1 < members.size()) {
                    nextMember = members.get(i + 1);
                    break;
                }
                nextMember = members.get(0);
                break;
            }
        }
        this.cachedNextMember = nextMember;
        this.cachedNextMemberTopology = cacheTopology.getTopologyId();
        return nextMember;
    }

    private Object handleReadCommand(InvocationContext ctx, AbstractDataCommand command) throws Throwable {
        LocalizedCacheTopology cacheTopology = this.checkTopology(command);
        DistributionInfo info = cacheTopology.getDistribution(command.getKey());
        if (info.primary() == null) {
            throw AllOwnersLostException.INSTANCE;
        }
        if (info.isPrimary()) {
            if (trace) {
                log.tracef("In topology %d this is primary owner", cacheTopology.getTopologyId());
            }
            return this.invokeNext(ctx, command);
        }
        if (command.hasAnyFlag(FlagBitSets.SKIP_OWNERSHIP_CHECK)) {
            if (trace) {
                log.trace("Ignoring ownership");
            }
            return this.invokeNext(ctx, command);
        }
        if (ctx.isOriginLocal()) {
            if (this.isLocalModeForced(command) || command.hasAnyFlag(FlagBitSets.SKIP_REMOTE_LOOKUP)) {
                if (ctx.lookupEntry(command.getKey()) == null) {
                    this.entryFactory.wrapExternalEntry(ctx, command.getKey(), NullCacheEntry.getInstance(), false, false);
                }
                return this.invokeNext(ctx, command);
            }
            ClusteredGetCommand clusteredGetCommand = this.cf.buildClusteredGetCommand(command.getKey(), command.getFlagsBitSet());
            clusteredGetCommand.setTopologyId(command.getTopologyId());
            CompletableFuture<Map<Address, Response>> rpcFuture = this.rpcManager.invokeRemotelyAsync(Collections.singletonList(info.primary()), clusteredGetCommand, this.syncIgnoreLeavers);
            Object key = clusteredGetCommand.getKey();
            return this.asyncInvokeNext(ctx, (VisitableCommand)command, (CompletableFuture<?>)rpcFuture.thenAccept(responseMap -> {
                Response response = ScatteredDistributionInterceptor.getSingleResponse(responseMap);
                if (response.isSuccessful()) {
                    InternalCacheValue value = (InternalCacheValue)((SuccessfulResponse)response).getResponseValue();
                    if (value != null) {
                        InternalCacheEntry cacheEntry = value.toInternalCacheEntry(key);
                        this.entryFactory.wrapExternalEntry(ctx, key, cacheEntry, true, false);
                    } else {
                        this.entryFactory.wrapExternalEntry(ctx, key, NullCacheEntry.getInstance(), false, false);
                    }
                } else {
                    if (response instanceof UnsureResponse) {
                        throw OutdatedTopologyException.INSTANCE;
                    }
                    if (response instanceof CacheNotFoundResponse) {
                        throw AllOwnersLostException.INSTANCE;
                    }
                    throw new IllegalArgumentException("Unexpected response " + response);
                }
            }));
        }
        return UnsureResponse.INSTANCE;
    }

    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitComputeIfAbsentCommand(InvocationContext ctx, ComputeIfAbsentCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitComputeCommand(InvocationContext ctx, ComputeCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
        LocalizedCacheTopology cacheTopology = this.checkTopology(command);
        Map<Object, Object> originalMap = command.getMap();
        if (command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
            this.extractAndSetMetadata(ctx, command, originalMap);
            return this.invokeNextThenAccept(ctx, command, this.putMapCommandHandler);
        }
        if (ctx.isOriginLocal()) {
            return this.invokeNextThenApply(ctx, command, (returnCtx, returnCommand, rv) -> this.handlePutMapOnOrigin(returnCtx, (PutMapCommand)returnCommand, rv, originalMap, cacheTopology));
        }
        if (command.isForwarded()) {
            this.extractAndSetMetadata(ctx, command, originalMap);
            return this.invokeNextThenAccept(ctx, command, this.putMapCommandHandler);
        }
        HashMap<Object, VersionedResult> versionMap = new HashMap<Object, VersionedResult>(originalMap.size());
        for (Map.Entry<Object, Object> entry : originalMap.entrySet()) {
            Object key = entry.getKey();
            CacheEntry cacheEntry = ctx.lookupEntry(key);
            if (cacheEntry == null) {
                throw new IllegalStateException("Not wrapped " + key);
            }
            EntryVersion version = this.svm.incrementVersion(this.keyPartitioner.getSegment(key));
            cacheEntry.setMetadata(ScatteredDistributionInterceptor.addVersion(command.getMetadata(), version));
            versionMap.put(key, new VersionedResult(null, version));
        }
        command.setFlagsBitSet(command.getFlagsBitSet() & (FlagBitSets.IGNORE_RETURN_VALUES ^ 0xFFFFFFFFFFFFFFFFL));
        return this.invokeNextThenApply(ctx, command, (ctx1, command1, rv) -> {
            for (Object key : ((PutMapCommand)command1).getAffectedKeys()) {
                this.commitSingleEntryIfNewer(ctx1.lookupEntry(key), ctx1, command1);
            }
            if (rv instanceof Map) {
                Map resultMap = (Map)rv;
                for (Map.Entry entry : resultMap.entrySet()) {
                    versionMap.compute(entry.getKey(), (k, vr) -> new VersionedResult(entry.getValue(), vr.version));
                }
            }
            return versionMap;
        });
    }

    private Object handlePutMapOnOrigin(InvocationContext ctx, PutMapCommand command, Object rv, Map<Object, Object> originalMap, LocalizedCacheTopology cacheTopology) {
        Address backup;
        if (!command.isSuccessful()) {
            return null;
        }
        Map<Object, CacheEntry> lookedUpEntries = ctx.getLookedUpEntries();
        HashMap<Address, Map> remoteEntries = new HashMap<Address, Map>();
        HashMap<Object, MetadataImmortalCacheValue> localEntries = new HashMap<Object, MetadataImmortalCacheValue>();
        for (Map.Entry<Object, Object> entry : originalMap.entrySet()) {
            Object key = entry.getKey();
            DistributionInfo info = cacheTopology.getDistribution(key);
            if (info.isPrimary()) {
                CacheEntry ctxEntry = lookedUpEntries.get(key);
                if (ctxEntry == null) {
                    throw new CacheException("Entry not looked up for " + key);
                }
                EntryVersion version = this.svm.incrementVersion(info.segmentId());
                Metadata metadata = new EmbeddedMetadata.Builder().version(version).build();
                ctxEntry.setMetadata(metadata);
                localEntries.put(key, new MetadataImmortalCacheValue(entry.getValue(), metadata));
                this.commitSingleEntryIfNewer(ctxEntry, ctx, command);
                continue;
            }
            if (info.primary() == null) {
                throw AllOwnersLostException.INSTANCE;
            }
            Map currentEntries = remoteEntries.computeIfAbsent(info.primary(), k -> new HashMap());
            currentEntries.put(key, entry.getValue());
        }
        PutMapFuture allFuture = new PutMapFuture(command, remoteEntries.size(), (Map)rv);
        if (!localEntries.isEmpty() && (backup = this.getNextMember(cacheTopology)) != null) {
            allFuture.counter++;
            PutMapCommand backupCommand = this.cf.buildPutMapCommand(localEntries, command.getMetadata(), command.getFlagsBitSet());
            backupCommand.setForwarded(true);
            this.rpcManager.invokeRemotelyAsync(Collections.singleton(backup), backupCommand, this.defaultSyncOptions).whenComplete((r, t) -> {
                if (t != null) {
                    allFuture.completeExceptionally((Throwable)t);
                } else {
                    PutMapFuture putMapFuture = allFuture;
                    synchronized (putMapFuture) {
                        if (--allFuture.counter == 0) {
                            allFuture.complete(allFuture.map);
                        }
                    }
                    for (Map.Entry entry : localEntries.entrySet()) {
                        this.svm.scheduleKeyInvalidation(entry.getKey(), ((InternalCacheValue)entry.getValue()).getMetadata().version(), false);
                    }
                }
            });
        }
        for (Map.Entry ownerEntry : remoteEntries.entrySet()) {
            Address owner = (Address)ownerEntry.getKey();
            PutMapCommand toPrimary = this.cf.buildPutMapCommand((Map)ownerEntry.getValue(), command.getMetadata(), command.getFlagsBitSet());
            CompletableFuture<Map<Address, Response>> rpcFuture = this.rpcManager.invokeRemotelyAsync(Collections.singletonList(owner), toPrimary, this.defaultSyncOptions);
            rpcFuture.whenComplete((responseMap, t) -> {
                if (t != null) {
                    allFuture.completeExceptionally((Throwable)t);
                    return;
                }
                SuccessfulResponse response = ScatteredDistributionInterceptor.getSuccessfulResponseOrFail(responseMap, allFuture, null);
                if (response == null) {
                    return;
                }
                Object responseValue = response.getResponseValue();
                if (!(responseValue instanceof Map)) {
                    allFuture.completeExceptionally(new CacheException("Response from " + owner + ": expected Map<?, VersionedResult> but it is " + responseValue).fillInStackTrace());
                    return;
                }
                Map versions = (Map)responseValue;
                PutMapFuture putMapFuture = allFuture;
                synchronized (putMapFuture) {
                    if (allFuture.isDone()) {
                        return;
                    }
                    for (Map.Entry entry : versions.entrySet()) {
                        this.entryFactory.wrapExternalEntry(ctx, entry.getKey(), null, false, true);
                        CacheEntry cacheEntry = ctx.lookupEntry(entry.getKey());
                        VersionedResult result = (VersionedResult)entry.getValue();
                        if (result.result != null) {
                            if (allFuture.map == null) {
                                allFuture.map = new HashMap();
                            }
                            allFuture.map.put(entry.getKey(), result.result);
                        }
                        Metadata metadata = ScatteredDistributionInterceptor.addVersion(command.getMetadata(), result.version);
                        cacheEntry.setValue(originalMap.get(entry.getKey()));
                        cacheEntry.setMetadata(metadata);
                        cacheEntry.setChanged(true);
                        boolean committed = this.commitSingleEntryIfNewer(cacheEntry, ctx, command);
                        if (!committed || command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) continue;
                        this.svm.scheduleKeyInvalidation(entry.getKey(), result.version, false);
                    }
                    if (--allFuture.counter == 0) {
                        allFuture.complete(allFuture.map);
                    }
                }
            });
        }
        return ScatteredDistributionInterceptor.asyncValue(allFuture);
    }

    protected void extractAndSetMetadata(InvocationContext ctx, PutMapCommand command, Map<Object, Object> originalMap) {
        HashMap<Object, Object> valueMap = new HashMap<Object, Object>(originalMap.size());
        for (Map.Entry<Object, Object> entry : originalMap.entrySet()) {
            Object key = entry.getKey();
            CacheEntry cacheEntry = ctx.lookupEntry(key);
            if (cacheEntry == null) {
                this.entryFactory.wrapExternalEntry(ctx, key, null, false, true);
                cacheEntry = ctx.lookupEntry(key);
            }
            InternalCacheValue value = (InternalCacheValue)entry.getValue();
            Metadata entryMetadata = command.getMetadata() == null ? value.getMetadata() : command.getMetadata().builder().version(value.getMetadata().version()).build();
            cacheEntry.setMetadata(entryMetadata);
            valueMap.put(key, value.getValue());
        }
        command.setMap(valueMap);
    }

    @Override
    public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
        return this.handleReadCommand(ctx, command);
    }

    @Override
    public Object visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) throws Throwable {
        return this.handleReadCommand(ctx, command);
    }

    @Override
    public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) throws Throwable {
        LocalizedCacheTopology cacheTopology = this.checkTopology(command);
        if (command.hasAnyFlag(FlagBitSets.CACHE_MODE_LOCAL | FlagBitSets.SKIP_REMOTE_LOOKUP | FlagBitSets.SKIP_OWNERSHIP_CHECK)) {
            return this.invokeNext(ctx, command);
        }
        if (ctx.isOriginLocal()) {
            HashMap<Address, List> remoteKeys = new HashMap<Address, List>();
            for (Object key : command.getKeys()) {
                DistributionInfo info = cacheTopology.getDistribution(key);
                if (info.primary() == null) {
                    throw AllOwnersLostException.INSTANCE;
                }
                if (info.isPrimary()) continue;
                remoteKeys.computeIfAbsent(info.primary(), k -> new ArrayList()).add(key);
            }
            if (remoteKeys.isEmpty()) {
                return this.invokeNext(ctx, command);
            }
            ClusteringInterceptor.ClusteredGetAllFuture sync = new ClusteringInterceptor.ClusteredGetAllFuture(this, remoteKeys.size(), command);
            for (Map.Entry remote : remoteKeys.entrySet()) {
                List keys = (List)remote.getValue();
                ClusteredGetAllCommand clusteredGetAllCommand = this.cf.buildClusteredGetAllCommand(keys, command.getFlagsBitSet(), null);
                clusteredGetAllCommand.setTopologyId(command.getTopologyId());
                CompletableFuture<Map<Address, Response>> rpcFuture = this.rpcManager.invokeRemotelyAsync(Collections.singleton(remote.getKey()), clusteredGetAllCommand, this.syncIgnoreLeavers);
                rpcFuture.whenComplete((responseMap, throwable) -> this.handleGetAllResponse((Map<Address, Response>)responseMap, (Throwable)throwable, ctx, keys, sync));
            }
            return this.asyncInvokeNext(ctx, (VisitableCommand)command, sync);
        }
        for (Object key : command.getKeys()) {
            if (ctx.lookupEntry(key) != null) continue;
            return UnsureResponse.INSTANCE;
        }
        return this.invokeNext(ctx, command);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleGetAllResponse(Map<Address, Response> responseMap, Throwable throwable, InvocationContext ctx, List<?> keys, ClusteringInterceptor.ClusteredGetAllFuture allFuture) {
        if (throwable != null) {
            allFuture.completeExceptionally(throwable);
            return;
        }
        SuccessfulResponse response = ScatteredDistributionInterceptor.getSuccessfulResponseOrFail(responseMap, allFuture, rsp -> allFuture.completeExceptionally(rsp instanceof UnsureResponse ? OutdatedTopologyException.INSTANCE : AllOwnersLostException.INSTANCE));
        if (response == null) {
            return;
        }
        Object responseValue = response.getResponseValue();
        if (!(responseValue instanceof InternalCacheValue[])) {
            allFuture.completeExceptionally(new IllegalStateException("Unexpected response value: " + responseValue));
            return;
        }
        Object[] values = (InternalCacheValue[])responseValue;
        if (keys.size() != values.length) {
            allFuture.completeExceptionally(new CacheException("Request and response lengths differ: keys=" + keys + ", response=" + Arrays.toString(values)));
            return;
        }
        ClusteringInterceptor.ClusteredGetAllFuture clusteredGetAllFuture = allFuture;
        synchronized (clusteredGetAllFuture) {
            if (allFuture.isDone()) {
                return;
            }
            for (int i = 0; i < values.length; ++i) {
                Object key = keys.get(i);
                Object value = values[i];
                NullCacheEntry entry = value == null ? NullCacheEntry.getInstance() : value.toInternalCacheEntry(key);
                this.entryFactory.wrapExternalEntry(ctx, key, entry, true, false);
            }
            if (--allFuture.counter == 0) {
                allFuture.complete(null);
            }
        }
    }

    @Override
    public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
        this.svm.clearInvalidations();
        if (ctx.isOriginLocal() && !this.isLocalModeForced(command)) {
            RpcOptions rpcOptions = this.isSynchronous(command) ? this.syncIgnoreLeavers : this.defaultAsyncOptions;
            return ScatteredDistributionInterceptor.makeStage(this.asyncInvokeNext(ctx, (VisitableCommand)command, this.rpcManager.invokeRemotelyAsync(null, command, rpcOptions))).thenAccept(ctx, command, this.clearHandler);
        }
        return this.invokeNextThenAccept(ctx, command, this.clearHandler);
    }

    @Override
    public Object visitApplyDeltaCommand(InvocationContext ctx, ApplyDeltaCommand command) throws Throwable {
        throw new UnsupportedOperationException();
    }

    @Override
    public Object visitReadOnlyKeyCommand(InvocationContext ctx, ReadOnlyKeyCommand command) throws Throwable {
        throw new UnsupportedOperationException();
    }

    @Override
    public Object visitReadOnlyManyCommand(InvocationContext ctx, ReadOnlyManyCommand command) throws Throwable {
        throw new UnsupportedOperationException();
    }

    @Override
    public Object visitWriteOnlyKeyCommand(InvocationContext ctx, WriteOnlyKeyCommand command) throws Throwable {
        throw new UnsupportedOperationException();
    }

    @Override
    public Object visitReadWriteKeyValueCommand(InvocationContext ctx, ReadWriteKeyValueCommand command) throws Throwable {
        throw new UnsupportedOperationException();
    }

    @Override
    public Object visitReadWriteKeyCommand(InvocationContext ctx, ReadWriteKeyCommand command) throws Throwable {
        throw new UnsupportedOperationException();
    }

    @Override
    public Object visitWriteOnlyManyEntriesCommand(InvocationContext ctx, WriteOnlyManyEntriesCommand command) throws Throwable {
        throw new UnsupportedOperationException();
    }

    @Override
    public Object visitWriteOnlyKeyValueCommand(InvocationContext ctx, WriteOnlyKeyValueCommand command) throws Throwable {
        throw new UnsupportedOperationException();
    }

    @Override
    public Object visitWriteOnlyManyCommand(InvocationContext ctx, WriteOnlyManyCommand command) throws Throwable {
        throw new UnsupportedOperationException();
    }

    @Override
    public Object visitReadWriteManyCommand(InvocationContext ctx, ReadWriteManyCommand command) throws Throwable {
        throw new UnsupportedOperationException();
    }

    @Override
    public Object visitReadWriteManyEntriesCommand(InvocationContext ctx, ReadWriteManyEntriesCommand command) throws Throwable {
        throw new UnsupportedOperationException();
    }

    @Override
    public final Object visitGetKeysInGroupCommand(InvocationContext ctx, GetKeysInGroupCommand command) throws Throwable {
        Object groupName = command.getGroupName();
        if (command.isGroupOwner()) {
            return this.invokeNext(ctx, command);
        }
        CompletionStage future = this.rpcManager.invokeRemotelyAsync(Collections.singleton(this.groupManager.getPrimaryOwner(groupName)), command, this.defaultSyncOptions).thenAccept(responses -> {
            Response response;
            if (!responses.isEmpty() && (response = (Response)responses.values().iterator().next()) instanceof SuccessfulResponse) {
                List cacheEntries = (List)((SuccessfulResponse)response).getResponseValue();
                for (CacheEntry entry : cacheEntries) {
                    this.entryFactory.wrapExternalEntry(ctx, entry.getKey(), entry, true, false);
                }
            }
        });
        return this.asyncInvokeNext(ctx, (VisitableCommand)command, (CompletableFuture<?>)future);
    }

    @Override
    protected Log getLog() {
        return log;
    }

    private static class PutMapFuture
    extends CompletableFuture<Map<Object, Object>> {
        private PutMapCommand command;
        private int counter;
        private Map<Object, Object> map;

        public PutMapFuture(PutMapCommand command, int counter, Map<Object, Object> map) {
            this.command = command;
            this.counter = counter;
            this.map = map;
        }

        @Override
        public synchronized boolean completeExceptionally(Throwable ex) {
            this.command.fail();
            return super.completeExceptionally(ex);
        }
    }
}

