• Grpc 通信模式


    通讯模式简介

    gRPC应用程序提供四种基础的通讯模式,这四种通信模式基本能满足绝大多数应用场景。得益于底层HTTP2长连接以及消息推送的机制,gRPC也把推送的能力进行抽象形成开发可调用的API。

    在这里插入图片描述

    如上图,四种通信模式为:

    • Unary RPC - 也叫做 Simple RPC 简单的请求-响应,一问一答式的RPC请求,类似本地方法调用
    • Server-side streaming RPC - 服务端流RPC模式,服务端在接收到客户请求后 主动推送数据。应用场景:APP消息推送、股票动态数据
    • Client-side streaming RPC - 客户端流RPC模式,在该模式下客户端会发送多次请求,服务端则会发送最终响应给客户端。需要注意的是服务端并不一定要等到从客户端接收所有消息后,才发送响应。基于此,可以在接收到流中的一条或者几条消息之后就发送响应。也可以在等到所有消息后再发送响应。
    • Bidirectional streaming RPC - 全双工流RPC模式,即数据在客户端、服务端双向流动。但是发起方必须是客户端。感觉可以使用这种方式进行IM聊天,哈哈。

    Proto 文件定义

    下面从完整的demo、不同模式的语法上展开讲一讲每一种模式实现的细节,以及底层的协议。创建route_guide.proto文件,内容如下:

    // protobuf 语法
    syntax = "proto3";
    
    option go_package = "google.golang.org/grpc/examples/route_guide/routeguide";
    // 我们使用golang做测试 以下代码不用考虑 
    option java_multiple_files = true;
    option java_package = "io.grpc.examples.routeguide";
    option java_outer_classname = "RouteGuideProto";
    
    package routeguide;
    
    // Interface exported by the server.
    service RouteGuide {
      // A simple RPC. 
      rpc GetFeature(Point) returns (Feature) {}
    
      // A server-to-client streaming RPC.
      // 跟 Simple RPC 相比,返回类型从对象 变成了 stream 
      rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
      // A client-to-server streaming RPC.
      // 由于客户端发送多次数据 因此 参数前面加上 stream 关键字
      rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
      // A Bidirectional streaming RPC.
      // 双向流模式 参数、返回 前面都加上 stream 关键字
      rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    }
    
    // 以下定义 RPC service 需要的对象 不做过多讲解
    // Points are represented as latitude-longitude pairs in the E7 representation
    // (degrees multiplied by 10**7 and rounded to the nearest integer).
    // Latitudes should be in the range +/- 90 degrees and longitude should be in
    // the range +/- 180 degrees (inclusive).
    message Point {
      int32 latitude = 1;
      int32 longitude = 2;
    }
    
    // A latitude-longitude rectangle, represented as two diagonally opposite
    // points "lo" and "hi".
    message Rectangle {
      // One corner of the rectangle.
      Point lo = 1;
    
      // The other corner of the rectangle.
      Point hi = 2;
    }
    
    // A feature names something at a given point.
    //
    // If a feature could not be named, the name is empty.
    message Feature {
      // The name of the feature.
      string name = 1;
    
      // The point where the feature is detected.
      Point location = 2;
    }
    
    // A RouteNote is a message sent while at a given point.
    message RouteNote {
      // The location from which the message is sent.
      Point location = 1;
    
      // The message to be sent.
      string message = 2;
    }
    
    // A RouteSummary is received in response to a RecordRoute rpc.
    //
    // It contains the number of individual points received, the number of
    // detected features, and the total distance covered as the cumulative sum of
    // the distance between each point.
    message RouteSummary {
      // The number of points received.
      int32 point_count = 1;
    
      // The number of known features passed while traversing the route.
      int32 feature_count = 2;
    
      // The distance covered in metres.
      int32 distance = 3;
    
      // The duration of the traversal in seconds.
      int32 elapsed_time = 4;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    自动生成代码

    protoc --go_out=. --go_opt=paths=source_relative \
        --go-grpc_out=. --go-grpc_opt=paths=source_relative \
        route_guide.proto
    
    • 1
    • 2
    • 3

    执行以上指令会在相应的目录下生成两个文件

    • route_guide.pb.go - 包含所有proto文件中message定义的数据对象,以及自动生成的对用编码、解码方法
    • route_guide_grpc.pb.go - 包含两部分内容
      • 在proto文件内部定义,需要被客户端实现的接口- RouteGuideClient
      • 在proto文件内部定义,需要被服务端实现的接口- RouteGuideServer

    完整代码

    鉴于代码篇幅过大,文章中完整的代码在 通信模式完整代码

    #拉取代码
    git clone https://github.com/grpc/grpc-go.git
    
    #下载依赖
    cd grpc-go
    go mod tidy
    
    #代码目录
    cd examples/route_guide
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    下面按照不同的通信模式,讲解一下其中核心的代码逻辑

    Simple(Unary) RPC

    服务端

    // GetFeature returns the feature at the given point.
    func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
    	for _, feature := range s.savedFeatures {
    		if proto.Equal(feature.Location, point) {
    			return feature, nil
    		}
    	}
    	// No feature was found, return an unnamed feature
    	return &pb.Feature{Location: point}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    客户端

    // printFeature gets the feature for the given point.
    func printFeature(client pb.RouteGuideClient, point *pb.Point) {
    	log.Printf("Getting feature for point (%d, %d)", point.Latitude, point.Longitude)
    	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    	defer cancel()
    	feature, err := client.GetFeature(ctx, point)
    	if err != nil {
    		log.Fatalf("client.GetFeature failed: %v", err)
    	}
    	log.Println(feature)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    Server-side streaming RPC

    服务端

    func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
    	for _, feature := range s.savedFeatures {
    		if inRange(feature.Location, rect) {
    			if err := stream.Send(feature); err != nil {
    				return err
    			}
    		}
    	}
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    客户端

    func printFeatures(client pb.RouteGuideClient, rect *pb.Rectangle) {
    	log.Printf("Looking for features within %v", rect)
    	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    	defer cancel()
    	stream, err := client.ListFeatures(ctx, rect)
    	if err != nil {
    		log.Fatalf("client.ListFeatures failed: %v", err)
    	}
    	for {
    		feature, err := stream.Recv()
    		if err == io.EOF {
    			break
    		}
    		if err != nil {
    			log.Fatalf("client.ListFeatures failed: %v", err)
    		}
    		log.Printf("Feature: name: %q, point:(%v, %v)", feature.GetName(),
    			feature.GetLocation().GetLatitude(), feature.GetLocation().GetLongitude())
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    协议分析

    同样屏蔽其它模式的调用代码,先后启动服务端、客户端程序,控制台输出结果请自行测试,就不一一展示了。这里需要说明的,当服务端通过多个流会送结果时,在协议上是如何进行数据通信的。

    在这里插入图片描述

    如上图,服务端在最后一个流输出时,会将该stream标记为 END 状态,从而结束数据传递。后续无论是客户端流RPC模式、全双工流RPC模式都是基于这种思路,将最后一个stream 标记为END状态,终止数据传输

    Client-side streaming RPC

    服务端

    func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
    	var pointCount, featureCount, distance int32
    	var lastPoint *pb.Point
    	startTime := time.Now()
    	for {
    		// 流结束 发送流关闭的请求
    		point, err := stream.Recv()
    		if err == io.EOF {
    			endTime := time.Now()
    			return stream.SendAndClose(&pb.RouteSummary{
    				PointCount:   pointCount,
    				FeatureCount: featureCount,
    				Distance:     distance,
    				ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
    			})
    		}
    		// 数据计算
    		if err != nil {
    			return err
    		}
    		pointCount++
    		for _, feature := range s.savedFeatures {
    			if proto.Equal(feature.Location, point) {
    				featureCount++
    			}
    		}
    		if lastPoint != nil {
    			distance += calcDistance(lastPoint, point)
    		}
    		lastPoint = point
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    客户端

    func runRecordRoute(client pb.RouteGuideClient) {
    	// Create a random number of random points
    	r := rand.New(rand.NewSource(time.Now().UnixNano()))
    	pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
    	var points []*pb.Point
    	for i := 0; i < pointCount; i++ {
    		points = append(points, randomPoint(r))
    	}
    	log.Printf("Traversing %d points.", len(points))
    	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    	defer cancel()
    	stream, err := client.RecordRoute(ctx)
    	if err != nil {
    		log.Fatalf("client.RecordRoute failed: %v", err)
    	}
    	//多次发送
    	for _, point := range points {
    		if err := stream.Send(point); err != nil {
    			log.Fatalf("client.RecordRoute: stream.Send(%v) failed: %v", point, err)
    		}
    	}
    	reply, err := stream.CloseAndRecv()
    	if err != nil {
    		log.Fatalf("client.RecordRoute failed: %v", err)
    	}
    	log.Printf("Route summary: %v", reply)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    Bidirectional streaming RPC

    服务端

    func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
    	for {
    		in, err := stream.Recv()
    		if err == io.EOF {
    			return nil
    		}
    		if err != nil {
    			return err
    		}
    		key := serialize(in.Location)
    
    		s.mu.Lock()
    		s.routeNotes[key] = append(s.routeNotes[key], in)
    		// Note: this copy prevents blocking other clients while serving this one.
    		// We don't need to do a deep copy, because elements in the slice are
    		// insert-only and never modified.
    		rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
    		copy(rn, s.routeNotes[key])
    		s.mu.Unlock()
    
    		for _, note := range rn {
    			if err := stream.Send(note); err != nil {
    				return err
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    客户端

    func runRouteChat(client pb.RouteGuideClient) {
    	notes := []*pb.RouteNote{
    		{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "First message"},
    		{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Second message"},
    		{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Third message"},
    		{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "Fourth message"},
    		{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Fifth message"},
    		{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Sixth message"},
    	}
    	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    	defer cancel()
    	stream, err := client.RouteChat(ctx)
    	if err != nil {
    		log.Fatalf("client.RouteChat failed: %v", err)
    	}
    	waitc := make(chan struct{})
    	go func() {
    		for {
    			in, err := stream.Recv()
    			if err == io.EOF {
    				// read done.
    				close(waitc)
    				return
    			}
    			if err != nil {
    				log.Fatalf("client.RouteChat failed: %v", err)
    			}
    			log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
    		}
    	}()
    	for _, note := range notes {
    		if err := stream.Send(note); err != nil {
    			log.Fatalf("client.RouteChat: stream.Send(%v) failed: %v", note, err)
    		}
    	}
    	stream.CloseSend()
    	<-waitc
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    番外

    代码中的服务端、客户端程序都支持TLS启动,默认不开启可以通过以下指令开启TLS

    ```sh
    $ go run server/server.go -tls=true
    ```
    
    
    ```sh
    $ go run client/client.go -tls=true
    ```
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  • 相关阅读:
    查看docker run 时启动命令
    Web基础与HTTP协议
    设计师设计相关图表时,如何运用设计技巧与合理的用户体验?【大屏可视化(PC端、移动端)】
    DigiCert证书——银行官网的首选
    卡塔尔世界杯倒计时!世界杯直播在哪里观看?美家市场汇总来了!
    【算法】二叉树的存储与遍历模板
    6月适配进展|优炫数据库与40余款产品完成兼容认证
    orderby是如何工作的?
    Python学习01、计算机基础概念、初识Python、常量,变量,类型和表达式、字符串、动态静态类型、注释
    在R中通过正则化表达式提取向量中的正负数
  • 原文地址:https://blog.csdn.net/u013433591/article/details/127505091