package org.nuxeo.ecm.core.bulk.action.computation;

import com.google.common.collect.Lists;
import com.ibm.icu.text.PluralRules;
import java.io.Serializable;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.ecm.core.api.CloseableCoreSession;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModelList;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.BulkService;
import org.nuxeo.ecm.core.bulk.message.BulkBucket;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

/* loaded from: input_file:org/nuxeo/ecm/core/bulk/action/computation/AbstractBulkComputation.class */
public abstract class AbstractBulkComputation extends AbstractComputation {
    private static final Logger log = LogManager.getLogger((Class<?>) AbstractBulkComputation.class);
    protected static final String SELECT_DOCUMENTS_IN = "SELECT * FROM Document, Relation WHERE ecm:uuid IN ('%s')";
    protected Map<String, BulkCommand> commands;
    protected BulkCommand command;
    protected BulkStatus delta;

    public AbstractBulkComputation(String str) {
        this(str, 1);
    }

    public AbstractBulkComputation(String str, int i) {
        super(str, 1, i);
        this.commands = new PassiveExpiringMap(60L, TimeUnit.SECONDS);
    }

    @Override // org.nuxeo.lib.stream.computation.Computation
    public void processRecord(ComputationContext computationContext, String str, Record record) {
        BulkBucket decode = BulkCodecs.getBucketCodec().decode(record.getData());
        this.command = getCommand(decode.getCommandId());
        if (this.command == null) {
            if (isAbortedCommand(decode.getCommandId())) {
                log.debug("Skipping aborted command: {}", decode.getCommandId());
                computationContext.askForCheckpoint();
                return;
            } else {
                log.error("Stopping processing, unknown command: {}, offset: {}, record: {}.", decode.getCommandId(), computationContext.getLastOffset(), record);
                computationContext.askForTermination();
                return;
            }
        }
        this.delta = BulkStatus.deltaOf(this.command.getId());
        this.delta.setProcessingStartTime(Instant.now());
        this.delta.setProcessed(decode.getIds().size());
        startBucket(record.getKey());
        Iterator it = Lists.partition(decode.getIds(), this.command.getBatchSize()).iterator();
        while (it.hasNext()) {
            processBatchOfDocuments((List) it.next());
        }
        this.delta.setProcessingEndTime(Instant.now());
        endBucket(computationContext, this.delta);
        computationContext.askForCheckpoint();
    }

    protected boolean isAbortedCommand(String str) {
        return BulkStatus.State.ABORTED.equals(((BulkService) Framework.getService(BulkService.class)).getStatus(str).getState());
    }

    protected BulkCommand getCommand(String str) {
        this.commands.size();
        return this.commands.computeIfAbsent(str, str2 -> {
            return ((BulkService) Framework.getService(BulkService.class)).getCommand(str2);
        });
    }

    public BulkCommand getCurrentCommand() {
        return this.command;
    }

    protected void processBatchOfDocuments(List<String> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        TransactionHelper.runInTransaction(() -> {
            try {
                LoginContext loginAsUser = Framework.loginAsUser(this.command.getUsername());
                try {
                    CloseableCoreSession openCoreSession = CoreInstance.openCoreSession(this.command.getRepository());
                    Throwable th = null;
                    try {
                        try {
                            compute(openCoreSession, list, this.command.getParams());
                            if (openCoreSession != null) {
                                if (0 != 0) {
                                    try {
                                        openCoreSession.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    openCoreSession.close();
                                }
                            }
                            loginAsUser.logout();
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (openCoreSession != null) {
                            if (th != null) {
                                try {
                                    openCoreSession.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                openCoreSession.close();
                            }
                        }
                        throw th3;
                    }
                } catch (Throwable th5) {
                    loginAsUser.logout();
                    throw th5;
                }
            } catch (LoginException e) {
                throw new NuxeoException(e);
            }
        });
    }

    public void startBucket(String str) {
    }

    public void endBucket(ComputationContext computationContext, BulkStatus bulkStatus) {
        updateStatus(computationContext, bulkStatus);
    }

    @Override // org.nuxeo.lib.stream.computation.AbstractComputation, org.nuxeo.lib.stream.computation.Computation
    public void processFailure(ComputationContext computationContext, Throwable th) {
        log.error(String.format("Action: %s fails on record: %s after retries.", this.metadata.name(), computationContext.getLastOffset()), th);
        this.delta.inError(this.metadata.name() + " fails on " + computationContext.getLastOffset() + PluralRules.KEYWORD_RULE_SEPARATOR + th.getMessage());
        endBucket(computationContext, this.delta);
    }

    public static void updateStatus(ComputationContext computationContext, BulkStatus bulkStatus) {
        computationContext.produceRecord(AbstractComputation.OUTPUT_1, bulkStatus.getId(), BulkCodecs.getStatusCodec().encode(bulkStatus));
    }

    protected abstract void compute(CoreSession coreSession, List<String> list, Map<String, Serializable> map);

    public DocumentModelList loadDocuments(CoreSession coreSession, List<String> list) {
        return (list == null || list.isEmpty()) ? new DocumentModelListImpl(0) : coreSession.query(String.format(SELECT_DOCUMENTS_IN, String.join("', '", list)));
    }
}
