参考
- 《TCP/IP网络编程》 尹圣雨
异步通知I/O模型
异步I/O
同步I/O的缺点:进行I/O的过程中函数无法返回,所以不能执行其他任务
异步I/O是指I/O函数的返回时刻与数据收发的完成时刻不一致,是为了克服同步I/O的缺点而设计的模型。无论数据是否完成交互都返回函数,这就意味着可以执行其他任务,能够更有效地使用CPU
理解异步通知I/O模型
异步通知I/O模型,意为“通知I/O”以异步方式工作
通知I/O指发生了I/O相关的特定情况,即通知输入缓冲收到数据并需要读取,以及输出缓冲为空可以发送数据
典型的通知I/O模型是select方式,select函数是以同步方式进行的,因为I/O相关事件发生的时间点与select函数的返回时间点一致
异步通知I/O模型中函数的返回与I/O状态无关。异步通知I/O中,指定I/O监视对象的函数和实际验证状态变化的函数是相互分离的。因此,指定监视对象后可以离开执行其他任务,最后再回来验证状态变化
实现异步通知I/O模型
异步通知I/O模型的实现方法有:1)WSAEventSelect函数;2)WSAAsyncSelect函数:需要指定Windows句柄以获取发生的事件(UI相关内容)
通知
告知I/O状态变化的操作就是“通知”。I/O的状态变化可以分为不同情况:
- 套接字的状态变化:套接字的I/O状态变化
- 发生套接字相关事件:发生套接字I/O相关事件
这2种情况都意味着发生了需要或可以进行I/O的事件
WSAEventSelect函数
该函数用于指定某一套接字为事件监视对象
#include <winsock2.h>
int WSAEventSelect(SOCKET s, WSAEVENT hEventObject, long lNetworkEvents);
成功时返回0,失败时返回SOCKET_ERROR。其中s为监视对象的套接字句柄;hEventObject传递事件对象句柄以验证事件发生与否;lNetworkEvents为希望监视的事件类型信息
传递参数s的套接字只要发生lNetworkEvents中指定的事件之一,WSAEventSelect函数就将hEventObject句柄所指内核对象改为signaled状态。因此,该函数又称“连接事件对象和套接字的函数”
该函数以异步通知方式工作,无论事件发生与否,WSAEventSelect函数调用后都会直接返回
该函数第三个参数的事件类型信息:
- FD_READ:是否存在需要接收的数据?
- FD_WRITE:能否以非阻塞方式传输数据?
- FD_OOB:是否收到带外数据?
- FD_ACCEPT:是否有新的连接请求?
- FD_CLOSE:是否有断开连接的请求?
manual-reset模式事件对象的创建
CreateEvent函数在创建事件对象时,可以在auto-reset模式和manual-reset模式中任选其一。此处只需要manual-reset模式non-signaled状态的事件对象,使用WSACreateEvent函数更好
#include <winsock2.h>
WSAEVENT WSACreateEvent(void);
成功时返回事件对象句柄,失败时返回WSA_INVALID_EVENT
其返回类型WSAEVENT的定义为:
#define WSAEVENT HANDLE
销毁该函数创建的事件对象:
#include <winsock2.h>
BOOL WSACloseEvent(WSAEVENT hEvent);
成功时返回TRUE,失败时返回FALSE
验证是否发生事件
#include <winsock2.h>
DWORD WSAWaitForMultipleEvents(DWORD cEvents, const WSAEVENT* lphEvents, BOOL fWaitAll, DWORD dwTimeout, BOOL fAlertable);
成功时返回发生事件的对象信息,失败时返回WSA_INVALID_EVENT。其中:
- cEvents:需要验证是否转为signaled状态的事件对象的个数
- lphEvents:存有事件对象句柄的数组地址值
- fWaitAll:传递TRUE时,所有事件对象在signaled状态时返回;传递FALSE时,只要其中1个变为signaled状态就返回
- dwTimeout:以1/1000秒为单位指定超时,传递WSA_INFINITE时,直到变为signaled状态时才会返回
- fAlertable:传递TRUE时进入alertable wait状态
- 返回值:返回值减去常量WSA_WAIT_EVENT_0时,可以得到转变为signaled状态的事件对象句柄对应的索引,可以通过该索引在第二个参数指定的数组中查找句柄。如果有多个事件对象变为signaled状态,这得到其中较小的值。发生超时将返回WSA_WAIT_TIMEOUT
通过以宏的方式声明WSA_MAXIMUM_WAIT_EVENTS常量得知WSAWaitForMultipleEvents函数可以同时监视的最大事件对象数为64,即最大句柄数为64个。如果需要监视更多句柄,就只能创建线程或扩展保存句柄的数组,并多次调用该函数
只通过1次函数调用无法得到转为signaled状态的所有事件对象句柄的信息。通过该函数可以得到转为signaled状态事件对象中的第一个索引值。可以利用“事件对象为manual-reset模式”的特点,通过如下方式获得所有signaled状态的事件对象:
int posInfo, startIdx, i;
......
posInfo = WSAWaitForMultipleEvents(numOfSock, hEventArray, FALSE, WSA_INFINITE, FALSE_;
startIdx = posInfo - WSA_WAIT_EVENT_0;
......
for (i = startIdx; i < numOfSock; i++)
{
int sigEventIdx = WSAWaitForMultipleEvents(1, &hEventArray[i], TRUE, 0, FALSE);
......
}
区分事件类型
#include <winsock2.h>
int WSAEnumNetworkEvents(SOCKET s, WSAEVENT hEventObject, LPWSANETWORKEVENTS lpNetworkEvents);
成功时返回0,失败时返回SOCKET_ERROR。其中:
- s:发生事件的套接字句柄
- hEventObject:与套接字相连的(由WSAEventSelect函数调用引发的)signaled状态的事件对象句柄
- lpNetworkEvents:保存发生的事件类型信息和错误信息的WSANETWORKEVENTS结构体变量地址值
该函数将manual-reset模式的事件对象改为non-signaled状态,所以得到发生的事件类型后,不必单独调用ResetEvent函数
WSANETWORKEVENTS结构体:
typedef struct _WSANETWORKEVENTS
{
long lNetworkEvents;
int iErrorCode[FD_MAX_EVENTS];
}WSANETWORKEVENTS, * LPWSANETWORKEVENTS;
结构体中lNetworkEvents成员保存发生的事件信息,与WSAEventSelect函数的第三个参数相同。错误信息将保存到iErrorCode数组,如果发生FD_XXX相关错误,则在iErrorCode[FD_XXX_BIT]中保存除0以外的其他值
利用异步通知I/O模型实现回声服务器端
#include <stdio.h>
#include <string.h>
#include <WinSock2.h>
#define BUF_SIZE 100
void CompressSockets(SOCKET hSockArr[], int idx, int total);
void CompressEvents(WSAEVENT hEventArr[], int idx, int total);
void ErrorHandling(char* msg);
int main(int argc, char* argv[])
{
WSADATA wsaData;
SOCKET hServSock, hClntSock;
SOCKADDR_IN servAdr, clntAdr;
SOCKET hSockArr[WSA_MAXIMUM_WAIT_EVENTS];
WSAEVENT hEventArr[WSA_MAXIMUM_WAIT_EVENTS];
WSAEVENT newEvent;
WSANETWORKEVENTS netEvents;
int numOfClntSock = 0;
int strLen, i;
int posInfo, startIdx;
int clntAdrLen;
char msg[BUF_SIZE];
if (argc != 2)
{
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
{
ErrorHandling("WSAStartup() error!");
}
hServSock = socket(PF_INET, SOCK_STREAM, 0);
memset(&servAdr, 0, sizeof(servAdr));
servAdr.sin_family = AF_INET;
servAdr.sin_addr.s_addr = htonl(INADDR_ANY);
servAdr.sin_port = htons(atoi(argv[1]));
if (bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR)
{
ErrorHandling("bind() error");
}
if (listen(hServSock, 5) == SOCKET_ERROR)
{
ErrorHandling("listen() error");
}
newEvent = WSACreateEvent();
if (WSAEventSelect(hServSock, newEvent, FD_ACCEPT) == SOCKET_ERROR)
{
ErrorHandling("WSAEventSelect() error");
}
hSockArr[numOfClntSock] = hServSock;
hEventArr[numOfClntSock] = newEvent;
numOfClntSock++;
while (1)
{
posInfo = WSAWaitForMultipleEvents(numOfClntSock, hEventArr, FALSE, WSA_INFINITE, FALSE);
startIdx = posInfo - WSA_WAIT_EVENT_0;
for (i = startIdx; i < numOfClntSock; i++)
{
int sigEventIdx = WSAWaitForMultipleEvents(1, &hEventArr[i], TRUE, 0, FALSE);
if ((sigEventIdx == WSA_WAIT_FAILED || sigEventIdx == WSA_WAIT_TIMEOUT))
{
continue;
}
else
{
sigEventIdx = i;
WSAEnumNetworkEvents(hSockArr[sigEventIdx], hEventArr[sigEventIdx], &netEvents);
if (netEvents.lNetworkEvents & FD_ACCEPT)
{
if (netEvents.iErrorCode[FD_ACCEPT_BIT] != 0)
{
puts("Accept Error");
break;
}
clntAdrLen = sizeof(clntAdr);
hClntSock = accept(hSockArr[sigEventIdx], (SOCKADDR*)&clntAdr, &clntAdrLen);
newEvent = WSACreateEvent();
WSAEventSelect(hClntSock, newEvent, FD_READ | FD_CLOSE);
hEventArr[numOfClntSock] = newEvent;
hSockArr[numOfClntSock] = hClntSock;
numOfClntSock++;
puts("connected new client...");
}
if (netEvents.lNetworkEvents & FD_READ)
{
if (netEvents.iErrorCode[FD_READ_BIT] != 0)
{
puts("Read Error");
break;
}
strLen = recv(hSockArr[sigEventIdx], msg, sizeof(msg), 0);
send(hSockArr[sigEventIdx], msg, strLen, 0);
}
if (netEvents.lNetworkEvents & FD_CLOSE)
{
if (netEvents.iErrorCode[FD_CLOSE_BIT] != 0)
{
puts("Close Error");
break;
}
WSACloseEvent(hEventArr[sigEventIdx]);
closesocket(hSockArr[sigEventIdx]);
numOfClntSock--;
CompressSockets(hSockArr, sigEventIdx, numOfClntSock);
CompressEvents(hEventArr, sigEventIdx, numOfClntSock);
}
}
}
}
WSACleanup();
return 0;
}
void CompressSockets(SOCKET hSockArr[], int idx, int total)
{
int i;
for (i = idx; i < total; i++)
{
hSockArr[i] = hSockArr[i + 1];
}
}
void CompressEvents(WSAEVENT hEventArr[], int idx, int total)
{
int i;
for (i = idx; i < total; i++)
{
hEventArr[i] = hEventArr[i + 1];
}
}
void ErrorHandling(char* msg)
{
fputs(msg, stderr);
fputc('\n', stderr);
exit(1);
}
|