package io.confluent.ksql.ddl.commands;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand;
import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.properties.with.SourcePropertiesUtil;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KeySerdeFactory;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.SerdeFeaturesFactory;
import io.confluent.ksql.serde.ValueSerdeFactory;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;
import java.util.Optional;

/* loaded from: input_file:io/confluent/ksql/ddl/commands/CreateSourceFactory.class */
public final class CreateSourceFactory {
    private final ServiceContext serviceContext;
    private final SerdeFeaturessSupplier keySerdeFeaturesSupplier;
    private final SerdeFeaturessSupplier valueSerdeFeaturesSupplier;
    private final KeySerdeFactory keySerdeFactory;
    private final ValueSerdeFactory valueSerdeFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:io/confluent/ksql/ddl/commands/CreateSourceFactory$SerdeFeaturessSupplier.class */
    public interface SerdeFeaturessSupplier {
        SerdeFeatures build(LogicalSchema logicalSchema, Format format, SerdeFeatures serdeFeatures, KsqlConfig ksqlConfig);
    }

    public CreateSourceFactory(ServiceContext serviceContext) {
        this(serviceContext, (logicalSchema, format, serdeFeatures, ksqlConfig) -> {
            return SerdeFeaturesFactory.buildKeyFeatures(logicalSchema, format);
        }, SerdeFeaturesFactory::buildValueFeatures, new GenericKeySerDe(), new GenericRowSerDe());
    }

    @VisibleForTesting
    CreateSourceFactory(ServiceContext serviceContext, SerdeFeaturessSupplier serdeFeaturessSupplier, SerdeFeaturessSupplier serdeFeaturessSupplier2, KeySerdeFactory keySerdeFactory, ValueSerdeFactory valueSerdeFactory) {
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
        this.keySerdeFeaturesSupplier = (SerdeFeaturessSupplier) Objects.requireNonNull(serdeFeaturessSupplier, "keySerdeFeaturesSupplier");
        this.valueSerdeFeaturesSupplier = (SerdeFeaturessSupplier) Objects.requireNonNull(serdeFeaturessSupplier2, "valueSerdeFeaturesSupplier");
        this.keySerdeFactory = (KeySerdeFactory) Objects.requireNonNull(keySerdeFactory, "keySerdeFactory");
        this.valueSerdeFactory = (ValueSerdeFactory) Objects.requireNonNull(valueSerdeFactory, "valueSerdeFactory");
    }

    public CreateStreamCommand createStreamCommand(CreateStream createStream, KsqlConfig ksqlConfig) {
        SourceName name = createStream.getName();
        CreateSourceProperties properties = createStream.getProperties();
        String ensureTopicExists = ensureTopicExists(properties, this.serviceContext);
        LogicalSchema buildSchema = buildSchema(createStream.getElements());
        return new CreateStreamCommand(name, buildSchema, buildTimestampColumn(ksqlConfig, properties, buildSchema), ensureTopicExists, buildFormats(buildSchema, properties, ksqlConfig), getWindowInfo(properties), Optional.of(Boolean.valueOf(createStream.isOrReplace())));
    }

    public CreateTableCommand createTableCommand(CreateTable createTable, KsqlConfig ksqlConfig) {
        SourceName name = createTable.getName();
        CreateSourceProperties properties = createTable.getProperties();
        String ensureTopicExists = ensureTopicExists(properties, this.serviceContext);
        LogicalSchema buildSchema = buildSchema(createTable.getElements());
        if (!buildSchema.key().isEmpty()) {
            return new CreateTableCommand(name, buildSchema, buildTimestampColumn(ksqlConfig, properties, buildSchema), ensureTopicExists, buildFormats(buildSchema, properties, ksqlConfig), getWindowInfo(properties), Optional.of(Boolean.valueOf(createTable.isOrReplace())));
        }
        throw new KsqlException("Tables require a PRIMARY KEY. Please define the PRIMARY KEY." + (properties.getValueSchemaId().isPresent() ? System.lineSeparator() + "Use a partial schema to define the primary key and still load the value columns from the Schema Registry, for example:" + System.lineSeparator() + "\tCREATE TABLE " + name.text() + " (ID INT PRIMARY KEY) WITH (...);" : ""));
    }

    private Formats buildFormats(LogicalSchema logicalSchema, CreateSourceProperties createSourceProperties, KsqlConfig ksqlConfig) {
        FormatInfo keyFormat = SourcePropertiesUtil.getKeyFormat(createSourceProperties);
        FormatInfo valueFormat = SourcePropertiesUtil.getValueFormat(createSourceProperties);
        Formats of = Formats.of(keyFormat, valueFormat, this.keySerdeFeaturesSupplier.build(logicalSchema, FormatFactory.of(keyFormat), SerdeFeatures.of(new SerdeFeature[0]), ksqlConfig), this.valueSerdeFeaturesSupplier.build(logicalSchema, FormatFactory.of(valueFormat), createSourceProperties.getValueSerdeFeatures(), ksqlConfig));
        validateSerdesCanHandleSchemas(ksqlConfig, logicalSchema, of);
        return of;
    }

    private static LogicalSchema buildSchema(TableElements tableElements) {
        if (Iterables.isEmpty(tableElements)) {
            throw new KsqlException("The statement does not define any columns.");
        }
        tableElements.forEach(tableElement -> {
            if (SystemColumns.isSystemColumn(tableElement.getName())) {
                throw new KsqlException("'" + tableElement.getName().text() + "' is a reserved column name.");
            }
        });
        return tableElements.toLogicalSchema();
    }

    private static Optional<WindowInfo> getWindowInfo(CreateSourceProperties createSourceProperties) {
        return createSourceProperties.getWindowType().map(windowType -> {
            return WindowInfo.of(windowType, createSourceProperties.getWindowSize());
        });
    }

    private static String ensureTopicExists(CreateSourceProperties createSourceProperties, ServiceContext serviceContext) {
        String kafkaTopic = createSourceProperties.getKafkaTopic();
        if (serviceContext.getTopicClient().isTopicExists(kafkaTopic)) {
            return kafkaTopic;
        }
        throw new KsqlException("Kafka topic does not exist: " + kafkaTopic);
    }

    private static Optional<TimestampColumn> buildTimestampColumn(KsqlConfig ksqlConfig, CreateSourceProperties createSourceProperties, LogicalSchema logicalSchema) {
        Optional<TimestampColumn> map = createSourceProperties.getTimestampColumnName().map(columnName -> {
            return new TimestampColumn(columnName, createSourceProperties.getTimestampFormat());
        });
        TimestampExtractionPolicyFactory.validateTimestampColumn(ksqlConfig, logicalSchema, map);
        return map;
    }

    private void validateSerdesCanHandleSchemas(KsqlConfig ksqlConfig, LogicalSchema logicalSchema, Formats formats) {
        PhysicalSchema from = PhysicalSchema.from(logicalSchema, formats.getKeyFeatures(), formats.getValueFeatures());
        this.keySerdeFactory.create(formats.getKeyFormat(), from.keySchema(), ksqlConfig, this.serviceContext.getSchemaRegistryClientFactory(), "", NoopProcessingLogContext.INSTANCE, Optional.empty()).close();
        this.valueSerdeFactory.create(formats.getValueFormat(), from.valueSchema(), ksqlConfig, this.serviceContext.getSchemaRegistryClientFactory(), "", NoopProcessingLogContext.INSTANCE, Optional.empty()).close();
    }
}
