package org.nuxeo.ecm.core.redis.contribs;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.redis.RedisAdmin;
import org.nuxeo.ecm.core.redis.RedisExecutor;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.pubsub.AbstractPubSubProvider;
import redis.clients.jedis.Client;
import redis.clients.jedis.Connection;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.SafeEncoder;

/* loaded from: input_file:org/nuxeo/ecm/core/redis/contribs/RedisPubSubProvider.class */
/**
 * Pub/sub provider backed by Redis.
 * <p>
 * Outgoing messages are published on the Redis channel named {@code namespace + topic}. Incoming messages are received
 * by a {@link Dispatcher} running in a dedicated daemon thread, subscribed to the pattern {@code namespace + "*"}, and
 * are handed to {@code localPublish} with the namespace prefix stripped from the channel name.
 */
public class RedisPubSubProvider extends AbstractPubSubProvider {
    static final Log log = LogFactory.getLog(RedisPubSubProvider.class);

    /** Maximum time, in seconds, that {@link #initialize} waits for the Redis subscription to be confirmed. */
    public static final long TIMEOUT_SUBSCRIBE_SECONDS = 5;

    /** Listener created by {@link #initialize}; {@code null} once {@link #close()} has run. */
    protected Dispatcher dispatcher;

    /** Thread running {@link Dispatcher#run()}; {@code null} once {@link #close()} has run. */
    protected Thread thread;

    /**
     * Redis subscriber that reads raw replies from the connection and forwards message payloads as byte arrays.
     * <p>
     * NOTE(review): {@link #proceed} and {@link #proceedWithPatterns} presumably replace the String-decoding read loop
     * of {@link JedisPubSub} so that payloads stay binary; confirm against the Jedis version in use.
     */
    public class Dispatcher extends JedisPubSub {
        /** Channel pattern passed to {@code psubscribe}. */
        protected final String pattern;

        /** Set by {@link #close()}; checked by {@link #processBinary} after each reply is read. */
        protected volatile boolean stop;

        /** Executor used once by {@link #run()}, which then clears this reference. */
        protected RedisExecutor redisExecutor = (RedisExecutor) Framework.getService(RedisExecutor.class);

        /** Released by {@link #onPSubscribe} when a pattern subscription is acknowledged. */
        protected final CountDownLatch subscribedLatch = new CountDownLatch(1);

        /**
         * @param str the channel pattern to subscribe to
         */
        public Dispatcher(String str) {
            this.pattern = str;
        }

        /**
         * Waits until a pattern subscription has been acknowledged.
         *
         * @param j the maximum time to wait
         * @param timeUnit the unit of {@code j}
         * @return {@code true} if the subscription was acknowledged in time, {@code false} on timeout
         * @throws NuxeoException if the waiting thread is interrupted (the interrupt flag is restored first)
         */
        public boolean awaitSubscribed(long j, TimeUnit timeUnit) {
            try {
                return this.subscribedLatch.await(j, timeUnit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new NuxeoException(e);
            }
        }

        /**
         * Subscribes this dispatcher to {@link #pattern} through the Redis executor. The executor reference held by
         * this object is cleared before the call.
         */
        public void run() {
            RedisPubSubProvider.log.debug("Subscribing to: " + this.pattern);
            RedisExecutor executor = this.redisExecutor;
            this.redisExecutor = null;
            executor.psubscribe(this, new String[]{this.pattern});
        }

        /**
         * Requests the read loop to stop. The flag is only examined after a reply has been read, so an empty message
         * is published on the bare namespace channel to give the reader a reply to wake up on.
         */
        public void close() {
            this.stop = true;
            RedisPubSubProvider.this.publish("", new byte[0]);
        }

        /**
         * Marks the subscription as established.
         *
         * @param str the acknowledged pattern
         * @param i the subscription count ({@link #processBinary} always passes 0)
         */
        public void onPSubscribe(String str, int i) {
            this.subscribedLatch.countDown();
            if (RedisPubSubProvider.log.isDebugEnabled()) {
                RedisPubSubProvider.log.debug("Subscribed to: " + str);
            }
        }

        /**
         * Forwards a message to the local subscribers, using the channel name minus the namespace prefix as topic.
         *
         * @param str the full Redis channel name
         * @param bArr the payload; {@code null} is treated as an empty payload
         */
        public void onMessage(String str, byte[] bArr) {
            if (bArr == null) {
                bArr = new byte[0];
            }
            if (RedisPubSubProvider.log.isTraceEnabled()) {
                RedisPubSubProvider.log.trace("Message received from channel: " + str + " (" + bArr.length + " bytes)");
            }
            RedisPubSubProvider.this.localPublish(str.substring(RedisPubSubProvider.this.namespace.length()), bArr);
        }

        /**
         * Handles a pattern message by delegating to {@link #onMessage(String, byte[])}.
         *
         * @param str the matching pattern (unused)
         * @param str2 the channel the message was published on
         * @param bArr the payload
         */
        public void onPMessage(String str, String str2, byte[] bArr) {
            onMessage(str2, bArr);
        }

        /**
         * Sends a channel subscription, flushes the connection and enters the read loop.
         *
         * @param client the Redis client to read from
         * @param strArr the channels to subscribe to
         */
        public void proceed(Client client, String... strArr) {
            client.subscribe(strArr);
            flush(client);
            processBinary(client);
        }

        /**
         * Sends a pattern subscription, flushes the connection and enters the read loop.
         *
         * @param client the Redis client to read from
         * @param strArr the patterns to subscribe to
         */
        public void proceedWithPatterns(Client client, String... strArr) {
            client.psubscribe(strArr);
            flush(client);
            processBinary(client);
        }

        /**
         * Invokes the {@code flush} method declared on {@link Connection} through reflection, after making it
         * accessible.
         *
         * @param client the client whose connection is flushed
         * @throws NuxeoException if the reflective lookup or call fails
         */
        protected void flush(Client client) {
            try {
                Method flushMethod = Connection.class.getDeclaredMethod("flush");
                flushMethod.setAccessible(true);
                flushMethod.invoke(client);
            } catch (ReflectiveOperationException e) {
                throw new NuxeoException(e);
            }
        }

        /**
         * Reads raw multi-bulk replies in a loop and dispatches each one to the matching callback according to its
         * first element. Returns as soon as a reply has been read while {@link #stop} is set; that reply is dropped.
         *
         * @param client the client to read replies from
         * @throws JedisException if the first element of a reply is not a byte array or is not a known keyword
         */
        protected void processBinary(Client client) {
            while (true) {
                List<Object> reply = client.getRawObjectMultiBulkReply();
                if (this.stop) {
                    return;
                }
                Object first = reply.get(0);
                if (!(first instanceof byte[])) {
                    throw new JedisException("Unknown message type: " + first);
                }
                byte[] type = (byte[]) first;
                if (Arrays.equals(Protocol.Keyword.MESSAGE.raw, type)) {
                    onMessage(toString((byte[]) reply.get(1)), (byte[]) reply.get(2));
                } else if (Arrays.equals(Protocol.Keyword.PMESSAGE.raw, type)) {
                    onPMessage(toString((byte[]) reply.get(1)), toString((byte[]) reply.get(2)),
                            (byte[]) reply.get(3));
                } else if (Arrays.equals(Protocol.Keyword.SUBSCRIBE.raw, type)) {
                    // the count carried by the reply is not decoded; 0 is passed to the callbacks
                    onSubscribe(toString((byte[]) reply.get(1)), 0);
                } else if (Arrays.equals(Protocol.Keyword.PSUBSCRIBE.raw, type)) {
                    onPSubscribe(toString((byte[]) reply.get(1)), 0);
                } else if (Arrays.equals(Protocol.Keyword.UNSUBSCRIBE.raw, type)) {
                    onUnsubscribe(toString((byte[]) reply.get(1)), 0);
                } else if (Arrays.equals(Protocol.Keyword.PUNSUBSCRIBE.raw, type)) {
                    onPUnsubscribe(toString((byte[]) reply.get(1)), 0);
                } else {
                    throw new JedisException("Unknown message: " + toString(type));
                }
            }
        }

        /**
         * Decodes bytes into a String with {@link SafeEncoder}.
         *
         * @param bArr the bytes to decode, may be {@code null}
         * @return the decoded String, or {@code null} if {@code bArr} is {@code null}
         */
        protected String toString(byte[] bArr) {
            if (bArr == null) {
                return null;
            }
            return SafeEncoder.encode(bArr);
        }
    }

    /**
     * Initializes the provider: resolves the Redis namespace, starts the listener thread and waits up to
     * {@link #TIMEOUT_SUBSCRIBE_SECONDS} seconds for the subscription to be acknowledged.
     *
     * @param map passed through to the superclass initialization
     * @param map2 passed through to the superclass initialization
     * @throws NuxeoException if the subscription is not acknowledged in time, or if the wait is interrupted
     */
    public void initialize(Map<String, String> map, Map<String, List<BiConsumer<String, byte[]>>> map2) {
        super.initialize(map, map2);
        log.debug("Initializing");
        this.namespace = ((RedisAdmin) Framework.getService(RedisAdmin.class)).namespace(new String[0]);
        Dispatcher listener = new Dispatcher(this.namespace + "*");
        this.dispatcher = listener;
        this.thread = new Thread(listener::run, "Nuxeo-PubSub-Redis");
        this.thread.setUncaughtExceptionHandler((t, error) -> {
            log.error("Uncaught error on thread " + t.getName(), error);
        });
        this.thread.setPriority(Thread.NORM_PRIORITY);
        this.thread.setDaemon(true);
        this.thread.start();
        if (!listener.awaitSubscribed(TIMEOUT_SUBSCRIBE_SECONDS, TimeUnit.SECONDS)) {
            // Flag the dispatcher as stopped too: if the subscription completes late, the read loop then exits on
            // its first reply instead of dispatching messages for a provider that reported failure. The interrupt
            // alone may not unblock a thread waiting on socket I/O.
            listener.stop = true;
            this.thread.interrupt();
            throw new NuxeoException(
                    "Failed to subscribe to Redis pubsub after " + TIMEOUT_SUBSCRIBE_SECONDS + "s");
        }
        log.debug("Initialized");
    }

    /**
     * Stops the listener thread and releases the dispatcher. Does nothing beyond logging when there is no
     * dispatcher, so the method can be called repeatedly.
     */
    public void close() {
        log.debug("Closing");
        Dispatcher listener = this.dispatcher;
        if (listener != null) {
            Thread listenerThread = this.thread;
            // Clear both fields before talking to Redis so that a failure while publishing the wake-up message
            // cannot leave a half-closed state (dispatcher set, thread null) that would break a later close().
            this.thread = null;
            this.dispatcher = null;
            if (listenerThread != null) {
                listenerThread.interrupt();
            }
            listener.close();
        }
        log.debug("Closed");
    }

    /**
     * Publishes a message on the Redis channel {@code namespace + str}. Nothing is sent when no
     * {@link RedisExecutor} service is available.
     *
     * @param str the topic, appended to the namespace to form the channel name
     * @param bArr the message payload
     */
    public void publish(String str, byte[] bArr) {
        byte[] channel = SafeEncoder.encode(this.namespace + str);
        RedisExecutor redisExecutor = (RedisExecutor) Framework.getService(RedisExecutor.class);
        if (redisExecutor != null) {
            redisExecutor.execute(jedis -> {
                return jedis.publish(channel, bArr);
            });
        }
    }
}
