一、pom
<dependencies>
<!-- In-project dependency: user service API (Feign client and VOs) -->
<dependency>
<groupId>cn.piesat</groupId>
<artifactId>space-user-api</artifactId>
</dependency>
<!-- Nacos service registration / discovery (client side) -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- Nacos configuration management -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!-- jasypt: encryption / decryption of configuration values -->
<!-- NOTE(review): no <version> is declared on any dependency here; presumably versions come from a parent dependencyManagement section - confirm -->
<dependency>
<groupId>com.github.ulisesbocchio</groupId>
<artifactId>jasypt-spring-boot-starter</artifactId>
</dependency>
</dependencies>
二、bucket
package cn.piesat.space.control.socket.bucket;
import cn.hutool.core.util.ObjectUtil;
import cn.piesat.space.control.socket.constants.ControlConstants;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Process-wide registry of the channels known to the control WebSocket server.
 *
 * <p>All state is static so that it can be reached from handler instances that are
 * created with {@code new} rather than injected. The references are {@code final}:
 * the containers are mutated, never replaced.
 *
 * @author Marclew
 * Created 2022/2/16 16:30
 */
public class ControlBucket {
    /**
     * Group holding every registered channel.
     * {@code GlobalEventExecutor.INSTANCE} is the global, singleton event executor.
     */
    public static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * User id to channel mapping for smart-screen (PC) clients; used to send a message
     * to one specific user.
     */
    public static final ConcurrentHashMap<Long, Channel> pcUserChannelMap = new ConcurrentHashMap<>();

    /**
     * User id to channel mapping for mobile clients; used to send a message to one
     * specific user.
     */
    public static final ConcurrentHashMap<Long, Channel> mobileUserChannelMap = new ConcurrentHashMap<>();

    /** Static holder; not meant to be instantiated. */
    private ControlBucket() {
    }
}
三、constant
package cn.piesat.space.control.socket.constants;
/**
 * Constants shared by the control WebSocket server, its handlers and its utilities.
 *
 * @author Marclew
 * Created 2022/10/21 9:37
 */
public class ControlConstants {
    /** Path on which the WebSocket endpoint is exposed. */
    public static final String WS_PATH = "/control";

    /** Name of the channel attribute that stores the authenticated user's info. */
    public static final String USERINFO_ATTRIBUTE_KEY = "userInfo";

    /** Client type code for the PC (smart-screen) client. */
    public static final Integer CLIENT_TYPE_PC = 1;

    /** Client type code for the mobile client. */
    public static final Integer CLIENT_TYPE_MOBILE = 2;

    /** Reader-idle timeout handed to the idle-state handler (the server passes it as seconds). */
    public static final Integer READER_IDLE_TIME = 30;

    /** Writer-idle timeout handed to the idle-state handler (the server passes it as seconds). */
    public static final Integer WRITER_IDLE_TIME = 60;

    /** All-idle timeout handed to the idle-state handler (the server passes it as seconds). */
    public static final Integer ALL_IDLE_TIME = 90;

    /** Maximum WebSocket frame size handed to the WebSocket protocol handler. */
    public static final Integer MAX_FRAME_SIZE = 65536 * 10;

    /** Maximum aggregated HTTP content length handed to the HTTP object aggregator. */
    public static final Integer MAX_CONTENT_LENGTH = 8192;

    /** Constants holder; not meant to be instantiated. */
    private ControlConstants() {
    }
}
四、dto
package cn.piesat.space.control.socket.dto;
import lombok.Data;
import java.io.Serializable;
/**
 * Request message that the text-frame handler parses from the JSON text of an
 * incoming WebSocket frame.
 *
 * @author Marclew
 */
@Data
public class ControlReqDto implements Serializable {
/**
 * Target client type (1 = smart screen, 2 = mobile).
 */
private Integer clientType;
/**
 * Message command code.
 */
private Integer cmd;
/**
 * Message content.
 */
private String message;
}
五、entity
package cn.piesat.space.control.socket.entity;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
 * Identity stored as a channel attribute once a connection has been authenticated:
 * the user id plus the client type the user connected with.
 *
 * @author Marclew
 * Created 2022/10/20 16:42
 */
@Data
@Builder
public class ControlUserInfo implements Serializable {
// Id of the authenticated user.
private Long userId;
// Client type code the user connected with (see ControlConstants.CLIENT_TYPE_*).
private Integer clientType;
}
六、enums
1.ControlReqCmdEnum
package cn.piesat.space.control.socket.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Command codes a client may send in the {@code cmd} field of a request.
 *
 * @author Marclew
 */
@Getter
@AllArgsConstructor
@SuppressWarnings("AlibabaEnumConstantsMustHaveComment")
public enum ControlReqCmdEnum {
// Heartbeat.
HEART_CMD(2001, "心跳"),
// Request to start screen casting.
CONNECT_CMD(2002, "请求投屏");
// Numeric command code carried on the wire.
private final int value;
// Human-readable description of the command.
private final String msg;
}
2.ControlResCmdEnum
package cn.piesat.space.control.socket.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Command codes the server puts in the {@code cmd} field of a response.
 *
 * @author Marclew
 */
@Getter
@AllArgsConstructor
@SuppressWarnings("AlibabaEnumConstantsMustHaveComment")
public enum ControlResCmdEnum {
// Heartbeat check succeeded.
HEART_CMD(2001, "心跳检测成功"),
// Request to start screen casting.
CONNECT_CMD(2002, "请求投屏"),
// Error notification.
ERROR_CMD(500, "异常通知");
// Numeric command code carried on the wire.
private final int value;
// Human-readable description of the command.
private final String msg;
}
3.ResultCodeEnum
package cn.piesat.space.control.socket.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Result codes the server puts in the {@code resultCode} field of a response,
 * together with the message text sent alongside them.
 *
 * @author Marclew
 */
@Getter
@AllArgsConstructor
@SuppressWarnings("AlibabaEnumConstantsMustHaveComment")
public enum ResultCodeEnum {
// Message sent successfully.
SUCCESS(200, "发送成功"),
// Sending failed because the PC client is not online.
PC_NOT_FOUND(601, "发送失败,PC端不在线"),
// The client logged in on another device; this connection is forced offline.
TOO_MANY_CLIENT(602, "客户端在其他设备登录,被迫下线"),
// The data format is invalid; the connection is forced offline.
DATA_FORMAT_ERROR(603, "数据格式不正确,被迫下线"),
// Heartbeat check timed out; the connection is forced offline.
HEART_IDLE_ERROR(604, "心跳检测超时,被迫下线");
// Numeric result code carried on the wire.
private final int value;
// Message text sent to the client.
private final String msg;
}
七、handler
1.ControlAuthHandler
package cn.piesat.space.control.socket.handler;
import cn.hutool.core.util.ObjectUtil;
import cn.piesat.space.control.socket.constants.ControlConstants;
import cn.piesat.space.control.socket.service.ControlChannelService;
import cn.piesat.space.control.socket.utils.NettyUtil;
import cn.piesat.space.user.api.feign.RemoteUserProvider;
import cn.piesat.space.user.api.vo.LoginUser;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author Marclew
* @date 2022/10/18 17:12
* @describe
*/
@Component
@ChannelHandler.Sharable
@Slf4j
public class ControlAuthHandler extends ChannelInboundHandlerAdapter {
private static ControlAuthHandler authHandler;
@Autowired
private ControlChannelService controlChannelService;
@Autowired(required = false)
private RemoteUserProvider remoteUserProvider;
/**
* 因为是new出来的handler,没有托给spring容器,所以一定要先初始化,否则autowired失效
*/
@PostConstruct
public void init() {
authHandler = this;
}
/**
* 执行顺序 【4】
* channel读取事件 【通道可读时触发】
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//首次连接是FullHttpRequest,把用户id和对应的channel对象存储起来
try {
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
String uri = request.uri();
log.info("【Control】--- 鉴权 start...");
String authorization = NettyUtil.getFieldValue(uri, "authorization");
Integer clientType = Integer.valueOf(NettyUtil.getFieldValue(uri, "clientType"));
LoginUser loginUser = authHandler.remoteUserProvider.getUserInfoByToken(authorization);
if (ObjectUtil.isEmpty(loginUser) || ObjectUtil.isEmpty(loginUser.getHhxsUserId()) || ObjectUtil.isEmpty(clientType)){
authHandler.controlChannelService.authError(ctx.channel());
return;
}
if (!ObjectUtil.equal(ControlConstants.CLIENT_TYPE_PC, clientType) && !ObjectUtil.equal(ControlConstants.CLIENT_TYPE_MOBILE, clientType)){
authHandler.controlChannelService.authError(ctx.channel());
return;
}
Long hhxsUserId = loginUser.getHhxsUserId();
authHandler.controlChannelService.userOnline(ctx.channel(), hhxsUserId, clientType);
log.info("【Control】--- 登录的用户id是:{}", hhxsUserId);
//如果url包含参数,需要处理
request.setUri(NettyUtil.processRequestUri(uri));
}
ctx.fireChannelRead(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.ControlChannelHandler
package cn.piesat.space.control.socket.handler;
import cn.hutool.core.util.ObjectUtil;
import cn.piesat.common.redis.service.RedisService;
import cn.piesat.space.control.socket.bucket.ControlBucket;
import cn.piesat.space.control.socket.dto.ControlReqDto;
import cn.piesat.space.control.socket.enums.ControlReqCmdEnum;
import cn.piesat.space.control.socket.enums.ResultCodeEnum;
import cn.piesat.space.control.socket.service.ControlChannelService;
import cn.piesat.space.control.socket.service.ControlMessageService;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author Marclew
* @创建时间 2022/2/16 16:30
* @描述 netty执行类
*/
@Component
@ChannelHandler.Sharable
@Slf4j
public class ControlChannelHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Autowired
private RedisService redisService;
@Autowired
private ControlChannelService controlChannelService;
private static ControlChannelHandler channelHandler;
@PostConstruct
/**
* 因为是new出来的handler,没有托给spring容器,所以一定要先初始化,否则autowired失效
*/
public void init() {
channelHandler = this;
}
/**
* 执行顺序 【1】
* channel 助手类(拦截器)的添加
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
}
/**
* 执行顺序 【2】
* channel注册事件
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}
/**
* 执行顺序 【3】
* channel激活事件 【通道激活时触发】
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("【Control】--- 客户端建立连接,address:{}", ctx.channel().remoteAddress().toString());
log.info("【Control】--- 当前websocket在线连接数:{}个", ControlBucket.channelGroup.size());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
//正常的TEXT消息类型
log.info("【Control】--- 服务器收到客户端数据:{}", frame.text());
ControlReqDto controlDto = null;
try {
controlDto = JSON.parseObject(frame.text(), ControlReqDto.class);
} catch (Exception e) {
channelHandler.controlChannelService.sendErrorMessage(ctx.channel(), ResultCodeEnum.DATA_FORMAT_ERROR);
e.printStackTrace();
}
// TODO:数据校验
// ...
// 获取cmd
Integer cmd = controlDto.getCmd();
if (ObjectUtil.equal(ControlReqCmdEnum.HEART_CMD.getValue(), cmd)){
channelHandler.controlChannelService.heartHandle(ctx.channel());
return;
}
channelHandler.controlChannelService.transferMessage(ctx.channel(), controlDto);
}
/**
* 执行顺序 【5】
* channel读取完成事件 【通道读取完成时触发】
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
/**
* 执行顺序 【6】
* channel通道不活跃/断开事件 【通道断开或者不活跃时触发】
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channelHandler.controlChannelService.removeChannel(ctx.channel());
}
/**
* channel取消注册事件
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
}
/**
* channel 可写更改
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
}
/**
* channel 捕获到异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// 删除通道
channelHandler.controlChannelService.removeChannel(ctx.channel());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
channelHandler.controlChannelService.sendErrorMessage(ctx.channel(), ResultCodeEnum.HEART_IDLE_ERROR);
//超时事件
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleEvent = (IdleStateEvent) evt;
//读超时
if (idleEvent.state() == IdleState.READER_IDLE) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.info("【Control】--- {}读超时事件={} ", ctx.channel().remoteAddress().toString(), IdleState.READER_IDLE);
ctx.disconnect();
}
// 断开连接
channelHandler.controlChannelService.removeChannel(ctx.channel());
//写
} else if (idleEvent.state() == IdleState.WRITER_IDLE) {
//全部
log.info("【Control】--- {}写超时事件={} ", ctx.channel().remoteAddress().toString(), IdleState.WRITER_IDLE);
} else if (idleEvent.state() == IdleState.ALL_IDLE) {
log.info("【Control】--- {}读写超时事件={} ", ctx.channel().remoteAddress().toString(), IdleState.ALL_IDLE);
}
}
super.userEventTriggered(ctx, evt);
}
/**
* channel 助手类(拦截器)移除
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}
}
八、server
package cn.piesat.space.control.socket.server;
import cn.piesat.space.control.socket.constants.ControlConstants;
import cn.piesat.space.control.socket.handler.ControlAuthHandler;
import cn.piesat.space.control.socket.handler.ControlChannelHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
 * Netty server that exposes the control WebSocket endpoint.
 *
 * @author Marclew
 * Created 2022/2/16 16:30
 */
@Component
public class ControlChannelServer {
    @Value("${control.port}")
    private int port;

    private static final Logger log = LoggerFactory.getLogger(ControlChannelServer.class);

    /**
     * Binds the server to the configured port and blocks until the server channel is
     * closed. Both event loop groups are shut down before the method returns.
     *
     * @throws Exception if binding fails or the calling thread is interrupted while waiting
     */
    public void call() throws Exception {
        // The boss group only accepts connections on the single bound port.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            // group(parent, child): the parent accepts connections, the child serves them.
            sb.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .localAddress(this.port)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // WebSocket starts as HTTP, so the HTTP codec is required.
                            pipeline.addLast(new HttpServerCodec());
                            // Writes large content in chunks.
                            pipeline.addLast(new ChunkedWriteHandler());
                            pipeline.addLast(new HttpObjectAggregator(ControlConstants.MAX_CONTENT_LENGTH));
                            // Authentication of the handshake request.
                            pipeline.addLast(new ControlAuthHandler());
                            pipeline.addLast("heart", new IdleStateHandler(
                                    ControlConstants.READER_IDLE_TIME,
                                    ControlConstants.WRITER_IDLE_TIME,
                                    ControlConstants.ALL_IDLE_TIME,
                                    TimeUnit.SECONDS));
                            // Text-frame business handler.
                            // NOTE(review): it is registered ahead of the protocol handler; the
                            // original order is kept - confirm whether it should come after it.
                            pipeline.addLast(new ControlChannelHandler());
                            pipeline.addLast(new WebSocketServerProtocolHandler(
                                    ControlConstants.WS_PATH, null, true, ControlConstants.MAX_FRAME_SIZE));
                        }
                    });
            ChannelFuture cf = sb.bind().sync();
            log.info("{} starting on a port: {}", ControlChannelServer.class, port);
            // Block until the server channel is closed.
            cf.channel().closeFuture().sync();
        } finally {
            // Request the shutdown of both groups first, so that an interrupt while waiting
            // for one of them cannot leave the other running.
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            workerGroup.terminationFuture().sync();
            bossGroup.terminationFuture().sync();
        }
    }
}
九、service
1.ControlChannelService
package cn.piesat.space.control.socket.service;
import cn.hutool.core.util.ObjectUtil;
import cn.piesat.space.control.socket.bucket.ControlBucket;
import cn.piesat.space.control.socket.dto.ControlReqDto;
import cn.piesat.space.control.socket.entity.ControlUserInfo;
import cn.piesat.space.control.socket.enums.ControlResCmdEnum;
import cn.piesat.space.control.socket.enums.ResultCodeEnum;
import cn.piesat.space.control.socket.utils.ChannelUtil;
import cn.piesat.space.control.socket.utils.ControlBucketUtil;
import cn.piesat.space.control.socket.vo.ControlResVo;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Channel-level operations of the control WebSocket: registering and unregistering
 * users, forwarding messages between a user's clients and sending status replies.
 *
 * @author Marclew
 * Created 2022/10/12 15:11
 */
@Component
@Slf4j
public class ControlChannelService {
    @Autowired
    private ControlBucketUtil bucketUtil;

    @Autowired
    private ChannelUtil channelUtil;

    @Autowired
    private ControlMessageService controlMessageService;

    /**
     * Registers {@code channel} as the connection of {@code userId} for {@code clientType}.
     *
     * <p>If the user already has a different connection of that client type, the old
     * connection is told that it was replaced, unregistered and closed.
     *
     * @param channel    the newly authenticated channel
     * @param userId     the authenticated user's id
     * @param clientType the client type the user connected with
     */
    public void userOnline(Channel channel, Long userId, Integer clientType) {
        Channel oldChannel = bucketUtil.getUserChannel(userId, clientType);
        if (oldChannel != null && oldChannel != channel) {
            sendErrorMessage(oldChannel, ResultCodeEnum.TOO_MANY_CLIENT);
            // Work on the reference taken above: looking the channel up again after the
            // map entry is gone would yield null and leave the old channel in the group.
            bucketUtil.getConcurrentHashMap(clientType).remove(userId, oldChannel);
            bucketUtil.removeChannelGroup(oldChannel);
            oldChannel.close();
        }
        // Bind the identity first so that the channel is never reachable without it.
        channelUtil.bindingUser(channel, userId, clientType);
        bucketUtil.putUserChannel(userId, clientType, channel);
        bucketUtil.addChannelGroup(channel);
    }

    /**
     * Unregisters and closes {@code channel}. Safe to call more than once for the same
     * channel and for channels that were never authenticated.
     *
     * @param channel the channel to drop
     */
    public void removeChannel(Channel channel) {
        bucketUtil.removeChannelGroup(channel);
        ControlUserInfo userInfo = channelUtil.getUserInfo(channel);
        if (userInfo != null && userInfo.getUserId() != null) {
            // Remove the mapping only while it still points at this channel; a replaced
            // connection that closes late must not evict the user's new connection.
            bucketUtil.getConcurrentHashMap(userInfo.getClientType()).remove(userInfo.getUserId(), channel);
            log.info("【Control】--- 与用户{}断开连接,通道关闭!", userInfo.getUserId());
            log.info("【Control】--- 当前websocket在线连接数:{}个", bucketUtil.onlineUserCount());
        }
        channel.close();
    }

    /**
     * Answers a failed authentication with an HTTP 401 response and closes the channel.
     *
     * @param channel the channel whose authentication failed
     */
    public void authError(Channel channel) {
        controlMessageService.sendErrorMessage(channel, HttpResponseStatus.UNAUTHORIZED);
        channel.close();
    }

    /**
     * Forwards a request to the sending user's client of the requested type and reports
     * the outcome to the sender.
     *
     * @param sourceChannel the channel the request arrived on
     * @param controlReqDto the request to forward
     */
    public void transferMessage(Channel sourceChannel, ControlReqDto controlReqDto) {
        ControlUserInfo userInfo = channelUtil.getUserInfo(sourceChannel);
        if (userInfo == null) {
            // Only authenticated channels carry user info; anything else is dropped.
            log.warn("【Control】--- message received on a channel without user info, closing it");
            removeChannel(sourceChannel);
            return;
        }
        log.info("【Control】--- userId{}", userInfo.getUserId());
        // A single lookup: checking first and fetching afterwards could race with a disconnect.
        Channel targetChannel = bucketUtil.getUserChannel(userInfo.getUserId(), controlReqDto.getClientType());
        if (targetChannel == null) {
            controlMessageService.sendMessage(sourceChannel, controlReqDto.getCmd(), ResultCodeEnum.PC_NOT_FOUND);
            log.info("【Control】--- 消息转发失败:{}", ResultCodeEnum.PC_NOT_FOUND);
            return;
        }
        ControlResVo controlResVo = ControlResVo.builder().resultCode(ResultCodeEnum.SUCCESS.getValue()).build();
        // Copies the same-named properties (cmd, message) from the request onto the response.
        BeanUtils.copyProperties(controlReqDto, controlResVo);
        controlMessageService.sendMessage(targetChannel, JSONObject.toJSONString(controlResVo));
        controlMessageService.sendMessage(sourceChannel, controlReqDto.getCmd(), ResultCodeEnum.SUCCESS);
    }

    /**
     * Tells whether the user bound to {@code channel} is registered as online.
     *
     * @param channel the channel to inspect
     * @return {@code true} if the channel carries user info and that user is online for
     *         the bound client type; {@code false} otherwise
     */
    public Boolean isOnline(Channel channel) {
        ControlUserInfo userInfo = channelUtil.getUserInfo(channel);
        if (userInfo == null || userInfo.getUserId() == null) {
            return false;
        }
        return bucketUtil.isOnline(userInfo.getUserId(), userInfo.getClientType());
    }

    /**
     * Answers a heartbeat.
     *
     * @param channel the channel that sent the heartbeat
     */
    public void heartHandle(Channel channel) {
        ControlResVo reply = ControlResVo.builder()
                .cmd(ControlResCmdEnum.HEART_CMD.getValue())
                .resultCode(ResultCodeEnum.SUCCESS.getValue())
                .message(ControlResCmdEnum.HEART_CMD.getMsg())
                .build();
        controlMessageService.sendMessage(channel, JSONObject.toJSONString(reply));
    }

    /**
     * Sends an error notification carrying the given result code and its message.
     *
     * @param channel        the channel to notify
     * @param resultCodeEnum the result to report
     */
    public void sendErrorMessage(Channel channel, ResultCodeEnum resultCodeEnum) {
        ControlResVo reply = ControlResVo.builder()
                .cmd(ControlResCmdEnum.ERROR_CMD.getValue())
                .resultCode(resultCodeEnum.getValue())
                .message(resultCodeEnum.getMsg())
                .build();
        controlMessageService.sendMessage(channel, JSONObject.toJSONString(reply));
    }
}
2.ControlMessageService
package cn.piesat.space.control.socket.service;
import cn.piesat.space.control.socket.enums.ResultCodeEnum;
import cn.piesat.space.control.socket.utils.ChannelUtil;
import cn.piesat.space.control.socket.utils.ControlBucketUtil;
import cn.piesat.space.control.socket.vo.ControlResVo;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Writes messages to channels: WebSocket text frames for established connections and
 * a plain HTTP response for connections rejected before the handshake.
 *
 * @author Marclew
 * Created 2022/10/21 15:11
 */
@Component
public class ControlMessageService {
    @Autowired
    private ChannelUtil channelUtil;

    @Autowired
    private ControlBucketUtil bucketUtil;

    /**
     * Sends {@code message} as a text frame.
     *
     * @param channel the target channel; a {@code null} channel (for example a user who
     *                is no longer connected) is ignored
     * @param message the text to send
     */
    public void sendMessage(Channel channel, String message) {
        if (channel == null) {
            return;
        }
        channel.writeAndFlush(new TextWebSocketFrame(message));
    }

    /**
     * Sends {@code message} to the channel registered for a user and client type.
     * Nothing is sent when the user has no registered channel of that type.
     *
     * @param user       the target user's id
     * @param clientType the target client type
     * @param message    the text to send
     */
    public void sendMessage(Long user, Integer clientType, String message) {
        Channel channel = bucketUtil.getUserChannel(user, clientType);
        this.sendMessage(channel, message);
    }

    /**
     * Sends a JSON response built from a command code and a result code.
     *
     * @param channel        the target channel
     * @param cmd            the command code to echo in the response
     * @param resultCodeEnum the result whose code and message are reported
     */
    public void sendMessage(Channel channel, Integer cmd, ResultCodeEnum resultCodeEnum) {
        ControlResVo reply = ControlResVo.builder()
                .cmd(cmd)
                .resultCode(resultCodeEnum.getValue())
                .message(resultCodeEnum.getMsg())
                .build();
        this.sendMessage(channel, JSONObject.toJSONString(reply));
    }

    /**
     * Sends an empty HTTP response with the given status, used when a connection is
     * rejected.
     *
     * @param channel        the target channel; a {@code null} channel is ignored
     * @param responseStatus the HTTP status to report
     */
    public void sendErrorMessage(Channel channel, HttpResponseStatus responseStatus) {
        if (channel == null) {
            return;
        }
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, responseStatus);
        // Declare the empty body so that the client does not have to wait for the close.
        response.headers().set("Content-Length", 0);
        channel.writeAndFlush(response);
    }
}
十、utils
1.ChannelUtil
package cn.piesat.space.control.socket.utils;
import cn.piesat.space.control.socket.bucket.ControlBucket;
import cn.piesat.space.control.socket.constants.ControlConstants;
import cn.piesat.space.control.socket.entity.ControlUserInfo;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Reads and writes the user identity stored as an attribute on a channel.
 *
 * @author Marclew
 * Created 2022/10/20 17:28
 */
@Component
public class ChannelUtil {
    /** Attribute key under which the user info is stored on a channel. */
    private static final AttributeKey<ControlUserInfo> USER_INFO_KEY =
            AttributeKey.valueOf(ControlConstants.USERINFO_ATTRIBUTE_KEY);

    /**
     * Tells whether a user is bound to the channel; can be used to check that a channel
     * is legitimate before forwarding messages.
     *
     * @param channel the channel to inspect
     * @return {@code true} only if the attribute exists and holds a value
     */
    public boolean hasUser(Channel channel) {
        // Both conditions are required: attr(key) creates the attribute on first access,
        // so an attribute can exist without ever having been given a value.
        return channel.hasAttr(USER_INFO_KEY) && channel.attr(USER_INFO_KEY).get() != null;
    }

    /**
     * Returns the user info bound to the channel.
     *
     * @param channel the channel to inspect
     * @return the bound user info, or {@code null} if none has been bound
     */
    public ControlUserInfo getUserInfo(Channel channel) {
        return channel.attr(USER_INFO_KEY).get();
    }

    /**
     * Binds user info to the channel, replacing any previous value.
     *
     * @param channel    the channel to bind to
     * @param userId     the user's id
     * @param clientType the client type the user connected with
     */
    public void bindingUser(Channel channel, Long userId, Integer clientType) {
        ControlUserInfo userInfo = ControlUserInfo.builder().userId(userId).clientType(clientType).build();
        channel.attr(USER_INFO_KEY).set(userInfo);
    }
}
2.ControlBucketUtil
package cn.piesat.space.control.socket.utils;
import cn.hutool.core.util.ObjectUtil;
import cn.piesat.space.control.socket.bucket.ControlBucket;
import cn.piesat.space.control.socket.constants.ControlConstants;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Accessors for the static channel registries kept in {@link ControlBucket}.
 *
 * @author Marclew
 * Created 2022/10/21 14:49
 */
@Component
public class ControlBucketUtil {

    /**
     * Selects the user-to-channel map that belongs to a client type.
     *
     * @param clientType the client type code
     * @return the PC or the mobile map; for any other value a fresh, empty map that is
     *         not retained anywhere
     */
    public ConcurrentHashMap<Long, Channel> getConcurrentHashMap(Integer clientType) {
        boolean pcClient = ObjectUtil.equal(ControlConstants.CLIENT_TYPE_PC, clientType);
        if (pcClient) {
            return ControlBucket.pcUserChannelMap;
        }
        boolean mobileClient = ObjectUtil.equal(ControlConstants.CLIENT_TYPE_MOBILE, clientType);
        if (mobileClient) {
            return ControlBucket.mobileUserChannelMap;
        }
        return new ConcurrentHashMap<Long, Channel>();
    }

    /**
     * Looks up the channel registered for a user and client type.
     *
     * @return the registered channel, or {@code null} if there is none
     */
    public Channel getUserChannel(Long userId, Integer clientType) {
        ConcurrentHashMap<Long, Channel> channels = getConcurrentHashMap(clientType);
        return channels.get(userId);
    }

    /**
     * Registers a channel for a user and client type.
     *
     * @return the channel previously registered under that key, or {@code null}
     */
    public Channel putUserChannel(Long userId, Integer clientType, Channel channel) {
        ConcurrentHashMap<Long, Channel> channels = getConcurrentHashMap(clientType);
        return channels.put(userId, channel);
    }

    /**
     * Unregisters the channel of a user and client type.
     *
     * @return the channel that was registered, or {@code null}
     */
    public Channel removeUserChannel(Long userId, Integer clientType) {
        ConcurrentHashMap<Long, Channel> channels = getConcurrentHashMap(clientType);
        return channels.remove(userId);
    }

    /**
     * Adds a channel to the channel group.
     *
     * @return the result reported by the group's {@code add}
     */
    public Boolean addChannelGroup(Channel channel) {
        ChannelGroup group = getChannelGroup();
        return group.add(channel);
    }

    /**
     * Removes a channel from the channel group.
     *
     * @return the result reported by the group's {@code remove}
     */
    public Boolean removeChannelGroup(Channel channel) {
        ChannelGroup group = getChannelGroup();
        return group.remove(channel);
    }

    /**
     * Removes from the channel group the channel currently registered for a user and
     * client type.
     *
     * @return the result reported by the group's {@code remove}
     */
    public Boolean removeChannelGroup(Long userId, Integer clientType) {
        Channel registered = getUserChannel(userId, clientType);
        return getChannelGroup().remove(registered);
    }

    /**
     * Returns the shared channel group.
     */
    public ChannelGroup getChannelGroup() {
        return ControlBucket.channelGroup;
    }

    /**
     * Tells whether a user has a registered channel for the given client type.
     */
    public Boolean isOnline(Long userId, Integer clientType) {
        if (!getConcurrentHashMap(clientType).containsKey(userId)) {
            return false;
        }
        return getConcurrentHashMap(clientType).get(userId) != null;
    }

    /**
     * Returns the number of channels in the channel group.
     */
    public int onlineUserCount() {
        ChannelGroup group = ControlBucket.channelGroup;
        return group.size();
    }
}
3.NettyUtil
package cn.piesat.space.control.socket.utils;
import cn.piesat.common.core.constants.Constants;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Static helpers for the request URI of the WebSocket handshake.
 *
 * @author Marclew
 */
public class NettyUtil {

    /** Static helpers only; not meant to be instantiated. */
    private NettyUtil() {
    }

    /**
     * Extracts the value of a query parameter from a URI.
     *
     * <p>The parameter name must match exactly and start at a parameter boundary (the
     * beginning of the string, {@code ?} or {@code &}), so asking for {@code clientType}
     * does not match {@code xclientType}. When the parameter occurs more than once the
     * first occurrence wins. The value is returned as written; it is not URL-decoded.
     *
     * @param urlStr the URI or query string to search; may be {@code null}
     * @param field  the parameter name; may be {@code null}
     * @return the raw parameter value, or an empty string when the parameter is absent
     *         or either argument is {@code null} or empty
     */
    public static String getFieldValue(String urlStr, String field) {
        if (urlStr == null || field == null || field.isEmpty()) {
            return "";
        }
        // The name is quoted so that regex metacharacters in it are matched literally.
        Pattern parameter = Pattern.compile("(?:^|[?&])" + Pattern.quote(field) + "=([^&#]*)");
        Matcher matcher = parameter.matcher(urlStr);
        return matcher.find() ? matcher.group(1) : "";
    }

    /**
     * Strips the query string from a URI.
     *
     * @param uri the URI to process; may be {@code null}
     * @return the part of {@code uri} before the first {@code ?}, the unchanged
     *         {@code uri} when it has no query string, or {@code null} for a {@code null} input
     */
    public static String processRequestUri(String uri) {
        if (uri == null) {
            return null;
        }
        int queryStart = uri.indexOf('?');
        return queryStart >= 0 ? uri.substring(0, queryStart) : uri;
    }
}
十一、ControlResVo
package cn.piesat.space.control.socket.vo;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
 * Response message that is serialised to JSON and sent to a client as a text frame.
 *
 * @author Marclew
 */
@Data
@Builder
public class ControlResVo implements Serializable {
/**
 * Message command code.
 */
private Integer cmd;
/**
 * Result code.
 */
private Integer resultCode;
/**
 * Message content.
 */
private String message;
}
十二、启动类
package cn.piesat.space.control.socket;
import cn.piesat.space.control.socket.server.ControlChannelServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@SpringBootApplication(scanBasePackages = "cn.piesat", exclude = DataSourceAutoConfiguration.class)
public class SpaceControlSocketApplication implements CommandLineRunner {
@Autowired
private ControlChannelServer socketServer;
public static void main(String[] args) {
SpringApplication.run(SpaceControlSocketApplication.class);
}
@Override
public void run(String... args) {
try {
socketServer.call();
} catch (Exception e) {
e.printStackTrace();
}
}
}
|