IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> SpringBoot集成MQTT---搭建以及使用 -> 正文阅读

[Java知识库]SpringBoot集成MQTT---搭建以及使用

什么是MQTT

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)作为一款发布(pub)/订阅(sub)模式的"轻量级"通讯协议,凭借其轻量、简单、开放和易于实现等优点,在物联网领域得到了广泛应用。

开始搭建MQTT

在这里我们是基于CentOS7 来进行搭建的
在这里插入图片描述![在这里插入图片描述](https://img-blog.csdnimg.cn/512f0f6bd7b04997882adad0a96eef0f.pn

# 1. 下载
wget https://www.emqx.com/en/downloads/broker/5.0.6/emqx-5.0.6-el7-arm64.rpm
# 2. 安装
sudo yum install emqx-5.0.6-el7-arm64.rpm
# 3. 运行
sudo emqx start

下载

在这里插入图片描述

安装

在这里插入图片描述

选择y在这里插入图片描述

开始

在这里插入图片描述
这里要记住,我们如果是自己的虚拟机时我们要在防火墙中开启-- 1883,8083,8084,8883,18083这几个端口,同样是要云服务取就要配置这几个端口

安装完成后就是可以访问 服务器IP地址+18083 这个地址就好可以看到这样的
默认的账号:admin 密码:public

在这里插入图片描述在这里插入图片描述
在客户端中是能够看到我们已经链接到的客户端的
在这里插入图片描述

做测试的话我们可以下载一个工具 MQTTX

在这里插入图片描述
在这里插入图片描述
具体使用

MQTT 整合SpringBoot

所需依赖

 <!-- MQTT -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>

MqttConfig.java 用于获取客户端配置

import com.hai.cardu.vo.MqttClient;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
 
import java.util.List;
 
/**
 * Mqtt配置类
 *
 * @author hai
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {
    /**
     * mqtt broker地址
     */
    String broker;
    /**
     * 需要创建的MQTT客户端
     */
    List<MqttClient> clientList;
}

MqttClient.java 用户存储客户端的参数

import lombok.Data;
/**
 * MQTT客户端
 *
 * @author hai
 */
@Data
public class MqttClient {
    /**
     * 客户端ID
     */
    private String clientId;
    /**
     * 监听主题
     */
    private String subscribeTopic;
    /**
     * 用户名
     */
    private String userName;
    /**
     * 密码
     */
    private String password;
}

MqttClientCreate.java 用于创建客户端

package com.hai.cardu.myMqtt;
 

import com.hai.cardu.config.MqttConfig;
import com.hai.cardu.vo.MqttClientVO;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
 
/**
 * MQTT客户端创建
 *
 * @author hai
 */
@Component
@Slf4j
public class MqttClientCreate {
    @Resource
    private MqttClientManager MqttClientManager;
    @Autowired
    private MqttConfig mqttConfig;
 
    /**
     * 创建MQTT客户端
     */
    @PostConstruct
    public void createMqttClient() {
        List<MqttClientVO> mqttClientList = mqttConfig.getClientList();
 
        for (MqttClientVO mqttClient : mqttClientList) {
            log.info("{}",mqttClient);
            //创建客户端,客户端ID:demo,回调类跟客户端ID一致
            MqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());
        }
    }
}

MqttClientManager.java 管理客户端操作,进行创建客户端,让客户端订阅主题接收消息

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * MQTT客户端管理类,如果客户端非常多后续可入redis缓存
 *
 * @author hai
 */
@Slf4j
@Component
public class MqttClientManager {
    @Value("${customer.mqtt.broker}")
    private String mqttBroker;
    @Resource
    private MqttCallBackContext mqttCallBackContext;
    /**
     * 存储MQTT客户端
     */
    public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();

    public static MqttClient getMqttClientById(String clientId) {
        return MQTT_CLIENT_MAP.get(clientId);
    }

    /**
     * 创建mqtt客户端
     * @param clientId       客户端ID
     * @param subscribeTopic 订阅主题,可为空
     * @param userName       用户名,可为空
     * @param password       密码,可为空
     * @return mqtt客户端
     */
    public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            if (null != userName && !"".equals(userName)) {
                connOpts.setUserName(userName);
            }

            if (null != password && !"".equals(password)) {
                connOpts.setPassword(password.toCharArray());
            }

            connOpts.setCleanSession(true);

            if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);

                if (null == callBack) {
                    callBack = mqttCallBackContext.getCallBack("default");
                }

                callBack.setClientId(clientId);
                callBack.setConnectOptions(connOpts);
                client.setCallback(callBack);
            }

            //连接mqtt服务端broker
            client.connect(connOpts);

            // 订阅主题
            if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                if (subscribeTopic.contains("-"))
                    client.subscribe(subscribeTopic.split("-"));
                else
                    client.subscribe(subscribeTopic);
            }

            MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
        } catch (MqttException e) {
            log.error("Create mqttClient failed!", e);
        }
    }
}

MqttCallBackContext.java 客户端订阅消息环境配置,以那种方式回调

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * MQTT订阅回调环境类
 *
 * @author wangfenglei
 */
@Component
@Slf4j
public class MqttCallBackContext {
    private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();
 
    /**
     * 默认构造函数
     *
     * @param callBackMap 回调集合
     */
    public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
        this.callBackMap.clear();
        callBackMap.forEach((k, v) -> this.callBackMap.put(k, v));
    }
 
    /**
     * 获取MQTT回调类
     *
     * @param clientId 客户端ID
     * @return MQTT回调类
     */
    public AbsMqttCallBack getCallBack(String clientId) {
        return this.callBackMap.get(clientId);
    }
}

AbsMqttCallBack.java 接收消息的回调处理抽象类

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
 
/**
 * MQTT回调抽象类
 *
 * @author hai
 */
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {
    private String clientId;

    private MqttConnectOptions connectOptions;
 
    public String getClientId() {
        return clientId;
    }
 
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
 
    public MqttConnectOptions getConnectOptions() {
        return connectOptions;
    }
 
    public void setConnectOptions(MqttConnectOptions connectOptions) {
        this.connectOptions = connectOptions;
    }
 
    /**
     * 失去连接操作,进行重连
     *
     * @param throwable 异常
     */
    @Override
    public void connectionLost(Throwable throwable) {
        try {
            if (null != clientId) {
                if (null != connectOptions) {
                    MqttClientManager.getMqttClientById(clientId).connect(connectOptions);
                } else {
                    MqttClientManager.getMqttClientById(clientId).connect();
                }
            }
 
        } catch (Exception e) {
            log.error("{} reconnect failed!", e);
        }
    }
 
    /**
     * 接收订阅消息
     * @param topic    主题
     * @param mqttMessage 接收消息
     * @throws Exception 异常
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
		String content = new String(mqttMessage.getPayload());
     	handleReceiveMessage(topic, content);
    }
 
    /**
     * 消息发送成功
     *
     * @param iMqttDeliveryToken toke
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("消息发送成功");
    }
 
 
    /**
     * 处理接收的消息
     * @param topic   主题
     * @param message 消息内容
     */
    protected abstract void handleReceiveMessage(String topic, String message);
}

DefaultMqttCallBack.java 消息回调处理类

import com.google.gson.Gson;
import com.hai.cardu.vo.VisitorsRecord;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * 默认回调
 *
 * @author wangfenglei
 */
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {

    /**
     *
     * @param topic   主题
     * @param message 消息内容
     */
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        log.info("接收到消息---{}", message);
       // 你自己的消息处理业务

    }

MqttClientUtil.java 发送消息工具类

import com.hai.cardu.myMqtt.MqttClientManager;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
 
/**
 * MQTT客户端工具类
 *
 * @author wangfenglei
 */
@Slf4j
public class MqttClientUtil {

    /**
     * 发送消息
     * @param clientId 客户端id
     * @param topic 主题
     * @param content 消息内容
     */
    public static void sendMqttMsg(String clientId, String topic, String content) {
        try {
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(2);
            MqttClient mqttClient = MqttClientManager.getMqttClientById(clientId);
 
            if (null == mqttClient) {
                log.error("Not exist mqttClient where it's clientId is {}", clientId);
                return;
            }
 
            MqttClientManager.getMqttClientById(clientId).publish(topic, message);
            log.info("Publish to mqtt broker,message={}", message);
        } catch (MqttException e) {
            log.error("MqttClient send msg faild!",e);
        }
    }
}

application.yml

server:
    port: 8080
# 下面这里要看你自己的需求
customer:
  mqtt:
    broker: tcp://服务器IP:端口号
    clientList:
      #客户端ID
      - clientId: nxys_service
        #监听主题 同时订阅多个主题使用 - 分割开
        subscribeTopic: mqtt/printer-mqtt/face/JWJA220308396/Rec
        #用户名
        username: admin
        #密码
        password: admin111

这里我们配置完后就可以通过使用MqttClientUtil 类中的sendMqttMsg方法去发送消息测试了

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-09-04 00:56:31  更:2022-09-04 01:00:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:08:42-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码