package org.nuxeo.ecm.core.redis.contribs;

import java.io.IOException;
import java.time.LocalDateTime;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.redis.RedisAdmin;
import org.nuxeo.ecm.core.redis.RedisExecutor;
import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
import org.nuxeo.ecm.core.storage.sql.Invalidations;
import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
import org.nuxeo.runtime.api.Framework;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Pipeline;

/* loaded from: input_file:org/nuxeo/ecm/core/redis/contribs/RedisClusterInvalidator.class */
public class RedisClusterInvalidator implements ClusterInvalidator {
    protected static final String PREFIX = "inval";
    protected static final String INVALIDATION_CHANNEL = "channel";
    protected static final String CLUSTER_NODES_KEY = "nodes";
    protected static final int TIMEOUT_REGISTER_SECOND = 86400;
    protected static final String STARTED_KEY = "started";
    protected static final String LAST_INVAL_KEY = "lastInvalSent";
    protected String nodeId;
    protected String repositoryName;
    protected RedisExecutor redisExecutor;
    protected Invalidations receivedInvals;
    protected Thread subscriberThread;
    protected String namespace;
    protected String startedDateTime;
    private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class);

    public void initialize(String str, RepositoryImpl repositoryImpl) {
        this.nodeId = str;
        this.repositoryName = repositoryImpl.getName();
        this.redisExecutor = (RedisExecutor) Framework.getLocalService(RedisExecutor.class);
        this.namespace = ((RedisAdmin) Framework.getService(RedisAdmin.class)).namespace(PREFIX, this.repositoryName);
        this.receivedInvals = new Invalidations();
        createSubscriberThread();
        registerNode();
    }

    protected void createSubscriberThread() {
        this.subscriberThread = new Thread(this::subscribeToInvalidationChannel, "RedisClusterInvalidatorSubscriber:" + this.repositoryName + ":" + this.nodeId);
        this.subscriberThread.setUncaughtExceptionHandler((thread, th) -> {
            log.error("Uncaught error on thread " + thread.getName(), th);
        });
        this.subscriberThread.setPriority(5);
        this.subscriberThread.start();
    }

    protected void subscribeToInvalidationChannel() {
        log.info("Subscribe to channel: " + getChannelName());
        this.redisExecutor.execute(jedis -> {
            jedis.subscribe(new JedisPubSub() { // from class: org.nuxeo.ecm.core.redis.contribs.RedisClusterInvalidator.1
                public void onMessage(String str, String str2) {
                    try {
                        RedisInvalidations redisInvalidations = new RedisInvalidations(RedisClusterInvalidator.this.nodeId, str2);
                        if (RedisClusterInvalidator.log.isTraceEnabled()) {
                            RedisClusterInvalidator.log.trace("Receive invalidations: " + redisInvalidations);
                        }
                        Invalidations invalidations = redisInvalidations.getInvalidations();
                        synchronized (RedisClusterInvalidator.this) {
                            RedisClusterInvalidator.this.receivedInvals.add(invalidations);
                        }
                    } catch (IllegalArgumentException e) {
                        RedisClusterInvalidator.log.error("Fail to read message: " + str2, e);
                    }
                }
            }, new String[]{getChannelName()});
            return null;
        });
    }

    protected String getChannelName() {
        return this.namespace + INVALIDATION_CHANNEL;
    }

    protected void registerNode() {
        this.startedDateTime = getCurrentDateTime();
        log.info("Registering node: " + this.nodeId);
        this.redisExecutor.execute(jedis -> {
            String nodeKey = getNodeKey();
            Pipeline pipelined = jedis.pipelined();
            pipelined.hset(nodeKey, STARTED_KEY, this.startedDateTime);
            pipelined.expire(nodeKey, TIMEOUT_REGISTER_SECOND);
            pipelined.sync();
            return null;
        });
    }

    protected String getNodeKey() {
        return this.namespace + CLUSTER_NODES_KEY + ":" + this.nodeId;
    }

    public void close() {
        log.debug("Closing");
        unsubscribeToInvalidationChannel();
        this.receivedInvals.clear();
    }

    protected void unsubscribeToInvalidationChannel() {
        this.subscriberThread.interrupt();
    }

    public Invalidations receiveInvalidations() {
        Invalidations invalidations;
        Invalidations invalidations2 = new Invalidations();
        synchronized (this) {
            invalidations = this.receivedInvals;
            this.receivedInvals = invalidations2;
        }
        return invalidations;
    }

    public void sendInvalidations(Invalidations invalidations) {
        this.redisExecutor.execute(jedis -> {
            RedisInvalidations redisInvalidations = new RedisInvalidations(this.nodeId, invalidations);
            if (log.isTraceEnabled()) {
                log.trace("Sending invalidations: " + redisInvalidations);
            }
            String nodeKey = getNodeKey();
            try {
                Pipeline pipelined = jedis.pipelined();
                pipelined.publish(getChannelName(), redisInvalidations.serialize());
                pipelined.hset(nodeKey, STARTED_KEY, this.startedDateTime);
                pipelined.hset(nodeKey, LAST_INVAL_KEY, getCurrentDateTime());
                pipelined.expire(nodeKey, TIMEOUT_REGISTER_SECOND);
                pipelined.sync();
                return null;
            } catch (IOException e) {
                throw new NuxeoException(e);
            }
        });
    }

    protected String getCurrentDateTime() {
        return LocalDateTime.now().toString();
    }
}
