/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.verify.checker.tasks;

import java.io.File;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.checker.objects.ExecutionResult;
import org.apache.ignite.internal.processors.cache.checker.objects.ReconciliationAffectedEntries;
import org.apache.ignite.internal.processors.cache.checker.objects.ReconciliationAffectedEntriesExtended;
import org.apache.ignite.internal.processors.cache.checker.objects.ReconciliationResult;
import org.apache.ignite.internal.processors.cache.checker.processor.PartitionReconciliationProcessor;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.checker.VisorPartitionReconciliationTaskArg;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;

/**
 * Compute task that runs a partition reconciliation job on every node of the task subgrid and combines
 * the per-node results into a single {@link ReconciliationResult}.
 */
@GridInternal
public class PartitionReconciliationProcessorTask
extends ComputeTaskAdapter<VisorPartitionReconciliationTaskArg, ReconciliationResult> {
    private static final long serialVersionUID = 0L;

    /** Ignite instance resource. */
    @IgniteInstanceResource
    private IgniteEx ignite;

    /** Logger resource. */
    @LoggerResource
    private IgniteLogger log;

    /**
     * Value of {@code arg.locOutput()} captured in {@link #map}; it selects the result container type used in
     * {@link #reduce}.
     * NOTE(review): the name contains a typo ("Outout"). It is left unchanged in case the serialized form of the
     * task matters - confirm before renaming.
     */
    private boolean localOutoutMode;

    /**
     * Registers a reconciliation session and creates one {@link PartitionReconciliationJob} per subgrid node.
     * <p>
     * A requested parallelism of {@code 0}, or one greater than the number of processors available to the local
     * JVM, is replaced with that processor count. When {@code arg.fastCheck()} is set, the partitions to repair are
     * replaced with the invalid partitions reported by the last finished exchange future (an empty map if there is
     * no finished exchange yet).
     *
     * @param subgrid Nodes the jobs are assigned to.
     * @param arg Reconciliation task argument.
     * @return Created jobs mapped to the nodes they are assigned to.
     * @throws IgniteException Declared by the task contract; presumably propagated if the session registration
     *      broadcast fails.
     */
    @Override
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, VisorPartitionReconciliationTaskArg arg) throws IgniteException {
        this.localOutoutMode = arg.locOutput();

        LocalDateTime startTime = LocalDateTime.now();

        // The session identifier is the current wall-clock time in seconds.
        long sesId = System.currentTimeMillis() / 1000L;

        int availableProcessors = Runtime.getRuntime().availableProcessors();
        int requestedParallelism = arg.parallelism();

        if (requestedParallelism == 0 || requestedParallelism > availableProcessors) {
            U.warn(this.log, "Partition reconciliation [session=" + sesId + "] will be executed with [parallelism=" + availableProcessors + "] according to number of CPU cores of the local node");

            arg = new VisorPartitionReconciliationTaskArg.Builder(arg).parallelism(availableProcessors).build();
        }

        // Register the session (and wait for the broadcast to finish) before any job is mapped.
        this.ignite.compute().broadcastAsync(new ReconciliationSessionId(sesId, arg.parallelism())).get();

        if (arg.fastCheck()) {
            // The node lookups stay inside the assert so that they are evaluated only when assertions are enabled.
            assert (this.ignite.context().discovery().discoCache().oldestAliveServerNode().id().equals(this.ignite.context().localNodeId())) : "PartitionReconciliationProcessorTask must be executed on the coordinator node [locNodeId=" + this.ignite.context().localNodeId() + ", crd=" + this.ignite.context().discovery().discoCache().oldestAliveServerNode().id() + ']';

            Map<Integer, Set<Integer>> invalidParts = Collections.emptyMap();
            GridDhtPartitionsExchangeFuture lastFut = this.ignite.context().cache().context().exchange().lastFinishedFuture();

            if (lastFut != null) {
                invalidParts = lastFut.invalidPartitions();
            } else if (this.log.isInfoEnabled()) {
                this.log.info("PartitionReconciliationProcessorTask has nothing to check, the initial exchnage has not completed yet.");
            }

            arg = new VisorPartitionReconciliationTaskArg.Builder(arg).partitionsToRepair(invalidParts).build();
        }

        Map<PartitionReconciliationJob, ClusterNode> jobs = new HashMap<>();

        for (ClusterNode node : subgrid) {
            jobs.put(new PartitionReconciliationJob(arg, startTime, sesId), node);
        }

        return jobs;
    }

    /**
     * Merges the job results: affected entries are merged into one container, the first component of each job
     * result is recorded per node id, and error messages (from job exceptions and from execution results) are
     * collected as {@code "<nodeId> - <message>"} strings.
     *
     * @param results Results of all jobs.
     * @return Combined reconciliation result.
     * @throws IgniteException Declared by the task contract.
     */
    @Override
    public ReconciliationResult reduce(List<ComputeJobResult> results) throws IgniteException {
        Map<UUID, String> nodeIdToFolder = new HashMap<>();
        List<String> errors = new ArrayList<>();
        ReconciliationAffectedEntries res = this.localOutoutMode ? new ReconciliationAffectedEntries() : new ReconciliationAffectedEntriesExtended();

        for (ComputeJobResult result : results) {
            UUID nodeId = result.getNode().id();
            IgniteException exc = result.getException();

            if (exc != null) {
                // A failed job contributes only its error message.
                errors.add(nodeId + " - " + exc.getMessage());

                continue;
            }

            T2<String, ExecutionResult<ReconciliationAffectedEntries>> data = result.getData();
            ExecutionResult<ReconciliationAffectedEntries> execRes = data.get2();

            nodeIdToFolder.put(nodeId, data.get1());
            res.merge(execRes.result());

            if (execRes.errorMessage() != null) {
                errors.add(nodeId + " - " + execRes.errorMessage());
            }
        }

        return new ReconciliationResult(res, nodeIdToFolder, errors);
    }

    /**
     * Logs a warning when a job finished with an exception and always asks to wait for the remaining results;
     * failed jobs are reported through the error list built in {@link #reduce}.
     *
     * @param res Received job result.
     * @param rcvd Previously received results (not used).
     * @return Always {@link ComputeJobResultPolicy#WAIT}.
     */
    @Override
    public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
        IgniteException e = res.getException();

        if (e != null) {
            this.log.warning("PartitionReconciliationProcessorTask failed on node [consistentId=" + res.getNode().consistentId() + ", e=" + e.getMessage() + "]", e);
        }

        return ComputeJobResultPolicy.WAIT;
    }

    /**
     * Closure that registers a reconciliation session, identified by its id and parallelism, in the
     * reconciliation execution context of the node it runs on.
     */
    @GridInternal
    public static class ReconciliationSessionId
    implements IgniteRunnable {
        private static final long serialVersionUID = 0L;

        /** Session identifier. */
        private final long sesId;

        /** Parallelism level registered together with the session. */
        private final int parallelism;

        /** Ignite instance resource. */
        @IgniteInstanceResource
        private IgniteEx ignite;

        /**
         * @param sesId Session identifier.
         * @param parallelism Parallelism level.
         */
        public ReconciliationSessionId(long sesId, int parallelism) {
            this.sesId = sesId;
            this.parallelism = parallelism;
        }

        /** Registers the session in the reconciliation execution context. */
        @Override
        public void run() {
            this.ignite.context().diagnostic().reconciliationExecutionContext().registerSession(this.sesId, this.parallelism);
        }
    }

    /**
     * Job that resolves the caches to check on the node it runs on, executes a
     * {@link PartitionReconciliationProcessor} for them and flushes the collected results to a file.
     */
    private static class PartitionReconciliationJob
    extends ComputeJobAdapter {
        private static final long serialVersionUID = 0L;

        /** Ignite instance resource. */
        @IgniteInstanceResource
        private IgniteEx ignite;

        /** Logger resource. */
        @LoggerResource
        private IgniteLogger log;

        /** Reconciliation task argument. */
        private final VisorPartitionReconciliationTaskArg reconciliationTaskArg;

        /** Task start time, passed to the result collector when flushing results to a file. */
        private final LocalDateTime startTime;

        /** Session identifier. */
        private final long sesId;

        /**
         * @param arg Reconciliation task argument.
         * @param startTime Task start time.
         * @param sesId Session identifier.
         */
        public PartitionReconciliationJob(VisorPartitionReconciliationTaskArg arg, LocalDateTime startTime, long sesId) {
            this.reconciliationTaskArg = arg;
            this.startTime = startTime;
            this.sesId = sesId;
        }

        /**
         * Runs reconciliation for the requested caches.
         * <p>
         * When no caches are requested, all public caches are checked. Otherwise every requested entry is treated
         * as a regular expression that must fully match at least one public cache name; if an expression matches
         * nothing, an empty result carrying an error message is returned and no reconciliation is performed.
         *
         * @return Pair of the absolute path of the results file ({@code null} if no file was produced or no
         *      reconciliation was performed) and the execution result.
         * @throws IgniteException If reconciliation or flushing the results fails; the original exception is
         *      attached as the cause.
         */
        @Override
        public T2<String, ExecutionResult<ReconciliationAffectedEntries>> execute() throws IgniteException {
            Set<String> caches = new HashSet<>();

            if (this.reconciliationTaskArg.caches() == null || this.reconciliationTaskArg.caches().isEmpty()) {
                caches.addAll(this.ignite.context().cache().publicCacheNames());
            } else {
                for (String cacheRegexp : this.reconciliationTaskArg.caches()) {
                    // Compile once per expression instead of once per cache name.
                    Pattern cachePattern = Pattern.compile(cacheRegexp);

                    List<String> acceptedCaches = new ArrayList<>();

                    for (String cacheName : this.ignite.context().cache().publicCacheNames()) {
                        if (cachePattern.matcher(cacheName).matches()) {
                            acceptedCaches.add(cacheName);
                        }
                    }

                    if (acceptedCaches.isEmpty()) {
                        return new T2<String, ExecutionResult<ReconciliationAffectedEntries>>(
                            null,
                            new ExecutionResult<ReconciliationAffectedEntries>(
                                new ReconciliationAffectedEntriesExtended(),
                                "The cache '" + cacheRegexp + "' doesn't exist."));
                    }

                    caches.addAll(acceptedCaches);
                }
            }

            try {
                PartitionReconciliationProcessor proc = new PartitionReconciliationProcessor(
                    this.sesId,
                    this.ignite,
                    caches,
                    this.reconciliationTaskArg.partitionsToRepair(),
                    this.reconciliationTaskArg.repair(),
                    this.reconciliationTaskArg.repairAlg(),
                    this.reconciliationTaskArg.parallelism(),
                    this.reconciliationTaskArg.batchSize(),
                    this.reconciliationTaskArg.recheckAttempts(),
                    this.reconciliationTaskArg.recheckDelay(),
                    !this.reconciliationTaskArg.locOutput(),
                    this.reconciliationTaskArg.includeSensitive());

                ExecutionResult<ReconciliationAffectedEntries> reconciliationRes = proc.execute();

                File path = proc.collector().flushResultsToFile(this.startTime);

                return new T2<String, ExecutionResult<ReconciliationAffectedEntries>>(path != null ? path.getAbsolutePath() : null, reconciliationRes);
            }
            catch (Exception e) {
                String msg = "Reconciliation job failed on node [id=" + this.ignite.localNode().id() + "]. ";

                this.log.error(msg, e);

                throw new IgniteException(msg + String.format("Reason [msg=%s, exception=%s]", e.getMessage(), e.getClass()), e);
            }
        }
    }
}

