最近尝试了一种新的无锁链表的实现方法,主要思想是:push操作使用原子操作将原来的tail指针保存在线程的局部变量中,然后再修改old tail的next指针;pop操作首先使用原子操作改变head指针指向下一个位置,保证其他线程无法获old head指针,然后再进行删除。原来的实现方法不会保证head指针指向对象的存在。这样,在push取出old tail并修改tail之后,修改old tail的next之前,pop弹出最后一个对象并删除以后就会导致old tail变成空悬指针,这时push修改old tail的next就会导致未定义行为。在pop弹出最后一个以后,虽然old head指向会被删除,但是这时tail是指向head->next的,即使同时其他线程调用pop,也不会在修改old tail时候崩溃;如果链表只有一个元素,但是同时想弹出两个,那么在弹出第2个不会执行,会失败,也不会导致程序崩溃。那么会不会存在pop函数在验证不为空之后又变成空的情况呢?在多个消费者的情况下会出现这个情况,另外多个消费者还会存在old head被删除后在调用compare_exchange时候求head->next而导致解空悬指针。那么多个生产者会不会出现问题呢?目前来看,首先原子更改了tail的指向,那么后面来的线程即使在push的中间状态tail没有被挂到链表末尾,也能正确的识别并写到当前tail的后面。因此理论上是可以接受多个生产者的。即当前实现为MPSC的。
下面这个就是整个流程:

具体实现如下:
- #ifndef _LOCK_FREE_LIST_HPP_
- #define _LOCK_FREE_LIST_HPP_
- #include
-
- template<typename val_t>
- class lock_free_list
- {
- private:
- struct node
- {
- val_t v;
- node* next;
- node():v(), next(nullptr)
- {}
- node(const val_t& v_):v(v_), next(nullptr)
- {}
- };
- std::atomic
head, tail; - public:
- lock_free_list():head(new node), tail(head.load(std::memory_order_relaxed))
- {}
-
- lock_free_list<val_t>& push_back(const val_t& v)
- {
- auto pnew = new node(v);
- node* poldtail = tail.load(std::memory_order_relaxed);
- while(!tail.compare_exchange_strong(poldtail, pnew, std::memory_order_relaxed, std::memory_order_relaxed));
- poldtail->next = pnew;
- return *this;
- }
-
- bool pop_front(val_t& v)
- {
- if (head.load(std::memory_order_relaxed)->next == nullptr)
- {
- return false;
- }
- node* poldhead = head.load(std::memory_order_relaxed);
- while(!head.compare_exchange_strong(poldhead, poldhead->next, std::memory_order_relaxed, std::memory_order_relaxed));
- v = head.load(std::memory_order_relaxed)->v;
- delete poldhead;
- return true;
- }
- };
-
- #endif
下面对这个MPSC的程序作一些修改,使其变成MPMC的。主要修改是:1)取出head:循环尝试取出有效的head,并将head置为nullptr(原子操作),阻止其他pop出头节点,保证了head的独享。2)判断取出的oldhead是否被用过了,于是引入了一个atomic
具体的修改后代码如下:
- #ifndef _LOCK_FREE_LIST_HPP_
- #define _LOCK_FREE_LIST_HPP_
- #include
-
- template<typename val_t>
- class lock_free_list
- {
- private:
- struct node
- {
- std::atomic<bool> valid;
- val_t v;
- node* next;
- node():v(), next(nullptr),valid(true)
- {}
- node(const val_t& v_):v(v_), next(nullptr),valid(true)
- {}
- };
- std::atomic
head, tail; - public:
- lock_free_list():head(new node), tail(head.load(std::memory_order_relaxed))
- {
- head.load(std::memory_order_acquire)->valid = false;
- }
-
- lock_free_list<val_t>& push_back(const val_t& v)
- {
- auto pnew = new node(v);
- node* poldtail = tail.load(std::memory_order_relaxed);
- while(!tail.compare_exchange_strong(poldtail, pnew, std::memory_order_acq_rel));
- poldtail->next = pnew;
- return *this;
- }
-
- bool pop_front(val_t& v)
- {
- node* poldhead = head.load(std::memory_order_relaxed);
- node* poldhead_next = nullptr;
- while(!(poldhead = head.exchange(nullptr, std::memory_order_acq_rel)));
- assert(poldhead);
- poldhead_next = poldhead->next;
- bool valid = poldhead->valid.exchange(false, std::memory_order_acq_rel);
- if (valid)v = poldhead->v;
- if (!poldhead_next)
- {
- head.store(poldhead, std::memory_order_release);
- }
- else
- {
- head.store(poldhead_next, std::memory_order_release);
- delete poldhead;
- }
- return valid;
- }
-
- void reset()
- {
- val_t v;
- while (pop_front(v));
- }
- };
-
- #endif
MPMC的实现有一个技巧,如果直接使用head.compare_exchange_strong(oldhead, oldhead->next)是有问题的,因为在取oldhead->next的时候不能保证oldhead没有被析构调,比如另外一个线程恰好弹出了head并删除掉;所以这里使用的是exchange(nullptr),如果取出的是nullptr,那么说明其他pop线程正在处理,就自旋等待其他pop线程处理完毕,如此取出成功后的oldhead就是线程独享的,其他线程无法获取到,那么也就可以安全的通过oldhead->next更新head指针,不会存在有两个pop线程同时取出有效的head值的情况(所以这种实现方式的pop是串行的,低效的,但是push由于是先在自己的栈中生成独享对象,然后再插入,所以push很高效);程序中当节点的next为空时候是不会删除的,这也保证了push在最后一步设置oldtail->next时候oldtail不会被析构掉。用google-benchmark写一个测试代码试试MPMC的情况:
- #include
- #include
- #include "lock_free_list.hpp"
-
-
- lock_free_list<int> lst;
- static void BM_MultiThreaded(benchmark::State& state) {
- if ((state.threads()&1)==1) state.SkipWithError("Need even number of threads!");
- if (state.thread_index() == 0)lst.reset();
- const bool producer = state.thread_index() & 1;
- const size_t N = state.range(0);
- int i = 0;
- if (state.thread_index() == 0) {
- // Setup code here.
- }
- for (auto _ : state) {
- // Run the test as normal.
- if (producer)
- for (size_t i = 0; i < N; ++i) lst.push_back(i++);
- else
- {
- int v = 0;
- for (size_t i = 0; i < N; ++i) benchmark::DoNotOptimize(lst.pop_front(v));
- }
- }
- if (state.thread_index() == 0) {
- // Teardown code here.
- }
- state.SetItemsProcessed(state.iterations()*N);
- }
- static const long numcpu = sysconf(_SC_NPROCESSORS_CONF);
-
- BENCHMARK(BM_MultiThreaded)->Arg(1000)->ThreadRange(2, numcpu)->UseRealTime();
-
- BENCHMARK_MAIN();
结果如下:
