方法一:使用 spring-integration-mqtt
与 Spring Boot 集成度更高,但灵活程度不如 org.eclipse.paho.client.mqttv3
<!--mqtt 相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
application.yml
spring:
  mqtt:
    enable: true
    url: tcp://127.0.0.1:1883
    username:
    password:
    # Default client ID used when connecting to the MQTT broker
    clientid: clientid123423
    # Timeout in milliseconds (MqttConfig passes it to the inbound adapter's completion timeout)
    timeout: 5000
    # Keep-alive (heartbeat) interval in seconds; per the MQTT specification the broker
    # treats the client as gone after 1.5x this interval without any traffic
    keepalive: 2
    # deviceId
    #deviceId: your_deviceId
    # mqtt-topic
    producertopic: /master/info
    consumertopic: /+/sys/motor, /+/sys/media, /+/sys/general, /+/sys/rc, /+/sys/rtk, /+/mission, /+/power, /+/v3/sys/general
MqttConfig.class
package org.young.common.mqtt;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * Spring Integration wiring for MQTT, active only when {@code spring.mqtt.enable=true}.
 *
 * <p>Declares the Paho connection options and client factory, an outbound (producer)
 * channel with its message handler, and an inbound (consumer) channel with its
 * message-driven adapter and handler.
 *
 * <p>The inbound and outbound adapters each open their own broker connection, so they
 * must not share a client ID (a broker drops the older connection when a second one
 * arrives with the same ID). Distinct IDs are derived from {@code spring.mqtt.clientid}
 * by appending a suffix.
 */
@Configuration
@IntegrationComponentScan
@ConditionalOnProperty(value = "spring.mqtt.enable", havingValue = "true")
public class MqttConfig {

    private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);

    /** Bean name of the inbound (consumer) channel that subscribed messages arrive on. */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";

    /** Bean name of the outbound (producer) channel that published messages are sent to. */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";

    /** Suffix appended to the configured client ID for the inbound adapter's connection. */
    private static final String INBOUND_CLIENT_SUFFIX = "_in";

    /** Suffix appended to the configured client ID for the outbound handler's connection. */
    private static final String OUTBOUND_CLIENT_SUFFIX = "_out";

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.clientid}")
    private String clientId;

    @Value("${spring.mqtt.producertopic}")
    private String producerTopic;

    @Value("${spring.mqtt.consumertopic}")
    private String[] consumerTopic;

    /** Used as the inbound adapter's completion timeout. */
    @Value("${spring.mqtt.timeout}")
    private int timeout;

    /** Keep-alive (heartbeat) interval handed to the Paho connection options. */
    @Value("${spring.mqtt.keepalive}")
    private int keepalive;

    /**
     * Builds the Paho connection options from the {@code spring.mqtt.*} properties.
     *
     * <p>User name and password are applied only when they are non-blank, so that an
     * empty {@code username:} / {@code password:} entry results in an anonymous
     * connection instead of passing blank credentials to Paho.
     *
     * @return the {@link MqttConnectOptions} shared by all adapters
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        if (username != null && !username.trim().isEmpty()) {
            mqttConnectOptions.setUserName(username);
        }
        if (password != null && !password.isEmpty()) {
            mqttConnectOptions.setPassword(password.toCharArray());
        }
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(keepalive);
        return mqttConnectOptions;
    }

    /**
     * Creates the client factory used by both the inbound adapter and the outbound handler.
     *
     * @return a {@link MqttPahoClientFactory} configured with {@link #getMqttConnectOptions()}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /* ------------------------------- producer ------------------------------- */

    /**
     * Channel that outgoing (published) messages are sent to.
     *
     * @return the outbound {@link MessageChannel}
     */
    @Bean(name = CHANNEL_NAME_OUT)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Handler that publishes messages from the outbound channel to the broker.
     * Messages without a topic header go to {@code spring.mqtt.producertopic}.
     *
     * @return the outbound {@link MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {
        // Own client ID: this connection must not collide with the inbound adapter's.
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(clientId + OUTBOUND_CLIENT_SUFFIX, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(producerTopic);
        return messageHandler;
    }

    /* ------------------------------- consumer ------------------------------- */

    /**
     * Channel that incoming (subscribed) messages are delivered to.
     *
     * @return the inbound {@link MessageChannel}
     */
    @Bean(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * Adapter that subscribes to every topic listed in {@code spring.mqtt.consumertopic}
     * and forwards received messages to the inbound channel.
     *
     * @return the inbound {@link MessageProducer}
     */
    @Bean
    public MessageProducer inbound() {
        // Several topics can be subscribed to at once.
        // Own client ID: this connection must not collide with the outbound handler's.
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + INBOUND_CLIENT_SUFFIX, mqttClientFactory(), consumerTopic);
        adapter.setCompletionTimeout(timeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // Route received messages to the inbound channel.
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * Handler that consumes messages from the inbound channel.
     *
     * @return the inbound {@link MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        // Option 1: an inline handler
        // return new MessageHandler() {
        //     @Override
        //     public void handleMessage(Message<?> message) throws MessagingException {
        //         String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
        //         String msg = message.getPayload().toString();
        //         logger.info("接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg);
        //     }
        // };
        // Option 2: a dedicated handler class
        return new MqttConsumer();
    }

    // To configure several clients, declare one channel per client
    // (each client again needs its own unique client ID).
    // Channel 2
    // @Bean
    // public MessageChannel mqttInputChannelTwo() {
    //     return new DirectChannel();
    // }
    // // Client 2, listening on topics hello2 and hello3
    // @Bean
    // public MessageProducer inbound1() {
    //     MqttPahoMessageDrivenChannelAdapter adapter =
    //             new MqttPahoMessageDrivenChannelAdapter(clientId + "_in2", mqttClientFactory(),
    //                     "hello2", "hello3");
    //     adapter.setCompletionTimeout(timeout);
    //     adapter.setConverter(new DefaultPahoMessageConverter());
    //     adapter.setQos(1);
    //     adapter.setOutputChannel(mqttInputChannelTwo());
    //     return adapter;
    // }
    //
    // // Consume the data arriving on channel 2
    // @Bean
    // @ServiceActivator(inputChannel = "mqttInputChannelTwo")
    // public MessageHandler handlerTwo() {
    //     return new MqttConsumer();
    // }
}
MqttConsumer.class
package org.young.common.mqtt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
/**
 * Message handler that logs the topic and payload of every MQTT message it receives.
 * Registered only when {@code spring.mqtt.enable=true}.
 */
@Component
@ConditionalOnProperty(value = "spring.mqtt.enable", havingValue = "true")
public class MqttConsumer implements MessageHandler {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs the received-topic header and the payload of {@code message}.
     *
     * @param message the message delivered by the inbound channel
     * @throws MessagingException declared by the {@link MessageHandler} contract
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        Object receivedTopic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
        Object body = message.getPayload();
        // String.valueOf keeps a missing header from failing: it renders as "null".
        logger.info("接收到 mqtt消息,主题:{} 消息:{}", String.valueOf(receivedTopic), String.valueOf(body));
    }
}
MqttProducer.class
package org.young.common.mqtt;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * Messaging gateway for publishing to MQTT; every call is sent to the
 * {@code mqttOutboundChannel} channel. Registered only when {@code spring.mqtt.enable=true}.
 *
 * <p>QoS levels: 0 delivers at most once (a message the subscriber misses is lost and not
 * re-sent); 1 retries until the message is received, so the subscriber may see duplicates;
 * 2 adds de-duplication so the subscriber receives the message once. Throughput is best
 * with QoS 0 and worst with QoS 2.
 */
@Component
@ConditionalOnProperty(value = "spring.mqtt.enable", havingValue = "true")
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttProducer {

    /**
     * Sends a message without a topic header, i.e. to the default topic from the configuration.
     *
     * @param data the message content
     */
    void sendToMqtt(String data);

    /**
     * Sends a message to an explicitly chosen topic.
     *
     * @param payload the message content
     * @param topic   the topic to publish to
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * Sends a message to an explicitly chosen topic with an explicit QoS level.
     *
     * @param topic   the topic to publish to
     * @param qos     the MQTT QoS level (0, 1 or 2)
     * @param payload the message content
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
使用的Controller
/**
 * Publishes {@code data} to {@code topic} through the MQTT gateway.
 *
 * @param topic the topic to publish to; a blank topic yields an error response
 * @param data  the message content
 * @return a success response after sending, or an error response when the topic is blank
 */
@PostMapping("/send")
public ResponseEntity send(String topic, String data) {
    this.logger.info("开始发送mqtt消息,主题:{},消息:{}", topic, data);
    // Reject a blank topic up front.
    if (!StringUtils.isNotBlank(topic)) {
        return ResponseUtil.buildError("topic is blank");
    }
    mqttProducer.sendToMqtt(data, topic);
    this.logger.info("发送mqtt消息完成,主题:{},消息:{}", topic, data);
    return ResponseUtil.buildSuccess("execute successful");
}
方法二:使用org.eclipse.paho.client.mqttv3
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
# Unlike the Spring Integration approach, topics can be subscribed to at runtime here
mqtt:
  enable: false
  url: tcp://127.0.0.1:1883
  username:
  password:
  # Default client ID used when connecting to the MQTT broker
  clientid: clientid123123
  # Connection timeout; MqttClientService passes it to Paho's setConnectionTimeout,
  # which interprets the value in seconds
  timeout: 5000
  # Keep-alive (heartbeat) interval in seconds
  keepalive: 2
  # mqtt-topic
  topic: your_tpoic
MqttClientConfig.class
package org.young.common.mqtt.pahomqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
/**
 * Settings for the Eclipse Paho based MQTT client, bound from the {@code mqtt.*} properties,
 * plus a slot holding the current {@link MqttClient}.
 *
 * <p>Of the two implementations, Eclipse Paho can subscribe at runtime, whereas
 * integration-mqtt completes its subscriptions during initialization.
 *
 * <p>How {@code ConditionalOnProperty} matches when the property file sets the key:
 * <ul>
 *   <li>with {@code havingValue}: matches when the value equals {@code havingValue};</li>
 *   <li>without {@code havingValue}: matches unless the value is {@code false}.</li>
 * </ul>
 * When the property file does not set the key, {@code matchIfMissing = true} matches and
 * {@code false} does not.
 */
@Configuration
@ConditionalOnProperty(value = "mqtt.enable", havingValue = "true")
@ConfigurationProperties(prefix = "mqtt")
public class MqttClientConfig {

    private String url;
    private String clientid;
    private String username;
    private String password;
    private String topic;
    private int timeout;
    private int keepalive;
    private MqttClient client;

    /** @return the broker URL */
    public String getUrl() {
        return this.url;
    }

    /** @param url the broker URL */
    public void setUrl(String url) {
        this.url = url;
    }

    /** @return the client ID */
    public String getClientid() {
        return this.clientid;
    }

    /** @param clientid the client ID */
    public void setClientid(String clientid) {
        this.clientid = clientid;
    }

    /** @return the user name */
    public String getUsername() {
        return this.username;
    }

    /** @param username the user name */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the password */
    public String getPassword() {
        return this.password;
    }

    /** @param password the password */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the configured topic */
    public String getTopic() {
        return this.topic;
    }

    /** @param topic the configured topic */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @return the configured timeout */
    public int getTimeout() {
        return this.timeout;
    }

    /** @param timeout the configured timeout */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /** @return the keep-alive interval */
    public int getKeepalive() {
        return this.keepalive;
    }

    /** @param keepalive the keep-alive interval */
    public void setKeepalive(int keepalive) {
        this.keepalive = keepalive;
    }

    /** @return the current MQTT client, or {@code null} if none has been set */
    public MqttClient getClient() {
        return this.client;
    }

    /** @param client the MQTT client to hold */
    public void setClient(MqttClient client) {
        this.client = client;
    }
}
MqttClientHandler.class
package org.young.common.mqtt.pahomqtt;
import lombok.SneakyThrows;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * Paho callback that prints connection-loss, delivery and message-arrival events to the console.
 */
public class MqttClientHandler implements MqttCallback {

    /**
     * Called when the connection to the broker is lost. Reconnection is normally
     * triggered from here; this implementation only reports the cause.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Report the cause instead of dropping it silently.
        System.err.println("MQTT connection lost: " + cause);
    }

    /**
     * Called when delivery of a published message has completed.
     *
     * @param token the delivery token of the published message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }

    /**
     * Called for every message received on a subscribed topic.
     *
     * @param topic   the topic the message arrived on
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        System.out.println("接收消息内容 : "
                + new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
    }
}
MqttClientService.class
package org.young.common.mqtt.pahomqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
/**
 * Service around an Eclipse Paho {@link MqttClient}: connects on start-up using the
 * {@code mqtt.*} settings and offers publish and subscribe operations. The active client
 * is stored in {@link MqttClientConfig}. Active only when {@code mqtt.enable=true}.
 */
@Service
@ConditionalOnProperty(value = "mqtt.enable", havingValue = "true")
public class MqttClientService {

    private static final Logger logger = LoggerFactory.getLogger(MqttClientService.class);

    @Autowired
    MqttClientConfig clientConfig;

    /** Connects to the broker with the configured settings once the bean is constructed. */
    @PostConstruct
    public void init() {
        connectWithConfiguredSettings();
    }

    /**
     * Creates a new client, stores it in {@link MqttClientConfig} and connects it.
     * A previously stored client is released first so that repeated calls do not leak
     * clients. Failures are logged, not thrown.
     *
     * @param url       broker URL
     * @param clientID  client ID to connect with
     * @param username  user name; ignored when {@code null} or empty
     * @param password  password; ignored when {@code null} or empty
     * @param timeout   connection timeout, handed to Paho's {@code setConnectionTimeout}
     *                  (which takes seconds)
     * @param keepalive keep-alive interval, handed to Paho's {@code setKeepAliveInterval}
     */
    public void connect(String url, String clientID, String username, String password, int timeout, int keepalive) {
        try {
            MqttClient previous = clientConfig.getClient();
            MqttClient client = new MqttClient(url, clientID, new MemoryPersistence());

            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            // Short-circuit checks: a null credential must not be dereferenced.
            if (username != null && !username.isEmpty()) {
                options.setUserName(username);
            }
            if (password != null && !password.isEmpty()) {
                options.setPassword(password.toCharArray());
            }
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);

            clientConfig.setClient(client);
            releaseQuietly(previous);
            client.setCallback(new MqttClientHandler());
            client.connect(options);
        } catch (MqttException | RuntimeException e) {
            // Best effort, as before: a failed connection must not abort start-up.
            logger.error("Failed to connect MQTT client {} to {}", clientID, url, e);
        }
    }

    /**
     * Retries connecting every two seconds until the client reports that it is connected.
     *
     * @throws InterruptedException if the thread is interrupted while waiting between attempts
     */
    public void replayConnect() throws InterruptedException {
        logger.warn("断开连接,建议重连");
        int tryTimes = 1;
        while (!isConnected()) {
            Thread.sleep(2000);
            logger.info("重试第{}次", tryTimes);
            connectWithConfiguredSettings();
            if (isConnected()) {
                logger.info("连接完成");
            }
            tryTimes++;
        }
    }

    /**
     * Publishes with QoS 0 and the retained flag off.
     *
     * @param topic       topic to publish to
     * @param pushMessage message content
     */
    public void publish(String topic, String pushMessage) {
        publish(0, false, topic, pushMessage);
    }

    /**
     * Publishes a message and waits for its delivery to complete. Failures are logged, not thrown.
     *
     * @param qos         MQTT QoS level
     * @param retained    whether the broker should retain the message
     * @param topic       topic to publish to
     * @param pushMessage message content, encoded as UTF-8
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttClient client = clientConfig.getClient();
        if (client == null) {
            logger.error("MQTT client is not initialised; cannot publish to topic {}", topic);
            return;
        }
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        MqttTopic mTopic = client.getTopic(topic);
        if (null == mTopic) {
            logger.error("topic not exist");
            return;
        }
        try {
            MqttDeliveryToken token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttException e) {
            logger.error("Failed to publish to topic {}", topic, e);
        }
    }

    /**
     * Subscribes to a topic with QoS 0.
     *
     * @param topic topic to subscribe to
     */
    public void subscribe(String topic) {
        subscribe(topic, 0);
    }

    /**
     * Subscribes to a topic. Failures are logged, not thrown.
     *
     * @param topic topic to subscribe to
     * @param qos   MQTT QoS level
     */
    public void subscribe(String topic, int qos) {
        MqttClient client = clientConfig.getClient();
        if (client == null) {
            logger.error("MQTT client is not initialised; cannot subscribe to topic {}", topic);
            return;
        }
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("Failed to subscribe to topic {}", topic, e);
        }
    }

    /** Connects using the values currently held by {@link MqttClientConfig}. */
    private void connectWithConfiguredSettings() {
        this.connect(clientConfig.getUrl(), clientConfig.getClientid(), clientConfig.getUsername(),
                clientConfig.getPassword(), clientConfig.getTimeout(), clientConfig.getKeepalive());
    }

    /** Returns whether a client exists and reports itself as connected. */
    private boolean isConnected() {
        MqttClient client = clientConfig.getClient();
        return client != null && client.isConnected();
    }

    /** Disconnects (if needed) and closes a client that is being replaced; failures are only logged. */
    private void releaseQuietly(MqttClient stale) {
        if (stale == null) {
            return;
        }
        try {
            if (stale.isConnected()) {
                stale.disconnect();
            }
            stale.close();
        } catch (MqttException e) {
            logger.warn("Failed to release the previous MQTT client", e);
        }
    }
}
使用Controller
/**
 * Publishes {@code data} to {@code topic} through the Paho client service, then
 * subscribes to a sample topic to demonstrate subscribing at runtime.
 *
 * @param topic the topic to publish to; a blank topic yields an error response
 * @param data  the message content
 * @return a success response after sending, or an error response when the topic is blank
 */
@PostMapping("/send")
public ResponseEntity send(String topic, String data) {
    this.logger.info("开始发送mqtt消息,主题:{},消息:{}", topic, data);
    // Reject a blank topic up front.
    if (!StringUtils.isNotBlank(topic)) {
        return ResponseUtil.buildError("topic is blank");
    }
    mqttClientService.publish(topic, data);
    mqttClientService.subscribe("wrwerwrewer");
    this.logger.info("发送mqtt消息完成,主题:{},消息:{}", topic, data);
    return ResponseUtil.buildSuccess("execute successful");
}
|