golang 里面有个 chan,可以理解为本地的消息队列,遵循先进先出,一般用于多个协程间通信
使用场景 - 异步计算
举个例子,有个接口,它接收 client 的请求,然后计算,最后存储,其中计算耗时很久,此时有两个办法:
- 使用协程去计算并存储,但这样协程数量不可控;
- 发送到 chan,然后由处理协程去处理,类似削峰;比如上述代码,PublicApi 只负责接收入参,然后将消息发送到 messageChan,initMsgHandler 创建了 16 个协程用于处于消息;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23type message struct {
first int
second int
result int
}
var messageChan = make(chan message, 10240)
// initMsgHandler 初始化消息处理协程
func initMsgHandler() {
for i := 0; i < 16; i++ {
go func() {
for s := range messageChan {
s.result = s.first + s.second
// do other job
}
}()
}
}
func PublicApi(first int, second int) {
messageChan <- message{first: first, second: second}
}
这里,使用 16 个协程未必适合,比如没消息时,也有 16 个协程,消息很多时,16 个协程又不够用,此时还可以通过 chan 解决;
使用场景 - 并发控制
这里说的并发控制包括两个:
- 并发的总协程数量的控制;
- 等待所有协程执行完毕;这里,goRoutinesChan 是一个带缓冲区的 chan,大小 1024,即最多可以丢进 1024 个消息而不用取出,再直白点,可以理解为并发数最大为 1024,每次创建协程时,先向 goRoutinesChan 发送一个空的 struct,空 struct golang 编译时会代码优化,比 int 或 bool 更轻量级,新建的协程在退出前从 goRoutinesChan 取出一个消息,如果并发大于 1024,则 goRoutinesChan 满了,发送时会阻塞,当有协程执行完毕,从 goRoutinesChan 取出一个消息时,阻塞又会恢复,发送成功,故而可用于控制最大并发数量,若没达到最大并发数,则新建协程,如果达到,则阻塞等待;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23type message struct {
first int
second int
result int
}
// goRoutinesChan 控制并发数量
var goRoutinesChan = make(chan struct{}, 1024)
// 消息队列
var msgChan = make(chan message, 10240)
func initMsgHandler() {
for s := range msgChan {
goRoutinesChan <- struct{}{}
go func() {
defer func() {
<-goRoutinesChan
}()
s.result = s.first + s.second
}()
}
}
等待所有协程执行完毕也是这个原理,协程执行完毕后,向 chan 发送消息,然后循环取出消息,一般用于单测,前提是协程数量已知,举个例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18func TestChan(t *testing.T) {
log.Default().Println("begin test")
const size = 1024
ch := make(chan struct{}, size)
for i := 0; i < size; i++ {
go func() {
defer func() {
ch <- struct{}{}
}()
time.Sleep(5 * time.Second)
}()
}
for i := 0; i < size; i++ {
<-ch
}
log.Default().Println("end test")
}
使用姿势 - 非阻塞读写
chan 本身可设置缓冲区大小,当缓冲区满了时,写操作会被阻塞,当为空时,读操作会被阻塞,那么有办法非阻塞读写吗?答案是有:
1 | // PutToChan 如果 chan 没满写到 chan 并返回 true,如果满了返回 false |
select 类似 switch,但每个 case 都是对 chan 的读或写,若成功,则执行对应代码,若都失败,则执行 default,一般用于非阻塞的读写 chan,也可用于超时控制,比如有多个条件,只要其中之一触发,就超时退出的情况;
再举个例子,比如清空 chan
1 | func ClearChan[T any](ch chan T) { |
使用姿势 - for-range
如果需要在 chan close 后,退出读取消息,可采用 for-range,举个例子:
1 | func TestChan2(t *testing.T) { |