• GRPC整体学习


    What

    是一款高性能,跨语言,跨平台的rpc框架

    Why

    优点

    1. protobuf二进制消息,性能好/效率高(空间和时间效率都很不错)

    1. GRPC可以通过protobuf来定义接口,从而可以有更加严格的接口约束条件

    缺点

    1. GRPC尚未提供连接池,需要自行实现

    1. 尚未提供“服务发现”、“负载均衡”机制

    1. Protobuf二进制可读性差

    How

    1. 添加依赖

      1. https://github.com/grpc/grpc-java/blob/master/README.md

    1. 定义pb文件

    2.  

    3. 通过插件生成java文件

    1.  

    源码分析

    总体架构

     

    服务注册(Registry Pack)负责对服务描述、接口描述等信息进行注册,以供方法调用模块查询;

    方法调用(Call Pack)为真正的方法调用逻辑,最终它会调用到我们实现的服务接口对应的方法;

    网络流(Stream Pack)是对方法调用的网络会话的封装,即一次方法调用为一个流;

    传输逻辑(Transport Pack)负责构建真正的底层IO数据传输和相应的事件监听器;

    服务管理(Server Pack)负责逻辑服务ServerImpl和监听服务NettyServer的构建与启动。它是整个服务端逻辑的功能工厂;

    服务调用原理

    服务调用方式

    • 普通 RPC 调用方式,即请求 - 响应模式。

    • 基于 HTTP/2.0 的 streaming 调用方式。

    普通调用-同步阻塞调用

    代码

    blockingStub = GreeterGrpc.newBlockingStub(channel);

    ...

    HelloRequest request = HelloRequest.newBuilder().setName(name).build();

    HelloReply response;

    try {

    response = blockingStub.sayHello(request);

    ...

    原理

    同步服务调用是由 gRPC 框架的 ClientCalls 在框架层做了封装,异步发起服务调用之后,同步阻塞调用方线程,直到收到响应再唤醒被阻塞的业务线程

     

    基于 Future 的异步 RPC 调用

    代码

    HelloRequest request = HelloRequest.newBuilder().setName(name).build();

    try {

    com.google.common.util.concurrent.ListenableFuture

    listenableFuture = futureStub.sayHello(request);

    Futures.addCallback(listenableFuture, new FutureCallback() {

    @Override

    public void onSuccess(@Nullable HelloReply result) {

    logger.info("Greeting: " + result.getMessage());

    }

    原理

    将 ListenableFuture 加入到 gRPC 的 Future 列表中,创建一个新的 FutureCallback 对象,当 ListenableFuture 获取到响应之后,gRPC 的 DirectExecutor 线程池会调用新创建的 FutureCallback,执行 onSuccess 或者 onFailure,实现异步回调通知。

     

    Reactive 风格异步 RPC 调用

    代码

    HelloRequest request = HelloRequest.newBuilder().setName(name).build();

    io.grpc.stub.StreamObserver responseObserver =

    new io.grpc.stub.StreamObserver()

    {

    public void onNext(HelloReply value)

    {

    logger.info("Greeting: " + value.getMessage());

    }

    public void onError(Throwable t){

    logger.warning(t.getMessage());

    }

    public void onCompleted(){}

    };

    stub.sayHello(request,responseObserver);

    原理

     

    将响应 StreamObserver 作为入参传递到异步服务调用中,该方法返回空,程序继续向下执行,不阻塞当前业务线程

    Streaming 模式服务调用

    • 服务端 streaming

    • 客户端 streaming

    • 服务端和客户端双向 streaming

    服务端streaming

    代码

    rpc ListFeatures(Rectangle) returns (stream Feature) {}

    public void listFeatures(io.grpc.examples.routeguide.Rectangle request,

    io.grpc.stub.StreamObserver responseObserver) {

    asyncUnimplementedUnaryCall(METHOD_LIST_FEATURES, responseObserver);

    }

    原理

     

    构造 io.grpc.stub.StreamObserver responseObserver,实现它的三个回调接口,注意由于是服务端 streaming 模式,所以它的 onNext(Feature value) 将会被回调多次,每次都代表一个响应,如果所有的响应都返回,则会调用 onCompleted() 方法。

    客户端 streaming

    代码

    rpc RecordRoute(stream Point) returns (RouteSummary) {}

    StreamObserver requestObserver = asyncStub.recordRoute(responseObserver);

    try {

    // Send numPoints points randomly selected from the features list.

    for (int i = 0; i < numPoints; ++i) {

    int index = random.nextInt(features.size());

    Point point = features.get(index).getLocation();

    info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),

    RouteGuideUtil.getLongitude(point));

    requestObserver.onNext(point);

    原理

     

    异步服务调用获取请求 StreamObserver 对象,循环调用 requestObserver.onNext(point),异步发送请求消息到服务端,发送完成之后,调用 requestObserver.onCompleted(),通知服务端所有请求已经发送完成,可以接收服务端的响应了。

    双向 streaming

    代码

    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

    StreamObserver requestObserver =

    asyncStub.routeChat(new StreamObserver() {

    @Override

    public void onNext(RouteNote note) {

    info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()

    .getLatitude(), note.getLocation().getLongitude());

    if (testHelper != null) {

    testHelper.onMessage(note);

    }

    }

    //发送多个请求

    for (RouteNote request : requests) {

    info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()

    .getLatitude(), request.getLocation().getLongitude());

    requestObserver.onNext(request);

    }

    } catch (RuntimeException e) {

    // Cancel RPC

    requestObserver.onError(e);

    throw e;

    }

    // Mark the end of requests

    requestObserver.onCompleted();

    原理

     

    构造 Streaming 响应对象 StreamObserver并实现 onNext 等接口,由于服务端也是 Streaming 模式,因此响应是多个的,也就是说 onNext 会被调用多次。

    通过在循环中调用 requestObserver 的 onNext 方法,发送请求消息

  • 相关阅读:
    springboot的jar包启动时指定加载的配置文件
    09_Webpack打包工具
    Linux内核的配置和编译(3)——Linux内核加载自己的驱动(编译最新内核Linux-5.18.7)
    09 配置文件&日志&多线程
    字节跳动社招内推,长期有效,长期有效,长期有效
    康力健身中心
    LabVIEW计算相机图像传感器分辨率以及镜头焦距
    Linux:进程概念的引入和理解
    Windows安装Linux子系统
    CNCF项目全景图介绍
  • 原文地址:https://blog.csdn.net/qq_29857681/article/details/126229434