1. 导入Netty的Maven依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.65.Final</version>
</dependency>
2. 建立Netty聊天服务端
package com.wsp.train.netty.group;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
 * Entry point of the chat server.
 *
 * <p>Binds a Netty NIO server to {@link #PORT}, installs an idle-state detector and the
 * chat handler on every accepted connection, and then blocks until the server channel
 * is closed.
 */
public class NettyChatServer {
    /** TCP port the chat server listens on. */
    private static final int PORT = 8888;

    /**
     * Starts the server and blocks the calling thread until the listening channel closes.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Only one port is bound, so a single acceptor thread is all the boss group can use.
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Reader idle 60s, writer idle 120s, all idle 180s.
                            ch.pipeline().addLast(new IdleStateHandler(60, 120, 180, TimeUnit.SECONDS));
                            ch.pipeline().addLast(new NettyChatServerHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
            System.out.println("服务器启动完成");
            // Block here until the listening socket is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // Bind failures and similar start-up errors are reported, then we shut down.
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
3. 实现服务端处理类
package com.wsp.train.netty.group;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Server-side handler of the group chat.
 *
 * <p>Every connected channel is tracked in a shared {@link ChannelGroup}. Join and leave
 * notices are broadcast to the group, and each inbound message is relayed to every
 * member (with a different prefix for the sender itself). One handler instance is
 * created per channel by the server's {@code ChannelInitializer}.
 */
public class NettyChatServerHandler extends ChannelInboundHandlerAdapter {
    /** All channels currently taking part in the chat; shared by every handler instance. */
    private static final ChannelGroup chatChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Timestamp formatter; owned by this handler instance, which serves a single channel. */
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /** Logs that the remote peer has come online. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketAddress socketAddress = ctx.channel().remoteAddress();
        System.out.println(sdf.format(new Date())+" "+socketAddress+" 上线了!");
    }

    /** Logs that the remote peer has gone offline. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(sdf.format(new Date())+ctx.channel().remoteAddress()+" 下线了!");
    }

    /**
     * Logs idle-state events raised by the {@code IdleStateHandler}; every other user
     * event is forwarded to the next handler in the pipeline.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = "";
            switch (event.state()) {
                case READER_IDLE:
                    eventType = "读空闲";
                    break;
                case WRITER_IDLE:
                    eventType = "写空闲";
                    break;
                case ALL_IDLE:
                    eventType = "读写空闲";
                    break;
                default:
                    break;
            }
            System.out.println(sdf.format(new Date())+ctx.channel().remoteAddress()+" 超时事件为----"+eventType);
        } else {
            // Not ours to handle: keep the event travelling down the pipeline.
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * Announces the newcomer to the existing members and then adds it to the group.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel chatChannel = ctx.channel();
        String msg = sdf.format(new Date())+"【客户端】--" + chatChannel.remoteAddress() + " 已加入聊天组\n";
        // Broadcast before adding, so the newcomer does not receive its own join notice.
        // Encode explicitly as UTF-8: the client decodes with UTF-8.
        chatChannelGroup.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
        chatChannelGroup.add(chatChannel);
    }

    /**
     * Announces to the remaining members that this channel has left the chat.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        String msg = sdf.format(new Date())+"【客户端】" + channel.remoteAddress() + " 离开聊天了\n";
        // Must be wrapped in a ByteBuf: a raw byte[] is not a message type the socket
        // channel can write, so the notice would never reach the other clients.
        chatChannelGroup.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
        System.out.println("当前channelGroup大小为 "+chatChannelGroup.size());
    }

    /**
     * Relays an inbound message to every member of the group.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message; expected to be a {@link ByteBuf} holding UTF-8 text
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        ByteBuf byteBuf = (ByteBuf) msg;
        final String msgStr;
        try {
            msgStr = byteBuf.toString(CharsetUtil.UTF_8);
        } finally {
            // This handler consumes the message, so it is responsible for releasing it.
            byteBuf.release();
        }

        // Build both payloads once instead of once per recipient.
        String timestamp = sdf.format(new Date());
        byte[] forOthers = (timestamp + "【客户】" + channel.remoteAddress() + " 发送消息: " + msgStr).getBytes(StandardCharsets.UTF_8);
        byte[] forSelf = (timestamp + "【自己】发送了消息 " + msgStr).getBytes(StandardCharsets.UTF_8);

        chatChannelGroup.forEach(ch -> {
            // Each write needs its own buffer because the buffer is released once written.
            byte[] response = (ch != channel) ? forOthers : forSelf;
            ch.writeAndFlush(Unpooled.copiedBuffer(response));
        });
    }

    /**
     * Reports the failure and closes the affected channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Do not drop the cause silently; it is the only trace of why the channel closed.
        cause.printStackTrace();
        ctx.close();
    }
}
4. 建立Netty聊天客户端
package com.wsp.train.netty.group;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
/**
 * Console chat client.
 *
 * <p>Connects to the chat server, then sends every line typed on standard input as a
 * UTF-8 encoded message. Replies are printed by {@code NettyChatClientHandler}.
 */
public class NettyChatClient {
    /** Host of the chat server. */
    private static final String HOST = "127.0.0.1";
    /** TCP port of the chat server. */
    private static final int PORT = 8888;

    /**
     * Connects to the server and relays standard input until end of input is reached
     * or the connection is lost.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new NettyChatClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();
            Channel channel = channelFuture.channel();
            SocketAddress socketAddress = channel.localAddress();
            System.out.println("连接成功 "+socketAddress+"============");

            Scanner scanner = new Scanner(System.in);
            ChannelFuture lastWrite = null;
            // Stop as soon as the connection is gone; writing to a dead channel is pointless.
            while (channel.isActive() && scanner.hasNextLine()) {
                String content = scanner.nextLine();
                ByteBuf byteBuf = Unpooled.copiedBuffer(content.getBytes(StandardCharsets.UTF_8));
                lastWrite = channel.writeAndFlush(byteBuf);
            }

            // Let the final message leave the socket before tearing the connection down.
            if (lastWrite != null) {
                lastWrite.await();
            }
            // Nothing else closes the channel, so waiting on closeFuture() alone would
            // block forever once standard input is exhausted.
            channel.close();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
5. 实现客户端处理类
package com.wsp.train.netty.group;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.EventExecutorGroup;
/**
 * Client-side handler that prints every message received from the chat server.
 */
public class NettyChatClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * Decodes the inbound buffer as UTF-8 text and prints it to standard output.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            System.out.println("服务端回复的消息为: "+byteBuf.toString(CharsetUtil.UTF_8));
        } finally {
            // This handler consumes the message, so it must release the buffer itself;
            // ChannelInboundHandlerAdapter does not do it automatically.
            byteBuf.release();
        }
    }
}
|