Leaf的模块机制
一个Leaf开发的游戏服务器由多个模块组成,模块有一下特点: 1、每个模块运行在一个单独的goroutine中 2、模块间通过一套轻量的RPC机制通讯(leaf/charpc) 游戏服务器在启动时进行模块的注册,例如:
leaf.Run(
game.Module,
gate.Module,
login.Module,
)
这里按顺序注册了game、gate、login三个模块。每个模块都需要实现接口:
type Module interface {
OnInit()
OnDestroy()
Run(closeSig chan bool)
}
login模块的module:
var (
skeleton = base.NewSkeleton()
ChanRPC = skeleton.ChanRPCServer
)
type Module struct {
*module.Skeleton
}
func (m *Module) OnInit() {
m.Skeleton = skeleton
}
func (m *Module) OnDestroy() {
log.Release("login module destroy")
}
协议源码分析
一个请求协议设置和处理的流程。 leaf框架简单服务器:https://github.com/name5566/leaf/blob/master/TUTORIAL_ZH.md 按照链接的例子分析。 当我们定义一个协议的时候先要注册到信息处理器。
package msg
import (
"github.com/name5566/leaf/network/json"
)
var Processor = json.NewProcessor()
func init() {
Processor.Register(&Hello{})
}
type Hello struct {
Name string
}
客服端发送到游戏服务器的消息需要通过gate模块,简而言之,gate模块决定了某个消息具体交给内部的哪个模块来处理。这里我们将Hello消息路由到game模块中。
package gate
import (
"server/game"
"server/msg"
)
func init() {
msg.Processor.SetRouter(&msg.Hello{}, game.ChanRPC)
}
消息路由到了game模块,那么就需要有一个处理器或者说方法来接受这个协议。
package internal
import (
"github.com/name5566/leaf/log"
"github.com/name5566/leaf/gate"
"reflect"
"server/msg"
)
func init() {
handler(&msg.Hello{}, handleHello)
}
func handler(m interface{}, h interface{}) {
skeleton.RegisterChanRPC(reflect.TypeOf(m), h)
}
func handleHello(args []interface{}) {
m := args[0].(*msg.Hello)
a := args[1].(gate.Agent)
log.Debug("hello %v", m.Name)
a.WriteMsg(&msg.Hello{
Name: "client",
})
}
客户端代码:
package main
import (
"encoding/binary"
"net"
)
func main() {
conn, err := net.Dial("tcp", "127.0.0.1:3563")
if err != nil {
panic(err)
}
data := []byte(`{
"Hello": {
"Name": "leaf"
}
}`)
m := make([]byte, 2+len(data))
binary.BigEndian.PutUint16(m, uint16(len(data)))
copy(m[2:], data)
conn.Write(m)
}
通过源码分析一波整个流程。 设置消息到消息处理器
Processor.Register(&Hello{})
func (p *Processor) Register(msg interface{}) string {
msgType := reflect.TypeOf(msg)
if msgType == nil || msgType.Kind() != reflect.Ptr {
log.Fatal("json message pointer required")
}
msgID := msgType.Elem().Name()
if msgID == "" {
log.Fatal("unnamed json message")
}
if _, ok := p.msgInfo[msgID]; ok {
log.Fatal("message %v is already registered", msgID)
}
i := new(MsgInfo)
i.msgType = msgType
p.msgInfo[msgID] = i
return msgID
}
设置路由信息
msg.Processor.SetRouter(&msg.Hello{}, game.ChanRPC)
func (p *Processor) SetRouter(msg interface{}, msgRouter *chanrpc.Server) {
msgType := reflect.TypeOf(msg)
if msgType == nil || msgType.Kind() != reflect.Ptr {
log.Fatal("json message pointer required")
}
msgID := msgType.Elem().Name()
i, ok := p.msgInfo[msgID]
if !ok {
log.Fatal("message %v not registered", msgID)
}
i.msgRouter = msgRouter
}
设置协议对应的处理方法
handler(&msg.Hello{}, handleHello)
func (s *Skeleton) RegisterChanRPC(id interface{}, f interface{}) {
if s.ChanRPCServer == nil {
panic("invalid ChanRPCServer")
}
s.server.Register(id, f)
}
func (s *Server) Register(id interface{}, f interface{}) {
switch f.(type) {
case func([]interface{}):
case func([]interface{}) interface{}:
case func([]interface{}) []interface{}:
default:
panic(fmt.Sprintf("function id %v: definition of function is invalid", id))
}
if _, ok := s.functions[id]; ok {
panic(fmt.Sprintf("function id %v: already registered", id))
}
s.functions[id] = f
}
type Server struct {
functions map[interface{}]interface{}
ChanCall chan *CallInfo
}
服务器接收客服端的链接
func (server *TCPServer) run() {
...
for {
conn, err := server.ln.Accept()
...
tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser)
agent := server.NewAgent(tcpConn)
...
}
请求协议的接收
func (a *agent) Run() {
for {
data, err := a.conn.ReadMsg()
if err != nil {
log.Debug("read message: %v", err)
break
}
if a.gate.Processor != nil {
msg, err := a.gate.Processor.Unmarshal(data)
if err != nil {
log.Debug("unmarshal message error: %v", err)
break
}
err = a.gate.Processor.Route(msg, a)
if err != nil {
log.Debug("route message error: %v", err)
break
}
}
}
}
func (p *Processor) Route(msg interface{}, userData interface{}) error {
if msgRaw, ok := msg.(MsgRaw); ok {
i, ok := p.msgInfo[msgRaw.msgID]
if !ok {
return fmt.Errorf("message %v not registered", msgRaw.msgID)
}
if i.msgRawHandler != nil {
i.msgRawHandler([]interface{}{msgRaw.msgID, msgRaw.msgRawData, userData})
}
return nil
}
msgType := reflect.TypeOf(msg)
if msgType == nil || msgType.Kind() != reflect.Ptr {
return errors.New("json message pointer required")
}
msgID := msgType.Elem().Name()
i, ok := p.msgInfo[msgID]
if !ok {
return fmt.Errorf("message %v not registered", msgID)
}
if i.msgHandler != nil {
i.msgHandler([]interface{}{msg, userData})
}
if i.msgRouter != nil {
i.msgRouter.Go(msgType, msg, userData)
}
return nil
}
func (s *Server) Go(id interface{}, args ...interface{}) {
f := s.functions[id]
if f == nil {
return
}
defer func() {
recover()
}()
s.ChanCall <- &CallInfo{
f: f,
args: args,
}
}
方法的调用
func (s *Skeleton) Run(closeSig chan bool) {
for {
select {
case <-closeSig:
s.commandServer.Close()
s.server.Close()
for !s.g.Idle() || !s.client.Idle() {
s.g.Close()
s.client.Close()
}
return
case ri := <-s.client.ChanAsynRet:
s.client.Cb(ri)
case ci := <-s.server.ChanCall:
s.server.Exec(ci)
case ci := <-s.commandServer.ChanCall:
s.commandServer.Exec(ci)
case cb := <-s.g.ChanCb:
s.g.Cb(cb)
case t := <-s.dispatcher.ChanTimer:
t.Cb()
}
}
}
上面这个函数最后就调用了handleHello()这个我们自己写的业务逻辑方法了,然后将返回值存入到chanRet中,等待其他协程消费返回给客户端。整个请求过程就结束了。
|