参考
- 《TCP/IP网络编程》 尹圣雨
重叠I/O模型
理解重叠I/O模型
同一线程内部向多个目标传输(或从多个目标接收)数据引起的I/O重叠现象称为“重叠I/O”。为了完成这项任务,调用的I/O函数应立即返回,因此前提条件是异步I/O。而且,为了完成异步I/O,调用的I/O函数应以非阻塞模式工作
Windows中重叠I/O的重点并非I/O本身,而是如何确认I/O完成时的状态
创建重叠I/O套接字
#include <winsock2.h>
SOCKET WSASocket(int af, int type, int protocol, LPWSAPROTOCOL_INFO lpProtocolInfo, GROUP g, DWORD dwFlags);
成功时返回套接字句柄,失败时返回INVALID_SOCKET。其中:
- af:协议族信息
- type:套接字数据传输方式
- protocol:2个套接字之间使用的协议信息
- lpProtocolInfo:包含创建的套接字信息的WSAPROTOCOL_INFO结构体变量地址值,不需要时传递NULL
- g:为扩展函数而预约的参数,可以使用0
- dwFlags:套接字属性信息
可以向最后一个参数传递WSA_FLAG_OVERLAPPED,赋予创建出的套接字重叠I/O特性。例如:
WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
执行重叠I/O的函数
重叠I/O模型中,服务器端和客户端间的连接过程与一般的套接字连接过程相同,但I/O数据时使用的函数不同
WSASend
#include <winsock2.h>
int WSASend(SOCKET s, LPWSABUF lpBuffers, DWORD dwBufferCount, LPDWORD lpNumberOfBytesSent, DWORD dwFlags, LPWSAOVERLAPPED lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine);
成功时返回0,失败时返回SOCKET_ERROR。其中:
- s:套接字句柄,传递具有重叠I/O属性的套接字句柄时,以重叠I/O模型输出
- lpBuffers:WSABUF结构体变量数组的地址值,WSABUF中存有待传输数据
- dwBufferCount:第二个参数中数组的长度
- lpNumberOfBytesSent:用于保存实际发送字节数的变量地址值
- dwFlags:用于更改数据传输特性,如传递MSG_OOB时发送OOB模式的数据
- lpOverlapped:WSAOVERLAPPED结构体变量的地址值,使用事件对象,用于确认完成数据传输
- lpCompletionRoutine:传入Completion Routine函数的入口地址值,可以通过该函数确认是否完成数据传输
WSABUF结构体
WSASend第二个参数涉及的WSABUF结构体中存有待传输数据的地址和大小等信息:
typedef struct __WSABUF
{
u_long len;
char FAR* buf;
}WSABUF, * LPWSABUF;
WSAOVERLAPPED结构体
WSASend第六个参数涉及的WSAOVERLAPPED结构体定义如下:
typedef struct __WSAOVERLAPPED
{
DWORD Internal;
DWORD InternalHigh;
DWORD Offset;
DWORD OffsetHigh;
WSAEVENT hEvent;
}WSAOVERLAPPED, * LPWSAOVERLAPPED;
其中,Internal、InternalHigh成员是进行重叠I/O时操作系统内部使用的成员,而Offset、OffsetHigh属于具有特殊用途的成员。此处只需关注hEvent成员
WSASend函数的lpOverlapped参数中应传递有效的结构体变量地址值,如果传递NULL,WSASend函数的第一个参数中的句柄所指的套接字将以阻塞模式工作。另外,利用WSASend函数同时向多个目标传递数据时,需要分别构建传入第六个参数的WSAOVERLAPPED结构体变量
WSASend示例
WSAEVENT event;
WSAOVERLAPPED overlapped;
WSABUF dataBuf;
char buf[BUF_SIZE] = {"待传输的数据"};
int recvBytes = 0;
......
event = WSACreateEvent();
memset(&overlapped, 0, sizeof(overlapped));
overlapped.hEvent = event;
dataBuf.len = sizeof(buf);
dataBuf.buf = buf;
WSASend(hSocket, &dataBuf, 1, &recvBytes, 0, &overlapped, NULL);
......
获取实际传输数据大小
WSASend函数调用过程中,如果输出缓冲是空的,且传输的数据并不大,那么函数调用后可以立即完成数据传输。此时WSASend函数将返回0,而lpNumberOfBytesSent中将保存实际传输的数据大小的信息。反之,WSASend函数返回后仍需要传输数据时,将返回SOCKET_ERROR,并将WSA_IO_PENDING注册为错误代码,该代码可以通过WSAGetLastError函数得到。这时使用WSAGetOverlappedResult函数获取实际传输的数据大小
#include <winsock2.h>
BOOL WSAGetOverlappedResult(SOCKET s, LPWSAOVERLAPPED lpOverlapped, LPDWORD lpcbTransfer, BOOL fWait, LPDWORD lpdwFlags);
成功时返回TRUE,失败时返回FALSE。其中:
- s:进行重叠I/O的套接字句柄
- lpOverlapped:进行重叠I/O时传递的WSAOVERLAPPED结构体变量的地址值
- lpcbTransfer:用于保存实际传输的字节数的变量地址值
- fWait:如果调用该函数时仍在进行I/O,fWait为TRUE时等待I/O完成,fWait为FALSE时将返回FALSE并跳出函数
- lpdwFlags:调用WSARecv函数时,用于获取附加信息(例如OOB消息)。如果不需要,可以传递NULL
WSARecv
#include <winsock2.h>
int WSARecv(SOCKET s, LPWSABUF lpBuffers, DWORD dwBufferCount, LPDWORD lpNumberOfBytesRecvd, LPWORD lpFlags, LPWSAOVERLAPPED lpOverlapped, LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine);
成功时返回0,失败时返回SOCKET_ERROR。其中:
- s:赋予重叠I/O属性的套接字句柄
- lpBuffers:用于保存接收数据的WSABUF结构体数组地址值
- dwBufferCount:向第二个参数传递的数组的长度
- lpNumberOfBytesRecvd:保存接收的数据大小信息的变量地址值
- lpFlags:用于设置或读取传输特性信息
- lpOverlapped:WSAOVERLAPPED结构体变量地址值
- lpCompletionRoutine:Completion Routine函数地址值
确认重叠I/O的I/O完成
重叠I/O中有2种方法确认I/O的完成并获取结果:
- 利用WSASend、WSARecv函数的第六个参数,基于事件对象
- 利用WSASend、WSARecv函数的第七个参数,基于Completion Routine
使用事件对象
Sender
#include <stdio.h>
#include <stdlib.h>
#include <WinSock2.h>
#include <WS2tcpip.h>
void ErrorHandling(char* msg);
int main(int argc, char* argv[])
{
WSADATA wsaData;
SOCKET hSocket;
SOCKADDR_IN sendAdr;
WSABUF dataBuf;
char msg[] = "Network is Computer!";
int sendBytes = 0;
WSAEVENT evObj;
WSAOVERLAPPED overlapped;
if (argc != 3)
{
printf("Usage : %s <IP> <port>\n", argv[0]);
exit(1);
}
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
{
ErrorHandling("WSAStartup() error!");
}
hSocket = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
memset(&sendAdr, 0, sizeof(sendAdr));
sendAdr.sin_family = AF_INET;
inet_pton(AF_INET, argv[1], &sendAdr.sin_addr);
sendAdr.sin_port = htons(atoi(argv[2]));
if (connect(hSocket, (SOCKADDR*)&sendAdr, sizeof(sendAdr)) == SOCKET_ERROR)
{
ErrorHandling("connnect() error!");
}
evObj = WSACreateEvent();
memset(&overlapped, 0, sizeof(overlapped));
overlapped.hEvent = evObj;
dataBuf.len = strlen(msg) + 1;
dataBuf.buf = msg;
if (WSASend(hSocket, &dataBuf, 1, &sendBytes, 0, &overlapped, NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() == WSA_IO_PENDING)
{
puts("Background data send");
WSAWaitForMultipleEvents(1, &evObj, TRUE, WSA_INFINITE, FALSE);
WSAGetOverlappedResult(hSocket, &overlapped, &sendBytes, FALSE, NULL);
}
else
{
ErrorHandling("WSASend() error");
}
}
printf("Send data size: %d \n", sendBytes);
WSACloseEvent(evObj);
closesocket(hSocket);
WSACleanup();
return 0;
}
void ErrorHandling(char* msg)
{
fputs(msg, stderr);
fputc('\n', stderr);
exit(1);
}
Receiver
#include <stdio.h>
#include <stdlib.h>
#include <WinSock2.h>
#define BUF_SIZE 1024
void ErrorHandling(char* message);
int main(int argc, char* argv[])
{
WSADATA wsaData;
SOCKET hLisnSock, hRecvSock;
SOCKADDR_IN lisnAdr, recvAdr;
int recvAdrSz;
WSABUF dataBuf;
WSAEVENT evObj;
WSAOVERLAPPED overlapped;
char buf[BUF_SIZE];
int recvBytes = 0, flags = 0;
if (argc != 2)
{
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
{
ErrorHandling("WSAStartup() error!");
}
hLisnSock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
memset(&lisnAdr, 0, sizeof(lisnAdr));
lisnAdr.sin_family = AF_INET;
lisnAdr.sin_addr.s_addr = htonl(INADDR_ANY);
lisnAdr.sin_port = htons(atoi(argv[1]));
if (bind(hLisnSock, (SOCKADDR*)&lisnAdr, sizeof(lisnAdr)) == SOCKET_ERROR)
{
ErrorHandling("bind() error");
}
if (listen(hLisnSock, 5) == SOCKET_ERROR)
{
ErrorHandling("listen() error");
}
recvAdrSz = sizeof(recvAdr);
hRecvSock = accept(hLisnSock, (SOCKADDR*)&recvAdr, &recvAdrSz);
evObj = WSACreateEvent();
memset(&overlapped, 0, sizeof(overlapped));
overlapped.hEvent = evObj;
dataBuf.len = BUF_SIZE;
dataBuf.buf = buf;
if (WSARecv(hRecvSock, &dataBuf, 1, &recvBytes, &flags, &overlapped, NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() == WSA_IO_PENDING)
{
puts("Background data receive");
WSAWaitForMultipleEvents(1, &evObj, TRUE, WSA_INFINITE, FALSE);
WSAGetOverlappedResult(hRecvSock, &overlapped, &recvBytes, FALSE, NULL);
}
else
{
ErrorHandling("WSARecv() error");
}
}
printf("Received message: %s \n", buf);
WSACloseEvent(evObj);
closesocket(hRecvSock);
closesocket(hLisnSock);
WSACleanup();
return 0;
}
void ErrorHandling(char* message)
{
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
使用Completion Routine函数
通过WSASend、WSARecv函数的最后一个参数中指定的Completion Routine(CR)函数验证I/O完成情况
I/O完成时调用注册的CR函数进行事后处理。操作系统通常会预先定义规则,只有请求I/O的线程处于alertable wait状态时才能调用Completion Routine函数,防止破坏程序正常执行流
启动I/O任务后,执行完紧急任务时可以调用下面任一函数验证I/O完成与否:
- WaitForSingObjectEx
- WaitForMultipleObject
- WSAWaitForMultipleEvents
- SleepEx
这些函数都有1个参数fAlertable,如果该参数为TRUE,则相应线程进入alertable wait状态,如果有已完成的I/O,则调用相应CR函数。调用后,这些函数都返回WAIT_IO_COMPLETION
CR函数原型:
void CALLBACK CompletionROUTINE(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags);
其中:
- dwError:写入错误信息(正常结束时写入0)
- cbTransferred:写入实际收发的字节数
- lpOverlapped:写入WSASend、WSARecv函数的参数lpOverlapped
- dwFlags:写入调用I/O函数时传入的特性信息或0
- CALLBACK:与WINAPI相同,都是用于声明函数的调用规范,必须添加
Receiver
#include <stdio.h>
#include <stdlib.h>
#include <WinSock2.h>
#define BUF_SIZE 1024
void CALLBACK CompRoutine(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);
void ErrorHandling(char* message);
WSABUF dataBuf;
char buf[BUF_SIZE];
int recvBytes = 0;
int main(int argc, char* argv[])
{
WSADATA wsaData;
SOCKET hLisnSock, hRecvSock;
SOCKADDR_IN lisnAdr, recvAdr;
WSAOVERLAPPED overlapped;
WSAEVENT evObj;
int idx, recvAdrSz, flags = 0;
if (argc != 2)
{
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
{
ErrorHandling("WSAStartup() error!");
}
hLisnSock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
memset(&lisnAdr, 0, sizeof(lisnAdr));
lisnAdr.sin_family = AF_INET;
lisnAdr.sin_addr.s_addr = htonl(INADDR_ANY);
lisnAdr.sin_port = htons(atoi(argv[1]));
if (bind(hLisnSock, (SOCKADDR*)&lisnAdr, sizeof(lisnAdr)) == SOCKET_ERROR)
{
ErrorHandling("bind() error");
}
if (listen(hLisnSock, 5) == SOCKET_ERROR)
{
ErrorHandling("listen() error");
}
recvAdrSz = sizeof(recvAdr);
hRecvSock = accept(hLisnSock, (SOCKADDR*)&recvAdr, &recvAdrSz);
if (hRecvSock == INVALID_SOCKET)
{
ErrorHandling("accept() error");
}
memset(&overlapped, 0, sizeof(overlapped));
dataBuf.len = BUF_SIZE;
dataBuf.buf = buf;
evObj = WSACreateEvent();
if (WSARecv(hRecvSock, &dataBuf, 1, &recvBytes, &flags, &overlapped, CompRoutine) == SOCKET_ERROR)
{
if (WSAGetLastError() == WSA_IO_PENDING)
{
puts("Background data receive");
}
}
idx = WSAWaitForMultipleEvents(1, &evObj, FALSE, WSA_INFINITE, TRUE);
if (idx == WAIT_IO_COMPLETION)
{
puts("Overlapped I/O Completed");
}
else
{
ErrorHandling("WSARecv() error");
}
WSACloseEvent(evObj);
closesocket(hRecvSock);
closesocket(hLisnSock);
WSACleanup();
return 0;
}
void CALLBACK CompRoutine(DWORD dwError, DWORD szRecvBytes, LPWSAOVERLAPPED lpOverlapped, DWORD flags)
{
if (dwError != 0)
{
ErrorHandling("CompRoutine error");
}
else
{
recvBytes = szRecvBytes;
printf("Received message: %s \n", buf);
}
}
void ErrorHandling(char* message)
{
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
|