package org.nuxeo.runtime.stream;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager;
import org.nuxeo.lib.stream.log.kafka.KafkaLogManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.kafka.KafkaConfigService;
import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.model.DefaultComponent;

/* loaded from: input_file:org/nuxeo/runtime/stream/StreamServiceImpl.class */
public class StreamServiceImpl extends DefaultComponent implements StreamService {
    public static final String NUXEO_STREAM_DIR_PROP = "nuxeo.stream.chronicle.dir";
    public static final String NUXEO_STREAM_RET_DURATION_PROP = "nuxeo.stream.chronicle.retention.duration";
    protected static final String LOG_CONFIG_XP = "logConfig";
    protected static final String STREAM_PROCESSOR_XP = "streamProcessor";
    private static final Log log = LogFactory.getLog(StreamServiceImpl.class);
    protected final Map<String, LogConfigDescriptor> configs = new HashMap();
    protected final Map<String, LogManager> managers = new HashMap();
    protected final Map<String, StreamProcessor> processors = new HashMap();
    protected final Map<String, StreamProcessorDescriptor> processorDescriptors = new HashMap();

    /* loaded from: input_file:org/nuxeo/runtime/stream/StreamServiceImpl$ComponentsLifeCycleListener.class */
    protected class ComponentsLifeCycleListener extends ComponentManager.LifeCycleHandler {
        protected ComponentsLifeCycleListener() {
        }

        public void afterStart(ComponentManager componentManager, boolean z) {
            StreamServiceImpl.this.startComputations();
        }

        public void beforeStop(ComponentManager componentManager, boolean z) {
            StreamServiceImpl.this.stopComputations();
            Framework.getRuntime().getComponentManager().removeListener(this);
        }
    }

    public int getApplicationStartedOrder() {
        return -590;
    }

    @Override // org.nuxeo.runtime.stream.StreamService
    public LogManager getLogManager(String str) {
        if (!this.managers.containsKey(str)) {
            if (!this.configs.containsKey(str)) {
                throw new IllegalArgumentException("Unknown logConfig: " + str);
            }
            LogConfigDescriptor logConfigDescriptor = this.configs.get(str);
            if (logConfigDescriptor.isKafkaLog()) {
                this.managers.put(str, createKafkaLogManager(logConfigDescriptor));
            } else {
                this.managers.put(str, createChronicleLogManager(logConfigDescriptor));
            }
        }
        return this.managers.get(str);
    }

    protected LogManager createKafkaLogManager(LogConfigDescriptor logConfigDescriptor) {
        String option = logConfigDescriptor.getOption(KafkaConfigServiceImpl.KAFKA_CONFIG_XP, "default");
        KafkaConfigService kafkaConfigService = (KafkaConfigService) Framework.getService(KafkaConfigService.class);
        return new KafkaLogManager(kafkaConfigService.getZkServers(option), kafkaConfigService.getTopicPrefix(option), kafkaConfigService.getProducerProperties(option), kafkaConfigService.getConsumerProperties(option));
    }

    protected LogManager createChronicleLogManager(LogConfigDescriptor logConfigDescriptor) {
        return new ChronicleLogManager(getChroniclePath(logConfigDescriptor.getOption("basePath", null), logConfigDescriptor.getOption("directory", logConfigDescriptor.getName())), getChronicleRetention(logConfigDescriptor.getOption("retention", null)));
    }

    protected String getChronicleRetention(String str) {
        return str != null ? str : Framework.getProperty(NUXEO_STREAM_RET_DURATION_PROP, "4d");
    }

    protected Path getChroniclePath(String str, String str2) {
        if (str != null) {
            return Paths.get(str, str2).toAbsolutePath();
        }
        String property = Framework.getProperty(NUXEO_STREAM_DIR_PROP);
        if (property != null) {
            return Paths.get(property, str2).toAbsolutePath();
        }
        String property2 = Framework.getProperty("nuxeo.data.dir");
        return property2 != null ? Paths.get(property2, "stream", str2).toAbsolutePath() : Paths.get(Framework.getRuntime().getHome().getAbsolutePath(), "data", "stream", str2).toAbsolutePath();
    }

    protected void createStreamIfNotExists(String str, LogConfigDescriptor logConfigDescriptor) {
        if (logConfigDescriptor.getLogsToCreate().isEmpty()) {
            return;
        }
        LogManager logManager = getLogManager(str);
        logConfigDescriptor.getLogsToCreate().forEach((str2, num) -> {
            log.info("Create if not exists stream: " + str2 + " with manager: " + str);
            logManager.createIfNotExists(str2, num.intValue());
        });
    }

    public void start(ComponentContext componentContext) {
        super.start(componentContext);
        this.configs.forEach(this::createStreamIfNotExists);
        this.processorDescriptors.forEach(this::initProcessor);
        Framework.getRuntime().getComponentManager().addListener(new ComponentsLifeCycleListener());
    }

    protected void initProcessor(String str, StreamProcessorDescriptor streamProcessorDescriptor) {
        if (this.processors.containsKey(str)) {
            log.error("Processor already initialized: " + str);
            return;
        }
        log.info("Init Stream processor: " + str + " with manager: " + streamProcessorDescriptor.config);
        LogManager logManager = getLogManager(streamProcessorDescriptor.config);
        Topology topology = streamProcessorDescriptor.getTopology();
        StreamProcessor logStreamProcessor = new LogStreamProcessor(logManager);
        Settings settings = streamProcessorDescriptor.getSettings();
        if (log.isDebugEnabled()) {
            log.debug("Starting computation topology: " + str + "\n" + topology.toPlantuml(settings));
        }
        logStreamProcessor.init(topology, settings);
        this.processors.put(str, logStreamProcessor);
    }

    public void stop(ComponentContext componentContext) throws InterruptedException {
        super.stop(componentContext);
        stopComputations();
        closeLogManagers();
    }

    protected void startComputations() {
        this.processorDescriptors.keySet().forEach(str -> {
            StreamProcessor streamProcessor = this.processors.get(str);
            if (streamProcessor != null) {
                streamProcessor.start();
            }
        });
    }

    protected void stopComputations() {
        this.processors.forEach((str, streamProcessor) -> {
            streamProcessor.stop(Duration.ofSeconds(1L));
        });
        this.processors.clear();
    }

    protected void closeLogManagers() {
        this.managers.values().stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).forEach((v0) -> {
            v0.close();
        });
        this.managers.clear();
    }

    public void registerContribution(Object obj, String str, ComponentInstance componentInstance) {
        if (str.equals(LOG_CONFIG_XP)) {
            LogConfigDescriptor logConfigDescriptor = (LogConfigDescriptor) obj;
            this.configs.put(logConfigDescriptor.name, logConfigDescriptor);
            log.debug(String.format("Register logConfig: %s", logConfigDescriptor.name));
        } else if (str.equals(STREAM_PROCESSOR_XP)) {
            StreamProcessorDescriptor streamProcessorDescriptor = (StreamProcessorDescriptor) obj;
            this.processorDescriptors.put(streamProcessorDescriptor.name, streamProcessorDescriptor);
            log.debug(String.format("Register Stream StreamProcessorTopologyProcessor: %s", streamProcessorDescriptor.name));
        }
    }

    public void unregisterContribution(Object obj, String str, ComponentInstance componentInstance) {
        if (str.equals(LOG_CONFIG_XP)) {
            LogConfigDescriptor logConfigDescriptor = (LogConfigDescriptor) obj;
            this.configs.remove(logConfigDescriptor.name);
            log.debug(String.format("Unregister logConfig: %s", logConfigDescriptor.name));
        } else if (str.equals(STREAM_PROCESSOR_XP)) {
            StreamProcessorDescriptor streamProcessorDescriptor = (StreamProcessorDescriptor) obj;
            this.processorDescriptors.remove(streamProcessorDescriptor.name);
            log.debug(String.format("Unregister Stream StreamProcessorTopologyProcessor: %s", streamProcessorDescriptor.name));
        }
    }
}
