package org.nuxeo.ecm.core.bulk;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.bulk.BulkStatus;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.kv.KeyValueStore;
import org.nuxeo.runtime.stream.StreamService;

/* loaded from: input_file:org/nuxeo/ecm/core/bulk/BulkServiceImpl.class */
public class BulkServiceImpl implements BulkService {
    private static final Log log = LogFactory.getLog(BulkServiceImpl.class);
    protected static final String SET_STREAM_NAME = "documentSet";
    protected static final String COMMAND = ":command";
    protected static final String SUBMIT_TIME = ":submitTime";
    protected static final String SCROLL_START_TIME = ":scrollStartTime";
    protected static final String SCROLL_END_TIME = ":scrollEndTime";
    protected static final String STATE = ":state";
    protected static final String PROCESSED_DOCUMENTS = ":processedDocs";
    protected static final String SCROLLED_DOCUMENT_COUNT = ":count";

    @Override // org.nuxeo.ecm.core.bulk.BulkService
    public String submit(BulkCommand bulkCommand) {
        if (log.isDebugEnabled()) {
            log.debug("Run action with command=" + bulkCommand);
        }
        if (StringUtils.isEmpty(bulkCommand.getRepository()) || StringUtils.isEmpty(bulkCommand.getQuery()) || StringUtils.isEmpty(bulkCommand.getAction())) {
            throw new IllegalArgumentException("Missing mandatory values");
        }
        String uuid = UUID.randomUUID().toString();
        byte[] bytes = BulkCommands.toBytes(bulkCommand);
        KeyValueStore kvStore = getKvStore();
        kvStore.put(uuid + STATE, BulkStatus.State.SCHEDULED.toString());
        kvStore.put(uuid + SUBMIT_TIME, Long.valueOf(Instant.now().toEpochMilli()));
        kvStore.put(uuid + COMMAND, bytes);
        ((StreamService) Framework.getService(StreamService.class)).getLogManager("bulk").getAppender(SET_STREAM_NAME).append(uuid, Record.of(uuid, bytes));
        return uuid;
    }

    @Override // org.nuxeo.ecm.core.bulk.BulkService
    public BulkStatus getStatus(String str) {
        BulkStatus bulkStatus = new BulkStatus();
        bulkStatus.setId(str);
        KeyValueStore kvStore = getKvStore();
        bulkStatus.setState(BulkStatus.State.valueOf(kvStore.getString(str + STATE)));
        bulkStatus.setSubmitTime(Instant.ofEpochMilli(kvStore.getLong(str + SUBMIT_TIME).longValue()));
        bulkStatus.setCommand(BulkCommands.fromKVStore(kvStore, str));
        Long l = kvStore.getLong(str + SCROLL_START_TIME);
        if (l != null) {
            bulkStatus.setScrollStartTime(Instant.ofEpochMilli(l.longValue()));
        }
        Long l2 = kvStore.getLong(str + SCROLL_END_TIME);
        if (l2 != null) {
            bulkStatus.setScrollEndTime(Instant.ofEpochMilli(l2.longValue()));
        }
        bulkStatus.setProcessed(kvStore.getLong(str + PROCESSED_DOCUMENTS));
        bulkStatus.setCount(kvStore.getLong(str + SCROLLED_DOCUMENT_COUNT));
        return bulkStatus;
    }

    @Override // org.nuxeo.ecm.core.bulk.BulkService
    public boolean await(String str, Duration duration) throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis() + duration.toMillis();
        KeyValueStore kvStore = getKvStore();
        while (!BulkStatus.State.COMPLETED.toString().equals(kvStore.getString(str + STATE))) {
            Thread.sleep(500L);
            if (currentTimeMillis <= System.currentTimeMillis()) {
                log.debug("await timeout for commandId(" + str + ") after " + duration.toMillis() + " ms");
                return false;
            }
        }
        return true;
    }

    public KeyValueStore getKvStore() {
        return ((KeyValueService) Framework.getService(KeyValueService.class)).getKeyValueStore("bulk");
    }
}
