前言
编解码器是RPC框架中最基础、也是最重要的部分之一。上一篇文章并没有对数据进行真正的编解码,只是把对象序列化成字符串后进行传输,在实际的项目中是不能这么做的,所以这篇文章主要对编解码这一块进行了优化。为了保证代码的完整性,会将所有的代码在这篇文章中贴出。
一、jar包引用
由于不再将对象转换成字符串进行传输,所以不再需要使用fastJson,而是使用对象序列化工具Hessian
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>3.1.5</version>
</dependency>
二、公共基础类
1.定义一个请求协议封装类
import java.lang.reflect.Method;
import java.util.Arrays;
/**
 * Describes a single remote invocation: the target interface, the method name,
 * the parameter type names used to look the method up on the provider side,
 * and the argument values.
 *
 * <p>The argument values are {@code transient}; the consumer writes them to the
 * stream separately, after this object.
 */
public class RequestObject {
    private String interfaceName;
    private String methodName;
    private String[] argsSig;
    private transient Object[] args;

    /**
     * Creates an empty instance.
     *
     * <p>NOTE(review): presumably needed so that the deserializer can instantiate
     * the class without running the reflective constructor below with null
     * arguments - confirm against the Hessian version in use.
     */
    public RequestObject() {
    }

    /**
     * Builds the request description for a call to {@code method}.
     *
     * @param interfaceName name of the remote interface being called
     * @param method        the interface method being invoked; must not be null
     * @param args          the call arguments; {@code null} is treated as "no arguments"
     */
    public RequestObject(String interfaceName, Method method, Object[] args) {
        this.interfaceName = interfaceName;
        this.methodName = method.getName();
        // A dynamic proxy hands over null instead of an empty array for no-arg methods.
        this.args = args == null ? new Object[0] : args;
        // Use the declared parameter types rather than the runtime classes of the
        // arguments: the latter fail for null arguments and do not match the declared
        // signature for primitives or supertype parameters.
        Class<?>[] parameterTypes = method.getParameterTypes();
        String[] signature = new String[parameterTypes.length];
        for (int i = 0; i < parameterTypes.length; i++) {
            signature[i] = parameterTypes[i].getTypeName();
        }
        this.argsSig = signature;
    }

    /** @return the name of the remote interface */
    public String getInterfaceName() {
        return interfaceName;
    }

    /** @return the name of the method to invoke */
    public String getMethodName() {
        return methodName;
    }

    /** @return the type names of the method parameters, in declaration order */
    public String[] getArgsSig() {
        return argsSig;
    }

    /** @return the argument values; not part of the serialized form of this object */
    public Object[] getArgs() {
        return args;
    }

    /** @param args the argument values read back from the stream */
    public void setArgs(Object[] args) {
        this.args = args;
    }
}
2.定义一个RPC请求对象
import java.util.Arrays;
/**
 * Wire-level view of a request: a fixed 8-byte header (two 2-byte lengths and
 * one 4-byte length) followed by the interface name, the method name and the
 * serialized content.
 *
 * <p>The length fields and the total byte count are derived from the byte
 * arrays each time one of them is set.
 */
public class RpcRequest {
    /** Size of the fixed header: short + short + int. */
    private static final int HEADER_LEN = 8;

    private RequestObject requestObject;
    private int count = HEADER_LEN;
    private short interfaceNameLen;
    private short methodNameLen;
    private int contentLen;
    private byte[] interfaceNameByte;
    private byte[] methodNameByte;
    private byte[] contentByte;

    /** Creates an empty request with no payload. */
    public RpcRequest() {
    }

    /**
     * @param requestObject the logical request this frame carries
     */
    public RpcRequest(RequestObject requestObject) {
        this.requestObject = requestObject;
    }

    /** @return the logical request this frame carries */
    public RequestObject getRequestObject() {
        return requestObject;
    }

    /** @return total frame size in bytes, header included */
    public int getCount() {
        return count;
    }

    /** @return length in bytes of the encoded interface name */
    public short getInterfaceNameLen() {
        return interfaceNameLen;
    }

    /** @return length in bytes of the encoded method name */
    public short getMethodNameLen() {
        return methodNameLen;
    }

    /** @return length in bytes of the serialized content */
    public int getContentLen() {
        return contentLen;
    }

    /** @return the encoded interface name */
    public byte[] getInterfaceNameByte() {
        return interfaceNameByte;
    }

    /** @return the encoded method name */
    public byte[] getMethodNameByte() {
        return methodNameByte;
    }

    /** @return the serialized content */
    public byte[] getContentByte() {
        return contentByte;
    }

    /**
     * @param interfaceNameByte encoded interface name; {@code null} counts as empty
     * @throws IllegalArgumentException if the name does not fit a 2-byte length
     */
    public void setInterfaceNameByte(byte[] interfaceNameByte) {
        this.interfaceNameLen = shortLength(interfaceNameByte, "interface name");
        this.interfaceNameByte = interfaceNameByte;
        recount();
    }

    /**
     * @param methodNameByte encoded method name; {@code null} counts as empty
     * @throws IllegalArgumentException if the name does not fit a 2-byte length
     */
    public void setMethodNameByte(byte[] methodNameByte) {
        this.methodNameLen = shortLength(methodNameByte, "method name");
        this.methodNameByte = methodNameByte;
        recount();
    }

    /**
     * @param contentByte serialized content; {@code null} counts as empty
     */
    public void setContentByte(byte[] contentByte) {
        this.contentByte = contentByte;
        this.contentLen = contentByte == null ? 0 : contentByte.length;
        recount();
    }

    /** Recomputes the total size so that repeated setter calls never accumulate. */
    private void recount() {
        this.count = HEADER_LEN + interfaceNameLen + methodNameLen + contentLen;
    }

    /** Returns the array length as a short, rejecting values the header cannot hold. */
    private static short shortLength(byte[] bytes, String what) {
        int length = bytes == null ? 0 : bytes.length;
        if (length > Short.MAX_VALUE) {
            throw new IllegalArgumentException(what + " is too long: " + length + " bytes");
        }
        return (short) length;
    }
}
3.定义响应数据对象
/**
 * Carries the outcome of a remote invocation back to the consumer, together
 * with the type name of the result value.
 */
public class ResponseObject {
    private String responseSig;
    private Object result;

    /** Creates an empty response with no result. */
    public ResponseObject() {
    }

    /**
     * @param result the value returned by the invoked method; may be {@code null}
     *               (for example for a {@code void} method), in which case the
     *               type name is left {@code null} as well
     */
    public ResponseObject(Object result) {
        this.result = result;
        this.responseSig = result == null ? null : result.getClass().getTypeName();
    }

    /** @return the type name of the result, or {@code null} when there is no result */
    public String getResponseSig() {
        return responseSig;
    }

    /** @return the result value, possibly {@code null} */
    public Object getResult() {
        return result;
    }
}
4.定义响应数据包体对象
/**
 * Wire-level view of a response: a 4-byte length followed by the serialized
 * content. The length covers the whole frame, i.e. it includes the 4 bytes of
 * the length field itself.
 */
public class RpcResponse {
    /** Size in bytes of the length field that starts the frame. */
    private static final int HEADER_LEN = 4;

    private int responseLen = HEADER_LEN;
    private byte[] content;

    /**
     * Sets the serialized content and recomputes the frame length.
     *
     * <p>The length is recalculated from scratch so that calling this method
     * more than once does not keep adding to the previous value.
     *
     * @param content the serialized response; {@code null} counts as empty
     */
    public void setContent(byte[] content) {
        this.content = content;
        this.responseLen = HEADER_LEN + (content == null ? 0 : content.length);
    }

    /** @return total frame size in bytes, length field included */
    public int getResponseLen() {
        return responseLen;
    }

    /** @return the serialized content, or {@code null} if none was set */
    public byte[] getContent() {
        return content;
    }
}
5.定义一个自定义的SerializerFactory
import com.caucho.hessian.io.JavaSerializer;
import com.caucho.hessian.io.Serializer;
import com.caucho.hessian.io.SerializerFactory;
/**
 * Serializer factory whose fallback is always usable: the explicitly configured
 * default serializer when there is one, otherwise a {@link JavaSerializer}
 * built for the requested class.
 *
 * <p>NOTE(review): presumably this replaces a stricter default in the parent
 * factory (e.g. one that insists on {@code java.io.Serializable}) - confirm
 * against the Hessian version in use.
 */
public class MySerializerFactory extends SerializerFactory {

    /**
     * @param cl the class that needs a serializer
     * @return the configured default serializer if set, else a new {@link JavaSerializer} for {@code cl}
     */
    @Override
    protected Serializer getDefaultSerializer(Class cl) {
        Serializer configured = this._defaultSerializer;
        return configured != null ? configured : new JavaSerializer(cl);
    }
}
三、服务消费端代码编写(Consumer)
/**
 * Invocation handler behind the consumer-side proxy. Every method call on the
 * proxy is encoded as one request frame, written to the socket, and answered by
 * one response frame read from the same socket.
 *
 * <p>Request frame: 2-byte interface-name length, 2-byte method-name length,
 * 4-byte content length, then the interface name, the method name and the
 * content. Response frame: 4-byte length followed by the content, where the
 * length counts the whole frame including the 4-byte field itself.
 */
public class RequestProxyHandler implements InvocationHandler, Serializable {
    /** Size in bytes of the length field that starts every response frame. */
    private static final int RESPONSE_HEADER_LEN = 4;

    private final Socket socket;
    private final String interfaceName;

    /**
     * @param socket        connected socket used for every call made through this handler
     * @param interfaceName name under which the provider registered the target service
     */
    public RequestProxyHandler(Socket socket, String interfaceName) {
        this.socket = socket;
        this.interfaceName = interfaceName;
    }

    /**
     * Sends the call to the provider and returns the decoded result.
     *
     * @return the result carried by the response, or {@code null} when the
     *         decoded object is not a {@code ResponseObject}
     * @throws IOException if the request cannot be encoded or sent, or the
     *                     response is truncated or has an invalid length
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // The proxy passes null, not an empty array, when the method has no parameters.
        Object[] callArgs = args == null ? new Object[0] : args;

        RequestObject requestObject = new RequestObject(interfaceName, method, callArgs);
        RpcRequest rpcRequest = new RpcRequest(requestObject);
        rpcRequest.setInterfaceNameByte(requestObject.getInterfaceName().getBytes(StandardCharsets.UTF_8));
        rpcRequest.setMethodNameByte(requestObject.getMethodName().getBytes(StandardCharsets.UTF_8));
        // Encoding failures propagate: sending a frame without its content would only
        // fail later with a less helpful error.
        rpcRequest.setContentByte(encodeContent(requestObject, callArgs));

        ByteBuffer frame = ByteBuffer.allocate(rpcRequest.getCount());
        frame.putShort(rpcRequest.getInterfaceNameLen())
                .putShort(rpcRequest.getMethodNameLen())
                .putInt(rpcRequest.getContentLen())
                .put(rpcRequest.getInterfaceNameByte())
                .put(rpcRequest.getMethodNameByte())
                .put(rpcRequest.getContentByte());
        OutputStream outputStream = socket.getOutputStream();
        outputStream.write(frame.array());
        outputStream.flush();

        InputStream inputStream = socket.getInputStream();
        byte[] header = new byte[RESPONSE_HEADER_LEN];
        readFully(inputStream, header);
        int frameLen = ByteBuffer.wrap(header).getInt();
        // The provider's length includes the 4 header bytes already consumed above.
        int contentLen = frameLen - RESPONSE_HEADER_LEN;
        if (contentLen < 0) {
            throw new IOException("Invalid response frame length: " + frameLen);
        }
        byte[] content = new byte[contentLen];
        readFully(inputStream, content);

        Hessian2Input hessian2Input = new Hessian2Input(new ByteArrayInputStream(content));
        hessian2Input.setSerializerFactory(new MySerializerFactory());
        Object decoded = hessian2Input.readObject();
        if (decoded instanceof ResponseObject) {
            return ((ResponseObject) decoded).getResult();
        }
        return null;
    }

    /** Serializes the request description followed by each argument, in order. */
    private static byte[] encodeContent(RequestObject requestObject, Object[] callArgs) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        Hessian2Output hessian2Output = new Hessian2Output(buffer);
        hessian2Output.setSerializerFactory(new MySerializerFactory());
        hessian2Output.writeObject(requestObject);
        for (Object arg : callArgs) {
            hessian2Output.writeObject(arg);
        }
        hessian2Output.close();
        return buffer.toByteArray();
    }

    /** Fills the whole buffer; a single read() may legally return fewer bytes than requested. */
    private static void readFully(InputStream in, byte[] buffer) throws IOException {
        int filled = 0;
        while (filled < buffer.length) {
            int n = in.read(buffer, filled, buffer.length - filled);
            if (n < 0) {
                throw new java.io.EOFException(
                        "Stream ended after " + filled + " of " + buffer.length + " bytes");
            }
            filled += n;
        }
    }
}
消费端测试代码
/**
 * Consumer-side demo: connects to the provider on 127.0.0.1:8888, creates a
 * proxy for {@code HelloRpc} and prints the result of one remote call.
 */
public class Consumer {
    /**
     * @param args unused
     * @throws IOException if the connection cannot be opened or closed
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the socket even when the remote call fails.
        try (Socket socket = new Socket("127.0.0.1", 8888)) {
            HelloRpc helloRpc = (HelloRpc) Proxy.newProxyInstance(
                    HelloRpc.class.getClassLoader(),
                    new Class<?>[]{HelloRpc.class},
                    new RequestProxyHandler(socket, HelloRpc.class.getTypeName()));
            String result = helloRpc.hello("张三");
            System.out.println(result);
        }
    }
}
四、服务提供端代码编写(Provider)
/**
 * Provider-side demo server. Listens on port 8888 and, for each accepted
 * connection, reads one request frame, invokes the requested method on the
 * registered service object and writes one response frame back.
 *
 * <p>Request frame: 2-byte interface-name length, 2-byte method-name length,
 * 4-byte content length, then the interface name, the method name and the
 * content. Response frame: 4-byte total frame length (including the length
 * field itself) followed by the serialized response.
 */
public class Provider {
    /** Size of the fixed request header: short + short + int. */
    private static final int REQUEST_HEADER_LEN = 8;

    /**
     * Registers the services and serves connections one at a time, forever.
     * Any exception raised while handling a request ends the server.
     *
     * @param arg unused
     */
    public static void main(String[] arg) throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException {
        Map<String, Object> providerCache = new HashMap<>();
        providerCache.put(HelloRpc.class.getName(), new HelloRpcImpl());
        try (ServerSocket serverSocket = new ServerSocket(8888)) {
            while (true) {
                // Close every client connection once its request has been handled or rejected.
                try (Socket clientSocket = serverSocket.accept()) {
                    handleRequest(clientSocket, providerCache);
                }
            }
        }
    }

    /** Reads one request from the connection, invokes the target method and writes the response. */
    private static void handleRequest(Socket clientSocket, Map<String, Object> providerCache) throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException {
        InputStream inputStream = clientSocket.getInputStream();
        ByteBuffer header = ByteBuffer.wrap(readBytes(inputStream, REQUEST_HEADER_LEN));
        int interfaceNameLen = header.getShort();
        int methodNameLen = header.getShort();
        int contentLen = header.getInt();
        if (interfaceNameLen < 0 || methodNameLen < 0 || contentLen < 0) {
            throw new IOException("Malformed request header: negative length");
        }

        String interfaceName = new String(readBytes(inputStream, interfaceNameLen), StandardCharsets.UTF_8);
        Object service = providerCache.get(interfaceName);
        if ("".equals(interfaceName) || null == service) {
            // Unknown service: drop the request without answering.
            return;
        }
        // The method name and the content must actually be read from the stream;
        // allocating the arrays alone leaves them filled with zero bytes.
        String methodName = new String(readBytes(inputStream, methodNameLen), StandardCharsets.UTF_8);
        byte[] content = readBytes(inputStream, contentLen);

        Hessian2Input hessian2Input = new Hessian2Input(new ByteArrayInputStream(content));
        hessian2Input.setSerializerFactory(new MySerializerFactory());
        Object object = hessian2Input.readObject();
        if (!(object instanceof RequestObject)) {
            return;
        }
        RequestObject requestObject = (RequestObject) object;

        // The arguments follow the request description in the same stream, one
        // object per entry of the type-name array.
        String[] argsSig = requestObject.getArgsSig();
        Object[] args = new Object[argsSig.length];
        Class<?>[] argsClazz = new Class<?>[argsSig.length];
        for (int i = 0; i < argsSig.length; i++) {
            Class<?> aClass = ClassUtils.forName(argsSig[i], Thread.currentThread().getContextClassLoader());
            argsClazz[i] = aClass;
            args[i] = hessian2Input.readObject(aClass);
        }
        requestObject.setArgs(args);

        Method declaredMethod = service.getClass().getDeclaredMethod(methodName, argsClazz);
        Object result = declaredMethod.invoke(service, args);

        ResponseObject responseObject = new ResponseObject(result);
        RpcResponse rpcResponse = new RpcResponse();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Hessian2Output hessian2Output = new Hessian2Output(byteArrayOutputStream);
        hessian2Output.setSerializerFactory(new MySerializerFactory());
        hessian2Output.writeObject(responseObject);
        hessian2Output.close();
        rpcResponse.setContent(byteArrayOutputStream.toByteArray());

        ByteBuffer frame = ByteBuffer.allocate(rpcResponse.getResponseLen());
        frame.putInt(rpcResponse.getResponseLen()).put(rpcResponse.getContent());
        OutputStream outputStream = clientSocket.getOutputStream();
        outputStream.write(frame.array());
        outputStream.flush();
    }

    /** Reads exactly {@code length} bytes; a single read() may legally return fewer than requested. */
    private static byte[] readBytes(InputStream in, int length) throws IOException {
        byte[] buffer = new byte[length];
        int filled = 0;
        while (filled < length) {
            int n = in.read(buffer, filled, length - filled);
            if (n < 0) {
                throw new java.io.EOFException(
                        "Stream ended after " + filled + " of " + length + " bytes");
            }
            filled += n;
        }
        return buffer;
    }
}
|