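Preface
These notes are based on the 黑马 (Heima) Netty course handouts plus some of my own understanding. Of the videos I have watched, this course is one of the better ones, with essentially no filler. Video link: 黑马Netty. The chat room here uses the materials provided by 黑马 and was typed out while following the course; I also filled in the leave-group and list-members methods that the video does not implement.
First, the code here does not involve a database; registration and similar features will be completed later. Second, some of the methods are not fully written and do not cover every case, but that is enough for learning purposes: the goal is to understand how messages flow through the chat room.
Chat Room: Group Chat
1. Approach
- Accounts and passwords are stored on the server side. After a client connects to the server and logs in successfully, its channel is associated with the username and stored in a map
- Each message handler extends SimpleChannelInboundHandler, so every message type gets its own handling
- One instance of each handler is shared by all channels, so every handler must carry the @Sharable annotation
- Maps are used to look up channels both for individual users and for group chats
- To send a group message, the server uses the map to find the channels of all members and sends the message to everyone except the sender
- To guard against abnormal connections (for example a stalled network), each client needs to send heartbeat packets to the server periodically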
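The map-based lookup and the "everyone except the sender" rule can be sketched without Netty. This is only a minimal illustration under my own assumptions: the class name `BroadcastSketch` and its methods are hypothetical, and a plain String connection id stands in for a Netty Channel.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the session maps: a String id replaces a Netty Channel.
final class BroadcastSketch {
    // username -> connection id, filled in when a user logs in
    private final Map<String, String> userToConn = new ConcurrentHashMap<>();
    // group name -> member usernames
    private final Map<String, Set<String>> groups = new ConcurrentHashMap<>();

    void bind(String username, String conn) {
        userToConn.put(username, conn);
    }

    void createGroup(String group, Set<String> members) {
        groups.putIfAbsent(group, members);
    }

    // Connections that should receive a group message:
    // every member who is online, except the sender.
    List<String> recipients(String group, String sender) {
        List<String> result = new ArrayList<>();
        for (String member : groups.getOrDefault(group, Collections.emptySet())) {
            String conn = userToConn.get(member);
            if (conn != null && !member.equals(sender)) {
                result.add(conn);
            }
        }
        return result;
    }
}
```

Members who are not logged in have no entry in the user map, so they are skipped in the same way the real code filters out null channels.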
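2. Code
1. Handlers, used to process messages
1. One-to-one (friend) chat message handler
- Get the friend's name
- Look up the friend's channel by name
- Send the message
- If the friend is not online, reply to the sender that the friend is offline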
@ChannelHandler.Sharable
public class ChatRequestHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
String to = msg.getTo();
Channel channel = SessionFactory.getSession().getChannel(to);
if(channel != null){
channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
}
else{
ctx.writeAndFlush(new ChatResponseMessage(false, "The other user is not online"));
}
}
}
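2. Group chat message handler
- Get the group name
- Get the content to send
- Get the channels of all members of the group
- Send the message to every channel except the sender's own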
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
String content = msg.getContent();
List<Channel> membersChannel = GroupSessionFactory.getGroupSession()
.getMembersChannel(msg.getGroupName());
if(membersChannel == null){
ctx.writeAndFlush(new GroupChatResponseMessage(false, "Group does not exist"));
}else{
for (Channel channel : membersChannel) {
if(!ctx.channel().equals(channel)){
channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
}
}
}
}
}
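3. Create-group message handler
- Get the group name
- Get the users to be pulled into the group
- Create a Group; the createGroup method stores it in the map
- Notify the users who were pulled in that they have joined the group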
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
Set<String> members = msg.getMembers();
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.createGroup(groupName, members);
// createGroup uses putIfAbsent, which returns null when no group with this name existed,
// so null here means the group was created successfully
if(group == null){
List<Channel> channels = groupSession.getMembersChannel(groupName);
for (Channel channel : channels) {
if(!channel.equals(ctx.channel())){
channel.writeAndFlush(new GroupCreateResponseMessage(true, "You have been added to group " + groupName));
}
}
ctx.writeAndFlush(new GroupCreateResponseMessage(true, "Group created"));
}else{
ctx.writeAndFlush(new GroupCreateResponseMessage(false, "Group already exists"));
}
}
}
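4. Join-group message handler
- Get the joining user's own channel
- Add the user to the group stored in the map
- Get the channels of the members of the group
- Notify everyone except the joining user that someone has joined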
@ChannelHandler.Sharable
public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
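Channel channel = ctx.channel();
GroupSession groupSession = GroupSessionFactory.getGroupSession();
// joinMember returns null when the group does not exist; without this check
// getMembersChannel would return null and the loop below would throw a NullPointerException
if(groupSession.joinMember(msg.getGroupName(), msg.getUsername()) == null){
ctx.writeAndFlush(new GroupJoinResponseMessage(false, "Group does not exist"));
return;
}
for (Channel users : groupSession.getMembersChannel(msg.getGroupName())) {
if(!users.equals(channel)){
users.writeAndFlush(new GroupJoinResponseMessage(true, msg.getUsername() + " joined the group"));
}
}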
}
}
5. List-group-members message handler
- Get the group name
- Look up the names of all members of that group
- Send the member list back to the requester
@ChannelHandler.Sharable
public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
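String groupName = msg.getGroupName();
// Reply with a Message subclass: MessageCodecSharable only encodes Message objects, not raw Strings.
// getMembers returns an empty set for an unknown group, so no null check is needed.
Set<String> members = GroupSessionFactory.getGroupSession().getMembers(groupName);
ctx.writeAndFlush(new GroupMembersResponseMessage(members));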
}
}
6. Leave-group message handler
- Get the group name
- Remove the user and get the channels of the remaining members
- Send them a notification
@ChannelHandler.Sharable
public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
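String username = msg.getUsername();
String groupName = msg.getGroupName();
GroupSession groupSession = GroupSessionFactory.getGroupSession();
// removeMember returns null when the group does not exist
if(groupSession.removeMember(groupName, username) == null){
ctx.writeAndFlush(new GroupQuitResponseMessage(false, "Group does not exist"));
return;
}
for (Channel channel : groupSession.getMembersChannel(groupName)) {
channel.writeAndFlush(new GroupQuitResponseMessage(true, "User " + username + " left the group"));
}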
}
}
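7. Login message handler
- Get the username and password and verify them
- Bind the channel to the username and store the pair in the map
- Send the login result back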
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
boolean login = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage message = null;
if (login) {
message = new LoginResponseMessage(true, "Login successful");
SessionFactory.getSession().bind(ctx.channel(), username);
} else {
message = new LoginResponseMessage(false, "Incorrect username or password");
}
ctx.writeAndFlush(message);
}
}
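8. Quit handler
- There are two cases: the connection is closed normally, or an exception occurs
- Both must be handled by removing the channel from the session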
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} disconnected", ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} disconnected abnormally, exception: {}", ctx.channel(), cause.getMessage());
}
}
2. Message classes, used for requests and responses
1. AbstractResponseMessage
@Data
@ToString(callSuper = true)
public abstract class AbstractResponseMessage extends Message {
private boolean success;
private String reason;
public AbstractResponseMessage() {
}
public AbstractResponseMessage(boolean success, String reason) {
this.success = success;
this.reason = reason;
}
}
2. ChatRequestMessage
@Data
@ToString(callSuper = true)
public class ChatRequestMessage extends Message {
private String content;
private String to;
private String from;
public ChatRequestMessage() {
}
public ChatRequestMessage(String from, String to, String content) {
this.from = from;
this.to = to;
this.content = content;
}
@Override
public int getMessageType() {
return ChatRequestMessage;
}
}
3. ChatResponseMessage
@Data
@ToString(callSuper = true)
public class ChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
public ChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public ChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return ChatResponseMessage;
}
}
4. GroupChatRequestMessage
@Data
@ToString(callSuper = true)
public class GroupChatRequestMessage extends Message {
private String content;
private String groupName;
private String from;
public GroupChatRequestMessage(String from, String groupName, String content) {
this.content = content;
this.groupName = groupName;
this.from = from;
}
@Override
public int getMessageType() {
return GroupChatRequestMessage;
}
}
5. GroupChatResponseMessage
@Data
@ToString(callSuper = true)
public class GroupChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
public GroupChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public GroupChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return GroupChatResponseMessage;
}
}
6. GroupCreateRequestMessage
@Data
@ToString(callSuper = true)
public class GroupCreateRequestMessage extends Message {
private String groupName;
private Set<String> members;
public GroupCreateRequestMessage(String groupName, Set<String> members) {
this.groupName = groupName;
this.members = members;
}
@Override
public int getMessageType() {
return GroupCreateRequestMessage;
}
}
7. GroupCreateResponseMessage
@Data
@ToString(callSuper = true)
public class GroupCreateResponseMessage extends AbstractResponseMessage {
public GroupCreateResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return GroupCreateResponseMessage;
}
}
8. GroupJoinRequestMessage
@Data
@ToString(callSuper = true)
public class GroupJoinRequestMessage extends Message {
private String groupName;
private String username;
public GroupJoinRequestMessage(String username, String groupName) {
this.groupName = groupName;
this.username = username;
}
@Override
public int getMessageType() {
return GroupJoinRequestMessage;
}
}
9. GroupJoinResponseMessage
@Data
@ToString(callSuper = true)
public class GroupJoinResponseMessage extends AbstractResponseMessage {
public GroupJoinResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return GroupJoinResponseMessage;
}
}
10. GroupMembersRequestMessage
@Data
@ToString(callSuper = true)
public class GroupMembersRequestMessage extends Message {
private String groupName;
public GroupMembersRequestMessage(String groupName) {
this.groupName = groupName;
}
@Override
public int getMessageType() {
return GroupMembersRequestMessage;
}
}
11. GroupMembersResponseMessage
@Data
@ToString(callSuper = true)
public class GroupMembersResponseMessage extends Message {
private Set<String> members;
public GroupMembersResponseMessage(Set<String> members) {
this.members = members;
}
@Override
public int getMessageType() {
return GroupMembersResponseMessage;
}
}
12. GroupQuitRequestMessage
@Data
@ToString(callSuper = true)
public class GroupQuitRequestMessage extends Message {
private String groupName;
private String username;
public GroupQuitRequestMessage(String username, String groupName) {
this.groupName = groupName;
this.username = username;
}
@Override
public int getMessageType() {
return GroupQuitRequestMessage;
}
}
13. GroupQuitResponseMessage
@Data
@ToString(callSuper = true)
public class GroupQuitResponseMessage extends AbstractResponseMessage {
public GroupQuitResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return GroupQuitResponseMessage;
}
}
14. LoginRequestMessage
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message {
private String username;
private String password;
public LoginRequestMessage() {
}
public LoginRequestMessage(String username, String password) {
this.username = username;
this.password = password;
}
@Override
public int getMessageType() {
return LoginRequestMessage;
}
}
15. LoginResponseMessage
@Data
@ToString(callSuper = true)
public class LoginResponseMessage extends AbstractResponseMessage {
public LoginResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return LoginResponseMessage;
}
}
16. Message
@Data
public abstract class Message implements Serializable {
public static Class<?> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}
private int sequenceId;
private int messageType;
public abstract int getMessageType();
public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
public static final int PingMessage = 14;
public static final int PongMessage = 15;
private static final Map<Integer, Class<?>> messageClasses = new HashMap<>();
static {
messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
// The heartbeat types must map to their own classes
messageClasses.put(PingMessage, PingMessage.class);
messageClasses.put(PongMessage, PongMessage.class);
}
}
17. PingMessage
public class PingMessage extends Message{
@Override
public int getMessageType() {
return PingMessage;
}
}
18. PongMessage
public class PongMessage extends Message{
@Override
public int getMessageType() {
return PongMessage;
}
}
3. Session classes, used to store channels
1. Group: a chat group
@Data
public class Group {
private String name;
private Set<String> members;
public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());
public Group(String name, Set<String> members) {
this.name = name;
this.members = members;
}
}
2. GroupSession: the group chat interface
public interface GroupSession {
Group createGroup(String name, Set<String> members);
Group joinMember(String name, String member);
Group removeMember(String name, String member);
Group removeGroup(String name);
Set<String> getMembers(String name);
List<Channel> getMembersChannel(String name);
List<String> getUsers(String name);
}
3. GroupSessionFactory
public abstract class GroupSessionFactory {
private static GroupSession session = new GroupSessionMemoryImpl();
public static GroupSession getGroupSession() {
return session;
}
}
4. GroupSessionMemoryImpl: in-memory implementation of the group chat methods
public class GroupSessionMemoryImpl implements GroupSession {
private final Map<String, Group> groupMap = new ConcurrentHashMap<>();
@Override
public Group createGroup(String name, Set<String> members) {
Group group = new Group(name, members);
return groupMap.putIfAbsent(name, group);
}
@Override
public Group joinMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().add(member);
return value;
});
}
@Override
public Group removeMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().remove(member);
return value;
});
}
@Override
public Group removeGroup(String name) {
return groupMap.remove(name);
}
@Override
public Set<String> getMembers(String name) {
return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
}
@Override
public List<Channel> getMembersChannel(String name) {
if(groupMap.get(name) == null){
return null;
}
return getMembers(name).stream()
.map(member -> SessionFactory.getSession().getChannel(member))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
@Override
public List<String> getUsers(String name) {
if(groupMap.get(name) == null){
return null;
}
return new ArrayList<>(groupMap.get(name).getMembers());
}
}
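One detail worth spelling out: createGroup returns the result of putIfAbsent, which is the previous value for the key, or null if there was none. That is why the create-group handler treats a null return as success. A minimal stdlib sketch of that behavior (the class `PutIfAbsentSketch` and its String values are hypothetical, used only for illustration):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class: a String owner stands in for the Group object.
final class PutIfAbsentSketch {
    private final Map<String, String> groups = new ConcurrentHashMap<>();

    // Returns true when the group was newly created,
    // false when the name was already taken.
    boolean create(String name, String owner) {
        // putIfAbsent returns the existing value, or null if the key was absent
        return groups.putIfAbsent(name, owner) == null;
    }

    String ownerOf(String name) {
        return groups.get(name);
    }
}
```

A second create with the same name leaves the first entry untouched, so an existing group is never overwritten.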
5. Session: the session management interface
public interface Session {
void bind(Channel channel, String username);
void unbind(Channel channel);
Object getAttribute(Channel channel, String name);
void setAttribute(Channel channel, String name, Object value);
Channel getChannel(String username);
}
6. SessionFactory: the session factory class
public abstract class SessionFactory {
private static Session session = new SessionMemoryImpl();
public static Session getSession() {
return session;
}
}
7. SessionMemoryImpl: in-memory implementation of the session management interface
Its methods bind a channel to a username and look one up from the other
public class SessionMemoryImpl implements Session {
private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
private final Map<Channel,Map<String,Object>> channelAttributesMap = new ConcurrentHashMap<>();
@Override
public void bind(Channel channel, String username) {
usernameChannelMap.put(username, channel);
channelUsernameMap.put(channel, username);
channelAttributesMap.put(channel, new ConcurrentHashMap<>());
}
@Override
public void unbind(Channel channel) {
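String username = channelUsernameMap.remove(channel);
// A channel that never logged in has no username, and ConcurrentHashMap rejects null keys
if (username != null) {
usernameChannelMap.remove(username);
}
channelAttributesMap.remove(channel);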
}
@Override
public Object getAttribute(Channel channel, String name) {
return channelAttributesMap.get(channel).get(name);
}
@Override
public void setAttribute(Channel channel, String name, Object value) {
channelAttributesMap.get(channel).put(name, value);
}
@Override
public Channel getChannel(String username) {
return usernameChannelMap.get(username);
}
@Override
public String toString() {
return usernameChannelMap.toString();
}
}
4. Login classes
1. UserService: the login management interface
public interface UserService {
boolean login(String username, String password);
}
2. UserServiceFactory: returns the UserService implementation
public abstract class UserServiceFactory {
private static UserService userService = new UserServiceMemoryImpl();
public static UserService getUserService() {
return userService;
}
}
3. UserServiceMemoryImpl: in-memory implementation of the user management interface
public class UserServiceMemoryImpl implements UserService {
private Map<String, String> allUserMap = new ConcurrentHashMap<>();
{
allUserMap.put("zhangsan", "123");
allUserMap.put("lisi", "123");
allUserMap.put("wangwu", "123");
allUserMap.put("zhaoliu", "123");
allUserMap.put("qianqi", "123");
}
@Override
public boolean login(String username, String password) {
String pass = allUserMap.get(username);
if (pass == null) {
return false;
}
return pass.equals(password);
}
}
5. Protocol
1. MessageCodecSharable: the protocol class; messages are sent and parsed in this format
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
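ByteBuf out = ctx.alloc().buffer();
out.writeBytes(new byte[]{1, 2, 3, 4}); // 4 bytes: magic number
out.writeByte(1); // 1 byte: protocol version
out.writeByte(0); // 1 byte: serializer type
out.writeByte(msg.getMessageType()); // 1 byte: message type
out.writeInt(msg.getSequenceId()); // 4 bytes: sequence id
out.writeByte(0xff); // 1 byte: padding, so the length field starts at offset 12
// Serialize the message body with JDK serialization
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
out.writeInt(bytes.length); // 4 bytes: body length
out.writeBytes(bytes); // body
outList.add(out);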
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
out.add(message);
}
}
2. Protocol frame decoder, used together with the protocol class above
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
public ProcotolFrameDecoder() {
this(1024, 12, 4, 0, 0);
}
public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
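The decoder arguments (1024, 12, 4, 0, 0) follow from the header that encode writes: 4 + 1 + 1 + 1 + 4 + 1 = 12 bytes come before the 4-byte length field. The sketch below rebuilds the same layout with java.nio.ByteBuffer, which is big-endian by default like a Netty ByteBuf. The class `FrameLayoutSketch` is hypothetical and only illustrates the byte arithmetic; it is not the codec itself.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class mirroring the header layout written by encode().
final class FrameLayoutSketch {
    // magic(4) + version(1) + serializer(1) + type(1) + sequenceId(4) + padding(1)
    static final int LENGTH_FIELD_OFFSET = 4 + 1 + 1 + 1 + 4 + 1; // 12
    static final int HEADER_SIZE = LENGTH_FIELD_OFFSET + 4;        // 16

    static byte[] encode(int messageType, int sequenceId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.put(new byte[]{1, 2, 3, 4}); // magic number
        buf.put((byte) 1);               // version
        buf.put((byte) 0);               // serializer type
        buf.put((byte) messageType);     // message type
        buf.putInt(sequenceId);          // sequence id
        buf.put((byte) 0xff);            // padding
        buf.putInt(body.length);         // length field at offset 12
        buf.put(body);                   // body
        return buf.array();
    }

    // Reads the body length the same way a length-field decoder would locate it.
    static int bodyLength(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(LENGTH_FIELD_OFFSET);
    }
}
```

Because lengthAdjustment and initialBytesToStrip are both 0, the decoder hands the whole frame, header included, to MessageCodecSharable, which is why decode reads the header fields again. Note also that the first argument caps a frame at 1024 bytes, so larger serialized messages would be rejected by the decoder.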
6. Server
- Add the handlers to the pipeline
- Pay attention to the heartbeat handling
- The rest is routine code
@Slf4j
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
ChatRequestHandler CHAT_HANDLER = new ChatRequestHandler();
GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();
GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();
GroupMembersRequestMessageHandler GROUP_MEMBERT_HANDLER = new GroupMembersRequestMessageHandler();
GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();
GroupChatRequestMessageHandler CROUP_CHAR_HANDLER = new GroupChatRequestMessageHandler();
QuitHandler QUIT_HANDLER = new QuitHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(5, 8, 0));
ch.pipeline().addLast(new ChannelDuplexHandler(){
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
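// Other user events may arrive here too, so check the type before casting
if(evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE){
// Nothing was read for 5 seconds: treat the connection as dead and close it
ctx.channel().close();
} else {
super.userEventTriggered(ctx, evt);
}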
}
});
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(LOGIN_HANDLER);
ch.pipeline().addLast(CHAT_HANDLER);
ch.pipeline().addLast(GROUP_CREATE_HANDLER);
ch.pipeline().addLast(GROUP_JOIN_HANDLER);
ch.pipeline().addLast(GROUP_MEMBERT_HANDLER);
ch.pipeline().addLast(GROUP_QUIT_HANDLER);
ch.pipeline().addLast(CROUP_CHAR_HANDLER);
ch.pipeline().addLast(QUIT_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
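The heartbeat only works if the client pings more often than the server's read timeout. Here the server uses IdleStateHandler(5, 8, 0), so it closes a channel after 5 seconds without inbound data, while the client uses IdleStateHandler(0, 3, 0) and sends a PingMessage after 3 seconds without writing. The sketch below is a deliberately simplified whole-second model of that relationship, not how IdleStateHandler is implemented; the class `HeartbeatSketch` and its methods are hypothetical.

```java
// Hypothetical whole-second simulation of the ping interval vs. read timeout.
final class HeartbeatSketch {
    // True if no inbound data arrived for at least readTimeoutSeconds.
    static boolean readerIdle(long lastReadSecond, long nowSecond, long readTimeoutSeconds) {
        return nowSecond - lastReadSecond >= readTimeoutSeconds;
    }

    // Simulates an otherwise silent client that pings every pingIntervalSeconds
    // (0 means it never pings). Returns the first second at which the server
    // would close the connection, or -1 if it stays open for the whole duration.
    static long firstCloseSecond(long pingIntervalSeconds, long readTimeoutSeconds, long durationSeconds) {
        long lastRead = 0;
        for (long now = 1; now <= durationSeconds; now++) {
            if (pingIntervalSeconds > 0 && now % pingIntervalSeconds == 0) {
                lastRead = now; // a ping reached the server
            }
            if (readerIdle(lastRead, now, readTimeoutSeconds)) {
                return now;
            }
        }
        return -1;
    }
}
```

With a 3-second ping and a 5-second read timeout the simulated connection stays open; with no pings, or pings slower than the timeout, it is closed at second 5.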
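7. Client
- A menu maps each command to the matching message type
- A CountDownLatch is used for communication between threads: the console thread waits until the login response arrives on the I/O thread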
@Slf4j
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
AtomicBoolean LOGIN = new AtomicBoolean(false);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
ch.pipeline().addLast(new ChannelDuplexHandler(){
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
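// Other user events may arrive here too, so check the type before casting
if(evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE){
// Nothing was written for 3 seconds: send a heartbeat
ctx.writeAndFlush(new PingMessage());
} else {
super.userEventTriggered(ctx, evt);
}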
}
});
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("msg:{}", msg);
if(msg instanceof LoginResponseMessage){
LoginResponseMessage response = (LoginResponseMessage) msg;
if (response.isSuccess()) {
LOGIN.set(true);
}
}
WAIT_FOR_LOGIN.countDown();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new Thread(()->{
Scanner scanner = new Scanner(System.in);
System.out.println("Enter username:");
String username = scanner.nextLine();
System.out.println("Enter password:");
String password = scanner.nextLine();
LoginRequestMessage message = new LoginRequestMessage(username, password);
ctx.writeAndFlush(message);
System.out.println("Waiting for the login result...");
try {
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
if(!LOGIN.get()){
ctx.channel().close();
return;
}
while(true){
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
String command = scanner.nextLine();
// Split into at most 3 parts so that [content] may contain spaces
String[] s = command.split(" ", 3);
switch(s[0]){
case "send":{
ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
break;
}
case "gsend":{
ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
break;
}
case "gcreate":{
Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));
set.add(username);
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));
break;
}
case "gmembers":{
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
}
case "gjoin":{
ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
break;
}
case "gquit":{
ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
break;
}
case "quit":{
ctx.channel().close();
return;
}
default:
// An unknown command should not kill the input thread; print a hint and show the menu again
System.out.println("Unknown command: " + s[0]);
}
}
}, "System in").start();
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}
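The login handshake between the two client threads can be shown in isolation. In the sketch below a plain thread plays the role of the Netty I/O thread that receives LoginResponseMessage, and the caller plays the role of the "System in" thread. The class `LoginWaitSketch`, its parameters, and the timeout are my own assumptions for illustration; the client above waits without a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: one thread publishes the login result, another waits for it.
final class LoginWaitSketch {
    // Returns the login result, or false if no response arrives within timeoutMillis.
    static boolean awaitLogin(boolean serverAnswer, long responseDelayMillis, long timeoutMillis) {
        CountDownLatch waitForLogin = new CountDownLatch(1);
        AtomicBoolean login = new AtomicBoolean(false);

        // Stands in for the I/O thread that handles the login response.
        Thread ioThread = new Thread(() -> {
            try {
                Thread.sleep(responseDelayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            login.set(serverAnswer);  // publish the result first
            waitForLogin.countDown(); // then release the waiting thread
        }, "fake-io");
        ioThread.setDaemon(true);
        ioThread.start();

        try {
            // The console thread blocks here until the response has been processed.
            boolean answered = waitForLogin.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return answered && login.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Setting the flag before calling countDown matters: once the latch opens, the waiting thread reads the flag immediately.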
If you spot any mistakes, corrections are welcome!