• 手写RPC Day3 服务注册


    手写RPC Day3 服务注册

    具体细节参考 传送门

    1.结构体映射为服务

    RPC 框架的一个基础能力是:像调用本地程序一样调用远程服务。那如何将程序映射为服务呢?那么对 Go 来说,这个问题就变成了如何将结构体的方法映射为服务。

    通过 “T.MethodName” 可以确定调用的是类型 T 的 MethodName,如果硬编码实现这个功能,很可能是这样:

    switch req.ServiceMethod {
        case "T.MethodName":
            t := new(t)
            reply := new(T2)
            var argv T1
            gob.NewDecoder(conn).Decode(&argv)
            err := t.MethodName(argv, reply)
            server.sendMessage(reply, err)
        case "Foo.Sum":
            f := new(Foo)
            ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    也就是说,如果使用硬编码的方式来实现结构体与服务的映射,那么每暴露一个方法,就需要编写等量的代码。那有没有什么方式,能够将这个映射过程自动化呢?可以借助反射。

    通过反射,我们能够非常容易地获取某个结构体的所有方法,并且能够通过方法,获取到该方法所有的参数类型与返回值。例如:

    func main() {
    	var wg sync.WaitGroup
    	typ := reflect.TypeOf(&wg)
    	for i := 0; i < typ.NumMethod(); i++ {
    		method := typ.Method(i)
    		argv := make([]string, 0, method.Type.NumIn())
    		returns := make([]string, 0, method.Type.NumOut())
    		// j 从 1 开始,第 0 个入参是 wg 自己。
    		for j := 1; j < method.Type.NumIn(); j++ {
    			argv = append(argv, method.Type.In(j).Name())
    		}
    		for j := 0; j < method.Type.NumOut(); j++ {
    			returns = append(returns, method.Type.Out(j).Name())
    		}
    		log.Printf("func (w *%s) %s(%s) %s",
    			typ.Elem().Name(),
    			method.Name,
    			strings.Join(argv, ","),
    			strings.Join(returns, ","))
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    运行的结果是:

    func (w *WaitGroup) Add(int)
    func (w *WaitGroup) Done()
    func (w *WaitGroup) Wait()
    
    • 1
    • 2
    • 3

    2.定义结构体 methodType

    每一个 methodType 实例包含了一个方法的完整信息。包括

    • method:方法本身
    • ArgType:第一个参数的类型
    • ReplyType:第二个参数的类型
    • numCalls:后续统计方法调用次数时会用到

    另外,我们还实现了 2 个方法 newArgvnewReplyv,用于创建对应类型的实例。newArgv 方法有一个小细节,指针类型和值类型创建实例的方式有细微区别。

    3.定义结构体 service

    type service struct {
    	name   string
    	typ    reflect.Type
    	rcvr   reflect.Value
    	method map[string]*methodType
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    service 的定义也是非常简洁的,name 即映射的结构体的名称,比如 T,比如 WaitGroup;typ 是结构体的类型;rcvr 即结构体的实例本身,保留 rcvr 是因为在调用时需要 rcvr 作为第 0 个参数;method 是 map 类型,存储映射的结构体的所有符合条件的方法。

    接下来,完成构造函数 newService,入参是任意需要映射为服务的结构体实例。

    func newService(rcvr interface{}) *service {
    	s := new(service)
    	s.rcvr = reflect.ValueOf(rcvr)
    	s.name = reflect.Indirect(s.rcvr).Type().Name()
    	s.typ = reflect.TypeOf(rcvr)
    	if !ast.IsExported(s.name) {
    		log.Fatalf("rpc server: %s is not a valid service name", s.name)
    	}
    	s.registerMethods()
    	return s
    }
    
    func (s *service) registerMethods() {
    	s.method = make(map[string]*methodType)
    	for i := 0; i < s.typ.NumMethod(); i++ {
    		method := s.typ.Method(i)
    		mType := method.Type
    		if mType.NumIn() != 3 || mType.NumOut() != 1 {
    			continue
    		}
    		if mType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
    			continue
    		}
    		argType, replyType := mType.In(1), mType.In(2)
    		if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) {
    			continue
    		}
    		s.method[method.Name] = &methodType{
    			method:    method,
    			ArgType:   argType,
    			ReplyType: replyType,
    		}
    		log.Printf("rpc server: register %s.%s\n", s.name, method.Name)
    	}
    }
    
    func isExportedOrBuiltinType(t reflect.Type) bool {
    	return ast.IsExported(t.Name()) || t.PkgPath() == ""
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    registerMethods 过滤出了符合条件的方法:

    • 两个导出或内置类型的入参(反射时为 3 个,第 0 个是自身,类似于 python 的 self,java 中的 this)
    • 返回值有且只有 1 个,类型为 error

    最后,我们还需要实现 call 方法,即能够通过反射值调用方法。

    func (s *service) call(m *methodType, argv, replyv reflect.Value) error {
    	atomic.AddUint64(&m.numCalls, 1)
    	f := m.method.Func
    	returnValues := f.Call([]reflect.Value{s.rcvr, argv, replyv})
    	if errInter := returnValues[0].Interface(); errInter != nil {
    		return errInter.(error)
    	}
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    4.集成到服务端

    通过反射结构体已经映射为服务,但请求的处理过程还没有完成。从接收到请求到回复还差以下几个步骤:第一步,根据入参类型,将请求的 body 反序列化;第二步,调用 service.call,完成方法调用;第三步,将 reply 序列化为字节流,构造响应报文,返回。

    回到代码本身,补全之前在 server.go 中遗留的 2 个 TODO 任务 readRequesthandleRequest 即可。

    在这之前,我们还需要为 Server 实现一个方法 Register

    // Server represents an RPC Server.
    type Server struct {
    	serviceMap sync.Map
    }
    
    // Register publishes in the server the set of methods of the
    func (server *Server) Register(rcvr interface{}) error {
    	s := newService(rcvr)
    	if _, dup := server.serviceMap.LoadOrStore(s.name, s); dup {
    		return errors.New("rpc: service already defined: " + s.name)
    	}
    	return nil
    }
    
    // Register publishes the receiver's methods in the DefaultServer.
    func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    配套实现 findService 方法,即通过 ServiceMethod 从 serviceMap 中找到对应的 service

    func (server *Server) findService(serviceMethod string) (svc *service, mtype *methodType, err error) {
    	dot := strings.LastIndex(serviceMethod, ".")
    	if dot < 0 {
    		err = errors.New("rpc server: service/method request ill-formed: " + serviceMethod)
    		return
    	}
    	serviceName, methodName := serviceMethod[:dot], serviceMethod[dot+1:]
    	svci, ok := server.serviceMap.Load(serviceName)
    	if !ok {
    		err = errors.New("rpc server: can't find service " + serviceName)
    		return
    	}
    	svc = svci.(*service)
    	mtype = svc.method[methodName]
    	if mtype == nil {
    		err = errors.New("rpc server: can't find method " + methodName)
    	}
    	return
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    findService 的实现看似比较繁琐,但是逻辑还是非常清晰的。因为 ServiceMethod 的构成是 “Service.Method”,因此先将其分割成 2 部分,第一部分是 Service 的名称,第二部分即方法名。现在 serviceMap 中找到对应的 service 实例,再从 service 实例的 method 中,找到对应的 methodType。

    准备工具已经就绪,我们首先补全 readRequest 方法:

    // request stores all information of a call
    type request struct {
    	h            *codec.Header // header of request
    	argv, replyv reflect.Value // argv and replyv of request
    	mtype        *methodType
    	svc          *service
    }
    
    func (server *Server) readRequest(cc codec.Codec) (*request, error) {
    	h, err := server.readRequestHeader(cc)
    	if err != nil {
    		return nil, err
    	}
    	req := &request{h: h}
    	req.svc, req.mtype, err = server.findService(h.ServiceMethod)
    	if err != nil {
    		return req, err
    	}
    	req.argv = req.mtype.newArgv()
    	req.replyv = req.mtype.newReplyv()
    
    	// make sure that argvi is a pointer, ReadBody need a pointer as parameter
    	argvi := req.argv.Interface()
    	if req.argv.Type().Kind() != reflect.Ptr {
    		argvi = req.argv.Addr().Interface()
    	}
    	if err = cc.ReadBody(argvi); err != nil {
    		log.Println("rpc server: read body err:", err)
    		return req, err
    	}
    	return req, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    readRequest 方法中最重要的部分,即通过 newArgv()newReplyv() 两个方法创建出两个入参实例,然后通过 cc.ReadBody() 将请求报文反序列化为第一个入参 argv,在这里同样需要注意 argv 可能是值类型,也可能是指针类型,所以处理方式有点差异。

    接下来补全 handleRequest 方法:

    func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
    	defer wg.Done()
    	err := req.svc.call(req.mtype, req.argv, req.replyv)
    	if err != nil {
    		req.h.Error = err.Error()
    		server.sendResponse(cc, req.h, invalidRequest, sending)
    		return
    	}
    	server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    相对于 readRequest,handleRequest 的实现非常简单,通过 req.svc.call 完成方法调用,将 replyv 传递给 sendResponse 完成序列化即可。

    到这里,今天的所有内容已经实现完成,成功在服务端实现了服务注册与调用。


    5.思路

    methodType自定义方法的结构体,用于创建方法的参数实例。

    service是定义服务的结构体,保存了服务映射到结构体的实例 和对应的方法。

    然后是集成到服务端。Server用map存每个服务名到服务实例的映射。

    然后写一个通过ServiceMethodT.methodName 找到对应的服务和其方法的function。

    然后重写readRequest、handleRequest两个方法。

    总体来说,主要思想就是:通过反射来是实现服务映射结构体。大大减少了代码量。同时完成了服务注册功能。


    6.测试

    第一步,定义结构体 Foo 和方法 Sum

    package main
    
    import (
    	"geerpc"
    	"log"
    	"net"
    	"sync"
    	"time"
    )
    
    type Foo int
    
    type Args struct{ Num1, Num2 int }
    
    func (f Foo) Sum(args Args, reply *int) error {
    	*reply = args.Num1 + args.Num2
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    第二步,注册 Foo 到 Server 中,并启动 RPC 服务

    func startServer(addr chan string) {
    	var foo Foo
    	if err := geerpc.Register(&foo); err != nil {
    		log.Fatal("register error:", err)
    	}
    	// pick a free port
    	l, err := net.Listen("tcp", ":0")
    	if err != nil {
    		log.Fatal("network error:", err)
    	}
    	log.Println("start rpc server on", l.Addr())
    	addr <- l.Addr().String()
    	geerpc.Accept(l)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    第三步,构造参数,发送 RPC 请求,并打印结果。

    func main() {
    	log.SetFlags(0)
    	addr := make(chan string)
    	go startServer(addr)
    	client, _ := geerpc.Dial("tcp", <-addr)
    	defer func() { _ = client.Close() }()
    
    	time.Sleep(time.Second)
    	// send request & receive response
    	var wg sync.WaitGroup
    	for i := 0; i < 5; i++ {
    		wg.Add(1)
    		go func(i int) {
    			defer wg.Done()
    			args := &Args{Num1: i, Num2: i * i}
    			var reply int
    			if err := client.Call("Foo.Sum", args, &reply); err != nil {
    				log.Fatal("call Foo.Sum error:", err)
    			}
    			log.Printf("%d + %d = %d", args.Num1, args.Num2, reply)
    		}(i)
    	}
    	wg.Wait()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    运行结果如下:

    rpc server: register Foo.Sum
    start rpc server on [::]:57509
    1 + 1 = 2
    2 + 4 = 6
    3 + 9 = 12
    0 + 0 = 0
    4 + 16 = 20
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 相关阅读:
    HTML+CSS制作简单的家乡网页 ——我的家乡介绍广东 web前端期末大作业
    【Hadoop】hive工具使用的两种方式
    记录一次线上gitlab11.x升级gitlab14.x版本操作
    慎投:这两本期刊被剔除SCI/SSCI, 11月WOS数据库已更新~
    自动驾驶升级、开发模式生变,如何实现SOA软件架构快速落地?
    软件设计师_数据结构与算法基础_学习笔记
    噢!查重原来是这样实现的啊!
    这才是CSDN最系统的网络安全学习路线(建议收藏)
    【C++】STL之list深度剖析及模拟实现
    使用Vue实现弹窗效果
  • 原文地址:https://blog.csdn.net/weixin_45750972/article/details/127784293