在ETCD数据库源码分析——etcd gRPC 服务 API注册服务小节,有如下grpc服务注册代码,包含了服务端PUT流程所属的KVServer。

RegisterKVServer函数定义在/api/etcdserverpb/rpc.pb.go文件中,这里pb.RegisterKVServer(grpcServer, NewQutaKVServer(s))中的s指的就是ETCD数据库中的最重要的etcdserver结构体。
func RegisterKVServer(s *grpc.Server, srv KVServer) {
s.RegisterService(&_KV_serviceDesc, srv)
}
在/api/etcdserverpb/rpc.pb.go里面,可以看到上面定义的ServiceName和MethodName,可以找到Put方法对应的函数_KV_Put_Handler。

从_KV_Put_Handler函数可以看到其最重要的语句就是srv.(KVServer).Put(ctx, in),其实就是调用的quotaKVServer.(KVServer).Put(ctx, in)。
func _KV_Put_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PutRequest)
if err := dec(in); err != nil { return nil, err }
if interceptor == nil { return srv.(KVServer).Put(ctx, in) }
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/etcdserverpb.KV/Put",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(KVServer).Put(ctx, req.(*PutRequest)) }
return interceptor(ctx, in, info, handler)
}
因此这里从NewQuotaKVServer看出,quotaKVServer和pb.KVServer接口是组合关系,NewQuotaKVServer函数中调用NewKVServer函数初始化继承了pb.KVServer接口的kvServer结构体。
// server/etcdserver/api/v3rpc/quota.go
type quotaKVServer struct {
pb.KVServer
qa quotaAlarmer
}
func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return "aKVServer{
NewKVServer(s),
quotaAlarmer{newBackendQuota(s, "kv"), s, s.MemberId()},
}
}
etcdserver.RaftKV接口继承了KVServer接口。而pb.RegisterKVServer(grpcServer, NewQutaKVServer(s))中的s指的就是ETCD数据库中的最重要的etcdserver结构体,所以这里的etcdserver.RaftKV指向的就是etcdserver结构体。
type kvServer struct {
hdr header // 用于填充响应消息的头信息
kv etcdserver.RaftKV
// maxTxnOps is the max operations per txn.
// e.g suppose maxTxnOps = 128.
// Txn.Success can have at most 128 operations,
// and Txn.Failure can have at most 128 operations.
maxTxnOps uint
}
func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
}
func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
if err := checkPutRequest(r); err != nil { return nil, err }
resp, err := s.kv.Put(ctx, r)
if err != nil { return nil, togRPCError(err) }
s.hdr.fill(resp.Header)
return resp, nil
}
kvServer结构体Put函数处理流程如下:首先会对请求消息进行各方面的检查,检查完之后会将所有的请求交给其内封装的RaftKV接口进行处理,待处理完成得到响应消息之后,会通过header.fill()方法填充响应的头信息,最后将完整的响应消息返回给客户端。因此,往下追踪 srv.(KVServer).Put(ctx, in)其实就是调用(s *EtcdServer) Put()函数。
// server/etcdserver/v3_server.go
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
if err != nil { return nil, err }
return resp.(*pb.PutResponse), nil
}
func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
return s.raftRequestOnce(ctx, r)
}
func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
result, err := s.processInternalRaftRequestOnce(ctx, r)
if err != nil { return nil, err }
if result.Err != nil { return nil, result.Err }
if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.Trace != nil {
applyStart := result.Trace.GetStartTime()
// The trace object is created in toApply. Here reset the start time to trace
// the raft request time by the difference between the request start time
// and toApply start time
result.Trace.SetStartTime(startTime)
result.Trace.InsertStep(0, applyStart, "process raft request")
result.Trace.LogIfLong(traceThreshold)
}
return result.Resp, nil
}
从上面流程看出最终调用的是 (s *EtcdServer) processInternalRaftRequestOnce(…)函数,在该函数里面有一句关键调用 s.r.Propose(cctx, data)。s是EtcdServer, r是其里面的成员变量raftNode, 这就是进入raft协议相关的节奏了。
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
ai := s.getAppliedIndex()
ci := s.getCommittedIndex()
if ci > ai+maxGapBetweenApplyAndCommitIndex { return nil, errors.ErrTooManyRequests }
r.Header = &pb.RequestHeader{ ID: s.reqIDGen.Next(), }
// check authinfo if it is not InternalAuthenticateRequest
if r.Authenticate == nil {
authInfo, err := s.AuthInfoFromCtx(ctx)
if err != nil { return nil, err }
if authInfo != nil {
r.Header.Username = authInfo.Username
r.Header.AuthRevision = authInfo.Revision
}
}
data, err := r.Marshal()
if err != nil { return nil, err }
if len(data) > int(s.Cfg.MaxRequestBytes) { return nil, errors.ErrRequestTooLarge }
id := r.ID
if id == 0 { id = r.Header.ID }
ch := s.w.Register(id)
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()
start := time.Now()
err = s.r.Propose(cctx, data)
if err != nil {
proposalsFailed.Inc()
s.w.Trigger(id, nil) // GC wait
return nil, err
}
proposalsPending.Inc()
defer proposalsPending.Dec()
select {
case x := <-ch:
return x.(*apply2.Result), nil
case <-cctx.Done():
proposalsFailed.Inc()
s.w.Trigger(id, nil) // GC wait
return nil, s.parseProposeCtxErr(cctx.Err(), start)
case <-s.done:
return nil, errors.ErrStopped
}
}
欢迎关注微信公众号肥叔菌,最新内容优先在微信公众号发布。