package io.confluent.ksql.schema.ksql.inference;

import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.SchemaParser;
import io.confluent.ksql.parser.SqlFormatter;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.IdentifierUtil;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Schema;

/* loaded from: input_file:io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.class */
public class DefaultSchemaInjector implements Injector {
    private static final SqlSchemaFormatter FORMATTER = new SqlSchemaFormatter(str -> {
        return !IdentifierUtil.isValid(str);
    }, new SqlSchemaFormatter.Option[]{SqlSchemaFormatter.Option.AS_COLUMN_LIST});
    private final TopicSchemaSupplier schemaSupplier;

    public DefaultSchemaInjector(TopicSchemaSupplier topicSchemaSupplier) {
        this.schemaSupplier = (TopicSchemaSupplier) Objects.requireNonNull(topicSchemaSupplier, "schemaSupplier");
    }

    @Override // io.confluent.ksql.statement.Injector
    public <T extends Statement> ConfiguredStatement<T> inject(ConfiguredStatement<T> configuredStatement) {
        if (!(configuredStatement.getStatement() instanceof CreateSource)) {
            return configuredStatement;
        }
        try {
            return forCreateStatement(configuredStatement).orElse(configuredStatement);
        } catch (KsqlStatementException e) {
            throw e;
        } catch (KsqlException e2) {
            throw new KsqlStatementException(ErrorMessageUtil.buildErrorMessage(e2), configuredStatement.getStatementText(), e2.getCause());
        }
    }

    private Optional<ConfiguredStatement<CreateSource>> forCreateStatement(ConfiguredStatement<CreateSource> configuredStatement) {
        return (hasValueElements(configuredStatement) || !configuredStatement.getStatement().getProperties().getValueFormat().supportsSchemaInference()) ? Optional.empty() : Optional.of(ConfiguredStatement.of(buildPreparedStatement(addSchemaFields(configuredStatement, getValueSchema(configuredStatement))), configuredStatement.getOverrides(), configuredStatement.getConfig()));
    }

    private TopicSchemaSupplier.SchemaAndId getValueSchema(ConfiguredStatement<CreateSource> configuredStatement) {
        String kafkaTopic = configuredStatement.getStatement().getProperties().getKafkaTopic();
        TopicSchemaSupplier.SchemaResult schemaResult = (TopicSchemaSupplier.SchemaResult) configuredStatement.getStatement().getProperties().getSchemaId().map(num -> {
            return this.schemaSupplier.getValueSchema(kafkaTopic, Optional.of(num));
        }).orElseGet(() -> {
            return this.schemaSupplier.getValueSchema(kafkaTopic, Optional.empty());
        });
        if (!schemaResult.failureReason.isPresent()) {
            return schemaResult.schemaAndId.get();
        }
        Exception exc = schemaResult.failureReason.get();
        throw new KsqlStatementException(exc.getMessage(), configuredStatement.getStatementText(), exc);
    }

    private static boolean hasValueElements(ConfiguredStatement<CreateSource> configuredStatement) {
        return configuredStatement.getStatement().getElements().stream().anyMatch(tableElement -> {
            return tableElement.getNamespace().equals(TableElement.Namespace.VALUE);
        });
    }

    private static CreateSource addSchemaFields(ConfiguredStatement<CreateSource> configuredStatement, TopicSchemaSupplier.SchemaAndId schemaAndId) {
        TableElements buildElements = buildElements(schemaAndId.schema, configuredStatement);
        CreateSource statement = configuredStatement.getStatement();
        CreateSourceProperties properties = statement.getProperties();
        return properties.getSchemaId().isPresent() ? statement.copyWith(buildElements, properties) : statement.copyWith(buildElements, properties.withSchemaId(schemaAndId.id));
    }

    private static TableElements buildElements(Schema schema, ConfiguredStatement<CreateSource> configuredStatement) {
        throwOnInvalidSchema(schema);
        ArrayList arrayList = new ArrayList();
        Stream<TableElement> keyColumns = getKeyColumns(configuredStatement);
        arrayList.getClass();
        keyColumns.forEach((v1) -> {
            r1.add(v1);
        });
        Stream<TableElement> columnsFromSchema = getColumnsFromSchema(schema);
        arrayList.getClass();
        columnsFromSchema.forEach((v1) -> {
            r1.add(v1);
        });
        return TableElements.of(arrayList);
    }

    private static Stream<TableElement> getKeyColumns(ConfiguredStatement<CreateSource> configuredStatement) {
        return configuredStatement.getStatement().getElements().stream().filter(tableElement -> {
            return tableElement.getNamespace() == TableElement.Namespace.KEY;
        });
    }

    private static Stream<TableElement> getColumnsFromSchema(Schema schema) {
        return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY).stream();
    }

    private static void throwOnInvalidSchema(Schema schema) {
        try {
            SchemaConverters.connectToSqlConverter().toSqlType(schema);
        } catch (Exception e) {
            throw new KsqlException("Schema contains types not supported by KSQL: " + e.getMessage() + System.lineSeparator() + "Schema: " + schema, e);
        }
    }

    private static KsqlParser.PreparedStatement<CreateSource> buildPreparedStatement(CreateSource createSource) {
        return KsqlParser.PreparedStatement.of(SqlFormatter.formatSql(createSource), createSource);
    }
}
