• Golang 并发 Channel的用法


    Golang 并发 Channel的用法

    1. channel 的创建

    ch := make(chan int)
    
    • 1

    上面是创建了无缓冲的 channel,一旦有 goroutine 往 channel 发送数据,那么当前的 goroutine 会被阻塞住,直到有其他的 goroutine 消费了 channel 里的数据,才能继续运行。

    ch := make(chan int, 2)
    
    • 1

    上面示例中的第二个参数表示 channel 可缓冲数据的容量。只要当前 channel 里的元素总数不大于这个可缓冲容量,则当前的 goroutine 就不会被阻塞住。

    2. nil channel

    nil是pointers, interfaces, maps, slices, channels 和 function 类型的零值,表示未初始化值。nil不是未定义状态,它本身就是值。error是接口类型,因此error变量可以为nil,但string不能为nil。

    下面我们看下nil 通道有什么特点,空通道对操作的反应如下:

    • 从空通道读、写会永远阻塞
    • 关闭通道会终止程序(panic)

    空通道是一种特殊通道,总是阻塞。对比非空已关闭的通道仍然可以进行读取,并能够读取对应类型的零值,但对于已关闭的通道发送信息会终止程序。

    一般 nil channel 用在 select 上,让 select 不再从这个 channel 里读取数据

    读写阻塞示例

    示例如下:

    func TestNil(t *testing.T) {
    	c := make(chan int)
    
    	go sendIntegers(c)
    	addIntegers(c)
    }
    
    func addIntegers(c chan int) {
    	sum := 0
    	t := time.NewTimer(time.Second * 5)
    	for {
    		select {
    		case input := <-c:
    			sum = sum + input
    			fmt.Println("addIntegers , input : " + strconv.Itoa(input) + " , sum : " + strconv.Itoa(sum))
    		case <-t.C:
    			c = nil
    			fmt.Println("addIntegers , nil channel , sum : " + strconv.Itoa(sum))
    		}
    	}
    }
    
    func sendIntegers(c chan int) {
    	for {
    		time.Sleep(time.Second * 1)
    		c <- rand.Intn(100)
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    输出如下

    === RUN   TestNil
    addIntegers , input : 81 , sum : 81
    addIntegers , input : 87 , sum : 168
    addIntegers , input : 47 , sum : 215
    addIntegers , input : 59 , sum : 274
    addIntegers , nil channel , sum : 274
    panic: test timed out after 30s
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    此示例会一直阻塞下去,addIntegers是程序的主协程会一直阻塞下去,sendIntegers是子协程同样会一直阻塞下去。

    其中:输出中的panic是单元测试的Test引发的异常,不需要考虑在内。

    close示例

    func TestCloseNil(t *testing.T) {
    	c := make(chan int)
    	go writeChannel(c)
    	num := <-c
    	fmt.Println("main goroutine , read num : " + strconv.Itoa(num))
    	c = nil
    	fmt.Println("main goroutine , to close channel .")
    	close(c)
    	time.Sleep(time.Second * 10)
    
    }
    
    func writeChannel(c chan int) {
    	fmt.Println("writeChannel goroutine ,  running ...")
    	c <- 1
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    输出如下

    === RUN   TestCloseNil
    writeChannel goroutine ,  running ...
    main goroutine , read num : 1
    main goroutine , to close channel .
    --- FAIL: TestCloseNil (0.00s)
    panic: close of nil channel [recovered]
    	panic: close of nil channel
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    关闭nil通道会引起程序panic

    3. channel 的读写

    写操作

    ch := make(chan int)
    ch <- 1
    
    • 1
    • 2

    读操作

    data <- ch
    
    • 1

    当我们不再使用 channel 的时候,可以对其进行关闭:

     close(ch)
    
    • 1

    如果 channel 关闭,继续读取关闭后的 channel,不会产生 pannic,还是可以读到数据,将得到零值,即对应类型的默认值。

    为了能知道当前 channel 是否被关闭,可以使用下面的写法来判断。

    • 当channel关闭时,ok = false
    • 当channel未关闭时,ok = true
     if v, ok := <-ch; !ok {
      fmt.Println("channel 已关闭,读取不到数据")
     }
    
    • 1
    • 2
    • 3

    另一种写法可以使用 for-range ,使用下面的写法不断的获取 channel 里的数据,直到channel关闭,跳出循环,执行后面的代码。

     for data := range ch {
      // get data dosomething
     }
    
    • 1
    • 2
    • 3

    4. channel 只读只写

    在默认情况下,管道是双向的,可读可写,在使用 channel 时我们还可以控制 channel 只读只写操作:

    声明为只写,如下:

    var chan2 chan<- int
    chan2 = make(chan int, 3)
    chan2 <- 20
    
    • 1
    • 2
    • 3

    如果试着读此chan,则编译报错,编译错误如下:

    invalid operation: cannot receive from send-only channel chan2 (variable of type chan<- int) compiler (InvalidReceive)
    
    • 1

    声明为只读,不可写,否则编译报错,如下:

    var chan3 <-chan int
    nm2 := <-chan3
    
    • 1
    • 2

    函数可以声明chan只读只写,代码示例:

    // 只写操作
    func send(ch chan<- int, exitChan chan struct{}) {
    	for i := 0; i < 5; i++ {
    		time.Sleep(time.Second * 1)
    		ch <- i
    	}
    	close(ch)
    	var a struct{}
    	exitChan <- a
    }
    
    // 只读操作
    func recv(ch <-chan int, exitChan chan struct{}) {
    	for {
    		v, ok := <-ch
    		if !ok {
    			break
    		}
    		fmt.Println("recv goroutine , value : " + strconv.Itoa(v))
    	}
    	var a struct{}
    	exitChan <- a
    }
    func TestOnlyReadWrite(t *testing.T) {
    	ch := make(chan int, 10)
    	exitChan := make(chan struct{}, 2)
    	go send(ch, exitChan)
    	go recv(ch, exitChan)
    	var total = 0
    	for _ = range exitChan {
    		total++
    		if total == 2 {
    			break
    		}
    	}
    	fmt.Println("main goroutine , 结束")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    输出如下:

    === RUN   TestOnlyReadWrite
    recv goroutine , value : 0
    recv goroutine , value : 1
    recv goroutine , value : 2
    recv goroutine , value : 3
    recv goroutine , value : 4
    main goroutine , 结束
    --- PASS: TestOnlyReadWrite (5.03s)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    5. 关闭channel

    channel关闭后,剩余的数据能否取到

    golang channel关闭后,其中剩余的数据,是可以继续读取的,channel关闭之后,仍然可以从channel中读取剩余的数据,直到数据全部读取完成。

    对于关闭的channel的读写需要注意两点:

    • 如果继续向channel发送数据,会引起panic,
    • 如果继续读数据,得到的是零值(对于int,就是0)。

    读取关闭的channel,将获取零值

    当读取已关闭的channel时,如果继续读取channel,获取到的是零值,不会堵塞,

    另外即使是无缓冲的channel,也将能一直获取到零值。

    代码示例如下

    func TestCloseDemo01(t *testing.T) {
    
    	done := make(chan struct{})
    	ch := make(chan int, 3)
    	ch <- 1
    	ch <- 2
    	ch <- 3
    	close(ch)
    
    	go func() {
    		for {
    			value := <-ch
    			//此处为假设判断,value永远不会等于10
    			if value == 10 {
    				break
    			}
    			fmt.Println("read channel , value : ", value)
    			time.Sleep(time.Second * 1)
    		}
    		done <- struct{}{}
    	}()
    
    	select {
    	case <-done:
    		fmt.Println("读取channel,正常结束")
    	case <-time.After(time.Second * 5):
    		fmt.Println("超时退出")
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    输出如下:

    === RUN   TestCloseDemo01
    read channel , value :  1
    read channel , value :  2
    read channel , value :  3
    read channel , value :  0
    read channel , value :  0
    超时退出
    --- PASS: TestCloseDemo01 (5.00s)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    使用ok判断,是否关闭

    读取channel,判断是否关闭:

    value, ok := <-ch
    
    • 1
    • 当channel关闭时,ok=false
    • 当channel未关闭时,ok=true

    通过判断channel是否关闭,当channel关闭时,程序可以正常退出,代码示例如下:

    func TestCloseDemo02(t *testing.T) {
    
    	done := make(chan struct{})
    	ch := make(chan int, 3)
    	ch <- 1
    	ch <- 2
    	ch <- 3
    	close(ch)
    
    	go func() {
    		for {
    			value, ok := <-ch
    			if !ok {
    				break
    			}
    			fmt.Println("read channel , value : ", value)
    			time.Sleep(time.Second * 1)
    		}
    		done <- struct{}{}
    	}()
    
    	select {
    	case <-done:
    		fmt.Println("读取channel,正常结束")
    	case <-time.After(time.Second * 5):
    		fmt.Println("超时退出")
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    输出如下:

    === RUN   TestCloseDemo02
    read channel , value :  1
    read channel , value :  2
    read channel , value :  3
    读取channel,正常结束
    --- PASS: TestCloseDemo02 (3.03s)
    PASS
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    使用for-range退出

    for-range是使用频率很高的结构,常用它来遍历数据,range能够感知channel的关闭,当channel被发送数据的协程关闭时,range就会结束,接着退出for循环。

    它在并发中的使用场景是:当协程只从1个channel读取数据,然后进行处理,处理后协程退出。

    下面这个示例程序,当通道被关闭时,协程可自动退出。

    func TestCloseDemo02(t *testing.T) {
    	ch := make(chan int, 3)
    	ch <- 1
    	ch <- 2
    	ch <- 3
    	close(ch)
    	for v := range ch {
    		fmt.Println("value", v)
    	}
    	time.Sleep(time.Second * 10)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    使用close(ch)关闭所有下游协程

    • 关闭通道,可以主动通知所有协程退出的场景

    当启动100个worker时,只要main()执行关闭stopCh,每一个worker都会都到信号,进而关闭。如果main()向stopCh发送100个数据,这种就低效了。

    //close关闭所有子协程
    func TestCloseDemo04(t *testing.T) {
    
    	ch := make(chan int, 3)
    	stopCh := make(chan struct{})
    
    	for i := 1; i < 6; i++ {
    		worker("worker"+strconv.Itoa(i), stopCh, ch)
    	}
    
    	time.Sleep(time.Second * 5)
    	close(stopCh)
    
    	time.Sleep(time.Second * 5)
    }
    
    
    func worker(workerName string, stopCh <-chan struct{}, ch <-chan int) {
    	go func() {
    		defer fmt.Println(workerName, "goroutine , worker exit")
    		// Using stop channel explicit exit
    		for {
    			select {
    			case <-stopCh:
    				fmt.Println(workerName, "goroutine , Recv stop signal , return")
    				return
    			default:
    				fmt.Println(workerName, "goroutine , worker default ...")
    			}
    			time.Sleep(time.Second * 3)
    		}
    	}()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    输出如下

    === RUN   TestCloseDemo04
    worker5 goroutine , worker default ...
    worker3 goroutine , worker default ...
    worker4 goroutine , worker default ...
    worker1 goroutine , worker default ...
    worker2 goroutine , worker default ...
    worker3 goroutine , worker default ...
    worker2 goroutine , worker default ...
    worker5 goroutine , worker default ...
    worker4 goroutine , worker default ...
    worker1 goroutine , worker default ...
    worker4 goroutine , Recv stop signal , return
    worker4 goroutine , worker exit
    worker2 goroutine , Recv stop signal , return
    worker2 goroutine , worker exit
    worker5 goroutine , Recv stop signal , return
    worker5 goroutine , worker exit
    worker1 goroutine , Recv stop signal , return
    worker1 goroutine , worker exit
    worker3 goroutine , Recv stop signal , return
    worker3 goroutine , worker exit
    --- PASS: TestCloseDemo04 (10.01s)
    PASS
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    6. 当函数传递channel时,是传递的引用,还是值

    golang 传递给函数chan类型时,是值传递和引用传递?

    • golang默认都是采用值传递,即拷贝传递
    • 有些值天生就是指针(slice、map、channel)

    可以看出来map和slice都是指针传递,即函数内部是可以改变参数的值的。而array是数组传递,不管函数内部如何改变参数,都是改变的拷贝值,并未对原值进行处理。

    在 Go 语言中,所有的函数参数传递都是值传递(pass by value),当将参数传递给函数时,实际上是将参数的副本传递给函数。然而,这并不意味着在函数内部对参数的修改都不会影响原始数据。因为在 Go 中,有些数据类型本身就是引用类型,比如切片(slice)、映射(map)、通道(channel)、接口(interface)和指针(pointer)。当这些类型作为参数传递给函数时,虽然传递的是值,但值本身就是一个引用。

    小结
    Go 语言中的参数传递总是值传递,意味着传递的总是变量的副本,无论是基本数据类型还是复合数据类型。由于复合数据类型(如切片、映射、通道、接口和指针)内部包含的是对数据的引用,所以在函数内部对这些参数的修改可能会影响到原始数据。理解这一点对于编写正确和高效的Go代码至关重要。

    另外即使是引用类型,比如切片,当长度或容量(比如使用 append 函数)发生变化了,可能会导致分配新的底层数组。这种情况下,原始切片不会指向新的数组,但是函数内部的切片会。因此,如果想在函数内部修改切片的长度或容量并反映到外部,应该传递一个指向切片的指针。

    参考

    • https://www.cnblogs.com/-wenli/p/12350181.html
    • https://segmentfault.com/a/1190000017958702
    • https://zhuanlan.zhihu.com/p/395278270
    • https://zhuanlan.zhihu.com/p/613771870
    • Go里面如何实现广播 https://juejin.cn/post/6844903857395335182
  • 相关阅读:
    Linux信号详解
    2022 CCF BDCI 小样本数据分类任务 baseline
    Hadoop集群搭建之Hadoop组件安装
    Python 添加记录到有自增长ID的表,如何获取新产生的ID
    <MySQL> 什么是数据库事务?事务该如何使用?
    Linux 业务突然宕机、系统卡死、磁盘空间爆满,该怎么查?
    使用flutter的Scaffold脚手架开发一个最简单的带tabbar的app模板
    vue3+vite引入图片不能再用require,要使用new Url(完整方法步骤)
    Redis入门:Redis持久化策略RDB&AOF简介
    http 请求 Cros 跨域问题记录(转)
  • 原文地址:https://blog.csdn.net/qq_26857259/article/details/136146698