package org.nuxeo.lib.stream.tools.command;

import java.io.Externalizable;
import java.time.DateTimeException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;

/* loaded from: input_file:org/nuxeo/lib/stream/tools/command/PositionCommand.class */
public class PositionCommand extends Command {
    private static final Log log = LogFactory.getLog(PositionCommand.class);
    public static final Duration FIRST_READ_TIMEOUT = Duration.ofMillis(1000);
    public static final Duration READ_TIMEOUT = Duration.ofMillis(100);
    protected static final String NAME = "position";
    public static final String AFTER_DATE_OPT = "after-date";
    public static final String TO_WATERMARK_OPT = "to-watermark";

    /* JADX INFO: Access modifiers changed from: protected */
    public static long getTimestampFromDate(String str) {
        if (str == null || str.isEmpty()) {
            return -1L;
        }
        try {
            return Instant.parse(str).toEpochMilli();
        } catch (DateTimeException e) {
            log.error("Failed to read the timeout: " + e.getMessage());
            log.error("The timestamp should be in ISO-8601 format, eg. " + Instant.now());
            return -1L;
        }
    }

    @Override // org.nuxeo.lib.stream.tools.command.Command
    public String name() {
        return NAME;
    }

    @Override // org.nuxeo.lib.stream.tools.command.Command
    public void updateOptions(Options options) {
        options.addOption(Option.builder("l").longOpt("log-name").desc("Log name").required().hasArg().argName("LOG_NAME").build());
        options.addOption(Option.builder("p").longOpt("partition").desc("Read only this partition").hasArg().argName("PARTITION").build());
        options.addOption(Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build());
        options.addOption(Option.builder().longOpt("reset").desc("Resets all committed positions for the group").build());
        options.addOption(Option.builder().longOpt("to-end").desc("Sets the committed positions to the end of partitions for the group").build());
        options.addOption(Option.builder().longOpt(AFTER_DATE_OPT).desc("Sets the committed positions for the group to a specific date. The date used to find the offset depends on the implementation, for Kafka this is the LogAppendTime. The position is set to the earliest offset whose timestamp is greater than or equal to the given date. The date is specified in ISO-8601 format, eg. " + Instant.now() + ". If no record offset is found with an appropriate timestamp then the command fails.").hasArg().argName("DATE").build());
        options.addOption(Option.builder().longOpt(TO_WATERMARK_OPT).desc("Sets the committed positions for the group to a specific date. The date used to find the offset is contained in a record watermark.  This means that the LOG_NAME is expected to be a computation stream with records with populated watermark. The position is set to the biggest record offset with a watermark date inferior or equals to the given date.\" The date is specified in ISO-8601 format, eg. " + Instant.now() + ". If no record offset is found with an appropriate timestamp then the command fails.").hasArg().argName("DATE").build());
    }

    @Override // org.nuxeo.lib.stream.tools.command.Command
    public boolean run(LogManager logManager, CommandLine commandLine) throws InterruptedException {
        String optionValue = commandLine.getOptionValue("log-name");
        String optionValue2 = commandLine.getOptionValue("group", "tools");
        int parseInt = Integer.parseInt(commandLine.getOptionValue("partition", "-1"));
        if (commandLine.hasOption(AFTER_DATE_OPT)) {
            long timestampFromDate = getTimestampFromDate(commandLine.getOptionValue(AFTER_DATE_OPT));
            if (timestampFromDate >= 0) {
                return positionAfterDate(logManager, optionValue2, optionValue, parseInt, timestampFromDate);
            }
            return false;
        }
        if (commandLine.hasOption(TO_WATERMARK_OPT)) {
            long timestampFromDate2 = getTimestampFromDate(commandLine.getOptionValue(TO_WATERMARK_OPT));
            if (timestampFromDate2 >= 0) {
                return positionToWatermark(logManager, optionValue2, optionValue, parseInt, timestampFromDate2);
            }
            return false;
        }
        if (commandLine.hasOption("to-end")) {
            return toEnd(logManager, optionValue2, optionValue, parseInt);
        }
        if (commandLine.hasOption("reset")) {
            return reset(logManager, optionValue2, optionValue, parseInt);
        }
        log.error("Invalid option, try 'help position'");
        return false;
    }

    protected boolean toEnd(LogManager logManager, String str, String str2, int i) {
        LogLag lag = getLag(logManager, str, str2, i);
        LogTailer createTailer = createTailer(logManager, str2, i, str);
        Throwable th = null;
        try {
            try {
                createTailer.toEnd();
                createTailer.commit();
                if (createTailer != null) {
                    if (0 != 0) {
                        try {
                            createTailer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createTailer.close();
                    }
                }
                log.info(String.format("# Moved log %s, group: %s, from: %s to %s", labelFor(str2, i), str, Long.valueOf(lag.lower()), Long.valueOf(lag.upper())));
                return true;
            } finally {
            }
        } catch (Throwable th3) {
            if (createTailer != null) {
                if (th != null) {
                    try {
                        createTailer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTailer.close();
                }
            }
            throw th3;
        }
    }

    protected String labelFor(int i) {
        return i >= 0 ? Integer.toString(i) : "all";
    }

    protected String labelFor(String str, int i) {
        return i >= 0 ? str + ":" + labelFor(i) : str;
    }

    protected LogLag getLag(LogManager logManager, String str, String str2, int i) {
        return i >= 0 ? logManager.getLagPerPartition(str2, str).get(i) : logManager.getLag(str2, str);
    }

    protected <T extends Externalizable> LogTailer<T> createTailer(LogManager logManager, String str, int i, String str2) {
        return i >= 0 ? logManager.createTailer(str2, new LogPartition(str, i)) : logManager.createTailer(str2, str);
    }

    protected boolean reset(LogManager logManager, String str, String str2, int i) {
        long lower = getLag(logManager, str, str2, i).lower();
        LogTailer createTailer = createTailer(logManager, str2, i, str);
        Throwable th = null;
        try {
            try {
                createTailer.reset();
                if (createTailer != null) {
                    if (0 != 0) {
                        try {
                            createTailer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createTailer.close();
                    }
                }
                log.warn(String.format("# Reset log %s, group: %s, from: %s to 0", labelFor(str2, i), str, Long.valueOf(lower)));
                return true;
            } finally {
            }
        } catch (Throwable th3) {
            if (createTailer != null) {
                if (th != null) {
                    try {
                        createTailer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTailer.close();
                }
            }
            throw th3;
        }
    }

    protected boolean positionAfterDate(LogManager logManager, String str, String str2, int i, long j) {
        LogTailer createTailer = logManager.createTailer(str, str2);
        Throwable th = null;
        boolean z = false;
        for (int i2 = 0; i2 < logManager.size(str2); i2++) {
            try {
                try {
                    if (i < 0 || i2 == i) {
                        LogPartition logPartition = new LogPartition(str2, i2);
                        LogOffset offsetForTimestamp = createTailer.offsetForTimestamp(logPartition, j);
                        if (offsetForTimestamp == null) {
                            log.error(String.format("# Could not find an offset for group: %s, partition: %s", str, logPartition));
                        } else {
                            createTailer.seek(offsetForTimestamp);
                            z = true;
                            log.info(String.format("# Set log %s, group: %s, to offset %s", labelFor(str2, i2), str, Long.valueOf(offsetForTimestamp.offset())));
                        }
                    }
                } finally {
                }
            } catch (Throwable th2) {
                if (createTailer != null) {
                    if (th != null) {
                        try {
                            createTailer.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        createTailer.close();
                    }
                }
                throw th2;
            }
        }
        if (z) {
            createTailer.commit();
            if (createTailer != null) {
                if (0 != 0) {
                    try {
                        createTailer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTailer.close();
                }
            }
            return true;
        }
        if (createTailer != null) {
            if (0 != 0) {
                try {
                    createTailer.close();
                } catch (Throwable th5) {
                    th.addSuppressed(th5);
                }
            } else {
                createTailer.close();
            }
        }
        log.error("No offset found for the specified date");
        return false;
    }

    protected boolean positionToWatermark(LogManager logManager, String str, String str2, int i, long j) throws InterruptedException {
        ArrayList arrayList = new ArrayList(logManager.size(str2));
        List<LogLag> lagPerPartition = logManager.getLagPerPartition(str2, "tools");
        int i2 = 0;
        Iterator<LogLag> it = lagPerPartition.iterator();
        while (it.hasNext()) {
            if (it.next().lag() == 0) {
                arrayList.add(null);
            } else {
                if (i >= 0 && i2 != i) {
                    arrayList.add(null);
                }
                LogTailer<Record> createTailer = logManager.createTailer("tools", new LogPartition(str2, i2));
                Throwable th = null;
                try {
                    try {
                        arrayList.add(searchWatermarkOffset(createTailer, j));
                        if (createTailer != null) {
                            if (0 != 0) {
                                try {
                                    createTailer.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                createTailer.close();
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (createTailer != null) {
                        if (th != null) {
                            try {
                                createTailer.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            createTailer.close();
                        }
                    }
                    throw th4;
                }
            }
            i2++;
        }
        if (arrayList.stream().noneMatch((v0) -> {
            return Objects.nonNull(v0);
        })) {
            if (LogLag.of(lagPerPartition).upper() == 0) {
                log.error("No offsets found because log is empty");
                return false;
            }
            log.error("Timestamp: " + j + " is earlier as any records, resetting positions");
            return reset(logManager, str, str2, i);
        }
        LogTailer createTailer2 = logManager.createTailer(str, str2);
        Throwable th6 = null;
        try {
            try {
                Stream filter = arrayList.stream().filter((v0) -> {
                    return Objects.nonNull(v0);
                });
                createTailer2.getClass();
                filter.forEach(createTailer2::seek);
                createTailer2.commit();
                arrayList.stream().filter((v0) -> {
                    return Objects.nonNull(v0);
                }).forEach(logOffset -> {
                    log.info("# Moving consumer to: " + logOffset);
                });
                if (createTailer2 == null) {
                    return true;
                }
                if (0 == 0) {
                    createTailer2.close();
                    return true;
                }
                try {
                    createTailer2.close();
                    return true;
                } catch (Throwable th7) {
                    th6.addSuppressed(th7);
                    return true;
                }
            } catch (Throwable th8) {
                th6 = th8;
                throw th8;
            }
        } catch (Throwable th9) {
            if (createTailer2 != null) {
                if (th6 != null) {
                    try {
                        createTailer2.close();
                    } catch (Throwable th10) {
                        th6.addSuppressed(th10);
                    }
                } else {
                    createTailer2.close();
                }
            }
            throw th9;
        }
    }

    protected LogOffset searchWatermarkOffset(LogTailer<Record> logTailer, long j) throws InterruptedException {
        LogOffset logOffset = null;
        LogRecord<Record> read = logTailer.read(FIRST_READ_TIMEOUT);
        while (true) {
            LogRecord<Record> logRecord = read;
            if (logRecord == null) {
                return logOffset;
            }
            long timestamp = Watermark.ofValue(logRecord.message().getWatermark()).getTimestamp();
            if (timestamp == j) {
                return logRecord.offset();
            }
            if (timestamp > j) {
                return logOffset;
            }
            if (timestamp == 0) {
                throw new IllegalArgumentException("Cannot find position because Record has empty watermark: " + logRecord);
            }
            logOffset = logRecord.offset();
            read = logTailer.read(READ_TIMEOUT);
        }
    }
}
