IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 《ZLToolKit源码学习笔记》(23)网络模块之UdpServer -> 正文阅读

[网络协议]《ZLToolKit源码学习笔记》(23)网络模块之UdpServer

系列文章目录

《ZLToolKit源码学习笔记》(1)VS2019源码编译

《ZLToolKit源码学习笔记》(2)工具模块之日志功能分析

《ZLToolKit源码学习笔记》(3)工具模块之终端命令解析

《ZLToolKit源码学习笔记》(4)工具模块之消息广播器

《ZLToolKit源码学习笔记》(5)工具模块之资源池

《ZLToolKit源码学习笔记》(6)线程模块之整体框架概述

《ZLToolKit源码学习笔记》(7)线程模块之线程池组件:任务队列与线程组

《ZLToolKit源码学习笔记》(8)线程模块之线程负载计算器

《ZLToolKit源码学习笔记》(9)线程模块之任务执行器

《ZLToolKit源码学习笔记》(10)线程模块之线程池

《ZLToolKit源码学习笔记》(11)线程模块之工作线程池WorkThreadPool

《ZLToolKit源码学习笔记》(12)事件轮询模块之整体框架概述

《ZLToolKit源码学习笔记》(13)事件轮询模块之管道的简单封装

《ZLToolKit源码学习笔记》(14)事件轮询模块之定时器

《ZLToolKit源码学习笔记》(15)事件轮询模块之事件轮询器EventPoller

《ZLToolKit源码学习笔记》(16)网络模块之整体框架概述

《ZLToolKit源码学习笔记》(17)网络模块之基础接口封装类SockUtil

《ZLToolKit源码学习笔记》(18)网络模块之Buffer缓存

《ZLToolKit源码学习笔记》(19)网络模块之套接字封装

《ZLToolKit源码学习笔记》(20)网络模块之TcpServer

《ZLToolKit源码学习笔记》(21)网络模块之TcpClient

《ZLToolKit源码学习笔记》(22)网络模块之TcpSession

《ZLToolKit源码学习笔记》(23)网络模块之UdpServer(本文)


?前言

本节学习ZLToolKit的UDP服务器封装。


目录

系列文章目录

?前言

一、概述

1.1、类图

1.2、网络模型

1.3、如何实现连接的负载均衡

1.4、UDP服务器如何实现监听socket与会话socket的分离

二、代码实现

2.1、新socket的创建过程

2.2、处理server fd多次收到同一client连接发送的数据

2.3、新socket分配到其它线程可能存在的问题


一、概述

1.1、类图

Server类作为基类,将服务器和EventPoller进行了关联。

1.2、网络模型

服务器模型:与TcpServer类似,使用多线程+epoll (select),一个Server?fd + 多个epoll实例

每一个线程都创建了一个epoll实例,并以ET边沿触发模式监听同一个Server fd的读事件,使用EPOLLEXCLUSIVE?标志位防止惊群效应,线程阻塞在epoll_wait上等待客户端连接。

当有客户端发送数据到Server fd时,针对该客户端创建一个新的会话(新的文件描述符,同样使用ET边沿触发),后续该客户端的数据将不会再发送到Server fd。

UdpServer与TcpServer流程基本一致,以下仅简单分析下。

1.3、如何实现连接的负载均衡

ZLToolKit使用多线程技术,每一个线程中都创建了自己私有的epoll实例(非linux系统使用select,不做分析),并以ET模式监听同一个server fd的读(UDP还有写)事件,这种方式我们知道是有惊群效应的,所以需要给每一个fd事件加上EPOLLEXCLUSIVE标志位(内核4.5+支持)来避免惊群。后续客户端的fd将均匀的分配到这多个线程中处理。

每一个线程都能接受新的客户端连接,并且管理一部分已有的客户端连接。

1.4、UDP服务器如何实现监听socket与会话socket的分离

对于TCP服务器,由于TCP是有连接的,server fd使用listen系统调用,只用来处理client连接过程,然后使用accept为客户端连接分配一个新的client fd,后续针对该客户端的会话数据读写就和server fd没关系了。说白了,就是TCP四元组在这个过程中就唯一确定并缓存下来了,后续该客户端发送的数据,通过查询四元组,就可以唯一匹配到对应的client fd上。

可以看到,TCP服务器的监听socket与会话socket天然就是分开的,每一个客户端都有独立的client fd。

但是对于UDP服务器,由于它是无连接的,不用经过三次握手的过程就可以直接发送数据,一个UDP socket没有服务器客户端之分,对于服务器来说,一个socket就可以接受所有的客户端数据,这必然导致数据的处理变得非常复杂,比如,如何区分不同的客户端,如何高效且安全的在多线程下处理客户端数据等。

我们看下ZLToolKit是如何解决的。在TCP服务器中,有一个只处理客户端连接的socket(listen fd),然后每一个客户端都有自己的client socket。接受新连接和客户端的数据交互是分开的。

而UDP中,这两者是混杂在一起的。所以,我们可以模拟TCP,当server fd对应的socket上收到某个客户端发来的数据后,不通过server socket去响应该客户端,而是新建立一个socket,该socket与server socket绑定相同的IP和端口(通过SO_REUSEADDR实现),并且,为了确定唯一的四元组,在该socket上调用connect API与客户端进行了关联。这样,后续客户端发送的数据将被转发到这个新的socket上来处理。

当然,使用这种方式实现的UDP服务器还有很多弊端,比如客户端程序必须绑定固定的IP和端口,不然每次客户端都随机使用一个端口来发送数据,会导致服务器的四元组失效。更详细的介绍可以参见以下文章:

告知你不为人知的 UDP:连接性和负载均衡 - 知乎


二、代码实现

2.1、新socket的创建过程

?createSession函数中,以下三行代码就是创建新socket的过程,新socket与server socket绑定相同的IP和端口,然后再connect到peer客户端,确定四元组。

socket->bindUdpSock(_socket->get_local_port(), _socket->get_local_ip());
socket->bindPeerAddr((struct sockaddr *)addr_str.data(), addr_str.size());
//在connect peer后再取消绑定关系, 避免在 server 的 socket 或其他cloned server中收到后续数据包.
SockUtil::dissolveUdpSock(_socket->rawFD());

2.2、处理server fd多次收到同一client连接发送的数据

一般仅在一个新的客户端连接第一次发送数据时,data才会在server fd上被接收到,其它时间都会发送到已创建的新socket上。但这不是绝对的,所以服务器还需要处理多次收到同一client发送数据的情况。

void UdpServer::onRead(const Buffer::Ptr &buf, sockaddr *addr, int addr_len) {
    const auto id = makeSockId(addr, addr_len);
    onRead_l(true, id, buf, addr, addr_len);
}

void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len) {
    // udp server fd收到数据时触发此函数;大部分情况下数据应该在peer fd触发,此函数应该不是热点函数
    bool is_new = false;
    if (auto session = getOrCreateSession(id, buf, addr, addr_len, is_new)) {
        if (session->getPoller()->isCurrentThread()) {
            //当前线程收到数据,直接处理数据
            session->onRecv(buf);
        } else {
            //数据漂移到其他线程,需要先切换线程
            WarnL << "udp packet incoming from other thread";
            std::weak_ptr<Session> weak_session = session;
            //由于socket读buffer是该线程上所有socket共享复用的,所以不能跨线程使用,必须先拷贝一下
            auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
            session->async([weak_session, cacheable_buf]() {
                if (auto strong_session = weak_session.lock()) {
                    strong_session->onRecv(cacheable_buf);
                }
            });
        }

#if !defined(NDEBUG)
        if (!is_new) {
            TraceL << "udp packet incoming from " << (is_server_fd ? "server fd" : "other peer fd");
        }
#endif
    }
}

server fd接收到客户端数据时,获取已存在的会话或者创建一个新的会话(创建新的sokcet的过程),该会话将处理后续所有这个客户端的数据,server fd将不会再收到该客户端的数据。

这里有个注意点,在创建新的会话时,可能客户端已经发送了多条数据了,这多次数据都发送到了server fd上,server fd在处理第一条数据时,虽然已经创建新的会话处理后续客户端发送的数据,但是已经发出来的数据还是会在server fd上处理,所以,此处才需要查找该客户端对应的已存在会话,让其来处理这部分已经发送出来的数据。

为什么要判断是不是当前线程呢?从上边的描述可以看到,server fd可能收到多次客户端发来的数据,在第一条数据时,我们已经创建了一个新的会话socket,这个会话socket可能会被放在其它poller线程去监听,所以,server fd上后续的数据,就需要转移到这个poller线程中,确保同一个client发送过来的数据始终在一个线程中处理。但实际上,ZLToolkit的实现中,将新的会话socket还是放在当前线程中处理了,并没有放在其它线程。以下代码可以看到,新socket使用的还是server socket的poller线程。

auto socket = createSocket(_poller, buf, addr, addr_len);

而在TCP服务器的实现中,是选择了负载最轻的线程来管理新的会话socket,所以存在socket被其它线程管理的情况,参见以下代码:

Socket::Ptr TcpServer::onBeforeAcceptConnection(const EventPoller::Ptr &poller) {
    assert(_poller->isCurrentThread()); //在_poller线程则允许继续执行,执行accept操作的就是当前_poller线程
    //此处改成自定义获取poller对象,防止负载不均衡
    return createSocket(EventPollerPool::Instance().getPoller(false));
}

2.3、新socket分配到其它线程可能存在的问题

createSession函数中,以下代码本人觉得可能存在问题。

   if (socket->getPoller()->isCurrentThread()) {
        //该socket分配在本线程,直接创建session对象,并处理数据
        return session_creator();
    }

    //该socket分配在其他线程,需要先拷贝buffer,然后在其所在线程创建session对象并处理数据
    auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
    socket->getPoller()->async([session_creator, cacheable_buf]() {
        //在该socket所在线程创建session对象
        auto session = session_creator();
        if (session) {
            //该数据不能丢弃,给session对象消费
            session->onRecv(cacheable_buf);
        }
    });
    return s_null_session;

如果新的会话socket分配在其它线程,将使用异步任务执行,假设会话在还没有创建成功前,当前server线程再次收到该客户端数据,在_session_map中没有查询到对应的会话,就会再次触发创建新会话的操作。

目前的实现新会话socket还是在当前线程,所以不存在上述问题。

?

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-01-17 11:49:32  更:2022-01-17 11:49:56 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/8 5:56:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码