/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.tools.command;

import java.io.Externalizable;
import java.time.DateTimeException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.tools.command.Command;

public class PositionCommand
extends Command {
    private static final Log log = LogFactory.getLog(PositionCommand.class);
    public static final Duration FIRST_READ_TIMEOUT = Duration.ofMillis(1000L);
    public static final Duration READ_TIMEOUT = Duration.ofMillis(100L);
    protected static final String NAME = "position";
    public static final String AFTER_DATE_OPT = "after-date";
    public static final String TO_WATERMARK_OPT = "to-watermark";

    protected static long getTimestampFromDate(String dateIso8601) {
        if (dateIso8601 == null || dateIso8601.isEmpty()) {
            return -1L;
        }
        try {
            Instant instant = Instant.parse(dateIso8601);
            return instant.toEpochMilli();
        }
        catch (DateTimeException e) {
            log.error((Object)("Failed to read the timeout: " + e.getMessage()));
            log.error((Object)("The timestamp should be in ISO-8601 format, eg. " + Instant.now()));
            return -1L;
        }
    }

    @Override
    public String name() {
        return NAME;
    }

    @Override
    public void updateOptions(Options options) {
        options.addOption(Option.builder((String)"l").longOpt("log-name").desc("Log name").required().hasArg().argName("LOG_NAME").build());
        options.addOption(Option.builder((String)"p").longOpt("partition").desc("Read only this partition").hasArg().argName("PARTITION").build());
        options.addOption(Option.builder((String)"g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build());
        options.addOption(Option.builder().longOpt("reset").desc("Resets all committed positions for the group").build());
        options.addOption(Option.builder().longOpt("to-end").desc("Sets the committed positions to the end of partitions for the group").build());
        options.addOption(Option.builder().longOpt(AFTER_DATE_OPT).desc("Sets the committed positions for the group to a specific date. The date used to find the offset depends on the implementation, for Kafka this is the LogAppendTime. The position is set to the earliest offset whose timestamp is greater than or equal to the given date. The date is specified in ISO-8601 format, eg. " + Instant.now() + ". If no record offset is found with an appropriate timestamp then the command fails.").hasArg().argName("DATE").build());
        options.addOption(Option.builder().longOpt(TO_WATERMARK_OPT).desc("Sets the committed positions for the group to a specific date. The date used to find the offset is contained in a record watermark.  This means that the LOG_NAME is expected to be a computation stream with records with populated watermark. The position is set to the biggest record offset with a watermark date inferior or equals to the given date.\" The date is specified in ISO-8601 format, eg. " + Instant.now() + ". If no record offset is found with an appropriate timestamp then the command fails.").hasArg().argName("DATE").build());
    }

    @Override
    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
        Name name = Name.ofUrn(cmd.getOptionValue("log-name"));
        Name group = Name.ofUrn(cmd.getOptionValue("group", "admin/tools"));
        int partition = Integer.parseInt(cmd.getOptionValue("partition", "-1"));
        if (cmd.hasOption(AFTER_DATE_OPT)) {
            long timestamp = PositionCommand.getTimestampFromDate(cmd.getOptionValue(AFTER_DATE_OPT));
            if (timestamp >= 0L) {
                return this.positionAfterDate(manager, group, name, partition, timestamp);
            }
        } else if (cmd.hasOption(TO_WATERMARK_OPT)) {
            long timestamp = PositionCommand.getTimestampFromDate(cmd.getOptionValue(TO_WATERMARK_OPT));
            if (timestamp >= 0L) {
                return this.positionToWatermark(manager, group, name, partition, timestamp);
            }
        } else {
            if (cmd.hasOption("to-end")) {
                return this.toEnd(manager, group, name, partition);
            }
            if (cmd.hasOption("reset")) {
                return this.reset(manager, group, name, partition);
            }
            log.error((Object)"Invalid option, try 'help position'");
        }
        return false;
    }

    protected boolean toEnd(LogManager manager, Name group, Name name, int partition) {
        LogLag lag = this.getLag(manager, group, name, partition);
        try (LogTailer tailer = this.createTailer(manager, name, partition, group);){
            tailer.toEnd();
            tailer.commit();
        }
        log.info((Object)String.format("# Moved log %s, group: %s, from: %s to %s", this.labelFor(name, partition), group, lag.lower(), lag.upper()));
        return true;
    }

    protected String labelFor(int partition) {
        return partition >= 0 ? Integer.toString(partition) : "all";
    }

    protected String labelFor(Name name, int partition) {
        return partition >= 0 ? name.getUrn() + ":" + this.labelFor(partition) : name.getUrn();
    }

    protected LogLag getLag(LogManager manager, Name group, Name name, int partition) {
        if (partition >= 0) {
            return manager.getLagPerPartition(name, group).get(partition);
        }
        return manager.getLag(name, group);
    }

    protected <T extends Externalizable> LogTailer<T> createTailer(LogManager manager, Name name, int partition, Name group) {
        if (partition >= 0) {
            return manager.createTailer(group, new LogPartition(name, partition));
        }
        return manager.createTailer(group, name);
    }

    protected boolean reset(LogManager manager, Name group, Name name, int partition) {
        LogLag lag = this.getLag(manager, group, name, partition);
        long pos = lag.lower();
        try (LogTailer tailer = this.createTailer(manager, name, partition, group);){
            tailer.reset();
        }
        log.warn((Object)String.format("# Reset log %s, group: %s, from: %s to 0", this.labelFor(name, partition), group, pos));
        return true;
    }

    protected boolean positionAfterDate(LogManager manager, Name group, Name name, int partition, long timestamp) {
        try (LogTailer tailer = manager.createTailer(group, name);){
            boolean movedOffset = false;
            for (int part = 0; part < manager.size(name); ++part) {
                if (partition >= 0 && part != partition) continue;
                LogPartition logPartition = new LogPartition(name, part);
                LogOffset logOffset = tailer.offsetForTimestamp(logPartition, timestamp);
                if (logOffset == null) {
                    log.error((Object)String.format("# Could not find an offset for group: %s, partition: %s", group, logPartition));
                    continue;
                }
                tailer.seek(logOffset);
                movedOffset = true;
                log.info((Object)String.format("# Set log %s, group: %s, to offset %s", this.labelFor(name, part), group, logOffset.offset()));
            }
            if (movedOffset) {
                tailer.commit();
                boolean bl = true;
                return bl;
            }
        }
        log.error((Object)"No offset found for the specified date");
        return false;
    }

    protected boolean positionToWatermark(LogManager manager, Name group, Name name, int partition, long timestamp) throws InterruptedException {
        Name newGroup = Name.ofUrn("admin/tools");
        int size = manager.size(name);
        ArrayList<LogOffset> offsets = new ArrayList<LogOffset>(size);
        List<LogLag> lags = manager.getLagPerPartition(name, newGroup);
        int part = 0;
        for (LogLag lag : lags) {
            if (lag.lag() == 0L) {
                offsets.add(null);
            } else {
                if (partition >= 0 && part != partition) {
                    offsets.add(null);
                }
                try (LogTailer<Record> tailer = manager.createTailer(newGroup, new LogPartition(name, part));){
                    offsets.add(this.searchWatermarkOffset(tailer, timestamp));
                }
            }
            ++part;
        }
        if (offsets.stream().noneMatch(Objects::nonNull)) {
            if (LogLag.of(lags).upper() == 0L) {
                log.error((Object)"No offsets found because log is empty");
                return false;
            }
            log.error((Object)("Timestamp: " + timestamp + " is earlier as any records, resetting positions"));
            return this.reset(manager, group, name, partition);
        }
        try (LogTailer tailer = manager.createTailer(group, name);){
            offsets.stream().filter(Objects::nonNull).forEach(tailer::seek);
            tailer.commit();
            offsets.stream().filter(Objects::nonNull).forEach(offset -> log.info((Object)("# Moving consumer to: " + offset)));
        }
        return true;
    }

    protected LogOffset searchWatermarkOffset(LogTailer<Record> tailer, long timestamp) throws InterruptedException {
        LogOffset lastOffset = null;
        LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT);
        while (rec != null) {
            long recTimestamp = Watermark.ofValue(rec.message().getWatermark()).getTimestamp();
            if (recTimestamp == timestamp) {
                return rec.offset();
            }
            if (recTimestamp > timestamp) {
                return lastOffset;
            }
            if (recTimestamp == 0L) {
                throw new IllegalArgumentException("Cannot find position because Record has empty watermark: " + rec);
            }
            lastOffset = rec.offset();
            rec = tailer.read(READ_TIMEOUT);
        }
        return lastOffset;
    }
}

