package org.nuxeo.lib.stream.tools.command;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;

/* loaded from: input_file:org/nuxeo/lib/stream/tools/command/LatencyCommand.class */
public class LatencyCommand extends Command {
    private static final Log log = LogFactory.getLog(LatencyCommand.class);
    protected static final String NAME = "latency";
    protected boolean verbose = false;

    @Override // org.nuxeo.lib.stream.tools.command.Command
    public String name() {
        return NAME;
    }

    @Override // org.nuxeo.lib.stream.tools.command.Command
    public void updateOptions(Options options) {
        options.addOption(Option.builder("l").longOpt("log-name").desc("Log name of a stream containing computation.Record").hasArg().argName("LOG_NAME").build());
        options.addOption(Option.builder().longOpt("codec").desc("Codec used to read record, can be: java, avro, avroBinary, avroJson").hasArg().argName("CODEC").build());
        options.addOption(Option.builder().longOpt("verbose").desc("Display latency for each partition").build());
    }

    @Override // org.nuxeo.lib.stream.tools.command.Command
    public boolean run(LogManager logManager, CommandLine commandLine) {
        String optionValue = commandLine.getOptionValue("log-name");
        Codec<Record> recordCodec = getRecordCodec(commandLine.getOptionValue("codec"));
        this.verbose = commandLine.hasOption("verbose");
        if (optionValue != null) {
            latency(logManager, optionValue, recordCodec);
            return true;
        }
        latency(logManager, recordCodec);
        return true;
    }

    protected void latency(LogManager logManager, Codec<Record> codec) {
        log.info("# " + logManager);
        Iterator<String> it = logManager.listAll().iterator();
        while (it.hasNext()) {
            latency(logManager, it.next(), codec);
        }
    }

    protected void latency(LogManager logManager, String str, Codec<Record> codec) {
        log.info("## Log: " + str + " partitions: " + logManager.size(str));
        List<String> listConsumerGroups = logManager.listConsumerGroups(str);
        if (this.verbose && listConsumerGroups.isEmpty()) {
            listConsumerGroups.add("tools");
        }
        try {
            listConsumerGroups.forEach(str2 -> {
                renderLatency(str2, logManager.getLatencyPerPartition(str, str2, codec, record -> {
                    return Long.valueOf(Watermark.ofValue(record.getWatermark()).getTimestamp());
                }, (v0) -> {
                    return v0.getKey();
                }));
            });
        } catch (IllegalStateException e) {
            log.error(e.getMessage());
        }
    }

    protected void renderLatency(String str, List<Latency> list) {
        log.info(String.format("### Group: %s", str));
        log.info("| partition | lag | latencyMs | latency | posTimestamp | posDate | curDate | pos | end | posOffset | endOffset | posKey |\n| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |");
        Latency of = Latency.of(list);
        log.info(String.format("|All|%d|%d|%s|%d|%s|%s|%d|%d|%d|%d|%s|", Long.valueOf(of.lag().lag()), Long.valueOf(of.latency()), formatInterval(of.latency()), Long.valueOf(of.lower()), formatDate(of.lower()), formatDate(of.upper()), Long.valueOf(of.lag().lower()), Long.valueOf(of.lag().upper()), Long.valueOf(of.lag().lowerOffset()), Long.valueOf(of.lag().upperOffset()), of.key()));
        if (!this.verbose || list.size() <= 1) {
            return;
        }
        AtomicInteger atomicInteger = new AtomicInteger();
        list.forEach(latency -> {
            log.info(String.format("|%d|%d|%d|%s|%d|%s|%s|%d|%d|%d|%d|%s|", Integer.valueOf(atomicInteger.getAndIncrement()), Long.valueOf(latency.lag().lag()), Long.valueOf(latency.latency()), formatInterval(latency.latency()), Long.valueOf(latency.lower()), formatDate(latency.lower()), formatDate(latency.upper()), Long.valueOf(latency.lag().lower()), Long.valueOf(latency.lag().upper()), Long.valueOf(latency.lag().lowerOffset()), Long.valueOf(latency.lag().upperOffset()), latency.key()));
        });
    }

    protected String formatDate(long j) {
        return j > 0 ? Instant.ofEpochMilli(j).toString() : "NA";
    }

    protected static String formatInterval(long j) {
        if (j == 0) {
            return "NA";
        }
        long hours = TimeUnit.MILLISECONDS.toHours(j);
        long minutes = TimeUnit.MILLISECONDS.toMinutes(j - TimeUnit.HOURS.toMillis(hours));
        long seconds = TimeUnit.MILLISECONDS.toSeconds((j - TimeUnit.HOURS.toMillis(hours)) - TimeUnit.MINUTES.toMillis(minutes));
        return String.format("%02d:%02d:%02d.%03d", Long.valueOf(hours), Long.valueOf(minutes), Long.valueOf(seconds), Long.valueOf(TimeUnit.MILLISECONDS.toMillis(((j - TimeUnit.HOURS.toMillis(hours)) - TimeUnit.MINUTES.toMillis(minutes)) - TimeUnit.SECONDS.toMillis(seconds))));
    }
}
