0. 简介
在设计复杂的运行程序时,我们经常需要创建一定数量的线程,然而很多时候线程不都是一直执行的,会存在一些线程处于空闲状态。所以通过线程池的方式,可以有效的对线程进行分配。若线程池中有空闲线程,则从线程池中取出一个空闲的线程处理该任务,任务处理完后,该线程被放到线程池中;若线程池中无空闲线程,则将任务放入任务队列等待线程池中有线程空闲,这样的处理方式可以避免线程在建立与销毁时存在的开销。
1. 基础知识
2. Cartographer中的线程池
关键参数
- 任务队列(task_queue_):每个线程的DoWork()线程空闲时都会通过反复读取该队列来获得任务,各线程通过互斥锁防止同时读取。
- 等待队列(tasks_not_ready_):尚未ready的任务队列,其其依赖的任务(dependent_tasks_)还没准备好,直到dependent_tasks_都完成了,主任务(Task)才能执行将tasks_not_ready转为task_queue_。
- 任务依赖(dependent_tasks_):dependent_tasks_都完成了,主任务(Task)才能执行
核心函数
创建线程池:
ThreadPool::ThreadPool(int num_threads) {
absl::MutexLock locker(&mutex_);
for (int i = 0; i != num_threads; ++i) {
pool_.emplace_back([this]() { ThreadPool::DoWork(); });
}
}
thread_pool::DoWork():
初始化的时候每个线程都执行该死循环函数ThreadPool::DoWork(),并直到析构才返回,只要task_queue有任务,就执行操作。
void ThreadPool::DoWork() {
#ifdef __linux__
CHECK_NE(nice(10), -1);
#endif
const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
return !task_queue_.empty() || !running_;
};
for (;;) {
std::shared_ptr<Task> task;
{
absl::MutexLock locker(&mutex_);
mutex_.Await(absl::Condition(&predicate));
if (!task_queue_.empty()) {
task = std::move(task_queue_.front());
task_queue_.pop_front();
} else if (!running_) {
return;
}
}
CHECK(task);
CHECK_EQ(task->GetState(), common::Task::DEPENDENCIES_COMPLETED);
Execute(task.get());
}
}
在初始化成功并拿到task_queue_后,该Task将会按以下几个状态顺序执行:
enum State { NEW, DISPATCHED, DEPENDENCIES_COMPLETED, RUNNING, COMPLETED };
task->SetWorkItem(task)
新建task实例,状态默认为NEW,然后通过task->SetWorkItem设置任务(示例中运行的函数为DrainWorkQueue)
auto scan_matcher_task = absl::make_unique<common::Task>();
scan_matcher_task->SetWorkItem(
[&submap_scan_matcher, &scan_matcher_options]() {
submap_scan_matcher.fast_correlative_scan_matcher =
absl::make_unique<scan_matching::FastCorrelativeScanMatcher2D>(
*submap_scan_matcher.grid, scan_matcher_options);
});
submap_scan_matcher.creation_task_handle =
thread_pool_->Schedule(std::move(scan_matcher_task));
task2->AddDependency(task1)
该函数是可选的,有依赖的任务才需添加,其含义是task2依赖于task1,只有在task1执行完后,task2才能执行。
auto constraint_task = absl::make_unique<common::Task>();
constraint_task->SetWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
ComputeConstraint(submap_id, submap, node_id, true,
constant_data, transform::Rigid2d::Identity(),
*scan_matcher, constraint);
});
constraint_task->AddDependency(scan_matcher->creation_task_handle);
auto constraint_task_handle =
thread_pool_->Schedule(std::move(constraint_task));
thread_pool->Schedule(task)
将task赋给tasks_not_ready_并将task状态变为DISPATCHED ,判断其依赖的任务是否加载完成,若完成则将状态置为DEPENDENCIES_COMPLETED ,然后task加入task_queue_并从tasks_not_ready_移除等待线程执行任务;若依赖未完成,则等待依赖的task执行完。在cartographer中存在有不同的数据用于动态加载,可能存在有依赖,此时Schedule起到了层级的作用。
std::weak_ptr<Task> ThreadPool::Schedule(std::unique_ptr<Task> task) {
std::shared_ptr<Task> shared_task;
{
std::lock_guard<std::mutex> lock(mutex_);
auto insert_result =
tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task)));
if (!insert_result.second) {
return insert_result.first->second;
}
shared_task = insert_result.first->second;
}
SetThreadPool(shared_task.get());
return shared_task;
}
|