• [源码解析] NVIDIA HugeCTR,GPU 版本参数服务器---(8) ---Distributed Hash之后向传播


    [源码解析] NVIDIA HugeCTR,GPU 版本参数服务器---(8) ---Distributed Hash之后向传播

    0x00 摘要

    在这个系列中,我们介绍了 HugeCTR,这是一个面向行业的推荐系统训练框架,针对具有模型并行嵌入和数据并行密集网络的大规模 CTR 模型进行了优化。本文介绍 DistributedSlotSparseEmbeddingHash 的后向操作。

    其中借鉴了HugeCTR源码阅读 这篇大作,特此感谢。

    本系列其他文章如下:

    [源码解析] NVIDIA HugeCTR,GPU 版本参数服务器 --(1)

    [源码解析] NVIDIA HugeCTR,GPU版本参数服务器--- (2)

    [源码解析] NVIDIA HugeCTR,GPU版本参数服务器---(3)

    [源码解析] NVIDIA HugeCTR,GPU版本参数服务器--- (4)

    [源码解析] NVIDIA HugeCTR,GPU版本参数服务器--- (5) 嵌入式hash表

    [源码解析] NVIDIA HugeCTR,GPU版本参数服务器--- (6) --- Distributed hash表

    [源码解析] NVIDIA HugeCTR,GPU 版本参数服务器---(7) ---Distributed Hash之前向传播

    0x01 回顾

    前文我们介绍了Distributed Hash之前向传播过程,其逻辑流程如下:

    img

    本文我们来看看如何进行后向传播。

    0x02 总述

    反向传播是求各种权重的变化对最终的误差能造成什么样的影响,或者说是各种权重怎么调整能让预估误差尽可能小,其实就是给各种权重找到梯度下降最快的方向,让损失函数快速地全局达到一个最优点。

    2.1 注释

    我们从注释之中可以看到一共有如下思路,对于后向传播来说,就是计算梯度,然后更新嵌入表。我们后续就按照这个思路来分析代码。

    /**
     * All the CUDA kernel functions used by embedding layer are defined in this file, including
     * forward propagation, backward propagation. The functions are defined by propagation type
     * and combiner type(sum or mean) as below:
     *   1) forward
     *        sum: calling forward_sum_kernel()
     *        mean: calling foward_sum_kernel() + forward_scale_kernel()
     *   2) backward:
     *        calculating wgrad:
     *          sum: calling backward_sum_kernel()
     *          mean: calling backward_mean_kernel()
     *        update embedding table: including several steps as below,
     *          step1: expand sample IDs, calling sample_id_expand_kernel()
     *          step2: get value_index by key (will call hash_table->get_insert() in nv_hashtable lib)
     *          step3: sort by value_index (will call cub::DeviceRadixSort::SortPairs in cub lib)
     *          step4: count the number for each unduplicated value_index, calling value_count_kernel()
     *          step5: use optimizer method to compute deltaw, and record corresponding, including three
     * types of optimizer: Adam: caling opt_adam_kernel() Momentum sgd: calling
     * opt_momentum_sgd_kernel() Nesterov: calling opt_nesterov_kernel() step6: update embedding table
     * by deltaw, calling update_kernel()
     */
    

    2.2 代码

    在 session::train() 之中有如下代码,这些就对应了总体思路。

    • backward 进行反向传播计算。
    • exchange_wgrad 进行交换梯度。
    • update_params 来更新参数。
          // Embedding backward
          for (const auto& one_embedding : embeddings_) {
            one_embedding->backward();
          }
    
          // Exchange wgrad and update params
          if (networks_.size() > 1) {
    #pragma omp parallel num_threads(networks_.size())
            {
              size_t id = omp_get_thread_num();
              exchange_wgrad(id);
              networks_[id]->update_params();
            }
          } else if (resource_manager_->get_global_gpu_count() > 1) {
            exchange_wgrad(0);
            networks_[0]->update_params();
          }
          for (const auto& one_embedding : embeddings_) {
            one_embedding->update_params();
          }
    

    0x03 输入

    我们首先看看如何获取反向传播的输入。因为从嵌入层比较难以查找,我们换个思路,从 reshape 层来看看。

    3.1 定义

    可以看到,其主要成员变量就是输入 in_tensors_ 和输出 out_tensors_。

    /**
     * Layer which reshapes a 3D/2D input tensor to 2D output tensor,
     * e.g., (batch_size, n_slots, vector_size) to (batch_size, n_slots * vector_size),
     * e.g., (batch_size * n_slots, vector_size) to (batch_size, n_slots * vector_size),
     * If the input tensor is 3D, you can choose which slots participate by calling the different Ctor
     */
    template <typename T>
    class ReshapeLayerCPU : public LayerCPU {
      /*
       * stores the weight tensors of this layer.
       */
      Tensors2<T> weights_;
      /*
       * stores the weight gradient tensors of this layer.
       */
      Tensors2<T> wgrad_;
      /*
       * stores the references to the input tensors of this layer.
       */
      Tensors2<T> in_tensors_;
      /*
       * stores the references to the output tensors of this layer.
       */
      Tensors2<T> out_tensors_;
    
      bool in_place_;
      int batch_size_;
      int n_slot_;
      int vector_length_;
      size_t n_active_slot_;
      Tensor2<int> selected_tensor_;
      std::vector<int> selected_;
    }
    

    3.2 切换

    从代码可以知道,在训练时候就是反复利用了这两个成员变量 in_tensor 和 out_tensor 来做切换。

    • 前向传播时候,fprop是把数据从in_tensor拷贝到out_tensor。
    • 后向传播时候,bprop 是把数据从out_tensor拷贝到in_tensor。

    所以,前向传播的输入变量,在反向传播时候被用来作为输入变量。因此我们可以知道嵌入层也是这个套路。

    template <typename T>
    void ReshapeLayer<T>::fprop(bool is_train) {
      prop_common(true, is_train, get_gpu().get_stream());
    }
    
    template <typename T>
    void ReshapeLayer<T>::bprop() {
      prop_common(false, true, get_gpu().get_stream());
    }
    
    template <typename T>
    void ReshapeLayer<T>::prop_common(bool forward, bool is_train, cudaStream_t stream) {
      CudaDeviceContext context(get_device_id());
      Tensor2<T>& in_tensor = get_in_tensors(is_train)[0];
      Tensor2<T>& out_tensor = out_tensors_[0];
    
      if (in_place_) {
        if (forward) { // 前向传播
          CK_CUDA_THROW_(cudaMemcpyAsync(out_tensor.get_ptr(), in_tensor.get_ptr(),
                                         in_tensor.get_size_in_bytes(), cudaMemcpyDeviceToDevice,
                                         stream));
        } else { // 反向传播
          CK_CUDA_THROW_(cudaMemcpyAsync(in_tensor.get_ptr(), out_tensor.get_ptr(),
                                         out_tensor.get_size_in_bytes(), cudaMemcpyDeviceToDevice,
                                         stream));
        }
      } else {
        int block_size = 128;
        int n_block = get_gpu().get_sm_count() * 16;
        T* in = in_tensor.get_ptr();
        T* out = out_tensor.get_ptr();
        reshape_kernel<<<n_block, block_size>>>(in, out, batch_size_, n_slot_, vector_length_,
                                                selected_tensor_.get_ptr(), n_active_slot_, forward);
      }
    #ifndef NDEBUG
      CK_CUDA_THROW_(cudaDeviceSynchronize());
      CK_CUDA_THROW_(cudaGetLastError());
    #endif
    }
    

    0x04 backward

    4.1 总体代码

    由之前分析我们可以知道,反向传播时候,输入的梯度就是存储在embedding_data_.get_output_tensors(true)之中。总体代码分为两部分,第一步是使用all-gather 操作来在每个GPU之上都收集到所有样本的全部梯度。第二步是调用 functors_.backward进行计算。

    /** 
     * The first stage of backward propagation of embedding layer,
     * which only computes the wgrad by the dgrad from the top layer.
     */
    void backward() override {
      // Read dgrad from output_tensors -> compute wgrad
    
      // do all-gather to collect the top_grad
      size_t send_count = embedding_data_.get_batch_size_per_gpu(true) *
                          embedding_data_.embedding_params_.slot_num *
                          embedding_data_.embedding_params_.embedding_vec_size;
      functors_.all_gather(send_count, embedding_data_.get_output_tensors(true),
                           embedding_feature_tensors_, embedding_data_.get_resource_manager());
    
      // do backward
      functors_.backward(embedding_data_.embedding_params_.get_batch_size(true),
                         embedding_data_.embedding_params_.slot_num,
                         embedding_data_.embedding_params_.embedding_vec_size,
                         embedding_data_.embedding_params_.combiner, row_offset_allreduce_tensors_,
                         embedding_feature_tensors_, wgrad_tensors_,
                         embedding_data_.get_resource_manager());
    
      return;
    }
    

    4.2 AllGather

    反向传播的第一步是使用 all-gather 操作来在每个 GPU 之上都收集到的所有样本的全部梯度,这样后续可以进行计算并且更新每个 GPU 之上的参数。

    4.2.1 原理

    首先我们看 AllGather 原理。在执行 AllGather 操作时,K个处理器之中,每个处理器都会将来自每个处理器的N个值聚集成一个维度为K*N的输出。输出是按rank索引排序的。AllGather操作会受到不同rank或设备映射的影响,因为rank决定了数据布局。

    注意:执行ReduceScatter + AllGather,就等同于AllReduce操作。

    4.2.2 代码

    调用代码如下,可以看到其会把梯度从反向传播的输入 embedding_data_.get_output_tensors(true) 拷贝到 embedding_feature_tensors_。因此,embedding_feature_tensors_ 将会拥有所有的梯度。

      functors_.all_gather(send_count, embedding_data_.get_output_tensors(true),
                           embedding_feature_tensors_, embedding_data_.get_resource_manager());
    

    算子如下:

    /**
     * collection communication: all_gather.
     * @param send_count the count of elements will be sent.
     * @param send_tensors the send tensors of multi GPUs.
     * @param recv_tensors the recv tensors of multi GPUs.
     * @param device_resources all gpus device resources.
     * @param context gpu device context, for switching device.
     */
    template <typename Type>
    void SparseEmbeddingFunctors::all_gather(size_t send_count, const Tensors2<Type> &send_tensors,
                                             Tensors2<Type> &recv_tensors,
                                             const ResourceManager &resource_manager) {
      size_t local_gpu_count = resource_manager.get_local_gpu_count();
      size_t total_gpu_count = resource_manager.get_global_gpu_count();
    
      // need to know the Type
      ncclDataType_t type;
      switch (sizeof(Type)) {
        case 2:
          type = ncclHalf;
          break;
        case 4:
          type = ncclFloat;
          break;
        default:
          CK_THROW_(Error_t::WrongInput, "Error: Type not support by now");
      }
    
      // for multi GPUs, use NCCL to do All-Gather
      if (total_gpu_count > 1) {
        CK_NCCL_THROW_(ncclGroupStart());
        for (size_t id = 0; id < local_gpu_count; id++) {
          const auto &local_gpu = resource_manager.get_local_gpu(id);
          CK_NCCL_THROW_(ncclAllGather(send_tensors[id].get_ptr(),  // send buff
                                       recv_tensors[id].get_ptr(),  // recv buff
                                       send_count, type, local_gpu->get_nccl(),
                                       local_gpu->get_stream()));
        }
        CK_NCCL_THROW_(ncclGroupEnd());
      }
      // for single GPU, just do memcpyD2D
      else {  // total_gpu_count == 1
        const auto &local_gpu = resource_manager.get_local_gpu(0);
        CudaDeviceContext context(local_gpu->get_device_id());
        CK_CUDA_THROW_(cudaMemcpyAsync(recv_tensors[0].get_ptr(), send_tensors[0].get_ptr(),
                                       send_count * sizeof(Type), cudaMemcpyDeviceToDevice,
                                       local_gpu->get_stream()));
      }
    
      return;
    }
    

    4.3 backward

    这部分完成如下功能:计算本地每个gpu上的梯度。此函数完成之后,wgrad_tensors_ 成员变量就是本GPU计算产生的新梯度。

    // do backward
    functors_.backward(embedding_data_.embedding_params_.get_batch_size(true),
                       embedding_data_.embedding_params_.slot_num,
                       embedding_data_.embedding_params_.embedding_vec_size,
                       embedding_data_.embedding_params_.combiner, row_offset_allreduce_tensors_,
                       embedding_feature_tensors_, wgrad_tensors_,
                       embedding_data_.get_resource_manager());
    

    calculating wgrad,会选择如下两种之一:

    • sum: calling backward_sum_kernel() ;
    • mean: calling backward_mean_kernel();

    具体backward代码如下:

    template <typename TypeHashKey, typename TypeEmbeddingComp>
    void SparseEmbeddingFunctors::backward(size_t batch_size,
                                           const std::vector<size_t> &slot_num_per_gpu,
                                           size_t embedding_vec_size, int combiner,
                                           const Tensors2<TypeHashKey> &row_offset_allreduce_tensors,
                                           const Tensors2<TypeEmbeddingComp> &embedding_feature_tensors,
                                           Tensors2<TypeEmbeddingComp> &wgrad_tensors,
                                           const ResourceManager &resource_manager) {
      size_t local_gpu_count = resource_manager.get_local_gpu_count();
    
      CudaDeviceContext context;
      for (size_t id = 0; id < local_gpu_count; id++) { // 遍历本地GPU
        if (slot_num_per_gpu[id] == 0) {
          continue;
        }
    
        const auto &local_gpu = resource_manager.get_local_gpu(id);
        context.set_device(local_gpu->get_device_id());
        // 拿到某一个GPU对应的梯度和offset信息
        const TypeEmbeddingComp *top_grad = embedding_feature_tensors[id].get_ptr();
        const TypeHashKey *row_offset = row_offset_allreduce_tensors[id].get_ptr();
        TypeEmbeddingComp *wgrad = wgrad_tensors[id].get_ptr();
    
        // 计算更新本地梯度
        if (combiner == 0)  // sum
        {
          backward_sum(batch_size, slot_num_per_gpu[id], embedding_vec_size, top_grad, wgrad,
                       local_gpu->get_stream());
        } else if (combiner == 1)  // mean
        {
          backward_mean(batch_size, slot_num_per_gpu[id], embedding_vec_size, row_offset, top_grad,
                        wgrad, local_gpu->get_stream());
        } else {
          CK_THROW_(Error_t::WrongInput, "Invalid combiner type ");
        }
      }
    
      return;
    }
    

    我们以backward_sum 为例,这里采用了GPU多线程更新以加快速度。

    template <typename TypeEmbeddingComp>
    void backward_sum(size_t batch_size, size_t slot_num, size_t embedding_vec_size,
                      const TypeEmbeddingComp *top_grad, TypeEmbeddingComp *wgrad,
                      cudaStream_t stream) {
      const size_t grid_size = batch_size;  // each block corresponds to a sample
      const size_t block_size = embedding_vec_size;
      backward_sum_kernel<<<grid_size, block_size, 0, stream>>>(batch_size, slot_num,
                                                                embedding_vec_size, top_grad, wgrad);
    }
    
    // backward kernel function: for combiner=sum
    template <typename TypeEmbeddingComp>
    __global__ void backward_sum_kernel(int batch_size, int slot_num, int embedding_vec_size,
                                        const TypeEmbeddingComp *top_grad, TypeEmbeddingComp *wgrad) {
      int tid = threadIdx.x;
      int bid = blockIdx.x;
    
      if (bid < batch_size && tid < embedding_vec_size) {
        for (int i = 0; i < slot_num; i++) {
          // 先找到某一个稠密张量的位置,再加上tid就得到了张量之中某一个元素(本tid对应的元素)的位置
          size_t feature_index = (size_t)(bid * slot_num + i) * embedding_vec_size + tid;
          // 更新梯度数值
          wgrad[feature_index] = top_grad[feature_index];
        }
      }
    }
    

    作为对比,贴出backward_mean_kernel,大家可以比对学习。

    // backward kernel function: for combiner=mean
    template <typename TypeKey, typename TypeEmbeddingComp>
    __global__ void backward_mean_kernel(int batch_size, int slot_num, int embedding_vec_size,
                                         const TypeKey *row_offset, const TypeEmbeddingComp *top_grad,
                                         TypeEmbeddingComp *wgrad) {
      int bid = blockIdx.x;
      int tid = threadIdx.x;
    
      if (bid < batch_size && tid < embedding_vec_size) {
        for (int i = 0; i < slot_num; i++) {
          size_t feature_row_index = bid * slot_num + i;
          int value_num = row_offset[feature_row_index + 1] - row_offset[feature_row_index];
          float scaler = 1.0f;
          if (value_num > 1) {
            scaler = 1.0f / value_num;  // partial derivatice of MEAN
          }
    
          size_t feature_index = feature_row_index * embedding_vec_size + tid;
          float g = TypeConvertFunc<float, TypeEmbeddingComp>::convert(top_grad[feature_index]);
          g *= scaler;
          wgrad[feature_index] = TypeConvertFunc<TypeEmbeddingComp, float>::convert(g);
        }
      }
    }
    

    现在,wgrad_tensors_ 之中已经是本地 GPU 产生的梯度了,需要根据这个来更新嵌入层权重,就是更新 hash_table_value 的内容。

    0x05 ExchangeWgrad

    session.train 接下来会交换梯度和更新网络参数。

          // Exchange wgrad and update params
          if (networks_.size() > 1) {
    #pragma omp parallel num_threads(networks_.size())
            {
              size_t id = omp_get_thread_num();
              exchange_wgrad(id);
              networks_[id]->update_params();
            }
          } else if (resource_manager_->get_global_gpu_count() > 1) {
            exchange_wgrad(0);
            networks_[0]->update_params();
          }
    

    具体代码如下:

    void Session::exchange_wgrad(size_t device_id) {
      auto& gpu_resource = resource_manager_->get_local_gpu(device_id);
      CudaCPUDeviceContext context(gpu_resource->get_device_id());
      PROFILE_RECORD("exchange_wgrad.start", gpu_resource->get_stream(), false);
      exchange_wgrad_->allreduce(device_id, gpu_resource->get_stream());
      PROFILE_RECORD("exchange_wgrad.stop", gpu_resource->get_stream(), false);
    }
    

    5.1 定义

    从定义可以看到,ExchangeWgrad 的功能就是简单封装底层资源。

    class ExchangeWgrad {
     public:
      virtual void allocate() = 0;
      virtual void update_embed_wgrad_size(size_t size) = 0;
      virtual void allreduce(size_t device_id, cudaStream_t stream) = 0;
    };
    
    template <typename TypeFP>
    class NetworkExchangeWgrad : public ExchangeWgrad {
     public:
      const BuffPtrs<TypeFP>& get_network_wgrad_buffs() const { return network_wgrad_buffs_; }
      const BuffPtrs<TypeFP>& get_embed_wgrad_buffs() const { return null_wgrad_buffs_; }
      void allocate() final;
      void update_embed_wgrad_size(size_t size) final;
      void allreduce(size_t device_id, cudaStream_t stream);
      NetworkExchangeWgrad(const std::shared_ptr<ResourceManager>& resource_manager);
      ~NetworkExchangeWgrad() = default;
    
     private:
      BuffPtrs<TypeFP> network_wgrad_buffs_;
      BuffPtrs<TypeFP> null_wgrad_buffs_;
      std::vector<std::shared_ptr<GeneralBuffer2<CudaAllocator>>> bufs_;
      std::shared_ptr<ResourceManager> resource_manager_;
    
      AllReduceInPlaceComm::Handle ar_handle_;
    
      size_t network_wgrad_size_ = 0;
      size_t num_gpus_ = 0;
    };
    
    template <typename TypeFP>
    class GroupedExchangeWgrad : public ExchangeWgrad {
     public:
      const BuffPtrs<TypeFP>& get_network_wgrad_buffs() const { return network_wgrad_buffs_; }
      const BuffPtrs<TypeFP>& get_embed_wgrad_buffs() const { return embed_wgrad_buffs_; }
      void allocate() final;
      void update_embed_wgrad_size(size_t size) final;
      void allreduce(size_t device_id, cudaStream_t stream);
      GroupedExchangeWgrad(const std::shared_ptr<ResourceManager>& resource_manager);
      ~GroupedExchangeWgrad() = default;
    
     private:
      BuffPtrs<TypeFP> network_wgrad_buffs_;
      BuffPtrs<TypeFP> embed_wgrad_buffs_;
      std::vector<std::shared_ptr<GeneralBuffer2<CudaAllocator>>> bufs_;
      std::shared_ptr<ResourceManager> resource_manager_;
    
      AllReduceInPlaceComm::Handle ar_handle_;
    
      size_t network_wgrad_size_ = 0;
      size_t embed_wgrad_size_ = 0;
      size_t num_gpus_ = 0;
    };
    

    5.2 功能

    交换功能主要是使用底层 all_reduce 来完成操作。

    template <typename T>
    void NetworkExchangeWgrad<T>::allreduce(size_t device_id, cudaStream_t stream) {
      auto ar_comm = resource_manager_->get_ar_comm();
      ar_comm->all_reduce(ar_handle_, stream, device_id);
    }
    
    template <typename T>
    void GroupedExchangeWgrad<T>::allreduce(size_t device_id, cudaStream_t stream) {
      auto ar_comm = resource_manager_->get_ar_comm();
      ar_comm->all_reduce(ar_handle_, stream, device_id);
    }
    

    0x06 更新参数

    Session.train 接下来会让嵌入层来更新参数,具体是使用优化器进行更新。

          for (const auto& one_embedding : embeddings_) {
            one_embedding->update_params();
          }
    

    具体代码如下,其主要逻辑就是在优化器和backward()产生的wgrad合作下,更新hash table。

      /**
       * The second stage of backward propagation of embedding layer, which
       * updates the hash table by wgrad(from backward()) and optimizer.
       */
      void update_params() override {
        // accumulate times for adam optimizer
        embedding_data_.embedding_params_.opt_params.hyperparams.adam.times++;
    #pragma omp parallel num_threads(embedding_data_.get_resource_manager().get_local_gpu_count())
        {
          size_t id = omp_get_thread_num();
          CudaDeviceContext context(embedding_data_.get_local_gpu(id).get_device_id());
          // do update params operation
          embedding_optimizers_[id].update(
              embedding_data_.embedding_params_.get_batch_size(true),
              embedding_data_.embedding_params_.slot_num,
              embedding_data_.embedding_params_.embedding_vec_size, max_vocabulary_size_per_gpu_,
              *embedding_data_.get_nnz_array(true)[id],
              embedding_data_.get_row_offsets_tensors(true)[id], hash_value_index_tensors_[id],
              wgrad_tensors_[id], hash_table_value_tensors_[id],
              embedding_data_.get_local_gpu(id).get_sm_count(),
              embedding_data_.get_local_gpu(id).get_stream());
        }
    
        return;
      }
    

    这部分是反向操作的难点。现在的问题是,wgrad_tensors_ 之中已经是梯度了,需要根据这个来更新嵌入层权重,就是 hash_table_value。但是如何更新呢?比如怎样利用GPU多线程更新?是否需要更新 hash_value_index_index?我们接下来一步一步分析。

    6.1 问题和思路

    假如batch_size=2,slot_num=2,给出一个CSR例子格式如下(两个样本):

    *   40,50,10,20 // 样本1,slot 1
    *   30,50,10 // 样本1,slot 2
    *   30,20 // 样本2,slot 1
    *   10 // 样本2,slot 2
    * Will be convert to the form of:
    * row offset: 0,4,7,9,10
    * value: 40,50,10,20,30,50,10,30,20,10
    

    6.1.1 前向传播

    下图是前向传播的embedding look示例,最后生成的 embedding_feature 之中,embedding vector个数是:batch_size x slot_num,针对我们的例子:40,50,10,20,30,50,10,30,20,10,分成slot就是:[40,50,10,20],[30,50,10],[30,20],[10]。分别对应embedding_feature矩阵中的四行。

    注:最后输出的是 train_output_tensors_,中间变量为 embedding_feature,embedding_feature 经过了几次GPU之间的通信变化之后演化成了train_output_tensors_ ,两者维度相同,所以我们就使用 embedding_feature。下面图之中数字是构造出来,只供演示使用。

    我们给出 embedding_feature 之中第三条向量的计算过程,他对应了第二个样本的第一个slot,就是 "30,20"。所以就是从 hash_table_value 选出了第2行,第3行,对应位置元素相加,即图中给出的计算过程。

    6.1.2 后向传播

    我们再考虑后向传播。

    后向传播时候用梯度来更新权重,g31,g32,g33,g34 这一行就应该更新 hash_table_value 的第2行,第3行。另外,如果假设第二个样本的第一个slot 是 "30,20,20,20",那么其实就应该用梯度更新hash_table_value 的第2行三次,第3行一次。其实也可以看出来,这种更新不要知道 train_value的数值究竟是什么。

    6.1.3 思路

    我们先用常规思路来梳理一下上面例子:

    • sample_id 列表对应的是40,50,10,20,.....,20 是一个key,它在低维嵌入表 hash_table_value 之中对应一个稠密向量(第2行 10,20,30,40),里面是权重。
    • 嵌入层输出是embedding_feature。
      • embedding vector个数是:batch_size x slot_num,也就是说,CSR 有几行,这里就有几个向量。
      • 其中第三条向量对应了第二个样本的第一个slot,就是 "30,20"。所以就是从 hash_table_value 选出了第2行,第3行,对应位置元素相加: 10,220,330,440,550 = (10+100),(20+200),(30+300),(40+400),(50+500)。
    • 如果有了梯度稠密向量,其是被 hash table value 若干稠密向量做pooling得到的结果。
      • 比如,梯度矩阵第三条向量 g31,g32,g33,g34 对应的就是 embedding_feature 第三条向量 10,220,330,440,550,如果梯度更新权重,就应该更新hash_table_value 的第2行,第3行。
      • 如果样本slot之中有多个同样数值,比如第二个样本的第一个slot 是 "30,20,20,20",那么其实就应该用更新hash_table_value 的第 2 行三次,第 3 行一次。

    我们接着从CUDA角度来看如何更新,其目的是让每一个block 更新一个低维矩阵 hash_table_value 的一行,所以有几个问题:

    • 如何依据本GPU线程的 block id 找到其在低维矩稠密向量阵之中的row offset,假设是第二行。

    • 如何知道本 block 应该更新第二行几次。

    • 更新这几次,分别用哪一个梯度来更新。

      • 比如第1个梯度可能更新第二行,第三个梯度也可能更新第二行。针对我们的例子:40,50,10,20,30,50,10,30,20,10,分成slot就是:[40,50,10,20],[30,50,10],[30,20],[10]。分别对应梯度矩阵中的四行,所以需要从梯度矩阵之中1,2,4行的梯度来更新 10 对应的 hash_table_value。
      • 具体参见下图,这里 train_value 到 gradient 只是示意,就是逻辑上一一对应。

    这里有一个疑问,为什么不像前向传播那样操作,而是要另外重起炉灶呢?这是因为我们不需要知道样本数值就可以更新权重,不需要把40,50,10,20,.....,等等重新走一遍操作哈希表的流程。所以,接下来就看看HugeCTR如何解决这几个问题,这里代码比较烧脑。

    6.2 嵌入层更新

    我们首先看看嵌入层的总体代码和注释里面提到的思路。

    6.2.1 注释

    注释里面关于更新的部分有5步,我们可以看到其大致思路:

    •      step1: expand sample IDs, calling sample_id_expand_kernel();
      
    •      step2: get value_index by key (will call hash_table->get_insert() in nv_hashtable lib);
      
    •      step3: sort by value_index (will call cub::DeviceRadixSort::SortPairs in cub lib);
      
    •      step4: count the number for each unduplicated value_index, calling value_count_kernel();
      
    •      step5: use optimizer method to compute deltaw, and record corresponding;
      
    /**
     * All the CUDA kernel functions used by embedding layer are defined in this file, including
     * forward propagation, backward propagation. The functions are defined by propagation type
     * and combiner type(sum or mean) as below:
     *   1) forward
     *        sum: calling forward_sum_kernel()
     *        mean: calling foward_sum_kernel() + forward_scale_kernel()
     *   2) backward:
     *        calculating wgrad:
     *          sum: calling backward_sum_kernel()
     *          mean: calling backward_mean_kernel()
     *        update embedding table: including several steps as below,
     *          step1: expand sample IDs, calling sample_id_expand_kernel()
     *          step2: get value_index by key (will call hash_table->get_insert() in nv_hashtable lib)
     *          step3: sort by value_index (will call cub::DeviceRadixSort::SortPairs in cub lib)
     *          step4: count the number for each unduplicated value_index, calling value_count_kernel()
     *          step5: use optimizer method to compute deltaw, and record corresponding, including three
     * types of optimizer: Adam: caling opt_adam_kernel() Momentum sgd: calling
     * opt_momentum_sgd_kernel() Nesterov: calling opt_nesterov_kernel() step6: update embedding table
     * by deltaw, calling update_kernel()
     */
    

    6.2.2 update代码

    我们摘录 EmbeddingOptimizer::update 的代码如下,这里只是选择了Optimizer_t::AdaGrad相关部分,其通过 opt_adagrad_kernel 进行更新。这里可以清楚看到注释中的各个步骤,我们接下来就会逐一分析。

    template <typename TypeHashKey, typename TypeEmbeddingComp>
    void EmbeddingOptimizer<TypeHashKey, TypeEmbeddingComp>::update(
        size_t batch_size, size_t slot_num, size_t embedding_vec_size,
        size_t max_vocabulary_size_per_gpu, size_t nnz, const Tensor2<TypeHashKey> &row_offset,
        Tensor2<size_t> &hash_value_index, const Tensor2<TypeEmbeddingComp> &wgrad,
        Tensor2<float> &hash_table_value, size_t sm_count, cudaStream_t stream) {
      OptimizerTensor<TypeEmbeddingComp> &opt_tensor = opt_tensors_;
      OptParams &opt_params = param.opt_params;
      Tensor2<TypeHashKey> &sample_id = sample_id_tensors_;
      Tensor2<TypeHashKey> &sample_id_sort = sample_id_sort_tensors_;
      Tensor2<size_t> &hash_value_index_sort = hash_value_index_sort_tensors_;
      Tensor2<uint32_t> &hash_value_index_count_offset = hash_value_index_count_offset_tensors_;
      Tensor2<uint32_t> &new_hash_value_flag = new_hash_value_flag_tensors_;
      Tensor2<uint32_t> &hash_value_flag_sumed = hash_value_flag_sumed_tensors_;
      Tensor2<uint32_t> &hash_value_index_count_counter = hash_value_index_count_counter_tensors_;
      Tensor2<void> &temp_storage_sort = temp_storage_sort_tensors_;
      Tensor2<void> &temp_storage_scan = temp_storage_scan_tensors_;
    
      size_t block_size, grid_size;
    
      try {
        // step1: expand sample IDs
        block_size = 64;
        grid_size = (batch_size * slot_num - 1) / block_size + 1;
        sample_id_expand_kernel<<<grid_size, block_size, 0, stream>>>(
            batch_size, slot_num, row_offset.get_ptr(), sample_id.get_ptr());
    
        if (opt_params.optimizer == Optimizer_t::SGD &&
            opt_params.hyperparams.sgd.atomic_update) {  // for SGD, do atomic update
          const size_t block_size = embedding_vec_size;
          const size_t grid_size = min(max(1ul, nnz), sm_count * 32);
    
          float lr_scale = opt_params.lr / opt_params.scaler;
          opt_sgd_atomic_kernel<<<grid_size, block_size, 0, stream>>>(
              nnz, embedding_vec_size, lr_scale, hash_value_index.get_ptr(), sample_id.get_ptr(),
              wgrad.get_ptr(), hash_table_value.get_ptr());
        } else {
          // step3: sort by hash_value_index
          int end_bit = static_cast<int>(log2(static_cast<float>(max_vocabulary_size_per_gpu))) + 1;
          size_t temp_storage_sort_size = temp_storage_sort.get_size_in_bytes();
          CK_CUDA_THROW_(cub::DeviceRadixSort::SortPairs(
              temp_storage_sort.get_ptr(), temp_storage_sort_size, hash_value_index.get_ptr(),
              hash_value_index_sort.get_ptr(), sample_id.get_ptr(), sample_id_sort.get_ptr(), nnz, 0,
              end_bit, stream, false));
    
          // step4: count the number for each unduplicated hash_value_index
          CK_CUDA_THROW_(
              cudaMemsetAsync(hash_value_index_count_counter.get_ptr(), 0, sizeof(uint32_t), stream));
    
          constexpr size_t max_grid_size = 384;
          block_size = 256;
          grid_size = min(max_grid_size, (nnz - 1) / block_size + 1);
    
          value_count_kernel_1<<<grid_size, block_size, 0, stream>>>(
              nnz, hash_value_index_sort.get_ptr(), new_hash_value_flag.get_ptr());
    
          // a pinned memroy
          CK_CUDA_THROW_(cudaMemcpyAsync(&hash_hash_value_index_count_num,
                                         hash_value_index_count_counter.get_ptr(), sizeof(uint32_t),
                                         cudaMemcpyDeviceToHost, stream));
    
          // step5: use optimizer method to compute deltaw and update the parameters
          block_size = embedding_vec_size;
          grid_size = max(1, hash_hash_value_index_count_num);
    
          switch (opt_params.update_type) {
            case Update_t::Global: {
              switch (opt_params.optimizer) {
                case Optimizer_t::Adam: {
                }
                case Optimizer_t::AdaGrad: {
                  opt_adagrad_kernel<<<grid_size, block_size, 0, stream>>>(
                      hash_hash_value_index_count_num, embedding_vec_size, opt_params.lr,
                      opt_params.hyperparams.adagrad, opt_tensor.opt_accm_tensors_.get_ptr(),
                      sample_id_sort.get_ptr(), hash_value_index_sort.get_ptr(),
                      hash_value_index_count_offset.get_ptr(), wgrad.get_ptr(),
                      hash_table_value.get_ptr(), opt_params.scaler);
                  break;
                }
                case Optimizer_t::MomentumSGD:
                case Optimizer_t::Nesterov:
                case Optimizer_t::SGD:
                default:
                  CK_THROW_(Error_t::WrongInput, "Error: Invalid opitimizer type");
              }  // switch (optimizer)
              break;
            }
            case Update_t::Local: {
              switch (opt_params.optimizer) {
                case Optimizer_t::Adam: {
                }
                case Optimizer_t::AdaGrad: {
                  opt_adagrad_kernel<<<grid_size, block_size, 0, stream>>>(
                      hash_hash_value_index_count_num, embedding_vec_size, opt_params.lr,
                      opt_params.hyperparams.adagrad, opt_tensor.opt_accm_tensors_.get_ptr(),
                      sample_id_sort.get_ptr(), hash_value_index_sort.get_ptr(),
                      hash_value_index_count_offset.get_ptr(), wgrad.get_ptr(),
                      hash_table_value.get_ptr(), opt_params.scaler);
                  break;
                }
                case Optimizer_t::MomentumSGD:
                case Optimizer_t::Nesterov:
                case Optimizer_t::SGD:
                default:
                  CK_THROW_(Error_t::WrongInput, "Error: Invalid opitimizer type");
              }  // switch (optimizer)
              break;
            }
            case Update_t::LazyGlobal: {
              switch (opt_params.optimizer) {
                case Optimizer_t::Adam: {
                }
                case Optimizer_t::AdaGrad:
                case Optimizer_t::MomentumSGD:
                case Optimizer_t::Nesterov:
                case Optimizer_t::SGD: {
                  CK_THROW_(Error_t::WrongInput,
                            "Error: lazy global update is only implemented for Adam");
                  break;
                }
                default:
                  CK_THROW_(Error_t::WrongInput, "Error: Invalid opitimizer type");
              }
              break;
            }
            default:
              CK_THROW_(Error_t::WrongInput, "Error: Invalid update type");
          }  // switch (update type)
        }
    #ifndef NDEBUG
        cudaDeviceSynchronize();
        CK_CUDA_THROW_(cudaGetLastError());
    #endif
      } catch (const std::runtime_error &rt_err) {
        std::cerr << rt_err.what() << std::endl;
        throw;
      }
    
      return;
    }
    

    首先要说明,这里nnz(non-zero feature number per batch)来自如下,就是本样本之中非零key个数。

    std::vector<std::shared_ptr<size_t>>& get_nnz_array(bool is_train) { 
    	if (is_train) {    
    		return train_nnz_array_;  
    	} else {    
    		return evaluate_nnz_array_;  
    	}
    }
    

    我们接下来逐一看看这些步骤。

    6.3 拓展sample id

    这里对应了第一步,在后续代码之中,每个key对应了一个sample ID。总体思路就是找到每个 key(sample ID) 和梯度矩阵,或者说和embedding_feature之中哪一行相对应,我们后续就直接以 embedding_feature来看,暂时不考虑梯度矩阵 。可以大致理解为把样本id扩展为key id的列表。

    step1: expand sample IDs, calling sample_id_expand_kernel()
    

    就是调用 sample_id_expand_kernel 来拓展sample id。这里 sample_id 是成员变量 sample_id_tensors_的引用,这样就可以直接修改成员变量。

    Tensor2<TypeHashKey> sample_id_tensors_; /**< The temp memory to store the sample ids of hash table value in update_params(). */
    

    具体代码如下:

    Tensor2<TypeHashKey> &sample_id = sample_id_tensors_;
    
    // step1: expand sample IDs
    block_size = 64;
    grid_size = (batch_size * slot_num - 1) / block_size + 1;
    sample_id_expand_kernel<<<grid_size, block_size, 0, stream>>>(
        batch_size, slot_num, row_offset.get_ptr(), sample_id.get_ptr());
    

    通过前面分析我们知道,embedding vector个数是:batch_size x slot_num,也就是说,CSR 有几行,这里就有几个向量。所以这里就直接读取CSR行信息即可。即, sample_id_expand_kernel 会把 sample_id_tensors_ 设置为 CSR row offset(expand sample id by row_offset),就是找到 CSR row offset 之中的index。

    CSR row_offset = [0,4,7,9,10],样本之中key的数值是40,50,10,20,30,50,10,30,20,10,那么 40,50,10,20对应了 0,30,50,10对应了1,30,20对应了 2,10对应了3。因此,sample_id 数值是 [0,0,0,0,1,1,1,2,2,3],就是记录了该 batch 在 embedding_feature_tensors_ 之中的 row index。

    sample_id_expand_kernel 代码如下,这里几个重点:

    • gid 是grid ID,表示本线程对应了embedding_feature_tensors_ 哪个元素。
    • blockIdx 表示一个样本。
    • (batch_size * slot_num) 代表 本batch在 嵌入层输出 train_output_tensors_ 之中对应了多少行,或者说是在 embedding_feature_tensors_ 之中占据了多少行,其实 embedding_feature_tensors_ 也就这么大。
    • sample_id[offset + i] = gid; 目的就是记录该样本某key在 embedding_feature_tensors_ 之中的 row index(对应哪一行)。embedding_feature_tensors_ 这个稠密向量是由 hash_table_value 之中"CSR 本行的元素数目"个稠密向量做pooling得到的结果。
    // expand sample id by row_offset
    template <typename TypeKey>
    __global__ void sample_id_expand_kernel(int batch_size, int slot_num, const TypeKey *row_offset, TypeKey *sample_id) {
      
      // 本线程对应的grid id,其实对应的就是global thread id
      int gid = blockIdx.x * blockDim.x + threadIdx.x; 
    
      if (gid < (batch_size * slot_num)) { // 假如batch_size=2,slot_num=2,取值为 gid < 4
        // 并不是每个GPU线程都会走到这里,对应我们的假设,则只会取出gid = 0~3 这样的线程才会进行下面配置操作
        // 比如,假定gid取值范围8,那么只有gid=0,gid=1,gid=2,gid=3 这几个线程会进入if,执行操作,其余线程不会进入,比如grid=4就不会进入
        TypeKey offset = row_offset[gid]; // 拿到对应的个数,比如 row_offset[0],row_offset[1],row_offset[2]的数值
        int value_num = row_offset[gid + 1] - offset; // 拿到CSR 本行的元素数目
        for (int i = 0; i < value_num; i++) {
          sample_id[offset + i] = gid; // 记录该样本某key在 embedding_feature_tensors_ 之中的 row index
        }
      }
    }
    

    我们把目前涉及的变量整理如下,这里假定从CSR数值到hash_value_index_tensors_ 行的映射是取十位数,比如50就映射到第5行。

    名称 数值 意义
    CSR row offset 0,4,7,9,10 两个样本,两个slot,所以分成四行
    CSR value 40,50,10,20,30,50,10,30,20,10 样本内容
    hash_value_index_tensors_ 4,5,1,2,3,5,1,3,2,1 低维嵌入表的index,样本每个key对应一个,比如50对应了 hash_table_value 第5行
    hash_table_value 5 x 8 的矩阵 低维嵌入表,假定稠密向量长度是8,因为一共只有5个不同数字,所以只有5行
    embedding_feature_tensors_ 4 x 8 的矩阵 嵌入层输出的稠密向量。形状是(batch_size * slot_num) * embedding_vec_len
    sample_id 0,0,0,0,1,1,1,2,2,3 每个样本的每个key 对应了embedding_feature_tensors_ 中的 row index。比如CSR第一行是40,50,10,20,它们都为 embedding_feature_tensors_ 的第一行做出了贡献。

    6.4 从key得到value_index

    下面我们看看第二步,根据key获取到 hash table value index。

    step2: get value_index by key (will call hash_table->get_insert() in nv_hashtable lib)
    

    这部分只是在 test/utest/embedding/sparse_embedding_hash_cpu.hpp 之中有,因为是测试代码,所以此时哈希表没有数据,需要设置,训练代码不需要这一步

    对应代码就是:

    // step2: do hash table get() value_index by key
    int nnz = row_offset_[batchsize_ * slot_num_];
    hash_table_->get(hash_key_.get(), hash_value_index_.get(), nnz);
    

    HashTableCpu 的get方法如下:

      void get(const KeyType* keys, ValType* vals, size_t len) const {
        if (len == 0) {
          return;
        }
        for (size_t i = 0; i < len; i++) {
          auto it = table_->find(keys[i]);
          assert(it != table_->end() && "error: can't find key");
          vals[i] = it->second;
        }
      }
    

    6.5 排序

    这部分对应第三步:

    step3: sort by value_index (will call cub::DeviceRadixSort::SortPairs in cub lib)
    

    现在得到了:sample_id 数值是 [0,0,0,0,1,1,1,2,2,3],就是记录了该 batch 在 embedding_feature_tensors_ 之中的 row index。

    就是把 sample_id 按照 hash_value_index 来排序,最后排序结果放入 hash_value_index_sort 和 sample_id_sort。在我们例子之中,得到结果如下:hash_value_index_sort 是 [1,1,1,2,2,3,3,4,5,5]。sample_id_sort 是 [0,1,3,0,2,1,2,0,0,1 ]。

    我们还是用表格记录:

    名称 数值 意义
    CSR row offset 0,4,7,9,10 两个样本,两个slot,所以分成四行
    CSR value 40,50,10,20,30,50,10,30,20,10 样本内容
    hash_value_index_tensors_ 4,5,1,2,3,5,1,3,2,1 低维嵌入表的index,样本每个key对应一个,比如50对应了 hash_table_value 第5行
    hash_table_value 5 x 8 的矩阵 低维嵌入表,假定稠密向量长度是8,因为一共只有5个不同数字,所以只有5行
    embedding_feature_tensors_ 4 x 8 的矩阵 嵌入层输出的稠密向量。形状是(batch_size * slot_num) * embedding_vec_len
    sample_id 0,0,0,0,1,1,1,2,2,3 每个样本的每个key 对应了embedding_feature_tensors_ 中的 row index。比如CSR第一行是40,50,10,20,它们都为 embedding_feature_tensors_ 的第一行做出了贡献。
    sample_id_sort [0,1,3,0,2,1,2,0,0,1 ] 和 hash_value_index_sort 对应,就是 hash_value_index_sort 前三个 1 分别对应了embedding_feature 的第1行,第2行,第4行(从0开始的序列)
    hash_value_index_sort [1,1,1,2,2,3,3,4,5,5] 排序之后的结果,举例来说,111 意思是本batch之中,一共有3个key对最终embedding_feature第一行做出了贡献

    具体代码如下:

    // step3: sort by hash_value_index
    int end_bit = static_cast<int>(log2(static_cast<float>(max_vocabulary_size_per_gpu))) + 1;
    size_t temp_storage_sort_size = temp_storage_sort.get_size_in_bytes();
    CK_CUDA_THROW_(cub::DeviceRadixSort::SortPairs(
        temp_storage_sort.get_ptr(), temp_storage_sort_size, hash_value_index.get_ptr(),
        hash_value_index_sort.get_ptr(), sample_id.get_ptr(), sample_id_sort.get_ptr(), nnz, 0,
        end_bit, stream, false));
    

    6.5.1 SortPairs

    这里依然用到了CUB的方法,具体可以参见:https://nvlabs.github.io/cub/structcub_1_1_device_radix_sort.html#a9e14a29dc4ba6d68dc804bc6b0da7dd4。

    方法声明如下:

    template<typename KeyT , typename ValueT >
    static CUB_RUNTIME_FUNCTION cudaError_t cub::DeviceRadixSort::SortPairs	(	
      void * 	d_temp_storage,
      size_t & 	temp_storage_bytes,
      const KeyT * 	d_keys_in,
      KeyT * 	d_keys_out,
      const ValueT * 	d_values_in,
      ValueT * 	d_values_out, 	
      int 	num_items,
      int 	begin_bit = 0,
      int 	end_bit = sizeof(KeyT) * 8,
      cudaStream_t 	stream = 0,
      bool 	debug_synchronous = false 
    )	
    

    具体使用方法如下:

    6.6 计算value_index对应的数目

    现在知道了 hash_value_index_sort 是 [1,1,1,2,2,3,3,4,5,5],sample_id_sort 是 [0,1,3,0,2,1,2,0,0,1 ]。

    • hash_value_index_sort 是hash_value_index排序之后的结果,举例来说,111 意思是本batch之中,一共有3个key对最终embedding_feature第一行做出了贡献
    • sample_id_sort 和 hash_value_index_sort 对应,就是 hash_value_index_sort 前三个 1 分别对应了embedding_feature 的第1行,第2行,第4行(从0开始的序列)

    接下来需要知道 embedding_feature_tensors_ 每行的来源是多少个 hash_table_value 行,比如第0行有4个,第1行有3个......。embedding_feature_tensors_ 之中的一个行 是被同一个slot的多个 hash_table_value 行的稠密向量做pooling完成的

    这部分对应了如下:

    step4: count the number for each unduplicated value_index, calling value_count_kernel()
    

    就是对 hash_value_index_sort 进行处理,这里是 embedding 表 hash_table_value 的 row index。

    // step4: count the number for each unduplicated hash_value_index
    CK_CUDA_THROW_(
        cudaMemsetAsync(hash_value_index_count_counter.get_ptr(), 0, sizeof(uint32_t), stream));
    
    constexpr size_t max_grid_size = 384;
    block_size = 256;
    grid_size = min(max_grid_size, (nnz - 1) / block_size + 1);
    
    // 目的是找到新的group,就是新的 row index。目的是为了计算每个row index对应的sample id个数
    value_count_kernel_1<<<grid_size, block_size, 0, stream>>>(
        nnz, hash_value_index_sort.get_ptr(), new_hash_value_flag.get_ptr());
    
    // prefix_sum
    size_t temp_storage_scan_size = temp_storage_scan.get_size_in_bytes();
    CK_CUDA_THROW_(cub::DeviceScan::InclusiveSum(
        temp_storage_scan.get_ptr(), temp_storage_scan_size, new_hash_value_flag.get_ptr(),
        hash_value_flag_sumed.get_ptr(), nnz, stream));
    
    value_count_kernel_2<<<grid_size, block_size, 0, stream>>>(
        nnz, new_hash_value_flag.get_ptr(), hash_value_flag_sumed.get_ptr(),
        hash_value_index_count_offset.get_ptr(), hash_value_index_count_counter.get_ptr());
    
    uint32_t hash_hash_value_index_count_num = 0;
    // this async memcpy will not perform as a async operation because the host memory is not
    // a pinned memroy
    CK_CUDA_THROW_(cudaMemcpyAsync(&hash_hash_value_index_count_num,
                                   hash_value_index_count_counter.get_ptr(), sizeof(uint32_t),
                                   cudaMemcpyDeviceToHost, stream));
    

    我们接下来一点点分析。

    6.6.1 value_count_kernel_1

    value_count_kernel_1目的是找到新的group,就是新的 row index。目的是为了计算每个row index对应的sample id 个数。就是找到哪些点是新行起始点。我们拓展表格如下。

    名称 数值 意义
    CSR row offset 0,4,7,9,10 两个样本,两个slot,所以分成四行
    CSR value 40,50,10,20,30,50,10,30,20,10 样本内容
    hash_value_index_tensors_ 4,5,1,2,3,5,1,3,2,1 低维嵌入表的index,样本每个key对应一个,比如50对应了 hash_table_value 第5行
    sample_id 0,0,0,0,1,1,1,2,2,3 每个样本的每个key 对应了embedding_feature_tensors_ 中的 row index。比如CSR第一行是40,50,10,20,它们都为 embedding_feature_tensors_ 的第一行做出了贡献。
    sample_id_sort [0,1,3,0,2,1,2,0,0,1 ] 和 hash_value_index_sort 对应,就是 hash_value_index_sort 前三个 1 分别对应了 embedding_feature 的第1行,第2行,第4行(从0开始的序列)
    hash_value_index_sort [1,1,1,2,2,3,3,4,5,5] 排序之后的结果,举例来说,1,1,1 意思是本batch之中,一共有3个key对最终embedding_feature第一行做出了贡献
    new_hash_value_flag [1,0,0,1,0,1,0,1,1,0] 为了计算每个row index对应的sample id 个数。就是找到哪些点是新行起始点

    具体代码如下:

    __global__ void value_count_kernel_1(int nnz, const size_t *hash_value_index_sort,
                                         uint32_t *new_hash_value_flag) {
      for (int gid = blockIdx.x * blockDim.x + threadIdx.x; gid < nnz; gid += blockDim.x * gridDim.x) {
        size_t cur_value = hash_value_index_sort[gid];
        if (gid > 0) {
          size_t former_value = hash_value_index_sort[gid - 1];
          // decide if this is the start of a group(the elements in this group have the same
          // hash_value_index_sort)
          if (cur_value != former_value) {
            new_hash_value_flag[gid] = 1;
          } else {
            new_hash_value_flag[gid] = 0;
          }
        } else {  // gid == 0
          new_hash_value_flag[gid] = 1;
        }
      }
    }
    

    6.6.2 prefix_sum

    对 new_hash_value_flag 排序,目的是得到每个group(row index)内部包含多少元素,放入 hash_value_flag_sumed 之中。

    // prefix_sum
    size_t temp_storage_scan_size = temp_storage_scan.get_size_in_bytes();
    CK_CUDA_THROW_(cub::DeviceScan::InclusiveSum(
        temp_storage_scan.get_ptr(), temp_storage_scan_size, new_hash_value_flag.get_ptr(),
        hash_value_flag_sumed.get_ptr(), nnz, stream));
    

    这里使用了 cub::DeviceScan::InclusiveSum,如果想深入研究,可以参见 https://nvlabs.github.io/cub/structcub_1_1_device_scan.html

    以下是函数说明。

    以下是使用方法。

    我们拓展表格如下。

    名称 数值 意义
    CSR row offset 0,4,7,9,10 两个样本,两个slot,所以分成四行
    CSR value 40,50,10,20,30,50,10,30,20,10 样本内容
    hash_value_index_tensors_ [4,5,1,2,3,5,1,3,2,1] 低维嵌入表的index,样本每个key对应一个,比如50对应了 hash_table_value 第5行
    sample_id [0,0,0,0,1,1,1,2,2,3] 每个样本的每个key 对应了embedding_feature_tensors_ 中的 row index。比如CSR第一行是40,50,10,20,它们都为 embedding_feature_tensors_ 的第一行做出了贡献。
    sample_id_sort [0,1,3,0,2,1,2,0,0,1] 和 hash_value_index_sort 对应,就是 hash_value_index_sort 前三个 1 分别对应了 embedding_feature 的第1行,第2行,第4行(从0开始的序列)
    hash_value_index_sort [1,1,1,2,2,3,3,4,5,5] 排序之后的结果,举例来说,1,1,1 意思是本batch之中,一共有3个key对最终embedding_feature第一行做出了贡献
    new_hash_value_flag [1,0,0,1,0,1,0,1,1,0] 为了计算每个row index对应的sample id 个数。就是找到哪些点是新行起始点
    hash_value_flag_sumed [1,1,1,2,2,3,3,4,5,5] 对 new_hash_value_flag 合并,目的是得到每个group(row index)内部包含多少元素。
    hash_table_value 5 x 8 的矩阵 低维嵌入表,假定稠密向量长度是8,因为一共只有5个不同数字,所以只有5行

    6.6.3 value_count_kernel_2

    这个代码作用就是得到最终每行元素个数。

    value_count_kernel_2<<<grid_size, block_size, 0, stream>>>(
        nnz, new_hash_value_flag.get_ptr(), hash_value_flag_sumed.get_ptr(),
        hash_value_index_count_offset.get_ptr(), hash_value_index_count_counter.get_ptr());
    
    uint32_t hash_hash_value_index_count_num = 0;
    // this async memcpy will not perform as a async operation because the host memory is not
    // a pinned memroy
    CK_CUDA_THROW_(cudaMemcpyAsync(&hash_hash_value_index_count_num,
                                   hash_value_index_count_counter.get_ptr(), sizeof(uint32_t),
                                   cudaMemcpyDeviceToHost, stream));
    

    hash_hash_value_index_count_num 是index总数,就是一共真实有几行,其对应了nnz。

    * @param nnz non-zero feature number per batch
    

    现在知道了 hash_value_index_sort 是 [1,1,1,2,2,3,3,4,5,5],sample_id_sort 是 [0,1,3,0,2,1,2,0,0,1 ],new_hash_value_flag 是 [1,0,0,1,0,1,0,1,1,0],里面放置了本行是不是新行。hash_value_flag_sumed 是[ 1,1,1,2,2,3,3,4,5,5 ]。

    我们分析一下代码。总体思想是:在 hash_value_index_index(对应传进来的参数是 hash_value_index_count_offset)设定 "按照数目计算的,对应的 embedding 表 index(就是对应的 embedding 表行号)"。因为embedding_feature 最多只有5行(nnz个数),所以这里取前五个即可。

    比如,每个block要处理低维稠密矩阵一行。如 bid = 1,它希望更新低维稠密矩阵第2行,但是想知道更新几次。所以先从 hash_value_index_count_offset[1] 得到了数值 3,然后找到 hash_value_index_sort[3] 来进行处理。

    具体是:遍历grid,但是需要小于nnz(该batch的非零key数目),其实就是 hash_table_value 的行数。比如说nnz这里等于10,gid 取值就是0~9。grid为0,3,5,7,8 时候new_hash_value_flag[gid] 为 1。hash_value_flag_sumed[gid]分别为:1,2,3,4,5。所以 hash_value_index_count_offset 是 [0, 3, 5, 7, 8, 0, 0, 0, 0, 0],这些是 hash_value_index_sort 之中的offset。

    __global__ void value_count_kernel_2(int nnz, const uint32_t *new_hash_value_flag,
                                         const uint32_t *hash_value_flag_sumed,
                                         uint32_t *hash_value_index_index, uint32_t *counter)
    
    {
      // 遍历grid,但是需要小于该batch的非零key数目,其实就是 hash_table_value 的行数
      for (int gid = blockIdx.x * blockDim.x + threadIdx.x; gid < nnz; gid += blockDim.x * gridDim.x) {
        uint32_t flag = new_hash_value_flag[gid];
        if (flag == 1) {
          // 设定
          hash_value_index_index[hash_value_flag_sumed[gid] - 1] = gid; 
        }
      }
      if (blockIdx.x * blockDim.x + threadIdx.x == 0) {
        *counter = hash_value_flag_sumed[nnz - 1]; 
        hash_value_index_index[*counter] = nnz; 
      }
    }
    

    到目前为止,所有变量如下:

    名称 数值 意义
    CSR row offset 0,4,7,9,10 两个样本,两个slot,所以分成四行
    CSR value 40,50,10,20,30,50,10,30,20,10 样本内容
    hash_table_value 5 x 8 的矩阵 低维嵌入表,假定稠密向量长度是8,因为一共只有5个不同数字(nnz),所以只有5行
    embedding_feature_tensors_ 4 x 8 的矩阵 嵌入层输出的稠密向量。形状是(batch_size * slot_num) * embedding_vec_len
    hash_value_index_tensors_ [4,5,1,2,3,5,1,3,2,1] 低维嵌入表的index,样本每个key对应一个,比如50对应了 hash_table_value 第5行
    sample_id [0,0,0,0,1,1,1,2,2,3] 每个样本的每个key 对应了embedding_feature_tensors_ 中的 row index。比如CSR第一行是40,50,10,20,它们都为 embedding_feature_tensors_ 的第一行做出了贡献。
    sample_id_sort [0,1,3,0,2,1,2,0,0,1] 和 hash_value_index_sort 对应,就是 hash_value_index_sort 前三个 1 分别对应了 embedding_feature 的第1行,第2行,第4行(从0开始的序列)
    hash_value_index_sort [1,1,1,2,2,3,3,4,5,5] 排序之后的结果,举例来说,1,1,1 意思是本batch之中,一共有3个key对最终embedding_feature第一行做出了贡献
    new_hash_value_flag [1,0,0,1,0,1,0,1,1,0] 为了计算每个row index对应的sample id 个数。就是找到哪些点是新行起始点
    hash_value_flag_sumed [1,1,1,2,2,3,3,4,5,5] 对 new_hash_value_flag 合并,目的是得到每个group(row index)内部包含多少元素。
    hash_value_index_count_offset [0, 3, 5, 7, 8, 0, 0, 0, 0, 0] 每个block要处理低维稠密矩阵一行。如 bid = 1,它希望更新低维稠密矩阵第2行,但想知道更新几次。所以先从 hash_value_index_count_offset[1] 得到了数值 3,然后找到 hash_value_index_sort[3]。因为embedding_feature 最多只有5行(nnz个数),所以这里取前五个即可

    最终思路如下:

    • 每个block要处理低维稠密矩阵一行。假如bid=0 想更新低维矩阵第一行,就是要更新10对应的低维矩阵稠密向量。

    • bid对应了key(的梯度),比如 40,50,10,20,30,50,10,30,20,10 这些,其key就是10~50这个5个。

    • hash_value_index_count_offset :本bid对于低维稠密矩阵该行要更新几次。sum_num = hash_value_index_count_offset[1] - hash_value_index_count_offset[0] = 3 - 0 = 3个,所以更新3次。

    • hash_value_index_sort :在 [1,1,1,2,2,3,3,4,5,5] 这里找到 1,1,1,表示本batch之中一共有3个key对最终embedding_feature第一行做出了贡献。

    • 所以 bid = 0 ,就是hash_table_value[0]这一行 有三个1,应该更新3次。

    • sample_id_sort :更新就是累积,就是这3次更新分别去输入梯度哪一行去找?3个10分别在梯度的0,1,3这几行。

    6.7 更新权重

    这是最后一步,对应了如下:

    step5: use optimizer method to compute deltaw and update the parameters
    

    调用代码如下:

    注意,这里传递的是 sample_id_sort [0,1,3,0,2,1,2,0,0,1],对应的 hash_value_index_sort 是 [1,1,1,2,2,3,3,4,5,5],hash_value_index_count_offset 是 [0, 3, 5, 7, 8, 0, 0, 0, 0, 0]。

    case Optimizer_t::AdaGrad: {
      opt_adagrad_kernel<<<grid_size, block_size, 0, stream>>>(
          hash_hash_value_index_count_num, embedding_vec_size, opt_params.lr,
          opt_params.hyperparams.adagrad, opt_tensor.opt_accm_tensors_.get_ptr(),
          sample_id_sort.get_ptr(), hash_value_index_sort.get_ptr(),
          hash_value_index_count_offset.get_ptr(), wgrad.get_ptr(),
          hash_table_value.get_ptr(), opt_params.scaler);
      break;
    }
    

    很明显可以看到,其就是使用权重更新 hash_table_value。

    // Local update for the Adagrad optimizer: compute the gradients and update the accumulators and the
    // weights
    template <typename TypeKey, typename TypeEmbeddingComp>
    __global__ void opt_adagrad_kernel(uint32_t hash_value_index_count_num, int embedding_vec_size,
                                       float lr, const AdaGradParams adagrad,
                                       TypeEmbeddingComp *accum_ptr, const TypeKey *sample_id,
                                       const size_t *hash_value_index_sort,
                                       const uint32_t *hash_value_index_count_offset,
                                       const TypeEmbeddingComp *wgrad, float *hash_table_value,
                                       float scaler) {
      int bid = blockIdx.x; // 一个block对应一个样本之中的一个key,比如例子之中的30
      int tid = threadIdx.x; // 本线程
    
      if (tid < embedding_vec_size && bid < hash_value_index_count_num) {
        // 找到本线程样本在 hash_value_index_sort 的偏移
        uint32_t offset = hash_value_index_count_offset[bid];  // [0, 3, 5, 7, 8, 0, 0, 0, 0, 0]
    
        // 累积得出梯度
        float gi = accumulate_gradients(embedding_vec_size, sample_id, hash_value_index_count_offset,
                                        wgrad, scaler, offset, bid, tid);
    
        // 找到本样本在低维矩阵之中的row index
        size_t row_index = hash_value_index_sort[offset]; // [1,1,1,2,2,3,3,4,5,5]
        // 注意,hash_table_value 是元素级别,比如稠密向量长度是8,那么在 hash_table_value 里面就有8个元素
        // feature_index 就是得到本线程对应的 embedding vector 之中的哪个元素
        size_t feature_index = row_index * embedding_vec_size + tid;
        
        float accum = //accum_ptr 来自优化器
            TypeConvertFunc<float, TypeEmbeddingComp>::convert(accum_ptr[feature_index]) + gi * gi;
    
        accum_ptr[feature_index] = TypeConvertFunc<TypeEmbeddingComp, float>::convert(accum);
        float weight_diff = -lr * gi / (sqrtf(accum) + adagrad.epsilon);
    
        // 更新梯度
        hash_table_value[feature_index] += weight_diff;
      }
    }
    

    accumulate_gradients 的逻辑是:

    // Helper function to accumulate the weight gradients for a thread's embedding vector
    template <typename TypeKey, typename TypeEmbeddingComp>
    __device__ __forceinline__ float accumulate_gradients(int embedding_vec_size,
                                                          const TypeKey *sample_id,
                                                          const uint32_t *hash_value_index_count_offset,
                                                          const TypeEmbeddingComp *wgrad, float scaler,
                                                          uint32_t offset, int bid, int tid) {
    
      // 哪一行更新几次
      // 如果bid=0,则sum_num = hash_value_index_count_offset[1] - hash_value_index_count_offset[0] = 3 - 0 = 3个。bid对应了key,比如 40,50,10,20,30,50,10,30,20,10 这些key,其key就是10~50这个5个。所以 bid = 0 就是要更新10对应的低维矩阵稠密向量,就是hash_table_value[0]这一行,有三个1,应该更新3次。
      uint32_t sample_num = hash_value_index_count_offset[bid + 1] - hash_value_index_count_offset[bid];
    
      // 计算梯度
      float gi = 0.0f;
      // sample_id_sort [0,1,3,0,2,1,2,0,0,1] ---- 第几行,恰恰和 wgrad 对上了
      for (int i = 0; i < sample_num; i++) { // offset 就是0, 3, 5, 7, 8,比如对于第1行,需要更新3次
        // sample_id 是[0,1,3,0,2,1,2,0,0,1],对应了低维矩阵第1,2,4,...,行,就是3个10分别在输出稠密向量的哪一行
        // 更新这几次,就是一个累积,这个累积用哪些梯度来累积。    
        int sample_index = sample_id[offset + i]; // 找到本样本梯度
        gi += TypeConvertFunc<float, TypeEmbeddingComp>::convert(
            wgrad[sample_index * embedding_vec_size + tid]); // 本线程梯度,并且累积
      }
      return gi / scaler;
    }
    

    最终具体如下图:

    至此,我们关于 DistributedSlotSparseEmbeddingHash 分析全部完成,下一篇介绍 LocalSlotSparseEmbeddingHash。

    0xFF 参考

    https://nvlabs.github.io/cub/annotated.html

    https://developer.nvidia.com/blog/introducing-merlin-hugectr-training-framework-dedicated-to-recommender-systems/

    https://developer.nvidia.com/blog/announcing-nvidia-merlin-application-framework-for-deep-recommender-systems/

    https://developer.nvidia.com/blog/accelerating-recommender-systems-training-with-nvidia-merlin-open-beta/

    HugeCTR源码阅读

    embedding层如何反向传播

    https://web.eecs.umich.edu/~justincj/teaching/eecs442/notes/linear-backprop.html

    稀疏矩阵存储格式总结+存储效率对比:COO,CSR,DIA,ELL,HYB

    无中生有:论推荐算法中的Embedding思想

    tf.nn.embedding_lookup函数原理

    求通俗讲解下tensorflow的embedding_lookup接口的意思?

    【技术干货】聊聊在大厂推荐场景中embedding都是怎么做的

    ctr预估算法对于序列特征embedding可否做拼接,输入MLP?与pooling

    推荐系统中的深度匹配模型

    土法炮制:Embedding 层是如何实现的?

    不等距双杆模型_搜索中的深度匹配模型(下)

    深度特征 快牛策略关于高低层特征融合

    [深度学习] DeepFM 介绍与Pytorch代码解释

    deepFM in pytorch

    推荐算法之7——DeepFM模型

    DeepFM 参数理解(二)

    推荐系统遇上深度学习(三)--DeepFM模型理论和实践

    [深度学习] DeepFM 介绍与Pytorch代码解释

    https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/operations.html

    带你认识大模型训练关键算法:分布式训练Allreduce算法

  • 相关阅读:
    Eureka介绍与使用
    vue+element模仿电商商城,前后端分离实现,下单微信扫码支付
    大数据平台搭建2024(一)
    机器人导航必备的栅格地图数学模型及使用
    关联式容器(Associative Container)
    CF464E The Classic Problem
    【微机接口】可编程定时器/计数器8254
    join(),Java内存图
    计算机视觉与深度学习实战,Python工具,深度学习的视觉场景识别
    C语言实现杨辉三角
  • 原文地址:https://www.cnblogs.com/rossiXYZ/p/15965286.html