■1.消息推送的逻辑结构设计思路
消息推送一般为某角色用户在页面上做申请操作,然后通过消息推送提醒下一个任务节点用户对上述申请数据做出处理。这里我们把消息推送分为,消息生成和消息推送2个部分。消息生成后由于我们不知道被推送者当前是否在线,所以产生的消息可不直接推送,而是将消息内容放入数据库的消息推送表中。然后,通过定时任务去不断获取需要推送的消息,判断被推送者是否在线如果在线就将消息推送出去,并更改消息状态为已推送。如果被推送者当前不在线,那么就不改变当前消息的推送状态。
■2.代码的简单实现
SpringBoot+SpringTask+WebSocket+LayUI
为了代码简单起见,我们这里就不连接数据库,而是在代码中随机生成消息模拟消息来着于数据库表。另外,定时任务我们这里采用比较简单的SpringTask实现。
websoket设置部分代码
WebSocketConfig.java
package com.websendmsg.webSocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
WebSocketServerEndpoint.java
这里和客户端连接的地址为"/ws/message/{userId}"
package com.websendmsg.webSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* ServerEndpoint
* <p>
* 使用springboot的唯一区别是要@Component声明下,而使用独立容器是由容器自己管理websocket的,但在springboot中连容器都是spring管理的。
* <p>
* 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
*/
@ServerEndpoint("/ws/message/{userId}") //WebSocket客户端建立连接的地址
@Component
public class WebSocketServerEndpoint {
private final static Logger log = LoggerFactory.getLogger(WebSocketServerEndpoint.class);
/**
* 存活的session集合(使用线程安全的map保存)
*/
private static Map<String, Session> livingSessions = new ConcurrentHashMap<>();
/**
* 未发送出的消息
*/
public static Map<String,Map<String,String>> unSensMessages = new ConcurrentHashMap<>();
/**
* 建立连接的回调方法
*
* @param session 与客户端的WebSocket连接会话
* @param userId 用户名,WebSocket支持路径参数
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
// 建立连接时 保存当前登陆人到已登录集合中
livingSessions.put(userId, session);
// 判断当前登陆人是否有未读的消息 有则发送
Iterator<Map.Entry<String, Map<String, String>>> iterator = unSensMessages.entrySet().iterator();
while (iterator.hasNext()){
// 迭代器迭代每一个事件 key: 事件id, value: 用户的消息
Map.Entry<String, Map<String, String>> next = iterator.next();
// 消息 key: userId, value: message
Map<String, String> nextValue = next.getValue();
Iterator<Map.Entry<String, String>> iterator1 = nextValue.entrySet().iterator();
while (iterator1.hasNext()){
// 迭代每一个消息
Map.Entry<String, String> next1 = iterator1.next();
// 判断登陆人是否有消息
if(next1.getKey().equals(userId)){
//发送
sendMessage(session,next1.getValue());
//移除已发送的消息
iterator1.remove();
}
}
// 判断该事件是否为空了,(该事件消息已全部发送)
if(next.getValue().size() == 0){
// 当前事件已全部发送 移除该事件
iterator.remove();
}
}
log.info(userId + "进入连接★★★★★★★★★★★★★★★★★★");
}
@OnMessage
public void onMessage(String message, Session session, @PathParam("userId") String userId) {
log.info(userId + " : " + message);
//sendMessageToAll(userId + " : " + message);
}
@OnError
public void onError(Session session, Throwable error) {
log.info("发生错误");
log.error(error.getStackTrace() + "");
}
@OnClose
public void onClose(Session session, @PathParam("userId") String userId) {
livingSessions.remove(userId);
log.info(userId + " ■■■■■■■■■■■■■■■■■■■关闭连接");
}
/**
* 单独发送消息
*
* @param session
* @param message
*/
public void sendMessage(Session session, String message) {
try {
session.getBasicRemote().sendText(message.replace('\"', '\''));
} catch (IOException e) {
e.printStackTrace();
}
}
public void sendMsg(String userId ,String msg){
// 判断user是否登录状态
if(livingSessions.get(userId) != null){
System.out.println("发送消息");
//当前user已登录,发送消息
sendMessage(livingSessions.get(userId), msg);
}
}
// /**
// * 群发消息
// * userIds 是需要接收消息的用户id集合 可单发,可群发
// *
// * @param message
// */
// public void sendMessageToAll(String message) {
// // 将json字符串转为message类
// MessageVo messageVo = JSONObject.parseObject(message, MessageVo.class);
//
// // 需要发送的人可能未上线 将未发送的消息保存到未发送集合中
// Map<String,String> unSendsUsers = new ConcurrentHashMap<>();
// // 遍历需要发送到的人
// for (Integer userId : messageVo.getUserIds()) {
// // 当前已登录的人
// if(livingSessions.get(String.valueOf(userId)) != null){
// //当前user已登录,发送消息
// sendMessage(livingSessions.get(String.valueOf(userId)), message);
// }else{
// // 当前user未登录,保存到集合中
// unSendsUsers.put(String.valueOf(userId),message);
// }
// };
// // 这些消息属于同一个事件,放入集合中
// unSensMessages.put(messageVo.getBussinessKey(),unSendsUsers);
// }
}
消息实体类
MsgEntity.java
package com.websendmsg.entity;
import java.util.List;
public class MsgEntity {
//消息的唯一标识
private String bussinessKey;
//消息内容
private String message;
//可以收到的用户id
private List<Integer> userIds;
// 自定义事件类型: todo 待办,taskPool 任务池, evaluate 评价
private String type;
// 字符串类型的消息接收时间
private String date;
// 事由/原因
private String reason;
// 状态 : 默认 log, 成功 success,警告 warn,危险 error
private String status;
public String getBussinessKey() {
return bussinessKey;
}
public void setBussinessKey(String bussinessKey) {
this.bussinessKey = bussinessKey;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public List<Integer> getUserIds() {
return userIds;
}
public void setUserIds(List<Integer> userIds) {
this.userIds = userIds;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getReason() {
return reason;
}
public void setReason(String reason) {
this.reason = reason;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}
测试用的controller类
IndexController.java
package com.websendmsg.controller;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
public class IndexController {
@RequestMapping("/index")
public String index(Model model){
// 假设用户1登陆
model.addAttribute("userId","1");
return "index";
}
@RequestMapping("/index2")
public String index2(Model model){
// 假设用户2登陆
model.addAttribute("userId","2");
return "index";
}
}
前端代码
index.html
连接服务端代码[websocket = new WebSocket("ws://localhost:8080/ws/message/"+userId);]
这里我们通过userId来定位消息推送时发给谁。所以我们在登录的时候要将登录者的userId传给页面。另外正式项目中消息推送部分代码应放在共通页面中然后在个页面中引入,而且userId作为敏感信息也需要做脱敏加密处理。※注释:一旦我们的访问的页面中没有上述连接服务端代码,即表示和服务端切断联系,就不能在推送消息。这也是需要把推送消息部分代码放入共通页面的原因。
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>message</title>
<link rel="stylesheet" href="./layui/css/layui.css">
<link href="./css/message/bootstrap-grid.min.css" rel="stylesheet"/>
<link href="./css/message/naranja.min.css" rel="stylesheet"/>
</head>
<body class="layui-layout-body">
<div class="layui-layout layui-layout-admin">
<div class="layui-header">
<div class="layui-logo">layui 后台布局</div>
<!-- 头部区域(可配合layui已有的水平导航) -->
<ul class="layui-nav layui-layout-left">
<li class="layui-nav-item"><a href="">控制台</a></li>
<li class="layui-nav-item"><a href="">商品管理</a></li>
<li class="layui-nav-item"><a href="">用户</a></li>
<li class="layui-nav-item">
<a href="javascript:;">其它系统</a>
<dl class="layui-nav-child">
<dd><a href="">邮件管理</a></dd>
<dd><a href="">消息管理</a></dd>
<dd><a href="">授权管理</a></dd>
</dl>
</li>
</ul>
<ul class="layui-nav layui-layout-right">
<li class="layui-nav-item">
<a href="javascript:;">
<img src="http://t.cn/RCzsdCq" class="layui-nav-img">
贤心
</a>
<dl class="layui-nav-child">
<dd><a href="">基本资料</a></dd>
<dd><a href="">安全设置</a></dd>
</dl>
</li>
<li class="layui-nav-item"><a href="">退了</a></li>
</ul>
</div>
<div class="layui-side layui-bg-black">
<div class="layui-side-scroll">
<!-- 左侧导航区域(可配合layui已有的垂直导航) -->
<ul class="layui-nav layui-nav-tree" lay-filter="test">
<li class="layui-nav-item layui-nav-itemed">
<a class="" href="javascript:;">所有商品</a>
<dl class="layui-nav-child">
<dd><a href="javascript:;">列表一</a></dd>
<dd><a href="javascript:;">列表二</a></dd>
<dd><a href="javascript:;">列表三</a></dd>
<dd><a href="">超链接</a></dd>
</dl>
</li>
<li class="layui-nav-item">
<a href="javascript:;">解决方案</a>
<dl class="layui-nav-child">
<dd><a href="javascript:;">列表一</a></dd>
<dd><a href="javascript:;">列表二</a></dd>
<dd><a href="">超链接</a></dd>
</dl>
</li>
<li class="layui-nav-item"><a href="">云市场</a></li>
<li class="layui-nav-item"><a href="">发布商品</a></li>
</ul>
</div>
</div>
<div class="layui-body">
<input type="hidden" id="userId" name="userId" th:value="${userId}" />
<!-- 内容主体区域 -->
<div style="padding: 15px;">
<fieldset class="layui-elem-field site-demo-button" style="margin-top: 30px; padding: 30px;">
<legend>模拟发送消息</legend>
<div>
<button class="layui-btn layui-btn-primary" id="sendOne">发送一条消息</button>
<button class="layui-btn">默认按钮</button>
<button class="layui-btn layui-btn-normal">百搭按钮</button>
</div>
</fieldset>
</div>
</div>
<div class="layui-footer">
<!-- 底部固定区域 -->
? layui.com - 底部固定区域
</div>
</div>
</body>
<script src="./jquery/jquery.min.js"></script>
<script src="./layui/layui.js"></script>
<script src="./js/message/naranja.js"></script>
<script>
layui.use(['layer',"jquery"], function(){
var layer = layui.layer;
$ = layui.jquery;
$("#sendOne").click(function () {
$.ajax({
url:"/hello",
type:"post",
success: function (data) {
if(data.code === 200){
console.log(data.message);
}
}
})
});
var websocket = null;
var userId = $("#userId").val();
//判断当前浏览器是否支持WebSocket
if ('WebSocket' in window) {
// 连接服务端
websocket = new WebSocket("ws://localhost:8080/ws/message/"+userId);
}
else {
alert('当前浏览器 不支持WebSocket')
}
//连接发生错误的回调方法
websocket.onerror = function () {
//setMessageInnerHTML("连接发生错误");
console.log("webSocket 连接发生错误")
};
//连接成功建立的回调方法
websocket.onopen = function () {
console.log("webSocket 连接成功")
};
//接收到消息的回调方法,此处添加处理接收消息方法,当前是将接收到的信息显示在网页上
websocket.onmessage = function (event) {
setMessageInnerHTML(event.data);
};
//连接关闭的回调方法
websocket.onclose = function () {
console.log("webSocket 连接关闭,如需登录请刷新页面。")
};
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function () {
websocket.close();
};
//将消息显示在网页上,如果不需要显示在网页上,则不调用该方法
function setMessageInnerHTML(result) {
var message = JSON.parse(result.replace(/'/g,'\"'));
// 消息不为空
if(message !== undefined && message !== null){
// 播放通知
//$("#chatAudio").get(0).play();
// 消息状态 log(默认), success(成功), warn(警告), error(危险)
var status = message.status;
naranja()[status]({
title: '新消息',
text: "<div class='goto' m-type='"+message.type+"' m-key='"+message.bussinessKey+"'><a href='javascript:void(0)'>"+message.message+"</a></div>",
timeout: 'keep',
buttons: [{
text: '我知道了',
click: function (e) {
e.closeNotification();
}
}]
})
}
}
/**
* 链接点击事件
*/
$(document).on("click",".goto",function () {
// 消息类型
var type = $(this).attr("m-type");
// 消息key
var key = $(this).attr("m-key");
// 点击之后关闭当前消息框
naranja().closeThisNotification(this);
// 根据类型去相应页面
if(type === "todo"){
// 根据 type请求不同的页面,key可作为参数携带
}
});
});
</script>
</html>
|