总述
????????在这个实验里,尝试对 IO 多路复用 select 接口进行使用。
????????在?【网络编程】TCP socket 单机通信实验?里实现了一个 client 线程和一个 server 线程进行 socket 通信的 demo。对于 server 线程来说,既要监听、接受连接请求,又要收、发、处理与客户端交互的数据。一个 server 线程在同一时间只能应付一个 client,在处理一个 client 的数据的时候就不能接受另一个 client 的连接请求。对于多客户端的场景,这样的 server 基本没有实用价值。
?????????一种解决问题的思路是:由一个线程/进程监听连接请求,由其他多个线程/进程分别对接每一个 client,这样就能让 server 同时处理与多个 client 的数据交互、及时接受新的 client 的连接请求:
????????但是,如果连接请求非常多,server 也负担不起一个 client 一个处理线程带来的资源消耗,线程切换也需要成本。而且实际上,大多数的时候,client 和 server 之间都没有数据交互。针对这一点,可以考虑用 IO 多路复用(select、poll、epoll):IO 多路复用可以实现一个线程监视多个文件句柄;一旦某个文件句柄就绪,就能够通知应用程序进行相应的读写操作;没有文件句柄就绪时会阻塞应用程序,交出 cpu。多路指的是多路网络连接,复用指的是复用同一个线程。本质上, IO 多路复用是把 accept、recv、send 阻塞等待事件就绪的工作集中起来交给内核管理。
主体流程结构
????????在?【网络编程】TCP socket 单机通信实验?的基础上,把客户端线程增加到 3 个,把单个服务端线程拆成一个监听线程和一个工作线程。在监听线程里监听端口、和 client 建立连接,把连接句柄传给工作线程;在工作线程里用 select 监测所有已连接的句柄是否可读,如果可读则进行数据的收发处理。
实验结果
????????select 接口可以监测到 3 个连接句柄上的可读事件,并把事件交给应用层,让应用程序完成后续的数据读写交互:
select 接口
函数原型
int select (int maxfdp,
fd_set *readset,
fd_set *writeset,
fd_set *exceptest,
struct timeval *timeout);
参数?maxfdp
????????第一个参数 maxfdp?是所有要监测的文件描述符的最大值加 1,注意:不是要监测的文件描述符的个数加 1。文件描述符本质上就是一个整数,值超过 maxfdp 的文件描述符不会被 select 监测。
如果给 maxfdp 传入 3 + 1(要监测的文件描述符个数加 1):
select 不会对 3 个客户端的连接句柄进行监测:
3 个客户端发来的数据累积在服务端的接收窗口中,不会被服务端的应用层读取:
参数 指向 fd_set 的指针
????????第二、三、四个参数都是指向 fd_set 的指针。第二个参数?readset 对应可读事件的描述符集合;第三个参数?writeset 对应可写事件的描述符集合;第四个参数?exceptest 对应异常事件的描述符集合。
typedef struct {
unsigned long fds_bits[__FDSET_LONGS];
} fd_set;
????????fd_set 实质上是一个无符号长整形的数组。数组的大小 __FDSET_LONGS 是一个宏定义,值是 1024/(8 * sizeof(unsigned long)),数组实际占用空间也就是?1024 / 8 = 128 Byte = 1024 bit。
????????fd_set 以位图的形式表示文件描述符集合:一个比特位对应一个文件描述符,用对应位置的比特位的 0 或 1 来表示是否对文件描述符进行监测或者文件描述符是否就绪。
????????因为这样的数组大小、这样的表示方式,导致 select 最多就只能监测 0~1023 的文件描述符,总计 1024 个 —— 是存放文件描述符的数据结构 fd_set?限制了 select 最多只能监听 1024 个文件描述符。
????????select 提供了一些?宏?可以用来操作文件描述符集合,实际上也就是更改对应比特位的 0、1 值:
FD_ZERO(fd_set* fds);
清空文件描述符集合 fds,也就是把所有比特位都置为 0
FD_SET(int fd, fd_set* fds);
把文件描述符 fd 加入集合 fds,也就是把对应的比特位置为 1
FD_ISSET(int fd, fd_set* fds);
判断文件描述符 fd 是否在集合 fds 中,也就是判断对应的比特位是否为 1
FD_CLR(int fd, fd_set* fds);
把文件描述符?fd?从集合?fds?中删除,也就是把对应比特位置为 0
????????在使用方式上,先调用 FD_ZERO 将 fd_set 清零,然后调用 FD_SET 把需要监测的文件描述符加入 fd_set,接着调用函数 select 测试 fd_set 中的所有文件描述符,select 函数返回后,用宏 FD_ISSET 检查某个文件描述符对应的比特位是否仍然为 1,如果为 1 表示有事件就绪,为 0 则反之。
参数 超时时间
????????最后一个参数?timeout 是超时时间,表示等待多长时间之后就放弃等待,传 NULL 表示等待无限长的时间,持续阻塞直到有事件就绪才返回。
????????timeout 指向的?timeval 结构体有秒和微秒两个成员:
struct timeval
{
__time_t tv_sec; /* Seconds. */
__suseconds_t tv_usec; /* Microseconds. */
};
????????使用 timeval 时,直接对成员进行赋值就好,eg.
struct timeval timeout = {0};
timeout.tv_sec = 1;
timeout.tv_usec = 0;
返回值
????????如果 select 执行失败,返回 -1;如果超时,返回 0;如果有描述符就绪,则返回就绪的描述符的个数。
select 缺点
select 有几个主要的缺点,都跟文件描述符集合 fd_set 有关:
- 每次调用 select,都需要把 fd_set 从用户态拷贝到内核态,select 检测到有就绪事件或者超时返回时,又要把 fd_set 再从内核态拷贝回用户态,开销比较大;
- select 监测文件描述符用的是轮询的方法,一遍一遍对文件描述符集合进行线性扫描,检查是否有事件就绪,效率比较低;
- select 最多只能监测 1024 个文件描述符,数量有限;
完整代码实现
头文件
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/syscall.h>
宏定义
#define LOCAL_IP_ADDR "127.0.0.1"
#define SERVER_LISTEN_PORT 5197
#define MAX_LISTEN_EVENTS 16
#define NET_MSG_BUF_LEN 128
#define CLINET_SEND_MSG "Hello Server~"
#define SERVER_SEND_MSG "Hello Client~"
功能函数获取线程 ID
pid_t gettid(void) {
// 需引入头文件 sys/syscall.h
return syscall(SYS_gettid);
}
客户端线程入口函数
void* client(void* param) {
int iRes = 0, iConnFd = 0, iNetMsgLen = 0;
pthread_t thdId = gettid();
char szNetMsg[NET_MSG_BUF_LEN] = {0};
struct sockaddr_in stServAddr;
iConnFd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == iConnFd) {
printf("Client[%u] failed to create socket, err[%s]\n",
thdId, strerror(errno));
return NULL;
}
// 填充目标地址结构体,指定协议族、目标端口、目标主机 IP 地址
stServAddr.sin_family = AF_INET;
stServAddr.sin_port = htons(SERVER_LISTEN_PORT);
stServAddr.sin_addr.s_addr = inet_addr(LOCAL_IP_ADDR);
// 1 参传套接字句柄,2 参传准备连接的目标地址结构体指针,3 参传地址结构体大小
while (1) {
iRes = connect(iConnFd, (struct sockaddr *)&stServAddr,
sizeof(stServAddr));
if (0 != iRes) {
printf("Client[%u] failed to connect to[%s:%u], err[%s]\n",
thdId, LOCAL_IP_ADDR, SERVER_LISTEN_PORT,
strerror(errno));
sleep(2);
continue;
} else {
printf("Client[%u] succeeded to connect to[%s:%u]\n",
thdId, LOCAL_IP_ADDR, SERVER_LISTEN_PORT);
break;
}
}
iNetMsgLen = send(iConnFd, CLINET_SEND_MSG,
strlen(CLINET_SEND_MSG), 0);
if (iNetMsgLen < 0) {
printf("Client[%u] failed to send msg to server, err[%s]\n",
thdId, strerror(errno));
close(iConnFd);
return NULL;
}
iNetMsgLen = recv(iConnFd, szNetMsg, sizeof(szNetMsg), 0);
if (iNetMsgLen < 0) {
printf("Client[%u] failed to read from network, err[%s]\n",
thdId, strerror(errno));
} else {
printf("Client[%u] recv reply[%s]\n", thdId, szNetMsg);
}
close(iConnFd);
return NULL;
}
功能函数处理可读事件
int eventProc(int *piConnFdSet, fd_set *pfsReadSet) {
int iIndex = 0, iConnFd = 0, iNetMsgLen = 0;
char szNetMsg[NET_MSG_BUF_LEN] = {0};
for (iIndex = 0; iIndex < MAX_LISTEN_EVENTS; iIndex++) {
if (!FD_ISSET(piConnFdSet[iIndex], pfsReadSet)) {
continue;
}
// 用临时变量简化代码
iConnFd = piConnFdSet[iIndex];
// 接收 client 消息
iNetMsgLen = recv(iConnFd, szNetMsg, sizeof(szNetMsg), 0);
if (iNetMsgLen < 0) {
printf("Server failed to recv from network, err[%s]\n",
strerror(errno));
close(iConnFd);
return -1;
}
printf("Server recv msg[%s]\n", szNetMsg);
// 答复 client
iNetMsgLen = send(iConnFd, SERVER_SEND_MSG,
strlen(SERVER_SEND_MSG), 0);
if (iNetMsgLen < 0) {
printf("Server failed to reply client, err[%s]\n",
strerror(errno));
close(iConnFd);
return -1;
}
piConnFdSet[iIndex] = 0;
close(iConnFd);
}
return 0;
}
服务端工作线程入口函数
void* serverWork(void* param) {
int iRes = 0, iIndex = 0, iMaxFd = 0, iEventNum = 0;
int *piConnFdSet = (int *)param;
fd_set fsReadSet;
struct timeval timeout = {0};
// 监听
while (1) {
timeout.tv_sec = 1;
timeout.tv_usec = 0;
FD_ZERO(&fsReadSet);
#if 1
iMaxFd = 0;
for (iIndex = 0; iIndex < MAX_LISTEN_EVENTS; iIndex++) {
FD_SET(piConnFdSet[iIndex], &fsReadSet);
iMaxFd = piConnFdSet[iIndex] > iMaxFd ?
piConnFdSet[iIndex] : iMaxFd;
}
#else
iMaxFd = 3;
#endif
iEventNum = select(iMaxFd + 1, &fsReadSet, NULL, NULL, &timeout);
if (-1 == iEventNum) {
printf("Server failed to select event.\n");
break;
}
printf("Server select get [%u] event\n", iEventNum);
if (0 == iEventNum) {
continue;
}
iRes = eventProc(piConnFdSet, &fsReadSet);
if (0 != iRes) {
printf("Server failed to proc event.\n");
break;
}
}
return NULL;
}
服务端监听线程入口函数
void* serverLsn(void* param) {
int iRes = 0, iIndex = 0;
int iLsnFd = 0, iConnFd = 0, iReusePort = 0, iSockAddrLen = 0;
int *paiConnFdSet = (int *)param;
struct sockaddr_in stLsnAddr;
struct sockaddr_in stCliAddr;
// 创建 socket
iLsnFd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == iLsnFd) {
printf("Server failed to create socket, err[%s]\n",
strerror(errno));
return NULL;
}
// 设置端口复用
iReusePort = 1;
iRes = setsockopt(iLsnFd, SOL_SOCKET, SO_REUSEPORT, &iReusePort,
sizeof (iReusePort));
if (-1 == iRes) {
printf("Server failed set reuse attr, err[%s]\n",
strerror(errno));
close(iLsnFd);
return NULL;
}
stLsnAddr.sin_family = AF_INET;
stLsnAddr.sin_port = htons(SERVER_LISTEN_PORT);
stLsnAddr.sin_addr.s_addr = INADDR_ANY;
// 绑定端口
iRes = bind(iLsnFd, (struct sockaddr*)&stLsnAddr,
sizeof(stLsnAddr));
if (-1 == iRes) {
printf("Server failed to bind port[%u], err[%s]\n",
SERVER_LISTEN_PORT, strerror(errno));
close(iLsnFd);
return NULL;
} else {
printf("Server succeeded to bind port[%u], start listen.\n",
SERVER_LISTEN_PORT);
}
// 监听
iRes = listen(iLsnFd, MAX_LISTEN_EVENTS);
if (-1 == iRes) {
printf("Server failed to listen port[%u], err[%s]\n",
SERVER_LISTEN_PORT, strerror(errno));
close(iLsnFd);
return NULL;
}
while (iIndex < MAX_LISTEN_EVENTS) {
iSockAddrLen = sizeof(stCliAddr);
// 1 参传入监听句柄,2 传入地址结构体指针接收客户端地
// 3 参传入地址结构体大小
iConnFd = accept(iLsnFd, (struct sockaddr*)&stCliAddr,
&iSockAddrLen);
if (-1 == iConnFd) {
printf("Server failed to accept connect request, err[%s]\n",
strerror(errno));
break;
} else {
printf("Server accept connect request from[%s:%u]\n",
inet_ntoa(stCliAddr.sin_addr),
ntohs(stCliAddr.sin_port));
paiConnFdSet[iIndex] = iConnFd;
iIndex++;
}
}
close(iLsnFd);
return NULL;
}
主函数
int main() {
// 线程 ID,实质是 unsigned long 类型整数
pthread_t thdServerWork = 101;
pthread_t thdServerLsn = 102;
pthread_t thdClient1 = 1;
pthread_t thdClient2 = 2;
pthread_t thdClient3 = 3;
// 监测是否可读的 socket 文件描述符集合
int aiConnFdSet[MAX_LISTEN_EVENTS] = {0};
// 1 参传线程 ID,2 参传线程属性,
// 3 参指定线程入口函数,4 参指定传给入口函数的参数
pthread_create(&thdServerWork, NULL, serverWork, &aiConnFdSet[0]);
pthread_create(&thdServerLsn, NULL, serverLsn, &aiConnFdSet[0]);
pthread_create(&thdClient1, NULL, client, NULL);
pthread_create(&thdClient2, NULL, client, NULL);
pthread_create(&thdClient3, NULL, client, NULL);
// 1 参传入线程 ID,2 参用于接收线程入口函数的返回值,不需要返回值则置 NULL
pthread_join(thdServerWork, NULL);
pthread_join(thdServerLsn, NULL);
pthread_join(thdClient1, NULL);
pthread_join(thdClient2, NULL);
pthread_join(thdClient3, NULL);
return 0;
}
|