/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.task;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAfterSend;
import org.apache.ignite.compute.ComputeJobBeforeFailover;
import org.apache.ignite.compute.ComputeJobFailoverException;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.compute.ComputeLoadBalancer;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskContinuousMapper;
import org.apache.ignite.compute.ComputeTaskNoResultCache;
import org.apache.ignite.compute.ComputeTaskSpis;
import org.apache.ignite.compute.ComputeUserUndeclaredException;
import org.apache.ignite.events.JobEvent;
import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.internal.ComputeTaskInternalFuture;
import org.apache.ignite.internal.GridInternalException;
import org.apache.ignite.internal.GridJobCancelRequest;
import org.apache.ignite.internal.GridJobExecuteRequest;
import org.apache.ignite.internal.GridJobExecuteResponse;
import org.apache.ignite.internal.GridJobResultImpl;
import org.apache.ignite.internal.GridJobSiblingImpl;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTaskSessionImpl;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.closure.AffinityTask;
import org.apache.ignite.internal.processors.service.GridServiceNotFoundException;
import org.apache.ignite.internal.processors.task.GridTaskEventListener;
import org.apache.ignite.internal.processors.task.GridTaskThreadContextKey;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.TaskContinuousMapperResource;
import org.jetbrains.annotations.Nullable;

public class GridTaskWorker<T, R>
extends GridWorker
implements GridTimeoutObject {
    /** Step, in number of mapped jobs, used by the "too many jobs" warning check in processMappedJobs. */
    private static final int SPLIT_WARN_THRESHOLD = 1000;

    /** Retry delay in milliseconds. NOTE(review): no use of this constant is visible in this part of the file. */
    private static final long RETRY_DELAY_MS = 10L;

    /** Internal-task flag; assigned in body() from the session (or from the deployment if instantiation fails). */
    private boolean internal;

    /** Shared holder handed to U.logger(...) when the per-instance logger is obtained. */
    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();

    /** Kernal context. */
    private final GridKernalContext ctx;

    /** Logger obtained via U.logger(ctx, logRef, this). */
    private final IgniteLogger log;

    /** Marshaller taken from the node configuration. */
    private final Marshaller marsh;

    /** Task session. */
    private final GridTaskSessionImpl ses;

    /** Future that is completed with the task outcome. */
    private final ComputeTaskInternalFuture<R> fut;

    /** Task argument passed to ComputeTask.map(..); may be null. */
    private final T arg;

    /** Listener notified about task/job lifecycle events (started, job send, failover). */
    private final GridTaskEventListener evtLsnr;

    /** Job results keyed by job ID; created lazily and modified while holding {@link #mux}. */
    private Map<IgniteUuid, GridJobResultImpl> jobRes;

    /** Current worker state; read and compared while holding {@link #mux}. */
    private State state = State.WAITING;

    /** Deployment used for class loading, annotation lookup and task instantiation. */
    private final GridDeployment dep;

    /** Task class; used to instantiate the task when no instance was supplied. */
    private final Class<?> taskCls;

    /** Thread context values captured for this task; may be null. */
    private final Map<GridTaskThreadContextKey, Object> thCtx;

    /** Task instance; created in body() when it was not passed to the constructor. */
    private ComputeTask<T, R> task;

    /** Queue of job responses whose processing was postponed. */
    private final Queue<GridJobExecuteResponse> delayedRess = new ConcurrentLinkedDeque<>();

    /** Whether the task has a {@code @TaskContinuousMapperResource} injection point (computed in body()). */
    private boolean continuous;

    /** Monitor guarding {@link #state}, {@link #jobRes} and {@link #lockRespProc}. */
    private final Object mux = new Object();

    /** Starts as {@code true}; reset to {@code false} under {@link #mux} once body() has finished mapping. */
    private boolean lockRespProc = true;

    /** Whether job results/siblings are cached (false with {@code @ComputeTaskNoResultCache} or the context flag). */
    private final boolean resCache;

    /** When set, a FAILOVER policy returned by the task is not honoured (see result(..)). */
    private final boolean noFailover;

    /** Affinity partition, or -1 when the task is not an AffinityTask. */
    private final int affPartId;

    /** First affinity cache name, or null when the task is not an AffinityTask. */
    private final String affCacheName;

    /** IDs of all affinity caches, or null when the task is not an AffinityTask. */
    private final int[] affCacheIds;

    /** Topology version supplied by the AffinityTask, or null. */
    private AffinityTopologyVersion mapTopVer;

    /** Retry attempt counter. NOTE(review): updates are not visible in this part of the file. */
    private int retryAttemptCnt;

    /** Subject ID supplied to the constructor. */
    private final UUID subjId;
    /**
     * Continuous mapper handed to resource injection in body(). Every overload validates its
     * arguments and ends up in processMappedJobs(..); checked failures are converted with
     * U.convertException(..).
     */
    private final ComputeTaskContinuousMapper mapper = new ComputeTaskContinuousMapper() {
        /** Sends a single job to an explicitly chosen node. */
        @Override
        public void send(ComputeJob job, ClusterNode node) {
            try {
                A.notNull(job, "job");
                A.notNull(node, "node");

                processMappedJobs(Collections.singletonMap(job, node));
            }
            catch (IgniteCheckedException err) {
                throw U.convertException(err);
            }
        }

        /** Sends an already mapped set of jobs. */
        @Override
        public void send(Map<? extends ComputeJob, ClusterNode> mappedJobs) {
            try {
                A.notNull(mappedJobs, "mappedJobs");

                processMappedJobs(mappedJobs);
            }
            catch (IgniteCheckedException err) {
                throw U.convertException(err);
            }
        }

        /** Sends a single job, letting the load balancer pick the node. */
        @Override
        public void send(ComputeJob job) {
            A.notNull(job, "job");

            send(Collections.singleton(job));
        }

        /** Sends each job to the node chosen by the load balancer over the current task topology. */
        @Override
        public void send(Collection<? extends ComputeJob> jobs) {
            try {
                A.notNull(jobs, "jobs");

                if (jobs.isEmpty()) {
                    throw new IgniteException("Empty jobs collection passed to send(...) method.");
                }

                ComputeLoadBalancer lb = ctx.loadBalancing().getLoadBalancer(ses, getTaskTopology());

                for (ComputeJob job : jobs) {
                    if (job == null) {
                        throw new IgniteException("Null job passed to send(...) method.");
                    }

                    ClusterNode target = lb.getBalancedNode(job, null);

                    processMappedJobs(Collections.singletonMap(job, target));
                }
            }
            catch (IgniteCheckedException err) {
                throw U.convertException(err);
            }
        }
    };

    /**
     * Creates the worker and derives its flags (result cache, no-failover) and affinity settings.
     *
     * @param ctx Kernal context.
     * @param arg Task argument (optional).
     * @param ses Task session; must not be null.
     * @param fut Task future; must not be null.
     * @param taskCls Task class (optional).
     * @param task Task instance (optional); when it is an AffinityTask its partition, cache names and
     *      topology version are captured.
     * @param dep Deployment; must not be null.
     * @param evtLsnr Event listener; must not be null.
     * @param thCtx Thread context values (optional).
     * @param subjId Subject ID.
     */
    GridTaskWorker(GridKernalContext ctx, @Nullable T arg, GridTaskSessionImpl ses, ComputeTaskInternalFuture<R> fut, @Nullable Class<?> taskCls, @Nullable ComputeTask<T, R> task, GridDeployment dep, GridTaskEventListener evtLsnr, @Nullable Map<GridTaskThreadContextKey, Object> thCtx, UUID subjId) {
        super(ctx.config().getIgniteInstanceName(), "grid-task-worker", ctx.log(GridTaskWorker.class));

        assert ses != null;
        assert fut != null;
        assert evtLsnr != null;
        assert dep != null;

        this.arg = arg;
        this.ctx = ctx;
        this.fut = fut;
        this.ses = ses;
        this.taskCls = taskCls;
        this.task = task;
        this.dep = dep;
        this.evtLsnr = evtLsnr;
        this.thCtx = thCtx;
        this.subjId = subjId;

        this.log = U.logger(ctx, logRef, this);
        this.marsh = ctx.config().getMarshaller();

        // Result caching is on unless switched off by the annotation or by the thread-context flag.
        boolean annotatedNoCache = dep.annotation(taskCls, ComputeTaskNoResultCache.class) != null;
        Boolean ctxNoCache = getThreadContext(GridTaskThreadContextKey.TC_NO_RESULT_CACHE);

        this.resCache = !annotatedNoCache && (ctxNoCache == null || !ctxNoCache);

        Boolean ctxNoFailover = getThreadContext(GridTaskThreadContextKey.TC_NO_FAILOVER);

        this.noFailover = ctxNoFailover != null ? ctxNoFailover : false;

        if (!(task instanceof AffinityTask)) {
            this.affPartId = -1;
            this.affCacheName = null;
            this.mapTopVer = null;
            this.affCacheIds = null;

            return;
        }

        AffinityTask affTask = (AffinityTask)task;

        assert affTask.affinityCacheNames() != null : affTask;
        assert affTask.partition() >= 0 : affTask;

        this.affPartId = affTask.partition();
        this.affCacheName = F.first(affTask.affinityCacheNames());
        this.mapTopVer = affTask.topologyVersion();
        this.affCacheIds = new int[affTask.affinityCacheNames().size()];

        int idx = 0;

        for (String name : affTask.affinityCacheNames()) {
            this.affCacheIds[idx++] = CU.cacheId(name);
        }
    }

    /**
     * Looks up a value in the captured thread context.
     *
     * @param key Context key.
     * @return Stored value, or {@code null} if there is no thread context or no value for the key.
     */
    @Nullable
    private <V> V getThreadContext(GridTaskThreadContextKey key) {
        if (thCtx == null) {
            return null;
        }

        return (V)thCtx.get(key);
    }

    /**
     * @return ID of the task session.
     */
    IgniteUuid getTaskSessionId() {
        return ses.getId();
    }

    /**
     * @return Task session.
     */
    public GridTaskSessionImpl getSession() {
        return ses;
    }

    /**
     * @return Task future.
     */
    ComputeTaskInternalFuture<R> getTaskFuture() {
        return fut;
    }

    /**
     * @return Deployment this task runs with.
     */
    GridDeployment getDeployment() {
        return dep;
    }

    /**
     * @return Task instance; may be {@code null} until body() has instantiated it.
     */
    public ComputeTask<T, R> getTask() {
        return task;
    }

    /**
     * Replaces the task instance held by this worker.
     *
     * @param task New task instance.
     */
    public void setTask(ComputeTask<T, R> task) {
        this.task = task;
    }

    /**
     * @return Value of the internal-task flag (assigned in body()).
     */
    public boolean isInternal() {
        return internal;
    }

    /**
     * @return Timeout object ID, which is the task session ID.
     */
    @Override
    public IgniteUuid timeoutId() {
        return ses.getId();
    }

    /**
     * Timeout callback. Does nothing unless the worker is still in the WAITING state; otherwise
     * logs a warning, records task event type 23 and finishes the task with a
     * {@link ComputeTaskTimeoutCheckedException}.
     */
    @Override
    public void onTimeout() {
        synchronized (mux) {
            // Only a task that is still waiting for results can time out.
            if (state != State.WAITING) {
                return;
            }
        }

        U.warn(log, "Task has timed out: " + ses);

        recordTaskEvent(23, "Task has timed out.");

        ComputeTaskTimeoutCheckedException timeoutErr = new ComputeTaskTimeoutCheckedException("Task timed out (check logs for error messages): " + ses);

        finishTask(null, timeoutErr);
    }

    /**
     * @return End time reported by the task session.
     */
    @Override
    public long endTime() {
        return ses.getEndTime();
    }

    /**
     * Instantiates the task through the deployment.
     *
     * @param taskCls Task class.
     * @return New task instance, never {@code null}.
     * @throws IgniteCheckedException If the deployment returned no instance.
     */
    private ComputeTask<T, R> newTask(Class<? extends ComputeTask<T, R>> taskCls) throws IgniteCheckedException {
        ComputeTask<T, R> created = dep.newInstance(taskCls);

        if (created != null) {
            return created;
        }

        throw new IgniteCheckedException("Failed to instantiate task (is default constructor available?): " + taskCls);
    }

    /**
     * Copies the SPI names from the task's {@code @ComputeTaskSpis} annotation, when present,
     * into the session.
     */
    private void initializeSpis() {
        ComputeTaskSpis ann = dep.annotation(taskCls, ComputeTaskSpis.class);

        if (ann == null) {
            return;
        }

        ses.setLoadBalancingSpi(ann.loadBalancingSpi());
        ses.setFailoverSpi(ann.failoverSpi());
        ses.setCheckpointSpi(ann.checkpointSpi());
    }

    /**
     * Worker body: instantiates the task if needed, injects resources, calls
     * {@code ComputeTask.map(..)} and sends the mapped jobs.
     * <p>
     * Failures are not propagated to the caller; they complete the task through
     * {@code finishTask(null, err)}. The only exception is an {@link Error}, which is rethrown
     * after the task has been finished.
     */
    @Override
    protected void body() {
        evtLsnr.onTaskStarted(this);

        try {
            if (task == null) {
                assert taskCls != null;
                assert ComputeTask.class.isAssignableFrom(taskCls);

                try {
                    // Safe by the assertion above: taskCls is a ComputeTask implementation.
                    @SuppressWarnings("unchecked")
                    Class<? extends ComputeTask<T, R>> cls = (Class<? extends ComputeTask<T, R>>)taskCls;

                    task = newTask(cls);
                }
                catch (IgniteCheckedException e) {
                    // The start event is still recorded when the task cannot be instantiated.
                    internal = dep.internalTask(null, taskCls);

                    recordTaskEvent(20, "Task started.");

                    throw e;
                }
            }

            internal = ses.isInternal();

            recordTaskEvent(20, "Task started.");

            initializeSpis();

            ses.setClassLoader(dep.classLoader());

            // Affinity tasks (affCacheIds != null) are mapped with an empty node list.
            final List<ClusterNode> shuffledNodes = affCacheIds == null
                ? getTaskTopology()
                : Collections.<ClusterNode>emptyList();

            ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes);

            continuous = ctx.resource().isAnnotationPresent(dep, task, TaskContinuousMapperResource.class);

            if (log.isDebugEnabled()) {
                log.debug("Injected task resources [continuous=" + continuous + ']');
            }

            ctx.resource().inject(dep, task, ses, balancer, mapper);

            // Run user map(..) with the deployment class loader as the thread context loader.
            Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(dep.classLoader(), new Callable<Map<? extends ComputeJob, ClusterNode>>() {
                @Override
                public Map<? extends ComputeJob, ClusterNode> call() {
                    return task.map(shuffledNodes, arg);
                }
            });

            if (log.isDebugEnabled()) {
                log.debug("Mapped task jobs to nodes [jobCnt=" + (mappedJobs != null ? mappedJobs.size() : 0) + ", mappedJobs=" + mappedJobs + ", ses=" + ses + ']');
            }

            if (F.isEmpty(mappedJobs)) {
                synchronized (mux) {
                    // An empty map(..) result is fine only if jobs were already sent via the continuous mapper.
                    if (F.isEmpty(jobRes)) {
                        throw new IgniteCheckedException("Task map operation produced no mapped jobs: " + ses);
                    }
                }
            }
            else {
                processMappedJobs(mappedJobs);
            }

            synchronized (mux) {
                lockRespProc = false;
            }

            processDelayedResponses();
        }
        catch (ClusterGroupEmptyCheckedException e) {
            U.warn(log, "Failed to map task jobs to nodes (topology projection is empty): " + ses);

            finishTask(null, e);
        }
        catch (IgniteCheckedException | IgniteException e) {
            if (!fut.isCancelled()) {
                if (!(e instanceof VisorClusterGroupEmptyException)) {
                    U.error(log, "Failed to map task jobs to nodes: " + ses, e);
                }

                finishTask(null, e);
            }
            else if (log.isDebugEnabled()) {
                log.debug("Failed to map task jobs to nodes due to task cancellation: " + ses);
            }
        }
        catch (Throwable e) {
            String errMsg = "Failed to map task jobs to nodes due to undeclared user exception [cause=" + e.getMessage() + ", ses=" + ses + "]";

            U.error(log, errMsg, e);

            finishTask(null, new ComputeUserUndeclaredException(errMsg, e));

            // Errors are not swallowed; everything else ends here.
            if (e instanceof Error) {
                throw (Error)e;
            }
        }
    }

    /**
     * Registers the given job-to-node mapping and sends an execute request for every job.
     * <p>
     * Job results are registered under {@link #mux} and marked occupied until their request has
     * been sent. When more than one job is mapped, jobs targeting the local node are moved to the
     * end of the send order.
     *
     * @param jobs Mapped jobs; a null or empty map is ignored.
     * @throws IgniteCheckedException If a job or node is null, the worker is no longer in the
     *      WAITING state, or a job ID is duplicated.
     */
    private void processMappedJobs(Map<? extends ComputeJob, ClusterNode> jobs) throws IgniteCheckedException {
        if (F.isEmpty(jobs)) {
            return;
        }

        List<GridJobResultImpl> jobResList = new ArrayList<>(jobs.size());
        List<ComputeJobSibling> sibs = new ArrayList<>(jobs.size());

        for (Map.Entry<? extends ComputeJob, ClusterNode> mappedJob : jobs.entrySet()) {
            ComputeJob job = mappedJob.getKey();
            ClusterNode node = mappedJob.getValue();

            if (job == null) {
                throw new IgniteCheckedException("Job can not be null [mappedJob=" + mappedJob + ", ses=" + ses + ']');
            }

            if (node == null) {
                throw new IgniteCheckedException("Node can not be null [mappedJob=" + mappedJob + ", ses=" + ses + ']');
            }

            IgniteUuid jobId = IgniteUuid.fromUuid(ctx.localNodeId());

            GridJobSiblingImpl sib = new GridJobSiblingImpl(ses.getId(), jobId, node.id(), ctx);

            jobResList.add(new GridJobResultImpl(job, jobId, node, sib));

            // Siblings are tracked only when result caching is enabled.
            if (resCache) {
                sibs.add(sib);
            }

            recordJobEvent(40, jobId, node, null, "Job got mapped.");
        }

        synchronized (mux) {
            if (state != State.WAITING) {
                throw new IgniteCheckedException("Task is not in waiting state [state=" + state + ", ses=" + ses + ']');
            }

            if (resCache) {
                ses.addJobSiblings(sibs);
            }

            if (jobRes == null) {
                jobRes = new HashMap<>();
            }

            for (GridJobResultImpl res : jobResList) {
                if (jobRes.put(res.getJobContext().getJobId(), res) != null) {
                    throw new IgniteCheckedException("Duplicate job ID for remote job found: " + res.getJobContext().getJobId());
                }

                // Stays occupied until the request for this job has been sent (see the finally below).
                res.setOccupied(true);

                if (resCache && jobRes.size() > ctx.discovery().size() && jobRes.size() % SPLIT_WARN_THRESHOLD == 0) {
                    LT.warn(log, "Number of jobs in task is too large for task: " + ses.getTaskName() + ". Consider reducing number of jobs or disabling job result cache with @ComputeTaskNoResultCache annotation.");
                }
            }
        }

        ses.onMapped();

        // Move local jobs to the end of the list so that remote requests are sent first.
        int jobResSize = jobResList.size();

        if (jobResSize > 1) {
            UUID locId = ctx.discovery().localNode().id();

            for (int i = 0; i < jobResSize; i++) {
                UUID jobNodeId = jobResList.get(i).getNode().id();

                if (jobNodeId.equals(locId) && i < jobResSize - 1) {
                    Collections.swap(jobResList, i, jobResSize - 1);

                    jobResSize--;
                    i--;
                }
            }
        }

        for (GridJobResultImpl res : jobResList) {
            evtLsnr.onJobSend(this, res.getSibling());

            try {
                sendRequest(res);
            }
            finally {
                // Release the result even if sending failed.
                synchronized (mux) {
                    res.setOccupied(false);
                }
            }
        }

        processDelayedResponses();
    }

    /**
     * Resolves the nodes the task may be mapped to: the session topology when one is set,
     * otherwise all nodes known to discovery. The result is a fresh list in random order.
     *
     * @return Shuffled, non-empty list of nodes.
     * @throws IgniteCheckedException {@link ClusterGroupEmptyCheckedException} if no node is available.
     */
    private List<ClusterNode> getTaskTopology() throws IgniteCheckedException {
        Collection<UUID> sesTop = ses.getTopology();

        Collection<ClusterNode> candidates;

        if (sesTop != null) {
            candidates = ctx.discovery().nodes(sesTop, new IgnitePredicate[0]);
        }
        else {
            candidates = ctx.discovery().allNodes();
        }

        int cnt = candidates.size();

        if (cnt == 0) {
            throw new ClusterGroupEmptyCheckedException("Topology projection is empty.");
        }

        List<ClusterNode> res = new ArrayList<>(cnt);

        for (Iterator<ClusterNode> it = candidates.iterator(); it.hasNext(); ) {
            res.add(it.next());
        }

        // Shuffling a single-element list is pointless.
        if (res.size() > 1) {
            Collections.shuffle(res);
        }

        return res;
    }

    /**
     * Takes at most one postponed response from the delayed queue and hands it to onResponse(..).
     */
    private void processDelayedResponses() {
        GridJobExecuteResponse delayed = delayedRess.poll();

        if (delayed == null) {
            return;
        }

        onResponse(delayed);
    }

    /*
     * NOTE(review): the decompiler (CFR) could not reconstruct this method, so the real logic is
     * missing from this file. The body below is only a placeholder that always throws
     * IllegalStateException. processDelayedResponses() calls this method, so the original
     * implementation must be restored from the real sources before this class can be used.
     *
     * Parameter: msg - job execution response to process.
     */
    void onResponse(GridJobExecuteResponse msg) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Schedules a delayed retry of a job: after {@code waitms} the affinity partition is re-mapped
     * to a node and, if the new target passes checkTargetNode(..), the request is sent again.
     * Any exception during re-mapping or sending finishes the task with that exception.
     *
     * @param waitms Delay passed to the timeout processor.
     * @param jRes Job result to resend.
     * @param resp Response that triggered the retry.
     */
    private void sendRetryRequest(long waitms, final GridJobResultImpl jRes, final GridJobExecuteResponse resp) {
        final GridPlainRunnable remapAndResend = new GridPlainRunnable() {
            @Override
            public void run() {
                try {
                    ClusterNode target = ctx.affinity().mapPartitionToNode(affCacheName, affPartId, mapTopVer);

                    if (checkTargetNode(resp, jRes, target)) {
                        sendRequest(jRes);
                    }
                }
                catch (Exception err) {
                    U.error(log, "Failed to re-map job or retry request [ses=" + ses + "]", err);

                    finishTask(null, err);
                }
            }
        };

        Runnable onTimer = new Runnable() {
            @Override
            public void run() {
                ctx.closure().runLocalSafe((Runnable)remapAndResend, false);
            }
        };

        ctx.timeout().schedule(onTimer, waitms, -1L);
    }

    /**
     * Asks the user task for the result policy of a job result, with the deployment class loader
     * set as the thread context loader. Must not be called while holding {@link #mux}.
     * <p>
     * When the task returns FAILOVER but failover is disabled for this worker, the job exception is
     * rethrown if there is one; otherwise the policy becomes WAIT. Job event type 41 is recorded
     * whether or not {@code ComputeTask.result(..)} succeeds.
     *
     * @param jobRes Job result to evaluate.
     * @param results Results passed through to {@code ComputeTask.result(..)}.
     * @return Result policy, or {@code null} if obtaining it failed (the task is then already finished
     *      with an error).
     */
    @Nullable
    private ComputeJobResultPolicy result(final ComputeJobResult jobRes, final List<ComputeJobResult> results) {
        assert !Thread.holdsLock(mux);

        return U.wrapThreadLoader(dep.classLoader(), new CO<ComputeJobResultPolicy>() {
            @Override
            @Nullable
            public ComputeJobResultPolicy apply() {
                try {
                    ComputeJobResultPolicy plc = null;

                    try {
                        plc = task.result(jobRes, results);

                        if (plc == ComputeJobResultPolicy.FAILOVER && noFailover) {
                            IgniteException jobErr = jobRes.getException();

                            if (jobErr != null) {
                                throw jobErr;
                            }

                            plc = ComputeJobResultPolicy.WAIT;
                        }
                    }
                    finally {
                        // Recorded even when result(..) throws; plc is then still null.
                        recordJobEvent(41, jobRes.getJobContext().getJobId(), jobRes.getNode(), plc, "Job got resulted with: " + plc);
                    }

                    if (log.isDebugEnabled()) {
                        log.debug("Obtained job result policy [policy=" + plc + ", ses=" + ses + ']');
                    }

                    return plc;
                }
                catch (IgniteException e) {
                    if (X.hasCause((Throwable)e, GridInternalException.class)) {
                        // Internal exceptions are reported only when debug logging is on.
                        if (log.isDebugEnabled()) {
                            U.error(log, "Failed to obtain remote job result policy for result from ComputeTask.result(..) method (will fail the whole task): " + jobRes, e);
                        }
                    }
                    else {
                        if (X.hasCause((Throwable)e, ComputeJobFailoverException.class)) {
                            IgniteCheckedException e0 = new IgniteCheckedException(" Job was not failed over because ComputeJobResultPolicy.FAILOVER was not returned from ComputeTask.result(...) method for job result with ComputeJobFailoverException.", e);

                            finishTask(null, e0);

                            return null;
                        }

                        if (X.hasCause((Throwable)e, GridServiceNotFoundException.class) || X.hasCause((Throwable)e, ClusterTopologyCheckedException.class)) {
                            LT.error(log, e, "Failed to obtain remote job result policy for result from ComputeTask.result(..) method (will fail the whole task): " + jobRes);
                        }
                        else {
                            U.error(log, "Failed to obtain remote job result policy for result from ComputeTask.result(..) method (will fail the whole task): " + jobRes, e);
                        }
                    }

                    finishTask(null, e);

                    return null;
                }
                catch (Throwable e) {
                    // Fixed message: a space was missing between "from" and "ComputeTask".
                    String errMsg = "Failed to obtain remote job result policy for result from ComputeTask.result(..) method due to undeclared user exception (will fail the whole task): " + jobRes;

                    U.error(log, errMsg, e);

                    finishTask(null, new ComputeUserUndeclaredException(errMsg, e));

                    // Errors are propagated after the task has been finished.
                    if (e instanceof Error) {
                        throw (Error)e;
                    }

                    return null;
                }
            }
        });
    }

    /*
     * NOTE(review): the decompiler (CFR) could not reconstruct this method, so the real logic is
     * missing from this file. The body below is only a placeholder that always throws
     * IllegalStateException; the original implementation must be restored from the real sources.
     *
     * Parameter: results - job results to reduce.
     */
    private void reduce(List<ComputeJobResult> results) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Picks a new node for a job through the failover manager. Must be called while holding
     * {@link #mux}. Methods annotated with {@code @ComputeJobBeforeFailover} on the job are invoked
     * first.
     *
     * @param res Response that led to the failover.
     * @param jobRes Job result being failed over.
     * @param top Candidate nodes; copied before being handed to the failover manager.
     * @return {@code true} if a new target node was accepted; {@code false} if the task was finished
     *      with an error instead.
     */
    private boolean failover(GridJobExecuteResponse res, GridJobResultImpl jobRes, Collection<? extends ClusterNode> top) {
        assert Thread.holdsLock(mux);

        try {
            ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);

            ClusterNode target = ctx.failover().failover(ses, jobRes, new ArrayList<ClusterNode>(top), affPartId, affCacheName, mapTopVer);

            return checkTargetNode(res, jobRes, target);
        }
        catch (Throwable err) {
            String errMsg = "Failed to failover job due to undeclared user exception [job=" + jobRes.getJob() + ", err=" + err + ']';

            U.error(log, errMsg, err);

            finishTask(null, new ComputeUserUndeclaredException(errMsg, err));

            // Errors are propagated after the task has been finished.
            if (err instanceof Error) {
                throw (Error)err;
            }

            return false;
        }
    }

    /**
     * Validates the node chosen for a failed-over or retried job and, when it is usable, re-targets
     * the job result to it.
     *
     * @param res Response that triggered the re-targeting.
     * @param jobRes Job result to update.
     * @param node Newly chosen node, possibly {@code null}.
     * @return {@code true} if the job result now points at {@code node}; {@code false} if {@code node}
     *      was {@code null} and the task was finished with a {@link ClusterTopologyCheckedException}.
     */
    private boolean checkTargetNode(GridJobExecuteResponse res, GridJobResultImpl jobRes, ClusterNode node) {
        if (node != null) {
            if (log.isDebugEnabled()) {
                log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() + ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
            }

            synchronized (mux) {
                jobRes.setNode(node);
                jobRes.resetResponse();

                // Without result caching the result is put back into the map under the response's job ID.
                if (!resCache) {
                    this.jobRes.put(res.getJobId(), jobRes);
                }
            }

            return true;
        }

        String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" + jobRes.getJob() + ", node=" + jobRes.getNode() + ']';

        if (log.isDebugEnabled()) {
            log.debug(msg);
        }

        ClusterTopologyCheckedException topErr = new ClusterTopologyCheckedException(msg, jobRes.getException());

        finishTask(null, topErr);

        return false;
    }

    /**
     * Notifies the listener about the failover and, if the session end time has not passed yet,
     * records job event type 43 and resends the job request. Otherwise only a warning is logged.
     *
     * @param jobRes Job result to fail over.
     */
    private void sendFailoverRequest(GridJobResultImpl jobRes) {
        evtLsnr.onJobFailover(this, jobRes.getSibling(), jobRes.getNode().id());

        long remaining = ses.getEndTime() - U.currentTimeMillis();

        if (remaining <= 0L) {
            U.warn(log, "Failed to fail-over job due to task timeout: " + jobRes);

            return;
        }

        recordJobEvent(43, jobRes.getJobContext().getJobId(), jobRes.getNode(), ComputeJobResultPolicy.FAILOVER, "Job failed over.");

        sendRequest(jobRes);
    }

    /**
     * Cancels every job that has not produced a response yet. Jobs on the local node are cancelled
     * through the job processor; for remote jobs a cancel request is sent to the node if it is
     * still known to discovery. Send failures are logged and otherwise ignored.
     */
    private void cancelChildren() {
        List<GridJobResultImpl> doomed = new LinkedList<>();

        // Snapshot the unanswered jobs under the lock; cancellation itself happens outside of it.
        synchronized (mux) {
            if (jobRes != null) {
                for (GridJobResultImpl res : jobRes.values()) {
                    if (!res.hasResponse()) {
                        doomed.add(res);
                    }
                }
            }
        }

        for (GridJobResultImpl res : doomed) {
            UUID nodeId = res.getNode().id();

            if (nodeId.equals(ctx.localNodeId())) {
                ctx.job().cancelJob(ses.getId(), res.getJobContext().getJobId(), true);

                continue;
            }

            try {
                ClusterNode node = ctx.discovery().node(nodeId);

                // Nothing to cancel on a node that is no longer in topology.
                if (node != null) {
                    ctx.io().sendToGridTopic(node, GridTopic.TOPIC_JOB_CANCEL, (Message)new GridJobCancelRequest(ses.getId(), res.getJobContext().getJobId(), true), (byte)0);
                }
            }
            catch (ClusterTopologyCheckedException e) {
                if (log.isDebugEnabled()) {
                    log.debug("Failed to send cancel request, node failed: " + nodeId);
                }
            }
            catch (IgniteCheckedException e) {
                try {
                    // A failure to reach a dead node is expected and not reported.
                    if (!isDeadNode(nodeId)) {
                        U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" + nodeId + ", taskName=" + ses.getTaskName() + ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e);
                    }
                }
                catch (IgniteClientDisconnectedCheckedException ignored) {
                    if (log.isDebugEnabled()) {
                        log.debug("Failed to send cancel request to node, client disconnected [nodeId=" + nodeId + ", taskName=" + ses.getTaskName() + ']');
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends the job held by {@code res} to the node set on it, or reports a synthetic ("fake")
     * failure response through {@code onResponse} when the job cannot be sent.
     * <p>
     * Outcomes visible in this method:
     * <ul>
     * <li>target node unknown to discovery: fake response carrying a {@link ClusterTopologyException};</li>
     * <li>remaining session time is positive: the request is built and either processed locally or
     *     sent on {@code TOPIC_JOB};</li>
     * <li>remaining session time is not positive: a warning is logged and no response is produced here;</li>
     * <li>an {@link IgniteCheckedException} is thrown: fake response carrying the converted error.</li>
     * </ul>
     *
     * @param res Job result describing the job and its target node; must not be {@code null}.
     */
    private void sendRequest(ComputeJobResult res) {
        // Labeled block: "break block18" below leaves the method body (nothing follows the block).
        block18: {
            assert (res != null);
            // Declared outside the try so the catch block can include it in the error log.
            GridJobExecuteRequest req = null;
            ClusterNode node = res.getNode();
            try {
                long timeout;
                ClusterNode curNode = this.ctx.discovery().node(node.id());
                if (curNode == null) {
                    // Target is gone: run the job's ComputeJobAfterSend callbacks and feed a fake
                    // topology-failure response into the regular response path.
                    U.warn(this.log, "Failed to send job request because remote node left grid (if fail-over is enabled, will attempt fail-over to another node) [node=" + node + ", taskName=" + this.ses.getTaskName() + ", taskSesId=" + this.ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
                    this.ctx.resource().invokeAnnotated(this.dep, res.getJob(), ComputeJobAfterSend.class);
                    GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), this.ses.getId(), res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
                    fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " + node));
                    this.onResponse(fakeRes);
                    break block18;
                }
                // An end time of Long.MAX_VALUE is passed through unchanged instead of being turned into a
                // remaining-time value. The local "l" is never read (looks like a decompiler artifact).
                long l = timeout = this.ses.getEndTime() == Long.MAX_VALUE ? Long.MAX_VALUE : this.ses.getEndTime() - U.currentTimeMillis();
                if (timeout > 0L) {
                    // "loc": the target is the local node and marshalling of local jobs is not requested,
                    // so objects are handed over directly instead of as marshalled bytes.
                    boolean loc = node.id().equals(this.ctx.discovery().localNode().id()) && !this.ctx.config().isMarshalLocalJobs();
                    Map<Object, Object> sesAttrs = this.ses.isFullSupport() ? this.ses.getAttributes() : null;
                    Map<?, ?> jobAttrs = res.getJobContext().getAttributes();
                    boolean forceLocDep = this.internal || !this.ctx.deploy().enabled();
                    try {
                        // The receiver version is set only while the request is built and is always
                        // reset to null in the finally block below.
                        // NOTE(review): presumably this is per-thread state read by the marshaller - confirm.
                        MarshallerUtils.jobReceiverVersion(node.version());
                        // Each payload is supplied in exactly one form: marshalled when !loc, as an object when loc.
                        req = new GridJobExecuteRequest(this.ses.getId(), res.getJobContext().getJobId(), this.ses.getTaskName(), this.ses.getUserVersion(), this.ses.getTaskClassName(), loc ? null : U.marshal(this.marsh, res.getJob()), loc ? (ComputeJob)res.getJob() : null, this.ses.getStartTime(), timeout, this.ses.getTopology(), loc ? this.ses.getTopologyPredicate() : null, loc ? null : U.marshal(this.marsh, this.ses.getTopologyPredicate()), loc ? null : U.marshal(this.marsh, this.ses.getJobSiblings()), loc ? this.ses.getJobSiblings() : null, loc ? null : U.marshal(this.marsh, sesAttrs), loc ? sesAttrs : null, loc ? null : U.marshal(this.marsh, jobAttrs), loc ? jobAttrs : null, this.ses.getCheckpointSpi(), this.dep.classLoaderId(), this.dep.deployMode(), this.continuous, this.dep.participants(), forceLocDep, this.ses.isFullSupport(), this.internal, this.subjId, this.affCacheIds, this.affPartId, this.mapTopVer, this.ses.executorName());
                    }
                    finally {
                        MarshallerUtils.jobReceiverVersion(null);
                    }
                    if (loc) {
                        // Local execution bypasses the communication layer.
                        this.ctx.job().processJobExecuteRequest(this.ctx.discovery().localNode(), req);
                    } else {
                        Byte ctxPlc;
                        // IO policy: 3 for internal tasks; otherwise the TC_IO_POLICY value from the thread
                        // context when present, else 0.
                        // NOTE(review): 3 and 0 are raw policy ids - confirm which pools they denote.
                        byte plc = this.internal ? (byte)3 : ((ctxPlc = (Byte)this.getThreadContext(GridTaskThreadContextKey.TC_IO_POLICY)) != null ? (byte)ctxPlc : (byte)0);
                        this.ctx.io().sendToGridTopic(node, GridTopic.TOPIC_JOB, (Message)req, plc);
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Sent job request [req=" + req + ", node=" + node + ']');
                        }
                    }
                    // ComputeJobAfterSend callbacks are invoked here only for the non-local path.
                    if (!loc) {
                        this.ctx.resource().invokeAnnotated(this.dep, res.getJob(), ComputeJobAfterSend.class);
                    }
                    break block18;
                }
                // Timed out before sending: only a warning is logged, no fake response is generated here.
                U.warn(this.log, "Job timed out prior to sending job execution request: " + res.getJob());
            }
            catch (IgniteCheckedException e) {
                // Stays null unless a more specific error is chosen below; then the original error is converted.
                IgniteException fakeErr = null;
                try {
                    boolean deadNode;
                    // The local "bl" is never read (looks like a decompiler artifact).
                    boolean bl = deadNode = e instanceof ClusterTopologyCheckedException || this.isDeadNode(res.getNode().id());
                    if (deadNode) {
                        U.warn(this.log, "Failed to send job request because remote node left grid (if failover is enabled, will attempt fail-over to another node) [node=" + node + ", taskName=" + this.ses.getTaskName() + ", taskSesId=" + this.ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
                        fakeErr = new ClusterTopologyException("Failed to send job due to node failure: " + node, e);
                    } else {
                        U.error(this.log, "Failed to send job request: " + req, e);
                    }
                }
                catch (IgniteClientDisconnectedCheckedException e0) {
                    // Thrown by the isDeadNode check; the disconnect error replaces the send error.
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Failed to send job request, client disconnected [node=" + node + ", taskName=" + this.ses.getTaskName() + ", taskSesId=" + this.ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
                    }
                    fakeErr = U.convertException(e0);
                }
                // Report the failure through the regular response path as a fake response.
                GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), this.ses.getId(), res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
                if (fakeErr == null) {
                    fakeErr = U.convertException(e);
                }
                fakeRes.setFakeException(fakeErr);
                this.onResponse(fakeRes);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reacts to a node leaving the grid by simulating a failure response for every job of this
     * task that was mapped to that node and has no response yet.
     * <p>
     * Nothing is done unless the task is in the {@code WAITING} state. The fake responses are
     * created while holding {@code mux} and are passed to {@code onResponse} after the lock is released.
     *
     * @param nodeId ID of the node that left.
     */
    void onNodeLeft(UUID nodeId) {
        // Allocated lazily: in the common case no job of this task was mapped to the departed node.
        List<GridJobExecuteResponse> resList = null;
        synchronized (this.mux) {
            if (this.state != State.WAITING) {
                return;
            }
            if (this.jobRes != null) {
                for (GridJobResultImpl jr : this.jobRes.values()) {
                    if (jr.hasResponse() || !jr.getNode().id().equals(nodeId)) {
                        continue;
                    }
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Creating fake response because node left grid [job=" + jr.getJob() + ", nodeId=" + nodeId + ']');
                    }
                    GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(nodeId, this.ses.getId(), jr.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
                    fakeRes.setFakeException(new ClusterTopologyException("Node has left grid: " + nodeId));
                    if (resList == null) {
                        resList = new ArrayList<GridJobExecuteResponse>();
                    }
                    resList.add(fakeRes);
                }
            }
        }
        if (resList == null) {
            return;
        }
        for (GridJobExecuteResponse res : resList) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Simulating fake response from left node [res=" + res + ", nodeId=" + nodeId + ']');
            }
            this.onResponse(res);
        }
    }

    /**
     * Records a task event on the local node. Nothing is recorded for internal tasks or when the
     * event type is not recordable.
     *
     * @param evtType Event type ID.
     * @param msg Event message.
     */
    private void recordTaskEvent(int evtType, String msg) {
        if (this.internal || !this.ctx.event().isRecordable(evtType)) {
            return;
        }
        ClusterNode locNode = this.ctx.discovery().localNode();
        this.ctx.event().record(new TaskEvent(locNode, msg, evtType, this.ses.getId(), this.ses.getTaskName(), this.ses.getTaskClassName(), this.internal, this.subjId));
    }

    /**
     * Records a job event on the local node. Nothing is recorded for internal tasks or when the
     * event type is not recordable.
     *
     * @param evtType Event type ID.
     * @param jobId ID of the job the event refers to.
     * @param evtNode Node stored in the event as the task node.
     * @param plc Result policy attached to the event, may be {@code null}.
     * @param msg Event message.
     */
    private void recordJobEvent(int evtType, IgniteUuid jobId, ClusterNode evtNode, @Nullable ComputeJobResultPolicy plc, String msg) {
        if (this.internal || !this.ctx.event().isRecordable(evtType)) {
            return;
        }
        JobEvent jobEvt = new JobEvent();
        jobEvt.message(msg);
        jobEvt.node(this.ctx.discovery().localNode());
        jobEvt.taskName(this.ses.getTaskName());
        jobEvt.taskClassName(this.ses.getTaskClassName());
        jobEvt.taskSessionId(this.ses.getId());
        jobEvt.taskNode(evtNode);
        jobEvt.jobId(jobId);
        jobEvt.type(evtType);
        jobEvt.taskSubjectId(this.ses.subjectId());
        jobEvt.resultPolicy(plc);
        this.ctx.event().record(jobEvt);
    }

    /**
     * Collects the job results that already carry a response. Must be called while holding {@code mux}.
     *
     * @return New list with every entry of the job result map for which {@code hasResponse()} is true.
     */
    private List<ComputeJobResult> getRemoteResults() {
        assert (Thread.holdsLock(this.mux));
        Collection<GridJobResultImpl> all = this.jobRes.values();
        // Sized for the worst case in which every job has already responded.
        List<ComputeJobResult> responded = new ArrayList<ComputeJobResult>(this.jobRes.size());
        for (GridJobResultImpl candidate : all) {
            if (candidate.hasResponse()) {
                responded.add(candidate);
            }
        }
        return responded;
    }

    /**
     * Finishes the task and cancels its still-running jobs; shorthand for
     * {@code finishTask(res, e, true)}.
     *
     * @param res Task result, may be {@code null}.
     * @param e Failure that ended the task, or {@code null} on success.
     */
    void finishTask(@Nullable R res, @Nullable Throwable e) {
        final boolean cancelPending = true;
        this.finishTask(res, e, cancelPending);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Moves the task to the {@code FINISHING} state, records the finish/failure event, notifies the
     * event listener, optionally cancels jobs without a response and completes the future and session.
     * <p>
     * The call is a no-op when the task is currently {@code REDUCING} or already {@code FINISHING}.
     * The future and the session are completed in a {@code finally} block, i.e. even if event
     * recording, the listener or child cancellation throws.
     *
     * @param res Task result, may be {@code null}.
     * @param e Failure that ended the task, or {@code null} on success.
     * @param cancelChildren Whether jobs that have not responded yet should be cancelled.
     */
    void finishTask(@Nullable R res, @Nullable Throwable e, boolean cancelChildren) {
        synchronized (this.mux) {
            // State is checked and switched under the lock, so only one caller gets past this point.
            if (this.state == State.REDUCING || this.state == State.FINISHING) {
                return;
            }
            this.state = State.FINISHING;
        }
        boolean succeeded = e == null;
        try {
            // NOTE(review): 21 / 22 are raw event type ids (look like task finished / task failed) - confirm.
            this.recordTaskEvent(succeeded ? 21 : 22, succeeded ? "Task finished." : "Task failed.");
            this.evtLsnr.onTaskFinished(this);
            if (cancelChildren) {
                this.cancelChildren();
            }
        }
        finally {
            this.fut.onDone(res, e);
            this.ses.onDone();
        }
    }

    /**
     * Tells whether a node should be treated as dead: it is either unknown to discovery or does
     * not answer a ping.
     *
     * @param uid Node ID to check.
     * @return {@code true} if the node is unknown or the ping fails.
     * @throws IgniteClientDisconnectedCheckedException Declared by this check; see the discovery calls.
     */
    private boolean isDeadNode(UUID uid) throws IgniteClientDisconnectedCheckedException {
        if (this.ctx.discovery().node(uid) == null) {
            return true;
        }
        boolean alive = this.ctx.discovery().pingNode(uid);
        return !alive;
    }

    /**
     * @return Value of the {@code affCacheName} field of this worker.
     */
    public String affCacheName() {
        return affCacheName;
    }

    /**
     * @return Value of the {@code affPartId} field of this worker.
     */
    public int affPartId() {
        return affPartId;
    }

    /**
     * Two task workers are equal when their task sessions have equal IDs.
     * <p>
     * An argument that is {@code null} or not a {@code GridTaskWorker} yields {@code false}, as the
     * {@link Object#equals(Object)} contract requires, instead of failing on an assertion or a cast.
     *
     * @param obj Object to compare with.
     * @return {@code true} if {@code obj} is a task worker with the same session ID.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // instanceof is false for null, so this also covers the null case.
        if (!(obj instanceof GridTaskWorker)) {
            return false;
        }
        return this.ses.getId().equals(((GridTaskWorker)obj).ses.getId());
    }

    /**
     * Hash code derived from the task session ID, consistent with {@link #equals(Object)}.
     *
     * @return Hash code of the session ID.
     */
    @Override
    public int hashCode() {
        return this.ses.getId().hashCode();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds the string form of this worker while holding {@code mux}.
     *
     * @return Result of {@code S.toString(GridTaskWorker.class, this)}.
     */
    @Override
    public String toString() {
        synchronized (this.mux) {
            String repr = S.toString(GridTaskWorker.class, this);
            return repr;
        }
    }

    private static enum State {
        WAITING,
        REDUCING,
        REDUCED,
        FINISHING;

    }
}

