MapReduce 工作流程
以示例程序
wordcount
为例
Map
InputFormat
InputFormat
会将我们指定的输入路径中的文件按照block
(默认 128M)逻辑切分成若干切片(split
,如果文件不足 128M 则单独为一个切片,如果满了128M但是不满128M*1.1也单独为一个切片),然后交给RecordReader
进行处理,产出若干key/value record
RecordReader
产出的key/value record
会暂存在内存中的一块环形缓冲区中(逻辑上成环形),写入record
时会从环形上的两个位置写入,一个位置写入record
,一个位置写入record
的索引inde
,这样做的好处是:要想在环上找到一个record
不用遍历数据量较大的record
序列,而只用遍历数据量较小的index
列表。
Shuffle
Ring Buffer & Excessive Writing & Combine
MapTask
会根据输入的大数据源源不断的产出record
,而环形缓冲的大小是有限的(假设是100M,此参数可配置),当环形缓冲的占用量达到80%
(此参数可配置)时,就会对这80%
的record
进行一个全排序(准确的说是二次排序,先按照partition
有序(见Partitioner
),再按照record
的key
有序),如果你设置了CombinerClass
那么同时会对record
进行一个合并,最后写入磁盘(此过程称为溢写),形成一个首先分区号有序其次key
有序的record
序列;而剩下的20%
则继续迎接后续写入的record
。
Merge Sort
这样输入的数据集就会分批次写入到硬盘中,形成多个批次内有序的record
序列,然后再从硬盘中逐批读出这些序列进行一个归并排序(归并的过程中又可以应用Combiner
做一个合并处理),最终产出该MapTask
对应的分区号有序、同一分区内record.key
有序的record
序列,即将流入Reducer
Reducer
setNumReduceTask
MapTask
的数量是由切片规则来决定的,输入的数据***被切成多少片就会有多少个MapTask
,每个MapTask
都会产出一个分区号有序的record
序列,而ReducerTask
是在Driver
中通过setNumReducerTask
手动指定的,一般会和Partitioner
返回的分区号(返回0则会由ReduceTask1
处理并产出到part-r-0000
)类别数保持一致。
ReduceTask
ReduceTask
会从所有的MapTask
的产出中抓取出分区号和自己对应的record
过来,例如上图中ReduceTask1
会分别抓取MapTask1
和MapTask2
产出的record
序列中分区号为0
的部分,进行一个归并排序(过程中使用GroupingComparator
进行分组,结果对应Reducer#reduce
方法入参的Iterable values
)并将结果序列中的元素(Object key,Iterable values
)逐个交给Reducer#reduce
进行处理,可以通过context.write
写入到output
对应分区号的part-r-000x
中。