package io.confluent.kafka.databalancing;

import io.airlift.airline.Cli;
import io.airlift.airline.Command;
import io.airlift.airline.Help;
import io.airlift.airline.Option;
import io.airlift.airline.ParseException;
import io.confluent.kafka.databalancing.exception.NoRebalanceInProgressException;
import io.confluent.kafka.databalancing.exception.RebalanceInProgressException;
import io.confluent.kafka.databalancing.exception.ValidationException;
import io.confluent.kafka.databalancing.license.DefaultLicenseValidator;
import io.confluent.kafka.databalancing.license.LicenseValidator;
import io.confluent.kafka.databalancing.topology.TopologyUtils;
import io.confluent.kafka.databalancing.view.DefaultRebalancerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import kafka.admin.AdminOperationException;
import kafka.common.AdminCommandFailedException;
import org.apache.kafka.common.utils.SystemTime;
import scala.Console;

/* loaded from: input_file:io/confluent/kafka/databalancing/ConfluentRebalancerCommand.class */
public class ConfluentRebalancerCommand {

    /* loaded from: input_file:io/confluent/kafka/databalancing/ConfluentRebalancerCommand$BaseReassignCommand.class */
    static abstract class BaseReassignCommand extends BaseRebalanceCommand {

        @Option(name = {"--metrics-bootstrap-server"}, required = true, description = "A list of host:port pairs to use for establishing the initial connection to the metrics Kafka cluster")
        protected String metricBootstrapServers;

        @Option(name = {"--remove-broker-ids"}, description = "Partitions will be moved away from the specified brokers (comma-separated). This option can be used to decommission brokers.")
        private String removeBrokerIds;

        @Option(name = {"--min-free-volume-space-percentage"}, description = "The log.dir volume will have at least the specified percentage of free space during and after the rebalance. For example, if the total volume space is 100 GB and this config is defined as 20, the rebalancer will use up to 80 GB during the rebalance .This is only supported in Confluent 3.2 (for both rebalancer and brokers) and if every broker in the cluster has a single log.dir. This is enabled by default if supported and disabled otherwise.")
        private String minFreeVolumeSpacePercentage;

        BaseReassignCommand() {
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        protected RebalancerConfig rebalancerConfig() {
            try {
                Properties properties = this.configFile == null ? new Properties() : org.apache.kafka.common.utils.Utils.loadProps(this.configFile);
                properties.put(RebalancerConfig.METRICS_BOOTSTRAP_SERVERS_CONFIG, this.metricBootstrapServers);
                if (this.minFreeVolumeSpacePercentage != null) {
                    properties.put(RebalancerConfig.MIN_FREE_VOLUME_SPACE_PERCENTAGE_CONFIG, this.minFreeVolumeSpacePercentage);
                }
                return new RebalancerConfig(properties);
            } catch (IOException e) {
                throw new ValidationException("Could not load configuration properties: " + e.getMessage());
            }
        }

        protected List<Integer> removeBrokerIds() {
            if (this.removeBrokerIds == null) {
                return Collections.emptyList();
            }
            String[] split = this.removeBrokerIds.split(",");
            if (split.length == 0) {
                throw removeBrokerIdsException();
            }
            ArrayList arrayList = new ArrayList();
            for (String str : split) {
                try {
                    arrayList.add(Integer.valueOf(Integer.parseInt(str.trim())));
                } catch (NumberFormatException e) {
                    throw removeBrokerIdsException();
                }
            }
            return arrayList;
        }

        private ValidationException removeBrokerIdsException() {
            return new ValidationException("Expected a comma-separated list of numeric broker ids for option '--remove-broker-ids', but received " + this.removeBrokerIds);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/kafka/databalancing/ConfluentRebalancerCommand$BaseRebalanceCommand.class */
    public static abstract class BaseRebalanceCommand implements Runnable, Closeable {

        @Option(name = {"--zookeeper"}, required = true, description = "The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.")
        protected String zkConnect;

        @Option(name = {"--config-file"}, description = "Configuration file in properties format")
        protected String configFile;
        protected RebalancerConfig rebalancerConfig;
        private Rebalancer rebalancer;
        private RebalancerFactory rebalancerFactory;
        private LicenseValidator licenseValidator;

        BaseRebalanceCommand() {
        }

        protected Rebalancer getOrCreateRebalancer() {
            if (this.rebalancer == null) {
                if (this.rebalancerFactory == null) {
                    throw new IllegalStateException("rebalancerFactory has not been set");
                }
                this.rebalancer = this.rebalancerFactory.create(this.zkConnect);
            }
            return this.rebalancer;
        }

        public void setLicenseValidator(LicenseValidator licenseValidator) {
            this.licenseValidator = licenseValidator;
        }

        public void setRebalancerFactory(RebalancerFactory rebalancerFactory) {
            this.rebalancerFactory = rebalancerFactory;
        }

        @Override // java.lang.Runnable
        public final void run() {
            this.rebalancerConfig = rebalancerConfig();
            try {
                if (!this.rebalancerConfig.isTrial()) {
                    this.licenseValidator.validateLicense(this.rebalancerConfig.licenseString());
                } else if (this.licenseValidator.validateTrialPeriod(this.zkConnect) == 0) {
                    throw new ValidationException("Your trial period has expired. Please purchase a license to continue using this product.");
                }
                doRun();
            } catch (LicenseValidator.ValidationFailedException e) {
                throw new ValidationException("Could not validate your license: " + e.getMessage());
            }
        }

        protected abstract void doRun();

        protected RebalancerConfig rebalancerConfig() {
            try {
                return new RebalancerConfig(this.configFile == null ? new Properties() : org.apache.kafka.common.utils.Utils.loadProps(this.configFile));
            } catch (IOException e) {
                throw new ValidationException("Could not load configuration properties: " + e.getMessage());
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (this.rebalancer != null) {
                this.rebalancer.close();
            }
        }
    }

    @Command(name = "execute", description = "Kick off a cluster-wide rebalance operation")
    /* loaded from: input_file:io/confluent/kafka/databalancing/ConfluentRebalancerCommand$Execute.class */
    public static class Execute extends BaseReassignCommand {

        @Option(name = {"--throttle"}, required = true, description = "The maximum bandwidth, in bytes per second, allocated to moving replicas.")
        protected long replicationQuota;

        @Option(name = {"--force"}, description = "Suppress prompts if true")
        protected boolean force;

        @Option(name = {"--verbose"}, description = "Also output before/after per broker information (disk space usage, leader count, replica count, topic partitions)")
        private boolean verbose = false;

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        protected void doRun() {
            Rebalancer orCreateRebalancer = getOrCreateRebalancer();
            if (orCreateRebalancer.maybeUpdateReplicationQuota(orCreateRebalancer.status().inProgressAssignment(), this.replicationQuota)) {
                return;
            }
            System.out.println("Computing the rebalance plan (this may take a while) ...");
            ProposedRebalance proposeRebalance = orCreateRebalancer.proposeRebalance(removeBrokerIds(), rebalancerConfig());
            if (!proposeRebalance.shouldRebalance()) {
                if (orCreateRebalancer.disengageThrottle()) {
                    System.out.println("The throttle was removed.");
                }
                System.out.println("The cluster is already balanced, exiting.");
                return;
            }
            proposeRebalance.report().print(System.out, this.verbose);
            if (this.force) {
                startRebalance(orCreateRebalancer, proposeRebalance);
                return;
            }
            Boolean bool = null;
            do {
                System.out.print("Would you like to continue? (y/n): ");
                String readLine = Console.readLine();
                if (readLine != null) {
                    if (readLine.equalsIgnoreCase("y")) {
                        bool = true;
                    } else if (readLine.equalsIgnoreCase("n")) {
                        bool = false;
                    }
                }
            } while (bool == null);
            System.out.println();
            if (bool.booleanValue()) {
                startRebalance(orCreateRebalancer, proposeRebalance);
            } else {
                System.out.println("OK, exiting");
            }
        }

        private void startRebalance(Rebalancer rebalancer, ProposedRebalance proposedRebalance) {
            rebalancer.startRebalance(proposedRebalance.currentAssignment(), proposedRebalance.proposedAssignment(), this.replicationQuota);
            System.out.println("The rebalance has been started, run `status` to check progress.");
            System.out.println(String.format("%nWarning: You must run the `status` or `finish` command periodically, until the rebalance completes, to ensure the throttle is removed. You can also alter the throttle by re-running the execute command passing a new value.%n", new Object[0]));
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand, java.io.Closeable, java.lang.AutoCloseable
        public /* bridge */ /* synthetic */ void close() {
            super.close();
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        public /* bridge */ /* synthetic */ void setRebalancerFactory(RebalancerFactory rebalancerFactory) {
            super.setRebalancerFactory(rebalancerFactory);
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        public /* bridge */ /* synthetic */ void setLicenseValidator(LicenseValidator licenseValidator) {
            super.setLicenseValidator(licenseValidator);
        }
    }

    @Command(name = "finish", description = "Performs clean-up activities (like disabling replication throttling) as the rebalance completes")
    /* loaded from: input_file:io/confluent/kafka/databalancing/ConfluentRebalancerCommand$Finish.class */
    public static class Finish extends BaseRebalanceCommand {
        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        protected void doRun() {
            RebalanceStatus status = getOrCreateRebalancer().status();
            if (!status.inProgressAssignment().isEmpty()) {
                throw new RebalanceInProgressException(status.inProgressAssignment().size() + " partitions are being rebalanced");
            }
            System.out.println("The rebalance has completed and throttling has been disabled");
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand, java.io.Closeable, java.lang.AutoCloseable
        public /* bridge */ /* synthetic */ void close() {
            super.close();
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        public /* bridge */ /* synthetic */ void setRebalancerFactory(RebalancerFactory rebalancerFactory) {
            super.setRebalancerFactory(rebalancerFactory);
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        public /* bridge */ /* synthetic */ void setLicenseValidator(LicenseValidator licenseValidator) {
            super.setLicenseValidator(licenseValidator);
        }
    }

    @Command(name = "proposed-assignment", description = "Generate the proposed partition assignment and output it in JSON format")
    /* loaded from: input_file:io/confluent/kafka/databalancing/ConfluentRebalancerCommand$ProposedAssignment.class */
    public static class ProposedAssignment extends BaseReassignCommand {
        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        protected void doRun() {
            System.out.println(getOrCreateRebalancer().proposeRebalance(removeBrokerIds(), rebalancerConfig()).proposedAssignment().toJson());
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand, java.io.Closeable, java.lang.AutoCloseable
        public /* bridge */ /* synthetic */ void close() {
            super.close();
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        public /* bridge */ /* synthetic */ void setRebalancerFactory(RebalancerFactory rebalancerFactory) {
            super.setRebalancerFactory(rebalancerFactory);
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        public /* bridge */ /* synthetic */ void setLicenseValidator(LicenseValidator licenseValidator) {
            super.setLicenseValidator(licenseValidator);
        }
    }

    @Command(name = "status", description = "Show the status of the current rebalance operation (if there is one)")
    /* loaded from: input_file:io/confluent/kafka/databalancing/ConfluentRebalancerCommand$Status.class */
    public static class Status extends BaseRebalanceCommand {
        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        protected void doRun() {
            RebalanceStatus status = getOrCreateRebalancer().status();
            if (status.inProgressAssignment().isEmpty()) {
                throw new NoRebalanceInProgressException("No rebalance is currently in progress. If you have called `status` after a rebalance was started successfully, the rebalance has completed. Run the `execute` command to check if the cluster is balanced.");
            }
            System.out.println("Partitions being rebalanced:");
            Iterator<String> it = TopologyUtils.partitionsByTopicToLines(status.inProgressAssignment().topicPartitions()).iterator();
            while (it.hasNext()) {
                System.out.println("\t" + it.next());
            }
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand, java.io.Closeable, java.lang.AutoCloseable
        public /* bridge */ /* synthetic */ void close() {
            super.close();
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        public /* bridge */ /* synthetic */ void setRebalancerFactory(RebalancerFactory rebalancerFactory) {
            super.setRebalancerFactory(rebalancerFactory);
        }

        @Override // io.confluent.kafka.databalancing.ConfluentRebalancerCommand.BaseRebalanceCommand
        public /* bridge */ /* synthetic */ void setLicenseValidator(LicenseValidator licenseValidator) {
            super.setLicenseValidator(licenseValidator);
        }
    }

    public static void main(String... strArr) {
        try {
            run(new DefaultRebalancerFactory(), new DefaultLicenseValidator(new SystemTime()), strArr);
        } catch (ParseException | ValidationException | AdminOperationException | AdminCommandFailedException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        } catch (Throwable th) {
            System.err.println("Unexpected exception: " + th.getMessage());
            System.err.println(org.apache.kafka.common.utils.Utils.stackTrace(th));
            System.exit(1);
        }
    }

    public static void run(RebalancerFactory rebalancerFactory, LicenseValidator licenseValidator, String... strArr) {
        Runnable runnable = (Runnable) Cli.builder("confluent-rebalancer").withDescription("Confluent Rebalancer Tool for Kafka").withDefaultCommand(Help.class).withCommands(Help.class, new Class[]{ProposedAssignment.class, Execute.class, Status.class, Finish.class}).build().parse(strArr);
        if (!(runnable instanceof BaseRebalanceCommand)) {
            runnable.run();
            return;
        }
        BaseRebalanceCommand baseRebalanceCommand = (BaseRebalanceCommand) runnable;
        baseRebalanceCommand.setRebalancerFactory(rebalancerFactory);
        baseRebalanceCommand.setLicenseValidator(licenseValidator);
        try {
            baseRebalanceCommand.run();
            baseRebalanceCommand.close();
        } catch (Throwable th) {
            baseRebalanceCommand.close();
            throw th;
        }
    }
}
