websocket实现有四种方法,这里展示相对简单的一种方法
这里使用原生注解的方法实现
说明需要注意的点:
- 引用的包都在 **javax.websocket **下。并不是 spring 提供的,而 jdk 自带的。
下面关于使用到的几个注解的说明:
- @ServerEndpoint :通过这个 spring boot 就可以知道你暴露出去的 ws 应用的路径,有点类似我们经常用的@RequestMapping。比如你的启动端口是 8080,而这个注解的值是 ws,那我们就可以通过 ws://127.0.0.1:8080/ws 来连接你的应用
- @OnOpen:当 websocket 建立连接成功后会触发这个注解修饰的方法,注意它有一个 Session 参数
- @OnClose: 当 websocket 建立的连接断开后会触发这个注解修饰的方法
- @OnMessage: 当客户端发送消息到服务端时,会触发这个注解修改的方法,如果需要做心跳检测可以在这里做。
- @OnError::当 websocket 建立连接时出现异常会触发这个注解修饰的方法
- 使用 session.getBasicRemote().sendText(*) 向客户端发送消息
如下是具体实现
- pom.xml 引入架包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.WebSocketConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
3.WebSocketServer
package com.dw.sprboosoc.service;
import com.alibaba.fastjson.JSON;
import com.dw.sprboosoc.constant.MessageEnum;
import com.dw.sprboosoc.dto.WebSocketMessageDto;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@Slf4j
@ServerEndpoint(value = "/websocket/{sendUserId}")
public class WebSocketServer {
private static final AtomicInteger currentOnlineNumber = new AtomicInteger();
private static final ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, Map<String, List<WebSocketMessageDto>>> messageMap = new ConcurrentHashMap<>();
public void sendMessage(WebSocketMessageDto webSocketMessageDto) throws IOException {
try {
switch (webSocketMessageDto.getMessageEnum()) {
case ALL:
sessionPool.values().forEach(se -> {
try {
se
.getBasicRemote()
.sendText(webSocketMessageDto.toString());
} catch (IOException e) {
log.info("群发");
}
});
log.info("websocket: 广播消息:" + webSocketMessageDto);
storeOfflineMessage(webSocketMessageDto);
break;
case ONE:
String message = webSocketMessageDto.getMessage();
if (message.equals("ping")) {
WebSocketMessageDto wd = new WebSocketMessageDto();
wd.setMessage("pong");
wd.setRecvUserId("");
wd.setMessageEnum(MessageEnum.ONE);
sessionPool.get(webSocketMessageDto.getSendUserId())
.getBasicRemote()
.sendText(JSON.toJSONString(wd));
} else {
if (judgeUserOnline(webSocketMessageDto.getRecvUserId())) {
sessionPool.get(webSocketMessageDto.getRecvUserId())
.getBasicRemote()
.sendText(JSON.toJSONString(webSocketMessageDto));
} else {
storeOfflineMessage(webSocketMessageDto);
}
log.info("websocket: 私发消息," + webSocketMessageDto);
break;
}
}
} catch (Exception exception) {
log.error("websocket: 发送消息发生了错误");
}
}
@OnMessage
public void onMessage(String webSocketMessageDtoStr) throws IOException {
WebSocketMessageDto webSocketMessageDto = JSON.parseObject(webSocketMessageDtoStr, WebSocketMessageDto.class);
log.info("websocket:" + webSocketMessageDto.getRecvUserId() + "收到,来自:" + webSocketMessageDto.getSendUserId() + ",发送的消息:" + webSocketMessageDto.getMessage());
sendMessage(webSocketMessageDto);
}
public boolean judgeUserOnline(String recvUserId) {
boolean flag = !ObjectUtils.isEmpty(sessionPool.get(recvUserId));
String flagStr = flag ? "在线" : "离线";
log.info("websocket: " + recvUserId + ":" + flagStr);
return flag;
}
public void storeOfflineMessage(WebSocketMessageDto webSocketMessageDto) {
if (ObjectUtils.isEmpty(messageMap.get(webSocketMessageDto.getRecvUserId()))) {
Map<String, List<WebSocketMessageDto>> maps = new HashMap<>();
List<WebSocketMessageDto> list = new ArrayList<>();
list.add(webSocketMessageDto);
maps.put(webSocketMessageDto.getRecvUserId(), list);
messageMap.put(webSocketMessageDto.getRecvUserId(), maps);
} else {
Map<String, List<WebSocketMessageDto>> listObject = messageMap.get(webSocketMessageDto.getRecvUserId());
List<WebSocketMessageDto> objects = new ArrayList<>();
if (!ObjectUtils.isEmpty(listObject.get(webSocketMessageDto.getRecvUserId()))) {
objects = listObject.get(webSocketMessageDto.getRecvUserId());
objects.add(webSocketMessageDto);
listObject.put(webSocketMessageDto.getRecvUserId(), objects);
} else {
objects.add(webSocketMessageDto);
listObject.put(webSocketMessageDto.getRecvUserId(), objects);
}
messageMap.put(webSocketMessageDto.getRecvUserId(), listObject);
}
}
@OnOpen
public void onOpen(Session session, @PathParam(value = "sendUserId") String sendUserId) throws IOException {
sessionPool.put(sendUserId, session);
currentOnlineNumber.incrementAndGet();
log.info("websocket:" + sendUserId + "加入连接,当前在线用户" + currentOnlineNumber + "未读消息数:" + getMessageCount(sendUserId));
sendOffLineMessage(sendUserId);
}
@SneakyThrows
public void sendOffLineMessage(String sendUserId) {
if (ObjectUtils.isEmpty(messageMap.get(sendUserId))) {
return;
}
Map<String, List<WebSocketMessageDto>> lists = messageMap.get(sendUserId);
List<WebSocketMessageDto> list = lists.get(sendUserId);
if (list != null) {
for (WebSocketMessageDto webSocketMessageDto : list) {
onMessage(JSON.toJSONString(webSocketMessageDto));
}
}
removeHasBeenSentMessage(sendUserId, lists);
}
public void removeHasBeenSentMessage(String sendUserId, Map<String, List<WebSocketMessageDto>> map) {
Iterator iterator = map.keySet().iterator();
while (iterator.hasNext()) {
String keys = (String) iterator.next();
if (sendUserId.equals(keys)) {
iterator.remove();
}
}
}
@OnClose
public void onClose(@PathParam(value = "sendUserId") String sendUserId) {
sessionPool.remove(sendUserId);
currentOnlineNumber.decrementAndGet();
log.info("websocket:" + sendUserId + "断开连接,当前在线用户" + currentOnlineNumber);
}
@OnError
public void onError(Throwable throwable) {
log.error("websocket: 发生了错误");
throwable.printStackTrace();
}
public int getMessageCount(String recvUserId) {
Map<String, List<WebSocketMessageDto>> listMap = messageMap.get(recvUserId);
if (listMap != null) {
List<WebSocketMessageDto> list = listMap.get(recvUserId);
if (list != null) {
return listMap.get(recvUserId).size();
} else {
return 0;
}
} else {
return 0;
}
}
}
4.WebSocket 消息 DTO
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.io.Serializable;
@Getter
@Setter
@ToString
public class WebSocketMessageDto implements Serializable {
private static final long serialVersionUID = 4153093005674764992L;
private String sendUserId;
private String recvUserId;
private String message;
private MessageEnum messageEnum;
}
5.MessageEnum 消息体枚举
import lombok.Getter;
@Getter
public enum MessageEnum {
ONE("one", "私发"),
ALL("all", "群发"),
OTHER("other", "其他");
private final String messageType;
private final String desc;
MessageEnum(String messageType, String desc) {
this.messageType = messageType;
this.desc = desc;
}
}
6.结果验证:建立连接方式可以在 http://www.jsons.cn/websocket/ 页面输入 ws://127.0.0.1:端口号/websocket/1234 建立连接、发送消息等
|