CS144实验记录(四):lab3
在lab3中,我们需要实现TCPsender 。TCPsender 负责接收对方发送的TCPsegment 中的ack号和接收窗口大小(first unassembled索引和first unacceptable索引的距离),应用层通过socket将字节流写入TCPsender 中的ByteStream ,TCPsender 根据接收到的ackno 和window size,从ByteStream 中读取出来,将ByteStream 中的字节流转化为连续的TCPsegment ,发送给对方。
而在接收方,TCPReceiver 将收到的TCPsegment 转化回原始的BYteStream 中的字节流,并发送ackno 和window size给sender。
TCPsender和TCPReceiver各负责发送和接收TCPsegment的一部分:
- 由TCPsender发送,被TCPReceiver接收的部分:
- 序列号seqno
- SYN and FIN flags
- Payload
- 由TCPReceiver发送,被TCPsender接收的部分:
这是 TCPsegment的结构,红色部分突出显示的是TCPsender将读取的字段
TCPSender’s responsibility:
- 追踪TCPReceiver的接收窗口(处理收到的ackno和windowsize)
- 通过从ByteStream读取,创建新的TCPsegment(如果需要的话,包括SYN和FIN标志)并发送,尽可能地填满接收窗口。只有当接收窗口已满或TCPsender的BYteStream为空时,TCPsender才能停止发送segments。
- 追踪已发送但是没有收到ackno的segments(称为outstanding segments),超时之后重新发送这些segments。
TCPsender 发送的TCPsegment 每一个都由ByteStream 字节流中的子串组成(也可能为空),此segment 中还包含seqno ,标识这个子串在字节流中的索引;以及SYN和FIN标志位,表示是否是字节流的开头和结尾。
TCPsender需要维护一个数据结构,存储已发送未确认的TCPsegment集合(也可以称为发送窗口),如果最早发送的segment超时还没有被确认,就重传。
至此,我们的lab都是单向通信。发送方只有一个TCPsender ,接收方只有一个TCPReceiver ,由TCPsender 发送的和TCPReceiver 返回的都是TCPsegment 格式的TCP报文段,只不过由于单向通信,所以发送方的TCPsegment 中只包含序列号seqno 、 Payload、SYN 和 FIN,接收方返回的TCPsegment 中只包含ackno 和window size。
TCPReceiver返回的window size是TCPReceiver还可以接收多少字节的载荷,不包括FIN和SYN。而TCPsender用window size来限制发送窗口的字节长度,发送窗口中的字节长度是载荷加上FIN和SYN。
具体实现细节:
-
每隔几毫秒,TCPSender 的 tick 方法将被调用,并带有一个参数,该参数告诉它自上次调用该方法以来已经过去了多少毫秒。 使用它来维护 TCPSender 一直存活的总毫秒数。 -
当 TCPSender 被构造时,它被赋予一个参数,告诉它重传超时 (retransmission timeout,RTO ) 的“初始值”。 RTO 是在重传之前等待的毫秒数。 RTO 的值会随时间变化,但“初始值”保持不变。 初始代码将 RTO 的“初始值”保存在名为_initial_retransmission_timeout 的成员变量中 -
我们需要实现一个超时计时器,从某一个特定时间开始计时,当RTO时间过去后,该计时器重启。消耗的时间的概念来自于被调用的tick方法,而不是实际的时间。 -
每当一个包含数据的segment(长度不为0)被发送时,如果计时器没有启动,就启动一个计时器,在RTO时间后“报警”。同时,发送窗口的后沿向后移动(即将该segment送入发送窗口)。 -
当所有已发送未确认的segments都被确认,停止计时器。 -
计时器超时后:
-
TCPsender收到ackno(ackno表示累积确认,即在此之前的(seqno小于ackno的)所有segments都已经收到):
-
在此lab中,我们认为只有能确认接收窗口中一整段segment的ackno才算数。 如果ackno大于sendBase(发送窗口的前沿,即已发送未确认的最早的字节的序号,就表示sendBase及其之前的所有字节都已确认),并且ackno可以确认至少一整段segment: 由于收到了合法的ackno,说明网络拥塞状态有所缓解,所以:
除此之外还需要:
3.2 Implementing the TCP sender
class TCPSender {
private:
WrappingInt32 _isn;
std::queue<TCPSegment> _segments_out{};
unsigned int _initial_retransmission_timeout;
ByteStream _stream;
uint64_t _next_seqno{0};
public:
TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
const std::optional<WrappingInt32> fixed_isn = {});
ByteStream &stream_in() { return _stream; }
const ByteStream &stream_in() const { return _stream; }
void ack_received(const WrappingInt32 ackno, const uint16_t window_size);
void send_empty_segment();
void fill_window();
void tick(const size_t ms_since_last_tick);
size_t bytes_in_flight() const;
unsigned int consecutive_retransmissions() const;
std::queue<TCPSegment> &segments_out() { return _segments_out; }
uint64_t next_seqno_absolute() const { return _next_seqno; }
WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
};
TCPsender的四个接口如下,每个接口对应一个TCPsender需要处理的重要事件,每个事件最终都需要发送一个TCPsegment:
-
void ack_received( const WrappingInt32 ackno,kkk const uint16_t windows_size ) 接收到从TCPReceiver 返回的ack segments,提取出其中的ackno和window size,TCPsender需要将发送窗口中seqno 小于ackno 的segments删除,根据window size调整发送窗口的大小,调用 fill_windows() 继续传送 -
void fill_windows() TCPSender 被要求填充窗口:它从它的输入 ByteStream 中读取字节,然后构造成TCPSegment (加上SYN和FIN,并且占据序列号和空间)发送尽可能多的字节,只要ByteStream中有字节可以读取并且接收窗口有可用空间(window size> 0)发送窗口有空位。 我们发送的每一个TCPsegment的载荷要尽可能地大,但是不要超过TCPConfig::MAX_PAYLOAD_SIZE (1452字节,链路层MTU减去IP数据报首部和TCP报文段首部) 我们可以使用TCPSegment::length_in_sequence_space() 方法计算出一个segment所占据的序列号的长度,如果segment中包括SYN和FIN标志位,也需要各占据一个序列号。segment存储在发送窗口中,所以SYN和FIN在发送窗口也需要占据空间。
- 注意,如果TCPReceiver返回的ack segment中的window size为0,那么
fill_windows() 方法将window size视为1,TCPsender会发送一个被接收方拒绝(并且未确认)的字节,这样会促使接收方发送一个新的ack segment,从而表明此时的window size的大小。 如果不这样,即使接收窗口有空闲的空间了,发送者也不知道它可以再次开始发送。 -
void tick( const size_t ms_since_last_tick ) 每隔几毫秒,TCPSender 的 tick 方法将自动被调用(不需要我们调用),并带有一个参数,该参数告诉它自上次调用该方法以来已经过去了多少毫秒, 使用它来维护 TCPSender 一直存活的总毫秒数。我们只需要在 tick 中实现,通过参数判断过去了多少时间,需要执行何种操作即可 -
void send_empty_segment() TCPsender发送一个序列号长度为0的TCPsegment,但seqno被正确的设置。当TCP连接想要发送一个空ACK segment时,可以使用此方法。
- 注意,一个序列号长度为0的segment,不需要作为outstanding segments放入发送窗口中被追踪,也不需要重传。
我们需要在TCPsender中实现一个队列_outstanding_segments 存储已发送未确认的segments,也就是以上所说的发送窗口。
在本lab中,我们假设TCPsegment发送到_segments_out 队列中就算发送出去了,就TCPsender而言,一旦我们将TCPsegment推送到此队列,我们就认为它已经发送。很快,TCPsender的所有者(我们将在lab4中实现的TCPconnection)会对它进行pop(使用公共方法segments_out() 访问该队列),并真正地发送它。
在从TCPReceiver获取ackno之前,TCPsender假设window size为一个字节。
如果接收到outstanding segments的部分ackno,实际的TCPsender可以对segments已确认的部分进行切割,而我们的TCPsender不要求实现此功能,一个TCPsegment只有全部被ack才能被remove。
具体实现
tcp_sender.hh
#include "byte_stream.hh"
#include "tcp_config.hh"
#include "tcp_segment.hh"
#include "wrapping_integers.hh"
#include <functional>
#include <queue>
class TCPSender {
private:
WrappingInt32 _isn;
std::queue<TCPSegment> _segments_out{};
unsigned int _initial_retransmission_timeout;
uint32_t _RTO;
bool _back_off = true;
ByteStream _stream;
uint64_t _next_seqno{0};
uint64_t _bytes_in_flight{0};
std::queue<TCPSegment> _outstanding_segments{};
bool _timer_running = false;
uint32_t _time_elipsed = 0;
uint16_t _consecutive_retransmissions = 0;
uint16_t _receiver_window_size = 1;
uint16_t _receiver_free_space = 0;
bool _syn_sent = false;
bool _fin_sent = false;
void _send_segments(TCPSegment & seg);
public:
TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
const std::optional<WrappingInt32> fixed_isn = {});
ByteStream &stream_in() { return _stream; }
const ByteStream &stream_in() const { return _stream; }
void ack_received(const WrappingInt32 ackno, const uint16_t window_size);
bool valid_ack(uint64_t abs_ack);
void send_empty_segment();
void fill_window();
void tick(const size_t ms_since_last_tick);
size_t bytes_in_flight() const;
unsigned int consecutive_retransmissions() const;
std::queue<TCPSegment> &segments_out() { return _segments_out; }
uint64_t next_seqno_absolute() const { return _next_seqno; }
WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
};
tcp_sender.cc
#include "tcp_sender.hh"
#include "tcp_config.hh"
#include <random>
template <typename... Targs>
void DUMMY_CODE(Targs &&... ) {}
using namespace std;
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, _initial_retransmission_timeout{retx_timeout}
, _RTO(retx_timeout)
, _stream(capacity){}
uint64_t TCPSender::bytes_in_flight() const {
return _bytes_in_flight ;
}
void TCPSender::fill_window() {
if(!_syn_sent){
TCPSegment seg;
_syn_sent = true;
seg.header().syn = true;
_send_segments(seg);
return;
}
if(!_outstanding_segments.empty() && _outstanding_segments.front().header().syn)
return;
if(_fin_sent)return;
_receiver_free_space = _receiver_window_size <= _bytes_in_flight ? 0 : _receiver_window_size - _bytes_in_flight;
if(_stream.eof() && _receiver_free_space >= 1){
TCPSegment seg;
seg.header().fin = true;
_fin_sent = true;
_send_segments(seg);
return;
}
_receiver_free_space = _receiver_window_size <= _bytes_in_flight ? 0 : _receiver_window_size - _bytes_in_flight;
while(!_stream.buffer_empty() && _receiver_free_space > 0){
TCPSegment seg;
size_t temp = _receiver_free_space > TCPConfig::MAX_PAYLOAD_SIZE ? TCPConfig::MAX_PAYLOAD_SIZE : _receiver_free_space;
size_t payload_size = _stream.buffer_size() > temp ?temp :_stream.buffer_size();
seg.payload() = _stream.read(payload_size);
if(_stream.eof() && _receiver_free_space >= 1 + seg.length_in_sequence_space()){
seg.header().fin = true;
_fin_sent = true;
}
_send_segments(seg);
_receiver_free_space = _receiver_window_size <= _bytes_in_flight ? 0 : _receiver_window_size - _bytes_in_flight;
}
}
void TCPSender::_send_segments(TCPSegment &seg){
seg.header().seqno = wrap(_next_seqno,_isn);
_next_seqno += seg.length_in_sequence_space();
_bytes_in_flight += seg.length_in_sequence_space();
_segments_out.push(seg);
_outstanding_segments.push(seg);
if(!_timer_running){
_timer_running = true;
_time_elipsed = 0;
}
}
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
uint64_t absolute_ackno = unwrap(ackno,_isn,_next_seqno);
if(!valid_ack(absolute_ackno)){
return;
}
this->_receiver_window_size = window_size ;
if(window_size == 0){
this->_receiver_window_size = 1;
_back_off = false;
}else
_back_off = true;
while(!_outstanding_segments.empty()){
TCPSegment front_outstanding_segment = _outstanding_segments.front();
uint64_t front_abs_seqno = unwrap(front_outstanding_segment.header().seqno,_isn,_next_seqno);
if(absolute_ackno >= front_abs_seqno + front_outstanding_segment.length_in_sequence_space()){
_RTO = _initial_retransmission_timeout;
_consecutive_retransmissions = 0;
_outstanding_segments.pop();
_bytes_in_flight -= front_outstanding_segment.length_in_sequence_space();
_time_elipsed = 0;
}else
break;
}
if(_outstanding_segments.empty()){
_timer_running = false;
_time_elipsed = 0;
}
fill_window();
}
bool TCPSender::valid_ack(uint64_t abs_ack){
TCPSegment front_outstanding_segment = _outstanding_segments.front();
return abs_ack <= _next_seqno &&
abs_ack >= unwrap( front_outstanding_segment.header().seqno ,_isn,_next_seqno) ;
}
void TCPSender::tick(const size_t ms_since_last_tick) {
if(_timer_running){
_time_elipsed += ms_since_last_tick;
if(_time_elipsed >= _RTO){
_segments_out.push(_outstanding_segments.front());
if(_receiver_window_size != 0 && _back_off){
_consecutive_retransmissions++;
_RTO *= 2;
}
_time_elipsed = 0;
}
}
}
unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmissions;}
void TCPSender::send_empty_segment() {
TCPSegment seg;
seg.header().seqno = wrap(_next_seqno,_isn);
_segments_out.push(seg);
}
关于此lab的疑问:
- TCPReceiver返回的window size是TCPReceiver还可以接收多少字节的载荷,不包括FIN和SYN。而TCPsender用window size来限制发送窗口的字节长度,但是发送窗口中的字节长度却是载荷加上FIN和SYN。为什么这样安排?
- 为什么要用unsigned类型?本lab中几个unsigned的变量相互减,会出现溢出的情况,难以发觉。
|