package org.nuxeo.ecm.core.bulk.actions;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.CloseableCoreSession;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.BulkCommand;
import org.nuxeo.ecm.core.bulk.BulkCounter;
import org.nuxeo.ecm.core.bulk.BulkRecords;
import org.nuxeo.ecm.core.bulk.BulkServiceImpl;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.Computation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.stream.StreamProcessorTopology;
import org.nuxeo.runtime.transaction.TransactionHelper;

/* loaded from: input_file:org/nuxeo/ecm/core/bulk/actions/AbstractBulkAction.class */
public abstract class AbstractBulkAction implements StreamProcessorTopology {
    public static final String BATCH_SIZE_OPT = "batchSize";
    public static final String BATCH_THRESHOLD_MS_OPT = "batchThresholdMs";
    public static final int DEFAULT_BATCH_SIZE = 10;
    public static final int DEFAULT_BATCH_THRESHOLD_MS = 200;
    protected final String name;

    /* loaded from: input_file:org/nuxeo/ecm/core/bulk/actions/AbstractBulkAction$AbstractBulkComputation.class */
    protected static abstract class AbstractBulkComputation extends AbstractComputation {
        protected final List<String> documentIds;
        protected final int size;
        protected final int timer;
        protected String currentCommandId;
        protected BulkCommand currentCommand;

        public AbstractBulkComputation(String str, int i, int i2, int i3, int i4) {
            super(str, i, i2);
            this.documentIds = new ArrayList(i3);
            this.timer = i4;
            this.size = i3;
        }

        public void init(ComputationContext computationContext) {
            getLog().debug(String.format("Starting computation: %s, size: %d, timer: %dms", this.metadata.name(), Integer.valueOf(this.size), Integer.valueOf(this.timer)));
            computationContext.setTimer("timer", System.currentTimeMillis() + this.timer);
        }

        public void processRecord(ComputationContext computationContext, String str, Record record) {
            String commandIdFrom = BulkRecords.commandIdFrom(record);
            if (this.currentCommandId == null) {
                loadCurrentBulkCommandContext(commandIdFrom);
            } else if (!this.currentCommandId.equals(commandIdFrom)) {
                processBatch(computationContext);
                this.documentIds.clear();
                loadCurrentBulkCommandContext(commandIdFrom);
            }
            this.documentIds.addAll(BulkRecords.docIdsFrom(record));
            if (this.documentIds.size() >= this.size) {
                processBatch(computationContext);
            }
        }

        public void processTimer(ComputationContext computationContext, String str, long j) {
            processBatch(computationContext);
            computationContext.setTimer("timer", System.currentTimeMillis() + this.timer);
        }

        public void destroy() {
            getLog().debug(String.format("Destroy computation: %s, pending entries: %d", this.metadata.name(), Integer.valueOf(this.documentIds.size())));
        }

        protected Log getLog() {
            return LogFactory.getLog(getClass());
        }

        protected void loadCurrentBulkCommandContext(String str) {
            this.currentCommandId = str;
            this.currentCommand = (BulkCommand) BulkCodecs.getBulkCommandCodec().decode(((KeyValueService) Framework.getService(KeyValueService.class)).getKeyValueStore("bulk").get(str + BulkServiceImpl.COMMAND));
        }

        protected void processBatch(ComputationContext computationContext) {
            if (this.documentIds.isEmpty()) {
                return;
            }
            TransactionHelper.runInTransaction(() -> {
                try {
                    LoginContext loginAsUser = Framework.loginAsUser(this.currentCommand.getUsername());
                    try {
                        CloseableCoreSession openCoreSession = CoreInstance.openCoreSession(this.currentCommand.getRepository());
                        Throwable th = null;
                        try {
                            try {
                                compute(openCoreSession, this.documentIds, this.currentCommand.getParams());
                                if (openCoreSession != null) {
                                    if (0 != 0) {
                                        try {
                                            openCoreSession.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        openCoreSession.close();
                                    }
                                }
                                loginAsUser.logout();
                            } finally {
                            }
                        } catch (Throwable th3) {
                            if (openCoreSession != null) {
                                if (th != null) {
                                    try {
                                        openCoreSession.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    openCoreSession.close();
                                }
                            }
                            throw th3;
                        }
                    } catch (Throwable th5) {
                        loginAsUser.logout();
                        throw th5;
                    }
                } catch (LoginException e) {
                    throw new NuxeoException(e);
                }
            });
            computationContext.produceRecord("o1", this.currentCommandId, BulkCodecs.getBulkCounterCodec().encode(new BulkCounter(this.currentCommandId, this.documentIds.size())));
            this.documentIds.clear();
            computationContext.askForCheckpoint();
        }

        protected abstract void compute(CoreSession coreSession, List<String> list, Map<String, Serializable> map);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractBulkAction(String str) {
        this.name = str;
    }

    public Topology getTopology(Map<String, String> map) {
        int optionAsInteger = getOptionAsInteger(map, BATCH_SIZE_OPT, 10);
        int optionAsInteger2 = getOptionAsInteger(map, BATCH_THRESHOLD_MS_OPT, DEFAULT_BATCH_THRESHOLD_MS);
        return Topology.builder().addComputation(() -> {
            return createComputation(optionAsInteger, optionAsInteger2);
        }, getIOs()).build();
    }

    protected List<String> getIOs() {
        return Arrays.asList("i1:" + getActionName(), "o1:counter");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getActionName() {
        return this.name;
    }

    protected abstract Computation createComputation(int i, int i2);

    protected int getOptionAsInteger(Map<String, String> map, String str, int i) {
        String str2 = map.get(str);
        return str2 == null ? i : Integer.parseInt(str2);
    }
}
