在 golang 的 grpc 里,双向流可用于类似 push 的场景,性能非常不错,但 client 接受消息是阻塞的,如果 client 由于某些原因需要主动断开 grpc 长连接,打破阻塞,则可通过 context 来完成。
client 主动断开双向流
我这里,举个例子,proto 文件如下:
syntax = "proto3";
package hello;
option go_package="./hello";
message HelloReq {
string name = 1;
}
message HelloResp {
string message = 1;
}
service Hello {
rpc sayHello(HelloReq) returns (HelloResp);
rpc sayHelloStream(HelloReq) returns (stream HelloResp);
}
sayHelloStream 为 server 流,对应的 client 如下:
type HelloClient interface {
SayHello(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloResp, error)
SayHelloStream(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (Hello_SayHelloStreamClient, error)
}
这里,ctx 可用于优雅关闭,举个例子:
func sayHelloStream() {
conn, _ := createConn()
if conn == nil {
return
}
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer func() {
log.Default().Println("close conn in defer")
_ = conn.Close()
// cancel 可以多次调用,内部有锁,仅第一次调用有效,后续调用 return
cancel()
}()
helloClient := hello.NewHelloClient(conn)
client, err := helloClient.SayHelloStream(ctx, &hello.HelloReq{Name: "world"})
if err != nil {
log.Default().Println(err.Error())
return
}
go func() {
time.Sleep(5 * time.Second)
log.Default().Println("call cancel...")
// cancel 可以多次调用,内部有锁,仅第一次调用有效,后续调用 return
cancel()
}()
for {
resp, err2 := client.Recv()
if err2 != nil {
if ctx.Err() == nil || !errors.Is(ctx.Err(), context.Canceled) {
log.Default().Println(err2.Error())
}
log.Default().Println("exit receive")
break
}
log.Default().Printf("received %+v", resp)
}
log.Default().Println("exit...")
}
我们开启了一个协程,5 秒后调用了 ctx 的 cancel 方法,此时原本阻塞的 client.Recv() 会返回,且 err2 为 context.Canceled,而 server 端,ctx 的 done 会被触发;
server 感知 client 退出
func (HelloServer) SayHelloStream(req *hello.HelloReq, server hello.Hello_SayHelloStreamServer) error {
data := make(chan *hello.HelloResp, 10)
closed := make(chan struct{}, 1)
go func() {
data <- &hello.HelloResp{Message: "Hello World"}
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
data <- &hello.HelloResp{Message: "Hello World"}
case <-closed:
return
}
}
}()
ctx := server.Context()
defer func() {
closed <- struct{}{}
}()
for {
select {
case <-ctx.Done():
log.Default().Println("done...")
return nil
case resp := <-data:
err := server.Send(resp)
if err != nil {
ctxErr := ctx.Err()
if ctxErr != nil && errors.Is(ctxErr, context.Canceled) {
log.Default().Println("exit stream")
return nil
}
log.Default().Println(err.Error())
return err
}
}
}
}
server 端代码如上,最后的 for 循环监听 ctx.Done 以及 data 两个 Chanel,若 ctx.Done 发生则返回 nil,若有 data 则发送给 client,若发送失败,且 ctx.Err 为 context.Canceled 则返回 nil,否则返回 err,server 在退出时(defer 里面)发送向 closed Chanel 发送了消息,前面的协程原本是用于制造数据,但在收到 closed 消息后也会退出,从而避免该协程泄漏;
比起 client 直接关闭 conn,client 使用 context 的 cancel 方法会更平滑,当然,server 监听 client 的消息,client 调用 SendAndClose 也OK,但这 server 需要额外的协程去处理消息接收,若 server 本身不接收 client 消息,则浪费,若 server 本身也接收 client 消息则OK;
server 如需主动关闭,只需要退出循环即可,比如这里退出 for 循环,并返回 nil 即可;
评论