IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Java 使用Websocket 与MQ消息队列实现即时消息 -> 正文阅读

[网络协议]Java 使用Websocket 与MQ消息队列实现即时消息

Java 使用Websocket 与MQ消息队列实现即时消息

项目需求:根据不同用户账号产生的数据需要即时展示到首页大屏中进行展示,实现方式
1:前端短时间内轮训调用后端接口,后端返回最新相关数据进行展示
2:使用websocket即时通信,一产生新数据,就立即发送。数据产生有MQ进行推送,保证实时性
第一种方式舍弃,频繁请求接口,大部分请求都无效请求,成本过大
实现思路:
1:建立websocket连接,缓存连接用户信息,使用session,保证即时同账号不同登录页也能接收
2:使用MQ,监听MQ产生的推送消息topic
3: MQ监听消息处理类接收消息,根据消息处理业务情况,并根据数据筛选出需要推送到所属用户
4:保持在线用户连接,定时任务每30秒发送缓存内还保持连接的用户心跳数据
技术选型使用:netty-websocket
详细说明查看:
https://gitee.com/Yeauty/netty-websocket-spring-boot-starter
websocket在线测试工具,可在线测试:
http://coolaf.com/tool/chattest

前言

在实际开发使用过程中,产线环境都是使用HTTPS 以及配合 Nginx进行使用,
但是在测试环境下,自己则是通过ws 的方式进行连接测试,即:ws://IP地址 + 端口号/websocket
所以关于HTTPS下使用 wss 协议的问题,以及配合 Nginx 使用域名方式建立连接
不使用 IP地址 + 端口号 连接 WebSocket,因为这种方式不够优雅

ws 和 wss 又是什么鬼?

Websocket使用 ws 或 wss 的统一资源标志符,类似于 HTTP 或 HTTPS
其中 wss 表示在 TLS 之上的 Websocket ,相当于 HTTPS 了
如:
ws://example.com/Websocket
wss://example.com/Websocket
默认情况下,Websocket 的 ws 协议使用 80 端口;运行在TLS之上时,wss 协议默认使用 443 端口。其实说白了,wss 就是 ws 基于 SSL 的安全传输,与 HTTPS 一样样的道理。
如果你的网站是 HTTPS 协议的,那你就不能使用 ws:// 了
浏览器会 block 掉连接,和 HTTPS 下不允许 HTTP 请求一样,如下图:

ws_https

Mixed Content: The page at 'https://domain.com/' was loaded over HTTPS, but attempted to connect to the insecure WebSocket endpoint 'ws://x.x.x.x:xxxx/'. This request has been blocked; this endpoint must be available over WSS.
这种情况,我们就需要使用 wss:\\ 安全协议了
如果把ws的方式来去使用wss的时候

在这里插入图片描述

VM512:35 WebSocket connection to 'wss://IP地址:端口号/websocket' failed: Error in connection establishment: net::ERR_SSL_PROTOCOL_ERROR
很明显 SSL 协议错误,说明就是证书问题了。
这时候我们一直拿的是 IP地址 + 端口号 这种方式连接 WebSocket 的
这没有证书存在,生产环境不可能用 IP地址 + 端口号 这种方式连接 WebSocket 的
要用域名方式连接 WebSocket 。

Nginx 配置域名支持 WSS

Nginx配置 HTTPS 域名位置加入配置:

location /websocket {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}

接着拿域名再次连接试一下,不出意外会看 101 状态码:
在这里插入图片描述

这样就完成了在 HTTPPS 下以域名方式连接 WebSocket 
接下来直接上代码

Maven导包

	  <dependency>
          <groupId>org.yeauty</groupId>
          <artifactId>netty-websocket-spring-boot-starter</artifactId>
          <version>0.12.0</version>
      </dependency>

JAVA代码

websocket处理类

package com.biz.controller.home;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.redis.provider.impl.StringRedisProvider;
import com.core.constant.SecurityConstant;
import com.security.def.BoyunLoginUser;
import com.security.def.BoyunUserDTO;
import com.biz.def.UserInfoWebSocket;
import com.biz.def.WebSocketServerDto;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.yeauty.annotation.*;
import org.yeauty.pojo.Session;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * webSocket
 *
 * @author 夕四
 * @date 2022-01-24 16:10
 **/
@Slf4j
@ServerEndpoint(path = "/ws/{sid}", port = "9011")
public class MyWebSocket {

    /**
     * 当前在线连接数
     */
    public static final AtomicInteger ONLINE_NUM = new AtomicInteger(0);
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static ConcurrentHashMap<String, WebSocketServerDto> webSocketMap = new ConcurrentHashMap<>();

    @Autowired
    private StringRedisProvider stringRedisProvider;


    @BeforeHandshake
    public void handshake(Session session) {
        session.setSubprotocols("stomp");
    }

    @OnOpen
    public void onOpen(Session session, @PathVariable String sid) {
        // 连接数 +1
        ONLINE_NUM.incrementAndGet();
        log.info("{}连接成功,当前在线数量:{}", sid, ONLINE_NUM.get());
        if (StringUtils.isBlank(sid)) {
            session.close();
            return;
        }
        log.info(">>>>>>>>>>>>>>>>> sid:{}", sid);
        session.setAttribute(SecurityConstant.TOKEN, sid);
        log.info(">>>>>>>>>>>>>>>>> sid2:{}", sid);
        // redis判断sid的用户值
        String value = stringRedisProvider.get(SecurityConstant.PROJECT_PREFIX + sid);
        log.info(">>>>>>>>>>>>>>>>> value:{}", value);
        if (StringUtils.isBlank(value)) {
            log.debug("{} 未登录", sid);
            session.close();
            return;
        }
        BoyunLoginUser<BoyunUserDTO> tokenUser = JSON.parseObject(value, new TypeReference<BoyunLoginUser<BoyunUserDTO>>() {
        });
        log.info(">>>>>>>>>>>>>>>>> tokenUser:{}", tokenUser.toString());
        // 绑定自定义属性
        Long userId = tokenUser.getUser().getUserId();
        session.setAttribute(SecurityConstant.INNER_USER_ID, userId);
        WebSocketServerDto webSocketServerDto = webSocketMap.get(sid);
        if (webSocketServerDto != null) {
            log.info("关闭");
            // 如果已存在,先关闭以前的连接,可能会出现覆盖后再添加
            webSocketServerDto.getSession().close();
        }
        webSocketServerDto = new WebSocketServerDto();
        webSocketServerDto.setSession(session);
        webSocketMap.put(sid, webSocketServerDto);
        UserInfoWebSocket.setSessionIdMap(userId, sid);
    }

    @OnClose
    public void onClose(Session session) throws IOException {
        // 连接数 -1
        ONLINE_NUM.decrementAndGet();
        String sid = session.getAttribute(SecurityConstant.TOKEN);
        Long userId = session.getAttribute(SecurityConstant.INNER_USER_ID);
        log.info("{} 连接关闭,在线数量:{}", sid, ONLINE_NUM.get());
        webSocketMap.remove(sid);
        UserInfoWebSocket.delSessionIdMap(userId, sid);
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        log.error("发生错误的连接:{},userId:{}", session.getAttribute(SecurityConstant.TOKEN), session.getAttribute(SecurityConstant.INNER_USER_ID));
    }

    @SneakyThrows
    @OnMessage
    public void onMessage(Session session, String message) {
        if (!"123456789".equals(message)) {
//            session.sendText(message);
//            log.info("发送消息:{}", message);
        }
    }

    @OnBinary
    public void onBinary(Session session, byte[] bytes) {
        // 不打印心跳数据
        if (!"123456789".equals(new String(bytes))) {
            log.info("收到客户端:{} 的消息:{}", session.getAttribute(SecurityConstant.TOKEN), new String(bytes));
//            session.sendText("123456789");
        }
    }

    @OnEvent
    public void onEvent(Session session, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            switch (idleStateEvent.state()) {
                case READER_IDLE:
                    log.info("读数据闲置");
                    break;
                case WRITER_IDLE:
                    log.info("写数据闲置");
                    break;
                case ALL_IDLE:
                    log.info("读、写数据闲置");
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * 发送自定义消息
     */
    public static void sendInfo(String message, String sid) {
        // 任意一个参数是空,返回false
        if (StringUtils.isAnyBlank(message, sid)) {
            log.debug("参数错误,sid:{},message:{}", sid, message);
        }
        log.debug("推送消息到窗口{},推送内容: {}", sid, message);
        WebSocketServerDto webSocketServer = webSocketMap.get(sid);
        if (webSocketServer != null) {
            Session session = webSocketServer.getSession();
            if (session == null) {
                log.error("{} 不存在session", sid);
                return;
            }
            // 不活跃了
            if (!session.isActive()) {
                // 移除
                Long userId = session.getAttribute(SecurityConstant.INNER_USER_ID);
                webSocketMap.remove(sid);
                UserInfoWebSocket.delSessionIdMap(userId, sid);
            }
            session.sendText(message);
        }
    }

}

WebSocket 需要推送的用户信息缓存

package com.biz.def;

import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * WebSocket信息缓存
 *
 * @author 夕四
 * @date 2022-02-23
 */
@Slf4j
public class UserInfoWebSocket {
    /**
     * key为锁id,value为锁id对应的用户列表
     */
    private static Map<Long, Set<String>> cachedMap = new HashMap<>();

    private static ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();

    /**
     * 根据用户id获取sessionId
     *
     * @param userId
     * @return
     */
    public static Set<String> getSessionIdSet(Long userId) {
        rwlock.readLock().lock();
        try {
            return cachedMap.get(userId);
        } finally {
            rwlock.readLock().unlock();
        }
    }

    /**
     * 根据用户id集合获取sessionId集合
     *
     * @param userIdList
     * @return
     */
    public static Set<String> getSessionIdSetByUserIdList(List<Long> userIdList) {
        rwlock.readLock().lock();
        Set<String> result = new HashSet<>();
        try {
            log.info("<<<<<<<<<<<<<<<<<<<<   cachedMap:{}",cachedMap);
            for (Long userId : userIdList) {
                Set<String> sidList = cachedMap.get(userId);
                if (sidList != null && !sidList.isEmpty()) {
                    result.addAll(sidList);
                }
            }
        } finally {
            rwlock.readLock().unlock();
        }
        return result;
    }

    /**
     * 获取sessionId集合
     *
     * @return
     */
    public static Set<String> getSessionIdSet() {
        rwlock.readLock().lock();
        Set<String> result = new HashSet<>();
        try {
            for (Map.Entry<Long, Set<String>> longSetEntry : cachedMap.entrySet()) {
                Set<String> sidList = longSetEntry.getValue();
                result.addAll(sidList);
            }
        } finally {
            rwlock.readLock().unlock();
        }
        return result;
    }

    /**
     * 添加用户id对应的sessionId
     *
     * @param userId
     * @param sid
     */
    public static void setSessionIdMap(Long userId, String sid) {
        rwlock.writeLock().lock();
        try {
            Set<String> list = cachedMap.get(userId);

            if (list == null) {
                list = new HashSet<>();
            }
            list.add(sid);
            cachedMap.put(userId, list);
        } finally {
            rwlock.writeLock().unlock();
        }
    }

    /**
     * 删除某个用户的sessionId
     *
     * @param userId
     * @param sid
     */
    public static void delSessionIdMap(Long userId, String sid) {
        rwlock.writeLock().lock();
        try {
            Set<String> set = cachedMap.get(userId);
            if (set == null) {
                return;
            }
            set.remove(sid);
            if (set.isEmpty()) {
                cachedMap.remove(userId);
            } else {
                cachedMap.put(userId, set);
            }
        } finally {
            rwlock.writeLock().unlock();
        }
    }


    /**
     * 删除用户id
     *
     * @param userId
     */
    public static void delUserId(Long userId) {
        rwlock.writeLock().lock();
        try {
            cachedMap.remove(userId);
        } finally {
            rwlock.writeLock().unlock();
        }
    }

}

webSocket传输对象

package com.biz.def;

import org.yeauty.pojo.Session;

import java.util.Date;
import java.util.Objects;

/**
 * webSocket传输对象
 */
public class WebSocketServerDto {
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 接收sid
     */
    private String sid;
    /**
     * 心跳时间
     */
    private Date heartTime;

    public WebSocketServerDto() {
    }

    public WebSocketServerDto(Session session, String sid, Date heartTime) {
        this.session = session;
        this.sid = sid;
        this.heartTime = heartTime;
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    public String getSid() {
        return sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    public Date getHeartTime() {
        return heartTime;
    }

    public void setHeartTime(Date heartTime) {
        this.heartTime = heartTime;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WebSocketServerDto that = (WebSocketServerDto) o;
        return Objects.equals(session, that.session) &&
                Objects.equals(sid, that.sid) &&
                Objects.equals(heartTime, that.heartTime);
    }

    @Override
    public int hashCode() {

        return Objects.hash(session, sid, heartTime);
    }

    @Override
    public String toString() {
        return "WebSocketServerDto{" +
                "session=" + session +
                ", sid='" + sid + '\'' +
                ", heartTime=" + heartTime +
                '}';
    }
}

websocket的推送消息体

package com.bsj.studentcard.upms.pc.biz.def;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * websocket推送消息
 *
 * @author 夕四
 * @date 2022-02-23 12:55
 **/
@Data
@NoArgsConstructor
public class WsMsgDataVO<T> {
    /**
     * 推送数据
     */
    private T data;
    /**
     * 标记标签
     */
    private String tag;
    public WsMsgDataVO(T data, String tag) {
        this.data = data;
        this.tag = tag;
    }
}

MQ的topic监听

package com.bsj.studentcard.upms.pc.biz.config.mq;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * 消息接口
 *
 * @author 夕四
 * @date 2022-05-14 16:25
 **/
public interface MySource {
    /**
     * 报警回调
     */
    String ALARM_SOS = "alarmSos";

    /**
     * 报警回调发送
     *
     * @return
     */
    @Input(ALARM_SOS)
    SubscribableChannel alarmSos();
}

MQ监听消息处理websock发送

package com.bsj.studentcard.upms.pc.biz.config.mq;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.bsj.studentcard.upms.pc.biz.controller.home.MyWebSocket;
import com.bsj.studentcard.upms.pc.biz.def.UserInfoWebSocket;
import com.bsj.studentcard.upms.pc.biz.def.WsMsgDataVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.MimeTypeUtils;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * 发送消息配置
 *
 * @author 夕四
 * @date 2022-05-14 16:29
 **/
@Slf4j
@Component
@RequiredArgsConstructor
public class MqSenderService {

    private final MySource source;

    /**
     * 需要MQ推送至websocket的topic
     *
     * @param msg
     */
    @StreamListener(MySource.ALARM_SOS)
    public void sosAlarm(Message<String> message) {
        String result = message.getPayload();
        if (StrUtil.isEmpty(result)) {
            log.warn("检测到报警消息为空");
            return;
        }
        List<AlarmLogDTO> oCardCallBackVOS = JSONObject.parseArray(result, AlarmLogDTO.class);
        if (CollUtil.isEmpty(oCardCallBackVOS)) {
            log.warn("检测到SOS报警消息为空");
            return;
        }
        log.info("监听到sos报警输出为:{}", result);
        //接下来的就是业务处理数据
        List<Long> cardIds = oCardCallBackVOS.stream().map(AlarmLogDTO::getCardId)
                .distinct().collect(Collectors.toList());
        List<UserDataVO> userDataVOS = CommonBaseCacheForest.listTopUserList(cardIds);
        HashMap<Long, UserDataVO> commonMap = userDataVOS.stream().collect(Collectors.toMap(UserDataVO::getCardId,
                Function.identity(), (key1, key2) -> key2, HashMap::new));
        for (AlarmLogDTO callBack : oCardCallBackVOS) {
            Long cardId = callBack.getCardId();
            //根据产生的数据查找对应推送到所属的用户
            UserDataVO userDataVO = commonMap.get(cardId);
            List<Long> userIds = userDataVO.getUserId();
            if (CollUtil.isEmpty(userIds)) {
                log.info("该数据不属于任何用户,cardId:{}", cardId);
                continue;
            }
            Set<String> sidSet = UserInfoWebSocket.getSessionIdSetByUserIdList(userIds);
            if (CollUtil.isEmpty(sidSet)) {
                log.info("当前用户没一个登录,不发送");
                continue;
            }
            //组装推送消息的消息体
            OAlarmLogPageVO oAlarmLogPageVO = formatData(callBack, commonDTO);
            WsMsgDataVO<OAlarmLogPageVO> msg = new WsMsgDataVO<OAlarmLogPageVO>(oAlarmLogPageVO, AlarmTypeEnum.SOS_ALARM.name());
            sidSet.forEach(sid -> MyWebSocket.sendInfo(JSONObject.toJSONString(msg), sid));
        }
    }

	/**
     * 每30秒对缓存起来的用户保持心跳连接,防止掉线
     *
     */
    @Scheduled(cron = "*/30 * * * * ?")
    public void keepHeartbeat() {
        Set<String> sidSet = UserInfoWebSocket.getSessionIdSet();
        WsMsgDataVO<String> msg = new WsMsgDataVO<String>("心跳连接", "heartBeat");
        sidSet.forEach(sid -> MyWebSocket.sendInfo(JSONObject.toJSONString(msg), sid));
    }
}

  网络协议 最新文章
如何判断SSL证书的安全性高低?越贵越好?懂
【UDS】ISO14229之0x22服务
测开面经汇总
DAY17、CSRF 漏洞
Gitlab SSRF 漏洞复现 CVE-2021-22214
Apache Calcite Avatica官方通告一个RCE漏洞
OpenSSL SSL_read: Connection was reset e
数据通信与网络:CH15 Connecting LANs Bac
TCP连接图(客户端/服务器端)
sscom串口网络数据调试器使用post方法向华为
上一篇文章      下一篇文章      查看所有文章
加:2022-06-29 19:26:03  更:2022-06-29 19:26:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
360图书馆 购物 三丰科技 阅读网 日历 万年历 2022年8日历 -2022/8/7 23:56:17-
图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码