/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.heartbeat;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

@ThreadSafe
public class HeartbeatManagerImpl<I, O>
implements HeartbeatManager<I, O> {
    private final long heartbeatTimeoutIntervalMs;
    private final ResourceID ownResourceID;
    private final HeartbeatListener<I, O> heartbeatListener;
    private final ScheduledExecutor scheduledExecutor;
    protected final Logger log;
    private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;
    private final Executor executor;
    protected volatile boolean stopped;

    public HeartbeatManagerImpl(long heartbeatTimeoutIntervalMs, ResourceID ownResourceID, HeartbeatListener<I, O> heartbeatListener, Executor executor, ScheduledExecutor scheduledExecutor, Logger log) {
        Preconditions.checkArgument((heartbeatTimeoutIntervalMs > 0L ? 1 : 0) != 0, (Object)"The heartbeat timeout has to be larger than 0.");
        this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
        this.ownResourceID = (ResourceID)Preconditions.checkNotNull((Object)ownResourceID);
        this.heartbeatListener = (HeartbeatListener)Preconditions.checkNotNull(heartbeatListener, (String)"heartbeatListener");
        this.scheduledExecutor = (ScheduledExecutor)Preconditions.checkNotNull((Object)scheduledExecutor);
        this.log = (Logger)Preconditions.checkNotNull((Object)log);
        this.executor = (Executor)Preconditions.checkNotNull((Object)executor);
        this.heartbeatTargets = new ConcurrentHashMap(16);
        this.stopped = false;
    }

    ResourceID getOwnResourceID() {
        return this.ownResourceID;
    }

    Executor getExecutor() {
        return this.executor;
    }

    HeartbeatListener<I, O> getHeartbeatListener() {
        return this.heartbeatListener;
    }

    Collection<HeartbeatMonitor<O>> getHeartbeatTargets() {
        return this.heartbeatTargets.values();
    }

    @Override
    public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
        if (!this.stopped) {
            if (this.heartbeatTargets.containsKey(resourceID)) {
                this.log.debug("The target with resource ID {} is already been monitored.", (Object)resourceID);
            } else {
                HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatMonitor<O>(resourceID, heartbeatTarget, this.scheduledExecutor, this.heartbeatListener, this.heartbeatTimeoutIntervalMs);
                this.heartbeatTargets.put(resourceID, heartbeatMonitor);
                if (this.stopped) {
                    heartbeatMonitor.cancel();
                    this.heartbeatTargets.remove(resourceID);
                }
            }
        }
    }

    @Override
    public void unmonitorTarget(ResourceID resourceID) {
        HeartbeatMonitor<O> heartbeatMonitor;
        if (!this.stopped && (heartbeatMonitor = this.heartbeatTargets.remove(resourceID)) != null) {
            heartbeatMonitor.cancel();
        }
    }

    @Override
    public void stop() {
        this.stopped = true;
        for (HeartbeatMonitor<O> heartbeatMonitor : this.heartbeatTargets.values()) {
            heartbeatMonitor.cancel();
        }
        this.heartbeatTargets.clear();
    }

    @Override
    public long getLastHeartbeatFrom(ResourceID resourceId) {
        HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatTargets.get(resourceId);
        if (heartbeatMonitor != null) {
            return heartbeatMonitor.getLastHeartbeat();
        }
        return -1L;
    }

    @Override
    public void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) {
        if (!this.stopped) {
            this.log.debug("Received heartbeat from {}.", (Object)heartbeatOrigin);
            this.reportHeartbeat(heartbeatOrigin);
            if (heartbeatPayload != null) {
                this.heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
            }
        }
    }

    @Override
    public void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload) {
        if (!this.stopped) {
            this.log.debug("Received heartbeat request from {}.", (Object)requestOrigin);
            HeartbeatTarget<O> heartbeatTarget = this.reportHeartbeat(requestOrigin);
            if (heartbeatTarget != null) {
                CompletableFuture<O> futurePayload;
                if (heartbeatPayload != null) {
                    this.heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
                }
                if ((futurePayload = this.heartbeatListener.retrievePayload(requestOrigin)) != null) {
                    CompletionStage sendHeartbeatFuture = futurePayload.thenAcceptAsync(retrievedPayload -> heartbeatTarget.receiveHeartbeat(this.getOwnResourceID(), retrievedPayload), this.executor);
                    ((CompletableFuture)sendHeartbeatFuture).exceptionally(failure -> {
                        this.log.warn("Could not send heartbeat to target with id {}.", (Object)requestOrigin, failure);
                        return null;
                    });
                } else {
                    heartbeatTarget.receiveHeartbeat(this.ownResourceID, null);
                }
            }
        }
    }

    HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) {
        if (this.heartbeatTargets.containsKey(resourceID)) {
            HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatTargets.get(resourceID);
            heartbeatMonitor.reportHeartbeat();
            return heartbeatMonitor.getHeartbeatTarget();
        }
        return null;
    }

    static class HeartbeatMonitor<O>
    implements Runnable {
        private final ResourceID resourceID;
        private final HeartbeatTarget<O> heartbeatTarget;
        private final ScheduledExecutor scheduledExecutor;
        private final HeartbeatListener<?, ?> heartbeatListener;
        private final long heartbeatTimeoutIntervalMs;
        private volatile ScheduledFuture<?> futureTimeout;
        private final AtomicReference<State> state = new AtomicReference<State>(State.RUNNING);
        private volatile long lastHeartbeat;

        HeartbeatMonitor(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget, ScheduledExecutor scheduledExecutor, HeartbeatListener<?, O> heartbeatListener, long heartbeatTimeoutIntervalMs) {
            this.resourceID = (ResourceID)Preconditions.checkNotNull((Object)resourceID);
            this.heartbeatTarget = (HeartbeatTarget)Preconditions.checkNotNull(heartbeatTarget);
            this.scheduledExecutor = (ScheduledExecutor)Preconditions.checkNotNull((Object)scheduledExecutor);
            this.heartbeatListener = (HeartbeatListener)Preconditions.checkNotNull(heartbeatListener);
            Preconditions.checkArgument((heartbeatTimeoutIntervalMs > 0L ? 1 : 0) != 0, (Object)"The heartbeat timeout interval has to be larger than 0.");
            this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
            this.lastHeartbeat = 0L;
            this.resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
        }

        HeartbeatTarget<O> getHeartbeatTarget() {
            return this.heartbeatTarget;
        }

        ResourceID getHeartbeatTargetId() {
            return this.resourceID;
        }

        public long getLastHeartbeat() {
            return this.lastHeartbeat;
        }

        void reportHeartbeat() {
            this.lastHeartbeat = System.currentTimeMillis();
            this.resetHeartbeatTimeout(this.heartbeatTimeoutIntervalMs);
        }

        void resetHeartbeatTimeout(long heartbeatTimeout) {
            if (this.state.get() == State.RUNNING) {
                this.cancelTimeout();
                this.futureTimeout = this.scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);
                if (this.state.get() != State.RUNNING) {
                    this.cancelTimeout();
                }
            }
        }

        void cancel() {
            if (this.state.compareAndSet(State.RUNNING, State.CANCELED)) {
                this.cancelTimeout();
            }
        }

        private void cancelTimeout() {
            if (this.futureTimeout != null) {
                this.futureTimeout.cancel(true);
            }
        }

        public boolean isCanceled() {
            return this.state.get() == State.CANCELED;
        }

        @Override
        public void run() {
            if (this.state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
                this.heartbeatListener.notifyHeartbeatTimeout(this.resourceID);
            }
        }

        private static enum State {
            RUNNING,
            TIMEOUT,
            CANCELED;

        }
    }
}

