package io.confluent.ksql.schema.ksql.inference;

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.parser.properties.with.SourcePropertiesUtil;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.SimpleColumn;
import io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.SerdeFeaturesFactory;
import io.confluent.ksql.services.SandboxedServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlSchemaRegistryNotConfiguredException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.Pair;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/* loaded from: input_file:io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.class */
public class SchemaRegisterInjector implements Injector {
    private final KsqlExecutionContext executionContext;
    private final ServiceContext serviceContext;

    public SchemaRegisterInjector(KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        this.executionContext = (KsqlExecutionContext) Objects.requireNonNull(ksqlExecutionContext, "executionContext");
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
    }

    @Override // io.confluent.ksql.statement.Injector
    public <T extends Statement> ConfiguredStatement<T> inject(ConfiguredStatement<T> configuredStatement) {
        try {
            if (configuredStatement.getStatement() instanceof CreateAsSelect) {
                registerForCreateAs(configuredStatement);
            } else if (configuredStatement.getStatement() instanceof CreateSource) {
                registerForCreateSource(configuredStatement);
            }
            return stripSchemaIdConfig(configuredStatement);
        } catch (KsqlException e) {
            throw new KsqlStatementException(ErrorMessageUtil.buildErrorMessage(e), configuredStatement.getMaskedStatementText(), e.getCause());
        } catch (KsqlStatementException e2) {
            throw e2;
        }
    }

    private <T extends Statement> ConfiguredStatement<T> stripSchemaIdConfig(ConfiguredStatement<T> configuredStatement) {
        Map overrides = configuredStatement.getSessionConfig().getOverrides();
        if (!overrides.containsKey("KEY_SCHEMA_ID") && !overrides.containsKey("VALUE_SCHEMA_ID")) {
            return configuredStatement;
        }
        return ConfiguredStatement.of(configuredStatement.getPreparedStatement(), configuredStatement.getSessionConfig().withNewOverrides((ImmutableMap) overrides.entrySet().stream().filter(entry -> {
            return (((String) entry.getKey()).equals("KEY_SCHEMA_ID") || ((String) entry.getKey()).equals("VALUE_SCHEMA_ID")) ? false : true;
        }).collect(ImmutableMap.toImmutableMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))));
    }

    private void registerForCreateSource(ConfiguredStatement<? extends CreateSource> configuredStatement) {
        CreateSource statement = configuredStatement.getStatement();
        LogicalSchema logicalSchema = statement.getElements().toLogicalSchema();
        FormatInfo keyFormat = SourcePropertiesUtil.getKeyFormat(statement.getProperties(), statement.getName());
        SerdeFeatures buildKeyFeatures = SerdeFeaturesFactory.buildKeyFeatures(logicalSchema, tryGetFormat(keyFormat, true, configuredStatement.getMaskedStatementText()));
        FormatInfo valueFormat = SourcePropertiesUtil.getValueFormat(statement.getProperties());
        registerSchemas(logicalSchema, Pair.of((TopicSchemaSupplier.SchemaAndId) configuredStatement.getSessionConfig().getOverrides().get("KEY_SCHEMA_ID"), (TopicSchemaSupplier.SchemaAndId) configuredStatement.getSessionConfig().getOverrides().get("VALUE_SCHEMA_ID")), statement.getProperties().getKafkaTopic(), keyFormat, buildKeyFeatures, valueFormat, SerdeFeaturesFactory.buildValueFeatures(logicalSchema, tryGetFormat(valueFormat, false, configuredStatement.getMaskedStatementText()), statement.getProperties().getValueSerdeFeatures(), configuredStatement.getSessionConfig().getConfig(false)), configuredStatement.getSessionConfig().getConfig(false), configuredStatement.getMaskedStatementText(), false);
    }

    private static Format tryGetFormat(FormatInfo formatInfo, boolean z, String str) {
        try {
            return FormatFactory.of(formatInfo);
        } catch (KsqlException e) {
            if (!e.getMessage().contains("does not support the following configs: [schemaId]")) {
                throw e;
            }
            throw new KsqlStatementException((z ? "KEY_SCHEMA_ID" : "VALUE_SCHEMA_ID") + " is provided but format " + formatInfo.getFormat() + " doesn't support registering in Schema Registry", str, e);
        }
    }

    private void registerForCreateAs(ConfiguredStatement<? extends CreateAsSelect> configuredStatement) {
        try {
            SandboxedServiceContext create = SandboxedServiceContext.create(this.serviceContext);
            CreateSourceCommand createSourceCommand = this.executionContext.createSandbox(create).plan(create, configuredStatement).getDdlCommand().get();
            registerSchemas(createSourceCommand.getSchema(), Pair.of((TopicSchemaSupplier.SchemaAndId) configuredStatement.getSessionConfig().getOverrides().get("KEY_SCHEMA_ID"), (TopicSchemaSupplier.SchemaAndId) configuredStatement.getSessionConfig().getOverrides().get("VALUE_SCHEMA_ID")), createSourceCommand.getTopicName(), createSourceCommand.getFormats().getKeyFormat(), createSourceCommand.getFormats().getKeyFeatures(), createSourceCommand.getFormats().getValueFormat(), createSourceCommand.getFormats().getValueFeatures(), configuredStatement.getSessionConfig().getConfig(false), configuredStatement.getMaskedStatementText(), true);
        } catch (Exception e) {
            throw new KsqlStatementException("Could not determine output schema for query due to error: " + e.getMessage(), configuredStatement.getMaskedStatementText(), e);
        }
    }

    private void registerSchemas(LogicalSchema logicalSchema, Pair<TopicSchemaSupplier.SchemaAndId, TopicSchemaSupplier.SchemaAndId> pair, String str, FormatInfo formatInfo, SerdeFeatures serdeFeatures, FormatInfo formatInfo2, SerdeFeatures serdeFeatures2, KsqlConfig ksqlConfig, String str2, boolean z) {
        boolean z2 = pair.left != null;
        boolean z3 = pair.right != null;
        if (!z2) {
            registerSchema(logicalSchema.key(), str, formatInfo, serdeFeatures, ksqlConfig, str2, z, KsqlConstants.getSRSubject(str, true), true);
        }
        if (!z3) {
            registerSchema(logicalSchema.value(), str, formatInfo2, serdeFeatures2, ksqlConfig, str2, z, KsqlConstants.getSRSubject(str, false), false);
        }
        if (z2) {
            sanityCheck((TopicSchemaSupplier.SchemaAndId) pair.left, formatInfo, str, ksqlConfig, str2, true);
        }
        if (z3) {
            sanityCheck((TopicSchemaSupplier.SchemaAndId) pair.right, formatInfo2, str, ksqlConfig, str2, false);
        }
        if (z2) {
            registerRawSchema((TopicSchemaSupplier.SchemaAndId) pair.left, str, str2, KsqlConstants.getSRSubject(str, true), true);
        }
        if (z3) {
            registerRawSchema((TopicSchemaSupplier.SchemaAndId) pair.right, str, str2, KsqlConstants.getSRSubject(str, false), false);
        }
    }

    private static void sanityCheck(TopicSchemaSupplier.SchemaAndId schemaAndId, FormatInfo formatInfo, String str, KsqlConfig ksqlConfig, String str2, boolean z) {
        String str3 = z ? "KEY_SCHEMA_ID" : "VALUE_SCHEMA_ID";
        Format of = FormatFactory.of(formatInfo);
        if (!canRegister(of, ksqlConfig, str)) {
            throw new KsqlStatementException(str3 + " is provided but format " + of.name() + " doesn't support registering in Schema Registry", str2);
        }
        if (!of.getSchemaTranslator(formatInfo.getProperties()).name().equals(schemaAndId.rawSchema.schemaType())) {
            throw new KsqlStatementException(String.format("Format and fetched schema type using %s %d are different. Format: [%s], Fetched schema type: [%s].", str3, Integer.valueOf(schemaAndId.id), of.name(), schemaAndId.rawSchema.schemaType()), str2);
        }
    }

    private void registerRawSchema(TopicSchemaSupplier.SchemaAndId schemaAndId, String str, String str2, String str3, Boolean bool) {
        try {
            int registerSchema = SchemaRegistryUtil.registerSchema(this.serviceContext.getSchemaRegistryClient(), schemaAndId.rawSchema, str, str3, bool.booleanValue());
            if ((this.serviceContext instanceof SandboxedServiceContext) || registerSchema == schemaAndId.id) {
                return;
            }
            throw new KsqlStatementException("Schema id registered is " + registerSchema + " which is different from provided " + (bool.booleanValue() ? "KEY_SCHEMA_ID" : "VALUE_SCHEMA_ID") + " " + schemaAndId.id + "." + System.lineSeparator() + "Topic: " + str + System.lineSeparator() + "Subject: " + str3 + System.lineSeparator() + "Schema: " + schemaAndId.rawSchema, str2);
        } catch (KsqlException e) {
            throw new KsqlStatementException("Could not register schema for topic: " + e.getMessage(), str2, e);
        }
    }

    private static boolean canRegister(Format format, KsqlConfig ksqlConfig, String str) {
        if (!format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
            return false;
        }
        if (ksqlConfig.getString("ksql.schema.registry.url").isEmpty()) {
            throw new KsqlSchemaRegistryNotConfiguredException(String.format("Cannot create topic '%s' with format %s without configuring '%s'", str, format.name(), "ksql.schema.registry.url"));
        }
        return true;
    }

    private void registerSchema(List<? extends SimpleColumn> list, String str, FormatInfo formatInfo, SerdeFeatures serdeFeatures, KsqlConfig ksqlConfig, String str2, boolean z, String str3, boolean z2) {
        Format of = FormatFactory.of(formatInfo);
        if (canRegister(of, ksqlConfig, str)) {
            SchemaRegistryClient schemaRegistryClient = this.serviceContext.getSchemaRegistryClient();
            if (z || !SchemaRegistryUtil.subjectExists(schemaRegistryClient, str3)) {
                try {
                    SchemaRegistryUtil.registerSchema(schemaRegistryClient, of.getSchemaTranslator(formatInfo.getProperties()).toParsedSchema(PersistenceSchema.from(list, serdeFeatures)), str, str3, z2);
                } catch (KsqlException e) {
                    throw new KsqlStatementException("Could not register schema for topic: " + e.getMessage(), str2, e);
                }
            }
        }
    }
}
