1、环境准备
(1)本地已搭建 MQTT 服务(可参考:windows环境搭建MQTT_音乐土豆-CSDN博客)
(2)新建 Spring Boot 项目
2、pom文件引入依赖
<!-- NOTE(review): nothing in the code shown in this article appears to use this module; confirm it is required -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<!-- Presumably brings in the Eclipse Paho client classes (MqttClient, MqttConnectOptions, ...) used by MqttClientUtil as a transitive dependency; verify -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
3、application.properties文件新增mqtt配置
# HTTP port of the web application (the test URL in this article uses 8080)
server.port=8080
# Credentials MqttClientUtil passes to MqttConnectOptions (setUserName / setPassword)
spring.mqtt.username=admin
spring.mqtt.password=password
# Broker URI used both to construct the MqttClient and as the server URI list
spring.mqtt.url=tcp://127.0.0.1:61613
# Client identifier handed to the MqttClient constructor
spring.mqtt.client.id=mqttId
# Default topic; injected into MqttClientUtil but not read by any method shown there
spring.mqtt.default.topic=test
# Passed to MqttConnectOptions.setConnectionTimeout (unit is seconds per the Paho docs; confirm)
mqtt.connection.timeout=20
# Passed to MqttConnectOptions.setKeepAliveInterval (unit is seconds per the Paho docs; confirm)
mqtt.keep.alive.interval=20
4、编写MqttClientUtil
@Component
@Slf4j
public class MqttClientUtil {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String host;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String topic;
@Value("${mqtt.connection.timeout}")
private int timeOut;
@Value("${mqtt.keep.alive.interval}")
private int interval;
private MqttClient mqttClient;
private MqttConnectOptions mqttConnectOptions;
@PostConstruct
private void init(){
connect(host, clientId);
}
private void connect(String host,String clientId){
try{
mqttClient = new MqttClient(host,clientId,new MemoryPersistence());
mqttConnectOptions = getMqttConnectOptions();
mqttClient.connect(mqttConnectOptions);
}catch (Exception e){
log.error("mqtt服务链接异常!");
e.printStackTrace();
}
}
private MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{host});
mqttConnectOptions.setKeepAliveInterval(interval);
mqttConnectOptions.setConnectionTimeout(timeOut);
mqttConnectOptions.setCleanSession(true);
return mqttConnectOptions;
}
private boolean isConnect(){
if(Objects.isNull(this.mqttClient)){
return false;
}
return mqttClient.isConnected();
}
private void reConnect() throws Exception{
if(Objects.nonNull(this.mqttClient)){
log.info("mqtt 服务已重新链接...");
this.mqttClient.connect(this.mqttConnectOptions);
}
}
private void closeConnect() throws Exception{
if(Objects.nonNull(this.mqttClient)){
log.info("mqtt 服务已断开链接...");
this.mqttClient.disconnect();
}
}
public void sendMessage(String topic,String message,int qos) throws Exception {
if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(message.getBytes());
mqttMessage.setQos(qos);
MqttTopic mqttTopic = mqttClient.getTopic(topic);
if(Objects.nonNull(mqttTopic)){
try{
MqttDeliveryToken publish = mqttTopic.publish(mqttMessage);
if(publish.isComplete()){
log.info("消息发送成功---->{}",message);
}
}catch(Exception e){
log.error("消息发送异常",e);
}
}
}else{
reConnect();
}
}
}
5、编写controller
@Slf4j
@RestController
public class TestController {
@Autowired
private MqttClientUtil mqttClientUtil;
@RequestMapping("/send")
public String sendMessage(){
String topic = "test";
String message = "测试 mqtt 消息发布";
int qos = 2;
try{
mqttClientUtil.sendMessage(topic,message,qos);
}catch(Exception e){
log.error("mqtt 消息发送异常",e);
}
return "mqtt 消息已发送!";
}
}
6、测试
在浏览器中访问 http://localhost:8080/send
也可以打开 MQTTBox,订阅名为 test 的 topic,可以看到消息已经成功发出!
|