/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperJobGraphStore
implements JobGraphStore {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperJobGraphStore.class);
    private final Object cacheLock = new Object();
    private final Set<JobID> addedJobGraphs = new HashSet<JobID>();
    private final ZooKeeperStateHandleStore<JobGraph> jobGraphsInZooKeeper;
    private final PathChildrenCache pathCache;
    private final String zooKeeperFullBasePath;
    private JobGraphStore.JobGraphListener jobGraphListener;
    private boolean isRunning;

    public ZooKeeperJobGraphStore(String zooKeeperFullBasePath, ZooKeeperStateHandleStore<JobGraph> zooKeeperStateHandleStore, PathChildrenCache pathCache) {
        Preconditions.checkNotNull((Object)zooKeeperFullBasePath, (String)"Current jobs path");
        this.zooKeeperFullBasePath = zooKeeperFullBasePath;
        this.jobGraphsInZooKeeper = (ZooKeeperStateHandleStore)Preconditions.checkNotNull(zooKeeperStateHandleStore);
        this.pathCache = (PathChildrenCache)Preconditions.checkNotNull((Object)pathCache);
        pathCache.getListenable().addListener(new JobGraphsPathCacheListener());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start(JobGraphStore.JobGraphListener jobGraphListener) throws Exception {
        Object object = this.cacheLock;
        synchronized (object) {
            if (!this.isRunning) {
                this.jobGraphListener = jobGraphListener;
                this.pathCache.start();
                this.isRunning = true;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() throws Exception {
        Object object = this.cacheLock;
        synchronized (object) {
            if (this.isRunning) {
                this.jobGraphListener = null;
                try {
                    Exception exception = null;
                    try {
                        this.jobGraphsInZooKeeper.releaseAll();
                    }
                    catch (Exception e) {
                        exception = e;
                    }
                    try {
                        this.pathCache.close();
                    }
                    catch (Exception e) {
                        exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
                    }
                    if (exception != null) {
                        throw new FlinkException("Could not properly stop the ZooKeeperJobGraphStore.", (Throwable)exception);
                    }
                }
                finally {
                    this.isRunning = false;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    @Nullable
    public JobGraph recoverJobGraph(JobID jobId) throws Exception {
        Preconditions.checkNotNull((Object)jobId, (String)"Job ID");
        String path = ZooKeeperJobGraphStore.getPathForJob(jobId);
        LOG.debug("Recovering job graph {} from {}{}.", new Object[]{jobId, this.zooKeeperFullBasePath, path});
        Object object = this.cacheLock;
        synchronized (object) {
            JobGraph jobGraph;
            boolean success;
            block13: {
                RetrievableStateHandle<JobGraph> jobGraphRetrievableStateHandle;
                this.verifyIsRunning();
                success = false;
                try {
                    jobGraphRetrievableStateHandle = this.jobGraphsInZooKeeper.getAndLock(path);
                }
                catch (KeeperException.NoNodeException ignored) {
                    success = true;
                    JobGraph jobGraph2 = null;
                    if (success) return jobGraph2;
                    this.jobGraphsInZooKeeper.release(path);
                    return jobGraph2;
                }
                catch (Exception e) {
                    throw new FlinkException("Could not retrieve the submitted job graph state handle for " + path + " from the submitted job graph store.", (Throwable)e);
                }
                jobGraph = jobGraphRetrievableStateHandle.retrieveState();
                break block13;
                catch (ClassNotFoundException cnfe) {
                    throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + path + ". This indicates that you are trying to recover from state written by an older Flink version which is not compatible. Try cleaning the state handle store.", (Throwable)cnfe);
                }
                catch (IOException ioe) {
                    throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + path + ". This indicates that the retrieved state handle is broken. Try cleaning the state handle store.", (Throwable)ioe);
                }
            }
            this.addedJobGraphs.add(jobGraph.getJobID());
            LOG.info("Recovered {}.", (Object)jobGraph);
            success = true;
            return jobGraph;
            finally {
                if (!success) {
                    this.jobGraphsInZooKeeper.release(path);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void putJobGraph(JobGraph jobGraph) throws Exception {
        Preconditions.checkNotNull((Object)jobGraph, (String)"Job graph");
        String path = ZooKeeperJobGraphStore.getPathForJob(jobGraph.getJobID());
        LOG.debug("Adding job graph {} to {}{}.", new Object[]{jobGraph.getJobID(), this.zooKeeperFullBasePath, path});
        boolean success = false;
        while (!success) {
            Object object = this.cacheLock;
            synchronized (object) {
                this.verifyIsRunning();
                int currentVersion = this.jobGraphsInZooKeeper.exists(path);
                if (currentVersion == -1) {
                    try {
                        this.jobGraphsInZooKeeper.addAndLock(path, jobGraph);
                        this.addedJobGraphs.add(jobGraph.getJobID());
                        success = true;
                    }
                    catch (KeeperException.NodeExistsException nodeExistsException) {}
                } else if (this.addedJobGraphs.contains(jobGraph.getJobID())) {
                    try {
                        this.jobGraphsInZooKeeper.replace(path, currentVersion, jobGraph);
                        LOG.info("Updated {} in ZooKeeper.", (Object)jobGraph);
                        success = true;
                    }
                    catch (KeeperException.NoNodeException noNodeException) {}
                } else {
                    throw new IllegalStateException("Oh, no. Trying to update a graph you didn't #getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
                }
            }
        }
        LOG.info("Added {} to ZooKeeper.", (Object)jobGraph);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeJobGraph(JobID jobId) throws Exception {
        Preconditions.checkNotNull((Object)jobId, (String)"Job ID");
        String path = ZooKeeperJobGraphStore.getPathForJob(jobId);
        LOG.debug("Removing job graph {} from {}{}.", new Object[]{jobId, this.zooKeeperFullBasePath, path});
        Object object = this.cacheLock;
        synchronized (object) {
            if (this.addedJobGraphs.contains(jobId)) {
                if (this.jobGraphsInZooKeeper.releaseAndTryRemove(path)) {
                    this.addedJobGraphs.remove(jobId);
                } else {
                    throw new FlinkException(String.format("Could not remove job graph with job id %s from ZooKeeper.", jobId));
                }
            }
        }
        LOG.info("Removed job graph {} from ZooKeeper.", (Object)jobId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void releaseJobGraph(JobID jobId) throws Exception {
        Preconditions.checkNotNull((Object)jobId, (String)"Job ID");
        String path = ZooKeeperJobGraphStore.getPathForJob(jobId);
        LOG.debug("Releasing locks of job graph {} from {}{}.", new Object[]{jobId, this.zooKeeperFullBasePath, path});
        Object object = this.cacheLock;
        synchronized (object) {
            if (this.addedJobGraphs.contains(jobId)) {
                this.jobGraphsInZooKeeper.release(path);
                this.addedJobGraphs.remove(jobId);
            }
        }
        LOG.info("Released locks of job graph {} from ZooKeeper.", (Object)jobId);
    }

    @Override
    public Collection<JobID> getJobIds() throws Exception {
        Collection<String> paths;
        LOG.debug("Retrieving all stored job ids from ZooKeeper under {}.", (Object)this.zooKeeperFullBasePath);
        try {
            paths = this.jobGraphsInZooKeeper.getAllPaths();
        }
        catch (Exception e) {
            throw new Exception("Failed to retrieve entry paths from ZooKeeperStateHandleStore.", e);
        }
        ArrayList<JobID> jobIds = new ArrayList<JobID>(paths.size());
        for (String path : paths) {
            try {
                jobIds.add(ZooKeeperJobGraphStore.jobIdfromPath(path));
            }
            catch (Exception exception) {
                LOG.warn("Could not parse job id from {}. This indicates a malformed path.", (Object)path, (Object)exception);
            }
        }
        return jobIds;
    }

    private void verifyIsRunning() {
        Preconditions.checkState((boolean)this.isRunning, (Object)"Not running. Forgot to call start()?");
    }

    public static String getPathForJob(JobID jobId) {
        Preconditions.checkNotNull((Object)jobId, (String)"Job ID");
        return String.format("/%s", jobId);
    }

    public static JobID jobIdfromPath(String path) {
        return JobID.fromHexString((String)path);
    }

    private final class JobGraphsPathCacheListener
    implements PathChildrenCacheListener {
        private JobGraphsPathCacheListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            if (LOG.isDebugEnabled()) {
                if (event.getData() != null) {
                    LOG.debug("Received {} event (path: {})", (Object)event.getType(), (Object)event.getData().getPath());
                } else {
                    LOG.debug("Received {} event", (Object)event.getType());
                }
            }
            switch (event.getType()) {
                case CHILD_ADDED: {
                    JobID jobId = this.fromEvent(event);
                    LOG.debug("Received CHILD_ADDED event notification for job {}", (Object)jobId);
                    Object object = ZooKeeperJobGraphStore.this.cacheLock;
                    synchronized (object) {
                        try {
                            if (ZooKeeperJobGraphStore.this.jobGraphListener != null && !ZooKeeperJobGraphStore.this.addedJobGraphs.contains(jobId)) {
                                try {
                                    ZooKeeperJobGraphStore.this.jobGraphListener.onAddedJobGraph(jobId);
                                }
                                catch (Throwable t) {
                                    LOG.error("Error in callback", t);
                                }
                            }
                        }
                        catch (Exception e) {
                            LOG.error("Error in JobGraphsPathCacheListener", (Throwable)e);
                        }
                        break;
                    }
                }
                case CHILD_UPDATED: {
                    break;
                }
                case CHILD_REMOVED: {
                    JobID jobId = this.fromEvent(event);
                    LOG.debug("Received CHILD_REMOVED event notification for job {}", (Object)jobId);
                    Object object = ZooKeeperJobGraphStore.this.cacheLock;
                    synchronized (object) {
                        try {
                            if (ZooKeeperJobGraphStore.this.jobGraphListener != null && ZooKeeperJobGraphStore.this.addedJobGraphs.contains(jobId)) {
                                try {
                                    ZooKeeperJobGraphStore.this.jobGraphListener.onRemovedJobGraph(jobId);
                                }
                                catch (Throwable t) {
                                    LOG.error("Error in callback", t);
                                }
                            }
                        }
                        catch (Exception e) {
                            LOG.error("Error in JobGraphsPathCacheListener", (Throwable)e);
                        }
                        break;
                    }
                }
                case CONNECTION_SUSPENDED: {
                    LOG.warn("ZooKeeper connection SUSPENDING. Changes to the submitted job graphs are not monitored (temporarily).");
                    break;
                }
                case CONNECTION_LOST: {
                    LOG.warn("ZooKeeper connection LOST. Changes to the submitted job graphs are not monitored (permanently).");
                    break;
                }
                case CONNECTION_RECONNECTED: {
                    LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are monitored again.");
                    break;
                }
                case INITIALIZED: {
                    LOG.info("JobGraphsPathCacheListener initialized");
                }
            }
        }

        private JobID fromEvent(PathChildrenCacheEvent event) {
            return JobID.fromHexString((String)ZKPaths.getNodeFromPath(event.getData().getPath()));
        }
    }
}

