深入理解 channel
# Channels 编程
Do not communicate by sharing memory; instead, share memory by communicating.(不要通过共享内存来通信,而要通过通信来实现内存共享。)
Go 语言的 CSP(Communicating Sequential Process,通信顺序进程)模型是由协程 Goroutine 与通道 Channel 实现。Channels 作为 Goroutine 之间沟通的桥梁,Goroutine 之间可以用过 Channel 通信,或者通过 Channel 控制其他 Goroutine 的工作。
# Channels 的特点
和切片、结构体、map 等一样,channel 也是一种复合数据类型,声明一个 channel 类型变量时,必须给出其具体的元素类型:
var ch chan int // 声明一个只可传输 int 类型数据的 channel
如果 channel 类型变量在声明时没有被赋予初值,那么它的默认值为nil
。并且,和其他复合数据类型支持使用复合类型字面值作为变量初始值不同,为 channel 类型变量赋初值的唯一方法就是使用make
这个 Go 预定义的函数:
ch1 := make(chan T)
ch2 := make(chan T, 5)
2
声明只发送和只接收的 channel:
ch1 := make(chan<- int, 1) // 只发送channel类型
ch2 := make(<-chan int, 1) // 只接收channel类型
2
# 有缓冲和无缓冲的 channel
- 无缓冲:无缓冲是同步的,要求收发两端都必须要有 goroutine,否则就是阻塞。
- 有缓冲:有缓冲是异步的,没满或者没空之前都不会阻塞。但是满了或者空了就会阻塞,这时候变成了同步。
# channel 的关闭
可以使用close
函数关闭 channel:
ch := make(chan int)
close(ch)
2
为避免发生 panic,发送端负责关闭 channel。
# 关闭后的 channel
不能再次关闭。重复关闭 channel 会发生 panic。
ch := make(chan int, 10) close(ch) close(ch) // panic: close of closed channel
1
2
3不能发送数据。往已经关闭了的 channel 发送数据也会发生 panic。
ch := make(chan struct{}) close(ch) ch <- struct{}{} // panic: send on closed channel
1
2
3接收的数据返回零值。从已经关闭了接收到的数据为 channel 类型的零值。
chInt := make(chan int) close(chInt) fmt.Printf("接收的值: %d\n", <-chInt) // 接收到的值: 0 chStr := make(chan string) close(chStr) fmt.Printf("接收的值: %q\n", <-chStr) // 接收到的值: ""
1
2
3
4
5
6
7可以通过
comma, ok
惯用法或for range
语句判断 channel 是否已经关闭。通过
comma, ok
判断func TestChannelClose_IsClose(t *testing.T) { var wg sync.WaitGroup ch := make(chan int) wg.Add(1) go func() { defer wg.Done() if res, ok := <-ch; !ok { fmt.Println("ch 已经被关闭了") // ch 已经被关闭了 } else { fmt.Println("收到消息:", res) } }() close(ch) wg.Wait() }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16通过
fo range
判断func TestChannelClose_Range(t *testing.T) { var wg sync.WaitGroup ch := make(chan int) wg.Add(1) go func() { defer wg.Done() for v := range ch { fmt.Println(v) } // 或者 //for range ch { //} fmt.Println("ch 已经被关闭了") // ch 已经被关闭了 }() close(ch) wg.Wait() }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
close 的 channel 会向所有 goroutine 进行“广播”,即所有监听该 channel 的 goroutine 都能收到信息,不会被阻塞。
# 值为nil
的 channel
如果一个 channel 类型变量的值为 nil,我们称它为 nil channel。nil channel 有一个特性,那就是对 nil channel 的读写都会发生阻塞。
# Channels 的使用
# 使用方法
goroutine 之间使用 Channel 进行传递数据:
func TestChannel(t *testing.T) {
ch := make(chan string)
go func() {
res := <-ch
fmt.Println("收到消息:", res)
ch <- "world"
}()
// 向 channel 发送信息
ch <- "hello"
fmt.Println(<-ch)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
goroutine 使用 channel 控制任务流:
func TestChannelControl(t *testing.T) {
ch := make(chan struct{})
go func() {
<-ch // 阻塞在这
fmt.Println("任务启动")
fmt.Println("任务结束")
ch <- struct{}{} // 告知主 goroutine 任务结束
}()
time.Sleep(time.Second)
ch <- struct{}{}
<-ch
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 使用场景
channel 主要用于通信和控制:
- 通信:用于 Goroutine 之间传递数据,类似于队列。
- 控制:利用 Channel 的阻塞特性,简接地控制 goroutine 或者其他资源的消耗。
# 无缓冲的 channel
用作信号传递。无缓冲 channel 用作信号传递的时候,有两种情况,分别是 1 对 1 通知信号和 1 对 n 通知信号。
1 对 1 通知信号:
func TestChannel_OnceToOnce(t *testing.T) { fmt.Println("start a worker...") c := do(func() { fmt.Println("worker is working...") time.Sleep(time.Second) }) <-c fmt.Println("worker work done!") } type signal struct{} func do(f func()) <-chan signal { c := make(chan signal) go func() { fmt.Println("worker start to work...") f() c <- signal{} }() return c }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
211 对 n 通知信号:
func TestChannel_OnceToN(t *testing.T) { var workers []worker for i := 0; i < 10; i++ { workers = append(workers, func(i int) { fmt.Printf("worker-%d is working...\n", i) time.Sleep(time.Second) }) } startSingle := make(chan signal) done := doMulti(workers, startSingle) fmt.Println("start a group of workers...") close(startSingle) // 利用 channel close 后的“广播”机制,通知所有 worker 开始工作 <-done // 任务完成信号 fmt.Println("the group of workers work done!") } type signal struct{} type worker func(i int) // doMulti 同时执行多个 worker,startSingle 为任务启动信号 func doMulti(workers []worker, startSingle <-chan signal) <-chan signal { var wg sync.WaitGroup c := make(chan signal) for i, w := range workers { wg.Add(1) go func(f worker, i int) { defer wg.Done() <-startSingle f(i) // 执行任务 }(w, i) } go func() { wg.Wait() c <- signal{} }() return c }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
用于替代锁机制。
# 带缓冲的 channel
- 用作消息队列。channel 具有 Goroutine 安全、FIFO(first-in,fist-out)等队列的特性。
- 用作计数信号量。例如使用带缓冲的 channel 控制 goroutine 的最大活动数量。channel 的容量(capacity)代表同时处于活动状态的 goroutine 的最大数量。向带缓冲 channel 的一个发送操作表示获取一个信号量,而从 channel 的一个接收操作则表示释放一个信号量。
# 使用案例
# 任务池
利用 channel 来实现一个任务池。该任务池允许开发者提交任务,并且设定最多多少个 goroutine 同时运行。提交任务的时候,如果所有 goroutine 都在执行任务,即满了,任务池就会缓存这个任务,缓存满了阻塞提交者。
type Task func()
type TaskPool struct {
tasks chan Task
close chan struct{}
}
// NewTaskPool 新建一个 *TaskPool
// numG 为 goroutine 活动数,capacity 为任务数
func NewTaskPool(numG, capacity int) *TaskPool {
tp := &TaskPool{
tasks: make(chan Task, capacity),
close: make(chan struct{}),
}
for i := 0; i < numG; i++ {
go func() {
for {
select {
case <-tp.close:
return
case task := <-tp.tasks:
task()
}
}
}()
}
return tp
}
// Submit 往 TaskPool 提交一个任务
func (tp *TaskPool) Submit(ctx context.Context, t Task) error {
select {
case <-ctx.Done():
return ctx.Err()
case tp.tasks <- t:
}
return nil
}
// Close 关闭 TaskPool
func (tp *TaskPool) Close() error {
close(tp.close)
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 消息队列
利用 channel 实现发布对于模式,发布者不断往 channel 里面发送数据,订阅者从 channel 里面取出数据。进程内的时间驱动可以依托于 channel 来实现。但 channel 有它的缺陷:
- 没有消费组概念。channel 的同一条数据只能被一个 goroutine 消费,不能被多个 goroutine 同时消费。
- 无法回退,也无法随机消费(FIFO)。
利用 channel 来实现一个基于内存的消息队列,并且有消费组的概念。这里使用的方案:每个消费者订阅的时候,创建一个子 channel;当队列有数据生产的时候,会遍历每个消费者的 channel,然后向每个 channel 发送数据。
type Msg struct {
Content string
}
type Broker struct {
mutex sync.RWMutex
chans []chan Msg
}
// Send 向每个 channel 发送消息
func (b *Broker) Send(m Msg) error {
b.mutex.RLock()
defer b.mutex.RUnlock()
for _, ch := range b.chans {
select {
case ch <- m:
default:
return errors.New("消息队列已满")
}
}
return nil
}
// Subscribe 订阅一个 capacity 大小的队列
func (b *Broker) Subscribe(capacity int) (<-chan Msg, error) {
ch := make(chan Msg, capacity)
b.mutex.Lock()
defer b.mutex.Unlock()
b.chans = append(b.chans, ch)
return ch, nil
}
// Close 关闭 Broker
func (b *Broker) Close() error {
b.mutex.Lock()
chans := b.chans
b.chans = nil
b.mutex.Unlock()
for _, ch := range chans {
close(ch)
}
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# Channels 陷阱
# Goroutine 泄漏
如果 channel 使用不当,就会导致 goroutine 泄漏:
- 只发送不接收,发送者会一直阻塞,导致发送者 goroutine 泄露
- 只接收不发送,接收者会一直阻塞,会导致接收者 goroutine 泄露
goroutine 泄露的根本原因:goroutine 被阻塞之后没有人唤醒它。
# 内存逃逸
- 分配到栈上:不需要考虑 GC
- 分配到堆上:需要考虑 GC
如果使用 channel 发送指针,那么必然会发生内存逃逸。因为编译器无法确定,发送的指针数据最终会被哪个 goroutine 接收。
# Channels 内部实现
channel 的核心要素:
- 要设计缓冲来存储数据。无缓冲等于缓冲容量为 0
- 要能阻塞 goroutine,也要能唤醒 goroutine。依赖于 go 的运行时:
- 发数据唤醒收数据的
- 收数据的唤醒发数据的
- 要维持住 goroutine 的等待队列,并且是收和发两个队列
chan 结构体
src/runtime/chan.go
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
qcount
:buffer 中已放入的元素个数dataqsiz
:用户构造 channel 时指定的 buf 大小buf
:buffferelemsize
:buffer 中每个元素的大小closed
:channel 是否关闭,== 0 代表未 closedelemtype
:channel 元素的类型信息sendx
:buffer 中已发送的索引位置recvx
:buffer 中已接收的索引位置recvq
:等待接收的 goroutinesendq
:等待发送的 goroutine
其中,需要额外说明的字段和类型:
buf
是一个 ring buffer 结构,用于存储数据waitq
是一个双向链表,简单来说就是队列
channel 收发数据的处理:
- 发送的时候,如果缓冲没满,或者有接收者,那就直接发;否则丢进去
sendq
。 - 接收的时候,如果缓冲有数据,或者说有发送者,那就收;否则丢进去
recvq
。
# chansend
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil { // 如果 channel 为 nil,就会直接阻塞,而且无法被唤醒
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// ...
// 有接收者在等待的话,即直接交给接收者
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 没有接收者,就准备放到缓存中
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// ...
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 在此被阻塞住
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep) // 确保 ep 不会垃圾回收掉,实际上就是确保发送的数据不会被垃圾回收掉
// someone woke us up.
if mysg != gp.waiting { // 被唤醒,这个时候其实已经发完数据了,后面就要做一下清理工作
throw("G waiting list is corrupted")
}
// ...
return true
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
chansend
的步骤:
- 看是不是 nil channel,是的话直接阻塞
- 看有没有被阻塞的接收者,有的话直接交付数据给接收者,返回
- 看看缓冲有没有满,没有满就放缓冲,返回
- 阻塞,等待接收者来唤醒自己
- 被唤醒,做些清理工作
# chanrecv
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil { // 如果 channel 是 nil,直接被阻塞
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
//...
if c.closed != 0 {
// ...
// The channel has been closed, but the channel's buffer have data.
} else {
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil { // 发现有发送者正在发送,直接从发送者里收数据
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
if c.qcount > 0 { // 没有发送者在发送,但是有缓冲,所以冲缓冲里面拿
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// ...
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 阻塞在这
// someone woke us up
if mysg != gp.waiting { // 被唤醒,这个时候已经收到数据了
throw("G waiting list is corrupted")
}
// ...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
chanrecv
的步骤:
- 看是不是 nil channel,是的话直接阻塞
- 看有没有被阻塞的发送者,有的话直接从发送者手里拿,返回
- 看看缓冲有没有数据,有就读缓冲,返回
- 阻塞,等待发送者来唤醒自己
- 被唤醒,做些清理工作
# 总结
channel 有 buffer 和没有 buffer 有什么特点。
- 有 buffer 的 channel 是异步的,没有 buffer 的 channel 是同步的。
发送数据给 nil channel 会怎样?发送给已关闭的 channel 会怎样?
- 发送数据 nil channel 会永远阻塞
- 发送给已关闭的 channel 会发生 panic
从 nil channel 接收数据会怎样?从已关闭的 channel 接收数据会怎样?
- 从 nil channel 接收数据会永远阻塞
- 从已关闭的 channel 接收数据会收到 channel 对应类型的零值
channel 是怎么引起 goroutine 泄露的?或者说,goroutine 泄露有什么原因?
- channel 阻塞且没有其他 goroutine 唤醒会引起 goroutine 泄露
- goroutine 泄露的根源是 goroutine 阻塞没有或者不能被唤醒
channel 发送步骤
- 看 channel 是不是 nil,是的话就阻塞
- 看有没有在等待接收的 channel,有的话就直接把数据交给它,并唤醒它,返回
- 看缓冲有没有满,没有满的话放缓冲里,返回
- 如果缓冲已满,就阻塞住,等待被接收者 channel 唤醒
- 被唤醒,做些清理工作
channel 接收步骤
- 看 channel 是不是 nil,是的话就阻塞
- 看有没有在等待发送的 channel,有的话就从它手里直接拿数据,并唤醒它,返回
- 看缓冲里有没有数据,有的话就从缓冲里拿,返回
- 缓冲没有数据,就阻塞住,等待发送者 channel 来唤醒
- 被唤醒,做些清理工作
为什么 channel 发送指针数据会引起内存逃逸?
- 指针指向的数据是在堆里的,会引发 GC,而且编译器无法确定,发送的指针数据最终会被哪个 goroutine 接收
# 参考
- 大明的《Go 项目实战》并发编程 章节
- https://gobyexample.com/channels