?服务器: Spring boot 启动服务 ( http://127.0.0.1:8080/ )
package org.kayla.rpcfx.provider;
/**
* @author Kayla(J - doIt)
* @date 2021/11/25 23:39
**/
@RestController
@SpringBootApplication
@EnableAspectJAutoProxy
@Slf4j
public class RpcfxServerApplication {
public static void main(String[] args) {
SpringApplication.run(RpcfxServerApplication.class, args);
}
@Autowired
RpcfxInvoker invoker;
@PostMapping("/")
public RpcfxResponse invoke(@RequestBody RpcfxRequest request) {
return invoker.invoke(request);
}
}
客户端:?
package org.kayla.rpcfx.core.client.netty.client;
/**
* @author Kayla(J - doIt)
* @date 2021/11/25 23:39
**/
@Slf4j
public class ClientBootStrap {
static final String HOST = "127.0.0.1";
static final int PORT = 8080;
public static void main(String[] args) throws Exception {
//创建reactor 线程组
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
//1 设置reactor 线程组
b.group(workerGroup);
//2 设置nio类型的channel
b.channel(NioSocketChannel.class);
//3 设置监听端口
b.remoteAddress(HOST, PORT);
//4 设置通道的参数
b.option(ChannelOption.SO_KEEPALIVE, true);
//5 装配通道流水线
b.handler(new ClientInitializer());
ChannelFuture f = b.connect();
f.addListener((ChannelFuture futureListener) ->
{
if (futureListener.isSuccess()) {
log.info("EchoClient客户端连接成功!");
} else {
log.info("EchoClient客户端连接失败!");
}
});
// 阻塞,直到连接完成
f.sync();
Channel channel = f.channel();
RpcfxRequest request = new RpcfxRequest();
request.setServiceClass("org.kayla.rpcfx.api.UserService");
request.setMethod("findById");
request.setParams(new Object[]{1});
String reqJson = JSON.toJSONString(request);
DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,
"/");
req.setDecoderResult(DecoderResult.SUCCESS);
req.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/json;charset=UTF-8");
req.headers().add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
// req.headers().add(HttpHeaderNames.ACCEPT, "application/json");
req.headers().add(HttpHeaderNames.HOST, "127.0.0.1:8080");
ByteBuf buffer = req.content().clear();
int p0 = buffer.writerIndex();
buffer.writeBytes(reqJson.getBytes());
int p1 = buffer.writerIndex();
int i = buffer.readableBytes();
System.out.println("buffer.readableBytes(): " + i);
System.out.println("p1 - p0: " + (p1 - p0) );
// req.headers().add(HttpHeaderNames.CONTENT_LENGTH, p1 - p0);
req.headers().add(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes());
channel.writeAndFlush(req).sync();
// 7 等待通道关闭的异步任务结束
// 服务监听通道会一直等待通道关闭的异步任务结束
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
package org.kayla.rpcfx.core.client.netty.client;
/**
* @author Kayla(J - doIt)
* @date 2021/11/25 23:39
**/
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel sh) throws Exception {
ChannelPipeline pipeline = sh.pipeline();
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new ClientInboundHandler());
pipeline.addLast(new ClientOutboundHandler());
}
}
@Slf4j
class ClientInboundHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
log.info("channelRead0");
ByteBuf content = msg.content();
int len = content.readableBytes();
byte[] arr = new byte[len];
content.getBytes(0, arr);
log.info(new String(arr, "UTF-8"));
}
}
@Slf4j
class ClientOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("write");
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
ByteBuf content = request.content();
int len = content.readableBytes();
byte[] arr = new byte[len];
content.getBytes(0, arr);
log.info(new String(arr, "UTF-8"));
}
super.write(ctx, msg, promise);
}
}
http - Netty 5 sending JSON POST request - Stack Overflow
|