package org.nuxeo.ai.bulk;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ai.services.AIComponent;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.bulk.BulkService;
import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.ecm.core.event.EventProducer;
import org.nuxeo.ecm.core.event.impl.EventContextImpl;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;

/* loaded from: input_file:org/nuxeo/ai/bulk/DataSetExportStatusComputation.class */
public class DataSetExportStatusComputation extends AbstractComputation {
    public static final String DATASET_EXPORT_DONE_EVENT = "DATASET_EXPORT_DONE_EVENT";
    public static final String ACTION_ID = "ACTION_ID";
    public static final String ACTION_DATA = "ACTION_DATA";
    public static final String ACTION_BLOB_PROVIDER = "ACTION_BLOB_PROVIDER";
    public static final String ACTION_BLOB_REF = "ACTION_BLOB_REF";
    public static final String ACTION_USERNAME = "ACTION_USER";
    private static final Log log = LogFactory.getLog(DataSetExportStatusComputation.class);
    protected final Set<String> writerNames;
    protected Map<String, Long> counters;

    public DataSetExportStatusComputation(String str, Set<String> set) {
        super(str, 1, 1);
        this.counters = new HashMap();
        this.writerNames = set;
    }

    public static Codec<ExportBulkProcessed> getExportStatusCodec() {
        return ((CodecService) Framework.getService(CodecService.class)).getCodec("avro", ExportBulkProcessed.class);
    }

    public static void updateExportStatusProcessed(ComputationContext computationContext, String str, long j) {
        computationContext.produceRecord("o1", str, getExportStatusCodec().encode(new ExportBulkProcessed(str, j)));
    }

    public void processRecord(ComputationContext computationContext, String str, Record record) {
        ExportBulkProcessed exportBulkProcessed = (ExportBulkProcessed) getExportStatusCodec().decode(record.getData());
        BulkService bulkService = (BulkService) Framework.getService(BulkService.class);
        if (isEndOfBatch(exportBulkProcessed)) {
            for (String str2 : this.writerNames) {
                RecordWriter recordWriter = ((AIComponent) Framework.getService(AIComponent.class)).getRecordWriter(str2);
                if (recordWriter == null) {
                    throw new NuxeoException(String.format("Unable to find record writer: %s", str2));
                }
                if (recordWriter.exists(exportBulkProcessed.getCommandId())) {
                    try {
                        recordWriter.complete(exportBulkProcessed.getCommandId()).ifPresent(blob -> {
                            BulkCommand command = bulkService.getCommand(exportBulkProcessed.getCommandId());
                            if (command == null) {
                                log.error(String.format("The bulk command with id %s is missing.  Unable to raise an %s event for %s %s.", exportBulkProcessed.getCommandId(), DATASET_EXPORT_DONE_EVENT, str2, blob.getDigest()));
                                return;
                            }
                            EventContextImpl eventContextImpl = new EventContextImpl(new Object[0]);
                            eventContextImpl.setProperty(ACTION_ID, exportBulkProcessed.getCommandId());
                            eventContextImpl.setProperty(ACTION_DATA, str2);
                            if (blob instanceof ManagedBlob) {
                                ManagedBlob managedBlob = (ManagedBlob) blob;
                                eventContextImpl.setProperty(ACTION_BLOB_PROVIDER, managedBlob.getProviderId());
                                eventContextImpl.setProperty(ACTION_BLOB_REF, managedBlob.getKey());
                            } else {
                                eventContextImpl.setProperty(ACTION_BLOB_REF, blob.getDigest());
                            }
                            eventContextImpl.setProperty(ACTION_USERNAME, command.getUsername());
                            eventContextImpl.setRepositoryName(command.getRepository());
                            ((EventProducer) Framework.getService(EventProducer.class)).fireEvent(eventContextImpl.newEvent(DATASET_EXPORT_DONE_EVENT));
                        });
                    } catch (IOException e) {
                        throw new NuxeoException(String.format("Unable to complete action %s", exportBulkProcessed.getCommandId()), e);
                    }
                }
            }
            this.counters.remove(exportBulkProcessed.getCommandId());
        }
        updateDelta(exportBulkProcessed.getCommandId(), exportBulkProcessed.getProcessed());
        AbstractBulkComputation.updateStatusProcessed(computationContext, exportBulkProcessed.getCommandId(), exportBulkProcessed.getProcessed());
        computationContext.askForCheckpoint();
    }

    protected Long getCount(String str) {
        return this.counters.get(str);
    }

    protected void updateDelta(String str, long j) {
        this.counters.computeIfPresent(str, (str2, l) -> {
            return Long.valueOf(j + l.longValue());
        });
    }

    protected boolean isEndOfBatch(ExportBulkProcessed exportBulkProcessed) {
        BulkStatus status = ((BulkService) Framework.getService(BulkService.class)).getStatus(exportBulkProcessed.getCommandId());
        Long count = getCount(exportBulkProcessed.getCommandId());
        if (count == null) {
            count = 0L;
            this.counters.put(exportBulkProcessed.getCommandId(), null);
        }
        return count.longValue() + exportBulkProcessed.getProcessed() >= status.getTotal();
    }
}
