• 深入理解Linux网络笔记(三):内核和用户进程协作之epoll


    本文为《深入理解Linux网络》学习笔记,使用的Linux源码版本是3.10,网卡驱动默认采用的都是Intel的igb网卡驱动

    Linux源码在线阅读:https://elixir.bootlin.com/linux/v3.10/source

    2、内核是如何与用户进程协作的(二)

    3)、内核和用户进程协作之epoll

    IO多路复用:

    在IO多路复用模型中,会有一个内核线程不断去轮询多个socket的状态,只有当真正读写事件发生时,才真正调用实际的IO读写操作。因为在IO多路复用模型中,只需要使用一个线程就可以管理多个socket,系统不需要建立新的进程或者线程,也不必维护这些线程和进程,并且只有在真正有读写事件进行时,才会使用IO资源,所以它大大减少了资源占用

    IO多路复用机制可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。这里的复用指的是对进程的复用

    在Linux上多路复用方案有select、poll、epoll。它们三个中的epoll的性能表现是最优秀的,能支持的并发量也最大

    epoll的简单示例如下:

    int main() {
        listen(lfd, ...);
        cfd1 = accept(...);
        cfd2 = accept(...);
        efd = epoll_create(...);
    
        epoll_ctl(efd, EPOLL_CTL_ADD, cfd1, ...);
        epoll_ctl(efd, EPOLL_CTL_ADD, cfd2, ...);
        epoll_wait(efd, ...);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    其中和epoll相关的函数是如下三个:

    • epoll_create:创建一个epoll对象
    • epoll_ctl:向epoll对象添加要管理的连接
    • epoll_wait:等待其管理的连接上的IO事件
    1)epoll内核对象的创建

    在用户进程调用epoll_create时,内核会创建一个struct eventpoll内核对象,并把它关联到当前进程的已打开文件列表中,如下图所示:

    对于struct eventpoll对象,更详细的结构如下图所示:

    poll_create源码如下:

    // fs/eventpoll.c
    SYSCALL_DEFINE1(epoll_create1, int, flags)
    {
    	int error, fd;
    	struct eventpoll *ep = NULL;
    	...
    	// 创建一个eventpoll对象
    	error = ep_alloc(&ep);
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    struct eventpoll的定义如下:

    // fs/eventpoll.c
    struct eventpoll {
    	...
    	// sys_epollo_wait用到的等待队列
    	wait_queue_head_t wq;
    	...
    	// 接收就绪的描述符都会放到这里
    	struct list_head rdllist;
    
    	// 每个epollo对象中都有一棵红黑树
    	struct rb_root rbr;
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    eventpoll这个结构体的几个成员的含义如下:

    • wq:等待队列链表。软中断数据就绪的时候会通过wq来找到阻塞在epoll对象上的用户进程
    • rbr:一棵红黑树。为了支持对海量连接的高效查找、插入和删除,eventpoll内部使用了一棵红黑树。通过这棵树来管理用户进程添加进来的所有socket连接
    • rdllist:就绪的描述符的链表。当有连接就绪的时候,内核会把就绪的连接放到rdllist链表里。这样应用进程只需要判断链表就能找出就绪连接,而不用去遍历整棵树

    eventpoll初始化工作在ep_alloc中完成:

    // fs/eventpoll.c
    static int ep_alloc(struct eventpoll **pep)
    {
    	...
    	struct eventpoll *ep;
    	...
    	// 申请eventpoll内存
    	ep = kzalloc(sizeof(*ep), GFP_KERNEL);
    	...
    	// 初始化等待队列头
    	init_waitqueue_head(&ep->wq);
    	init_waitqueue_head(&ep->poll_wait);
    	// 初始化就绪队列
    	INIT_LIST_HEAD(&ep->rdllist);
    	// 初始化红黑树指针
    	ep->rbr = RB_ROOT;
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    2)为epoll添加socket

    理解这一步是理解整个epoll的关键。为了简单起见,这里只考虑使用EPOLL_CTL_ADD添加socket,先忽略删除和更新

    假设现在和客户端的多个连接的socket都创建好了,也创建好了epoll内核对象。在使用epoll_ctl注册每一个socket的时候,内核会做如下三件事情:

    1. 分配一个红黑树节点对象epitem
    2. 将等待事件添加到socket的等待队列中,其回调函数是ep_poll_callback
    3. 将epitem插入epoll对象的红黑树

    通过epoll_ctl添加两个socket以后,这些内核数据结构最终在进程中的关系大致如下图所示:

    来看看socket是如何添加到epoll对象里的,找到epoll_ctl的源码

    // fs/eventpoll.c
    SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
    		struct epoll_event __user *, event)
    {
    	...
    	struct file *file, *tfile;
    	struct eventpoll *ep;
    	...
    	// 根据epfd找到eventpoll内核对象
    	file = fget(epfd);
    	...
    	// 根据socket句柄号,找到其file内核对象
    	tfile = fget(fd);
    	...
    	ep = file->private_data;
    	...
    	switch (op) {
    	case EPOLL_CTL_ADD:
    		if (!epi) {
    			epds.events |= POLLERR | POLLHUP;
    			error = ep_insert(ep, &epds, tfile, fd);
    		} else
    			error = -EEXIST;
    		clear_tfile_check_list();
    		break;
    	...
    	}
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    在epoll_ctl中首先根据传入fd找到eventpoll、socket相关的内核对象。对于EPOLL_CTL_ADD操作来说,会执行到ep_insert函数。所有的注册都是在这个函数中完成的

    // fs/eventpoll.c
    static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
    		     struct file *tfile, int fd)
    {
    	...
    	struct epitem *epi;
    	struct ep_pqueue epq;
    	...
    	// 1.分配并初始化epitem
    	// 分配一个epi对象
    	if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
    		return -ENOMEM;
    	...
    	// 对分配的epi对象进行初始化
    	INIT_LIST_HEAD(&epi->pwqlist);
    	epi->ep = ep;
    	// epi->ffd中存了句柄号和struct file对象地址
    	ep_set_ffd(&epi->ffd, tfile, fd);
    	...
    	// 2.设置socket等待队列
    	// 定义并初始化ep_pqueue对象
    	epq.epi = epi;
    	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    
    	// 调用ep_ptable_queue_proc注册回调函数
    	// 实际注入的函数为ep_poll_callback
    	revents = ep_item_poll(epi, &epq.pt);
    	...
    	// 3.将epi插入eventpoll对象的红黑树中
    	ep_rbtree_insert(ep, epi);
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    分配并初始化epitem

    对于每一个socket,调用epoll_ctl的时候,都会为之分配一个epitem。该结构的主要数据结构如下:

    // fs/eventpoll.c
    struct epitem {
    	// 红黑树节点
    	struct rb_node rbn;
    	...
    	// socket文件描述信息
    	struct epoll_filefd ffd;
    	...
    	// 等待队列
    	struct list_head pwqlist;
    
    	// 所归属的eventpoll对象
    	struct eventpoll *ep;
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    对epitem进行一些初始化,首先在epi->ep = ep;这行代码中将其ep指针指向eventpoll对象。另外用要添加的socket的file、fd来填充epi->ffd。epitem初始化后的关联关系如下图所示:

    其中使用到的ep_set_ffd函数如下:

    // fs/eventpoll.c
    static inline void ep_set_ffd(struct epoll_filefd *ffd,
    			      struct file *file, int fd)
    {
    	ffd->file = file;
    	ffd->fd = fd;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    设置socket等待队列

    在创建epitem并初始化之后,ep_insert中第二件事情就是设置socket对象上的等待任务队列,并把函数fs/eventpoll.c文件下的ep_poll_callback设置为数据就绪时候的回调函数,如下图所示:

    先来看ep_item_poll

    // fs/eventpoll.c
    static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
    {
    	pt->_key = epi->event.events;
    
    	return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这里调用了socket下的file->f_op->poll,这个函数实际上是sock_poll

    // net/socket.c
    static unsigned int sock_poll(struct file *file, poll_table *wait)
    {
    	struct socket *sock;
    	sock = file->private_data;
    	return sock->ops->poll(file, sock, wait);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    sock->ops->poll指向的是tcp_poll

    // net/ipv4/tcp.c
    unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
    {
    	...
    	struct sock *sk = sock->sk;
    	...
    	sock_poll_wait(file, sk_sleep(sk), wait);
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在sock_poll_wait的第二个参数传参前,先调用了sk_sleep函数。在这个函数里它获取了sock对象下的等待队列列表头wait_queue_head_t,稍后等待队列项就插到这里。这里稍微注意下,是socket的等待队列,不是epoll对象的。下面来看sk_sleep源码

    // include/net/sock.h
    static inline wait_queue_head_t *sk_sleep(struct sock *sk)
    {
    	BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
    	return &rcu_dereference_raw(sk->sk_wq)->wait;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    接着真正进入sock_poll_wait

    // include/net/sock.h
    static inline void sock_poll_wait(struct file *filp,
    		wait_queue_head_t *wait_address, poll_table *p)
    {
    	...
    		poll_wait(filp, wait_address, p);
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    // include/linux/poll.h
    static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
    {
    	if (p && p->_qproc && wait_address)
    		p->_qproc(filp, wait_address, p);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这里的qproc是个函数指针,它在前面的init_poll_funcptr调用时设置成了ep_ptable_queue_proc函数,ep_ptable_queue_proc源码如下:

    // fs/eventpoll.c
    static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
    				 poll_table *pt)
    {
    	struct epitem *epi = ep_item_from_epqueue(pt);
    	struct eppoll_entry *pwq;
    
    	if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
    		// 初始化回调方法
    		init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
    		pwq->whead = whead;
    		pwq->base = epi;
    		// 将ep_poll_callback放入socket等待队列whead(注意不是epollo等待队列)
    		add_wait_queue(whead, &pwq->wait);
    		list_add_tail(&pwq->llink, &epi->pwqlist);
    		epi->nwait++;
    	} else {
    		epi->nwait = -1;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在ep_ptable_queue_proc函数中,新建了一个等待队列项,并注册其回调函数为ep_poll_callback函数,然后再将这个等待项添加到socket的等待队列中

    在前面介绍阻塞式的系统调用recvfrom时,由于需要在数据就绪的时候唤醒用户进程,所以等待对象项的private会设置成当前用户进程描述符current。而这里的socket是交给epoll来管理的,不需要在一个socket就绪的时候就唤醒进程,所以这里的q->private没有什么用就设置成了NULL

    // include/linux/wait.h
    static inline void init_waitqueue_func_entry(wait_queue_t *q,
    					wait_queue_func_t func)
    {
    	q->flags = 0;
    	q->private = NULL;
    	q->func = func;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    如上,等待队列项中仅将回调函数q->func设置为ep_poll_callback。后面讲到“数据来了”时,软中断将数据收到socket的接收队列后,会通过注册的这个ep_poll_callback函数来回调,进而通知epoll对象

    插入红黑树

    分配完epitem对象后,紧接着把它插入红黑树。一个插入了一些socket描述符的epoll里的红黑树示意图如下图所示:

    这里使用红黑树是为了让epoll在查找效率、插入效率、内存开销等多个方法比较均衡

    3)epoll_wait之等待接收

    epoll_wait做的事情不复杂,当它被调用时它观察eventpoll->rdllist链表里有没有数据。有数据就返回,没有数据就创建一个等待队列项,将其添加到eventpoll的等待队列上,然后把自己阻塞掉

    其源代码如下:

    // fs/eventpoll.c
    SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
    		int, maxevents, int, timeout)
    {
    	...
    	error = ep_poll(ep, events, maxevents, timeout);
    	...
    }
    
    static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
    		   int maxevents, long timeout)
    {
    	...
    	wait_queue_t wait;
    	...
    fetch_events:
    	spin_lock_irqsave(&ep->lock, flags);
    	// 1.判断就绪队列上有没有事件就绪
    	if (!ep_events_available(ep)) {
    		// 2.定义等待事件关联当前进程
    		init_waitqueue_entry(&wait, current);
    		// 3.把新waitqueue添加到epoll->wq链表
    		__add_wait_queue_exclusive(&ep->wq, &wait);
    
    		for (;;) {
    			// 4.让出CPU,主动进入睡眠状态
    			set_current_state(TASK_INTERRUPTIBLE);
    			if (ep_events_available(ep) || timed_out)
    				break;
    			if (signal_pending(current)) {
    				res = -EINTR;
    				break;
    			}
    
    			spin_unlock_irqrestore(&ep->lock, flags);
    			if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
    				timed_out = 1;
    
    			spin_lock_irqsave(&ep->lock, flags);
    		}
    		__remove_wait_queue(&ep->wq, &wait);
    
    		set_current_state(TASK_RUNNING);
    	}
    ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    判断就绪队列上有没有事件就绪

    首先调用ep_events_available来判断就绪链表中是否有可处理的事件

    // fs/eventpoll.c
    static inline int ep_events_available(struct eventpoll *ep)
    {
    	return !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    定义等待事件关联当前进程

    假设确实没有就绪的连接,那接着会进入init_waitqueue_entry中定义等待任务,并把current(当前进程)添加到waitqueue上

    当没有IO事件的时候,epollo也会阻塞调当前进程,因为没有事情可做了占着CPU也没什么意义。epoll本身是阻塞的,但一般会把socket设置成非阻塞

    // include/linux/wait.h
    static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
    {
    	q->flags = 0;
    	q->private = p;
    	q->func = default_wake_function;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    注意这里的回调函数名称是default_wake_function。后面讲到“数据来了”时将会调用该函数

    添加到等待队列

    // include/linux/wait.h
    static inline void __add_wait_queue_exclusive(wait_queue_head_t *q,
    					      wait_queue_t *wait)
    {
    	wait->flags |= WQ_FLAG_EXCLUSIVE;
    	__add_wait_queue(q, wait);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里把定义的等待事件添加到了epoll对象的等待队列中

    让出CPU主动进入睡眠状态

    通过set_current_state把当前进程设置为可打断。调用schedule_hrtimeout_range让出CPU,主动进入睡眠状态

    // kernel/hrtimer.c
    int __sched schedule_hrtimeout_range(ktime_t *expires, unsigned long delta,
    				     const enum hrtimer_mode mode)
    {
    	return schedule_hrtimeout_range_clock(expires, delta, mode,
    					      CLOCK_MONOTONIC);
    }
    
    int __sched
    schedule_hrtimeout_range_clock(ktime_t *expires, unsigned long delta,
    			       const enum hrtimer_mode mode, int clock)
    {
    	...
    		schedule();
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在schedule中选择下一个进程调度

    // kernel/sched/core.c
    static void __sched __schedule(void)
    {
    	...
    	next = pick_next_task(rq);
    	...
    		context_switch(rq, prev, next);
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    4)数据来了

    在前面epoll_ctl执行的时候,内核为每一个socket都添加了一个等待队列项。在epoll_wait运行完的时候,又在event poll对象上添加了等待队列元素

    • socket->sock->sk_data_ready设置的就绪处理函数是sock_def_readable
    • 在socket的等待队列中,其回调函数是ep_poll_callback,private指向的是空指针null
    • 在eventpoll的等待队列项中,其回调函数是default_wake_function,private指向的是等待该事件的用户进程

    将数据接收到任务队列

    从TCP协议栈的处理入口函数tcp_v4_rcv开始说起:

    // net/ipv4/tcp_ipv4.c
    int tcp_v4_rcv(struct sk_buff *skb)
    {
    	...
        // 获取TCP头
    	th = tcp_hdr(skb);
    	// 获取IP头
    	iph = ip_hdr(skb);
    	...
    	// 根据数据包头中的IP、端口信息查找到对应的socket
    	sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest);
    	...
    	// socket未被用户锁定
    	if (!sock_owned_by_user(sk)) {
    		...
    		{
    			if (!tcp_prequeue(sk, skb))
    				ret = tcp_v4_do_rcv(sk, skb);
    		}
    		...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    在tcp_v4_rcv中首先根据收到的网络包的header里的source和dest信息在本机上查找对应的socket。找到以后,直接接入接收的主体函数tcp_v4_do_rcv

    // net/ipv4/tcp_ipv4.c
    int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
    {
    	...
    	if (sk->sk_state == TCP_ESTABLISHED) {
    		...
            // 执行连接状态下的数据处理  
    		if (tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len)) {
    			rsk = sk;
    			goto reset;
    		}
    		return 0;
    	}
    	// 其他非ESTABLISH状态的数据包处理
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    假设处理的是ESTABLISH状态下的包,这样就又进入tcp_rcv_established函数中进行处理了

    // net/ipv4/tcp_input.c
    int tcp_rcv_established(struct sock *sk, struct sk_buff *skb,
    			const struct tcphdr *th, unsigned int len)
    {
      			    ...
    				// 将数据接收到队列中
    				eaten = tcp_queue_rcv(sk, skb, tcp_header_len,
    						      &fragstolen);
    			}
    			...
                // 数据准备好,唤醒socket上阻塞掉的进程  
    			sk->sk_data_ready(sk, 0);
    			...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在tcp_rcv_established中通过调用tcp_queue_rcv函数完成了将接收数据放到socket的接收队列上,如下图所示:

    // net/ipv4/tcp_input.c
    static int __must_check tcp_queue_rcv(struct sock *sk, struct sk_buff *skb, int hdrlen,
    		  bool *fragstolen)
    {
    	...
        // 把接收到的数据放到socket的接收队列的尾部 
    	if (!eaten) {
    		__skb_queue_tail(&sk->sk_receive_queue, skb);
    		skb_set_owner_r(skb, sk);
    	}
    	return eaten;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    查找就绪回调函数

    调用tcp_queue_rcv完成接收之后,接着再调用sk_data_ready来唤醒在socket上等待的用户进程。在“socket的直接创建”中讲到的sock_init_data函数,已经把sk_data_ready设置成了sock_def_readable函数了。它是默认的数据就绪处理函数

    当socket上数据就绪时,内核将以sock_def_readable这个函数为入口,找到epoll_ctl添加socket时在其上设置的回调函数ep_poll_callback,如下图所示:

    // net/core/sock.c
    static void sock_def_readable(struct sock *sk, int len)
    {
    	struct socket_wq *wq;
    
    	rcu_read_lock();
    	wq = rcu_dereference(sk->sk_wq);
        // 判断等待队列不为空
    	if (wq_has_sleeper(wq))
            // 执行等待队列项上的回调函数
    		wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
    						POLLRDNORM | POLLRDBAND);
    	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
    	rcu_read_unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    重点看wake_up_interruptible_sync_poll,看一下内核是怎么找到等待队列项里注册的回调函数的

    // include/linux/wait.h
    #define wake_up_interruptible_sync_poll(x, m)				\
    	__wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
    
    • 1
    • 2
    • 3
    // kernel/sched/core.c
    void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
    			int nr_exclusive, void *key)
    {
    	unsigned long flags;
    	int wake_flags = WF_SYNC;
    
    	if (unlikely(!q))
    		return;
    
    	if (unlikely(!nr_exclusive))
    		wake_flags = 0;
    
    	spin_lock_irqsave(&q->lock, flags);
    	__wake_up_common(q, mode, nr_exclusive, wake_flags, key);
    	spin_unlock_irqrestore(&q->lock, flags);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    接着进入__wake_up_common

    // kernel/sched/core.c
    static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
    			int nr_exclusive, int wake_flags, void *key)
    {
    	wait_queue_t *curr, *next;
    
    	list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
    		unsigned flags = curr->flags;
    
    		if (curr->func(curr, mode, wake_flags, key) &&
    				(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
    			break;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    __wake_up_common中,选出等待队列里注册的某个元素curr,回调其curr->func。之前调用ep_insert的时候,把这个func设置成ep_poll_callback了

    执行socket就绪回调函数

    找到了socket等待队列项里注册的函数ep_poll_callback,接着软中断就会调用它

    // fs/eventpoll.c
    static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
    {
    	...
        // 获取wait对应的epitem
    	struct epitem *epi = ep_item_from_wait(wait);
        // 获取epitem对应的eventpoll结构体
    	struct eventpoll *ep = epi->ep;
    	...
    	if (!ep_is_linked(&epi->rdllink)) {
            // 1.将当前epitem添加到eventpoll的就绪队列中
    		list_add_tail(&epi->rdllink, &ep->rdllist);
    		ep_pm_stay_awake_rcu(epi);
    	}
    	// 2.查看eventpoll的等待队列上是否有等待
    	if (waitqueue_active(&ep->wq))
    		wake_up_locked(&ep->wq);
    	...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在ep_poll_callback中根据等待任务队列上额外的base指针可以找到epitem,进而也可以找到eventpoll对象

    它做的第一件事就是把自己的epitem添加到epoll的就绪队列中。接着它又会查看eventpoll对象上的的等待队列里是否有等待项(epoll_wait执行的时候会设置)。如果没有等待项,软中断的事情就做完了。如果有等待项,那就找到等待项里设置的回调函数,如下图所示:

    依次调用wake_up_locked() => __wake_up_locked() => __wake_up_common

    // kernel/sched/core.c
    static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
    			int nr_exclusive, int wake_flags, void *key)
    {
    	wait_queue_t *curr, *next;
    
    	list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
    		unsigned flags = curr->flags;
    
    		if (curr->func(curr, mode, wake_flags, key) &&
    				(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
    			break;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    __wake_up_common离,调用curr->func。这里的func是在epoll_wait时传入的default_wake_function函数

    执行epoll就绪通知

    在default_wake_function中找到等待队列项里的进程描述符,然后唤醒它,如下图所示:

    // kernel/sched/core.c
    int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
    			  void *key)
    {
    	return try_to_wake_up(curr->private, mode, wake_flags);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    等待队列项curr->private指针是在对象上等待而被阻塞掉的进程

    将epoll_wait进程推入可运行队列,等待内核重新调度进程。当这个进程重新运行后,从epoll_wait阻塞时暂停的代码处继续执行。把rdlist中就绪的事件返回给用户进程

    // fs/eventpoll.c
    static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
    		   int maxevents, long timeout)
    {
    		...
    		__remove_wait_queue(&ep->wq, &wait);
    
    		set_current_state(TASK_RUNNING);
    	}
    check_events:
    	eavail = ep_events_available(ep);
    
    	spin_unlock_irqrestore(&ep->lock, flags);
    
    	// 给用户进程返回就绪事件
    	if (!res && eavail &&
    	    !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
    		goto fetch_events;
    
    	return res;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    从用户角度来看,epoll_wait只是多等了一会儿而已,但执行流程还是顺序的

    5)小结

    epoll的整个工作流程总结如下图所示:

    其中软中断回调时的回调函数调用关系整理如下:

    sock_def_readable: sock对象初始化时设置的
    	=> ep_poll_callback: 调用epll_ctl时添加到socket上的
    		=> default_wake_function: 调用epoll_wait时设置到epoll上的
    
    • 1
    • 2
    • 3

    总结一下,epoll相关的函数里内核运行环境分两部分:

    • 用户进程内核态。调用epoll_wait等函数时会将进程写入内核态来执行。这部分代码负责查看接收队列,以及负责把当前进程阻塞掉,让出CPU
    • 硬、软中断上下文。在这些组件中,将包从网卡接收过来进行处理,然后放到socket的接收队列。对于epoll来说,再找到socket关联的epitem,并把它添加到epoll对象的就绪链表中。这个时候再捎带检查一下epoll上是否有被阻塞的进程,如果有唤醒它

    在实践中,只要活儿足够多,epoll_wait根本不会让进程阻塞。用户进程会一直干活儿,一直干活儿,直到epoll_wait里实在没活儿可干的时候才主动让出CPU。这就是epol高效的核心原因所在

    4)、总结

    1)阻塞到底是怎么一回事?

    阻塞其实说的是进程因为等待某个事件而主动让出CPU挂起的操作。在网络IO中,当进程等待socket上的数据时,如果数据还没有到来,那就把当前进程状态从TASK_RUNNING修改为TASK_INTERRUPTIBLE,然后主动让出CPU。由调度器来调度下一个就绪状态的进程来执行

    所以,在分析某个技术方案是不是阻塞的时候,关键要看进程有没有放弃CPU。如果放弃了,那就是阻塞。如果没放弃,那就是非阻塞。事实上,recvfrom也可以设置成非阻塞。在这种情况下,如果socket上没有数据到达,调用直接返回空,而不是挂起等待

    2)同步阻塞IO都需要哪些开销?

    同步阻塞IO的开销主要有以下这些:

    • 进程通过recv系统调用接收一个socket上的数据时,如果数据没有到达,进程就被从CPU上拿下来,然后再换上另一个进程。这导致一次进程上下文切换的开销
    • 当连接上的数据就绪的时候,睡眠的进程又会被唤醒,又是一次进程切换的开销
    • 一个进程同时只能等待一条连接,如果有很多并发,则需要很多进程。每个进程都将占用大于几MB的内存

    3)多路复用epoll为什么就能提高网络性能?

    epoll高性能最根本的原因是极大程度地减少了无用的进程上下文切换,让进程更专注地处理网络请求

    在内核的硬、软中断上下文中,包从网卡接收过来进行处理,然后放到socket的接收队列。再找到socket关联的epitem,并把它添加到epoll对象的就绪链表中

    在用户进程中,通过调用epoll_wait来查看就绪链表中是否有事件到达,如果有,直接走进行处理。处理完毕再次调用epoll_wait。在高并发的实践中,主要活儿足够多,epoll_wait根本不会让进程阻塞。用户进程会一直干活儿,一直干活儿,直到epoll_wait里实在没活儿可干的时候才主动让出CPU。这就是epoll高效的核心原因所在

    至于红黑树,仅仅是提高了epoll查找、添加、删除socket时的效率而已,不算epoll在高并发场景高性能的根本原因

    4)epoll也是阻塞的?

    例如,一个epoll对象下添加了一万个客户端连接的socket。假设所有这些socket上都还没有数据到达,这个时候进程调用epoll_wait发现没有任何事情可干。这种情况下用户进程就会被阻塞掉,而这种情况是完全正常的,没有工作需要处理,那还占着CPU是没有道理的

    阻塞不会导致低性能,过多过频繁的阻塞才会。epoll的阻塞和它的高性能并不冲突

    5)为什么Redis的网络性能很突出?

    Redis在网络IO性能上表现非常突出,单进程的服务器在极限情况下可以达到10万的QPS

    Redis的事件循环可以简化到用如下伪代码来表示

    void aeMain(aeEventLoop *eventLoop) {
        job = epollo_wait(...);
        do_job();
    }
    
    • 1
    • 2
    • 3
    • 4

    Redis的主要业务逻辑就是在本机内存上的数据结构的读写,几乎没有网络IO和磁盘IP,单个请求处理起来很快。所以它把主服务器程序干脆就做成了单进程的,这样省去了多进程之间协作的负担,也很大程序减少了进程切换。进程主要的工作过程就是调用epoll_wait等待事件,有了事件以后处理,处理完之后再调用epoll_wait。一直工作,一直工作,直到实在没有请求需要处理,或者进程时间片到的时候才让出CPU。工作效率发挥到了极致

    推荐阅读:

    Linux五种I/O模型:带你彻底理解Linux五种I/O模型

    红黑树:图解:什么是红黑树?

  • 相关阅读:
    DRF: 序列化器、View、APIView、GenericAPIView、Mixin、ViewSet、ModelViewSet的源码解析
    TCP 流控问题两则
    一道任务编排服务面试题解析
    java多线程这一篇就差不多了
    Android——gradle构建知识片-散装版
    【华为OD机试真题 JAVA】数字涂色
    【Windows 11】安装 Android子系统 和 Linux子系统
    【老生谈算法】matlab实现PID控制算法源码——PID控制算法
    企业架构LNMP学习笔记32
    ROS学习ROS基础
  • 原文地址:https://blog.csdn.net/qq_40378034/article/details/133631327