• 浅谈etcd服务注册与发现


    Hello朋友们,在之前参加云原生活动的时候曾写过一篇文章《浅谈云原生技术组件—etcd》,在其中我主要说明了etcd在基于Kubernetes云原生微服务框架中的定位,主要是用来做服务的远程配置、KV存储等等,那么今天就来简要的补充讲解下etcd的另一个重要的作用——服务注册和发现,没错,正是和Zookeeper、Eureka、Consul等拥有一样角色的开源微服务组件,且毫不逊色于这些,那么我们就开始进行讲解。

    1 基于etcd的服务注册与发现逻辑架构

    1.1 服务注册中心抽象

    在这里插入图片描述
    (图片来自网络)

    • Service Registry(服务注册表,通常也成为服务注册中心):内部拥有一个数据结构,用于存储已发布服务的配置信息。注册中心的作用一句话概括就是存放和调度服务的配置,实现服务和注册中心,服务和服务之间的相互通信,可以说是微服务中的”通讯录“,它记录了服务和服务地址的映射关系。
    • Service Requestor(服务调用者):根据服务注册中心调用已有服务。
    • Service Provider(服务提供者):提供服务到服务注册中心。
    1.2 etcd服务注册发现简易版

    在这里插入图片描述

    2 代码实现

    2.1 总体流程

    服务提供者

    (1)监听网络

    (2)创建gRPC服务端,并将具体的服务进行注册

    (3)利用服务地址、服务名等注册etcd服务配置

    (4)gRPC监听服务

    服务消费者

    (1)注册etcd解析器

    (2)连接etcd服务

    (3)获取gRPC客户端

    (4)调用gRPC服务

    2.2 代码
    2.2.1 服务提供方
    var (
       cli         *clientv3.Client
       Schema      = "ns"
       Host        = "127.0.0.1"
       Port        = 3000              //端口
       ServiceName = "api_log_service" //服务名称
       EtcdAddr    = "127.0.0.1:2379"  //etcd地址
    )
    
    type ApiLogServer struct{}
    
    func (api *ApiLogServer) GetApiLogByUid(ctx context.Context, req *proto.ApiLogRequest) (*proto.ApiLogResponse, error) {
       resp := &proto.ApiLogResponse{
          Msg:  "ok",
          Data: "Hello",
       }
       return resp, nil
    }
    
    //将服务地址注册到etcd中
    func register(etcdAddr, serviceName, serverAddr string, ttl int64) error {
       var err error
       if cli == nil {
          cli, err = clientv3.New(clientv3.Config{
             Endpoints:   strings.Split(etcdAddr, ";"),
             DialTimeout: 50 * time.Second,
          })
          if err != nil {
             fmt.Printf("connection server err : %s\n", err)
             return err
          }
       }
       //与etcd建立长连接,并保证连接不断(心跳检测)
       ticker := time.NewTicker(time.Second * time.Duration(ttl))
       go func() {
          key := "/" + Schema + "/" + serviceName + "/" + serverAddr
          for {
             resp, err := cli.Get(context.Background(), key)
             if err != nil {
                fmt.Printf("get server address err : %s", err)
             } else if resp.Count == 0 { //尚未注册
                err = keepAlive(serviceName, serverAddr, ttl)
                if err != nil {
                   fmt.Printf("keepAlive err : %s", err)
                }
             }
             <-ticker.C
          }
       }()
       return nil
    }
    
    //保持服务器与etcd的长连接
    func keepAlive(serviceName, serverAddr string, ttl int64) error {
       //创建租约
       leaseResp, err := cli.Grant(context.Background(), ttl)
       if err != nil {
          fmt.Printf("create grant err : %s\n", err)
          return err
       }
       //将服务地址注册到etcd中
       key := "/" + Schema + "/" + serviceName + "/" + serverAddr
       _, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID))
       if err != nil {
          fmt.Printf("register service err : %s", err)
          return err
       }
       //建立长连接
       ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
       if err != nil {
          fmt.Printf("KeepAlive err : %s\n", err)
          return err
       }
       //清空keepAlive返回的channel
       go func() {
          for {
             <-ch
          }
       }()
       return nil
    }
    
    //取消注册
    func unRegister(serviceName, serverAddr string) {
       if cli != nil {
          key := "/" + Schema + "/" + serviceName + "/" + serverAddr
          cli.Delete(context.Background(), key)
       }
    }
    
    func RunApiLog() {
       //监听网络
       listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", Port))
       if err != nil {
          fmt.Println("Listen network err :", err)
          return
       }
       defer listener.Close()
       //创建grpc
       srv := grpc.NewServer()
       defer srv.GracefulStop()
       //注册到grpc服务中
       proto.RegisterApiLogServiceServer(srv, &ApiLogServer{})
       //将服务地址注册到etcd中
       serverAddr := fmt.Sprintf("%s:%d", Host, Port)
       fmt.Printf("rpc server address: %s\n", serverAddr)
       register(EtcdAddr, ServiceName, serverAddr, 10)
       //关闭信号处理
       ch := make(chan os.Signal, 1)
       signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
       go func() {
          s := <-ch
          unRegister(ServiceName, serverAddr)
          if i, ok := s.(syscall.Signal); ok {
             os.Exit(int(i))
          } else {
             os.Exit(0)
          }
       }()
       //监听服务
       err = srv.Serve(listener)
       if err != nil {
          fmt.Println("rpc server err : ", err)
          return
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    2.2.2 服务消费方
    var (
       cli         *clientv3.Client
       Schema      = "ns"
       ServiceName = "api_log_service" //服务名称
       EtcdAddr    = "127.0.0.1:2379"  //etcd地址
    )
    
    type EtcdResolver struct {
       etcdAddr   string
       clientConn resolver.ClientConn
    }
    
    func NewEtcdResolver(etcdAddr string) resolver.Builder {
       return &EtcdResolver{etcdAddr: etcdAddr}
    }
    
    func (r *EtcdResolver) Scheme() string {
       return Schema
    }
    
    //ResolveNow watch有变化调用
    func (r *EtcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
       fmt.Println(rn)
    }
    
    //Close 解析器关闭时调用
    func (r *EtcdResolver) Close() {
       fmt.Println("Close")
    }
    
    //Build 构建解析器 grpc.Dial()时调用
    func (r *EtcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
       var err error
       //构建etcd client
       if cli == nil {
          cli, err = clientv3.New(clientv3.Config{
             Endpoints:   strings.Split(r.etcdAddr, ";"),
             DialTimeout: 15 * time.Second,
          })
          if err != nil {
             fmt.Printf("connect etcd err : %s\n", err)
             return nil, err
          }
       }
       r.clientConn = clientConn
       go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
       return r, nil
    }
    
    //watch机制:监听etcd中某个key前缀的服务地址列表的变化
    func (r *EtcdResolver) watch(keyPrefix string) {
       //初始化服务地址列表
       var addrList []resolver.Address
       resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
       if err != nil {
          fmt.Println("get service list err : ", err)
       } else {
          for i := range resp.Kvs {
             addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)})
          }
       }
       r.clientConn.NewAddress(addrList)
       //监听服务地址列表的变化
       rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
       for n := range rch {
          for _, ev := range n.Events {
             addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
             switch ev.Type {
             case mvccpb.PUT:
                if !exists(addrList, addr) {
                   addrList = append(addrList, resolver.Address{Addr: addr})
                   r.clientConn.NewAddress(addrList)
                }
             case mvccpb.DELETE:
                if s, ok := remove(addrList, addr); ok {
                   addrList = s
                   r.clientConn.NewAddress(addrList)
                }
             }
          }
       }
    }
    
    func exists(l []resolver.Address, addr string) bool {
       for i := range l {
          if l[i].Addr == addr {
             return true
          }
       }
       return false
    }
    
    func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
       for i := range s {
          if s[i].Addr == addr {
             s[i] = s[len(s)-1]
             return s[:len(s)-1], true
          }
       }
       return nil, false
    }
    
    func RunClient() {
       //注册etcd解析器
       r := NewEtcdResolver(EtcdAddr)
       resolver.Register(r)
       //连接服务器,同步调用r.Build()
       conn, err := grpc.Dial(r.Scheme()+"://author/"+ServiceName, grpc.WithBalancerName("round_robin"), grpc.WithInsecure())
       if err != nil {
          fmt.Printf("connect err : %s", err)
       }
       defer conn.Close()
       //获得gRPC客户端
       c := proto.NewApiLogServiceClient(conn)
       //调用服务
       resp, err := c.GetApiLogByUid(
          context.Background(),
          &proto.ApiLogRequest{UId: 0},
       )
       if err != nil {
          fmt.Printf("call service err : %s", err)
          return
       }
       fmt.Printf("resp : %s , data : %s", resp.Msg, resp.Data)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    2.2.3 公共组件
    syntax = "proto3";  
    package proto; 
    
    option go_package = "../api_log";
    
    service ApiLogService {
      rpc GetApiLogByUid(ApiLogRequest) returns (ApiLogResponse){}
    }
    
    message ApiLogRequest{
      int32 u_id = 1;
    }
    
    message ApiLogResponse{
      int64 code = 1;
      string msg = 2;
      int64 count = 3;
      string data = 4;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    注意要在编译后进行使用哈

    2.3 注意事项

    在我编写代码进行实现的过程中遇到过种种问题,但是最让人记忆深刻的就是etcd与gRPC版本不兼容的问题,用了很长时间才搞定,在这里记录下吧:

    原因是etcd3.x版本不支持grpc1.27版本以上,但是grpc1.27以下编译成的中间代码又不支持新版本的proto buffer,这就陷入了一个两难的处境,最后通过Stack Overflow才查到:

    https://stackoverflow.com/questions/64815927/undefined-grpc-clientconninterface-when-compiling-grpc

    解决,在go.mod中加入这几行代码:

    replace (
       github.com/coreos/etcd => github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible
       google.golang.org/grpc => google.golang.org/grpc v1.27.0
    )
    
    • 1
    • 2
    • 3
    • 4

    3 细节剖析

    3.1 服务生产端keepAlive

    keepAlive是一个老生常谈的问题了,下到TCP/IP、HTTP连接,上到Redis集群、MySQL集群,都会有该机制,那么etcd的keepAlive是怎么搞的呢?

    下面我们来看下

    etcd使用LeaseKeepAlive API调用创建的双向流来刷新租约。当客户端希望刷新租约时,它通过流发送一个leasekeepaliverrequest:

    message LeaseKeepAliveRequest {
      int64 ID = 1;
    }
    
    • 1
    • 2
    • 3
    • ID :keepAlive有效的租约ID。

    LeaseKeepAliveResponse作为keepAlive的响应:

    message LeaseKeepAliveResponse {
      ResponseHeader header = 1;
      int64 ID = 2;
      int64 TTL = 3;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • ID :用新的TTL刷新的租约。
    • TTL :新的生存时间,以秒为单位,租约剩余的时间。
    3.2 服务消费端watch机制

    Watch API提供了一个基于事件的接口,用于异步监视服务key的更改。etcd3 watch通过持续观察给定的修订(当前的或历史的)来等待键的更改,并将键更新流回客户端。

    对每个键的每次更改都用“Event”消息表示。Event消息提供了更新的数据和更新的类型:

    message Event {
      enum EventType {
        PUT = 0;
        DELETE = 1;
      }
      EventType type = 1;
      KeyValue kv = 2;
      KeyValue prev_kv = 3;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • type:PUT类型表示新数据的更新,DELETE表示key的删除。
    • kv:与事件相关的键值PUT事件包含kv。
    • prev_kv:事件发生前修改版本的密钥的键值对。为了节省带宽,它只在watch显式启用的情况下填写。

    watch流:

    watch是长时间运行的请求,并使用gRPC流来流化事件数据。watch流是双向的;客户端写入流来建立监视,读取流来接收监视事件。通过使用每个watch标识符来标记事件,单个watch流可以将多个不同的手表组合在一起。这种多路复用有助于减少核心etcd集群上的内存占用和连接开销。

    4 总结

    微服务是当今互联网领域的广泛概念,也是一种架构演进的结果,微服务的存在让架构设计更加的解耦合,让人员的分工更加明确,当然他的落地实现也并不止步与某一两种方式,在云原生领域的Kubernetes+etcd,互联网领域常用的Spring Cloud全家桶以及Dubbo等都是微服务的具体实现,而etcd也仅仅是微服务中服务注册中心组件角色的一个代表而已。

    参考:

    https://etcd.io/docs/v3.5/dev-guide/grpc_naming/

    https://www.jianshu.com/p/217d0e3a8d0f

    https://www.cnblogs.com/wujuntian/p/12838041.html

    https://stackoverflow.com/questions/64815927/undefined-grpc-clientconninterface-when-compiling-grpc

    https://blog.csdn.net/fly910905/article/details/103545120

  • 相关阅读:
    Joe主题魔改:正文内容实现图片懒加载
    老测试/开发程序员给年轻程序员的一些建议,小码农搬砖工人......
    Linux学习第18天:Linux并发与竞争: 没有规矩不成方圆
    Keil5中复制粘贴中文乱码解决
    Android SensorManager学习
    机器学习数据挖掘十大经典算法 数学建模常用算法
    硬件基础 -比较器
    【转载】DC-DC自举电容(BOOT)几个问题
    elasticsearch-analysis-dynamic-synonym同义词插件实现热更
    Base64编码原理
  • 原文地址:https://blog.csdn.net/Mr_YanMingXin/article/details/126452076