协议类
/// <summary>
/// Base class for a length-prefixed message protocol over a <see cref="NetworkStream"/>.
/// Every frame is a 4-byte body length in network byte order followed by that many body bytes.
/// Subclasses supply the body encoding via <see cref="EncodeBody{T}"/> and <see cref="Decode"/>.
/// </summary>
/// <typeparam name="TMessageType">Type that <see cref="Decode"/> produces for each received body.</typeparam>
public abstract class Protocol<TMessageType>
{
    // Size in bytes of the length prefix (a single Int32).
    const int HeaderSize = 4;

    /// <summary>
    /// Reads one complete frame (header, then body) from the stream and decodes the body.
    /// </summary>
    /// <param name="networkStream">Stream to read the frame from.</param>
    /// <returns>The decoded message.</returns>
    /// <exception cref="Exception">
    /// The stream ended before the frame was complete, or the announced body length was rejected by
    /// <see cref="AssertValidMessageLength"/>.
    /// </exception>
    public async Task<TMessageType> ReceiveAsync(NetworkStream networkStream)
    {
        var bodyLength = await ReadHeader(networkStream);
        AssertValidMessageLength(bodyLength);
        return await ReadBody(networkStream, bodyLength);
    }

    /// <summary>
    /// Encodes <paramref name="message"/> and writes its header followed by its body to the stream.
    /// </summary>
    /// <typeparam name="T">Type of the message being sent.</typeparam>
    /// <param name="networkStream">Stream to write the frame to.</param>
    /// <param name="message">Message to encode and send.</param>
    public async Task SendAsync<T>(NetworkStream networkStream, T message)
    {
        var (header, body) = Encode(message);
        await networkStream.WriteAsync(header, 0, header.Length);
        await networkStream.WriteAsync(body, 0, body.Length);
    }

    // Reads the fixed-size header and converts the length from network to host byte order.
    async Task<int> ReadHeader(NetworkStream networkStream)
    {
        var headerBytes = await ReadAsync(networkStream, HeaderSize);
        return IPAddress.NetworkToHostOrder(BitConverter.ToInt32(headerBytes, 0));
    }

    // Reads exactly bodyLength bytes and hands them to the subclass decoder.
    async Task<TMessageType> ReadBody(NetworkStream networkStream, int bodyLength)
    {
        var bodyBytes = await ReadAsync(networkStream, bodyLength);
        return Decode(bodyBytes);
    }

    // Reads exactly bytesToRead bytes, looping because a single stream read may return fewer
    // bytes than requested. Throws if the stream reports end-of-stream (0 bytes) first.
    async Task<byte[]> ReadAsync(NetworkStream networkStream, int bytesToRead)
    {
        var buffer = new byte[bytesToRead];
        int bytesRead = 0;
        while (bytesRead < bytesToRead)
        {
            // Ask only for the bytes still missing: offset + count must stay within the buffer,
            // otherwise the read throws as soon as a previous read was partial.
            var newBytes = await networkStream.ReadAsync(buffer, bytesRead, bytesToRead - bytesRead);
            if (newBytes == 0)
            {
                throw new Exception("Socket Closed");
            }
            bytesRead += newBytes;
        }
        return buffer;
    }

    /// <summary>
    /// Encodes a message into its wire representation.
    /// </summary>
    /// <typeparam name="T">Type of the message being encoded.</typeparam>
    /// <param name="message">Message to encode.</param>
    /// <returns>
    /// The header (body length as a 4-byte integer in network byte order) and the encoded body.
    /// </returns>
    protected (byte[] header, byte[] body) Encode<T>(T message)
    {
        var bodyBytes = EncodeBody(message);
        var headerBytes = BitConverter.GetBytes(IPAddress.HostToNetworkOrder(bodyBytes.Length));
        return (headerBytes, bodyBytes);
    }

    /// <summary>Decodes the raw body bytes of a received frame into a message.</summary>
    /// <param name="message">Body bytes exactly as read from the stream.</param>
    protected abstract TMessageType Decode(byte[] message);

    /// <summary>Encodes a message into the body bytes of a frame.</summary>
    /// <typeparam name="T">Type of the message being encoded.</typeparam>
    /// <param name="messageType">Message to encode.</param>
    protected abstract byte[] EncodeBody<T>(T messageType);

    /// <summary>
    /// Validates the body length announced by a received header. The default rejects lengths below 1;
    /// subclasses may override to apply different limits.
    /// </summary>
    /// <param name="messageLength">Body length taken from the header.</param>
    /// <exception cref="Exception"><paramref name="messageLength"/> is less than 1.</exception>
    protected virtual void AssertValidMessageLength(int messageLength)
    {
        if (messageLength < 1)
        {
            throw new Exception("Invalid Length");
        }
    }
}
JSON协议类
/// <summary>
/// <see cref="Protocol{TMessageType}"/> implementation whose frame bodies are UTF-8 encoded JSON text.
/// Received bodies are parsed into a <see cref="JObject"/>; outgoing messages are serialized with a
/// shared serializer configured for indented output, UTC date handling and camel-case property names
/// (dictionary keys are left as-is).
/// </summary>
public class JsonMessageProtocal : Protocol<JObject>
{
    static readonly JsonSerializer _serializer;
    static readonly JsonSerializerSettings _settings;

    // Builds the settings once and creates the single serializer shared by all instances.
    static JsonMessageProtocal()
    {
        _settings = new JsonSerializerSettings
        {
            Formatting = Formatting.Indented,
            DateTimeZoneHandling = DateTimeZoneHandling.Utc,
            ContractResolver = new DefaultContractResolver
            {
                NamingStrategy = new CamelCaseNamingStrategy
                {
                    ProcessDictionaryKeys = false
                }
            }
        };
        _serializer = JsonSerializer.Create(_settings);
    }

    /// <summary>Interprets the body bytes as UTF-8 text and parses that text as a JSON object.</summary>
    /// <param name="message">Body bytes of a received frame.</param>
    /// <returns>The parsed JSON object.</returns>
    protected override JObject Decode(byte[] message)
    {
        return JObject.Parse(Encoding.UTF8.GetString(message));
    }

    /// <summary>Serializes the message to JSON text and returns that text as UTF-8 bytes.</summary>
    /// <typeparam name="T">Type of the message being encoded.</typeparam>
    /// <param name="messageType">Message to serialize.</param>
    /// <returns>UTF-8 bytes of the serialized JSON.</returns>
    protected override byte[] EncodeBody<T>(T messageType)
    {
        var json = new StringBuilder();
        // StringWriter is IDisposable; scope it so it is released even if serialization throws.
        using (var writer = new StringWriter(json))
        {
            _serializer.Serialize(writer, messageType);
        }
        return Encoding.UTF8.GetBytes(json.ToString());
    }
}
XML协议类
TODO
通信信息适配
/// <summary>
/// Attaches a route path to a method. May be applied at most once per method.
/// </summary>
[AttributeUsage(AttributeTargets.Method, AllowMultiple = false)]
public class RouteAttribute : Attribute
{
    /// <summary>Creates the attribute with the given route path.</summary>
    /// <param name="path">Route path to expose through <see cref="Path"/>.</param>
    public RouteAttribute(string path)
    {
        Path = path;
    }

    /// <summary>The route path supplied to the constructor.</summary>
    public string Path { get; }
}
/// <summary>
/// Abstract contract for registering handlers and asynchronously dispatching messages of type
/// <typeparamref name="TMessageType"/>. How a handler is selected for a given message is left to
/// the implementation.
/// </summary>
/// <typeparam name="TMessageType">Message type; must be a reference type with a public parameterless constructor.</typeparam>
public abstract class MessageDispatcher<TMessageType> where TMessageType : class, new()
{
/// <summary>Registers an asynchronous handler that takes a <typeparamref name="TParm"/> and produces a <typeparamref name="TResult"/>.</summary>
/// <param name="target">Handler delegate to register.</param>
public abstract void Register<TParm, TResult>(Func<TParm, Task<TResult>> target);
/// <summary>Registers an asynchronous handler that takes a <typeparamref name="TParm"/> and produces no result.</summary>
/// <param name="target">Handler delegate to register.</param>
public abstract void Register<TParm>(Func<TParm, Task> target);
/// <summary>Dispatches <paramref name="message"/> and asynchronously yields a message of the same type.</summary>
/// <param name="message">Message to dispatch.</param>
/// <returns>A task producing a <typeparamref name="TMessageType"/>; presumably the handler's response — confirm against implementations.</returns>
public abstract Task<TMessageType> DispatchAsync(TMessageType message);
}
/// <summary>
/// <see cref="MessageDispatcher{TMessageType}"/> for <see cref="XDocument"/> messages. Each handler is
/// paired with an XPath expression; dispatch invokes the first handler whose expression evaluates to
/// boolean true against the message. Handler registration is not implemented yet.
/// </summary>
public class XDocumentMessageDispatch : MessageDispatcher<XDocument>
{
    // (XPath expression, handler) pairs, checked in list order by DispatchAsync.
    // Nothing in this class adds entries yet because both Register overloads are unimplemented.
    readonly List<(string xpath, Func<XDocument, Task<XDocument>> handler)> _handlers
        = new List<(string xpath, Func<XDocument, Task<XDocument>> handler)>();

    /// <summary>
    /// Runs the first handler whose XPath expression evaluates to boolean true for <paramref name="message"/>.
    /// </summary>
    /// <param name="message">Document to dispatch.</param>
    /// <returns>The matching handler's result, or <c>null</c> when no handler matches.</returns>
    public override async Task<XDocument> DispatchAsync(XDocument message)
    {
        foreach (var entry in _handlers)
        {
            // XPathEvaluate returns object; only a boolean true counts as a match.
            var isMatch = message.XPathEvaluate(entry.xpath) is bool outcome && outcome;
            if (!isMatch)
            {
                continue;
            }

            return await entry.handler(message);
        }

        return null;
    }

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public override void Register<TParm, TResult>(Func<TParm, Task<TResult>> target)
        => throw new NotImplementedException();

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public override void Register<TParm>(Func<TParm, Task> target)
        => throw new NotImplementedException();

    // Builds the XPath test for a handler method by wrapping the path from its RouteAttribute in
    // boolean(...). Throws ArgumentException when the method carries no RouteAttribute.
    private string GetXPathRoute(MethodInfo methodInfo)
    {
        var route = methodInfo.GetCustomAttribute<RouteAttribute>()
            ?? throw new ArgumentException($"Method { methodInfo.Name} missing required RouteAttribute");
        return $"boolean ({route.Path})";
    }
}
|