引言
使用WebSocket实现实时的在线聊天室有以下3个技术难点: 1.实时记录每个用户都在线状态,并实时更新在线用户数量 2.实时接收客户端的消息并广播到所有客户端 3.唯一记录和标识每一个客户端
数据结构的定义
客户端结构体定义
type Client struct {
ID string
IpAddress string
IpSource string
UserId interface{}
Socket *websocket.Conn
Send chan []byte
Start time.Time
ExpireTime time.Duration
}
其中Start是记录客户端最后一次给服务端发送消息的时间,ExpirTime则表示是在ExpirTime时间内如果服务端没有接收客户端发送过来的消息就代表对应客户端已经下线。Socket则是用来记录客户端和服务端直接到WebSocket连接。
管理使用在线客户端结构体定义
type ClientManager struct {
Clients map[string]*Client
Broadcast chan []byte
Register chan *Client
UnRegister chan *Client
}
由于在本案例中每个客户端都有自己独立的goroutine,所以利用channel保证多个goroutine之间的正常通信,进而保证每个客户端之间正常的消息通讯
消息体的定义
type WsMessage struct {
Type int `json:"type"`
Data interface{} `json:"data"`
}
Client对应方法的定义
Read方法的定义
func (c *Client) Read() {
defer func() {
_ = c.Socket.Close()
Manager.UnRegister <- c
}()
for {
_, data, err := c.Socket.ReadMessage()
if err != nil {
logger.Error(err.Error())
break
}
var msg WsMessage
err = json.Unmarshal(data, &msg)
if err != nil {
logger.Error(err.Error())
break
}
switch msg.Type {
case 6:
resp, _ := json.Marshal(&WsMessage{Type: 6, Data: "pong"})
c.Start = time.Now()
c.Send <- resp
case 1:
count := len(Manager.Clients)
resp, _ := json.Marshal(&WsMessage{Type: 1, Data: count})
c.Send <- resp
case 2:
_data := ChatRecord()
resp, _ := json.Marshal(&WsMessage{Type: 2, Data: _data})
c.Send <- resp
case 3:
resp, _ := json.Marshal(&WsMessage{Type: 3, Data: msg.Data})
Manager.Broadcast <- resp
case 4:
c.Send <- []byte("回复消息")
}
}
}
该方法用来监听客户端发送过来消息,如果服务端在收到消息后只需给发送方回复就把回复消息写入c.Send 管道中,如果是群发消息则将回复消息写入Manager.Broadcast管道中。如果客户端离线了则将对应离线客户端写入Manager.UnRegister中。
Write 方法的定义
func (c *Client) Write() {
defer func() {
_ = c.Socket.Close()
Manager.UnRegister <- c
}()
for {
select {
case msg, ok := <-c.Send:
if !ok {
err := c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
if err != nil {
logger.Error(err.Error())
return
}
return
}
err := c.Socket.WriteMessage(websocket.TextMessage, msg)
if err != nil {
logger.Error(err.Error())
return
}
}
}
}
当对应客户端的Send管道被写入消息是就会触发Write方法中服务端给该客户端推送消息的操作。如果客户端离线了则将对应离线客户端写入Manager.UnRegister中。
Check方法定义
func (c *Client) Check() {
for {
now := time.Now()
var duration = now.Sub(c.Start)
if duration >= c.ExpireTime {
Manager.UnRegister <- c
break
}
}
}
该方法用于实时监听对应客户端的在线状态是否过期,如果过期则将该客户端写入Manager.UnRegister管道中。
ClientManager 对应方法的定义
Start方法的定义
func (manager *ClientManager) Start() {
for {
select {
case conn := <-Manager.Register:
Manager.Clients[conn.ID] = conn
count := len(Manager.Clients)
Manager.InitSend(conn, count)
}
}
}
在每个客户端与服务端建立WebSocket连接后,Start函数则会记录下当前客户端的所有信息,并给客户端推送在线人数、历史消息记录消息。
InitSend方法定义
func (manager *ClientManager) InitSend(cur *Client, count int) {
resp, _ := json.Marshal(&WsMessage{Type: 1, Data: count})
Manager.Broadcast <- resp
_data := YouChatHistoryList()
resp, _ = json.Marshal(&WsMessage{Type: 2, Data: _data})
cur.Send <- resp
}
该方法用于给新上线的用户发送初始消息推送
BroadcastSend方法定义
func (manager *ClientManager) BroadcastSend() {
for {
select {
case msg := <-Manager.Broadcast:
for _, conn := range Manager.Clients {
conn.Send <- msg
}
}
}
}
每当管道Manager.Broadcast 接收到群发消息,就会将该消息写入到每个客户端的Send管道中,从而触发给每个客户端推送该消息的操作。
Quit 方法定义
func (manager *ClientManager) Quit() {
for {
select {
case conn := <-Manager.UnRegister:
delete(Manager.Clients, conn.ID)
resp, _ := json.Marshal(&WsMessage{Type: 1, Data: len(Manager.Clients)})
manager.Broadcast <- resp
}
}
}
每当管道Manager.UnRegister中被写入离线客户端,该客户端都会从Manager的在线客户端表中被删除,并将更新后的在线人数消息写入管道manager.Broadcast中以此告知所有在线客户端当前在线的用户数量。
Gin 对应的处理函数
func (mw *MyWebSocket) WebSocketHandle(ctx *gin.Context) {
conn, err := (&websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}).Upgrade(ctx.Writer, ctx.Request, nil)
if err != nil {
http.NotFound(ctx.Writer, ctx.Request)
logger.Error(err.Error())
return
}
_session, _ := Store.Get(ctx.Request, "CurUser")
userid := _session.Values["a_userid"]
ip := ctx.ClientIP()
addr, err := common.GetIpAddressAndSource(ip)
if err != nil {
http.NotFound(ctx.Writer, ctx.Request)
logger.Error(err.Error())
return
}
ua := ctx.GetHeader("User-Agent")
id := ip + ua
idMd5 := fmt.Sprintf("%x", md5.Sum([]byte(id)))
client := &Client{
ID: idMd5,
Socket: conn, Send: make(chan []byte),
IpAddress: ip,
IpSource: addr.Data.Province,
UserId: userid,
Start: time.Now(),
ExpireTime: time.Minute * 1,
}
Manager.Register <- client
go client.Read()
go client.Write()
go client.Check()
}
启用
在你的web服务启动函数中以goroutine的方式分别调用ManagerClient的Start、Quit、BroadcastSend函数。
最终效果
|