• 手写能任务窃取的线程池


    目录

    function_wrapper.hpp:

    stealing_queue.hpp

    thread_pool_steal_hpp

    参考:《C++并发编程实战》

    对于thread_pool_steal.hpp的代码有改动,不然运行不了

    function_wrapper.hpp:


    //包装可调用对象,对外消除对象型别,还需要有一个函数调用
    //
    //
    //私有成员:
    //      1.一个虚基类(没有的话在3很难定义一个指向实例化类的指针)
    //      struct impl_base{
    //              1.一个纯虚函数用来让派生类继承3执行函数
    //              2.虚析构函数
    //      }
    //      2.因为需要接受任意类型的可调用对象,所以内部需要封装一个模板类(派生自1>)
    //      template
    //      struct impl_type{
    //              1.成员变量:传入的函数
    //              2.构造函数:移动构造成员变量
    //              3.成员函数:执行函数
    //      }
    //      3.一个指向实例化后的指针(在这里看出确实需要一个基类,可以通过基类指针构
    造派生类指针)
    //公有成员:
    //      1.构造函数,因为接收的对象是任意类型,所以是模板构造函数,并且使用万能引
    用和完美转发实例化私有成员的3.
    //      2.()运算符的重载通过调用私有成员2.的3.实现
    //      3.默认构造函数 = default
    //      4.移动拷贝构造函数
    //      5.移动复制构造函数
    //   
     

    1. #ifndef _FUNCTION_WRAPPER_HPP_
    2. #define _FUNCTION_WRAPPER_HPP_
    3. #include
    4. class function_wrapper
    5. {
    6. struct impl_base{
    7. virtual void call() = 0;
    8. virtual ~impl_base(){};
    9. };
    10. template<typename Function>
    11. struct impl_type:impl_base{
    12. Function f;
    13. impl_type(Function&& f_):f(std::move(f_)){}
    14. void call(){ f(); }
    15. };
    16. std::unique_ptr impl;
    17. public:
    18. function_wrapper() = default;
    19. template<typename Function>
    20. function_wrapper(Function&& f):impl(new impl_type(std::move(f))) {}
    21. void operator()(){
    22. impl->call();
    23. }
    24. function_wrapper(function_wrapper&& other):impl(std::move(other.impl)) {}
    25. function_wrapper& operator=(function_wrapper&& other){
    26. impl = std::move(other.impl);
    27. return *this;
    28. }
    29. function_wrapper(function_wrapper& other) = delete;
    30. function_wrapper(const function_wrapper& other) = delete;
    31. function_wrapper& operator=(const function_wrapper& other)=delete;
    32. };
    33. #endif

    stealing_queue.hpp


    //可以进行任务窃取的队列
    //
    //私有成员:
    //      1.一个双端队列,pop操作在队头进行,steal操作在队尾进行,存储的内容为function_wrapper类
    //      2.互斥 保证安全,因为不存储线程,所以不需要条件变量传递线程运行信息
    //公有成员:
    //      1.默认构造函数
    //      2.push
    //      3.try_pop
    //      4.try_steal基本和pop都一样,就是弹出队尾元素//在线程池中定义一个存储窃取
    队列的vector,vector的索引代表每个线程的标识,这样每个线程就可以通过这个vector访>问窃取队列。
    //      5.empty
     

    1. #ifndef _STEALING_QUEUE_HPP
    2. #define _STEALING_QUEUE_HPP
    3. #include"function_wrapper.hpp"
    4. #include
    5. #include
    6. class work_stealing_queue
    7. {
    8. typedef function_wrapper data_type;
    9. std::deque the_queue;
    10. mutable std::mutex mut;
    11. public:
    12. work_stealing_queue(){}
    13. work_stealing_queue(const work_stealing_queue& othre) = delete;
    14. work_stealing_queue& operator=(const work_stealing_queue& othre) = delete;
    15. void push(data_type data){
    16. std::lock_guard lk(mut);
    17. the_queue.push_front(std::move(data));
    18. }
    19. bool try_pop(data_type& data){
    20. std::lock_guard lk(mut);
    21. if(the_queue.empty())
    22. return false;
    23. else{
    24. data = std::move(the_queue.front());
    25. the_queue.pop_front();
    26. return true;
    27. }
    28. }
    29. bool try_steal(data_type& data){
    30. std::lock_guard lk(mut);
    31. if(the_queue.empty())
    32. return false;
    33. else{
    34. data = std::move(the_queue.front());
    35. the_queue.pop_back();
    36. return true;
    37. }
    38. }
    39. bool empty() const{
    40. std::lock_guard lk(mut);
    41. return the_queue.empty();
    42. }
    43. };
    44. #endif

    thread_pool_steal_hpp:

    //私有成员:
    //      1.控制线程正常运行的原子变量,抛出异常设置为true
    //      2.线程池的池队列,基于普通的线程安全队列
    //      3.提供索引的队列存储指向窃取队列的指针
    //      4.存放线程的队列
    //      5.封装可联结线程
    //      6.静态本地线程变量 本地线程队列
    //      7.静态本地线程变量 索引
    //
    //      8.work_thread工作函数(任务函数)
    //      9.判断能否从 本地线程队列获取任务
    //      10.判断能否从 线程池队列获取任务
    //      11.判断能否从 其他线程窃取任务
    //公有成员:
    //      1.构造函数:初始化原子变量,可联结线程类。
    //      {
    //              try
    //              {       
    //                      for()
    //                      {初始化提供索引的队列   }
    //                      for()
    //                      {初始化线程}
    //              }
    //              catch
    //      }
    //      2.提交任务函数//为了获取返回值应该返回future,传入各种函数,所以应该是模
    板函数{
    //              1.传入函数打包给pakage_task
    //              2.获取future
    //              3.判断是传入本地线程队列还是线程池队列
    //              4.return future;
    //      }
    //      3.run_package_task//work_thread的主要部分
    //      {
    //              if(判断从哪个队列获得任务)
    //              {
    //                      task()
    //              }
    //              else 交出cpu时间。
    //        }

    //        4.析构函数
     

    1. #ifndef _THREAD_POOL_STEAL_HPP_
    2. #define _THREAD_POOL_STEAL_HPP_
    3. #include "threadsafe_queue.hpp"
    4. #include "ThreadRAII.h"
    5. #include "stealing_queue.hpp"
    6. #include "function_wrapper.hpp"
    7. #include "stealing_queue.hpp"
    8. #include
    9. #include
    10. #include
    11. #include
    12. #include
    13. class thread_pool_steal
    14. {
    15. typedef function_wrapper task_type;
    16. std::atomic_bool done;
    17. threadsafe_queue pool_work_queue;
    18. std::vector> queues;
    19. std::vector threads;
    20. join_threads joiner;
    21. static thread_local work_stealing_queue* local_work_queue;
    22. static thread_local unsigned my_index ;
    23. void work_thread(unsigned index){
    24. my_index = index;
    25. local_work_queue = queues[my_index].get();
    26. while(!done){
    27. run_pending_task();
    28. }
    29. }
    30. bool pop_from_local(task_type& task){
    31. return local_work_queue && local_work_queue->try_pop(task);
    32. }
    33. bool pop_from_pool(task_type& task){
    34. return pool_work_queue.try_pop(task);
    35. }
    36. bool pop_from_steal(task_type& task){
    37. work_stealing_queue steal_queue;
    38. for(unsigned i = 0; isize(); i++){
    39. unsigned index =(my_index + i+1)%queues.size();
    40. if(queues[index]->try_steal(task))
    41. {
    42. return true;
    43. }
    44. }
    45. return false;
    46. }
    47. public:
    48. thread_pool_steal():done(false),joiner(threads)
    49. {
    50. unsigned number = std::thread::hardware_concurrency();
    51. try{
    52. for(unsigned i = 0; i
    53. queues.push_back(std::unique_ptr(new work_stealing_queue));
    54. }
    55. for(unsigned i = 0; i
    56. threads.push_back(std::thread(&thread_pool_steal::work_thread,this,i));
    57. }
    58. }
    59. catch(...){
    60. done = true;
    61. throw;
    62. }
    63. }
    64. ~thread_pool_steal(){
    65. done = true;
    66. }
    67. template <typename Function>
    68. std::future<typename std::result_of<Function()>::type> submit(Function f)
    69. {
    70. typedef typename std::result_of<Function()>::type result_type;
    71. std::packaged_task<result_type()> task(f);
    72. std::future res(task.get_future());
    73. int index = 0;
    74. for(auto& ptr:queues){
    75. if(ptr->empty()){
    76. ptr->push(std::move(task));
    77. index = -1;
    78. break;
    79. }
    80. index++;
    81. }
    82. if(index>=0){
    83. pool_work_queue.push(std::move(task));
    84. }
    85. return res;
    86. }
    87. void run_pending_task(){
    88. task_type task;
    89. if(pop_from_local(task) || pop_from_pool(task) || pop_from_steal(task))
    90. {
    91. task();
    92. }
    93. else{
    94. std::this_thread::yield();
    95. }
    96. }
    97. };
    98. thread_local work_stealing_queue* thread_pool_steal::local_work_queue = nullptr;
    99. thread_local unsigned thread_pool_steal::my_index = 0;
    100. #endif

  • 相关阅读:
    哈希表题目:键盘行
    陈年雷司令葡萄酒中的石油笔记
    django的update和create高级操作
    Spring Cloud Alibaba 容器化部署最佳实践 | 本地部署版本详解
    联通边缘AI:打造“职业技能”,助力行业高质量发展
    按钮测试: 循环遍历 单据行明细 &执行SQL
    力扣LeatCode算法题第三题-无重复字符的最长子串
    Latex参考文献中大写字母编译后自动变成了小写,如何保持原字母大写形式
    详解--Hash(中文也称散列、哈希)
    动态规划合集
  • 原文地址:https://blog.csdn.net/qq_52758467/article/details/133685416