package org.nuxeo.ai.bulk;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ai.pipes.functions.PropertyUtils;
import org.nuxeo.ai.pipes.services.JacksonUtil;
import org.nuxeo.ai.pipes.types.BlobTextFromDocument;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.IdRef;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.stream.StreamProcessorTopology;

/* loaded from: input_file:org/nuxeo/ai/bulk/DataSetBulkAction.class */
public class DataSetBulkAction implements StreamProcessorTopology {
    public static final String TRAINING_STREAM = "exp-training";
    public static final String TRAINING_COMPUTATION = "training";
    public static final String VALIDATION_STREAM = "exp-validation";
    public static final String VALIDATION_COMPUTATION = "validation";
    public static final String EXPORT_STATUS_STREAM = "exp-status";
    public static final String EXPORT_STATUS_COMPUTATION = "exp-status-comp";
    public static final String EXPORT_UPLOAD_COMPUTATION = "exp-upload-comp";

    /* loaded from: input_file:org/nuxeo/ai/bulk/DataSetBulkAction$ExportingComputation.class */
    public static class ExportingComputation extends AbstractBulkComputation {
        public static final int DEFAULT_SPLIT = 75;
        private static final Log log = LogFactory.getLog(ExportingComputation.class);
        List<Record> training;
        List<Record> validation;
        int discarded;

        public ExportingComputation(String str) {
            super(str, 3);
            this.training = new ArrayList();
            this.validation = new ArrayList();
        }

        protected void compute(CoreSession coreSession, List<String> list, Map<String, Serializable> map) {
            List<String> asList = Arrays.asList(StringUtils.split((String) map.get("features"), ","));
            int parseInt = Integer.parseInt((String) map.getOrDefault("split", 75));
            ThreadLocalRandom current = ThreadLocalRandom.current();
            for (String str : list) {
                try {
                    BlobTextFromDocument docSerialize = docSerialize(coreSession.getDocument(new IdRef(str)), asList);
                    boolean z = current.nextInt(1, 101) <= parseInt;
                    if (docSerialize != null) {
                        if (log.isDebugEnabled()) {
                            log.debug(z + " " + docSerialize);
                        }
                        Record record = JacksonUtil.toRecord(this.command.getId(), docSerialize);
                        if (z) {
                            this.training.add(record);
                        } else {
                            this.validation.add(record);
                        }
                    } else {
                        this.discarded++;
                    }
                } catch (DocumentNotFoundException e) {
                    log.error("DocumentNotFoundException: " + str);
                    this.discarded++;
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("There  were Ids " + list.size());
            }
        }

        public void endBucket(ComputationContext computationContext, BulkStatus bulkStatus) {
            if (this.discarded > 0) {
                DataSetExportStatusComputation.updateExportStatusProcessed(computationContext, this.command.getId(), this.discarded);
                this.discarded = 0;
            }
            this.training.forEach(record -> {
                computationContext.produceRecord("o2", record);
            });
            this.training.clear();
            this.validation.forEach(record2 -> {
                computationContext.produceRecord("o3", record2);
            });
            this.validation.clear();
            computationContext.askForCheckpoint();
        }

        protected BlobTextFromDocument docSerialize(DocumentModel documentModel, List<String> list) {
            BlobTextFromDocument blobTextFromDocument = new BlobTextFromDocument(documentModel);
            Map properties = blobTextFromDocument.getProperties();
            list.forEach(str -> {
                ManagedBlob propertyValue = PropertyUtils.getPropertyValue(documentModel, str);
                if (propertyValue instanceof ManagedBlob) {
                    blobTextFromDocument.addBlob(str, propertyValue);
                } else if (propertyValue != null) {
                    properties.put(str, propertyValue.toString());
                }
            });
            if (properties.size() + blobTextFromDocument.getBlobs().size() == list.size()) {
                return blobTextFromDocument;
            }
            if (!log.isDebugEnabled()) {
                return null;
            }
            log.debug(String.format("Document %s one of the following properties is null so skipping. %s", documentModel.getId(), list));
            return null;
        }
    }

    public Topology getTopology(Map<String, String> map) {
        return Topology.builder().addComputation(() -> {
            return new ExportingComputation("bulkDatasetExport");
        }, Arrays.asList("i1:bulkDatasetExport", "o1:exp-status", "o2:exp-training", "o3:exp-validation")).addComputation(() -> {
            return new RecordWriterBatchComputation(TRAINING_COMPUTATION);
        }, Arrays.asList("i1:exp-training", "o1:exp-status")).addComputation(() -> {
            return new RecordWriterBatchComputation(VALIDATION_COMPUTATION);
        }, Arrays.asList("i1:exp-validation", "o1:exp-status")).addComputation(() -> {
            return new DataSetExportStatusComputation(EXPORT_STATUS_COMPUTATION, new HashSet(Arrays.asList(TRAINING_COMPUTATION, VALIDATION_COMPUTATION)));
        }, Arrays.asList("i1:exp-status", "o1:status")).addComputation(() -> {
            return new DataSetUploadComputation(EXPORT_UPLOAD_COMPUTATION);
        }, Collections.singletonList("i1:done")).build();
    }
}
