在C++11实现的线程池中,通过submitTask函数提交一个任务对象时,会返回一个Result对象给用户,用户通过这个Result对象来获取任务的返回值。
获取任务返回值时,如果任务未执行完成,会阻塞到任务执行结束返回。
任务执行结束会把返回值设置到Result对象的any_成员。
但是在我们只需要任务做某件事而不关注其返回值时,比如写文件,网络IO等,但继承来的run方法是需要返回一个Any类型的值的。
而当这些很耗时的操作,还没来得及给Result设置一个空返回值(比如0)时。submitTask返回的对象生存期到了,析构掉了。如下面的情况:
#include
#include
#include
using namespace std;
#include "threadpool.h"
class MyTask : public Task
{
public:
MyTask()
{}
Any run() // run方法最终就在线程池分配的线程中去做执行了!
{
cout << "do something ..." << endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
cout << "set return value..." << endl;
return "";
}
};
int main()
{
ThreadPool pool;
pool.start();
{
Result res = pool.submitTask(make_shared<MyTask>());
}
cout << "Result res已经析构" << endl;
return 0;
}

可以看到,set return value是在Result对象析构后才执行的。虽然程序正常结束,但此时已经发生了非法的地址访问了,没出现问题是因为测试代码较少,在生产环境中是肯定会出现问题的。
来看问题所在:


在Task任务抽象类和Result接收返回值类存在交叉引用的关系,Result类的task_是一个强智能指针,而Task类的result_是一个普通指针。
在我们需要获取返回值的情况下,Result对象的生存期是一定会大于Task的,因为调用Result的get方法获取返回值会阻塞等待Task任务执行完成,此时一切都跟我们预想的一样,Task执行完成给Result设置返回值,Result获取返回值,程序正常执行。正常执行的情况:
#include
#include
#include
using namespace std;
#include "threadpool.h"
class MyTask : public Task
{
public:
MyTask()
{}
Any run() // run方法最终就在线程池分配的线程中去做执行了!
{
cout << "do something ..." << endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
cout << "set return value..." << endl;
return "";
}
};
int main()
{
ThreadPool pool;
pool.start();
{
Result res = pool.submitTask(make_shared<MyTask>());
res.get();
}
cout << "Result res已经析构" << endl;
return 0;
}
上面的cout << "set return value..." << endl;应该写在myTask的基类Task的exec函数里边的,为了方便就在这里写了。

但如果用户不需要获取返回值,生成的临时对象刚从submitTask构造出来就析构掉了,此时Task执行完在对这个析构掉的成员操作,就会发生段错误。
所有我们需要保证task任务执行结束时需要检查submitTask返回对象没有吗被析构,如果被析构就不对Result设置返回值了,而这需要用到一个神器:shared_ptr和weak_ptr。
重新实现的Task和Result:
//接收提交到线程池的task任务执行完成后的返回值类型Result
class Result : public std::enable_shared_from_this<Result>
{
public:
Result(std::shared_ptr<Task> task, bool isValid = true);
~Result() { std:: cout << "~Result" << std::endl; }
//setVal方法,获取任务执行完的返回值
void setVal(Any any);
//get方法,调用这个方法获取task的返回值
Any get();
private:
Any any_; //存储任务的返回值
Semaphore sem_; //线程通信信号量
std::weak_ptr<Task> task_; //指向对应获取返回值的任务对象
std::atomic_bool isValid_; //返回值是否有效
};
//任务抽象基类
class Task
{
public:
Task();
~Task() = default;
//用户可以自定义任意任务类型,从Task继承,重写run方法,实现自定义任务处理
virtual Any run() = 0;
void exec()
{
Any any = run();
std::shared_ptr<Result> sp = result_.lock();
if (nullptr != sp)
{
cout << "set return value..." << endl;
sp->setVal(std::move(any));
}
}
void setResult(std::shared_ptr<Result> res);
private:
std::weak_ptr<Result> result_;
};
重新实现的submitTask方法:
//给线程池提交任务 用户调用该接口,传入任务对象,生产任务
std::shared_ptr<Result> ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
//获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
if (!notFull_.wait_for(lock, std::chrono::seconds(1),
[&]()->bool { return taskQue_.size() < (size_t)taskQueMaxThreadHold_; }))
{
//等待1s之后条件还是没有满足
std::cerr << "task queue is full, submit task fail." << std::endl;
return std::make_shared<Result>(sp, false);
}
//如果有空余,把任务放入任务队列中
taskQue_.emplace(sp);
taskSize_++;
//新放任务,任务队列肯定不空,notEmpty_上通知
notEmpty_.notify_all();
//cache模式 场景:小而快的任务,任务处理比较紧急,不适合任务比较多且比较耗时的情况
//需要根据任务数量和空闲线程的数量,判断是否需要创建新的线程出来
if (poolMode_ == PoolMode::MODE_CACHE
&& taskSize_ > idleThreadSize_
&& curThreadSize_ < threadSizeThreshHold_)
{
std::cout << "create new thread " << std::endl;
//创建新线程
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
threads_[threadId]->start();
curThreadSize_++;
idleThreadSize_++;
}
//返回任务的Result对象
//return Result(sp);
std::shared_ptr<Result> rsp = std::make_shared<Result>(sp);
sp->setResult(rsp);
return rsp;
}
此时把cout << "set return value..." << endl;加到Task的exec中,再去执行不需要返回值的任务,就不会出现Result已经被析构还回去设置返回值这种情况了。
#include
#include
#include
using namespace std;
#include "threadpool.h"
class MyTask : public Task
{
public:
MyTask()
{}
Any run() // run方法最终就在线程池分配的线程中去做执行了!
{
cout << "do something ..." << endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
return "";
}
};
int main()
{
ThreadPool pool;
pool.start();
{
pool.submitTask(make_shared<MyTask>());
}
cout << "Result res已经析构" << endl;
return 0;
}
