IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> websocket实现单发群发(处理用户不在线的情况) -> 正文阅读

[网络协议]websocket实现单发群发(处理用户不在线的情况)

代码

package com.ruoyi.utils.websocket;
 
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.core.domain.model.LoginUser;
import com.ruoyi.system.service.ISysUserService;
import com.ruoyi.utils.domain.MessageLog;
import com.ruoyi.utils.service.IMessageLogService;
import com.ruoyi.utils.service.IMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;

import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.utils.domain.Message;
import com.ruoyi.framework.web.service.TokenService;


 
/**
 * websocket
 *
 */
@Controller
@ServerEndpoint(value = "/websocket")
public class MyWebSocketController {
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    //设置为静态的 公用一个消息map ConcurrentMap为线程安全的map  HashMap不安全
    private static ConcurrentMap<String, Map<String, List<Object>>> messageMap = new ConcurrentHashMap<>();

    private String sender;//发送人

	private static IMessageService messageService;
	private static TokenService tokenService;
	private static IMessageLogService messageLogService;
	private static ISysUserService userService;

	@Autowired
	public void setIMessageService(IMessageService messageService){
		MyWebSocketController.messageService = messageService;
	}
	@Autowired
	public void setISysUserService(ISysUserService userService){
		MyWebSocketController.userService = userService;
	}

	@Autowired
	public void setTokenService(TokenService tokenService){
		MyWebSocketController.tokenService = tokenService;
	}

	@Autowired
	public void setMessageLogService(IMessageLogService messageLogService){
		MyWebSocketController.messageLogService = messageLogService;
	}

	String key = "";//当前用户id
    /**
     * /**
     * 连接建立成功调用的方法
     * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     * @throws Exception
     */
    @OnOpen
    public void onOpen(Session session) throws Exception {
        this.session = session;

		String token ;
		/**
		 * session.getQueryString()  获取连接?后面的参数
		 * ws://192.168.5.136:8081/websocket/?11111111111,111111
		 * 获取 11111111111,111111
		 */

		String keString = session.getQueryString();
	  	 if(!StringUtils.isEmpty(keString)) {

			 /**
			  * 获取连接后面的参数
			  * key: 发送消息的用户(当前用户)
			  */

			 token = keString;
			 LoginUser loginUser = tokenService.getLoginUser(token);
			 if(loginUser == null){

			 	sendMessage(AjaxResult.error("登录已失效"),session);
			 	return;
			 }
			 key = loginUser.getUsername();

			 Map<String, Session> stringSessionMap = WebSocketMapUtil.get(token);
			 if(stringSessionMap == null){

				 stringSessionMap = new HashMap<>();

			 }
			 stringSessionMap.put(token,session);
			 WebSocketMapUtil.put(key, stringSessionMap);
	  	 }

    	dbOfflineMsgSend(key);
    }

	/**
	 * 用户每次连接,判断是否存在离线消息,从数据库获取
	 */
	public void dbOfflineMsgSend(String key){

		Message message = new Message();
		message.setReceive(key);
		List<Message> offlineMsg = messageService.selectOfflineMsg(message);
		if (offlineMsg != null){
			offlineMsg.forEach(msgobj -> {

				String receive = msgobj.getReceive();
				String sender = msgobj.getSender();
				String msg = msgobj.getContent();

				try {
					onMessage(msg, receive, sender,true);
				} catch (IOException e) {
					e.printStackTrace();
				}

			});

			/**
			 * 发送完离线消息就删除数据,避免下次再发送
			 */
			if(offlineMsg.size() > 0){

				messageService.deleteOfflineMessageByIds(
						offlineMsg
						.stream()
						.map(Message::getId)
						.collect(Collectors.toList()).toArray(new Long[]{}));
			}
		}else {
			System.out.println("沒有离线消息");
		}

	}

    /**
     * 收到客户端消息后调用的方法  单发
     * @param message 客户端发送过来的消息
     * @throws IOException
     */
    @OnMessage
    public void onMessage(String message)  {

    	List<String> userList = new ArrayList<>();
		userList.add("ry");
		userList.add("cg");


    	String userId=message.substring(message.lastIndexOf(";")+1,message.lastIndexOf(","));//接收消息的用户
		System.out.println("发给:"+userId);
		String sendUserId=message.substring(message.lastIndexOf("[")+1,message.lastIndexOf(";"));//发送消息的用户
		System.err.println("发消息的用户:"+sendUserId);
		this.sender = sendUserId;
		message=message.substring(0, message.lastIndexOf("["));//发送的消息
		System.err.println("客户端发来的信息:"+message);

		if (userList.size() > 1){
			System.out.println("群发");
			sendMessageAll(message,userList);
		}else {
			System.out.println("单发");
			try {
				onMessage(message,userList.get(0),sendUserId, true);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}

//
//		if ("1".equals(tit)) {
//			sendMessageAll(message);
//		}else if("0".equals(tit)){
//
//
//		}

	}


	/**
	 * 发送消息
	 * @param message
	 * @param userId
	 * @param sendUserId
	 */
	public void onMessage(String message,String userId,String sendUserId,boolean flag) throws IOException {
		Map<String, Session> sessionMap = WebSocketMapUtil.get(userId);

		if(!StringUtils.isEmpty(sessionMap)){


			System.out.println("在线");
			System.out.println("当前用户登录在线数 = " + sessionMap.size());
			JSONObject jsonObject = new JSONObject();
			jsonObject.put("message",message);
			jsonObject.put("sender",sendUserId);
			jsonObject.put("user",userId);

			for (Map.Entry<String, Session> entry : sessionMap.entrySet()) {
				sendMessage(AjaxResult.success(jsonObject),entry.getValue());
//				发送完消息插入数据库,群发时不插
				if(flag)
					insertMsg(message,Arrays.asList(userId),sendUserId);

			}


		}else {
			System.out.println("不在线");
			insertDbOfflineMessage(message,sendUserId,userId);
		}
	}

	/**
	 * 插入离线消息 放数据库
	 * @param message 消息
	 * @param sendUserId 发送人
	 * @param userId 接收人
	 */
	public void insertDbOfflineMessage(String message,String sendUserId,String userId){
		/**
		 * 数据库插入离线消息
		 */
//		insertMsg(message, Arrays.asList(userId), sendUserId);
		Message msg = new Message();
		msg.setContent(message);
		msg.setSender(sendUserId);
		msg.setReceive(userId);
		messageService.insertOfflineMsg(msg);
    }
    /**
     * 发生错误时调用
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error){
    	System.out.println("错误");
        error.printStackTrace();
        
    }

	/**
	 * 连接关闭调用的方法
	 * @throws Exception
	 */
	@OnClose
	public void onClose() {
		//从map中删除
		String queryString = session.getQueryString();
		if (!StringUtils.isEmpty(queryString)){

			String token = queryString;
			System.out.println("关闭");
			Map<String, Session> stringSessionMap = WebSocketMapUtil.get(key);
			System.out.println("stringSessionMap = " + stringSessionMap);
			stringSessionMap.remove(token);
			WebSocketMapUtil.put(key, stringSessionMap);

		}

	}

    /**
     * 发送消息方法。
     * @param ajaxResult
     * @throws IOException
     */
    public void sendMessage(AjaxResult ajaxResult,Session session)  {
		try {
			System.out.println("session.isOpen = " + session.isOpen());
			if(session.isOpen()){
				session.getBasicRemote().sendText(ajaxResult.toString());

			}
		} catch (IOException e) {
			e.printStackTrace();
		}
//		this.insertMsg(message,receive,this.sender,0l);
	}



	public void insertMsg(String msg,List<String> receives,String sender){
		Message message = new Message();
		message.setContent(msg);
		message.setPhshType("2");
		message.setReferState(0l);//未读
		message.setType(1l);
		message.setSender(sender);
		message.setReceive(String.join(",",receives));
		messageService.insertMessage(message);

		for (String receive : receives) {
			//		插入消息日志
			Map<String, Session> sessionMap = WebSocketMapUtil.get(receive);
			if(!StringUtils.isEmpty(sessionMap)){

				MessageLog messageLog = new MessageLog();
				messageLog.setReceive(receive);
				messageLog.setSender(sender);
				messageLog.setReferState(0l);
				messageLog.setmId(message.getId());
				messageLogService.insertMessageLog(messageLog);
			}

		}

	}
 
    /**
     * 群发消息方法。
     * @param message
     */
    public void sendMessageAll(String message,List<String> users) {

    	try {
    		for (String user : users) {

    			onMessage(message,user,this.sender,false);
			}
			insertMsg(message,users,this.sender);
		}catch (IOException e) {
    		e.printStackTrace();
    	}


    }
    
    
    /**
     * 获取该用户未读的消息数量
     * @param userId 当前用户id
     * @param objectUserId  对象id
     * @return
     */
    public int getMessageCount(String userId,String objectUserId) {
    	//获取该用户所有未收的消息
    	Map<String, List<Object>> listMap=messageMap.get(userId);
    	if(listMap != null) {
    		List<Object> list=listMap.get(objectUserId);
    		if(list!=null) {
    			return listMap.get(objectUserId).size();
    		}else {
    			return 0;
    		}
        	
    	}else {
    		return 0;
    	}
    	
    }
 
 
 
 
}
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-08-22 13:50:08  更:2021-08-22 13:51:11 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/28 6:15:17-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计