package com.atlassian.jira.cluster.distribution.localq;

import com.atlassian.annotations.ExperimentalApi;
import com.atlassian.event.api.EventListener;
import com.atlassian.event.api.EventPublisher;
import com.atlassian.jira.cluster.ClusterManager;
import com.atlassian.jira.cluster.ClusterNodes;
import com.atlassian.jira.cluster.Node;
import com.atlassian.jira.cluster.NodeChangedEvent;
import com.atlassian.jira.cluster.NodeJoinedClusterEvent;
import com.atlassian.jira.cluster.NodeRemovedFromClusterEvent;
import com.atlassian.jira.cluster.distribution.localq.LocalQCacheOpQueue;
import com.atlassian.jira.cluster.distribution.localq.LocalQCacheOpQueueWithStats;
import com.atlassian.jira.cluster.distribution.localq.tape.TapeSerializationContext;
import com.atlassian.plugin.event.events.PluginFrameworkShutdownEvent;
import com.atlassian.plugin.event.events.PluginFrameworkStartingEvent;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/atlassian/jira/cluster/distribution/localq/LocalQCacheManager.class */
public class LocalQCacheManager {
    private final ClusterNodes clusterNodes;
    private final Supplier<Set<Node>> liveNodesSupplier;
    private final LocalQCacheOpSender localQCacheOpSender;
    private final LocalQCacheOpQueueFactory localQCacheOpQueueFactory;
    private final LocalQCriticalHandler criticalHandler;
    private final EventPublisher eventPublisher;
    private static final Logger LOG = LoggerFactory.getLogger(LocalQCacheManager.class);
    private static ImmutableSet<Status> STATUS_SHUTTING_DOWN = ImmutableSet.of(Status.STOPPING, Status.STOPPED);
    private final Set<Node.NodeState> nodeStatesWithActiveQueues = ImmutableSet.of(Node.NodeState.ACTIVATING, Node.NodeState.ACTIVE);
    private final ConcurrentHashMap<LocalQCacheOpQueue.QueueId, LocalQCacheOpQueueWithStats> queuesByQueueId = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<LocalQCacheOpQueue.QueueId, Future<?>> queueReaderByQueueId = new ConcurrentHashMap<>();
    private final ExecutorService executorReaders = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("localq-reader-%d").build());
    private final ScheduledExecutorService executorStats = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("localq-stats-%d").build());
    private final AtomicReference<Status> status = new AtomicReference<>(Status.CREATED);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/atlassian/jira/cluster/distribution/localq/LocalQCacheManager$Status.class */
    public enum Status {
        CREATED,
        STARTING,
        STARTED,
        STOPPING,
        STOPPED
    }

    public LocalQCacheManager(ClusterNodes clusterNodes, ClusterManager clusterManager, LocalQCacheOpSender localQCacheOpSender, LocalQCacheOpQueueFactory localQCacheOpQueueFactory, LocalQCriticalHandlerFactory localQCriticalHandlerFactory, EventPublisher eventPublisher) {
        this.clusterNodes = clusterNodes;
        this.localQCacheOpSender = localQCacheOpSender;
        this.localQCacheOpQueueFactory = localQCacheOpQueueFactory;
        this.criticalHandler = localQCriticalHandlerFactory.create();
        this.eventPublisher = eventPublisher;
        this.liveNodesSupplier = () -> {
            return ImmutableSet.copyOf(clusterManager.findLiveNodes());
        };
        this.eventPublisher.register(this);
    }

    @EventListener
    public void onPluginFrameworkShutdownEvent(PluginFrameworkShutdownEvent pluginFrameworkShutdownEvent) {
        stop();
    }

    @EventListener
    public void onPluginFrameworkStartingEvent(PluginFrameworkStartingEvent pluginFrameworkStartingEvent) {
        start();
    }

    public void start() {
        this.status.set(Status.STARTING);
        LOG.info(LogPrefix.prefix() + "Starting {}...", LocalQCacheManager.class.getSimpleName());
        init();
        LOG.info(LogPrefix.prefix() + "Done starting {}.", LocalQCacheManager.class.getSimpleName());
        this.status.set(Status.STARTED);
        logQStats("onStart");
    }

    public void stop() {
        logQStats("onStop");
        this.status.set(Status.STOPPING);
        LOG.info(LogPrefix.prefix() + "Stopping {}...", LocalQCacheManager.class.getSimpleName());
        destroy();
        LOG.info(LogPrefix.prefix() + "Done stopping {}.", LocalQCacheManager.class.getSimpleName());
        this.status.set(Status.STOPPED);
    }

    void init() {
        if (!this.clusterNodes.current().isClustered()) {
            LOG.warn(LogPrefix.prefix() + "This node is not a cluster node. Not initializing: {}", LocalQCacheManager.class.getSimpleName());
            return;
        }
        Iterator<Node> it = getNodesWhichShouldHaveActiveQueues(false).iterator();
        while (it.hasNext()) {
            initOrGetAllQueues(it.next());
        }
        if (this.queuesByQueueId.isEmpty()) {
            LOG.info(LogPrefix.prefix() + "Currently there are no other nodes in cluster. Not creating any cache replication queues.");
        }
        long statsLoggingIntervalSeconds = LocalQConfig.statsLoggingIntervalSeconds();
        LOG.info(LogPrefix.prefix() + "stats will be running every: {} seconds", Long.valueOf(statsLoggingIntervalSeconds));
        this.executorStats.scheduleAtFixedRate(() -> {
            logQStats("scheduled");
        }, statsLoggingIntervalSeconds, statsLoggingIntervalSeconds, TimeUnit.SECONDS);
    }

    @ExperimentalApi
    public Set<LocalQCacheOpQueueWithStats.QueueStats> queueStatsTotal() {
        return (Set) this.queuesByQueueId.values().stream().map((v0) -> {
            return v0.statsTotal();
        }).collect(Collectors.toSet());
    }

    @ExperimentalApi
    public Set<LocalQCacheOpQueueWithStats.QueueStats> queueStatsTotalMerged() {
        Map map = (Map) queueStatsTotal().stream().collect(Collectors.groupingBy(queueStats -> {
            return queueStats.nodeId;
        }));
        return new HashSet(((Map) map.keySet().stream().collect(Collectors.toMap(str -> {
            return str;
        }, str2 -> {
            return LocalQCacheOpQueueWithStats.QueueStats.merge(str2, (List) map.get(str2));
        }))).values());
    }

    private synchronized void logQStats(String str) {
        try {
            ImmutableSet copyOf = ImmutableSet.copyOf(this.queuesByQueueId.values());
            LOG.info(LogPrefix.prefix() + "[{}] Running cache replication queue stats for: {} queues...", str, Integer.valueOf(this.queuesByQueueId.size()));
            LocalQStatsUtil.logStats(LOG, copyOf, false);
            LocalQStatsUtil.logStats(LOG, copyOf, true);
            LOG.info(LogPrefix.prefix() + "[{}] ... done running cache replication queue stats for: {} queues.", str, Integer.valueOf(this.queuesByQueueId.size()));
        } catch (Throwable th) {
            LOG.error(LogPrefix.prefix() + "Error occurred in cache replication queue stats job: {}, error: {}", new Object[]{Thread.currentThread().getName(), th.getMessage(), th});
        }
    }

    void destroy() {
        this.eventPublisher.unregister(this);
        this.executorStats.shutdownNow();
        try {
            this.executorStats.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.executorReaders.shutdownNow();
        try {
            this.executorReaders.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
        }
        Iterator<LocalQCacheOpQueueWithStats> it = this.queuesByQueueId.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    private Set<Node> getNodesWhichShouldHaveActiveQueues(boolean z) {
        return z ? (Set) ((Set) this.liveNodesSupplier.get()).stream().filter(nodeShouldHaveActiveQueue()).collect(Collectors.toSet()) : (Set) this.clusterNodes.all().stream().filter(nodeShouldHaveActiveQueue()).collect(Collectors.toSet());
    }

    private Predicate<Node> nodeShouldHaveActiveQueue() {
        return node -> {
            return node.isClustered() && !isNodeCurrentNode(node) && this.nodeStatesWithActiveQueues.contains(node.getState());
        };
    }

    private boolean isNodeCurrentNode(Node node) {
        return Objects.equals(this.clusterNodes.current().getNodeId(), node.getNodeId());
    }

    @EventListener
    public void onNodeChangedEvent(NodeChangedEvent nodeChangedEvent) {
        Node newState = nodeChangedEvent.getNewState();
        if (newState != null) {
            if (this.nodeStatesWithActiveQueues.contains(newState.getState())) {
                onNodeAdded(newState);
            } else {
                onNodeRemoved(newState);
            }
        }
    }

    @EventListener
    public void onNodeJoinedClusterEvent(NodeJoinedClusterEvent nodeJoinedClusterEvent) {
        onNodeAdded(nodeJoinedClusterEvent.getNode());
    }

    @EventListener
    public void onNodeRemovedFromClusterEvent(NodeRemovedFromClusterEvent nodeRemovedFromClusterEvent) {
        onNodeRemoved(nodeRemovedFromClusterEvent.getNode());
    }

    void onNodeAdded(Node node) {
        if (node == null || !node.isClustered() || node.getNodeId() == null || isNodeCurrentNode(node)) {
            return;
        }
        initOrGetAllQueues(node);
    }

    void onNodeRemoved(Node node) {
        if (node == null || node.getNodeId() == null || isNodeCurrentNode(node)) {
            return;
        }
        closeAllQueues(node.getNodeId());
    }

    private void validateClusteredNode(Node node) {
        Preconditions.checkNotNull(node);
        Preconditions.checkArgument(node.isClustered());
        Preconditions.checkNotNull(node.getNodeId());
    }

    private void forEachNodeQueueNumber(Consumer<Integer> consumer) {
        for (int i = 0; i < 10; i++) {
            consumer.accept(Integer.valueOf(i));
        }
    }

    private void initOrGetAllQueues(Node node) {
        forEachNodeQueueNumber(num -> {
            initOrGetQueue(node, num.intValue(), false);
        });
        forEachNodeQueueNumber(num2 -> {
            initOrGetQueue(node, num2.intValue(), true);
        });
    }

    private LocalQCacheOpQueueWithStats initOrGetQueueForCurrentThread(Node node, boolean z) {
        return initOrGetQueue(node, LocalQCacheOpQueue.nodeQueueNumberForCurrentThread(), z);
    }

    private LocalQCacheOpQueueWithStats initOrGetQueue(Node node, int i, boolean z) {
        validateClusteredNode(node);
        Preconditions.checkArgument(!isNodeCurrentNode(node), "Node cannot create cache replication queue for itself: {}", node.getNodeId());
        LocalQCacheOpQueue.QueueId create = LocalQCacheOpQueue.QueueId.create(node.getNodeId(), i, z);
        return this.queuesByQueueId.computeIfAbsent(create, queueId -> {
            try {
                LocalQCacheOpQueueWithStats localQCacheOpQueueWithStats = new LocalQCacheOpQueueWithStats(this.localQCacheOpQueueFactory.create(node, i, z));
                Future<?> submit = this.executorReaders.submit(LocalQCacheOpReader.create(localQCacheOpQueueWithStats, this.localQCacheOpSender, this.criticalHandler, this.clusterNodes, z));
                this.queueReaderByQueueId.put(create, submit);
                LOG.info(LogPrefix.prefix(z) + "Created cache replication queue: {} with queue reader running: {}", localQCacheOpQueueWithStats.name(), Boolean.valueOf(!submit.isDone()));
                return localQCacheOpQueueWithStats;
            } catch (IOException e) {
                LOG.error(LogPrefix.prefix(z) + "Error when creating cache replication queue: {} for node: {}. This node will be inconsistent. Error: {}", new Object[]{create, node.getNodeId(), e.getMessage(), e});
                return null;
            }
        });
    }

    private void closeAllQueues(String str) {
        forEachNodeQueueNumber(num -> {
            closeQueue(str, num.intValue(), false);
        });
        forEachNodeQueueNumber(num2 -> {
            closeQueue(str, num2.intValue(), true);
        });
    }

    private void closeQueue(String str, int i, boolean z) {
        if (str == null) {
            return;
        }
        LocalQCacheOpQueue.QueueId create = LocalQCacheOpQueue.QueueId.create(str, i, z);
        Future<?> future = this.queueReaderByQueueId.get(create);
        if (future != null) {
            LOG.info(LogPrefix.prefix(z) + "Closing cache replication queue sender: {}", create);
            future.cancel(true);
            this.queueReaderByQueueId.remove(create);
        }
        LocalQCacheOpQueueWithStats localQCacheOpQueueWithStats = this.queuesByQueueId.get(create);
        if (localQCacheOpQueueWithStats != null) {
            LOG.info(LogPrefix.prefix(z) + "Closing cache replication queue: {}", create);
            localQCacheOpQueueWithStats.close();
            this.queuesByQueueId.remove(create);
        }
    }

    private boolean isShuttingDown() {
        return STATUS_SHUTTING_DOWN.contains(this.status.get());
    }

    public int addToAllQueues(LocalQCacheOp localQCacheOp) {
        if (isShuttingDown()) {
            LOG.info(LogPrefix.prefix() + "Service: {} is in status: {}. Not replicating: {}", new Object[]{LocalQCacheManager.class.getSimpleName(), this.status.get(), localQCacheOp});
            return 0;
        }
        boolean isReplicatePutsViaCopy = localQCacheOp.isReplicatePutsViaCopy();
        Set<Node> nodesWhichShouldHaveActiveQueues = getNodesWhichShouldHaveActiveQueues(isReplicatePutsViaCopy);
        if (nodesWhichShouldHaveActiveQueues.size() == 0) {
            if (!LOG.isDebugEnabled()) {
                return 0;
            }
            LOG.debug(LogPrefix.prefix(isReplicatePutsViaCopy) + "No nodes in cluster to replicate: {}", localQCacheOp);
            return 0;
        }
        TapeSerializationContext create = TapeSerializationContext.create();
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug(LogPrefix.prefix(isReplicatePutsViaCopy) + "About to add localQCacheOp: {} to cache replication queues: {}...", localQCacheOp, Integer.valueOf(nodesWhichShouldHaveActiveQueues.size()));
            }
            int i = 0;
            Iterator<Node> it = nodesWhichShouldHaveActiveQueues.iterator();
            while (it.hasNext()) {
                LocalQCacheOpQueueWithStats initOrGetQueueForCurrentThread = initOrGetQueueForCurrentThread(it.next(), isReplicatePutsViaCopy);
                if (initOrGetQueueForCurrentThread == null) {
                    LOG.warn(LogPrefix.prefix(isReplicatePutsViaCopy) + "Queue is null. Not replicating: {}", localQCacheOp);
                } else if (initOrGetQueueForCurrentThread.isClosed()) {
                    LOG.warn(LogPrefix.prefix(isReplicatePutsViaCopy) + "Queue: {} is closed. Not replicating: {}", initOrGetQueueForCurrentThread.name(), localQCacheOp);
                } else if (addToQueue(initOrGetQueueForCurrentThread, localQCacheOp)) {
                    i++;
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(LogPrefix.prefix(isReplicatePutsViaCopy) + "Done adding localQCacheOp: {} to cache replication queues: {}.", localQCacheOp, Integer.valueOf(i));
            }
            int i2 = i;
            if (create != null) {
                create.close();
            }
            return i2;
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private boolean addToQueue(LocalQCacheOpQueueWithStats localQCacheOpQueueWithStats, LocalQCacheOp localQCacheOp) {
        try {
            return localQCacheOpQueueWithStats.add(localQCacheOp);
        } catch (Throwable th) {
            LOG.error(LogPrefix.prefix() + "Critical state of local cache replication queue - cannot add: {} to queue: {}, error: {}", new Object[]{localQCacheOp, localQCacheOpQueueWithStats.name(), th.getMessage(), th});
            localQCacheOpQueueWithStats.notifyCriticalAdd();
            this.criticalHandler.handleCriticalAdd(localQCacheOpQueueWithStats, localQCacheOp, th);
            return false;
        }
    }

    Set<LocalQCacheOpQueue.QueueId> getQueueIdsForExistingQueues() {
        return new HashSet(Collections.list(this.queuesByQueueId.keys()));
    }
}
