在双向流 RPC 模式中,客户端以消息流的形式发送请求到服务器端,服务器端也以消息流的形式进行响应。调用必须由客户端发起,但在此之后,通信完全基于 gRPC 客户端和服务器端的应用程序逻辑。下面通过一个示例来进一步了解双向流 RPC 模式。如图 3-4 所示,在OrderManagement 服务用例中,假设需要一个订单处理功能,通过该功能,用户可以发送连续的订单集合(订单流),并根据投递地址对它们进行组合发货,也就是说,订单要根据投递目的地进行组织和发货。

图 3-4:双向流 RPC 模式

可以看到,这个业务用例的关键步骤如下所示。

  • 客户端应用程序通过建立与服务器端的连接并发送调用元数据(头信息)初始化业务用例。
  • 一旦连接成功建立,客户端应用程序就发送连续的订单 ID 集合,这些订单需要由 OrderManagement 服务进行处理。
  • 每个订单 ID 以独立的 gRPC 消息的形式发送至服务器端。
  • 服务会处理给定订单 ID 所对应的每个订单,并根据订单的投递位置将它们组织到发货组合中。
  • 每个发货组合可能会包含多个订单,它们应该被投递到相同的目的地。
  • 订单是成批处理的。当达到指定的批次大小时,当前创建的所有发货组合都会被发送至客户端。
  • 假设流中有 4 个订单,其中有 2 个订单要发送至位置 X,另外两个要发送至位置 Y,则可以将其表示为 X、Y、X、Y。如果批次大小为 3,那么所创建的订单发货组合会是 [X, X]、[Y] 和 [Y]。这些发货组合也会以流的形式发送至客户端。

这个业务用例的核心理念就是一旦调用 RPC 方法,那么无论是客户端还是服务器端,都可以在任意时间发送消息。这也包括来自任意一端的流结束标记。

下面看一下上述用例的服务定义。如代码清单 3-10 所示,可以定义一个 processOrders 方法,该方法接受一个字符串流作为方法参数,代表了订单流 ID 并且以 CombinedShipment 流作为方法的返回值。因此,通过将方法参数和返回参数均声明为 stream,可以定义双向流的RPC 方法。发货组合的消息也是通过服务定义声明的,它包含了订单元素的列表。

代码清单 3-10 具有双向流 RPC 功能的服务定义

syntax = "proto3";
import "google/protobuf/wrappers.proto";
package ecommerce;
service OrderManagement {
...
rpc processOrders(stream google.protobuf.StringValue)
    returns (stream CombinedShipment); ➊
}
message Order { ➋
    string id = 1;
    repeated string items = 2;
    string description = 3;
    float price = 4;
    string destination = 5;
}
message CombinedShipment { ➌
    string id = 1;
    string status = 2;
    repeated Order ordersList = 3;
}

❶ 在双向流 RPC 模式中,将方法参数和返回参数均声明为 stream。
❷ Order 消息的结构。
❸ CombinedShipment 消息的结构。

接下来,就可以根据更新后的服务定义生成服务器端的代码了。服务应该实现 OrderManagement 服务中的 processOrders 方法。如代码清单 3-11 所示,在 Go 实现中,ProcessOrders 方法有一个OrderManagement_ProcessOrdersServer 参数,它是客户端和服务器端之间消息流的对象引用。借助这个流对象,服务器端可以读取客户端以流的方式发送的消息,也能写入服务器端的流消息并返回给客户端。传入的消息流可以通过该引用对象的 Recv 方法来读取。在ProcessOrders 方法中,服务可在持续读取传入消息流的同时,使用Send 方法将消息写入同一个流中。

为了便于演示,代码清单 3-11 没有展示完整的逻辑。不过,可以通过本书的源代码仓库找到完整的代码示例。

代码清单 3-11 使用 Go 语言编写的 ProcessOrders 方法的OrderManagement 服务实现

func (s *server) ProcessOrders(
stream pb.OrderManagement_ProcessOrdersServer) error {
    ...
    for {
        orderId, err := stream.Recv() ➊
        if err == io.EOF { ➋
            ...
            for _, comb := range combinedShipmentMap {
                stream.Send(&comb) ➌
            }
            return nil ➍
        }
        if err != nil {
            return err
        }
        // 基于目的地位置,
        // 将订单组织到发货组合中的逻辑
        ...
        //
        if batchMarker == orderBatchSize { ➎
            // 将组合后的订单以流的形式分批发送至客户端
            for _, comb := range combinedShipmentMap {
                // 将发货组合发送到客户端
                stream.Send(&comb) ➏
            }
            batchMarker = 0
            combinedShipmentMap = make(
            map[string]pb.CombinedShipment)
        } else {
            batchMarker++
        }
    }
}

❶ 从传入的流中读取订单 ID。
❷ 持续读取,直到流结束为止。
❸ 当流结束时,将所有剩余的发货组合发送给客户端。
❹ 通过返回 nil 标记服务器端流已经结束。
❺ 按批次处理订单。当达到该批次的规模时,将所有已创建的发货组合以流的形式发送给客户端。
❻ 将发货组合写入流中。

这里是基于订单 ID 来处理传入的订单的,当创建新的发货组合后,服务会将其写入相同的流中。这与客户端流 RPC 模式不同,当时服务通过 SendAndClose 方法写入流并将其关闭。当发现客户端流已经结束时,发送 nil 标记服务器端流的结束。

如代码清单 3-12 所示,客户端实现与之前的示例非常相似。当客户端通过 OrderManagement 对象调用 ProcessOrders 方法时,它会得到一个对流的引用(streamProcOrder),这个引用可以用来发送消息到服务器端,也能读取来自服务器端的消息。

代码清单 3-12 使用 Go 语言编写的 ProcessOrders 方法的OrderManagement 客户端实现

// 处理订单
streamProcOrder, _ := c.ProcessOrders(ctx) ➊
if err := streamProcOrder.Send(&wrapper.StringValue{Value:"102"}); err != nil { ➋
    log.Fatalf("%v.Send(%v) = %v", client, "102", err)
}
if err := streamProcOrder.Send(&wrapper.StringValue{Value:"103"}); err != nil {
    log.Fatalf("%v.Send(%v) = %v", client, "103", err)
}
if err := streamProcOrder.Send(&wrapper.StringValue{Value:"104"}); err != nil {
    log.Fatalf("%v.Send(%v) = %v", client, "104", err)
}
channel := make(chan struct{}) ➌
go asncClientBidirectionalRPC(streamProcOrder, channel) ➍
time.Sleep(time.Millisecond * 1000) ➎
if err := streamProcOrder.Send(&wrapper.StringValue{Value:"101"}); err != nil {
    log.Fatalf("%v.Send(%v) = %v", client, "101", err)
}
if err := streamProcOrder.CloseSend(); err != nil { ➏
    log.Fatal(err)
}
<- channel
func asncClientBidirectionalRPC (streamProcOrder pb.OrderManagement_ProcessOrdersClient,c chan struct{}) {
    for {
        combinedShipment, errProcOrder := streamProcOrder.Recv() ➐
        if errProcOrder == io.EOF { ➑
            break
        }
        log.Printf("Combined shipment : ", combinedShipment.OrdersList)
    }
    <-c
}

❶ 调用远程方法并获取流引用,以便在客户端写入和读取。
❷ 向服务发送消息。
❸ 创建 Goroutines 所使用的通道。
❹ 使用 Goroutines 调用函数,以便并行读取来自服务的消息。
❺ 模拟向服务发送消息的延迟。
❻ 为客户端流标记流的结束(订单 ID)。
❼ 在客户端读取服务的消息。
❽ 该条件探测流是否已经结束。

客户端可以在任意时间发送消息给服务并关闭流。读取消息也是同样的道理。前面的示例使用了 Go 语言中的 Goroutines,在两个并发线程中执行客户端的消息写入逻辑和消息服务逻辑。

在 Go 语言中,Goroutines 是能够与其他函数或方法并行运行的函数或方法,可以将它们视为轻量级的线程。

客户端可以并发读取和写入同一个流,输入流和输出流可以独立进行操作。这里所展示的是稍微复杂的示例,它展现了双向流 RPC 模式的威力。流的操作完全独立,客户端和服务器端可以按照任意顺序进行读取和写入,理解这一点非常重要。一旦建立连接,客户端和服务器端之间的通信模式就完全取决于客户端和服务器端本身。

目前本书已经讨论了所有可能的通信模式,可以使用它们实现基于gRPC 的应用程序之间的交互。至于具体选择哪种通信模式,并没有硬性的规定,但是最好的办法就是分析业务用例,并据此选择最合适的模式。

在结束关于 gRPC 通信模式的讨论之前,还有一个重要的方面需要了解,即 gRPC 是如何应用于微服务通信的。

文档更新时间: 2023-09-02 04:44   作者:Minho