• Kitex微服务开发实践:ETCD服务注册


    服务注册通常用于分布式系统或微服务架构中,是一种用于管理和发现这些分布式服务的机制。它的目标是让服务能够动态地找到其他服务,并能够与其进行通信,而无需显式地配置其位置信息

    本文简单讲述使用etcd进行服务注册,基于kitex和hertz框架简单实现微服务应用

    代码地址:https://github.com/T4t4KAU/Documents/tree/main/etcd-test

    接口定义

    使用thrift编写如下idl

    add.thrift

    namespace go api
    struct AddRequest {
    1: i32 first
    2: i32 second
    }
    struct AddResponse {
    1: i32 sum
    }
    service AddService {
    AddResponse Add(AddRequest req)
    }

    echo.thrift

    namespace go api
    struct EchoRequest {
    1: string message
    }
    struct EchoResponse {
    2: string message
    }
    service EchoService {
    EchoResponse Echo(1:EchoRequest req)
    }

    使用Kitex生成代码(此处不作赘述):

    .
    ├── add
    │   ├── handler.go
    │   └── main.go
    ├── build.sh
    ├── echo
    │   ├── handler.go
    │   └── main.go
    ├── go.mod
    ├── go.sum
    ├── idl
    │   ├── add.thrift
    │   └── echo.thrift
    ├── kitex_gen
    │   └── api
    │   ├── add.go
    │   ├── addservice
    │   │   ├── addservice.go
    │   │   ├── client.go
    │   │   ├── invoker.go
    │   │   └── server.go
    │   ├── echo.go
    │   ├── echoservice
    │   │   ├── client.go
    │   │   ├── echoservice.go
    │   │   ├── invoker.go
    │   │   └── server.go
    │   ├── k-add.go
    │   ├── k-consts.go
    │   └── k-echo.go
    ├── kitex_info.yaml
    └── script
    └── bootstrap.sh

    echo服务将参数message原样返回,add服务将参数中的两个整数求和后返回

    代码实现

    首先实现一个无服务注册的版本,在上述生成代码的基础上完善方法实现:

    add/hander.go

    // Add implements the AddServiceImpl interface.
    func (s *AddServiceImpl) Add(ctx context.Context, req *api.AddRequest) (resp *api.AddResponse, err error) {
    resp = new(api.AddResponse)
    resp.Sum = req.First + req.Second
    return
    }

    add/main.go

    package main
    func main() {
    // 服务地址
    addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:9091")
    svr := api.NewServer(new(AddServiceImpl),
    server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "add"}),
    server.WithServiceAddr(addr),
    )
    err := svr.Run()
    if err != nil {
    log.Println(err.Error())
    }
    }

    echo/handler.go

    // Echo implements the EchoServiceImpl interface.
    func (s *EchoServiceImpl) Echo(ctx context.Context, req *api.EchoRequest) (resp *api.EchoResponse, err error) {
    resp = new(api.EchoResponse)
    resp.Message = req.Message
    return
    }

    echo/main.go

    package main
    func main() {
    // 服务地址
    addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:9092")
    svr := api.NewServer(new(EchoServiceImpl),
    server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "echo"}),
    server.WithServiceAddr(addr),
    )
    err := svr.Run()
    if err != nil {
    log.Println(err.Error())
    }
    }

    创建一个api包,作为两个服务的API网关,使用hertz来处理用户请求

    目录结构:

    .
    ├── add
    │   ├── handler.go
    │   └── main.go
    ├── api
    │   ├── add.go
    │   ├── echo.go
    │   └── main.go
    ├── build.sh
    ├── echo
    │   ├── handler.go
    │   └── main.go
    ├── go.mod
    ├── go.sum
    ├── idl
    │   ├── add.thrift
    │   └── echo.thrift
    ├── kitex_gen
    │   └── api
    │   ├── add.go
    │   ├── addservice
    │   │   ├── addservice.go
    │   │   ├── client.go
    │   │   ├── invoker.go
    │   │   └── server.go
    │   ├── echo.go
    │   ├── echoservice
    │   │   ├── client.go
    │   │   ├── echoservice.go
    │   │   ├── invoker.go
    │   │   └── server.go
    │   ├── k-add.go
    │   ├── k-consts.go
    │   └── k-echo.go
    ├── kitex_info.yaml
    └── script
    └── bootstrap.sh

    实现web handler函数,在handler中使用RPC请求服务

    add:

    package main
    var addClient addservice.Client
    func AddHandler(ctx context.Context, c *app.RequestContext) {
    num1, _ := strconv.Atoi(c.Query("first"))
    num2, _ := strconv.Atoi(c.Query("second"))
    // RPC请求
    resp, err := addClient.Add(ctx, &api.AddRequest{
    First: int32(num1), Second: int32(num2),
    })
    if err != nil {
    c.JSON(http.StatusInternalServerError, utils.H{
    "message": err.Error(),
    })
    return
    }
    c.JSON(http.StatusOK, utils.H{
    "message": resp.Sum,
    })
    }

    echo:

    package main
    var echoClient echoservice.Client
    func EchoHandler(ctx context.Context, c *app.RequestContext) {
    // RPC请求
    resp, err := echoClient.Echo(ctx, &api.EchoRequest{
    Message: c.Query("message"),
    })
    if err != nil {
    c.JSON(http.StatusInternalServerError, utils.H{
    "message": err.Error(),
    })
    }
    c.JSON(http.StatusOK, utils.H{
    "message": resp.Message,
    })
    }

    在主调函数中进行RPC客户端初始化和路由注册:

    package main
    func Init() {
    var err error
    addClient, err = addservice.NewClient("addservice",
    client.WithHostPorts("127.0.0.1:9091"))
    if err != nil {
    panic(err)
    }
    echoClient, err = echoservice.NewClient("echoservice",
    client.WithHostPorts("127.0.0.1:9092"))
    if err != nil {
    panic(err)
    }
    }
    func main() {
    Init()
    r := server.Default()
    r.POST("/add", AddHandler)
    r.GET("/echo", EchoHandler)
    r.Spin()
    }

    启动add和echo服务,再启动api,请求URL即可访问,此处省略测试过程

    引入服务注册

    在上述实现中,要显式的告诉API网关服务的地址,一旦这个地址发生变化,会导致服务不可用,接下来使用服务注册

    使用docker启动etc:

    docker run -d -p 10079:2379 --name etcd \
    -e ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 \
    -e ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379 \
    -e ETCDCTL_API=3 \
    quay.io/coreos/etcd:v3.5.5

    etcd服务器启动在10079端口

    接下来修改服务代码:

    add/main.go

    package main
    func main() {
    // 服务注册
    r, err := etcd.NewEtcdRegistry([]string{"127.0.0.1:10079"})
    if err != nil {
    panic(err)
    }
    addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:9091")
    svr := api.NewServer(new(AddServiceImpl),
    server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "add"}), // 指定服务名称
    server.WithServiceAddr(addr),
    server.WithRegistry(r), // 设置服务注册
    )
    err = svr.Run()
    if err != nil {
    log.Println(err.Error())
    }
    }

    echo/main.go

    package main
    func main() {
    // 服务注册
    r, err := etcd.NewEtcdRegistry([]string{"127.0.0.1:10079"})
    if err != nil {
    panic(err)
    }
    addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:9092")
    svr := api.NewServer(new(EchoServiceImpl),
    server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: "echo"}), // 指定服务名称
    server.WithServiceAddr(addr),
    server.WithRegistry(r), // 设置服务注册
    )
    err = svr.Run()
    if err != nil {
    log.Println(err.Error())
    }
    }

    修改api代码,无须给出两个服务的地址了:

    package main
    func Init() {
    var err error
    r, err := etcd.NewEtcdResolver([]string{"127.0.0.1:10079"})
    if err != nil {
    panic(err)
    }
    addClient, err = addservice.NewClient("add",
    client.WithResolver(r),
    )
    if err != nil {
    panic(err)
    }
    echoClient, err = echoservice.NewClient("echo",
    client.WithResolver(r),
    )
    if err != nil {
    panic(err)
    }
    }
    func main() {
    Init()
    r := server.Default()
    r.POST("/add", AddHandler)
    r.GET("/echo", EchoHandler)
    r.Spin()
    }

    启动之后,服务正常使用

  • 相关阅读:
    SpringMVC源码分析(一)启动流程分析
    使用gradio创建一个提取pdf、excel中表格数据的demo
    面试必备:HashMap底层源码原来是这么简单(分析)
    18.flink kafka使用thrift序列化
    4.JS高级和promise
    SpringMvc(四、统一异常处理
    keil配置After Bulid Run #1 Run #2
    【Leetcode小解析】正则表达式匹配
    【方法封装】时间格式化输出,获取请求设备和IP
    Docker简介
  • 原文地址:https://www.cnblogs.com/N3ptune/p/17595464.html