一、Consumer
1.启动程序
/**
 * Client-side bootstrap for the RPC consumer.
 *
 * <p>Owns a {@link NioEventLoopGroup} and a Netty {@link Bootstrap}, and connects to the
 * provider using {@link RpcChannelHandlerInitializerClient} as the channel initializer.
 */
public class ConsumerBootStrap {
    EventLoopGroup consumer;
    Bootstrap bootstrap;

    public ConsumerBootStrap() {
        consumer = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
    }

    /**
     * Connects to the provider and blocks until the connect attempt has completed.
     *
     * @param hostName host of the provider
     * @param port     port of the provider
     */
    public void boot(String hostName, int port) {
        boot0(hostName, port);
    }

    /**
     * Configures the bootstrap, starts the connect and waits for its outcome.
     *
     * <p>If the connect attempt did not succeed (failure or interruption while waiting), the
     * event loop group is shut down so its threads do not outlive a bootstrap that never got
     * a usable channel. A connect failure is still rethrown by {@code sync()}.
     */
    private void boot0(String hostName, int port) {
        ChannelFuture channelFuture = bootstrap.group(consumer)
                .handler(new RpcChannelHandlerInitializerClient())
                .channel(NioSocketChannel.class)
                .connect(hostName, port);
        try {
            channelFuture.sync();
        } catch (InterruptedException interruptedException) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            interruptedException.printStackTrace();
        } finally {
            if (!channelFuture.isSuccess()) {
                consumer.shutdownGracefully();
            }
        }
    }
}
2.协议解码
/**
 * Decodes a response frame on the consumer side.
 *
 * <p>Wire layout, as read below: three length-prefixed UTF-8 strings in the order
 * response header, description, result. Each length is a 4-byte int.
 */
public class RpcConsumerDecoder extends ReplayingDecoder<RpcMessage.RpcMessagePojo> {
    /**
     * Reads one response from {@code in} and appends the resulting
     * {@code RpcMessagePojo} (message type {@code RESPONSE}) to {@code out}.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Charset charset = CharsetUtil.UTF_8;
        RpcMessage.RpcMessagePojo.Builder builder = RpcMessage.RpcMessagePojo.newBuilder();
        RpcMessage.Response.Builder responseBuilder = RpcMessage.Response.newBuilder();
        builder.setMessageType(RpcMessage.RpcMessagePojo.MessageType.RESPONSE);

        responseBuilder.setResponseHeader(readString(in, charset));

        String description = readString(in, charset);
        System.out.println(description + "----");
        responseBuilder.setDescription(description);

        responseBuilder.setResult(readString(in, charset));

        out.add(builder.setResponse(responseBuilder.build()).build());
    }

    /**
     * Reads a 4-byte length followed by that many bytes and decodes them as text.
     *
     * <p>{@code readBytes(int)} hands back a newly allocated buffer that the caller owns,
     * so it is released here once the text has been extracted.
     */
    private static String readString(ByteBuf in, Charset charset) {
        int size = in.readInt();
        ByteBuf field = in.readBytes(size);
        try {
            return field.toString(charset);
        } finally {
            field.release();
        }
    }
}
3、业务handler
/**
 * Consumer-side business handler that doubles as a {@link Callable}: {@link #call()} writes
 * the pending request to the channel and blocks until {@code channelRead0} delivers a response.
 *
 * <p>The instance holds a single request/response slot, so only one call is served at a time;
 * {@code call()} and {@code channelRead0} coordinate through this object's monitor.
 */
public class RpcConsumerHandler extends SimpleChannelInboundHandler<RpcMessage.RpcMessagePojo> implements Callable<RpcMessage.RpcMessagePojo> {
    // Written in channelActive and read in call(), which may run on different threads.
    volatile ChannelHandlerContext ctx;
    RpcMessage.RpcMessagePojo request;
    RpcMessage.RpcMessagePojo response;

    /** Stores the decoded response and wakes up the thread blocked in {@link #call()}. */
    @Override
    protected synchronized void channelRead0(ChannelHandlerContext ctx, RpcMessage.RpcMessagePojo msg) throws Exception {
        response = msg;
        notifyAll();
    }

    /** Remembers the context so {@link #call()} can write to the channel later. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("active");
        this.ctx = ctx;
    }

    /**
     * Sends the request set via {@link #setRequest} and waits for the matching response.
     *
     * @return the response delivered by {@code channelRead0}
     * @throws IllegalStateException if the channel has not become active yet
     * @throws InterruptedException  if the waiting thread is interrupted
     */
    @Override
    public synchronized RpcMessage.RpcMessagePojo call() throws Exception {
        ChannelHandlerContext context = ctx;
        if (context == null) {
            throw new IllegalStateException("channel is not active yet; cannot send request");
        }
        // Drop any response left over from a previous call so it cannot be returned again.
        response = null;
        context.writeAndFlush(request);
        // Object.wait() may return spuriously, so re-check the condition in a loop.
        while (response == null) {
            wait();
        }
        return response;
    }

    public void setRequest(RpcMessage.RpcMessagePojo request) {
        this.request = request;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("register");
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Writability");
    }
}
4、高层封装类
/**
 * High-level entry point for the consumer: connects on construction and exposes
 * {@code doFun} to issue a call through a dynamic proxy of {@code UserService}.
 */
public class Consumer {

    /** Boots a {@code ConsumerBootStrap} against the given host and port. */
    public Consumer(String hostName, int port) {
        new ConsumerBootStrap().boot(hostName, port);
    }

    /** Placeholder overload; intentionally has no body. */
    void doFun(Object[] parameters, Class<?> clazz, Method method, Class<?> parameterTypes, Class<?>... interfaces) {
    }

    /**
     * Creates a {@code UserService} proxy backed by an {@code InvocationHandle} for
     * {@code target} and invokes {@code login} on it with {@code pojo}.
     *
     * @return whatever the proxied {@code login} call returns
     */
    public RpcMessage.RpcMessagePojo doFun(RpcMessage.RpcMessagePojo pojo, Object target) {
        InvocationHandle invocationHandle = new InvocationHandle(target);
        ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
        Object proxy = ProxyFactory.getProxy(systemLoader, invocationHandle, UserService.class);
        UserService service = (UserService) proxy;
        return service.login(pojo);
    }
}
5、代理工厂
/** Thin factory around {@link Proxy#newProxyInstance}. */
public class ProxyFactory<T> {

    /**
     * Builds a JDK dynamic proxy.
     *
     * @param classLoader      loader that defines the proxy class
     * @param invocationHandle handler that receives every method call on the proxy
     * @param interfaces       interfaces the proxy implements
     * @return the new proxy instance
     */
    public static Object getProxy(ClassLoader classLoader, InvocationHandle invocationHandle, Class<?>... interfaces) {
        Object proxyInstance = Proxy.newProxyInstance(classLoader, interfaces, invocationHandle);
        return proxyInstance;
    }
}
6、真实代理对象生成
/**
 * Invocation handler behind the consumer-side proxy: forwards the first call argument as an
 * RPC request through the shared {@code RpcConsumerHandler} and returns the response.
 */
public class InvocationHandle implements InvocationHandler {
    Object target;
    ExecutorService executors = Executors.newFixedThreadPool(NettyRuntime.availableProcessors());
    // Captured once at construction; null if no channel has been initialized before that point.
    RpcConsumerHandler client = RpcChannelHandlerInitializerClient.handler;

    public InvocationHandle(Object target) {
        this.target = target;
    }

    /**
     * Sends {@code args[0]} as the request and blocks for the response.
     *
     * @return the {@code RpcMessagePojo} response
     * @throws IllegalStateException    if no client handler was available when this handler was created
     * @throws IllegalArgumentException if the call does not carry an {@code RpcMessagePojo} as first argument
     * @throws Throwable                the original failure raised while performing the call
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println("do proxy");
        if (client == null) {
            throw new IllegalStateException("no RpcConsumerHandler available; connect before creating the proxy");
        }
        // args is null for zero-argument methods (e.g. toString() invoked on the proxy).
        if (args == null || args.length == 0 || !(args[0] instanceof RpcMessage.RpcMessagePojo)) {
            throw new IllegalArgumentException(
                    "method " + method.getName() + " must be called with an RpcMessagePojo as first argument");
        }
        client.setRequest((RpcMessage.RpcMessagePojo) args[0]);
        try {
            RpcMessage.RpcMessagePojo response = executors.submit(client).get();
            return response;
        } catch (java.util.concurrent.ExecutionException executionException) {
            // Surface the real failure instead of letting the proxy wrap the ExecutionException
            // in an UndeclaredThrowableException.
            Throwable cause = executionException.getCause();
            throw cause != null ? cause : executionException;
        }
    }
}
7、handlerChain初始化
/**
 * Channel initializer for the consumer: installs encoder, response decoder and the business
 * handler, and publishes that business handler through the static {@link #handler} field.
 */
public class RpcChannelHandlerInitializerClient extends ChannelInitializer<SocketChannel> {
    /** Business handler of the most recently initialized channel. */
    public static RpcConsumerHandler handler;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        handler = new RpcConsumerHandler();
        // Order matters: encoder, then decoder, then the business handler at the tail.
        ch.pipeline()
                .addLast(new RpcEncoder())
                .addLast(new RpcConsumerDecoder())
                .addLast(handler);
    }
}
8、测试类
public class NettyConsumerRpcTest {
public static void main(String[] args) throws InstantiationException, IllegalAccessException {
RpcMessage.RpcMessagePojo request= RpcMessage
.RpcMessagePojo.newBuilder()
.setRequest(RpcMessage.RpcRequest.newBuilder().setRequestHeader("0x8848")
.setReqClassName("")
.setMethodName("login")
.addParameterTypes(String.class.getName())
.addParameterTypes(Integer.class.getName())
.addParameters("hey lxc")
.addParameters("123456.aas")
.build())
.setMessageType(RpcMessage.RpcMessagePojo.MessageType.REQUEST)
.build();
Consumer consumer=new Consumer("127.0.0.1",9920);
RpcMessage.RpcMessagePojo response = consumer.doFun(request, new UserServiceImpl());
System.out.println(response.getResponse().getResult());
System.out.println(response.getResponse().getDescription());
System.out.println(response);
}
}
二、provider
1、启动程序
/**
 * Provider-side bootstrap: binds a Netty server with {@link RpcChannelHandlerInitializer}
 * as the child handler and blocks until the server channel closes.
 */
public class ServerBootStrap {
    ServerBootstrap serverBootStrap;
    EventLoopGroup acceptor;
    EventLoopGroup worker;

    public ServerBootStrap() {
        serverBootStrap = new ServerBootstrap();
        acceptor = new NioEventLoopGroup();
        worker = new NioEventLoopGroup();
    }

    /**
     * Binds to {@code hostName:port} using NIO and blocks until the server channel is closed.
     *
     * @param hostName local address to bind
     * @param port     local port to bind
     */
    public void boot(String hostName, int port) {
        boot0(hostName, port, NioServerSocketChannel.class);
    }

    /**
     * Binds the server and waits for it to close; both event loop groups are shut down on
     * the way out, whatever the outcome.
     */
    private void boot0(String hostName, int port, Class<? extends ServerChannel> channel) {
        try {
            // Wait for the bind itself first: sync() rethrows a bind failure (e.g. port in use)
            // instead of letting it pass unnoticed through the close future.
            ChannelFuture channelFuture = serverBootStrap.group(acceptor, worker)
                    .channel(channel)
                    .childHandler(new RpcChannelHandlerInitializer())
                    .bind(hostName, port)
                    .sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException interruptedException) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            interruptedException.printStackTrace();
        } finally {
            acceptor.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
3、协议解码
/**
 * Decodes a request frame on the provider side.
 *
 * <p>Wire layout, as read below: five length-prefixed UTF-8 strings in the order request
 * header, class name, method name, parameter types, parameters. The last two carry a list
 * rendered as {@code [a, b, c]}.
 */
public class RpcProviderDecoder extends ReplayingDecoder<RpcMessage.RpcMessagePojo> {
    /**
     * Reads one request from {@code in} and appends the resulting {@code RpcMessagePojo}
     * to {@code out}.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        RpcMessage.RpcMessagePojo.Builder builder = RpcMessage.RpcMessagePojo.newBuilder();
        RpcMessage.RpcRequest.Builder requestBuilder = RpcMessage.RpcRequest.newBuilder();
        Charset charset = CharsetUtil.UTF_8;

        requestBuilder.setRequestHeader(readString(in, charset));
        requestBuilder.setReqClassName(readString(in, charset));
        requestBuilder.setMethodName(readString(in, charset));
        requestBuilder.addAllParameterTypes(parseList(readString(in, charset)));
        requestBuilder.addAllParameters(parseList(readString(in, charset)));

        out.add(builder.setRequest(requestBuilder.build()).build());
    }

    /**
     * Reads a 4-byte length followed by that many bytes and decodes them as text.
     *
     * <p>{@code readBytes(int)} hands back a newly allocated buffer that the caller owns,
     * so it is released here once the text has been extracted.
     */
    private static String readString(ByteBuf in, Charset charset) {
        int size = in.readInt();
        ByteBuf field = in.readBytes(size);
        try {
            return field.toString(charset);
        } finally {
            field.release();
        }
    }

    /**
     * Parses the {@code [a, b, c]} rendering of a string list back into its elements.
     *
     * <p>Only the outermost brackets are removed, so elements that themselves contain
     * brackets (such as array type names) survive. The single space that follows each
     * separator comma is consumed with it, and an empty rendering yields an empty list.
     * Elements that contain a comma cannot be represented by this format.
     */
    private static List<String> parseList(String text) {
        String body = text.trim();
        if (body.startsWith("[")) {
            body = body.substring(1);
        }
        if (body.endsWith("]")) {
            body = body.substring(0, body.length() - 1);
        }
        if (body.isEmpty()) {
            return Arrays.<String>asList();
        }
        return Arrays.asList(body.split(", ?", -1));
    }
}
4、handlerChain初始化
/**
 * Channel initializer for the provider: every accepted channel gets its own encoder,
 * request decoder and provider handler, in that order.
 */
public class RpcChannelHandlerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new RpcEncoder())
                .addLast(new RpcProviderDecoder())
                .addLast(new RpcProviderHandler());
    }
}
6、测试类
public class NettyProviderRpcTest {
public static void main(String[] args) throws ClassNotFoundException {
ServerBootStrap serverBootStrap=new ServerBootStrap();
serverBootStrap.boot("127.0.0.1",9920);
}
}
7、动态代理
这里因为还不会,所以省略了;其实可以使用 cglib,基于字节码、根据类名通过类加载器动态生成代理类
三、通用类
1、协议编码handler
package handler;
import com.google.protobuf.ProtocolStringList;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
import protobuf.RpcMessage;
import java.nio.charset.Charset;
/**
 * Encodes an {@code RpcMessagePojo} as a sequence of length-prefixed UTF-8 strings.
 *
 * <p>Requests are written as request header, class name, method name, parameter types and
 * parameters (the two lists in their {@code toString()} rendering). Every other message is
 * written as a response: response header, description, result.
 */
public class RpcEncoder extends MessageToByteEncoder<RpcMessage.RpcMessagePojo> {
    @Override
    protected void encode(ChannelHandlerContext ctx, RpcMessage.RpcMessagePojo msg, ByteBuf out) throws Exception {
        Charset charset = CharsetUtil.UTF_8;
        if (msg.getMessageType().equals(RpcMessage.RpcMessagePojo.MessageType.REQUEST)) {
            RpcMessage.RpcRequest request = msg.getRequest();
            writeString(out, request.getRequestHeader(), charset);
            writeString(out, request.getReqClassName(), charset);
            // Encoded with the same explicit charset as every other field, not the platform default.
            writeString(out, request.getMethodName(), charset);
            ProtocolStringList requestParameterTypesList = request.getParameterTypesList();
            writeString(out, requestParameterTypesList.toString(), charset);
            writeString(out, request.getParametersList().toString(), charset);
        } else {
            RpcMessage.Response response = msg.getResponse();
            writeString(out, response.getResponseHeader(), charset);
            writeString(out, response.getDescription(), charset);
            writeString(out, response.getResult(), charset);
        }
    }

    /** Writes {@code value} as a 4-byte byte-length followed by its encoded bytes. */
    private static void writeString(ByteBuf out, String value, Charset charset) {
        byte[] encoded = value.getBytes(charset);
        out.writeInt(encoded.length);
        out.writeBytes(encoded);
    }
}
2、服务接口
/**
 * Service contract shared by consumer and provider.
 */
public interface UserService {
/**
 * Performs a login call.
 *
 * @param msg the request message
 * @return the response message
 */
RpcMessage.RpcMessagePojo login(RpcMessage.RpcMessagePojo msg);
}
3、服务实现类
public class UserServiceImpl implements UserService {
@Override
public RpcMessage.RpcMessagePojo login(RpcMessage.RpcMessagePojo msg) {
return
RpcMessage.RpcMessagePojo.newBuilder()
.setResponse(RpcMessage.Response.newBuilder()
.setResult("成功")
.setDescription("账号密码验证通过")
.setResponseHeader("0xff1")
.build())
.setMessageType(RpcMessage.RpcMessagePojo.MessageType.RESPONSE)
.build();
}
}
4、协议结构
// proto3 schema; generated Java code goes into package `protobuf` under the outer class `RpcMessage`.
syntax = "proto3";
option java_package = 'protobuf';
option java_outer_classname = "RpcMessage";
// The compiler version and the dependency version must be kept consistent.
// Body of an RPC request.
message RpcRequest {
// Request header
string request_header = 1;
string req_class_name = 2;
string method_name = 3;
// Parameter types
repeated string parameter_types = 4;
// Parameter list
repeated string parameters = 5;
// Size fields for the entries above.
// NOTE(review): RpcEncoder writes lengths directly onto the wire and does not read these; presumably unused — confirm.
int32 header_size=6;
int32 class_name_size=7;
int32 method_name_size=8;
int32 parameter_types_size=9;
int32 parameters_size=10;
}
// Body of an RPC response.
message Response
{
string response_header = 1;
string description = 2;
string result = 3;
// Size fields for the entries above.
// NOTE(review): RpcEncoder writes lengths directly onto the wire and does not read these; presumably unused — confirm.
int32 response_header_size=4;
int32 description_size=5;
int32 result_size=6;
}
// Envelope exchanged between consumer and provider: a type tag plus exactly one body.
message RpcMessagePojo
{
// Discriminates the body; REQUEST is the proto3 default value (0).
enum MessageType{
REQUEST=0;
RESPONSE=1;
}
MessageType message_type = 1;
// At most one of the two bodies is set at a time.
oneof message_body{
Response response = 2;
RpcRequest request = 3;
}
}
四、测试结果
-
启动这两个 -
-
Provider控制台
这里输出的是请求体
- Consumer控制台
-
- 这里输出的是响应体
- 直接输出对象时,中文显示为八进制转义的字符串(形如 \346\210\220):这是因为 protobuf 生成类的 toString() 使用 TextFormat 文本格式,默认会把非 ASCII 字符转义输出
- 使用 get 方法拿取则可以获得正确结果,说明解码器本身没有问题(一度怀疑解码器错了)
|