一 需求
1 监听所有客户端的上线和下线。
2 将某一个客户端的上线和离线情况,转告给其他客户端“客户端XX上/下线”
3 客户端先将消息发送给服务端,服务端再将此消息转发给所有客户端(包括发送者自己),如果其他客户端接收到了此消息,则显示“【某ip】发送的消息:XXX”;如果是自己接收到了此消息,则消息“【我】发送的消息:XXX”
二 服务端
1 主程序类
package netty.socket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyNettyServerTest {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// ServerBootstrap:服务端启动时的初始化操作
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 将 bossGroup 和 workerGroup 注册到服务端的 Channel 上,并注册一个服务端的初始化器 MyNettyServerInitializer
// 该初始化器中的 initChannel()方法,会在连接被注册后立刻执行;最后将端口号绑定到 8888
ChannelFuture channelFuture = serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MyNettyServerInitializer())
.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2 自定义初始化器
package netty.chat;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
// DelimiterBasedFrameDecoder(maxFrameLength, delimiters):分隔符处理器;将接收到的客户端消息,通过回车符(Delimiters.lineDelimiter())进行分割。
pipeline.addLast("DelimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(2048, Delimiters.lineDelimiter()));
pipeline.addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("StringEncoder", new StringEncoder(CharsetUtil.UTF_8));
// 自定义处理器
pipeline.addLast("MyNettyServerHandler", new MyNettyServerHandler());
}
}
3 自定义处理器
package netty.chat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
public class MyNettyServerHandler extends SimpleChannelInboundHandler<String> {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
// 每当从服务端读取到客户端写入的信息时,就将该信息转发给所有的客户端 Channel(实现聊天室的效果)。
@Override
protected void channelRead0(ChannelHandlerContext ctx, String receiveMsg) throws Exception {
Channel channel = ctx.channel();
// 遍历channelGroup,从而区分“我”和“别人”发出的消息,如果消息是自己发出的就显示“我”
channelGroup.forEach(chnl -> { // JDK8 提供的lambda表达式
if (channel == chnl)
chnl.writeAndFlush("【我】发送的消息:" + receiveMsg + "\n");
else
chnl.writeAndFlush("【" + channel.remoteAddress() + "】发送的消息:" + receiveMsg + "\n");
});
}
// 连接建立。每当从服务端收到新的客户端连接时,就将新客户端的 Channel 加入 ChannelGroup 列表中,并告知列表中的其他客户端Channel
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("客户端-" + channel.remoteAddress() + "加入\n");
channelGroup.add(channel);
}
// 监听客户端上线
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "上线");
}
// 监听客户端下线
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "下线");
}
// 连接断开。每当从服务端感知有客户端断开时,就将该客户端的 Channel 从 ChannelGroup 列表中移除,并告知列表中的其他客户端 Channel
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 会自动将 channelGroup 中断开的连接移除掉
channelGroup.writeAndFlush("客户端-" + channel.remoteAddress() + "离开\n");
}
}
三 客户端
1 主程序类
package netty.chat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class MyNettyClientTest {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyNettyClientInitializer());
Channel channel = bootstrap.connect("127.0.0.1", 8888).sync().channel();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
for (; ; ) { // 客户端不断的通过控制台向服务端发送消息
channel.writeAndFlush(bufferedReader.readLine() + "\r\n");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
2 自定义初始化器
package netty.chat;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class MyNettyClientInitializer extends ChannelInitializer<SocketChannel> {
// 连接被注册后,立刻执行此方法
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
// 与服务端的 Initializer 作用相同:通过 DelimiterBasedFrameDecoder 将接收到的服务端消息,通过回车符(Delimiters.lineDelimiter())进行分割。
pipeline.addLast("DelimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(2048, Delimiters.lineDelimiter()));
pipeline.addLast("StringDecoder",new StringDecoder(CharsetUtil.UTF_8)) ;
pipeline.addLast("StringEncoder",new StringEncoder(CharsetUtil.UTF_8)) ;
// 自定义处理器
pipeline.addLast("MyNettyClientHandler", new MyNettyClientHandler());
}
}
3 自定义处理器
package netty.chat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyNettyClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String receiveMsg) {
System.out.println(receiveMsg);
}
}
四 测试
1 启动服务端,整个测试过程打印如下
/127.0.0.1:59967上线
/127.0.0.1:60043上线
2 启动第一个客户端,整个测试过程打印如下
客户端-/127.0.0.1:60043加入
你好,我是客户端1
【我】发送的消息:你好,我是客户端1
3 启动第二个客户端,整个测试过程打印如下
【/127.0.0.1:59967】发送的消息:你好,我是客户端1
|