package org.nuxeo.lib.stream.tools.command;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
import org.nuxeo.runtime.codec.CodecServiceImpl;

/* loaded from: input_file:org/nuxeo/lib/stream/tools/command/RestoreCommand.class */
public class RestoreCommand extends Command {
    private static final Log log = LogFactory.getLog(RestoreCommand.class);
    protected static final String NAME = "restore";
    protected static final String GROUP = "tools";
    protected boolean verbose = false;
    protected String input;
    protected List<String> logNames;
    protected long date;
    protected boolean dryRun;
    protected String codec;

    @Override // org.nuxeo.lib.stream.tools.command.Command
    public String name() {
        return "restore";
    }

    @Override // org.nuxeo.lib.stream.tools.command.Command
    public void updateOptions(Options options) {
        options.addOption(Option.builder("l").longOpt("log-name").desc("Restore consumers positions for this LOG, must be a computation Record, can be a comma separated list of log names or ALL").required().hasArg().argName("LOG_NAME").build());
        options.addOption(Option.builder("i").longOpt("log-input").desc("Log name of the input default to _consumer_latencies").hasArg().argName("LOG_INPUT").build());
        options.addOption(Option.builder().longOpt("to-date").desc("Sets the committed positions as they where at a specific date. The date is specified in ISO-8601 format, eg. " + Instant.now()).hasArg().argName("DATE").build());
        options.addOption(Option.builder().longOpt(CodecServiceImpl.XP_CODEC).desc("Codec used to read record, can be: java, avro, avroBinary, avroJson").hasArg().argName("CODEC").build());
        options.addOption(Option.builder().longOpt("verbose").build());
        options.addOption(Option.builder().longOpt("dry-run").desc("Do not change any position").build());
    }

    @Override // org.nuxeo.lib.stream.tools.command.Command
    public boolean run(LogManager logManager, CommandLine commandLine) throws InterruptedException {
        this.logNames = getLogNames(logManager, commandLine.getOptionValue("log-name"));
        this.input = commandLine.getOptionValue("log-input", "_consumer_latencies");
        this.date = PositionCommand.getTimestampFromDate(commandLine.getOptionValue("to-date"));
        this.verbose = commandLine.hasOption("verbose");
        this.dryRun = commandLine.hasOption("dry-run");
        this.codec = commandLine.getOptionValue(CodecServiceImpl.XP_CODEC);
        return restorePosition(logManager);
    }

    protected boolean restorePosition(LogManager logManager) throws InterruptedException {
        Map<LogPartitionGroup, LogOffset> searchOffsets = searchOffsets(logManager, readLatencies(logManager));
        if (this.dryRun) {
            log.info("# Dry run mode returning without doing any changes");
            return true;
        }
        updatePositions(logManager, searchOffsets);
        return true;
    }

    protected void updatePositions(LogManager logManager, Map<LogPartitionGroup, LogOffset> map) {
        log.info("# Update positions");
        map.forEach((logPartitionGroup, logOffset) -> {
            updatePosition(logManager, logPartitionGroup, logOffset);
        });
    }

    protected void updatePosition(LogManager logManager, LogPartitionGroup logPartitionGroup, LogOffset logOffset) {
        if (logOffset == null) {
            return;
        }
        log.info(logPartitionGroup + " new position: " + logOffset);
        LogTailer createTailer = logManager.createTailer(logPartitionGroup.group, logPartitionGroup.getLogPartition(), getRecordCodec(this.codec));
        Throwable th = null;
        try {
            createTailer.seek(logOffset);
            createTailer.commit();
            if (createTailer != null) {
                if (0 == 0) {
                    createTailer.close();
                    return;
                }
                try {
                    createTailer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createTailer != null) {
                if (0 != 0) {
                    try {
                        createTailer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTailer.close();
                }
            }
            throw th3;
        }
    }

    protected Map<LogPartitionGroup, LogOffset> searchOffsets(LogManager logManager, Map<LogPartitionGroup, Latency> map) throws InterruptedException {
        HashMap hashMap = new HashMap(map.size());
        log.info("# Searching records matching the latencies lower timestamp and key");
        for (Map.Entry<LogPartitionGroup, Latency> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), findOffset(logManager, entry.getKey(), entry.getValue()));
        }
        return hashMap;
    }

    protected LogOffset findOffset(LogManager logManager, LogPartitionGroup logPartitionGroup, Latency latency) throws InterruptedException {
        long lower = latency.lower();
        String key = latency.key();
        LogTailer createTailer = logManager.createTailer(GROUP, logPartitionGroup.getLogPartition(), getRecordCodec(this.codec));
        Throwable th = null;
        try {
            for (LogRecord read = createTailer.read(PositionCommand.FIRST_READ_TIMEOUT); read != null; read = createTailer.read(PositionCommand.READ_TIMEOUT)) {
                if ((key == null || key.equals(((Record) read.message()).getKey())) && lower == Watermark.ofValue(((Record) read.message()).getWatermark()).getTimestamp()) {
                    log.info(String.format("%s: offset: %s wm: %d key: %s", logPartitionGroup, read.offset(), Long.valueOf(((Record) read.message()).getWatermark()), ((Record) read.message()).getKey()));
                    LogOffset nextOffset = read.offset().nextOffset();
                    if (createTailer != null) {
                        if (0 != 0) {
                            try {
                                createTailer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createTailer.close();
                        }
                    }
                    return nextOffset;
                }
            }
            log.error("No offset found for: " + logPartitionGroup + ", matching: " + latency.asJson());
            return null;
        } finally {
            if (createTailer != null) {
                if (0 != 0) {
                    try {
                        createTailer.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                } else {
                    createTailer.close();
                }
            }
        }
    }

    protected Map<LogPartitionGroup, Latency> readLatencies(LogManager logManager) throws InterruptedException {
        Latency decodeLatency;
        HashMap hashMap = new HashMap();
        log.info("# Reading latencies log: " + this.input + ", searching for the higher timestamp <= " + this.date);
        LogTailer createTailer = logManager.createTailer(GROUP, this.input, getRecordCodec(this.codec));
        Throwable th = null;
        try {
            for (LogRecord read = createTailer.read(PositionCommand.FIRST_READ_TIMEOUT); read != null; read = createTailer.read(PositionCommand.READ_TIMEOUT)) {
                long timestamp = Watermark.ofValue(((Record) read.message()).getWatermark()).getTimestamp();
                if (this.date <= 0 || timestamp <= this.date) {
                    LogPartitionGroup decodeKey = LatencyTrackerComputation.decodeKey(((Record) read.message()).getKey());
                    if (this.logNames.contains(decodeKey.name) && (decodeLatency = decodeLatency(((Record) read.message()).getData())) != null && decodeLatency.lower() > 0) {
                        hashMap.put(decodeKey, decodeLatency);
                    }
                }
            }
            log.info("# Latencies found (group:log:partition -> lat)");
            hashMap.forEach((logPartitionGroup, latency) -> {
                log.info(String.format("%s: %s", logPartitionGroup, latency.asJson()));
            });
            return hashMap;
        } finally {
            if (createTailer != null) {
                if (0 != 0) {
                    try {
                        createTailer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    createTailer.close();
                }
            }
        }
    }

    protected Latency decodeLatency(byte[] bArr) {
        return Latency.fromJson(new String(bArr, StandardCharsets.UTF_8));
    }

    protected List<String> getLogNames(LogManager logManager, String str) {
        if ("all".equalsIgnoreCase(str)) {
            return (List) logManager.listAll().stream().filter(str2 -> {
                return !str2.startsWith("_");
            }).collect(Collectors.toList());
        }
        List<String> asList = Arrays.asList(str.split(","));
        for (String str3 : asList) {
            if (!logManager.exists(str3)) {
                throw new IllegalArgumentException("Unknown log name: " + str3);
            }
        }
        return asList;
    }
}
