typora-copy-images-to: upload
C++11 多线程
一、基本概念
1.并发:一个核通过切换上下文运行多个线程或进程;并行:一个核运行一个进程或线程,多个核同时进行
2.进程:一个可执行程序
3.线程:进程一定含有一个主线程,主线程是唯一的,其生命周期等于进程,多线程并发,线程频繁的切换会消耗本属于进程的运行时间
4.一个进程执行完毕的标志是主线程执行完毕
二、线程详解
1.线程的基本函数
join(),主线程等待子线程执行完毕,负责回收子线程的资源。
detach(),分离线程函数,使用detach()函数会让线程在后台运行,即说明主线程不会等待子线程运行结束才结束。分离后的子线程由运行时刻(操作系统)接管,回收其资源,相当于一个守护线程,子线程detach后不能再用join回收了。如果此时子线程中使用了主线程中数据或对象的引用,而操作系统没有因为数据安全忽略掉引用而使用拷贝形式的话,datach后主线程回收会导致资源的回收,从而导致传递给子线程的引用数据不存在,引发错误。
joinable(),检查线程是否可被join或datach.检查thread对象是否标识一个活动(active)的可行性线程。缺省构造的thread对象、已经完成join的thread对象、已经detach的thread对象都不是joinable。返回true(可以使用datach或join),false(不可以使用datach或join)。
#include <thread>
#include <iostream>
using namespace std;
void Func(const int &num){
cout << "子线程开始执行" << endl;
cout << num << endl;
cout << "子线程执行结束" << endl;
}
int main(){
int count = 10;
thread mythread(Func,count);//创建线程
mythread.join();
//mythread.detach()
if(mythread.joinable()){
cout << "还没有datach或join" << endl;
}
else{
cout << "已经datach或join" << endl;
}
cout << "主线程执行完毕" << endl;
return 0;
}
join()运行结果
detach()运行结果,一种情况。主线程先于子线程执行结束,主线程就会与子线程分离,如果主线程先执行完毕,销毁线程对象及局部变量,并且子线程使用了主线程中的共享变量或引用数据,这样子线程可能使用的变量,就变成未定义,产生异常或不可预测的错误。
2.线程传参
get_id(),获取线程ID,返回一个类型为std::thread::id的对象
test1()中创建子线程时,由于线程入口函数Func()中的第二个参数是一个对象,所以传参时将会发生隐式转换,而使用datach无法保证主线程执行结束前转换已经完成,就可能存在主线程已经执行完成回收了栈数据mySecond,而后子线程用已经回收的数据构造构造对象,产生未定义行为、产生异常或不可预测的错误。所以必须使用join()。join()运行结果如下:
我们可以发现对象的构建是在子线程中完成的。
test2(),在创建子线程中我们提前构造临时对象,确保临时对象的构造在主线程中完成,避免test1()的问题。因此可以使用detach(),datach()运行结果如下:
临时对象在主线程中完成构造,操作系统为了安全忽略掉引用使用拷贝。
问题:为什么使用了两次拷贝构造,第二次拷贝构造在主线程中执行,而析构在子线程中
test3(),通过ref传递对象的引用(ref能用包装类型reference_wrapper来代替原本会被识别的值类型,而reference_wrapper能隐式转换为被引用的值的引用类型),从而在子线程中改变主线程中对象的数据成员。执行结果如下:
test4(),执行结果如下:
test5(),执行结果如下:
test6(),执行结果如下:
#include <thread>
#include <iostream>
using namespace std;
class A
{
public:
int m_i;
A(int a) : m_i(a)
{
cout << "构造函数执行" << this << "线程id:" << std::this_thread::get_id() << endl;
}
A(const A &a) : m_i(a.m_i)
{
cout << "拷贝构造函数执行" << this << "线程id:" << std::this_thread::get_id() << endl;
}
~A()
{
cout << "析构函数执行" << this << "线程id:" << std::this_thread::get_id() << endl;
}
void thread_work(int num)
{
cout << "thread_work 执行,num = " << num << endl;
}
void operator()(int num)
{
cout << "可调用对象,num = " << num << endl;
}
};
//为什么对象引用一定要用const修饰
//传递的临时对象属于右值,右值只能被 const 类型的引用所指向,函数的参数如果是一个类的引用,定义为const类型时,右值作为函数参数可以不报错
void Func(const A &buf)
{
cout << "对象地址:" << &buf << endl;
return;
}
void Func1(A &buf)
{
cout << &buf.m_i << endl;
buf.m_i = 199;
return;
}
void Func2(unique_ptr<int> ptr)
{
cout << "智能指针转移后的地址" << &ptr << endl;
cout << *ptr << endl;
cout << "智能指针指向数据的地址:" << &(*ptr) << endl;
}
//传递临时对象
void test1()
{
int mySecond = 12;
thread mythread(Func,mySecond);
//mythread.detach();
mythread.join();
}
void test2()
{
int mySecond = 12;
thread mythread(Func,A(mySecond)); //操作系统为了安全忽略掉引用使用拷贝
mythread.detach();
//mythread.join();
}
//传递类对象
void test3()
{
int mySecond = 12;
A myObj(mySecond);
thread mythread(Func1, std::ref(myObj)); //ref告诉操作系统一定要使用引用,操作系统就不会为了数据安全忽悠引用使用拷贝了
mythread.join();
cout << myObj.m_i << endl;
cout << &myObj.m_i << endl;
}
//传递智能指针
void test4()
{
unique_ptr<int> myPtr(new int(100));
cout << "智能指针的地址" << &myPtr << endl;
cout << *myPtr << endl;
cout << "转移前智能指针指向数据的地址:" << &(*myPtr);
mythread.join(); //必须用join,要不然主线程结束ptr指向的内存就被回收了,而如果子线程还没有接收到主线程ptr的move,此时将使用已回收的内存
}
//对象成员函数做线程入口函数
void test5()
{
A myObj(10);
thread mythread(&A::thread_work, std::ref(myObj), 15);
mythread.join();
}
//可调用对象做线程入口函数
void test6()
{
A myObj(10);
thread mythread(std::ref(myObj), 15);
mythread.join();
}
int main()
{
cout << "主线程id:" << std::this_thread::get_id() << endl;
//test1();
//test2();
//test3();
//test4();
//test5();
test6();
cout << "主线程执行完毕" << endl;
return 0;
//线程的入口函数可以是普通函数,lambda表达式,对象的函数成员,可调用对象
3.互斥量
-
互斥量:mutex,一个类对象,其成员函数lock,unlock对临界资源上锁、解锁。 -
lock和unlock必须成对使用,一个mutex不能lock两次而只unlock一次,也不能lock一次而unlock一次 -
为了防止忘记unlock引入std::lock_guard类模板,利用对象的生命周期加锁和释放锁.,此时不能对这个互斥量lock和unlock -
死锁必须要有2个及以上的互斥量才会产生,原因是系统资源的竞争下进程运行推进顺序不合适,保证不同线程的上锁顺序一致就不会发生死锁。 -
std::lock,要求一次性将多个锁上锁,只要一个没有上锁成功,则立即释放已经上锁的锁,给其它线程使用,破坏死锁的请求与保持条件。阻塞线程,稍后再尝试将所有互斥量上锁,使用lock要记得解锁。 -
lock_guard的第二个参数std::adapt_lock,使得其构造函数不调用互斥量的lock成员函数,可以配合lock一次锁定多个互斥量使用
#include <iostream>。
#include <thread>
#include <list>
#include <mutex>
using namespace std;
class A
{
public:
bool outMsg(int &recv)
{
lock(my_mutex1, my_mutex2);
// my_mutex1.lock();
// my_mutex2.lock();
if (!msgQueue.empty())
{
recv = msgQueue.front();
msgQueue.pop_front();
my_mutex2.unlock();
my_mutex1.unlock();
return true;
}
my_mutex2.unlock();
my_mutex1.unlock();
return false;
}
void inMsgqueue()
{
for (int i = 0; i < 1000; ++i)
{
//lock_guard<mutex>my_guard(my_mutex);//取代lock,unlock防止忘记unlock,利用对象的生命周期,类似智能指针。adopt_lock,只解锁不上锁。
lock(my_mutex1, my_mutex2);//确保两个锁成功上锁
//lock已经将两个锁上锁,所以lock_guard使用adopt_lock不加锁以防止对同一个互斥量加锁多次
lock_guard<mutex> my_guard1(my_mutex1, adopt_lock);
lock_guard<mutex> my_guard2(my_mutex2, adopt_lock);
// my_mutex1.lock();
// my_mutex2.lock();
cout << "插入一个数据" << i << endl;
msgQueue.push_back(i);
// my_mutex2.unlock();
// my_mutex1.unlock();
}
}
void outMsgqueue()
{
int command;
for (int i = 0; i < 1000; ++i)
{
bool flag = outMsg(command);
if (flag == true)
{
cout << "接收到数据:" << command << endl;
}
else
{
cout << "消息队列为空" << endl;
}
}
}
private:
list<int> msgQueue;
mutex my_mutex1;
mutex my_mutex2;
};
int main()
{
A myobj;
thread outThread(&A::outMsgqueue, &myobj); //第二个参数必须是引用以保证两个线程入口对象成员函数是同一个对象,而不是拷贝后的不同对象。两个线程处理的数据才是同一个对象中的数据,才需要用互斥量保护数据,以防止一个线程对数据进行写操作时,另一个线程进行读操作
thread inThread(&A::inMsgqueue, &myobj);
outThread.join();
inThread.join();
cout << "主线程结束" << endl;
return 0;
}
4.unique_lock
unique_lock 的参数
- unique_lock 能够取代lock_guard
-
- std::adopt_lock,使得其构造函数不调用互斥量的lock成员函数,所以必须提前lock
unique_lock的功能和lock_guard类似,使用第二个参数adopt_lock表示互斥量已经被lock,所以使用前需要提前lock互斥量 2)std::try_to_lock,尝试使用mutex的lock去加锁,但如果加锁失败并不会阻塞线程。如果此线程在try_to_lock之前加锁,try_to_lock将一直拿锁失败。 unique_lock的.owns_lock查看try_to_lock是否拿到锁,拿到返回true 3)std::defer_lock,使用的前提是不能自己先lock,它初始化一个没有加锁的mutex
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#define defer_flag
using namespace std;
class A
{
public:
bool outMsg(int &recv)
{
lock(my_mutex1, my_mutex2);
// std::chrono::milliseconds drua(200); //1秒==1000毫秒
// std::this_thread::sleep_for(drua); //休息一定的时长
if (!msgQueue.empty())
{
recv = msgQueue.front();
msgQueue.pop_front();
my_mutex2.unlock();
my_mutex1.unlock();
return true;
}
my_mutex1.unlock();
my_mutex2.unlock();
return false;
}
void inMsgqueue()
{
for (int i = 0; i < 1000; ++i)
{
#ifdef defer_flag
//演示defer_lock
std::unique_lock<mutex> sbguard1(my_mutex1, std::defer_lock);
std::unique_lock<mutex> sbguard2(my_mutex2, std::defer_lock);
sbguard1.lock();
sbguard2.lock();
cout << "插入一个数据" << i << endl;
msgQueue.push_back(i);
#else
//演示adopt_lock
// lock(my_mutex1,my_mutex2);
// unique_lock<mutex>uniqueLock1(my_mutex1,adopt_lock);
// unique_lock<mutex>uniqueLock2(my_mutex2,adopt_lock);
// cout << "插入一个数据" << i << endl;
// msgQueue.push_back(i);
//演示try_to_lock
// lock(my_mutex1,my_mutex2);
unique_lock<mutex> uniqueLock1(my_mutex1, try_to_lock);
unique_lock<mutex> uniqueLock2(my_mutex2, try_to_lock);
if (uniqueLock1.owns_lock() && uniqueLock2.owns_lock())
{
cout << "两个锁都拿到了" << endl;
cout << "插入一个数据" << i << endl;
msgQueue.push_back(i);
}
else
{
cout << "拿锁失败,干点别的事" << endl;
}
//my_mutex1.unlock();
//my_mutex2.unlock();
#endif
}
}
void outMsgqueue()
{
int command;
for (int i = 0; i < 1000; ++i)
{
bool flag = outMsg(command);
if (flag == true)
{
cout << "接收到数据:" << command << endl;
}
else
{
cout << "消息队列为空" << endl;
}
}
}
private:
list<int> msgQueue;
mutex my_mutex1;
mutex my_mutex2;
};
int main()
{
A myobj;
thread outThread(&A::outMsgqueue, &myobj);
thread inThread(&A::inMsgqueue, &myobj);
outThread.join();
inThread.join();
cout << "主线程结束" << endl;
return 0;
}
unique_lock 的成员函数
- lock()加锁
- unlock()解锁,unique_lock能够自动解锁,当我们需要中途处理一些别的事情时可以先unlock处理完,unique_lock检测到已经解锁不会再解锁。
- try_lock()尝试加锁,不会阻塞线程,成功返回true
- release()解绑锁,返回指向锁的指针
unique_lock所有权的传递mutex
- std::unique_lockstd:mutex uniqueLock1 (my_mutex1): 所有权权概念uniqueLock1拥有my-mutex1的所有权
- uniqueLock1可以把自己对mutex (my-mutex1)的所有权转移给其他的unique-lock对象
unique-lock对象这个mutex的所有权是属于 可以转移,但是不能复制。 方法1:std::move 方法2:return std::unique_lock 临时对象
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#define flag_release
using namespace std;
class A
{
public:
std::unique_lock<mutex> rtn_unique_lock()
{
std::unique_lock<mutex> temguard(my_mutex1);
return temguard;//从函数返回一个局部对象,系统生成一个临时unique_lock并调用其移动构造函数
}
bool outMsg(int &recv)
{
lock(my_mutex1, my_mutex2);
if (!msgQueue.empty())
{
recv = msgQueue.front();
msgQueue.pop_front();
my_mutex2.unlock();
my_mutex1.unlock();
return true;
}
my_mutex1.unlock();
my_mutex2.unlock();
return false;
}
void inMsgqueue()
{
for (int i = 0; i < 1000; ++i)
{
#ifdef flag_release
//演示unique_lock 的lock和unlock
// unique_lock<mutex> uniqueLock1(my_mutex1, defer_lock);
// unique_lock<mutex> uniqueLock2(my_mutex2, defer_lock);
// uniqueLock1.lock();
// uniqueLock2.lock();
// cout << "插入一个数据" << i << endl;
// msgQueue.push_back(i);
// uniqueLock1.unlock();
// uniqueLock2.unlock();
//演示unique_lock 的try_lock
unique_lock<mutex> uniqueLock1(my_mutex1, defer_lock);
unique_lock<mutex> uniqueLock2(my_mutex2, defer_lock); //lock后,生命周期结束后会自动unlock,中途可以unlock做其他事
if (uniqueLock1.try_lock() && uniqueLock2.try_lock())
{
cout << "插入一个数据" << i << endl;
msgQueue.push_back(i);
}
else
{
cout << "没拿到锁,干别的事" << endl;
}
#else
//演示unique_lock 的release
unique_lock<mutex> uniqueLock1(my_mutex1);
//unique_lock<mutex>sbguard = rtn_unique_lock();//转移函数中mutex1的所有权
unique_lock<mutex> uniqueLock2(my_mutex2);
cout << "插入一个数据" << i << endl;
msgQueue.push_back(i);
unique_lock<mutex> uniqueLock3(std::move(uniqueLock2)); //将my_mutex2的所有权通过移动语义转移到uniqueLock3
std::mutex *mut1 = uniqueLock1.release();//将uniqueLock1和my_mutex1解绑,所以后续要负责my_mutex1的unlock
std::mutex *mut2 = uniqueLock3.release();
mut1->unlock();
mut2->unlock();
#endif
}
}
void outMsgqueue()
{
int command;
for (int i = 0; i < 1000; ++i)
{
bool flag = outMsg(command);
if (flag == true)
{
cout << "接收到数据:" << command << endl;
}
else
{
cout << "消息队列为空" << endl;
}
}
}
private:
list<int> msgQueue;
mutex my_mutex1;
mutex my_mutex2;
};
int main()
{
A myobj;
thread outThread(&A::outMsgqueue, &myobj);
thread inThread(&A::inMsgqueue, &myobj);
outThread.join();
inThread.join();
cout << "主线程结束" << endl;
return 0;
}
5.单例模式和call_once
- 多线程中创建单例对象是需要加锁,以防止其它线程在当前线程还没创建成功前切换线程去创建,而后切回再次创建。
- 双重锁定的编程方式通过避免单例对象创建成功后其它线程再次创建时频繁的加锁解锁,从而提高性能
- call_once 通过配合std::once_flag确保函数只被调用一次
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#define flag_callOnce
using namespace std;
mutex resource_mutex;
std::once_flag g_flag;
class MyCas{
#ifdefflag_callOnce
static void Createinstance(){
m_instance = new MyCas();
cout << "Createinstance()执行了" << endl;
static huishou hs;
}
public:
static MyCas * GetInstance(){
std::call_once(g_flag,Createinstance);//类似一个互斥量,同一时间只有一个线程能够执行,其它线程会阻塞
return m_instance;
}
#else
static MyCas * GetInstance(){
//if(m_instance != nullptr)条件成立,则m_instance 肯定已经被new了
//if(m_instance == nullptr)成立并不代表m_instance一定没被new过,线程1new之前切换到线程2new完回来
if(m_instance == nullptr){//双重锁定
std::unique_lock<mutex>mymutex(resource_mutex);
if(m_instance == nullptr){
m_instance = new MyCas();
static huishou hs;//静态对象,当程序退出时一定会调用其析构函数
}
}
return m_instance;
}
#endif
void func(){
cout << "测试函数" <<endl;
}
class huishou{
public:
~huishou(){
if(MyCas::m_instance){
cout << "huishou析构函数执行" << endl;
delete MyCas::m_instance;
MyCas::m_instance == nullptr;
}
}
};
private:
MyCas(){
cout << "MyCas构造函数" << endl;
}//构造函数私有化
static MyCas *m_instance;
};
MyCas *MyCas::m_instance = nullptr;
void myThread(){
cout << "线程开始执行" << endl;
MyCas * p_a = MyCas::GetInstance();
cout << "线程执行结束" << endl;
return;
}
int main()
{
//最好在主线程中创建好单例对象,避免在子线程中创建
thread mythread1(myThread);
thread mythread2(myThread);
return 0;
}
6.条件变量
-
std::condition_variable 的wait成员函数流程 wait ()用来等待条件变量的成立,wait的第二个参数就是条件 如果第二个参数lambda表达式返回值是true,那wait()直接返回; 如果第二个参数lambda表达式返回值是false,那么wait()将解锁互斥量,并堵塞到本行, 那堵塞到什么时候为止呢?堵塞到其他某个线程调用notify_one()成员函数为止; 如果wait ()没有第二个参数: my_cond. wait(guard) :那么就跟第二个参数1ambda表达式返回false效果一样(默认条件为false) wait()将解锁互斥量,并堵塞到本行,阻塞到其他某个线程调用notify_one()成员函数为止; 当其他线程用notify one ()将本wait (原来是睡看/堵塞)的状态唤醒后, wait就开始工作,恢复后wait干什么? a) wait()不断的尝试重新获取互斥量锁,如果获取不到,那么流程就卡在wait这里等着获取,如果获取到了锁,那wait就继续往下走 b) b.1)如果wait有第二个参数(lambda) ,就判断这个lambda表达式,如果lambda表达式为false,那么wait又对互斥量解锁,进入阻塞状态(能够防止虚假唤醒,即多线程环境下某个线程被唤醒但条件其实并未满足) b.2)如果lambda表达式为true, 则wait返回,流程走下来(此时互斥锁被锁着)。 b.3)如果wait没有第二个参数, 则wait返回,流程走下来。(无条件唤醒,不同于开始的阻塞) -
注意事项 conditon_variable一定要用notify_one(唤醒一个)/notify_all(唤醒所有,但是唤醒后存在互斥锁的争夺)唤醒,要不然条件不满足将被永久阻塞 wait解锁后被激活后会竞争锁资源,所以会受到操作系统的调度时机影响 如果notify_one()时,另外一个线程并没有被wait阻塞,那么notify_one没有效
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <condition_variable>
using namespace std;
class A
{
public:
void inMsgqueue()
{
for (int i = 0; i < 1000; ++i)
{
std::unique_lock<mutex> guard(my_mutex1);
cout << "插入一个数据" << i << endl;
msgQueue.push_back(i);
my_cond.notify_one();
}
}
void outMsgqueue()
{
int command;
while(true){
std::unique_lock<mutex> guard(my_mutex1);
my_cond.wait(guard,[this]{
if (!msgQueue.empty()){
return true;
}
return false;
});
command = msgQueue.front();
msgQueue.pop_front();
guard.unlock();
cout << "取出一个元素:" << command << endl;
}
}
private:
list<int> msgQueue;
mutex my_mutex1;
std::condition_variable my_cond;
};
int main()
{
A myobj;
thread outThread(&A::outMsgqueue, &myobj);
thread inThread(&A::inMsgqueue, &myobj);
outThread.join();
inThread.join();
cout << "主线程结束" << endl;
return 0;
}
7.async、packaged_task、promise
std::async是一个函数模板,用来启动一个异步任务,任务结束后返回一个std::future对象 异步任务就是自动的创建一个线程并开始执行对应线程的入口函数 std::future对象中含有线程入口函数将来返回的结果,但由于线程执行结束返回可能需要一段时间 因此我们可以通过future对象的成员函数get()阻塞等待线程结束,获取返回值(get是移动语义,只能get一次)
std::async第一个参数 std::launch::deferred,表示线程入口函数调用会被延迟到std::future的wait()或者get()调用时才执行,如果没有wait()或者get(),线程入口函数根本不会执行。 std::launch::deferred实际上并没有创建新线程,而是在主线程中调用线程入口函数 std::launch::async创建一个新线程并立即执行
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <future>
using namespace std;
class A{
public:
int mythread(int num){
cout << num << endl;
cout << "thread start,thread_id= " << std::this_thread::get_id() << endl;
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
cout << "thread end,thread_id= " << std::this_thread::get_id() << endl;
return 8;
}
};
int main(){
A a;
int temp = 12;
cout << "main thread = " << std::this_thread::get_id() << endl;
std::future<int>result = std::async(std::launch::deferred,&A::mythread,&a,temp);
cout << "continue..........!" << endl;
cout << result.get() << endl;//阻塞等待mythread返回,拿到返回值,只能调用一次
//result.wait();//等待线程返回,本身并不返回结果
}
std::packaged_task :打包任务,把任务包装起来,是个类模板,它的模板参数是各种可调用对象;通过std::packaged_task来把各种可调用象包装起来,方便将来作为线程入口函数 std::packaged task包装完后也可以直接当成一个函数调用,也能放入容器里以后调用。
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <future>
using namespace std;
// int mythread(int num)
// {
// cout << num << endl;
// cout << "thread start,thread_id= " << std::this_thread::get_id() << endl;
// std::chrono::milliseconds dura(5000);
// std::this_thread::sleep_for(dura);
// cout << "thread end,thread_id= " << std::this_thread::get_id() << endl;
// return 8;
// }
int main()
{
int temp = 12;
cout << "main thread = " << std::this_thread::get_id() << endl;
//std::packaged_task<int(int)>mypt(mythread);//将函数mythread通过packaged_task包装起来
std::packaged_task<int(int)> mypt([](int num)
{
cout << num << endl;
cout << "thread start,thread_id= " << std::this_thread::get_id() << endl;
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
cout << "thread end,thread_id= " << std::this_thread::get_id() << endl;
return 8;
}); //将函数lambda表达式通过packaged_task包装起来,和上面直接包装一个函数效果一样
std::thread pt1(std::ref(mypt), temp); //线程执行
pt1.join();
std::future<int> result = mypt.get_future(); //std::future对象中包含线程入口函数的返回值,get_future返回一个future对象
cout << result.get() << endl;
}
std::promise,是一个类模板我们可以在线程中给其赋值,在其它线程中取出值使用,可以实现线程之间的数据传递
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <future>
using namespace std;
void mythread(std::promise<int>&temp,int calc){
calc *= 10;
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
int result = calc;
temp.set_value(result);//结果保存到temp对象中
return;
}
void mythread2(std::future <int>&temp){
auto result = temp.get();
cout << "mythread2 result:" << result << endl;
return;
}
int main(){
std::promise<int>mypro;//声明一个promise对象,保存值的类型为int
std::thread t1(mythread,std::ref(mypro),180);
t1.join();
std::future<int>ful = mypro.get_future();//promise和future绑定,用于获取从线程返回值
//auto result =ful.get();
//cout << "result = " << result << endl;
std::thread t2(mythread2,std::ref(ful));
t2.join();
}
8.future的成员函数
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <future>
using namespace std;
int mythread(){
cout << "thread start,thread_id= " << std::this_thread::get_id() << endl;
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
cout << "thread end,thread_id= " << std::this_thread::get_id() << endl;
return 8;
};
int main(){
cout << "main thread = " << std::this_thread::get_id() << endl;
//std::future<int> result = std::async(mythread);//创建新线程立即执行
std::future<int> result = std::async(std::launch::deferred,mythread);//延迟执行,调用主线程执行入口函数
cout << "continue..........!" << endl;
//枚举类型
//std::future_status status = result.wait_for(std::chrono::seconds(1));//timeout
std::future_status status = result.wait_for(std::chrono::seconds(6));//ready
if(status == std::future_status::timeout){
//超时:表示线程还没有执行完,我等你1秒,你执行需要5秒
cout << "线程超时" <<endl;
}
else if(status == std::future_status::ready){
//表示线程执行完毕,我等你6秒,你执行需要5秒
cout << "线程执行完毕" << endl;
cout << result.get() << endl;
}
else if(status == std::future_status::deferred){
//当async的第一个参数被设置为std::launch:deferred,则本条件成立
cout << "线程被延迟执行" << endl;
cout << result.get() << endl;//调用get时才执行,并且并没有创建新线程而是使用主线程调用入口函数
}
cout << "主线程执行结束" << endl;
return 0;
}
- future的get是移动语义,所以只能get一次,而share_future是拷贝,所以可以get多次,当多个线程都需要获取packaged_task里的值时可以用share_future
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <future>
using namespace std;
int mythread(int mypar){
cout << "mythread() start ,threadid = " << std::this_thread::get_id() << endl;
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
return 8;
}
void mythread2(std::shared_future <int>&temp){
cout << "mythread() start ,threadid = " << std::this_thread::get_id() << endl;
auto result = temp.get();//获取值,只能get一次,因为get这个future是一个移动语义,将temp的值移动到result,所以改成shared_future
cout << "mythread2 result:" << result << endl;
return;
}
int main(){
std::packaged_task<int(int)>mypt(mythread);
std::thread t1(std::ref(mypt),10);
t1.join();
std::future<int>result = mypt.get_future();
std::shared_future<int> result_move(std::move(result));//future 转移 到 shared_future
//std::shared_future<int> result_move(result.share());同上一句一样将result的结果移动到result_move
//bool ifcanget = result.valid();判断result现在是不是一个有效值
auto mythread_result_copy = result_move.get();//可以get多次,这里是复制
//std::thread t2(mythread2,std::ref(result));
std::thread t2(mythread2,std::ref(result_move));
t2.join();
cout << "主线程结束" << endl;
return 0;
}
//std::future ,get 转移数据
//std::shared_future 也是一个类模板,get 复制数据
9.atomic原子操作
- 原子操作,一个不可分割的操作,也就是执行原子操作时操作系统不能切换线程
std::atomic来代表原子操作,std::atomic是一个类模板,用来封装某个类型的值 std::atomic原子操作支持++、–、+=、-=操作,其它的可能不支持,看文档 - 与互斥量的区别在于互斥量可以锁住一段代码,而atomic只能针对一个变量
- load,以原子操作的方式读入,store以原子操作的方式读写入
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <future>
using namespace std;
std::atomic<int> mycount;
void mythread(){
for(int i =0;i<1000000 ;i++){
mycount++;
//mycount = mycount + 1;不支持
}
return;
}
int main(){
thread mythread1(mythread);
thread mythread2(mythread);
mythread1.join();
mythread2.join();
cout << "两个线程执行完毕,mycount = " << mycount << endl;
//std::atomic<int> mycount = 0;此对象的std::atomic<int>::atomic(const std::atomic<int>&) = delete ,拷贝构造函数delete
//std::atomic<int>temp = mycount;//error
std::atomic<int>mycountCopy(mycount.load());
cout << mycountCopy << endl;
mycountCopy.store(99);
cout << mycountCopy << endl;
return 0;
}
10.async
-
std::thread()如果系统资源紧张,线程可能创建失败导致程序崩溃 -
std::async()的参数
- std::launch::deferred,不创建新线程,延迟调用线程入口函数直到future对象的get()或wait()成员函数调用
- std::launch::async,强制异步任务在新线程中执行,必须创建新线程
- std::launch::deferred | std::launch::async 系统根据实际情况选择(系统资源的的紧张程度),当std::async不带第一个参数时就是这种情况,所以我们并不能确定是否创建的了新线程
-
std::thread 和 std::async的区别 std::thread 一定会去创建线程,创建失败则程序可能崩溃 std::async 不一定会创建新线程,当系统资源紧张时,可能系统会选择不创建新线程,而使用调用future对象get()或wait()的线程执行线程入口函数 std::async通过future获取线程返回值更容易 -
std::async 不确定创建新线程的问题的解决 使用future对象的wait_for 来查看status
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <future>
using namespace std;
int mythread(){
cout << "thread start,thread_id= " << std::this_thread::get_id() << endl;
std::chrono::milliseconds dura(5000);
std::this_thread::sleep_for(dura);
cout << "thread end,thread_id= " << std::this_thread::get_id() << endl;
return 8;
};
int main(){
cout << "main thread = " << std::this_thread::get_id() << endl;
std::future<int> result = std::async(mythread);
cout << "continue..........!" << endl;
//枚举类型
std::future_status status = result.wait_for(0s);//通过下面的代码来判断系统采用了什么策略
if(status == std::future_status::deferred){
//线程被延迟执行,可能是因为系统资源紧张,系统选择了std::launch::deferred策略
cout << result.get() <<endl;//这里调用的mythread
}
else{
if(status == std::future_status::ready){
//表示线程执行完
cout << "线程执行完" << endl;
cout << result.get() << endl;
}
else if(status == std::future_status::timeout){
cout << "超时线程还没执行完" << endl;
cout << result.get() << endl;
}
}
cout << "主线程执行结束" << endl;
return 0;
}
11.Windows临界区
- CRITICAL_SECTION临界区变量
- InitializeCriticalSection初始化临界区
- EnterCriticalSection进入临界区
- LeaveCriticalSection出临界区
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <Windows.h>
using namespace std;
#define __WINDOWS_
//本类用于自动释放windows下的临界区,防止忘记LeaveCriticalSection导致死锁情况的发生,类似于c++11中的std::lock_guard<std::mutex>功能
class CWinLock //叫RAII类(Resource Acquisition is initialization)中文“资源获取即初始化”;
//容器,智能指针这种类,都属于RAII类;
{
public:
CWinLock(CRITICAL_SECTION *pCritmp) //构造函数
{
m_pCritical = pCritmp;
EnterCriticalSection(m_pCritical);//进入临界区
}
~CWinLock() //析构函数
{
LeaveCriticalSection(m_pCritical);//出临界区
}
private:
CRITICAL_SECTION *m_pCritical;
};
class A
{
public:
A(){
#ifdef __WINDOWS_
InitializeCriticalSection(&my_winsec);//使用临界区之前需要初始化
#endif
}
bool outMsg(int &recv)
{
#ifdef __WINDOWS_
EnterCriticalSection(&my_winsec);//进入临界区
if (!msgQueue.empty())
{
recv = msgQueue.front();
msgQueue.pop_front();
LeaveCriticalSection(&my_winsec);//出临界区
return true;
}
LeaveCriticalSection(&my_winsec);//出临界区
#else
my_mutex1.lock();
if (!msgQueue.empty())
{
recv = msgQueue.front();
msgQueue.pop_front();
my_mutex1.unlock();
return true;
}
my_mutex1.unlock();
#endif
return false;
}
void inMsgqueue()
{
for (int i = 0; i < 1000; ++i)
{
cout << "插入一个数据" << i << endl;
#ifdef __WINDOWS_
CWinLock wlock(&my_winsec); //window版本的lock_guard
msgQueue.push_back(i);
#else
my_mutex1.lock();
msgQueue.push_back(i);
my_mutex1.unlock();
#endif
}
}
void outMsgqueue()
{
int command;
for (int i = 0; i < 1000; ++i)
{
bool flag = outMsg(command);//问题,传递引用,在其它函数中改变,本函数打印出改变后的值
if (flag == true)
{
cout << "接收到数据:" << command << endl;
}
else
{
cout << "消息队列为空" << endl;
}
}
}
private:
list<int> msgQueue;
mutex my_mutex1;
#ifdef __WINDOWS_
CRITICAL_SECTION my_winsec;//windows中的临界区,类似于C++11中的mutex
#endif
};
int main()
{
A myobj;
thread outThread(&A::outMsgqueue, &myobj);
thread inThread(&A::inMsgqueue, &myobj);
outThread.join();
inThread.join();
cout << "主线程结束" << endl;
return 0;
}
12.其它互斥量
-
recursive_mutex递归的独占互斥量,类似linux中的嵌套锁 std::mutex: 独占互斥量,自己lock时别人lock不了 std::recursive_mutex:递归的独占互斥量:允许同一个线程,同一个互斥量多次被.lock(),效率上比mutex要差一些;recursive_mutex也有lock,也有unlock(); -
带超时的互斥量std::timed_mutex和std::recursive_timed_mutex
-
std::timed_mutex:是带超时功能的独占互斥量; try_lock_for() :参数是一段时间,是等待一段时间。如果我拿到了锁,或者等待超过时间没拿到锁,程序继续执行,不会阻塞; try_lock_until() :参数是一个未来的时间点,在这个未来的时间没到的时间内,如果拿到了锁,那么就走下来;如果时间到了,没拿到锁,程序继续执行,不会阻塞; -
std::recursive_timed_mutex:带超时功能的递归独占互斥量(允许同一个线程多次获取这个互斥量)
#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <Windows.h>
using namespace std;
class A
{
public:
bool outMsg(int &recv)
{
std::lock_guard<std::timed_mutex> sbguard(my_mutex1);
if (!msgQueue.empty())
{
recv = msgQueue.front();
msgQueue.pop_front();
return true;
}
my_mutex1.unlock();
return false;
}
void inMsgqueue()
{
for (int i = 0; i < 1000; ++i)
{
cout << "插入一个数据" << i << endl;
std::chrono::milliseconds timeout(100); //100毫秒
//if (my_mutex.try_lock_for(timeout)) //等待100毫秒来尝试 获取锁
if (my_mutex1.try_lock_until(chrono::steady_clock::now() + timeout))
{
//在这100毫秒之内拿到了锁
msgQueue.push_back(i);//假设这个数字就是我收到的命令,我直接弄到消息队列里来
my_mutex1.unlock(); //用完了要解锁;
}
else
{
//这次没拿到锁头
std::chrono::microseconds sleeptime(100);
std::this_thread::sleep_for(sleeptime);
}
}
}
void outMsgqueue()
{
int command;
for (int i = 0; i < 1000; ++i)
{
bool flag = outMsg(command);//问题,传递引用,在其它函数中改变,本函数打印出改变后的值
if (flag == true)
{
cout << "接收到数据:" << command << endl;
}
else
{
cout << "消息队列为空" << endl;
}
}
}
private:
list<int> msgQueue;
std::timed_mutex my_mutex1;
};
int main()
{
A myobj;
thread outThread(&A::outMsgqueue, &myobj);
thread inThread(&A::inMsgqueue, &myobj);
outThread.join();
inThread.join();
cout << "主线程结束" << endl;
return 0;
}
|