/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.serde;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.SimpleColumn;
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlPrimitiveType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.GenericSerdeFactory;
import io.confluent.ksql.serde.KeySerdeFactory;
import io.confluent.ksql.serde.SerdeUtils;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.serde.connect.ConnectSchemas;
import io.confluent.ksql.serde.tracked.TrackedCallback;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

public final class GenericKeySerDe
implements KeySerdeFactory {
    private final GenericSerdeFactory innerFactory;

    public GenericKeySerDe() {
        this(new GenericSerdeFactory());
    }

    @VisibleForTesting
    GenericKeySerDe(GenericSerdeFactory innerFactory) {
        this.innerFactory = Objects.requireNonNull(innerFactory, "innerFactory");
    }

    @Override
    public Serde<Struct> create(FormatInfo format, PersistenceSchema schema, KsqlConfig ksqlConfig, Supplier<SchemaRegistryClient> schemaRegistryClientFactory, String loggerNamePrefix, ProcessingLogContext processingLogContext, Optional<TrackedCallback> tracker) {
        return this.createInner(format, schema, ksqlConfig, schemaRegistryClientFactory, loggerNamePrefix, processingLogContext, tracker);
    }

    @Override
    public Serde<Windowed<Struct>> create(FormatInfo format, WindowInfo window, PersistenceSchema schema, KsqlConfig ksqlConfig, Supplier<SchemaRegistryClient> schemaRegistryClientFactory, String loggerNamePrefix, ProcessingLogContext processingLogContext, Optional<TrackedCallback> tracker) {
        Serde<Struct> inner = this.createInner(format, schema, ksqlConfig, schemaRegistryClientFactory, loggerNamePrefix, processingLogContext, tracker);
        return window.getType().requiresWindowSize() ? new WindowedSerdes.TimeWindowedSerde(inner, ((Duration)window.getSize().get()).toMillis()) : new WindowedSerdes.SessionWindowedSerde(inner);
    }

    private Serde<Struct> createInner(FormatInfo format, PersistenceSchema schema, KsqlConfig ksqlConfig, Supplier<SchemaRegistryClient> schemaRegistryClientFactory, String loggerNamePrefix, ProcessingLogContext processingLogContext, Optional<TrackedCallback> tracker) {
        if (GenericKeySerDe.unsupportedSchema(schema.columns(), ksqlConfig)) {
            throw new KsqlException("Unsupported key schema: " + schema.columns());
        }
        Serde<List<?>> formatSerde = this.innerFactory.createFormatSerde("Key", format, schema, ksqlConfig, schemaRegistryClientFactory, true);
        Serde<Struct> structSerde = GenericKeySerDe.toStructSerde(formatSerde, schema);
        Serde<Struct> loggingSerde = this.innerFactory.wrapInLoggingSerde(structSerde, loggerNamePrefix, processingLogContext);
        Serde<Struct> serde = tracker.map(callback -> this.innerFactory.wrapInTrackingSerde(loggingSerde, (TrackedCallback)callback)).orElse(loggingSerde);
        serde.configure(Collections.emptyMap(), true);
        return serde;
    }

    private static boolean unsupportedSchema(List<SimpleColumn> columns, KsqlConfig config) {
        if (config.getBoolean("ksql.key.format.enabled").booleanValue()) {
            return columns.size() > 1;
        }
        if (columns.isEmpty()) {
            return false;
        }
        if (columns.size() > 1) {
            return true;
        }
        SqlType sqlType = columns.get(0).type();
        return !(sqlType instanceof SqlPrimitiveType) && !(sqlType instanceof SqlDecimal);
    }

    private static Serde<Struct> toStructSerde(Serde<List<?>> inner, PersistenceSchema schema) {
        ConnectSchema connectSchema = ConnectSchemas.columnsToConnectSchema(schema.columns());
        return Serdes.serdeFrom((Serializer)new GenericKeySerializer(inner.serializer(), connectSchema.fields().size()), (Deserializer)new GenericKeyDeserializer(inner.deserializer(), connectSchema));
    }

    @VisibleForTesting
    static class GenericKeyDeserializer
    implements Deserializer<Struct> {
        private final Deserializer<List<?>> inner;
        private final ConnectSchema connectSchema;

        GenericKeyDeserializer(Deserializer<List<?>> inner, ConnectSchema connectSchema) {
            this.inner = Objects.requireNonNull(inner, "inner");
            this.connectSchema = Objects.requireNonNull(connectSchema, "connectSchema");
        }

        public void configure(Map<String, ?> configs, boolean isKey) {
            this.inner.configure(configs, isKey);
        }

        public void close() {
            this.inner.close();
        }

        public Struct deserialize(String topic, byte[] data) {
            List values = (List)this.inner.deserialize(topic, data);
            if (values == null) {
                return null;
            }
            List fields = this.connectSchema.fields();
            SerdeUtils.throwOnColumnCountMismatch(fields.size(), values.size(), false, topic);
            Struct row = new Struct((Schema)this.connectSchema);
            Iterator fIt = fields.iterator();
            Iterator vIt = values.iterator();
            while (fIt.hasNext()) {
                row.put((Field)fIt.next(), vIt.next());
            }
            return row;
        }
    }

    @VisibleForTesting
    static class GenericKeySerializer
    implements Serializer<Struct> {
        private final Serializer<List<?>> inner;
        private final int numColumns;

        GenericKeySerializer(Serializer<List<?>> inner, int numColumns) {
            this.inner = Objects.requireNonNull(inner, "inner");
            this.numColumns = numColumns;
        }

        public void configure(Map<String, ?> configs, boolean isKey) {
            this.inner.configure(configs, isKey);
        }

        public byte[] serialize(String topic, Struct data) {
            if (data == null) {
                return this.inner.serialize(topic, null);
            }
            List fields = data.schema().fields();
            SerdeUtils.throwOnColumnCountMismatch(this.numColumns, fields.size(), true, topic);
            ArrayList<Object> values = new ArrayList<Object>(this.numColumns);
            for (Field field : fields) {
                values.add(data.get(field));
            }
            return this.inner.serialize(topic, values);
        }

        public void close() {
            this.inner.close();
        }
    }
}

