我厚颜无耻汇总一下(抄袭)别人的博客,方便自己翻阅
借鉴(抄袭)
C++ Reference http://www.cplusplus.com/reference/thread/
C++ thread用法总结(整理)_sevenjoin的博客-CSDN博客_c++ thread https://blog.csdn.net/sevenjoin/article/details/82187127
C++多线程(一)thread类_coolwriter的博客-CSDN博客_c++ thread https://blog.csdn.net/coolwriter/article/details/79883253
c++ 11 多线线程系列----mutex_chen-CSDN博客_c++ mutex https://blog.csdn.net/chenxun_2010/article/details/49786263
C++11 条件变量(condition_variable) 使用详解 - 小海哥哥de - 博客园 https://www.cnblogs.com/xiaohaigegede/p/14008121.html
基于C++11实现线程池 - 知乎 https://zhuanlan.zhihu.com/p/367309864
C++11线程
C++11 新标准中多线程编程包括五个头文件,他们分别是<atomic> ,<thread>,<mutex>,<condition_variable> 和<future> 。
<atomic> :原子量头文件。该头文主要声明了两个类, std::atomic 和 std::atomic_flag,另外还声明了一套 C 风格的原子类型和与 C 兼容的原子操作的函数。<thread> :线程头文件。该头文件主要声明了 std::thread 类,另外 std::this_thread 命名空间也在该头文件中。<mutex> :互斥变量头文件。该头文件主要声明了与互斥量(mutex)相关的类,包括 std::mutex 系列类,std::lock_guard, std::unique_lock, 以及其他的类型和函数。<condition_variable> :条件变量头文件。该头文件主要声明了与条件变量相关的类,包括 std::condition_variable 和 std::condition_variable_any。<future> :提供异步访问。该头文件主要声明了 std::promise, std::package_task 两个 Provider 类,以及 std::future 和 std::shared_future 两个 Future 类,另外还有一些与之相关的类型和函数,std::async() 函数就声明在此头文件中。
线程库thread
thread类成员函数:
-
get_id:获取线程ID,返回一个类型为std::thread::id的对象。 -
joinable:检查线程是否可被join。检查thread对象是否标识一个活动(active)的可行性线程。缺省构造的thread对象、已经完成join的thread对象、已经detach的thread对象都不是joinable。 -
join:调用该函数会阻塞当前线程。阻塞调用者(caller)所在的线程直至被join的std::thread对象标识的线程执行结束。 -
detach:将当前线程对象所代表的执行实例与该线程对象分离,使得线程的执行可以单独进行。一旦线程执行完毕,它所分配的资源将会被释放。 -
native_handle:该函数返回与std::thread具体实现相关的线程句柄。native_handle_type是连接thread类和操作系统SDK API之间的桥梁,如在Linux g++(libstdc++)里,native_handle_type其实就是pthread里面的pthread_t类型,当thread类的功能不能满足我们的要求的时候(比如改变某个线程的优先级),可以通过thread类实例的native_handle()返回值作为参数来调用相关的pthread函数达到目录。 -
swap:交换两个线程对象所代表的底层句柄。 -
operator=:moves the thread object -
hardware_concurrency:静态成员函数,返回当前计算机最大的硬件并发线程数目。基本上可以视为处理器的核心数目。
这里的detach和join区别在于,join()是让用户手动管理线程,会阻塞当前线程直到*this所标识的线程结束;detach()不会阻塞当前线程,通过将thread对象分离线程,让线程独立执行,并且当线程运行结束后释放线程的所有资源。
构造函数示例
#include <iostream>
#include <thread>
void foo()
{
for(int i=0;i<10;++i)
std::cout << "foo func run "<<i<<" times\n";
}
void bar(int x)
{
for(int i=0;i<10;++i)
std::cout << "bar func run "<<i<<" times with param ="<<x<<"\n";
}
int main()
{
std::thread first (foo);
std::thread second (bar,0);
std::cout << "main, foo and bar now execute concurrently...\n";
first.join();
second.join();
std::cout << "foo and bar completed.\n";
return 0;
}
- 输出结果(多次输出结果,可以发现结果顺序不一致):
main, foo and bar now execute concurrently...
foo func run 0 times
foo func run 1 times
foo func run 2 times
foo func run 3 times
foo func run 4 times
foo func run 5 times
foo func run 6 times
foo func run 7 times
foo func run 9 times
bar func run 0 times with param =0
bar func run 1 times with param =0
bar func run 2 times with param =0
bar func run 3 times with param =0
bar func run 4 times with param =0
bar func run 5 times with param =0
bar func run 6 times with param =0
bar func run 7 times with param =0
bar func run 8 times with param =0
bar func run 9 times with param =0
foo and bar completed.
move赋值操作示例
move (1)
thread& operator= (thread&& rhs) noexcept;
copy [deleted] (2)
thread& operator= (const thread&) = delete;
#include <iostream>
#include <thread>
#include <chrono>
void pause_thread(int n)
{
std::this_thread::sleep_for(std::chrono::seconds(n));
std::cout << "pause of " << n << " seconds ended\n";
}
int main()
{
std::thread threads[5];
std::cout << "Spawning 5 threads...\n";
for (int i = 0; i<5; ++i)
threads[i] = std::thread(pause_thread, i + 1);
std::cout << "Done spawning threads. Now waiting for them to join:\n";
for (int i = 0; i<5; ++i)
threads[i].join();
std::cout << "All threads joined!\n";
return 0;
}
join
调用该函数会阻塞当前线程。阻塞调用者(caller)所在的线程直至被join的std::thread对象标识的线程执行结束。 主线程会等待当前线程执行完成再退出。
#include<thread>
#include<array>
using namespace std;
void show()
{
cout << "hello cplusplus!" << endl;
}
int main()
{
array<thread, 3> threads = { thread(show), thread(show), thread(show) };
for (int i = 0; i < 3; i++)
{
cout << threads[i].joinable() << endl;
threads[i].join();
}
return 0;
}
运行结果:
hello cplusplus!
hello cplusplus!
1
hello cplusplus!
1
1
#include<iostream>
#include<thread>
class Obj
{
public:
Obj() {std::cout << "hello ";}
~Obj() {std::cout << "world\n";}
};
void joinWorker()
{
Obj obj;
std::this_thread::sleep_for(std::chrono::seconds(2));
}
int main()
{
std::thread j(joinWorker);
j.join();
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
输出:
hello world
detach
将当前线程对象所代表的执行实例与该线程对象分离,使得线程的执行可以单独进行。一旦线程执行完毕,它所分配的资源将会被释放。 线程 detach 脱离主线程的绑定,主线程挂了,子线程不报错,子线程执行完自动退出。 线程 detach以后,子线程会成为孤儿线程,线程之间将无法通信。
#include<thread>
using namespace std;
void show()
{
cout << "hello cplusplus!" << endl;
}
int main()
{
thread th(show);
th.detach();
cout << th.joinable() << endl;
return 0;
}
运行结果:
hello cplusplus!
#include<iostream>
#include<thread>
class Obj
{
public:
Obj() {std::cout << "hello ";}
~Obj() {std::cout << "world\n";}
};
void detachWorker()
{
Obj obj;
std::this_thread::sleep_for(std::chrono::seconds(2));
}
int main()
{
std::thread d(detachWorker);
d.detach();
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
输出
hello
线程交换与线程移动
#include<thread>
using namespace std;
int main()
{
thread t1([]()
{
cout << "thread1" << endl;
});
thread t2([]()
{
cout << "thread2" << endl;
});
cout << "thread1' id is " << t1.get_id() << endl;
cout << "thread2' id is " << t2.get_id() << endl;
cout << "swap after:" << endl;
swap(t1, t2);
cout << "thread1' id is " << t1.get_id() << endl;
cout << "thread2' id is " << t2.get_id() << endl;
return 0;
}
运行结果:
thread1
thread2
thread1' id is 4836
thread2' id is 4724
swap after:
thread1' id is 4724
thread2' id is 4836
#include<thread>
using namespace std;
int main()
{
thread t1([]()
{
cout << "thread1" << endl;
});
cout << "thread1' id is " << t1.get_id() << endl;
thread t2 = move(t1);;
cout << "thread2' id is " << t2.get_id() << endl;
return 0;
}
运行结果:
thread1
thread1' id is 5620
thread2' id is 5620
从上述代码中,线程t2可以通过 move 移动 t1 来获取 t1 的全部属性,而 t1 却销毁了。
线程安全
简单看一个样例代码
**#include<thread>
using namespace std;
const int N = 100000000;
int num = 0;
void run()
{
for (int i = 0; i < N; i++)
{
num++;
}
}
int main()
{
clock_t start = clock();
thread t1(run);
thread t2(run);
t1.join();
t2.join();
clock_t end = clock();
cout << "num=" << num << ",用时 " << end - start << " ms" << endl;
return 0;
}
运行结果:
num=143653419,用时 730 ms
这里并不是预期的200000000,线程之间发生冲突,从而导致结果不正确。结果可能是2到200000000之间任何一个值。我们可以通过互斥量、原子变量、join等来解决这个问题
#include<thread>
using namespace std;
const int N = 100000000;
int num = 0;
void run()
{
for (int i = 0; i < N; i++)
{
num++;
}
}
int main()
{
clock_t start = clock();
thread t1(run);
t1.join();
thread t2(run);
t2.join();
clock_t end = clock();
cout << "num=" << num << ",用时 " << end - start << " ms" << endl;
return 0;
}
运行结果:
num=200000000,用时 626 ms
#include<thread>
#include<atomic>
using namespace std;
const int N = 100000000;
atomic_int num{ 0 };
void run()
{
for (int i = 0; i < N; i++)
{
num++;
}
}
int main()
{
clock_t start = clock();
thread t1(run);
thread t2(run);
t1.join();
t2.join();
clock_t end = clock();
cout << "num=" << num << ",用时 " << end - start << " ms" << endl;
return 0;
}
运行结果:
num=200000000,用时 29732 ms
#include<thread>
#include<mutex>
using namespace std;
const int N = 100000000;
int num(0);
mutex m;
void run()
{
for (int i = 0; i < N; i++)
{
m.lock();
num++;
m.unlock();
}
}
int main()
{
clock_t start = clock();
thread t1(run);
thread t2(run);
t1.join();
t2.join();
clock_t end = clock();
cout << "num=" << num << ",用时 " << end - start << " ms" << endl;
return 0;
}
运行结果:
num=200000000,用时 128323 ms
互斥信号量mutex
- mutex类4种
std::mutex,最基本的 Mutex 类。 std::recursive_mutex,递归 Mutex 类。 std::time_mutex,定时 Mutex 类。 std::recursive_timed_mutex,定时递归 Mutex 类。 - Lock 类(两种)
std::lock_guard,与 Mutex RAII 相关,方便线程对互斥量上锁。 std::unique_lock,与 Mutex RAII 相关,方便线程对互斥量上锁,但提供了更好的上锁和解锁控制。 - 其他类型
std::once_flag std::adopt_lock_t std::defer_lock_t std::try_to_lock_t - 成员函数
std::try_lock,尝试同时对多个互斥量上锁。 std::lock,可以同时对多个互斥量上锁。 std::call_once,如果多个线程需要同时调用某个函数,call_once 可以保证多个线程对该函数只调用一次。
lock类
lock_guard
lock_guard 对象通常用于管理某个锁(Lock)对象,因此与 Mutex RAII 相关,方便线程对互斥量上锁,即在某个 lock_guard 对象的声明周期内,它所管理的锁对象会一直保持上锁状态;而 lock_guard 的生命周期结束之后,它所管理的锁对象会被解锁(注:类似 shared_ptr 等智能指针管理动态分配的内存资源)。 在 lock_guard 对象构造时,传入的 Mutex 对象(即它所管理的 Mutex 对象)会被当前线程锁住。在lock_guard 对象被析构时,它所管理的 Mutex 对象会自动解锁,由于不需要程序员手动调用 lock 和 unlock 对 Mutex 进行上锁和解锁操作,因此这也是最简单安全的上锁和解锁方式,尤其是在程序抛出异常后先前已被上锁的 Mutex 对象可以正确进行解锁操作,极大地简化了程序员编写与 Mutex 相关的异常处理代码。
- locking 初始化
lock_guard 对象管理 Mutex 对象 m,并在构造时对 m 进行上锁(调用 m.lock())。 - adopting初始化
lock_guard 对象管理 Mutex 对象 m,与 locking 初始化(1) 不同的是, Mutex 对象 m 已被当前线程锁住。 - lock_guard 对象的拷贝构造和移动构造(move construction)均被禁用,因此 lock_guard 对象不可被拷贝构造或移动构造。
#include <iostream>
#include <thread>
#include <mutex>
#include <stdexcept>
std::mutex mtx;
void print_even (int x) {
if (x%2==0) std::cout << x << " is even\n";
else throw (std::logic_error("not even"));
}
void print_thread_id (int id) {
try {
std::lock_guard<std::mutex> lck (mtx);
print_even(id);
}
catch (std::logic_error&) {
std::cout << "[exception caught]\n";
}
}
int main ()
{
std::thread threads[10];
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_thread_id,i+1);
for (auto& th : threads) th.join();
return 0;
}
输出:
[exception caught]
4 is even
[exception caught]
2 is even
[exception caught]
6 is even
[exception caught]
8 is even
10 is even
[exception caught]
- adopting 初始化,首先对 mtx 进行上锁操作(mtx.lock()😉,然后用 mtx 对象构造一个 lock_guard 对象
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
void print_thread_id(int id) {
mtx.lock();
std::lock_guard<std::mutex> lck(mtx, std::adopt_lock);
std::cout << "thread #" << id << '\n';
}
int main()
{
std::thread threads[10];
for (int i = 0; i<10; ++i)
threads[i] = std::thread(print_thread_id, i + 1);
for (auto& th : threads) th.join();
return 0;
}
unique_lock
lock_guard只能保证在析构的时候执行解锁操作,lock_guard本身并没有提供加锁和解锁的接口。 互斥锁保证了线程间的同步,但是却将并行操作变成了串行操作,这对性能有很大的影响,所以我们要尽可能的减小锁定的区域,也就是使用细粒度锁。 为了解决这个问题,可以使用unique_lock:
提供了lock()和unlock()接口,能记录现在处于上锁还是没上锁状态,在析构的时候,会根据当前状态来决定是否要进行解锁(lock_guard就一定会解锁)
unique_lock 对象以独占所有权的方式( unique owership)管理 mutex 对象的上锁和解锁操作,所谓独占所有权,就是没有其他的 unique_lock 对象同时拥有某个 mutex 对象的所有权。
在构造(或移动(move)赋值)时,unique_lock 对象需要传递一个 Mutex 对象作为它的参数,新创建的 unique_lock 对象负责传入的 Mutex 对象的上锁和解锁操作。
std::unique_lock 对象也能保证在其自身析构时它所管理的 Mutex 对象能够被正确地解锁(即使没有显式地调用 unlock 函数)。
unique_lock比lock_guard灵活很多,效率上差一点,内存占用多一点。
- 默认构造函数
新创建的 unique_lock 对象不管理任何 Mutex 对象。 - locking 初始化
新创建的 unique_lock 对象管理 Mutex 对象 m,并尝试调用 m.lock() 对 Mutex 对象进行上锁,如果此时另外某个 unique_lock 对象已经管理了该 Mutex 对象 m,则当前线程将会被阻塞。 - try-locking 初始化
新创建的 unique_lock 对象管理 Mutex 对象 m,并尝试调用 m.try_lock() 对 Mutex 对象进行上锁,但如果上锁不成功,并不会阻塞当前线程。 - deferred 初始化
新创建的 unique_lock 对象管理 Mutex 对象 m,但是在初始化的时候并不锁住 Mutex 对象。 m 应该是一个没有当前线程锁住的 Mutex 对象。 - adopting 初始化
新创建的 unique_lock 对象管理 Mutex 对象 m, m 应该是一个已经被当前线程锁住的 Mutex 对象。(并且当前新创建的 unique_lock 对象拥有对锁(Lock)的所有权)。 - locking 一段时间(duration)
新创建的 unique_lock 对象管理 Mutex 对象 m,并试图通过调用 m.try_lock_for(rel_time) 来锁住 Mutex 对象一段时间(rel_time)。 - locking 直到某个时间点(time point)
新创建的 unique_lock 对象管理 Mutex 对象m,并试图通过调用 m.try_lock_until(abs_time) 来在某个时间点(abs_time)之前锁住 Mutex 对象。 - 拷贝构造 [被禁用]
unique_lock 对象不能被拷贝构造。 - 移动(move)构造
新创建的 unique_lock 对象获得了由 x 所管理的 Mutex 对象的所有权(包括当前 Mutex 的状态)。调用 move 构造之后, x 对象如同通过默认构造函数所创建的,就不再管理任何 Mutex 对象了。
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
void print_block (int n, char c) {
std::unique_lock<std::mutex> lck (mtx);
for (int i=0; i<n; ++i) { std::cout << c; }
std::cout << '\n';
}
int main ()
{
std::thread th1 (print_block,50,'*');
std::thread th2 (print_block,50,'$');
th1.join();
th2.join();
return 0;
}
输出:
**************************************************
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
#include <iostream>
#include <thread>
#include <mutex>
std::mutex foo, bar;
void task_a() {
std::lock(foo, bar);
std::unique_lock<std::mutex> lck1(foo, std::adopt_lock);
std::unique_lock<std::mutex> lck2(bar, std::adopt_lock);
std::cout << "task a\n";
}
void task_b() {
std::unique_lock<std::mutex> lck1, lck2;
lck1 = std::unique_lock<std::mutex>(bar, std::defer_lock);
lck2 = std::unique_lock<std::mutex>(foo, std::defer_lock);
std::lock(lck1, lck2);
std::cout << "task b\n";
}
int main()
{
std::thread th1(task_a);
std::thread th2(task_b);
th1.join();
th2.join();
return 0;
}
原子变量atomic
std::atomic对int, char, bool等数据结构进行原子性封装,在多线程环境中,对std::atomic对象的访问不会造成竞争-冒险。利用std::atomic可实现数据结构的无锁设计。
所谓的原子操作,取的就是“原子是最小的、不可分割的最小个体”的意义,它表示在多个线程访问同一个全局资源的时候,能够确保所有其他的线程都不在同一时间内访问相同的资源。也就是他确保了在同一时刻只有唯一的线程对这个资源进行访问。这有点类似互斥对象对共享资源的访问的保护,但是原子操作更加接近底层,因而效率更高。
#include <iostream>
#include <ctime>
#include <vector>
#include <thread>
#include <atomic>
std::atomic<size_t> count(0);
void threadFun()
{
for (int i = 0; i < 10000; i++)
count++;
}
int main(void)
{
clock_t start_time = clock();
std::vector<std::thread> threads;
for (int i = 0; i < 10; i++)
threads.push_back(std::thread(threadFun));
for (auto&thad : threads)
thad.join();
std::cout << "count number:" << count << std::endl;
clock_t end_time = clock();
std::cout << "耗时:" << end_time - start_time << "ms" << std::endl;
return 0;
}
输出:
count number:100000
耗时:15ms
- std::atomic_flag
std::atomic_flag是一个原子的布尔类型,可支持两种原子操作:
- test_and_set, 如果atomic_flag对象被设置,则返回true; 如果atomic_flag对象未被设置,则设置之,返回false
- clear. 清楚atomic_flag对象
- std::atomic_flag可用于多线程之间的同步操作,类似于linux中的信号量。使用atomic_flag可实现mutex.
#include <iostream>
#include <atomic>
#include <vector>
#include <thread>
#include <sstream>
std::atomic_flag lock = ATOMIC_FLAG_INIT;
std::stringstream stream;
void append_numer(int x)
{
while (lock.test_and_set());
stream << "thread#" << x << "\n";
lock.clear();
}
int main()
{
std::vector<std::thread> ths;
for (int i=0; i<10; i++)
ths.push_back(std::thread(append_numer, i));
for (int i=0; i<10; i++)
ths[i].join();
std::cout << stream.str();
return 0;
}
完美转发与转移语义
C++高阶知识:深入分析移动构造函数及其原理 | 音视跳动科技 http://avdancedu.com/a39d51f9/
聊聊C++中的完美转发 - 知乎 https://zhuanlan.zhihu.com/p/161039484
std::move的作用是无论你传给它的是左值还是右值,通过std::move之后都变成了右值。 std::forward被称为完美转发,它的作用是保持原来的值属性不变。
条件变量condition_variable
在C++11中,我们可以使用条件变量(condition_variable)实现多个线程间的同步操作;当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒。
当需要死循环判断某个条件成立与否时【true or false】,我们往往需要开一个线程死循环来判断,这样非常消耗CPU。使用条件变量,可以让当前线程wait,释放CPU,如果条件改变时,我们再notify退出线程,再次进行判断。
当 std::condition_variable 对象的某个 wait 函数被调用的时候,它使用 std::unique_lock(通过 std::mutex) 来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的 std::condition_variable 对象上调用了 notification 函数来唤醒当前线程。
std::condition_variable 对象通常使用 std::unique_lockstd::mutex 来等待
想要修改共享变量(即“条件”)的线程必须: (1). 获得一个std::mutex (2). 当持有锁的时候,执行修改动作 (3). 对std::condition_variable执行notify_one或notify_all(当做notify动作时,不必持有锁)
1、wait函数: (1)wait(unique_lock &lck)
当前线程的执行会被阻塞,直到收到 notify 为止。
(2)wait(unique_lock &lck,Predicate pred)
当前线程仅在pred=false时阻塞;如果pred=true时,不阻塞。
wait()可依次拆分为三个操作:释放互斥锁、等待在条件变量上、再次获取互斥锁
2、notify_one: notify_one():没有参数、没有返回值。
解除阻塞当前正在等待此条件的线程之一。如果没有线程在等待,则还函数不执行任何操作。如果超过一个,不会指定具体哪一线程。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void print_id (int id) {
std::unique_lock<std::mutex> lck(mtx);
while (!ready) cv.wait(lck);
std::cout << "thread " << id << '\n';
}
void go() {
std::unique_lock<std::mutex> lck(mtx);
ready = true;
cv.notify_all();
}
int main ()
{
std::thread threads[10];
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_id,i);
std::cout << "10 threads ready to race...\n";
go();
for (auto& th : threads) th.join();
return 0;
}
线程池
经过上述的基础知识的了解,总算能看懂这个程序了。
为什么需要线程池?
-
降低资源消耗。通过重复利用已创建的线程降低线程创建、销毁线程造成的消耗。 -
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。 -
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控
线程池原理
借鉴(抄袭) 基于C++11实现线程池 - 知乎 https://zhuanlan.zhihu.com/p/367309864
-
线程池组成部分 1、线程池管理器:用于创建并管理线程池,包括创建、销毁线程池,添加新任务; 2、工作线程:指线程池中线程,在没有任务时处于等待状态,在有任务的时候可以循环的执行任务; 3、任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行,主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等; 4、任务队列:用于存放没有处理的任务,提供一种缓冲机制。 -
任务执行过程 1.是否达到核心线程数量?没达到,则创建一个工作线程来执行任务。 2.工作队列是否已满,没满,则将提交的任务存储在工作队列中。 3.是否达到线程最大数量,没达到,则创建一个新的工作线程来执行任务。 4.最后,执行拒绝策略来处理这个任务。
实现难点
- 线程如何复用?C++的thread执行任务时有固定的task函数,执行完之后线程结束。如何实现task和thread的分配?
- 线程安全的任务队列如何实现?
- 作为线程池主体,线程池提交任务的功能如何实现?需要接受任何参数的任何函数
线程安全的任务队列
- 使用std::mutex包装std::queue
- unique_lock会自动给m_mutex上锁,生命周期结束之后自动解锁
- move移动构造函数减少系统开销
template <typename T>
class SafeQueue
{
private:
std::queue<T> m_queue;
?
std::mutex m_mutex;
?
public:
SafeQueue() {}
SafeQueue(SafeQueue &&other) {}
~SafeQueue() {}
?
bool empty()
{
std::unique_lock<std::mutex> lock(m_mutex);
?
return m_queue.empty();
}
?
int size()
{
std::unique_lock<std::mutex> lock(m_mutex);
?
return m_queue.size();
}
?
void enqueue(T &t)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(t);
}
?
bool dequeue(T &t)
{
std::unique_lock<std::mutex> lock(m_mutex);
?
if (m_queue.empty())
return false;
t = std::move(m_queue.front());
?
m_queue.pop();
?
return true;
}
};
线程池
这里有很多都没弄懂。之后再补充
提交函数
template <typename F, typename... Args>
auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> ①
{
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...); ②
?
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func); ③
?
std::function<void()> warpper_func = [task_ptr]()
{
(*task_ptr)();
}; ④
?
m_queue.enqueue(warpper_func);
?
m_conditional_lock.notify_one(); ⑤
?
return task_ptr->get_future();
}
内置工作线程lei
class ThreadWorker
{
private:
int m_id;
?
ThreadPool *m_pool;
public:
ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id)
{
}
?
void operator()()
{
std::function<void()> func;
?
bool dequeued;
while (!m_pool->m_shutdown)
{
{
std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
?
if (m_pool->m_queue.empty())
{
m_pool->m_conditional_lock.wait(lock);
}
?
dequeued = m_pool->m_queue.dequeue(func);
}
?
if (dequeued)
func();
}
}
};
完整代码
?
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
?
#include <mutex>
#include <queue>
#include <functional>
#include <future>
#include <thread>
#include <utility>
#include <vector>
?
template <typename T>
class SafeQueue
{
private:
std::queue<T> m_queue;
?
std::mutex m_mutex;
?
public:
SafeQueue() {}
SafeQueue(SafeQueue &&other) {}
~SafeQueue() {}
?
bool empty()
{
std::unique_lock<std::mutex> lock(m_mutex);
?
return m_queue.empty();
}
?
int size()
{
std::unique_lock<std::mutex> lock(m_mutex);
?
return m_queue.size();
}
?
void enqueue(T &t)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(t);
}
?
bool dequeue(T &t)
{
std::unique_lock<std::mutex> lock(m_mutex);
?
if (m_queue.empty())
return false;
t = std::move(m_queue.front());
?
m_queue.pop();
?
return true;
}
};
?
class ThreadPool
{
private:
class ThreadWorker
{
private:
int m_id;
?
ThreadPool *m_pool;
public:
ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id)
{
}
?
void operator()()
{
std::function<void()> func;
?
bool dequeued;
?
while (!m_pool->m_shutdown)
{
{
std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
?
if (m_pool->m_queue.empty())
{
m_pool->m_conditional_lock.wait(lock);
}
?
dequeued = m_pool->m_queue.dequeue(func);
}
?
if (dequeued)
func();
}
}
};
?
bool m_shutdown;
?
SafeQueue<std::function<void()>> m_queue;
?
std::vector<std::thread> m_threads;
?
std::mutex m_conditional_mutex;
?
std::condition_variable m_conditional_lock;
?
public:
ThreadPool(const int n_threads = 4)
: m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false)
{
}
?
ThreadPool(const ThreadPool &) = delete;
?
ThreadPool(ThreadPool &&) = delete;
?
ThreadPool &operator=(const ThreadPool &) = delete;
?
ThreadPool &operator=(ThreadPool &&) = delete;
?
void init()
{
for (int i = 0; i < m_threads.size(); ++i)
{
m_threads.at(i) = std::thread(ThreadWorker(this, i));
}
}
?
void shutdown()
{
m_shutdown = true;
m_conditional_lock.notify_all();
?
for (int i = 0; i < m_threads.size(); ++i)
{
if (m_threads.at(i).joinable())
{
m_threads.at(i).join();
}
}
}
?
template <typename F, typename... Args>
auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
{
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
?
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
?
std::function<void()> warpper_func = [task_ptr]()
{
(*task_ptr)();
};
?
m_queue.enqueue(warpper_func);
?
m_conditional_lock.notify_one();
?
return task_ptr->get_future();
}
};
?
#endif
测试样例代码
?
#include <iostream>
#include <random>
#include "thread_pool.h"
std::random_device rd;
?
std::mt19937 mt(rd());
?
std::uniform_int_distribution<int> dist(-1000, 1000);
?
auto rnd = std::bind(dist, mt);
?
void simulate_hard_computation()
{
std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}
?
void multiply(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
}
?
void multiply_output(int &out, const int a, const int b)
{
simulate_hard_computation();
out = a * b;
std::cout << a << " * " << b << " = " << out << std::endl;
}
?
int multiply_return(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
return res;
}
?
void example()
{
ThreadPool pool(3);
?
pool.init();
?
for (int i = 1; i <= 3; ++i)
for (int j = 1; j <= 10; ++j)
{
pool.submit(multiply, i, j);
}
?
int output_ref;
auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
?
future1.get();
std::cout << "Last operation result is equals to " << output_ref << std::endl;
?
auto future2 = pool.submit(multiply_return, 5, 3);
?
int res = future2.get();
std::cout << "Last operation result is equals to " << res << std::endl;
?
pool.shutdown();
}
?
int main()
{
example();
?
return 0;
}
|