package org.nuxeo.runtime.pubsub;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.pubsub.SerializableMessage;

/* loaded from: input_file:org/nuxeo/runtime/pubsub/AbstractPubSubBroker.class */
/**
 * Base class for brokers that send and receive messages of type {@code T} on a single
 * {@link PubSubService} topic.
 * <p>
 * Every published payload is laid out as
 * {@code <discriminator bytes> ':' <bytes written by the message's serialize method>}.
 * On reception, payloads whose discriminator is identical to this broker's own are dropped, so
 * they are never passed to {@link #receivedMessage}.
 *
 * @param <T> the message type
 */
public abstract class AbstractPubSubBroker<T extends SerializableMessage> {

    private static final Log log = LogFactory.getLog(AbstractPubSubBroker.class);

    /** Byte ({@code ':'}) separating the discriminator from the serialized message. */
    protected static final byte DISCRIMINATOR_SEP = ':';

    /** Topic given to {@link #initialize}; used when publishing and when unregistering. */
    protected String topic;

    /** UTF-8 bytes of the discriminator given to {@link #initialize}. */
    protected byte[] discriminatorBytes;

    /**
     * Reads one message from the given stream.
     * <p>
     * The stream handed over by {@link #subscriber} starts right after the separator byte. A
     * {@code null} return value makes the subscriber ignore the payload.
     *
     * @param inputStream the stream holding the serialized message
     * @return the message, or {@code null} to ignore the payload
     * @throws IOException if the message cannot be read
     */
    public abstract T deserialize(InputStream inputStream) throws IOException;

    /**
     * Called for every message received from the topic that carries a different discriminator
     * and was successfully deserialized.
     *
     * @param t the received message
     */
    public abstract void receivedMessage(T t);

    /**
     * Stores the topic and discriminator, then registers this broker as a subscriber of the
     * topic.
     *
     * @param topic the topic to publish to and subscribe on
     * @param discriminator the discriminator prepended to every sent message
     * @throws IllegalArgumentException if the UTF-8 form of the discriminator contains the
     *         separator byte {@code ':'}
     */
    public void initialize(String topic, String discriminator) {
        this.topic = topic;
        // StandardCharsets avoids the checked exception declared by getBytes(String charsetName).
        this.discriminatorBytes = discriminator.getBytes(StandardCharsets.UTF_8);
        for (byte b : this.discriminatorBytes) {
            if (b == DISCRIMINATOR_SEP) {
                throw new IllegalArgumentException(
                        "Invalid discriminator, must not contains separator ':': " + discriminator);
            }
        }
        pubSubService().registerSubscriber(topic, this::subscriber);
    }

    /**
     * Asks the {@link PubSubService} to unregister this broker's subscriber from the topic.
     */
    public void close() {
        // NOTE(review): `this::subscriber` is evaluated again here, and a second evaluation of a
        // method reference is not guaranteed to yield the instance that was registered in
        // initialize(). If the service matches subscribers by identity/equals, this call may not
        // remove anything -- confirm against PubSubService and consider keeping the registered
        // instance in a field.
        pubSubService().unregisterSubscriber(this.topic, this::subscriber);
    }

    /**
     * Serializes the message behind the discriminator prefix and publishes it on the topic.
     * <p>
     * An {@link IOException} raised while serializing or publishing is logged and the message is
     * dropped; it is not propagated to the caller.
     *
     * @param t the message to send
     */
    public void sendMessage(T t) {
        if (log.isTraceEnabled()) {
            log.trace("Sending message: " + t);
        }
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        // These two ByteArrayOutputStream.write overloads declare no checked exception, unlike
        // the inherited write(byte[]), so no catch block is needed for the prefix.
        payload.write(this.discriminatorBytes, 0, this.discriminatorBytes.length);
        payload.write(DISCRIMINATOR_SEP);
        try {
            t.serialize(payload);
            pubSubService().publish(this.topic, payload.toByteArray());
        } catch (IOException e) {
            // Best effort: do not crash the caller for a message that cannot be sent.
            log.error("Failed to serialize message", e);
        }
    }

    /**
     * Callback registered with the {@link PubSubService}: filters out payloads carrying this
     * broker's own discriminator, deserializes the rest and forwards them to
     * {@link #receivedMessage}.
     *
     * @param messageTopic the topic the payload was received on (not used here)
     * @param bytes the raw payload, discriminator prefix included
     */
    protected void subscriber(String messageTopic, byte[] bytes) {
        int start = scanDiscriminator(bytes);
        if (start == -1) {
            // Own discriminator, null payload, or no separator at all: nothing to deliver.
            return;
        }
        T message;
        try {
            message = deserialize(new ByteArrayInputStream(bytes, start, bytes.length - start));
        } catch (IOException e) {
            log.error("Failed to deserialize message", e);
            return;
        }
        if (message == null) {
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("Received message: " + message);
        }
        receivedMessage(message);
    }

    /**
     * Compares the discriminator at the start of a payload with this broker's own and locates
     * the start of the serialized message.
     *
     * @param bArr the raw payload, may be {@code null}
     * @return the index of the first byte after the separator when the payload's discriminator
     *         differs from this broker's; {@code -1} when the payload is {@code null}, when the
     *         discriminators are identical, or when no separator is present
     */
    protected int scanDiscriminator(byte[] bArr) {
        if (bArr == null) {
            return -1;
        }
        int payloadStart = -1;
        boolean differs = false;
        for (int pos = 0; pos < bArr.length; pos++) {
            byte b = bArr[pos];
            if (b == DISCRIMINATOR_SEP) {
                // Separator reached before our own discriminator was fully matched: the received
                // discriminator is a strict prefix of ours, hence different.
                differs = differs || this.discriminatorBytes.length > pos;
                payloadStart = pos + 1;
                // Stop at the first separator; without this the scan would never terminate.
                break;
            }
            // pos can never exceed discriminatorBytes.length while differs is still false, so
            // the array access below stays in bounds.
            if (!differs && (pos == this.discriminatorBytes.length || b != this.discriminatorBytes[pos])) {
                differs = true;
            }
        }
        return differs ? payloadStart : -1;
    }

    /** Looks up the {@link PubSubService} through the runtime {@link Framework}. */
    private static PubSubService pubSubService() {
        return (PubSubService) Framework.getService(PubSubService.class);
    }
}
