Golang 并发编程(三): Channel 详解
- 陈大剩
- 2025-01-23 22:15:15
- 108
Channel
Golang
中 channel
(通道,下文中说通道等同于 channel
)是一种数据类型,golang
所提倡的 “应该以通信作为手段来共享内存” 的最直接和重要的体现 channel
,golang
鼓励使用与众不同的方法来共享值,这个方法就是使用一个通道类型值在不同的 goroutine
之间传值。channel
提供了一种机制,它即可 同步两个并发执行的函数,又可以让这两个函数通过 相互传递特定类型的值来通信。
类型表示法
channel
和 切片 类型一样属于引用类型,一个泛化的 channel
类型申明应该是这样:
chan T
关键字 chan
代表了通道类型的关键字,T
代表该通道的元素类型。我们可以通过通道元素类型,限制通道传递元素值的类型,例如:
var intChan chan int
也可以申明一个别名类型通道,例如:
type intChan chan int
别名类型 intChan
代表类型为 int
的 channel
类型,初始化后就可以使用了。
上面的通道类型均为双向通道,意味着既可以发送值又可以接收值,其中还有两种特殊的 channel
:接收 channel
和 发送 channel
// chan<- T
var acceptIntChan chan<- int
chan<-
只能用于接收的 channel
的类型,我们只能向此 channel
发送元素值,而不能从它那里接收值。还有第二种发送 channel
的类型
// <-chan T
var sendIntChan <-chan int
这表示 channel
只能用于发送元素值, channel
不能接收值。
注意:不论接收还是发送操作符都是
<-
,不存在->
这种操作符。
值表示
因为通道类型是一个引用类型,所以一个通道类型的变量在初始化之前,值一定是 nil
,这也是此刻类型的零值。
于其他类型不同的是,通道类型的变量是用来传递值的,还不是存储值的。所以,通道类型并没有对应的值表示法。它的值具有即时性,是无法用字面量来准确表达的。
操作特性
通道类型在操作时,是在多个 goroutine
之间传递数据和同步的重要手段,通道本身是同步的,也就是说在同一个时刻,仅有一个 goroutine
能向通道发送元素值,同时也仅有一个 goroutine
能从它那接受到元素值。
通道中,各个元素都是严格按照发送到此的先后顺序排列的,最早发送的最先被接收,因此,通道相当于一个 消息队列(先进先出 FIFO),此外通道具有原子性,是不可分割的,通道的每一个元素只可能被一个 goroutine
接收,已被接收的元素值会立刻从通道中删除。
初始化通道
在 golang
中,引用类型的值都需要使用内建函数 make 来初始化,通道也不例外:
ch := make(chan int, 10)
上面表达式初始化了一个通道类型的值,传递给 make
函数的第一个参数表明,此值的具体操作类型是元素为 int
的通道类型,第二个参数表示在 同一时刻最多可以缓冲 5 个元素值。
初始化一个通道时,也可以省略第二个参数的值,例如:
ch := make(chan int)
如果第二个参数被省略了,就表示被初始化的这个通道无法缓冲任何元素值。发送给它的元素应该立刻被取走,否则发送的 goroutine
会被暂停,直到有接收方接收这个元素值。
我们把初始化第二个参数值 大于 0 的通道称为缓冲通道,而把初始化 未给定第二个参数,或给定值等于 0 的通道为非缓冲通道。
提示:
我们试着打印一下初始化的通道(
fmt.println(make(chan int))
),看看得到什么?不出意外得到是:
0xc000018150
,这是因为输出的是它的地址,因为通道是一个引用类型。那为什么同样的打印引用类型(切片、map),返回却不是地址呢?
因为通道的设计是使在
goroutine
之间安全地传递数据,而无法直接查看通道的内部状态,确保了数据的封装性和安全性。要查看通道中的数据,必须通过发送和接收操作
接收元素值
前面介绍了 <-
符号,这个符号不仅可以作为通道类型声明,也可以用于通道操作(发送或接收元素值)。第一种写法例如:
ch := make(chan int, 3)
elem := <-ch
make 函数调用后,返回一个已被初始化的缓存为 3 通道作为结果,所以 ch
变为了一个双向通道。第二行 <-
含义为把通道中的第一个元素值赋值给 elem
。不过,此时这类操作会使当前 goroutine
被阻塞在这,因为通道 ch
中没有任何元素,只有有元素值才会唤醒。
第二种这样也可以
elem, ok := <-ch
和前面一样使当前 goroutine
被阻塞,如果接收操作操作之前或过程中关闭此通道,那么操作会立马结束,并且 elem
会赋值为该通道元素类型的零值。
由于相应类型的零值也可以发送至通道中,所以当接收值为这样一个元素值(相应类型的零值)的时候,无法判断它是通道缓存中的一个元素值,还是用来表示已经关闭的标识。这时第二种编写方法就显示出来了,ok
一定是布尔类型的值,接收操作因通道关闭而结束时,该值会为 false
(代表操作失败),这样就可以判断。
注意:从一个未被初始化的通道那里接收值,会造成当前
goroutine
的永久阻塞。
Happens before
从通道中接收到元素的值,必须向它发送元素的值。一个元素值在被接收方从通道中取出来之前,必须先存在于该通道内。正式的讲,对于一个缓存通道,有如下规则:
- 发送操作会使通道复制被发送的元素。也就说向通道中发送元素值时会复制元素的值到某个地址,那么有两种情况,第一种:当前 **通道为空且有接收方在等待元素值 **时,它会将 值复制到最早等待接收方持有通道的内存地址,否则第二种 将发送元素值复制到通道持有的内存地址中;
- 接收操作会使通道给出一个已发给它的元素值的副本。若因通道的缓存空间已为空而无立马给出时,会阻塞进行接收的
goroutine
; - 对于同一个元素值来说,把它发送给某个通道的操作,一定会在从该通道中接收它的操作之前完成;
发送元素值
发送语句由通道表达式、接收操作符(<-
)和 代表元素值的表达式 组成。发送元素的值必须是可以赋给通道表达式中的值。
对于 <-
两边表达式求值会优先于发送操作执行,在对这两个表达式求值完成之前,发送操作一定会阻塞。
strChan := make(chan string, 3)
strChan <- "a"
strChan <- "b"
strChan <- "c"
发送类似于上例,现在 strChan
中已经缓存 3 个元素的值,这个已经是他所能够容纳的最大量了(依照对 strChan
的初始化可知),之后再发送则会被阻塞。
例1:普通类型值传入通道
已下具体使用发送值的例子,其中 make(chan struct{}, 1)
充当信号量,因为 struct{}
不占用任何内存空间,并且所有该类型都使用同一个内存地址。
strChan := make(chan string, 5)
syncChan1 := make(chan struct{}, 1)
syncChan2 := make(chan struct{}, 2)
go func() {
<-syncChan1
fmt.Println("go1 睡眠 1s,让下面的 go2 处于阻塞状态,只至 strChan 消费一个值后唤醒 go2")
time.Sleep(time.Second)
for {
if elem, ok := <-strChan; ok {
fmt.Println(elem)
} else {
break
}
}
syncChan2 <- struct{}{}
}()
go func() {
for _, elem := range []string{"a", "b", "c", "d"} {
strChan <- elem
if elem == "c" {
syncChan1 <- struct{}{}
fmt.Println("syncChan1 发送信号")
}
}
fmt.Println("go2 睡眠 2s,防止接收方还没有全部接收完通道就关闭了")
time.Sleep(2 * time.Second)
close(strChan)
fmt.Println("关闭了 strChan 通道")
syncChan2 <- struct{}{}
}()
<-syncChan2
<-syncChan2
close(syncChan1)
close(syncChan2)
上述代码会输出:
syncChan1 发送信号
go1 睡眠 1s,让下面的 go2 处于阻塞状态,只至 strChan 消费一个值后唤醒 go2
go2 睡眠 2s,防止接收方还没有全部接收完通道就关闭了
a
b
c
d
关闭了 strChan 通道
注意:发送方向通道发送的值会被复制,接收方接收的总是该值的副本,而不是该本身。
例2:引用类型值传入通道
syncChan := make(chan struct{}, 2)
mpCh := make(chan map[string]int, 1)
go func() {
for {
if elem, ok := <-mpCh; ok {
elem["count"]++
} else {
break
}
}
syncChan <- struct{}{}
}()
go func() {
countMap := make(map[string]int)
for i := 0; i < 5; i++ {
mpCh <- countMap
time.Sleep(time.Millisecond)
fmt.Printf("这个值的值是: %v \n", countMap)
}
close(mpCh)
syncChan <- struct{}{}
}()
<-syncChan
<-syncChan
上述代码中输出如下:
这个值的值是: map[count:1]
这个值的值是: map[count:2]
这个值的值是: map[count:3]
这个值的值是: map[count:4]
这个值的值是: map[count:5]
mpChan
的元素类型属于引用类型,所以接收元素的副本修改会影响到发送方持有的源值。
例3:结构体类型值传入通道
这里引入两种代码值来观察是否
type Counter struct {
count int
}
mapChan := make(chan map[string]Counter, 1)
syncChan := make(chan struct{}, 2)
go func() {
for {
if elem, ok := <-mapChan; ok {
counter := elem["count"]
counter.count++
} else {
break
}
}
fmt.Println("结束了接收")
syncChan <- struct{}{}
}()
go func() {
countMap := map[string]Counter{
"count": Counter{},
}
for i := 0; i < 5; i++ {
mapChan <- countMap
time.Sleep(time.Millisecond) // 睡眠一微秒使得接收者有时间消费
fmt.Printf("这个 map 总数为:%v \n", countMap)
}
close(mapChan)
syncChan <- struct{}{}
}()
<-syncChan
<-syncChan
再换成另一种代码:
package main
import (
"fmt"
"time"
)
type Counter struct {
count int
}
func main() {
mapChan := make(chan map[string]*Counter, 1)
syncChan := make(chan struct{}, 2)
go func() {
for {
if elem, ok := <-mapChan; ok {
counter := elem["count"]
counter.count++
} else {
break
}
}
fmt.Println("结束了接收")
syncChan <- struct{}{}
}()
go func() {
countMap := map[string]*Counter{
"count": &Counter{},
}
for i := 0; i < 5; i++ {
mapChan <- countMap
time.Sleep(time.Millisecond) // 睡眠一微秒使得接收者有时间消费
//fmt.Printf("这个 map 总数为:%v \n", countMap)
fmt.Println(countMap["count"].String())
}
close(mapChan)
syncChan <- struct{}{}
}()
<-syncChan
<-syncChan
}
func (counter *Counter) String() string {
return fmt.Sprintf("count:%d ", counter.count)
}
如果不出意外的话,第一段代码打印的全是 0
,第二段代码打印的是 1,2,3,4,5
,其中最重要的问题是 countMap
中存储的是指向同一个 Counter
实例的指针,因此每次发送 countMap
到通道时,接收者操作的是同一个 Counter
实例。
关闭 channel
调用 close
函数可以关闭通道,不过需要注意的是关闭后不允许发送元素值,这会让运行恐慌。建议在发送端进行关闭通道,关闭通道后对接收端接收数据一般没有什么影响,如果通道关闭后缓存中任然有值,可以通过表达式第二个结果取出。
strChan := make(chan string, 5)
syncChan1 := make(chan struct{}, 1)
syncChan2 := make(chan struct{}, 2)
go func() {
<-syncChan1
for {
if elem, ok := <-strChan; ok {
fmt.Println(elem)
} else {
break
}
}
syncChan2 <- struct{}{}
}()
go func() {
for _, elem := range []string{"a", "b", "c", "d"} {
strChan <- elem
}
close(strChan)
syncChan1 <- struct{}{}
syncChan2 <- struct{}{}
}()
<-syncChan2
<-syncChan2
上述代码演示的关闭通道后任然可以向通道中取值,请注意用第二个参数判断,因为关闭通道时会发送一个目标类型的零值。
关闭通道需注意的点:
- 同一个通道只允许关闭一次,第二次关闭会引发恐慌;
- 在关闭通道时,必须将所有正在发送通道的数据发送完成(需关闭正在等待阻塞的
goroutine
),才能进行关闭,否则关闭会引发恐慌; - 关闭通道时,该通道必须初始化;
长度与容量
通道中长度和容量可使用内建函数 len
和 cap
获取,len
获取当前通道中元素数量(长度),cap
获取当前通道中可容纳元素值的最大数量(容量)。容量是初始化时已经确定的,并且之后不能改变,通道长度则会随着实际情况变化。
提示:可通过容量来判断是否通道带缓存。
单向 channel
单向通道分为:发送通道 和 接收通道。有意思的是通道的命名和操作相反,比如 发送通道,发送通道是只该通道能够接收其他类型的值进通道的通道类型,称为发送通道(简单来说能够进行接收操作的通道叫做发送通道),接收通道则相反。
需要注意的是单向通道一般不会出现在变量申明中,比如:
ch := make(chan<- int, 1)
明显,一个只进不出的通道没有任何意义,那么单向通道的应用场景在哪呢?
func main() {
strChan := make(chan string, 4)
syncChan1 := make(chan struct{}, 1)
syncChan2 := make(chan struct{}, 2)
go receive(strChan, syncChan1, syncChan2)
go send(strChan, syncChan1, syncChan2)
<-syncChan2
<-syncChan2
}
func receive(strChan <-chan string, syncChan1 <-chan struct{}, syncChan2 chan<- struct{}) {
<-syncChan1
for {
if elem, ok := <-strChan; ok {
fmt.Println(elem)
} else {
break
}
}
syncChan2 <- struct{}{}
}
func send(strChan chan<- string, syncChan1 chan<- struct{}, syncChan2 chan<- struct{}) {
for _, elem := range []string{"a", "b", "c", "d"} {
strChan <- elem
if elem == "c" {
syncChan1 <- struct{}{}
fmt.Println("syncChan1 发送信号")
}
}
close(strChan)
syncChan2 <- struct{}{}
}
根据上面例子可知,单向通道往往是由双向通道转换而来的,在函数或方法处加以强类型限制,利用函数声明将双向通道转化为单向通道的做法,只能算是 Golang
的语法糖罢了。
注意:单向通道可由双向通道转化而来,但是单向通道不可能转化成双向通道。其二,单向通道只能接收通道能 关闭当前通道,发送通道关闭会造成恐慌。
for 语句与 channel
for
使用 range
子句可以持续地从一个通道中接收元素值,例如:
var ch chan int
for elem := range ch {
fmt.Println(elem)
}
完整例子如下:
func main() {
strChan := make(chan string, 5)
syncChan1 := make(chan struct{}, 1)
syncChan2 := make(chan struct{}, 2)
go receive(strChan, syncChan1, syncChan2)
go send(strChan, syncChan1, syncChan2)
<-syncChan2
<-syncChan2
}
func receive(strChan <-chan string, syncChan1 <-chan struct{}, syncChan2 chan<- struct{}) {
<-syncChan1
for elem := range strChan {
fmt.Println(elem)
}
syncChan2 <- struct{}{}
}
func send(strChan chan<- string, syncChan1 chan<- struct{}, syncChan2 chan<- struct{}) {
for _, elem := range []string{"a", "b", "c", "d"} {
strChan <- elem
if elem == "c" {
syncChan1 <- struct{}{}
fmt.Println("syncChan1 发送信号")
}
}
close(strChan)
syncChan2 <- struct{}{}
}
select 语句与 channel
select
语句可以用于处理多个通道操作。它本身 并不会循环,而是会在每次执行时 随机选择 一个 可用 的通道操作。select
语句结构如下:
ch1 := make(chan int, 3)
ch2 := make(chan int, 3)
close(ch1)
select {
case elem := <-ch1:
fmt.Printf("ch1 通道值%v", elem)
case elem := <-ch2:
fmt.Printf("ch2 通道值%v", elem)
default:
fmt.Println("没有通道数据")
}
其中每个分支以关键字 case
开始,后面只能发送语句或接收语句,大部分情况下需包含一个 default case
,如果所以普通 case
都不满足选择条件时, default case
就会被选中。
注意:
如果在只有一个
goroutine
且在select
没有包含default case
,就会发生死锁。在接收操作中如果只赋值给了一个迭代变量,关闭通道时会出现通道类型值的零值,请使用
case elem, ok := <-ch1:
判断。
与 for 语句连用
因为 select
语句本身并不会循环,如果需要多次在多个通道中取值,需要和 for 语句配合使用:
ch1 := make(chan string, 6)
for i := 0; i < 5; i++ {
ch1 <- fmt.Sprintf("来自 ch1: %d", i)
}
close(ch1)
syncChan := make(chan struct{}, 1)
// 使用 select 处理多个通道
go func() {
Loop:
for {
select {
case msg1, ok := <-ch1:
if !ok {
fmt.Println("结束了 for Loop 循环")
break Loop
}
fmt.Printf("%s \n", msg1)
default:
fmt.Printf("走到了默认逻辑")
}
}
syncChan <- struct{}{}
}()
<-syncChan
fmt.Println("所有消息已接收,结束程序。")
其中 break Loop
是一条带标签的 break
语句,Loop
为标签的名字,意为中断紧贴于该标签之下的那条语句的执行,只有这样才能正确的结束外层 for
循环。
上述代码输出结果如下:
来自 ch1: 0
来自 ch1: 1
来自 ch1: 2
来自 ch1: 3
来自 ch1: 4
结束了 for Loop 循环
所有消息已接收,结束程序。
非缓冲的 channel
一个通道初始化时其容量设为 0 或直接忽略对容量的设置,就会使通道成为一个非缓冲通道。相比于缓冲通道非缓冲通道只能同步的传递元素值。并且有如下特点:
- 向此类通道发送元素值的操作会被阻塞,直到至少有一个针对该通道的接收操作进行为止。
- 从此类通道接收元素值的操作会被阻塞,直到至少有一个针对该通道的发送操作进行为止。
done := make(chan struct{})
go func() {
fmt.Println("工作完成")
done <- struct{}{}
}()
<-done // 等待信号
fmt.Println("主 goroutine 继续执行")
可以通过 cap
函数判断一个通道是否带缓冲。如果想异步执行发送操作,但通道却是非缓冲的,那么请另行异步化,比如:启用额外的 goroutine
执行此操作。一般在执行接收操作时通常无需关心通道是否带有缓冲,也可以根据容量实施不同的接收策略。
最后几个常见面试题
1. 并发打印 100 随机值的平方,要求接收者可无限接收
func main() {
ch := make(chan int, 5)
syncChan := make(chan struct{}, 2)
go func() {
for i := 0; i < 100; i++ {
ch <- i
}
close(ch) // 通道的关闭操作要在发送端操作
syncChan <- struct{}{}
}()
go func() {
for elem := range ch {
fmt.Println(elem * elem)
}
syncChan <- struct{}{}
}()
<-syncChan
<-syncChan
}
2. 实现一个互斥锁
lock := make(chan struct{}, 1)
lock <- struct{}{}
go func() {
<-lock
fmt.Println("访问共享资源")
lock <- struct{}{}
}()
go func() {
<-lock
fmt.Println("访问共享资源")
lock <- struct{}{}
}()
time.Sleep(time.Second * 1) // 大家可以思考一下,为什么一定要睡眠?
<-lock