Netty网络框架学习笔记-20(实现一个简单RPC-2_2020.07.02)
服务提供者
1.1 定义接口以及其实现
1.1.1 MyRPCTest
/**
 * Service contract that the provider implements and registers for remote invocation.
 */
public interface MyRPCTest {
/**
 * Sample remote method taking a single string argument.
 *
 * @param p1 argument supplied by the caller
 * @return the string produced by the implementation
 */
String hiHi(String p1);
}
1.1.2 MyRPCTestImpl
/**
 * Provider-side implementation of {@link MyRPCTest}.
 */
public class MyRPCTestImpl implements MyRPCTest {

    /**
     * Builds the reply text: a fixed greeting, the caller's argument, a {@code ===}
     * separator and a freshly generated UUID string.
     *
     * @param p1 argument echoed back inside the reply
     * @return the assembled reply text
     */
    @Override
    public String hiHi(String p1) {
        StringBuilder reply = new StringBuilder("成功进行了远程调用哟, 恭喜恭喜! 你的参数:");
        reply.append(p1);
        reply.append("===");
        reply.append(UUID.fastUUID().toString());
        return reply.toString();
    }
}
2.1 服务注册
2.1.2 ServiceRegistration
/**
 * Connects to the registry centre and announces this provider.
 */
@Slf4j
public class ServiceRegistration {

    /**
     * Starts a background thread that opens a Netty client connection to the registry at
     * 127.0.0.1:8888 and installs a {@link MyProduceRegistrationHandler} bound to the given
     * future. The thread blocks until the channel closes and then shuts the event loop group down.
     *
     * @param registrationFuture future handed to the registration handler
     */
    public static void ProjectStartCompletedTrigger(RegistrationFuture registrationFuture) {
        new Thread(() -> {
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap = bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new JsonObjectDecoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new MyProduceRegistrationHandler(registrationFuture));
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8888));
            channelFuture.addListener(el -> {
                if (el.isSuccess()) {
                    // A successful connect is informational, not an error.
                    log.info("ServiceRegistration===已经成功连接!!!");
                } else {
                    log.error("ServiceRegistration===连接失败!!!");
                }
            });
            try {
                channelFuture.sync().channel().closeFuture().sync();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                log.error("ServiceRegistration===发生异常, 信息:{}", e);
            } finally {
                workerGroup.shutdownGracefully();
            }
        }).start();
    }
}
2.1.3 RegistrationFuture
/**
 * Minimal one-shot {@link Future} completed by {@link #setResult(Object)}.
 *
 * <p>All state is guarded by this object's monitor. Waiters re-check the completion flag in a
 * loop, so a result published before {@code get()} is called is returned immediately and
 * spurious wakeups are harmless. Cancellation is not supported.
 */
public class RegistrationFuture implements Future<Object> {

    private boolean isDone;
    private Object result;

    /** Cancellation is not supported; always returns {@code false}. */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    /** Always {@code false} because this future cannot be cancelled. */
    @Override
    public boolean isCancelled() {
        return false;
    }

    /** @return {@code true} once {@link #setResult(Object)} has been called */
    @Override
    public synchronized boolean isDone() {
        return isDone;
    }

    /**
     * Blocks until a result has been published.
     *
     * @return the published result (may be {@code null})
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public synchronized Object get() throws InterruptedException, ExecutionException {
        while (!isDone) {
            wait();
        }
        return result;
    }

    /**
     * Blocks until a result has been published or the timeout elapses.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the published result (may be {@code null})
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException     if no result was published within the timeout
     */
    @Override
    public synchronized Object get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        long remainingNanos = unit.toNanos(timeout);
        final long deadline = System.nanoTime() + remainingNanos;
        while (!isDone) {
            if (remainingNanos <= 0L) {
                throw new TimeoutException("no result within " + timeout + " " + unit);
            }
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
            remainingNanos = deadline - System.nanoTime();
        }
        return result;
    }

    /**
     * Publishes the result and wakes every waiting thread.
     *
     * @param result value to hand to callers of {@code get}
     */
    public synchronized void setResult(Object result) {
        this.result = result;
        this.isDone = true;
        notifyAll();
    }
}
2.1.4 MyProduceRegistrationHandler
@Slf4j
public class MyProduceRegistrationHandler extends SimpleChannelInboundHandler<String> {
private RegistrationFuture registrationFuture;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info("MyRegistrationHandler===读取到消息:{}",msg);
registrationFuture.setResult("注册完成啦,大兄弟!");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
doRegistrationInfo(ctx);
super.channelActive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("MyRegistrationHandler===发生异常:{}",cause);
ctx.close();
super.exceptionCaught(ctx, cause);
}
public MyProduceRegistrationHandler(RegistrationFuture registrationFuture) {
this.registrationFuture = registrationFuture;
}
private void doRegistrationInfo(ChannelHandlerContext ctx) {
RegistrationInfo registrationInfo = new RegistrationInfo();
registrationInfo.setType(0);
registrationInfo.setServiceName("myRPCProvider");
registrationInfo.setIp("127.0.0.1");
registrationInfo.setPort(7777);
String jsonStr = JSONUtil.toJsonStr(registrationInfo);
ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonStr.getBytes(StandardCharsets.UTF_8)));
}
}
3.0 服务提供
3.0.1 MyProvide
@Slf4j
public class MyProvide {
public static void doProvide(){
Map<String,Object> Beans = new HashMap<>();
Beans.put("myRPCTest",new MyRPCTestImpl());
new Thread(()->{
NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap = serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childHandler(new ProvideChannelInitializer(Beans));
ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress("127.0.0.1", 7777));
channelFuture.addListener(el->{
if (el.isSuccess()) {
log.info("MyProvide===服务提供者启动成功, 等待消费者访问!");
}
});
try {
channelFuture.sync().channel().closeFuture().sync();
} catch (Exception e) {
log.info("MyProvide===发生异常, 消息:{}",e);
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}).start();
}
}
3.0.2 ProvideChannelInitializer
/**
 * Builds the pipeline for every connection accepted by the provider server.
 */
public class ProvideChannelInitializer extends ChannelInitializer<SocketChannel> {

    /** Executor shared by all channels; the business handler is registered on it. */
    private static final UnorderedThreadPoolEventExecutor UNORDERED_THREAD_POOL_EVENT_EXECUTOR =
            new UnorderedThreadPoolEventExecutor(16);

    /** Service beans passed on to each {@link ProvideChannelHandler}. */
    private Map<String, Object> beanRegistry;

    /**
     * @param beans service beans passed on to every handler created by this initializer
     */
    public ProvideChannelInitializer(Map<String, Object> beans) {
        this.beanRegistry = beans;
    }

    /**
     * Installs, in order: string encoder, JSON object framing decoder, string decoder, and the
     * invocation handler (registered with the shared executor).
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new StringEncoder())
                .addLast(new JsonObjectDecoder())
                .addLast(new StringDecoder())
                .addLast(UNORDERED_THREAD_POOL_EVENT_EXECUTOR, new ProvideChannelHandler(this.beanRegistry));
    }
}
3.0.3 ProvideChannelHandler
@Slf4j
public class ProvideChannelHandler extends SimpleChannelInboundHandler<String> {
private Map<String, Object> Beans;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
MessageProtocol messageProtocol = JSONUtil.toBean(msg, MessageProtocol.class);
log.info("ProvideChannelHandler===, 开始处理客户端请求, 请求参数:{}",messageProtocol);
if (Objects.nonNull(messageProtocol)) {
String className = messageProtocol.getClassName();
String methodName = messageProtocol.getMethodName();
Object[] methodParameter = messageProtocol.getMethodParameter();
Class[] classes = this.getClasses(messageProtocol);
Object obj = Beans.get(className);
if (Objects.nonNull(obj)) {
try {
Class<?> aClass = obj.getClass();
Method method = aClass.getMethod(methodName,classes);
if (Objects.nonNull(method)) {
Object invoke = method.invoke(obj, methodParameter);
messageProtocol.setInvokeResult(new InvokeResult(0, invoke));
ctx.writeAndFlush(Unpooled.wrappedBuffer(JSONUtil.toJsonStr(messageProtocol).getBytes(StandardCharsets.UTF_8)));
return;
}
} catch (Exception e) {
messageProtocol.setInvokeResult(new InvokeResult(1, e.getMessage()));
ctx.writeAndFlush(Unpooled.wrappedBuffer(JSONUtil.toJsonStr(messageProtocol).getBytes(StandardCharsets.UTF_8)));
return;
}
}
}
messageProtocol.setInvokeResult(new InvokeResult(1, "没有找到方法!"));
ctx.writeAndFlush(Unpooled.wrappedBuffer(JSONUtil.toJsonStr(messageProtocol).getBytes(StandardCharsets.UTF_8)));
}
private Class[] getClasses(MessageProtocol messageProtocol) {
return Optional.ofNullable(messageProtocol.getMethodParameterTypes())
.map(el -> Arrays.stream(el).filter(Objects::nonNull)
.map(obj -> {
try {
return Class.forName(obj.toString().replace("class","").trim());
} catch (ClassNotFoundException e) {
return null;
}
}).toArray(Class[]::new)).orElse(null);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("MyChannelHandler===发生异常, 信息:{}", cause);
ctx.close();
}
public ProvideChannelHandler(Map<String, Object> beans) {
Beans = beans;
}
}
测试服务提供者启动
进行服务注册, 服务提供
/**
 * Demo entry point for the provider: registers with the registry, then starts serving.
 */
public class ProvideStart {

    /**
     * Prints the simulated start-up banner, triggers registration and blocks until the
     * registration future yields a value, starts the provider server, then parks the main thread.
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String[] startupBanner = {
                "假设SpringBoot项目启动中, 加载各类Bean",
                "===================================",
                "假设SpringBoot项目启动完成",
                "===================================",
                "进行服务注册"
        };
        for (String line : startupBanner) {
            System.out.println(line);
        }

        RegistrationFuture pendingRegistration = new RegistrationFuture();
        ServiceRegistration.ProjectStartCompletedTrigger(pendingRegistration);
        Object registrationOutcome = pendingRegistration.get();
        System.out.println("==========" + registrationOutcome + "=============");

        System.out.println("进行服务提供");
        MyProvide.doProvide();
        System.out.println("============服务提供启动完成==============");

        // Keep the main thread alive while the background threads run.
        LockSupport.park();
    }
}
提供者日志结果:
假设SpringBoot项目启动中, 加载各类Bean
===================================
假设SpringBoot项目启动完成
===================================
进行服务注册
19:01:54.638 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.serviceproduce.registration.ServiceRegistration - ServiceRegistration===已经成功连接!!!
19:01:54.705 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.serviceproduce.registration.handler.MyProduceRegistrationHandler - MyProduceRegistrationHandler===doRegistrationInfo, 服务信息注册:{"ip":"127.0.0.1","type":0,"serviceName":"myRPCProvider","port":7777}
19:01:54.777 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.serviceproduce.registration.handler.MyProduceRegistrationHandler - MyRegistrationHandler===读取到消息:{"result":"信息注册成功!"}
==========注册完成啦,大兄弟!=============
进行服务提供
============服务提供启动完成==============
19:01:54.795 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.rpc.serviceproduce.produce.MyProvide - MyProvide===服务提供者启动成功, 等待消费者访问!
注册中心日志
19:01:54.769 [defaultEventLoopGroup-4-1] INFO com.zhihao.netty.rpc.registrationcenter.handler.MyRegisteredChannelHandler - MyChannelHandler===注册成功一个服务提供者:RegistrationInfo(type=0, serviceName=myRPCProvider, ip=127.0.0.1, port=7777)
客户端消费者
1.1 定义接口
/**
 * Consumer-side copy of the service contract; a proxy for it is obtained from ConsumerFactory.
 */
public interface MyRPCTest {
/**
 * Sample remote method taking a single string argument.
 *
 * @param p1 argument sent with the call
 * @return the string returned by the call
 */
String hiHi(String p1);
}
2.1 编写接口动态代理
2.1.1 ConsumerFactory
/**
 * Creates JDK dynamic proxies whose calls are handled by {@link NettyInvocationHandler}.
 */
public class ConsumerFactory {

    /**
     * Returns a proxy implementing the given interface.
     *
     * @param aclass interface to proxy; {@code null} yields {@code null}
     * @return a proxy instance backed by a new {@link NettyInvocationHandler}, or {@code null}
     * @throws RuntimeException if {@code aclass} is not an interface
     */
    public static <T> T getProxy(Class<T> aclass) {
        if (aclass == null) {
            return null;
        }
        if (aclass.isInterface()) {
            Class<?>[] proxiedInterfaces = new Class[]{aclass};
            Object proxy = Proxy.newProxyInstance(aclass.getClassLoader(), proxiedInterfaces, new NettyInvocationHandler());
            return aclass.cast(proxy);
        }
        throw new RuntimeException(aclass.getName()+ "该类不是接口, 无法使用JDK动态代理!");
    }
}
2.1.2 NettyInvocationHandler
/**
 * Invocation handler behind the consumer proxies: forwards interface calls to the provider
 * through a cached {@link NettyClient}.
 */
public class NettyInvocationHandler implements InvocationHandler {

    private final Map<String, NettyClient> nettyClientMap;

    /** Uses the client cache shared through {@link NettyConsumerContext}. */
    public NettyInvocationHandler() {
        this.nettyClientMap = NettyConsumerContext.nettyClientMap;
    }

    /**
     * Performs the remote call for an interface method. {@code equals}, {@code hashCode} and
     * {@code toString} are answered locally, because a dynamic proxy routes them to its handler
     * too and they must not be sent over the wire.
     *
     * @throws RuntimeException if no client could be initialised for the service
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return invokeObjectMethod(proxy, method, args);
        }
        // The target service name is fixed in this demo.
        String serviceName = "myRPCProvider";
        NettyClient nettyClient;
        // Lock on the interned name so that only one caller builds the client for a service.
        synchronized (serviceName.intern()) {
            nettyClient = nettyClientMap.get(serviceName);
            if (nettyClient == null) {
                RegistrationInfo registrationInfo = GetRegistrationInfo.getGetRegistrationInfo(serviceName);
                nettyClient = NettyClient.buildClient(registrationInfo);
                if (nettyClient != null) {
                    nettyClientMap.put(serviceName, nettyClient);
                }
            }
        }
        if (null == nettyClient) {
            throw new RuntimeException(serviceName+"客户端初始化有误!");
        }
        return nettyClient.doRemotelyInvoke(method, args);
    }

    /** Identity-based implementations of the {@link Object} methods a proxy dispatches. */
    private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
        switch (method.getName()) {
            case "equals":
                return proxy == args[0];
            case "hashCode":
                return System.identityHashCode(proxy);
            case "toString":
                return proxy.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
            default:
                throw new UnsupportedOperationException(method.toString());
        }
    }
}
3.1 拉取服务提供者信息
3.1.1 GetRegistrationInfo
@Slf4j
/**
 * Consumer-side lookup of provider addresses through a lazily created connection to the
 * registry at 127.0.0.1:8888.
 */
public class GetRegistrationInfo {

    /** How long to wait for the registry connection to be established. */
    private static final long CONNECT_TIMEOUT_MILLIS = 5000L;

    /** Handler of the live registry connection; reset to null when that connection ends. */
    private static volatile MyGetRegistrationHandler myGetRegistrationHandler;

    /**
     * Asks the registry for the address of the given service.
     *
     * @param serviceName name of the service to look up
     * @return the registration info, or {@code null} if the reply is of an unexpected type
     * @throws InvalidParameterException if the registry replies with a plain string
     * @throws RuntimeException          if the registry connection cannot be established
     */
    public static final RegistrationInfo getGetRegistrationInfo(String serviceName) {
        // Use the reference returned under the lock; the static field may be reset concurrently.
        MyGetRegistrationHandler handler = checkingClientIsSuccess(serviceName);
        Object apply = handler.apply(serviceName);
        if (apply instanceof String) {
            throw new InvalidParameterException(serviceName + "注册中心没有该服务地址");
        }
        if (apply instanceof RegistrationInfo) {
            return (RegistrationInfo) apply;
        }
        return null;
    }

    /**
     * Returns the handler of the registry connection, starting the connection and waiting up to
     * {@link #CONNECT_TIMEOUT_MILLIS} for it when none exists yet.
     */
    private synchronized static MyGetRegistrationHandler checkingClientIsSuccess(String serviceName) {
        MyGetRegistrationHandler handler = myGetRegistrationHandler;
        if (null != handler) {
            return handler;
        }
        try {
            initRegistrationNettyClient();
            long deadline = System.currentTimeMillis() + CONNECT_TIMEOUT_MILLIS;
            long remaining = CONNECT_TIMEOUT_MILLIS;
            // Loop on the condition so a spurious wakeup does not end the wait early.
            while (null == myGetRegistrationHandler && remaining > 0L) {
                GetRegistrationInfo.class.wait(remaining);
                remaining = deadline - System.currentTimeMillis();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            log.error("GetRegistrationInfo===初始化客户端发生异常, 信息:{}", e);
            throw new RuntimeException(e);
        }
        handler = myGetRegistrationHandler;
        if (null == handler) {
            throw new RuntimeException("initNettyClient===初始化客户端发生异常");
        }
        log.info("GetRegistrationInfo===初始化客户端完成");
        return handler;
    }

    /**
     * Starts a background thread that connects to the registry, publishes the handler once the
     * connect succeeds, and blocks until the channel closes.
     */
    private static void initRegistrationNettyClient() {
        new Thread(() -> {
            log.info("GetRegistrationInfo===开始初始化!!!");
            MyGetRegistrationHandler registrationHandler = new MyGetRegistrationHandler();
            NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
            Bootstrap bootstrap = new Bootstrap();
            bootstrap = bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new JsonObjectDecoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(registrationHandler);
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8888));
            channelFuture.addListener(el -> {
                if (el.isSuccess()) {
                    log.info("GetRegistrationInfo===已经成功连接!!!");
                    synchronized (GetRegistrationInfo.class) {
                        myGetRegistrationHandler = registrationHandler;
                        GetRegistrationInfo.class.notifyAll();
                    }
                } else {
                    log.error("GetRegistrationInfo===连接失败!!!");
                }
            });
            try {
                channelFuture.sync().channel().closeFuture().sync();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.error("GetRegistrationInfo===发生异常, 信息:{}", e);
            } finally {
                // Forget the dead connection so the next lookup reconnects.
                synchronized (GetRegistrationInfo.class) {
                    if (myGetRegistrationHandler == registrationHandler) {
                        myGetRegistrationHandler = null;
                    }
                }
                workerGroup.shutdownGracefully();
            }
        }).start();
    }
}
3.1.2 MyGetRegistrationHandler
@Slf4j
public class MyGetRegistrationHandler extends SimpleChannelInboundHandler<String> implements Function<Object,Object> {
private ChannelHandlerContext context;
private Object result;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.context = ctx;
super.channelActive(ctx);
}
@Override
protected synchronized void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info("MyGetRegistrationHandler===读取到信息:{}",msg);
JSONObject jsonObject = JSONUtil.parseObj(msg);
this.result = jsonObject.get("result");
if (this.result == null) {
this.result = JSONUtil.toBean(msg, RegistrationInfo.class);
}
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("MyGetRegistrationHandler===发生异常:{}",cause);
ctx.close();
super.exceptionCaught(ctx, cause);
}
@Override
public synchronized Object apply(Object o) {
this.result = null;
RegistrationInfo registrationInfo = new RegistrationInfo();
registrationInfo.setServiceName(o.toString());
registrationInfo.setType(1);
try {
this.context.writeAndFlush(Unpooled.wrappedBuffer(JSONUtil.toJsonStr(registrationInfo).getBytes(StandardCharsets.UTF_8)));
wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return this.result;
}
}
4.0 初始化远程调用客户端
4.1.1 NettyClient
@Slf4j
/**
 * Consumer-side client for one provider: opens the connection and performs blocking remote
 * invocations over it.
 */
public class NettyClient {

    /** How long to wait for the connection to become active. */
    private static final long CONNECT_TIMEOUT_MILLIS = 5000L;
    /** How long to wait for the provider's reply to one invocation. */
    private static final long INVOKE_TIMEOUT_SECONDS = 30L;

    private String ip;
    private Integer port;
    private String serviceName;
    private Map<String, ChannelHandlerContext> channelHandlerContextMap;
    private Map<String, SynchronousQueue<?>> resultConcurrentHashMap;

    /**
     * @param info provider address obtained from the registry
     * @throws RuntimeException if the ip is blank or the port is missing
     */
    public NettyClient(RegistrationInfo info) {
        if (StrUtil.isBlank(info.getIp())) {
            throw new RuntimeException("提供者注册信息有误!!!");
        }
        this.ip = info.getIp();
        this.port = info.getPort();
        if (this.port == null) {
            // Fail here rather than with an unboxing NPE inside the connection thread.
            throw new RuntimeException("提供者注册信息有误!!!");
        }
        this.serviceName = info.getServiceName();
        this.channelHandlerContextMap = NettyConsumerContext.channelHandlerContextMap;
        this.resultConcurrentHashMap = NettyConsumerContext.resultConcurrentHashMap;
    }

    /**
     * Creates a client and connects it.
     *
     * @return the client, or {@code null} if no channel context was registered for the service
     */
    public static NettyClient buildClient(RegistrationInfo info) {
        NettyClient nettyClient = new NettyClient(info);
        nettyClient.initNettyClient();
        boolean flag = nettyClient.channelHandlerContextMap.containsKey(nettyClient.serviceName);
        log.info("initNettyClient===serviceName:{}, 初始化客户端结果:{}", nettyClient.serviceName, flag);
        return flag ? nettyClient : null;
    }

    /**
     * Sends the invocation to the provider and blocks for the reply.
     *
     * <p>NOTE(review): a non-JSON reply is returned as its string form regardless of the
     * method's return type, so only String-returning methods round-trip for scalar results.
     *
     * @param method interface method being invoked
     * @param args   invocation arguments
     * @return the decoded result
     * @throws TimeoutException if no reply arrives within {@link #INVOKE_TIMEOUT_SECONDS}
     * @throws RuntimeException if the provider connection is gone, the provider reports a
     *                          failure, or the waiting thread is interrupted
     */
    public Object doRemotelyInvoke(Method method, Object[] args) throws TimeoutException {
        ChannelHandlerContext channelHandlerContext = channelHandlerContextMap.get(serviceName);
        // Check the looked-up context (not the map) so a dropped connection is reported properly.
        if (null == channelHandlerContext) {
            throw new RuntimeException("服务提供者已不存在!!!");
        }
        MessageProtocol protocol = new MessageProtocol();
        String requestId = UUID.fastUUID().toString() + this.getCurrentTimeSecond();
        String simpleName = method.getDeclaringClass().getSimpleName();
        protocol.setClassName(StrUtil.lowerFirst(simpleName));
        protocol.setMethodName(method.getName());
        protocol.setMethodParameterTypes(method.getParameterTypes());
        protocol.setMethodParameter(args);
        protocol.setRequestId(requestId);
        // Register the hand-off queue before sending so the reply always finds it.
        SynchronousQueue<?> synchronousQueue = new SynchronousQueue<>();
        resultConcurrentHashMap.put(requestId, synchronousQueue);
        InvokeResult take;
        try {
            String jsonStr = JSONUtil.toJsonStr(protocol);
            channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(jsonStr.getBytes(StandardCharsets.UTF_8)));
            log.info("doRemotelyInvoke===已经发送, jsonStr:{}!!!!", jsonStr);
            take = (InvokeResult) synchronousQueue.poll(INVOKE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            // Never leave the pending entry behind, whatever the outcome.
            resultConcurrentHashMap.remove(requestId);
        }
        // poll() returns null only when the wait timed out.
        if (take == null) {
            throw new TimeoutException("调用超时!!!");
        }
        Integer resultCode = Optional.ofNullable(take.getResultCode()).orElse(-1);
        if (resultCode.equals(1)) {
            throw new RuntimeException(take.getFailMessage());
        }
        Class<?> returnType = method.getReturnType();
        String resultStr = Optional.ofNullable(take.getInvokeResult()).map(Objects::toString).orElse("");
        if (JSONUtil.isJsonArray(resultStr)) {
            return JSONUtil.toList(resultStr, returnType);
        }
        if (JSONUtil.isJsonObj(resultStr)) {
            return JSONUtil.toBean(resultStr, returnType);
        }
        return resultStr;
    }

    /**
     * Starts the connection thread unless a channel context already exists for the service, then
     * waits up to {@link #CONNECT_TIMEOUT_MILLIS} for the context to be registered.
     */
    private void initNettyClient() {
        // Same monitor the consumer handler notifies on when the channel becomes active.
        final String lock = serviceName.intern();
        synchronized (lock) {
            if (channelHandlerContextMap.containsKey(serviceName)) {
                // Already connected: nothing to start and nothing to wait for.
                return;
            }
            try {
                new Thread(() -> {
                    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
                    Bootstrap bootstrap = new Bootstrap();
                    bootstrap = bootstrap.group(workerGroup)
                            .channel(NioSocketChannel.class)
                            .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                            .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                            .handler(new MyChannelInitializer(serviceName));
                    ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(ip, port));
                    channelFuture.addListener(el -> {
                        if (el.isSuccess()) {
                            log.info("initNettyClient===serviceName:{}, 连接成功!!!", serviceName);
                        } else {
                            log.error("initNettyClient===serviceName:{}, 连接失败!!!", serviceName);
                        }
                    });
                    try {
                        channelFuture.sync().channel().closeFuture().sync();
                    } catch (Throwable e) {
                        if (e instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        log.error("initNettyClient===serviceName:{}, 发生异常, 信息:{}", serviceName, e);
                    } finally {
                        workerGroup.shutdownGracefully();
                    }
                }).start();
                log.info("initNettyClient===serviceName:{}, 开始等待初始化客户端完成", serviceName);
                long deadline = System.currentTimeMillis() + CONNECT_TIMEOUT_MILLIS;
                long remaining = CONNECT_TIMEOUT_MILLIS;
                // Loop on the condition so a spurious wakeup does not end the wait early.
                while (!channelHandlerContextMap.containsKey(serviceName) && remaining > 0L) {
                    lock.wait(remaining);
                    remaining = deadline - System.currentTimeMillis();
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.error("initNettyClient===serviceName:{}, 初始化客户端发生异常, 信息:{}", serviceName, e);
            }
        }
    }

    /** @return the current wall-clock time in whole seconds since the epoch */
    private long getCurrentTimeSecond() {
        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
    }
}
4.1.2 MyChannelInitializer
/**
 * Builds the pipeline for the consumer's connection to a provider.
 */
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    /** Executor shared by all channels; the consumer handler is registered on it. */
    private static final UnorderedThreadPoolEventExecutor UNORDERED_THREAD_POOL_EVENT_EXECUTOR =
            new UnorderedThreadPoolEventExecutor(16);

    /** Name of the service this connection belongs to; passed to the handler. */
    private String serviceName;

    /**
     * @param serviceName name of the service the created handlers are bound to
     */
    public MyChannelInitializer(String serviceName) {
        this.serviceName = serviceName;
    }

    /**
     * Installs, in order: string encoder, JSON object framing decoder, string decoder, and the
     * consumer handler (registered with the shared executor).
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new StringEncoder())
                .addLast(new JsonObjectDecoder())
                .addLast(new StringDecoder())
                .addLast(UNORDERED_THREAD_POOL_EVENT_EXECUTOR, new MyConsumerHandler(this.serviceName));
    }
}
4.1.3 MyConsumerHandler
@Slf4j
public class MyConsumerHandler extends SimpleChannelInboundHandler<String> {
private String serviceName;
private Map<String, SynchronousQueue<?>> resultConcurrentHashMap;
public MyConsumerHandler(String serviceName) {
this.serviceName = serviceName;
this.resultConcurrentHashMap = NettyConsumerContext.resultConcurrentHashMap;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info("MyConsumerHandler===serviceName:{}, 服务端数据响应:{}", msg);
MessageProtocol messageProtocol = JSONUtil.toBean(msg, MessageProtocol.class);
if (null == messageProtocol) {
log.error("MyConsumerHandler===serviceName:{}, 发生异常:{}, 服务端发生空数据响应", serviceName);
return;
}
String requestId = messageProtocol.getRequestId();
SynchronousQueue<Object> synchronousQueue = (SynchronousQueue<Object>) = resultConcurrentHashMap.get(requestId);
if (synchronousQueue == null){
log.error("MyConsumerHandler===serviceName:{}, 发生异常:{}, 服务端数据响应, 已超时!!!", serviceName);
return;
}
resultConcurrentHashMap.remove(requestId);
synchronousQueue.put(messageProtocol.getInvokeResult());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyConsumerContext.channelHandlerContextMap.put(serviceName, ctx);
synchronized (serviceName.intern()) {
serviceName.intern().notify();
}
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettyConsumerContext.channelHandlerContextMap.remove(ctx);
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
NettyConsumerContext.channelHandlerContextMap.remove(ctx);
log.error("MyConsumerHandler===serviceName:{}, 发生异常:{}", serviceName, cause);
super.exceptionCaught(ctx, cause);
}
}
4.1.4 NettyConsumerContext
@Slf4j
/**
 * Holder for the consumer-side shared state; every map is a ConcurrentHashMap.
 */
public class NettyConsumerContext {
// Channel context of the provider connection, keyed by service name.
public static final Map<String, ChannelHandlerContext> channelHandlerContextMap = new ConcurrentHashMap<>();
// Hand-off queues of in-flight invocations, keyed by request id.
public static final Map<String, SynchronousQueue<?>> resultConcurrentHashMap = new ConcurrentHashMap<>();
// Initialised clients, keyed by service name.
public static final Map<String, NettyClient> nettyClientMap = new ConcurrentHashMap<>();
}
进行消费者测试
/**
 * Demo entry point for the consumer: obtains a proxy and performs one remote call.
 */
public class ConsumerStart {

    /**
     * Prints the simulated start-up banner, creates the {@link MyRPCTest} proxy, invokes
     * {@code hiHi} through it and prints the reply.
     */
    public static void main(String[] args) {
        String[] startupBanner = {
                "假设SpringBoot项目启动中, 加载各类Bean",
                "===================================",
                "假设SpringBoot项目启动完成",
                "==================================="
        };
        for (String line : startupBanner) {
            System.out.println(line);
        }

        MyRPCTest proxy = ConsumerFactory.getProxy(MyRPCTest.class);
        System.out.println("假设注入MyRPCTest bean 完成");
        System.out.println("进行服务调用");

        String reply = proxy.hiHi("测试一波开始开始开始");
        System.out.println(reply);
    }
}
服务提供者日志:
14:43:04.213 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.serviceproduce.registration.handler.MyProduceRegistrationHandler - MyProduceRegistrationHandler===doRegistrationInfo, 服务信息注册:{"ip":"127.0.0.1","type":0,"serviceName":"myRPCProvider","port":7777}
14:43:04.307 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.serviceproduce.registration.handler.MyProduceRegistrationHandler - MyRegistrationHandler===读取到消息:{"result":"信息注册成功!"}
==========注册完成啦,大兄弟!=============
进行服务提供
============服务提供启动完成==============
14:43:04.335 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.rpc.serviceproduce.produce.MyProvide - MyProvide===服务提供者启动成功, 等待消费者访问!
14:43:11.405 [unorderedThreadPoolEventExecutor-5-2] INFO com.zhihao.netty.rpc.serviceproduce.produce.handler.ProvideChannelHandler - ProvideChannelHandler===, 开始处理客户端请求, 请求参数:MessageProtocol(className=myRPCTest, methodName=hiHi, methodParameterTypes=[class java.lang.String], methodParameter=[测试一波开始开始开始], requestId=ad39b4be-11aa-481b-bf56-50a8e19df4801656744191, invokeResult=null)
消费者客户端日志:
假设SpringBoot项目启动中, 加载各类Bean
===================================
假设SpringBoot项目启动完成
===================================
假设注入MyRPCTest bean 完成
进行服务调用
14:43:10.565 [Thread-0] INFO com.zhihao.netty.rpc.clientconsumer.registrationinfo.GetRegistrationInfo - GetRegistrationInfo===开始初始化!!!
14:43:11.328 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.clientconsumer.registrationinfo.handler.MyGetRegistrationHandler - MyGetRegistrationHandler===读取到信息:{"ip":"127.0.0.1","type":0,"serviceName":"myRPCProvider","port":7777}
14:43:11.354 [main] INFO com.zhihao.netty.rpc.clientconsumer.consumer.NettyClient - initNettyClient===serviceName:myRPCProvider, 开始等待初始化客户端完成
14:43:11.366 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.rpc.clientconsumer.consumer.NettyClient - initNettyClient===serviceName:myRPCProvider, 连接成功!!!
14:43:11.366 [main] INFO com.zhihao.netty.rpc.clientconsumer.consumer.NettyClient - initNettyClient===serviceName:myRPCProvider, 初始化客户端结果:true
14:43:11.375 [main] INFO com.zhihao.netty.rpc.clientconsumer.consumer.NettyClient - doRemotelyInvoke===已经发送, jsonStr:{"methodName":"hiHi","methodParameter":["测试一波开始开始开始"],"className":"myRPCTest","requestId":"ad39b4be-11aa-481b-bf56-50a8e19df4801656744191","methodParameterTypes":["class java.lang.String"]}!!!!
14:43:11.425 [unorderedThreadPoolEventExecutor-4-2] INFO com.zhihao.netty.rpc.clientconsumer.consumer.handler.MyConsumerHandler - MyConsumerHandler===serviceName:{"methodName":"hiHi","methodParameter":["测试一波开始开始开始"],"className":"myRPCTest","requestId":"ad39b4be-11aa-481b-bf56-50a8e19df4801656744191","invokeResult":{"resultCode":0,"invokeResult":"成功进行了远程调用哟, 恭喜恭喜! 你的参数:测试一波开始开始开始===67e60a8d-14f0-43ea-8130-fef9ee48478e"},"methodParameterTypes":["class java.lang.String"]}, 服务端数据响应:{}
成功进行了远程调用哟, 恭喜恭喜! 你的参数:测试一波开始开始开始===67e60a8d-14f0-43ea-8130-fef9ee48478e
到此一个简单的RPC框架就完成了!!!
总结:
结论:
以上提到的只是 RPC 的基础流程,这对于工业级别的使用是远远不够的。 生产环境中的服务提供者都是集群部署的,所以有多个提供者,而且还会随着大促等流量情况动态增减机器。 调用者也能通过注册中心得知服务提供者下线。 还需要有路由分组策略,调用者根据下发的路由信息选择对应的服务提供者,能实现分组调用、灰度发布、流量隔离等功能。 还需要有负载均衡策略,一般经过路由过滤之后还是有多个服务提供者可以选择,通过负载均衡策略来达到流量均衡。 当然还需要有异常重试,毕竟网络是不稳定的,而且有时候某个服务提供者也可能出点问题,所以一次调用出错进行重试,减少业务的损耗。 还需要限流熔断,限流是因为服务提供者不知道会接入多少调用者,也不清楚每个调用者的调用量,所以需要衡量一下自身服务的承受值来进行限流,防止服务崩溃。 而熔断是为了防止下游服务故障导致自身服务调用超时阻塞堆积而崩溃,特别是调用链很长的那种,影响很大。
netty提供的获取异步结果的方式
客户端中的SynchronousQueue 可以使用netty中的 DefaultPromise 代替, 来获取异步线程执行结果!!
说明: DefaultPromise
用法:
DefaultPromise<Object> defaultPromise = new DefaultPromise<>(channelHandlerContext.executor());
resultConcurrentHashMap.put(requestId, defaultPromise);
take = (InvokeResult) defaultPromise.get(30L, TimeUnit.SECONDS);
DefaultPromise<Object> synchronousQueue = resultConcurrentHashMap.remove(requestId);
synchronousQueue.setSuccess(messageProtocol.getInvokeResult());
1
|