一、rpc-provider
自定义业务处理器 UserServiceHandler
/**
* 自定义的业务处理器
*/
public class UserServiceHandler extends ChannelInboundHandlerAdapter {
//当客户端读取数据时,该方法会被调用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//注意: 客户端将来发送请求的时候会传递一个参数: UserService#sayHello#are you ok
//1.判断当前的请求是否符合规则
if(msg.toString().startsWith("UserService")){
//2.如果符合规则,调用实现类货到一个result
UserServiceImpl service = new UserServiceImpl();
String result = service.sayHello(msg.toString().substring(msg.toString().lastIndexOf("#")+1));
//3.把调用实现类的方法获得的结果写到客户端
ctx.writeAndFlush(result);
}
}
}
服务提供类与连接Server端 UserServiceImpl
public class UserServiceImpl implements IUserService {
//将来客户单要远程调用的方法
public String sayHello(String msg) {
System.out.println("are you ok ? "+msg);
return "服务器返回数据 : "+msg;
}
//创建一个方法启动服务器
public static void startServer(String ip , int port) throws InterruptedException {
//1.创建两个线程池对象
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
//2.创建服务端的启动引导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
//3.配置启动引导对象
serverBootstrap.group(bossGroup,workGroup)
//设置通道为NIO
.channel(NioServerSocketChannel.class)
//创建监听channel
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//获取管道对象
ChannelPipeline pipeline = nioSocketChannel.pipeline();
//给管道对象pipeLine 设置编码
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
//把我们自顶一个ChannelHander添加到通道中
pipeline.addLast(new UserServiceHandler());
}
});
//4.绑定端口
serverBootstrap.bind(8999).sync();
}
}
Server端启动类
public class ServerBoot {
public static void main(String[] args) throws InterruptedException {
//启动服务器
UserServiceImpl.startServer("127.0.0.1",8999);
}
}
二、rpc-consumer
自定义处理器 UserClientHandler
/**
* 自定义事件处理器
*/
public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {
//1.定义成员变量
private ChannelHandlerContext context; //事件处理器上下文对象 (存储handler信息,写操作)
private String result; // 记录服务器返回的数据
private String param; //记录将要返送给服务器的数据
//2.实现channelActive 客户端和服务器连接时,该方法就自动执行
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//初始化ChannelHandlerContext
this.context = ctx;
}
//3.实现channelRead 当我们读到服务器数据,该方法自动执行
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将读到的服务器的数据msg ,设置为成员变量的值
result = msg.toString();
notify();
}
//4.将客户端的数写到服务器
public synchronized Object call() throws Exception {
//context给服务器写数据
context.writeAndFlush(param);
wait();
return result;
}
//5.设置参数的方法
public void setParam(String param){
this.param = param;
}
}
服务消费者
/**
* 消费者
*/
public class RPCConsumer {
//1.创建一个线程池对象 -- 它要处理我们自定义事件
private static ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
//2.声明一个自定义事件处理器 UserClientHandler
private static UserClientHandler userClientHandler;
//3.编写方法,初始化客户端 ( 创建连接池 bootStrap 设置bootstrap 连接服务器)
public static void initClient() throws InterruptedException {
//1) 初始化UserClientHandler
userClientHandler = new UserClientHandler();
//2)创建连接池对象
EventLoopGroup group = new NioEventLoopGroup();
//3)创建客户端的引导对象
Bootstrap bootstrap = new Bootstrap();
//4)配置启动引导对象
bootstrap.group(group)
//设置通道为NIO
.channel(NioSocketChannel.class)
//设置请求协议为TCP
.option(ChannelOption.TCP_NODELAY,true)
//监听channel 并初始化
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
//获取ChannelPipeline
ChannelPipeline pipeline = socketChannel.pipeline();
//设置编码
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
//添加自定义事件处理器
pipeline.addLast(userClientHandler);
}
});
//5)连接服务端
bootstrap.connect("127.0.0.1",8999).sync();
}
//4.编写一个方法,使用JDK的动态代理创建对象
// serviceClass 接口类型,根据哪个接口生成子类代理对象; providerParam : "UserService#sayHello#"
public static Object createProxy(Class<?> serviceClass, final String providerParam){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{serviceClass}, new InvocationHandler() {
public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
//1)初始化客户端cliet
if(userClientHandler == null){
initClient();
}
//2)给UserClientHandler 设置param参数
userClientHandler.setParam(providerParam+objects[0]);
//3).使用线程池,开启一个线程处理处理call() 写操作,并返回结果
Object result = executorService.submit(userClientHandler).get();
//4)return 结果
return result;
}
});
}
}
消费者启动类?ConsumerBoot
public class ConsumerBoot {
//参数定义
private static final String PROVIDER_NAME = "UserService#sayHello#";
public static void main(String[] args) throws InterruptedException {
//1.创建代理对象
IUserService service = (IUserService) RPCConsumer.createProxy(IUserService.class, PROVIDER_NAME);
//2.循环给服务器写数据
while (true){
String result = service.sayHello("are you ok !!");
System.out.println(result);
Thread.sleep(2000);
}
}
}
三,Common interface 公用接口模块
public interface IUserService {
public String sayHello(String smg);
}
|