• go-kit-consul client服务发现源码分析


    背景:在我之前一篇博客文章(golang实现的商城)中
    曾经大言不惭地说到 后续可能会考虑douyu-jupiter微服务或者dubbo-go服务,目前为止微服务版确实在做,并且server端功能已经完成,只不过是用go-kit实现。刚完成直连调用。项目架构先简单说下:
    项目简单分为5个模块,其中 “前端” 2个,后端3个。
    前端

    • bal (bisiness access layer): 后台管理界面http controller层(接入浏览器http请求)
    • applet: 小程序 http controller 层(接入wx applet http请求)

    后端

    • user:用户模块(管理员,会员,会员地址,反馈,帮助…)
    • product: 商品模块(spu/sku/stock/banner…)
    • order: 订单模块(order/cart/refund/payment…)

    通讯方式为grpc(就这一句话,让我写了一大片的encoder/decoder/proto)…

    回到文本正题,go-kit服务发现(service discovery)有
    sd
    那在我这儿没得商量喽,先用consul搞起来!

    server端注册代码片断如下:

    user.RegisterAdminMgrServer(baseServer, grpcAdminServer)
    pbFeedback.RegisterFeedbackServer(baseServer, feedbackSvr)
    // Register其它Server,如果有的话....
    
    // 当前(本地测试)server服务实例注册到consul中
    var apiClient *consulapi.Client
    apiClient, err = consulapi.NewClient(&consulapi.Config{Address: "你的consul地址:8500"})
    if err != nil {
    	log.Fatalf("consul new client err:%v\n", err)
    }
    client := consul.NewClient(apiClient)
    r := &consulapi.AgentServiceRegistration{
    	ID: "user-ID-01", Name: "user", Tags: []string{"test"}, Port: 8091, // @todo 写到配置中, 8091是我user模块服务启动占用的端口
    	Address: "127.0.0.1", EnableTagOverride: false,
    	//Check: &consulapi.AgentServiceCheck{
    	//	HTTP: "http://127.0.0.1" + common.AppConfig.DebugAddr,
    	//	Interval: "5s", Timeout: "5s",
    	//}, // health check
    }
    registrar := consul.NewRegistrar(client, r, log.With(log.NewLogfmtLogger(os.Stdout), "user", "registrar"))
    registrar.Register()
    defer registrar.Deregister()
    return baseServer.Serve(listener)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    client端服务发现:

    func getAdminServiceConsul(svcName string, tags []string) (admSvc.AdminService, error) {
    	// 先从缓存中取....没有时往下:
    	apiClient, err := consulapi.NewClient(&consulapi.Config{Address: "你的consul服务地址:8500"})
    	if err != nil {
    		log.Printf("consul new client err:%v\n", err)
    		return nil, err
    	}
    	client := consul.NewClient(apiClient)
    	logger := log.NewLogfmtLogger(os.Stdout)
    	instancer := consul.NewInstancer(client, logger, svcName, tags, true)      <1>
    	var retryMax = 3
    	var retryTimeout = 1 * time.Second
    	var admSet endpoints.AdminSet
    	{
    		factory := factoryForAdmin(endpoints.MakeGetAdminByUpEndpoint)
    		endpointer := sd.NewEndpointer(instancer, factory, logger)  <2>
    		balancer := lb.NewRoundRobin(endpointer)
    		retry := lb.Retry(retryMax, retryTimeout, balancer)
    		admSet.GetAdminByUpEndpoint = retry
    	}
    	{
    		factory := factoryForAdmin(endpoints.MakePageAdminEndpoint)
    		endpointer := sd.NewEndpointer(instancer, factory, logger)
    		balancer := lb.NewRoundRobin(endpointer)
    		retry := lb.Retry(retryMax, retryTimeout, balancer)
    		admSet.PageAdminEndpoint = retry
    	}
    	// ... 其它 业务 endpoints...
    	
    	return &admSet, nil
    }
    func factoryForAdmin(makeEndpoint func(admSvc.AdminService) endpoint.Endpoint) sd.Factory {
    
    	return func(instance string) (endpoint.Endpoint, io.Closer, error) {
    		// 得到 EndpointSet, 它实现了biz service 接口
    		svr, err := makeClientEndpoints(instance)
    		if err != nil {
    			return nil, nil, err
    		}
    		// 有了service实现,然后用endpoint包起来,这是client端的 endpoint包装
    		// 对比server端的biz service实现(调用dao,组装业务出参...),然后在外围用endpoints包装起来
    		// 这里也是,只不过是client side 的
    		return makeEndpoint(svr), nil, nil
    	}
    }
    func makeClientEndpoints(instance string) (admSvc.AdminService, error) {
    	var err error
    	conn, err = grpc.Dial(instance, grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		return nil, err
    	}
    	svc = transport.NewAdminGrpcClient(conn)
    	return svc, nil
    }
    func NewAdminGrpcClient(conn *grpc.ClientConn) service.AdminService {
    
    	//limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 5))
    
    	var getAdminByUpEndpoint endpoint.Endpoint
    	{
    		getAdminByUpEndpoint = grpctransport.NewClient(
    			conn,
    			"pb.AdminMgr",  // 通过生成的pb.go文件中搜索 ServiceName 查看
    			"getAdminByUp", // 通过生成的pb.go文件中搜索 MethodName 查看
    			encodeGrpcGetAdminByUpReq,
    			decodeGrpcGetAdminByUpResp,
    			user.Admin{},
    		).Endpoint()
    
    		getAdminByUpEndpoint = limiter(getAdminByUpEndpoint)
    		getAdminByUpEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
    			Name:    "getAdminByUp",
    			Timeout: 5 * time.Second,
    		}))(getAdminByUpEndpoint)
    	}
    	var pageAdminEndpoint endpoint.Endpoint
    	{
    		pageAdminEndpoint = grpctransport.NewClient(
    			conn,
    			"pb.AdminMgr", // 通过生成的pb.go文件中搜索 ServiceName 查看
    			"PageAdmin",   // 通过生成的pb.go文件中搜索 MethodName 查看
    			encodeGrpcPageAdminReq,
    			decodeGrpcPageAdminResp,
    			user.Page{},
    		).Endpoint()
    
    		pageAdminEndpoint = limiter(pageAdminEndpoint)
    		pageAdminEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
    			Name:    "pageAdmin",
    			Timeout: 3 * time.Second,
    		}))(pageAdminEndpoint)
    	}
    	return &endpoints.AdminSet{
    		GetAdminByUpEndpoint:     getAdminByUpEndpoint,
    		PageAdminEndpoint:        pageAdminEndpoint,
    		// @ todo 后续功能往后排
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98

    点到为止,有了架子后,咱们分析consul client sd, 就上面的2行代码,粘贴如下:

    // NewInstancer returns a Consul instancer that publishes instances for the
    // requested service. It only returns instances for which all of the passed tags
    // are present.
    instancer := consul.NewInstancer(client, logger, svcName, tags, true)      <1>
    // NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
    // and uses factory f to create Endpoints.
    endpointer := sd.NewEndpointer(instancer, factory, logger)  <2>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    <1>说明
    在这里插入图片描述

    <2>说明
    在这里插入图片描述
    最后再把源码贴一下,里面有一些细节没说到,比如根据tag filter,event copy, endpointerOptions设置当服务实例无效时的操作及失效timeout…

    func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer {
    	s := &Instancer{
    		cache:       instance.NewCache(),
    		client:      client,
    		logger:      log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
    		service:     service,
    		tags:        tags,
    		passingOnly: passingOnly,
    		quitc:       make(chan struct{}),
    	}
    
    	instances, index, err := s.getInstances(defaultIndex, nil)
    	if err == nil {
    		s.logger.Log("instances", len(instances))
    	} else {
    		s.logger.Log("err", err)
    	}
    
    	s.cache.Update(sd.Event{Instances: instances, Err: err})
    	go s.loop(index)
    	return s
    }
    
    func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer {
    	opts := endpointerOptions{}
    	for _, opt := range options {
    		opt(&opts)
    	}
    	se := &DefaultEndpointer{
    		cache:     newEndpointCache(f, logger, opts),
    		instancer: src,
    		ch:        make(chan Event),
    	}
    	go se.receive()
    	src.Register(se.ch)
    	return se
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
  • 相关阅读:
    基本类型转换和引用类型转换
    python练习:赋值运算 => 输入身高,体重,求BMI = 体重(kg)/身高(m)的平方。
    机器人命令表设计
    NODEJS版本管理工具
    【Redis】Zset 有序集合命令
    基于用户行为的交易反欺诈探索
    鲜花网络专送站
    Redis与分布式-主从复制
    qt环境配置
    Word控件Spire.Doc 【文档操作】教程(五):在 C#、VB.NET 中合并、设置多个 Word 文档
  • 原文地址:https://blog.csdn.net/csdnfanguyinheng/article/details/125622540