• 线程池交叉引用问题纠正


    线程池交叉引用问题纠正

    在C++11实现的线程池中,通过submitTask函数提交一个任务对象时,会返回一个Result对象给用户,用户通过这个Result对象来获取任务的返回值。
    获取任务返回值时,如果任务未执行完成,会阻塞到任务执行结束返回。
    任务执行结束会把返回值设置到Result对象的any_成员。

    但是在我们只需要任务做某件事而不关注其返回值时,比如写文件,网络IO等,但继承来的run方法是需要返回一个Any类型的值的。
    而当这些很耗时的操作,还没来得及给Result设置一个空返回值(比如0)时。submitTask返回的对象生存期到了,析构掉了。如下面的情况:

    #include 
    #include 
    #include 
    using namespace std;
    
    #include "threadpool.h"
    
    class MyTask : public Task
    {
    public:
        MyTask()
        {}
        Any run()  // run方法最终就在线程池分配的线程中去做执行了!
        {
            cout << "do something ..." << endl;
            std::this_thread::sleep_for(std::chrono::seconds(3));
    
            cout << "set return value..." << endl;
            return "";
        }
    };
    
    int main()
    {
        ThreadPool pool;
        pool.start();
        {
            Result res = pool.submitTask(make_shared<MyTask>());
        }
        cout << "Result res已经析构" << endl;
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    在这里插入图片描述
    可以看到,set return value是在Result对象析构后才执行的。虽然程序正常结束,但此时已经发生了非法的地址访问了,没出现问题是因为测试代码较少,在生产环境中是肯定会出现问题的。

    来看问题所在:
    在这里插入图片描述
    在这里插入图片描述
    在Task任务抽象类和Result接收返回值类存在交叉引用的关系,Result类的task_是一个强智能指针,而Task类的result_是一个普通指针。
    在我们需要获取返回值的情况下,Result对象的生存期是一定会大于Task的,因为调用Result的get方法获取返回值会阻塞等待Task任务执行完成,此时一切都跟我们预想的一样,Task执行完成给Result设置返回值,Result获取返回值,程序正常执行。正常执行的情况:

    #include 
    #include 
    #include 
    using namespace std;
    
    #include "threadpool.h"
    
    class MyTask : public Task
    {
    public:
        MyTask()
        {}
        Any run()  // run方法最终就在线程池分配的线程中去做执行了!
        {
            cout << "do something ..." << endl;
            std::this_thread::sleep_for(std::chrono::seconds(3));
    
            cout << "set return value..." << endl;
            return "";
        }
    };
    
    int main()
    {
        ThreadPool pool;
        pool.start();
        {
            Result res = pool.submitTask(make_shared<MyTask>());
            res.get();
        }
        cout << "Result res已经析构" << endl;
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    上面的cout << "set return value..." << endl;应该写在myTask的基类Task的exec函数里边的,为了方便就在这里写了。
    在这里插入图片描述
    但如果用户不需要获取返回值,生成的临时对象刚从submitTask构造出来就析构掉了,此时Task执行完在对这个析构掉的成员操作,就会发生段错误。

    所有我们需要保证task任务执行结束时需要检查submitTask返回对象没有吗被析构,如果被析构就不对Result设置返回值了,而这需要用到一个神器:shared_ptr和weak_ptr。
    重新实现的Task和Result:

    //接收提交到线程池的task任务执行完成后的返回值类型Result
    class Result : public std::enable_shared_from_this<Result>
    {
    public:
    	Result(std::shared_ptr<Task> task, bool isValid = true);
    	~Result() { std:: cout << "~Result" << std::endl; }
    
    	//setVal方法,获取任务执行完的返回值
    	void setVal(Any any);
    
    	//get方法,调用这个方法获取task的返回值
    	Any get();
    private:
    	Any any_; //存储任务的返回值
    	Semaphore sem_; //线程通信信号量
    	std::weak_ptr<Task> task_; //指向对应获取返回值的任务对象
    	std::atomic_bool isValid_;	//返回值是否有效
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    //任务抽象基类
    class Task
    {
    public:
    	Task();
    	~Task() = default;
    	//用户可以自定义任意任务类型,从Task继承,重写run方法,实现自定义任务处理
    	virtual Any run() = 0;
    
    	void exec()
    	{
    		Any any = run();
    		std::shared_ptr<Result> sp = result_.lock();
    		if (nullptr != sp)
    		{
    			cout << "set return value..." << endl;
    			sp->setVal(std::move(any));
    		}
    	}
    	void setResult(std::shared_ptr<Result> res);
    
    private:
    	std::weak_ptr<Result> result_;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    重新实现的submitTask方法:

    //给线程池提交任务 用户调用该接口,传入任务对象,生产任务
    std::shared_ptr<Result> ThreadPool::submitTask(std::shared_ptr<Task> sp)
    {
    	//获取锁
    	std::unique_lock<std::mutex> lock(taskQueMtx_);
    
    	if (!notFull_.wait_for(lock, std::chrono::seconds(1),
    		[&]()->bool { return taskQue_.size() < (size_t)taskQueMaxThreadHold_; }))
    	{
    		//等待1s之后条件还是没有满足
    		std::cerr << "task queue is full, submit task fail." << std::endl;
    		return std::make_shared<Result>(sp, false);
    	}
    
    	//如果有空余,把任务放入任务队列中
    	taskQue_.emplace(sp);
    	taskSize_++;
    
    	//新放任务,任务队列肯定不空,notEmpty_上通知
    	notEmpty_.notify_all();
    
    	//cache模式 场景:小而快的任务,任务处理比较紧急,不适合任务比较多且比较耗时的情况
    	//需要根据任务数量和空闲线程的数量,判断是否需要创建新的线程出来
    	if (poolMode_ == PoolMode::MODE_CACHE 
    		&& taskSize_ > idleThreadSize_ 
    		&& curThreadSize_ < threadSizeThreshHold_)
    	{
    		std::cout << "create new thread " << std::endl;
    		//创建新线程
    		auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
    		int threadId = ptr->getId();
    		threads_.emplace(threadId, std::move(ptr));
    		threads_[threadId]->start();
    		curThreadSize_++;
    		idleThreadSize_++;
    	}
    
    	//返回任务的Result对象
    	//return Result(sp);
    	std::shared_ptr<Result> rsp = std::make_shared<Result>(sp);
    	sp->setResult(rsp);
    	return rsp;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    此时把cout << "set return value..." << endl;加到Task的exec中,再去执行不需要返回值的任务,就不会出现Result已经被析构还回去设置返回值这种情况了。

    #include 
    #include 
    #include 
    using namespace std;
    
    #include "threadpool.h"
    
    class MyTask : public Task
    {
    public:
        MyTask()
        {}
        Any run()  // run方法最终就在线程池分配的线程中去做执行了!
        {
            cout << "do something ..." << endl;
            std::this_thread::sleep_for(std::chrono::seconds(3));
    
            return "";
        }
    };
    
    int main()
    {
        ThreadPool pool;
        pool.start();
        {
            pool.submitTask(make_shared<MyTask>());
        }
        cout << "Result res已经析构" << endl;
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在这里插入图片描述

  • 相关阅读:
    聊一聊 HBase 是如何写入数据的?
    [python刷题模板] 拓扑排序(Topological Sorting)
    数据库管理-第152期 Oracle Vector DB & AI-04(20240220)
    文献阅读 Person-in-WiFi:Fine-grained Person Perception using WiFi
    编译器优化:何为SLP矢量化
    NCCL源码解析②:Bootstrap网络连接的建立
    给女朋友的微信专属推送
    MonkeyRunner自动化测试
    【JAVA程序设计】(C00083)基于SSM+uniapp好物分享小程序及管理系统-有文档
    PDF流前端如何接收:深度解析与实用策略
  • 原文地址:https://blog.csdn.net/weixin_43973403/article/details/126356810