• Linux开发之线程池详解


    Linux开发之线程池

    1.什么是线程池

    线程池就是首先创建一些线程,它们的集合称为线程池。

    使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。

    2.为什么要引入线程池

    传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态,这笔开销将是不可忽略的。

    目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。

    3.线程池的特点

    经典生产者消费者模型

    线程池中有若干等待的线程

    线程池中的线程用于执行大量的相对短暂的任务
    在这里插入图片描述

    4.线程池工作流程图

    在这里插入图片描述

    具体唤醒哪个线程使用内核决定的,因为每个线程的优先级不一样以及内核的调度策略不一样

    5.线程池对象

    typedef struct{
        pthread_mutex_t pool_lock;  //互斥锁
        pthread_cond_t cond;        //条件变量
        
        worker *queuehead;       //队列头  利用了队列先进先出的特点
        int cur_queue_cont;      //当前队列中成员个数
        pthread_t *pthread_num;  //线程号
        int max_thread;          //创建线程的个数
        int shutdown;            //线程停止标准
    }pool;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    typedef struct work{
        void *(*process)(void* arg); //执行任务的函数指针
        void *arg;                   //传递的参数
        struct work *next;           //指向队列的下一个成员
    }worker;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    6.线程的同步机制

    线程的同步是指线程的执行顺序已经提前规划好了,常用的实现线程同步的机制有无名信号量和条件变量。线程的同步机制常用来解决生产者和消费者模型。

    线程是使用第三方库函数,在使用线程的函数的时候需要先安装线程函数的man手册

    sudo apt-get install manpages-posix-dev
    
    • 1

    6.1 条件变量相关API

    1.定义条件变量
        pthread_cond_t cond;
        pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 
    
    2.初始化条件变量
       int pthread_cond_init(pthread_cond_t * cond,const pthread_condattr_t * attr);
       功能:初始化条件变量
       参数:
           @cond:条件变量的指针
           @attr:NULL
       返回值:成功返回0,失败返回错误码
               
     3.获取条件变量
       int pthread_cond_wait(pthread_cond_t * cond,pthread_mutex_t * mutex);
       功能:阻塞等待条件
       参数:
           @cond:条件变量的指针
           @mutex:互斥锁
     返回值:成功返回0,失败返回错误码
      所有流程:
            1.先获取互斥锁
            2.调用pthread_cond_wait
             2.1将当前的线程放到队列中
             2.2解锁
             2.3在队列中休眠
             2.4重新获取锁
             2.5从队列中移除
            3.执行你的代码
            4.解锁
       深度解析:   
           阻塞等待条件满足后被唤醒
           如果条件不满足,释放互斥锁 pthread_mutex_unlock(&mutex)
           当被唤醒时,解除阻塞,重新申请互斥锁 pthread_mutex_lock(&mutex)  
               
       4.释放条件变量
         int pthread_cond_signal(pthread_cond_t *cond);
      功能:唤醒条件变量上的一个线程
         参数:
             @cond:条件变量的指针
         返回值:成功返回0,失败返回错误码
         int pthread_cond_broadcast(pthread_cond_t *cond);
      功能:唤醒条件变量上的所有线程
         参数:
             @cond:条件变量的指针
         返回值:成功返回0,失败返回错误码
                 
     5.销毁条件变量
        int pthread_cond_destroy(pthread_cond_t *cond);
     功能:销毁条件变量
        参数:
             @cond:条件变量的指针
         返回值:成功返回0,失败返回错误码
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    6.2 无名信号量相关API

    #include 
    1.定义无名信号量
        sem_t sem;
    2.初始化无名信号量
       int sem_init(sem_t *sem, int pshared, unsigned int value);
       功能:初始化无名信号量
       参数:
           @sem:无名信号量的指针
           @pshared: 0:两个线程的同步
                   1:两个进程的同步
        @value:信号量的初始值
                如果value非0,表示线程可以获取信号量
                如果value为0,表示线程获取不到信号量,将阻塞
     返回值:成功返回0,失败返回-1置位错误码
    3.获取信号量(P操作)
         int sem_wait(sem_t *sem);
      功能:获取信号量(如果获取不到就阻塞)
         参数:
            @sem:无名信号量的指针
      返回值:成功返回0,如果失败信号量的值没有改变,失败返回-1置位错误码
    4.释放信号量(V操作)
          int sem_post(sem_t *sem)
          功能:释放信号量
          参数:
            @sem:无名信号量的指针
      返回值:成功返回0,如果失败信号量的值没有改变,失败返回-1置位错误码      
    5.销毁信号量
          int sem_destroy(sem_t *sem);
       功能:销毁信号量
          参数:
            @sem:无名信号量的指针
       返回值:成功返回0,失败返回-1置位错误码   
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    7.线程的互斥机制

    线程的互斥机制实现的时候是资源争抢的过程,当一个线程抢占资源成功之后,另外一个线程就回去不到资源(休眠等)

    7.1互斥锁相关API

    1.定义线程互斥锁的变量
        pthread_mutex_t mutex;
     
    2.初始化线程的互斥锁
    //静态初始化
     pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    //动态初始化
    int pthread_mutex_init(pthread_mutex_t * mutex,
               const pthread_mutexattr_t * attr);
    功能:初始化线程的互斥锁
    参数:
        @mutex:被初始化的锁
     @attr:缺省属性,一般填写为NULL
    返回值:成功返回0,失败返回错误码
    
    3.上锁
    int pthread_mutex_lock(pthread_mutex_t *mutex);
    功能:上锁(如果锁之前没有被获取过,获取锁成功,
           如果锁之前被获取,阻塞等待锁的资源)
    参数:
        @mutex:互斥锁的变量的地址
    返回值:成功返回0,失败返回错误码
            
    int pthread_mutex_trylock(pthread_mutex_t *mutex);
    功能:尝试上锁(如果锁之前没有被获取过,获取锁成功,
           如果锁之前被获取,这个函数立即返回,表示获取锁失败)
    参数:
        @mutex:互斥锁的变量的地址
    返回值:成功返回0,失败返回错误码
            
    4.解锁
    int pthread_mutex_unlock(pthread_mutex_t *mutex);
    功能:解锁
    参数:
        @mutex:互斥锁的变量的地址
    返回值:成功返回0,失败返回错误码
        
    5.销毁锁
    int pthread_mutex_destroy(pthread_mutex_t *mutex);
    功能:销毁互斥锁
    参数:
        @mutex:互斥锁变量的地址
    返回值:成功返回0,失败返回错误码
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    8.条件变量与无名信号量的使用场景

    无名信号量适合在线程数比较少的线程中实现同步过程,而条件变量适合在大量线程实现同步过程

    9.线程池代码实例

    方便看到现象,代码中加入了sleep

    #include 
    #include 
    #include 
    #include 
    
    typedef struct work{
    	void * (*process)(void * arg);
    	void * args;
    	struct work * next;
    }worker;
    
    typedef struct{
    	pthread_mutex_t pool_lock;
    	pthread_cond_t cond;
    
    	worker * queuehead;
    	int cur_queue_cont;
    	pthread_t * pthread_num;
    	int max_thread;
    	int shutdown;
    }pool;
    
    pool *mypool;
    
    void * thread_routine(void *args)
    {
    	pthread_t *tid = (pthread_t *)args;
    	printf("start thread id = %ld.\n",*tid);
    	while(1){
    		pthread_mutex_lock(&mypool->pool_lock);
    		if(mypool->cur_queue_cont == 0 && !mypool->shutdown){
    			pthread_cond_wait(&mypool->cond,&mypool->pool_lock);
    		}
    		if(mypool->shutdown){
    			pthread_mutex_unlock(&mypool->pool_lock);
    			pthread_exit(NULL);
    		}
    
    		mypool->cur_queue_cont --;
    		worker * work = mypool->queuehead;
    		mypool->queuehead = work->next;
    		pthread_mutex_unlock(&mypool->pool_lock);
    		
    		work->process((void *)work->args);
    
    	}
    
    }
    void * invok_process(void * arg)
    {
    	sleep(2);
    	printf("this is thread invok %d affair.\n",*(int *)arg);
    }
    
    void create_poll(int num)
    {
    	int i,ret;
    	mypool = (pool *)malloc(sizeof(*mypool));
    	if(mypool == NULL){
    		printf("malloc pool memory is fail.\n");
    		return ;
    	}
    	pthread_mutex_init(&mypool->pool_lock,NULL);
    	pthread_cond_init(&mypool->cond,NULL);
    
    	mypool->queuehead = NULL;
    	mypool->shutdown = 0;
    	mypool->max_thread = num;
    
    	mypool->pthread_num = (pthread_t *)malloc(num*sizeof(pthread_t));
    
    	for(i=0; i<num; i++)
    	{
    		ret = pthread_create(&(mypool->pthread_num[i]),NULL,thread_routine,(void *)&(mypool->pthread_num[i]));
    		if(ret != 0)
    		{
    			printf("create thread is fail.\n");
    		}
    	}
    
    }
    
    void add_work(void * (*process)(void *),void *args)
    {
    	worker *work = (worker *)malloc(sizeof(worker));
    	work->process = process;
    	work->args = args;
    	work->next = NULL;
    	pthread_mutex_lock(&mypool->pool_lock);
    	worker * pwork = mypool->queuehead;
    	if(pwork != NULL){
    		while(pwork->next != NULL)
    			pwork = pwork->next;
    		pwork->next = work;
    	}else{
    		mypool->queuehead = work;
    	}
    	mypool->cur_queue_cont ++;
    	pthread_mutex_unlock(&mypool->pool_lock);
    	pthread_cond_signal(&mypool->cond);
    
    }
    
    void destory_pool()
    {
    	int i;
    
    	mypool->shutdown = 1;
    	
    	pthread_cond_broadcast(&mypool->cond);
    
    	for(i=0; i<mypool->max_thread; i++){
    		pthread_join(mypool->pthread_num[i],NULL);
    	}
    	free(mypool->pthread_num);
    	
    	worker * tmp;
    	while(mypool->queuehead){
    		tmp = mypool->queuehead;
    		mypool->queuehead = mypool->queuehead->next;
    		free(tmp);
    	}
    	pthread_mutex_destroy(&mypool->pool_lock);
    	pthread_cond_destroy(&mypool->cond);
    }
    
    int main(int argc, const char *argv[])
    {
    	int i,num[10];
    	
    	create_poll(5);
    
    	sleep(2);
    
    	
    	for(i=0; i<10; i++){
    		num[i] = i;
    		add_work(invok_process,(void *)&num[i]);
    	}
    
    	sleep(3);
    	
    	destory_pool();
    	
    	free(mypool);
    	printf("free resource is ok.\n");
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
  • 相关阅读:
    34、camunda架构
    【数据结构】图论(图的储存方式,图的遍历算法DFS和BFS、图的遍历算法的应用、图的连通性问题)
    SpringBoot 项目实战 ~ 9.数据缓存
    开源低代码平台,JeecgBoot v3.7.0 里程碑版本发布
    做一个答题pk小程序多少钱
    SpringSecurity源码学习三:认证
    智能手表上的音频(二):驱动
    探索前端开发新利器:MFSU
    【web前端】<meta>标签
    【05】Yarn
  • 原文地址:https://blog.csdn.net/l2020429/article/details/127719693