/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.tools;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import kafka.log.MergedLog;
import kafka.restore.RestoreUtil;
import kafka.server.KafkaConfig;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.state.FileTierPartitionIterator;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.Header;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreConfig;
import kafka.tier.store.TierObjectStoreUtils;
import kafka.tier.store.VersionInformation;
import kafka.tier.tools.RecoveryUtils;
import kafka.tier.tools.TierObjectStoreFactory;
import kafka.tier.tools.ValidateAndRestoreSegments;
import kafka.tier.tools.ValidateFtpsSegmentsResponse;
import kafka.utils.checksum.CheckedFileIO;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.internal.HelpScreenException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;

public class ValidateSegments {
    /** Logger used by every method of this tool; configured by {@code setupLogger}. */
    private static final Logger LOGGER = Logger.getLogger("ValidateSegments");
    /** Description handed to the argument parser for the help screen. */
    public static final String DESCRIPTION = "Validate segments in FTPS from tiered storage";
    /** Name (and destination key) of the command-line argument selecting the logging level. */
    private static final String LOGGING_LEVEL = "logging.level";
    /** Help text of the {@code logging.level} argument. */
    private static final String LOGGING_LEVEL_DOC = "Logging level for the tool. Valid values: SEVERE, WARNING, INFO, CONFIG, FINE, FINER, FINEST, ALL";
    /** Name (and destination key) of the command-line argument naming the partition log directory. */
    private static final String LOG_DIR = "log.dir";
    /** Help text of the {@code log.dir} argument. */
    private static final String LOG_DIR_DOC = "Fully qualified path for log directory where tier state file is located";
    /** Default value of the {@code tier.config} argument. */
    private static final String DEFAULT_KAFKA_PROPS_FILE = "/mnt/config/kafka.properties";
    /** Serializes the list of segments with missing files to pretty-printed JSON. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    /** Backend chosen by {@code getObjectStore}; {@code run} passes it to the factory to close the store. Mutable static state. */
    private static TierObjectStore.Backend backend = null;
    /** Instant at which the current {@code run} started; used to report the elapsed time. Mutable static state. */
    private static Instant startTime;
    /** Clock used for timing and handed to the object store factory; assigned in the static initializer. */
    private static final Time TIME;
    /** File suffixes passed to {@code RestoreUtil.getMissingFiles} as the required set for each segment; assigned in the static initializer. */
    private static final Set<String> REQUIRED_FILE_TYPES;
    /** Property keys passed to {@code Utils.loadProps} when reading the object store configuration; assigned in the static initializer. */
    private static final List<String> OBJECT_STORE_REQUIRED_PROPERTIES;

    /**
     * Runs one validation pass: configures logging, loads the metadata of uploaded segments from
     * the tier state file in the given log directory, and checks each segment against the object
     * store.
     *
     * <p>Reads the {@code tier.config}, {@code logging.level} and {@code log.dir} entries of
     * {@code args}.
     *
     * @param args parsed command-line arguments
     * @throws ArgumentParserException declared for callers that handle argument errors
     * @throws IOException if the tier state file or the object store configuration cannot be
     *     read, or the result cannot be serialized
     */
    public static void run(Namespace args) throws ArgumentParserException, IOException {
        startTime = Instant.ofEpochMilli(TIME.milliseconds());
        String objectStoreConfigFile = args.getString("tier.config").trim();
        String loggingLevel = args.getString(LOGGING_LEVEL);
        String logDir = args.getString(LOG_DIR).trim();
        ValidateSegments.setupLogger(loggingLevel);
        Map<UUID, TierObjectStore.ObjectMetadata> objectMetadataMap = ValidateSegments.objectMetadata(logDir);
        // Acquire the store outside the try: if acquisition fails there is nothing to release.
        TierObjectStore tierObjectStore = ValidateSegments.objectStore(objectStoreConfigFile);
        try {
            ValidateSegments.getInconsistentSegments(tierObjectStore, objectMetadataMap);
        } finally {
            // Release the backend even when validation or result serialization fails.
            TierObjectStoreFactory.closeBackendInstance(backend);
        }
    }

    /**
     * Locates the tier state file inside a partition log directory and loads the metadata of the
     * segments recorded in it.
     *
     * @param logDir path of the partition log directory; its name is parsed into a topic partition
     * @return metadata keyed by object id, as produced by {@code loadObjectMetadata}
     * @throws NullPointerException if the directory cannot be listed (for example it does not
     *     exist or is not a directory)
     * @throws IllegalArgumentException if the directory contains no tier state file
     * @throws IOException if the tier state file cannot be read
     */
    private static Map<UUID, TierObjectStore.ObjectMetadata> objectMetadata(String logDir) throws IOException {
        File dir = new File(logDir);
        TopicPartition topicPartition = MergedLog.parseTopicPartitionName(dir);
        LOGGER.info("====== TopicPartition " + topicPartition + " ======");
        // listFiles() returns null when the path is not a listable directory; say so explicitly.
        File[] entries = Objects.requireNonNull(dir.listFiles(), "Unable to list files in log directory: " + logDir);
        File tierStateFile = Arrays.stream(entries)
            .filter(f -> f.isFile() && MergedLog.isTierStateFile(f))
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException("No Tier state file found at the log directory"));
        // loadObjectMetadata logs read failures itself, so the exception is simply propagated.
        return ValidateSegments.loadObjectMetadata(topicPartition, tierStateFile);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Reads a tier state file and collects the object metadata of every entry whose state is
     * {@code SEGMENT_UPLOAD_COMPLETE}.
     *
     * @param topicPartition partition the state file belongs to
     * @param tierStateFile tier state file to read
     * @return metadata keyed by object id; empty when the file has no header or no entries
     * @throws IOException if the file cannot be opened or read
     */
    private static Map<UUID, TierObjectStore.ObjectMetadata> loadObjectMetadata(TopicPartition topicPartition, File tierStateFile) throws IOException {
        Map<UUID, TierObjectStore.ObjectMetadata> uploadedSegments = new HashMap<>();
        try (CheckedFileIO channel = CheckedFileIO.open(tierStateFile.toPath(), StandardOpenOption.READ)) {
            Optional<Header> header = FileTierPartitionState.readHeader(channel);
            if (!header.isPresent()) {
                LOGGER.severe("Empty header at the tier state file");
                return uploadedSegments;
            }
            LOGGER.fine("====== Printing header ======\n" + header.get());

            Optional<FileTierPartitionIterator> entries = FileTierPartitionState.iterator(topicPartition, channel);
            if (!entries.isPresent()) {
                LOGGER.warning("Empty tier state file");
                return uploadedSegments;
            }

            FileTierPartitionIterator cursor = entries.get();
            while (cursor.hasNext()) {
                TierObjectMetadata metadata = (TierObjectMetadata) cursor.next();
                // Only segments whose upload completed are candidates for validation.
                if (metadata.state().equals(TierObjectMetadata.State.SEGMENT_UPLOAD_COMPLETE)) {
                    TierObjectStore.ObjectMetadata objectMetadata = new TierObjectStore.ObjectMetadata(
                        metadata.topicIdPartition(),
                        metadata.objectId(),
                        metadata.tierEpoch(),
                        metadata.baseOffset(),
                        metadata.hasAbortedTxns(),
                        metadata.hasProducerState(),
                        metadata.hasEpochState(),
                        metadata.opaqueData());
                    uploadedSegments.put(metadata.objectId(), objectMetadata);
                }
            }
            return uploadedSegments;
        } catch (IOException e) {
            LOGGER.severe("IO Exception while reading tier state file");
            throw e;
        }
    }

    /**
     * Lists the objects stored for each segment and reports the segments for which a required
     * file is missing.
     *
     * <p>Segments with missing files are printed to standard output as pretty-printed JSON. A
     * failure to validate one segment does not stop the run; such segments are counted and
     * reported so that an incomplete pass is never presented as a clean result.
     *
     * @param tierObjectStore object store to list segment files from
     * @param objectMetadataMap metadata of the segments to validate, keyed by object id
     * @throws IOException if the result cannot be serialized to JSON
     */
    private static void getInconsistentSegments(TierObjectStore tierObjectStore, Map<UUID, TierObjectStore.ObjectMetadata> objectMetadataMap) throws IOException {
        LOGGER.info("Total segments in SEGMENT_UPLOAD_COMPLETE state: " + objectMetadataMap.size());
        Response response = new Response();
        int unvalidatedSegments = 0;
        for (Map.Entry<UUID, TierObjectStore.ObjectMetadata> entry : objectMetadataMap.entrySet()) {
            UUID segmentId = entry.getKey();
            try {
                String segmentDirectoryPath = ValidateSegments.getSegmentDirectoryPath(entry.getValue().toPath("", TierObjectStore.FileType.SEGMENT));
                Map<String, List<VersionInformation>> objectMapWithoutVersion = tierObjectStore.listObject(segmentDirectoryPath, false);
                LOGGER.fine("====== Listing files for Segment: " + segmentId + " ======");
                LOGGER.fine(objectMapWithoutVersion.keySet().toString());
                List<String> missingFiles = RestoreUtil.getMissingFiles(objectMapWithoutVersion, REQUIRED_FILE_TYPES);
                if (!missingFiles.isEmpty()) {
                    LOGGER.fine("Found missing files: " + missingFiles);
                    ValidateFtpsSegmentsResponse.SegmentDetail segmentDetail = new ValidateFtpsSegmentsResponse.SegmentDetail(segmentDirectoryPath, segmentId);
                    segmentDetail.getFiles().addAll(missingFiles);
                    response.getMissingFiles().add(segmentDetail);
                }
            } catch (Exception ex) {
                // Best effort: keep validating the remaining segments, but remember the failure.
                // The exception summary is part of the message; the throwable is attached to the
                // record for handlers whose formatter renders it.
                unvalidatedSegments++;
                LOGGER.log(Level.WARNING, "Unable to validate files for segment: " + segmentId + " (" + ex + ")", ex);
            }
        }
        if (unvalidatedSegments > 0) {
            LOGGER.warning("Unable to validate " + unvalidatedSegments + " of " + objectMetadataMap.size() + " segments; the result does not cover them");
        }
        if (response.getMissingFiles().isEmpty()) {
            if (unvalidatedSegments == 0) {
                LOGGER.info("No inconsistent segments found.");
            } else {
                LOGGER.info("No missing files found in the segments that could be validated.");
            }
        } else {
            LOGGER.info("Found " + response.getTotalMissingFiles() + " missing files across " + response.getMissingFiles().size() + " segments in " + Duration.between(startTime, Instant.ofEpochMilli(TIME.milliseconds())).getSeconds() + " seconds\n");
            System.out.println(OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(response.getMissingFiles()));
        }
    }

    /**
     * Returns the directory part of a segment object path, i.e. everything before the last
     * {@code '/'}.
     *
     * @param segmentPath object path of a segment file
     * @return the path without its final component and without a trailing separator
     * @throws IllegalArgumentException if {@code segmentPath} contains no {@code '/'}
     */
    private static String getSegmentDirectoryPath(String segmentPath) {
        int lastSeparator = segmentPath.lastIndexOf('/');
        if (lastSeparator < 0) {
            // Without this guard substring(0, -1) fails with an index error that hides the cause.
            throw new IllegalArgumentException("Segment path has no directory component: " + segmentPath);
        }
        return segmentPath.substring(0, lastSeparator);
    }

    /**
     * Loads the object store properties from a configuration file and creates the object store.
     *
     * @param objectStoreConfigFile path of the properties file to load
     * @return the object store created from the loaded properties
     * @throws IOException if the properties file cannot be loaded
     */
    private static TierObjectStore objectStore(String objectStoreConfigFile) throws IOException {
        List<String> propertyKeys = new ArrayList<>(OBJECT_STORE_REQUIRED_PROPERTIES);
        Properties props;
        try {
            props = Utils.loadProps(objectStoreConfigFile, propertyKeys);
        } catch (IOException e) {
            LOGGER.severe("Can not load object store properties from file: " + objectStoreConfigFile);
            throw e;
        }
        // The GCS prefix is always overwritten with an empty string.
        // NOTE(review): the reason is not visible in this file - confirm before changing.
        props.put(KafkaConfig.TierGcsPrefixProp(), "");
        LOGGER.fine("====== Loaded the following properties to access object store ======");
        props.forEach((key, value) -> LOGGER.fine(key + " -> " + value));
        TierObjectStore store = ValidateSegments.getObjectStore(props);
        LOGGER.fine("Successfully created an instance to object store. Backend: " + store.getBackend().getName());
        return store;
    }

    /**
     * Creates the object store for the backend named in the given properties and records that
     * backend in the static {@code backend} field.
     *
     * @param props object store properties; must contain the tier backend property
     * @return the object store instance supplied by {@code TierObjectStoreFactory}
     * @throws IllegalArgumentException if the backend property is missing or does not name a
     *     known backend
     */
    private static TierObjectStore getObjectStore(Properties props) {
        String backendName = props.getProperty(KafkaConfig.TierBackendProp());
        if (backendName == null) {
            // valueOf(null) would fail with a bare NullPointerException that names no property.
            throw new IllegalArgumentException("Missing required property: " + KafkaConfig.TierBackendProp());
        }
        backend = TierObjectStore.Backend.valueOf(backendName);
        TierObjectStoreConfig config = TierObjectStoreUtils.generateBackendConfig(backend, props);
        return TierObjectStoreFactory.getObjectStoreInstance(TIME, backend, config);
    }

    /**
     * Builds the command-line parser of this tool.
     *
     * <p>Arguments: {@code log.dir} (required), {@code tier.config} (optional, defaults to
     * {@code /mnt/config/kafka.properties}) and {@code logging.level} (optional, defaults to
     * {@code INFO}).
     *
     * @return the configured argument parser
     */
    private static ArgumentParser createArgParser() {
        // Use this tool's own class name so usage and help output name the right program.
        ArgumentParser parser = ArgumentParsers.newArgumentParser(ValidateSegments.class.getName())
            .defaultHelp(true)
            .description(DESCRIPTION);
        parser.addArgument(RecoveryUtils.makeArgument(LOG_DIR))
            .dest(LOG_DIR)
            .type(String.class)
            .required(true)
            .help(LOG_DIR_DOC);
        parser.addArgument(RecoveryUtils.makeArgument("tier.config"))
            .dest("tier.config")
            .type(String.class)
            .required(false)
            .setDefault(DEFAULT_KAFKA_PROPS_FILE)
            .help("The path to a configuration file containing the required properties");
        parser.addArgument(RecoveryUtils.makeArgument(LOGGING_LEVEL))
            .dest(LOGGING_LEVEL)
            .type(String.class)
            .required(false)
            .setDefault("INFO")
            .help(LOGGING_LEVEL_DOC);
        return parser;
    }

    /**
     * Configures the tool's logger to write {@code [LEVEL] message} lines to the console at the
     * given level, detached from parent handlers.
     *
     * <p>Calling this method again replaces the console handler installed earlier instead of
     * adding another one, so repeated runs do not print each record several times. When a record
     * carries a throwable, its stack trace is appended to the formatted line.
     *
     * @param level name or integer value of a {@link Level}, as accepted by {@link Level#parse}
     * @throws IllegalArgumentException if {@code level} is not a valid level
     */
    private static void setupLogger(String level) {
        // Parse once, before touching the logger, so an invalid level leaves it unchanged.
        Level parsedLevel = Level.parse(level);
        ConsoleHandler handler = new ConsoleHandler();
        handler.setFormatter(new SimpleFormatter() {
            private static final String format = "[%1$-7s] %2$s %n";

            @Override
            public synchronized String format(LogRecord lr) {
                String line = String.format(format, lr.getLevel().getLocalizedName(), lr.getMessage());
                Throwable thrown = lr.getThrown();
                if (thrown == null) {
                    return line;
                }
                StringWriter trace = new StringWriter();
                thrown.printStackTrace(new PrintWriter(trace, true));
                return line + trace;
            }
        });
        handler.setLevel(parsedLevel);
        // LOGGER is static: drop console handlers left by an earlier call to avoid duplicates.
        for (Handler existing : LOGGER.getHandlers()) {
            if (existing instanceof ConsoleHandler) {
                LOGGER.removeHandler(existing);
            }
        }
        LOGGER.addHandler(handler);
        LOGGER.setUseParentHandlers(false);
        LOGGER.setLevel(parsedLevel);
    }

    /**
     * Command-line entry point: parses the arguments and runs the validation.
     *
     * <p>Argument errors are reported through the parser; a request for the help screen ends
     * normally, every other argument error is rethrown after being reported.
     *
     * @param args raw command-line arguments
     * @throws Exception if argument parsing fails (other than a help request) or the run fails
     */
    public static void main(String[] args) throws Exception {
        ArgumentParser parser = ValidateSegments.createArgParser();
        try {
            ValidateSegments.run(parser.parseArgs(args));
        } catch (ArgumentParserException e) {
            parser.handleError(e);
            // Showing the help screen is not a failure; anything else is.
            if (!(e instanceof HelpScreenException)) {
                throw e;
            }
        }
    }

    static {
        TIME = Time.SYSTEM;
        REQUIRED_FILE_TYPES = new HashSet<String>(Arrays.asList(TierObjectStore.FileType.SEGMENT.suffix(), TierObjectStore.FileType.OFFSET_INDEX.suffix(), TierObjectStore.FileType.TIMESTAMP_INDEX.suffix(), TierObjectStore.FileType.PRODUCER_STATE.suffix(), TierObjectStore.FileType.EPOCH_STATE.suffix()));
        OBJECT_STORE_REQUIRED_PROPERTIES = Arrays.asList(KafkaConfig.TierMetadataNamespaceProp(), KafkaConfig.TierBackendProp(), KafkaConfig.TierS3RegionProp(), KafkaConfig.TierS3BucketProp(), KafkaConfig.TierS3PrefixProp(), KafkaConfig.TierS3AssumeRoleArnProp(), KafkaConfig.TierS3CredFilePathProp(), KafkaConfig.TierGcsRegionProp(), KafkaConfig.TierGcsBucketProp(), KafkaConfig.TierGcsPrefixProp(), KafkaConfig.TierGcsCredFilePathProp(), KafkaConfig.TierGcsWriteChunkSizeProp(), KafkaConfig.TierAzureBlockBlobContainerProp(), KafkaConfig.TierAzureBlockBlobCredFilePathProp(), KafkaConfig.TierAzureBlockBlobEndpointProp(), KafkaConfig.TierAzureBlockBlobPrefixProp(), KafkaConfig.TierAzureBlockBlobAutoAbortThresholdBytesProp());
    }

    /** Accumulates the segments for which at least one required file is missing. */
    static final class Response {
        /** One entry per segment with missing files, in the order they were added. */
        List<ValidateFtpsSegmentsResponse.SegmentDetail> missingFiles = new ArrayList<>();

        Response() {
        }

        /**
         * @return the live, mutable list of segments with missing files
         */
        public List<ValidateFtpsSegmentsResponse.SegmentDetail> getMissingFiles() {
            return missingFiles;
        }

        /**
         * @return the number of missing files summed over all recorded segments
         */
        public int getTotalMissingFiles() {
            return missingFiles.stream()
                .mapToInt(detail -> detail.getFiles().size())
                .sum();
        }
    }
}

