又来手把手教大家写代码了,虽然没人看。。。
这是【分布式系统动手实现】系列的第2篇,至于为什么会写这个系列?以及第1篇是啥?
牛逼!自己动手从0实现一个分布式RPC框架,成功拿下阿里offer!
这些项目都是我花了很多个日夜撸出来的,但是阅读量并不高,果然硬核技术文大家不喜欢看。
但是没关系,我还是会坚持写,因为这类文章是有价值的,是真正能帮大家简历加分的项目。
认真看完并且跟着我一起动手实现的小伙伴,相信你们是会有收获的。
Github地址:(欢迎star)
https://github.com/xiajunhust/tinywheel
Websocket介绍
http协议存在的缺陷
- http是半双工协议,同一时刻只能在一个方向上进行数据传送
- 是无状态的,因此客户端想要了解服务端的状态只能轮询,效率低下浪费资源
- http协议消息冗长繁琐,带宽资源利用率低
websocket的优势
HTTP和websocket对比
HTML 5 定义了websocket协议,websocket在2011年成为国际标标准,目前大部分浏览器均已支持。其最大的优势是服务器可以主动向客户端推送信息。其基于TCP进行双向全双工的消息传送,相比http的半双工性能和效率得到很大的提升。
从下图可以看出,http连接,每次客户端和服务端交互都需要建连,服务端返回响应之后连接即断开,下次请求还需要再次建连,非常耗性能。而websocket连接建立后是一直存在的。
websocket的常见应用场景:
- 实时web应用。在客户端展示服务端的实时数据。如交易网站价格信息。
- 游戏应用。
- 聊天应用。
websocket连接的生命周期
websocket连接的完整生命周期可以用如下图来表示:
- CONNECTING-连接中,即建立连接
- OPEN-已建立连接,可以传输数据
- CLOSING-关闭中,关闭连接握手
- CLOSED-已关闭
握手阶段-建立连接
为了在客户端和服务端建立websocket连接,客户端发送一个http协议的握手请求:
GET /chat HTTP/1.1 Host: server.example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== Origin: http://example.com Sec-WebSocket-Protocol: chat, superchat Sec-WebSocket-Version: 13
“Upgrade: websocket”表示这是一个申请协议升级的http请求。服务端解析这些附加的头信息,返回应答信息:
HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo= Sec-WebSocket-Protocol: chat
连接就建立起来了。
数据传输阶段
数据传输的基本单位是帧(frame),一条消息可能由多个帧组成。帧分为数据帧和控制帧。
- 数据帧用来在客户端和服务端之间传输应用数据,数据类型支持3种:文本数据、二进制、用来标识和上一个帧连接的帧。
- 控制帧:用来传输连接本身的元数据信息:关闭帧、ping、pong「心跳」。
数据帧的格式:
opcode用来标识帧的类型:
用netty实现一个websocket聊天室
我们利用netty基于websocket来实现一个基于浏览器的聊天应用程序。
整体逻辑如下:
websocket服务端可以接受多个客户端的连接,提供服务。
netty server:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
/**
* websocket server based netty
*
* @author summer
* @version $Id: SimpleWebsocketServer.java, v 0.1 2022年01月26日 9:34 AM summer Exp $
*/
@Slf4j
public class SimpleWebsocketServer {
/**
* host
*/
public final static String host = "127.0.0.1";
/**
* 端口号
*/
public final static Integer port = 8085;
/**
* netty服务端启动方法
*/
public void start() {
log.info("SimpleWebsocketServer start begin ");
EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(bossEventLoopGroup, workerEventLoopGroup)
.channel(NioServerSocketChannel.class)
//开启tcp nagle算法
.childOption(ChannelOption.TCP_NODELAY, true)
//开启长连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel c) {
c.pipeline().addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(512 * 1024))
.addLast(new ChunkedWriteHandler())
.addLast(new SimpleWebsocketServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
log.info("SimpleWebsocketServer start at port " + port);
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error("SimpleWebsocketServer start exception,", e);
} finally {
log.info("SimpleWebsocketServer shutdown bossEventLoopGroup&workerEventLoopGroup gracefully");
bossEventLoopGroup.shutdownGracefully();
workerEventLoopGroup.shutdownGracefully();
}
}
}
实际处理http请求的handler:
import com.alibaba.fastjson.JSON;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
/**
* websocket服务端处理handler实现
*
* @author summer
* @version $Id: SimpleWebsocketServerHandler.java, v 0.1 2022年01月26日 9:44 AM summer Exp $
*/
@Slf4j
public class SimpleWebsocketServerHandler extends SimpleChannelInboundHandler<Object> {
/**
*websocket shake handler
*/
private WebSocketServerHandshaker handshaker;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
try {
log.info("SimpleWebsocketServerHandler receive msg=" + msg);
if (msg instanceof FullHttpRequest) {
handleHttpShakehandRequest(ctx, (FullHttpRequest)msg);
} else if (msg instanceof WebSocketFrame) {
handleWebsocketFrame(ctx, (WebSocketFrame)msg);
} else {
log.error("SimpleWebsocketServerHandler channelRead0,unkown msg");
}
} catch (Exception e) {
log.error("channelRead0 exception,", e);
}
}
/**
* 处理建连握手请求
*
* @param ctx
* @param fullHttpRequest
*/
private void handleHttpShakehandRequest(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) {
log.info("handleHttpShakehandRequest begin~");
//http请求头合法性检查
if (!fullHttpRequest.getDecoderResult().isSuccess() || !StringUtils.equals("websocket", fullHttpRequest.headers().get("Upgrade"))) {
log.warn("handleHttpShakehandRequest fail,fullHttpRequest illegal,fullHttpRequest=" + fullHttpRequest.toString());
return;
}
//构造握手响应返回
String webSocketURL = SimpleWebsocketServer.host + ":" + SimpleWebsocketServer.port + "/websocket";
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(webSocketURL, null, false);
//实例化一个握手处理handler
handshaker = factory.newHandshaker(fullHttpRequest);
if (handshaker == null) {
log.warn("handleHttpShakehandRequest fail,sendUnsupportedVersionResponse,fullHttpRequest=" + fullHttpRequest.toString());
//不支持
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
log.info("handleHttpShakehandRequest success.");
handshaker.handshake(ctx.channel(), fullHttpRequest);
}
}
/**
* 处理请求帧
*
* @param ctx
* @param webSocketFrame
*/
private void handleWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) {
if (webSocketFrame instanceof CloseWebSocketFrame) {
log.info("handleWebsocketFrame close frame");
//控制帧-关闭
handshaker.close(ctx.channel(), (CloseWebSocketFrame)webSocketFrame.retain());
return;
}
if (webSocketFrame instanceof PingWebSocketFrame) {
log.info("handleWebsocketFrame ping frame");
//控制帧-ping
ctx.channel().write(new PongWebSocketFrame(webSocketFrame.content().retain()));
return;
}
//数据帧,仅支持文本形式
if (!(webSocketFrame instanceof TextWebSocketFrame)) {
log.error("handleWebsocketFrame,unsupprted data frame");
return;
}
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame)webSocketFrame;
//构造响应结果
String request = textWebSocketFrame.text();
log.info("handleWebsocketFrame,receive data frame,text=" + request);
ctx.channel().write(new TextWebSocketFrame(request + "_" + System.currentTimeMillis()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("SimpleWebsocketServerHandler exception,", cause);
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
}
在springboot主程序中启动netty server:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SimplewebsocketserverApplication {
public static void main(String[] args) {
SimpleWebsocketServer simpleWebsocketServer = new SimpleWebsocketServer();
simpleWebsocketServer.start();
SpringApplication.run(SimplewebsocketserverApplication.class, args);
}
}
为了方便测试,我们直接利用js来发起请求,将如下文件保存为websocket.html文件:
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>websocket测试</title>
<style type="text/css">
h3,h4{
text-align:center;
}
</style>
</head>
<body>
<h3>WebSocket测试,在<span style="color:red">控制台</span>查看测试信息输出!</h3>
<h4>
单发消息<li>url=/api/ws/sendOne?message=单发消息内容&id=none</li>
群发消息<li>url=/api/ws/sendAll?message=群发消息内容</li>
</h4>
<script type="text/javascript">
var socket;
if (typeof (WebSocket) == "undefined") {
console.log("遗憾:您的浏览器不支持WebSocket");
} else {
console.log("恭喜:您的浏览器支持WebSocket");
//实现化WebSocket对象
//指定要连接的服务器地址与端口建立连接
//注意ws、wss使用不同的端口。我使用自签名的证书测试,
//无法使用wss,浏览器打开WebSocket时报错
//ws对应http、wss对应https。
socket = new WebSocket("ws://localhost:8085/ws/asset");
//连接打开事件
socket.onopen = function() {
console.log("Socket 已打开");
socket.send("消息发送测试(From Client)");
};
//收到消息事件
socket.onmessage = function(msg) {
console.log(msg);
//获得服务器的消息推送
alert(msg.data)
//console.log(msg.data);
};
//连接关闭事件
socket.onclose = function() {
console.log("Socket已关闭");
};
//发生了错误事件
socket.onerror = function() {
alert("Socket发生了错误");
}
//窗口关闭时,关闭连接
window.unload=function() {
socket.close();
};
}
</script>
</body>
</html>
用支持websocket的浏览器打开此websocket.html文件,可以看到收到了服务端返回的结果:
服务端日志显示正常处理:
23:52:26.559 [nioEventLoopGroup-3-1] INFO com.summer.simplewebsocketserver.SimpleWebsocketServerHandler - SimpleWebsocketServerHandler receive msg=HttpObjectAggregator$AggregatedFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: CompositeByteBuf(ridx: 0, widx: 0, cap: 0, components=0))
GET /ws/asset HTTP/1.1
Host: localhost:8085
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36
Upgrade: websocket
Origin: null
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Sec-WebSocket-Key: 2qmtoAbFKDi3nccZdLJrzQ==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
content-length: 0
23:52:26.559 [nioEventLoopGroup-3-1] INFO com.summer.simplewebsocketserver.SimpleWebsocketServerHandler - handleHttpShakehandRequest begin~
23:52:26.563 [nioEventLoopGroup-3-1] INFO com.summer.simplewebsocketserver.SimpleWebsocketServerHandler - handleHttpShakehandRequest success.
23:52:26.574 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker - [id: 0x0f7a984c, L:/127.0.0.1:8085 - R:/127.0.0.1:50041] WebSocket version V13 server handshake
23:52:26.576 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker - WebSocket version 13 server handshake key: 2qmtoAbFKDi3nccZdLJrzQ==, response: XQR0ok8e59cI934NeuY5LCbiVXY=
23:52:26.645 [nioEventLoopGroup-3-1] INFO com.summer.simplewebsocketserver.SimpleWebsocketServerHandler - SimpleWebsocketServerHandler receive msg=TextWebSocketFrame(data: PooledUnsafeDirectByteBuf(ridx: 0, widx: 31, cap: 31))
23:52:26.646 [nioEventLoopGroup-3-1] INFO com.summer.simplewebsocketserver.SimpleWebsocketServerHandler - handleWebsocketFrame,receive data frame,text=消息发送测试(From Client)
23:52:26.648 [nioEventLoopGroup-3-1] INFO com.summer.simplewebsocketserver.SimpleWebsocketServerHandler - handleWebsocketFrame,send response text success:[这是响应结果[消息发送测试(From Client)]_1643471546646]
这篇文章花了我挺长时间写出来的,看看这些详细的代码实现,你们应该能感受到我想把你教会的诚意了吧~~~
如果觉得有用的话,点赞+分享+收藏,一键三连防止走丢哇~
欢迎关注宫伀号「编程学习指南」获取更多分布式框架实现代码「徒手撸轮子」,完善你的项目经验。
|