线程池顾名思义就是一个存放线程的池子。为什么会有线程池?在这里笔者想强调一点:任何一项新的技术的产生都是为了解决某种问题的!那么线程池的存在肯定也是为了解决项目中存在的某种问题的。
我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。线程池就是专门解决这种问题的。线程池中的线程创建一次多次复用,直至不需要多线程时(大概率进程结束)再全部销毁。
线程池的组成:线程队列、任务队列、管理者。
线程队列:主要是创建的线程队列,用于线程复用,不频繁申请、释放。
任务队列:需要程序执行的任务组成的队列。
管理者:用于协调、调度线程执行任务的分配。
该案例是主线程创建任务,然后线程池管理线程去执行任务,执行完退出。该案例中涉及线程控制的一些知识,详见:Linux的线程控制_码农诗人的博客-CSDN博客,代码pthread_pool.c如下:
- /**************************************************
- *** author : lijd
- *** date : 2022-09-07
- **************************************************/
- #include
- #include
- #include
- #include
- #include
-
- #define PTHREAD_NUM 3
-
- // 链表的添加删除宏定义
- #define LL_ADD(item, list) do{ \
- item->prev = NULL; \
- item->next = list; \
- if(list != NULL) list->prev = item; \
- list = item; \
- } while(0)
-
- #define LL_REMOVE(item, list) do { \
- if(item->prev != NULL) item->prev->next = item->next; \
- if(item->next != NULL) item->next->prev = item->prev; \
- if(list == item) list = item->next; \
- item->prev = item->next = NULL; \
- } while(0)
-
- // 线程队列结构
- struct NWORKER{
- pthread_t thread;
- struct NMANAGER *pool;
-
- // 线程池是否启停止
- int terminate;
-
- struct NWORKER *prev;
- struct NWORKER *next;
- };
-
- // 任务队列结构
- struct NJOB{
- void (*func)(void *);
- void *user_data;
-
- struct NJOB *prev;
- struct NJOB *next;
- };
-
- // 管理对象结构
- struct NMANAGER{
- // 任务链表
- struct NWORKER *workers;
-
- // 线程链表
- struct NJOB *jobs;
-
- // 条件变量
- pthread_cond_t jobs_cond;
- // 线程互斥锁
- pthread_mutex_t jobs_mutex;
- };
-
- typedef struct NMANAGER nThreadPool;
-
- // 线程回调函数
- static void *nThreadCallback(void *arg)
- {
- printf("---------------------nThreadCallback entry---------------thread ID:%ld\n", pthread_self());
- struct NWORKER *worker = (struct NWORKER *)arg;
-
- while(1)
- {
- pthread_mutex_lock(&worker->pool->jobs_mutex);
- while(worker->pool->jobs == NULL){
- if(worker->pool->workers->terminate) break;
-
- // pthread_cond_wait 原子调用: 等待条件变量, 解除锁, 然后阻塞
- // 当 pthread_cond_wait 返回,则条件变量有信号,同时上锁
- pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mutex);
- }
- // printf("-----------terminate:%d-------thread ID:%ld\n", worker->pool->workers->terminate, pthread_self());
- if(worker->pool->workers->terminate){
- pthread_mutex_unlock(&worker->pool->jobs_mutex);
- break;
- }
-
- struct NJOB *job = worker->pool->jobs;
- LL_REMOVE(job, worker->pool->jobs);
- pthread_mutex_unlock(&worker->pool->jobs_mutex);
- job->func(job->user_data);
- }
-
- free(worker);
- printf("---------------------nThreadCallback exit---------------thread ID:%ld\n", pthread_self());
- pthread_exit(NULL);
- }
-
- // 创建线程池
- int nThreadPoolCreate(nThreadPool *pool, int numWorkers)
- {
- if(numWorkers < 1) numWorkers = 1;
-
- if(pool == NULL) return -1;
- memset(pool, 0, sizeof(nThreadPool));
-
- pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
- memcpy(&pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));
-
- pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
- memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t));
-
- int i = 0;
- for(i = 0; i < numWorkers; i++)
- {
- struct NWORKER *worker = (struct NWORKER *)malloc(sizeof(struct NWORKER));
-
- if(worker == NULL){
- perror("malloc");
- return -2;
- }
-
- memset(worker, 0, sizeof(struct NWORKER));
- worker->pool = pool;
-
- int ret = pthread_create(&worker->thread, NULL, nThreadCallback, worker);
- if(ret){
- perror("pthread_create");
- return -3;
- }
-
- LL_ADD(worker, pool->workers);
- }
- printf("---------------------nThreadPoolCreate success!---------------------\n");
- return 0;
- }
-
- int nThreadPoolDestroy(nThreadPool *pool)
- {
- struct NWORKER *worker = NULL;
- for(worker = pool->workers; worker != NULL; worker = worker->next){
- worker->terminate = 1;
- }
-
- pthread_mutex_lock(&pool->jobs_mutex);
- pthread_cond_broadcast(&pool->jobs_cond);
- pthread_mutex_unlock(&pool->jobs_mutex);
- }
-
- void nThreadPoolPush(nThreadPool *pool, struct NJOB *job)
- {
- pthread_mutex_lock(&pool->jobs_mutex);
- LL_ADD(job, pool->jobs);
- pthread_cond_signal(&pool->jobs_cond);
- pthread_mutex_unlock(&pool->jobs_mutex);
- }
-
- #if 1
-
- // 任务回调函数
- static void *my_jobFunc(void *user_data)
- {
- if(user_data == NULL) return;
- printf("thread ID:%ld, data:%d\n", pthread_self(), *(int *)user_data);
- sleep(1);
- }
-
- int main()
- {
- nThreadPool *pool = (nThreadPool *)malloc(sizeof(struct NMANAGER));
-
- int ret = nThreadPoolCreate(pool, PTHREAD_NUM);
-
- if(ret != 0)
- {
- return -1;
- }
- printf("-----------------Main thread ID:%ld----------------\n", pthread_self());
- int i = 0;
- for(i = 0; i < 10; i++)
- {
- struct NJOB *job_node = (struct NJOB *)malloc(sizeof(struct NJOB));
- if(job_node == NULL)
- {
- perror("malloc job_node");
- return -2;
- }
-
- job_node->func = my_jobFunc;
-
- job_node->user_data = (void *)malloc(sizeof(int));
- memcpy(job_node->user_data, &i, sizeof(int));
-
- nThreadPoolPush(pool, job_node);
- }
-
- while(pool->jobs != NULL)
- {
- sleep(5);
- }
-
- nThreadPoolDestroy(pool);
-
- sleep(2);
- printf("-----------------Main thread Exit----------------\n");
- return 0;
- }
-
- #endif
执行结果如下:
