前端时间因为工作需求研究了mqtt客服端(时间有点久远记不太清楚了)。
工作场景是这样的:房间内有警报的硬件,每次按警报时,硬件会发布一次主题,而我们需要接受这个主题信息,并把主题内的相关数据插入到数据库。硬件是第三方的,发布消息由第三方完成,我们只需要订阅消息。
所写demo可自行下载(https://download.csdn.net/download/weixin_44451527/85047878)
解析数据后如下图(根据实际情况进行相对应的解析)
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,一个消息中间件。
mqtt分为客户端;服务端。 客户端包括订阅者和发布者; 服务端是指mqtt服务器用来处理发布者发布的消息以及下发给订阅者。
mqtt的工作原理: 发布者发布主题会传到服务器,由服务器下发到订阅该主题的订阅者。并非发布者和订阅者直接对接。
mqtt的介绍可以看这篇文章:https://blog.csdn.net/weixin_40129263/article/details/80983858 介绍的已经很想详细清楚了。
因为只用到了客户端订阅者,服务端以及客户端发布者是由第三方去完成的,所以就讲讲客服端订阅者的使用
以下属性是用来订阅消息的一些相关属性 host:MQTT服务端ip以及端口(服务器的ip,并非发布者的ip) topIc:消息主题,发布了主题,订阅了主题,发布者和订阅者通过主题对应 clientId:客户端唯一标识 qos:服务质量:(接受消息次数,以及接受消息过程中是否会丢失) 订阅主题:通过该方法订阅主题
订阅发布者发布的主题之后,还需要实现MqttCallback 接口类;这样就可以接收到发布者发布的主题了,如何处理接收到的主题信息呢
实现MqttCallback 的 messageArrived方法即可(该方法就是处理消息主题的,有两个参数String topic, MqttMessage message,message就是存储数据的参数,所需要的信息基本都在这里) 相应的,我们的业务也就需要在该方法里处理。就不再过多说明了 。
package com.rzkj.mqtt.mqtt; /**
*
* @Desc:mqtt发布消息的回调类
* @author:hyl
*/
import com.rzkj.mqtt.model.BaseBed;
import com.rzkj.mqtt.model.CallDevice;
import com.rzkj.mqtt.model.SaveBed;
import com.rzkj.mqtt.model.SysDept;
import com.rzkj.mqtt.model.vo.SaveBedVo;
import com.rzkj.mqtt.service.IBaseBedService;
import com.rzkj.mqtt.service.ICallDeviceService;
import com.rzkj.mqtt.model.vo.DataModel;
import com.rzkj.mqtt.service.ISysDeptService;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import net.sf.json.util.JSONUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.ui.Model;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* 发布消息的回调类
*
* 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
* 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
* 在回调中,将它用来标识已经启动了该回调的哪个实例。
* 必须在回调类中实现三个方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
*
* public void connectionLost(Throwable cause)在断开连接时调用。
*
* public void deliveryComplete(MqttDeliveryToken token))
* 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
* 由 MqttClient.connect 激活此回调。
*
*/
@Component
public class PushCallback implements MqttCallback {
private static Logger logger= LoggerFactory.getLogger(PushCallback.class);
@Autowired
private ICallDeviceService callDeviceService;
@Autowired
private ISysDeptService sysDeptService;
@Autowired
private IBaseBedService baseBedService;
@Autowired
private static PushCallback pushCallback;
@Autowired
private ClientMQTT clientMQTT;
@PostConstruct
public void init(){
pushCallback = this;
pushCallback.clientMQTT=this.clientMQTT;
pushCallback.callDeviceService=this.callDeviceService;
pushCallback.sysDeptService=this.sysDeptService;
}
public void connectionLost(Throwable cause) {
logger.info("MQTT连接断开了,准备进行重连");
while (true){
logger.info("正在尝试重连");
try {//如果没有发生异常说明连接成功,如果发生异常,则死循环
for(int i=0; i<20; i++){
if ( Thread.currentThread().interrupted() ) {
break;
}
}
Thread.sleep(10000);//10秒
//当设置 options.setAutomaticReconnect(true);不需要写重连方法,设置false需要写重连方法
//pushCallback.clientMQTT.start();
logger.info("MQTT:连接成功");
break;
}catch (Exception e){
e.printStackTrace();
}
}
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
//业务处理
public void messageArrived(String topic, MqttMessage message) throws Exception {
List<SaveBed> saveBedList=new ArrayList<>();
if (!"close".equals(new String(message.getPayload()))&& !"null".equals(message.getPayload())){
CallDevice callDevice=new CallDevice();
String record=new String(message.getPayload(),"GBK");//字符串信息
System.out.println("接收消息内容 : " + record);
callDevice.setRecord(record);
JSONObject fullData= JSONObject.fromObject(record);//完整json信息
DataModel dataModel=(DataModel)JSONObject.toBean(fullData, DataModel.class);//接受到的完整对象信息
if (dataModel!=null){
//System.out.println("接收消息内容 : " + dataModel);
callDevice.setCmd(dataModel.getCmd());
callDevice.setDeviceId(dataModel.getDeviceId());
callDevice.setDeviceModel(dataModel.getDeviceModel());
callDevice.setOrgId(dataModel.getOrgId());
callDevice.setProducedTime(dataModel.getProducedTime());
callDevice.setSerialNo(dataModel.getSerialNo());
callDevice.setVersion(dataModel.getVersion());
JSONObject data= fullData.getJSONObject("data");//数组信息
boolean bedIsNull = JSONUtils.isNull( data.get("callRoomList"));
if (!bedIsNull){
String callingBedList= data.getJSONArray("callingBedList").toString();//正在呼叫列表
callDevice.setCallingBedList(callingBedList);
}
boolean roomIsNull = JSONUtils.isNull( data.get("callRoomList"));
if (!roomIsNull){
String callRoomList= data.getJSONArray("callRoomList").toString();//门口振铃列表
callDevice.setCallRoomList(callRoomList);
}
boolean toiletIsNull = JSONUtils.isNull( data.get("callToiletList"));
if (!toiletIsNull){
String callToiletList= data.getJSONArray("callToiletList").toString();//卫生间振铃列表
callDevice.setCallToiletList(callToiletList);
}
boolean saveBedIsNull = JSONUtils.isNull( data.get("saveBedList"));
// if (!saveBedIsNull){
// String saveBed= data.getJSONArray("saveBedList").toString();//床头存储列表
// List<SaveBedVo> saveBedVoList= (List<SaveBedVo>)JSONArray.toList(JSONArray.fromObject(saveBed), SaveBedVo.class);//床头/卫生间信息
// for (SaveBedVo saveBedVo:saveBedVoList){
// List<SysDept>sysDeptList= pushCallback.sysDeptService.selectChildrenDeptById(Long.parseLong(saveBedVo.getDeptCode())); //机构下的部门信息
// List<BaseBed> baseBedList=pushCallback.baseBedService.selectBaseBedsList(Long.parseLong(saveBedVo.getBedName()));//对应床 位的部门信息
// if (!CollectionUtils.isEmpty(sysDeptList)&&!CollectionUtils.isEmpty(baseBedList)){
// //交集对象
// List<BaseBed> bedList = baseBedList.stream().filter(obj -> find(String.valueOf(obj.getDeptId()), sysDeptList)).collect(Collectors.toList());
// if (!CollectionUtils.isEmpty(bedList)){
// SaveBed model=new SaveBed();
// saveBedList.add(model);
// }
//
// }
// }
// JSONArray jsonArray = JSONArray.fromObject(saveBedList);
//
// callDevice.setSaveBedList(jsonArray.toString());//老人ID进行json化
//
// System.out.println("床位信息:"+saveBedVoList);
// }
//pushCallback.callDeviceService.insertCallDevice(callDevice);
}
}
}
}
发布者和订阅者原理相差不多,只不过一个发,一个接。 其实订阅者订阅消息很好处理,就不过多说什么了。写的比较粗糙,欢迎大家留言交流。
|