协程线程池
如何等待协程 完成 协程表示懒 任务,完成任务时恢复 协程,从而获取任务 返回值. 以线程中执行任务 为例,来看如何定制协程 机器的"零件 "来实现等待 并获取任务 返回值:
任务<大小型>读文件(){
常 动 结果=协待 异步读文件{"../主.c++"};
协中 结果.大小();
}
整个流程 大概这样: 创建 协程后,执行协待承诺.初始挂起 ,由于返回从不挂起 ,不会挂起协程,执行协程函数体(函数体) ,然后执行协待异步读文件{"../主.c++"} ,协待式 会产生3个结果: 1,挂起当前协程; 2,执行等待器.挂起协 3,执行代码返回到调用者
协待等待器 生成的伪代码 如下:
如(!等待器.准备好协()){
挂起协程;
等待器.挂起协(协程句柄);
中 到调用者;
恢复点;
}
中 等待器.恢复协();
在等待器.挂起协 中传入 协程句柄(协程句柄) 到执行任务 的线程中,线程中执行 完成任务 时,恢复 挂起协程,通过恢复 协程来告诉协程任务 执行完了,并通过等待器.恢复协() 返回任务 的结果. 现在已经了解协程 是如何表示懒 的任务 了,接下来要实现 同步等待协程的完成 ,实现思路还是定制"协程机器 "零件. 要实现等待语义 ,可再创建等待协程 ,这个协程 和调用者 间通过事件 来通信,调用者 那里阻塞等待事件 ,等待协程 执行时,也是被等待协程 完成时,通过事件 通知调用者 已完成了. 可在等待器 下功夫,创建协程 后挂起,传入协程句柄 到被等待协程(A) 中,A 执行完成后恢复等待协程 ,等待协程 给阻塞的调用者 发完成 通知.
元<类 T>
动 同步等待(T 任务){
承诺<空>承诺;
动 帮助器=[&]()->同步等待任务{
承诺.置值();
协中;
};
动 帮助器任务=帮助器();
任务.置下(帮助器任务.取句柄());
承诺.取未来().等待();
中 任务.取结果();
}
整 主(){
动 懒读任务=读文件();
动 读大小=同步等待(移动(懒读任务));
中 0;
}
看看同步等待 实现,这里通过承诺 来实现协程 和调用者 间通信,也可换成条件变量 或等待事件 等.先创建给调用者 发通知 表示等待结束 的同步等待任务 协程(B ).B 是在被等待协程 的止挂起 里恢复的,因为止挂起 是在执行协程函数体 后调用的,所以止挂起 就说明已执行完 了协程,在这里恢复同步等待任务 是最好的.
#指示 一次
元<型名 T>
构 承诺;
元<型名 T>
构[[未丢弃]]任务{
用 承诺类型=承诺<T>;
任务()=默认;
动 符号 协待()常 无异{
构 可等待{
极 准备好协()常 无异{
中 承诺.是准备好();
}
用 协程句柄=协程句柄<>;
协程句柄 挂起协(协程句柄 连续)常 无异{
承诺.连续=连续;
中 协程句柄<承诺<T>>::从承诺(承诺);
}
T&&恢复协()常{
中 承诺.取结果();
}
承诺<T>&承诺;
};
中 可等待{*承诺};
}
T&&取结果(){
中 承诺->取结果();
}
动 取句柄(){中 承诺->取句柄();}
空 置下(协程句柄<>h){
承诺->连续=h;
}
私:
任务(承诺<T>*承诺):承诺{承诺}{}
承诺<T>*承诺=空针;元<型名>友 构 承诺;
};
元<型名 T>
构 承诺{
动 取中对象(){
中 任务<T>{本};
}
动 取句柄(){
中 协程句柄<承诺<T>>::从承诺(*本);
}
从不挂起 初始挂起()无异{中{};}
动 止挂起()无异{
构 止可等待{
极 准备好协()常 无异{中 假;}
空 挂起协(协程句柄<承诺<T>>本协程)无异{
动&承诺=本协程.承诺();
如(承诺.连续)
承诺.连续();
}
空 恢复协()常 无异{}
};
中 止可等待{};
}
空 未处理异常(){终止();}
元<型名 U>
空 返回值(U&&值)
{
结果.元 原位<1>(前向<U>(值));
}
T&&取结果(){
如(结果.索引()==2)
再抛异常(取<2>(结果));
中 移动(取<1>(结果));
}
极 是准备好(){
中 结果.索引()!=0;
}
变量<单态,T,异常针>结果;
协程句柄<>连续=无操协程();
};
构 同步等待任务{
构 承诺类型{
动 取中对象(){
中 同步等待任务{协程句柄<承诺类型>::从承诺(*本)};
}
动 初始挂起(){
中 总是挂起{};
}
空 中空(){
}
动 止挂起()无异{
中 从不挂起{};
}
空 未处理异常(){
退出(1);
}
};
动 取句柄(){
中 句柄_;
}
协程句柄<承诺类型>句柄_;
};
元<类 T>
动 同步等待(T 任务){
承诺<空>承诺;
动 帮助器=[&]()->同步等待任务{
承诺.置值();
协中;
};
动 帮助器任务=帮助器();
任务.置下(帮助器任务.取句柄());
承诺.取未来().等待();
中 任务.取结果();
}
协程线程池
相比调度协程 到线程中执行,把协程 调度到线程池 中执行更有意义,实现思路也比较简单,通过协待等待器 即可实现,在等待器.挂起协 里把协程 丢到线程池 里就好了,剩下就由线程池 调度执行协程 就好了. 而线程池 部分实现也很简单,把前面 的简单池 的代码 增加个调度 方法即可,其它都不变:
#包含"安全队列.h++"
用 协程项=协程句柄<>;
类 协程线程池{
公:
显 协程线程池(大小型 线程=线程::硬件并行()){
对(大小型 i=0;i<线程;++i)
工作者_.原后([本]{
对(;;){
协程项 任务;
如(!队列_.弹(任务))中;
如(任务)任务();
}
});
}
动 调度()
{
构 等待器
{
协程线程池*线程池_;
常式 极 准备好协()常 无异{中 假;}
常式 空 恢复协()常 无异{}
空 挂起协(协程句柄<>协程)常 无异{
线程池_->入列(协程);
}
};
中 等待器{本};
}
空 入列(协程项 项){队列_.压(移动(项));}
~协程线程池(){
队列_.停止();
对(动&thd:工作者_){
thd.合并();
}
}
私:
队列<协程项>队列_;
向量<线程>工作者_;
};
空 测试协程池(){
协程线程池 池;
动 任务=调度(
池,[](整 x){
输出<<"当前线程标识:"<<本线程::取标识()<<"\n";
中 x;
},
42);
输出<<同步等待(移动(任务))<<'\n';
调度(
池,[](整 x){
输出<<"当前线程标识:"<<本线程::取标识()<<"\n";
中 x;
},42);
本线程::休息(时间::秒(2));
}
|