IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> websocket通过redis实现集群方案 -> 正文阅读

[网络协议]websocket通过redis实现集群方案

websocket通过redis实现集群方案

一、前言

1、使用websocket前后端通信时,若后台是集群部署,那么连接只能与其中一台握手建立连接,当nginx做负载后触发节点在未与websocket建立连接的服务上,此时就会出现问题。

二、解决方案

1、使用redis的发布订阅方式。(mq同理)
2、当需要websocket需求发送消息时,使用redis的发布订阅功能,将消息推送到redis中,所有需求消息的服务都监听这个Topic,接收到消息后,判断是否与前端建立连接,如果建立连接,将消息通过websocket发送出去,前端接收。若没有连接直接跳过即可。

三、解决步骤

1、引入websocket和redis依赖

		<!--webSocket-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>
		<!--redis相关-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2、建一个websocket的配置

@Configuration
public class WebSocketConfig {

	@Bean
	public ServerEndpointExporter serverEndpointExporter() {
		return new ServerEndpointExporter();
	}

}

3、建websocket核心服务

@Slf4j
@Component
@ServerEndpoint("/socketServer/{userCode}")
public class WebSocketServer {

	/**
	 * nginx
	 * location / {
	 *           #  root   html;
	 *                 proxy_pass  http://base-web;
	 * 				proxy_set_header Upgrade $http_upgrade;
	 *         	proxy_set_header Connection "upgrade";
	 * 			proxy_set_header   Host    $host;
	 * 			proxy_set_header   X-Real-IP   $remote_addr;
	 * 			proxy_set_header   X-Forwarded-For $proxy_add_x_forwarded_for;
	 *            # index  index.html index.htm;
	 *  }
	 *
	 *
	 */

	/**
	 * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的
	 */
	private static int onlineCount = 0;
	/**
	 * 用来存放每个客户端对应的MyWebSocket对象
	 */
	private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
	/**
	 * 与某个客户端的连接会话,需要通过它来给客户端发送数据
	 */

	private Session session;
	/**
	 * 接收 userCode
	 */
	private String userCode = "";

	/**
	 * 连接建立成功调用的方法
	 */
	@OnOpen
	public void onOpen(Session session, @PathParam("userCode") String userCode) {
		this.session = session;
		this.userCode = userCode;
		if (webSocketMap.containsKey(userCode)) {
			webSocketMap.remove(userCode);
			webSocketMap.put(userCode, this);
		} else {
			webSocketMap.put(userCode, this);
			addOnlineCount();
		}
		log.info("用户连接:" + userCode + ",当前在线人数为:" + getOnlineCount());
		try {
			sendMessage("连接成功");
		} catch (IOException e) {
			log.error("用户:" + userCode + ",网络异常");
		}
	}

	/**
	 * 连接关闭调用的方法
	 */
	@OnClose
	public void onClose() {
		if (webSocketMap.containsKey(userCode)) {
			webSocketMap.remove(userCode);
			subOnlineCount();
		}
		log.info("用户退出:" + userCode + ",当前在线人数为:" + getOnlineCount());
	}

	/**
	 * 收到客户端消息后调用的方法
	 *
	 * @param message 客户端发送过来的消息
	 */
	@OnMessage
	public void onMessage(String message, Session session) {
		log.info("用户消息:" + userCode + ",报文:" + message);
		//可以群发消息
		//消息保存到数据库、redis
		if (StringUtils.isNotBlank(message)) {
			try {
				//解析发送的报文
				JSONObject jsonObject = JSON.parseObject(message);
				//追加发送人(防止串改)
				jsonObject.put("fromUserId", this.userCode);
				String toUserCode = jsonObject.getString("toUserId");
				//传送给对应toUserId用户的websocket
				if (StringUtils.isNotBlank(toUserCode) && webSocketMap.containsKey(toUserCode)) {
					webSocketMap.get(toUserCode).sendMessage(jsonObject.toJSONString());
				} else {
					log.error("请求的toUserCode:" + toUserCode + "不在该服务器上");
					//否则不在这个服务器上,发送到mysql或者redis
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * @param session
	 * @param error
	 */
	@OnError
	public void onError(Session session, Throwable error) {
		log.error("用户错误:" + this.userCode + ",原因:" + error.getMessage());
		error.printStackTrace();
	}

	/**
	 * 实现服务器主动推送
	 */
	public void sendMessage(String message) throws IOException {
		this.session.getBasicRemote().sendText(message);
	}

	/**
	 * 发送自定义消息
	 */
	public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
		log.info("发送消息到:" + userId + ",报文:" + message);
		if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
			webSocketMap.get(userId).sendMessage(message);
		} else {
			log.error("用户" + userId + ",不在线!");
		}
	}

	public static synchronized int getOnlineCount() {
		return onlineCount;
	}

	public static synchronized void addOnlineCount() {
		WebSocketServer.onlineCount++;
	}

	public static synchronized void subOnlineCount() {
		WebSocketServer.onlineCount--;
	}

}

4、前端代码

if (typeof (WebSocket) == "undefined") {
					console.log("您的浏览器不支持WebSocket");
				} else {
					const _this = this;
					console.log("您的浏览器支持WebSocket");
					//实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
					//等同于socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
					//var socketUrl="${request.contextPath}/im/"+$("#userId").val();
					var socketUrl = "http://localhost:80/socketServer/" + this.userCode;
					socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
					console.log(socketUrl);
					if (this.socket != null) {
						this.socket.close();
						this.socket = null;
					}
					this.socket = new WebSocket(socketUrl);
					//打开事件
					this.socket.onopen = function () {
						console.log("websocket已打开");
						//socket.send("这是来自客户端的消息" + location.href + new Date());
					};
					//获得消息事件
					this.socket.onmessage = function (msg) {
						console.log(msg.data);
						//发现消息进入    开始处理前端触发逻辑
						_this.getList();
					};
					//关闭事件
					this.socket.onclose = function () {
						console.log("websocket已关闭");
					};
					//发生了错误事件
					this.socket.onerror = function () {
						console.log("websocket发生了错误");
					}
				}

5、测试是否可以连接
在这里插入图片描述
6、加入redis发布订阅
(1)redis发布消息
topic:topic名称
msg:发送的消息内容

@Component
public class RedisTopicSendTemplate {

	@Autowired
	private RedisTemplate<String, Object> redisTemplate;

	/**
	 * 发送消息
	 * @param msg
	 */
	public void sendMsg(String topic, String msg) {
		redisTemplate.convertAndSend(topic, msg);
	}
}

(2)监听消息

@Component
@Slf4j
public class SysUserListener implements MessageListener {

	/**
	 * 消息监听
	 * @param message
	 * @param pattern
	 */
	@SneakyThrows
	@Override
	public void onMessage(Message message, byte[] pattern) {
		log.info("监听到redis消息:{}", message);
		byte[] body = message.getBody();
		if (body.length > 0){
			String userCode = new String(message.getBody(), StandardCharsets.UTF_8).trim().replaceAll("\"", "");
			// 发送消息
			WebSocketServer.sendInfo(userCode, userCode);
		}
	}
}
@Configuration
public class SubscriptionConfig {

	@Bean
	MessageListenerAdapter messageListener() {
		return new MessageListenerAdapter(new SysUserListener());
	}

	@Bean
	RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
		container.setConnectionFactory(factory);
		// 支持多个topic
		container.addMessageListener(messageListener(), new ChannelTopic(RedisConstant.SYS_USER_TOPIC));
		return container;
	}
}

7、nginx配置
在这里插入图片描述

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-05-05 11:56:47  更:2022-05-05 11:59:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/29 11:22:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计