package org.nuxeo.elasticsearch.bulk;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.ecm.core.bulk.message.DataBucket;
import org.nuxeo.elasticsearch.api.ESClient;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;

/* loaded from: input_file:org/nuxeo/elasticsearch/bulk/BulkIndexComputation.class */
public class BulkIndexComputation extends AbstractComputation implements BulkProcessor.Listener {
    private static final Log log = LogFactory.getLog(BulkIndexComputation.class);
    public static final String NAME = "bulkIndex";
    protected final int esBulkSize;
    protected final int esBulkActions;
    protected final int flushIntervalMs;
    protected BulkProcessor bulkProcessor;
    protected Codec<DataBucket> codec;
    protected boolean updates;
    protected boolean continueOnFailure;
    protected volatile boolean abort;

    public BulkIndexComputation(int i, int i2, int i3) {
        super(NAME, 1, 1);
        this.esBulkSize = i;
        this.esBulkActions = i2;
        this.flushIntervalMs = i3 * 1000;
    }

    @Override // org.nuxeo.lib.stream.computation.AbstractComputation, org.nuxeo.lib.stream.computation.Computation
    public void init(ComputationContext computationContext) {
        super.init(computationContext);
        this.continueOnFailure = computationContext.getPolicy().continueOnFailure();
        this.bulkProcessor = getESClient().bulkProcessorBuilder(this).setConcurrentRequests(0).setBulkSize(new ByteSizeValue(this.esBulkSize, ByteSizeUnit.BYTES)).setBulkActions(this.esBulkActions).setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(computationContext.getPolicy().getRetryPolicy().getDelay().toMillis()), computationContext.getPolicy().getRetryPolicy().getMaxRetries())).build();
        this.codec = BulkCodecs.getDataBucketCodec();
        computationContext.setTimer("flush", System.currentTimeMillis() + this.flushIntervalMs);
    }

    @Override // org.nuxeo.lib.stream.computation.AbstractComputation, org.nuxeo.lib.stream.computation.Computation
    public void processTimer(ComputationContext computationContext, String str, long j) {
        if (this.abort) {
            computationContext.askForTermination();
            log.error("Terminate computation due to previous error");
            return;
        }
        if (this.updates) {
            this.bulkProcessor.flush();
            computationContext.askForCheckpoint();
            this.updates = false;
        }
        computationContext.setTimer("flush", System.currentTimeMillis() + this.flushIntervalMs);
    }

    @Override // org.nuxeo.lib.stream.computation.Computation
    public void processRecord(ComputationContext computationContext, String str, Record record) {
        DataBucket decode = this.codec.decode(record.getData());
        if (decode.getCount() > 0) {
            Iterator<DocWriteRequest<?>> it = decodeRequest(decode).requests().iterator();
            while (it.hasNext()) {
                this.bulkProcessor.add(it.next());
            }
            BulkStatus deltaOf = BulkStatus.deltaOf(decode.getCommandId());
            deltaOf.setProcessed(decode.getCount());
            AbstractBulkComputation.updateStatus(computationContext, deltaOf);
        }
        this.updates = true;
    }

    @Override // org.nuxeo.lib.stream.computation.Computation
    public void destroy() {
        if (this.bulkProcessor != null) {
            this.bulkProcessor.close();
            this.bulkProcessor = null;
        }
    }

    protected ESClient getESClient() {
        return ((ElasticSearchAdmin) Framework.getService(ElasticSearchAdmin.class)).getClient();
    }

    protected BulkRequest decodeRequest(DataBucket dataBucket) {
        BulkRequest bulkRequest = new BulkRequest();
        try {
            bulkRequest.readFrom(new ByteBufferStreamInput(ByteBuffer.wrap(dataBucket.getData())));
            return bulkRequest;
        } catch (IOException e) {
            throw new NuxeoException("Cannot load elastic bulk request from: " + dataBucket);
        }
    }

    @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
    public void beforeBulk(long j, BulkRequest bulkRequest) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Creating elasticsearch bulk %s with %d action", Long.valueOf(j), Integer.valueOf(bulkRequest.numberOfActions())));
        }
    }

    @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
    public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("After bulk: %s, actions: %d, status: %s", Long.valueOf(j), Integer.valueOf(bulkRequest.numberOfActions()), bulkResponse.status()));
        }
        if (bulkResponse.hasFailures()) {
            MutableBoolean mutableBoolean = new MutableBoolean(false);
            Arrays.stream(bulkResponse.getItems()).filter((v0) -> {
                return v0.isFailed();
            }).forEach(bulkItemResponse -> {
                if (bulkItemResponse.getFailure().getStatus() != RestStatus.CONFLICT) {
                    log.warn("Failure in bulk indexing: " + bulkItemResponse.getFailureMessage());
                    mutableBoolean.setTrue();
                } else if (log.isDebugEnabled()) {
                    log.debug("Skipping version conflict: " + bulkItemResponse.getFailureMessage());
                }
            });
            if (mutableBoolean.isTrue()) {
                log.error(String.format("Elasticsearch bulk %s returns with failures: %s", Long.valueOf(j), bulkResponse.buildFailureMessage()));
                if (this.continueOnFailure) {
                    return;
                }
                this.abort = true;
            }
        }
    }

    @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
    public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
        log.error(String.format("Elasticsearch bulk %s fails, contains %d actions", Long.valueOf(j), Integer.valueOf(bulkRequest.numberOfActions())), th);
        if (this.continueOnFailure) {
            return;
        }
        this.abort = true;
    }
}
