IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 嵌入式 -> Netty IoT百万长连接优化 -> 正文阅读

[嵌入式]Netty IoT百万长连接优化

Iot推送系统?

Iot

物联网( IoT ,Internet of things )即“万物相连的互联网”,是互联网基础上的延伸和扩展的网络,将各种信息传感设备与网络结合起来而形成的一个巨大网络,实现任何时间、任何地点,人、机、物的互联互通.

也就是设备与设备 或 设备与人之间的交互。

物联网推送系统设计

物联网中的推送系统和互联网的推送系统很相似;但对于物联网有他的特性,由于要接入海量的硬件设备和传感器,且协议多样化,同时还要在极短的时间内处理大量的数据,所以对服务端的协议接入和处理能力要求极高。

  • app去连接推送服务器

  • 设备连接注册中心并获得到注册中心地址?

  • 添加多服务器来解决大批量连接问题??

  • 添加注册中心解决服务器的管理? ?所有设备去访问注册中心获取地址,做一个代理? ;

    服务器去注册更新?

  • 建立连接,以保证数据的实时性,设备订阅需要设备需要的推送消息 ,每个设备可能想要的不同

  • 创建redis 用于推送消息,缓存起来消息,记录设备连接服务器的信息 订阅的主题信息

  • ?使用httpAPI接口 rpc接口来作为消息推送件

  • mq消息中间件存储消息,用服务器去订阅消息

利用netty来构建出一个iot消息推送系统 。

 @PostConstruct
    public void init() throws Exception {

        log.info("Setting resource leak detector level to {}", leakDetectorLevel);
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
        log.info("Starting MQTT transport...");

        log.info("Starting MQTT transport server");
        bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
        workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
        NioEventLoopGroup bizGroup = new NioEventLoopGroup(100, new DefaultEventExecutor(new DefaultThreadFactory("biz-pool")));
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                        pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
                        pipeline.addLast("encoder", MqttEncoder.INSTANCE);
                        //pipeline.addLast("idleStateHandler", new IdleStateHandler(10,2,12, TimeUnit.SECONDS));
                        MqttTransportHandler handler = new MqttTransportHandler(protocolProcess);
                        pipeline.addLast(bizGroup, handler);
                    }
                });

        serverChannel = b.bind(host, port).sync().channel();


        log.info("Mqtt transport started!");
    }

消息处理机制实现

?这里使用?ProtocolProcess?来作为工厂类创建多个工厂创建类

然后使用的mqtt协议来实现的消息间传输

MQTT(消息队列遥测传输)是ISO?标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件?

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

网络应用层的协议。也是广泛应用在物联网中,也是得益于他自己的格式,头部很简单,方便使用,占用宽带小也是他的特性,具体的协议格式可以看看下面的文章

MQTT协议

Netty中心跳检测机制

在针对海量设备连接iot服务端时,一定要添加完整的服务端策略,并且及时检测失效连接。防止无效的连接继续存在;需要设置合理的心跳周期,防止心跳任务挤压。使用netty提供链路空闲检测机制。不要自己创建定时任务群,以免过多消耗服务端的资源。

心跳检测周期通常不要超过60s,心跳检测超时通常为心跳检测周期的2倍

在netty中使用方式 ,在启动时获取到 channel

 //处理连接心跳包
        if (msg.variableHeader().keepAliveTimeSeconds() > 0) {
            if (channel.pipeline().names().contains("idle")) {
                channel.pipeline().remove("idle");
            }
            channel.pipeline().addFirst("idle", new IdleStateHandler(0, 0, Math.round(msg.variableHeader().keepAliveTimeSeconds() * 1.5f)));
        }

这里其中包括的参数

  • 第一个参数设置请求参数超时心跳检测时间
  • 第二个参数设置响应参数超时心跳检测时间
  • 第三个参数?将在既不执行读取也不执行写入时触发
 /**
     * Creates a new instance firing {@link IdleStateEvent}s.
     *
     * @param readerIdleTimeSeconds
     *          将在未对指定对象执行读取时触发
     * @param writerIdleTimeSeconds
     *        将在未对指定的执行写入时触发
     * @param allIdleTimeSeconds
        将在既不执行读取也不执行写入时触发
     */
    public IdleStateHandler(
            int readerIdleTimeSeconds,
            int writerIdleTimeSeconds,
            int allIdleTimeSeconds) {

        this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
             TimeUnit.SECONDS);
    }

怎么去实现这个心跳机制 ,在netty的注解中找到答案。

 * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
 * public class MyHandler extends {@link ChannelDuplexHandler} {
 *     {@code @Override}
 *     public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
 *         if (evt instanceof {@link IdleStateEvent}) {
 *             {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
 *             if (e.state() == {@link IdleState}.READER_IDLE) {
 *                 ctx.close();
 *             } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
 *                 ctx.writeAndFlush(new PingMessage());
 *             }
 *         }
 *     }
 * }

这里的触发程序一定要从IdleStateEvent? 中去检测,而使用的方式,例如?


    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.ALL_IDLE) {
                Channel channel = ctx.channel();
                String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
                // 发送遗嘱消息
                if (this.protocolProcess.getGrozaSessionStoreService().containsKey(clientId)) {
                    SessionStore sessionStore = this.protocolProcess.getGrozaSessionStoreService().get(clientId);
                    if (sessionStore.getWillMessage() != null) {
                        this.protocolProcess.publish().processPublish(ctx.channel(), sessionStore.getWillMessage());
                    }
                }
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

心跳检测的过程

也就是检测客户端是否失联。

?说一点并不是所有的网络应用程序都有心跳检测机制,根据业务判断的。

百万连接优化

测试环境

计算机区分不同连接的方式:TCP连接四元组->服务器? IP +服务器port+客户端ip +客户端port

一台服务器和一台客户端最多支撑6万个连接。 而在服务端怎么区分连接的也就是服务器? IP +服务器port+客户端ip +客户端port。

调整为

这样就能解决百万连接的端口号数据不够的情况 服务端建立

        //这里开启 10000到100099这100个端口
        for (int i = 0; i < nPort; i++) {
            int port = beginPort + i;
            bootstrap.bind(port).addListener((ChannelFutureListener) future -> {
                System.out.println("端口绑定成功: " + port);
            });
        }

这里启动服务端时,有可能会出现下面的问题

文件句柄数不够的情况

在操作系统中有个? 文件中可以控制打开文件的句柄数,通过? cat /proc/sys/fs/file-max 可以看到

?

这就需要修改一改一下配置。 使用 vim? /etc/sysctl.conf? ?

使用 sysclt -p 使之生效

?以及 单个文件打开数 使用 ulimit -a? open -file? 都需要修改?

也会报错的。

使用 vim /etc/security/limits.conf 去调整

?以及 tail -f 这些linux基本命令大家应该都会,这里就不深入讲解了。

下面对linux进行配置,在/etc/sysctl.conf? ?添加配置 一个连接大约在系统中占7.5kb

设置tcp连接内存 tcp_mem? 单位为pg 页 一个页为 4kb,? getconf pagesize 查看page占用,第一个数最小值? 第二个运行? 默认值,第三个 最大连接值

tcp_wmem tcp写入缓冲区??

tcp_rmem tcp读缓冲区

包括保持时间等等? 按照这个配置就行

一定要执行sysclt -p 然后生效。

  嵌入式 最新文章
基于高精度单片机开发红外测温仪方案
89C51单片机与DAC0832
基于51单片机宠物自动投料喂食器控制系统仿
《痞子衡嵌入式半月刊》 第 68 期
多思计组实验实验七 简单模型机实验
CSC7720
启明智显分享| ESP32学习笔记参考--PWM(脉冲
STM32初探
STM32 总结
【STM32】CubeMX例程四---定时器中断(附工
上一篇文章      下一篇文章      查看所有文章
加:2021-10-04 13:00:11  更:2021-10-04 13:00:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/1 21:46:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码