一、序言
流数据的发送,其实就是分块数据的发送,在之前,相信大家已经学会用Channel来发送和接收数据,但是要想直接发送流数据,还是比较麻烦的。所以这节为大家展示,如何优雅的发送流数据。
本节须知
在学习本节之前,您必须熟悉RRQM中的Protocol服务器与客户端(或其派生类,例如文件传输和RPC)的创建,如果您不熟悉,请在下列链接中了解。
二、程序集源码、Demo下载
2.1 源码位置
RRQMSocket
2.2 Demo位置
RRQMBox
三、安装
安装RRQMSocket 即可,具体步骤详看链接博客。
VS、Unity安装和使用Nuget包
四、流发送
流发送是由IProtocolClient 的实现类提供的函数(客户端实现为ProtocolClient ,服务器实现为ProtocolSocketClient )。
直接调用SendStream(重载)函数直接发送,该流数据将从Position位置 直到结束。
同时,还需要一个StreamOperator 类型的必要参数和Metadata 类型的可选参数。
StreamOperator 是本次流发送的操作器,其内容包括最大流速度 、当前速度 、当前进度 、分包长度 、取消操作 等。
Metadata是string类型的键值对 ,可以用于传递数据。 
代码示例如下:
SimpleProtocolClient protocolClient = CreateSimpleProtocolClient(new FixedHeaderDataHandlingAdapter());
byte[] data = new byte[1024 * 1024 * 50];
new Random().NextBytes(data);
MemoryStream stream = new MemoryStream(data);
stream.Position = 0;
Console.WriteLine($"即将发送流数据,长度为:{stream.Length}");
StreamOperator streamOperator = new StreamOperator();
streamOperator.PackageSize = 1024 * 64;
streamOperator.MaxSpeed = 1024 * 1024 * 5;
LoopAction loopAction = LoopAction.CreateLoopAction(-1, 1000, (a) =>
{
if (streamOperator.Status != ChannelStatus.Default)
{
a.Dispose();
}
Console.WriteLine($"速度:{streamOperator.Speed()},进度:{streamOperator.Progress}");
});
loopAction.RunAsync();
Metadata metadata = new Metadata();
metadata.Add("1", "1");
metadata.Add("2", "2");
AsyncResult asyncResult = protocolClient.SendStream(stream, streamOperator, metadata);
Console.WriteLine($"状态:{asyncResult.IsSuccess},信息:{asyncResult.Message}");
五、流接收
如果是Simple类型的实现实例,则订阅BeforeReceiveStream 和ReceivedStream 事件即可(SimpleProtocolService也有该事件),如果是继承类,则需要实现相关函数,此处不再演示。
以SimpleProtocolService为例,订阅BeforeReceiveStream 和ReceivedStream 事件。
BeforeReceiveStream事件会在发送方发起请求时触发,可以获得相关信息,以e.IsPermitOperation 属性决定要不要接收。如果要接收,则必须对e.Bucket (Stream类型)赋值。同样的可以从e.StreamOperator获得操作器,获取传输的相关信息。
ReceivedStream 事件会在流数据接收结束后触发(不一定是完成,失败也会触发),然后可以通过e.Status判断状态,同时,应当对 e.Bucket 进行手动释放。
SimpleProtocolService protocolService = CreateSimpleProtocolService();
protocolService.CreateSocketClient += (client, e) =>
{
client.SetDataHandlingAdapter(new FixedHeaderDataHandlingAdapter());
};
protocolService.BeforeReceiveStream += (socketClient, e) =>
{
e.Bucket = new MemoryStream();
e.IsPermitOperation = true;
Metadata metadata = e.Metadata;
StreamOperator streamOperator = e.StreamOperator;
Task.Run(async () =>
{
while (streamOperator.Status == ChannelStatus.Default)
{
Console.WriteLine($"速度={streamOperator.Speed()},进度={streamOperator.Progress}");
await Task.Delay(1000);
}
Console.WriteLine($"从循环传输结束,状态={streamOperator.Status}");
});
Console.WriteLine("开始接收流数据");
};
protocolService.ReceivedStream += (socketClient, e) =>
{
if (e.Status== ChannelStatus.Completed)
{
e.Bucket.Dispose();
}
Console.WriteLine($"从ReceivedStream传输结束,状态={e.Status}");
};
|