在构建 gRPC 应用程序时,无论是客户端应用程序,还是服务器端应用程序,在远程方法执行之前或之后,都可能需要执行一些通用逻辑。在gRPC 中,可以拦截 RPC 的执行,来满足特定的需求,如日志、认证、性能度量指标等,这会使用一种名为拦截器的扩展机制。gRPC 提供了简单的 API,用来在客户端和服务器端的 gRPC 应用程序中实现并安装拦截器。它是 gRPC 核心扩展机制之一,在一些使用场景中非常有用,比如日志、身份验证、授权、性能度量指标、跟踪以及其他一些自定义
需求。

支持 gRPC 的所有语言并非都支持拦截器功能,而且每种语言的拦截器实现可能会有所差异。本书只涉及 Go 语言和 Java 语言。

根据所拦截的 RPC 调用的类型,gRPC 拦截器可以分为两类。对于一元RPC,可以使用一元拦截器(unary interceptor);对于流 RPC,则可以使用流拦截器(streaming interceptor)。这些拦截器既可以用在 gRPC服务器端,也可以用在 gRPC 客户端。接下来看如何在服务器端使用拦截器。

5.1.1 服务器端拦截器

当客户端调用 gRPC 服务的远程方法时,通过使用服务器端拦截器,可以在执行远程方法之前,执行一个通用的逻辑。当需要在调用远程方法之前应用认证等特性时,这会非常有帮助。如图 5-1 所示,在所开发的任意 gRPC 服务器端,都可以插入一个或多个拦截器。如果希望向OrderManagement gRPC 服务中插入新服务器端拦截器,则可以实现该拦截器并在创建 gRPC 服务器端时将其注册进来。

图 5-1:服务器端拦截器

在服务器端,一元拦截器拦截一元 RPC,流拦截器则拦截流 RPC。下面来看一下服务器端一元拦截器。

01. 一元拦截器

如果想在服务器端拦截 gRPC 服务的一元 RPC,需要为 gRPC 服务器端实现一元拦截器。如代码清单 5-1 所示,要实现这一点,需要先实现 UnaryServerInterceptor 类型的函数,并在创建 gRPC服务器端时将函数注册进来。UnaryServerInterceptor 是用于服务器端一元拦截器的类型,它具有以下签名:

func(ctx context.Context, req interface{}, info *UnaryServerInfo,handler UnaryHandler) (resp interface{}, err error)

在这个函数中,我们能够完全控制传入 gRPC 服务器端的所有一元RPC。

代码清单 5-1 gRPC 服务器端一元拦截器

// 服务器端一元拦截器
func orderUnaryServerInterceptor(ctx context.Context, req interface{},info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    // 前置处理逻辑
    // 通过检查传入的参数,获取关于当前RPC的信息
    log.Println("======= [Server Interceptor] ", info.FullMethod) ➊
    // 调用handler完成一元RPC的正常执行
    m, err := handler(ctx, req) ➋
    // 后置处理逻辑
    log.Printf(" Post Proc Message : %s", m) ➌
    return m, err ➍
}
// ...
func main() {
    ...
    // 在服务器端注册拦截器
    s := grpc.NewServer(
    grpc.UnaryInterceptor(orderUnaryServerInterceptor)) ➎
    ...
  • ❶ 前置处理阶段:可以在调用对应的 RPC 之前拦截消息。
  • ❷ 通过 UnaryHandler 调用 RPC 方法。
  • ❸ 后置处理阶段:可以在这里处理 RPC 响应。
  • ❹ 将 RPC 响应发送回去。
  • ❺ 使用 gRPC 服务器端注册一元拦截器。

服务器端一元拦截器的实现通常可以分为 3 个部分:前置处理、调用 RPC 方法以及后置处理。顾名思义,前置处理阶段是在调用预期的 RPC 远程方法之前执行。在前置处理阶段,用户可以通过检查传入的参数来获取关于当前 RPC 的信息,比如 RPC 上下文、RPC 请求和服务器端信息。因此,我们甚至可以在预处理阶段修改RPC。

随后,在调用阶段,需要调用 gRPC UnaryHandler 来触发 RPC 方法。在调用 RPC 之后,就进入后置处理阶段。这意味着,RPC 响应要流经后置处理阶段。在这个阶段中,可以按需处理返回的响应和错误。当后置处理阶段完成之后,需要以拦截器函数返回参数的形式将消息和错误返回。如果不需要后置处理器,那么可以直接返回 handler 调用(handler(ctx, req))。接下来看一下流拦截器。

02. 流拦截器

服务器端流拦截器会拦截 gRPC 服务器所处理的所有流 RPC。流拦截器包括前置处理阶段和流操作拦截阶段。

如代码清单 5-2 所示,假设希望拦截对 OrderManagement 服务的流 RPC。StreamServerInterceptor 是服务器端的流拦截器类型。orderServerStreamInterceptor 是具有如下签名的StreamServerInterceptor 类型的拦截器函数:

func(srv interface{}, ss ServerStream, info *StreamServerInfo,handler StreamHandler) error

与一元拦截器类似,在前置处理阶段,可以在流 RPC 进入服务实现之前对其进行拦截。在前置处理阶段之后,则可以调用StreamHandler 来完成远程方法的 RPC 执行,而且通过已实现grpc.ServerStream 接口的包装器流接口,可以拦截流 RPC 的消息。在通过 handler(srv, newWrappedStream(ss)) 方法调用grpc.StreamHandler 时,可以将这个包装器结构传递进来。grpc.ServerStream 的包装器可以拦截 gRPC 服务发送或接收到的数据。它实现了 SendMsg 函数和 RecvMsg 函数,这两个函数分别会在服务发送和接收 RPC 流消息的时候被调用。

代码清单 5-2 gRPC 服务器端流拦截器

// 服务器端流拦截器
// wrappedStream包装嵌入的grpc.ServerStream
// 并拦截对RecvMsg函数和SendMsg函数的调用
type wrappedStream struct { ➊
    grpc.ServerStream
}
➋
func (w *wrappedStream) RecvMsg(m interface{}) error {
    log.Printf("====== [Server Stream Interceptor Wrapper]  Receive a message (Type: %T) at %s",m, time.Now().Format(time.RFC3339))
    return w.ServerStream.RecvMsg(m)
}
➌
func (w *wrappedStream) SendMsg(m interface{}) error {
    log.Printf("====== [Server Stream Interceptor Wrapper] Send a message (Type: %T) at %v",m, time.Now().Format(time.RFC3339))
    return w.ServerStream.SendMsg(m)
}
➍
func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
    return &wrappedStream{s}
}
➎
func orderServerStreamInterceptor(srv interface{},ss grpc.ServerStream, info *grpc.StreamServerInfo,handler grpc.StreamHandler) error {
    log.Println("====== [Server Stream Interceptor] ",info.FullMethod) ➏
    err := handler(srv, newWrappedStream(ss)) ➐
    if err != nil {
        log.Printf("RPC failed with error %v", err)
    }
    return err
}
...
// 注册拦截器
s := grpc.NewServer(
grpc.StreamInterceptor(orderServerStreamInterceptor)) ➑
...
  • ❶ grpc.ServerStream 的包装器流。
  • ❷ 实现包装器的 RecvMsg 函数,来处理流 RPC 所接收到的消息。
  • ❸ 实现包装器的 SendMsg 函数,来处理流 RPC 所发送的消息。
  • ❹ 创建新包装器流的实例。
  • ❺ 流拦截器的实现。
  • ❻ 前置处理阶段。
  • ❼ 使用包装器流调用流 RPC。
  • ❽ 注册拦截器。

下面的 gRPC 服务器端日志输出,可以帮助你理解服务器端流拦截器的行为。根据每条日志的打印顺序,可以看出流拦截器的行为。

这里调用的流远程方法是 searchOrders,它是一个服务器端流RPC:

[Server Stream Interceptor] /ecommerce.OrderManagement/searchOrders
[Server Stream Interceptor Wrapper] Receive a message
Matching Order Found : 102 -> Writing Order to the stream ...
[Server Stream Interceptor Wrapper] Send a message...
Matching Order Found : 104 -> Writing Order to the stream ...
[Server Stream Interceptor Wrapper] Send a message...

客户端拦截器的术语与服务器端拦截器的非常相似,只不过在接口和函数签名方面有细微的差异。下面来看关于客户端拦截器的介绍。

.2 客户端拦截器

当客户端发起 RPC 来触发 gRPC 服务的远程方法时,可以在客户端拦截这些 RPC。如图 5-2 所示,借助客户端拦截器,可以拦截一元 RPC和流 RPC。

图 5-2:客户端拦截器

当需要实现一些特定的可重用特性时,这会非常有用,比如在客户端应
用程序代码之外实现对 gRPC 服务调用的安全保护。

一元拦截器

客户端一元拦截器用于拦截一元 RPC 客户端的调用。UnaryClientInterceptor 是客户端一元拦截器的类型,函数签名如下:

func(ctx context.Context, method string, req, reply interface{},cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

与前面介绍的服务器端一元拦截器一样,客户端一元拦截器也有不同的阶段。代码清单 5-3 展示了客户端一元拦截器的基本 Go 语言实现。在前置处理阶段,可以在调用远程方法之前拦截 RPC。这里可以通过检查传入的参数来访问关于当前 RPC 的信息,比如 RPC的上下文、方法字符串、要发送的请求以及 CallOption 配置。这样一来,我们甚至可以在原始的 RPC 发送至服务器端应用程序之前,对其进行修改。随后,借助 UnaryInvoker 参数,可以调用实际的一元 RPC。在后置处理阶段,可以访问 RPC 的响应结果或错误结果。

代码清单 5-3 gRPC 客户端一元拦截器

func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},cc *grpc.ClientConn,invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    // 前置处理阶段
    log.Println("Method : " + method) ➊
    // 调用远程方法
    err := invoker(ctx, method, req, reply, cc, opts...) ➋
    // 后置处理阶段
    log.Println(reply) ➌
    return err ➍
}
...
func main() {
// 建立到服务器端的连接
conn, err := grpc.Dial(address, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(orderUnaryClientInterceptor)) ➎
...
  • ❶ 前置处理阶段能够在 RPC 请求发送至服务器端之前访问它。
  • ❷ 通过 UnaryInvoker 调用 RPC 方法。
  • ❸ 后置处理阶段,可以在这里处理响应结果或错误结果。
  • ❹ 向 gRPC 客户端应用程序返回错误,同时包含作为参数传递进来的答复。
  • ❺ 通过传入一元拦截器作为 grpc.Dial 的选项,建立到服务器端的连接。

注册拦截器函数通过使用 grpc.WithUnaryInterceptor,来在grpc.Dial 操作中实现。

02. 流拦截器

客户端流拦截器会拦截 gRPC 客户端所处理的所有流 RPC。客户端流拦截器的实现与服务器端流拦截器的实现非常相似。StreamClientInterceptor 是客户端流拦截器的类型,其函数类型签名如下所示:

func(ctx context.Context, desc *StreamDesc, cc *ClientConn,method string, streamer Streamer,opts ...CallOption) (ClientStream, error)

如代码清单 5-4 所示,客户端流拦截器实现包括前置处理和流操作拦截。

代码清单 5-4 gRPC 客户端流拦截器

func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc,cc *grpc.ClientConn, method string,streamer grpc.Streamer, opts ...grpc.CallOption)(grpc.ClientStream, error) {
    log.Println("======= [Client Interceptor] ", method) ➊
    s, err := streamer(ctx, desc, cc, method, opts...) ➋
    if err != nil {
        return nil, err
    }
    return newWrappedStream(s), nil ➌
}
type wrappedStream struct { ➍
    grpc.ClientStream
}
func (w *wrappedStream) RecvMsg(m interface{}) error { ➎
    log.Printf("====== [Client Stream Interceptor] Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
    return w.ClientStream.RecvMsg(m)
}
func (w *wrappedStream) SendMsg(m interface{}) error { ➏
    log.Printf("====== [Client Stream Interceptor]  Send a message (Type: %T) at %v",m, time.Now().Format(time.RFC3339))
    return w.ClientStream.SendMsg(m)
}
func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
    return &wrappedStream{s}
}
...
func main() {
    // 建立到服务器端的连接
    conn, err := grpc.Dial(address, grpc.WithInsecure(),
    grpc.WithStreamInterceptor(clientStreamInterceptor)) ➐
    ...
  • ❶ 前置处理阶段能够在将 RPC 请求发送至服务器端之前访问它。
  • ❷ 调用传入的 streamer 来获取 ClientStream。
  • ❸ 包装 ClientStream,使用拦截逻辑重载其方法并返回给客户端应用程序。
  • ❹ grpc.ClientStream 的包装器流。
  • ❺ 拦截流 RPC 所接收消息的函数。
  • ❻ 拦截流 RPC 所发送消息的函数。
  • ❼ 注册流拦截器。

流操作拦截是通过流的包装器实现完成的,该实现中必须实现包装grpc.ClientStream 的新结构。这里实现了两个包装流的函数,即 RecvMsg 函数和 SendMsg 函数,分别用来拦截客户端接收及发送的流消息。拦截器的注册和一元拦截器是一样的,都是通过grpc.Dial 操作完成的。

接下来看一下在客户端应用程序中调用 gRPC 服务时经常需要的另一项功能,那就是截止时间。

文档更新时间: 2023-09-02 05:38   作者:Minho