IOCP模型C++入门级服务端搭建
效果展示
Windows平台打开DOS界面(cmd命令)输入:netstat -anot | findstr 端口号,即可查看端口是否被占用。
源码示例
TIPS:函数API的注解出自Microsoft官方文档。
UNetCore.h
#ifndef UNETCORE_H_
#define UNETCORE_H_
#define U_IOCP_NETCORE
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <functional>
namespace U
{
class IEventLoop
{
public:
virtual bool Init() = 0;
virtual void LoopOnce() = 0;
virtual void UnInit() = 0;
};
class ITcpSocket
{
public:
virtual void Init(IEventLoop* loop) = 0;
};
typedef std::function<void(ITcpSocket*)> FTcpServerCB;
class ITcpServer
{
public:
virtual bool Init(IEventLoop* loop, FTcpServerCB cb) = 0;
virtual bool Listen(const char* ip, int port) = 0;
};
void InitNetCore();
void UnNetCore();
IEventLoop* CreateEventLoop();
ITcpServer* CreateTcpServer();
}
#endif
IOCPEventLoop.h
#include "UNetCore.h"
#ifdef U_IOCP_NETCORE
#include <WinSock2.h>
#include <iostream>
#include <unordered_map>
#pragma comment(lib,"ws2_32")
#pragma comment(lib, "Mswsock")
#pragma comment(lib, "shlwapi")
#pragma comment(lib, "psapi")
#ifndef EVENTLOOP_H_
#define EVENTLOOP_H_
namespace U
{
class sEvent
{
public:
enum class Type
{
E_TCPSERVER,
E_TCPCLIENT,
};
Type type;
SOCKET sock;
union
{
class TcpServer* tcpServer;
};
};
class EventLoop :public IEventLoop
{
private:
std::unordered_map<SOCKET, sEvent*> _events;
HANDLE _iocp;
public:
bool Init() override;
void LoopOnce() override;
void UnInit() override;
void AddEvent(sEvent* event);
bool AssioIOCP(SOCKET sock, void* ptr);
};
}
#endif
#endif
IOCPEventLoop.cpp
CreateIoCompletionPort
作用: ??创建输入/输出(I/O)完成端口并将其与指定的文件句柄(文件描述符)相关联,或创建尚未与文件句柄(文件描述符)关联的I/O完成端口,以便稍后关联。
??将打开的文件句柄(文件描述符)的实例与I/O完成端口相关联,使进程能够接收涉及该文件句柄(文件描述符)的异步I/O操作完成通知。
HANDLE WINAPI CreateIoCompletionPort(
_In_ HANDLE FileHandle, 文件句柄(文件描述符)
_In_opt_ HANDLE ExistingCompletionPort, 现有的完成端口
_In_ ULONG_PTR CompletionKey, 完成密钥
_In_ DWORD NumberOfConcurrentThreads 并发线程数
);
参数:
- FileHandle,打开的文件句柄或INVALID_HANDLE_VALUE。如果指定了INVALID_HANDLE_VALUE,该函数将创建I/O完成端口,而无需将其与
文件句柄(文件描述符)相关联。在这种情况下,ExistingCompletionPort参数必须设置为NULL,并且忽略CompletionKey参数。 - ExistingCompletionPort,现有I/O完成端口或NULL的句柄。如果此参数指定现有的I/O完成端口,则函数将其与FileHandle参数指定的句柄关联。如果成功,该函数将返回现有I/O完成端口的句柄(注意,这里不会创建新的I/O完成端口)。
如果此参数为NULL,则该函数将创建新的I/O完成端口,如果成功,该函数会将句柄返回到新的I/O完成端口。 - CompletionKey,指定文件句柄(文件描述符)的每个I/O完成数据包中包含的每个句柄用户定义完成密钥。
- NumberOfConcurrentThreads,操作系统允许并发处理I/O完成端口的I/O完成数据包最大线程数。如果ExistingCompletionPort参数不为NULL,则忽略该参数。如果此参数为零,则系统允许与系统中存在处理器的并发运行线程数一样多。
返回值:
??如果函数成功,则返回值是I/O完成端口的句柄。
- 如果ExistingCompletionPort参数为NULL,则返回值为新句柄。
- 如果ExistingCompletionPort参数是有效的I/O完成端口句柄,则返回值是I/O完成端口句柄本身。
- 如果FileHandle参数是有效的句柄,则该文件句柄现有与返回的I/O完成端口相关联。
??如果函数失败,则返回值为NULL。
GetQueuedCompletionStatus
??获取排队队列的完成状态。 作用:
??尝试从指定的I/O完成端口排出I/O完成数据包。如果没有完成的数据包排队,则该函数会等待完成端口相关的待处理的I/O操作。(即发生阻塞)
??要一次处理多个I/O完成数据包,可以使用GetQueuedCompletionStatusEx函数。
BOOL WINAPI GetQueuedCompletionStatus(
_In_ HANDLE CompletionPort, I/O完成端口
_Out_ LPDWORD lpNumberOfBytesTransferred, 传输字节的数量
_Out_ PULONG_PTR lpCompletionKey, 完成密钥
_Out_ LPOVERLAPPED* lpOverlapped, 一个重叠的结构体
_In_ DWORD dwMilliseconds 时间
);
参数:
-
CompletionPort,完成I/O端口句柄,创建一个I/O完成端口需要使用CreateIoCompletionPort函数。 -
lpNumberOfBytesTransferred,一个指向变量的指针,该变量接收完成I/O端口操作中的传输字节数。在客户端、服务端连接成功后数据交互时使用(recv,send)。 -
lpCompletionKey,一个指向变量的指针,该变量接收与I/O操作已完成的文件句柄关联的完成密钥值。 -
lpOverlapped,一个指向变量的指针,该变量接收启动完成的I/O操作时指定的重叠结构的地址。 -
dwMilliseconds,等待完成数据包出现在完成端口的毫秒数(即在这段时间内该应用进程可以做其他事情,无需阻塞等待)。
- 如果完成的数据包在指定的时间内未出现,则函数超时,返回FALSE。并设置*lpOverlapped = NULL。lpOverlapped为一个二级指针。
- 如果dwMilliseconds为INFINITE(无限的),那么该函数永远不会超时。如果dwMilliseconds为0,并且没有I/O操作要脱离,则该函数将会立即超时。
??即如果我们未设置时间参数,在这个过程中如果没有I/O操作,那么会直接返回。 ??而如果我们设置了时间参数,该时间到达之后不管是否有I/O操作,都会返回相应的结果(TRUE或FALSE),但是需要注意,必须要等待该时间才会有返回结果。这个时间段中是阻塞的。(在时间段内如果有I/O操作发生则直接返回) ?? 而如果我们设置时间参数为无限大,那么将会永远阻塞,直至有I/O操作发生则退出阻塞。
返回值:
??成功则返回TRUE,否则返回FALSE。
??获取扩展的错误信息可以调用GetLastError函数。
源码:
#include "IOCPEventLoop.h"
#include "IOCPTcpServer.h"
#ifdef U_IOCP_NETCORE
namespace U
{
void InitNetCore()
{
WSADATA wsa;
WSAStartup(MAKEWORD(2, 2), &wsa);
}
void UnNetCore()
{
WSACleanup();
}
IEventLoop* CreateEventLoop()
{
return new EventLoop;
}
}
bool U::EventLoop::Init()
{
_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
if (INVALID_HANDLE_VALUE == _iocp)
{
std::cout << "创建完成端口失败" << std::endl;
return false;
}
std::cout << "创建完成端口成功" << std::endl;
return true;
}
void U::EventLoop::LoopOnce()
{
DWORD NumberOfBytesTransferred;
void* lpCompletionKey = NULL;
OVERLAPPED* lpOverlapped;
BOOL bRet = GetQueuedCompletionStatus(_iocp, &NumberOfBytesTransferred, (PULONG_PTR)&lpCompletionKey, &lpOverlapped, 0);
if (!bRet && NULL == lpOverlapped)
{
return;
}
sEvent* event = (sEvent*)lpCompletionKey;
switch (event->type)
{
case U::sEvent::Type::E_TCPSERVER:
event->tcpServer->OnAccept();
break;
default:
break;
}
}
void U::EventLoop::UnInit()
{
}
void U::EventLoop::AddEvent(sEvent* event)
{
}
bool U::EventLoop::AssioIOCP(SOCKET sock, void* ptr)
{
return _iocp == CreateIoCompletionPort((HANDLE)sock, _iocp, (ULONG_PTR)ptr, 0);
}
#endif
IOCPTcpServer.h
#include "UNetCore.h"
#ifdef U_IOCP_NETCORE
#include "IOCPEventLoop.h"
#ifndef TCPSERVER_H_
#define TCPSERVER_H_
namespace U
{
class TcpServer :public ITcpServer
{
private:
FTcpServerCB _cb;
SOCKET _sock;
sEvent _event;
EventLoop* _loop;
SOCKET _clientSock;
char _buffer[1024];
DWORD _recvLen;
OVERLAPPED _overLapped;
public:
TcpServer();
~TcpServer();
bool Init(IEventLoop* loop, FTcpServerCB cb) override;
bool Listen(const char* ip, int port) override;
public:
void OnAccept();
private:
bool PostAccept();
};
}
#endif
#endif
IOCPTcpServer.cpp
GetAcceptExSockaddrs
作用:
??解析AcceptEx函数获取的数据,将输出缓冲区和接收字节大小传入至函数,最终将本地和远端地址传递给sockaddr结构中。
VOID PASCAL FAR GetAcceptExSockaddrs (
_In_reads_bytes_(dwReceiveDataLength+dwLocalAddressLength+dwRemoteAddressLength) PVOID lpOutputBuffer,输出缓冲区
_In_ DWORD dwReceiveDataLength,接收的额外数据长度
_In_ DWORD dwLocalAddressLength,本地数据长度
_In_ DWORD dwRemoteAddressLength,远端数据长度
_Outptr_result_bytebuffer_(*LocalSockaddrLength) struct sockaddr **LocalSockaddr,本地Sockaddr
_Out_ LPINT LocalSockaddrLength,本地Sockaddr长度
_Outptr_result_bytebuffer_(*RemoteSockaddrLength) struct sockaddr **RemoteSockaddr,远端Sockaddr
_Out_ LPINT RemoteSockaddrLength 远端Sockaddr长度
);
参数:
- lpOutputBuffer,一个指向输出缓冲区的指针,该指针只接收由AcceptEx产生的连接发送的第一个数据块。必须是传递给AcceptEx函数的lpOutputBuffer参数。
- dwReceiveDataLength,输出缓冲区中用于接收第一个数据的字节数。该值必须等于传递给AcceptEx函数的接收数据长度参数。
- dwLocalAddressLength,为本地地址信息保留的字节数。该值必须等于传递给AcceptEx函数的dwLocalAddressLength参数。
- dwRemoteAddressLength,为远端地址信息保留的字节数。该值必须等于传递给AcceptEx函数的dwRemoteAddressLength参数。
- LocalSockaddr,接收连接本地地址的SockAddr结构的指针(与getsockname函数返回的相同信息)。必须指定此参数。
- LocalSockaddrLength,本地地址的字节大小。必须指定此参数。
- RemoteSockaddr,接收连接远端地址的SockAddr结构的指针(与getpeername函数返回的相同信息)。必须指定此参数。
- RemoteSockaddrLength,远端地址的字节大小。必须指定此参数。
返回值:
??无。
WSASocket
SOCKET WSAAPI WSASocketW(
_In_ int af,
_In_ int type,
_In_ int protocol,
_In_opt_ LPWSAPROTOCOL_INFOW lpProtocolInfo,
_In_ GROUP g,
_In_ DWORD dwFlags
);
作用:
??WSASocket函数:创建一个套接字(文件描述符,文件句柄)。
参数:
- 第一个参数表示采用ipv4族。
- 第二个参数表示采用TCP协议。
- 第三个参数表示可能的协议类型为TCP协议。
- 第四个参数如果不为空,则会让创建的套接字与LPWSAPROTOCOL_INFOW指针指向的结构体绑定。
- 第五个参数为0表示没有执行组相关的操作。
- 第六个参数表示WSA_FLAG_OVERLAPPED表示创建一个支持重叠I/O(I/O完成端口句柄)的socket。
返回值:
??文件描述符(文件句柄,socket)。
AcceptEx
作用: ??AcceptEx函数接受一个新的连接,返回本地和远端地址,并接收第一个客户端应用程序发送的第一个数据块。
BOOL PASCAL FAR AcceptEx (
_In_ SOCKET sListenSocket, 服务端文件描述符
_In_ SOCKET sAcceptSocket, 客户端文件描述符
_Out_writes_bytes_(dwReceiveDataLength+dwLocalAddressLength+dwRemoteAddressLength) PVOID lpOutputBuffer, 输出缓冲区
_In_ DWORD dwReceiveDataLength, 额外接收数据长度,除客户端地址和服务端地址之外。
_In_ DWORD dwLocalAddressLength, 本地地址长度(服务端地址长度)
_In_ DWORD dwRemoteAddressLength, 远端地址长度(客户端地址长度)
_Out_ LPDWORD lpdwBytesReceived, 收到客户端的字节大小
_Inout_ LPOVERLAPPED lpOverlapped 一个重叠的结构体
);
参数:
- sListenSocket,服务端socket(文件描述符,文件句柄)
- sAcceptSocket,客户端socket(文件描述符,文件句柄)
- lpOutputBuffer,一个指向缓冲区的指针,该指针接收到新连接 连接上来后发送的第一个数据块,服务器的本地地址和客户端的远端地址。接收数据以偏移零开始写入缓冲区的第一部分,而地址则写入缓冲区的后半部分。注意,该参数必须被指定(必须填写)。
- dwReceiveDataLength,lpOutputBuffer缓冲区中的字节数将在缓冲区开始时用于实际接收数据。这个大小不应该包括服务器本地地址,也不应该包括客户端的远程地址。它们附加到输出缓冲区。如果dwReceiveDataLength长度为零,则接受一个连接不会导致一个接收数据的操作。取而代之的是,AcceptEx将会立即完成,而无需等待任何数据(即仅仅接收客户端地址和服务端地址)。
- dwLocalAddressLength,为本地地址信息保留的字节数。该值必须至少比使用的传输协议的最大地址长度高16个字节。这16个字节是I/O完成端口用于存放内存隐藏结构体进行处理交互使用的。
- dwRemoteAddressLength,为远端地址信息保留的字节数。该值必须至少比使用的传输协议的最大地址长度高16个字节(原因如上)。注意,该值不能为零。
- lpdwBytesReceived,DWORD类型的指针,用于接收客户端传入的字节数。仅当操作同步完成时才设置此参数。如果它返回ERROR_IO_PENDING并且稍后完成(即该socket文件描述符不支持重叠I/O(I/O完成端口,创建时需指定WSA_FLAG_OVERLAPPED)),则DWORD对象永远不会被设置并且从完成通知机制中获取读取的字节数(即无法通过AcceptEx拿到字节数)。
- lpOverlapped,一个用来处理请求的重叠结构。该参数必须被指定,它不能为NULL。
返回值:
??如果没有发生错误,接收函数成功完成,并返回TRUE。否则返回FALSE。可以调用WSAGetLastError函数来返回扩展错误信息。
源码:
#include "IOCPTcpServer.h"
#ifdef U_IOCP_NETCORE
#include <ws2tcpip.h>
#include <mswsock.h>
namespace U
{
ITcpServer* CreateTcpServer()
{
return new TcpServer;
}
}
U::TcpServer::TcpServer()
{
std::cout << "初始化 TCPServer" << std::endl;
_cb = nullptr;
_sock = INVALID_SOCKET;
_event.tcpServer = this;
_event.type = sEvent::Type::E_TCPSERVER;
_event.sock = INVALID_SOCKET;
}
U::TcpServer::~TcpServer()
{
_cb = nullptr;
if (INVALID_SOCKET != _sock)
closesocket(_sock);
_sock = INVALID_SOCKET;
}
bool U::TcpServer::Init(IEventLoop* loop, FTcpServerCB cb)
{
_cb = cb;
_loop = dynamic_cast<EventLoop*>(loop);
return true;
}
bool U::TcpServer::Listen(const char* ip, int port)
{
std::cout << "Listen IP:" << ip << " port:" << port << std::endl;
if (_sock != INVALID_SOCKET)
{
closesocket(_sock);
return false;
}
_sock = socket(AF_INET, SOCK_STREAM, 0);
SOCKADDR_IN addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip);
if (SOCKET_ERROR == bind(_sock, (sockaddr*)&addr, sizeof(addr)))
return false;
if (SOCKET_ERROR == listen(_sock, 5))
return false;
_event.sock = _sock;
if (_loop->AssioIOCP(_sock, (void*)&_event))
std::cout << "服务端句柄关联IOCP成功" << __FUNCTION__ << std::endl;
else
std::cout << "服务端句柄关联IOCP失败" << __FUNCTION__ << std::endl;
PostAccept();
return true;
}
void U::TcpServer::OnAccept()
{
sockaddr* serverAddr = NULL;
sockaddr* clientAddr = NULL;
int serverAddrLen;
int clientAddrLen;
GetAcceptExSockaddrs(_buffer, _recvLen, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16,
&serverAddr, &serverAddrLen, &clientAddr, &clientAddrLen);
_cb(nullptr);
PostAccept();
}
bool U::TcpServer::PostAccept()
{
_clientSock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (_clientSock == INVALID_SOCKET)
{
std::cout << "创建客户端socket失败" << std::endl;
return false;
}
_recvLen = 0;
if (FALSE == AcceptEx(_sock, _clientSock, _buffer, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &_recvLen, &_overLapped))
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
std::cout << "AcceptEx ERROR" << std::endl;
return false;
}
}
std::cout << "AcceptEx OK" << std::endl;
return true;
}
#endif
UServer.cpp
#include "UNetCore.h"
#include <iostream>
int main()
{
U::InitNetCore();
U::IEventLoop* loop = U::CreateEventLoop();
loop->Init();
U::ITcpServer* server = U::CreateTcpServer();
server->Init(loop, [](U::ITcpSocket* sock) {
std::cout << "客户端1连接" << std::endl;
});
server->Listen("0.0.0.0", 7890);
while (true)
loop->LoopOnce();
U::UnNetCore();
return 0;
}
服务端事件处理顺序
|