package org.nuxeo.ecm.core.redis.contribs;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.redis.RedisAdmin;
import org.nuxeo.ecm.core.redis.RedisExecutor;
import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
import org.nuxeo.ecm.core.storage.sql.Invalidations;
import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
import org.nuxeo.runtime.api.Framework;
import redis.clients.jedis.JedisPubSub;

/**
 * {@link ClusterInvalidator} implementation that exchanges VCS cache invalidations between cluster nodes
 * through a Redis pub/sub channel.
 * <p>
 * On {@link #initialize(String, RepositoryImpl)} the node loads two server-side scripts, starts a dedicated
 * thread subscribed to the invalidation channel and registers itself in Redis. Invalidations received on the
 * channel are accumulated in {@link #receivedInvals} until {@link #receiveInvalidations()} hands them over.
 * <p>
 * All accesses to {@link #receivedInvals} are guarded by this instance's monitor because the subscriber thread
 * and the callers of {@link #receiveInvalidations()} / {@link #close()} run concurrently.
 */
public class RedisClusterInvalidator implements ClusterInvalidator {

    /** Prefix handed to {@link RedisAdmin#namespace} to build this invalidator's key namespace. */
    protected static final String PREFIX = "inval";

    /** Suffix appended to the namespace to form the pub/sub channel name. */
    protected static final String INVALIDATION_CHANNEL = "channel";

    /** Suffix appended to the namespace (followed by {@code :nodeId}) to form the per-node key. */
    protected static final String CLUSTER_NODES_KEY = "nodes";

    /**
     * Timeout in seconds (24h) passed as last argument to both Redis scripts; presumably the expiration of the
     * node key -- the scripts themselves are not part of this class.
     */
    protected static final int TIMEOUT_REGISTER_SECOND = 86400;

    /** Maximum time, in seconds, to wait for the subscriber thread to confirm its channel subscription. */
    protected static final long TIMEOUT_SUBSCRIBE_SECOND = 10;

    /** Name of the field carrying the node start time in the script arguments. */
    protected static final String STARTED_FIELD = "started";

    /** Name of the field carrying the time of the last sent invalidation in the script arguments. */
    protected static final String LAST_INVAL_FIELD = "lastInvalSent";

    private static final Log log = LogFactory.getLog(RedisClusterInvalidator.class);

    protected String nodeId;

    protected String repositoryName;

    protected RedisExecutor redisExecutor;

    /** Invalidations received since the last {@link #receiveInvalidations()} call; guarded by {@code this}. */
    protected Invalidations receivedInvals;

    protected Thread subscriberThread;

    protected String namespace;

    /** Time at which this node registered itself, as produced by {@link #getCurrentDateTime()}. */
    protected String startedDateTime;

    /** Released by the subscriber thread once Redis has acknowledged the channel subscription. */
    private CountDownLatch subscribeLatch;

    private String registerSha;

    private String sendSha;

    /**
     * Sets up the invalidator: resolves the Redis services, loads the scripts, starts the subscriber thread and
     * registers this node.
     *
     * @param str the identifier of this cluster node
     * @param repositoryImpl the repository whose invalidations are propagated
     * @throws NuxeoException if a script cannot be loaded or the calling thread is interrupted while waiting
     *             for the subscription
     */
    public void initialize(String str, RepositoryImpl repositoryImpl) {
        this.nodeId = str;
        this.repositoryName = repositoryImpl.getName();
        this.redisExecutor = (RedisExecutor) Framework.getLocalService(RedisExecutor.class);
        RedisAdmin redisAdmin = (RedisAdmin) Framework.getService(RedisAdmin.class);
        this.namespace = redisAdmin.namespace(PREFIX, this.repositoryName);
        try {
            this.registerSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "register-node-inval");
            this.sendSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "send-inval");
            // Must exist before the subscriber thread starts, since onMessage adds to it.
            this.receivedInvals = new Invalidations();
            createSubscriberThread();
            registerNode();
        } catch (IOException e) {
            // Same wrapping as sendInvalidations; NuxeoException is already what callers get from there.
            throw new NuxeoException(e);
        }
    }

    /**
     * Starts the subscriber thread and waits up to {@link #TIMEOUT_SUBSCRIBE_SECOND} seconds for the
     * subscription to be acknowledged. A timeout is only logged: initialization continues.
     *
     * @throws NuxeoException if the calling thread is interrupted while waiting (interrupt flag is restored)
     */
    protected void createSubscriberThread() {
        this.subscribeLatch = new CountDownLatch(1);
        String threadName = "RedisClusterInvalidatorSubscriber:" + this.repositoryName + ":" + this.nodeId;
        this.subscriberThread = new Thread(this::subscribeToInvalidationChannel, threadName);
        this.subscriberThread.setUncaughtExceptionHandler(
                (thread, th) -> log.error("Uncaught error on thread " + thread.getName(), th));
        this.subscriberThread.setPriority(Thread.NORM_PRIORITY);
        this.subscriberThread.start();
        try {
            if (!this.subscribeLatch.await(TIMEOUT_SUBSCRIBE_SECOND, TimeUnit.SECONDS)) {
                log.error("Redis channel subscripion timeout after " + TIMEOUT_SUBSCRIBE_SECOND
                        + "s, continuing but this node may not receive cluster invalidations");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        }
    }

    /**
     * Body of the subscriber thread: subscribes to the invalidation channel and accumulates every decoded
     * message into {@link #receivedInvals}. The {@code jedis.subscribe} call is made inside the executor
     * callback and this method returns only after the executor call returns.
     */
    protected void subscribeToInvalidationChannel() {
        String channel = getChannelName();
        log.info("Subscribing to channel: " + channel);
        this.redisExecutor.execute(jedis -> {
            jedis.subscribe(new JedisPubSub() {
                public void onSubscribe(String str, int i) {
                    super.onSubscribe(str, i);
                    // Unblocks createSubscriberThread(), which is waiting for this acknowledgement.
                    subscribeLatch.countDown();
                    log.debug("Subscribed to channel: " + channel);
                }

                public void onMessage(String str, String str2) {
                    try {
                        RedisInvalidations redisInvalidations = new RedisInvalidations(nodeId, str2);
                        if (log.isTraceEnabled()) {
                            log.trace("Receive invalidations: " + redisInvalidations);
                        }
                        Invalidations invalidations = redisInvalidations.getInvalidations();
                        synchronized (RedisClusterInvalidator.this) {
                            receivedInvals.add(invalidations);
                        }
                    } catch (IllegalArgumentException e) {
                        // A malformed message must not kill the subscription: log and keep listening.
                        log.error("Fail to read message: " + str2, e);
                    }
                }
            }, channel);
            return null;
        });
    }

    /**
     * @return the name of the pub/sub channel used for this repository's invalidations
     */
    protected String getChannelName() {
        return this.namespace + INVALIDATION_CHANNEL;
    }

    /**
     * Registers this node in Redis by running the register script with the node key, and records the
     * registration time in {@link #startedDateTime} so that later sends report the same start time.
     */
    protected void registerNode() {
        this.startedDateTime = getCurrentDateTime();
        List<String> keys = Arrays.asList(getNodeKey());
        List<String> args = Arrays.asList(STARTED_FIELD, this.startedDateTime,
                Integer.toString(TIMEOUT_REGISTER_SECOND));
        log.debug("Registering node: " + this.nodeId);
        this.redisExecutor.execute(jedis -> {
            jedis.evalsha(this.registerSha, keys, args);
            log.info("Node registered: " + this.nodeId);
            return null;
        });
    }

    /**
     * @return the Redis key under which this node's information is stored
     */
    protected String getNodeKey() {
        return this.namespace + CLUSTER_NODES_KEY + ":" + this.nodeId;
    }

    /**
     * Stops listening for invalidations and discards the ones not yet consumed. Safe to call even if
     * {@link #initialize(String, RepositoryImpl)} did not complete.
     */
    public void close() {
        log.debug("Closing");
        unsubscribeToInvalidationChannel();
        // Same monitor as onMessage/receiveInvalidations: the subscriber thread may still be delivering.
        synchronized (this) {
            if (this.receivedInvals != null) {
                this.receivedInvals.clear();
            }
        }
    }

    /**
     * Interrupts the subscriber thread, if one was started.
     * <p>
     * NOTE(review): this only sets the thread's interrupt status; whether that is enough to make the blocking
     * Jedis {@code subscribe} call return depends on the Jedis/RedisExecutor implementation -- confirm, and
     * consider keeping the {@code JedisPubSub} instance to call {@code unsubscribe()} on it.
     */
    protected void unsubscribeToInvalidationChannel() {
        if (this.subscriberThread != null) {
            this.subscriberThread.interrupt();
        }
    }

    /**
     * Returns the invalidations accumulated since the previous call and starts a new, empty accumulation.
     *
     * @return the invalidations received so far; ownership is transferred to the caller
     */
    public Invalidations receiveInvalidations() {
        Invalidations fresh = new Invalidations();
        Invalidations pending;
        synchronized (this) {
            pending = this.receivedInvals;
            this.receivedInvals = fresh;
        }
        return pending;
    }

    /**
     * Publishes the given invalidations to the other nodes by running the send script with the channel name
     * and node key, the serialized invalidations, this node's start time and the current time as last send
     * time.
     *
     * @param invalidations the invalidations to propagate
     * @throws NuxeoException if the invalidations cannot be serialized
     */
    public void sendInvalidations(Invalidations invalidations) {
        RedisInvalidations redisInvalidations = new RedisInvalidations(this.nodeId, invalidations);
        if (log.isTraceEnabled()) {
            log.trace("Sending invalidations: " + redisInvalidations);
        }
        // Report the registration time as "started"; fall back to now only if registration never recorded
        // one (e.g. a subclass overriding registerNode), which matches the previous behavior.
        String started = this.startedDateTime != null ? this.startedDateTime : getCurrentDateTime();
        List<String> keys = Arrays.asList(getChannelName(), getNodeKey());
        try {
            List<String> args = Arrays.asList(redisInvalidations.serialize(), STARTED_FIELD, started,
                    LAST_INVAL_FIELD, getCurrentDateTime(), Integer.toString(TIMEOUT_REGISTER_SECOND));
            this.redisExecutor.execute(jedis -> {
                jedis.evalsha(this.sendSha, keys, args);
                if (log.isTraceEnabled()) {
                    log.trace("invals sent");
                }
                return null;
            });
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * @return the current local date-time in the ISO-8601 form produced by {@link LocalDateTime#toString()}
     */
    protected String getCurrentDateTime() {
        return LocalDateTime.now().toString();
    }
}
