/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.runtime.pubsub;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.pubsub.PubSubService;
import org.nuxeo.runtime.pubsub.SerializableMessage;

/**
 * Base class for brokers that exchange {@link SerializableMessage}s over a {@link PubSubService} topic.
 * <p>
 * Every outgoing message is prefixed with this broker's discriminator followed by
 * {@link #DISCRIMINATOR_SEP}. Incoming messages carrying the same discriminator are ignored, so a
 * broker does not process the messages it published itself.
 *
 * @param <T> the type of message exchanged
 */
public abstract class AbstractPubSubBroker<T extends SerializableMessage> {
    private static final Log log = LogFactory.getLog(AbstractPubSubBroker.class);

    /** Byte ({@code ':'}) separating the discriminator from the serialized message payload. */
    protected static final byte DISCRIMINATOR_SEP = ':';

    /** Topic this broker publishes to and listens on; set by {@link #initialize}. */
    protected String topic;

    /** UTF-8 bytes of the discriminator; set by {@link #initialize}. */
    protected byte[] discriminatorBytes;

    /**
     * The single subscriber instance handed to the {@link PubSubService}.
     * <p>
     * Each evaluation of {@code this::subscriber} yields a distinct object, so the very same
     * instance has to be reused for registration and unregistration, otherwise the service cannot
     * match the subscriber to remove.
     * NOTE(review): assumes the service methods accept a {@code BiConsumer<String, byte[]>} -
     * confirm against PubSubService.
     */
    private final BiConsumer<String, byte[]> subscriberRef = this::subscriber;

    /**
     * Reads one message from its serialized form.
     *
     * @param in the stream positioned just after the discriminator separator
     * @return the message; a {@code null} result makes the broker drop the message
     * @throws IOException if the message cannot be read
     */
    public abstract T deserialize(InputStream in) throws IOException;

    /**
     * Called for each message received from a broker using a different discriminator.
     *
     * @param message the deserialized message, never {@code null}
     */
    public abstract void receivedMessage(T message);

    /**
     * Configures this broker and registers it as a subscriber of the topic.
     *
     * @param topic the topic to publish to and listen on
     * @param discriminator the value identifying this broker's own messages
     * @throws IllegalArgumentException if the discriminator contains the separator {@code ':'}
     */
    public void initialize(String topic, String discriminator) {
        byte[] encoded = discriminator.getBytes(StandardCharsets.UTF_8);
        // Validate before touching any field so that a rejected call leaves the broker unchanged.
        for (byte b : encoded) {
            if (b == DISCRIMINATOR_SEP) {
                throw new IllegalArgumentException("Invalid discriminator, must not contains separator ':': " + discriminator);
            }
        }
        this.topic = topic;
        this.discriminatorBytes = encoded;
        PubSubService pubSubService = (PubSubService) Framework.getService(PubSubService.class);
        pubSubService.registerSubscriber(topic, subscriberRef);
    }

    /**
     * Unregisters the subscriber that {@link #initialize} registered.
     */
    public void close() {
        PubSubService pubSubService = (PubSubService) Framework.getService(PubSubService.class);
        pubSubService.unregisterSubscriber(topic, subscriberRef);
    }

    /**
     * Publishes a message on the topic, prefixed with this broker's discriminator and the separator.
     * <p>
     * A serialization failure is logged and the message is not published.
     *
     * @param message the message to send
     */
    public void sendMessage(T message) {
        if (log.isTraceEnabled()) {
            log.trace("Sending message: " + message);
        }
        ByteArrayOutputStream baout = new ByteArrayOutputStream();
        // The (byte[], int, int) overload declares no IOException, unlike OutputStream.write(byte[]).
        baout.write(discriminatorBytes, 0, discriminatorBytes.length);
        baout.write(DISCRIMINATOR_SEP);
        try {
            message.serialize(baout);
        } catch (IOException e) {
            log.error("Failed to serialize message", e);
            return;
        }
        PubSubService pubSubService = (PubSubService) Framework.getService(PubSubService.class);
        pubSubService.publish(topic, baout.toByteArray());
    }

    /**
     * Handles a raw message delivered by the {@link PubSubService}.
     * <p>
     * Messages that carry this broker's own discriminator or no separator are ignored, as are
     * messages that fail to deserialize (logged) or deserialize to {@code null}. Anything else is
     * passed to {@link #receivedMessage}.
     *
     * @param topic the topic the message was received on (not used here)
     * @param bytes the raw message bytes
     */
    protected void subscriber(String topic, byte[] bytes) {
        int start = scanDiscriminator(bytes);
        if (start == -1) {
            return;
        }
        ByteArrayInputStream bain = new ByteArrayInputStream(bytes, start, bytes.length - start);
        T message;
        try {
            message = deserialize(bain);
        } catch (IOException e) {
            log.error("Failed to deserialize message", e);
            return;
        }
        if (message == null) {
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("Received message: " + message);
        }
        receivedMessage(message);
    }

    /**
     * Locates the payload of a raw message, skipping messages that carry this broker's discriminator.
     *
     * @param message the raw message bytes, may be {@code null}
     * @return the index of the first payload byte (just after the first separator), or {@code -1}
     *         if the message is {@code null}, has no separator, or its discriminator is equal to
     *         this broker's own
     */
    protected int scanDiscriminator(byte[] message) {
        if (message == null) {
            return -1;
        }
        int sepIndex = -1;
        for (int i = 0; i < message.length; i++) {
            if (message[i] == DISCRIMINATOR_SEP) {
                sepIndex = i;
                break;
            }
        }
        if (sepIndex == -1) {
            // no separator: invalid message
            return -1;
        }
        if (sepIndex == discriminatorBytes.length) {
            boolean same = true;
            for (int i = 0; i < sepIndex; i++) {
                if (message[i] != discriminatorBytes[i]) {
                    same = false;
                    break;
                }
            }
            if (same) {
                // same discriminator: this broker's own message
                return -1;
            }
        }
        return sepIndex + 1;
    }
}

