/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.localtask;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState;
import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.internal.util.typedef.internal.CU;

public class DurableBackgroundTasksProcessor
extends GridProcessorAdapter
implements MetastorageLifecycleListener,
CheckpointListener {
    private static final String TASK_PREFIX = "durable-background-task-";
    private final Object metaStorageMux = new Object();
    private final ConcurrentMap<String, DurableBackgroundTaskState> tasks = new ConcurrentHashMap<String, DurableBackgroundTaskState>();
    private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
    private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new ConcurrentHashMap<String, DurableBackgroundTask>();
    private volatile boolean prohibitionExecTasks = true;
    private final GridBusyLock stopLock = new GridBusyLock();

    public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
        super(ctx);
    }

    @Override
    public void start() throws IgniteCheckedException {
        this.ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
    }

    @Override
    public void onKernalStop(boolean cancel) {
        this.cancelTasks();
        this.stopLock.block();
    }

    @Override
    public void onReadyForRead(ReadOnlyMetastorage metastorage) {
        if (!this.stopLock.enterBusy()) {
            return;
        }
        try {
            this.metaStorageOperation(metaStorage -> {
                assert (metaStorage != null);
                metaStorage.iterate(TASK_PREFIX, (k, v) -> {
                    DurableBackgroundTask t = (DurableBackgroundTask)v;
                    this.tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
                }, true);
            });
        }
        finally {
            this.stopLock.leaveBusy();
        }
    }

    @Override
    public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
        ((GridCacheDatabaseSharedManager)this.ctx.cache().context().database()).addCheckpointListener(this);
    }

    @Override
    public void beforeCheckpointBegin(CheckpointListener.Context ctx) {
    }

    @Override
    public void onMarkCheckpointBegin(CheckpointListener.Context ctx) {
        Iterator it = this.tasks.values().iterator();
        while (it.hasNext()) {
            DurableBackgroundTaskState taskState = (DurableBackgroundTaskState)it.next();
            if (taskState.state() != DurableBackgroundTaskState.State.COMPLETED) continue;
            assert (taskState.saved());
            DurableBackgroundTask t = taskState.task();
            this.toRmv.put(t.name(), t);
            it.remove();
        }
    }

    @Override
    public void onCheckpointBegin(CheckpointListener.Context ctx) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void afterCheckpointEnd(CheckpointListener.Context ctx) {
        if (!this.stopLock.enterBusy()) {
            return;
        }
        try {
            Iterator it = this.toRmv.values().iterator();
            while (it.hasNext()) {
                DurableBackgroundTask t = (DurableBackgroundTask)it.next();
                this.metaStorageOperation(metaStorage -> {
                    if (metaStorage != null) {
                        if (!this.tasks.containsKey(t.name())) {
                            metaStorage.remove(DurableBackgroundTasksProcessor.metaStorageKey(t));
                        }
                        it.remove();
                    }
                });
            }
        }
        finally {
            this.stopLock.leaveBusy();
        }
    }

    public void onStateChangeStarted(ChangeGlobalStateMessage msg) {
        if (msg.state() == ClusterState.INACTIVE) {
            this.cancelTasks();
        }
    }

    public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
        if (msg.state() != ClusterState.INACTIVE) {
            this.prohibitionExecTasks = false;
            if (DurableBackgroundTasksProcessor.executeTasksOnNodeStartOrActivate()) {
                for (DurableBackgroundTaskState taskState : this.tasks.values()) {
                    if (this.prohibitionExecTasks) continue;
                    this.executeAsync0(taskState.task());
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask task, boolean save) {
        if (!this.stopLock.enterBusy()) {
            throw new IgniteException("Node is stopping.");
        }
        try {
            DurableBackgroundTaskState taskState = this.tasks.compute(task.name(), (taskName, prev) -> {
                if (prev != null && prev.state() != DurableBackgroundTaskState.State.COMPLETED) {
                    throw new IllegalArgumentException("Task is already present and has not been completed: " + taskName);
                }
                return new DurableBackgroundTaskState(task, new GridFutureAdapter<Void>(), save);
            });
            if (save) {
                this.metaStorageOperation(metaStorage -> {
                    if (metaStorage != null) {
                        metaStorage.write(DurableBackgroundTasksProcessor.metaStorageKey(task), task);
                    }
                });
            }
            if (!this.prohibitionExecTasks) {
                this.executeAsync0(task);
            }
            GridFutureAdapter<Void> gridFutureAdapter = taskState.outFuture();
            return gridFutureAdapter;
        }
        finally {
            this.stopLock.leaveBusy();
        }
    }

    public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask t, CacheConfiguration cacheCfg) {
        return this.executeAsync(t, CU.isPersistentCache(cacheCfg, this.ctx.config().getDataStorageConfiguration()));
    }

    private void executeAsync0(DurableBackgroundTask t) {
        this.cancelLock.readLock().lock();
        try {
            DurableBackgroundTaskState taskState = (DurableBackgroundTaskState)this.tasks.get(t.name());
            if (taskState != null && taskState.state(DurableBackgroundTaskState.State.INIT, DurableBackgroundTaskState.State.PREPARE)) {
                if (this.log.isInfoEnabled()) {
                    this.log.info("Executing durable background task: " + t.name());
                }
                t.executeAsync(this.ctx).listen(f -> {
                    DurableBackgroundTaskResult res = null;
                    try {
                        res = (DurableBackgroundTaskResult)f.get();
                    }
                    catch (Throwable e) {
                        this.log.error("Task completed with an error: " + t.name(), e);
                    }
                    assert (res != null);
                    if (res.error() != null) {
                        this.log.error("Could not execute durable background task: " + t.name(), res.error());
                    }
                    if (res.completed()) {
                        if (res.error() == null && this.log.isInfoEnabled()) {
                            this.log.info("Execution of durable background task completed: " + t.name());
                        }
                        if (taskState.saved()) {
                            taskState.state(DurableBackgroundTaskState.State.COMPLETED);
                        } else {
                            this.tasks.remove(t.name());
                        }
                        GridFutureAdapter<Void> outFut = taskState.outFuture();
                        if (outFut != null) {
                            outFut.onDone(res.error());
                        }
                    } else {
                        assert (res.restart());
                        if (this.log.isInfoEnabled()) {
                            this.log.info("Execution of durable background task will be restarted: " + t.name());
                        }
                        taskState.state(DurableBackgroundTaskState.State.INIT);
                    }
                });
                taskState.state(DurableBackgroundTaskState.State.PREPARE, DurableBackgroundTaskState.State.STARTED);
            }
        }
        finally {
            this.cancelLock.readLock().unlock();
        }
    }

    private void cancelTasks() {
        this.prohibitionExecTasks = true;
        this.cancelLock.writeLock().lock();
        try {
            for (DurableBackgroundTaskState taskState : this.tasks.values()) {
                if (taskState.state() != DurableBackgroundTaskState.State.STARTED) continue;
                taskState.task().cancel();
            }
        }
        finally {
            this.cancelLock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void metaStorageOperation(IgniteThrowableConsumer<MetaStorage> consumer) throws IgniteException {
        Object object = this.metaStorageMux;
        synchronized (object) {
            IgniteCacheDatabaseSharedManager dbMgr = this.ctx.cache().context().database();
            dbMgr.checkpointReadLock();
            try {
                MetaStorage metaStorage = dbMgr.metaStorage();
                consumer.accept(metaStorage);
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
            finally {
                dbMgr.checkpointReadUnlock();
            }
        }
    }

    static String metaStorageKey(DurableBackgroundTask t) {
        return TASK_PREFIX + t.name();
    }

    private static boolean executeTasksOnNodeStartOrActivate() {
        return IgniteSystemProperties.getBoolean("IGNITE_EXECUTE_DURABLE_BACKGROUND_TASKS_ON_NODE_START_OR_ACTIVATE", true);
    }
}

