package io.confluent.ksql.serde;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.SimpleColumn;
import io.confluent.ksql.schema.ksql.types.SqlArray;
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlMap;
import io.confluent.ksql.schema.ksql.types.SqlPrimitiveType;
import io.confluent.ksql.schema.ksql.types.SqlStruct;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.utils.FormatOptions;
import io.confluent.ksql.serde.tracked.TrackedCallback;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

/* loaded from: input_file:io/confluent/ksql/serde/GenericKeySerDe.class */
/**
 * {@link KeySerdeFactory} that builds {@link Serde}s for {@link GenericKey} and for
 * {@link Windowed} {@link GenericKey}s.
 *
 * <p>The format-specific serde is obtained from a {@link GenericSerdeFactory}, adapted to
 * {@link GenericKey}, wrapped by the factory's logging wrapper and, when a
 * {@link TrackedCallback} is supplied, by its tracking wrapper.
 *
 * <p>Key schemas containing a map type anywhere (directly, or nested inside an array or
 * struct) are rejected with a {@link KsqlException}.
 */
public final class GenericKeySerDe implements KeySerdeFactory {
    private final GenericSerdeFactory innerFactory;

    /** Creates an instance backed by a default {@link GenericSerdeFactory}. */
    public GenericKeySerDe() {
        this(new GenericSerdeFactory());
    }

    /**
     * Creates an instance backed by the supplied factory.
     *
     * @param genericSerdeFactory the factory used to create and wrap the underlying serdes
     * @throws NullPointerException if {@code genericSerdeFactory} is {@code null}
     */
    @VisibleForTesting
    GenericKeySerDe(final GenericSerdeFactory genericSerdeFactory) {
        this.innerFactory = Objects.requireNonNull(genericSerdeFactory, "innerFactory");
    }

    /**
     * Creates a serde for non-windowed keys.
     *
     * @param formatInfo the key format
     * @param persistenceSchema the schema of the key columns
     * @param ksqlConfig the configuration handed to the underlying serde factory
     * @param supplier supplies the schema registry client
     * @param str passed to the logging wrapper (presumably the logger name prefix;
     *     confirm against {@code GenericSerdeFactory.wrapInLoggingSerde})
     * @param processingLogContext passed to the logging wrapper
     * @param optional optional callback; when present the serde is additionally wrapped by the
     *     factory's tracking wrapper
     * @return the configured key serde
     * @throws KsqlException if any key column's type is, or contains, a map
     */
    @Override // io.confluent.ksql.serde.KeySerdeFactory
    public Serde<GenericKey> create(FormatInfo formatInfo, PersistenceSchema persistenceSchema, KsqlConfig ksqlConfig, Supplier<SchemaRegistryClient> supplier, String str, ProcessingLogContext processingLogContext, Optional<TrackedCallback> optional) {
        return createInner(formatInfo, persistenceSchema, ksqlConfig, supplier, str, processingLogContext, optional);
    }

    /**
     * Creates a serde for windowed keys.
     *
     * <p>A time-windowed serde is returned when the window type requires a size, otherwise a
     * session-windowed serde is returned. Both wrap the same inner serde that the non-windowed
     * overload would produce.
     *
     * @param formatInfo the key format
     * @param windowInfo the window type and, where applicable, its size
     * @param persistenceSchema the schema of the key columns
     * @param ksqlConfig the configuration handed to the underlying serde factory
     * @param supplier supplies the schema registry client
     * @param str passed to the logging wrapper (presumably the logger name prefix;
     *     confirm against {@code GenericSerdeFactory.wrapInLoggingSerde})
     * @param processingLogContext passed to the logging wrapper
     * @param optional optional callback; when present the serde is additionally wrapped by the
     *     factory's tracking wrapper
     * @return the configured windowed key serde
     * @throws KsqlException if any key column's type is, or contains, a map
     * @throws java.util.NoSuchElementException if the window type requires a size but
     *     {@code windowInfo} carries none
     */
    @Override // io.confluent.ksql.serde.KeySerdeFactory
    public Serde<Windowed<GenericKey>> create(FormatInfo formatInfo, WindowInfo windowInfo, PersistenceSchema persistenceSchema, KsqlConfig ksqlConfig, Supplier<SchemaRegistryClient> supplier, String str, ProcessingLogContext processingLogContext, Optional<TrackedCallback> optional) {
        final Serde<GenericKey> inner = createInner(formatInfo, persistenceSchema, ksqlConfig, supplier, str, processingLogContext, optional);

        if (windowInfo.getType().requiresWindowSize()) {
            // Deliberately an unchecked get(): a missing size for a sized window type surfaces
            // as NoSuchElementException, exactly as before.
            final Duration windowSize = windowInfo.getSize().get();
            return new WindowedSerdes.TimeWindowedSerde<>(inner, windowSize.toMillis());
        }
        return new WindowedSerdes.SessionWindowedSerde<>(inner);
    }

    /**
     * Validates the key schema, then builds, wraps and configures the key serde shared by both
     * {@code create} overloads.
     */
    private Serde<GenericKey> createInner(
            final FormatInfo formatInfo,
            final PersistenceSchema schema,
            final KsqlConfig ksqlConfig,
            final Supplier<SchemaRegistryClient> schemaRegistryClientSupplier,
            final String loggerArg,
            final ProcessingLogContext processingLogContext,
            final Optional<TrackedCallback> tracker) {
        checkUnsupportedSchema(schema.columns());

        // NOTE(review): the trailing 'true' presumably flags this as a key serde; confirm
        // against GenericSerdeFactory.createFormatSerde.
        final Serde<List<?>> formatSerde = this.innerFactory.createFormatSerde(
                "Key", formatInfo, schema, ksqlConfig, schemaRegistryClientSupplier, true);

        final Serde<GenericKey> loggingSerde = this.innerFactory.wrapInLoggingSerde(
                toGenericKeySerde(formatSerde, schema), loggerArg, processingLogContext);

        final Serde<GenericKey> serde = tracker
                .map(callback -> this.innerFactory.wrapInTrackingSerde(loggingSerde, callback))
                .orElse(loggingSerde);

        // Second argument of Serde#configure is 'isKey'.
        serde.configure(Collections.emptyMap(), true);
        return serde;
    }

    /**
     * Rejects key schemas in which any column's type is, or contains, a map.
     *
     * @throws KsqlException naming the first offending column and its type
     */
    private static void checkUnsupportedSchema(final List<SimpleColumn> columns) {
        for (final SimpleColumn column : columns) {
            if (containsMapType(column.type())) {
                throw new KsqlException("Map keys, including types that contain maps, are not supported as they may lead to unexpected behavior due to inconsistent serialization. Key column name: " + column.name() + ". Column type: " + column.type().toString(FormatOptions.none()) + ". See https://github.com/confluentinc/ksql/issues/6621 for more.");
            }
        }
    }

    /**
     * Returns whether {@code sqlType} is a map, or is an array or struct that (recursively)
     * contains one.
     *
     * @throws IllegalStateException if {@code sqlType} is none of the handled kinds (map,
     *     primitive, decimal, array, struct)
     */
    private static boolean containsMapType(final SqlType sqlType) {
        if (sqlType instanceof SqlMap) {
            return true;
        }
        if (sqlType instanceof SqlPrimitiveType || sqlType instanceof SqlDecimal) {
            return false;
        }
        if (sqlType instanceof SqlArray) {
            return containsMapType(((SqlArray) sqlType).getItemType());
        }
        if (sqlType instanceof SqlStruct) {
            return ((SqlStruct) sqlType).fields().stream()
                    .map(field -> field.type())
                    .anyMatch(GenericKeySerDe::containsMapType);
        }
        throw new IllegalStateException("Unexpected type: " + sqlType);
    }

    /**
     * Adapts a serde of column-value lists into a serde of {@link GenericKey}, using
     * {@code GenericKey.values()} to serialize and {@code GenericKey.fromList} to deserialize.
     * The schema's column count is passed to both the serializer and the deserializer.
     */
    private static Serde<GenericKey> toGenericKeySerde(final Serde<List<?>> serde, final PersistenceSchema persistenceSchema) {
        final int columnCount = persistenceSchema.columns().size();
        return Serdes.serdeFrom(
                new GenericSerializer<GenericKey>(GenericKey::values, serde.serializer(), columnCount),
                new GenericDeserializer<GenericKey>(GenericKey::fromList, serde.deserializer(), columnCount));
    }
}
