/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import java.io.IOException;
import java.io.PrintStream;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentAction;
import net.sourceforge.argparse4j.inf.ArgumentGroup;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.WriteTxnMarkerSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.tools.PrintVersionAndExitAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class TransactionsCommand {
    private static final Logger log = LoggerFactory.getLogger(TransactionsCommand.class);
    protected final Time time;

    protected TransactionsCommand(Time time) {
        this.time = time;
    }

    abstract String name();

    abstract void addSubparser(Subparsers var1);

    abstract void execute(ConfluentAdmin var1, Namespace var2, PrintStream var3) throws Exception;

    private static void printErrorAndExit(String message, Throwable t) {
        log.debug(message, t);
        String exitMessage = message + ": " + t.getMessage() + ". Enable debug logging for additional detail.";
        TransactionsCommand.printErrorAndExit(exitMessage);
    }

    private static void printErrorAndExit(String message) {
        System.err.println(message);
        Exit.exit((int)1, (String)message);
    }

    private static ConfluentAdmin buildAdminClient(Namespace ns) {
        Properties properties;
        String configFile = ns.getString("command_config");
        if (configFile == null) {
            properties = new Properties();
        } else {
            try {
                properties = Utils.loadProps((String)configFile);
            }
            catch (IOException e) {
                TransactionsCommand.printErrorAndExit("Failed to load admin client properties", e);
                return null;
            }
        }
        String bootstrapServers = ns.getString("bootstrap_server");
        properties.put("bootstrap.servers", bootstrapServers);
        return (ConfluentAdmin)Admin.create((Properties)properties);
    }

    static ArgumentParser buildBaseParser() {
        ArgumentParser parser = ArgumentParsers.newArgumentParser((String)"kafka-transactions.sh");
        parser.description("This tool is used to analyze the transactional state of producers in the cluster. It can be used to detect and recover from hanging transactions.");
        parser.addArgument(new String[]{"-v", "--version"}).action((ArgumentAction)new PrintVersionAndExitAction()).help("show the version of this Kafka distribution and exit");
        parser.addArgument(new String[]{"--command-config"}).help("property file containing configs to be passed to admin client").action((ArgumentAction)Arguments.store()).type(String.class).metavar(new String[]{"FILE"}).required(false);
        parser.addArgument(new String[]{"--bootstrap-server"}).help("hostname and port for the broker to connect to, in the form `host:port`  (multiple comma-separated entries can be given)").action((ArgumentAction)Arguments.store()).type(String.class).metavar(new String[]{"host:port"}).required(true);
        return parser;
    }

    static void execute(String[] args, Function<Namespace, ConfluentAdmin> adminSupplier, PrintStream out, Time time) throws Exception {
        Namespace ns;
        List<AbortTransactionCommand> commands = Collections.singletonList(new AbortTransactionCommand(time));
        ArgumentParser parser = TransactionsCommand.buildBaseParser();
        Subparsers subparsers = parser.addSubparsers().dest("command").title("commands").metavar("COMMAND");
        commands.forEach(command -> command.addSubparser(subparsers));
        try {
            ns = parser.parseArgs(args);
        }
        catch (ArgumentParserException e) {
            parser.handleError(e);
            Exit.exit((int)1);
            return;
        }
        ConfluentAdmin admin = adminSupplier.apply(ns);
        String commandName = ns.getString("command");
        Optional<TransactionsCommand> commandOpt = commands.stream().filter(cmd -> cmd.name().equals(commandName)).findFirst();
        if (!commandOpt.isPresent()) {
            TransactionsCommand.printErrorAndExit("Unexpected command " + commandName);
        }
        TransactionsCommand command2 = commandOpt.get();
        command2.execute(admin, ns, out);
        Exit.exit((int)0);
    }

    public static void main(String[] args) throws Exception {
        TransactionsCommand.execute(args, TransactionsCommand::buildAdminClient, System.out, Time.SYSTEM);
    }

    static class AbortTransactionCommand
    extends TransactionsCommand {
        AbortTransactionCommand(Time time) {
            super(time);
        }

        @Override
        String name() {
            return "abort";
        }

        @Override
        void addSubparser(Subparsers subparsers) {
            Subparser subparser = subparsers.addParser(this.name()).help("abort a hanging transaction (requires administrative privileges)");
            subparser.addArgument(new String[]{"--topic"}).help("topic name").action((ArgumentAction)Arguments.store()).type(String.class).required(true);
            subparser.addArgument(new String[]{"--partition"}).help("partition number").action((ArgumentAction)Arguments.store()).type(Integer.class).required(true);
            ArgumentGroup olderBrokerArgumentGroup = subparser.addArgumentGroup("Brokers on versions older than CP 7.0 (AK 3.0)").description("For older brokers, you must provide all of these arguments");
            olderBrokerArgumentGroup.addArgument(new String[]{"--producer-id"}).help("producer id").action((ArgumentAction)Arguments.store()).type(Long.class);
            olderBrokerArgumentGroup.addArgument(new String[]{"--producer-epoch"}).help("producer epoch").action((ArgumentAction)Arguments.store()).type(Short.class);
            olderBrokerArgumentGroup.addArgument(new String[]{"--coordinator-epoch"}).help("coordinator epoch").action((ArgumentAction)Arguments.store()).type(Integer.class);
        }

        private void abortTransaction(ConfluentAdmin admin, TopicPartition topicPartition, WriteTxnMarkerSpec txnMarkerSpec) throws Exception {
            try {
                admin.writeTransactionMarkers(txnMarkerSpec, Collections.singleton(topicPartition)).all().get();
            }
            catch (ExecutionException e) {
                TransactionsCommand.printErrorAndExit("Failed to abort transaction " + txnMarkerSpec, e.getCause());
            }
        }

        @Override
        void execute(ConfluentAdmin admin, Namespace ns, PrintStream out) throws Exception {
            String topicName = ns.getString("topic");
            Integer partitionId = ns.getInt("partition");
            TopicPartition topicPartition = new TopicPartition(topicName, partitionId.intValue());
            Long producerId = ns.getLong("producer_id");
            if (producerId == null) {
                TransactionsCommand.printErrorAndExit("Missing required argument --producer-id");
                return;
            }
            Short producerEpoch = ns.getShort("producer_epoch");
            if (producerEpoch == null) {
                TransactionsCommand.printErrorAndExit("Missing required argument --producer-epoch");
                return;
            }
            Integer coordinatorEpoch = ns.getInt("coordinator_epoch");
            if (coordinatorEpoch == null) {
                TransactionsCommand.printErrorAndExit("Missing required argument --coordinator-epoch");
                return;
            }
            if (coordinatorEpoch < 0) {
                coordinatorEpoch = 0;
            }
            WriteTxnMarkerSpec abortSpec = new WriteTxnMarkerSpec(producerId.longValue(), producerEpoch.shortValue(), coordinatorEpoch.intValue(), TransactionResult.ABORT);
            this.abortTransaction(admin, topicPartition, abortSpec);
        }
    }
}

