????????由于项目是集群部署,需要实现对websocket的session共享,可websocket的session无法序列化,不能存放到Redis当中,因此我们可以把websocket的session存放在服务器的map上,通过Redis的广播把消息发送到指定的频道上,每个服务器节点都订阅该频道,从而消息一经发布都能收到再从map中获取session完成消息的推送
1、引入所需依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、所需常量类(根据需求自定义)
package com.zzw.redops.utils;
/**
* @description: 常量类
* @dateTime: 2021/6/17 16:21
*/
public class Constants {
/**
* UTF-8 字符集
*/
public static final String UTF8 = "UTF-8";
/** redis 订阅消息通道标识*/
public final static String REDIS_CHANNEL = "onMessage";
public final static String REDIS_CHANNEL_CLOSE="close";
public final static String REDIS_CHANNEL_SEND="send";
/** 消息体的key*/
public final static String REDIS_MESSAGE_KEY = "KEY";
/** 消息体的值*/
public final static String REDIS_MESSAGE_VALUE = "VALUE";
}
3、Redis配置文件
package com.zzw.redops.utils;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
@EnableCaching
public class RedisConfig {
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<String,Object>();
redisTemplate.setConnectionFactory(factory);
// json序列化
Jackson2JsonRedisSerializer jsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jsonRedisSerializer.setObjectMapper(om);
// String序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// 设置redis各种数据类型的序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
redisTemplate.setValueSerializer(jsonRedisSerializer);
redisTemplate.setHashValueSerializer(jsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
// redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定
// 该消息监听器通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多个messageListener,配置不同的交换机
container.addMessageListener(listenerAdapter, new PatternTopic(Constants.REDIS_CHANNEL));
container.addMessageListener(listenerAdapter,new PatternTopic(Constants.REDIS_CHANNEL_CLOSE));
container.addMessageListener(listenerAdapter,new PatternTopic(Constants.REDIS_CHANNEL_SEND));
return container;
}
// 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
@Bean
MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
// 消息监听适配器
return new MessageListenerAdapter(receiver, "onMessage");
}
}
4、websocket配置文件
package com.zzw.redops.utils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @Date:2021/5/26 19:09
*/
/**
* 开启WebSocket支持
*/
@Configuration
public class WebSocketConfig {
/**
* ServerEndpointExporter 作用
* 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
5、WebSocketSession实体类用来存放session信息
package com.zzw.redops.utils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.websocket.Session;
/**
* @description: 客户端对象
* @date 2021/11/2 9:30
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WebSocketSession {
/**
* sessionID
*/
private String clientId;
/**
* 用户ID
*/
private String userId;
/**
* webSocket会话
*/
private Session session;
}
6、websocket服务类
package com.zzw.redops.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zzw.redops.common.utils.ResultVO;
import com.zzw.redops.service.RocketApiService;
import com.zzw.redops.vo.jurisdiction.ResultBack;
import com.zzw.redops.vo.jurisdiction.RocketUserLogin;
import com.zzw.redops.vo.rocket.Message;
import com.zzw.redops.vo.rocket.Statistics;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @Date:2021/5/26 19:12
*/
@Slf4j
@Component
@ServerEndpoint("/ws/{userId}")
public class WebSocketServer {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的
*/
private static int onlineCount = 0;
/**
* concurrent 包的线程安全Set,用来存放每个客户端对应的 myWebSocket对象
* 根据userId来获取对应的 WebSocket
*/
private static ConcurrentHashMap<String, WebSocketSession> webSocketMap = new ConcurrentHashMap<>();
/**
* 注入service
*/
RocketApiService rocketApiService = SpringUtils.getBean(RocketApiService.class);
StringRedisTemplate stringRedisTemplate=SpringUtils.getBean(StringRedisTemplate.class);
/**
* 连接建立成功调用的方法
* @param session
* @param userId
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
try {
String clientId = session.getId();
WebSocketSession socketSession = WebSocketSession.builder()
.userId(userId)
.clientId(clientId)
.session(session)
.build();
webSocketMap.put(clientId, socketSession);
addOnlineCount(); // 在线数 +1
// 向Redis中存入用户ID
stringRedisTemplate.opsForValue().set("online_"+userId,userId);
log.info("有新窗口开始监听:" + userId + ",当前在线人数为" + getOnlineCount());
// 获取当前登录的用户名
String name = rocketApiService.getRocketUserNameByUserId(userId);
JSONObject jsonObject=new JSONObject();
jsonObject.put("description",name+"上线了");
jsonObject.put("message","上线通知");
ResultBack success = ResultUtils.rocketSuccess(true, "notice",userId,jsonObject);
// 获取在线用户ID
List<String> onLineUserIds = getOnLineUserIds();
log.info("连接建立后在线用户ID:{}",onLineUserIds);
// 推送消息,除当前用户
for (String item:onLineUserIds){
if (!userId.equals(item)){
sendMessage(item,JSONObject.toJSONString(success));
log.info("上线通知推送用户ID:{}",item);
}
}
log.info("新用户连接后现有sessionID:{}",webSocketMap.keySet());
} catch (Exception e) {
log.error("onOpen Exception",e);
}
}
/**
* 关闭连接
*/
@OnClose
public void onClose(Session session) {
Map<String,String> map=new HashMap<>();
map.put(Constants.REDIS_MESSAGE_KEY,session.getId());
map.put("userId",webSocketMap.get(session.getId()).getUserId());
// 广播消息
stringRedisTemplate.convertAndSend(Constants.REDIS_CHANNEL_CLOSE,JSON.toJSONString(map));
}
// 关闭
public void close(String sessionId, String userId){
try {
// 获取session对象
WebSocketSession webSocketSession = webSocketMap.get(sessionId);
if (webSocketSession == null){
log.info("close获取session为null");
return;
}
if (!webSocketSession.getUserId().equals(userId)){
log.info("close中userId不一致");
return;
}
// 删除缓存
webSocketMap.remove(sessionId);
// 删除在线用户ID缓存
Boolean delete = stringRedisTemplate.delete("online_" + userId);
log.info("连接关闭后删除在线用户ID缓存返回值:{}",delete);
// 人数 -1
subOnlineCount();
log.info("有一连接关闭,当前在线人数为:" + getOnlineCount());
// 获取当前退出的用户名
String name = rocketApiService.getRocketUserNameByUserId(userId);
JSONObject jsonObject=new JSONObject();
jsonObject.put("description",name+"下线了");
jsonObject.put("message","下线通知");
ResultBack success = ResultUtils.rocketSuccess(true, "notice",userId,jsonObject);
// 获取在线用户ID
List<String> ids = getOnLineUserIds();
log.info("连接关闭后在线用户ID:{}",ids);
// 推送消息,除当前用户
for (String item:ids){
if (!userId.equals(item)){
sendMessage(item,JSONObject.toJSONString(success));
log.info("下线通知推送用户ID:{}",item);
}
}
log.info("用户关闭后现有sessionID:{}",webSocketMap.keySet());
}catch (Exception e){
log.error("websocket断开连接 expection:",e);
}
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
* @param session
*/
@OnMessage
public void onMessage(String message, Session session) throws IOException {
log.info("现有sessionID:{}",webSocketMap.keySet());
// 获取当前用户ID
String userId = webSocketMap.get(session.getId()).getUserId();
log.info("收到来自窗口" + userId + "的信息:" + message);
sendMessageByRedis(session.getId(),message,userId);
}
// Redis广播消息
public void sendMessageByRedis(String key, String message, String userId){
if (key == null){
log.info("Redis广播消息key为null");
return;
}
String newMessge= null;
try {
newMessge = new String(message.getBytes(Constants.UTF8), Constants.UTF8);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
Map<String,String> map = new HashMap<String, String>();
map.put(Constants.REDIS_MESSAGE_KEY, key);
map.put(Constants.REDIS_MESSAGE_VALUE, newMessge);
map.put("userId",userId);
// 广播
stringRedisTemplate.convertAndSend(Constants.REDIS_CHANNEL,JSON.toJSONString(map));
log.info("Redis广播消息key:{} message:{} userId:{} 成功",key,message,userId);
}
@OnError
public void onError(Session session,Throwable error) {
if (null == webSocketMap.get(session.getId())){
log.info("用户错误,已下线");
return;
}
log.error("用户错误:" + webSocketMap.get(session.getId()).getUserId() + ",原因:" + error.getMessage());
error.printStackTrace();
}
// 广播消息
public void sendMessage(String userId,String message){
Map<String,String> map=new HashMap<>();
map.put("userId",userId);
map.put("message",message);
map.put("flag","send");
stringRedisTemplate.convertAndSend(Constants.REDIS_CHANNEL_SEND,JSONObject.toJSONString(map));
}
public void sendToFront(String userId,String message) throws IOException {
log.info("sendToFront入参:userId:{}",userId);
WebSocketSession webSocketSession = getWebSocketSession(userId);
log.info("getWebSocketSession返回值:{}",webSocketSession);
if (null != webSocketSession){
sendMessage(webSocketSession.getSession(),message);
}
}
public void sendMessage(Session session,String message) throws IOException {
if (null == session){
return;
}
synchronized (session){
session.getBasicRemote().sendText(message);
}
}
// 根据用户ID获取session
public WebSocketSession getWebSocketSession(String userId) {
if (userId == null){
return null;
}
for (String key : webSocketMap.keySet()) {
WebSocketSession client = webSocketMap.get(key);
if (userId.equals(client.getUserId())) {
return client;
}
}
return null;
}
private static synchronized int getOnlineCount() {
return onlineCount;
}
private static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
private static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
在onMessage、onClose、sendMessage方法中将消息广播到Redis对应的频道中,监听到消息后进行相应的业务处理再把结果推送给客户端
7、RedisReceiver(消息监听对象,接收订阅消息进行处理)
package com.zzw.redops.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zzw.redops.vo.jurisdiction.ResultBack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
/**
* 消息监听对象,接收订阅消息
*/
@Component
public class RedisReceiver implements MessageListener {
Logger log = LoggerFactory.getLogger(this.getClass());
@Resource
private WebSocketServer webSocketServer;
/**
* 处理接收到的订阅消息
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取订阅的频道名称
String channel = new String(message.getChannel());
String msg = "";
try {
// 注意与发布消息编码一致,否则会乱码
msg = new String(message.getBody(), Constants.UTF8);
if (StringUtils.isNotEmpty(msg)){
if (Constants.REDIS_CHANNEL_CLOSE.endsWith(channel)){
// websocket连接关闭的消息
JSONObject json = JSON.parseObject(msg);
// 获取用户ID
String userId = json.getString("userId");
// 获取sessionID
String sessionId = json.getString(Constants.REDIS_MESSAGE_KEY);
// 调用关闭接口
webSocketServer.close(sessionId,userId);
}else if (Constants.REDIS_CHANNEL_SEND.endsWith(channel)){
// 向客户端推向消息的接口
JSONObject json = JSONObject.parseObject(msg);
if (json.getString("flag") != null){
if ("send".equals(json.getString("flag"))){
webSocketServer.sendToFront(json.getString("userId"),json.getString("message"));
}
}else {
log.info("其它消息,不予处理");
}
}
}else{
log.info("消息内容为空,不处理。");
}
} catch (Exception e) {
log.error("处理消息异常:"+e.toString());
e.printStackTrace();
}
}
}
????????省去了业务逻辑的代码,这是根据需求而异的,大家自行发挥即可;最后说一下我对这个模式的理解,刚开始做的时候也是半知半解,请教了多次公司的大佬才完成,对于为什么这样做我是这么想的:假如我们部署了A、B两台服务,有张三和李四两个用户在一个聊天室C中,张三请求到A服务上,他的信息保存在A上,李四请求到B服务信息保存在B上,如果不做session的共享,这时张三发了一条信息请求到B服务上,处理完之后需要向聊天室C中推送消息,B上面没有张三的session信息所以就无法推送成功,也就不能实现消息的实时传输;当我们通过Redis广播之后,A、B两台服务都会收到信息进行处理,在各自的map中获取张三的session信息,拿到之后就向聊天室中推送消息,拿不到就说明张三的信息不在此服务上不予处理;我们可以在业务接口中先获取session信息,拿不到就不处理,这样可以节省资源
? ? ? ? 还有一种方式是我们可以在websocket开启和关闭的时候广播到所有服务器上,这时每台服务器都有相同的session信息,而onMessage不广播消息,这样它无论请求到哪台服务器上都能进行处理
? ? ? ? 有什么错误的地方,欢迎各位指正!
|