/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.checker.processor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.cache.expiry.EternalExpiryPolicy;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.checker.objects.ExecutionResult;
import org.apache.ignite.internal.processors.cache.checker.objects.PartitionBatchRequest;
import org.apache.ignite.internal.processors.cache.checker.objects.RecheckRequest;
import org.apache.ignite.internal.processors.cache.checker.objects.ReconciliationAffectedEntries;
import org.apache.ignite.internal.processors.cache.checker.objects.RepairRequest;
import org.apache.ignite.internal.processors.cache.checker.objects.VersionedKey;
import org.apache.ignite.internal.processors.cache.checker.objects.VersionedValue;
import org.apache.ignite.internal.processors.cache.checker.processor.AbstractPipelineProcessor;
import org.apache.ignite.internal.processors.cache.checker.processor.PipelineWorkload;
import org.apache.ignite.internal.processors.cache.checker.processor.ReconciliationEventListener;
import org.apache.ignite.internal.processors.cache.checker.processor.ReconciliationResultCollector;
import org.apache.ignite.internal.processors.cache.checker.processor.workload.Batch;
import org.apache.ignite.internal.processors.cache.checker.processor.workload.Recheck;
import org.apache.ignite.internal.processors.cache.checker.processor.workload.Repair;
import org.apache.ignite.internal.processors.cache.checker.tasks.CollectPartitionKeysByBatchTask;
import org.apache.ignite.internal.processors.cache.checker.tasks.CollectPartitionKeysByRecheckRequestTask;
import org.apache.ignite.internal.processors.cache.checker.tasks.RepairRequestTask;
import org.apache.ignite.internal.processors.cache.checker.util.ConsistencyCheckUtils;
import org.apache.ignite.internal.processors.cache.verify.RepairAlgorithm;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.U;

public class PartitionReconciliationProcessor
extends AbstractPipelineProcessor {
    /** Message of the exception raised by {@link #execute()} when the reconciliation session has expired. */
    public static final String SESSION_CHANGE_MSG = "Reconciliation session has changed.";
    /** Message of the exception raised by {@link #execute()} when a topology change is detected. */
    public static final String TOPOLOGY_CHANGE_MSG = "Topology has changed. Partition reconciliation task was stopped.";
    /** Format of the periodic progress log record: session id, total chains, remaining chains. */
    public static final String WORK_PROGRESS_MSG = "Partition reconciliation task [sesId=%s, total=%s, remaining=%s]";
    /** Format of the log record written when {@link #execute()} starts. */
    public static final String START_EXECUTION_MSG = "Partition reconciliation has started [repair=%s, repairAlg=%s, fastCheck=%s, batchSize=%s, recheckAttempts=%s, parallelismLevel=%s, caches=%s]";
    /** Format of the error details (exception message and class) appended to a failed execution result. */
    public static final String ERROR_REASON = "Reason [msg=%s, exception=%s]";
    /**
     * Minimal interval, in milliseconds, between two progress log records. Read from the
     * {@code WORK_PROGRESS_PRINT_INTERVAL} system property, 180000 by default.
     */
    private final long workProgressPrintInterval = IgniteSystemProperties.getLong("WORK_PROGRESS_PRINT_INTERVAL", 180000L);
    /** Delay, in seconds, applied when a recheck workload is scheduled after a batch or a previous recheck. */
    private final int recheckDelay;
    /** Names of the caches to process. */
    private final Collection<String> caches;
    /** Whether conflicts that survive all recheck attempts are sent to repair instead of only being reported. */
    private final boolean repair;
    /**
     * Partitions to validate, keyed by cache group id. When {@code null}, all local primary partitions
     * of every cache are processed.
     */
    private final Map<Integer, Set<Integer>> partsToValidate;
    /** Batch size forwarded to every {@link PartitionBatchRequest}. */
    private final int batchSize;
    /** Number of recheck attempts performed before a conflict is repaired or reported. */
    private final int recheckAttempts;
    /** Repair algorithm forwarded to every {@link RepairRequest}. */
    private final RepairAlgorithm repairAlg;
    /** Tracks workload chains (one per scheduled partition) for progress reporting and completion callbacks. */
    private final WorkloadTracker workloadTracker = new WorkloadTracker();
    /** Collects conflicted and repaired entries that make up the execution result. */
    final ReconciliationResultCollector collector;

    /**
     * Creates a processor for one reconciliation session.
     *
     * @param sesId Session identifier.
     * @param ignite Local Ignite instance.
     * @param caches Names of the caches to process.
     * @param partsToValidate Partitions to validate keyed by cache group id, or {@code null} to process
     *      all local primary partitions.
     * @param repair Whether conflicts that survive all rechecks should be repaired.
     * @param repairAlg Algorithm forwarded to repair requests.
     * @param parallelismLevel Parallelism level forwarded to the parent pipeline processor.
     * @param batchSize Batch size forwarded to batch requests.
     * @param recheckAttempts Number of recheck attempts before repairing or reporting a conflict.
     * @param recheckDelay Delay, in seconds, before a scheduled recheck.
     * @param compact Selects the {@code Compact} result collector when {@code true}, {@code Simple} otherwise.
     * @param includeSensitive Forwarded to the result collector.
     * @throws IgniteCheckedException Declared by the constructor chain.
     */
    public PartitionReconciliationProcessor(long sesId, IgniteEx ignite, Collection<String> caches, Map<Integer, Set<Integer>> partsToValidate, boolean repair, RepairAlgorithm repairAlg, int parallelismLevel, int batchSize, int recheckAttempts, int recheckDelay, boolean compact, boolean includeSensitive) throws IgniteCheckedException {
        super(sesId, ignite, parallelismLevel);

        // Scope of the session.
        this.caches = caches;
        this.partsToValidate = partsToValidate;

        // Tuning of the check/recheck/repair pipeline.
        this.batchSize = batchSize;
        this.recheckAttempts = recheckAttempts;
        this.recheckDelay = recheckDelay;
        this.repair = repair;
        this.repairAlg = repairAlg;

        // The tracker must observe events before the listener already installed in the parent.
        registerListener(workloadTracker.andThen(evtLsnr));

        if (compact) {
            collector = new ReconciliationResultCollector.Compact(ignite, log, sesId, includeSensitive);
        }
        else {
            collector = new ReconciliationResultCollector.Simple(ignite, log, includeSensitive);
        }
    }

    /**
     * Runs the reconciliation session: schedules one batch workload per partition of every eligible
     * cache and then processes queued workloads until the queue is empty and no handler is alive.
     * <p>
     * The method does not propagate failures. A topology change, an expired session, an interruption
     * or any other exception is converted into an {@link ExecutionResult} that carries the entries
     * collected so far together with an error message.
     *
     * @return Collected entries, with an error message attached when the run did not finish normally.
     */
    public ExecutionResult<ReconciliationAffectedEntries> execute() {
        if (this.log.isInfoEnabled()) {
            this.log.info(String.format(START_EXECUTION_MSG, this.repair, this.repairAlg, this.partsToValidate != null, this.batchSize, this.recheckAttempts, this.parallelismLevel, this.caches));
        }
        try {
            this.scheduleInitialBatches();

            boolean live = false;
            long lastUpdateTime = 0L;
            // 'live' is refreshed only when the queue is empty: that is the only case where it is consulted below.
            while (!this.isEmpty() || (live = this.hasLiveHandlers())) {
                if (this.topologyChanged()) {
                    throw new IgniteException(TOPOLOGY_CHANGE_MSG);
                }
                if (this.isSessionExpired()) {
                    throw new IgniteException(SESSION_CHANGE_MSG);
                }
                if (this.isInterrupted()) {
                    throw new IgniteException((String)this.error.get());
                }
                // Nothing queued yet, but in-flight handlers may still schedule follow-up workloads.
                if (this.isEmpty() && live) {
                    U.sleep(100L);
                    continue;
                }
                long currTimeMillis = System.currentTimeMillis();
                if (currTimeMillis >= lastUpdateTime + this.workProgressPrintInterval) {
                    if (this.log.isInfoEnabled()) {
                        this.log.info(String.format(WORK_PROGRESS_MSG, this.sesId, this.workloadTracker.totalChains(), this.workloadTracker.remaningChains()));
                    }
                    lastUpdateTime = currTimeMillis;
                }
                this.dispatchWorkload(this.takeTask());
            }
            return new ExecutionResult<>(this.collector.result());
        }
        catch (InterruptedException | IgniteException e) {
            String errMsg = "Partition reconciliation was interrupted.";
            // Let in-flight work finish before the partial result is read.
            // NOTE(review): the thread interrupt flag is intentionally left as in the original code.
            this.waitWorkFinish();
            this.log.warning(errMsg, e);
            return this.errorResult(errMsg, e);
        }
        catch (Exception e) {
            String errMsg = "Unexpected error.";
            this.log.error(errMsg, e);
            return this.errorResult(errMsg, e);
        }
    }

    /**
     * Schedules the first batch workload of every partition to check, skipping caches that have a
     * non-eternal expiry policy configured.
     */
    private void scheduleInitialBatches() {
        for (String cache : this.caches) {
            ExpiryPolicy expPlc = this.ignite.cachex(cache).context().expiry();
            if (expPlc != null && !(expPlc instanceof EternalExpiryPolicy)) {
                this.log.warning("The cache '" + cache + "' was skipped because CacheConfiguration#setExpiryPolicyFactory is set.");
                continue;
            }
            for (int partId : this.partitions(cache)) {
                // Each partition starts its own workload chain, identified by a fresh UUID.
                Batch workload = new Batch(this.sesId, UUID.randomUUID(), cache, partId, null);
                this.workloadTracker.addTrackingChain(workload);
                this.schedule(workload);
            }
        }
    }

    /**
     * Routes a workload taken from the queue to the handler matching its type.
     *
     * @param workload Workload to process.
     * @throws InterruptedException If the handler is interrupted.
     * @throws IgniteException If the workload type is not supported.
     */
    private void dispatchWorkload(PipelineWorkload workload) throws InterruptedException {
        if (workload instanceof Batch) {
            this.handle((Batch)workload);
        }
        else if (workload instanceof Recheck) {
            this.handle((Recheck)workload);
        }
        else if (workload instanceof Repair) {
            this.handle((Repair)workload);
        }
        else {
            String err = "Unsupported workload type: " + workload;
            this.log.error(err);
            throw new IgniteException(err);
        }
    }

    /**
     * Builds a result that carries the entries collected so far plus a description of the failure.
     *
     * @param errMsg Human readable prefix of the error message.
     * @param e Failure whose message and class are appended to the prefix.
     * @return Execution result with the error message set.
     */
    private ExecutionResult<ReconciliationAffectedEntries> errorResult(String errMsg, Exception e) {
        return new ExecutionResult<>(this.collector.result(), errMsg + ' ' + String.format(ERROR_REASON, e.getMessage(), e.getClass()));
    }

    /**
     * @return Collector that accumulates the conflicted and repaired entries of this session.
     */
    public ReconciliationResultCollector collector() {
        return collector;
    }

    /**
     * Resolves the partitions of the given cache that this node has to check.
     *
     * @param name Cache name.
     * @return Local primary partitions of the cache. When {@code partsToValidate} is set, only those
     *      listed for the cache group are returned, which yields an empty array if the group is absent.
     */
    private int[] partitions(String name) {
        int[] cacheParts = this.ignite.affinity(name).primaryPartitions(this.ignite.localNode());
        if (this.partsToValidate == null) {
            return cacheParts;
        }
        // Typed empty set instead of the raw Collections.EMPTY_SET constant: no unchecked conversion.
        Set<Integer> parts = this.partsToValidate.getOrDefault(this.ctx.cache().cacheDescriptor(name).groupId(), Collections.emptySet());
        return IntStream.of(cacheParts).filter(p -> parts.contains(p)).toArray();
    }

    /**
     * Processes a batch workload: requests the next batch of keys of the partition and, from the
     * result, schedules the following batch and/or a delayed recheck of the returned keys.
     *
     * @param workload Batch workload to process.
     * @throws InterruptedException If the compute submission is interrupted.
     */
    private void handle(Batch workload) throws InterruptedException {
        PartitionBatchRequest req = new PartitionBatchRequest(workload.sessionId(), workload.workloadChainId(), workload.cacheName(), workload.partitionId(), this.batchSize, workload.lowerKey(), this.startTopVer);

        this.compute(CollectPartitionKeysByBatchTask.class, req, res -> {
            KeyCacheObject nextKey = (KeyCacheObject)res.get1();
            Map keysToRecheck = (Map)res.get2();

            // Keys to recheck are only expected together with a key to continue from.
            assert (nextKey != null || keysToRecheck.isEmpty());

            boolean hasNextBatch = nextKey != null;
            if (hasNextBatch) {
                Batch next = new Batch(workload.sessionId(), workload.workloadChainId(), workload.cacheName(), workload.partitionId(), nextKey);
                this.schedule(next);
            }

            boolean hasKeysToRecheck = !keysToRecheck.isEmpty();
            if (hasKeysToRecheck) {
                // First recheck of these keys: both the recheck and the repair attempt counters start at zero.
                Recheck recheck = new Recheck(workload.sessionId(), workload.workloadChainId(), keysToRecheck, workload.cacheName(), workload.partitionId(), 0, 0);
                this.schedule(recheck, this.recheckDelay, TimeUnit.SECONDS);
            }
        });
    }

    /**
     * Processes a recheck workload: re-reads the keys and compares them with the recorded versions.
     * Keys still in conflict are rechecked again after a delay while attempts remain. After that they
     * are scheduled for high-priority repair when repair is enabled, or reported to the collector.
     *
     * @param workload Recheck workload to process.
     * @throws InterruptedException If the compute submission is interrupted.
     */
    private void handle(Recheck workload) throws InterruptedException {
        RecheckRequest req = new RecheckRequest(workload.sessionId(), workload.workloadChainId(), new ArrayList<KeyCacheObject>(workload.recheckKeys().keySet()), workload.cacheName(), workload.partitionId(), this.startTopVer);

        this.compute(CollectPartitionKeysByRecheckRequestTask.class, req, actualKeys -> {
            Map<KeyCacheObject, Map<UUID, GridCacheVersion>> conflicts = ConsistencyCheckUtils.checkConflicts(workload.recheckKeys(), actualKeys, this.ignite.cachex(workload.cacheName()).context(), this.startTopVer);

            // All keys became consistent: nothing more to do for this workload.
            if (conflicts.isEmpty()) {
                return;
            }

            // Attempts remain: look at the conflicting keys again later.
            if (workload.recheckAttempt() < this.recheckAttempts) {
                Recheck next = new Recheck(workload.sessionId(), workload.workloadChainId(), conflicts, workload.cacheName(), workload.partitionId(), workload.recheckAttempt() + 1, workload.repairAttempt());
                this.schedule(next, this.recheckDelay, TimeUnit.SECONDS);
                return;
            }

            // Attempts exhausted and repair is off: just report the conflicts.
            if (!this.repair) {
                this.collector.appendConflictedEntries(workload.cacheName(), workload.partitionId(), conflicts, (Map<KeyCacheObject, Map<UUID, VersionedValue>>)actualKeys);
                return;
            }

            Repair repairWorkload = this.repair(workload.sessionId(), workload.workloadChainId(), workload.cacheName(), workload.partitionId(), conflicts, (Map<KeyCacheObject, Map<UUID, VersionedValue>>)actualKeys, workload.repairAttempt());
            this.scheduleHighPriority(repairWorkload);
        });
    }

    /**
     * Processes a repair workload: submits the repair request, reports repaired keys to the collector
     * and, while fewer than three repair attempts were made, schedules a recheck of the keys that
     * still need repair.
     *
     * @param workload Repair workload to process.
     * @throws InterruptedException If the compute submission is interrupted.
     */
    private void handle(Repair workload) throws InterruptedException {
        this.compute(RepairRequestTask.class, new RepairRequest(workload.sessionId(), workload.workloadChainId(), workload.data(), workload.cacheName(), workload.partitionId(), this.startTopVer, this.repairAlg, workload.repairAttempt()), repairRes -> {
            if (!repairRes.repairedKeys().isEmpty()) {
                this.collector.appendRepairedEntries(workload.cacheName(), workload.partitionId(), repairRes.repairedKeys());
            }
            if (!repairRes.keysToRepair().isEmpty()) {
                Map<KeyCacheObject, Map<UUID, GridCacheVersion>> recheckKeys = new HashMap<>();
                for (Map.Entry<VersionedKey, Map<UUID, VersionedValue>> dataEntry : repairRes.keysToRepair().entrySet()) {
                    KeyCacheObject keyCacheObj;
                    try {
                        keyCacheObj = ConsistencyCheckUtils.unmarshalKey(dataEntry.getKey().key(), this.ignite.cachex(workload.cacheName()).context());
                    }
                    catch (IgniteCheckedException e) {
                        // Best effort: the key is skipped, but the cause is logged so the failure can be diagnosed.
                        U.error(this.log, "Unable to unmarshal key=[" + dataEntry.getKey().key() + "], key is skipped.", e);
                        continue;
                    }
                    // The recheck only needs the per-node versions, not the values.
                    recheckKeys.put(keyCacheObj, dataEntry.getValue().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e2 -> e2.getValue().version())));
                }
                // NOTE(review): 3 looks like an inlined max-repair-attempts constant - confirm against RepairRequestTask.
                if (workload.repairAttempt() < 3) {
                    // Recheck attempts start at the configured limit, so conflicts found by that recheck go straight to repair or report.
                    this.schedule(new Recheck(workload.sessionId(), workload.workloadChainId(), recheckKeys, workload.cacheName(), workload.partitionId(), this.recheckAttempts, workload.repairAttempt() + 1));
                }
            }
        });
    }

    /**
     * Builds a repair workload for the keys whose conflicts were not resolved by rechecks.
     *
     * @param sesId Session identifier.
     * @param workloadChainId Identifier of the workload chain the repair belongs to.
     * @param cacheName Cache name.
     * @param partId Partition identifier.
     * @param notResolvingConflicts Conflicting keys; only the key set is used.
     * @param actualKeys Per-node values of the keys; conflicting keys without a mapping here are left out.
     * @param repairAttempts Repair attempt counter stored in the created workload.
     * @return Repair workload holding the per-node values of the conflicting keys.
     */
    private Repair repair(long sesId, UUID workloadChainId, String cacheName, int partId, Map<KeyCacheObject, Map<UUID, GridCacheVersion>> notResolvingConflicts, Map<KeyCacheObject, Map<UUID, VersionedValue>> actualKeys, int repairAttempts) {
        Map<KeyCacheObject, Map<UUID, VersionedValue>> dataToRepair = new HashMap<>();

        notResolvingConflicts.keySet().forEach(conflictKey -> {
            Map<UUID, VersionedValue> valsByNode = actualKeys.get(conflictKey);

            if (valsByNode != null) {
                dataToRepair.put(conflictKey, valsByNode);
            }
        });

        return new Repair(sesId, workloadChainId, cacheName, partId, dataToRepair, repairAttempts);
    }

    /**
     * Tracks workload chains. A chain is registered per partition through {@link #addTrackingChain(Batch)};
     * every workload of the chain increments its counter when scheduled and decrements it when finished.
     * When the counter drops to zero the chain is considered completed and the collector is notified
     * that the partition has been processed.
     */
    private class WorkloadTracker implements ReconciliationEventListener {
        /** Descriptors of chains that are still in progress, keyed by chain id. */
        private final Map<UUID, ChainDescriptor> chanIds = new ConcurrentHashMap<>();
        /** Number of chains registered so far. */
        private final AtomicInteger trackedChainsCnt = new AtomicInteger();
        /** Number of chains that have completed. */
        private final AtomicInteger completedChainsCnt = new AtomicInteger();

        private WorkloadTracker() {
        }

        /**
         * Updates the workload counter of the chain the workload belongs to. Only the
         * {@code SCHEDULED} and {@code FINISHED} stages are relevant; other stages are ignored.
         *
         * @param stage Stage the workload has reached.
         * @param workload Workload the event is about.
         */
        @Override
        public void onEvent(ReconciliationEventListener.WorkLoadStage stage, PipelineWorkload workload) {
            switch (stage) {
                case SCHEDULED: {
                    this.attachWorkload(workload);
                    break;
                }
                case FINISHED: {
                    this.detachWorkload(workload);
                    break;
                }
                default: {
                    // Other stages do not affect chain accounting.
                    break;
                }
            }
        }

        /**
         * @return Number of chains registered so far.
         */
        public Integer totalChains() {
            return this.trackedChainsCnt.get();
        }

        /**
         * @return Number of registered chains that have not completed yet.
         */
        public Integer remaningChains() {
            return this.trackedChainsCnt.get() - this.completedChainsCnt.get();
        }

        /**
         * Registers a new chain started by the given batch. A chain id that is already tracked is
         * not registered or counted again.
         *
         * @param batch First workload of the chain.
         */
        public void addTrackingChain(Batch batch) {
            assert (batch.sessionId() == PartitionReconciliationProcessor.this.sesId) : "New tracking chain does not correspond to the current session [currSesId=" + PartitionReconciliationProcessor.this.sesId + ", chainSesId=" + batch.sessionId() + ", chainId=" + batch.workloadChainId() + ']';
            ChainDescriptor prev = this.chanIds.putIfAbsent(batch.workloadChainId(), new ChainDescriptor(batch.workloadChainId(), batch.cacheName(), batch.partitionId()));
            // Count a chain only when it was really registered; otherwise "remaining" could never reach zero.
            if (prev == null) {
                this.trackedChainsCnt.incrementAndGet();
            }
        }

        /**
         * Accounts for a scheduled workload. Workloads of unknown chains are ignored.
         *
         * @param workload Scheduled workload.
         */
        private void attachWorkload(PipelineWorkload workload) {
            ChainDescriptor desc = this.chanIds.get(workload.workloadChainId());
            if (desc != null) {
                desc.workloadCnt.incrementAndGet();
            }
        }

        /**
         * Accounts for a finished workload and completes the chain when it was the last one in flight.
         * Workloads of unknown chains are ignored.
         *
         * @param workload Finished workload.
         */
        private void detachWorkload(PipelineWorkload workload) {
            ChainDescriptor desc = this.chanIds.get(workload.workloadChainId());
            if (desc != null && desc.workloadCnt.decrementAndGet() == 0) {
                this.completedChainsCnt.incrementAndGet();
                this.chanIds.remove(desc.chainId);
                this.onChainCompleted(desc.chainId, desc.cacheName, desc.partId);
            }
        }

        /**
         * Notifies the collector that a partition has been fully processed.
         *
         * @param chainId Identifier of the completed chain (currently unused).
         * @param cacheName Cache name.
         * @param partId Partition identifier.
         */
        private void onChainCompleted(UUID chainId, String cacheName, int partId) {
            PartitionReconciliationProcessor.this.collector.onPartitionProcessed(cacheName, partId);
        }

        /** Immutable description of a chain plus the number of its workloads currently in flight. */
        private class ChainDescriptor {
            /** Chain identifier. */
            private final UUID chainId;
            /** Name of the cache the chain works on. */
            private final String cacheName;
            /** Partition the chain works on. */
            private final int partId;
            /** Number of scheduled but not yet finished workloads of the chain. */
            private final AtomicInteger workloadCnt = new AtomicInteger();

            /**
             * @param chainId Chain identifier.
             * @param cacheName Cache name.
             * @param partId Partition identifier.
             */
            ChainDescriptor(UUID chainId, String cacheName, int partId) {
                this.chainId = chainId;
                this.cacheName = cacheName;
                this.partId = partId;
            }
        }
    }
}

