package com.atlassian.bamboo.cluster;

import com.atlassian.bamboo.cluster.event.AbstractCrossNodesEvent;
import com.atlassian.bamboo.cluster.event.CrossNodesEvent;
import com.atlassian.bamboo.cluster.event.MetadataInfo;
import com.atlassian.bamboo.cluster.tape.TapePerNodeLocalQueueCriticalHandler;
import com.atlassian.bamboo.cluster.tape.TapePerNodeLocalQueueWithStats;
import com.atlassian.bamboo.grpc.CrossNodesCommunication;
import com.atlassian.bamboo.grpc.CrossNodesEventsServiceGrpc;
import com.atlassian.bamboo.util.Narrow;
import com.atlassian.bamboo.utils.SystemProperty;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableSet;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:com/atlassian/bamboo/cluster/PerNodeLocalQueueDispatcherImpl.class */
public class PerNodeLocalQueueDispatcherImpl implements PerNodeLocalQueueDispatcher {
    private final TapePerNodeLocalQueueWithStats queue;
    private final Executor executor;
    private final CrossNodesEventsServiceGrpc.CrossNodesEventsServiceStub stub;
    private final TapePerNodeLocalQueueCriticalHandler perNodeLocalQueueCriticalHandler;
    private final MetadataInfo metadataInfo;
    private static final Logger log = LogManager.getLogger(PerNodeLocalQueueDispatcherImpl.class);
    private static final int LOG_EXCEPTIONS_FREQUENCY = (int) new SystemProperty.IntegerSystemProperty(false, 50, new String[]{"bamboo.cluster.queue.dispatcher.log.exceptions.frequency"}).getTypedValue();
    private static final long[] RETRY_DELAYS = {100, 500, 1000, 1000, 1000, 1000, 2000, 3000, 5000};
    private static final Set<Status.Code> UNRECOVERABLE_ERRORS = ImmutableSet.of(Status.Code.UNIMPLEMENTED, Status.Code.INVALID_ARGUMENT);
    private final AtomicBoolean isQueueCurrentlyBeingProcessed = new AtomicBoolean(false);
    private final AtomicLong failuresCount = new AtomicLong(0);
    private final Lock workerSyncLock = new ReentrantLock(true);
    private final Stopwatch sendStopwatch = Stopwatch.createUnstarted();
    private final EventsRouter eventsRouter = new EventsRouter();

    /* loaded from: input_file:com/atlassian/bamboo/cluster/PerNodeLocalQueueDispatcherImpl$EventsRouter.class */
    private class EventsRouter {

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/atlassian/bamboo/cluster/PerNodeLocalQueueDispatcherImpl$EventsRouter$CommonStreamObserver.class */
        public class CommonStreamObserver implements StreamObserver<CrossNodesCommunication.CommonResponse> {
            private CommonStreamObserver() {
            }

            public void onNext(CrossNodesCommunication.CommonResponse commonResponse) {
                if (PerNodeLocalQueueDispatcherImpl.log.isDebugEnabled()) {
                    PerNodeLocalQueueDispatcherImpl.log.debug(PerNodeLocalQueueLogPrefix.prefix() + "Received response from server: {} with result status: {}", commonResponse.getClass().getSimpleName(), commonResponse.getResultStatus().getStatus());
                }
            }

            public void onError(Throwable th) {
                if ((th instanceof StatusRuntimeException) && PerNodeLocalQueueDispatcherImpl.UNRECOVERABLE_ERRORS.contains(((StatusRuntimeException) th).getStatus().getCode())) {
                    PerNodeLocalQueueDispatcherImpl.log.warn(PerNodeLocalQueueLogPrefix.prefix() + "Received " + ((StatusRuntimeException) th).getStatus().getCode().name() + " error from the server, this may be caused by the publisher and receiver nodes having different Bamboo versions. The event will be removed from the queue and won't be processed anymore.", th);
                    PerNodeLocalQueueDispatcherImpl.this.failuresCount.set(0L);
                    PerNodeLocalQueueDispatcherImpl.this.queue.notifyDroppedOnSend();
                    PerNodeLocalQueueDispatcherImpl.this.getOnCompletedWorkerJob().run();
                    return;
                }
                if ((th instanceof StatusRuntimeException) && ((StatusRuntimeException) th).getStatus().getCode().equals(Status.Code.FAILED_PRECONDITION)) {
                    PerNodeLocalQueueDispatcherImpl.this.failuresCount.incrementAndGet();
                    PerNodeLocalQueueDispatcherImpl perNodeLocalQueueDispatcherImpl = PerNodeLocalQueueDispatcherImpl.this;
                    PerNodeLocalQueueDispatcherImpl perNodeLocalQueueDispatcherImpl2 = PerNodeLocalQueueDispatcherImpl.this;
                    perNodeLocalQueueDispatcherImpl.deferWorkerJobBecauseOfFailure(th, () -> {
                        return perNodeLocalQueueDispatcherImpl2.getInitialWorkerJob();
                    }, 500L);
                    return;
                }
                if ((th instanceof StatusRuntimeException) && ((StatusRuntimeException) th).getStatus().getCode().equals(Status.Code.UNAVAILABLE)) {
                    PerNodeLocalQueueDispatcherImpl.this.queue.notifySendUnavailableException();
                    PerNodeLocalQueueDispatcherImpl perNodeLocalQueueDispatcherImpl3 = PerNodeLocalQueueDispatcherImpl.this;
                    PerNodeLocalQueueDispatcherImpl perNodeLocalQueueDispatcherImpl4 = PerNodeLocalQueueDispatcherImpl.this;
                    perNodeLocalQueueDispatcherImpl3.deferWorkerJobBecauseOfFailure(th, () -> {
                        return perNodeLocalQueueDispatcherImpl4.getInitialWorkerJob();
                    });
                    return;
                }
                PerNodeLocalQueueDispatcherImpl.this.queue.notifySendFailureException();
                PerNodeLocalQueueDispatcherImpl perNodeLocalQueueDispatcherImpl5 = PerNodeLocalQueueDispatcherImpl.this;
                PerNodeLocalQueueDispatcherImpl perNodeLocalQueueDispatcherImpl6 = PerNodeLocalQueueDispatcherImpl.this;
                perNodeLocalQueueDispatcherImpl5.deferWorkerJobBecauseOfFailure(th, () -> {
                    return perNodeLocalQueueDispatcherImpl6.getInitialWorkerJob();
                });
            }

            public void onCompleted() {
                PerNodeLocalQueueDispatcherImpl.this.failuresCount.set(0L);
                PerNodeLocalQueueDispatcherImpl.this.queue.notifySendWithTime(PerNodeLocalQueueDispatcherImpl.this.sendStopwatch.elapsed(TimeUnit.MILLISECONDS));
                PerNodeLocalQueueDispatcherImpl.this.getOnCompletedWorkerJob().run();
            }
        }

        public EventsRouter() {
        }

        public boolean route(@NotNull CrossNodesEvent crossNodesEvent) {
            AbstractCrossNodesEvent abstractCrossNodesEvent = (AbstractCrossNodesEvent) Narrow.downTo(crossNodesEvent, AbstractCrossNodesEvent.class);
            if (abstractCrossNodesEvent == null) {
                PerNodeLocalQueueDispatcherImpl.log.debug(PerNodeLocalQueueLogPrefix.prefix() + "Unknown event type: {}", crossNodesEvent.getType());
                return true;
            }
            PerNodeLocalQueueDispatcherImpl.this.sendStopwatch.reset().start();
            abstractCrossNodesEvent.send(PerNodeLocalQueueDispatcherImpl.this.stub, PerNodeLocalQueueDispatcherImpl.this.metadataInfo, getCommonStreamObserver());
            return false;
        }

        @NotNull
        private StreamObserver<CrossNodesCommunication.CommonResponse> getCommonStreamObserver() {
            return new CommonStreamObserver();
        }
    }

    private PerNodeLocalQueueDispatcherImpl(TapePerNodeLocalQueueWithStats tapePerNodeLocalQueueWithStats, Executor executor, CrossNodesEventsServiceGrpc.CrossNodesEventsServiceStub crossNodesEventsServiceStub, TapePerNodeLocalQueueCriticalHandler tapePerNodeLocalQueueCriticalHandler, MetadataInfo metadataInfo) {
        this.queue = tapePerNodeLocalQueueWithStats;
        this.executor = executor;
        this.stub = crossNodesEventsServiceStub;
        this.perNodeLocalQueueCriticalHandler = tapePerNodeLocalQueueCriticalHandler;
        this.metadataInfo = metadataInfo;
        startWorkerIfPossible();
    }

    public static PerNodeLocalQueueDispatcherImpl create(TapePerNodeLocalQueueWithStats tapePerNodeLocalQueueWithStats, Executor executor, CrossNodesEventsServiceGrpc.CrossNodesEventsServiceStub crossNodesEventsServiceStub, TapePerNodeLocalQueueCriticalHandler tapePerNodeLocalQueueCriticalHandler, MetadataInfo metadataInfo) {
        return new PerNodeLocalQueueDispatcherImpl(tapePerNodeLocalQueueWithStats, executor, crossNodesEventsServiceStub, tapePerNodeLocalQueueCriticalHandler, metadataInfo);
    }

    public boolean dispatch(@NotNull CrossNodesEvent crossNodesEvent) {
        try {
            boolean add = this.queue.add(crossNodesEvent);
            if (add) {
                startWorkerIfPossible();
            }
            return add;
        } catch (Throwable th) {
            log.error(PerNodeLocalQueueLogPrefix.prefix() + "Critical state of local replication queue - cannot add: {} to queue: {}, error: {}", crossNodesEvent, this.queue.name(), th.getMessage(), th);
            this.queue.notifyCriticalAdd();
            this.perNodeLocalQueueCriticalHandler.handleCriticalAdd(this.queue, crossNodesEvent, th);
            return false;
        }
    }

    private void startWorkerIfPossible() {
        this.workerSyncLock.lock();
        if (!this.isQueueCurrentlyBeingProcessed.compareAndSet(false, true)) {
            this.workerSyncLock.unlock();
        } else {
            this.workerSyncLock.unlock();
            CompletableFuture.runAsync(getInitialWorkerJob(), this.executor);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @NotNull
    public Runnable getInitialWorkerJob() {
        return () -> {
            while (true) {
                CrossNodesEvent crossNodesEvent = (CrossNodesEvent) inWorkerSyncLock(() -> {
                    if (this.queue.isClosed()) {
                        log.debug(PerNodeLocalQueueLogPrefix.prefix() + "Queue: {} is closed, stopping worker", this.queue.name());
                        this.isQueueCurrentlyBeingProcessed.set(false);
                        return null;
                    }
                    try {
                        CrossNodesEvent peek = this.queue.peek();
                        if (peek != null) {
                            return peek;
                        }
                        log.debug(PerNodeLocalQueueLogPrefix.prefix() + "Queue: {} is empty, stopping worker", this.queue.name());
                        this.isQueueCurrentlyBeingProcessed.set(false);
                        return null;
                    } catch (Throwable th) {
                        this.queue.notifyCriticalPeek();
                        this.perNodeLocalQueueCriticalHandler.handleCriticalPeek(this.queue, th);
                        deferWorkerJobBecauseOfFailure(th, this::getInitialWorkerJob);
                        return null;
                    }
                });
                if (crossNodesEvent == null || !this.eventsRouter.route(crossNodesEvent)) {
                    return;
                }
                try {
                    this.queue.notifyDroppedOnSend();
                    this.queue.remove();
                } catch (Throwable th) {
                    this.queue.notifyCriticalRemove();
                    this.perNodeLocalQueueCriticalHandler.handleCriticalRemove(this.queue, th);
                    deferWorkerJobBecauseOfFailure(th, this::getInitialWorkerJob);
                    return;
                }
            }
        };
    }

    @NotNull
    private Runnable getOnCompletedWorkerJob() {
        return () -> {
            if (((Boolean) inWorkerSyncLock(() -> {
                if (this.queue.isClosed()) {
                    log.debug(PerNodeLocalQueueLogPrefix.prefix() + "Queue: {} is closed, stopping worker", this.queue.name());
                    this.isQueueCurrentlyBeingProcessed.set(false);
                    return false;
                }
                try {
                    this.queue.remove();
                    return true;
                } catch (NoSuchElementException e) {
                    log.debug(PerNodeLocalQueueLogPrefix.prefix() + "Queue: {} is empty, stopping worker", this.queue.name());
                    this.isQueueCurrentlyBeingProcessed.set(false);
                    return false;
                } catch (Throwable th) {
                    this.queue.notifyCriticalRemove();
                    if (this.perNodeLocalQueueCriticalHandler.handleCriticalRemove(this.queue, th)) {
                        deferWorkerJobBecauseOfFailure(th, this::getInitialWorkerJob);
                    } else {
                        deferWorkerJobBecauseOfFailure(th, this::getOnCompletedWorkerJob);
                    }
                    return false;
                }
            })).booleanValue()) {
                getInitialWorkerJob().run();
            }
        };
    }

    private void deferWorkerJobBecauseOfFailure(Throwable th, Supplier<Runnable> supplier) {
        deferWorkerJobBecauseOfFailure(th, supplier, determinateDelay(this.failuresCount.incrementAndGet(), RETRY_DELAYS));
    }

    private void deferWorkerJobBecauseOfFailure(Throwable th, Supplier<Runnable> supplier, long j) {
        if (this.failuresCount.get() == 1 || this.failuresCount.get() % LOG_EXCEPTIONS_FREQUENCY == 0) {
            log.info(PerNodeLocalQueueLogPrefix.prefix() + "Exception: {} occurred when processing replication queue: {}, failuresCount: {}. Will retry indefinitely. Error: {}", th.getClass().getSimpleName(), this.queue.name(), Long.valueOf(this.failuresCount.get()), th.getMessage());
            log.debug("", th);
        } else if (log.isTraceEnabled()) {
            log.trace(PerNodeLocalQueueLogPrefix.prefix() + "Exception: {} occurred when processing replication queue: {}, failuresCount: {}. Will retry indefinitely.", th.getClass().getSimpleName(), this.queue.name(), Long.valueOf(this.failuresCount.get()), th);
        }
        deferWorkerJob(j, supplier);
    }

    private void deferWorkerJob(long j, Supplier<Runnable> supplier) {
        CompletableFuture.runAsync(supplier.get(), CompletableFuture.delayedExecutor(j, TimeUnit.MILLISECONDS, this.executor));
    }

    private static long determinateDelay(long j, long... jArr) {
        return j < 1 ? jArr[0] : jArr[Math.min((int) j, jArr.length) - 1];
    }

    private <T> T inWorkerSyncLock(Supplier<T> supplier) {
        this.workerSyncLock.lock();
        try {
            return supplier.get();
        } finally {
            this.workerSyncLock.unlock();
        }
    }

    public String toString() {
        return "PerNodeLocalQueueDispatcher{queue=" + this.queue + "}";
    }
}
