/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureRequestCoordinator;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStats;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BackPressureStatsTracker} that caches {@link OperatorBackPressureStats} per
 * {@link ExecutionJobVertex} and asks the {@link BackPressureRequestCoordinator} for a new
 * measurement whenever a lookup finds no cached entry or finds an entry that is at least as old
 * as the configured refresh interval.
 *
 * <p>Lookups, request triggering, completion handling and shut down all synchronize on a single
 * internal lock.
 */
public class BackPressureStatsTrackerImpl implements BackPressureStatsTracker {

    private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTrackerImpl.class);

    /** Monitor held while looking up stats, triggering requests, completing them and shutting down. */
    private final Object lock = new Object();

    /** Coordinator through which back pressure requests are triggered. */
    private final BackPressureRequestCoordinator coordinator;

    /** Completed stats per job vertex; entries expire after not being accessed for the cleanup interval. */
    private final Cache<ExecutionJobVertex, OperatorBackPressureStats> operatorStatsCache;

    /** Job vertices with a request in flight; only touched while holding {@link #lock}. */
    private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();

    /** Age in milliseconds from which cached stats cause a new request to be triggered. */
    private final int backPressureStatsRefreshInterval;

    /** Whether {@link #shutDown()} has been called. */
    @GuardedBy("lock")
    private boolean shutDown;

    /**
     * Creates a tracker.
     *
     * @param coordinator coordinator used to trigger back pressure requests; checked for null
     * @param cleanUpInterval time in milliseconds after the last access at which a cache entry
     *     expires; must be non-negative
     * @param refreshInterval age in milliseconds from which cached stats are refreshed; must be
     *     non-negative
     */
    public BackPressureStatsTrackerImpl(
            BackPressureRequestCoordinator coordinator, int cleanUpInterval, int refreshInterval) {
        Preconditions.checkArgument(cleanUpInterval >= 0, "The cleanup interval must be non-negative.");
        Preconditions.checkArgument(
                refreshInterval >= 0, "The back pressure stats refresh interval must be non-negative.");

        this.coordinator = Preconditions.checkNotNull(coordinator);
        this.backPressureStatsRefreshInterval = refreshInterval;
        this.operatorStatsCache = CacheBuilder.newBuilder()
                .concurrencyLevel(1)
                .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
                .build();
    }

    /**
     * Returns the cached stats for the given vertex, if any, and triggers a new request when
     * nothing is cached or the cached stats are at least as old as the refresh interval.
     *
     * @param vertex job vertex to look up
     * @return the currently cached stats (possibly stale), or an empty {@code Optional}
     */
    @Override
    public Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex) {
        synchronized (lock) {
            final OperatorBackPressureStats stats = operatorStatsCache.getIfPresent(vertex);
            if (stats == null
                    || backPressureStatsRefreshInterval <= System.currentTimeMillis() - stats.getEndTimestamp()) {
                triggerBackPressureRequestInternal(vertex);
            }
            // The lookup never waits for the refresh; the caller gets whatever was cached.
            return Optional.ofNullable(stats);
        }
    }

    /**
     * Triggers a back pressure request for the vertex unless the tracker is shut down, a request
     * for the vertex is already pending, the job is in a globally terminal state, or the graph has
     * no future executor. Must be called while holding {@link #lock}.
     *
     * @param vertex job vertex whose task vertices are passed to the coordinator
     */
    private void triggerBackPressureRequestInternal(ExecutionJobVertex vertex) {
        assert Thread.holdsLock(lock);

        if (shutDown) {
            return;
        }
        if (pendingStats.contains(vertex) || vertex.getGraph().getState().isGloballyTerminalState()) {
            return;
        }

        final Executor executor = vertex.getGraph().getFutureExecutor();
        if (executor == null) {
            return;
        }

        pendingStats.add(vertex);

        // Guard kept so that the array is only rendered when debug logging is enabled.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Triggering back pressure request for tasks: {}", Arrays.toString(vertex.getTaskVertices()));
        }

        final CompletableFuture<BackPressureStats> statsFuture =
                coordinator.triggerBackPressureRequest(vertex.getTaskVertices());
        statsFuture.handleAsync(new BackPressureRequestCompletionCallback(vertex), executor);
    }

    /** Runs the cache's {@code cleanUp()} maintenance. */
    @Override
    public void cleanUpOperatorStatsCache() {
        operatorStatsCache.cleanUp();
    }

    /**
     * Invalidates all cached stats, forgets all pending requests and marks the tracker as shut
     * down. Calls after the first one have no effect.
     */
    @Override
    public void shutDown() {
        synchronized (lock) {
            if (!shutDown) {
                operatorStatsCache.invalidateAll();
                pendingStats.clear();
                shutDown = true;
            }
        }
    }

    /** Completion handler of a back pressure request for a single job vertex. */
    private class BackPressureRequestCompletionCallback
            implements BiFunction<BackPressureStats, Throwable, Void> {

        /** Job vertex the request was triggered for. */
        private final ExecutionJobVertex vertex;

        BackPressureRequestCompletionCallback(ExecutionJobVertex vertex) {
            this.vertex = vertex;
        }

        /**
         * Caches the converted stats unless the tracker is shut down or the job is in a globally
         * terminal state; a missing result is only logged. The vertex is removed from the pending
         * set on every path.
         *
         * @param backPressureStats result of the request, or {@code null} if none was produced
         * @param throwable failure passed along by the future; only used for logging
         * @return always {@code null}
         */
        @Override
        public Void apply(BackPressureStats backPressureStats, Throwable throwable) {
            synchronized (lock) {
                try {
                    if (shutDown) {
                        return null;
                    }

                    final JobStatus jobState = vertex.getGraph().getState();
                    if (jobState.isGloballyTerminalState()) {
                        LOG.debug("Ignoring stats, because job is in state {}.", jobState);
                    } else if (backPressureStats != null) {
                        operatorStatsCache.put(vertex, createOperatorBackPressureStats(backPressureStats));
                    } else {
                        LOG.debug("Failed to gather back pressure stats.", throwable);
                    }
                } catch (Throwable t) {
                    // Nothing is cached for this request; the failure is only reported.
                    LOG.error("Error during stats completion.", t);
                } finally {
                    // Always clear the pending marker so that a later lookup can trigger again.
                    pendingStats.remove(vertex);
                }
                return null;
            }
        }

        /**
         * Converts ratios keyed by execution attempt into an array indexed by parallel subtask
         * index.
         *
         * @param stats stats as delivered by the request
         * @return operator level stats carrying the request id, end time and the ratio array
         * @throws IllegalStateException if a ratio belongs to an execution attempt that is not
         *     the current attempt of any task vertex of this job vertex
         */
        private OperatorBackPressureStats createOperatorBackPressureStats(BackPressureStats stats) {
            final Map<ExecutionAttemptID, Double> backPressureRatiosByTask = stats.getBackPressureRatios();

            // Resolve each reported attempt id to the subtask index of the vertex currently running it.
            final Map<ExecutionAttemptID, Integer> subtaskIndexMap =
                    Maps.newHashMapWithExpectedSize(backPressureRatiosByTask.size());
            final Set<ExecutionAttemptID> tasks = backPressureRatiosByTask.keySet();

            for (ExecutionVertex task : vertex.getTaskVertices()) {
                final ExecutionAttemptID taskId = task.getCurrentExecutionAttempt().getAttemptId();
                if (tasks.contains(taskId)) {
                    subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex());
                } else {
                    LOG.debug("Outdated stats. A task, which is part of the request has been reset.");
                }
            }

            // Array position corresponds to the subtask index.
            final double[] backPressureRatios = new double[backPressureRatiosByTask.size()];
            for (Map.Entry<ExecutionAttemptID, Double> entry : backPressureRatiosByTask.entrySet()) {
                final Integer subtaskIndex = subtaskIndexMap.get(entry.getKey());
                if (subtaskIndex == null) {
                    // Previously surfaced as a bare NullPointerException from unboxing; the outcome
                    // is the same (the caller logs it and caches nothing), the cause is now named.
                    throw new IllegalStateException(
                            "Outdated back pressure stats: execution attempt " + entry.getKey()
                                    + " is not the current attempt of any subtask.");
                }
                backPressureRatios[subtaskIndex] = entry.getValue();
            }

            return new OperatorBackPressureStats(stats.getRequestId(), stats.getEndTime(), backPressureRatios);
        }
    }
}

