package org.nuxeo.ecm.core.bulk;

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.ecm.core.api.repository.RepositoryManager;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.kv.KeyValueStore;
import org.nuxeo.runtime.kv.KeyValueStoreProvider;
import org.nuxeo.runtime.stream.StreamService;

/* loaded from: input_file:org/nuxeo/ecm/core/bulk/BulkServiceImpl.class */
/**
 * {@link BulkService} implementation that keeps command and status entries in a key/value store and publishes
 * commands and status updates as records on log streams.
 * <p>
 * All collaborating services are looked up through {@link Framework#getService(Class)} on each call; the class holds
 * no instance state.
 */
public class BulkServiceImpl implements BulkService {
    private static final Logger log = LogManager.getLogger(BulkServiceImpl.class);

    /** Name of the log manager used to obtain the command and status appenders. */
    public static final String BULK_LOG_MANAGER_NAME = "bulk";

    /** Name of the key/value store holding the command and status entries. */
    public static final String BULK_KV_STORE_NAME = "bulk";

    /** Name of the stream on which submitted commands are appended. */
    public static final String COMMAND_STREAM = "command";

    /** Name of the stream on which status deltas are appended. */
    public static final String STATUS_STREAM = "status";

    public static final String DONE_STREAM = "done";

    /** Name of the codec used for the records appended to the command stream. */
    public static final String RECORD_CODEC = "avro";

    /** Key prefix of the command entries in the key/value store. */
    public static final String COMMAND_PREFIX = "command:";

    /** Key prefix of the status entries in the key/value store. */
    public static final String STATUS_PREFIX = "status:";

    public static final String PRODUCE_IMMEDIATE_OPTION = "produceImmediate";

    /** TTL, in seconds, applied to the status and command entries once a command is completed. */
    public static final long COMPLETED_TTL_SECONDS = 3600;

    /** TTL, in seconds, applied to the status entry once a command is aborted. */
    public static final long ABORTED_TTL_SECONDS = 7200;

    /**
     * Validates the command, fills in its missing defaults, stores its initial status and the command itself, then
     * appends the command to the command stream.
     *
     * @param bulkCommand the command to submit; its repository, bucket size and batch size are set when left empty
     * @return the id of the submitted command
     * @throws IllegalArgumentException if the action or the repository of the command is unknown
     */
    @Override
    public String submit(BulkCommand bulkCommand) {
        log.debug("Run action with command={}", bulkCommand);
        BulkAdminService adminService = Framework.getService(BulkAdminService.class);
        if (!adminService.getActions().contains(bulkCommand.getAction())) {
            throw new IllegalArgumentException("Unknown action for command: " + bulkCommand);
        }
        BulkActionValidation validation = adminService.getActionValidation(bulkCommand.getAction());
        if (validation != null) {
            validation.validate(bulkCommand);
        }

        RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class);
        if (StringUtils.isEmpty(bulkCommand.getRepository())) {
            bulkCommand.setRepository(repositoryManager.getDefaultRepositoryName());
        } else if (repositoryManager.getRepository(bulkCommand.getRepository()) == null) {
            throw new IllegalArgumentException("Unknown repository: " + bulkCommand);
        }

        // a size of 0 means "not set": fall back on the sizes configured for the action
        if (bulkCommand.getBucketSize() == 0) {
            bulkCommand.setBucketSize(adminService.getBucketSize(bulkCommand.getAction()));
        }
        if (bulkCommand.getBatchSize() == 0) {
            bulkCommand.setBatchSize(adminService.getBatchSize(bulkCommand.getAction()));
        }

        // store the initial status and the command before publishing the command
        BulkStatus status = new BulkStatus(bulkCommand.getId());
        status.setState(BulkStatus.State.SCHEDULED);
        status.setAction(bulkCommand.getAction());
        status.setUsername(bulkCommand.getUsername());
        status.setSubmitTime(Instant.now());
        setStatus(status);
        byte[] commandAsBytes = setCommand(bulkCommand);

        // sequential actions share a single key (the action name), other commands are keyed by their own id
        String recordKey = adminService.isSequentialCommands(bulkCommand.getAction()) ? bulkCommand.getAction()
                : bulkCommand.getId();
        Codec<Record> recordCodec = Framework.getService(CodecService.class).getCodec(RECORD_CODEC, Record.class);
        Framework.getService(StreamService.class)
                 .getLogManager(BULK_LOG_MANAGER_NAME)
                 .getAppender(COMMAND_STREAM, recordCodec)
                 .append(recordKey, Record.of(bulkCommand.getId(), commandAsBytes));
        return bulkCommand.getId();
    }

    /**
     * Reads the status of a command from the key/value store.
     *
     * @param str the command id
     * @return the decoded status, or {@link BulkStatus#unknownOf(String)} when no status entry exists for the id
     */
    @Override
    public BulkStatus getStatus(String str) {
        byte[] statusAsBytes = getKvStore().get(STATUS_PREFIX + str);
        if (statusAsBytes == null) {
            log.debug("Request status of unknown command: {}", str);
            return BulkStatus.unknownOf(str);
        }
        return BulkCodecs.getStatusCodec().decode(statusAsBytes);
    }

    /**
     * Encodes and stores the given status in the key/value store.
     * <p>
     * An aborted status is stored with {@link #ABORTED_TTL_SECONDS} and the command entry is overwritten with
     * {@code null}; a completed status is stored with {@link #COMPLETED_TTL_SECONDS} and the same TTL is applied to
     * the command entry; any other status is stored without TTL.
     *
     * @param bulkStatus the status to store
     * @return the encoded status
     */
    public byte[] setStatus(BulkStatus bulkStatus) {
        KeyValueStore kvStore = getKvStore();
        byte[] statusAsBytes = BulkCodecs.getStatusCodec().encode(bulkStatus);
        String statusKey = STATUS_PREFIX + bulkStatus.getId();
        String commandKey = COMMAND_PREFIX + bulkStatus.getId();
        switch (bulkStatus.getState()) {
            case ABORTED:
                kvStore.put(statusKey, statusAsBytes, ABORTED_TTL_SECONDS);
                // the cast only selects the put(String, String) overload for the null value
                kvStore.put(commandKey, (String) null);
                break;
            case COMPLETED:
                kvStore.put(statusKey, statusAsBytes, COMPLETED_TTL_SECONDS);
                kvStore.setTTL(commandKey, COMPLETED_TTL_SECONDS);
                break;
            default:
                kvStore.put(statusKey, statusAsBytes);
                break;
        }
        return statusAsBytes;
    }

    /**
     * Reads a command from the key/value store.
     *
     * @param str the command id
     * @return the decoded command, or {@code null} when no command entry exists for the id
     */
    @Override
    public BulkCommand getCommand(String str) {
        byte[] commandAsBytes = getKvStore().get(COMMAND_PREFIX + str);
        return commandAsBytes == null ? null : BulkCodecs.getCommandCodec().decode(commandAsBytes);
    }

    /**
     * Marks a command as aborted, unless it is already completed, and appends an aborted status delta to the status
     * stream.
     *
     * @param str the command id
     * @return the status after the call: the untouched completed status, or the status switched to aborted
     */
    @Override
    public BulkStatus abort(String str) {
        BulkStatus status = getStatus(str);
        if (status.getState() == BulkStatus.State.COMPLETED) {
            log.debug("Cannot abort a completed command: {}", str);
            return status;
        }
        status.setState(BulkStatus.State.ABORTED);
        setStatus(status);

        // publish the abort as a delta on the status stream as well
        BulkStatus delta = BulkStatus.deltaOf(str);
        delta.setCompletedTime(Instant.now());
        delta.setState(BulkStatus.State.ABORTED);
        byte[] deltaAsBytes = BulkCodecs.getStatusCodec().encode(delta);
        Framework.getService(StreamService.class)
                 .getLogManager(BULK_LOG_MANAGER_NAME)
                 .<Record> getAppender(STATUS_STREAM)
                 .append(str, Record.of(str, deltaAsBytes));
        return status;
    }

    /**
     * Returns the result carried by the status of a command.
     *
     * @param str the command id
     * @return the result map of the command status
     */
    @Override
    public Map<String, Serializable> getResult(String str) {
        return getStatus(str).getResult();
    }

    /**
     * Encodes and stores the given command in the key/value store, without TTL.
     *
     * @param bulkCommand the command to store
     * @return the encoded command
     */
    public byte[] setCommand(BulkCommand bulkCommand) {
        byte[] commandAsBytes = BulkCodecs.getCommandCodec().encode(bulkCommand);
        getKvStore().put(COMMAND_PREFIX + bulkCommand.getId(), commandAsBytes);
        return commandAsBytes;
    }

    /**
     * Polls the status of a command every 100 ms until it is completed or aborted, or until the duration elapses.
     *
     * @param str the command id
     * @param duration the maximum time to wait
     * @return {@code true} if the command is completed or aborted, {@code false} if its status is unknown or if the
     *         duration elapsed first
     * @throws InterruptedException if the thread is interrupted while sleeping between two polls
     */
    @Override
    public boolean await(String str, Duration duration) throws InterruptedException {
        long deadline = System.currentTimeMillis() + duration.toMillis();
        do {
            switch (getStatus(str).getState()) {
                case ABORTED:
                case COMPLETED:
                    return true;
                case UNKNOWN:
                    log.error("Unknown status for command: {}", str);
                    return false;
                default:
                    Thread.sleep(100L);
                    break;
            }
        } while (deadline > System.currentTimeMillis());
        // suppliers keep the extra status lookup lazy: it only runs when debug is enabled
        log.debug("await timeout on {} after {} ms", () -> getStatus(str), duration::toMillis);
        return false;
    }

    /**
     * Returns the key/value store named {@link #BULK_KV_STORE_NAME}.
     */
    public KeyValueStore getKvStore() {
        return Framework.getService(KeyValueService.class).getKeyValueStore(BULK_KV_STORE_NAME);
    }

    /**
     * Waits until every command that has a status entry when the method is called is completed, aborted or unknown,
     * polling every 200 ms. The duration is a single budget shared by all the commands.
     *
     * @param duration the maximum time to wait
     * @return {@code true} if all the commands reached one of these states, {@code false} if the duration elapsed
     *         while at least one command was still running
     * @throws InterruptedException if the thread is interrupted while sleeping between two polls
     */
    @Override
    public boolean await(Duration duration) throws InterruptedException {
        KeyValueStoreProvider kvProvider = (KeyValueStoreProvider) getKvStore();
        Set<String> commandIds = kvProvider.keyStream(STATUS_PREFIX)
                                           .map(key -> key.replaceFirst(STATUS_PREFIX, ""))
                                           .collect(Collectors.toSet());
        long deadline = System.nanoTime() + duration.toNanos();
        for (String commandId : commandIds) {
            for (;;) {
                BulkStatus status = getStatus(commandId);
                if (isDoneOrUnknown(status.getState())) {
                    // nothing more to wait for on this command, move on to the next one
                    break;
                }
                // nanoTime values must be compared through their difference to be overflow safe
                if (System.nanoTime() - deadline > 0) {
                    log.debug("await timeout, at least one uncompleted command: {}", status);
                    return false;
                }
                Thread.sleep(200L);
            }
        }
        return true;
    }

    /**
     * Returns the statuses found in the key/value store whose username equals the given one.
     *
     * @param str the username to match, must not be {@code null}
     * @return the matching statuses, possibly empty
     */
    @Override
    public List<BulkStatus> getStatuses(String str) {
        KeyValueStoreProvider kvProvider = (KeyValueStoreProvider) getKvStore();
        Codec<BulkStatus> statusCodec = BulkCodecs.getStatusCodec();
        Stream<String> statusKeys = kvProvider.keyStream(STATUS_PREFIX);
        return statusKeys.map(kvProvider::get)
                         // an entry may be gone between the key listing and the value lookup
                         .filter(statusAsBytes -> statusAsBytes != null)
                         .map(statusCodec::decode)
                         .filter(status -> str.equals(status.getUsername()))
                         .collect(Collectors.toList());
    }

    /** Tells whether a state means there is nothing left to wait for: completed, aborted or unknown. */
    private static boolean isDoneOrUnknown(BulkStatus.State state) {
        return state == BulkStatus.State.COMPLETED || state == BulkStatus.State.ABORTED
                || state == BulkStatus.State.UNKNOWN;
    }
}
