什么是websocket这里就不进行介绍了,有兴趣的可以自己百度,或许后面我也会发文章介绍。 主要演示一下代码的实现,红色标注部分 需要格外注意
1、 引入依赖websocket
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、首先在要创建一个WebSocket的实体类,用来接收一些数据(Session 和 用户名)
@Data
public class WebSocketClient {
private Session session;
private String uri;
}
创建websocket的配置文件
@Configuration
@EnableWebSocket
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
WebSocket的具体实现逻辑,重点
@ServerEndpoint(value = "/websocket/{userName}",encoders = {ServerEncoder.class})
@Component
public class WebSocketService {
private static final Logger log = LoggerFactory.getLogger(WebSocketService.class);
private static int onlineCount = 0;
private static ConcurrentHashMap<String, WebSocketClient> webSocketMap = new ConcurrentHashMap<>();
private Session session;
private String userName="";
@OnOpen
public void onOpen(Session session, @PathParam("userName") String userName) {
if(!webSocketMap.containsKey(userName))
{
addOnlineCount();
}
this.session = session;
this.userName= userName;
WebSocketClient client = new WebSocketClient();
client.setSession(session);
client.setUri(session.getRequestURI().toString());
webSocketMap.put(userName, client);
}
@OnClose
public void onClose() {
if(webSocketMap.containsKey(userName)){
webSocketMap.remove(userName);
if(webSocketMap.size()>0)
{
subOnlineCount();
}
}
log.info(userName+"用户退出,当前在线人数为:" + getOnlineCount());
}
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到用户消息:"+userName+",报文:"+message);
}
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:"+this.userName+",原因:"+error.getMessage());
error.printStackTrace();
}
public void sendMessage(String message) throws IOException {
synchronized (session){
this.session.getBasicRemote().sendText(message);
}
}
public static void sendMessage(String userName,String message){
try {
WebSocketClient webSocketClient = webSocketMap.get(userName);
if(webSocketClient!=null){
webSocketClient.getSession().getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
public static void sendMessage(String userName,Object object){
try {
WebSocketClient webSocketClient = webSocketMap.get(userName);
if(webSocketClient!=null){
webSocketClient.getSession().getBasicRemote().sendObject(object);
}
} catch (IOException | EncodeException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketService.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketService.onlineCount--;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
}
下面主要是为了解决通过服务端向客户端传递一个对象的问题,需要指定编码器,否则会抛出异常。
public class ServerEncoder implements Encoder.Text<HashMap> {
private static final Logger log = LoggerFactory.getLogger(ServerEncoder.class);
@Override
public String encode(HashMap hashMap) throws EncodeException {
try {
return JSONObject.toJSONString(hashMap);
}catch (Exception e){
log.error("",e);
}
return null;
}
@Override
public void init(EndpointConfig endpointConfig) {
}
@Override
public void destroy() {
}
}
上面的Text中的HashMap就是我们需要传递的对象,我这里是通过map封装返回的结果集,如果你创建的是一个具体的对象,直接换成你的消息返回对象即可,注意Text和encode中的要保持一致。
完成这一步以后,我们可以通过在线的连接工具测试一下,检查一下我们服务端是否已经配置好 http://coolaf.com/tool/chattest
注意:如果是一个普通的没有拦截器的springboot项目,代码编写没有问题的情况下,这里是可以测通的。 如果代码中存在拦截器或者增加了一些安全框架,我们需要对访问的路径进行放行,采用的拦截方式不同,具体的放行方式会有差别,但是一定要将所请求的url放行,否则在测试时会出现连接失败的情况。
filterChainDefinitionMap.put("/websocket/**", "anon");
3、Vue的配置
created() {
this.initWebSocket();
},
destroyed() {
this.websocketclose();
},
methods: {
initWebSocket() {
if ("WebSocket" in window){
let ws = process.env.VUE_APP_BASE_WEBSOCKET + this.$store.getters.user.acctLogin;
this.websock = new WebSocket(ws);
this.websock.onopen = this.websocketonopen;
this.websock.onerror = this.websocketonerror;
this.websock.onmessage = this.websocketonmessage;
this.websock.onclose = this.websocketclose;
}
},
websocketonopen: function () {
},
websocketonerror: function (e) {
},
websocketonmessage: function (e) {
if (e){
JSON.parse(e.data);
}
},
websocketclose: function (e) {
}
}
.env.development文件中增加的内容
VUE_APP_BASE_WEBSOCKET = 'ws://127.0.0.1:7773/websocket/'
到了上面的这一步,你在控制台中就可以看到连接建立的情况了。 ?
4、服务端主动向客户端推送消息
WebSocketService.sendMessage("消息的内容");
WebSocketService.sendMessage("接收者的账号","消息的内容");
?
?
注意:websocket的连接不是一直持续的,是有时长的,超过一分钟连接就会关闭,因此我们需要引入心跳来保证连接的持续调用(每一次连接关闭时,调用重连方法,保证连接一直在线) 加入心跳机制后的代码
data(){
websock:null,
lockReconnect: false,
timeout: 28 * 1000,
timeoutObj: null,
serverTimeoutObj: null,
timeoutnum: null,
......
},
method(){
initWebSocket() {
if ("WebSocket" in window){
let ws = process.env.VUE_APP_BASE_WEBSOCKET + this.$store.getters.user.acctLogin;
this.websock = new WebSocket(ws);
this.websock.onopen = this.websocketonopen;
this.websock.onerror = this.websocketonerror;
this.websock.onmessage = this.websocketonmessage;
this.websock.onclose = this.websocketclose;
}
},
reconnect() {
var that = this;
if (that.lockReconnect) {
return;
}
that.lockReconnect = true;
that.timeoutnum && clearTimeout(that.timeoutnum);
that.timeoutnum = setTimeout(function () {
that.initWebSocket();
that.lockReconnect = false;
}, 5000);
},
reset() {
var that = this;
clearTimeout(that.timeoutObj);
clearTimeout(that.serverTimeoutObj);
that.start();
},
start() {
var self = this;
self.timeoutObj && clearTimeout(self.timeoutObj);
self.serverTimeoutObj && clearTimeout(self.serverTimeoutObj);
self.timeoutObj = setTimeout(function () {
if (self.websock.readyState == 1) {
} else {
self.reconnect();
}
self.serverTimeoutObj = setTimeout(function () {
self.websock.close();
}, self.timeout);
}, self.timeout);
},
websocketonopen: function () {
},
websocketonerror: function (e) {
this.reconnect();
},
websocketonmessage: function (e) {
if (e){
let parse = JSON.parse(e.data)
if (parse.type === 'add'){
this.total = this.total + 1
if (this.$route.path !== '/system_manager/system_manager_message'){
this.mess = this.$message({
showClose: true,
message: '你有一条新的系统通知待查看',
type: 'success',
center:true,
duration:0
});
}else {
vue.$emit('flush',true)
}
} else if(parse.type === 'del'){
this.total = this.total - 1
}
if (parse.type === 'message'){
if(parse.maint === 'outSide'){
this.jumpRouter = API.backFlowApproveOutSidePath
if (this.$route.path === API.backFlowApproveOutSidePath){
vue.$emit('flush',true)
}
}else {
this.jumpRouter = API.backFlowApprovePath
if (this.$route.path === API.backFlowApprovePath){
vue.$emit('flush',true)
}
}
let notification = document.getElementsByClassName('el-notification')
if (notification.length === 0){
let that = this
this.notifi = this.$notify({
title: '维保流程消息提醒',
message: parse.message,
offset: 100,
type:'success',
duration: 0,
onClick:function(){
that.$router.push({
path: that.jumpRouter
});
that.notifi.close();
}
});
}
}
this.reset();
}
},
websocketclose: function (e) {
this.reconnect();
},
}
?
?
5、Nginx的配置
server {
listen 80;
server_name 域名;
location / {
proxy_pass http://127.0.0.1:8080/; // 代理转发地址
proxy_http_version 1.1;
proxy_read_timeout 3600s; // 超时设置
// 启用支持websocket连接
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
location /upload { // 静态资源地址
root /mnt/resources;
}
}
//proxy_http_version http的版本
|