package org.nuxeo.elasticsearch.bulk;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.BulkService;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;

/* loaded from: input_file:org/nuxeo/elasticsearch/bulk/IndexCompletionComputation.class */
public class IndexCompletionComputation extends AbstractComputation {
    private static final Log log = LogFactory.getLog(IndexCompletionComputation.class);
    public static final String NAME = "indexCompletion";
    protected Codec<BulkStatus> codec;

    public IndexCompletionComputation() {
        super(NAME, 1, 0);
    }

    @Override // org.nuxeo.lib.stream.computation.AbstractComputation, org.nuxeo.lib.stream.computation.Computation
    public void init(ComputationContext computationContext) {
        super.init(computationContext);
        this.codec = BulkCodecs.getStatusCodec();
    }

    @Override // org.nuxeo.lib.stream.computation.Computation
    public void processRecord(ComputationContext computationContext, String str, Record record) {
        BulkStatus decode = this.codec.decode(record.getData());
        if ("index".equals(decode.getAction()) && BulkStatus.State.COMPLETED.equals(decode.getState())) {
            logIndexing(decode);
            BulkCommand command = ((BulkService) Framework.getService(BulkService.class)).getCommand(decode.getId());
            refreshIndexIfNeeded(command);
            updateAliasIfNeeded(command);
        }
        computationContext.askForCheckpoint();
    }

    protected void refreshIndexIfNeeded(BulkCommand bulkCommand) {
        if (Boolean.TRUE.equals((Boolean) bulkCommand.getParam("refresh"))) {
            log.warn("Refresh index requested by command: " + bulkCommand.getId());
            ((ElasticSearchAdmin) Framework.getService(ElasticSearchAdmin.class)).refreshRepositoryIndex(bulkCommand.getRepository());
        }
    }

    protected void updateAliasIfNeeded(BulkCommand bulkCommand) {
        if (Boolean.TRUE.equals((Boolean) bulkCommand.getParam(IndexAction.INDEX_UPDATE_ALIAS_PARAM))) {
            log.warn("Update alias requested by command: " + bulkCommand.getId());
            ElasticSearchAdmin elasticSearchAdmin = (ElasticSearchAdmin) Framework.getService(ElasticSearchAdmin.class);
            elasticSearchAdmin.syncSearchAndWriteAlias(elasticSearchAdmin.getIndexNameForRepository(bulkCommand.getRepository()));
        }
    }

    protected void logIndexing(BulkStatus bulkStatus) {
        long epochMilli = bulkStatus.getCompletedTime().toEpochMilli() - bulkStatus.getSubmitTime().toEpochMilli();
        log.warn(String.format("Index command: %s completed: %d docs in %.2fs (wait: %.2fs, scroll: %.2fs) rate: %.2f docs/s", bulkStatus.getId(), Long.valueOf(bulkStatus.getTotal()), Double.valueOf(epochMilli / 1000.0d), Double.valueOf((bulkStatus.getScrollStartTime().toEpochMilli() - bulkStatus.getSubmitTime().toEpochMilli()) / 1000.0d), Double.valueOf((bulkStatus.getScrollEndTime().toEpochMilli() - bulkStatus.getScrollStartTime().toEpochMilli()) / 1000.0d), Double.valueOf((1000.0d * bulkStatus.getTotal()) / epochMilli)));
    }
}
