• linux C语言 手写线程池


    main.cpp

    1. #include
    2. #include "threadpool.h"
    3. #include
    4. #include
    5. #include
    6. void taskFunc(void* arg)
    7. {
    8. int num = *((int*)arg);
    9. printf("thread %ld is working, number = %d\n", pthread_self(), num);
    10. sleep(1);
    11. }
    12. int main()
    13. {
    14. printf("%s 向你问好!\n", "threadpool");
    15. // 创建线程池
    16. ThreadPool* pool = threadPoolCreate(3, 10, 100);
    17. for (int i = 0; i < 100; i++)
    18. {
    19. int* num = (int*)malloc(sizeof(int));
    20. *num = i + 100;
    21. threadPoolAdd(pool, taskFunc, num);
    22. }
    23. sleep(30);
    24. threadPoolDestory(pool);
    25. return 0;
    26. }

    threadpool.h

    1. #pragma once
    2. #include
    3. class threadpool
    4. {
    5. };
    6. typedef struct ThreadPool ThreadPool;
    7. // 创建线程池并初始化
    8. ThreadPool* threadPoolCreate(int min, int max, int queueSize);
    9. // 销毁线程池
    10. int threadPoolDestory(ThreadPool* pool);
    11. // 给线程池添加任务
    12. void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
    13. // 获取线程池中工作的线程个数
    14. int threadPoolBusyNum(ThreadPool* pool);
    15. // 获取线程池中活着的线程的个数
    16. int threadPoolAliveNum(ThreadPool* pool);
    17. void* worker(void* arg);
    18. void* manager(void* arg);
    19. void threadExit(ThreadPool* pool);

     threadpool.cpp

     

    1. #include "threadpool.h"
    2. #include
    3. #include
    4. #include
    5. #include
    6. const int NUMBER = 2;
    7. // 任务结构体
    8. typedef struct Task
    9. {
    10. void (*function)(void* arg);
    11. void* arg;
    12. }Task;
    13. // 线程池结构体
    14. struct ThreadPool
    15. {
    16. // 任务队列
    17. Task* taskQ;
    18. int queueCapacity; // 队列容量
    19. int queueSize; // 当前任务个数
    20. int queueFront; // 队头 -- 取数据
    21. int queueRear; // 队尾 -- 放数据
    22. pthread_t managerId; // 管理者线程ID
    23. pthread_t* threadIDs; // 工作的线程ID
    24. int minNum; // 最小线程数
    25. int maxNum; // 最大线程数
    26. int busyNum;// 忙碌线程数
    27. int liveNum;// 存活线程数
    28. int exitNum;// 要销毁线程个数
    29. pthread_mutex_t mutexPool; // 锁整个线程池
    30. pthread_mutex_t mutexBusy; // 锁busyNum变量
    31. pthread_cond_t notFull; // 任务队列是不是满了
    32. pthread_cond_t notEmpty; // 任务队列是不是空了
    33. int shutdown; // 是不是要销毁线程, 销毁为1, 不销毁为0
    34. };
    35. ThreadPool* threadPoolCreate(int min, int max, int queueSize)
    36. {
    37. ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    38. do
    39. {
    40. if (pool == NULL)
    41. {
    42. printf("malloc threadpool fail .. \n");
    43. break;
    44. }
    45. pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
    46. if (pool->threadIDs == NULL)
    47. {
    48. printf("malloc threadIDs fail .. \n");
    49. break;
    50. }
    51. memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
    52. pool->minNum = min;
    53. pool->maxNum = max;
    54. pool->busyNum = 0;
    55. pool->liveNum = min; // 和最小个数相等
    56. pool->exitNum = 0;
    57. if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
    58. pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
    59. pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
    60. pthread_cond_init(&pool->notFull, NULL) != 0)
    61. {
    62. printf("mutex init fail ... \n");
    63. break;
    64. }
    65. pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
    66. pool->queueCapacity = queueSize;
    67. pool->queueSize = 0;
    68. pool->queueFront = 0;
    69. pool->queueRear = 0;
    70. pool->shutdown = 0;
    71. // 创建线程
    72. pthread_create(&pool->managerId, NULL, manager, pool);
    73. for (int i = 0; i < min; i++)
    74. {
    75. pthread_create(&pool->threadIDs[i], NULL, worker, pool);
    76. }
    77. return pool;
    78. } while (0);
    79. // 释放资源
    80. if (pool && pool->threadIDs)
    81. {
    82. free(pool->threadIDs);
    83. }
    84. if (pool && pool->taskQ)
    85. {
    86. free(pool->taskQ);
    87. }
    88. if (pool)
    89. {
    90. free(pool);
    91. }
    92. return NULL;
    93. }
    94. int threadPoolDestory(ThreadPool* pool)
    95. {
    96. if (pool == NULL)
    97. {
    98. return -1;
    99. }
    100. // 关掉线程池
    101. pool->shutdown = 1;
    102. // 阻塞回收管理者线程
    103. // 唤醒阻塞的消费者线程
    104. for (int i = 0; i < pool->liveNum; i++)
    105. {
    106. pthread_cond_signal(&pool->notEmpty);
    107. }
    108. pthread_join(pool->managerId, NULL);
    109. // 释放堆内存
    110. if (pool->taskQ)
    111. {
    112. free(pool->taskQ);
    113. }
    114. if (pool->threadIDs)
    115. {
    116. free(pool->threadIDs);
    117. }
    118. pthread_mutex_destroy(&pool->mutexPool);
    119. pthread_mutex_destroy(&pool->mutexBusy);
    120. pthread_cond_destroy(&pool->notEmpty);
    121. pthread_cond_destroy(&pool->notFull);
    122. free(pool);
    123. pool = NULL;
    124. return 0;
    125. }
    126. void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
    127. {
    128. pthread_mutex_lock(&pool->mutexPool);
    129. while (pool->queueSize==pool->queueCapacity && !pool->shutdown)
    130. {
    131. // 阻塞生产者线程
    132. pthread_cond_wait(&pool->notFull, &pool->mutexPool);
    133. }
    134. if (pool->shutdown)
    135. {
    136. pthread_mutex_unlock(&pool->mutexPool);
    137. return;
    138. }
    139. // 添加任务
    140. pool->taskQ[pool->queueRear].function = func;
    141. pool->taskQ[pool->queueRear].arg = arg;
    142. pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    143. pool->queueSize++;
    144. pthread_cond_signal(&pool->notEmpty);
    145. pthread_mutex_unlock(&pool->mutexPool);
    146. }
    147. int threadPoolBusyNum(ThreadPool* pool)
    148. {
    149. pthread_mutex_lock(&pool->mutexBusy);
    150. int busyNum = pool->busyNum;
    151. pthread_mutex_unlock(&pool->mutexBusy);
    152. return busyNum;
    153. }
    154. int threadPoolAliveNum(ThreadPool* pool)
    155. {
    156. pthread_mutex_lock(&pool->mutexPool);
    157. int aliveNum = pool->liveNum;
    158. pthread_mutex_unlock(&pool->mutexPool);
    159. return aliveNum;
    160. }
    161. void* worker(void* arg)
    162. {
    163. ThreadPool* pool = (ThreadPool*)arg;
    164. while (1)
    165. {
    166. pthread_mutex_lock(&pool->mutexPool);
    167. // 当前任务队列是否为空
    168. while (pool->queueSize == 0 && !pool->shutdown)
    169. {
    170. // 阻塞工作线程
    171. pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
    172. // 判断是不是要销毁线程
    173. if (pool->exitNum > 0)
    174. {
    175. pool->exitNum--;
    176. if (pool->liveNum > pool->minNum)
    177. {
    178. pool->liveNum--;
    179. pthread_mutex_unlock(&pool->mutexPool);
    180. threadExit(pool);
    181. }
    182. }
    183. }
    184. // 判断线程池是否被关闭了
    185. if (pool->shutdown)
    186. {
    187. pthread_mutex_unlock(&pool->mutexPool);
    188. threadExit(pool);
    189. }
    190. // 从任务队列中取出一个任务
    191. Task task;
    192. task.function = pool->taskQ[pool->queueFront].function;
    193. task.arg = pool->taskQ[pool->queueFront].arg;
    194. // 移动头节点
    195. pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
    196. pool->queueSize--;
    197. // 解锁
    198. pthread_cond_signal(&pool->notFull);
    199. pthread_mutex_unlock(&pool->mutexPool);
    200. printf("thread %ld start working ... \n", pthread_self());
    201. pthread_mutex_lock(&pool->mutexBusy);
    202. pool->busyNum++;
    203. pthread_mutex_unlock(&pool->mutexBusy);
    204. task.function(task.arg);
    205. free(task.arg);
    206. task.arg = NULL;
    207. // 结束工作
    208. printf("thread %ld end working ... \n", pthread_self());
    209. pthread_mutex_lock(&pool->mutexBusy);
    210. pool->busyNum--;
    211. pthread_mutex_unlock(&pool->mutexBusy);
    212. }
    213. return NULL;
    214. }
    215. void* manager(void* arg)
    216. {
    217. ThreadPool* pool = (ThreadPool*)arg;
    218. while (!pool->shutdown)
    219. {
    220. // 每隔3s检测一次
    221. sleep(3);
    222. // 取出线程池中任务的数量和当前线程的数量
    223. pthread_mutex_lock(&pool->mutexPool);
    224. int queueSize = pool->queueSize;
    225. int liveNum = pool->liveNum;
    226. pthread_mutex_unlock(&pool->mutexPool);
    227. // 取出忙的线程的数量
    228. pthread_mutex_lock(&pool->mutexBusy);
    229. int busyNum = pool->busyNum;
    230. pthread_mutex_unlock(&pool->mutexBusy);
    231. // 添加线程
    232. // 当前任务的个数>穿暖和的线程个数 && 存活的线程数<最大线程数
    233. if (queueSize > liveNum && liveNum < pool->maxNum)
    234. {
    235. pthread_mutex_lock(&pool->mutexPool);
    236. int counter = 0;
    237. for (int i = 0; i < pool->maxNum && counterliveNummaxNum; i++)
    238. {
    239. if (pool->threadIDs[i] == 0) {
    240. pthread_create(&pool->threadIDs[i], NULL, worker, pool);
    241. counter++;
    242. pool->liveNum++;
    243. }
    244. }
    245. pthread_mutex_unlock(&pool->mutexPool);
    246. }
    247. // 销毁线程
    248. // 忙的线程*2 < 存活的线程数 && 存活的线程 > 最小线程数
    249. if (busyNum*2 < liveNum && liveNum > pool->minNum)
    250. {
    251. pthread_mutex_lock(&pool->mutexPool);
    252. pool->exitNum = NUMBER;
    253. pthread_mutex_unlock(&pool->mutexPool);
    254. // 让工作的线程自杀
    255. for (int i = 0; i < NUMBER; i++)
    256. {
    257. pthread_cond_signal(&pool->notEmpty);
    258. }
    259. }
    260. }
    261. return nullptr;
    262. }
    263. void threadExit(ThreadPool* pool)
    264. {
    265. pthread_t tid = pthread_self();
    266. for (int i = 0; i < pool->maxNum; i++)
    267. {
    268. if (pool->threadIDs[i] == tid)
    269. {
    270. pool->threadIDs[i] = 0;
    271. printf("threadExit called, %ld exiting ...\n", tid);
    272. break;
    273. }
    274. }
    275. pthread_exit(NULL);
    276. }

  • 相关阅读:
    浅谈如何更好的进行需求评审
    网络安全(黑客)自学
    Chrome使用本地修改过的js替换原js内容
    linux c++ 开发 - 05- 使用CMake创建一个动态库
    VMware Ubuntu 关闭自动更新
    【无标题】
    web前端期末大作业:美食文化网页设计与实现——美食餐厅三级(HTML+CSS+JavaScript)
    决策树 #数据挖掘 #Python
    记一个带批注、表头样式的导入导出excel方法(基于easyexcel)
    Linux常用基础命令三
  • 原文地址:https://blog.csdn.net/sono_io/article/details/133951746