package org.nuxeo.lib.stream.tools.command;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;

/* loaded from: input_file:org/nuxeo/lib/stream/tools/command/LatencyTrackerComputation.class */
public class LatencyTrackerComputation extends AbstractComputation {
    protected static final String OUTPUT_STREAM = "o1";
    protected final LogManager manager;
    protected final List<String> logNames;
    protected final int intervalMs;
    protected final int count;
    protected final boolean verbose;
    protected int remaining;
    protected List<LogPartitionGroup> logGroups;

    public LatencyTrackerComputation(LogManager logManager, List<String> list, String str, int i, int i2, boolean z) {
        super(str, 1, 1);
        this.manager = logManager;
        this.logNames = list;
        this.intervalMs = 1000 * i;
        this.count = i2;
        this.remaining = i2;
        this.verbose = z;
    }

    @Override // org.nuxeo.lib.stream.computation.AbstractComputation, org.nuxeo.lib.stream.computation.Computation
    public void init(ComputationContext computationContext) {
        info(String.format("Tracking %s, count: %d, interval: %dms", Arrays.toString(this.logNames.toArray()), Integer.valueOf(this.count), Integer.valueOf(this.intervalMs)));
        this.logGroups = new ArrayList();
        this.logNames.forEach(str -> {
            Iterator<String> it = this.manager.listConsumerGroups(str).iterator();
            while (it.hasNext()) {
                this.logGroups.add(new LogPartitionGroup(it.next(), str, 0));
            }
        });
        computationContext.setTimer("tracker", System.currentTimeMillis() + this.intervalMs);
    }

    @Override // org.nuxeo.lib.stream.computation.AbstractComputation, org.nuxeo.lib.stream.computation.Computation
    public void processTimer(ComputationContext computationContext, String str, long j) {
        if (this.remaining == 0) {
            debug("Exiting after " + this.count + " captures");
            computationContext.askForTermination();
            return;
        }
        debug(String.format("Tracking latency %d/%d", Integer.valueOf(this.count - this.remaining), Integer.valueOf(this.count)));
        for (LogPartitionGroup logPartitionGroup : this.logGroups) {
            try {
                int i = 0;
                for (Latency latency : this.manager.getLatencyPerPartition(logPartitionGroup.name, logPartitionGroup.group, record -> {
                    return Long.valueOf(Watermark.ofValue(record.watermark).getTimestamp());
                }, record2 -> {
                    return record2.key;
                })) {
                    try {
                        Record record3 = new Record(encodeKey(logPartitionGroup, i), latency.asJson().getBytes("UTF-8"), Watermark.ofTimestamp(latency.upper()).getValue(), null);
                        debug("out: " + record3);
                        computationContext.produceRecord(OUTPUT_STREAM, record3);
                        computationContext.setSourceLowWatermark(latency.upper());
                        i++;
                    } catch (UnsupportedEncodingException e) {
                        throw new IllegalStateException("Faild to byte encoding " + latency, e);
                    }
                }
            } catch (IllegalStateException e2) {
                error("log does not contains Record: " + logPartitionGroup);
            }
        }
        computationContext.askForCheckpoint();
        computationContext.setTimer("tracker", System.currentTimeMillis() + this.intervalMs);
        this.remaining--;
    }

    public static String encodeKey(LogPartitionGroup logPartitionGroup, int i) {
        return String.format("%s:%s:%s", logPartitionGroup.group, logPartitionGroup.name, Integer.valueOf(i));
    }

    public static LogPartitionGroup decodeKey(String str) {
        String[] split = str.split(":");
        return new LogPartitionGroup(split[0], split[1], Integer.parseInt(split[2]));
    }

    @Override // org.nuxeo.lib.stream.computation.Computation
    public void destroy() {
        info("Good bye");
    }

    @Override // org.nuxeo.lib.stream.computation.Computation
    public void processRecord(ComputationContext computationContext, String str, Record record) {
        error("Receiving a record is not expected!: " + record);
    }

    protected void debug(String str) {
        if (this.verbose) {
            System.out.println(str);
        }
    }

    protected void info(String str) {
        System.out.println(str);
    }

    protected void error(String str) {
        System.err.println(str);
    }
}
