netty基础_02.Netty 快速入门实例 - TCP 服务
Netty 快速入门实例 - TCP 服务
实例要求:使用 IDEA 创建 Netty 项目
Netty 服务器在 6668 端口监听,客户端能发送消息给服务器"hello,服务器~"- 服务器可以回复消息给客户端"hello,客户端~"
- 目的:对
Netty 线程模型有一个初步认识,便于理解 Netty 模型理论 -
- 编写服务端
- 编写客户端
- 对
netty 程序进行分析,看看 netty 模型特点 - 说明:创建
Maven 项目,并引入 Netty 包 - 代码如下
NettyServer
package com.atguigu.netty.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客户socketchannel hashcode=" + ch.hashCode());
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println(".....服务器 is ready...");
ChannelFuture cf = bootstrap.bind(6668).sync();
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
NettyServerHandler
package com.atguigu.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
System.out.println("server ctx =" + ctx);
System.out.println("看看channel 和 pipeline的关系");
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline();
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + channel.remoteAddress());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
NettyClient
package com.atguigu.netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端 ok..");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
NettyClientHandler
package com.atguigu.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
任务队列中的 Task 有 3 种典型使用场景
- 用户程序自定义的普通任务【举例说明】
- 用户自定义定时任务
- 非当前
Reactor 线程调用 Channel 的各种方法 例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中后被异步消费
前两种的代码举例:
package com.atguigu.netty.simple;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("发生异常" + ex.getMessage());
}
}
});
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵3", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("发生异常" + ex.getMessage());
}
}
});
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("发生异常" + ex.getMessage());
}
}
}, 5, TimeUnit.SECONDS);
System.out.println("go on ...");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
方案再说明
Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 Selector ,用于监听绑定在其上的 socket 网络通道。NioEventLoop 内部采用串行化设计,从消息的 读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
-
NioEventLoopGroup 下包含多个 NioEventLoop -
每个 NioEventLoop 中包含有一个 Selector ,一个 taskQueue -
每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel -
每个 NioChannel 只会绑定在唯一的 NioEventLoop 上 -
每个 NioChannel 都绑定有一个自己的 ChannelPipeline
异步模型
基本介绍
- 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会首先简单的返回一个 ChannelFuture 。- 调用者并不能立刻获得结果,而是通过
Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。 Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future ,它的核心思想是:假设一个方法 fun ,计算过程可能非常耗时,等待 fun 返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future ,后续可以通过 Future 去监控方法 fun 的处理过程(即:Future-Listener 机制)
Future 说明
- 表示异步的执行结果,可以通过它提供的方法来检测执行是否完成,比如检索计算等等。
ChannelFuture 是一个接口:public interface ChannelFuture extends Future<Void> 我们可以添加监听器,当监听的事件发生时,就会通知到监听器。
工作原理示意图
下面第一张图就是管道,中间会经过多个handler
说明:
- 在使用
Netty 进行编程时,拦截操作和转换出入站数据只需要您提供 callback 或利用 future 即可。这使得链式操作简单、高效,并有利于编写可重用的、通用的代码。 Netty 框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来。
Future-Listener 机制
这里看不懂的可以看笔者的并发系列-JUC部分
- 当
Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。 - 常见有如下操作
- 通过
isDone 方法来判断当前操作是否完成; - 通过
isSuccess 方法来判断已完成的当前操作是否成功; - 通过
getCause 方法来获取已完成的当前操作失败的原因; - 通过
isCancelled 方法来判断已完成的当前操作是否被取消; - 通过
addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则通知指定的监听器
举例说明 演示:绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑
ChannelFuture cf = bootstrap.bind(6668).sync();
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete (ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口6668成功");
} else {
System.out.println("监听端口6668失败");
}
}
});
|