IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> java-MQTT的使用原理讲解 -> 正文阅读

[网络协议]java-MQTT的使用原理讲解

前端时间因为工作需求研究了mqtt客服端(时间有点久远记不太清楚了)。

工作场景是这样的:房间内有警报的硬件,每次按警报时,硬件会发布一次主题,而我们需要接受这个主题信息,并把主题内的相关数据插入到数据库。硬件是第三方的,发布消息由第三方完成,我们只需要订阅消息。

所写demo可自行下载(https://download.csdn.net/download/weixin_44451527/85047878

解析数据后如下图(根据实际情况进行相对应的解析)在这里插入图片描述

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,一个消息中间件。

mqtt分为客户端;服务端。
客户端包括订阅者和发布者; 服务端是指mqtt服务器用来处理发布者发布的消息以及下发给订阅者。

mqtt的工作原理:
发布者发布主题会传到服务器,由服务器下发到订阅该主题的订阅者。并非发布者和订阅者直接对接。

mqtt的介绍可以看这篇文章:https://blog.csdn.net/weixin_40129263/article/details/80983858
介绍的已经很想详细清楚了。

因为只用到了客户端订阅者,服务端以及客户端发布者是由第三方去完成的,所以就讲讲客服端订阅者的使用

以下属性是用来订阅消息的一些相关属性
host:MQTT服务端ip以及端口(服务器的ip,并非发布者的ip)
topIc:消息主题,发布了主题,订阅了主题,发布者和订阅者通过主题对应
clientId:客户端唯一标识
qos:服务质量:(接受消息次数,以及接受消息过程中是否会丢失)
在这里插入图片描述
订阅主题:通过该方法订阅主题
在这里插入图片描述

订阅发布者发布的主题之后,还需要实现MqttCallback 接口类;这样就可以接收到发布者发布的主题了,如何处理接收到的主题信息呢

实现MqttCallback 的 messageArrived方法即可(该方法就是处理消息主题的,有两个参数String topic, MqttMessage message,message就是存储数据的参数,所需要的信息基本都在这里)
相应的,我们的业务也就需要在该方法里处理。就不再过多说明了 。

package com.rzkj.mqtt.mqtt; /**
 *
 * @Desc:mqtt发布消息的回调类
 * @author:hyl
 */

import com.rzkj.mqtt.model.BaseBed;
import com.rzkj.mqtt.model.CallDevice;
import com.rzkj.mqtt.model.SaveBed;
import com.rzkj.mqtt.model.SysDept;
import com.rzkj.mqtt.model.vo.SaveBedVo;
import com.rzkj.mqtt.service.IBaseBedService;
import com.rzkj.mqtt.service.ICallDeviceService;
import com.rzkj.mqtt.model.vo.DataModel;
import com.rzkj.mqtt.service.ISysDeptService;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import net.sf.json.util.JSONUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.ui.Model;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * 发布消息的回调类
 *
 * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
 * 在回调中,将它用来标识已经启动了该回调的哪个实例。
 * 必须在回调类中实现三个方法:
 *
 *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
 *
 *  public void connectionLost(Throwable cause)在断开连接时调用。
 *
 *  public void deliveryComplete(MqttDeliveryToken token))
 *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
 *  由 MqttClient.connect 激活此回调。
 *
 */

@Component                
public class PushCallback implements MqttCallback {
private static Logger logger= LoggerFactory.getLogger(PushCallback.class);

@Autowired
private ICallDeviceService callDeviceService;
@Autowired
private ISysDeptService sysDeptService;
@Autowired
private IBaseBedService baseBedService;
@Autowired
private static PushCallback pushCallback;
@Autowired
private ClientMQTT clientMQTT;

@PostConstruct
public void init(){
    pushCallback = this;
    pushCallback.clientMQTT=this.clientMQTT;
    pushCallback.callDeviceService=this.callDeviceService;
    pushCallback.sysDeptService=this.sysDeptService;
}


public void connectionLost(Throwable cause) {
    logger.info("MQTT连接断开了,准备进行重连");
    while (true){
        logger.info("正在尝试重连");
        try {//如果没有发生异常说明连接成功,如果发生异常,则死循环
            for(int i=0; i<20; i++){
                 if ( Thread.currentThread().interrupted() ) {
                     break;
                    }
            }

            Thread.sleep(10000);//10秒
            //当设置 options.setAutomaticReconnect(true);不需要写重连方法,设置false需要写重连方法
            //pushCallback.clientMQTT.start();
            logger.info("MQTT:连接成功");
            break;
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

public void deliveryComplete(IMqttDeliveryToken token) {
    System.out.println("deliveryComplete---------" + token.isComplete());
}





//业务处理
public void messageArrived(String topic, MqttMessage message) throws Exception {
     List<SaveBed> saveBedList=new ArrayList<>();

    if (!"close".equals(new String(message.getPayload()))&& !"null".equals(message.getPayload())){

        CallDevice callDevice=new CallDevice();
        String record=new String(message.getPayload(),"GBK");//字符串信息
        System.out.println("接收消息内容 : " + record);
        callDevice.setRecord(record);
        JSONObject fullData= JSONObject.fromObject(record);//完整json信息
        DataModel dataModel=(DataModel)JSONObject.toBean(fullData, DataModel.class);//接受到的完整对象信息
        if (dataModel!=null){
            //System.out.println("接收消息内容 : " + dataModel);
            callDevice.setCmd(dataModel.getCmd());
            callDevice.setDeviceId(dataModel.getDeviceId());
            callDevice.setDeviceModel(dataModel.getDeviceModel());
            callDevice.setOrgId(dataModel.getOrgId());
            callDevice.setProducedTime(dataModel.getProducedTime());
            callDevice.setSerialNo(dataModel.getSerialNo());
            callDevice.setVersion(dataModel.getVersion());
            JSONObject data= fullData.getJSONObject("data");//数组信息
            
            boolean bedIsNull = JSONUtils.isNull( data.get("callRoomList"));
            if (!bedIsNull){
                String callingBedList= data.getJSONArray("callingBedList").toString();//正在呼叫列表
                callDevice.setCallingBedList(callingBedList);
            }

            boolean roomIsNull = JSONUtils.isNull( data.get("callRoomList"));
            if (!roomIsNull){
                String callRoomList= data.getJSONArray("callRoomList").toString();//门口振铃列表
                callDevice.setCallRoomList(callRoomList);
            }

            boolean toiletIsNull = JSONUtils.isNull( data.get("callToiletList"));
            if (!toiletIsNull){
                String callToiletList= data.getJSONArray("callToiletList").toString();//卫生间振铃列表
                callDevice.setCallToiletList(callToiletList);
            }

            boolean saveBedIsNull = JSONUtils.isNull( data.get("saveBedList"));
//                if (!saveBedIsNull){
//                    String saveBed= data.getJSONArray("saveBedList").toString();//床头存储列表
//                    List<SaveBedVo> saveBedVoList=	(List<SaveBedVo>)JSONArray.toList(JSONArray.fromObject(saveBed), SaveBedVo.class);//床头/卫生间信息
//                    for (SaveBedVo saveBedVo:saveBedVoList){
//                       List<SysDept>sysDeptList= 	pushCallback.sysDeptService.selectChildrenDeptById(Long.parseLong(saveBedVo.getDeptCode())); //机构下的部门信息
//                       List<BaseBed> 	baseBedList=pushCallback.baseBedService.selectBaseBedsList(Long.parseLong(saveBedVo.getBedName()));//对应床	位的部门信息
//                       if (!CollectionUtils.isEmpty(sysDeptList)&&!CollectionUtils.isEmpty(baseBedList)){
//                           //交集对象
//                           List<BaseBed> bedList = baseBedList.stream().filter(obj -> find(String.valueOf(obj.getDeptId()), 	sysDeptList)).collect(Collectors.toList());
//                           if (!CollectionUtils.isEmpty(bedList)){
//                               SaveBed model=new SaveBed();
//                               saveBedList.add(model);
//                           }
//
//                       }
//                    }
//                    JSONArray jsonArray = JSONArray.fromObject(saveBedList);
//
//                    callDevice.setSaveBedList(jsonArray.toString());//老人ID进行json化
//
//                    System.out.println("床位信息:"+saveBedVoList);
//                }
          	  //pushCallback.callDeviceService.insertCallDevice(callDevice);
        }
    }

}

}

发布者和订阅者原理相差不多,只不过一个发,一个接。
其实订阅者订阅消息很好处理,就不过多说什么了。写的比较粗糙,欢迎大家留言交流。

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-03-30 19:06:09  更:2022-03-30 19:07:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/2 2:58:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码