SpringCloud ZUUL集群 + Nginx + Redis 实现Websocket向客户端推送消息
简介
本文主要是针对分布式场景下的使用websocket的一个解决方案。很遗憾的是,websocketsession是不支持序列化操作,所以也就不可能存在redis中。
我们知道在单节点中我们只需要把websocketsession存储在Map中就OK,每次发送通知都从map中根据clientID获取对应的websocket的session进行消息通知。但是在分布式多节点的系统中,每个节点的websocketsession是存在当前节点的内存中的,当A服务向A客户端推送消息时,B服务并不知道,此时B客户端就会无动于衷。所以存在websocketsession共享的问题,本文通过redis订阅广播的消息实现多节点服务同时向客户端推送消息。
Nginx配置
Nginx配置:
-
Windows Nginx安装: 官网下载链接: link. 选择Windows版本下载并解压 双击nginx.exe即可启动nginx 解压之后把前端文件放在nginx服务器上(nginx文件夹下) -
在启动Nginx之前要配置nginx.conf,配置文件在conf文件夹下
#user nobody;
worker_processes 1;
error_log logs/error.log;
error_log logs/error.log notice;
error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 1024;
}
http {
# 开启nginx对websocket的支持
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
include mime.types;
default_type application/octet-stream;
#log_format main '$remote_addr - $remote_user [$time_local] "$request" '
# '$status $body_bytes_sent "$http_referer" '
# '"$http_user_agent" "$http_x_forwarded_for"';
#access_log logs/access.log main;
sendfile on;
#tcp_nopush on;
#keepalive_timeout 0;
keepalive_timeout 65;
#gzip on;
#配置上游服务器网关端口集群
#upstream backServer{
# 127.0.0.1:81 为网关地址 weight 为权重,值越大,访问到该台网关的几率越大
#server 127.0.0.1:8999 weight=1;
#server 127.0.0.1:82 weight=1;
#}
#配置上游服务器 集群,默认轮询机制
upstream backServer{
#每个请求按访问ip的hash结果分配,这样每个访客固定访问一个后端服务器,可以解决session的问题.可查看参考Nginx的Upstream5种分配的方式
ip_hash;
server 127.0.0.1:8999; # 网关地址
server 127.0.0.1:8777; # 网关地址
# 补充: backup表示从服务器或者叫备用服务器 只有当主服务器(81、82端口)都不能访问时才会访问此(83端口)备用服务器 当主服务器恢复正常后 则访问主服务器
#server 127.0.0.1:83 backup;
}
server {
# 监听的请求地址及端口号
listen 8200;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
root C:/nginx/nginx-1.21.1/front/;
index login.html;
}
# /aaa 代表请求地址中包含/aaa的会被分发到网关地址
location /aaa {
proxy_pass http://backServer;
proxy_redirect default;
# 开启跨域支持
add_header Access-Control-Allow-Origin *;
add_header Access-Control-Allow-Methods *;
add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization';
proxy_set_header Host $host:$server_port;
proxy_set_header X-Real-IP $remote_addr;
}
# /websocket-env-data 拦截并分发到websocket的地址
location /websocket-env-data {
proxy_pass http://backServer;
proxy_redirect default;
add_header Access-Control-Allow-Origin *;
add_header Access-Control-Allow-Methods *;
add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization';
proxy_set_header Host $host:$server_port;
proxy_set_header X-Real-IP $remote_addr;
proxy_http_version 1.1;
# 开启nginx对websocket的支持,会将http请求转为websocket请求
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
}
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /login.html {
root front;
}
# proxy the PHP scripts to Apache listening on 127.0.0.1:80
#
#location ~ \.php$ {
# proxy_pass http://127.0.0.1;
#}
# pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
#
#location ~ \.php$ {
# root html;
# fastcgi_pass 127.0.0.1:9000;
# fastcgi_index index.php;
# fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name;
# include fastcgi_params;
#}
# deny access to .htaccess files, if Apache's document root
# concurs with nginx's one
#
#location ~ /\.ht {
# deny all;
#}
}
# another virtual host using mix of IP-, name-, and port-based configuration
#
#server {
# listen 8000;
# listen somename:8080;
# server_name somename alias another.alias;
# location / {
# root html;
# index index.html index.htm;
# }
#}
# HTTPS server
#
#server {
# listen 443 ssl;
# server_name localhost;
# ssl_certificate cert.pem;
# ssl_certificate_key cert.key;
# ssl_session_cache shared:SSL:1m;
# ssl_session_timeout 5m;
# ssl_ciphers HIGH:!aNULL:!MD5;
# ssl_prefer_server_ciphers on;
# location / {
# root html;
# index index.html index.htm;
# }
#}
}
Zuul websocket配置
- zuul集群此处不再详细配置,具体参考网上教程
- MyRequestInterceptor.java
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import javax.servlet.http.HttpServletRequest;
@Configuration
@Component
public class MyRequestInterceptor implements RequestInterceptor {
@Autowired
HttpServletRequest request;
@Override
public void apply(RequestTemplate requestTemplate) {
try {
String sessionId = RequestContextHolder.currentRequestAttributes().getSessionId();
if (null != sessionId) {
requestTemplate.header("Cookie", "SESSION=" + sessionId);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
SessionConfig.java
import jdk.nashorn.internal.runtime.GlobalConstants;
import org.springframework.context.annotation.Configuration;
import org.springframework.session.data.redis.RedisFlushMode;
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;
@Configuration
@EnableRedisHttpSession(maxInactiveIntervalInSeconds = 86400*30, redisFlushMode = RedisFlushMode.IMMEDIATE)
public class SessionConfig {
}
WebSocketFilter.java
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
@Component
public class WebSocketFilter extends ZuulFilter {
@Override
public String filterType() {
return "pre";
}
@Override
public int filterOrder() {
return 0;
}
@Override
public boolean shouldFilter() {
return true;
}
@Override
public Object run() {
RequestContext context = RequestContext.getCurrentContext();
HttpServletRequest request = context.getRequest();
System.out.println("request1:"+request.getHeader("host").toString());
String upgradeHeader = request.getHeader("Upgrade");
if (null == upgradeHeader) {
upgradeHeader = request.getHeader("upgrade");
}
if (null != upgradeHeader && "websocket".equalsIgnoreCase(upgradeHeader)) {
context.addZuulRequestHeader("connection", "Upgrade");
}
return null;
}
}
在ZuulApplication开启Websocket过滤
@Bean
public WebSocketFilter webSocketFilter() {
return new WebSocketFilter();
}
在zuul的配置文件中加入
## @FeignClient(value = "服务名r") 设置可以有多个类存在相同的FeignClient 中的value值
spring.main.allow-bean-definition-overriding=true
在pom.xml中加入相关jar包
<!-- https://mvnrepository.com/artifact/org.springframework.session/spring-session-data-redis -->
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.data/spring-data-redis -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
Redis配置及websocket配置
RedisConfig
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Bean
public RedisTemplate<String, Object> functionDomainRedisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
initDomainRedisTemplate(redisTemplate, redisConnectionFactory);
return redisTemplate;
}
private void initDomainRedisTemplate(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) {
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new JdkSerializationRedisSerializer());
redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
redisTemplate.setConnectionFactory(factory);
}
@Bean
public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForHash();
}
@Bean
public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForValue();
}
@Bean
public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForList();
}
@Bean
public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForSet();
}
@Bean
public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForZSet();
}
}
RedisMsg
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
@Component
public interface RedisMsg {
void afterConnectionEstablished(WebSocketSession session) throws Exception;
void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception;
void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;
void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception;
boolean supportsPartialMessages();
public void receiveMessage(String message);
}
RedisPublishConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
@Configuration
@Component
public class RedisPublishConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(RedisMsg receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
SpringUtilsCopy WebsocketHandler不支持自动注入
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.stereotype.Component;
@Component
public class SpringUtilsCopy implements BeanFactoryPostProcessor {
private static ConfigurableListableBeanFactory beanFactory;
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
SpringUtilsCopy.beanFactory = beanFactory;
}
public static ConfigurableListableBeanFactory getBeanFactory() {
return beanFactory;
}
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) throws BeansException {
return (T) getBeanFactory().getBean(name);
}
public static <T> T getBean(Class<T> clz) throws BeansException {
T result = (T) getBeanFactory().getBean(clz);
return result;
}
public static boolean containsBean(String name) {
return getBeanFactory().containsBean(name);
}
public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
return getBeanFactory().isSingleton(name);
}
public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
return getBeanFactory().getType(name);
}
public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
return getBeanFactory().getAliases(name);
}
}
WebSocketClient
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class WebSocketClient {
@Autowired
private RedisTemplate redisTemplate;
public void pushInfo(String messageContent,String userName){
JSONObject result = new JSONObject();
if(StringUtil.isBlank(messageContent)) {
result.put("result", "error");
}else {
try {
JSONObject message = new JSONObject();
message.put("userName", userName);
message.put("message", messageContent);
redisTemplate.convertAndSend("chat",message.toString());
} catch (Exception e) {
e.printStackTrace();
log.error("推送给客户端失败");
}
result.put("result", "success");
}
return ;
}
public void pushInfoToAll(String messageContent){
JSONObject result = new JSONObject();
if(StringUtil.isBlank(messageContent)) {
result.put("result", "error");
}else {
try {
JSONObject message = new JSONObject();
message.put("userName", "");
message.put("message", messageContent);
redisTemplate.convertAndSend("chat",message.toString());
} catch (Exception e) {
e.printStackTrace();
log.error("推送给客户端失败");
}
result.put("result", "success");
}
return ;
}
public void pushInfoToUsers(String messageContent, List<SysUser> userList){
JSONObject result = new JSONObject();
if(StringUtil.isBlank(messageContent)) {
result.put("result", "error");
}else {
try {
userList.stream().forEach(p ->{
JSONObject message = new JSONObject();
message.put("userName", p.getLoginName());
message.put("message", messageContent);
redisTemplate.convertAndSend("chat",message.toString());
});
} catch (Exception e) {
e.printStackTrace();
log.error("推送给客户端失败");
}
result.put("result", "success");
}
return ;
}
}
WebSocketConfig
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new CTIHandler(), "/websocket-env-data/{userId}")
.addInterceptors(new WebSocketInterceptor())
.setAllowedOrigins("*");
registry.addHandler(new CTIHandler(), "/sock-js")
.addInterceptors(new WebSocketInterceptor())
.setAllowedOrigins("*")
.withSockJS();
registry.addHandler(new CTIHandler(), "/websocket-env-data1")
.addInterceptors(new WebSocketInterceptor())
.setAllowedOrigins("*");
}
}
WebSocketInterceptor
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
import java.util.Map;
@Slf4j
public class WebSocketInterceptor extends HttpSessionHandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse seHttpResponse,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
String[] aa = serverHttpRequest.getURI().toString().split("/");
String userName = aa[aa.length-1];
attributes.put("userName", userName);
log.info("握手之前");
return super.beforeHandshake(serverHttpRequest, seHttpResponse, wsHandler, attributes);
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception ex) {
log.info("握手之后");
super.afterHandshake(request, response, wsHandler, ex);
}
}
CTIHandler
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
public class CTIHandler implements WebSocketHandler,RedisMsg{
private static ConcurrentHashMap<String, WebSocketSession> socketMap = new ConcurrentHashMap<String, WebSocketSession>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
log.info("websocket连接成功");
String userName = (String) session.getAttributes().get("userName");
if(socketMap.get(userName)==null) {
socketMap.put(userName,session);
sendMessageToUser(userName, new TextMessage("链接建立成功"));
}
}
@Override
public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
String userName = (String) webSocketSession.getAttributes().get("userName");
synchronized (webSocketSession){
webSocketSession.sendMessage(new TextMessage("aaa"));
}
sendMessageToUser(userName, new TextMessage("我收到你的信息了"));
}
public boolean sendMessageToUser(String clientId, TextMessage message) {
socketMap.forEach((key, value) -> {
boolean flag = true;
if(StringUtil.isNotBlank(clientId)){
if(key.contains(clientId+"websocket")){
WebSocketSession session = value;
if(session==null) {
flag = false;
}
if (!session.isOpen()) {
flag = false;
}
try {
if(flag){
synchronized (session){
session.sendMessage(message);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}else{
WebSocketSession session = value;
if(session==null) {
flag = false;
}
if (!session.isOpen()) {
flag = false;
}
try {
if(flag){
synchronized (session){
session.sendMessage(message);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
});
return true;
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
if (session.isOpen()) {
session.close();
}
log.info("连接出错");
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String userName = (String) session.getAttributes().get("userName");
if(socketMap.get(userName)!=null) {
socketMap.remove(userName);
}
log.info("连接已关闭:" + status);
}
@Override
public boolean supportsPartialMessages() {
return false;
}
@Override
public void receiveMessage(String message) {
JSONObject sendMsg = JSONObject.parseObject(message.substring(message.indexOf("{")));
String clientId = sendMsg.getString("userName");
TextMessage receiveMessage = new TextMessage(sendMsg.getString("message"));
boolean flag = sendMessageToUser(clientId, receiveMessage);
if(flag) {
log.info("推送消息("+sendMsg.getString("message")+")成功!!!");
}
}
}
调用WebSocketClient的pushInfo方法即可实现推送
前端代码
let callback = null;
let ws = null;
let close = null;
let lockReconnect = false;
let stop = false;
let closeCallback = null;
let errorCallback = null
function getIp() {
var ip
if (httpUrl) {
return ip = httpUrl.split(":")[1]
}
}
function initWebSocket() {
if (typeof WebSocket == "undefined") {
console.log("当前浏览器 Not support websocket");
} else {
var loginName = JSON.parse(localStorage.getItem('userInfo')).loginName;
var userToken = localStorage.getItem('token');
var userFlag = loginName + "websocket" + userToken;
var wsUrl = "ws:" + getIp() + ":8080/websocket-env-data/"+userFlag;
console.log("websocket URL:"+wsUrl);
if (window.soketFlag == null) {
ws = new WebSocket(wsUrl);
window.soketFlag = ws;
} else {
ws = window.soketFlag;
}
ws.onopen = function () {
heartCheck.reset().start();
console.log("WebSocket连接成功");
};
ws.onerror = function () {
console.log("WebSocket连接失败1");
reconnect(wsUrl);
console.log("WebSocket连接失败2");
if (typeof errorCallback === "function") {
errorCallback("WebSocket连接失败");
}
};
ws.onclose = function () {
reconnect(wsUrl);
console.log("WebSocket关闭");
if (typeof closeCallback === "function") {
closeCallback("WebSocket关闭");
}
};
close = ws.onclose;
ws.onmessage = function (e) {
heartCheck.reset().start();
if (typeof callback === "function") {
callback(e.data);
}
};
}
}
function setStop() {
stop = true;
}
function reconnect(url) {
if (stop) {
return;
}
if (lockReconnect) {
return;
}
lockReconnect = true;
setTimeout(function () {
console.log("重连中");
initWebSocket();
lockReconnect = false;
}, 2000);
}
var heartCheck = {
timeout: 1000 * 25,
timeoutObj: null,
reset: function () {
clearTimeout(this.timeoutObj);
return this;
},
start: function () {
var self = this;
this.timeoutObj = setTimeout(function () {
ws.send("HeartBeat");
}, this.timeout);
}
};
function websock(sendData) {
if (ws.readyState === ws.OPEN) {
} else if (ws.readyState === ws.CONNECTING) {
setTimeout(function () {
websock(sendData);
}, 1000);
} else if (ws.readyState === ws.CLOSED) {
setTimeout(function () {
initWebSocket();
websock(sendData);
}, 1000);
} else {
setTimeout(function () {
websock(sendData);
}, 1000);
}
}
function bingWebsockMsg(call) {
callback = call;
}
function wsOnCloseMsg(call) {
closeCallback = call;
}
function wsOnErrorMsg(call) {
errorCallback = call
}
function closeWs() {
close();
}
function divShow(e) {
if (e == "消息发布") {
wsFlag = true
isHasMsg2()
}
}
|