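3.2 WebSocket Protocol Development
3.2.1 Drawbacks of the HTTP Protocol
- (1) HTTP is a half-duplex protocol. Data can flow in both directions between client and server, but not simultaneously: at any given moment, data is being transferred in only one direction.
- (2) HTTP messages are verbose. An HTTP message consists of headers, a body, line breaks, and so on, and is usually transmitted as text, so it is bulkier than the messages of binary protocols.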
- (3) Server push has to be emulated with workarounds (hacks) such as long polling.
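- Many websites today implement message push by polling: at a fixed interval (for example, every second) the browser sends an HTTP request to the server, and the server returns the latest data. The drawback of this traditional model is obvious: the browser must keep issuing requests, and because HTTP request headers are long, the useful data may be only a small fraction of each exchange, which wastes bandwidth and server resources.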
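To make the overhead argument concrete, the following is a rough back-of-the-envelope sketch. The header size, payload size, and client count are assumed figures chosen only for illustration, and `PollingOverhead` is a hypothetical helper, not part of the chat project.

```java
/** Rough estimate of polling overhead; all sizes are assumed, illustrative values. */
class PollingOverhead {

    /** Fraction of one request/response exchange that is useful payload. */
    static double payloadRatio(int headerBytes, int payloadBytes) {
        return (double) payloadBytes / (headerBytes + payloadBytes);
    }

    /** Bytes per second spent on headers alone when every client polls at a fixed interval. */
    static long headerBytesPerSecond(int clients, int headerBytes, double intervalSeconds) {
        return Math.round(clients * headerBytes / intervalSeconds);
    }

    public static void main(String[] args) {
        int headerBytes = 800;  // assumed combined request + response header size
        int payloadBytes = 20;  // assumed size of the data the page actually needs
        System.out.printf("payload ratio: %.3f%n", payloadRatio(headerBytes, payloadBytes));
        System.out.println("header bytes per second for 10000 clients: "
                + headerBytesPerSecond(10_000, headerBytes, 1.0));
    }
}
```

Under these assumptions only about 2.4% of the traffic is payload, and 10,000 clients polling once per second spend roughly 8 MB per second on headers.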
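3.2.2 WebSocket Basics
- WebSocket is a technology introduced with HTML5 for full-duplex communication between browser and server. The WebSocket protocol was standardized by the IETF as RFC 6455 in 2011, and the WebSocket API was standardized by the W3C.
- With the WebSocket API, the browser and server perform a single handshake, after which a persistent channel exists between them and either side can send data directly. WebSocket messages travel over a TCP connection in full duplex: each side can send and receive at the same moment. Compared with half-duplex HTTP, performance improves considerably.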
3.2.3 Basic Process
- To establish a WebSocket connection, the client (typically a browser) sends a handshake request in the following format:
GET /ws HTTP/1.1
Host: localhost:8002
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36 Edg/99.0.1150.30
Upgrade: websocket
Origin: http://localhost:8001
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,zh-TW;q=0.5
Sec-WebSocket-Key: spyQjkK99Jt/y+JGqq0ljA==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Content-Length: 0
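- To establish a WebSocket connection, the client browser first sends an HTTP request to the server. Unlike an ordinary HTTP request, it carries additional headers; the header Upgrade: websocket marks it as a request to upgrade the protocol. The server parses these headers and returns a response, after which the WebSocket connection is established and both sides can exchange messages freely over it. The connection persists until either the client or the server closes it.
- The Sec-WebSocket-Key in the request is random. The server appends the magic string 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 to the Sec-WebSocket-Key value, computes the SHA-1 digest of the result (a hash, not encryption), Base64-encodes the digest, and returns it to the client as the value of the Sec-WebSocket-Accept header.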
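The Sec-WebSocket-Accept computation can be sketched with the JDK alone. This is a minimal illustration, not the code Netty runs internally; the class and method names are hypothetical. The sample key in `main` is the one used as an example in RFC 6455.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Sketch of the Sec-WebSocket-Accept computation described in RFC 6455. */
class WebSocketAccept {

    // Fixed GUID defined by RFC 6455
    private static final String MAGIC_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Returns Base64(SHA-1(key + GUID)) for the given Sec-WebSocket-Key value. */
    static String acceptFor(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + MAGIC_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-1
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    public static void main(String[] args) {
        // Example key from RFC 6455; prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
        System.out.println(acceptFor("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

The client performs the same computation and rejects the connection if the returned value does not match, which guards against responses from servers that do not actually speak WebSocket.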
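3.2.4 Building a Chat Room with WebSocket
Agree on the data exchange format between front end and back end. Message format: [command][timestamp][username][avatar] - message content
Define the message entity class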
package com.shu.Protocol;
import org.msgpack.annotation.Message;
@Message
public class IMMessage{
private String addr;
private String cmd;
private long time;
private int online;
private String sender;
private String headPic;
private String receiver;
private String content;
public IMMessage(){}
public IMMessage(String cmd,long time,int online,String content){
this.cmd = cmd;
this.time = time;
this.online = online;
this.content = content;
}
public IMMessage(String cmd,long time,String sender){
this.cmd = cmd;
this.time = time;
this.sender = sender;
}
public IMMessage(String cmd,long time,String sender,String content){
this.cmd = cmd;
this.time = time;
this.sender = sender;
this.content = content;
}
public IMMessage(String cmd,long time,String sender,String content,String headPic){
this.cmd = cmd;
this.time = time;
this.sender = sender;
this.content = content;
this.headPic = headPic;
}
public String getCmd() {
return cmd;
}
public void setCmd(String cmd) {
this.cmd = cmd;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public int getOnline() {
return online;
}
public void setOnline(int online) {
this.online = online;
}
public String getSender() {
return sender;
}
public void setSender(String sender) {
this.sender = sender;
}
public String getReceiver() {
return receiver;
}
public void setReceiver(String receiver) {
this.receiver = receiver;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
public String getHeadPic() {
return headPic;
}
public void setHeadPic(String headPic) {
this.headPic = headPic;
}
@Override
public String toString() {
return "IMMessage{" +
"addr='" + addr + '\'' +
", cmd='" + cmd + '\'' +
", time=" + time +
", online=" + online +
", sender='" + sender + '\'' +
", headPic='" + headPic + '\'' +
", receiver='" + receiver + '\'' +
", content='" + content + '\'' +
'}';
}
}
Define the command enum class
package com.shu.Protocol;
public enum IMP {
SYSTEM("SYSTEM"),
LOGIN("LOGIN"),
LOGOUT("LOGOUT"),
CHAT("CHAT"),
FLOWER("FLOWER");
private String name;
public static boolean isIMP(String content){
// String.matches must match the whole input, so allow anything after the command prefix
return content.matches("(?s)^\\[(SYSTEM|LOGIN|LOGOUT|CHAT|FLOWER)\\].*");
}
IMP(String name){
this.name = name;
}
public String getName(){
return this.name;
}
public String toString(){
return this.name;
}
}
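A note on prefix checks of this kind: `String.matches` succeeds only if the entire string matches the pattern, so a pattern that describes only the `[COMMAND]` prefix rejects any message that has further fields after it. The sketch below contrasts a whole-string match with a prefix search; `CommandPrefixCheck` is a hypothetical class written for this illustration.

```java
import java.util.regex.Pattern;

/** Contrasts whole-string matching with prefix matching for the command tag. */
class CommandPrefixCheck {

    private static final String PREFIX_REGEX = "^\\[(SYSTEM|LOGIN|LOGOUT|CHAT|FLOWER)\\]";
    private static final Pattern PREFIX = Pattern.compile(PREFIX_REGEX);

    /** True only when the input is exactly one command tag and nothing else. */
    static boolean fullMatch(String s) {
        return s.matches(PREFIX_REGEX);
    }

    /** True when the input starts with a command tag, whatever follows. */
    static boolean prefixMatch(String s) {
        return PREFIX.matcher(s).find();
    }

    public static void main(String[] args) {
        String msg = "[LOGIN][1647000000000][tom]";
        System.out.println(fullMatch(msg));    // false: extra fields follow the tag
        System.out.println(prefixMatch(msg));  // true: the tag is at the start
    }
}
```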
Message decoder
package com.shu.Protocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.msgpack.MessagePack;
import org.msgpack.MessageTypeException;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class IMDecoder extends ByteToMessageDecoder {
private Pattern pattern = Pattern.compile("^\\[(.*)\\](\\s\\-\\s(.*))?");
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,List<Object> out) throws Exception {
try{
final int length = in.readableBytes();
final byte[] array = new byte[length];
String content = new String(array,in.readerIndex(),length);
if(!(null == content || "".equals(content.trim()))){
if(!IMP.isIMP(content)){
ctx.channel().pipeline().remove(this);
return;
}
}
in.getBytes(in.readerIndex(), array, 0, length);
out.add(new MessagePack().read(array,IMMessage.class));
in.clear();
}catch(MessageTypeException e){
ctx.channel().pipeline().remove(this);
}
}
public IMMessage decode(String msg){
if(null == msg || "".equals(msg.trim())){ return null; }
try{
Matcher m = pattern.matcher(msg);
String header = "";
String content = "";
if(m.matches()){
header = m.group(1);
content = m.group(3);
}
String [] heards = header.split("\\]\\[");
long time = 0;
try{ time = Long.parseLong(heards[1]); } catch(Exception e){}
String username = heards[2];
username = username.length() < 10 ? username : username.substring(0, 9);
if(msg.startsWith("[" + IMP.LOGIN.getName() + "]")){
return new IMMessage(heards[0],time,username);
}else if(msg.startsWith("[" + IMP.CHAT.getName() + "]")){
String headPic = heards[3];
return new IMMessage(heards[0],time,username,content,headPic);
}else if(msg.startsWith("[" + IMP.FLOWER.getName() + "]")){
return new IMMessage(heards[0],time,username);
}else{
return null;
}
}catch(Exception e){
e.printStackTrace();
return null;
}
}
}
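To see how the text form of a message is taken apart, here is a self-contained sketch that applies the same regular expression as the decoder to sample inputs. `TextProtocolDemo` and its return convention (header fields followed by the content) are assumptions made for this illustration; the sample messages are invented.

```java
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Splits a text message of the form [f1][f2]... - content into its parts. */
class TextProtocolDemo {

    // Same expression as the decoder: bracketed header, optional " - content" tail
    private static final Pattern FRAME = Pattern.compile("^\\[(.*)\\](\\s\\-\\s(.*))?");

    /** Returns the header fields followed by the content ("" if absent); empty array if no match. */
    static String[] parse(String msg) {
        Matcher m = FRAME.matcher(msg);
        if (!m.matches()) {
            return new String[0];
        }
        String[] headers = m.group(1).split("\\]\\[");
        String content = m.group(3) == null ? "" : m.group(3);
        String[] result = Arrays.copyOf(headers, headers.length + 1);
        result[headers.length] = content;
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parse("[CHAT][1647000000000][tom][1.png] - hello")));
        System.out.println(Arrays.toString(parse("[LOGIN][1647000000000][tom]")));
    }
}
```

One limitation of this expression is worth knowing: because the first group is greedy, a message whose content ends with `]` is absorbed into the header group and parsed incorrectly.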
Message encoder
package com.shu.Protocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;
import org.msgpack.MessagePack;
@Slf4j
public class IMEncoder extends MessageToByteEncoder<IMMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, IMMessage msg, ByteBuf out)
throws Exception {
out.writeBytes(new MessagePack().write(msg));
}
public String encode(IMMessage msg){
if(null == msg){ return ""; }
String prex = "[" + msg.getCmd() + "]" + "[" + msg.getTime() + "]";
if(IMP.LOGIN.getName().equals(msg.getCmd()) ||
IMP.FLOWER.getName().equals(msg.getCmd())){
prex += ("[" + msg.getSender() + "]");
}else if(IMP.CHAT.getName().equals(msg.getCmd()) ){
prex += ("[" + msg.getSender() + "]["+msg.getHeadPic()+"]");
}
else if(IMP.SYSTEM.getName().equals(msg.getCmd())){
prex += ("[" + msg.getOnline() + "]");
}
if(!(null == msg.getContent() || "".equals(msg.getContent()))){
prex += (" - " + msg.getContent());
}
log.info("编码消息"+prex);
return prex;
}
}
Message processing center
package com.shu.Process;
import com.alibaba.fastjson.JSONObject;
import com.shu.Protocol.IMDecoder;
import com.shu.Protocol.IMEncoder;
import com.shu.Protocol.IMMessage;
import com.shu.Protocol.IMP;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MsgProcessor {
private static ChannelGroup onlineUsers = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private IMDecoder decoder = new IMDecoder();
private IMEncoder encoder = new IMEncoder();
private final AttributeKey<String> USERNAME = AttributeKey.valueOf("username");
private final AttributeKey<String> HEAD_PIC = AttributeKey.valueOf("headPic");
private final AttributeKey<String> IP_ADDR = AttributeKey.valueOf("ipAddr");
private final AttributeKey<JSONObject> ATTRS = AttributeKey.valueOf("attrs");
public void process(Channel client,String msg){
IMMessage request = decoder.decode(msg);
log.info("解码消息"+String.valueOf(request));
if(null == request){return;}
String username = request.getSender();
if(IMP.LOGIN.getName().equals(request.getCmd())){
client.attr(IP_ADDR).getAndSet("");
client.attr(USERNAME).getAndSet(request.getSender());
client.attr(HEAD_PIC).getAndSet(request.getHeadPic());
onlineUsers.add(client);
for (Channel channel : onlineUsers) {
if (channel != client) {
request = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), username + " 加入聊天室!");
}
else {
request = new IMMessage(IMP.SYSTEM.getName(), sysTime(), onlineUsers.size(), username + " 欢迎进入聊天室!");
}
String text = encoder.encode(request);
channel.writeAndFlush(new TextWebSocketFrame(text));
}
}
else if(IMP.LOGOUT.getName().equals(request.getCmd())){
logout(client);
}
else if(IMP.CHAT.getName().equals(request.getCmd())){
for (Channel channel : onlineUsers) {
if (channel != client) {
request.setSender(username);
}
else {
request.setSender("MY_SELF");
}
String text = encoder.encode(request);
channel.writeAndFlush(new TextWebSocketFrame(text));
}
}
else if (IMP.FLOWER.getName().equals(request.getCmd())){
JSONObject attrs = getAttrs(client);
long currTime = sysTime();
if(null != attrs){
long lastTime = attrs.getLongValue("lastFlowerTime");
int seconds = 10;
long sub = currTime - lastTime;
if(sub < 1000 * seconds){
request.setSender("MY_SELF");
request.setCmd(IMP.SYSTEM.getName());
request.setContent("您送鲜花太频繁," + (seconds - Math.round(sub / 1000)) + "秒后再试");
String content = encoder.encode(request);
client.writeAndFlush(new TextWebSocketFrame(content));
return;
}
}
for (Channel channel : onlineUsers) {
if (channel == client) {
request.setSender("MY_SELF");
request.setContent("你给大家送了一波鲜花雨");
setAttrs(client, "lastFlowerTime", currTime);
}else{
request.setSender(getNickName(client));
request.setContent(getNickName(client) + "送来一波鲜花雨");
}
request.setTime(sysTime());
String content = encoder.encode(request);
channel.writeAndFlush(new TextWebSocketFrame(content));
}
}
}
public String getNickName(Channel client){
return client.attr(USERNAME).get();
}
public String getAddress(Channel client){
return client.remoteAddress().toString().replaceFirst("/","");
}
public JSONObject getAttrs(Channel client){
try{
return client.attr(ATTRS).get();
}catch(Exception e){
return null;
}
}
private void setAttrs(Channel client,String key,Object value){
// Create the attribute map on first use instead of relying on a caught NullPointerException
JSONObject json = client.attr(ATTRS).get();
if(null == json){
json = new JSONObject();
}
json.put(key, value);
client.attr(ATTRS).set(json);
}
public void logout(Channel client) {
IMMessage request = new IMMessage();
request.setSender(client.attr(USERNAME).get());
request.setCmd(IMP.SYSTEM.getName());
request.setOnline(onlineUsers.size());
request.setContent(request.getSender()+" 退出聊天室!");
for (Channel channel : onlineUsers) {
if (channel != client) {
String text = encoder.encode(request);
channel.writeAndFlush(new TextWebSocketFrame(text));
}
}
onlineUsers.remove(client);
}
private long sysTime(){
return System.currentTimeMillis();
}
}
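The flower rate limit in `process` boils down to a cooldown calculation. The following sketch isolates that arithmetic so it can be checked on its own; `FlowerCooldown` is a hypothetical helper, and it assumes the same 10-second window as the listing.

```java
/** Isolated version of the flower cooldown arithmetic (10-second window assumed). */
class FlowerCooldown {

    static final long COOLDOWN_MILLIS = 10_000;

    /** Whole seconds the sender must still wait, or 0 if a flower may be sent now. */
    static long secondsRemaining(long lastSentMillis, long nowMillis) {
        long elapsed = nowMillis - lastSentMillis;
        if (elapsed >= COOLDOWN_MILLIS) {
            return 0;
        }
        // Round the remaining time up to whole seconds
        return (COOLDOWN_MILLIS - elapsed + 999) / 1000;
    }

    public static void main(String[] args) {
        System.out.println(secondsRemaining(0, 2_500));   // 8
        System.out.println(secondsRemaining(0, 10_000));  // 0
    }
}
```

Rounding the remaining time up gives the same number as the listing's `seconds - sub / 1000`, because integer division already truncates the elapsed seconds.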
Server launcher
package com.shu.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
@Component
public class CharServer {
private static final Logger logger = LoggerFactory.getLogger(CharServer.class);
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup work = new NioEventLoopGroup();
ChannelFuture future = null;
@PreDestroy
public void stop(){
if(future!=null){
future.channel().close().addListener(ChannelFutureListener.CLOSE);
future.awaitUninterruptibly();
boss.shutdownGracefully();
work.shutdownGracefully();
future=null;
logger.info(" 服务关闭 ");
}
}
public void start(){
logger.info("nettyServer 正在启动");
int port = 8002;
serverBootstrap.group(boss,work)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChatNettyServerInitializer());
logger.info("netty服务器在["+port+"]端口启动监听");
try{
future = serverBootstrap.bind(port).sync();
if(future.isSuccess()){
logger.info("nettyServer 完成启动 ");
}
future.channel().closeFuture().sync();
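}catch (Exception e){
// SLF4J placeholders are written {}; passing the exception last logs its stack trace
logger.error("nettyServer failed, releasing resources", e);
}finally {
// finally always runs, so the event loop groups are released here exactly once
boss.shutdownGracefully();
work.shutdownGracefully();
}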
}
}
package com.shu.Server;
import com.shu.Handle.WebSocketHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class ChatNettyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
socketChannel.pipeline().addLast(new IdleStateHandler(0,0,5, TimeUnit.MINUTES));
socketChannel.pipeline().addLast(new HttpServerCodec());
socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));
socketChannel.pipeline().addLast(new ChunkedWriteHandler());
socketChannel.pipeline().addLast(new WebSocketHandler());
}
}
Custom handler
package com.shu.Handle;
import com.shu.Process.MsgProcessor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
@ChannelHandler.Sharable
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
private WebSocketServerHandshaker handShaker;
private ChannelHandlerContext ctx;
private MsgProcessor process = new MsgProcessor();
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("收到消息:"+msg);
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
}
else if (msg instanceof WebSocketFrame) {
handleWebSocket(ctx, (WebSocketFrame) msg);
}
}
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
// The Upgrade header value is case-insensitive, so compare without regard to case
if (!request.getDecoderResult().isSuccess() || (!"websocket".equalsIgnoreCase(request.headers().get("Upgrade")))) {
sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
handShaker = wsFactory.newHandshaker(request);
if (handShaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
handShaker.handshake(ctx.channel(), request);
this.ctx = ctx;
log.info("websocket 建立成功!");
}
}
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
if (response.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
response.content().writeBytes(buf);
buf.release();
HttpHeaders.setContentLength(response, response.content().readableBytes());
}
ChannelFuture f = ctx.channel().writeAndFlush(response);
if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
private void handleWebSocket(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof CloseWebSocketFrame) {
handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
// Flush immediately; a plain write() would leave the pong queued until the next flush
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
}
String msg =((TextWebSocketFrame)frame).text();
log.info("Message from client: "+msg);
process.process(ctx.channel(),msg);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
process.logout(ctx.channel());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
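// The pipeline's IdleStateHandler only sets an all-idle timeout, so the state to test is ALL_IDLE
if (IdleState.ALL_IDLE.equals(event.state())) {
// Send a WebSocket ping frame; a raw String cannot be encoded by this pipeline
ctx.writeAndFlush(new PingWebSocketFrame()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}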
}
super.userEventTriggered(ctx, evt);
}
}
Application entry class
package com.shu;
import com.shu.Server.CharServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
@SpringBootApplication(exclude={DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class})
public class NettyChatApplication {
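public static void main(String[] args) {
// Start the Spring-managed CharServer bean so that its @PreDestroy stop() applies to the running server
SpringApplication.run(NettyChatApplication.class, args).getBean(CharServer.class).start();
}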
}
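Observed result
- The server's handshake response returns the additional headers Upgrade: websocket and Sec-WebSocket-Accept to the front end.
3.2.5 Process Analysis
- The initial handshake request is carried over HTTP, so it arrives as an HTTP message and is dispatched to the handleHttpRequest method, which processes the WebSocket handshake.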
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
}
else if (msg instanceof WebSocketFrame) {
handleWebSocket(ctx, (WebSocketFrame) msg);
}
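- The handshake request is validated first: if the message failed to decode, or its headers lack an Upgrade field, or that field's value is not websocket, an HTTP 400 response is returned.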
// The Upgrade header value is case-insensitive, so compare without regard to case
if (!request.getDecoderResult().isSuccess() || (!"websocket".equalsIgnoreCase(request.headers().get("Upgrade")))) {
sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
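For comparison, a somewhat stricter check can be sketched without Netty. RFC 6455 requires both an Upgrade header containing websocket and a Connection header containing the Upgrade token, each compared case-insensitively. `UpgradeCheck` and its map-based input are assumptions made for this illustration, not the project's code.

```java
import java.util.Map;
import java.util.TreeMap;

/** Stand-alone sketch of the header checks for a WebSocket upgrade request. */
class UpgradeCheck {

    static boolean isWebSocketUpgrade(Map<String, String> headers) {
        // Header names are case-insensitive, so copy them into a case-insensitive map
        Map<String, String> h = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        h.putAll(headers);

        String upgrade = h.get("Upgrade");
        String connection = h.get("Connection");
        if (upgrade == null || connection == null) {
            return false;
        }
        // Connection may list several tokens, e.g. "keep-alive, Upgrade"
        boolean connectionHasUpgrade = false;
        for (String token : connection.split(",")) {
            if (token.trim().equalsIgnoreCase("upgrade")) {
                connectionHasUpgrade = true;
            }
        }
        return connectionHasUpgrade && upgrade.trim().equalsIgnoreCase("websocket");
    }

    public static void main(String[] args) {
        System.out.println(isWebSocketUpgrade(
                Map.of("Upgrade", "websocket", "Connection", "keep-alive, Upgrade"))); // true
        System.out.println(isWebSocketUpgrade(Map.of("Connection", "keep-alive")));     // false
    }
}
```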
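- Once this basic validation passes, a handshaker factory is constructed and used to create a WebSocketServerHandshaker, which builds the handshake response and sends it back to the client.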
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
handShaker = wsFactory.newHandshaker(request);
if (handShaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
handShaker.handshake(ctx.channel(), request);
this.ctx = ctx;
log.info("websocket 建立成功!");
}
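The handshaker builds and writes the response internally. As a rough picture of what goes over the wire, the sketch below assembles the minimal 101 response required by RFC 6455 as plain text. `HandshakeResponse` is a hypothetical helper; the accept value is taken as a parameter and would be derived from the request's Sec-WebSocket-Key.

```java
/** Builds the text of a minimal WebSocket handshake response (illustration only). */
class HandshakeResponse {

    /** acceptValue is the Base64-encoded SHA-1 digest derived from the client's key. */
    static String build(String acceptValue) {
        return "HTTP/1.1 101 Switching Protocols\r\n"
                + "Upgrade: websocket\r\n"
                + "Connection: Upgrade\r\n"
                + "Sec-WebSocket-Accept: " + acceptValue + "\r\n"
                + "\r\n"; // blank line ends the header section; frames follow on the same socket
    }

    public static void main(String[] args) {
        // Accept value for the sample key used in RFC 6455
        System.out.print(build("s3pPLMBiTxaQ9kYGzzhZRbK+xOo="));
    }
}
```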
- At this point the WebSocket connection is established, and subsequent frames go through the WebSocket handling logic.
private void handleWebSocket(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof CloseWebSocketFrame) {
handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
// Flush immediately; a plain write() would leave the pong queued until the next flush
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
}
String msg =((TextWebSocketFrame)frame).text();
log.info("Message from client: "+msg);
process.process(ctx.channel(),msg);
}
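The frame classes tested above (close, ping, text) correspond to opcodes in the first byte of each WebSocket frame, as defined by RFC 6455. Netty's frame decoder does this work; the sketch below only illustrates the bit layout, and `FrameHeader` is a hypothetical name.

```java
/** Reads the FIN flag and opcode from the first byte of a WebSocket frame. */
class FrameHeader {

    /** The highest bit marks the final fragment of a message. */
    static boolean isFinal(byte firstByte) {
        return (firstByte & 0x80) != 0;
    }

    /** The low four bits carry the opcode. */
    static int opcode(byte firstByte) {
        return firstByte & 0x0F;
    }

    /** Names of the opcodes defined by RFC 6455. */
    static String describe(int opcode) {
        switch (opcode) {
            case 0x0: return "continuation";
            case 0x1: return "text";
            case 0x2: return "binary";
            case 0x8: return "close";
            case 0x9: return "ping";
            case 0xA: return "pong";
            default:  return "reserved";
        }
    }

    public static void main(String[] args) {
        byte first = (byte) 0x81; // FIN set, opcode 1
        System.out.println(isFinal(first) + " " + describe(opcode(first))); // true text
    }
}
```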