在客户端流 RPC 模式中,客户端会发送多个请求给服务器端,而不再是单个请求。服务器端则会发送一个响应给客户端。但是,服务器端不一定要等到从客户端接收到所有消息后才发送响应。基于这样的逻辑,我们可以在接收到流中的一条消息或几条消息之后就发送响应,也可以在读取完流中的所有消息之后再发送响应。
现在进一步扩展 OrderManagement 服务,从而更好地理解客户端流RPC 模式。假设希望在 OrderManagement 服务中添加新的updateOrders 方法,从而更新一个订单集合,如图 3-3 所示。在这里,我们想以消息流的形式发送订单列表到服务器端,服务器端会处理这个流并发送一条带有已更新订单状态的消息给客户端。
然后,可以将 updateOrders 方法添加到 OrderManagement 服务的服务定义文件中,如代码清单 3-7 所示。只需使用 stream Order 作为updateOrders 方法的参数,就能表明 updateOrders 会接收来自客户端的多条消息作为输入。因为服务器端只发送一个响应,所以返回值是单一的字符串消息。
代码清单 3-7 具有客户端流 RPC 功能的服务定义
syntax = "proto3";
import "google/protobuf/wrappers.proto";
package ecommerce;
service OrderManagement {
...
rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
...
}
message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
当更新完服务定义文件之后,就可以生成服务器端和客户端的代码了。
在服务器端,需要实现 OrderManagement 服务中所生成的updateOrders 方法接口。在代码清单 3-8 所示的 Go 实现中,UpdateOrders 方法有一个OrderManagement_UpdateOrdersServer 参数,它是客户端传入消息流的引用对象。因此,可以通过调用该对象的 Recv 方法来读取消息。
根据业务逻辑,可以读取其中一些消息,也可以读取所有的消息。服务只需调用 OrderManagement_UpdateOrdersServer 对象的SendAndClose 方法就可以发送响应,它同时也标记服务器端消息终结了流。如果要提前停止读取客户端流,那么服务器端应该取消客户端流,这样客户端就知道停止生成消息了。
代码清单 3-8 使用 Go 语言编写的 UpdateOrders 方法的OrderManagement 服务实现
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
ordersStr := "Updated Order IDs : "
for {
order, err := stream.Recv() ➊
if err == io.EOF { ➋
// 完成读取订单流
return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed ordersStr"})
}
// 更新订单
orderMap[order.Id] = *order
log.Printf("Order ID ", order.Id, ": Updated")
ordersStr += order.Id + ", "
}
}
❶ 从客户端流中读取消息。
❷ 检查流是否已经结束。
下面来看这个客户端流用例的客户端实现。如代码清单 3-9 中的 Go 实现所示,客户端可以通过客户端流引用,借助 updateStream.Send 方法发送多条消息。一旦所有消息都以流的形式发送出去,客户端就可以将流标记为已完成,并接收来自服务器端的响应。这是通过流引用的CloseAndRecv 方法实现的。
代码清单 3-9 使用 Go 语言编写的 UpdateOrders 方法的OrderManagement 客户端实现
// 建立到服务器端的连接
...
c := pb.NewOrderManagementClient(conn)
...
updateStream, err := client.UpdateOrders(ctx) ➊
if err != nil { ➋
log.Fatalf("%v.UpdateOrders(_) = _, %v", client, err)
}
// 更新订单1
if err := updateStream.Send(&updOrder1); err != nil { ➌
log.Fatalf("%v.Send(%v) = %v",updateStream, updOrder1, err) ➍
}
// 更新订单2
if err := updateStream.Send(&updOrder2); err != nil {
log.Fatalf("%v.Send(%v) = %v",updateStream, updOrder2, err)
}
// 更新订单3
if err := updateStream.Send(&updOrder3); err != nil {
log.Fatalf("%v.Send(%v) = %v",updateStream, updOrder3, err)
}
updateRes, err := updateStream.CloseAndRecv() ➎
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v",updateStream, err, nil)
}
log.Printf("Update Orders Res : %s", updateRes)
❶ 调用 UpdateOrders 远程方法。
❷ 处理与 UpdateOrders 相关的错误。
❸ 通过客户端流发送订单更新的请求。
❹ 处理在发送消息到流时发生的错误。
❺ 关闭流并接收响应。
当调用这个方法后,会收到服务的响应消息。现在,我们对服务器端流RPC 模式和客户端流 RPC 模式都有了非常好的了解。接下来将介绍双向流 RPC 模式,它是前面讨论的不同 RPC 风格的一种组合。