package org.nuxeo.runtime.migration;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.cluster.ClusterService;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.kv.KeyValueStore;
import org.nuxeo.runtime.migration.MigrationDescriptor;
import org.nuxeo.runtime.migration.MigrationService;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.pubsub.AbstractPubSubBroker;
import org.nuxeo.runtime.pubsub.SerializableMessage;

/* loaded from: input_file:org/nuxeo/runtime/migration/MigrationServiceImpl.class */
public class MigrationServiceImpl extends DefaultComponent implements MigrationService {
    public static final String KEYVALUE_STORE_NAME = "migration";
    public static final String XP_CONFIG = "configuration";
    public static final String LOCK = ":lock";
    public static final String STEP = ":step";
    public static final String START_TIME = ":starttime";
    public static final String PING_TIME = ":pingtime";
    public static final String PROGRESS_MESSAGE = ":message";
    public static final String PROGRESS_NUM = ":num";
    public static final String PROGRESS_TOTAL = ":total";
    public static final long WRITE_LOCK_TTL = 10;
    public static final String MIGRATION_INVAL_PUBSUB_TOPIC = "migrationinval";
    protected MigrationThreadPoolExecutor executor;
    protected String nodeId;
    protected MigrationInvalidator invalidator;
    private static final Logger log = LogManager.getLogger(MigrationServiceImpl.class);
    protected static final Random RANDOM = new Random();

    /* loaded from: input_file:org/nuxeo/runtime/migration/MigrationServiceImpl$InvalidatorMigrator.class */
    public static class InvalidatorMigrator implements MigrationService.Migrator {
        protected final String id;
        protected final MigrationService.Migrator migrator;
        protected final MigrationInvalidator invalidator;

        public InvalidatorMigrator(String str, MigrationService.Migrator migrator, MigrationInvalidator migrationInvalidator) {
            this.id = str;
            this.migrator = migrator;
            this.invalidator = migrationInvalidator;
        }

        @Override // org.nuxeo.runtime.migration.MigrationService.Migrator
        public String probeState() {
            return this.migrator.probeState();
        }

        @Override // org.nuxeo.runtime.migration.MigrationService.Migrator
        public void run(String str, MigrationService.MigrationContext migrationContext) {
            this.migrator.run(str, migrationContext);
        }

        @Override // org.nuxeo.runtime.migration.MigrationService.Migrator
        public void notifyStatusChange() {
            this.migrator.notifyStatusChange();
            this.invalidator.sendMessage(new MigrationInvalidation(this.id));
        }
    }

    /* loaded from: input_file:org/nuxeo/runtime/migration/MigrationServiceImpl$MigrationContextImpl.class */
    protected static class MigrationContextImpl implements MigrationService.MigrationContext {
        protected final ProgressReporter progressReporter;
        protected volatile boolean shutdown;

        public MigrationContextImpl(ProgressReporter progressReporter) {
            this.progressReporter = progressReporter;
        }

        @Override // org.nuxeo.runtime.migration.MigrationService.MigrationContext
        public void reportProgress(String str, long j, long j2) {
            this.progressReporter.reportProgress(str, j, j2, true);
        }

        @Override // org.nuxeo.runtime.migration.MigrationService.MigrationContext
        public void requestShutdown() {
            this.shutdown = true;
        }

        @Override // org.nuxeo.runtime.migration.MigrationService.MigrationContext
        public boolean isShutdownRequested() {
            return this.shutdown || Thread.currentThread().isInterrupted();
        }
    }

    /* loaded from: input_file:org/nuxeo/runtime/migration/MigrationServiceImpl$MigrationInvalidation.class */
    public static class MigrationInvalidation implements SerializableMessage {
        private static final long serialVersionUID = 1;
        public final String id;

        public MigrationInvalidation(String str) {
            this.id = str;
        }

        public void serialize(OutputStream outputStream) throws IOException {
            IOUtils.write(this.id, outputStream, StandardCharsets.UTF_8);
        }

        public static MigrationInvalidation deserialize(InputStream inputStream) throws IOException {
            return new MigrationInvalidation(IOUtils.toString(inputStream, StandardCharsets.UTF_8));
        }

        public String toString() {
            return getClass().getSimpleName() + "(" + this.id + ")";
        }
    }

    /* loaded from: input_file:org/nuxeo/runtime/migration/MigrationServiceImpl$MigrationInvalidator.class */
    public class MigrationInvalidator extends AbstractPubSubBroker<MigrationInvalidation> {
        public MigrationInvalidator() {
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public MigrationInvalidation m1deserialize(InputStream inputStream) throws IOException {
            return MigrationInvalidation.deserialize(inputStream);
        }

        public void receivedMessage(MigrationInvalidation migrationInvalidation) {
            String str = migrationInvalidation.id;
            MigrationService.Migrator migrator = MigrationServiceImpl.this.getMigrator(str);
            if (migrator == null) {
                MigrationServiceImpl.log.error("Unknown migration id received in invalidation: {}", str);
            } else {
                ((InvalidatorMigrator) migrator).migrator.notifyStatusChange();
            }
        }
    }

    /* loaded from: input_file:org/nuxeo/runtime/migration/MigrationServiceImpl$MigrationThreadPoolExecutor.class */
    protected static class MigrationThreadPoolExecutor extends ThreadPoolExecutor {
        protected final List<MigratorWithContext> runnables;

        public MigrationThreadPoolExecutor() {
            super(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue());
            this.runnables = new CopyOnWriteArrayList();
        }

        @Override // java.util.concurrent.ThreadPoolExecutor
        protected void beforeExecute(Thread thread, Runnable runnable) {
            this.runnables.add((MigratorWithContext) runnable);
        }

        @Override // java.util.concurrent.ThreadPoolExecutor
        protected void afterExecute(Runnable runnable, Throwable th) {
            this.runnables.remove(runnable);
            ((MigratorWithContext) runnable).afterMigration(th);
        }

        public void requestShutdown() {
            this.runnables.forEach((v0) -> {
                v0.requestShutdown();
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/nuxeo/runtime/migration/MigrationServiceImpl$MigratorWithContext.class */
    public static class MigratorWithContext implements Runnable {
        protected final Consumer<MigrationService.MigrationContext> migration;
        protected final MigrationService.MigrationContext migrationContext;
        protected final BiConsumer<MigrationService.MigrationContext, Throwable> afterMigration;

        public MigratorWithContext(Consumer<MigrationService.MigrationContext> consumer, ProgressReporter progressReporter, BiConsumer<MigrationService.MigrationContext, Throwable> biConsumer) {
            this.migration = consumer;
            this.migrationContext = new MigrationContextImpl(progressReporter);
            this.afterMigration = biConsumer;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.migration.accept(this.migrationContext);
        }

        public void afterMigration(Throwable th) {
            this.afterMigration.accept(this.migrationContext, th);
        }

        public void requestShutdown() {
            this.migrationContext.requestShutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/nuxeo/runtime/migration/MigrationServiceImpl$ProgressReporter.class */
    public static class ProgressReporter {
        protected final String id;

        public ProgressReporter(String str) {
            this.id = str;
        }

        public void reportProgress(String str, long j, long j2, boolean z) {
            KeyValueStore keyValueStore = MigrationServiceImpl.getKeyValueStore();
            keyValueStore.put(this.id + ":message", str);
            keyValueStore.put(this.id + ":num", j == -2 ? null : String.valueOf(j));
            keyValueStore.put(this.id + ":total", j2 == -2 ? null : String.valueOf(j2));
            keyValueStore.put(this.id + ":pingtime", z ? String.valueOf(System.currentTimeMillis()) : null);
        }
    }

    protected static KeyValueStore getKeyValueStore() {
        KeyValueService keyValueService = (KeyValueService) Framework.getService(KeyValueService.class);
        Objects.requireNonNull(keyValueService, "Missing KeyValueService");
        return keyValueService.getKeyValueStore(KEYVALUE_STORE_NAME);
    }

    public Collection<MigrationDescriptor> getMigrationDescriptors() {
        return getDescriptors(XP_CONFIG);
    }

    public int getApplicationStartedOrder() {
        return -490;
    }

    public void start(ComponentContext componentContext) {
        super.start(componentContext);
        ClusterService clusterService = (ClusterService) Framework.getService(ClusterService.class);
        if (clusterService.isEnabled()) {
            this.nodeId = clusterService.getNodeId();
            this.invalidator = new MigrationInvalidator();
            this.invalidator.initialize(MIGRATION_INVAL_PUBSUB_TOPIC, this.nodeId);
            log.info("Registered migration invalidator for node: {}", this.nodeId);
        } else {
            log.info("Not registering a migration invalidator because clustering is not enabled");
        }
        this.executor = new MigrationThreadPoolExecutor();
        Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() { // from class: org.nuxeo.runtime.migration.MigrationServiceImpl.1
            public void beforeStop(ComponentManager componentManager, boolean z) {
                MigrationServiceImpl.this.executor.requestShutdown();
            }

            public void afterStop(ComponentManager componentManager, boolean z) {
                Framework.getRuntime().getComponentManager().removeListener(this);
            }
        });
    }

    public void stop(ComponentContext componentContext) throws InterruptedException {
        if (this.invalidator != null) {
            this.invalidator.close();
            this.invalidator = null;
        }
        this.executor.shutdownNow();
        this.executor.awaitTermination(10L, TimeUnit.SECONDS);
        this.executor = null;
        super.stop(componentContext);
    }

    @Override // org.nuxeo.runtime.migration.MigrationService
    public MigrationService.MigrationStatus getStatus(String str) {
        MigrationDescriptor migrationDescriptor = (MigrationDescriptor) getDescriptor(XP_CONFIG, str);
        if (migrationDescriptor == null) {
            return null;
        }
        KeyValueStore keyValueStore = getKeyValueStore();
        String string = keyValueStore.getString(str);
        if (string != null) {
            return new MigrationService.MigrationStatus(string);
        }
        String string2 = keyValueStore.getString(str + ":step");
        if (string2 == null) {
            return new MigrationService.MigrationStatus(migrationDescriptor.defaultState);
        }
        long parseLong = Long.parseLong(keyValueStore.getString(str + ":starttime"));
        long parseLong2 = Long.parseLong(keyValueStore.getString(str + ":pingtime"));
        String string3 = keyValueStore.getString(str + ":message");
        long parseLong3 = Long.parseLong(keyValueStore.getString(str + ":num"));
        long parseLong4 = Long.parseLong(keyValueStore.getString(str + ":total"));
        if (string3 == null) {
            string3 = "";
        }
        return new MigrationService.MigrationStatus(string2, parseLong, parseLong2, string3, parseLong3, parseLong4);
    }

    @Override // org.nuxeo.runtime.migration.MigrationService
    public String probeAndSetState(String str) {
        MigrationService.Migrator migrator = getMigrator(str);
        String probeState = migrator.probeState();
        if (probeState != null) {
            setState(str, probeState, migrator, new ProgressReporter(str));
        }
        return probeState;
    }

    protected void setState(String str, String str2, MigrationService.Migrator migrator, ProgressReporter progressReporter) {
        atomic(str, keyValueStore -> {
            String string = keyValueStore.getString(str);
            String string2 = keyValueStore.getString(str + ":step");
            if (string == null && string2 != null) {
                throw new IllegalArgumentException("Migration: " + str + " already running step: " + string2);
            }
            setState(str, str2, progressReporter, keyValueStore);
        });
        migrator.notifyStatusChange();
    }

    protected void setState(String str, String str2, ProgressReporter progressReporter, KeyValueStore keyValueStore) {
        keyValueStore.put(str, str2);
        keyValueStore.put(str + ":step", (String) null);
        keyValueStore.put(str + ":starttime", (String) null);
        progressReporter.reportProgress(null, -2L, -2L, false);
    }

    @Override // org.nuxeo.runtime.migration.MigrationService
    public void runStep(String str, String str2) {
        MigrationService.Migrator migrator = getMigrator(str);
        MigrationDescriptor migrationDescriptor = (MigrationDescriptor) getDescriptor(XP_CONFIG, str);
        MigrationDescriptor.MigrationStepDescriptor migrationStepDescriptor = migrationDescriptor.steps.get(str2);
        if (migrationStepDescriptor == null) {
            throw new IllegalArgumentException("Unknown step: " + str2 + " for migration: " + str);
        }
        ProgressReporter progressReporter = new ProgressReporter(str);
        atomic(str, keyValueStore -> {
            String string = keyValueStore.getString(str);
            String string2 = keyValueStore.getString(str + ":step");
            if (string == null && string2 == null) {
                string = migrationDescriptor.defaultState;
                if (!migrationDescriptor.states.containsKey(string)) {
                    throw new IllegalArgumentException("Invalid default state: " + string + " for migration: " + str);
                }
            } else if (string == null) {
                throw new IllegalArgumentException("Migration: " + str + " already running step: " + string2);
            }
            if (!migrationDescriptor.states.containsKey(string)) {
                throw new IllegalArgumentException("Invalid current state: " + string + " for migration: " + str);
            }
            if (!migrationStepDescriptor.fromState.equals(string)) {
                throw new IllegalArgumentException("Invalid step: " + str2 + " for migration: " + str + " in state: " + string);
            }
            String valueOf = String.valueOf(System.currentTimeMillis());
            keyValueStore.put(str + ":step", str2);
            keyValueStore.put(str + ":starttime", valueOf);
            progressReporter.reportProgress("", 0L, -1L, true);
            keyValueStore.put(str, (String) null);
        });
        migrator.notifyStatusChange();
        this.executor.execute(new MigratorWithContext(migrationContext -> {
            Thread.currentThread().setName("Nuxeo-Migrator-" + str);
            migrator.run(str2, migrationContext);
        }, progressReporter, (migrationContext2, th) -> {
            if (th != null) {
                log.error("Exception during execution of step: {} for migration: {}", str2, str, th);
            }
            String str3 = (th != null || migrationContext2.isShutdownRequested()) ? migrationStepDescriptor.fromState : migrationStepDescriptor.toState;
            atomic(str, keyValueStore2 -> {
                setState(str, str3, progressReporter, keyValueStore2);
            });
            migrator.notifyStatusChange();
        }));
    }

    protected MigrationService.Migrator getMigrator(String str) {
        MigrationDescriptor migrationDescriptor = (MigrationDescriptor) getDescriptor(XP_CONFIG, str);
        if (migrationDescriptor == null) {
            throw new IllegalArgumentException("Unknown migration: " + str);
        }
        Class<?> cls = migrationDescriptor.klass;
        if (!MigrationService.Migrator.class.isAssignableFrom(cls)) {
            throw new RuntimeException("Invalid class not implementing Migrator: " + cls.getName() + " for migration: " + str);
        }
        try {
            MigrationService.Migrator migrator = (MigrationService.Migrator) cls.getConstructor(new Class[0]).newInstance(new Object[0]);
            if (this.invalidator != null) {
                migrator = new InvalidatorMigrator(str, migrator, this.invalidator);
            }
            return migrator;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    protected void atomic(String str, Consumer<KeyValueStore> consumer) {
        KeyValueStore keyValueStore = getKeyValueStore();
        for (int i = 0; i < 5; i++) {
            if (keyValueStore.compareAndSet(str + ":lock", (String) null, Instant.now() + " node=" + this.nodeId, 10L)) {
                try {
                    consumer.accept(keyValueStore);
                    keyValueStore.put(str + ":lock", (String) null);
                    return;
                } catch (Throwable th) {
                    keyValueStore.put(str + ":lock", (String) null);
                    throw th;
                }
            }
            try {
                Thread.sleep(RANDOM.nextInt(100) * i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        throw new RuntimeException("Cannot lock for write migration: " + str + ", already locked: " + keyValueStore.getString(str + ":lock"));
    }
}
