package org.nuxeo.importer.stream.automation;

import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import net.jodah.failsafe.RetryPolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.automation.OperationContext;
import org.nuxeo.ecm.automation.OperationException;
import org.nuxeo.ecm.automation.core.annotations.Context;
import org.nuxeo.ecm.automation.core.annotations.Operation;
import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
import org.nuxeo.ecm.automation.core.annotations.Param;
import org.nuxeo.importer.stream.consumer.RedisDocumentMessageConsumerFactory;
import org.nuxeo.lib.stream.pattern.consumer.BatchPolicy;
import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy;
import org.nuxeo.lib.stream.pattern.consumer.ConsumerPool;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.stream.StreamService;

/**
 * Automation operation that starts a {@link ConsumerPool} reading from a log and handing each
 * message to consumers created by a {@link RedisDocumentMessageConsumerFactory}.
 * <p>
 * The operation blocks until the pool's future completes, then closes the pool.
 */
@Operation(id = RedisDocumentConsumers.ID, category = "Services", label = "Imports document into Redis", since = "10.1", description = "Import documents into Redis.")
public class RedisDocumentConsumers {
    private static final Log log = LogFactory.getLog(RedisDocumentConsumers.class);

    /** Identifier of this operation; also used as the consumer policy name. */
    public static final String ID = "StreamImporter.runRedisDocumentConsumers";

    @Context
    protected OperationContext ctx;

    /** Value passed to the policy's {@code maxThreads}; 0 is passed when unset. */
    @Param(name = "nbThreads", required = false)
    protected Integer nbThreads;

    /** Prefix handed as-is to the {@link RedisDocumentMessageConsumerFactory}; may be null. */
    @Param(name = "redisPrefix", required = false)
    protected String redisPrefix;

    /** Name of the log to consume; falls back to {@code RandomDocumentProducers.DEFAULT_DOC_LOG_NAME}. */
    @Param(name = "logName", required = false)
    protected String logName;

    /** Name of the log configuration to look up in the {@link StreamService}; falls back to "default". */
    @Param(name = "logConfig", required = false)
    protected String logConfig;

    /** Maximum number of retries configured on the retry policy. */
    @Param(name = "retryMax", required = false)
    protected Integer retryMax = 3;

    /** Delay between retries, in seconds. */
    @Param(name = "retryDelayS", required = false)
    protected Integer retryDelayS = 2;

    /** Value, in seconds, passed to the policy's {@code waitMessageTimeout}. */
    @Param(name = "waitMessageTimeoutSeconds", required = false)
    protected Integer waitMessageTimeoutSeconds = 20;

    /**
     * Builds the consumer policy from the operation parameters, starts the consumer pool and waits
     * for its completion.
     *
     * @throws OperationException if the pool's future completes exceptionally
     * @throws RuntimeException if the calling thread is interrupted while waiting; the interrupt
     *         flag is restored before throwing
     */
    @OperationMethod
    public void run() throws OperationException {
        RandomBlobProducers.checkAccess(this.ctx);
        RetryPolicy retryPolicy = new RetryPolicy().withMaxRetries(this.retryMax.intValue())
                                                   .withDelay(this.retryDelayS.intValue(), TimeUnit.SECONDS);
        ConsumerPolicy consumerPolicy = ConsumerPolicy.builder()
                                                      .name(ID)
                                                      .batchPolicy(BatchPolicy.NO_BATCH)
                                                      .retryPolicy(retryPolicy)
                                                      .maxThreads(getNbThreads())
                                                      .waitMessageTimeout(Duration.ofSeconds(this.waitMessageTimeoutSeconds.intValue()))
                                                      .build();
        log.warn(String.format("Import documents into Redis from log: %s, with policy: %s", getLogName(),
                consumerPolicy));
        StreamService streamService = (StreamService) Framework.getService(StreamService.class);
        // try-with-resources guarantees the pool is closed even when start()/get() fails;
        // the previous hand-rolled form only closed it on the success path.
        try (ConsumerPool consumerPool = new ConsumerPool(getLogName(), streamService.getLogManager(getLogConfig()),
                new RedisDocumentMessageConsumerFactory(this.redisPrefix), consumerPolicy)) {
            consumerPool.start().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("Operation interrupted");
            throw new RuntimeException(e);
        } catch (ExecutionException e2) {
            log.error("fail", e2);
            throw new OperationException(e2);
        }
    }

    /**
     * Returns the thread count to give to the consumer policy.
     *
     * @return {@code nbThreads} narrowed to a short, or 0 when the parameter is not set
     */
    protected short getNbThreads() {
        if (this.nbThreads != null) {
            // NOTE(review): shortValue() silently truncates values outside the short range.
            return this.nbThreads.shortValue();
        }
        return (short) 0;
    }

    /**
     * Returns the name of the log to consume.
     *
     * @return {@code logName} when set, otherwise {@code RandomDocumentProducers.DEFAULT_DOC_LOG_NAME}
     */
    protected String getLogName() {
        return this.logName != null ? this.logName : RandomDocumentProducers.DEFAULT_DOC_LOG_NAME;
    }

    /**
     * Returns the name of the log configuration to use.
     *
     * @return {@code logConfig} when set, otherwise "default"
     */
    protected String getLogConfig() {
        return this.logConfig != null ? this.logConfig : "default";
    }
}
