RpcThird
yago/base/basethird/rpc.go
是对 grpc.ClientConn
的再次封装,
主要的作用是统一规范第三方 rpc 接口的调用方式,简化业务层的调用,统一记录调用的日志。
1. 如何使用
1.1. 定义第三方调用的 ThirdApi
每个第三方单独定义一个目录(package),统一放置于 app/third
目录下。
本文以 yago 中 example 里面的 homeapi 为例,homeapi 即为一个第三方调用,
它具体的 homeApi 结构体定义于 homeapi/home.go 中。
type homeApi struct {
basethird.RpcThird
}
由于 homeApi 组合了 basethird.RpcThird,即拥有了所有 RpcThird 的调用方法,下文中给出调用样例。
1.2. 配置第三方 rpc 接口
[home_api]
address = "127.0.0.1:50051"
# Host 配置,如果域名已解析, hostname 可以设置为空串
hostname = "localhost"
# 读写超时时间,单位 s
timeout = 10
# 如果 rpc 服务端开启 ssl,客户端需要打开 ssl_on,并指定公钥
# ssl_on = true
# cert_file = "./conf/server.pem"
我们在模版 app.toml 中给出了配置 homeapi 的样例。
1.3. 定义 protobuf 文件
客户端的 protobuf 一般由服务端提供,这里我们把 home.proto 文件放在 homeapi/protobuf/homepb 目录下。
go 中间文件 home.pb.go
的生成参考 Protobuf 规范
1.4. 实现实例化 api 的 Ins 方法
func Ins() *homeApi {
name := "home_api"
v := yago.Component.Ins(name, func() interface{} {
api := new(homeApi)
api.Address = yago.Config.GetString(name + ".address")
api.Hostname= yago.Config.GetString(name + ".hostname")
api.Timeout = yago.Config.GetInt(name + ".timeout")
// api.SslOn = yago.Config.GetBool(name + ".ssl_on")
// api.CertFile = yago.Config.GetString(name + ".cert_file")
return api
})
return v.(*homeApi)
}
可以使用组件的方式来实例化 ThirdApi,grpc
的连接采用的是连接池。
1.5. 实现接口调用的方法
通常第三方的每个接口参数和返回值都不一样,我们需要在 ThirdApi 中为每个接口定义一个方法,下面给出示例。
func (a *homeApi) Hello(name string) () {
req := &pb.HelloRequest{Name: name}
conn, _ := a.GetConn()
ctx, cancel := a.GetCtx()
defer cancel()
c := pb.NewHomeClient(conn)
resp, err := c.Hello(ctx, req)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("ok:", resp.Data)
}
// 采用 stream 流的方式
func (a *homeApi) HelloStream() {
req := &pb.HelloRequest{Name: name}
conn, _ := a.GetConn()
ctx, cancel := a.GetCtx()
defer cancel()
c := pb.NewHomeClient(conn)
stream, err := c.HelloStream(ctx, req)
if err != nil {
fmt.Println(err.Error())
return
}
for {
reply, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
fmt.Println(err)
break
}
fmt.Println(reply)
}
}
1.6. 调用 api
homeapi.Ins().Hello()
homeapi.Ins().HelloStream()
1.7. grpc-Interceptor
RpcThird 使用了 grpc-Interceptor, 默认提供一个日志 interceptor,并对外提供了添加自定义 interceptor 的函数。 interceptor 支持添加多个,执行顺序为添加的顺序,默认的日志 interceptor 总是在最后执行。
- UnaryClientInterceptor
func (a *homeApi) RpcHello() {
a.AddUnaryClientInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
fmt.Println("method:", method, "client before")
err := invoker(ctx, method, req, reply, cc, opts...)
fmt.Println("method:", method, "client after")
return err
})
// ...
}
- StreamClientInterceptor
func (a *homeApi) RpcHelloStream() {
a.AddStreamClientInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (stream grpc.ClientStream, e error) {
fmt.Println("method:", method, "stream before")
clientStream, err := streamer(ctx, desc, cc, method, opts...)
return clientStream, err
})
// ...
}
- 关闭默认的日志 interceptor
// 关闭 unary
a.DisableDefaultUnaryInterceptor()
// 关闭 stream
a.DisableDefaultStreamInterceptor()