IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Springboot + Netty + Websocket实现遥控器功能 -> 正文阅读

[Java知识库]Springboot + Netty + Websocket实现遥控器功能

一、pom

    <dependencies>
        <!-- 本地依赖 -->
        <dependency>
            <groupId>cn.piesat</groupId>
            <artifactId>space-user-api</artifactId>
        </dependency>

        <!-- nacos 服务注册发现(客户端)依赖 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!-- nacos config 配置 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>

        <!--Netty-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <!-- jasypt 加解密 -->
        <dependency>
            <groupId>com.github.ulisesbocchio</groupId>
            <artifactId>jasypt-spring-boot-starter</artifactId>
        </dependency>
    </dependencies>

二、bucket

package cn.piesat.space.control.socket.bucket;

import cn.hutool.core.util.ObjectUtil;
import cn.piesat.space.control.socket.constants.ControlConstants;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Marclew
 * @创建时间 2022/2/16 16:30
 * @描述
 */
public class ControlBucket {

    /**
     * 定义一个channel组,管理所有的channel
     * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
     */
    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 存放智慧屏用户与Chanel的对应信息,用于给指定用户发送消息
     */
    public static ConcurrentHashMap<Long, Channel> pcUserChannelMap = new ConcurrentHashMap<>();

    /**
     * 存放智慧屏用户与Chanel的对应信息,用于给指定用户发送消息
     */
    public static ConcurrentHashMap<Long, Channel> mobileUserChannelMap  = new ConcurrentHashMap<>();

}

三、constant

package cn.piesat.space.control.socket.constants;

/**
 * @author Marclew
 * @date 2022/10/21 9:37
 * @describe
 */

public class ControlConstants {

    public final static String WS_PATH = "/control";
    public final static String USERINFO_ATTRIBUTE_KEY = "userInfo";
    public final static Integer CLIENT_TYPE_PC = 1;
    public final static Integer CLIENT_TYPE_MOBILE = 2;
    public final static Integer READER_IDLE_TIME = 30;
    public final static Integer WRITER_IDLE_TIME = 60;
    public final static Integer ALL_IDLE_TIME = 90;
    public final static Integer MAX_FRAME_SIZE = 65536 * 10;
    public final static Integer MAX_CONTENT_LENGTH = 8192;
}

四、dto

package cn.piesat.space.control.socket.dto;

import lombok.Data;

import java.io.Serializable;

/**
 * @author Marclew
 */
@Data
public class ControlReqDto implements Serializable {

    /**
     * 目标客户端类型(1.智慧屏   2.移动端)
     */
    private Integer clientType;

    /**
     * 消息命令
     */
    private Integer cmd;

    /**
     * 消息内容
     */
    private String message;
}

五、entity

package cn.piesat.space.control.socket.entity;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * @author Marclew
 * @date 2022/10/20 16:42
 * @describe
 */

@Data
@Builder
public class ControlUserInfo implements Serializable {

    private Long userId;
    private Integer clientType;
}

六、enums

1.ControlReqCmdEnum

package cn.piesat.space.control.socket.enums;


import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author Marclew
 */

@Getter
@AllArgsConstructor
@SuppressWarnings("AlibabaEnumConstantsMustHaveComment")
public enum ControlReqCmdEnum {

    HEART_CMD(2001, "心跳"),
    CONNECT_CMD(2002, "请求投屏");

    private final int value;
    private final String msg;
}

2.ControlResCmdEnum

package cn.piesat.space.control.socket.enums;


import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author Marclew
 */

@Getter
@AllArgsConstructor
@SuppressWarnings("AlibabaEnumConstantsMustHaveComment")
public enum ControlResCmdEnum {

    HEART_CMD(2001, "心跳检测成功"),
    CONNECT_CMD(2002, "请求投屏"),
    ERROR_CMD(500, "异常通知");

    private final int value;
    private final String msg;
}

3.ResultCodeEnum

package cn.piesat.space.control.socket.enums;


import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author Marclew
 */

@Getter
@AllArgsConstructor
@SuppressWarnings("AlibabaEnumConstantsMustHaveComment")
public enum ResultCodeEnum {

    SUCCESS(200, "发送成功"),
    PC_NOT_FOUND(601, "发送失败,PC端不在线"),
    TOO_MANY_CLIENT(602, "客户端在其他设备登录,被迫下线"),
    DATA_FORMAT_ERROR(603, "数据格式不正确,被迫下线"),
    HEART_IDLE_ERROR(604, "心跳检测超时,被迫下线");

    private final int value;
    private final String msg;
}

七、handler

1.ControlAuthHandler

package cn.piesat.space.control.socket.handler;

import cn.hutool.core.util.ObjectUtil;
import cn.piesat.space.control.socket.constants.ControlConstants;
import cn.piesat.space.control.socket.service.ControlChannelService;
import cn.piesat.space.control.socket.utils.NettyUtil;
import cn.piesat.space.user.api.feign.RemoteUserProvider;
import cn.piesat.space.user.api.vo.LoginUser;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author Marclew
 * @date 2022/10/18 17:12
 * @describe
 */
@Component
@ChannelHandler.Sharable
@Slf4j
public class ControlAuthHandler extends ChannelInboundHandlerAdapter {

    private static ControlAuthHandler authHandler;
    @Autowired
    private ControlChannelService controlChannelService;

    @Autowired(required = false)
    private RemoteUserProvider remoteUserProvider;

    /**
     * 因为是new出来的handler,没有托给spring容器,所以一定要先初始化,否则autowired失效
     */
    @PostConstruct
    public void init() {
        authHandler = this;
    }

    /**
     * 执行顺序 【4】
     * channel读取事件 【通道可读时触发】
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //首次连接是FullHttpRequest,把用户id和对应的channel对象存储起来
        try {
            if (msg instanceof FullHttpRequest) {
                FullHttpRequest request = (FullHttpRequest) msg;
                String uri = request.uri();
                log.info("【Control】--- 鉴权   start...");
                String authorization = NettyUtil.getFieldValue(uri, "authorization");
                Integer clientType = Integer.valueOf(NettyUtil.getFieldValue(uri, "clientType"));
                LoginUser loginUser = authHandler.remoteUserProvider.getUserInfoByToken(authorization);
                if (ObjectUtil.isEmpty(loginUser) || ObjectUtil.isEmpty(loginUser.getHhxsUserId()) || ObjectUtil.isEmpty(clientType)){
                    authHandler.controlChannelService.authError(ctx.channel());
                    return;
                }
                if (!ObjectUtil.equal(ControlConstants.CLIENT_TYPE_PC, clientType) && !ObjectUtil.equal(ControlConstants.CLIENT_TYPE_MOBILE, clientType)){
                    authHandler.controlChannelService.authError(ctx.channel());
                    return;
                }
                Long hhxsUserId = loginUser.getHhxsUserId();
                authHandler.controlChannelService.userOnline(ctx.channel(), hhxsUserId, clientType);
                log.info("【Control】--- 登录的用户id是:{}", hhxsUserId);
                //如果url包含参数,需要处理
                request.setUri(NettyUtil.processRequestUri(uri));
            }
            ctx.fireChannelRead(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.ControlChannelHandler?

package cn.piesat.space.control.socket.handler;

import cn.hutool.core.util.ObjectUtil;
import cn.piesat.common.redis.service.RedisService;
import cn.piesat.space.control.socket.bucket.ControlBucket;
import cn.piesat.space.control.socket.dto.ControlReqDto;
import cn.piesat.space.control.socket.enums.ControlReqCmdEnum;
import cn.piesat.space.control.socket.enums.ResultCodeEnum;
import cn.piesat.space.control.socket.service.ControlChannelService;
import cn.piesat.space.control.socket.service.ControlMessageService;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


/**
 * @author  Marclew
 * @创建时间 2022/2/16 16:30
 * @描述  netty执行类
 */
@Component
@ChannelHandler.Sharable
@Slf4j
public class ControlChannelHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Autowired
    private RedisService redisService;
    @Autowired
    private ControlChannelService controlChannelService;

    private static ControlChannelHandler channelHandler;

    @PostConstruct
    /**
     * 因为是new出来的handler,没有托给spring容器,所以一定要先初始化,否则autowired失效
     */
    public void init() {
        channelHandler = this;
    }

    /**
     * 执行顺序 【1】
     * channel 助手类(拦截器)的添加
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        super.handlerAdded(ctx);
    }

    /**
     * 执行顺序 【2】
     * channel注册事件
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
    }

    /**
     * 执行顺序 【3】
     * channel激活事件 【通道激活时触发】
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("【Control】--- 客户端建立连接,address:{}", ctx.channel().remoteAddress().toString());
        log.info("【Control】--- 当前websocket在线连接数:{}个", ControlBucket.channelGroup.size());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        //正常的TEXT消息类型
        log.info("【Control】--- 服务器收到客户端数据:{}", frame.text());
        ControlReqDto controlDto = null;
        try {
            controlDto = JSON.parseObject(frame.text(), ControlReqDto.class);
        } catch (Exception e) {
            channelHandler.controlChannelService.sendErrorMessage(ctx.channel(), ResultCodeEnum.DATA_FORMAT_ERROR);
            e.printStackTrace();
        }
        // TODO:数据校验
        // ...
        // 获取cmd
        Integer cmd = controlDto.getCmd();
        if (ObjectUtil.equal(ControlReqCmdEnum.HEART_CMD.getValue(), cmd)){
            channelHandler.controlChannelService.heartHandle(ctx.channel());
            return;
        }
        channelHandler.controlChannelService.transferMessage(ctx.channel(), controlDto);
    }

    /**
     * 执行顺序 【5】
     * channel读取完成事件 【通道读取完成时触发】
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    /**
     * 执行顺序 【6】
     * channel通道不活跃/断开事件 【通道断开或者不活跃时触发】
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        channelHandler.controlChannelService.removeChannel(ctx.channel());
    }

    /**
     * channel取消注册事件
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
    }

    /**
     * channel 可写更改
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
    }

    /**
     * channel 捕获到异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 删除通道
        channelHandler.controlChannelService.removeChannel(ctx.channel());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        channelHandler.controlChannelService.sendErrorMessage(ctx.channel(), ResultCodeEnum.HEART_IDLE_ERROR);
        //超时事件
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleEvent = (IdleStateEvent) evt;
            //读超时
            if (idleEvent.state() == IdleState.READER_IDLE) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.READER_IDLE) {
                    log.info("【Control】--- {}读超时事件={} ", ctx.channel().remoteAddress().toString(), IdleState.READER_IDLE);
                    ctx.disconnect();
                }
                // 断开连接
                channelHandler.controlChannelService.removeChannel(ctx.channel());
                //写
            } else if (idleEvent.state() == IdleState.WRITER_IDLE) {
                //全部
                log.info("【Control】--- {}写超时事件={} ", ctx.channel().remoteAddress().toString(), IdleState.WRITER_IDLE);
            } else if (idleEvent.state() == IdleState.ALL_IDLE) {
                log.info("【Control】--- {}读写超时事件={} ", ctx.channel().remoteAddress().toString(), IdleState.ALL_IDLE);
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    /**
     * channel 助手类(拦截器)移除
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
    }
}

八、server

package cn.piesat.space.control.socket.server;

import cn.piesat.space.control.socket.constants.ControlConstants;
import cn.piesat.space.control.socket.handler.ControlAuthHandler;
import cn.piesat.space.control.socket.handler.ControlChannelHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;


/**
 * @author  Marclew
 * @创建时间 2022/2/16 16:30
 * @描述
 */
@Component
public class ControlChannelServer {

    @Value("${control.port}")
    private int port;
    private static final Logger log = LoggerFactory.getLogger(ControlChannelServer.class);

    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @throws Exception if unable to compute a result
     */
    public void call() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            // 绑定线程池
            sb.group(group, bossGroup)
                    // 指定使用的channel
                    .channel(NioServerSocketChannel.class)
                    // 绑定监听端口
                    .localAddress(this.port)
                    // 绑定客户端连接时候触发操作
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
                            ch.pipeline().addLast(new HttpServerCodec());
                            //以块的方式来写的处理器
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            ch.pipeline().addLast(new HttpObjectAggregator(ControlConstants.MAX_CONTENT_LENGTH));
                            ch.pipeline().addLast(new ControlAuthHandler());//添加鉴权处理类
                            ch.pipeline().addLast("heart", new IdleStateHandler(ControlConstants.READER_IDLE_TIME, ControlConstants.WRITER_IDLE_TIME, ControlConstants.ALL_IDLE_TIME, TimeUnit.SECONDS));
                            ch.pipeline().addLast(new ControlChannelHandler());//添加消息处理类
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler(ControlConstants.WS_PATH, null, true, ControlConstants.MAX_FRAME_SIZE));
                        }
                    });
            // 服务器异步创建绑定
            ChannelFuture cf = sb.bind().sync();
            System.out.println(ControlChannelServer.class + " starting on a port: " + port);
            // 关闭服务器通道
            cf.channel().closeFuture().sync();
        } finally {
            // 释放线程池资源
            group.shutdownGracefully().sync();
            bossGroup.shutdownGracefully().sync();
        }
    }
}

九、service

1.ControlChannelService

package cn.piesat.space.control.socket.service;

import cn.hutool.core.util.ObjectUtil;
import cn.piesat.space.control.socket.bucket.ControlBucket;
import cn.piesat.space.control.socket.dto.ControlReqDto;
import cn.piesat.space.control.socket.entity.ControlUserInfo;
import cn.piesat.space.control.socket.enums.ControlResCmdEnum;
import cn.piesat.space.control.socket.enums.ResultCodeEnum;
import cn.piesat.space.control.socket.utils.ChannelUtil;
import cn.piesat.space.control.socket.utils.ControlBucketUtil;
import cn.piesat.space.control.socket.vo.ControlResVo;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Marclew
 * @date 2022/10/12$ 15:11$
 * @describe
 */
@Component
@Slf4j
public class ControlChannelService {

    @Autowired
    private ControlBucketUtil bucketUtil;
    @Autowired
    private ChannelUtil channelUtil;
    @Autowired
    private ControlMessageService controlMessageService;


    /**
     * 用户上线
     */
    public void userOnline(Channel channel, Long userId, Integer clientType) {
        if (bucketUtil.isOnline(userId, clientType)) {
            Channel oldChannel = bucketUtil.getUserChannel(userId, clientType);
            controlMessageService.sendMessage(oldChannel,
                    JSONObject.toJSONString(
                            ControlResVo.builder()
                                    .cmd(ControlResCmdEnum.ERROR_CMD.getValue())
                                    .resultCode(ResultCodeEnum.TOO_MANY_CLIENT.getValue())
                                    .message(ResultCodeEnum.TOO_MANY_CLIENT.getMsg()).build()));
            bucketUtil.removeUserChannel(userId, clientType);
            bucketUtil.removeChannelGroup(userId, clientType);
        }
        bucketUtil.putUserChannel(userId, clientType, channel);
        //添加到channelGroup通道组
        bucketUtil.addChannelGroup(channel);
        channelUtil.bindingUser(channel, userId, clientType);
    }

    /**
     * 断开连接
     */
    public void removeChannel(Channel channel) {
        // 删除通道
        bucketUtil.removeChannelGroup(channel);
        // 拿到用户信息
        if (channelUtil.hasUser(channel)){
            ControlUserInfo userInfo = channelUtil.getUserInfo(channel);
            bucketUtil.removeUserChannel(userInfo.getUserId(), userInfo.getClientType());
            log.info("【Control】--- 与用户{}断开连接,通道关闭!", userInfo.getUserId());
            log.info("【Control】--- 当前websocket在线连接数:{}个", bucketUtil.onlineUserCount());
        }
        channel.close();
    }

    /**
     * 鉴权失败返回
     */
    public void authError(Channel channel) {
        controlMessageService.sendErrorMessage(channel, HttpResponseStatus.UNAUTHORIZED);
        channel.close();
    }

    /**
     * 正常消息转发
     */
    public void transferMessage(Channel sourceChannel, ControlReqDto controlReqDto) {
        ControlUserInfo userInfo = channelUtil.getUserInfo(sourceChannel);
        log.info("【Control】--- userId{}", userInfo.getUserId());
        if (!bucketUtil.isOnline(userInfo.getUserId(), controlReqDto.getClientType())){
            controlMessageService.sendMessage(sourceChannel, controlReqDto.getCmd(), ResultCodeEnum.PC_NOT_FOUND);
            log.info("【Control】--- 消息转发失败:{}", ResultCodeEnum.PC_NOT_FOUND);
            return;
        }
        Channel targetChannel = bucketUtil.getUserChannel(userInfo.getUserId(), controlReqDto.getClientType());
        ControlResVo controlResVo = ControlResVo.builder().resultCode(ResultCodeEnum.SUCCESS.getValue()).build();
        BeanUtils.copyProperties(controlReqDto, controlResVo);
        controlMessageService.sendMessage(targetChannel, JSONObject.toJSONString(controlResVo));
        controlMessageService.sendMessage(sourceChannel, controlReqDto.getCmd(), ResultCodeEnum.SUCCESS);
    }

    /**
     * 判断用户是否在线
     */
    public Boolean isOnline(Channel channel) {
        ControlUserInfo userInfo = channelUtil.getUserInfo(channel);
        return bucketUtil.isOnline(userInfo.getUserId(), userInfo.getClientType());
    }

    /**
     * 心跳
     */
    public void heartHandle(Channel channel) {
        controlMessageService.sendMessage(channel, JSONObject.toJSONString(ControlResVo.builder().cmd(ControlResCmdEnum.HEART_CMD.getValue()).resultCode(ResultCodeEnum.SUCCESS.getValue()).message(ControlResCmdEnum.HEART_CMD.getMsg()).build()));
    }

    /**
     * 错误消息回馈
     */
    public void sendErrorMessage(Channel channel, ResultCodeEnum resultCodeEnum) {
        controlMessageService.sendMessage(channel, JSONObject.toJSONString(ControlResVo.builder().cmd(ControlResCmdEnum.ERROR_CMD.getValue()).resultCode(resultCodeEnum.getValue()).message(resultCodeEnum.getMsg()).build()));
    }
}

2.ControlMessageService

package cn.piesat.space.control.socket.service;

import cn.piesat.space.control.socket.enums.ResultCodeEnum;
import cn.piesat.space.control.socket.utils.ChannelUtil;
import cn.piesat.space.control.socket.utils.ControlBucketUtil;
import cn.piesat.space.control.socket.vo.ControlResVo;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author Marclew
 * @date 2022/10/21 15:11
 * @describe
 */

@Component
public class ControlMessageService {

    @Autowired
    private ChannelUtil channelUtil;
    @Autowired
    private ControlBucketUtil bucketUtil;

    /**
     * 发送消息
     */
    public void sendMessage(Channel channel, String message) {
        channel.writeAndFlush(new TextWebSocketFrame(message));
    }

    /**
     * 发送消息
     */
    public void sendMessage(Long user, Integer clientType, String message) {
        Channel channel = bucketUtil.getUserChannel(user, clientType);
        this.sendMessage(channel, message);
    }

    /**
     * 发送消息
     */
    public void sendMessage(Channel channel, Integer cmd, ResultCodeEnum resultCodeEnum) {
        this.sendMessage(channel, JSONObject.toJSONString(ControlResVo.builder().cmd(cmd).resultCode(resultCodeEnum.getValue()).message(resultCodeEnum.getMsg()).build()));
    }

    /**
     * 发送连接失败返回
     */
    public void sendErrorMessage(Channel channel, HttpResponseStatus responseStatus) {
        channel.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, responseStatus));
    }
}

十、utils

1.ChannelUtil

package cn.piesat.space.control.socket.utils;

import cn.piesat.space.control.socket.bucket.ControlBucket;
import cn.piesat.space.control.socket.constants.ControlConstants;
import cn.piesat.space.control.socket.entity.ControlUserInfo;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Marclew
 * @date 2022/10/20 17:28
 * @describe
 */

@Component
public class ChannelUtil {

    /**
     * 判断一个通道是否有用户在使用
     * 可做信息转发时判断该通道是否合法
     */
    public boolean hasUser(Channel channel) {
        AttributeKey<ControlUserInfo> key = AttributeKey.valueOf(ControlConstants.USERINFO_ATTRIBUTE_KEY);
        return (channel.hasAttr(key) || channel.attr(key).get() != null);//netty移除了这个map的remove方法,这里的判断谨慎一点
    }

    /**
     * 给通道绑定用户信息
     */
    public ControlUserInfo getUserInfo(Channel channel) {
        AttributeKey<ControlUserInfo> userInfoKey = AttributeKey.valueOf(ControlConstants.USERINFO_ATTRIBUTE_KEY);
        return channel.attr(userInfoKey).get();
    }

    /**
     * 根据通道获取用户信息
     */
    public void bindingUser(Channel channel, Long userId, Integer clientType) {
        AttributeKey<ControlUserInfo> userInfoKey = AttributeKey.valueOf(ControlConstants.USERINFO_ATTRIBUTE_KEY);
        channel.attr(userInfoKey).set(ControlUserInfo.builder().userId(userId).clientType(clientType).build());
    }

}

2.ControlBucketUtil

package cn.piesat.space.control.socket.utils;

import cn.hutool.core.util.ObjectUtil;
import cn.piesat.space.control.socket.bucket.ControlBucket;
import cn.piesat.space.control.socket.constants.ControlConstants;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Marclew
 * @date 2022/10/21 14:49
 * @describe
 */
@Component
public class ControlBucketUtil {

    /**         userChannelMap            */

    /**
     * 获取userChannelMap
     */
    public ConcurrentHashMap<Long, Channel> getConcurrentHashMap(Integer clientType){
        if (ObjectUtil.equal(ControlConstants.CLIENT_TYPE_PC, clientType)){
            return ControlBucket.pcUserChannelMap;
        }
        if (ObjectUtil.equal(ControlConstants.CLIENT_TYPE_MOBILE, clientType)){
            return ControlBucket.mobileUserChannelMap;
        }
        return new ConcurrentHashMap<>();
    }


    /**
     * 获取channel
     */
    public Channel getUserChannel(Long userId, Integer clientType) {
        return getConcurrentHashMap(clientType).get(userId);
    }

    /**
     * 添加通道至userChannelMap
     */
    public Channel putUserChannel(Long userId, Integer clientType, Channel channel) {
        return getConcurrentHashMap(clientType).put(userId, channel);
    }

    /**
     * 从userChannelMap中删除通道
     */
    public Channel removeUserChannel(Long userId, Integer clientType) {
        return getConcurrentHashMap(clientType).remove(userId);
    }

    /**
     * 添加channel至channelGroup
     */
    public Boolean addChannelGroup(Channel channel) {
        return getChannelGroup().add(channel);
    }

    /**
     * 从channelGroup中删除channel
     */
    public Boolean removeChannelGroup(Channel channel) {
        return getChannelGroup().remove(channel);
    }

    /**
     * 从channelGroup中删除channel
     */
    public Boolean removeChannelGroup(Long userId, Integer clientType) {
        return getChannelGroup().remove(getUserChannel(userId, clientType));
    }

    /**
     * 获取channelGroup
     */
    public ChannelGroup getChannelGroup() {
        return ControlBucket.channelGroup;
    }

    /**
     * 判断一个用户是否在线
     */
    public Boolean isOnline(Long userId, Integer clientType) {
        return getConcurrentHashMap(clientType).containsKey(userId) && getConcurrentHashMap(clientType).get(userId) != null;
    }

    /**
     * 获取用户连接数
     */
    public int onlineUserCount() {
        return ControlBucket.channelGroup.size();
    }
}

3.NettyUtil

package cn.piesat.space.control.socket.utils;

import cn.piesat.common.core.constants.Constants;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * @author Marclew
 */
public class NettyUtil {

    public static String getFieldValue(String urlStr, String field) {
        StringBuilder result = new StringBuilder();
        Pattern pXm = Pattern.compile(field + "=([^&]*)");
        Matcher mXm = pXm.matcher(urlStr);
        while (mXm.find()) {
            result.append(mXm.group(1));
        }
        return result.toString();
    }

    /**
     * 处理uri
     * @return
     */
    public static String processRequestUri(String uri){
        if (uri.contains(Constants.ENGLISH_QUESTION_MARK)) {
            return uri.substring(0, uri.indexOf("?"));
        }
        return uri;
    }
}

十一、ControlResVo

package cn.piesat.space.control.socket.vo;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * @author Marclew
 */
@Data
@Builder
public class ControlResVo implements Serializable {

    /**
     * 消息命令
     */
    private Integer cmd;

    /**
     * 结果码
     */
    private Integer resultCode;

    /**
     * 消息内容
     */
    private String message;
}

十二、启动类

package cn.piesat.space.control.socket;

import cn.piesat.space.control.socket.server.ControlChannelServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@SpringBootApplication(scanBasePackages = "cn.piesat", exclude = DataSourceAutoConfiguration.class)
public class SpaceControlSocketApplication implements CommandLineRunner {

    @Autowired
    private ControlChannelServer socketServer;

    public static void main(String[] args) {
        SpringApplication.run(SpaceControlSocketApplication.class);
    }


    @Override
    public void run(String... args) {
        try {
            socketServer.call();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-10-31 11:40:47  更:2022-10-31 11:45:18 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 4:11:57-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码