/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.tools.command;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;

/**
 * Computation that periodically captures the latency of consumer groups and publishes it as records.
 * <p>
 * On {@link #init(ComputationContext)} it lists the consumer groups of every configured log and arms a timer. Each
 * time the timer fires, it asks the {@link LogManager} for the per-partition latencies of every (log, group) pair
 * and produces one record per partition on {@link #OUTPUT_STREAM}, then re-arms the timer.
 * <p>
 * The computation is driven by its timer only: any record received through
 * {@link #processRecord(ComputationContext, String, Record)} is reported as an error.
 */
public class LatencyTrackerComputation extends AbstractComputation {
    private static final Log log = LogFactory.getLog(LatencyTrackerComputation.class);

    /** Name of the stream on which latency records are produced. */
    protected static final String OUTPUT_STREAM = "o1";

    /** Key of the timer that triggers each capture. */
    private static final String TIMER_KEY = "tracker";

    /** Manager used to list consumer groups and to measure latencies. */
    protected final LogManager manager;

    /** Names of the logs to track. */
    protected final List<String> logNames;

    /** Delay between two captures, in milliseconds. */
    protected final int intervalMs;

    /** Number of captures requested at construction time. */
    protected final int count;

    /** When true, {@link #debug(String)} messages are emitted. */
    protected final boolean verbose;

    /** Codec handed to the manager to read records when measuring latencies. */
    protected final Codec<Record> codec;

    /** Number of captures left before termination is requested; starts at {@link #count}. */
    protected int remaining;

    /** One entry per (consumer group, log) pair discovered in {@link #init(ComputationContext)}. */
    protected List<LogPartitionGroup> logGroups;

    /**
     * Creates a tracker for the given logs.
     *
     * @param manager the log manager used to list consumer groups and measure latencies
     * @param logNames the names of the logs to track
     * @param computationName the name of this computation
     * @param intervalSecond the delay between two captures, in seconds
     * @param count the number of captures to perform before asking for termination; termination is requested only
     *            when the remaining counter is exactly 0, so a negative value does not stop the tracker in practice
     * @param verbose whether debug messages must be logged
     * @param codec the codec used to read records when measuring latencies
     * @throws ArithmeticException if the interval expressed in milliseconds does not fit in an {@code int}
     */
    public LatencyTrackerComputation(LogManager manager, List<String> logNames, String computationName,
            int intervalSecond, int count, boolean verbose, Codec<Record> codec) {
        // NOTE(review): the two 1s are presumably the number of input and output streams - confirm against
        // AbstractComputation.
        super(computationName, 1, 1);
        this.manager = manager;
        this.logNames = logNames;
        // Fail fast on overflow: a plain int multiplication would silently wrap around and could yield a negative
        // interval, scheduling every timer in the past.
        this.intervalMs = Math.multiplyExact(1000, intervalSecond);
        this.count = count;
        this.remaining = count;
        this.verbose = verbose;
        this.codec = codec;
    }

    /**
     * Discovers the consumer groups of every tracked log and arms the first timer.
     *
     * @param context the computation context used to set the timer
     */
    @Override
    public void init(ComputationContext context) {
        info(String.format("Tracking %s, count: %d, interval: %dms", Arrays.toString(logNames.toArray()), count,
                intervalMs));
        logGroups = new ArrayList<>();
        for (String name : logNames) {
            for (String group : manager.listConsumerGroups(name)) {
                // Partition 0 is only a placeholder here: processTimer derives the partition index from the
                // position of each latency in the list returned by the manager.
                logGroups.add(new LogPartitionGroup(group, name, 0));
            }
        }
        context.setTimer(TIMER_KEY, System.currentTimeMillis() + intervalMs);
    }

    /**
     * Performs one capture: produces a latency record for every partition of every tracked (log, group) pair, asks
     * for a checkpoint and re-arms the timer. When no capture remains, asks for termination instead.
     *
     * @param context the computation context used to produce records and manage timers
     * @param key the key of the timer that fired (unused)
     * @param timestamp the timestamp of the timer (unused)
     */
    @Override
    public void processTimer(ComputationContext context, String key, long timestamp) {
        if (remaining == 0) {
            debug("Exiting after " + count + " captures");
            context.askForTermination();
            return;
        }
        debug(String.format("Tracking latency %d/%d", count - remaining, count));
        for (LogPartitionGroup logGroup : logGroups) {
            List<Latency> latencies = getLatenciesForPartition(logGroup, codec);
            // The list index is used as the partition number.
            for (int partition = 0; partition < latencies.size(); partition++) {
                Latency latency = latencies.get(partition);
                if (latency.lower() <= 0L) {
                    // NOTE(review): presumably no timestamp is known yet for the consumer position on this
                    // partition, so there is nothing meaningful to report - confirm against Latency.
                    continue;
                }
                // NOTE(review): upper() is presumably the time at which the latency was measured - confirm.
                long recordWatermark = Watermark.ofTimestamp(latency.upper()).getValue();
                String recordKey = encodeKey(logGroup, partition);
                Record record = new Record(recordKey, encodeLatency(latency), recordWatermark);
                if (verbose) {
                    // Guarded so the record is not converted to a string when verbose is off.
                    debug("out: " + record);
                }
                context.produceRecord(OUTPUT_STREAM, record);
                context.setSourceLowWatermark(recordWatermark);
            }
        }
        context.askForCheckpoint();
        context.setTimer(TIMER_KEY, System.currentTimeMillis() + intervalMs);
        remaining--;
    }

    /**
     * Serializes a latency as the UTF-8 bytes of its JSON representation.
     *
     * @param latency the latency to encode
     * @return the encoded record value
     */
    protected byte[] encodeLatency(Latency latency) {
        return latency.asJson().getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Measures the latency of each partition of a log for a consumer group.
     * <p>
     * The record timestamp is extracted from the record watermark and the record key from {@link Record#getKey()}.
     * When the measurement fails with an {@link IllegalStateException}, or with an exception caused by a
     * {@link ClassNotFoundException}, an error is logged and an empty list is returned. The group is not removed
     * from {@link #logGroups}, so it is tried again on the next timer.
     *
     * @param logGroup the (group, log) pair to measure
     * @param codec the codec used to read the records
     * @return one latency per partition, or an empty list when the log content cannot be read as records
     */
    protected List<Latency> getLatenciesForPartition(LogPartitionGroup logGroup, Codec<Record> codec) {
        try {
            return manager.getLatencyPerPartition(logGroup.name, logGroup.group, codec,
                    rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp(), Record::getKey);
        } catch (Exception e) {
            if (e.getCause() instanceof ClassNotFoundException || e instanceof IllegalStateException) {
                error("log does not contains Record, remove partition: " + logGroup);
                return Collections.emptyList();
            }
            throw e;
        }
    }

    /**
     * Builds a record key of the form {@code group:name:partition}.
     *
     * @param logGroup the source of the group and log name
     * @param partition the partition number
     * @return the encoded key
     */
    public static String encodeKey(LogPartitionGroup logGroup, int partition) {
        return String.format("%s:%s:%s", logGroup.group, logGroup.name, partition);
    }

    /**
     * Parses a key built by {@link #encodeKey(LogPartitionGroup, int)}.
     * <p>
     * The key is split on {@code ':'} and only the first three parts are used.
     *
     * @param key a key of the form {@code group:name:partition}
     * @return the decoded group, log name and partition
     * @throws ArrayIndexOutOfBoundsException if the key has fewer than three parts
     * @throws NumberFormatException if the third part is not an integer
     */
    public static LogPartitionGroup decodeKey(String key) {
        String[] parts = key.split(":");
        return new LogPartitionGroup(parts[0], parts[1], Integer.parseInt(parts[2]));
    }

    /** Logs a farewell message. */
    @Override
    public void destroy() {
        info("Good bye");
    }

    /**
     * Reports an unexpected record: this computation is driven by its timer only.
     *
     * @param context the computation context (unused)
     * @param inputStreamName the name of the stream the record came from (unused)
     * @param record the unexpected record
     */
    @Override
    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
        error("Receiving a record is not expected!: " + record);
    }

    /**
     * Logs a message at info level, only when verbose mode is enabled.
     *
     * @param msg the message to log
     */
    protected void debug(String msg) {
        if (verbose) {
            log.info(msg);
        }
    }

    /**
     * Logs a message at info level.
     *
     * @param msg the message to log
     */
    protected void info(String msg) {
        log.info(msg);
    }

    /**
     * Logs a message at error level.
     *
     * @param msg the message to log
     */
    protected void error(String msg) {
        log.error(msg);
    }
}

