IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> springboot集成mqtt,及本地搭建emqX服务测试案例 -> 正文阅读

[Java知识库]springboot集成mqtt,及本地搭建emqX服务测试案例

springboot项目集成mqtt

1、引入pom依赖

		<!-- MQTT -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

2、yml文件配置

# Mqtt配置
mqtt:
 #我是在本地搭建了emqx服务,192.168.2.31就是我本地emqx服务的地址。也可用公共测试不需要搭建emqx服务,将ip改为broker.emqx.io即可。
  serverURIs: tcp://192.168.2.31:1883 
  username: admin #可不填写
  password: public #可不填写
  qos: 1 #等级 有 0 1 2 三种
  clientId: mqttx_d48b 
  topic: topic_ljh  #订阅的主题,多个时可以使用逗号分开 如:topic1,topic2,topic
  enabled: true  #是否打开mqtt服务  
  keepalive: 100 #心跳时间  不需要动
  timeout: 100 # 超时时间秒 不需要动

3、MqttConfig (用来获取yml中配置的参数&启动订阅主题功能)

package com.boc.ljh.utils.mqtt;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * @Author: ljh
 * @ClassName MqttConfig
 * @Description TODO
 * @date 2022/7/14 18:09
 * @Version 1.0
 */
@Configuration
public class MqttConfig {

    @Autowired
    private MqttPushClient mqttPushClient;

    @Value("${mqtt.username:{null}}")
    private String username;

    @Value("${mqtt.password:{null}}")
    private String password;

    @Value("${mqtt.serverURIs:{null}}")
    private String hostUrl;

    @Value("${mqtt.clientId:{null}}")
    private String clientId;

    @Value("${mqtt.topic:{null}}")
    private String defaultTopic;

    @Value("${mqtt.qos:{null}}")
    private int qos;

    @Value("${mqtt.enabled:{null}}")
    private boolean enabled;

    @Value("${mqtt.keepalive:{null}}")
    private int keepalive;

    @Value("${mqtt.timeout:{null}}")
    private int timeout;

    @Bean
    public MqttPushClient getMqttPushClient() {
        if(enabled == true){
            String mqtt_topic[] = defaultTopic.split(",");
            mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//连接
            for(int i=0; i<mqtt_topic.length; i++){
                mqttPushClient.subscribe(mqtt_topic[i], 0);//订阅主题
            }
        }
        return mqttPushClient;
    }



}

4、MqttPushClient(发布&订阅主题的详细方法,执行发布&订阅操作时调用即可)

package com.boc.ljh.utils.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


/**
 * @Author: ljh
 * @ClassName MqttPushClient
 * @Description TODO
 * @date 2022/7/15 15:15
 * @Version 1.0
 */
@Component
public class MqttPushClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private PushCallback pushCallback;

    private static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    /**
     * 客户端连接
     *
     * @param host      ip+端口
     * @param clientID  客户端Id
     * @param username  用户名
     * @param password  密码
     * @param timeout   超时时间
     * @param keepalive 保留数
     */
    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
        MqttClient client;
        try {
            client = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            MqttPushClient.setClient(client);
            try {
                client.setCallback(pushCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 发布
     *
     * @param qos         连接方式
     * @param retained    是否保留
     * @param topic       主题
     * @param pushMessage 消息体
     */
    public boolean publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        if (null == mTopic) {
            logger.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
            return true;
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
            return false;
        } catch (MqttException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topic 主题
     * @param qos   连接方式
     */
    public void subscribe(String topic, int qos) {
        logger.info("开始订阅主题" + topic);
        try {
            MqttPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

}

5、PushCallback(主要打印接收数据和响应的结果)

package com.boc.ljh.utils.mqtt;


import com.alibaba.fastjson.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Author: ljh
 * @ClassName PushCallback
 * @Description TODO
 * @date 2022/7/15 15:16
 * @Version 1.0
 */
@Component
public class PushCallback implements MqttCallback{
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private MqttConfig mqttConfig;

    private static MqttClient client;

    private static String _topic;
    private static String _qos;
    private static String _msg;

    @Override
    public void connectionLost(Throwable throwable) {
        // 连接丢失后,一般在这里面进行重连
        logger.info("连接断开,可以做重连");
        if (client == null || !client.isConnected()) {
            mqttConfig.getMqttPushClient();
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // subscribe后得到的消息会执行到这里面
        logger.info("接收消息主题 : " + topic);
        logger.info("接收消息Qos : " + mqttMessage.getQos());
        logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));

        _topic = topic;
        _qos = mqttMessage.getQos()+"";
        _msg = new String(mqttMessage.getPayload());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    //别的Controller层会调用这个方法来  获取  接收到的硬件数据
    public String receive() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("topic", _topic);
        jsonObject.put("qos", _qos);
        jsonObject.put("msg", _msg);
        return jsonObject.toString();
    }
}

本地搭建EMQX服务

1、windows搭建

使用下面链接下载emqx

链接:https://pan.baidu.com/s/1wuJMss0JIZAn6ayaCbOtXQ?pwd=xmcm
提取码:xmcm

下载完成后随便找个地方解压文件,然后进入到bin目录中,打开cmd窗口,输入emqx start即可开启服务。emqx stop关闭服务
在这里插入图片描述
然后访问http://127.0.0.1:18083地址,默认用户名:admin,密码:public
在这里插入图片描述

2、centos7搭建emqx服务

按照顺序输入一下命令即可:
1、wget https://www.emqx.com/zh/downloads/broker/5.0.3/emqx-5.0.3-el7-amd64.tar.gz 下载emqx压缩文件
2、mkdir -p emqx && tar -zxvf emqx-5.0.3-el7-amd64.tar.gz -C emqx 解压文件
3、./emqx/bin/emqx start 启动服务
在这里插入图片描述

4、iptables -A INPUT -p tcp --dport 18083 -j ACCEPT 打开emqx端口

测试

1、下载工具MQTTX, 地址:https://mqttx.app/zh

2、设置中文

在这里插入图片描述

3、连接emqx服务,

在这里插入图片描述

4、发送数据

在这里插入图片描述

5、项目中接收数据

项目里面是订阅了topic_ljh这个主题,当往这个主题发送数据的时候,项目里面就可以接收到数据内容。
在这里插入图片描述
发送到topic_ljh主题
在这里插入图片描述

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-07-21 21:22:54  更:2022-07-21 21:24:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:27:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码