- 假设有这样一个需求,前端需要实时提醒数据库进入的数据,或者监听数据库里面的数据里面有什么变动,需要后端及时的提醒前端消息,这里可以使用websocket实现后端主动推送数据给前端,可以用redis保存每个用户的session,然后分别给不同用户发送不同的消息,这个例子我用的是redis中的发布订阅的功能(subscribe,publish命令)
- redis中可以用subscribe …(例如 subscribe channer) 订阅了channer这个频道,换句话说就是创建了这个频道,哪个用户订阅了channer这个频道,就可以接收到里面的数据,可以用publish…(例如:sublish channer “往channer频道存入数据”)往频道中发送数据
- 这里是看别人的代码拉取下来学习看的,这里做一个笔记
- 下面先看代码
- 先把项目结构贴出来
- pom.xml文件的依赖
<dependencies>
<!-- web应用-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 操作redis第三方工具-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- websoket依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
- 项目启动的时候就加载,将订阅的频道给创建出来,就是相当于我已经订阅 了这三个频道了(订阅哪几个频道可以自己进行设置)
package com.demo.redisandwebsocket.config;
import com.demo.redisandwebsocket.listener.SubscribeListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import java.util.ArrayList;
import java.util.List;
@Configuration
public class RedisConfig {
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
List<Topic> list = new ArrayList<>();
list.add(new PatternTopic("ZHOU"));
list.add(new PatternTopic("DA"));
list.add(new PatternTopic("TOU"));
container.addMessageListener(new SubscribeListener(),list);
return container;
}
}
- 然后自定义SubscribeListener类,实现MessageListener,这个类是监听redis中的频道是否有数据进来了,如果有数据进来这个监听器就会监听到,并且触发里面onMessage方法
package com.demo.redisandwebsocket.listener;
import com.demo.redisandwebsocket.web.WebSocketServer;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import javax.annotation.Resource;
import javax.websocket.Session;
import java.io.IOException;
public class SubscribeListener implements MessageListener {
@Resource
private WebSocketServer webSocketServer;
private Session session;
public Session getSession() { return session; }
public void setSession(Session session) {
this.session = session;
}
@Override
public void onMessage(Message message, byte[] pattern) {
String msg = new String(message.getBody());
System.out.println(new String(pattern) + "主题发布:" + msg);
if (null != session && session.isOpen()) {
try {
session.getBasicRemote().sendText(msg);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- 然后是WebSocketConfig类,这个类是初始化websocket的配置,也是初始化的时候将他对象注入到sping容器当中,因为websocket连接是基于http协议连接的,连接上以后就跟http协议没有关系了,然后就基于websocket进行连接了
package com.demo.redisandwebsocket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
- springUtils工具类,目前没有能力解释,希望有人帮我解释一下
package com.demo.redisandwebsocket.util;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.stereotype.Component;
@Component
public final class SpringUtils implements BeanFactoryPostProcessor {
private static ConfigurableListableBeanFactory beanFactory;
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
SpringUtils.beanFactory = beanFactory;
}
public static ConfigurableListableBeanFactory getBeanFactory() {
return beanFactory;
}
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) throws BeansException {
return (T) getBeanFactory().getBean(name);
}
public static <T> T getBean(Class<T> clz) throws BeansException {
T result = (T) getBeanFactory().getBean(clz);
return result;
}
public static boolean containsBean(String name) {
return getBeanFactory().containsBean(name);
}
public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
return getBeanFactory().isSingleton(name);
}
public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
return getBeanFactory().getType(name);
}
public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
return getBeanFactory().getAliases(name);
}
}
- 这里就是将消息发送到前端的websocket的类了,里面有四个方法(四个注解)
package com.demo.redisandwebsocket.web;
import com.demo.redisandwebsocket.listener.SubscribeListener;
import com.demo.redisandwebsocket.util.SpringUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@ServerEndpoint("/websocket/server")
public class WebSocketServer {
private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);
private static AtomicInteger onlineCount=new AtomicInteger(0);
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
private Session session;
private SubscribeListener subscribeListener;
@OnOpen
public void onOpen(Session session){
this.session = session;
webSocketSet.add(this);
addOnlineCount();
System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
subscribeListener = new SubscribeListener();
subscribeListener.setSession(session);
List<Topic> list = new ArrayList<>();
list.add(new PatternTopic("ZHOU"));
list.add(new PatternTopic("DA"));
list.add(new PatternTopic("TOU"));
redisMessageListenerContainer.addMessageListener(subscribeListener, list);
}
@OnClose
public void onClose() throws IOException {
webSocketSet.remove(this);
subOnlineCount();
redisMessageListenerContainer.removeMessageListener(subscribeListener);
System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
}
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("来自客户端的消息:" + message);
for(WebSocketServer item: webSocketSet){
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
continue;
}
}
}
@OnError
public void onError(Session session, Throwable error){
System.out.println("发生错误");
error.printStackTrace();
}
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
public int getOnlineCount() {
return onlineCount.get();
}
public void addOnlineCount() {
WebSocketServer.onlineCount.getAndIncrement();
}
public void subOnlineCount() {
WebSocketServer.onlineCount.getAndDecrement();
}
}
在这里插入代码片<!doctype html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="utf-8"></meta>
<title>websocket</title>
</head>
<h4>
使用redis订阅消息和websocket实现消息推送
</h4>
<br/>
<h5>收到的订阅消息:</h5>
<div id="message_id"></div>
</body>
<script type="text/javascript">
var websocket = null;
if("WebSocket" in window){
var url = "ws://localhost:8080/demo/websocket/server";
websocket = new WebSocket(url);
}else{
alert("浏览器不支持websocket");
}
websocket.onopen = function(event){
setMessage("打开连接");
}
websocket.onclose = function(event){
setMessage("关闭连接");
}
websocket.onmessage = function(event){
setMessage(event.data);
}
websocket.onerror = function(event){
setMessage("连接异常");
}
window.onbeforeunload = function(){
closeWebsocket();
}
function closeWebsocket(){
if(3!=websocket.readyState){
websocket.close();
}else{
alert("websocket之前已经关闭");
}
}
function setMessage(message){
document.getElementById('message_id').innerHTML += message + '<br/>';
}
</script>
</html>
如果我推送消息到ZHOU或者DA或者TOU,前端就会立马的收到消息 有兴趣的人可以用这个做一个聊天室,因为都是实时提醒的
- 后面贴出效果图
- 这里需要注意一下,如果在项目初始化的时候并没有创建的频道,消息是推送不进去的
- 这里顺便把配置文件贴出来application.properties
server.port=8080
server.servlet.context-path=/demo
server.tomcat.uri-encoding=utf-8
#reids config
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.jedis.pool.max-active=30
启动项目访问localhost:8080/demo/websocket.html就可以测试了 到这里就已经结束了 这个是一个redis中的(消息的发布/订阅)和websocket的消息推送整合
源码:https://gitee.com/zhou-datou/redis-websocket
|