/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.checker.tasks;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.checker.objects.ExecutionResult;
import org.apache.ignite.internal.processors.cache.checker.objects.PartitionBatchRequest;
import org.apache.ignite.internal.processors.cache.checker.objects.VersionedKey;
import org.apache.ignite.internal.processors.cache.checker.tasks.ReconciliationResourceLimitedJob;
import org.apache.ignite.internal.processors.cache.checker.util.ConsistencyCheckUtils;
import org.apache.ignite.internal.processors.cache.checker.util.KeyComparator;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@GridInternal
public class CollectPartitionKeysByBatchTask
extends ComputeTaskAdapter<PartitionBatchRequest, ExecutionResult<T2<KeyCacheObject, Map<KeyCacheObject, Map<UUID, GridCacheVersion>>>>> {
    private static final long serialVersionUID = 0L;
    private static final KeyComparator KEY_COMPARATOR = new KeyComparator();
    @LoggerResource
    private IgniteLogger log;
    @IgniteInstanceResource
    private IgniteEx ignite;
    private volatile PartitionBatchRequest partBatch;

    @Override
    @NotNull
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, PartitionBatchRequest partBatch) throws IgniteException {
        HashMap<CollectPartitionKeysByBatchJob, ClusterNode> jobs = new HashMap<CollectPartitionKeysByBatchJob, ClusterNode>();
        this.partBatch = partBatch;
        for (ClusterNode node : subgrid) {
            jobs.put(new CollectPartitionKeysByBatchJob(partBatch), node);
        }
        return jobs;
    }

    @Override
    public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
        ComputeJobResultPolicy superRes = super.result(res, rcvd);
        if (superRes == ComputeJobResultPolicy.FAILOVER) {
            superRes = ComputeJobResultPolicy.WAIT;
            this.log.warning("CollectPartitionEntryHashesJob failed on node [consistentId=" + res.getNode().consistentId() + "]", res.getException());
        }
        return superRes;
    }

    @Override
    @Nullable
    public ExecutionResult<T2<KeyCacheObject, Map<KeyCacheObject, Map<UUID, GridCacheVersion>>>> reduce(List<ComputeJobResult> results) throws IgniteException {
        assert (this.partBatch != null);
        GridCacheContext<Object, Object> ctx = this.ignite.context().cache().cache(this.partBatch.cacheName()).context();
        HashMap<KeyCacheObject, Map> totalRes = new HashMap<KeyCacheObject, Map>();
        KeyCacheObject lastKey = null;
        for (int i = 0; i < results.size(); ++i) {
            IgniteException exc = results.get(i).getException();
            if (exc != null) {
                return new ExecutionResult<T2<KeyCacheObject, Map<KeyCacheObject, Map<UUID, GridCacheVersion>>>>(exc.getMessage());
            }
            ExecutionResult nodeRes = (ExecutionResult)results.get(i).getData();
            if (nodeRes.errorMessage() != null) {
                return new ExecutionResult<T2<KeyCacheObject, Map<KeyCacheObject, Map<UUID, GridCacheVersion>>>>(nodeRes.errorMessage());
            }
            for (VersionedKey partKeyVer : (List)nodeRes.result()) {
                try {
                    KeyCacheObject key = ConsistencyCheckUtils.unmarshalKey(partKeyVer.key(), ctx);
                    if (lastKey == null || KEY_COMPARATOR.compare(lastKey, key) < 0) {
                        lastKey = key;
                    }
                    Map map = totalRes.computeIfAbsent(key, k -> new HashMap());
                    map.put(partKeyVer.nodeId(), partKeyVer.ver());
                    if (i != results.size() - 1 || map.size() != results.size() || this.hasConflict(map.values())) continue;
                    totalRes.remove(key);
                }
                catch (IgniteCheckedException e) {
                    U.error(this.log, e.getMessage(), e);
                    return new ExecutionResult<T2<KeyCacheObject, Map<KeyCacheObject, Map<UUID, GridCacheVersion>>>>(e.getMessage());
                }
            }
        }
        return new ExecutionResult<T2<KeyCacheObject, Map<KeyCacheObject, Map<UUID, GridCacheVersion>>>>(new T2(lastKey, totalRes));
    }

    private boolean hasConflict(Collection<GridCacheVersion> keyVersions) {
        assert (!keyVersions.isEmpty());
        Iterator<GridCacheVersion> iter = keyVersions.iterator();
        GridCacheVersion ver = iter.next();
        while (iter.hasNext()) {
            if (ver.equals(iter.next())) continue;
            return true;
        }
        return false;
    }

    public static class CollectPartitionKeysByBatchJob
    extends ReconciliationResourceLimitedJob {
        private static final long serialVersionUID = 0L;
        private PartitionBatchRequest partBatch;

        private CollectPartitionKeysByBatchJob(PartitionBatchRequest partBatch) {
            this.partBatch = partBatch;
        }

        @Override
        protected long sessionId() {
            return this.partBatch.sessionId();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Loose catch block
         */
        @Override
        protected ExecutionResult<List<VersionedKey>> execute0() {
            KeyCacheObject lowerKey;
            GridCacheContext<Object, Object> cctx = this.ignite.context().cache().cache(this.partBatch.cacheName()).context();
            CacheGroupContext grpCtx = cctx.group();
            int batchSize = this.partBatch.batchSize();
            try {
                lowerKey = ConsistencyCheckUtils.unmarshalKey(this.partBatch.lowerKey(), cctx);
            }
            catch (IgniteCheckedException e) {
                String errMsg = "Batch [" + this.partBatch + "] can't processed. Broken key.";
                this.log.error(errMsg, e);
                return new ExecutionResult<List<VersionedKey>>(errMsg + " " + e.getMessage());
            }
            GridDhtLocalPartition part = grpCtx.topology().localPartition(this.partBatch.partitionId());
            assert (part != null);
            part.reserve();
            try {
                ExecutionResult<List<VersionedKey>> executionResult;
                Throwable throwable;
                GridCursor<? extends CacheDataRow> cursor;
                block22: {
                    block23: {
                        cursor = lowerKey == null ? grpCtx.offheap().dataStore(part).cursor(cctx.cacheId()) : grpCtx.offheap().dataStore(part).cursor(cctx.cacheId(), lowerKey, null);
                        throwable = null;
                        ArrayList<VersionedKey> partEntryHashRecords = new ArrayList<VersionedKey>();
                        for (int i = 0; i < batchSize && cursor.next(); ++i) {
                            CacheDataRow row = cursor.get();
                            if (lowerKey == null || KEY_COMPARATOR.compare(lowerKey, row.key()) != 0) {
                                partEntryHashRecords.add(new VersionedKey(this.ignite.localNode().id(), row.key(), row.version()));
                                continue;
                            }
                            --i;
                        }
                        executionResult = new ExecutionResult<List<VersionedKey>>(partEntryHashRecords);
                        if (cursor == null) break block22;
                        if (throwable == null) break block23;
                        try {
                            cursor.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                        break block22;
                    }
                    cursor.close();
                }
                return executionResult;
                catch (Throwable throwable3) {
                    try {
                        try {
                            throwable = throwable3;
                            throw throwable3;
                        }
                        catch (Throwable throwable4) {
                            if (cursor != null) {
                                if (throwable != null) {
                                    try {
                                        cursor.close();
                                    }
                                    catch (Throwable throwable5) {
                                        throwable.addSuppressed(throwable5);
                                    }
                                } else {
                                    cursor.close();
                                }
                            }
                            throw throwable4;
                        }
                    }
                    catch (Exception e) {
                        String errMsg = "Batch [" + this.partBatch + "] can't processed. Broken cursor.";
                        this.log.error(errMsg, e);
                        ExecutionResult<List<VersionedKey>> executionResult2 = new ExecutionResult<List<VersionedKey>>(errMsg + " " + e.getMessage());
                        return executionResult2;
                    }
                }
            }
            finally {
                part.release();
            }
        }
    }
}

