/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.tools;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.tier.tools.RecoveryUtils;
import kafka.tier.tools.TierMetadataRecoveryConstants;
import kafka.tier.tools.TierMetadataRecoveryOrchestrator;
import kafka.tier.tools.TierMetadataRecoveryUtils;
import kafka.tier.tools.commands.RewindTierTopicConsumerCommandResponse;
import kafka.tier.topic.TierTopicConsumerRewindPolicy;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentAction;
import net.sourceforge.argparse4j.inf.ArgumentType;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;

/**
 * CLI sub-command ({@value #COMMAND}) that asks one or more brokers to rewind their tier topic
 * consumer, either to explicit per-partition positions read from a file or to the earliest
 * offset as determined by the broker.
 *
 * <p>The class only holds static entry points: {@link #addCommand(Subparsers)} registers the
 * arguments, {@link #execute(Namespace)} runs the command against the parsed arguments.
 */
public final class TierTopicConsumerRewindCommand {
    static final String COMMAND = "tier-topic-consumer-rewind";
    private static final String REWIND_FORCE_STOP = "force-stop";
    private static final String REWIND_FORCE_STOP_DOC = "Force stops the tier topic consumer prior to running the rewind operation";
    private static final String REWIND_POLICY = "rewind-policy";
    private static final String REWIND_POLICY_DOC = "The value can be one of the following (default: FAIL_ON_MISSING_PARTITIONS):\n - FAIL_ON_MISSING_PARTITIONS: the broker will fail the entire rewind operation if any tier topic partition present in the rewind request is absent in the broker state. This is the default value.\n - SKIP_MISSING_PARTITIONS: the broker will skip the rewind operation only for those tier topic partitions from the rewind request that are absent in the broker state. The rewind operation will be allowed to succeed partially for other tier topic partitions. It is rare that you will need to use this option, so please use it only when needed.";
    private static final String RECONCILED_DATA_LOSS_VALIDATOR_OUTPUT = "reconciled-data-loss-validator-output";
    private static final String RECONCILED_DATA_LOSS_VALIDATOR_OUTPUT_DOC = "Name of the file that contains the reconciled output of data loss validator.";
    private static final String PARTITION_TO_POSITION_FILE = "partition-to-position-file";
    private static final String PARTITION_TO_POSITION_FILE_DOC = "Name of the file that contains the mapping of topic partitions to offsets and epochs. If not provided, then --rewind-to-earliest-offset must be provided";
    private static final String REWIND_TO_EARLIEST_OFFSET = "rewind-to-earliest-offset";
    private static final String REWIND_TO_EARLIEST_OFFSET_DOC = "If provided, will automatically rewind all partitions to earliest offset for each partition as determined by the broker.";

    /**
     * Registers the {@value #COMMAND} sub-command and its arguments on the given sub-parser set.
     *
     * <p>Two required mutually exclusive groups are declared: the run scope
     * ({@code broker-ids} or {@code all-brokers}) and the rewind type (earliest offset,
     * partition-to-position file, or reconciled data loss validator output).
     *
     * @param subparsers the sub-parser container the command is added to
     */
    static void addCommand(Subparsers subparsers) {
        Subparser tierTopicConsumerRewind = subparsers.addParser(COMMAND).help("Rewind the tier topic consumer according to the provided partition to position map");
        // Run scope: exactly one of broker-ids / all-brokers must be supplied.
        MutuallyExclusiveGroup runScopeGroup = tierTopicConsumerRewind.addMutuallyExclusiveGroup().required(true);
        runScopeGroup.addArgument(new String[]{RecoveryUtils.makeArgument("broker-ids")}).dest("broker-ids").type(String.class).action((ArgumentAction)Arguments.store()).help("Runs the operation only on the specified brokers");
        runScopeGroup.addArgument(new String[]{RecoveryUtils.makeArgument("all-brokers")}).dest("all-brokers").action((ArgumentAction)Arguments.storeTrue()).help("Runs the operation for all brokers in the cluster");
        tierTopicConsumerRewind.addArgument(new String[]{RecoveryUtils.makeArgument("rest-server-port-override")}).dest("rest-server-port-override").type(Integer.class).action((ArgumentAction)Arguments.store()).setDefault((Object)TierMetadataRecoveryConstants.DEFAULT_REST_SERVER_PORT_OVERRIDE).help(TierMetadataRecoveryConstants.REST_SERVER_PORT_OVERRIDE_DOC);
        tierTopicConsumerRewind.addArgument(new String[]{RecoveryUtils.makeArgument(REWIND_FORCE_STOP)}).dest(REWIND_FORCE_STOP).action((ArgumentAction)Arguments.storeTrue()).help(REWIND_FORCE_STOP_DOC);
        tierTopicConsumerRewind.addArgument(new String[]{RecoveryUtils.makeArgument(REWIND_POLICY)}).dest(REWIND_POLICY).action((ArgumentAction)Arguments.store()).type(String.class).setDefault((Object)TierTopicConsumerRewindPolicy.FAIL_ON_MISSING_PARTITIONS.toString()).help(REWIND_POLICY_DOC);
        // Rewind type: exactly one source of target positions must be supplied.
        MutuallyExclusiveGroup rewindTypeGroup = tierTopicConsumerRewind.addMutuallyExclusiveGroup().required(true);
        rewindTypeGroup.addArgument(new String[]{RecoveryUtils.makeArgument(REWIND_TO_EARLIEST_OFFSET)}).dest(REWIND_TO_EARLIEST_OFFSET).action((ArgumentAction)Arguments.storeTrue()).help(REWIND_TO_EARLIEST_OFFSET_DOC);
        rewindTypeGroup.addArgument(new String[]{RecoveryUtils.makeArgument(PARTITION_TO_POSITION_FILE)}).dest(PARTITION_TO_POSITION_FILE).type((ArgumentType)Arguments.fileType().verifyCanRead()).action((ArgumentAction)Arguments.store()).help(PARTITION_TO_POSITION_FILE_DOC);
        rewindTypeGroup.addArgument(new String[]{RecoveryUtils.makeArgument(RECONCILED_DATA_LOSS_VALIDATOR_OUTPUT)}).dest(RECONCILED_DATA_LOSS_VALIDATOR_OUTPUT).action((ArgumentAction)Arguments.store()).type((ArgumentType)Arguments.fileType().verifyCanRead()).help(RECONCILED_DATA_LOSS_VALIDATOR_OUTPUT_DOC);
    }

    /**
     * Parses a partition-to-position file.
     *
     * <p>Each non-blank row must contain exactly three whitespace-separated integers:
     * {@code <tier topic partition> <offset> <epoch>}. A negative epoch is mapped to an empty
     * {@link Optional}. Blank rows are ignored. If a partition appears on more than one row,
     * the last row wins.
     *
     * @param partitionToPositionFilePath path of the file to read
     * @return map from tier topic partition to a single-entry map of offset to optional epoch
     * @throws IOException if the file cannot be opened or read
     * @throws IllegalArgumentException if a row does not have exactly three fields
     * @throws NumberFormatException if a field of a row is not a valid integer
     */
    static Map<Integer, Map<Long, Optional<Integer>>> parsePartitionToPositionFile(String partitionToPositionFilePath) throws IOException {
        Map<Integer, Map<Long, Optional<Integer>>> partitionToPosition = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(partitionToPositionFilePath))) {
            String line;
            int lineNumber = 0;
            while ((line = reader.readLine()) != null) {
                lineNumber++;
                String row = line.trim();
                if (row.isEmpty()) {
                    // Tolerate blank rows (e.g. a trailing newline) instead of failing the whole file.
                    continue;
                }
                // Split on any run of whitespace so tabs or repeated spaces are accepted.
                String[] entry = row.split("\\s+");
                if (entry.length != 3) {
                    throw new IllegalArgumentException(String.format("Expected tier topic partition, offset, and epoch to be provided per row. Found %d field(s) at line %d: '%s'", entry.length, lineNumber, row));
                }
                int tierTopicPartition;
                long offset;
                int epoch;
                try {
                    tierTopicPartition = Integer.parseInt(entry[0]);
                    offset = Long.parseLong(entry[1]);
                    epoch = Integer.parseInt(entry[2]);
                } catch (NumberFormatException e) {
                    // Keep the exception type but add the row location so the operator can fix the file.
                    NumberFormatException withContext = new NumberFormatException(String.format("Invalid number at line %d ('%s'): %s", lineNumber, row, e.getMessage()));
                    withContext.initCause(e);
                    throw withContext;
                }
                Optional<Integer> epochOpt = epoch < 0 ? Optional.<Integer>empty() : Optional.of(epoch);
                partitionToPosition.put(tierTopicPartition, Collections.singletonMap(offset, epochOpt));
            }
        }
        return partitionToPosition;
    }

    /**
     * Prints a human-readable summary of the rewind outcome to standard output: the brokers on
     * which the rewind succeeded, followed by a numbered list of failed brokers with the
     * skipped partitions reported per log directory. Nothing is printed for an empty section.
     *
     * @param response aggregated rewind response to report
     */
    static void printRewindReport(RewindTierTopicConsumerCommandResponse response) {
        StringBuilder formattedRewindResponse = new StringBuilder();
        if (!response.getSuccess().isEmpty()) {
            formattedRewindResponse.append(String.format("Rewind tier topic consumer succeeded on the following brokers: %s\n", response.getSuccess().stream().map(String::valueOf).collect(Collectors.joining(", "))));
        }
        if (!response.getFailed().isEmpty()) {
            formattedRewindResponse.append("Rewind tier topic consumer failed on the following brokers:\n");
            int failedIndex = 1;
            for (RewindTierTopicConsumerCommandResponse.FailedBrokerDetail broker : response.getFailed()) {
                StringBuilder skippedPartitions = new StringBuilder();
                for (Map.Entry<String, Set<Integer>> entry : broker.skippedPartitions().entrySet()) {
                    skippedPartitions.append(String.format("\t- logDir = %s, skippedPartitions = %s\n", entry.getKey(), entry.getValue().stream().map(String::valueOf).collect(Collectors.joining(", "))));
                }
                formattedRewindResponse.append(String.format("%d. Broker %d, Skipped Partitions:\n%s", failedIndex++, broker.broker(), skippedPartitions));
            }
        }
        System.out.print(formattedRewindResponse);
    }

    /**
     * Runs the rewind command for the parsed arguments.
     *
     * <p>The target positions are taken from, in order of precedence, the reconciled data loss
     * validator output, the partition-to-position file, or left empty when rewinding to the
     * earliest offset. The rewind is then issued either for the whole cluster or for each
     * broker returned by {@code TierMetadataRecoveryUtils.getBrokerList}, and a report is
     * printed.
     *
     * @param namespace parsed command-line arguments
     * @return {@code 0} if no broker reported a failure, {@code 1} otherwise
     * @throws IllegalArgumentException if the rewind policy is invalid or no rewind type was given
     * @throws IOException if the partition-to-position file cannot be read
     * @throws ExecutionException declared for the orchestrator and helper calls
     * @throws InterruptedException declared for the orchestrator and helper calls
     */
    static int execute(Namespace namespace) throws ExecutionException, InterruptedException, IOException {
        // Validate and load all local inputs first so that a bad argument fails fast,
        // before the orchestrator is created.
        boolean forceStop = namespace.getBoolean(REWIND_FORCE_STOP);
        TierTopicConsumerRewindPolicy rewindPolicy = parseRewindPolicy(namespace.getString(REWIND_POLICY));
        String reconciledDataLossValidatorOutput = namespace.getString(RECONCILED_DATA_LOSS_VALIDATOR_OUTPUT);
        String partitionToPositionFile = namespace.getString(PARTITION_TO_POSITION_FILE);
        boolean rewindToEarliestOffset = namespace.getBoolean(REWIND_TO_EARLIEST_OFFSET);

        Optional<Map<Integer, Map<Long, Optional<Integer>>>> partitionToPositionOpt;
        if (reconciledDataLossValidatorOutput != null) {
            partitionToPositionOpt = Optional.of(TierMetadataRecoveryUtils.getPartitionToPositionFromReconciledDLVOutput(reconciledDataLossValidatorOutput));
        } else if (partitionToPositionFile != null) {
            partitionToPositionOpt = Optional.of(TierTopicConsumerRewindCommand.parsePartitionToPositionFile(partitionToPositionFile));
        } else if (rewindToEarliestOffset) {
            // No explicit positions: the broker determines the earliest offset per partition.
            partitionToPositionOpt = Optional.empty();
        } else {
            throw new IllegalArgumentException(String.format("Either --%s, --%s, or --%s must be provided", RECONCILED_DATA_LOSS_VALIDATOR_OUTPUT, PARTITION_TO_POSITION_FILE, REWIND_TO_EARLIEST_OFFSET));
        }

        String bootstrapServers = namespace.getString("bootstrap-servers");
        String adminConfig = namespace.getString("admin.config");
        Integer restServerPort = namespace.getInt("rest-server-port-override");
        TierMetadataRecoveryOrchestrator orchestrator = TierMetadataRecoveryUtils.getTierMetadataRecoveryOrchestrator(adminConfig, bootstrapServers, restServerPort);

        RewindTierTopicConsumerCommandResponse response;
        if (namespace.getBoolean("all-brokers").booleanValue()) {
            response = orchestrator.rewindTierTopicConsumerForCluster(partitionToPositionOpt, forceStop, rewindPolicy);
        } else {
            // Aggregate the per-broker responses into a single report.
            response = new RewindTierTopicConsumerCommandResponse();
            Set<Integer> brokers = TierMetadataRecoveryUtils.getBrokerList(namespace);
            for (Integer broker : brokers) {
                RewindTierTopicConsumerCommandResponse brokerRes = orchestrator.rewindTierTopicConsumerForBroker(partitionToPositionOpt, forceStop, rewindPolicy, broker);
                response.getSuccess().addAll(brokerRes.getSuccess());
                response.getFailed().addAll(brokerRes.getFailed());
            }
        }
        TierTopicConsumerRewindCommand.printRewindReport(response);
        return response.getFailed().isEmpty() ? 0 : 1;
    }

    /**
     * Converts the raw {@code --rewind-policy} value into a policy, replacing the generic
     * {@code valueOf} failure with a message that names the offending option and value.
     */
    private static TierTopicConsumerRewindPolicy parseRewindPolicy(String rawPolicy) {
        try {
            return TierTopicConsumerRewindPolicy.valueOf(rawPolicy);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(String.format("Invalid value '%s' for --%s; see the help text of --%s for the accepted values", rawPolicy, REWIND_POLICY, REWIND_POLICY), e);
        }
    }
}

