本文分享自华为云社区《高性能网络设计秘笈:深入剖析Linux网络IO与epoll》,作者: Lion Long 。
epoll是Linux内核中一种可扩展的IO事件处理机制,可替代select和poll的系统调用。处理百万级并发访问性能更佳。
(1) 文件描述符越多,性能越差。 单个进程中能够监视的文件描述符存在最大的数量,默认是1024(在linux内核头文件中定义有 #define _FD_SETSIZE 1024),当然也可以修改,但是文件描述符数量越多,性能越差。
(2)开销巨大 ,select需要复制大量的句柄数据结构,产生了巨大的开销(内核/用户空间内存拷贝问题)。
(3)select需要遍历整个句柄数组才能知道哪些句柄有事件。
(4)如果没有完成对一个已经就绪的文件描述符的IO操作,那么每次调用select还是会将这些文件描述符通知进程,即水平触发。
(5)poll使用链表保存监视的文件描述符,虽然没有了监视文件数量的限制,但是其他缺点依旧存在。
由于以上缺点,基于select模型的服务器程序,要达到十万以上的并发访问,是很难完成的。因此,epoll出场了。
(1)不需要轮询所有的文件描述符
(2)每次取就绪集合,都在固定位置
(3)事件的就绪和IO触发可以异步解耦
#include <sys/epoll.h> int epoll_create(int size);
功能:创建epoll的文件描述符。
参数说明:size表示内核需要监控的最大数量,但是这个参数内核已经不会用到,只要传入一个大于0的值即可。 当size<=0时,会直接返回不可用,这是历史原因保留下来的,最早的epoll_create是需要定义一次性就绪的最大数量;后来使用了链表以便便维护和扩展,就不再需要使用传入的参数。
返回:返回该对象的描述符,注意要使用 close 关闭该描述符。
#include <sys/epoll.h> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // epoll_ctl对应系统调用sys_epoll_ctl
功能:操作epoll的文件描述符,主要是对epoll的红黑树节点进行操作,比如节点的增删改查。
参数说明:
struct epoll_event结构体原型
typedef union epoll_data{ void* ptr; int fd; uint32_t u32; uint64_t u64 }; struct epoll_event{ uint32_t events; epoll_data_t data; }
events成员代表要监听的epoll事件类型
events成员:
data成员:
data 成员时一个联合体类型,可以在调用 epoll_ctl 给 fd 添加/修改描述符监听的事件时携带一些数据,方便后面的epoll_wait可以取出信息使用。
跟着的数字代表函数需要的参数数量,比如SYSCALL_DEFINE1代表函数需要一个参数、SYSCALL_DEFINE4代表函数需要4个参数。
epoll_ctl是非阻塞的,不会被挂起。
函数原型
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
功能:阻塞一段时间,等待事件发生
返回:返回事件数量,事件集添加到events数组中。也就是遍历红黑树中的双向链表,把双向链表中的节点数据拷贝出来,拷贝完毕后把节点从双向链表中移除。
step 1:创建epoll文件描述符
int epfd = epoll_create(1);
step 2:创建struct epoll_event结构体
struct epoll_event ev; ev.data.fd=listenfd;//保存监听的fd,以便epoll_wait的后续操作 ev.events=EPOLLIN;//设置监听fd的可读事件
step 3:添加事件监听
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
step 4:等待事件
struct epoll_event events[EVENTS_LENGTH]; char rbuffer[MAX_BUFF]={ 0 }; char wbuffer[MAX_BUFF]={ 0 }; while(1) { int nready = epoll_wait(epfd,events,EVENTS_LENGTH,-1);//-1表示阻塞等待 int i=0; for(i=0;i<nready;i++) { int clientfd=events[i].data.fd; if(clientfd==listenfd) { struct sockaddr_in client; int len=sizeof(client); int confd=accept(listenfd,(struct sockaddr*)&client,&len); //step 2:创建struct epoll_event结构体 struct epoll_event evt; evt.data.fd=confd;//保存监听的fd,以便epoll_wait的后续操作 evt.events=EPOLLIN;//设置监听fd的可读事件 // step 3:添加事件监听 epoll_ctl(epfd,EPOLL_CTL_ADD,confd,&evt); } else if(events[i].events &EPOLLIN) { int ret = recv(clientfd,rbuffer,MAX_BUFF,0); if(ret>0) { rbuffer[ret]='\0';//剔除干扰数据 printf("recv: %s\n",rbuffer); memcpy(wbuffer,rbuffer,MAX_BUFF);//拷贝数据,做回传示例 //step 2:创建struct epoll_event结构体 struct epoll_event evt; evt.data.fd=clientfd;//保存监听的fd,以便epoll_wait的后续操作 evt.events=EPOLLOUT;//设置监听fd的可写事件 // step 3:修改事件监听 epoll_ctl(epfd,EPOLL_CTL_MOD,clientfd,&evt); } } else if(events[i].events &EPOLLOUT) { int ret = send(clientfd,wbuffer,MAX_BUFF,0); printf("send: %s\n",wbuffer); //step 2:创建struct epoll_event结构体 struct epoll_event evt; evt.data.fd=clientfd;//保存监听的fd,以便epoll_wait的后续操作 evt.events=EPOLLIN;//设置监听fd的可读事件 // step 3:修改事件监听 epoll_ctl(epfd,EPOLL_CTL_MOD,clientfd,&evt); } } }
#include <stdio.h> #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <fcntl.h> #include <unistd.h> #include <pthread.h> #include <sys/epoll.h> #include <string.h> #define BUFFER_LENGTH 128 #define EVENTS_LENGTH 128 char rbuff[BUFFER_LENGTH] = { 0 }; char wbuff[BUFFER_LENGTH] = { 0 }; int main() { // block int listenfd = socket(AF_INET, SOCK_STREAM, 0); // if (listenfd == -1) return -1; // listenfd struct sockaddr_in servaddr; servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(9999); if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr))) { return -2; } #if 0 // nonblock int flag = fcntl(listenfd, F_GETFL, 0); flag |= O_NONBLOCK; fcntl(listenfd, F_SETFL, flag); #endif listen(listenfd, 10); int epfd = epoll_create(1); struct epoll_event ev, events[EVENTS_LENGTH]; ev.events = EPOLLIN; ev.data.fd = listenfd; epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); printf("epfd : %d\n", epfd); while (1) { int nready = epoll_wait(epfd, events, EVENTS_LENGTH, -1); printf("nready --> %d\n",nready); int i; for (i = 0; i < nready;i++) { int clientfd = events[i].data.fd; if (listenfd == clientfd) { // accept struct sockaddr_in client; int len = sizeof(client); int conffd = accept(clientfd, (struct sockaddr*)&client,&len); printf("conffd --> %d\n",conffd); ev.events = EPOLLIN; ev.data.fd = conffd; epoll_ctl(epfd, EPOLL_CTL_ADD, conffd, &ev); } else if(events[i].events & EPOLLIN)//client { int ret=recv(clientfd, rbuff, BUFFER_LENGTH, 0); if (ret > 0) { rbuff[ret] = '\0'; printf("recv buffer: %s\n", rbuff); /* int j; for (j = 0; j < BUFFER_LENGTH;j++) { buff[j] = 'a' + (j % 26); } send(clientfd, buff, BUFFER_LENGTH, 0); */ memcpy(wbuff, rbuff, BUFFER_LENGTH); ev.events = EPOLLOUT; ev.data.fd = clientfd; epoll_ctl(epfd, EPOLL_CTL_MOD, clientfd, &ev); } } else if (events[i].events & EPOLLOUT) { send(clientfd, wbuff, BUFFER_LENGTH, 0); printf("send --> %s\n",wbuff); ev.events = EPOLLIN; ev.data.fd = clientfd; epoll_ctl(epfd, EPOLL_CTL_MOD, clientfd, &ev); } } } return 0; }
读写使用相同的缓冲区。比如上述的示例中,wbuffer和rbuffer是使用同一个缓冲区的,所以需要rbuff[ret] = ‘\0’;去除杂数据。
1、水平触发可以一次recv,边沿触发需要用循环来recv;
2、水平触发可以使用阻塞模式,边沿模式不能
3、两者性能差异非常小,一般小数据使用水平触发LT,大数据使用边沿触发ET
4、listen fd最好使用水平触发,尽量不要边沿触发
5、当当recv的buffer小于接受的数据时:
(1)水平触发是只要有数据就一直触发,直到数据读完;
(2)边沿触发是来一次连接触发一次,如果接受数据的buffer不够大,则数据会保留在缓冲区,下次触发继续从缓冲区读出来;
6、一般,水平触发只需要一个recv,边沿触发需要搭配while从缓冲区读完数据
默认是水平触发模式,在事件中设置中 | EPOLLET 就可以设置边沿触发,不设置则默认是水平触发。
例如:
ev.events=EPOLL_IN | EPOLLET
我们需要注册,内核才会有事件来的时候通知进程。比如生活中要退一个快递,那么我们需要注册一个快递公司的账户,然后发送一个退快递请求时快递公司才能找到你并取快递。
epoll会循环拷贝红黑树结构体中的双向链表节点,读取节点数据,直到没有事件。
只要缓冲区有空间就返回可读、可写,不管空间多少。比如缓冲区是1024,但是有1023有数据了,这种极端条件也会返回可读、可写。
发送给客户端数据很大的时候(大于内核缓冲区),就可能出现send不全,客户端recv不全,最好用EPOLLOUT单独处理发送数据事件。
本文介绍了网络IO模型,引入了epoll作为Linux系统中高性能网络编程的核心工具。通过分析epoll的特点与优势,并给出使用epoll的注意事项和实践技巧,该文章为读者提供了宝贵的指导。通过掌握这些知识,读者能够构建高效、可扩展和稳定的网络应用,提供出色的用户体验。
本文介绍了网络IO模型,引入了epoll作为Linux系统中高性能网络编程的核心工具。通过分析epoll的特点与优势,并给出使用epoll的注意事项和实践技巧,该文章为读者提供了宝贵的指导。