package org.nuxeo.ai.pipes.streams;

import com.codahale.metrics.SharedMetricRegistries;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.metrics.MetricsService;
import org.nuxeo.runtime.metrics.NuxeoMetricSet;

/* loaded from: input_file:org/nuxeo/ai/pipes/streams/FunctionStreamProcessor.class */
public class FunctionStreamProcessor {
    public static final String STREAM_IN = "source";
    public static final String STREAM_OUT = "sink";
    private static final Log log = LogFactory.getLog(FunctionStreamProcessor.class);

    /* loaded from: input_file:org/nuxeo/ai/pipes/streams/FunctionStreamProcessor$FunctionComputation.class */
    public static class FunctionComputation extends AbstractComputation {
        protected final FunctionMetrics metrics;
        protected final Function<Record, Optional<Record>> function;

        public FunctionComputation(int i, String str, FunctionMetrics functionMetrics, Function<Record, Optional<Record>> function) {
            super(str, 1, i);
            this.metrics = functionMetrics;
            this.function = function;
        }

        public void init(ComputationContext computationContext) {
            FunctionStreamProcessor.log.debug(String.format("Starting computation for %s", this.metadata.name()));
        }

        public void processRecord(ComputationContext computationContext, String str, Record record) {
            this.metrics.called();
            if (FunctionStreamProcessor.log.isDebugEnabled()) {
                FunctionStreamProcessor.log.debug("Processing record " + record);
            }
            try {
                this.function.apply(record).ifPresent(record2 -> {
                    writeToStreams(computationContext, record2);
                });
                computationContext.askForCheckpoint();
            } catch (NuxeoException e) {
                FunctionStreamProcessor.log.error("Discard invalid record: " + record, e);
                this.metrics.error();
            }
        }

        public void destroy() {
            FunctionStreamProcessor.log.debug(String.format("Destroy computation: %s", this.metadata.name()));
        }

        protected void writeToStreams(ComputationContext computationContext, Record record) {
            if (record == null || this.metadata.outputStreams().isEmpty()) {
                return;
            }
            this.metrics.produced();
            this.metadata.outputStreams().forEach(str -> {
                computationContext.produceRecord(str, record);
            });
        }
    }

    /* loaded from: input_file:org/nuxeo/ai/pipes/streams/FunctionStreamProcessor$FunctionMetrics.class */
    public static class FunctionMetrics extends NuxeoMetricSet {
        protected long called;
        protected long errors;
        protected long produced;

        public FunctionMetrics(String str) {
            super("nuxeo", new String[]{"ai", "streams", "func", str});
            this.called = 0L;
            this.errors = 0L;
            this.produced = 0L;
            putGauge(() -> {
                return Long.valueOf(this.called);
            }, "called", new String[0]);
            putGauge(() -> {
                return Long.valueOf(this.errors);
            }, "errors", new String[0]);
            putGauge(() -> {
                return Long.valueOf(this.produced);
            }, "produced", new String[0]);
        }

        public void called() {
            this.called++;
        }

        public void error() {
            this.errors++;
        }

        public void produced() {
            this.produced++;
        }
    }

    public static List<String> getStreamsList(String str, String str2) {
        ArrayList arrayList = new ArrayList(1);
        if (StringUtils.isBlank(str)) {
            throw new IllegalArgumentException("You must define a streamIn stream");
        }
        arrayList.add("i1:" + str);
        if (StringUtils.isNotBlank(str2)) {
            int i = 1;
            for (String str3 : str2.split(",")) {
                int i2 = i;
                i++;
                arrayList.add("o" + i2 + ":" + str3);
            }
        }
        return arrayList;
    }

    public static String buildName(String str, String str2, String str3) {
        return (str2 != null ? str2 + "$" : "") + str + (str3 != null ? "$" + str3 : "");
    }

    public Topology getTopology(Function<Record, Optional<Record>> function, Map<String, String> map) {
        String str = map.get(STREAM_IN);
        String str2 = map.get(STREAM_OUT);
        List<String> streamsList = getStreamsList(str, str2);
        String buildName = buildName(function.getClass().getSimpleName(), str, str2);
        FunctionMetrics functionMetrics = (FunctionMetrics) registerMetrics(new FunctionMetrics(buildName), buildName);
        return Topology.builder().addComputation(() -> {
            return new FunctionComputation(streamsList.size() - 1, buildName, functionMetrics, function);
        }, streamsList).build();
    }

    public static <T extends NuxeoMetricSet> T registerMetrics(T t, String str) {
        try {
            SharedMetricRegistries.getOrCreate(MetricsService.class.getName()).registerAll(t);
        } catch (IllegalArgumentException e) {
            log.warn(String.format("Metrics are already registered for %s. They will only be recorded again after a full restart.", str));
        }
        return t;
    }
}
