package io.confluent.ksql.ddl.commands;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand;
import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KeySerdeFactory;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.serde.SerdeOptions;
import io.confluent.ksql.serde.ValueSerdeFactory;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.topic.TopicFactory;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

/* loaded from: input_file:io/confluent/ksql/ddl/commands/CreateSourceFactory.class */
/**
 * Builds the DDL commands for {@code CREATE STREAM} and {@code CREATE TABLE} statements.
 *
 * <p>A command is only returned once the statement's Kafka topic has been confirmed to exist, its
 * column names, KEY field and timestamp column have passed validation, and a key serde and a
 * value serde have been created successfully for the resulting physical schema.
 */
public final class CreateSourceFactory {

    /**
     * Derives the serde options to use for a new source. Swappable through the
     * {@link VisibleForTesting} constructor.
     */
    @FunctionalInterface
    interface SerdeOptionsSupplier {
        /**
         * @param logicalSchema the logical schema of the source being created
         * @param format the value format of the source's topic
         * @param optional the value of the statement's {@code getWrapSingleValues()} property
         * @param ksqlConfig the config passed to the factory method
         * @return the serde options to apply to the source
         */
        Set<SerdeOption> build(LogicalSchema logicalSchema, Format format, Optional<Boolean> optional, KsqlConfig ksqlConfig);
    }

    private final ServiceContext serviceContext;
    private final SerdeOptionsSupplier serdeOptionsSupplier;
    private final KeySerdeFactory keySerdeFactory;
    private final ValueSerdeFactory valueSerdeFactory;

    /**
     * Creates a factory that uses {@code SerdeOptions::buildForCreateStatement} together with new
     * {@link GenericKeySerDe} and {@link GenericRowSerDe} instances.
     *
     * @param serviceContext used for topic lookups and as the source of the schema registry
     *     client factory; must not be {@code null}
     */
    public CreateSourceFactory(ServiceContext serviceContext) {
        this(
            serviceContext,
            SerdeOptions::buildForCreateStatement,
            new GenericKeySerDe(),
            new GenericRowSerDe()
        );
    }

    /**
     * Creates a factory with explicit collaborators.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @VisibleForTesting
    CreateSourceFactory(ServiceContext serviceContext, SerdeOptionsSupplier serdeOptionsSupplier, KeySerdeFactory keySerdeFactory, ValueSerdeFactory valueSerdeFactory) {
        this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
        this.serdeOptionsSupplier = Objects.requireNonNull(serdeOptionsSupplier, "serdeOptionsSupplier");
        this.keySerdeFactory = Objects.requireNonNull(keySerdeFactory, "keySerdeFactory");
        this.valueSerdeFactory = Objects.requireNonNull(valueSerdeFactory, "valueSerdeFactory");
    }

    /**
     * Builds the command for a {@code CREATE STREAM} statement.
     *
     * @param createStream the parsed statement
     * @param ksqlConfig the config handed to timestamp validation, serde option resolution and
     *     serde creation
     * @return the command describing the new stream
     * @throws KsqlException if the topic does not exist, or the columns or KEY field are invalid
     */
    public CreateStreamCommand createStreamCommand(CreateStream createStream, KsqlConfig ksqlConfig) {
        final SourceName sourceName = createStream.getName();
        final CreateSourceProperties props = createStream.getProperties();

        // Order matters: the topic check comes first, then the schema-level validations.
        final KsqlTopic topic = buildTopic(props, this.serviceContext);
        final LogicalSchema schema = buildSchema(createStream.getElements());
        final Optional<ColumnName> keyField = buildKeyFieldName(createStream, schema);
        final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(ksqlConfig, props, schema);
        final Set<SerdeOption> serdeOptions = this.serdeOptionsSupplier.build(
            schema,
            topic.getValueFormat().getFormat(),
            props.getWrapSingleValues(),
            ksqlConfig
        );

        validateSerdesCanHandleSchemas(ksqlConfig, PhysicalSchema.from(schema, serdeOptions), topic);

        return new CreateStreamCommand(
            sourceName,
            schema,
            keyField,
            timestampColumn,
            topic.getKafkaTopicName(),
            Formats.of(topic.getKeyFormat(), topic.getValueFormat(), serdeOptions),
            topic.getKeyFormat().getWindowInfo()
        );
    }

    /**
     * Builds the command for a {@code CREATE TABLE} statement.
     *
     * @param createTable the parsed statement
     * @param ksqlConfig the config handed to timestamp validation, serde option resolution and
     *     serde creation
     * @return the command describing the new table
     * @throws KsqlException if the topic does not exist, or the columns or KEY field are invalid
     */
    public CreateTableCommand createTableCommand(CreateTable createTable, KsqlConfig ksqlConfig) {
        final SourceName sourceName = createTable.getName();
        final CreateSourceProperties props = createTable.getProperties();

        // Order matters: the topic check comes first, then the schema-level validations.
        final KsqlTopic topic = buildTopic(props, this.serviceContext);
        final LogicalSchema schema = buildSchema(createTable.getElements());
        final Optional<ColumnName> keyField = buildKeyFieldName(createTable, schema);
        final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(ksqlConfig, props, schema);
        final Set<SerdeOption> serdeOptions = this.serdeOptionsSupplier.build(
            schema,
            topic.getValueFormat().getFormat(),
            props.getWrapSingleValues(),
            ksqlConfig
        );

        validateSerdesCanHandleSchemas(ksqlConfig, PhysicalSchema.from(schema, serdeOptions), topic);

        return new CreateTableCommand(
            sourceName,
            schema,
            keyField,
            timestampColumn,
            topic.getKafkaTopicName(),
            Formats.of(topic.getKeyFormat(), topic.getValueFormat(), serdeOptions),
            topic.getKeyFormat().getWindowInfo()
        );
    }

    /**
     * Resolves the KEY field named in the WITH clause, if there is one.
     *
     * @return the key field name, or empty when the statement does not set one
     * @throws KsqlException if the named column is not a value column of {@code logicalSchema}
     */
    private static Optional<ColumnName> buildKeyFieldName(CreateSource createSource, LogicalSchema logicalSchema) {
        final Optional<ColumnName> declaredKeyField = createSource.getProperties().getKeyField();
        if (!declaredKeyField.isPresent()) {
            return Optional.empty();
        }

        final ColumnName keyColumn = declaredKeyField.get();
        if (!logicalSchema.findValueColumn(keyColumn).isPresent()) {
            throw new KsqlException("The KEY column set in the WITH clause does not exist in the schema: '" + keyColumn.toString(FormatOptions.noEscape()) + "'");
        }
        return Optional.of(keyColumn);
    }

    /**
     * Validates the declared columns and converts them into a logical schema.
     *
     * @throws KsqlException if no columns are declared or any column name is not permitted
     */
    private static LogicalSchema buildSchema(TableElements tableElements) {
        if (Iterables.isEmpty(tableElements)) {
            throw new KsqlException("The statement does not define any columns.");
        }

        for (final TableElement element : tableElements) {
            validateColumnName(element);
        }

        return tableElements.toLogicalSchema(true);
    }

    /**
     * Enforces the column naming rules: system column names other than ROWKEY are reserved, KEY
     * columns must be named ROWKEY, and ROWKEY may only be used for KEY columns.
     */
    private static void validateColumnName(TableElement element) {
        final boolean isRowKey = element.getName().equals(SchemaUtil.ROWKEY_NAME);

        if (!isRowKey && SchemaUtil.isSystemColumn(element.getName())) {
            throw new KsqlException("'" + element.getName().name() + "' is a reserved column name.");
        }

        final boolean isKeyColumn = element.getNamespace() == TableElement.Namespace.KEY;

        if (isKeyColumn && !isRowKey) {
            throw new KsqlException("'" + element.getName().name() + "' is an invalid KEY column name. KSQL currently only supports KEY columns named ROWKEY.");
        }
        if (!isKeyColumn && isRowKey) {
            throw new KsqlException("'" + element.getName().name() + "' is a reserved column name. It can only be used for KEY columns.");
        }
    }

    /**
     * Creates the topic description for the source.
     *
     * @throws KsqlException if the topic client reports that the Kafka topic does not exist
     */
    private static KsqlTopic buildTopic(CreateSourceProperties createSourceProperties, ServiceContext serviceContext) {
        final String topicName = createSourceProperties.getKafkaTopic();
        if (!serviceContext.getTopicClient().isTopicExists(topicName)) {
            throw new KsqlException("Kafka topic does not exist: " + topicName);
        }
        return TopicFactory.create(createSourceProperties);
    }

    /**
     * Builds the timestamp column from the WITH clause, if one is named, and passes it to
     * {@link TimestampExtractionPolicyFactory#validateTimestampColumn} together with the schema.
     *
     * @return the timestamp column, or empty when the statement does not name one
     */
    private static Optional<TimestampColumn> buildTimestampColumn(KsqlConfig ksqlConfig, CreateSourceProperties createSourceProperties, LogicalSchema logicalSchema) {
        final Optional<TimestampColumn> timestampColumn = createSourceProperties
            .getTimestampColumnName()
            .map(name -> new TimestampColumn(name, createSourceProperties.getTimestampFormat()));

        TimestampExtractionPolicyFactory.validateTimestampColumn(ksqlConfig, logicalSchema, timestampColumn);
        return timestampColumn;
    }

    /**
     * Creates, then immediately closes, a key serde and a value serde for the given schema, so
     * that any exception raised by serde creation reaches the caller.
     */
    private void validateSerdesCanHandleSchemas(KsqlConfig ksqlConfig, PhysicalSchema physicalSchema, KsqlTopic ksqlTopic) {
        this.keySerdeFactory
            .create(
                ksqlTopic.getKeyFormat().getFormatInfo(),
                physicalSchema.keySchema(),
                ksqlConfig,
                this.serviceContext.getSchemaRegistryClientFactory(),
                "",
                NoopProcessingLogContext.INSTANCE
            )
            .close();

        this.valueSerdeFactory
            .create(
                ksqlTopic.getValueFormat().getFormatInfo(),
                physicalSchema.valueSchema(),
                ksqlConfig,
                this.serviceContext.getSchemaRegistryClientFactory(),
                "",
                NoopProcessingLogContext.INSTANCE
            )
            .close();
    }
}
