• C11线程池详解


    先来看私有数据成员:

    1. unsigned short _initSize; //初始化线程数量
    2. using Task = function<void()>; //类似于c函数指针
    3. vector<thread> _pool; //线程池
    4. queue<Task> _tasks; //任务队列
    5. mutex _lock; //任务队列同步锁
    6. #ifdef THREADPOOL_AUTO_GROW
    7. mutex _lockGrow; //线程池增长同步锁
    8. #endif // !THREADPOOL_AUTO_GROW
    9. condition_variable _task_cv; //条件阻塞
    10. atomic<bool> _run{ true }; //线程池是否执行
    11. atomic<int> _idlThrNum{ 0 }; //空闲线程数量

    代码中的注释已经说的很清楚了, 这里介绍下最后三个变量的意义:

    1. _task_cv (condition_variable):这是一个条件变量,用于在多线程环境中进行线程同步和协作。条件变量允许一个线程等待另一个线程满足某个条件后再继续执行。在这个线程池中,_task_cv 主要用于以下两个场景:

      • 当任务队列为空时,工作线程需要等待,直到有任务被添加到队列中,然后被唤醒以执行任务。
      • 在线程池销毁时,通过调用_task_cv.notify_all()来唤醒所有等待中的线程,以确保它们能够正确退出。
    2. _run (atomic):这是一个原子布尔类型的变量,用于标志线程池是否处于运行状态。_run 初始值为 true,表示线程池处于运行状态,可以接收和执行任务。当需要停止线程池时,将 _run 设置为 false,这将导致工作线程在执行完当前任务后退出,不再接受新的任务。

    3. _idlThrNum (atomic):这是一个原子整数类型的变量,用于跟踪线程池中空闲线程的数量。在线程池启动时,初始化为 0,每当一个工作线程完成任务并空闲时,_idlThrNum 会递增。这个变量的主要作用是控制线程池是否需要动态增加或减少线程的功能,例如在自动增长模式下。

    这里并没有使用bool值,主要是下面三点原因:

    1. 线程安全性:在线程池中,多个线程同时访问 _run 变量,包括主线程和工作线程。如果使用普通的布尔值,没有同步机制的情况下进行读写操作可能导致数据竞争和未定义行为。使用原子布尔类型可以保证多线程环境下的线程安全性,避免数据竞争问题。

    2. 原子性操作:原子布尔类型支持原子操作,这意味着对 _run 的操作(读取和写入)是原子的,不会被中断。这是多线程编程中的重要特性,可以确保操作的一致性,特别是在多线程环境中对共享数据进行修改时。

    3. 线程间通信_run 变量不仅用于控制线程池的运行状态,还用于线程之间的通信。当需要停止线程池时,主线程将 _run 设置为 false,工作线程在执行任务时会检查 _run 的值,如果为 false,则退出循环,实现了线程的协同停止。使用原子布尔类型可以保证线程之间正确地共享和通信这一重要信息。

    这些变量一起协作,确保线程池能够安全地接收、执行任务,并在需要时正确停止。通过条件变量 _task_cv,线程可以等待任务的到来,通过 _run 变量控制线程池的运行状态,通过 _idlThrNum 跟踪空闲线程数量,实现了线程池的任务调度和动态管理。

    补充一点:using Task = function;,std::function是一个函数包装器,该函数包装器模板能包装任何类型的可调用实体,如普通函数,函数对象,lamda表达式等。包装器可拷贝,移动等,并且包装器类型仅仅依赖于调用特征,而不依赖于可调用元素自身的类型。一个std::function类型对象实例可以包装下列这几种可调用实体:函数、函数指针、成员函数、静态函数、lamda表达式和函数对象。std::function对象实例可被拷贝和移动,并且可以使用指定的调用特征来直接调用目标元素。当std::function对象实例未包含任何实际可调用实体时,调用该std::function对象实例将抛出std::bad_function_call异常(经过测试,在使用函数重载时会报错)。

    现在来看构造函数:

    threadpool(unsigned short size = 4) { _initSize = size; addThread(size); }
    1. void addThread(unsigned short size)
    2. {
    3. for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
    4. { //增加线程数量,但不超过 预定义数量 THREADPOOL_MAX_NUM
    5. _pool.emplace_back( [this]{ //工作线程函数
    6. while (true) //防止 _run==false 时立即结束,此时任务队列可能不为空
    7. {
    8. Task task; // 获取一个待执行的 task
    9. {
    10. // unique_lock 相比 lock_guard 的好处是:可以随时 unlock() 和 lock()
    11. unique_lock<mutex> lock{ _lock };
    12. _task_cv.wait(lock, [this] { // wait 直到有 task, 或需要停止
    13. return !_run || !_tasks.empty();
    14. });
    15. if (!_run && _tasks.empty())
    16. return;
    17. _idlThrNum--;
    18. task = move(_tasks.front()); // 按先进先出从队列取一个 task
    19. _tasks.pop();
    20. }
    21. task();//执行任务
    22. unique_lock<mutex> lock{ _lock };
    23. _idlThrNum++;
    24. }
    25. });
    26. {
    27. unique_lock<mutex> lock{ _lock };
    28. _idlThrNum++;
    29. }
    30. }
    31. }

    这段代码的意思是:如果线程池已经停止运行了,那么就返回运行时错误;如果线程池数量小于设置的最大线程数,并且当前已有线程>0,那么就将一个匿名函数封装成一个线程,将这个线程存储到vector中。用了while(true)循环不断询问任务队列有没有任务,由于初始状态没有任务,因此会被阻塞在wait()函数,直到有任务,如果还在运行并且任务队列不为空,那么就将空闲的线程数目-1,将队列的头部线程取出来交给task来执行,并将这个任务从任务队列中删除。

    还记得上面的Task定义吗,正好接收匿名函数,对std::function感兴趣的话可以通过下面网址:

    std::function - cppreference.com

    再来看下代码中对于线程池的动态增长,由于动态改变需要改变线程的数量,因此需要获得自动增长的锁。也就是下面这段代码:

    1. #ifdef THREADPOOL_AUTO_GROW
    2. if (!_run) // stoped ??
    3. throw runtime_error("Grow on ThreadPool is stopped.");
    4. unique_lock<mutex> lockGrow{ _lockGrow }; //自动增长锁
    5. #endif // !THREADPOOL_AUTO_GROW

     然后我们看下面这段代码:

    1. #ifdef THREADPOOL_AUTO_GROW
    2. if (_idlThrNum>0 && _pool.size() > _initSize) //支持自动释放空闲线程,避免峰值过后大量空闲线程
    3. return;
    4. #endif // !THREADPOOL_AUTO_GROW

    这段代码定义:如果当前有空闲的线程,并且当前线程数量大于初始设置的数量,那么就执行return返回,终止这个进程的执行。乍一看有点矛盾,因为在创建时已经设置for循环创建对应数目的线程,看起来,永远不会满足_pool.size()>_initSize这个条件。

    然而在提交函数这里,如果空闲线程<1并且当前线程数目<最大线程数目,就会调用addThread函数增加一个线程。比如我初始化设置4个,最大限制16个,那么当没有空闲线程时,就可以增加新的线程。这就为_pool.size()>_initSize提供了条件。

    但是要注意,此时只是该线程不再执行里面的代码了,实际上线程依然存在于vector内部。

    再来看提交函数:

    1. template<class F, class... Args>
    2. auto commit(F&& f, Args&&... args) -> future<decltype(f(args...))>
    3. {
    4. if (!_run) // stoped ??
    5. throw runtime_error("commit on ThreadPool is stopped.");
    6. using RetType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type, 函数 f 的返回值类型
    7. auto task = make_shared<packaged_task<RetType()>>(
    8. bind(forward<F>(f), forward<Args>(args)...)
    9. ); // 把函数入口及参数,打包(绑定)
    10. future<RetType> future = task->get_future();
    11. { // 添加任务到队列
    12. lock_guard<mutex> lock{ _lock };//对当前块的语句加锁 lock_guard 是 mutex 的 stack 封装类,构造的时候 lock(),析构的时候 unlock()
    13. _tasks.emplace([task]() { // push(Task{...}) 放到队列后面
    14. (*task)();
    15. });
    16. }
    17. #ifdef THREADPOOL_AUTO_GROW
    18. if (_idlThrNum < 1 && _pool.size() < THREADPOOL_MAX_NUM)
    19. addThread(1);
    20. #endif // !THREADPOOL_AUTO_GROW
    21. _task_cv.notify_one(); // 唤醒一个线程执行
    22. return future;
    23. }

     auto commit(F&& f, Args&&... args) -> future这里的意思是,接受一个可调用对象f和它的参数args,然后将它们放到future对象内,以便异步执行f,并获得返回值,然后利用auto的参数推导,将auto换为函数的返回值。

    using RetType = decltype(f(args...));这是声明函数返回值类型。

    auto task = make_shared>(
                bind(forward(f), forward(args)...)
    );这段代码的意义如下:

    1. auto task:使用 auto 关键字进行类型自动推导,将创建一个名为 task 的智能指针对象,这里使用的是 std::shared_ptr

    2. make_shared>make_shared 是 C++11 引入的函数模板,用于创建智能指针对象并分配内存。在这里,它创建了一个 std::shared_ptr,并初始化了一个 std::packaged_task 对象,其模板参数 RetType() 表示 packaged_task 将返回一个 RetType 类型的结果。RetType 是通过之前使用 decltype 推断出来的。

    3. bind(forward(f), forward(args)...)std::bind 是 C++11 提供的一个工具,用于将可调用对象和其参数绑定在一起,创建一个新的可调用对象。在这里,它将可调用对象 f 和参数 args 绑定在一起,生成一个新的可调用对象,该对象的类型是 RetType()

    4. 最终,task 是一个智能指针,指向一个 packaged_task 对象,该对象包装了可调用对象 f 和参数 args,并且已经绑定在一起,准备异步执行。通过这个 task,你可以将它传递给一个线程,然后在线程中执行它,最终获取 RetType 类型的返回值。这允许你实现异步执行函数或任务的功能。

    future future = task->get_future();
            {    // 添加任务到队列
                lock_guard lock{ _lock };
                _tasks.emplace([task]() { // push(Task{...}) 放到队列后面
                    (*task)();
                });
            }

    1. future future = task->get_future();:首先,通过 taskget_future() 成员函数创建了一个 std::future 对象 futurestd::future 是一种用于获取异步操作结果的机制,它允许你等待任务的完成并获取其返回值。

    2. {} 中的代码块:这是一个作用域块,用于限制 lock_guard 对象 lock 的生命周期,确保在离开作用域时自动释放锁。

    3. lock_guard lock{ _lock };:在这里,创建了一个 lock_guard 对象 lock,它会自动在其生命周期结束时释放 _lock 互斥锁。这确保了在执行后续代码之前获得了互斥锁,以防止多个线程同时修改 _tasks

    4. _tasks.emplace([task]() { (*task)(); });:在获得互斥锁之后,这行代码将一个任务添加到 _tasks 队列中。这个任务是一个 lambda 表达式,它会调用 task,也就是之前创建的 packaged_task,以执行任务。因为这是一个 lambda 表达式,它可以被添加到队列中并稍后由线程池中的某个线程执行。

    最后来看析构函数:

    1. ~threadpool()
    2. {
    3. _run=false;
    4. _task_cv.notify_all(); // 唤醒所有线程执行
    5. for (thread& thread : _pool) {
    6. //thread.detach(); // 让线程“自生自灭”
    7. if (thread.joinable())
    8. thread.join(); // 等待任务结束, 前提:线程一定会执行完
    9. }
    10. }

    析构函数之所以这么设计,是为了保证让所有的线程都执行完毕。

    1. _run=false;:首先,将线程池的运行状态 _run 设置为 false。这是一个标志,用于告诉线程池中的工作线程停止执行任务。

    2. _task_cv.notify_all();:然后,通过条件变量 _task_cv 来通知所有的工作线程,告知它们线程池即将被销毁,需要停止执行任务。这是通过调用 notify_all() 来实现的,它会唤醒所有因等待条件变量而阻塞的线程。

    3. for (thread& thread : _pool) { ... }:接下来,遍历线程池中的所有线程。

    4. if (thread.joinable()):对于每个线程,首先检查它是否可以被 join()。只有在线程是可联接的(即,该线程不处于已分离状态)时,才能安全地调用 join()

    5. thread.join();:如果线程是可联接的,就调用 join() 来等待线程执行完成。这确保了所有线程都能正常结束执行,而不会在析构时产生未完成的任务。

  • 相关阅读:
    JavaScript 67 JavaScript HTML DOM 67.7 JavaScript HTML DOM - 改变 CSS
    每日五道java面试题之spring篇(三)
    Diagrams——制作短小精悍的流程图
    【毕业设计】基于javaEE+原生Servlet+MySql的酒店管理系统设计与实现(毕业论文+程序源码)——酒店管理系统
    RabiitMQ消息队列系统
    iOS高级理论:常用的架构模式
    Linux命令
    Java的IO流-转换流
    【小白专用 已验证24.5.30】ThinkPHP6 视图
    新消费降温,良品铺子还能走多远?
  • 原文地址:https://blog.csdn.net/pan_1214_/article/details/133554841