第二章 线程管理
2.1 线程的基本操作
某些情况下,任务函数需要通过某种通讯机制进行参数的传递,或者执行一些独立操作,通过通讯机制传递信号让线程停止。
使用C++线程库启动线程,就是构造std::thread 对象。
void do_some_work();
std::thread my_thread(do_some_work);
std::thread 可以通过有函数操作符类型的实例进行构造。
class background_task
{
public:
void operator()() const
{
do_something();
do_something_else();
}
};
background_task f;
std::thread my_thread(f);
在上面的代码中,提供的函数对象会复制到新线程的存储空间中,函数对象的执行和调用都在线程的内存空间中进行。
当把函数对象传入到线程构造函数中时,需要避免语法解析,如果传递了一个临时变量,而不是一个命名的变量。C++编译器会将其解析为函数声明,而不是类型对象的定义。
std::thread my_thread(background_task());
这相当于声明了一个名为my_thread 的函数,这个函数带有一个参数(函数指针指向没有参数并返回background_task 对象的函数),返回一个std::thread 对象的函数。
使用多组括号,或者使用同一的初始化语法,都可以避免这个问题。
如下所示
std::thread my_thread((background_task()));
std::thread my_thread{background_task()};
Lambda 表达式也能避免这个问题,Lambda 表达式允许使用一个可以捕获局部变量的局部函数。
std::thread my_thread([]{
do_something();
do_something_else();
});
线程启动后要等待线程结束,还是自主运行。当std::thread 对象销毁之前还没有做出决定,程序就会终止(std::thread 的析构函数会调用std::terminate() )。因此,即便是有异常存在,也需要确保线程能够正确joined 或detached 。
如果不等待线程joined ,就必须保证线程结束之前,访问数据的有效性。单线程代码中,对象销毁之后再去访问,会产生未定义行为。这种情况下很可能发生在线程还没结束,函数已经退出的时候,这时线程函数还持有函数局部变量的指针或引用。
struct func
{
int& i;
func(int& i_) : i(i_) {}
void operator()()
{
for(unsigned j = 0; j < 10000; ++j) //1.潜在访问隐患:空引用
do_something(i);
}
};
void oops()
{
int some_local_state = 0;
func my_func(some_local_state);
std::thread my_thread(my_func);
my_thread.detach(); //2.不等待线程结束
} //3.新线程可能还在运行
下表显示线程在局部变量销毁后,仍对该变量进行访问。
主线程 | 新线程 |
---|
使用some_local_state 构造my_func | | 开启新线程my_thread | | | 启动 | | 调用func::operator() | 将my_thread 分离 | 执行func::operator(); 可能会在do_something 中调用some_local_state 的引用 | 销毁some_local_state | 持续运行 | 退出oops 函数 | 持续执行func::operator(); 可能会在do_something 中调用some_local_state 的引用 --> 导致未定义行为 |
这种情况的常规处理方法:将数据复制到线程中。如果使用一个可调用的对象作为线程函数,这个对象就会复制到线程中,然后原始对象会立即销毁。对于对象中包含的指针和引用还需谨慎。
不要使用访问局部变量的函数去创建线程
2.1.2 等待线程完成
使用join() 函数等待线程完成,在上面的代码中将my_thread.detach() 替换为my_thread.join() ,可以确保局部变量在线程完成后才销毁。
调用join() ,可以清理线程的相关内存,这样std::thread 对象将不再与已经完成的线程有任何关联。只能对一个线程使用一次join() 。
2.1.3 特殊情况下的等待
需要对一个未销毁的std::thread 对象使用join() 或detach() 。如果等待线程,则需要仔细挑选join() 的位置。当在线程运行后产生的异常,会在join() 调用之前抛出。
避免应用被抛出的异常所终止。通常,在无异常的情况下使用join() 时,需要在异常处理过程中调用join() ,
struct func; //上面的代码
void f()
{
int some_local_state = 0;
func my_func(some_local_state);
std::thread t(my_func);
try
{
do_something_in_current_thread();
}
catch(...)
{
t.join(); //1
throw;
}
t.join(); //2
}
代码中使用了try/catch 块确保线程退出后函数才结束。当函数正常退出后,会执行到2处。当执行过程中抛出异常,程序会执行到1处。
另一种方式是使用“资源获取即初始化方式(RAII)”,提供一个类,在析构函数中使用join() 。
//代码2.3
class thread_guard
{
std::thread& t;
public:
explicit thread_guard(std::thread& t_): t(t_){}
~thread_guard()
{
if(t.joinable()) //1
{
t.join(); //2
}
}
thread_guard(thread_guard const&) = delete; //3
thread_guard&
};
struct func; //上面定义的
void f()
{
int some_local_state = 0;
func my_func(some_local_state);
std::thread t(my_func);
thread_guard g(t);
do_something_in_current_thread();
} //4
线程执行到4处,局部对象就要被逆序销毁了。因此,thread_guard 对象g 是第一个被销毁的,这时线程在析构函数中被加入2到原始线程中。即使do_something_in_current_thread 抛出一个异常,这个销毁依旧会发生。
2.1.4 后台运行线程
使用detach() 会让线程在后台运行,这就意味着与主线程不能直接交互。如果线程分离,就不可能有std::thread 对象能引用它。分离的线程不能join 。不过C++运行库保证,当线程退出时,相关资源能够正确回收。
分离线程通常称为守护线程。
调用std::thread 成员函数detach() 来分离一个线程,之后,相应的std::thread 对象就与实际执行的线程无关了,并且这个线程也无法join() 。
为了从std::thread 对象中分离线程,不能对没有执行线程的std::thread 对象使用detach() ,并且要用同样的方式检查——当std::thread 对象使用t.joinable() 返回的是true ,就可以使用t.detach() 。
使用分离线程处理文档
void edit_document(std::string const& filename)
{
open_document_and_display_gui(filename);
while(!done_editing())
{
user_command cmd = get_user_input();
if(cmd.type == open_new_document)
{
std::string const new_name = get_filename_from_user();
std::thread t(edit_document, new_name); // 1
t.detach(); // 2
}
else
{
process_user_input(cmd);
}
}
}
如果用户选择打开一个新文档,需要启动一个新线程去打开新文档1,并分离线程2。与当前线程做出的操作一样,新线程只不过是打开另一个文件而已。所以,edit_document 函数可以复用,并通过传参的形式打开新的文件。
这个例子也展示了传参启动线程的方法,不仅可以向std::thread 构造函数1传递函数名,还可以传递函数所需要的参数(实参)。
2.2 传递参数
向可调用对象或函数传递参数只需要将这些参数作为std::thread 构造函数的附加参数即可。需要注意的是,这些参数会拷贝至新线程的内存空间中。即使函数中的参数是引用的形式,拷贝操作也会执行。
void f(int i, std::string const& s);
std::thread t(f, 3, "hello");
代码创建了一个调用f(3, "hello") 的线程。函数f需要一个std::string 对象作为第二个参数,但这里使用的是字符串的字面值,也就是char const* 类型,线程的上下文完成字面值向std::string 的转化。
指向动态变量的指针作为参数的情况。
void f(int i, std::string const& s);
void oops(int some_param)
{
char buffer[1024]; //1
sprintf(buffer, "%i", some_param);
std::thread t(f, 3, buffer); //2
t.detach();
}
buffer 是一个指针变量,指向局部变量,然后通过buffer 传递到新线程中。此时,函数oops 可能会在buffer 转换成std::string 之前结束,从而导致未定义的行为。因为无法保证隐式转换和std::thread 构造函数的拷贝操作的顺序。
解决方案就是在传递std::thread 构造函数之前,就将字面值转化为std::string 。
void f(int i, std::string const& s);
void oops(int some_param)
{
char buffer[1024]; //1
sprintf(buffer, "%i", some_param);
std::thread t(f, 3, std::string(buffer)); //使用std::string,避免空悬指针
t.detach();
}
相反的情形(期望传递一个非常量引用,但是复制了整个对象)不会出现,因为会出现编译错误。比如,尝试使用线程更新引用传递的数据结构。
void update_data_for_widget(widget_id w,widget_data& data); // 1
void oops_again(widget_id w)
{
widget_data data;
std::thread t(update_data_for_widget,w,data); // 2
display_status();
t.join();
process_widget_data(data);
}
虽然update_data_for_widget 的第二个参数期待传入一个引用,但std::thread 构造函数并不知情。构造函数无视函数参数类型,盲目地拷贝已提供的变量。不过,内部代码会将拷贝的参数以右值的形式进行传递,因为函数期望的是一个非常量引用作为参数,所以会在编译时出错。
解决上述问题的方法是使用std::ref 将参数转换成引用的形式。因此可将线程的调用改为以下形式。
std::thread t(update_data_for_widget, w, std::ref(data))
可以传递一个成员函数指针作为线程函数,并提供一个合适的对象指针作为第一个参数。
class X
{
public:
void do_lengthy_work();
};
X my_x;
std::thread t(&X::do_lengthy_work, &my_x); // 1
上述代码中,新线程将会调用my_x.do_lengthy_work() ,其中my_x 的地址作为对象指针提供给函数,也可以为成员函数提供参数,std::thread 构造函数的第三个参数就是成员函数的第一个参数,以此类推。
class X
{
public:
void do_lengthy_work(int);
};
X my_x;
int num(0);
std::thread t(&X::do_lengthy_work, &my_x, num);
对于另一种有趣的情形是,提供的参数仅支持移动操作,不能拷贝。
当元对象是临时变量时,则自动进行移动操作,但当原对象是一个命名变量,转移的时候就需要使用std::move() 进行显式移动。
下面展示了std::move 的用法。展示了std::move 是如何转移动态对象的所有权到线程中去的。
void process_big_object(std::unique_ptr<big_object>);
std::unique_ptr<big_object> p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object,std::move(p));
通过在std::thread 构造函数中执行std::move(p) ,big_object 对象的所有权首先被转移到新创建线程的内部存储中,之后再传递给process_big_object 函数。
2.3 转移所有权
假设通过新线程返回的所有权调用一个需要后台启动线程的函数,并需要在函数中转移线程的所有权。这些操作都要等待线程结束才能进行,并且需要线程的所有权能够进行转移。
C++标准库中有很多资源占有类型,比如std::ifstream ,std::unique_ptr 还有std::thread 都是可移动,但是不可复制。说明执行线程的所有权可以在std::thread 实例中移动。下面将展示一个例子,创建两个执行线程,并在std::thread 实例之间转移所有权。
void some_function();
void some_other_function();
std::thread t1(some_function); // 1
std::thread t2=std::move(t1); // 2
t1=std::thread(some_other_function); // 3
std::thread t3; // 4
t3=std::move(t2); // 5
t1=std::move(t3); // 6 赋值操作将使程序崩溃
首先,新线程与t1 相关联①。当显式使用std::move() 创建t2 后②,t1 的所有权就转移给了t2 。之后,t1 和执行线程已经没有关联了,执行some_function 的函数线程与t2 关联。
然后,临时std::thread 对象相关的线程启动了③。为什么不显式调用std::move() 转移所有权呢?因为,所有者是一个临时对象——移动操作将会隐式的调用。
t3 使用默认构造方式创建④,没有与任何线程进行关联。调用std::move() 将t2 关联线程的所有权转移到t3 中⑤。因为t2 是一个命名对象,需要显式的调用std::move() 。移动操作⑤完成后,t1 与执行some_other_function 的线程相关联,t2 与任何线程都无关联,t3 与执行some_function 的线程相关联。
最后一个移动操作,将some_function 线程的所有权转移⑥给t1 。不过,t1 已经有了一个关联的线程(执行some_other_function 的线程),所以这里系统直接调用std::terminate() 终止程序继续运行。
需要在线程对象析构前,显式的等待线程完成,或者分离它,进行赋值也需要满足这些条件,不能通过赋新值给std::thread对象的方式来丢弃一个线程。
std::thread 支持移动,线程的所有权可以在函数外进行转移,如下面程序一样。
std::thread f()
{
void some_function();
return std::thread(some_function);
}
std::thread g()
{
void some_other_function(int);
std::thread t(some_other_function,42);
return t;
}
当所有权可以在函数内部传递,就允许std::thread 实例作为参数进行传递。
void f(std::thread t);
void g()
{
void some_function();
f(std::thread(some_function));
std::thread t(some_function);
f(std::move(t));
}
std::thread 支持移动可以创建thread_guard 类的实例(定义见代码2.3),并且拥有线程所有权。当引用thread_guard 对象所持有的线程时,移动操作就可以避免很多不必要的麻烦。当某个对象转移了线程的所有权,就不能对线程进行汇入或分离。为了确保线程在程序退出前完成,定义了scoped_thread 类。
//代码2.6 scoped_thread
class scoped_thread
{
std::thread t;
public:
explicit scoped_thread(std::thread t_): // 1
t(std::move(t_))
{
if(!t.joinable()) // 2
throw std::logic_error(“No thread”);
}
~scoped_thread()
{
t.join(); // 3
}
scoped_thread(scoped_thread const&)=delete;
scoped_thread& operator=(scoped_thread const&)=delete;
};
struct func; // 定义在代码2.1中
void f()
{
int some_local_state;
scoped_thread t(std::thread(func(some_local_state))); // 4
do_something_in_current_thread();
} // 5
与代码2.3相似,不过新线程会直接传递到scoped_thread 中④,而非创建一个独立变量。当主线程到达f()末尾时⑤,scoped_thread 对象就会销毁,然后在析构函数中完成汇入③。代码2.3中的thread_guard 类,需要在析构中检查线程是否“可汇入”。这里把检查放在了构造函数中②,并且当线程不可汇入时抛出异常。
std::thread 中对移动语义的支持,也适用于使用std::thread 的移动敏感容器。
//代码2.8 量产线程,等待它们结束
void do_work(unsigned id);
void f()
{
std::vector<std::thread> threads;
for (unsigned i = 0; i < 20; ++i)
{
threads.emplace_back(do_work,i); // 产生线程
}
for (auto& entry : threads) // 对每个线程调用 join()
entry.join();
}
2.4 确定线程数量
std::thread::hardware_concurrency() 可以返回并发线程的数量。在多核系统中,返回值可以是CPU核心的数量。返回值也仅仅是一个标识,当无法获取时,函数返回0。
下面代码实现了并行版的std::accumulate 。代码将整体工作拆分成小任务,交给每个线程去做,并设置最小任务数,避免产生太多的线程,程序会在操作数量为0时抛出异常。
//代码2.9 并行版的std::accumulate
template<typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result=std::accumulate(first,last,result);
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length) // 1
return init;
unsigned long const min_per_thread=25;
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread; // 2
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads= // 3
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size=length/num_threads; // 4
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads-1); // 5
Iterator block_start=first;
for(unsigned long i=0; i < (num_threads-1); ++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size); // 6
threads[i]=std::thread( // 7
accumulate_block<Iterator,T>(),
block_start,block_end,std::ref(results[i]));
block_start=block_end; // 8
}
accumulate_block<Iterator,T>()(
block_start,last,results[num_threads-1]); // 9
for (auto& entry : threads)
entry.join(); // 10
return std::accumulate(results.begin(),results.end(),init); // 11
}
如果输入的范围为空①,就会得到init 的值。如果范围内的元素多于一个时,需要使用范围内元素的总数量除以线程(块)中最小任务数,从而确定启动线程的最大数量②。
因为上下文频繁切换会降低线程的性能,所以计算量的最大值和硬件支持线程数,较小值为启动线程的数量③,std::thread::hardware_concurrency() 返回0时,可以选择一个合适的数字,本例中,选择2。
每个线程中处理的元素数量,是范围中元素的总量除以线程的个数得出的④。
现在,确定了线程个数,创建一个std::vector<T> 容器存放中间结果,并为线程创建一个std::vector<std::thread> 容器⑤。因为在启动之前已经有了一个线程(主线程),所以启动的线程数必须比num_threads 少1。
使用循环来启动线程:block_end 迭代器指向当前块的末尾⑥,并启动一个新线程为当前块累加结果⑦。当迭代器指向当前块的末尾时,启动下一个块⑧。
启动所有线程后,⑨中的线程会处理最终块的结果,因为知道最终块是哪一个,所以最终块中有多少个元素就无所谓了。
累加最终块的结果后,可等待std::for_each ⑩创建线程,之后使用std::accumulate 将所有结果进行累加。
2.5 线程标识
线程标识为std::thread::id 类型,可以通过两种方式进行检索。第一种,可以通过调用std::thread 对象的成员函数get_id() 来直接获取。如果std::thread 对象没有与任何执行线程相关联,get_id() 将返回std::thread::type 默认构造值,这个值表示”无线程“。第二种,当前线程调用std::this_thread::get_id() 也可以获得线程标识。
std::thread::id 对象可以自由的拷贝和对比,因为标识符可以复用。如果两个对象的std::thread::id 相等,那就是同一个线程,或者都无线程。如果不相等,那么就代表了两个不同的线程,或者一个有线程,另一个没有线程。
C++线程库不会限制你去检查线程标识符是否一致。
std::thread::id 实例常用作检测线程是否需要进行一些操作。比如,当用线程来分割一项工作时,主要线程可能要做一些与其他线程不同的工作,启动其他线程前,可以通过std::this_thread::get_id() 得到自己的线程ID,每个线程都要检查一下,其拥有的线程ID是否与初始线程的ID相同。
std::thread::id master_thread;
void some_core_part_of_algorithm()
{
if(std::this_thread::get_id()==master_thread)
{
do_master_thread_work();
}
do_common_work();
}
std::thread::id 可以作为线程的通用标识符,当标识符只和语义相关时,就需要这个方案,也可以使用输出流来记录一个std::thread::id 对象的值。
std::cout << std::this_thread::get_id();
具体的输出结果严格依赖于具体实现,C++标准要求保证ID相同的线程必须有相同的输出。
做一些与其他线程不同的工作,启动其他线程前,可以通过std::this_thread::get_id() 得到自己的线程ID,每个线程都要检查一下,其拥有的线程ID是否与初始线程的ID相同。
std::thread::id master_thread;
void some_core_part_of_algorithm()
{
if(std::this_thread::get_id()==master_thread)
{
do_master_thread_work();
}
do_common_work();
}
std::thread::id 可以作为线程的通用标识符,当标识符只和语义相关时,就需要这个方案,也可以使用输出流来记录一个std::thread::id 对象的值。
std::cout << std::this_thread::get_id();
具体的输出结果严格依赖于具体实现,C++标准要求保证ID相同的线程必须有相同的输出。
|