package org.nuxeo.ecm.core.work;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;
import org.nuxeo.runtime.services.config.ConfigurationService;

/* loaded from: input_file:org/nuxeo/ecm/core/work/WorkComputation.class */
public class WorkComputation extends AbstractComputation {
    private static final Log log = LogFactory.getLog(WorkComputation.class);
    protected static final int IDS_SIZE = 50;
    protected final CircularFifoBuffer workIds;
    protected final Timer workTimer;
    protected final long stateTTL;
    protected Work work;

    public WorkComputation(String str) {
        super(str, 1, 0);
        this.workIds = new CircularFifoBuffer(IDS_SIZE);
        this.workTimer = SharedMetricRegistries.getOrCreate(MetricsService.class.getName()).timer(MetricRegistry.name("nuxeo", new String[]{"works", str, "total"}));
        this.stateTTL = Long.parseLong(((ConfigurationService) Framework.getService(ConfigurationService.class)).getProperty(StreamWorkManager.STATETTL_KEY, StreamWorkManager.STATETTL_DEFAULT_VALUE));
    }

    public void signalStop() {
        if (this.work != null) {
            this.work.setWorkInstanceSuspending();
        }
    }

    public void processRecord(ComputationContext computationContext, String str, Record record) {
        this.work = deserialize(record.getData());
        try {
            try {
                if (this.work.isIdempotent() && this.workIds.contains(this.work.getId())) {
                    log.warn("Duplicate work id: " + this.work.getId() + " skipping");
                } else {
                    boolean parseBoolean = Boolean.parseBoolean(((ConfigurationService) Framework.getService(ConfigurationService.class)).getProperty(StreamWorkManager.STORESTATE_KEY));
                    if (parseBoolean) {
                        if (WorkStateHelper.getState(this.work.getId()) != Work.State.SCHEDULED) {
                            log.warn("work has been canceled, saving and returning");
                            computationContext.askForCheckpoint();
                            this.workTimer.update(this.work.getCompletionTime() - this.work.getStartTime(), TimeUnit.MILLISECONDS);
                            this.work = null;
                            return;
                        }
                        WorkStateHelper.setState(this.work.getId(), Work.State.RUNNING, this.stateTTL);
                    }
                    new WorkHolder(this.work).run();
                    if (parseBoolean) {
                        WorkStateHelper.setState(this.work.getId(), null, this.stateTTL);
                    }
                    this.workIds.add(this.work.getId());
                }
                this.work.cleanUp(true, null);
                if (!this.work.isWorkInstanceSuspended()) {
                    computationContext.askForCheckpoint();
                }
                this.workTimer.update(this.work.getCompletionTime() - this.work.getStartTime(), TimeUnit.MILLISECONDS);
                this.work = null;
            } catch (Exception e) {
                if (ExceptionUtils.hasInterruptedCause(e)) {
                    Thread.currentThread().interrupt();
                    log.warn(String.format("Work id: %s title: %s, has been interrupted, it will be rescheduled, record: %s", this.work.getId(), this.work.getTitle(), record));
                } else {
                    log.error(String.format("Work id: %s title: %s is in error, the work is skipped, record: %s", this.work.getId(), this.work.getTitle(), record));
                    computationContext.askForCheckpoint();
                }
                log.debug("Exception during work " + this.work.getId(), e);
                this.work.cleanUp(false, e);
                this.workTimer.update(this.work.getCompletionTime() - this.work.getStartTime(), TimeUnit.MILLISECONDS);
                this.work = null;
            }
        } catch (Throwable th) {
            this.workTimer.update(this.work.getCompletionTime() - this.work.getStartTime(), TimeUnit.MILLISECONDS);
            this.work = null;
            throw th;
        }
    }

    public static Work deserialize(byte[] bArr) {
        ObjectInputStream objectInputStream = null;
        try {
            try {
                objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bArr));
                Work work = (Work) objectInputStream.readObject();
                if (objectInputStream != null) {
                    try {
                        objectInputStream.close();
                    } catch (IOException e) {
                    }
                }
                return work;
            } catch (IOException | ClassNotFoundException e2) {
                throw new RuntimeException(e2);
            }
        } catch (Throwable th) {
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e3) {
                    throw th;
                }
            }
            throw th;
        }
    }

    public static byte[] serialize(Work work) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            try {
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
                objectOutputStream.writeObject(work);
                objectOutputStream.flush();
                return byteArrayOutputStream.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        } finally {
            try {
                byteArrayOutputStream.close();
            } catch (IOException e2) {
            }
        }
    }
}
