package io.confluent.ksql.rest.healthcheck;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.rest.entity.HealthCheckResponse;
import io.confluent.ksql.rest.entity.HealthCheckResponseDetail;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.ServerUtil;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/healthcheck/HealthCheckAgent.class */
public class HealthCheckAgent {
    private static final Logger log = LoggerFactory.getLogger(HealthCheckAgent.class);
    public static final String METASTORE_CHECK_NAME = "metastore";
    public static final String KAFKA_CHECK_NAME = "kafka";
    public static final String COMMAND_RUNNER_CHECK_NAME = "commandRunner";
    private static final List<Check> DEFAULT_CHECKS = ImmutableList.of(new ExecuteStatementCheck(METASTORE_CHECK_NAME, "list streams; list tables; list queries;"), new KafkaBrokerCheck(KAFKA_CHECK_NAME), new CommandRunnerCheck(COMMAND_RUNNER_CHECK_NAME));
    private final SimpleKsqlClient ksqlClient;
    private final URI serverEndpoint;
    private final KsqlConfig ksqlConfig;
    private final CommandRunner commandRunner;
    private final Admin adminClient;

    /* loaded from: input_file:io/confluent/ksql/rest/healthcheck/HealthCheckAgent$Check.class */
    private interface Check {
        String getName();

        HealthCheckResponseDetail check(HealthCheckAgent healthCheckAgent);
    }

    /* loaded from: input_file:io/confluent/ksql/rest/healthcheck/HealthCheckAgent$CommandRunnerCheck.class */
    private static class CommandRunnerCheck implements Check {
        private final String name;

        CommandRunnerCheck(String str) {
            this.name = (String) Objects.requireNonNull(str, "name");
        }

        @Override // io.confluent.ksql.rest.healthcheck.HealthCheckAgent.Check
        public String getName() {
            return this.name;
        }

        @Override // io.confluent.ksql.rest.healthcheck.HealthCheckAgent.Check
        public HealthCheckResponseDetail check(HealthCheckAgent healthCheckAgent) {
            return new HealthCheckResponseDetail(healthCheckAgent.commandRunner.checkCommandRunnerStatus() == CommandRunner.CommandRunnerStatus.RUNNING);
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/healthcheck/HealthCheckAgent$ExecuteStatementCheck.class */
    private static class ExecuteStatementCheck implements Check {
        private final String name;
        private final String ksqlStatement;

        ExecuteStatementCheck(String str, String str2) {
            this.name = (String) Objects.requireNonNull(str, "name");
            this.ksqlStatement = (String) Objects.requireNonNull(str2, "ksqlStatement");
        }

        @Override // io.confluent.ksql.rest.healthcheck.HealthCheckAgent.Check
        public String getName() {
            return this.name;
        }

        @Override // io.confluent.ksql.rest.healthcheck.HealthCheckAgent.Check
        public HealthCheckResponseDetail check(HealthCheckAgent healthCheckAgent) {
            return new HealthCheckResponseDetail(healthCheckAgent.ksqlClient.makeKsqlRequest(healthCheckAgent.serverEndpoint, this.ksqlStatement, ImmutableMap.of("request.ksql.internal.request", true)).isSuccessful());
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/healthcheck/HealthCheckAgent$KafkaBrokerCheck.class */
    private static class KafkaBrokerCheck implements Check {
        private static final int DESCRIBE_TOPICS_TIMEOUT_MS = 30000;
        private final String name;

        KafkaBrokerCheck(String str) {
            this.name = (String) Objects.requireNonNull(str, "name");
        }

        @Override // io.confluent.ksql.rest.healthcheck.HealthCheckAgent.Check
        public String getName() {
            return this.name;
        }

        @Override // io.confluent.ksql.rest.healthcheck.HealthCheckAgent.Check
        @SuppressFBWarnings({"RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"})
        public HealthCheckResponseDetail check(HealthCheckAgent healthCheckAgent) {
            boolean z;
            String commandTopic = ReservedInternalTopics.commandTopic(healthCheckAgent.ksqlConfig);
            try {
                HealthCheckAgent.log.info("Checking ksql's ability to contact broker");
                healthCheckAgent.adminClient.describeTopics(Collections.singletonList(commandTopic), new DescribeTopicsOptions().timeoutMs(Integer.valueOf(DESCRIBE_TOPICS_TIMEOUT_MS))).allTopicNames().get();
                z = true;
            } catch (Exception e) {
                HealthCheckAgent.log.error("Error describing command topic during health check", e);
                z = (e instanceof UnknownTopicOrPartitionException) || (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException);
            } catch (KsqlTopicAuthorizationException e2) {
                HealthCheckAgent.log.info("ksqlDB denied access to describe cmd topic. This is considered healthy");
                z = true;
            }
            return new HealthCheckResponseDetail(z);
        }
    }

    public HealthCheckAgent(SimpleKsqlClient simpleKsqlClient, KsqlRestConfig ksqlRestConfig, KsqlConfig ksqlConfig, CommandRunner commandRunner, Admin admin) {
        this.ksqlClient = (SimpleKsqlClient) Objects.requireNonNull(simpleKsqlClient, "ksqlClient");
        this.serverEndpoint = ServerUtil.getServerAddress(ksqlRestConfig);
        this.ksqlConfig = (KsqlConfig) Objects.requireNonNull(ksqlConfig, "ksqlConfig");
        this.commandRunner = (CommandRunner) Objects.requireNonNull(commandRunner, COMMAND_RUNNER_CHECK_NAME);
        this.adminClient = (Admin) Objects.requireNonNull(admin, "adminClient");
    }

    public HealthCheckResponse checkHealth() {
        Map map = (Map) DEFAULT_CHECKS.stream().collect(Collectors.toMap((v0) -> {
            return v0.getName();
        }, check -> {
            return check.check(this);
        }));
        return new HealthCheckResponse(map.values().stream().allMatch((v0) -> {
            return v0.getIsHealthy();
        }), map);
    }
}
