/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ai.pipes.streams;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.metrics.MetricsService;
import org.nuxeo.runtime.metrics.NuxeoMetricSet;

/**
 * Builds a stream {@link Topology} made of a single computation that applies a {@link Function} to every record
 * read from one input stream and writes the optional result to zero or more output streams.
 */
public class FunctionStreamProcessor {
    /** Key of the option holding the input stream name. */
    public static final String STREAM_IN = "source";
    /** Key of the option holding the comma-separated output stream names. */
    public static final String STREAM_OUT = "sink";
    private static final Log log = LogFactory.getLog(FunctionStreamProcessor.class);

    /**
     * Builds the list of stream mappings for a computation with one input and any number of outputs.
     * The input is emitted as {@code i1:<name>} and each output as {@code o1:<name>}, {@code o2:<name>}, ... in the
     * order they appear in {@code streamOut}.
     *
     * @param streamIn the input stream name, must not be blank
     * @param streamOut comma-separated output stream names, may be {@code null} or blank when there is no output
     * @return the mappings, input first; output names are trimmed and blank segments are skipped
     * @throws IllegalArgumentException if {@code streamIn} is blank
     */
    public static List<String> getStreamsList(String streamIn, String streamOut) {
        if (StringUtils.isBlank(streamIn)) {
            throw new IllegalArgumentException("You must define a streamIn stream");
        }
        List<String> streams = new ArrayList<>();
        streams.add("i1:" + streamIn.trim());
        if (StringUtils.isNotBlank(streamOut)) {
            int index = 1;
            for (String outStream : streamOut.split(",")) {
                // Tolerate "a, b" and "a,,b": whitespace or an empty segment must not become part of a stream name.
                String trimmed = outStream.trim();
                if (trimmed.isEmpty()) {
                    continue;
                }
                streams.add("o" + index++ + ":" + trimmed);
            }
        }
        return streams;
    }

    /**
     * Builds a name of the form {@code <streamIn>$<simpleName>$<streamOut>}.
     * A {@code null} stream is left out together with its {@code $} separator; an empty (non-null) stream still
     * contributes its separator.
     *
     * @param simpleName the central part of the name
     * @param streamIn the input stream name, or {@code null}
     * @param streamOut the output stream name(s), or {@code null}
     * @return the assembled name
     */
    public static String buildName(String simpleName, String streamIn, String streamOut) {
        String prefix = streamIn != null ? streamIn + "$" : "";
        String suffix = streamOut != null ? "$" + streamOut : "";
        return prefix + simpleName + suffix;
    }

    /**
     * Creates a topology containing one {@link FunctionComputation} wired to the streams named in the options.
     *
     * @param function the function applied to each record; an empty result means nothing is written
     * @param options must contain {@link #STREAM_IN}; {@link #STREAM_OUT} is optional
     * @return the topology
     * @throws IllegalArgumentException if the {@link #STREAM_IN} option is missing or blank
     */
    public Topology getTopology(Function<Record, Optional<Record>> function, Map<String, String> options) {
        String streamIn = options.get(STREAM_IN);
        String streamOut = options.get(STREAM_OUT);
        List<String> streams = getStreamsList(streamIn, streamOut);
        String computationName = buildName(function.getClass().getSimpleName(), streamIn, streamOut);
        FunctionMetrics metrics = registerMetrics(new FunctionMetrics(computationName), computationName);
        // The first entry is the single input; everything after it is an output stream.
        int outputStreams = streams.size() - 1;
        return Topology.builder()
                .addComputation(() -> new FunctionComputation(outputStreams, computationName, metrics, function), streams)
                .build();
    }

    /**
     * Registers a metric set in the shared registry named after {@link MetricsService}.
     * A registration failure reported as {@link IllegalArgumentException} is logged as a warning and otherwise
     * ignored, so the caller always gets its metric set back.
     *
     * @param metrics the metric set to register
     * @param name a label used only in the warning message
     * @param <T> the concrete metric set type
     * @return the {@code metrics} argument
     */
    public static <T extends NuxeoMetricSet> T registerMetrics(T metrics, String name) {
        try {
            MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
            registry.registerAll(metrics);
        } catch (IllegalArgumentException e) {
            // Keep the cause so the log shows which metric name was rejected.
            log.warn(String.format("Metrics are already registered for %s. They will only be recorded again after a full restart.", name), e);
        }
        return metrics;
    }

    /**
     * Counters for a function computation, exposed as the gauges {@code called}, {@code errors} and
     * {@code produced}.
     * <p>
     * The instance created in {@link FunctionStreamProcessor#getTopology} is captured by the computation supplier,
     * so every computation built from that supplier updates the same counters, and the gauges may be read from yet
     * another thread. The fields are therefore {@code volatile} and the increments are synchronized.
     */
    public static class FunctionMetrics
    extends NuxeoMetricSet {
        protected volatile long called = 0L;
        protected volatile long errors = 0L;
        protected volatile long produced = 0L;

        /**
         * @param name the last segment of the metric name prefix
         */
        public FunctionMetrics(String name) {
            super("nuxeo", new String[]{"ai", "streams", "func", name});
            this.putGauge(() -> this.called, "called", new String[0]);
            this.putGauge(() -> this.errors, "errors", new String[0]);
            this.putGauge(() -> this.produced, "produced", new String[0]);
        }

        /** Counts one processed record. */
        public synchronized void called() {
            ++this.called;
        }

        /** Counts one discarded record. */
        public synchronized void error() {
            ++this.errors;
        }

        /** Counts one record that was handed to the output streams. */
        public synchronized void produced() {
            ++this.produced;
        }
    }

    /**
     * Computation with one input stream that applies a function to each record and writes a present result to
     * every declared output stream.
     */
    public static class FunctionComputation
    extends AbstractComputation {
        protected final FunctionMetrics metrics;
        protected final Function<Record, Optional<Record>> function;

        /**
         * @param outputStreams the number of output streams
         * @param name the computation name
         * @param metrics the counters updated while processing
         * @param function the function applied to each record
         */
        public FunctionComputation(int outputStreams, String name, FunctionMetrics metrics, Function<Record, Optional<Record>> function) {
            super(name, 1, outputStreams);
            this.metrics = metrics;
            this.function = function;
        }

        /** Logs the start of the computation at debug level. */
        public void init(ComputationContext context) {
            log.debug(String.format("Starting computation for %s", this.metadata.name()));
        }

        /**
         * Applies the function to the record, writes a present result to the output streams and asks the context
         * for a checkpoint. A {@link NuxeoException} raised on the way is logged, counted as an error and the record
         * is discarded; no checkpoint is requested in that case.
         */
        public void processRecord(ComputationContext context, String inputStreamName, Record record) {
            this.metrics.called();
            if (log.isDebugEnabled()) {
                log.debug("Processing record " + record);
            }
            try {
                Optional<Record> applied = this.function.apply(record);
                applied.ifPresent(rec -> this.writeToStreams(context, rec));
                context.askForCheckpoint();
            } catch (NuxeoException e) {
                log.error("Discard invalid record: " + record, e);
                this.metrics.error();
            }
        }

        /** Logs the end of the computation at debug level. */
        public void destroy() {
            log.debug(String.format("Destroy computation: %s", this.metadata.name()));
        }

        /**
         * Writes the record to every output stream of this computation. Nothing happens, and nothing is counted,
         * when the record is {@code null} or the computation has no output stream. The {@code produced} counter is
         * incremented once per record, not once per stream.
         */
        protected void writeToStreams(ComputationContext context, Record record) {
            if (record != null && !this.metadata.outputStreams().isEmpty()) {
                this.metrics.produced();
                this.metadata.outputStreams().forEach(o -> context.produceRecord(o, record));
            }
        }
    }
}

