在一元 RPC 模式中,gRPC 服务器端和 gRPC 客户端在通信时始终只有一个请求和一个响应。在服务器端流 RPC 模式中,服务器端在接收到客户端的请求消息后,会发回一个响应的序列。这种多个响应所组成的序列也被称为“流”。在将所有的服务器端响应发送完毕之后,服务器端会以 trailer 元数据的形式将其状态发送给客户端,从而标记流的结束。
下面通过一个真实的用例来进一步了解服务器端流。在OrderManagement 服务中,假设需要实现一个订单搜索功能,利用该功能,只要提供一个搜索词就能得到匹配的结果,如图 3-2 所示。OrderManagement 服务不会将所有匹配的订单一次性地发送给客户端,而是在找到匹配的订单时,逐步将其发送出去。这意味着当订单服务的客户端发出一个请求之后,会接收到多条响应消息。
现在,在 OrderManagement 服务的 gRPC 服务定义中新增searchOrders 方法。如代码清单 3-4 所示,searchOrders 方法定义与代码清单 3-1 中的 getOrder 方法非常类似,但是在服务定义的 proto文件中,我们通过使用 returns (stream Order) 将返回参数指定为订单的流。
代码清单 3-4 使用服务器端流 RPC 模式的服务定义
syntax = "proto3";
import "google/protobuf/wrappers.proto";
package ecommerce;
service OrderManagement {
...
rpc searchOrders(google.protobuf.StringValue) returns (stream Order); ➊
...
}
message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
➊ 通过返回 Order 消息的 stream 定义服务器端流。
通过服务定义,可以生成服务器端的代码,然后通过实现所生成的接口,就可以为 OrderManagement
服务的 searchOrders
方法构建逻辑了。在代码清单 3-5 所示的 Go 实现中,SearchOrders
方法有两个参数,分别是字符串类型的 searchQuery
和用来写入响应的特殊参数OrderManagement_SearchOrdersServer
。OrderManagement_SearchOrdersServer
是流的引用对象,可以写入多个响应。这里的业务逻辑是找到匹配的订单,并通过流将其依次发送出去。当找到新的订单时,使用流引用对象的 Send(...)
方法将其写入流。一旦所有响应都写到了流中,就可以通过返回 nil
来标记流已经结束,服务器端的状态和其他 trailer
元数据
会发送给客户端。
代码清单 3-5 使用 Go 语言编写的 SearchOrders
方法的OrderManagement
服务实现
func (s *server) SearchOrders(searchQuery *wrappers.StringValue, stream pb.OrderManagement_SearchOrdersServer) error {
for key, order := range orderMap {
log.Print(key, order)
for _, itemStr := range order.Items {
log.Print(itemStr)
if strings.Contains(itemStr, searchQuery.Value) { ➊
// 在流中发送匹配的订单
err := stream.Send(&order) ➋
if err != nil {
return fmt.Errorf("error sending message to stream : %v",err) ➌
}
log.Print("Matching Order Found : " + key)
break
}
}
}
return nil
}
❶ 查找匹配的订单。
❷ 通过流发送匹配的订单。
❸ 检查在将消息以流的形式发送给客户端的过程中可能出现的错误。
客户端的远程方法调用和一元 RPC 模式中的非常类似。但是,因为服务器端往流中写入了多个响应,所以这里必须处理多个响应。因此,我们在 gRPC 客户端的 Go 语言实现中使用 Recv 方法从客户端流中检索消息,并且持续检索,直到流结束为止,如代码清单 3-6 所示。
代码清单 3-6 使用 Go 语言编写的 SearchOrders 方法的OrderManagement 客户端实现
// 建立到服务器端的连接
...
c := pb.NewOrderManagementClient(conn)
...
searchStream, _ := c.SearchOrders(ctx,&wrapper.StringValue{Value: "Google"}) ➊
for {
searchOrder, err := searchStream.Recv() ➋
if err == io.EOF { ➌
break
}
// 处理可能出现的错误
log.Print("Search Result : ", searchOrder)
}
❶ SearchOrders 方法返回 OrderManagement_SearchOrdersClient的客户端流,它有一个名为 Recv 的方法。
❷ 调用客户端流的 Recv 方法,逐个检索 Order 响应。
❸ 当发现流结束的时候,Recv 会返回 io.EOF。
下面看一下客户端流 RPC 模式,它恰好与服务器端流 RPC 模式相反。