maven依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
WebSocketConfig:
@Component
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
WebSocketServer:
- 因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的WebSocketServer其实就相当于一个ws协议的Controller ;
- 直接@ServerEndpoint("/imserver/{userId}") 、@Component启用即可,然后在里面实现@OnOpen开启连接,@onClose关闭连接,@onMessage接收消息等方法。
- 新建一个ConcurrentHashMap webSocketMap 用于接收当前userId的WebSocket,方便IM之间对userId进行推送消息。单机版实现到这里就可以。
- 集群版 需要借助MQ或者redis发布订阅,把消息发送到第三方;多台服务器同时消费该条消息,哪台服务器上有这个WebSocket那台服务器就推送消息(因为webSocket中session是接口无法序列化)
- 前端url ws://172.16.15.44:8080/webSocket/参数 注意 https 需要把 ws 改为 wss
这里采用的是redis发布订阅模式
redis配置请参考 :
https://blog.csdn.net/weixin_46841515/article/details/121190753
@Component
@ServerEndpoint("/webSocket/{userno}")
@Slf4j
public class WebSocketService extends MessageListenerAdapter {
private Session session;
@Autowired
private StringRedisTemplate stringRedisTemplate = SpringUtils.getBean(StringRedisTemplate.class);
private static ConcurrentHashMap<String,ConcurrentHashMap<String,Session>> concurrentHashMap = new ConcurrentHashMap<>();
@OnOpen
public void onOpen (@PathParam(value = "userno") String param, Session session){
this.session = session;
ConcurrentHashMap<String,Session> sessionMap = (null==concurrentHashMap.get(param))? new ConcurrentHashMap<>():concurrentHashMap.get(param);
sessionMap.put(session.getId(),session);
concurrentHashMap.put(param,sessionMap);
}
@OnClose
public void onClose (Session session){
concurrentHashMap.remove(session.getId());
}
@OnMessage
public void onMessage(String message,Session session){
try {
String redisKey = "webScoket_heartbeat_"+session.getId();
stringRedisTemplate.opsForValue().set(redisKey,String.valueOf(new Date().getTime()),10, TimeUnit.SECONDS);
String[] split = message.split("_");
if (0<split.length){
JSONObject jsonObject = new JSONObject();
jsonObject.put("examId",split[0]);
jsonObject.put("message",message);
stringRedisTemplate.convertAndSend("webScoket_message",jsonObject.toJSONString());
}
}catch (Exception e){
e.printStackTrace();
}
}
public void sendMessage(Map<String, ExamAnswer> userAnswerMap,Integer optionId,Integer examId){
try {
List<ExamAnswer> collect = ExamAnswerUtil.getExamAnswerList(userAnswerMap);
JSONObject jsonObject = new JSONObject();
jsonObject.put("optionId",optionId);
jsonObject.put("list",collect);
String message = jsonObject.toString();
JSONObject messageJson = new JSONObject();
messageJson.put("examId",examId.toString());
messageJson.put("message",message);
stringRedisTemplate.convertAndSend("webScoket_message",messageJson.toJSONString());
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void onMessage(Message msgs, byte[] pattern) {
try {
byte[] body=msgs.getBody();
String topic=new String(pattern);
String result= new String(body,"utf-8");
JSONObject js= JSON.parseObject(result);
String examId=js.getString("examId");
String msg=js.getString("message");
ConcurrentHashMap<String, Session> map = concurrentHashMap.get(examId);
if (null != map){
Iterator<Map.Entry<String, Session>> iterator = map.entrySet().iterator();
while (iterator.hasNext()){
Session value = iterator.next().getValue();
boolean open = value.isOpen();
if (!open){
map.remove(value.getId());
continue;
}
if (null == value) continue;
String redisKey = "webScoket_heartbeat_"+value.getId();
String time = stringRedisTemplate.opsForValue().get(redisKey);
if (StringUtil.isBlank(time)) {
map.remove(value.getId());
continue;
}
long longTime = Long.parseLong(time);
long dateTime = new Date().getTime();
long a = dateTime - longTime;
if ((dateTime - longTime)>100000){
map.remove(value.getId());
} else {
try {
value.getAsyncRemote().sendText(msg);
}catch (Exception e){
continue;
}
}
}
}
}catch (Exception e){
log.info("onMessage exception");
e.printStackTrace();
}
}
}
|