并发架构
-
活动对象的设计模式将执行与调用进行解耦,每个对象会留在自己的控制线程中,其目标是通过使用异步方法和调度器来引入并发。维基百科:Active object -
监控对象的设计模式,会同步并发方法的执行,以确保对象每次只运行一个成员函数。并且,还允许对象的成员函数协同调度序列的执行。 -
半同步/半异步体系结构模式,在并发系统中对异步和同步服务处理进行解耦,从而在不降低太多性能的情况下简化编程。该模式引入了两个通信层,一个用于异步,另一个用于同步。
活动对象
? 活动对象模式将执行与对象的成员函数解耦,每个对象会留在在自己的控制线程中。其目标是通过使用异步方法,处理调度器的请求,从而触发并发。维基百科:Active object。所以,这种模式也称为并发对象模式。
客户端的调用会转到代理,代理表现为活动对象的接口。服务提供活动对象的实现,并在单独的线程中运行。代理在运行时将客户端的调用转换为对服务的调用,调度程序将方法加入到激活列表中。调度器与服务在相同的线程中活动,并将方法调用从激活列表中取出,再将它们分派到相应的服务上。最后,客户端可以通过future从代理处获取最终的结果。
组件
活动对象模式由六个组件组成:
- 代理为活动对象的可访问方法提供接口。代理将触发激活列表的方法,并请求对象的构造。并且,代理和客户端运行在相同的线程中。
- 方法请求类定义了执行活动对象的接口。
- 激活列表的目标是维护挂起的请求,激活列表将客户端线程与活动对象线程解耦。代理对入队请求的进行处理,而调度器将请求移出队列。
- 调度器与代理可在不同的线程中运行。调度器会在活动对象的线程中运行,并决定接下来执行激活列表中的哪个请求。
- 可以通过服务实现活动对象,并在活动对象的线程中运行,服务也支持代理接口。
- future是由代理创造的,客户端可以从future上获取活动对象调用的结果。客户端可以安静等待结果,也可以对结果进行轮询。
代理
代理设计模式是《设计模式:可重用的面向对象软件的元素》中的经典模式,代理是其他对象的代表。典型的代理可以是远程代理CORBA、安全代理、虚拟代理或智能指针,如std::shared_ptr 。每个代理会为它所代表的对象添加额外的功能。远程代理代表远程对象,并使客户端产生本地对象的错觉。安全代理通过对数据进行加密和解密,将不安全的连接转换为安全的连接。虚拟代理以惰性的方式封装对象的创建,智能指针将接管底层内存的生存期。
优点和缺点
介绍Active Object模式的最小实现前,先了解一下它的优点和缺点。
优点:
-
同步只需要在活动对象的线程上进行,不需要在客户端的线程上进行。 -
客户端(用户)和服务器(实现者)之间的解耦,同步的挑战则在实现者的一边。 -
由于客户端为异步请求,所以系统的吞吐量提高了,从而调用处理密集型方法不会阻塞整个系统。 -
调度器可以实现各种策略来执行挂起请求,因此可以按不同的顺序执行入队请求。
缺点:
- 如果请求的粒度太细,则活动对象模式(如代理、激活列表和调度器)的性能开销可能过大。
- 由于调度器的调度策略和操作系统的调度互相影响,调试活动对象模式通常非常困难,尤其是以不同顺序执行请求的情况下。
具体实现
下面的示例展示了活动对象模式的简单实现。我没有定义一个请求,这应该由代理和服务实现。而且,当请求调度程序执行下一个请求时,服务应该只执行这个请求。
所涉及的类型为future<vector<future<pair<bool, int>>>> ,这个类型的标识有点长。为了提高可读性,我使用了声明(第16 - 37行)。
#include <algorithm>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <thread>
#include <utility>
#include <vector>
using std::async;
using std::boolalpha;
using std::cout;
using std::deque;
using std::distance;
using std::endl;
using std::for_each;
using std::find_if;
using std::future;
using std::lock_guard;
using std::make_move_iterator;
using std::make_pair;
using std::move;
using std::mt19937;
using std::mutex;
using std::packaged_task;
using std::pair;
using std::random_device;
using std::sort;
using std::thread;
using std::uniform_int_distribution;
using std::vector;
class IsPrime
{
public:
pair<bool, int> operator()(int i)
{
for (int j = 2; j * j <= i; ++j)
{
if (i % j == 0)return std::make_pair(false, i);
}
return std::make_pair(true, i);
}
};
class ActivaeObject
{
public:
future<pair<bool, int>> enqueueTask(int i)
{
IsPrime isPrime;
packaged_task<pair<bool, int>(int)> newJob(isPrime);
auto isPrimeFuture = newJob.get_future();
auto pair = make_pair(move(newJob), i);
{
lock_guard<mutex> lockGuard(activationListMutex);
activationList.push_back(move(pair));
}
return isPrimeFuture;
}
void run()
{
thread servant([this]
{
while (!isEmpty())
{
auto myTask = dequeueTask();
myTask.first(myTask.second);
}
});
servant.join();
}
private:
pair<packaged_task<pair<bool, int>(int)>, int> dequeueTask()
{
lock_guard<mutex> lockGuard(activationListMutex);
auto myTask = std::move(activationList.front());
activationList.pop_front();
return myTask;
}
bool isEmpty()
{
lock_guard<mutex> lockGuard(activationListMutex);
auto empty = activationList.empty();
return empty;
}
deque<pair<packaged_task<pair<bool, int>(int)>, int >> activationList;
mutex activationListMutex;
};
vector<int> getRandNumber(int number)
{
random_device seed;
mt19937 engine(seed());
uniform_int_distribution<> dist(1000000, 1000000000);
vector<int> numbers;
for (long long i = 0; i < number; ++i) numbers.push_back(dist(engine));
return numbers;
}
future<vector<future<pair<bool, int>>>> getFutures(ActivaeObject &activeObject,
int numberPrimes)
{
return async([&activeObject, numberPrimes]
{
vector<future<pair<bool, int>>> futures;
auto randNumbers = getRandNumber(numberPrimes);
for (auto numb : randNumbers)
{
futures.push_back(activeObject.enqueueTask(numb));
}
return futures;
});
}
int main()
{
cout << boolalpha << endl;
ActivaeObject activeObject;
auto client1 = getFutures(activeObject, 1998);
auto client2 = getFutures(activeObject, 2003);
auto client3 = getFutures(activeObject, 2011);
auto client4 = getFutures(activeObject, 2014);
auto client5 = getFutures(activeObject, 2017);
auto futures = client1.get();
auto futures2 = client2.get();
auto futures3 = client3.get();
auto futures4 = client4.get();
auto futures5 = client5.get();
futures.insert(futures.end(), make_move_iterator(futures2.begin()),
make_move_iterator(futures2.end()));
futures.insert(futures.end(), make_move_iterator(futures3.begin()),
make_move_iterator(futures3.end()));
futures.insert(futures.end(), make_move_iterator(futures4.begin()),
make_move_iterator(futures4.end()));
futures.insert(futures.end(), make_move_iterator(futures5.begin()),
make_move_iterator(futures5.end()));
activeObject.run();
vector<pair<bool, int>> futResults;
futResults.reserve(futResults.size());
for (auto &fut : futures)futResults.push_back(fut.get());
sort(futResults.begin(), futResults.end());
auto prIt = find_if(futResults.begin(), futResults.end(),
[](pair<bool, int>pa) {return pa.first == true; });
cout << "Number primes: " << distance(prIt, futResults.end()) << endl;
cout << "Primes: " << endl;
for_each(prIt, futResults.end(), [](auto p) {cout << p.second << " "; });
cout << "\n\n";
cout << "Number no primes: " << distance(futResults.begin(), prIt) << endl;
cout << "No primes: " << endl;
for_each(futResults.begin(), prIt, [](auto p) {cout << p.second << " "; });
cout << endl;
}
示例的基本思想是,客户端可以在激活列表上并发地安排作业。线程的工作是确定哪些数是质数。激活列表是活动对象的一部分,而活动对象在一个单独的线程上进行入队操作,并且客户端可以在激活列表中查询作业的结果。
程序的详情:5个客户端通过getFutures 将工作(第121 - 126行)入队到activeObject 。numberPrimes 中的数字是1000000到1000000000之间(第96行)的随机数,将这些数值放入vector<future<pair<bool, int>> 中。future<pair<bool, int> 持有一个bool 和int 对,其中bool 表示int 值是否是质数。再看看第108行:future .push_back(activeObject.enqueueTask(numb)) 。此调用将触发新作业进入激活列表的队列,所有对激活列表的调用都必须受到保护,这里激活列表是一个promise队列(第89行):deque<pair<packaged_task<pair<bool, int>(int)>, int >> 。
每个promise在调用执行函数对象IsPrime (第39 - 47行)时,会返回一个bool 和int 对。现在,工作包已经准备好了,开始计算吧。所有客户端在第129 - 133行中返回关联future的句柄,并把所有的future放在一起(第136 - 146行),这样会使工作更加容易。第149行中的调用activeObject.run() 启动执行。run (第64 - 72行)启动单独的线程,并执行promises(第68行),直到执行完所有作业(第66行)。isEmpty (第83 - 87行)确定队列是否为空,dequeTask 会返回一个新任务。通过在每个future 上调用futResults.push_back(fut.get()) (第154行),所有结果都会推送到futResults 上。第156行对成对的向量进行排序:vector<pair<bool, int>> 。其余代码则是给出了计算结果,第159行中的迭代器prIt 将第一个迭代器指向一个素数对。
监控对象
监控对象模式会同步并发执行,以确保对象只执行一个方法。并且,还允许对象的方法协同调度执行序列。这种模式也称为线程安全的被动对象模式。
模式要求
多个线程同时访问一个共享对象时,需要满足以下要求:
- 并发访问时,需要保护共享对象不受非同步读写操作的影响,以避免数据争用。
- 必要的同步是实现的一部分,而不是接口的一部分。
- 当线程处理完共享对象时,需要发送一个通知,以便下一个线程可以使用共享对象。这种机制有助于避免死锁,并提高系统的整体性能。
- 方法执行后,共享对象的不变量必须保持不变。
客户端(线程)可以访问监控对象的同步方法。因为监控锁在任何时间点上,只能运行一个同步方法。每个监控对象都有一个通知等待客户端的监控条件。
组件
监控对象由四个组件组成。
- 监控对象:支持一个或多个方法。每个客户端必须通过这些方法访问对象,每个方法都必须在客户端线程中运行。
- 同步方法:监控对象支持同步方法。任何给定的时间点上,只能执行一个方法。线程安全接口有助于区分接口方法(同步方法)和(监控对象的)实现方法。
- 监控锁:每个监控对象有一个监控锁,锁可以确保在任何时间点上,只有一个客户端可以访问监控对象。
- 监控条件:允许线程在监控对象上进行调度。当前客户端完成同步方法的调用后,下一个等待的客户端将被唤醒。
虽然监控锁可以确保同步方法的独占访问,但是监控条件可以保证客户端的等待时间最少。实质上,监控锁可以避免数据竞争,条件监控可以避免死锁。
运行时行为
监控对象及其组件之间的交互具有不同的阶段。
- 当客户端调用监控对象的同步方法时,必须锁定全局监控锁。如果客户端成功访问,将执行同步方法,并在结束时解锁。如果客户端访问不成功,则阻塞客户端,进入等待状态。
- 当客户端阻塞时,监控对象会在解锁时,对阻塞的客户端发送通知。通常,等待是资源友好的休眠,而不是忙等。
- 当客户端收到通知时,会锁定监控锁,并执行同步方法。同步方法结束时解锁,并发送监控条件的通知,以通知下一个客户端去执行。
优点和缺点
监控对象的优点和缺点是什么?
优点:
-
同步方法会完全封装在实现中,所以客户端不知道监控对象会隐式同步。 -
同步方法将自动调度监控条件的通知/等待机制,其表现类似一个简单的调度器。 -
缺点:
- 功能和同步是强耦合的,所以很难改变同步机制。
- 当同步方法直接或间接调用同一监控对象时,可能会发生死锁。
下面的程序段中定义了一个ThreadSafeQueue。
#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
template <typename T>
class Monitor
{
public:
void lock() const
{
monitMutex.lock();
}
void unlock() const
{
monitMutex.unlock();
}
void notify_one() const noexcept
{
monitCond.notify_one();
}
void wait() const
{
std::unique_lock<std::recursive_mutex> monitLock(monitMutex);
monitCond.wait(monitLock);
}
private:
mutable std::recursive_mutex monitMutex;
mutable std::condition_variable_any monitCond;
};
template <typename T>
class ThreadSafeQueue : public Monitor<ThreadSafeQueue<T>>
{
public:
void add(T val)
{
derived.lock();
myQueue.push(val);
derived.unlock();
derived.notify_one();
}
T get()
{
derived.lock();
while (myQueue.empty()) derived.wait();
auto val = myQueue.front();
myQueue.pop();
derived.unlock();
return val;
}
private:
std::queue<T> myQueue;
ThreadSafeQueue<T> &derived = static_cast<ThreadSafeQueue<T>&>(*this);
};
class Dice
{
public:
int operator()() { return rand(); }
private:
std::function<int()>rand = std::bind(std::uniform_int_distribution<>(1, 6),
std::default_random_engine());
};
int main()
{
std::cout << std::endl;
constexpr auto NUM = 100;
ThreadSafeQueue<int> safeQueue;
auto addLambda = [&safeQueue](int val) {safeQueue.add(val); };
auto getLambda = [&safeQueue] {std::cout << safeQueue.get() << " "
<< std::this_thread::get_id() << ";";
};
std::vector<std::thread> addThreads(NUM);
Dice dice;
for (auto &thr : addThreads) thr = std::thread(addLambda, dice());
std::vector<std::thread> getThreads(NUM);
for (auto &thr : getThreads) thr = std::thread(getLambda);
for (auto &thr : addThreads) thr.join();
for (auto &thr : addThreads) thr.join();
std::cout << "\n\n";
}
该示例的核心思想是,将监控对象封装在一个类中,这样就可以重用。监控类使用std::recursive_mutex 作为监控锁,std::condition_variable_any 作为监控条件。与std::condition_variable 不同,std::condition_variable_any 能够接受递归互斥。这两个成员变量都声明为可变,因此可以在常量方法中使用。监控类提供了监控对象的最小支持接口。
第34 - 55行中的ThreadSafeQueue 使用线程安全接口扩展了第53行中的std::queue 。ThreadSafeQueue 继承于监控类,并使用父类的方法来支持同步的方法add 和get 。方法add 和get 使用监控锁来保护监控对象,特别是非线程安全的myQueue 。当一个新项添加到myQueue 时,add 会通知等待线程,并且这个通知是线程安全的。当如ThreadSafeQueue 这样的模板类,将派生类作为基类的模板参数时,这属于C++的一种习惯性用法,称为CRTP:class ThreadSafeQueue: public Monitor<threadsafequeue<T>> 。理解这个习惯的关键是第54行:ThreadSafeQueue<T>& derived = static_cast<threadsafequeue<T>&>(*this) ,该表达式将this 指针向下转换为派生类。监控对象safeQueue 第72行使用(第73行和第74行中的)Lambda函数添加一个数字,或从同步的safeQueue 中删除一个数字。ThreadSafeQueue 本身是一个模板类,可以保存任意类型的值。程序模拟的是100个客户端向safeQueue 添加100个介于1 - 6之间的随机数(第78行)的同时,另外100个客户端从safeQueue 中删除这100个数字。程序会显示使用的线程的编号和id。
不知道为啥会段错误
奇异递归模板模式(CRTP)
奇异递归模板模式,简单地说,CRTP代表C++中的一种习惯用法,在这种用法中,Derived类派生自类模板Base,因此Base作为Derived模板参数。
template<class T>
class Base
{
....
};
class Derived : public Base<Derived>
{
....
};
理解CRTP习惯用法的关键是,实例化方法是惰性的,只有在需要时才实例化方法。CRTP有两个主要的用例。
- 静态多态性:静态多态性与动态多态性类似,但与使用虚方法的动态多态性相反,方法调用的分派在编译时进行。
- Mixin: Mixin是设计混合代码类时的一个流行概念。
ThreadSafeQueue 使用Mixin技术来扩展它的接口。通过从Monitor 类派生ThreadSafeQueue ,派生类ThreadSafeQueue 获得类Monitor 的所有方法:ThreadSafeQueue: public Monitor<threadsafequeue<T>> 类。
惰性C++:CRTP一文中,有对CRTP习语有更深入地描述。
活动对象和监控对象在几个重要的方面类似,但也有不同。这两种体系结构模式,会同步对共享对象的访问。活动对象的方法在不同线程中执行,而监控对象的方法则在同一线程中执行。活动对象更好地将其方法调用与执行解耦,因此更容易维护。
半同步/半异步
半同步/半异步模式会对并发系统中异步和同步服务进行解耦,从而在不过度降低性能的情况下简化编程。该模式引入了两个可以相互通信的层,一个用于异步,另一个用于同步。
半同步/半异步模式通常用于服务器的事件循环或图形界面。事件循环的工作流是将事件请求插入队,并在单独的线程中同步处理。异步处理确保了运行效率,而同步处理简化了申请流程。异步服务层和同步服务层分解为两个层,并且在这两个层之间有队列坐标。异步层由较底层的系统服务(如中断)组成,而同步层由较高层的服务(如数据库查询或文件操作)组成。异步层和同步层可以通过队列层相互通信。
优点和缺点
半同步/半异步模式的优点和缺点是什么?
优点:
- 异步和同步分界线很明确。底层系统服务在异步层中处理,高层服务在同步层中处理。
- 对请求队列处理的层,保证了异步层和同步层的解耦。
- 清晰的分离使软件更容易理解、调试、维护和扩展。
- 同步服务中的阻塞不会影响异步服务。
缺点:
- 异步层和同步层之间交叉的部分可能会导致开销。通常,因为异步服务通常在内核空间中运行,同步服务在用户空间中运行,所以“边界的部分”会涉及内核空间和用户空间之间的上下文切换。
- 为了严格分离各层,要求复制数据或数据是不可变的
半同步/半异步模式通常用于事件的多路分解和调度框架,如Reactor或Proactor模式。
Reactor模式
Reactor模式也称为调度程序或通知程序。该模式是一个事件驱动的框架,用于将多个服务请求并发地分发到各个服务端。
使用要求
服务器应该并发地处理客户端的请求。每个客户端的请求都有一个唯一标识符,并支持映射到特定的服务端。以下几点是Reactor必备的:
- 不阻塞。
- 支持最大吞吐量,避免不必要的上下文切换,避免数据的复制或同步。
- 易于扩展,以支持服务的修改。
- 不使用复杂的同步机制。
解决方案
对于支持的服务类型,实现一个事件处理程序来满足特定客户端的请求。反应器中使用注册的方式,将服务端的事件处理程序进行注册,这里使用了事件解复用器来同步等待所有传入的事件。当一个事件到达时,反应器得到通知,并将相应的事件分派给特定的服务。
组件
- 句柄:
- 句柄标识了事件源,如网络连接、打开文件或GUI事件。
- 事件源生成连接、读或写等事件,这些事件会在句柄上进行排队。
- 同步事件多路分解器:
- 同步事件多路分解器会等待一个或多个事件。多路分解器会进行阻塞,直到关联的句柄能够处理该事件为止。
- 事件处理接口:
- 事件处理程序定义了处理特定事件的接口。
- 事件处理程序定义了应用程序支持的服务。
- 特定事件处理程序:
- 反应器:
反应器(而不是应用程序)等待特定事件,并进行分解和分派。具体的事件处理在反应器中注册,反应器改变了控制流程。反应器等待特定事件,并调用特定的处理程序。这种控制的倒置,称为好莱坞原则。(译者注:“不要给我们打电话,我们会给你打电话(don‘t call us, we‘ll call you)”这是著名的好莱坞原则。)
下面的代码段显示了C++框架的事件循环——自适应通信环境(ACE)。
SignalHandler *mutateTimer1 = new SignalHandler(timerId1);
SignalHandler *mutateTimer2 = new SignalHandler(timerId2);
ACE_Reactor::instance()->register_handler(SIGINT, mutateTimer1);
ACE_Reactor::instance()->register_handler(SIGTSTP, mutateTimer2);
Timer::instance()->wait_fot_event();
第2行和第5行定义按CTRL+c和CTRL+z的键盘事件的信号处理程序。第7行和第8行记录它们,事件循环从第12行开始。
优点和缺点
反应器模式的优点和缺点是什么呢?
- 优点:
- 框架和应用逻辑解耦。
- 各种具体处理程序的模块化。
- 接口和实现的分离,使服务更容易适应或扩展。
- 整体结构支持并发。
- 缺点:
- 需要调用事件分解系统。
- 长时间运行的程序会阻塞反应器。
- 反转控制使得测试和调试更加困难。
半同步/半异步模式通常在反应器模式中,用于在独立线程中对客户端请求的响应。
Proactor模式是反应器模式的异步变体。反应器模式同步地分解和分派事件处理程序,而Proactor模式异步地分派事件处理程序。
Proactor模式
Proactor模式允许事件驱动的应用程序,对异步操作完成时触发的服务请求进行多路的分解和分派。
使用要求
事件驱动程序(如服务器),其性能可以通过异步处理服务来提高。为了实现这种方式,事件驱动程序必须同步处理多个事件,从而避免昂贵的数据同步或上下文切换。此外,修改后的服务应该很容易集成入系统,应用程序应该避免对多线程和同步方式进行挑战。
解决方案
将服务分为两部分:异步运行的长时间操作和处理操作结果的程序。结果处理程序与反应器模式中的事件处理程序非常相似,不过异步操作通常是操作系统的工作。所以,作为反应器模式,Proactor模式定义了事件循环。
异步操作(如连接请求)是该模式的独特之处,并且在不阻塞调用线程的情况下执行操作。当耗时相当长的操作完成时,它将一个完成事件放入完成事件队列,Proactor通过使用异步事件多路分解器在队列上等待。异步事件多路分解器将从队列中删除完成事件,而Proactor将其分派给特定的处理程序,处理操作的结果。
组件
Proactor模式由九个组件组成。
- 句柄:
- 表示操作系统的实体(如套接字),可以生成完成事件。
- 异步操作:
- 通常异步执行耗时相当长的操作。可以在套接字上进行读或写操作。
- 异步操作处理器:
- 执行异步操作,完成后在完成事件队列上注册完成事件。
- 完成事件接口:
- 定义处理异步操作结果的接口。
- 完成事件处理逻辑:
- 完成事件队列:
- 作为完成事件的缓冲,直到被异步事件分解器移出队列。
- 异步事件多路分解器:
- 在完成事件队列上等待完成事件时,可以阻塞程序。
- 从完成事件队列中删除完成事件。
- Proactor:
- 调用异步事件分解器对完成事件进行脱队操作。
- 分解和分派完成事件,并调用特定的处理程序处理完成事件。
- 创建者:
- 调用异步操作。
- 可与异步操作处理器进行交互。
优点和缺点
Proactor模式的优点和缺点是什么呢?
- 优点:
- 应用程序将独立的异步功能进行功能性分离。
- Proactor的接口可用于支持不同操作系统上的多种异步事件分解器。
- 应用程序不需要启动新线程,因为耗时相当长的异步操作会在调用者的线程中运行。
- Proactor模式可以避免上下文的切换。
- 应用程序的逻辑部分不启动任何线程,因此不需要同步。
- 缺点:
- 为了高效地应用Proactor模式,操作系统需要支持异步操作。
- 由于操作启动和完成之间在时间和空间上的分离,调试或测试程序相当困难。
- 异步操作的调用和完成事件的维护需要额外的内存。
Asio,即「异步 IO」(Asynchronous Input/Output)
随着Boost.Asio库可能作为网络库成为C++23的一部分,在未来大家可以在C++中轻易实现Proactor模式了。Boost.Asio是由Christopher Kohlhoff的提供,是“一个用于网络和低级I/O编程的跨平台C++库,并使用现代C++为其他开发者提供了一致性异步模型”。
|