IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 通过LiveGo学习WebSocket -> 正文阅读

[网络协议]通过LiveGo学习WebSocket

一直对网络直播、在线互动的前端应用特比感兴趣,偶然间在Gitee上看到了livego的开源代码,是基于golang的直播服务实现。代码量不大,于是抽丝剥茧的分析了一下其设计思路,觉得还蛮有意思的。

所以,用户至少具备两个基本属性,用户的id和关联数据推送的WebSocket连接。除了这两个基本信息外,可根据实际业务需要增加其他信息,如用户名、头像等。上线用户的临时组管理无用户认证、用户信息管理等复杂业务逻辑,也没有数据信息的持久化处理。

对于消息的定义有如下字段:消息类型-MType、消息内容-Data和源用户的头像信息-Img。消息类型的串行化处理采用了json.Marshall。

  1. 1. 直播中的业务处理

  2. 直播是一端数据上传,多端数据分发的过程。数据的上传和转发都要借助长连接,所以在livego中采用了WebSocket。livego是一个简易的开放式聊天服务,在线群组管理做的比较简单:

  3. 1) 为请求连接的用户创建唯一标识,若连接成功则在群组中保留;否则将其移除;

  4. 2) 消息转发过程中,根据数据推送的状态更新群群组状态,如数据推送失败则断开连接,并将用户剔除。

所以,用户至少具备两个基本属性,用户的id和关联数据推送的WebSocket连接。除了这两个基本信息外,可根据实际业务需要增加其他信息,如用户名、头像等。上线用户的临时组管理无用户认证、用户信息管理等复杂业务逻辑,也没有数据信息的持久化处理。

对于消息的定义有如下字段:消息类型-MType、消息内容-Data和源用户的头像信息-Img。消息类型的串行化处理采用了json.Marshall。

2.? WebSocket会话

WebSocket会话直播服务的核心基础构件。在livego中自带了一套websocket的封装,协议字段的定义、ws帧解析参考规范文档,draft-ietf-hybi-thewebsocketprotocol-10。这个文档是可以网上找到的,对照文档解读源码,思路会比较清晰易懂。

从WebSocket客户端请求说起。WebSocket请求处理的第一补是解析ws的url,其格式与http的URL格式类似,不同的是把scheme换成了ws或wss,该scheme的处理策略是创建ws连接。处理如下:

1)?根据URL及origin够构造DiagConfig,该配置参数会用于后续的TCP连接的建立和参数配置1

	// Dial opens a new client connection to a WebSocket.
	func Dial(url_, protocol, origin string) (ws *Conn, err error) {
	? ? config, err := NewConfig(url_, origin)
	? ? if err != nil {
	? ? ? ? return nil, err
	? ? }
	? ? if protocol != "" {
	? ? ? ? config.Protocol = []string{protocol}
	? ? }
	? ? return DialConfig(config)
	}
		? 借助golang的net库,创建TCP连接
	func dialWithDialer(dialer *net.Dialer, config *Config) (conn net.Conn, err error) {
	? ? switch config.Location.Scheme {
	? ? case "ws":
	? ? ? ? conn, err = dialer.Dial("tcp", parseAuthority(config.Location))
	? ? case "wss":
	? ? ? ? conn, err = tls.DialWithDialer(dialer, "tcp", parseAuthority(config.Location), config.TlsConfig)
	? ? default:
	? ? ? ? err = ErrBadScheme
	? ? }
	? ? return
	}

2)借助golang的net库,创建TCP连接

	func dialWithDialer(dialer *net.Dialer, config *Config) (conn net.Conn, err error) {
	? ? switch config.Location.Scheme {
	? ? case "ws":
	? ? ? ? conn, err = dialer.Dial("tcp", parseAuthority(config.Location))
	? ? case "wss":
	? ? ? ? conn, err = tls.DialWithDialer(dialer, "tcp", parseAuthority(config.Location), config.TlsConfig)
	? ? default:
	? ? ? ? err = ErrBadScheme
	? ? }
	? ? return
}

? 3)TCP连接创建成功后,构造WS Client

	// DialConfig opens a new client connection to a WebSocket with a config.
	func DialConfig(config *Config) (ws *Conn, err error) {
	? ? var client net.Conn
	? ? if config.Location == nil {
	? ? ? ? return nil, &DialError{config, ErrBadWebSocketLocation}
	? ? }
	? ? if config.Origin == nil {
	? ? ? ? return nil, &DialError{config, ErrBadWebSocketOrigin}
	? ? }
	? ? dialer := config.Dialer
	? ? if dialer == nil {
	? ? ? ? dialer = &net.Dialer{}
	? ? }
		// client是TCP连接对象
	? ? client, err = dialWithDialer(dialer, config)
	? ? if err != nil {
	? ? ? ? goto Error
	? ? }
		// 基于TCP连接创建ws连接
	? ? ws, err = NewClient(config, client)
	? ? if err != nil {
	? ? ? ? client.Close()
	? ? ? ? goto Error
	? ? }
	? ? return
	Error:
	? ? return nil, &DialError{config, err}
}

4)?rwc是tcp连接对象,并由此构造缓冲读写的对象br,bw。并开始执行WS会话中的关键步骤-握手。握手完成后随即构造WS连接对象。hybiClientHandshake实现是纯HTTP的协议交互,按照HTTP的协议规范构造协议内容,如头部、请求体等,发送给对端,根据对端返回的状态码及必要字段的合法性判断握手过程是否成功。

	// NewClient creates a new WebSocket client connection over rwc.
	func NewClient(config *Config, rwc io.ReadWriteCloser) (ws *Conn, err error) {
	? ? br := bufio.NewReader(rwc)
	? ? bw := bufio.NewWriter(rwc)
	? ? err = hybiClientHandshake(config, br, bw)
	? ? if err != nil {
	? ? ? ? return
	? ? }
	? ? buf := bufio.NewReadWriter(br, bw)
	? ? ws = newHybiClientConn(config, buf, rwc)
	? ? return
	}
		? WS连接创建的过程即为Conn对象的构造和初始化过程,各自字段的含义及备注说明如下:
	// newHybiConn creates a new WebSocket connection speaking hybi draft protocol.
	func newHybiConn(config *Config, buf *bufio.ReadWriter, rwc io.ReadWriteCloser, request *http.Request) *Conn {
	? ? if buf == nil {
	? ? ? ? br := bufio.NewReader(rwc)
	? ? ? ? bw := bufio.NewWriter(rwc)
	? ? ? ? buf = bufio.NewReadWriter(br, bw)
	? ? }
	? ? ws := &Conn{config: config, request: request, buf: buf, rwc: rwc,
	? ? ? ? frameReaderFactory: hybiFrameReaderFactory{buf.Reader},
	? ? ? ? frameWriterFactory: hybiFrameWriterFactory{
	? ? ? ? ? ? buf.Writer, request == nil},
	? ? ? ? PayloadType: ? ? ? ?TextFrame,
	? ? ? ? defaultCloseStatus: closeStatusNormal}
	? ? ws.frameHandler = &hybiFrameHandler{conn: ws}
	? ? return ws
	}
	
	
	// Conn represents a WebSocket connection.
	//
	// Multiple goroutines may invoke methods on a Conn simultaneously.
	type Conn struct {
	? ? config ?*Config           // 连接建立的基础参数配置
	? ? request *http.Request     // 服务端WS处理用,保存对端WS连接的握手请求对象
	? ? buf *bufio.ReadWriter     // TCP连接的缓存读写
	? ? rwc io.ReadWriteCloser    // 关闭连接的接口对象
	
	? ? rio sync.Mutex
	? ? frameReaderFactory        // 工厂类-构造帧读对象
	? ? frameReader               // 读对象记录:每次的读都要构造新的读对象,用于处理连接中的数据。该对象记录用于读取上一次读操作后剩余的数据
	
	? ? wio sync.Mutex
	? ? frameWriterFactory        // 工厂类-构造写对象
	
	? ? frameHandler              // 读操作中的中继处理,如连续读中的状态码记录和转换
	? ? PayloadType ? ? ? ?byte   // 与conn对象绑定,写数据时默认的数据类型
	? ? defaultCloseStatus int
	? ? // MaxPayloadBytes limits the size of frame payload received over Conn
	? ? // by Codec's Receive method. If zero, DefaultMaxPayloadBytes is used.
	? ? MaxPayloadBytes int       // 读数据时最大的有效长度,超长直接报错
	}

5)?连接建立之后,就是WS帧的读写操作。WS帧的结构为:

|------------- Frame Header -----------------|------------------- PlayLoad ----------------|

FrameHeader是对帧状态、长度等的基本信息描述,对PayLoad的读取起决定作用。

先看读操作

	func (ws *Conn) Read(msg []byte) (n int, err error) {
	? ? ws.rio.Lock()
	? ? defer ws.rio.Unlock()
	again:
	? ? if ws.frameReader == nil {
		   // 构造读对象。实现上是从连接中读数据,构造ws帧的描述信息,即Frame Header,包含状态位,帧长度等;
	? ? ? ? frame, err := ws.frameReaderFactory.NewFrameReader()
	? ? ? ? if err != nil {
	? ? ? ? ? ? return 0, err
	? ? ? ? }
	       // 可能是中间读,所以通过handler校正下PayloadType
	? ? ? ? ws.frameReader, err = ws.frameHandler.HandleFrame(frame)
	? ? ? ? if err != nil {
	? ? ? ? ? ? return 0, err
	? ? ? ? }
	? ? ? ? if ws.frameReader == nil {
	? ? ? ? ? ? goto again
	? ? ? ? }
	? ? }
	? ? // 根据FrameHeader描述,读取连接中的PayLoad数据。如果当前的OperateCode是FinishFrame则表示
	         //   整个帧读取结束,返回io.EOF
	? ? n, err = ws.frameReader.Read(msg)
	? ? if err == io.EOF {
	? ? ? ? if trailer := ws.frameReader.TrailerReader(); trailer != nil {
	? ? ? ? ? ? io.Copy(ioutil.Discard, trailer)
	? ? ? ? }
	? ? ? ? ws.frameReader = nil
	? ? ? ? goto again //再未遇到CloseFrame之前,要多次、连续读
	? ? }
	? ? return n, err
	}

6)?写操作与读类似,读是分头解析和数据读取,那么写自然也是分两部分。值得注意是,写操作是一次性将所有数据单帧写入到连接,不做拆分,每一帧都结束帧,不存在中间连续帧的情况。

	// Write implements the io.Writer interface:
	// it writes data as a frame to the WebSocket connection.
	func (ws *Conn) Write(msg []byte) (n int, err error) {
	? ? ws.wio.Lock()
	? ? defer ws.wio.Unlock()
		// 构造FrameHeader对象,完成Header中描述字段的初始化赋值
	? ? w, err := ws.frameWriterFactory.NewFrameWriter(ws.PayloadType)
	? ? if err != nil {
	? ? ? ? return 0, err
	? ? }
		// 写Header和PayLoad数据
	? ? n, err = w.Write(msg)
	? ? w.Close()
	? ? return n, err
	}

至此,WS客户端的WS建立及数据读写过程就说完了。其实WS Client的建立和读写操作对服务同样适用,剩下的是服务端如何处理WS的握手请求并建立WS连接的。首先从http的请求分发开始

1)?服务启动时会注册如下的http请求处理,表示如果是chat请求则走WS的握手过程

	func main() {
	? ? //fmt.Printf("LiveGoServer is ready...\n")
	? ? go func() {
	? ? ? ? http.Handle("/chat", websocket.Handler(pwint)) //这里是将函数对象强转为Handler类型
? ? }()

2)?Handler是实现了http.Handler接口的类型,自然在接口的实现中开始处理对端的WS握手请求

		// Handler是函数类型别名
        type Handler func(*Conn)
		// ServeHTTP implements the http.Handler interface for a WebSocket
		func (h Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
		? ? s := Server{Handler: h, Handshake: checkOrigin}
		? ? s.serveWebSocket(w, req)
        }

3)serveWebSocket是完成握手过程,创建WS连接

		func (s Server) serveWebSocket(w http.ResponseWriter, req *http.Request) {
		? ? rwc, buf, err := w.(http.Hijacker).Hijack()  // 接管http的tcp连接对象
		? ? if err != nil {
		? ? ? ? panic("Hijack failed: " + err.Error())
		? ? }
		? ? // The server should abort the WebSocket connection if it finds
		? ? // the client did not send a handshake that matches with protocol
		? ? // specification.
		? ? defer rwc.Close()  // 直播业务结束后,断开tcp连接
			// 创建WS连接
		? ? conn, err := newServerConn(rwc, buf, req, &s.Config, s.Handshake)
		? ? if err != nil {
		? ? ? ? return
		? ? }
		? ? if conn == nil {
		? ? ? ? panic("unexpected nil conn")
		? ? }
			// 这里类似接口回调,转发到上方的pwint接口中。pwint为阻塞接口,持续的连接读写。
		? ? s.Handler(conn)
}

4)?最后看一下pwint,可以发现是最上层的业务处理:更新临时组、从当前的连接中读后转发给其他连接,完成直播中的多端分发处理。

好了,没写多少文字,大部分都是在分析代码,相信应该知道websocket是怎么回事了,这就是与new WebSocket而后Send、Receive调用相比有意思的地方的,就是可以看到背后的细节。

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-07-03 11:09:04  更:2022-07-03 11:09:09 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/6 9:03:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码