package org.nuxeo.ecm.automation.core.operations.services.workmanager;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
import net.jodah.failsafe.RetryPolicy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.ecm.automation.core.annotations.Operation;
import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
import org.nuxeo.ecm.automation.core.annotations.Param;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.Blobs;
import org.nuxeo.ecm.core.work.WorkComputation;
import org.nuxeo.ecm.core.work.WorkHolder;
import org.nuxeo.ecm.core.work.WorkManagerImpl;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.stream.StreamService;

/**
 * Automation operation that reads the Works stored in the {@code dlq-work} stream and tries to run each of them
 * again, using a dedicated stream processor that is started, drained and shut down within a single invocation.
 * <p>
 * The operation returns a JSON blob holding two counters: {@code total} (records handled) and {@code success}
 * (Works that ran without throwing).
 */
@Operation(id = WorkManagerRunWorkInFailure.ID, category = "Services", label = "Executes Works stored in the dead letter queue", addToStudio = false, description = "Try to execute again Works that have been send to a dead letter queue by the WorkManager after failure")
public class WorkManagerRunWorkInFailure {
    private static final Logger log = LogManager.getLogger(WorkManagerRunWorkInFailure.class);

    public static final String ID = "WorkManager.RunWorkInFailure";

    /** Default value, in seconds, of the {@code timeoutSeconds} parameter. */
    protected static final long DEFAULT_TIMEOUT_SECONDS = 120;

    /** Maximum time, in seconds, to wait for the processor to get its assignments. */
    protected static final long ASSIGNMENT_TIMEOUT_SECONDS = 60;

    /**
     * Number of records handled during the current run, whether the Work succeeded or failed.
     * <p>
     * NOTE(review): {@code ++} on a volatile field is not atomic; this looks safe only because the processor is
     * created with {@code new Settings(1, 1, ...)}, presumably a single computation thread - confirm before
     * raising the concurrency.
     */
    protected volatile long countTotal;

    /** Number of Works that ran without throwing during the current run. */
    protected volatile long countSuccess;

    /** Maximum time, in seconds, allowed for draining the dead letter queue. */
    @Param(name = "timeoutSeconds", required = false)
    protected long timeout = DEFAULT_TIMEOUT_SECONDS;

    /**
     * Computation with one input and no output that deserializes each record into a {@link Work}, runs it and
     * updates the counters of the enclosing operation.
     */
    protected class WorkFailureComputation extends AbstractComputation {
        private static final String NAME = "WorkFailure";

        public WorkFailureComputation() {
            super(NAME, 1, 0);
        }

        /**
         * Runs the Work carried by the given record. Any exception raised by the Work is logged and swallowed so
         * that the remaining records are still handled.
         *
         * @param computationContext the context used to request a checkpoint
         * @param str the name of the input stream the record comes from (unused)
         * @param record the record whose data is a serialized Work
         */
        public void processRecord(ComputationContext computationContext, String str, Record record) {
            // The checkpoint is requested before running the Work; presumably so that a failing record is not
            // read again on a later run - confirm against ComputationContext semantics.
            computationContext.askForCheckpoint();
            Work work = WorkComputation.deserialize(record.getData());
            log.info("Trying to run Work from DLQ: {}:{}", work.getCategory(), work.getId());
            try {
                work.setWorkInstanceState(Work.State.UNKNOWN);
                new WorkHolder(work).run();
                cleanup(work, null);
                log.info("{}: Success.", work.getId());
                countSuccess++;
            } catch (Exception e) {
                cleanup(work, e);
                // The trailing Throwable is logged as the exception, not as a message parameter.
                log.error("{}: Failure, skipping.", work.getId(), e);
            }
            // Counted for both outcomes.
            countTotal++;
        }

        /**
         * Calls {@link Work#cleanUp(boolean, Exception)} and logs, rather than propagates, any failure.
         *
         * @param work the Work to clean up
         * @param exc the exception raised while running the Work, or {@code null} on success
         */
        protected void cleanup(Work work, Exception exc) {
            try {
                work.cleanUp(true, exc);
            } catch (Exception e) {
                log.error("{}: Failure on cleanup", work.getId(), e);
            }
        }
    }

    /**
     * Starts a processor on the dead letter queue, waits until it has drained the stream and returns the counters.
     *
     * @return a JSON blob with the {@code total} and {@code success} counters
     * @throws IOException if the JSON blob cannot be created
     * @throws InterruptedException if the thread is interrupted while waiting for the processor
     * @throws TimeoutException if the processor could not be drained within the configured timeout
     */
    @OperationMethod
    public Blob run() throws IOException, InterruptedException, TimeoutException {
        StreamService streamService = Framework.getService(StreamService.class);
        Settings settings = new Settings(1, 1, WorkManagerImpl.DEAD_LETTER_QUEUE_CODEC, getComputationPolicy());
        StreamProcessor processor = streamService.getStreamManager("default")
                                                 .registerAndCreateProcessor("RunWorkInFailure", getTopology(),
                                                         settings);
        try {
            // Reset the counters so that each invocation reports only its own run.
            this.countTotal = 0L;
            this.countSuccess = 0L;
            processor.start();
            processor.waitForAssignments(Duration.ofSeconds(ASSIGNMENT_TIMEOUT_SECONDS));
            Duration drainTimeout = getTimeout();
            if (!processor.drainAndStop(drainTimeout)) {
                throw new TimeoutException(
                        "Dead letter queue was not drained within " + drainTimeout.getSeconds() + " seconds");
            }
            return buildResult();
        } finally {
            // Always release the processor, including on timeout or interruption.
            processor.shutdown();
        }
    }

    /**
     * Builds the JSON blob returned by the operation from the current counters.
     *
     * @return a JSON blob with the {@code total} and {@code success} keys
     * @throws IOException if the JSON serialization fails
     */
    private Blob buildResult() throws IOException {
        HashMap<String, Long> result = new HashMap<>();
        result.put("total", this.countTotal);
        result.put("success", this.countSuccess);
        return Blobs.createJSONBlobFromValue(result);
    }

    /**
     * @return the drain timeout built from the {@code timeoutSeconds} parameter
     */
    protected Duration getTimeout() {
        return Duration.ofSeconds(this.timeout);
    }

    /**
     * @return a policy built from a copy of {@link ComputationPolicy#NO_RETRY} with continue-on-failure enabled
     */
    protected ComputationPolicy getComputationPolicy() {
        return new ComputationPolicyBuilder().retryPolicy(new RetryPolicy(ComputationPolicy.NO_RETRY))
                                             .continueOnFailure(true)
                                             .build();
    }

    /**
     * @return a topology made of a single {@link WorkFailureComputation} whose input {@code i1} is mapped to the
     *         {@code dlq-work} stream
     */
    protected Topology getTopology() {
        return Topology.builder()
                       .addComputation(() -> new WorkFailureComputation(), Collections.singletonList("i1:dlq-work"))
                       .build();
    }
}
