前言
netty框架马上就进入尾声了,小编没有特别深入的讲解,第一是网络编程确实挺难的,第二用好netty其实是挺不容易的一件事情,尤其都是异步的情况下,今天小编继续为大家带来开发实战,上次分享了redis客户端和websocket弹幕功能的简单实现,这次为大家带来相对比较高档的rpc框架底层网络通信,今天主要以dubbo为例,希望大家有所收获。
RPC
定义
RPC为远程服务调用,即客户端远程调用服务端的方法,然后服务端返回响应或异常。常用的RPC解决方案有JAVA RMI,webService,Http Invoker,Dubbo,SpringCloud等等。
去中心化架构
传统集中式架构: 下图是小编理解的中心化架构
中心化架构其优点为:架构简单,客户端调用的时候可以跨语言,缺点的话所有的调用都会进过ngnix(这里就可以理解为中心,大家都得进过他,无论是调用还是返回响应),ngnix一旦挂了之后就会是服务挂了,当然如果ngnix部署集群也会让架构变的复杂。 去中心化架构 如下图:
这里的话客户端调用服务端不需要进过中心直连调用。 去中心化架构简单描述后,继续来看一下rpc框架组成。
框架组成
这个架构是不是似曾相识,这里如果看过小编的dubbo分享就觉得面熟了。 上面最小化实现rpc框架就是最下面的rpc协议就可以了,这样两个服务就可以通信即可。接着咱们来介绍一下rpc协议。
协议报文
这里小编直接用dubbo协议来说明报文:
上面请求头主要有16个字节,如果看过小编的前基本应用的使用后,看到这个就可以使用netty来进行消息的拆包以及编解码。那小编接下来继续说明编解码的过程。
概设过程
这边在写代码之前,小编先分享一下设计的思路,以及一些调用的逻辑。 首先是编解码:编解码占网络传输中必须且固定的,先看下图:
具体已经在上图解释清楚了,接下来看器调用过程即各个组件功能。
上图是比较简单的,接下来是至关重要的的从客户端到服务整个流程的调用过程
容小编解释一下:
- 从客户端到服务端的调用涉及到了四个线程,分别是客户端以及服务端的业务线程和IO线程
- 发起掉用写入消息体的内容是上面的Transfer,而编码request则为客户端发起的请求。而Transfer中的Request包含了接口,方法以及参数
- 写入到socket中的为bytebuf,其经过Bytebuf -> head -> unsafe -> nio socket (doWrite) -> java nio channel -> socket
- 写入到socket则到达服务端,服务端的io线程通过多路复用选择器select轮询,之后调用read
- 读取到内容后,这边的读取流程和上面相似 unsafe read -> pipeline fireChannelRead,拿到了ByteBuf,然后解码request,根据上面的解码工具类
- 从ByteBuf拿到Transfer,之后交给业务的handler,之后涉及到服务端业务线程的处理,业务处理后返回了结果或者报错信息(这里同上其实是Transfer)
- 之后又交还给io线程,再次将Response进行编码(服务端的响应),Bytebuf写到socket
- 回到客户端io线程后,再次有select进行轮询,读取到内容Bytebuf,解码成Transfer,Transfer中的response进行反序列化拿到结果填充回执,
- 客户端拿到回执,释放等待。
注意事项 第一:加入客户端A和B的请求,客户端怎样拿到服务端回来的A响应和B相应呢,这里就需要Transfer里面的id,即协议中的id,不过如果是协议中的id,那就需要做请求的时候放入到一个map中来保存。 第二:既然知道使用id来区分请求响应,那什么时候放入到map中, 怎么保证线程安全,那最好是线程安全的map,不过高并发的时候,对系统很不友好,所以放入到map的时候也在io线程中执行。 第三:如何在io线程放入map中,这里是用eventloop的submit,写入消息完成后监听并写入map
讲完理论小编不是纯粹的理论派,还是代码实战派
代码实战
编解码工具以及传输类
Transfer类
public class Transfer {
public static final byte STATUS_ERROR = 0;
public static final byte STATUS_OK = 1;
public static final byte STATUS_ILLEGAL = 2;
public static final byte SERIALIZABLE_JAVA=1;
public static final byte SERIALIZABLE_HESSIAN2=2;
public static final byte SERIALIZABLE_JSON=3;
boolean request;
byte serializableId;
boolean twoWay;
boolean heartbeat;
long id;
byte status;
Object target;
public Transfer(long id) {
this.id = id;
}
}
编解码工具类
public class RpcCodec extends ByteToMessageCodec {
private static final int HEADER_LENGTH = 16;
private static final short MAGIC = 0xdad;
private static final ByteBuf MAGIC_BUF = Unpooled.copyShort(MAGIC);
private static final byte FLAG_REQUEST = (byte) 0x80;
private static final byte FLAG_TWO_WAY = (byte) 0x40;
private static final byte FLAG_EVENT = (byte) 0x20;
private static final int SERIALIZATION_MASK = 0x1f;
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
if (msg instanceof Transfer) {
doEncode((Transfer) msg, out);
} else {
throw new IllegalArgumentException();
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {
Transfer transfer = doDecode(in);
if (transfer != null) {
out.add(transfer);
}
}
protected void doEncode(Transfer data, ByteBuf buf) {
byte[] header = new byte[HEADER_LENGTH];
Bytes.short2bytes(MAGIC, header);
header[2] = data.serializableId;
if (data.request) header[2] |= FLAG_REQUEST;
if (data.twoWay) header[2] |= FLAG_TWO_WAY;
if (data.heartbeat) header[2] |= FLAG_EVENT;
if (!data.request) header[3] = data.status;
Bytes.long2bytes(data.id, header, 4);
int len = 0;
byte[] body = new byte[0];
if (!data.heartbeat) {
body = serialize(data.serializableId, data.target);
len = body.length;
}
Bytes.int2bytes(len, header, 12);
buf.writeBytes(header);
buf.writeBytes(body);
}
protected Transfer doDecode(ByteBuf in) {
int index = ByteBufUtil.indexOf(MAGIC_BUF, in);
if (index < 0) {
return null;
}
if (!in.isReadable(index + HEADER_LENGTH)) {
return null;
}
byte[] header = new byte[HEADER_LENGTH];
ByteBuf slice = in.slice();
slice.readBytes(header);
int length = Bytes.bytes2int(header, 12);
if (!in.isReadable(index + HEADER_LENGTH + length)) {
return null;
}
Transfer transfer = new Transfer(Bytes.bytes2long(header, 4));
transfer.heartbeat = (header[2] & FLAG_EVENT) != 0;
transfer.request = (header[2] & FLAG_REQUEST) != 0;
transfer.twoWay = (header[2] & FLAG_TWO_WAY) != 0;
transfer.serializableId = (byte) (header[2] & SERIALIZATION_MASK);
transfer.status = header[3];
if (!transfer.heartbeat) {
byte[] content = new byte[length];
slice.readBytes(content);
transfer.target = deserialize(transfer.serializableId, content);
}
in.skipBytes(index + HEADER_LENGTH + length);
return transfer;
}
private byte[] serialize(byte serializableId, Object target) {
if (serializableId == Transfer.SERIALIZABLE_JAVA) {
ByteArrayOutputStream out;
try {
out = new ByteArrayOutputStream();
ObjectOutputStream stream = new ObjectOutputStream(out);
stream.writeObject(target);
} catch (IOException e) {
throw new RuntimeException(e);
}
return out.toByteArray();
} else {
throw new UnsupportedOperationException();
}
}
private Object deserialize(byte serializableId, byte[] bytes) {
if (serializableId == Transfer.SERIALIZABLE_JAVA) {
try {
ObjectInputStream stream =
new ObjectInputStream(new ByteArrayInputStream(bytes));
return stream.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
} else {
throw new UnsupportedOperationException();
}
}
}
客户端代码
public class RpcClient {
static AtomicLong atomicLong = new AtomicLong(100);
private Channel channel;
private Map<Long, Promise<Response>> results = new HashMap<>();
public static long getNextId() {
return atomicLong.getAndIncrement();
}
public void init(String address, int port) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup(1))
.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast("codec", new RpcCodec());
ch.pipeline().addLast("resultSet", new ResultFill());
}
});
ChannelFuture connect = bootstrap.connect(address, port);
channel = connect.sync().channel();
System.out.println("连接成功");
channel.eventLoop().scheduleWithFixedDelay(() -> {
Transfer transfer=new Transfer(getNextId());
transfer.heartbeat=true;
channel.writeAndFlush(transfer);
},2000,2000,TimeUnit.MILLISECONDS);
}
public Response invokerRemote(Class serverInterface,
String methodDesc,
Object[] args) throws InterruptedException, ExecutionException, TimeoutException {
Request request = new Request(serverInterface.getName(), methodDesc);
request.setArgs(args);
Transfer transfer = new Transfer(getNextId());
transfer.request=true;
transfer.serializableId=Transfer.SERIALIZABLE_JAVA;
transfer.target = request;
DefaultPromise<Response> resultPromise = new DefaultPromise(channel.eventLoop());
channel.writeAndFlush(transfer).addListener(future ->
{
if (future.cause() != null) {
resultPromise.setFailure(future.cause());
} else {
results.put(transfer.id, resultPromise);
}
}
);
return resultPromise.get(10000, TimeUnit.MILLISECONDS);
}
private class ResultFill extends SimpleChannelInboundHandler<Transfer> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Transfer msg) {
if (msg.heartbeat) {
System.out.println(String.format("服务端心跳返回:%s",
ctx.channel().remoteAddress()));
} else {
Promise<Response> promise = results.remove(msg.id);
promise.setSuccess((Response) msg.target);
}
}
}
public <T> T getRemoteService(Class<T> serviceInterface) {
assert serviceInterface.isInterface();
Object o = Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Exception {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
}
String methodDescriptor = method.getName()+Type.getMethodDescriptor(method);
Response response = invokerRemote(serviceInterface, methodDescriptor, args);
if (response.getError() != null) {
throw new RuntimeException("远程服务调用异常:", response.getError());
}
return response.getResult();
}
});
return (T) o;
}
}
服务端代码:
public class RpcServer {
ExecutorService threadPool = Executors.newFixedThreadPool(500);
private Map<String, ServiceBean> register = new HashMap<>();
public void start(int port) throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup work = new NioEventLoopGroup(8);
bootstrap.group(boss, work).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("codec", new RpcCodec());
ch.pipeline().addLast("dispatch", new Dispatch());
}
}).bind(port).sync();
System.out.println("服务启动成功");
}
private class Dispatch extends SimpleChannelInboundHandler<Transfer> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Transfer transfer) {
if (transfer.heartbeat) {
Transfer t = new Transfer(transfer.id);
t.heartbeat = true;
t.request = false;
ctx.writeAndFlush(t);
} else {
threadPool.submit(() -> {
Transfer to = doDispatchRequest(transfer);
ctx.writeAndFlush(to);
});
}
}
Transfer doDispatchRequest(Transfer from) {
Request request = (Request) from.target;
Transfer to = new Transfer(from.id);
to.request = false;
to.serializableId = from.serializableId;
Response response = new Response();
try {
String serverId = request.getClassName() + request.getMethodDesc();
ServiceBean serverBean = register.get(serverId);
if (serverBean == null) {
throw new IllegalArgumentException("找不到服务" + serverId);
}
Object result = serverBean.invoke(request.getArgs());
response.setResult(result);
to.status = Transfer.STATUS_OK;
} catch (Throwable e) {
e.printStackTrace();
response.setError(e);
to.status = Transfer.STATUS_ERROR;
}
to.target = response;
return to;
}
}
private static class ServiceBean {
Method method;
Object target;
public ServiceBean(Method method, Object target) {
this.method = method;
this.target = target;
}
public Object invoke(Object[] args) throws Exception {
return method.invoke(target, args);
}
}
public void registerServer(Class serviceInterface, Object serverBean) {
assert serviceInterface.isInterface();
for (Method method : serviceInterface.getMethods()) {
int modifiers = method.getModifiers();
if (Modifier.isStatic(modifiers) || Modifier.isNative(modifiers)) {
continue;
}
String methodDescriptor = Type.getMethodDescriptor(method);
String key = serviceInterface.getName() +method.getName()+ methodDescriptor;
register.put(key, new ServiceBean(method, serverBean));
}
}
}
测试类
public class RpcTest {
@Test
public void startServerTest() throws InterruptedException, IOException {
RpcServer server = new RpcServer();
server.registerServer(UserService.class, new UserServiceImpl());
server.start(8080);
System.in.read();
}
public static void main(String[] args) throws InterruptedException, IOException {
RpcClient client = new RpcClient();
client.init("127.0.0.1", 8080);
UserService service = client.getRemoteService(UserService.class);
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String s = in.readLine();
System.out.println(service.getUser(1));
}
}
@Test
public void syncTest() throws InterruptedException, IOException {
RpcClient client = new RpcClient();
client.init("127.0.0.1", 8080);
UserService service = client.getRemoteService(UserService.class);
ExecutorService executor = Executors.newFixedThreadPool(100);
for (int i = 0; i < 100; i++) {
int id = i;
executor.execute(() -> {
User user = service.getUser(id);
System.out.println(user);
assert user.getId().equals(id);
});
}
System.in.read();
}
}
对于UserSevice大家可以自己建一个接口和随便实现一下即可。
总结
利用netty实现rpc还是挺有难度的,小编是进过将近一周才陆续写完这篇博客,希望小编已经充分讲清楚了,假如大家将netty的理论与实战结合完毕,那相信和小编一样有长足进步。加油!
|