陈大剩博客

Golang 并发编程(三): Channel 详解

  • 陈大剩
  • 2025-01-23 22:15:15
  • 108

golang 标题

Channel

Golangchannel (通道,下文中说通道等同于 channel)是一种数据类型,golang 所提倡的 “应该以通信作为手段来共享内存” 的最直接和重要的体现 channelgolang 鼓励使用与众不同的方法来共享值,这个方法就是使用一个通道类型值在不同的 goroutine 之间传值。channel 提供了一种机制,它即可 同步两个并发执行的函数,又可以让这两个函数通过 相互传递特定类型的值来通信

类型表示法

channel切片 类型一样属于引用类型,一个泛化的 channel 类型申明应该是这样:

chan T

关键字 chan 代表了通道类型的关键字,T 代表该通道的元素类型。我们可以通过通道元素类型,限制通道传递元素值的类型,例如:

var intChan chan int

也可以申明一个别名类型通道,例如:

type intChan chan int

别名类型 intChan 代表类型为 intchannel 类型,初始化后就可以使用了。

上面的通道类型均为双向通道,意味着既可以发送值又可以接收值,其中还有两种特殊的 channel :接收 channel 和 发送 channel

// chan<- T 
var acceptIntChan chan<- int

chan<- 只能用于接收的 channel 的类型,我们只能向此 channel 发送元素值,而不能从它那里接收值。还有第二种发送 channel 的类型

// <-chan T 
var sendIntChan <-chan int

这表示 channel 只能用于发送元素值, channel 不能接收值。

注意:不论接收还是发送操作符都是 <- ,不存在 -> 这种操作符。

值表示

因为通道类型是一个引用类型,所以一个通道类型的变量在初始化之前,值一定是 nil ,这也是此刻类型的零值。

于其他类型不同的是,通道类型的变量是用来传递值的,还不是存储值的。所以,通道类型并没有对应的值表示法。它的值具有即时性,是无法用字面量来准确表达的。

操作特性

通道类型在操作时,是在多个 goroutine 之间传递数据和同步的重要手段,通道本身是同步的,也就是说在同一个时刻,仅有一个 goroutine 能向通道发送元素值,同时也仅有一个 goroutine 能从它那接受到元素值。

通道中,各个元素都是严格按照发送到此的先后顺序排列的,最早发送的最先被接收,因此,通道相当于一个 消息队列(先进先出 FIFO),此外通道具有原子性,是不可分割的,通道的每一个元素只可能被一个 goroutine 接收,已被接收的元素值会立刻从通道中删除。

初始化通道

golang 中,引用类型的值都需要使用内建函数 make 来初始化,通道也不例外:

ch := make(chan int, 10)

上面表达式初始化了一个通道类型的值,传递给 make 函数的第一个参数表明,此值的具体操作类型是元素为 int 的通道类型,第二个参数表示在 同一时刻最多可以缓冲 5 个元素值

初始化一个通道时,也可以省略第二个参数的值,例如:

ch := make(chan int)

如果第二个参数被省略了,就表示被初始化的这个通道无法缓冲任何元素值。发送给它的元素应该立刻被取走,否则发送的 goroutine 会被暂停,直到有接收方接收这个元素值。

我们把初始化第二个参数值 大于 0 的通道称为缓冲通道,而把初始化 未给定第二个参数,或给定值等于 0 的通道为非缓冲通道

提示:

我们试着打印一下初始化的通道(fmt.println(make(chan int)) ),看看得到什么?

不出意外得到是:0xc000018150 ,这是因为输出的是它的地址,因为通道是一个引用类型。

那为什么同样的打印引用类型(切片、map),返回却不是地址呢?

因为通道的设计是使在 goroutine 之间安全地传递数据,而无法直接查看通道的内部状态,确保了数据的封装性和安全性。要查看通道中的数据,必须通过发送和接收操作

接收元素值

前面介绍了 <- 符号,这个符号不仅可以作为通道类型声明,也可以用于通道操作(发送或接收元素值)。第一种写法例如:

ch := make(chan int, 3)
elem := <-ch

make 函数调用后,返回一个已被初始化的缓存为 3 通道作为结果,所以 ch 变为了一个双向通道。第二行 <- 含义为把通道中的第一个元素值赋值给 elem。不过,此时这类操作会使当前 goroutine 被阻塞在这,因为通道 ch 中没有任何元素,只有有元素值才会唤醒。

第二种这样也可以

elem, ok := <-ch

和前面一样使当前 goroutine 被阻塞,如果接收操作操作之前或过程中关闭此通道,那么操作会立马结束,并且 elem 会赋值为该通道元素类型的零值。

由于相应类型的零值也可以发送至通道中,所以当接收值为这样一个元素值(相应类型的零值)的时候,无法判断它是通道缓存中的一个元素值,还是用来表示已经关闭的标识。这时第二种编写方法就显示出来了,ok 一定是布尔类型的值,接收操作因通道关闭而结束时,该值会为 false(代表操作失败),这样就可以判断。

注意:从一个未被初始化的通道那里接收值,会造成当前 goroutine 的永久阻塞。

Happens before

从通道中接收到元素的值,必须向它发送元素的值。一个元素值在被接收方从通道中取出来之前,必须先存在于该通道内。正式的讲,对于一个缓存通道,有如下规则:

  • 发送操作会使通道复制被发送的元素。也就说向通道中发送元素值时会复制元素的值到某个地址,那么有两种情况,第一种:当前 **通道为空且有接收方在等待元素值 **时,它会将 值复制到最早等待接收方持有通道的内存地址,否则第二种 将发送元素值复制到通道持有的内存地址中
  • 接收操作会使通道给出一个已发给它的元素值的副本。若因通道的缓存空间已为空而无立马给出时,会阻塞进行接收的 goroutine
  • 对于同一个元素值来说,把它发送给某个通道的操作,一定会在从该通道中接收它的操作之前完成;

发送元素值

发送语句由通道表达式、接收操作符(<-)和 代表元素值的表达式 组成。发送元素的值必须是可以赋给通道表达式中的值。

对于 <- 两边表达式求值会优先于发送操作执行,在对这两个表达式求值完成之前,发送操作一定会阻塞。

strChan := make(chan string, 3)
strChan <- "a"
strChan <- "b"
strChan <- "c"

发送类似于上例,现在 strChan 中已经缓存 3 个元素的值,这个已经是他所能够容纳的最大量了(依照对 strChan 的初始化可知),之后再发送则会被阻塞。

例1:普通类型值传入通道

已下具体使用发送值的例子,其中 make(chan struct{}, 1) 充当信号量,因为 struct{} 不占用任何内存空间,并且所有该类型都使用同一个内存地址。

strChan := make(chan string, 5)
syncChan1 := make(chan struct{}, 1)
syncChan2 := make(chan struct{}, 2)

go func() {
    <-syncChan1
    fmt.Println("go1 睡眠 1s,让下面的 go2 处于阻塞状态,只至 strChan 消费一个值后唤醒 go2")
    time.Sleep(time.Second)
    for {
       if elem, ok := <-strChan; ok {
          fmt.Println(elem)
       } else {
          break
       }
    }
    syncChan2 <- struct{}{}
}()

go func() {
    for _, elem := range []string{"a", "b", "c", "d"} {
       strChan <- elem
       if elem == "c" {
          syncChan1 <- struct{}{}
          fmt.Println("syncChan1 发送信号")
       }
    }
    fmt.Println("go2 睡眠 2s,防止接收方还没有全部接收完通道就关闭了")
    time.Sleep(2 * time.Second)
    close(strChan)
    fmt.Println("关闭了 strChan 通道")
    syncChan2 <- struct{}{}
}()
<-syncChan2
<-syncChan2
close(syncChan1)
close(syncChan2)

上述代码会输出:

syncChan1 发送信号
go1 睡眠 1s,让下面的 go2 处于阻塞状态,只至 strChan 消费一个值后唤醒 go2
go2 睡眠 2s,防止接收方还没有全部接收完通道就关闭了
a
b
c
d
关闭了 strChan 通道

注意:发送方向通道发送的值会被复制,接收方接收的总是该值的副本,而不是该本身。

例2:引用类型值传入通道

syncChan := make(chan struct{}, 2)
mpCh := make(chan map[string]int, 1)
go func() {
    for {
       if elem, ok := <-mpCh; ok {
          elem["count"]++
       } else {
          break
       }
    }
    syncChan <- struct{}{}
}()
go func() {
    countMap := make(map[string]int)
    for i := 0; i < 5; i++ {
       mpCh <- countMap
       time.Sleep(time.Millisecond)
       fmt.Printf("这个值的值是: %v \n", countMap)
    }
    close(mpCh)
    syncChan <- struct{}{}
}()
<-syncChan
<-syncChan

上述代码中输出如下:

这个值的值是: map[count:1] 
这个值的值是: map[count:2] 
这个值的值是: map[count:3] 
这个值的值是: map[count:4] 
这个值的值是: map[count:5] 

mpChan 的元素类型属于引用类型,所以接收元素的副本修改会影响到发送方持有的源值。

例3:结构体类型值传入通道

这里引入两种代码值来观察是否

type Counter struct {
    count int
}
mapChan := make(chan map[string]Counter, 1)
syncChan := make(chan struct{}, 2)
go func() {
    for {
       if elem, ok := <-mapChan; ok {
          counter := elem["count"]
          counter.count++
       } else {
          break
       }
    }
    fmt.Println("结束了接收")
    syncChan <- struct{}{}
}()

go func() {
    countMap := map[string]Counter{
       "count": Counter{},
    }
    for i := 0; i < 5; i++ {
       mapChan <- countMap
       time.Sleep(time.Millisecond) // 睡眠一微秒使得接收者有时间消费
       fmt.Printf("这个 map 总数为:%v \n", countMap)
    }
    close(mapChan)
    syncChan <- struct{}{}
}()
<-syncChan
<-syncChan

再换成另一种代码:

package main

import (
    "fmt"
    "time"
)

type Counter struct {
    count int
}

func main() {
    mapChan := make(chan map[string]*Counter, 1)
    syncChan := make(chan struct{}, 2)
    go func() {
        for {
            if elem, ok := <-mapChan; ok {
                counter := elem["count"]
                counter.count++
            } else {
                break
            }
        }
        fmt.Println("结束了接收")
        syncChan <- struct{}{}
    }()

    go func() {
        countMap := map[string]*Counter{
            "count": &Counter{},
        }
        for i := 0; i < 5; i++ {
            mapChan <- countMap
            time.Sleep(time.Millisecond) // 睡眠一微秒使得接收者有时间消费
            //fmt.Printf("这个 map 总数为:%v \n", countMap)
            fmt.Println(countMap["count"].String())
        }
        close(mapChan)
        syncChan <- struct{}{}
    }()
    <-syncChan
    <-syncChan
}

func (counter *Counter) String() string {
    return fmt.Sprintf("count:%d ", counter.count)
}

如果不出意外的话,第一段代码打印的全是 0 ,第二段代码打印的是 1,2,3,4,5,其中最重要的问题是 countMap 中存储的是指向同一个 Counter 实例的指针,因此每次发送 countMap 到通道时,接收者操作的是同一个 Counter 实例。

关闭 channel

调用 close 函数可以关闭通道,不过需要注意的是关闭后不允许发送元素值,这会让运行恐慌。建议在发送端进行关闭通道,关闭通道后对接收端接收数据一般没有什么影响,如果通道关闭后缓存中任然有值,可以通过表达式第二个结果取出。

strChan := make(chan string, 5)
syncChan1 := make(chan struct{}, 1)
syncChan2 := make(chan struct{}, 2)

go func() {
    <-syncChan1
    for {
       if elem, ok := <-strChan; ok {
          fmt.Println(elem)
       } else {
          break
       }
    }
    syncChan2 <- struct{}{}
}()
go func() {
    for _, elem := range []string{"a", "b", "c", "d"} {
       strChan <- elem
    }
    close(strChan)
    syncChan1 <- struct{}{}
    syncChan2 <- struct{}{}
}()
<-syncChan2
<-syncChan2

上述代码演示的关闭通道后任然可以向通道中取值,请注意用第二个参数判断,因为关闭通道时会发送一个目标类型的零值。

关闭通道需注意的点:

  1. 同一个通道只允许关闭一次,第二次关闭会引发恐慌;
  2. 在关闭通道时,必须将所有正在发送通道的数据发送完成(需关闭正在等待阻塞的 goroutine),才能进行关闭,否则关闭会引发恐慌;
  3. 关闭通道时,该通道必须初始化;

长度与容量

通道中长度和容量可使用内建函数 lencap 获取,len 获取当前通道中元素数量(长度),cap 获取当前通道中可容纳元素值的最大数量(容量)。容量是初始化时已经确定的,并且之后不能改变,通道长度则会随着实际情况变化。

提示:可通过容量来判断是否通道带缓存。

单向 channel

单向通道分为:发送通道 和 接收通道。有意思的是通道的命名和操作相反,比如 发送通道,发送通道是只该通道能够接收其他类型的值进通道的通道类型,称为发送通道(简单来说能够进行接收操作的通道叫做发送通道),接收通道则相反。

需要注意的是单向通道一般不会出现在变量申明中,比如:

ch := make(chan<- int, 1)

明显,一个只进不出的通道没有任何意义,那么单向通道的应用场景在哪呢?

func main() {
    strChan := make(chan string, 4)
    syncChan1 := make(chan struct{}, 1)
    syncChan2 := make(chan struct{}, 2)
    go receive(strChan, syncChan1, syncChan2)
    go send(strChan, syncChan1, syncChan2)
    <-syncChan2
    <-syncChan2
}

func receive(strChan <-chan string, syncChan1 <-chan struct{}, syncChan2 chan<- struct{}) {
    <-syncChan1
    for {
       if elem, ok := <-strChan; ok {
          fmt.Println(elem)
       } else {
          break
       }
    }
    syncChan2 <- struct{}{}
}

func send(strChan chan<- string, syncChan1 chan<- struct{}, syncChan2 chan<- struct{}) {
    for _, elem := range []string{"a", "b", "c", "d"} {
       strChan <- elem
       if elem == "c" {
          syncChan1 <- struct{}{}
          fmt.Println("syncChan1 发送信号")
       }
    }
    close(strChan)
    syncChan2 <- struct{}{}
}

根据上面例子可知,单向通道往往是由双向通道转换而来的,在函数或方法处加以强类型限制,利用函数声明将双向通道转化为单向通道的做法,只能算是 Golang 的语法糖罢了。

注意:单向通道可由双向通道转化而来,但是单向通道不可能转化成双向通道。其二,单向通道只能接收通道能 关闭当前通道,发送通道关闭会造成恐慌。

for 语句与 channel

for 使用 range 子句可以持续地从一个通道中接收元素值,例如:

var ch chan int
for elem := range ch {
    fmt.Println(elem)
}

完整例子如下:

func main() {
    strChan := make(chan string, 5)
    syncChan1 := make(chan struct{}, 1)
    syncChan2 := make(chan struct{}, 2)
    go receive(strChan, syncChan1, syncChan2)
    go send(strChan, syncChan1, syncChan2)
    <-syncChan2
    <-syncChan2
}

func receive(strChan <-chan string, syncChan1 <-chan struct{}, syncChan2 chan<- struct{}) {
    <-syncChan1
    for elem := range strChan {
       fmt.Println(elem)
    }
    syncChan2 <- struct{}{}
}

func send(strChan chan<- string, syncChan1 chan<- struct{}, syncChan2 chan<- struct{}) {
    for _, elem := range []string{"a", "b", "c", "d"} {
       strChan <- elem
       if elem == "c" {
          syncChan1 <- struct{}{}
          fmt.Println("syncChan1 发送信号")
       }
    }
    close(strChan)
    syncChan2 <- struct{}{}
}

select 语句与 channel

select 语句可以用于处理多个通道操作。它本身 并不会循环,而是会在每次执行时 随机选择 一个 可用 的通道操作。select 语句结构如下:

ch1 := make(chan int, 3)
ch2 := make(chan int, 3)
close(ch1)
select {
case elem := <-ch1:
    fmt.Printf("ch1 通道值%v", elem)
case elem := <-ch2:
    fmt.Printf("ch2 通道值%v", elem)
default:
    fmt.Println("没有通道数据")
}

其中每个分支以关键字 case 开始,后面只能发送语句或接收语句,大部分情况下需包含一个 default case ,如果所以普通 case 都不满足选择条件时, default case 就会被选中。

注意:

  1. 如果在只有一个 goroutine 且在 select 没有包含 default case ,就会发生死锁。

  2. 在接收操作中如果只赋值给了一个迭代变量,关闭通道时会出现通道类型值的零值,请使用 case elem, ok := <-ch1: 判断。

与 for 语句连用

因为 select 语句本身并不会循环,如果需要多次在多个通道中取值,需要和 for 语句配合使用:

ch1 := make(chan string, 6)
for i := 0; i < 5; i++ {
    ch1 <- fmt.Sprintf("来自 ch1: %d", i)
}
close(ch1)
syncChan := make(chan struct{}, 1)
// 使用 select 处理多个通道
go func() {
Loop:
    for {
       select {
       case msg1, ok := <-ch1:
          if !ok {
             fmt.Println("结束了 for Loop 循环")
             break Loop
          }
          fmt.Printf("%s \n", msg1)
       default:
          fmt.Printf("走到了默认逻辑")
       }

    }
    syncChan <- struct{}{}
}()
<-syncChan
fmt.Println("所有消息已接收,结束程序。")

其中 break Loop 是一条带标签的 break 语句,Loop 为标签的名字,意为中断紧贴于该标签之下的那条语句的执行,只有这样才能正确的结束外层 for 循环。

上述代码输出结果如下:

来自 ch1: 0 
来自 ch1: 1 
来自 ch1: 2 
来自 ch1: 3 
来自 ch1: 4 
结束了 for Loop 循环
所有消息已接收,结束程序。

非缓冲的 channel

一个通道初始化时其容量设为 0 或直接忽略对容量的设置,就会使通道成为一个非缓冲通道。相比于缓冲通道非缓冲通道只能同步的传递元素值。并且有如下特点:

  1. 向此类通道发送元素值的操作会被阻塞,直到至少有一个针对该通道的接收操作进行为止。
  2. 从此类通道接收元素值的操作会被阻塞,直到至少有一个针对该通道的发送操作进行为止。
done := make(chan struct{})

go func() {
    fmt.Println("工作完成")
    done <- struct{}{}
}()

<-done // 等待信号
fmt.Println("主 goroutine 继续执行")

可以通过 cap 函数判断一个通道是否带缓冲。如果想异步执行发送操作,但通道却是非缓冲的,那么请另行异步化,比如:启用额外的 goroutine 执行此操作。一般在执行接收操作时通常无需关心通道是否带有缓冲,也可以根据容量实施不同的接收策略。

最后几个常见面试题

1. 并发打印 100 随机值的平方,要求接收者可无限接收

func main() {
    ch := make(chan int, 5)
    syncChan := make(chan struct{}, 2)
    go func() {
       for i := 0; i < 100; i++ {
          ch <- i
       }
       close(ch) // 通道的关闭操作要在发送端操作
       syncChan <- struct{}{}
    }()

    go func() {
       for elem := range ch {
          fmt.Println(elem * elem)
       }
       syncChan <- struct{}{}
    }()
    <-syncChan
    <-syncChan
}

2. 实现一个互斥锁

lock := make(chan struct{}, 1)
lock <- struct{}{}

go func() {
    <-lock
    fmt.Println("访问共享资源")
    lock <- struct{}{}
}()

go func() {
    <-lock
    fmt.Println("访问共享资源")
    lock <- struct{}{}
}()
time.Sleep(time.Second * 1) // 大家可以思考一下,为什么一定要睡眠?
<-lock
分享到:
0

说点儿什么吧

头像

表情

本站由陈大剩博客程序搭建 | 湘ICP备2023000975号| Copyright © 2017 - 陈大剩博客 | 本站采用创作共用版权:CC BY-NC 4.0

站长统计| 文章总数[123]| 评论总数[11]| 登录用户[26]| 时间点[127]

logo

登入

社交账号登录