/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.serde.connect;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.SchemaTranslator;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeUtils;
import io.confluent.ksql.serde.connect.ConnectFormatSchemaTranslator;
import io.confluent.ksql.serde.connect.ConnectSchemaTranslator;
import io.confluent.ksql.serde.connect.ConnectSchemaUtil;
import io.confluent.ksql.serde.connect.ConnectSchemas;
import io.confluent.ksql.util.KsqlConfig;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

/**
 * Base class for {@link Format} implementations whose serdes operate on Kafka Connect data.
 *
 * <p>{@link #getSerde} converts the persistence schema's columns into a Connect schema and then
 * builds a {@code Serde<List<?>>} on top of the format-specific serde supplied by
 * {@link #getConnectSerde}. Depending on whether {@link SerdeFeature#UNWRAP_SINGLES} is enabled,
 * the list of column values is either mapped to/from a single unwrapped value or to/from a
 * Connect {@link Struct}.
 */
public abstract class ConnectFormat
implements Format {

    /**
     * Creates the schema translator for this format.
     *
     * @param formatProperties the format properties handed to the translator
     * @return a {@link ConnectFormatSchemaTranslator} bound to this format
     */
    @Override
    public SchemaTranslator getSchemaTranslator(Map<String, String> formatProperties) {
        return new ConnectFormatSchemaTranslator(this, formatProperties, ConnectSchemaUtil::toKsqlSchema);
    }

    /**
     * Builds a serde that reads and writes a row as a list of column values.
     *
     * @param schema      the persistence schema: columns plus enabled serde features
     * @param formatProps the format properties, forwarded to {@link #getConnectSerde}
     * @param config      the KSQL configuration, forwarded to {@link #getConnectSerde}
     * @param srFactory   supplier of the schema registry client, forwarded to
     *                    {@link #getConnectSerde}
     * @param isKey       whether the serde is for the key, forwarded to {@link #getConnectSerde}
     * @return the list-based serde
     * @throws IllegalArgumentException if singles are not unwrapped and the schema's Java type is
     *                                  not {@link Struct}
     */
    @Override
    public Serde<List<?>> getSerde(PersistenceSchema schema, Map<String, String> formatProps, KsqlConfig config, Supplier<SchemaRegistryClient> srFactory, boolean isKey) {
        SerdeUtils.throwOnUnsupportedFeatures(schema.features(), this.supportedFeatures());

        final ConnectSchema outerSchema = ConnectSchemas.columnsToConnectSchema(schema.columns());
        // The explicit upcasts to Schema are kept so the same overloads are selected as before.
        final ConnectSchema innerSchema = SerdeUtils.applySinglesUnwrapping((Schema) outerSchema, schema.features());
        final Class<?> targetType = SchemaConverters.connectToJavaTypeConverter().toJavaType((Schema) innerSchema);

        if (schema.features().enabled(SerdeFeature.UNWRAP_SINGLES)) {
            return this.handleUnwrapped(innerSchema, formatProps, config, srFactory, targetType, isKey);
        }
        return this.handleWrapped(innerSchema, formatProps, config, srFactory, targetType, isKey);
    }

    /**
     * Builds the serde used when single values are unwrapped: the format-specific serde for
     * {@code targetType} is adapted to the list-based interface via {@link SerdeUtils}.
     */
    private <T> Serde<List<?>> handleUnwrapped(ConnectSchema innerSchema, Map<String, String> formatProps, KsqlConfig config, Supplier<SchemaRegistryClient> srFactory, Class<T> targetType, boolean isKey) {
        final Serde<T> innerSerde = this.getConnectSerde(innerSchema, formatProps, config, srFactory, targetType, isKey);
        return Serdes.serdeFrom(
            SerdeUtils.unwrappedSerializer(innerSerde.serializer(), targetType),
            SerdeUtils.unwrappedDeserializer(innerSerde.deserializer()));
    }

    /**
     * Builds the serde used when values are wrapped in a {@link Struct}: the format-specific
     * {@code Struct} serde is adapted so that each struct field corresponds to one list element.
     *
     * @throws IllegalArgumentException if {@code targetType} is not {@link Struct}
     */
    private Serde<List<?>> handleWrapped(ConnectSchema innerSchema, Map<String, String> formatProps, KsqlConfig config, Supplier<SchemaRegistryClient> srFactory, Class<?> targetType, boolean isKey) {
        if (!targetType.equals(Struct.class)) {
            throw new IllegalArgumentException("Expected STRUCT, got " + targetType);
        }
        final Serde<Struct> connectSerde = this.getConnectSerde(innerSchema, formatProps, config, srFactory, Struct.class, isKey);
        return Serdes.serdeFrom(
            new ListToStructSerializer(connectSerde.serializer(), innerSchema),
            new StructToListDeserializer(connectSerde.deserializer(), innerSchema.fields().size()));
    }

    /**
     * Supplies the format-specific translator between Connect schemas and the format's own
     * schema representation.
     *
     * @param formatProps string-to-string properties; presumably the format properties, as used
     *                    elsewhere in this class (confirm against implementations)
     * @return the translator
     */
    protected abstract ConnectSchemaTranslator getConnectSchemaTranslator(Map<String, String> formatProps);

    /**
     * Supplies the format-specific serde for values of {@code targetType} described by
     * {@code connectSchema}.
     *
     * @param connectSchema the Connect schema of the data to (de)serialize
     * @param formatProps   the format properties
     * @param config        the KSQL configuration
     * @param srFactory     supplier of the schema registry client
     * @param targetType    the Java type the serde handles
     * @param isKey         whether the serde is for the key
     * @param <T>           the Java type the serde handles
     * @return the format-specific serde
     */
    protected abstract <T> Serde<T> getConnectSerde(ConnectSchema connectSchema, Map<String, String> formatProps, KsqlConfig config, Supplier<SchemaRegistryClient> srFactory, Class<T> targetType, boolean isKey);

    /**
     * Deserializer that delegates to a {@link Struct} deserializer and flattens the resulting
     * struct into a list holding one value per field, in schema field order.
     */
    private static class StructToListDeserializer
    implements Deserializer<List<?>> {
        private final Deserializer<Struct> inner;
        private final int numColumns;

        StructToListDeserializer(Deserializer<Struct> deserializer, int numColumns) {
            this.inner = Objects.requireNonNull(deserializer, "deserializer");
            this.numColumns = numColumns;
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            this.inner.configure(configs, isKey);
        }

        /**
         * @return the field values of the deserialized struct, or {@code null} if {@code bytes}
         *         is {@code null} or the inner deserializer yields {@code null}
         */
        @Override
        public List<?> deserialize(String topic, byte[] bytes) {
            if (bytes == null) {
                return null;
            }
            final Struct struct = this.inner.deserialize(topic, bytes);
            if (struct == null) {
                // The inner deserializer may yield null for non-null bytes; treat it the same as
                // null input rather than failing with a NullPointerException below.
                return null;
            }
            final List<Field> fields = struct.schema().fields();
            SerdeUtils.throwOnColumnCountMismatch(this.numColumns, fields.size(), false, topic);
            final List<Object> values = new ArrayList<>(this.numColumns);
            for (final Field field : fields) {
                values.add(struct.get(field));
            }
            return values;
        }

        @Override
        public void close() {
            this.inner.close();
        }
    }

    /**
     * Serializer that packs a list of values into a {@link Struct} of the configured schema,
     * pairing fields and values by position, and delegates to a {@link Struct} serializer.
     */
    private static class ListToStructSerializer
    implements Serializer<List<?>> {
        private final Serializer<Struct> inner;
        private final ConnectSchema structSchema;

        ListToStructSerializer(Serializer<Struct> inner, ConnectSchema structSchema) {
            this.inner = Objects.requireNonNull(inner, "inner");
            this.structSchema = Objects.requireNonNull(structSchema, "structSchema");
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            this.inner.configure(configs, isKey);
        }

        /**
         * @return the serialized struct, or {@code null} if {@code values} is {@code null}
         */
        @Override
        public byte[] serialize(String topic, List<?> values) {
            if (values == null) {
                return null;
            }
            final List<Field> fields = this.structSchema.fields();
            SerdeUtils.throwOnColumnCountMismatch(fields.size(), values.size(), true, topic);

            final Struct struct = new Struct(this.structSchema);
            // Walk both sequences in lock-step; iterators avoid indexed access on the value list.
            final Iterator<Field> fieldIt = fields.iterator();
            final Iterator<?> valueIt = values.iterator();
            while (fieldIt.hasNext()) {
                ListToStructSerializer.putField(struct, fieldIt.next(), valueIt.next());
            }
            return this.inner.serialize(topic, struct);
        }

        @Override
        public void close() {
            this.inner.close();
        }

        /**
         * Sets {@code field} on {@code struct}. A {@link DataException} raised for a
         * {@link Struct} value is rethrown as a {@link SerializationException} with a more
         * descriptive message; for any other value the original exception propagates.
         */
        private static void putField(Struct struct, Field field, Object value) {
            try {
                struct.put(field, value);
            }
            catch (DataException e) {
                if (!(value instanceof Struct)) {
                    throw e;
                }
                throw new SerializationException(
                    "Failed to prepare Struct value field '" + field.name() + "' for serialization. "
                        + "This could happen if the value was produced by a user-defined function "
                        + "where the schema has non-optional return types. "
                        + "ksqlDB requires all schemas to be optional at all levels of the Struct: "
                        + "the Struct itself, schemas for all fields within the Struct, and so on.",
                    e);
            }
        }
    }
}

