场景
若依(基于SpringBoot的权限管理系统)集成MobileIMSDK实现IM服务端的搭建:
若依(基于SpringBoot的权限管理系统)集成MobileIMSDK实现IM服务端的搭建_霸道流氓气质的博客-CSDN博客_mobileimsdk
在上面将MobileIMSDK集成到SpringBoot的基础上,怎样对在线的IM用户进行存储记录,并实现
群发消息功能。
这里的对方账号为0代表是给服务端发送消息,在服务端收到消息的回调中群发给所有的用户。
注:
博客: 霸道流氓气质的博客_CSDN博客-C#,架构之路,SpringBoot领域博主 关注公众号 霸道的程序猿 获取编程相关电子书、教程推送与免费下载。
实现
1、实现记录所有在线用户的逻辑可参考如下
Java中使用Map存储在线用户的集合(登录新增、退出移除)-SpringBoot中集成websocket示例:
Java中使用Map存储在线用户的集合(登录新增、退出移除)-SpringBoot中集成websocket示例_霸道流氓气质的博客-CSDN博客
新建一个IMUsers类用来存储用户的相关信息
package com.chrisf.imextend;
?
import cn.hutool.core.thread.ExecutorBuilder;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.chrisf.sdk.protocal.Protocal;
import com.chrisf.sdk.utils.LocalSendHelper;
import io.netty.channel.Channel;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
/**
?* im 客户端用户集
?*
?* @author
?*/
public class ImUsers {
??? private static ExecutorService pool = ExecutorBuilder.create()
??????????? .setCorePoolSize(20)//初始20个线程
??????????? .setMaxPoolSize(40)//最大40个线程
??????????? .setWorkQueue(new LinkedBlockingQueue<>(60))//有界等待队列,最大等待数是60
??????????? .setThreadFactory(ThreadFactoryBuilder.create().setNamePrefix("IM-Pool-").build())//设置线程前缀
??????????? .build();
??? /**
???? * 用户集
???? */
??? private static Map<String, Channel> USERS = new ConcurrentHashMap<String, Channel>();
??? /**
???? * 存储用户
???? *
???? * @param key???? 唯一键
???? * @param session 用户信息
???? */
??? public static void put(String key, Channel session) {
??????? USERS.put(key, session);
??? }
??? /**
???? * 移除用户
???? *
???? * @param session 用户信息
???? * @return 移除结果
???? */
??? public static boolean remove(Channel session) {
??????? String key = null;
??????? boolean flag = USERS.containsValue(session);
??????? if (flag) {
??????????? Set<Map.Entry<String, Channel>> entries = USERS.entrySet();
??????????? for (Map.Entry<String, Channel> entry : entries) {
??????????????? Channel value = entry.getValue();
??????????????? if (value.equals(session)) {
??????????????????? key = entry.getKey();
??????????????????? break;
??????????????? }
??????????? }
??????? } else {
??????????? return true;
??????? }
??????? return remove(key);
??? }
??? /**
???? * 移出用户
???? *
???? * @param key 键
???? */
??? public static boolean remove(String key) {
??????? Channel remove = USERS.remove(key);
??????? if (remove != null) {
??????????? boolean containsValue = USERS.containsValue(remove);
??????????? return containsValue;
??????? } else {
??????????? return true;
??????? }
??? }
??? /**
???? * 获取在线用户列表
???? *
???? * @return 返回用户集合
???? */
??? public static Map<String, Channel> getUsers() {
??????? return USERS;
??? }
??? /**
???? * 群发消息文本消息
???? *
???? * @param protocal 消息内容
???? */
??? public static void sendMessageToUsersByText(Protocal protocal) {
??????? Collection<Channel> values = USERS.values();
??????? for (Channel value : values) {
??????????? pool.submit(() -> sendMessageToUserByText(value, protocal));
??????? }
??? }
??? /**
???? * 发送消息
???? *
???? * @param session
???? * @param protocal
???? */
??? public static void sendMessageToUserByText(Channel session, Protocal protocal) {
??????? if (session != null) {
??????????? synchronized (session) {
??????????????? try {
??????????????????? LocalSendHelper.sendData(session,protocal,null);
??????????????? } catch (Exception e) {
??????????????????? e.printStackTrace();
??????????????? }
??????????? }
??????? } else {
??????? }
??? }
}
注意这里的群发消息时使用的自定义线程池,并且使用的Hutool的ExecutorBuilder。
所以需要引入Hutool的依赖
??????? <dependency>
??????????? <groupId>cn.hutool</groupId>
??????????? <artifactId>hutool-all</artifactId>
??????????? <version>5.8.3</version>
??????? </dependency>
这里考虑了群发消息频率较高且用户较大的情况,如果不存在可以直接将自定义线程池去掉,然后在群发消息
时直接用循环就可以。
2、上面群发消息时调用的
LocalSendHelper.sendData(session,protocal,null);
这是官方文档说明的发送消息的方式。
其中第三个参数resultObserver根据自己业务需要决定,这里设置为null。
群发消息就是获取所有的用户,然后遍历调用发送消息的方法。
存储用户信息使用的
private static Map<String, Channel> USERS = new ConcurrentHashMap<String, Channel>();
其中String为key,可使用用户的唯一标识,Channel是IM中集成Netty的代表客户端连接对应的netty会话。
3、然后在IM用户登录和退出登录的回调方法中对用户进行添加和移除
找到ServerEventListenerImpl下的onUserLoginSucess
?@Override
?public void onUserLoginSucess(String userId, String extra, Channel session)
?{
??//添加用户
??ImUsers.put(userId,session);
??logger.debug("【IM_回调通知OnUserLoginAction_CallBack】用户:"+userId+" 上线了!");
?}
以及onUserLogout?
@Override
?public void onUserLogout(String userId, Object obj, Channel session)
?{
??//移除用户
??ImUsers.remove(userId);
??logger.debug("【DEBUG_回调通知OnUserLogoutAction_CallBack】用户:"+userId+" 离线了!");
?}
这样就实现了在用户登录和退出登录之后对用户集合进行添加和删除。
4、实现群发消息的逻辑处理
找到ServerEventListenerImpl下的onTransferMessage4C2S回调方法,此方法是
收到客户端发送给“服务端”的数据回调通知(即:消息路径为“C2S”的消息)
所以在此回调方法中进行群发消息的逻辑处理,届时客户端调用给服务端发送消息的api即可,当接收方为0代表接收目标是
服务器。
?
?
这里可以通过和客户端约定添加typeu字段并指定某个值为群发功能,然后调用ImUsers的群发的方法。
?@Override
?public boolean onTransferMessage4C2S(Protocal p, Channel session)
?{
??// 接收者uid
??String userId = p.getTo();
??// 发送者uid
??String from_user_id = p.getFrom();
??// 消息或指令内容
??String dataContent = p.getDataContent();
??// 消息或指令指纹码(即唯一ID)
??String fingerPrint = p.getFp();
??// 【重要】用户定义的消息或指令协议类型(开发者可据此类型来区分具体的消息或指令)
??int typeu = p.getTypeu();
??//聊天室群发
??if(ImTypeUEnum.CHAT_ROOM.getCode() == typeu)
??{
???ImUsers.sendMessageToUsersByText(p);
??}
??logger.debug("【DEBUG_回调通知】[typeu="+typeu+"]收到了客户端"+from_user_id+"发给服务端的消息:str="+dataContent);
??return true;
?}
其中ImTypeUEnum为自定义的指令值枚举类。
package com.chrisf.enums;
/**
?* Im收到Server端的typeU指令类型
?*/
public enum ImTypeUEnum {
??? CHAT_ROOM(1, "聊天室群发");
??? private final Integer code;
??? private final String info;
??? ImTypeUEnum(Integer code, String info)
??? {
??????? this.code = code;
??????? this.info = info;
??? }
??? public Integer getCode()
??? {
??????? return code;
??? }
??? public String getInfo()
??? {
??????? return info;
??? }
}
|