muduo Buffer类的设计与使用
muduo的IO模型
event loop是non-blocking网络编程的核心,non-blocking总是和IO multiplexing一起使用,原因如下:
- 不要使用轮询的方式来检查某个
non-blockingIO操作是否完成,这样浪费CPU。 IO multiplexing一般不能和blocking IO一起使用,因为blocking IO中read()/write()/accept()/connect()都有可能阻塞当前线程,这样线程就没有办法处理其他socket上的IO事件了。
为什么non-blocking网络编程中应用层buffer是必需的
non-blockingIO的核心思想是避免阻塞在read()或write()或其他IO系统调用上,这样可以最大限度地复用thread-of-control,让一个线程能服务于多个socket连接。IO线程只能阻塞在IO multiplexing函数上,如select/poll/epoll_wait,这样一来,应用层的缓冲是必需的,每个TCPsocket都要个stateful的input buffer和output buffer。
TcpConnection必须要有output buffer,考虑一个常见场景:程序想通过TCP连接发送100KB的数据,但是在write()调用中,操作系统只接受了80KB,这时肯定不想在原地等待,因为不知道等待多久,程序应该尽快交出控制权,返回event loop,这种情况下,剩余的20KB应该怎么办?
对于应用程序而言,它只管生成数据,它不应该关心到底数据是一次性发送还是分成几次发送,这些应该由网络库接管,程序只要调用TcpConnection::send()就可以了,网络库应该接管剩余的20KB,把它保存在该TCPconnection的output buffer里面,然后注册POLLOUT事件,一旦socket可写就立刻发送数据,当然,第二次write()也不一定能完全写入20KB,如果还有剩余,网络库应该继续关注POLLOUT事件,如果写完了20KB,网络库应该关闭POLLOUT。
如果程序又写入了50KB,这时output buffer里面还有待发送的20KB数据,那么网络库不应该直接调用write(),而应该把这50KB数据append在20KB数据之后,等socket变得可写的时候再一并写入。
如果output buffer里面还有待发送的数据,而程序又想关闭连接,那么这时候网络库不能立刻关闭连接,而要等数据发送完毕。
综上,要让程序在write操作上不阻塞,网络库必需要给每个TCPconnection配置output buffer。
TcpConnection必需要有input buffer,TCP是一个无边界的字节流协议,接收方必须要处理”收到的数据尚不构成一条完整的消息“和”一次收到两条消息的数据“等情况。一个常见的场景是,发送方send()了两条1KB的消息(共2KB),接收方收到数据的情况可能是:
- 一次性收到2KB数据
- 分两次收到,第一次600B,第二次1400B
- 等等
网络库在处理”socket可读”事件的时候,必须一次性把socket里的数据读完(从操作系统buffer搬到应用层buffer),否则会反复触发POLLIN事件,造成busy-loop,那么网络库必然要应对“数据不完整”的情况,收到的数据先放到input buffer里面,等构成一条完整的消息再通知程序的业务逻辑。所以,在TCP网络编程中,网络库必须要给每个TCPconnection配置input buffer。
muduo EventLoop采用的是epoll()的level trigger,而不是edge trigger。一是为了与传统的poll()兼容,因为在文件描述符数目较少,活动文件描述符比例较高时,epoll不见得比poll高效,必要时可以在进程启动时切换Poller,二是level trigger编程更容易。
所有muduo中的IO都是带缓冲的IO(buffered IO),你不会自己去read()或write()某个socket,只会操作TcpConnection的input buffer和output buffer。更确切地说,是在onMessage()回调里读取input buffer;调用TcpConnection::send()来间接操作output buffer,一般不会直接操作output buffer。
7.4.3 Buffer的功能需求
muduo Buffer的设计要点:
- 对外表现为一块连续的内存
(char* p, int len),方便客户代码的编写。 - 其
size()可以自动增长,以适应不同大小的消息。 - 内部以
std::vector<char>来保存数据,并提供相应的访问函数。
TcpConnection会有两个Buffer成员,input buffer与output buffer。
input buffer,TcpConnection从socket读取数据,然后写入input buffer;客户代码从input buffer读取。output buffer,客户代码会把数据写入output buffer;TcpConnection从output buffer读取数据并写入socket。
具体做法,在栈上准备一个65536字节的extrabuf,然后利用readv()来读取数据,iovec有两块,第一块指向muduo Buffer中的writable字节,另一块指向栈上extrabuf。如果这样读入的数据不多,那么全部都读到了Buffer中去了;如果长度超过Buffer的writable字节数,就会读到栈上的extrabuf里面,然后程序再把extrabuf里的数据append()到Buffer中。
这么做利用了临时栈空间,避免每个连接的初始Buffer过大造成的内存浪费,也避免反复调用read()的系统开销,
线程安全? muduo::net::Buffer不是线程安全的。
- 对于
input buffer,onMessage()回调始终发生在该TcpConnection所属的那个IO线程,应用程序应该在onMessage()完成对input buffer的操作,并且不要把input buffer暴露给其他线程。这样所有对input buffer的操作都在同一个线程,Buffer class不必是线程安全的。 - 对于
output buffer,应用程序不会直接操作它, 而是调用TcpConnection::send()来发送数据,后者是线程安全的。
代码中用EventLoop::assertInLoopThread()保证以上假设成立。
如果TcpConnection::send()调用发生在该TcpConnection所述的那个IO线程,那么它会转而调用TcpConnection::sendInLoop(),sendInLoop()会在当前线程操作output buffer;如果TcpConnection::send()调用发生在其他线程,它不会在当前线程调用sendInLoop(),而是通过EventLoop::runInLoop()把sendInLoop()函数转移到IO线程。这样sendInLoop()还会在IO线程操作output buffer,不会有线程安全问题,当然,跨线程的函数转移调用涉及函数参数的跨线程传递,一种简单的做法是把数据拷贝一份,绝对安全。
Buffer的数据结构
Buffer的内部是一个std::vector<char>,是一块连续的内存。Buffer有两个data member,指向该vector中的元素。这两个index的类型是int。
示意图中表示指针或下标的箭头所指位置的具体含义,对于长度为10的字符串“Chen shuo\n”,p0指向第0个字符(白色区域的开始),p1指向第5个字符(灰色区域的开始),p2指向\n之后的那个位置。

 两个index把vector分成三块,prependable,readable,writable,各块的大小如下。灰色部分是Buffer的有效载荷。
p
r
e
p
e
n
d
a
b
l
e
=
r
e
a
d
I
n
d
e
x
r
e
a
d
a
b
l
e
=
w
r
i
t
e
I
n
d
e
x
?
r
e
a
d
I
n
d
e
x
w
i
r
t
a
b
l
e
=
s
i
z
e
(
)
?
w
r
i
t
e
I
n
d
e
x
prependable = readIndex \\ readable = writeIndex - readIndex \\ wirtable = size() - writeIndex
prependable=readIndexreadable=writeIndex?readIndexwirtable=size()?writeIndex readIndex和writeIndex满足以下不等式
0
≤
r
e
a
d
I
n
d
e
x
≤
w
r
i
t
e
I
n
d
e
x
≤
s
i
z
e
(
)
0\le readIndex \le writeIndex \le size()
0≤readIndex≤writeIndex≤size() muduo Buffer里有两个常数kCheapPrepend和kInitialSize,定义了prependable的初始大小和writable的初始大小,readable的初始大小为0。在初始化之后,Buffer的数据结构如下图所示。
 根据以上公式,可计算出各块的大小,刚刚初始化的Buffer没有payload数据,所以readable == 0。
7.4.5 Buffer操作
Buffer初始化后的情况见图7-4。如果向Buffer写入了200字节,其布局如图7-6所示。
 图7-6中writeIndex向后移动了200字节,readIndex保持不变,readable和writeable的值变化。
如果从Buffer read() & retrieve()了50字节,结果如下图7-7所示。与图7-6相比,readIndex向后移动50字节。writeIndex保持不变,readable和writable改变。
 然后又写入200字节,writeIndex向后移动了200字节,readIndex保持不变,如图7-8所示。
 接下来一次性读取350字节。优于全部数据读完了,readIndex和writeIndex返回原位以备新一轮使用。

自动增长
muduoBuffer不是固定长度的,可以自动增长,是使用vector的优点。假设当前的状态如图7-10所示。

客户代码一次性写入1000字节,而当前可写的字节数只有624,那么buffer会自动增长以容纳全部数据,结果如下图所示。readIndex返回到了前面,以保持prependable等于kCheapPrependable。由于vector重新分配了内存,原来指向其元素的指针会失效,这就是为什么readIndex和writeIndex是整数下标,而不是指针。
 然后读入350字节,readIndex前移,如图7-12所示。
 最后,读完剩下的1000字节,readIndex和writeIndex返回kCheapPrependable,如下图7-13所示。
 注意buffer并没有缩小大小,下次写入1350字节就不会重新分配内存了。
size()和capacity()
使用vector的另一个好处是它的capacity()机制减少了内存分配的次数。vector的capacity()以指数方式增长,让push_back()的平均复杂度是常数。
内部腾挪
有时候,经过若干次读写,readIndex移动到了比较靠后的位置,留下了巨大的prependable空间,如下图所示。

这时候,如果想写入300字节,而writable只有200字节,这时候,Muduo Buffer不会重新分配内存,而是把已有的数据移动到前面去,腾出writable空间。如下图所示。
 然后就可以写入300字节,如下图所示。
 这么做的原因,如果重新分配内存,反正也是要把数据拷贝到新分配的内存区域,代价只会更大。
前方添加(prepend)
muduo Buffer提供prependable空间,让程序能以很低的代价在数据前面添加几个字节。
比方说,程序以固定的4个字节表示消息的长度,如果需要序列化一个消息,但是不知道它有多长,那么可以一直append()直到序列化完成,然后再往序列化数据的前面添加消息的长度。

 通过预留kCheapPrependable空间,可以简化客户代码,以空间换时间。
|