package io.confluent.ksql.security;

import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.topic.SourceTopicsExtractor;
import io.confluent.ksql.util.KsqlException;
import java.util.Iterator;
import org.apache.kafka.common.acl.AclOperation;

/* loaded from: input_file:io/confluent/ksql/security/KsqlAuthorizationValidatorImpl.class */
/**
 * Authorizes KSQL statements by working out which Kafka topics a statement
 * touches and delegating one access check per topic to a
 * {@link KsqlAccessValidator}.
 *
 * <p>Topics a statement reads from are checked with {@link AclOperation#READ};
 * the topic a statement writes to is checked with {@link AclOperation#WRITE}.
 * This class does no exception handling of its own, so whatever the delegate
 * or the metastore lookups throw propagates to the caller.
 */
public class KsqlAuthorizationValidatorImpl implements KsqlAuthorizationValidator {
    private final KsqlAccessValidator accessValidator;

    /**
     * Creates a validator backed by the given access validator.
     *
     * @param ksqlAccessValidator delegate that performs each individual topic check
     */
    public KsqlAuthorizationValidatorImpl(KsqlAccessValidator ksqlAccessValidator) {
        accessValidator = ksqlAccessValidator;
    }

    /**
     * @return the access validator this instance delegates to
     */
    KsqlAccessValidator getAccessValidator() {
        return accessValidator;
    }

    /**
     * Runs the topic access checks that apply to the given statement.
     *
     * <p>Only {@link Query}, {@link InsertInto}, {@link CreateAsSelect},
     * {@link PrintTopic} and {@link CreateSource} statements are checked, and
     * they are tested in that order. Any other statement type returns without
     * performing a check.
     *
     * @param ksqlSecurityContext security context forwarded to every access check
     * @param metaStore metastore used to resolve sources to their Kafka topics
     * @param statement statement to authorize
     */
    @Override
    public void checkAuthorization(KsqlSecurityContext ksqlSecurityContext, MetaStore metaStore, Statement statement) {
        if (statement instanceof Query) {
            validateQuery(ksqlSecurityContext, metaStore, (Query) statement);
        } else if (statement instanceof InsertInto) {
            validateInsertInto(ksqlSecurityContext, metaStore, (InsertInto) statement);
        } else if (statement instanceof CreateAsSelect) {
            validateCreateAsSelect(ksqlSecurityContext, metaStore, (CreateAsSelect) statement);
        } else if (statement instanceof PrintTopic) {
            validatePrintTopic(ksqlSecurityContext, (PrintTopic) statement);
        } else if (statement instanceof CreateSource) {
            validateCreateSource(ksqlSecurityContext, (CreateSource) statement);
        }
        // Every other statement type falls through with no check.
    }

    /**
     * Checks READ access on each source topic the extractor reports for the query.
     */
    private void validateQuery(KsqlSecurityContext ksqlSecurityContext, MetaStore metaStore, Query query) {
        SourceTopicsExtractor topicsExtractor = new SourceTopicsExtractor(metaStore);
        // The extractor is run without a context object.
        topicsExtractor.process(query, null);
        for (String sourceTopic : topicsExtractor.getSourceTopics()) {
            accessValidator.checkAccess(ksqlSecurityContext, sourceTopic, AclOperation.READ);
        }
    }

    /**
     * Checks READ access for the underlying query first, then WRITE access on the sink topic.
     */
    private void validateCreateAsSelect(KsqlSecurityContext ksqlSecurityContext, MetaStore metaStore, CreateAsSelect createAsSelect) {
        validateQuery(ksqlSecurityContext, metaStore, createAsSelect.getQuery());
        String sinkTopic = getCreateAsSelectSinkTopic(metaStore, createAsSelect);
        accessValidator.checkAccess(ksqlSecurityContext, sinkTopic, AclOperation.WRITE);
    }

    /**
     * Checks READ access for the underlying query first, then WRITE access on the
     * Kafka topic of the insert target.
     */
    private void validateInsertInto(KsqlSecurityContext ksqlSecurityContext, MetaStore metaStore, InsertInto insertInto) {
        validateQuery(ksqlSecurityContext, metaStore, insertInto.getQuery());
        String targetTopic = getSourceTopicName(metaStore, insertInto.getTarget());
        accessValidator.checkAccess(ksqlSecurityContext, targetTopic, AclOperation.WRITE);
    }

    /**
     * Checks READ access on the topic named by the PRINT statement.
     */
    private void validatePrintTopic(KsqlSecurityContext ksqlSecurityContext, PrintTopic printTopic) {
        accessValidator.checkAccess(ksqlSecurityContext, printTopic.getTopic(), AclOperation.READ);
    }

    /**
     * Checks READ access on the Kafka topic given in the statement's properties.
     */
    private void validateCreateSource(KsqlSecurityContext ksqlSecurityContext, CreateSource createSource) {
        accessValidator.checkAccess(ksqlSecurityContext, createSource.getProperties().getKafkaTopic(), AclOperation.READ);
    }

    /**
     * Resolves a source name to the Kafka topic name of the matching metastore entry.
     *
     * @throws KsqlException if the metastore has no source with that name
     */
    private String getSourceTopicName(MetaStore metaStore, SourceName sourceName) {
        DataSource dataSource = metaStore.getSource(sourceName);
        if (dataSource != null) {
            return dataSource.getKafkaTopicName();
        }
        throw new KsqlException("Cannot validate for topic access from an unknown stream/table: " + sourceName);
    }

    /**
     * Returns the sink topic of a create-as-select statement: the Kafka topic from
     * its properties when present, otherwise the topic of the metastore source
     * registered under the statement's name.
     */
    private String getCreateAsSelectSinkTopic(MetaStore metaStore, CreateAsSelect createAsSelect) {
        // The metastore lookup is deferred so it only runs when no topic property is present.
        return createAsSelect.getProperties().getKafkaTopic()
                .orElseGet(() -> getSourceTopicName(metaStore, createAsSelect.getName()));
    }
}
