package io.confluent.ksql.schema.ksql.inference;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.execution.expression.tree.Type;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.SqlFormatter;
import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.properties.with.SourcePropertiesUtil;
import io.confluent.ksql.parser.tree.ColumnConstraints;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.SimpleColumn;
import io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.SerdeFeaturesFactory;
import io.confluent.ksql.services.SandboxedServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/* loaded from: input_file:io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.class */
public class DefaultSchemaInjector implements Injector {
    private static final ColumnConstraints KEY_CONSTRAINT = new ColumnConstraints.Builder().key().build();
    private static final ColumnConstraints PRIMARY_KEY_CONSTRAINT = new ColumnConstraints.Builder().primaryKey().build();
    private final TopicSchemaSupplier schemaSupplier;
    private final KsqlExecutionContext executionContext;
    private final ServiceContext serviceContext;

    public DefaultSchemaInjector(TopicSchemaSupplier topicSchemaSupplier, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        this.schemaSupplier = (TopicSchemaSupplier) Objects.requireNonNull(topicSchemaSupplier, "schemaSupplier");
        this.executionContext = (KsqlExecutionContext) Objects.requireNonNull(ksqlExecutionContext, "executionContext");
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
    }

    @Override // io.confluent.ksql.statement.Injector
    public <T extends Statement> ConfiguredStatement<T> inject(ConfiguredStatement<T> configuredStatement) {
        if (!(configuredStatement.getStatement() instanceof CreateSource) && !(configuredStatement.getStatement() instanceof CreateAsSelect)) {
            return configuredStatement;
        }
        try {
            return configuredStatement.getStatement() instanceof CreateSource ? forCreateStatement(configuredStatement).orElse(configuredStatement) : forCreateAsStatement(configuredStatement).orElse(configuredStatement);
        } catch (KsqlStatementException e) {
            throw e;
        } catch (KsqlException e2) {
            throw new KsqlStatementException(ErrorMessageUtil.buildErrorMessage(e2), configuredStatement.getMaskedStatementText(), e2.getCause());
        }
    }

    private Optional<ConfiguredStatement<CreateAsSelect>> forCreateAsStatement(ConfiguredStatement<CreateAsSelect> configuredStatement) {
        CreateSourceAsProperties properties = configuredStatement.getStatement().getProperties();
        if (!properties.getKeySchemaId().isPresent() && !properties.getValueSchemaId().isPresent()) {
            return Optional.empty();
        }
        try {
            SandboxedServiceContext create = SandboxedServiceContext.create(this.serviceContext);
            CreateSourceCommand createSourceCommand = (CreateSourceCommand) this.executionContext.createSandbox(create).plan(create, configuredStatement).getDdlCommand().get();
            Optional<TopicSchemaSupplier.SchemaAndId> createAsKeySchema = getCreateAsKeySchema(configuredStatement, createSourceCommand);
            Optional<TopicSchemaSupplier.SchemaAndId> createAsValueSchema = getCreateAsValueSchema(configuredStatement, createSourceCommand);
            KsqlParser.PreparedStatement buildPreparedStatement = buildPreparedStatement(addSchemaFieldsCas(configuredStatement, createAsKeySchema, createAsValueSchema));
            ImmutableMap.Builder builder = ImmutableMap.builder();
            if (properties.getKeySchemaId().isPresent()) {
                createAsKeySchema.map(schemaAndId -> {
                    return builder.put("KEY_SCHEMA_ID", schemaAndId);
                });
            }
            if (properties.getValueSchemaId().isPresent()) {
                createAsValueSchema.map(schemaAndId2 -> {
                    return builder.put("VALUE_SCHEMA_ID", schemaAndId2);
                });
            }
            return Optional.of(ConfiguredStatement.of(buildPreparedStatement, configuredStatement.getSessionConfig().copyWith(builder.build())));
        } catch (Exception e) {
            throw new KsqlStatementException("Could not determine output schema for query due to error: " + e.getMessage(), configuredStatement.getMaskedStatementText(), e);
        }
    }

    private Optional<TopicSchemaSupplier.SchemaAndId> getCreateAsKeySchema(ConfiguredStatement<CreateAsSelect> configuredStatement, CreateSourceCommand createSourceCommand) {
        CreateSourceAsProperties properties = configuredStatement.getStatement().getProperties();
        FormatInfo keyFormat = createSourceCommand.getFormats().getKeyFormat();
        if (!shouldInferSchema(properties.getKeySchemaId(), configuredStatement, keyFormat, true)) {
            return Optional.empty();
        }
        SerdeFeatures buildKeyFeatures = SerdeFeaturesFactory.buildKeyFeatures(FormatFactory.of(keyFormat), true);
        if (!shouldInferSchema(properties.getKeySchemaId(), configuredStatement, keyFormat, true)) {
            return Optional.empty();
        }
        TopicSchemaSupplier.SchemaAndId schema = getSchema(properties.getKafkaTopic(), properties.getKeySchemaId(), keyFormat, buildKeyFeatures, configuredStatement.getMaskedStatementText(), true);
        checkColumnsCompatibility(properties.getKeySchemaId(), createSourceCommand.getSchema().key(), schema.columns, true);
        return Optional.of(schema);
    }

    private Optional<TopicSchemaSupplier.SchemaAndId> getCreateAsValueSchema(ConfiguredStatement<CreateAsSelect> configuredStatement, CreateSourceCommand createSourceCommand) {
        CreateSourceAsProperties properties = configuredStatement.getStatement().getProperties();
        FormatInfo valueFormat = createSourceCommand.getFormats().getValueFormat();
        if (!shouldInferSchema(properties.getValueSchemaId(), configuredStatement, valueFormat, false)) {
            return Optional.empty();
        }
        TopicSchemaSupplier.SchemaAndId schema = getSchema(properties.getKafkaTopic(), properties.getValueSchemaId(), valueFormat, createSourceCommand.getFormats().getValueFeatures(), configuredStatement.getMaskedStatementText(), false);
        checkColumnsCompatibility(properties.getValueSchemaId(), createSourceCommand.getSchema().value(), schema.columns, false);
        return Optional.of(schema);
    }

    private Optional<ConfiguredStatement<CreateSource>> forCreateStatement(ConfiguredStatement<CreateSource> configuredStatement) {
        Optional<TopicSchemaSupplier.SchemaAndId> keySchema = getKeySchema(configuredStatement);
        Optional<TopicSchemaSupplier.SchemaAndId> valueSchema = getValueSchema(configuredStatement);
        if (!keySchema.isPresent() && !valueSchema.isPresent()) {
            return Optional.empty();
        }
        CreateSource addSchemaFields = addSchemaFields(configuredStatement, keySchema, valueSchema);
        KsqlParser.PreparedStatement buildPreparedStatement = buildPreparedStatement(addSchemaFields);
        ImmutableMap.Builder builder = ImmutableMap.builder();
        if (addSchemaFields.getProperties().getKeySchemaId().isPresent()) {
            keySchema.map(schemaAndId -> {
                return builder.put("KEY_SCHEMA_ID", schemaAndId);
            });
        }
        if (addSchemaFields.getProperties().getValueSchemaId().isPresent()) {
            valueSchema.map(schemaAndId2 -> {
                return builder.put("VALUE_SCHEMA_ID", schemaAndId2);
            });
        }
        return Optional.of(ConfiguredStatement.of(buildPreparedStatement, configuredStatement.getSessionConfig().copyWith(builder.build())));
    }

    private Optional<TopicSchemaSupplier.SchemaAndId> getKeySchema(ConfiguredStatement<CreateSource> configuredStatement) {
        CreateSource statement = configuredStatement.getStatement();
        CreateSourceProperties properties = statement.getProperties();
        FormatInfo keyFormat = SourcePropertiesUtil.getKeyFormat(properties, statement.getName());
        return !shouldInferSchema(properties.getKeySchemaId(), configuredStatement, keyFormat, true) ? Optional.empty() : Optional.of(getSchema(Optional.of(properties.getKafkaTopic()), properties.getKeySchemaId(), keyFormat, SerdeFeaturesFactory.buildKeyFeatures(FormatFactory.of(keyFormat), true), configuredStatement.getMaskedStatementText(), true));
    }

    private Optional<TopicSchemaSupplier.SchemaAndId> getValueSchema(ConfiguredStatement<CreateSource> configuredStatement) {
        CreateSourceProperties properties = configuredStatement.getStatement().getProperties();
        FormatInfo valueFormat = SourcePropertiesUtil.getValueFormat(properties);
        return !shouldInferSchema(properties.getValueSchemaId(), configuredStatement, valueFormat, false) ? Optional.empty() : Optional.of(getSchema(Optional.of(properties.getKafkaTopic()), properties.getValueSchemaId(), valueFormat, properties.getValueSerdeFeatures(), configuredStatement.getMaskedStatementText(), false));
    }

    private TopicSchemaSupplier.SchemaAndId getSchema(Optional<String> optional, Optional<Integer> optional2, FormatInfo formatInfo, SerdeFeatures serdeFeatures, String str, boolean z) {
        TopicSchemaSupplier.SchemaResult keySchema = z ? this.schemaSupplier.getKeySchema(optional, optional2, formatInfo, serdeFeatures) : this.schemaSupplier.getValueSchema(optional, optional2, formatInfo, serdeFeatures);
        if (!keySchema.failureReason.isPresent()) {
            return keySchema.schemaAndId.get();
        }
        Exception exc = keySchema.failureReason.get();
        throw new KsqlStatementException(exc.getMessage(), str, exc);
    }

    private static boolean shouldInferSchema(Optional<Integer> optional, ConfiguredStatement<? extends Statement> configuredStatement, FormatInfo formatInfo, boolean z) {
        boolean hasKeyElements;
        String str = z ? "KEY_FORMAT" : "VALUE_FORMAT";
        String str2 = z ? "KEY_SCHEMA_ID" : "VALUE_SCHEMA_ID";
        String format = String.format("%s should support schema inference when %s is provided. Current format is %s.", str, str2, formatInfo.getFormat());
        try {
            Format of = FormatFactory.of(formatInfo);
            boolean z2 = configuredStatement.getStatement() instanceof CreateAsSelect;
            if (z2) {
                hasKeyElements = false;
            } else {
                hasKeyElements = z ? hasKeyElements(configuredStatement) : hasValueElements(configuredStatement);
            }
            if (!optional.isPresent()) {
                return (z2 || hasKeyElements || !formatSupportsSchemaInference(of)) ? false : true;
            }
            if (!formatSupportsSchemaInference(of)) {
                throw new KsqlException(format);
            }
            if (hasKeyElements) {
                throw new KsqlException("Table elements and " + str2 + " cannot both exist for create statement.");
            }
            return true;
        } catch (KsqlException e) {
            if (e.getMessage().contains("does not support the following configs: [schemaId]")) {
                throw new KsqlException(format);
            }
            throw e;
        }
    }

    private static void checkColumnsCompatibility(Optional<Integer> optional, List<Column> list, List<? extends SimpleColumn> list2, boolean z) {
        if (optional.isPresent() && !list.isEmpty()) {
            Column.Namespace namespace = z ? Column.Namespace.KEY : Column.Namespace.VALUE;
            List list3 = (List) IntStream.range(0, list2.size()).mapToObj(i -> {
                return Column.of(((SimpleColumn) list2.get(i)).name(), ((SimpleColumn) list2.get(i)).type(), namespace, i);
            }).collect(Collectors.toList());
            Sets.SetView difference = Sets.difference(ImmutableSet.copyOf(list), ImmutableSet.copyOf(list3));
            if (difference.isEmpty()) {
            } else {
                throw new KsqlException("The following " + (z ? "key " : "value ") + "columns are changed, missing or reordered: " + difference + ". Schema from schema registry is " + list3);
            }
        }
    }

    private static boolean hasKeyElements(ConfiguredStatement<CreateSource> configuredStatement) {
        return configuredStatement.getStatement().getElements().stream().map((v0) -> {
            return v0.getConstraints();
        }).anyMatch(columnConstraints -> {
            return columnConstraints.isKey() || columnConstraints.isPrimaryKey();
        });
    }

    private static boolean hasValueElements(ConfiguredStatement<CreateSource> configuredStatement) {
        return configuredStatement.getStatement().getElements().stream().map((v0) -> {
            return v0.getConstraints();
        }).anyMatch(columnConstraints -> {
            return (columnConstraints.isKey() || columnConstraints.isPrimaryKey() || columnConstraints.isHeaders()) ? false : true;
        });
    }

    private static boolean formatSupportsSchemaInference(Format format) {
        return format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE);
    }

    private static CreateAsSelect addSchemaFieldsCas(ConfiguredStatement<CreateAsSelect> configuredStatement, Optional<TopicSchemaSupplier.SchemaAndId> optional, Optional<TopicSchemaSupplier.SchemaAndId> optional2) {
        CreateAsSelect statement = configuredStatement.getStatement();
        CreateSourceAsProperties properties = statement.getProperties();
        return statement.copyWith(statement.getProperties().withKeyValueSchemaName((properties.getKeySchemaId().isPresent() && optional.isPresent()) ? Optional.ofNullable(optional.get().rawSchema.name()) : Optional.empty(), (properties.getValueSchemaId().isPresent() && optional2.isPresent()) ? Optional.ofNullable(optional2.get().rawSchema.name()) : Optional.empty()));
    }

    private static void throwOnMultiSchemaDefinitions(ParsedSchema parsedSchema, Format format, boolean z) {
        List schemaFullNames = format.schemaFullNames(parsedSchema);
        if (schemaFullNames.size() > 1) {
            throw new KsqlException((z ? "Key" : "Value") + " schema has multiple schema definitions. " + System.lineSeparator() + System.lineSeparator() + ((String) schemaFullNames.stream().map(str -> {
                return "- " + str;
            }).collect(Collectors.joining("\n"))) + System.lineSeparator() + System.lineSeparator() + "Please specify a schema full name in the WITH clause using " + (z ? "KEY_SCHEMA_FULL_NAME" : "VALUE_SCHEMA_FULL_NAME"));
        }
    }

    private static CreateSource addSchemaFields(ConfiguredStatement<CreateSource> configuredStatement, Optional<TopicSchemaSupplier.SchemaAndId> optional, Optional<TopicSchemaSupplier.SchemaAndId> optional2) {
        TableElements buildElements = buildElements(configuredStatement, optional, optional2);
        CreateSource statement = configuredStatement.getStatement();
        CreateSourceProperties properties = statement.getProperties();
        if (optional.isPresent() && !properties.getKeySchemaFullName().isPresent()) {
            throwOnMultiSchemaDefinitions(optional.get().rawSchema, FormatFactory.of(SourcePropertiesUtil.getKeyFormat(properties, statement.getName())), true);
        }
        if (optional2.isPresent() && !properties.getValueSchemaFullName().isPresent()) {
            throwOnMultiSchemaDefinitions(optional2.get().rawSchema, FormatFactory.of(SourcePropertiesUtil.getValueFormat(properties)), false);
        }
        return statement.copyWith(buildElements, statement.getProperties().withKeyValueSchemaName(properties.getKeySchemaFullName().isPresent() ? properties.getKeySchemaFullName() : (properties.getKeySchemaId().isPresent() && optional.isPresent()) ? Optional.ofNullable(optional.get().rawSchema.name()) : Optional.empty(), properties.getValueSchemaFullName().isPresent() ? properties.getValueSchemaFullName() : (properties.getValueSchemaId().isPresent() && optional2.isPresent()) ? Optional.ofNullable(optional2.get().rawSchema.name()) : Optional.empty()));
    }

    private static TableElements buildElements(ConfiguredStatement<CreateSource> configuredStatement, Optional<TopicSchemaSupplier.SchemaAndId> optional, Optional<TopicSchemaSupplier.SchemaAndId> optional2) {
        List list = (List) configuredStatement.getStatement().getElements().stream().filter(tableElement -> {
            return tableElement.getConstraints().isHeaders();
        }).collect(Collectors.toList());
        if (optional.isPresent()) {
            ColumnConstraints keyConstraints = getKeyConstraints(configuredStatement.getStatement());
            Stream<R> map = optional.get().columns.stream().map(simpleColumn -> {
                return new TableElement(simpleColumn.name(), new Type(simpleColumn.type()), keyConstraints);
            });
            list.getClass();
            map.forEach((v1) -> {
                r1.add(v1);
            });
        } else {
            Stream<TableElement> keyColumns = getKeyColumns(configuredStatement);
            list.getClass();
            keyColumns.forEach((v1) -> {
                r1.add(v1);
            });
        }
        if (optional2.isPresent()) {
            Stream<R> map2 = optional2.get().columns.stream().map(simpleColumn2 -> {
                return new TableElement(simpleColumn2.name(), new Type(simpleColumn2.type()));
            });
            list.getClass();
            map2.forEach((v1) -> {
                r1.add(v1);
            });
        } else {
            Stream<TableElement> valueColumns = getValueColumns(configuredStatement);
            list.getClass();
            valueColumns.forEach((v1) -> {
                r1.add(v1);
            });
        }
        return TableElements.of(list);
    }

    private static ColumnConstraints getKeyConstraints(CreateSource createSource) {
        if (createSource instanceof CreateStream) {
            return KEY_CONSTRAINT;
        }
        if (createSource instanceof CreateTable) {
            return PRIMARY_KEY_CONSTRAINT;
        }
        throw new IllegalArgumentException("Unrecognized statement type: " + createSource);
    }

    private static Stream<TableElement> getKeyColumns(ConfiguredStatement<CreateSource> configuredStatement) {
        return configuredStatement.getStatement().getElements().stream().filter(tableElement -> {
            return tableElement.getConstraints().isKey() || tableElement.getConstraints().isPrimaryKey();
        });
    }

    private static Stream<TableElement> getValueColumns(ConfiguredStatement<CreateSource> configuredStatement) {
        return configuredStatement.getStatement().getElements().stream().filter(tableElement -> {
            return (tableElement.getConstraints().isKey() || tableElement.getConstraints().isPrimaryKey() || tableElement.getConstraints().isHeaders()) ? false : true;
        });
    }

    private static <T extends Statement> KsqlParser.PreparedStatement<T> buildPreparedStatement(T t) {
        return KsqlParser.PreparedStatement.of(SqlFormatter.formatSql(t), t);
    }
}
