先来看私有数据成员:
- unsigned short _initSize; //初始化线程数量
- using Task = function<void()>; //类似于c函数指针
- vector<thread> _pool; //线程池
- queue<Task> _tasks; //任务队列
- mutex _lock; //任务队列同步锁
-
- #ifdef THREADPOOL_AUTO_GROW
- mutex _lockGrow; //线程池增长同步锁
- #endif // !THREADPOOL_AUTO_GROW
-
- condition_variable _task_cv; //条件阻塞
- atomic<bool> _run{ true }; //线程池是否执行
- atomic<int> _idlThrNum{ 0 }; //空闲线程数量
代码中的注释已经说的很清楚了, 这里介绍下最后三个变量的意义:
_task_cv (condition_variable):这是一个条件变量,用于在多线程环境中进行线程同步和协作。条件变量允许一个线程等待另一个线程满足某个条件后再继续执行。在这个线程池中,_task_cv 主要用于以下两个场景:
_task_cv.notify_all()来唤醒所有等待中的线程,以确保它们能够正确退出。_run (atomic_run 初始值为 true,表示线程池处于运行状态,可以接收和执行任务。当需要停止线程池时,将 _run 设置为 false,这将导致工作线程在执行完当前任务后退出,不再接受新的任务。
_idlThrNum (atomic_idlThrNum 会递增。这个变量的主要作用是控制线程池是否需要动态增加或减少线程的功能,例如在自动增长模式下。
这里并没有使用bool值,主要是下面三点原因:
线程安全性:在线程池中,多个线程同时访问 _run 变量,包括主线程和工作线程。如果使用普通的布尔值,没有同步机制的情况下进行读写操作可能导致数据竞争和未定义行为。使用原子布尔类型可以保证多线程环境下的线程安全性,避免数据竞争问题。
原子性操作:原子布尔类型支持原子操作,这意味着对 _run 的操作(读取和写入)是原子的,不会被中断。这是多线程编程中的重要特性,可以确保操作的一致性,特别是在多线程环境中对共享数据进行修改时。
线程间通信:_run 变量不仅用于控制线程池的运行状态,还用于线程之间的通信。当需要停止线程池时,主线程将 _run 设置为 false,工作线程在执行任务时会检查 _run 的值,如果为 false,则退出循环,实现了线程的协同停止。使用原子布尔类型可以保证线程之间正确地共享和通信这一重要信息。
这些变量一起协作,确保线程池能够安全地接收、执行任务,并在需要时正确停止。通过条件变量 _task_cv,线程可以等待任务的到来,通过 _run 变量控制线程池的运行状态,通过 _idlThrNum 跟踪空闲线程数量,实现了线程池的任务调度和动态管理。
补充一点:using Task = function
现在来看构造函数:
threadpool(unsigned short size = 4) { _initSize = size; addThread(size); }
- void addThread(unsigned short size)
- {
- for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
- { //增加线程数量,但不超过 预定义数量 THREADPOOL_MAX_NUM
- _pool.emplace_back( [this]{ //工作线程函数
- while (true) //防止 _run==false 时立即结束,此时任务队列可能不为空
- {
- Task task; // 获取一个待执行的 task
- {
- // unique_lock 相比 lock_guard 的好处是:可以随时 unlock() 和 lock()
- unique_lock<mutex> lock{ _lock };
- _task_cv.wait(lock, [this] { // wait 直到有 task, 或需要停止
- return !_run || !_tasks.empty();
- });
- if (!_run && _tasks.empty())
- return;
- _idlThrNum--;
- task = move(_tasks.front()); // 按先进先出从队列取一个 task
- _tasks.pop();
- }
- task();//执行任务
- unique_lock<mutex> lock{ _lock };
- _idlThrNum++;
- }
- });
- {
- unique_lock<mutex> lock{ _lock };
- _idlThrNum++;
- }
- }
- }
这段代码的意思是:如果线程池已经停止运行了,那么就返回运行时错误;如果线程池数量小于设置的最大线程数,并且当前已有线程>0,那么就将一个匿名函数封装成一个线程,将这个线程存储到vector中。用了while(true)循环不断询问任务队列有没有任务,由于初始状态没有任务,因此会被阻塞在wait()函数,直到有任务,如果还在运行并且任务队列不为空,那么就将空闲的线程数目-1,将队列的头部线程取出来交给task来执行,并将这个任务从任务队列中删除。
还记得上面的Task定义吗,正好接收匿名函数,对std::function感兴趣的话可以通过下面网址:
std::function - cppreference.com
再来看下代码中对于线程池的动态增长,由于动态改变需要改变线程的数量,因此需要获得自动增长的锁。也就是下面这段代码:
- #ifdef THREADPOOL_AUTO_GROW
- if (!_run) // stoped ??
- throw runtime_error("Grow on ThreadPool is stopped.");
- unique_lock<mutex> lockGrow{ _lockGrow }; //自动增长锁
- #endif // !THREADPOOL_AUTO_GROW
然后我们看下面这段代码:
- #ifdef THREADPOOL_AUTO_GROW
- if (_idlThrNum>0 && _pool.size() > _initSize) //支持自动释放空闲线程,避免峰值过后大量空闲线程
- return;
- #endif // !THREADPOOL_AUTO_GROW
这段代码定义:如果当前有空闲的线程,并且当前线程数量大于初始设置的数量,那么就执行return返回,终止这个进程的执行。乍一看有点矛盾,因为在创建时已经设置for循环创建对应数目的线程,看起来,永远不会满足_pool.size()>_initSize这个条件。
然而在提交函数这里,如果空闲线程<1并且当前线程数目<最大线程数目,就会调用addThread函数增加一个线程。比如我初始化设置4个,最大限制16个,那么当没有空闲线程时,就可以增加新的线程。这就为_pool.size()>_initSize提供了条件。
但是要注意,此时只是该线程不再执行里面的代码了,实际上线程依然存在于vector内部。
再来看提交函数:
- template<class F, class... Args>
- auto commit(F&& f, Args&&... args) -> future<decltype(f(args...))>
- {
- if (!_run) // stoped ??
- throw runtime_error("commit on ThreadPool is stopped.");
-
- using RetType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type, 函数 f 的返回值类型
- auto task = make_shared<packaged_task<RetType()>>(
- bind(forward<F>(f), forward<Args>(args)...)
- ); // 把函数入口及参数,打包(绑定)
- future<RetType> future = task->get_future();
- { // 添加任务到队列
- lock_guard<mutex> lock{ _lock };//对当前块的语句加锁 lock_guard 是 mutex 的 stack 封装类,构造的时候 lock(),析构的时候 unlock()
- _tasks.emplace([task]() { // push(Task{...}) 放到队列后面
- (*task)();
- });
- }
- #ifdef THREADPOOL_AUTO_GROW
- if (_idlThrNum < 1 && _pool.size() < THREADPOOL_MAX_NUM)
- addThread(1);
- #endif // !THREADPOOL_AUTO_GROW
- _task_cv.notify_one(); // 唤醒一个线程执行
-
- return future;
- }
auto commit(F&& f, Args&&... args) -> future
using RetType = decltype(f(args...));这是声明函数返回值类型。
auto task = make_shared
bind(forward
);这段代码的意义如下:
auto task:使用 auto 关键字进行类型自动推导,将创建一个名为 task 的智能指针对象,这里使用的是 std::shared_ptr。
make_shared:make_shared 是 C++11 引入的函数模板,用于创建智能指针对象并分配内存。在这里,它创建了一个 std::shared_ptr,并初始化了一个 std::packaged_task 对象,其模板参数 RetType() 表示 packaged_task 将返回一个 RetType 类型的结果。RetType 是通过之前使用 decltype 推断出来的。
bind(forward:std::bind 是 C++11 提供的一个工具,用于将可调用对象和其参数绑定在一起,创建一个新的可调用对象。在这里,它将可调用对象 f 和参数 args 绑定在一起,生成一个新的可调用对象,该对象的类型是 RetType()。
最终,task 是一个智能指针,指向一个 packaged_task 对象,该对象包装了可调用对象 f 和参数 args,并且已经绑定在一起,准备异步执行。通过这个 task,你可以将它传递给一个线程,然后在线程中执行它,最终获取 RetType 类型的返回值。这允许你实现异步执行函数或任务的功能。
future
{ // 添加任务到队列
lock_guard
_tasks.emplace([task]() { // push(Task{...}) 放到队列后面
(*task)();
});
}
future:首先,通过 task 的 get_future() 成员函数创建了一个 std::future 对象 future。std::future 是一种用于获取异步操作结果的机制,它允许你等待任务的完成并获取其返回值。
{} 中的代码块:这是一个作用域块,用于限制 lock_guard 对象 lock 的生命周期,确保在离开作用域时自动释放锁。
lock_guard:在这里,创建了一个 lock_guard 对象 lock,它会自动在其生命周期结束时释放 _lock 互斥锁。这确保了在执行后续代码之前获得了互斥锁,以防止多个线程同时修改 _tasks。
_tasks.emplace([task]() { (*task)(); });:在获得互斥锁之后,这行代码将一个任务添加到 _tasks 队列中。这个任务是一个 lambda 表达式,它会调用 task,也就是之前创建的 packaged_task,以执行任务。因为这是一个 lambda 表达式,它可以被添加到队列中并稍后由线程池中的某个线程执行。
最后来看析构函数:
- ~threadpool()
- {
- _run=false;
- _task_cv.notify_all(); // 唤醒所有线程执行
- for (thread& thread : _pool) {
- //thread.detach(); // 让线程“自生自灭”
- if (thread.joinable())
- thread.join(); // 等待任务结束, 前提:线程一定会执行完
- }
- }
析构函数之所以这么设计,是为了保证让所有的线程都执行完毕。
_run=false;:首先,将线程池的运行状态 _run 设置为 false。这是一个标志,用于告诉线程池中的工作线程停止执行任务。
_task_cv.notify_all();:然后,通过条件变量 _task_cv 来通知所有的工作线程,告知它们线程池即将被销毁,需要停止执行任务。这是通过调用 notify_all() 来实现的,它会唤醒所有因等待条件变量而阻塞的线程。
for (thread& thread : _pool) { ... }:接下来,遍历线程池中的所有线程。
if (thread.joinable()):对于每个线程,首先检查它是否可以被 join()。只有在线程是可联接的(即,该线程不处于已分离状态)时,才能安全地调用 join()。
thread.join();:如果线程是可联接的,就调用 join() 来等待线程执行完成。这确保了所有线程都能正常结束执行,而不会在析构时产生未完成的任务。