• Lab 1 实验 MapReduce


    👂 若月亮没来 (若是月亮还没来)(若是月亮还没来) - 王宇宙Leto/乔浚丞 - 单曲 - 网易云音乐

    目录

    🌼参考代码

    🐙解析 

    🐟mrsequential.go

    🐟mrapps/wc.go

    📕实验--准备

    🎂概念

    🐋思路梳理

    🦖注意要点 

    🐆初始代码--研读

    main/mrcoordinator.go

    main/mrworker.go

    mr/coordinator.go

    mr/worker.go

    mr/rpc.go

    🦈实验--开始

    🐋伪代码

    mr/coordinator.go

    mr/worker.go

    mr/rpc.go

    🐎结果


    🌼参考代码

    🐙解析 

     实验原文要求仔细研读两份代码,并在作业过程中大胆借鉴

    wc.go

    • MapReduce 的插件,实现了 Map 和 Reduce 两个函数
    • Map 函数接收输入文本的内容,分割成单词,并为每个单词生成一个键值对(键是单词,值是1)
    • Reduce 函数接收 Map 生成的所有键值对,统计每个单词出现次数,并返回这个次数

    mrsequential.go

    • MapReduce 的主体,协调 Map 和 Reduce 任务的执行
    • 检查命令行参数

      os.Args[0]  // 可执行文件
      os.Args[1]  // "wc.so"(插件)
      os.Args[2]  // "pg1.txt"(输入文件)
      os.Args[3]  // "pg2.txt"
    • 加载 wc.go,执行其中的 Map 和 Reduce 函数
    • 读取输入文件内容,对每个文件调用 Map 函数
    • Map 函数的输出按键 排序输出单词
    • 排序的键值对进行 Reduce(归约)
    • Reduce 函数的输出写入输出文件统计次数

    🐟mrsequential.go

    代码是单线程的,输入 --> Map --> sort --> Reduce --> 输出

    1. package main
    2. import (
    3. "fmt"
    4. "6.824/mr" // 引入MapReduce相关的数据结构和接口
    5. "plugin" // 用于动态加载插件
    6. "os" // 用于操作系统相关的功能,如命令行参数
    7. "log" // 用于日志记录
    8. "io/ioutil" // 用于I/O操作,如读取文件
    9. "sort" // 用于排序
    10. )
    11. // ByKey 是一个用于按键排序的切片类型
    12. type ByKey []mr.KeyValue
    13. // Len 实现了 sort.Interface 接口的 Len 方法
    14. func (a ByKey) Len() int { return len(a) }
    15. // Swap 实现了 sort.Interface 接口的 Swap 方法
    16. func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
    17. // Less 实现了 sort.Interface 接口的 Less 方法
    18. func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
    19. func main() {
    20. // 检查命令行参数数量
    21. if len(os.Args) < 3 {
    22. fmt.Fprintf(os.Stderr, "Usage: mrsequential xxx.so inputfiles...\n")
    23. os.Exit(1)
    24. }
    25. // 加载插件中的 Map 和 Reduce 函数
    26. mapf, reducef := loadPlugin(os.Args[1])
    27. // 用于存储Map阶段的中间输出
    28. intermediate := []mr.KeyValue{}
    29. // 遍历所有输入文件
    30. for _, filename := range os.Args[2:] {
    31. // 打开文件
    32. file, err := os.Open(filename)
    33. if err != nil {
    34. log.Fatalf("cannot open %v", filename)
    35. }
    36. // 读取文件全部内容
    37. content, err := ioutil.ReadAll(file)
    38. if err != nil {
    39. log.Fatalf("cannot read %v", filename)
    40. }
    41. file.Close()
    42. // 调用 Map 函数处理文件内容
    43. kv := mapf(filename, string(content))
    44. // 将Map结果添加到中间输出
    45. intermediate = append(intermediate, kv...)
    46. }
    47. // 对中间输出按键排序
    48. sort.Sort(ByKey(intermediate))
    49. // 创建输出文件
    50. oname := "mr-out-0"
    51. ofile, _ := os.Create(oname)
    52. // 调用 Reduce 函数并写入输出文件
    53. i := 0
    54. for i < len(intermediate) {
    55. j := i + 1
    56. for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
    57. j++
    58. }
    59. // 收集相同键的所有值
    60. values := []string{}
    61. for k := i; k < j; k++ {
    62. values = append(values, intermediate[k].Value)
    63. }
    64. // 调用 Reduce 函数
    65. output := reducef(intermediate[i].Key, values)
    66. // 按格式写入输出文件
    67. fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
    68. i = j
    69. }
    70. // 关闭输出文件
    71. ofile.Close()
    72. }
    73. // loadPlugin 从插件文件中加载 Map 和 Reduce 函数
    74. func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
    75. // 打开插件
    76. p, err := plugin.Open(filename)
    77. if err != nil {
    78. log.Fatalf("cannot load plugin %v", filename)
    79. }
    80. // 查找 Map 函数
    81. xmapf, err := p.Lookup("Map")
    82. if err != nil {
    83. log.Fatalf("cannot find Map in %v", filename)
    84. }
    85. mapf := xmapf.(func(string, string) []mr.KeyValue)
    86. // 查找 Reduce 函数
    87. xreducef, err := p.Lookup("Reduce")
    88. if err != nil {
    89. log.Fatalf("cannot find Reduce in %v", filename)
    90. }
    91. reducef := xreducef.(func(string, []string) string)
    92. // 返回 Map 和 Reduce 函数
    93. return mapf, reducef
    94. }

    🐟mrapps/wc.go

    Map 函数返回键值对切片,Reduce 函数将单词出现次数转化为字符串后返回

    1. // 定义包名为 main,这是一个插件,可以被 MapReduce 框架动态加载。
    2. package main
    3. // 导入 MapReduce 框架的包,用于实现 Map 和 Reduce 函数。
    4. import "6.824/mr"
    5. // 导入 unicode 包,用于判断字符是否为字母。
    6. import "unicode"
    7. // 导入 strings 包,用于字符串操作。
    8. import "strings"
    9. // 导入 strconv 包,用于字符串和基本数据类型之间的转换。
    10. import "strconv"
    11. // Map 函数是 MapReduce 框架中的第一个阶段,它将对输入文件的每一行调用一次。
    12. // 参数 filename 是输入文件的名称,contents 是文件的全部内容。
    13. // 这个函数返回一系列键值对,其中键是单词,值是 "1"。
    14. func Map(filename string, contents string) []mr.KeyValue {
    15. // FieldsFunc 函数将根据 ff 函数来分割字符串。
    16. // ff 函数是一个过滤函数,它返回 true 如果字符不是字母。
    17. ff := func(r rune) bool { return !unicode.IsLetter(r) }
    18. // 使用 FieldsFunc 函数根据 ff 函数分割 contents 字符串,得到单词数组。
    19. words := strings.FieldsFunc(contents, ff)
    20. // 初始化一个空的键值对切片,用于存储 Map 函数的输出。
    21. kv := []mr.KeyValue{}
    22. for _, w := range words {
    23. // 对于每个单词 w,创建一个键值对,键是单词本身,值是 "1"。
    24. kv = append(kv, mr.KeyValue{w, "1"})
    25. }
    26. // 返回包含所有单词和计数的键值对切片。
    27. return kv
    28. }
    29. // Reduce 函数是 MapReduce 框架中的第二个阶段,它对每个唯一的键调用一次。
    30. // 参数 key 是键,values 是所有映射任务为该键生成的值的列表。
    31. // 这个函数返回一个字符串,表示键出现的次数。
    32. func Reduce(key string, values []string) string {
    33. // 使用 strconv.Itoa 函数将 values 切片的长度(即 key 出现的次数)转换为字符串。
    34. return strconv.Itoa(len(values))
    35. }

    📕实验--准备

    🎂概念

    所谓“单机”,整个项目部署在一台机器上

    所谓“集群”,集群中的每一个节点就是一个单机,每个单机运行同一个的项目,通过设置“调度者”,用户请求先发送到“调度者”,再由“调度者”根据所有节点的负载情况,分配任务,即负载均衡

    从单机到集群,代码无需修改,只需多部署几台服务器

    所谓 “分布式”,类似流水线(只是将串行改成了并行),每条线负责不同的功能,最终将一个个小功能,整合成一个项目

    (也就是将原本部署在单机上的系统,拆分成一个个子系统,每个子系统都是独立的)

    这些子系统存在依赖关系,在网络中通过 rpc(remote procedure call) 通信

    🐋思路梳理

    wc.go 是一个实现了 Map 和 Reduce 函数的插件

    而 mrsequential.go 是 MapReduce 的顺序实现

    (可以理解为“单机”实现,一台机器,单个进程,顺序执行)

    我们要做的就是,将 mrsequential.go 拆分成 5 个文件,实现 MapReduce(词频统计) 的分布式部署 / 并行执行

    main(程序入口)

    • main/mrcoordinator.go  协调者初始化
    • main/mrworker.go  工作者初始化

    两个 main 文件不用修改,我们只需完成以下 3 个 mr/.... 文件即可 

    mr(具体实现)

    • mr/coordinator.go  实现协调者(监视 worker,分配任务,处理失败,重新分配)
    • mr/worker.go  实现工作者(请求任务,执行 Map,执行 Reduce,写入中间结果,写入最终结果)
    • mr/rpc.go  协调者 与 工作者 间的远程调用 (定义了通信接口和数据结构)

    🦖注意要点 

    •  修改 mr/ 下任何文件后,需要重新构建插件 wc.go,确保插件不依赖旧版本
    go build -race -buildmode=plugin ../mrapps/wc.go
    • 修改 mr/worker.go 中的 Worker() 函数,通过 RPC 请求 coordinator 分配任务
    • 中间文件命名 mr-X-Y(X 为 Map 任务编号,Y 为 Reduce 任务编号)
    • 使用 Go 的 encoding/json 包写入和读取 JSON 文件
    1. enc := json.NewEncoder(file) // json.Encoder实例,编码为 json 格式
    2. for _, kv := ... {
    3. err := enc.Encode(&kv)
    1. dec := json.NewDecoder(file) // json.Decoder 实例,解码为 json 格式
    2. for {
    3. var kv KeyValue
    4. if err := dec.Decode(&kv); err != nil {
    5. break
    6. }
    7. kva = append(kva, kv)
    8. }
    • 使用 mrapps/crash.go 插件测试崩溃恢复
    1. go build -race -buildmode=plugin crash.go // 编译插件文件
    2. go run -race mrcoordinator.go pg-*.txt // 根据输入文件,启动 MapReduce 作业
    3. go run -race mrworker.go crash.so // 运行 worker 进程,使用插件故意崩溃
    • 为防止崩溃时部分写入,用 ioutil.TempFile 创建临时文件,os.Rename 原子地重命名

    🐆初始代码--研读

    初始代码可以先抄一遍,理解一下,捋清楚思路后,再开始做 

    main/mrcoordinator.go

    创建协调者,通过命令行参数,传递输入文件给工作者,并在作业完成后退出程序

    1. // 程序入口点
    2. package main
    3. // 引入 MapReduce 包,包含协调者和工作者的实现
    4. import "6.824/mr"
    5. // time 包,暂停时间
    6. import "time"
    7. // 引入 os 包,读取命令行参数。
    8. import "os"
    9. // 格式化输入输出
    10. import "fmt"
    11. func main() {
    12. // 访问命令行参数,至少读取一个文件,第一个参数是程序名本身
    13. if len(os.Args) < 2 {
    14. // 如果参数数量小于2,打印到标准错误
    15. fmt.Fprintf(os.Stderr, "Usage: mrcoordinator inputfiles...\n")
    16. os.Exit(1)
    17. }
    18. // 创建协调者实例,除了程序名,剩下的参数作为输入文件传递
    19. m := mr.MakeCoordinator(os.Args[1:], 10) // 10 个工作者
    20. // 循环直到 MapReduce 作业完成
    21. for m.Done() == false { // m.Done() 检查 mr 作业是否完成
    22. // time.Sleep 暂停一秒
    23. time.Sleep(time.Second)
    24. }
    25. // 作业完成后,再等待一秒钟,可能是为了确保所有输出都已经写入
    26. time.Sleep(time.Second)
    27. }

    main/mrworker.go

    从命令行参数中,获取插件文件,并将插件文件中的 Map 和 Reduce 函数,转化为具体函数类型(便于后续调用)

    因为接口类型本身,不能直接被调用,需要转化为具体类型

    1. package main
    2. import (
    3. "6.824/mr"
    4. "plugin"
    5. "os"
    6. "fmt"
    7. "log"
    8. )
    9. // main 是程序的入口点,当程序启动时最先执行的函数
    10. func main() {
    11. // 参数1:程序名 参数2:插件文件路径
    12. if len(os.Args) != 2 {
    13. // 写入标准错误流
    14. fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so\n")
    15. // 终止程序,并返回状态码 1 表示错误
    16. os.Exit(1)
    17. }
    18. // 调用 loadPlugin 函数加载 Map 和 Reduce 函数
    19. // Map, Reduce 函数,都来自于插件文件
    20. // mapf 是 Map 函数,reducef 是 Reduce 函数
    21. mapf, reducef := loadPlugin(os.Args[1])
    22. // 调用 mr.Worker 启动 MapReduce 工作者进程,传入加载的 Map 和 Reduce 函数
    23. mr.Worker(mapf, reducef)
    24. }
    25. // loadPlugin 函数用于从插件文件中加载 Map 和 Reduce 函数
    26. // filename 插件文件的路径
    27. func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
    28. // 使用 plugin.Open 函数打开插件文件,返回插件对象实例 p 和可能发生的错误 err
    29. // p 包含 Map 和 Reduce 函数
    30. p, err := plugin.Open(filename)
    31. if err != nil {
    32. log.Fatalf("cannot load plugin: %v", filename)
    33. }
    34. // p.Lookup 方法查找插件中名为 "Map" 的导出符号,其实就是 Map 函数
    35. // xmapf 是一个 plugin.Symbol 类型的变量,用于存储从插件中查找到的 Map 函数符号
    36. // plugin.Symbol 是一个接口类型,代表插件中的任意导出符号
    37. xmapf, err := p.Lookup("Map")
    38. if err != nil {
    39. log.Fatalf("cannot find Map function in plugin: %v", filename)
    40. }
    41. // 类型断言用于确定 xmapf 中存储的具体函数类型
    42. // 并将其从 plugin.Symbol 接口类型断言回其静态的函数类型
    43. // xmapf.() 就是类型断言, 将 plugin.Symbol 转化为具体函数类型
    44. mapf := xmapf.(func(string, string) []mr.KeyValue)
    45. // 同上,查找并断言 Reduce 函数
    46. xreducef, err := p.Lookup("Reduce")
    47. if err != nil {
    48. log.Fatalf("cannot find Reduce function in plugin: %v", filename)
    49. }
    50. reducef := xreducef.(func(string, []string) string)
    51. // 返回加载并断言成功的 Map 和 Reduce 函数
    52. return mapf, reducef
    53. }

    mr/coordinator.go

    struct Coordinator:分配 MapReduce 任务到对应 worker

    Example():rpc 处理函数的例子

    server():启动 rpc 服务,监听来自 worker 的请求

    Done():检查 MapReduce 作业是否完成

    MakeCoordinator():创建并初始化 Coordinator 实例

    1. package mr
    2. import (
    3. "log"
    4. "net"
    5. "os"
    6. "net/rpc"
    7. "net/http"
    8. )
    9. // Coordinator 负责管理和分配任务
    10. type Coordinator struct {
    11. // Your definitions here.
    12. }
    13. // RPC handlers for the worker to call.
    14. // an example RPC handler.
    15. // the RPC argument and reply types are defined in rpc.go
    16. // rpc 调用的参数和返回值,在 rpc.go 中定义
    17. func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
    18. reply.Y = args.X + 1
    19. return nil // rpc 调用成功
    20. }
    21. // start a thread that listens for RPCs from worker.go
    22. func (c *Coordinator) server() {
    23. // 注册协调者实例,处理 RPC 调用
    24. rpc.Register(c)
    25. // 允许使用 HTTP 协议进行 RPC 通信
    26. rpc.HandleHTTP()
    27. // 协调者 socket 文件名
    28. sockname := coordinatorSock()
    29. // 监听前移除已存在的 socket 文件,避免监听失败
    30. os.Remove(sockname)
    31. // 监听 UNIX socket,准备接收来自 worker 的连接
    32. l, e := net.Listen("unix", sockname)
    33. if e != nil {
    34. log.Fatal("listen error:", e)
    35. }
    36. // 新的 goroutine 中启动 HTTP 服务,以处理 RPC 请求
    37. go http.Serve(l, nil)
    38. }
    39. // main/mrcoordinator.go 会定期调用 Done() 函数来检查整个作业是否已完成。
    40. func (c *Coordinator) Done() bool {
    41. ret := false
    42. // Your code here to implement the check for completion of all tasks
    43. // 在这里实现检查所有任务是否完成的逻辑,例如检查所有 Map 和 Reduce 任务的状态
    44. return ret // 作业是否完成
    45. }
    46. // create a Coordinator
    47. // main/mrcoordinator.go calls this function
    48. // nReduce is the number of reduce tasks to use.
    49. // The returned value is a pointer to the newly created Coordinator instance.
    50. func MakeCoordinator(files []string, nReduce int) *Coordinator {
    51. c := Coordinator{}
    52. // Your code here to initialize the Coordinator, e.g., load input files, setup tasks, etc
    53. // 启动 RPC 服务器线程,以便监听和处理来自 worker 的 RPC 请求
    54. c.server()
    55. // 返回指向新创建的协调者实例的指针,这样调用者就可以通过这个指针来访问和操作协调者实例
    56. return &c
    57. }

    mr/worker.go

    KeyValue 结构体

    ihash():返回 reduce 任务编号(用于发送 Map 输出的数据)

    Worker():调用插件中的 map() 和 reduce() 函数

    CallExample():展示 rpc 调用的完整流程,需要借助 call()

    call():建立 rpc 连接,再发送 rpc 请求

    1. // package mr - 定义了MapReduce作业的工作者包,包含实现MapReduce算法所需的结构和函数
    2. // import语句 - 日志记录、rpc远程过程调用、哈希计算
    3. package mr
    4. import (
    5. "fmt"
    6. "log"
    7. "net/rpc"
    8. "hash/fnv"
    9. )
    10. // 定义 MapReduce 中的键值对
    11. type KeyValue struct {
    12. Key string
    13. Value string
    14. }
    15. // 自定义的哈希函数,用于确定Map输出的键值对,应该发送到哪个Reduce任务
    16. // Map阶段输出的键分配到不同的Reduce任务
    17. func ihash(key string) int {
    18. h := fnv.New32a() // 创建FNV-1a哈希生成器
    19. // 字符串 key 转为 []byte 字节切片,因为 Wirte() 需要操作字节数据
    20. h.Write([]byte(key)) // 将键的字节序列写入哈希生成器
    21. // 使用按位与操作确保结果是一个非负整数,适合作为索引使用
    22. // 0x7fffffff 就是 0111 1111 ... 1111,符号位为正,其他不变
    23. return int(h.Sum32() & 0x7fffffff)
    24. }
    25. // Worker 函数 - 是MapReduce工作者的主要工作函数
    26. // 它调用用户提供的map和reduce函数
    27. // main/mrworker.go calls this function.
    28. // 传入的两个参数是 mapf() 和 reducef()
    29. func Worker(mapf func(string, string) []KeyValue,
    30. reducef func(string, []string) string) {
    31. // 工作者实现细节将在这里编写,包括从协调者接收任务和发送结果
    32. }
    33. // example function to show how to make an RPC call to the coordinator.
    34. func CallExample() {
    35. // {X: 99} 结构体字面量, X 初始化为 99
    36. args := ExampleArgs{X: 99} // rpc通信中传递的参数
    37. reply := ExampleReply{} // 用于存储响应的返回值
    38. // 发送RPC请求到协调者,等待回复
    39. // 服务名称.方法名称,rpc包会根据这个字符串,找到对应的服务和方法进行调用
    40. call("Coordinator.Example", &args, &reply)
    41. fmt.Printf("reply.Y %v\n", reply.Y)
    42. }
    43. // send an RPC request to the coordinator, wait for the response.
    44. func call(rpcname string, args interface{}, reply interface{}) bool {
    45. sockname := coordinatorSock() // 获取协调者socket名称
    46. c, err := rpc.DialHTTP("unix", sockname) // 建立RPC连接
    47. if err != nil {
    48. log.Fatal("dialing:", err)
    49. }
    50. defer c.Close()
    51. // Call 方法是 net/rpc 包中的 *rpc.Client 类型的一个实例方法
    52. err = c.Call(rpcname, args, reply) // 发送RPC请求
    53. if err == nil {
    54. return true
    55. }
    56. fmt.Println(err)
    57. return false
    58. }

    mr/rpc.go

    ExampleArgs 和 ExampleReply,表示 rpc 参数和 rpc 返回值两种类型

    coordinatorSock():为协调者生成 socket 文件名

    1. package mr
    2. // RPC definitions
    3. // remember to capitalize(大写) all names
    4. import "os" // 操作系统功能,获取用户ID
    5. import "strconv" // 字符串转换
    6. // example to show how to declare the arguments(参数)
    7. // and reply(返回值) for an RPC
    8. type ExampleArgs struct {
    9. X int
    10. }
    11. type ExampleReply struct {
    12. Y int
    13. }
    14. // Add your RPC definitions here
    15. // Cook up a unique-ish UNIX-domain socket name
    16. // in /var/tmp, for the coordinator
    17. // Can't use the current directory since
    18. // Athena AFS doesn't support UNIX-domain sockets.
    19. // 这里指定的是一个UNIX域socket的文件路径前缀,它位于/var/tmp目录下
    20. // 并且以"824-mr-"作为前缀,以确保socket文件名的唯一性
    21. // 用于获取协调者的socket文件名,以便建立RPC连接
    22. func coordinatorSock() string {
    23. // 定义UNIX域socket的基础路径,前缀为"/var/tmp/824-mr-"
    24. s := "/var/tmp/824-mr-"
    25. // 将当前用户的UID转换为字符串并追加到基础路径之后,创建一个唯一的socket文件名
    26. s += strconv.Itoa(os.Getuid())
    27. return s // 协调者监听的socket文件的路径
    28. }

    🦈实验--开始

    🐋伪代码

    mr/coordinator.go

    mr/worker.go

    .

    mr/rpc.go

    .

    🐎结果

  • 相关阅读:
    实验6、白盒测试:路径测试及测试用例设计
    CentOS7安装Oracle-19c
    Python 实验四 面向对象程序设计
    大数据培训课程MapTask工作机制
    计算模型参数量的方法
    nlp之加载电商评论集
    Yolo算法检测之Anchor Boxes原理详解
    “一篇长文教你进行全方位的使用appium“
    李沐动手学深度学习V2-使用Pytorch框架实现RNN循环神经网络
    【HTML】三种加载动画
  • 原文地址:https://blog.csdn.net/csdner250/article/details/140667936