/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ai.bulk;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ai.bulk.ExportBulkProcessed;
import org.nuxeo.ai.bulk.RecordWriter;
import org.nuxeo.ai.services.AIComponent;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.bulk.BulkService;
import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.event.EventProducer;
import org.nuxeo.ecm.core.event.impl.EventContextImpl;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;

public class DataSetExportStatusComputation
extends AbstractComputation {
    public static final String DATASET_EXPORT_DONE_EVENT = "DATASET_EXPORT_DONE_EVENT";
    public static final String ACTION_ID = "ACTION_ID";
    public static final String ACTION_DATA = "ACTION_DATA";
    public static final String ACTION_BLOB_PROVIDER = "ACTION_BLOB_PROVIDER";
    public static final String ACTION_BLOB_REF = "ACTION_BLOB_REF";
    public static final String ACTION_USERNAME = "ACTION_USER";
    private static final Log log = LogFactory.getLog(DataSetExportStatusComputation.class);
    protected final Set<String> writerNames;
    protected Map<String, Long> counters = new HashMap<String, Long>();

    public DataSetExportStatusComputation(String name, Set<String> writerNames) {
        super(name, 1, 1);
        this.writerNames = writerNames;
    }

    public static Codec<ExportBulkProcessed> getExportStatusCodec() {
        return ((CodecService)Framework.getService(CodecService.class)).getCodec("avro", ExportBulkProcessed.class);
    }

    public static void updateExportStatusProcessed(ComputationContext context, String commandId, long processed) {
        ExportBulkProcessed exportStatus = new ExportBulkProcessed(commandId, processed);
        context.produceRecord("o1", commandId, DataSetExportStatusComputation.getExportStatusCodec().encode((Object)exportStatus));
    }

    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
        ExportBulkProcessed exportStatus = (ExportBulkProcessed)DataSetExportStatusComputation.getExportStatusCodec().decode(record.getData());
        BulkService service = (BulkService)Framework.getService(BulkService.class);
        if (this.isEndOfBatch(exportStatus)) {
            for (String name : this.writerNames) {
                RecordWriter writer = ((AIComponent)Framework.getService(AIComponent.class)).getRecordWriter(name);
                if (writer == null) {
                    throw new NuxeoException(String.format("Unable to find record writer: %s", name));
                }
                if (!writer.exists(exportStatus.getCommandId())) continue;
                try {
                    Optional blob = writer.complete(exportStatus.getCommandId());
                    blob.ifPresent(theBlob -> {
                        BulkCommand command = service.getCommand(exportStatus.getCommandId());
                        if (command != null) {
                            EventContextImpl eCtx = new EventContextImpl(new Object[0]);
                            eCtx.setProperty(ACTION_ID, (Serializable)((Object)exportStatus.getCommandId()));
                            eCtx.setProperty(ACTION_DATA, (Serializable)((Object)name));
                            if (theBlob instanceof ManagedBlob) {
                                ManagedBlob managedBlob = (ManagedBlob)theBlob;
                                eCtx.setProperty(ACTION_BLOB_PROVIDER, (Serializable)((Object)managedBlob.getProviderId()));
                                eCtx.setProperty(ACTION_BLOB_REF, (Serializable)((Object)managedBlob.getKey()));
                            } else {
                                eCtx.setProperty(ACTION_BLOB_REF, (Serializable)((Object)theBlob.getDigest()));
                            }
                            eCtx.setProperty(ACTION_USERNAME, (Serializable)((Object)command.getUsername()));
                            eCtx.setRepositoryName(command.getRepository());
                            Event event = eCtx.newEvent(DATASET_EXPORT_DONE_EVENT);
                            ((EventProducer)Framework.getService(EventProducer.class)).fireEvent(event);
                        } else {
                            log.error((Object)String.format("The bulk command with id %s is missing.  Unable to raise an %s event for %s %s.", exportStatus.getCommandId(), DATASET_EXPORT_DONE_EVENT, name, theBlob.getDigest()));
                        }
                    });
                }
                catch (IOException e) {
                    throw new NuxeoException(String.format("Unable to complete action %s", exportStatus.getCommandId()), (Throwable)e);
                }
            }
            this.counters.remove(exportStatus.getCommandId());
        }
        this.updateDelta(exportStatus.getCommandId(), exportStatus.getProcessed());
        AbstractBulkComputation.updateStatusProcessed((ComputationContext)context, (String)exportStatus.getCommandId(), (long)exportStatus.getProcessed());
        context.askForCheckpoint();
    }

    protected Long getCount(String commandId) {
        return this.counters.get(commandId);
    }

    protected void updateDelta(String commandId, long processed) {
        this.counters.computeIfPresent(commandId, (s, aLong) -> processed + aLong);
    }

    protected boolean isEndOfBatch(ExportBulkProcessed exportStatus) {
        BulkStatus status = ((BulkService)Framework.getService(BulkService.class)).getStatus(exportStatus.getCommandId());
        Long processed = this.getCount(exportStatus.getCommandId());
        if (processed == null) {
            processed = 0L;
            this.counters.put(exportStatus.getCommandId(), processed);
        }
        return processed + exportStatus.getProcessed() >= status.getTotal();
    }
}

