package org.nuxeo.elasticsearch.bulk;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentModelList;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.ecm.core.bulk.message.DataBucket;
import org.nuxeo.elasticsearch.Timestamp;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.elasticsearch.api.ElasticSearchIndexing;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;

/* loaded from: input_file:org/nuxeo/elasticsearch/bulk/IndexRequestComputation.class */
public class IndexRequestComputation extends AbstractBulkComputation {
    private static final Log log = LogFactory.getLog(IndexRequestComputation.class);
    protected static final long MAX_RECORD_SIZE = 900000;
    protected static final String INDEX_OPTION = "indexName";
    protected BulkRequest bulkRequest;
    protected List<BulkRequest> bulkRequests;
    protected String bucketKey;

    public IndexRequestComputation() {
        super("index", 1);
        this.bulkRequests = new ArrayList();
    }

    @Override // org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation
    public void startBucket(String str) {
        this.bucketKey = str;
        this.bulkRequests.clear();
        this.bulkRequest = new BulkRequest();
    }

    @Override // org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation
    protected void compute(CoreSession coreSession, List<String> list, Map<String, Serializable> map) {
        long currentTimeMicros = Timestamp.currentTimeMicros();
        String indexName = getIndexName(coreSession, map);
        DocumentModelList loadDocuments = loadDocuments(coreSession, list);
        ElasticSearchIndexing elasticSearchIndexing = (ElasticSearchIndexing) Framework.getService(ElasticSearchIndexing.class);
        for (DocumentModel documentModel : loadDocuments) {
            try {
                append(new IndexRequest(indexName, "doc", documentModel.getId()).source(elasticSearchIndexing.source(documentModel), XContentType.JSON).versionType(VersionType.EXTERNAL).version(currentTimeMicros));
            } catch (IOException e) {
                throw new NuxeoException("Cannot build source for document: " + documentModel.getId(), e);
            }
        }
    }

    protected void append(IndexRequest indexRequest) {
        if (this.bulkRequest.estimatedSizeInBytes() + indexRequest.source().length() > MAX_RECORD_SIZE) {
            if (this.bulkRequest.numberOfActions() > 0) {
                this.bulkRequests.add(this.bulkRequest);
                this.bulkRequest = new BulkRequest();
            }
            if (indexRequest.source().length() > MAX_RECORD_SIZE) {
                log.warn(String.format("Indexing request for doc: %s, is too large: %d, max record size: %d", indexRequest.id(), Integer.valueOf(indexRequest.source().length()), Long.valueOf(MAX_RECORD_SIZE)));
            }
        }
        this.bulkRequest.add(indexRequest);
    }

    @Override // org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation
    public void endBucket(ComputationContext computationContext, BulkStatus bulkStatus) {
        long processed = bulkStatus.getProcessed();
        this.bulkRequests.add(this.bulkRequest);
        String id = getCurrentCommand().getId();
        int i = 0;
        int i2 = 0;
        for (BulkRequest bulkRequest : this.bulkRequests) {
            int i3 = i;
            i++;
            computationContext.produceRecord(AbstractComputation.OUTPUT_1, Record.of(this.bucketKey + "-" + i3, BulkCodecs.getDataBucketCodec().encode(new DataBucket(id, bulkRequest.numberOfActions(), toBytes(bulkRequest)))));
            i2 += bulkRequest.numberOfActions();
        }
        if (i2 < processed) {
            log.warn(String.format("Command: %s offset: %s created %d documents out of %d, %d not accessible", id, computationContext.getLastOffset(), Integer.valueOf(i2), Long.valueOf(processed), Long.valueOf(processed - i2)));
            computationContext.produceRecord(AbstractComputation.OUTPUT_1, Record.of(this.bucketKey + "-missing", BulkCodecs.getDataBucketCodec().encode(new DataBucket(id, processed - i2, toBytes(new BulkRequest())))));
        }
        this.bulkRequest = null;
        this.bulkRequests.clear();
    }

    protected String getIndexName(CoreSession coreSession, Map<String, Serializable> map) {
        if (map.containsKey(INDEX_OPTION)) {
            return (String) map.get(INDEX_OPTION);
        }
        ElasticSearchAdmin elasticSearchAdmin = (ElasticSearchAdmin) Framework.getService(ElasticSearchAdmin.class);
        return elasticSearchAdmin.getWriteIndexName(elasticSearchAdmin.getIndexNameForRepository(coreSession.getRepositoryName()));
    }

    protected byte[] toBytes(BulkRequest bulkRequest) {
        BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
        try {
            bulkRequest.writeTo(bytesStreamOutput);
            return BytesReference.toBytes(bytesStreamOutput.bytes());
        } catch (IOException e) {
            throw new NuxeoException("Cannot write elasticsearch bulk request " + bulkRequest, e);
        }
    }
}
