• 【操作系统-线程池】Pthread 线程池的设计及实现


    Lab Week 15 实验报告

    实验内容:设计实现一个线程池 (Thread Pool)

    • 使用 Pthread API 管理线程。
    • 利用回调函数作为线程的启动函数。
    • 采用 pthread 信号量解决线程池分配的互斥问题。
    • 讨论上述方案的技术可行性,尝试写一个设计报告。
    • 在一个源代码文件内实现编码、完成编译、运行和用例测试。

    I.线程池

    线程池的提出,主要是为了解决

    (1)频繁地为某一任务创建和销毁线程所引起的系统资源耗费问题;

    (2)无限制地为并发请求创建线程且系统对线程数量没有限制而引起的系统资源耗费问题;

    线程池的主要思想是:在进程一开始启动时即创建一定数量空闲的线程,加入到线程池中等待工作。当任务传给线程池的时候,线程池会唤醒一个线程来执行这个任务,一旦线程完成任务,就会回到线程池中等待新的任务到来,如果线程池中没有空闲的线程,那么会等到有空闲线程为止。

    线程池具有以下优点:

    1. 用现有线程服务请求比等待创建一个线程更快。
    2. 线程池限制了任何时候可用线程的数量。这对那些不能支持大量并发线程的系统非常重要。
    3. 将要执行任务从创建任务的机制中分离出来,允许我们采用不同策略运行任务。例如,任务可以被安排在某一个时间延迟后执行,或定期执行。

    II.线程池设计

    A. 线程池组成

    (1)线程管理函数(ThreadPool):用于创建线程池、销毁线程池、添加新任务;

    (2)任务队列(TaskQueue):用于管理任务的队列(FIFO),将新的任务加到队列尾部,即将执行的任务在队列头部;

    (3)工作线程(Workers):线程池中的线程,在任务到来时分配给工作线程,在没有任务时等待;

    注意:在任务队列的部分,需要保证一个任务只能被一个线程取走,所以需要引入pthread 信号量以解决线程池分配的互斥问题

    image-20220525153358375

    B. 设计思路:

    (1)在主函数中调用线程池初始化函数,同时线程池中的线程执行线程函数,随后将任务添加到任务队列中;

    (2)在线程函数中,每个线程对当前任务队列的状态进行判断,若不空,则取出任务执行,若为空则等待;

    (3)待任务添加完且线程执行完所有任务后,对线程进行回收并销毁线程池及其他动态创建的堆栈和信号量

    (4)程序结束;

    C.实现代码:

    代码模块:

    以下对线程池的实现根据不同功能相对应的模块进行说明:

    任务结构和线程池结构

    typedef struct
    {
        void*(*func)(void*);
        void*args;
    }threadpool_task;
    
    struct threadpool
    {
        threadpool_task* task_queue; // 任务队列
        sem_t unnamded_sem; //匿名信号量
        int front;
        int rear;
        int queue_size; //队列大小
        int thread_num;
        int count; //等待任务数
        pthread_t* ptid; //线程tid表
        int tasknum; //总的任务数
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    代码说明:

    这部分定义了线程需要完成的任务和线程池的数据结构;

    在任务结构中,定义了该任务对应的回调函数和相应的参数;

    在线程池结构中,定义了任务队列和相应的控制变量、与任务相关的变量、解决线程池分配的互斥问题的匿名信号量;

    线程池初始化函数

    threadpool* threadpool_create(int thread_num, int queue_size, int task_num){
      /*
      	param: thread_num 线程数
      	param: queue_size 队列大小
      	param: task_num 任务数
      
      */
        int ret, i;
        if(thread_num > MAX_THREADS || queue_size > MAX_QUEUE){
            return NULL;
        }
    
        threadpool* pool;
        if((pool = (threadpool*)malloc(sizeof(threadpool))) == NULL){
            perror("threadpool:malloc()");
        }
    
        //Initialize thread pool
        pool->front = pool->rear = 0;
        pool->queue_size = queue_size;
        pool->thread_num = 0; 
        pool->tasknum = task_num;
      
        // Allocate thread and task queue
        pool->task_queue = (threadpool_task*)malloc(sizeof(threadpool_task)*queue_size);
        pool->ptid = (pthread_t*) malloc(sizeof(pthread_t)*thread_num);
    
        //Initialize unnamed semaphore
        ret = sem_init(&pool->unnamded_sem, 0, 1);
        if(ret == -1) {
            perror("sem_init()");
        }
        
        //Create threads
        for(i = 0; i<thread_num; i++){
            ret = pthread_create(&(pool->ptid[i]), NULL, &threadpool_exec, (void*)pool);
            if(ret != 0) {
                perror("producer pthread_create()");
                break;
            }
            pool->thread_num++; // 进入线程池的线程数加1
        }
         
        return pool;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    代码说明:

    这部分对线程池进行初始化,其中包括:

    • 用malloc进行动态分配:线程池,任务队列、线程号表;

    • 对匿名信号量进行初始化,sem_init(&pool->unnamded_sem, 0, 1) ,将信号量初始化为1,并限制在当前进程使用;

    • pthread_create(&(pool->ptid[i]), NULL, &threadpool_exec, (void*)pool) 创建线程,并调用函数threadpool_exec() ,传递的参数为线程池指针;

    任务添加函数

    void threadpool_add(threadpool * pool, void*(*func)(void*), void*args){
        /*
      	param: pool 线程池
      	param: func 任务函数
      	param: args 函数参数
      
      */
        while((pool->rear + 1) % pool->queue_size == pool->front){ // 当任务队列满的时候
            printf("Task queue is full, task No.%d waitting for handling\n", *(int*)args);
            sleep(1);
        }
    
        pool->task_queue[pool->rear].func = func;
        pool->task_queue[pool->rear].args = args;
        pool->rear = (pool->rear + 1) % pool->queue_size;
        pool->count += 1; //任务队列中的任务数加1
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    代码说明:

    这部分主要负责向任务队列中添加函数,通过该函数传进来的任务函数和相应的参数,对线程池中的任务队列添加任务,并对控制变量进行修改;

    线程执行函数

    static void *threadpool_exec(void *arg){
        threadpool* pool = (threadpool*)arg;
        threadpool_task task;
        while(1){
            while(pool->front == pool->rear){ //当队列为空时
                if(!pool->tasknum){ // 如果所有任务都已经处理完毕,则退出线程
                    printf("Thread %ld exits\n", gettid());
                    pthread_exit(NULL);
                }
                printf("Thread %ld is waitting for new task\n", gettid()); //否则打印出线程等待的信息
                sleep(1);
            }
            if(pool->tasknum == 0){
                break;
            }
            //利用信号量保证一个线程只能处理一个任务
            sem_wait(&pool->unnamded_sem); //获得信号量
    
            //线程获得任务
            task.func = pool->task_queue[pool->front].func;
            task.args = pool->task_queue[pool->front].args;
            pool->front = (pool->front + 1) % pool->queue_size; 
            pool->count -= 1; //任务队列中的任务数减1
            pool->tasknum -= 1; // 总任务数减1
            sem_post(&pool->unnamded_sem); //释放信号量
    
            // Go to Work!!
            (*(task.func))(task.args);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    代码说明:

    这部分代码为线程调用的函数,其中包括对线程池分配的互斥问题、线程获取任务、等待任务的处理;

    • 当队列为空时,如果所有任务执行完毕,则让线程退出,反之则打印出等待信息

    • 为了让一个线程只处理一个任务,这里用了Pthread信号量来解决线程池分配的互斥问题,通过 sem_wait(&pool->unnamded_sem)来获取信号量,随后从任务队列中获取任务,修改控制变量后,执行sem_post(&pool->unnamded_sem) 释放信号量,之后,再利用

      (*(task.func))(task.args)来调用任务函数;

    任务函数

    void* taskfun(void* arg){
        int* temp = (int*) arg;
        int task_sn = *temp;
        printf("Thread %ld is working on task No.%d\n", gettid(), task_sn);
        printf("---- Thread %ld working last for 2s -----\n", gettid());
        sleep(2);
        printf("\t\t  Task No.%d finished\n", task_sn);
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    代码说明:

    这部分为任务具体需要实现的内容,在这里,打印出线程号和它所处理的任务的序列号(以函数形参方式传入),随后sleep 2s,这样能够模拟线程在调用该任务函数时处理的过程,最后打印出结束标志;

    线程池销毁函数

    void threadpool_destroy(threadpool* pool){
        free(pool->task_queue);
        free(pool->ptid);
        sem_destroy(&pool->unnamded_sem);
        free(pool);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    代码说明:

    这部分对动态创建的线程池、线程号表、线程池还有匿名信号量进行销毁。

    主函数

    int main(){
        int i, ret;
        int thread_num, queue_size, task_num;
        thread_num = MAX_THREADS;
        queue_size = MAX_QUEUE;
        task_num = MAX_TASKS;
        printf("------Thread pool parameters-------\n");
        printf("Number of thread: %d, Size of task queue: %d, Number of task: %d\n", thread_num, queue_size, task_num);
        sleep(1);
        
        threadpool* pool = threadpool_create(thread_num, queue_size, task_num);
        if(pool == NULL){
            perror("threadpool_create()");
        }
        sleep(2);  //进入休眠,同时测试线程是否会等待任务队列添加任务
    
        for(i=0; i < task_num; ++i){
            printf("Adding task No.%d\n",i);
            threadpool_add(pool, &taskfun, &i); //添加任务
            sleep(1);
        }
    
    
        for(int i=0;i<thread_num;i++){
            pthread_join(pool->ptid[i], NULL);
        }
        printf("Tasks Completed, starting destroy\n");
        threadpool_destroy(pool);
        printf("\n\tExit Successfully\n");
        exit(EXIT_SUCCESS);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    代码说明:

    这部分为代码的主函数部分,在一开始调用threadpool_create(thread_num, queue_size, task_num)对线程池进行初始化,创建相应大小的任务队列、线程数和任务数,此后休眠2s,这里为了测试线程是否会等待任务队列添加任务;

    然后通过循环调用 threadpool_add(pool, &taskfun, &i)来向任务队列中添加任务,每隔1s添加一次任务;

    完整代码:

    #include<pthread.h>
    #include<stdio.h>
    #include<stdlib.h>
    #include<sys/syscall.h>
    #include<semaphore.h>
    #include<unistd.h>
    
    #define gettid() syscall(__NR_gettid)
    #define MAX_THREADS 10
    #define MAX_QUEUE 12
    #define MAX_TASKS 20
    typedef struct threadpool threadpool;
    threadpool * threadpool_create(int thread_num, int queue_size, int task_num);
    void threadpool_add(threadpool * pool, void*(*func)(void*), void*args);
    static void *threadpool_exec(void *threadpool);
    
    
    typedef struct
    {
        void*(*func)(void*);
        void*args;
    }threadpool_task;
    
    struct threadpool
    {
        threadpool_task* task_queue;
        sem_t unnamded_sem;
        int front;
        int rear;
        int queue_size;
        int thread_num;
        int count; //等待任务数
        pthread_t* ptid;
        int tasknum; //总的任务数
    };
    
    threadpool* threadpool_create(int thread_num, int queue_size, int task_num){
        int ret, i;
        if(thread_num > MAX_THREADS || queue_size > MAX_QUEUE){
            return NULL;
        }
    
        threadpool* pool;
        if((pool = (threadpool*)malloc(sizeof(threadpool))) == NULL){
            perror("threadpool:malloc()");
        }
    
        //Initialize thread pool
        pool->front = pool->rear = 0;
        pool->queue_size = queue_size;
        pool->thread_num = 0;
        pool->tasknum = task_num;
        // Allocate thread and task queue
        pool->task_queue = (threadpool_task*)malloc(sizeof(threadpool_task)*queue_size);
        pool->ptid = (pthread_t*) malloc(sizeof(pthread_t)*thread_num);
    
        //Initialize unnamed semaphore
        ret = sem_init(&pool->unnamded_sem, 0, 1);
        if(ret == -1) {
            perror("sem_init()");
        }
        
        //Create threads
        for(i = 0; i<thread_num; i++){
            ret = pthread_create(&(pool->ptid[i]), NULL, &threadpool_exec, (void*)pool);
            if(ret != 0) {
                perror("producer pthread_create()");
                break;
            }
            pool->thread_num++;
        }
         
        return pool;
    }
    
    static void *threadpool_exec(void *arg){
        threadpool* pool = (threadpool*)arg;
        threadpool_task task;
        while(1){
            // sem_wait(&pool->unnamded_sem);
            while(pool->front == pool->rear){
                if(!pool->tasknum){
                    printf("Thread %ld exits\n", gettid());
                    pthread_exit(NULL);
                }
                printf("Thread %ld is waitting for new task\n", gettid());
                sleep(1);
            }
            if(pool->tasknum == 0){
                break;
            }
            //利用信号量保证一个线程只能处理一个任务
            sem_wait(&pool->unnamded_sem);
    
            //线程获得任务
            task.func = pool->task_queue[pool->front].func;
            task.args = pool->task_queue[pool->front].args;
            pool->front = (pool->front + 1) % pool->queue_size;
            pool->count -= 1;
            pool->tasknum -= 1;
            sem_post(&pool->unnamded_sem);
    
            // Go to Work!!
            (*(task.func))(task.args);
        }
    }
    
    void* taskfun(void* arg){
        int* temp = (int*) arg;
        int task_sn = *temp;
        printf("Thread %ld is working on task No.%d\n", gettid(), task_sn);
        printf("---- Thread %ld working last for 2s -----\n", gettid());
        sleep(2);
        printf("\t\t  Task No.%d finished\n", task_sn);
        return 0;
    }
    
    void threadpool_add(threadpool * pool, void*(*func)(void*), void*args){
        while((pool->rear + 1) % pool->queue_size == pool->front){ // 当任务队列满的时候
            printf("Task queue is full, task No.%d waitting for handling\n", *(int*)args);
            sleep(1);
        }
    
        pool->task_queue[pool->rear].func = func;
        pool->task_queue[pool->rear].args = args;
        pool->rear = (pool->rear + 1) % pool->queue_size;
        pool->count += 1;
    
    }
    
    
    void threadpool_destroy(threadpool* pool){
        free(pool->task_queue);
        free(pool->ptid);
        sem_destroy(&pool->unnamded_sem);
    }
    
    int main(){
        int i, ret;
        int thread_num, queue_size, task_num;
        thread_num = MAX_THREADS;
        queue_size = MAX_QUEUE;
        task_num = MAX_TASKS;
        printf("------Thread pool parameters-------\n");
        printf("Number of thread: %d, Size of task queue: %d, Number of task: %d\n", thread_num, queue_size, task_num);
        sleep(1);
        
        threadpool* pool = threadpool_create(thread_num, queue_size, task_num);
        if(pool == NULL){
            perror("threadpool_create()");
        }
        sleep(2);  //进入休眠,同时测试线程是否会等待任务队列添加任务
    
        for(i=0; i < task_num; ++i){
            printf("Adding task No.%d\n",i);
            threadpool_add(pool, &taskfun, &i); //添加任务
            sleep(1);
        }
    
    
        for(int i=0;i<thread_num;i++){
            pthread_join(pool->ptid[i], NULL);
        }
        printf("Tasks Completed, starting destroy\n");
        threadpool_destroy(pool);
        printf("\n\tExit Successfully\n");
        exit(EXIT_SUCCESS);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168

    D.实验结果:

    初始阶段:

    在线程池中初始化10个线程,任务队列的大小为12,需要完成的总任务数为20;

    image-20220525130537822

    在一开始,由于是先对线程池进行初始化,在sleep 2s之后才向任务队列中添加任务,红框的部分为线程池中的线程在等待任务时打印出来的信息,如:Thread 28389 is waitting for new task,表示线程号为28389的线程正在线程池中等待系统分配给它任务;

    工作阶段:

    此时开始向任务队列中添加任务,任务以序列号标识,No.0表示序列号为0的任务:

    image-20220525130630138

    在添加完任务No.0之后,线程号为28394的线程首先接手这个任务,完成这个任务需要费时2s,由于任务1s才添加一个,所以在这期间线程需要等待,随后任务No.1进入任务队列,线程号为28392的线程接手这个任务,之后任务No.0被线程28394完成,它又重新回到线程池中等待任务,随后任务No.2进入任务队列,线程号为28397的线程接手这个任务,任务No.1被线程28392完成;

    image-20220525130706962

    注意到,在添加完任务No.6之后,线程号为28394的线程接手这个任务,该线程在之前曾接手过任务No.0;任务No.5也相继完成。

    在添加完No.7的任务之后,线程号为28398的线程接手这个任务;

    image-20220525130735324

    在添加完任务No.17之后,线程号为28398的线程接手这个任务,该线程在之前曾接手过任务No.7;

    线程28398执行完任务No.19之后,所有任务执行完毕,所有线程开始退出,最后调用销毁函数,线程池销毁,程序结束。

  • 相关阅读:
    TDengine3.0 基础操作
    用go封装一下封禁功能
    JAVA操作Excel(POI、easyPOI、easyExcel)
    将kali系统放在U盘中&插入电脑直接进入kali系统
    uniapp在App端如何动态修改原生导航栏?
    GitHub访问慢解决办法
    工厂生产管理MES系统的数据来源,你知道是哪里吗?
    Loss损失函数
    【微软漏洞分析】MS15-023 Win32k 特权提升漏洞 - CVE-2015-0078 + 绕过(CVE-2015-2527 in MS15-097)
    SQL查询面试题,会这些基本够用了
  • 原文地址:https://blog.csdn.net/m0_52387305/article/details/124967321