目录结构:
websocket
├── mq
│ ├── redis.go
│ └── redis_test.go
└── main.go
main.go
package main
import (
"websocket/mq"
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
"log"
"net/http"
"time"
)
// upGrader upgrades incoming HTTP requests to websocket connections.
// Its CheckOrigin callback always returns true, so the Origin header is
// never used to reject a request.
var upGrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}
// WebsocketDemo is the gin handler that upgrades the request to a websocket
// connection and then hands the connection to MsgHandler.
//
// The connection is closed when MsgHandler returns. Upgrade and close
// failures are logged and affect only this request; they never terminate
// the server process.
func WebsocketDemo(c *gin.Context) {
	ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		log.Printf("websocket upgrade error: %v", err)
		return
	}
	defer func() {
		// A close error concerns a single connection only, so it is logged
		// rather than treated as fatal.
		if cerr := ws.Close(); cerr != nil {
			log.Printf("websocket close error: %v", cerr)
		}
	}()

	MsgHandler(c, ws)
}
// MsgHandler forwards messages received from the Redis channel mq.PublishKey
// to the websocket connection ws. Each message is sent as one text frame in
// the form "[ws][<timestamp>]:<payload>".
//
// It loops until subscribing or writing fails; the error is then logged and
// the function returns, leaving cleanup of ws to the caller.
func MsgHandler(c *gin.Context, ws *websocket.Conn) {
	for {
		msg, err := mq.Subscribe(c, mq.PublishKey)
		if err != nil {
			// Stop here instead of writing an empty payload and retrying
			// immediately in a tight loop.
			logrus.Errorf("Subscribe error: %s", err.Error())
			return
		}

		tm := time.Now().Format("2006-01-02 15:04:05")
		m := fmt.Sprintf("[ws][%s]:%s", tm, msg)
		if err := ws.WriteMessage(websocket.TextMessage, []byte(m)); err != nil {
			// A failed write only concerns this connection; it must not
			// bring down the whole process.
			logrus.Errorf("websocket write error: %s", err.Error())
			return
		}
	}
}
// main registers WebsocketDemo as the GET handler on the root path of a
// default gin engine and serves on port 8080. The process exits via
// log.Fatalln if the server returns an error.
func main() {
	engine := gin.Default()
	engine.GET("", WebsocketDemo)

	if err := engine.Run(":8080"); err != nil {
		log.Fatalln(err)
	}
}
redis.go
package mq
import (
"context"
"github.com/go-redis/redis/v8"
"github.com/sirupsen/logrus"
)
// Rdb is the package-level Redis client. It is assigned in this package's
// init function and used by Publish and Subscribe.
var Rdb *redis.Client
const (
// PublishKey is the Redis channel name that the websocket handler
// subscribes to and that TestPublish publishes on.
PublishKey = "websocket"
)
// init creates the package-level client Rdb, configured for a Redis server
// at localhost:6379 with an empty password and database 0.
func init() {
	opts := redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	}
	Rdb = redis.NewClient(&opts)
}
// Publish sends payload to the given Redis channel through Rdb.
// A failure is logged and returned to the caller; nil is returned on success.
func Publish(ctx context.Context, channel string, payload string) error {
	logrus.Debugf("[Redis] publish [%s]: %s", channel, payload)

	if pubErr := Rdb.Publish(ctx, channel, payload).Err(); pubErr != nil {
		logrus.Errorf("[Redis] pulish error: %s", pubErr.Error())
		return pubErr
	}
	return nil
}
// Subscribe opens a subscription on channel through Rdb, waits for a single
// message with ReceiveMessage, and returns that message's payload.
//
// The subscription is created and closed within each call. On error the
// returned string is empty and the error is both logged and returned.
func Subscribe(ctx context.Context, channel string) (string, error) {
	logrus.Debugf("[Redis] subscribe [%s]", channel)

	sub := Rdb.Subscribe(ctx, channel)
	// Close the subscription on every return path; callers invoke Subscribe
	// in a loop, so an unclosed PubSub would accumulate per call.
	defer func() {
		if cerr := sub.Close(); cerr != nil {
			logrus.Errorf("[Redis] close subscription [%s]: %s", channel, cerr.Error())
		}
	}()

	msg, err := sub.ReceiveMessage(ctx)
	if err != nil {
		logrus.Errorf("[Redis] subscribe [%s]: %s", channel, err.Error())
		return "", err
	}

	logrus.Debugf("[Redis] subscribe [%s]: %s", channel, msg.String())
	return msg.Payload, nil
}
redis_test.go
package mq
import (
"context"
"github.com/sirupsen/logrus"
"testing"
"time"
)
// ctx is the background context shared by the tests in this package.
var ctx = context.Background()
// TestPublish publishes a message containing the current time on PublishKey
// and fails the test if Publish returns an error.
//
// It uses the package-level client, so it needs the Redis server configured
// in this package's init function to be reachable.
func TestPublish(t *testing.T) {
	msg := "当前时间: " + time.Now().Format("15:04:05")
	logrus.Debugf("[publish] msg: %s", msg)

	// Report the failure through t so the test actually fails, instead of
	// only writing a log line and passing.
	if err := Publish(ctx, PublishKey, msg); err != nil {
		t.Fatalf("publish error: %s", err.Error())
	}
}
使用说明:
启动 main.go,并在本地启动 Redis,确保端口为 6379 且没有设置密码;若与此不一致,需要修改 redis.go 中的连接配置。然后在浏览器中搜索“websocket 在线测试”,选择一个支持内网测试的工具,比如:http://websocket.jsonin.com/ ,如图:
|