IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> C++知识库 -> C++ 11 多线程基础 与 简单线程池实现 -> 正文阅读

[C++知识库]C++ 11 多线程基础 与 简单线程池实现


我厚颜无耻汇总一下(抄袭)别人的博客,方便自己翻阅

借鉴(抄袭)

C++ Reference
http://www.cplusplus.com/reference/thread/

C++ thread用法总结(整理)_sevenjoin的博客-CSDN博客_c++ thread
https://blog.csdn.net/sevenjoin/article/details/82187127

C++多线程(一)thread类_coolwriter的博客-CSDN博客_c++ thread
https://blog.csdn.net/coolwriter/article/details/79883253

c++ 11 多线线程系列----mutex_chen-CSDN博客_c++ mutex
https://blog.csdn.net/chenxun_2010/article/details/49786263

C++11 条件变量(condition_variable) 使用详解 - 小海哥哥de - 博客园
https://www.cnblogs.com/xiaohaigegede/p/14008121.html

基于C++11实现线程池 - 知乎
https://zhuanlan.zhihu.com/p/367309864

C++11线程

C++11 新标准中多线程编程包括五个头文件,他们分别是<atomic> ,<thread>,<mutex>,<condition_variable><future>

  • <atomic>:原子量头文件。该头文主要声明了两个类, std::atomic 和 std::atomic_flag,另外还声明了一套 C 风格的原子类型和与 C 兼容的原子操作的函数。
  • <thread>:线程头文件。该头文件主要声明了 std::thread 类,另外 std::this_thread 命名空间也在该头文件中。
  • <mutex>:互斥变量头文件。该头文件主要声明了与互斥量(mutex)相关的类,包括 std::mutex 系列类,std::lock_guard, std::unique_lock, 以及其他的类型和函数。
  • <condition_variable>:条件变量头文件。该头文件主要声明了与条件变量相关的类,包括 std::condition_variable 和 std::condition_variable_any。
  • <future>:提供异步访问。该头文件主要声明了 std::promise, std::package_task 两个 Provider 类,以及 std::future 和 std::shared_future 两个 Future 类,另外还有一些与之相关的类型和函数,std::async() 函数就声明在此头文件中。

线程库thread

thread类成员函数:

  1. get_id:获取线程ID,返回一个类型为std::thread::id的对象。

  2. joinable:检查线程是否可被join。检查thread对象是否标识一个活动(active)的可行性线程。缺省构造的thread对象、已经完成join的thread对象、已经detach的thread对象都不是joinable。

  3. join:调用该函数会阻塞当前线程。阻塞调用者(caller)所在的线程直至被join的std::thread对象标识的线程执行结束。

  4. detach:将当前线程对象所代表的执行实例与该线程对象分离,使得线程的执行可以单独进行。一旦线程执行完毕,它所分配的资源将会被释放。

  5. native_handle:该函数返回与std::thread具体实现相关的线程句柄。native_handle_type是连接thread类和操作系统SDK API之间的桥梁,如在Linux g++(libstdc++)里,native_handle_type其实就是pthread里面的pthread_t类型,当thread类的功能不能满足我们的要求的时候(比如改变某个线程的优先级),可以通过thread类实例的native_handle()返回值作为参数来调用相关的pthread函数达到目录。

  6. swap:交换两个线程对象所代表的底层句柄。

  7. operator=:moves the thread object

  8. hardware_concurrency:静态成员函数,返回当前计算机最大的硬件并发线程数目。基本上可以视为处理器的核心数目。

这里的detach和join区别在于,join()是让用户手动管理线程,会阻塞当前线程直到*this所标识的线程结束;detach()不会阻塞当前线程,通过将thread对象分离线程,让线程独立执行,并且当线程运行结束后释放线程的所有资源。

构造函数示例

// thread example
#include <iostream>       // std::cout
#include <thread>         // std::thread
 
void foo() 
{
  // do stuff...
  for(int i=0;i<10;++i)
    std::cout << "foo func run "<<i<<" times\n";
}

void bar(int x)
{
  // do stuff...
    for(int i=0;i<10;++i)
        std::cout << "bar func run "<<i<<" times with param ="<<x<<"\n";
}

int main() 
{
  std::thread first (foo);     // spawn new thread that calls foo()
  std::thread second (bar,0);  // spawn new thread that calls bar(0)

  std::cout << "main, foo and bar now execute concurrently...\n";

  // synchronize threads:
  // 必须将线程join或者detach 等待子线程结束主进程才可以退出
  first.join();                // pauses until first finishes
  second.join();               // pauses until second finishes

  std::cout << "foo and bar completed.\n";

  return 0;
}
  • 输出结果(多次输出结果,可以发现结果顺序不一致):
main, foo and bar now execute concurrently...
foo func run 0 times
foo func run 1 times
foo func run 2 times
foo func run 3 times
foo func run 4 times
foo func run 5 times
foo func run 6 times
foo func run 7 times
foo func run 9 times
bar func run 0 times with param =0
bar func run 1 times with param =0
bar func run 2 times with param =0
bar func run 3 times with param =0
bar func run 4 times with param =0
bar func run 5 times with param =0
bar func run 6 times with param =0
bar func run 7 times with param =0
bar func run 8 times with param =0
bar func run 9 times with param =0
foo and bar completed.

move赋值操作示例

// move 赋值操作,如果当前对象不可 joinable,需要传递一个右值引用(rhs)给 move 赋值操作;如果当前对象可被 joinable,则 terminate() 报错。 
move (1)    
thread& operator= (thread&& rhs) noexcept;

// 拷贝赋值操作被禁用,thread 对象不可被拷贝。 
copy [deleted] (2)  
thread& operator= (const thread&) = delete;
// example for thread::operator=  
#include <iostream>       // std::cout  
#include <thread>         // std::thread, std::this_thread::sleep_for  
#include <chrono>         // std::chrono::seconds  

void pause_thread(int n)  
{  
    std::this_thread::sleep_for(std::chrono::seconds(n));//c++11 this_thread 类  
    std::cout << "pause of " << n << " seconds ended\n";  
}  

int main()  
{  
    std::thread threads[5];                         // default-constructed threads  

    std::cout << "Spawning 5 threads...\n";  
    for (int i = 0; i<5; ++i)  
        threads[i] = std::thread(pause_thread, i + 1);   // move-assign threads 这里调用move复制函数  

    std::cout << "Done spawning threads. Now waiting for them to join:\n";  
    for (int i = 0; i<5; ++i)  
        threads[i].join();  

    std::cout << "All threads joined!\n";  

    return 0;  
}

join

调用该函数会阻塞当前线程。阻塞调用者(caller)所在的线程直至被join的std::thread对象标识的线程执行结束。
主线程会等待当前线程执行完成再退出。

#include<thread>  
#include<array>  
using namespace std;  
void show()  
{  
    cout << "hello cplusplus!" << endl;  
}  
int main()  
{  
    array<thread, 3>  threads = { thread(show), thread(show), thread(show) };  
    for (int i = 0; i < 3; i++)  
    {  
        cout << threads[i].joinable() << endl;//判断线程是否可以join  
        threads[i].join();//主线程等待当前线程执行完成再退出  
    }  
    return 0;  
}  
运行结果:  
hello cplusplus!  
hello cplusplus!  
1  
hello cplusplus!  
1  
1
//joinTest.cc
#include<iostream>
#include<thread>

class Obj
{
    public:
        Obj() {std::cout << "hello ";}
        ~Obj() {std::cout << "world\n";}
};
void joinWorker()
{
    Obj obj;
		std::this_thread::sleep_for(std::chrono::seconds(2));
}

int main()
{
    std::thread j(joinWorker);
    j.join(); 
		std::this_thread::sleep_for(std::chrono::seconds(1));  
		return 0;
}
输出:
hello world

detach

将当前线程对象所代表的执行实例与该线程对象分离,使得线程的执行可以单独进行。一旦线程执行完毕,它所分配的资源将会被释放。
线程 detach 脱离主线程的绑定,主线程挂了,子线程不报错,子线程执行完自动退出。
线程 detach以后,子线程会成为孤儿线程,线程之间将无法通信。

#include<thread>  
using namespace std;  
void show()  
{  
    cout << "hello cplusplus!" << endl;  
}  
int main()  
{  
    thread th(show);  
    //th.join();   
    th.detach();//脱离主线程的绑定,主线程挂了,子线程不报错,子线程执行完自动退出。  
    //detach以后,子线程会成为孤儿线程,线程之间将无法通信。  
    cout << th.joinable() << endl;  
    return 0;  
}  
运行结果:  
hello cplusplus! 
//detachTest.cc
#include<iostream>
#include<thread>

class Obj
{
    public:
        Obj() {std::cout << "hello ";}
        ~Obj() {std::cout << "world\n";}
};
void detachWorker()
{
    Obj obj;
		std::this_thread::sleep_for(std::chrono::seconds(2));
}

int main()
{
    std::thread d(detachWorker);
    d.detach();   
		std::this_thread::sleep_for(std::chrono::seconds(1));
		return 0;
}
输出
hello

线程交换与线程移动

  • 线程交换
#include<thread>  
using namespace std;  
int main()  
{  
    thread t1([]()  
    {  
        cout << "thread1" << endl;  
    });  
    thread t2([]()  
    {  
        cout << "thread2" << endl;  
    });  
    cout << "thread1' id is " << t1.get_id() << endl;  
    cout << "thread2' id is " << t2.get_id() << endl;  

    cout << "swap after:" << endl;  
    swap(t1, t2);//线程交换  
    cout << "thread1' id is " << t1.get_id() << endl;  
    cout << "thread2' id is " << t2.get_id() << endl;  
    return 0;  
}  
运行结果:  
thread1  
thread2  
thread1' id is 4836  
thread2' id is 4724  
swap after:  
thread1' id is 4724  
thread2' id is 4836
  • 线程移动
#include<thread>  
using namespace std;  
int main()  
{  
    thread t1([]()  
    {  
        cout << "thread1" << endl;  
    });  
    cout << "thread1' id is " << t1.get_id() << endl;  
    thread t2 = move(t1);;  
    cout << "thread2' id is " << t2.get_id() << endl;  
    return 0;  
}  
运行结果:  
thread1  
thread1' id is 5620  
thread2' id is 5620

从上述代码中,线程t2可以通过 move 移动 t1 来获取 t1 的全部属性,而 t1 却销毁了。

线程安全

简单看一个样例代码

**#include<thread>  
using namespace std;  
const int N = 100000000;  
int num = 0;  
void run()  
{  
    for (int i = 0; i < N; i++)  
    {  
        num++;  
    }  
}  
int main()  
{  
    clock_t start = clock();  
    thread t1(run);  
    thread t2(run);  
    t1.join();  
    t2.join();  
    clock_t end = clock();  
    cout << "num=" << num << ",用时 " << end - start << " ms" << endl;  
    return 0;  
}  
运行结果:  
num=143653419,用时 730 ms

这里并不是预期的200000000,线程之间发生冲突,从而导致结果不正确。结果可能是2到200000000之间任何一个值。我们可以通过互斥量、原子变量、join等来解决这个问题

  • join
#include<thread>  
using namespace std;  
const int N = 100000000;  
int num = 0;  
void run()  
{  
    for (int i = 0; i < N; i++)  
    {  
        num++;  
    }  
}  
int main()  
{  
    clock_t start = clock();  
    thread t1(run);  
    t1.join();  
    thread t2(run);  
    t2.join();  
    clock_t end = clock();  
    cout << "num=" << num << ",用时 " << end - start << " ms" << endl;  
    return 0;  
}  
运行结果:  
num=200000000,用时 626 ms
  • 原子变量
#include<thread>  
#include<atomic>  
using namespace std;  
const int N = 100000000;  
atomic_int num{ 0 };//不会发生线程冲突,线程安全  
void run()  
{  
    for (int i = 0; i < N; i++)  
    {  
        num++;  
    }  
}  
int main()  
{  
    clock_t start = clock();  
    thread t1(run);  
    thread t2(run);  
    t1.join();  
    t2.join();  
    clock_t end = clock();  
    cout << "num=" << num << ",用时 " << end - start << " ms" << endl;  
    return 0;  
}  
运行结果:  
num=200000000,用时 29732 ms
  • 互斥量
#include<thread>  
#include<mutex>  
using namespace std;  
const int N = 100000000;  
int num(0);  
mutex m;  
void run()  
{  
    for (int i = 0; i < N; i++)  
    {  
        m.lock();  
        num++;  
        m.unlock();  
    }  
}  
int main()  
{  
    clock_t start = clock();  
    thread t1(run);  
    thread t2(run);  
    t1.join();  
    t2.join();  
    clock_t end = clock();  
    cout << "num=" << num << ",用时 " << end - start << " ms" << endl;  
    return 0;  
}  
运行结果:  
num=200000000,用时 128323 ms

互斥信号量mutex

  • mutex类4种
    std::mutex,最基本的 Mutex 类。
    std::recursive_mutex,递归 Mutex 类。
    std::time_mutex,定时 Mutex 类。
    std::recursive_timed_mutex,定时递归 Mutex 类。
  • Lock 类(两种)
    std::lock_guard,与 Mutex RAII 相关,方便线程对互斥量上锁。
    std::unique_lock,与 Mutex RAII 相关,方便线程对互斥量上锁,但提供了更好的上锁和解锁控制。
  • 其他类型
    std::once_flag
    std::adopt_lock_t
    std::defer_lock_t
    std::try_to_lock_t
  • 成员函数
    std::try_lock,尝试同时对多个互斥量上锁。
    std::lock,可以同时对多个互斥量上锁。
    std::call_once,如果多个线程需要同时调用某个函数,call_once 可以保证多个线程对该函数只调用一次。

lock类

lock_guard

lock_guard 对象通常用于管理某个锁(Lock)对象,因此与 Mutex RAII 相关,方便线程对互斥量上锁,即在某个 lock_guard 对象的声明周期内,它所管理的锁对象会一直保持上锁状态;而 lock_guard 的生命周期结束之后,它所管理的锁对象会被解锁(注:类似 shared_ptr 等智能指针管理动态分配的内存资源)。
在 lock_guard 对象构造时,传入的 Mutex 对象(即它所管理的 Mutex 对象)会被当前线程锁住。在lock_guard 对象被析构时,它所管理的 Mutex 对象会自动解锁,由于不需要程序员手动调用 lock 和 unlock 对 Mutex 进行上锁和解锁操作,因此这也是最简单安全的上锁和解锁方式,尤其是在程序抛出异常后先前已被上锁的 Mutex 对象可以正确进行解锁操作,极大地简化了程序员编写与 Mutex 相关的异常处理代码。

  1. locking 初始化
    lock_guard 对象管理 Mutex 对象 m,并在构造时对 m 进行上锁(调用 m.lock())。
  2. adopting初始化
    lock_guard 对象管理 Mutex 对象 m,与 locking 初始化(1) 不同的是, Mutex 对象 m 已被当前线程锁住。
  3. lock_guard 对象的拷贝构造和移动构造(move construction)均被禁用,因此 lock_guard 对象不可被拷贝构造或移动构造。
  • 默认locking初始化
// lock_guard example
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::lock_guard
#include <stdexcept>      // std::logic_error
 
std::mutex mtx;
 
void print_even (int x) {
  if (x%2==0) std::cout << x << " is even\n";
  else throw (std::logic_error("not even"));
}
 
void print_thread_id (int id) {
  try {
    // using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
    std::lock_guard<std::mutex> lck (mtx);
    print_even(id);
  }
  catch (std::logic_error&) {
    std::cout << "[exception caught]\n";
  }
}
 
int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_thread_id,i+1);
 
  for (auto& th : threads) th.join();
 
  return 0;
}
输出:
[exception caught]
4 is even
[exception caught]
2 is even
[exception caught]
6 is even
[exception caught]
8 is even
10 is even
[exception caught]
  • adopting 初始化,首先对 mtx 进行上锁操作(mtx.lock()😉,然后用 mtx 对象构造一个 lock_guard 对象
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::lock_guard, std::adopt_lock
 
std::mutex mtx;           // mutex for critical section
 
void print_thread_id(int id) {
	mtx.lock();
	std::lock_guard<std::mutex> lck(mtx, std::adopt_lock);
	std::cout << "thread #" << id << '\n';
}
 
int main()
{
	std::thread threads[10];
	// spawn 10 threads:
	for (int i = 0; i<10; ++i)
		threads[i] = std::thread(print_thread_id, i + 1);
 
	for (auto& th : threads) th.join();
 
	return 0;
}

unique_lock

lock_guard只能保证在析构的时候执行解锁操作,lock_guard本身并没有提供加锁和解锁的接口。
互斥锁保证了线程间的同步,但是却将并行操作变成了串行操作,这对性能有很大的影响,所以我们要尽可能的减小锁定的区域,也就是使用细粒度锁。
为了解决这个问题,可以使用unique_lock:

提供了lock()和unlock()接口,能记录现在处于上锁还是没上锁状态,在析构的时候,会根据当前状态来决定是否要进行解锁(lock_guard就一定会解锁)

unique_lock 对象以独占所有权的方式( unique owership)管理 mutex 对象的上锁和解锁操作,所谓独占所有权,就是没有其他的 unique_lock 对象同时拥有某个 mutex 对象的所有权。

在构造(或移动(move)赋值)时,unique_lock 对象需要传递一个 Mutex 对象作为它的参数,新创建的 unique_lock 对象负责传入的 Mutex 对象的上锁和解锁操作。

std::unique_lock 对象也能保证在其自身析构时它所管理的 Mutex 对象能够被正确地解锁(即使没有显式地调用 unlock 函数)。

unique_lock比lock_guard灵活很多,效率上差一点,内存占用多一点。

  1. 默认构造函数
    新创建的 unique_lock 对象不管理任何 Mutex 对象。
  2. locking 初始化
    新创建的 unique_lock 对象管理 Mutex 对象 m,并尝试调用 m.lock() 对 Mutex 对象进行上锁,如果此时另外某个 unique_lock 对象已经管理了该 Mutex 对象 m,则当前线程将会被阻塞。
  3. try-locking 初始化
    新创建的 unique_lock 对象管理 Mutex 对象 m,并尝试调用 m.try_lock() 对 Mutex 对象进行上锁,但如果上锁不成功,并不会阻塞当前线程。
  4. deferred 初始化
    新创建的 unique_lock 对象管理 Mutex 对象 m,但是在初始化的时候并不锁住 Mutex 对象。 m 应该是一个没有当前线程锁住的 Mutex 对象。
  5. adopting 初始化
    新创建的 unique_lock 对象管理 Mutex 对象 m, m 应该是一个已经被当前线程锁住的 Mutex 对象。(并且当前新创建的 unique_lock 对象拥有对锁(Lock)的所有权)。
  6. locking 一段时间(duration)
    新创建的 unique_lock 对象管理 Mutex 对象 m,并试图通过调用 m.try_lock_for(rel_time) 来锁住 Mutex 对象一段时间(rel_time)。
  7. locking 直到某个时间点(time point)
    新创建的 unique_lock 对象管理 Mutex 对象m,并试图通过调用 m.try_lock_until(abs_time) 来在某个时间点(abs_time)之前锁住 Mutex 对象。
  8. 拷贝构造 [被禁用]
    unique_lock 对象不能被拷贝构造。
  9. 移动(move)构造
    新创建的 unique_lock 对象获得了由 x 所管理的 Mutex 对象的所有权(包括当前 Mutex 的状态)。调用 move 构造之后, x 对象如同通过默认构造函数所创建的,就不再管理任何 Mutex 对象了。
/*
 * @Descripttion: 
 * @version: 
 * @Author: yanqiu
 * @Date: 2021-09-21 13:56:47
 * @LastEditors: yanqiu
 * @LastEditTime: 2021-09-21 13:56:48
 */
// unique_lock example
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::unique_lock

std::mutex mtx;           // mutex for critical section

void print_block (int n, char c) {
  // critical section (exclusive access to std::cout signaled by lifetime of lck):
  std::unique_lock<std::mutex> lck (mtx);
  for (int i=0; i<n; ++i) { std::cout << c; }
  std::cout << '\n';
}

int main ()
{
  std::thread th1 (print_block,50,'*');
  std::thread th2 (print_block,50,'$');

  th1.join();
  th2.join();

  return 0;
}
输出:
**************************************************
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
  • adopt_lock
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::lock, std::unique_lock
// std::adopt_lock, std::defer_lock
std::mutex foo, bar;
 
void task_a() {
	std::lock(foo, bar);         // simultaneous lock (prevents deadlock)
	std::unique_lock<std::mutex> lck1(foo, std::adopt_lock);
	std::unique_lock<std::mutex> lck2(bar, std::adopt_lock);
	std::cout << "task a\n";
	// (unlocked automatically on destruction of lck1 and lck2)
}
 
void task_b() {
	// foo.lock(); bar.lock(); // replaced by:
	std::unique_lock<std::mutex> lck1, lck2;
	lck1 = std::unique_lock<std::mutex>(bar, std::defer_lock);
	lck2 = std::unique_lock<std::mutex>(foo, std::defer_lock);
	std::lock(lck1, lck2);       // simultaneous lock (prevents deadlock)
	std::cout << "task b\n";
	// (unlocked automatically on destruction of lck1 and lck2)
}
 
 
int main()
{
	std::thread th1(task_a);
	std::thread th2(task_b);
 
	th1.join();
	th2.join();
 
	return 0;
}

原子变量atomic

std::atomic对int, char, bool等数据结构进行原子性封装,在多线程环境中,对std::atomic对象的访问不会造成竞争-冒险。利用std::atomic可实现数据结构的无锁设计。

所谓的原子操作,取的就是“原子是最小的、不可分割的最小个体”的意义,它表示在多个线程访问同一个全局资源的时候,能够确保所有其他的线程都不在同一时间内访问相同的资源。也就是他确保了在同一时刻只有唯一的线程对这个资源进行访问。这有点类似互斥对象对共享资源的访问的保护,但是原子操作更加接近底层,因而效率更高。

#include <iostream>
#include <ctime>
#include <vector>
#include <thread>
#include <atomic>
 
 
std::atomic<size_t> count(0);
 
void threadFun()
{
	for (int i = 0; i < 10000; i++)
		count++;
}
 
int main(void)
{
	clock_t start_time = clock();
 
	// 启动多个线程
	std::vector<std::thread> threads;
	for (int i = 0; i < 10; i++)
		threads.push_back(std::thread(threadFun));
	for (auto&thad : threads)
		thad.join();
 
	// 检测count是否正确 10000*10 = 100000
	std::cout << "count number:" << count << std::endl;
 
	clock_t end_time = clock();
	std::cout << "耗时:" << end_time - start_time << "ms" << std::endl;
 
	return 0;
}

输出:
count number:100000
耗时:15ms
  • std::atomic_flag
    std::atomic_flag是一个原子的布尔类型,可支持两种原子操作:
  1. test_and_set, 如果atomic_flag对象被设置,则返回true; 如果atomic_flag对象未被设置,则设置之,返回false
  2. clear. 清楚atomic_flag对象
  3. std::atomic_flag可用于多线程之间的同步操作,类似于linux中的信号量。使用atomic_flag可实现mutex.
#include <iostream>
#include <atomic>
#include <vector>
#include <thread>
#include <sstream>

std::atomic_flag lock = ATOMIC_FLAG_INIT;
std::stringstream stream;

void append_numer(int x)
{
  while (lock.test_and_set());
  stream << "thread#" << x << "\n";
  lock.clear();
}

int main()
{
  std::vector<std::thread> ths;
  for (int i=0; i<10; i++)
    ths.push_back(std::thread(append_numer, i));
  for (int i=0; i<10; i++)
    ths[i].join();
  std::cout << stream.str();
  return 0;
}

完美转发与转移语义

C++高阶知识:深入分析移动构造函数及其原理 | 音视跳动科技
http://avdancedu.com/a39d51f9/

聊聊C++中的完美转发 - 知乎
https://zhuanlan.zhihu.com/p/161039484

std::move的作用是无论你传给它的是左值还是右值,通过std::move之后都变成了右值。
std::forward被称为完美转发,它的作用是保持原来的值属性不变。

条件变量condition_variable

在C++11中,我们可以使用条件变量(condition_variable)实现多个线程间的同步操作;当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒。

当需要死循环判断某个条件成立与否时【true or false】,我们往往需要开一个线程死循环来判断,这样非常消耗CPU。使用条件变量,可以让当前线程wait,释放CPU,如果条件改变时,我们再notify退出线程,再次进行判断。

当 std::condition_variable 对象的某个 wait 函数被调用的时候,它使用 std::unique_lock(通过 std::mutex) 来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的 std::condition_variable 对象上调用了 notification 函数来唤醒当前线程。

std::condition_variable 对象通常使用 std::unique_lockstd::mutex 来等待

想要修改共享变量(即“条件”)的线程必须:
(1). 获得一个std::mutex
(2). 当持有锁的时候,执行修改动作
(3). 对std::condition_variable执行notify_one或notify_all(当做notify动作时,不必持有锁)

1、wait函数:
(1)wait(unique_lock &lck)

当前线程的执行会被阻塞,直到收到 notify 为止。

(2)wait(unique_lock &lck,Predicate pred)

当前线程仅在pred=false时阻塞;如果pred=true时,不阻塞。

wait()可依次拆分为三个操作:释放互斥锁、等待在条件变量上、再次获取互斥锁

2、notify_one:
notify_one():没有参数、没有返回值。

解除阻塞当前正在等待此条件的线程之一。如果没有线程在等待,则还函数不执行任何操作。如果超过一个,不会指定具体哪一线程。

// condition_variable example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id (int id) {
  std::unique_lock<std::mutex> lck(mtx);
  while (!ready) cv.wait(lck);
  // ...
  std::cout << "thread " << id << '\n';
}

void go() {
  std::unique_lock<std::mutex> lck(mtx);
  ready = true;
  cv.notify_all();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_id,i);

  std::cout << "10 threads ready to race...\n";
  go();                       // go!

  for (auto& th : threads) th.join();

  return 0;
}

线程池

经过上述的基础知识的了解,总算能看懂这个程序了。

为什么需要线程池?

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建、销毁线程造成的消耗。

  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控

线程池原理

借鉴(抄袭)
基于C++11实现线程池 - 知乎
https://zhuanlan.zhihu.com/p/367309864

  • 线程池组成部分
    1、线程池管理器:用于创建并管理线程池,包括创建、销毁线程池,添加新任务;
    2、工作线程:指线程池中线程,在没有任务时处于等待状态,在有任务的时候可以循环的执行任务;
    3、任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行,主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
    4、任务队列:用于存放没有处理的任务,提供一种缓冲机制。

  • 任务执行过程
    1.是否达到核心线程数量?没达到,则创建一个工作线程来执行任务。
    2.工作队列是否已满,没满,则将提交的任务存储在工作队列中。
    3.是否达到线程最大数量,没达到,则创建一个新的工作线程来执行任务。
    4.最后,执行拒绝策略来处理这个任务。

实现难点

  1. 线程如何复用?C++的thread执行任务时有固定的task函数,执行完之后线程结束。如何实现task和thread的分配?
  2. 线程安全的任务队列如何实现?
  3. 作为线程池主体,线程池提交任务的功能如何实现?需要接受任何参数的任何函数

线程安全的任务队列

  • 使用std::mutex包装std::queue
  • unique_lock会自动给m_mutex上锁,生命周期结束之后自动解锁
  • move移动构造函数减少系统开销
template <typename T>
class SafeQueue
{
private:
    std::queue<T> m_queue; //利用模板函数构造队列
?
    std::mutex m_mutex; // 访问互斥信号量
?
public:
    SafeQueue() {}
    SafeQueue(SafeQueue &&other) {}
    ~SafeQueue() {}
?
    bool empty() // 返回队列是否为空
    {
        std::unique_lock<std::mutex> lock(m_mutex); // 互斥信号变量加锁,防止m_queue被改变
?
        return m_queue.empty();
    }
?
    int size()
    {
        std::unique_lock<std::mutex> lock(m_mutex); // 互斥信号变量加锁,防止m_queue被改变
?
        return m_queue.size();
    }
?
    // 队列添加元素
    void enqueue(T &t)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_queue.emplace(t);
    }
?
    // 队列取出元素
    bool dequeue(T &t)
    {
        std::unique_lock<std::mutex> lock(m_mutex); // 队列加锁
?
        if (m_queue.empty())
            return false;
        t = std::move(m_queue.front()); // 取出队首元素,返回队首元素值,并进行右值引用
?
        m_queue.pop(); // 弹出入队的第一个元素
?
        return true;
    }
};

线程池

这里有很多都没弄懂。之后再补充

提交函数

// Submit a function to be executed asynchronously by the pool
    template <typename F, typename... Args>
    auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>{
        // Create a function with bounded parameter ready to execute
        std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);// 连接函数和参数定义,特殊函数类型,避免左右值错误
?
        // Encapsulate it into a shared pointer in order to be able to copy construct
        auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);  ③
?
        // Warp packaged task into void function
        std::function<void()> warpper_func = [task_ptr]()
        {
            (*task_ptr)();
        };  ④
?
        // 队列通用安全封包函数,并压入安全队列
        m_queue.enqueue(warpper_func);
?
        // 唤醒一个等待中的线程
        m_conditional_lock.notify_one();  ⑤
?
        // 返回先前注册的任务指针
        return task_ptr->get_future();
    }

内置工作线程lei

class ThreadWorker // 内置线程工作类
{
private:
    int m_id; // 工作id
?
    ThreadPool *m_pool; // 所属线程池
public:
    // 构造函数
    ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id)
    {
    }
?
    // 重载()操作
    void operator()()
    {
        std::function<void()> func; // 定义基础函数类func
?
        bool dequeued; // 是否正在取出队列中元素
        
        // 判断线程池是否关闭,没有关闭则从任务队列中循环提取任务
        while (!m_pool->m_shutdown)
        {
            {
                // 为线程环境加锁,互访问工作线程的休眠和唤醒
                std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
?
                // 如果任务队列为空,阻塞当前线程
                if (m_pool->m_queue.empty())
                {
                    m_pool->m_conditional_lock.wait(lock); // 等待条件变量通知,开启线程
                }
?
                // 取出任务队列中的元素
                dequeued = m_pool->m_queue.dequeue(func);
            }
?
            // 如果成功取出,执行工作函数
            if (dequeued)
                func();
        }
    }
};

完整代码

//thread_pool.h
?
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
?
#include <mutex>
#include <queue>
#include <functional>
#include <future>
#include <thread>
#include <utility>
#include <vector>
?
// Thread safe implementation of a Queue using a std::queue
template <typename T>
class SafeQueue
{
private:
    std::queue<T> m_queue; //利用模板函数构造队列
?
    std::mutex m_mutex; // 访问互斥信号量
?
public:
    SafeQueue() {}
    SafeQueue(SafeQueue &&other) {}
    ~SafeQueue() {}
?
    bool empty() // 返回队列是否为空
    {
        std::unique_lock<std::mutex> lock(m_mutex); // 互斥信号变量加锁,防止m_queue被改变
?
        return m_queue.empty();
    }
?
    int size()
    {
        std::unique_lock<std::mutex> lock(m_mutex); // 互斥信号变量加锁,防止m_queue被改变
?
        return m_queue.size();
    }
?
    // 队列添加元素
    void enqueue(T &t)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_queue.emplace(t);
    }
?
    // 队列取出元素
    bool dequeue(T &t)
    {
        std::unique_lock<std::mutex> lock(m_mutex); // 队列加锁
?
        if (m_queue.empty())
            return false;
        t = std::move(m_queue.front()); // 取出队首元素,返回队首元素值,并进行右值引用
?
        m_queue.pop(); // 弹出入队的第一个元素
?
        return true;
    }
};
?
class ThreadPool
{
private:
    class ThreadWorker // 内置线程工作类
    {
    private:
        int m_id; // 工作id
?
        ThreadPool *m_pool; // 所属线程池
    public:
        // 构造函数
        ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id)
        {
        }
?
        // 重载()操作
        void operator()()
        {
            std::function<void()> func; // 定义基础函数类func
?
            bool dequeued; // 是否正在取出队列中元素
?
            while (!m_pool->m_shutdown)
            {
                {
                    // 为线程环境加锁,互访问工作线程的休眠和唤醒
                    std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
?
                    // 如果任务队列为空,阻塞当前线程
                    if (m_pool->m_queue.empty())
                    {
                        m_pool->m_conditional_lock.wait(lock); // 等待条件变量通知,开启线程
                    }
?
                    // 取出任务队列中的元素
                    dequeued = m_pool->m_queue.dequeue(func);
                }
?
                // 如果成功取出,执行工作函数
                if (dequeued)
                    func();
            }
        }
    };
?
    bool m_shutdown; // 线程池是否关闭
?
    SafeQueue<std::function<void()>> m_queue; // 执行函数安全队列,即任务队列
?
    std::vector<std::thread> m_threads; // 工作线程队列
?
    std::mutex m_conditional_mutex; // 线程休眠锁互斥变量
?
    std::condition_variable m_conditional_lock; // 线程环境锁,可以让线程处于休眠或者唤醒状态
?
public:
    // 线程池构造函数
    ThreadPool(const int n_threads = 4)
        : m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false)
    {
    }
?
    ThreadPool(const ThreadPool &) = delete;
?
    ThreadPool(ThreadPool &&) = delete;
?
    ThreadPool &operator=(const ThreadPool &) = delete;
?
    ThreadPool &operator=(ThreadPool &&) = delete;
?
    // Inits thread pool
    void init()
    {
        for (int i = 0; i < m_threads.size(); ++i)
        {
            m_threads.at(i) = std::thread(ThreadWorker(this, i)); // 分配工作线程
        }
    }
?
    // Waits until threads finish their current task and shutdowns the pool
    void shutdown()
    {
        m_shutdown = true;
        m_conditional_lock.notify_all(); // 通知,唤醒所有工作线程
?
        for (int i = 0; i < m_threads.size(); ++i)
        {
            if (m_threads.at(i).joinable()) // 判断线程是否在等待
            {
                m_threads.at(i).join(); // 将线程加入到等待队列
            }
        }
    }
?
    // Submit a function to be executed asynchronously by the pool
    template <typename F, typename... Args>
    auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
    {
        // Create a function with bounded parameter ready to execute
        std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...); // 连接函数和参数定义,特殊函数类型,避免左右值错误
?
        // Encapsulate it into a shared pointer in order to be able to copy construct
        auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
?
        // Warp packaged task into void function
        std::function<void()> warpper_func = [task_ptr]()
        {
            (*task_ptr)();
        };
?
        // 队列通用安全封包函数,并压入安全队列
        m_queue.enqueue(warpper_func);
?
        // 唤醒一个等待中的线程
        m_conditional_lock.notify_one();
?
        // 返回先前注册的任务指针
        return task_ptr->get_future();
    }
};
?
#endif

测试样例代码

// test.cpp
?
#include <iostream>
#include <random>
#include "thread_pool.h"
std::random_device rd; // 真实随机数产生器
?
std::mt19937 mt(rd()); //生成计算随机数mt
?
std::uniform_int_distribution<int> dist(-1000, 1000); //生成-1000到1000之间的离散均匀分布数
?
auto rnd = std::bind(dist, mt);
?
// 设置线程睡眠时间
void simulate_hard_computation()
{
    std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}
?
// 添加两个数字的简单函数并打印结果
void multiply(const int a, const int b)
{
    simulate_hard_computation();
    const int res = a * b;
    std::cout << a << " * " << b << " = " << res << std::endl;
}
?
// 添加并输出结果
void multiply_output(int &out, const int a, const int b)
{
    simulate_hard_computation();
    out = a * b;
    std::cout << a << " * " << b << " = " << out << std::endl;
}
?
// 结果返回
int multiply_return(const int a, const int b)
{
    simulate_hard_computation();
    const int res = a * b;
    std::cout << a << " * " << b << " = " << res << std::endl;
    return res;
}
?
void example()
{
    // 创建3个线程的线程池
    ThreadPool pool(3);
?
    // 初始化线程池
    pool.init();
?
    // 提交乘法操作,总共30个
    for (int i = 1; i <= 3; ++i)
        for (int j = 1; j <= 10; ++j)
        {
            pool.submit(multiply, i, j);
        }
?
    // 使用ref传递的输出参数提交函数
    int output_ref;
    auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
?
    // 等待乘法输出完成
    future1.get();
    std::cout << "Last operation result is equals to " << output_ref << std::endl;
?
    // 使用return参数提交函数
    auto future2 = pool.submit(multiply_return, 5, 3);
?
    // 等待乘法输出完成
    int res = future2.get();
    std::cout << "Last operation result is equals to " << res << std::endl;
?
    // 关闭线程池
    pool.shutdown();
}
?
int main()
{
    example();
?
    return 0;
}
  C++知识库 最新文章
【C++】友元、嵌套类、异常、RTTI、类型转换
通讯录的思路与实现(C语言)
C++PrimerPlus 第七章 函数-C++的编程模块(
Problem C: 算法9-9~9-12:平衡二叉树的基本
MSVC C++ UTF-8编程
C++进阶 多态原理
简单string类c++实现
我的年度总结
【C语言】以深厚地基筑伟岸高楼-基础篇(六
c语言常见错误合集
上一篇文章      下一篇文章      查看所有文章
加:2021-09-23 11:16:21  更:2021-09-23 11:18:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/29 4:01:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计