2)线程控制
① …
见Linux----多线程(上)
② 线程池
线程池 :
- 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量
应用场景:
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
一次预先创建 一大批线程,让这些线程处于待机状态,一旦有数据或任务,直接可以交给线程处理 框架:thread_pool.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;
template<class T>
class ThreadPool{
private:
int num;
queue<T> q;
pthread_mutex_t lock;
pthread_cond_t cond;
ThreadPool()
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
}
public:
ThreadPool(int _num):num(_num)
{}
void *Routine(void *args)
{
pthread_detach(pthread_self());
while(true){
pthread_mutex_lock(&lock);
pthread_mutex_unlock(&lock);
}
}
void InitThreadPool()
{
for(auto i=0;i<num;i++){
pthread_t tid;
pthread_create(&tid, nullptr, Routine,nullptr);
}
}
void PushTask(const T &in)
{
}
~ThreadPool()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
};
main函数:tp.cc
#include "thread_pool.hpp"
int main()
{
ThreadPool<int> *tp=new ThreadPool<int>(5);
tp->InitThreadPool();
return 0;
}
Routine的static属性
以上代码编译: 报错 :
- Routine是类中的一个成员方法,包含了隐式参数*this,补全后是这样的
void *Routine(void *args, ThreadPool *this) ,不能匹配库中对Routine这个回调函数的参数要求void *(*start_routine) (void *)
解决方案 :
- 用static关键字修饰,static成员函数没有this指针,且只能访问static修饰的其他成员/方法,或者通过对象调用的方式进行内部方法属性的访问
所以我们可以在pthread_create的时候将this指针作为args传入Routine,同时在TheadPool内封装加锁解锁等涉及到访问类成员的函数LockQueue(), UnlockQueue()等,在Routine内将args强转为ThreadPool*类型,进行加锁解锁等已封装好的方法调用
Routine回调函数修改代码如下:
static void *Routinue(void *args)
{
pthread_detach(pthread_self());
ThreadPool *tp = (ThreadPool*)args;
while(true){
tp->LockQueue();
while(tp->IsEmpty()){
tp->ThreadWait();
}
T t;
tp->PopTask(&t);
tp->UnlockQueue();
t();
}
}
实现一个线程池,多线程处理任务(接收两数和一个操作符打印结果)
完整代码如下: 任务处理类:task_tp.hpp
#pragma once
#include <iostream>
int x;
Task()
{}
Task(int _x, int _y, char _op):x(_x),y(_y),op(_op)
{}
void operator()()
{
run();
}
void run()
{
int z = -1;
switch(op){
case '+':
z = x + y;
break;
case '-':
z = x - y;
break;
case '*':
z = x * y;
break;
case '/':
if(0 != y) z = x / y;
else std::cout << "Warning: div zero!" <<std::endl;
break;
case '%':
if(0 != y) z = x % y;
else std::cout << "Warning: div zero!" <<std::endl;
break;
default:
std::cout << "unknow operator!" << std::endl;
break;
}
std::cout <<"thread "<< "[" << pthread_self() << "] handler task done : "<< x << op << y << "=" << z << std::endl;
}
~Task(){}
};
线程池:thread_pool.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "task_tp.hpp"
using namespace std;
template<class T>
class ThreadPool{
private:
int num;
queue<T> q;
pthread_mutex_t lock;
pthread_cond_t cond;
public:
ThreadPool(int _num):num(_num)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
}
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
bool IsEmpty()
{
return q.size() == 0;
}
void ThreadWait()
{
pthread_cond_wait(&cond, &lock);
}
void ThreadWakeup()
{
pthread_cond_signal(&cond);
}
void PopTask(T *out)
{
*out = q.front();
q.pop();
}
void PushTask(const T &in)
{
LockQueue();
q.push(in);
ThreadWakeup();
UnlockQueue();
}
static void *Routinue(void *args)
{
pthread_detach(pthread_self());
ThreadPool *tp = (ThreadPool*)args;
while(true){
tp->LockQueue();
while(tp->IsEmpty()){
tp->ThreadWait();
}
T t;
tp->PopTask(&t);
tp->UnlockQueue();
t();
}
}
void InitThreadPool()
{
for(auto i=0;i<num;i++){
pthread_t tid;
pthread_create(&tid, nullptr, Routine, this);
}
}
~ThreadPool()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
};
主函数:tp.cc
#include "thread_pool.hpp"
#include "task_tp.hpp"
#include <time.h>
#include <unistd.h>
#define NUM 5
int main()
{
srand((unsigned)time(nullptr));
ThreadPool<Task> *tp = new ThreadPool<Task>(NUM);
tp->InitThreadPool();
sleep(3);
const std::string ops = "+-*/%";
while(true){
int x = rand() % 50 + 1;
int y = rand() % 50 + 1;
char op = ops[rand()%5];
Task t(x, y, op);
tp->PushTask(t);
sleep(1);
}
return 0;
}
③单例模式(线程安全)
复习:c++中单例模式
更改上面实现的线程池为懒汉模式
回顾 :懒汉方式最核心的思想是 “延时加载”. 从而能够优化服务器的启动速度
线程池:thread_pool.hpp 注意:
- num直接改为在InitThreadPool(int num)中作为参数传入
- 私有构造函数,delete拷贝构造,delete赋值运算符重载
- public一个静态get_instance函数用于创建单例对象(返回instance)
- 加锁防止创建多个单例对象
- 双判断效率更高,第一次调用才会创建初始化单例对象
static pthread_mutex_t mtx= PTHREAD_MUTEX_INITIALIZER; 不需要destroy销毁mtx- volatile防止过度优化
template<class T>
class ThreadPool{
private:
queue<T> q;
pthread_mutex_t lock;
pthread_cond_t cond;
ThreadPool()
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
}
ThreadPool(const ThreadPool<T>&)=delete;
ThreadPool<T>& operator=(const ThreadPool<T>&) = delete;
volatile static ThreadPool<T> *instance;
public:
static ThreadPool<T> *get_instance()
{
static pthread_mutex_t mtx= PTHREAD_MUTEX_INITIALIZER;
if(nullptr == instance){
pthread_mutex_lock(&mtx);
if(nullptr==instance){
instance=new ThreadPool<T>();
}
pthread_mutex_unlock(&mtx);
}
return instance;
}
};
template<class T>
ThreadPool<T>* ThreadPool<T>::instance=nullptr;
主函数:tp.cc
int count=3;
while(count--){
cout<<"懒汉模式,启动后"<<count<<"秒"<<"后 再进行加载..."<<endl;
sleep(1);
}
ThreadPool<Task> *tp = ThreadPool<Task>::get_instance();
tp->InitThreadPool(5);
④STL 智能指针与线程安全
STL 线程不安全 :
- STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全
智能指针:
- 对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
- 对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数
⑤其他常见的各种锁 (了解MARK)
悲观锁 :在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起乐观锁 :每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试
自旋锁 ,公平锁,非公平锁… 自旋锁 :非自旋锁和自旋锁最大的区别,就是如果它遇到拿不到锁的情况,它会把线程阻塞,直到被唤醒。而自旋锁会不停地尝试,自旋锁的好处,那就是自旋锁用循环去不停地尝试获取锁,让线程始终处于 Runnable 状态,节省了线程状态切换带来的开销 是否使用自旋锁一般是由拥有锁的线程在临界区执行时间长短决定的,短:自旋 对于调用者来看依然是“卡住”的情况, 底层类似于
while(1){
if(
break;
}
pthread库中提供的自旋锁接口有: int pthread_spin_init(pthread_spinlock_t *lock int pshared); int pthread_spin_destroy(pthread_spinlock_t *lock); int pthread_spin_lock(pthread_spinlock_t *lock); int pthread_spin_unlock(pthread_spinlock_t *lock);
⑥读者写者(MARK以后看)
读写锁行为:
当前锁状态 | 读锁请求 | 写锁请求 |
---|
无锁 | 可以 | 可以 | 读锁 | 可以 | 阻塞 | 写锁 | 阻塞 | 阻塞 |
读者优先 :读者和写者一起到来的时候,让读者优先进入 写者优先 :当写者到来的时候,后续读者就暂时不能进入临界资源进行读取了,所有的正在读取的线程执行完毕,写者再进入!
|