package org.nuxeo.runtime.stream;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.jodah.failsafe.RetryPolicy;
import org.nuxeo.common.xmap.annotation.XNode;
import org.nuxeo.common.xmap.annotation.XNodeList;
import org.nuxeo.common.xmap.annotation.XNodeMap;
import org.nuxeo.common.xmap.annotation.XObject;
import org.nuxeo.common.xmap.registry.XEnable;
import org.nuxeo.common.xmap.registry.XRegistry;
import org.nuxeo.common.xmap.registry.XRegistryId;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder;
import org.nuxeo.lib.stream.computation.RecordFilter;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.AvroCodecFactory;

@XRegistry(enable = false, compatWarnOnMerge = true)
@XObject("streamProcessor")
/* loaded from: input_file:org/nuxeo/runtime/stream/StreamProcessorDescriptor.class */
public class StreamProcessorDescriptor {

    /** Enablement flag bound to {@code @enable}, with {@code @enabled} declared as fallback attribute. */
    @XNode(value = "@enable", fallback = "@enabled", defaultAssignment = "true")
    @XEnable
    protected boolean isEnabled;

    /** Value of the {@code @start} attribute; exposed through {@link #isStart()}. */
    @XNode(value = "@start", defaultAssignment = "true")
    protected boolean start;

    /** Processor name from the {@code @name} attribute; also serves as the registry id. */
    @XNode("@name")
    @XRegistryId
    public String name;

    /** Value of the {@code @logConfig} attribute. */
    @XNode("@logConfig")
    public String config;

    /** Topology implementation from the {@code @class} attribute. */
    @XNode("@class")
    public Class<? extends StreamProcessorTopology> klass;

    /** Value of the {@code @defaultCodec} attribute. */
    @XNode("@defaultCodec")
    public String defaultCodec;

    /** Value of the {@code @defaultExternal} attribute. */
    @XNode("@defaultExternal")
    public boolean defaultExternal;

    /** Cache filled lazily by {@link #getDefaultPolicy()}. */
    protected ComputationPolicy defaultPolicy;

    /**
     * Framework property key read by {@code PolicyDescriptor#getSkipFirstFailures()}. Declared {@code final} so the
     * key cannot be reassigned at runtime.
     */
    public static final String RECOVERY_SKIP_FIRST_FAILURES_OPTION = "nuxeo.stream.recovery.skipFirstFailures";

    /** Initial value for {@link #defaultConcurrency}, {@link #defaultPartitions} and computation concurrency. */
    public static final Integer DEFAULT_CONCURRENCY = 4;

    /** Value of the {@code @defaultConcurrency} attribute; starts at {@link #DEFAULT_CONCURRENCY}. */
    @XNode("@defaultConcurrency")
    public Integer defaultConcurrency = DEFAULT_CONCURRENCY;

    /** Value of the {@code @defaultPartitions} attribute; also starts at {@link #DEFAULT_CONCURRENCY}. */
    @XNode("@defaultPartitions")
    public Integer defaultPartitions = DEFAULT_CONCURRENCY;

    /** {@code <option>} children, keyed by their {@code @name} attribute. */
    @XNodeMap(value = "option", key = "@name", type = HashMap.class, componentType = String.class)
    public Map<String, String> options = new HashMap<>();

    /** {@code <computation>} children. */
    @XNodeList(value = "computation", type = ArrayList.class, componentType = ComputationDescriptor.class)
    public List<ComputationDescriptor> computations = new ArrayList<>();

    /** {@code <stream>} children. */
    @XNodeList(value = "stream", type = ArrayList.class, componentType = StreamDescriptor.class)
    public List<StreamDescriptor> streams = new ArrayList<>();

    /** {@code <policy>} children; searched by {@link #getPolicy(String)} and {@link #getDefaultPolicy()}. */
    @XNodeList(value = "policy", type = ArrayList.class, componentType = PolicyDescriptor.class)
    public List<PolicyDescriptor> policies = new ArrayList<>();

    /** Descriptor for a {@code <computation>} element: a computation name and its concurrency. */
    @XObject("computation")
    public static class ComputationDescriptor {

        /** Computation name, read from the {@code @name} attribute. */
        @XNode("@name")
        public String name;

        /** Value of the {@code @concurrency} attribute; starts at the processor-wide default concurrency. */
        @XNode("@concurrency")
        public Integer concurrency = DEFAULT_CONCURRENCY;

        /**
         * @return the computation name, used as this descriptor's identifier
         */
        public String getId() {
            return name;
        }
    }

    /**
     * Descriptor for a {@code <filter>} element: names a {@link RecordFilter} implementation and carries the options
     * handed to it when it is created.
     */
    @XObject("filter")
    public static class FilterDescriptor {

        /** Filter name, read from the {@code @name} attribute. */
        @XNode("@name")
        public String name;

        /** Filter implementation, read from the {@code @class} attribute. */
        @XNode("@class")
        public Class<? extends RecordFilter> klass;

        /** {@code <option>} children keyed by their {@code @name} attribute; passed to the filter's {@code init}. */
        @XNodeMap(value = "option", key = "@name", type = HashMap.class, componentType = String.class)
        public Map<String, String> options = new HashMap<>();

        /**
         * @return the filter name, used as this descriptor's identifier
         */
        public String getId() {
            return name;
        }

        /**
         * Creates the configured filter through its no-argument constructor and initializes it with
         * {@link #options}.
         *
         * @return a new, initialized filter instance
         * @throws IllegalArgumentException if no class is configured or the class is not a {@link RecordFilter}
         * @throws StreamRuntimeException if the filter cannot be instantiated reflectively
         */
        public RecordFilter getFilter() {
            // A missing @class previously surfaced as a bare NullPointerException from isAssignableFrom(null).
            if (klass == null) {
                throw new IllegalArgumentException("Cannot create filter: " + getId() + ", no class is defined");
            }
            // The generic bound is not enforced when the field is populated reflectively, so check at runtime.
            if (!RecordFilter.class.isAssignableFrom(klass)) {
                throw new IllegalArgumentException("Cannot create filter: " + getId() + ", class "
                        + klass.getName() + " must implement RecordFilter");
            }
            try {
                RecordFilter filter = klass.getDeclaredConstructor().newInstance();
                filter.init(options);
                return filter;
            } catch (ReflectiveOperationException e) {
                throw new StreamRuntimeException("Cannot create filter: " + getId(), e);
            }
        }
    }

    /**
     * Descriptor for a {@code <policy>} element: retry, batching and failure-handling settings that are turned into
     * a {@link ComputationPolicyBuilder} by {@link #createPolicyBuilder()}.
     */
    @XObject("policy")
    public static class PolicyDescriptor {

        public static final int DEFAULT_MAX_RETRIES = 0;

        public static final Duration DEFAULT_DELAY = Duration.ofSeconds(1);

        public static final Duration DEFAULT_MAX_DELAY = Duration.ofSeconds(10);

        public static final Integer DEFAULT_BATCH_CAPACITY = 1;

        public static final Duration DEFAULT_BATCH_THRESHOLD = Duration.ofSeconds(1);

        /** Policy name, read from the {@code @name} attribute. */
        @XNode("@name")
        public String name;

        /** Optional custom policy factory, read from the {@code @class} attribute. */
        @XNode("@class")
        public Class<? extends StreamComputationPolicy> klass;

        /** Value of {@code @maxRetries}; passed to the retry policy. */
        @XNode("@maxRetries")
        public Integer maxRetries = DEFAULT_MAX_RETRIES;

        /** Value of {@code @delay}; first argument of the retry backoff. */
        @XNode("@delay")
        public Duration delay = DEFAULT_DELAY;

        /** Value of {@code @maxDelay}; second argument of the retry backoff. */
        @XNode("@maxDelay")
        public Duration maxDelay = DEFAULT_MAX_DELAY;

        /** Value of {@code @continueOnFailure}. */
        @XNode("@continueOnFailure")
        public Boolean continueOnFailure = false;

        /** Value of {@code @skipFirstFailures}; see {@link #getSkipFirstFailures()}. */
        @XNode("@skipFirstFailures")
        public Integer skipFirstFailures = 0;

        /** Value of {@code @batchCapacity}. */
        @XNode("@batchCapacity")
        public Integer batchCapacity = DEFAULT_BATCH_CAPACITY;

        /** Value of {@code @batchThreshold}. */
        @XNode("@batchThreshold")
        public Duration batchThreshold = DEFAULT_BATCH_THRESHOLD;

        /**
         * @return the policy name, used as this descriptor's identifier
         */
        public String getId() {
            return name;
        }

        /**
         * Reads the skip-first-failures count from the framework property named by
         * {@code RECOVERY_SKIP_FIRST_FAILURES_OPTION}, passing the descriptor value as the fallback argument.
         *
         * @return the parsed count
         * @throws NumberFormatException if the resolved text is not an integer
         */
        protected int getSkipFirstFailures() {
            String fallback = skipFirstFailures.toString();
            String resolved = Framework.getProperty(RECOVERY_SKIP_FIRST_FAILURES_OPTION, fallback);
            return Integer.parseInt(resolved);
        }

        /**
         * Builds a policy builder from this descriptor's retry, batch and failure settings.
         *
         * @return a builder configured from the descriptor fields
         */
        public ComputationPolicyBuilder createPolicyBuilder() {
            ComputationPolicyBuilder builder = new ComputationPolicyBuilder();
            // Backoff bounds are handed over in milliseconds.
            RetryPolicy retries = new RetryPolicy().withMaxRetries(maxRetries.intValue())
                                                   .withBackoff(delay.toMillis(), maxDelay.toMillis(),
                                                           TimeUnit.MILLISECONDS);
            return builder.retryPolicy(retries)
                          .batchPolicy(batchCapacity.intValue(), batchThreshold)
                          .continueOnFailure(continueOnFailure.booleanValue())
                          .skipFirstFailures(getSkipFirstFailures());
        }
    }

    /** Descriptor for a {@code <stream>} element and its nested {@code <filter>} children. */
    @XObject("stream")
    public static class StreamDescriptor {

        /** Stream name, read from the {@code @name} attribute. */
        @XNode("@name")
        public String name;

        /** Value of the {@code @partitions} attribute; no initial value is assigned here. */
        @XNode("@partitions")
        public Integer partitions;

        /** Value of the {@code @codec} attribute; no initial value is assigned here. */
        @XNode("@codec")
        public String codec;

        /** Value of the {@code @external} attribute; no initial value is assigned here. */
        @XNode("@external")
        public Boolean external;

        /** {@code <filter>} children declared for this stream. */
        @XNodeList(value = "filter", type = ArrayList.class, componentType = FilterDescriptor.class)
        public List<FilterDescriptor> filters = new ArrayList<>();

        /**
         * @return the stream name, used as this descriptor's identifier
         */
        public String getId() {
            return name;
        }
    }

    /**
     * Finds the first declared policy whose id equals the given name and builds its computation policy.
     *
     * @param str the policy name to look for; must not be {@code null}
     * @return the policy built from the first matching descriptor, or {@code null} when no descriptor matches
     */
    public ComputationPolicy getPolicy(String str) {
        return policies.stream()
                       .filter(candidate -> str.equals(candidate.getId()))
                       .findFirst()
                       .map(this::getComputationPolicy)
                       .orElse(null);
    }

    /**
     * Turns a policy descriptor into a computation policy. Without a configured class the
     * {@code DefaultNuxeoComputationPolicy} is used; otherwise the configured class is instantiated through its
     * no-argument constructor and asked for the policy.
     *
     * @param policyDescriptor the descriptor to convert
     * @return the policy produced for the descriptor
     * @throws IllegalArgumentException if the configured class is not a {@code StreamComputationPolicy}
     * @throws StreamRuntimeException if the configured class cannot be instantiated reflectively
     */
    protected ComputationPolicy getComputationPolicy(PolicyDescriptor policyDescriptor) {
        Class<? extends StreamComputationPolicy> policyClass = policyDescriptor.klass;
        if (policyClass == null) {
            return new DefaultNuxeoComputationPolicy().getPolicy(policyDescriptor);
        }
        if (StreamComputationPolicy.class.isAssignableFrom(policyClass)) {
            try {
                StreamComputationPolicy factory = policyClass.getDeclaredConstructor().newInstance();
                return factory.getPolicy(policyDescriptor);
            } catch (ReflectiveOperationException e) {
                throw new StreamRuntimeException("Cannot create policy: " + policyDescriptor.getId() + " for processor: " + getId(), e);
            }
        }
        throw new IllegalArgumentException("Cannot create policy: " + policyDescriptor.getId() + " for processor: " + getId() + ", class must implement StreamComputationPolicy");
    }

    /**
     * Returns the processor's default policy, resolving and caching it on first use.
     * <p>
     * The default is built from the first declared policy whose id equals {@code AvroCodecFactory.DEFAULT_ENCODING};
     * when no such policy exists, {@code ComputationPolicy.NONE} is used.
     *
     * @return the cached default policy
     */
    public ComputationPolicy getDefaultPolicy() {
        if (defaultPolicy != null) {
            return defaultPolicy;
        }
        // NOTE(review): a codec-encoding constant is used as the policy name here; presumably only its string
        // value matters - confirm, and consider a dedicated policy-name constant.
        PolicyDescriptor match = null;
        for (PolicyDescriptor candidate : policies) {
            if (AvroCodecFactory.DEFAULT_ENCODING.equals(candidate.getId())) {
                match = candidate;
                break;
            }
        }
        defaultPolicy = match == null ? ComputationPolicy.NONE : getComputationPolicy(match);
        return defaultPolicy;
    }

    /**
     * @return the current value of the enablement flag
     */
    public boolean isEnabled() {
        return isEnabled;
    }

    /**
     * Overwrites the enablement flag.
     *
     * @param z the new flag value
     */
    public void setEnabled(boolean z) {
        isEnabled = z;
    }

    /**
     * @return the processor name, used as this descriptor's identifier
     */
    public String getId() {
        return name;
    }

    /**
     * @return the current value of the start flag
     */
    public boolean isStart() {
        return start;
    }

    /**
     * Overwrites the start flag.
     *
     * @param z the new flag value
     */
    public void setStart(boolean z) {
        start = z;
    }
}
