/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.automation.core.operations.services.workmanager;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
import net.jodah.failsafe.RetryPolicy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.nuxeo.ecm.automation.core.annotations.Operation;
import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
import org.nuxeo.ecm.automation.core.annotations.Param;
import org.nuxeo.ecm.automation.core.util.StringList;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.Blobs;
import org.nuxeo.ecm.core.work.WorkComputation;
import org.nuxeo.ecm.core.work.WorkHolder;
import org.nuxeo.ecm.core.work.WorkManagerImpl;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamManager;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.stream.StreamService;

/**
 * Automation operation that tries to execute again the Works found in the WorkManager dead letter queue.
 * <p>
 * A dedicated stream processor is started on the dead letter queue stream; each record is deserialized into a
 * {@link Work}, optionally filtered by category, and run again. The operation returns a JSON blob holding the
 * {@code total}, {@code success} and {@code filtered} counters.
 * <p>
 * Parameters:
 * <ul>
 * <li>{@code timeoutSeconds}: maximum time given to drain the queue, defaults to
 * {@link #DEFAULT_TIMEOUT_SECONDS},</li>
 * <li>{@code dryRun}: when true the Works are only listed and counted, they are not executed,</li>
 * <li>{@code categoryFilter}: when set and not empty, only Works whose category is listed are kept.</li>
 * </ul>
 */
@Operation(id="WorkManager.RunWorkInFailure", category="Services", label="Executes Works stored in the dead letter queue", addToStudio=false, description="Try to execute again Works that have been send to a dead letter queue by the WorkManager after failure")
public class WorkManagerRunWorkInFailure {
    private static final Logger log = LogManager.getLogger(WorkManagerRunWorkInFailure.class);

    public static final String ID = "WorkManager.RunWorkInFailure";

    /** Default value, in seconds, of the {@code timeoutSeconds} parameter. */
    protected static final long DEFAULT_TIMEOUT_SECONDS = 3600L;

    /** Time, in seconds, given to the processor to get its assignments before draining. */
    protected static final long ASSIGNMENT_TIMEOUT_SECONDS = 60L;

    // NOTE(review): these counters are incremented with ++ from the computation, which is not atomic on a
    // volatile field. Presumably safe because the processor is created with Settings(1, 1, ...), assumed to
    // mean a single processing thread -- confirm before raising the concurrency.

    /** Number of Works that passed the category filter and went through reprocessing (success or failure). */
    protected volatile long countTotal;

    /** Number of Works reprocessed without exception (in dry run, every kept Work is counted here). */
    protected volatile long countSuccess;

    /** Number of Works skipped because their category is not part of {@link #categoryFilter}. */
    protected volatile long countFiltered;

    @Param(name="timeoutSeconds", required=false)
    protected long timeout = DEFAULT_TIMEOUT_SECONDS;

    @Param(name="dryRun", required=false)
    protected boolean dryRun = false;

    @Param(name="categoryFilter", required=false)
    protected StringList categoryFilter;

    /**
     * Starts a processor on the dead letter queue, waits until it is drained or the timeout is reached, and
     * reports the counters.
     *
     * @return a JSON blob with the {@code total}, {@code success} and {@code filtered} counters
     * @throws IOException if the JSON result cannot be built or read back for logging
     * @throws InterruptedException if the thread is interrupted while waiting for the processor
     * @throws TimeoutException if the queue is not drained within the configured timeout
     */
    @OperationMethod
    public Blob run() throws IOException, InterruptedException, TimeoutException {
        log.warn("Reprocessing Works in DLQ: dryRun: {}, categoryFilter: {}, timeout after: {}s", dryRun, categoryFilter, timeout);
        StreamManager streamManager = Framework.getService(StreamService.class).getStreamManager();
        Settings settings = new Settings(1, 1, WorkManagerImpl.DEAD_LETTER_QUEUE_CODEC, getComputationPolicy());
        StreamProcessor processor = streamManager.registerAndCreateProcessor("RunWorkInFailure", getTopology(), settings);
        boolean timeoutDuringProcessing;
        try {
            // The operation instance may be reused: always start from clean counters.
            countTotal = 0L;
            countSuccess = 0L;
            countFiltered = 0L;
            processor.start();
            processor.waitForAssignments(Duration.ofSeconds(ASSIGNMENT_TIMEOUT_SECONDS));
            timeoutDuringProcessing = !processor.drainAndStop(getTimeout());
        } finally {
            // Always release the processor, even when start or drain fails.
            processor.shutdown();
        }
        Blob result = buildResult();
        log.warn("Reprocessing status: {}, timeout: {}", result.getString(), timeoutDuringProcessing);
        if (timeoutDuringProcessing) {
            // The status has been logged above; the caller still needs to know the run is incomplete.
            throw new TimeoutException("Reprocessing of Works in the dead letter queue did not complete within " + timeout + "s");
        }
        return result;
    }

    /**
     * Builds the operation result from the current counters.
     *
     * @return a JSON blob with the keys {@code total}, {@code success} and {@code filtered}
     * @throws IOException if the JSON blob cannot be created
     */
    protected Blob buildResult() throws IOException {
        HashMap<String, Long> result = new HashMap<>();
        result.put("total", countTotal);
        result.put("success", countSuccess);
        result.put("filtered", countFiltered);
        return Blobs.createJSONBlobFromValue(result);
    }

    /**
     * @return the {@code timeoutSeconds} parameter as a {@link Duration}
     */
    protected Duration getTimeout() {
        return Duration.ofSeconds(timeout);
    }

    /**
     * @return a policy built from {@link ComputationPolicy#NO_RETRY} with {@code continueOnFailure} enabled
     */
    protected ComputationPolicy getComputationPolicy() {
        return new ComputationPolicyBuilder().retryPolicy(new RetryPolicy(ComputationPolicy.NO_RETRY))
                                             .continueOnFailure(true)
                                             .build();
    }

    /**
     * @return a topology made of a single {@link WorkFailureComputation} whose input {@code i1} is the dead
     *         letter queue stream
     */
    protected Topology getTopology() {
        return Topology.builder()
                       .addComputation(WorkFailureComputation::new,
                               Collections.singletonList("i1:" + WorkManagerImpl.DEAD_LETTER_QUEUE.getUrn()))
                       .build();
    }

    /**
     * Computation with one input and no output that deserializes each record into a {@link Work} and runs it
     * again, updating the counters of the enclosing operation.
     */
    protected class WorkFailureComputation
    extends AbstractComputation {
        private static final String NAME = "WorkFailure";

        public WorkFailureComputation() {
            super(NAME, 1, 0);
        }

        /**
         * Handles one record of the dead letter queue: filters it, then (unless in dry run) executes the Work
         * and cleans it up. Failures are logged and do not propagate.
         *
         * @param context the computation context, used to request or cancel a checkpoint
         * @param inputStreamName the name of the input stream the record comes from (unused)
         * @param record the record whose data is a serialized {@link Work}
         */
        public void processRecord(ComputationContext context, String inputStreamName, Record record) {
            // NOTE(review): presumably a dry run must not commit the position so that records stay in the
            // queue for a real run -- confirm against the ComputationContext contract.
            if (dryRun) {
                context.cancelAskForCheckpoint();
            } else {
                context.askForCheckpoint();
            }
            Work work = WorkComputation.deserialize(record.getData());
            if (toBeSkipped(work)) {
                return;
            }
            log.info("Trying to reprocess work: {} with category: {}", work::getId, work::getCategory);
            try {
                if (!dryRun) {
                    work.setWorkInstanceState(Work.State.UNKNOWN);
                    new WorkHolder(work).run();
                    cleanup(work, null);
                    log.info("Work: {} successfully reprocessed", work::getId);
                }
                // Also reached in dry run: a kept Work that is not executed is reported as a success.
                countSuccess++;
            } catch (Exception e) {
                // Broad catch on purpose: one failing Work must not stop the reprocessing of the others.
                cleanup(work, e);
                log.error("Fail to reprocess work: {} with category: {}", work.getId(), work.getCategory(), e);
            }
            countTotal++;
        }

        /**
         * Tells whether the Work is excluded by the category filter, counting it as filtered when it is.
         *
         * @param work the Work read from the dead letter queue
         * @return {@code false} when there is no filter or the Work category is listed, {@code true} otherwise
         */
        protected boolean toBeSkipped(Work work) {
            boolean noFilter = categoryFilter == null || categoryFilter.isEmpty();
            if (noFilter || categoryFilter.contains(work.getCategory())) {
                log.debug("Keep work: {} with category: {}", work::getId, work::getCategory);
                return false;
            }
            log.info("Skipping work: {} with category: {}", work::getId, work::getCategory);
            countFiltered++;
            return true;
        }

        /**
         * Calls {@code work.cleanUp(true, exception)}, logging instead of propagating any failure.
         *
         * @param work the Work to clean up
         * @param exception the exception raised by the Work execution, or {@code null} on success
         */
        protected void cleanup(Work work, Exception exception) {
            try {
                work.cleanUp(true, exception);
            } catch (Exception e) {
                // Best effort: a cleanup failure must not hide the outcome of the Work itself.
                log.error("{}: Failure on cleanup", work.getId(), e);
            }
        }
    }
}

