注:完整代码见仓库。
因为我之前看过unix网络编程-select函数,所以看epoll接口没啥难度。
本文缺点:
本文目标:
这里主要是介绍下epoll的api。主要是一些复制粘贴的工作。复制粘贴的不全,意思意思,主要看明白书上的概念介绍,在瞅瞅代码进行验证。
来源:
epoll api执行与poll类似的任务: 监视多个文件描述符,以查看它们中的任何一个是否可以执行I/O。epoll api既可以用作边缘触发的接口,也可以用作水平触发的接口,可以很好地扩展到大量可观察的文件描述符。
epoll api的核心概念是epoll实例,它是一种内核数据结构,从用户空间的角度来看,可以将其视为两个列表的容器:
下面是一些api罗列。
int epoll_create(int size):
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);:
struct epoll_event
{
__uint32_t events;/*epoll事件*/
epoll_data_t data;/*用户数据*/
};
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);:
这里不粘贴使用的示例代码。示例代码见上文链接。
写epoll的demo,还是要多线程的。线程池的介绍可参考:C++线程池
另外,我们还需要知道下Reactor模式。下面介绍,复制自《高性能服务器编程》游双–8.4.1 Reactor模式。
Reactor是这样一种模式, 它要求主线程(I/O处理单元, 下同) 只负责监听文件描述上是否有事件发生, 有的话就立即将该事件通知工作线程(逻辑单元, 下同) 。 除此之外, 主线程不做任何其他实质性的工作。 读写数据, 接受新的连接, 以及处理客户请求均在工作线程中完成。
1) 主线程往epoll内核事件表中注册socket上的读就绪事件。
2) 主线程调用epoll_wait等待socket上有数据可读。
3) 当socket上有数据可读时, epoll_wait通知主线程。 主线程则将socket可读事件放入请求队列。
4) 睡眠在请求队列上的某个工作线程被唤醒, 它从socket读取数据, 并处理客户请求, 然后往epoll内核事件表中注册该socket上的写就绪事件。
5) 主线程调用epoll_wait等待socket可写。
6) 当socket可写时, epoll_wait通知主线程。 主线程将socket可写事件放入请求队列。
7) 睡眠在请求队列上的某个工作线程被唤醒, 它往socket上写入服务器处理客户请求的结果。
注:代码实现中,accept连接过程,放在了主线程中了。
完整代码见仓库,这里只粘贴下,server.hpp代码。
#pragma once
#include "resolve.hpp"
#include "thread_pool.hpp"
#include "util.hpp"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define MAX_EVENT_NUMBER 10000
class server {
private:
int m_port;
int m_listenfd;
int m_epollfd;
bool m_stop = false;
thread_pool<resolve> pool; // 处理resolve对象的线程池
std::map<int, std::shared_ptr<resolve>> resolves; // 存储从客户端发送来的内容,存储回复给客户端的内容
private:
void event_listen();
void event_loop();
public:
server(int port);
void start();
};
server::server(int port = 9999): m_port(port) {}
void server::start() {
event_listen();
event_loop();
}
void server::event_listen() {
// 创建监听描述符并加入epoll set
m_listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(m_listenfd >= 0);
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons(m_port);
int ret = bind(m_listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret >= 0);
ret = listen(m_listenfd, 5);
assert(ret >= 0);
m_epollfd = epoll_create(5);
assert(m_epollfd != -1);
utils::epoll_help::instance().addfd(m_epollfd, m_listenfd);
}
void server::event_loop()
{
epoll_event events[MAX_EVENT_NUMBER];
while(!m_stop) {
int n = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if(n < 0 && errno == EINTR) {
continue;
} else if(n < 0 && errno != EINTR) {
assert(n >=0); // 这里最好抛出异常,先用assert顶顶
}
for(int i=0; i<n; i++) {
int sockfd = events[i].data.fd;
if(sockfd == m_listenfd) { // 建立新的连接
struct sockaddr_in client_address;
socklen_t client_addr_len = sizeof(client_address);
int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addr_len);
assert(connfd >= 0); // 这里应当使用异常处理
utils::epoll_help::instance().addfd(m_epollfd, connfd);
resolves[connfd] = std::shared_ptr<resolve>(new resolve(m_epollfd, connfd));
} else if(events[i].events & EPOLLIN) { // 读-丢给线程池处理
pool.append(resolves[sockfd]);
} else if(events[i].events & EPOLLOUT) { // 写-丢给线程池处理
pool.append(resolves[sockfd]);
} else if(events[i].events & EPOLLRDHUP) { // 客户端关闭
utils::epoll_help::instance().removefd(m_epollfd, events[i].data.fd);
// resolves.erase(events[i].data.fd); // 这样擦出或许不好,要改变树结构
// 所以不用删除,fd已经关闭,不会使用resolves[fd];当新的相同的fd连接时,自动覆盖
}
}
}
}
测试的客户端代码没写。使用nc 127.0.0.1 9999进行连接测试即可。