一、简介
? ? MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上。MQTT适用于移动端场景,侧重多语言多平台的海量设备接入,如遥感数据、汽车、智能家居等。
二、概念
- QoS(服务质量)
? 用于保证消息稳定传输的机制,包括消息应答、存储和重传。QoS0:至多一次,存在消息漏收的情况;QoS1:至少一次,有应答机制;QoS2:确保只有一次,开销大。 ? QoS降级:实际的订阅QoS = Min(Publisher发布消息指定的QoS,Subscriber订阅与Broker协商的QoS)。即:实际的订阅QoS是消息过程中小的那个QoS值。
三、SpringBoot+Maven实现发布、订阅
- pom中添加依赖
<!-- MQTT -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>5.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>5.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.2.8.RELEASE</version>
</dependency>
- yml中添加配置
spring:
mqtt:
username: username
password: password
url: tcp://ip:port
clientId: mqttclient
topic: topic1
completionTimeout: 2000
- Java实现
?为使结构清晰,分为以下五部分。其中,前三个为配置类。 ?MqttBaseConfig:配置Mqtt公用部分,主要是提供Mqtt客户端Bean。 ?MqttInConfig:配置入站消息通道及相关处理Bean。 ?MqttOutConfig:配置出站消息通道及相关处理Bean。 ?MqttMessageReceiver:入站消息处理。 ?MqttMessageSender:出站消息发送接口。
四、MQTT代理
?市面上有相当多的高质量MQTT代理,其中mosquitto是一个开源的轻量级的C实现,完全兼容了MQTT 3.1和MQTT 3.1.1。
订阅命令:
mosquitto_sub -d -t 'topic1'
发布命令:
mosquitto_pub -d -t 'topic1' -m 'test123'
五、开发遇到的问题
- 提示已断开连接; retrying…
?断开的原因是clientId相同,可以拼上当前时间戳确保clientId不同。发布端可以固定clientId,消费端需确保clientId不同,否则会出现接受不到消息的情况。 - 发布消息后,订阅服务器漏收
?可能与Qos属性未设置有关。添加handler.setDefaultQos(1);可解决。
|