- 组件篇之RPC(上)
基础架构 暂时无法在文档外展示此内容 基础架构之组件篇 组件篇
- RPC
- 注册中心
- Zookeeper
- 配置中心
- Nacos
- 消息队列
- 事务消息
- 延时消息
- Spring
组件篇之RPC(上) 01.RPC原理分析 理解RPC - Remote Procedure Call
- 远程过程调用
- 基于网络表达语义和传达数据
- 通信协议
- 像调用本地方法一样调用远程服务
- 扩展了算力
- 服务治理的基础
RPC作用 - 屏蔽组包/解包
- 屏蔽数据发送/接收
- 提高开发效率
- 业务发展的必然产物
RPC核心组成 - 远程方法对象代理
- 连接管理
- 序列化/反序列化
- 寻址与负载均衡
RPC调用方式 - 同步调用
- 异步调用
RPC调用过程 暂时无法在文档外展示此内容 02.精简版RPC实现 假如没有RPC : 如果没有RPC框架支持,实现远程调用需要做哪些事情? Client 端 建立与Server的连接 组装数据 发送数据包 接收处理结果数据包 解析返回数据包 Server 端 监听端口 响应连接请求 接收数据包 解析数据包,调用相应方法 组装请求处理结果数据包 发送结果数据包
设计“用户”服务 功能需求:用户信息管理—CRUD 调用方式:TCP长连接同步交互 协议:自定义协议
接口设计 注册: bool addUser(User user) 更新: bool updateUser(long uid, User user) 注销: bool deleteUser(long uid) 查询: User getUser(long uid)
序列化协议 远程调用涉及数据的传输,就会涉及组包和解包,需要调用方和服务方约定数据格式——序列化协议 暂时无法在文档外展示此内容 package com.naixue.client.protocol;
import com.naixue.client.entity.User; import com.naixue.util.ByteConverter; import java.io.*;
/**
 * Client-side frame of the custom RPC protocol.
 *
 * <p>A frame is a fixed {@link #HEAD_LEN}-byte header made of four 4-byte integers
 * (version, cmd, magicNum, bodyLen, in that order, each encoded with
 * {@code ByteConverter.intToBytes}) followed by {@code bodyLen} bytes of body.
 */
public class RpcProtocol implements Serializable {
    /** Command id of the "create user" request. */
    public static int CMD_CREATE_USER = 1;
    private int version;
    private int cmd;
    private int magicNum;
    private int bodyLen = 0;
    private byte[] body;
    /** Size of the fixed header in bytes (four ints). */
    final public static int HEAD_LEN = 16;
    /** Size of a serialized user body: uid (long) + age (short) + sex (short). */
    private static final int USER_BODY_LEN = Long.BYTES + Short.BYTES + Short.BYTES;

    public byte[] getBody() {
        return body;
    }

    public RpcProtocol setBody(byte[] body) {
        this.body = body;
        return this;
    }

    public int getVersion() {
        return version;
    }

    public RpcProtocol setVersion(int version) {
        this.version = version;
        return this;
    }

    public int getCmd() {
        return cmd;
    }

    public RpcProtocol setCmd(int cmd) {
        this.cmd = cmd;
        return this;
    }

    public int getMagicNum() {
        return magicNum;
    }

    public RpcProtocol setMagicNum(int magicNum) {
        this.magicNum = magicNum;
        return this;
    }

    public int getBodyLen() {
        return bodyLen;
    }

    public RpcProtocol setBodyLen(int bodyLen) {
        this.bodyLen = bodyLen;
        return this;
    }

    /**
     * Serializes this frame: the four header ints followed by the body.
     *
     * <p>The result is always {@code HEAD_LEN + bodyLen} bytes long. A missing body is treated
     * as empty, and a body shorter than {@code bodyLen} is followed by zero bytes.
     *
     * @return the encoded frame
     * @throws IllegalStateException if the body holds more bytes than {@code bodyLen} declares
     */
    public byte[] generateByteArray() {
        // A frame without payload is legal; do not fail with a NullPointerException for it.
        byte[] payload = (body == null) ? new byte[0] : body;
        if (payload.length > bodyLen) {
            throw new IllegalStateException(
                    "body holds " + payload.length + " bytes but bodyLen is " + bodyLen);
        }
        byte[] data = new byte[HEAD_LEN + bodyLen];
        int index = 0;
        index = writeInt(data, index, version);
        index = writeInt(data, index, cmd);
        index = writeInt(data, index, magicNum);
        index = writeInt(data, index, bodyLen);
        System.arraycopy(payload, 0, data, index, payload.length);
        return data;
    }

    /** Copies the 4-byte encoding of {@code value} into {@code dest} and returns the next free index. */
    private static int writeInt(byte[] dest, int index, int value) {
        System.arraycopy(ByteConverter.intToBytes(value), 0, dest, index, Integer.BYTES);
        return index + Integer.BYTES;
    }

    /**
     * Populates this object (header fields and body) from an encoded frame.
     *
     * @param data a complete frame: header plus at least the number of body bytes it declares
     * @return this instance
     * @throws IllegalArgumentException if {@code data} is null, shorter than the header, or
     *         declares a body length that is negative or exceeds the bytes available
     */
    public RpcProtocol byteArrayToRpcHeader(byte[] data) {
        if (data == null || data.length < HEAD_LEN) {
            throw new IllegalArgumentException(
                    "frame is shorter than the " + HEAD_LEN + "-byte header");
        }
        int index = 0;
        int parsedVersion = ByteConverter.bytesToInt(data, index);
        index += Integer.BYTES;
        int parsedCmd = ByteConverter.bytesToInt(data, index);
        index += Integer.BYTES;
        int parsedMagic = ByteConverter.bytesToInt(data, index);
        index += Integer.BYTES;
        int declaredLen = ByteConverter.bytesToInt(data, index);
        index += Integer.BYTES;
        // The length comes off the wire; never trust it for allocation or copying unchecked.
        if (declaredLen < 0 || declaredLen > data.length - HEAD_LEN) {
            throw new IllegalArgumentException(
                    "declared body length " + declaredLen + " does not fit a frame of "
                            + data.length + " bytes");
        }
        this.version = parsedVersion;
        this.cmd = parsedCmd;
        this.magicNum = parsedMagic;
        this.bodyLen = declaredLen;
        this.body = new byte[declaredLen];
        System.arraycopy(data, index, this.body, 0, declaredLen);
        return this;
    }

    /**
     * Decodes a user from a body laid out as uid (8 bytes), age (2 bytes), sex (2 bytes).
     *
     * @param data the serialized user
     * @return a new {@code User} holding the decoded values
     * @throws IllegalArgumentException if {@code data} is null or too short
     */
    public User byteArrayToUserInfo(byte[] data) {
        if (data == null || data.length < USER_BODY_LEN) {
            throw new IllegalArgumentException(
                    "user body needs at least " + USER_BODY_LEN + " bytes");
        }
        User user = new User();
        int index = 0;
        user.setUid(ByteConverter.bytesToLong(data, index));
        index += Long.BYTES;
        user.setAge(ByteConverter.bytesToShort(data, index));
        index += Short.BYTES;
        user.setSex(ByteConverter.bytesToShort(data, index));
        return user;
    }

    /**
     * Encodes a user as uid (8 bytes), age (2 bytes), sex (2 bytes).
     *
     * @param info the user to encode
     * @return a new 12-byte array
     */
    public byte[] userInfoTobyteArray(User info) {
        byte[] data = new byte[USER_BODY_LEN];
        int index = 0;
        System.arraycopy(ByteConverter.longToBytes(info.getUid()), 0, data, index, Long.BYTES);
        index += Long.BYTES;
        System.arraycopy(ByteConverter.shortToBytes(info.getAge()), 0, data, index, Short.BYTES);
        index += Short.BYTES;
        System.arraycopy(ByteConverter.shortToBytes(info.getSex()), 0, data, index, Short.BYTES);
        return data;
    }

    /**
     * Deserializes an object using Java native serialization.
     *
     * <p>NOTE(review): native deserialization of bytes received from the network is unsafe;
     * only feed this method trusted data or install an {@code ObjectInputFilter}.
     *
     * @param objBytes the serialized form; may be null or empty
     * @return the deserialized object, or null when {@code objBytes} is null or empty
     * @throws Exception if the bytes cannot be deserialized
     */
    public static Object bytes2Object(byte[] objBytes) throws Exception {
        if (objBytes == null || objBytes.length == 0) {
            return null;
        }
        // try-with-resources closes the stream even when readObject fails.
        try (ObjectInputStream oi = new ObjectInputStream(new ByteArrayInputStream(objBytes))) {
            return oi.readObject();
        }
    }

    /**
     * Serializes an object using Java native serialization.
     *
     * @param obj the object to serialize; may be null
     * @return the serialized bytes, or null when {@code obj} is null
     * @throws Exception if serialization fails
     */
    public static byte[] object2Bytes(Serializable obj) throws Exception {
        if (obj == null) {
            return null;
        }
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        // Closing the object stream flushes it, so the byte array is complete afterwards.
        try (ObjectOutputStream oo = new ObjectOutputStream(bo)) {
            oo.writeObject(obj);
        }
        return bo.toByteArray();
    }

    /**
     * Encodes the result code of a create-user call as a 4-byte response body.
     *
     * @param result the result code
     * @return a new 4-byte array
     */
    public byte[] createUserRespTobyteArray(int result) {
        return ByteConverter.intToBytes(result);
    }
}
暂时无法在文档外展示此内容 public static int CMD_CREATE_USER = 1; private int version;// = 1 private int cmd;// = 0 private int magicNum; // = 0x20191009 private int bodyLen = 0;// = 12 private byte[] body; final public static int HEAD_LEN = 16;
暂时无法在文档外展示此内容 Consumer代码实现
- 创建代理类
- 构造请求数据
- 执行远程调用
package com.naixue;
import com.naixue.client.entity.User; import com.naixue.client.service.UserService;
/**
 * Demo consumer: builds a user, calls the remote "add user" service through the
 * client-side {@code UserService} proxy and prints whether the call succeeded.
 *
 * Created by chendong on 2019/9/3.
 */
public class RpcClient {
    public static void main(String[] args) throws Exception {
        UserService proxyUserService = new UserService();
        User user = new User();
        user.setAge((short) 26);
        user.setSex((short) 1);

        // A result code of 0 is reported as success; anything else as failure.
        boolean created = proxyUserService.addUser(user) == 0;
        String outcome = created
                ? "调用远程服务创建用户成功!!!"
                : "调用远程服务创建用户失败!!!";
        System.out.println(outcome);
    }
}
addUser : package com.naixue.client.entity;
import java.io.Serializable;
/**
 * User entity exchanged with the RPC server: an id plus age and sex codes.
 * Setters return {@code this} so calls can be chained.
 *
 * Created by chendong on 2019/9/3.
 */
public class User implements Serializable {
    private long uid;
    private short age;
    private short sex;

    /** @return the user id */
    public long getUid() {
        return this.uid;
    }

    /**
     * @param uid the user id
     * @return this user
     */
    public User setUid(long uid) {
        this.uid = uid;
        return this;
    }

    /** @return the age */
    public short getAge() {
        return this.age;
    }

    /**
     * @param age the age
     * @return this user
     */
    public User setAge(short age) {
        this.age = age;
        return this;
    }

    /** @return the sex code */
    public short getSex() {
        return this.sex;
    }

    /**
     * @param sex the sex code
     * @return this user
     */
    public User setSex(short sex) {
        this.sex = sex;
        return this;
    }
}
package com.naixue.client.connect;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel;
public class TcpClient { private Logger logger = LoggerFactory.getLogger(this.getClass()); private static int MAX_PACKAGE_SIZE = 1024 * 4; private static String SERVER_IP = “127.0.0.1”; private static int SERVER_PORT = 58885; private static TcpClient instance = null;
private boolean isInit = false;
//private ChannelFuture channelFuture = null;
SocketChannel client = null;
private final static int CONNECT_TIMEOUT_MILLIS = 2000;
//private Bootstrap bootstrap = new Bootstrap();
public TcpClient() {}
public static TcpClient GetInstance() {
if (instance == null) {
instance = new TcpClient();
}
return instance;
}
public void init() throws Exception{
if(!isInit) {
client = SocketChannel.open(new InetSocketAddress(SERVER_IP, SERVER_PORT));
client.configureBlocking(true);
}
isInit = true;
}
public boolean sendData(byte[] data){
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
byteBuffer.put(data);
byteBuffer.flip();
int ret = 0;
try {
ret = client.write(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
public byte[] recvData() {
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_PACKAGE_SIZE);
try {
int rs = client.read(byteBuffer);
byte[] bytes = new byte[rs];
byteBuffer.flip();
byteBuffer.get(bytes);
return bytes;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
package com.naixue.client.service;
import com.naixue.client.connect.TcpClient; import com.naixue.client.entity.User; import com.naixue.client.protocol.RpcProtocol; import com.naixue.util.ByteConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
/**
 * Client-side proxy of the remote user service. Each call is encoded into an
 * {@code RpcProtocol} frame, sent over the shared {@code TcpClient}, and the
 * integer result code is decoded from the response body.
 */
public class UserService {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    /** Result code returned when the call fails locally, before a server result is available. */
    private static final int RPC_CALL_FAILED = -1;

    /**
     * Asks the remote service to create a user.
     *
     * @param userinfo the user to create
     * @return the result code sent by the server, or {@code -1} when the connection
     *         cannot be set up, the request cannot be sent, or the response is missing
     *         or malformed
     * @throws Exception declared for callers; transport problems are reported through
     *         the return value
     */
    public int addUser (User userinfo) throws Exception {
        // Set up the client connection; without it there is nothing to send on.
        TcpClient client = TcpClient.GetInstance();
        try {
            client.init();
        } catch (Exception e) {
            logger.error("init rpc client error", e);
            return RPC_CALL_FAILED;
        }

        // Build the request frame.
        RpcProtocol rpcReq = new RpcProtocol();
        rpcReq.setCmd(RpcProtocol.CMD_CREATE_USER);
        rpcReq.setVersion(0x01);
        rpcReq.setMagicNum(0x20110711);
        byte[] body = rpcReq.userInfoTobyteArray(userinfo);
        rpcReq.setBodyLen(body.length);
        rpcReq.setBody(body);

        // Serialize and send the request.
        byte[] reqData = rpcReq.generateByteArray();
        if (!client.sendData(reqData)) {
            logger.error("send rpc request failed");
            return RPC_CALL_FAILED;
        }

        // Wait for the response; it must hold a header plus the 4-byte result code.
        byte[] recvData = client.recvData();
        if (recvData == null || recvData.length < RpcProtocol.HEAD_LEN + Integer.BYTES) {
            logger.error("rpc response is missing or too short");
            return RPC_CALL_FAILED;
        }

        // Decode the response; a frame whose declared length does not match is rejected.
        RpcProtocol rpcResp = new RpcProtocol();
        try {
            rpcResp.byteArrayToRpcHeader(recvData);
        } catch (RuntimeException e) {
            logger.error("rpc response is malformed", e);
            return RPC_CALL_FAILED;
        }
        byte[] respBody = rpcResp.getBody();
        if (respBody == null || respBody.length < Integer.BYTES) {
            logger.error("rpc response body does not contain a result code");
            return RPC_CALL_FAILED;
        }
        return ByteConverter.bytesToInt(respBody, 0);
    }
}
序列化/反序列化 /**
- @ClassName: ${ByteConverter}
- @Description: ${}
- @author ${wangzongsheng}
- @version V1.0
- @Date ${2019-01-14}
*/
package com.naixue.util;
/**
 * Conversions between primitive integers and byte arrays.
 *
 * <p>All methods use the same byte order: the least-significant byte is stored at the
 * lowest index.
 */
public class ByteConverter {

    /**
     * Decodes a short from the first two bytes of {@code buf}.
     *
     * @param buf source bytes, least-significant byte first
     * @return the decoded value
     */
    public static short bytesToShort(byte[] buf) {
        return bytesToShort(buf, 0);
    }

    /**
     * Decodes an int from the first four bytes of {@code buf}.
     *
     * <p>Despite the method name, index 0 holds the least-significant byte, exactly as in
     * {@link #bytesToInt(byte[], int)} and {@link #intToBytes(int)}.
     *
     * @param buf source bytes, least-significant byte first
     * @return the decoded value
     */
    public static int bytesToIntBigEndian(byte[] buf) {
        return bytesToInt(buf, 0);
    }

    /**
     * Decodes a long from the first eight bytes of {@code buf}.
     *
     * @param buf source bytes, least-significant byte first
     * @return the decoded value
     */
    public static long bytesToLong(byte[] buf) {
        return bytesToLong(buf, 0);
    }

    /**
     * Encodes a short into two bytes.
     *
     * @param n the value to encode
     * @return a new 2-byte array, least-significant byte first
     */
    public static byte[] shortToBytes(short n) {
        return new byte[] {(byte) n, (byte) (n >> 8)};
    }

    /**
     * Encodes an int into four bytes.
     *
     * @param n the value to encode
     * @return a new 4-byte array, least-significant byte first
     */
    public static byte[] intToBytes(int n) {
        return new byte[] {(byte) n, (byte) (n >> 8), (byte) (n >> 16), (byte) (n >> 24)};
    }

    /**
     * Encodes a long into eight bytes.
     *
     * @param n the value to encode
     * @return a new 8-byte array, least-significant byte first
     */
    public static byte[] longToBytes(long n) {
        byte[] encoded = new byte[8];
        int shift = 0;
        for (int pos = 0; pos < encoded.length; pos++) {
            encoded[pos] = (byte) (n >> shift);
            shift += 8;
        }
        return encoded;
    }

    /**
     * Decodes a short from two bytes of {@code buf} starting at {@code offset}.
     *
     * @param buf source bytes
     * @param offset index of the least-significant byte
     * @return the decoded value
     */
    public static short bytesToShort(byte[] buf, int offset) {
        int low = buf[offset] & 0xff;
        int high = buf[offset + 1] & 0xff;
        return (short) ((high << 8) | low);
    }

    /**
     * Decodes an int from four bytes of {@code buf} starting at {@code offset}.
     *
     * @param buf source bytes
     * @param offset index of the least-significant byte
     * @return the decoded value
     */
    public static int bytesToInt(byte[] buf, int offset) {
        int b0 = buf[offset] & 0xff;
        int b1 = buf[offset + 1] & 0xff;
        int b2 = buf[offset + 2] & 0xff;
        int b3 = buf[offset + 3] & 0xff;
        return b0 | (b1 << 8) | (b2 << 16) | (b3 << 24);
    }

    /**
     * Decodes a long from eight bytes of {@code buf} starting at {@code offset}.
     *
     * @param buf source bytes
     * @param offset index of the least-significant byte
     * @return the decoded value
     */
    public static long bytesToLong(byte[] buf, int offset) {
        long value = 0L;
        int shift = 0;
        for (int pos = 0; pos < 8; pos++) {
            value |= (buf[offset + pos] & 0xffL) << shift;
            shift += 8;
        }
        return value;
    }
}
序列化过程
- 序列化请求参数到body
- 序列化RpcProtocol
反序列化过程 - 反序列化RpcProtocol
- 反序列化body
package com.naixue.util;
import com.naixue.client.protocol.RpcProtocol; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.util.List;
/**
 * Frame decoder for the RPC protocol: accumulates inbound bytes until a complete frame
 * (fixed header plus the body length the header declares) is available, then emits the
 * frame as one {@code byte[]}.
 */
public class PkgDecoder extends ByteToMessageDecoder {
    private Logger logger = LoggerFactory.getLogger(PkgDecoder.class);

    public PkgDecoder() {
    }

    /**
     * Emits at most one complete frame per invocation.
     *
     * @param ctx the channel context; the channel is closed when the header is invalid
     * @param buffer accumulated inbound bytes
     * @param out receives the decoded frame as a {@code byte[]}
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        if (buffer.readableBytes() < RpcProtocol.HEAD_LEN) {
            return; // header not complete yet; keep the bytes and wait for more
        }
        // bodyLen is the last int of the header, i.e. header bytes HEAD_LEN-4 .. HEAD_LEN-1.
        // Peek at it without moving the reader index.
        byte[] intBuf = new byte[4];
        buffer.getBytes(buffer.readerIndex() + RpcProtocol.HEAD_LEN - 4, intBuf);
        // Despite its name this helper reads the least-significant byte first,
        // which matches how ByteConverter.intToBytes encodes the header.
        int bodyLen = ByteConverter.bytesToIntBigEndian(intBuf);
        if (bodyLen < 0 || bodyLen > Integer.MAX_VALUE - RpcProtocol.HEAD_LEN) {
            // A negative or overflowing length means the stream is corrupt and frame boundaries
            // are lost; drop what was buffered and close instead of allocating a bogus array.
            logger.warn("invalid body length {} in rpc header, closing connection", bodyLen);
            buffer.skipBytes(buffer.readableBytes());
            ctx.close();
            return;
        }
        int frameLen = RpcProtocol.HEAD_LEN + bodyLen;
        if (buffer.readableBytes() < frameLen) {
            return; // body not complete yet; keep the bytes and wait for more
        }
        byte[] bytesReady = new byte[frameLen];
        buffer.readBytes(bytesReady);
        out.add(bytesReady);
    }
}
/**
- @ClassName: ${PkgEncoder}
- @Description: ${tcp编码器}
- @author ${wangzongsheng}
- @version V1.0
- @Date ${2019-01-14}
*/
package com.naixue.util;
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Outbound encoder that copies an already serialized frame ({@code byte[]}) into the
 * channel's output buffer.
 *
 * <p>The type parameter restricts this encoder to {@code byte[]} messages. Per Netty's
 * {@code MessageToByteEncoder} contract, messages of any other type are not handed to
 * {@link #encode} and continue down the pipeline unchanged, so they no longer fail with a
 * {@code ClassCastException} here.
 */
public class PkgEncoder extends MessageToByteEncoder<byte[]> {
    public PkgEncoder() {
    }

    /**
     * Writes the frame bytes to {@code out} as they are.
     *
     * @param ctx the channel context (unused)
     * @param msg the serialized frame
     * @param out the buffer the frame is written to
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {
        // Any additional encoding step would go here, before the bytes are written.
        out.writeBytes(msg);
    }
}
03.RPC服务消费方核心设计 RPC功能
- RPC基础功能
- 数据传输
- 序列化/反序列化
- 客户端代理类
- 请求映射分发
- RPC产品功能
Consumer 连接管理 负载均衡 请求路由 超时处理 Provider 队列/线程池 超时丢弃 优雅关闭 过载保护
连接管理 保持与服务提供方长连接,用于传输请求数据和返回结果。 暂时无法在文档外展示此内容
初始化时机 饿汉模式 懒汉模式
连接数维护 服务连接池 数据库连接池 思考:两类连接有什么本质区别? 心跳/断线重连
客户端线程模型
package com.naixue.server.connect;
import java.net.InetSocketAddress;
import com.naixue.server.entity.User; import com.naixue.server.protocol.RpcProtocol; import com.naixue.server.server.UserService; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
/**
 * Inbound handler of the RPC server. It receives complete frames as {@code byte[]},
 * checks the magic number, dispatches on the command id and writes a response frame
 * whose body is the 4-byte result code.
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {
    private Logger logger = LoggerFactory.getLogger(ServerHandler.class);
    private static int CMD_CREATE_USER = 1;
    private static int CMD_FIND_USER = 2;
    /** Size of a serialized user body: uid (long) + age (short) + sex (short). */
    private static final int USER_BODY_LEN = Long.BYTES + Short.BYTES + Short.BYTES;

    /** Logs the address of a newly connected client. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("client connect to rpc server, client's ip is: " + remoteIp(ctx));
    }

    /** Logs the address of a client whose connection has gone away. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("client close the connection to rpc server, client's ip is: " + remoteIp(ctx));
    }

    /**
     * Handles one request frame.
     *
     * <p>Frames with a wrong magic number are dropped without a reply. Every other frame is
     * answered: with the service result for a valid create-user request, and with result
     * code {@code -1} for an unsupported command or an undersized body, so that a client
     * blocked on the response is not left waiting.
     *
     * @param ctx the channel context
     * @param msg the frame, expected to be a {@code byte[]}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        byte[] recvData = (byte[]) msg;
        if (recvData.length == 0) {
            logger.warn("receive request from client, but the data length is 0");
            return;
        }
        logger.info("receive request from client, the data length is: " + recvData.length);

        // Deserialize the request.
        RpcProtocol rpcReq = new RpcProtocol();
        rpcReq.byteArrayToRpcHeader(recvData);
        if (rpcReq.getMagicNum() != RpcProtocol.CONST_CMD_MAGIC) {
            logger.warn("request msgic code error");
            return;
        }

        // Dispatch; ret stays -1 unless a handler produced a result.
        int ret = -1;
        if (rpcReq.getCmd() == CMD_CREATE_USER) {
            byte[] reqBody = rpcReq.getBody();
            if (reqBody != null && reqBody.length >= USER_BODY_LEN) {
                User user = rpcReq.byteArrayToUserInfo(reqBody);
                UserService userService = new UserService();
                ret = userService.addUser(user);
            } else {
                logger.warn("create user request body is shorter than " + USER_BODY_LEN + " bytes");
            }
        } else {
            logger.warn("unsupported cmd: " + rpcReq.getCmd());
        }

        // Build and send the response, echoing the request's header fields.
        RpcProtocol rpcResp = new RpcProtocol();
        rpcResp.setCmd(rpcReq.getCmd());
        rpcResp.setVersion(rpcReq.getVersion());
        rpcResp.setMagicNum(rpcReq.getMagicNum());
        rpcResp.setBodyLen(Integer.BYTES);
        rpcResp.setBody(rpcResp.createUserRespTobyteArray(ret));
        ByteBuf respData = Unpooled.copiedBuffer(rpcResp.generateByteArray());
        ctx.channel().writeAndFlush(respData);
    }

    /** Returns the remote peer's IP address, or "unknown" when it cannot be determined. */
    private String remoteIp(ChannelHandlerContext ctx) {
        Object address = ctx.channel().remoteAddress();
        if (address instanceof InetSocketAddress) {
            InetSocketAddress socketAddress = (InetSocketAddress) address;
            if (socketAddress.getAddress() != null) {
                return socketAddress.getAddress().getHostAddress();
            }
        }
        return "unknown";
    }
}
package com.naixue.server.connect;
import com.naixue.util.PkgDecoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.ChannelInitializer; import org.slf4j.LoggerFactory; import org.slf4j.Logger;
/**
 * Netty-based TCP server for the RPC provider. Inbound bytes are split into frames by
 * {@code PkgDecoder} and handled by {@code ServerHandler}.
 */
public class TcpServer {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private int port;
    private final EventLoopGroup bossGroup;   // threads that accept incoming connections
    private final EventLoopGroup workerGroup; // threads that run the channel handlers

    /**
     * Creates the event loop groups: one accept thread and one worker thread per
     * available processor.
     *
     * @param port the port to listen on
     */
    public TcpServer(int port) {
        this.port = port;
        this.bossGroup = new NioEventLoopGroup(1);
        int cores = Runtime.getRuntime().availableProcessors();
        this.workerGroup = new NioEventLoopGroup(cores);
    }

    /**
     * Binds the port and blocks until the server channel is closed, then shuts both
     * event loop groups down.
     *
     * <p>Failures while binding or serving are logged, not rethrown.
     *
     * @throws Exception declared for callers; see above for what is handled internally
     */
    public void start() throws Exception {
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024); // SO_BACKLOG value for the listening socket
            serverBootstrap.localAddress(this.port);
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast(new PkgDecoder());
                    pipeline.addLast(new ServerHandler());
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            if (channelFuture.isSuccess()) {
                logger.info("rpc server start success!");
            } else {
                logger.info("rpc server start fail!");
            }
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException ex) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            logger.error("rpc server interrupted", ex);
        } catch (Exception ex) {
            // Pass the exception itself so the stack trace is not lost.
            logger.error("exception occurred exception=" + ex.getMessage(), ex);
        } finally {
            // Release both thread pools. The nested finally guarantees the worker group is
            // shut down even if waiting for the boss group fails, and the uninterruptible
            // wait still completes when the interrupt flag was restored above.
            try {
                bossGroup.shutdownGracefully().syncUninterruptibly();
            } finally {
                workerGroup.shutdownGracefully().syncUninterruptibly();
            }
        }
    }
}
package com.naixue.server.entity;
/**
 * Server-side user entity: an id plus age and sex codes.
 * Setters return {@code this} so calls can be chained.
 *
 * Created by zhuanzhuan on 2019/9/3.
 */
public class User {
    private long uid;
    private short age;
    private short sex;

    /** @return the user id */
    public long getUid() {
        return this.uid;
    }

    /**
     * @param uid the user id
     * @return this user
     */
    public User setUid(long uid) {
        this.uid = uid;
        return this;
    }

    /** @return the age */
    public short getAge() {
        return this.age;
    }

    /**
     * @param age the age
     * @return this user
     */
    public User setAge(short age) {
        this.age = age;
        return this;
    }

    /** @return the sex code */
    public short getSex() {
        return this.sex;
    }

    /**
     * @param sex the sex code
     * @return this user
     */
    public User setSex(short sex) {
        this.sex = sex;
        return this;
    }
}
package com.naixue.server.protocol;
import com.naixue.server.entity.User; import com.naixue.util.ByteConverter; import java.io.*;
/**
 * Server-side frame of the custom RPC protocol.
 *
 * <p>A frame is a fixed {@link #HEAD_LEN}-byte header made of four 4-byte integers
 * (version, cmd, magicNum, bodyLen, in that order, each encoded with
 * {@code ByteConverter.intToBytes}) followed by {@code bodyLen} bytes of body.
 */
public class RpcProtocol {
    /** Magic number every valid request header must carry. */
    static public int CONST_CMD_MAGIC = 0x20110711;
    private int version;
    private int cmd;
    public int magicNum;
    private int bodyLen;
    private byte[] body;
    /** Size of the fixed header in bytes (four ints). */
    final public static int HEAD_LEN = 16;
    /** Size of a serialized user body: uid (long) + age (short) + sex (short). */
    private static final int USER_BODY_LEN = Long.BYTES + Short.BYTES + Short.BYTES;

    public byte[] getBody() {
        return body;
    }

    public RpcProtocol setBody(byte[] body) {
        this.body = body;
        return this;
    }

    public int getVersion() {
        return version;
    }

    public RpcProtocol setVersion(int version) {
        this.version = version;
        return this;
    }

    public int getCmd() {
        return cmd;
    }

    public RpcProtocol setCmd(int cmd) {
        this.cmd = cmd;
        return this;
    }

    public int getMagicNum() {
        return magicNum;
    }

    public RpcProtocol setMagicNum(int magicNum) {
        this.magicNum = magicNum;
        return this;
    }

    public int getBodyLen() {
        return bodyLen;
    }

    public RpcProtocol setBodyLen(int bodyLen) {
        this.bodyLen = bodyLen;
        return this;
    }

    /**
     * Serializes this frame: the four header ints followed by the body.
     *
     * <p>The result is always {@code HEAD_LEN + bodyLen} bytes long. A missing body is treated
     * as empty, and a body shorter than {@code bodyLen} is followed by zero bytes.
     *
     * @return the encoded frame
     * @throws IllegalStateException if the body holds more bytes than {@code bodyLen} declares
     */
    public byte[] generateByteArray() {
        // A frame without payload is legal; do not fail with a NullPointerException for it.
        byte[] payload = (body == null) ? new byte[0] : body;
        if (payload.length > bodyLen) {
            throw new IllegalStateException(
                    "body holds " + payload.length + " bytes but bodyLen is " + bodyLen);
        }
        byte[] data = new byte[HEAD_LEN + bodyLen];
        int index = 0;
        index = writeInt(data, index, version);
        index = writeInt(data, index, cmd);
        index = writeInt(data, index, magicNum);
        index = writeInt(data, index, bodyLen);
        System.arraycopy(payload, 0, data, index, payload.length);
        return data;
    }

    /** Copies the 4-byte encoding of {@code value} into {@code dest} and returns the next free index. */
    private static int writeInt(byte[] dest, int index, int value) {
        System.arraycopy(ByteConverter.intToBytes(value), 0, dest, index, Integer.BYTES);
        return index + Integer.BYTES;
    }

    /**
     * Populates this object (header fields and body) from an encoded frame.
     *
     * @param data a complete frame: header plus at least the number of body bytes it declares
     * @return this instance, or null when {@code data} is null or empty
     * @throws IllegalArgumentException if {@code data} is shorter than the header or declares
     *         a body length that is negative or exceeds the bytes available
     * @throws IOException kept in the signature for existing callers
     * @throws ClassNotFoundException kept in the signature for existing callers
     */
    public RpcProtocol byteArrayToRpcHeader(byte[] data) throws IOException, ClassNotFoundException {
        if (data == null || data.length == 0) {
            return null;
        }
        if (data.length < HEAD_LEN) {
            throw new IllegalArgumentException(
                    "frame of " + data.length + " bytes is shorter than the " + HEAD_LEN + "-byte header");
        }
        int index = 0;
        int parsedVersion = ByteConverter.bytesToInt(data, index);
        index += Integer.BYTES;
        int parsedCmd = ByteConverter.bytesToInt(data, index);
        index += Integer.BYTES;
        int parsedMagic = ByteConverter.bytesToInt(data, index);
        index += Integer.BYTES;
        int declaredLen = ByteConverter.bytesToInt(data, index);
        index += Integer.BYTES;
        // The length comes off the wire; never trust it for allocation or copying unchecked.
        if (declaredLen < 0 || declaredLen > data.length - HEAD_LEN) {
            throw new IllegalArgumentException(
                    "declared body length " + declaredLen + " does not fit a frame of "
                            + data.length + " bytes");
        }
        this.version = parsedVersion;
        this.cmd = parsedCmd;
        this.magicNum = parsedMagic;
        this.bodyLen = declaredLen;
        this.body = new byte[declaredLen];
        System.arraycopy(data, index, this.body, 0, declaredLen);
        return this;
    }

    /**
     * Decodes a user from a body laid out as uid (8 bytes), age (2 bytes), sex (2 bytes).
     *
     * @param data the serialized user
     * @return a new {@code User} holding the decoded values
     * @throws IllegalArgumentException if {@code data} is null or too short
     */
    public User byteArrayToUserInfo(byte[] data) {
        if (data == null || data.length < USER_BODY_LEN) {
            throw new IllegalArgumentException(
                    "user body needs at least " + USER_BODY_LEN + " bytes");
        }
        User user = new User();
        int index = 0;
        user.setUid(ByteConverter.bytesToLong(data, index));
        index += Long.BYTES;
        user.setAge(ByteConverter.bytesToShort(data, index));
        index += Short.BYTES;
        user.setSex(ByteConverter.bytesToShort(data, index));
        return user;
    }

    /**
     * Encodes a user as uid (8 bytes), age (2 bytes), sex (2 bytes).
     *
     * @param info the user to encode
     * @return a new 12-byte array
     */
    public byte[] userInfoTobyteArray(User info) {
        byte[] data = new byte[USER_BODY_LEN];
        int index = 0;
        System.arraycopy(ByteConverter.longToBytes(info.getUid()), 0, data, index, Long.BYTES);
        index += Long.BYTES;
        System.arraycopy(ByteConverter.shortToBytes(info.getAge()), 0, data, index, Short.BYTES);
        index += Short.BYTES;
        System.arraycopy(ByteConverter.shortToBytes(info.getSex()), 0, data, index, Short.BYTES);
        return data;
    }

    /**
     * Deserializes an object using Java native serialization.
     *
     * <p>NOTE(review): native deserialization of bytes received from the network is unsafe;
     * only feed this method trusted data or install an {@code ObjectInputFilter}.
     *
     * @param objBytes the serialized form; may be null or empty
     * @return the deserialized object, or null when {@code objBytes} is null or empty
     * @throws Exception if the bytes cannot be deserialized
     */
    public static Object bytes2Object(byte[] objBytes) throws Exception {
        if (objBytes == null || objBytes.length == 0) {
            return null;
        }
        // try-with-resources closes the stream even when readObject fails.
        try (ObjectInputStream oi = new ObjectInputStream(new ByteArrayInputStream(objBytes))) {
            return oi.readObject();
        }
    }

    /**
     * Serializes an object using Java native serialization.
     *
     * @param obj the object to serialize; may be null
     * @return the serialized bytes, or null when {@code obj} is null
     * @throws Exception if serialization fails
     */
    public static byte[] object2Bytes(Serializable obj) throws Exception {
        if (obj == null) {
            return null;
        }
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        // Closing the object stream flushes it, so the byte array is complete afterwards.
        try (ObjectOutputStream oo = new ObjectOutputStream(bo)) {
            oo.writeObject(obj);
        }
        return bo.toByteArray();
    }

    /**
     * Encodes the result code of a create-user call as a 4-byte response body.
     *
     * @param result the result code
     * @return a new 4-byte array
     */
    public byte[] createUserRespTobyteArray(int result) {
        return ByteConverter.intToBytes(result);
    }
}
package com.naixue.server.server;
import com.naixue.server.entity.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
/**
-
Created by chendong on 2019/9/3. */ public class UserService { private Logger logger = LoggerFactory.getLogger(this.getClass()); public int addUser(User userinfo){ logger.debug(“create user success, uid=” + userinfo.getUid()); return 0; } }
package com.naixue;
import com.naixue.server.connect.TcpServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
/**
 * Entry point of the RPC provider: runs a {@code TcpServer} on a dedicated thread and
 * waits for that thread to finish.
 */
public class RpcServer {
    private static Logger logger = LoggerFactory.getLogger(RpcServer.class);
    private static int SERVER_LISTEN_PORT = 58885;

    /**
     * Starts the server thread and blocks until it terminates.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        Thread tcpServerThread = new Thread(() -> {
            TcpServer tcpServer = new TcpServer(SERVER_LISTEN_PORT);
            try {
                tcpServer.start();
            } catch (Exception e) {
                // A server that failed to run is an error; log it with the stack trace.
                logger.error("TcpServer start exception: " + e.getMessage(), e);
            }
        }, "tcpServer");
        tcpServerThread.start();
        tcpServerThread.join();
    }
}
|