解决webSocket在针对指定用户推送消息时,需要保存用户id至redis中,保存redis时会报Null pointer exception
最近在做项目时遇到一个问题:websocket针对指定用户推送消息时,需要把用户的socketId保存到redis中。存入redis的key是一个字符串常量"socketNumber",value是socketId列表的json串。每当用户建立连接之后,执行到"存储socketId到redis"这一步就会报空指针异常。定义的key是一个静态常量,没理由存不到redis中,百思不得其解。接下来先贴一段报空指针异常的websocket服务类代码,然后分析报错原因,并给出解决方法。
websocket服务类方法
下面的代码中,redisClient工具类通过@Autowired导入,redis的key(socketNumber)定义为静态常量,onOpen方法负责把用户的socketId存入redis。每次断点走到 redisClient.set(socketNumber ,JsonUtil.toJson(socketIds)); 这一行总会报空指针,但是socketNumber定义的是静态常量,没有道理值不存在。
// Failing version, kept as the article's "before" example: RedisClient is wired with @Autowired and
// the surrounding article reports a NullPointerException at the redisClient.set(...) call in onOpen.
// NOTE: this listing is cut off inside sendtoAll in the article; the class is not closed here.
@ServerEndpoint(value = "/webSocket/{socketId}", encoders = {WebSocketCustomEncoding.class})
@Component
public class WebSocketServer {
private static final AriesJcLogger logger = AriesJcLoggerFactory.getLogger(WebSocketServer.class);
// Counter adjusted by addOnlineCount()/subOnlineCount() (those methods are not shown in this listing).
private static AtomicInteger online = new AtomicInteger();
// Open sessions keyed by the socketId path parameter.
private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
// Every socketId passed to onOpen; serialized to JSON and stored in redis under socketNumber.
private static List<String> socketIds = new ArrayList<>();
// Redis key under which the JSON list of socket ids is stored.
public static String socketNumber ="socketNumber";
// Per the article, this field is null on the endpoint instance that handles a connection,
// which is what makes redisClient.set(...) below throw.
@Autowired
private RedisClient redisClient;
// Sends one message to the given session; a null session is silently skipped.
public void sendMessage(Session session, BusinessRealTimeData message) throws IOException, EncodeException {
if (session != null) {
session.getBasicRemote().sendObject(message);
}
}
// Connection opened: record the socket id, publish the id list to redis, register the session.
@OnOpen
public void onOpen(Session session, @PathParam(value = "socketId") String socketId) {
socketIds.add(socketId);
// The article reports the NullPointerException on this line (redisClient is null, not the key).
redisClient.set(socketNumber ,JsonUtil.toJson(socketIds));
sessionPools.put(socketId, session);
addOnlineCount();
// NOTE(review): the format string has two {} placeholders but only one argument is passed.
logger.info("====加入新连接session:{}!当前在线人数为:{}", session.getId());
}
// Connection closed: remove this socket's entries from redis, then drop the local session.
@OnClose
public void onClose(@PathParam(value = "socketId") String socketId) {
redisClient.removeForSet(socketNumber, socketId);
// The values under "plate_<socketId>" and "camera_<socketId>" are read as JSON arrays of strings
// and passed back to removeForSet for the same key.
String plate = redisClient.get("plate_" + socketId);
List<String> plates = JSONObject.parseArray(plate,String.class);
redisClient.removeForSet("plate_"+socketId,plates);
String camera = redisClient.get("camera_"+socketId);
List<String> cameras = JSONObject.parseArray(camera,String.class);
redisClient.removeForSet("camera_"+socketId,cameras);
sessionPools.remove(socketId);
subOnlineCount();
logger.info("====关闭连接userid:{}", socketId);
}
// Incoming text message: parsed as BusinessRealTimeData and sent to every registered session.
@OnMessage
public void onMessage(String message) throws IOException {
// NOTE(review): two {} placeholders, one argument; the message fills the "session" slot.
logger.debug("====来自客户端的消息session:{}---message:{}", message);
for (Session session : sessionPools.values()) {
try {
// The payload is parsed again on every iteration of the loop.
BusinessRealTimeData businessRealTimeData = JSONObject.parseObject(message, BusinessRealTimeData.class);
sendMessage(session, businessRealTimeData);
} catch (Exception e) {
// A failure for one session is printed and the loop moves on to the next session.
e.printStackTrace();
continue;
}
}
}
// Error callback: logs the throwable and also prints its stack trace.
@OnError
public void onError(Session session, Throwable throwable) {
logger.error("websocket 发生错误", throwable);
throwable.printStackTrace();
}
// Sends a message to the single session registered under socketId; failures are printed, not rethrown.
public void sendInfo(String socketId, BusinessRealTimeData message) {
logger.debug("发送指定客户端的消息:{}", JsonUtil.toJson(message));
logger.debug("发送指定客户端socketId:{}", JsonUtil.toJson(socketId));
// May be null for an unknown id; sendMessage skips a null session.
Session session = sessionPools.get(socketId);
try {
sendMessage(session, message);
} catch (Exception e) {
e.printStackTrace();
}
}
// Sends a message to every registered session; a failure for one session does not stop the others.
public void sendtoAll(BusinessRealTimeData message) throws IOException {
logger.debug("发送客户端的消息:{}", JsonUtil.toJson(message));
for (Session session : sessionPools.values()) {
try {
sendMessage(session, message);
} catch (Exception e) {
e.printStackTrace();
continue;
}
问题分析
在@ServerEndpoint注解类中使用注解@Autowired注入RedisClient,注入RedisClient失败,redisClient字段为null,因此报空指针异常(为空的是redisClient对象,而不是静态常量key)。 为什么在websocket中不能通过@Autowired注解注入呢? @Autowired是在Spring创建并初始化bean实例时完成注入的。项目初始化时,Spring按默认的单例模式初始化了一个websocket对象,此时的websocket对象没有建立连接,它的依赖被成功注入。而每当用户建立连接时,websocket容器都会为该连接重新创建一个新的端点实例,一个websocket端点可以建立多个连接,因此会创建多个端点实例对象。这些后来创建的实例不是由Spring创建和管理的,Spring不会对它们执行依赖注入,所以其中的redisClient字段始终为null,调用redisClient.set时就会报空指针异常。
解决方法
如何解决无法注入的问题?可以通过使用 getBean的方法主动获取实例。
/**
 * Static accessor for the Spring bean factory, for objects that Spring does not instantiate
 * itself and therefore cannot inject into.
 *
 * <p>The factory reference is captured in {@link #postProcessBeanFactory} and stays {@code null}
 * until that callback has run. The two {@code getBean} overloads tolerate a missing factory and
 * return {@code null}; the remaining helpers dereference the factory directly.
 */
@Component
public final class SpringUtils implements BeanFactoryPostProcessor {

    /** Bean factory handed over by Spring; {@code null} before the post-processor callback. */
    private static ConfigurableListableBeanFactory beanFactory;

    /** Stores the factory passed in by Spring so the static helpers can reach it. */
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SpringUtils.beanFactory = beanFactory;
    }

    /** @return the captured bean factory, or {@code null} if none has been captured yet */
    public static ConfigurableListableBeanFactory getBeanFactory() {
        return beanFactory;
    }

    /**
     * Looks a bean up by name.
     *
     * @param name bean name
     * @param <T> type the caller expects; the cast is unchecked
     * @return the bean, or {@code null} when no bean factory has been captured
     * @throws BeansException if the factory cannot provide the bean
     */
    public static <T> T getBean(String name) throws BeansException {
        ConfigurableListableBeanFactory factory = getBeanFactory();
        if (factory != null) {
            @SuppressWarnings("unchecked")
            T bean = (T) factory.getBean(name);
            return bean;
        }
        System.out.println("本地调试Main模式,没有BeanFactory,忽略错误");
        return null;
    }

    /**
     * Looks a bean up by type.
     *
     * @param name required bean type
     * @param <T> bean type
     * @return the bean, or {@code null} when no bean factory has been captured
     * @throws BeansException if the factory cannot provide the bean
     */
    public static <T> T getBean(Class<T> name) throws BeansException {
        ConfigurableListableBeanFactory factory = getBeanFactory();
        if (factory != null) {
            return factory.getBean(name);
        }
        System.out.println("本地调试Main模式,没有BeanFactory,忽略错误");
        return null;
    }

    /** @return whether the factory reports a bean with the given name */
    public static boolean containsBean(String name) {
        ConfigurableListableBeanFactory factory = getBeanFactory();
        return factory.containsBean(name);
    }

    /** @return whether the factory reports the named bean as a singleton */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
        ConfigurableListableBeanFactory factory = getBeanFactory();
        return factory.isSingleton(name);
    }

    /** @return the type the factory reports for the named bean */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
        ConfigurableListableBeanFactory factory = getBeanFactory();
        return factory.getType(name);
    }

    /** @return the aliases the factory reports for the named bean */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
        ConfigurableListableBeanFactory factory = getBeanFactory();
        return factory.getAliases(name);
    }
}
在websocket服务类中,可以通过 private RedisClient redisClient = SpringUtils.getBean(RedisClient.class); 的方式(字段上仍保留@Resource注解)导入redis工具类。这样即使端点实例不是由Spring创建的,字段初始化时也会主动从Spring容器中获取RedisClient:
/**
 * WebSocket endpoint that tracks connected clients by the {@code socketId} path parameter and
 * pushes {@link BusinessRealTimeData} messages to one client or to all of them.
 *
 * <p>{@code RedisClient} is looked up through {@code SpringUtils.getBean} in the field
 * initializer, so the field is populated on every instance of this class no matter who
 * constructs it, instead of relying on annotation-driven injection alone.
 *
 * <p>All bookkeeping (sessions, socket ids, online counter) is static and therefore shared by
 * every endpoint instance.
 */
@ServerEndpoint(value = "/webSocket/{socketId}", encoders = {WebSocketCustomEncoding.class})
@Component
public class WebSocketServer {
    private static final AriesJcLogger logger = AriesJcLoggerFactory.getLogger(WebSocketServer.class);

    /** Number of currently open connections. */
    private static final AtomicInteger online = new AtomicInteger();

    /** Open sessions keyed by socket id. */
    private static final ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();

    /** Socket ids of open connections; every access must hold the monitor of this list. */
    private static final List<String> socketIds = new ArrayList<>();

    /** Redis key under which the JSON list of socket ids is stored. */
    public static final String socketNumber = "socketNumber";

    @Resource
    private RedisClient redisClient = SpringUtils.getBean(RedisClient.class);

    /**
     * Sends one message to a session.
     *
     * @param session target session; {@code null} is silently ignored
     * @param message payload to send
     * @throws IOException if the underlying send fails
     * @throws EncodeException if the payload cannot be encoded
     */
    public void sendMessage(Session session, BusinessRealTimeData message) throws IOException, EncodeException {
        if (session == null) {
            return;
        }
        session.getBasicRemote().sendObject(message);
    }

    /**
     * Registers a newly opened connection and publishes the updated id list to redis.
     *
     * @param session the new session
     * @param socketId client identifier taken from the URL path
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "socketId") String socketId) {
        // The list is a plain ArrayList shared by all connections: guard mutation and
        // serialization together so the snapshot written to redis is consistent and ordered.
        synchronized (socketIds) {
            socketIds.add(socketId);
            redisClient.set(socketNumber, JsonUtil.toJson(socketIds));
        }
        sessionPools.put(socketId, session);
        addOnlineCount();
        // The original passed only the session id, leaving the online-count placeholder unfilled.
        // NOTE(review): assumes AriesJcLogger accepts two format arguments, as the
        // two-placeholder format string suggests - confirm.
        logger.info("====加入新连接session:{}!当前在线人数为:{}", session.getId(), online.get());
    }

    /**
     * Unregisters a closed connection and removes its entries from redis.
     *
     * @param socketId client identifier taken from the URL path
     */
    @OnClose
    public void onClose(@PathParam(value = "socketId") String socketId) {
        // Local bookkeeping first, so a redis failure below cannot leave a dead session
        // registered or the online counter too high.
        sessionPools.remove(socketId);
        subOnlineCount();
        // Forget the id locally as well; otherwise the next onOpen would publish it again.
        synchronized (socketIds) {
            socketIds.remove(socketId);
        }

        redisClient.removeForSet(socketNumber, socketId);
        removeCachedMembers("plate_" + socketId);
        removeCachedMembers("camera_" + socketId);
        logger.info("====关闭连接userid:{}", socketId);
    }

    /**
     * Reads the JSON string array stored under {@code key} and passes its elements to
     * {@code removeForSet} for the same key.
     */
    private void removeCachedMembers(String key) {
        String json = redisClient.get(key);
        List<String> members = JSONObject.parseArray(json, String.class);
        redisClient.removeForSet(key, members);
    }

    /**
     * Parses an incoming text message as {@link BusinessRealTimeData} and forwards it to every
     * open session.
     *
     * @param message raw JSON text received from a client
     * @throws IOException declared for interface compatibility; failures are logged instead
     */
    @OnMessage
    public void onMessage(String message) throws IOException {
        logger.debug("====来自客户端的消息message:{}", message);
        BusinessRealTimeData businessRealTimeData;
        try {
            // Parse once instead of once per session.
            businessRealTimeData = JSONObject.parseObject(message, BusinessRealTimeData.class);
        } catch (Exception e) {
            logger.error("websocket 消息解析失败", e);
            return;
        }
        broadcast(businessRealTimeData);
    }

    /**
     * Logs an error reported for a session.
     *
     * @param session session the error belongs to
     * @param throwable the reported error
     */
    @OnError
    public void onError(Session session, Throwable throwable) {
        logger.error("websocket 发生错误", throwable);
    }

    /**
     * Sends a message to the client registered under {@code socketId}. Nothing is sent when no
     * such client is registered; send failures are logged, not rethrown.
     *
     * @param socketId target client identifier
     * @param message payload to send
     */
    public void sendInfo(String socketId, BusinessRealTimeData message) {
        logger.debug("发送指定客户端的消息:{}", JsonUtil.toJson(message));
        logger.debug("发送指定客户端socketId:{}", JsonUtil.toJson(socketId));
        Session session = sessionPools.get(socketId);
        try {
            sendMessage(session, message);
        } catch (Exception e) {
            logger.error("websocket 发送消息失败", e);
        }
    }

    /**
     * Sends a message to every open session. A failure for one session is logged and does not
     * stop delivery to the others.
     *
     * @param message payload to send
     * @throws IOException declared for interface compatibility; failures are logged instead
     */
    public void sendtoAll(BusinessRealTimeData message) throws IOException {
        logger.debug("发送客户端的消息:{}", JsonUtil.toJson(message));
        broadcast(message);
    }

    /** Delivers {@code message} to each registered session, isolating per-session failures. */
    private void broadcast(BusinessRealTimeData message) {
        for (Session session : sessionPools.values()) {
            try {
                sendMessage(session, message);
            } catch (Exception e) {
                logger.error("websocket 发送消息失败", e);
            }
        }
    }

    /** Increments the online connection counter. */
    public static void addOnlineCount() {
        online.incrementAndGet();
    }

    /** Decrements the online connection counter. */
    public static void subOnlineCount() {
        online.decrementAndGet();
    }
}
|