IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 基于WebSocket进行MQTT通信 -> 正文阅读

[网络协议]基于WebSocket进行MQTT通信

1、概述

MQTT是物联网主流通信协议,但是很多终端天然不具备Mqtt通信能力,比如Web H5、小程序等终端形式,这些终端提供更底层的WebSocket通信方式。因此,研究基于WebSocket进行Mqtt通信是非常普遍的需求。

?

2、基于WebSocket进行MQTT通信

2.1通信框架

基于WebSocket进行MQTT通信框架代码如下:

public void startup() {
		mainGroup = new NioEventLoopGroup();
		subGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap server = new ServerBootstrap();
			// 绑定两个线程组
			server.group(mainGroup, subGroup)
					// 指定NIO的模式
					.channel(NioServerSocketChannel.class)
					// 子处理器,用于处理workerGroup
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							ChannelPipeline pipeline = ch.pipeline();

							ch.pipeline().addLast("logging", new LoggingHandler("DEBUG"));// 设置log监听器,并且日志级别为debug,方便观察运行流程

							// websocket 基于http协议,所以要有http编解码器 服务端用HttpServerCodec
							pipeline.addLast(new HttpServerCodec());
							// 对写大数据流的支持
							pipeline.addLast(new ChunkedWriteHandler());

							/**
							 * 我们通常接收到的是一个http片段,如果要想完整接受一次请求的所有数据,我们需要绑定HttpObjectAggregator,然后我们
							 * 就可以收到一个FullHttpRequest-是一个完整的请求信息。
							 * 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
							 * 几乎在netty中的编程,都会使用到此hanler
							 */
							pipeline.addLast(new HttpObjectAggregator(1024 * 64));

							// ====================== 以上是用于支持http协议 , 以下是支持httpWebsocket
							// ======================

							/**
							 * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws 本handler会帮你处理一些繁重的复杂的事 会帮你处理握手动作:
							 * handshaking(close, ping, pong) ping + pong = 心跳
							 * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
							 */
							// pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

							// 自定义的handler
							pipeline.addLast(new WebsocketMqttHandler());
							pipeline.addLast(new MqttMessageWebSocketFrameEncoder());
							pipeline.addLast(MqttEncoder.INSTANCE);
							pipeline.addLast(new MqttDecoder());
							pipeline.addLast(new MqttHandler());
						}
					});

			// 启动server,并且设置8088为启动的端口号,同时启动方式为同步
			ChannelFuture future = server.bind(port).sync();
			// 监听关闭的channel,设置位同步方式
			future.channel().closeFuture().sync();
		} catch (Exception e) {
			System.out.println("start exception" + e.toString());
		} finally {
			// 退出线程组
			mainGroup.shutdownGracefully();
			subGroup.shutdownGracefully();
		}
	}

框架基于Java netty库实现,本文关注基于WebSocket的MQTT通信,MQTT本身的Java实现不是本文分析重点,?详情请参考MQTT物联网网关Broker与Java开源实现?。第44-46行添加的MqttEncoder、MqttDecoder和MqttHandler和MQTT物联网网关Broker与Java开源实现?描述的功能相同,共同完成Mqtt协议的处理。第42行的WebsocketMqttHandler需要在Mqtt协议处理之前从WebSocket报文内容里面提取出Mqtt报文;第43行的MqttMessageWebSocketFrameEncoder用于将要发送出去的Mqtt报文编码成WebSocket报文。

WebsocketMqttHandler两大功能:建立连接、收发报文。

WebsocketMqttHandler的核心代码如下:

@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
		// 获取客户端传输过来的消息
		logger.info("收到消息:" + msg);
		if (msg instanceof FullHttpRequest) {
			// 以http请求形式接入,但是走的是websocket
			handleHttpRequest(ctx, (FullHttpRequest) msg);
		} else if (msg instanceof WebSocketFrame) {
			// 处理websocket客户端的消息
			handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
		}

	}

其主体功能包括对Http报文的处理和对WebSocket帧的处理。

  • Http报文处理:http报文用于客户端和Broker之间建立连接;
  • WebSocket帧处理:从WebSocket报文帧里面提取(组合)Mqtt报文。?

2.2建立WebSocket连接

以微信小程序作为客户端,建立与服务端简的WebSocket连接,客户端操作详情请参考微信小程序MQTT通信及开源框架实现,本文关注基于WebSocket进行MQTT的流程。

建立WebSocket连接由两个步骤完成:

  • ?客户端首先通过http协议发送一条协议升级(升级到websocket)请求报文;
  • 服务端进行握手成功后返回一条101 http协议报文表示WebSocket连接已建立。

其代码实现如下:

/**
	 * 唯一的一次http请求,用于创建websocket
	 */
	private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
		// 要求Upgrade为websocket,过滤掉get/Post
		if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
			// 若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端
			sendHttpResponse(ctx, req,
					new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
			return;
		}
		logger.info(req.content());
		// ctx.fireChannelRead(req.content());
		WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
				"ws://localhost:1885/websocket",
				"mqtt", false);
		handshaker = wsFactory.newHandshaker(req);
		if (handshaker == null) {
			WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
		} else {
			req.headers().set("Sec-WebSocket-Protocol", "mqtt");
			handshaker.handshake(ctx.channel(), req);
		}
	}

?识别出收到的报文是Http报文时在第22行进行握手完成协议升级成websocket协议建立连接。

建立连接过程跟踪

采用wireshark工具对连接过程进行跟踪可以看到如下报文信息:

WebSocket连接建立报文跟踪?

从服务器端返回的101报文表示建立连接成功。

?2.3基于WebSocket进行MQTT通信

建立websocket连接后,Mqtt报文基于WebSocket进行传输,结束WebSocket报文后进行如下处理:

接收报文

private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
		// 判断是否关闭链路的指令
		if (frame instanceof CloseWebSocketFrame) {
			handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
			return;
		}
		// 判断是否ping消息
		if (frame instanceof PingWebSocketFrame) {
			ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
			return;
		}
		ByteBuf echoMsg = frame.content();
		frame.retain();
		ctx.fireChannelRead(echoMsg);
	}

?在第14行继续交给下一个handler(即Mqtt相关handler,包括MqttDecoder、MqttHandler)处理。

发送报文

在MqttMessageWebSocketFrameEncoder里对要发送的Mqtt报文进行WebSocket封装,如下第9行所示:

@Override
	protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
		if (msg == null)
			return;

		// byte[] data = ;

		// out.add(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(msg)));
		ctx.channel().writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(msg)));
	}

3、更多

开源项目:Open-Api

?更多信息:www.lokei.cn?

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-09-15 02:20:34  更:2022-09-15 02:20:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/19 10:03:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码