package io.confluent.ksql.security;

import io.confluent.ksql.services.KafkaClusterUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlServerException;
import java.util.Optional;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/security/KsqlAuthorizationValidatorFactory.class */
public final class KsqlAuthorizationValidatorFactory {
    private static final Logger LOG = LoggerFactory.getLogger(KsqlAuthorizationValidatorFactory.class);
    private static final String KAFKA_AUTHORIZER_CLASS_NAME = "authorizer.class.name";

    private KsqlAuthorizationValidatorFactory() {
    }

    public static Optional<KsqlAuthorizationValidator> create(KsqlConfig ksqlConfig, ServiceContext serviceContext) {
        String string = ksqlConfig.getString("ksql.access.validator.enable");
        if (string.equals("on")) {
            LOG.info("Forcing topic access validator");
            return Optional.of(createAuthorizationValidator(ksqlConfig));
        }
        if (string.equals("off")) {
            return Optional.empty();
        }
        Admin adminClient = serviceContext.getAdminClient();
        if (isKafkaAuthorizerEnabled(adminClient)) {
            if (KafkaClusterUtil.isAuthorizedOperationsSupported(adminClient)) {
                LOG.info("KSQL topic authorization checks enabled.");
                return Optional.of(createAuthorizationValidator(ksqlConfig));
            }
            LOG.warn("The Kafka broker has an authorization service enabled, but the Kafka version does not support authorizedOperations(). KSQL topic authorization checks will not be enabled.");
        }
        return Optional.empty();
    }

    private static KsqlAuthorizationValidator createAuthorizationValidator(KsqlConfig ksqlConfig) {
        KsqlAccessValidator ksqlBackendAccessValidator = new KsqlBackendAccessValidator();
        if (ksqlConfig.getLong("ksql.authorization.cache.expiry.time.secs").longValue() > 0) {
            ksqlBackendAccessValidator = new KsqlCacheAccessValidator(ksqlConfig, ksqlBackendAccessValidator);
        }
        return new KsqlAuthorizationValidatorImpl(ksqlBackendAccessValidator);
    }

    private static boolean isKafkaAuthorizerEnabled(Admin admin) {
        try {
            ConfigEntry configEntry = KafkaClusterUtil.getConfig(admin).get(KAFKA_AUTHORIZER_CLASS_NAME);
            if (configEntry != null && configEntry.value() != null) {
                if (!configEntry.value().isEmpty()) {
                    return true;
                }
            }
            return false;
        } catch (KsqlServerException e) {
            if (e.getCause() instanceof ClusterAuthorizationException) {
                return true;
            }
            throw e;
        }
    }
}
