package org.nuxeo.lib.stream.tools.command;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.log.LogStreamManager;
import org.nuxeo.lib.stream.log.LogManager;

/* loaded from: input_file:org/nuxeo/lib/stream/tools/command/MonitorCommand.class */
/**
 * Command line tool that monitors the consumer latency of one or more logs and sends it to a carbon server.
 * <p>
 * The command parses its options, builds a single-computation {@link Topology} around a
 * {@code LatencyMonitorComputation}, starts a {@link StreamProcessor} for it and blocks until that processor
 * terminates or the calling thread is interrupted.
 */
public class MonitorCommand extends Command {
    public static final String COMPUTATION_NAME = "LatencyMonitor";
    public static final String INPUT_STREAM = "log_null";
    /** Logs whose name starts with this prefix are skipped when all logs are requested. */
    public static final String INTERNAL_LOG_PREFIX = "_";
    protected static final String NAME = "monitor";
    /** Default value of the {@code --interval} option, in seconds. */
    protected static final String DEFAULT_INTERVAL = "60";
    /** Default value of the {@code --count} option. */
    protected static final String DEFAULT_COUNT = "-1";
    /** Special {@code --log-name} value (case insensitive) selecting every non internal log. */
    protected static final String ALL_LOGS = "all";
    /** Default value of the {@code --port} option. */
    protected static final String DEFAULT_PORT = "2003";
    protected boolean verbose = false;
    // NOTE(review): never assigned in this class; see initTopology.
    protected String output;
    protected List<String> logNames;
    protected int interval;
    protected int count;
    protected Topology topology;
    protected StreamProcessor processor;
    protected String codec;
    protected String host;
    protected int port;
    protected boolean udp;
    protected String prefix;

    /**
     * Returns the name of this command.
     */
    @Override
    public String name() {
        return NAME;
    }

    /**
     * Registers the options understood by this command.
     *
     * @param options the option set to complete; {@code log-name} and {@code host} are added as required options
     */
    @Override
    public void updateOptions(Options options) {
        options.addOption(Option.builder("l").longOpt("log-name").desc("Monitor consumers latency for this LOG, must be a computation Record, can be a comma separated list of log names or ALL").required().hasArg().argName("LOG_NAME").build());
        options.addOption(Option.builder("h").longOpt("host").desc("The carbon server host").required().hasArg().argName("HOST").build());
        options.addOption(Option.builder("p").longOpt("port").desc("The carbon server port if not 2003").hasArg().argName("PORT").build());
        options.addOption("u", "udp", false, "Carbon instance is listening using UDP");
        options.addOption(Option.builder("i").longOpt("interval").desc("send latency spaced at the specified interval in seconds").hasArg().argName("INTERVAL").build());
        options.addOption(Option.builder("c").longOpt("count").desc("number of time to send the latency information").hasArg().argName("COUNT").build());
        // The documented default must match what getDefaultPrefix() really builds ("servers.", plural).
        options.addOption(Option.builder().longOpt("prefix").desc("The metric prefix to use if not servers.<hostname>.nuxeo.streams.").hasArg().argName("PREFIX").build());
        options.addOption(Option.builder().longOpt("codec").desc("Codec used to read record, can be: java, avro, avroBinary, avroJson").hasArg().argName("CODEC").build());
        options.addOption(Option.builder().longOpt("verbose").build());
    }

    /**
     * Reads the parsed options into this command's fields, builds the topology and runs it.
     *
     * @param logManager the log manager used to resolve log names and to run the processor
     * @param commandLine the parsed command line
     * @return {@code true} when the processor terminated by itself, {@code false} when the wait was interrupted
     * @throws IllegalArgumentException if a requested log does not exist
     * @throws NumberFormatException if {@code interval}, {@code count} or {@code port} is not an integer
     */
    @Override
    public boolean run(LogManager logManager, CommandLine commandLine) {
        this.logNames = getLogNames(logManager, commandLine.getOptionValue("log-name"));
        this.codec = commandLine.getOptionValue("codec");
        this.verbose = commandLine.hasOption("verbose");
        this.interval = parseIntOption(commandLine, "interval", DEFAULT_INTERVAL);
        this.count = parseIntOption(commandLine, "count", DEFAULT_COUNT);
        this.port = parseIntOption(commandLine, "port", DEFAULT_PORT);
        this.host = commandLine.getOptionValue("host");
        this.udp = commandLine.hasOption("udp");
        this.prefix = commandLine.getOptionValue("prefix", getDefaultPrefix());
        initTopology(logManager);
        return runProcessor(logManager);
    }

    /**
     * Reads an integer option, reporting the offending option name when the value is not an integer.
     */
    private static int parseIntOption(CommandLine commandLine, String option, String defaultValue) {
        String value = commandLine.getOptionValue(option, defaultValue);
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            // Same exception type as a bare Integer.parseInt so existing handlers keep working.
            NumberFormatException detailed = new NumberFormatException(
                    "Invalid integer value for option --" + option + ": " + value);
            detailed.initCause(e);
            throw detailed;
        }
    }

    /**
     * Resolves the {@code log-name} option value into a list of log names.
     *
     * @param logManager the log manager used to list or check logs
     * @param str either {@code all} (case insensitive) or a comma separated list of log names; whitespace around
     *            each name is ignored and blank entries are skipped
     * @return for {@code all}, every listed log whose name does not start with {@link #INTERNAL_LOG_PREFIX};
     *         otherwise the requested names, in the order given
     * @throws IllegalArgumentException if {@code str} is {@code null} or names a log that does not exist
     */
    protected List<String> getLogNames(LogManager logManager, String str) {
        if (str == null) {
            throw new IllegalArgumentException("Missing log name");
        }
        String requested = str.trim();
        if (ALL_LOGS.equalsIgnoreCase(requested)) {
            return logManager.listAll()
                             .stream()
                             .filter(name -> !name.startsWith(INTERNAL_LOG_PREFIX))
                             .collect(Collectors.toList());
        }
        // Tolerate "a, b" style lists: trim each entry and drop empty ones.
        List<String> names = Arrays.stream(requested.split(","))
                                   .map(String::trim)
                                   .filter(name -> !name.isEmpty())
                                   .collect(Collectors.toList());
        for (String name : names) {
            if (!logManager.exists(name)) {
                throw new IllegalArgumentException("Unknown log name: " + name);
            }
        }
        return names;
    }

    /**
     * Builds the topology made of a single {@code LatencyMonitorComputation} configured from this command's fields.
     *
     * @param logManager the log manager handed to the computation
     */
    protected void initTopology(LogManager logManager) {
        // NOTE(review): `output` is never assigned in this class, so unless a subclass sets it the second
        // stream is literally "o1:null". Confirm whether the monitor computation needs an output stream at all.
        this.topology = Topology.builder()
                                .addComputation(
                                        () -> new LatencyMonitorComputation(logManager, this.logNames, this.host,
                                                this.port, this.udp, this.prefix, COMPUTATION_NAME, this.interval,
                                                this.count, this.verbose, getRecordCodec(this.codec)),
                                        Arrays.asList("i1:" + INPUT_STREAM, "o1:" + this.output))
                                .build();
    }

    /**
     * Builds the default metric prefix: {@code servers.<short hostname>.nuxeo.streams.}.
     * <p>
     * The short hostname is the local host name up to its first dot, or {@code unknown} when the local host cannot
     * be resolved.
     *
     * @return the default metric prefix, ending with a dot
     */
    public String getDefaultPrefix() {
        String shortName;
        try {
            String hostName = InetAddress.getLocalHost().getHostName();
            // indexOf/substring cannot fail on unusual names such as ".", unlike split("\\.")[0].
            int dot = hostName.indexOf('.');
            shortName = dot < 0 ? hostName : hostName.substring(0, dot);
        } catch (UnknownHostException e) {
            // Best effort: the prefix is only a default, so fall back to a placeholder host name.
            shortName = "unknown";
        }
        return "servers." + shortName + ".nuxeo.streams.";
    }

    /**
     * Creates and starts the processor for the topology, then polls once per second until it is terminated.
     *
     * @param logManager the log manager backing the stream manager
     * @return {@code true} when the processor terminated by itself; {@code false} when the waiting thread was
     *         interrupted, in which case the interrupt flag is restored and the processor is shut down
     */
    protected boolean runProcessor(LogManager logManager) {
        this.processor = new LogStreamManager(logManager).registerAndCreateProcessor(NAME, this.topology,
                new Settings(1, 1, getRecordCodec(this.codec)));
        this.processor.start();
        while (!this.processor.isTerminated()) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the caller before stopping the processor.
                Thread.currentThread().interrupt();
                this.processor.shutdown();
                return false;
            }
        }
        return true;
    }
}
