/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.PrimaryAckCommand;
import org.infinispan.commands.write.PrimaryMultiKeyAckCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commons.util.Util;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.BasicInvocationStage;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.concurrent.CommandAckCollector;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class TriangleAckInterceptor
extends DDAsyncInterceptor {
    private static final Log log = LogFactory.getLog(TriangleAckInterceptor.class);
    private static final boolean trace = log.isTraceEnabled();
    private RpcManager rpcManager;
    private CommandsFactory commandsFactory;
    private CommandAckCollector commandAckCollector;
    private DistributionManager distributionManager;
    private ScheduledExecutorService timeoutExecutor;
    private StateTransferManager stateTransferManager;
    private Address localAddress;
    private long timeoutNanoseconds;

    private static Collection<Integer> calculateSegments(Collection<?> keys, ConsistentHash ch) {
        return keys.stream().map(ch::getSegment).collect(Collectors.toSet());
    }

    @Inject
    public void inject(RpcManager rpcManager, CommandsFactory commandsFactory, CommandAckCollector commandAckCollector, DistributionManager distributionManager, @ComponentName(value="org.infinispan.executors.timeout") ScheduledExecutorService timeoutExecutor, StateTransferManager stateTransferManager) {
        this.rpcManager = rpcManager;
        this.commandsFactory = commandsFactory;
        this.commandAckCollector = commandAckCollector;
        this.distributionManager = distributionManager;
        this.timeoutExecutor = timeoutExecutor;
        this.stateTransferManager = stateTransferManager;
    }

    @Start
    public void start() {
        this.localAddress = this.rpcManager.getAddress();
        RpcOptions options = this.rpcManager.getDefaultRpcOptions(true);
        this.timeoutNanoseconds = options.timeUnit().toNanos(options.timeout());
    }

    @Override
    public BasicInvocationStage visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public BasicInvocationStage visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public BasicInvocationStage visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
        return this.handleWriteCommand(ctx, command);
    }

    @Override
    public BasicInvocationStage visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
        if (ctx.isOriginLocal()) {
            return this.handleLocalPutMapCommand(ctx, command);
        }
        return this.handleRemotePutMapCommand(ctx, command);
    }

    private BasicInvocationStage handleRemotePutMapCommand(InvocationContext ctx, PutMapCommand command) {
        return this.invokeNext(ctx, command).compose((stage, rCtx, rCommand, rv, t) -> {
            PutMapCommand cmd = (PutMapCommand)rCommand;
            if (t != null) {
                this.sendExceptionAck(cmd.getCommandInvocationId(), cmd.getTopologyId(), t);
                return stage;
            }
            Collection<Integer> segments = TriangleAckInterceptor.calculateSegments(cmd.getAffectedKeys(), this.distributionManager.getWriteConsistentHash());
            if (cmd.isForwarded()) {
                this.sendPutMapBackupAck(cmd, segments);
            } else {
                this.sendPrimaryPutMapAck(cmd, (Map)rv);
            }
            return stage;
        });
    }

    private BasicInvocationStage handleLocalPutMapCommand(InvocationContext ctx, PutMapCommand command) {
        return this.invokeNext(ctx, command).compose((stage, rCtx, rCommand, rv, throwable) -> {
            PutMapCommand cmd = (PutMapCommand)rCommand;
            if (throwable != null) {
                this.disposeCollectorOnException(cmd.getCommandInvocationId());
                return stage;
            }
            return this.waitCollectorAsync(stage, cmd.getCommandInvocationId());
        });
    }

    private BasicInvocationStage handleWriteCommand(InvocationContext ctx, DataWriteCommand command) {
        if (ctx.isOriginLocal()) {
            return this.invokeNext(ctx, command).compose(this::onLocalWriteCommand);
        }
        CacheTopology cacheTopology = this.stateTransferManager.getCacheTopology();
        if (command.getTopologyId() != cacheTopology.getTopologyId()) {
            this.sendExceptionAck(command.getCommandInvocationId(), command.getTopologyId(), (Throwable)((Object)OutdatedTopologyException.getCachedInstance()));
            throw OutdatedTopologyException.getCachedInstance();
        }
        DistributionInfo distributionInfo = new DistributionInfo(command.getKey(), cacheTopology.getWriteConsistentHash(), this.localAddress);
        switch (distributionInfo.ownership()) {
            case BACKUP: {
                return this.invokeNext(ctx, command).compose(this::onRemoteBackupOwner);
            }
            case PRIMARY: {
                return this.invokeNext(ctx, command).compose(this::onRemotePrimaryOwner);
            }
        }
        throw new IllegalStateException();
    }

    private BasicInvocationStage onRemotePrimaryOwner(BasicInvocationStage stage, InvocationContext rCtx, VisitableCommand rCommand, Object rv, Throwable throwable) {
        DataWriteCommand command = (DataWriteCommand)rCommand;
        if (throwable != null) {
            this.sendExceptionAck(command.getCommandInvocationId(), command.getTopologyId(), throwable);
        } else {
            this.sendPrimaryAck(command, rv);
        }
        return stage;
    }

    private BasicInvocationStage onRemoteBackupOwner(BasicInvocationStage stage, InvocationContext rCtx, VisitableCommand rCommand, Object rv, Throwable throwable) {
        DataWriteCommand cmd = (DataWriteCommand)rCommand;
        if (throwable != null) {
            this.sendExceptionAck(cmd.getCommandInvocationId(), cmd.getTopologyId(), throwable);
        } else {
            this.sendBackupAck(cmd);
        }
        return stage;
    }

    private BasicInvocationStage onLocalWriteCommand(BasicInvocationStage stage, InvocationContext rCtx, VisitableCommand rCommand, Object rv, Throwable throwable) {
        DataWriteCommand cmd = (DataWriteCommand)rCommand;
        cmd.getCommandInvocationId();
        if (throwable != null) {
            this.disposeCollectorOnException(cmd.getCommandInvocationId());
            return stage;
        }
        return this.waitCollectorAsync(stage, cmd.getCommandInvocationId());
    }

    private void disposeCollectorOnException(CommandInvocationId id) {
        this.commandAckCollector.dispose(id);
    }

    private BasicInvocationStage waitCollectorAsync(BasicInvocationStage stage, CommandInvocationId id) {
        CompletableFuture<Object> collectorFuture = this.commandAckCollector.getCollectorCompletableFuture(id, true);
        if (collectorFuture == null) {
            return stage;
        }
        this.timeoutExecutor.schedule(() -> this.timeoutCompletableFuture(collectorFuture), this.timeoutNanoseconds, TimeUnit.NANOSECONDS);
        return this.returnWithAsync(collectorFuture);
    }

    private void timeoutCompletableFuture(CompletableFuture<?> future) {
        future.completeExceptionally((Throwable)((Object)log.timeoutWaitingForAcks(Util.prettyPrintTime((long)this.timeoutNanoseconds, (TimeUnit)TimeUnit.NANOSECONDS))));
    }

    private void sendPrimaryAck(DataWriteCommand command, Object returnValue) {
        CommandInvocationId id = command.getCommandInvocationId();
        Address origin = id.getAddress();
        if (trace) {
            log.tracef("Sending ack for command %s. Originator=%s.", id, origin);
        }
        PrimaryAckCommand ackCommand = this.commandsFactory.buildPrimaryAckCommand();
        command.initPrimaryAck(ackCommand, returnValue);
        this.rpcManager.sendTo(origin, ackCommand, command.isSuccessful() ? DeliverOrder.NONE : DeliverOrder.PER_SENDER);
    }

    private void sendBackupAck(DataWriteCommand command) {
        CommandInvocationId id = command.getCommandInvocationId();
        Address origin = id.getAddress();
        if (trace) {
            log.tracef("Sending ack for command %s. Originator=%s.", id, origin);
        }
        if (origin.equals(this.localAddress)) {
            this.commandAckCollector.backupAck(id, origin, command.getTopologyId());
        } else {
            this.rpcManager.sendTo(origin, this.commandsFactory.buildBackupAckCommand(id, command.getTopologyId()), DeliverOrder.NONE);
        }
    }

    private void sendPutMapBackupAck(PutMapCommand command, Collection<Integer> segments) {
        CommandInvocationId id = command.getCommandInvocationId();
        Address origin = id.getAddress();
        if (trace) {
            log.tracef("Sending ack for command %s. Originator=%s.", id, origin);
        }
        if (id.getAddress().equals(this.localAddress)) {
            this.commandAckCollector.multiKeyBackupAck(id, this.localAddress, segments, command.getTopologyId());
        } else {
            this.rpcManager.sendTo(id.getAddress(), this.commandsFactory.buildBackupMultiKeyAckCommand(id, segments, command.getTopologyId()), DeliverOrder.NONE);
        }
    }

    private void sendPrimaryPutMapAck(PutMapCommand command, Map<Object, Object> returnValue) {
        CommandInvocationId id = command.getCommandInvocationId();
        Address origin = id.getAddress();
        if (trace) {
            log.tracef("Sending ack for command %s. Originator=%s.", id, origin);
        }
        PrimaryMultiKeyAckCommand ack = this.commandsFactory.buildPrimaryMultiKeyAckCommand(command.getCommandInvocationId(), command.getTopologyId());
        if (command.hasFlag(Flag.IGNORE_RETURN_VALUES)) {
            ack.initWithoutReturnValue();
        } else {
            ack.initWithReturnValue(returnValue);
        }
        this.rpcManager.sendTo(id.getAddress(), ack, command.isSuccessful() ? DeliverOrder.NONE : DeliverOrder.PER_SENDER);
    }

    private void sendExceptionAck(CommandInvocationId id, int topologyId, Throwable throwable) {
        Address origin = id.getAddress();
        if (trace) {
            log.tracef("Sending exception ack for command %s. Originator=%s.", id, origin);
        }
        if (origin.equals(this.localAddress)) {
            this.commandAckCollector.completeExceptionally(id, throwable, topologyId);
        } else {
            this.rpcManager.sendTo(origin, this.commandsFactory.buildExceptionAckCommand(id, throwable, topologyId), DeliverOrder.NONE);
        }
    }
}

