gRPC-go 连接管理
(金庆的专栏 2017.12)
把 example greeter 改一下,处理 SayHello() 请求时,不仅仅返回本次请求者的名字,
还返回上次请求的名字,如:
```
λ go run greeter_client/main.go
2017/12/25 17:59:13 Greeting: Hello 'world' (prev '')
2017/12/25 17:59:15 Greeting: Hello 'world2' (prev 'world')
```
先将客户端单次请求改为多次请求:
```go
r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: name})
log.Printf("Greeting: %s", r.Message)
time.Sleep(2 * time.Second)
r, err = c.SayHello(context.Background(), &pb.HelloRequest{Name: name + "2"})
log.Printf("Greeting: %s", r.Message)
...
```
服务器需要为每个连接保存各自的数据。连接创建时初始化数据,连接断开时清理数据。
这里利用了连接统计的接口,不知道是否是最适当的实现方式?
服务器创建时添加 StatsHandler 选项,输入一个 stats.Handler 的实现。
```
- s := grpc.NewServer()
+ s := grpc.NewServer(grpc.StatsHandler(&statshandler{}))
```
statshandler 需实现4个方法,只用到2个连接相关的方法,TagConn() 和 HandleConn(),
另外2个 TagRPC() 和 HandleRPC() 用于RPC统计, 实现为空。
```go
type statshandler struct{}
// TagConn 用来给连接打个标签,以此来标识连接(实在是找不出还有什么办法来标识连接).
// 这个标签是个指针,可保证每个连接唯一。
// 将该指针添加到上下文中去,键为 connCtxKey{}.
func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return context.WithValue(ctx, connCtxKey{}, info)
}
// TagRPC 为空.
func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return ctx
}
// HandleConn 会在连接开始和结束时被调用,分别会输入不同的状态.
func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) {
tag, ok := getConnTagFromContext(ctx)
if !ok {
log.Fatal("can not get conn tag")
}
connsMutex.Lock()
defer connsMutex.Unlock()
switch s.(type) {
case *stats.ConnBegin:
conns[tag] = ""
log.Printf("begin conn, tag = (%p)%#v, now connections = %d\n", tag, tag, len(conns))
case *stats.ConnEnd:
delete(conns, tag)
log.Printf("end conn, tag = (%p)%#v, now connections = %d\n", tag, tag, len(conns))
default:
log.Printf("illegal ConnStats type\n")
}
}
// HandleRPC 为空.
func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
}
```
用一个map来管理所有连接,以连接的标签(是个指针)为键,值为上次请求者的名字。
因为有多线程访问,所有加个 Mutex 来保护。
连接结束时,将从 conns 中删除连接相关的数据。
```go
var connsMutex sync.Mutex
var conns map[*stats.ConnTagInfo]string = make(map[*stats.ConnTagInfo]string)
```
getConnTagFromContext() 从上下文中取连接标签:
```go
type connCtxKey struct{}
func getConnTagFromContext(ctx context.Context) (*stats.ConnTagInfo, bool) {
tag, ok := ctx.Value(connCtxKey{}).(*stats.ConnTagInfo)
return tag, ok
}
```
最后将 SayHello() 改为记录请求者名字,并返回上次请求者的名字。
```go
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
tag, _ := getConnTagFromContext(ctx)
log.Printf("SayHello(), conn tag = (%p)%#v\n", tag, tag)
connsMutex.Lock()
defer connsMutex.Unlock()
prev := conns[tag]
conns[tag] = in.Name
return &pb.HelloReply{Message: fmt.Sprintf("Hello '%s' (prev '%s')", in.Name, prev)}, nil
}
```
测试多个客户端连接,可以看到每个客户端有自己的状态,互不影响。
```
E:\Git\grpc-go\examples\helloworld (master)
λ go run greeter_server/main.go
2017/12/25 18:39:03 start
2017/12/25 18:39:11 begin conn, tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}, now connections = 1
2017/12/25 18:39:11 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}
2017/12/25 18:39:13 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}
2017/12/25 18:39:13 begin conn, tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}, now connections = 2
2017/12/25 18:39:13 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}
2017/12/25 18:39:15 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}
2017/12/25 18:39:15 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}
2017/12/25 18:39:17 SayHello(), conn tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}
2017/12/25 18:39:17 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}
2017/12/25 18:39:19 end conn, tag = (0xc042182040)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0420818f0), LocalAddr:(*net.TCPAddr)(0xc0420818c0)}, now connections = 1
2017/12/25 18:39:19 SayHello(), conn tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}
2017/12/25 18:39:21 end conn, tag = (0xc0421ae200)&stats.ConnTagInfo{RemoteAddr:(*net.TCPAddr)(0xc0421de060), LocalAddr:(*net.TCPAddr)(0xc0421de030)}, now connections = 0
```