/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ai.pipes.services;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.SharedMetricRegistries;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ai.pipes.consumers.LogAppenderConsumer;
import org.nuxeo.ai.pipes.events.DirtyEventListener;
import org.nuxeo.ai.pipes.events.DynamicEventListenerDescriptor;
import org.nuxeo.ai.pipes.events.EventConsumer;
import org.nuxeo.ai.pipes.functions.BinaryTextListener;
import org.nuxeo.ai.pipes.services.BinaryTextDescriptor;
import org.nuxeo.ai.pipes.services.PipeDescriptor;
import org.nuxeo.ai.pipes.services.PipelineService;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.event.EventListener;
import org.nuxeo.ecm.core.event.EventService;
import org.nuxeo.ecm.core.event.impl.EventListenerDescriptor;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.MetricsService;
import org.nuxeo.runtime.metrics.NuxeoMetricSet;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.stream.LogConfigDescriptor;
import org.nuxeo.runtime.stream.StreamService;

public class PipelineServiceImpl
extends DefaultComponent
implements PipelineService {
    public static final String ROUTE_AP = "pipes";
    public static final String TEXT_AP = "text";
    public static final String PIPES_CONFIG = "nuxeo.ai.stream.config.name";
    private static final Log log = LogFactory.getLog(PipelineServiceImpl.class);
    protected final Map<String, PipeDescriptor> configs = new HashMap<String, PipeDescriptor>();
    protected final List<BinaryTextDescriptor> textConfigs = new ArrayList<BinaryTextDescriptor>();
    protected final List<EventListenerDescriptor> listenerDescriptors = new ArrayList<EventListenerDescriptor>();
    protected final Map<String, LogAppenderConsumer> logAppenderConsumers = new HashMap<String, LogAppenderConsumer>();
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate((String)MetricsService.class.getName());
    private String pipeConfigName;

    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        if (ROUTE_AP.equals(extensionPoint)) {
            PipeDescriptor descriptor = (PipeDescriptor)contribution;
            PipeDescriptor original = this.configs.get(descriptor.id);
            if (original != null) {
                original.merge(descriptor);
                descriptor = original;
            }
            descriptor.validate();
            this.configs.put(descriptor.id, descriptor);
        } else if (TEXT_AP.equals(extensionPoint)) {
            BinaryTextDescriptor descriptor = (BinaryTextDescriptor)contribution;
            this.textConfigs.add(descriptor);
        }
    }

    public void start(ComponentContext context) {
        super.start(context);
        this.pipeConfigName = Framework.getProperty((String)PIPES_CONFIG, (String)ROUTE_AP);
        this.configs.forEach((key, value) -> this.addPipe((PipeDescriptor)value));
        this.textConfigs.forEach(d -> d.consumer.streams.forEach(s -> this.addBinaryTextListener(d.eventName, s.name, s.size, d.propertyName, d.windowSize)));
    }

    public void stop(ComponentContext context) throws InterruptedException {
        super.stop(context);
        EventService eventService = (EventService)Framework.getService(EventService.class);
        this.listenerDescriptors.forEach(arg_0 -> ((EventService)eventService).removeEventListener(arg_0));
        this.logAppenderConsumers.values().forEach(LogAppenderConsumer::close);
    }

    protected List<Consumer<Record>> getConsumers(PipeDescriptor descriptor) {
        ArrayList<Consumer<Record>> consumers = new ArrayList<Consumer<Record>>();
        List<LogConfigDescriptor.StreamDescriptor> streams = descriptor.consumer.streams;
        streams.forEach(s -> consumers.add(this.addLogConsumer(s.name, s.size)));
        return consumers;
    }

    @Override
    public void addPipe(PipeDescriptor descriptor) {
        if (descriptor != null && descriptor.enabled) {
            descriptor.supplier.events.forEach(e -> {
                List<Consumer<Record>> consumers = this.getConsumers(descriptor);
                if (descriptor.hasDirtyCheckFilter((PipeDescriptor.PipeEvent)e)) {
                    this.addDirtyCheckListener("beforeDocumentModification", e.options);
                }
                consumers.forEach(consumer -> {
                    if (log.isDebugEnabled()) {
                        log.debug((Object)String.format("Listening for %s event and sending it to %s", e, consumer.toString()));
                    }
                    this.addEventPipe(e.name, descriptor.id, descriptor.getFunction((PipeDescriptor.PipeEvent)e), descriptor.isAsync, (Consumer)consumer);
                });
            });
        }
    }

    protected void addDirtyCheckListener(String eventName, Map<String, String> options) {
        DirtyEventListener dirtyEventListener = new DirtyEventListener(options);
        this.addEventListener(eventName, false, dirtyEventListener);
    }

    @Override
    public void addBinaryTextListener(String eventName, String logName, int partitions, String propertyName, int windowSizeSeconds) {
        this.addLogConsumer(logName, partitions);
        BinaryTextListener textListener = new BinaryTextListener(logName, propertyName, windowSizeSeconds);
        this.addEventListener(eventName, false, textListener);
    }

    @Override
    public Consumer<Record> getConsumer(String logName) {
        return this.logAppenderConsumers.get(logName);
    }

    @Override
    public <R> void addEventPipe(String eventName, String supplierId, Function<Event, Collection<R>> eventFunction, boolean isAsync, Consumer<R> consumer) {
        EventConsumer<R> eventConsumer = new EventConsumer<R>(eventFunction, consumer);
        NuxeoMetricSet pipeMetrics = new NuxeoMetricSet("nuxeo", new String[]{"ai", "streams", eventName, supplierId});
        try {
            eventConsumer.withMetrics(pipeMetrics);
            this.registry.registerAll((MetricSet)pipeMetrics);
        }
        catch (IllegalArgumentException e) {
            log.warn((Object)String.format("Metrics are already registered for %s %s, do you have a duplicate definition?", eventName, supplierId));
        }
        this.addEventListener(eventName, isAsync, eventConsumer);
    }

    @Override
    public void addEventListener(String eventName, boolean isAsync, EventListener eventConsumer) {
        EventService eventService = (EventService)Framework.getService(EventService.class);
        DynamicEventListenerDescriptor listenerDescriptor = new DynamicEventListenerDescriptor(eventName, eventConsumer, isAsync);
        this.listenerDescriptors.add(listenerDescriptor);
        eventService.addEventListener((EventListenerDescriptor)listenerDescriptor);
    }

    protected LogAppenderConsumer addLogConsumer(String logName, int size) {
        LogManager manager = ((StreamService)Framework.getService(StreamService.class)).getLogManager(this.pipeConfigName);
        manager.createIfNotExists(logName, size);
        CloseableLogAppender appender = (CloseableLogAppender)manager.getAppender(logName);
        LogAppenderConsumer consumer = new LogAppenderConsumer((CloseableLogAppender<Record>)appender);
        this.logAppenderConsumers.put(logName, consumer);
        return consumer;
    }
}

