/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.source.internal;

import io.streamthoughts.kafka.connect.filepulse.annotation.VisibleForTesting;
import io.streamthoughts.kafka.connect.filepulse.data.ArraySchema;
import io.streamthoughts.kafka.connect.filepulse.data.DataException;
import io.streamthoughts.kafka.connect.filepulse.data.MapSchema;
import io.streamthoughts.kafka.connect.filepulse.data.Schema;
import io.streamthoughts.kafka.connect.filepulse.data.SchemaMapper;
import io.streamthoughts.kafka.connect.filepulse.data.SchemaMapperWithValue;
import io.streamthoughts.kafka.connect.filepulse.data.SimpleSchema;
import io.streamthoughts.kafka.connect.filepulse.data.StructSchema;
import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedField;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.schema.SchemaContext;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Maps FilePulse schemas ({@link MapSchema}, {@link ArraySchema}, {@link StructSchema},
 * {@link SimpleSchema}) and typed values to Kafka Connect schemas and {@link SchemaAndValue}s.
 *
 * <p>Naming convention used in this class: the simple name {@code Schema} always denotes the
 * FilePulse schema type; every Kafka Connect schema type is written fully qualified.
 *
 * <p>NOTE(review): instances hold a {@link SchemaContext} and a mutable configuration flag, and
 * {@link #map(StructSchema, boolean)} may assign names to unnamed nested struct schemas of its
 * input. No synchronization is performed here.
 */
public class ConnectSchemaMapper
        implements SchemaMapper<org.apache.kafka.connect.data.Schema>,
        SchemaMapperWithValue<SchemaAndValue> {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectSchemaMapper.class);
    /** Default value attached to every optional schema built by this mapper. */
    private static final Object DEFAULT_NULL_VALUE = null;
    /** Separator characters on which a field name is split when deriving a schema name. */
    private static final Pattern REGEX = Pattern.compile("[_\\-.]");
    private final SchemaContext context = new SchemaContext();
    private boolean keepLeadingUnderscoreCharacters = false;

    /**
     * Derives a schema name from a field name: the name is split on {@code _}, {@code -} and
     * {@code .}, empty parts are dropped, the first character of each part is upper-cased and
     * the parts are concatenated.
     *
     * <p>When {@code keepLeadingUnderscoreCharacters} is enabled, the run of leading underscores
     * is preserved verbatim in front of the normalized remainder.
     *
     * @param name the field name to normalize; must not be {@code null}
     * @return the normalized schema name
     */
    @VisibleForTesting
    String normalizeSchemaName(String name) {
        int start = 0;
        if (this.keepLeadingUnderscoreCharacters) {
            while (start < name.length() && name.charAt(start) == '_') {
                start++;
            }
        }
        String prefix = name.substring(0, start);
        String toNormalize = name.substring(start);
        // Locale.ROOT keeps generated schema names identical on every JVM; a default locale such
        // as Turkish would otherwise upper-case 'i' to a dotted capital I.
        return prefix + Arrays.stream(REGEX.split(toNormalize))
                .filter(part -> !part.isEmpty())
                .map(part -> part.substring(0, 1).toUpperCase(Locale.ROOT) + part.substring(1))
                .collect(Collectors.joining());
    }

    /**
     * Configures whether leading underscores of a field name are kept when a schema name is
     * derived from it.
     *
     * @param keepLeadingUnderscoreCharacters {@code true} to preserve leading underscores
     */
    public void setKeepLeadingUnderscoreCharacters(boolean keepLeadingUnderscoreCharacters) {
        this.keepLeadingUnderscoreCharacters = keepLeadingUnderscoreCharacters;
    }

    /**
     * Maps a FilePulse map schema to a Connect MAP schema. The {@code optional} flag is
     * propagated to both the key and the value schema.
     *
     * @param schema   the map schema to convert
     * @param optional whether the resulting schema is optional (with a {@code null} default)
     * @return the Connect schema
     */
    @Override
    public org.apache.kafka.connect.data.Schema map(MapSchema schema, boolean optional) {
        org.apache.kafka.connect.data.Schema keySchema = schema.keySchema().map(this, optional);
        org.apache.kafka.connect.data.Schema valueSchema = schema.valueSchema().map(this, optional);
        SchemaBuilder builder = SchemaBuilder.map(keySchema, valueSchema);
        return (optional ? asNullableAndOptional(builder) : builder).build();
    }

    /**
     * Maps a FilePulse array schema to a Connect ARRAY schema. The {@code optional} flag is
     * propagated to the element schema.
     *
     * @param schema   the array schema to convert
     * @param optional whether the resulting schema is optional (with a {@code null} default)
     * @return the Connect schema
     */
    @Override
    public org.apache.kafka.connect.data.Schema map(ArraySchema schema, boolean optional) {
        org.apache.kafka.connect.data.Schema valueSchema = schema.valueSchema().map(this, optional);
        SchemaBuilder builder = SchemaBuilder.array(valueSchema);
        return (optional ? asNullableAndOptional(builder) : builder).build();
    }

    /**
     * Maps a FilePulse struct schema to a Connect STRUCT schema.
     *
     * <p>The Connect schema name is {@code namespace.name} when both are set, otherwise the bare
     * name. Fields whose schema type is {@code NULL} or that are not resolvable are skipped.
     * Every retained field is mapped as optional. Unnamed nested struct schemas are given a
     * name derived from the field name, which mutates the input schema.
     *
     * @param schema   the struct schema to convert
     * @param optional whether the resulting struct schema is optional
     * @return the Connect schema, as returned by the schema context wrapper
     */
    @Override
    public org.apache.kafka.connect.data.Schema map(StructSchema schema, boolean optional) {
        String schemaName = schema.name();
        if (schemaName != null && schema.namespace() != null) {
            schemaName = schema.namespace() + "." + schemaName;
        }
        SchemaBuilder sb = SchemaBuilder.struct().name(schemaName).doc(schema.doc());
        if (optional) {
            // Unlike the other mappings, no default value is attached to optional structs.
            sb.optional();
        }
        for (TypedField field : schema) {
            String fieldName = field.name();
            Schema fieldSchema = field.schema();
            if (fieldSchema.type() == Type.NULL || !fieldSchema.isResolvable()) {
                LOG.debug("Ignore field '{}', schema type is either NULL or cannot be resolved.", fieldName);
                continue;
            }
            this.mayUpdateSchemaWithName(fieldSchema, this.normalizeSchemaName(fieldName));
            sb.field(fieldName, fieldSchema.map(this, true));
        }
        // NOTE(review): presumably wraps the schema to cope with cyclic references — confirm
        // against SchemaContext.
        return this.context.buildSchemaWithCyclicSchemaWrapper(sb.build());
    }

    /**
     * Assigns {@code schemaName} to the given schema if it is an unnamed struct, looking through
     * array element schemas and map value schemas to reach a nested struct.
     *
     * @param schema     the schema to inspect (mutated when an unnamed struct is found)
     * @param schemaName the name to assign
     */
    private void mayUpdateSchemaWithName(Schema schema, String schemaName) {
        if (schema.type() == Type.ARRAY) {
            this.mayUpdateSchemaWithName(((ArraySchema) schema).valueSchema(), schemaName);
        } else if (schema.type() == Type.MAP) {
            this.mayUpdateSchemaWithName(((MapSchema) schema).valueSchema(), schemaName);
        } else if (schema.type() == Type.STRUCT) {
            StructSchema structSchema = (StructSchema) schema;
            if (structSchema.name() == null) {
                structSchema.name(schemaName);
            }
        }
    }

    /**
     * Maps a simple FilePulse schema to the Connect schema of the corresponding type.
     *
     * @param schema   the simple schema to convert
     * @param optional whether the resulting schema is optional (with a {@code null} default)
     * @return the Connect schema
     */
    @Override
    public org.apache.kafka.connect.data.Schema map(SimpleSchema schema, boolean optional) {
        SchemaBuilder builder = new SchemaBuilder(schema.type().schemaType());
        return (optional ? asNullableAndOptional(builder) : builder).build();
    }

    /** Marks the builder optional and sets its default value to {@code null}. */
    private static SchemaBuilder asNullableAndOptional(SchemaBuilder sb) {
        return sb.optional().defaultValue(DEFAULT_NULL_VALUE);
    }

    /**
     * Pairs the given map, unchanged, with the Connect schema mapped from {@code schema}.
     *
     * @param schema   the map schema
     * @param map      the value, passed through as-is
     * @param optional whether the schema is optional
     * @return the schema and value
     */
    @Override
    public SchemaAndValue map(MapSchema schema, Map<String, ?> map, boolean optional) {
        org.apache.kafka.connect.data.Schema connectSchema = schema.map(this, optional);
        return new SchemaAndValue(connectSchema, map);
    }

    /**
     * Pairs the given collection, unchanged, with the Connect schema mapped from {@code schema}.
     *
     * @param schema   the array schema
     * @param array    the value, passed through as-is
     * @param optional whether the schema is optional
     * @return the schema and value
     */
    @Override
    public SchemaAndValue map(ArraySchema schema, Collection<?> array, boolean optional) {
        org.apache.kafka.connect.data.Schema connectSchema = schema.map(this, optional);
        return new SchemaAndValue(connectSchema, array);
    }

    /**
     * Maps the struct schema to a Connect schema and converts {@code struct} into a Connect
     * {@link Struct} conforming to it.
     *
     * @param schema   the struct schema
     * @param struct   the struct value to convert
     * @param optional whether the schema is optional
     * @return the schema and converted value
     * @throws DataException if the struct cannot be converted to the mapped schema
     */
    @Override
    public SchemaAndValue map(StructSchema schema, TypedStruct struct, boolean optional) {
        return this.map(schema.map(this, optional), struct);
    }

    /**
     * Pairs the given value, unchanged, with the Connect schema mapped from {@code schema}.
     *
     * @param schema   the simple schema
     * @param value    the value, passed through as-is
     * @param optional whether the schema is optional
     * @return the schema and value
     */
    @Override
    public SchemaAndValue map(SimpleSchema schema, Object value, boolean optional) {
        return new SchemaAndValue(schema.map(this, optional), value);
    }

    /**
     * Converts {@code value} into a Connect {@link Struct} conforming to {@code connectSchema}.
     *
     * @param connectSchema the target Connect struct schema
     * @param value         the struct value to convert
     * @return the schema and converted value
     * @throws DataException if a required field is missing or a field type cannot be reconciled
     */
    @Override
    public SchemaAndValue map(org.apache.kafka.connect.data.Schema connectSchema, TypedStruct value) {
        return new SchemaAndValue(connectSchema, toConnectStruct(connectSchema, value));
    }

    /**
     * Builds a Connect struct by walking the fields of {@code connectSchema} and copying the
     * matching values out of {@code struct}. Optional fields absent from {@code struct} are
     * left unset; fields of {@code struct} that are not in the schema are ignored.
     *
     * @throws DataException if a required field is missing or a field type cannot be reconciled
     */
    private static Struct toConnectStruct(org.apache.kafka.connect.data.Schema connectSchema, TypedStruct struct) {
        Struct connectStruct = new Struct(connectSchema);
        String recordName = connectSchema.name();
        for (Field connectField : connectSchema.fields()) {
            String fieldName = connectField.name();
            org.apache.kafka.connect.data.Schema connectFieldSchema = connectField.schema();
            if (!struct.has(fieldName)) {
                if (connectFieldSchema.isOptional()) {
                    continue;
                }
                throw new DataException("Failed to convert record to connect data. Missing required field '" + fieldName + "' for record '" + recordName + "'");
            }
            TypedValue typed = coerceToFieldType(recordName, fieldName, connectFieldSchema, struct.get(fieldName));
            connectStruct.put(connectField, toConnectObject(connectFieldSchema, typed));
        }
        return connectStruct;
    }

    /**
     * Reconciles the type of a field value with the type declared by the Connect field schema.
     *
     * <p>Supported adjustments: a single value is wrapped into a one-element array when the
     * schema expects an array of that value's type; any primitive is converted to STRING; a
     * number is converted to FLOAT64; an INTEGER is widened to INT64.
     *
     * @return {@code typed} itself when the types already match, otherwise the adjusted value
     * @throws DataException if no supported adjustment applies
     */
    private static TypedValue coerceToFieldType(String recordName,
                                                String fieldName,
                                                org.apache.kafka.connect.data.Schema connectFieldSchema,
                                                TypedValue typed) {
        org.apache.kafka.connect.data.Schema.Type dataSchemaType = typed.type().schemaType();
        org.apache.kafka.connect.data.Schema.Type schemaType = connectFieldSchema.type();
        if (schemaType == dataSchemaType) {
            return typed;
        }
        if (schemaType == org.apache.kafka.connect.data.Schema.Type.ARRAY) {
            org.apache.kafka.connect.data.Schema.Type arrayValueType = connectFieldSchema.valueSchema().type();
            if (!arrayValueType.equals(dataSchemaType)) {
                throw new DataException("Failed to convert record field '" + recordName + "." + fieldName + "' to connect data. Types do not match Array[" + arrayValueType + "]<>Array[" + dataSchemaType + "]");
            }
            return TypedValue.array(Collections.singleton(typed.value()), typed.schema());
        }
        if (dataSchemaType.isPrimitive()) {
            if (schemaType == org.apache.kafka.connect.data.Schema.Type.STRING) {
                return typed.as(Type.STRING);
            }
            if (schemaType == org.apache.kafka.connect.data.Schema.Type.FLOAT64 && typed.type().isNumber()) {
                return typed.as(Type.DOUBLE);
            }
            if (schemaType == org.apache.kafka.connect.data.Schema.Type.INT64 && typed.type() == Type.INTEGER) {
                return typed.as(Type.LONG);
            }
        }
        // No supported coercion: fail here so the error names the offending record and field.
        throw new DataException("Failed to convert record field '" + recordName + "." + fieldName + "' to connect data. Types do not match " + schemaType + "<>" + dataSchemaType);
    }

    /**
     * Converts a typed value into the plain object representation expected by Connect for the
     * given schema, recursing into structs, map entries and array elements.
     *
     * @throws DataException         if the value type does not match the schema type
     * @throws IllegalStateException if two map entries convert to the same key
     */
    private static Object toConnectObject(org.apache.kafka.connect.data.Schema schema, TypedValue typed) {
        if (schema.type() != typed.type().schemaType()) {
            throw new DataException("types do not match " + schema.type() + "<>" + typed.type());
        }
        if (schema.type() == org.apache.kafka.connect.data.Schema.Type.STRUCT) {
            return toConnectStruct(schema, (TypedStruct) typed.value());
        }
        if (schema.type() == org.apache.kafka.connect.data.Schema.Type.MAP) {
            org.apache.kafka.connect.data.Schema connectValueSchema = schema.valueSchema();
            org.apache.kafka.connect.data.Schema connectKeySchema = schema.keySchema();
            Schema valueSchema = ((MapSchema) typed.schema()).valueSchema();
            Schema keySchema = ((MapSchema) typed.schema()).keySchema();
            Map<?, ?> source = typed.getMap();
            // Filled by hand rather than with Collectors.toMap, which throws a
            // NullPointerException as soon as one entry converts to a null value.
            Map<Object, Object> result = new HashMap<>();
            for (Map.Entry<?, ?> entry : source.entrySet()) {
                Object key = toConnectObject(connectKeySchema, TypedValue.of(entry.getKey(), keySchema));
                Object converted = valueSchema.type().convert(entry.getValue());
                Object value = toConnectObject(connectValueSchema, TypedValue.of(converted, valueSchema));
                if (result.containsKey(key)) {
                    // Same failure mode as Collectors.toMap: never silently drop an entry.
                    throw new IllegalStateException("Duplicate key " + key);
                }
                result.put(key, value);
            }
            return result;
        }
        if (schema.type() == org.apache.kafka.connect.data.Schema.Type.ARRAY) {
            org.apache.kafka.connect.data.Schema connectValueSchema = schema.valueSchema();
            Schema valueSchema = ((ArraySchema) typed.schema()).valueSchema();
            return typed.getArray().stream().map(element -> {
                Object converted = valueSchema.type().convert(element);
                return toConnectObject(connectValueSchema, TypedValue.of(converted, valueSchema));
            }).collect(Collectors.toList());
        }
        return typed.value();
    }
}

