muduo Buffer类的设计与使用
muduo的IO模型
event loop 是non-blocking 网络编程的核心,non-blocking 总是和IO multiplexing 一起使用,原因如下:
- 不要使用轮询的方式来检查某个
non-blocking IO操作是否完成,这样浪费CPU。 IO multiplexing 一般不能和blocking IO 一起使用,因为blocking IO 中read()/write()/accept()/connect() 都有可能阻塞当前线程,这样线程就没有办法处理其他socket 上的IO事件了。
为什么non-blocking网络编程中应用层buffer是必需的
non-blocking IO的核心思想是避免阻塞在read() 或write() 或其他IO系统调用上,这样可以最大限度地复用thread-of-control ,让一个线程能服务于多个socket 连接。IO线程只能阻塞在IO multiplexing 函数上,如select/poll/epoll_wait ,这样一来,应用层的缓冲是必需的,每个TCPsocket 都要个stateful 的input buffer 和output buffer 。
TcpConnection 必须要有output buffer ,考虑一个常见场景:程序想通过TCP连接发送100KB的数据,但是在write() 调用中,操作系统只接受了80KB,这时肯定不想在原地等待,因为不知道等待多久,程序应该尽快交出控制权,返回event loop ,这种情况下,剩余的20KB应该怎么办?
对于应用程序而言,它只管生成数据,它不应该关心到底数据是一次性发送还是分成几次发送,这些应该由网络库接管,程序只要调用TcpConnection::send() 就可以了,网络库应该接管剩余的20KB,把它保存在该TCPconnection 的output buffer 里面,然后注册POLLOUT 事件,一旦socket 可写就立刻发送数据,当然,第二次write() 也不一定能完全写入20KB,如果还有剩余,网络库应该继续关注POLLOUT 事件,如果写完了20KB,网络库应该关闭POLLOUT 。
如果程序又写入了50KB,这时output buffer 里面还有待发送的20KB数据,那么网络库不应该直接调用write() ,而应该把这50KB数据append 在20KB数据之后,等socket 变得可写的时候再一并写入。
如果output buffer 里面还有待发送的数据,而程序又想关闭连接,那么这时候网络库不能立刻关闭连接,而要等数据发送完毕。
综上,要让程序在write 操作上不阻塞,网络库必需要给每个TCPconnection 配置output buffer 。
TcpConnection必需要有input buffer,TCP是一个无边界的字节流协议,接收方必须要处理”收到的数据尚不构成一条完整的消息“和”一次收到两条消息的数据“等情况。一个常见的场景是,发送方send() 了两条1KB的消息(共2KB),接收方收到数据的情况可能是:
- 一次性收到2KB数据
- 分两次收到,第一次600B,第二次1400B
- 等等
网络库在处理”socket 可读”事件的时候,必须一次性把socket 里的数据读完(从操作系统buffer 搬到应用层buffer ),否则会反复触发POLLIN 事件,造成busy-loop ,那么网络库必然要应对“数据不完整”的情况,收到的数据先放到input buffer 里面,等构成一条完整的消息再通知程序的业务逻辑。所以,在TCP网络编程中,网络库必须要给每个TCPconnection 配置input buffer 。
muduo EventLoop 采用的是epoll() 的level trigger ,而不是edge trigger 。一是为了与传统的poll() 兼容,因为在文件描述符数目较少,活动文件描述符比例较高时,epoll 不见得比poll 高效,必要时可以在进程启动时切换Poller ,二是level trigger 编程更容易。
所有muduo中的IO都是带缓冲的IO(buffered IO ),你不会自己去read() 或write() 某个socket ,只会操作TcpConnection 的input buffer 和output buffer 。更确切地说,是在onMessage() 回调里读取input buffer ;调用TcpConnection::send() 来间接操作output buffer ,一般不会直接操作output buffer 。
7.4.3 Buffer的功能需求
muduo Buffer 的设计要点:
- 对外表现为一块连续的内存
(char* p, int len) ,方便客户代码的编写。 - 其
size() 可以自动增长,以适应不同大小的消息。 - 内部以
std::vector<char> 来保存数据,并提供相应的访问函数。
TcpConnection 会有两个Buffer 成员,input buffer 与output buffer 。
input buffer ,TcpConnection 从socket 读取数据,然后写入input buffer ;客户代码从input buffer 读取。output buffer ,客户代码会把数据写入output buffer ;TcpConnection 从output buffer 读取数据并写入socket 。
具体做法,在栈上准备一个65536字节的extrabuf ,然后利用readv() 来读取数据,iovec 有两块,第一块指向muduo Buffer 中的writable 字节,另一块指向栈上extrabuf 。如果这样读入的数据不多,那么全部都读到了Buffer 中去了;如果长度超过Buffer 的writable 字节数,就会读到栈上的extrabuf 里面,然后程序再把extrabuf 里的数据append() 到Buffer 中。
这么做利用了临时栈空间,避免每个连接的初始Buffer 过大造成的内存浪费,也避免反复调用read() 的系统开销,
线程安全? muduo::net::Buffer 不是线程安全的。
- 对于
input buffer ,onMessage() 回调始终发生在该TcpConnection 所属的那个IO线程,应用程序应该在onMessage() 完成对input buffer 的操作,并且不要把input buffer 暴露给其他线程。这样所有对input buffer 的操作都在同一个线程,Buffer class 不必是线程安全的。 - 对于
output buffer ,应用程序不会直接操作它, 而是调用TcpConnection::send() 来发送数据,后者是线程安全的。
代码中用EventLoop::assertInLoopThread() 保证以上假设成立。
如果TcpConnection::send() 调用发生在该TcpConnection 所述的那个IO线程,那么它会转而调用TcpConnection::sendInLoop() ,sendInLoop() 会在当前线程操作output buffer ;如果TcpConnection::send() 调用发生在其他线程,它不会在当前线程调用sendInLoop() ,而是通过EventLoop::runInLoop() 把sendInLoop() 函数转移到IO线程。这样sendInLoop() 还会在IO线程操作output buffer ,不会有线程安全问题,当然,跨线程的函数转移调用涉及函数参数的跨线程传递,一种简单的做法是把数据拷贝一份,绝对安全。
Buffer的数据结构
Buffer 的内部是一个std::vector<char> ,是一块连续的内存。Buffer 有两个data member ,指向该vector 中的元素。这两个index 的类型是int 。
示意图中表示指针或下标的箭头所指位置的具体含义,对于长度为10的字符串“Chen shuo\n”,p0 指向第0个字符(白色区域的开始),p1 指向第5个字符(灰色区域的开始),p2 指向\n 之后的那个位置。
两个index 把vector 分成三块,prependable ,readable ,writable ,各块的大小如下。灰色部分是Buffer 的有效载荷。
p
r
e
p
e
n
d
a
b
l
e
=
r
e
a
d
I
n
d
e
x
r
e
a
d
a
b
l
e
=
w
r
i
t
e
I
n
d
e
x
?
r
e
a
d
I
n
d
e
x
w
i
r
t
a
b
l
e
=
s
i
z
e
(
)
?
w
r
i
t
e
I
n
d
e
x
prependable = readIndex \\ readable = writeIndex - readIndex \\ wirtable = size() - writeIndex
prependable=readIndexreadable=writeIndex?readIndexwirtable=size()?writeIndex readIndex 和writeIndex 满足以下不等式
0
≤
r
e
a
d
I
n
d
e
x
≤
w
r
i
t
e
I
n
d
e
x
≤
s
i
z
e
(
)
0\le readIndex \le writeIndex \le size()
0≤readIndex≤writeIndex≤size() muduo Buffer 里有两个常数kCheapPrepend 和kInitialSize ,定义了prependable 的初始大小和writable 的初始大小,readable 的初始大小为0。在初始化之后,Buffer 的数据结构如下图所示。
根据以上公式,可计算出各块的大小,刚刚初始化的Buffer 没有payload 数据,所以readable == 0 。
7.4.5 Buffer操作
Buffer 初始化后的情况见图7-4。如果向Buffer 写入了200字节,其布局如图7-6所示。
图7-6中writeIndex 向后移动了200字节,readIndex 保持不变,readable 和writeable 的值变化。
如果从Buffer read() & retrieve() 了50字节,结果如下图7-7所示。与图7-6相比,readIndex 向后移动50字节。writeIndex 保持不变,readable 和writable 改变。
然后又写入200字节,writeIndex 向后移动了200字节,readIndex 保持不变,如图7-8所示。
接下来一次性读取350字节。优于全部数据读完了,readIndex 和writeIndex 返回原位以备新一轮使用。
自动增长
muduoBuffer 不是固定长度的,可以自动增长,是使用vector 的优点。假设当前的状态如图7-10所示。
客户代码一次性写入1000字节,而当前可写的字节数只有624,那么buffer 会自动增长以容纳全部数据,结果如下图所示。readIndex 返回到了前面,以保持prependable 等于kCheapPrependable 。由于vector 重新分配了内存,原来指向其元素的指针会失效,这就是为什么readIndex 和writeIndex 是整数下标,而不是指针。
然后读入350字节,readIndex 前移,如图7-12所示。
最后,读完剩下的1000字节,readIndex 和writeIndex 返回kCheapPrependable ,如下图7-13所示。
注意buffer 并没有缩小大小,下次写入1350字节就不会重新分配内存了。
size()和capacity()
使用vector 的另一个好处是它的capacity() 机制减少了内存分配的次数。vector 的capacity() 以指数方式增长,让push_back() 的平均复杂度是常数。
内部腾挪
有时候,经过若干次读写,readIndex 移动到了比较靠后的位置,留下了巨大的prependable 空间,如下图所示。
这时候,如果想写入300字节,而writable 只有200字节,这时候,Muduo Buffer不会重新分配内存,而是把已有的数据移动到前面去,腾出writable 空间。如下图所示。
然后就可以写入300字节,如下图所示。
这么做的原因,如果重新分配内存,反正也是要把数据拷贝到新分配的内存区域,代价只会更大。
前方添加(prepend)
muduo Buffer提供prependable 空间,让程序能以很低的代价在数据前面添加几个字节。
比方说,程序以固定的4个字节表示消息的长度,如果需要序列化一个消息,但是不知道它有多长,那么可以一直append() 直到序列化完成,然后再往序列化数据的前面添加消息的长度。
通过预留kCheapPrependable 空间,可以简化客户代码,以空间换时间。
|