/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.checker.processor;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.checker.objects.CachePartitionRequest;
import org.apache.ignite.internal.processors.cache.checker.objects.ExecutionResult;
import org.apache.ignite.internal.processors.cache.checker.processor.PipelineWorkload;
import org.apache.ignite.internal.processors.cache.checker.processor.ReconciliationEventListener;
import org.apache.ignite.internal.processors.cache.checker.processor.ReconciliationEventListenerProvider;
import org.apache.ignite.internal.processors.cache.checker.util.DelayedHolder;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;

public class AbstractPipelineProcessor {
    protected final long sesId;
    private final BlockingQueue<DelayedHolder<? extends PipelineWorkload>> queue = new DelayQueue<DelayedHolder<? extends PipelineWorkload>>();
    private final BlockingQueue<DelayedHolder<? extends PipelineWorkload>> highPriorityQueue = new LinkedBlockingQueue<DelayedHolder<? extends PipelineWorkload>>();
    private final Semaphore liveListeners;
    protected final int parallelismLevel;
    protected final AffinityTopologyVersion startTopVer;
    protected final GridKernalContext ctx;
    protected volatile ReconciliationEventListener evtLsnr = ReconciliationEventListenerProvider.defaultListenerInstance();
    protected final AtomicReference<String> error = new AtomicReference();
    protected final IgniteEx ignite;
    private final GridCachePartitionExchangeManager<Object, Object> exchMgr;
    protected final IgniteLogger log;

    public AbstractPipelineProcessor(long sesId, IgniteEx ignite, int parallelismLevel) throws IgniteCheckedException {
        this.sesId = sesId;
        this.ctx = ignite.context();
        this.exchMgr = ignite.context().cache().context().exchange();
        this.startTopVer = this.exchMgr.lastAffinityChangedTopologyVersion((AffinityTopologyVersion)this.exchMgr.lastTopologyFuture().get());
        this.parallelismLevel = parallelismLevel;
        this.liveListeners = new Semaphore(parallelismLevel);
        this.ignite = ignite;
        this.log = ignite.log().getLogger(this.getClass());
    }

    public void registerListener(ReconciliationEventListener evtLsnr) {
        this.evtLsnr = evtLsnr;
    }

    public long sessionId() {
        return this.sesId;
    }

    protected boolean topologyChanged() throws IgniteCheckedException {
        AffinityTopologyVersion currVer = this.exchMgr.lastAffinityChangedTopologyVersion((AffinityTopologyVersion)this.exchMgr.lastTopologyFuture().get());
        return !this.startTopVer.equals(currVer);
    }

    protected boolean isSessionExpired() {
        return this.ignite.context().diagnostic().reconciliationExecutionContext().sessionId() != this.sesId;
    }

    protected boolean isInterrupted() {
        return this.error.get() != null;
    }

    protected boolean hasLiveHandlers() {
        return this.parallelismLevel != this.liveListeners.availablePermits();
    }

    protected void waitWorkFinish() {
        while (this.hasLiveHandlers()) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    protected boolean isEmpty() {
        return this.highPriorityQueue.isEmpty() && this.queue.isEmpty();
    }

    protected PipelineWorkload takeTask() throws InterruptedException {
        return !this.highPriorityQueue.isEmpty() ? this.highPriorityQueue.take().getTask() : this.queue.take().getTask();
    }

    protected <T extends CachePartitionRequest, R> void compute(Class<? extends ComputeTask<T, ExecutionResult<R>>> taskCls, T workload, IgniteInClosure<? super R> lsnr) throws InterruptedException {
        this.liveListeners.acquire();
        ClusterGroup grp = this.partOwners(workload.cacheName(), workload.partitionId());
        this.evtLsnr.onEvent(ReconciliationEventListener.WorkLoadStage.BEFORE_PROCESSING, workload);
        this.ignite.compute(grp).executeAsync(taskCls, workload).listen(fut -> {
            try {
                ExecutionResult res;
                try {
                    res = (ExecutionResult)fut.get();
                }
                catch (RuntimeException e) {
                    this.log.error("Failed to execute the task " + taskCls.getName(), e);
                    this.error.compareAndSet(null, e.getMessage());
                    this.liveListeners.release();
                    return;
                }
                if (res.errorMessage() != null) {
                    this.error.compareAndSet(null, res.errorMessage());
                    return;
                }
                this.evtLsnr.onEvent(ReconciliationEventListener.WorkLoadStage.READY, workload);
                lsnr.apply((Object)res.result());
                this.evtLsnr.onEvent(ReconciliationEventListener.WorkLoadStage.FINISHED, workload);
            }
            finally {
                this.liveListeners.release();
            }
        });
    }

    protected void schedule(PipelineWorkload task) {
        this.schedule(task, 0L, TimeUnit.MILLISECONDS);
    }

    protected void scheduleHighPriority(PipelineWorkload task) {
        this.evtLsnr.onEvent(ReconciliationEventListener.WorkLoadStage.SCHEDULED, task);
        this.highPriorityQueue.offer(new DelayedHolder<PipelineWorkload>(-1L, task));
    }

    protected void schedule(PipelineWorkload task, long duration, TimeUnit timeUnit) {
        long finishTime = U.currentTimeMillis() + timeUnit.toMillis(duration);
        this.evtLsnr.onEvent(ReconciliationEventListener.WorkLoadStage.SCHEDULED, task);
        this.queue.offer(new DelayedHolder<PipelineWorkload>(finishTime, task));
    }

    private ClusterGroup partOwners(String cacheName, int partId) {
        List<ClusterNode> nodes = this.ignite.cachex(cacheName).context().topology().owners(partId, this.startTopVer);
        return this.ignite.cluster().forNodes(nodes);
    }
}

