/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import kafka.server.LogDirFailureChannel;
import kafka.tier.state.TierPartitionState;
import kafka.tier.topic.TierTopicManagerConfig;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.compat.java8.JFunction;
import scala.compat.java8.JFunction0;

public class TierTopicManagerCommitter {
    static final Integer CURRENT_VERSION = 0;
    private static final String SEPARATOR = " ";
    private static final Logger log = LoggerFactory.getLogger(TierTopicManagerCommitter.class);
    private final TierTopicManagerConfig config;
    private final LogDirFailureChannel logDirFailureChannel;
    private final ConcurrentHashMap<Integer, Long> positions = new ConcurrentHashMap();

    public TierTopicManagerCommitter(TierTopicManagerConfig config, LogDirFailureChannel logDirFailureChannel) {
        if (config.logDirs.size() != 1) {
            throw new UnsupportedOperationException("TierTopicManager does not currently support multiple logdirs.");
        }
        this.config = config;
        this.logDirFailureChannel = logDirFailureChannel;
        this.clearTempFiles();
        this.loadOffsets();
    }

    public void updatePosition(Integer partition, Long position) {
        log.debug("Committer position updated {}:{}", (Object)partition, (Object)position);
        this.positions.put(partition, position);
    }

    public Long positionFor(int partitionId) {
        return this.positions.get(partitionId);
    }

    public synchronized void flush(Iterator<TierPartitionState> tierPartitionStateIterator) {
        HashMap<Integer, Long> flushPositions = new HashMap<Integer, Long>(this.positions);
        boolean error = false;
        while (tierPartitionStateIterator.hasNext()) {
            TierPartitionState state = tierPartitionStateIterator.next();
            try {
                state.flush();
            }
            catch (IOException ioe) {
                error = true;
                log.error("Error committing progress or flushing TierPartitionStates.", (Throwable)ioe);
                String logDir = state.dir().getParent();
                this.logDirFailureChannel.maybeAddOfflineLogDir(logDir, (Function0<String>)JFunction.func((JFunction0 & Serializable)() -> "Failed to flush TierPartitionState for " + state.dir()), ioe);
            }
        }
        if (!error) {
            this.writeOffsets(flushPositions);
        }
    }

    static Map<Integer, Long> earliestOffsets(List<Map<Integer, Long>> diskOffsets) {
        if (diskOffsets.stream().map(Map::keySet).collect(Collectors.toSet()).size() != 1) {
            return new HashMap<Integer, Long>();
        }
        HashMap<Integer, Long> minimum = new HashMap<Integer, Long>();
        for (Map<Integer, Long> offsets : diskOffsets) {
            log.debug("Loading offsets from logdir {}.", diskOffsets);
            for (Map.Entry<Integer, Long> entry2 : offsets.entrySet()) {
                minimum.compute(entry2.getKey(), (k, v) -> {
                    if (v == null || (Long)entry2.getValue() < v) {
                        return (Long)entry2.getValue();
                    }
                    return v;
                });
            }
        }
        log.debug("Minimum offsets found {}.", minimum);
        return minimum;
    }

    private static String commitPath(String logDir) {
        return logDir + "/tier.offsets";
    }

    private static String commitTempFilename(String logDir) {
        return TierTopicManagerCommitter.commitPath(logDir) + ".tmp";
    }

    private void clearTempFiles() {
        for (String logDir : this.config.logDirs) {
            try {
                Files.deleteIfExists(Paths.get(TierTopicManagerCommitter.commitTempFilename(logDir), new String[0]));
            }
            catch (IOException ioe) {
                this.logDirFailureChannel.maybeAddOfflineLogDir(logDir, (Function0<String>)JFunction.func((JFunction0 & Serializable)() -> "Failed to delete temporory tier offsets in logdir."), ioe);
            }
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    static Map<Integer, Long> committed(String logDir, LogDirFailureChannel logDirFailureChannel) {
        HashMap<Integer, Long> loaded = new HashMap<Integer, Long>();
        try (FileReader fr = new FileReader(TierTopicManagerCommitter.commitPath(logDir));
             BufferedReader br = new BufferedReader(fr);){
            String line = br.readLine();
            if (TierTopicManagerCommitter.invalidHeader(line)) {
                HashMap<Integer, Long> hashMap = new HashMap<Integer, Long>();
                return hashMap;
            }
            line = br.readLine();
            while (line != null) {
                String[] values = line.split(SEPARATOR);
                if (values.length != 2) {
                    log.warn("TierTopicManager offsets found in incorrect format '{}'. Resetting positions.", (Object)line);
                    HashMap<Integer, Long> hashMap = new HashMap<Integer, Long>();
                    return hashMap;
                }
                loaded.put(Integer.parseInt(values[0]), Long.parseLong(values[1]));
                line = br.readLine();
            }
            return loaded;
        }
        catch (FileNotFoundException fnf) {
            log.info("TierTopicManager offsets not found. This is expected if this is the first time starting up with tiered storage.");
            return loaded;
        }
        catch (NumberFormatException nfe) {
            log.error("Error parsing TierTopicManager offsets. Ignoring stored positions.", (Throwable)nfe);
            return new HashMap<Integer, Long>();
        }
        catch (IOException ioe) {
            log.error("Error loading TierTopicManager offsets. Setting logdir offline.", (Throwable)ioe);
            logDirFailureChannel.maybeAddOfflineLogDir(logDir, (Function0<String>)JFunction.func((JFunction0 & Serializable)() -> "Failed to commit tier offsets to logdir."), ioe);
        }
        return loaded;
    }

    private static boolean invalidHeader(String line) {
        try {
            Integer version = Integer.parseInt(line);
            if (version > CURRENT_VERSION || version < 0) {
                log.error("Committed offsets version {} is unsupported. Current version {}. Returning empty positions.", (Object)version, (Object)CURRENT_VERSION);
                return true;
            }
        }
        catch (NumberFormatException nfe) {
            log.error("Error parsing committed offset version, line '{}'. Returning empty positions.", (Object)line);
            return true;
        }
        return false;
    }

    private void loadOffsets() {
        Map<Integer, Long> earliest = TierTopicManagerCommitter.earliestOffsets(this.config.logDirs.stream().map(logDir -> TierTopicManagerCommitter.committed(logDir, this.logDirFailureChannel)).collect(Collectors.toList()));
        this.positions.clear();
        this.positions.putAll(earliest);
    }

    private void writeOffsets(Map<Integer, Long> offsets) {
        for (String logDir : this.config.logDirs) {
            try {
                try (FileWriter fw = new FileWriter(TierTopicManagerCommitter.commitTempFilename(logDir));
                     BufferedWriter bw = new BufferedWriter(fw);){
                    bw.write(CURRENT_VERSION.toString());
                    bw.newLine();
                    for (Map.Entry<Integer, Long> entry2 : offsets.entrySet()) {
                        bw.write(entry2.getKey() + SEPARATOR + entry2.getValue());
                        bw.newLine();
                    }
                }
                Utils.atomicMoveWithFallback((Path)Paths.get(TierTopicManagerCommitter.commitTempFilename(logDir), new String[0]), (Path)Paths.get(TierTopicManagerCommitter.commitPath(logDir), new String[0]));
            }
            catch (IOException ioe) {
                this.logDirFailureChannel.maybeAddOfflineLogDir(logDir, (Function0<String>)JFunction.func((JFunction0 & Serializable)() -> "Failed to commit tier offsets to logdir."), ioe);
            }
        }
    }
}

