/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.computation;

import java.util.ArrayList;
import java.util.List;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import net.jodah.failsafe.SyncFailsafe;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.Record;

public abstract class AbstractBatchComputation
extends AbstractComputation {
    private static final Log log = LogFactory.getLog(AbstractBatchComputation.class);
    public static final String TIMER_BATCH = "batch";
    protected final ComputationPolicy policy;
    protected final List<Record> batchRecords;
    protected String currentInputStream;
    protected boolean newBatch = true;
    protected final long thresholdMillis;

    public AbstractBatchComputation(String name, int nbInputStreams, int nbOutputStreams, ComputationPolicy policy) {
        super(name, nbInputStreams, nbOutputStreams);
        this.policy = policy;
        this.thresholdMillis = policy.getBatchThreshold().toMillis();
        this.batchRecords = new ArrayList<Record>(policy.batchCapacity);
    }

    public abstract void batchProcess(ComputationContext var1, String var2, List<Record> var3);

    public abstract void batchFailure(ComputationContext var1, String var2, List<Record> var3);

    @Override
    public void init(ComputationContext context) {
        context.setTimer(TIMER_BATCH, System.currentTimeMillis() + this.thresholdMillis);
    }

    @Override
    public void processTimer(ComputationContext context, String key, long timestamp) {
        if (!TIMER_BATCH.equals(key)) {
            return;
        }
        if (!this.batchRecords.isEmpty()) {
            this.processBatch(context);
        }
        context.setTimer(TIMER_BATCH, System.currentTimeMillis() + this.thresholdMillis);
    }

    @Override
    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
        if (!inputStreamName.equals(this.currentInputStream) && !this.batchRecords.isEmpty()) {
            this.processBatch(context);
        }
        if (this.newBatch) {
            this.currentInputStream = inputStreamName;
            this.newBatch = false;
        }
        this.batchRecords.add(record);
        if (this.batchRecords.size() >= this.policy.getBatchCapacity()) {
            this.processBatch(context);
        }
    }

    protected void processBatch(ComputationContext context) {
        ((SyncFailsafe)((SyncFailsafe)((SyncFailsafe)((SyncFailsafe)Failsafe.with((RetryPolicy)this.policy.getRetryPolicy()).onSuccess(ret -> this.checkpointBatch(context))).onFailure(failure -> this.processFailure(context, (Throwable)failure))).onRetry(failure -> this.processRetry(context, (Throwable)failure))).withFallback(() -> this.processFallback(context))).run(() -> this.batchProcess(context, this.currentInputStream, this.batchRecords));
    }

    protected void processFallback(ComputationContext context) {
        if (this.policy.isSkipFailure()) {
            this.batchRecords.forEach(record -> log.error((Object)String.format("Computation %s skips processing of record because of batch failure: %s", this.metadata.name(), record)));
            this.checkpointBatch(context);
        } else {
            log.error((Object)String.format("Computation %s aborts after a failure in batch", this.metadata.name()));
            this.batchRecords.forEach(record -> log.warn((Object)("Record not processed because of the batch failure: " + record)));
            context.cancelAskForCheckpoint();
            context.askForTermination();
        }
    }

    protected void processRetry(ComputationContext context, Throwable failure) {
        log.warn((Object)String.format("Computation: %s fails to process batch of %d records from stream: %s, policy: %s, retrying ...", this.metadata.name(), this.batchRecords.size(), this.currentInputStream, this.policy), failure);
    }

    protected void checkpointBatch(ComputationContext context) {
        context.askForCheckpoint();
        this.batchRecords.clear();
        this.newBatch = true;
    }

    protected void processFailure(ComputationContext context, Throwable failure) {
        log.error((Object)String.format("Computation: %s fails to process batch of %d records from stream: %s, after applying retries: %s", this.metadata.name(), this.batchRecords.size(), this.currentInputStream, this.policy), failure);
        this.batchFailure(context, this.currentInputStream, this.batchRecords);
    }
}

