项目背景:
实现人离开房间超过一定时间,自动关闭空调联动控制功能。
实现原理:
通过客流密度摄像机监测客流人数变化,发送订阅消息到mqtt 消息服务器,WEB后台服务器订阅mqtt主题,接收客流密度摄像机发送的订阅消息后,处理订阅消息通过后台webSocket 客户端发送关闭空调的主题到webSocket 服务端,webSocket 中间件(C语言开发的)进而控制空调的关闭。
需求分析:
1.通过 mqtt 订阅客流密度主题:topic/whg/areapeoplecount/publish/WHG37020001(注:此主题配置在 Spring properties 配置文件中),接收客流密度设备推送的订阅消息。消息格式为:{ organCode: '', areaCode: '', deviceCode: '', insidePeopleNum: '', dataTime: '' }。创建 po:AutoCloseAirPo,缓存到全局 Map 里面,开启线程定时轮询 Map 里面的设备是否超时,如果超时则执行关闭空调,并从 Map 里面删除该条记录;期间如果有人进入则刷新。AutoCloseAirPo { organCode: '', areaCode: '', deviceCode: '', insidePeopleNum: '', dataTime: ''(yyyy-MM-dd HH:mm:ss), startTime: ''(yyyy-MM-dd HH:mm:ss), expireTime: 20 }。当收到客流消息时判断:设备类型 type.equals("humanDensity") && insidePeopleNum == 0 && currentDateTime - dataTime > 20 时,向 webSocket 发布订阅消息控制空调关闭:"CONFIG|" + code + "|" + value + "|END"
创建关联表 ibs_linkagectrl_rel,字段:id(自增主键)、ctrled_device_id(被控设备id,例如 303)、ctrl_device_code(客流密度设备code,例如 WHG37020001A0001)。建表语句:CREATE TABLE ibs_linkagectrl_rel ( id INT PRIMARY KEY AUTO_INCREMENT COMMENT '自增主键', ctrled_device_id INT COMMENT '受控设备id', ctrl_device_code VARCHAR(20) COMMENT '控制设备code' ) COMMENT='设备联动关联表'; INSERT INTO ibs_linkagectrl_rel(ctrled_device_id, ctrl_device_code) VALUES (303, 'WHG37020001A0001'); 根据客流密度消息返回的 ctrl_device_code 查询出 ctrled_device_id,再根据 ctrled_device_id 查询出受控设备,再查询出受控设备的 code。具体 sql:SELECT * FROM ibs_item_parameter WHERE item_id = '303' AND parameter_type = 'QTSZ'; 查询出:code = Channel131.Device0403.T008;关闭空调:value = 0,即 "CONFIG|Channel131.Device0403.T008|0|END"
注: 空调设备类型:DLJNJ 客流密度设备类型:humanDensity
代码实现:
MqttConfig.java
package com.wzw.config.mq;
import com.wzw.common.util.StringUtil;
import com.wzw.service.IbsAlarmRecordService;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
 * Spring Integration wiring for the MQTT broker: connection options, the client
 * factory, the outbound publisher and (when {@code props.linkagectrl.enabled=true})
 * the inbound adapter/handler pair used for people-density linkage control.
 *
 * <p>The whole configuration is active only when {@code spring.mqtt.enabled=true}.
 */
@Configuration
@ConditionalOnProperty(name = "spring.mqtt.enabled", havingValue = "true")
public class MqttConfig {

    public static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);
    public static final String PEOPLE_TOPIC = "topic/whg/peoplecount/publish";
    public static final String ACS_RECORD_TOPIC = "topic/whg/acs/record";

    /** Maximum number of TCP probes made against the broker before giving up. */
    private static final int MAX_PROBE_ATTEMPTS = 200;
    /** Connect timeout of a single probe, in milliseconds. */
    private static final int PROBE_CONNECT_TIMEOUT_MS = 100;
    /** Pause between two failed probes, in milliseconds. */
    private static final long PROBE_RETRY_DELAY_MS = 1000L;

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String[] hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    @Value("${spring.mqtt.completionTimeout:5000}")
    private int completionTimeout;

    // NOTE(review): Paho's MqttConnectOptions#setKeepAliveInterval is documented in
    // seconds; a default of 30000 looks like it was meant as milliseconds — confirm.
    @Value("${spring.mqtt.keepAliveInterval:30000}")
    private int keepAliveInterval;

    @Value("${props.linkagectrl.mqtt.humanDensityTopic}")
    private String humanDensityTopic;

    // The three topic fields below back getters that previously referenced undeclared
    // fields. controlMsgTopic / warnMsgTopic use the same keys and defaults as
    // MqttSenderTool.
    // NOTE(review): the key for branchMsgTopic is assumed by analogy — confirm.
    @Value("${spring.mqtt.branchMsgTopic:}")
    private String branchMsgTopic;

    @Value("${spring.mqtt.controlMsgTopic:/topic/controlMsg/smartLight}")
    private String controlMsgTopic;

    @Value("${spring.mqtt.warnMsgTopic:/topic/warnMsg/smartLight}")
    private String warnMsgTopic;

    /**
     * Builds the Paho connection options from the injected credentials, server URIs
     * and keep-alive interval, using MQTT 3.1.1.
     *
     * @return the connection options shared by the client factory
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setServerURIs(hostUrl);
        options.setKeepAliveInterval(keepAliveInterval);
        options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
        return options;
    }

    /**
     * Creates the Paho client factory. The broker is probed first so that start-up
     * waits for it; the probe result is deliberately not enforced (best effort).
     *
     * @return the client factory configured with {@link #getMqttConnectOptions()}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        checkMqttServer();
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /** @return the channel whose messages are published by {@link #mqttOutbound()} */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /** @return the channel carrying people-density messages to the linkage handler */
    @Bean
    @ConditionalOnProperty(name = "props.linkagectrl.enabled", havingValue = "true")
    public MessageChannel mqttInputChannelForLinkageCtrl() {
        return new DirectChannel();
    }

    /**
     * Probes the first configured broker URL with a plain TCP connect, retrying up to
     * {@link #MAX_PROBE_ATTEMPTS} times.
     *
     * @return {@code true} once a connection succeeds; {@code false} if the URL is
     *         missing or malformed, all attempts fail, or the thread is interrupted
     */
    private boolean checkMqttServer() {
        if (hostUrl == null || hostUrl.length <= 0 || hostUrl[0] == null || hostUrl[0].equals("")) {
            logger.error("mqtthostUrl is null or empty");
            return false;
        }
        // Expected shape is scheme://host:port, which splits into exactly three parts.
        String[] parts = hostUrl[0].split(":");
        if (parts.length < 3) {
            return false;
        }
        String host = parts[1].replaceAll("/", "");
        int port;
        try {
            port = Integer.parseInt(parts[2]);
        } catch (NumberFormatException e) {
            logger.error("mqtthostUrl has an invalid port: {}", hostUrl[0]);
            return false;
        }

        for (int attempt = 0; attempt < MAX_PROBE_ATTEMPTS; attempt++) {
            if (probe(host, port)) {
                logger.info("connect to artemis success!");
                return true;
            }
            try {
                Thread.sleep(PROBE_RETRY_DELAY_MS);
            } catch (InterruptedException e) {
                // Stop waiting and keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                logger.warn("interrupted while waiting for artemis");
                return false;
            }
        }
        return false;
    }

    /**
     * Performs one TCP connect attempt and always closes the socket afterwards.
     *
     * @param host broker host name or address
     * @param port broker TCP port
     * @return whether the socket connected within the probe timeout
     */
    private boolean probe(String host, int port) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), PROBE_CONNECT_TIMEOUT_MS);
            if (socket.isConnected()) {
                return true;
            }
            logger.error("connect to artemis failed!");
            return false;
        } catch (IOException e) {
            logger.error("connect to artemis failed! {}", e.getMessage());
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                logger.error("close connect to artemis failed! {}", e.getMessage());
            }
        }
    }

    /**
     * Outbound handler that publishes messages from {@code mqttOutboundChannel}
     * asynchronously, to the default topic with QoS 0 unless overridden per message.
     *
     * @return the outbound MQTT message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(clientId + "out_" + StringUtil.getUUID_8(), mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setDefaultQos(0);
        return messageHandler;
    }

    /**
     * Inbound adapter subscribed to {@code humanDensityTopic + "/+"} with QoS 2,
     * forwarding to {@link #mqttInputChannelForLinkageCtrl()}.
     *
     * @return the inbound adapter for people-density messages
     */
    @Bean
    @ConditionalOnProperty(name = "props.linkagectrl.enabled", havingValue = "true")
    public MessageProducer mqttInboundForLinkageCtrl() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + "LinkageCtrl_in_" + StringUtil.getUUID_8(),
                mqttClientFactory(),
                humanDensityTopic + "/+");
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannelForLinkageCtrl());
        return adapter;
    }

    /**
     * Handler consuming {@code mqttInputChannelForLinkageCtrl}; it is given the
     * configured people-density topic prefix.
     *
     * @return the linkage-control message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannelForLinkageCtrl")
    @ConditionalOnProperty(name = "props.linkagectrl.enabled", havingValue = "true")
    public MessageHandler mqttOutLinkageCtrl() {
        MqttLinkageCtrlMessageHandler linkageCtrlMessageHandler = new MqttLinkageCtrlMessageHandler();
        linkageCtrlMessageHandler.setHumanDensityTopic(this.humanDensityTopic);
        return linkageCtrlMessageHandler;
    }

    public String getBranchMsgTopic() {
        return branchMsgTopic;
    }

    public String getControlMsgTopic() {
        return controlMsgTopic;
    }

    public String getWarnMsgTopic() {
        return warnMsgTopic;
    }

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return password;
    }

    public String[] getHostUrl() {
        return hostUrl;
    }
}
application.properties
# Linkage control: people-density camera drives air-conditioner shutdown.
# props.linkagectrl.enabled gates the linkage beans (@ConditionalOnProperty).
props.linkagectrl.enabled=true
# NOTE(review): the consumer of this key is not shown here. The expiry sweep in
# MqttLinkageCtrlMessageHandler adds AutoCloseAirPo.getExpireTime() as SECONDS;
# confirm the unit/conversion of this value (600000 s would be about 7 days).
props.linkagectrl.air.expireTime=600000
# URL of the webSocket server that WebSocketConfig's client connects to.
props.linkagectrl.wsServerUrl=ws://127.0.0.1:8082/webSocket
# Topic prefix; MqttConfig subscribes to "<prefix>/+".
props.linkagectrl.mqtt.humanDensityTopic=topic/whg/areapeoplecount/publish
MqttLinkageCtrlMessageHandler.java
package com.wzw.config.mq;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.wzw.common.po.AutoCloseAirPo;
import com.wzw.common.po.AutoCloseAirPoFactory;
import com.wzw.service.IbsItemService;
import com.wzw.vo.IbsItemVO;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class MqttLinkageCtrlMessageHandler implements MessageHandler {
private static final String HUMAN_DENSITY_DEVICE_TYPE = "humanDensity";
private static Map<String, Object> cache = new ConcurrentHashMap<>();
private final Lock lock = new ReentrantLock();
private String humanDensityTopic;
@Autowired
private IbsItemService ibsItemService;
private ExecutorService executorService;
private ScheduledExecutorService scheduledExecutorService;
@Autowired
private AutoCloseAirPoFactory autoCloseAirPoFactory;
@Autowired
private WebSocketClient webSocketClient;
private volatile boolean isOpen = false;
private Timer timer ;
@PostConstruct
public void init() {
if (null == executorService) {
executorService = new ThreadPoolExecutor(5,5,60L,TimeUnit.SECONDS,new ArrayBlockingQueue<>(100),new ThreadPoolExecutor.AbortPolicy());
}
if (null == scheduledExecutorService) {
scheduledExecutorService = Executors.newScheduledThreadPool(5);
}
isOpen = webSocketClient.isOpen();
if (!isOpen){
reconnet();
}
while (isOpen){
closeAir();
break;
}
}
public synchronized void reconnet(){
if (webSocketClient!=null&&!webSocketClient.isOpen()){
timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (webSocketClient.getReadyState().equals(WebSocket.READYSTATE.NOT_YET_CONNECTED)){
try {
isOpen = webSocketClient.connectBlocking();
if (isOpen){
closeAir();
timer.cancel();
timer=null;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}else if (webSocketClient.getReadyState().equals(WebSocket.READYSTATE.CLOSING)||webSocketClient.getReadyState().equals(WebSocket.READYSTATE.CLOSED)){
try {
isOpen = webSocketClient.reconnectBlocking();
if (isOpen){
closeAir();
timer.cancel();
timer = null;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},0,1);
}
}
@PreDestroy
public void destroy() {
if (null != executorService) {
executorService = null;
}
if (null != scheduledExecutorService) {
scheduledExecutorService = null;
}
}
private void closeAir() {
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (cache.size() > 0) {
try {
lock.lock();
for (Map.Entry<String, Object> entry : cache.entrySet()) {
String key = entry.getKey();
AutoCloseAirPo airPo = (AutoCloseAirPo) entry.getValue();
LocalDateTime startTime = airPo.getStartTime();
LocalDateTime now = LocalDateTime.now();
if (startTime.plusSeconds(airPo.getExpireTime()).isBefore(now)) {
List<Map<Long, String>> closeCmdList = airPo.getCloseCmdList();
for (Map<Long, String> map : closeCmdList) {
for (Map.Entry entry1 : map.entrySet()) {
String closeCmd = (String) entry1.getValue();
webSocketClient.send(closeCmd);
log.info("-------[发送联动控制指令成功:"+closeCmd+"]");
cache.remove(key);
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}, 5, 10, TimeUnit.SECONDS);
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_topic").toString();
if (topic.startsWith(humanDensityTopic)) {
dealCtrlAirClose(message);
}
}
private void dealCtrlAirClose(Message<?> message) {
executorService.execute(new Runnable() {
@Override
public void run() {
JsonObject jsonObject = JsonParser.parseString(message.getPayload().toString()).getAsJsonObject();
String deviceCode = jsonObject.get("deviceCode").getAsString();
int insidePeopleNum = jsonObject.get("insidePeopleNum").getAsInt();
IbsItemVO ctrlDevice = ibsItemService.getIbsItemByCode(deviceCode);
if (null != ctrlDevice && HUMAN_DENSITY_DEVICE_TYPE.equals(ctrlDevice.getType())) {
if (insidePeopleNum == 0) {
AutoCloseAirPo oldAirPo = (AutoCloseAirPo) cache.get(deviceCode);
if (null == oldAirPo) {
AutoCloseAirPo autoCloseAirPo = autoCloseAirPoFactory.build(jsonObject, ctrlDevice);
cache.put(autoCloseAirPo.getCtrlDeviceCode(), autoCloseAirPo);
} else {
oldAirPo.setStartTime(LocalDateTime.now());
}
} else {
AutoCloseAirPo oldAirPo = (AutoCloseAirPo) cache.get(deviceCode);
if (null != oldAirPo) {
oldAirPo.setStartTime(LocalDateTime.now());
}
}
}
}
});
}
public String getHumanDensityTopic() {
return humanDensityTopic;
}
public void setHumanDensityTopic(String humanDensityTopic) {
this.humanDensityTopic = humanDensityTopic;
}
}
MqttSenderTool.java 发送消息到mqtt 工具类
package com.wzw.config.mq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
 * Convenience wrapper around {@code MqttGateway} for publishing control and
 * warning messages to their configured topics.
 */
@Component
public class MqttSenderTool {

    @Autowired
    private MqttGateway mqttGateway;

    @Value("${spring.mqtt.controlMsgTopic:/topic/controlMsg/smartLight}")
    private String controlMsgTopic;

    @Value("${spring.mqtt.warnMsgTopic:/topic/warnMsg/smartLight}")
    private String warnMsgTopic;

    /**
     * Publishes a control message with QoS 2.
     *
     * @param sendData the payload to publish
     * @return the literal {@code "OK"} once the gateway call has returned
     */
    public String sendControlMsg(String sendData) {
        return publish(controlMsgTopic, 2, sendData);
    }

    /**
     * Publishes a warning message with QoS 1.
     *
     * @param sendData the payload to publish
     * @return the literal {@code "OK"} once the gateway call has returned
     */
    public String sendWarnMsg(String sendData) {
        return publish(warnMsgTopic, 1, sendData);
    }

    /** Forwards one payload to the gateway and reports the fixed acknowledgement. */
    private String publish(String topic, int qos, String payload) {
        mqttGateway.sendToMqtt(topic, qos, payload);
        return "OK";
    }
}
SpringBoot 后台 实现WebSocketClient 配置 pom.xml 里
<!--websocket 后台客户端-->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.3.8</version>
</dependency>
WebSocketConfig.java
package com.wzw.config;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import java.net.URI;
@Slf4j
@Configuration
public class WebSocketConfig {
@Value("${props.linkagectrl.wsServerUrl}")
private String wsServerUrl;
@Profile({"dev", "test","dev2","local"})
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
@Bean
@ConditionalOnProperty(name = "props.linkagectrl.enabled",havingValue = "true")
public WebSocketClient webSocketClient() {
try {
WebSocketClient webSocketClient = new WebSocketClient(new URI(wsServerUrl),new Draft_6455()) {
@Override
public void onOpen(ServerHandshake handshakedata) {
log.info("[websocket] 连接成功");
}
@Override
public void onMessage(String message) {
log.info("[websocket] 收到消息={}",message);
}
@Override
public void onClose(int code, String reason, boolean remote) {
log.info("[websocket] 退出连接");
}
@Override
public void onError(Exception ex) {
log.info("[websocket] 连接错误={}",ex.getMessage());
}
};
webSocketClient.connect();
return webSocketClient;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
TestWebSocketClientController.java 测试类
package com.wzw.controller;
import com.wzw.config.annotation.IgnoreAuth;
import org.java_websocket.client.WebSocketClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
 * Test endpoint that pushes a fixed message through the backend webSocket client.
 * Only registered when {@code props.linkagectrl.enabled=true}.
 */
@RestController
@RequestMapping("/webSocketClient")
@ConditionalOnProperty(name = "props.linkagectrl.enabled", havingValue = "true")
public class TestWebSocketClientController {

    @Autowired
    private WebSocketClient webSocketClient;

    /**
     * Sends the test message if the client is connected.
     *
     * @return a success text, or a failure text when the client is not open
     */
    @RequestMapping("/send")
    @IgnoreAuth
    public String send() {
        // Sending on a client that is not open throws; report it instead.
        if (!webSocketClient.isOpen()) {
            return "发送订阅失败: webSocket 未连接";
        }
        webSocketClient.send("dechnicWebSocketClient 发送订阅信息成功!");
        return "发送订阅成功";
    }
}
SpringBoot webSocketServer 配置
package com.example.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.CopyOnWriteArrayList;
@Slf4j
@Component
@ServerEndpoint("/webSocket")
public class WebSocketServer {
private static CopyOnWriteArrayList<WebSocketServer> list = new CopyOnWriteArrayList<>();
@OnOpen
public void onOpen(Session session) {
log.info("【websocket消息推送模块】客户端已连接!sessionID={}",session.getId());
}
@OnClose
public void onClose(Session session) {
log.info("【websocket消息推送模块】断开连接---- sessionID={}",session.getId());
list.remove(this);
}
@OnError
public void onError(Throwable error){
log.info("ws连接出错:{}", error.getMessage());
error.printStackTrace();
}
@OnMessage
public void onMessage(Session session, String message) {
log.debug("WebSocketServer onMessage: sessionID={}, message={}, ", session.getId(), message);
}
}
测试:
|