package io.confluent.ksql.schema.ksql.inference;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer;
import io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.parser.properties.with.SourcePropertiesUtil;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.SimpleColumn;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.SerdeFeaturesFactory;
import io.confluent.ksql.services.SandboxedServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlSchemaRegistryNotConfiguredException;
import io.confluent.ksql.util.KsqlStatementException;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.cache.Cache;

/* loaded from: input_file:io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.class */
public class SchemaRegisterInjector implements Injector {
    private final KsqlExecutionContext executionContext;
    private final ServiceContext serviceContext;

    public SchemaRegisterInjector(KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        this.executionContext = (KsqlExecutionContext) Objects.requireNonNull(ksqlExecutionContext, "executionContext");
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
    }

    @Override // io.confluent.ksql.statement.Injector
    public <T extends Statement> ConfiguredStatement<T> inject(ConfiguredStatement<T> configuredStatement) {
        if (configuredStatement.getStatement() instanceof CreateAsSelect) {
            registerForCreateAs(configuredStatement);
        } else if (configuredStatement.getStatement() instanceof CreateSource) {
            registerForCreateSource(configuredStatement);
        }
        return configuredStatement;
    }

    private void registerForCreateSource(ConfiguredStatement<? extends CreateSource> configuredStatement) {
        CreateSource statement = configuredStatement.getStatement();
        LogicalSchema logicalSchema = statement.getElements().toLogicalSchema();
        FormatInfo keyFormat = SourcePropertiesUtil.getKeyFormat(statement.getProperties(), statement.getName());
        SerdeFeatures buildKeyFeatures = SerdeFeaturesFactory.buildKeyFeatures(logicalSchema, FormatFactory.of(keyFormat));
        FormatInfo valueFormat = SourcePropertiesUtil.getValueFormat(statement.getProperties());
        registerSchemas(logicalSchema, statement.getProperties().getKafkaTopic(), keyFormat, buildKeyFeatures, valueFormat, SerdeFeaturesFactory.buildValueFeatures(logicalSchema, FormatFactory.of(valueFormat), statement.getProperties().getValueSerdeFeatures(), configuredStatement.getSessionConfig().getConfig(false)), configuredStatement.getSessionConfig().getConfig(false), configuredStatement.getStatementText(), false);
    }

    private void registerForCreateAs(ConfiguredStatement<? extends CreateAsSelect> configuredStatement) {
        try {
            SandboxedServiceContext create = SandboxedServiceContext.create(this.serviceContext);
            CreateSourceCommand createSourceCommand = this.executionContext.createSandbox(create).plan(create, configuredStatement).getDdlCommand().get();
            registerSchemas(createSourceCommand.getSchema(), createSourceCommand.getTopicName(), createSourceCommand.getFormats().getKeyFormat(), createSourceCommand.getFormats().getKeyFeatures(), createSourceCommand.getFormats().getValueFormat(), createSourceCommand.getFormats().getValueFeatures(), configuredStatement.getSessionConfig().getConfig(false), configuredStatement.getStatementText(), true);
        } catch (Exception e) {
            throw new KsqlStatementException("Could not determine output schema for query due to error: " + e.getMessage(), configuredStatement.getStatementText(), e);
        }
    }

    private void registerSchemas(LogicalSchema logicalSchema, String str, FormatInfo formatInfo, SerdeFeatures serdeFeatures, FormatInfo formatInfo2, SerdeFeatures serdeFeatures2, KsqlConfig ksqlConfig, String str2, boolean z) {
        registerSchema(logicalSchema.key(), str, formatInfo, serdeFeatures, ksqlConfig, str2, z, KsqlConstants.getSRSubject(str, true), true);
        registerSchema(logicalSchema.value(), str, formatInfo2, serdeFeatures2, ksqlConfig, str2, z, KsqlConstants.getSRSubject(str, false), false);
    }

    private void registerSchema(List<? extends SimpleColumn> list, String str, FormatInfo formatInfo, SerdeFeatures serdeFeatures, KsqlConfig ksqlConfig, String str2, boolean z, String str3, boolean z2) {
        Format of = FormatFactory.of(formatInfo);
        if (of.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
            if (ksqlConfig.getString("ksql.schema.registry.url").isEmpty()) {
                throw new KsqlSchemaRegistryNotConfiguredException(String.format("Cannot create topic '%s' with format %s without configuring '%s'", str, of.name(), "ksql.schema.registry.url"));
            }
            try {
                SchemaRegistryClient schemaRegistryClient = this.serviceContext.getSchemaRegistryClient();
                if (z || !SchemaRegistryUtil.subjectExists(schemaRegistryClient, str3)) {
                    ProtobufSchema parsedSchema = of.getSchemaTranslator(formatInfo.getProperties()).toParsedSchema(PersistenceSchema.from(list, serdeFeatures));
                    if (parsedSchema instanceof ProtobufSchema) {
                        schemaRegistryClient.register(str3, AbstractKafkaProtobufSerializer.resolveDependencies(schemaRegistryClient, true, false, true, (Cache) null, new DefaultReferenceSubjectNameStrategy(), str, z2, parsedSchema));
                    } else {
                        schemaRegistryClient.register(str3, parsedSchema);
                    }
                }
            } catch (IOException | RestClientException e) {
                throw new KsqlStatementException("Could not register schema for topic: " + e.getMessage(), str2, e);
            }
        }
    }
}
