/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.localtask;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;

public class DurableBackgroundTasksProcessor
extends GridProcessorAdapter
implements MetastorageLifecycleListener,
DbCheckpointListener {
    private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-";
    private volatile ReadWriteMetastorage metastorage;
    private final Object metaStorageMux = new Object();
    private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new GridConcurrentHashSet<GridWorker>();
    private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new AtomicInteger(0);
    private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap();
    private final Set<String> startedTasks = new GridConcurrentHashSet<String>();
    private volatile boolean forbidStartingNewTasks;

    public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
        super(ctx);
    }

    private void asyncDurableBackgroundTasksExecution() {
        assert (this.durableBackgroundTasks != null);
        for (DurableBackgroundTask task : this.durableBackgroundTasks.values()) {
            if (task.isCompleted() || !this.startedTasks.add(task.shortName())) continue;
            this.asyncDurableBackgroundTaskExecute(task);
        }
    }

    private void asyncDurableBackgroundTaskExecute(final DurableBackgroundTask task) {
        String workerName = "async-durable-background-task-executor-" + this.asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
        GridWorker worker = new GridWorker(this.ctx.igniteInstanceName(), workerName, this.log){

            @Override
            public void cancel() {
                task.onCancel();
                super.cancel();
            }

            @Override
            protected void body() {
                try {
                    if (DurableBackgroundTasksProcessor.this.forbidStartingNewTasks) {
                        return;
                    }
                    this.log.info("Executing durable background task: " + task.shortName());
                    task.execute(DurableBackgroundTasksProcessor.this.ctx);
                    task.complete();
                    this.log.info("Execution of durable background task completed: " + task.shortName());
                }
                catch (Throwable e) {
                    this.log.error("Could not execute durable background task: " + task.shortName(), e);
                    if (e instanceof Error) {
                        DurableBackgroundTasksProcessor.this.ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
                    }
                }
                finally {
                    DurableBackgroundTasksProcessor.this.startedTasks.remove(task.shortName());
                    DurableBackgroundTasksProcessor.this.asyncDurableBackgroundTaskWorkers.remove(this);
                }
            }
        };
        this.asyncDurableBackgroundTaskWorkers.add(worker);
        IgniteThread asyncTask = new IgniteThread(worker);
        asyncTask.start();
    }

    @Override
    public void onKernalStart(boolean active) {
        this.asyncDurableBackgroundTasksExecution();
    }

    @Override
    public void onKernalStop(boolean cancel) {
        this.forbidStartingNewTasks = true;
        IgniteUtils.awaitForWorkersStop(this.asyncDurableBackgroundTaskWorkers, true, this.log);
    }

    @Override
    public void start() throws IgniteCheckedException {
        this.ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
    }

    public void onStateChange(ChangeGlobalStateMessage msg) {
        if (msg.state() == ClusterState.INACTIVE) {
            this.forbidStartingNewTasks = true;
            IgniteUtils.awaitForWorkersStop(this.asyncDurableBackgroundTaskWorkers, true, this.log);
        }
    }

    public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
        if (msg.state() != ClusterState.INACTIVE) {
            this.forbidStartingNewTasks = false;
            this.asyncDurableBackgroundTasksExecution();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onReadyForRead(ReadOnlyMetastorage metastorage) {
        Object object = this.metaStorageMux;
        synchronized (object) {
            if (this.durableBackgroundTasks.isEmpty()) {
                try {
                    metastorage.iterate(STORE_DURABLE_BACKGROUND_TASK_PREFIX, (key, val) -> this.durableBackgroundTasks.put((String)key, (DurableBackgroundTask)val), true);
                }
                catch (IgniteCheckedException e) {
                    throw new IgniteException("Failed to iterate durable background tasks storage.", e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
        Object object = this.metaStorageMux;
        synchronized (object) {
            try {
                for (Map.Entry<String, DurableBackgroundTask> entry : this.durableBackgroundTasks.entrySet()) {
                    if (metastorage.readRaw(entry.getKey()) != null) continue;
                    metastorage.write(entry.getKey(), entry.getValue());
                }
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException("Failed to read key from durable background tasks storage.", e);
            }
        }
        ((GridCacheDatabaseSharedManager)this.ctx.cache().context().database()).addCheckpointListener(this);
        this.metastorage = metastorage;
    }

    private String durableBackgroundTaskMetastorageKey(DurableBackgroundTask obj) {
        String k = STORE_DURABLE_BACKGROUND_TASK_PREFIX + obj.shortName();
        if (k.length() > 64) {
            int hashLenLimit = 5;
            String hash = String.valueOf(k.hashCode());
            k = k.substring(0, 64 - hashLenLimit) + (hash.length() > hashLenLimit ? hash.substring(0, hashLenLimit) : hash);
        }
        return k;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addDurableBackgroundTask(DurableBackgroundTask obj) {
        String objName = this.durableBackgroundTaskMetastorageKey(obj);
        Object object = this.metaStorageMux;
        synchronized (object) {
            this.durableBackgroundTasks.put(objName, obj);
            if (this.metastorage != null) {
                this.ctx.cache().context().database().checkpointReadLock();
                try {
                    this.metastorage.write(objName, obj);
                }
                catch (IgniteCheckedException e) {
                    throw new IgniteException(e);
                }
                finally {
                    this.ctx.cache().context().database().checkpointReadUnlock();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeDurableBackgroundTask(DurableBackgroundTask obj) {
        String objName = this.durableBackgroundTaskMetastorageKey(obj);
        Object object = this.metaStorageMux;
        synchronized (object) {
            this.durableBackgroundTasks.remove(objName);
            if (this.metastorage != null) {
                this.ctx.cache().context().database().checkpointReadLock();
                try {
                    this.metastorage.remove(objName);
                }
                catch (IgniteCheckedException e) {
                    throw new IgniteException(e);
                }
                finally {
                    this.ctx.cache().context().database().checkpointReadUnlock();
                }
            }
        }
    }

    public void startDurableBackgroundTask(DurableBackgroundTask task, CacheConfiguration ccfg) {
        if (CU.isPersistentCache(ccfg, this.ctx.config().getDataStorageConfiguration())) {
            this.addDurableBackgroundTask(task);
        }
        this.asyncDurableBackgroundTaskExecute(task);
    }

    @Override
    public void onMarkCheckpointBegin(DbCheckpointListener.Context ctx) {
        for (DurableBackgroundTask task : this.durableBackgroundTasks.values()) {
            if (!task.isCompleted()) continue;
            this.removeDurableBackgroundTask(task);
        }
    }

    @Override
    public void onCheckpointBegin(DbCheckpointListener.Context ctx) {
    }

    @Override
    public void beforeCheckpointBegin(DbCheckpointListener.Context ctx) {
    }
}

