IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> SpringBoot + Vue + WebSocket 实时通信 -> 正文阅读

[网络协议]SpringBoot + Vue + WebSocket 实时通信

作者:token comment

什么是websocket这里就不进行介绍了,有兴趣的可以自己百度,或许后面我也会发文章介绍。
主要演示一下代码的实现,红色标注部分 需要格外注意


1、 引入依赖websocket

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2、首先在要创建一个WebSocket的实体类,用来接收一些数据(Session 和 用户名)

//这里我使用了Lombok的注解,如果没有添加这个依赖 可以创建get set方法
@Data
public class WebSocketClient {

    // 与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    //连接的uri
    private String uri;

}

创建websocket的配置文件

@Configuration
@EnableWebSocket
public class WebSocketConfig  {
    /**
     * 注入ServerEndpointExporter,
     * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

WebSocket的具体实现逻辑,重点

/**
 * @desc: WebSocketService实现
 * @author: LiuCh
 * @since: 2021/8/16
 */

//ServerEncoder 是为了解决编码异常,如果不需要使用sendObject()方法,这个可以忽略,只写value即可
@ServerEndpoint(value = "/websocket/{userName}",encoders = {ServerEncoder.class})
@Component
public class WebSocketService {
    private static final Logger log = LoggerFactory.getLogger(WebSocketService.class);

    /**
     *     静态变量,用来记录当前在线连接数
     */
    private static int onlineCount = 0;
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
     */
    private static ConcurrentHashMap<String, WebSocketClient> webSocketMap = new ConcurrentHashMap<>();


    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
    private Session session;
    /**接收userName 用来区别不同的用户*/
    private String userName="";
    /**
     * 连接建立成功调用的方法 可根据自己的业务需求做不同的处理*/
    @OnOpen
    public void onOpen(Session session, @PathParam("userName") String userName) {
        if(!webSocketMap.containsKey(userName))
        {
            addOnlineCount(); // 在线数 +1
        }
        this.session = session;
        this.userName= userName;
        WebSocketClient client = new WebSocketClient();
        client.setSession(session);
        client.setUri(session.getRequestURI().toString());
        webSocketMap.put(userName, client);
//        log.info("用户连接:"+userName+",当前在线人数为:" + getOnlineCount());
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        if(webSocketMap.containsKey(userName)){
            webSocketMap.remove(userName);
            if(webSocketMap.size()>0)
            {
                //从set中删除
                subOnlineCount();
            }
        }
        log.info(userName+"用户退出,当前在线人数为:" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到用户消息:"+userName+",报文:"+message);
    }

    /**
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误:"+this.userName+",原因:"+error.getMessage());

        error.printStackTrace();
    }

    /**
     * 连接服务器成功后主动推送
     */
    public void sendMessage(String message) throws IOException {
        synchronized (session){
            this.session.getBasicRemote().sendText(message);
        }
    }

    /**
     * 向指定客户端发送消息(字符串形式)
     * @param userName
     * @param message
     */
    public static void sendMessage(String userName,String message){
        try {
            WebSocketClient webSocketClient = webSocketMap.get(userName);
            if(webSocketClient!=null){
                webSocketClient.getSession().getBasicRemote().sendText(message);
            }
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * 向指定客户端发送消息(对象的形式)
     * @param userName
     * @param object
     */
    public static void sendMessage(String userName,Object object){
        try {
            WebSocketClient webSocketClient = webSocketMap.get(userName);
            if(webSocketClient!=null){
                webSocketClient.getSession().getBasicRemote().sendObject(object);
            }
        } catch (IOException | EncodeException e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketService.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketService.onlineCount--;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }


}

下面主要是为了解决通过服务端向客户端传递一个对象的问题,需要指定编码器,否则会抛出异常。

/**
 * @desc: WebSocket编码器
 * @author: LiuCh
 * @since: 2021/8/18
 */
public class ServerEncoder implements Encoder.Text<HashMap> {
    private static final Logger log = LoggerFactory.getLogger(ServerEncoder.class);

    /**
     * 这里的参数 hashMap 要和  Encoder.Text<T>保持一致
     * @param hashMap
     * @return
     * @throws EncodeException
     */
    @Override
    public String encode(HashMap hashMap) throws EncodeException {
        /*
         * 这里是重点,只需要返回Object序列化后的json字符串就行
         * 你也可以使用gosn,fastJson来序列化。
         * 这里我使用fastjson
         */
       try {
           return JSONObject.toJSONString(hashMap);
       }catch (Exception e){
           log.error("",e);
       }
        return null;
    }

    @Override
    public void init(EndpointConfig endpointConfig) {
        //可忽略
    }

    @Override
    public void destroy() {
        //可忽略
    }
}

上面的Text中的HashMap就是我们需要传递的对象,我这里是通过map封装返回的结果集,如果你创建的是一个具体的对象,直接换成你的消息返回对象即可,注意Text和encode中的要保持一致。




完成这一步以后,我们可以通过在线的连接工具测试一下,检查一下我们服务端是否已经配置好
http://coolaf.com/tool/chattest

在这里插入图片描述


注意:如果是一个普通的没有拦截器的springboot项目,代码编写没有问题的情况下,这里是可以测通的。
如果代码中存在拦截器或者增加了一些安全框架,我们需要对访问的路径进行放行,采用的拦截方式不同,具体的放行方式会有差别,但是一定要将所请求的url放行,否则在测试时会出现连接失败的情况。

//WebSocket 请求放行
filterChainDefinitionMap.put("/websocket/**", "anon");


3、Vue的配置
  created() {
    // 初始化websocket
    this.initWebSocket();
  },
  destroyed() {
    //销毁
    this.websocketclose();
  },
  methods: {
  	initWebSocket() {

      //建立websocket连接
      if ("WebSocket" in window){
        //连接服务端访问的url,我这里配置在了env中,就是上面在线测试工具中的地址,下面放了实例
        let ws = process.env.VUE_APP_BASE_WEBSOCKET + this.$store.getters.user.acctLogin;
        this.websock = new WebSocket(ws);
        this.websock.onopen = this.websocketonopen;
        this.websock.onerror = this.websocketonerror;
        this.websock.onmessage = this.websocketonmessage;
        this.websock.onclose = this.websocketclose;
      }
    },
       websocketonopen: function () {
      // console.log("连接成功",)
      //可以通过send方法来向服务端推送消息,推送的消息在onMessage中可以打印出来查看并做一些业务处理。
      //this.websock.send("向服务端推送的消息内容")
    },
    websocketonerror: function (e) {
      // console.log("连接失败",)
    },
    websocketonmessage: function (e) { 
      // console.log("服务端消息的内容",e.data)
      if (e){
        JSON.parse(e.data);//这个是收到后端主动推送的json字符串转化为对象(必须保证服务端传递的是json字符串,否则会报错)
				//你的业务处理...
        
      }

    },
    websocketclose: function (e) {
      // console.log("连接关闭",)
    }
  
  }

.env.development文件中增加的内容

VUE_APP_BASE_WEBSOCKET = 'ws://127.0.0.1:7773/websocket/'

到了上面的这一步,你在控制台中就可以看到连接建立的情况了。
?

4、服务端主动向客户端推送消息

WebSocketService.sendMessage("消息的内容"); //调用这个方法即可 这种是向所有的在线的推送消息  广播
WebSocketService.sendMessage("接收者的账号","消息的内容"); //向指定账号推送消息 单点
//这里消息的内容也可以封装为一个对象进行返回,使用对象时一定要指定编码器

?

?

注意:websocket的连接不是一直持续的,是有时长的,超过一分钟连接就会关闭,因此我们需要引入心跳来保证连接的持续调用(每一次连接关闭时,调用重连方法,保证连接一直在线)
加入心跳机制后的代码

data(){
      websock:null,
      lockReconnect: false, //是否真正建立连接
      timeout: 28 * 1000, //保持websocket的连接
      timeoutObj: null, //心跳心跳倒计时
      serverTimeoutObj: null, //心跳倒计时
      timeoutnum: null, //断开 重连倒计时
      ......
},
  
method(){
  initWebSocket() {

      //建立websocket连接
      if ("WebSocket" in window){
        // let ws = Config.wsUrl + this.$store.getters.user.acctLogin;
        let ws = process.env.VUE_APP_BASE_WEBSOCKET + this.$store.getters.user.acctLogin;
        this.websock = new WebSocket(ws);
        this.websock.onopen = this.websocketonopen;
        this.websock.onerror = this.websocketonerror;
        this.websock.onmessage = this.websocketonmessage;
        this.websock.onclose = this.websocketclose;
      }
    },
    //重新连接
    reconnect() {
      var that = this;
      if (that.lockReconnect) {
        return;
      }
      that.lockReconnect = true;
      //没连接上会一直重连,设置延迟避免请求过多
      that.timeoutnum && clearTimeout(that.timeoutnum);
      that.timeoutnum = setTimeout(function () {
        //新连接
        that.initWebSocket();
        that.lockReconnect = false;
      }, 5000);
    },
    //重置心跳
    reset() {
      var that = this;
      //清除时间
      clearTimeout(that.timeoutObj);
      clearTimeout(that.serverTimeoutObj);
      //重启心跳
      that.start();
    },
    //开启心跳
    start() {
      var self = this;
      self.timeoutObj && clearTimeout(self.timeoutObj);
      self.serverTimeoutObj && clearTimeout(self.serverTimeoutObj);
      self.timeoutObj = setTimeout(function () {
        //这里发送一个心跳,后端收到后,返回一个心跳消息,
        if (self.websock.readyState == 1) {
          //连接正常

        } else {
          //否则重连
          self.reconnect();
        }
        self.serverTimeoutObj = setTimeout(function () {
          //超时关闭
          self.websock.close();
        }, self.timeout);
      }, self.timeout);
    },
    websocketonopen: function () {
      // console.log("连接成功",)
    },
    websocketonerror: function (e) {
      // console.log("连接失败",)
      //重连
      this.reconnect();
    },
    websocketonmessage: function (e) {//JSON.parse(e.data); //这个是收到后端主动推送的值
      // console.log("服务端消息的内容",e.data)
      if (e){
        let parse = JSON.parse(e.data) //将json字符串转为对象
        if (parse.type === 'add'){ //消息个数+1
          this.total = this.total + 1
          if (this.$route.path !== '/system_manager/system_manager_message'){
            this.mess = this.$message({
              showClose: true,
              message: '你有一条新的系统通知待查看',
              type: 'success',
              center:true,
              duration:0
            });
          }else {
            vue.$emit('flush',true)
          }
        } else if(parse.type === 'del'){
          this.total = this.total - 1
        }
        if (parse.type === 'message'){
          //判断是外厂还是内厂操作 跳转的route不同
          if(parse.maint === 'outSide'){ //外厂
            this.jumpRouter = API.backFlowApproveOutSidePath
            //如果当前页面就是将要跳转的页面,直接刷新当前页
            if (this.$route.path === API.backFlowApproveOutSidePath){
              vue.$emit('flush',true)
            }
          }else { //内厂
            this.jumpRouter = API.backFlowApprovePath
            //如果当前页面就是将要跳转的页面,直接刷新当前页
            if (this.$route.path === API.backFlowApprovePath){
              vue.$emit('flush',true)
            }
          }
          let notification = document.getElementsByClassName('el-notification')
          if (notification.length === 0){
            let that = this
            this.notifi = this.$notify({
              title: '维保流程消息提醒',
              message: parse.message,
              offset: 100,
              type:'success',
              duration: 0,
              onClick:function(){
                that.$router.push({
                  path: that.jumpRouter
                });
                //关闭消息通知弹窗
                that.notifi.close();
              }
            });
          }

        }
        //收到服务器信息,心跳重置
        this.reset();

      }

    },
    websocketclose: function (e) {
      // console.log("连接关闭",)
      //重连
      this.reconnect();
    },
  
}

?

?

5、Nginx的配置

server {
      listen   80;
      server_name 域名;
      location / {
        proxy_pass   http://127.0.0.1:8080/; // 代理转发地址
     proxy_http_version 1.1;
        proxy_read_timeout   3600s; // 超时设置
        // 启用支持websocket连接
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
      }
      location /upload { // 静态资源地址
            root   /mnt/resources;        
      }
}
//proxy_http_version http的版本
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-08-26 12:28:31  更:2021-08-26 12:30:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 21:47:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码