• (02)Cartographer源码无死角解析-(26) 阻塞队列BlockingQueue(),与OrderedMultiQueue成员函数


    讲解关于slam一系列文章汇总链接:史上最全slam从零开始,针对于本栏目讲解(02)Cartographer源码无死角解析-链接如下:
    (02)Cartographer源码无死角解析- (00)目录_最新无死角讲解:https://blog.csdn.net/weixin_43013761/article/details/127350885
     
    文 末 正 下 方 中 心 提 供 了 本 人 联 系 方 式 , 点 击 本 人 照 片 即 可 显 示 W X → 官 方 认 证 {\color{blue}{文末正下方中心}提供了本人 \color{red} 联系方式,\color{blue}点击本人照片即可显示WX→官方认证} WX
     

    一、前言

    通过前面的博客,对于 src/cartographer/cartographer/sensor/internal/ordered_multi_queue.cc 中 OrderedMultiQueue 类的成员函数基本上都进行了详细的讲解。从该类的名字来看,可以知道,这是这个多队列数据排序的类,实际也确实如此。该类的主要功能就按时间戳对数据进行排序。大部分成员函数都在前面讲解了,下面来讲其中最重要的成员函数 OrderedMultiQueue::Dispatch()。

    该函数主要在如下几个函数中被调用:

    void OrderedMultiQueue::Add(const QueueKey& queue_key,std::unique_ptr<Data> data) 
    
    void OrderedMultiQueue::MarkQueueAsFinished(const QueueKey& queue_key) 
    
    • 1
    • 2
    • 3

    也就是数列之后,与把队列标记为完成时,其都会调用 OrderedMultiQueue::Dispatch() 进行数据的分发。

    功 能 : 将 处 于 数 据 队 列 中 的 数 据 根 据 时 间 依 次 传 入 回 调 函 数 ( 数 据 分 发 ) \color{red}功能: 将处于数据队列中的数据根据时间依次传入回调函数(数据分发) ()

    为了方便理解,这里列举一个例子,现设现在只有一条轨迹0,分别有三个传感器,且刚好订阅了三个话题,那么也就是说共创建了三个队列,即 OrderedMultiQueue::queues_ 存储了三个队列。现假设根据数据的时间戳,举例如下(数字即表示时间戳):

            (0, scan): {      4,     }
            (0, imu):  {1,  3,   5,  }
            (0, odom): {  2,       6,}
    
    • 1
    • 2
    • 3

    那么在分发这两个队列数的时候,是按照 1,2,3,4,5,6 的顺序。

     

    二、OrderedMultiQueue::GetCommonStartTime()

    在对 Dispatch() 讲解之前,先来看一下 GetCommonStartTime() 函数,该函数比较重要,同时它会被 Dispatch() 调用。其主要功能是根据穿入的 trajectory_id,返回该id所有队列第一帧最大的时间。也就是说每个轨迹都对应一个 common_start_time 变量,其实可以理解为该轨迹的起始时间。所有轨迹的起始时间都保存在 OrderedMultiQueue::common_start_time_per_trajectory_ 之中,代码注释如下:

    /**
     * @brief 找到数据队列所有第一帧的最大时间(共同时间)
     * 对于某个id的轨迹的 common_start_time 只会计算一次
     * 
     * @param[in] trajectory_id 轨迹id
     * @return common::Time 返回数据队列所有第一帧的最大时间
     */
    common::Time OrderedMultiQueue::GetCommonStartTime(const int trajectory_id) {
    
      // c++11: map::emplace() 返回的 pair 对象
      // pair 的成员变量 first 是一个指向插入元素或阻止插入的元素的迭代器
      // 成员变量 second 是个布尔值, 表示是否插入成功, 如果这个元素的索引已经存在插入会失败,返回false
      auto emplace_result = common_start_time_per_trajectory_.emplace(
          trajectory_id, common::Time::min());
      common::Time& common_start_time = emplace_result.first->second;
    
      // 如果插入成功了就找到时间戳最大的对common_start_time进行更新, 失败了就不更新
      // 只会在轨迹开始时插入成功一次
      if (emplace_result.second) {
        // 找到这个轨迹下,所有数据队列中数据的时间戳最大 的时间戳
        // 执行到这里时, 所有的数据队列都有值了, 因为没值的情况在Dispatch()中提前返回了
        for (auto& entry : queues_) {
          if (entry.first.trajectory_id == trajectory_id) {
            common_start_time = std::max(
                common_start_time, entry.second.queue.Peek<Data>()->GetTime());
          }
        }
        LOG(INFO) << "All sensor data for trajectory " << trajectory_id
                  << " is available starting at '" << common_start_time << "'.";
    
        // [ INFO] [1628516134.243770381, 1606808649.533687125]: I0809 21:35:34.000000  8604 ordered_multi_queue.cc:264] All sensor data for trajectory 0 is available starting at '637424054495384530'.
    
      }
      return common_start_time;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    如果轨迹的开始时间插入到 common_start_time_per_trajectory_ 之中,代表轨迹已经开始了,会打印类似如下信息:

    [ INFO] [1628516134.243770381, 1606808649.533687125]: I0809 21:35:34.000000  8604 ordered_multi_queue.cc:264] All sensor data for trajectory 0 is available starting at '637424054495384530'.
    
    • 1

     

    三、OrderedMultiQueue::Dispatch()逻辑分析

    ( 1 ) \color{blue}(1) (1) 进行一个true循环,循环退出的三种条件为:
            ①某个话题的数据队列为空,但却不是完成状态。
            ②所有队列queues_为空
            ③数据队列中数据的个数只有1个,又不是完成状态,且不能确定状态,一般为阻塞。
    如果不是很明白的没有关系,继续往下看,注意:每次循环都会创建如下三个临时变量:

        const Data* next_data = nullptr; //next_data表示接下来要处理数据的指针
        Queue* next_queue = nullptr; //接下需要处理数据所在队列
        QueueKey next_queue_key; //需要处理数据所在队列的key
    
    • 1
    • 2
    • 3

     
    ( 2 ) \color{blue}(2) (2) 对所有队列进行for循环遍历,先获得当前队列第一个(最早)数据,如果返回为空指针,说名该队列没有数据,那么会判断一下该队列是否处于 finished状态,
    如果是→则从 queues_ 中删除该队列,执行 continue,遍历下一个队列。
    如果不是→则说明数据用完了,还没有传送过来。则调用 CannotMakeProgress() 函数,把该队列的 key 赋值给 blocker_,也就是把当前队列标记为阻塞。退出循环( 第 一 种 退 出 循 环 情 况 \color{red}第一种退出循环情况 退)

    ( 3 ) \color{blue}(3) (3) 如果从队列中获取到第一个数据的指针(注意,并没有取出数据), 判断条件①→在大循环中是否第一次获得数据,next_data == nullptr。 判断条件②→当前数据的时间比next_data的时间小。两个条件任意满足一个,都会执行如下代码:

          // 第一次进行到这里或者data的时间比next_data的时间小(老数据)
          // 就更新next_data, 并保存当前话题的数据队列以及queue_key
          if (next_data == nullptr || data->GetTime() < next_data->GetTime()) {
            next_data = data; //把当前遍历的data赋值给next_data
            next_queue = &it->second; //当前遍历的队列赋值给next_queue
            next_queue_key = it->first; //当前遍历的队列的kay赋值给next_queue_key
          }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    简而言之,就是 next_data == nullptr 时,会把data指针赋值给next_data指针,同时记录其所在队列与该队列对应的key,表示data指向的数据就是接下来需要处理的数据。另外,如果当前data指向数据的时间小于 next_data 的时间(正常情况下是大于的),则把当前数据赋值给next_data,也就是说,找到了比之前 next_data指向数据更加老的数据,当然要赋值给next_data 。

    ( 4 ) \color{blue}(4) (4) 检测 last_dispatched_time_ 是否小于等于 next_data->GetTime(),last_dispatched_time_ 记录的是之前的 next_data->GetTime()。总的来说,就是保证 next_data 比之前的数据都要老。至此,前面的 for 循环结束。

    ( 5 ) \color{blue}(5) (5) 通过for循环之后,如果 next_data 依旧为 nullptr:
    情况一:从每个队列中第一次获取数据的时候,就没有获取到数据,表示队列为空。即所有队列 queues_ 为空。
    请款二:queues_ 的元素为0,根本没有执行前面的 for 循环,此时也会退出while循环,也就是没有订阅任何话题。
    以上两种情况都会退出最外面的while循环( 第 二 种 退 出 循 环 情 况 \color{red}第二种退出循环情况 退)。

    ( 6 ) \color{blue}(6) (6) 通过调用该 OrderedMultiQueue::GetCommonStartTime 函数,获取 next_data 数据所属轨迹开始时间,赋值给 common_start_time。

    ( 7.1 ) \color{blue}(7.1) (7.1) 如果 next_data 指向的数据时间大于等于 common_start_time,则获得 next_data指向数据 所在的队列 next_queue,通过 next_queue->queue.Pop() 获得 next_data 指针指向的数据,同时将这个数据从数据队列中删除。且把该数据传送给队列的回调函数进行处理。

    ( 7.2 ) \color{blue}(7.2) (7.2) 如果 next_queue 队列中数据的个数只有一个(小于2)且next_data 指向的数据时间小于于 common_start_time:
    ①且没有队列没有被标记为完成,那就不太清楚什么情况了,可能是数据来的太慢了,或者其他的情况,则把该队列标记为阻塞状态,直接return( 第 三 种 退 出 循 环 情 况 \color{red}第三种退出循环情况 退)。
    ②如果队列已经被标记为完成状态,则把最后一个数据取出,调用队列的回调函数及逆行处理。

    ( 7.3 ) \color{blue}(7.3) (7.3) 非上述两种情况→也就是 next_data 数据的时间小于轨迹开始的时间common_start_time,同时队列中还存在大量的数据。那么取出最早的数据抛弃掉,然后再取出一个(第二早)的数据,判断一下,时间是否大于 common_start_time,如果大于则进行处理。进入下一次while循环。

    总 结 : \color{red}总结: 总的来说呢,回调函数在添加数据之后,就会调用一次 OrderedMultiQueue::Dispatch() 函数进行数据的分发。数据的分发分发一个数据而已,虽然其会进入while循环,但其目的是为了找到所有队列所有数据中最早的一个数据进行分发,知道都发送完了,才会退出循环,结束该次回调函数。这样是为了保证队列中的数据,以高优先级进行消耗。

     

    四、OrderedMultiQueue::Dispatch()代码注释

    /**
     * @brief 将处于数据队列中的数据根据时间依次传入回调函数(数据分发)
     * 
     * 3种退出情况:
     * 退出条件1 某个话题的数据队列为空同时又不是完成状态, 就退出
     * 退出条件2 只有多队列queues_为空, 就退出
     * 退出条件3 数据队列中数据的个数只有1个,又不是完成状态,不能确定状态, 就先退出
     */
    void OrderedMultiQueue::Dispatch() {
      while (true) {
        /*
          queues_: 
            (0, scan): {      4,     }
            (0, imu):  {1,  3,   5,  }
            (0, odom): {  2,       6,}
        */
        const Data* next_data = nullptr; //指针可以改变,但是指向的值不能改变
        Queue* next_queue = nullptr;
        QueueKey next_queue_key;
    
        // Step: 1 遍历所有的数据队列, 找到所有数据队列的第一个数据中时间最老的一个数据
        for (auto it = queues_.begin(); it != queues_.end();) {
    
          // c++11: auto*(指针类型说明符), auto&(引用类型说明符), auto &&(右值引用)
    
          // 获取当前队列中时间最老的一个的一个数据,队列为空返回nullptr
          const auto* data = it->second.queue.Peek<Data>();
    
          if (data == nullptr) { //如果队列为空
            // 如果队列已经处于finished状态了, 就删掉这个队列
            if (it->second.finished) {
              queues_.erase(it++);
              continue;
            }
            // 退出条件1: 某个话题的数据队列为空同时又不是完成状态, 就先退出, 发布log并标记为阻塞者
            //说明数据队列都用完了,对该队列进行 
            CannotMakeProgress(it->first);
            return;
          }
    
          // 第一次进行到这里或者data的时间比next_data的时间小(老数据)
          // 就更新next_data, 并保存当前话题的数据队列以及queue_key
          if (next_data == nullptr || data->GetTime() < next_data->GetTime()) {
            next_data = data; //把当前遍历的data赋值给next_data
            next_queue = &it->second; //当前遍历的队列赋值给next_queue
            next_queue_key = it->first; //当前遍历的队列的kay赋值给next_queue_key
          }
    
          // 数据的时间戳不是按顺序的, 就报错
          CHECK_LE(last_dispatched_time_, next_data->GetTime())
              << "Non-sorted data added to queue: '" << it->first << "'";
          
          ++it;
        } // end for
    
        // 退出条件2: 只有多队列queues_为空, 才可能next_data==nullptr
        if (next_data == nullptr) {
          CHECK(queues_.empty());
          return;
        }
    
        // If we haven't dispatched any data for this trajectory yet, fast forward
        // all queues of this trajectory until a common start time has been reached.
        // 如果我们还没有为这个轨迹分配任何数据, 快进这个轨迹的所有队列, 直到达到一个共同的开始时间
        
        // Step: 2 获取对应轨迹id的所有数据队列中的最小共同时间戳, 作为轨迹开始的时间
        const common::Time common_start_time =
            GetCommonStartTime(next_queue_key.trajectory_id);
    
        // Step: 3 将 next_queue 的时间最老的一个数据传入回调函数进行处理 
    
        // 大多数情况, 数据时间都会超过common_start_time的
        if (next_data->GetTime() >= common_start_time) {
          // Happy case, we are beyond the 'common_start_time' already.
          // 更新分发数据的时间
          last_dispatched_time_ = next_data->GetTime();
          // 将数据传入 callback() 函数进行处理,并将这个数据从数据队列中删除
          next_queue->callback(next_queue->queue.Pop());
        } 
        // 数据时间小于common_start_time,同时数据队列数据的个数小于2,只有1个数据的情况 罕见
        else if (next_queue->queue.Size() < 2) {
          // 退出条件3: 数据队列数据的个数少,又不是完成状态, 不能确定现在到底是啥情况, 就先退出稍后再处理
          if (!next_queue->finished) {
            // We cannot decide whether to drop or dispatch this yet.
            CannotMakeProgress(next_queue_key);
            return;
          } 
          // 处于完成状态了, 将数据传入 callback() 函数进行最后几个数据的处理
          // 更新分发数据的时间,将数据传入 callback() 进行处理,并将这个数据从数据队列中删除
          last_dispatched_time_ = next_data->GetTime();
          next_queue->callback(next_queue->queue.Pop());
        } 
        // 数据时间小于common_start_time,同时数据队列数据的个数大于等于2个
        else {
          // We take a peek at the time after next data. If it also is not beyond
          // 'common_start_time' we drop 'next_data', otherwise we just found the
          // first packet to dispatch from this queue.
    
          // 只处理数据在common_start_time的前一个数据, 其他更早的数据会被丢弃掉
          std::unique_ptr<Data> next_data_owner = next_queue->queue.Pop();
          if (next_queue->queue.Peek<Data>()->GetTime() > common_start_time) {
            // 更新分发数据的时间,将数据传入 callback() 进行处理
            last_dispatched_time_ = next_data->GetTime();
            next_queue->callback(std::move(next_data_owner));
          }
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108

     

    五、结语

    到目前位置,对于 Cartographer 再ROS端数据的处理,可以说是有一定理解了,但是都是比较零散的,下一篇博客会做一个总结,进行一个总体的分析。

     
     
     

  • 相关阅读:
    搭建AE脚本开发环境
    L1-028 判断素数
    Python机器学习分类算法(二)-- 决策树(Decision Tree)
    JavaScript 解决dayjs在周日获取当前周第一天显示下一周第一天问题
    04 随机梯度下降
    nginx自动化脚本安装
    springboot银行客户管理系统毕业设计源码250903
    最好用的BT下载工具Qbittorrent ~群晖DSM7.0 Docker如何安装QB
    java/php/python在线求助救援网站vue+elementui
    第二证券|多只公募基金损失惨重;储能板块低开高走
  • 原文地址:https://blog.csdn.net/weixin_43013761/article/details/127956898