依赖
<!-- t-io core library (org.t-io:tio-core); the code below imports its org.tio.* classes -->
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-core</artifactId>
<version>3.7.4.v20210808-RELEASE</version>
</dependency>
Packet
import cn.hutool.core.util.HexUtil;
import com.demo.socket.util.HexStrUtil;
import lombok.Getter;
import lombok.Setter;
import org.tio.core.intf.Packet;
import java.io.UnsupportedEncodingException;
/**
 * Message frame exchanged between the client and the server.
 *
 * <p>The {@code static} constants give the size in bytes of each field of the frame; the
 * instance fields hold the raw bytes of one concrete frame. Every array is initialised to a
 * non-null default so that an otherwise empty packet (for example a heartbeat) can still be
 * measured and encoded without a {@link NullPointerException}.
 *
 * @author Lenovo
 */
@Setter
@Getter
public class MindPackage extends Packet {
    private static final long serialVersionUID = -172060606924066412L;

    /** Size of the frame header, in bytes. */
    public static final Integer HEADER = 4;

    /** Size of the field that stores the body length, in bytes. */
    public static final Integer LENGTH = 4;

    /** Size of the login id, in bytes. */
    public static final Integer LOGIN_ID = 4;

    /** Size of the command code, in bytes. */
    public static final Integer COMMAND_CODE = 2;

    /** Size of the command id, in bytes. */
    public static final Integer COMMAND_ID = 4;

    /**
     * Size of the result flag, in bytes. 0 = success, 1 = failure; on failure the first
     * 8 bytes of the data are an error code followed by a UTF-8 encoded error message.
     */
    public static final Integer RESULT = 1;

    /** Size of the encryption-mode flag, in bytes (0 = not encrypted). */
    public static final Integer SECRET = 1;

    /** Size of the compression-mode flag, in bytes (0 = none, 1 = gzip). */
    public static final Integer ZIP = 1;

    /**
     * The data section has no fixed size, hence {@code null}. Do not use this constant in
     * arithmetic: unboxing it throws a {@link NullPointerException}.
     */
    public static final Integer DATA = null;

    /** Size of the frame trailer, in bytes. */
    public static final Integer LAST = 4;

    /** Character encoding used for textual payloads. */
    public static final String CHARSET = "utf-8";

    /** Raw bytes: frame header. */
    private byte[] head = HexStrUtil.hexStrToBinaryStr("3C 43 6D 64");

    /** Raw bytes: body length as read from the wire. */
    private byte[] length = new byte[4];

    /** Raw bytes: login id. */
    private byte[] loginId = new byte[4];

    /** Raw bytes: command code. */
    private byte[] commandCode = new byte[2];

    /** Raw bytes: command id (chosen by the caller). */
    private byte[] commandId = new byte[4];

    /** Raw bytes: result flag. */
    private byte[] result = HexUtil.decodeHex("00");

    /** Raw bytes: encryption mode. */
    private byte[] secret = HexStrUtil.hexStrToBinaryStr("00");

    /** Raw bytes: compression mode. */
    private byte[] zip = HexStrUtil.hexStrToBinaryStr("00");

    /** Raw bytes: payload. */
    private byte[] data = new byte[0];

    /** Raw bytes: frame trailer. */
    private byte[] last = HexStrUtil.hexStrToBinaryStr("63 6D 44 3E");

    /**
     * Computes the length field of a request frame: the combined size of login id, command
     * code, command id, encryption flag, compression flag and payload.
     *
     * @return the length as 4 bytes, least significant byte first
     */
    public byte[] getReqLengthByCount() {
        int number = loginId.length
                + commandCode.length
                + commandId.length
                + secret.length
                + zip.length
                + data.length;
        return toLittleEndianBytes(number);
    }

    /**
     * Computes the length field of a response frame: the same fields as a request plus the
     * result flag.
     *
     * @return the length as 4 bytes, least significant byte first
     */
    public byte[] getResLengthByCount() {
        int number = loginId.length
                + commandCode.length
                + commandId.length
                + secret.length
                + result.length
                + zip.length
                + data.length;
        return toLittleEndianBytes(number);
    }

    /**
     * Decodes the payload as text using {@link #CHARSET}.
     *
     * @return the payload as a string
     * @throws UnsupportedEncodingException if {@link #CHARSET} is not supported by the JVM
     */
    public String getStringData() throws UnsupportedEncodingException {
        return new String(this.data, CHARSET);
    }

    /**
     * Stores the given text as the UTF-8 encoded payload.
     *
     * @param data the text to send; {@code null} is stored as an empty payload so that the
     *             length computation keeps working
     */
    public void setStringData(String data) {
        // Encode directly instead of going through an intermediate hex string.
        this.data = data == null
                ? new byte[0]
                : data.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Encodes an int as 4 bytes, least significant byte first. This is the byte order the
     * decoder ({@code HexStrUtil.bytes2int}) expects for the length field.
     */
    private static byte[] toLittleEndianBytes(int value) {
        return new byte[] {
            (byte) value,
            (byte) (value >>> 8),
            (byte) (value >>> 16),
            (byte) (value >>> 24)
        };
    }
}
客户端处理器实现
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson.JSON;
import com.demo.socket.entity.Response;
import com.demo.socket.service.IResponseService;
import com.demo.socket.socketplus.Remark;
import com.demo.socket.util.HexStrUtil;
import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.exception.TioDecodeException;
import org.tio.core.intf.Packet;
import java.nio.ByteBuffer;
import java.util.Date;
/**
 * Client-side t-io handler: encodes outgoing request frames, decodes incoming response
 * frames and processes the decoded responses.
 *
 * @author Lenovo
 */
public class ClientAioHandlerImpl implements ClientAioHandler {

    /** Bytes that must be available before the body length can be read. */
    private static final int FRAME_PREFIX = MindPackage.HEADER + MindPackage.LENGTH;

    /** Bytes of a response body that precede the payload. */
    private static final int RESPONSE_FIXED_FIELDS = MindPackage.LOGIN_ID
            + MindPackage.COMMAND_CODE
            + MindPackage.COMMAND_ID
            + MindPackage.RESULT
            + MindPackage.SECRET
            + MindPackage.ZIP;

    /**
     * Creates the heartbeat packet: a {@link MindPackage} with all fields at their defaults.
     *
     * @param channelContext the channel the heartbeat is sent on
     * @return a new default packet
     */
    @Override
    public Packet heartbeatPacket(ChannelContext channelContext) {
        return new MindPackage();
    }

    /**
     * Decodes one response frame from the buffer.
     *
     * <p>Returns {@code null} when the buffer does not yet hold a complete frame; the
     * framework then retries once more data has arrived.
     *
     * @param buffer         the buffer to decode from
     * @param limit          the buffer's limit
     * @param position       the buffer's position when decoding starts
     * @param readableLength number of bytes available for this decode attempt
     * @param channelContext the channel the bytes arrived on
     * @return the decoded packet, or {@code null} if more bytes are needed
     * @throws TioDecodeException if the declared body length is too small to hold the fixed
     *                            response fields
     */
    @Override
    public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws TioDecodeException {
        // The header and the length field are needed before anything else can be decided.
        if (readableLength < FRAME_PREFIX) {
            return null;
        }
        MindPackage mindPackage = new MindPackage();
        mindPackage.setHead(readBytes(buffer, MindPackage.HEADER));

        byte[] length = readBytes(buffer, MindPackage.LENGTH);
        mindPackage.setLength(length);
        int number = HexStrUtil.bytes2int(length);

        // A body shorter than its own fixed fields can never be valid; fail fast instead of
        // hitting a NegativeArraySizeException when allocating the payload.
        int dataLength = number - RESPONSE_FIXED_FIELDS;
        if (dataLength < 0) {
            throw new TioDecodeException("declared body length [" + number
                    + "] is shorter than the fixed response fields [" + RESPONSE_FIXED_FIELDS + "]");
        }
        // Computed as long so that a huge declared length cannot overflow and slip through.
        if ((long) FRAME_PREFIX + number + MindPackage.LAST > readableLength) {
            return null;
        }

        mindPackage.setLoginId(readBytes(buffer, MindPackage.LOGIN_ID));
        mindPackage.setCommandCode(readBytes(buffer, MindPackage.COMMAND_CODE));
        mindPackage.setCommandId(readBytes(buffer, MindPackage.COMMAND_ID));
        mindPackage.setResult(readBytes(buffer, MindPackage.RESULT));
        mindPackage.setSecret(readBytes(buffer, MindPackage.SECRET));
        mindPackage.setZip(readBytes(buffer, MindPackage.ZIP));
        mindPackage.setData(readBytes(buffer, dataLength));
        mindPackage.setLast(readBytes(buffer, MindPackage.LAST));
        return mindPackage;
    }

    /**
     * Encodes a request frame: header, length, login id, command code, command id,
     * encryption flag, compression flag, payload and trailer, in that order.
     *
     * @param packet         the {@link MindPackage} to encode
     * @param tioConfig      supplies the byte order set on the returned buffer
     * @param channelContext the channel the packet is sent on
     * @return a buffer containing the encoded frame
     */
    @Override
    public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
        MindPackage mindPackage = (MindPackage) packet;
        byte[] bytes = ArrayUtil.addAll(
                mindPackage.getHead(),
                mindPackage.getReqLengthByCount(),
                mindPackage.getLoginId(),
                mindPackage.getCommandCode(),
                mindPackage.getCommandId(),
                mindPackage.getSecret(),
                mindPackage.getZip(),
                mindPackage.getData(),
                mindPackage.getLast());
        ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
        byteBuffer.order(tioConfig.getByteOrder());
        byteBuffer.put(bytes);
        return byteBuffer;
    }

    /**
     * Processes a decoded response.
     *
     * <p>For a login response the matching {@link Response} record is looked up by the hex
     * form of the command id and updated with either the returned payload (result flag
     * {@code 00}) or a fixed error JSON. Any other response is only printed.
     *
     * @param packet         the decoded {@link MindPackage}
     * @param channelContext the channel the packet arrived on
     * @throws Exception if decoding the payload text or the service call fails
     */
    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
        System.out.println("------------------------------------------START--------------------------------------------------------");
        MindPackage mindPackage = (MindPackage) packet;
        String commandIdHex = HexUtil.encodeHexStr(mindPackage.getCommandId());
        if (Remark.LOGIN_COMMAND_ID.equals(commandIdHex)) {
            IResponseService responseService = SpringUtil.getBean(IResponseService.class);
            // NOTE(review): query(...) is assumed to return a non-null record — confirm.
            Response response = responseService.query(commandIdHex);
            String resultHex = HexUtil.encodeHexStr(mindPackage.getResult());
            System.out.println("result16" + resultHex);
            if ("00".equals(resultHex)) {
                // Login succeeded: persist the returned payload.
                response.setJson(mindPackage.getStringData());
                response.setResponseTime(new Date());
                int i = responseService.updateData(response);
                System.out.println("试图保存登录结果response表:" + JSON.toJSONString(response) + ":::" + i);
            } else {
                // Login failed: persist a fixed error marker.
                response.setJson("{\"Result\":\"error\"}");
                response.setResponseTime(new Date());
                int i = responseService.updateData(response);
                System.out.println("试图保存登录错误结果response表:" + JSON.toJSONString(response) + ":::" + i);
            }
            System.out.println("登录接口结果:" + mindPackage.getStringData());
        } else {
            System.out.println("客户端最终捕获commandID:" + commandIdHex);
        }
        System.out.println("--------------------------------------------END--------------------------------------------------------");
    }

    /** Reads exactly {@code size} bytes from the buffer's current position. */
    private static byte[] readBytes(ByteBuffer buffer, int size) {
        byte[] bytes = new byte[size];
        buffer.get(bytes);
        return bytes;
    }
}
客户端监听器实现
import cn.hutool.core.util.HexUtil;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
/**
 * Client-side t-io listener. Sends a login request every time a connection is established;
 * all other callbacks are no-ops.
 *
 * @author Lenovo
 */
public class ClientAioListenerImpl implements ClientAioListener {

    /**
     * Called after a connection attempt. The attempt may have failed, so
     * {@code isConnected} must be checked.
     *
     * @param channelContext the channel that was (re)connected
     * @param isConnected    {@code true} if the connection was established
     * @param isReconnect    {@code true} if this is a reconnect, {@code false} for the first connect
     * @throws Exception declared by the interface
     */
    @Override
    public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
        // A login is required after every successful connect or reconnect.
        if (isConnected) {
            // NOTE(review): placeholder credentials are hard-coded here.
            String message = "{\"Account\":\"Account\",\"Password\":\"Password\"}";
            MindPackage mindPackage = new MindPackage();
            mindPackage.setCommandId(HexUtil.decodeHex(Remark.LOGIN_COMMAND_ID));
            mindPackage.setCommandCode(HexUtil.decodeHex(Remark.LOGIN_COMMAND_CODE));
            // Hex literal without separators, so decoding does not depend on the library
            // tolerating whitespace.
            mindPackage.setLoginId(HexUtil.decodeHex("00000000"));
            mindPackage.setStringData(message);
            Tio.send(channelContext, mindPackage);
        }
    }

    /**
     * Called after a packet has been decoded successfully. No-op.
     *
     * @param channelContext the channel the packet arrived on
     * @param packet         the decoded packet
     * @param packetSize     size of the packet
     * @throws Exception declared by the interface
     */
    @Override
    public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {
    }

    /**
     * Called after bytes have been received from the TCP layer. No-op.
     *
     * @param channelContext the channel the bytes arrived on
     * @param receivedBytes  number of bytes received this time
     * @throws Exception declared by the interface
     */
    @Override
    public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
    }

    /**
     * Called after a packet has been sent. No-op.
     *
     * @param channelContext the channel the packet was sent on
     * @param packet         the packet that was sent
     * @param isSentSuccess  {@code true} if sending succeeded
     * @throws Exception declared by the interface
     */
    @Override
    public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
    }

    /**
     * Called after a packet has been handled. No-op.
     *
     * @param channelContext the channel the packet arrived on
     * @param packet         the packet that was handled
     * @param cost           handling time in milliseconds
     * @throws Exception declared by the interface
     */
    @Override
    public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
    }

    /**
     * Called before the connection is closed. No-op.
     *
     * @param channelContext the channel being closed
     * @param throwable      the cause, may be {@code null}
     * @param remark         a remark, may be {@code null}
     * @param isRemove       passed through from the framework
     * @throws Exception declared by the interface
     */
    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
    }
}
服务端处理器
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.exception.TioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioHandler;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.UUID;
/**
 * Server-side t-io handler: decodes incoming request frames, encodes outgoing response
 * frames and dispatches decoded requests.
 *
 * @author Lenovo
 */
@Slf4j
public class ServerAioHandlerImpl implements ServerAioHandler {

    /** Bytes that must be available before the body length can be read. */
    private static final int FRAME_PREFIX = MindPackage.HEADER + MindPackage.LENGTH;

    /** Bytes of a request body that precede the payload (a request has no result flag). */
    private static final int REQUEST_FIXED_FIELDS = MindPackage.LOGIN_ID
            + MindPackage.COMMAND_CODE
            + MindPackage.COMMAND_ID
            + MindPackage.SECRET
            + MindPackage.ZIP;

    /**
     * Decodes one request frame from the buffer.
     *
     * <p>Returns {@code null} when the buffer does not yet hold a complete frame; the
     * framework then retries once more data has arrived.
     *
     * @param buffer         the buffer to decode from
     * @param limit          the buffer's limit
     * @param position       the buffer's position when decoding starts (not necessarily 0)
     * @param readableLength number of bytes available for this decode attempt (= limit - position)
     * @param channelContext the channel the bytes arrived on
     * @return the decoded packet, or {@code null} if more bytes are needed
     * @throws TioDecodeException if the declared body length is too small to hold the fixed
     *                            request fields
     */
    @Override
    public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws TioDecodeException {
        // Kept from the original: a single readable byte is dropped without logging.
        if (readableLength == 1) {
            return null;
        }
        log.info("readableLength:" + readableLength);
        // The header and the length field are needed before anything else can be decided.
        if (readableLength < FRAME_PREFIX) {
            log.info("readable太小无法解码");
            return null;
        }
        MindPackage mindPackage = new MindPackage();
        mindPackage.setHead(readBytes(buffer, MindPackage.HEADER));

        byte[] length = readBytes(buffer, MindPackage.LENGTH);
        mindPackage.setLength(length);
        int number = HexStrUtil.bytes2int(length);
        log.info("number:" + number);

        // A body shorter than its own fixed fields can never be valid; fail fast instead of
        // hitting a NegativeArraySizeException when allocating the payload.
        int dataLength = number - REQUEST_FIXED_FIELDS;
        if (dataLength < 0) {
            throw new TioDecodeException("declared body length [" + number
                    + "] is shorter than the fixed request fields [" + REQUEST_FIXED_FIELDS + "]");
        }
        // Computed as long so that a huge declared length cannot overflow and slip through.
        if ((long) FRAME_PREFIX + number + MindPackage.LAST > readableLength) {
            log.info("包长大于readable,本次无法解码");
            return null;
        }

        mindPackage.setLoginId(readBytes(buffer, MindPackage.LOGIN_ID));
        mindPackage.setCommandCode(readBytes(buffer, MindPackage.COMMAND_CODE));
        mindPackage.setCommandId(readBytes(buffer, MindPackage.COMMAND_ID));
        mindPackage.setSecret(readBytes(buffer, MindPackage.SECRET));
        mindPackage.setZip(readBytes(buffer, MindPackage.ZIP));
        mindPackage.setData(readBytes(buffer, dataLength));
        mindPackage.setLast(readBytes(buffer, MindPackage.LAST));
        return mindPackage;
    }

    /**
     * Encodes a response frame: header, length, login id, command code, command id, result
     * flag, encryption flag, compression flag, payload and trailer, in that order.
     *
     * @param packet         the {@link MindPackage} to encode
     * @param tioConfig      supplies the byte order set on the returned buffer
     * @param channelContext the channel the packet is sent on
     * @return a buffer containing the encoded frame
     */
    @Override
    public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
        MindPackage mindPackage = (MindPackage) packet;
        byte[] bytes = ArrayUtil.addAll(
                mindPackage.getHead(),
                mindPackage.getResLengthByCount(),
                mindPackage.getLoginId(),
                mindPackage.getCommandCode(),
                mindPackage.getCommandId(),
                mindPackage.getResult(),
                mindPackage.getSecret(),
                mindPackage.getZip(),
                mindPackage.getData(),
                mindPackage.getLast());
        ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
        byteBuffer.order(tioConfig.getByteOrder());
        byteBuffer.put(bytes);
        return byteBuffer;
    }

    /**
     * Dispatches a decoded request by command code, then replies on the same channel.
     *
     * <p>The alarm/summary push code is delegated to {@code BusinessHandle}; a login
     * request is only printed; anything else is stored as a {@code Request} record. In all
     * cases the received packet object is reused for the reply: its payload, command code
     * and login id are overwritten before it is sent back.
     *
     * @param packet         the decoded {@link MindPackage}
     * @param channelContext the channel the packet arrived on
     * @throws Exception if decoding the payload text or a delegated call fails
     */
    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
        MindPackage mindPackage = (MindPackage) packet;
        String commandCodeHex = HexUtil.encodeHexStr(mindPackage.getCommandCode());
        if (Remark.PUSH_VIDEO_SUMMARY_OR_ALARM_CODE.equals(commandCodeHex)) {
            BusinessHandle.push_video_summary_or_alarm_code(mindPackage);
        } else if (Remark.LOGIN_COMMAND_CODE.equals(commandCodeHex)) {
            System.out.println("客户端正在尝试登录:" + mindPackage.getStringData());
        } else {
            System.out.println("最终捕获:" + mindPackage.getStringData());
            Request request = new Request();
            request.setOrderCode(commandCodeHex);
            request.setRequestTime(new Date());
            request.setJson(mindPackage.getStringData());
            IRequestService requestService = SpringUtil.getBean(IRequestService.class);
            requestService.add(request);
        }
        // Reply with a fixed payload, reusing the received packet.
        String account = "{\"UserId\":10145}";
        mindPackage.setStringData(account);
        mindPackage.setCommandCode(HexUtil.decodeHex("0000"));
        mindPackage.setLoginId(HexUtil.decodeHex("A1270000"));
        Tio.send(channelContext, mindPackage);
    }

    /** Reads exactly {@code size} bytes from the buffer's current position. */
    private static byte[] readBytes(ByteBuffer buffer, int size) {
        byte[] bytes = new byte[size];
        buffer.get(bytes);
        return bytes;
    }
}
服务端监听器实现
package com.demo.socket.socketplus.server;
import org.tio.core.ChannelContext;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioListener;
/**
 * Server-side t-io listener in which every callback is a no-op.
 *
 * @author Lenovo
 */
public class ServerAioListenerImpl implements ServerAioListener {

    /**
     * Heartbeat-timeout callback; always answers {@code false}.
     * NOTE(review): the meaning of the parameters and of the return value is defined by
     * {@code ServerAioListener} — confirm against its Javadoc.
     */
    @Override
    public boolean onHeartbeatTimeout(ChannelContext channelContext, Long aLong, int i) {
        return false;
    }

    /** Connection-established callback; intentionally empty. */
    @Override
    public void onAfterConnected(ChannelContext channelContext, boolean b, boolean b1) throws Exception {
        // nothing to do
    }

    /** Packet-decoded callback; intentionally empty. */
    @Override
    public void onAfterDecoded(ChannelContext channelContext, Packet packet, int i) throws Exception {
        // nothing to do
    }

    /** Bytes-received callback; intentionally empty. */
    @Override
    public void onAfterReceivedBytes(ChannelContext channelContext, int i) throws Exception {
        // nothing to do
    }

    /** Packet-sent callback; intentionally empty. */
    @Override
    public void onAfterSent(ChannelContext channelContext, Packet packet, boolean b) throws Exception {
        // nothing to do
    }

    /** Packet-handled callback; intentionally empty. */
    @Override
    public void onAfterHandled(ChannelContext channelContext, Packet packet, long l) throws Exception {
        // nothing to do
    }

    /** Before-close callback; intentionally empty. */
    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String s, boolean b) throws Exception {
        // nothing to do
    }
}
启动
/**
 * Builds a t-io client from the handler and listener implementations with a default
 * {@code ReconnConf}, then connects to 192.168.0.1:8000.
 *
 * @return the channel context returned by the connect call
 * @throws Exception if creating the client or connecting fails
 */
public static ClientChannelContext starter() throws Exception {
    ClientAioHandlerImpl handler = new ClientAioHandlerImpl();
    ClientAioListenerImpl listener = new ClientAioListenerImpl();
    ReconnConf reconnConf = new ReconnConf();
    TioClient tioClient = new TioClient(new ClientTioConfig(handler, listener, reconnConf));
    Node serverNode = new Node("192.168.0.1", 8000);
    return tioClient.connect(serverNode);
}
/**
 * Builds a t-io server named "tio-server" from the handler and listener implementations
 * and starts it on port 8080 with a {@code null} bind address.
 *
 * <p>The server is stored in {@code tioServer}, a variable declared outside this method.
 *
 * @return the started server
 * @throws IOException if the server cannot be started
 */
public static TioServer starter() throws IOException {
    ServerAioHandlerImpl handler = new ServerAioHandlerImpl();
    ServerAioListenerImpl listener = new ServerAioListenerImpl();
    tioServer = new TioServer(new ServerTioConfig("tio-server", handler, listener));
    tioServer.start(null, 8080);
    return tioServer;
}
注意:包长度字段占 4 个字节,采用小端序(低位字节在前)。转换函数如下:
/**
 * Converts the 4-byte length field to an int. The bytes are read least significant byte
 * first.
 *
 * @param bytes the 4 bytes of the length field
 * @return the decoded length
 * @throws IllegalArgumentException if {@code bytes} is {@code null} or not exactly 4 bytes long
 * @throws NumberFormatException    if the encoded value does not fit in a signed int
 */
public static int bytes2int(byte[] bytes) {
    if (bytes == null || bytes.length != 4) {
        throw new IllegalArgumentException("只支持byte[]长度为4的转换");
    }
    // Assemble as an unsigned 32-bit value; masking with 0xFF undoes byte sign extension.
    long value = (bytes[0] & 0xFFL)
            | ((bytes[1] & 0xFFL) << 8)
            | ((bytes[2] & 0xFFL) << 16)
            | ((bytes[3] & 0xFFL) << 24);
    if (value > Integer.MAX_VALUE) {
        // Same exception type the previous hex-parsing implementation raised for such input.
        throw new NumberFormatException("length " + value + " does not fit in a signed int");
    }
    return (int) value;
}
|