IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 基于springboot+netty自定义rpc框架(starter组件方式) -> 正文阅读

[网络协议]基于springboot+netty自定义rpc框架(starter组件方式)

一、rpc生产端
自定义注解为

/**
 * @author :hzz
 * @description:rpc服务端提供的注解
 * @date :2022/1/2 10:47
 */
@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {

    /**
     * 注册的生产者发布名
     * @return
     */
    String ServerName() default "" ;
}

1、核心思路是实现 BeanPostProcessor,并自定义注解,注解用于定义实现类

/**
 * @author :hzz
 * @description:TODO
 * @date :2022/1/2 11:42
 */
public class RpcServerRegistrarBeanPostProcesser  implements BeanPostProcessor {




    private RpcServerContext    rpcServerContext;


    public RpcServerRegistrarBeanPostProcesser(RpcServerContext rpcServerContext) {
        this.rpcServerContext = rpcServerContext;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        RpcService annotation = bean.getClass().getAnnotation(RpcService.class);
        if(annotation!=null) {
            String serverName = annotation.ServerName();
            if (serverName == null || "".equalsIgnoreCase(serverName.trim())) {
                serverName = bean.getClass().getInterfaces()[0].getName();
            }
            rpcServerContext.addRpcServer(serverName, beanName);
        }
        return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
    }


}

2、注册中心直接使用redis,将扫描到的带有自定义rpc服务生产者的类对应的netty的ip+端口号注册到redis,并在服务本地缓存一份
3、启动netty服务端监听对应端口
4、使用方式

/**
 * @author :hzz
 * @description:TODO
 * @date :2022/1/2 10:56
 */
@RpcService
public  class MyServiceServerImpl implements MyServiceServer {
    @Override
    public String hello() {
        return "hello";
    }
}

二、消费端
自定义注解为

/**
 * @author :hzz
 * @description:rpc服务端的注入
 * @date :2022/1/2 10:47
 */
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcAutowired {

    /**
     * 注册的生产者发布名
     * @return
     */
    String ServerName() default "" ;
}

1、核心思路是实现InstantiationAwareBeanPostProcessor接口,并实现
postProcessProperties方法,这个方法是在spring生命周期中填充属性的时候执行的回调,这个时候可以对这个bean的属性参数进行修改,这边主要是判断Bean的属性中有没有自定义注解为RpcAutowired的参数,这块代码可以参考jetcache的写法

package com.example.hzzrpcclient.config;

import com.example.hzzrpcclient.config.annotations.RpcAutowired;
import com.example.hzzrpcclient.config.netty.NettyMainClient;
import com.example.hzzrpcclient.config.registercenter.RedisRegisterCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.PropertyValues;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.InjectionMetadata;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author :hzz
 * @description:TODO
 * @date :2022/1/2 21:56
 */
@Slf4j
@Configuration
public class RpcClientBeanPostProcesser implements InstantiationAwareBeanPostProcessor, BeanFactoryAware {

    @Autowired
    private RedisRegisterCenter redisRegisterCenter;

    @Autowired
    private NettyMainClient nettyMainClient;

    private ConfigurableListableBeanFactory beanFactory;

    private final Map<String, InjectionMetadata> injectionMetadataCache = new ConcurrentHashMap<String, InjectionMetadata>();

    @Override
    public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName)
            throws BeanCreationException {
        InjectionMetadata metadata = findAutowiringMetadata(beanName, bean.getClass(), pvs);
        try {
            metadata.inject(bean, beanName, pvs);
        } catch (BeanCreationException ex) {
            throw ex;
        } catch (Throwable ex) {
            throw new BeanCreationException(beanName, "Injection of autowired dependencies failed", ex);
        }
        return pvs;
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        if (!(beanFactory instanceof ConfigurableListableBeanFactory)) {
            throw new IllegalArgumentException(
                    "AutowiredAnnotationBeanPostProcessor requires a ConfigurableListableBeanFactory");
        }
        this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
    }


    private InjectionMetadata findAutowiringMetadata(String beanName, Class<?> clazz, PropertyValues pvs) {
        // Fall back to class name as cache key, for backwards compatibility with custom callers.
        String cacheKey = (StringUtils.hasLength(beanName) ? beanName : clazz.getName());
        // Quick check on the concurrent map first, with minimal locking.
        InjectionMetadata metadata = this.injectionMetadataCache.get(cacheKey);
        if (InjectionMetadata.needsRefresh(metadata, clazz)) {
            synchronized (this.injectionMetadataCache) {
                metadata = this.injectionMetadataCache.get(cacheKey);
                if (InjectionMetadata.needsRefresh(metadata, clazz)) {
                    try {
                        metadata = buildAutowiringMetadata(clazz);
                        this.injectionMetadataCache.put(cacheKey, metadata);
                    } catch (NoClassDefFoundError err) {
                        throw new IllegalStateException("Failed to introspect bean class [" + clazz.getName() +
                                "] for autowiring metadata: could not find class that it depends on", err);
                    }
                }
            }
        }
        return metadata;
    }


    private InjectionMetadata buildAutowiringMetadata(final Class<?> clazz) {
        LinkedList<InjectionMetadata.InjectedElement> elements = new LinkedList<InjectionMetadata.InjectedElement>();
        Class<?> targetClass = clazz;

        do {
            final LinkedList<InjectionMetadata.InjectedElement> currElements =
                    new LinkedList<InjectionMetadata.InjectedElement>();

            doWithLocalFields(targetClass, new ReflectionUtils.FieldCallback() {
                @Override
                public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                    RpcAutowired ann = field.getAnnotation(RpcAutowired.class);
                    if (ann != null) {
                        if (Modifier.isStatic(field.getModifiers())) {
                            if (log.isWarnEnabled()) {
                                log.warn("RpcClient Autowired annotation is not supported on static fields: " + field);
                            }
                            return;
                        }
                        currElements.add(new AutowiredFieldElement(field, ann));
                    }
                }
            });

            elements.addAll(0, currElements);
            targetClass = targetClass.getSuperclass();
        }
        while (targetClass != null && targetClass != Object.class);

        return new InjectionMetadata(clazz, elements);
    }


    private void doWithLocalFields(Class clazz, ReflectionUtils.FieldCallback fieldCallback) {
        Field fs[] = clazz.getDeclaredFields();
        for (Field field : fs) {
            try {
                fieldCallback.doWith(field);
            } catch (IllegalAccessException ex) {
                throw new IllegalStateException("Not allowed to access field '" + field.getName() + "': " + ex);
            }
        }
    }


    private class AutowiredFieldElement extends InjectionMetadata.InjectedElement {

        private Field field;
        private RpcAutowired ann;

        public AutowiredFieldElement(Field field, RpcAutowired ann) {
            super(field, null);
            this.field = field;
            this.ann = ann;
        }

        @Override
        protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
            RpcClientProxy rpcClientProxy=new RpcClientProxy(redisRegisterCenter,nettyMainClient);
            Object o = Proxy.newProxyInstance(field.getType().getClassLoader(),
                    new Class[]{field.getType()}, rpcClientProxy);
            field.setAccessible(true);
            field.set(bean, o);
        }
    }
}

2、注入的属性应该是一个代理类,代理类里面使用netty客户端进行链接并发送数据,并使用雪花算法创建一个唯一id,在发送完数据之后进行线程自旋并阻塞等待服务端的消息发送过来

/**
 * @author :hzz
 * @description:TODO
 * @date :2022/1/2 22:14
 */
@Slf4j
public class RpcClientProxy implements InvocationHandler {

    private RedisRegisterCenter redisRegisterCenter;

    private NettyMainClient nettyMainClient;

    public RpcClientProxy(RedisRegisterCenter redisRegisterCenter, NettyMainClient nettyMainClient) {
        this.redisRegisterCenter = redisRegisterCenter;
        this.nettyMainClient = nettyMainClient;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();//方法名,hello()
        String serverName = method.getDeclaringClass().getName();//接口名:MyServiceServer,用来寻找服务
        RpcServerDto rpcServerDto = redisRegisterCenter.searchRpcServer(serverName);
        Channel connect = nettyMainClient.connect(rpcServerDto.getIp(), rpcServerDto.getPort());
        long start = System.currentTimeMillis();

        String msgId = UUID.randomUUID().toString();
        connect.writeAndFlush(new String( msgId ).getBytes()).addListener(
                future -> {
                    long time = System.currentTimeMillis() - start;
                    if (!future.isSuccess()) {
                        log.error("下发失败,msgId=[{}],耗时[{}]ms", msgId, time);
                        return;
                    }
                    if (time > 20000) {
                        log.warn("指令操作=[{}],下发异常耗时,msgId=[{}],耗时[{}]ms", 20000, msgId, time);
                        return;
                    }
                    log.info("下发成功,msgId=[{}],耗时[{}]ms", msgId, time);
                });

        // 阻塞
        GuardedObject<String> go = GuardedObject.create(msgId);
        String resPayloadPacket = go.getAndThrow(res ->{
            return (res.indexOf(msgId)>-1);
        }, 50000, TimeUnit.MILLISECONDS);

        return resPayloadPacket;
    }

}

3、使用方式

@Service
public class RpcClientServiceimpl {

    @RpcAutowired
    private MyServiceServer myServiceServer;



    public String hello(){
          long s = System.currentTimeMillis();   //获取开始时间

        String hello = myServiceServer.hello();
        long e = System.currentTimeMillis(); //获取结束时间

        System.out.println(hello+"通过rpc远程调用获取到的值"+"用时:" + (e - s) + "ms");
        return hello;
    }
}

三、接口服务
在这里插入图片描述

gitee地址,写了2个多小时,后面直接偷懒累了,主要目的是学习spring。。。。所以代码很烂,所以就没有地址了

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-01-04 13:46:39  更:2022-01-04 13:48:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/8 11:28:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码