package org.nuxeo.lib.stream.computation;

import java.util.ArrayList;
import java.util.List;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.SyncFailsafe;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/nuxeo/lib/stream/computation/AbstractBatchComputation.class */
public abstract class AbstractBatchComputation extends AbstractComputation {
    private static final Log log = LogFactory.getLog(AbstractBatchComputation.class);
    public static final String TIMER_BATCH = "batch";
    protected final ComputationPolicy policy;
    protected final List<Record> batchRecords;
    protected String currentInputStream;
    protected boolean newBatch;
    protected final long thresholdMillis;

    public AbstractBatchComputation(String str, int i, int i2, ComputationPolicy computationPolicy) {
        super(str, i, i2);
        this.newBatch = true;
        this.policy = computationPolicy;
        this.thresholdMillis = computationPolicy.getBatchThreshold().toMillis();
        this.batchRecords = new ArrayList(computationPolicy.batchCapacity);
    }

    public abstract void batchProcess(ComputationContext computationContext, String str, List<Record> list);

    public abstract void batchFailure(ComputationContext computationContext, String str, List<Record> list);

    @Override // org.nuxeo.lib.stream.computation.AbstractComputation, org.nuxeo.lib.stream.computation.Computation
    public void init(ComputationContext computationContext) {
        computationContext.setTimer(TIMER_BATCH, System.currentTimeMillis() + this.thresholdMillis);
    }

    @Override // org.nuxeo.lib.stream.computation.AbstractComputation, org.nuxeo.lib.stream.computation.Computation
    public void processTimer(ComputationContext computationContext, String str, long j) {
        if (TIMER_BATCH.equals(str)) {
            if (!this.batchRecords.isEmpty()) {
                processBatch(computationContext);
            }
            computationContext.setTimer(TIMER_BATCH, System.currentTimeMillis() + this.thresholdMillis);
        }
    }

    @Override // org.nuxeo.lib.stream.computation.Computation
    public void processRecord(ComputationContext computationContext, String str, Record record) {
        if (!str.equals(this.currentInputStream) && !this.batchRecords.isEmpty()) {
            processBatch(computationContext);
        }
        if (this.newBatch) {
            this.currentInputStream = str;
            this.newBatch = false;
        }
        this.batchRecords.add(record);
        if (this.batchRecords.size() >= this.policy.getBatchCapacity()) {
            processBatch(computationContext);
        }
    }

    protected void processBatch(ComputationContext computationContext) {
        ((SyncFailsafe) ((SyncFailsafe) ((SyncFailsafe) ((SyncFailsafe) Failsafe.with(this.policy.getRetryPolicy()).onSuccess(obj -> {
            checkpointBatch(computationContext);
        })).onFailure(th -> {
            processFailure(computationContext, th);
        })).onRetry(th2 -> {
            processRetry(computationContext, th2);
        })).withFallback(() -> {
            processFallback(computationContext);
        })).run(() -> {
            batchProcess(computationContext, this.currentInputStream, this.batchRecords);
        });
    }

    protected void processFallback(ComputationContext computationContext) {
        if (this.policy.isSkipFailure()) {
            this.batchRecords.forEach(record -> {
                log.error(String.format("Computation %s skips processing of record because of batch failure: %s", this.metadata.name(), record));
            });
            checkpointBatch(computationContext);
        } else {
            log.error(String.format("Computation %s aborts after a failure in batch", this.metadata.name()));
            this.batchRecords.forEach(record2 -> {
                log.warn("Record not processed because of the batch failure: " + record2);
            });
            computationContext.cancelAskForCheckpoint();
            computationContext.askForTermination();
        }
    }

    protected void processRetry(ComputationContext computationContext, Throwable th) {
        log.warn(String.format("Computation: %s fails to process batch of %d records from stream: %s, policy: %s, retrying ...", this.metadata.name(), Integer.valueOf(this.batchRecords.size()), this.currentInputStream, this.policy), th);
    }

    protected void checkpointBatch(ComputationContext computationContext) {
        computationContext.askForCheckpoint();
        this.batchRecords.clear();
        this.newBatch = true;
    }

    protected void processFailure(ComputationContext computationContext, Throwable th) {
        log.error(String.format("Computation: %s fails to process batch of %d records from stream: %s, after applying retries: %s", this.metadata.name(), Integer.valueOf(this.batchRecords.size()), this.currentInputStream, this.policy), th);
        batchFailure(computationContext, this.currentInputStream, this.batchRecords);
    }
}
