package org.nuxeo.ecm.core.bulk;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.bulk.BulkStatus;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.kv.KeyValueStore;
import org.nuxeo.runtime.stream.StreamService;

/* loaded from: input_file:org/nuxeo/ecm/core/bulk/BulkServiceImpl.class */
public class BulkServiceImpl implements BulkService {
    private static final Log log = LogFactory.getLog(BulkServiceImpl.class);
    protected static final String DOCUMENTSET_ACTION_NAME = "documentSet";
    public static final String COMMAND = ":command";
    public static final String STATUS = ":status";

    @Override // org.nuxeo.ecm.core.bulk.BulkService
    public String submit(BulkCommand bulkCommand) {
        if (log.isDebugEnabled()) {
            log.debug("Run action with command=" + bulkCommand);
        }
        if (StringUtils.isEmpty(bulkCommand.getRepository()) || StringUtils.isEmpty(bulkCommand.getQuery()) || StringUtils.isEmpty(bulkCommand.getAction())) {
            throw new IllegalArgumentException("Missing mandatory values");
        }
        String uuid = UUID.randomUUID().toString();
        byte[] encode = BulkCodecs.getBulkCommandCodec().encode(bulkCommand);
        KeyValueStore kvStore = getKvStore();
        BulkStatus bulkStatus = new BulkStatus();
        bulkStatus.setId(uuid);
        bulkStatus.setState(BulkStatus.State.SCHEDULED);
        bulkStatus.setSubmitTime(Instant.now());
        byte[] encode2 = BulkCodecs.getBulkStatusCodec().encode(bulkStatus);
        kvStore.put(uuid + COMMAND, encode);
        kvStore.put(uuid + STATUS, encode2);
        ((StreamService) Framework.getService(StreamService.class)).getLogManager("bulk").getAppender(DOCUMENTSET_ACTION_NAME).append(uuid, Record.of(uuid, encode));
        return uuid;
    }

    @Override // org.nuxeo.ecm.core.bulk.BulkService
    public BulkStatus getStatus(String str) {
        return (BulkStatus) BulkCodecs.getBulkStatusCodec().decode(getKvStore().get(str + STATUS));
    }

    @Override // org.nuxeo.ecm.core.bulk.BulkService
    public boolean await(String str, Duration duration) throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis() + duration.toMillis();
        KeyValueStore kvStore = getKvStore();
        while (!BulkStatus.State.COMPLETED.equals(((BulkStatus) BulkCodecs.getBulkStatusCodec().decode(kvStore.get(str + STATUS))).getState())) {
            Thread.sleep(500L);
            if (currentTimeMillis <= System.currentTimeMillis()) {
                log.debug("await timeout for commandId(" + str + ") after " + duration.toMillis() + " ms");
                return false;
            }
        }
        return true;
    }

    public KeyValueStore getKvStore() {
        return ((KeyValueService) Framework.getService(KeyValueService.class)).getKeyValueStore("bulk");
    }

    @Override // org.nuxeo.ecm.core.bulk.BulkService
    public boolean await(Duration duration) throws InterruptedException {
        LogManager logManager = ((StreamService) Framework.getService(StreamService.class)).getLogManager("bulk");
        List<String> actions = ((BulkAdminService) Framework.getService(BulkAdminService.class)).getActions();
        ArrayList<String> arrayList = new ArrayList(actions.size() + 3);
        arrayList.add(DOCUMENTSET_ACTION_NAME);
        arrayList.addAll(actions);
        arrayList.add(StreamBulkProcessor.COUNTER_ACTION_NAME);
        arrayList.add(StreamBulkProcessor.KVWRITER_ACTION_NAME);
        long currentTimeMillis = System.currentTimeMillis() + duration.toMillis();
        for (String str : arrayList) {
            while (logManager.getLag(str, str).lag() > 0) {
                if (System.currentTimeMillis() > currentTimeMillis) {
                    return false;
                }
                Thread.sleep(50L);
            }
            Thread.sleep(100L);
        }
        return true;
    }
}
