package io.confluent.ksql.serde.connect;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.SchemaTranslator;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeUtils;
import io.confluent.ksql.util.KsqlConfig;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

/**
 * Base class for {@link Format} implementations whose serdes operate on Kafka Connect data
 * ({@link ConnectSchema} / {@link Struct}).
 *
 * <p>{@link #getSerde} converts the columns of a {@link PersistenceSchema} into a Connect schema,
 * asks the subclass for a Connect-level serde via {@link #getConnectSerde}, and adapts that serde
 * to the {@code Serde<List<?>>} shape required by {@link Format}.
 */
public abstract class ConnectFormat implements Format {

    /**
     * Adapts a {@code Serializer<Struct>} to a {@code Serializer<List<?>>}.
     *
     * <p>The values of the incoming list are copied, by position, into a new {@link Struct} built
     * from {@code structSchema}; that struct is then handed to the wrapped serializer.
     *
     * <p>Note: the decompiler reported that this class was originally {@code private}; the
     * visibility is deliberately left as found.
     */
    public static class ListToStructSerializer implements Serializer<List<?>> {
        private final Serializer<Struct> inner;
        private final ConnectSchema structSchema;

        /**
         * @param serializer    the wrapped struct serializer; must not be null
         * @param connectSchema the schema used to build the struct for every row; must not be null
         * @throws NullPointerException if either argument is null
         */
        ListToStructSerializer(Serializer<Struct> serializer, ConnectSchema connectSchema) {
            this.inner = Objects.requireNonNull(serializer, "inner");
            this.structSchema = Objects.requireNonNull(connectSchema, "structSchema");
        }

        /** Forwards the configuration to the wrapped serializer. */
        @Override
        public void configure(Map<String, ?> map, boolean z) {
            this.inner.configure(map, z);
        }

        /**
         * Serializes a row given as a list of column values.
         *
         * @param str  passed to the column-count check and to the wrapped serializer
         * @param list the column values, in schema field order; may be null
         * @return the bytes produced by the wrapped serializer, or {@code null} if {@code list} is null
         */
        @Override
        public byte[] serialize(String str, List<?> list) {
            if (list == null) {
                return null;
            }
            List<Field> fields = this.structSchema.fields();
            // Validated up front, so the field and value iterations below are expected to stay in lockstep.
            SerdeUtils.throwOnColumnCountMismatch(fields.size(), list.size(), true, str);
            Struct struct = new Struct(this.structSchema);
            Iterator<?> values = list.iterator();
            for (Field field : fields) {
                putField(struct, field, values.next());
            }
            return this.inner.serialize(str, struct);
        }

        /** Closes the wrapped serializer. */
        @Override
        public void close() {
            this.inner.close();
        }

        /**
         * Stores {@code obj} in {@code struct} under {@code field}.
         *
         * <p>A {@link DataException} raised for a {@link Struct} value is rethrown as a
         * {@link SerializationException} with a message that points at non-optional schemas as a
         * likely cause; for any other value the original exception propagates unchanged.
         */
        private static void putField(Struct struct, Field field, Object obj) {
            try {
                struct.put(field, obj);
            } catch (DataException e) {
                if (obj instanceof Struct) {
                    throw new SerializationException("Failed to prepare Struct value field '" + field.name() + "' for serialization. This could happen if the value was produced by a user-defined function where the schema has non-optional return types. ksqlDB requires all schemas to be optional at all levels of the Struct: the Struct itself, schemas for all fields within the Struct, and so on.", e);
                }
                throw e;
            }
        }
    }

    /**
     * Adapts a {@code Deserializer<Struct>} to a {@code Deserializer<List<?>>}.
     *
     * <p>The deserialized {@link Struct} is flattened into a list holding the value of each field,
     * in the field order of the struct's own schema.
     *
     * <p>Note: the decompiler reported that this class was originally {@code private}; the
     * visibility is deliberately left as found.
     */
    public static class StructToListDeserializer implements Deserializer<List<?>> {
        private final Deserializer<Struct> inner;
        private final int numColumns;

        /**
         * @param deserializer the wrapped struct deserializer; must not be null
         * @param i            the number of columns every deserialized struct is expected to have
         * @throws NullPointerException if {@code deserializer} is null
         */
        StructToListDeserializer(Deserializer<Struct> deserializer, int i) {
            this.inner = Objects.requireNonNull(deserializer, "deserializer");
            this.numColumns = i;
        }

        /** Forwards the configuration to the wrapped deserializer. */
        @Override
        public void configure(Map<String, ?> map, boolean z) {
            this.inner.configure(map, z);
        }

        /**
         * Deserializes a row into a list of column values.
         *
         * <p>This method must be named {@code deserialize} so that it actually implements
         * {@link Deserializer#deserialize(String, byte[])}.
         *
         * @param str  passed to the wrapped deserializer and to the column-count check
         * @param bArr the serialized row; may be null
         * @return the field values of the deserialized struct, or {@code null} if {@code bArr} is null
         */
        @Override
        public List<?> deserialize(String str, byte[] bArr) {
            if (bArr == null) {
                return null;
            }
            Struct struct = this.inner.deserialize(str, bArr);
            List<Field> fields = struct.schema().fields();
            SerdeUtils.throwOnColumnCountMismatch(this.numColumns, fields.size(), false, str);
            List<Object> values = new ArrayList<>(this.numColumns);
            for (Field field : fields) {
                values.add(struct.get(field));
            }
            return values;
        }

        /** Closes the wrapped deserializer. */
        @Override
        public void close() {
            this.inner.close();
        }
    }

    /**
     * Creates a {@link ConnectFormatSchemaTranslator} bound to this format, the supplied
     * properties and {@code ConnectSchemaUtil::toKsqlSchema}.
     */
    @Override // io.confluent.ksql.serde.Format
    public SchemaTranslator getSchemaTranslator(Map<String, String> map) {
        return new ConnectFormatSchemaTranslator(this, map, ConnectSchemaUtil::toKsqlSchema);
    }

    /**
     * Builds the row serde for the given persistence schema.
     *
     * <p>Steps: reject unsupported serde features, convert the columns to a Connect schema (with
     * single-value unwrapping applied according to the schema's features), resolve the Java type
     * of that schema, and then build either an unwrapped or a struct-wrapped serde depending on
     * whether {@link SerdeFeature#UNWRAP_SINGLES} is enabled.
     *
     * @param persistenceSchema supplies the columns and the enabled serde features
     * @param map               passed through unchanged to {@link #getConnectSerde}
     * @param ksqlConfig        passed through unchanged to {@link #getConnectSerde}
     * @param supplier          passed through unchanged to {@link #getConnectSerde}
     * @param z                 passed through unchanged to {@link #getConnectSerde}
     *                          (NOTE(review): presumably a key-vs-value flag; confirm against callers)
     * @return a serde that reads and writes rows as lists of column values
     */
    @Override // io.confluent.ksql.serde.Format
    public Serde<List<?>> getSerde(PersistenceSchema persistenceSchema, Map<String, String> map, KsqlConfig ksqlConfig, Supplier<SchemaRegistryClient> supplier, boolean z) {
        SerdeUtils.throwOnUnsupportedFeatures(persistenceSchema.features(), supportedFeatures());
        ConnectSchema serdeSchema = SerdeUtils.applySinglesUnwrapping(
                ConnectSchemas.columnsToConnectSchema(persistenceSchema.columns()),
                persistenceSchema.features());
        Class<?> javaType = SchemaConverters.connectToJavaTypeConverter().toJavaType(serdeSchema);
        if (persistenceSchema.features().enabled(SerdeFeature.UNWRAP_SINGLES)) {
            return handleUnwrapped(serdeSchema, map, ksqlConfig, supplier, javaType, z);
        }
        return handleWrapped(serdeSchema, map, ksqlConfig, supplier, javaType, z);
    }

    /**
     * Builds a serde for the unwrapped case: the subclass-provided serde for {@code cls} is adapted
     * to a list-based serde through {@link SerdeUtils#unwrappedSerializer} and
     * {@link SerdeUtils#unwrappedDeserializer}.
     */
    private <T> Serde<List<?>> handleUnwrapped(ConnectSchema connectSchema, Map<String, String> map, KsqlConfig ksqlConfig, Supplier<SchemaRegistryClient> supplier, Class<T> cls, boolean z) {
        Serde<T> connectSerde = getConnectSerde(connectSchema, map, ksqlConfig, supplier, cls, z);
        Serializer<T> valueSerializer = connectSerde.serializer();
        Deserializer<T> valueDeserializer = connectSerde.deserializer();
        return Serdes.serdeFrom(
                SerdeUtils.unwrappedSerializer(valueSerializer, cls),
                SerdeUtils.unwrappedDeserializer(valueDeserializer));
    }

    /**
     * Builds a serde for the wrapped case: rows travel as a {@link Struct}, converted to and from
     * lists by {@link ListToStructSerializer} and {@link StructToListDeserializer}.
     *
     * @throws IllegalArgumentException if {@code cls} is not {@code Struct.class}
     */
    private Serde<List<?>> handleWrapped(ConnectSchema connectSchema, Map<String, String> map, KsqlConfig ksqlConfig, Supplier<SchemaRegistryClient> supplier, Class<?> cls, boolean z) {
        if (!cls.equals(Struct.class)) {
            throw new IllegalArgumentException("Expected STRUCT, got " + cls);
        }
        Serde<Struct> connectSerde = getConnectSerde(connectSchema, map, ksqlConfig, supplier, Struct.class, z);
        int columnCount = connectSchema.fields().size();
        return Serdes.serdeFrom(
                new ListToStructSerializer(connectSerde.serializer(), connectSchema),
                new StructToListDeserializer(connectSerde.deserializer(), columnCount));
    }

    /**
     * Supplies the format-specific {@link ConnectSchemaTranslator} for the given properties.
     *
     * <p>Not called within this class. NOTE(review): the decompiler reported that the original
     * modifier was {@code protected}; the visibility is deliberately left as found.
     */
    public abstract ConnectSchemaTranslator getConnectSchemaTranslator(Map<String, String> map);

    /**
     * Supplies the format-specific serde for values of type {@code cls} described by
     * {@code connectSchema}.
     *
     * <p>Called by this class with {@code Struct.class} for wrapped rows and with the Java type
     * resolved from the schema for unwrapped single values; all other arguments are the ones
     * given to {@link #getSerde}.
     */
    protected abstract <T> Serde<T> getConnectSerde(ConnectSchema connectSchema, Map<String, String> map, KsqlConfig ksqlConfig, Supplier<SchemaRegistryClient> supplier, Class<T> cls, boolean z);

    /** Always returns {@code true}: this base class places no restriction on the key type. */
    @Override // io.confluent.ksql.serde.Format
    public boolean supportsKeyType(SqlType sqlType) {
        return true;
    }
}
