/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanType;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * Base class for futures that track a DHT atomic cache update: it collects per-node
 * {@link GridDhtAtomicAbstractUpdateRequest}s in {@link #mappings}, sends them, and completes
 * once a response (or a node-left / send-failure notification) has been registered for every
 * mapped node.
 */
public abstract class GridDhtAtomicAbstractUpdateFuture
extends GridCacheFutureAdapter<Void>
implements GridCacheAtomicFuture<Void> {
    /** Logger; created lazily by the first constructed instance. */
    protected static IgniteLogger log;
    /** Logger reference handed to {@code U.logger(...)} when {@link #log} is created. */
    private static final AtomicReference<IgniteLogger> logRef;
    /** Atomic message logger taken from the shared cache context; set together with {@link #log}. */
    protected static IgniteLogger msgLog;
    /** Write version passed to the constructor and forwarded to every created DHT request. */
    protected final GridCacheVersion writeVer;
    /** Cache context. */
    protected final GridCacheContext cctx;
    /** Future id obtained from {@code cctx.mvcc().nextAtomicId()}. */
    @GridToStringInclude
    protected final long futId;
    /** Near update request this future was created for. */
    final GridNearAtomicAbstractUpdateRequest updateReq;
    /**
     * DHT update requests keyed by target node id.
     * NOTE(review): never assigned in this class - presumably initialized by subclasses; confirm.
     */
    @GridToStringExclude
    protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings;
    /** Continuous query closures deferred until the future completes; {@code null} until the first one is added. */
    private Collection<CI1<Boolean>> cntQryClsrs;
    /** Number of registered responses; incremented under {@code synchronized (this)} in {@code registerResponse}. */
    private volatile int resCnt;
    /** Set to {@code true} once a request had to be created for a near reader node. */
    private boolean addedReader;

    /**
     * Creates the future, assigns it a new atomic future id and, for the very first instance,
     * initializes the static loggers.
     *
     * @param cctx Cache context.
     * @param writeVer Write version.
     * @param updateReq Near update request this future belongs to.
     */
    protected GridDhtAtomicAbstractUpdateFuture(GridCacheContext cctx, GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq) {
        this.writeVer = writeVer;
        this.updateReq = updateReq;
        this.cctx = cctx;

        futId = cctx.mvcc().nextAtomicId();

        // Loggers are static and already set up once any instance has been created.
        if (log != null) {
            return;
        }

        msgLog = cctx.shared().atomicMessageLogger();
        log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
    }

    /**
     * Consulted by {@code map(...)} for full-sync updates: when this returns {@code false}
     * the node mapping is written into the near response and the near node is always replied to.
     * NOTE(review): the name suggests "all updates are sent to DHT nodes" - confirm against subclasses.
     *
     * @return Implementation-defined flag, see above.
     */
    protected abstract boolean sendAllToDht();

    /**
     * Decides whether a topology change to {@code topVer} has to wait for this future.
     *
     * @param topVer Topology version to compare the request's version with.
     * @return This future if the request did not lock the topology and was mapped on a version
     *      earlier than {@code topVer}; {@code null} otherwise.
     */
    @Override
    public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
        // A request holding the topology lock never makes the caller wait.
        if (updateReq.topologyLocked()) {
            return null;
        }

        return updateReq.topologyVersion().compareTo(topVer) < 0 ? this : null;
    }

    /**
     * Registers a continuous query closure. A synchronous closure is invoked right away with
     * {@code true}; otherwise it is stored and invoked when the future completes.
     *
     * @param clsr Closure to register.
     * @param sync If {@code true}, apply the closure immediately instead of deferring it.
     */
    public final void addContinuousQueryClosure(CI1<Boolean> clsr, boolean sync) {
        assert !isDone() : this;

        if (sync) {
            clsr.apply(true);

            return;
        }

        // The holder collection is allocated on first deferred closure.
        if (cntQryClsrs == null) {
            cntQryClsrs = new ArrayList<>(10);
        }

        cntQryClsrs.add(clsr);
    }

    /**
     * Adds an updated entry to the DHT request of every non-local node the entry's partition
     * is mapped to, creating the per-node request on first use.
     *
     * @param affAssignment Affinity assignment used to resolve the partition's nodes.
     * @param entry Updated entry.
     * @param val New value, may be {@code null}.
     * @param entryProcessor Entry processor.
     * @param ttl TTL.
     * @param conflictExpireTime Conflict expire time.
     * @param conflictVer Conflict version, may be {@code null}.
     * @param addPrevVal Passed through to the DHT request together with {@code prevVal}.
     * @param prevVal Previous value, may be {@code null}.
     * @param updateCntr Update counter.
     * @param cacheOp Cache operation.
     */
    final void addWriteEntry(AffinityAssignment affAssignment, GridDhtCacheEntry entry, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long conflictExpireTime, @Nullable GridCacheVersion conflictVer, boolean addPrevVal, @Nullable CacheObject prevVal, long updateCntr, GridCacheOperation cacheOp) {
        AffinityTopologyVersion topVer = updateReq.topologyVersion();
        List<ClusterNode> affNodes = affAssignment.get(entry.partition());

        List<ClusterNode> targets;

        if (updateReq.affinityMapping()) {
            targets = affNodes;
        } else {
            targets = cctx.dht().topology().nodes(entry.partition(), affAssignment, affNodes);
        }

        // Fall back to the affinity nodes when the topology returned nothing.
        if (targets == null) {
            targets = affNodes;
        }

        if (log.isDebugEnabled()) {
            log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(targets) + ", entry=" + entry + ']');
        }

        CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();

        addDhtKey(entry.key(), targets);

        for (int idx = 0; idx < targets.size(); ++idx) {
            ClusterNode node = targets.get(idx);
            UUID nodeId = node.id();

            // The local node is never a target.
            if (!nodeId.equals(cctx.localNodeId())) {
                GridDhtAtomicAbstractUpdateRequest dhtReq = mappings.get(nodeId);

                if (dhtReq == null) {
                    dhtReq = createRequest(node.id(), futId, writeVer, syncMode, topVer, ttl, conflictExpireTime, conflictVer);

                    mappings.put(nodeId, dhtReq);
                }

                dhtReq.addWriteValue(entry.key(), val, entryProcessor, ttl, conflictExpireTime, conflictVer, addPrevVal, prevVal, updateCntr, cacheOp);
            }
        }
    }

    /**
     * Hook invoked by {@code addWriteEntry(...)} before the key is added to the per-node requests.
     *
     * @param var1 Key of the updated entry.
     * @param var2 DHT nodes resolved for the entry's partition.
     */
    protected abstract void addDhtKey(KeyCacheObject var1, List<ClusterNode> var2);

    /**
     * Hook invoked by {@code addNearWriteEntries(...)} before the key is added to the reader requests.
     *
     * @param var1 Key of the updated entry.
     * @param var2 Readers of the entry.
     */
    protected abstract void addNearKey(KeyCacheObject var1, GridDhtCacheEntry.ReaderId[] var2);

    /**
     * Adds a near-cache update for every reader of the entry except the node that initiated
     * the update. Readers whose node can no longer be found in discovery are removed from the entry.
     *
     * @param nearNode Node that initiated the update; skipped when it is among the readers.
     * @param readers Entry readers, must not be {@code null}.
     * @param entry Updated entry.
     * @param val New value, may be {@code null}.
     * @param entryProcessor Entry processor.
     * @param ttl TTL.
     * @param expireTime Expire time.
     */
    final void addNearWriteEntries(ClusterNode nearNode, GridDhtCacheEntry.ReaderId[] readers, GridDhtCacheEntry entry, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long expireTime) {
        assert readers != null;

        CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();

        addNearKey(entry.key(), readers);

        AffinityTopologyVersion topVer = updateReq.topologyVersion();

        for (GridDhtCacheEntry.ReaderId reader : readers) {
            if (nearNode.id().equals(reader.nodeId())) {
                continue;
            }

            GridDhtAtomicAbstractUpdateRequest nearReq = mappings.get(reader.nodeId());

            if (nearReq == null) {
                ClusterNode readerNode = cctx.discovery().node(reader.nodeId());

                // Discovery does not know the reader node: drop the reader and move on.
                if (readerNode == null) {
                    try {
                        entry.removeReader(reader.nodeId(), -1L);
                    }
                    catch (GridCacheEntryRemovedException ignore) {
                        assert false;
                    }

                    continue;
                }

                nearReq = createRequest(readerNode.id(), futId, writeVer, syncMode, topVer, ttl, expireTime, null);

                mappings.put(readerNode.id(), nearReq);

                addedReader = true;
            }

            nearReq.addNearWriteValue(entry.key(), val, entryProcessor, ttl, expireTime);
        }
    }

    /**
     * Not supported by this future; the numeric id returned by {@code id()} is used instead.
     *
     * @return Never returns normally.
     * @throws UnsupportedOperationException Always.
     */
    @Override
    public final IgniteUuid futureId() {
        throw new UnsupportedOperationException();
    }

    /**
     * @return Future id assigned in the constructor.
     */
    final long id() {
        return futId;
    }

    /**
     * @return Write version this future was created with.
     */
    final GridCacheVersion writeVersion() {
        return writeVer;
    }

    /**
     * Treats the departure of a node as a response from it.
     *
     * @param nodeId Id of the node that left.
     * @return {@code true} if a pending response of that node was registered.
     */
    @Override
    public final boolean onNodeLeft(UUID nodeId) {
        if (!registerResponse(nodeId)) {
            return false;
        }

        if (msgLog.isDebugEnabled()) {
            msgLog.debug("DTH update fut, node left [futId=" + futId + ", writeVer=" + writeVer + ", node=" + nodeId + ']');
        }

        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a response for the request mapped to the given node and completes the future
     * once the number of registered responses equals the number of mapped nodes.
     *
     * @param nodeId Id of the node the response is registered for.
     * @return {@code true} if the response was counted; {@code false} if there is no request
     *      mapped to the node or the request rejected the response in {@code onResponse()}.
     */
    private boolean registerResponse(UUID nodeId) {
        if (mappings == null) {
            return false;
        }

        GridDhtAtomicAbstractUpdateRequest req = mappings.get(nodeId);

        if (req == null) {
            return false;
        }

        int registered;

        // The check-and-increment is done under the future's monitor.
        synchronized (this) {
            if (!req.onResponse()) {
                return false;
            }

            registered = resCnt + 1;

            resCnt = registered;
        }

        if (registered == mappings.size()) {
            onDone();
        }

        return true;
    }

    /**
     * Sends the collected DHT requests and, when required, replies to the near node through
     * the completion callback. With no mappings the future completes immediately.
     *
     * @param nearNode Node that initiated the update.
     * @param ret Update return value.
     * @param updateRes Response to be passed to the near node.
     * @param completionCb Callback invoked with the near request and response.
     * @param ctxSpan Parent span for the tracing span created for this future.
     */
    final void map(ClusterNode nearNode, GridCacheReturn ret, GridNearAtomicUpdateResponse updateRes, GridDhtAtomicCache.UpdateReplyClosure completionCb, Span ctxSpan) {
        span = cctx.kernalContext().tracing().create(SpanType.CACHE_API_DHT_UPDATE_FUTURE, ctxSpan);

        try (MTC.TraceSurroundings ignored = MTC.supportContinual(span);
             MTC.TraceSurroundings ignored2 = MTC.support(cctx.kernalContext().tracing().create(SpanType.CACHE_API_UPDATE_MAP, span))) {
            span.addTag("write.version", () -> Objects.toString(writeVer));

            // Nothing to send: answer with an empty mapping and finish.
            if (F.isEmpty(mappings)) {
                updateRes.mapping(Collections.emptyList());

                completionCb.apply(updateReq, updateRes);

                onDone();

                return;
            }

            boolean replyToNear = updateReq.writeSynchronizationMode() == CacheWriteSynchronizationMode.PRIMARY_SYNC
                || !ret.emptyResult()
                || updateReq.nearCache()
                || cctx.localNodeId().equals(nearNode.id());

            boolean mappingRequired = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht());

            // Look for a node that receives near (reader) updates only and no DHT updates.
            boolean readersOnly = false;

            if (!updateReq.needPrimaryResponse() && addedReader) {
                for (GridDhtAtomicAbstractUpdateRequest dhtReq : mappings.values()) {
                    if (dhtReq.nearSize() > 0 && dhtReq.size() == 0) {
                        readersOnly = true;

                        break;
                    }
                }
            }

            if (mappingRequired || readersOnly) {
                initMapping(updateRes);

                replyToNear = true;
            }

            sendDhtRequests(nearNode, ret, !readersOnly);

            if (replyToNear) {
                completionCb.apply(updateReq, updateRes);
            }
        }
    }

    /**
     * Writes the ids of all mapped nodes into the near response; an empty list when there are none.
     *
     * @param updateRes Response to store the mapping in.
     */
    private void initMapping(GridNearAtomicUpdateResponse updateRes) {
        List<UUID> nodeIds = F.isEmpty(mappings)
            ? Collections.<UUID>emptyList()
            : new ArrayList<UUID>(mappings.keySet());

        updateRes.mapping(nodeIds);
    }

    /**
     * Sends every mapped DHT request to its node. A request that cannot be sent is treated as
     * answered so that the future does not wait for a response that will never arrive.
     *
     * @param nearNode Node that initiated the update; its id is put into full-sync requests.
     * @param ret Update return value.
     * @param sndRes If {@code true} and {@code ret} is empty, full-sync requests are flagged via
     *      {@code hasResult(true)}.
     */
    private void sendDhtRequests(ClusterNode nearNode, GridCacheReturn ret, boolean sndRes) {
        for (GridDhtAtomicAbstractUpdateRequest req : this.mappings.values()) {
            try {
                assert (!this.cctx.localNodeId().equals(req.nodeId())) : req;
                if (this.updateReq.fullSync()) {
                    req.nearReplyInfo(nearNode.id(), this.updateReq.futureId());
                    if (sndRes && ret.emptyResult()) {
                        req.hasResult(true);
                    }
                }
                // Continuous query closures are pending on this future's completion.
                if (this.cntQryClsrs != null) {
                    req.replyWithoutDelay(true);
                }
                this.cctx.io().send(req.nodeId(), (GridCacheMessage)req, this.cctx.ioPolicy());
                if (!msgLog.isDebugEnabled()) continue;
                msgLog.debug("DTH update fut, sent request [futId=" + this.futId + ", writeVer=" + this.writeVer + ", node=" + req.nodeId() + ']');
            }
            catch (ClusterTopologyCheckedException ignored) {
                // Target node left the topology: expected, debug-level only.
                if (msgLog.isDebugEnabled()) {
                    msgLog.debug("DTH update fut, failed to send request, node left [futId=" + this.futId + ", writeVer=" + this.writeVer + ", node=" + req.nodeId() + ']');
                }
                this.registerResponse(req.nodeId());
            }
            catch (IgniteCheckedException e) {
                // Unexpected send failure: log together with the cause so the stack trace is not lost.
                U.error(msgLog, "Failed to send request [futId=" + this.futId + ", writeVer=" + this.writeVer + ", node=" + req.nodeId() + ']', e);
                this.registerResponse(req.nodeId());
            }
        }
    }

    /**
     * Registers a deferred update response received from the given node.
     *
     * @param nodeId Id of the responding node.
     */
    final void onDeferredResponse(UUID nodeId) {
        try (MTC.TraceSurroundings ignored = MTC.support(cctx.kernalContext().tracing().create(SpanType.CACHE_API_DHT_PROCESS_ATOMIC_DEFERRED_UPDATE_RESPONSE, span))) {
            if (log.isDebugEnabled()) {
                log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
            }

            registerResponse(nodeId);
        }
    }

    /**
     * Handles a DHT update response: removes the responding node as a reader of every key it
     * reported as near-evicted, then registers the response.
     *
     * @param nodeId Id of the responding node.
     * @param res Response received from that node.
     */
    final void onDhtResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
        if (!F.isEmpty(res.nearEvicted())) {
            for (KeyCacheObject evictedKey : res.nearEvicted()) {
                try {
                    GridDhtCacheEntry cached = (GridDhtCacheEntry)cctx.cache().peekEx(evictedKey);

                    if (cached != null) {
                        cached.removeReader(nodeId, res.messageId());
                    }
                }
                catch (GridCacheEntryRemovedException e) {
                    // The entry is already gone, so there is no reader left to remove.
                    if (log.isDebugEnabled()) {
                        log.debug("Entry with evicted reader was removed [key=" + evictedKey + ", err=" + e + ']');
                    }
                }
            }
        }

        registerResponse(nodeId);
    }

    /**
     * Creates the DHT update request for one target node. Parameter meanings below are taken
     * from the call sites in {@code addWriteEntry(...)} and {@code addNearWriteEntries(...)}.
     *
     * @param var1 Target node id.
     * @param var2 Id of this future.
     * @param var4 Write version.
     * @param var5 Write synchronization mode of the near request.
     * @param var6 Topology version of the near request.
     * @param var7 TTL.
     * @param var9 Conflict expire time (plain expire time for near reader updates).
     * @param var11 Conflict version; {@code null} for near reader updates.
     * @return New request for the node.
     */
    protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(UUID var1, long var2, GridCacheVersion var4, CacheWriteSynchronizationMode var5, @NotNull AffinityTopologyVersion var6, long var7, long var9, @Nullable GridCacheVersion var11);

    /**
     * Completes the future. On the completing call the future is removed from the MVCC manager
     * and every deferred continuous query closure is invoked with the success flag.
     *
     * @param res Result (always {@code null} for a {@code Void} future).
     * @param err Error, or {@code null} on success.
     * @return {@code true} if this call completed the future, {@code false} if it was already done.
     */
    @Override
    public final boolean onDone(@Nullable Void res, @Nullable Throwable err) {
        if (!super.onDone(res, err)) {
            return false;
        }

        cctx.mvcc().removeAtomicFuture(futId);

        if (cntQryClsrs != null) {
            boolean success = err == null;

            for (CI1<Boolean> clsr : cntQryClsrs) {
                clsr.apply(success);
            }
        }

        return true;
    }

    /**
     * @return Always {@code true}.
     */
    @Override
    public boolean trackable() {
        return true;
    }

    /**
     * Intentionally does nothing: {@code trackable()} always returns {@code true}.
     */
    @Override
    public void markNotTrackable() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds a string form of the future that includes, per mapped node, whether a response
     * was received and the DHT / near sizes of its request.
     *
     * @return String representation.
     */
    @Override
    public String toString() {
        synchronized (this) {
            // Renders one request as a short status summary.
            IgniteClosure<GridDhtAtomicAbstractUpdateRequest, String> summary = new IgniteClosure<GridDhtAtomicAbstractUpdateRequest, String>() {
                @Override
                public String apply(GridDhtAtomicAbstractUpdateRequest req) {
                    return "[res=" + req.hasResponse() + ", size=" + req.size() + ", nearSize=" + req.nearSize() + ']';
                }
            };

            Map<UUID, String> dhtRes = F.viewReadOnly(mappings, summary, new IgnitePredicate[0]);

            return S.toString(GridDhtAtomicAbstractUpdateFuture.class, this, "dhtRes", dhtRes);
        }
    }

    static {
        // Holder passed to U.logger(...) by the constructor; the diamond avoids the raw-type
        // (unchecked) assignment to AtomicReference<IgniteLogger>.
        logRef = new AtomicReference<>();
    }
}

