package org.nuxeo.ecm.platform.mqueues.importer.automation;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import net.jodah.failsafe.RetryPolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.automation.OperationContext;
import org.nuxeo.ecm.automation.core.annotations.Context;
import org.nuxeo.ecm.automation.core.annotations.Operation;
import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
import org.nuxeo.ecm.automation.core.annotations.Param;
import org.nuxeo.ecm.platform.mqueues.MQService;
import org.nuxeo.ecm.platform.mqueues.audit.AuditLogWriter;
import org.nuxeo.ecm.platform.mqueues.importer.consumer.BlobInfoWriter;
import org.nuxeo.ecm.platform.mqueues.importer.consumer.BlobMessageConsumerFactory;
import org.nuxeo.ecm.platform.mqueues.importer.consumer.MQBlobInfoWriter;
import org.nuxeo.lib.core.mqueues.mqueues.MQManager;
import org.nuxeo.lib.core.mqueues.pattern.consumer.BatchPolicy;
import org.nuxeo.lib.core.mqueues.pattern.consumer.ConsumerPolicy;
import org.nuxeo.lib.core.mqueues.pattern.consumer.ConsumerPool;
import org.nuxeo.runtime.api.Framework;

/**
 * Automation operation that starts a {@link ConsumerPool} reading blob messages from an mqueue and waits for the
 * pool to finish.
 * <p>
 * Every parameter is optional. Unset queue and configuration names fall back to the defaults exposed by
 * {@link #getMQName()}, {@link #getMQBlobInfoName()} and {@link #getMQConfig()}.
 */
@Operation(id = BlobConsumers.ID, category = "Services", label = "Import blobs", since = "9.1", description = "Import mqueues blob into the binarystore.")
public class BlobConsumers {
    private static final Log log = LogFactory.getLog(BlobConsumers.class);

    /** Identifier under which this operation is registered; also used as the consumer policy name. */
    public static final String ID = "MQImporter.runBlobConsumers";

    /** Name of the blob-info mqueue used when the {@code mqBlobInfo} parameter is not supplied. */
    public static final String DEFAULT_MQ_BLOB_INFO_NAME = "mq-blob-info";

    /** Name of the mqueue configuration used when the {@code mqConfig} parameter is not supplied. */
    public static final String DEFAULT_MQ_CONFIG = "import";

    @Context
    protected OperationContext ctx;

    /** Requested number of consumer threads; {@code null} makes {@link #getNbThreads()} return 0. */
    @Param(name = "nbThreads", required = false)
    protected Integer nbThreads;

    /** Blob provider name handed to the {@link BlobMessageConsumerFactory}. */
    @Param(name = "blobProviderName", required = false)
    protected String blobProviderName = "default";

    // NOTE(review): the parameter name comes from AuditLogWriter.BATCH_SIZE_OPT; confirm this coupling with the
    // audit module is intended and not an artifact of constant inlining.
    /** Capacity given to the {@link BatchPolicy}. */
    @Param(name = AuditLogWriter.BATCH_SIZE_OPT, required = false)
    protected Integer batchSize = 10;

    /** Time threshold of the {@link BatchPolicy}, in seconds. */
    @Param(name = "batchThresholdS", required = false)
    protected Integer batchThresholdS = 20;

    /** Maximum number of retries configured on the {@link RetryPolicy}. */
    @Param(name = "retryMax", required = false)
    protected Integer retryMax = 3;

    /** Delay configured on the {@link RetryPolicy}, in seconds. */
    @Param(name = "retryDelayS", required = false)
    protected Integer retryDelayS = 2;

    /** Name of the mqueue to consume; see {@link #getMQName()} for the fallback. */
    @Param(name = "mqName", required = false)
    protected String mqName;

    /** Name of the mqueue receiving blob information; see {@link #getMQBlobInfoName()} for the fallback. */
    @Param(name = "mqBlobInfo", required = false)
    protected String mqBlobInfoName;

    /** Name of the mqueue configuration; see {@link #getMQConfig()} for the fallback. */
    @Param(name = "mqConfig", required = false)
    protected String mqConfig;

    /**
     * Builds the consumer policy from the operation parameters, starts the consumer pool and waits on the result
     * returned by {@code start()}.
     * <p>
     * Access is first delegated to {@code RandomBlobProducers.checkAccess}. Any exception raised while creating the
     * blob-info writer or running the pool is logged and not propagated to the caller.
     */
    @OperationMethod
    public void run() {
        RandomBlobProducers.checkAccess(ctx);

        BatchPolicy batchPolicy = BatchPolicy.builder()
                .capacity(batchSize)
                .timeThreshold(Duration.ofSeconds(batchThresholdS))
                .build();
        RetryPolicy retryPolicy = new RetryPolicy().withMaxRetries(retryMax)
                .withDelay(retryDelayS, TimeUnit.SECONDS);
        ConsumerPolicy consumerPolicy = ConsumerPolicy.builder()
                .name(ID)
                .batchPolicy(batchPolicy)
                .retryPolicy(retryPolicy)
                .maxThreads(getNbThreads())
                .build();

        MQService service = Framework.getService(MQService.class);
        MQManager manager = service.getManager(getMQConfig());

        // try-with-resources guarantees the writer is closed even when the pool fails; the previous hand-rolled
        // version only closed it on the success path.
        try (BlobInfoWriter blobInfoWriter = getBlobInfoWriter(manager)) {
            BlobMessageConsumerFactory factory = new BlobMessageConsumerFactory(blobProviderName, blobInfoWriter);
            // Raw type kept on purpose: the message type is not imported in this file.
            new ConsumerPool(getMQName(), manager, factory, consumerPolicy).start().get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Makes sure the blob-info mqueue exists and returns a writer appending to it.
     *
     * @param mQManager the manager owning the blob-info mqueue
     * @return a new {@link MQBlobInfoWriter} bound to the appender of the blob-info mqueue
     */
    protected BlobInfoWriter getBlobInfoWriter(MQManager mQManager) {
        initBlobInfoMQ(mQManager);
        return new MQBlobInfoWriter(mQManager.getAppender(getMQBlobInfoName()));
    }

    /**
     * Creates the blob-info mqueue if it does not exist yet.
     *
     * @param mQManager the manager used to create the mqueue
     */
    protected void initBlobInfoMQ(MQManager mQManager) {
        // The second argument is passed as 1; presumably the mqueue size/partition count - TODO confirm.
        mQManager.createIfNotExists(getMQBlobInfoName(), 1);
    }

    /**
     * Returns the requested thread count narrowed to a {@code short}.
     *
     * @return {@code nbThreads} as a short, or 0 when the parameter was not supplied
     */
    protected short getNbThreads() {
        if (nbThreads == null) {
            return 0;
        }
        // NOTE(review): values outside the short range are silently truncated by shortValue().
        return nbThreads.shortValue();
    }

    /**
     * @return the configured mqueue name, or {@code RandomBlobProducers.DEFAULT_MQ_NAME} when unset
     */
    protected String getMQName() {
        return mqName != null ? mqName : RandomBlobProducers.DEFAULT_MQ_NAME;
    }

    /**
     * @return the configured blob-info mqueue name, or {@link #DEFAULT_MQ_BLOB_INFO_NAME} when unset
     */
    protected String getMQBlobInfoName() {
        return mqBlobInfoName != null ? mqBlobInfoName : DEFAULT_MQ_BLOB_INFO_NAME;
    }

    /**
     * @return the configured mqueue configuration name, or {@link #DEFAULT_MQ_CONFIG} when unset
     */
    protected String getMQConfig() {
        return mqConfig != null ? mqConfig : DEFAULT_MQ_CONFIG;
    }
}
