概念
想将并发量做的更大一点,思路是维护一个请求队列,一个线程用于登记并收集客户端请求,缓存到队列中。另一个线程用于处理队列中的请求。目前在考虑是否换一种数据结构,因为我想让此服务器支持长连接(可能要设置服务器查询客户端存活情况,或者在客户端主动断开时能够选择到该客户端并删除套接字)
设计过程
单例基类
线程安全,目前没有使用反射创建实例,后期更新中修改。
namespace HylicServer
{
abstract class SingleTonBase<T> where T : new()
{
private static T instance;
private static Object Olock = new Object();
public static T Instance
{
get
{
if(null == instance)
{
lock (Olock)
{
if (null == instance)
{
instance = new T();
}
}
}
return instance;
}
}
}
}
为日志类,配置文件类使用
配置操作类
配置文件读取: 本地存储一个json格式的配置文件,用于存储部署服务器的IP与端口。 ConfigurationBuilder类用于生成基于键/值的配置设置,以供在应用程序中使用。 AddJsonFile将 JSON 配置源添加到 builder。 建立配置类后可以通过键以获取对应的值
提供消息入队方法MesEnqueue,记录每一个日志消息实例。MesWriting每三秒将日志消息读进磁盘一次 这里做的不好,可以使用信号量改进写磁盘的时机。
更新中可能会修改配置文件,需要添加Set方法。
class Configuration : SingleTonBase<Configuration>
{
private ConfigurationBuilder configBuilder;
private IConfigurationRoot config;
public Configuration()
{
configBuilder = new ConfigurationBuilder();
string path= System.AppDomain.CurrentDomain.BaseDirectory;
path += "\\config\\setting.json";
configBuilder.AddJsonFile(path, false);
config = configBuilder.Build();
}
public string Get(string key)
{
return config[key];
}
}
日志类
LogLevel用于枚举日志等级 日志类需要继承单例模式类,全局唯一实例,通过Instance属性以获取日志实例。
public enum LogLevel
{
Debug,
Info,
Error,
Warn,
Fatal,
Defult
}
public struct LogMessage
{
public string Message { get; set; }
public LogLevel Level { get; set; }
public Exception Exception { get; set; }
public LogMessage(string mes,LogLevel level,Exception ex)
{
this.Message = mes;
this.Level = level;
this.Exception = ex;
}
}
class LogProvider:SingleTonBase<LogProvider>
{
private string outputStr;
private BlockingCollection<LogMessage> logMessagesQue;
public int MaxLength = 100;
public LogProvider()
{
outputStr = string.Empty;
logMessagesQue = new BlockingCollection<LogMessage>(100);
}
public void LogPrint(LogMessage log)
{
string NowTime = DateTime.Now.ToString();
outputStr = $"==========================\n" +
$"Time:{NowTime}\n" +
$"Message:{log.Message}\n" +
$"Level:{log.Level}\n" +
$"Execption:{log.Exception}\n";
Console.WriteLine(outputStr);
}
public void MesEnqueue(string Mes,LogLevel level,Exception ex)
{
LogMessage lm = new LogMessage(Mes, level, ex);
if(logMessagesQue.Count<MaxLength)
{
logMessagesQue.Add(lm);
}
}
public void MesWritting()
{
Task.Run(
() =>
{
while(true)
{
LogMessage lm = new LogMessage("", LogLevel.Defult, null);
while (!logMessagesQue.IsCompleted)
{
try
{
lm = logMessagesQue.Take();
}
catch (InvalidOperationException ex)
{
;
}
if (lm.Level != LogLevel.Defult)
{
}
}
Thread.Sleep(3000);
}
}
);
}
}
为连接上的客户端的信息记录创建类
一个连接上的客户端都有其单独的信息缓存,需要保存Socket并记录是否有关闭的需求,更新后会在内存中存储一张记录需要销毁的连接的表,在请求处理器中找到他们并做销毁。
namespace HylicServer
{
public class ClientInfo
{
public int BufferSize = 100;
public int ClientIndex { get; set; }
public TcpClient tc { get; set; }
public bool isClose;
public Byte[] ReceiveBuffer;
public ClientInfo(int index , TcpClient tc)
{
ClientIndex = index;
this.tc = tc;
ReceiveBuffer = new Byte[BufferSize];
isClose = false;
}
public async Task<string> ClientRead()
{
string receiveStringBuffer;
using (NetworkStream stream = this.tc.GetStream())
{
int bytes = await stream.ReadAsync(ReceiveBuffer, 0, BufferSize);
receiveStringBuffer = Encoding.UTF8.GetString(ReceiveBuffer, 0, bytes);
}
return receiveStringBuffer;
}
public async Task ClientWriteAsync(string msg)
{
Byte[] SendBytes = Encoding.UTF8.GetBytes(msg);
using (NetworkStream stream = this.tc.GetStream())
{
Console.WriteLine(SendBytes);
await stream.WriteAsync(SendBytes, 0, BufferSize);
}
}
}
}
HylicServer核心
主要设置一个线程作为请求收集器,另一个线程作为请求处理器,维护一个BlockingCollection线程安全的队列。
namespace HylicServer
{
class Hylic
{
private const int ConnectMaxNumber = 1000;
private string bindaddress;
private string bindport;
private IPEndPoint ipe;
public TcpListener server;
private BlockingCollection<ClientInfo> ClientList;
private int ConnectCount;
public Task Listenner;
public Task Handler;
public Hylic()
{
ConnectCount = 0;
ClientList = new BlockingCollection<ClientInfo>(boundedCapacity:255);
bindaddress = Configuration.Instance.Get("BindAddress");
bindport = Configuration.Instance.Get("BindPort");
ipe = new IPEndPoint(IPAddress.Parse(bindaddress), int.Parse(bindport));
}
public void HylicRun()
{
try
{
server = new TcpListener(ipe);
server.Start();
Console.WriteLine($"Server Is Running At:[{this.ipe}]\n");
}
catch(SocketException SE)
{
Console.WriteLine(SE.SocketErrorCode);
}
}
public void HylicListen()
{
Listenner = Task.Run(
async () =>
{
while(true)
{
this.ConnectCount = this.ConnectCount + 1;
TcpClient tc =await server.AcceptTcpClientAsync();
ClientInfo clientInfo = new ClientInfo(this.ConnectCount, tc);
ClientList.Add(clientInfo);
}
}
);
}
public void HylicRequestHandler()
{
Handler = Task.Run(
async () =>
{
while(true)
{
while(!ClientList.IsCompleted)
{
ClientInfo ci = null;
bool IsTakeSucceed = false;
try
{
IsTakeSucceed = ClientList.TryTake(out ci);
}
catch(InvalidOperationException ex)
{
Console.WriteLine($"Take Request From Queue Error:{ex.Message}");
}
if(ci != null && IsTakeSucceed==true)
{
string ReadRes = await ci.ClientRead();
Console.WriteLine($"[Message From Client {ci.ClientIndex}]\n" +
$"Message:\n{ReadRes}\n");
var firstLine = ReadRes.Split(" ").First();
Console.WriteLine("Handle String :", firstLine);
ci.tc.Close();
}
}
}
}
);
}
}
}
程序入口使用Hylic示例
namespace HylicServer
{
class Program
{
static void Main(string[] args)
{
Hylic hylicServer = new Hylic();
hylicServer.HylicRun();
hylicServer.HylicListen();
hylicServer.HylicRequestHandler();
while(true)
{
;
}
}
}
总结更新需求
1.扩展请求处理器的线程数 2.实现Server端长连接,设置Alive时长,或根据isClose标志位来决定是否出队。 3.部分配置需要放置到setting文件中 4.考虑解耦性
|