C++线程操作
C++11 中提供的线程类 std::thread ,基于此类创建一个新的线程相对简单,只需要提供线程函数和线程对象即可;
1.命名空间 this_thread
C++11 添加一个关于线程的命名空间std::this_pthread ,此命名空间中提供四个公共的成员函数;
1.1 get_id()
调用命名空间std::this_thread 中的 get_id() 方法可以得到当前线程ID:
thread::id get_id() noexcept;
函数使用示例:
#include <iostream>
using namespace std;
#include <thread>
void func()
{
cout << "子线程id: " << this_thread::get_id() << endl;
}
int main()
{
cout << "主线程id: " << this_thread::get_id() << endl;
thread t(func);
t.join();
return 0;
}
程序启动后执行main() 函数,此时只有一个主线程。创建子线程之后,指定的函数func()会在子进程中执行。
1.2 sleep_for()
线程被创建出来之后有5中状态 : 创建态、就绪态、阻塞态、运行态、推出态 ;
线程和进程在使用时非常相识,在计算机中启动的多个线程都需要占用 CPU 资源,但是 CPU 的个数是有限的并且每个 CPU 在同一时间点不能同时处理多个任务。为了能够实现并发处理,多个线程都是分时复用CPU时间片,快速的交替处理各个线程中的任务。因此多个线程之间需要争抢CPU时间片,抢到了就执行,抢不到则无法执行 (因为默认所有的线程优先级都相同,内核也会从中调度,不会出现某个线程永远抢不到 CPU 时间片的情况)。
命名空间 this_thread 中提供了一个休眠函数 sleep_for() ,调用这个函数的线程会马上从运行态变成阻塞态并在这种状态下休眠一定的时长 ,因为阻塞态的线程已经让出了 CPU 资源,代码也不会被执行,所以线程休眠过程中对 CPU 来说没有任何负担;
template <class Rep ,class Period>
void sleep_for(const chrono::duration<Rep ,Period>&rel_time);
使用示例:
#include <iostream>
using namespace std;
#include <thread>
void func()
{
for (int i = 0; i < 5; ++i)
{
this_thread::sleep_for(chrono::seconds(1));
cout << "子线程id: " << this_thread::get_id() << endl;
}
}
int main()
{
cout << "主线程id: " << this_thread::get_id() << endl;
thread t(func);
t.join();
return 0;
}
在for循环中使用this_thread::sleep_for(chrono::seconds(1)); 后每次循环一次都会阻塞1s ,即每隔1s输出一次;注意: 程序休眠之后会从阻塞态变为就绪态,就绪态的线程需要再次抢夺CPU时间片,抢到之后会变成运行态,程序才能继续运行下去;
1.3 sleep_until
this_thread 命名空间还提供另一个休眠函数 sleep_until ,和 sleep_for 有不同的参数类型;
template <class Clock, class Duration>
void sleep_until (const chrono::time_point<Clock,Duration>& abs_time);
使用示例:
#include <iostream>
#include <thread>
using namespace std;
void func()
{
for (int i = 0; i < 5; i++)
{
//获取当前时间点
auto NewTime = chrono::system_clock::now();
//时间间隔
chrono::seconds sec(3);
//当前时间点后休眠3秒
this_thread::sleep_until(NewTime + sec);
cout << "子进程ID: " << this_thread::get_id() << endl;
}
}
int main()
{
thread t(func);
t.join();
return 0;
}
sleep_until 和 sleep_for 函数功能一样 ,前者基于时间点阻塞 ,后者基于时间段阻塞;
1.4 yield()
this_thread 命名空间提供能主动由运行态退让出已经抢到时间片 的线程函数 yield() ,最终变为就绪态,这样其他线程就能抢到CPU时间片;
线程调用了 yield () 之后会主动放弃 CPU 资源,但是这个变为就绪态的线程会马上参与到下一轮 CPU 的抢夺战中,不排除它能继续抢到 CPU 时间片的情况
void yield() noexcept;
使用示例:
#include <iostream>
#include <thread>
using namespace std;
void func()
{
for (int i = 0; i < 100000000000; ++i)
{
cout << "子线程: " << this_thread::get_id() << ", i = " << i << endl;
this_thread::yield();
}
}
int main()
{
thread t(func);
thread t1(func);
t.join();
t1.join();
return 0;
}
func() 中的 for 循环会占用大量的时间 ,在极端情况下,如果当前线程占用 CPU 资源不释放就会导致其他线程中的任务无法被处理,或者该线程每次都能抢到 CPU 时间片,导致其他线程中的任务没有机会被执行。解决方案就是每执行一次循环,让该线程主动放弃 CPU 资源,重新和其他线程再次抢夺 CPU 时间片,如果其他线程抢到了 CPU 时间片就可以执行相应的任务了。
注意:
- yield() 的目的是避免一个线程长时间占用CPU资源,从而多线程处理能力下降;
- yield() 是让当前线程主动放弃自己抢到的CPU资源,但是在下一轮还会继续抢;
2. C++ 线程类
2.1构造函数
//1. 默认构造
thread() noexcept;
//2. 移动构造函数
thread(thread&& other) noexcept;
//3. 创建线程对象
template <class Function ,class ...Args>
explicit thread(Function&& f , Args&&... args);
//4. 显示删除拷贝构造函数
thread(const thread&) =delete;
-
默认构造函数 ,构造一个线程对象,在此线程中不执行任何处理动作; -
移动构造函数 ,将 other的线程所有权转移给新的thread对象,之后other不在便是执行线程; -
创建线程对象 ,并在该线程中执行函数 f 中的业务逻辑,args 是要传递给函数 f 的参数 — 任务函数 f 有很多类型: ? — 普通函数、类成员函数、匿名函数、仿函数 (可调用对象类型); ? — 可以是可调用对象包装器类型,也可以是使用绑定器之后得到的类型(仿函数); -
使用 =delete 显示删除拷贝构造,不允许线程对象之间的拷贝;
2.2 公共成员函数
2.2.1 get_id() 函数
应用程序启动之后默认只有一个线程,这个线程被称为主线程(父线程) ,通过线程类创建的出的线程被称为子线程 ,每个创建的进程都对应一个唯一的线程ID,可通过线程ID 来区分各个线程实例,获取线程ID 的函数:get_id() ;
std::thread::id get_id() const noexcept;
使用示例:
#include <iostream>
#include <thread>
using namespace std;
void func()
{
for (int i = 0; i < 5; i++)
{
//获取当前时间点
auto NewTime = chrono::system_clock::now();
//时间间隔
chrono::seconds sec(1);
//当前时间点后休眠3秒
this_thread::sleep_until(NewTime + sec);
cout << "i = " << i << endl;
}
}
void func2(int num, string str)
{
for (int i = 0; i < 5; i++)
{
cout << "num = " << num << " string = " << str << endl;
}
}
int main()
{
thread t(func);
thread t1(func2, 666, "刘");
cout << "线程t的线程ID = " << t.get_id() << endl;
cout << "线程t1的线程ID = " << t1.get_id() << endl;
t.join();
t1.join();
return 0;
}
thread t1(func2, 666, "刘"); 创建子线程对象 t1 ,func()函数会在子线程中运行:
func() 函数是一个回调函数 ,线程启动之后就会执行这个任务函数;func1() 的参数通过 thread 的参数进行传递,666 ,“刘” 都是调用func1() 需要调用的实参;- 构造函数3 是一个**变参函数* *,无需担心参数的个数;
- 任务函数
func() 一般返回值指定为 void ,因为子线程在调用这个函数的时候不会处理其他返回值; - 通过线程对象调用
get_id() 可知道子线程线程ID ,t.get_id() ;
?
当启动一个线程(创建一个线程对象)后,回收线程资源,thread库有两种方式:
- 加入式 :join();
- 分离式 : detach();
必须在线程对象销毁前选择以上一种,否则在运行阶段BUG;
2.2.2 join() 函数
在子线程对象中调用 join()函数 ,调用此函数的线程会被阻塞 ,但是子线程对象中的任务函数会继续执行 ,当任务执行完毕之后 join()函数 会清理当前子线程中的相关资源后返回,同时该线程函数会解除阻塞继续执行下去。函数在那个线程中被执行,函数就阻塞那个函数;
void join();
如果要阻塞主线程的执行,只需要在主线程中通过子线程对象调用这个方法即可,当调用这个方法的子线程对象中的任务函数执行完毕之后,主线程的阻塞也就随之解除了:
int main()
{
cout << "主线程的线程ID: " << this_thread::get_id() << endl;
thread t(func, 520, "i love you");
thread t1(func1);
cout << "线程t 的线程ID: " << t.get_id() << endl;
cout << "线程t1的线程ID: " << t1.get_id() << endl;
t.join();
t1.join();
}
当主线程运行到第八行 :t.join() ; 根据子线程对象 t 的任务函数 func() 的执行情况,主线程会:
- 任务函数
func() 还没执行完毕,主线程阻塞直到任务执行完毕,主线程解除阻塞,继续向下执行; - 任务函数
func() 执行完毕,主线程不会阻塞 ,继续向下运行;
为了更好理解 join() 的使用,举例:
程序中三个线程,两个线程分别下载同一文件 ,下载完毕之后主线程对文件处理。
#include <iostream>
#include <thread>
#include <chrono>
using namespace std;
void download1()
{
//模拟下载,故意耗时100ms
this_thread::sleep_for(chrono::microseconds(100));
cout << "子线程1: " << this_thread::get_id() << "..找到历史正文了1" << endl;
}
void download2()
{
//模拟下载,故意耗时200ms
this_thread::sleep_for(chrono::microseconds(200));
cout << "子线程2: " << this_thread::get_id() << "..找到历史正文了2" << endl;
}
void mainShow()
{
cout << "集齐历史正文, 呼叫罗宾...." << endl;
cout << "历史正文解析中...." << endl;
cout << "起航,前往拉夫德尔...." << endl;
cout << "找到OnePiece, 成为海贼王, 哈哈哈!!!" << endl;
cout << "若干年后,草帽全员卒...." << endl;
cout << "大海贼时代再次被开启...." << endl;
}
int main()
{
thread t1(download1);
thread t2(download2);
//阻塞线程,保证子线程执行完成
t1.join();
t2.join();
mainShow();
return 0;
}
//输出结果
子线程2: 72540, 找到历史正文....
子线程1: 79776, 找到历史正文....
集齐历史正文, 呼叫罗宾....
历史正文解析中....
起航,前往拉夫德尔....
找到OnePiece, 成为海贼王, 哈哈哈!!!
若干年后,草帽全员卒....
大海贼时代再次被开启...
第 35、36行通过子线程对象调用了 join() 方法,这样就能够保证两个子线程的任务都执行完毕了,主线程才继续执行;
2.2.3 detach() 函数
detach() 函数的左右是进行线程分离 ,分离主线程和子线程。在线程分离之后,主线程退出也会销毁创建的所有子线程,在主线程推出之前,子线程可以脱离主线程继续独立运行,任务结束完毕之后,这个子线程会自动释放自己占用的系统资源。 孩子翅膀硬了离家出走了 ,但是家里株连九族也会被牵连;
void detach();
使用示例:
int main()
{
cout<<"主线程ID:"<<this_thread::get_id()<<endl;
thread t1(func);
thread t2(func1);
cout<< "线程t1 的线程ID:"<<t1.get_id()<<endl;
cout<<"线程t2 的线程ID:"<<t2.get_id()<<endl;
//线程分离
t1.detach();
t2.detach();
//主线等待 子线程执行完毕
this_thread ::sleep_for(chrono::seconds (3));
return 0;
}
注意: 线程分离函数 detach () 不会阻塞线程,子线程和主线程分离之后,在主线程中就不能再对这个子线程做任何控制了,比如:通过 join () 阻塞主线程等待子线程中的任务执行完毕,或者调用 get_id () 获取子线程的线程 ID。
建议使用 join();
2.2.4 joinable() 函数
joinable() 函数用于判断主线程和子线程是否处于关联(连接)状态,通常情况下两者处于关联状态,该函数返回一个布尔类型:
-
返回 true : 主线程和子线程有关联; -
返回 false : 主线程和子线程没有关联;
bool joinable() const noexcept;
使用示例:
#include <iostream>
#include <thread>
#include <chrono>
using namespace std;
void foo()
{
this_thread::sleep_for(std::chrono::seconds(1));
}
int main()
{
thread t;
cout << "before starting, joinable: " << t.joinable() << endl;
t = thread(foo);
cout << "after starting, joinable: " << t.joinable() << endl;
t.join();
cout << "after joining, joinable: " << t.joinable() << endl;
thread t1(foo);
cout << "after starting, joinable: " << t1.joinable() << endl;
t1.detach();
cout << "after detaching, joinable: " << t1.joinable() << endl;
}
编译结果:
before starting, joinable: 0
after starting, joinable: 1
after joining, joinable: 0
after starting, joinable: 1
after detaching, joinable: 0
结论:
创建子线程对象时,如果没有指定任务函数,那么子线程不会启动,主线程和子线程也不会进行连接; 创建子线程对象时,如果指定任务函数,子线程启动并执行任务,主线程和子线程自动连接成功; 子线程调用detach()函数后,父子线程分离,两者的连接断开,调用joinable()返回 fasle; 子线程调用 join()函数后,子线程中的任务函数继续执行,知道任务处理完毕,此时join()清理回收当前线程的相关资源,此时子线程和主线程连接断开了,此时调用join()函数之后再调用joinable()返回false;
2.3 静态函数
thread 线程类提供一个静态方法,用于**获取当前计算机CPU的核心数* *,可根据获取的核心数多少确定相关数量的线程,每个CPU核心对应一个线程时,线程不需要分时复用CPU时间片,此时程序的并发效率是最高的。
static unsigned hardware_concurrency() noexcept;
使用示例:
#include <iostream>
#include <thread>
using namespace std;
int main()
{
int num = thread::hardware_concurrency();
cout << "CPU number: " << num << endl;
}
3. C++ 线程同步
进行多线程编程,如果多个线程需要对同一块内存进行操作,如:同时读、同时写、同时读写 对后两种情况而言如果不做任何的人为干涉会出现各种错误数据。这是因为线程在运行的时候需要先得到 CPU 时间片,时间片用完之后需要放弃已获得的 CPU 资源,就这样线程频繁地在就绪态和运行态之间切换,更复杂一点还可以在就绪态、运行态、挂起态之间切换,这样就会导致线程的执行顺序并不是有序的,而是随机的混乱的。
3.1互斥锁
解决多线程数据混乱的方案就是进行线程同步,最常用的是互斥锁 ,在C++11 中提供了四种互斥锁:
std::mutex : 独占的互斥锁,不能递归使用;std::timed_mutex: 带超时的独占互斥锁,不能递归使用;std::recursive_mutex: 递归互斥锁,不带超时功能;std::recursive_timed_mutex : 带超时的递归互斥锁;
3.1.1 std::mutex
无论在C 或者C++中进行线程同步的处理流程基本一致的,C++的mutex类 提供相关的API函数;
3.1.1.1 成员函数
lock()函数 用于给临界区加锁,并只能有一个线程获得锁的所有权 ,有阻塞线程的作用;
void lock();
独占互斥锁对象有两种状态: 锁定 和 未锁定
如果互斥锁是打开的,调用 lock() 函数的线程会得到互斥锁的所有权,并将其上锁。其他线程再调用该函数时由于得不到互斥锁的所有权,就会被 lock() 函数阻塞。
当拥有互斥锁所有权的线程将互斥锁解锁 ,此时被 lock() 阻塞的线程解除阻塞 ,抢到互斥锁所有权的线程加锁成功并继续开锁,没抢到互斥锁所有权的线程继续阻塞;
还可以使用 try_lock() 获取互斥锁的所有权并对互斥锁加锁:
bool try_lock();
和 lock()的区别在于 ,try_lock()不会阻塞线程,lock()会阻塞线程:
互斥锁被锁定之后可以通过 unlock() 进行解锁,但是需要注意:只有拥有互斥锁所有权的线程(对互斥锁上锁的线程)才能将其解锁,其他线程没有权限做这件事;
void unlock();
使用互斥锁进行线程同步的流程:
- 找到多个线程操作的共享资源
(全局变量、堆内存、类成员变量) ,成为临界资源 ; - 找到共享资源相关的上下文代码,即临界区
- 再临界区的上边调用互斥锁类的
lock() 方法; - 再临界区的下边调用互斥锁类的
unlock() 方法;
线程同步的目的:使多线程按照顺序依次进行执行临界区代码,对共享资源的访问从并行访问变成线性访问,访问效率降低了但是保证了数据的正确性;
当线程对互斥锁对象加锁,并且执行完临界区代码之后,一定要使用这个线程对互斥锁解锁,否则最终会造成线程的死锁。死锁之后当前应用程序中的所有线程都会被阻塞,并且阻塞无法解除,应用程序也无法继续运行。
3.1.1.2 线程同步
==示例:==两个线程共同操作同一个全局变量,二者交替数数,将数值存储到这个全局变量里并打印出来;
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
//设置全局变量
int number = 0;
mutex m_num_mutex;
void increment(int id)
{
for (int i = 0; i < 5; ++i)
{
m_num_mutex.lock();
number++;
cout << "id = " << id << "number = " << number << endl;
m_num_mutex.unlock();
this_thread::sleep_for(chrono::seconds(1));
}
}
int main()
{
thread t1(increment, 0);
thread t2(increment, 1);
t1.join();
t2.join();
return 0;
}
注意:
在所有线程的任务函数执行完毕之前,互斥锁对象是不能被析构的 ,一定要在程序中保证对象的可用性;- 互斥锁的个数和共享资源的个数相等,
每一个共享资源对应一个互斥锁对象 ,与线程数无关;
3.1.2 std::lock_guard
lock_guard 是C++11新增的一个模板类,可以简化互斥锁 lock() 和 unlock() 的写法,同时也更安全;
template <class Mutex>
class lock_guard;
//常用构造
eplicit lock_guard(mutex_type& m);
lock_guard 在使用上面的构造函数构造对象时,会自动锁定互斥量,且在退出作用域后进行析构就会自动解锁,以保证互斥量的正确性,避免忘记 unlock() 而导致的线程死锁。
使用 lock_guard 对互斥锁案例修改:
void increment(int id)
{
for (int i=0;i<;i++)
{
//使用lock_guard 加锁
lock_guard<mutex>lock(m_num_mutex);
number++;
cout << "id = " << id << "number = " << number << endl;
m_num_mutex.unlock();
this_thread::sleep_for(chrono::seconds(1));
}
}
代码变得精简了也不用担心忘记解锁而造成的程序死锁,但是此种方式中整个for循环都被当做临界区,多个线程线性的执行临界区的代码,因此临界区越大程序效率越低;
3.1.3 std::recursive_mutex
递归互斥锁: std::recursive_mutex 允许同一线程多次获得互斥锁,可以用来解决同一线程需要多次获取互斥量时的死锁问题 ,以下案例使用独占非递归互斥量会发生死锁:
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
class Calculate
{
public:
Calculate():m_num(2){}
void mul(int x)
{
lock_guard<mutex>lock(m_mutex);
m_num *= x;
}
void div(int x)
{
lock_guard<mutex>lock(m_mutex);
m_num = m_num /x;
}
void both(int x, int y)
{
lock_guard<mutex>lock(m_mutex);
mul(x);
div(x);
}
private:
int m_num;
mutex m_mutex;
};
int main()
{
Calculate cal;
cal.both(3,4);
return 0;
}
在执行到 cal.both(3,4); 调用之后程序会发生死锁,在 both() 中已经对互斥锁加锁了,继续调用 mult() 函数,已经得到互斥锁所有权的线程再次获取这个互斥锁的所有权便会造成死锁 (C++异常退出);使用递归互斥锁 std::recursive_mutex ,其允许一个线程多次获取互斥锁的所有权;
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
class Calculate
{
public:
Calculate():m_num(2){}
void mul(int x)
{
lock_guard<recursive_mutex>lock(m_mutex);
m_num *= x;
}
void div(int x)
{
lock_guard<recursive_mutex>lock(m_mutex);
m_num = m_num /x;
}
void both(int x, int y)
{
lock_guard<recursive_mutex>lock(m_mutex);
mul(x);
div(x);
}
int m_num;
recursive_mutex m_mutex;
};
int main()
{
Calculate cal;
cal.both(3,4);
cout << "cal.both(3,4) = " << cal.m_num << endl;
return 0;
}
总结:
递归互斥锁可以解决同一个互斥锁频繁获取互斥锁资源的问题,但是建议少用:
使用递归锁的场景往往可以都是简化的,使用递归锁可能会导致复杂逻辑产生,可能会导致bug产生; 互斥递归锁比非互斥递归锁效率低一些; 递归互斥锁虽运行同一个线程多次获取同一个互斥锁的所有权,但是最大使用次数未知,使用次数过多可能会抛出异常 std::system 错误;
3.1.4 std::timed_mutex
std::timed_mutex 是独占超时互斥锁 ,在获取互斥锁资源是增加一个超时等待功能 ,因为不知道获取锁资源需要等待多长时间,为了保证不一直等待下去,设置一个超时时长,超时后线程会解除阻塞做其他事情;
std::timed_mutex 比 std::mutex 多了两个成员函数:try_lock_for() 和 try_lock_until() :
// std::timed_mutex比std::_mutex多出的两个成员函数
template <class Rep, class Period>
bool try_lock_for (const chrono::duration<Rep,Period>& rel_time);
template <class Clock, class Duration>
bool try_lock_until (const chrono::time_point<Clock,Duration>& abs_time);
-
try_lock_for 函数是当线程获取不到互斥锁资源之后,让线程阻塞一定的时间长度; -
try_lock_until 函数是当线程获取不到互斥锁资源时,让线程阻塞到某一个时间点; -
当两个函数返回值:当得到互斥锁所有权后,函数会马上解除阻塞 ,返回true ,如果阻塞的时长用完或达到某时间点后,函数会解除阻塞 ,返回false ;
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
timed_mutex g_mutex;
void work()
{
chrono::seconds timeout(1);
while (true)
{
// 通过阻塞一定的时长来争取得到互斥锁所有权
if (g_mutex.try_lock_for(timeout))
{
cout << "当前线程ID: " << this_thread::get_id()
<< ", 得到互斥锁所有权..." << endl;
// 模拟处理任务用了一定的时长
this_thread::sleep_for(chrono::seconds(10));
// 互斥锁解锁
g_mutex.unlock();
break;
}
else
{
cout << "当前线程ID: " << this_thread::get_id()
<< ", 没有得到互斥锁所有权..." << endl;
// 模拟处理其他任务用了一定的时长
this_thread::sleep_for(chrono::milliseconds(50));
}
}
}
int main()
{
thread t1(work);
thread t2(work);
t1.join();
t2.join();
return 0;
}
编译运行:
当前线程ID: 125776, 得到互斥锁所有权...
当前线程ID: 112324, 没有得到互斥锁所有权...
当前线程ID: 112324, 没有得到互斥锁所有权...
当前线程ID: 112324, 没有得到互斥锁所有权...
当前线程ID: 112324, 没有得到互斥锁所有权...
当前线程ID: 112324, 没有得到互斥锁所有权...
当前线程ID: 112324, 没有得到互斥锁所有权...
当前线程ID: 112324, 没有得到互斥锁所有权...
当前线程ID: 112324, 没有得到互斥锁所有权...
当前线程ID: 112324, 没有得到互斥锁所有权...
当前线程ID: 112324, 得到互斥锁所有权...
在上面的例子中,通过一个 while 循环不停的去获取超时互斥锁的所有权,如果得不到就阻塞 1 秒钟,1 秒之后如果还是得不到阻塞 50 毫秒,然后再次继续尝试,直到获得互斥锁的所有权,跳出循环体。
关于递归超时互斥锁 std::recursive_timed_mutex 的使用方式和 std::timed_mutex 是一样的,只不过它可以允许一个线程多次获得互斥锁所有权,而 std::timed_mutex 只允许线程获取一次互斥锁所有权。另外,递归超时互斥锁 std::recursive_timed_mutex 也拥有和 std::recursive_mutex 一样的弊端,不建议频繁使用
3.2 条件变量
C++11 提供了另一种用于等待的同步机制,能阻塞一个或多个线程,直到收到另一个线程发出的通知或超时时,才能唤醒当前阻塞的线程。条件变量需要和互斥量配合使用。
C++11 提供了两种条件变量:
condition_variable: 配合 std::unique_lock<std::mutex> 进行 wait 操作,也就是阻塞线程的操作;conditon_variable_any : 可以和任意带有 lock() 、unlock() 语义的 mutex 搭配使用,即存在四种:
std::mutex : 独占的非递归互斥锁;std::timed_mutex: 带超时的独占非递归锁;std::recursive_mutex: 不带超时功能的递归互斥锁;std::recursive_timed_mutex: 带超时的递归互斥锁;
条件变量常用于生产者和消费者模型,使用流程:
-
拥有条件变量的线程获取互斥锁; -
循环检查某个条件,如果条件不满足 阻塞当前线程,否则线程继续向下执行; — 产品的数量达到上线,生产者阻塞 ,否则生产者一直生产; — 产品的数量为零 ,消费者阻塞,否则消费者一直消费; -
条件满足后,可调用 notify_one() 或 notify_all() 唤醒一个或者所有被阻塞的线程; — 由消费者唤醒被阻塞的生产者 ,生产者解除阻塞继续生产。。 — 由生产者唤醒被阻塞的消费者 ,消费者解除阻塞继续消费。。
3.2.1 conditon_variable
3.2.1.1 成员函数
conditon_variable 的成员函数主要分为两部分:线程等待(阻塞)函数 和 线程通知(唤醒)函数 ,定义在头文件 <condition_variable> 中;
调用 wait()函数 的线程会被阻塞
// ①
void wait (unique_lock<mutex>& lck);
// ②
template <class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred);
? — 返回 fasle当前线程被阻塞,返回true 当前线程不被阻塞,继续向下执行;
- 独占的互斥锁对象不能直接传递给
wait() 函数 ,需要通过模板类 unique_lock 进行二次处理,通过得到的对象依然可以对独占的互斥锁对象做下面操作,使用更加灵活:
公共成员函数 | 说明 |
---|
lock | 锁定关联的互斥锁 | try_lock | 尝试锁定关联的互斥锁,若无法锁定,函数直接返回 | try_lock_for | 试图锁定关联的可定是锁定互斥锁,如无法给定时长 内锁定,函数返回 | try_lock_until | 试图锁定关联的可定是锁定互斥锁,如无法给定时间点 内锁定,函数返回 | unlock | 互斥锁解锁 |
如果该线程被该函数阻塞,此线程会释放占有的互斥锁的所有权,当解除阻塞之后此线程会重新得到互斥锁的所有权,继续执行 (避免死锁);
wait_for() 函数 和 wait() 函数 的功能是一样的,添加了一个阻塞时长 :假设阻塞的线程没有被其他线程唤醒,当阻塞时长用完之后,线程就会自动解除阻塞向下执行;
template <class Clock, class Duration>
cv_status wait_until (unique_lock<mutex>& lck,
const chrono::time_point<Clock,Duration>& abs_time);
template <class Clock, class Duration, class Predicate>
bool wait_until (unique_lock<mutex>& lck,
const chrono::time_point<Clock,Duration>& abs_time, Predicate pred);
通知函数:
void notify_one() noexcept;
void notify_all() noexcept;
3.2.2 生产者和消费者模型
示例:使用条件变量实现一个同步队列,此队列作为生产者线程和消费者线程的共享资源:
#include <iostream>
#include <thread>
#include <mutex>
#include <list>
#include <functional>
#include <condition_variable>
using namespace std;
class SyncQueue
{
public:
SyncQueue(int maxSize) : m_maxSize(maxSize) {}
void put(const int& x)
{
unique_lock<mutex> locker(m_mutex);
// 判断任务队列是不已经满了
while (m_queue.size() == m_maxSize)
{
cout << "任务队列已满, 请耐心等待..." << endl;
// 阻塞线程
m_notFull.wait(locker);
}
// 将任务放入到任务队列中
m_queue.push_back(x);
cout << x << " 被生产" << endl;
// 通知消费者去消费
m_notEmpty.notify_one();
}
int take()
{
unique_lock<mutex> locker(m_mutex);
while (m_queue.empty())
{
cout << "任务队列已空,请耐心等待。。。" << endl;
m_notEmpty.wait(locker);
}
// 从任务队列中取出任务(消费)
int x = m_queue.front();
m_queue.pop_front();
// 通知生产者去生产
m_notFull.notify_one();
cout << x << " 被消费" << endl;
return x;
}
bool empty()
{
lock_guard<mutex> locker(m_mutex);
return m_queue.empty();
}
bool full()
{
lock_guard<mutex> locker(m_mutex);
return m_queue.size() == m_maxSize;
}
int size()
{
lock_guard<mutex> locker(m_mutex);
return m_queue.size();
}
private:
list<int> m_queue; // 存储队列数据
mutex m_mutex; // 互斥锁
condition_variable m_notEmpty; // 不为空的条件变量
condition_variable m_notFull; // 没有满的条件变量
int m_maxSize; // 任务队列的最大任务个数
};
int main()
{
SyncQueue taskQ(50);
auto produce = bind(&SyncQueue::put, &taskQ, placeholders::_1); //函数对象 ,替换函数指针使用
auto consume = bind(&SyncQueue::take, &taskQ);
thread t1[3];
thread t2[3];
for (int i = 0; i < 3; ++i)
{
t1[i] = thread(produce, i + 100);
t2[i] = thread(consume);
}
for (int i = 0; i < 3; ++i)
{
t1[i].join();
t2[i].join();
}
return 0;
}
条件变量 condition_variable 类的 wait() 方法还存在重载方法: 可以接受一个条件,这个条件也可以是一个返回值为布尔类型的函数,条件变量会先检查判断这个条件是否满足,如果满足条件(布尔值:true)则当前线程重新获得互斥锁的所有权,结束阻塞,继续向下执行;如果不满足条件(布尔值:false)当前线程会释放互斥锁(解锁)同时被阻塞,等待被唤醒;
修改上面代码中 put () take()函数:
// put() 函数
void put(const int& x)
{
unique_lock<mutex> locker(m_mutex);
//根据条件阻塞线程
m_notFull.wait(locker ,[this](){
return m_queue.size() != m_maxSize;
});
//将任务放置再任务队列中
m_queue.push_back(x);
cout<<x<<" 被生产。。"<<endl;
//通知消费者去消费
m_notEmpty.notify_one();
}
//take 函数
int take()
{
unique_lock<mutex> locker;
m_notEmpty.wait(locker ,[this](){
return !m_queue.empty();
});
//消费
int x = m_queue.front();
m_queue.pop_front();
//通知生产者生产
m_notFull.notify_one();
cout<<x<<" 被消费。。"<<endl;
return x;
}
修改之后程序变得更加精简且执行效率更高了 (去除了while循环),两种方式效果一致,推荐使用此种wait() 进行线程阻塞;
3.2.2 condition_variable_any
3.2.2.1 成员函数
condition_variable_any 的成员函数分为两部分 : 线程等待(阻塞)函数 和 线程通知(唤醒)函数 ,被定义再头文件 <condition_variable> ;
// ①
template <class Lock> void wait (Lock& lck);
// ②
template <class Lock, class Predicate>
void wait (Lock& lck, Predicate pred);
- 函数1 :调用该函数的线程直接被阻塞;
- 函数2 : 该函数的第二个参数是一个判定条件,是一个返回布尔类型的函数;
该参数可以传递一个有名函数的地址,也可以指向一个匿名函数; 表达式返回false 当前线程被阻塞,表达式返回true 当前线程不会被阻塞,继续向下执行; - 可以直接给wait() 函数的互斥锁类型分别是:
std::mutex; std::timed_mutex; std::recursive_mutex; std::recursive_timed_mutex; 如果线程被该函数阻塞,这个线程会释放占有的互斥锁的所有权,当阻塞解除之后该线程重新获得互斥锁的所有权,继续向下执行 (避免死锁);
wait_for() 函数 wait_until() 函数 和 wait() 函数功能一致,添加一个阻塞时长功能 或 指定时间点。 如阻塞的线程没有被其他线程唤醒,当阻塞的时长用完之后 ,或到达指定阻塞时间点,线程会自动解除阻塞;
template <class Lock, class Rep, class Period>
cv_status wait_for (Lock& lck, const chrono::duration<Rep,Period>& rel_time);
template <class Lock, class Rep, class Period, class Predicate>
bool wait_for (Lock& lck, const chrono::duration<Rep,Period>& rel_time, Predicate pred);
--------------------------------------------------------------------
template <class Lock, class Clock, class Duration>
cv_status wait_until (Lock& lck, const chrono::time_point<Clock,Duration>& abs_time);
template <class Lock, class Clock, class Duration, class Predicate>
bool wait_until (Lock& lck,
const chrono::time_point<Clock,Duration>& abs_time,
Predicate pred);
void notify_one() noexcept;
void notify_all() noexcept;
3.2.2.2 生产者和消费者模型
使用条件变量实现 condition_variable_any 实现以上案例:
#include <iostream>
#include <thread>
#include <mutex>
#include <list>
#include <functional>
#include <condition_variable>
using namespace std;
class SyncQueue
{
public:
SyncQueue(int maxSize) : m_maxSize(maxSize) {}
void put(const int& x)
{
lock_guard<mutex> locker(m_mutex);
// 根据条件阻塞线程
m_notFull.wait(m_mutex, [this]() {
return m_queue.size() != m_maxSize;
});
// 将任务放入到任务队列中
m_queue.push_back(x);
cout << x << " 被生产" << endl;
// 通知消费者去消费
m_notEmpty.notify_one();
}
int take()
{
lock_guard<mutex> locker(m_mutex);
m_notEmpty.wait(m_mutex, [this]() {
return !m_queue.empty();
});
// 从任务队列中取出任务(消费)
int x = m_queue.front();
m_queue.pop_front();
// 通知生产者去生产
m_notFull.notify_one();
cout << x << " 被消费" << endl;
return x;
}
bool empty()
{
lock_guard<mutex> locker(m_mutex);
return m_queue.empty();
}
bool full()
{
lock_guard<mutex> locker(m_mutex);
return m_queue.size() == m_maxSize;
}
int size()
{
lock_guard<mutex> locker(m_mutex);
return m_queue.size();
}
private:
list<int> m_queue; // 存储队列数据
mutex m_mutex; // 互斥锁
condition_variable_any m_notEmpty; // 不为空的条件变量
condition_variable_any m_notFull; // 没有满的条件变量
int m_maxSize; // 任务队列的最大任务个数
};
int main()
{
SyncQueue taskQ(50);
auto produce = bind(&SyncQueue::put, &taskQ, placeholders::_1);
auto consume = bind(&SyncQueue::take, &taskQ);
thread t1[3];
thread t2[3];
for (int i = 0; i < 3; ++i)
{
t1[i] = thread(produce, i + 100);
t2[i] = thread(consume);
}
for (int i = 0; i < 3; ++i)
{
t1[i].join();
t2[i].join();
}
return 0;
}
总结:以上介绍的两种互斥锁各自有各自的特点,condition_variable 配合 unique_lock 使用更灵活一些,可以在在任何时候自由地释放互斥锁,而 condition_variable_any 如果和 lock_guard 一起使用必须要等到其生命周期结束才能将互斥锁释放。但是,condition_variable_any 可以和多种互斥锁配合使用,应用场景也更广,而 condition_variable 只能和独占的非递归互斥锁(mutex)配合使用,有一定的局限性;
|