package org.nuxeo.ai.pipes.services;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ai.pipes.consumers.LogAppenderConsumer;
import org.nuxeo.ai.pipes.events.DirtyEventListener;
import org.nuxeo.ai.pipes.events.DynamicEventListenerDescriptor;
import org.nuxeo.ai.pipes.events.EventConsumer;
import org.nuxeo.ai.pipes.functions.BinaryTextListener;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.event.EventListener;
import org.nuxeo.ecm.core.event.EventService;
import org.nuxeo.ecm.core.event.impl.EventListenerDescriptor;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;
import org.nuxeo.runtime.metrics.MetricsService;
import org.nuxeo.runtime.metrics.NuxeoMetricSet;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.stream.StreamService;

/* loaded from: input_file:org/nuxeo/ai/pipes/services/PipelineServiceImpl.class */
/**
 * Runtime component implementing {@link PipelineService}.
 * <p>
 * Contributions to the {@value #ROUTE_AP} and {@value #TEXT_AP} extension points are collected at
 * registration time. When the component starts, each contribution is turned into event listeners
 * registered on the {@link EventService} and into {@link LogAppenderConsumer}s that append
 * {@link Record}s to streams. When the component stops, the listeners are removed and the
 * consumers are closed.
 */
public class PipelineServiceImpl extends DefaultComponent implements PipelineService {
    /** Extension point for pipe descriptors; also the default value of {@link #PIPES_CONFIG}. */
    public static final String ROUTE_AP = "pipes";

    /** Extension point for binary text descriptors. */
    public static final String TEXT_AP = "text";

    /** Framework property holding the name passed to {@link StreamService#getLogManager}. */
    public static final String PIPES_CONFIG = "nuxeo.ai.stream.config.name";

    private static final Log log = LogFactory.getLog(PipelineServiceImpl.class);

    /** Pipe descriptors keyed by their id; descriptors sharing an id are merged. */
    protected final Map<String, PipeDescriptor> configs = new HashMap<>();

    /** Binary text descriptors, in registration order. */
    protected final List<BinaryTextDescriptor> textConfigs = new ArrayList<>();

    /** Every listener descriptor this component registered, so that {@code stop} can remove them. */
    protected final List<EventListenerDescriptor> listenerDescriptors = new ArrayList<>();

    /** Log consumers keyed by stream name; the most recently created consumer wins. */
    protected final Map<String, LogAppenderConsumer> logAppenderConsumers = new HashMap<>();

    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    /** Log manager configuration name, resolved from {@link #PIPES_CONFIG} on start. */
    private String pipeConfigName;

    /**
     * Stores a contribution for later activation in {@code start}.
     * <p>
     * A pipe descriptor whose id is already known is merged into the existing descriptor; the
     * resulting descriptor is validated before being stored. Contributions to any other extension
     * point than {@link #ROUTE_AP} or {@link #TEXT_AP} are ignored.
     *
     * @param obj the contributed descriptor
     * @param str the extension point name
     * @param componentInstance the contributing component (unused)
     */
    public void registerContribution(Object obj, String str, ComponentInstance componentInstance) {
        if (ROUTE_AP.equals(str)) {
            PipeDescriptor contributed = (PipeDescriptor) obj;
            PipeDescriptor existing = this.configs.get(contributed.id);
            PipeDescriptor effective = contributed;
            if (existing != null) {
                existing.merge(contributed);
                effective = existing;
            }
            effective.validate();
            this.configs.put(effective.id, effective);
        } else if (TEXT_AP.equals(str)) {
            this.textConfigs.add((BinaryTextDescriptor) obj);
        }
    }

    /**
     * Resolves the log manager configuration name, then activates every registered pipe and
     * binary text descriptor.
     *
     * @param componentContext the component context, forwarded to the superclass
     */
    public void start(ComponentContext componentContext) {
        super.start(componentContext);
        this.pipeConfigName = Framework.getProperty(PIPES_CONFIG, ROUTE_AP);
        this.configs.values().forEach(this::addPipe);
        this.textConfigs.forEach(textConfig -> {
            // One listener (and one log consumer) per stream declared on the descriptor.
            textConfig.consumer.streams.forEach(stream -> {
                addBinaryTextListener(textConfig.eventName, stream.name, stream.size.intValue(),
                        textConfig.propertyName, textConfig.inputPropertyName, textConfig.windowSize);
            });
        });
    }

    /**
     * Removes every listener registered by this component and closes the tracked log consumers.
     * <p>
     * The tracking collections are cleared afterwards so that a later {@code start} begins from a
     * clean state instead of accumulating stale descriptors and closed consumers.
     *
     * @param componentContext the component context, forwarded to the superclass
     * @throws InterruptedException declared for the superclass call
     */
    public void stop(ComponentContext componentContext) throws InterruptedException {
        super.stop(componentContext);
        EventService eventService = (EventService) Framework.getService(EventService.class);
        if (eventService != null) {
            this.listenerDescriptors.forEach(eventService::removeEventListener);
        } else {
            // Still release the consumers below even though the listeners cannot be removed.
            log.warn("EventService is not available, cannot remove pipeline event listeners");
        }
        this.listenerDescriptors.clear();
        this.logAppenderConsumers.values().forEach(LogAppenderConsumer::close);
        this.logAppenderConsumers.clear();
    }

    /**
     * Creates one log consumer per stream declared on the given pipe descriptor.
     *
     * @param pipeDescriptor the pipe whose consumer streams are used
     * @return the newly created consumers, in stream declaration order
     */
    protected List<Consumer<Record>> getConsumers(PipeDescriptor pipeDescriptor) {
        List<Consumer<Record>> consumers = new ArrayList<>();
        pipeDescriptor.consumer.streams.forEach(stream -> {
            consumers.add(addLogConsumer(stream.name, stream.size.intValue()));
        });
        return consumers;
    }

    /**
     * Activates a pipe: for each supplier event and each consumer stream, registers an event pipe
     * that sends the transformed event to the stream. Adds a dirty-check listener for events that
     * declare a dirty-check filter.
     * <p>
     * Does nothing when the descriptor is {@code null} or disabled.
     *
     * @param pipeDescriptor the pipe to activate
     */
    @Override // org.nuxeo.ai.pipes.services.PipelineService
    public void addPipe(PipeDescriptor pipeDescriptor) {
        if (pipeDescriptor == null || !pipeDescriptor.enabled) {
            return;
        }
        pipeDescriptor.supplier.events.forEach(pipeEvent -> {
            // Consumers are created per event, before the optional dirty-check listener.
            List<Consumer<Record>> consumers = getConsumers(pipeDescriptor);
            if (pipeDescriptor.hasDirtyCheckFilter(pipeEvent)) {
                addDirtyCheckListener(DirtyEventListener.DIRTY_EVENT_NAME, pipeEvent.options);
            }
            consumers.forEach(consumer -> {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Listening for %s event and sending it to %s", pipeEvent, consumer.toString()));
                }
                addEventPipe(pipeEvent.name, pipeDescriptor.id, pipeDescriptor.getFunction(pipeEvent),
                        pipeDescriptor.isAsync.booleanValue(), consumer);
            });
        });
    }

    /**
     * Registers a synchronous {@link DirtyEventListener} for the given event.
     *
     * @param str the event name to listen to
     * @param map the options handed to the listener
     */
    protected void addDirtyCheckListener(String str, Map<String, String> map) {
        addEventListener(str, false, new DirtyEventListener(map));
    }

    /**
     * Ensures a log consumer exists for the stream, then registers a synchronous
     * {@link BinaryTextListener} for the event.
     *
     * @param str the event name to listen to
     * @param str2 the stream name
     * @param i the size used when creating the stream
     * @param str3 the property name given to the listener
     * @param str4 the input property name given to the listener
     * @param i2 the window size given to the listener
     */
    @Override // org.nuxeo.ai.pipes.services.PipelineService
    public void addBinaryTextListener(String str, String str2, int i, String str3, String str4, int i2) {
        addLogConsumer(str2, i);
        addEventListener(str, false, new BinaryTextListener(str2, str3, str4, i2));
    }

    /**
     * Looks up the log consumer tracked for a stream.
     *
     * @param str the stream name
     * @return the consumer, or {@code null} when none is tracked under that name
     */
    @Override // org.nuxeo.ai.pipes.services.PipelineService
    public Consumer<Record> getConsumer(String str) {
        return this.logAppenderConsumers.get(str);
    }

    /**
     * Registers a listener that applies {@code function} to each event and hands the results to
     * {@code consumer}, with metrics named after the event and pipe id.
     * <p>
     * When the metrics are already registered, a warning is logged and the listener is still added.
     *
     * @param str the event name to listen to
     * @param str2 the pipe id, used as the last segment of the metric name
     * @param function transforms an event into the items to consume
     * @param z whether the listener is asynchronous
     * @param consumer receives the items produced by {@code function}
     */
    @Override // org.nuxeo.ai.pipes.services.PipelineService
    public <R> void addEventPipe(String str, String str2, Function<Event, Collection<R>> function, boolean z, Consumer<R> consumer) {
        EventConsumer eventConsumer = new EventConsumer(function, consumer);
        NuxeoMetricSet metrics = new NuxeoMetricSet("nuxeo", new String[]{"ai", "streams", str, str2});
        try {
            eventConsumer.withMetrics(metrics);
            this.registry.registerAll(metrics);
        } catch (IllegalArgumentException e) {
            // Keep the cause so the offending metric name is visible in the log.
            log.warn(String.format("Metrics are already registered for %s %s, do you have a duplicate definition?", str, str2), e);
        }
        addEventListener(str, z, eventConsumer);
    }

    /**
     * Wraps the listener in a {@link DynamicEventListenerDescriptor}, tracks it for removal on
     * stop and registers it on the {@link EventService}.
     *
     * @param str the event name to listen to
     * @param z whether the listener is asynchronous
     * @param eventListener the listener to register
     */
    @Override // org.nuxeo.ai.pipes.services.PipelineService
    public void addEventListener(String str, boolean z, EventListener eventListener) {
        EventService eventService = (EventService) Framework.getService(EventService.class);
        DynamicEventListenerDescriptor descriptor = new DynamicEventListenerDescriptor(str, eventListener, z);
        this.listenerDescriptors.add(descriptor);
        eventService.addEventListener(descriptor);
    }

    /**
     * Creates the stream if needed and returns a new consumer appending Avro-encoded records to it.
     * The consumer is tracked under the stream name, replacing any previous one.
     *
     * @param str the stream name
     * @param i the size used when the stream has to be created
     * @return the newly created consumer
     */
    protected LogAppenderConsumer addLogConsumer(String str, int i) {
        StreamService streamService = (StreamService) Framework.getService(StreamService.class);
        LogManager logManager = streamService.getLogManager(this.pipeConfigName);
        logManager.createIfNotExists(str, i);
        CodecService codecService = (CodecService) Framework.getService(CodecService.class);
        LogAppenderConsumer created = new LogAppenderConsumer(
                logManager.getAppender(str, codecService.getCodec("avro", Record.class)));
        // NOTE(review): a consumer already tracked under this name is replaced without being
        // closed; confirm whether it shares its appender with the new one before closing it here.
        this.logAppenderConsumers.put(str, created);
        return created;
    }
}
