MQTT简介
MQTT: Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。 MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。 MQTT属于那一层: TCP/IP参考模型可以分为四层:应用层、传输层、网络层、链路层。TCP和UDP位于传输层,应用层常见的协议有HTTP、FTP、SSH等。MQTT协议运行于TCP之上,属于应用层协议,因此只要是支持TCP/IP协议栈的地方,都可以使用MQTT 流程图:
MQTT特性
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合
- 对负载内容屏蔽的消息传输
- 使用 TCP/IP 提供网络连接
- 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。使用 Last Will 和 Testament 特性(最后遗嘱)通知有关各方客户端异常中断的机制
- 有三种消息发布服务质量:
qos为0:“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。 qos为1:“至少一次”,确保消息到达,但消息重复可能会发生。这一级别可用于如下情况,你需要获得每一条消息,并且消息重复发送对你的使用场景无影响。 qos为2:“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
MQTT协议原理
实现方式 : 实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。 MQTT传输的消息分为:主题(Topic)和负载(payload)两部分: (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload); (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
环境搭建
服务端搭建
-
开源的mqtt服务器有:EMQX,Mosquitto,Apollo:ActiveMQ的升级版 这里我使用的是Apollo,可以直接在Windows上安装. 下载 Apollo 下载:http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/ -
安装 解压得到如下文件。 这里需要注意了,解压 apache-apollo-1.7.1 所在文件夹名称不能有 中文或者空格,后面会提到出现什么错误。 安装 JDK,配置环境变量 进入 apache-apollo-1.7.1-windows-distro\apache-apollo-1.7.1\bin 文件夹 cmd 窗口执行 apollo.cmd,可以看到如下的信息 -
创建服务器实例 在 CMD 命令窗口输入 apollo create mqtt,当然 mqtt 这个名字可以随便起。 -
创建成功后: 然后查看 mqtt可以发现里面包含有很多信息,其中etc\apollo.xml文件下是配置服务器信息的文件,etc\users.properties文件包含连接MQTT服务器时用到的用户名和密码,可以修改原始的admin=password,可以接着换行添加新的用户名密码。 bin:保存与该实例关联的执行脚本。 etc:保存实例配置文件 data:保存用于存储持久消息的数据文件 日志:保存旋转日志文件 tmp:保存在代理运行之间安全删除的临时文件 -
打开cmd,运行 apache-apollo-1.7.1\bin\mybroker\bin\apollo-broker.cmd run 开启服务器,如下图所示: 红框中的地址和端口会用到 然后打开浏览器上,输入 http://127.0.0.1:61680/ 或 https://127.0.0.1:61681/ 即可进入 Apollo Console 窗口进行登录。 -
安装mqtt客户端 可以用MQTTLens,paho,连接apollo服务,我这里用的paho 设置好参数后,就可以连接了 -
apollo目录下有一个example,其中有个文件,可以双击点开然后也可以连接apollo,来发布和订阅topic消息,从而可以和phao互动,达到聊天室的功能 Android客户端: -
Android demo 1.build.gradle中添加依赖(根目录) 模块下的build.gradle导入paho 2.开启服务,如下:在deviceservice进程开启MQTT服务
private void init() {
String serverURI = HOST;
mqttAndroidClient = new MqttAndroidClient (this, serverURI, CLIENT_ID);
mqttAndroidClient.setCallback(mqttCallback);
mMqttConnectOptions = new MqttConnectOptions ();
mMqttConnectOptions.setCleanSession(true);
mMqttConnectOptions.setConnectionTimeout(10);
mMqttConnectOptions.setKeepAliveInterval(20);
mMqttConnectOptions.setUserName(USERNAME);
mMqttConnectOptions.setPassword(PASSWORD.toCharArray());
boolean doConnect = true;
String message = "{\"terminal_uid\":\"" + CLIENT_ID + "\"}";
String topic = PUBLISH_TOPIC;
int qos = 2;
boolean retained = false;
if (!TextUtils.isEmpty (message) && !TextUtils.isEmpty (topic)) {
try {
mMqttConnectOptions.setWill(topic, message.getBytes(), qos, retained);
} catch (Exception e) {
Log.i(TAG, "Exception Occured", e);
doConnect = false;
iMqttActionListener.onFailure(null, e);
}
}
if (doConnect) {
doClientConnection();
}
}
private void doClientConnection() {
if (!mqttAndroidClient.isConnected() && isConnectIsNomarl()) {
try {
mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken arg0) {
Log.i(TAG, "连接成功 ");
try {
mqttAndroidClient.subscribe(PUBLISH_TOPIC, 2);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
arg1.printStackTrace();
Log.i(TAG, "连接失败 ");
doClientConnection();
}
};
public static void publish(String message) {
String topic = PUBLISH_TOPIC;
int qos = 2;
boolean retained = false;
try {
mqttAndroidClient.publish(topic, message.getBytes(), qos, retained);
} catch (MqttException e) {
e.printStackTrace();
}
}
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.i(TAG, "收到消息: " + new String(message.getPayload()));
Toast.makeText(getApplicationContext(), "messageArrived: " + new String(message.getPayload()), Toast.LENGTH_LONG).show();
response("message arrived");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
}
@Override
public void connectionLost(Throwable arg0) {
Log.i(TAG, "连接断开 ");
doClientConnection();
}
};
MQTT协议数据包结构
数据结构: 在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下: (1)固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。 (2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。 (3)消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。 固定头:
-
MQTT固定报文头最少有两个字节,第一字节包含消息类型(Message Type)和QoS级别等标志位。第二字节开始是剩余长度字段,该长度是后面的可变报文头加消息负载的总长度,该字段最多允许四个字节 剩余长度字段单个字节最大值为二进制0b0111 1111,16进制0x7F。也就是说,单个字节可以描述的最大长度是127字节。为什么不是256字节呢?因为MQTT协议规定,单个字节第八位(最高位)若为1,则表示后续还有字节存在,第八位起“延续位”的作用。 例如,数字64,编码为一个字节,十进制表示为64,十六进制表示为0×40。数字321(65+2*128)编码为两个字节,重要性最低的放在前面,第一个字节为65+128=193(0xC1),第二个字节是2(0x02),表示2×128。 -
假如数字 68 ,16进制表示 0X44 ,大小小于127 ,所以 编码规则和常规一样。就是0X44. 假如数字 321 大于127 ,所以编码需要遵守编码规则,应该是 0XC1 0X02,具体的解释如图 -
数据总长155字节:34 98 01 00 0a 74 6f 70 69 63 5f 74 65 73 74 00 02 e4 bd a0 e5 a5 bd e5 91 80 e3 80 82 e3 80 82 66 67 67 67 25 79 67 67 67 68 68 68 68 68 68 68 68 75 75 68 68 68 68 68 67 67 67 68 68 68 67 68 68 68 75 6a 68 68 62 68 67 67 67 76 76 76 76 76 76 76 76 62 62 62 68 6a 6a 6a 6a 6a 6a 6a 6a 6e 62 67 67 79 68 68 6a 68 68 68 68 68 6a 6a 68 6a 6a 6a 68 68 68 68 68 68 68 68 68 68 68 68 68 68 68 68 68 68 68 62 62 67 67 68 6a 6a 68 68 67 67 67 62 62 62 76 66 67 67 68 68 68 内容长度:98 01 0x98 二进制:1001 1000 出去第一位标志位后:11000 十进制为24 内容长度为:24+128*01=152 总长是155减去前面3个字节为152 -
数据总长36字节:34 22 00 0f 6d 65 73 73 61 67 65 5f 61 72 72 69 76 65 64 00 05 6d 65 73 73 61 67 65 20 61 72 72 69 76 65 64 内容长度:0x22 十进制为34 内容长度为:34 总长是36减去前面2个字节为34 -
由于MQTT协议最多只允许使用四个字节表示剩余长度(如表1),并且最后一字节最大值只能是0x7F不能是0xFF,所以能发送的最大消息长度是256MB,而不是512MB。 可变报文头: MQTT数据包中包含一个可变头,它驻位于固定的头和负载之间。可变头的内容因数据包类型而不同,较常的应用是作为包的标识: 很多类型数据包中都包括一个2字节的数据包标识字段,这些类型的包有:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。 消息类型: 固定报文头中的第一个字节包含连接标志(Connect Flags),连接标志用来区分MQTT的消息类型。MQTT协议拥有14种不同的消息类型(如表2),可简单分为连接及终止、发布和订阅、QoS 2消息的机制以及各种确认ACK。至于每一个消息类型会携带什么内容,这里不多阐述 消息质量: MQTT消息质量有三个等级,QoS 0,QoS 1和 QoS 2。 QoS 0:最多分发一次。消息的传递完全依赖底层的TCP/IP网络,协议里没有定义应答和重试,消息要么只会到达服务端一次,要么根本没有到达。 QoS 1:至少分发一次。服务器的消息接收由PUBACK消息进行确认,如果通信链路或发送设备异常,或者指定时间内没有收到确认消息,发送端会重发这条在消息头中设置了DUP位的消息。 QoS 2:只分发一次。这是最高级别的消息传递,消息丢失和重复都是不可接受的,使用这个服务质量等级会有额外的开销。 通过下面的例子可以更深刻的理解上面三个传输质量等级。 比如目前流行的共享单车智能锁,智能锁可以定时使用QoS level 0质量消息请求服务器,发送单车的当前位置,如果服务器没收到也没关系,反正过一段时间又会再发送一次。之后用户可以通过App查询周围单车位置,找到单车后需要进行解锁,这时候可以使用QoS level 1质量消息,手机App不断的发送解锁消息给单车锁,确保有一次消息能达到以解锁单车。最后用户用完单车后,需要提交付款表单,可以使用QoS level 2质量消息,这样确保只传递一次数据,否则用户就会多付钱了。
遗愿标志(Will Flag): 在可变报文头的连接标志位字段(Connect Flags)里有三个Will标志位:Will Flag、Will QoS和Will Retain Flag,这些Will字段用于监控客户端与服务器之间的连接状况。如果设置了Will Flag,就必须设置Will QoS和Will Retain标志位,消息主体中也必须有Will Topic和Will Message字段。 那遗愿消息是怎么回事呢?服务器与客户端通信时,当遇到异常或客户端心跳超时的情况,MQTT服务器会替客户端发布一个Will消息。当然如果服务器收到来自客户端的DISCONNECT消息,则不会触发Will消息的发送。 因此,Will字段可以应用于设备掉线后需要通知用户的场景。
保活机制: MQTT客户端可以设置一个心跳间隔时间(keep Alive Timer),表示在每个心跳检测时间内发送一条消息。如果在这个时间周期内,没有业务数据相关的消息,客户端会发送一个PINGREQ消息,相应的,服务器会返回一个PINGRESP消息进行确认。 如果服务器在一个半(1.5)个心跳间隔时间周期内没有收到来自客户端的消息,就会断开与客户端的连接。心跳间隔时间最大值可以设置为18个小时,0表示客户端不会断开
报文内容解析: 帧内容 依据传输内容的不一样,所占字节的长度也不一样。比如会包含用户名,密码的信息,不同的服务器不同的用户肯定不一样。注意:帧内容里的数据主要是字符串,需要符合UTF-8编码规范。 简单说一下帧内容的出现场合
-
连接报文: 10 4a 00 04 4d 51 54 54 04 d6 00 14 00 06 31 32 33 34 35 36 00 0a 74 6f 70 69 63 5f 74 65 73 74 00 19 7b 22 74 65 72 6d 69 6e 61 6c 5f 75 69 64 22 3a 22 31 32 33 34 35 36 22 7d 00 05 61 64 6d 69 6e 00 08 70 61 73 73 77 6f 72 64 连接报文 连接请求第一字节固定是0x10 第二个字节4a 为内容长度0x4a 十进制为74 数据长度 -
可变头”:协议名(UTF-8编码)+协议版本1字节+连接的标识符1字节+心跳包时间2字节 协议名: 00 04 4d 51 54 54 “MQTT”固定值 协议版本:04 一字节 连接标识符:d6 一字节 二进制为 1101 0110 (有用户名,有密码,客户端掉线后服务器清空客户端的信息) 心跳包时间: 00 14 十进制为 20 s -
有效内容: “用户ID” + “临终消息主题” + “临终消息” + “用户名” + “密码” 00 06 31 32 33 34 35 36 00 0a 74 6f 70 69 63 5f 74 65 73 74 00 19 7b 22 74 65 72 6d 69 6e 61 6c 5f 75 69 64 22 3a 22 31 32 33 34 35 36 22 7d 00 05 61 64 6d 69 6e 00 08 70 61 73 73 77 6f 72 64 用户ID 必须保持唯一,在一个服务器上的所有设备,每个设备的ID都不一样;应该从服务器那里获取用户名和密码;“临终消息主题”和“临终消息”如果可变头里面的 连接标识符没有允许,就不要添加了。 2字节长度: 00 06 用户名: 31 32 33 34 35 36 “123456” 2字节长度: 00 0a 临终主题: 74 6f 70 69 63 5f 74 65 73 74 “topic_test” 2字节长度: 00 19 临终主题: 7b 22 74 65 72 6d 69 6e 61 6c 5f 75 69 64 22 3a 22 31 32 33 34 35 36 22 7d “{“terminal_uid”:“123456”}” 2字节长度: 00 05 用户名: 61 64 6d 69 6e “admin” 2字节长度: 00 08 用户名: 70 61 73 73 77 6f 72 64 “password” -
连接确认报文 20 02 00 00 发送成功之后,服务器会返回(16进制格式) 20 02 00 00 如果接收到上述返回的消息,证明已经成功和服务器建立了可靠连接。分析一下来自服务器的消息含义:第一个字节 20 可以从固定头那一节查到,是服务器发送的数据,叫做“连接确认”;第二个字节02表示后面还有两个数据;后面的两个数据 00 00 ,表示两个有效数据。后面发送两个数据的一层意思是,验证从机连接的协议是否正确,即从机接收到02这个数据后应该判断是否真的接收到了两个数据,如果不是,那证明通信时有问题的。 -
_按时发送心跳包,和服务器保持联系 其实这里的心跳包,是通过发送 “Ping 请求” 给服务器来实现的。在连接服务器的时候,我们设置了心跳包时间为60S,那么我们需要每60S之内就和服务器“Ping 请求”一次,证明网络链接是可靠的,如果接收不到服务器的返回,那么可能是我们的网络掉线了。 发送: C0 00 接收: D0 00 _ -
客户端发送订阅请求: 82 0f 00 01 00 0a 74 6f 70 69 63 5f 74 65 73 74 02 第一字节:0x82 第二字节:0x0f 数据长度 15 -
服务端发送订阅确定: 90 03 00 01 02 第一字节:0x90 第二字节:0x03 数据长度 3 -
服务端发送消息: 34 1d 00 0a 74 6f 70 69 63 5f 74 65 73 74 00 02 e4 bd a0 e5 a5 bd e5 91 80 e3 80 82 e3 80 82第一字节:0x34 发布消息的固定报文类型是3 DUP是重发标志,如果DUP标志被设置为0,表示这是客户端或服务端第一次请求发送这个PUBLISH报文。如果DUP标志被设置为1,表示这可能是一个早前报文请求的重发。 Qos是服务质量等级,有三种状态。 第二字节:0x1d 数据长度 29 第三四字节:00 0a 主题内容长度长度 10 74 6f 70 69 63 5f 74 65 73 74 为主题 “topic_test” -
服务端发送订阅确定: 34 1d 00 0a 74 6f 70 69 63 5f 74 65 73 74 00 02 e4 bd a0 e5 a5 bd e5 91 80 e3 80 82 e3 80 82 第一字节0x34 可以看出 Qos消息质量为 Qos2 00 02 消息标识,消息id 当Qos 为1或者2是每个消息都有一个id,因为需要消息确认 e4 bd a0 e5 a5 bd e5 91 80 e3 80 82 e3 80 82 为消息内容 “你好呀。。” -
qos0:最多就发送一次,你别告诉我你收没收到,你找到订阅这个主题的你就推就行。 qos1:至少发送一次,发送完你告诉我你收没收到(PUBACK),如果你不告诉我,我就一直发。 qos2:确保一次送达,我给你发(PUBLISH),你给我回一个你收到了(PUBREC),我再给你发一个你确定你收到了吗(PUBREL),你再给我回一个收到了(PUBCOMP -
PUBACK报文是对QoS 1等级的PUBLISH报文的响应。 1.固定头 没啥说的,类型4. 所以返回头是 0x4…开头 2.可变头 可变头就俩个字节。就是报文标识符。 因为我使用的是Qos2 所以没有收到发布确认 -
发布收到 PUBREC报文是对QoS等级2的PUBLISH报文的响应。它是QoS 2等级协议交换的第二个报文。 1.固定头 没啥说的,类型5. 2.可变头 可变头就俩个字节。就是报文标识符。 50 02 00 11 50固定头,02为长度,00 11 为消息id -
发布释放 PUBREL报文是对PUBREC报文的响应。它是QoS 2等级协议交换的第三个报文。 1.固定头 没啥说的,类型6. 2.可变头 可变头就俩个字节。就是报文标识符。 -
发布完成 PUBCOMP报文是对PUBREL报文的响应。它是QoS 2等级协议交换的第四个也是最后一个报文。 1.固定头 没啥说的,类型7. 2.可变头 可变头就俩个字节。就是报文标识符。 70 02 00 11
MQTT数据安全
MQTT的安全: 由于MQTT运行于TCP层之上并以明文方式传输,这就相当于HTTP的明文传输,使用Wireshark可以完全看到MQTT发送的所有消息,消息指令一览无遗,如图1所示。 这样可能会产生以下风险: 设备可能会被盗用; 客户端和服务端的静态数据可能是可访问的(可能会被修改); 协议行为可能有副作用(如计时器攻击); 拒绝服务攻击; 通信可能会被拦截、修改、重定向或者泄露; 虚假控制报文注入。
- 安全功能可以从三个层次来考虑——应用层、传输层、网络层
(1)应用层:在应用层上,MQTT提供了客户标识(Client Identifier)以及用户名和密码,可以在应用层验证设备。 (2)传输层:类似于HTTPS,MQTT基于TCP连接,也可以加上一层TLS,传输层使用TLS加密是确保安全的一个好手段,可以防止中间人攻击。客户端证书不但可以作为设备的身份凭证,还可以用来验证设备。 (3)网络层:如果有条件的话,可以通过拉专线或者使用VPN来连接设备与MQTT代理,以提高网络传输的安全性。 - MQTT支持两种层次的认证:
应用层:MQTT支持客户标识、用户名和密码认证; 传输层:传输层可以使用TLS,除了加密通讯,还可以使用X509证书来认证设备。 客户标识 MQTT客户端可以发送最多65535个字符作为客户标识(Client Identifier),一般来说可以使用嵌入式芯片的MAC地址或者芯片序列号。虽然使用客户标识来认证可能不可靠,但是在某些封闭环境或许已经足够了。 用户名和密码 MQTT协议支持通过CONNECT消息的username和password字段发送用户名和密码。 用户名及密码的认证使用起来非常方便,不过由于它们是以明文形式传输,所以使用抓包工具就可以轻易的获取。 数据加密 RSA MD5等 - 在传输层认证
在传输层认证是这样的:MQTT代理在TLS握手成功之后可以继续发送客户端的X509证书来认证设备,如果设备不合法便可以中断连接。使用X509认证的好处是,在传输层就可以验证设备的合法性,在发送CONNECT消息之前便可以阻隔非法设备的连接,以节省后续不必要的资源浪费。而且,MQTT协议运行在使用TLS时,除了提供身份认证,还可以确保消息的完整性和保密性。
如有不对,不足之处请帮忙指出,谢谢
|