承接着上一节,手把手教你写一个RPC框架(二) 我们定义了两个注解、编写了的连接Zookeeper注册中心的逻辑,这一节,我们继续来完成RPC框架吧!
七 序列化
RPC框架中,离不开网络请求,服务A调用服务B的方法,要发送一个网络请求,服务B收到网络请求后,解析请求,然后把方法的执行结果返回给服务A。为了实现这些步骤,需要编写消息请求体、消息相应体、序列化与反序列化的相关逻辑。下面一起来看看怎么写吧~
import lombok.Data;
import java.io.Serializable;
@Data
public class RpcRequest implements Serializable {
private String serviceName;
private String method;
private Class<?>[] methodParameterTypes;
private Object[] methodParameters;
}
上面这四个属性是一个RPC请求中必不可少的
- 所请求的服务接口名
- 所请求的服务接口中,具体的方法名
- 所请求的方法的参数类型
- 所请求的方法的参数
有了这些属性,我们才能从注册中心中找到对应的服务。当然有小伙伴肯定会问,RpcRequest中并没有所请求服务的IP地址和端口号,该怎么找到对应的地址啊?
其实在上一节,在注册中心编写的代码中,就有写到注册的逻辑:
package com.zhongger.rpc.register.impl;
import com.alibaba.fastjson.JSON;
import com.zhongger.rpc.entity.ServerNode;
import com.zhongger.rpc.register.RpcServiceRegister;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URLEncoder;
public class ZookeeperRpcServiceRegister implements RpcServiceRegister {
@Override
public void register(ServiceNode serviceNode) throws Exception {
logger.info("register server node info is {}", serviceNode);
String uri = JSON.toJSONString(serviceNode);
uri = URLEncoder.encode(uri, "UTF-8");
String servicePath = "/com/zhongger/rpc/" + serviceNode.getServiceName() + "/service";
if (zookeeperClient.checkExists().forPath(servicePath) == null) {
logger.info("service path {} not exist create persistent node ", servicePath);
zookeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(servicePath);
}
String uriPath = servicePath + "/" + uri;
logger.info("uri path is {}", uriPath);
if (zookeeperClient.checkExists().forPath(uriPath) != null) {
zookeeperClient.delete().forPath(uriPath);
}
zookeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(uriPath);
}
}
其中ServerNode实体类就包含了IP地址、端口号、服务名称的信息,比如我有个接口HelloService,IP地址127.0.0.1 ,端口8888:注册到Zookeeper中的结构就是这样的,永久节点是:/com/zhongger/rpc/HelloService/service 临时节点则是:/com/zhongger/rpc/HelloService/service/{ServiceNode的JSON结构} ,所以RpcRequest其实就不需要Ip地址和port这些属性了。另外,寻找对应方法的时候,只需要遍历/com/zhongger/rpc/HelloService/service 下的所有子节点,然后通过负载均衡来选择一个Ip地址+Port来调用对应的方法就行啦,后面会详细介绍这里的实现。
写完了请求,那么来编写响应,响应就比较简单了,就是把方法的执行结果封装一下:
package com.zhongger.rpc.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class RpcResponse implements Serializable {
private String status;
private Object value;
private Exception exception;
}
那么对于网络传输,我们需要把对象序列化成byte数组,然后我们要操作对象的话,则需要把byte数组反序列化成对象。
1 定义序列化协议
写一个接口,来约定序列化的协议
public interface MessageSerializationProtocol {
byte[] marshal(Object object) throws Exception;
<T> T unMarshal(byte[] bytes, Class<T> clazz) throws Exception;
}
- marshal:将对象序列化成byte[]
- unMarshal:将byte[]反序列化成指定Class的对象
2 JDK序列化
定义 JdkMessageSerializationProtocol 类实现 MessageSerializationProtocol 接口,JDK序列化的实现如下:
import com.zhongger.rpc.serialization.MessageSerializationProtocol;
import java.io.*;
public class JdkMessageSerializationProtocol implements MessageSerializationProtocol {
@Override
public byte[] marshal(Object object) throws Exception {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.writeObject(object);
byte[] result = byteArrayOutputStream.toByteArray();
objectOutputStream.close();
byteArrayOutputStream.close();
return result;
}
@Override
public <T> T unMarshal(byte[] bytes, Class<T> clazz) throws Exception {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
T result = clazz.cast(objectInputStream.readObject());
objectInputStream.close();
byteArrayInputStream.close();
return result;
}
}
JDK序列化的方式效率是比较低的,于是我用了比较流行的Kryo序列化框架又实现了一套序列化协议
3 Kryo序列化
使用Kryo要注意:Kryo是非线程安全的,需要ThreadLocal来防止出现线程安全问题
package com.zhongger.rpc.serialization.impl;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.zhongger.rpc.entity.RpcRequest;
import com.zhongger.rpc.entity.RpcResponse;
import com.zhongger.rpc.serialization.MessageSerializationProtocol;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
public class KryoMessageSerializationProtocol implements MessageSerializationProtocol {
private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.register(RpcRequest.class);
kryo.register(RpcResponse.class);
return kryo;
});
@Override
public byte[] marshal(Object object) throws Exception {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream);
Kryo kryo = kryoThreadLocal.get();
kryo.writeObject(output, object);
kryoThreadLocal.remove();
return output.toBytes();
}
@Override
public <T> T unMarshal(byte[] bytes, Class<T> clazz) throws Exception {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream);
Kryo kryo = kryoThreadLocal.get();
Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return clazz.cast(o);
}
}
好了序列化协议就这样写好了,接下来,就要写比较复杂的网络通信咯~
|