TCP数据流
系统缓冲流
收到对端数据时,操作系统会将数据放入Socket的接收缓冲区中。
Socket的Receive方法只是提取接收缓冲区数据,接收缓冲区数据为空时,Receive会阻塞,直到里面有数据。
Socket的Send方法只是将数据写入到发送缓冲区,具体发送由操作系统负责。发送缓冲区数据满了,Send会阻塞。
粘包半包现象
发送端快速发送多条数据,接收端没有及时调用Receive,数据就会在接收端缓冲区累积。
发送端发送的数据还有可能被拆分。
接收和发送并不是一一对应。
解决粘包问题
长度信息法
长度信息+数据包。接收一次完整数据后(长度信息+数据包),再接收下一次数据。
固定长度法
收发固定长度数据,不足增加填充字符。
结束符号法
规定结束符号,作为消息分隔符。
解决粘包代码
发送数据
public void Send(string sendStr) {
byte[] bodyBytes = System.Text.Encoding.Default.GetBytes(sendStr);
Int16 len = (Int16)bodyBytes.Length;
byte[] lenBytes = BitConverter.GetBytes(len);
byte[] sendBytes = lenBytes.Concat(bodyBytes).ToArray();
socket.Send(sendBytes);
}
接收数据
byte[] readBuff = new byte[1024];
int buffCount = 0;
socket.BeginReceive(
readBuff,
buffCount,
1024-buffCount,
0,
ReceiveCallback,
socket
);
public void ReceiveCallback(IAsyncResult ar){
Socket socket = (Socket) ar.AsyncState;
int count = socket.EndReceive(ar);
buffCount += count;
}
处理数据
- 缓冲区长度小于等于2
public void OnReceiveData(){
if(buffCount <= 2)
return;
}
- 缓冲区长度小于一条完整消息(长度+数据包)
public void OnReceiveData(){
if(buffCount <= 2)
return;
Int16 bodyLength = BitCoverter.ToInt16(readBuff, 0);
if(buffCount <= 2+bodyLength)
return;
}
- 缓冲区长度大于等于一条完整消息(长度+数据包)
public static void Copy(
Array sourceArray,
long sourceIndex,
Array destinationArray,
long destinationIndex,
long length
)
public void OnReceiveData(){
if(buffCount <= 2)
return;
Int16 bodyLength = BitCoverter.ToInt16(readBuff, 0);
if(buffCount <= 2+bodyLength)
return;
string s = System.Text.Encoding.Default.GetString(readBuff, 2, buffCount);
int start = 2+bodyLength;
int count = buffCount-start;
Array.Copy(readBuff, start, readBuff, 0, count);
buffCount -= start;
if(readBuff.length > 2)
OnReceiveData();
}
完整示例
AsyncEchoServer.cs
using System;
using System.Net;
using System.Net.Sockets;
using System.Collections.Generic;
namespace AsyncEchoServer {
class ClientState {
public Socket socket = null;
public byte[] readBuff = new byte[1024];
}
class MainClass {
static Socket listenfd;
static Dictionary<Socket, ClientState> clients = new Dictionary<Socket, ClientState>();
public static void SendCallback(IAsyncResult ar) {
try {
Socket clientfd = (Socket) ar.AsyncState;
int count = clientfd.EndSend(ar);
Console.WriteLine("Socket Send Success");
} catch(SocketException ex) {
Console.WriteLine("Socket Send Fail" + ex.ToString());
}
}
public static void ReceiveCallback(IAsyncResult ar) {
try {
ClientState state = (ClientState) ar.AsyncState;
Socket clientfd = state.socket;
int count = clientfd.EndReceive(ar);
Console.WriteLine("Socket Receive Success");
if(count==0) {
clientfd.Close();
clients.Remove(clientfd);
Console.WriteLine("Socket Close");
return;
}
string recvStr = System.Text.Encoding.Default.GetString(state.readBuff, 2, count-2);
Console.WriteLine("Receive"+recvStr);
byte[] sendBytes = new byte[count];
Array.Copy(state.readBuff, 0, sendBytes, 0, count);
clientfd.BeginSend(sendBytes, 0, count, 0, SendCallback, clientfd);
clientfd.BeginReceive(state.readBuff, 0, 1024, 0, ReceiveCallback, state);
} catch(SocketException ex) {
Console.WriteLine("Socket Receive Fail" + ex.ToString());
}
}
public static void AcceptCallback(IAsyncResult ar) {
try {
Console.WriteLine("[服务器]Accept");
Socket listenfd = (Socket) ar.AsyncState;
Socket clientfd = listenfd.EndAccept(ar);
Console.WriteLine("Socket Accept Success");
ClientState state = new ClientState();
state.socket = clientfd;
clients.Add(clientfd, state);
clientfd.BeginReceive(state.readBuff, 0, 1024, 0, ReceiveCallback, state);
listenfd.BeginAccept(AcceptCallback, listenfd);
} catch(SocketException ex) {
Console.WriteLine("Socket Accept Fail" + ex.ToString());
}
}
public static void Main(string[] args) {
listenfd = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
IPEndPoint ipEp = new IPEndPoint(ipAdr, 8888);
listenfd.Bind(ipEp);
listenfd.Listen(0);
Console.WriteLine("[服务器]启动成功");
listenfd.BeginAccept(AcceptCallback, listenfd);
Console.ReadLine();
listenfd.Close();
}
}
}
csc AsyncEchoServer.cs
AsyncEchoServer
AsyncEchoClient.cs
using System;
using System.Net.Sockets;
using System.Linq;
using System.Threading;
namespace AsyncEchoClient {
class MainClass {
static Socket socket;
static byte[] readBuff = new byte[1024];
static int buffCount = 0;
static string recvStr;
public static void Connect() {
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.BeginConnect("127.0.0.1", 8888, ConnectCallback, socket);
}
public static void ConnectCallback(IAsyncResult ar) {
try {
Socket socket = (Socket) ar.AsyncState;
socket.EndConnect(ar);
socket.BeginReceive(readBuff, buffCount, 1024-buffCount, 0, ReceiveCallback, socket);
} catch(SocketException ex) {
Console.WriteLine("Socket Connect Fail" + ex.ToString());
}
}
public static void ReceiveCallback(IAsyncResult ar) {
try {
Socket socket = (Socket) ar.AsyncState;
int count = socket.EndReceive(ar);
buffCount += count;
OnReceiveData();
Thread.Sleep(1000);
socket.BeginReceive(readBuff, buffCount, 1024-buffCount, 0, ReceiveCallback, socket);
} catch(SocketException ex) {
Console.WriteLine("Socket Receive Fail" + ex.ToString());
}
}
public static void OnReceiveData() {
if(buffCount <= 2)
return;
Int16 bodyLength = BitConverter.ToInt16(readBuff, 0);
if(buffCount <= 2+bodyLength)
return;
string s = System.Text.Encoding.Default.GetString(readBuff, 2, buffCount);
recvStr = s + "\n" + recvStr;
int start = 2+bodyLength;
int count = buffCount-start;
Array.Copy(readBuff, start, readBuff, 0, count);
buffCount -= start;
OnReceiveData();
}
public static void Send(string sendStr) {
byte[] bodyBytes = System.Text.Encoding.Default.GetBytes(sendStr);
Int16 len = (Int16)bodyBytes.Length;
byte[] lenBytes = BitConverter.GetBytes(len);
byte[] sendBytes = lenBytes.Concat(bodyBytes).ToArray();
socket.BeginSend(sendBytes, 0, sendBytes.Length, 0, SendCallback, socket);
}
public static void SendCallback(IAsyncResult ar) {
try {
Socket socket = (Socket) ar.AsyncState;
int count = socket.EndSend(ar);
} catch(SocketException ex) {
Console.WriteLine("Socket Send Fail" + ex.ToString());
}
}
public static void Main(string[] args) {
Connect();
while(true) {
string sendStr = Console.ReadLine();
if(sendStr.Length==0)
continue;
else if(sendStr=="quit")
break;
Send(sendStr);
Console.Clear();
Console.WriteLine(recvStr);
}
socket.Close();
}
}
}
csc AsyncEchoClient.cs
AsyncEchoClient
大端小端
public static short ToInt16(byte[] value, int startIndex) {
if(startIndex%2==0){
return *((short*)pbyte);
}else{
if(IsLittleEndian) {
return (short)((*pbyte)|(*(pbyte+1)<<8));
} else {
return (short)((*pbyte<<8)|(*(pbyte+1)));
}
}
}
区分
数据0x0102
Reverse函数
if(!BitConverter.IsLittleEndian) {
lenBytes = lenBytes.Reverse();
}
手动还原数值
Int16 bodyLength = (short)((readBuff[1]<<8)|readBuff[0]);
完整收发数据
Queue
Queue<ByteArray> writeQueue = new Queue<ByteArray>();
ByteArray ba = new ByteArray(sendBytes);
writeQueue.Enqueue(ba);
ByteArray ba2 = writeQueue.First();
ba2 = writeQueue.Dequeue();
采用队列的原因是队列的入队(Enqueue)和出队(Dequeue)的时间复杂度是o(1),对于lock执行时间短,可以最大限度地减少程序等待的时间。
ByteArray
ByteArray.cs
using System;
public class ByteArray {
const int DEFAULT_SIZE = 1024;
int initSize = 0;
public byte[] bytes;
public int readIdx = 0;
public int writeIdx = 0;
private int capacity = 0;
public int remain { get { return capacity-writeIdx; }}
public int length { get { return writeIdx-readIdx; }}
public ByteArray(int size = DEFAULT_SIZE){
bytes = new byte[size];
capacity = size;
initSize = size;
readIdx = 0;
writeIdx = 0;
}
public ByteArray(byte[] defaultBytes){
bytes = defaultBytes;
capacity = defaultBytes.Length;
initSize = defaultBytes.Length;
readIdx = 0;
writeIdx = defaultBytes.Length;
}
public void ReSize(int size){
if(size < length || size < initSize)
return;
capacity = 1;
while(capacity<size)
capacity *= 2;
byte[] newBytes = new byte[capacity];
Array.Copy(bytes, readIdx, newBytes, 0, length);
bytes = newBytes;
writeIdx = length;
readIdx = 0;
}
public void CheckAndMoveBytes(){
if(length < 8){
MoveBytes();
}
}
public void MoveBytes(){
if(length>0) {
Array.Copy(bytes, readIdx, bytes, 0, length);
}
writeIdx = length;
readIdx = 0;
}
public int Write(byte[] bs, int offset, int count){
if(remain < count){
ReSize(length + count);
}
Array.Copy(bs, offset, bytes, writeIdx, count);
writeIdx += count;
return count;
}
public int Read(byte[] bs, int offset, int count){
count = Math.Min(count, length);
Array.Copy(bytes, readIdx, bs, offset, count);
readIdx += count;
CheckAndMoveBytes();
return count;
}
public Int16 ReadInt16(){
if(length < 2)
return 0;
Int16 ret = BitConverter.ToInt16(bytes, readIdx);
readIdx += 2;
CheckAndMoveBytes();
return ret;
}
public Int32 ReadInt32(){
if(length < 4)
return 0;
Int32 ret = BitConverter.ToInt32(bytes, readIdx);
readIdx += 4;
CheckAndMoveBytes();
return ret;
}
public override string ToString(){
return BitConverter.ToString(bytes, readIdx, length);
}
public string Debug(){
return string.Format("readIdx({0}) writeIdx({1}) bytes({2})",
readIdx,
writeIdx,
BitConverter.ToString(bytes, 0, capacity)
);
}
}
TestByteArray.cs
using System.Collections;
using System.Collections.Generic;
using System;
public class TestByteArray {
public static void Main(string[] args) {
ByteArray buff = new ByteArray(8);
Console.WriteLine("[1 debug ]->" + buff.Debug());
Console.WriteLine("[1 string]->" + buff.ToString());
byte[] wb = new byte[]{1,2,3,4,5};
buff.Write(wb, 0, 5);
Console.WriteLine("[2 debug ]->" + buff.Debug());
Console.WriteLine("[2 string]->" + buff.ToString());
byte[] rb = new byte[4];
buff.Read(rb, 0, 2);
Console.WriteLine("[3 debug ]->" + buff.Debug());
Console.WriteLine("[3 string]->" + buff.ToString());
Console.WriteLine("[3 rb ]->" + BitConverter.ToString(rb));
wb = new byte[]{6,7,8,9,10,11};
buff.Write(wb, 0, 6);
Console.WriteLine("[4 debug ]->" + buff.Debug());
Console.WriteLine("[4 string]->" + buff.ToString());
}
}
csc TestByteArray.cs ByteArray.cs
TestByteArray
ByteArray异步回显程序
ByteArrayAsyncEchoServer.cs
using System;
using System.Net;
using System.Net.Sockets;
using System.Collections.Generic;
using System.Linq;
namespace ByteArrayAsyncEchoServer
{
class ClientState {
public Socket socket = null;
public ByteArray readBuff = new ByteArray();
public Queue<ByteArray> writeQueue = new Queue<ByteArray>();
}
class MainClass {
static Socket listenfd;
static Dictionary<Socket, ClientState> clients = new Dictionary<Socket, ClientState>();
public static void ReceiveCallback(IAsyncResult ar) {
try {
ClientState state = (ClientState) ar.AsyncState;
Socket clientfd = state.socket;
int count = clientfd.EndReceive(ar);
state.readBuff.writeIdx += count;
if (count==0) {
clientfd.Close();
clients.Remove(clientfd);
Console.WriteLine("Socket Close");
return;
}
OnReceiveData(state);
state.readBuff.CheckAndMoveBytes();
state.socket.BeginReceive(state.readBuff.bytes, state.readBuff.writeIdx, state.readBuff.remain, 0, ReceiveCallback, state);
}
catch (SocketException ex) {
Console.WriteLine("Socket Receive Fail" + ex.ToString());
}
}
public static void OnReceiveData(ClientState state)
{
Console.WriteLine("[接收 1] length = " + state.readBuff.length);
Console.WriteLine("[接收 2] readBuff = " + state.readBuff.ToString());
if (state.readBuff.length <= 2)
return;
int readIdx = state.readBuff.readIdx;
byte[] bytes = state.readBuff.bytes;
Int16 bodyLength = (Int16)((bytes[readIdx + 1] << 8) | (bytes[readIdx]));
if (state.readBuff.length < 2 + bodyLength)
return;
state.readBuff.readIdx += 2;
Console.WriteLine("[接收 3] bodyLength = " + bodyLength);
byte[] stringByte = new byte[bodyLength];
state.readBuff.Read(stringByte, 0, bodyLength);
string s = System.Text.Encoding.Default.GetString(stringByte);
Console.WriteLine("[接收 4] s = " + s);
Console.WriteLine("[接收 5] readBuff = " + state.readBuff.ToString());
Send(s, state);
OnReceiveData(state);
}
public static void Send(string sendStr, ClientState state)
{
byte[] bodyBytes = System.Text.Encoding.Default.GetBytes(sendStr);
Int16 len = (Int16)bodyBytes.Length;
byte[] lenBytes = BitConverter.GetBytes(len);
byte[] sendBytes = lenBytes.Concat(bodyBytes).ToArray();
ByteArray ba = new ByteArray(sendBytes);
int count;
lock (state.writeQueue)
{
state.writeQueue.Enqueue(ba);
count = state.writeQueue.Count;
}
if (count == 1)
state.socket.BeginSend(sendBytes, 0, sendBytes.Length, 0, SendCallback, state);
Console.WriteLine("[Send]" + BitConverter.ToString(sendBytes));
}
public static void SendCallback(IAsyncResult ar)
{
ClientState state = (ClientState)ar.AsyncState;
Socket socket = state.socket;
int count = socket.EndSend(ar);
ByteArray ba;
lock (state.writeQueue)
{
ba = state.writeQueue.First();
}
ba.readIdx += count;
if (ba.length == 0){
lock (state.writeQueue){
state.writeQueue.Dequeue();
}
} else {
socket.BeginSend(ba.bytes, ba.readIdx, ba.length, 0, SendCallback, state);
}
}
public static void AcceptCallback(IAsyncResult ar) {
try {
Socket listenfd = (Socket) ar.AsyncState;
Socket clientfd = listenfd.EndAccept(ar);
ClientState state = new ClientState();
state.socket = clientfd;
clients.Add(clientfd, state);
clientfd.BeginReceive(state.readBuff.bytes, state.readBuff.writeIdx, state.readBuff.remain, 0, ReceiveCallback, state);
listenfd.BeginAccept(AcceptCallback, listenfd);
} catch(SocketException ex) {
Console.WriteLine("Socket Accept Fail" + ex.ToString());
}
}
public static void Main(string[] args) {
listenfd = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPAddress ipAdr = IPAddress.Parse("127.0.0.1");
IPEndPoint ipEp = new IPEndPoint(ipAdr, 8888);
listenfd.Bind(ipEp);
listenfd.Listen(0);
Console.WriteLine("[服务器]启动成功");
listenfd.BeginAccept(AcceptCallback, listenfd);
Console.ReadLine();
listenfd.Close();
}
}
}
csc ByteArrayAsyncEchoServer.cs ByteArray.cs
ByteArrayAsyncEchoServer
ByteArrayAsyncEchoClient.cs
using System;
using System.Net.Sockets;
using System.Linq;
using System.Collections.Generic;
namespace ByteArrayAsyncEchoClient {
class MainClass {
static Socket socket;
static Queue<ByteArray> writeQueue = new Queue<ByteArray>();
static ByteArray readBuff = new ByteArray();
public static void Connect() {
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.BeginConnect("127.0.0.1", 8888, ConnectCallback, socket);
}
public static void ConnectCallback(IAsyncResult ar) {
Socket socket = (Socket) ar.AsyncState;
socket.EndConnect(ar);
socket.BeginReceive(readBuff.bytes, readBuff.writeIdx, readBuff.remain, 0, ReceiveCallback, socket);
}
public static void ReceiveCallback(IAsyncResult ar) {
Socket socket = (Socket) ar.AsyncState;
int count = socket.EndReceive(ar);
readBuff.writeIdx += count;
OnReceiveData();
if(readBuff.remain<8) {
readBuff.MoveBytes();
readBuff.ReSize(readBuff.length*2);
}
socket.BeginReceive(readBuff.bytes, readBuff.writeIdx, readBuff.remain, 0, ReceiveCallback, socket);
}
public static void OnReceiveData() {
Console.WriteLine("[接收 1] length = " + readBuff.length);
Console.WriteLine("[接收 2] readBuff = " + readBuff.ToString());
if (readBuff.length <= 2)
return;
int readIdx = readBuff.readIdx;
byte[] bytes = readBuff.bytes;
Int16 bodyLength = (Int16)((bytes[readIdx+1]<<8)|(bytes[readIdx]));
if (readBuff.length < 2+bodyLength)
return;
readBuff.readIdx += 2;
Console.WriteLine("[接收 3] bodyLength = " + bodyLength);
byte[] stringByte = new byte[bodyLength];
readBuff.Read(stringByte, 0, bodyLength);
string s = System.Text.Encoding.Default.GetString(stringByte);
Console.WriteLine("[接收 4] s = " + s);
Console.WriteLine("[接收 5] readBuff = " + readBuff.ToString());
OnReceiveData();
}
public static void Send(string sendStr) {
byte[] bodyBytes = System.Text.Encoding.Default.GetBytes(sendStr);
Int16 len = (Int16)bodyBytes.Length;
byte[] lenBytes = BitConverter.GetBytes(len);
byte[] sendBytes = lenBytes.Concat(bodyBytes).ToArray();
ByteArray ba = new ByteArray(sendBytes);
int count;
lock(writeQueue) {
writeQueue.Enqueue(ba);
count = writeQueue.Count;
}
if(count==1)
socket.BeginSend(sendBytes, 0, sendBytes.Length, 0, SendCallback, socket);
Console.WriteLine("[Send]" + BitConverter.ToString(sendBytes));
}
public static void SendCallback(IAsyncResult ar) {
Socket socket = (Socket) ar.AsyncState;
int count = socket.EndSend(ar);
ByteArray ba;
lock (writeQueue) {
ba = writeQueue.First();
}
ba.readIdx += count;
if (ba.length == 0){
lock (writeQueue){
writeQueue.Dequeue();
}
} else {
socket.BeginSend(ba.bytes, ba.readIdx, ba.length, 0, SendCallback, socket);
}
}
public static void Main(string[] args) {
Connect();
while(true) {
string sendStr = Console.ReadLine();
if(sendStr.Length==0)
continue;
else if(sendStr=="quit")
break;
Send(sendStr);
}
socket.Close();
}
}
}
csc ByteArrayAsyncEchoClient.cs ByteArray.cs
ByteArrayAsyncEchoClient
|