package org.nuxeo.lib.stream.codec;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.message.RawMessageDecoder;
import org.apache.avro.message.RawMessageEncoder;
import org.apache.avro.reflect.ReflectData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.StreamRuntimeException;

/* loaded from: input_file:org/nuxeo/lib/stream/codec/AvroConfluentCodec.class */
/**
 * {@link Codec} that frames Avro payloads the way this class's {@link #decode(byte[])} expects them:
 * one magic byte ({@link #MAGIC_BYTE}), a {@link #ID_SIZE}-byte big-endian schema id, then the raw
 * Avro binary encoding of the message.
 * <p>
 * The schema of the message class is derived by reflection and registered against a schema
 * registry when the codec is constructed; the id returned by the registry is the one written by
 * {@link #encode(Object)}.
 *
 * @param <T> the type of the messages handled by this codec
 */
public class AvroConfluentCodec<T> implements Codec<T> {
    private static final Log log = LogFactory.getLog(AvroConfluentCodec.class);

    /** Name reported by {@link #getName()}. */
    public static final String NAME = "avroConfluent";

    /** First byte of every encoded message. */
    protected static final byte MAGIC_BYTE = 0;

    /** Number of bytes used to store the schema id after the magic byte. */
    protected static final int ID_SIZE = 4;

    /** Capacity handed to the {@link CachedSchemaRegistryClient} constructor. */
    protected static final int DEFAULT_IDENTITY_MAP_CAPACITY = 10;

    /** Class of the messages handled by this codec. */
    protected final Class<T> messageClass;

    /** Reflect-based Avro schema of {@link #messageClass}; used as the read schema when decoding. */
    protected final Schema schema;

    /** Id returned by the registry when {@link #schema} was registered. */
    protected final int schemaId;

    /** Subject under which the schema is registered (the message class name). */
    protected final String schemaName;

    /** Serializer bound to {@link #client}; not used by this class itself. */
    protected final KafkaAvroSerializer serializer;

    /** Encoder producing the raw Avro payload that follows the header. */
    protected final RawMessageEncoder<T> encoder;

    /** Schema registry client used to register the read schema and to look up write schemas. */
    protected final SchemaRegistryClient client;

    /**
     * Builds the codec and registers the reflect schema of {@code cls} in the schema registry.
     *
     * @param cls the message class
     * @param str the schema registry URL, or several URLs separated by commas (whitespace around
     *            each URL is ignored)
     * @throws StreamRuntimeException if the schema cannot be registered
     */
    public AvroConfluentCodec(Class<T> cls, String str) {
        this.messageClass = cls;
        this.schema = ReflectData.get().getSchema(cls);
        this.schemaName = cls.getName();
        String urls = str.trim();
        if (urls.contains(",")) {
            // Tolerate "url1, url2": a leading blank would otherwise end up inside the second URL.
            this.client = new CachedSchemaRegistryClient(Arrays.asList(urls.split("\\s*,\\s*")),
                    DEFAULT_IDENTITY_MAP_CAPACITY);
        } else {
            this.client = new CachedSchemaRegistryClient(urls, DEFAULT_IDENTITY_MAP_CAPACITY);
        }
        try {
            this.schemaId = this.client.register(this.schemaName, this.schema);
        } catch (RestClientException | IOException e) {
            throw new StreamRuntimeException(e);
        }
        this.serializer = new KafkaAvroSerializer(this.client);
        this.encoder = new RawMessageEncoder<>(ReflectData.get(), this.schema);
    }

    @Override // org.nuxeo.lib.stream.codec.Codec
    public String getName() {
        return NAME;
    }

    /**
     * Encodes a message as: magic byte, schema id, raw Avro payload.
     *
     * @param t the message to encode
     * @return the framed bytes
     * @throws StreamRuntimeException if the Avro encoding fails
     */
    @Override // org.nuxeo.lib.stream.codec.Codec
    public byte[] encode(T t) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(MAGIC_BYTE);
        try {
            out.write(ByteBuffer.allocate(ID_SIZE).putInt(this.schemaId).array());
            // Stream the payload straight into the output instead of going through
            // ByteBuffer.array(), which ignores the buffer's offset/limit and costs an extra copy.
            this.encoder.encode(t, out);
            return out.toByteArray();
        } catch (IOException e) {
            throw new StreamRuntimeException(e);
        }
    }

    /**
     * Decodes a message produced with the framing written by {@link #encode(Object)}.
     * <p>
     * The schema id found in the header is resolved through the registry and used as the write
     * schema; the schema of the message class is used as the read schema.
     *
     * @param bArr the framed bytes
     * @return the decoded message
     * @throws IllegalArgumentException if the input is null, too short to hold the header, does not
     *                                  start with the magic byte, or its payload cannot be decoded
     * @throws StreamRuntimeException   if the write schema cannot be retrieved for a reason other
     *                                  than a 404 from the registry
     */
    @Override // org.nuxeo.lib.stream.codec.Codec
    public T decode(byte[] bArr) {
        if (bArr == null || bArr.length == 0 || bArr[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Invalid Avro Confluent message, expecting magic byte");
        }
        if (bArr.length < 1 + ID_SIZE) {
            // Report a truncated header like any other malformed message rather than leaking a
            // BufferUnderflowException from ByteBuffer.getInt().
            throw new IllegalArgumentException("Invalid Avro Confluent message, truncated schema id");
        }
        ByteBuffer buffer = ByteBuffer.wrap(bArr);
        buffer.get(); // skip the magic byte checked above
        int writeSchemaId = buffer.getInt();
        Schema writeSchema = resolveWriteSchema(writeSchemaId);
        try {
            RawMessageDecoder<T> decoder = new RawMessageDecoder<>(ReflectData.get(), writeSchema, this.schema);
            return decoder.decode(buffer.slice(), null);
        } catch (IOException | IndexOutOfBoundsException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Looks up the write schema by id, falling back to the read schema when the registry answers
     * with a 404.
     */
    private Schema resolveWriteSchema(int id) {
        try {
            return this.client.getById(id);
        } catch (IOException e) {
            throw new StreamRuntimeException("Cannot retrieve write schema id: " + id + " on " + this.messageClass, e);
        } catch (RestClientException e) {
            if (e.getStatus() != 404) {
                throw new StreamRuntimeException("Cannot retrieve write schema id: " + id + " on " + this.messageClass,
                        e);
            }
            if (log.isWarnEnabled()) {
                log.warn(String.format("Cannot retrieve write schema %d, fallback to read schema: %d for %s", id,
                        this.schemaId, this.messageClass));
            }
            return this.schema;
        }
    }
}
