/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.tools.command;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.log.LogStreamManager;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.tools.command.Command;
import org.nuxeo.lib.stream.tools.command.LatencyMonitorComputation;

public class MonitorCommand
extends Command {
    public static final String COMPUTATION_NAME = "LatencyMonitor";
    public static final String INPUT_STREAM = "log_null";
    public static final String INTERNAL_LOG_PREFIX = "_";
    protected static final String NAME = "monitor";
    protected static final String DEFAULT_INTERVAL = "60";
    protected static final String DEFAULT_COUNT = "-1";
    protected static final String ALL_LOGS = "all";
    protected static final String DEFAULT_PORT = "2003";
    protected boolean verbose = false;
    protected boolean partition = false;
    protected List<String> logNames;
    protected int interval;
    protected int count;
    protected Topology topology;
    protected StreamProcessor processor;
    protected String codec;
    protected String host;
    protected int port;
    protected boolean udp;
    protected String prefix;

    @Override
    public String name() {
        return NAME;
    }

    @Override
    public void updateOptions(Options options) {
        options.addOption(Option.builder((String)"l").longOpt("log-name").desc("Monitor consumers latency for this LOG, must be a computation Record, can be a comma separated list of log names or ALL").required().hasArg().argName("LOG_NAME").build());
        options.addOption(Option.builder((String)"h").longOpt("host").desc("The carbon server host").required().hasArg().argName("HOST").build());
        options.addOption(Option.builder((String)"p").longOpt("port").desc("The carbon server port if not 2003").hasArg().argName("PORT").build());
        options.addOption("u", "udp", false, "Carbon instance is listening using UDP");
        options.addOption(Option.builder().longOpt("partition").desc("Report metrics for each partition").build());
        options.addOption(Option.builder((String)"i").longOpt("interval").desc("send latency spaced at the specified interval in seconds").hasArg().argName("INTERVAL").build());
        options.addOption(Option.builder((String)"c").longOpt("count").desc("number of times the latency information is sent").hasArg().argName("COUNT").build());
        options.addOption(Option.builder().longOpt("prefix").desc("The metric prefix to use if not server.<hostname>.nuxeo.streams.").hasArg().argName("PREFIX").build());
        options.addOption(Option.builder().longOpt("codec").desc("Codec used to read record, can be: java, avro, avroBinary, avroJson").hasArg().argName("CODEC").build());
        options.addOption(Option.builder().longOpt("verbose").build());
    }

    @Override
    public boolean run(LogManager manager, CommandLine cmd) {
        this.logNames = this.getLogNames(manager, cmd.getOptionValue("log-name"));
        this.codec = cmd.getOptionValue("codec");
        this.partition = cmd.hasOption("partition");
        this.verbose = cmd.hasOption("verbose");
        this.interval = Integer.parseInt(cmd.getOptionValue("interval", DEFAULT_INTERVAL));
        this.count = Integer.parseInt(cmd.getOptionValue("count", DEFAULT_COUNT));
        this.port = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_PORT));
        this.host = cmd.getOptionValue("host");
        this.udp = cmd.hasOption("udp");
        this.prefix = cmd.getOptionValue("prefix", this.getDefaultPrefix());
        this.initTopology(manager);
        return this.runProcessor(manager);
    }

    protected List<String> getLogNames(LogManager manager, String names) {
        if (ALL_LOGS.equalsIgnoreCase(names)) {
            return manager.listAll().stream().filter(name -> !name.startsWith(INTERNAL_LOG_PREFIX)).filter(name -> !name.startsWith(INPUT_STREAM)).collect(Collectors.toList());
        }
        List<String> ret = Arrays.asList(names.split(","));
        if (ret.isEmpty()) {
            throw new IllegalArgumentException("No log name provided or found.");
        }
        for (String name2 : ret) {
            if (manager.exists(name2)) continue;
            throw new IllegalArgumentException("Unknown log name: " + name2);
        }
        return ret;
    }

    protected void initTopology(LogManager manager) {
        this.topology = Topology.builder().addComputation(() -> new LatencyMonitorComputation(manager, this.logNames, this.host, this.port, this.udp, this.prefix, COMPUTATION_NAME, this.interval, this.count, this.partition, this.verbose, this.getRecordCodec(this.codec)), Collections.singletonList("i1:log_null")).build();
    }

    public String getDefaultPrefix() {
        String hostname;
        try {
            hostname = InetAddress.getLocalHost().getHostName().split("\\.")[0];
        }
        catch (UnknownHostException e) {
            hostname = "unknown";
        }
        return "servers." + hostname + ".nuxeo.streams.";
    }

    protected boolean runProcessor(LogManager manager) {
        LogStreamManager streamManager = new LogStreamManager(manager);
        Settings settings = new Settings(1, 1, this.getRecordCodec(this.codec));
        this.processor = streamManager.registerAndCreateProcessor(NAME, this.topology, settings);
        this.processor.start();
        while (!this.processor.isTerminated()) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.processor.shutdown();
                return false;
            }
        }
        return true;
    }
}

