/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query.h2.twostep;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import javax.cache.CacheException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheServerNotFoundException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapResult;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.gridgain.internal.h2.util.IntArray;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class ReducePartitionMapper {
    /**
     * Shared marker set assigned to partitions that are left unmapped. It is compared by
     * reference ({@code ==}) in {@code partitionedUnstableDataNodes}, so it must remain a single instance.
     */
    private static final Set<ClusterNode> UNMAPPED_PARTS = Collections.emptySet();
    /** Kernal context; used here to look up cache contexts and cache-group affinity nodes. */
    private final GridKernalContext ctx;
    /** Logger that receives the info-level messages written by {@code logRetry}. */
    private final IgniteLogger log;

    /**
     * Creates a mapper bound to the given kernal context and logger.
     *
     * @param ctx Kernal context.
     * @param log Logger.
     */
    public ReducePartitionMapper(GridKernalContext ctx, IgniteLogger log) {
        this.ctx = ctx;
        this.log = log;
    }

    /**
     * Computes the nodes (and, where applicable, the per-node partition lists) a query over the
     * given caches has to be sent to.
     *
     * <p>First every cache is checked for lost partitions: if {@code parts} is {@code null} any lost
     * partition fails the call, otherwise only a lost partition listed in {@code parts} does.
     * Afterwards the mapping is built either from current partition owners (when any cache has
     * moving partitions) or from the affinity assignment at {@code topVer}.
     *
     * @param cacheIds Participating cache IDs. May be reordered by the helpers this method calls.
     * @param topVer Affinity topology version used for the stable mapping.
     * @param parts Explicit partitions to query, or {@code null} for all partitions.
     * @param isReplicatedOnly Whether the query is treated as touching replicated caches only.
     * @return Mapping result; its node collection is {@code null} when no consistent mapping could
     *      be computed (presumably a signal for the caller to retry - confirm against the caller).
     * @throws CacheException If a relevant partition is lost or a cache cannot be found.
     */
    public ReducePartitionMapResult nodesForPartitions(List<Integer> cacheIds, AffinityTopologyVersion topVer, int[] parts, boolean isReplicatedOnly) {
        // Refuse to map the query when a partition it would read is marked as lost.
        for (int cacheId : cacheIds) {
            GridCacheContext<?, ?> cctx = cacheContext(cacheId);
            Set<?> lostParts = cctx.topology().lostPartitions();

            if (lostParts.isEmpty()) {
                continue;
            }

            int lostPart = -1;

            if (parts == null) {
                lostPart = ((Integer)lostParts.iterator().next()).intValue();
            }
            else {
                for (int part : parts) {
                    if (lostParts.contains(part)) {
                        lostPart = part;

                        break;
                    }
                }
            }

            if (lostPart >= 0) {
                // NOTE(review): "lostPart" inside the message looks like a rename artifact ("lost");
                // the text is kept unchanged because other code may match on it.
                CacheInvalidStateException cause = new CacheInvalidStateException("Failed to execute query because cache partition has been lostPart [cacheName=" + cctx.name() + ", part=" + lostPart + ']');

                throw new CacheException((Throwable)cause);
            }
        }

        Collection<ClusterNode> nodes = null;
        Map<ClusterNode, IntArray> partsMap = null;
        Map<ClusterNode, IntArray> qryMap = null;

        if (!isPreloadingActive(cacheIds)) {
            // No moving partitions: rely on the affinity assignment.
            if (parts != null) {
                qryMap = stableDataNodesForPartitions(topVer, cacheIds, parts);

                if (qryMap != null) {
                    nodes = qryMap.keySet();
                }
            }
            else {
                nodes = stableDataNodes(cacheIds, topVer, isReplicatedOnly);
            }
        }
        else if (isReplicatedOnly) {
            nodes = replicatedUnstableDataNodes(cacheIds);
        }
        else {
            partsMap = partitionedUnstableDataNodes(cacheIds);

            if (partsMap != null) {
                qryMap = narrowForQuery(partsMap, parts);

                if (qryMap != null) {
                    nodes = qryMap.keySet();
                }
            }
        }

        return new ReducePartitionMapResult(nodes, partsMap, qryMap);
    }

    /**
     * Looks up the context of a cache by its ID.
     *
     * @param cacheId Cache ID.
     * @return Cache context, never {@code null}.
     * @throws CacheException If no context is registered for the ID on the local node.
     */
    private GridCacheContext<?, ?> cacheContext(Integer cacheId) {
        GridCacheContext<?, ?> found = this.ctx.cache().context().cacheContext(cacheId.intValue());

        if (found != null) {
            return found;
        }

        throw new CacheException(String.format("Cache not found on local node (was concurrently destroyed?) [cacheId=%d]", cacheId));
    }

    /**
     * Tells whether at least one of the given caches currently has moving partitions.
     *
     * @param cacheIds Cache IDs to inspect, checked in list order until the first match.
     * @return {@code true} if any cache reports moving partitions.
     */
    private boolean isPreloadingActive(List<Integer> cacheIds) {
        return cacheIds.stream().anyMatch(cacheId -> hasMovingPartitions(cacheContext(cacheId)));
    }

    /**
     * Checks a single cache for moving partitions. Local caches always yield {@code false}.
     *
     * @param cctx Cache context, must not be {@code null}.
     * @return {@code true} if the cache is not local and its topology reports moving partitions.
     */
    private static boolean hasMovingPartitions(GridCacheContext<?, ?> cctx) {
        assert cctx != null;

        if (cctx.isLocal()) {
            return false;
        }

        return cctx.topology().hasMovingPartitions();
    }

    /**
     * Builds the node-to-partitions map for explicitly requested partitions using the affinity
     * assignment at {@code topVer}, then validates it against the remaining caches.
     *
     * @param topVer Affinity topology version.
     * @param cacheIds Participating cache IDs (a partitioned cache may be swapped to index 0).
     * @param parts Requested partitions, must not be {@code null}.
     * @return Map of primary node to its requested partitions, or {@code null} if
     *      {@code narrowToCaches} could not reconcile the caches.
     */
    private Map<ClusterNode, IntArray> stableDataNodesForPartitions(AffinityTopologyVersion topVer, List<Integer> cacheIds, @NotNull int[] parts) {
        assert parts != null;

        GridCacheContext<?, ?> cctx = firstPartitionedCache(cacheIds);
        Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, parts);

        // The key set is only used for validation; the map itself is what gets returned.
        Collection<ClusterNode> narrowed = narrowToCaches(cctx, map.keySet(), cacheIds, topVer, parts, false);

        return narrowed == null ? null : map;
    }

    /**
     * Resolves data nodes from the affinity assignment at {@code topVer} when no explicit
     * partitions are requested.
     *
     * @param cacheIds Participating cache IDs.
     * @param topVer Affinity topology version.
     * @param isReplicatedOnly Whether only replicated caches are expected.
     * @return Nodes accepted by {@code narrowToCaches}, or {@code null} if it gave up.
     */
    private Collection<ClusterNode> stableDataNodes(List<Integer> cacheIds, AffinityTopologyVersion topVer, boolean isReplicatedOnly) {
        GridCacheContext<?, ?> cctx;

        if (isReplicatedOnly) {
            cctx = cacheContext(cacheIds.get(0));
        }
        else {
            cctx = firstPartitionedCache(cacheIds);
        }

        AffinityAssignment topologyAssignment = cctx.affinity().assignment(topVer);

        Set<ClusterNode> nodes;

        if (!cctx.isReplicated()) {
            nodes = topologyAssignment.primaryPartitionNodes();
        }
        else if (isReplicatedOnly && cacheIds.size() > 1) {
            // narrowToCaches() may call retainAll() on this set, so hand it a private copy.
            nodes = new HashSet<ClusterNode>(topologyAssignment.nodes());
        }
        else {
            nodes = topologyAssignment.nodes();
        }

        return narrowToCaches(cctx, nodes, cacheIds, topVer, null, isReplicatedOnly);
    }

    /**
     * Returns the first partitioned cache in the list. If it is not already at index 0, its ID is
     * swapped into index 0, which modifies the caller's list in place.
     *
     * @param cacheIds Participating cache IDs; may be reordered.
     * @return Context of the first partitioned cache. If none exists, an assertion fails; with
     *      assertions disabled the context of the first cache is returned.
     */
    private GridCacheContext<?, ?> firstPartitionedCache(List<Integer> cacheIds) {
        GridCacheContext<?, ?> first = cacheContext(cacheIds.get(0));

        if (!first.isPartitioned()) {
            for (int idx = 1; idx < cacheIds.size(); idx++) {
                GridCacheContext<?, ?> candidate = cacheContext(cacheIds.get(idx));

                if (candidate.isPartitioned()) {
                    Collections.swap(cacheIds, 0, idx);

                    return candidate;
                }
            }

            assert false;
        }

        return first;
    }

    /**
     * Validates the node set computed for the first cache against every further cache in
     * {@code cacheIds} (index 1 onwards). Local caches are skipped.
     *
     * <ul>
     * <li>Replicated extra cache, replicated-only query: {@code nodes} is intersected in place
     *     with the extra cache's nodes and must stay non-empty.</li>
     * <li>Replicated extra cache otherwise: its nodes must contain all of {@code nodes}.</li>
     * <li>Non-replicated extra cache: its node set must equal {@code nodes}.</li>
     * </ul>
     *
     * @param cctx Context of the first cache (used for error messages).
     * @param nodes Candidate nodes; may be modified via {@code retainAll}.
     * @param cacheIds Participating cache IDs.
     * @param topVer Affinity topology version.
     * @param parts Requested partitions or {@code null}.
     * @param isReplicatedOnly Whether only replicated caches are allowed.
     * @return {@code nodes}, or {@code null} if the sets do not match while some cache has moving partitions.
     * @throws CacheServerNotFoundException If a cache has no data nodes.
     * @throws CacheException If the sets do not match and no cache has moving partitions, or if a
     *      partitioned cache is found in a replicated-only query.
     */
    private Collection<ClusterNode> narrowToCaches(GridCacheContext<?, ?> cctx, Collection<ClusterNode> nodes, List<Integer> cacheIds, AffinityTopologyVersion topVer, int[] parts, boolean isReplicatedOnly) {
        if (F.isEmpty(nodes)) {
            throw new CacheServerNotFoundException("Failed to find data nodes for cache: " + cctx.name());
        }

        for (int idx = 1; idx < cacheIds.size(); idx++) {
            GridCacheContext<?, ?> extraCctx = cacheContext(cacheIds.get(idx));
            String extraCacheName = extraCctx.name();

            if (extraCctx.isLocal()) {
                continue;
            }

            if (isReplicatedOnly && !extraCctx.isReplicated()) {
                throw new CacheException("Queries running on replicated cache should not contain JOINs with partitioned tables [replicatedCache=" + cctx.name() + ", partitionedCache=" + extraCacheName + "]");
            }

            Set<ClusterNode> extraNodes = stableDataNodesSet(topVer, extraCctx, parts);

            if (F.isEmpty(extraNodes)) {
                throw new CacheServerNotFoundException("Failed to find data nodes for cache: " + extraCacheName);
            }

            boolean disjoint;

            if (!extraCctx.isReplicated()) {
                disjoint = !extraNodes.equals(nodes);
            }
            else if (isReplicatedOnly) {
                nodes.retainAll(extraNodes);

                disjoint = nodes.isEmpty();
            }
            else {
                disjoint = !extraNodes.containsAll(nodes);
            }

            if (!disjoint) {
                continue;
            }

            if (!isPreloadingActive(cacheIds)) {
                throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + ", cache2=" + extraCacheName + "]");
            }

            // A mismatch while partitions are still moving is reported as "no mapping" rather than an error.
            logRetry("Failed to calculate nodes for SQL query (got disjoint node map during rebalance) [affTopVer=" + topVer + ", cacheIds=" + cacheIds + ", parts=" + (parts == null ? "[]" : Arrays.toString(parts)) + ", replicatedOnly=" + isReplicatedOnly + ", lastCache=" + extraCctx.name() + ", lastCacheId=" + extraCctx.cacheId() + ']');

            return null;
        }

        return nodes;
    }

    /**
     * Groups the requested partitions by the first node of their affinity assignment at
     * {@code topVer}. Partitions with an empty assignment are left out.
     *
     * @param topVer Affinity topology version.
     * @param cctx Cache context; asserted not to be replicated.
     * @param parts Partitions to map.
     * @return Map from node to the partitions for which it is first in the assignment.
     */
    private Map<ClusterNode, IntArray> stableDataNodesMap(AffinityTopologyVersion topVer, GridCacheContext<?, ?> cctx, @NotNull int[] parts) {
        assert !cctx.isReplicated();

        List<?> assignment = cctx.affinity().assignment(topVer).assignment();
        Map<ClusterNode, IntArray> mapping = new HashMap<>();

        for (int part : parts) {
            List<?> partNodes = (List<?>)assignment.get(part);

            if (partNodes.isEmpty()) {
                continue;
            }

            ClusterNode primary = (ClusterNode)partNodes.get(0);

            mapping.computeIfAbsent(primary, node -> new IntArray()).add(part);
        }

        return mapping;
    }

    /**
     * Returns the data nodes of a cache according to the affinity assignment at {@code topVer}.
     *
     * @param topVer Affinity topology version.
     * @param cctx Cache context.
     * @param parts Requested partitions, or {@code null} for all.
     * @return For a replicated cache, all nodes of the assignment; otherwise the primary-partition
     *      nodes, restricted to the given partitions when {@code parts} is not {@code null}.
     */
    private Set<ClusterNode> stableDataNodesSet(AffinityTopologyVersion topVer, GridCacheContext<?, ?> cctx, @Nullable int[] parts) {
        AffinityAssignment topologyAssignment = cctx.affinity().assignment(topVer);

        if (cctx.isReplicated()) {
            return topologyAssignment.nodes();
        }

        if (parts == null) {
            return topologyAssignment.primaryPartitionNodes();
        }

        List<?> assignment = topologyAssignment.assignment();
        Set<ClusterNode> primaries = new HashSet<>();

        for (int part : parts) {
            List<?> partNodes = (List<?>)assignment.get(part);

            // Partitions with an empty assignment contribute no node.
            if (!partNodes.isEmpty()) {
                primaries.add((ClusterNode)partNodes.get(0));
            }
        }

        return primaries;
    }

    /**
     * Builds a node-to-partitions map from the current partition owners, for use while at least
     * one cache has moving partitions.
     *
     * <p>Steps:
     * <ol>
     * <li>Verify that all non-replicated, non-local caches have the same partition count.</li>
     * <li>Collect the owners of every partition of the first partitioned cache. A partition with
     *     no owners is marked with {@code UNMAPPED_PARTS} if it has no affinity assignment or is lost.</li>
     * <li>Intersect the owners with those of every other partitioned cache.</li>
     * <li>Intersect the owners with the data nodes of every replicated cache.</li>
     * <li>Pick one node per mapped partition (the only one, or a random one of several).</li>
     * </ol>
     *
     * @param cacheIds Participating cache IDs.
     * @return Map from node to the partitions it should serve, or {@code null} when no consistent
     *      mapping exists at the moment (an info message is logged in most of these cases).
     * @throws CacheException If partition counts differ between caches.
     * @throws CacheServerNotFoundException If a partition has no owners and its cache group has no data nodes.
     */
    private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(List<Integer> cacheIds) {
        GridCacheContext<?, ?> cctx = this.findFirstPartitioned(cacheIds);
        int partsCnt = cctx.affinity().partitions();

        // All partitioned caches must agree on the partition count.
        if (cacheIds.size() > 1) {
            for (Integer cacheId : cacheIds) {
                GridCacheContext<?, ?> extraCctx = this.cacheContext(cacheId);

                if (extraCctx.isReplicated() || extraCctx.isLocal()) {
                    continue;
                }

                int parts = extraCctx.affinity().partitions();

                if (parts != partsCnt) {
                    throw new CacheException("Number of partitions must be the same for correct collocation [cache1=" + cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraCctx.name() + ", parts2=" + parts + "]");
                }
            }
        }

        // NOTE: raw collection types below are kept as produced by the decompiler; element types
        // of owners() are not visible from this file.
        Set[] partLocs = new Set[partsCnt];

        // Owners of each partition of the first partitioned cache.
        for (int p = 0; p < partsCnt; ++p) {
            List owners = cctx.topology().owners(p);

            if (F.isEmpty((Collection)owners)) {
                // No assignment at all, or the partition is lost: leave it unmapped.
                if (F.isEmpty((Collection)cctx.affinity().assignment(AffinityTopologyVersion.NONE).get(p)) || cctx.topology().lostPartitions().contains(p)) {
                    partLocs[p] = UNMAPPED_PARTS;

                    continue;
                }

                if (!F.isEmpty(this.dataNodes(cctx.groupId(), AffinityTopologyVersion.NONE))) {
                    if (this.log.isInfoEnabled()) {
                        this.logRetry("Failed to calculate nodes for SQL query (partition has no owners, but corresponding cache group has data nodes) [cacheIds=" + cacheIds + ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + ", part=" + p + ", cacheGroupId=" + cctx.groupId() + ']');
                    }

                    return null;
                }

                throw new CacheServerNotFoundException("Failed to find data nodes [cache=" + cctx.name() + ", part=" + p + "]");
            }

            partLocs[p] = new HashSet(owners);
        }

        if (cacheIds.size() > 1) {
            // Intersect with the owners of the other partitioned caches.
            for (Integer cacheId : cacheIds) {
                GridCacheContext<?, ?> extraCctx = this.cacheContext(cacheId);

                if (cctx == extraCctx || extraCctx.isReplicated() || extraCctx.isLocal()) {
                    continue;
                }

                int parts = extraCctx.affinity().partitions();

                for (int p = 0; p < parts; ++p) {
                    List owners = extraCctx.topology().owners(p);

                    if (partLocs[p] == UNMAPPED_PARTS) {
                        continue;
                    }

                    if (F.isEmpty((Collection)owners)) {
                        if (!F.isEmpty(this.dataNodes(extraCctx.groupId(), AffinityTopologyVersion.NONE))) {
                            if (this.log.isInfoEnabled()) {
                                this.logRetry("Failed to calculate nodes for SQL query (partition has no owners, but corresponding cache group has data nodes) [cacheIds=" + cacheIds + ", cacheName=" + extraCctx.name() + ", cacheId=" + extraCctx.cacheId() + ", part=" + p + ", cacheGroupId=" + extraCctx.groupId() + ']');
                            }

                            return null;
                        }

                        throw new CacheServerNotFoundException("Failed to find data nodes [cache=" + extraCctx.name() + ", part=" + p + "]");
                    }

                    if (partLocs[p] == null) {
                        partLocs[p] = new HashSet(owners);

                        continue;
                    }

                    partLocs[p].retainAll(owners);

                    if (!partLocs[p].isEmpty()) {
                        continue;
                    }

                    if (this.log.isInfoEnabled()) {
                        this.logRetry("Failed to calculate nodes for SQL query (caches have no common data nodes for partition) [cacheIds=" + cacheIds + ", lastCacheName=" + extraCctx.name() + ", lastCacheId=" + extraCctx.cacheId() + ", part=" + p + ']');
                    }

                    return null;
                }
            }

            // Intersect with the data nodes of the replicated caches.
            for (Integer cacheId : cacheIds) {
                GridCacheContext<?, ?> extraCctx = this.cacheContext(cacheId);

                if (!extraCctx.isReplicated()) {
                    continue;
                }

                Set<ClusterNode> dataNodes = this.replicatedUnstableDataNodes(extraCctx);

                if (F.isEmpty(dataNodes)) {
                    return null;
                }

                // Iterate by index so the logged partition number stays correct even when
                // unmapped partitions are skipped.
                for (int p = 0; p < partLocs.length; ++p) {
                    Set partLoc = partLocs[p];

                    if (partLoc == UNMAPPED_PARTS) {
                        continue;
                    }

                    partLoc.retainAll(dataNodes);

                    if (partLoc.isEmpty()) {
                        if (this.log.isInfoEnabled()) {
                            this.logRetry("Failed to calculate nodes for SQL query (caches have no common data nodes for partition) [cacheIds=" + cacheIds + ", lastReplicatedCacheName=" + extraCctx.name() + ", lastReplicatedCacheId=" + extraCctx.cacheId() + ", part=" + p + ']');
                        }

                        return null;
                    }
                }
            }
        }

        // Choose one node for every mapped partition and group partitions by the chosen node.
        HashMap<ClusterNode, IntArray> res = new HashMap<ClusterNode, IntArray>();

        for (int p = 0; p < partLocs.length; ++p) {
            Set pl = partLocs[p];

            if (pl == UNMAPPED_PARTS) {
                continue;
            }

            assert (!F.isEmpty((Collection)pl)) : pl;

            ClusterNode n = pl.size() == 1 ? (ClusterNode)F.first((Iterable)pl) : (ClusterNode)F.rand((Collection)pl);
            IntArray parts = (IntArray)res.get(n);

            if (parts == null) {
                parts = new IntArray();
                res.put(n, parts);
            }

            parts.add(p);
        }

        return res;
    }

    /**
     * Computes the nodes that currently own all partitions of every replicated cache in the list,
     * for use while some cache has moving partitions.
     *
     * <p>If the first cache is not replicated, the second one is taken as the starting cache and
     * is asserted to be replicated. Local caches among the remaining ones are ignored.
     *
     * @param cacheIds Participating cache IDs.
     * @return Common owner nodes, or {@code null} when no such set exists at the moment.
     * @throws CacheException If one of the remaining caches is neither local nor replicated.
     */
    private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> cacheIds) {
        GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(0));
        int firstExtra = 1;

        if (!cctx.isReplicated()) {
            assert cacheIds.size() > 1 : "no extra replicated caches with partitioned main cache";

            cctx = cacheContext(cacheIds.get(1));
            firstExtra = 2;

            assert cctx.isReplicated() : "all the extra caches must be replicated here";
        }

        Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx);

        if (F.isEmpty(nodes)) {
            return null;
        }

        for (int idx = firstExtra; idx < cacheIds.size(); idx++) {
            GridCacheContext<?, ?> extraCctx = cacheContext(cacheIds.get(idx));

            if (extraCctx.isLocal()) {
                continue;
            }

            if (!extraCctx.isReplicated()) {
                throw new CacheException("Queries running on replicated cache should not contain JOINs with tables in partitioned caches [replicatedCache=" + cctx.name() + ", partitionedCache=" + extraCctx.name() + "]");
            }

            Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);

            if (F.isEmpty(extraOwners)) {
                return null;
            }

            nodes.retainAll(extraOwners);

            if (nodes.isEmpty()) {
                logRetry("Failed to calculate nodes for SQL query (got disjoint node map for REPLICATED caches during rebalance) [cacheIds=" + cacheIds + ", lastCache=" + extraCctx.name() + ", lastCacheId=" + extraCctx.cacheId() + ']');

                return null;
            }
        }

        return nodes;
    }

    /**
     * Finds the affinity nodes of a replicated cache's group that currently own every partition of the cache.
     *
     * @param cctx Cache context; asserted to be replicated.
     * @return Mutable set of nodes owning all partitions, or {@code null} if some partition has no
     *      owners or no single node owns all partitions (both cases are logged).
     * @throws CacheServerNotFoundException If the cache group has no data nodes.
     */
    private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?, ?> cctx) {
        assert cctx.isReplicated() : cctx.name() + " must be replicated";

        String cacheName = cctx.name();
        Set<ClusterNode> candidates = new HashSet<>(dataNodes(cctx.groupId(), AffinityTopologyVersion.NONE));

        if (candidates.isEmpty()) {
            throw new CacheServerNotFoundException("Failed to find data nodes for cache: " + cacheName);
        }

        int partCnt = cctx.affinity().partitions();

        // Shrink the candidate set partition by partition; bail out as soon as it becomes empty.
        for (int p = 0; p < partCnt; p++) {
            List<?> owners = cctx.topology().owners(p);

            if (F.isEmpty((Collection<?>)owners)) {
                logRetry("Failed to calculate nodes for SQL query (partition of a REPLICATED cache has no owners) [cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + ", part=" + p + ']');

                return null;
            }

            candidates.retainAll(owners);

            if (candidates.isEmpty()) {
                logRetry("Failed to calculate nodes for SQL query (partitions of a REPLICATED has no common owners) [cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + ", lastPart=" + p + ']');

                return null;
            }
        }

        return candidates;
    }

    /**
     * Returns the affinity nodes of a cache group as reported by discovery.
     *
     * @param grpId Cache group ID.
     * @param topVer Topology version passed to discovery.
     * @return Affinity nodes, or an empty set if discovery returned {@code null}.
     */
    private Collection<ClusterNode> dataNodes(int grpId, AffinityTopologyVersion topVer) {
        Set<ClusterNode> affNodes = this.ctx.discovery().cacheGroupAffinityNodes(grpId, topVer);

        if (affNodes == null) {
            return Collections.emptySet();
        }

        return affNodes;
    }

    /**
     * Restricts a node-to-partitions map to the partitions requested by the query.
     *
     * @param partsMap Full node-to-partitions map.
     * @param parts Requested partitions, or {@code null} to keep everything. Membership is tested
     *      with {@link Arrays#binarySearch(int[], int)}, so the array is assumed to be sorted in
     *      ascending order - TODO confirm against callers.
     * @return {@code partsMap} itself if {@code parts} is {@code null}; otherwise a new map holding
     *      only nodes with at least one requested partition, or {@code null} if no node qualifies.
     */
    private static Map<ClusterNode, IntArray> narrowForQuery(Map<ClusterNode, IntArray> partsMap, int[] parts) {
        if (parts == null) {
            return partsMap;
        }

        Map<ClusterNode, IntArray> narrowed = U.newHashMap(partsMap.size());

        for (Map.Entry<ClusterNode, IntArray> nodeParts : partsMap.entrySet()) {
            IntArray all = nodeParts.getValue();
            IntArray kept = new IntArray(parts.length);

            for (int idx = 0; idx < all.size(); idx++) {
                int part = all.get(idx);

                if (Arrays.binarySearch(parts, part) >= 0) {
                    kept.add(part);
                }
            }

            // Nodes left without any requested partition are dropped from the result.
            if (kept.size() > 0) {
                narrowed.put(nodeParts.getKey(), kept);
            }
        }

        return narrowed.isEmpty() ? null : narrowed;
    }

    /**
     * Finds the first cache in the list that is neither replicated nor local.
     *
     * @param cacheIds Cache IDs, scanned in list order.
     * @return Context of the first matching cache.
     * @throws CacheException If the very first cache is local.
     * @throws IllegalStateException If no matching cache exists.
     */
    public GridCacheContext<?, ?> findFirstPartitioned(List<Integer> cacheIds) {
        for (int idx = 0; idx < cacheIds.size(); idx++) {
            GridCacheContext<?, ?> candidate = cacheContext(cacheIds.get(idx));

            if (candidate.isLocal()) {
                // A local cache is only an error in the leading position; elsewhere it is skipped.
                if (idx == 0) {
                    throw new CacheException("Cache is LOCAL: " + candidate.name());
                }

                continue;
            }

            if (!candidate.isReplicated()) {
                return candidate;
            }
        }

        throw new IllegalStateException("Failed to find partitioned cache.");
    }

    /**
     * Writes an info-level message explaining why node mapping could not be computed.
     *
     * <p>The info-enabled check lives here so that every call site behaves the same way: some
     * callers in this class already guard the call with {@code log.isInfoEnabled()}, others do not.
     *
     * @param msg Message to log.
     */
    private void logRetry(String msg) {
        if (this.log.isInfoEnabled()) {
            this.log.info(msg);
        }
    }
}

