• 线程池实现简单案例(C语言)


    原理

            线程池顾名思义就是一个存放线程的池子。为什么会有线程池?在这里笔者想强调一点:任何一项新的技术的产生都是为了解决某种问题的!那么线程池的存在肯定也是为了解决项目中存在的某种问题的。

            我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。线程池就是专门解决这种问题的。线程池中的线程创建一次多次复用,直至不需要多线程时(大概率进程结束)再全部销毁。

    组成

            线程池的组成:线程队列、任务队列、管理者。

            线程队列:主要是创建的线程队列,用于线程复用,不频繁申请、释放。

            任务队列:需要程序执行的任务组成的队列。

            管理者:用于协调、调度线程执行任务的分配。

    案例

            该案例是主线程创建任务,然后线程池管理线程去执行任务,执行完退出。该案例中涉及线程控制的一些知识,详见:Linux的线程控制_码农诗人的博客-CSDN博客,代码pthread_pool.c如下:

    1. /**************************************************
    2. *** author : lijd
    3. *** date : 2022-09-07
    4. **************************************************/
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #define PTHREAD_NUM 3
    11. // 链表的添加删除宏定义
    12. #define LL_ADD(item, list) do{ \
    13. item->prev = NULL; \
    14. item->next = list; \
    15. if(list != NULL) list->prev = item; \
    16. list = item; \
    17. } while(0)
    18. #define LL_REMOVE(item, list) do { \
    19. if(item->prev != NULL) item->prev->next = item->next; \
    20. if(item->next != NULL) item->next->prev = item->prev; \
    21. if(list == item) list = item->next; \
    22. item->prev = item->next = NULL; \
    23. } while(0)
    24. // 线程队列结构
    25. struct NWORKER{
    26. pthread_t thread;
    27. struct NMANAGER *pool;
    28. // 线程池是否启停止
    29. int terminate;
    30. struct NWORKER *prev;
    31. struct NWORKER *next;
    32. };
    33. // 任务队列结构
    34. struct NJOB{
    35. void (*func)(void *);
    36. void *user_data;
    37. struct NJOB *prev;
    38. struct NJOB *next;
    39. };
    40. // 管理对象结构
    41. struct NMANAGER{
    42. // 任务链表
    43. struct NWORKER *workers;
    44. // 线程链表
    45. struct NJOB *jobs;
    46. // 条件变量
    47. pthread_cond_t jobs_cond;
    48. // 线程互斥锁
    49. pthread_mutex_t jobs_mutex;
    50. };
    51. typedef struct NMANAGER nThreadPool;
    52. // 线程回调函数
    53. static void *nThreadCallback(void *arg)
    54. {
    55. printf("---------------------nThreadCallback entry---------------thread ID:%ld\n", pthread_self());
    56. struct NWORKER *worker = (struct NWORKER *)arg;
    57. while(1)
    58. {
    59. pthread_mutex_lock(&worker->pool->jobs_mutex);
    60. while(worker->pool->jobs == NULL){
    61. if(worker->pool->workers->terminate) break;
    62. // pthread_cond_wait 原子调用: 等待条件变量, 解除锁, 然后阻塞
    63. // 当 pthread_cond_wait 返回,则条件变量有信号,同时上锁
    64. pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mutex);
    65. }
    66. // printf("-----------terminate:%d-------thread ID:%ld\n", worker->pool->workers->terminate, pthread_self());
    67. if(worker->pool->workers->terminate){
    68. pthread_mutex_unlock(&worker->pool->jobs_mutex);
    69. break;
    70. }
    71. struct NJOB *job = worker->pool->jobs;
    72. LL_REMOVE(job, worker->pool->jobs);
    73. pthread_mutex_unlock(&worker->pool->jobs_mutex);
    74. job->func(job->user_data);
    75. }
    76. free(worker);
    77. printf("---------------------nThreadCallback exit---------------thread ID:%ld\n", pthread_self());
    78. pthread_exit(NULL);
    79. }
    80. // 创建线程池
    81. int nThreadPoolCreate(nThreadPool *pool, int numWorkers)
    82. {
    83. if(numWorkers < 1) numWorkers = 1;
    84. if(pool == NULL) return -1;
    85. memset(pool, 0, sizeof(nThreadPool));
    86. pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    87. memcpy(&pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));
    88. pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    89. memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t));
    90. int i = 0;
    91. for(i = 0; i < numWorkers; i++)
    92. {
    93. struct NWORKER *worker = (struct NWORKER *)malloc(sizeof(struct NWORKER));
    94. if(worker == NULL){
    95. perror("malloc");
    96. return -2;
    97. }
    98. memset(worker, 0, sizeof(struct NWORKER));
    99. worker->pool = pool;
    100. int ret = pthread_create(&worker->thread, NULL, nThreadCallback, worker);
    101. if(ret){
    102. perror("pthread_create");
    103. return -3;
    104. }
    105. LL_ADD(worker, pool->workers);
    106. }
    107. printf("---------------------nThreadPoolCreate success!---------------------\n");
    108. return 0;
    109. }
    110. int nThreadPoolDestroy(nThreadPool *pool)
    111. {
    112. struct NWORKER *worker = NULL;
    113. for(worker = pool->workers; worker != NULL; worker = worker->next){
    114. worker->terminate = 1;
    115. }
    116. pthread_mutex_lock(&pool->jobs_mutex);
    117. pthread_cond_broadcast(&pool->jobs_cond);
    118. pthread_mutex_unlock(&pool->jobs_mutex);
    119. }
    120. void nThreadPoolPush(nThreadPool *pool, struct NJOB *job)
    121. {
    122. pthread_mutex_lock(&pool->jobs_mutex);
    123. LL_ADD(job, pool->jobs);
    124. pthread_cond_signal(&pool->jobs_cond);
    125. pthread_mutex_unlock(&pool->jobs_mutex);
    126. }
    127. #if 1
    128. // 任务回调函数
    129. static void *my_jobFunc(void *user_data)
    130. {
    131. if(user_data == NULL) return;
    132. printf("thread ID:%ld, data:%d\n", pthread_self(), *(int *)user_data);
    133. sleep(1);
    134. }
    135. int main()
    136. {
    137. nThreadPool *pool = (nThreadPool *)malloc(sizeof(struct NMANAGER));
    138. int ret = nThreadPoolCreate(pool, PTHREAD_NUM);
    139. if(ret != 0)
    140. {
    141. return -1;
    142. }
    143. printf("-----------------Main thread ID:%ld----------------\n", pthread_self());
    144. int i = 0;
    145. for(i = 0; i < 10; i++)
    146. {
    147. struct NJOB *job_node = (struct NJOB *)malloc(sizeof(struct NJOB));
    148. if(job_node == NULL)
    149. {
    150. perror("malloc job_node");
    151. return -2;
    152. }
    153. job_node->func = my_jobFunc;
    154. job_node->user_data = (void *)malloc(sizeof(int));
    155. memcpy(job_node->user_data, &i, sizeof(int));
    156. nThreadPoolPush(pool, job_node);
    157. }
    158. while(pool->jobs != NULL)
    159. {
    160. sleep(5);
    161. }
    162. nThreadPoolDestroy(pool);
    163. sleep(2);
    164. printf("-----------------Main thread Exit----------------\n");
    165. return 0;
    166. }
    167. #endif

    执行结果如下:

            

  • 相关阅读:
    Python Pandas数据处理作图——波尔共振实验
    [C++随笔录] vector模拟实现
    高数中值定理总结
    MySQL(五)增删改查进阶
    Redis高级及实战
    动态内存管理改造简易通讯录
    Linux操作系统之进程控制
    【luckfox】3、计算重量差
    VBA操作数据库
    [Spring Cloud] Open Feign 使用
  • 原文地址:https://blog.csdn.net/ddazz0621/article/details/126747102