• [源码解析] TensorFlow 分布式环境(2)---Master 静态逻辑


    [源码解析] TensorFlow 分布式环境(2)---Master 静态逻辑

    在具体介绍 TensorFlow 分布式的各种 Strategy 之前,我们首先需要看看分布式的基础:分布式环境。只有把基础打扎实了,才能在以后的分析工作之中最大程度的扫清障碍,事半功倍。本文梳理下 Master 的静态逻辑。

    本系列其他文章是:

    [翻译] TensorFlow 分布式之论文篇 "TensorFlow : Large-Scale Machine Learning on Heterogeneous Distributed Systems"

    [翻译] TensorFlow 分布式之论文篇 "Implementation of Control Flow in TensorFlow"

    [源码解析] TensorFlow 分布式环境(1) --- 总体架构

    1. 总述

    Server 上运行了两个 RPC 服务,分别是MasterService 和 WorkerService。如果 Client 接入到Server,那么Server 就是 Master 角色,Client 访问的就是 MasterService 服务(MasterService 同时负责协调和控制多个 WorkerService 的执行过程)。

    Master 这个角色的具体实现是 Master Service。Master Service是一个GRPC service,用于与一系列远端的分布式设备进行交互来协调多个worker service。

    • Master Service 对应了 "//tensorflow/core/protobuf/master_service.proto",其内部有 CreateSession,RunStep 等接口,所有的 TensorFlow Server 都实现了 Master Service。
    • 客户端可以与 Master Service 交互以执行分布式 TensorFlow 计算。
    • 一个 Master Service 会跟踪多个 "主会话(master sessions)"。每个 master sessions 封装了一个计算图及其相关状态。
    • Master session 运行在 Master 之上,在会话建立后,master 返回一个句柄给客户端,该句柄可用于关联客户端和主会话。
    • 每个 Master session 通常对应一个 "客户会话(client session)"。客户端可以通过调用 CreateSession 向 master 发送一个初始图,通过调用 ExtendSession 向图添加节点。
    • 这里需要说明下,Master 即是一个概念角色,比如 Master 节点,也有一个具体 Master 类。

    2. 接口

    2.1 接口规范

    Client 通过 GrpcSession 调用 Master Service,既然是 RPC 服务,那么 Client 和 MasterService 之间就需要有一个接口规范。这个规范定义在 master_service.proto 文件中,其定义了各个接口的消息体。

    service MasterService {
      // Creates a session.
      rpc CreateSession(CreateSessionRequest) returns (CreateSessionResponse);
    
      // Extends a session.
      rpc ExtendSession(ExtendSessionRequest) returns (ExtendSessionResponse);
    
      // Prepares future partial run calls.
      rpc PartialRunSetup(PartialRunSetupRequest) returns (PartialRunSetupResponse);
    
      // Drives the graph computation.
      rpc RunStep(RunStepRequest) returns (RunStepResponse);
    
      // Closes a session.
      rpc CloseSession(CloseSessionRequest) returns (CloseSessionResponse);
    
      // List the devices usable by the master.
      rpc ListDevices(ListDevicesRequest) returns (ListDevicesResponse);
    
      // Close and abandon all existing sessions.  Ongoing computations
      // will no longer affect fresh ones via the resources in containers listed in
      // the ResetRequest.  See ResetRequest for more details.
      rpc Reset(ResetRequest) returns (ResetResponse);
    
      // Registers a callable for execution with RunCallable.
      rpc MakeCallable(MakeCallableRequest) returns (MakeCallableResponse);
    
      // Executes a callable registered with MakeCallable.
      rpc RunCallable(RunCallableRequest) returns (RunCallableResponse);
    
      // Frees resources associated with a callable registered with MakeCallable.
      rpc ReleaseCallable(ReleaseCallableRequest) returns (ReleaseCallableResponse);
    }
    

    2.2 MasterInterface

    Client 使用接口 MasterInterface 获取远端 MasterService 的服务。MasterInterface 是接口类,是 Client 与 TensorFlow Master service 进行通信的抽象接口。这个接口既支持基于 RPC 的 master 实现,也支持不需要 RPC 往返的进程内部的 master 实现。MasterInterface 所有接口都是同步接口,这样 Client 就像调用本地函数一样调用远端 MasterService 提供的服务。

    MasterInterface有两种实现,都是用来和 Master service 进行通信,

    • LocalMaster 用于进程间的直接通信,此时 Client 和 Master 在同一个进程。
    • GrpcRemoteMaster 则使用 Grpc 来和 Master service 进行通信,此时 Client 和 Master 分别部署在两个不同进程。
      • 可以调用工厂方法 NewGrpcMaster 生成 GrpcRemoteMaster 实例。
      • GrpcRemoteMaster 其实就实现了 gRPC 客户端,它通过 Stub 访问远端 Master 上的 MasterService 服务,具体服务是 GrpcMasterService。
      • 因为 MasterInterface 都是同步接口,所以 Client 就好像访问本地函数一样访问 MasterService。
    class MasterInterface {
     public:
      virtual ~MasterInterface() {}
      virtual Status CreateSession(CallOptions* call_options,
                                   const CreateSessionRequest* request,
                                   CreateSessionResponse* response) = 0;
    
      virtual Status ExtendSession(CallOptions* call_options,
                                   const ExtendSessionRequest* request,
                                   ExtendSessionResponse* response) = 0;
    
      virtual Status PartialRunSetup(CallOptions* call_options,
                                     const PartialRunSetupRequest* request,
                                     PartialRunSetupResponse* response) {
        return errors::Unimplemented("Partial run not implemented for this master");
      }
    
      virtual Status RunStep(CallOptions* call_options,
                             RunStepRequestWrapper* request,
                             MutableRunStepResponseWrapper* response) = 0;
    
      virtual Status RunStep(CallOptions* call_options,
                             const RunStepRequest* request,
                             RunStepResponse* response) {
        std::unique_ptr<RunStepRequestWrapper> wrapped_request(
            new ProtoRunStepRequest(request));
        std::unique_ptr<MutableRunStepResponseWrapper> wrapped_response(
            new NonOwnedProtoRunStepResponse(response));
        return RunStep(call_options, wrapped_request.get(), wrapped_response.get());
      }
    
      virtual MutableRunStepRequestWrapper* CreateRunStepRequest() {
        MutableProtoRunStepRequest* ret = new MutableProtoRunStepRequest;
        ret->request_.set_request_id(GetUniqueRequestId());
        return ret;
      }
    
      virtual MutableRunStepResponseWrapper* CreateRunStepResponse() {
        return new OwnedProtoRunStepResponse;
      }
    
      virtual Status CloseSession(CallOptions* call_options,
                                  const CloseSessionRequest* request,
                                  CloseSessionResponse* response) = 0;
    
      virtual Status ListDevices(CallOptions* call_options,
                                 const ListDevicesRequest* request,
                                 ListDevicesResponse* response) = 0;
    
      virtual Status Reset(CallOptions* call_options, const ResetRequest* request,
                           ResetResponse* response) = 0;
    
      virtual Status MakeCallable(CallOptions* call_options,
                                  const MakeCallableRequest* request,
                                  MakeCallableResponse* response) = 0;
      virtual Status RunCallable(CallOptions* call_options,
                                 const RunCallableRequest* request,
                                 RunCallableResponse* response) = 0;
      virtual Status ReleaseCallable(CallOptions* call_options,
                                     const ReleaseCallableRequest* request,
                                     ReleaseCallableResponse* response) = 0;
    
     protected:
      // NOTE: This should only be called by implementations of this
      // interface whose CreateRunStepResponse() method returns a
      // proto-based wrappers for the RunStepResponse message.
      RunStepResponse* get_proto_from_wrapper(
          MutableRunStepResponseWrapper* wrapper) {
        return wrapper->get_proto();
      }
    };
    

    具体使用如下,如果 Client 和 Master 在同一个进程,则直接使用 LocalMaster,否则使用 GrpcRemoteMaster 来利用 gRPC 访问远程 GrpcMasterService。图上两个矩形封装的 Master 代表实际的 Master 类,此类实现了具体 Master 功能。

    图 1 Master 逻辑结构

    2.3 调用

    下面的伪代码说明了客户端如何与 master 交互,这其实就是分布式模式之中,使用 GrpcRemoteMaster 来通过 gRPC 与远端 MasterSerivce 服务交互的过程。

    stub = NewStub("/job:mnist/replica:0/task:0")
    {handle} = stub->CreateSession({graph_def})
      
    do {
       stub->RunStep({handle, {feeds}, {fetches}})
       // The client can evaluate a predicate locally, based on the
       // result of fetches, to determine whether to terminate. For
       // example, it might fetch the loss and evaluate whether it is less
       // than some threshold.
    } while (!should_stop({fetches}));
    
    stub->CloseSession({handle})
    

    3. LocalMaster

    当 Client 调用时候,GrpcSession 使用 LocalMaster 获取本地master,如果没有得到,则才使用 GrpcRemoteMaster。此时 Client 和 master 没有跨节点,LocalMaster 使客户端和master之间能够直接进行进程内通信,这样就可以给同进程内部的Client提供更高效的Master服务。

    3.1 定义

    LocalMaster 定义如下,主要成员变量就是 master_impl_。LocalMaster 其实就是一个壳而已,直接转发给master_impl_。master_impl_ 是当 Client 和 master 没有跨节点时候,本地直接调用的类。

    class LocalMaster : public MasterInterface {
     private:
      Master* master_impl_;  // Not owned.
      const int64 default_timeout_in_ms_;
    
      // See LocalMaster::Lookup for the factory function that creates
      // objects of this type.
      LocalMaster(Master* master_impl, const int64 default_timeout_in_ms);
    
      TF_DISALLOW_COPY_AND_ASSIGN(LocalMaster);
    };
    

    3.2 注册

    LocalMaster 有一个静态变量 local_master_registry_ 用来注册。

    typedef std::unordered_map<string, MasterInfo> LocalMasterRegistry;
    
    LocalMasterRegistry* local_master_registry() {
      static LocalMasterRegistry* local_master_registry_ = new LocalMasterRegistry;
      return local_master_registry_;
    }
    

    在 GrpcServer 初始化时候,调用如下代码把 target="grpc://" 生成的 Master 注册到本地 LocalMaster。

    LocalMaster::Register(target(), master_impl_.get(), config.operation_timeout_in_ms());
    

    就是把 master 注册到这个static变量 local_master_registry_ 之中。

    /* static */
    void LocalMaster::Register(const string& target, Master* master,
                               int64 default_timeout_in_ms) {
      mutex_lock l(*get_local_master_registry_lock());
      local_master_registry()->insert(
          {target, MasterInfo(master, default_timeout_in_ms)});
    }
    

    3.3 查找

    当调用 GrpcSession::Create 方法时候,如果 Client 和 Master 在同一个进程,Lookup 在本地能够找到注册的 Master,则会生成一个 LocalMaster 返回,同时 LocalMaster 的 master_impl_ 就配置成找到的 Master。如果找不到,就返回空,则 GrpcSession::Create 方法会创建一个 GrpcRemoterMaster,这样就同远端 Master 进行交互。

    /* static */
    std::unique_ptr<LocalMaster> LocalMaster::Lookup(const string& target) {
      std::unique_ptr<LocalMaster> ret;
      mutex_lock l(*get_local_master_registry_lock());
      auto iter = local_master_registry()->find(target);
      if (iter != local_master_registry()->end()) {
        ret.reset(new LocalMaster(iter->second.master,
                                  iter->second.default_timeout_in_ms));
      }
      return ret;
    }
    

    以下是同一个进程,Lookup 可以找到的情况,生成 LocalMaster 进行本地操作。

    图 2 同进程 master 操作

    我们看看不同进程的情况。此时进程 1 之中的 LocalMaster 没有指向任何 Master,因为本地没有启动 Server,所以 GrpcSession::Create 方法第一步 Lookup 调用失败,返回 Null,GrpcSession::Create 方法执行第二步骤,创建 GrpcRemoteMaster,进行远程交互。进程 2 之中,LocalMaster 因为没有客户端调用 GrpcSession::Create 方法,所以也没有指向任何 Master。

    图 3 跨进程 master 操作

    3.4 功能

    LocalMaster 调用到其内部成员变量 master_impl_ 来完成业务功能。

    Status LocalMaster::CreateSession(CallOptions* call_options,
                                      const CreateSessionRequest* request,
                                      CreateSessionResponse* response) {
      Notification n;
      Status ret;
      master_impl_->CreateSession(request, response, [&n, &ret](const Status& s) {
        ret.Update(s);
        n.Notify();
      });
      TF_RETURN_IF_ERROR(
          WaitForNotification(call_options, default_timeout_in_ms_, &n));
      return ret;
    }
    
    Status LocalMaster::ExtendSession(CallOptions* call_options,
                                      const ExtendSessionRequest* request,
                                      ExtendSessionResponse* response) {
      Notification n;
      Status ret;
      master_impl_->ExtendSession(request, response, [&n, &ret](const Status& s) {
        ret.Update(s);
        n.Notify();
      });
      TF_RETURN_IF_ERROR(
          WaitForNotification(call_options, default_timeout_in_ms_, &n));
      return ret;
    }
    
    Status LocalMaster::RunStep(CallOptions* call_options,
                                RunStepRequestWrapper* request,
                                MutableRunStepResponseWrapper* response) {
      Notification n;
      Status ret;
      master_impl_->RunStep(call_options, request, response,
                            [&n, &ret](const Status& s) {
                              ret.Update(s);
                              n.Notify();
                            });
      TF_RETURN_IF_ERROR(
          WaitForNotification(call_options, default_timeout_in_ms_, &n));
      return ret;
    }
    

    4. GrpcRemoteMaster

    GrpcRemoteMaster 是 gRPC 客户端的一种实现, 其终通过 Stub 调用远端 Master 上的 GrpcMasterService 服务,这样调用行为就犹如本地函数调用一样。远端 GrpcMasterService 实现了 MasterService 服务定义的所有接口,是 MasterService 服务的真正实体。当创建 GrpcRemoteMaster 实例时候,需要通过 target 来指定 Master 服务的地址和端口,并且创建对应的 RPC 通道。GrpcSession 和 GrpcRemoteMaster 从严格意义上讲都是 Client 实现的一部分。

    4.1 定义

    GrpcRemoteMaster 具体定义如下,主要是使用了MasterServiceStub。

    // GrpcRemoteMaster is an implementation of the MasterInterface
    // that uses gRPC to talk to the Master service.
    class GrpcRemoteMaster : public MasterInterface {
      using MasterServiceStub = grpc::MasterService::Stub;
    
     public:
      explicit GrpcRemoteMaster(const SharedGrpcChannelPtr& client_channel)
          : stub_(grpc::MasterService::NewStub(client_channel)) {}
    
      ~GrpcRemoteMaster() override {}
    
      std::unique_ptr<MasterServiceStub> stub_;
    };
    

    4.2 功能

    GrpcRemoteMaster 的功能很简单,就是通过 gRPC 的一 个 stub 调用远端 Master 服务的相应接口。

    4.2.1 CreateSession

    我们使用 CreateSession 为例看看,是使用 CallWithRetry 完成功能。

    Status CreateSession(CallOptions* call_options,
                         const CreateSessionRequest* request,
                         CreateSessionResponse* response) override {
      return CallWithRetry(call_options, request, response,
                           &MasterServiceStub::CreateSession);
    }
    

    CallWithRetry 代码如下,其又是调用 s = FromGrpcStatus((stub_.get()->*pfunc)(&ctx, *request, response)) 获取 Stub 来完成功能。

    template <typename Request, typename Response>
    Status CallWithRetry(CallOptions* call_options, const Request* request,
                         Response* response,
                         ::grpc::Status (MasterServiceStub::*pfunc)(
                             ::grpc::ClientContext*, const Request&, Response*),
                         string trace_string = {}) {
      absl::Duration timeout = absl::Milliseconds(call_options->GetTimeout());
      absl::Time expired_time = absl::FromUnixMicros(Env::Default()->NowMicros());
      if (timeout > absl::ZeroDuration()) {
        expired_time += timeout;
      }
      Status s;
      for (int num_retries = 0;; ++num_retries) {
        ::grpc::ClientContext ctx;
        std::unique_ptr<profiler::TraceMe> trace;
        if (!trace_string.empty()) {
          trace.reset(NewTraceRpc(trace_string, &ctx));
        }
        ctx.set_fail_fast(false);
        if (timeout > absl::ZeroDuration()) {
          // We do not modify the timeout here to match legacy behavior. However,
          // this could violate the contract of tensorflow::Session. If we retry
          // an RPC just before the deadline is exceeded, we will still set the
          // timeout to the original value. This leads to the overall timeout
          // being double what was expected.
          ctx.set_deadline(absl::ToChronoTime(absl::Now() + timeout));
        }
        s = FromGrpcStatus((stub_.get()->*pfunc)(&ctx, *request, response));
        if (!errors::IsUnavailable(s)) {
          return s;
        }
        // TODO(b/117162170): we may want to make this configurable.
        constexpr int kMaxRetries = 10;
        if (num_retries >= kMaxRetries) {
          return s;
        }
        absl::Time now = absl::FromUnixMicros(Env::Default()->NowMicros());
        const absl::Time deadline_with_backoff =
            now + absl::Microseconds(ComputeBackoffMicroseconds(num_retries));
        // Wait for a short period of time before retrying the RPC.  If our
        // backoff would put us past the RPC deadline, we truncate it to ensure
        // our RPC starts before the deadline.
        const auto backoff_until = (timeout <= absl::ZeroDuration() ||
                                    expired_time > deadline_with_backoff)
                                       ? deadline_with_backoff
                                       : expired_time;
        Env::Default()->SleepForMicroseconds(
            absl::ToInt64Microseconds(backoff_until - now));
        now = absl::FromUnixMicros(Env::Default()->NowMicros());
        if (now > expired_time && timeout > absl::ZeroDuration()) {
          // If timeout_in_ms is set, exit the retry loop on timeout.
          return errors::DeadlineExceeded(ctx.debug_error_string());
        }
      }
    }
    

    4.2.2 Master Service Stub

    接下来我们看看 Stub,这是依据 "//tensorflow/core/protobuf/master_service.proto" 来使用 grpc 实现的。

    class Stub final : public StubInterface {
     public:
      Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
      ::grpc::Status CreateSession(::grpc::ClientContext* context,
                                   const CreateSessionRequest& request,
                                   CreateSessionResponse* response) override;
      ::grpc::Status ExtendSession(::grpc::ClientContext* context,
                                   const ExtendSessionRequest& request,
                                   ExtendSessionResponse* response) override;
      ::grpc::Status PartialRunSetup(::grpc::ClientContext* context,
                                     const PartialRunSetupRequest& request,
                                     PartialRunSetupResponse* response) override;
      ::grpc::Status RunStep(::grpc::ClientContext* context,
                             const RunStepRequest& request,
                             RunStepResponse* response) override;
      ::grpc::Status CloseSession(::grpc::ClientContext* context,
                                  const CloseSessionRequest& request,
                                  CloseSessionResponse* response) override;
      ::grpc::Status ListDevices(::grpc::ClientContext* context,
                                 const ListDevicesRequest& request,
                                 ListDevicesResponse* response) override;
      ::grpc::Status Reset(::grpc::ClientContext* context,
                           const ResetRequest& request,
                           ResetResponse* response) override;
      ::grpc::Status MakeCallable(::grpc::ClientContext* context,
                                  const MakeCallableRequest& request,
                                  MakeCallableResponse* response) override;
      ::grpc::Status RunCallable(::grpc::ClientContext* context,
                                 const RunCallableRequest& request,
                                 RunCallableResponse* response) override;
      ::grpc::Status ReleaseCallable(::grpc::ClientContext* context,
                                     const ReleaseCallableRequest& request,
                                     ReleaseCallableResponse* response) override;
    
     private:
      std::shared_ptr< ::grpc::ChannelInterface> channel_;
      const ::grpc::internal::RpcMethod rpcmethod_CreateSession_;
      const ::grpc::internal::RpcMethod rpcmethod_ExtendSession_;
      const ::grpc::internal::RpcMethod rpcmethod_PartialRunSetup_;
      const ::grpc::internal::RpcMethod rpcmethod_RunStep_;
      const ::grpc::internal::RpcMethod rpcmethod_CloseSession_;
      const ::grpc::internal::RpcMethod rpcmethod_ListDevices_;
      const ::grpc::internal::RpcMethod rpcmethod_Reset_;
      const ::grpc::internal::RpcMethod rpcmethod_MakeCallable_;
      const ::grpc::internal::RpcMethod rpcmethod_RunCallable_;
      const ::grpc::internal::RpcMethod rpcmethod_ReleaseCallable_;
    };
    

    具体远端的对应方法是:

    static const char* grpcMasterService_method_names[] = {
        "/tensorflow.MasterService/CreateSession",
        "/tensorflow.MasterService/ExtendSession",
        "/tensorflow.MasterService/PartialRunSetup",
        "/tensorflow.MasterService/RunStep",
        "/tensorflow.MasterService/CloseSession",
        "/tensorflow.MasterService/ListDevices",
        "/tensorflow.MasterService/Reset",
        "/tensorflow.MasterService/MakeCallable",
        "/tensorflow.MasterService/RunCallable",
        "/tensorflow.MasterService/ReleaseCallable",
    };
    
    std::unique_ptr<MasterService::Stub> MasterService::NewStub(
        const std::shared_ptr< ::grpc::ChannelInterface>& channel,
        const ::grpc::StubOptions& options) {
      std::unique_ptr<MasterService::Stub> stub(new MasterService::Stub(channel));
      return stub;
    }
    

    Stub 内部调用 grpc 完成发送功能。

    ::grpc::Status MasterService::Stub::CreateSession(
        ::grpc::ClientContext* context, const CreateSessionRequest& request,
        CreateSessionResponse* response) {
      return ::grpc::internal::BlockingUnaryCall(
          channel_.get(), rpcmethod_CreateSession_, context, request, response);
    }
    

    所以,如果是 GrpcRemoteMaster,则调用流程应该是:GrpcRemoteMaster 接收到 grpc session 的请求,转交给 grpc master service,这期间经历了 GrpcSession -> GrpcRemoteMaster -> GrpcMasterService -> Master -> MasterSession 一系列流程。

    4.3 创建

    当建立 GrpcSession 时候,create 方法之中会先查找有没有 Master。如果找到了就直接返回 LocalMaster,这部分我们前面介绍过。如果 Lookup 找不到。所以会调用 NewGrpcMaster 生成一个 GrpcRemoteMaster。

    /* static */
    Status GrpcSession::Create(const SessionOptions& options,
                               std::unique_ptr<GrpcSession>* out_session) {
      std::unique_ptr<GrpcSession> session(new GrpcSession(options));
      std::unique_ptr<MasterInterface> master;
      // For testing, we enable the client to disable the use of the local
      // master registry, so that the RPC stack is exercised.
      if (!options.config.rpc_options().use_rpc_for_inprocess_master()) {
        master = LocalMaster::Lookup(options.target); 
      }
      if (!master) {
        SharedGrpcChannelPtr master_channel;
        TF_RETURN_IF_ERROR(
            NewHostPortGrpcChannel(options.target.substr(kSchemePrefixLength),
                                   &options.config.rpc_options(), &master_channel));
        // 建立 GrpcRemoteMaster,与远端 Master 交互
        master.reset(NewGrpcMaster(master_channel));
      } else {
        session->is_local_ = true;
      }
      session->SetRemoteMaster(std::move(master));
      *out_session = std::move(session);
      return Status::OK();
    }
    

    NewGrpcMaster 方法具体如下:

    MasterInterface* NewGrpcMaster(const SharedGrpcChannelPtr& channel) {
      return new GrpcRemoteMaster(channel);
    }
    

    5. GrpcMasterService

    GrpcMasterService 实现了 RPC 对应的 MasterService。GrpcMasterService 会:

    • 预先了解有哪些本地设备可以给客户使用,也会发现远端设备并且跟踪其统计数据。
    • 维护/管理实时计算图会话(MasterSession),这些会话将调用本地或者远端设备来对收到的计算图进行计算。
    • 会话功能是:对收到的计算图进行分析,剪枝,把节点放到可用设备上,通过调用 RunGraph 在工作者上进行图计算。

    5.1 创建

    GrpcServer 之中,master_service_ 是 GrpcMasterService 类型的变量。

      // 创建 Master 以及对应的 GrpcMasterService
      master_impl_ = CreateMaster(&master_env_);
      master_service_ = NewGrpcMasterService(master_impl_.get(), config, &builder);
    

    GrpcServer 使用 master_thread_ 线程来执行 GrpcMasterService 的 HandleRPCsLoop方法。

    master_thread_.reset(
        env_->StartThread(ThreadOptions(), "TF_master_service",
                          [this] { master_service_->HandleRPCsLoop(); }));
    

    5.2 定义

    GrpcMasterService 定义如下,master_impl_ 是 Server 传入的 master 指针,是一个 Master 类的实例:

    class GrpcMasterService : public AsyncServiceInterface {
      Master* master_impl_ = nullptr;  // Not owned.
      std::unique_ptr<::grpc::ServerCompletionQueue> cq_;
      grpc::MasterService::AsyncService master_service_;
    
      mutex mu_;
      bool is_shutdown_ TF_GUARDED_BY(mu_);
      const ConfigProto default_session_config_;
      ::grpc::Alarm* shutdown_alarm_ = nullptr;
    
      template <class RequestMessage, class ResponseMessage>
      using MasterCall = Call<GrpcMasterService, grpc::MasterService::AsyncService,
                              RequestMessage, ResponseMessage>;
    }
    

    GrpcMasterService 初始化时候,会得到 grpc 的消息队列 cq_。

    GrpcMasterService(Master* master, const ConfigProto& default_session_config,
                      ::grpc::ServerBuilder* builder)
        : master_impl_(master),
          is_shutdown_(false),
          default_session_config_(default_session_config) {
      builder->RegisterService(&master_service_);
      cq_ = builder->AddCompletionQueue();
    }
    

    5.3 主循环

    前面提到了,master_thread_ 线程来执行 GrpcMasterService 的 HandleRPCsLoop 方法。HandleRPCsLoop 会调用 GrpcMasterService 内部函数来进行处理RPC消息。主循环 HandleRPCsLoop 代码如下:

    void HandleRPCsLoop() override {
      ENQUEUE_REQUEST(CreateSession, true);
      ENQUEUE_REQUEST(ExtendSession, false);
      for (int i = 0; i < 100; ++i) {
        ENQUEUE_REQUEST(PartialRunSetup, false);
        ENQUEUE_REQUEST(RunStep, true);
      }
      ENQUEUE_REQUEST(CloseSession, false);
      ENQUEUE_REQUEST(ListDevices, false);
      ENQUEUE_REQUEST(Reset, false);
      ENQUEUE_REQUEST(MakeCallable, false);
      for (int i = 0; i < 100; ++i) {
        ENQUEUE_REQUEST(RunCallable, true);
      }
      ENQUEUE_REQUEST(ReleaseCallable, false);
    
      void* tag;
      bool ok;
      while (cq_->Next(&tag, &ok)) {
        UntypedCall<GrpcMasterService>::Tag* callback_tag =
            static_cast<UntypedCall<GrpcMasterService>::Tag*>(tag);
        if (callback_tag) {
          callback_tag->OnCompleted(this, ok);
        } else {
          // NOTE(mrry): A null callback_tag indicates that this is
          // the shutdown alarm.
          cq_->Shutdown();
        }
      }
    }
    

    上面代码之中有一些最佳实践,具体就是围绕 ENQUEUE_REQUEST 做了一些处理:

    • this->cq_ 是 grpc 队列。
    • ENQUEUE_REQUEST 宏会为给定的 RPC 方法名称创建一个新请求(比如 ENQUEUE_REQUEST(GetStatus, false) 就会生成一个 GetStatus 请求),这些请求将在 this->cq_ 之上进行排队。
    • 预先把一定数量的要处理的任务放入 cq_,如果任务被任务响应 handler 调用,则 handler 会调用ENQUEUE_REQUEST() 往队列之中补充一个同样的调用,这样可以确保完成队列 cq_ 有足够的任务来处理传入的请求,这样处理将不会阻塞,整体处理速度会提高。
    • 代码最后的 while 循环将读取 gRPC 队列中的内容,就是 gRPC 调用之后的收尾工作。
    #define ENQUEUE_REQUEST(method, supports_cancel)                              \
      do {                                                                        \
        mutex_lock l(mu_);                                                        \
        if (!is_shutdown_) {                                                      \
          Call<GrpcMasterService, grpc::MasterService::AsyncService,              \
               method##Request, method##Response>::                               \
              EnqueueRequest(&master_service_, cq_.get(),                         \
                             &grpc::MasterService::AsyncService::Request##method, \
                             &GrpcMasterService::method##Handler,                 \
                             (supports_cancel));                                  \
        }                                                                         \
      } while (0)
    

    5.4 消息处理

    在具体消息响应之中,会调用 master_impl_ 进行处理,当 Master 处理完成之后,处理函数将回调一个 lambda 表达式,向 Client 返回的响应消息。可以看到,代码在最后会使用 ENQUEUE_REQUEST 再插入一个同样类型的请求,比如下面最后会返回给 Client 一个 CreateSessionResponse。

    // RPC handler for creating a session.
    void CreateSessionHandler(
        MasterCall<CreateSessionRequest, CreateSessionResponse>* call) {
      CreateSessionRequest* rewritten_req = new CreateSessionRequest;
      rewritten_req->mutable_config()->MergeFrom(default_session_config_);
      rewritten_req->MergeFrom(call->request);
      master_impl_->CreateSession(rewritten_req, &call->response,
                                  [call, rewritten_req](const Status& status) {
                                    call->SendResponse(ToGrpcStatus(status));
                                    delete rewritten_req;
                                  });
      ENQUEUE_REQUEST(CreateSession, true);
    }
    

    5.5 功能

    GrpcMasterService 提供的 API 如下:

    static const char* grpcMasterService_method_names[] = {
        "/tensorflow.MasterService/CreateSession",
        "/tensorflow.MasterService/ExtendSession",
        "/tensorflow.MasterService/PartialRunSetup",
        "/tensorflow.MasterService/RunStep",
        "/tensorflow.MasterService/CloseSession",
        "/tensorflow.MasterService/ListDevices",
        "/tensorflow.MasterService/Reset",
        "/tensorflow.MasterService/MakeCallable",
        "/tensorflow.MasterService/RunCallable",
        "/tensorflow.MasterService/ReleaseCallable",
    };
    

    我们举出三个具体功能分析一下:

    5.5.1 CreateSession

    CreateSessionRequest 消息之中会带有 Client 设定的计算图和配置信息。Master 接收到请求之后,为这个 Client 建立一个 MasterSession 实例,并建立一个唯一地标识该 MasterSession 实例的 session_handle。这是通过 Master 类成员变量 std::unordered_map<string, MasterSession*> sessions_ 来完成的,session_handle 就是 string 类型。

    Master 返回消息 CreateSessionResponse 给 Client。CreateSessionResponse 消息中携带:

    • session_handle。Client 的 GrpcSession 据此和 Master 端的 MasterSession 建立关联,后续交互之中,Client 在消息内均会携带此 session_handle,随后,Client 与 Master 的所有交互中,在请求 消息中通过携带 session_handle,Master 通过它在 std::unordered_map<string, MasterSession*> sessions_ 会找到相对应的 MasterSession 实例。
    • 初始 graph_version。用于后续发起 ExtendSession 操作,往原始的计算图中追加新的节点。

    图 4 CreateSession

    具体响应代码如下:

    // RPC handler for creating a session.
    void CreateSessionHandler(
        MasterCall<CreateSessionRequest, CreateSessionResponse>* call) {
      CreateSessionRequest* rewritten_req = new CreateSessionRequest;
      rewritten_req->mutable_config()->MergeFrom(default_session_config_);
      rewritten_req->MergeFrom(call->request);
      master_impl_->CreateSession(rewritten_req, &call->response,
                                  [call, rewritten_req](const Status& status) {
                                    call->SendResponse(ToGrpcStatus(status));
                                    delete rewritten_req;
                                  });
      ENQUEUE_REQUEST(CreateSession, true);
    }
    

    5.5.2 ExtendSession

    当建立 Session 之后,Client 可以通过 ExtendSession 告诉 Master 我需要拓展原有计算图的规模 (只能追加子图,不能修改或删除)。

    在请求消息 ExtendSessionRequest 中有:

    • session_handle :用来查找哪一个 MasterSession 实例;
    • graph_def :需要加到计算图上的节点;
    • current_graph_version :需要拓展的计算图版本号;

    在在响应消息 ExtendSessionResponse 中返回 new_graph_version,其用于下一此 ExtendSession 操作。

    图 5 ExtendSession

    具体代码如下:

    // RPC handler for extending a session.
    void ExtendSessionHandler(
        MasterCall<ExtendSessionRequest, ExtendSessionResponse>* call) {
      master_impl_->ExtendSession(&call->request, &call->response,
                                  [call](const Status& status) {
                                    call->SendResponse(ToGrpcStatus(status));
                                  });
      ENQUEUE_REQUEST(ExtendSession, false);
    }
    

    5.5.3 RunStep

    客户端会迭代执行 RunStep,请求消息 RunStepRequest 的变量较多,比如:

    • session_handle :用来查找哪一个 MasterSession 实例;
    • feed :输入的 NamedTensor 列表;
    • fetch :待输出 Tensor 的名称列表;
    • target :执行节点列表;

    响应消息 RunStepResponse 主要携带:

    • tensor :输出的 Tensor 列表;

    图 6 RunStep

    消息定义具体如下:

    message RunStepRequest {
      // REQUIRED: session_handle must be returned by a CreateSession call
      // to the same master service.
      string session_handle = 1;
    
      // Tensors to be fed in the step. Each feed is a named tensor.
      repeated NamedTensorProto feed = 2;
    
      // Fetches. A list of tensor names. The caller expects a tensor to
      // be returned for each fetch[i] (see RunStepResponse.tensor). The
      // order of specified fetches does not change the execution order.
      repeated string fetch = 3;
    
      // Target Nodes. A list of node names. The named nodes will be run
      // to but their outputs will not be fetched.
      repeated string target = 4;
    
      // Options for the run call.
      RunOptions options = 5;
    
      // Partial run handle (optional). If specified, this will be a partial run
      // execution, run up to the specified fetches.
      string partial_run_handle = 6;
    
      // If true then some errors, e.g., execution errors that have long
      // error messages, may return an OK RunStepResponse with the actual
      // error saved in the status_code/status_error_message fields of the
      // response body. This is a workaround since the RPC subsystem may
      // truncate long metadata messages.
      bool store_errors_in_response_body = 7;
    
      // Unique identifier for this request. Every RunStepRequest must
      // have a unique request_id, and retried RunStepRequest must have
      // the same request_id. If request_id is zero, retry detection is disabled.
      int64 request_id = 8;
    }
    
    message RunStepResponse {
      // NOTE: The order of the returned tensors may or may not match
      // the fetch order specified in RunStepRequest.
      repeated NamedTensorProto tensor = 1;
    
      // Returned metadata if requested in the options.
      RunMetadata metadata = 2;
    
      // If store_errors_in_response_body is true in the request, then
      // optionally the server may return an OK status for the RPC and
      // fill the true status into the fields below, to allow for messages
      // that are too long to fit in metadata.
      error.Code status_code = 3;
      string status_error_message = 4;
    }
    

    具体代码如下:

    // RPC handler for running one step in a session.
    void RunStepHandler(MasterCall<RunStepRequest, RunStepResponse>* call) {
      auto* trace = TraceRpc("RunStep/Server", call->client_metadata());
      CallOptions* call_opts = new CallOptions;
      if (call->request.options().timeout_in_ms() > 0) {
        call_opts->SetTimeout(call->request.options().timeout_in_ms());
      } else {
        call_opts->SetTimeout(default_session_config_.operation_timeout_in_ms());
      }
      RunStepRequestWrapper* wrapped_request =
          new ProtoRunStepRequest(&call->request);
      MutableRunStepResponseWrapper* wrapped_response =
          new NonOwnedProtoRunStepResponse(&call->response);
      call->SetCancelCallback([call_opts]() { call_opts->StartCancel(); });
      master_impl_->RunStep(
          call_opts, wrapped_request, wrapped_response,
          [call, call_opts, wrapped_request, trace](const Status& status) {
            call->ClearCancelCallback();
            delete call_opts;
            delete wrapped_request;
            delete trace;
            if (call->request.store_errors_in_response_body() && !status.ok()) {
              call->response.set_status_code(status.code());
              call->response.set_status_error_message(status.error_message());
              call->SendResponse(ToGrpcStatus(Status::OK()));
            } else {
              call->SendResponse(ToGrpcStatus(status));
            }
          });
      ENQUEUE_REQUEST(RunStep, true);
    }
    

    6. 业务实现 Master 类

    6.1 创建

    前面提到了,GrpcServer 之中建立的是 Master 类的实例。

    std::unique_ptr<Master> GrpcServer::CreateMaster(MasterEnv* master_env) {
      return std::unique_ptr<Master>(new Master(master_env, 0.0));
    }
    

    这样,在收到 Client 的消息后,在具体消息响应之中,GrpcMasterService 的线程会调用 master_impl_ 进行处理,就是把业务逻辑委托给 Master 类来实现。所以我们接下来就看看 Master 如何处理。

    // RPC handler for creating a session.
    void CreateSessionHandler(
        MasterCall<CreateSessionRequest, CreateSessionResponse>* call) {
      CreateSessionRequest* rewritten_req = new CreateSessionRequest;
      rewritten_req->mutable_config()->MergeFrom(default_session_config_);
      rewritten_req->MergeFrom(call->request);
      master_impl_->CreateSession(rewritten_req, &call->response,
                                  [call, rewritten_req](const Status& status) {
                                    call->SendResponse(ToGrpcStatus(status));
                                    delete rewritten_req;
                                  });
      ENQUEUE_REQUEST(CreateSession, true);
    }
    
    

    6.2 定义

    Master 其实不是 MasterInterface 的派生类,其定义在tensorflow/core/distributed_runtime/master.cc。可以从成员变量 sessions_ 上看出来,主要就是管理 MasterSession。

    class Master {
    
     private:
      typedef Master ME;
    
      // Not owned.
      MasterEnv* env_ = nullptr;
    
      // Owned.
      mutex mu_;
    
      // shutdown_ is set to true by the dtor.
      condition_variable shutdown_cv_;
      bool shutdown_ TF_GUARDED_BY(mu_) = false;
      Thread* gc_thread_;
    
      // Maps session handles to sessions.
      std::unordered_map<string, MasterSession*> sessions_ TF_GUARDED_BY(mu_);
    
      // Moving average of step times.
      MovingAverage last_1000_steps_ TF_GUARDED_BY(mu_);
    
      // Cumulative number of steps executed.
      int64 step_count_ TF_GUARDED_BY(mu_);
    
      // If a session is not active for this many seconds, it will be
      // closed automatically.
      const double session_gc_seconds_;
    
      // Used to track ids for incoming requests so we can detect duplicates.
      RecentRequestIds recent_request_ids_;
    };
    

    6.3 功能

    我们回忆一下之前提到的。

    分布式运行的核心是如何操作计算图,但是计算功能被拆分为 Client,Master 和 Worker 三个角色。

    Client 负责构造计算图,Worker 负责执行具体计算,但是 Worker 怎么知道应该计算什么?TensorFlow 在两者之间插入了一个 Master 角色来负责协调,调度。

    虽然 Master 不是 MasterInterface 的派生类,但时其实现了 MasterService 的具体业务。Master 具体负责:

    • Master 预先知道本地有哪些设备可以作为客户使用的设备,也会发现远程设备,并跟踪这些远程设备的统计数据。
    • 一个 Master 包含多个 "主会话(master sessions)"。每个 master sessions 封装了一个计算图及其相关状态。
    • 主会话将:
      • 精简优化计算图,比如剪枝/分割/插入发送和接受算子。
      • 协调/调度资源。比如哪个计算应该在哪个设备运行,具体就是按照 graph -> Partition -> Device 这个策略把子图划分到硬件设备之上。
      • 把分割之后的各个子图发送给各个 worker,具体每一个子图对应一个 MasterSession。并最终通过在工作者上启动 RunGraph 来驱动图的计算。
    • Master 维护实时图计算会话的状态。

    至此,Master 的静态结构我们已经介绍完毕,具体 Master 功能我们将在后文 Session 部分进行具体介绍。

    最后,强烈推荐两个大神:

    0xFF 参考

    TensorFlow Internals

    TensorFlow架构与设计:概述

    TensorFlow内核剖析

    TensorFlow架构与设计:OP本质论

    [译] TensorFlow 白皮书

    2017TensorFlow开发者峰会

    https://jcf94.com/2018/02/28/2018-02-28-tfunpacking3/

    TensorFlow 拆包(五):Distributed

    TensorFlow Architecture

    『深度长文』Tensorflow代码解析(五)

    什么是in-graph replication和between-graph replication?

    [腾讯机智] TensorFlow源码解析(1): 创建会话

    05tensorflow分布式会话

    第八节,配置分布式TensorFlow

    TensorFlow 分布式(Distributed TensorFlow)

    tensorflow源码解析之distributed_runtime

    Distributed TensorFlow: A Gentle Introduction

    一文说清楚Tensorflow分布式训练必备知识

    TensorFlow中的Placement启发式算法模块——Placer

    TensorFlow的图切割模块——Graph Partitioner

    TensorFlow中的通信机制——Rendezvous(一)本地传输

    TensorFlow分布式采坑记

    TensorFlow技术内幕(九):模型优化之分布式执行

    Tensorflow架构流程]

  • 相关阅读:
    K8s - Pod配置容器
    flutter Chip 组件
    微信小程序案例3-2 计算器
    一个程序员的晋升之路
    干货 | 5719个字详解低代码在某银行&券商的实践
    强缓存与协商缓存
    服务器数据恢复-EMC存储磁盘损坏的RAID5数据恢复案例
    朴素贝叶斯模型
    8.谈谈线程安全:活跃性问题(死锁、活锁、饥饿)
    好玩的js特效
  • 原文地址:https://www.cnblogs.com/rossiXYZ/p/16024266.html