如果有问题,欢迎大佬指正!!!
导包
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
配置MQTT连接信息
mqtt:
  addr: tcp://localhost:1883
  clientId: client_1
  username: admin
  password: public
  keepAlive: 60
  timeout: 60
Mqtt消息回调类
/**
 * MQTT callback registered on the client by {@code MqttPushClient.connect}.
 *
 * <p>It reports connection loss, incoming messages and completed deliveries on
 * standard output, and asks the push client to connect again when the
 * connection is lost.
 */
@Component
public class PushCallBack implements MqttCallback {

    /** Source of the connection settings and of the push client used to reconnect. */
    @Autowired
    private MqttConfig config;

    /**
     * Reports the lost connection and triggers a reconnect with the configured settings.
     *
     * @param cause the reason reported for the connection loss; printed when present
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("连接断开");
        if (cause != null) {
            // Keep the reason visible; without it a dropped connection cannot be diagnosed.
            cause.printStackTrace();
        }
        MqttPushClient client = config.getMqttPushClient();
        if (client != null) {
            client.connect(
                    config.getAddress(),
                    config.getClientId(),
                    config.getUsername(),
                    config.getPassword(),
                    config.getTimeout(),
                    config.getKeepalive());
        }
    }

    /**
     * Prints the topic and the text payload of a received message.
     *
     * @param topic   topic the message was published on
     * @param message received message; its payload is decoded as UTF-8
     * @throws Exception declared by the callback interface; not thrown explicitly here
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Decode with the same charset MqttPushClient.push uses to encode (UTF-8),
        // instead of the platform default, so non-ASCII text survives on any host.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到消息:" + topic + " " + payload);
    }

    /**
     * Reports that delivery of a published message has completed.
     *
     * @param token delivery token of the completed message (unused)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("消息发送完成");
    }
}
Mqtt发送消息类
/**
 * Wrapper around a single, statically shared Paho {@code MqttClient}: opens the
 * connection and publishes messages.
 */
@Component
@Data
public class MqttPushClient {

    /** Callback installed on the client before connecting. */
    @Autowired
    private PushCallBack pushCallBack;

    /** Client shared by every caller; {@code null} until a client has been created by {@link #connect}. */
    private static MqttClient client;

    /**
     * Replaces the shared client.
     *
     * @param client client to share
     */
    public static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    /**
     * Returns the shared client.
     *
     * @return the shared client, or {@code null} if none has been created yet
     */
    public static MqttClient getClient() {
        return client;
    }

    /**
     * Connects the shared client to the broker and prints the outcome.
     *
     * <p>An existing client for the same {@code host} and {@code clientId} is reused
     * rather than replaced, so that a reconnect (for example from
     * {@code PushCallBack.connectionLost}) does not leave an unclosed client behind.
     * NOTE(review): with Paho's default file persistence, constructing a second
     * client for the same id/URI while the first is unclosed is expected to fail
     * with "Persistence already in use" - confirm against the Paho version in use.
     *
     * <p>Failures are printed, not propagated.
     *
     * @param host      broker URI, e.g. {@code tcp://localhost:1883}
     * @param clientId  MQTT client identifier
     * @param username  user name, or {@code null} to connect without one
     * @param password  password, or {@code null} to connect without one
     * @param timeout   value passed to {@code setConnectionTimeout}
     * @param keepalive value passed to {@code setKeepAliveInterval}
     */
    public void connect(String host, String clientId, String username, String password, int timeout, int keepalive) {
        try {
            MqttClient target = resolveClient(host, clientId);
            if (target.isConnected()) {
                // Nothing to do; connecting an already connected client would only raise an error.
                return;
            }
            MqttConnectOptions options = new MqttConnectOptions();
            if (username != null) {
                options.setUserName(username);
            }
            if (password != null) {
                // Guarded so that brokers without authentication do not trigger a NullPointerException.
                options.setPassword(password.toCharArray());
            }
            options.setCleanSession(false);
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            target.setCallback(pushCallBack);
            IMqttToken iMqttToken = target.connectWithResult(options);
            boolean complete = iMqttToken.isComplete();
            System.out.println(complete ? "连接成功!" : "连接失败!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Publishes a non-retained UTF-8 text message; failures are printed, not propagated.
     *
     * @param topic topic to publish to
     * @param msg   message text
     * @param qos   quality-of-service level set on the message
     */
    public void push(String topic, String msg, int qos) {
        MqttClient current = MqttPushClient.client;
        if (current == null) {
            // No client was ever created (connect not called or it failed early).
            System.out.println("消息发送失败!");
            return;
        }
        MqttMessage message = new MqttMessage();
        message.setPayload(msg.getBytes(StandardCharsets.UTF_8));
        message.setQos(qos);
        message.setRetained(false);
        // NOTE(review): Paho appears to always return a non-null topic object here,
        // so the null branch is probably unreachable - kept as in the original.
        MqttTopic mqttTopic = current.getTopic(topic);
        if (mqttTopic == null) {
            System.out.println("主题不存在!");
            return;
        }
        try {
            current.publish(topic, message);
        } catch (MqttException e) {
            e.printStackTrace();
            System.out.println("消息发送失败!");
        }
    }

    /** Returns the shared client when it targets the same broker and id, otherwise creates and shares a new one. */
    private static MqttClient resolveClient(String host, String clientId) throws MqttException {
        MqttClient existing = MqttPushClient.client;
        if (existing != null
                && existing.getServerURI().equals(host)
                && existing.getClientId().equals(clientId)) {
            return existing;
        }
        MqttClient created = new MqttClient(host, clientId);
        MqttPushClient.setClient(created);
        return created;
    }
}
初始化MqttClient
/**
 * Holds the MQTT connection settings read from the {@code mqtt.*} properties and
 * opens the connection once this bean has been initialised.
 *
 * <p>NOTE(review): this class injects {@code MqttPushClient}, which injects
 * {@code PushCallBack}, which injects this class again - a circular dependency
 * between the three beans. Whether the container accepts it depends on the
 * Spring version and settings; confirm for the target environment.
 */
@Configuration
@Data
public class MqttConfig {

    /** Broker URI, from {@code mqtt.addr}. */
    @Value("${mqtt.addr}")
    private String address;

    /** User name, from {@code mqtt.username}. */
    @Value("${mqtt.username}")
    private String username;

    /** Password, from {@code mqtt.password}. */
    @Value("${mqtt.password}")
    private String password;

    /** MQTT client identifier, from {@code mqtt.clientId}. */
    @Value("${mqtt.clientId}")
    private String clientId;

    /** Connection timeout, from {@code mqtt.timeout}. */
    @Value("${mqtt.timeout}")
    private Integer timeout;

    /**
     * Keep-alive interval. The placeholder is spelled exactly like the
     * {@code keepAlive} key in the configuration shown with this class, so
     * resolution does not depend on relaxed property-name matching.
     */
    @Value("${mqtt.keepAlive}")
    private Integer keepalive;

    /** Client wrapper used to open the connection. */
    @Autowired
    private MqttPushClient mqttPushClient;

    /** Connects to the broker with the injected settings after dependency injection has finished. */
    @PostConstruct
    public void mqttInit() {
        mqttPushClient.connect(address, clientId, username, password, timeout, keepalive);
    }
}
测试
/**
 * Test endpoints under {@code /mqtt} for publishing a message and subscribing to a topic.
 */
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    /** Publisher used by the send endpoint. */
    @Autowired
    private MqttPushClient client;

    /**
     * Publishes the path variable as a message on topic {@code wsl} with QoS 1.
     *
     * @param msg message text taken from the URL path
     * @return the fixed response text
     * @throws MqttException declared for callers; the publish call itself reports failures on standard output
     */
    @RequestMapping("/send/{msg}")
    public String send(@PathVariable("msg") String msg) throws MqttException {
        final String targetTopic = "wsl";
        final int qos = 1;
        client.push(targetTopic, msg, qos);
        return "SUCESS";
    }

    /**
     * Subscribes the shared client to the given topic with QoS 1.
     *
     * @param topic topic filter taken from the URL path
     * @return the fixed response text
     * @throws MqttException if the subscription fails
     */
    @RequestMapping("/sub/{topic}")
    public String subscribe(@PathVariable("topic") String topic) throws MqttException {
        MqttClient sharedClient = MqttPushClient.getClient();
        sharedClient.subscribe(topic, 1);
        return "SUCESS";
    }
}
|