关于项目
项目目前开源:https://github.com/ZYunfeii/ElegantDIS,遵循MIT协议。 之后会持续更新,目前完成了一些分布式仿真的基本功能。
项目效果展示
管理节点
- 日志显示栏
- 仿真节点IP Port显示,订阅话题显示
- 仿真显示栏
- 开始按钮和初始化仿真节点按钮
仿真节点(客户节点)
- 填入管理节点IP和Port后向管理节点进行连接;
- 日志显示栏显示日志信息;
项目原理
项目基于Muduo网络库,受其中examples/hub代码启发,正巧实验室只有Win下的分布式系统,想自己开发一个Linux下的。
- 客户端采用QT搭建,主线程负责显示,子线程用于网络监听和一系列指令接受与发布逻辑;
- 客户节点通过回调绑定设置仿真步进函数和初始化函数;
- 管理节点向已注册的节点发布步进命令和初始化命令(目前只支持这两种操作);
- 网络消息使用Json传输;
仿真同步逻辑(该DIS系统的核心思想) 对于仿真节点,其在步进函数中需要做的是将订阅话题的数据用于输入,计算得到需要发布话题的新值。之后向管理节点发布stepover指令,管理节点设置变量记录当前仿真步接受到stepover的节点数量,当数量等于所注册的节点数量时,管理节点发布synpub指令给所有仿真节点,仿真节点接收到该指令后将需要发布的话题值发布,通过管理节点转送到订阅该话题的节点,节点更新订阅话题的值,同时节点需要记录每一步仿真中订阅话题已更新数量,当所有订阅话题都被更新后,仿真节点向管理节点发布synpubover指令。管理节点统计每一仿真步中接受到synpubover的数量,当其等于所注册节点数量时,说明所有节点都完成话题更新了,此时再次向所有节点发布step指令进行下一步仿真。该逻辑保证了仿真的同步和时序的正常。 逻辑:所有节点先仿真一步->所有节点发布话题新值->所有节点更新话题值->下一步仿真。如果在step过程中又pub,时序会混乱。
对于管理节点,其需要记录注册仿真节点的各种信息。由于Muduo网络库是基于事件触发的,下面给出当管理节点收到消息后的处理函数:
void PubSubServer::onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp receiveTime) {
ParseResult result = kSuccess;
while (result == kSuccess) {
string cmd;
string topic;
string content;
result = parseMessage(buf, &cmd, &topic, &content);
if (result == kSuccess) {
if (cmd == "pub") {
doPublish(conn->name(), topic, content, receiveTime);
}
else if (cmd == "sub") {
doSubscribe(conn, topic);
}
else if (cmd == "unsub") {
doUnsubscribe(conn, topic);
}
else if (cmd == "stepover") {
emit step_over_sig();
}
else if (cmd == "initover") {
}
else if (cmd == "synpubover") {
emit synpub_over_sig();
}
else {
conn->shutdown();
result = kError;
}
}
else if (result == kError) {
conn->shutdown();
}
}
}
对于仿真节点,当有消息到来时的处理函数:
void PubSubClient::onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp receiveTime) {
ParseResult result = kSuccess;
while (result == kSuccess) {
string cmd;
string topic;
string content;
result = parseMessage(buf, &cmd, &topic, &content);
if (result == kSuccess) {
if (cmd == "pub" && subscribeCallback_) {
subscribeCallback_(topic, content, receiveTime);
}
if (cmd == "step") {
emit log_msg(QString("[Info] Step cmd received!"));
stepCallback_();
send("stepover\r\n");
}
if (cmd == "init") {
emit log_msg(QString("[Info] Init cmd received!"));
initCallback_();
send("initover\r\n");
}
if (cmd == "synpub") {
emit synpub_sig();
}
}
else if (result == kError) {
conn->shutdown();
}
}
}
可以看到,当接收到step或者init命令时,程序将执行上层注册的回调函数。并向管理节点发布over指令。其中emit是为了让qt主线程相应并显示相关日志信息。
Todo
- 将订阅话题的数据改为Json传输。(已完成)
鸣谢
Muduo网络库——陈硕
|