3 Lab 4: The TCP connection
要点
- 根据任务指导将发送报文段和接受报文段的过程在
TCPConnection 的方法中进行实现 - TCP 的四次挥手过程的实现, 即 TCP 的正常关闭(clean shutdown). 特别是关于是否延迟关闭(linger after streams finish)的判断. TCP 可以正常关闭的 4 个前提是:
- Prereq #1: 入站流(接收的数据流)已经被完全重组且已经结束.
- Prereq #2: 出站流(发送的数据流)已由本地应用程序结束写入并且完全发送(包括结束带有 FIN 标志位的报文段)到远程对等点.
- Prereq #3: 出站流已被远程对等点完全确认.
- Prereq #4: 本地 TCP 连接确信远程对等点可以满足前提 #3.
思路
直接调用成员方法
根据任务指导 FAQs 部分, 可以实验可以从 remaining_outbound_capacity() , bytes_in_flight() 和 unassembled_bytes() 这几个函数开始. 这些函数的实现十分简单, 可以直接调用 _sender 或 _receiver 的方法即可满足, 在此不多赘述.
发送报文段
在整个 TCPConnection 所要处理的部分中, 发送报文段的内容相对比较简单和清晰, 可以直接按照任务指导完成. 不过根据 TCPConnection 提供的公共方法来看, 并没有专门用于发送的方法, 而是作为其中一些方法中的一部分存在的. 考虑到方法会多次重用, 笔者定义了一个私有方法 send_segments() 用于发送报文段. 代码如下:
void TCPConnection::send_segments() {
while (!_sender.segments_out().empty()) {
TCPSegment segment = _sender.segments_out().front();
_sender.segments_out().pop();
optional<WrappingInt32> ackno = _receiver.ackno();
if (ackno.has_value()) {
segment.header().ack = true;
segment.header().ackno = ackno.value();
}
segment.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max()
? _receiver.window_size()
: numeric_limits<uint16_t>::max();
_segments_out.emplace(segment);
}
}
首先最外层是一个 while 循环, 即将当前 _sender 的发送队列 _sender.segments_out() 中的所有报文段全部转移至 TCPConnection 自己的发送队列 _segments_out . 循环内部主要就是设置根据 _receiver 确定的确认号以及窗口大小. 窗口大小是 uint16_t 类型, _receiver.window_size 是 size_t 类型, 参照 FAQs 利用 numeric_limits 来取最大值, 防止赋值时越界的情况. 而对于发送报文段, 需要重点注意的是发送报文段的时机. 根据任务指导, segment_received() write() tick()``end_input_stream() 以及 connect() 方法均需要调用 send_segments() .
connect() 方法
该方法即用于开始时主动建立 TCP 连接, 因此必然需要 send_segmens() 方法, 而在此之前, 要先让 _sender 构造报文, 即调用 _sender.fill_window() , 初始时发送窗口为 1, 自然就能发送 SYN 报文段.
void TCPConnection::connect() {
_sender.fill_window();
send_segments();
}
write() 方法
该方法即上层应用向 TCP 的字节流中写入数据进行发送. 因此需要调用 _sender.stream_in().write() 方法. 在调用之后同样需要移动发送窗口并且发送报文段, 以及时将写入的数据发送出去.
size_t TCPConnection::write(const string &data) {
if (!_active) {
return 0;
}
size_t ret = _sender.stream_in().write(data);
_sender.fill_window();
send_segments();
return ret;
}
end_input_stream() 方法
该方法即结束需要发送的数据流, 即出站流. 因此需要调用 _sender.steam_in().end_input() 方法. 而结束流的隐含信息是要发送一个 FIN 报文段, 因此此时同样需要移动发送窗口, 并且发送报文段, 以确保 FIN 报文段能够及时发出.
void TCPConnection::end_input_stream() {
_sender.stream_in().end_input();
_sender.fill_window();
send_segments();
}
接收报文段
接收报文段即对应 segment_received() 方法, 也是 TCPConnection 中关键的方法之一. 其主要部分根据任务指导完成, 但其中也隐含有其它一些需要注意的地方, 需要结合 TCP 的状态图去考虑.
void TCPConnection::segment_received(const TCPSegment &seg) {
if (!_active) {
return;
}
_time_since_last_segment_received = 0;
const TCPHeader &header = seg.header();
if (!_receiver.ackno().has_value() && _sender.next_seqno_absolute() == 0) {
if(!header.syn) {
return;
}
_receiver.segment_received(seg);
connect();
return;
}
if (header.rst) {
unclean_shutdown();
return;
}
_receiver.segment_received(seg);
if (!_receiver.ackno().has_value()) {
return;
}
if (!_sender.stream_in().eof() && _receiver.stream_out().input_ended()) {
_linger_after_streams_finish = false;
}
if (header.ack) {
_sender.ack_received(header.ackno, header.win);
}
_sender.fill_window();
if (seg.length_in_sequence_space() > 0 && _sender.segments_out().empty()) {
_sender.send_empty_segment();
}
if (seg.length_in_sequence_space() == 0 && header.seqno == _receiver.ackno().value() - 1) {
_sender.send_empty_segment();
}
send_segments();
}
在接收到报文判断 RST 标志位之前, 首先有一个条件判断 if (!_receiver.ackno().has_value() && _sender.next_seqno_absolute() == 0) . 该判断即当前 TCP 连接既没有收到 SYN 报文段, 也没有发送报文段的情形 此时 TCP 作为一个服务端, 在 LISTEN 状态, 此时只会考虑接收 SYN 报文段, 并且接收到 SYN 报文段发送 SYN+ACK 报文段进入 SYN_RECV 状态. 这里值得一提的一点是, 如果本地和远程同时打开 TCP 连接, 同时发送了 SYN 报文段, 根据 TCP 状态图, 即两个 TCP 连接均处于 SYN_SENT 状态, 这时双方接收到彼此的 SYN 报文段后都应该发 SYN+ACK 报文, 进入 SYN_RECV 状态. 这里笔者考虑单独调用 _receiver.segment_received() 和 connect() 原因正是为了处理这种情况. 如果去掉 15~18 行代码, 仍然能通过测试, 但遇到同时打开的情况时则会发送一个 ACK 的空报文段, 不符合 TCP 的状态转移. 而在这种情况下, 由于没有收到 ACK, 也就不会更新发送窗口, 因此调用 connect() 方法, 乃至用户使用 write() 方法都不会发送报文, 而是等到超时通过 _sender.tick() 方法重发一个 SYN+ACK 报文, 从而满足了 TCP 的正确转移状态的要求(注: 正常情况下 SYN 和 FIN 标志位不会出现在同一报文段中). 接下来即考虑收到 RST 报文段, 进行 TCP 连接的非正常关闭(unclean shutdown). 具体关于 TCP 连接的关闭方式见后文. 在经 _receiver.segment_received() 方法处理后, 仍有一个 if (!_receiver.ackno().has_value()) 的条件判断. 改判断是考虑 TCP 作为客户端, 已发送了一个 SYN 报文段, 此时在 SYN_SENT 状态, 这时若未收到对方的 SYN 报文(但可能收到了 RST 报文, 因此在 RST 报文判断之后), 则为无效报文, 不进行后续操作. 之后是对 _linger_after_streams_finish 变量的设置, 这里涉及 TCP 的正常关闭具体见后文. 紧接着是如果有确认标志位时, 通过远程对等点发送过来的确认号和接收窗口大小来更新本地 TCP 连接的发送窗口. 接下来第一个 if 语句是在收到一个占有序列号的报文段时, 若此时没有数据需要发送, 即 _sender 的发送队列为空, 便构造一个空报文段来进行确认. 第二个 if 语句则是任务指导中提到的特殊情况. 在此之前需要注意的是要调用 _sender.fill_window() 来更新发送窗口, 只有这样才能确定此时 _sender 的发送队列是否真正为空, 以避免发送多余的空报文段. 最后调用 send_segments() 即用于对接收到的报文进行确认.
TCP 连接的关闭
TCP 连接的关闭分为两种: 非正常关闭(unclean shutdown)和正常关闭(clean shutdown), 具体可见任务指导第 5 部分. 此处创建了一个私有成员 _active 用于反应 TCP 的连接状态, 由 active() 方法返回.
unclean shutdown
非正常关闭相对简单, 即收到或发出 RST 报文的情况时, 需要对 TCP 连接进行关闭, 主要就是关闭发送和接收的字节流同时置 _active 为 false .
inline void TCPConnection::unclean_shutdown() {
_sender.stream_in().set_error();
_receiver.stream_out().set_error();
_active = false;
}
发送 RST 报文主要在 tick() 方法和 TCPConnection 的析构函数中, 此处笔者又封装了一个 send_RST() 方法, 实际上就是从发送队列拿去一个报文添加上 RST 标志位.
void TCPConnection::send_RST() {
_sender.fill_window();
if (_sender.segments_out().empty()) {
_sender.send_empty_segment();
}
TCPSegment segment = _sender.segments_out().front();
_sender.segments_out().pop();
optional<WrappingInt32> ackno = _receiver.ackno();
if (ackno.has_value()) {
segment.header().ack = true;
segment.header().ackno = ackno.value();
}
segment.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max()
? _receiver.window_size()
: numeric_limits<uint16_t>::max();
segment.header().rst = true;
_segments_out.emplace(segment);
unclean_shutdown();
}
- 这里笔者有个问题不是很确定: 一是对于 RST 报文段, 实际上不添加确认号和窗口大小也可以通过本实验测试, 是否有必要或者是否应该添加这两个字段? 特别是窗口大小字段, 此时告知对方窗口大小是没有意义的, 因为连接就要关闭了.
clean shutdown
TCP 的正常关闭即 TCP 的四次挥手, 是本实验中最为复杂的地方, 其中也有笔者不是很确定的地方. TCP 正常关闭的四个前提已经在 #要点 中提到, 如下即前提与代码的对应:
- Prereq#1:
_receiver.unassembled_bytes()==0 && _receiver.stream_out().input_ended() , 实际上可以直接转化为 _receiver.stream_out().input_ended() (此处使用 _receiver.stream_out().eof() 方法同样能通过测试, 任务指导中描述使用的 ended, 因此这里笔者选择了前者). - Prereq#2:
_sender.stream_in().eof() . 这里需要是 eof() 方法而非 input_ended() , 因为要确保发送字节流的所有数据已经全部发送出去. - Prereq#3:
_sender.bytes_in_flight()==0
此外, TCP 的主动关闭方会有一个 TIME_WAIT 状态, 需要进行延迟(linger)关闭, 涉及变量 _linger_after_streams_finish . 根据任务指导, 入站流结束但出站流未到达 EOF 时会将该变量置为 false . 即作为被动关闭方是无需延迟的. 此处, 笔者将该变量的设置写在了 segment_received() 方法中, 因为在接收到报文段后会更新入站流 _receiver.stream_in() 的状态, 一旦入站流结束能够在第一时间将 _linger_after_streams_finish 置为 false . 因此主动关闭的代码如下:
if (_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && (!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout)) {
_active = false;
return;
}
前面三个即 TCP 连接正常关闭的前三个前提. 对于 _linger_after_streams_finish 为 false 的情况可以直接关闭; 否则则需要延迟一段时间(2MSL)后关闭.
tick() 方法
根据实验指导, tick() 方法主要就是三个作用, 一是传递给 _sender.tick() 方法; 一个是在超时重发次数超过上限时发送 RST 报文关闭连接, 以及必要时正常关闭连接. 这里有额外两行代码: _time_since_last_segment_received += ms_since_last_tick 是用于更新 _time_since_last_segment_received 变量, 配合 segment_received() 方法中该变量的重置, 达到记录距离收到上个报文的时间的目的. 而最后的 send_segments() 是考虑到 _sender.tick() 方法可能会重发一个报文到其发送队列, 此时再转移至 TCPConnection 的发送队列进行发送.
void TCPConnection::tick(const size_t ms_since_last_tick) {
if (!_active) {
return;
}
_time_since_last_segment_received += ms_since_last_tick;
_sender.tick(ms_since_last_tick);
if (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) {
send_RST();
return;
}
if (_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.bytes_in_flight() == 0 &&
(!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout)) {
_active = false;
return;
}
send_segments();
}
代码
libsponge/tcp_connection.hh
#ifndef SPONGE_LIBSPONGE_TCP_FACTORED_HH
#define SPONGE_LIBSPONGE_TCP_FACTORED_HH
#include "tcp_config.hh"
#include "tcp_receiver.hh"
#include "tcp_sender.hh"
#include "tcp_state.hh"
class TCPConnection {
private:
TCPConfig _cfg;
TCPReceiver _receiver{_cfg.recv_capacity};
TCPSender _sender{_cfg.send_capacity, _cfg.rt_timeout, _cfg.fixed_isn};
std::queue<TCPSegment> _segments_out{};
bool _linger_after_streams_finish{true};
bool _active{true};
size_t _time_since_last_segment_received{0};
void send_segments();
void send_RST();
void unclean_shutdown();
public:
void connect();
size_t write(const std::string &data);
size_t remaining_outbound_capacity() const;
void end_input_stream();
ByteStream &inbound_stream() { return _receiver.stream_out(); }
size_t bytes_in_flight() const;
size_t unassembled_bytes() const;
size_t time_since_last_segment_received() const;
TCPState state() const { return {_sender, _receiver, active(), _linger_after_streams_finish}; };
void segment_received(const TCPSegment &seg);
void tick(const size_t ms_since_last_tick);
std::queue<TCPSegment> &segments_out() { return _segments_out; }
bool active() const;
explicit TCPConnection(const TCPConfig &cfg) : _cfg{cfg} {}
~TCPConnection();
TCPConnection() = delete;
TCPConnection(TCPConnection &&other) = default;
TCPConnection &operator=(TCPConnection &&other) = default;
TCPConnection(const TCPConnection &other) = delete;
TCPConnection &operator=(const TCPConnection &other) = delete;
};
#endif
libsponge/tcp_connection.cc
#include "tcp_connection.hh"
#include <iostream>
template <typename... Targs>
void DUMMY_CODE(Targs &&...) {}
using namespace std;
size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); }
size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }
size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }
size_t TCPConnection::time_since_last_segment_received() const { return _time_since_last_segment_received; }
void TCPConnection::segment_received(const TCPSegment &seg) {
if (!_active) {
return;
}
_time_since_last_segment_received = 0;
const TCPHeader &header = seg.header();
if (!_receiver.ackno().has_value() && _sender.next_seqno_absolute() == 0) {
if (!header.syn) {
return;
}
_receiver.segment_received(seg);
connect();
return;
}
if (header.rst) {
unclean_shutdown();
return;
}
_receiver.segment_received(seg);
if (!_receiver.ackno().has_value()) {
return;
}
if (!_sender.stream_in().eof() && _receiver.stream_out().input_ended()) {
_linger_after_streams_finish = false;
}
if (header.ack) {
_sender.ack_received(header.ackno, header.win);
}
_sender.fill_window();
if (seg.length_in_sequence_space() > 0 && _sender.segments_out().empty()) {
_sender.send_empty_segment();
}
if (seg.length_in_sequence_space() == 0 && header.seqno == _receiver.ackno().value() - 1) {
_sender.send_empty_segment();
}
send_segments();
}
bool TCPConnection::active() const { return _active; }
size_t TCPConnection::write(const string &data) {
if (!_active) {
return 0;
}
size_t ret = _sender.stream_in().write(data);
_sender.fill_window();
send_segments();
return ret;
}
void TCPConnection::tick(const size_t ms_since_last_tick) {
if (!_active) {
return;
}
_time_since_last_segment_received += ms_since_last_tick;
_sender.tick(ms_since_last_tick);
if (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) {
send_RST();
return;
}
if (_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.bytes_in_flight() == 0 &&
(!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout)) {
_active = false;
return;
}
send_segments();
}
void TCPConnection::end_input_stream() {
_sender.stream_in().end_input();
_sender.fill_window();
send_segments();
}
void TCPConnection::connect() {
_sender.fill_window();
send_segments();
}
TCPConnection::~TCPConnection() {
try {
if (active()) {
cerr << "Warning: Unclean shutdown of TCPConnection\n";
send_RST();
}
} catch (const exception &e) {
std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
}
}
void TCPConnection::send_segments() {
while (!_sender.segments_out().empty()) {
TCPSegment segment = _sender.segments_out().front();
_sender.segments_out().pop();
optional<WrappingInt32> ackno = _receiver.ackno();
if (ackno.has_value()) {
segment.header().ack = true;
segment.header().ackno = ackno.value();
}
segment.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max()
? _receiver.window_size()
: numeric_limits<uint16_t>::max();
_segments_out.emplace(segment);
}
}
void TCPConnection::send_RST() {
_sender.fill_window();
if (_sender.segments_out().empty()) {
_sender.send_empty_segment();
}
TCPSegment segment = _sender.segments_out().front();
_sender.segments_out().pop();
optional<WrappingInt32> ackno = _receiver.ackno();
if (ackno.has_value()) {
segment.header().ack = true;
segment.header().ackno = ackno.value();
}
segment.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max() ? _receiver.window_size()
: numeric_limits<uint16_t>::max();
segment.header().rst = true;
_segments_out.emplace(segment);
unclean_shutdown();
}
inline void TCPConnection::unclean_shutdown() {
_sender.stream_in().set_error();
_receiver.stream_out().set_error();
_active = false;
}
遇到问题
-
Test #36-test#1: close 解决: 原因在于 TCPConnection::end_input_stream() 方法中结束了发送的数据流, 同时调用了 _sender.fill_window() 填充窗口发送, 但是报文段仍在 _sender 的发送队列中, 应该转移至 TCPConnection 的发送队列 _segment_out 中, 这样才能真正将带有 FIN 标志位的报文发出. -
Test#36-test#1: 1ms pass 解决: 出现该问题的原因在于服务端的推迟(linger)时间是 10*_cfg.rt_timeout , 在这个时刻已经要关闭连接, 而非超过该时间, 在 tick() 方法中是 _time_since_last_segment_received >= 10 * _cfg.rt_timeout . -
Test#37-test#1: 4000ms pass 解决: 原因就是 _linger_after_streams_finish 的状态未正确设置. 任务指导中的描述是: “If the inbound stream ends before the TCPConnection has reached EOF on its outbound stream, this variable needs to be set to false ”. 这里笔者开始将该变量设置为 false 的时机设在了 tick() 方法中, 但根据测试用例, 可能调用 tick() 时输出流也已经完毕, 此时便不能判断为被动关闭方从而设置 _linger_after_streams_finish = false , 因此最后将判断的位置改到了 segment_received() 方法中, 这样在收到远端的 FIN 标志位的第一时间即可设置 _linger_after_streams_finish 变量. -
Test#45-test#1: 1ms pass 解决: 出现的原因在于作为客户端建立 TCP 连接时没有发送的报文没有 SYN 标志位. 原因在于指导书中描述了 segment_received() 方法中, 在收到报文后一定要发送一个确认报文, 必要时, 即当前 _sender.segment_out() 队列中为空时构建空报文. 但这个就需要事先调用 _sender.fill_window() 方法来更新 _sender 的发送队列. 不然在初始收到 SYN 报文时, 此时 _sender 的发送队列自然为空, 便会发送只有 ACK 字段的空报文.
测试
在 build 目录下执行 make 后执行 make check_lab4 :
遗留问题
该问题是窗口大小为 1 的超时问题, 上述代码没有该问题, 但笔者并未弄清楚发生该问题的根本原因, 在此主要记录, 若有解决或明白的同学欢迎交流, 十分感谢!
问题代码
出现问题的代码是在 segment_received() 方法的中增加被动关闭的代码:
void TCPConnection::segment_received(const TCPSegment &seg) {
if (header.ack) {
_sender.ack_received(header.ackno, header.win);
if (_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended() &&
!_linger_after_streams_finish) {
_active = false;
return;
}
}
}
如上所示, 当满足 3 个前提且 _linger_after_streams_finish 为 false 时直接关闭 TCP 连接并返回. 而对于 tick() 方法中正常关闭部分的判断 if (_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && (!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout)) , 无论是否删除 !_linger_after_streams_finish || 都会通不过测试.
问题代码思路
下面解释一下上述问题代码的思路: 这里笔者将被动关闭的条件判断放置到了 segment_received() 方法中, 这么考虑的原因在于笔者认为 TCP 连接正常关闭的前 3 个前提满足时第一时间能从该方法中判断出. 再次回顾这 3 个前提: #1是要求入站流已结束且重组完毕, #2是要求出站流已全部发送, #3是要求出站流已被远程对等点全部确认. 很明显的是前提#1一定能第一时间从 segment_received() 中获取到, 因为收到报文后通过该方法调用 _receiver.segment_received() , 内部的重组器和字节流会判断 FIN 标志位, 同时更新未重组的字节数, 这里是没问题的, 而且笔者更新 _linger_after_streams_finish 变量也设置在该方法中. 而前提#2和#3是有关系的, #3满足时#2一定满足, 不然远程对等点无法全部确认本地发送的全部数据. 而远程对等点确认的方法即发送一个带有确认号的报文段(可以是空报文段)给本地, 也会在 segment_received() 方法中得到判断, 并且是在判断报文有 ACK 标志位的前提下. 这也是上述代码的思路. 这样, 被动关闭方在收到给对方的 FIN 确认后即可关闭 TCP 连接. 在下面测试中, 实际也证实了被动关闭方可以正常关闭. 这里额外说一下, 笔者浏览了网上的代码, 大部分都是将 _linger_after_streams_finish 置 false 以及根据 !_linger_after_streams_finish|| _time_since_last_segment_received >= 10 * _cfg.rt_timeout 正常关闭的两个判断封装成 clean_shutdown() 方法, 并在 send_segments() 方法的最后调用. 这与笔者的实现是不同的, 笔者将 _linger_after_streams_finish 置 false 写在了 segment_received() 中, 而正常关闭写在了 tick() 方法中. 自然, 这两种实现最后都是能够通过测试的, 因为 send_segments() 方法会被 tick() 方法调用, 而 tick() 方法是由操作系统频繁调用的.
问题表现
上述问题代码的运行结果是执行 make check_lab4 后 10 个窗口大小为 1 的测试用例均不能通过, 如下图所示: 此处选择测试 #64 - t_ucS_16_1 进行单测, 输出如下(这里对 文件进行了修改, “DEBUG” 字符后面输出的是 TCP 的源地址): 而下图则是没有问题的代码在该测试用例的输出, 其中红色部分则是上图中缺少的部分: 下图则是根据上述信息绘制的 TCP 连接时序图(绿色划线部分为缺少的部分): 综上可以分析出, 当前测试未通过的原因是: TCP 连接的主动关闭方不能正常关闭, 缺少前提#1入站流结束, 即未收到被动关闭方的 FIN 报文段. 但此时被动关闭方已经关闭, 则应该满足主动方已经收到了其全部数据, 包括最后的 FIN 报文. 这也是笔者感到奇怪的地方. 此外, 如下图所示, 笔者自己进行窗口大小为 1 的测试时 TCP 连接可以正常关闭: 目前问题的原因和有问题的代码都以确定, 但是笔者仍未弄清楚出现该问题的本质原因是什么, 笔者修改的是对 TCP 连接正常关闭被动关闭方的代码, 但实际情况是测试中主动关闭方不能正常关闭. 如果有解决或弄清楚该问题的同学, 恳请赐教.
7 Performance
优化前
在优化代码之前, 即直接使用之前实验中完成的组件进行 TCP 的性能测试. 在 build 目录下执行 ./apps/tcp_benchmark 后输出如下, 通过 3 次测试可以看到, 在为优化前, 笔者实现的 TCP 连接的吞吐量为 0.5Gb/s 左右. 也满足了实验 0.1Gb/s 的最低要求. 上述代码是在 ByteStream , StreamReassembler 类中的数据结构是 C++ std 中常用的标准模板库的基础上完成的. 其中 ByteStream::_buffer 是 deque<char> , StreamReassembler::_buffer 是 deque<char> , TCPSender::_outstanding_segments 是 queue<TCPSegment> . 这里值得一提的是, Lab0 中 ByteStream 使用 deque<char> 还是 string 笔者考虑了好久, 最后考虑到 deque 在对头部删除时性能更好最后选择了 deque , 但实际若换成 string 性能更好. 如下所示, 可以到达 0.66Gb/s :
优化 v1
首先参照网上大部分的优化策略, 即将 ByteStream 中的缓冲区数据结构由 string 替换为内置的数据结构 BufferList , 位于 libsponge/util/buffer.hh 中. 这个数据结构的底层实际上就是一个 deque<Buffer> . 而 Buffer 是利用智能指针 shared_ptr 封装的字符串, 且有一个头部偏移 _starting_offset . 这个数据结构的优点一方面在于通过智能指针和 move 操作可以避免字符串的多次拷贝, 另一方面利用偏移量来减少字符串释放内存的开销, 只有整个字符串全部移除后才会统一释放内存(具体可见其代码), 这样自然能够提高性能, 因此选择 BufferList 作为 ByteStream 的缓冲区数据结构, 能够得到一定的性能提升. 如下图所示: 这里值得一提的是, BufferList 的 size() 方法需要遍历其中的所有 Buffer , 是一个线性时间复杂度的算法, 由于在 ByteStream 中会被频繁调用影响性能, 因此这里额外使用 _buffer_size 变量来直接记录其大小. 此外, string::assign() 方法比 string::substr() 和直接调用构造函数的性能都要好一点, 在提取字符串子串时使用该方法可以得到细微的性能提升. 更新的 ByteStream 代码如下:
libsponge/byte_stream.hh
#ifndef SPONGE_LIBSPONGE_BYTE_STREAM_HH
#define SPONGE_LIBSPONGE_BYTE_STREAM_HH
#include "buffer.hh"
#include <string>
class ByteStream {
private:
const size_t _cap;
BufferList _buffer{};
size_t _buffer_size = 0;
size_t _total_read = 0;
size_t _total_written = 0;
bool _end = false;
bool _error = false;
public:
ByteStream(const size_t capacity);
size_t write(const std::string &data);
size_t remaining_capacity() const;
void end_input();
void set_error() { _error = true; }
std::string peek_output(const size_t len) const;
void pop_output(const size_t len);
std::string read(const size_t len);
bool input_ended() const;
bool error() const { return _error; }
size_t buffer_size() const;
bool buffer_empty() const;
bool eof() const;
size_t bytes_written() const;
size_t bytes_read() const;
};
#endif
libsponge/byte_stream.cc
#include "byte_stream.hh"
template <typename... Targs>
void DUMMY_CODE(Targs &&...) {}
using namespace std;
ByteStream::ByteStream(const size_t capacity) : _cap(capacity) {}
size_t ByteStream::write(const string &data) {
const size_t size = min(data.size(), _cap - _buffer_size);
_buffer.append(BufferList(move(string().assign(data.begin(), data.begin() + size))));
_buffer_size += size;
_total_written += size;
return size;
}
string ByteStream::peek_output(const size_t len) const {
string str = _buffer.concatenate();
return string().assign(string().assign(str.begin(), str.begin() + min(len, _buffer_size)));
}
void ByteStream::pop_output(const size_t len) {
const size_t size = min(len, _buffer_size);
_buffer.remove_prefix(size);
_buffer_size -= size;
_total_read += size;
}
std::string ByteStream::read(const size_t len) {
const size_t size = min(len, _buffer_size);
string str = _buffer.concatenate();
_buffer.remove_prefix(size);
_buffer_size -= size;
_total_read += size;
return str.assign(str.begin(), str.begin() + size);
}
void ByteStream::end_input() { _end = true; }
bool ByteStream::input_ended() const { return _end; }
size_t ByteStream::buffer_size() const { return _buffer_size; }
bool ByteStream::buffer_empty() const { return _buffer_size == 0; }
bool ByteStream::eof() const { return _end && buffer_empty(); }
size_t ByteStream::bytes_written() const { return _total_written; }
size_t ByteStream::bytes_read() const { return _total_read; }
size_t ByteStream::remaining_capacity() const { return _cap - _buffer_size; }
优化 v2
网上主要使用的就是 #优化 v1 中提到的方法, 且吞吐量基本上能够到 2Gb/s 以上. 此处笔者还是 0.7Gb/s 上下. 因此笔者分析仍有一个地方严重影响性能. 容易分析得到问题出在 StreamRessambler 上. 原本笔者 StreamRessambler 的实现是基于 deque 的, 需要对收到的字符串逐字节扫描, 在字符串很长的情况下性能很差. 后来笔者使用 set 容器按照字符串片段来存储, 同时配合一个类似内置 Buffer 类的 BufferPlus 类进行字符串的存储避免复制, 最终显著提高了性能. 关于具体优化可见 Lab1 #优化后 部分. 优化后的 TCP 吞吐量达到了 2.6Gb/s , 如下图所示:
8 webget revisited
按照任务指导修改 apps/webget.cc 中代码后, 在 build 目录下执行 make 然后执行 make check_webget :
|