在 golang 的 grpc 里,双向流可用于类似 push 的场景,性能非常不错,但 client 接受消息是阻塞的,如果 client 由于某些原因需要主动断开 grpc 长连接,打破阻塞,则可通过 context 来完成。

client 主动断开双向流

我这里,举个例子,proto 文件如下:

syntax = "proto3";

package hello;
option go_package="./hello";

message HelloReq {
  string name = 1;
}

message HelloResp {
  string message = 1;
}

service Hello {
  rpc sayHello(HelloReq) returns (HelloResp);
  rpc sayHelloStream(HelloReq) returns (stream HelloResp);
}

sayHelloStream 为 server 流,对应的 client 如下:

type HelloClient interface {
	SayHello(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloResp, error)
	SayHelloStream(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (Hello_SayHelloStreamClient, error)
}

这里,ctx 可用于优雅关闭,举个例子:

func sayHelloStream() {
	conn, _ := createConn()
	if conn == nil {
		return
	}

	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)

	defer func() {
		log.Default().Println("close conn in defer")
		_ = conn.Close()
		// cancel 可以多次调用,内部有锁,仅第一次调用有效,后续调用 return
		cancel()
	}()

	helloClient := hello.NewHelloClient(conn)
	client, err := helloClient.SayHelloStream(ctx, &hello.HelloReq{Name: "world"})
	if err != nil {
		log.Default().Println(err.Error())
		return
	}

	go func() {
		time.Sleep(5 * time.Second)
		log.Default().Println("call cancel...")
		// cancel 可以多次调用,内部有锁,仅第一次调用有效,后续调用 return
		cancel()
	}()

	for {
		resp, err2 := client.Recv()
		if err2 != nil {
			if ctx.Err() == nil || !errors.Is(ctx.Err(), context.Canceled) {
				log.Default().Println(err2.Error())
			}
			log.Default().Println("exit receive")
			break
		}
		log.Default().Printf("received %+v", resp)
	}
	log.Default().Println("exit...")
}

我们开启了一个协程,5 秒后调用了 ctx 的 cancel 方法,此时原本阻塞的 client.Recv() 会返回,且 err2 为 context.Canceled,而 server 端,ctx 的 done 会被触发;

server 感知 client 退出

func (HelloServer) SayHelloStream(req *hello.HelloReq, server hello.Hello_SayHelloStreamServer) error {
	data := make(chan *hello.HelloResp, 10)
	closed := make(chan struct{}, 1)

	go func() {
		data <- &hello.HelloResp{Message: "Hello World"}
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				data <- &hello.HelloResp{Message: "Hello World"}
			case <-closed:
				return
			}
		}
	}()

	ctx := server.Context()

	defer func() {
		closed <- struct{}{}
	}()

	for {
		select {
		case <-ctx.Done():
			log.Default().Println("done...")
			return nil
		case resp := <-data:
			err := server.Send(resp)
			if err != nil {
				ctxErr := ctx.Err()
				if ctxErr != nil && errors.Is(ctxErr, context.Canceled) {
					log.Default().Println("exit stream")
					return nil
				}
				log.Default().Println(err.Error())
				return err
			}
		}
	}
}

server 端代码如上,最后的 for 循环监听 ctx.Done 以及 data 两个 Chanel,若 ctx.Done 发生则返回 nil,若有 data 则发送给 client,若发送失败,且 ctx.Err 为 context.Canceled 则返回 nil,否则返回 err,server 在退出时(defer 里面)发送向 closed Chanel 发送了消息,前面的协程原本是用于制造数据,但在收到 closed 消息后也会退出,从而避免该协程泄漏;

比起 client 直接关闭 conn,client 使用 context 的 cancel 方法会更平滑,当然,server 监听 client 的消息,client 调用 SendAndClose 也OK,但这 server 需要额外的协程去处理消息接收,若 server 本身不接收 client 消息,则浪费,若 server 本身也接收 client 消息则OK;

server 如需主动关闭,只需要退出循环即可,比如这里退出 for 循环,并返回 nil 即可;