• C++——无锁链表的探索


    最近尝试了一种新的无锁链表的实现方法,主要思想是:push操作使用原子操作将原来的tail指针保存在线程的局部变量中,然后再修改old tail的next指针;pop操作首先使用原子操作改变head指针指向下一个位置,保证其他线程无法获old head指针,然后再进行删除。原来的实现方法不会保证head指针指向对象的存在。这样,在push取出old tail并修改tail之后,修改old tail的next之前,pop弹出最后一个对象并删除以后就会导致old tail变成空悬指针,这时push修改old tail的next就会导致未定义行为。在pop弹出最后一个以后,虽然old head指向会被删除,但是这时tail是指向head->next的,即使同时其他线程调用pop,也不会在修改old tail时候崩溃;如果链表只有一个元素,但是同时想弹出两个,那么在弹出第2个不会执行,会失败,也不会导致程序崩溃。那么会不会存在pop函数在验证不为空之后又变成空的情况呢?在多个消费者的情况下会出现这个情况,另外多个消费者还会存在old head被删除后在调用compare_exchange时候求head->next而导致解空悬指针。那么多个生产者会不会出现问题呢?目前来看,首先原子更改了tail的指向,那么后面来的线程即使在push的中间状态tail没有被挂到链表末尾,也能正确的识别并写到当前tail的后面。因此理论上是可以接受多个生产者的。即当前实现为MPSC的。

    下面这个就是整个流程:

    具体实现如下:

    1. #ifndef _LOCK_FREE_LIST_HPP_
    2. #define _LOCK_FREE_LIST_HPP_
    3. #include
    4. template<typename val_t>
    5. class lock_free_list
    6. {
    7. private:
    8. struct node
    9. {
    10. val_t v;
    11. node* next;
    12. node():v(), next(nullptr)
    13. {}
    14. node(const val_t& v_):v(v_), next(nullptr)
    15. {}
    16. };
    17. std::atomic head, tail;
    18. public:
    19. lock_free_list():head(new node), tail(head.load(std::memory_order_relaxed))
    20. {}
    21. lock_free_list<val_t>& push_back(const val_t& v)
    22. {
    23. auto pnew = new node(v);
    24. node* poldtail = tail.load(std::memory_order_relaxed);
    25. while(!tail.compare_exchange_strong(poldtail, pnew, std::memory_order_relaxed, std::memory_order_relaxed));
    26. poldtail->next = pnew;
    27. return *this;
    28. }
    29. bool pop_front(val_t& v)
    30. {
    31. if (head.load(std::memory_order_relaxed)->next == nullptr)
    32. {
    33. return false;
    34. }
    35. node* poldhead = head.load(std::memory_order_relaxed);
    36. while(!head.compare_exchange_strong(poldhead, poldhead->next, std::memory_order_relaxed, std::memory_order_relaxed));
    37. v = head.load(std::memory_order_relaxed)->v;
    38. delete poldhead;
    39. return true;
    40. }
    41. };
    42. #endif

    下面对这个MPSC的程序作一些修改,使其变成MPMC的。主要修改是:1)取出head:循环尝试取出有效的head,并将head置为nullptr(原子操作),阻止其他pop出头节点,保证了head的独享。2)判断取出的oldhead是否被用过了,于是引入了一个atomic类型进行判断;2)复原head:判断oldhead->next是否为nullptr,如果是nullptr说明当前head是最后一个,那么就不能删除(如果删除了,那么push在给oldtail->next赋值的时候就会崩溃)。如果不是nullptr就将head复原为oldhead->next,否则就将oldhead放回head。

    具体的修改后代码如下:

    1. #ifndef _LOCK_FREE_LIST_HPP_
    2. #define _LOCK_FREE_LIST_HPP_
    3. #include
    4. template<typename val_t>
    5. class lock_free_list
    6. {
    7. private:
    8. struct node
    9. {
    10. std::atomic<bool> valid;
    11. val_t v;
    12. node* next;
    13. node():v(), next(nullptr),valid(true)
    14. {}
    15. node(const val_t& v_):v(v_), next(nullptr),valid(true)
    16. {}
    17. };
    18. std::atomic head, tail;
    19. public:
    20. lock_free_list():head(new node), tail(head.load(std::memory_order_relaxed))
    21. {
    22. head.load(std::memory_order_acquire)->valid = false;
    23. }
    24. lock_free_list<val_t>& push_back(const val_t& v)
    25. {
    26. auto pnew = new node(v);
    27. node* poldtail = tail.load(std::memory_order_relaxed);
    28. while(!tail.compare_exchange_strong(poldtail, pnew, std::memory_order_acq_rel));
    29. poldtail->next = pnew;
    30. return *this;
    31. }
    32. bool pop_front(val_t& v)
    33. {
    34. node* poldhead = head.load(std::memory_order_relaxed);
    35. node* poldhead_next = nullptr;
    36. while(!(poldhead = head.exchange(nullptr, std::memory_order_acq_rel)));
    37. assert(poldhead);
    38. poldhead_next = poldhead->next;
    39. bool valid = poldhead->valid.exchange(false, std::memory_order_acq_rel);
    40. if (valid)v = poldhead->v;
    41. if (!poldhead_next)
    42. {
    43. head.store(poldhead, std::memory_order_release);
    44. }
    45. else
    46. {
    47. head.store(poldhead_next, std::memory_order_release);
    48. delete poldhead;
    49. }
    50. return valid;
    51. }
    52. void reset()
    53. {
    54. val_t v;
    55. while (pop_front(v));
    56. }
    57. };
    58. #endif

    MPMC的实现有一个技巧,如果直接使用head.compare_exchange_strong(oldhead, oldhead->next)是有问题的,因为在取oldhead->next的时候不能保证oldhead没有被析构调,比如另外一个线程恰好弹出了head并删除掉;所以这里使用的是exchange(nullptr),如果取出的是nullptr,那么说明其他pop线程正在处理,就自旋等待其他pop线程处理完毕,如此取出成功后的oldhead就是线程独享的,其他线程无法获取到,那么也就可以安全的通过oldhead->next更新head指针,不会存在有两个pop线程同时取出有效的head值的情况(所以这种实现方式的pop是串行的,低效的,但是push由于是先在自己的栈中生成独享对象,然后再插入,所以push很高效);程序中当节点的next为空时候是不会删除的,这也保证了push在最后一步设置oldtail->next时候oldtail不会被析构掉。用google-benchmark写一个测试代码试试MPMC的情况:

    1. #include
    2. #include
    3. #include "lock_free_list.hpp"
    4. lock_free_list<int> lst;
    5. static void BM_MultiThreaded(benchmark::State& state) {
    6. if ((state.threads()&1)==1) state.SkipWithError("Need even number of threads!");
    7. if (state.thread_index() == 0)lst.reset();
    8. const bool producer = state.thread_index() & 1;
    9. const size_t N = state.range(0);
    10. int i = 0;
    11. if (state.thread_index() == 0) {
    12. // Setup code here.
    13. }
    14. for (auto _ : state) {
    15. // Run the test as normal.
    16. if (producer)
    17. for (size_t i = 0; i < N; ++i) lst.push_back(i++);
    18. else
    19. {
    20. int v = 0;
    21. for (size_t i = 0; i < N; ++i) benchmark::DoNotOptimize(lst.pop_front(v));
    22. }
    23. }
    24. if (state.thread_index() == 0) {
    25. // Teardown code here.
    26. }
    27. state.SetItemsProcessed(state.iterations()*N);
    28. }
    29. static const long numcpu = sysconf(_SC_NPROCESSORS_CONF);
    30. BENCHMARK(BM_MultiThreaded)->Arg(1000)->ThreadRange(2, numcpu)->UseRealTime();
    31. BENCHMARK_MAIN();

    结果如下:

  • 相关阅读:
    ECCV2022|时尚领域的多模态预训练预训练模型FashionViL,在五个下游任务中SOTA!
    Go和Java实现抽象工厂模式
    CC3链分析与复现
    历时一年 Apache Spark 3.3.0 正式发布,新特性详解
    代码随想录算法训练营第十三天|239. 滑动窗口最大值、 347.前 K 个高频元素
    js人民币转换大写函数
    链式二叉树的实现及遍历(C语言版)
    SAP BC 源代码搜索
    RK3588平台开发系列讲解(USB篇)USB 外设 CONFIG
    MATLAB和物联网连载1:Internet of Things离你有多远?5行MATLAB的距离
  • 原文地址:https://blog.csdn.net/Dr_Jack/article/details/133953309