线程池就是首先创建一些线程,它们的集合称为线程池。
使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。
传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态,这笔开销将是不可忽略的。
目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。
经典生产者消费者模型
线程池中有若干等待的线程
线程池中的线程用于执行大量的相对短暂的任务


具体唤醒哪个线程使用内核决定的,因为每个线程的优先级不一样以及内核的调度策略不一样
typedef struct{
pthread_mutex_t pool_lock; //互斥锁
pthread_cond_t cond; //条件变量
worker *queuehead; //队列头 利用了队列先进先出的特点
int cur_queue_cont; //当前队列中成员个数
pthread_t *pthread_num; //线程号
int max_thread; //创建线程的个数
int shutdown; //线程停止标准
}pool;
typedef struct work{
void *(*process)(void* arg); //执行任务的函数指针
void *arg; //传递的参数
struct work *next; //指向队列的下一个成员
}worker;
线程的同步是指线程的执行顺序已经提前规划好了,常用的实现线程同步的机制有无名信号量和条件变量。线程的同步机制常用来解决生产者和消费者模型。
线程是使用第三方库函数,在使用线程的函数的时候需要先安装线程函数的man手册
sudo apt-get install manpages-posix-dev
1.定义条件变量
pthread_cond_t cond;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
2.初始化条件变量
int pthread_cond_init(pthread_cond_t * cond,const pthread_condattr_t * attr);
功能:初始化条件变量
参数:
@cond:条件变量的指针
@attr:NULL
返回值:成功返回0,失败返回错误码
3.获取条件变量
int pthread_cond_wait(pthread_cond_t * cond,pthread_mutex_t * mutex);
功能:阻塞等待条件
参数:
@cond:条件变量的指针
@mutex:互斥锁
返回值:成功返回0,失败返回错误码
所有流程:
1.先获取互斥锁
2.调用pthread_cond_wait
2.1将当前的线程放到队列中
2.2解锁
2.3在队列中休眠
2.4重新获取锁
2.5从队列中移除
3.执行你的代码
4.解锁
深度解析:
阻塞等待条件满足后被唤醒
如果条件不满足,释放互斥锁 pthread_mutex_unlock(&mutex)
当被唤醒时,解除阻塞,重新申请互斥锁 pthread_mutex_lock(&mutex)
4.释放条件变量
int pthread_cond_signal(pthread_cond_t *cond);
功能:唤醒条件变量上的一个线程
参数:
@cond:条件变量的指针
返回值:成功返回0,失败返回错误码
int pthread_cond_broadcast(pthread_cond_t *cond);
功能:唤醒条件变量上的所有线程
参数:
@cond:条件变量的指针
返回值:成功返回0,失败返回错误码
5.销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
功能:销毁条件变量
参数:
@cond:条件变量的指针
返回值:成功返回0,失败返回错误码
#include
1.定义无名信号量
sem_t sem;
2.初始化无名信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
功能:初始化无名信号量
参数:
@sem:无名信号量的指针
@pshared: 0:两个线程的同步
1:两个进程的同步
@value:信号量的初始值
如果value非0,表示线程可以获取信号量
如果value为0,表示线程获取不到信号量,将阻塞
返回值:成功返回0,失败返回-1置位错误码
3.获取信号量(P操作)
int sem_wait(sem_t *sem);
功能:获取信号量(如果获取不到就阻塞)
参数:
@sem:无名信号量的指针
返回值:成功返回0,如果失败信号量的值没有改变,失败返回-1置位错误码
4.释放信号量(V操作)
int sem_post(sem_t *sem)
功能:释放信号量
参数:
@sem:无名信号量的指针
返回值:成功返回0,如果失败信号量的值没有改变,失败返回-1置位错误码
5.销毁信号量
int sem_destroy(sem_t *sem);
功能:销毁信号量
参数:
@sem:无名信号量的指针
返回值:成功返回0,失败返回-1置位错误码
线程的互斥机制实现的时候是资源争抢的过程,当一个线程抢占资源成功之后,另外一个线程就回去不到资源(休眠等)
1.定义线程互斥锁的变量
pthread_mutex_t mutex;
2.初始化线程的互斥锁
//静态初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//动态初始化
int pthread_mutex_init(pthread_mutex_t * mutex,
const pthread_mutexattr_t * attr);
功能:初始化线程的互斥锁
参数:
@mutex:被初始化的锁
@attr:缺省属性,一般填写为NULL
返回值:成功返回0,失败返回错误码
3.上锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
功能:上锁(如果锁之前没有被获取过,获取锁成功,
如果锁之前被获取,阻塞等待锁的资源)
参数:
@mutex:互斥锁的变量的地址
返回值:成功返回0,失败返回错误码
int pthread_mutex_trylock(pthread_mutex_t *mutex);
功能:尝试上锁(如果锁之前没有被获取过,获取锁成功,
如果锁之前被获取,这个函数立即返回,表示获取锁失败)
参数:
@mutex:互斥锁的变量的地址
返回值:成功返回0,失败返回错误码
4.解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);
功能:解锁
参数:
@mutex:互斥锁的变量的地址
返回值:成功返回0,失败返回错误码
5.销毁锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
功能:销毁互斥锁
参数:
@mutex:互斥锁变量的地址
返回值:成功返回0,失败返回错误码
无名信号量适合在线程数比较少的线程中实现同步过程,而条件变量适合在大量线程实现同步过程
方便看到现象,代码中加入了sleep
#include
#include
#include
#include
typedef struct work{
void * (*process)(void * arg);
void * args;
struct work * next;
}worker;
typedef struct{
pthread_mutex_t pool_lock;
pthread_cond_t cond;
worker * queuehead;
int cur_queue_cont;
pthread_t * pthread_num;
int max_thread;
int shutdown;
}pool;
pool *mypool;
void * thread_routine(void *args)
{
pthread_t *tid = (pthread_t *)args;
printf("start thread id = %ld.\n",*tid);
while(1){
pthread_mutex_lock(&mypool->pool_lock);
if(mypool->cur_queue_cont == 0 && !mypool->shutdown){
pthread_cond_wait(&mypool->cond,&mypool->pool_lock);
}
if(mypool->shutdown){
pthread_mutex_unlock(&mypool->pool_lock);
pthread_exit(NULL);
}
mypool->cur_queue_cont --;
worker * work = mypool->queuehead;
mypool->queuehead = work->next;
pthread_mutex_unlock(&mypool->pool_lock);
work->process((void *)work->args);
}
}
void * invok_process(void * arg)
{
sleep(2);
printf("this is thread invok %d affair.\n",*(int *)arg);
}
void create_poll(int num)
{
int i,ret;
mypool = (pool *)malloc(sizeof(*mypool));
if(mypool == NULL){
printf("malloc pool memory is fail.\n");
return ;
}
pthread_mutex_init(&mypool->pool_lock,NULL);
pthread_cond_init(&mypool->cond,NULL);
mypool->queuehead = NULL;
mypool->shutdown = 0;
mypool->max_thread = num;
mypool->pthread_num = (pthread_t *)malloc(num*sizeof(pthread_t));
for(i=0; i<num; i++)
{
ret = pthread_create(&(mypool->pthread_num[i]),NULL,thread_routine,(void *)&(mypool->pthread_num[i]));
if(ret != 0)
{
printf("create thread is fail.\n");
}
}
}
void add_work(void * (*process)(void *),void *args)
{
worker *work = (worker *)malloc(sizeof(worker));
work->process = process;
work->args = args;
work->next = NULL;
pthread_mutex_lock(&mypool->pool_lock);
worker * pwork = mypool->queuehead;
if(pwork != NULL){
while(pwork->next != NULL)
pwork = pwork->next;
pwork->next = work;
}else{
mypool->queuehead = work;
}
mypool->cur_queue_cont ++;
pthread_mutex_unlock(&mypool->pool_lock);
pthread_cond_signal(&mypool->cond);
}
void destory_pool()
{
int i;
mypool->shutdown = 1;
pthread_cond_broadcast(&mypool->cond);
for(i=0; i<mypool->max_thread; i++){
pthread_join(mypool->pthread_num[i],NULL);
}
free(mypool->pthread_num);
worker * tmp;
while(mypool->queuehead){
tmp = mypool->queuehead;
mypool->queuehead = mypool->queuehead->next;
free(tmp);
}
pthread_mutex_destroy(&mypool->pool_lock);
pthread_cond_destroy(&mypool->cond);
}
int main(int argc, const char *argv[])
{
int i,num[10];
create_poll(5);
sleep(2);
for(i=0; i<10; i++){
num[i] = i;
add_work(invok_process,(void *)&num[i]);
}
sleep(3);
destory_pool();
free(mypool);
printf("free resource is ok.\n");
return 0;
}