IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> [CS144] Lab 4: The TCP connection -> 正文阅读

[网络协议][CS144] Lab 4: The TCP connection

3 Lab 4: The TCP connection

要点

  • 根据任务指导将发送报文段接受报文段的过程在 TCPConnection 的方法中进行实现
  • TCP 的四次挥手过程的实现, 即 TCP 的正常关闭(clean shutdown). 特别是关于是否延迟关闭(linger after streams finish)的判断. TCP 可以正常关闭的 4 个前提是:
    • Prereq #1: 入站流(接收的数据流)已经被完全重组且已经结束.
    • Prereq #2: 出站流(发送的数据流)已由本地应用程序结束写入并且完全发送(包括结束带有 FIN 标志位的报文段)到远程对等点.
    • Prereq #3: 出站流已被远程对等点完全确认.
    • Prereq #4: 本地 TCP 连接确信远程对等点可以满足前提 #3.

思路

直接调用成员方法

根据任务指导 FAQs 部分, 可以实验可以从 remaining_outbound_capacity(), bytes_in_flight()unassembled_bytes() 这几个函数开始. 这些函数的实现十分简单, 可以直接调用 _sender_receiver 的方法即可满足, 在此不多赘述.

发送报文段

在整个 TCPConnection 所要处理的部分中, 发送报文段的内容相对比较简单和清晰, 可以直接按照任务指导完成. 不过根据 TCPConnection 提供的公共方法来看, 并没有专门用于发送的方法, 而是作为其中一些方法中的一部分存在的. 考虑到方法会多次重用, 笔者定义了一个私有方法 send_segments() 用于发送报文段. 代码如下:

void TCPConnection::send_segments() {
    while (!_sender.segments_out().empty()) {
        TCPSegment segment = _sender.segments_out().front();
        _sender.segments_out().pop();
        optional<WrappingInt32> ackno = _receiver.ackno();
        // if TCP does not receive SYN segments from the remote peer, i.e. at SYN_SENT state
        // TCP will not set ACK flag and seqno
        if (ackno.has_value()) {
            segment.header().ack = true;
            segment.header().ackno = ackno.value();
        }
        // set the local receiver's window size
        segment.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max()
                                   ? _receiver.window_size()
                                   : numeric_limits<uint16_t>::max();
        _segments_out.emplace(segment);
    }
}

首先最外层是一个 while 循环, 即将当前 _sender 的发送队列 _sender.segments_out() 中的所有报文段全部转移至 TCPConnection 自己的发送队列 _segments_out. 循环内部主要就是设置根据 _receiver 确定的确认号以及窗口大小. 窗口大小是 uint16_t 类型, _receiver.window_sizesize_t 类型, 参照 FAQs 利用 numeric_limits 来取最大值, 防止赋值时越界的情况.
而对于发送报文段, 需要重点注意的是发送报文段的时机. 根据任务指导, segment_received() write() tick()``end_input_stream()以及 connect() 方法均需要调用 send_segments().

connect() 方法

该方法即用于开始时主动建立 TCP 连接, 因此必然需要 send_segmens() 方法, 而在此之前, 要先让 _sender 构造报文, 即调用 _sender.fill_window(), 初始时发送窗口为 1, 自然就能发送 SYN 报文段.

void TCPConnection::connect() {
    _sender.fill_window();
    send_segments();
}
write() 方法

该方法即上层应用向 TCP 的字节流中写入数据进行发送. 因此需要调用 _sender.stream_in().write() 方法. 在调用之后同样需要移动发送窗口并且发送报文段, 以及时将写入的数据发送出去.

size_t TCPConnection::write(const string &data) {
    if (!_active) {
        return 0;
    }
    size_t ret = _sender.stream_in().write(data);
    _sender.fill_window();
    send_segments();
    return ret;
}
end_input_stream() 方法

该方法即结束需要发送的数据流, 即出站流. 因此需要调用 _sender.steam_in().end_input() 方法. 而结束流的隐含信息是要发送一个 FIN 报文段, 因此此时同样需要移动发送窗口, 并且发送报文段, 以确保 FIN 报文段能够及时发出.

void TCPConnection::end_input_stream() {
    _sender.stream_in().end_input();
    _sender.fill_window();
    send_segments();
}

接收报文段

接收报文段即对应 segment_received() 方法, 也是 TCPConnection 中关键的方法之一. 其主要部分根据任务指导完成, 但其中也隐含有其它一些需要注意的地方, 需要结合 TCP 的状态图去考虑.

void TCPConnection::segment_received(const TCPSegment &seg) {
    if (!_active) {
        return;
    }
    // reset the timer
    _time_since_last_segment_received = 0;
    const TCPHeader &header = seg.header();
    // if TCP does not receive SYN from remote peer, and not send SYN to remote peer
    if (!_receiver.ackno().has_value() && _sender.next_seqno_absolute() == 0) {
        // at this time, TCP acts as a server,
        // and should not receive any segment except it has SYN flag
        if(!header.syn) {
            return;
        }
        _receiver.segment_received(seg);
        // try to send SYN segment, use for opening TCP at the same time
        connect();
        return;
    }
    // the TCP ends in an unclean shutdown when receiving RST segment
    if (header.rst) {
        unclean_shutdown();
        return;
    }
    _receiver.segment_received(seg);
    // if TCP sends SYN segment as a client but does not receive SYN from remote peer,
    // the local TCP should not handle it, too. 
    if (!_receiver.ackno().has_value()) {
        return;
    }

    // set the `_linger_after_streams_finish` the first time the inbound stream ends
    if (!_sender.stream_in().eof() && _receiver.stream_out().input_ended()) {
        _linger_after_streams_finish = false;
    }
    // use the remote peer's ackno and window size to update the local sending window
    if (header.ack) {
        _sender.ack_received(header.ackno, header.win);
    }

    _sender.fill_window();
    // makes sure that at least one segment is sent in reply
    if (seg.length_in_sequence_space() > 0 && _sender.segments_out().empty()) {
        _sender.send_empty_segment();
    }
    // an extra special case to respond to a keep-alive segment
    if (seg.length_in_sequence_space() == 0 && header.seqno == _receiver.ackno().value() - 1) {
        _sender.send_empty_segment();
    }
    send_segments();
}

在接收到报文判断 RST 标志位之前, 首先有一个条件判断 if (!_receiver.ackno().has_value() && _sender.next_seqno_absolute() == 0). 该判断即当前 TCP 连接既没有收到 SYN 报文段, 也没有发送报文段的情形 此时 TCP 作为一个服务端, 在 LISTEN 状态, 此时只会考虑接收 SYN 报文段, 并且接收到 SYN 报文段发送 SYN+ACK 报文段进入 SYN_RECV 状态.
这里值得一提的一点是, 如果本地和远程同时打开 TCP 连接, 同时发送了 SYN 报文段, 根据 TCP 状态图, 即两个 TCP 连接均处于 SYN_SENT 状态, 这时双方接收到彼此的 SYN 报文段后都应该发 SYN+ACK 报文, 进入 SYN_RECV 状态. 这里笔者考虑单独调用 _receiver.segment_received()connect() 原因正是为了处理这种情况. 如果去掉 15~18 行代码, 仍然能通过测试, 但遇到同时打开的情况时则会发送一个 ACK 的空报文段, 不符合 TCP 的状态转移. 而在这种情况下, 由于没有收到 ACK, 也就不会更新发送窗口, 因此调用 connect() 方法, 乃至用户使用 write() 方法都不会发送报文, 而是等到超时通过 _sender.tick() 方法重发一个 SYN+ACK 报文, 从而满足了 TCP 的正确转移状态的要求(注: 正常情况下 SYN 和 FIN 标志位不会出现在同一报文段中).
接下来即考虑收到 RST 报文段, 进行 TCP 连接的非正常关闭(unclean shutdown). 具体关于 TCP 连接的关闭方式见后文.
在经 _receiver.segment_received() 方法处理后, 仍有一个 if (!_receiver.ackno().has_value()) 的条件判断. 改判断是考虑 TCP 作为客户端, 已发送了一个 SYN 报文段, 此时在 SYN_SENT 状态, 这时若未收到对方的 SYN 报文(但可能收到了 RST 报文, 因此在 RST 报文判断之后), 则为无效报文, 不进行后续操作.
之后是对 _linger_after_streams_finish 变量的设置, 这里涉及 TCP 的正常关闭具体见后文.
紧接着是如果有确认标志位时, 通过远程对等点发送过来的确认号和接收窗口大小来更新本地 TCP 连接的发送窗口.
接下来第一个 if 语句是在收到一个占有序列号的报文段时, 若此时没有数据需要发送, 即 _sender 的发送队列为空, 便构造一个空报文段来进行确认. 第二个 if 语句则是任务指导中提到的特殊情况. 在此之前需要注意的是要调用 _sender.fill_window() 来更新发送窗口, 只有这样才能确定此时 _sender 的发送队列是否真正为空, 以避免发送多余的空报文段.
最后调用 send_segments() 即用于对接收到的报文进行确认.

TCP 连接的关闭

TCP 连接的关闭分为两种: 非正常关闭(unclean shutdown)和正常关闭(clean shutdown), 具体可见任务指导第 5 部分. 此处创建了一个私有成员 _active 用于反应 TCP 的连接状态, 由 active() 方法返回.

unclean shutdown

非正常关闭相对简单, 即收到或发出 RST 报文的情况时, 需要对 TCP 连接进行关闭, 主要就是关闭发送和接收的字节流同时置 _activefalse.

inline void TCPConnection::unclean_shutdown() {
    _sender.stream_in().set_error();
    _receiver.stream_out().set_error();
    _active = false;
}

发送 RST 报文主要在 tick() 方法和 TCPConnection 的析构函数中, 此处笔者又封装了一个 send_RST() 方法, 实际上就是从发送队列拿去一个报文添加上 RST 标志位.

void TCPConnection::send_RST() {
    _sender.fill_window();
    if (_sender.segments_out().empty()) {
        _sender.send_empty_segment();
    }
    TCPSegment segment = _sender.segments_out().front();
    _sender.segments_out().pop();
    optional<WrappingInt32> ackno = _receiver.ackno();
    if (ackno.has_value()) {
        segment.header().ack = true;
        segment.header().ackno = ackno.value();
    }
    segment.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max()
                               ? _receiver.window_size()
                               : numeric_limits<uint16_t>::max();
    segment.header().rst = true;
    _segments_out.emplace(segment);

    unclean_shutdown();
}
  • 这里笔者有个问题不是很确定: 一是对于 RST 报文段, 实际上不添加确认号和窗口大小也可以通过本实验测试, 是否有必要或者是否应该添加这两个字段? 特别是窗口大小字段, 此时告知对方窗口大小是没有意义的, 因为连接就要关闭了.
clean shutdown

TCP 的正常关闭即 TCP 的四次挥手, 是本实验中最为复杂的地方, 其中也有笔者不是很确定的地方.
TCP 正常关闭的四个前提已经在 #要点 中提到, 如下即前提与代码的对应:

  • Prereq#1: _receiver.unassembled_bytes()==0 && _receiver.stream_out().input_ended(), 实际上可以直接转化为 _receiver.stream_out().input_ended()(此处使用 _receiver.stream_out().eof() 方法同样能通过测试, 任务指导中描述使用的 ended, 因此这里笔者选择了前者).
  • Prereq#2: _sender.stream_in().eof(). 这里需要是 eof() 方法而非 input_ended(), 因为要确保发送字节流的所有数据已经全部发送出去.
  • Prereq#3: _sender.bytes_in_flight()==0

此外, TCP 的主动关闭方会有一个 TIME_WAIT 状态, 需要进行延迟(linger)关闭, 涉及变量 _linger_after_streams_finish. 根据任务指导, 入站流结束但出站流未到达 EOF 时会将该变量置为 false. 即作为被动关闭方是无需延迟的. 此处, 笔者将该变量的设置写在了 segment_received() 方法中, 因为在接收到报文段后会更新入站流 _receiver.stream_in() 的状态, 一旦入站流结束能够在第一时间将 _linger_after_streams_finish 置为 false.
因此主动关闭的代码如下:

    // TCP clean shutdown
    if (_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && (!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout)) {
        _active = false;
        return;
    }

前面三个即 TCP 连接正常关闭的前三个前提. 对于 _linger_after_streams_finishfalse 的情况可以直接关闭; 否则则需要延迟一段时间(2MSL)后关闭.

tick() 方法

根据实验指导, tick() 方法主要就是三个作用, 一是传递给 _sender.tick() 方法; 一个是在超时重发次数超过上限时发送 RST 报文关闭连接, 以及必要时正常关闭连接.
这里有额外两行代码: _time_since_last_segment_received += ms_since_last_tick 是用于更新 _time_since_last_segment_received 变量, 配合 segment_received() 方法中该变量的重置, 达到记录距离收到上个报文的时间的目的. 而最后的 send_segments() 是考虑到 _sender.tick() 方法可能会重发一个报文到其发送队列, 此时再转移至 TCPConnection 的发送队列进行发送.

//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {
    if (!_active) {
        return;
    }
    _time_since_last_segment_received += ms_since_last_tick;
    _sender.tick(ms_since_last_tick);
    // TCP unclean shutdown if the number of consecutive retransmissions 
    // is more than an upper limit
    if (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) {
        send_RST();
        return;
    }
    // TCP clean shutdown if necessary
    if (_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.bytes_in_flight() == 0 &&
        (!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout)) {
        _active = false;
        return;
    }
    // send segments when `_sender.tick()` has a retransmission
    send_segments();
}

代码

libsponge/tcp_connection.hh

#ifndef SPONGE_LIBSPONGE_TCP_FACTORED_HH
#define SPONGE_LIBSPONGE_TCP_FACTORED_HH

#include "tcp_config.hh"
#include "tcp_receiver.hh"
#include "tcp_sender.hh"
#include "tcp_state.hh"

//! \brief A complete endpoint of a TCP connection
class TCPConnection {
  private:
    TCPConfig _cfg;
    TCPReceiver _receiver{_cfg.recv_capacity};
    TCPSender _sender{_cfg.send_capacity, _cfg.rt_timeout, _cfg.fixed_isn};

    //! outbound queue of segments that the TCPConnection wants sent
    std::queue<TCPSegment> _segments_out{};

    //! Should the TCPConnection stay active (and keep ACKing)
    //! for 10 * _cfg.rt_timeout milliseconds after both streams have ended,
    //! in case the remote TCPConnection doesn't know we've received its whole stream?
    bool _linger_after_streams_finish{true};

    //! the active state of the TCPConnection, it will be set false when TCPConnection has a shutdown 
    bool _active{true};

    //! the time since last segment received
    size_t _time_since_last_segment_received{0};

    //! send segments to outbound queue
    void send_segments();

    //! send a segment with RST flag
    void send_RST();

    //! TCPConnection unclean shutdown
    void unclean_shutdown();

  public:
    //! \name "Input" interface for the writer
    //!@{

    //! \brief Initiate a connection by sending a SYN segment
    void connect();

    //! \brief Write data to the outbound byte stream, and send it over TCP if possible
    //! \returns the number of bytes from `data` that were actually written.
    size_t write(const std::string &data);

    //! \returns the number of `bytes` that can be written right now.
    size_t remaining_outbound_capacity() const;

    //! \brief Shut down the outbound byte stream (still allows reading incoming data)
    void end_input_stream();
    //!@}

    //! \name "Output" interface for the reader
    //!@{

    //! \brief The inbound byte stream received from the peer
    ByteStream &inbound_stream() { return _receiver.stream_out(); }
    //!@}

    //! \name Accessors used for testing

    //!@{
    //! \brief number of bytes sent and not yet acknowledged, counting SYN/FIN each as one byte
    size_t bytes_in_flight() const;
    //! \brief number of bytes not yet reassembled
    size_t unassembled_bytes() const;
    //! \brief Number of milliseconds since the last segment was received
    size_t time_since_last_segment_received() const;
    //!< \brief summarize the state of the sender, receiver, and the connection
    TCPState state() const { return {_sender, _receiver, active(), _linger_after_streams_finish}; };
    //!@}

    //! \name Methods for the owner or operating system to call
    //!@{

    //! Called when a new segment has been received from the network
    void segment_received(const TCPSegment &seg);

    //! Called periodically when time elapses
    void tick(const size_t ms_since_last_tick);

    //! \brief TCPSegments that the TCPConnection has enqueued for transmission.
    //! \note The owner or operating system will dequeue these and
    //! put each one into the payload of a lower-layer datagram (usually Internet datagrams (IP),
    //! but could also be user datagrams (UDP) or any other kind).
    std::queue<TCPSegment> &segments_out() { return _segments_out; }

    //! \brief Is the connection still alive in any way?
    //! \returns `true` if either stream is still running or if the TCPConnection is lingering
    //! after both streams have finished (e.g. to ACK retransmissions from the peer)
    bool active() const;
    //!@}

    //! Construct a new connection from a configuration
    explicit TCPConnection(const TCPConfig &cfg) : _cfg{cfg} {}

    //! \name construction and destruction
    //! moving is allowed; copying is disallowed; default construction not possible

    //!@{
    ~TCPConnection();  //!< destructor sends a RST if the connection is still open
    TCPConnection() = delete;
    TCPConnection(TCPConnection &&other) = default;
    TCPConnection &operator=(TCPConnection &&other) = default;
    TCPConnection(const TCPConnection &other) = delete;
    TCPConnection &operator=(const TCPConnection &other) = delete;
    //!@}
};

#endif  // SPONGE_LIBSPONGE_TCP_FACTORED_HH

libsponge/tcp_connection.cc

#include "tcp_connection.hh"

#include <iostream>

// Dummy implementation of a TCP connection

// For Lab 4, please replace with a real implementation that passes the
// automated checks run by `make check`.

template <typename... Targs>
void DUMMY_CODE(Targs &&.../* unused */) {}

using namespace std;

size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); }

size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }

size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }

size_t TCPConnection::time_since_last_segment_received() const { return _time_since_last_segment_received; }

void TCPConnection::segment_received(const TCPSegment &seg) {
    if (!_active) {
        return;
    }
    // reset the timer
    _time_since_last_segment_received = 0;
    const TCPHeader &header = seg.header();
    // if TCP does not receive SYN from remote peer, and not send SYN to remote peer
    if (!_receiver.ackno().has_value() && _sender.next_seqno_absolute() == 0) {
        // at this time, TCP acts as a server,
        // and should not receive any segment except it has SYN flag
        if (!header.syn) {
            return;
        }
        _receiver.segment_received(seg);
        // try to send SYN segment, use for opening TCP at the same time
        connect();
        return;
    }
    // the TCP ends in an unclean shutdown when receiving RST segment
    if (header.rst) {
        unclean_shutdown();
        return;
    }
    _receiver.segment_received(seg);
    // if TCP sends SYN segment as a client but does not receive SYN from remote peer,
    // the local TCP should not handle it, too.
    if (!_receiver.ackno().has_value()) {
        return;
    }

    // set the `_linger_after_streams_finish` the first time the inbound stream ends
    if (!_sender.stream_in().eof() && _receiver.stream_out().input_ended()) {
        _linger_after_streams_finish = false;
    }
    // use the remote peer's ackno and window size to update the local sending window
    if (header.ack) {
        _sender.ack_received(header.ackno, header.win);
    }

    _sender.fill_window();
    // makes sure that at least one segment is sent in reply
    if (seg.length_in_sequence_space() > 0 && _sender.segments_out().empty()) {
        _sender.send_empty_segment();
    }
    // an extra special case to respond to a keep-alive segment
    if (seg.length_in_sequence_space() == 0 && header.seqno == _receiver.ackno().value() - 1) {
        _sender.send_empty_segment();
    }
    send_segments();
}

bool TCPConnection::active() const { return _active; }

size_t TCPConnection::write(const string &data) {
    if (!_active) {
        return 0;
    }
    size_t ret = _sender.stream_in().write(data);
    _sender.fill_window();
    send_segments();
    return ret;
}

//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {
    if (!_active) {
        return;
    }
    _time_since_last_segment_received += ms_since_last_tick;
    _sender.tick(ms_since_last_tick);
    // TCP unclean shutdown if the number of consecutive retransmissions
    // is more than an upper limit
    if (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) {
        send_RST();
        return;
    }
    // TCP clean shutdown if necessary
    if (_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.bytes_in_flight() == 0 &&
        (!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout)) {
        _active = false;
        return;
    }
    // send segments when `_sender.tick()` has a retransmission
    send_segments();
}

void TCPConnection::end_input_stream() {
    _sender.stream_in().end_input();
    _sender.fill_window();
    send_segments();
}

void TCPConnection::connect() {
    _sender.fill_window();
    send_segments();
}

TCPConnection::~TCPConnection() {
    try {
        if (active()) {
            cerr << "Warning: Unclean shutdown of TCPConnection\n";

            // Your code here: need to send a RST segment to the peer
            send_RST();
        }
    } catch (const exception &e) {
        std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
    }
}

void TCPConnection::send_segments() {
    while (!_sender.segments_out().empty()) {
        TCPSegment segment = _sender.segments_out().front();
        _sender.segments_out().pop();
        optional<WrappingInt32> ackno = _receiver.ackno();
        // if TCP does not receive SYN segments from the remote peer, i.e. at SYN_SENT state
        // TCP will not set ACK flag and seqno
        if (ackno.has_value()) {
            segment.header().ack = true;
            segment.header().ackno = ackno.value();
        }
        // set the local receiver's window size
        segment.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max()
                                   ? _receiver.window_size()
                                   : numeric_limits<uint16_t>::max();
        _segments_out.emplace(segment);
    }
}

void TCPConnection::send_RST() {
    _sender.fill_window();
    if (_sender.segments_out().empty()) {
        _sender.send_empty_segment();
    }
    TCPSegment segment = _sender.segments_out().front();
    _sender.segments_out().pop();
    optional<WrappingInt32> ackno = _receiver.ackno();
    if (ackno.has_value()) {
        segment.header().ack = true;
        segment.header().ackno = ackno.value();
    }
    segment.header().win = _receiver.window_size() <= numeric_limits<uint16_t>::max() ? _receiver.window_size()
                                                                                      : numeric_limits<uint16_t>::max();
    segment.header().rst = true;
    _segments_out.emplace(segment);

    unclean_shutdown();
}

inline void TCPConnection::unclean_shutdown() {
    _sender.stream_in().set_error();
    _receiver.stream_out().set_error();
    _active = false;
}

遇到问题

  • Test #36-test#1: close
    在这里插入图片描述
    解决: 原因在于 TCPConnection::end_input_stream() 方法中结束了发送的数据流, 同时调用了 _sender.fill_window() 填充窗口发送, 但是报文段仍在 _sender 的发送队列中, 应该转移至 TCPConnection 的发送队列 _segment_out 中, 这样才能真正将带有 FIN 标志位的报文发出.

  • Test#36-test#1: 1ms pass
    在这里插入图片描述
    解决: 出现该问题的原因在于服务端的推迟(linger)时间是 10*_cfg.rt_timeout, 在这个时刻已经要关闭连接, 而非超过该时间, 在 tick() 方法中是 _time_since_last_segment_received >= 10 * _cfg.rt_timeout.

  • Test#37-test#1: 4000ms pass
    在这里插入图片描述
    解决: 原因就是 _linger_after_streams_finish 的状态未正确设置. 任务指导中的描述是: “If the inbound stream ends before the TCPConnection
    has reached EOF on its outbound stream, this variable needs to be set to false
    . 这里笔者开始将该变量设置为 false 的时机设在了 tick() 方法中, 但根据测试用例, 可能调用 tick() 时输出流也已经完毕, 此时便不能判断为被动关闭方从而设置 _linger_after_streams_finish = false, 因此最后将判断的位置改到了 segment_received() 方法中, 这样在收到远端的 FIN 标志位的第一时间即可设置 _linger_after_streams_finish 变量.

  • Test#45-test#1: 1ms pass
    在这里插入图片描述
    解决: 出现的原因在于作为客户端建立 TCP 连接时没有发送的报文没有 SYN 标志位. 原因在于指导书中描述了 segment_received() 方法中, 在收到报文后一定要发送一个确认报文, 必要时, 即当前 _sender.segment_out() 队列中为空时构建空报文. 但这个就需要事先调用 _sender.fill_window() 方法来更新 _sender 的发送队列. 不然在初始收到 SYN 报文时, 此时 _sender 的发送队列自然为空, 便会发送只有 ACK 字段的空报文.

测试

build 目录下执行 make 后执行 make check_lab4:
在这里插入图片描述
在这里插入图片描述

遗留问题

该问题是窗口大小为 1 的超时问题, 上述代码没有该问题, 但笔者并未弄清楚发生该问题的根本原因, 在此主要记录, 若有解决或明白的同学欢迎交流, 十分感谢!

问题代码

出现问题的代码是segment_received() 方法的中增加被动关闭的代码:

void TCPConnection::segment_received(const TCPSegment &seg) {
    // ....
    // use the remote peer's ackno and window size to update the local sending window
    if (header.ack) {
        _sender.ack_received(header.ackno, header.win);
        // the bug codes are follow:
        if (_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended() &&
            !_linger_after_streams_finish) {
            _active = false;
            return;
        }
    }
    //...
}

如上所示, 当满足 3 个前提且 _linger_after_streams_finishfalse 时直接关闭 TCP 连接并返回.
而对于 tick() 方法中正常关闭部分的判断 if (_receiver.stream_out().input_ended() && _sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && (!_linger_after_streams_finish || _time_since_last_segment_received >= 10 * _cfg.rt_timeout)), 无论是否删除 !_linger_after_streams_finish || 都会通不过测试.

问题代码思路

下面解释一下上述问题代码的思路:
这里笔者将被动关闭的条件判断放置到了 segment_received() 方法中, 这么考虑的原因在于笔者认为 TCP 连接正常关闭的前 3 个前提满足时第一时间能从该方法中判断出. 再次回顾这 3 个前提: #1是要求入站流已结束且重组完毕, #2是要求出站流已全部发送, #3是要求出站流已被远程对等点全部确认. 很明显的是前提#1一定能第一时间从 segment_received() 中获取到, 因为收到报文后通过该方法调用 _receiver.segment_received(), 内部的重组器和字节流会判断 FIN 标志位, 同时更新未重组的字节数, 这里是没问题的, 而且笔者更新 _linger_after_streams_finish 变量也设置在该方法中. 而前提#2和#3是有关系的, #3满足时#2一定满足, 不然远程对等点无法全部确认本地发送的全部数据. 而远程对等点确认的方法即发送一个带有确认号的报文段(可以是空报文段)给本地, 也会在 segment_received() 方法中得到判断, 并且是在判断报文有 ACK 标志位的前提下. 这也是上述代码的思路. 这样, 被动关闭方在收到给对方的 FIN 确认后即可关闭 TCP 连接. 在下面测试中, 实际也证实了被动关闭方可以正常关闭.
这里额外说一下, 笔者浏览了网上的代码, 大部分都是将 _linger_after_streams_finishfalse 以及根据 !_linger_after_streams_finish|| _time_since_last_segment_received >= 10 * _cfg.rt_timeout 正常关闭的两个判断封装成 clean_shutdown() 方法, 并在 send_segments() 方法的最后调用. 这与笔者的实现是不同的, 笔者将 _linger_after_streams_finishfalse 写在了 segment_received() 中, 而正常关闭写在了 tick() 方法中. 自然, 这两种实现最后都是能够通过测试的, 因为 send_segments() 方法会被 tick() 方法调用, 而 tick() 方法是由操作系统频繁调用的.

问题表现

上述问题代码的运行结果是执行 make check_lab4 后 10 个窗口大小为 1 的测试用例均不能通过, 如下图所示:
在这里插入图片描述
此处选择测试 #64 - t_ucS_16_1 进行单测, 输出如下(这里对 文件进行了修改, “DEBUG” 字符后面输出的是 TCP 的源地址):
在这里插入图片描述
而下图则是没有问题的代码在该测试用例的输出, 其中红色部分则是上图中缺少的部分:
在这里插入图片描述
下图则是根据上述信息绘制的 TCP 连接时序图(绿色划线部分为缺少的部分):
在这里插入图片描述
综上可以分析出, 当前测试未通过的原因是: TCP 连接的主动关闭方不能正常关闭, 缺少前提#1入站流结束, 即未收到被动关闭方的 FIN 报文段. 但此时被动关闭方已经关闭, 则应该满足主动方已经收到了其全部数据, 包括最后的 FIN 报文. 这也是笔者感到奇怪的地方.
此外, 如下图所示, 笔者自己进行窗口大小为 1 的测试时 TCP 连接可以正常关闭:
在这里插入图片描述
在这里插入图片描述
目前问题的原因和有问题的代码都以确定, 但是笔者仍未弄清楚出现该问题的本质原因是什么, 笔者修改的是对 TCP 连接正常关闭被动关闭方的代码, 但实际情况是测试中主动关闭方不能正常关闭. 如果有解决或弄清楚该问题的同学, 恳请赐教.

7 Performance

优化前

在优化代码之前, 即直接使用之前实验中完成的组件进行 TCP 的性能测试.
build 目录下执行 ./apps/tcp_benchmark 后输出如下, 通过 3 次测试可以看到, 在为优化前, 笔者实现的 TCP 连接的吞吐量为 0.5Gb/s 左右. 也满足了实验 0.1Gb/s 的最低要求.
在这里插入图片描述
上述代码是在 ByteStream, StreamReassembler 类中的数据结构是 C++ std 中常用的标准模板库的基础上完成的. 其中 ByteStream::_bufferdeque<char>, StreamReassembler::_bufferdeque<char>, TCPSender::_outstanding_segmentsqueue<TCPSegment>.
这里值得一提的是, Lab0 中 ByteStream 使用 deque<char> 还是 string 笔者考虑了好久, 最后考虑到 deque 在对头部删除时性能更好最后选择了 deque, 但实际若换成 string 性能更好. 如下所示, 可以到达 0.66Gb/s:
在这里插入图片描述

优化 v1

首先参照网上大部分的优化策略, 即将 ByteStream 中的缓冲区数据结构由 string 替换为内置的数据结构 BufferList, 位于 libsponge/util/buffer.hh 中. 这个数据结构的底层实际上就是一个 deque<Buffer>. 而 Buffer 是利用智能指针 shared_ptr 封装的字符串, 且有一个头部偏移 _starting_offset. 这个数据结构的优点一方面在于通过智能指针和 move 操作可以避免字符串的多次拷贝, 另一方面利用偏移量来减少字符串释放内存的开销, 只有整个字符串全部移除后才会统一释放内存(具体可见其代码), 这样自然能够提高性能, 因此选择 BufferList 作为 ByteStream 的缓冲区数据结构, 能够得到一定的性能提升. 如下图所示:
在这里插入图片描述
这里值得一提的是, BufferListsize() 方法需要遍历其中的所有 Buffer, 是一个线性时间复杂度的算法, 由于在 ByteStream 中会被频繁调用影响性能, 因此这里额外使用 _buffer_size 变量来直接记录其大小.
此外, string::assign() 方法比 string::substr() 和直接调用构造函数的性能都要好一点, 在提取字符串子串时使用该方法可以得到细微的性能提升.
在这里插入图片描述
更新的 ByteStream 代码如下:

libsponge/byte_stream.hh

#ifndef SPONGE_LIBSPONGE_BYTE_STREAM_HH
#define SPONGE_LIBSPONGE_BYTE_STREAM_HH

#include "buffer.hh"

#include <string>

//! \brief An in-order byte stream.

//! Bytes are written on the "input" side and read from the "output"
//! side.  The byte stream is finite: the writer can end the input,
//! and then no more bytes can be written.
class ByteStream {
  private:
    // Your code here -- add private members as necessary.

    // Hint: This doesn't need to be a sophisticated data structure at
    // all, but if any of your tests are taking longer than a second,
    // that's a sign that you probably want to keep exploring
    // different approaches.
    const size_t _cap;  //! The capacity of the stream buffer

    BufferList _buffer{};       //!< Byte stream buffer
    size_t _buffer_size = 0;    //! Total number of bytes in buffer
    size_t _total_read = 0;     //!< Total number of bytes written
    size_t _total_written = 0;  //!< Total number of bytes popped
    bool _end = false;          //!< Flag indicating that the byte stream has reached its ending.
    bool _error = false;        //!< Flag indicating that the stream suffered an error.

  public:
    //! Construct a stream with room for `capacity` bytes.
    ByteStream(const size_t capacity);

    //! \name "Input" interface for the writer
    //!@{

    //! Write a string of bytes into the stream. Write as many
    //! as will fit, and return how many were written.
    //! \returns the number of bytes accepted into the stream
    size_t write(const std::string &data);

    //! \returns the number of additional bytes that the stream has space for
    size_t remaining_capacity() const;

    //! Signal that the byte stream has reached its ending
    void end_input();

    //! Indicate that the stream suffered an error.
    void set_error() { _error = true; }
    //!@}

    //! \name "Output" interface for the reader
    //!@{

    //! Peek at next "len" bytes of the stream
    //! \returns a string
    std::string peek_output(const size_t len) const;

    //! Remove bytes from the buffer
    void pop_output(const size_t len);

    //! Read (i.e., copy and then pop) the next "len" bytes of the stream
    //! \returns a string
    std::string read(const size_t len);

    //! \returns `true` if the stream input has ended
    bool input_ended() const;

    //! \returns `true` if the stream has suffered an error
    bool error() const { return _error; }

    //! \returns the maximum amount that can currently be read from the stream
    size_t buffer_size() const;

    //! \returns `true` if the buffer is empty
    bool buffer_empty() const;

    //! \returns `true` if the output has reached the ending
    bool eof() const;
    //!@}

    //! \name General accounting
    //!@{

    //! Total number of bytes written
    size_t bytes_written() const;

    //! Total number of bytes popped
    size_t bytes_read() const;
    //!@}
};

#endif  // SPONGE_LIBSPONGE_BYTE_STREAM_HH

libsponge/byte_stream.cc

#include "byte_stream.hh"

// Dummy implementation of a flow-controlled in-memory byte stream.

// For Lab 0, please replace with a real implementation that passes the
// automated checks run by `make check_lab0`.

// You will need to add private members to the class declaration in `byte_stream.hh`

template <typename... Targs>
void DUMMY_CODE(Targs &&.../* unused */) {}

using namespace std;

ByteStream::ByteStream(const size_t capacity) : _cap(capacity) {}

size_t ByteStream::write(const string &data) {
    const size_t size = min(data.size(), _cap - _buffer_size);
    //
    _buffer.append(BufferList(move(string().assign(data.begin(), data.begin() + size))));
    _buffer_size += size;
    _total_written += size;
    return size;
}

//! \param[in] len bytes will be copied from the output side of the buffer
string ByteStream::peek_output(const size_t len) const {
    string str = _buffer.concatenate();
    return string().assign(string().assign(str.begin(), str.begin() + min(len, _buffer_size)));
}

//! \param[in] len bytes will be removed from the output side of the buffer
void ByteStream::pop_output(const size_t len) {
    const size_t size = min(len, _buffer_size);
    _buffer.remove_prefix(size);
    _buffer_size -= size;
    _total_read += size;
}

//! Read (i.e., copy and then pop) the next "len" bytes of the stream
//! \param[in] len bytes will be popped and returned
//! \returns a string
std::string ByteStream::read(const size_t len) {
    const size_t size = min(len, _buffer_size);
    string str = _buffer.concatenate();
    _buffer.remove_prefix(size);
    _buffer_size -= size;
    _total_read += size;
    return str.assign(str.begin(), str.begin() + size);
}

void ByteStream::end_input() { _end = true; }

bool ByteStream::input_ended() const { return _end; }

size_t ByteStream::buffer_size() const { return _buffer_size; }

bool ByteStream::buffer_empty() const { return _buffer_size == 0; }

bool ByteStream::eof() const { return _end && buffer_empty(); }

size_t ByteStream::bytes_written() const { return _total_written; }

size_t ByteStream::bytes_read() const { return _total_read; }

size_t ByteStream::remaining_capacity() const { return _cap - _buffer_size; }

优化 v2

网上主要使用的就是 #优化 v1 中提到的方法, 且吞吐量基本上能够到 2Gb/s 以上. 此处笔者还是 0.7Gb/s 上下. 因此笔者分析仍有一个地方严重影响性能. 容易分析得到问题出在 StreamRessambler 上.
原本笔者 StreamRessambler 的实现是基于 deque 的, 需要对收到的字符串逐字节扫描, 在字符串很长的情况下性能很差. 后来笔者使用 set 容器按照字符串片段来存储, 同时配合一个类似内置 Buffer 类的 BufferPlus 类进行字符串的存储避免复制, 最终显著提高了性能.
关于具体优化可见 Lab1 #优化后 部分.
优化后的 TCP 吞吐量达到了 2.6Gb/s, 如下图所示:
在这里插入图片描述

8 webget revisited

按照任务指导修改 apps/webget.cc 中代码后, 在 build 目录下执行 make 然后执行 make check_webget:
在这里插入图片描述

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-05-21 19:17:01  更:2022-05-21 19:17:43 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/29 11:09:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计