package org.nuxeo.runtime.stream;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.nuxeo.common.xmap.annotation.XNode;
import org.nuxeo.common.xmap.annotation.XNodeList;
import org.nuxeo.common.xmap.annotation.XNodeMap;
import org.nuxeo.common.xmap.annotation.XObject;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.codec.CodecService;

/**
 * Descriptor bound to the {@code streamProcessor} element.
 * <p>
 * It carries the processor name, the class used to build its {@link Topology}, free-form options and the
 * per-computation / per-stream overrides that are turned into a {@link Settings} instance by
 * {@link #getSettings(CodecService)}.
 */
@XObject("streamProcessor")
public class StreamProcessorDescriptor {

    /** Initial value of every concurrency and partition field declared in this descriptor. */
    public static final Integer DEFAULT_CONCURRENCY = 4;

    /** Processor name, bound to the {@code @name} node. */
    @XNode("@name")
    public String name;

    /** Value bound to the {@code @logConfig} node. */
    @XNode("@logConfig")
    public String config;

    /** Class instantiated by {@link #getTopology()} through its no-argument constructor. */
    @XNode("@class")
    public Class<? extends StreamProcessorTopology> klass;

    /** Name of the codec handed to {@link Settings} as default; {@code null} means no default codec. */
    @XNode("@defaultCodec")
    public String defaultCodec;

    /** Concurrency passed to the {@link Settings} constructor. */
    @XNode("@defaultConcurrency")
    public Integer defaultConcurrency = DEFAULT_CONCURRENCY;

    /** Partition count passed to the {@link Settings} constructor. */
    @XNode("@defaultPartitions")
    public Integer defaultPartitions = DEFAULT_CONCURRENCY;

    /** Options keyed by the {@code @name} of each {@code option} node, forwarded as-is to the topology class. */
    @XNodeMap(value = "option", key = "@name", type = HashMap.class, componentType = String.class)
    public Map<String, String> options = new HashMap<>();

    /** Per-computation concurrency overrides. */
    @XNodeList(value = "computation", type = ArrayList.class, componentType = ComputationDescriptor.class)
    public List<ComputationDescriptor> computations = new ArrayList<>(0);

    /** Per-stream partition and codec overrides. */
    @XNodeList(value = "stream", type = ArrayList.class, componentType = StreamDescriptor.class)
    public List<StreamDescriptor> streams = new ArrayList<>(0);

    /** Concurrency override for a single named computation. */
    @XObject("computation")
    public static class ComputationDescriptor {

        /** Computation name used as key in {@link Settings#setConcurrency}. */
        @XNode("@name")
        public String name;

        /** Concurrency applied to the computation. */
        @XNode("@concurrency")
        public Integer concurrency = StreamProcessorDescriptor.DEFAULT_CONCURRENCY;
    }

    /** Partition count and optional codec override for a single named stream. */
    @XObject("stream")
    public static class StreamDescriptor {

        /** Stream name used as key in {@link Settings#setPartitions} and {@link Settings#setCodec}. */
        @XNode("@name")
        public String name;

        /** Partition count applied to the stream. */
        @XNode("@partitions")
        public Integer partitions = StreamProcessorDescriptor.DEFAULT_CONCURRENCY;

        /** Codec name for the stream; when {@code null} no codec is registered for it. */
        @XNode("@codec")
        public String codec;
    }

    /**
     * @return the processor name
     */
    public String getName() {
        return name;
    }

    /**
     * Builds the {@link Settings} described by this descriptor.
     * <p>
     * The settings start from the default concurrency, default partitions and default codec, then receive the
     * concurrency of every declared computation, the partitions of every declared stream and finally the codec of
     * each stream that declares one.
     *
     * @param codecService service used to resolve codec names into {@link Codec} instances
     * @return a new settings object
     */
    public Settings getSettings(CodecService codecService) {
        Settings settings = new Settings(defaultConcurrency.intValue(), defaultPartitions.intValue(),
                getDefaultCodec(codecService));
        for (ComputationDescriptor computation : computations) {
            settings.setConcurrency(computation.name, computation.concurrency.intValue());
        }
        for (StreamDescriptor stream : streams) {
            settings.setPartitions(stream.name, stream.partitions.intValue());
        }
        // Codecs are registered in a separate pass, after all partitions have been set.
        for (StreamDescriptor stream : streams) {
            if (stream.codec != null) {
                settings.setCodec(stream.name, codecService.getCodec(stream.codec, Record.class));
            }
        }
        return settings;
    }

    /**
     * Resolves the default codec of this processor.
     *
     * @param codecService service used to look the codec up by name
     * @return the codec registered under {@link #defaultCodec}, or {@code null} when no default codec name is set
     */
    public Codec<Record> getDefaultCodec(CodecService codecService) {
        return defaultCodec == null ? null : codecService.getCodec(defaultCodec, Record.class);
    }

    /**
     * Instantiates {@link #klass} with its no-argument constructor and asks it for a topology built from
     * {@link #options}.
     *
     * @return the topology produced by the configured class
     * @throws NullPointerException if no class is configured
     * @throws StreamRuntimeException if the class cannot be instantiated reflectively
     */
    public Topology getTopology() {
        // Fail with an explicit message instead of an anonymous NPE on the reflective call below.
        Objects.requireNonNull(klass, "No topology class configured for processor: " + name);
        try {
            StreamProcessorTopology processor = klass.getDeclaredConstructor().newInstance();
            return processor.getTopology(options);
        } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new StreamRuntimeException("Can not create topology for processor: " + name, e);
        }
    }
}
