package com.atlassian.bamboo.cluster;

import com.atlassian.bamboo.beehive.BambooClusterNodeHeartbeatService;
import com.atlassian.bamboo.beehive.BambooNodeStatus;
import com.atlassian.bamboo.beehive.events.NodeBecameLiveEvent;
import com.atlassian.bamboo.beehive.events.NodeBecameOfflineEvent;
import com.atlassian.bamboo.build.pipeline.concurrent.SystemAuthorityForkJoinWorkerThreadFactory;
import com.atlassian.bamboo.cluster.event.CrossNodesEvent;
import com.atlassian.bamboo.cluster.event.MetadataInfo;
import com.atlassian.bamboo.cluster.grpc.GrpcChannelService;
import com.atlassian.bamboo.cluster.tape.PerNodeLocalQueue;
import com.atlassian.bamboo.cluster.tape.TapePerNodeLocalQueueCriticalHandler;
import com.atlassian.bamboo.cluster.tape.TapePerNodeLocalQueueCriticalHandlerFactory;
import com.atlassian.bamboo.cluster.tape.TapePerNodeLocalQueueStatsUtil;
import com.atlassian.bamboo.cluster.tape.TapePerNodeLocalQueueWithStats;
import com.atlassian.bamboo.cluster.tape.TapeSerializationContext;
import com.atlassian.bamboo.grpc.AtlassianCacheCrossNodesEventsServiceGrpc;
import com.atlassian.bamboo.grpc.BambooCrossNodesEventsServiceGrpc;
import com.atlassian.bamboo.grpc.PluginCrossNodesEventsServiceGrpc;
import com.atlassian.bamboo.utils.Pair;
import com.atlassian.bamboo.utils.files.DeleteIfNotWhiteListedFilVisitor;
import com.atlassian.event.api.EventListener;
import com.atlassian.event.api.EventPublisher;
import com.atlassian.plugin.event.events.PluginFrameworkShutdownEvent;
import com.atlassian.plugin.event.events.PluginFrameworkStartingEvent;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.ManagedChannel;
import io.grpc.stub.AbstractAsyncStub;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;

/* loaded from: input_file:com/atlassian/bamboo/cluster/PerNodeLocalQueueManager.class */
/**
 * Manages the local replication queues this node keeps for every other live cluster node.
 *
 * <p>For each remote node the manager holds {@link PerNodeLocalQueue#NUMBER_OF_PHYSICAL_QUEUES_PER_NODE}
 * queues. Every queue is paired with a {@link PerNodeLocalQueueDispatcher} and with a gRPC
 * {@link ManagedChannel} plus the stubs created on that channel. Queues are created when the manager
 * starts or a node becomes live, and are closed when a node goes offline or the manager stops.
 *
 * <p>The manager registers itself with the {@link EventPublisher} and is started / stopped by the plugin
 * framework lifecycle events. {@link #start()} and {@link #stop()} are {@code synchronized}; the current
 * lifecycle phase is exposed through {@link Status}.
 */
public class PerNodeLocalQueueManager {
    private static final Logger log = LogManager.getLogger(PerNodeLocalQueueManager.class);

    /** Name used in lifecycle log messages. */
    private static final String MANAGER_NAME = PerNodeLocalQueueManager.class.getSimpleName();

    /** Stats logging interval used when the configured value is not positive. */
    private static final long DEFAULT_STATS_LOGGING_INTERVAL_MINUTES = 10L;

    private final BambooClusterNodeHeartbeatService bambooClusterNodeHeartbeatService;
    private final PerNodeLocalQueueFactory perNodeLocalQueueFactory;
    private final TapePerNodeLocalQueueCriticalHandler perNodeLocalQueueCriticalHandler;
    private final EventPublisher eventPublisher;
    private final String thisNodeId;
    private final MetadataInfo thisNodeMetadataInfo;
    private final GrpcChannelService grpcChannelService;
    private final ConcurrentHashMap<PerNodeLocalQueue.QueueId, TapePerNodeLocalQueueWithStats> queuesByQueueId = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<PerNodeLocalQueue.QueueId, PerNodeLocalQueueDispatcher> queueDispatcherByQueueId = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<PerNodeLocalQueue.QueueId, Pair<ManagedChannel, Map<Class<? extends AbstractAsyncStub<?>>, ? extends AbstractAsyncStub<?>>>> channelsAndStubsByQueueId = new ConcurrentHashMap<>();

    /** Nodes that went offline while the manager was still {@link Status#STARTING}; handled once started. */
    private final List<BambooNodeStatus> bufferedNodesToRemove = Collections.synchronizedList(new ArrayList<>());

    private final ScheduledExecutorService executorStats = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("localq-stats-%d").build());

    /** Handle of the currently scheduled stats job, kept so a restart does not stack up duplicate jobs. */
    private final AtomicReference<ScheduledFuture<?>> statsTask = new AtomicReference<>(null);

    private final AtomicReference<Executor> dispatchersExecutor = new AtomicReference<>(null);
    private final AtomicReference<Executor> grpcClientExecutor = new AtomicReference<>(null);
    private final AtomicReference<Status> status = new AtomicReference<>(Status.CREATED);

    /** Lifecycle phases of the manager. */
    public enum Status {
        CREATED,
        STARTING,
        STARTED,
        STOPPING,
        STOPPED
    }

    @Inject
    public PerNodeLocalQueueManager(BambooClusterNodeHeartbeatService bambooClusterNodeHeartbeatService, PerNodeLocalQueueFactory perNodeLocalQueueFactory, EventPublisher eventPublisher, TapePerNodeLocalQueueCriticalHandlerFactory tapePerNodeLocalQueueCriticalHandlerFactory, GrpcChannelService grpcChannelService) {
        this.bambooClusterNodeHeartbeatService = bambooClusterNodeHeartbeatService;
        this.perNodeLocalQueueFactory = perNodeLocalQueueFactory;
        this.eventPublisher = eventPublisher;
        this.thisNodeId = bambooClusterNodeHeartbeatService.getNodeId();
        this.thisNodeMetadataInfo = new MetadataInfo(this.thisNodeId);
        this.perNodeLocalQueueCriticalHandler = tapePerNodeLocalQueueCriticalHandlerFactory.create();
        this.grpcChannelService = grpcChannelService;
    }

    /** Subscribes this manager to the event publisher so the {@code @EventListener} methods are invoked. */
    @PostConstruct
    public void registerEventPublisher() {
        this.eventPublisher.register(this);
    }

    /**
     * Unsubscribes from the event publisher and shuts down the stats executor, which is never reused
     * after the bean is destroyed.
     */
    @PreDestroy
    public void finalDestroy() {
        log.debug(PerNodeLocalQueueLogPrefix.prefix() + "Permanently destroying {}", MANAGER_NAME);
        this.eventPublisher.unregister(this);
        this.executorStats.shutdownNow();
    }

    @EventListener
    public void onPluginFrameworkStartingEvent(PluginFrameworkStartingEvent pluginFrameworkStartingEvent) {
        start();
    }

    @EventListener
    public void onPluginFrameworkShutdownEvent(PluginFrameworkShutdownEvent pluginFrameworkShutdownEvent) {
        stop();
    }

    /** Creates the queues for a remote node that became live, provided the manager is started. */
    @EventListener
    public void onNodeBecameLive(NodeBecameLiveEvent nodeBecameLiveEvent) {
        final BambooNodeStatus node = nodeBecameLiveEvent.getNode();
        log.debug(PerNodeLocalQueueLogPrefix.prefix() + "onNodeBecameLive: {}", node);
        if (shouldConsiderNodeLivenessEvent(node)) {
            initOrGetAllQueues(node);
        }
    }

    /**
     * Closes the queues of a remote node that went offline. While the manager is still starting the node
     * is buffered instead and processed at the end of {@link #start()}.
     */
    @EventListener
    public void onNodeBecameOffline(NodeBecameOfflineEvent nodeBecameOfflineEvent) {
        final BambooNodeStatus node = nodeBecameOfflineEvent.getNode();
        log.debug(PerNodeLocalQueueLogPrefix.prefix() + "onNodeBecameOffline: {}", node);
        synchronized (this.bufferedNodesToRemove) {
            if (shouldConsiderNodeLivenessEvent(node)) {
                closeAllQueues(node);
            } else if (this.status.get() == Status.STARTING && !isNodeCurrentNode(node)) {
                this.bufferedNodesToRemove.add(node);
            }
        }
    }

    private boolean shouldConsiderNodeLivenessEvent(@NotNull BambooNodeStatus bambooNodeStatus) {
        return this.status.get() == Status.STARTED && !isNodeCurrentNode(bambooNodeStatus);
    }

    /** Stops the manager and releases queues, channels and executors. Does nothing if already stopped. */
    public synchronized void stop() {
        if (this.status.get() == Status.STOPPED) {
            log.debug(PerNodeLocalQueueLogPrefix.prefix() + "Already stopped {}.", MANAGER_NAME);
            return;
        }
        logQStats("onStop");
        this.status.set(Status.STOPPING);
        log.info(PerNodeLocalQueueLogPrefix.prefix() + "Stopping {}...", MANAGER_NAME);
        destroy();
        log.info(PerNodeLocalQueueLogPrefix.prefix() + "Done stopping {}.", MANAGER_NAME);
        this.status.set(Status.STOPPED);
    }

    /**
     * Releases everything {@link #init()} may have created. Safe to call when nothing was initialised
     * (e.g. {@link #stop()} before the first {@link #start()}): absent executors and tasks are skipped.
     */
    void destroy() {
        cancelStatsTask(this.statsTask.getAndSet(null));
        // Best effort: one queue failing to close must not prevent releasing the remaining resources.
        this.queuesByQueueId.forEach(this::closeQueueQuietly);
        this.queuesByQueueId.clear();
        this.queueDispatcherByQueueId.clear();
        this.channelsAndStubsByQueueId.forEach((queueId, channelAndStubs) -> shutdownChannel(channelAndStubs.getFirst(), queueId));
        this.channelsAndStubsByQueueId.clear();
        shutdownExecutor(this.grpcClientExecutor.getAndSet(null));
        shutdownExecutor(this.dispatchersExecutor.getAndSet(null));
    }

    /**
     * Starts the manager. Does nothing if already started.
     *
     * <p>If initialisation fails, partially created resources are released, the status becomes
     * {@link Status#STOPPED} (so a later start can be attempted) and the failure is rethrown.
     */
    public synchronized void start() {
        if (this.status.get() == Status.STARTED) {
            log.info(PerNodeLocalQueueLogPrefix.prefix() + "Already started {}.", MANAGER_NAME);
            return;
        }
        this.status.set(Status.STARTING);
        log.info(PerNodeLocalQueueLogPrefix.prefix() + "Starting {}...", MANAGER_NAME);
        try {
            init();
        } catch (RuntimeException e) {
            rollBackFailedStart(e);
            throw e;
        }
        log.info(PerNodeLocalQueueLogPrefix.prefix() + "Done starting {}.", MANAGER_NAME);
        this.status.set(Status.STARTED);
        cleanupBufferedNodesToRemove();
        logQStats("onStart");
    }

    /** Undoes a failed {@link #init()} so the manager is not left in {@link Status#STARTING}. */
    private void rollBackFailedStart(RuntimeException failure) {
        log.error(PerNodeLocalQueueLogPrefix.prefix() + "Failed to start {}. Releasing partially initialised resources.", MANAGER_NAME, failure);
        try {
            destroy();
        } catch (RuntimeException cleanupFailure) {
            failure.addSuppressed(cleanupFailure);
        }
        // Nodes buffered during the failed attempt refer to queues that no longer exist.
        synchronized (this.bufferedNodesToRemove) {
            this.bufferedNodesToRemove.clear();
        }
        this.status.set(Status.STOPPED);
    }

    /**
     * Creates the executors (if absent), opens queues for all live remote nodes, removes outdated queue
     * files and schedules the periodic stats logging, replacing any previously scheduled stats job.
     */
    void init() {
        this.dispatchersExecutor.getAndUpdate(existing -> existing == null ? getDispatchersExecutorSupplier().get() : existing);
        this.grpcClientExecutor.getAndUpdate(existing -> existing == null ? getGrpcClientExecutorSupplier().get() : existing);
        for (BambooNodeStatus node : getNodesWhichShouldHaveActiveQueues()) {
            initOrGetAllQueues(node);
        }
        cleanupOutdatedQueues();
        final long intervalMinutes = Optional.of(Long.valueOf(BambooClusterSettings.PER_NODE_QUEUE_STATS_LOGGING_INTERVAL_MINUTES.getTypedValue()))
                .filter(minutes -> minutes > 0)
                .orElse(DEFAULT_STATS_LOGGING_INTERVAL_MINUTES);
        log.info(PerNodeLocalQueueLogPrefix.prefix() + "Stats will be running every {} minutes", intervalMinutes);
        final ScheduledFuture<?> scheduled = this.executorStats.scheduleAtFixedRate(() -> logQStats("scheduled"), intervalMinutes, intervalMinutes, TimeUnit.MINUTES);
        // A job left over from an earlier start would otherwise keep logging alongside the new one.
        cancelStatsTask(this.statsTask.getAndSet(scheduled));
    }

    @NotNull
    private static Supplier<Executor> getDispatchersExecutorSupplier() {
        return () -> new ForkJoinPool((int) BambooClusterSettingsImpl.PER_NODE_QUEUE_DISPATCHERS_THREADS_NUMBER.getTypedValue(), new SystemAuthorityForkJoinWorkerThreadFactory(), null, true);
    }

    @NotNull
    private static Supplier<Executor> getGrpcClientExecutorSupplier() {
        return () -> new ForkJoinPool((int) BambooClusterSettingsImpl.CROSS_NODES_EVENTS_GRPC_CLIENT_THREADS_NUMBER.getTypedValue(), new SystemAuthorityForkJoinWorkerThreadFactory(), null, true);
    }

    private static void cancelStatsTask(ScheduledFuture<?> task) {
        if (task != null) {
            task.cancel(false);
        }
    }

    private static void shutdownExecutor(Executor executor) {
        if (executor instanceof ExecutorService) {
            ((ExecutorService) executor).shutdownNow();
        }
    }

    private void cleanupBufferedNodesToRemove() {
        synchronized (this.bufferedNodesToRemove) {
            this.bufferedNodesToRemove.forEach(this::closeAllQueues);
            this.bufferedNodesToRemove.clear();
        }
    }

    /**
     * Logs statistics for a snapshot of the current queues. Never throws: a failure is logged instead, so
     * the scheduled job keeps running.
     *
     * @param str label identifying what triggered the run, included in the log output
     */
    // The snapshot stays raw on purpose: the element type expected by TapePerNodeLocalQueueStatsUtil.logStats
    // is declared outside this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private synchronized void logQStats(String str) {
        try {
            final ImmutableSet snapshot = ImmutableSet.copyOf(this.queuesByQueueId.values());
            if (!snapshot.isEmpty() || log.isDebugEnabled()) {
                log.info(PerNodeLocalQueueLogPrefix.prefix() + "[{}] Running replication queue stats for: {} queues...", str, this.queuesByQueueId.size());
                TapePerNodeLocalQueueStatsUtil.logStats(log, snapshot);
                log.info(PerNodeLocalQueueLogPrefix.prefix() + "[{}] ... done running replication queue stats for: {} queues.", str, this.queuesByQueueId.size());
            }
        } catch (Throwable th) {
            log.error(PerNodeLocalQueueLogPrefix.prefix() + "Error occurred in replication queue stats job: {}, error: {}", Thread.currentThread().getName(), th.getMessage(), th);
        }
    }

    /** Returns all live nodes reported by the heartbeat service except this node. */
    @NotNull
    private List<BambooNodeStatus> getNodesWhichShouldHaveActiveQueues() {
        return this.bambooClusterNodeHeartbeatService.findLiveNodesStatuses().stream()
                .filter(node -> !isNodeCurrentNode(node))
                .collect(Collectors.toList());
    }

    private void initOrGetAllQueues(@NotNull BambooNodeStatus bambooNodeStatus) {
        forEachNodeQueueNumber(queueNumber -> initOrGetQueueAndItsDispatcher(bambooNodeStatus, queueNumber));
    }

    private void closeAllQueues(@NotNull BambooNodeStatus bambooNodeStatus) {
        forEachNodeQueueNumber(queueNumber -> closeQueue(bambooNodeStatus.getNodeId(), queueNumber));
        cleanupOutdatedQueues();
    }

    /** Closes one queue of a node and drops its dispatcher and gRPC channel. No-op for a null node id. */
    private void closeQueue(String str, int i) {
        if (str == null) {
            return;
        }
        final PerNodeLocalQueue.QueueId queueId = PerNodeLocalQueue.QueueId.create(str, i);
        final TapePerNodeLocalQueueWithStats queue = this.queuesByQueueId.get(queueId);
        if (queue == null) {
            log.debug(PerNodeLocalQueueLogPrefix.prefix() + "Queue {} is already closed.", queueId);
            return;
        }
        log.info(PerNodeLocalQueueLogPrefix.prefix() + "Closing cache replication queue: {}", queueId);
        queue.close();
        this.queuesByQueueId.remove(queueId);
        this.queueDispatcherByQueueId.remove(queueId);
        final Pair<ManagedChannel, Map<Class<? extends AbstractAsyncStub<?>>, ? extends AbstractAsyncStub<?>>> channelAndStubs = this.channelsAndStubsByQueueId.remove(queueId);
        if (channelAndStubs != null) {
            shutdownChannel(channelAndStubs.getFirst(), queueId);
        }
    }

    /** Closes a queue, logging instead of propagating a failure. */
    private void closeQueueQuietly(PerNodeLocalQueue.QueueId queueId, TapePerNodeLocalQueueWithStats queue) {
        try {
            queue.close();
        } catch (RuntimeException e) {
            log.error(PerNodeLocalQueueLogPrefix.prefix() + "Error while closing replication queue: {}", queueId, e);
        }
    }

    /**
     * Shuts the channel down, waiting up to one second before forcing termination. Failures are logged,
     * not propagated. A null channel is ignored.
     */
    private void shutdownChannel(ManagedChannel managedChannel, PerNodeLocalQueue.QueueId queueId) {
        if (managedChannel == null) {
            return;
        }
        try {
            managedChannel.shutdown();
            if (!managedChannel.awaitTermination(1L, TimeUnit.SECONDS)) {
                log.debug(PerNodeLocalQueueLogPrefix.prefix() + "Channel for queue {} hasn't been terminated for 1 second. Shutting down now.", queueId);
                managedChannel.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            log.error(PerNodeLocalQueueLogPrefix.prefix() + "Error while shutting down channel for queue: {}", queueId, e);
        } catch (Exception e) {
            log.error(PerNodeLocalQueueLogPrefix.prefix() + "Error while shutting down channel for queue: {}", queueId, e);
        }
    }

    private void forEachNodeQueueNumber(Consumer<Integer> consumer) {
        for (int queueNumber = 0; queueNumber < PerNodeLocalQueue.NUMBER_OF_PHYSICAL_QUEUES_PER_NODE; queueNumber++) {
            consumer.accept(queueNumber);
        }
    }

    /** Walks the queue home directory with a visitor that is given the file paths of the queues currently held. */
    private void cleanupOutdatedQueues() {
        try {
            Files.walkFileTree(
                    this.perNodeLocalQueueFactory.getOrCreateQueueHome().toPath(),
                    EnumSet.noneOf(FileVisitOption.class),
                    Integer.MAX_VALUE,
                    new DeleteIfNotWhiteListedFilVisitor(
                            this.queuesByQueueId.values().stream()
                                    .map(queue -> queue.getQueueFilePath())
                                    .filter(path -> path.isPresent())
                                    .map(path -> path.get())
                                    .collect(Collectors.toSet()),
                            Optional.of(log)));
            log.debug(PerNodeLocalQueueLogPrefix.prefix() + "Finished Removing outdated queues.");
        } catch (IOException e) {
            log.error(PerNodeLocalQueueLogPrefix.prefix() + "Could not cleanup outdated queues", e);
        }
    }

    /**
     * Returns the queue and dispatcher for the given remote node and queue number, creating them (and the
     * gRPC channel and stubs) on first use.
     *
     * @return a pair whose elements are {@code null} when the queue could not be created
     * @throws IllegalArgumentException if the node is this node
     */
    @NotNull
    private Pair<PerNodeLocalQueue, PerNodeLocalQueueDispatcher> initOrGetQueueAndItsDispatcher(@NotNull BambooNodeStatus bambooNodeStatus, int i) {
        // Guava's Preconditions only understands %s placeholders.
        Preconditions.checkArgument(!isNodeCurrentNode(bambooNodeStatus), "Node cannot create replication queue for itself: %s", bambooNodeStatus.getNodeId());
        final PerNodeLocalQueue.QueueId queueId = PerNodeLocalQueue.QueueId.create(bambooNodeStatus.getNodeId(), i);
        this.queuesByQueueId.computeIfAbsent(queueId, ignored -> createQueueWithDispatcher(bambooNodeStatus, queueId, i));
        return Pair.make(this.queuesByQueueId.get(queueId), this.queueDispatcherByQueueId.get(queueId));
    }

    /**
     * Creates the queue, its gRPC channel, stubs and dispatcher, registering the latter two in their maps.
     *
     * @return the new queue, or {@code null} if any step failed; in that case everything created so far is
     *         released so repeated attempts do not accumulate open queues and channels
     */
    private TapePerNodeLocalQueueWithStats createQueueWithDispatcher(BambooNodeStatus node, PerNodeLocalQueue.QueueId queueId, int queueNumber) {
        TapePerNodeLocalQueueWithStats queue = null;
        ManagedChannel channel = null;
        try {
            queue = new TapePerNodeLocalQueueWithStats(this.perNodeLocalQueueFactory.create(node, queueNumber));
            channel = this.grpcChannelService.createClientChannel(node.getHostname(), node.getInternalCommunicationPort(), this.grpcClientExecutor.get());
            final Map<Class<? extends AbstractAsyncStub<?>>, ? extends AbstractAsyncStub<?>> stubs = createStubs(channel);
            this.channelsAndStubsByQueueId.put(queueId, Pair.make(channel, stubs));
            final PerNodeLocalQueueDispatcherImpl dispatcher = PerNodeLocalQueueDispatcherImpl.create(queue, this.dispatchersExecutor.get(), stubs, this.perNodeLocalQueueCriticalHandler, this.thisNodeMetadataInfo);
            this.queueDispatcherByQueueId.put(queueId, dispatcher);
            log.info(PerNodeLocalQueueLogPrefix.prefix() + "Created cache replication queue: {} with queue dispatcher running: {}", queue.name(), dispatcher);
            return queue;
        } catch (IOException e) {
            log.error(PerNodeLocalQueueLogPrefix.prefix() + "Error when creating replication queue: {} for node: {}. This node will be inconsistent. Error: {}", queueId, node.getNodeId(), e.getMessage(), e);
        } catch (Exception e) {
            log.error(PerNodeLocalQueueLogPrefix.prefix() + "Error when creating gRPC channel and(or) stub for replication queue: {} for node: {}. This node will be inconsistent. Error: {}", queueId, node.getNodeId(), e.getMessage(), e);
        }
        releasePartiallyCreatedQueue(queueId, queue, channel);
        return null;
    }

    /** Drops the map entries and closes whatever a failed queue creation managed to open. */
    private void releasePartiallyCreatedQueue(PerNodeLocalQueue.QueueId queueId, TapePerNodeLocalQueueWithStats queue, ManagedChannel channel) {
        this.queueDispatcherByQueueId.remove(queueId);
        this.channelsAndStubsByQueueId.remove(queueId);
        shutdownChannel(channel, queueId);
        if (queue != null) {
            closeQueueQuietly(queueId, queue);
        }
    }

    /**
     * Creates the three cross-node event stubs on the given channel.
     *
     * @return the stubs keyed by their runtime class
     */
    @VisibleForTesting
    @NotNull
    @SuppressWarnings("unchecked") // getClass() on an AbstractAsyncStub<?> is always a Class<? extends AbstractAsyncStub<?>>
    protected Map<Class<? extends AbstractAsyncStub<?>>, ? extends AbstractAsyncStub<?>> createStubs(@NotNull ManagedChannel managedChannel) {
        return Stream.<AbstractAsyncStub<?>>of(
                        BambooCrossNodesEventsServiceGrpc.newStub(managedChannel),
                        AtlassianCacheCrossNodesEventsServiceGrpc.newStub(managedChannel),
                        PluginCrossNodesEventsServiceGrpc.newStub(managedChannel))
                .collect(Collectors.<AbstractAsyncStub<?>, Class<? extends AbstractAsyncStub<?>>, AbstractAsyncStub<?>>toMap(
                        stub -> (Class<? extends AbstractAsyncStub<?>>) stub.getClass(),
                        stub -> stub));
    }

    /**
     * Dispatches the event to the current thread's queue of every live remote node.
     *
     * @param crossNodesEvent the event to replicate
     * @return the number of queues whose dispatcher accepted the event; {@code 0} when the manager is not
     *         started or there are no remote nodes
     */
    public int addToAllQueues(@NotNull CrossNodesEvent crossNodesEvent) {
        final Status currentStatus = this.status.get();
        if (currentStatus != Status.STARTED) {
            log.info(PerNodeLocalQueueLogPrefix.prefix() + "Service: {} is in status: {}. Not syncing: {}", MANAGER_NAME, currentStatus, crossNodesEvent);
            return 0;
        }
        final List<BambooNodeStatus> targetNodes = getNodesWhichShouldHaveActiveQueues();
        if (targetNodes.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug(PerNodeLocalQueueLogPrefix.prefix() + "No nodes in cluster to sync: {}", crossNodesEvent);
            }
            return 0;
        }
        // The context is only held open for the duration of the dispatch loop.
        try (TapeSerializationContext ignored = TapeSerializationContext.create()) {
            if (log.isDebugEnabled()) {
                log.debug(PerNodeLocalQueueLogPrefix.prefix() + "About to add CrossNodesEvent: {} to replication queues: {}...", crossNodesEvent, targetNodes.size());
            }
            int dispatched = 0;
            for (BambooNodeStatus node : targetNodes) {
                final Pair<PerNodeLocalQueue, PerNodeLocalQueueDispatcher> queueAndDispatcher = initOrGetQueueDispatcherForCurrentThread(node);
                final PerNodeLocalQueue queue = queueAndDispatcher.getFirst();
                final PerNodeLocalQueueDispatcher dispatcher = queueAndDispatcher.getSecond();
                if (queue == null || dispatcher == null) {
                    log.warn(PerNodeLocalQueueLogPrefix.prefix() + "Queue is null. Not syncing: {}", crossNodesEvent);
                } else if (queue.isClosed()) {
                    log.warn(PerNodeLocalQueueLogPrefix.prefix() + "Queue: {} is closed. Not syncing: {}", queue.name(), crossNodesEvent);
                } else if (dispatcher.dispatch(crossNodesEvent)) {
                    dispatched++;
                }
            }
            if (log.isDebugEnabled()) {
                log.debug(PerNodeLocalQueueLogPrefix.prefix() + "Done adding CrossNodesEvent: {} to replication queues: {}.", crossNodesEvent, dispatched);
            }
            return dispatched;
        }
    }

    @NotNull
    private Pair<PerNodeLocalQueue, PerNodeLocalQueueDispatcher> initOrGetQueueDispatcherForCurrentThread(BambooNodeStatus bambooNodeStatus) {
        return initOrGetQueueAndItsDispatcher(bambooNodeStatus, PerNodeLocalQueue.nodeQueueNumberForCurrentThread());
    }

    private boolean isNodeCurrentNode(BambooNodeStatus bambooNodeStatus) {
        return Objects.equals(this.thisNodeId, bambooNodeStatus.getNodeId());
    }

    /** Returns a snapshot copy of the ids of all queues currently held. */
    @VisibleForTesting
    @NotNull
    Set<PerNodeLocalQueue.QueueId> getQueueIdsForExistingQueues() {
        return new HashSet<>(this.queuesByQueueId.keySet());
    }

    /**
     * Starts the manager if it is {@link Status#CREATED} or {@link Status#STOPPED}.
     *
     * @return the status after the optional start
     */
    @NotNull
    public Status getManagerLifecycleStatusAndStartIfNeeded() {
        final Status current = this.status.get();
        log.info(PerNodeLocalQueueLogPrefix.prefix() + "Manager status: {}", current);
        if (current == Status.CREATED || current == Status.STOPPED) {
            log.info(PerNodeLocalQueueLogPrefix.prefix() + "Found manager status: {}. Starting it...", current);
            start();
        }
        return this.status.get();
    }
}
