• Go语言进阶,详解并发中的通道机制


    对于Go语言的并发,前面有几篇文章都有涉及到,如下:

    Go语言并发比较二叉树(Binary Tree)icon-default.png?t=M85Bhttps://blog.csdn.net/weixin_41896770/article/details/127569147
    Go语言进阶,闭包、指针、并发icon-default.png?t=M85Bhttps://blog.csdn.net/weixin_41896770/article/details/127547900但都只是一个示例,没有具体涉及到其中存在的一些问题,比如说很容易出现死锁等情况,这里重点讲解如何使用通道以及遇到的问题的处理方法。
    先看个简单的创建通道,然后发送内容的代码:

    1. package main
    2. import "fmt"
    3. func main() {
    4. ch := make(chan string)
    5. ch <- "你好,寅恪光潜"
    6. fmt.Println(<-ch)
    7. }

    看起来是没有问题,而且语法都是合规的,但是运行时会报错:

    fatal error: all goroutines are asleep - deadlock!

    这是比较常见的错误,也就是发生了死锁,信息显示goroutine没有启动,是睡眠的状态,当然这里没有使用到goroutine机制,那么换个角度来理解,这个通道虽然是申请了,由于是无缓冲通道,所以如果没有接收者的话,你发送到通道就会直接流出去(看成是水管),这样就没啥意义。

    缓冲通道

    那么可以定义为有长度的通道,这样的话就相当于创建一种有缓冲机制的通道,让通道可以暂存数据,如下:

    1. func main() {
    2. ch := make(chan string, 1)//这里使其成为有缓冲的通道
    3. ch <- "你好,寅恪光潜"
    4. fmt.Println(<-ch)
    5. }
    6. //你好,寅恪光潜

    goroutine机制

    使用goroutine机制,创建一个接收通道去接收发送通道发送过来的值,如下:

    1. func receiver(ch chan string) {
    2. r := <-ch
    3. fmt.Println("这里就接收到了发送来的值:", r)
    4. }
    5. func main() {
    6. ch := make(chan string)
    7. go receiver(ch)
    8. ch <- "你好,寅恪光潜"
    9. fmt.Println("发送成功")
    10. }
    11. //这里就接收到了发送来的值: 你好,寅恪光潜
    12. //发送成功

    如果是多个goroutine并发的话将如何处理?我们将内容先全部发送到通道,然后再读取出来:

    1. package main
    2. import (
    3. "fmt"
    4. "time"
    5. )
    6. func f(ch chan<- string, i int) {
    7. time.Sleep(time.Duration(i) * time.Second)
    8. s := fmt.Sprintf("寅恪光潜%d", i)
    9. ch <- s
    10. }
    11. func main() {
    12. ch := make(chan string, 10)
    13. fmt.Println(ch, len(ch))
    14. for i := 0; i < 4; i++ {
    15. go f(ch, i)
    16. }
    17. for ret := range ch {
    18. fmt.Println(ret)
    19. }
    20. }

    看起来好像是没有问题,启动goroutine发送内容到通道,然后遍历读取出通道的内容。不过在遍历完之后会报错:

    0xc000062180 0
    寅恪光潜0
    寅恪光潜1
    寅恪光潜2
    寅恪光潜3
    fatal error: all goroutines are asleep - deadlock!

    错误原因是,读取完之后通道是空的,然后再读取(接收)的时候就会发生阻塞死锁。

    sync.WaitGroup等待组

    这个时候我们使用sync.WaitGroup等待组,这个等待组的作用可以保证在并发环境中完成指定数量的任务。里面有几个方法的用法说明如下:
    Add(1) 计数器+1 ; Done() 计数器-1或者 Add(-1)也可以 ;Wait() 等待计数器一直减到0,比如上面是循环4次,4个任务,我们将修改为:

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "time"
    6. )
    7. var wg sync.WaitGroup
    8. func f(ch chan<- string, i int) {
    9. time.Sleep(time.Duration(i) * time.Second)
    10. s := fmt.Sprintf("寅恪光潜%d", i)
    11. ch <- s
    12. defer wg.Done() //完成一个任务,计数器就减1
    13. }
    14. func main() {
    15. ch := make(chan string, 10)
    16. go func() {
    17. wg.Wait() //一直等待直到完成
    18. close(ch) //记得关闭通道
    19. }()
    20. for i := 0; i < 4; i++ {
    21. wg.Add(1) //每个任务开始,计数器就就加1
    22. go f(ch, i)
    23. }
    24. for ret := range ch {
    25. fmt.Println(ret)
    26. }
    27. }
    28. /*
    29. 寅恪光潜0
    30. 寅恪光潜1
    31. 寅恪光潜2
    32. 寅恪光潜3
    33. */

    select多路复用

    除了上面的等待组外,还有一种常用的方法,使用select来同步并发,select会一直等待,直到某个case的通信操作完成时,执行case分支对应的语句,这个看起来更简洁一点,如下:

    1. func f(ch chan<- string, i int) {
    2. time.Sleep(time.Duration(i) * time.Second)
    3. s := fmt.Sprintf("寅恪光潜%d", i)
    4. ch <- s
    5. }
    6. func main() {
    7. ch := make(chan string, 10)
    8. for i := 0; i < 4; i++ {
    9. go f(ch, i)
    10. }
    11. for {
    12. select {
    13. case info := <-ch:
    14. println(info)
    15. default:
    16. time.Sleep(time.Second)
    17. fmt.Println("无数据")
    18. }
    19. }
    20. }
    21. /*
    22. 无数据
    23. 寅恪光潜0
    24. 寅恪光潜1
    25. 无数据
    26. 无数据
    27. 寅恪光潜2
    28. 寅恪光潜3
    29. 无数据
    30. 无数据
    31. ...
    32. */

    然后会一直这样下去,这种情况是没有关闭通道,一直循环着,比如做聊天室这种一直监听的情况。

    单向通道

    上面介绍是双向通道,可以发送也可以接收,当然你也可以定义只接收或者只发送的单通道。

    定义接收通道
    type Receiver <-chan int
    定义发送通道
    type Sender chan<- int 

    对于这个符号在左边还是右边,我个人是这样去理解记忆,<-chan表示从通道流出来,那就是需要拿东西接着,就是接收通道;chan<-表示指向通道,那就是往通道发送内容,就是发送通道。

    1. var ch = make(chan int, 3) //带缓冲的通道
    2. var sender Sender = ch //发送通道
    3. var receiver Receiver = ch //接收通道 

    看个示例:

    1. package main
    2. import (
    3. "fmt"
    4. "time"
    5. )
    6. type Sender chan<- int //发送通道
    7. type Receiver <-chan int //接收通道
    8. func main() {
    9. var ch = make(chan int, 0) //无缓冲的通道
    10. var num int = 110
    11. go func() {
    12. var sender Sender = ch
    13. sender <- num //往发送通道发送数据
    14. fmt.Println("发送的数据:", num)
    15. }()
    16. go func() {
    17. var receiver Receiver = ch //接收通道里的数据
    18. fmt.Println("接收的数据:", <-receiver)
    19. }()
    20. //让main函数执行结束的时间延迟1秒
    21. time.Sleep(time.Second)
    22. }
    23. /*
    24. 接收的数据: 110
    25. 发送的数据: 110
    26. */
  • 相关阅读:
    Springboot RabbitMq源码解析之RabbitListener注解 (四)
    简化开发流程,消除重复任务:refine 帮您轻松搞定 | 开源日报 No.63
    全域智慧采摘无人机系统探索
    设计模式初版讲解
    Vue3中 子组件内v-model绑定props接收到参数时报错update:modelValue
    专注写作,快速上线:Cpolar+Inis帮助你在Ubuntu上建立博客网站
    【Python脚本进阶】2.1、端口扫描器(上):TCP全连接扫描
    mysql创建用户
    MyBatis和MyBatis-plus教程
    如何确保ChatGPT的文本生成对特定行业术语的正确使用?
  • 原文地址:https://blog.csdn.net/weixin_41896770/article/details/127748316