package org.nuxeo.ecm.core.work;

import com.codahale.metrics.MetricRegistry;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.naming.NamingException;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;
import org.nuxeo.runtime.metrics.NuxeoMetricSet;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.model.Descriptor;
import org.nuxeo.runtime.services.config.ConfigurationService;
import org.nuxeo.runtime.stream.StreamService;
import org.nuxeo.runtime.transaction.TransactionHelper;

/* loaded from: input_file:org/nuxeo/ecm/core/work/StreamWorkManager.class */
public class StreamWorkManager extends WorkManagerImpl {
    /** Logger for this work manager; also used by the nested listener class. */
    protected static final Log log = LogFactory.getLog(StreamWorkManager.class);

    /** Framework property holding the Log configuration name (read by {@code getLogConfig()}). */
    public static final String WORK_LOG_CONFIG_PROP = "nuxeo.stream.work.log.config";

    /** Log configuration name used when {@link #WORK_LOG_CONFIG_PROP} is not set. */
    public static final String DEFAULT_WORK_LOG_CONFIG = "work";

    /** Framework property holding the record codec name (read by {@code getCodecName()}). */
    public static final String WORK_CODEC_PROP = "nuxeo.stream.work.log.codec";

    /** Codec name used when {@link #WORK_CODEC_PROP} is not set. */
    public static final String DEFAULT_WORK_CODEC = "legacy";

    /** Framework property holding the partition over-provisioning factor (read by {@code getOverProvisioningFactor()}). */
    public static final String WORK_OVER_PROVISIONING_PROP = "nuxeo.stream.work.over.provisioning.factor";

    /** Over-provisioning factor used when {@link #WORK_OVER_PROVISIONING_PROP} is not set. */
    public static final String DEFAULT_WORK_OVER_PROVISIONING = "3";

    /** Default concurrency; NOTE(review): presumably the default thread count handed to {@code Settings} - confirm. */
    public static final int DEFAULT_CONCURRENCY = 4;

    /** Configuration property holding the TTL applied to stored work states (read in {@code start}). */
    public static final String STATETTL_KEY = "nuxeo.stream.work.state.ttl.seconds";

    /** Configuration property enabling work state storage (read in {@code start}). */
    public static final String STORESTATE_KEY = "nuxeo.stream.work.storestate.enabled";

    /** TTL value used when {@link #STATETTL_KEY} is not set. */
    public static final String STATETTL_DEFAULT_VALUE = "3600";

    /** Computation topology, built by {@code initTopology()}. */
    protected Topology topology;

    /** Computation settings (concurrency, partitions, codec), built by {@code initTopology()}. */
    protected Settings settings;

    /** Processor running the topology; created in {@code init()}. */
    protected StreamProcessor streamProcessor;

    /** Log manager used to append work records and read lag; assigned in {@code init()}. */
    protected LogManager logManager;

    /** Ids of the contributed queues, filled in {@code init()}; other categories are routed to "default". */
    protected final Set<String> streamIds = new HashSet<>();

    /** Whether work states are stored; loaded from {@link #STORESTATE_KEY} in {@code start}. */
    protected boolean storeState;

    /** TTL passed to {@code WorkStateHelper.setState}; loaded from {@link #STATETTL_KEY} in {@code start}. */
    protected long stateTTL;

    /* renamed from: org.nuxeo.ecm.core.work.StreamWorkManager$1, reason: invalid class name */
    /* loaded from: input_file:org/nuxeo/ecm/core/work/StreamWorkManager$1.class */
    static /* synthetic */ class AnonymousClass1 {
        /**
         * Lookup table indexed by {@code Work.State} ordinal: SCHEDULED maps to 1, RUNNING maps to 2,
         * every other state keeps the array default of 0.
         */
        static final /* synthetic */ int[] $SwitchMap$org$nuxeo$ecm$core$work$api$Work$State = buildSwitchMap();

        /** Builds the ordinal lookup table, tolerating enum constants that are missing at runtime. */
        private static int[] buildSwitchMap() {
            int[] map = new int[Work.State.values().length];
            try {
                map[Work.State.SCHEDULED.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant not present in the loaded enum: leave its slot at 0
            }
            try {
                map[Work.State.RUNNING.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant not present in the loaded enum: leave its slot at 0
            }
            return map;
        }
    }

    /* loaded from: input_file:org/nuxeo/ecm/core/work/StreamWorkManager$ComponentListener.class */
    /**
     * Component lifecycle listener that starts the stream processor once components are started and
     * shuts the work manager down before they stop.
     */
    class ComponentListener implements ComponentManager.Listener {
        ComponentListener() {
        }

        /** Shuts the work manager down with a 10 second timeout and logs an error when that fails. */
        public void beforeStop(ComponentManager componentManager, boolean z) {
            boolean stopped = StreamWorkManager.this.shutdown(10L, TimeUnit.SECONDS);
            if (!stopped) {
                StreamWorkManager.log.error("Some processors are still active");
            }
        }

        /** Starts the stream processor, then activates the metrics of every contributed queue. */
        public void afterStart(ComponentManager componentManager, boolean z) {
            StreamWorkManager.this.streamProcessor.start();
            for (Descriptor queue : StreamWorkManager.this.getDescriptors("queues")) {
                StreamWorkManager.this.activateQueueMetrics(queue.getId());
            }
        }

        /** Unregisters this listener, then deactivates the metrics of every contributed queue. */
        public void afterStop(ComponentManager componentManager, boolean z) {
            Framework.getRuntime().getComponentManager().removeListener(this);
            for (Descriptor queue : StreamWorkManager.this.getDescriptors("queues")) {
                StreamWorkManager.this.deactivateQueueMetrics(queue.getId());
            }
        }
    }

    /* loaded from: input_file:org/nuxeo/ecm/core/work/StreamWorkManager$WorkScheduling.class */
    /**
     * Transaction synchronization that schedules a work once the surrounding transaction has
     * completed with the "committed" status.
     */
    public class WorkScheduling implements Synchronization {
        public final Work work;
        public final WorkManager.Scheduling scheduling;

        public WorkScheduling(Work work, WorkManager.Scheduling scheduling) {
            this.work = work;
            this.scheduling = scheduling;
        }

        /** Nothing to do before completion. */
        public void beforeCompletion() {
        }

        /**
         * Schedules the work (without the after-commit option) on status 3, does nothing on status 4.
         *
         * @param i the transaction status code
         * @throws IllegalArgumentException for any other status
         */
        public void afterCompletion(int i) {
            switch (i) {
                case 3: // value of javax.transaction.Status.STATUS_COMMITTED
                    StreamWorkManager.this.schedule(this.work, this.scheduling, false);
                    break;
                case 4: // value of javax.transaction.Status.STATUS_ROLLEDBACK: work is dropped
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported transaction status " + i);
            }
        }
    }

    /**
     * Returns the partition over-provisioning factor.
     *
     * @return 1 when the log manager does not support subscribe, otherwise the integer value of the
     *         {@link #WORK_OVER_PROVISIONING_PROP} framework property (default
     *         {@link #DEFAULT_WORK_OVER_PROVISIONING})
     * @throws NumberFormatException if the property value is not an integer
     */
    protected int getOverProvisioningFactor() {
        if (!getLogManager().supportSubscribe()) {
            return 1;
        }
        String factor = Framework.getProperty(WORK_OVER_PROVISIONING_PROP, DEFAULT_WORK_OVER_PROVISIONING);
        return Integer.parseInt(factor);
    }

    /**
     * Returns the codec name from the {@link #WORK_CODEC_PROP} framework property, falling back to
     * {@link #DEFAULT_WORK_CODEC}.
     */
    protected String getCodecName() {
        String codecName = Framework.getProperty(WORK_CODEC_PROP, DEFAULT_WORK_CODEC);
        return codecName;
    }

    /** Looks up the {@link Record} codec named by {@link #getCodecName()} in the {@link CodecService}. */
    protected Codec<Record> getCodec() {
        CodecService codecService = Framework.getService(CodecService.class);
        return codecService.getCodec(getCodecName(), Record.class);
    }

    /**
     * Schedules a work by appending it as a record to the stream mapped to its category.
     * <p>
     * Nothing is appended when queuing is disabled for the stream, when the scheduling is
     * {@code CANCEL_SCHEDULED} (the stored state is marked canceled instead, if state storage is on),
     * or when {@code z} is true and the work could be deferred to transaction completion.
     *
     * @param work the work to schedule
     * @param scheduling the scheduling mode
     * @param z whether to try to defer the scheduling until after the current transaction commits
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public void schedule(Work work, WorkManager.Scheduling scheduling, boolean z) {
        String queueId = getStreamForCategory(work.getCategory());
        if (log.isDebugEnabled()) {
            log.debug(String.format("Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s", work.getId(), work.getCategory(), queueId, scheduling, z, work));
        }
        if (!isQueuingEnabled(queueId)) {
            log.info("Queue disabled, scheduling canceled: " + queueId);
            return;
        }
        if (WorkManager.Scheduling.CANCEL_SCHEDULED.equals(scheduling)) {
            if (!this.storeState) {
                log.warn(String.format("Canceling a work is only supported if '%s' is true. Skipping work: %s", STORESTATE_KEY, work));
            } else if (WorkStateHelper.getState(work.getId()) != null) {
                // only a work with a known state can be flagged as canceled
                WorkStateHelper.setCanceled(work.getId());
            }
            return;
        }
        if (z && scheduleAfterCommit(work, scheduling)) {
            // handled by the transaction synchronization
            return;
        }
        WorkSchedulePath.newInstance(work);
        LogAppender appender = this.logManager.getAppender(getStreamForCategory(work.getCategory()));
        if (appender == null) {
            log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(), getStreamForCategory(work.getCategory())));
            return;
        }
        String key = work.getPartitionKey();
        appender.append(key, Record.of(key, WorkComputation.serialize(work)));
        if (this.storeState) {
            WorkStateHelper.setState(work.getId(), Work.State.SCHEDULED, this.stateTTL);
        }
    }

    /**
     * Maps a work category to a stream name.
     *
     * @param str the work category, may be null
     * @return the category itself when it is a known stream id, otherwise {@code "default"}
     */
    protected String getStreamForCategory(String str) {
        if (str != null && this.streamIds.contains(str)) {
            return str;
        }
        return "default";
    }

    /**
     * Returns the fixed application-started order of this component.
     *
     * @return always -502; NOTE(review): how the runtime sorts on this value is not visible here
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl
    public int getApplicationStartedOrder() {
        return -502;
    }

    /**
     * Starts the component, then loads the state storage flag and the state TTL from the
     * {@link ConfigurationService}.
     *
     * @throws NumberFormatException if the {@link #STATETTL_KEY} value is not a long
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl
    public void start(ComponentContext componentContext) {
        super.start(componentContext);
        ConfigurationService config = Framework.getService(ConfigurationService.class);
        this.storeState = config.isBooleanPropertyTrue(STORESTATE_KEY);
        String ttl = config.getProperty(STATETTL_KEY, STATETTL_DEFAULT_VALUE);
        this.stateTTL = Long.parseLong(ttl);
    }

    /**
     * Initializes the stream based work manager once: records the queue ids, builds the topology,
     * creates and initializes the stream processor and installs the component listener.
     * <p>
     * Before doing so it sets {@code active = false} on the component registered under
     * {@code WorkManagerImpl.NAME}. Later calls return immediately once {@code started} is set.
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public void init() {
        if (this.started) {
            return;
        }
        WorkManagerImpl registered = (WorkManagerImpl) Framework.getRuntime().getComponent(WorkManagerImpl.NAME);
        registered.active = false;
        log.debug("Initializing");
        synchronized (this) {
            // re-check under the lock: another thread may have completed the initialization
            if (this.started) {
                return;
            }
            List<String> queueIds = getDescriptors("queues").stream()
                                                            .map(Descriptor::getId)
                                                            .collect(Collectors.toList());
            this.streamIds.addAll(queueIds);
            index();
            initTopology();
            this.logManager = getLogManager();
            this.streamProcessor = new LogStreamProcessor(this.logManager);
            this.streamProcessor.init(this.topology, this.settings);
            this.started = true;
            new ComponentListener().install();
            log.info("Initialized");
        }
    }

    /**
     * Logs the Log configuration name at info level and returns the matching {@link LogManager}
     * from the {@link StreamService}.
     */
    protected LogManager getLogManager() {
        log.info("Init StreamWorkManager with Log configuration: " + getLogConfig());
        StreamService streamService = Framework.getService(StreamService.class);
        return streamService.getLogManager(getLogConfig());
    }

    /**
     * Returns the Log configuration name from the {@link #WORK_LOG_CONFIG_PROP} framework property,
     * falling back to {@link #DEFAULT_WORK_LOG_CONFIG}.
     */
    protected String getLogConfig() {
        String logConfig = Framework.getProperty(WORK_LOG_CONFIG_PROP, DEFAULT_WORK_LOG_CONFIG);
        return logConfig;
    }

    /**
     * Tells whether processing is enabled for a queue.
     *
     * @param str the queue id
     * @return {@code false} when no descriptor exists for the queue, otherwise the descriptor's
     *         processing flag
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public boolean isProcessingEnabled(String str) {
        WorkQueueDescriptor descriptor = getWorkQueueDescriptor(str);
        if (descriptor == null) {
            return false;
        }
        return descriptor.isProcessingEnabled();
    }

    /**
     * Builds the computation topology and its settings from the contributed queue descriptors.
     * <p>
     * One {@link WorkComputation} reading the stream {@code i1:<queueId>} is added per queue with
     * processing enabled. Concurrency and partition count are set for every queue, whether or not
     * its processing is enabled.
     */
    protected void initTopology() {
        // typed list: a raw List would make the descriptor accessors below unavailable
        List<WorkQueueDescriptor> descriptors = getDescriptors("queues");
        Topology.Builder builder = Topology.builder();
        for (WorkQueueDescriptor descriptor : descriptors) {
            if (descriptor.isProcessingEnabled()) {
                builder.addComputation(() -> new WorkComputation(descriptor.getId()),
                        Collections.singletonList("i1:" + descriptor.getId()));
            }
        }
        this.topology = builder.build();
        this.settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY), getCodec());
        for (WorkQueueDescriptor descriptor : descriptors) {
            int maxThreads = descriptor.getMaxThreads();
            this.settings.setConcurrency(descriptor.getId(), maxThreads);
            this.settings.setPartitions(descriptor.getId(), getPartitions(maxThreads));
        }
    }

    /**
     * Computes the number of partitions for a given concurrency.
     *
     * @param i the concurrency (max threads)
     * @return 1 when the concurrency is exactly 1, otherwise the concurrency multiplied by
     *         {@link #getOverProvisioningFactor()}
     */
    protected int getPartitions(int i) {
        return i == 1 ? 1 : getOverProvisioningFactor() * i;
    }

    /**
     * Logs the activation of a queue and registers its metrics when processing is enabled.
     *
     * @throws IllegalArgumentException if the descriptor id is {@code WorkQueueDescriptor.ALL_QUEUES}
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl
    void activateQueue(WorkQueueDescriptor workQueueDescriptor) {
        String queueId = workQueueDescriptor.id;
        if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) {
            throw new IllegalArgumentException("cannot activate all queues");
        }
        log.info("Activated queue " + queueId + " " + workQueueDescriptor);
        if (!workQueueDescriptor.isProcessingEnabled()) {
            return;
        }
        activateQueueMetrics(queueId);
    }

    /**
     * Removes the metrics of a queue when its processing is enabled, then logs that deactivation
     * is not supported.
     *
     * @throws IllegalArgumentException if the descriptor id is {@code WorkQueueDescriptor.ALL_QUEUES}
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl
    void deactivateQueue(WorkQueueDescriptor workQueueDescriptor) {
        String queueId = workQueueDescriptor.id;
        if (WorkQueueDescriptor.ALL_QUEUES.equals(queueId)) {
            throw new IllegalArgumentException("cannot deactivate all queues");
        }
        boolean processing = workQueueDescriptor.isProcessingEnabled();
        if (processing) {
            deactivateQueueMetrics(queueId);
        }
        log.info("Deactivated work queue not supported: " + queueId);
    }

    /**
     * Registers four gauges (scheduled, running, completed, canceled) for a queue under the
     * {@code nuxeo.works.total.<queueId>} name components.
     * <p>
     * Each gauge recomputes the queue metrics through
     * {@link #getMetricsWithNuxeoClassLoader(String)} whenever it is read.
     *
     * @param str the queue id
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl
    protected void activateQueueMetrics(String str) {
        NuxeoMetricSet queueGauges = new NuxeoMetricSet("nuxeo", "works", "total", str);
        queueGauges.putGauge(() -> getMetricsWithNuxeoClassLoader(str).scheduled, "scheduled");
        queueGauges.putGauge(() -> getMetricsWithNuxeoClassLoader(str).running, "running");
        queueGauges.putGauge(() -> getMetricsWithNuxeoClassLoader(str).completed, "completed");
        queueGauges.putGauge(() -> getMetricsWithNuxeoClassLoader(str).canceled, "canceled");
        this.registry.registerAll(queueGauges);
    }

    /**
     * Removes from the registry every metric belonging to a queue.
     * <p>
     * A metric belongs to the queue when its name is exactly {@code nuxeo.works.total.<queueId>} or
     * starts with that name followed by a dot. Matching on the dot-terminated prefix keeps the
     * metrics of another queue whose id merely begins with this id (for instance "default" versus
     * "default2").
     *
     * @param str the queue id
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl
    protected void deactivateQueueMetrics(String str) {
        String queueName = MetricRegistry.name("nuxeo", "works", "total", str);
        String childPrefix = queueName + ".";
        this.registry.removeMatching((metricName, metric) -> {
            return metricName.equals(queueName) || metricName.startsWith(childPrefix);
        });
    }

    /**
     * Unsupported in this implementation: only logs a warning.
     *
     * @param str the queue id (ignored)
     * @param j the timeout (ignored)
     * @param timeUnit the timeout unit (ignored)
     * @return always {@code false}
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public boolean shutdownQueue(String str, long j, TimeUnit timeUnit) {
        log.warn("Shutdown a queue is not supported with computation implementation");
        return false;
    }

    /**
     * Stops the stream processor.
     * <p>
     * The effective timeout is the larger of the requested duration and the
     * {@code WorkManagerImpl.SHUTDOWN_DELAY_MS_KEY} configuration property (default "0").
     * {@code shutdownInProgress} is set for the duration of the call and always reset afterwards.
     *
     * @param j the requested timeout
     * @param timeUnit the unit of {@code j}
     * @return whether the processor stopped within the timeout
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public boolean shutdown(long j, TimeUnit timeUnit) {
        log.info("Shutdown WorkManager in " + timeUnit.toMillis(j) + " ms");
        this.shutdownInProgress = true;
        try {
            ConfigurationService config = Framework.getService(ConfigurationService.class);
            long requestedMs = timeUnit.toMillis(j);
            long configuredMs = Long.parseLong(config.getProperty(WorkManagerImpl.SHUTDOWN_DELAY_MS_KEY, "0"));
            Duration timeout = Duration.ofMillis(Math.max(requestedMs, configuredMs));
            boolean stopped = this.streamProcessor.stop(timeout);
            if (!stopped) {
                log.error("Not able to stop worker pool within the timeout.");
            }
            return stopped;
        } finally {
            this.shutdownInProgress = false;
        }
    }

    /**
     * Returns the number of works of a queue in the given state.
     *
     * @param str the queue id
     * @param state the state to count; only SCHEDULED and RUNNING are backed by a metric
     * @return the scheduled or running count from {@link #getMetrics(String)}, or 0 for any other
     *         state
     * @throws NullPointerException if {@code state} is null
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public int getQueueSize(String str, Work.State state) {
        // switch on the enum itself rather than on an ordinal map with unrelated integer labels
        switch (state) {
            case SCHEDULED:
                return getMetrics(str).getScheduled().intValue();
            case RUNNING:
                return getMetrics(str).getRunning().intValue();
            default:
                return 0;
        }
    }

    /**
     * Computes the metrics of a queue with the thread context class loader temporarily switched to
     * the class loader of {@link Framework}; the previous context class loader is always restored.
     *
     * @param str the queue id
     * @return the queue metrics
     */
    protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String str) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        try {
            current.setContextClassLoader(Framework.class.getClassLoader());
            return getMetrics(str);
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    /**
     * Derives the metrics of a queue from the lag of its log.
     * <p>
     * The values passed to {@link WorkQueueMetrics} are, in order: the lag, the lag capped by the
     * number of partitions configured for the queue (0 when there is no lag), the lower bound of
     * the lag, and 0. NOTE(review): presumably these are scheduled, running, completed and
     * canceled - confirm against the constructor.
     *
     * @param str the queue id, used both as log name and as consumer group
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public WorkQueueMetrics getMetrics(String str) {
        LogLag queueLag = this.logManager.getLag(str, str);
        long pending = queueLag.lag();
        long capped = pending > 0 ? Math.min(pending, this.settings.getPartitions(str)) : 0;
        return new WorkQueueMetrics(str, Long.valueOf(pending), Long.valueOf(capped), Long.valueOf(queueLag.lower()), 0);
    }

    /**
     * Waits for the completion of one queue, or of every contributed queue when {@code str} is
     * null.
     * <p>
     * With a null queue id the queues are awaited one after the other, each with the full timeout,
     * and the first queue that times out ends the wait.
     *
     * @param str the queue id, or null for all queues
     * @param j the timeout applied to each awaited queue
     * @param timeUnit the unit of {@code j}
     * @return {@code true} if every awaited queue completed within its timeout
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public boolean awaitCompletion(String str, long j, TimeUnit timeUnit) throws InterruptedException {
        if (str == null) {
            for (Descriptor queue : getDescriptors("queues")) {
                if (!awaitCompletionOnQueue(queue.getId(), j, timeUnit)) {
                    return false;
                }
            }
            return true;
        }
        return awaitCompletionOnQueue(str, j, timeUnit);
    }

    /**
     * Polls a queue every 100 ms until it has no scheduled work left or the timeout expires.
     *
     * @param str the queue id
     * @param j the timeout, capped at one day
     * @param timeUnit the unit of {@code j}
     * @return {@code true} if the manager is not started or the queue has drained, {@code false} on
     *         timeout
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected boolean awaitCompletionOnQueue(String str, long j, TimeUnit timeUnit) throws InterruptedException {
        if (!isStarted()) {
            return true;
        }
        log.debug("awaitCompletion " + str + " starting");
        // cap the duration so that adding it to the current time cannot overflow
        long durationMs = Math.min(timeUnit.toMillis(j), TimeUnit.DAYS.toMillis(1L));
        long deadline = System.currentTimeMillis() + durationMs;
        while (System.currentTimeMillis() < deadline) {
            Thread.sleep(100L);
            WorkQueueMetrics metrics = getMetrics(str);
            if (metrics.getScheduled().intValue() == 0) {
                if (log.isDebugEnabled()) {
                    log.debug("awaitCompletion for " + str + " completed " + metrics);
                }
                return true;
            }
            // trace progress only when debug is enabled (the guard used to be inverted)
            if (log.isDebugEnabled()) {
                log.debug("awaitCompletion for " + str + " not completed " + metrics);
            }
        }
        log.warn(String.format("%s timeout after: %.2fs, %s", str, Double.valueOf(durationMs / 1000.0d), getMetrics(str)));
        return false;
    }

    /**
     * Waits until the low watermark stops moving between two polls 100 ms apart, or until the
     * timeout expires.
     *
     * @param str the computation name, or null for the processor-wide low watermark
     * @param j the timeout, capped at one day
     * @param timeUnit the unit of {@code j}
     * @return {@code true} if the manager is not started or the watermark became stable,
     *         {@code false} on timeout
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Deprecated
    public boolean awaitCompletionWithWaterMark(String str, long j, TimeUnit timeUnit) throws InterruptedException {
        if (!isStarted()) {
            return true;
        }
        long durationMs = Math.min(timeUnit.toMillis(j), TimeUnit.DAYS.toMillis(1L));
        long deadline = System.currentTimeMillis() + durationMs;
        long previous = getLowWaterMark(str);
        while (System.currentTimeMillis() < deadline) {
            Thread.sleep(100L);
            long current = getLowWaterMark(str);
            if (current == previous) {
                log.debug("awaitCompletion for " + (str == null ? "all" : str) + " completed " + current);
                return true;
            }
            if (log.isDebugEnabled()) {
                log.debug("awaitCompletion low wm  for " + (str == null ? "all" : str) + ":" + current + " diff: " + (current - previous));
            }
            previous = current;
        }
        log.warn(String.format("%s timeout after: %.2fs", str, Double.valueOf(durationMs / 1000.0d)));
        return false;
    }

    /**
     * Returns the low watermark of one computation, or of the whole stream processor when
     * {@code str} is null.
     */
    protected long getLowWaterMark(String str) {
        if (str == null) {
            return this.streamProcessor.getLowWatermark();
        }
        return this.streamProcessor.getLowWatermark(str);
    }

    /**
     * Returns the stored state of a work.
     *
     * @param str the work id
     * @return the state found by {@code WorkStateHelper}, or null when state storage is disabled
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public Work.State getWorkState(String str) {
        return this.storeState ? WorkStateHelper.getState(str) : null;
    }

    /**
     * Work lookup is not provided by this implementation.
     *
     * @param str the work id (ignored)
     * @param state the state filter (ignored)
     * @return always null
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public Work find(String str, Work.State state) {
        return null;
    }

    /**
     * Work listing is not provided by this implementation.
     *
     * @param str the queue id (ignored)
     * @param state the state filter (ignored)
     * @return always an empty list
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public List<Work> listWork(String str, Work.State state) {
        return Collections.emptyList();
    }

    /**
     * Work id listing is not provided by this implementation.
     *
     * @param str the queue id (ignored)
     * @param state the state filter (ignored)
     * @return always an empty list
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public List<String> listWorkIds(String str, Work.State state) {
        return Collections.emptyList();
    }

    /**
     * Tries to defer the scheduling of a work until the current transaction completes.
     *
     * @param work the work to schedule
     * @param scheduling the scheduling mode
     * @return {@code true} when the caller must not schedule the work itself: either a
     *         {@link WorkScheduling} synchronization was registered on the active transaction, or
     *         the transaction is marked rollback-only. {@code false} when the caller should
     *         schedule immediately: no transaction manager, no transaction, any other transaction
     *         status, or a failure while inspecting the transaction
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl
    protected boolean scheduleAfterCommit(Work work, WorkManager.Scheduling scheduling) {
        TransactionManager txManager = null;
        try {
            txManager = TransactionHelper.lookupTransactionManager();
        } catch (NamingException ignored) {
            // no transaction manager bound: handled below as a missing manager
        }
        if (txManager == null) {
            log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId());
            return false;
        }
        try {
            Transaction tx = txManager.getTransaction();
            if (tx == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Not scheduled work after commit because of missing transaction: " + work.getId());
                }
                return false;
            }
            int status = tx.getStatus();
            switch (status) {
                case 0: // value of javax.transaction.Status.STATUS_ACTIVE
                    if (log.isDebugEnabled()) {
                        log.debug("Scheduled after commit: " + work.getId());
                    }
                    tx.registerSynchronization(new WorkScheduling(work, scheduling));
                    return true;
                case 3: // value of javax.transaction.Status.STATUS_COMMITTED
                    if (log.isDebugEnabled()) {
                        log.debug("Scheduled immediately: " + work.getId());
                    }
                    return false;
                case 1: // value of javax.transaction.Status.STATUS_MARKED_ROLLBACK
                    if (log.isDebugEnabled()) {
                        log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId());
                    }
                    return true;
                default:
                    if (log.isDebugEnabled()) {
                        log.debug("Not scheduling work after commit because transaction is in status " + status + ": " + work.getId());
                    }
                    return false;
            }
        } catch (SystemException | RollbackException failure) {
            log.error("Cannot schedule after commit", failure);
            return false;
        }
    }

    /**
     * Reports that this implementation supports disabling processing.
     *
     * @return always {@code true}
     */
    @Override // org.nuxeo.ecm.core.work.WorkManagerImpl, org.nuxeo.ecm.core.work.api.WorkManager
    public boolean supportsProcessingDisabling() {
        return true;
    }
}
