[Network Protocols] A WebSocket Solution for Chinese Domestic Middleware

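1. Preface

  • At present, the mainstream domestic middleware products in China (Kingdee 金蝶, TongTech 东方通, CVIC SE 中创) do not support the WebSocket service provided by Spring well. At runtime, WebSocket connections die to varying degrees, leaving WebSocket features unusable. In severe cases the middleware itself hangs (or crashes), making the whole application inaccessible.
  • To solve this kind of problem, we need a WebSocket component that is more robust and fits our application better. org.java_websocket is one such option. The following describes how to replace the chat and message notification features with an implementation based on the org.java_websocket component.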

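2. java_websocket solution diagram

[Figure: java_websocket solution diagram (image not available)]

  • java_websocket: creates the WebSocket server and maintains the onOpen/onClose/onMessage/onError callbacks; corresponds to the JavaWebSocketServer and WebSocketConfig classes.
  • Central ws handling and business dispatch: handles the onOpen/onClose/onMessage/onError callbacks in one place and classifies connections by business type; implements the WebSocketService interface.
  • Chat business handling: session management and business logic for chat WebSockets; implements the WebSocketBusinessService interface.
  • Message notification business handling: session management and business logic for message notification WebSockets; implements the WebSocketBusinessService interface.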

3. Using the java_websocket component

3.1 Maven

<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.3.7</version>
</dependency>

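3.2 JavaWebSocketServer implementation

    JavaWebSocketServer must extend the WebSocketServer class from the java_websocket component, as in the code below.
    Notes:
    - The code must use threads to dispatch the WebSocket events it receives; otherwise the WebSocket service can easily shut down on its own.
    - It calls the WebSocketService interface so that WebSocket events are handled in one place.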
package com.lylp.sys.service;

import com.lylp.common.utils.SpringContextUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * WebSocket server.
 *
 * @author
 * @date 2021/7/1 10:53
 */
public class JavaWebSocketServer extends WebSocketServer {
    private static final Logger logger = LoggerFactory.getLogger(JavaWebSocketServer.class);

    /**
     * Worker thread pool.<br>
     * The onOpen/onClose/onMessage/onError callbacks hand their business logic to this pool,<br>
     * because running complex or slow logic directly inside a callback<br>
     * can make the WebSocket service shut itself down.<br>
     * Note: the work queue is unbounded, so the pool never grows beyond its 50 core threads.
     */
    private final ExecutorService executorService = new ThreadPoolExecutor(50, 200, 30, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            new BasicThreadFactory.Builder().namingPattern("thread-pool-%d").daemon(true).build());

    public JavaWebSocketServer(Integer port) {
        super(new InetSocketAddress(port));
    }

    @Override
    public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
        executorService.execute(() -> {
            WebSocketService bean = SpringContextUtils.getBean(WebSocketService.class);
            bean.onOpen(webSocket, clientHandshake.getResourceDescriptor());
        });

        logger.debug("JavaWebSocketServer online " + this.connections().size());
    }

    @Override
    public void onClose(WebSocket webSocket, int code, String reason, boolean remote) {
        if (webSocket == null) {
            return;
        }
        executorService.execute(() -> {
            WebSocketService bean = SpringContextUtils.getBean(WebSocketService.class);
            bean.onClose(webSocket, code, reason, remote);
        });
    }

    @Override
    public void onMessage(WebSocket webSocket, String message) {
        executorService.execute(() -> {
            WebSocketService bean = SpringContextUtils.getBean(WebSocketService.class);
            bean.onMessage(webSocket, message);
        });
    }

    @Override
    public void onError(WebSocket webSocket, Exception e) {
        // The library passes a null WebSocket when the error is not tied to one connection
        if (webSocket == null) {
            logger.info("JavaWebSocketServer on error. websocket is null.");
            logger.error(e.getMessage(), e);
            return;
        }
        executorService.execute(() -> {
            WebSocketService bean = SpringContextUtils.getBean(WebSocketService.class);
            bean.onError(webSocket, e);
        });
    }

    @Override
    public void onStart() {
        logger.info("JavaWebSocketServer start");
    }

}
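
The thread pool above is declared with 50 core threads and a maximum of 200, but a `ThreadPoolExecutor` only creates threads beyond the core size once its work queue is full. With an unbounded `LinkedBlockingQueue` that never happens. The sketch below uses small illustrative numbers to show the difference a bounded queue makes; `PoolSizingDemo` and `threadsCreated` are hypothetical names, not part of the article's code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the sizes are illustrative, not tuning advice.
class PoolSizingDemo {

    /** Submits blocking tasks to a pool (core 2, max 4) and reports how many threads it created. */
    static int threadsCreated(BlockingQueue<Runnable> queue, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: tasks beyond the core size wait in the queue.
        System.out.println(threadsCreated(new LinkedBlockingQueue<>(), 6));  // prints 2
        // Bounded queue of 2: once it is full, the pool grows toward its maximum.
        System.out.println(threadsCreated(new LinkedBlockingQueue<>(2), 6)); // prints 4
    }
}
```

If the intent is to scale up to 200 threads under load, a bounded queue plus a rejection policy would be needed; whether that is desirable depends on how long the business callbacks take.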

3.3 WebSocketConfig class

  WebSocketConfig configures the WebSocket port and starts the WebSocket server. The code is as follows.
  Note: the java_websocket component needs a port of its own.
package com.lylp.sys.config;

import com.lylp.common.exception.LYLPException;
import com.lylp.sys.service.JavaWebSocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WebSocketConfig {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${core.configuration.websocket.port}")
    private Integer websocketPort;

    @Bean
    public JavaWebSocketServer javaWebSocketServer() {
        if (websocketPort == null) {
            logger.error("javaWebSocketServer start failed, reason: core.configuration.websocket.port is null");
            throw new LYLPException("javaWebSocketServer start failed");
        }
        JavaWebSocketServer javaWebSocketServer = new JavaWebSocketServer(websocketPort);
        javaWebSocketServer.start();
        return javaWebSocketServer;
    }

}

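4. Central ws handling and business dispatch

The WebSocketService interface handles WebSocket events in one place, including classifying WebSocket messages, processing the token, and looking up the current user.
Note: java_websocket classifies business types by URL.
The interface code is as follows: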
package com.lylp.sys.service;

import org.java_websocket.WebSocket;

/**
 * WebSocket handling.
 *
 * @author
 * @date 2021/7/1 10:55
 */
public interface WebSocketService {

    /**
     * Called when a connection is opened.
     *
     * @param webSocket session
     * @param url       resource descriptor (path and query string) of the request
     */
    void onOpen(WebSocket webSocket, String url);

    /**
     * Called when a connection is closed.
     *
     * @param webSocket session
     * @param i         close code
     * @param message   close reason
     * @param b         whether the close was initiated by the remote peer
     */
    void onClose(WebSocket webSocket, int i, String message, boolean b);

    /**
     * Called when a message is received.
     *
     * @param webSocket session
     * @param message   message
     */
    void onMessage(WebSocket webSocket, String message);

    /**
     * Called when an error occurs.
     *
     * @param webSocket session
     * @param e         exception
     */
    void onError(WebSocket webSocket, Exception e);
}
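
Classifying connections by URL amounts to a lookup from a path segment to a handler. As a rough sketch of that idea without Spring, a plain map can stand in for the bean lookup by name. `Handler` and `HandlerRegistry` are hypothetical stand-ins, not the article's classes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a per-business handler (the article uses Spring beans instead).
interface Handler {
    String handle(String message);
}

// Maps the business type taken from the URL (e.g. "webSocketIm") to its handler.
class HandlerRegistry {
    private final Map<String, Handler> handlers = new ConcurrentHashMap<>();

    void register(String wsType, Handler handler) {
        handlers.put(wsType, handler);
    }

    /** Returns the handler for a type, or empty when the type is null or unknown. */
    Optional<Handler> find(String wsType) {
        return wsType == null ? Optional.empty() : Optional.ofNullable(handlers.get(wsType));
    }

    public static void main(String[] args) {
        HandlerRegistry registry = new HandlerRegistry();
        registry.register("webSocketIm", msg -> "chat:" + msg);
        registry.register("websocketMsgNotice", msg -> "notice:" + msg);

        // A known type is dispatched; an unknown one would lead to closing the connection.
        System.out.println(registry.find("webSocketIm").map(h -> h.handle("hi")).orElse("close"));
        System.out.println(registry.find("unknown").map(h -> h.handle("hi")).orElse("close"));
    }
}
```
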
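The implementation is as follows.
Notes:
- The WebSocket class in java_websocket has an attachment property that can store attached data; here it stores the user information. The attachment lives for the whole lifetime of the WebSocket: once it is set in onOpen, the WebSocket parameter passed to onMessage/onClose/onError carries it automatically.
- In onOpen/onMessage/onClose/onError, the service looks up the business handler (the WebSocketBusinessService implementation) that matches the parsed WebSocket type and delegates to it.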
package com.lylp.sys.service.impl;

import com.lylp.common.exception.LYLPException;
import com.lylp.common.utils.HttpUtils;
import com.lylp.common.utils.ResponseResult;
import com.lylp.common.utils.SpringContextUtils;
import com.lylp.sys.dao.dto.SysUserAccountDto;
import com.lylp.sys.dao.dto.WsUserDto;
import com.lylp.sys.service.WebSocketBusinessService;
import com.lylp.sys.service.WebSocketService;
import org.apache.commons.lang.StringUtils;
import org.java_websocket.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * Central WebSocket handling service.
 *
 * @author
 * @date 2021/7/1 13:37
 */
@Service
public class WebSocketServiceImpl implements WebSocketService {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${server.servlet.context-path}")
    private String contextPath;

    @Override
    public void onOpen(WebSocket webSocket, String url) {

        if (StringUtils.isBlank(url)) {
            webSocket.close();
            logger.error("Invalid websocket connection: url is blank. Connection closed.");
            return;
        }

        /*
         * For example, the chat and message features connect with these URLs:
         * chat    -> http://localhost:6896/lylp/webSocketIm?token=91144e5b74d537155f96cecdc7afe516
         * message -> http://localhost:6896/lylp/websocketMsgNotice?token=91144e5b74d537155f96cecdc7afe516
         * We need to extract webSocketIm / websocketMsgNotice and the token from the URL.
         * webSocketIm marks a chat connection; websocketMsgNotice marks a message notification connection.
         * Step 1: extract the WebSocket type, i.e. webSocketIm (or websocketMsgNotice).
         * Step 2: extract the token, which is used to look up the user.
         */
        logger.debug("WebSocket onOpen url:" + url);
        // Extract the websocket type and the token
        String base = StringUtils.isBlank(contextPath) ? "/" : contextPath + "/";
        url = url.replace(base, "");
        url = url.replace("/", "");
        url = url.replace("?", " ");
        String[] split = url.split(" token=");
        if (split.length != 2 || StringUtils.isBlank(split[0]) || StringUtils.isBlank(split[1])) {
            webSocket.close();
            logger.error("Invalid websocket connection: no token. Connection closed.");
            return;
        }
        WsUserDto dto = new WsUserDto();
        dto.setWsType(split[0]);
        dto.setToken(split[1]);
        // Look up the business handler whose bean name matches the websocket type
        WebSocketBusinessService service;
        try {
            service = (WebSocketBusinessService) SpringContextUtils.getBean(dto.getWsType());
        } catch (Exception e) {
            webSocket.close();
            logger.error(dto.getWsType() + " bean not found. Connection closed.", e);
            return;
        }
        // Use the token to load the currently logged-in user
        ResponseResult responseResult = HttpUtils.getAccountInfo(dto.getToken());
        if (responseResult == null || responseResult.getCode() != 0) {
            webSocket.close();
            logger.error("[" + dto.getToken() + "] token is invalid or expired. Connection closed.");
            return;
        }
        // Bind the user information to the websocket
        SysUserAccountDto accountDto = (SysUserAccountDto) responseResult.getData();
        dto.setUserId(accountDto.getUserId());
        dto.setUserName(accountDto.getUserName());
        dto.setOrgId(accountDto.getOrgId());
        dto.setOrgName(accountDto.getOrgName());
        webSocket.setAttachment(dto);
        // Dispatch to the business handler
        service.addSession(webSocket);
    }

    @Override
    public void onClose(WebSocket webSocket, int i, String message, boolean b) {
        WsUserDto dto = webSocket.getAttachment();
        if (dto == null) {
            return;
        }
        logger.debug("WebSocket on close:" + dto.getWsType() + "-" + dto.getUserName());
        WebSocketBusinessService service = (WebSocketBusinessService) SpringContextUtils.getBean(dto.getWsType());
        service.removeSession(webSocket);
    }

    @Override
    public void onMessage(WebSocket webSocket, String message) {
        WsUserDto dto = webSocket.getAttachment();
        if (dto == null) {
            return;
        }
        logger.debug("WebSocket on message:" + dto.getWsType() + "-" + dto.getUserName());
        WebSocketBusinessService service = (WebSocketBusinessService) SpringContextUtils.getBean(dto.getWsType());
        service.onMessage(webSocket, message);
    }

    @Override
    public void onError(WebSocket webSocket, Exception e) {
        WsUserDto dto = webSocket.getAttachment();
        if (dto == null) {
            return;
        }
        logger.debug("WebSocket on error:" + dto.getWsType() + "-" + dto.getUserName());
        WebSocketBusinessService service = (WebSocketBusinessService) SpringContextUtils.getBean(dto.getWsType());
        service.onError(webSocket, e);
    }
}
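
The string replacements in onOpen work for the two URL shapes shown, but they would misparse a URL that carries extra query parameters. A more defensive alternative, offered as a sketch rather than the author's method, splits the resource descriptor into path and query first. `ResourceDescriptorParser` and `Parsed` are hypothetical names.

```java
import java.util.Optional;

// Hypothetical helper: extracts the business type and token from a resource descriptor
// such as "/lylp/webSocketIm?token=abc".
class ResourceDescriptorParser {

    static final class Parsed {
        final String wsType;
        final String token;

        Parsed(String wsType, String token) {
            this.wsType = wsType;
            this.token = token;
        }
    }

    static Optional<Parsed> parse(String descriptor, String contextPath) {
        if (descriptor == null) {
            return Optional.empty();
        }
        int q = descriptor.indexOf('?');
        if (q < 0) {
            return Optional.empty(); // no query string, so no token
        }
        String path = descriptor.substring(0, q);
        String query = descriptor.substring(q + 1);

        // Drop the context path prefix (if any), then leading and trailing slashes.
        String prefix = contextPath == null ? "" : contextPath.replaceAll("/+$", "");
        if (!prefix.isEmpty() && path.startsWith(prefix + "/")) {
            path = path.substring(prefix.length());
        }
        String wsType = path.replaceAll("^/+|/+$", "");

        // Look for the token among the query parameters.
        String token = null;
        for (String pair : query.split("&")) {
            if (pair.startsWith("token=")) {
                token = pair.substring("token=".length());
            }
        }
        if (wsType.isEmpty() || wsType.contains("/") || token == null || token.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new Parsed(wsType, token));
    }
}
```

With this sketch, `parse("/lylp/webSocketIm?token=abc", "/lylp")` yields the type `webSocketIm` and the token `abc`, while a descriptor without a token yields an empty result and the caller can close the connection.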

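5. Business handling interface

Depending on requirements, business handling can of course be done in one place instead of being split by type.
WebSocketBusinessService defines the business handling interface for the different business scenarios. The code is as follows: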
package com.lylp.sys.service;

import com.lylp.sys.dao.dto.WsUserDto;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.BooleanUtils;
import org.java_websocket.WebSocket;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket business handling interface.
 *
 * @author
 * @date 2021/7/1 14:05
 */
public interface WebSocketBusinessService {

    Logger LOGGER = LoggerFactory.getLogger(WebSocketBusinessService.class);

    /**
     * Adds a WebSocket session.
     *
     * @param webSocket session
     */
    void addSession(WebSocket webSocket);

    /**
     * Removes a session.
     *
     * @param webSocket session
     */
    void removeSession(WebSocket webSocket);

    /**
     * Handles a received message.
     *
     * @param webSocket session
     * @param message   message
     */
    void onMessage(WebSocket webSocket, String message);

    /**
     * Handles an error.
     *
     * @param webSocket session
     * @param e         exception
     */
    void onError(WebSocket webSocket, Exception e);

    /**
     * Sends a message to one user.
     *
     * @param sessions websocket map keyed by user ID
     * @param userId   user ID
     * @param message  message
     * @return whether the message was sent
     */
    default boolean sendMessageByUserId(ConcurrentHashMap<String, WebSocket> sessions, String userId, String message) {
        if (StringUtils.isEmpty(userId) || message == null) {
            return false;
        }
        if (sessions.containsKey(userId)) {
            return this.sendMessage(sessions.get(userId), message);
        }
        return false;
    }

    /**
     * Sends a message to all users of an organization.
     *
     * @param webSockets websocket collection
     * @param orgId      organization ID
     * @param message    message
     * @return false if the arguments are invalid, otherwise true
     */
    default boolean sendMessageByOrgId(Collection<WebSocket> webSockets, String orgId, String message) {
        if (CollectionUtils.isEmpty(webSockets) || StringUtils.isEmpty(orgId) || message == null) {
            return false;
        }
        for (WebSocket webSocket : webSockets) {
            if (webSocket == null) {
                continue;
            }
            WsUserDto dto = webSocket.getAttachment();
            // Skip sockets that have no attachment yet
            if (dto != null && orgId.equals(dto.getOrgId())) {
                this.sendMessage(webSocket, message);
            }
        }
        return true;
    }

    /**
     * Sends a message to every websocket in the collection.
     *
     * @param webSockets websockets
     * @param message    message
     * @return false if the arguments are invalid, otherwise true
     */
    default boolean sendMessage(Collection<WebSocket> webSockets, String message) {
        if (CollectionUtils.isEmpty(webSockets) || message == null) {
            return false;
        }
        for (WebSocket webSocket : webSockets) {
            sendMessage(webSocket, message);
        }
        return true;
    }

    /**
     * Sends a message to one websocket.
     *
     * @param webSocket websocket
     * @param message   message
     * @return whether the message was sent
     */
    default boolean sendMessage(WebSocket webSocket, String message) {
        if (webSocket == null || message == null) {
            return false;
        }
        WsUserDto dto = webSocket.getAttachment();
        // Do not send to sockets without an attachment or sockets marked as disabled
        if (dto == null || !BooleanUtils.isTrue(dto.getEnabled())) {
            return false;
        }
        if (webSocket.isOpen()) {
            try {
                webSocket.send(message);
                return true;
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
                return false;
            }
        }
        return false;
    }

    /**
     * Sends heartbeat data.
     */
    void sendHeartbeat();
}
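
The article does not show `WsUserDto`, yet the code depends on two of its properties: `sendMessage` refuses to send unless `getEnabled()` is true, and the `removeSession` implementations compare `getSessionId()` values. Since onOpen sets neither, the DTO presumably initializes them itself. A minimal sketch under that assumption follows; the field defaults are guesses, only the accessor names come from the article, and the remaining fields (userName, orgName, permissions) are omitted.

```java
import java.util.UUID;

// Hypothetical reconstruction of WsUserDto; not the author's class.
class WsUserDto {
    private String wsType;
    private String token;
    private String userId;
    private String orgId;
    // Assumed default: a new connection is usable until a newer one replaces it.
    private Boolean enabled = Boolean.TRUE;
    // Assumed default: a unique id per connection, so stale close events can be told apart.
    private String sessionId = UUID.randomUUID().toString();

    public String getWsType() { return wsType; }
    public void setWsType(String wsType) { this.wsType = wsType; }

    public String getToken() { return token; }
    public void setToken(String token) { this.token = token; }

    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }

    public String getOrgId() { return orgId; }
    public void setOrgId(String orgId) { this.orgId = orgId; }

    public Boolean getEnabled() { return enabled; }
    public void setEnabled(Boolean enabled) { this.enabled = enabled; }

    public String getSessionId() { return sessionId; }
    public void setSessionId(String sessionId) { this.sessionId = sessionId; }
}
```

If the real class leaves `enabled` or `sessionId` null, `sendMessage` would never send and `removeSession` would throw a NullPointerException, so it is worth checking the actual defaults.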

6. Chat business handling

The chat handler is one of the implementations of the WebSocketBusinessService interface. The code is as follows:
package com.lylp.sys.service.impl;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.lylp.common.utils.CommUtil;
import com.lylp.common.utils.DateUtils;
import com.lylp.sys.dao.dto.WsUserDto;
import com.lylp.sys.service.WebSocketBusinessService;
import com.lylp.sys.dao.dto.WebSocketImDto;
import com.lylp.sys.dao.entity.ImMessage;
import com.lylp.sys.service.IImMessageService;
import org.apache.commons.lang3.BooleanUtils;
import org.java_websocket.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Chat service.
 *
 * @author
 * @date 2021/7/1 16:12
 */
@Service("webSocketIm")
public class WebSocketImServiceImpl implements WebSocketBusinessService {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Online WebSockets, keyed by user ID.
     */
    private static ConcurrentHashMap<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();

    @Autowired
    private IImMessageService iImMessageService;

    @Override
    public void addSession(WebSocket webSocket) {
        WsUserDto dto = webSocket.getAttachment();
        if (webSocketSet.containsKey(dto.getUserId())) {
            WebSocket old = webSocketSet.get(dto.getUserId());
            if (old != null) {
                WsUserDto oldDto = old.getAttachment();
                oldDto.setEnabled(false);
                if (old.isOpen()) {
                    old.close();
                }
            }
        }
        webSocketSet.put(dto.getUserId(), webSocket);
        logger.debug("webSocketIm online " + webSocketSet.size());
        // Broadcast the departments that are currently online
        sendOnlineOrgMessage();
    }

    @Override
    public void removeSession(WebSocket webSocket) {
        WsUserDto userDto = webSocket.getAttachment();
        if (!BooleanUtils.isTrue(userDto.getEnabled())) {
            return;
        }
        WebSocketImDto dto = new WebSocketImDto();
        dto.setObjuid(userDto.getOrgId());
        dto.setSendOrgId(userDto.getOrgId());
        dto.setSendOrgName(userDto.getOrgName());

        String key = userDto.getUserId();
        if (webSocketSet.containsKey(key)) {
            WebSocket old = webSocketSet.get(key);
            if (old == null) {
                webSocketSet.remove(key);
            } else if (userDto.getSessionId().equals(((WsUserDto) old.getAttachment()).getSessionId())) {
                webSocketSet.remove(key);
            }
        }

        if (!isOnline(dto)) {
            sendOnlineOrgMessage();
        }
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void onMessage(WebSocket webSocket, String message) {
        WebSocketImDto dto = JSON.parseObject(message, WebSocketImDto.class);
        if (dto == null) {
            return;
        }
        // Type "3": mark messages as read
        if ("3".equals(dto.getType())) {
            ImMessage msg = new ImMessage();
            msg.setRead(true);
            EntityWrapper<ImMessage> wrapper = new EntityWrapper<>();
            wrapper.eq("objuid", dto.getObjuid());
            iImMessageService.update(msg, wrapper);
            String msgInfo = JSON.toJSONString(dto);
            sendInfoOrg(msgInfo, dto.getReceiveOrgId(), dto.getSendOrgId());
        }
        // Type "2": get the online organizations
        else if ("2".equals(dto.getType())) {
            sendOnlineOrgMessage();
        }
        // Type "1": send a message to an organization
        else if ("1".equals(dto.getType())) {
            WsUserDto userDto = webSocket.getAttachment();
            ImMessage im = new ImMessage();
            im.setSend(true);
            im.setRead(false);
            im.setSuccess(true);
            im.setSendOrgId(userDto.getOrgId());
            im.setSendUserId(userDto.getUserId());
            im.setCreateTime(DateUtils.getNowTimeDate());
            im.setObjuid(CommUtil.getUUID());
            im.setReceiveOrgId(dto.getReceiveOrgId());
            im.setContent(dto.getContent());
            im.setSendOrgName(userDto.getOrgName());
            im.setReceiveOrgName(dto.getReceiveOrgName());
            im.setSendUserName(userDto.getUserName());
            iImMessageService.insert(im);
            BeanUtils.copyProperties(im, dto);

            // Receiving department
            String orgId = im.getReceiveOrgId();
            String msg = JSON.toJSONString(dto);
            sendInfoOrg(msg, orgId, dto.getSendOrgId());
        }
    }

    @Override
    public void onError(WebSocket webSocket, Exception e) {
        logger.error(e.getMessage(), e);
        if (webSocket.isOpen()) {
            webSocket.close();
        } else {
            removeSession(webSocket);
        }
    }

    @Override
    public void sendHeartbeat() {
        logger.debug("Instant messaging: websocket heartbeat sent, online count: " + webSocketSet.size());
        this.sendMessage(webSocketSet.values(), "chat heartbeat");
    }

    /**
     * Checks whether an organization is online.
     *
     * @param dto organization information
     * @return whether it is online
     */
    private boolean isOnline(WebSocketImDto dto) {
        for (WebSocket webSocket : webSocketSet.values()) {
            if (webSocket == null) {
                continue;
            }
            WsUserDto userDto = webSocket.getAttachment();
            if (userDto.getOrgId().equals(dto.getObjuid())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Sends a message to all users of the receiving and sending organizations.
     *
     * @param message   message
     * @param orgId     receiving organization
     * @param sendOrgId sending organization
     */
    private void sendInfoOrg(String message, String orgId, String sendOrgId) {
        for (WebSocket webSocket : webSocketSet.values()) {
            if (webSocket == null) {
                continue;
            }
            WsUserDto userDto = webSocket.getAttachment();
            if (userDto.getOrgId().equals(orgId) || userDto.getOrgId().equals(sendOrgId)) {
                sendMessage(webSocket, message);
            }
        }
    }

    /**
     * Broadcasts the list of online organizations.
     */
    private void sendOnlineOrgMessage() {
        Map<String, Object> map = new HashMap<>(8);
        Set<WebSocketImDto> set = getOnlineOrg();
        map.put("type", "2");
        map.put("date", set);
        sendMessage(webSocketSet.values(), JSON.toJSONString(map));
    }

    /**
     * Collects the organizations of all cached online sessions.
     *
     * @return online organizations
     */
    private Set<WebSocketImDto> getOnlineOrg() {
        Set<WebSocketImDto> set = new HashSet<>();
        for (WebSocket webSocket : webSocketSet.values()) {
            if (webSocket == null) {
                continue;
            }
            WsUserDto userDto = webSocket.getAttachment();
            WebSocketImDto org = new WebSocketImDto();
            org.setObjuid(userDto.getOrgId());
            org.setOrgName(userDto.getOrgName());
            set.add(org);
        }
        return set;
    }

}
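
addSession and removeSession both follow a check-then-act pattern (`containsKey`, then `get`, then `put` or `remove`), which leaves a small window for races when the same user reconnects quickly. As a sketch of the same bookkeeping with atomic map operations: `ConcurrentHashMap.put` returns the previous value, and `remove(key, value)` removes the entry only if it still maps to that value. `SessionRegistry` is a hypothetical name, and the type parameter `S` stands in for `WebSocket`.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical generic registry; S stands in for org.java_websocket.WebSocket.
class SessionRegistry<S> {
    private final ConcurrentHashMap<String, S> sessions = new ConcurrentHashMap<>();

    /** Stores the new session and returns the one it replaced (or null), so the caller can close it. */
    S register(String userId, S session) {
        return sessions.put(userId, session);
    }

    /** Removes the entry only if it still maps to this session; a stale close event is ignored. */
    boolean unregister(String userId, S session) {
        return sessions.remove(userId, session);
    }

    int size() {
        return sessions.size();
    }

    public static void main(String[] args) {
        SessionRegistry<Object> registry = new SessionRegistry<>();
        Object first = new Object();
        Object second = new Object();

        registry.register("u1", first);
        Object replaced = registry.register("u1", second); // the same user reconnects
        System.out.println(replaced == first);

        // The close event of the old connection arrives late and must not evict the new one.
        System.out.println(registry.unregister("u1", first));
        System.out.println(registry.size());
    }
}
```

With this approach the comparison of session IDs in removeSession would no longer be needed, because the map itself checks that the entry still refers to the closing connection.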

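7. Message notification business handling

The message notification handler is one of the implementations of the WebSocketBusinessService interface. The code is as follows: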
package com.lylp.sys.service.impl;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.lylp.sys.dao.dto.SysUserAccountDto;
import com.lylp.sys.dao.dto.WsUserDto;
import com.lylp.sys.dao.entity.SysMessageEntity;
import com.lylp.sys.service.ShiroService;
import com.lylp.sys.service.SysMessageService;
import com.lylp.sys.service.WebSocketBusinessService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.java_websocket.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Message notification business handling.
 *
 * @author
 * @date 2021/7/1 14:26
 */
@Service("websocketMsgNotice")
public class SysWsMsgNoticeServiceImpl implements WebSocketBusinessService {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private static ConcurrentHashMap<String, WebSocket> sessionMap = new ConcurrentHashMap<>();

    @Autowired
    private SysMessageService sysMessageService;
    @Autowired
    private ShiroService shiroService;

    @Override
    public void addSession(WebSocket webSocket) {
        // Keep one websocket per user
        WsUserDto dto = webSocket.getAttachment();
        // If the user already has a websocket, close the old one and use the new one
        if (sessionMap.containsKey(dto.getUserId())) {
            // Mark the old websocket as disabled and close it
            WebSocket old = sessionMap.remove(dto.getUserId());
            if (old != null) {
                WsUserDto oldDto = old.getAttachment();
                oldDto.setEnabled(false);
                dto.setPermissions(((WsUserDto) old.getAttachment()).getPermissions());
                if (old.isOpen()) {
                    old.close();
                }
            }
        } else {
            try {
                dto.setPermissions(shiroService.getUserPermissionsByToken(dto.getToken()));
            } catch (Exception e) {
                dto.setPermissions(new HashSet<>());
                logger.error("Failed to load user permissions", e);
            }
        }
        sessionMap.put(dto.getUserId(), webSocket);
        logger.debug("SysWsMsgNotice online " + sessionMap.size());
        try {
            EntityWrapper<SysMessageEntity> wrapper = new EntityWrapper<>();
            wrapper.eq("msg_state", 1);
            List<SysMessageEntity> list = sysMessageService.list(wrapper, this.getAccount(dto));
            if (!CollectionUtils.isEmpty(list)) {
                this.sendMessage(webSocket, JSON.toJSONString(list));
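                // Update the push state. (Original note: the state was meant to stay unchanged here
                // and be updated only after the message is read on the front end.)
                sysMessageService.updateSendState(list);
            }
        } catch (Exception e) {
            logger.error("Failed to send initial messages", e);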
        }
    }

    @Override
    public void removeSession(WebSocket webSocket) {
        WsUserDto dto = webSocket.getAttachment();
        if (!BooleanUtils.isTrue(dto.getEnabled())) {
            return;
        }
        String key = dto.getUserId();
        if (sessionMap.containsKey(key)) {
            WebSocket old = sessionMap.get(key);
            if (old == null) {
                sessionMap.remove(key);
            } else if (dto.getSessionId().equals(((WsUserDto) old.getAttachment()).getSessionId())) {
                sessionMap.remove(key);
            }
        }
    }

    @Override
    public void onMessage(WebSocket webSocket, String message) {

    }

    @Override
    public void onError(WebSocket webSocket, Exception e) {
        WsUserDto dto = webSocket.getAttachment();
        logger.error("websocketMsgNotice [" + dto.getUserId() + "-" + dto.getUserName() + "] websocket on error. close connection.", e);
        if (webSocket.isOpen()) {
            webSocket.close();
        } else {
            removeSession(webSocket);
        }
    }

    /**
     * Builds the account information used for message queries.
     *
     * @param dto dto
     * @return user information
     */
    private SysUserAccountDto getAccount(WsUserDto dto) {
        SysUserAccountDto user = new SysUserAccountDto();
        user.setUserId(dto.getUserId());
        user.setOrgId(dto.getOrgId());
        user.setPermissions(dto.getPermissions());
        return user;
    }

    @Override
    public void sendHeartbeat() {
        this.sendMessage(sessionMap.values(), "message heartbeat");
    }

    /**
     * Pushes pending messages on a schedule (at second 1 of every minute).
     */
    @Scheduled(cron = "1 * * * * ?")
    public void pushMsg() {
        try {
            for (WebSocket webSocket : sessionMap.values()) {
                if (webSocket == null) {
                    continue;
                }
                WsUserDto dto = webSocket.getAttachment();
                EntityWrapper<SysMessageEntity> wrapper = new EntityWrapper<>();
                wrapper.in("send_state", 0, 2);
                List<SysMessageEntity> list = sysMessageService.list(wrapper, this.getAccount(dto));
                if (list != null && list.size() > 0) {
                    this.sendMessage(webSocket, JSON.toJSONString(list));
                    // Update the push state
                    sysMessageService.updateSendState(list);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
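
`sendHeartbeat()` is declared on the interface and implemented by both services, but the article does not show what calls it. One plausible arrangement, sketched here with the JDK scheduler rather than Spring's `@Scheduled`, is a fixed-rate task that invokes every registered heartbeat. The `HeartbeatScheduler` class is hypothetical, and any period passed to it (for example 30 seconds) is an assumption.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical scheduler; each Runnable stands in for one service's sendHeartbeat().
class HeartbeatScheduler {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "ws-heartbeat");
        t.setDaemon(true); // do not keep the JVM alive just for heartbeats
        return t;
    });

    /** Runs all heartbeats at a fixed rate, starting after one full period. */
    void start(List<Runnable> heartbeats, long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(() -> {
            for (Runnable heartbeat : heartbeats) {
                try {
                    heartbeat.run();
                } catch (RuntimeException e) {
                    // An uncaught exception would suppress all later runs, so contain it here.
                    System.err.println("heartbeat failed: " + e.getMessage());
                }
            }
        }, period, period, unit);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

In the article's setup this could be started with something like `start(Arrays.asList(imService::sendHeartbeat, noticeService::sendHeartbeat), 30, TimeUnit.SECONDS)`, where `imService` and `noticeService` are the two injected beans.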
Added: 2021-12-01 18:03:38  Updated: 2021-12-01 18:04:41
 