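1. Preface
- The mainstream domestic middleware products (Kingdee, TongTech, CVIC SE) currently support the WebSocket service provided by Spring poorly. While the application is running, WebSocket connections die to varying degrees, so WebSocket features stop working. In severe cases the middleware itself hangs (or crashes), and the whole application becomes unreachable.
- To solve this kind of problem, we need a WebSocket component that is more robust and fits our application better. org.java_websocket is one such option. The rest of this article describes how to use the org.java_websocket component as a replacement for implementing chat and message notifications.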
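2. java_websocket solution diagram
- java_websocket: creates the WebSocket server and maintains the onOpen/onClose/onMessage/onError callbacks. Corresponds to the JavaWebSocketServer and WebSocketConfig classes.
- Centralized ws handling and business dispatch: handles the onOpen/onClose/onMessage/onError callbacks in one place and classifies WebSocket traffic by business type. Implements the WebSocketService interface.
- Chat business handling: session management and business logic for chat WebSockets. Implements the WebSocketBusinessService interface.
- Message notification business handling: session management and business logic for message notification WebSockets. Implements the WebSocketBusinessService interface.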
3. Using the java_websocket component
3.1 Maven
<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.3.7</version>
</dependency>
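3.2 JavaWebSocketServer implementation
JavaWebSocketServer must extend the WebSocketServer class of the java_websocket component, as shown in the code below.
Notes:
- The code must hand each received WebSocket event off to a worker thread for processing; otherwise the WebSocket service can easily stall and die on its own.
- It calls the WebSocketService interface so that all WebSocket events are handled in one place.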
package com.lylp.sys.service;

import com.lylp.common.utils.SpringContextUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class JavaWebSocketServer extends WebSocketServer {

    private static Logger logger = LoggerFactory.getLogger(JavaWebSocketServer.class);

    // Worker pool: every callback is handed off here so the server's own threads are not blocked.
    private ExecutorService executorService = new ThreadPoolExecutor(50, 200, 30, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            new BasicThreadFactory.Builder().namingPattern("thread-pool-%d").daemon(true).build());

    public JavaWebSocketServer(Integer port) {
        super(new InetSocketAddress(port));
    }

    @Override
    public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
        // The resource descriptor is the request URL (path and query) sent by the client.
        executorService.execute(() -> {
            WebSocketService bean = SpringContextUtils.getBean(WebSocketService.class);
            bean.onOpen(webSocket, clientHandshake.getResourceDescriptor());
        });
        logger.debug("JavaWebSocketServer online " + this.connections().size());
    }

    @Override
    public void onClose(WebSocket webSocket, int i, String s, boolean b) {
        if (webSocket == null) {
            return;
        }
        executorService.execute(() -> {
            WebSocketService bean = SpringContextUtils.getBean(WebSocketService.class);
            bean.onClose(webSocket, i, s, b);
        });
    }

    @Override
    public void onMessage(WebSocket webSocket, String s) {
        executorService.execute(() -> {
            WebSocketService bean = SpringContextUtils.getBean(WebSocketService.class);
            bean.onMessage(webSocket, s);
        });
    }

    @Override
    public void onError(WebSocket webSocket, Exception e) {
        // A null webSocket means the error is not tied to a specific connection.
        if (webSocket == null) {
            logger.info("JavaWebSocketServer on error. websocket is null.");
            logger.error(e.getMessage(), e);
            return;
        }
        executorService.execute(() -> {
            WebSocketService bean = SpringContextUtils.getBean(WebSocketService.class);
            bean.onError(webSocket, e);
        });
    }

    @Override
    public void onStart() {
        logger.info("JavaWebSocketServer start");
    }
}
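The pool above is created with a core size of 50, a maximum size of 200, and an unbounded LinkedBlockingQueue. With java.util.concurrent.ThreadPoolExecutor, threads beyond the core size are only started when the queue refuses a task, so with an unbounded queue the pool stays at its core size. The sketch below illustrates this with deliberately small numbers; the class name PoolSizingDemo and the sizes 2/4 are illustrative and not part of the original project.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingDemo {

    /** Submits `tasks` blocking jobs and returns how many threads the pool has started. */
    static int poolSizeAfterSubmitting(BlockingQueue<Runnable> queue, int tasks) {
        // core = 2, max = 4 (small stand-ins for the 50/200 used in the article)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until the measurement is done
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: extra tasks wait in the queue, the pool never grows past core size.
        System.out.println(poolSizeAfterSubmitting(new LinkedBlockingQueue<>(), 6)); // prints 2
        // Bounded queue of 2: once the queue is full, threads are added up to the maximum.
        System.out.println(poolSizeAfterSubmitting(new ArrayBlockingQueue<>(2), 6)); // prints 4
    }
}
```

If the intent is for the pool to grow under load, a bounded queue (together with a rejection policy that suits the application) is one way to get there; whether that is appropriate depends on the expected message volume.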
3.3 WebSocketConfig class
WebSocketConfig configures the WebSocket port and starts the WebSocket server. The code is as follows.
Note: the java_websocket component needs a port of its own.
package com.lylp.sys.config;

import com.lylp.common.exception.LYLPException;
import com.lylp.sys.service.JavaWebSocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WebSocketConfig {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${core.configuration.websocket.port}")
    private Integer websocketPort;

    @Bean
    public JavaWebSocketServer javaWebSocketServer() {
        if (websocketPort == null) {
            logger.error("javaWebSocketServer start failed, reason: core.configuration.websocket.port is null");
            throw new LYLPException("javaWebSocketServer start failed");
        }
        JavaWebSocketServer javaWebSocketServer = new JavaWebSocketServer(websocketPort);
        javaWebSocketServer.start();
        return javaWebSocketServer;
    }
}
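4. Centralized ws handling and business dispatch
The WebSocketService interface handles WebSocket events in one place, including classifying WebSocket messages, processing the token, and looking up the current user's information.
Note: java_websocket classifies business types by URL.
The interface code is as follows: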
package com.lylp.sys.service;

import org.java_websocket.WebSocket;

public interface WebSocketService {

    void onOpen(WebSocket webSocket, String url);

    void onClose(WebSocket webSocket, int i, String message, boolean b);

    void onMessage(WebSocket webSocket, String message);

    void onError(WebSocket webSocket, Exception e);
}
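The implementation code is as follows:
Notes:
- The WebSocket class in java_websocket has an attachment property that can store attached data; here it stores the user information. The attachment stays with the WebSocket for its whole lifecycle. For example, if it is set in onOpen, the WebSocket parameter passed to onMessage/onClose/onError carries the same attachment.
- In onOpen/onMessage/onClose/onError, the business service matching the parsed WebSocket type (that is, the corresponding WebSocketBusinessService implementation) is looked up automatically and handles the event.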
package com.lylp.sys.service.impl;

import com.lylp.common.exception.LYLPException;
import com.lylp.common.utils.HttpUtils;
import com.lylp.common.utils.ResponseResult;
import com.lylp.common.utils.SpringContextUtils;
import com.lylp.sys.dao.dto.SysUserAccountDto;
import com.lylp.sys.dao.dto.WsUserDto;
import com.lylp.sys.service.WebSocketBusinessService;
import com.lylp.sys.service.WebSocketService;
import org.apache.commons.lang.StringUtils;
import org.java_websocket.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class WebSocketServiceImpl implements WebSocketService {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${server.servlet.context-path}")
    private String contextPath;

    @Override
    public void onOpen(WebSocket webSocket, String url) {
        if (StringUtils.isBlank(url)) {
            webSocket.close();
            logger.error("Invalid websocket connection: url is blank. Connection closed");
            return;
        }
        logger.debug("WebSocket onOpen url:" + url);
        // Strip the context path and slashes, then split "<wsType>?token=<token>" into its two parts.
        String base = StringUtils.isBlank(contextPath) ? "/" : contextPath + "/";
        url = url.replace(base, "");
        url = url.replace("/", "");
        url = url.replace("?", " ");
        String[] split = url.split(" token=");
        if (split.length != 2 || StringUtils.isBlank(split[0]) || StringUtils.isBlank(split[1])) {
            webSocket.close();
            logger.error("Invalid websocket connection: no token information. Connection closed");
            return;
        }
        WsUserDto dto = new WsUserDto();
        dto.setWsType(split[0]);
        dto.setToken(split[1]);
        // The business type doubles as the Spring bean name of the handling service.
        WebSocketBusinessService service;
        try {
            service = (WebSocketBusinessService) SpringContextUtils.getBean(dto.getWsType());
        } catch (Exception e) {
            webSocket.close();
            logger.error(dto.getWsType() + " bean not found. Connection closed", e);
            return;
        }
        ResponseResult responseResult = HttpUtils.getAccountInfo(dto.getToken());
        if (responseResult == null || responseResult.getCode() != 0) {
            webSocket.close();
            logger.error("[" + dto.getToken() + "] token is invalid. Connection closed");
            return;
        }
        SysUserAccountDto accountDto = (SysUserAccountDto) responseResult.getData();
        dto.setUserId(accountDto.getUserId());
        dto.setUserName(accountDto.getUserName());
        dto.setOrgId(accountDto.getOrgId());
        dto.setOrgName(accountDto.getOrgName());
        // Store the user information on the connection so later callbacks can read it.
        webSocket.setAttachment(dto);
        service.addSession(webSocket);
    }

    @Override
    public void onClose(WebSocket webSocket, int i, String message, boolean b) {
        WsUserDto dto = webSocket.getAttachment();
        if (dto == null) {
            return;
        }
        logger.debug("WebSocket on close:" + dto.getWsType() + "-" + dto.getUserName());
        WebSocketBusinessService service = (WebSocketBusinessService) SpringContextUtils.getBean(dto.getWsType());
        service.removeSession(webSocket);
    }

    @Override
    public void onMessage(WebSocket webSocket, String message) {
        WsUserDto dto = webSocket.getAttachment();
        if (dto == null) {
            return;
        }
        logger.debug("WebSocket on message:" + dto.getWsType() + "-" + dto.getUserName());
        WebSocketBusinessService service = (WebSocketBusinessService) SpringContextUtils.getBean(dto.getWsType());
        service.onMessage(webSocket, message);
    }

    @Override
    public void onError(WebSocket webSocket, Exception e) {
        WsUserDto dto = webSocket.getAttachment();
        if (dto == null) {
            return;
        }
        logger.debug("WebSocket on error:" + dto.getWsType() + "-" + dto.getUserName());
        WebSocketBusinessService service = (WebSocketBusinessService) SpringContextUtils.getBean(dto.getWsType());
        service.onError(webSocket, e);
    }
}
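The onOpen method above extracts the business type and the token from the request URL with a series of string replacements. As an alternative sketch, java.net.URI can separate the path from the query before the two values are read. The class name WsUrlParser, the nested Target type, and the sample URLs are hypothetical. This variant is also stricter than the original: it rejects URLs that do not start with the context path or that contain extra path segments.

```java
import java.net.URI;
import java.util.Optional;

class WsUrlParser {

    /** Holds the two values onOpen needs: the business type and the token. */
    static final class Target {
        final String wsType;
        final String token;

        Target(String wsType, String token) {
            this.wsType = wsType;
            this.token = token;
        }
    }

    /** Parses "<contextPath>/<wsType>?token=<token>"; returns empty for anything else. */
    static Optional<Target> parse(String contextPath, String resource) {
        if (resource == null || resource.trim().isEmpty()) {
            return Optional.empty();
        }
        URI uri;
        try {
            uri = URI.create(resource);
        } catch (IllegalArgumentException e) {
            return Optional.empty(); // not a syntactically valid URI
        }
        String path = uri.getPath() == null ? "" : uri.getPath();
        String prefix = (contextPath == null ? "" : contextPath) + "/";
        if (!path.startsWith(prefix)) {
            return Optional.empty();
        }
        String wsType = path.substring(prefix.length());
        String query = uri.getRawQuery(); // raw form, so the token is passed on unchanged
        if (wsType.isEmpty() || wsType.contains("/") || query == null) {
            return Optional.empty();
        }
        for (String pair : query.split("&")) {
            if (pair.startsWith("token=") && pair.length() > "token=".length()) {
                return Optional.of(new Target(wsType, pair.substring("token=".length())));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Target t = parse("/app", "/app/webSocketIm?token=abc").get();
        System.out.println(t.wsType + " " + t.token); // prints "webSocketIm abc"
        System.out.println(parse("/app", "/app/webSocketIm").isPresent()); // prints "false"
    }
}
```

With this shape, a client would connect to a URL such as ws://host:port/app/webSocketIm?token=..., where host, port, and /app are placeholders for the actual deployment.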
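5. Business handling interface
Of course, depending on business needs, the business logic can also be handled in one place instead of being split by category.
WebSocketBusinessService defines the business handling interface for the different business scenarios. The code is as follows: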
package com.lylp.sys.service;

import com.lylp.sys.dao.dto.WsUserDto;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.java_websocket.WebSocket;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;

public interface WebSocketBusinessService {

    Logger LOGGER = LoggerFactory.getLogger(WebSocketBusinessService.class);

    void addSession(WebSocket webSocket);

    void removeSession(WebSocket webSocket);

    void onMessage(WebSocket webSocket, String message);

    void onError(WebSocket webSocket, Exception e);

    // Sends to the single session registered for the given user, if any.
    default boolean sendMessageByUserId(ConcurrentHashMap<String, WebSocket> sessions, String userId, String message) {
        if (StringUtils.isEmpty(userId) || message == null) {
            return false;
        }
        if (sessions.containsKey(userId)) {
            return this.sendMessage(sessions.get(userId), message);
        }
        return false;
    }

    // Sends to every session whose attachment belongs to the given organization.
    default boolean sendMessageByOrgId(Collection<WebSocket> webSockets, String orgId, String message) {
        if (CollectionUtils.isEmpty(webSockets) || StringUtils.isEmpty(orgId) || message == null) {
            return false;
        }
        for (WebSocket webSocket : webSockets) {
            if (webSocket == null) {
                continue;
            }
            WsUserDto dto = webSocket.getAttachment();
            if (orgId.equals(dto.getOrgId())) {
                this.sendMessage(webSocket, message);
            }
        }
        return true;
    }

    // Broadcasts to all given sessions.
    default boolean sendMessage(Collection<WebSocket> webSockets, String message) {
        if (CollectionUtils.isEmpty(webSockets) || message == null) {
            return false;
        }
        for (WebSocket webSocket : webSockets) {
            sendMessage(webSocket, message);
        }
        return true;
    }

    // Sends to one session; returns false if the session is disabled, closed, or the send fails.
    default boolean sendMessage(WebSocket webSocket, String message) {
        if (webSocket == null || message == null) {
            return false;
        }
        WsUserDto dto = webSocket.getAttachment();
        if (!BooleanUtils.isTrue(dto.getEnabled())) {
            return false;
        }
        if (webSocket.isOpen()) {
            try {
                webSocket.send(message);
                return true;
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
                return false;
            }
        }
        return false;
    }

    void sendHeartbeat();
}
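The interface above puts the shared send logic into default methods, so each business implementation only has to manage its own sessions. The pattern can be tried out without the WebSocket library, as in the following sketch. FakeConn is a hypothetical stand-in for org.java_websocket.WebSocket together with its WsUserDto attachment, and Broadcaster is a hypothetical reduced version of WebSocketBusinessService. Unlike the original sendMessageByOrgId, this variant returns the number of connections actually reached.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for a WebSocket connection plus its user attachment. */
class FakeConn {
    final String orgId;
    final boolean open;
    final List<String> sent = new ArrayList<>();

    FakeConn(String orgId, boolean open) {
        this.orgId = orgId;
        this.open = open;
    }
}

/** Mirrors the shape of WebSocketBusinessService: shared send logic lives in default methods. */
interface Broadcaster {

    /** Sends to one connection; returns false instead of throwing when it cannot deliver. */
    default boolean send(FakeConn conn, String message) {
        if (conn == null || message == null || !conn.open) {
            return false;
        }
        conn.sent.add(message);
        return true;
    }

    /** Sends to every connection of one organization and reports how many were reached. */
    default int sendToOrg(Collection<FakeConn> conns, String orgId, String message) {
        int delivered = 0;
        if (conns == null || orgId == null) {
            return delivered;
        }
        for (FakeConn conn : conns) {
            if (conn != null && orgId.equals(conn.orgId) && send(conn, message)) {
                delivered++;
            }
        }
        return delivered;
    }
}

class BroadcasterDemo {
    public static void main(String[] args) {
        Broadcaster chat = new Broadcaster() { }; // an implementation inherits both defaults
        List<FakeConn> conns = Arrays.asList(
                new FakeConn("org1", true),
                new FakeConn("org2", true),
                new FakeConn("org1", false)); // same organization, but already closed
        System.out.println(chat.sendToOrg(conns, "org1", "hello")); // prints 1
    }
}
```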
6. Chat business handling
The chat business handler is one of the implementation classes of the WebSocketBusinessService interface. The code is as follows:
package com.lylp.sys.service.impl;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.lylp.common.utils.CommUtil;
import com.lylp.common.utils.DateUtils;
import com.lylp.sys.dao.dto.WsUserDto;
import com.lylp.sys.service.WebSocketBusinessService;
import com.lylp.sys.dao.dto.WebSocketImDto;
import com.lylp.sys.dao.entity.ImMessage;
import com.lylp.sys.service.IImMessageService;
import org.apache.commons.lang3.BooleanUtils;
import org.java_websocket.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@Service("webSocketIm")
public class WebSocketImServiceImpl implements WebSocketBusinessService {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    // One session per user, keyed by user id.
    private static ConcurrentHashMap<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();

    @Autowired
    private IImMessageService iImMessageService;

    @Override
    public void addSession(WebSocket webSocket) {
        WsUserDto dto = webSocket.getAttachment();
        // A new connection for the same user disables and closes the previous one.
        if (webSocketSet.containsKey(dto.getUserId())) {
            WebSocket old = webSocketSet.get(dto.getUserId());
            if (old != null) {
                WsUserDto oldDto = old.getAttachment();
                oldDto.setEnabled(false);
                if (old.isOpen()) {
                    old.close();
                }
            }
        }
        webSocketSet.put(dto.getUserId(), webSocket);
        logger.debug("webSocketIm online " + webSocketSet.size());
        sendOnlineOrgMessage();
    }

    @Override
    public void removeSession(WebSocket webSocket) {
        WsUserDto userDto = webSocket.getAttachment();
        if (!BooleanUtils.isTrue(userDto.getEnabled())) {
            return;
        }
        WebSocketImDto dto = new WebSocketImDto();
        dto.setObjuid(userDto.getOrgId());
        dto.setSendOrgId(userDto.getOrgId());
        dto.setSendOrgName(userDto.getOrgName());
        String key = userDto.getUserId();
        // Remove the entry only if it still refers to this session.
        if (webSocketSet.containsKey(key)) {
            WebSocket old = webSocketSet.get(key);
            if (old == null) {
                webSocketSet.remove(key);
            } else if (userDto.getSessionId().equals(((WsUserDto) old.getAttachment()).getSessionId())) {
                webSocketSet.remove(key);
            }
        }
        // If the organization has no session left, broadcast the updated online list.
        if (!isOnline(dto)) {
            sendOnlineOrgMessage();
        }
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void onMessage(WebSocket webSocket, String message) {
        WebSocketImDto dto = JSON.parseObject(message, WebSocketImDto.class);
        if (dto.getType().equals("3")) {
            // Type 3: mark the message as read and notify both organizations.
            ImMessage msg = new ImMessage();
            msg.setRead(true);
            EntityWrapper<ImMessage> wrapper = new EntityWrapper<>();
            wrapper.eq("objuid", dto.getObjuid());
            iImMessageService.update(msg, wrapper);
            String msgInfo = JSON.toJSONString(dto);
            sendInfoOrg(msgInfo, dto.getReceiveOrgId(), dto.getSendOrgId());
        }
        else if (dto.getType().equals("2")) {
            // Type 2: broadcast the list of online organizations.
            sendOnlineOrgMessage();
        }
        else if (dto.getType().equals("1")) {
            // Type 1: persist the chat message, then forward it to both organizations.
            WsUserDto userDto = webSocket.getAttachment();
            ImMessage im = new ImMessage();
            im.setSend(true);
            im.setRead(false);
            im.setSuccess(true);
            im.setSendOrgId(userDto.getOrgId());
            im.setSendUserId(userDto.getUserId());
            im.setCreateTime(DateUtils.getNowTimeDate());
            im.setObjuid(CommUtil.getUUID());
            im.setReceiveOrgId(dto.getReceiveOrgId());
            im.setContent(dto.getContent());
            im.setSendOrgName(userDto.getOrgName());
            im.setReceiveOrgName(dto.getReceiveOrgName());
            im.setSendUserName(userDto.getUserName());
            iImMessageService.insert(im);
            BeanUtils.copyProperties(im, dto);
            String orgId = im.getReceiveOrgId();
            String msg = JSON.toJSONString(dto);
            sendInfoOrg(msg, orgId, dto.getSendOrgId());
        }
    }

    @Override
    public void onError(WebSocket webSocket, Exception e) {
        logger.error(e.getMessage(), e);
        if (webSocket.isOpen()) {
            webSocket.close();
        } else {
            removeSession(webSocket);
        }
    }

    @Override
    public void sendHeartbeat() {
        logger.debug("Instant messaging: sending websocket heartbeat, online count: " + webSocketSet.size());
        this.sendMessage(webSocketSet.values(), "chat heartbeat");
    }

    // True if any registered session belongs to the organization in dto.getObjuid().
    private boolean isOnline(WebSocketImDto dto) {
        for (WebSocket webSocket : webSocketSet.values()) {
            if (webSocket == null) {
                continue;
            }
            WsUserDto userDto = webSocket.getAttachment();
            if (userDto.getOrgId().equals(dto.getObjuid())) {
                return true;
            }
        }
        return false;
    }

    // Sends the message to every session of the receiving and the sending organization.
    private void sendInfoOrg(String message, String orgId, String sendOrgId) {
        for (WebSocket webSocket : webSocketSet.values()) {
            if (webSocket == null) {
                continue;
            }
            WsUserDto userDto = webSocket.getAttachment();
            if (userDto.getOrgId().equals(orgId) || userDto.getOrgId().equals(sendOrgId)) {
                sendMessage(webSocket, message);
            }
        }
    }

    private void sendOnlineOrgMessage() {
        Map<String, Object> map = new HashMap<>(8);
        Set<WebSocketImDto> set = getOnlineOrg();
        map.put("type", "2");
        map.put("date", set);
        sendMessage(webSocketSet.values(), JSON.toJSONString(map));
    }

    private Set<WebSocketImDto> getOnlineOrg() {
        Set<WebSocketImDto> set = new HashSet<>();
        for (WebSocket webSocket : webSocketSet.values()) {
            if (webSocket == null) {
                continue;
            }
            WsUserDto userDto = webSocket.getAttachment();
            WebSocketImDto org = new WebSocketImDto();
            org.setObjuid(userDto.getOrgId());
            org.setOrgName(userDto.getOrgName());
            set.add(org);
        }
        return set;
    }
}
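addSession and removeSession above keep one session per user by combining containsKey, get, put, and remove calls. Each of those calls is thread-safe on its own, but the sequence as a whole is not atomic. A minimal sketch of the same bookkeeping using the atomic operations of ConcurrentMap could look like the following. SessionRegistry is a hypothetical helper, and the type parameter S stands in for the WebSocket type.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical helper that keeps at most one session per user id. */
class SessionRegistry<S> {

    private final ConcurrentMap<String, S> sessions = new ConcurrentHashMap<>();

    /** Registers the session and returns the one it displaced (for the caller to close), or null. */
    S register(String userId, S session) {
        return sessions.put(userId, session);
    }

    /**
     * Removes the entry only if this exact session is still the registered one, so a late
     * close event from a displaced session cannot evict its replacement. The comparison
     * uses equals(), which is identity for classes that do not override it.
     */
    boolean unregister(String userId, S session) {
        return sessions.remove(userId, session);
    }

    int size() {
        return sessions.size();
    }
}

class SessionRegistryDemo {
    public static void main(String[] args) {
        SessionRegistry<Object> registry = new SessionRegistry<>();
        Object first = new Object();
        Object second = new Object();

        System.out.println(registry.register("u1", first) == null);   // prints true
        System.out.println(registry.register("u1", second) == first); // prints true
        System.out.println(registry.unregister("u1", first));         // prints false
        System.out.println(registry.unregister("u1", second));        // prints true
        System.out.println(registry.size());                          // prints 0
    }
}
```

In the article's code, the displaced session returned by register would be the one to mark as disabled and close.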
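7. Message notification business handling
The message notification business handler is one of the implementation classes of the WebSocketBusinessService interface. The code is as follows: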
package com.lylp.sys.service.impl;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.lylp.sys.dao.dto.SysUserAccountDto;
import com.lylp.sys.dao.dto.WsUserDto;
import com.lylp.sys.dao.entity.SysMessageEntity;
import com.lylp.sys.service.ShiroService;
import com.lylp.sys.service.SysMessageService;
import com.lylp.sys.service.WebSocketBusinessService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.java_websocket.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

@Service("websocketMsgNotice")
public class SysWsMsgNoticeServiceImpl implements WebSocketBusinessService {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    // One session per user, keyed by user id.
    private static ConcurrentHashMap<String, WebSocket> sessionMap = new ConcurrentHashMap<>();

    @Autowired
    private SysMessageService sysMessageService;

    @Autowired
    private ShiroService shiroService;

    @Override
    public void addSession(WebSocket webSocket) {
        WsUserDto dto = webSocket.getAttachment();
        if (sessionMap.containsKey(dto.getUserId())) {
            // Reuse the permissions of the previous session, then disable and close it.
            WebSocket old = sessionMap.remove(dto.getUserId());
            if (old != null) {
                WsUserDto oldDto = old.getAttachment();
                oldDto.setEnabled(false);
                dto.setPermissions(((WsUserDto) old.getAttachment()).getPermissions());
                if (old.isOpen()) {
                    old.close();
                }
            }
        } else {
            try {
                dto.setPermissions(shiroService.getUserPermissionsByToken(dto.getToken()));
            } catch (Exception e) {
                dto.setPermissions(new HashSet<>());
                logger.error("Failed to get user permissions", e);
            }
        }
        sessionMap.put(dto.getUserId(), webSocket);
        logger.debug("SysWsMsgNotice online " + sessionMap.size());
        // Send the matching messages to the new session right away.
        try {
            EntityWrapper<SysMessageEntity> wrapper = new EntityWrapper<>();
            wrapper.eq("msg_state", 1);
            List<SysMessageEntity> list = sysMessageService.list(wrapper, this.getAccount(dto));
            if (!CollectionUtils.isEmpty(list)) {
                this.sendMessage(webSocket, JSON.toJSONString(list));
                sysMessageService.updateSendState(list);
            }
        } catch (Exception e) {
            logger.error("Failed to send initial messages", e);
        }
    }

    @Override
    public void removeSession(WebSocket webSocket) {
        WsUserDto dto = webSocket.getAttachment();
        if (!BooleanUtils.isTrue(dto.getEnabled())) {
            return;
        }
        String key = dto.getUserId();
        // Remove the entry only if it still refers to this session.
        if (sessionMap.containsKey(key)) {
            WebSocket old = sessionMap.get(key);
            if (old == null) {
                sessionMap.remove(key);
            } else if (dto.getSessionId().equals(((WsUserDto) old.getAttachment()).getSessionId())) {
                sessionMap.remove(key);
            }
        }
    }

    @Override
    public void onMessage(WebSocket webSocket, String message) {
        // Notifications are push-only; incoming messages are ignored.
    }

    @Override
    public void onError(WebSocket webSocket, Exception e) {
        WsUserDto dto = webSocket.getAttachment();
        logger.error("websocketMsgNotice [" + dto.getUserId() + "-" + dto.getUserName() + "] websocket on error. close connection.", e);
        if (webSocket.isOpen()) {
            webSocket.close();
        } else {
            removeSession(webSocket);
        }
    }

    private SysUserAccountDto getAccount(WsUserDto dto) {
        SysUserAccountDto user = new SysUserAccountDto();
        user.setUserId(dto.getUserId());
        user.setOrgId(dto.getOrgId());
        user.setPermissions(dto.getPermissions());
        return user;
    }

    @Override
    public void sendHeartbeat() {
        this.sendMessage(sessionMap.values(), "message heartbeat");
    }

    // Runs once a minute, at second 1 of each minute.
    @Scheduled(cron = "1 * * * * ?")
    public void pushMsg() {
        try {
            for (WebSocket webSocket : sessionMap.values()) {
                if (webSocket == null) {
                    continue;
                }
                WsUserDto dto = webSocket.getAttachment();
                EntityWrapper<SysMessageEntity> wrapper = new EntityWrapper<>();
                wrapper.in("send_state", 0, 2);
                List<SysMessageEntity> list = sysMessageService.list(wrapper, this.getAccount(dto));
                if (list != null && list.size() > 0) {
                    this.sendMessage(webSocket, JSON.toJSONString(list));
                    sysMessageService.updateSendState(list);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
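In pushMsg above, updateSendState is called whether or not sendMessage reported success, and a single try/catch wraps the whole loop, so an exception for one user ends the push for everyone in that run. Whether that matters depends on the application. The following sketch shows one possible variant that records only the messages actually delivered and isolates failures per user. PushLoopSketch and its functional parameters are hypothetical stand-ins for the session map, SysMessageService.list, and sendMessage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Function;

class PushLoopSketch {

    /**
     * For each user: load the pending messages, try to send them, and collect only
     * those that were delivered, so the caller can mark exactly these as sent.
     */
    static List<String> pushPending(List<String> userIds,
                                    Function<String, List<String>> loadPending,
                                    BiPredicate<String, List<String>> send) {
        List<String> delivered = new ArrayList<>();
        for (String userId : userIds) {
            try {
                List<String> pending = loadPending.apply(userId);
                if (pending != null && !pending.isEmpty() && send.test(userId, pending)) {
                    delivered.addAll(pending);
                }
            } catch (RuntimeException e) {
                // One failing user must not stop the push for the remaining users.
                System.err.println("push failed for " + userId + ": " + e.getMessage());
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> delivered = pushPending(
                Arrays.asList("u1", "u2", "u3"),
                userId -> {
                    if (userId.equals("u2")) {
                        throw new IllegalStateException("query failed"); // simulated lookup error
                    }
                    return userId.equals("u1") ? Arrays.asList("m1") : Arrays.asList("m2", "m3");
                },
                (userId, messages) -> !userId.equals("u1")); // u1's connection is closed
        System.out.println(delivered); // prints [m2, m3]
    }
}
```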