写在前面:本文以Linux2.6.0的内核源码进行讲解,使用x86 32位机讲解。
讲多路复用的原理,那么一定先要讲没有多路复用的弊端。传统的阻塞式,进程一旦io读写就开始阻塞,效率太低,导致整个系统的吞吐量急剧下降。那么,就有人要思考,居然要阻塞,那么我就使用线程去阻塞,那么一个系统的io操作如此之多,要多少个线程才能满足呢?线程是不是会占用资源?当线程多起来后分时复用的CPU需要一直切换上下文,切换上下文是不是在浪费CPU资源呢,会导致大量的时间在切换上下文?
所以就引入了多路复用,把需要io读写的操作叫做一个事件(多个事件,所以叫多路),把这些io事件维护在一个队列中,并且每个事件都绑定上一个回调函数(回调函数负责唤醒当前阻塞的进程),而这些事件的绑定回调函数和维护队列的操作都是交给一个进程来做(所以叫复用)。当底层网卡没有触发读写事件的时候,这个进程就去阻塞等待(让出CPU的使用权),当底层网卡建立好连接后会发出中断信号,此时中断处理函数就会回调事件对应的回调函数,而回调函数就是让进程唤醒,唤醒后,进程会去遍历所有事件,得到那个准备好了的事件,最终把所有事件返回给用户态(所以用户态还需要遍历才能得到准备好的事件)。
考虑到,文字描述比较难懂,特意画了一张流程图帮助读者来理解(尽力了...)
此多路复用只是内核中select的流程,poll和epoll的流程略有区别,poll和epoll后续会有文章来细讲。
如何从用户态进入内核态,执行系统调用的流程,这里就不进行讲解,那么直接讲解sys_select的方法。
其次,在Linux内核中,VFS万物皆文件的思想,一言以蔽之:不管是普通文件,网络、外设设备等等都是抽象成一个文件,而文件在内核中的实现file结构体返回给用户态不安全,所以都是返回的fd下标给用户态。
- // n是事件数量
- // inp是输入事件的位图
- // outp是输出事件的位图
- // exp是不感兴趣事件的位图
- // tvp是计时器
- asmlinkage long
- sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timeval __user *tvp)
- {
- fd_set_bits fds;
- char *bits;
- long timeout;
- int ret, size, max_fdset;
-
- timeout = MAX_SCHEDULE_TIMEOUT;
- if (tvp) {
- time_t sec, usec;
-
- if ((ret = verify_area(VERIFY_READ, tvp, sizeof(*tvp)))
- || (ret = __get_user(sec, &tvp->tv_sec))
- || (ret = __get_user(usec, &tvp->tv_usec)))
- goto out_nofds;
-
- ret = -EINVAL;
- if (sec < 0 || usec < 0)
- goto out_nofds;
-
- if ((unsigned long) sec < MAX_SELECT_SECONDS) {
- timeout = ROUND_UP(usec, 1000000/HZ);
- timeout += sec * (unsigned long) HZ;
- }
- }
-
- ret = -EINVAL;
- if (n < 0)
- goto out_nofds;
-
- /* max_fdset can increase, so grab it once to avoid race */
- max_fdset = current->·files->max_fdset;
- if (n > max_fdset)
- n = max_fdset;
-
- /*
- * We need 6 bitmaps (in/out/ex for both incoming and outgoing),
- * since we used fdset we need to allocate memory in units of
- * long-words.
- */
- ret = -ENOMEM;
-
- // 根据n的数量,算出需要多少字节(也就是多少位)
- size = FDS_BYTES(n);
-
- // 开辟4 * 6的大小,用来存放fd_set_bits fds;
- bits = select_bits_alloc(size);
-
- if (!bits)
- goto out_nofds;
-
- // 推指针
- fds.in = (unsigned long *) bits;
- fds.out = (unsigned long *) (bits + size);
- fds.ex = (unsigned long *) (bits + 2*size);
- fds.res_in = (unsigned long *) (bits + 3*size);
- fds.res_out = (unsigned long *) (bits + 4*size);
- fds.res_ex = (unsigned long *) (bits + 5*size);
-
- // 把用户态的数据拷贝到内核态,因为用户态的数据不可信。
- if ((ret = get_fd_set(n, inp, fds.in)) ||
- (ret = get_fd_set(n, outp, fds.out)) ||
- (ret = get_fd_set(n, exp, fds.ex)))
- goto out;
-
- // 清空返回给用户态的位图
- zero_fd_set(n, fds.res_in);
- zero_fd_set(n, fds.res_out);
- zero_fd_set(n, fds.res_ex);
-
- // 调用do_select执行具体逻辑
- ret = do_select(n, &fds, &timeout);
-
- if (tvp && !(current->personality & STICKY_TIMEOUTS)) {
- time_t sec = 0, usec = 0;
- if (timeout) {
- sec = timeout / HZ;
- usec = timeout % HZ;
- usec *= (1000000/HZ);
- }
- put_user(sec, &tvp->tv_sec);
- put_user(usec, &tvp->tv_usec);
- }
-
- if (ret < 0)
- goto out;
- if (!ret) {
- ret = -ERESTARTNOHAND;
- if (signal_pending(current))
- goto out;
- ret = 0;
- }
-
- // 把内核态返回数据的内容拷贝到用户态.
- set_fd_set(n, inp, fds.res_in);
- set_fd_set(n, outp, fds.res_out);
- set_fd_set(n, exp, fds.res_ex);
-
- out:
- select_bits_free(bits, size);
- out_nofds:
- return ret;
- }
方法参数:
// n是事件数量
// inp是输入事件的位图
// outp是输出事件的位图
// exp是不感兴趣事件的位图(如果是不感兴趣的事件,就会被忽略)
// tvp是计时器fd_set __user:
fd的位图形式,用一个bit位来代表一个fd,这样非常非常节省空间。__user就代表是用户态传来的数据.
这段代码并不复杂,这里根据事件的数量,得出内核要开辟多少个位图大小来存放用户态传过来的数据,大部分的操作都是把用户态的内容拷贝到内核态,再把内核态的数据拷贝到用户态。(为什么要把用户态的数据给拷贝到内核态呢?思考一个问题,这里是传入的指针,指针具体指向的数据在用户态,而用户态可能会修改数据,导致数据不安全,所以,对于内核态来说,用户态的数据不可信,所以就需要进行拷贝)
把用户态的数据进行拷贝以后,进入到do_select方法中进行具体的处理。
- int do_select(int n, fd_set_bits *fds, long *timeout)
- {
- struct poll_wqueues table;
- poll_table *wait;
- int retval, i;
- long __timeout = *timeout;
-
- spin_lock(¤t->files->file_lock);
- retval = max_select_fd(n, fds);
- spin_unlock(¤t->files->file_lock);
-
- if (retval < 0)
- return retval;
- n = retval;
-
- poll_initwait(&table);
- wait = &table.pt;
- if (!__timeout)
- wait = NULL;
- retval = 0;
- for (;;) {
- unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
-
- set_current_state(TASK_INTERRUPTIBLE);
-
- inp = fds->in; outp = fds->out; exp = fds->ex;
- rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
-
- for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
- unsigned long in, out, ex, all_bits, bit = 1, mask, j;
- unsigned long res_in = 0, res_out = 0, res_ex = 0;
- struct file_operations *f_op = NULL;
- struct file *file = NULL;
-
- // 这里是先++ 再*
- // 取数组的下一位。
- in = *inp++; out = *outp++; ex = *exp++;
-
- // 二进制组合,最后还是为0,就代表这三个值都为0
- all_bits = in | out | ex;
- if (all_bits == 0) {
- i += __NFDBITS;
- continue;
- }
-
- for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
-
- // 这个退出条件,代表所有的n都已经遍历完了。
- if (i >= n)
- break;
- if (!(bit & all_bits))
- continue;
-
- // 获取到文件
- file = fget(i);
-
- if (file) {
- f_op = file->f_op;
- mask = DEFAULT_POLLMASK;
- if (f_op && f_op->poll)
- // 这里面会把当前fd和当前current做绑定放入到一个队列中等待
- mask = (*f_op->poll)(file, retval ? NULL : wait);
- fput(file);
-
- // in 必须有bit这一位?
- if ((mask & POLLIN_SET) && (in & bit)) {
- res_in |= bit;
- retval++;
- }
-
- // out 必须有bit这一位?
- if ((mask & POLLOUT_SET) && (out & bit)) {
- res_out |= bit;
- retval++;
- }
-
- // ex 必须有bit这一位?
- if ((mask & POLLEX_SET) && (ex & bit)) {
- res_ex |= bit;
- retval++;
- }
- }
- }
- if (res_in)
- *rinp = res_in;
- if (res_out)
- *routp = res_out;
- if (res_ex)
- *rexp = res_ex;
- }
- // shit
- wait = NULL;
- if (retval || !__timeout || signal_pending(current))
- break;
- if(table.error) {
- retval = table.error;
- break;
- }
- __timeout = schedule_timeout(__timeout);
- }
- __set_current_state(TASK_RUNNING);
-
-
- poll_freewait(&table);
-
- /*
- * Up-to-date the caller timeout.
- */
- *timeout = __timeout;
- return retval;
- }
嗯,代码量确实挺多的,多层for循环嵌套,但是没关系,待笔者细细道来。
首先,我们得先要明白所有逻辑都在do_select方法中了,所以给事件挂回调的处理、队列的处理都会在这里。
void poll_initwait(struct poll_wqueues *pwq)
{
init_poll_funcptr(&pwq->pt, __pollwait);
pwq->error = 0;
pwq->table = NULL;
}函数指针的赋值,把__pollwait函数挂在了poll_wqueues结构体中poll_table pt属性中。
随之进入到for循环中。
第一层for循环:都把select称为轮训式多路复用,这里体现的淋漓尽致,因为死循环的机制+schedule_timeout(__timeout)切换上下文的方式(让出CPU的使用权,直到有中断唤醒)
第二层for循环和第三层for循环:就是来遍历位图,因为位图的一位代表一个fd,所以通过fd获取到file结构体。
拿到file结构体后,VFS虚拟文件+函数指针的魅力又体现出来了,因为通过if (f_op && f_op->poll) 就可以得知当前的文件系统是否实现了poll函数指针。因为对于poll函数指针而言,普通的文件系统是没有实现的,而socket套接字的文件系统是有实现的。而下文会仔细道来。而poll返回的mask标志位是可以判断是否有读写事件,存在读写事件的话,res_in |= bit就把返回的位图对应置位,并且retval++,后续就会break;退出所有循环,回到sys_select中。
所以看到mask = (*f_op->poll)(file, retval ? NULL : wait);具体的函数实现。



我们直接看到tcp的实现,但是此方法不过细讲,我们能明白他内部回调了之前poll_initwait方法挂上的__pollwait回调函数即可。
并且此方法,会返回do_select方法需要的mask标志位。也就是可以理解为,当tcp完成三次握手,建立上连接以后,会根据tcp协议分析出读写标志位(这里网络栈,不过细讲,能明白会建立连接后会得到do_select需要的标志位即可)。
所以看到__pollwait的实现。
- void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p)
- {
- // 骚操作获取到结构体的基址。
- struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt);
-
- //
- struct poll_table_page *table = p->table;
-
- // 队列不存在,或者内部元素为0.
- // 也就是初始化的操作
- if (!table || POLL_TABLE_FULL(table)) {
- struct poll_table_page *new_table;
-
- new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
- if (!new_table) {
- p->error = -ENOMEM;
- __set_current_state(TASK_RUNNING);
- return;
- }
- new_table->entry = new_table->entries;
- new_table->next = table;
- p->table = new_table;
- table = new_table;
- }
-
- /* Add a new entry */
- // 放入到队列。
- {
- struct poll_table_entry * entry = table->entry;
-
- // 意思是poll_table_entry是一个连续的空间,数组?
- table->entry = entry+1;
-
- // 原子性加引用。代表被使用了。
- get_file(filp);
-
- entry->filp = filp;
-
- entry->wait_address = wait_address;
-
- // 初始化wait_queue_t wait;
- // 把对应的进程结构体、回调钩子赋值
- init_waitqueue_entry(&entry->wait, current);
-
- // wait_address这个是sock维护的队列
- // &entry->wait这个是队列中的元素
- // 所以这是放入到队列中
- add_wait_queue(wait_address,&entry->wait);
- }
- }
此方法就是添加到等待队列的具体实现。上面是初始化的操作,下面是添加到等待队列的操作。
看到init_waitqueue_entry方法的实现.
static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
{
q->flags = 0;
q->task = p;
// 继续放钩子
// 被唤醒的钩子。
q->func = default_wake_function;
}这里给等待队列中的元素又挂上了default_wake_function钩子回调函数。
看到add_wait_queue方法的实现.
// wait_queue_head_t *q 是当前tcp的等待队列。
// wait_queue_t * wait 是当前文件对应的等待队列的元素。
void add_wait_queue(wait_queue_head_t *q, wait_queue_t * wait)
{
unsigned long flags;wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
__add_wait_queue(q, wait);
spin_unlock_irqrestore(&q->lock, flags);
}这里把当前文件对应的等待队列的元素链到tcp的等待队列,而等待队列的元素中的func函数指针是挂上了default_wake_function钩子。
等待底层网卡给CPU发出中断信号,CPU响应中断,中断处理方法中去处理等待队列。并且会回调default_wake_function钩子(这里的论证太底层,偏硬件,笔者能力有限,笔者只能给出论据,不能给出论证了....)
- int default_wake_function(wait_queue_t *curr, unsigned mode, int sync)
- {
- task_t *p = curr->task;
- return try_to_wake_up(p, mode, sync);
- }
这里就特别的简单了,把当前进程给唤醒。如何唤醒:把task_struct的state赋值为TASK_RUNNING,然后等待时钟中断的中断处理函数调度此进程。
然后当前进程苏醒过来以后,又回到了do_select方法的schedule_timeout(__timeout); 中,此时又开始了新的一轮轮询,而这轮轮询是通过poll函数指针获取到mask标志位(因为当前中断唤醒都是因为tcp建立了连接通过default_wake_function把进程给唤醒,所以已经有读写事件的产生)然后通过mask判断把对应的位图置位,然后返回给用户态。

最后,如果本帖对您有一定的帮助,希望能点赞+关注+收藏!您的支持是给我最大的动力,后续会一直更新各种框架的使用和框架的源码解读~!