/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.distribution;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import org.infinispan.commands.AbstractTopologyAffectedCommand;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.tx.TransactionBoundaryCommand;
import org.infinispan.commands.tx.VersionedCommitCommand;
import org.infinispan.commands.write.AbstractDataWriteCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.ValueMatcher;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.versioning.EntryVersionsMap;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.BasicInvocationStage;
import org.infinispan.interceptors.distribution.BaseDistributionInterceptor;
import org.infinispan.partitionhandling.impl.PartitionHandlingManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.UnsureResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.transaction.impl.LocalTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.DeltaCompositeKeyUtil;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * Distribution interceptor used for transactional write and transaction-boundary commands.
 *
 * <p>For locally originated lock-control, prepare, commit and rollback commands it sends the
 * command to the relevant remote nodes through {@code rpcManager} and validates the responses
 * (see {@link #checkTxCommandResponses}). For single-key and multi-key writes it makes sure an
 * entry is present in the invocation context before passing the command down the chain, doing a
 * remote lookup first when the previous value is required.
 */
public class TxDistributionInterceptor
extends BaseDistributionInterceptor {
    private static final Log log = LogFactory.getLog(TxDistributionInterceptor.class);
    /** Trace flag cached once at class initialisation; guards every trace statement in this class. */
    private static final boolean trace = log.isTraceEnabled();
    private PartitionHandlingManager partitionHandlingManager;

    /**
     * Receives the {@link PartitionHandlingManager} consulted when a remote node answers with
     * {@link CacheNotFoundResponse}.
     *
     * @param partitionHandlingManager the manager to store
     */
    @Inject
    public void inject(PartitionHandlingManager partitionHandlingManager) {
        this.partitionHandlingManager = partitionHandlingManager;
    }

    /** Handles a replace as a transactional single-key write. */
    @Override
    public BasicInvocationStage visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
        return this.handleTxWriteCommand(ctx, command, command.getKey());
    }

    /**
     * Pins the command's value matcher to its current outcome: {@code MATCH_ALWAYS} when the
     * command reports success, {@code MATCH_NEVER} otherwise.
     */
    private void updateMatcherForRetry(WriteCommand command) {
        command.setValueMatcher(command.isSuccessful() ? ValueMatcher.MATCH_ALWAYS : ValueMatcher.MATCH_NEVER);
    }

    /** Handles a remove as a transactional single-key write. */
    @Override
    public BasicInvocationStage visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
        return this.handleTxWriteCommand(ctx, command, command.getKey());
    }

    /**
     * Handles a put. Commands flagged {@link Flag#PUT_FOR_EXTERNAL_READ} are delegated to the
     * non-transactional write path; every other put goes through the transactional one.
     */
    @Override
    public BasicInvocationStage visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
        if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
            return this.handleNonTxWriteCommand(ctx, command);
        }
        return this.handleTxWriteCommand(ctx, command, command.getKey());
    }

    /**
     * Handles a put-map by forwarding a copy of the command that carries only the entries kept by
     * {@link #handleTxWriteManyEntriesCommand}.
     */
    @Override
    public BasicInvocationStage visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
        return this.handleTxWriteManyEntriesCommand(ctx, command, command.getMap(),
                (c, entries) -> new PutMapCommand(c).withMap(entries));
    }

    /**
     * For a locally originated lock request: records the target nodes on the local transaction,
     * sends the command to the owners of the requested keys and validates the responses. Remote
     * invocations are simply passed to the next interceptor.
     */
    @Override
    public BasicInvocationStage visitLockControlCommand(TxInvocationContext ctx, LockControlCommand command) throws Throwable {
        if (!ctx.isOriginLocal()) {
            return this.invokeNext(ctx, command);
        }
        // The override signature is raw; narrow once here instead of casting at every use.
        @SuppressWarnings("unchecked")
        TxInvocationContext<LocalTransaction> localTxCtx = (TxInvocationContext<LocalTransaction>) ctx;
        List<Address> affectedNodes = this.cdl.getOwners(DeltaCompositeKeyUtil.filterDeltaCompositeKeys(command.getKeys()));
        // A null owner list is recorded as "all members of the consistent hash".
        localTxCtx.getCacheTransaction().locksAcquired(
                affectedNodes == null ? this.dm.getConsistentHash().getMembers() : affectedNodes);
        if (trace) {
            log.tracef("Registered remote locks acquired %s", affectedNodes);
        }
        RpcOptions rpcOptions = this.rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, DeliverOrder.NONE).build();
        CompletableFuture<Map<Address, Response>> remoteInvocation = this.rpcManager.invokeRemotelyAsync(affectedNodes, command, rpcOptions);
        return this.returnWithAsync(remoteInvocation.thenApply(responses -> {
            // The remote-lock set is read when the responses arrive, not when the RPC is issued.
            this.checkTxCommandResponses(responses, command, localTxCtx,
                    localTxCtx.getCacheTransaction().getRemoteLocksAcquired());
            return null;
        }));
    }

    /**
     * Sends the commit to the commit nodes and validates their responses when a remote
     * transaction command is required; otherwise passes the command to the next interceptor.
     */
    @Override
    public BasicInvocationStage visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
        if (!this.shouldInvokeRemoteTxCommand(ctx)) {
            return this.invokeNext(ctx, command);
        }
        Collection<Address> recipients = this.getCommitNodes(ctx);
        CompletableFuture<Map<Address, Response>> remoteInvocation = this.rpcManager.invokeRemotelyAsync(recipients, command, this.createCommitRpcOptions());
        return this.returnWithAsync(remoteInvocation.thenApply(responses -> {
            this.checkTxCommandResponses(responses, command, ctx, recipients);
            return null;
        }));
    }

    /**
     * Runs the prepare through the rest of the chain first. For a locally originated prepare that
     * requires a remote transaction command, it then prepares on the owners of the affected keys
     * and, once that succeeds, records those nodes on the local transaction.
     */
    @Override
    public BasicInvocationStage visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
        if (!ctx.isOriginLocal()) {
            return this.invokeNext(ctx, command);
        }
        return this.invokeNext(ctx, command).thenCompose((stage, rCtx, rCommand, rv) -> {
            if (!this.shouldInvokeRemoteTxCommand(ctx)) {
                return this.returnWith(null);
            }
            @SuppressWarnings("unchecked")
            TxInvocationContext<LocalTransaction> localTxCtx = (TxInvocationContext<LocalTransaction>) rCtx;
            List<Address> recipients = this.cdl.getOwners(DeltaCompositeKeyUtil.getAffectedKeysFromContext(localTxCtx));
            CompletableFuture<Object> remotePrepare = this.prepareOnAffectedNodes(localTxCtx, (PrepareCommand) rCommand, recipients);
            return this.returnWithAsync(remotePrepare.thenApply(o -> {
                // A null recipient list is recorded as "all members of the write consistent hash".
                localTxCtx.getCacheTransaction().locksAcquired(
                        recipients == null ? this.dm.getWriteConsistentHash().getMembers() : recipients);
                return o;
            }));
        });
    }

    /**
     * Sends the prepare to {@code recipients} and validates the responses.
     *
     * <p>The transaction is marked as remotely prepared whether the invocation succeeds, completes
     * exceptionally, or fails synchronously before a future is returned.
     *
     * @param ctx        the transaction context; cast to {@link LocalTxInvocationContext} for response checking
     * @param command    the prepare command to send
     * @param recipients the target nodes, passed as-is to the RPC manager
     * @return a future that completes with {@code null} once the responses have been checked
     */
    protected CompletableFuture<Object> prepareOnAffectedNodes(TxInvocationContext<?> ctx, PrepareCommand command, Collection<Address> recipients) {
        try {
            CompletableFuture<Map<Address, Response>> remoteInvocation = this.rpcManager.invokeRemotelyAsync(recipients, command, this.createPrepareRpcOptions());
            return remoteInvocation.handle((responses, t) -> {
                // Mark first, then surface any failure, then inspect the responses.
                TxDistributionInterceptor.transactionRemotelyPrepared(ctx);
                CompletableFutures.rethrowException(t);
                this.checkTxCommandResponses(responses, command, (LocalTxInvocationContext) ctx, recipients);
                return null;
            });
        }
        catch (Throwable t2) {
            TxDistributionInterceptor.transactionRemotelyPrepared(ctx);
            throw t2;
        }
    }

    /**
     * Sends the rollback to the commit nodes and validates their responses when a remote
     * transaction command is required; otherwise passes the command to the next interceptor.
     */
    @Override
    public BasicInvocationStage visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
        if (!this.shouldInvokeRemoteTxCommand(ctx)) {
            return this.invokeNext(ctx, command);
        }
        Collection<Address> recipients = this.getCommitNodes(ctx);
        CompletableFuture<Map<Address, Response>> remoteInvocation = this.rpcManager.invokeRemotelyAsync(recipients, command, this.createRollbackRpcOptions());
        return this.returnWithAsync(remoteInvocation.thenApply(responses -> {
            this.checkTxCommandResponses(responses, command, ctx, recipients);
            return null;
        }));
    }

    /**
     * Asks the local transaction for its commit nodes, given the owners of the affected keys, the
     * RPC manager's current topology id and the members of the consistent hash.
     */
    private Collection<Address> getCommitNodes(TxInvocationContext ctx) {
        LocalTransaction localTx = (LocalTransaction) ctx.getCacheTransaction();
        List<Address> affectedNodes = this.cdl.getOwners(DeltaCompositeKeyUtil.getAffectedKeysFromContext(ctx));
        List<Address> members = this.dm.getConsistentHash().getMembers();
        return localTx.getCommitNodes(affectedNodes, this.rpcManager.getTopologyId(), members);
    }

    /**
     * Inspects the responses to a transaction-boundary command.
     *
     * <ul>
     *   <li>{@link CacheNotFoundResponse} is ignored when the command's topology id equals the
     *       current cache topology id and the responder is not in {@code rpcManager.getMembers()}.</li>
     *   <li>Otherwise, if the partition handling manager accepts the transaction, the method
     *       returns immediately without looking at the remaining responses.</li>
     *   <li>Any other {@link CacheNotFoundResponse}, and any {@link UnsureResponse}, results in an
     *       {@link OutdatedTopologyException} being thrown after all responses have been examined
     *       (the last offending response determines the message).</li>
     * </ul>
     *
     * @param responseMap the responses keyed by responder address
     * @param command     the command that was sent
     * @param context     the local transaction context
     * @param recipients  the nodes the command was sent to
     * @throws OutdatedTopologyException if at least one response requires a retry
     */
    protected void checkTxCommandResponses(Map<Address, Response> responseMap, TransactionBoundaryCommand command, TxInvocationContext<LocalTransaction> context, Collection<Address> recipients) {
        OutdatedTopologyException outdatedTopologyException = null;
        for (Map.Entry<Address, Response> e : responseMap.entrySet()) {
            Address recipient = e.getKey();
            Response response = e.getValue();
            if (response == CacheNotFoundResponse.INSTANCE) {
                boolean sameTopology = command.getTopologyId() == this.stateTransferManager.getCacheTopology().getTopologyId();
                if (sameTopology && !this.rpcManager.getMembers().contains(recipient)) {
                    if (trace) {
                        log.tracef("Ignoring response from node not targeted %s", recipient);
                    }
                    continue;
                }
                if (this.checkCacheNotFoundResponseInPartitionHandling(command, context, recipients)) {
                    if (trace) {
                        log.tracef("Cache not running on node %s, or the node is missing. It will be handled by the PartitionHandlingManager", recipient);
                    }
                    return;
                }
                if (trace) {
                    log.tracef("Cache not running on node %s, or the node is missing", recipient);
                }
                outdatedTopologyException = new OutdatedTopologyException(String.format("Cache not running on node %s, or the node is missing", recipient));
            } else if (response == UnsureResponse.INSTANCE) {
                if (trace) {
                    log.tracef("Node %s has a newer topology id", recipient);
                }
                outdatedTopologyException = new OutdatedTopologyException(String.format("Node %s has a newer topology id", recipient));
            }
        }
        if (outdatedTopologyException != null) {
            throw outdatedTopologyException;
        }
    }

    /**
     * Offers a transaction whose command hit a {@link CacheNotFoundResponse} to the partition
     * handling manager, choosing the registration method by command type.
     *
     * @return the manager's answer for rollback, one-phase prepare and commit commands;
     *         {@code false} for a two-phase prepare or any other command type
     */
    private boolean checkCacheNotFoundResponseInPartitionHandling(TransactionBoundaryCommand command, TxInvocationContext<LocalTransaction> context, Collection<Address> recipients) {
        GlobalTransaction globalTransaction = command.getGlobalTransaction();
        Set<Object> lockedKeys = context.getLockedKeys();
        if (command instanceof RollbackCommand) {
            return this.partitionHandlingManager.addPartialRollbackTransaction(globalTransaction, recipients, lockedKeys);
        }
        if (command instanceof PrepareCommand) {
            PrepareCommand prepareCommand = (PrepareCommand) command;
            if (prepareCommand.isOnePhaseCommit()) {
                return this.partitionHandlingManager.addPartialCommit1PCTransaction(globalTransaction, recipients, lockedKeys, Arrays.asList(prepareCommand.getModifications()));
            }
            return false;
        }
        if (command instanceof CommitCommand) {
            // Only versioned commits carry updated versions; plain commits pass null.
            EntryVersionsMap newVersion = null;
            if (command instanceof VersionedCommitCommand) {
                newVersion = ((VersionedCommitCommand) command).getUpdatedVersions();
            }
            return this.partitionHandlingManager.addPartialCommit2PCTransaction(globalTransaction, recipients, lockedKeys, newVersion);
        }
        return false;
    }

    /**
     * Common path for transactional single-key writes.
     *
     * <p>A remote invocation on a node that does not own the key is answered with {@code null}
     * without going further. If the context has no entry for the key, one is either wrapped as
     * {@code null} (local mode forced, {@link Flag#SKIP_REMOTE_LOOKUP}, or previous value not
     * needed) or fetched remotely before the command continues. In every outcome, including a
     * synchronous failure, the command's value matcher is updated for a possible retry.
     *
     * @param key the key written by {@code command}; all callers pass {@code command.getKey()}
     */
    private BasicInvocationStage handleTxWriteCommand(InvocationContext ctx, AbstractDataWriteCommand command, Object key) throws Throwable {
        try {
            if (!ctx.isOriginLocal() && !this.cdl.localNodeIsOwner(key)) {
                return this.returnWith(null);
            }
            CacheEntry entry = ctx.lookupEntry(key);
            if (entry == null) {
                boolean skipLookup = this.isLocalModeForced(command)
                        || command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)
                        || !this.needsPreviousValue(ctx, command);
                if (!skipLookup) {
                    return this.invokeNextAsync(ctx, command, this.remoteGet(ctx, command, key, true))
                            .handle((rCtx, rCommand, rv, t) -> this.updateMatcherForRetry((WriteCommand) rCommand));
                }
                this.entryFactory.wrapExternalEntry(ctx, key, null, true);
            }
            return this.invokeNext(ctx, command)
                    .handle((rCtx, rCommand, rv, t) -> this.updateMatcherForRetry((WriteCommand) rCommand));
        }
        catch (Throwable t2) {
            this.updateMatcherForRetry(command);
            throw t2;
        }
    }

    /**
     * Common path for transactional multi-key writes.
     *
     * <p>Entries whose key is not owned locally are dropped for remote invocations. For each
     * remaining key missing from the context, a {@code null} entry is wrapped or a remote lookup
     * is started, using the same rule as the single-key path. The command is then copied with the
     * retained entries and passed on, after all remote lookups have completed if any were started.
     *
     * @param entries     the entries carried by {@code command}
     * @param copyCommand creates a copy of the command carrying the given subset of entries
     */
    protected <C extends AbstractTopologyAffectedCommand, K, V> BasicInvocationStage handleTxWriteManyEntriesCommand(InvocationContext ctx, C command, Map<K, V> entries, BiFunction<C, Map<K, V>, C> copyCommand) {
        Map<K, V> filtered = new HashMap<K, V>(entries.size());
        // Allocated lazily: most invocations need no remote lookups.
        List<CompletableFuture<Void>> remoteGets = null;
        for (Map.Entry<K, V> e : entries.entrySet()) {
            K key = e.getKey();
            if (!ctx.isOriginLocal() && !this.cdl.localNodeIsOwner(key)) {
                continue;
            }
            if (ctx.lookupEntry(key) == null) {
                if (command.hasFlag(Flag.CACHE_MODE_LOCAL) || command.hasFlag(Flag.SKIP_REMOTE_LOOKUP) || !this.needsPreviousValue(ctx, command)) {
                    this.entryFactory.wrapExternalEntry(ctx, key, null, true);
                } else {
                    if (remoteGets == null) {
                        remoteGets = new ArrayList<CompletableFuture<Void>>();
                    }
                    remoteGets.add(this.remoteGet(ctx, command, key, true));
                }
            }
            filtered.put(key, e.getValue());
        }
        C narrowed = copyCommand.apply(command, filtered);
        if (remoteGets != null) {
            return this.invokeNextAsync(ctx, narrowed, CompletableFuture.allOf(remoteGets.toArray(new CompletableFuture[remoteGets.size()])));
        }
        return this.invokeNext(ctx, narrowed);
    }

    /**
     * Decides from the command's load type whether the previous value must be available:
     * never for {@code DONT_LOAD}, only on the originator for {@code PRIMARY}, always for
     * {@code OWNER}.
     *
     * @throws IllegalStateException for any other load type
     */
    private boolean needsPreviousValue(InvocationContext ctx, FlagAffectedCommand command) {
        switch (command.loadType()) {
            case DONT_LOAD:
                return false;
            case PRIMARY:
                return ctx.isOriginLocal();
            case OWNER:
                return true;
            default:
                throw new IllegalStateException("Unexpected load type " + command.loadType());
        }
    }

    /** RPC options for commit, synchronous or not according to the configured sync-commit-phase setting. */
    private RpcOptions createCommitRpcOptions() {
        return this.createRpcOptionsFor2ndPhase(this.cacheConfiguration.transaction().syncCommitPhase());
    }

    /** RPC options for rollback, synchronous or not according to the configured sync-rollback-phase setting. */
    private RpcOptions createRollbackRpcOptions() {
        return this.createRpcOptionsFor2ndPhase(this.cacheConfiguration.transaction().syncRollbackPhase());
    }

    /**
     * Builds RPC options with {@code DeliverOrder.NONE}, using
     * {@code SYNCHRONOUS_IGNORE_LEAVERS} when {@code sync} is true and {@code ASYNCHRONOUS} otherwise.
     */
    private RpcOptions createRpcOptionsFor2ndPhase(boolean sync) {
        ResponseMode mode = sync ? ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS : ResponseMode.ASYNCHRONOUS;
        return this.rpcManager.getRpcOptionsBuilder(mode, DeliverOrder.NONE).build();
    }

    /**
     * RPC options for prepare: {@code SYNCHRONOUS_IGNORE_LEAVERS} with {@code DeliverOrder.NONE}
     * when the cache mode is synchronous, otherwise {@code rpcManager.getDefaultRpcOptions(false)}.
     */
    protected RpcOptions createPrepareRpcOptions() {
        if (this.cacheConfiguration.clustering().cacheMode().isSynchronous()) {
            return this.rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, DeliverOrder.NONE).build();
        }
        return this.rpcManager.getDefaultRpcOptions(false);
    }
}

