/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.elasticsearch;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Converts Kafka Connect {@link SinkRecord}s into Elasticsearch write requests.
 *
 * <p>The record key (or a {@code topic+partition+offset} triple when keys are ignored) becomes
 * the document id, and the record value is serialized to schemaless JSON to become the document
 * source. Before serialization the value and its schema are "pre-processed": Decimal values are
 * turned into doubles and maps are turned either into compact maps or into arrays of
 * {@code {key, value}} structs, depending on the connector configuration.
 */
public class DataConverter {
    private static final Logger log = LoggerFactory.getLogger(DataConverter.class);
    private static final Converter JSON_CONVERTER;
    private static final HeaderConverter HEADER_CONVERTER;
    /** Field name holding the key in the struct generated for each map entry. */
    protected static final String MAP_KEY = "key";
    /** Field name holding the value in the struct generated for each map entry. */
    protected static final String MAP_VALUE = "value";
    /** Field injected into payloads written to data streams. */
    protected static final String TIMESTAMP_FIELD = "@timestamp";
    private final ObjectMapper objectMapper;
    private final ElasticsearchSinkConnectorConfig config;

    /**
     * Creates a converter driven by the given connector configuration.
     *
     * @param config the connector configuration consulted on every conversion
     */
    public DataConverter(ElasticsearchSinkConnectorConfig config) {
        this.config = config;
        this.objectMapper = new ObjectMapper();
    }

    /**
     * Renders the record key as a document id.
     *
     * @param keySchema the key schema, or {@code null} to infer the type from the key's class
     * @param key the record key
     * @return the string form of the key
     * @throws DataException if the key is null, has an empty string form, has no corresponding
     *     Connect schema type, or is not an integer or string type
     */
    private String convertKey(Schema keySchema, Object key) {
        if (key == null) {
            throw new DataException("Key is used as document id and can not be null.");
        }
        String keyText = String.valueOf(key);
        if (keyText.isEmpty()) {
            throw new DataException("Key is used as document id and can not be empty.");
        }
        Schema.Type schemaType;
        if (keySchema == null) {
            schemaType = ConnectSchema.schemaType(key.getClass());
            if (schemaType == null) {
                throw new DataException("Java class " + key.getClass() + " does not have corresponding schema type.");
            }
        } else {
            schemaType = keySchema.type();
        }
        switch (schemaType) {
            case INT8:
            case INT16:
            case INT32:
            case INT64:
            case STRING:
                return keyText;
            default:
                throw new DataException(schemaType.name() + " is not supported as the document id.");
        }
    }

    /**
     * Converts a sink record into the write request to send to Elasticsearch.
     *
     * <p>A record with a null value is ignored, turned into a delete, or rejected according to
     * {@code config.behaviorOnNullValues()}. Any other record becomes an update-with-upsert or an
     * index request according to {@code config.writeMethod()}.
     *
     * @param record the record to convert
     * @param index the target index (or data stream) name
     * @return the request to execute, or {@code null} when the record should be skipped (null
     *     value with IGNORE behavior, null value and null key with DELETE behavior, or a write
     *     method other than UPSERT/INSERT)
     * @throws DataException if a null value is not tolerated, or the key or payload is invalid
     * @throws ConnectException if external versioning is configured but the version header is
     *     missing or is not a valid long
     */
    public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
        if (record.value() == null) {
            switch (this.config.behaviorOnNullValues()) {
                case IGNORE:
                    log.trace("Ignoring {} with null value.", recordString(record));
                    return null;
                case DELETE:
                    if (record.key() == null) {
                        log.trace("Ignoring {} with null key, since the record key is used as the ID of the index", recordString(record));
                        return null;
                    }
                    log.trace("Deleting {} from Elasticsearch", recordString(record));
                    break;
                default:
                    throw new DataException(String.format("%s has a null value (to ignore future records like this change the configuration property '%s' from '%s' to '%s')", recordString(record), "behavior.on.null.values", ElasticsearchSinkConnectorConfig.BehaviorOnNullValues.FAIL, ElasticsearchSinkConnectorConfig.BehaviorOnNullValues.IGNORE));
            }
        }

        String id = this.config.shouldIgnoreKey(record.topic())
                ? String.format("%s+%d+%d", record.topic(), record.kafkaPartition(), record.kafkaOffset())
                : this.convertKey(record.keySchema(), record.key());

        // Only the DELETE behavior reaches this point with a null value.
        if (record.value() == null) {
            return this.maybeAddExternalVersioning(new DeleteRequest(index).id(id), record);
        }

        String payload = this.maybeAddTimestamp(this.getPayload(record), record.timestamp());
        switch (this.config.writeMethod()) {
            case UPSERT:
                return new UpdateRequest(index, id)
                        .doc(payload, XContentType.JSON)
                        .upsert(payload, XContentType.JSON)
                        .retryOnConflict(Math.min(this.config.maxInFlightRequests(), 5));
            case INSERT: {
                DocWriteRequest.OpType opType = this.config.isDataStream() ? DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX;
                IndexRequest req = new IndexRequest(index).source(payload, XContentType.JSON).opType(opType);
                if (this.config.useAutogeneratedIds()) {
                    // No explicit id (and therefore no external version) is attached.
                    return req;
                }
                return this.maybeAddExternalVersioning(req.id(id), record);
            }
            default:
                return null;
        }
    }

    /**
     * Serializes the record value to a JSON string.
     *
     * <p>Unless the schema is ignored for the record's topic, the schema and value are
     * pre-processed first (see {@link #preProcessSchema} and {@link #preProcessValue}).
     *
     * @param record the record whose value is serialized
     * @return the JSON document, or {@code null} if the record value is null
     */
    private String getPayload(SinkRecord record) {
        if (record.value() == null) {
            return null;
        }
        Schema schema;
        Object value;
        if (this.config.shouldIgnoreSchema(record.topic())) {
            schema = record.valueSchema();
            value = record.value();
        } else {
            schema = this.preProcessSchema(record.valueSchema());
            value = this.preProcessValue(record.value(), record.valueSchema(), schema);
        }
        byte[] rawJsonPayload = JSON_CONVERTER.fromConnectData(record.topic(), schema, value);
        return new String(rawJsonPayload, StandardCharsets.UTF_8);
    }

    /**
     * Adds the {@code @timestamp} field to the payload when writing to a data stream.
     *
     * <p>If timestamp fields are configured, the first one present in the payload is copied (as
     * text) into {@code @timestamp}; if none is present the payload is returned unchanged. If no
     * timestamp field is configured, the record timestamp is used.
     *
     * @param payload the JSON document
     * @param timestamp the record timestamp
     * @return the payload, with {@code @timestamp} set where applicable
     * @throws DataException if the payload's top-level JSON value is not an object
     */
    private String maybeAddTimestamp(String payload, Long timestamp) {
        if (!this.config.isDataStream()) {
            return payload;
        }
        try {
            JsonNode jsonNode = this.objectMapper.readTree(payload);
            if (!jsonNode.isObject()) {
                throw new DataException("Top level payload contains data of Json type " + jsonNode.getNodeType() + ". Required Json object.");
            }
            ObjectNode document = (ObjectNode) jsonNode;
            if (this.config.dataStreamTimestampField().isEmpty()) {
                document.put(TIMESTAMP_FIELD, timestamp);
                return this.objectMapper.writeValueAsString(document);
            }
            for (String timestampField : this.config.dataStreamTimestampField()) {
                if (document.has(timestampField)) {
                    document.put(TIMESTAMP_FIELD, document.get(timestampField).asText());
                    return this.objectMapper.writeValueAsString(document);
                }
                log.debug("Timestamp field {} is not present in payload. This record may fail or be skipped", timestampField);
            }
        } catch (JsonProcessingException e) {
            // Best effort: the payload was produced by the JSON converter, so this is not
            // expected. Fall back to the unmodified payload, but leave a trace of the failure.
            log.warn("Could not add {} to the payload; sending the payload unchanged.", TIMESTAMP_FIELD, e);
        }
        return payload;
    }

    /**
     * Applies external versioning to the request unless the target is a data stream or the
     * record key is ignored for the record's topic.
     *
     * <p>The version is read from the configured header when one is configured, otherwise the
     * record's Kafka offset is used.
     *
     * @param request the request to version
     * @param record the record the request was built from
     * @return the same request instance
     * @throws ConnectException if the configured version header is missing, has no value, or
     *     does not contain a valid long
     */
    private DocWriteRequest<?> maybeAddExternalVersioning(DocWriteRequest<?> request, SinkRecord record) {
        if (this.config.isDataStream() || this.config.shouldIgnoreKey(record.topic())) {
            return request;
        }
        request.versionType(VersionType.EXTERNAL);
        if (!this.config.hasExternalVersionHeader()) {
            request.version(record.kafkaOffset());
            return request;
        }

        String headerName = this.config.externalVersionHeader();
        Header versionHeader = record.headers().lastWithName(headerName);
        if (versionHeader == null) {
            // lastWithName yields null when the record carries no such header; fail with
            // context instead of a bare NullPointerException.
            throw new ConnectException("External version header '" + headerName + "' is missing from " + recordString(record));
        }
        byte[] versionValue = HEADER_CONVERTER.fromConnectHeader(record.topic(), versionHeader.key(), versionHeader.schema(), versionHeader.value());
        if (versionValue == null) {
            throw new ConnectException("External version header '" + headerName + "' has no value in " + recordString(record));
        }
        String versionText = new String(versionValue, StandardCharsets.UTF_8);
        try {
            request.version(Long.parseLong(versionText));
        } catch (NumberFormatException e) {
            throw new ConnectException("Error converting to long: " + versionText, e);
        }
        return request;
    }

    /**
     * Builds the schema that pre-processed values conform to.
     *
     * <p>Decimal schemas become float64; Date, Time and Timestamp schemas are kept as they are;
     * array, map and struct schemas are rebuilt recursively; everything else is returned
     * unchanged.
     *
     * @param schema the original schema, may be {@code null}
     * @return the pre-processed schema, or {@code null} if {@code schema} is null
     */
    Schema preProcessSchema(Schema schema) {
        if (schema == null) {
            return null;
        }
        String schemaName = schema.name();
        if (schemaName != null) {
            switch (schemaName) {
                case "org.apache.kafka.connect.data.Decimal":
                    return this.copySchemaBasics(schema, SchemaBuilder.float64()).build();
                case "org.apache.kafka.connect.data.Date":
                case "org.apache.kafka.connect.data.Time":
                case "org.apache.kafka.connect.data.Timestamp":
                    return schema;
                default:
                    break;
            }
        }
        switch (schema.type()) {
            case ARRAY:
                return this.preProcessArraySchema(schema);
            case MAP:
                return this.preProcessMapSchema(schema);
            case STRUCT:
                return this.preProcessStructSchema(schema);
            default:
                return schema;
        }
    }

    /** Rebuilds an array schema around its pre-processed element schema. */
    private Schema preProcessArraySchema(Schema schema) {
        Schema valSchema = this.preProcessSchema(schema.valueSchema());
        return this.copySchemaBasics(schema, SchemaBuilder.array(valSchema)).build();
    }

    /**
     * Rebuilds a map schema.
     *
     * <p>With compact map entries enabled and string keys, the result is still a map. Otherwise
     * the result is an array of structs named {@code <keyName>-<valueName>} with a
     * {@link #MAP_KEY} and a {@link #MAP_VALUE} field.
     */
    private Schema preProcessMapSchema(Schema schema) {
        Schema keySchema = schema.keySchema();
        Schema valueSchema = schema.valueSchema();
        Schema preprocessedKeySchema = this.preProcessSchema(keySchema);
        Schema preprocessedValueSchema = this.preProcessSchema(valueSchema);
        if (this.config.useCompactMapEntries() && keySchema.type() == Schema.Type.STRING) {
            SchemaBuilder result = SchemaBuilder.map(preprocessedKeySchema, preprocessedValueSchema);
            return this.copySchemaBasics(schema, result).build();
        }
        String keyName = keySchema.name() == null ? keySchema.type().name() : keySchema.name();
        String valueName = valueSchema.name() == null ? valueSchema.type().name() : valueSchema.name();
        Schema elementSchema = SchemaBuilder.struct()
                .name(keyName + "-" + valueName)
                .field(MAP_KEY, preprocessedKeySchema)
                .field(MAP_VALUE, preprocessedValueSchema)
                .build();
        return this.copySchemaBasics(schema, SchemaBuilder.array(elementSchema)).build();
    }

    /** Rebuilds a struct schema with the same name and pre-processed field schemas. */
    private Schema preProcessStructSchema(Schema schema) {
        SchemaBuilder builder = this.copySchemaBasics(schema, SchemaBuilder.struct().name(schema.name()));
        for (Field field : schema.fields()) {
            builder.field(field.name(), this.preProcessSchema(field.schema()));
        }
        return builder.build();
    }

    /**
     * Copies optionality and the (pre-processed) default value from {@code source} onto
     * {@code target}. Default values of struct schemas are not copied.
     *
     * @return {@code target}, for chaining
     */
    private SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder target) {
        if (source.isOptional()) {
            target.optional();
        }
        if (source.defaultValue() != null && source.type() != Schema.Type.STRUCT) {
            Object defaultVal = this.preProcessValue(source.defaultValue(), source, target);
            target.defaultValue(defaultVal);
        }
        return target;
    }

    /**
     * Converts a value so that it conforms to the pre-processed schema.
     *
     * @param value the original value, may be {@code null}
     * @param schema the original schema; when {@code null} the value is returned untouched
     * @param newSchema the schema produced by {@link #preProcessSchema} for {@code schema}
     * @return the converted value
     * @throws DataException if {@code value} is null for a required schema without a default
     */
    Object preProcessValue(Object value, Schema schema, Schema newSchema) {
        if (schema == null) {
            return value;
        }
        if (value == null) {
            return this.preProcessNullValue(schema, newSchema);
        }
        String schemaName = schema.name();
        if (schemaName != null) {
            Object logical = this.preProcessLogicalValue(schemaName, value);
            if (logical != null) {
                return logical;
            }
        }
        switch (schema.type()) {
            case ARRAY:
                return this.preProcessArrayValue(value, schema, newSchema);
            case MAP:
                return this.preProcessMapValue(value, schema, newSchema);
            case STRUCT:
                return this.preProcessStructValue(value, schema, newSchema);
            default:
                return value;
        }
    }

    /**
     * Resolves a null value: the schema default if there is one, {@code null} for optional
     * schemas, otherwise an error.
     *
     * <p>The default is declared against the original schema, so it is pre-processed as well;
     * otherwise e.g. a Decimal default would stay a {@link BigDecimal} while the pre-processed
     * schema expects a double.
     */
    private Object preProcessNullValue(Schema schema, Schema newSchema) {
        Object defaultValue = schema.defaultValue();
        if (defaultValue != null) {
            return this.preProcessValue(defaultValue, schema, newSchema);
        }
        if (schema.isOptional()) {
            return null;
        }
        throw new DataException("null value for field that is required and has no default value");
    }

    /**
     * Converts values of the known logical types.
     *
     * @return the converted value, or {@code null} when {@code schemaName} is not one of the
     *     handled logical types
     */
    private Object preProcessLogicalValue(String schemaName, Object value) {
        switch (schemaName) {
            case "org.apache.kafka.connect.data.Decimal":
                return ((BigDecimal) value).doubleValue();
            case "org.apache.kafka.connect.data.Date":
            case "org.apache.kafka.connect.data.Time":
            case "org.apache.kafka.connect.data.Timestamp":
                return value;
            default:
                return null;
        }
    }

    /** Pre-processes every element of a collection value into a new list. */
    private Object preProcessArrayValue(Object value, Schema schema, Schema newSchema) {
        Collection<?> collection = (Collection<?>) value;
        ArrayList<Object> result = new ArrayList<>();
        for (Object element : collection) {
            result.add(this.preProcessValue(element, schema.valueSchema(), newSchema.valueSchema()));
        }
        return result;
    }

    /**
     * Pre-processes a map value, mirroring {@link #preProcessMapSchema}: either a map with
     * pre-processed keys and values, or a list of {@code {key, value}} structs.
     */
    private Object preProcessMapValue(Object value, Schema schema, Schema newSchema) {
        Schema keySchema = schema.keySchema();
        Schema valueSchema = schema.valueSchema();
        Schema newValueSchema = newSchema.valueSchema();
        Map<?, ?> map = (Map<?, ?>) value;
        if (this.config.useCompactMapEntries() && keySchema.type() == Schema.Type.STRING) {
            Map<Object, Object> processedMap = new HashMap<>();
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                processedMap.put(
                        this.preProcessValue(entry.getKey(), keySchema, newSchema.keySchema()),
                        this.preProcessValue(entry.getValue(), valueSchema, newValueSchema));
            }
            return processedMap;
        }
        // Here newSchema is an array schema, so newValueSchema is the per-entry struct schema.
        ArrayList<Struct> mapStructs = new ArrayList<>();
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            Struct mapStruct = new Struct(newValueSchema);
            Schema mapKeySchema = newValueSchema.field(MAP_KEY).schema();
            Schema mapValueSchema = newValueSchema.field(MAP_VALUE).schema();
            mapStruct.put(MAP_KEY, this.preProcessValue(entry.getKey(), keySchema, mapKeySchema));
            mapStruct.put(MAP_VALUE, this.preProcessValue(entry.getValue(), valueSchema, mapValueSchema));
            mapStructs.add(mapStruct);
        }
        return mapStructs;
    }

    /** Copies a struct field by field into a new struct of the pre-processed schema. */
    private Object preProcessStructValue(Object value, Schema schema, Schema newSchema) {
        Struct struct = (Struct) value;
        Struct newStruct = new Struct(newSchema);
        for (Field field : schema.fields()) {
            Schema newFieldSchema = newSchema.field(field.name()).schema();
            Object converted = this.preProcessValue(struct.get(field), field.schema(), newFieldSchema);
            newStruct.put(field.name(), converted);
        }
        return newStruct;
    }

    /** Describes a record by topic, partition and offset for log and error messages. */
    private static String recordString(SinkRecord record) {
        return String.format("record from topic=%s partition=%s offset=%s", record.topic(), record.kafkaPartition(), record.kafkaOffset());
    }

    static {
        HEADER_CONVERTER = new SimpleHeaderConverter();
        JSON_CONVERTER = new JsonConverter();
        // Documents are written without a schema envelope; the second argument marks this
        // as a value (not key) converter.
        JSON_CONVERTER.configure(Collections.singletonMap("schemas.enable", "false"), false);
    }
}

