package org.nuxeo.ecm.core.bulk;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import net.jodah.failsafe.RetryPolicy;
import org.nuxeo.ecm.core.bulk.computation.BulkScrollerComputation;
import org.nuxeo.ecm.core.bulk.computation.BulkStatusComputation;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;
import org.nuxeo.runtime.services.config.ConfigurationService;
import org.nuxeo.runtime.stream.StreamService;

/* loaded from: input_file:org/nuxeo/ecm/core/bulk/BulkAdminServiceImpl.class */
public class BulkAdminServiceImpl implements BulkAdminService {
    public static final String SCROLLER_NAME = "scroller";
    public static final String STATUS_NAME = "status";
    public static final String BULK_SCROLLER_CONCURRENCY_PROPERTY = "nuxeo.core.bulk.scroller.concurrency";
    public static final String BULK_STATUS_CONCURRENCY_PROPERTY = "nuxeo.core.bulk.status.concurrency";
    public static final String BULK_STATUS_CONTINUE_ON_FAILURE_PROPERTY = "nuxeo.core.bulk.status.continueOnFailure";
    public static final String BULK_STATUS_MAX_RETRIES_PROPERTY = "nuxeo.core.bulk.status.maxRetries";
    public static final String BULK_STATUS_DELAY_PROPERTY = "nuxeo.core.bulk.status.delayMillis";
    public static final String BULK_STATUS_MAX_DELAY_PROPERTY = "nuxeo.core.bulk.status.maxDelayMillis";
    public static final String BULK_SCROLL_SIZE_PROPERTY = "nuxeo.core.bulk.scroller.scroll.size";
    public static final String BULK_SCROLL_KEEP_ALIVE_PROPERTY = "nuxeo.core.bulk.scroller.scroll.keepAliveSeconds";
    public static final String BULK_SCROLL_PRODUCE_IMMEDIATE_PROPERTY = "nuxeo.core.bulk.scroller.produceImmediate";
    public static final String BULK_SCROLL_CONTINUE_ON_FAILURE_PROPERTY = "nuxeo.core.bulk.scroller.continueOnFailure";
    public static final String DEFAULT_STATUS_CONCURRENCY = "1";
    public static final String DEFAULT_STATUS_MAX_RETRIES = "3";
    public static final String DEFAULT_STATUS_DELAY_MILLIS = "500";
    public static final String DEFAULT_STATUS_MAX_DELAY_MILLIS = "10000";
    public static final String DEFAULT_SCROLLER_CONCURRENCY = "1";
    public static final String DEFAULT_SCROLL_SIZE = "100";
    public static final String DEFAULT_SCROLL_KEEP_ALIVE = "60";
    public static final String DEFAULT_SCROLL_PRODUCE_IMMEDIATE = "false";
    public static final Duration STOP_DURATION = Duration.ofSeconds(1);
    protected final Map<String, BulkActionDescriptor> descriptors;
    protected final List<String> actions;
    protected StreamProcessor streamProcessor;
    protected Map<String, BulkActionValidation> actionValidations;

    public BulkAdminServiceImpl(List<BulkActionDescriptor> list) {
        this.actions = (List) list.stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        this.descriptors = new HashMap(list.size());
        list.forEach(bulkActionDescriptor -> {
            this.descriptors.put(bulkActionDescriptor.name, bulkActionDescriptor);
        });
        this.actionValidations = (Map) list.stream().collect(HashMap::new, (hashMap, bulkActionDescriptor2) -> {
        }, (v0, v1) -> {
            v0.putAll(v1);
        });
    }

    protected void initProcessor() {
        StreamService streamService = (StreamService) Framework.getService(StreamService.class);
        ConfigurationService configurationService = (ConfigurationService) Framework.getService(ConfigurationService.class);
        this.streamProcessor = new LogStreamProcessor(streamService.getLogManager("bulk"));
        Settings settings = new Settings(1, 1, (Codec<Record>) ((CodecService) Framework.getService(CodecService.class)).getCodec("avro", Record.class));
        settings.setConcurrency(SCROLLER_NAME, Integer.parseInt(configurationService.getProperty(BULK_SCROLLER_CONCURRENCY_PROPERTY, "1")));
        settings.setConcurrency("status", Integer.parseInt(configurationService.getProperty(BULK_STATUS_CONCURRENCY_PROPERTY, "1")));
        settings.setPolicy(SCROLLER_NAME, new ComputationPolicyBuilder().continueOnFailure(configurationService.isBooleanPropertyTrue(BULK_SCROLL_CONTINUE_ON_FAILURE_PROPERTY)).retryPolicy(ComputationPolicy.NO_RETRY).build());
        settings.setPolicy(SCROLLER_NAME, new ComputationPolicyBuilder().continueOnFailure(configurationService.isBooleanPropertyTrue(BULK_STATUS_CONTINUE_ON_FAILURE_PROPERTY)).retryPolicy(new RetryPolicy().withMaxRetries(Integer.parseInt(configurationService.getProperty(BULK_STATUS_MAX_RETRIES_PROPERTY, "3"))).withBackoff(Integer.parseInt(configurationService.getProperty(BULK_STATUS_DELAY_PROPERTY, DEFAULT_STATUS_DELAY_MILLIS)), Integer.parseInt(configurationService.getProperty(BULK_STATUS_MAX_DELAY_PROPERTY, "10000")), TimeUnit.MILLISECONDS)).build());
        this.streamProcessor.init(getTopology(Integer.parseInt(configurationService.getProperty(BULK_SCROLL_SIZE_PROPERTY, DEFAULT_SCROLL_SIZE)), Integer.parseInt(configurationService.getProperty(BULK_SCROLL_KEEP_ALIVE_PROPERTY, "60")), Boolean.parseBoolean(configurationService.getProperty(BULK_SCROLL_PRODUCE_IMMEDIATE_PROPERTY, "false"))), settings);
    }

    protected Topology getTopology(int i, int i2, boolean z) {
        ArrayList arrayList = new ArrayList();
        arrayList.add("i1:command");
        int i3 = 1;
        Iterator<String> it = this.actions.iterator();
        while (it.hasNext()) {
            arrayList.add(String.format("o%s:%s", Integer.valueOf(i3), it.next()));
            i3++;
        }
        arrayList.add(String.format("o%s:%s", Integer.valueOf(i3), "status"));
        return Topology.builder().addComputation(() -> {
            return new BulkScrollerComputation(SCROLLER_NAME, this.actions.size() + 1, i, i2, z);
        }, arrayList).addComputation(() -> {
            return new BulkStatusComputation("status");
        }, Arrays.asList("i1:status", "o1:done")).build();
    }

    @Override // org.nuxeo.ecm.core.bulk.BulkAdminService
    public List<String> getActions() {
        return this.actions;
    }

    @Override // org.nuxeo.ecm.core.bulk.BulkAdminService
    public int getBucketSize(String str) {
        return this.descriptors.get(str).getBucketSize().intValue();
    }

    @Override // org.nuxeo.ecm.core.bulk.BulkAdminService
    public int getBatchSize(String str) {
        return this.descriptors.get(str).getBatchSize().intValue();
    }

    @Override // org.nuxeo.ecm.core.bulk.BulkAdminService
    public boolean isHttpEnabled(String str) {
        return this.descriptors.get(str).httpEnabled.booleanValue();
    }

    @Override // org.nuxeo.ecm.core.bulk.BulkAdminService
    public boolean isSequentialCommands(String str) {
        return this.descriptors.get(str).sequentialCommands.booleanValue();
    }

    @Override // org.nuxeo.ecm.core.bulk.BulkAdminService
    public BulkActionValidation getActionValidation(String str) {
        return this.actionValidations.get(str);
    }

    public void afterStart() {
        initProcessor();
        this.streamProcessor.start();
    }

    public void beforeStop() {
        if (this.streamProcessor != null) {
            this.streamProcessor.stop(STOP_DURATION);
        }
    }
}
