IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Tars RPC -> 正文阅读

[网络协议]Tars RPC

Linux内核

Socket

Socket 起源于 Unix ,Unix/Linux 基本哲学之一就是“一切皆文件”,都可以用“打开(open) –> 读写(write/read) –> 关闭(close)”模式来进行操作。因此 Socket 也被处理为一种特殊的文件或者文件描述符。

Socket 是对 TCP/IP 协议族的一种封装,是应用层与 TCP/IP 协议族通信的中间软件抽象层。Socket 还可以认为是一种网络间不同计算机上的进程通信的一种方法,利用三元组(IP地址,协议,端口号)就可以唯一标识网络中的进程,网络中的进程通信可以利用这个标志与其它进程进行交互。

五层网络模型:应用层(HTTP/SMTP)/传输层(TCP/UDP)/网络层(IP)/链路层/物理层;

socket.h

使用如下语句导入linux内核下的socket库

#include <sys/types.h> 
#include <sys/socket.h>

具体关于库的描述和说明可以在命令行下使用如下命令查看:

man socket

调用setsockopt或者getsockopt可以设置或者获取的一些选项:
在这里插入图片描述

内核接收网络数据全过程

  1. 内核等待网络数据,阻塞进程(阻塞的进程不会占用CPU);
  2. 计算机收到了对端传送的数据;
  3. 数据经由网卡传送到内存;
  4. 网卡通过中断信号通知CPU有数据到达,CPU执行中断程序;
  5. 中断程序主要有两项功能,先将网络数据写入到对应socket的接收缓冲区里面,再唤醒进程;

问题

  1. 操作系统如何知道网络数据对应于哪个socket?
    因为一个socket对应着一个端口号,而网络数据包中包含了ip和端口的信息,内核可以通过端口号找到对应的socket。当然,为了提高处理速度,操作系统会维护端口号到socket的索引结构,以快速读取。
  2. 如何同时监视多个socket的数据?
    多路复用。

epoll

epoll的存在就是为了高效地管理多个socket,实现多路复用。epoll的工作流程如下:

  1. 通过调用epoll_create,创建epoll对象,同时会在文件系统中产生相应的文件描述符;
    在这里插入图片描述
  2. 维护监事列表,通过调用epoll_ctl添加或删除所要监听的socket;
    在这里插入图片描述
  3. 接收数据,当socket收到数据后,中断程序会给eventpoll的rdlist添加socket引用;
    在这里插入图片描述
  4. 阻塞和唤醒进程,当进程调用epoll_wait语句时,内核将进程放入eventpool的等待队列中,并挂起进程;当socket接收到数据,中断程序一方面修改rdlist,另一方面唤醒eventpoll等待队列中的进程,进程再次进入运行状态。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    编码类型表
    在这里插入图片描述

编码模块

由于数据在传输过程中是以字节串的形式存在的,因此,在传输前后需要分别对数据进行编码和解码,这是一个序列化与反序列化的过程。在对数据进行序列化的过程中,将复杂的数据结构编码成字节串;而在对数据进行反序列化的过程中,则从字节串中还原复杂的数据结构。这个模块主要包括如下四个cpp文件:TarsType.h, Tars.h, Servant.h, ServantManager.h。它们的功能分别是,TarsType.h定义了编码模块中的数据定义类型以及数据存储类型;Tars.h文件是对数据编码以及解码过程的具体实现;Servant.h为服务端向客户端提供服务的基类,后续服务端需要实现具体的业务逻辑,只需要继承Servant,并实现其中的doRequest()函数;由于一个服务端包含的服务不止一个,因此我们利用ServantManger对不同的服务进行管理调度并实现对不同Servant的管理,根据用户的请求调用来选择不同的服务。同时,基于Servant,我们可以方便地实现实验中给出的sum()以及uppercase()远程调用。各个文件之间及类之间的关系如下:
在这里插入图片描述

基本类型

由于在不同编程语言中,数据的基本类型不同,因此我们对其进行归纳,将数据的基本类型划分为如下几种:

基本类型含义
Bool布尔类型
Char字符类型
Float单精度浮点数
Dobule双精度浮点数
Int88位带符号整数
Int1616位带符号整数
Int3232位带符号整数
Int6464位带符号整数
UInt88位无符号整数
UInt1616位无符号整数
UInt3232位无符号整数
UInt6464位无符号整数
String字符串类型

其中,Char类型与Int8类型在本质上是相同的,因此在实现过程中我们只需对其中一个进行实现。具体在代码中的定义如下:

复杂类型

除了基本类型之外,我们对基本类型进行不同形式的构造以及组合可以生成复杂类型。

结构

结构定义如下:

struct Test
{
    0  require  string s;
    1  optional int  i;
};

其中第一列的数字0和1分别表示相应字段的标识符Tag,并且Tag的值在[0,255]之间。require表示该字段必选,optional表示该字段可选。

序列

序列vector的定义如下:

vector<int> vi;

字典

字典map的定义如下:

map<int, string> m;

嵌套

任何struct,map,vector都可以嵌套,以构成更加复杂的数据结构。这里我们只给出了复杂类型的定义,但并不给出它们的具体实现方式。我们只在代码中实现了对基本类型的序列化和反序列化方法。

数据编码

每一个数据由两个部分组成,如下所示:

| 头信息 | 实际数据 |

而头部信息包含以下几个部分:

| Type(4 bits) | Tag 1(4 bits) | Tag 2(1 byte) |

Type表示类型,用4个二进制位表示,取值范围是0~15,用来标识该数据的类型。不同类型的数据,其后紧跟着的实际数据的长度和格式都是不一样的,详见一下的类型表。同时,这里的类型与上面提到的基本类型是不一样的,这里的类型仅表示数据的存储类型(如数据长度等存储信息),而上述提到的类型则是数据定义的类型。

取值类型含义
0int1紧跟1个字节的整型数据
1int2紧跟2个字节的整型数据
2int4紧跟4个字节的整型数据
3int8紧跟8个字节的整型数据
4float紧跟4个字节浮点型数据
5double紧跟8个字节浮点型数据
6string1紧跟1个字节长度,再跟内容
7string4紧跟4个字节长度,再跟内容
8map紧跟一个整型数据表示Map的大小,再跟[key, value]对列表
9list紧跟一个整型数据表示List的大小,再跟元素列表
10structbegin自定义结构开始标志
11structend自定义结构结束标志

具体在代码中的定义如下:
在这里插入图片描述
Tag为用户给定的标记,由Tag1和Tag2两部分组成。Tag 2是可选的,当Tag的值不超过14时,只需要用Tag 1就可以表示;当Tag的值超过14而小于256时,Tag 1固定为15,而用Tag 2表示Tag的值。Tag不允许大于255。
为了简化实现,我们在代码中按照这样的方式使用Tag:

  1. 客户端要向服务端发起RPC请求调用,客户端需传入请求的函数名及对应的参数;
  2. 客户端传入的请求函数名将用Tag为0来标记,之后传入的所有参数从1开始按序依次标记;
  3. 根据相应的Tag以及数据的定义类型,RPC请求调用的参数进行序列化;
  4. 服务端收到序列化的数据,对收到的数据进行反序列化;
  5. 具体地,服务端会首先解析出Tag为0的字段来确定客户端RPC请求调用的函数名,再依次解析出后续的参数,最后调用相应的函数。

编码实现

编码的具体实现在Tars.h中,其中两个主要的类TarsInputStream以及TarsOutputStream它们分别实现了对数据结构的解码和编码的过程,即反序列化以及序列化。同时这两个类又分别继承自BufferReader以及BufferWriter类,这两个类主要是对缓冲读取器和缓冲写入器的封装,具体地,它们的定义如下:

 class BufferReader
    {
    public:
        const char *_buf; ///< 缓冲区
        size_t _buf_len;  ///< 缓冲区长度
        size_t _cur;      ///< 当前位置

    public:
        BufferReader() : _buf(NULL), _buf_len(0), _cur(0) {}
        ~BufferReader()
        {
            delete[] _buf;
        }
        void reset();
        
        /// 读取缓存
        void readBuf(void *buf, size_t len);
        
        /// 读取缓存,但不改变偏移量
        void peekBuf(void *buf, size_t len, size_t offset = 0);

        /// 读取缓存 for vector<char>
        void readBuf(std::vector<char> &v, size_t len);

		/// 读取缓存,但不改变偏移量 for vector<char>
        void peekBuf(std::vector<Char> &v, size_t len, size_t offset = 0);
        
        /// 设置缓存
        void setBuffer(const char *buf, size_t len);
        
        /// 设置缓存
        void setBuffer(const std::vector<char> &buf);
        
        /// 判断是否已经到buf的末尾
        bool hasEnd();

		size_t len() const;

		const char *base() const;

		size_t size() const;    
   }

class BufferWriter
    {
    public:
        char *_buf;
        size_t _len;
        size_t _buf_len;

        static char *_reserve(BufferWriter &os, size_t len)
        {
            char *p = new char[(len)];
            memcpy(p, (os)._buf, (os)._len);
            delete[](os)._buf;
            return p;
        }

    public:
        BufferWriter() : _buf(NULL), _len(0), _buf_len(0) {}
        ~BufferWriter()
        {
            delete[] _buf;
        }
        void reset();
        
        /// 添加缓存内容
        void writeBuf(const void *buf, size_t len);
        
        /// 获取缓存内容
        std::vector<char> getByteBuffer() const;

		size_t len() const;

		const char *base() const;

		size_t size() const;   
        
     }

其中TarsInputStream继承BufferReader类,并实现类Read()函数,其主要功能为从缓冲区中读取对应Tag的数据,并将数据解析为相应的类型,这里的类型即为上面给出的数据定义类型中的基本类型。这里给出将数据读取为Char类型的具体实现,其他类型过程相似:

        void read(Char &c, uint8_t tag)
        {
            uint8_t headType = 0, headTag = 0;
            bool skipFlag = false;
            TarsSkipToTag(skipFlag, tag, headType, headTag);
            if (skipFlag)
            {
                switch (headType)
                {
                case TarsHeadeChar:
                    TarsReadTypeBuf(*this, c, Char);
                    break;
                default:
                {
                    char s[64];
                    snprintf(s, sizeof(s), "read 'Char' type mismatch, tag: %d, get type: %d.", tag, headType);
                    throw TarsDecodeMismatch(s);
                }
                }
            }
            else
            {
                char s[64];
                snprintf(s, sizeof(s), "require field not exist, tag: %d, headTag: %d.", tag, headTag);
                throw TarsDecodeRequireNotExist(s);
            }
        }

读取的过程为,首先跳转到Buffer中指定Tag的位置,然后判断跳转是否成功,如果跳转成功,则根据头部的Type字段判断数据是否能够被解析为Char类型,如果可以,则将数据以Char类型读出;其他情况均为读取失败,返回提示信息。

其中TarsOutputStream继承BufferWriter类,并实现类Write()函数,其主要功能为向缓冲区中写入对应Tag的数据,并将数据编码为相应的类型,这里的类型即为上面给出的数据定义类型中的基本类型。这里给出将数据读取为Char类型的具体实现,其他类型过程相似:

        void write(Char c, uint8_t tag)
        {
            TarsWriteToHead(*this, TarsHeadeChar, tag);
            TarsWriteToBody(*this, c, (*this)._len, Char);
        }

写入的实现过程比较简单,其主要做的就是向Buffer末尾首先写入头部信息(带上指定的Tag),然后Buffer写入具体的数据信息。

服务模版

Servant.h中给出了实现RPC服务的模版,它的主要内容如下:

	class Servant
	{
	public:
		Servant()
		{
			is = new TarsInputStream();
			os = new TarsOutputStream();
		}

		~Servant()
		{
			is=NULL;
			os=NULL;
		}

		void setName(const string &name)
		{
			_name = name;
		}

		string getName() const
		{
			return _name;
		}

		virtual void initialize() = 0;

		virtual void destroy() = 0;

		virtual int doRequest(const vector<char> &request,  vector<char> &response) { return -1; }

	protected:

		string _name;

		TarsInputStream *is;
		TarsOutputStream *os;
	};

每个具体的服务都有它们对应的名字,这个名字在_name字段中给出,同时它利用到了Tars.h中实现的TarsInputStream和TarsOutputStream来对输入中的字节数据进行解码,并对最终的调用执行结果(返回输出)进行编码。每一个具体的服务,需要继承Servant这个基类,并实现其中的initialize()、destroy()以及doRequest()三个函数,同时主要的RPC处理是在doRequest()这个函数中进行的。

服务管理

ServantManager.h文件则实现了对不同的RPC服务进行管理,其主要内容如下:

   class ServantManager
    {
    public:
        template <typename T>
        void addServant(const string &id)
        {
            if (servant_creator.find(id) != servant_creator.end())
            {
                cerr << "ServantManager::addServant id: " << id << " already exists!" << endl;
                throw runtime_error("ServantManager::addServant id: " + id + " already exists!");
            }
            shared_ptr<ServantCreation<T>> p = make_shared<ServantCreation<T>>();
            //ServantCreation<T>* p=new ServantCreation<T>();
            servant_creator[id] = p;
        }

        ServantPtr create(const string &id)
        {
            ServantPtr servant = NULL;
            if (servant_creator.find(id) != servant_creator.end())
            {
                cout << "ServantManager::create servant name " << id << endl;
                servant = servant_creator[id]->create(id);
            }
            return servant;
        }

        int serve(const vector<char> &request, vector<char> &response)
        {
            TarsInputStream *is = new TarsInputStream();
            is->setBuffer(request);
            String op;
            is->read(op, 0);
            ServantPtr p = create(op);
            if (p != NULL)
            {
                p->doRequest(request, response);
                return 0;
            }
            else
            {
                return -1;
            }
        }

    protected:
        map<string, ServantHelperCreationPtr> servant_creator;

        //TC_ThreadMutex _mutex;
    };

其中,addServant()函数由服务端根据自身的业务需求添加相应的服务处理函数;create()函数根据RPC请求的函数名,创建一个具体的RPC处理实例,处理客户端的RPC请求;最后,serve()函数则是解析客户端请求中需要调用的RPC服务,来选择不同的RPC请求,并调用create()函数创建具体的实例。

Sum服务

我们利用Sevant模版,实现远程调用 float sum(float a, float b) ,其主要函数doRequest实现如下:

int SumImp::doRequest(const vector<char> &request,  vector<char> &response)
{
	cout<<"SumImp::doRequest"<<endl;

	is->setBuffer(request);
	tars::Float i1,i2;
	is->read(i1,1);
	is->read(i2,2);

	tars::Float i=i1+i2;
	
	os->write(i,1);
	response=os->getByteBuffer();

	return 0;
}

Uppercase服务

我们利用Sevant模版,实现远程调用 string uppercase(str),其主要函数doRequest实现如下:

int UppercaseImp::doRequest(const vector<char> &request,  vector<char> &response)
{
	cout<<"UppercaseImp::doRequest"<<endl;
	
	is->setBuffer(request);
	tars::String s;
	is->read(s,1);

	transform(s.begin(), s.end(), s.begin(), ::toupper);  

	os->write(s,1);
	response=os->getByteBuffer();

	return 0;
}

模块测试

编写测试代码对两个服务进行测试,内容如下:

int main(){
    //创建一个ServantManger
    ServantManager sm;
    
    //添加add服务接口
    sm.addServant<SumImp>("sum");
    sm.addServant<UppercaseImp>("uppercase");

    /// sum test
    TarsOutputStream *os1=new TarsOutputStream();
    String op1=String("sum");
    Float x=10.1;
    Float y=12.2;
    os1->write(op1,0);
    os1->write(x,1);
    os1->write(y,2);

    vector<char> req1,res1;
    req1=os1->getByteBuffer();
    int b1=sm.serve(req1,res1);
    cout<<b1<<endl;

    TarsInputStream *is1=new TarsInputStream();
    is1->setBuffer(res1);
    Float r1;
    is1->read(r1,1);
    cout<<"sum result: "<<r1<<endl;

    /// uppercasetest
    TarsOutputStream *os2=new TarsOutputStream();
    String op2=String("uppercase");
    String s=String("abcdefg");
    os2->write(op2,0);
    os2->write(s,1);

    vector<char> req2,res2;
    req2=os2->getByteBuffer();
    int b2=sm.serve(req2,res2);
    cout<<b2<<endl;

    TarsInputStream *is2=new TarsInputStream();
    is2->setBuffer(res2);
    String r2;
    is2->read(r2,1);
    cout<<r2<<endl;


    return 0;
}

经过编译,最终测试输出的内容如下:
在这里插入图片描述
测试结果与预期相符。

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-11-30 15:58:44  更:2021-11-30 15:59:51 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/6 21:43:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码