CS144实验记录(五):lab4
在lab 4中,我们将创建总体模块,称为TCP connection,该模块将TCPSender和TCPReceiver结合起来。
我们的TCP segment可以封装到用户(TCP-In-UDP)或IP(TCP/IP)数据报的有效载荷中。
本lab提供了代码支持从用户数据包或IP数据报中读取或写入TCPsegment,还提供了CS144TCPSocket 类,将我们的TCPConnection包装,使它表现得像一个普通的流套接字,就像在lab 0中用来实现webget的TCPSocket一样。
我们需要做的是将之前已经实现的TCPSender 和TCPReceiver 结合成一个对象(TCPConnection ),并处理一些连接全局的管理任务。
TCPsegment的报文格式:
- ACK标志位用于指示确认字段中的值是有效的,即该报文段包括对一个已被成功接收报文段的确认。在TCP中,只有第一次连接请求的ACK等于0,其余的ACK都为1.
- RST表示复位,表示TCP连接中出现异常必须强制断开连接。发送RST包关闭连接时,不必等缓冲区的包都发出去(不像上面的FIN包),直接就丢弃缓存区的包发送RST包。而接收端收到RST包后,也不必发送ACK包来确认。
- SYN同步: 表示开始会话请求,用来发起一个连接,建立连接。SYN为1表示希望建立连接,并在其序列号的字段进行序列号初始值的设定。(Synchronize本身有同步的意思。也就是意味着建立连接的双方,序列号和确认应答号要保持同步)
TCPConnection需要遵守的基本规则:
TCP连接是如何实际发送一个段?
- TCPConnection发送segment类似于TCPSender,将segment push到_segments_out队列中即可。一旦我们将TCPsegment推送到此队列,我们就认为它已经发送。很快,所有者会对它进行pop(使用公共方法
segments_out() 访问该队列),并真正地发送它。
当TCPConnection收到带有RST标志位的segment该做什么?
- 如果TCPConnection收到(或发送)一个RST标志位被置位的segment,表示连接的立即终止。我们应该设置inbound 和 outbound的ByteStream的error flag,并且后续任何对
TCPConnection::active() 的调用都应该返回false 。
遇到以下两种情况TCPConnection需要发送RST,即放弃整个连接:
- 连续重传次数超过上限(
TCPConfig::MAX_RETX_ATTEMPTS ) - 在连接还是active时(
active() 函数返回true),TCPConnection的destructor被调用。
如何制作一个可以设置RST标志位的segment:
- 可以通过调用
send_empty_segment() 方法生成一个正确seqno的空segment,将其RST标志位置位即可。
ACK标志位的作用是什么?
- ACK标志位用于指示确认字段中的值(即ackno和window size)是有效的,即该报文段包括对一个已被成功接收报文段的确认。在TCP中,只有第一次连接请求的ACK等于0,其余的ACK都为1.
- 对于TCPSender输出的segments,当TCPReceiver的
ackno() 方法返回的std::optional<WrappingInt32> 非空时(使用 has_value()方法测试),就可以设置发送segment中的ackno和window size,并将ACK标志位置为1 - 对于TCPReceiver接收的segments,当ACK标志位为1时,我们可以查看ackno,并将ackno和window size传递给TCPSender
连接建立流程
TCPSender的接口:
void ack_received(const WrappingInt32 ackno, const uint16_t window_size);
void send_empty_segment();
void fill_window();
void tick(const size_t ms_since_last_tick);
TCPReceiver的接口:
std::optional<WrappingInt32> ackno() const;
size_t window_size() const;
void segment_received(const TCPSegment &seg);
客户端:
-
客户端先调用TCPSender中的fill_window() 方法,检查类内维护的变量_syn_sent ,没有发送过SYN,所以首先生成一个SYN=1的segment,此segment的seqno为ISN(客户端的)。并且检测到计时器没有启动,还要启动计时器。 void TCPSender::fill_window() {
if(!_syn_sent){
TCPSegment seg;
_syn_sent = true;
seg.header().syn = true;
_send_segments(seg);
return;
}
}
-
再调用TCPReceiver中的ackno() 方法,检查类内维护的变量_syn ,没有收到过SYN,所以返回的std::optional<WrappingInt32> 为空。 optional<WrappingInt32> TCPReceiver::ackno() const {
if(!_syn)return {};
}
-
所以将segment中的ACK标志位置为0,ackno和window size都不需要填写。将该segment发送出去
服务器:
- 调用TCPReceiver中的
segment_received() 方法,接收segment。记录客户端的ISN,以及收到了SYN。 - 检测到ACK=0,所以不需要调用TCPSender中的
ack_received() 方法。 - 调用TCPSender中的
fill_window() 方法,检测到服务器端没有发送过SYN,所以生成一个SYN=1的segment,此segment的seqno为ISN(服务器端的)。并且检测到计时器没有启动,还要启动计时器。 - 再调用TCPReceiver中的
ackno() 方法,检查类内维护的变量_syn ,收到过SYN,返回的std::optional<WrappingInt32> 为非空。 - 所以将segment中的ACK标志位置为1,ackno和window size填写为调用TCPReceiver中的
ackno() 和window_size() 方法的返回值。将该segment发送出去
收到segment时,将segment作为参数调用TCPReceiver中的segment_received() ,检查segment中的ACK标志位是否为1,如果是,则将segment中的ackno和window_size提取出来,作为参数调用TCPSender中的ack_received() 。
发送segment时,调用TCPSender中的fill_window() 方法,再与TCPReceiver中的ackno() 和window_size() 的方法的结果(如果收到过SYN,有结果的话)结合,生成完整的segment,发送出去。
连接结束流程
客户端断开连接的流程:
输出流的结束:
-
如果应用层决定将输出流结束(调用ByteStream的end_input() 方法),并且等待输出的缓冲区为空,表明整个输出流的结束,即eof,此时才可以向对方发送FIN。 void TCPSender::fill_window() {
if(_stream.eof() && _receiver_free_space >= 1){
TCPSegment seg;
seg.header().fin = true;
_fin_sent = true;
_send_segments(seg);
return;
}
}
输入流的结束:
输入流只能被动结束:
-
对TCPConnection来说,收到了FIN 并且 等待被重组的缓冲区为空表明对等方的输入结束(收到FIN并不代表输入结束,有可能发生乱序) void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
segment seg={index,data,eof};
if(_segs_to_be_reassembled.empty() == true && _eof == true)
_output.end_input();
}
-
对应用层来说,对等方的输入结束 并且 等待被读取的缓冲区为空 表明整个输入流的结束(eof)。
bool ByteStream::eof() const {
return buffer_empty()&&input_ended();
}
如果是本地端先结束连接,则应用层可以随便什么时候end_input() ;如果是本地端先接收到对等端输入的结束(FIN进入ByteStream ),则应用层需要结束输出流,调用end_input() ,停止往ByteStream中写入内容,等待ByteStream空了之后,发送FIN。
只要发送了FIN,就不再对对等端任何的非FIN的segment进行回复确认,不管这个FIN对方有没有收到。
TCPConnection的一个重要功能是决定TCP 连接什么时候完全结束。当TCP连接完全结束时,停止对任何接收到的segment回复ackno,并且active() 方法返回false 。
TCP连接有两种关闭的方法:
-
不干净的关闭:TCPConnection发送或接收到一个首部字段中的RST标志位被设置的segment 。这种情况下,inbound和outbound的ByteStream都处于error state,并且active() 方法可以马上返回false 。 -
干净的关闭:在没有error的情况下关闭(active() =false)。这种情况可以尽可能地保证两个字节流都完全可靠地交付到接收对等方。 由于两将军问题,不可能保证对等方都能完全干净关闭连接,但是可以非常接近。 从一个对等设备的角度来看,对其与“远程”对等设备的连接进行干净关闭有四个先决条件,条件1保证了输入流被读取干净了,条件2和3保证了输出流被对等方读取干净了。条件4也是关于输入流的,保证了输入流的正常关闭。
-
输入流被完全确认(StreamReassembler 为空)并且结束(收到了FIN) -
输出流被应用层结束(调用ByteStream的end_input() 方法),并且被完全发送出去(ByteStream 为空),首部字段包括FIN的segment也被发送出去。 -
输出流被对等方完全确认(对方的StreamReassembler 为空,实际上要求本地的_outstanding_segments 为空) -
本地TCPConnection需要让对等方满足条件3。有两种方式:
-
选择A:在两个流都已经结束后 linger 一段时间: 本地TCPConnection确认了整个输入流,但是难以确认对等端是否知道自己确认了整个输入流,即对等端是否收到ack(因为对等端不会对本地发送的ack进行ack )。如果不能确保对等端收到ack,也就不能确保对等端的_outstanding_segments 为空,那么对等端就有可能不停地重传无法得到本地确认的segment,输入流永远无法关闭。 我们可以让本地的TCPConnection等待一段时间,如果对等端没有重传任何东西,那么就可以相信对等端收到了ack。 具体地,当一个连接满足条件1到条件3,并且在收到对等端最后一个segment后,等待了至少 初始重传超时时间(_cfg.rt_timeout )的十倍,才能断开。 这意味着TCPConnection需要保持alive一段时间,保持对本地端口号的独占声明并可能发送 acks 以响应传入的segment,即使在 TCPSender 和 TCPReceiver 完全完成其工作并且两个流都已经结束了。 -
选择B:被动关闭 如果在TCPConnection发送FIN之前,TCPConnection的输入流就结束了(收到了FIN),那么这个TCPConnection在两个流结束后不需要 linger 。(因为FIN在发送ack之后,所以FIN的seqno大于之前发送的ack,所以对方对FIN的确认,就相当于确认了之前发送的所有ack)
先发送FIN的端在两个流结束后需要TIME_WAIT,后发送FIN的端不需要TIME_WAIT 。在实际实现中,TIME_WAIT为segment最大生命周期(MSL)的两倍时间。
连接结束的实现总结
TCPConnection中有一个成员变量叫_linger_after_streams_finish ,通过state() 方法暴露于测试中。此变量的初始值为true。如果输入流先结束(收到了FIN,并且StreamReassembler为空),TCPConnection的输出流才达到EOF(应用层调用ByteStream的end_input() 方法结束输出,并且ByteStream为空),此变量被设为false。
在满足先决条件1到3的任何时候,如果_linger_after_streams_finish 为 false,则连接结束(并且 active() 应返回 false)。 否则,连接需要在收到最后一个segment后 10 *_cfg.rt_timeout 时间过去后才结束。
具体实现
开始时,发送方处于CLOSED状态,接收方处于LISTEN状态。
第一个SYN可以携带data,第二个SYN不能携带data,第三次握手可以携带data。
主动连接时,应用层调用connect() 方法,调用fill_window() 方法,向某一对等端发送包含SYN=1,ACK=0的包,进入SYN_SENT状态。
- 当从网络中收到segment时,调用
segment_received() 方法,此时只接收SYN=1的包
- 如果有效载荷不为空,不符合SYN,丢弃
- 如果ACK=1,那么正常接收,发送SYN=0,ACK=1的包,进入ESTABLISHED状态。
- 如果ACK=0,可能是双方同时尝试建立连接,receiver正常接收,发送SYN=1,ACK=1且有效载荷为空的包
被动连接时,调用segment_received() 方法,只接收SYN=1,ACK=0的包,发送SYN=1,ACK=1且有效载荷为空的包,进入SYN_RECV状态。后续再在segment_received() 方法中接收到含有ACK的包,就进入ESTABLISHED状态。
连接建立后:
- 应用层可以调用
write 方法,向发送缓冲区(ByteStream)中写入想要发送的内容,再调用_send_segments 方法,将其发送。 - 当从网络中收到segments时,调用
segment_received() 方法。将包的接收窗口大小(window size)、ackno交给sender,将其余的部分(seqno、有效载荷、FIN和SYN)交给receiver,由它们各自处理。 - 由操作系统周期性地调用
tick() 方法,指示时间的流逝,以对已发送未确认的最早的包进行超时重传。还需要检查连续重传的次数是否超过上限,如果超过,则需要调用_unclean_shutdown 强制关闭连接。还需要调用_send_segments 发送segments 。 - 由tick()方法时不时地调用
_send_segments 方法,如果TCPSender的_segments_out 队列中有可以发送的segment,就设置它的ackno,window size以及ACK标志位,然后将其真正地发送出去。每次调用_send_segments 方法都会调用_clean_shutdown() 来判断是否需要关闭连接。
应用层可以调用 end_input_stream 方法,结束向TCPConnection中写入,表明应用层想要干净地主动结束连接。 end_input_stream 方法中调用fill_window() 方法,和_send_segments 方法,将ByteStream中的值发送出去。如果接收窗口空间不够,无法一次性将ByteStream发送完,此时只能等segment_received() 接收到对方的新的包,调用sender的ack_received() 方法,更新window size,再调用fill_window() 方法,将sender的发送缓冲区(ByteStream)变成空的,再调用_send_segments 方法,才能发送FIN。最后调用_clean_shutdown() 来判断是否需要关闭连接。
当TCP连接还处于active状态时,TCPConnection对象被析构,此时会导致TCPConnection的异常中断。在析构函数的内部调用_unclean_shutdown 函数,该函数直接强制关闭连接:将输入流和输出流设置为错误状态,TCPConnection的状态马上变为false,然后向对等方发送包含RST的segment。
#include "tcp_connection.hh"
#include <iostream>
using namespace std;
size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); }
size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }
size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }
size_t TCPConnection::time_since_last_segment_received() const { return _time_since_last_segment_received;}
bool TCPConnection::active() const { return _active; }
void TCPConnection::segment_received(const TCPSegment &seg) {
if(!_active){
return;
}
_time_since_last_segment_received = 0;
if(!_receiver.ackno().has_value() && _sender.next_seqno_absolute() == 0){
if(!seg.header().syn){
return;
}
_receiver.segment_received(seg);
connect();
return;
}
if(_sender.next_seqno_absolute() > 0 && _sender.bytes_in_flight() == _sender.next_seqno_absolute() &&
!_receiver.ackno().has_value()){
if(seg.payload().size() ){
return;
}
if(!seg.header().ack){
if(seg.header().syn){
_receiver.segment_received(seg);
_sender.send_empty_segment();
}
return;
}
if(seg.header().rst){
_receiver.stream_out().set_error();
_sender.stream_in().set_error();
_active = false;
return;
}
}
_receiver.segment_received(seg);
_sender.ack_received(seg.header().ackno,seg.header().win);
if (_sender.stream_in().buffer_empty() && seg.length_in_sequence_space())
_sender.send_empty_segment();
if (seg.header().rst) {
_sender.send_empty_segment();
unclean_shutdown();
return;
}
send_sender_segments();
}
size_t TCPConnection::write(const string &data) {
if(data.size() == 0){
return 0;
}
size_t write_size = _sender.stream_in().write(data);
_sender.fill_window();
send_sender_segments();
return write_size;
}
void TCPConnection::tick(const size_t ms_since_last_tick) {
if(!_active){
return;
}
_time_since_last_segment_received += ms_since_last_tick;
_sender.tick(ms_since_last_tick);
if(_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS){
unclean_shutdown();
}
send_sender_segments();
}
void TCPConnection::end_input_stream() {
_sender.stream_in().end_input();
_sender.fill_window();
send_sender_segments();
}
void TCPConnection::connect() {
_sender.fill_window();
send_sender_segments();
}
void TCPConnection::send_sender_segments(){
while(!_sender.segments_out().empty()){
TCPSegment seg = _sender.segments_out().front();
_sender.segments_out().pop();
if(_receiver.ackno().has_value()){
seg.header().ack = true;
seg.header().ackno = _receiver.ackno().value();
seg.header().win = _receiver.window_size();
}
_segments_out.push(seg);
}
clean_shutdown();
}
void TCPConnection::unclean_shutdown(){
_receiver.stream_out().set_error();
_sender.stream_in().set_error();
_active = false;
TCPSegment seg = _sender.segments_out().front();
_sender.segments_out().pop();
seg.header().ack = true;
if(_receiver.ackno().has_value()){
seg.header().ackno = _receiver.ackno().value();
}
seg.header().win = _receiver.window_size();
seg.header().rst = true;
_segments_out.push(seg);
}
void TCPConnection::clean_shutdown(){
if(_receiver.stream_out().input_ended()){
if(!_sender.stream_in().eof()){
_linger_after_streams_finish = false;
}else if(_sender.bytes_in_flight() == 0){
if(!_linger_after_streams_finish || time_since_last_segment_received() >= 10 * _cfg.rt_timeout){
_active = false;
}
}
}
}
TCPConnection::~TCPConnection() {
try {
if (active()) {
cerr << "Warning: Unclean shutdown of TCPConnection\n";
_sender.send_empty_segment();
unclean_shutdown();
}
} catch (const exception &e) {
std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
}
}
以上做法无法完全通过测试,应该是之前的lab的问题。
|