/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.hadoop2.shaded.com.google.common.base.Charsets;
import org.apache.flink.shaded.hadoop2.org.codehaus.jackson.JsonParseException;
import org.apache.flink.shaded.hadoop2.org.codehaus.jackson.map.JsonMappingException;
import org.apache.flink.shaded.hadoop2.org.codehaus.jackson.map.ObjectMapper;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.QueueACL;
import org.apache.hadoop.mapred.QueueManager;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Client-side helper that prepares a {@link Job} for execution and hands it to
 * a {@link ClientProtocol}.
 *
 * <p>Submission, as implemented by {@link #submitJobInternal(Job, Cluster)},
 * validates the output specification, copies the job's resources into a
 * per-job directory on the submission file system, writes the input splits and
 * the job configuration there, and finally calls
 * {@link ClientProtocol#submitJob}.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
class JobSubmitter {
    protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);

    /** Algorithm name passed to {@link KeyGenerator#getInstance(String)} for the shuffle secret. */
    private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";

    /** Key size passed to {@link KeyGenerator#init(int)} for the shuffle secret. */
    private static final int SHUFFLE_KEY_LENGTH = 64;

    /** File system that receives the job's staged files. */
    private final FileSystem jtFs;

    /** Protocol endpoint used to obtain job ids, queue admins and to submit the job. */
    private final ClientProtocol submitClient;

    /** Host name of the submitting machine; set during {@link #submitJobInternal}. */
    private String submitHostName;

    /** Host address of the submitting machine; set during {@link #submitJobInternal}. */
    private String submitHostAddress;

    /**
     * Creates a submitter bound to a file system and a submission protocol.
     *
     * @param submitFs     file system the job files are staged on
     * @param submitClient protocol endpoint the job is submitted through
     * @throws IOException declared for callers; this constructor body itself
     *                     only stores its arguments
     */
    JobSubmitter(FileSystem submitFs, ClientProtocol submitClient) throws IOException {
        this.submitClient = submitClient;
        this.jtFs = submitFs;
    }

    /**
     * Decides whether two file systems refer to the same scheme, host and port.
     *
     * <p>Hosts are compared by canonical host name, so this may perform name
     * lookups. If either host cannot be resolved the file systems are treated
     * as different.
     *
     * @param srcFs  source file system
     * @param destFs destination file system
     * @return {@code true} only if scheme, (canonical) host and port all match
     */
    private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
        URI srcUri = srcFs.getUri();
        URI dstUri = destFs.getUri();

        String srcScheme = srcUri.getScheme();
        if (srcScheme == null || !srcScheme.equals(dstUri.getScheme())) {
            return false;
        }

        String srcHost = srcUri.getHost();
        String dstHost = dstUri.getHost();
        if (srcHost != null && dstHost != null) {
            try {
                srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
                dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
            }
            catch (UnknownHostException ue) {
                // An unresolvable host cannot be proven equal; record why we gave up.
                LOG.debug("Could not compare file systems, unknown host", ue);
                return false;
            }
            if (!srcHost.equals(dstHost)) {
                return false;
            }
        } else if (srcHost != null || dstHost != null) {
            // Exactly one of the two URIs carries a host.
            return false;
        }
        return srcUri.getPort() == dstUri.getPort();
    }

    /**
     * Makes {@code originalPath} available on the submission file system.
     *
     * <p>If the path already lives on a file system that {@link #compareFs}
     * considers identical to {@link #jtFs}, it is returned untouched. Otherwise
     * the file is copied into {@code parentDir} (keeping its name, without
     * deleting the source) and the copy's replication is set.
     *
     * @param parentDir    directory on the submission file system to copy into
     * @param originalPath path of the resource to make available
     * @param conf         configuration used to resolve file systems and to copy
     * @param replication  replication factor applied to a copied file
     * @return the original path, or the path of the new copy
     * @throws IOException if resolving the file system, copying or setting
     *                     replication fails
     */
    private Path copyRemoteFiles(Path parentDir, Path originalPath, Configuration conf, short replication) throws IOException {
        FileSystem remoteFs = originalPath.getFileSystem(conf);
        if (this.compareFs(remoteFs, this.jtFs)) {
            return originalPath;
        }
        Path newPath = new Path(parentDir, originalPath.getName());
        FileUtil.copy(remoteFs, originalPath, this.jtFs, newPath, false, conf);
        this.jtFs.setReplication(newPath, replication);
        return newPath;
    }

    /**
     * Parses a comma-separated resource entry into a {@link URI}.
     *
     * @param resource one entry of the {@code tmpfiles} / {@code tmparchives} setting
     * @return the parsed URI
     * @throws IllegalArgumentException if the entry is not a valid URI
     */
    private static URI parseResourceUri(String resource) {
        try {
            return new URI(resource);
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Creates the job submit directory and stages the job's files, libjars,
     * archives and job jar in it, registering each with the
     * {@link DistributedCache}.
     *
     * <p>The resources are read from the {@code tmpfiles}, {@code tmpjars} and
     * {@code tmparchives} configuration keys (comma-separated) and from
     * {@link Job#getJar()}.
     *
     * @param job          job whose resources are staged
     * @param submitJobDir per-job directory; must not exist yet
     * @param replication  replication factor for copied files
     * @throws IOException              if the directory already exists or any
     *                                  file-system operation fails
     * @throws IllegalArgumentException if a file or archive entry is not a valid URI
     */
    private void copyAndConfigureFiles(Job job, Path submitJobDir, short replication) throws IOException {
        Configuration conf = job.getConfiguration();
        if (!conf.getBoolean("mapreduce.client.genericoptionsparser.used", false)) {
            LOG.warn("Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.");
        }
        String files = conf.get("tmpfiles");
        String libjars = conf.get("tmpjars");
        String archives = conf.get("tmparchives");
        String jobJar = job.getJar();

        LOG.debug("default FileSystem: " + this.jtFs.getUri());
        if (this.jtFs.exists(submitJobDir)) {
            throw new IOException("Not submitting job. Job directory " + submitJobDir + " already exists!! This is unexpected.Please check what's there in" + " that directory");
        }
        // Qualify against the submission file system, then keep only the path component.
        submitJobDir = this.jtFs.makeQualified(submitJobDir);
        submitJobDir = new Path(submitJobDir.toUri().getPath());
        FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
        FileSystem.mkdirs(this.jtFs, submitJobDir, mapredSysPerms);

        Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
        Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
        Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);

        if (files != null) {
            FileSystem.mkdirs(this.jtFs, filesDir, mapredSysPerms);
            for (String tmpFile : files.split(",")) {
                URI tmpURI = JobSubmitter.parseResourceUri(tmpFile);
                Path newPath = this.copyRemoteFiles(filesDir, new Path(tmpURI), conf, replication);
                try {
                    DistributedCache.addCacheFile(this.getPathURI(newPath, tmpURI.getFragment()), conf);
                }
                catch (URISyntaxException ue) {
                    throw new IOException("Failed to create uri for " + tmpFile, ue);
                }
            }
        }

        if (libjars != null) {
            FileSystem.mkdirs(this.jtFs, libjarsDir, mapredSysPerms);
            for (String tmpJar : libjars.split(",")) {
                Path newPath = this.copyRemoteFiles(libjarsDir, new Path(tmpJar), conf, replication);
                // Only the path component is put on the class path.
                DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()), conf);
            }
        }

        if (archives != null) {
            FileSystem.mkdirs(this.jtFs, archivesDir, mapredSysPerms);
            for (String tmpArchive : archives.split(",")) {
                URI tmpURI = JobSubmitter.parseResourceUri(tmpArchive);
                Path newPath = this.copyRemoteFiles(archivesDir, new Path(tmpURI), conf, replication);
                try {
                    DistributedCache.addCacheArchive(this.getPathURI(newPath, tmpURI.getFragment()), conf);
                }
                catch (URISyntaxException ue) {
                    throw new IOException("Failed to create uri for " + tmpArchive, ue);
                }
            }
        }

        if (jobJar != null) {
            // Default the job name to the jar's file name when none was set.
            if ("".equals(job.getJobName())) {
                job.setJobName(new Path(jobJar).getName());
            }
            Path jobJarPath = new Path(jobJar);
            URI jobJarURI = jobJarPath.toUri();
            URI fsUri = this.jtFs.getUri();
            // Copy the jar unless it is already addressed by the submission
            // file system's scheme and authority.
            boolean needsCopy = jobJarURI.getScheme() == null
                    || jobJarURI.getAuthority() == null
                    || !jobJarURI.getScheme().equals(fsUri.getScheme())
                    || !jobJarURI.getAuthority().equals(fsUri.getAuthority());
            if (needsCopy) {
                this.copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir), replication);
                job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
            }
        } else {
            LOG.warn("No job jar file set.  User classes may not be found. See Job or Job#setJar(String).");
        }

        ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
        ClientDistributedCacheManager.getDelegationTokens(conf, job.getCredentials());
    }

    /**
     * Builds the URI under which a staged resource is registered, making sure
     * it carries a fragment.
     *
     * @param destPath path of the staged resource
     * @param fragment fragment requested by the user, or {@code null}
     * @return {@code destPath}'s URI unchanged if it already has a fragment;
     *         otherwise that URI with {@code fragment} appended, or with the
     *         file name appended when {@code fragment} is {@code null}
     * @throws URISyntaxException if the combined string is not a valid URI
     */
    private URI getPathURI(Path destPath, String fragment) throws URISyntaxException {
        URI pathURI = destPath.toUri();
        if (pathURI.getFragment() != null) {
            return pathURI;
        }
        String linkName = fragment == null ? destPath.getName() : fragment;
        return new URI(pathURI.toString() + "#" + linkName);
    }

    /**
     * Copies the job jar from the local file system to the submission file
     * system and applies replication and file permissions to the copy.
     *
     * @param originalJarPath local path of the jar
     * @param submitJarFile   destination path on the submission file system
     * @param replication     replication factor for the copy
     * @throws IOException if the copy or either attribute change fails
     */
    private void copyJar(Path originalJarPath, Path submitJarFile, short replication) throws IOException {
        this.jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
        this.jtFs.setReplication(submitJarFile, replication);
        this.jtFs.setPermission(submitJarFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
    }

    /**
     * Stages the job's resources using the replication factor from
     * {@code mapreduce.client.submit.file.replication} (default 10) and then
     * defaults the job's working directory to that of the submission file
     * system if none is set.
     *
     * @param job          job whose resources are staged
     * @param jobSubmitDir per-job submit directory
     * @throws IOException if staging fails
     */
    private void copyAndConfigureFiles(Job job, Path jobSubmitDir) throws IOException {
        Configuration conf = job.getConfiguration();
        short replication = (short)conf.getInt("mapreduce.client.submit.file.replication", 10);
        this.copyAndConfigureFiles(job, jobSubmitDir, replication);
        if (job.getWorkingDirectory() == null) {
            job.setWorkingDirectory(this.jtFs.getWorkingDirectory());
        }
    }

    /**
     * Submits the job through {@link #submitClient}.
     *
     * <p>In order: checks the output specification, registers the MR framework
     * archive (if configured), records the submit host, obtains a new job id,
     * collects credentials (generating a shuffle secret when none is present),
     * stages files, writes splits and the job configuration, and submits.
     * If submission does not yield a status, the per-job staging directory is
     * deleted before the failure propagates.
     *
     * @param job     job to submit
     * @param cluster cluster used to locate the staging directory
     * @return the status returned by the submission protocol; never {@code null}
     * @throws ClassNotFoundException if a configured format class cannot be loaded
     * @throws InterruptedException   if interrupted while preparing the job
     * @throws IOException            if any step fails or no status is returned
     */
    JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {
        this.checkSpecs(job);
        Configuration conf = job.getConfiguration();
        JobSubmitter.addMRFrameworkToDistributedCache(conf);
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

        InetAddress ip = InetAddress.getLocalHost();
        if (ip != null) {
            this.submitHostAddress = ip.getHostAddress();
            this.submitHostName = ip.getHostName();
            conf.set("mapreduce.job.submithostname", this.submitHostName);
            conf.set("mapreduce.job.submithostaddress", this.submitHostAddress);
        }

        JobID jobId = this.submitClient.getNewJobID();
        job.setJobID(jobId);
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        JobStatus status = null;
        try {
            conf.set("mapreduce.job.user.name", UserGroupInformation.getCurrentUser().getShortUserName());
            conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
            conf.set("mapreduce.job.dir", submitJobDir.toString());
            LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir");

            TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{submitJobDir}, conf);
            this.populateTokenCache(conf, job.getCredentials());

            // Generate a shuffle secret only if the credentials do not carry one yet.
            if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
                KeyGenerator keyGen;
                try {
                    keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
                    keyGen.init(SHUFFLE_KEY_LENGTH);
                }
                catch (NoSuchAlgorithmException e) {
                    throw new IOException("Error generating shuffle secret key", e);
                }
                SecretKey shuffleKey = keyGen.generateKey();
                TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials());
            }

            this.copyAndConfigureFiles(job, submitJobDir);
            Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

            LOG.debug("Creating splits at " + this.jtFs.makeQualified(submitJobDir));
            int maps = this.writeSplits(job, submitJobDir);
            conf.setInt("mapreduce.job.maps", maps);
            LOG.info("number of splits:" + maps);

            // Record the queue's administrators in the job configuration.
            String queue = conf.get("mapreduce.job.queuename", "default");
            AccessControlList acl = this.submitClient.getQueueAdmins(queue);
            conf.set(QueueManager.toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

            TokenCache.cleanUpTokenReferral(conf);

            if (conf.getBoolean("mapreduce.job.token.tracking.ids.enabled", false)) {
                List<String> trackingIds = new ArrayList<String>();
                for (Token<? extends TokenIdentifier> t : job.getCredentials().getAllTokens()) {
                    trackingIds.add(t.decodeIdentifier().getTrackingId());
                }
                conf.setStrings("mapreduce.job.token.tracking.ids", trackingIds.toArray(new String[trackingIds.size()]));
            }

            this.writeConf(conf, submitJobFile);
            this.printTokens(jobId, job.getCredentials());
            status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
            if (status != null) {
                return status;
            }
            throw new IOException("Could not launch job");
        }
        finally {
            // status is still null only when an exception is already propagating.
            if (status == null) {
                LOG.info("Cleaning up the staging area " + submitJobDir);
                if (this.jtFs != null && submitJobDir != null) {
                    try {
                        this.jtFs.delete(submitJobDir, true);
                    }
                    catch (IOException cleanupFailure) {
                        // Do not let a cleanup failure replace the exception
                        // that caused the submission to fail.
                        LOG.warn("Failed to clean up the staging area " + submitJobDir, cleanupFailure);
                    }
                }
            }
        }
    }

    /**
     * Validates the job's output specification with the configured output
     * format.
     *
     * <p>The new-API {@link OutputFormat} is used when the stage that writes
     * the output (the mapper for a job with zero reduces, otherwise the
     * reducer) is flagged as new-API; otherwise the old-API output format from
     * the {@link JobConf} is used.
     *
     * @param job job to validate; its configuration is cast to {@link JobConf}
     * @throws ClassNotFoundException if the output format class cannot be loaded
     * @throws InterruptedException   if the check is interrupted
     * @throws IOException            if the output specification is rejected
     */
    private void checkSpecs(Job job) throws ClassNotFoundException, InterruptedException, IOException {
        JobConf jConf = (JobConf)job.getConfiguration();
        boolean useNewApi = jConf.getNumReduceTasks() == 0 ? jConf.getUseNewMapper() : jConf.getUseNewReducer();
        if (useNewApi) {
            OutputFormat<?, ?> output = ReflectionUtils.newInstance(job.getOutputFormatClass(), job.getConfiguration());
            output.checkOutputSpecs(job);
        } else {
            jConf.getOutputFormat().checkOutputSpecs(this.jtFs, jConf);
        }
    }

    /**
     * Writes the configuration as XML to {@code jobFile} on the submission
     * file system, creating the file with {@code JOB_FILE_PERMISSION}.
     *
     * @param conf    configuration to serialize
     * @param jobFile destination file
     * @throws IOException if the file cannot be created, written or closed
     */
    private void writeConf(Configuration conf, Path jobFile) throws IOException {
        FSDataOutputStream out = FileSystem.create(this.jtFs, jobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
        try {
            conf.writeXml(out);
        }
        finally {
            out.close();
        }
    }

    /**
     * Logs, at info level, every token contained in the given credentials.
     *
     * @param jobId       id of the job being submitted (used in the log header)
     * @param credentials credentials whose tokens are logged
     * @throws IOException declared for callers; not thrown by the visible body
     */
    private void printTokens(JobID jobId, Credentials credentials) throws IOException {
        LOG.info("Submitting tokens for job: " + jobId);
        for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
            LOG.info(token);
        }
    }

    /**
     * Computes the splits with the new-API input format, orders them with
     * {@link SplitComparator} (largest first) and writes the split files.
     *
     * @param job          job context providing the input format and configuration
     * @param jobSubmitDir directory the split files are written to
     * @return the number of splits
     * @throws IOException            if computing or writing splits fails
     * @throws InterruptedException   if split computation is interrupted
     * @throws ClassNotFoundException if the input format class cannot be loaded
     */
    private <T extends org.apache.hadoop.mapreduce.InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = job.getConfiguration();
        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
        List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(job);
        org.apache.hadoop.mapreduce.InputSplit[] array = splits.toArray(new org.apache.hadoop.mapreduce.InputSplit[splits.size()]);
        Arrays.sort(array, new SplitComparator());
        JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
        return array.length;
    }

    /**
     * Writes the job's input splits, choosing the new or old API according to
     * {@link JobConf#getUseNewMapper()}.
     *
     * @param job          job context; its configuration is cast to {@link JobConf}
     * @param jobSubmitDir directory the split files are written to
     * @return the number of splits, used by the caller as the number of maps
     * @throws IOException            if computing or writing splits fails
     * @throws InterruptedException   if split computation is interrupted
     * @throws ClassNotFoundException if the input format class cannot be loaded
     */
    private int writeSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf jConf = (JobConf)job.getConfiguration();
        if (jConf.getUseNewMapper()) {
            return this.writeNewSplits(job, jobSubmitDir);
        }
        return this.writeOldSplits(jConf, jobSubmitDir);
    }

    /**
     * Computes the splits with the old-API input format, orders them by
     * length (largest first) and writes the split files.
     *
     * @param job          old-API job configuration
     * @param jobSubmitDir directory the split files are written to
     * @return the number of splits
     * @throws IOException if computing or writing splits fails
     */
    private int writeOldSplits(JobConf job, Path jobSubmitDir) throws IOException {
        InputSplit[] splits = job.getInputFormat().getSplits(job, job.getNumMapTasks());
        Arrays.sort(splits, new Comparator<InputSplit>(){

            @Override
            public int compare(InputSplit a, InputSplit b) {
                try {
                    long left = a.getLength();
                    long right = b.getLength();
                    if (left == right) {
                        return 0;
                    }
                    // Descending order: the longer split sorts first.
                    return left < right ? 1 : -1;
                }
                catch (IOException ie) {
                    throw new RuntimeException("Problem getting input split size", ie);
                }
            }
        });
        JobSplitWriter.createSplitFiles(jobSubmitDir, job, jobSubmitDir.getFileSystem(job), splits);
        return splits.length;
    }

    /**
     * Loads user-supplied credentials referenced by the configuration.
     *
     * <p>{@code mapreduce.job.credentials.binary} names a token storage file
     * that is merged into {@code credentials}.
     * {@code mapreduce.job.credentials.json} names a local JSON file that is
     * read as a map; each entry is added as a secret key (value encoded as
     * UTF-8). A JSON parse or mapping failure is logged as a warning and does
     * not abort submission.
     *
     * @param conf        configuration naming the credential files
     * @param credentials credentials to add the loaded tokens and keys to
     * @throws IOException if a credential file cannot be read
     */
    private void readTokensFromFiles(Configuration conf, Credentials credentials) throws IOException {
        String binaryTokenFilename = conf.get("mapreduce.job.credentials.binary");
        if (binaryTokenFilename != null) {
            Credentials binary = Credentials.readTokenStorageFile(new Path("file:///" + binaryTokenFilename), conf);
            credentials.addAll(binary);
        }

        String tokensFileName = conf.get("mapreduce.job.credentials.json");
        if (tokensFileName != null) {
            LOG.info("loading user's secret keys from " + tokensFileName);
            String localFileName = new Path(tokensFileName).toUri().getPath();
            Exception parseFailure = null;
            try {
                ObjectMapper mapper = new ObjectMapper();
                Map<?, ?> secrets = mapper.readValue(new File(localFileName), Map.class);
                for (Map.Entry<?, ?> ent : secrets.entrySet()) {
                    credentials.addSecretKey(new Text((String)ent.getKey()), ((String)ent.getValue()).getBytes(Charsets.UTF_8));
                }
            }
            catch (JsonMappingException e) {
                parseFailure = e;
            }
            catch (JsonParseException e) {
                parseFailure = e;
            }
            if (parseFailure != null) {
                // Keep submission going, but record the cause for diagnosis.
                LOG.warn("couldn't parse Token Cache JSON file with user secret keys", parseFailure);
            }
        }
    }

    /**
     * Fills {@code credentials} from the configured credential files and then
     * asks {@link TokenCache} to obtain tokens for every path listed in
     * {@code mapreduce.job.hdfs-servers}.
     *
     * @param conf        job configuration
     * @param credentials credentials to populate
     * @throws IOException if reading credential files or obtaining tokens fails
     */
    private void populateTokenCache(Configuration conf, Credentials credentials) throws IOException {
        this.readTokensFromFiles(conf, credentials);
        String[] nameNodes = conf.getStrings("mapreduce.job.hdfs-servers");
        LOG.debug("adding the following namenodes' delegation tokens:" + Arrays.toString(nameNodes));
        if (nameNodes != null) {
            Path[] ps = new Path[nameNodes.length];
            for (int i = 0; i < nameNodes.length; ++i) {
                ps[i] = new Path(nameNodes[i]);
            }
            TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
        }
    }

    /**
     * Registers the archive named by {@code mapreduce.application.framework.path}
     * as a distributed-cache archive, if that setting is non-empty.
     *
     * <p>The configured path is qualified against the default file system and
     * resolved through {@link FileContext#resolvePath(Path)}; the fragment of
     * the configured URI, if any, is carried over to the registered URI.
     *
     * @param conf job configuration to read the setting from and register into
     * @throws IOException              if the file system cannot be accessed
     * @throws IllegalArgumentException if the setting or the resolved location
     *                                  cannot be expressed as a URI
     */
    private static void addMRFrameworkToDistributedCache(Configuration conf) throws IOException {
        String framework = conf.get("mapreduce.application.framework.path", "");
        if (framework.isEmpty()) {
            return;
        }
        URI uri;
        try {
            uri = new URI(framework);
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException("Unable to parse '" + framework + "' as a URI, check the setting for " + "mapreduce.application.framework.path", e);
        }
        String linkedName = uri.getFragment();
        FileSystem fs = FileSystem.get(conf);
        Path frameworkPath = fs.makeQualified(new Path(uri.getScheme(), uri.getAuthority(), uri.getPath()));
        FileContext fc = FileContext.getFileContext(frameworkPath.toUri(), conf);
        frameworkPath = fc.resolvePath(frameworkPath);
        uri = frameworkPath.toUri();
        try {
            // Re-attach the original fragment to the resolved location.
            uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, linkedName);
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
        DistributedCache.addCacheArchive(uri, conf);
    }

    /**
     * Orders new-API input splits by length, largest first.
     *
     * <p>Failures while reading a split's length are rethrown as
     * {@link RuntimeException} because {@link Comparator#compare} cannot throw
     * checked exceptions.
     */
    private static class SplitComparator
    implements Comparator<org.apache.hadoop.mapreduce.InputSplit> {
        private SplitComparator() {
        }

        @Override
        public int compare(org.apache.hadoop.mapreduce.InputSplit o1, org.apache.hadoop.mapreduce.InputSplit o2) {
            try {
                long len1 = o1.getLength();
                long len2 = o2.getLength();
                if (len1 < len2) {
                    return 1;
                }
                if (len1 == len2) {
                    return 0;
                }
                return -1;
            }
            catch (IOException ie) {
                throw new RuntimeException("exception in compare", ie);
            }
            catch (InterruptedException ie) {
                // Restore the interrupt status so callers up the stack can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException("exception in compare", ie);
            }
        }
    }
}

