套接字前言
在 套接字(一)与UDP编程 中我们讲了套接字和UDP的一些知识及常用API,我们知道当系统有多个进程进行网络通信的时候,每个进程必须各自通过协议+端口+IP地址的方式才能确定网络对端的唯一进程。为了区别不同的应用程序以及进程的网络连接之间相互独立,Linux系统为应用程序和TCP/IP协议交互提供了称为套接字(Socket)的接口
套接字是网络编程中十分重要的概念,Linux以文件的形式实现套接字,与套接字相应的文件属于sockfs特殊文件系统,每创建一个套接字,就是在sockfs中创建一个特殊文件,并建立起实现接口功能的相关数据结构(文件属性,操作函数指针,缓冲区等)。
struct socket
{
socket_state state;
unsignedlong flags;
structproto_ops ops;
structinode inode;
structfasync_struct *fasync_list;
structfile *file;
structsock sk;
wait_queue_head_t wait;
short type;
unsignedchar passcred;
};
套接字socket是不同进程进行双向通信的端点,简单说来即是通信双方的一种约定,用套接字的相关函数来完成通信过程。socket是应用程序和传输层之间的桥梁,套接字socket在系统调用中创建,通过绑定使应用程序与本地IP和端口号建立关系。此后应用程序发给套接字socket的数据,并告知其发往哪一台主机(对端IP+端口号),由套接字socket交给传输层,随后传输层自顶向下直至物理层向网络上发送出去。对端主机从网络上收到与该套接字socket绑定IP地址和端口号相关的数据后,在自底向上交给socket,应用程序便可从socket中提取到数据。
socket翻译为插座,通过 协议+IP+端口号 这三个参数,与一个插座socket进行绑定,只有参数完全满足的socket才能区分来自不同应用程序或网络连接的通信,实现数据传输的并发业务。
套接字类型
-
流式套接字(SOCK_STREAM) 用于提供面向连接,可靠的数据传输服务。该服务将保证数据能够实现无差错,无重复发送,并按序接收。流套接字之所以能够实现可靠的数据服务,原因在于其使用了传输控制协议,即TCP(The Transmission Control Protocol)协议。 -
数据报套接字(SOCK_DGRAM) 数据报套接字提供了一种无连接的服务。该服务并不能保证数据传输的可靠性,数据有可能在传输过程中丢失或出现数据重复,且无法保证顺序地接收到数据。数据报套接字使用UDP(User Datagram Protocol)协议进行数据的传输。由于数据报套接字不能保证数据传输的可靠性,对于有可能出现的数据丢失情况,需要在程序中做相应的处理。 -
原始套接字(SOCK_RAW) 原始套接字(SOCKET_RAW)允许对较低层次的协议直接访问,比如IP、 ICMP协议,它常用于检验新的协议实现,或者访问现有服务中配置的新设备,因为RAW SOCKET可以自如地控制Windows下的多种协议,能够对网络底层的传输机制进行控制,所以可以应用原始套接字来操纵网络层和传输层应用。比如,我们可以通过RAW SOCKET来接收发向本机的ICMP、IGMP协议包,或者接收TCP/IP栈不能够处理的IP包,也可以用来发送一些自定包头或自定协议的IP包。网络监听技术很大程度上依赖于SOCKET_RAW 。 原始套接字与标准套接字(标准套接字指的是前面介绍的流套接字和数据报套接字)的区别在于:原始套接字可以读写内核没有处理的IP数据包,而流套接字只能读取TCP协议的数据,数据报套接字只能读取UDP协议的数据。因此,如果要访问其他协议发送数据必须使用原始套接字。
socket 地址
我曾在UDP的文章中介绍过,在bind,recvfrom和sendto等函数中,特定于协议的套接字结构地址都需要强转成通用的套接字地址结构指针——struct sockaddr* ,结构体如下:
struct sockaddr {
sa_family_t sa_family;
char sa_data[14];
};
此结构体指明地址信息,但是针对网络通信,有其专用的结构体。
域间套接字 —— struct sockaddr_un
在单个主机上执行客户/服务器通信的一种方法,可视为进程间通信(IPC)方法之一(POSIX 也把 Unix 域协议称为“本地 IPC”)。结构体如下:
#include <sys/un.h>
struct sockaddr_un{
sa_family_t sun_family;
char sun_path[104];
};
网络套接字 —— struct sockaddr_in
IP地址是由4个字节组成的一个32位的值
#include <netinet/in.h>
struct sockaddr_in
{
short sin_family;
unsigned short sin_port;
struct in_addr sin_addr;
unsigned char sin_zero[sizeof(struct sockaddr) - __SOCKADDR_COMMON_SIZE - sizeof(in_port_t) - sizeof(struct in_addr)];
};
其中sin_addr 结构
struct in_addr {
uint32_t s_addr;
};
struct sockaddr_in6 {
sa_family_t sin6_family;
in_port_t sin6_port;
uint32_t sin6_flowinfo;
struct in6_addr sin6_addr;
uint32_t sin6_scope_id;
};
struct in6_addr {
unsigned char s6_addr[16];
};
TCP socket API 详解
创建套接字 —— socket 函数
socket函数的作用就是生成一个用于通信的套接字文件描述符sockfd,他唯一标识一个socket,这个文件描述符可以作为稍后 bind 函数的绑定对象。
#include <sys/types.h>
#include <sys/socket.h>
int socket(int domain,
int type,
int protocol);
创建一个socket时,还没有一个具体的地址,如果想要给他赋值一个地址(协议+端口+IP),就必须调用bind函数,否则之后的listen和connect会隐式自动分配一个空闲端口。所以服务器必须要显式绑定socket地址,因为服务器必须要为客户提供相应服务的准确入口。
绑定套接字 —— bind 函数 [服务器]
#include <sys/socket.h>
int bind( int sockfd,
const struct sockaddr *addr,
socklen_t addrlen);
-
成功返回0,失败返回-1,并设置error -
参数
- sockfd: 通过socket()函数创建的套接字描述符。
- addr:一个const struct sockaddr *指针,指向要绑定给sockfd的协议地址。这个地址结构根据地址创建socket时的地址协议族的不同而不同。
在创建套接字后,我们再新建socket地址(sickaddr_in) 并赋予协议 AF_INET ,端口号和IP地址,就可以使用bind函数将这个套接字绑定到要监听的地址和端口组合(addr:port)了。操作如下:
struct sockaddr_in local;
local.sin_family=AF_INET;
local.sin_addr.s_addr=INADDR_ANY;
local.sin_port=htons(port);
server_len = sizeof(server_add);
bind(server_sockfd, (struct sockaddr*)&server_add, server_len);
此函数一般只由服务器使用,通常服务器在启动的时候都会绑定一个确定的地址(如ip地址+端口号),用于提供服务,客户就可以通过它来接连服务器;而客户端就不用指定,有系统自动分配一个端口号和自身的ip地址组合。
绑定后的端口套接字sockfd可以作为listen函数的监听对象了。
监听 socket —— listen 函数
socket在与socket地址绑定后,还不能马上接受客户连接,我们需要使用如下系统调用来创建一个监听队列以存放待处理的客户链接:
#include <sys/socket.h>
int listen(int sockfd,int backlog);
sockfd 参数指定监听的socket(数据类型为SOCK_STREAM)。backlog 参数提示内核监听队列的最大长度。监听队列的最大长度如果超过backlog,服务器将不再受理新的客户连接,客户端也将受到 ECONNREFUSED 错误信息。在内核版本2.2之前的linux中backlog参数是指所有处于未完全连接状态(SYN_RCVD)和完全连接状态(ESTABLISHED)的socket状态的上限。自版本2.2以后,他只表示处于完全连接状态的socket的上限,处于半连接状态的socket上限则有 /proc/sys/net/ipv4/tcp_max_syn_backlog 内核参数定义。backlog参数的典型值是5。
listen成功时返回0,失败返回-1,并设置errno。
接受连接 —— accept 函数
下面的系统调用从listen监听队列中接受一个连接
#include <sys/types.h>
#include <sys/socket.h>
int accept(int sockfd,struct sockaddr* addr, socklen_t *addrlen);
sockfd 参数是执行过listen系统调用的监听socket。addr 参数用于获取被接受连接的远端socket地址,该地址的长度由 addrlen 参数指出。- 返回值
accept参数成功时返回一个新的连传输socket,该socket唯一地标识了这个被接受的连接,服务器可以通过读写该socket来与被接受连接对应的客户端进行通信。accept失败返回-1,并设置errno。
如果监听队列中处于ESTABLISHED状态的连接对应客户端出现网络异常或者提前退出,accept将依然可以调用成功,accept只是从监听队列中取出连接,而不论连接处于何种状态,更不关心网络状况的变化。
首先需要澄清如下概念:TCP socket分两种:监听socket和传输socket。
- 监听socket:负责处理网络的连接请求(客户端的syn包到达便是连接请求)
- 传输socket:负责在网络上的两个端点之间的TCP传输。
socket的两种状态,以队列的方式进行维护:
- 未决socket(pending socket):客户端的syn包到达,内核为这个syn包对应的tcp生成一个socket,但是此时三次握手没有完成,此时便是pending socket,是未决连接,没有经过三次握手认证的tcp连接。
- 已建立连接的socket(established socket):tcp服务器利用三次握手完成对客户端的认证后,未决socket就变成已连接socket,后续即可用这个socket进行传输。
在创建socket完毕,调用listen函数的时候,会发生如下动作:
- 由socket()创建的套接字描述符sockfd转换为服务器的监听socket,让次socket进入监听请求模式,此tcp状态由CLOSE转为LISTEN。
- 内核为此tcp服务器建立两条队列。
梳理下TCP建立连接的过程
- 监听socket收到客户端的syn包,第一次握手完成;
- 然后内核为此syn请求生成一个pending socket,标记状态为SYN_RCVD,并将其添加进pending队列,并且服务器发出ack和syn,第二次握手完成。
- 客户端响应服务器syn(第三个ack到达),第三次握手结束。内核触发accept函数执行,将socket状态标记为ESTABLISHED。并且将此socket从pending socket queue 调入 established socket queue。
发起连接 —— connect 函数
如果说服务器通过listen调用来被动接受连接,那么客户端需要通过 connect 来主动与已监听的服务器套接字建立连接,客户端在connect时会自动隐式绑定(bind)本地socket地址。
自然在使用 connect() 函数时需要带上连接的目的地,即目标地址和目标端口,这正是服务端的监听套接字上绑定的地址和端口。
于是,TCP 连接的两端的套接字都已经成了五元组的完整格式。
#include <sys/types.h>
#include <sys/socket.h>
int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
-
成功返回0,失败返回-1,并设置errno。 -
参数
- sockfd:客户端的通过socket()创建,返回的套接字描述符
- addr:对端服务器的socket地址
- addrlen: 对端socket地址的长度
-
客户端connect设置模板代码
struct sockaddr_in server_addr;
memset(&server_addr,0,sizeof(server_addr));
server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(server_port);
server_addr.sin_addr.s_addr=inet_addr(server_ip.c_str());
socklen_t len=sizeof(server_addr);
if(connect(sockfd,(struct sockaddr*)&server_addr,len)==0)
{
}
else
{
}
关闭连接 —— close 、 shutdown 函数
关闭一个连接实际上就是关闭该连接对应的socket,这可以通过如下关闭普通文件描述符的系统调用来完成。
#include <unistd.h>
int close(int fd);
fd参数就是待关闭的socket。不过,close系统调用并非总是立即关闭一个连接,而是将fd的引用计数减1.只有当fd的引用计数为0时,才真正关闭连接。
多进程程序中,一次fork调用默认将使父进程中打开的socket的引用计数加1,因此我们必须在父进程和子进程中都对该socket执行close调用才能将连接关闭。
如果无论如何都要立即终止连接(而不是将socket的引用计数减1),可以使用如下的shutdown系统调用(它是专门为网络编程设计的)
#include <sys/socket.h>
int shutdown(int sockfd,int howto);
sockfd是待关闭的socket。howto参数决定了shutdown的行为:
选项 | 含义 |
---|
SHUT_RD | 关闭sockfd上的读通道。应用程序无法针对此socket执行读操作,并且该socket接收缓冲区的数据被丢弃 | SHUT_WR | 关闭sockfd上的写通道。sockfd的发送缓冲区的数据会在真正关闭连接前全部发送出去,应用程序无法针对此socket执行写操作。 | SHUT_RDWR | 读写通道都被关闭。发送缓冲区中已有的数据会发送直到完毕,但接收缓冲区中已有的数据将被丢弃。 |
注意:socket连接是全双工的,所以客户端和服务器都需要close(sockfd).
为什么要断开连接?
对于每个socket,服务器系统都会为其进行维护,在上文accept的讲解中,我们知道。对于每个socket,服务器都会以队列的形式为其管理。然而维护连接是有成本的(时间与空间)。
TCP数据读写 —— send、recv 函数
对文件的读写操作 read 和 write 同样适用于socket,但是socket API 提供了专用于socket数据读写的系统调用,他们增加了对数据读写的控制。其中用于TCP流数据读写的系统调用是:
#include <sys/types.h>
#include <sys/socket.h>
ssize_t recv(int sockfd,void* buf,size_t len,int flags);
ssize_t send(int sockfd,const void* buf,size_t len,int flags);
recv读取sockfd上的数据到buf指定的缓冲区上,大小为len。recv成功时返回实际读取到的数据长度,他可能小于我们所期待的长度len。因此可能要多次调用recv,才能读取到完整的数据。recv可能返回0,意味着通信对方已经关闭连接。recv出错返回-1并设置errno。
send往sockfd上写入数据,buf和len指定写缓冲区的位置和大小。send写入成功返回实际写入的数据的长度,失败返回-1并设置errno。
recv/send与read/write的区别就在于多了第四个参数flags。当flags为0时,功能是一样的。
flags参数为数据收发提供了额外控制,可以取下表的一个或几个进行逻辑或:
单执行流 TCP 网络程序实现
依旧是分别建立TCP服务器端和客户端的文件以及Makefile:
Makefile 文件如下:
CC=g++
.PHONY:all
all:tcp_client tcp_server
tcp_server:tcp_server.cc
$(CC) -o $@ $^ -std=c++11
tcp_client:tcp_client.cc
$(CC) -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -rf tcp_client tcp_server test
单执行流 TCP 服务器实现
此处仅做实验,业务不做详写,当服务器接受客户端的数据后直接打印并返回给客户端。
#pragma once
#include <iostream>
#include <cstring>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define DEFAULT 8081
class TcpServer
{
private:
int port;
int listen_sock;
public:
TcpServer(int _port=DEFAULT):port(_port),listen_sock(-1)
{}
void InitTcpServer()
{
listen_sock=socket(AF_INET,SOCK_STREAM,0);
if(listen_sock<0)
{
std::cerr<<"socket error"<<std::endl;
exit(2);
}
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family=AF_INET;
local.sin_port=htons(port);
local.sin_addr.s_addr=INADDR_ANY;
if(bind(listen_sock,(struct sockaddr*)&local,sizeof(local))<0)
{
std::cerr<<"bind error"<<std::endl;
exit(3);
}
if(listen(listen_sock,5)<0)
{
std::cerr<<"listen error"<<std::endl;
exit(4);
}
}
void Loop()
{
for(;;)
{
struct sockaddr_in peer;
memset(&peer,0,sizeof(peer));
socklen_t len=sizeof(peer);
int sock=accept(listen_sock,(struct sockaddr*)&peer,&len);
if(sock<0)
{
std::cout<<"accept error"<<std::endl;
continue;
}
std::string peer_ip=inet_ntoa(peer.sin_addr);
int peer_port=ntohs(peer.sin_port);
std::cout<<"get a new link : ["<<peer_ip<<"]:"<<peer_port<<std::endl;
Service(sock,peer_ip,peer_port);
}
}
void Service(int sock,std::string peer_ip,int peer_port)
{
char buffer[1024];
while(true)
{
ssize_t size=read(sock,buffer,sizeof(buffer)-1);
if(size>0)
{
buffer[size]=0;
std::cout<<"client["<<peer_ip<<":"<<peer_port<<"]# "<<buffer<<std::endl;
write(sock,buffer,size);
}
else if(size==0)
{
std::cout<<"client["<<peer_ip<<":"<<peer_port<<"] close!"<<std::endl;
break;
}
else
{
std::cerr<<sock<<"read error"<<std::endl;
break;
}
}
close(sock);
std::cout<<"service done"<<std::endl;
}
~TcpServer()
{
if(listen_sock>0)
{
close(listen_sock);
}
}
};
#include "tcp_server.hpp"
void Usage(char* c)
{
std::cout<<"Usage: "<<c<<" port"<<std::endl;
}
int main(int argc , char* argv[])
{
if(argc<=1)
{
Usage(argv[0]);
exit(1);
}
std::cout<<"HELLO SERVER"<<std::endl;
TcpServer* server=new TcpServer(atoi(argv[1]));
server->InitTcpServer();
server->Loop();
delete server;
return 0;
}
单执行流 TCP 客户端实现
#pragma once
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
class TcpClient
{
private:
std::string server_ip;
int server_port;
int sock;
public:
TcpClient(std::string _ip,int _port):server_ip(_ip),server_port(_port),sock(-1)
{}
void InitTcpClient()
{
sock=socket(AF_INET,SOCK_STREAM,0);
if(sock<0)
{
std::cerr<<"socket error"<<std::endl;
exit(2);
}
}
void Start()
{
struct sockaddr_in server_addr;
memset(&server_addr,0,sizeof(server_addr));
server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(server_port);
server_addr.sin_addr.s_addr=inet_addr(server_ip.c_str());
socklen_t len=sizeof(server_addr);
if(connect(sock,(struct sockaddr*)&server_addr,len)==0)
{
std::cout<<"connect success"<<std::endl;
Request(sock);
}
else
{
std::cout<<"connect error"<<std::endl;
}
}
void Request(int sock)
{
std::string message;
char buffer[1024];
while(true)
{
std::cout<<"Please input# ";
std::cin>>message;
write(sock,message.c_str(),message.size());
ssize_t retsize=read(sock,buffer,sizeof(buffer)-1);
if(retsize>0)
{
buffer[retsize]=0;
}
std::cout<<"server echo# "<<buffer<<std::endl;
}
}
~TcpClient()
{
if(sock>0)
{
close(sock);
}
}
};
#include "tcp_client.hpp"
void Usage(char* c)
{
std::cout<<"Usage: "<<c<<" server_ip server_port"<<std::endl;
}
int main(int argc,char* argv[])
{
if(argc!=3)
{
Usage(argv[0]);
exit(1);
}
std::cout<<"HELLO CLIENT"<<std::endl;
std::string server_ip=argv[1];
int server_port=atoi(argv[2]);
TcpClient* client=new TcpClient(server_ip,server_port);
client->InitTcpClient();
client->Start();
return 0;
}
测试以及弊端
先打开服务器端开始监听,再打开客户端发送数据:
如果客户端终止进程,那么服务端的read函数的返回值为0,此时服务端结束此次服务并继续监听,当客户端再次发起连接请求,服务器便会与其建立连接:
但是单执行流的服务器在同一时刻只能服务一个客户,如果我们同时打开两个客户端1和2:
此时客户端2显示连接是成功的,但是客户端2在输入数据后,服务器没有对其响应服务。而当前一个客户端1断开时,服务器会响应之前已在缓冲区中的数据。
显然,单执行流服务器每次只能为一个客户端服务,客户端2显示连接成功是因为服务器监听到了它,但是还没有accept,而此时服务器正单进程状态服务客户端1。所以此时负责客户端2的socket正处于未决socket队列中。
因此,要将服务器改成多执行流模式就需要引入多进程或者多线程。
多进程 TCP 网络程序
我们将之前的单进程流服务器改造为多进程流服务器。
当服务器accept之后,我们选择让子进程为新的连接socket进行服务。
不同的进程流使得分工得以明确,父进程在fork子进程后,继续负责监听套接字中获取新的连接socket,而不用关心连接socket的服务整个服务流程。
**子进程会继承父进程的文件描述符(包含套接字描述符),引用计数加1。**如果子进程再创建子进程,还是会继承同一份文件描述符,当然引用计数需加1。
我们知道子进程的资源需要由父进程回收,父进程使用wait/waitpid来回收子进程资源,但是这两个函数默认为阻塞状态,在子进程没有结束时父进程将会等待那此时的服务端仍是以串行的方式为客户端提供服务,于是需要设置为非阻塞,由于要记录回收的子进程pid,父进程又无法提前得知哪个子进程会结束,非阻塞也不好操作,于是我们选择让操作系统回收子进程的资源,有如下两种方法。
SIGCHLD信号设置为忽略
我们知道子进程结束时会给父进程发送 SIGCHLD 信号,如果父进程提前将此信号设置为忽略操作(SIG_IGN),那么子进程结束时其资源将会被自动回收。
于是,我们对服务器端的Loop函数(包含accept和具体业务,父子进程实现其分离)进行重写,改造为多进程流。
void Loop()
{
for(;;)
{
struct sockaddr_in peer;
memset(&peer,0,sizeof(peer));
socklen_t len=sizeof(peer);
int sock=accept(listen_sock,(struct sockaddr*)&peer,&len);
if(sock<0)
{
std::cout<<"accept error"<<std::endl;
continue;
}
std::string peer_ip=inet_ntoa(peer.sin_addr);
int peer_port=ntohs(peer.sin_port);
std::cout<<"get a new link : ["<<peer_ip<<"]:"<<peer_port<<std::endl;
signal(SIGCHLD,SIG_IGN);
pid_t id=fork();
if(id==0)
{
close(listen_sock);
Service(sock,peer_ip,peer_port);
}
close(sock);
}
}
父进程关闭传输socket
🚩注意:父进程不负责数据传输等逻辑业务,当父进程把业务转交给子进程后,父进程在accept时的返回值:传输socket文件描述符 对父进程本身而言没有任何意义,父进程必须立即close该传输套接字描述符,如果父进程不关闭此socket,会造成文件描述符泄漏,导致父进程中可用的文件描述符越来越少,而父进程关闭传输socket不会对子进程为客户端服务造成影响。
🚩注意: 同时建议子进程也关闭从父进程继承来的监听套接字listen_sock,实际上就算子进程不关闭监听套接字,最终也只会导致这一个文件描述符泄漏,但是逻辑业务中可能涉及对监听套接字的误操作,所以应做到功能解耦。
测试
先执行脚本循环查看进程:
while :; do ps axj | head -1 && ps axj | grep tcp_server | grep -v grep;echo "######################";sleep 1;done
这次我们多开客户端测试下代码:
- 首先开启服务器端:
仅服务器一个进程,正在不断监听。
- 打开两个客户端
多出了两个子进程以服务两个客户端。
- 客户端各自发送数据
由于服务器有两个执行流分别为两个客户端提供服务,因此他们发送给服务器的内容一并得以反馈。
孙子进程执行服务
我们对父进程fork创建出来的子进程再次进行fork,之后子进程直接结束进程,让创建出的孙子进程执行服务器业务,同时成为孤儿进程的孙子进程在业务执行结束后,由守护进程(pid=1)回收其资源。
void Loop()
{
for(;;)
{
struct sockaddr_in peer;
memset(&peer,0,sizeof(peer));
socklen_t len=sizeof(peer);
int sock=accept(listen_sock,(struct sockaddr*)&peer,&len);
if(sock<0)
{
std::cout<<"accept error"<<std::endl;
continue;
}
std::string peer_ip=inet_ntoa(peer.sin_addr);
int peer_port=ntohs(peer.sin_port);
std::cout<<"get a new link ->"<<sock<<": ["<<peer_ip<<"]:"<<peer_port<<std::endl;
pid_t id=fork();
if(id==0)
{
if(fork()>0)
{
exit(0);
}
close(listen_sock);
Service(sock,peer_ip,peer_port);
exit(0);
}
close(sock);
waitpid(id,nullptr,0);
}
}
🚩总结:
- 父进程:负责监听及accept新的传输socket,fork子进程;
- 子进程:fork创建孙子进程后立即关闭,目的让孙子进程成为孤儿进程,交由守护进程“领养”。防止子进程自身成为僵尸进程,其需要父进程的wait/waitpid来回收资源,但这种等待是瞬时的,不影响并发;
- 孤儿进程:执行逻辑业务,结束业务后exit退出,由守护进程回收其资源。
测试代码
我们开启两个客户端同时向服务器通信,此时这两个客户端由两个孤儿进程提供服务,因此它们也是能够同时享受到服务的,可以看到这两个客户端发送给服务端的数据都能够在服务端输出,并且服务端也会对它们的数据进行响应。
当客户端退出后,对应为客户端提供服务的孤儿进程也会跟着退出,这时这些孤儿进程exit后会被系统回收,而最终剩下的服务器进程将继续监听。
多线程 TCP 网络程序
创建进程的成本较高,每创建一个子进程,都要创建相应的进程控制块,页表等数据结构。而创建线程,多线程会共用一个进程的大部分资源(包含文件描述符表),成本更小,所以选择多线程为服务器提供多执行流是更好的选择。
依旧是延用之前的TCP框架,需要改造为多线程版本,这里需要注意🚩:
🚩1. 在主线程accept后,创建新线程来为客户端提供服务吗,我们把业务函数Service 放在线程运行的例程函数HandlerRequest() 中去调用。
#include <pthread.h>
pthread_t tid;
pthread_create(&tid,nullptr,HandlerRequest,);
🚩2. 根据上面的代码我们知道Service函数需要三个参数:传输socket+客户端IP+客户端端口号,但是线程创建函数只允许我们传一个参数交给例程函数,于是我们需要把这三个参数封装为一个参数类。
class Prama
{
public:
int sock;
std::string ip;
int port;
public:
Prama(int _sock,std::string _ip,int _port)
:sock(_sock)
,ip(_ip)
,port(_port)
{}
~Prama()
{}
};
于是我们的主线程中包含accept功能的Loop函数改造如下:
- 实例化参数类
- 创建新线程,将客户端的参数传入到例程中执行业务
- 由于多线程共享一张文件描述符表,所以主线程不能关闭传输socket
void Loop()
{
for(;;)
{
struct sockaddr_in peer;
memset(&peer,0,sizeof(peer));
socklen_t len=sizeof(peer);
int sock=accept(listen_sock,(struct sockaddr*)&peer,&len);
if(sock<0)
{
std::cout<<"accept error"<<std::endl;
continue;
}
std::string client_ip=inet_ntoa(peer.sin_addr);
int client_port=ntohs(peer.sin_port);
std::cout<<"get a new link ->"<<sock<<": ["<<client_ip<<"]:"<<client_port<<std::endl;
Prama* p=new Prama(sock,client_ip,client_port);
pthread_t tid;
pthread_create(&tid,nullptr,HandlerRequest,p);
}
}
🚩3. 例程函数HandlerRequest()
在类中的成员函数默认的第一个参数为隐藏this指针,而例程函数的类型为void *(*start_routine) (void *) ,所以我们需要将其定义为静态成员函数。同时我们会在例程中调用Service,静态函数无法调用非静态函数,恰好Service中没有使用成员函数和成员变量,所以Service函数也可定义为静态成员函数。在子线程执行完业务后,可以关闭传输socket。
例程函数与业务函数如下(都在TcpServer类中):
static void* HandlerRequest(void* arg)
{
pthread_detach(pthread_self());
Prama* p=(Prama*)arg;
Service(p->sock,p->ip,p->port);
delete(p);
return nullptr;
}
static void Service(int sock,std::string peer_ip,int peer_port)
{
char buffer[1024];
while(true)
{
ssize_t size=read(sock,buffer,sizeof(buffer)-1);
if(size>0)
{
buffer[size]=0;
std::cout<<"client["<<peer_ip<<":"<<peer_port<<"]# "<<buffer<<std::endl;
write(sock,buffer,size);
}
else if(size==0)
{
std::cout<<"client["<<peer_ip<<":"<<peer_port<<"] close!"<<std::endl;
break;
}
else
{
std::cerr<<sock<<"read error"<<std::endl;
break;
}
}
close(sock);
std::cout<<"service done"<<std::endl;
}
其余的代码不做改变。
代码测试
使用指令 ps -aL 来查看线程的使用情况,为了方便实时查看,我们写一个脚本,循环查看:
while :; do ps -aL | head -1 && ps -aL | grep tcp_server;echo "####################";sleep 1;done
我们开启运行服务器端后,打开两个客户端对服务器发送数据:
基于线程池的 TCP 网络程序
上述的多线程方案,服务器每接收一个客户端请求便创建一个线程,而当服务结束后线程又会被销毁,效率低下。且上述的方法无法控制线程创建的上限,一旦请求服务客户端数量很多,CPU负担增加,且每个线程等待调度的周期也会被拉长,使得每个用户都得不到良好的服务体验。
于是我们可以在运行服务端时,预先创建一批线程——线程池。好处在于:
- 避免处理短时间任务时,创建与销毁线程的代价;
- 线程池给出了同时在服务器处理任务的客户端数量的上限,保证cpu充分利用,防止过度的调度。
🚩操作思路:
- 将上述的TCP多线程框架改造为线程池版本
- 线程池首先会创建一批线程,一开始任务队列为空,所有线程阻塞在条件变量处
- 当一有客户端请求服务时,我们把负责客户端的所有信息(传输sock+客户端IP+客户端端口号)作为任务(
Task )压入任务队列(_task_queue ),随后后唤醒条件变量。 - 条件变量会放行一个线程来执行任务,将任务pop出任务队列,我们执行任务
Task 的成员函数 Run() ,其函数将会为客户端提供业务处理,不过这里我们将业务作为仿函数(Handler )解耦出来。
多说无益,直接上代码:
我们引入线程池类(ThreadPool )和任务类(Task )
线程池
我们默认创建5个线程,如果有任务就拿出任务,然后调用该任务对应的Run函数对该任务进行处理,如果线程池当中没有任务那么当前线程就会进入休眠状态。
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <unistd.h>
#include <pthread.h>
const int g_num=5;
template <class T>
class ThreadPool
{
public:
ThreadPool(int num=g_num):_num(num)
{
pthread_mutex_init(&_mtx,nullptr);
pthread_cond_init(&_cond,nullptr);
}
static void *Routine(void *args)
{
pthread_detach(pthread_self());
ThreadPool<T> *tp = (ThreadPool<T> *)args;
while (true)
{
tp->Lock();
while(tp->IsEmpty())
{
tp->Wait();
}
T t;
tp->PopTask(&t);
tp->Unlock();
t.Run();
}
}
void ThreadInit()
{
pthread_t tid;
for(int i=0;i<_num;++i)
{
pthread_create(&tid,nullptr,Routine,(void*)this);
}
}
void PushTask(const T& in)
{
Lock();
_task_queue.push(in);
WakeUp();
Unlock();
}
void PopTask(T* out)
{
*out=_task_queue.front();
_task_queue.pop();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
void Lock()
{
pthread_mutex_lock(&_mtx);
}
void Unlock()
{
pthread_mutex_unlock(&_mtx);
}
void Wait()
{
pthread_cond_wait(&_cond,&_mtx);
}
void WakeUp()
{
pthread_cond_signal(&_cond);
}
bool IsEmpty()
{
return _task_queue.empty();
}
private:
int _num;
std::queue<T> _task_queue;
pthread_mutex_t _mtx;
pthread_cond_t _cond;
};
任务类Task
该任务类需要包含客户端对应的套接字描述符(sock)、IP地址和端口号,以明确线程是在为哪一个客户端提供服务。
类中的成员函数Run() 将是具体的业务逻辑,实际上就是上述TCP版本中的Service函数,不过为了实现代码的通信功能与具体业务功能的解耦,我们增加仿函数类成员Handler ,让Run函数来回调它,当后续需要更改业务时修改这个仿函数类即可,其他代码都不需要动。
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
class Handler
{
public:
Handler(){}
~Handler(){}
void operator()(int sock,std::string peer_ip,int peer_port)
{
char buffer[1024];
while(true)
{
ssize_t size=read(sock,buffer,sizeof(buffer)-1);
if(size>0)
{
buffer[size]=0;
std::cout<<"client["<<peer_ip<<":"<<peer_port<<"]# "<<buffer<<std::endl;
write(sock,buffer,size);
}
else if(size==0)
{
std::cout<<"client["<<peer_ip<<":"<<peer_port<<"] close!"<<std::endl;
break;
}
else
{
std::cerr<<sock<<"read error"<<std::endl;
break;
}
}
close(sock);
std::cout<<"service done"<<std::endl;
}
};
class Task
{
private:
int sock;
std::string ip;
int port;
Handler handler;
public:
Task(){}
Task(int _sock,std::string _ip,int _port):sock(_sock),ip(_ip),port(_port)
{}
void Run()
{
handler(sock,ip,port);
}
~Task()
{}
};
服务器类TcpServer引入线程池类
- TcpServer的成员变量加入线程池类指针
tp ; - 初始化函数
InitTcpServer() 中,tp指针new出线程池对象,可以指定线程池的线程个数,默认为5个; Loop 函数中首先对线程池进行初始化(ThreadInit() ),批量线程被创建出来后阻塞在条件变量前等待任务,然后主线程开始accept客户端的请求。- 当服务器accept一个客户端的请求后,会将客户端的套接字描述符、IP地址以及端口号构建出一个任务,然后调用线程池提供的
PushTask 接口将该任务塞入任务队列。
#pragma once
#include <iostream>
#include <cstring>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <pthread.h>
#include "threadpool.hpp"
#include "Task.hpp"
#define DEFAULT 8081
class Prama
{
public:
int sock;
std::string ip;
int port;
public:
Prama(int _sock,std::string _ip,int _port)
:sock(_sock)
,ip(_ip)
,port(_port)
{}
~Prama()
{}
};
class TcpServer
{
private:
int port;
int listen_sock;
ThreadPool<Task> *tp;
public:
TcpServer(int _port=DEFAULT):port(_port),listen_sock(-1),tp(nullptr)
{}
void InitTcpServer()
{
listen_sock=socket(AF_INET,SOCK_STREAM,0);
if(listen_sock<0)
{
std::cerr<<"socket error"<<std::endl;
exit(2);
}
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family=AF_INET;
local.sin_port=htons(port);
local.sin_addr.s_addr=INADDR_ANY;
if(bind(listen_sock,(struct sockaddr*)&local,sizeof(local))<0)
{
std::cerr<<"bind error"<<std::endl;
exit(3);
}
if(listen(listen_sock,5)<0)
{
std::cerr<<"listen error"<<std::endl;
exit(4);
}
tp=new ThreadPool<Task>();
}
void Loop()
{
tp->ThreadInit();
for(;;)
{
struct sockaddr_in peer;
memset(&peer,0,sizeof(peer));
socklen_t len=sizeof(peer);
int sock=accept(listen_sock,(struct sockaddr*)&peer,&len);
if(sock<0)
{
std::cout<<"accept error"<<std::endl;
continue;
}
std::string client_ip=inet_ntoa(peer.sin_addr);
int client_port=ntohs(peer.sin_port);
std::cout<<"get a new link ->"<<sock<<": ["<<client_ip<<"]:"<<client_port<<std::endl;
Task t(sock,client_ip,client_port);
tp->PushTask(t);
}
}
~TcpServer()
{
if(listen_sock>0)
{
close(listen_sock);
}
delete tp;
}
};
其余的代码不做修改。
测试线程池版的TCP网络程序
依旧是先采用脚本对线程进行查看:
while :; do ps -aL|head -1&&ps -aL|grep tcp_server;echo "####################";sleep 1;done
运行服务端,线程池的线程已全部创建,并等待为客户端进行服务
我们开启两个客户端并发送数据,可顺利实现并发:
我们如果打开的客户端超过线程池上限将会阻塞住:
直到其中的一个执行的客户端退出后,后续排队的客户端才可以被线程服务
TCP 套接字网络编程流程
- 服务器初始化
- 调用socket,创建文件描述符listenfd
- 调用bind,将listenfd和本地IP及端口号绑定,如果这个端口已经被占用,bind会失败
- 调用listen,声明listenfd为监听描述符,为后面accept做准备
- 调用accept并阻塞,等待客户端connect
- 客户端调用socket,创建文件描述符;
- 调用connect,向服务器发起连接请求;
- connect发出SYN段,阻塞等待服务器应答;(第一次)
- 服务器收到客户端SYN,会应答SYN-ACK段表示同意连接;(第二次)
- 客户端收到SYN-ACK后会从connect()返回,同时应答一个ACK段;(第三次)
这个建立连接的过程称为:三次握手;
- 建立连接后,TCP协议提供全双工的通信服务——同一时刻,通信双方可以同时写数据。(半双工,同一时刻只能由一方写数据)
- 服务器从accept返回后立即调用read,读socket就如同读管道一样,没有数据到达则阻塞等待。
- 客户端调用write给服务器发送数据,服务器收到后从read返回,对客户端的请求进行处理,在此期间客户端调用read()阻塞等待服务器的应答。
- 服务器调用write()将处理结果发回给客户端,再次调用read阻塞等待下一个请求。
- 客户端收到后从read()返回,如此循环下去。
- 如果客户端没有更多的请求,就调用close()关闭连接,客户端会向服务器发送FIN段;(第一次)
- 此时服务器收到FIN后,会回应ACK,同时read返回0;(第二次)
- read返回后,服务器就知道客户端关闭了连接,也调用close关闭连接,这个时候服务器会向客户端发送FIN;(第三次)
- 客户端收到FIN,返回ACK给服务器;(第四次)
这个断开连接的过程称为:四次挥手
在学习socket API时要注意应用程序和TCP协议层是如何交互的:
应用程序调用某个socket函数时TCP协议层完成什么动作,比如调用connect()会发出SYN段
应用程序如何知道TCP协议层的状态变化,比如从某个阻塞的socket函数返回就表明TCP协议收到了某些段,再比如read()返回0就表明收到了FIN段。
— end —
青山不改 绿水长流
|