Java 使用Websocket 与MQ消息队列实现即时消息
项目需求:根据不同用户账号产生的数据需要即时展示到首页大屏中进行展示,实现方式
1:前端短时间内轮训调用后端接口,后端返回最新相关数据进行展示
2:使用websocket即时通信,一产生新数据,就立即发送。数据产生有MQ进行推送,保证实时性
第一种方式舍弃,频繁请求接口,大部分请求都无效请求,成本过大
实现思路:
1:建立websocket连接,缓存连接用户信息,使用session,保证即时同账号不同登录页也能接收
2:使用MQ,监听MQ产生的推送消息topic
3: MQ监听消息处理类接收消息,根据消息处理业务情况,并根据数据筛选出需要推送到所属用户
4:保持在线用户连接,定时任务每30秒发送缓存内还保持连接的用户心跳数据
技术选型使用:netty-websocket
详细说明查看:
https://gitee.com/Yeauty/netty-websocket-spring-boot-starter
websocket在线测试工具,可在线测试:
http://coolaf.com/tool/chattest
前言
在实际开发使用过程中,产线环境都是使用HTTPS 以及配合 Nginx进行使用,
但是在测试环境下,自己则是通过ws 的方式进行连接测试,即:ws://IP地址 + 端口号/websocket
所以关于HTTPS下使用 wss 协议的问题,以及配合 Nginx 使用域名方式建立连接
不使用 IP地址 + 端口号 连接 WebSocket,因为这种方式不够优雅
ws 和 wss 又是什么鬼?
Websocket使用 ws 或 wss 的统一资源标志符,类似于 HTTP 或 HTTPS
其中 wss 表示在 TLS 之上的 Websocket ,相当于 HTTPS 了
如:
ws://example.com/Websocket
wss://example.com/Websocket
默认情况下,Websocket 的 ws 协议使用 80 端口;运行在TLS之上时,wss 协议默认使用 443 端口。其实说白了,wss 就是 ws 基于 SSL 的安全传输,与 HTTPS 一样样的道理。
如果你的网站是 HTTPS 协议的,那你就不能使用 ws:// 了
浏览器会 block 掉连接,和 HTTPS 下不允许 HTTP 请求一样,如下图:
Mixed Content: The page at 'https://domain.com/' was loaded over HTTPS, but attempted to connect to the insecure WebSocket endpoint 'ws://x.x.x.x:xxxx/'. This request has been blocked; this endpoint must be available over WSS.
这种情况,我们就需要使用 wss:\\ 安全协议了
如果把ws的方式来去使用wss的时候
VM512:35 WebSocket connection to 'wss://IP地址:端口号/websocket' failed: Error in connection establishment: net::ERR_SSL_PROTOCOL_ERROR
很明显 SSL 协议错误,说明就是证书问题了。
这时候我们一直拿的是 IP地址 + 端口号 这种方式连接 WebSocket 的
这没有证书存在,生产环境不可能用 IP地址 + 端口号 这种方式连接 WebSocket 的
要用域名方式连接 WebSocket 。
Nginx 配置域名支持 WSS
Nginx配置 HTTPS 域名位置加入配置:
location /websocket {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
接着拿域名再次连接试一下,不出意外会看 101 状态码:
这样就完成了在 HTTPPS 下以域名方式连接 WebSocket
接下来直接上代码
Maven导包
<dependency>
<groupId>org.yeauty</groupId>
<artifactId>netty-websocket-spring-boot-starter</artifactId>
<version>0.12.0</version>
</dependency>
JAVA代码
websocket处理类
package com.biz.controller.home;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.redis.provider.impl.StringRedisProvider;
import com.core.constant.SecurityConstant;
import com.security.def.BoyunLoginUser;
import com.security.def.BoyunUserDTO;
import com.biz.def.UserInfoWebSocket;
import com.biz.def.WebSocketServerDto;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.yeauty.annotation.*;
import org.yeauty.pojo.Session;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@ServerEndpoint(path = "/ws/{sid}", port = "9011")
public class MyWebSocket {
public static final AtomicInteger ONLINE_NUM = new AtomicInteger(0);
private static ConcurrentHashMap<String, WebSocketServerDto> webSocketMap = new ConcurrentHashMap<>();
@Autowired
private StringRedisProvider stringRedisProvider;
@BeforeHandshake
public void handshake(Session session) {
session.setSubprotocols("stomp");
}
@OnOpen
public void onOpen(Session session, @PathVariable String sid) {
ONLINE_NUM.incrementAndGet();
log.info("{}连接成功,当前在线数量:{}", sid, ONLINE_NUM.get());
if (StringUtils.isBlank(sid)) {
session.close();
return;
}
log.info(">>>>>>>>>>>>>>>>> sid:{}", sid);
session.setAttribute(SecurityConstant.TOKEN, sid);
log.info(">>>>>>>>>>>>>>>>> sid2:{}", sid);
String value = stringRedisProvider.get(SecurityConstant.PROJECT_PREFIX + sid);
log.info(">>>>>>>>>>>>>>>>> value:{}", value);
if (StringUtils.isBlank(value)) {
log.debug("{} 未登录", sid);
session.close();
return;
}
BoyunLoginUser<BoyunUserDTO> tokenUser = JSON.parseObject(value, new TypeReference<BoyunLoginUser<BoyunUserDTO>>() {
});
log.info(">>>>>>>>>>>>>>>>> tokenUser:{}", tokenUser.toString());
Long userId = tokenUser.getUser().getUserId();
session.setAttribute(SecurityConstant.INNER_USER_ID, userId);
WebSocketServerDto webSocketServerDto = webSocketMap.get(sid);
if (webSocketServerDto != null) {
log.info("关闭");
webSocketServerDto.getSession().close();
}
webSocketServerDto = new WebSocketServerDto();
webSocketServerDto.setSession(session);
webSocketMap.put(sid, webSocketServerDto);
UserInfoWebSocket.setSessionIdMap(userId, sid);
}
@OnClose
public void onClose(Session session) throws IOException {
ONLINE_NUM.decrementAndGet();
String sid = session.getAttribute(SecurityConstant.TOKEN);
Long userId = session.getAttribute(SecurityConstant.INNER_USER_ID);
log.info("{} 连接关闭,在线数量:{}", sid, ONLINE_NUM.get());
webSocketMap.remove(sid);
UserInfoWebSocket.delSessionIdMap(userId, sid);
}
@OnError
public void onError(Session session, Throwable throwable) {
log.error("发生错误的连接:{},userId:{}", session.getAttribute(SecurityConstant.TOKEN), session.getAttribute(SecurityConstant.INNER_USER_ID));
}
@SneakyThrows
@OnMessage
public void onMessage(Session session, String message) {
if (!"123456789".equals(message)) {
}
}
@OnBinary
public void onBinary(Session session, byte[] bytes) {
if (!"123456789".equals(new String(bytes))) {
log.info("收到客户端:{} 的消息:{}", session.getAttribute(SecurityConstant.TOKEN), new String(bytes));
}
}
@OnEvent
public void onEvent(Session session, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
switch (idleStateEvent.state()) {
case READER_IDLE:
log.info("读数据闲置");
break;
case WRITER_IDLE:
log.info("写数据闲置");
break;
case ALL_IDLE:
log.info("读、写数据闲置");
break;
default:
break;
}
}
}
public static void sendInfo(String message, String sid) {
if (StringUtils.isAnyBlank(message, sid)) {
log.debug("参数错误,sid:{},message:{}", sid, message);
}
log.debug("推送消息到窗口{},推送内容: {}", sid, message);
WebSocketServerDto webSocketServer = webSocketMap.get(sid);
if (webSocketServer != null) {
Session session = webSocketServer.getSession();
if (session == null) {
log.error("{} 不存在session", sid);
return;
}
if (!session.isActive()) {
Long userId = session.getAttribute(SecurityConstant.INNER_USER_ID);
webSocketMap.remove(sid);
UserInfoWebSocket.delSessionIdMap(userId, sid);
}
session.sendText(message);
}
}
}
WebSocket 需要推送的用户信息缓存
package com.biz.def;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j
public class UserInfoWebSocket {
private static Map<Long, Set<String>> cachedMap = new HashMap<>();
private static ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
public static Set<String> getSessionIdSet(Long userId) {
rwlock.readLock().lock();
try {
return cachedMap.get(userId);
} finally {
rwlock.readLock().unlock();
}
}
public static Set<String> getSessionIdSetByUserIdList(List<Long> userIdList) {
rwlock.readLock().lock();
Set<String> result = new HashSet<>();
try {
log.info("<<<<<<<<<<<<<<<<<<<< cachedMap:{}",cachedMap);
for (Long userId : userIdList) {
Set<String> sidList = cachedMap.get(userId);
if (sidList != null && !sidList.isEmpty()) {
result.addAll(sidList);
}
}
} finally {
rwlock.readLock().unlock();
}
return result;
}
public static Set<String> getSessionIdSet() {
rwlock.readLock().lock();
Set<String> result = new HashSet<>();
try {
for (Map.Entry<Long, Set<String>> longSetEntry : cachedMap.entrySet()) {
Set<String> sidList = longSetEntry.getValue();
result.addAll(sidList);
}
} finally {
rwlock.readLock().unlock();
}
return result;
}
public static void setSessionIdMap(Long userId, String sid) {
rwlock.writeLock().lock();
try {
Set<String> list = cachedMap.get(userId);
if (list == null) {
list = new HashSet<>();
}
list.add(sid);
cachedMap.put(userId, list);
} finally {
rwlock.writeLock().unlock();
}
}
public static void delSessionIdMap(Long userId, String sid) {
rwlock.writeLock().lock();
try {
Set<String> set = cachedMap.get(userId);
if (set == null) {
return;
}
set.remove(sid);
if (set.isEmpty()) {
cachedMap.remove(userId);
} else {
cachedMap.put(userId, set);
}
} finally {
rwlock.writeLock().unlock();
}
}
public static void delUserId(Long userId) {
rwlock.writeLock().lock();
try {
cachedMap.remove(userId);
} finally {
rwlock.writeLock().unlock();
}
}
}
webSocket传输对象
package com.biz.def;
import org.yeauty.pojo.Session;
import java.util.Date;
import java.util.Objects;
public class WebSocketServerDto {
private Session session;
private String sid;
private Date heartTime;
public WebSocketServerDto() {
}
public WebSocketServerDto(Session session, String sid, Date heartTime) {
this.session = session;
this.sid = sid;
this.heartTime = heartTime;
}
public Session getSession() {
return session;
}
public void setSession(Session session) {
this.session = session;
}
public String getSid() {
return sid;
}
public void setSid(String sid) {
this.sid = sid;
}
public Date getHeartTime() {
return heartTime;
}
public void setHeartTime(Date heartTime) {
this.heartTime = heartTime;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WebSocketServerDto that = (WebSocketServerDto) o;
return Objects.equals(session, that.session) &&
Objects.equals(sid, that.sid) &&
Objects.equals(heartTime, that.heartTime);
}
@Override
public int hashCode() {
return Objects.hash(session, sid, heartTime);
}
@Override
public String toString() {
return "WebSocketServerDto{" +
"session=" + session +
", sid='" + sid + '\'' +
", heartTime=" + heartTime +
'}';
}
}
websocket的推送消息体
package com.bsj.studentcard.upms.pc.biz.def;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class WsMsgDataVO<T> {
private T data;
private String tag;
public WsMsgDataVO(T data, String tag) {
this.data = data;
this.tag = tag;
}
}
MQ的topic监听
package com.bsj.studentcard.upms.pc.biz.config.mq;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface MySource {
String ALARM_SOS = "alarmSos";
@Input(ALARM_SOS)
SubscribableChannel alarmSos();
}
MQ监听消息处理websock发送
package com.bsj.studentcard.upms.pc.biz.config.mq;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.bsj.studentcard.upms.pc.biz.controller.home.MyWebSocket;
import com.bsj.studentcard.upms.pc.biz.def.UserInfoWebSocket;
import com.bsj.studentcard.upms.pc.biz.def.WsMsgDataVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.MimeTypeUtils;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@Component
@RequiredArgsConstructor
public class MqSenderService {
private final MySource source;
@StreamListener(MySource.ALARM_SOS)
public void sosAlarm(Message<String> message) {
String result = message.getPayload();
if (StrUtil.isEmpty(result)) {
log.warn("检测到报警消息为空");
return;
}
List<AlarmLogDTO> oCardCallBackVOS = JSONObject.parseArray(result, AlarmLogDTO.class);
if (CollUtil.isEmpty(oCardCallBackVOS)) {
log.warn("检测到SOS报警消息为空");
return;
}
log.info("监听到sos报警输出为:{}", result);
List<Long> cardIds = oCardCallBackVOS.stream().map(AlarmLogDTO::getCardId)
.distinct().collect(Collectors.toList());
List<UserDataVO> userDataVOS = CommonBaseCacheForest.listTopUserList(cardIds);
HashMap<Long, UserDataVO> commonMap = userDataVOS.stream().collect(Collectors.toMap(UserDataVO::getCardId,
Function.identity(), (key1, key2) -> key2, HashMap::new));
for (AlarmLogDTO callBack : oCardCallBackVOS) {
Long cardId = callBack.getCardId();
UserDataVO userDataVO = commonMap.get(cardId);
List<Long> userIds = userDataVO.getUserId();
if (CollUtil.isEmpty(userIds)) {
log.info("该数据不属于任何用户,cardId:{}", cardId);
continue;
}
Set<String> sidSet = UserInfoWebSocket.getSessionIdSetByUserIdList(userIds);
if (CollUtil.isEmpty(sidSet)) {
log.info("当前用户没一个登录,不发送");
continue;
}
OAlarmLogPageVO oAlarmLogPageVO = formatData(callBack, commonDTO);
WsMsgDataVO<OAlarmLogPageVO> msg = new WsMsgDataVO<OAlarmLogPageVO>(oAlarmLogPageVO, AlarmTypeEnum.SOS_ALARM.name());
sidSet.forEach(sid -> MyWebSocket.sendInfo(JSONObject.toJSONString(msg), sid));
}
}
@Scheduled(cron = "*/30 * * * * ?")
public void keepHeartbeat() {
Set<String> sidSet = UserInfoWebSocket.getSessionIdSet();
WsMsgDataVO<String> msg = new WsMsgDataVO<String>("心跳连接", "heartBeat");
sidSet.forEach(sid -> MyWebSocket.sendInfo(JSONObject.toJSONString(msg), sid));
}
}
|