网络模块设计
对外接口
// Public surface of the network module. Bodies are intentionally empty in this outline.
public static class NetManager {
// Connects to the server at ip:port.
public static void Connect(string ip, int port) {}
// Closes the current connection.
public static void Close() {}
// Sends one protocol message.
public static void Send(MsgBase msg) {}
// Per-frame pump, to be called by the host loop.
public static void Update() {}
// Registers a callback for the protocol named msgName.
public static void AddMsgListener(string msgName, MsgListener listener) {}
// Registers a callback for a connection event.
public static void AddEventListener(NetEvent netEvent, EventListener listener) {}
// Connection events that listeners can subscribe to.
public enum NetEvent {
ConnectSucc = 1,
ConnectFail = 2,
Close = 3,
}
}
内部设计
// Internal outline: received messages are queued in msgList and drained from Update.
public static class NetManager {
// Messages waiting to be dispatched.
static List<MsgBase> msgList = new List<MsgBase>();
// Per-frame entry point: message dispatch first, then the heartbeat check.
public static void Update() {
MsgUpdate();
PingUpdate();
}
}
framework/
NetManager.cs
ByteArray.cs
MsgBase.cs
proto/
BattleMsg
SysMsg
......
// Core state of the network manager.
public static class NetManager {
// The client socket.
static Socket socket;
// Receive buffer.
static ByteArray readBuff;
// Outgoing buffers waiting to be sent, in order.
static Queue<ByteArray> writeQueue;
}
网络事件
事件类型
// Connection events that listeners can subscribe to.
public enum NetEvent {
// The asynchronous connect completed successfully.
ConnectSucc = 1,
// The asynchronous connect failed.
ConnectFail = 2,
// The connection was closed.
Close = 3,
}
监听列表
// Callback signature for connection events; err is the text passed to FireEvent.
public delegate void EventListener(String err);
// Registered callbacks: one multicast delegate per event type.
private static Dictionary<NetEvent, EventListener> eventListeners = new Dictionary<NetEvent, EventListener>();
// Subscribes listener to netEvent, chaining it after any callbacks already registered.
public static void AddEventListener(NetEvent netEvent, EventListener listener) {
    EventListener registered;
    eventListeners.TryGetValue(netEvent, out registered);
    // registered is null for a new key; null + listener yields listener itself.
    eventListeners[netEvent] = registered + listener;
}
// Unsubscribes listener from netEvent. Does nothing when the event has no listeners.
public static void RemoveEventListener(NetEvent netEvent, EventListener listener){
    EventListener current;
    if (!eventListeners.TryGetValue(netEvent, out current)) {
        return;
    }
    current -= listener;
    if (current == null) {
        // Removing the last listener yields a null delegate. Drop the key so a
        // later dispatch does not try to invoke null.
        eventListeners.Remove(netEvent);
    } else {
        eventListeners[netEvent] = current;
    }
}
分发事件
// Invokes every listener registered for netEvent, passing err through.
private static void FireEvent(NetEvent netEvent, String err){
    EventListener handler;
    // The stored delegate can be null after the last listener was removed with
    // "-=", so check the value as well as the key.
    if (eventListeners.TryGetValue(netEvent, out handler) && handler != null) {
        handler(err);
    }
}
连接服务端
Connect
// True from the BeginConnect call until ConnectCallback has finished.
static bool isConnecting = false;
// Starts an asynchronous TCP connect to ip:port.
// Refuses (with a console message) when already connected or when a connect is in flight.
public static void Connect(string ip, int port) {
    bool alreadyConnected = socket != null && socket.Connected;
    if (alreadyConnected) {
        Console.WriteLine("Connect fail, already connected!");
        return;
    }
    if (isConnecting) {
        Console.WriteLine("Connect fail, isConnecting");
        return;
    }
    // Fresh socket and buffers for this attempt.
    InitState();
    socket.NoDelay = true;
    isConnecting = true;
    socket.BeginConnect(ip, port, ConnectCallback, socket);
}
// Resets all per-connection state; called by Connect before each attempt.
private static void InitState(){
// New IPv4 TCP stream socket.
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
// Empty receive buffer.
readBuff = new ByteArray();
// Empty send queue.
writeQueue = new Queue<ByteArray>();
isConnecting = false;
}
ConnectCallback
// Completion callback for BeginConnect.
// Clears isConnecting before notifying listeners, so a listener that reacts to the
// event (for example by sending or reconnecting) is not rejected as "still connecting".
private static void ConnectCallback(IAsyncResult ar) {
    Socket connectingSocket = (Socket) ar.AsyncState;
    try{
        connectingSocket.EndConnect(ar);
    }catch (SocketException ex){
        isConnecting = false;
        Console.WriteLine("Socket Connect fail " + ex.ToString());
        FireEvent(NetEvent.ConnectFail, ex.ToString());
        return;
    }
    isConnecting = false;
    Console.WriteLine("Socket Connect Succ ");
    FireEvent(NetEvent.ConnectSucc,"");
}
测试
// Console test driver for the connect flow: type "connect" to connect, "quit" to exit.
public class Client {
    // Registers the connection-event callbacks. Must run before Connect so the events are observed.
    public static void Start() {
        NetManager.AddEventListener(NetManager.NetEvent.ConnectSucc, OnConnectSucc);
        NetManager.AddEventListener(NetManager.NetEvent.ConnectFail, OnConnectFail);
        NetManager.AddEventListener(NetManager.NetEvent.Close, OnConnectClose);
    }
    // Connects to the local test server.
    public static void Connect() {
        NetManager.Connect("127.0.0.1", 8888);
    }
    public static void OnConnectSucc(string err){
        Console.WriteLine("OnConnectSucc");
    }
    public static void OnConnectFail(string err){
        Console.WriteLine("OnConnectFail " + err);
    }
    public static void OnConnectClose(string err){
        Console.WriteLine("OnConnectClose");
    }
    public static void Main(string[] args) {
        // Without this the listeners above are never registered.
        Start();
        while(true) {
            string sendStr = Console.ReadLine();
            // ReadLine returns null at end of input; treat it like "quit".
            if(sendStr == null)
                break;
            if(sendStr.Length==0)
                continue;
            else if(sendStr=="quit")
                break;
            else if(sendStr=="connect")
                Connect();
        }
    }
}
关闭连接
isClosing
// True once Close has been requested while data is still queued for sending.
static bool isClosing = false;
// Fragment: only the line added to InitState is shown; "......" stands for the existing body.
private static void InitState() {
......
isClosing = false;
}
Close
// Closes the connection.
// Does nothing when not connected or while a connect is in flight. If data is still
// queued, only marks the connection as closing; otherwise closes the socket now and
// fires the Close event.
public static void Close(){
    bool connected = socket != null && socket.Connected;
    if (!connected || isConnecting) {
        return;
    }
    if (writeQueue.Count == 0) {
        socket.Close();
        FireEvent(NetEvent.Close, "");
        return;
    }
    // Pending sends exist: defer the actual close.
    isClosing = true;
}
测试
// Test helper: forwards to NetManager.Close.
public static void Close() {
    NetManager.Close();
}
// Console loop: "close" closes the connection, "quit" exits, empty lines are ignored.
public static void Main(string[] args) {
    while (true) {
        string line = Console.ReadLine();
        if (line.Length == 0) {
            continue;
        }
        if (line == "quit") {
            break;
        }
        if (line == "close") {
            Close();
        }
    }
}
Json协议
协议类
协议类的核心功能,将协议对象转换为二进制数据(编码)和将二进制数据转换为协议对象(解码)。 Json协议,将协议对象转换为类似{“x”:100,“y”:200,“z”:300}的字符串,和将相应字符串转换为协议对象。
使用JsonUtility
// Plain data class used to demonstrate JSON serialization.
public class MsgMove {
public int x = 0;
public int y = 0;
public int z = 0;
}
// Serialize a MsgMove to JSON and print it.
MsgMove msgMove = new MsgMove();
msgMove.x = 100;
msgMove.y = -20;
string s = JsonUtility.ToJson(msgMove);
Console.WriteLine(s);
// Plain data class with one string field, used to demonstrate JSON deserialization.
public class MsgAttack{
public string desc = "127.0.0.1:6543";
}
// Deserialize a JSON string into a MsgAttack and print its field.
string s = "{\"desc\":\"127.0.0.1:1289\"}";
// FromJson(string, Type) returns object, so a cast is needed.
MsgAttack msgAttack = (MsgAttack)JsonUtility.FromJson(s, Type.GetType("MsgAttack"));
Console.WriteLine(msgAttack.desc);
// Overwrite an existing object from JSON. Here "x" is given as a string although the
// field is an int, to show what happens on a type mismatch.
string s = "{\"x\":\"hehe\"}";
MsgMove msgMove = new MsgMove();
JsonUtility.FromJsonOverwrite(s, msgMove);
Console.WriteLine(msgMove.x);
协议格式
//消息格式:消息长度(2字节,值为16 = 2 + 7 + 7)+协议名长度(2字节,值为7)+协议名(MsgMove,7字节)+协议体({"x":1},7字节)
协议文件
定义MsgBase类为了实现处理消息的统一接口(OnMove(MsgBase msgBase)),方便实现Send方法(send(MsgBase msgBase))。
// Base class of every protocol message; protoName identifies the concrete protocol.
public class MsgBase {
public string protoName = "";
}
// Movement message: three integer coordinates.
public class MsgMove:MsgBase {
// Each subclass sets protoName to its own class name.
public MsgMove() {protoName = "MsgMove";}
public int x = 0;
public int y = 0;
public int z = 0;
}
// Attack message carrying a description string.
public class MsgAttack:MsgBase {
public MsgAttack() {protoName = "MsgAttack";}
public string desc = "127.0.0.1:6543";
}
协议体的编码解码
using System;
using UnityEngine;
// Protocol base class with JSON body encoding/decoding (Unity JsonUtility).
public class MsgBase{
    public string protoName = "";

    // Serializes msgBase to JSON and returns the UTF-8 bytes.
    public static byte[] Encode(MsgBase msgBase){
        return System.Text.Encoding.UTF8.GetBytes(JsonUtility.ToJson(msgBase));
    }

    // Decodes count bytes starting at offset as UTF-8 JSON and builds the message
    // whose type is named by protoName.
    public static MsgBase Decode(string protoName, byte[] bytes, int offset, int count){
        string json = System.Text.Encoding.UTF8.GetString(bytes, offset, count);
        Type msgType = Type.GetType(protoName);
        return (MsgBase)JsonUtility.FromJson(json, msgType);
    }
}
// Calling the encode function: build a message, encode it, print the resulting JSON text.
MsgMove msgMove = new MsgMove();
msgMove.x = 100;
msgMove.y = -20;
byte[] bytes = MsgBase.Encode(msgMove);
string s = System.Text.Encoding.UTF8.GetString(bytes);
Console.WriteLine(s);
// Calling the decode function: turn JSON bytes back into a MsgMove and print its fields.
string s = "{\"protoName\":\"MsgMove\",\"x\":100,\"y\":-20,\"z\":0}";
byte[] bytes = System.Text.Encoding.UTF8.GetBytes(s);
MsgMove m = (MsgMove)MsgBase.Decode("MsgMove", bytes, 0, bytes.Length);
Console.WriteLine(m.x);
Console.WriteLine(m.y);
Console.WriteLine(m.z);
协议名的编码解码
using System;
using UnityEngine;
// Protocol-name encoding/decoding. Wire format: 2-byte little-endian length, then the UTF-8 name.
public class MsgBase{
    // Returns the length-prefixed UTF-8 encoding of msgBase.protoName.
    // Throws ArgumentException when the name does not fit the 2-byte length prefix.
    public static byte[] EncodeName(MsgBase msgBase){
        byte[] nameBytes = System.Text.Encoding.UTF8.GetBytes(msgBase.protoName);
        int len = nameBytes.Length;
        if(len > UInt16.MaxValue){
            throw new ArgumentException("protoName is too long for the 2-byte length prefix");
        }
        byte[] bytes = new byte[2+len];
        // Little-endian: low byte first.
        bytes[0] = (byte)(len%256);
        bytes[1] = (byte)(len/256);
        Array.Copy(nameBytes, 0, bytes, 2, len);
        return bytes;
    }

    // Reads a length-prefixed name starting at offset.
    // On success returns the name and sets count to the bytes consumed (2 + name length).
    // Returns "" with count == 0 when the data is too short.
    public static string DecodeName(byte[] bytes, int offset, out int count){
        count = 0;
        if(offset < 0 || offset + 2 > bytes.Length){
            return "";
        }
        // Read the prefix as unsigned. A signed Int16 turns lengths above 32767
        // negative, which slips past the bounds check below.
        int len = (bytes[offset+1] << 8) | bytes[offset];
        if(offset + 2 + len > bytes.Length){
            return "";
        }
        count = 2+len;
        return System.Text.Encoding.UTF8.GetString(bytes, offset+2, len);
    }
}
// Calling the name codec: encode a protocol name, decode it again, print the name and the bytes consumed.
MsgMove msgMove = new MsgMove();
byte[] bs = MsgBase.EncodeName(msgMove);
int count;
string name = MsgBase.DecodeName(bs, 0, out count);
Console.WriteLine(name);
Console.WriteLine(count);
发送数据
Send
public static class NetManager {
    // Frames msg as [2-byte little-endian length][encoded name][encoded body] and queues it.
    // Silently drops the message when not connected, still connecting, or closing.
    // Starts the asynchronous send only when the queue was empty; otherwise the
    // running send chain picks the new entry up.
    public static void Send(MsgBase msg) {
        bool connected = socket != null && socket.Connected;
        if (!connected || isConnecting || isClosing) {
            return;
        }

        byte[] nameBytes = MsgBase.EncodeName(msg);
        byte[] bodyBytes = MsgBase.Encode(msg);
        int payloadLen = nameBytes.Length + bodyBytes.Length;

        byte[] frame = new byte[2 + payloadLen];
        frame[0] = (byte)(payloadLen % 256);
        frame[1] = (byte)(payloadLen / 256);
        nameBytes.CopyTo(frame, 2);
        bodyBytes.CopyTo(frame, 2 + nameBytes.Length);

        ByteArray pending = new ByteArray(frame);
        lock (writeQueue) {
            writeQueue.Enqueue(pending);
            bool queueWasEmpty = writeQueue.Count == 1;
            if (queueWasEmpty) {
                socket.BeginSend(frame, 0, frame.Length, 0, SendCallback, socket);
            }
        }
    }
}
SendCallback
public static class NetManager {
// Completion callback for BeginSend. Advances the head buffer by the number of bytes
// sent, then either resends the remainder, moves on to the next queued buffer, or
// closes the socket when a close was deferred.
public static void SendCallback(IAsyncResult ar) {
Socket socket = (Socket) ar.AsyncState;
if(socket == null || !socket.Connected){
return;
}
// NOTE(review): EndSend can throw (e.g. SocketException); nothing here catches it.
int count = socket.EndSend(ar);
ByteArray ba;
lock(writeQueue) {
// The head is the buffer whose send just completed.
ba = writeQueue.First();
}
ba.readIdx += count;
if(ba.length == 0){
// Head fully sent: drop it and continue with the next buffer, if any.
lock(writeQueue){
writeQueue.Dequeue();
if(writeQueue.Count>0) {
ba = writeQueue.First();
socket.BeginSend(ba.bytes, ba.readIdx, ba.length, 0, SendCallback, socket);
} else if(isClosing) {
// Queue drained and Close was requested earlier: close now.
// NOTE(review): unlike Close(), this path does not fire NetEvent.Close.
socket.Close();
}
}
} else {
// Partial send: send the rest of the same buffer.
socket.BeginSend(ba.bytes, ba.readIdx, ba.length, 0, SendCallback, socket);
}
}
}
测试
// Test helper: forwards to NetManager.Send.
public static void Send(MsgBase msg) {
    NetManager.Send(msg);
}
// Console loop: "send" sends a fixed MsgMove, "quit" exits, anything else is ignored.
public static void Main(string[] args) {
    while (true) {
        string line = Console.ReadLine();
        if (line.Length == 0) {
            continue;
        }
        if (line == "quit") {
            break;
        }
        if (line != "send") {
            continue;
        }
        MsgMove msg = new MsgMove { x = 120, y = 123, z = -6 };
        NetManager.Send(msg);
    }
}
消息事件
给不同的协议添加不同的回调方法。
// Callback signature for received protocol messages.
public delegate void MsgListener(MsgBase msgBase);
// Registered callbacks: one multicast delegate per protocol name.
private static Dictionary<string, MsgListener> msgListeners = new Dictionary<string, MsgListener>();
// Subscribes listener to the protocol msgName, chaining it after existing callbacks.
public static void AddMsgListener(string msgName, MsgListener listener) {
    MsgListener registered;
    msgListeners.TryGetValue(msgName, out registered);
    // registered is null for a new key; null + listener yields listener itself.
    msgListeners[msgName] = registered + listener;
}
// Unsubscribes listener from the protocol msgName. Does nothing when none are registered.
public static void RemoveMsgListener(string msgName, MsgListener listener){
    MsgListener current;
    if (!msgListeners.TryGetValue(msgName, out current)) {
        return;
    }
    current -= listener;
    if (current == null) {
        // Removing the last listener yields a null delegate. Drop the key so a
        // later dispatch does not try to invoke null.
        msgListeners.Remove(msgName);
    } else {
        msgListeners[msgName] = current;
    }
}
// Invokes every listener registered for msgName with the received message.
private static void FireMsg(string msgName, MsgBase msgBase) {
    MsgListener handler;
    // The stored delegate can be null after the last listener was removed with
    // "-=", so check the value as well as the key.
    if (msgListeners.TryGetValue(msgName, out handler) && handler != null) {
        handler(msgBase);
    }
}
// Usage: register a handler for MsgMove.
NetManager.AddMsgListener("MsgMove", OnMsgMove);
// Handler: downcasts to the concrete message type and prints its fields.
public void OnMsgMove (MsgBase msgBase) {
MsgMove msg = (MsgMove)msgBase;
Console.WriteLine("OnMsgMove msg.x = " + msg.x);
Console.WriteLine("OnMsgMove msg.y = " + msg.y);
Console.WriteLine("OnMsgMove msg.z = " + msg.z);
}
接收数据
回调函数ReceiveCallback将消息存放到消息队列msgList中,主线程Update读取消息队列,一条条处理。
- (1)每次Update处理多条数据,每一帧执行一次Update,处理多条数据。
- (2)添加粘包半包、大小端判断等处理。
- (3)使用Json协议。
新成员
// Received messages waiting to be dispatched by Update.
static List<MsgBase> msgList = new List<MsgBase>();
// Number of queued messages, kept alongside msgList.
static int msgCount = 0;
// Maximum number of messages dispatched per Update call.
readonly static int MAX_MESSAGE_FIRE = 10;
// Fragment: only the lines added to InitState are shown.
private static void InitState(){
msgList = new List<MsgBase>();
msgCount = 0;
}
ConnectCallback
// Fragment: only the line added to ConnectCallback is shown. It starts the first asynchronous receive.
private static void ConnectCallback(IAsyncResult ar) {
try {
socket.BeginReceive(readBuff.bytes, readBuff.writeIdx, readBuff.remain, 0, ReceiveCallback, socket);
} catch (SocketException ex) {
// NOTE(review): the failure handling is elided in this fragment; as written the exception is swallowed.
}
}
ReceiveCallback
// Completion callback for BeginReceive: accounts for the received bytes, parses any
// complete messages, then starts the next receive.
public static void ReceiveCallback(IAsyncResult ar){
    try {
        Socket socket = (Socket) ar.AsyncState;
        int count = socket.EndReceive(ar);
        // Zero bytes means the peer closed the connection.
        if(count==0) {
            Close();
            return;
        }
        readBuff.writeIdx += count;
        OnReceiveData();
        if(readBuff.remain < 8){
            // Reclaim consumed space first. ReSize alone does nothing when
            // length*2 is below the capacity, which would leave the free tail
            // shrinking to zero and make the next receive return 0 bytes.
            readBuff.MoveBytes();
            readBuff.ReSize(readBuff.length*2);
        }
        socket.BeginReceive(readBuff.bytes, readBuff.writeIdx, readBuff.remain, 0, ReceiveCallback, socket);
    } catch (SocketException ex){
        Console.WriteLine("Socket Receive fail" + ex.ToString());
    } catch (ObjectDisposedException){
        // The socket was closed locally while this receive was pending; nothing left to do.
    }
}
OnReceiveData
// Parses every complete frame in readBuff and appends the decoded messages to msgList.
// Frame: 2-byte little-endian payload length, then encoded name, then body.
// Incomplete frames are left in the buffer for the next receive.
public static void OnReceiveData(){
    // A loop instead of recursion, so a burst of small frames cannot grow the stack.
    while(readBuff.length > 2){
        int frameStart = readBuff.readIdx;
        byte[] bytes = readBuff.bytes;
        // Unsigned read: a signed Int16 would turn lengths above 32767 negative.
        int bodyLength = (bytes[frameStart+1] << 8) | bytes[frameStart];
        if(readBuff.length < bodyLength+2){
            return;
        }
        int nameCount = 0;
        string protoName = MsgBase.DecodeName(bytes, frameStart+2, out nameCount);
        bool nameOk = protoName != "" && nameCount <= bodyLength;
        MsgBase msgBase = null;
        if(nameOk){
            msgBase = MsgBase.Decode(protoName, bytes, frameStart+2+nameCount, bodyLength-nameCount);
        } else {
            Console.WriteLine("OnReceiveData MsgBase.DecodeName fail");
        }
        // Always consume the whole frame, even a bad one, so the stream stays aligned
        // on frame boundaries.
        readBuff.readIdx = frameStart + 2 + bodyLength;
        readBuff.CheckAndMoveBytes();
        if(msgBase != null){
            lock(msgList){
                msgList.Add(msgBase);
                msgCount++;
            }
        }
    }
}
Update
NetManager.Update实现每帧处理MAX_MESSAGE_FIRE(10)条消息。 (1)根据msgCount是否为0判断是否需要处理消息。 (2)循环中读取多条消息并处理。
// Per-frame entry point.
public static void Update(){
    MsgUpdate();
}
// Dispatches up to MAX_MESSAGE_FIRE queued messages to their listeners.
public static void MsgUpdate(){
    // Cheap check that avoids taking the lock when nothing is queued.
    if (msgCount == 0) {
        return;
    }
    int fired = 0;
    while (fired < MAX_MESSAGE_FIRE) {
        MsgBase next = null;
        lock (msgList) {
            if (msgList.Count > 0) {
                next = msgList[0];
                msgList.RemoveAt(0);
                msgCount--;
            }
        }
        // Queue drained before the per-frame budget was used up.
        if (next == null) {
            break;
        }
        // Dispatch outside the lock.
        FireMsg(next.protoName, next);
        fired++;
    }
}
测试
// Test script: registers event and message listeners on start.
void Start(){
NetManager.AddEventListener(NetManager.NetEvent.ConnectSucc, OnConnectSucc);
NetManager.AddEventListener(NetManager.NetEvent.ConnectFail, OnConnectFail);
NetManager.AddEventListener(NetManager.NetEvent.Close, OnConnectClose);
NetManager.AddMsgListener("MsgMove", OnMsgMove);
}
// Handler for MsgMove: prints the received coordinates.
public void OnMsgMove (MsgBase msgBase) {
MsgMove msg = (MsgMove)msgBase;
Console.WriteLine("OnMsgMove msg.x = " + msg.x);
Console.WriteLine("OnMsgMove msg.y = " + msg.y);
Console.WriteLine("OnMsgMove msg.z = " + msg.z);
}
// Pumps the network manager so queued messages get dispatched.
void Update() {
NetManager.Update();
}
心跳机制
客户端定时(如30秒)向服务端发送PING协议,服务端收到后回应PONG协议。 服务端长时间(如120秒)未收到PING协议,很可能是网络不通畅或客户端挂掉,服务端可以释放Socket资源。
PING和PONG协议
// Heartbeat request sent by the client; carries no fields.
public class MsgPing:MsgBase {
public MsgPing() {protoName = "MsgPing";}
}
// Heartbeat reply; carries no fields.
public class MsgPong:MsgBase {
public MsgPong() {protoName = "MsgPong";}
}
成员变量
// Whether the heartbeat is enabled.
public static bool isUsePing = true;
// Interval between PINGs; compared against Time.time differences in PingUpdate.
public static int pingInterval = 30;
// Time.time when the last PING was sent.
static float lastPingTime = 0;
// Time.time when the last PONG was received.
static float lastPongTime = 0;
// Fragment: only the lines added to InitState are shown.
private static void InitState(){
lastPingTime = Time.time;
lastPongTime = Time.time;
}
发送PING协议
// Heartbeat tick: sends a PING once pingInterval has elapsed since the last one, and
// closes the connection when no PONG has arrived for four intervals.
private static void PingUpdate(){
    if (isUsePing) {
        bool pingDue = Time.time - lastPingTime > pingInterval;
        if (pingDue) {
            Send(new MsgPing());
            lastPingTime = Time.time;
        }
        bool pongOverdue = Time.time - lastPongTime > pingInterval*4;
        if (pongOverdue) {
            Close();
        }
    }
}
// Per-frame entry point: dispatch messages, then run the heartbeat.
public static void Update(){
    MsgUpdate();
    PingUpdate();
}
监听PONG协议
// Fragment: only the lines added to InitState are shown.
private static void InitState(){
// Register the PONG handler only when no MsgPong listener exists yet.
// NOTE(review): if another MsgPong listener was registered first, OnMsgPong is never added — confirm intent.
if(!msgListeners.ContainsKey("MsgPong")){
AddMsgListener("MsgPong", OnMsgPong);
}
}
// Records the arrival time of the latest PONG.
private static void OnMsgPong(MsgBase msgBase){
lastPongTime = Time.time;
}
完整代码
proto
proto/
MsgBase.cs
BattleMsg.cs
SysMsg.cs
csc proto\*.cs -t:library -out:proto.dll
MsgBase.cs
using System;
using System.Linq;
using System.Web.Script.Serialization;
// Base class of every protocol message, plus the JSON body codec and the
// length-prefixed name codec shared by client and server.
public class MsgBase{
    // Name of the concrete protocol; subclasses set it in their constructor.
    public string protoName = "null";
    static JavaScriptSerializer Js = new JavaScriptSerializer();

    // Serializes msgBase to JSON and returns the UTF-8 bytes.
    public static byte[] Encode(MsgBase msgBase){
        string s = Js.Serialize(msgBase);
        return System.Text.Encoding.UTF8.GetBytes(s);
    }

    // Decodes count bytes at offset as UTF-8 JSON into the message type named protoName.
    // Returns null when protoName does not name a MsgBase-derived type. The name comes
    // off the network, so it must not be allowed to select an arbitrary type.
    public static MsgBase Decode(string protoName, byte[] bytes, int offset, int count){
        Type msgType = Type.GetType(protoName);
        if(msgType == null || !typeof(MsgBase).IsAssignableFrom(msgType)){
            return null;
        }
        string s = System.Text.Encoding.UTF8.GetString(bytes, offset, count);
        return (MsgBase)Js.Deserialize(s, msgType);
    }

    // Returns msgBase.protoName as [2-byte little-endian length][UTF-8 bytes].
    // Throws ArgumentException when the name does not fit the 2-byte length prefix.
    public static byte[] EncodeName(MsgBase msgBase){
        byte[] nameBytes = System.Text.Encoding.UTF8.GetBytes(msgBase.protoName);
        int len = nameBytes.Length;
        if(len > UInt16.MaxValue){
            throw new ArgumentException("protoName is too long for the 2-byte length prefix");
        }
        byte[] bytes = new byte[2+len];
        // Little-endian: low byte first.
        bytes[0] = (byte)(len%256);
        bytes[1] = (byte)(len/256);
        Array.Copy(nameBytes, 0, bytes, 2, len);
        return bytes;
    }

    // Reads a length-prefixed name starting at offset.
    // On success returns the name and sets count to the bytes consumed (2 + name length).
    // Returns "" with count == 0 when the data is too short.
    public static string DecodeName(byte[] bytes, int offset, out int count){
        count = 0;
        if(offset < 0 || offset + 2 > bytes.Length){
            return "";
        }
        // Read the prefix as unsigned. A signed Int16 turns lengths above 32767
        // negative, which slips past the bounds check below.
        int len = (bytes[offset+1] << 8) | bytes[offset];
        if(offset + 2 + len > bytes.Length){
            return "";
        }
        count = 2+len;
        return System.Text.Encoding.UTF8.GetString(bytes, offset+2, len);
    }
}
BattleMsg.cs
// Movement message: three integer coordinates.
public class MsgMove:MsgBase {
// protoName must equal the class name; Decode resolves the type from it.
public MsgMove() {protoName = "MsgMove";}
public int x = 0;
public int y = 0;
public int z = 0;
}
// Attack message carrying a description string.
public class MsgAttack:MsgBase {
public MsgAttack() {protoName = "MsgAttack";}
public string desc = "127.0.0.1:6543";
}
SysMsg.cs
// Heartbeat request sent by the client; carries no fields.
public class MsgPing:MsgBase {
public MsgPing() {protoName = "MsgPing";}
}
// Heartbeat reply; carries no fields.
public class MsgPong:MsgBase {
public MsgPong() {protoName = "MsgPong";}
}
framework
framework/
ByteArray.cs
NetManager.cs
csc framework\*.cs -t:library -out:framework.dll -reference:proto.dll
ByteArray.cs
using System;
/// <summary>
/// Growable byte buffer with separate read and write cursors.
/// Readable data is bytes[readIdx .. writeIdx); free space is bytes[writeIdx .. capacity).
/// </summary>
public class ByteArray {
    // Default capacity in bytes.
    const int DEFAULT_SIZE = 1024;
    // Backing storage.
    public byte[] bytes;
    // Read and write cursors into bytes.
    public int readIdx = 0;
    public int writeIdx = 0;
    // Size of the backing storage.
    private int capacity = 0;
    // Free bytes after the write cursor.
    public int remain { get { return capacity-writeIdx; }}
    // Unread bytes between the cursors.
    public int length { get { return writeIdx-readIdx; }}

    /// <summary>Creates an empty buffer with the given capacity.</summary>
    public ByteArray(int size = DEFAULT_SIZE){
        bytes = new byte[size];
        capacity = size;
        readIdx = 0;
        writeIdx = 0;
    }

    /// <summary>Wraps defaultBytes (no copy) as a buffer whose whole content is readable.</summary>
    public ByteArray(byte[] defaultBytes){
        bytes = defaultBytes;
        capacity = defaultBytes.Length;
        readIdx = 0;
        writeIdx = defaultBytes.Length;
    }

    /// <summary>
    /// Grows the storage to the smallest power of two that is at least size, moving
    /// the unread data to the front. Does nothing when size is below the current capacity.
    /// </summary>
    public void ReSize(int size){
        if(size < capacity)
            return;
        capacity = 1;
        while(capacity<size)
            capacity *= 2;
        byte[] newBytes = new byte[capacity];
        Array.Copy(bytes, readIdx, newBytes, 0, length);
        bytes = newBytes;
        writeIdx = length;
        readIdx = 0;
    }

    /// <summary>Appends count bytes of bs starting at offset. Returns count.</summary>
    public int Write(byte[] bs, int offset, int count){
        if(remain < count){
            // Free space is measured from writeIdx, so reclaim the consumed prefix
            // first and grow only if that is still not enough. Checking length+count
            // against the capacity alone lets the copy run past the end of the array
            // when readIdx > 0.
            MoveBytes();
            if(remain < count){
                ReSize(length + count);
            }
        }
        Array.Copy(bs, offset, bytes, writeIdx, count);
        writeIdx += count;
        return count;
    }

    /// <summary>
    /// Copies up to count unread bytes into bs at offset and consumes them.
    /// Returns the number of bytes actually copied.
    /// </summary>
    public int Read(byte[] bs, int offset, int count){
        count = Math.Min(count, length);
        // Copy from the read cursor, not from the start of the storage.
        Array.Copy(bytes, readIdx, bs, offset, count);
        readIdx += count;
        CheckAndMoveBytes();
        return count;
    }

    /// <summary>Compacts the buffer when fewer than 8 unread bytes are left (cheap to move).</summary>
    public void CheckAndMoveBytes(){
        if(length < 8){
            MoveBytes();
        }
    }

    /// <summary>Moves the unread data to the front of the storage.</summary>
    public void MoveBytes(){
        if(length>0) {
            Array.Copy(bytes, readIdx, bytes, 0, length);
        }
        writeIdx = length;
        readIdx = 0;
    }

    /// <summary>Reads a little-endian Int16 and consumes it; returns 0 when fewer than 2 bytes are unread.</summary>
    public Int16 ReadInt16(){
        if(length < 2)
            return 0;
        // Explicit little-endian decode, so the result does not depend on the host byte order.
        Int16 ret = (Int16)((bytes[readIdx+1] << 8) | bytes[readIdx]);
        readIdx += 2;
        CheckAndMoveBytes();
        return ret;
    }

    /// <summary>Reads a little-endian Int32 and consumes it; returns 0 when fewer than 4 bytes are unread.</summary>
    public Int32 ReadInt32(){
        if(length < 4)
            return 0;
        Int32 ret = bytes[readIdx]
            | (bytes[readIdx+1] << 8)
            | (bytes[readIdx+2] << 16)
            | (bytes[readIdx+3] << 24);
        readIdx += 4;
        CheckAndMoveBytes();
        return ret;
    }

    /// <summary>Hex dump of the unread bytes.</summary>
    public override string ToString(){
        return BitConverter.ToString(bytes, readIdx, length);
    }

    /// <summary>Debug dump: both cursors plus a hex dump of the whole storage.</summary>
    public string Debug(){
        return string.Format("readIdx({0}) writeIdx({1}) bytes({2})",
            readIdx,
            writeIdx,
            BitConverter.ToString(bytes, 0, capacity)
        );
    }
}
NetManager.cs
using System.Collections;
using System.Collections.Generic;
using System.Net.Sockets;
using System;
using System.Linq;
/// <summary>
/// Static TCP client: asynchronous connect/send/receive on one socket, length-prefixed
/// framing, per-protocol message listeners and a PING/PONG heartbeat.
/// Frame layout: 2-byte little-endian payload length, then the encoded protocol name
/// (MsgBase.EncodeName), then the encoded body (MsgBase.Encode).
/// Received messages are queued by the receive callback and dispatched from Update.
/// </summary>
public static class NetManager {
    // Largest payload Send will frame. Kept within Int16 so that a peer reading the
    // length prefix as a signed 16-bit value still parses it correctly.
    const int MAX_PAYLOAD_LENGTH = Int16.MaxValue;
    // The client socket.
    static Socket socket;
    // Receive buffer.
    static ByteArray readBuff;
    // Outgoing buffers waiting to be sent, in order. Also used as the send lock.
    static Queue<ByteArray> writeQueue;
    // True from BeginConnect until the connect callback has the result.
    static bool isConnecting = false;
    // True once Close was requested while data was still queued.
    static bool isClosing = false;
    // Received messages waiting to be dispatched.
    static List<MsgBase> msgList = new List<MsgBase>();
    // Number of queued messages, kept alongside msgList.
    static int msgCount = 0;
    // Maximum number of messages dispatched per Update call.
    readonly static int MAX_MESSAGE_FIRE = 10;
    // Whether the heartbeat is enabled.
    public static bool isUsePing = true;
    // Seconds between PINGs.
    public static int pingInterval = 30;
    // When the last PING was sent (UTC, so clock-zone changes cannot cause false timeouts).
    static DateTime lastPingTime = DateTime.UtcNow;
    // When the last PONG was received (UTC).
    static DateTime lastPongTime = DateTime.UtcNow;

    /// <summary>Connection events that listeners can subscribe to.</summary>
    public enum NetEvent {
        ConnectSucc = 1,
        ConnectFail = 2,
        Close = 3,
    }

    /// <summary>Callback for connection events; err is the error text ("" when there is none).</summary>
    public delegate void EventListener(String err);
    // One multicast delegate per event type.
    private static Dictionary<NetEvent, EventListener> eventListeners = new Dictionary<NetEvent, EventListener>();

    /// <summary>Subscribes listener to netEvent.</summary>
    public static void AddEventListener(NetEvent netEvent, EventListener listener) {
        EventListener current;
        eventListeners.TryGetValue(netEvent, out current);
        // current is null for a new key; null + listener yields listener itself.
        eventListeners[netEvent] = current + listener;
    }

    /// <summary>Unsubscribes listener from netEvent.</summary>
    public static void RemoveEventListener(NetEvent netEvent, EventListener listener) {
        EventListener current;
        if(!eventListeners.TryGetValue(netEvent, out current)) {
            return;
        }
        current -= listener;
        if(current == null) {
            // Last listener removed: drop the key so dispatch never sees a null delegate.
            eventListeners.Remove(netEvent);
        } else {
            eventListeners[netEvent] = current;
        }
    }

    // Invokes every listener registered for netEvent.
    private static void FireEvent(NetEvent netEvent, String err) {
        EventListener handler;
        if(eventListeners.TryGetValue(netEvent, out handler) && handler != null) {
            handler(err);
        }
    }

    /// <summary>Callback for received protocol messages.</summary>
    public delegate void MsgListener(MsgBase msgBase);
    // One multicast delegate per protocol name.
    private static Dictionary<string, MsgListener> msgListeners = new Dictionary<string, MsgListener>();

    /// <summary>Subscribes listener to the protocol named msgName.</summary>
    public static void AddMsgListener(string msgName, MsgListener listener) {
        MsgListener current;
        msgListeners.TryGetValue(msgName, out current);
        msgListeners[msgName] = current + listener;
    }

    /// <summary>Unsubscribes listener from the protocol named msgName.</summary>
    public static void RemoveMsgListener(string msgName, MsgListener listener) {
        MsgListener current;
        if(!msgListeners.TryGetValue(msgName, out current)) {
            return;
        }
        current -= listener;
        if(current == null) {
            msgListeners.Remove(msgName);
        } else {
            msgListeners[msgName] = current;
        }
    }

    // Invokes every listener registered for msgName.
    private static void FireMsg(string msgName, MsgBase msgBase) {
        MsgListener handler;
        if(msgListeners.TryGetValue(msgName, out handler) && handler != null) {
            handler(msgBase);
        }
    }

    /// <summary>
    /// Starts an asynchronous TCP connect to ip:port. The outcome is reported through
    /// the ConnectSucc / ConnectFail events. Refuses when already connected or connecting.
    /// </summary>
    public static void Connect(string ip, int port) {
        if(socket!=null && socket.Connected){
            Console.WriteLine("Connect fail, already connected!");
            return;
        }
        if(isConnecting){
            Console.WriteLine("Connect fail, isConnecting");
            return;
        }
        InitState();
        socket.NoDelay = true;
        isConnecting = true;
        try {
            socket.BeginConnect(ip, port, ConnectCallback, socket);
        } catch {
            // BeginConnect failed synchronously (e.g. invalid port): no callback will
            // run, so clear the flag here or every later Connect would be refused.
            isConnecting = false;
            throw;
        }
    }

    // Resets all per-connection state; called by Connect before each attempt.
    private static void InitState(){
        socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        readBuff = new ByteArray();
        writeQueue = new Queue<ByteArray>();
        isConnecting = false;
        isClosing = false;
        msgList = new List<MsgBase>();
        msgCount = 0;
        lastPingTime = DateTime.UtcNow;
        lastPongTime = DateTime.UtcNow;
        // Make sure the heartbeat handler is registered exactly once, even when the
        // application has added its own MsgPong listener.
        RemoveMsgListener("MsgPong", OnMsgPong);
        AddMsgListener("MsgPong", OnMsgPong);
    }

    // Completion callback for BeginConnect.
    private static void ConnectCallback(IAsyncResult ar){
        Socket connectedSocket = (Socket) ar.AsyncState;
        try{
            connectedSocket.EndConnect(ar);
        } catch (SocketException ex){
            isConnecting = false;
            Console.WriteLine("Socket Connect fail " + ex.ToString());
            FireEvent(NetEvent.ConnectFail, ex.ToString());
            return;
        }
        // Clear the flag before notifying, so a listener may Send or Close right away.
        isConnecting = false;
        Console.WriteLine("Socket Connect Succ ");
        FireEvent(NetEvent.ConnectSucc,"");
        try{
            // Start the receive loop.
            connectedSocket.BeginReceive(readBuff.bytes, readBuff.writeIdx, readBuff.remain, 0, ReceiveCallback, connectedSocket);
        } catch (SocketException ex){
            Console.WriteLine("Socket Receive fail" + ex.ToString());
        } catch (ObjectDisposedException){
            // A ConnectSucc listener closed the socket; nothing to receive.
        }
    }

    /// <summary>
    /// Closes the connection. Does nothing when not connected or while connecting.
    /// When data is still queued the close is deferred until the queue has drained.
    /// The Close event fires when the socket is actually closed.
    /// </summary>
    public static void Close(){
        if(socket==null || !socket.Connected){
            return;
        }
        if(isConnecting){
            return;
        }
        bool sendsPending;
        // Decide under the send lock so the send callback cannot drain the queue
        // between the check and setting isClosing.
        lock(writeQueue){
            sendsPending = writeQueue.Count > 0;
            if(sendsPending){
                isClosing = true;
            }
        }
        if(!sendsPending){
            socket.Close();
            FireEvent(NetEvent.Close, "");
        }
    }

    /// <summary>
    /// Frames msg and queues it for sending. The message is dropped when not connected,
    /// still connecting, closing, or too large for the length prefix.
    /// </summary>
    public static void Send(MsgBase msg) {
        if(socket==null || !socket.Connected){
            return;
        }
        if(isConnecting){
            return;
        }
        if(isClosing){
            return;
        }
        byte[] nameBytes = MsgBase.EncodeName(msg);
        byte[] bodyBytes = MsgBase.Encode(msg);
        int len = nameBytes.Length + bodyBytes.Length;
        if(len > MAX_PAYLOAD_LENGTH){
            // The 2-byte prefix cannot describe this payload; sending it would
            // desynchronise the stream on the receiving side.
            Console.WriteLine("Send fail, message too long: " + len);
            return;
        }
        byte[] sendBytes = new byte[2+len];
        // Length prefix, little-endian.
        sendBytes[0] = (byte)(len%256);
        sendBytes[1] = (byte)(len/256);
        Array.Copy(nameBytes, 0, sendBytes, 2, nameBytes.Length);
        Array.Copy(bodyBytes, 0, sendBytes, 2+nameBytes.Length, bodyBytes.Length);
        ByteArray ba = new ByteArray(sendBytes);
        lock(writeQueue){
            writeQueue.Enqueue(ba);
            // Only start a send when the queue was empty; otherwise the running
            // send chain picks this buffer up from SendCallback.
            if(writeQueue.Count == 1){
                socket.BeginSend(sendBytes, 0, sendBytes.Length, 0, SendCallback, socket);
            }
        }
    }

    /// <summary>
    /// Completion callback for BeginSend: advances the head buffer, continues with the
    /// remainder or the next buffer, and performs a deferred close once the queue is empty.
    /// </summary>
    public static void SendCallback(IAsyncResult ar){
        Socket socket = (Socket) ar.AsyncState;
        if(socket == null || !socket.Connected){
            return;
        }
        bool closeNow = false;
        try {
            int count = socket.EndSend(ar);
            lock(writeQueue){
                // The head is the buffer whose send just completed.
                ByteArray ba = writeQueue.Peek();
                ba.readIdx += count;
                if(ba.length == 0){
                    writeQueue.Dequeue();
                    ba = writeQueue.Count > 0 ? writeQueue.Peek() : null;
                }
                if(ba != null){
                    // Either the rest of a partially sent buffer or the next queued buffer.
                    socket.BeginSend(ba.bytes, ba.readIdx, ba.length, 0, SendCallback, socket);
                } else if(isClosing){
                    closeNow = true;
                }
            }
        } catch (SocketException ex){
            // This runs on a pool thread: an unhandled exception would take the process down.
            Console.WriteLine("Socket Send fail" + ex.ToString());
            return;
        } catch (ObjectDisposedException){
            return;
        }
        if(closeNow){
            // Deferred close requested by Close(): finish it and notify, outside the lock.
            socket.Close();
            FireEvent(NetEvent.Close, "");
        }
    }

    /// <summary>
    /// Completion callback for BeginReceive: accounts for the received bytes, parses
    /// complete messages, then starts the next receive.
    /// </summary>
    public static void ReceiveCallback(IAsyncResult ar){
        try {
            Socket socket = (Socket) ar.AsyncState;
            int count = socket.EndReceive(ar);
            // Zero bytes means the peer closed the connection; stop receiving.
            if(count == 0){
                Close();
                return;
            }
            readBuff.writeIdx += count;
            OnReceiveData();
            if(readBuff.remain < 8){
                // Reclaim consumed space first. ReSize alone does nothing when
                // length*2 is below the capacity, which would leave the free tail
                // shrinking to zero.
                readBuff.MoveBytes();
                readBuff.ReSize(readBuff.length*2);
            }
            socket.BeginReceive(readBuff.bytes, readBuff.writeIdx, readBuff.remain, 0, ReceiveCallback, socket);
        } catch (SocketException ex){
            Console.WriteLine("Socket Receive fail" + ex.ToString());
        } catch (ObjectDisposedException){
            // The socket was closed locally while this receive was pending.
        }
    }

    /// <summary>
    /// Parses every complete frame in the receive buffer and queues the decoded
    /// messages. Incomplete frames stay in the buffer for the next receive.
    /// </summary>
    public static void OnReceiveData(){
        // A loop instead of recursion, so a burst of small frames cannot grow the stack.
        while(readBuff.length > 2){
            int frameStart = readBuff.readIdx;
            byte[] bytes = readBuff.bytes;
            // Unsigned read: a signed Int16 would turn lengths above 32767 negative.
            int bodyLength = (bytes[frameStart+1] << 8) | bytes[frameStart];
            if(readBuff.length < bodyLength+2){
                return;
            }
            int nameCount = 0;
            string protoName = MsgBase.DecodeName(bytes, frameStart+2, out nameCount);
            bool nameOk = protoName != "" && nameCount <= bodyLength;
            MsgBase msgBase = null;
            if(nameOk){
                msgBase = MsgBase.Decode(protoName, bytes, frameStart+2+nameCount, bodyLength-nameCount);
            } else {
                Console.WriteLine("OnReceiveData MsgBase.DecodeName fail");
            }
            // Always consume the whole frame, even a bad one, so the stream stays
            // aligned on frame boundaries.
            readBuff.readIdx = frameStart + 2 + bodyLength;
            readBuff.CheckAndMoveBytes();
            if(msgBase != null){
                lock(msgList){
                    msgList.Add(msgBase);
                    msgCount++;
                }
            }
        }
    }

    /// <summary>Per-frame pump: dispatches queued messages, then runs the heartbeat.</summary>
    public static void Update(){
        MsgUpdate();
        PingUpdate();
    }

    /// <summary>Dispatches up to MAX_MESSAGE_FIRE queued messages to their listeners.</summary>
    public static void MsgUpdate(){
        // Cheap check that avoids taking the lock when nothing is queued.
        if(msgCount == 0){
            return;
        }
        for(int i = 0; i< MAX_MESSAGE_FIRE; i++){
            MsgBase msgBase = null;
            lock(msgList){
                if(msgList.Count > 0){
                    msgBase = msgList[0];
                    msgList.RemoveAt(0);
                    msgCount--;
                }
            }
            if(msgBase == null){
                // Queue drained.
                break;
            }
            // Dispatch outside the lock.
            FireMsg(msgBase.protoName, msgBase);
        }
    }

    // Heartbeat tick: sends a PING every pingInterval seconds and closes the
    // connection when no PONG has arrived for four intervals.
    private static void PingUpdate(){
        if(!isUsePing){
            return;
        }
        DateTime now = DateTime.UtcNow;
        // TotalSeconds is the whole elapsed time. TimeSpan.Seconds is only the 0-59
        // seconds component, so it can never exceed a 120 second timeout.
        if((now - lastPingTime).TotalSeconds > pingInterval){
            MsgPing msgPing = new MsgPing();
            Send(msgPing);
            lastPingTime = now;
        }
        if((now - lastPongTime).TotalSeconds > pingInterval*4){
            Close();
        }
    }

    // Records the arrival time of the latest PONG.
    private static void OnMsgPong(MsgBase msgBase){
        lastPongTime = DateTime.UtcNow;
    }
}
Server
Server.cs
using System;
using System.Net;
using System.Net.Sockets;
using System.Collections.Generic;
using System.Linq;
// Per-connection server state: the client's socket and its receive buffer.
class ClientState {
public Socket socket;
public ByteArray readBuff = new ByteArray();
}
// Single-threaded polling test server. Accepts clients on 127.0.0.1:8888, answers
// MsgPing with MsgPong, and forwards every other complete frame to all connected clients.
class Server {
    static Socket listenfd;
    static Dictionary<Socket, ClientState> clients = new Dictionary<Socket, ClientState>();

    public static void Main(string[] args) {
        listenfd = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
        IPEndPoint ipEp = new IPEndPoint(ipAdr, 8888);
        listenfd.Bind(ipEp);
        listenfd.Listen(0);
        Console.WriteLine("[服务器]启动成功");
        while(true){
            if(listenfd.Poll(0, SelectMode.SelectRead)){
                ReadListenfd(listenfd);
            }
            foreach(ClientState s in clients.Values){
                Socket clientfd = s.socket;
                if(clientfd.Poll(0, SelectMode.SelectRead)){
                    // false means the client table changed; the enumerator is no
                    // longer valid, so restart the scan on the next pass.
                    if(!ReadClientfd(clientfd)){
                        break;
                    }
                }
            }
            System.Threading.Thread.Sleep(1);
        }
    }

    // Accepts one pending connection and registers it.
    public static void ReadListenfd(Socket listenfd) {
        Console.WriteLine("[Accept]");
        Socket clientfd = listenfd.Accept();
        ClientState state = new ClientState();
        state.socket = clientfd;
        clients.Add(clientfd, state);
    }

    // Reads available data from clientfd and handles every complete frame in its buffer.
    // Returns false when the client table was modified (a client was removed), true otherwise.
    public static bool ReadClientfd(Socket clientfd) {
        ClientState state = clients[clientfd];
        ByteArray ba = state.readBuff;
        if(ba.remain<8){
            // Reclaim consumed space before growing; ReSize alone can be a no-op.
            ba.MoveBytes();
            ba.ReSize(ba.length*2);
        }
        int count;
        try {
            count = clientfd.Receive(ba.bytes, ba.writeIdx, ba.remain, 0);
        } catch (SocketException ex) {
            // A reset connection is treated like a normal close instead of crashing the server.
            Console.WriteLine("Receive SocketException " + ex.ToString());
            count = 0;
        }
        if(count == 0){
            CloseClient(clientfd);
            return false;
        }
        ba.writeIdx += count;

        // Sockets whose send failed while forwarding; removed after the buffer is processed.
        List<Socket> dead = new List<Socket>();
        // Handle frame by frame, so several frames in one read are each processed and
        // a partial trailing frame stays buffered until the rest arrives.
        while(ba.length > 2){
            int frameStart = ba.readIdx;
            int bodyLength = (ba.bytes[frameStart+1] << 8) | ba.bytes[frameStart];
            if(ba.length < 2 + bodyLength){
                break;
            }
            HandleFrame(clientfd, ba.bytes, frameStart, bodyLength, dead);
            // Consume the frame even when it was malformed, to stay aligned.
            ba.readIdx = frameStart + 2 + bodyLength;
        }
        ba.CheckAndMoveBytes();

        foreach(Socket s in dead){
            CloseClient(s);
        }
        return dead.Count == 0;
    }

    // Decodes one frame located at bytes[frameStart..] and reacts to it.
    static void HandleFrame(Socket sender, byte[] bytes, int frameStart, int bodyLength, List<Socket> dead) {
        int nameCount = 0;
        string protoName = MsgBase.DecodeName(bytes, frameStart + 2, out nameCount);
        if(protoName == "" || nameCount > bodyLength){
            Console.WriteLine("MsgBase.DecodeName fail");
            return;
        }
        Console.WriteLine("[ProtoName]" + protoName);
        try {
            MsgBase msgBase = MsgBase.Decode(protoName, bytes, frameStart + 2 + nameCount, bodyLength - nameCount);
            Console.WriteLine("[MsgBase]" + msgBase);
        } catch (Exception ex) {
            // The payload comes from the network; a bad one must not stop the server.
            Console.WriteLine("MsgBase.Decode fail " + ex.Message);
            return;
        }
        if(protoName == "MsgPing"){
            // Heartbeat: answer the sender only, so its pong timeout does not expire.
            TrySend(sender, EncodeFrame(new MsgPong()), dead);
            return;
        }
        // Forward exactly this frame (prefix included) to every client.
        byte[] frame = new byte[2 + bodyLength];
        Array.Copy(bytes, frameStart, frame, 0, frame.Length);
        foreach(ClientState cs in clients.Values) {
            TrySend(cs.socket, frame, dead);
        }
    }

    // Builds a complete frame for msg: 2-byte little-endian length, encoded name, encoded body.
    static byte[] EncodeFrame(MsgBase msg) {
        byte[] nameBytes = MsgBase.EncodeName(msg);
        byte[] bodyBytes = MsgBase.Encode(msg);
        int len = nameBytes.Length + bodyBytes.Length;
        byte[] frame = new byte[2 + len];
        frame[0] = (byte)(len % 256);
        frame[1] = (byte)(len / 256);
        Array.Copy(nameBytes, 0, frame, 2, nameBytes.Length);
        Array.Copy(bodyBytes, 0, frame, 2 + nameBytes.Length, bodyBytes.Length);
        return frame;
    }

    // Sends frame to target; on failure records target in dead instead of throwing.
    static void TrySend(Socket target, byte[] frame, List<Socket> dead) {
        if(dead.Contains(target)){
            return;
        }
        try {
            target.Send(frame);
        } catch (SocketException ex) {
            Console.WriteLine("Send SocketException " + ex.ToString());
            dead.Add(target);
        }
    }

    // Closes a client socket and removes it from the client table.
    static void CloseClient(Socket clientfd) {
        clientfd.Close();
        clients.Remove(clientfd);
        Console.WriteLine("Socket Close");
    }
}
csc Server.cs -reference:proto.dll,framework.dll
Server
Client
Client.cs
using System.Collections.Generic;
using System.Net.Sockets;
using System;
using System.Linq;
// Console test client. Commands: connect, close, move, quit. Queued network messages
// are dispatched after every entered line (an empty line just pumps them).
public class Client {
    // Registers the connection-event and message callbacks.
    static void Start(){
        NetManager.AddEventListener(NetManager.NetEvent.ConnectSucc, OnConnectSucc);
        NetManager.AddEventListener(NetManager.NetEvent.ConnectFail, OnConnectFail);
        NetManager.AddEventListener(NetManager.NetEvent.Close, OnConnectClose);
        NetManager.AddMsgListener("MsgMove", OnMsgMove);
    }
    // Handler for MsgMove: prints the received coordinates.
    public static void OnMsgMove(MsgBase msgBase) {
        MsgMove msg = (MsgMove)msgBase;
        Console.WriteLine("OnMsgMove msg.x = " + msg.x);
        Console.WriteLine("OnMsgMove msg.y = " + msg.y);
        Console.WriteLine("OnMsgMove msg.z = " + msg.z);
    }
    // Connects to the local test server.
    public static void OnConnectClick() {
        NetManager.Connect("127.0.0.1", 8888);
    }
    public static void OnCloseClick() {
        NetManager.Close();
    }
    // Sends a fixed MsgMove.
    public static void OnMoveClick() {
        MsgMove msg = new MsgMove();
        msg.x = 120;
        msg.y = 123;
        msg.z = -6;
        NetManager.Send(msg);
    }
    static void OnConnectSucc(string err){
        Console.WriteLine("OnConnectSucc");
    }
    static void OnConnectFail(string err){
        Console.WriteLine("OnConnectFail " + err);
    }
    static void OnConnectClose(string err){
        Console.WriteLine("OnConnectClose");
    }
    // Pumps the network manager: dispatches queued messages and runs the heartbeat.
    static void Update() {
        NetManager.Update();
    }
    public static void Main(string[] args) {
        Start();
        while(true) {
            string sendStr = Console.ReadLine();
            // ReadLine returns null at end of input; treat it like "quit".
            if(sendStr == null || sendStr=="quit")
                break;
            if(sendStr=="connect")
                OnConnectClick();
            else if(sendStr=="close")
                OnCloseClick();
            else if(sendStr=="move")
                OnMoveClick();
            // Runs for every line, including an empty one, because ReadLine blocks
            // and this is the only place messages get dispatched.
            Update();
        }
        OnCloseClick();
    }
}
csc Client.cs -reference:proto.dll,framework.dll
Client
Protobuf协议
什么是Protobuf
Protobuf是谷歌发布的一套协议规范,规定了一系列的编码和解码方法,编码后数据量较小,可以节省网络带宽。
编写proto文件
BattleMsg.proto
// Movement message: three optional 32-bit integer coordinates.
message MsgMove{
optional int32 x = 1;
optional int32 y = 2;
optional int32 z = 3;
}
// Attack message with an optional description string.
message MsgAttack{
optional string desc = 1;
}
SysMsg.proto
// Heartbeat request; no fields.
message MsgPing{
}
// Heartbeat reply; no fields.
message MsgPong{
}
生成协议类
Protobuf-net库,提供将Protobuf描述文件转换为协议类的工具,实现协议对象编码解码的方法。
protobuf-net下载地址: https://github.com/mgravell/protobuf-net https://code.google.com/p/protobuf-net
protogen.exe -i:proto\BattleMsg.proto -o:cs\BattleMsg.cs
protogen.exe -i:proto\SysMsg.proto -o:cs\SysMsg.cs
pause
导入protobuf-net.dll
复制protobuf-net.dll库文件
编码解码
- 编码方法
using System;
using System.Collections;
using System.Collections.Generic;
using proto.BattleMsg;
// Small console program that protobuf-encodes a MsgMove and prints the bytes as hex.
public class TestProtobuf {
    // Serializes msgBase with protobuf-net and returns the encoded bytes.
    public static byte[] Encode(ProtoBuf.IExtensible msgBase) {
        System.IO.MemoryStream buffer = new System.IO.MemoryStream();
        try {
            ProtoBuf.Serializer.Serialize(buffer, msgBase);
            return buffer.ToArray();
        } finally {
            buffer.Dispose();
        }
    }

    // Encodes a MsgMove with x = 214 and prints the result.
    static void Start() {
        MsgMove msgMove = new MsgMove();
        msgMove.x = 214;
        byte[] encoded = Encode(msgMove);
        string hex = System.BitConverter.ToString(encoded);
        Console.WriteLine(hex);
    }

    public static void Main(string[] args) {
        Start();
    }
}
BattleMsg.cs由protogen.exe根据BattleMsg.proto描述文件生成。
csc TestProtobuf.cs BattleMsg.cs -reference:protobuf-net.dll
- 获取协议名字
MsgMove msgMove = new MsgMove();
// Get the protocol name: prints proto.BattleMsg.MsgMove
Console.WriteLine(msgMove.ToString());
- 解码
// Decodes count bytes of bytes, starting at offset, into the protobuf message type named protoName.
public static ProtoBuf.IExtensible Decode(string protoName, byte[] bytes, int offset, int count) {
    System.IO.MemoryStream source = new System.IO.MemoryStream(bytes, offset, count);
    try {
        System.Type messageType = System.Type.GetType(protoName);
        object decoded = ProtoBuf.Serializer.NonGeneric.Deserialize(messageType, source);
        return (ProtoBuf.IExtensible)decoded;
    } finally {
        source.Dispose();
    }
}
|