一、rpc生产端 自定义注解为
/**
* @author :hzz
* @description:rpc服务端提供的注解
* @date :2022/1/2 10:47
*/
@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
/**
* 注册的生产者发布名
* @return
*/
String ServerName() default "" ;
}
1、核心思路是实现 BeanPostProcessor,并自定义注解,注解用于定义实现类
/**
* @author :hzz
* @description:TODO
* @date :2022/1/2 11:42
*/
public class RpcServerRegistrarBeanPostProcesser implements BeanPostProcessor {
private RpcServerContext rpcServerContext;
public RpcServerRegistrarBeanPostProcesser(RpcServerContext rpcServerContext) {
this.rpcServerContext = rpcServerContext;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
RpcService annotation = bean.getClass().getAnnotation(RpcService.class);
if(annotation!=null) {
String serverName = annotation.ServerName();
if (serverName == null || "".equalsIgnoreCase(serverName.trim())) {
serverName = bean.getClass().getInterfaces()[0].getName();
}
rpcServerContext.addRpcServer(serverName, beanName);
}
return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
}
}
2、注册中心直接使用redis,将扫描到的带有自定义rpc服务生产者的类对应的netty的ip+端口号注册到redis,并在服务本地缓存一份 3、启动netty服务端监听对应端口 4、使用方式
/**
* @author :hzz
* @description:TODO
* @date :2022/1/2 10:56
*/
@RpcService
public class MyServiceServerImpl implements MyServiceServer {
@Override
public String hello() {
return "hello";
}
}
二、消费端 自定义注解为
/**
* @author :hzz
* @description:rpc服务端的注入
* @date :2022/1/2 10:47
*/
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcAutowired {
/**
* 注册的生产者发布名
* @return
*/
String ServerName() default "" ;
}
1、核心思路是实现InstantiationAwareBeanPostProcessor接口,并实现 postProcessProperties方法,这个方法是在spring生命周期中填充属性的时候执行的回调,这个时候可以对这个bean的属性参数进行修改,这边主要是判断Bean的属性中有没有自定义注解为RpcAutowired的参数,这块代码可以参考jetcache的写法
package com.example.hzzrpcclient.config;
import com.example.hzzrpcclient.config.annotations.RpcAutowired;
import com.example.hzzrpcclient.config.netty.NettyMainClient;
import com.example.hzzrpcclient.config.registercenter.RedisRegisterCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.PropertyValues;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.InjectionMetadata;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author :hzz
* @description:TODO
* @date :2022/1/2 21:56
*/
@Slf4j
@Configuration
public class RpcClientBeanPostProcesser implements InstantiationAwareBeanPostProcessor, BeanFactoryAware {
@Autowired
private RedisRegisterCenter redisRegisterCenter;
@Autowired
private NettyMainClient nettyMainClient;
private ConfigurableListableBeanFactory beanFactory;
private final Map<String, InjectionMetadata> injectionMetadataCache = new ConcurrentHashMap<String, InjectionMetadata>();
@Override
public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName)
throws BeanCreationException {
InjectionMetadata metadata = findAutowiringMetadata(beanName, bean.getClass(), pvs);
try {
metadata.inject(bean, beanName, pvs);
} catch (BeanCreationException ex) {
throw ex;
} catch (Throwable ex) {
throw new BeanCreationException(beanName, "Injection of autowired dependencies failed", ex);
}
return pvs;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
if (!(beanFactory instanceof ConfigurableListableBeanFactory)) {
throw new IllegalArgumentException(
"AutowiredAnnotationBeanPostProcessor requires a ConfigurableListableBeanFactory");
}
this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
}
private InjectionMetadata findAutowiringMetadata(String beanName, Class<?> clazz, PropertyValues pvs) {
// Fall back to class name as cache key, for backwards compatibility with custom callers.
String cacheKey = (StringUtils.hasLength(beanName) ? beanName : clazz.getName());
// Quick check on the concurrent map first, with minimal locking.
InjectionMetadata metadata = this.injectionMetadataCache.get(cacheKey);
if (InjectionMetadata.needsRefresh(metadata, clazz)) {
synchronized (this.injectionMetadataCache) {
metadata = this.injectionMetadataCache.get(cacheKey);
if (InjectionMetadata.needsRefresh(metadata, clazz)) {
try {
metadata = buildAutowiringMetadata(clazz);
this.injectionMetadataCache.put(cacheKey, metadata);
} catch (NoClassDefFoundError err) {
throw new IllegalStateException("Failed to introspect bean class [" + clazz.getName() +
"] for autowiring metadata: could not find class that it depends on", err);
}
}
}
}
return metadata;
}
private InjectionMetadata buildAutowiringMetadata(final Class<?> clazz) {
LinkedList<InjectionMetadata.InjectedElement> elements = new LinkedList<InjectionMetadata.InjectedElement>();
Class<?> targetClass = clazz;
do {
final LinkedList<InjectionMetadata.InjectedElement> currElements =
new LinkedList<InjectionMetadata.InjectedElement>();
doWithLocalFields(targetClass, new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
RpcAutowired ann = field.getAnnotation(RpcAutowired.class);
if (ann != null) {
if (Modifier.isStatic(field.getModifiers())) {
if (log.isWarnEnabled()) {
log.warn("RpcClient Autowired annotation is not supported on static fields: " + field);
}
return;
}
currElements.add(new AutowiredFieldElement(field, ann));
}
}
});
elements.addAll(0, currElements);
targetClass = targetClass.getSuperclass();
}
while (targetClass != null && targetClass != Object.class);
return new InjectionMetadata(clazz, elements);
}
private void doWithLocalFields(Class clazz, ReflectionUtils.FieldCallback fieldCallback) {
Field fs[] = clazz.getDeclaredFields();
for (Field field : fs) {
try {
fieldCallback.doWith(field);
} catch (IllegalAccessException ex) {
throw new IllegalStateException("Not allowed to access field '" + field.getName() + "': " + ex);
}
}
}
private class AutowiredFieldElement extends InjectionMetadata.InjectedElement {
private Field field;
private RpcAutowired ann;
public AutowiredFieldElement(Field field, RpcAutowired ann) {
super(field, null);
this.field = field;
this.ann = ann;
}
@Override
protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
RpcClientProxy rpcClientProxy=new RpcClientProxy(redisRegisterCenter,nettyMainClient);
Object o = Proxy.newProxyInstance(field.getType().getClassLoader(),
new Class[]{field.getType()}, rpcClientProxy);
field.setAccessible(true);
field.set(bean, o);
}
}
}
2、注入的属性应该是一个代理类,代理类里面使用netty客户端进行链接并发送数据,并使用雪花算法创建一个唯一id,在发送完数据之后进行线程自旋并阻塞等待服务端的消息发送过来
/**
* @author :hzz
* @description:TODO
* @date :2022/1/2 22:14
*/
@Slf4j
public class RpcClientProxy implements InvocationHandler {
private RedisRegisterCenter redisRegisterCenter;
private NettyMainClient nettyMainClient;
public RpcClientProxy(RedisRegisterCenter redisRegisterCenter, NettyMainClient nettyMainClient) {
this.redisRegisterCenter = redisRegisterCenter;
this.nettyMainClient = nettyMainClient;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();//方法名,hello()
String serverName = method.getDeclaringClass().getName();//接口名:MyServiceServer,用来寻找服务
RpcServerDto rpcServerDto = redisRegisterCenter.searchRpcServer(serverName);
Channel connect = nettyMainClient.connect(rpcServerDto.getIp(), rpcServerDto.getPort());
long start = System.currentTimeMillis();
String msgId = UUID.randomUUID().toString();
connect.writeAndFlush(new String( msgId ).getBytes()).addListener(
future -> {
long time = System.currentTimeMillis() - start;
if (!future.isSuccess()) {
log.error("下发失败,msgId=[{}],耗时[{}]ms", msgId, time);
return;
}
if (time > 20000) {
log.warn("指令操作=[{}],下发异常耗时,msgId=[{}],耗时[{}]ms", 20000, msgId, time);
return;
}
log.info("下发成功,msgId=[{}],耗时[{}]ms", msgId, time);
});
// 阻塞
GuardedObject<String> go = GuardedObject.create(msgId);
String resPayloadPacket = go.getAndThrow(res ->{
return (res.indexOf(msgId)>-1);
}, 50000, TimeUnit.MILLISECONDS);
return resPayloadPacket;
}
}
3、使用方式
@Service
public class RpcClientServiceimpl {
@RpcAutowired
private MyServiceServer myServiceServer;
public String hello(){
long s = System.currentTimeMillis(); //获取开始时间
String hello = myServiceServer.hello();
long e = System.currentTimeMillis(); //获取结束时间
System.out.println(hello+"通过rpc远程调用获取到的值"+"用时:" + (e - s) + "ms");
return hello;
}
}
三、接口服务
gitee地址,写了2个多小时,后面直接偷懒累了,主要目的是学习spring。。。。所以代码很烂,所以就没有地址了
|