/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ai.bulk;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ai.bulk.DataSetExportStatusComputation;
import org.nuxeo.ai.bulk.DataSetUploadComputation;
import org.nuxeo.ai.bulk.RecordWriterBatchComputation;
import org.nuxeo.ai.pipes.functions.PropertyUtils;
import org.nuxeo.ai.pipes.services.JacksonUtil;
import org.nuxeo.ai.pipes.types.BlobTextFromDocument;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.DocumentRef;
import org.nuxeo.ecm.core.api.IdRef;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.stream.StreamProcessorTopology;

/**
 * Stream processor topology for the bulk dataset export action.
 * <p>
 * The topology wires five computations together: an {@link ExportingComputation} that reads the bulk action
 * stream and dispatches records to the training and validation streams, one {@link RecordWriterBatchComputation}
 * per dataset stream, a {@link DataSetExportStatusComputation} fed by the export status stream, and a
 * {@link DataSetUploadComputation} reading the {@code done} stream.
 */
public class DataSetBulkAction implements StreamProcessorTopology {

    public static final String TRAINING_STREAM = "exp-training";

    public static final String TRAINING_COMPUTATION = "training";

    public static final String VALIDATION_STREAM = "exp-validation";

    public static final String VALIDATION_COMPUTATION = "validation";

    public static final String EXPORT_STATUS_STREAM = "exp-status";

    public static final String EXPORT_STATUS_COMPUTATION = "exp-status-comp";

    public static final String EXPORT_UPLOAD_COMPUTATION = "exp-upload-comp";

    /** Name shared by the exporting computation and the stream it reads from. */
    private static final String EXPORT_ACTION_NAME = "bulkDatasetExport";

    /**
     * Builds the export topology.
     *
     * @param options topology options; not read by this implementation
     * @return the topology made of the exporting, writer, status and upload computations
     */
    public Topology getTopology(Map<String, String> options) {
        return Topology.builder()
                       .addComputation(() -> new ExportingComputation(EXPORT_ACTION_NAME),
                               Arrays.asList("i1:" + EXPORT_ACTION_NAME, "o1:" + EXPORT_STATUS_STREAM,
                                       "o2:" + TRAINING_STREAM, "o3:" + VALIDATION_STREAM))
                       .addComputation(() -> new RecordWriterBatchComputation(TRAINING_COMPUTATION),
                               Arrays.asList("i1:" + TRAINING_STREAM, "o1:" + EXPORT_STATUS_STREAM))
                       .addComputation(() -> new RecordWriterBatchComputation(VALIDATION_COMPUTATION),
                               Arrays.asList("i1:" + VALIDATION_STREAM, "o1:" + EXPORT_STATUS_STREAM))
                       .addComputation(
                               () -> new DataSetExportStatusComputation(EXPORT_STATUS_COMPUTATION,
                                       new HashSet<>(Arrays.asList(TRAINING_COMPUTATION, VALIDATION_COMPUTATION))),
                               Arrays.asList("i1:" + EXPORT_STATUS_STREAM, "o1:status"))
                       .addComputation(() -> new DataSetUploadComputation(EXPORT_UPLOAD_COMPUTATION),
                               Collections.singletonList("i1:done"))
                       .build();
    }

    /**
     * Bulk computation that serializes each document of a bucket and randomly assigns it to either the training
     * or the validation dataset.
     * <p>
     * Records are buffered in {@link #training} and {@link #validation} during {@code compute} and only emitted
     * in {@link #endBucket(ComputationContext, int)}.
     */
    public static class ExportingComputation extends AbstractBulkComputation {

        /** Percentage of documents sent to the training dataset when no {@code split} property is given. */
        public static final int DEFAULT_SPLIT = 75;

        private static final Log log = LogFactory.getLog(ExportingComputation.class);

        /** Records waiting to be produced on output {@code o2}. */
        List<Record> training = new ArrayList<>();

        /** Records waiting to be produced on output {@code o3}. */
        List<Record> validation = new ArrayList<>();

        /** Number of documents of the current bucket that could not be exported. */
        int discarded;

        /**
         * @param name the computation name; the computation is declared with three outputs
         */
        public ExportingComputation(String name) {
            super(name, 3);
        }

        /**
         * Serializes every document of the bucket and buffers the resulting record for the training or the
         * validation dataset. Documents that cannot be found, or that miss one of the requested properties, are
         * counted as discarded.
         *
         * @param coreSession the session used to load documents
         * @param ids the ids of the documents in this bucket
         * @param properties the command parameters; {@code features} is a comma separated list of property names
         *            and {@code split} is the training percentage, given as a number or a numeric string
         *            (defaults to {@link #DEFAULT_SPLIT} when absent)
         * @throws NullPointerException if the {@code features} parameter is missing
         * @throws NumberFormatException if the {@code split} parameter is a non numeric string
         */
        protected void compute(CoreSession coreSession, List<String> ids, Map<String, Serializable> properties) {
            String features = (String) properties.get("features");
            if (features == null) {
                // StringUtils.split(null) yields null, which previously surfaced as an anonymous NPE.
                throw new NullPointerException("Missing mandatory 'features' parameter for dataset export");
            }
            List<String> customProperties = Arrays.asList(StringUtils.split(features, ","));
            int percentSplit = parseSplit(properties.get("split"));
            ThreadLocalRandom random = ThreadLocalRandom.current();

            for (String id : ids) {
                try {
                    DocumentModel doc = coreSession.getDocument(new IdRef(id));
                    BlobTextFromDocument subDoc = docSerialize(doc, customProperties);
                    if (subDoc == null) {
                        discarded++;
                        continue;
                    }
                    // Uniform draw in [1, 100]: values up to percentSplit go to the training set.
                    boolean isTraining = random.nextInt(1, 101) <= percentSplit;
                    if (log.isDebugEnabled()) {
                        log.debug(isTraining + " " + subDoc);
                    }
                    Record record = JacksonUtil.toRecord(command.getId(), subDoc);
                    if (isTraining) {
                        training.add(record);
                    } else {
                        validation.add(record);
                    }
                } catch (DocumentNotFoundException e) {
                    log.error("DocumentNotFoundException: " + id);
                    discarded++;
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("There  were Ids " + ids.size());
            }
        }

        /**
         * Reads the training percentage from a command parameter.
         * <p>
         * The previous code cast the value, including the {@code Integer} default, to {@code String}, which
         * failed with a {@code ClassCastException} whenever the parameter was absent or numeric.
         *
         * @param value the raw {@code split} parameter, may be {@code null}
         * @return {@link #DEFAULT_SPLIT} when {@code value} is {@code null}, otherwise its integer value
         * @throws NumberFormatException if {@code value} is neither a number nor a numeric string
         */
        private static int parseSplit(Serializable value) {
            if (value == null) {
                return DEFAULT_SPLIT;
            }
            if (value instanceof Number) {
                return ((Number) value).intValue();
            }
            return Integer.parseInt(value.toString().trim());
        }

        /**
         * Flushes what was accumulated for the bucket: reports discarded documents to the export status, emits
         * buffered training records on {@code o2} and validation records on {@code o3}, then asks for a
         * checkpoint.
         *
         * @param context the computation context used to produce records
         * @param bucketSize the size of the bucket; not read by this implementation
         */
        public void endBucket(ComputationContext context, int bucketSize) {
            if (discarded > 0) {
                DataSetExportStatusComputation.updateExportStatusProcessed(context, command.getId(), discarded);
                discarded = 0;
            }
            training.forEach(record -> context.produceRecord("o2", record));
            training.clear();
            validation.forEach(record -> context.produceRecord("o3", record));
            validation.clear();
            context.askForCheckpoint();
        }

        /**
         * Copies the requested properties of a document into a {@link BlobTextFromDocument}. Blob values are
         * added as blobs, any other non null value is stored as its string form.
         *
         * @param doc the document to serialize
         * @param propertiesList the names of the properties to export
         * @return the serialized document, or {@code null} when the number of collected properties and blobs
         *         differs from the number of requested properties (typically because one of them is null)
         */
        protected BlobTextFromDocument docSerialize(DocumentModel doc, List<String> propertiesList) {
            BlobTextFromDocument blobTextFromDoc = new BlobTextFromDocument(doc);
            Map<String, String> properties = blobTextFromDoc.getProperties();
            for (String propName : propertiesList) {
                Serializable propVal = PropertyUtils.getPropertyValue(doc, propName);
                if (propVal instanceof ManagedBlob) {
                    blobTextFromDoc.addBlob(propName, (ManagedBlob) propVal);
                } else if (propVal != null) {
                    properties.put(propName, propVal.toString());
                }
            }
            // Only export documents for which every requested property produced a value.
            if (properties.size() + blobTextFromDoc.getBlobs().size() == propertiesList.size()) {
                return blobTextFromDoc;
            }
            if (log.isDebugEnabled()) {
                log.debug(String.format("Document %s one of the following properties is null so skipping. %s",
                        doc.getId(), propertiesList));
            }
            return null;
        }
    }
}

