/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.runtime.migration;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.cluster.ClusterService;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.kv.KeyValueStore;
import org.nuxeo.runtime.migration.MigrationDescriptor;
import org.nuxeo.runtime.migration.MigrationService;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.pubsub.AbstractPubSubBroker;
import org.nuxeo.runtime.pubsub.SerializableMessage;

public class MigrationServiceImpl
extends DefaultComponent
implements MigrationService {
    private static final Logger log = LogManager.getLogger(MigrationServiceImpl.class);
    public static final String KEYVALUE_STORE_NAME = "migration";
    public static final String XP_CONFIG = "configuration";
    public static final String LOCK = ":lock";
    public static final String STEP = ":step";
    public static final String START_TIME = ":starttime";
    public static final String PING_TIME = ":pingtime";
    public static final String PROGRESS_MESSAGE = ":message";
    public static final String PROGRESS_NUM = ":num";
    public static final String PROGRESS_TOTAL = ":total";
    public static final long WRITE_LOCK_TTL = 10L;
    public static final String MIGRATION_INVAL_PUBSUB_TOPIC = "migrationinval";
    protected static final Random RANDOM = new Random();
    protected MigrationThreadPoolExecutor executor;
    protected String nodeId;
    protected MigrationInvalidator invalidator;

    protected static KeyValueStore getKeyValueStore() {
        KeyValueService service = (KeyValueService)Framework.getService(KeyValueService.class);
        Objects.requireNonNull(service, "Missing KeyValueService");
        return service.getKeyValueStore(KEYVALUE_STORE_NAME);
    }

    public Collection<MigrationDescriptor> getMigrationDescriptors() {
        return this.getDescriptors(XP_CONFIG);
    }

    public int getApplicationStartedOrder() {
        return -490;
    }

    public void start(ComponentContext context) {
        super.start(context);
        ClusterService clusterService = (ClusterService)Framework.getService(ClusterService.class);
        if (clusterService.isEnabled()) {
            this.nodeId = clusterService.getNodeId();
            this.invalidator = new MigrationInvalidator();
            this.invalidator.initialize(MIGRATION_INVAL_PUBSUB_TOPIC, this.nodeId);
            log.info("Registered migration invalidator for node: {}", (Object)this.nodeId);
        } else {
            log.info("Not registering a migration invalidator because clustering is not enabled");
        }
        this.executor = new MigrationThreadPoolExecutor();
        Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener(){

            public void beforeStop(ComponentManager mgr, boolean isStandby) {
                MigrationServiceImpl.this.executor.requestShutdown();
            }

            public void afterStop(ComponentManager mgr, boolean isStandby) {
                Framework.getRuntime().getComponentManager().removeListener((ComponentManager.Listener)this);
            }
        });
    }

    public void stop(ComponentContext context) throws InterruptedException {
        if (this.invalidator != null) {
            this.invalidator.close();
            this.invalidator = null;
        }
        this.executor.shutdownNow();
        this.executor.awaitTermination(10L, TimeUnit.SECONDS);
        this.executor = null;
        super.stop(context);
    }

    @Override
    public MigrationService.MigrationStatus getStatus(String id) {
        MigrationDescriptor descr = (MigrationDescriptor)this.getDescriptor(XP_CONFIG, id);
        if (descr == null) {
            return null;
        }
        KeyValueStore kv = MigrationServiceImpl.getKeyValueStore();
        String state = kv.getString(id);
        if (state != null) {
            return new MigrationService.MigrationStatus(state);
        }
        String step = kv.getString(id + STEP);
        if (step == null) {
            state = descr.defaultState;
            return new MigrationService.MigrationStatus(state);
        }
        long startTime = Long.parseLong(kv.getString(id + START_TIME));
        long pingTime = Long.parseLong(kv.getString(id + PING_TIME));
        String progressMessage = kv.getString(id + PROGRESS_MESSAGE);
        long progressNum = Long.parseLong(kv.getString(id + PROGRESS_NUM));
        long progressTotal = Long.parseLong(kv.getString(id + PROGRESS_TOTAL));
        if (progressMessage == null) {
            progressMessage = "";
        }
        return new MigrationService.MigrationStatus(step, startTime, pingTime, progressMessage, progressNum, progressTotal);
    }

    @Override
    public String probeAndSetState(String id) {
        MigrationService.Migrator migrator = this.getMigrator(id);
        String state = migrator.probeState();
        if (state != null) {
            ProgressReporter progressReporter = new ProgressReporter(id);
            this.setState(id, state, migrator, progressReporter);
        }
        return state;
    }

    protected void setState(String id, String state, MigrationService.Migrator migrator, ProgressReporter progressReporter) {
        this.atomic(id, kv -> {
            String currentState = kv.getString(id);
            String currentStep = kv.getString(id + STEP);
            if (currentState == null && currentStep != null) {
                throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep);
            }
            this.setState(id, state, progressReporter, (KeyValueStore)kv);
        });
        migrator.notifyStatusChange();
    }

    protected void setState(String id, String state, ProgressReporter progressReporter, KeyValueStore kv) {
        kv.put(id, state);
        kv.put(id + STEP, (String)null);
        kv.put(id + START_TIME, (String)null);
        progressReporter.reportProgress(null, -2L, -2L, false);
    }

    @Override
    public void runStep(String id, String step) {
        MigrationService.Migrator migrator = this.getMigrator(id);
        MigrationDescriptor descr = (MigrationDescriptor)this.getDescriptor(XP_CONFIG, id);
        MigrationDescriptor.MigrationStepDescriptor stepDescr = descr.steps.get(step);
        if (stepDescr == null) {
            throw new IllegalArgumentException("Unknown step: " + step + " for migration: " + id);
        }
        ProgressReporter progressReporter = new ProgressReporter(id);
        this.atomic(id, kv -> {
            String state = kv.getString(id);
            String currentStep = kv.getString(id + STEP);
            if (state == null && currentStep == null) {
                state = descr.defaultState;
                if (!descr.states.containsKey(state)) {
                    throw new IllegalArgumentException("Invalid default state: " + state + " for migration: " + id);
                }
            } else if (state == null) {
                throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep);
            }
            if (!descr.states.containsKey(state)) {
                throw new IllegalArgumentException("Invalid current state: " + state + " for migration: " + id);
            }
            if (!stepDescr.fromState.equals(state)) {
                throw new IllegalArgumentException("Invalid step: " + step + " for migration: " + id + " in state: " + state);
            }
            String time = String.valueOf(System.currentTimeMillis());
            kv.put(id + STEP, step);
            kv.put(id + START_TIME, time);
            progressReporter.reportProgress("", 0L, -1L, true);
            kv.put(id, (String)null);
        });
        migrator.notifyStatusChange();
        Consumer<MigrationService.MigrationContext> migration = migrationContext -> {
            Thread.currentThread().setName("Nuxeo-Migrator-" + id);
            migrator.run(step, (MigrationService.MigrationContext)migrationContext);
        };
        BiConsumer<MigrationService.MigrationContext, Throwable> afterMigration = (migrationContext, t) -> {
            if (t != null) {
                log.error("Exception during execution of step: {} for migration: {}", (Object)step, (Object)id, t);
            }
            String state = t != null || migrationContext.isShutdownRequested() ? stepDescr.fromState : stepDescr.toState;
            this.atomic(id, kv -> this.setState(id, state, progressReporter, (KeyValueStore)kv));
            migrator.notifyStatusChange();
        };
        this.executor.execute(new MigratorWithContext(migration, progressReporter, afterMigration));
    }

    protected MigrationService.Migrator getMigrator(String id) {
        MigrationDescriptor descr = (MigrationDescriptor)this.getDescriptor(XP_CONFIG, id);
        if (descr == null) {
            throw new IllegalArgumentException("Unknown migration: " + id);
        }
        Class<?> klass = descr.klass;
        if (!MigrationService.Migrator.class.isAssignableFrom(klass)) {
            throw new RuntimeException("Invalid class not implementing Migrator: " + klass.getName() + " for migration: " + id);
        }
        try {
            MigrationService.Migrator migrator = (MigrationService.Migrator)klass.getConstructor(new Class[0]).newInstance(new Object[0]);
            if (this.invalidator != null) {
                migrator = new InvalidatorMigrator(id, migrator, this.invalidator);
            }
            return migrator;
        }
        catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void atomic(String id, Consumer<KeyValueStore> consumer) {
        KeyValueStore kv = MigrationServiceImpl.getKeyValueStore();
        for (int i = 0; i < 5; ++i) {
            String value = Instant.now() + " node=" + this.nodeId;
            if (kv.compareAndSet(id + LOCK, null, value, 10L)) {
                try {
                    consumer.accept(kv);
                    return;
                }
                finally {
                    kv.put(id + LOCK, (String)null);
                }
            }
            try {
                Thread.sleep(RANDOM.nextInt(100) * i);
                continue;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        String currentLock = kv.getString(id + LOCK);
        throw new RuntimeException("Cannot lock for write migration: " + id + ", already locked: " + currentLock);
    }

    protected static class MigrationThreadPoolExecutor
    extends ThreadPoolExecutor {
        protected final List<MigratorWithContext> runnables = new CopyOnWriteArrayList<MigratorWithContext>();

        public MigrationThreadPoolExecutor() {
            super(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        }

        @Override
        protected void beforeExecute(Thread thread, Runnable runnable) {
            this.runnables.add((MigratorWithContext)runnable);
        }

        @Override
        protected void afterExecute(Runnable runnable, Throwable t) {
            this.runnables.remove(runnable);
            ((MigratorWithContext)runnable).afterMigration(t);
        }

        public void requestShutdown() {
            this.runnables.forEach(MigratorWithContext::requestShutdown);
        }
    }

    protected static class MigratorWithContext
    implements Runnable {
        protected final Consumer<MigrationService.MigrationContext> migration;
        protected final MigrationService.MigrationContext migrationContext;
        protected final BiConsumer<MigrationService.MigrationContext, Throwable> afterMigration;

        public MigratorWithContext(Consumer<MigrationService.MigrationContext> migration, ProgressReporter progressReporter, BiConsumer<MigrationService.MigrationContext, Throwable> afterMigration) {
            this.migration = migration;
            this.migrationContext = new MigrationContextImpl(progressReporter);
            this.afterMigration = afterMigration;
        }

        @Override
        public void run() {
            this.migration.accept(this.migrationContext);
        }

        public void afterMigration(Throwable t) {
            this.afterMigration.accept(this.migrationContext, t);
        }

        public void requestShutdown() {
            this.migrationContext.requestShutdown();
        }
    }

    protected static class MigrationContextImpl
    implements MigrationService.MigrationContext {
        protected final ProgressReporter progressReporter;
        protected volatile boolean shutdown;

        public MigrationContextImpl(ProgressReporter progressReporter) {
            this.progressReporter = progressReporter;
        }

        @Override
        public void reportProgress(String message, long num, long total) {
            this.progressReporter.reportProgress(message, num, total, true);
        }

        @Override
        public void requestShutdown() {
            this.shutdown = true;
        }

        @Override
        public boolean isShutdownRequested() {
            return this.shutdown || Thread.currentThread().isInterrupted();
        }
    }

    protected static class ProgressReporter {
        protected final String id;

        public ProgressReporter(String id) {
            this.id = id;
        }

        public void reportProgress(String message, long num, long total, boolean ping) {
            KeyValueStore keyValueStore = MigrationServiceImpl.getKeyValueStore();
            keyValueStore.put(this.id + MigrationServiceImpl.PROGRESS_MESSAGE, message);
            keyValueStore.put(this.id + MigrationServiceImpl.PROGRESS_NUM, num == -2L ? null : String.valueOf(num));
            keyValueStore.put(this.id + MigrationServiceImpl.PROGRESS_TOTAL, total == -2L ? null : String.valueOf(total));
            keyValueStore.put(this.id + MigrationServiceImpl.PING_TIME, ping ? String.valueOf(System.currentTimeMillis()) : null);
        }
    }

    public static class InvalidatorMigrator
    implements MigrationService.Migrator {
        protected final String id;
        protected final MigrationService.Migrator migrator;
        protected final MigrationInvalidator invalidator;

        public InvalidatorMigrator(String id, MigrationService.Migrator migrator, MigrationInvalidator invalidator) {
            this.id = id;
            this.migrator = migrator;
            this.invalidator = invalidator;
        }

        @Override
        public String probeState() {
            return this.migrator.probeState();
        }

        @Override
        public void run(String step, MigrationService.MigrationContext migrationContext) {
            this.migrator.run(step, migrationContext);
        }

        @Override
        public void notifyStatusChange() {
            this.migrator.notifyStatusChange();
            this.invalidator.sendMessage(new MigrationInvalidation(this.id));
        }
    }

    public class MigrationInvalidator
    extends AbstractPubSubBroker<MigrationInvalidation> {
        public MigrationInvalidation deserialize(InputStream in) throws IOException {
            return MigrationInvalidation.deserialize(in);
        }

        public void receivedMessage(MigrationInvalidation message) {
            String id = message.id;
            MigrationService.Migrator migrator = MigrationServiceImpl.this.getMigrator(id);
            if (migrator == null) {
                log.error("Unknown migration id received in invalidation: {}", (Object)id);
                return;
            }
            ((InvalidatorMigrator)migrator).migrator.notifyStatusChange();
        }
    }

    public static class MigrationInvalidation
    implements SerializableMessage {
        private static final long serialVersionUID = 1L;
        public final String id;

        public MigrationInvalidation(String id) {
            this.id = id;
        }

        public void serialize(OutputStream out) throws IOException {
            IOUtils.write((String)this.id, (OutputStream)out, (Charset)StandardCharsets.UTF_8);
        }

        public static MigrationInvalidation deserialize(InputStream in) throws IOException {
            String id = IOUtils.toString((InputStream)in, (Charset)StandardCharsets.UTF_8);
            return new MigrationInvalidation(id);
        }

        public String toString() {
            return this.getClass().getSimpleName() + "(" + this.id + ")";
        }
    }
}

