package org.nuxeo.runtime.stream;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.common.Environment;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
import org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender;
import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager;
import org.nuxeo.lib.stream.log.kafka.KafkaLogManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;
import org.nuxeo.runtime.kafka.KafkaConfigService;
import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.model.DefaultComponent;

/* loaded from: input_file:org/nuxeo/runtime/stream/StreamServiceImpl.class */
/**
 * Runtime component implementing {@link StreamService}.
 * <p>
 * Log managers are created lazily, one per {@code logConfig} descriptor, and cached in {@link #managers}. Stream
 * processors are initialized from the {@code streamProcessor} descriptors when the component starts, then started and
 * stopped by a {@link ComponentsLifeCycleListener}.
 */
public class StreamServiceImpl extends DefaultComponent implements StreamService {

    private static final Logger log = LogManager.getLogger(StreamServiceImpl.class);

    /** Framework property read for the Chronicle base directory when a log config has no {@code basePath} option. */
    public static final String NUXEO_STREAM_DIR_PROP = "nuxeo.stream.chronicle.dir";

    /** Framework property read for the Chronicle retention when a log config has no retention option. */
    public static final String NUXEO_STREAM_RET_DURATION_PROP = "nuxeo.stream.chronicle.retention.duration";

    /** Codec name used when a stream processor descriptor declares no default codec. */
    public static final String DEFAULT_CODEC = "avro";

    /** Name of the extension point holding the log manager configurations. */
    protected static final String XP_LOG_CONFIG = "logConfig";

    /** Name of the extension point holding the stream processor definitions. */
    protected static final String XP_STREAM_PROCESSOR = "streamProcessor";

    /** Log managers already created, keyed by log config id. */
    protected final Map<String, org.nuxeo.lib.stream.log.LogManager> managers = new HashMap<>();

    /** Stream processors already initialized, keyed by processor descriptor id. */
    protected final Map<String, StreamProcessor> processors = new HashMap<>();

    /**
     * Component manager listener that starts the computations in {@link #afterStart} and stops them in
     * {@link #beforeStop}.
     */
    protected class ComponentsLifeCycleListener implements ComponentManager.Listener {

        /** Starts every initialized stream processor. */
        @Override
        public void afterStart(ComponentManager componentManager, boolean z) {
            startComputations();
        }

        /** Stops every stream processor, then removes this listener from the component manager. */
        @Override
        public void beforeStop(ComponentManager componentManager, boolean z) {
            stopComputations();
            Framework.getRuntime().getComponentManager().removeListener(this);
        }
    }

    /**
     * @return the fixed application started order of this component, {@code -590}
     */
    @Override
    public int getApplicationStartedOrder() {
        return -590;
    }

    /**
     * Returns the log manager registered under the given log config id, creating and caching it on first access.
     * <p>
     * A Kafka log manager is created when the descriptor type is {@code kafka} (case-insensitive); any other type
     * yields a Chronicle log manager.
     *
     * @param str the id of the {@code logConfig} descriptor
     * @return the cached or newly created log manager
     * @throws IllegalArgumentException if no {@code logConfig} descriptor exists for this id
     */
    @Override // org.nuxeo.runtime.stream.StreamService
    public org.nuxeo.lib.stream.log.LogManager getLogManager(String str) {
        if (!managers.containsKey(str)) {
            LogConfigDescriptor config = (LogConfigDescriptor) getDescriptor(XP_LOG_CONFIG, str);
            if (config == null) {
                throw new IllegalArgumentException("Unknown logConfig: " + str);
            }
            org.nuxeo.lib.stream.log.LogManager manager = "kafka".equalsIgnoreCase(config.type)
                    ? createKafkaLogManager(config)
                    : createChronicleLogManager(config);
            managers.put(str, manager);
        }
        return managers.get(str);
    }

    /**
     * Creates a Kafka log manager from the Kafka configuration named by the descriptor options, falling back to the
     * configuration named {@code default}.
     *
     * @param logConfigDescriptor the log configuration to build the manager from
     * @return a new Kafka log manager
     */
    protected org.nuxeo.lib.stream.log.LogManager createKafkaLogManager(LogConfigDescriptor logConfigDescriptor) {
        String kafkaConfig = logConfigDescriptor.options.getOrDefault(KafkaConfigServiceImpl.XP_KAFKA_CONFIG,
                "default");
        KafkaConfigService kafkaService = (KafkaConfigService) Framework.getService(KafkaConfigService.class);
        return new KafkaLogManager(kafkaService.getTopicPrefix(kafkaConfig),
                kafkaService.getProducerProperties(kafkaConfig), kafkaService.getConsumerProperties(kafkaConfig));
    }

    /**
     * Creates a Chronicle log manager using the {@code basePath}, {@code directory} and retention options of the
     * descriptor. The directory name defaults to the descriptor id.
     *
     * @param logConfigDescriptor the log configuration to build the manager from
     * @return a new Chronicle log manager
     */
    protected org.nuxeo.lib.stream.log.LogManager createChronicleLogManager(LogConfigDescriptor logConfigDescriptor) {
        String basePath = logConfigDescriptor.options.getOrDefault("basePath", null);
        String directory = logConfigDescriptor.options.getOrDefault("directory", logConfigDescriptor.getId());
        Path path = getChroniclePath(basePath, directory);
        String retention = getChronicleRetention(
                logConfigDescriptor.options.getOrDefault(ChronicleLogAppender.RETENTION_KEY, null));
        return new ChronicleLogManager(path, retention);
    }

    /**
     * Resolves the Chronicle retention.
     *
     * @param str the retention configured on the log config, may be {@code null}
     * @return {@code str} when not null, otherwise the {@link #NUXEO_STREAM_RET_DURATION_PROP} framework property,
     *         itself defaulting to {@code 4d}
     */
    protected String getChronicleRetention(String str) {
        if (str != null) {
            return str;
        }
        return Framework.getProperty(NUXEO_STREAM_RET_DURATION_PROP, "4d");
    }

    /**
     * Resolves the absolute Chronicle storage path. The first available base wins, in this order: the explicit base
     * path, the {@link #NUXEO_STREAM_DIR_PROP} property, {@code <data dir>/stream}, then
     * {@code <runtime home>/data/stream}.
     *
     * @param str the base path configured on the log config, may be {@code null}
     * @param str2 the directory name appended to the base
     * @return the absolute path of the Chronicle directory
     */
    protected Path getChroniclePath(String str, String str2) {
        if (str != null) {
            return Paths.get(str, str2).toAbsolutePath();
        }
        String streamDir = Framework.getProperty(NUXEO_STREAM_DIR_PROP);
        if (streamDir != null) {
            return Paths.get(streamDir, str2).toAbsolutePath();
        }
        String dataDir = Framework.getProperty(Environment.NUXEO_DATA_DIR);
        if (dataDir != null) {
            return Paths.get(dataDir, "stream", str2).toAbsolutePath();
        }
        String home = Framework.getRuntime().getHome().getAbsolutePath();
        return Paths.get(home, "data", "stream", str2).toAbsolutePath();
    }

    /**
     * Creates every stream listed by the log config that does not exist yet. No log manager is requested when the
     * config lists no stream.
     *
     * @param logConfigDescriptor the log configuration whose streams must exist
     */
    protected void createStreamIfNotExists(LogConfigDescriptor logConfigDescriptor) {
        if (logConfigDescriptor.logs.isEmpty()) {
            return;
        }
        org.nuxeo.lib.stream.log.LogManager manager = getLogManager(logConfigDescriptor.getId());
        logConfigDescriptor.logs.forEach(stream -> {
            log.info("Create if not exists stream: {} with manager: {}", stream.getId(),
                    logConfigDescriptor.getId());
            manager.createIfNotExists(stream.getId(), stream.size.intValue());
        });
    }

    /**
     * Starts the component: creates the configured streams, initializes the stream processors and installs the
     * listener that starts and stops the computations.
     */
    @Override
    public void start(ComponentContext componentContext) {
        super.start(componentContext);
        // NOTE(review): the explicit type arguments assume DefaultComponent#getDescriptors is generic in its element
        // type; the method references below need the concrete descriptor type to type-check -- confirm.
        this.<LogConfigDescriptor> getDescriptors(XP_LOG_CONFIG).forEach(this::createStreamIfNotExists);
        this.<StreamProcessorDescriptor> getDescriptors(XP_STREAM_PROCESSOR).forEach(this::initProcessor);
        new ComponentsLifeCycleListener().install();
    }

    /**
     * Instantiates the topology class of the descriptor, initializes a {@link LogStreamProcessor} with it and
     * registers the processor under the descriptor id. Logs an error and does nothing when a processor with the same
     * id is already registered.
     *
     * @param streamProcessorDescriptor the processor definition
     * @throws StreamRuntimeException if the topology class cannot be instantiated reflectively
     */
    protected void initProcessor(StreamProcessorDescriptor streamProcessorDescriptor) {
        String processorId = streamProcessorDescriptor.getId();
        if (processors.containsKey(processorId)) {
            log.error("Processor already initialized: {}", processorId);
            return;
        }
        log.info("Init Stream processor: {} with manager: {}", processorId, streamProcessorDescriptor.config);
        org.nuxeo.lib.stream.log.LogManager manager = getLogManager(streamProcessorDescriptor.config);
        try {
            Topology topology = streamProcessorDescriptor.klass.getDeclaredConstructor()
                                                               .newInstance()
                                                               .getTopology(streamProcessorDescriptor.options);
            LogStreamProcessor processor = new LogStreamProcessor(manager);
            Settings settings = getSettings(streamProcessorDescriptor);
            // Suppliers keep the PlantUML rendering from running unless debug logging is enabled.
            log.debug("Starting computation topology: {}\n{}", streamProcessorDescriptor::getId,
                    () -> topology.toPlantuml(settings));
            processor.init(topology, settings);
            processors.put(streamProcessorDescriptor.getId(), processor);
        } catch (ReflectiveOperationException e) {
            throw new StreamRuntimeException(
                    "Can not create topology for processor: " + streamProcessorDescriptor.getId(), e);
        }
    }

    /**
     * Builds the computation settings of a processor: default concurrency, partitions, codec and policy, followed by
     * the per-computation concurrency, per-computation policy, per-stream partitions and per-stream codec overrides.
     * The default codec is {@link #DEFAULT_CODEC} when the descriptor declares none.
     *
     * @param streamProcessorDescriptor the processor definition
     * @return the settings to initialize the processor with
     */
    protected Settings getSettings(StreamProcessorDescriptor streamProcessorDescriptor) {
        CodecService codecService = (CodecService) Framework.getService(CodecService.class);
        Settings settings = new Settings(streamProcessorDescriptor.defaultConcurrency.intValue(),
                streamProcessorDescriptor.defaultPartitions.intValue(),
                streamProcessorDescriptor.defaultCodec == null
                        ? codecService.getCodec(DEFAULT_CODEC, Record.class)
                        : codecService.getCodec(streamProcessorDescriptor.defaultCodec, Record.class),
                streamProcessorDescriptor.getDefaultPolicy());
        streamProcessorDescriptor.computations.forEach(
                computation -> settings.setConcurrency(computation.name, computation.concurrency.intValue()));
        streamProcessorDescriptor.policies.forEach(
                policy -> settings.setPolicy(policy.name, streamProcessorDescriptor.getPolicy(policy.name)));
        streamProcessorDescriptor.streams.forEach(
                stream -> settings.setPartitions(stream.name, stream.partitions.intValue()));
        // Only streams that declare their own codec override the default one.
        streamProcessorDescriptor.streams.stream()
                                         .filter(stream -> stream.codec != null)
                                         .forEach(stream -> settings.setCodec(stream.name,
                                                 codecService.getCodec(stream.codec, Record.class)));
        return settings;
    }

    /**
     * Stops the component: stops the computations, then closes and forgets every log manager.
     */
    @Override
    public void stop(ComponentContext componentContext) throws InterruptedException {
        super.stop(componentContext);
        stopComputations();
        closeLogManagers();
    }

    /**
     * Starts the initialized processor of every {@code streamProcessor} descriptor; descriptors without a registered
     * processor are skipped.
     */
    protected void startComputations() {
        getDescriptors(XP_STREAM_PROCESSOR).forEach(descriptor -> {
            StreamProcessor processor = processors.get(descriptor.getId());
            if (processor != null) {
                processor.start();
            }
        });
    }

    /**
     * Stops every registered processor with a one second timeout and empties the processor registry.
     */
    protected void stopComputations() {
        processors.forEach((id, processor) -> processor.stop(Duration.ofSeconds(1L)));
        processors.clear();
    }

    /**
     * Closes every non-null cached log manager and empties the cache.
     */
    protected void closeLogManagers() {
        managers.values()
                .stream()
                .filter(Objects::nonNull)
                .forEach(org.nuxeo.lib.stream.log.LogManager::close);
        managers.clear();
    }
}
