Linux内核
Socket
Socket 起源于 Unix ,Unix/Linux 基本哲学之一就是“一切皆文件”,都可以用“打开(open) –> 读写(write/read) –> 关闭(close)”模式来进行操作。因此 Socket 也被处理为一种特殊的文件或者文件描述符。
Socket 是对 TCP/IP 协议族的一种封装,是应用层与 TCP/IP 协议族通信的中间软件抽象层。Socket 还可以认为是一种网络间不同计算机上的进程通信的一种方法,利用三元组(IP地址,协议,端口号)就可以唯一标识网络中的进程,网络中的进程通信可以利用这个标志与其它进程进行交互。
五层网络模型:应用层(HTTP/SMTP)/传输层(TCP/UDP)/网络层(IP)/链路层/物理层;
socket.h
使用如下语句导入linux内核下的socket库
#include <sys/types.h>
#include <sys/socket.h>
具体关于库的描述和说明可以在命令行下使用如下命令查看:
man socket
调用setsockopt或者getsockopt可以设置或者获取的一些选项:
内核接收网络数据全过程
- 内核等待网络数据,阻塞进程(阻塞的进程不会占用CPU);
- 计算机收到了对端传送的数据;
- 数据经由网卡传送到内存;
- 网卡通过中断信号通知CPU有数据到达,CPU执行中断程序;
- 中断程序主要有两项功能,先将网络数据写入到对应socket的接收缓冲区里面,再唤醒进程;
问题
- 操作系统如何知道网络数据对应于哪个socket?
因为一个socket对应着一个端口号,而网络数据包中包含了ip和端口的信息,内核可以通过端口号找到对应的socket。当然,为了提高处理速度,操作系统会维护端口号到socket的索引结构,以快速读取。 - 如何同时监视多个socket的数据?
多路复用。
epoll
epoll的存在就是为了高效地管理多个socket,实现多路复用。epoll的工作流程如下:
- 通过调用epoll_create,创建epoll对象,同时会在文件系统中产生相应的文件描述符;
- 维护监事列表,通过调用epoll_ctl添加或删除所要监听的socket;
- 接收数据,当socket收到数据后,中断程序会给eventpoll的rdlist添加socket引用;
- 阻塞和唤醒进程,当进程调用epoll_wait语句时,内核将进程放入eventpool的等待队列中,并挂起进程;当socket接收到数据,中断程序一方面修改rdlist,另一方面唤醒eventpoll等待队列中的进程,进程再次进入运行状态。
编码类型表
编码模块
由于数据在传输过程中是以字节串的形式存在的,因此,在传输前后需要分别对数据进行编码和解码,这是一个序列化与反序列化的过程。在对数据进行序列化的过程中,将复杂的数据结构编码成字节串;而在对数据进行反序列化的过程中,则从字节串中还原复杂的数据结构。这个模块主要包括如下四个cpp文件:TarsType.h, Tars.h, Servant.h, ServantManager.h。它们的功能分别是,TarsType.h定义了编码模块中的数据定义类型以及数据存储类型;Tars.h文件是对数据编码以及解码过程的具体实现;Servant.h为服务端向客户端提供服务的基类,后续服务端需要实现具体的业务逻辑,只需要继承Servant,并实现其中的doRequest()函数;由于一个服务端包含的服务不止一个,因此我们利用ServantManger对不同的服务进行管理调度并实现对不同Servant的管理,根据用户的请求调用来选择不同的服务。同时,基于Servant,我们可以方便地实现实验中给出的sum()以及uppercase()远程调用。各个文件之间及类之间的关系如下:
基本类型
由于在不同编程语言中,数据的基本类型不同,因此我们对其进行归纳,将数据的基本类型划分为如下几种:
基本类型 | 含义 |
---|
Bool | 布尔类型 | Char | 字符类型 | Float | 单精度浮点数 | Dobule | 双精度浮点数 | Int8 | 8位带符号整数 | Int16 | 16位带符号整数 | Int32 | 32位带符号整数 | Int64 | 64位带符号整数 | UInt8 | 8位无符号整数 | UInt16 | 16位无符号整数 | UInt32 | 32位无符号整数 | UInt64 | 64位无符号整数 | String | 字符串类型 |
其中,Char类型与Int8类型在本质上是相同的,因此在实现过程中我们只需对其中一个进行实现。具体在代码中的定义如下:
复杂类型
除了基本类型之外,我们对基本类型进行不同形式的构造以及组合可以生成复杂类型。
结构
结构定义如下:
struct Test
{
0 require string s;
1 optional int i;
};
其中第一列的数字0和1分别表示相应字段的标识符Tag,并且Tag的值在[0,255]之间。require表示该字段必选,optional表示该字段可选。
序列
序列vector的定义如下:
vector<int> vi;
字典
字典map的定义如下:
map<int, string> m;
嵌套
任何struct,map,vector都可以嵌套,以构成更加复杂的数据结构。这里我们只给出了复杂类型的定义,但并不给出它们的具体实现方式。我们只在代码中实现了对基本类型的序列化和反序列化方法。
数据编码
每一个数据由两个部分组成,如下所示:
| 头信息 | 实际数据 |
而头部信息包含以下几个部分:
| Type(4 bits) | Tag 1(4 bits) | Tag 2(1 byte) |
Type表示类型,用4个二进制位表示,取值范围是0~15,用来标识该数据的类型。不同类型的数据,其后紧跟着的实际数据的长度和格式都是不一样的,详见一下的类型表。同时,这里的类型与上面提到的基本类型是不一样的,这里的类型仅表示数据的存储类型(如数据长度等存储信息),而上述提到的类型则是数据定义的类型。
取值 | 类型 | 含义 |
---|
0 | int1 | 紧跟1个字节的整型数据 | 1 | int2 | 紧跟2个字节的整型数据 | 2 | int4 | 紧跟4个字节的整型数据 | 3 | int8 | 紧跟8个字节的整型数据 | 4 | float | 紧跟4个字节浮点型数据 | 5 | double | 紧跟8个字节浮点型数据 | 6 | string1 | 紧跟1个字节长度,再跟内容 | 7 | string4 | 紧跟4个字节长度,再跟内容 | 8 | map | 紧跟一个整型数据表示Map的大小,再跟[key, value]对列表 | 9 | list | 紧跟一个整型数据表示List的大小,再跟元素列表 | 10 | structbegin | 自定义结构开始标志 | 11 | structend | 自定义结构结束标志 |
具体在代码中的定义如下: Tag为用户给定的标记,由Tag1和Tag2两部分组成。Tag 2是可选的,当Tag的值不超过14时,只需要用Tag 1就可以表示;当Tag的值超过14而小于256时,Tag 1固定为15,而用Tag 2表示Tag的值。Tag不允许大于255。 为了简化实现,我们在代码中按照这样的方式使用Tag:
- 客户端要向服务端发起RPC请求调用,客户端需传入请求的函数名及对应的参数;
- 客户端传入的请求函数名将用Tag为0来标记,之后传入的所有参数从1开始按序依次标记;
- 根据相应的Tag以及数据的定义类型,RPC请求调用的参数进行序列化;
- 服务端收到序列化的数据,对收到的数据进行反序列化;
- 具体地,服务端会首先解析出Tag为0的字段来确定客户端RPC请求调用的函数名,再依次解析出后续的参数,最后调用相应的函数。
编码实现
编码的具体实现在Tars.h中,其中两个主要的类TarsInputStream以及TarsOutputStream它们分别实现了对数据结构的解码和编码的过程,即反序列化以及序列化。同时这两个类又分别继承自BufferReader以及BufferWriter类,这两个类主要是对缓冲读取器和缓冲写入器的封装,具体地,它们的定义如下:
class BufferReader
{
public:
const char *_buf;
size_t _buf_len;
size_t _cur;
public:
BufferReader() : _buf(NULL), _buf_len(0), _cur(0) {}
~BufferReader()
{
delete[] _buf;
}
void reset();
void readBuf(void *buf, size_t len);
void peekBuf(void *buf, size_t len, size_t offset = 0);
void readBuf(std::vector<char> &v, size_t len);
void peekBuf(std::vector<Char> &v, size_t len, size_t offset = 0);
void setBuffer(const char *buf, size_t len);
void setBuffer(const std::vector<char> &buf);
bool hasEnd();
size_t len() const;
const char *base() const;
size_t size() const;
}
class BufferWriter
{
public:
char *_buf;
size_t _len;
size_t _buf_len;
static char *_reserve(BufferWriter &os, size_t len)
{
char *p = new char[(len)];
memcpy(p, (os)._buf, (os)._len);
delete[](os)._buf;
return p;
}
public:
BufferWriter() : _buf(NULL), _len(0), _buf_len(0) {}
~BufferWriter()
{
delete[] _buf;
}
void reset();
void writeBuf(const void *buf, size_t len);
std::vector<char> getByteBuffer() const;
size_t len() const;
const char *base() const;
size_t size() const;
}
其中TarsInputStream继承BufferReader类,并实现类Read()函数,其主要功能为从缓冲区中读取对应Tag的数据,并将数据解析为相应的类型,这里的类型即为上面给出的数据定义类型中的基本类型。这里给出将数据读取为Char类型的具体实现,其他类型过程相似:
void read(Char &c, uint8_t tag)
{
uint8_t headType = 0, headTag = 0;
bool skipFlag = false;
TarsSkipToTag(skipFlag, tag, headType, headTag);
if (skipFlag)
{
switch (headType)
{
case TarsHeadeChar:
TarsReadTypeBuf(*this, c, Char);
break;
default:
{
char s[64];
snprintf(s, sizeof(s), "read 'Char' type mismatch, tag: %d, get type: %d.", tag, headType);
throw TarsDecodeMismatch(s);
}
}
}
else
{
char s[64];
snprintf(s, sizeof(s), "require field not exist, tag: %d, headTag: %d.", tag, headTag);
throw TarsDecodeRequireNotExist(s);
}
}
读取的过程为,首先跳转到Buffer中指定Tag的位置,然后判断跳转是否成功,如果跳转成功,则根据头部的Type字段判断数据是否能够被解析为Char类型,如果可以,则将数据以Char类型读出;其他情况均为读取失败,返回提示信息。
其中TarsOutputStream继承BufferWriter类,并实现类Write()函数,其主要功能为向缓冲区中写入对应Tag的数据,并将数据编码为相应的类型,这里的类型即为上面给出的数据定义类型中的基本类型。这里给出将数据读取为Char类型的具体实现,其他类型过程相似:
void write(Char c, uint8_t tag)
{
TarsWriteToHead(*this, TarsHeadeChar, tag);
TarsWriteToBody(*this, c, (*this)._len, Char);
}
写入的实现过程比较简单,其主要做的就是向Buffer末尾首先写入头部信息(带上指定的Tag),然后Buffer写入具体的数据信息。
服务模版
Servant.h中给出了实现RPC服务的模版,它的主要内容如下:
class Servant
{
public:
Servant()
{
is = new TarsInputStream();
os = new TarsOutputStream();
}
~Servant()
{
is=NULL;
os=NULL;
}
void setName(const string &name)
{
_name = name;
}
string getName() const
{
return _name;
}
virtual void initialize() = 0;
virtual void destroy() = 0;
virtual int doRequest(const vector<char> &request, vector<char> &response) { return -1; }
protected:
string _name;
TarsInputStream *is;
TarsOutputStream *os;
};
每个具体的服务都有它们对应的名字,这个名字在_name字段中给出,同时它利用到了Tars.h中实现的TarsInputStream和TarsOutputStream来对输入中的字节数据进行解码,并对最终的调用执行结果(返回输出)进行编码。每一个具体的服务,需要继承Servant这个基类,并实现其中的initialize()、destroy()以及doRequest()三个函数,同时主要的RPC处理是在doRequest()这个函数中进行的。
服务管理
ServantManager.h文件则实现了对不同的RPC服务进行管理,其主要内容如下:
class ServantManager
{
public:
template <typename T>
void addServant(const string &id)
{
if (servant_creator.find(id) != servant_creator.end())
{
cerr << "ServantManager::addServant id: " << id << " already exists!" << endl;
throw runtime_error("ServantManager::addServant id: " + id + " already exists!");
}
shared_ptr<ServantCreation<T>> p = make_shared<ServantCreation<T>>();
servant_creator[id] = p;
}
ServantPtr create(const string &id)
{
ServantPtr servant = NULL;
if (servant_creator.find(id) != servant_creator.end())
{
cout << "ServantManager::create servant name " << id << endl;
servant = servant_creator[id]->create(id);
}
return servant;
}
int serve(const vector<char> &request, vector<char> &response)
{
TarsInputStream *is = new TarsInputStream();
is->setBuffer(request);
String op;
is->read(op, 0);
ServantPtr p = create(op);
if (p != NULL)
{
p->doRequest(request, response);
return 0;
}
else
{
return -1;
}
}
protected:
map<string, ServantHelperCreationPtr> servant_creator;
};
其中,addServant()函数由服务端根据自身的业务需求添加相应的服务处理函数;create()函数根据RPC请求的函数名,创建一个具体的RPC处理实例,处理客户端的RPC请求;最后,serve()函数则是解析客户端请求中需要调用的RPC服务,来选择不同的RPC请求,并调用create()函数创建具体的实例。
Sum服务
我们利用Sevant模版,实现远程调用 float sum(float a, float b) ,其主要函数doRequest实现如下:
int SumImp::doRequest(const vector<char> &request, vector<char> &response)
{
cout<<"SumImp::doRequest"<<endl;
is->setBuffer(request);
tars::Float i1,i2;
is->read(i1,1);
is->read(i2,2);
tars::Float i=i1+i2;
os->write(i,1);
response=os->getByteBuffer();
return 0;
}
Uppercase服务
我们利用Sevant模版,实现远程调用 string uppercase(str),其主要函数doRequest实现如下:
int UppercaseImp::doRequest(const vector<char> &request, vector<char> &response)
{
cout<<"UppercaseImp::doRequest"<<endl;
is->setBuffer(request);
tars::String s;
is->read(s,1);
transform(s.begin(), s.end(), s.begin(), ::toupper);
os->write(s,1);
response=os->getByteBuffer();
return 0;
}
模块测试
编写测试代码对两个服务进行测试,内容如下:
int main(){
ServantManager sm;
sm.addServant<SumImp>("sum");
sm.addServant<UppercaseImp>("uppercase");
TarsOutputStream *os1=new TarsOutputStream();
String op1=String("sum");
Float x=10.1;
Float y=12.2;
os1->write(op1,0);
os1->write(x,1);
os1->write(y,2);
vector<char> req1,res1;
req1=os1->getByteBuffer();
int b1=sm.serve(req1,res1);
cout<<b1<<endl;
TarsInputStream *is1=new TarsInputStream();
is1->setBuffer(res1);
Float r1;
is1->read(r1,1);
cout<<"sum result: "<<r1<<endl;
TarsOutputStream *os2=new TarsOutputStream();
String op2=String("uppercase");
String s=String("abcdefg");
os2->write(op2,0);
os2->write(s,1);
vector<char> req2,res2;
req2=os2->getByteBuffer();
int b2=sm.serve(req2,res2);
cout<<b2<<endl;
TarsInputStream *is2=new TarsInputStream();
is2->setBuffer(res2);
String r2;
is2->read(r2,1);
cout<<r2<<endl;
return 0;
}
经过编译,最终测试输出的内容如下: 测试结果与预期相符。
|