package org.nuxeo.runtime.stream;

import io.dropwizard.metrics5.MetricName;
import io.dropwizard.metrics5.MetricRegistry;
import io.dropwizard.metrics5.SharedMetricRegistries;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.nuxeo.lib.stream.codec.AvroMessageCodec;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
import org.nuxeo.runtime.api.Framework;

/* loaded from: input_file:org/nuxeo/runtime/stream/StreamMetricsComputation.class */
public class StreamMetricsComputation extends AbstractComputation {
    private static final Logger log = LogManager.getLogger(StreamMetricsComputation.class);
    protected static final String NAME = "stream/metrics";
    protected MetricRegistry registry;
    protected final long intervalMs;
    protected final List<String> inputStreams;
    protected final List<Name> streams;
    protected final Set<Name> invalidStreams;
    protected final List<LogPartitionGroup> groups;
    protected final List<LatencyMetric> metrics;
    protected org.nuxeo.lib.stream.log.LogManager manager;
    protected final Codec<Record> codec;
    protected long refreshGroupCounter;

    /* loaded from: input_file:org/nuxeo/runtime/stream/StreamMetricsComputation$LatencyMetric.class */
    public static class LatencyMetric {
        public static final Object PREFIX = "nuxeo.streams.global.stream.group.";
        protected final LogPartitionGroup consumer;
        protected final MetricRegistry registry;
        protected final MetricName endMetric = getMetricName("end");
        protected final MetricName posMetric = getMetricName("pos");
        protected final MetricName lagMetric = getMetricName("lag");
        protected final MetricName latMetric = getMetricName("latency");
        protected Latency latency;
        protected boolean registered;

        public LatencyMetric(LogPartitionGroup logPartitionGroup, MetricRegistry metricRegistry) {
            this.consumer = logPartitionGroup;
            this.registry = metricRegistry;
        }

        protected MetricName getMetricName(String str) {
            return MetricName.build(new String[]{PREFIX + str}).tagged(new String[]{"stream", this.consumer.name.getId()}).tagged(new String[]{"group", this.consumer.group.getId()});
        }

        protected void registerMetrics() {
            this.registry.register(this.endMetric, () -> {
                return Long.valueOf(this.latency.lag().upper());
            });
            this.registry.register(this.posMetric, () -> {
                return Long.valueOf(this.latency.lag().lower());
            });
            this.registry.register(this.lagMetric, () -> {
                return Long.valueOf(this.latency.lag().lag());
            });
            this.registry.register(this.latMetric, () -> {
                return Long.valueOf(this.latency.latency());
            });
        }

        protected void unregisterMetrics() {
            this.registry.remove(this.endMetric);
            this.registry.remove(this.posMetric);
            this.registry.remove(this.lagMetric);
            this.registry.remove(this.latMetric);
        }

        public boolean update(org.nuxeo.lib.stream.log.LogManager logManager, Codec<Record> codec) {
            try {
                this.latency = logManager.getLatency(this.consumer.name, this.consumer.group, codec, record -> {
                    return Long.valueOf(Watermark.ofValue(record.getWatermark()).getTimestamp());
                }, (v0) -> {
                    return v0.getKey();
                });
                if (!this.registered) {
                    registerMetrics();
                    this.registered = true;
                }
                return false;
            } catch (Exception e) {
                if (!(e.getCause() instanceof ClassNotFoundException) && !(e.getCause() instanceof ClassCastException) && !(e instanceof IllegalStateException) && !(e instanceof IllegalArgumentException)) {
                    throw e;
                }
                StreamMetricsComputation.log.warn("Invalid stream, cannot get latency: " + this.consumer, e);
                return true;
            }
        }

        public void destroy() {
            unregisterMetrics();
        }

        public Name getStream() {
            return this.consumer.getLogPartition().name();
        }
    }

    public StreamMetricsComputation(Duration duration, List<String> list) {
        super(NAME, 1, 0);
        this.registry = SharedMetricRegistries.getOrCreate("org.nuxeo.runtime.metrics.MetricsService");
        this.streams = new ArrayList();
        this.invalidStreams = new HashSet();
        this.groups = new ArrayList();
        this.metrics = new ArrayList();
        this.codec = new AvroMessageCodec(Record.class);
        this.intervalMs = duration.toMillis();
        this.inputStreams = list;
    }

    public void init(ComputationContext computationContext) {
        if (computationContext.isSpareComputation()) {
            log.info("Spare instance nothing to report");
            unregisterMetrics();
        } else {
            log.warn("Instance elected to report stream metrics");
            computationContext.setTimer("tracker", System.currentTimeMillis() + this.intervalMs);
        }
    }

    public void destroy() {
        unregisterMetrics();
    }

    protected void registerMetrics() {
        unregisterMetrics();
        getGroups().forEach(logPartitionGroup -> {
            this.metrics.add(new LatencyMetric(logPartitionGroup, this.registry));
        });
    }

    protected void unregisterMetrics() {
        this.metrics.forEach((v0) -> {
            v0.destroy();
        });
        this.metrics.clear();
    }

    public void processTimer(ComputationContext computationContext, String str, long j) {
        refreshMetricsIfNeeded();
        Logger logger = log;
        List<LatencyMetric> list = this.metrics;
        Objects.requireNonNull(list);
        logger.debug("start update metrics: {}", new Supplier[]{list::size});
        List list2 = (List) this.metrics.stream().filter(latencyMetric -> {
            return latencyMetric.update(getManager(), this.codec);
        }).collect(Collectors.toList());
        list2.forEach((v0) -> {
            v0.destroy();
        });
        list2.forEach(latencyMetric2 -> {
            this.invalidStreams.add(latencyMetric2.getStream());
        });
        this.metrics.removeAll(list2);
        computationContext.setTimer("tracker", System.currentTimeMillis() + this.intervalMs);
    }

    /*  JADX ERROR: Failed to decode insn: 0x002B: MOVE_MULTI, method: org.nuxeo.runtime.stream.StreamMetricsComputation.refreshMetricsIfNeeded():void
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    protected void refreshMetricsIfNeeded() {
        /*
            r6 = this;
            r0 = r6
            java.util.List<org.nuxeo.lib.stream.log.Name> r0 = r0.streams
            boolean r0 = r0.isEmpty()
            if (r0 != 0) goto L38
            r0 = r6
            java.util.List<org.nuxeo.lib.stream.log.internals.LogPartitionGroup> r0 = r0.groups
            boolean r0 = r0.isEmpty()
            if (r0 != 0) goto L38
            r0 = r6
            java.util.List<org.nuxeo.runtime.stream.StreamMetricsComputation$LatencyMetric> r0 = r0.metrics
            boolean r0 = r0.isEmpty()
            if (r0 != 0) goto L38
            r0 = r6
            r1 = r0
            long r1 = r1.refreshGroupCounter
            r2 = 1
            long r1 = r1 + r2
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.refreshGroupCounter = r1
            r0 = 5
            long r-1 = r-1 % r0
            r0 = 0
            int r-1 = (r-1 > r0 ? 1 : (r-1 == r0 ? 0 : -1))
            if (r-1 != 0) goto L4e
            r0 = r6
            java.util.List<org.nuxeo.lib.stream.log.Name> r0 = r0.streams
            r0.clear()
            r0 = r6
            java.util.List<org.nuxeo.lib.stream.log.internals.LogPartitionGroup> r0 = r0.groups
            r0.clear()
            r0 = r6
            r0.registerMetrics()
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: org.nuxeo.runtime.stream.StreamMetricsComputation.refreshMetricsIfNeeded():void");
    }

    protected List<Name> getStreams() {
        if (this.streams.isEmpty()) {
            if (this.inputStreams == null || this.inputStreams.isEmpty()) {
                this.streams.addAll(getManager().listAllNames());
                log.debug("Use all available streams: {}", this.streams);
            } else {
                this.inputStreams.forEach(str -> {
                    this.streams.add(Name.ofUrn(str));
                });
                log.debug("Use input streams: {}", this.streams);
            }
            if (!this.invalidStreams.isEmpty()) {
                this.streams.removeAll(this.invalidStreams);
                log.debug("Filtered list of streams: {}", this.streams);
            }
        }
        return this.streams;
    }

    protected List<LogPartitionGroup> getGroups() {
        if (this.groups.isEmpty()) {
            getStreams().forEach(name -> {
                getManager().listConsumerGroups(name).forEach(name -> {
                    this.groups.add(new LogPartitionGroup(name, name, 0));
                });
            });
            log.info("Update list of consumers: {}", this.groups);
        }
        return this.groups;
    }

    protected org.nuxeo.lib.stream.log.LogManager getManager() {
        if (this.manager == null) {
            this.manager = ((StreamService) Framework.getService(StreamService.class)).getLogManager();
        }
        return this.manager;
    }

    public void processRecord(ComputationContext computationContext, String str, Record record) {
    }
}
