IT技术互动交流平台

Hadoop1.2.1学习之Job建和提交源代码分析

来源:IT165收集  发布日期:2014-10-27 18:39:54

在Hadoop中,MapReduce的Java作业通常由编写Mapper和Reducer开始,接着创建Job对象,然后使用该对象的set方法设置Mapper和Reducer以及诸如输入输出等参数,最后调用Job对象的waitForCompletion(true)方法提交作业并等待作业的完成。尽管使用了寥寥数语就描述了作业的创建和提交,但实际情况要复杂的多,本篇文章将通过分析源代码来深入学习该过程。

通常使用public Job(Configuration conf, String jobName)创建Job作业对象,都会指定作业名称,hadoop代码只是将jobName设置为参数mapred.job.name的值。除了设置作业名称外,Job的构造函数还会使用Configuration对象初始化org.apache.hadoop.mapred.JobConf对象conf,以及使用UserGroupInformation.getCurrentUser()获取当前用户ugi。其中JobConf是描述MapReduce作业的主要接口,包括设置作业名称在内的许多方法都是由该类完成的。UserGroupInformation类用包含了用户和组的信息,该类封装了JAAS(Java Authentication AuthorizationService,Java认证和授权服务),并提供方法确定用户名和组。

当创建了Job对象后通常会设置Mapper和Reducer,比如job.setMapperClass,正像上面提到的,该操作实际是由JobConf对象完成的,具体代码如下,其它的设置方法类似:

public void setMapperClass(Class<? extends Mapper> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
}

在设置完作业运行需要的参数后,执行job.waitForCompletion(true)向集群提交作业并等待作业执行完成,其中的boolean类型的参数用于决定是否向用户打印作业的执行进度。该方法的具体代码如下:

public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      jobClient.monitorAndPrintJob(conf, info);
    } else {
      info.waitForCompletion();
    }
    return isSuccessful();
}

当新创建一个作业时,该作业的JobState state = JobState.DEFINE,所以上面的代码中会执行submit方法,当在submit返回后会根据参数verbose为true或false执行不同的方法。现在具体submit的实现:

public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();//默认使用新版本中的API,除非显示设置了老版本的API
    
    // Connect to the JobTracker and submit the job
    connect();
    info = jobClient.submitJobInternal(conf);
    super.setJobID(info.getID());
    state = JobState.RUNNING;
}

在submit中,先确认Job的state为JobState.DEFINE,并最后在将作业提交后设置为JobState.RUNNING。connect方法用于打开到JobTracker的连接,该方法的代码为:

private void connect() throws IOException, InterruptedException {
    ugi.doAs(new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        jobClient = new JobClient((JobConf) getConfiguration());    
        return null;
      }
    });
}

在进一步分析之前,需要先了解两个对象,分别是JobClient jobClient和RunningJobinfo,其中jobClient是用户作业与JobTracker交互的主要接口,该类具有提交作业,跟踪作业进度,访问任务日志和获取MapReduce集群状态信息等功能。RunningJob是接口,用于查询正在运行的MapReduce作业的细节,当调用jobClient的submitJobInternal时,返回的是jobClient的内部类NetworkedJob(该类实现了RunningJob)。在connect方法中,主要是实例化了jobClient对象,而ugi的doAs方法的返回值为run方法的返回值,后面还会使用该方法(实际情况是该方法被大量使用)。在JobClient的构造方法中,主要完成了连接JobTracker的工作,该工作又交给了init方法,该方法的具体实现为:

public void init(JobConf conf) throws IOException {
String tracker = conf.get("mapred.job.tracker", "local");
// mapreduce.client.tasklog.timeout
    tasklogtimeout = conf.getInt(
      TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
    this.ugi = UserGroupInformation.getCurrentUser();
    if ("local".equals(tracker)) {
      conf.setNumMapTasks(1);
      this.jobSubmitClient = new LocalJobRunner(conf);
    } else {
      this.rpcJobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
      this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
    }        
}

在该方法中着重分析非单机模式下的情况,即mapred.job.tracker的值不是local,也即else语句中的代码。rpcJobSubmitClient和jobSubmitClient是类型为JobSubmissionProtocol的两个对象,JobClient和JobTracker使用该接口通信,JobClient使用该接口的方法提交作业及了解当前系统的状态。方法createRPCProxy和createProxy用于创建实现JobSubmissionProtocol的客户端对象。

在连接到JobTracker后,接着使用jobClient的submitJobInternal向JobTracker提交作业。在该方法中首先确定存放作业文件的路径,该路径为${mapreduce.jobtracker.staging.root.dir}/{user-name}/.staging设置,若未设置mapreduce.jobtracker.staging.root.dir则使用/tmp/hadoop/mapred/staging/${user-name}/.staging。然后在上述目录创建名为作业Id的目录,并将参数mapreduce.job.dir设置为该值,即${mapreduce.jobtracker.staging.root.dir}/{user-name}/.staging/jobId,上面的目录均是相对于fs.default.name设置的值。接下来将作业的jar文件拷贝到${mapreduce.jobtracker.staging.root.dir}/{user-name}/.staging/jobId中,并重命名为job.jar文件,该工作由copyAndConfigureFiles方法完成。接着需要在上述目录中创建job.xml文件,获取Reduce任务的数量,分割输入文件并根据分割所得块数设置Map任务的数量。做完上述工作后,使用下面的代码提交作业:

status = jobSubmitClient.submitJob( jobId, submitJobDir.toString(), jobCopy.getCredentials());

当将作业提交到JobTracker后,作业的执行将由JobTracker负责,而做为提交作业的客户端可以选择是否打印作业执行进度。

综上在Hadoop-1.2.1中作业的创建和提交包括如下的一些过程:

设置作业的输入输出参数拷贝作业文件和配置文件到特定目录中计算作业的分片并设置Map任务的数量 向JobTracker提交作业并可选的监控作业运行进度

Tag标签: 建和   源代码  
  • 专题推荐

About IT165 - 广告服务 - 隐私声明 - 版权申明 - 免责条款 - 网站地图 - 网友投稿 - 联系方式
本站内容来自于互联网,仅供用于网络技术学习,学习中请遵循相关法律法规