package org.nuxeo.ecm.platform.importer.mqueues.workmanager;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.naming.NamingException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.work.WorkManagerImpl;
import org.nuxeo.ecm.core.work.WorkQueueRegistry;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Settings;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Topology;
import org.nuxeo.ecm.platform.importer.mqueues.computation.mqueue.MQComputationManager;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.transaction.TransactionHelper;

/* loaded from: input_file:org/nuxeo/ecm/platform/importer/mqueues/workmanager/WorkManagerComputation.class */
public abstract class WorkManagerComputation extends WorkManagerImpl {
    /** Logger shared by this class and its subclasses. */
    protected static final Log log = LogFactory.getLog(WorkManagerComputation.class);

    /** Default concurrency handed to {@link Settings} when the topology is initialized. */
    protected static final int DEFAULT_CONCURRENCY = 4;

    /** Computation topology, built by {@code initTopology()}. */
    protected Topology topology;

    /** Concurrency and partition settings, built by {@code initTopology()}. */
    protected Settings settings;

    /** Computation manager, created and started by {@code startComputation()}. */
    protected MQComputationManager manager;

    /** Message queue manager, obtained from {@code initStream()} during {@code init()}. */
    protected MQManager<Record> mqManager;

    /** Ids of the known work queues; a category listed here maps to a stream of the same name. */
    protected final Set<String> streamIds = new HashSet<>();

    /* loaded from: input_file:org/nuxeo/ecm/platform/importer/mqueues/workmanager/WorkManagerComputation$WorkScheduling.class */
    public class WorkScheduling implements Synchronization {
        public final Work work;
        public final WorkManager.Scheduling scheduling;

        public WorkScheduling(Work work, WorkManager.Scheduling scheduling) {
            this.work = work;
            this.scheduling = scheduling;
        }

        public void beforeCompletion() {
        }

        public void afterCompletion(int i) {
            if (i == 3) {
                WorkManagerComputation.this.schedule(this.work, this.scheduling, false);
            } else if (i != WorkManagerComputation.DEFAULT_CONCURRENCY) {
                throw new IllegalArgumentException("Unsupported transaction status " + i);
            }
        }
    }

    /**
     * Creates the message queue manager used by this work manager.
     * <p>
     * Called from {@code init()}; the returned manager is stored in
     * {@code mqManager} and closed by {@code shutdown}.
     *
     * @return the manager giving access to the streams of {@link Record}
     */
    protected abstract MQManager<Record> initStream();

    /**
     * Returns the multiplier that {@code getPartitions(int)} applies to a thread
     * count to obtain a number of partitions.
     *
     * @return the over provisioning factor
     */
    protected abstract int getOverProvisioningFactor();

    /**
     * Schedules a work by appending its serialized form to the stream mapped to
     * the work category.
     * <p>
     * Nothing is appended when queuing is disabled for the target stream, when
     * the scheduling has been deferred (or cancelled) by
     * {@code scheduleAfterCommit}, or when no appender exists for the stream.
     *
     * @param work the work to schedule
     * @param scheduling the scheduling mode; only logged and forwarded here
     * @param z {@code true} to try to defer the scheduling until the current
     *            transaction commits
     */
    public void schedule(Work work, WorkManager.Scheduling scheduling, boolean z) {
        // Resolve the target stream once and reuse it for the checks, the lookup and the logs.
        String streamForCategory = getStreamForCategory(work.getCategory());
        if (log.isDebugEnabled()) {
            log.debug(String.format("Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s", work.getId(), work.getCategory(), streamForCategory, scheduling, Boolean.valueOf(z), work));
        }
        if (!isQueuingEnabled(streamForCategory)) {
            log.info("Queue disabled, scheduling canceled: " + streamForCategory);
            return;
        }
        if (z && scheduleAfterCommit(work, scheduling)) {
            return;
        }
        // NOTE(review): the return value is discarded; presumably called for its effect on the work - confirm.
        WorkSchedulePath.newInstance(work);
        String id = work.getId();
        MQAppender<Record> appender = this.mqManager.getAppender(streamForCategory);
        if (appender == null) {
            log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(), streamForCategory));
            return;
        }
        // The work id is used both as the append key and as the record key.
        appender.append(id, Record.of(id, ComputationWork.serialize(work)));
    }

    /**
     * Maps a work category to the name of the stream that carries its works.
     *
     * @param str the work category, may be {@code null}
     * @return the category itself when it is one of the registered stream ids,
     *         otherwise {@code WorkManagerComputationKafka.DEFAULT_CONFIG}
     */
    public String getStreamForCategory(String str) {
        if (str != null && this.streamIds.contains(str)) {
            return str;
        }
        return WorkManagerComputationKafka.DEFAULT_CONFIG;
    }

    /**
     * Returns the fixed application-started order of this component.
     * <p>
     * NOTE(review): the value is presumably chosen so that this component is
     * started before the default work manager - confirm against the runtime
     * ordering rules.
     *
     * @return always {@code -502}
     */
    public int getApplicationStartedOrder() {
        return -502;
    }

    /**
     * Starts the component by delegating to {@link #init()}.
     *
     * @param componentContext the component context; not used here
     */
    public void start(ComponentContext componentContext) {
        init();
    }

    /**
     * Initializes the work manager if it has not been started yet.
     * <p>
     * The {@code started} flag is tested before and again after acquiring the
     * monitor on this instance. The initialization takes over the queue
     * contributions of the default work manager, builds the topology, creates
     * the message queue manager and starts the computations, in that order.
     */
    public void init() {
        if (!this.started) {
            log.debug("Initializing");
            synchronized (this) {
                if (!this.started) {
                    supplantWorkManagerImpl();
                    initTopology();
                    this.mqManager = initStream();
                    startComputation();
                    this.started = true;
                    log.info("Initialized");
                }
            }
        }
    }

    /**
     * Takes over the work queue contributions of the component registered as
     * {@code org.nuxeo.ecm.core.work.service}.
     * <p>
     * The {@code workQueueConfig} field of that component is read by reflection
     * and replaced with an empty registry. Every queue of the previous registry
     * is then contributed to this instance, and all queue ids of this instance
     * are recorded as stream ids.
     *
     * @throws RuntimeException wrapping the reflection failure if the field
     *             cannot be found or accessed
     */
    protected void supplantWorkManagerImpl() {
        WorkManagerImpl defaultWorkManager = (WorkManagerImpl) Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service");
        WorkQueueRegistry previousRegistry;
        try {
            Field registryField = WorkManagerImpl.class.getDeclaredField("workQueueConfig");
            registryField.setAccessible(true);
            previousRegistry = (WorkQueueRegistry) registryField.get(defaultWorkManager);
            log.debug("Remove contributions from WorkManagerImpl");
            registryField.set(defaultWorkManager, new WorkQueueRegistry());
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException(e);
        }
        // Copy the contributions that were registered on the default work manager.
        for (String queueId : previousRegistry.getQueueIds()) {
            this.workQueueConfig.addContribution(previousRegistry.get(queueId));
        }
        this.streamIds.addAll(this.workQueueConfig.getQueueIds());
        for (String queueId : this.workQueueConfig.getQueueIds()) {
            log.info("Registering : " + queueId);
        }
    }

    /**
     * Creates the computation manager from the message queue manager, the
     * topology and the settings, stores it in {@code manager} and starts it.
     */
    protected void startComputation() {
        MQComputationManager computationManager = new MQComputationManager(this.mqManager, this.topology, this.settings);
        this.manager = computationManager;
        computationManager.start();
    }

    /**
     * Builds the topology and the settings from the configured work queues.
     * <p>
     * Each queue id gets one {@code ComputationWork} computation attached to the
     * single stream mapping {@code "i1:" + queueId}. The settings start from
     * {@link #DEFAULT_CONCURRENCY} and the matching number of partitions, then
     * every queue overrides them with its own maximum number of threads.
     */
    protected void initTopology() {
        Topology.Builder topologyBuilder = Topology.builder();
        for (String queueId : this.workQueueConfig.getQueueIds()) {
            topologyBuilder.addComputation(() -> new ComputationWork(queueId), Collections.singletonList("i1:" + queueId));
        }
        this.topology = topologyBuilder.build();

        this.settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY));
        // Concurrency is set for every queue first, then the partitions, as two separate passes.
        for (String queueId : this.workQueueConfig.getQueueIds()) {
            int maxThreads = this.workQueueConfig.get(queueId).getMaxThreads();
            this.settings.setConcurrency(queueId, maxThreads);
        }
        for (String queueId : this.workQueueConfig.getQueueIds()) {
            int maxThreads = this.workQueueConfig.get(queueId).getMaxThreads();
            this.settings.setPartitions(queueId, getPartitions(maxThreads));
        }
    }

    /**
     * Computes the number of partitions for a given number of threads.
     *
     * @param i the number of threads
     * @return {@code 1} when {@code i} is {@code 1}, otherwise {@code i}
     *         multiplied by the over provisioning factor
     */
    protected int getPartitions(int i) {
        return i == 1 ? 1 : getOverProvisioningFactor() * i;
    }

    /**
     * Logs the shutdown request for a single stream; nothing is stopped by this
     * implementation.
     *
     * @param str the id of the stream to shut down
     * @param j the timeout; not used here
     * @param timeUnit the unit of the timeout; not used here
     * @return always {@code false}
     * @throws InterruptedException declared by the signature, never thrown by this body
     */
    public boolean shutdownQueue(String str, long j, TimeUnit timeUnit) throws InterruptedException {
        log.info("Shutdown WorkManager stream: " + str);
        return false;
    }

    /**
     * Stops the computation manager within the given timeout and closes the
     * message queue manager.
     * <p>
     * The message queue manager is closed even when stopping the computations
     * fails; a failure to close it is logged and not propagated.
     *
     * @param j the timeout to wait for the computations to stop
     * @param timeUnit the unit of {@code j}
     * @return the value returned by the computation manager when stopping
     * @throws InterruptedException declared by the signature for callers that wait on the shutdown
     */
    public boolean shutdown(long j, TimeUnit timeUnit) throws InterruptedException {
        long timeoutMillis = timeUnit.toMillis(j);
        log.info("Shutdown WorkManager in " + timeoutMillis + " ms");
        try {
            return this.manager.stop(Duration.ofMillis(timeoutMillis));
        } finally {
            // Always release the message queue manager, whatever the outcome of the stop.
            try {
                this.mqManager.close();
            } catch (Exception e) {
                log.error("Error while closing WorkManager mqManager", e);
            }
        }
    }

    /**
     * Queue sizes are not reported by this implementation.
     *
     * @param str the queue id; not used here
     * @param state the work state; not used here
     * @return always {@code 0}
     */
    public int getQueueSize(String str, Work.State state) {
        return 0;
    }

    /**
     * Returns metrics for the given queue with every counter set to zero.
     *
     * @param str the queue id
     * @return a new {@link WorkQueueMetrics} built from {@code str} and four zero values
     */
    public WorkQueueMetrics getMetrics(String str) {
        return new WorkQueueMetrics(str, 0, 0, 0, 0);
    }

    /**
     * Waits until the low watermark of a queue, or of all queues, stops moving.
     * <p>
     * The low watermark is read every 100 ms; the wait succeeds as soon as two
     * consecutive readings are equal. The timeout is capped to one day.
     *
     * @param str the queue id, or {@code null} to consider all queues
     * @param j the maximum time to wait
     * @param timeUnit the unit of {@code j}
     * @return {@code true} if the work manager is not started or the low
     *         watermark became stable, {@code false} on timeout
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public boolean awaitCompletion(String str, long j, TimeUnit timeUnit) throws InterruptedException {
        if (!isStarted()) {
            return true;
        }
        long timeoutMillis = Math.min(timeUnit.toMillis(j), TimeUnit.DAYS.toMillis(1L));
        long deadline = System.currentTimeMillis() + timeoutMillis;
        long previous = getLowWaterMark(str);
        while (System.currentTimeMillis() < deadline) {
            Thread.sleep(100L);
            long current = getLowWaterMark(str);
            if (current == previous) {
                log.debug("awaitCompletion for " + (str == null ? "all" : str) + " completed " + current);
                return true;
            }
            if (log.isDebugEnabled()) {
                log.debug("awaitCompletion low wm  for " + (str == null ? "all" : str) + ":" + current + " diff: " + (current - previous));
            }
            previous = current;
        }
        log.warn(String.format("%s timeout after: %.2fs", str, timeoutMillis / 1000.0d));
        return false;
    }

    /**
     * Reads the low watermark from the computation manager.
     *
     * @param str the queue id, or {@code null} for the overall low watermark
     * @return the low watermark of the given queue, or of the manager as a
     *         whole when {@code str} is {@code null}
     */
    private long getLowWaterMark(String str) {
        if (str == null) {
            return this.manager.getLowWatermark();
        }
        return this.manager.getLowWatermark(str);
    }

    /**
     * Work states are not tracked by this implementation.
     *
     * @param str the work id; not used here
     * @return always {@code null}
     */
    public Work.State getWorkState(String str) {
        return null;
    }

    /**
     * Work lookup is not supported by this implementation.
     *
     * @param str the work id; not used here
     * @param state the work state; not used here
     * @return always {@code null}
     */
    public Work find(String str, Work.State state) {
        return null;
    }

    /**
     * Work listing is not supported by this implementation.
     *
     * @param str the queue id; not used here
     * @param state the work state; not used here
     * @return always an empty list
     */
    public List<Work> listWork(String str, Work.State state) {
        return Collections.emptyList();
    }

    /**
     * Work id listing is not supported by this implementation.
     *
     * @param str the queue id; not used here
     * @param state the work state; not used here
     * @return always an empty list
     */
    public List<String> listWorkIds(String str, Work.State state) {
        return Collections.emptyList();
    }

    /**
     * Tries to defer the scheduling of a work until the current transaction
     * commits.
     * <p>
     * Outcome by transaction status:
     * <ul>
     * <li>active: a {@link WorkScheduling} synchronization is registered and
     * {@code true} is returned;</li>
     * <li>marked rollback-only: nothing is registered and {@code true} is
     * returned, so the caller does not schedule the work;</li>
     * <li>committed, any other status, no transaction, no transaction manager
     * or an error: {@code false} is returned, so the caller schedules the work
     * immediately.</li>
     * </ul>
     *
     * @param work the work to schedule
     * @param scheduling the scheduling mode handed to the synchronization
     * @return {@code true} if the caller must not schedule the work itself,
     *         {@code false} if it has to schedule it right away
     */
    protected boolean scheduleAfterCommit(Work work, WorkManager.Scheduling scheduling) {
        TransactionManager transactionManager;
        try {
            transactionManager = TransactionHelper.lookupTransactionManager();
        } catch (NamingException e) {
            // A failed lookup is handled like a missing transaction manager, reported just below.
            transactionManager = null;
        }
        if (transactionManager == null) {
            log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId());
            return false;
        }
        try {
            Transaction transaction = transactionManager.getTransaction();
            if (transaction == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Not scheduled work after commit because of missing transaction: " + work.getId());
                }
                return false;
            }
            int status = transaction.getStatus();
            switch (status) {
            case Status.STATUS_ACTIVE:
                if (log.isDebugEnabled()) {
                    log.debug("Scheduled after commit: " + work.getId());
                }
                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
                return true;
            case Status.STATUS_COMMITTED:
                if (log.isDebugEnabled()) {
                    log.debug("Scheduled immediately: " + work.getId());
                }
                return false;
            case Status.STATUS_MARKED_ROLLBACK:
                if (log.isDebugEnabled()) {
                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId());
                }
                return true;
            default:
                if (log.isDebugEnabled()) {
                    log.debug("Not scheduling work after commit because transaction is in status " + status + ": " + work.getId());
                }
                return false;
            }
        } catch (SystemException | RollbackException e2) {
            log.error("Cannot schedule after commit", e2);
            return false;
        }
    }
}
