/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.distribution;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.read.AbstractDataCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.tx.TransactionBoundaryCommand;
import org.infinispan.commands.tx.VersionedCommitCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.ValueMatcher;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.configuration.cache.Configurations;
import org.infinispan.container.EntryFactory;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.versioning.EntryVersionsMap;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.distribution.BaseDistributionInterceptor;
import org.infinispan.partitionhandling.impl.PartitionHandlingManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.UnsureResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.transaction.impl.AbstractCacheTransaction;
import org.infinispan.transaction.impl.LocalTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.DeltaCompositeKeyUtil;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class TxDistributionInterceptor
extends BaseDistributionInterceptor {
    /** Class-wide logger; never reassigned, hence {@code final}. */
    private static final Log log = LogFactory.getLog(TxDistributionInterceptor.class);
    /** Whether trace logging was enabled when this class was initialized (sampled once). */
    private static final boolean trace = log.isTraceEnabled();
    /** Assigned in {@link #inject(PartitionHandlingManager)}; used when a recipient answers with {@code CacheNotFoundResponse}. */
    private PartitionHandlingManager partitionHandlingManager;
    /** Assigned in {@link #start()} from {@code Configurations.isVersioningEnabled(cacheConfiguration)}. */
    private boolean useClusteredWriteSkewCheck;

    /**
     * Injection hook that stores the partition handling manager for later use.
     *
     * @param partitionHandlingManager the manager to keep a reference to
     */
    @Inject
    public void inject(final PartitionHandlingManager partitionHandlingManager) {
        this.partitionHandlingManager = partitionHandlingManager;
    }

    /**
     * Start hook: caches whether versioning is enabled for this cache's configuration,
     * which later decides if remotely fetched versions are recorded on the transaction.
     */
    @Start
    public void start() {
        useClusteredWriteSkewCheck = Configurations.isVersioningEnabled(cacheConfiguration);
    }

    /**
     * Handles a transactional replace.
     * <p>
     * For invocations that originate locally, a return handler is registered that sets the command's
     * value matcher from the outcome of the execution: {@code MATCH_ALWAYS} when the command reports
     * success, {@code MATCH_NEVER} otherwise. The command is then routed through
     * {@code handleTxWriteCommand}.
     *
     * @param ctx     the invocation context
     * @param command the replace command being visited
     * @return the future produced by the write handling path
     * @throws Throwable if the write handling path fails
     */
    @Override
    public CompletableFuture<Void> visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
        if (ctx.isOriginLocal()) {
            ctx.onReturn((rCtx, rCommand, rv, throwable) -> {
                // Cast once up front so the local is definitely assigned before it is dereferenced.
                ReplaceCommand replaceCommand = (ReplaceCommand) rCommand;
                replaceCommand.setValueMatcher(replaceCommand.isSuccessful() ? ValueMatcher.MATCH_ALWAYS : ValueMatcher.MATCH_NEVER);
                return null;
            });
        }
        return this.handleTxWriteCommand(ctx, command, command.getKey());
    }

    /**
     * Handles a transactional remove.
     * <p>
     * For invocations that originate locally, a return handler is registered that sets the command's
     * value matcher from the outcome of the execution: {@code MATCH_ALWAYS} when the command reports
     * success, {@code MATCH_NEVER} otherwise. The command is then routed through
     * {@code handleTxWriteCommand}.
     *
     * @param ctx     the invocation context
     * @param command the remove command being visited
     * @return the future produced by the write handling path
     * @throws Throwable if the write handling path fails
     */
    @Override
    public CompletableFuture<Void> visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
        if (ctx.isOriginLocal()) {
            ctx.onReturn((rCtx, rCommand, rv, throwable) -> {
                // Cast once up front so the local is definitely assigned before it is dereferenced.
                RemoveCommand removeCommand = (RemoveCommand) rCommand;
                removeCommand.setValueMatcher(removeCommand.isSuccessful() ? ValueMatcher.MATCH_ALWAYS : ValueMatcher.MATCH_NEVER);
                return null;
            });
        }
        return this.handleTxWriteCommand(ctx, command, command.getKey());
    }

    /**
     * Handles a put.
     * <p>
     * Commands flagged {@code PUT_FOR_EXTERNAL_READ} are delegated to {@code handleNonTxWriteCommand}.
     * Otherwise, for invocations that originate locally, a return handler is registered that sets the
     * command's value matcher from the outcome of the execution ({@code MATCH_ALWAYS} on success,
     * {@code MATCH_NEVER} otherwise), and the command is routed through {@code handleTxWriteCommand}.
     *
     * @param ctx     the invocation context
     * @param command the put command being visited
     * @return the future produced by the selected write handling path
     * @throws Throwable if the selected write handling path fails
     */
    @Override
    public CompletableFuture<Void> visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
        if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
            return this.handleNonTxWriteCommand(ctx, command);
        }
        if (ctx.isOriginLocal()) {
            ctx.onReturn((rCtx, rCommand, rv, throwable) -> {
                // Cast once up front so the local is definitely assigned before it is dereferenced.
                PutKeyValueCommand putKeyValueCommand = (PutKeyValueCommand) rCommand;
                putKeyValueCommand.setValueMatcher(putKeyValueCommand.isSuccessful() ? ValueMatcher.MATCH_ALWAYS : ValueMatcher.MATCH_NEVER);
                return null;
            });
        }
        return this.handleTxWriteCommand(ctx, command, command.getKey());
    }

    /**
     * Passes a put-map command straight on to the rest of the invocation; this interceptor
     * does no work of its own for it.
     *
     * @param ctx     the invocation context
     * @param command the put-map command being visited
     * @return the future returned by continuing the invocation
     * @throws Throwable if continuing the invocation fails
     */
    @Override
    public CompletableFuture<Void> visitPutMapCommand(final InvocationContext ctx, final PutMapCommand command) throws Throwable {
        return ctx.continueInvocation();
    }

    /**
     * Delegates a get-value read to the shared read path in {@code visitGetCommand}.
     *
     * @param ctx     the invocation context
     * @param command the read command being visited
     * @return the future produced by the shared read path
     * @throws Throwable if the shared read path fails
     */
    @Override
    public CompletableFuture<Void> visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
        return visitGetCommand(ctx, command);
    }

    /**
     * Delegates a get-entry read to the shared read path in {@code visitGetCommand}.
     *
     * @param ctx     the invocation context
     * @param command the read command being visited
     * @return the future produced by the shared read path
     * @throws Throwable if the shared read path fails
     */
    @Override
    public CompletableFuture<Void> visitGetCacheEntryCommand(InvocationContext ctx, GetCacheEntryCommand command) throws Throwable {
        return visitGetCommand(ctx, command);
    }

    /**
     * Shared read path for the get visitors.
     * <p>
     * A remote fetch is issued only when all three checks pass, evaluated in this order: the
     * invocation originated locally, {@code valueIsMissing} reports the context entry as missing,
     * and {@code readNeedsRemoteValue} agrees. The invocation is continued in every case.
     *
     * @param ctx     the invocation context
     * @param command the read command carrying the key
     * @return the future returned by continuing the invocation
     * @throws Throwable if the remote fetch or the continued invocation fails
     */
    private CompletableFuture<Void> visitGetCommand(InvocationContext ctx, AbstractDataCommand command) throws Throwable {
        Object key = command.getKey();
        CacheEntry contextEntry = ctx.lookupEntry(key);
        boolean fetchRemotely = ctx.isOriginLocal()
                && valueIsMissing(contextEntry)
                && readNeedsRemoteValue(ctx, command);
        if (fetchRemotely) {
            // Return value intentionally unused: remoteGet wraps the fetched entry into the context.
            remoteGet(ctx, key, false, command);
        }
        return ctx.continueInvocation();
    }

    /**
     * Propagates an explicit lock request to the owners of the affected keys.
     * <p>
     * Only locally originated invocations do remote work: the owners are looked up, registered on
     * the local transaction as locked nodes (all members of the consistent hash when the owner
     * lookup returns {@code null}), the command is sent synchronously, and the responses are
     * validated. The invocation is continued in every case.
     *
     * @param ctx     the transactional invocation context
     * @param command the lock control command being visited
     * @return the future returned by continuing the invocation
     * @throws Throwable if the remote call, response validation or continued invocation fails
     */
    @Override
    public CompletableFuture<Void> visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
        if (!ctx.isOriginLocal()) {
            return ctx.continueInvocation();
        }
        List<Address> owners = cdl.getOwners(DeltaCompositeKeyUtil.filterDeltaCompositeKeys(command.getKeys()));
        LocalTransaction localTx = (LocalTransaction) ctx.getCacheTransaction();
        Collection<Address> lockTargets;
        if (owners == null) {
            lockTargets = dm.getConsistentHash().getMembers();
        } else {
            lockTargets = owners;
        }
        localTx.locksAcquired(lockTargets);
        log.tracef("Registered remote locks acquired %s", (Object) owners);
        RpcOptions syncOptions = rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, DeliverOrder.NONE).build();
        // The raw owner lookup result (possibly null) is what gets passed as the recipient list.
        Map<Address, Response> responses = rpcManager.invokeRemotely(owners, command, syncOptions);
        checkTxCommandResponses(responses, command, ctx, ((LocalTransaction) ctx.getCacheTransaction()).getRemoteLocksAcquired());
        return ctx.continueInvocation();
    }

    /**
     * Sends the commit to the nodes computed by {@code getCommitNodes} when
     * {@code shouldInvokeRemoteTxCommand} allows it, validates the responses, and then continues
     * the invocation.
     *
     * @param ctx     the transactional invocation context
     * @param command the commit command being visited
     * @return the future returned by continuing the invocation
     * @throws Throwable if the remote call, response validation or continued invocation fails
     */
    @Override
    public CompletableFuture<Void> visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
        if (!shouldInvokeRemoteTxCommand(ctx)) {
            return ctx.continueInvocation();
        }
        Collection<Address> targets = getCommitNodes(ctx);
        Map<Address, Response> responses = rpcManager.invokeRemotely(targets, command, createCommitRpcOptions());
        checkTxCommandResponses(responses, command, ctx, targets);
        return ctx.continueInvocation();
    }

    /**
     * Handles the prepare phase.
     * <p>
     * Remotely originated prepares are simply continued. For local ones a return handler is
     * registered; when it is called without a throwable and {@code shouldInvokeRemoteTxCommand}
     * allows it, the prepare is sent to the owners of the affected keys and those owners are then
     * registered on the local transaction as locked nodes (all members of the write consistent
     * hash when the owner lookup returns {@code null}).
     *
     * @param ctx     the transactional invocation context
     * @param command the prepare command being visited
     * @return the future from continuing the invocation, or from registering the return handler
     * @throws Throwable if continuing the invocation or registering the handler fails
     */
    @Override
    public CompletableFuture<Void> visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
        if (ctx.isOriginLocal()) {
            return ctx.onReturn((rCtx, rCommand, rv, throwable) -> {
                if (throwable != null || !shouldInvokeRemoteTxCommand(ctx)) {
                    return null;
                }
                TxInvocationContext txCtx = (TxInvocationContext) rCtx;
                List<Address> owners = cdl.getOwners(DeltaCompositeKeyUtil.getAffectedKeysFromContext(txCtx));
                prepareOnAffectedNodes(txCtx, (PrepareCommand) rCommand, owners);
                // Reached only if the remote prepare did not throw.
                LocalTransaction localTx = (LocalTransaction) txCtx.getCacheTransaction();
                Collection<Address> lockTargets;
                if (owners == null) {
                    lockTargets = dm.getWriteConsistentHash().getMembers();
                } else {
                    lockTargets = owners;
                }
                localTx.locksAcquired(lockTargets);
                return null;
            });
        }
        return ctx.continueInvocation();
    }

    /**
     * Sends the prepare command to the given recipients and validates their responses.
     * <p>
     * {@code transactionRemotelyPrepared} is invoked in a {@code finally} block, so it runs whether
     * or not the remote call or the response validation throws.
     *
     * @param ctx        the transactional context of the prepare; cast to {@code LocalTxInvocationContext}
     *                   for response validation
     * @param command    the prepare command to send
     * @param recipients the nodes to send the command to
     */
    protected void prepareOnAffectedNodes(TxInvocationContext<?> ctx, PrepareCommand command, Collection<Address> recipients) {
        try {
            RpcOptions prepareOptions = createPrepareRpcOptions();
            Map<Address, Response> responses = rpcManager.invokeRemotely(recipients, command, prepareOptions);
            checkTxCommandResponses(responses, command, (LocalTxInvocationContext) ctx, recipients);
        } finally {
            transactionRemotelyPrepared(ctx);
        }
    }

    /**
     * Sends the rollback to the nodes computed by {@code getCommitNodes} when
     * {@code shouldInvokeRemoteTxCommand} allows it, validates the responses, and then continues
     * the invocation.
     *
     * @param ctx     the transactional invocation context
     * @param command the rollback command being visited
     * @return the future returned by continuing the invocation
     * @throws Throwable if the remote call, response validation or continued invocation fails
     */
    @Override
    public CompletableFuture<Void> visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
        if (!shouldInvokeRemoteTxCommand(ctx)) {
            return ctx.continueInvocation();
        }
        Collection<Address> targets = getCommitNodes(ctx);
        Map<Address, Response> responses = rpcManager.invokeRemotely(targets, command, createRollbackRpcOptions());
        checkTxCommandResponses(responses, command, ctx, targets);
        return ctx.continueInvocation();
    }

    /**
     * Computes the recipients for a second-phase (commit or rollback) command.
     * <p>
     * The decision is delegated to the local transaction, which is given the owners of the keys
     * affected in this context, the current topology id from the RPC manager, and the members of
     * the consistent hash.
     *
     * @param ctx the transactional invocation context; its cache transaction must be a {@code LocalTransaction}
     * @return the nodes the local transaction selects
     */
    private Collection<Address> getCommitNodes(TxInvocationContext ctx) {
        LocalTransaction tx = (LocalTransaction) ctx.getCacheTransaction();
        List<Address> keyOwners = cdl.getOwners(DeltaCompositeKeyUtil.getAffectedKeysFromContext(ctx));
        List<Address> allMembers = dm.getConsistentHash().getMembers();
        int topologyId = rpcManager.getTopologyId();
        return tx.getCommitNodes(keyOwners, topologyId, allMembers);
    }

    /**
     * Inspects the responses to a transaction boundary command and fails if any of them indicates
     * that the topology used for the call is stale.
     * <p>
     * Per response:
     * <ul>
     *   <li>{@code CacheNotFoundResponse}: ignored when the command's topology id equals the current
     *       one and the sender is not among the RPC manager's members. Otherwise partition handling
     *       is consulted; if it takes the transaction over, this method returns immediately and any
     *       exception recorded so far is dropped. If it does not, an {@code OutdatedTopologyException}
     *       is recorded.</li>
     *   <li>{@code UnsureResponse}: an {@code OutdatedTopologyException} is recorded.</li>
     *   <li>Anything else is skipped.</li>
     * </ul>
     * Only the most recently recorded exception is kept; it is thrown after all responses were seen.
     *
     * @param responseMap the responses keyed by sender
     * @param command     the command the responses belong to
     * @param context     the local transactional context
     * @param recipients  the nodes the command was sent to
     * @throws OutdatedTopologyException if at least one response was recorded as a topology problem
     */
    protected void checkTxCommandResponses(Map<Address, Response> responseMap, TransactionBoundaryCommand command, TxInvocationContext<LocalTransaction> context, Collection<Address> recipients) {
        OutdatedTopologyException pendingFailure = null;
        for (Map.Entry<Address, Response> reply : responseMap.entrySet()) {
            Address sender = reply.getKey();
            Response response = reply.getValue();
            if (response == CacheNotFoundResponse.INSTANCE) {
                boolean sameTopology = command.getTopologyId() == stateTransferManager.getCacheTopology().getTopologyId();
                if (sameTopology && !rpcManager.getMembers().contains(sender)) {
                    if (trace) {
                        log.tracef("Ignoring response from node not targeted %s", (Object) sender);
                    }
                    continue;
                }
                if (checkCacheNotFoundResponseInPartitionHandling(command, context, recipients)) {
                    if (trace) {
                        log.tracef("Cache not running on node %s, or the node is missing. It will be handled by the PartitionHandlingManager", (Object) sender);
                    }
                    return;
                }
                if (trace) {
                    log.tracef("Cache not running on node %s, or the node is missing", (Object) sender);
                }
                pendingFailure = new OutdatedTopologyException(String.format("Cache not running on node %s, or the node is missing", sender));
            } else if (response == UnsureResponse.INSTANCE) {
                if (trace) {
                    log.tracef("Node %s has a newer topology id", (Object) sender);
                }
                pendingFailure = new OutdatedTopologyException(String.format("Node %s has a newer topology id", sender));
            }
        }
        if (pendingFailure != null) {
            throw pendingFailure;
        }
    }

    /**
     * Offers a partially delivered transaction command to the partition handling manager.
     * <p>
     * Dispatch by command type:
     * <ul>
     *   <li>{@code RollbackCommand}: registered as a partial rollback.</li>
     *   <li>{@code PrepareCommand}: registered as a partial one-phase commit only when the prepare is
     *       one-phase; a two-phase prepare yields {@code false}.</li>
     *   <li>{@code CommitCommand}: registered as a partial two-phase commit, passing the updated
     *       versions when the command is a {@code VersionedCommitCommand} and {@code null} otherwise.</li>
     *   <li>Any other command yields {@code false}.</li>
     * </ul>
     *
     * @param command    the command that received a cache-not-found response
     * @param context    the local transactional context, used for its locked keys
     * @param recipients the nodes the command was sent to
     * @return the partition handling manager's answer, or {@code false} when it was not consulted
     */
    private boolean checkCacheNotFoundResponseInPartitionHandling(TransactionBoundaryCommand command, TxInvocationContext<LocalTransaction> context, Collection<Address> recipients) {
        GlobalTransaction gtx = command.getGlobalTransaction();
        Set<Object> keysLocked = context.getLockedKeys();
        if (command instanceof RollbackCommand) {
            return partitionHandlingManager.addPartialRollbackTransaction(gtx, recipients, keysLocked);
        }
        if (command instanceof PrepareCommand) {
            PrepareCommand prepare = (PrepareCommand) command;
            if (!prepare.isOnePhaseCommit()) {
                return false;
            }
            return partitionHandlingManager.addPartialCommit1PCTransaction(gtx, recipients, keysLocked, Arrays.asList(prepare.getModifications()));
        }
        if (command instanceof CommitCommand) {
            EntryVersionsMap updatedVersions = command instanceof VersionedCommitCommand
                    ? ((VersionedCommitCommand) command).getUpdatedVersions()
                    : null;
            return partitionHandlingManager.addPartialCommit2PCTransaction(gtx, recipients, keysLocked, updatedVersions);
        }
        return false;
    }

    /**
     * Common entry point for single-key transactional writes; forwards to
     * {@code remoteGetBeforeWrite}.
     *
     * @param ctx     the invocation context
     * @param command the write command
     * @param key     the key the command writes to
     * @return the future produced by {@code remoteGetBeforeWrite}
     * @throws Throwable if {@code remoteGetBeforeWrite} fails
     */
    private CompletableFuture<Void> handleTxWriteCommand(InvocationContext ctx, WriteCommand command, Object key) throws Throwable {
        return remoteGetBeforeWrite(ctx, command, key);
    }

    /**
     * Decides whether a write has to fetch the current value from a remote node first.
     * <ul>
     *   <li>Never for commands flagged {@code CACHE_MODE_LOCAL}.</li>
     *   <li>For remotely originated invocations: only if the command always reads existing values.</li>
     *   <li>For locally originated invocations: only if the command reads existing values and is not
     *       flagged {@code SKIP_REMOTE_LOOKUP}.</li>
     * </ul>
     *
     * @param ctx     the invocation context
     * @param command the write command
     * @param key     the key being written (not consulted by this implementation)
     * @return {@code true} if a remote fetch is required
     */
    @Override
    protected boolean writeNeedsRemoteValue(InvocationContext ctx, WriteCommand command, Object key) {
        if (command.hasFlag(Flag.CACHE_MODE_LOCAL)) {
            return false;
        }
        if (!ctx.isOriginLocal()) {
            return command.alwaysReadsExistingValues();
        }
        return command.readsExistingValues() && !command.hasFlag(Flag.SKIP_REMOTE_LOOKUP);
    }

    /**
     * Fetches the current value remotely before a write when that is needed, then continues the
     * invocation.
     * <p>
     * The fetch happens only if {@code valueIsMissing} reports the context entry as missing and
     * {@code writeNeedsRemoteValue} agrees; the second check is skipped when the first fails.
     *
     * @param ctx     the invocation context
     * @param command the write command
     * @param key     the key being written
     * @return the future returned by continuing the invocation
     * @throws Throwable if the remote fetch or the continued invocation fails
     */
    @Override
    protected CompletableFuture<Void> remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, Object key) throws Throwable {
        CacheEntry contextEntry = ctx.lookupEntry(key);
        if (valueIsMissing(contextEntry) && writeNeedsRemoteValue(ctx, command, key)) {
            remoteGet(ctx, key, true, command);
        }
        return ctx.continueInvocation();
    }

    /**
     * Retrieves an entry through {@code retrieveFromProperSource} and, if one comes back, wraps it
     * into the invocation context.
     * <p>
     * When versioning is enabled and the context is in transaction scope, the version from the
     * fetched entry's metadata is recorded on the transaction as the looked-up remote version.
     * The wrap mode is {@code WRAP_NON_NULL} for writes and {@code STORE} for reads.
     *
     * @param ctx     the invocation context
     * @param key     the key to fetch
     * @param isWrite whether the fetch is on behalf of a write
     * @param command the command that triggered the fetch
     * @return the fetched entry, or {@code null} if nothing was retrieved
     * @throws Throwable if retrieval or wrapping fails
     */
    protected InternalCacheEntry remoteGet(InvocationContext ctx, Object key, boolean isWrite, FlagAffectedCommand command) throws Throwable {
        InternalCacheEntry fetched = retrieveFromProperSource(key, ctx, false, command, isWrite).get();
        if (fetched == null) {
            return null;
        }
        if (useClusteredWriteSkewCheck && ctx.isInTxScope()) {
            AbstractCacheTransaction tx = (AbstractCacheTransaction) ((TxInvocationContext) ctx).getCacheTransaction();
            tx.putLookedUpRemoteVersion(key, fetched.getMetadata().version());
        }
        EntryFactory.Wrap wrapMode;
        if (isWrite) {
            wrapMode = EntryFactory.Wrap.WRAP_NON_NULL;
        } else {
            wrapMode = EntryFactory.Wrap.STORE;
        }
        entryFactory.wrapExternalEntry(ctx, key, fetched, wrapMode, false);
        return fetched;
    }

    /**
     * Builds the RPC options for commit, synchronous or not according to the transaction
     * configuration's {@code syncCommitPhase} setting.
     *
     * @return the RPC options to use when sending a commit
     */
    private RpcOptions createCommitRpcOptions() {
        boolean syncCommit = cacheConfiguration.transaction().syncCommitPhase();
        return createRpcOptionsFor2ndPhase(syncCommit);
    }

    /**
     * Builds the RPC options for rollback, synchronous or not according to the transaction
     * configuration's {@code syncRollbackPhase} setting.
     *
     * @return the RPC options to use when sending a rollback
     */
    private RpcOptions createRollbackRpcOptions() {
        boolean syncRollback = cacheConfiguration.transaction().syncRollbackPhase();
        return createRpcOptionsFor2ndPhase(syncRollback);
    }

    /**
     * Builds RPC options for a second-phase command with {@code DeliverOrder.NONE}.
     *
     * @param sync {@code true} selects {@code SYNCHRONOUS_IGNORE_LEAVERS}, {@code false} selects
     *             {@code ASYNCHRONOUS}
     * @return the built RPC options
     */
    private RpcOptions createRpcOptionsFor2ndPhase(boolean sync) {
        ResponseMode mode = sync ? ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS : ResponseMode.ASYNCHRONOUS;
        return rpcManager.getRpcOptionsBuilder(mode, DeliverOrder.NONE).build();
    }

    /**
     * Builds the RPC options for prepare.
     * <p>
     * A synchronous cache mode yields {@code SYNCHRONOUS_IGNORE_LEAVERS} with
     * {@code DeliverOrder.NONE}; otherwise the RPC manager's default options are requested with
     * {@code false} as the argument.
     *
     * @return the RPC options to use when sending a prepare
     */
    protected RpcOptions createPrepareRpcOptions() {
        if (cacheConfiguration.clustering().cacheMode().isSynchronous()) {
            return rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, DeliverOrder.NONE).build();
        }
        return rpcManager.getDefaultRpcOptions(false);
    }
}

