package org.nuxeo.ai.bulk;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ai.model.AiDocumentTypeConstants;
import org.nuxeo.ai.model.export.DatasetExportService;
import org.nuxeo.ai.services.AIComponent;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.bulk.BulkService;
import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;
import org.nuxeo.runtime.transaction.TransactionHelper;

/* loaded from: input_file:org/nuxeo/ai/bulk/DataSetExportStatusComputation.class */
public class DataSetExportStatusComputation extends AbstractComputation {
    private static final Log log = LogFactory.getLog(DataSetExportStatusComputation.class);
    protected final Set<String> writerNames;
    protected Map<String, Long> counters;

    public DataSetExportStatusComputation(String str, Set<String> set) {
        super(str, 1, 1);
        this.counters = new HashMap();
        this.writerNames = set;
    }

    public static Codec<ExportBulkProcessed> getExportStatusCodec() {
        return ((CodecService) Framework.getService(CodecService.class)).getCodec("avro", ExportBulkProcessed.class);
    }

    public static void updateExportStatusProcessed(ComputationContext computationContext, String str, long j) {
        computationContext.produceRecord("o1", str, getExportStatusCodec().encode(new ExportBulkProcessed(str, j)));
    }

    public static boolean isTraining(String str) {
        return DataSetBulkAction.TRAINING_COMPUTATION.equals(str);
    }

    public void processRecord(ComputationContext computationContext, String str, Record record) {
        ExportBulkProcessed exportBulkProcessed = (ExportBulkProcessed) getExportStatusCodec().decode(record.getData());
        BulkService bulkService = (BulkService) Framework.getService(BulkService.class);
        if (isEndOfBatch(exportBulkProcessed)) {
            BulkCommand command = bulkService.getCommand(exportBulkProcessed.getCommandId());
            for (String str2 : this.writerNames) {
                RecordWriter recordWriter = ((AIComponent) Framework.getService(AIComponent.class)).getRecordWriter(str2);
                if (recordWriter == null) {
                    throw new NuxeoException(String.format("Unable to find record writer: %s", str2));
                }
                if (recordWriter.exists(exportBulkProcessed.getCommandId())) {
                    try {
                        recordWriter.complete(exportBulkProcessed.getCommandId()).ifPresent(blob -> {
                            if (command != null) {
                                updateCorpusDocument(exportBulkProcessed, command, blob, isTraining(str2));
                            } else {
                                log.warn(String.format("The bulk command with id %s is missing.  Unable to save blob info for %s %s.", exportBulkProcessed.getCommandId(), str2, blob.getDigest()));
                            }
                        });
                    } catch (IOException e) {
                        throw new NuxeoException(String.format("Unable to complete action %s", exportBulkProcessed.getCommandId()), e);
                    }
                }
            }
            this.counters.remove(exportBulkProcessed.getCommandId());
        }
        updateDelta(exportBulkProcessed.getCommandId(), exportBulkProcessed.getProcessed());
        BulkStatus deltaOf = BulkStatus.deltaOf(exportBulkProcessed.getCommandId());
        deltaOf.setProcessed(exportBulkProcessed.getProcessed());
        AbstractBulkComputation.updateStatus(computationContext, deltaOf);
        computationContext.askForCheckpoint();
    }

    protected void updateCorpusDocument(ExportBulkProcessed exportBulkProcessed, BulkCommand bulkCommand, Blob blob, boolean z) {
        TransactionHelper.runInTransaction(() -> {
            CoreSession openCoreSession = CoreInstance.openCoreSession(bulkCommand.getRepository(), bulkCommand.getUsername());
            Throwable th = null;
            try {
                DocumentModel corpusDocument = ((DatasetExportService) Framework.getService(DatasetExportService.class)).getCorpusDocument(openCoreSession, bulkCommand.getId());
                if (corpusDocument != null) {
                    corpusDocument.setPropertyValue(AiDocumentTypeConstants.CORPUS_DOCUMENTS_COUNT, Long.valueOf(exportBulkProcessed.getProcessed() + getCount(exportBulkProcessed.getCommandId()).longValue()));
                    corpusDocument.setPropertyValue(z ? AiDocumentTypeConstants.CORPUS_TRAINING_DATA : AiDocumentTypeConstants.CORPUS_EVALUATION_DATA, (Serializable) blob);
                    openCoreSession.saveDocument(corpusDocument);
                } else {
                    log.warn(String.format("Unable to save blob %s for command id %s.", blob.getDigest(), exportBulkProcessed.getCommandId()));
                }
                if (openCoreSession != null) {
                    if (0 == 0) {
                        openCoreSession.close();
                        return;
                    }
                    try {
                        openCoreSession.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                if (openCoreSession != null) {
                    if (0 != 0) {
                        try {
                            openCoreSession.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        openCoreSession.close();
                    }
                }
                throw th3;
            }
        });
    }

    protected Long getCount(String str) {
        return this.counters.get(str);
    }

    protected void updateDelta(String str, long j) {
        this.counters.computeIfPresent(str, (str2, l) -> {
            return Long.valueOf(j + l.longValue());
        });
    }

    protected boolean isEndOfBatch(ExportBulkProcessed exportBulkProcessed) {
        BulkStatus status = ((BulkService) Framework.getService(BulkService.class)).getStatus(exportBulkProcessed.getCommandId());
        Long count = getCount(exportBulkProcessed.getCommandId());
        if (count == null) {
            count = 0L;
            this.counters.put(exportBulkProcessed.getCommandId(), null);
        }
        return count.longValue() + exportBulkProcessed.getProcessed() >= status.getTotal();
    }
}
