博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce阶段源码分析以及shuffle过程详解
阅读量:6034 次
发布时间:2019-06-20

本文共 2335 字,大约阅读时间需要 7 分钟。

MapReducer工作流程图:

MapReduce阶段源码分析以及shuffle过程详解

1. MapReduce阶段源码分析

1)客户端提交源码分析

MapReduce阶段源码分析以及shuffle过程详解

解释
   - 判断是否打印日志
   - 判断是否使用新的API,检查连接
   - 在检查连接时,检查输入输出路径,计算切片,将jar、配置文件复制到HDFS
   - 计算切片时,计算最小切片数(默认为1,可自定义)和最大切片数(默认是long的最大值,可以自定义)
   - 查看给定的是否是文件,如果是否目录计算目录下所有文件的切片
   - 通过block大小和最小切片数、最大切片数计算出切片大小
   - 过切片大小,计算出map的数量以及分发到的节点
   - 提交job给yarn,进行MapReduce计算

2)map阶段源码分析源码分析(Map 的input阶段)

MapReduce阶段源码分析以及shuffle过程详解

解释
   - 首先Map Task任务,调用run()方法,run()方法会经过以下几个阶段
   - 初始化taskcontext对象
   - 对mapper对象的初始化,此处包括一个默认值的判断,如果没有自定义mapper类,默认用系统的Mapper
   - 对文件输入的格式化,此处包括一个默认值的判断,如果没有自定义inputFormat类,默认用系统的TextinputFormat
   - 创建input对象,创建具体的文件读取类,通过lineReader(),默认每次迭代读取一行,此处实现一个迭代的判断的nextKeyVaule(),并在nextKeyVaule实现时初始化key和value
   - Input初始化:计算打开位置,读取文件内容,(放弃第一行)
   - 调用mapper的run方法循环读取,直到末尾,多读一行,start放弃第一行的数据被上一个切片读到,注意这里的run方法中就会调用我们编写的Mapper类中的setup、map、cleanup方法

3)map阶段源码分析源码分析(Map 的output阶段)

MapReduce阶段源码分析以及shuffle过程详解

解释
   - 由newOutCollector创建output对象
   - newOutCollector中需要准备collector和partitions计算reduce数量,会将map端输出的K,V,P(分区号)写入collector中
   - 在准备collector实际上是准备MpaOutputBuffer,这是一特别复杂的过程,这里向大致的解释一下,就是先将收集的KV,P写入一个环形的缓冲区,然后在经过排序和分区将数据写入到文件中。(具体过程会在下面的shuffle中讲解)
   - 最后mapOut结束之后,会调用close方法关闭output,在关闭时,会将剩余在buffer环的数据缓冲出去,并且将所有一些的小文件进行排序然后合并成一个大文件。

2. shuffle过程详解

MapReduce阶段源码分析以及shuffle过程详解

过程介绍

  • 假如在hdfs中存储一个300M文件,每个block的大小默认为128M,而且默认的切片大小也是128M,因此,每一个MapTask任务会处理一个split,则是有三个MapTask并行处理。
  • 每一个MapTask任务处理完成后,会通过收集器,将输出的结果存入一个环形缓冲区中,写入的过程会经过简单的排序,这个环形缓冲区的默认是100M,当环形缓冲区的大小使用超过80%,一个后台线程就会启动把环形缓冲区中的数据写入到磁盘文件,同时Map会继续向环形缓冲区中写入数据。
  • 环形缓冲去的工作原理:
    • 环形缓冲区的大小默认为100M(可以配置mapred-site.xml:mapreduce.task.io.sort.mb)
    • 环形缓冲区的阈值为:80%((mapred-site.xml:mapreduce.map.sort.spill.percent,默认80%)
    • 在环形缓冲区中,存储了两种数据,一个是元数据:分区号,map的key的起始位置,map的value的起始位置,map的value的长度(每一个元数据长度为4个int长度,长度固定)
    • 一种是原始数据:存放map的key和value
    • 在存储原始数据和元数据的时候,会将元数据和原始数据中间建立一个赤道,分割二者,然后不断的向两端写入数据,在环形缓冲区的数据写入到80%的时候,将这些数据锁定,然后向硬盘中溢写成小文件,同时环形缓冲区的剩下的部分仍然可以写数据,直到溢写结束,锁定释放,继续可以将元数据和原始数据写入缓冲区中。
  • 缓冲区溢写小文件:在溢写小文件的时候,会对缓冲区中的元数据根据分区号和key进行排序,然后根据排序好的元数据,溢写相应的原始数据(这是因为元数据的大小是固定的,比直接排序原始数据更容易),这样最后就会溢写出多个已经根据分区和key排序好的小文件(这里可以加入conbiner)
  • 对溢写后的小文件进行归并:此时会将溢写后的小文件进行归并成一个大文件(使用归并排序),此时合并的大文件已经按照分区和key排好序,
  • reduce拉取相应的数据:Reducer 中的一个线程定期向MRAppMaster询问Mapper输出结果文件位置,mapper结束后会向MRAppMaster汇报信息,从而 Reducer 得知 Mapper 状态,得到 map 结果文件目录;reduce会相应的拉取相同分区的小文件到本地
  • 然后会将拉取得到的相应的相同分区的小文件,进行归并排序合并成为一个有序的大文件(相同的key在一起)。
  • 然后根据分组规则,相同的key为一组调用一次reduce方法,处理数据
  • 最终将结果数据根据分区写入到不同的分区文件中。

转载于:https://blog.51cto.com/14048416/2342168

你可能感兴趣的文章
32位系统和64位系统的选择
查看>>
01配置管理过程指南
查看>>
jstl格式化时间
查看>>
一则关于运算符的小例
查看>>
centos7 ambari2.6.1.5+hdp2.6.4.0 大数据集群安装部署
查看>>
cronexpression 详解
查看>>
一周小程序学习 第1天
查看>>
小孩的linux
查看>>
SpringMVC、MyBatis声明式事务管理
查看>>
开发者详解:端游及手游服务端的常用架构
查看>>
JavaScript History对象
查看>>
在 Windows 下安装 Oracle 11g XE (Express Edition)
查看>>
ListView优化
查看>>
【原创】 PostgreSQL 实现MySQL 的auto_increment 字段
查看>>
vs2015添加vc助手
查看>>
检测点1.1
查看>>
android--------阿里 AndFix 热修复
查看>>
java springcloud版b2b2c社交电商spring cloud分布式微服务 (七)高可用的分布式配置中心(Spring Cloud Config)...
查看>>
Oozie与Coordinator调度讲解及系统时区配置与定时触发两种配置方式
查看>>
RGB_YUV_YCbCr
查看>>