我的学习日志 我的学习日志
首页
  • Go

    • Go基础知识
  • Python

    • Python进阶
  • 操作系统
  • 计算机网络
  • MySQL
  • 学习笔记
  • 常用到的算法
其他技术
  • 友情链接
  • 收藏
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

xuqil

一介帆夫
首页
  • Go

    • Go基础知识
  • Python

    • Python进阶
  • 操作系统
  • 计算机网络
  • MySQL
  • 学习笔记
  • 常用到的算法
其他技术
  • 友情链接
  • 收藏
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 环境部署

  • 测试

  • 反射

  • 数据库操作

  • 并发编程

    • 解密 go Context 包
    • 深入了解Mutex和RWMutex
    • atomic 的使用
    • sync.Once 的使用
    • 深入理解 sync.Pool
    • 深入理解 sync.WaitGroup
    • 深入理解 channel
      • Channels 的特点
        • 有缓冲和无缓冲的 channel
        • channel 的关闭
        • 关闭后的 channel
        • 值为nil的 channel
      • Channels 的使用
        • 使用方法
        • 使用场景
        • 使用案例
      • Channels 陷阱
        • Goroutine 泄漏
        • 内存逃逸
      • Channels 内部实现
        • chansend
        • chanrecv
      • 总结
      • 参考
  • 内存管理

  • Go 技巧

  • 《go基础知识》
  • 并发编程
Xu Qil
2023-03-04
0
目录

深入理解 channel

# Channels 编程

Do not communicate by sharing memory; instead, share memory by communicating.(不要通过共享内存来通信,而要通过通信来实现内存共享。)

Go 语言的 CSP(Communicating Sequential Process,通信顺序进程)模型是由协程 Goroutine 与通道 Channel 实现。Channels 作为 Goroutine 之间沟通的桥梁,Goroutine 之间可以用过 Channel 通信,或者通过 Channel 控制其他 Goroutine 的工作。

# Channels 的特点

和切片、结构体、map 等一样,channel 也是一种复合数据类型,声明一个 channel 类型变量时,必须给出其具体的元素类型:

var ch chan int // 声明一个只可传输 int 类型数据的 channel
1

如果 channel 类型变量在声明时没有被赋予初值,那么它的默认值为nil。并且,和其他复合数据类型支持使用复合类型字面值作为变量初始值不同,为 channel 类型变量赋初值的唯一方法就是使用make这个 Go 预定义的函数:

ch1 := make(chan T)    
ch2 := make(chan T, 5)
1
2

声明只发送和只接收的 channel:

ch1 := make(chan<- int, 1) // 只发送channel类型
ch2 := make(<-chan int, 1) // 只接收channel类型
1
2

# 有缓冲和无缓冲的 channel

  • 无缓冲:无缓冲是同步的,要求收发两端都必须要有 goroutine,否则就是阻塞。
  • 有缓冲:有缓冲是异步的,没满或者没空之前都不会阻塞。但是满了或者空了就会阻塞,这时候变成了同步。

image-20230304112459911

# channel 的关闭

可以使用close函数关闭 channel:

ch := make(chan int)
close(ch)
1
2

为避免发生 panic,发送端负责关闭 channel。

# 关闭后的 channel

  • 不能再次关闭。重复关闭 channel 会发生 panic。

    ch := make(chan int, 10)
    close(ch)
    close(ch) // panic: close of closed channel
    
    1
    2
    3
  • 不能发送数据。往已经关闭了的 channel 发送数据也会发生 panic。

    ch := make(chan struct{})
    close(ch)
    ch <- struct{}{} // panic: send on closed channel
    
    1
    2
    3
  • 接收的数据返回零值。从已经关闭了接收到的数据为 channel 类型的零值。

    chInt := make(chan int)
    close(chInt)
    fmt.Printf("接收的值: %d\n", <-chInt) // 接收到的值: 0
    
    chStr := make(chan string)
    close(chStr)
    fmt.Printf("接收的值: %q\n", <-chStr) // 接收到的值: ""
    
    1
    2
    3
    4
    5
    6
    7
  • 可以通过comma, ok惯用法或for range语句判断 channel 是否已经关闭。

    • 通过comma, ok判断

      func TestChannelClose_IsClose(t *testing.T) {
         var wg sync.WaitGroup
         ch := make(chan int)
         wg.Add(1)
      
         go func() {
            defer wg.Done()
            if res, ok := <-ch; !ok {
               fmt.Println("ch 已经被关闭了") // ch 已经被关闭了
            } else {
               fmt.Println("收到消息:", res)
            }
         }()
         close(ch)
         wg.Wait()
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
    • 通过fo range判断

      func TestChannelClose_Range(t *testing.T) {
      	var wg sync.WaitGroup
      	ch := make(chan int)
      	wg.Add(1)
      
      	go func() {
      		defer wg.Done()
      		for v := range ch {
      			fmt.Println(v)
      		}
      		// 或者
      		//for range ch {
      		//}
      		fmt.Println("ch 已经被关闭了") // ch 已经被关闭了
      	}()
      	close(ch)
      	wg.Wait()
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
  • close 的 channel 会向所有 goroutine 进行“广播”,即所有监听该 channel 的 goroutine 都能收到信息,不会被阻塞。

# 值为nil的 channel

如果一个 channel 类型变量的值为 nil,我们称它为 nil channel。nil channel 有一个特性,那就是对 nil channel 的读写都会发生阻塞。

# Channels 的使用

# 使用方法

goroutine 之间使用 Channel 进行传递数据:

func TestChannel(t *testing.T) {
	ch := make(chan string)

	go func() {
		res := <-ch
		fmt.Println("收到消息:", res)
		ch <- "world"
	}()

	// 向 channel 发送信息
	ch <- "hello"

	fmt.Println(<-ch)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

goroutine 使用 channel 控制任务流:

func TestChannelControl(t *testing.T) {
	ch := make(chan struct{})

	go func() {
		<-ch // 阻塞在这
		fmt.Println("任务启动")
		fmt.Println("任务结束")

		ch <- struct{}{} // 告知主 goroutine 任务结束
	}()

	time.Sleep(time.Second)
	ch <- struct{}{}

	<-ch
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 使用场景

channel 主要用于通信和控制:

  • 通信:用于 Goroutine 之间传递数据,类似于队列。
  • 控制:利用 Channel 的阻塞特性,简接地控制 goroutine 或者其他资源的消耗。

# 无缓冲的 channel

  • 用作信号传递。无缓冲 channel 用作信号传递的时候,有两种情况,分别是 1 对 1 通知信号和 1 对 n 通知信号。

    • 1 对 1 通知信号:

      func TestChannel_OnceToOnce(t *testing.T) {
      	fmt.Println("start a worker...")
      	c := do(func() {
      		fmt.Println("worker is working...")
      		time.Sleep(time.Second)
      	})
      	<-c
      	fmt.Println("worker work done!")
      }
      
      type signal struct{}
      
      func do(f func()) <-chan signal {
      	c := make(chan signal)
      	go func() {
      		fmt.Println("worker start to work...")
      		f()
      		c <- signal{}
      	}()
      	return c
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
    • 1 对 n 通知信号:

      func TestChannel_OnceToN(t *testing.T) {
      	var workers []worker
      	for i := 0; i < 10; i++ {
      		workers = append(workers, func(i int) {
      			fmt.Printf("worker-%d is working...\n", i)
      			time.Sleep(time.Second)
      		})
      	}
      	startSingle := make(chan signal)
      	done := doMulti(workers, startSingle)
      
      	fmt.Println("start a group of workers...")
      	close(startSingle) // 利用 channel close 后的“广播”机制,通知所有 worker 开始工作
      	<-done             // 任务完成信号
      	fmt.Println("the group of workers work done!")
      }
      
      type signal struct{}
      type worker func(i int)
      
      // doMulti 同时执行多个 worker,startSingle 为任务启动信号
      func doMulti(workers []worker, startSingle <-chan signal) <-chan signal {
      	var wg sync.WaitGroup
      	c := make(chan signal)
      
      	for i, w := range workers {
      		wg.Add(1)
      		go func(f worker, i int) {
      			defer wg.Done()
      			<-startSingle
      			f(i) // 执行任务
      		}(w, i)
      	}
      
      	go func() {
      		wg.Wait()
      		c <- signal{}
      	}()
      
      	return c
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
  • 用于替代锁机制。

# 带缓冲的 channel

  • 用作消息队列。channel 具有 Goroutine 安全、FIFO(first-in,fist-out)等队列的特性。
  • 用作计数信号量。例如使用带缓冲的 channel 控制 goroutine 的最大活动数量。channel 的容量(capacity)代表同时处于活动状态的 goroutine 的最大数量。向带缓冲 channel 的一个发送操作表示获取一个信号量,而从 channel 的一个接收操作则表示释放一个信号量。

# 使用案例

# 任务池

利用 channel 来实现一个任务池。该任务池允许开发者提交任务,并且设定最多多少个 goroutine 同时运行。提交任务的时候,如果所有 goroutine 都在执行任务,即满了,任务池就会缓存这个任务,缓存满了阻塞提交者。

type Task func()

type TaskPool struct {
	tasks chan Task
	close chan struct{}
}

// NewTaskPool 新建一个 *TaskPool
// numG 为 goroutine 活动数,capacity 为任务数
func NewTaskPool(numG, capacity int) *TaskPool {
	tp := &TaskPool{
		tasks: make(chan Task, capacity),
		close: make(chan struct{}),
	}

	for i := 0; i < numG; i++ {
		go func() {
			for {
				select {
				case <-tp.close:
					return
				case task := <-tp.tasks:
					task()
				}
			}
		}()
	}

	return tp
}

// Submit 往 TaskPool 提交一个任务
func (tp *TaskPool) Submit(ctx context.Context, t Task) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case tp.tasks <- t:
	}
	return nil
}

// Close 关闭 TaskPool
func (tp *TaskPool) Close() error {
	close(tp.close)
	return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# 消息队列

利用 channel 实现发布对于模式,发布者不断往 channel 里面发送数据,订阅者从 channel 里面取出数据。进程内的时间驱动可以依托于 channel 来实现。但 channel 有它的缺陷:

  • 没有消费组概念。channel 的同一条数据只能被一个 goroutine 消费,不能被多个 goroutine 同时消费。
  • 无法回退,也无法随机消费(FIFO)。

利用 channel 来实现一个基于内存的消息队列,并且有消费组的概念。这里使用的方案:每个消费者订阅的时候,创建一个子 channel;当队列有数据生产的时候,会遍历每个消费者的 channel,然后向每个 channel 发送数据。

image-20230304112459911

type Msg struct {
   Content string
}

type Broker struct {
   mutex sync.RWMutex
   chans []chan Msg
}

// Send 向每个 channel 发送消息
func (b *Broker) Send(m Msg) error {
   b.mutex.RLock()
   defer b.mutex.RUnlock()
   for _, ch := range b.chans {
      select {
      case ch <- m:
      default:
         return errors.New("消息队列已满")
      }
   }
   return nil
}

// Subscribe 订阅一个 capacity 大小的队列
func (b *Broker) Subscribe(capacity int) (<-chan Msg, error) {
   ch := make(chan Msg, capacity)
   b.mutex.Lock()
   defer b.mutex.Unlock()
   b.chans = append(b.chans, ch)
   return ch, nil
}

// Close 关闭 Broker
func (b *Broker) Close() error {
   b.mutex.Lock()
   chans := b.chans
   b.chans = nil
   b.mutex.Unlock()

   for _, ch := range chans {
      close(ch)
   }
   return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

# Channels 陷阱

# Goroutine 泄漏

如果 channel 使用不当,就会导致 goroutine 泄漏:

  • 只发送不接收,发送者会一直阻塞,导致发送者 goroutine 泄露
  • 只接收不发送,接收者会一直阻塞,会导致接收者 goroutine 泄露

goroutine 泄露的根本原因:goroutine 被阻塞之后没有人唤醒它。

# 内存逃逸

  • 分配到栈上:不需要考虑 GC
  • 分配到堆上:需要考虑 GC

如果使用 channel 发送指针,那么必然会发生内存逃逸。因为编译器无法确定,发送的指针数据最终会被哪个 goroutine 接收。

# Channels 内部实现

channel 的核心要素:

  • 要设计缓冲来存储数据。无缓冲等于缓冲容量为 0
  • 要能阻塞 goroutine,也要能唤醒 goroutine。依赖于 go 的运行时:
    • 发数据唤醒收数据的
    • 收数据的唤醒发数据的
  • 要维持住 goroutine 的等待队列,并且是收和发两个队列

chan 结构体

src/runtime/chan.go

type hchan struct {
   qcount   uint           // total data in the queue
   dataqsiz uint           // size of the circular queue
   buf      unsafe.Pointer // points to an array of dataqsiz elements
   elemsize uint16
   closed   uint32
   elemtype *_type // element type
   sendx    uint   // send index
   recvx    uint   // receive index
   recvq    waitq  // list of recv waiters
   sendq    waitq  // list of send waiters

   // lock protects all fields in hchan, as well as several
   // fields in sudogs blocked on this channel.
   //
   // Do not change another G's status while holding this lock
   // (in particular, do not ready a G), as this can deadlock
   // with stack shrinking.
   lock mutex
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  • qcount:buffer 中已放入的元素个数
  • dataqsiz:用户构造 channel 时指定的 buf 大小
  • buf:bufffer
  • elemsize:buffer 中每个元素的大小
  • closed:channel 是否关闭,== 0 代表未 closed
  • elemtype:channel 元素的类型信息
  • sendx:buffer 中已发送的索引位置
  • recvx:buffer 中已接收的索引位置
  • recvq:等待接收的 goroutine
  • sendq:等待发送的 goroutine

其中,需要额外说明的字段和类型:

  • buf是一个 ring buffer 结构,用于存储数据
  • waitq是一个双向链表,简单来说就是队列

channel 收发数据的处理:

  • 发送的时候,如果缓冲没满,或者有接收者,那就直接发;否则丢进去sendq。
  • 接收的时候,如果缓冲有数据,或者说有发送者,那就收;否则丢进去recvq。

# chansend

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   if c == nil { // 如果 channel 为 nil,就会直接阻塞,而且无法被唤醒
      if !block {
         return false
      }
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }

	// ...

   // 有接收者在等待的话,即直接交给接收者
   if sg := c.recvq.dequeue(); sg != nil {
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }

   // 没有接收者,就准备放到缓存中
   if c.qcount < c.dataqsiz {
      // Space is available in the channel buffer. Enqueue the element to send.
      qp := chanbuf(c, c.sendx)
      if raceenabled {
         racenotify(c, c.sendx, nil)
      }
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }
   // ...
   // Signal to anyone trying to shrink our stack that we're about
   // to park on a channel. The window between when this G's status
   // changes and when we set gp.activeStackChans is not safe for
   // stack shrinking.
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 在此被阻塞住
   // Ensure the value being sent is kept alive until the
   // receiver copies it out. The sudog has a pointer to the
   // stack object, but sudogs aren't considered as roots of the
   // stack tracer.
   KeepAlive(ep) // 确保 ep 不会垃圾回收掉,实际上就是确保发送的数据不会被垃圾回收掉

   // someone woke us up.
   if mysg != gp.waiting { // 被唤醒,这个时候其实已经发完数据了,后面就要做一下清理工作
      throw("G waiting list is corrupted")
   }
   // ...
   return true
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

chansend的步骤:

  1. 看是不是 nil channel,是的话直接阻塞
  2. 看有没有被阻塞的接收者,有的话直接交付数据给接收者,返回
  3. 看看缓冲有没有满,没有满就放缓冲,返回
  4. 阻塞,等待接收者来唤醒自己
  5. 被唤醒,做些清理工作

image-20230304112459911

# chanrecv

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   // raceenabled: don't need to check ep, as it is always on the stack
   // or is new memory allocated by reflect.

   if debugChan {
      print("chanrecv: chan=", c, "\n")
   }

   if c == nil { // 如果 channel 是 nil,直接被阻塞
      if !block {
         return
      }
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }

   //...

   if c.closed != 0 {
      // ...
      // The channel has been closed, but the channel's buffer have data.
   } else {
      // Just found waiting sender with not closed.
      if sg := c.sendq.dequeue(); sg != nil { // 发现有发送者正在发送,直接从发送者里收数据
         // Found a waiting sender. If buffer is size 0, receive value
         // directly from sender. Otherwise, receive from head of queue
         // and add sender's value to the tail of the queue (both map to
         // the same buffer slot because the queue is full).
         recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
         return true, true
      }
   }

   if c.qcount > 0 { // 没有发送者在发送,但是有缓冲,所以冲缓冲里面拿
      // Receive directly from queue
      qp := chanbuf(c, c.recvx)
      if raceenabled {
         racenotify(c, c.recvx, nil)
      }
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
   }

   // ...
   // Signal to anyone trying to shrink our stack that we're about
   // to park on a channel. The window between when this G's status
   // changes and when we set gp.activeStackChans is not safe for
   // stack shrinking.
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 阻塞在这

   // someone woke us up
   if mysg != gp.waiting { // 被唤醒,这个时候已经收到数据了
      throw("G waiting list is corrupted")
   }
   // ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

chanrecv的步骤:

  1. 看是不是 nil channel,是的话直接阻塞
  2. 看有没有被阻塞的发送者,有的话直接从发送者手里拿,返回
  3. 看看缓冲有没有数据,有就读缓冲,返回
  4. 阻塞,等待发送者来唤醒自己
  5. 被唤醒,做些清理工作

image-20230304112459911

# 总结

  • channel 有 buffer 和没有 buffer 有什么特点。

    • 有 buffer 的 channel 是异步的,没有 buffer 的 channel 是同步的。
  • 发送数据给 nil channel 会怎样?发送给已关闭的 channel 会怎样?

    • 发送数据 nil channel 会永远阻塞
    • 发送给已关闭的 channel 会发生 panic
  • 从 nil channel 接收数据会怎样?从已关闭的 channel 接收数据会怎样?

    • 从 nil channel 接收数据会永远阻塞
    • 从已关闭的 channel 接收数据会收到 channel 对应类型的零值
  • channel 是怎么引起 goroutine 泄露的?或者说,goroutine 泄露有什么原因?

    • channel 阻塞且没有其他 goroutine 唤醒会引起 goroutine 泄露
    • goroutine 泄露的根源是 goroutine 阻塞没有或者不能被唤醒
  • channel 发送步骤

    1. 看 channel 是不是 nil,是的话就阻塞
    2. 看有没有在等待接收的 channel,有的话就直接把数据交给它,并唤醒它,返回
    3. 看缓冲有没有满,没有满的话放缓冲里,返回
    4. 如果缓冲已满,就阻塞住,等待被接收者 channel 唤醒
    5. 被唤醒,做些清理工作
  • channel 接收步骤

    • 看 channel 是不是 nil,是的话就阻塞
    • 看有没有在等待发送的 channel,有的话就从它手里直接拿数据,并唤醒它,返回
    • 看缓冲里有没有数据,有的话就从缓冲里拿,返回
    • 缓冲没有数据,就阻塞住,等待发送者 channel 来唤醒
    • 被唤醒,做些清理工作
  • 为什么 channel 发送指针数据会引起内存逃逸?

    • 指针指向的数据是在堆里的,会引发 GC,而且编译器无法确定,发送的指针数据最终会被哪个 goroutine 接收

# 参考

  • 大明的《Go 项目实战》并发编程 章节
  • https://gobyexample.com/channels

评论

深入理解 sync.WaitGroup
Golang 逃逸分析

← 深入理解 sync.WaitGroup Golang 逃逸分析→

最近更新
01
Golang 逃逸分析
03-22
02
深入理解 sync.WaitGroup
03-01
03
深入理解 sync.Pool
02-28
更多文章>
Theme by Vdoing | Copyright © 2018-2023 FeelingLife | 粤ICP备2022093535号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式