golang 里面有个 chan,可以理解为本地的消息队列,遵循先进先出,一般用于多个协程间通信

使用场景 - 异步计算

举个例子,有个接口,它接收 client 的请求,然后计算,最后存储,其中计算耗时很久,此时有两个办法:

  1. 使用协程去计算并存储,但这样协程数量不可控;
  2. 发送到 chan,然后由处理协程去处理,类似削峰;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    type message struct {
    first int
    second int
    result int
    }

    var messageChan = make(chan message, 10240)

    // initMsgHandler 初始化消息处理协程
    func initMsgHandler() {
    for i := 0; i < 16; i++ {
    go func() {
    for s := range messageChan {
    s.result = s.first + s.second
    // do other job
    }
    }()
    }
    }

    func PublicApi(first int, second int) {
    messageChan <- message{first: first, second: second}
    }
    比如上述代码,PublicApi 只负责接收入参,然后将消息发送到 messageChan,initMsgHandler 创建了 16 个协程用于处于消息;
    这里,使用 16 个协程未必适合,比如没消息时,也有 16 个协程,消息很多时,16 个协程又不够用,此时还可以通过 chan 解决;

使用场景 - 并发控制

这里说的并发控制包括两个:

  1. 并发的总协程数量的控制;
  2. 等待所有协程执行完毕;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    type message struct {
    first int
    second int
    result int
    }

    // goRoutinesChan 控制并发数量
    var goRoutinesChan = make(chan struct{}, 1024)

    // 消息队列
    var msgChan = make(chan message, 10240)

    func initMsgHandler() {
    for s := range msgChan {
    goRoutinesChan <- struct{}{}
    go func() {
    defer func() {
    <-goRoutinesChan
    }()
    s.result = s.first + s.second
    }()
    }
    }
    这里,goRoutinesChan 是一个带缓冲区的 chan,大小 1024,即最多可以丢进 1024 个消息而不用取出,再直白点,可以理解为并发数最大为 1024,每次创建协程时,先向 goRoutinesChan 发送一个空的 struct,空 struct golang 编译时会代码优化,比 int 或 bool 更轻量级,新建的协程在退出前从 goRoutinesChan 取出一个消息,如果并发大于 1024,则 goRoutinesChan 满了,发送时会阻塞,当有协程执行完毕,从 goRoutinesChan 取出一个消息时,阻塞又会恢复,发送成功,故而可用于控制最大并发数量,若没达到最大并发数,则新建协程,如果达到,则阻塞等待;
    等待所有协程执行完毕也是这个原理,协程执行完毕后,向 chan 发送消息,然后循环取出消息,一般用于单测,前提是协程数量已知,举个例子:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    func TestChan(t *testing.T) {
    log.Default().Println("begin test")
    const size = 1024
    ch := make(chan struct{}, size)
    for i := 0; i < size; i++ {
    go func() {
    defer func() {
    ch <- struct{}{}
    }()
    time.Sleep(5 * time.Second)
    }()
    }

    for i := 0; i < size; i++ {
    <-ch
    }
    log.Default().Println("end test")
    }

使用姿势 - 非阻塞读写

chan 本身可设置缓冲区大小,当缓冲区满了时,写操作会被阻塞,当为空时,读操作会被阻塞,那么有办法非阻塞读写吗?答案是有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// PutToChan 如果 chan 没满写到 chan 并返回 true,如果满了返回 false
// chan 不依赖通过 close 来释放内存,只要没有 goroutine 持有该 chan,他就能自动释放
func PutToChan[T any](ch chan T, value T) bool {
defer func() {
if r := recover(); r != nil {
loggers.Default.Error("put-to-chan-panic", zap.Any("error", r))
}
}()

select {
case ch <- value:
return true
default:
return false
}
}

select 类似 switch,但每个 case 都是对 chan 的读或写,若成功,则执行对应代码,若都失败,则执行 default,一般用于非阻塞的读写 chan,也可用于超时控制,比如有多个条件,只要其中之一触发,就超时退出的情况;
再举个例子,比如清空 chan

1
2
3
4
5
6
7
8
9
func ClearChan[T any](ch chan T) {
for {
select {
case <-ch:
default:
return
}
}
}

使用姿势 - for-range

如果需要在 chan close 后,退出读取消息,可采用 for-range,举个例子:

1
2
3
4
5
6
7
8
9
10
func TestChan2(t *testing.T) {
chans := make(chan struct{}, 2)
go func() {
chans <- struct{}{}
chans <- struct{}{}
close(chans)
}()
for range chans {
}
}