package org.nuxeo.lib.stream.tools.command;

import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteSender;
import com.codahale.metrics.graphite.GraphiteUDP;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;

/* loaded from: input_file:org/nuxeo/lib/stream/tools/command/LatencyMonitorComputation.class */
/**
 * A {@link LatencyTrackerComputation} that publishes the latencies it is handed to a Graphite server.
 * <p>
 * For every log partition group four metrics are sent ({@code lag}, {@code end}, {@code pos} and {@code latency}),
 * once for the aggregation of all the latencies of the group (suffix {@code all.}) and once per entry of the latency
 * list (suffix {@code pNN.} where {@code NN} is the zero-padded index in the list).
 * <p>
 * The connection is opened in {@link #init(ComputationContext)} and released in {@link #destroy()}. When sending fails
 * the sender is closed and a reconnection is attempted at the beginning of the next batch, so that a restart of the
 * Graphite server does not permanently disable the monitoring.
 */
public class LatencyMonitorComputation extends LatencyTrackerComputation {
    private static final Log log = LogFactory.getLog(LatencyMonitorComputation.class);

    /** Host of the Graphite server. */
    protected final String host;

    /** Port of the Graphite server. */
    protected final int port;

    /** When {@code true} a {@link GraphiteUDP} sender is used, otherwise a {@link Graphite} sender. */
    protected final boolean udp;

    /** Prefix prepended verbatim to every metric name (no separator is added). */
    protected final String basePrefix;

    /** Sender created by {@link #init(ComputationContext)}, {@code null} before init and after destroy. */
    protected GraphiteSender graphite;

    /**
     * Creates the computation; no connection is opened until {@link #init(ComputationContext)} is invoked.
     * <p>
     * NOTE(review): the names of the parameters that are only forwarded to the superclass constructor reflect their
     * presumed meaning, confirm against {@link LatencyTrackerComputation}.
     *
     * @param manager forwarded to the superclass constructor
     * @param logNames forwarded to the superclass constructor
     * @param host host of the Graphite server
     * @param port port of the Graphite server
     * @param udp {@code true} to use the UDP sender instead of the default one
     * @param basePrefix prefix prepended verbatim to every metric name
     * @param computationName forwarded to the superclass constructor
     * @param intervalSecond forwarded to the superclass constructor
     * @param count forwarded to the superclass constructor
     * @param verbose forwarded to the superclass constructor
     * @param codec forwarded to the superclass constructor
     */
    public LatencyMonitorComputation(LogManager manager, List<String> logNames, String host, int port, boolean udp,
            String basePrefix, String computationName, int intervalSecond, int count, boolean verbose,
            Codec<Record> codec) {
        super(manager, logNames, computationName, intervalSecond, count, verbose, codec);
        this.host = host;
        this.port = port;
        this.udp = udp;
        this.basePrefix = basePrefix;
    }

    /**
     * Initializes the superclass then creates the Graphite sender and connects it.
     *
     * @throws IllegalStateException if the connection to the Graphite server cannot be established
     */
    @Override
    public void init(ComputationContext context) {
        super.init(context);
        if (udp) {
            graphite = new GraphiteUDP(host, port);
        } else {
            graphite = new Graphite(host, port);
        }
        try {
            graphite.connect();
        } catch (IOException e) {
            throw new IllegalStateException("Fail to connect to " + host + ":" + port, e);
        }
    }

    /**
     * Publishes the aggregated latency of the group followed by each latency of the list, then flushes the sender so
     * that the metrics are not retained in the sender buffer.
     */
    @Override
    protected void processLatencies(ComputationContext context, LogPartitionGroup logGroup, List<Latency> latencies) {
        // A previous failure closes the sender: try to re-open it once per batch, not once per metric.
        reconnectIfNeeded();
        publishMetrics(Latency.of(latencies),
                String.format("%s%s.%s.all.", basePrefix, logGroup.group, logGroup.name));
        for (int index = 0; index < latencies.size(); index++) {
            publishMetrics(latencies.get(index),
                    String.format("%s%s.%s.p%02d.", basePrefix, logGroup.group, logGroup.name, index));
        }
        flushMetrics();
    }

    /**
     * Sends the {@code lag}, {@code end}, {@code pos} and {@code latency} metrics of a latency.
     * <p>
     * A failure is logged and never propagated; the sender is then closed so that the next batch reconnects. Nothing
     * is sent while the sender is not connected.
     *
     * @param latency the latency to publish
     * @param prefix the metric name prefix, the metric names are appended to it without separator
     */
    protected void publishMetrics(Latency latency, String prefix) {
        if (verbose) {
            log.info(latency.toString());
        }
        if (!isGraphiteConnected()) {
            // Sending on a closed sender can only fail, the reconnection is handled by processLatencies.
            log.debug("Not connected to graphite, skipping " + prefix);
            return;
        }
        // NOTE(review): upper() looks like a timestamp in milliseconds converted here to the seconds expected by
        // Graphite, confirm against Latency.
        long timestamp = latency.upper() / 1000;
        try {
            graphite.send(prefix + "lag", Long.toString(latency.lag().lag()), timestamp);
            graphite.send(prefix + "end", Long.toString(latency.lag().upper()), timestamp);
            graphite.send(prefix + "pos", Long.toString(latency.lag().lower()), timestamp);
            graphite.send(prefix + "latency", Long.toString(latency.latency()), timestamp);
        } catch (IOException e) {
            log.error("Fail to send metric to graphite " + prefix + " " + latency, e);
            closeGraphite();
        }
    }

    /**
     * Destroys the superclass and closes the Graphite sender, even if the superclass destruction fails.
     */
    @Override
    public void destroy() {
        try {
            super.destroy();
        } finally {
            closeGraphite();
            graphite = null;
        }
    }

    /** Returns {@code true} when a sender exists and reports itself as connected. */
    private boolean isGraphiteConnected() {
        return graphite != null && graphite.isConnected();
    }

    /** Re-opens the connection of an existing but disconnected sender, a failure is only logged. */
    private void reconnectIfNeeded() {
        if (graphite == null || graphite.isConnected()) {
            return;
        }
        try {
            graphite.connect();
        } catch (IOException e) {
            log.error("Fail to reconnect to " + host + ":" + port, e);
        }
    }

    /** Flushes the metrics buffered by the sender, on failure the sender is closed to force a reconnection. */
    private void flushMetrics() {
        if (!isGraphiteConnected()) {
            return;
        }
        try {
            graphite.flush();
        } catch (IOException e) {
            log.error("Fail to flush metrics to graphite " + host + ":" + port, e);
            closeGraphite();
        }
    }

    /** Closes the sender if any, a failure is only logged at debug level. */
    private void closeGraphite() {
        if (graphite == null) {
            return;
        }
        try {
            graphite.close();
        } catch (IOException e) {
            log.debug("Error when closing graphite socket: ", e);
        }
    }
}
