【Hadoop】MapReduce的工作机制

MapReduce 任务执行流程

  • 代码编写
  • 作业配置
  • 作业提交
  • Map 任务的分配和执行 [输入准备 任务执行 输出结果]
  • 处理中间结果
  • Reduce 任务分配和执行 [输入准备 任务执行 输出结果]
  • 作业完成

重要的角色及其作用

  • 客户端 : 编写代码、配置作业、提交作业
  • JobTracker : 初始化作业、分配作业、与 TaskTracker 通信,协调整个作业进行
  • TaskTracker : 保持和 JobTracker 的通信,执行 Map 或者 Reduce 任务
  • HDFS : 保存作业的数据、配置信息等,保存作业结果

提交作业需要注意的配置

  • 程序代码 : map 和 reduce 函数实现的代码需要保证语法正确和逻辑正确
  • Map 接口和 Reduce 接口配置

Map 接口派生于 Mapper<k1,v1,k2,v2> 接口,对应 map函数

Reduce 接口派生于 Reduce<k2,v2,k2,v3> 接口,对应 reduce 函数

4个参数分别表示 : 输入 key 的类型、输入 value 的类型、输出 key-value 对的类型和 Reporter 实例,输入输出的类型与继承时设置的类型一致,Map 接口的输出 key-value 类型和 Reduce 接口的输入 key-value 类型对应(map输出组合 value 以后成为 reduce 的输入内容)

  • 输入输出路径

必须保证输出路径不存在,输入路径中存放需要处理的数据文件

  • 其他配置

Output 的kay 、value 类型、作业名称、InputFormat 和 OutputFormat 等

提交作业过程

  • 读取当前作业 ID 号码
  • 检查作业相关路径
  • 计算作业输入划分,写入 job.split 文件
  • 将作业复制到 HDFS 上面
  • submitJob() 方法提交作业,准备执行

初始化作业步骤

  • 从 HDFS 中读取作业对应的 job.spilt
  • 创建并初始化 map 任务 和 reduce 任务
  • 创建 2 个 task,分别初始化 map 和 reduce

分配任务

通过 心跳机制(heartbeat) 完成 JobTracker 和 TaskTracker 之间的通信以及任务的分配

TaskTracker 向 JobTracker 发送自己的状态,根据情况选择是否请求新的 Task,然后发送心跳

JobTracker 接受到心跳信息以后先分析,如果发现 TaskTracker 在请求一个新的 Task,任务调度器就封装好一个任务及其信息返回给 TaskTracker

分配任务时,JobTracker 会首先考虑 map 任务数据本地化,即根据 TaskTracker 的网络位置选择一个最近的文件分配给这个 TaskTracker,所以往往 DataNode 与 TaskTracker 在一台机器上

执行任务

TaskTracker 申请到任务以后就本地执行任务,过程是:

拷贝 job.spilt 到本地

拷贝 job.jar 到本地

在 job.xml 中写入 job 的配置信息

创建本地任务目录,解压 job.jar

发布任务

任务执行的步骤是:

配置任务的执行参数

在 Child 临时文件表中添加 map 任务信息

配置 log 文件夹,配置 map 任务的通信和输出参数

为 map 任务生成 MapRunnalbe,从 RecordReader 中接受数据,调用 Mapper 的 map 函数进行处理

将 map 函数的输出调用 collect 收集到 MapOutputBuffer 中,准备输入到 reduce 中

更新任务进度和状态

TaskTracker 每隔 5 秒将自己的任务执行状态发送给 JobTracker,所有的 TaskTracker 的统计信息最终都会发送到 JobTracker,产生全局作业进度统计信息

完成作业

最后一个 TaskTracker 完成任务并把任务状态信息发送给 JobTracker 以后,JobTracker就将任务标记为 成功 ,然后会产生一条成功的信息给用户,最后通过 runJob()返回,同时清空 JobTracker 的作业工作状态,并且通知 TaskTracker 清空作业状态(删除中间输出、临时文件等等)

shuffle 和排序

为了让 reduce 可以并行处理 map,必须要对 map 的输出进行一定的排序和分割,然后再交给 reduce ,这个过程就称为shuffle ,它是整个 MapReduce 的核心所在,shuffle 的性能直接影响到整个 MapReduce 的性能

shuffle 过程包含在 map 和 reduce 两端

在 map 端,shuffle 主要对 map 的结果进行 partition(划分)、sort(排序)、spill(分割),然后 merge(合并),写入磁盘,最后发送到对应的 reduce 端

在 reduce 端,shuffle 主要将各个 map 送来的属于同一个 partition 的输出进行 merge ,然后对 merge 的结果进行 sort,最后交给 reduce 进行处理

通常,对 shuffle 会进行相关的优化来提升性能(这里暂不讨论)

任务执行策略

  • 推测式执行
  • 任务 JVM 重用
  • 跳过坏记录

错误处理机制

  • 硬件故障

JobTracker 会出现单点故障(高版本的 Hadoop 已经解决这个问题)

当某一个TaskTracker(T1) 出现故障时,JobTracker 会将这个TaskTracker 从集合中移除,使另一个TaskTracker(T2) 来执行 T1 的任务

  • 任务失败

如果是代码缺陷导致任务失败,任务的 JVM 会主动退出,并且将错误信息写入 log 文件,并且标记为失败

如果是进程崩溃导致任务失败,TaskTracker 的监听进程会标记此次任务失败,然后杀死对应的进程

任务调度机制

主要有:

  • 先进先出调度器(First Input First Output)
  • 公平调度器(Fair Scheduler Guide)
  • 容量调度器(Capacity Scheduler Guide)

【参考资料:陆嘉恒《Hadoop 实战》】

刘凯宁

20150414

Share

发表评论

电子邮件地址不会被公开。 必填项已用 * 标注

*


*

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>