多路复用:
使用一个进程(且只有主线程)同时监控若干个文件描述符的读写情况,这种读写模式称为多路复用
多用于TCP的服务端,用于监控客户端的连接和数据的发送
优点:不需要频繁的创建、销毁进程,从而节约了内存资源、时间资源,也避免了进程之间的竞争、等待
缺点:要求单个客户端的任务不能太多于耗时,否则其他客户端就会感知到卡顿
适合并发量高、但是任务量短小的情景,例如:Web服务器
相关函数:
select:
fd_set 是文件描述符的集合,使用以下函数操作
void FD_CLR(int fd, fd_set *set);
功能:从集合set中删除fd文件描述符
int FD_ISSET(int fd, fd_set *set);
功能:判断集合set中是否存在fd文件描述符
void FD_SET(int fd, fd_set *set);
功能:向集合set中添加fd文件描述符
void FD_ZERO(fd_set *set);
功能:清空集合set
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
功能:同时监控多个文件描述的读、写、异常操作
nfds:被监控的文件描述符中的最大值+1
readfds:监控读操作的文件描述符集合
writefds:监控写操作的文件描述符集合
exceptfds:监控异常操作的文件描述符集合
timeout:设置超时时间
NULL 一直阻塞,直到某个文件描述符发生了变化
0秒0微秒 非阻塞
大于0秒 等待超时时间,超时返回0
struct timeval {
time_t tv_sec; //秒
suseconds_t tv_usec; //微秒
};
返回值:监控到发生相关操作的文件描述符的个数,超时返回0
错误返回-1
注意:
readfds、writefds、exceptfds 这三个集合参数既是输入也是输出,
调用select时这三个集合需要存储被监控的见闻描述符,当由于有文件描述符发生了相应的操作而导致函数返回时,这三个集合中
存储了这些文件描述符并返回给调用者
select设计不合理的地方:
1、每次调用select都需要向它重新传递被监控的文件描述符集合
2、调用结束后如果想知道具体是哪个文件描述符发生了相关操作,必须把所有被监控的文件描述符进行一遍测试
select的优点:
它是最早的多路复用函数,几乎所有的操作系统都支持,兼容性很高
#include
#include
#include
#include
#include
#include
#include
#include
#include
// TCP server
typedef struct sockaddr* SP;
int main(int argc,const char* argv[])
{
int sockfd = socket(AF_INET,SOCK_STREAM,0);
if(0 > sockfd)
{
perror("socket");
return EXIT_FAILURE;
}
struct sockaddr_in addr = {};
addr.sin_family = AF_INET;
addr.sin_port = htons(5566);
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
socklen_t addrlen = sizeof(addr);
if(bind(sockfd,(SP)&addr,addrlen))
{
perror("bind");
return EXIT_FAILURE;
}
if(listen(sockfd,10))
{
perror("listen");
return EXIT_FAILURE;
}
// 定义读操作文件描述符集合
fd_set reads;
FD_ZERO(&reads);
// 把等待连接的sockfd添加到集合
FD_SET(sockfd,&reads);
// 定义超时时间
struct timeval timeout = {5,0};
// 定义记录集合中最大值fd的变量
int max_fd = sockfd;
char buf[4096] = {};
size_t buf_size = sizeof(buf);
for(;;)
{
// 备份集合
fd_set copy_reads = reads;
int ret =
select(max_fd+1,©_reads,NULL,NULL,&timeout);
if(0 < ret)
{
//sockfd需要读操作 有客户端连接
if(FD_ISSET(sockfd,©_reads))
{
int cli_fd = accept(sockfd,(SP)&addr,&addrlen);
if(0 > cli_fd)
{
perror("accept");
}
else
{
//把连接成功的cli_fd添加到监控集合
FD_SET(cli_fd,&reads);
if(cli_fd > max_fd)
{
max_fd = cli_fd;
}
}
}
else//其他fd需要读操作 有客户端发送了数据
{
for(int fd=3; fd<=max_fd; fd++)
{
if(FD_ISSET(fd,©_reads) &&
fd != sockfd)
{
int ret = recv(fd,buf,buf_size,0);
if(0 >= ret ||
0 == strcmp(buf,"quit"))
{
FD_CLR(fd,&reads);
printf("有客户端%d退出\n",fd);
continue;
}
printf("recv:%s\n",buf);
strcat(buf,"return");
ret = send(fd,buf,strlen(buf)+1,0);
if(0 >= ret)
{
FD_CLR(fd,&reads);
printf("有客户端%d退出\n",fd);
continue;
}
}
}
}
}
}
}
int pselect(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, const struct timespec *timeout,
const sigset_t *sigmask);
功能:大致与select一样
区别:
1、超时时间的结构类型不同
struct timespec {
long tv_sec; //秒
long tv_nsec; //纳秒
};
2、pselect监控时可以通过sigmask参数设置要屏蔽的信号,可以保证pselect监控时不受这些信号干扰
共同点:本质点与select差别不大、select的缺点pelect也是一样的,只是个别功能有所增强而已
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds:struct pollfd结构变量数组
struct pollfd {
int fd; //被监控的文件描述符
short events; //想要监控的事件
short revents; //实际监控到的事件
POLLIN 普通优先级的读事件
POLLPRI 高优先级的读事件
POLLOUT 普通优先级的写事件
POLLRDHUP 监控对方socket是否关闭
POLLERR 错误事件
POLLHUP 对方挂起
POLLNVAL 非法描述符
};
nfds:数组的长度
timeout:超时时间 按毫秒赋值 1000毫秒=1秒
返回值:监控到发生相关操作的文件描述符的个数,超时返回0,错误返回-1
#include
#include
#include
#include
#include
#include
#include
#include
#include
// TCP server
typedef struct sockaddr* SP;
int add_fds(struct pollfd* fds,nfds_t nfds,int fd,short events)
{
for(int i=0; i { if(0 == fds[i].fd) { fds[i].fd = fd; fds[i].events = events; return i; } } return -1; } int main(int argc,const char* argv[]) { int sockfd = socket(AF_INET,SOCK_STREAM,0); if(0 > sockfd) { perror("socket"); return EXIT_FAILURE; } struct sockaddr_in addr = {}; addr.sin_family = AF_INET; addr.sin_port = htons(5566); addr.sin_addr.s_addr = inet_addr("127.0.0.1"); socklen_t addrlen = sizeof(addr); if(bind(sockfd,(SP)&addr,addrlen)) { perror("bind"); return EXIT_FAILURE; } if(listen(sockfd,10)) { perror("listen"); return EXIT_FAILURE; } char buf[4096] = {}; size_t buf_size = sizeof(buf); // 创建pollfd数组并初始化 struct pollfd* fds = calloc(sizeof(struct pollfd),10); // 设置[0]位置要监控的描述符和事件 fds[0].fd = sockfd; fds[0].events = POLLIN; // 设置超时时间 int timeout = 5000; for(;;) { int ret = poll(fds,10,timeout); if(0 == ret) continue; // 超时 if(0 > ret) { perror("poll"); return EXIT_FAILURE; } // 判断[0]位置是否产生读事件 客户端连接 if(fds[0].revents & POLLIN) { int cli_fd = accept(sockfd,(SP)&addr,&addrlen); if(0 < cli_fd) { if(-1 == add_fds(fds,10,cli_fd,POLLIN)) { printf("客户端数量已满!\n"); } } } else { // 其他客户端发送了数据 遍历判断fds for(int i=1; i<10; i++) { if(!(fds[i].revents & POLLIN)) continue; // 对[i]位置进行收发数据 int ret = recv(fds[i].fd,buf,buf_size,0); if(0 >= ret || 0 == strncmp(buf,"quit",4)) { printf("客户端%d退出!\n",fds[i].fd); fds[i].fd = 0; fds[i].events = 0; continue; } printf("recv:%s\n",buf); strcat(buf,":return"); ret = send(fds[i].fd,buf,strlen(buf)+1,0); if(0 >= ret) { printf("客户端%d退出!\n",fds[i].fd); fds[i].fd = 0; fds[i].events = 0; continue; } } } } } #include int epoll_create(int size); 功能:创建一个epoll的内核对象,该对象可以管理、保存被监控的描述符 size:epoll对象管理描述符的数量 返回值:epoll对象的描述符 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); 功能:控制epoll对象,添加、删除描述符 epfd:epoll对象的描述符 op: EPOLL_CTL_ADD 添加监控的描述符 EPOLL_CTL_DEL 删除监控的描述符 EPOLL_CTL_MOD 修改要监控的描述符的事件 fd:要操作的描述符 event:要监听的事件 struct epoll_event { uint32_t events; //要监控的事件,参考poll epoll_data_t data; /* User data variable */ }; typedef union epoll_data { void *ptr; int fd; //产生事件的描述符 uint32_t u32; uint64_t u64; } epoll_data_t; 返回值:成功0,失败-1 int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout); 功能:监控文件描述符,并返回发生事件的描述符 epfd:epoll对象的描述符 events:输出型参数,用于获取发生事件的描述符 maxevents:可以返回事件数目的最大值 timeout:超时时间 返回值:监控到发生相关操作的文件描述符的个数,超时返回0,错误返回-1 #include #include #include #include #include #include #include #include #include // TCP server typedef struct sockaddr* SP; #define EPOLL_MAX 10 int main(int argc,const char* argv[]) { int sockfd = socket(AF_INET,SOCK_STREAM,0); if(0 > sockfd) { perror("socket"); return EXIT_FAILURE; } struct sockaddr_in addr = {}; addr.sin_family = AF_INET; addr.sin_port = htons(5566); addr.sin_addr.s_addr = inet_addr("127.0.0.1"); socklen_t addrlen = sizeof(addr); if(bind(sockfd,(SP)&addr,addrlen)) { perror("bind"); return EXIT_FAILURE; } if(listen(sockfd,10)) { perror("listen"); return EXIT_FAILURE; } char buf[4096] = {}; size_t buf_size = sizeof(buf); // 创建epoll对象 int epfd = epoll_create(EPOLL_MAX); if(0 > epfd) { perror("epoll_create"); return EXIT_FAILURE; } // 向epoll对象添加sockfd struct epoll_event event; event.events = EPOLLIN; event.data.fd = sockfd; if(epoll_ctl(epfd,EPOLL_CTL_ADD,sockfd,&event)) { perror("epoll_ctl"); return EXIT_FAILURE; } // 定义存储监控结果的epoll_event数组 struct epoll_event events[EPOLL_MAX] = {}; for(;;) { int epoll_cnt = epoll_wait(epfd,events,EPOLL_MAX,5000); if(0 == epoll_cnt) continue; // 超时 if(0 > epoll_cnt) { perror("epoll_wait"); return EXIT_FAILURE; } // 遍历events数组 for(int i=0; i { if(sockfd == events[i].data.fd) { int cli_fd = accept(sockfd,(SP)&addr,&addrlen); if(0 < cli_fd) { event.events = EPOLLIN; event.data.fd = cli_fd; if(epoll_ctl(epfd,EPOLL_CTL_ADD,cli_fd,&event)) { perror("epoll_ctl"); continue; } } } else { int ret = recv(events[i].data.fd,buf,buf_size,0); if(0 >= ret) { printf("客户端%d退出\n",events[i].data.fd); epoll_ctl(epfd,EPOLL_CTL_DEL, events[i].data.fd,NULL); continue; } printf("recv:%s\n",buf); strcat(buf,":return"); ret = send(events[i].data.fd, buf,strlen(buf)+1,0); if(0 >= ret) { printf("--客户端%d退出\n", events[i].data.fd); epoll_ctl(epfd,EPOLL_CTL_DEL, events[i].data.fd,NULL); continue; } } } } }