# I/O 多路复用 (转接)
- 处理并发,
- 多进程 / 多线程并发,
accept检测客户端连接请求和子线程和建立连接的客户端通信,都会发送阻塞,有新客户端连接/ 有数据到达才会解除阻塞。 - IO 多路转接并发
委托
内核帮助检测文件描述符的状态(通信和监听两类),
在通信过程中监听 (连接请求信息) 和通信 (读和写) 都会放到读写缓冲区中,当调用accept就会检测监听中是否有连接请求,如果没有则会一直阻塞。通信的读写缓冲区也是阻塞的。当只有一个线程时,监听和两个通信都会处于阻塞状态,相互冲突,
基于内核可以同时监听多个文件描述符的读写缓冲区和是否有剩余空间,当内核帮我们完成所有读写缓冲区,则不会再进入阻塞。文件描述符是线性顺序执行,若同时则仍需要多线程。select跨平台poll和epoll用于linux平台
IO 多路复用技术的最大优势就是系统开销小,系统不必创建进程/线程,也不必维护这些进程 / 线程,从而较少系统的开销。
# select
跨平台通过此函数委托内核检测若干个文件描述符状态 (读写缓存区状态)
委托检测的文件描述符被遍历检测完毕之后,已就绪的满足条件的文件描述符会通过select()参数传出,根据参数分情况依次处理.fd_set类型标识文件描述符集合,类型有 128 个字节,1024个标志位
sizeof(fd_set) = 128 字节 * 8 = 1024 bit // int [32] |
#include <sys/select.h> | |
// 不用的微秒一定要初始化 | |
struct timeval { | |
time_t tv_sec; /* seconds */ | |
suseconds_t tv_usec; /* microseconds */ | |
}; | |
int select(int nfds, fd_set *readfds, fd_set *writefds, | |
fd_set *exceptfds, struct timeval * timeout); |
nfds: 委托内核检测的三个集合中最大文件描述符+1, 在Windows中此参数无效为-1传入传出参数readfds: 内核检测此集合文件描述符对应读缓冲区writefds: 对应写缓存区不需要可以指定NULL
*exceptfds: 检测是否有异常状态,不需要可以传入NULL超时参数
*timeout:超时时长,用来强制解除select()函数阻塞- NULL :函数检测不到就绪文件就一直阻塞
- 等待固定时间:指定时间后强制解除阻塞,函数返回 0
- 不等待:函数不阻塞,参数对应结构体阻塞为 0
函数返回值
大于0:成功,返回集合中已就绪文件描述符总数等于-1:函数调用失败等于0: :超时,没有检测到就绪文件描述符
# fd_set 类型参数
如果
fd_set标志为0不检测此文件描述符状态,为1检测文件描述符,内核在读集合的过程中,若被检测的文件描述符没有数据,修改此文件描述符对应标志位为 0,若有数据,标志位不变,仍为 1。当select函数被解除阻塞之后,被内核修改过的读集合通过参数传出,此时集合中只要标志位为1,那么它对应的文件描述符肯定就是就绪的。基于此文件描述符和客户端建立新连接或者通信。
// 将文件描述符 fd 从集合 set 中删除,将 fd 对应的标志位设置为 0 | |
void FD_CLR(int fd,fd_set* set); | |
// 判断文件描述符 fd 是否在 set 集合中, 读 fd 对应的标志位是 0 还是 1 | |
int FD_ISSET(int fd,fd_set *set); | |
// 将文件描述符 fd 添加到 set 集合中 ,将 fd 对应的标志位设置为 1 | |
void FD_SET(int fd,fd_set* set); | |
// 将 set 集合中,所有文件描述符对应的标志位设置为 0 | |
void FD_ZERO(fd_Set* set); |
# 并发处理流程
- 创建监听套接字
fd=socket bind绑定本地IP和端口listen设置套接字监听- 创建文件描述符集合
fd_set,用于存储需要检测事件的所有文件描述符FD_ZERO初始化FD_SET将监听的文件描述符放入检测的读集合中
- 循环调用
select, 对所有文件描述符进行检测 select 解除阻塞返回,得到内核传出满足条件的文件描述符集合- 通过
FD_ISSET判断集合中标志位是否为1 - 若是监听的文件描述符调用
accept和客户端建立连接,将得到的通信的文件描述符,通过FD_SET放入到检测集合中 - 若果是
通信的文件描述符,调用通信函数和客户端通信。如果客户端和服务器断开了连接,使用FD_CLR将文件描述符从检测集合中删除,若果没有断开连接,正常通信即可。
- 通过
- 重复上一步
# 代码测试
# 服务端
#include <stdio.h> | |
#include <ctype.h> | |
#include <stdlib.h> | |
#include <unistd.h> | |
#include <string.h> | |
#include <arpa/inet.h> | |
#include <sys/select.h> | |
#include <pthread.h> | |
#include <sys/select.h> | |
// 添加互斥锁 | |
pthread_mutex_t mutex; // 针对读,和最大设置互斥 | |
// 存储文件描述符 传递给子线程的数据 | |
typedef struct fdinfo | |
{ | |
int fd; // 文件描述符可监听 / 读写 | |
int *maxfd; // 最大文件描述符 | |
fd_set* rdset; // 文件读地址 | |
}FDInfo; | |
// 子进程函数 | |
void* acceptConn(void* arg) | |
{ | |
printf("子线程线程ID:%ld\n",pthread_self()); | |
FDInfo* info = (FDInfo*)arg; | |
// 接受连接请求,此调用不阻塞 | |
struct sockaddr_in cliaddr; | |
int clilen = sizeof(cliaddr); | |
int cfd = accept(info->fd,(struct sockaddr*)&cliaddr,&clilen); | |
// 得到有效的文件描述符 | |
// 通信的文件描述符添加到读集合 | |
printf("连接的客户端端口为:%d\n",cliaddr.sin_port); | |
// 在下一轮 select 检测的时候,就能得到缓冲区的状态 | |
pthread_mutex_lock(&mutex); | |
FD_SET(cfd,info->rdset); // 添加文件描述符 | |
// 重置最大文件描述符 | |
*info->maxfd = cfd > *info->maxfd ? cfd:*info->maxfd; | |
pthread_mutex_unlock(&mutex); | |
free(info); | |
return NULL; | |
} | |
// 发送与接受数据子线程 | |
void* communication(void* arg) | |
{ | |
printf("子线程线程ID:%ld\n",pthread_self()); | |
FDInfo* info = (FDInfo*)arg; | |
// 接受数据 | |
char buf[10] = {0}; | |
// 一次只能接受 10 个字节,客户端一次发送 100 个字节 | |
// 一次接受不玩,文件描述符对应缓冲区还有数据 | |
// 下一轮 select 检测的时候,内核还会标记这个文件描述符再读一吃 | |
// 循环一直次序,直到缓冲区数据被读完为止 | |
int len = read(info->fd,buf,sizeof(buf)); | |
if(len ==-1) | |
{ | |
perror("recv error"); | |
free(info); | |
return NULL; | |
} | |
else if(len == 0 ) | |
{ | |
printf("客户端关闭了连接....\n"); | |
// 将检测的文件描述符从读集合中删除 | |
pthread_mutex_lock(&mutex); | |
FD_CLR(info->fd,info->rdset); | |
pthread_mutex_unlock(&mutex); | |
close(info->fd); | |
free(info); | |
return NULL; | |
} | |
else if(len > 0) | |
{ | |
printf("read buf = %s\n",buf); | |
for(int i =0;i<len;++i) | |
{ | |
buf[i] = toupper(buf[i]); | |
} | |
printf("After buf = %s\n",buf); | |
// 收到了数据 | |
// 发送数据 | |
int ret = write(info->fd,buf,strlen(buf)+1); | |
if(ret == -1) | |
{ | |
perror("send error"); | |
} | |
} | |
else{ | |
// 异常 | |
perror("read"); | |
} | |
free(info); | |
return NULL; | |
} | |
int main(void) | |
{ | |
// 初始化互斥锁 | |
pthread_mutex_init(&mutex,NULL); | |
//1,创建监听 | |
int lfd = socket(AF_INET,SOCK_STREAM,0); | |
//2,绑定端口与 IP | |
struct sockaddr_in addr; | |
addr.sin_family = AF_INET; | |
addr.sin_port = htons(9999); //htons 将主机字节序转换为网络字节序 | |
addr.sin_addr.s_addr = INADDR_ANY; | |
bind(lfd, (struct sockaddr*)&addr, sizeof(addr)); | |
// 3,设置监听 | |
listen(lfd,128); | |
// 4 | |
// 将监听的 fd 状态委托给内核 | |
int maxfd = lfd; | |
// 初始化检测的读集合 | |
fd_set rdset; | |
fd_set rdtemp; | |
// 清零 | |
FD_ZERO(&rdset); | |
// 将监听的 lfd 舍之道检测的读集合中 | |
FD_SET(lfd,&rdset); | |
// 通过 select 委托内核检测读集合中的文件描述符,检测 read 缓冲区有没有数据 | |
// 若有数据 select 解除阻塞返回 | |
// 应该让内核持续检测 | |
while(1) | |
{ | |
pthread_mutex_lock(&mutex); | |
// 默认阻塞,rdset 中是委托内核检测中的所有文件描述符 | |
rdtemp = rdset; // 防止 select 对内核数据的修改 | |
pthread_mutex_unlock(&mutex); | |
int num = select(maxfd+1,&rdtemp,NULL,NULL,NULL); | |
//rdset 中的数据被内核改写了,只保留了发送变化的文件标志位为 1,没变化的改为 0 | |
// 只要 rdset 中 fd 对应的标志位为 1,-> 缓存区有数据 | |
// 判断有没有新连接,是不是在监听的读集合里 | |
if(FD_ISSET(lfd,&rdtemp)) // 判断文件描述符属于哪一类 | |
{ | |
// 创建子线程 | |
pthread_t tid; | |
FDInfo* info = (FDInfo*)malloc(sizeof(FDInfo)); | |
info->fd=lfd; | |
info->maxfd = &maxfd; | |
info->rdset = &rdset; | |
pthread_create(&tid,NULL,acceptConn,info); | |
pthread_detach(tid); // 线程脱离 | |
} | |
// 没有新连接,通信 | |
for(int i =0;i<maxfd+1;++i) | |
{ | |
// 判断从监听的文件描述符之后到 maxfd 这个范围内的文件描述符是否有缓冲区数据 | |
if(i!=lfd &&FD_ISSET(i,&rdtemp)) // 判断是不是通信文件描述符 | |
{ | |
// 创建子线程接受数据并发送数据 | |
// 创建子线程 | |
pthread_t tid; | |
FDInfo* info = (FDInfo*)malloc(sizeof(FDInfo)); | |
info->fd = i; | |
info->rdset = &rdset; // 传递过去的原始读集合 | |
pthread_create(&tid,NULL,communication,info); | |
pthread_detach(tid); // 线程脱离 | |
} | |
} | |
} | |
// 销毁互斥锁 | |
pthread_mutex_destroy(&mutex); | |
close(lfd); | |
return 0; | |
} |
# 客户端
#include <stdio.h> | |
#include <stdlib.h> | |
#include <unistd.h> | |
#include <string.h> | |
#include <arpa/inet.h> | |
int main() | |
{ | |
// 1. 创建用于通信的套接字 | |
int fd = socket(AF_INET, SOCK_STREAM, 0); | |
if(fd == -1) | |
{ | |
perror("socket"); | |
exit(0); | |
} | |
// 2. 连接服务器 | |
struct sockaddr_in addr; | |
addr.sin_family = AF_INET; // ipv4 | |
addr.sin_port = htons(9999); // 服务器监听的端口,字节序应该是网络字节序 | |
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr); | |
int ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr)); | |
if(ret == -1) | |
{ | |
perror("connect"); | |
exit(0); | |
} | |
// 通信 | |
while(1) | |
{ | |
// 读数据 | |
char recvBuf[1024]; | |
// 写数据 | |
// sprintf(recvBuf, "data: %d\n", i++); | |
printf("输入数据:"); | |
fgets(recvBuf, sizeof(recvBuf), stdin); | |
write(fd, recvBuf, strlen(recvBuf)+1); | |
// 如果客户端没有发送数据,默认阻塞 | |
int len = read(fd, recvBuf, sizeof(recvBuf)); | |
if(len == -1) | |
{ | |
perror("read error"); | |
exit(1); | |
} | |
printf("recv buf: %s\n", recvBuf); | |
sleep(1); | |
} | |
// 释放资源 | |
close(fd); | |
return 0; | |
} |
# poll
内核对应的文件描述符也是以
线性的方式进行轮训,根据描述符状态进行处理,poll和select检测文件描述符集合会在检测过程中频繁进行用户区和内核去的拷贝,会随着文件描述符的增加而线性增大,从而效率降低。select检测文件描述符上限1024,poll没有最大文件描述符限制
# 函数
#include <poll.h> | |
// 每个委托 poll 检测的 fd 都对应这样一个结构体 | |
struct pollfd { | |
int fd; /* 委托内核检测的文件描述符 */ | |
short events; /* 委托内核检测文件描述符的什么事件 */ | |
short revents; /* 文件描述符实际发生的事件 -> 传出 */ | |
}; | |
struct pollfd myfd[100]; | |
int poll(struct pollfd *fds, nfds_t nfds, int timeout); |
# 参数
fds:struct pollfd类型数组events和 ``events` 可选参数
nfds :nfds 参数数组中最后一个有效元素的下标 + 1
timeout : 指定 poll 函数的阻塞时长
失败返回 - 1,成功返回一个大于 0 的整数,表示检测的集合中已就绪的文件描述符的总个数
# 代码服务端
#include <stdio.h> | |
#include <stdlib.h> | |
#include <unistd.h> | |
#include <string.h> | |
#include <arpa/inet.h> | |
#include <sys/select.h> | |
#include <poll.h> | |
int main() | |
{ | |
// 1. 创建套接字 | |
int lfd = socket(AF_INET, SOCK_STREAM, 0); | |
if(lfd == -1) | |
{ | |
perror("socket"); | |
exit(0); | |
} | |
// 2. 绑定 ip, port | |
struct sockaddr_in addr; | |
addr.sin_port = htons(9999); | |
addr.sin_family = AF_INET; | |
addr.sin_addr.s_addr = INADDR_ANY; | |
int ret = bind(lfd, (struct sockaddr*)&addr, sizeof(addr)); | |
if(ret == -1) | |
{ | |
perror("bind"); | |
exit(0); | |
} | |
// 3. 监听 | |
ret = listen(lfd, 100); | |
if(ret == -1) | |
{ | |
perror("listen"); | |
exit(0); | |
} | |
// 4. 等待连接 -> 循环 | |
// 检测 -> 读缓冲区,委托内核去处理 | |
// 数据初始化,创建自定义的文件描述符集 | |
struct pollfd fds[1024]; | |
// 初始化 | |
for(int i=0; i<1024; ++i) | |
{ | |
fds[i].fd = -1; // 初始化为 - 1 | |
fds[i].events = POLLIN; // 数据可读事件 | |
} | |
fds[0].fd = lfd; | |
int maxfd = 0; | |
while(1) | |
{ | |
// 委托内核检测 | |
ret = poll(fds, maxfd+1, -1); | |
if(ret == -1) | |
{ | |
perror("select"); | |
exit(0); | |
} | |
// 检测的度缓冲区有变化 | |
// 有新连接 | |
if(fds[0].revents && POLLIN) | |
{ | |
// 接收连接请求 | |
struct sockaddr_in sockcli; | |
int len = sizeof(sockcli); | |
// 这个 accept 是不会阻塞的 | |
int connfd = accept(lfd, (struct sockaddr*)&sockcli, &len); | |
// 委托内核检测 connfd 的读缓冲区 | |
int i; | |
for(i=0; i<1024; ++i) | |
{ | |
if(fds[i].fd == -1) // 选择一个存储 | |
{ | |
fds[i].fd = connfd; // 连接的文件描述符 | |
break; | |
} | |
} | |
maxfd = i > maxfd ? i : maxfd; // 更新最大值 | |
} | |
// 通信,有客户端发送数据过来 | |
for(int i=1; i<=maxfd; ++i) | |
{ | |
// 如果在集合中,说明读缓冲区有数据 | |
if(fds[i].revents & POLLIN) | |
{ | |
char buf[128]; | |
int ret = read(fds[i].fd, buf, sizeof(buf)); | |
if(ret == -1) | |
{ | |
perror("read"); | |
exit(0); | |
} | |
else if(ret == 0) | |
{ | |
printf("对方已经关闭了连接...\n"); | |
close(fds[i].fd); | |
fds[i].fd = -1; | |
} | |
else | |
{ | |
printf("客户端say: %s\n", buf); | |
write(fds[i].fd, buf, strlen(buf)+1); | |
} | |
} | |
} | |
} | |
close(lfd); | |
return 0; | |
} |
# epoll
全称
eventpoll,是linux内核实现IO多路转接/复用的一个实现。IO 多路转接在一个操作里同时监听多个输入输出源,在其中一个或多个输入输出源可用的时候返回,然后对其进行读写操作。
select / pol相对低效的原因之一是添加/维护待检测任务和阻塞进程 / 线程 两个步骤合二为一,在每次调用 select 都需要这两步,然而在大多数应用场景中,需要监视的socket个数相对固定,epoll将两个操作分开,epoll_ctl维护等待队列,在调用epoll_wait阻塞进程,进而提高epoll的效率
epoll基于红黑树来管理待检测集合。epoll基于回调机制,而select`和`poll`每次都会`线性扫描整个待检测集合,集合越大速度越慢。- 需要对 select 和 poll 的返回的
集合进行判断才能知道那些文件描述符是就绪的,通过epoll可以直接得到已就绪的文件描述符集合,无需再次检测。
# 操作函数
#include <sys/epoll.h> | |
// 创建 | |
// 创建 epoll 实例,通过一个红黑树管理待检测集合,在 lnnux 内核 2.6.8 以后,只需指定大于 0 的数组 | |
int epoll_create(int size); | |
// 返回值 -1 :失败。 成功:返回一个有效的文件描述符,通过此刻访问创建的 epoll 实例 | |
// 管理 | |
// 联合体,多个变量共用同一块内存 | |
typedef union epoll_data { | |
void *ptr; | |
int fd; // 通常情况下使用这个成员,和 epoll_ctl 的第三个参数相同即可 | |
uint32_t u32; | |
uint64_t u64; | |
} epoll_data_t; | |
struct epoll_event { | |
uint32_t events; /* Epoll events */ | |
epoll_data_t data; // 用户数据变量,使用 fd 值,用于存储待检测的文件描述符值,在调用 epoll_wait 函数时值被传出 | |
}; | |
//evetns:epoll 事件,修饰第三个 fd 对应文件描述符,检测指定对应的文件描述符的什么时间 | |
// EPOLLIN:读事件,接收数据,检测读缓冲区,如果有数据该文件描述符就绪 | |
// EPOLLOUT:写事件,发送数据,检测写缓冲区,如果可写该文件描述符就绪 | |
// EPOLLERR:异常事件 | |
// EPOLLET :设置水平或边缘模式 | |
// 管理红黑树上的文件描述符 (添加,修改,删除) | |
int epoll_ctl(int epfd,int op,int fd,struct epool_event* event); | |
//epfd: epoll_create 函数的返回值 | |
//op :枚举值,控制通过该函数执行什么操作 | |
// EPOLL_CTL_ADD:往 epoll 模型中添加新的节点 | |
// EPOLL_CTL_MOD:修改 epoll 模型中已经存在的节点 | |
// EPOLL_CTL_DEL:删除 epoll 模型中的指定的节点 | |
//fd : 文件描述符,即要添加 / 修改 / 删除的文件描述符 | |
// 成功 返回 0 | |
// 失败 返回 - 1 | |
// 检测 epoll 树中是否有就绪的文件描述符 | |
int epoll_wait(int epfd,struct epoll_event *event,int maxevents,int timeout); | |
//epfd : epoll_ceate 函数返回值 | |
//events , 传出参数,结构体数组地址,存储已就绪的文件描述符信息 | |
//maxevents:结构体数组的容量 (元素个数) | |
//timeout :如果 epoll 没有已就绪的文件描述符,该函数的阻塞时长,单位 ms 毫秒 | |
// 0 函数不阻塞,epoll 实例中没有就绪的文件描述符,函数被调用后直接返回,大于 0,若 epoll 实例中没有已就绪的文件描述符,函数阻塞对应的毫秒数再返回,-1 函数一直阻塞,直到 epoll 实例中有已就绪的文件描述符之后才解除阻塞 | |
// 返回值 | |
// 等于 0,函数阻塞被强制解除,没有检测到满足条件的文件描述符 | |
// 大于 0 检测到的已就绪的文件描述符的总个数 | |
// 失败 返回 -1 |
# 操作步骤
epoll创建监听的套接字
int lfd = socket(AF_INET, SOCK_STREAM, 0); |
- 设置
端口复用(可选)
int opt = 1; | |
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); |
IP和端口和监听的套接字进行绑定
int ret = bind(lfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)); |
- 给监听的套接字
设置监听
listen(lfd, 128); |
- 创建
epoll实例对象
int epfd = epoll_create(100); // 一定要大于 0 的数 |
- 将用于监听的套接字
添加到epoll实例中
struct epoll_event ev; | |
ev.events = EPOLLIN; // 检测 lfd 读读缓冲区是否有数据 | |
ev.data.fd = lfd; // 键文件描述符添加到实例中 | |
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev); |
- 检测到
epoll实例中文件描述符是否已就绪,并将这些已就绪的文件描述符进行处理
int num = epoll_wait(epfd, evs, size, -1); |
* 如果是监听,和客户端建立`连接`,将得到的`文件描述符`添加到`epoll实例`中
int cfd = accept(curfd, NULL, NULL); | |
ev.events = EPOLLIN; | |
ev.data.fd = cfd; | |
// 新得到的文件描述符添加到 epoll 模型中,下一轮循环的时候就可以被检测了 | |
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev); |
- 若是通信,和对应的客户端
通信,如果连接已断开,将该文件描述符从epoll实例中删除
int len = recv(curfd, buf, sizeof(buf), 0); | |
if(len == 0) | |
{ | |
// 将这个文件描述符从 epoll 模型中删除 | |
epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL); | |
close(curfd); | |
} | |
else if(len > 0) | |
{ | |
send(curfd, buf, len, 0); | |
} |
* 重复最后一步
# epoll工作模式
# LT水平模式
level triggered同时支持block和no-block socket,内核通知使用者哪些文件描述符已经就绪,之后就可以对这些已就绪的文件描述符进行IO操作,若不进行任何操作,内核还是会继续通知使用者特点:
- 读事件:如果文件描述符对应的读缓存区还有数据,读事件就会被触发,
epoll_wait解除阻塞。如果接受的数据buf很小,不能全部将缓存区全部数据读出,那么读事件会继续被触发,直到数据被全部读出,如果接受数据的内存相对较大,读数据的效率相对较高 (减少了读数据次数)。读数据是被动的,必须要通过读事件才能知道有数据到达了,因此对于读事件的检测是必须的。- 写事件:如果文件描述符对应的写缓存区可写,写事件就会被触发,
epoll_wait解除阻塞。写事件的触发发生在写数据之前。写数据是主动行为,写缓存去一般情况下都是可写的 (缓存区不满),因此对于些时间的检测不是必须的。
# 示例代码
#include <stdio.h> | |
#include <ctype.h> | |
#include <unistd.h> | |
#include <stdlib.h> | |
#include <sys/types.h> | |
#include <sys/stat.h> | |
#include <string.h> | |
#include <arpa/inet.h> | |
#include <sys/socket.h> | |
#include <sys/epoll.h> | |
// server | |
int main(int argc, const char* argv[]) | |
{ | |
// 创建监听的套接字 | |
int lfd = socket(AF_INET, SOCK_STREAM, 0); | |
if(lfd == -1) | |
{ | |
perror("socket error"); | |
exit(1); | |
} | |
// 绑定 | |
struct sockaddr_in serv_addr; | |
memset(&serv_addr, 0, sizeof(serv_addr)); | |
serv_addr.sin_family = AF_INET; | |
serv_addr.sin_port = htons(9999); | |
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 本地多有的IP | |
// 设置端口复用 | |
int opt = 1; | |
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); | |
// 绑定端口 | |
int ret = bind(lfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)); | |
if(ret == -1) | |
{ | |
perror("bind error"); | |
exit(1); | |
} | |
// 监听 | |
ret = listen(lfd, 64); | |
if(ret == -1) | |
{ | |
perror("listen error"); | |
exit(1); | |
} | |
// 现在只有监听的文件描述符 | |
// 所有的文件描述符对应读写缓冲区状态都是委托内核进行检测的 epoll | |
// 创建一个 epoll 模型 | |
int epfd = epoll_create(100); // 参数为大于 1 即可,并没有实际含义 | |
if(epfd == -1) | |
{ | |
perror("epoll_create"); | |
exit(0); | |
} | |
// 往 epoll 实例中添加需要检测的节点,现在只有监听的文件描述符 | |
struct epoll_event ev; | |
ev.events = EPOLLIN; // 检测 lfd 读读缓冲区是否有数据 | |
ev.data.fd = lfd; | |
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev); | |
if(ret == -1) | |
{ | |
perror("epoll_ctl"); | |
exit(0); | |
} | |
struct epoll_event evs[1024]; | |
int size = sizeof(evs) / sizeof(struct epoll_event); | |
// 持续检测 | |
while(1) | |
{ | |
// 调用一次,检测一次 | |
int num = epoll_wait(epfd, evs, size, -1); | |
printf("num = %d \n",num); | |
for(int i=0; i<num; ++i) | |
{ | |
// 取出当前的文件描述符 | |
int curfd = evs[i].data.fd; | |
// 判断这个文件描述符是不是用于监听的 | |
if(curfd == lfd) | |
{ | |
// 建立新的连接 | |
int cfd = accept(curfd, NULL, NULL); | |
// 新得到的文件描述符添加到 epoll 模型中,下一轮循环的时候就可以被检测了 | |
ev.events = EPOLLIN; // 读缓冲区是否有数据 | |
ev.data.fd = cfd; | |
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev); | |
if(ret == -1) | |
{ | |
perror("epoll_ctl-accept"); | |
exit(0); | |
} | |
} | |
else | |
{ | |
// 处理通信的文件描述符 | |
// 接收数据 | |
char buf[1024]; | |
memset(buf, 0, sizeof(buf)); | |
int len = recv(curfd, buf, sizeof(buf), 0); | |
if(len == 0) | |
{ | |
printf("客户端已经断开了连接\n"); | |
// 将这个文件描述符从 epoll 模型中删除 | |
epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL); | |
close(curfd); | |
} | |
else if(len > 0) | |
{ | |
printf("客户端say: %s\n", buf); | |
send(curfd, buf, len, 0); | |
} | |
else | |
{ | |
perror("recv"); | |
exit(0); | |
} | |
} | |
} | |
} | |
return 0; | |
} |
# ET 边沿模式
edge-triggered,只支持no-block socket。当文件描述符从未就绪变为就绪时,内核会通过epoll通知使用者,然后它会假设使用者知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知 (only one)。ET 模式在很大程度上减少epoll事件被重复触发的次数,ET效率>LT模式特点:
- 读事件:当读缓存区有新数据进入,读事件
被触发一次,没有新数据不会触发该事件。如果数据没有被全部读走,并且没有新数据进入,读事件不会再次触发,只通知一次,若有数据被全部读走或者只读走一部分,此时有新数据进入,读事件被触发,只通知一次- 写事件:当写缓冲区状态可写,写事件智慧被触发一次。写缓存区从不满到被写满,从满到不满,写事件都
只会被触发一次、epoll 的边沿模式下 epoll_wait 检测到文件描述符有新事件才会通知,如果不是新的事件就不通知,
通知的次数比水平模式少,效率比水平模式高。
# ET 模式的设置
struct epoll_event ev; | |
ev.events = EPOLLIN | EPOLLET; // 设置边沿模式 EPOLLET | |
// 示例代码 线程安全的,不需要考虑加锁问题 | |
int num = epoll_wait(epfd, evs, size, -1); | |
for(int i=0; i<num; ++i) | |
{ | |
// 取出当前的文件描述符 | |
int curfd = evs[i].data.fd; | |
// 判断这个文件描述符是不是用于监听的 | |
if(curfd == lfd) | |
{ | |
// 建立新的连接 | |
int cfd = accept(curfd, NULL, NULL); | |
// 新得到的文件描述符添加到 epoll 模型中,下一轮循环的时候就可以被检测了 | |
// 读缓冲区是否有数据,并且将文件描述符设置为边沿模式 | |
struct epoll_event ev; | |
ev.events = EPOLLIN | EPOLLET; | |
ev.data.fd = cfd; | |
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev); | |
if(ret == -1) | |
{ | |
perror("epoll_ctl-accept"); | |
exit(0); | |
} | |
} | |
} |
# 设置非阻塞
写事件一般写缓冲区有足够的空间,不需要进行检测。对于读事件的茶法就必须检测,使用 epoll 的边缘检测进行读事件检测,有新数据只会通知一次,就必须保证得到通知后将数据全部从读缓冲区中读出。
- 方式一:准备特大内存,用于存储缓冲区 (内存无法定义,弊端大)
- 方式二:循环接受数据:将套接字设置为非阻塞模式
因为套接字默认是阻塞的,当读缓冲区的数据被读完之后,read/recv 函数就被阻塞了,当前进程 / 线程就无法处理其他操作 ----- 将套接字默认修改为非阻塞
#include <fcntl.h> | |
// 设置完成之后,读写都变成了非阻塞模式 | |
int flag = fcntl(cfd, F_GETFL); // 取出 cfd 的 flag 属性 | |
// 将变量 flag 的标志位与 O_NONBLOCK 进行按位或操作,并将结果存储回 flag 中 | |
flag |= O_NONBLOCK; // 设置 flag 为非阻塞属性 位运算符 |(按位或)将 O_NONBLOCK 与变量的标志位进行按位或操作,将 O_NONBLOCK 标志位的值设置到变量的标志位中。 | |
fcntl(cfd, F_SETFL, flag); //flag 属性被舍之道 cfd 当中 | |
// 循环接受数据 | |
int len = 0; | |
while((len = recv(curfd, buf, sizeof(buf), 0)) > 0) | |
{ | |
// 数据处理... | |
// 非阻塞模式下 recv () /read () 函数返回值 len == -1 | |
int len = recv(curfd, buf, sizeof(buf), 0); | |
if(len == -1) | |
{ | |
// 对于数据读完,仍然读取的保存,就行修改 | |
if(errno == EAGAIN) | |
{ | |
printf("数据读完了...\n"); | |
} | |
else | |
{ | |
perror("recv"); | |
exit(0); | |
} | |
} |
# 多线程 epoll
#include <stdio.h> | |
#include <ctype.h> | |
#include <unistd.h> | |
#include <stdlib.h> | |
#include <sys/types.h> | |
#include <sys/stat.h> | |
#include <string.h> | |
#include <arpa/inet.h> | |
#include <sys/socket.h> | |
#include <sys/epoll.h> | |
#include <fcntl.h> | |
#include <errno.h> | |
#include <pthread.h> | |
// 结构体存储要传递的数据 | |
typedef struct socketinfo | |
{ | |
int fd; // 在子线程要操作的文件描述符可监听 / 读写 | |
int epfd; // 要操作的 epoll 树实例 | |
}SocketInfo; | |
// 用于连接 | |
void* acceptConn(void* arg) | |
{ | |
printf("子线程acceptConn线程ID:%ld\n",pthread_self()); | |
SocketInfo* info = (SocketInfo*)arg; | |
// 建立新的连接 | |
int cfd = accept(info->fd, NULL, NULL); | |
// 将文件描述符设置为非阻塞 | |
// 得到文件描述符的属性 | |
int flag = fcntl(cfd, F_GETFL); | |
flag |= O_NONBLOCK; | |
fcntl(cfd, F_SETFL, flag); | |
// 新得到的文件描述符添加到 epoll 模型中,下一轮循环的时候就可以被检测了 | |
// 通信的文件描述符检测读缓冲区数据的时候设置为边沿模式 | |
struct epoll_event ev; | |
ev.events = EPOLLIN | EPOLLET; // 读缓冲区是否有数据 | |
ev.data.fd = cfd; | |
int ret = epoll_ctl(info->epfd, EPOLL_CTL_ADD, cfd, &ev); | |
if(ret == -1) | |
{ | |
perror("epoll_ctl-accept"); | |
free(info); | |
exit(0); | |
} | |
free(info); | |
return NULL; | |
} | |
// 用于通信函数 | |
void* communication(void* arg) | |
{ | |
printf("子线程communication线程ID:%ld\n",pthread_self()); | |
SocketInfo* info = (SocketInfo*)arg; | |
// 处理通信的文件描述符 | |
// 接收数据 | |
char buf[5]; | |
char temp[1024]; | |
bzero(temp,sizeof(temp)); // 数据后面加入 /0 | |
memset(buf, 0, sizeof(buf)); | |
// 循环读数据 | |
while(1) | |
{ | |
int len = recv(info->fd, buf, sizeof(buf), 0); | |
if(len == 0) | |
{ | |
// 非阻塞模式下和阻塞模式是一样的 => 判断对方是否断开连接 | |
printf("客户端断开了连接...\n"); | |
// 将这个文件描述符从 epoll 模型中删除 // 线程安全的函数,不需要进行线程同步 | |
epoll_ctl(info->epfd, EPOLL_CTL_DEL, info->fd, NULL); | |
close(info->fd); | |
break; | |
} | |
else if(len > 0) | |
{ | |
// 通信 | |
for(int i=0;i<len;++i) | |
{ | |
buf[i]=toupper(buf[i]); | |
} | |
strncat(temp+strlen(temp),buf,len); | |
// 接收的数据打印到终端 | |
// 修改到读取完数据之后一起发送 | |
// write(STDOUT_FILENO, buf, len); | |
//// 发送数据 | |
// send(info->fd, buf, len, 0); | |
} | |
else | |
{ | |
// len == -1 | |
if(errno == EAGAIN) // 读缓存区已经没有了数据 | |
{ | |
printf("数据读完了...\n"); | |
send(info->fd,temp,strlen(temp)+1,0); | |
break; | |
} | |
else | |
{ | |
perror("recv"); | |
break; | |
} | |
} | |
} | |
free(info); | |
return NULL; | |
} | |
// server | |
int main(int argc, const char* argv[]) | |
{ | |
// 创建监听的套接字 | |
int lfd = socket(AF_INET, SOCK_STREAM, 0); | |
if(lfd == -1) | |
{ | |
perror("socket error"); | |
exit(1); | |
} | |
// 绑定 | |
struct sockaddr_in serv_addr; | |
memset(&serv_addr, 0, sizeof(serv_addr)); | |
serv_addr.sin_family = AF_INET; | |
serv_addr.sin_port = htons(9999); | |
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 本地多有的IP | |
// 127.0.0.1 | |
// inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr.s_addr); | |
// 设置端口复用 | |
int opt = 1; | |
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); | |
// 绑定端口 | |
int ret = bind(lfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)); | |
if(ret == -1) | |
{ | |
perror("bind error"); | |
exit(1); | |
} | |
// 监听 | |
ret = listen(lfd, 64); | |
if(ret == -1) | |
{ | |
perror("listen error"); | |
exit(1); | |
} | |
// 现在只有监听的文件描述符 | |
// 所有的文件描述符对应读写缓冲区状态都是委托内核进行检测的 epoll | |
// 创建一个 epoll 模型 | |
int epfd = epoll_create(100); | |
if(epfd == -1) | |
{ | |
perror("epoll_create"); | |
exit(0); | |
} | |
// 往 epoll 实例中添加需要检测的节点,现在只有监听的文件描述符 | |
struct epoll_event ev; | |
ev.events = EPOLLIN; // 检测 lfd 读读缓冲区是否有数据 | |
ev.data.fd = lfd; | |
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev); | |
if(ret == -1) | |
{ | |
perror("epoll_ctl"); | |
exit(0); | |
} | |
struct epoll_event evs[1024]; | |
int size = sizeof(evs) / sizeof(struct epoll_event); | |
// 持续检测 | |
while(1) | |
{ | |
// 调用一次,检测一次 | |
int num = epoll_wait(epfd, evs, size, -1); | |
printf("==== num: %d\n", num); | |
pthread_t tid; | |
for(int i=0; i<num; ++i) | |
{ | |
// 取出当前的文件描述符 | |
SocketInfo* info=(SocketInfo*)malloc(sizeof(SocketInfo)); | |
int curfd = evs[i].data.fd; | |
info->fd = curfd; | |
info->epfd = epfd; | |
// 判断这个文件描述符是不是用于监听的 | |
if(curfd == lfd) | |
{ | |
pthread_create(&tid,NULL,acceptConn,info); | |
pthread_detach(&tid); // 线程分离 | |
} | |
else | |
{ | |
pthread_create(&tid,NULL,communication,info); | |
pthread_detach(&tid); // 线程分离 | |
} | |
} | |
} | |
return 0; | |
} |
