package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.messages.TaskBackPressureResponse;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureRequestCoordinator.class */
/**
 * Coordinates back pressure requests that are sent to the current execution attempts of a set of
 * {@link ExecutionVertex} instances.
 *
 * <p>For every triggered request the coordinator assigns a fresh request ID, registers a
 * {@link PendingBackPressureRequest}, asks each execution for its back pressure ratio and completes
 * the request's future once all executions have answered. If any execution fails to answer, the
 * whole request is discarded and its future is completed exceptionally.
 *
 * <p>All mutable state is guarded by a single internal lock; task responses are processed on the
 * {@link Executor} handed to the constructor.
 */
public class BackPressureRequestCoordinator {

    private static final Logger LOG = LoggerFactory.getLogger(BackPressureRequestCoordinator.class);

    /**
     * Number of most recently finished (completed or cancelled) request IDs that are remembered so
     * that late responses can be told apart from responses carrying an unknown request ID.
     */
    private static final int NUM_GHOST_REQUEST_IDS = 10;

    /** Lock guarding all mutable coordinator state. */
    private final Object lock = new Object();

    /** Executor on which the responses of the tasks are handled. */
    private final Executor executor;

    /** Timeout handed to every individual task back pressure request. */
    private final Time requestTimeout;

    /** In-progress requests, keyed by request ID. */
    @GuardedBy("lock")
    private final Map<Integer, PendingBackPressureRequest> pendingRequests = new HashMap<>();

    /** Bounded FIFO of the request IDs that finished most recently (oldest first). */
    @GuardedBy("lock")
    private final ArrayDeque<Integer> recentPendingRequests = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);

    /** Source of request IDs; incremented for every triggered request. */
    @GuardedBy("lock")
    private int requestIdCounter;

    /** Set once {@link #shutDown()} has run; afterwards no request is accepted or processed. */
    @GuardedBy("lock")
    private boolean isShutDown;

    /**
     * Creates a new coordinator.
     *
     * @param executor executor used to handle the task responses; must not be null
     * @param requestTimeoutMillis timeout of a single task request in milliseconds; must be
     *     non-negative
     */
    public BackPressureRequestCoordinator(Executor executor, long requestTimeoutMillis) {
        Preconditions.checkArgument(requestTimeoutMillis >= 0, "The request timeout must be non-negative.");
        this.executor = (Executor) Preconditions.checkNotNull(executor);
        this.requestTimeout = Time.milliseconds(requestTimeoutMillis);
    }

    /**
     * Triggers a back pressure request for the current execution attempts of the given tasks.
     *
     * <p>The array is validated with {@code Preconditions.checkNotNull} and
     * {@code Preconditions.checkArgument}: it must not be null and must contain at least one task.
     *
     * @param tasks tasks whose current execution attempts are asked for their back pressure ratio
     * @return a future that is completed with the collected {@link BackPressureStats} once every
     *     task has answered. The returned future is already completed exceptionally with an
     *     {@link IllegalStateException} if any task has no current execution attempt in state
     *     {@code RUNNING}, or if the coordinator has been shut down.
     */
    public CompletableFuture<BackPressureStats> triggerBackPressureRequest(ExecutionVertex[] tasks) {
        Preconditions.checkNotNull(tasks, "Tasks to request must not be null.");
        Preconditions.checkArgument(tasks.length >= 1, "No tasks to request.");

        // Resolve all executions up front so that nothing is registered if any task is not running.
        final ExecutionAttemptID[] attemptIds = new ExecutionAttemptID[tasks.length];
        final Execution[] executions = new Execution[tasks.length];
        for (int index = 0; index < tasks.length; index++) {
            final Execution execution = tasks[index].getCurrentExecutionAttempt();
            if (execution == null || execution.getState() != ExecutionState.RUNNING) {
                return FutureUtils.completedExceptionally(
                        new IllegalStateException(
                                "Task " + tasks[index].getTaskNameWithSubtaskIndex() + " is not running."));
            }
            executions[index] = execution;
            attemptIds[index] = execution.getAttemptId();
        }

        synchronized (lock) {
            if (isShutDown) {
                return FutureUtils.completedExceptionally(new IllegalStateException("Shut down."));
            }

            final int requestId = requestIdCounter++;
            LOG.debug("Triggering task back pressure request {}.", requestId);

            // Register the pending request before contacting the tasks so that responses handled
            // by the executor always find it.
            final PendingBackPressureRequest pending = new PendingBackPressureRequest(requestId, attemptIds);
            pendingRequests.put(requestId, pending);

            requestBackPressure(executions, requestId);

            return pending.getBackPressureStatsFuture();
        }
    }

    /**
     * Sends the back pressure request with the given ID to every execution and registers the
     * response handling on the coordinator's executor. Must be called while holding the lock.
     *
     * @param executions executions to contact
     * @param requestId ID of the request the responses belong to
     */
    private void requestBackPressure(Execution[] executions, int requestId) {
        assert Thread.holdsLock(lock);

        for (Execution execution : executions) {
            execution
                    .requestBackPressure(requestId, requestTimeout)
                    .handleAsync(
                            (response, failure) -> {
                                // A missing response is treated as a failure, with or without a cause.
                                if (response != null) {
                                    handleSuccessfulTaskBackPressureResponse(response);
                                } else {
                                    handleFailedTaskBackPressureResponse(requestId, failure);
                                }
                                return null;
                            },
                            executor);
        }
    }

    /**
     * Cancels the pending request with the given ID, if it is still pending, after one of its tasks
     * failed to answer. Does nothing once the coordinator is shut down.
     *
     * @param requestId ID of the request to cancel
     * @param cause failure reported for the task request, may be null
     */
    private void handleFailedTaskBackPressureResponse(int requestId, @Nullable Throwable cause) {
        synchronized (lock) {
            if (isShutDown) {
                return;
            }

            final PendingBackPressureRequest pending = pendingRequests.remove(requestId);
            if (pending == null) {
                // Already completed or cancelled by an earlier response.
                return;
            }

            if (cause != null) {
                // SLF4J treats a trailing Throwable argument as the exception to log.
                LOG.info("Cancelling back pressure request {}.", requestId, cause);
            } else {
                LOG.info("Cancelling back pressure request {}.", requestId);
            }

            pending.discard(cause);
            rememberRecentRequestId(requestId);
        }
    }

    /**
     * Shuts down the coordinator: discards all pending requests, forgets the recent request IDs
     * and rejects every later request. Calling it again has no effect.
     */
    public void shutDown() {
        synchronized (lock) {
            if (isShutDown) {
                return;
            }

            LOG.info("Shutting down back pressure request coordinator.");

            for (PendingBackPressureRequest pending : pendingRequests.values()) {
                pending.discard(new RuntimeException("Shut down."));
            }
            pendingRequests.clear();
            recentPendingRequests.clear();

            isShutDown = true;
        }
    }

    /**
     * Records the back pressure ratio reported by one task and completes the owning request once
     * all of its tasks have answered. Responses for requests that are no longer pending are only
     * logged. Does nothing once the coordinator is shut down.
     *
     * @param response response received from a task
     */
    private void handleSuccessfulTaskBackPressureResponse(TaskBackPressureResponse response) {
        final int requestId = response.getRequestId();
        final ExecutionAttemptID attemptId = response.getExecutionAttemptID();
        final double ratio = response.getBackPressureRatio();

        synchronized (lock) {
            if (isShutDown) {
                return;
            }

            LOG.debug("Collecting back pressure response of request {} from task {}.", requestId, attemptId);

            final PendingBackPressureRequest pending = pendingRequests.get(requestId);
            if (pending != null) {
                pending.collectBackPressureStats(attemptId, ratio);

                if (pending.isComplete()) {
                    pendingRequests.remove(requestId);
                    rememberRecentRequestId(requestId);
                    pending.completePromiseAndDiscard();
                }
            } else if (recentPendingRequests.contains(requestId)) {
                LOG.debug("Received late back pressure request {} result of task {}.", requestId, attemptId);
            } else {
                LOG.debug("Unknown request ID {}.", requestId);
            }
        }
    }

    /**
     * Remembers the ID of a request that just finished, evicting the oldest remembered ID when
     * {@link #NUM_GHOST_REQUEST_IDS} entries are already stored. Must be called while holding the
     * lock.
     *
     * @param requestId ID of the finished request
     */
    private void rememberRecentRequestId(int requestId) {
        assert Thread.holdsLock(lock);

        if (recentPendingRequests.size() >= NUM_GHOST_REQUEST_IDS) {
            recentPendingRequests.removeFirst();
        }
        recentPendingRequests.addLast(requestId);
    }

    /** Returns the number of requests that are currently pending. */
    @VisibleForTesting
    int getNumberOfPendingRequests() {
        synchronized (lock) {
            return pendingRequests.size();
        }
    }

    /**
     * Bookkeeping for a single in-progress back pressure request: the tasks that still have to
     * answer, the ratios collected so far and the future handed out to the caller.
     *
     * <p>The class does no synchronization of its own; the coordinator calls it while holding its
     * lock.
     */
    public static class PendingBackPressureRequest {

        private final int requestId;

        /** Creation time of this request in milliseconds, used as the start time of the stats. */
        private final long startTime = System.currentTimeMillis();

        /** Tasks that have not answered yet. */
        private final Set<ExecutionAttemptID> pendingTasks;

        /** Back pressure ratio per task that has already answered. */
        private final Map<ExecutionAttemptID, Double> backPressureRatios;

        private final CompletableFuture<BackPressureStats> backPressureStatsFuture = new CompletableFuture<>();

        /** Set once the request has been discarded or completed; no further use is allowed. */
        private boolean isDiscarded;

        /**
         * Creates the pending request.
         *
         * @param requestId ID of the request
         * @param tasksToCollect execution attempts that are expected to answer
         */
        PendingBackPressureRequest(int requestId, ExecutionAttemptID[] tasksToCollect) {
            this.requestId = requestId;
            this.pendingTasks = new HashSet<>(Arrays.asList(tasksToCollect));
            this.backPressureRatios = Maps.newHashMapWithExpectedSize(tasksToCollect.length);
        }

        /**
         * Returns whether every expected task has answered.
         *
         * @throws IllegalStateException if the request has already been discarded
         */
        public boolean isComplete() {
            checkDiscarded();
            return pendingTasks.isEmpty();
        }

        /**
         * Discards the request: drops all collected state and completes the future exceptionally
         * with a {@link RuntimeException} wrapping the given cause. Repeated calls are no-ops.
         *
         * @param cause reason for discarding, may be null
         */
        public void discard(Throwable cause) {
            if (isDiscarded) {
                return;
            }
            pendingTasks.clear();
            backPressureRatios.clear();
            backPressureStatsFuture.completeExceptionally(new RuntimeException("Discarded.", cause));
            isDiscarded = true;
        }

        /**
         * Records the back pressure ratio reported by one of the expected tasks.
         *
         * @param executionId execution attempt that answered
         * @param backPressureRatio ratio reported by that attempt
         * @throws IllegalStateException if the request has been discarded or has no task left to
         *     answer
         * @throws IllegalArgumentException if the attempt is not among the tasks still pending
         */
        public void collectBackPressureStats(ExecutionAttemptID executionId, double backPressureRatio) {
            checkDiscarded();
            checkCompleted();

            if (!pendingTasks.remove(executionId)) {
                throw new IllegalArgumentException(String.format("Unknown task %s.", executionId));
            }
            backPressureRatios.put(executionId, backPressureRatio);
        }

        /**
         * Marks the request as discarded and completes the future with the stats collected so far,
         * using the current time as end time.
         */
        public void completePromiseAndDiscard() {
            isDiscarded = true;
            final long endTime = System.currentTimeMillis();
            backPressureStatsFuture.complete(
                    new BackPressureStats(requestId, startTime, endTime, backPressureRatios));
        }

        /** Returns the future that is completed when this request finishes or is discarded. */
        public CompletableFuture<BackPressureStats> getBackPressureStatsFuture() {
            return backPressureStatsFuture;
        }

        /** Fails if no task is left to answer. */
        private void checkCompleted() {
            if (pendingTasks.isEmpty()) {
                throw new IllegalStateException("Completed.");
            }
        }

        /** Fails if the request has already been discarded or completed. */
        private void checkDiscarded() {
            if (isDiscarded) {
                throw new IllegalStateException("Discarded.");
            }
        }
    }
}
