• ETCD数据库源码分析——服务端PUT流程


    ETCD数据库源码分析——etcd gRPC 服务 API注册服务小节,有如下grpc服务注册代码,包含了服务端PUT流程所属的KVServer。
    在这里插入图片描述
    RegisterKVServer函数定义在/api/etcdserverpb/rpc.pb.go文件中,这里pb.RegisterKVServer(grpcServer, NewQutaKVServer(s))中的s指的就是ETCD数据库中的最重要的etcdserver结构体。

    func RegisterKVServer(s *grpc.Server, srv KVServer) {
    	s.RegisterService(&_KV_serviceDesc, srv)
    }
    
    • 1
    • 2
    • 3

    在/api/etcdserverpb/rpc.pb.go里面,可以看到上面定义的ServiceName和MethodName,可以找到Put方法对应的函数_KV_Put_Handler。
    在这里插入图片描述
    从_KV_Put_Handler函数可以看到其最重要的语句就是srv.(KVServer).Put(ctx, in),其实就是调用的quotaKVServer.(KVServer).Put(ctx, in)

    func _KV_Put_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    	in := new(PutRequest)
    	if err := dec(in); err != nil { return nil, err }
    	if interceptor == nil { return srv.(KVServer).Put(ctx, in) }
    	info := &grpc.UnaryServerInfo{
    		Server:     srv,
    		FullMethod: "/etcdserverpb.KV/Put",
    	}
    	handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(KVServer).Put(ctx, req.(*PutRequest)) }
    	return interceptor(ctx, in, info, handler)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    因此这里从NewQuotaKVServer看出,quotaKVServer和pb.KVServer接口是组合关系,NewQuotaKVServer函数中调用NewKVServer函数初始化继承了pb.KVServer接口的kvServer结构体。

    // server/etcdserver/api/v3rpc/quota.go
    type quotaKVServer struct {
    	pb.KVServer
    	qa quotaAlarmer
    }
    func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
    	return &quotaKVServer{
    		NewKVServer(s),
    		quotaAlarmer{newBackendQuota(s, "kv"), s, s.MemberId()},
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    etcdserver.RaftKV接口继承了KVServer接口。而pb.RegisterKVServer(grpcServer, NewQutaKVServer(s))中的s指的就是ETCD数据库中的最重要的etcdserver结构体,所以这里的etcdserver.RaftKV指向的就是etcdserver结构体。

    type kvServer struct {
    	hdr header // 用于填充响应消息的头信息
    	kv  etcdserver.RaftKV
    	// maxTxnOps is the max operations per txn.
    	// e.g suppose maxTxnOps = 128.
    	// Txn.Success can have at most 128 operations,
    	// and Txn.Failure can have at most 128 operations.
    	maxTxnOps uint
    }
    func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
    	return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
    }
    func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
    	if err := checkPutRequest(r); err != nil { return nil, err }
    	resp, err := s.kv.Put(ctx, r)
    	if err != nil { return nil, togRPCError(err) }
    	s.hdr.fill(resp.Header)
    	return resp, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    kvServer结构体Put函数处理流程如下:首先会对请求消息进行各方面的检查,检查完之后会将所有的请求交给其内封装的RaftKV接口进行处理,待处理完成得到响应消息之后,会通过header.fill()方法填充响应的头信息,最后将完整的响应消息返回给客户端。因此,往下追踪 srv.(KVServer).Put(ctx, in)其实就是调用(s *EtcdServer) Put()函数。

    // server/etcdserver/v3_server.go
    func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
    	ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
    	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
    	if err != nil { return nil, err }
    	return resp.(*pb.PutResponse), nil
    }
    func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
    	return s.raftRequestOnce(ctx, r)
    }
    func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
    	result, err := s.processInternalRaftRequestOnce(ctx, r)
    	if err != nil { return nil, err }
    	if result.Err != nil { return nil, result.Err }
    	if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.Trace != nil {
    		applyStart := result.Trace.GetStartTime()
    		// The trace object is created in toApply. Here reset the start time to trace
    		// the raft request time by the difference between the request start time
    		// and toApply start time
    		result.Trace.SetStartTime(startTime)
    		result.Trace.InsertStep(0, applyStart, "process raft request")
    		result.Trace.LogIfLong(traceThreshold)
    	}
    	return result.Resp, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    从上面流程看出最终调用的是 (s *EtcdServer) processInternalRaftRequestOnce(…)函数,在该函数里面有一句关键调用 s.r.Propose(cctx, data)。s是EtcdServer, r是其里面的成员变量raftNode, 这就是进入raft协议相关的节奏了。

    func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
    	ai := s.getAppliedIndex()
    	ci := s.getCommittedIndex()
    	if ci > ai+maxGapBetweenApplyAndCommitIndex { return nil, errors.ErrTooManyRequests }
    	r.Header = &pb.RequestHeader{ ID: s.reqIDGen.Next(), }
    	// check authinfo if it is not InternalAuthenticateRequest
    	if r.Authenticate == nil {
    		authInfo, err := s.AuthInfoFromCtx(ctx)
    		if err != nil { return nil, err }
    		if authInfo != nil {
    			r.Header.Username = authInfo.Username
    			r.Header.AuthRevision = authInfo.Revision
    		}
    	}
    	data, err := r.Marshal()
    	if err != nil { return nil, err }
    	if len(data) > int(s.Cfg.MaxRequestBytes) { return nil, errors.ErrRequestTooLarge }
    
    	id := r.ID
    	if id == 0 { id = r.Header.ID }
    	ch := s.w.Register(id)
    	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
    	defer cancel()
    
    	start := time.Now()
    	err = s.r.Propose(cctx, data)
    	if err != nil {
    		proposalsFailed.Inc()
    		s.w.Trigger(id, nil) // GC wait
    		return nil, err
    	}
    	proposalsPending.Inc()
    	defer proposalsPending.Dec()
    
    	select {
    	case x := <-ch:
    		return x.(*apply2.Result), nil
    	case <-cctx.Done():
    		proposalsFailed.Inc()
    		s.w.Trigger(id, nil) // GC wait
    		return nil, s.parseProposeCtxErr(cctx.Err(), start)
    	case <-s.done:
    		return nil, errors.ErrStopped
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    欢迎关注微信公众号肥叔菌,最新内容优先在微信公众号发布。

  • 相关阅读:
    【人工智能】知识图谱
    后缀自动机(SAM)+广义后缀自动机(GSA)
    若依 从下载到成功运行及打包
    fastjson 1.2.47 远程命令执行漏洞
    【编程题】【Scratch二级】2022.06 大鱼吃小鱼
    浏览器tab页签上的title图标favicon.icon
    如何搭建一个基础的springmvc+mybatis项目
    关于log4net的详细使用教程
    天宇优配|上架秒光 “3时代”的大额存单受宠
    QT配置MySQL数据库 && ninja: build stopped: subcommand failed
  • 原文地址:https://blog.csdn.net/asmartkiller/article/details/125519606