netty server:
public class NettyServer {
public static void main(String[] args) {
EventLoopGroup bossGroup=new NioEventLoopGroup(1);//处理连接请求
EventLoopGroup workerGroup=new NioEventLoopGroup();//默认线程数量为cpu核数的两倍,处理业务
try {
ServerBootstrap bootstrap=new ServerBootstrap();//创建服务器端的启动对象
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start");
//启动服务器绑定端口,bind是异步操作,sync是等待
ChannelFuture cf=bootstrap.bind(9000).sync();
cf.channel().closeFuture().sync();
System.out.println("******************server close");
}catch (Exception e){
e.printStackTrace();
}
}
}
NettyServerHandler:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取客户端的数据
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server 读取数据线程"+Thread.currentThread().getName());
ByteBuf byteBuf= (ByteBuf) msg;
System.out.println("收到客户端发送的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕处理方法
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf byteBuf= Unpooled.copiedBuffer("Hello Client".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(byteBuf);
}
/**
* 处理异常一般是关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
NettyClient
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group=new NioEventLoopGroup();
try {
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new NettyClientHandler());//
}
});
System.out.println("netty client start");
//启动客户端连接服务器
ChannelFuture cf =bootstrap.connect("127.0.0.1",9000).sync();
//关闭通道进行监听
cf.channel().closeFuture().sync();
System.out.println("********************client close");
} finally {
group.shutdownGracefully();
}
}
}
NettyClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接到服务端是触发
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf byteBuf= Unpooled.copiedBuffer("Hello Server".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(byteBuf);
}
/**
* 读取服务端发送的数据
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf= (ByteBuf) msg;
System.out.println("收到服务端发送的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("服务端的地址:"+ctx.channel().remoteAddress());
}
/**
* 处理异常一般是关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
|