package org.nuxeo.ecm.core.redis.contribs;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.model.Repository;
import org.nuxeo.ecm.core.redis.RedisAdmin;
import org.nuxeo.ecm.core.redis.RedisExecutor;
import org.nuxeo.ecm.core.storage.dbs.DBSClusterInvalidator;
import org.nuxeo.ecm.core.storage.dbs.DBSInvalidations;
import org.nuxeo.runtime.api.Framework;
import redis.clients.jedis.JedisPubSub;

/* loaded from: input_file:org/nuxeo/ecm/core/redis/contribs/RedisDBSClusterInvalidator.class */
/**
 * {@link DBSClusterInvalidator} implementation that exchanges DBS invalidations between nodes through a Redis
 * pub/sub channel.
 * <p>
 * On {@link #initialize(String, Repository)} the instance loads two Redis scripts, starts a dedicated thread that
 * subscribes to the invalidation channel of the repository, and registers the node in Redis. Messages received on the
 * channel are accumulated in {@link #receivedInvals} until they are drained by {@link #receiveInvalidations()}.
 * <p>
 * Thread-safety: every access to {@link #receivedInvals} (accumulation by the subscriber thread, draining, clearing)
 * is done while holding the monitor of this instance.
 */
public class RedisDBSClusterInvalidator implements DBSClusterInvalidator {
    /** Prefix handed to {@link RedisAdmin#namespace} to build the Redis namespace of this invalidator. */
    protected static final String PREFIX = "inval";
    /** Suffix appended to the namespace to build the pub/sub channel name. */
    protected static final String INVALIDATION_CHANNEL = "channel";
    /** Suffix appended to the namespace (followed by {@code ":" + nodeId}) to build the node key. */
    protected static final String CLUSTER_NODES_KEY = "nodes";
    /**
     * Value passed as last argument to the register and send scripts. NOTE(review): presumably the expiration of the
     * node key in seconds (24h); the scripts are not visible from this class, to be confirmed.
     */
    protected static final int TIMEOUT_REGISTER_SECOND = 86400;
    /** Maximum time, in seconds, to wait for the subscriber thread to be subscribed to the channel. */
    protected static final long TIMEOUT_SUBSCRIBE_SECOND = 10;
    /** Field name sent to the scripts together with the node start date. */
    protected static final String STARTED_FIELD = "started";
    /** Field name sent to the send script together with the date of the invalidation being sent. */
    protected static final String LAST_INVAL_FIELD = "lastInvalSent";
    protected String nodeId;
    protected String repositoryName;
    protected RedisExecutor redisExecutor;
    /** Invalidations received from the channel and not drained yet; guarded by the monitor of this instance. */
    protected DBSInvalidations receivedInvals;
    protected Thread subscriberThread;
    protected String namespace;
    /** Date captured by {@link #registerNode()}, re-sent with each invalidation. */
    protected String startedDateTime;
    private static final Log log = LogFactory.getLog(RedisDBSClusterInvalidator.class);
    /** Released by the subscriber thread once the channel subscription is confirmed. */
    private CountDownLatch subscribeLatch;
    /** Handle returned by {@link RedisAdmin#load} for the "register-node-inval" script. */
    private String registerSha;
    /** Handle returned by {@link RedisAdmin#load} for the "send-inval" script. */
    private String sendSha;

    /**
     * Initializes this invalidator: resolves the Redis services, loads the scripts, starts the subscriber thread and
     * registers the node.
     *
     * @param str the id of the local cluster node
     * @param repository the repository whose name is used to build the Redis namespace
     * @throws RuntimeException wrapping the {@link IOException} raised when a script cannot be loaded
     */
    public void initialize(String str, Repository repository) {
        this.nodeId = str;
        this.repositoryName = repository.getName();
        this.redisExecutor = (RedisExecutor) Framework.getService(RedisExecutor.class);
        RedisAdmin redisAdmin = (RedisAdmin) Framework.getService(RedisAdmin.class);
        this.namespace = redisAdmin.namespace(PREFIX, this.repositoryName);
        try {
            this.registerSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "register-node-inval");
            this.sendSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "send-inval");
            // The buffer must exist before the subscriber thread starts, as onMessage adds to it.
            this.receivedInvals = new DBSInvalidations();
            createSubscriberThread();
            registerNode();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts the thread running {@link #subscribeToInvalidationChannel()} and waits up to
     * {@link #TIMEOUT_SUBSCRIBE_SECOND} seconds for the subscription to be confirmed.
     * <p>
     * A timeout is only logged; initialization continues.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting (its interrupt flag is restored)
     */
    protected void createSubscriberThread() {
        // The latch is assigned before the thread is started so that the subscriber thread sees it.
        this.subscribeLatch = new CountDownLatch(1);
        String threadName = "RedisDBSClusterInvalidatorSubscriber:" + this.repositoryName + ":" + this.nodeId;
        this.subscriberThread = new Thread(this::subscribeToInvalidationChannel, threadName);
        this.subscriberThread.setUncaughtExceptionHandler((thread, th) -> {
            log.error("Uncaught error on thread " + thread.getName(), th);
        });
        this.subscriberThread.setPriority(Thread.NORM_PRIORITY);
        this.subscriberThread.start();
        try {
            if (!this.subscribeLatch.await(TIMEOUT_SUBSCRIBE_SECOND, TimeUnit.SECONDS)) {
                log.error("Redis channel subscription timeout after " + TIMEOUT_SUBSCRIBE_SECOND
                        + "s, continuing but this node may not receive cluster invalidations");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Subscribes to the invalidation channel through the {@link RedisExecutor}; this is the body of the subscriber
     * thread.
     * <p>
     * Each received message is parsed into a {@code RedisDBSInvalidations} and the resulting invalidations are added
     * to {@link #receivedInvals}. A message rejected with an {@link IllegalArgumentException} is logged and skipped.
     */
    protected void subscribeToInvalidationChannel() {
        log.info("Subscribing to channel: " + getChannelName());
        this.redisExecutor.execute(jedis -> {
            jedis.subscribe(new JedisPubSub() {
                public void onSubscribe(String str, int i) {
                    super.onSubscribe(str, i);
                    // Unblock createSubscriberThread(), which waits for the subscription to be effective.
                    if (RedisDBSClusterInvalidator.this.subscribeLatch != null) {
                        RedisDBSClusterInvalidator.this.subscribeLatch.countDown();
                    }
                    if (RedisDBSClusterInvalidator.log.isDebugEnabled()) {
                        RedisDBSClusterInvalidator.log.debug("Subscribed to channel: " + RedisDBSClusterInvalidator.this.getChannelName());
                    }
                }

                public void onMessage(String str, String str2) {
                    try {
                        RedisDBSInvalidations redisDBSInvalidations = new RedisDBSInvalidations(RedisDBSClusterInvalidator.this.nodeId, str2);
                        if (RedisDBSClusterInvalidator.log.isTraceEnabled()) {
                            RedisDBSClusterInvalidator.log.trace("Receive invalidations: " + redisDBSInvalidations);
                        }
                        DBSInvalidations invalidations = redisDBSInvalidations.getInvalidations();
                        // Same monitor as receiveInvalidations() and close(), which swap/clear the buffer.
                        synchronized (RedisDBSClusterInvalidator.this) {
                            RedisDBSClusterInvalidator.this.receivedInvals.add(invalidations);
                        }
                    } catch (IllegalArgumentException e) {
                        RedisDBSClusterInvalidator.log.error("Fail to read message: " + str2, e);
                    }
                }
            }, new String[]{getChannelName()});
            return null;
        });
    }

    /**
     * @return the name of the pub/sub channel: the namespace followed by {@link #INVALIDATION_CHANNEL}
     */
    protected String getChannelName() {
        return this.namespace + INVALIDATION_CHANNEL;
    }

    /**
     * Records the start date of this node and runs the register script with the node key as only key and
     * ({@link #STARTED_FIELD}, start date, {@link #TIMEOUT_REGISTER_SECOND}) as arguments.
     */
    protected void registerNode() {
        this.startedDateTime = getCurrentDateTime();
        List<String> keys = Collections.singletonList(getNodeKey());
        List<String> args = Arrays.asList(STARTED_FIELD, this.startedDateTime,
                Integer.toString(TIMEOUT_REGISTER_SECOND));
        if (log.isDebugEnabled()) {
            log.debug("Registering node: " + this.nodeId);
        }
        this.redisExecutor.evalsha(this.registerSha, keys, args);
        if (log.isInfoEnabled()) {
            log.info("Node registered: " + this.nodeId);
        }
    }

    /**
     * @return the Redis key of this node: namespace, {@link #CLUSTER_NODES_KEY}, {@code ":"} and the node id
     */
    protected String getNodeKey() {
        return this.namespace + CLUSTER_NODES_KEY + ":" + this.nodeId;
    }

    /**
     * Interrupts the subscriber thread and discards the invalidations received so far.
     * <p>
     * Safe to call when {@link #initialize(String, Repository)} failed before the buffer or the thread were created.
     */
    public void close() {
        log.debug("Closing");
        unsubscribeToInvalidationChannel();
        // The subscriber thread may still be delivering a message: clear under the same monitor as onMessage.
        synchronized (this) {
            if (this.receivedInvals != null) {
                this.receivedInvals.clear();
            }
        }
    }

    /**
     * Interrupts the subscriber thread, if it was created.
     * <p>
     * NOTE(review): this only sets the interrupt flag of the thread; whether the blocking Redis subscription actually
     * returns depends on the Jedis/executor implementation, which is not visible here.
     */
    protected void unsubscribeToInvalidationChannel() {
        if (this.subscriberThread != null) {
            this.subscriberThread.interrupt();
        }
    }

    /**
     * Drains the invalidations received since the previous call.
     *
     * @return the accumulated invalidations; the internal buffer is replaced by a fresh empty instance
     */
    public DBSInvalidations receiveInvalidations() {
        DBSInvalidations fresh = new DBSInvalidations();
        DBSInvalidations drained;
        synchronized (this) {
            drained = this.receivedInvals;
            this.receivedInvals = fresh;
        }
        return drained;
    }

    /**
     * Serializes the given invalidations and runs the send script with the channel name and node key as keys.
     *
     * @param dBSInvalidations the invalidations to send to the other nodes
     */
    public void sendInvalidations(DBSInvalidations dBSInvalidations) {
        RedisDBSInvalidations redisDBSInvalidations = new RedisDBSInvalidations(this.nodeId, dBSInvalidations);
        if (log.isTraceEnabled()) {
            log.trace("Sending invalidations: " + redisDBSInvalidations);
        }
        List<String> keys = Arrays.asList(getChannelName(), getNodeKey());
        List<String> args = Arrays.asList(redisDBSInvalidations.serialize(), STARTED_FIELD, this.startedDateTime,
                LAST_INVAL_FIELD, getCurrentDateTime(), Integer.toString(TIMEOUT_REGISTER_SECOND));
        this.redisExecutor.evalsha(this.sendSha, keys, args);
        log.trace("invals sent");
    }

    /**
     * @return the current local date-time in the format produced by {@link LocalDateTime#toString()}
     */
    protected String getCurrentDateTime() {
        return LocalDateTime.now().toString();
    }
}
