package org.nuxeo.ecm.platform.mqueues;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.mqueues.kafka.KafkaConfigService;
import org.nuxeo.lib.core.mqueues.computation.ComputationManager;
import org.nuxeo.lib.core.mqueues.computation.Settings;
import org.nuxeo.lib.core.mqueues.computation.Topology;
import org.nuxeo.lib.core.mqueues.computation.mqueue.MQComputationManager;
import org.nuxeo.lib.core.mqueues.mqueues.MQManager;
import org.nuxeo.lib.core.mqueues.mqueues.chronicle.ChronicleMQManager;
import org.nuxeo.lib.core.mqueues.mqueues.kafka.KafkaMQManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.model.DefaultComponent;

/* loaded from: input_file:org/nuxeo/ecm/platform/mqueues/MQServiceImpl.class */
public class MQServiceImpl extends DefaultComponent implements MQService {
    private static final Log log = LogFactory.getLog(MQServiceImpl.class);
    public static final String NUXEO_MQUEUE_DIR_PROP = "nuxeo.mqueue.chronicle.dir";
    public static final String NUXEO_MQUEUE_RET_DURATION_PROP = "nuxeo.mqueue.chronicle.retention.duration";
    protected static final String MQ_CONFIG_XP = "config";
    protected static final String MQ_TOPOLOGY_XP = "topology";
    protected Map<String, ConfigDescriptor> configs = new HashMap();
    protected Map<String, MQManager> managers = new HashMap();
    protected Map<String, ComputationManager> computationManagers = new HashMap();
    protected Map<String, TopologyDescriptor> topologies = new HashMap();

    /* loaded from: input_file:org/nuxeo/ecm/platform/mqueues/MQServiceImpl$ComponentsLifeCycleListener.class */
    protected class ComponentsLifeCycleListener extends ComponentManager.LifeCycleHandler {
        protected ComponentsLifeCycleListener() {
        }

        public void afterStart(ComponentManager componentManager, boolean z) {
            MQServiceImpl.this.startComputations();
        }

        public void beforeStop(ComponentManager componentManager, boolean z) {
            MQServiceImpl.this.stopComputations();
        }
    }

    public int getApplicationStartedOrder() {
        return -520;
    }

    @Override // org.nuxeo.ecm.platform.mqueues.MQService
    public MQManager getManager(String str) {
        if (!this.managers.containsKey(str)) {
            if (!this.configs.containsKey(str)) {
                throw new IllegalArgumentException("Unknown MQ configuration: " + str);
            }
            ConfigDescriptor configDescriptor = this.configs.get(str);
            if ("kafka".equalsIgnoreCase(configDescriptor.getType())) {
                this.managers.put(str, createKafkaMQManager(configDescriptor));
            } else {
                this.managers.put(str, createChronicleMQManager(configDescriptor));
            }
        }
        return this.managers.get(str);
    }

    protected MQManager createKafkaMQManager(ConfigDescriptor configDescriptor) {
        String option = configDescriptor.getOption(MQ_CONFIG_XP, "default");
        KafkaConfigService kafkaConfigService = (KafkaConfigService) Framework.getService(KafkaConfigService.class);
        return new KafkaMQManager(kafkaConfigService.getZkServers(option), kafkaConfigService.getTopicPrefix(option), kafkaConfigService.getProducerProperties(option), kafkaConfigService.getConsumerProperties(option));
    }

    protected MQManager createChronicleMQManager(ConfigDescriptor configDescriptor) {
        return new ChronicleMQManager(getChroniclePath(configDescriptor.getOption("basePath", null), configDescriptor.getOption("directory", configDescriptor.getName())), getChronicleRetention(configDescriptor.getOption("retention", null)));
    }

    protected String getChronicleRetention(String str) {
        return str != null ? str : Framework.getProperty(NUXEO_MQUEUE_RET_DURATION_PROP, "4d");
    }

    protected Path getChroniclePath(String str, String str2) {
        if (str != null) {
            return Paths.get(str, str2).toAbsolutePath();
        }
        String property = Framework.getProperty(NUXEO_MQUEUE_DIR_PROP);
        if (property != null) {
            return Paths.get(property, str2).toAbsolutePath();
        }
        String property2 = Framework.getProperty("nuxeo.data.dir");
        return property2 != null ? Paths.get(property2, "mqueue", str2).toAbsolutePath() : Paths.get(Framework.getRuntime().getHome().getAbsolutePath(), "data", "mqueue", str2).toAbsolutePath();
    }

    protected void createMQueueIfNotExists(String str, ConfigDescriptor configDescriptor) {
        if (configDescriptor.getMQueuesToCreate().isEmpty()) {
            return;
        }
        MQManager manager = getManager(str);
        configDescriptor.getMQueuesToCreate().forEach((str2, num) -> {
            log.info("Create if not exists MQ: " + str2 + " with manager: " + str);
            manager.createIfNotExists(str2, num.intValue());
        });
    }

    public void start(ComponentContext componentContext) {
        super.start(componentContext);
        this.configs.forEach(this::createMQueueIfNotExists);
        this.topologies.forEach(this::initComputations);
        Framework.getRuntime().getComponentManager().addListener(new ComponentsLifeCycleListener());
    }

    protected void initComputations(String str, TopologyDescriptor topologyDescriptor) {
        if (this.computationManagers.containsKey(str)) {
            log.error("Computation topology already initialized: " + str);
            return;
        }
        log.warn("Init computation topology: " + str + " with manager: " + topologyDescriptor.config);
        MQManager manager = getManager(topologyDescriptor.config);
        try {
            Topology topology = topologyDescriptor.klass.newInstance().getTopology(topologyDescriptor.options);
            ComputationManager mQComputationManager = new MQComputationManager(manager);
            Settings settings = new Settings(topologyDescriptor.defaultConcurrency.intValue(), topologyDescriptor.defaultPartitions.intValue());
            topologyDescriptor.computations.forEach(computationDescriptor -> {
                settings.setConcurrency(computationDescriptor.name, computationDescriptor.concurrency.intValue());
            });
            topologyDescriptor.streams.forEach(streamDescriptor -> {
                settings.setPartitions(streamDescriptor.name, streamDescriptor.partitions.intValue());
            });
            if (log.isDebugEnabled()) {
                log.debug("Starting computation topology: " + str + "\n" + topology.toPlantuml(settings));
            }
            mQComputationManager.init(topology, settings);
            this.computationManagers.put(str, mQComputationManager);
        } catch (IllegalAccessException | InstantiationException e) {
            log.error("Can not create topology for " + str, e);
        }
    }

    public void stop(ComponentContext componentContext) throws InterruptedException {
        super.stop(componentContext);
        stopComputations();
        closeManagers();
    }

    protected void startComputations() {
        this.topologies.keySet().forEach(str -> {
            ComputationManager computationManager = this.computationManagers.get(str);
            if (computationManager != null) {
                computationManager.start();
            }
        });
    }

    protected void stopComputations() {
        this.computationManagers.forEach((str, computationManager) -> {
            computationManager.stop(Duration.ofSeconds(1L));
        });
        this.computationManagers.clear();
    }

    protected void closeManagers() {
        this.managers.forEach((str, mQManager) -> {
            try {
                mQManager.close();
            } catch (Exception e) {
                log.warn("Failed to close MQManager: " + str, e);
            }
        });
        this.managers.clear();
    }

    public void registerContribution(Object obj, String str, ComponentInstance componentInstance) {
        if (str.equals(MQ_CONFIG_XP)) {
            ConfigDescriptor configDescriptor = (ConfigDescriptor) obj;
            this.configs.put(configDescriptor.name, configDescriptor);
            log.info(String.format("Register MQ Config: %s", configDescriptor.name));
        } else if (str.equals(MQ_TOPOLOGY_XP)) {
            TopologyDescriptor topologyDescriptor = (TopologyDescriptor) obj;
            this.topologies.put(topologyDescriptor.name, topologyDescriptor);
            log.info(String.format("Register MQ Topology: %s", topologyDescriptor.name));
        }
    }
}
