/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.runtime.codec;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.message.RawMessageDecoder;
import org.apache.avro.message.RawMessageEncoder;
import org.apache.avro.reflect.ReflectData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.AvroConfluentCodec;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;

public class AvroRecordCodec<T extends Record>
implements Codec<T> {
    private static final Logger log = LogManager.getLogger(AvroRecordCodec.class);
    public static final String NAME = "avroRecord";
    public static final String RECORD_KEY = "recordKey";
    public static final String RECORD_WATERMARK = "recordWatermark";
    public static final String RECORD_TIMESTAMP = "recordTimestamp";
    public static final String RECORD_FLAGS = "recordFlags";
    protected final Schema schema;
    protected final int schemaId;
    protected final Schema messageSchema;
    protected final int messageSchemaId;
    protected final RawMessageDecoder<GenericRecord> messageDecoder;
    protected final RawMessageEncoder<GenericRecord> messageEncoder;
    protected final KafkaAvroSerializer serializer;
    protected final RawMessageEncoder<GenericRecord> encoder;
    protected final SchemaRegistryClient client;

    public AvroRecordCodec(Schema messageSchema, String schemaRegistryUrls) {
        this.messageSchema = messageSchema;
        this.client = AvroConfluentCodec.getRegistryClient((String)schemaRegistryUrls);
        this.serializer = new KafkaAvroSerializer(this.client);
        this.schema = this.addRecordFieldsToSchema(messageSchema);
        log.trace("msg schema: {}", new Supplier[]{() -> this.messageSchema.toString(true)});
        log.trace("rec + msg schema: {}", new Supplier[]{() -> this.schema.toString(true)});
        try {
            this.messageSchemaId = this.client.register(messageSchema.getName(), messageSchema);
            this.schemaId = this.client.register(this.schema.getName(), this.schema);
        }
        catch (RestClientException | IOException e) {
            throw new StreamRuntimeException(e);
        }
        this.encoder = new RawMessageEncoder(GenericData.get(), this.schema);
        this.messageDecoder = new RawMessageDecoder(GenericData.get(), messageSchema);
        this.messageEncoder = new RawMessageEncoder(GenericData.get(), messageSchema);
    }

    public AvroRecordCodec(String messageClassName, String schemaRegistryUrls) throws ClassNotFoundException {
        this(ReflectData.get().getSchema(Class.forName(messageClassName)), schemaRegistryUrls);
    }

    public String getName() {
        return NAME;
    }

    public byte[] encode(T record) {
        try {
            GenericRecord message = (GenericRecord)this.messageDecoder.decode(record.getData(), null);
            GenericRecord newRecord = this.createRecordFromMessage(message);
            newRecord.put(RECORD_KEY, (Object)record.getKey());
            newRecord.put(RECORD_WATERMARK, (Object)record.getWatermark());
            newRecord.put(RECORD_TIMESTAMP, (Object)Watermark.ofValue((long)record.getWatermark()).getTimestamp());
            newRecord.put(RECORD_FLAGS, (Object)Byte.valueOf(record.getFlagsAsByte()).intValue());
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(0);
            try {
                out.write(ByteBuffer.allocate(4).putInt(this.schemaId).array());
                out.write(this.encoder.encode((Object)newRecord).array());
            }
            catch (IOException e) {
                throw new StreamRuntimeException((Throwable)e);
            }
            return out.toByteArray();
        }
        catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    protected GenericRecord createRecordFromMessage(GenericRecord message) {
        GenericData.Record ret = new GenericData.Record(this.schema);
        for (Schema.Field field : message.getSchema().getFields()) {
            Object value = message.get(field.pos());
            ret.put(field.name(), value);
        }
        return ret;
    }

    protected Schema addRecordFieldsToSchema(Schema schema) {
        ArrayList<Schema.Field> fields = new ArrayList<Schema.Field>();
        for (Schema.Field field : schema.getFields()) {
            fields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
        }
        fields.add(new Schema.Field(RECORD_KEY, (Schema)SchemaBuilder.builder().stringType(), "record key", null));
        fields.add(new Schema.Field(RECORD_WATERMARK, (Schema)SchemaBuilder.builder().longType(), "record watermark", (Object)0L));
        fields.add(new Schema.Field(RECORD_TIMESTAMP, (Schema)SchemaBuilder.builder().longType(), "record timestamp", (Object)0L));
        fields.add(new Schema.Field(RECORD_FLAGS, (Schema)SchemaBuilder.builder().intType(), "record flags", (Object)0));
        return Schema.createRecord((String)(schema.getName() + "Record"), (String)schema.getDoc(), (String)schema.getNamespace(), (boolean)false, fields);
    }

    public T decode(byte[] data) {
        Schema writeSchema;
        ByteBuffer buffer = ByteBuffer.wrap(data);
        if (buffer.get() != 0) {
            throw new IllegalArgumentException("Invalid Avro Confluent message, expecting magic byte");
        }
        int id = buffer.getInt();
        try {
            writeSchema = this.client.getById(id);
        }
        catch (RestClientException | IOException e) {
            throw new StreamRuntimeException("Cannot retrieve write schema id: " + id, e);
        }
        RawMessageDecoder decoder = new RawMessageDecoder(GenericData.get(), writeSchema, this.schema);
        try {
            GenericRecord rec = (GenericRecord)decoder.decode(buffer.slice(), null);
            log.trace("GenericRecord: {}", (Object)rec);
            String key = rec.get(RECORD_KEY).toString();
            long wm = (Long)rec.get(RECORD_WATERMARK);
            int flag = (Integer)rec.get(RECORD_FLAGS);
            byte[] msgData = this.messageEncoder.encode((Object)rec).array();
            Record ret = new Record(key, msgData, wm);
            ret.setFlags((byte)flag);
            return (T)ret;
        }
        catch (IOException | IndexOutOfBoundsException e) {
            throw new IllegalArgumentException(e);
        }
    }
}

