MapReduce: 大型集群上的简化数据处理 [Paper]
OSDI’04
MapReduce 是一种用于处理和生成大型数据集的编程模型和相关实现 .
用户指定一个处理键值对以生成一组中间键值对的
m
a
p
map
map 函数, 以及一个合并与同一中间键关联的所有中间值的
r
e
d
u
c
e
reduce
reduce 函数.
特点: 自动并行化并在大型集群上执行. 运行时系统负责对输入数据进行分区、调度程序执行、处理机器故障以及管理所需的机器间通信等细节.
分布式计算的输入数据很大, 处理并行计算、分发数据和处理故障等分布式所具有的问题掩盖了原本问题的简单计算.
MapReduce 使用用户指定的 map(映射) 和 reduce(归约) 操作的函数模型进行并行化大型计算, 并将重新执行作为主要的容错机制.
文章贡献:
MapReduce 编程模型
单词计数

m
a
p
(
k
1
,
v
1
)
→
l
i
s
t
(
k
2
,
v
2
)
r
e
d
u
c
e
(
k
2
,
l
i
s
t
(
v
2
)
)
→
l
i
s
t
(
v
2
)
输入键值与输出键值来自不同的域, 中间键值与输出键值来自同一域. (以上述单词计数为例, 输入键值是文件名和文件内容, 中间键值是单词和对应的数量, 输出是单词的数量)
MapReduce 接口可根据环境有不同的实现.
通过自动将输入数据划分为
M
M
M 个分片(splits)的集合,
M
a
p
Map
Map 调用分布在多台机器上.
通过使用分区函数(如
h
a
s
h
(
k
e
y
)
m
o
d
R
hash(key)\ mod\ R
hash(key) mod R)将中间键空间划分为
R
R
R 个片段来分布
R
e
d
u
c
e
Reduce
Reduce 调用. 分区数(
R
R
R)和分区函数由用户指定.

MapReduce 调用返回到用户代码.成功完成 MapReduce 执行后, 执行的输出在 R R R 个输出文件中(每个 reduce 任务一个, 文件名由用户指定).
存储每个 map 任务和 reduce 任务的状态(idle、in-process 或 completed), 和 worker 的身份(对于非空闲任务).
对于每个完成的 map 任务,存储 map 任务产生的
R
R
R 个中间文件的位置和大小.
master 定期对每个工作节点进行 ping 操作. 若在一定时间内没有收到 worker 的响应, 则 master 将该 worker 标记为故障. 故障的 worker 上已完成的和进行中的 map 任务或进行中的 reduce 任务会被重置为空闲并有资格被重新调度.
已完成的 map 任务失败时需重新执行, 因为其输出存储在故障机器的本地磁盘上而无法访问. 已完成的 reduce 任务不需要重新执行, 因为其输出存储在全局文件系统中.
当有 map 任务执行失败时, 所有执行 reduce 任务的 worker 会收到重新执行的通知. 任何尚未从故障 map worker 读取数据的 reduce 任务将从重新执行该任务的 map worker 上读取数据.
MapReduce 考虑到只有一个 master 发生故障的概率较小, 因此选择 master 故障时中止 MapReduce 计算.
当用户提供的 map 和 reduce 运算符是其输入值的确定性函数时, MapReduce 产生的输出与整个程序的无故障顺序执行产生的输出相同.
依靠 map 和 reduce 任务输出的原子性提交(atomic commit)来实现这个属性.
当一个 reduce 任务完成时, reduce worker 自动将其临时输出文件重命名为最终输出文件. 依靠底层文件系统提供的原子重命名操作来保证最终的文件系统状态只包含一次 reduce 任务执行产生的数据.
当 map 和/或 reduce 是非确定性的运算符时, 特定 reduce 任务的输出等价于由非确定性程序在顺序执行下产生的任务输出(由于非确定性重新存在不同的执行顺序从而可能产生不同的任务输出).
MapReduce master 尝试在包含相应输入数据副本的机器上调度 map 任务, 若不能满足则会尝试在该任务输入数据的副本附近安排一个 map 任务.
大多数(map 任务的)输入数据都是在本地读取的, 且不消耗网络带宽.
map 阶段细分为
M
M
M 个片段, reduce 阶段细分为
R
R
R 个片段. 理想情况下
M
M
M 和
R
R
R 应远大于 worker 机器的数量.
实现中
M
M
M 和
R
R
R 的大小有实际限制, 因为 master 必须做出
O
(
M
+
R
)
O(M + R)
O(M+R) 的调度决策并在内存中保持
O
(
M
∗
R
)
O(M*R)
O(M∗R) 的状态.
R
R
R 经常受到用户的限制, 因为每个 reduce 任务的输出最终在一个单独的输出文件中.
当 MapReduce 操作接近完成时, master 会安排备份执行剩余的进行中的任务
用户可以指定 reduce 任务/输出文件的数量 ( R R R).
在给定的分区内, 中间键/值对以递增的键顺序处理
某些情况下每个 map 任务产生的中间键存在显着的重复, 用户指定一个可选的
C
o
m
b
i
n
e
r
Combiner
Combiner 函数在通过网络发送中间件数据之前对其进行部分合并.
C
o
m
b
i
n
e
r
Combiner
Combiner 函数在每台执行 map 任务的机器上执行. 通常与
r
e
d
u
c
e
reduce
reduce 函数代码相同, 区别在 combiner 函数的输出是写到中间文件中而非最终输出文件中.
用户可以通过提供简单 r e a d e r reader reader 接口实现来添加对新输入类型的支持
在某些情况下, 生成辅助文件作为 map 和/或 reduce 运算符的附加输出很方便.
通常, 应用程序会写入一个临时文件, 并在该文件完全生成后自动重命名该文件.
MapReduce 库检测导致确定性崩溃的记录并跳过这些记录以向前推进.
每个 worker 进程会有一个信号处理程序, 用于捕获段违规和总线错误. 用户代码生成一个信号时,信号处理程序会向 master 发送一个包含序列号的"last gasp" UDP 数据包.
当 master 在特定记录上看到多个故障时, 下一次重新执行相应的 Map 或 Reduce 任务时应跳过该记录
开发了 MapReduce 库的替代实现, 它在本地机器上按顺序执行 MapReduce 操作的所有工作, 以便于调试.
master 运行一个内部 HTTP 服务器并导出一组显示计算进度的状态页面供人类使用.
MapReduce 库提供了一个计数器(Counter)工具来计算各种事件的发生次数.
来自各个 worker 机器的计数器值会定期传播到 master. master从成功的 map 和 reduce 任务中聚合计数器值, 并在 MapReduce 操作完成时将它们返回给用户代码.
本文的核心在于提出了分布式计算的通用接口 MapReduce. 同时利用了重复执行实现容错, 利用局部性优化减少网络通信, 通过备份加速计算等.
MapReduce 在 map 任务后的 shuffle(洗牌) 操作将中间键值按键名传输到相应的 reduce worker, 这一过程必然会导致网络通信, 且传输数据总量较大, 一定程度上是 MapReduce 的性能瓶颈.
MapReduce 整体看来扩展性良好且易于编程(分布式编程的细节被隐藏), 但不是最高效和灵活的(Ref: MIT 6.824 Lec1 Note)