package org.nuxeo.runtime.codec;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.message.RawMessageDecoder;
import org.apache.avro.message.RawMessageEncoder;
import org.apache.avro.reflect.ReflectData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.AvroConfluentCodec;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;

/* loaded from: input_file:org/nuxeo/runtime/codec/AvroRecordCodec.class */
public class AvroRecordCodec<T extends Record> implements Codec<T> {
    private static final Logger log = LogManager.getLogger(AvroRecordCodec.class);
    public static final String NAME = "avroRecord";
    public static final String RECORD_KEY = "recordKey";
    public static final String RECORD_WATERMARK = "recordWatermark";
    public static final String RECORD_TIMESTAMP = "recordTimestamp";
    public static final String RECORD_FLAGS = "recordFlags";
    protected final Schema schema;
    protected final int schemaId;
    protected final Schema messageSchema;
    protected final int messageSchemaId;
    protected final RawMessageDecoder<GenericRecord> messageDecoder;
    protected final RawMessageEncoder<GenericRecord> messageEncoder;
    protected final KafkaAvroSerializer serializer;
    protected final RawMessageEncoder<GenericRecord> encoder;
    protected final SchemaRegistryClient client;

    public AvroRecordCodec(Schema schema, String str) {
        this.messageSchema = schema;
        this.client = AvroConfluentCodec.getRegistryClient(str);
        this.serializer = new KafkaAvroSerializer(this.client);
        this.schema = addRecordFieldsToSchema(schema);
        log.trace("msg schema: {}", new Supplier[]{() -> {
            return this.messageSchema.toString(true);
        }});
        log.trace("rec + msg schema: {}", new Supplier[]{() -> {
            return this.schema.toString(true);
        }});
        try {
            this.messageSchemaId = this.client.register(schema.getName(), schema);
            this.schemaId = this.client.register(this.schema.getName(), this.schema);
            this.encoder = new RawMessageEncoder<>(GenericData.get(), this.schema);
            this.messageDecoder = new RawMessageDecoder<>(GenericData.get(), schema);
            this.messageEncoder = new RawMessageEncoder<>(GenericData.get(), schema);
        } catch (RestClientException | IOException e) {
            throw new StreamRuntimeException(e);
        }
    }

    public AvroRecordCodec(String str, String str2) throws ClassNotFoundException {
        this(ReflectData.get().getSchema(Class.forName(str)), str2);
    }

    public String getName() {
        return NAME;
    }

    public byte[] encode(T t) {
        try {
            GenericRecord createRecordFromMessage = createRecordFromMessage((GenericRecord) this.messageDecoder.decode(t.getData(), (Object) null));
            createRecordFromMessage.put(RECORD_KEY, t.getKey());
            createRecordFromMessage.put(RECORD_WATERMARK, Long.valueOf(t.getWatermark()));
            createRecordFromMessage.put(RECORD_TIMESTAMP, Long.valueOf(Watermark.ofValue(t.getWatermark()).getTimestamp()));
            createRecordFromMessage.put(RECORD_FLAGS, Integer.valueOf(Byte.valueOf(t.getFlagsAsByte()).intValue()));
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            byteArrayOutputStream.write(0);
            try {
                byteArrayOutputStream.write(ByteBuffer.allocate(4).putInt(this.schemaId).array());
                byteArrayOutputStream.write(this.encoder.encode(createRecordFromMessage).array());
                return byteArrayOutputStream.toByteArray();
            } catch (IOException e) {
                throw new StreamRuntimeException(e);
            }
        } catch (IOException e2) {
            throw new IllegalArgumentException(e2);
        }
    }

    protected GenericRecord createRecordFromMessage(GenericRecord genericRecord) {
        GenericData.Record record = new GenericData.Record(this.schema);
        for (Schema.Field field : genericRecord.getSchema().getFields()) {
            record.put(field.name(), genericRecord.get(field.pos()));
        }
        return record;
    }

    protected Schema addRecordFieldsToSchema(Schema schema) {
        ArrayList arrayList = new ArrayList();
        for (Schema.Field field : schema.getFields()) {
            arrayList.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
        }
        arrayList.add(new Schema.Field(RECORD_KEY, (Schema) SchemaBuilder.builder().stringType(), "record key", (Object) null));
        arrayList.add(new Schema.Field(RECORD_WATERMARK, (Schema) SchemaBuilder.builder().longType(), "record watermark", 0L));
        arrayList.add(new Schema.Field(RECORD_TIMESTAMP, (Schema) SchemaBuilder.builder().longType(), "record timestamp", 0L));
        arrayList.add(new Schema.Field(RECORD_FLAGS, (Schema) SchemaBuilder.builder().intType(), "record flags", 0));
        return Schema.createRecord(schema.getName() + "Record", schema.getDoc(), schema.getNamespace(), false, arrayList);
    }

    /* renamed from: decode, reason: merged with bridge method [inline-methods] */
    public T m3decode(byte[] bArr) {
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        if (wrap.get() != 0) {
            throw new IllegalArgumentException("Invalid Avro Confluent message, expecting magic byte");
        }
        int i = wrap.getInt();
        try {
            try {
                GenericRecord genericRecord = (GenericRecord) new RawMessageDecoder(GenericData.get(), this.client.getById(i), this.schema).decode(wrap.slice(), (Object) null);
                log.trace("GenericRecord: {}", genericRecord);
                String obj = genericRecord.get(RECORD_KEY).toString();
                long longValue = ((Long) genericRecord.get(RECORD_WATERMARK)).longValue();
                int intValue = ((Integer) genericRecord.get(RECORD_FLAGS)).intValue();
                T t = (T) new Record(obj, this.messageEncoder.encode(genericRecord).array(), longValue);
                t.setFlags((byte) intValue);
                return t;
            } catch (IOException | IndexOutOfBoundsException e) {
                throw new IllegalArgumentException(e);
            }
        } catch (IOException | RestClientException e2) {
            throw new StreamRuntimeException("Cannot retrieve write schema id: " + i, e2);
        }
    }
}
