一、Linux线程概念
1.1 什么是线程
- 在一个程序里的一个执行路线就叫做线程(thread)。更准确的定义是:线程是“一个进程内部的控制序列”
- 一切进程至少都有一个执行线程
- 线程在进程内部运行,本质是在进程地址空间内运行
- 在Linux系统中,在CPU眼中,看到的PCB都要比传统的进程更加轻量化
- 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流
1.2 线程的优点
- 创建一个新线程的代价要比创建一个新进程小得多
- 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
- 线程占用的资源要比进程少很多
- 能充分利用多处理器的可并行数量
- 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
- 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
加密,大数据运算等。线程不是越多越好,如果线程太多,对导致线程间被过度调度切换(有成本的) - I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。
网络下载,云盘,ssh,在线直播等。线程也不是越多越好,不过IO允许多一些线程。因为IO比较慢。
1.3 线程的缺点
性能损失
- 一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。
- 健壮性降低
编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。 - 缺乏访问控制
进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。 - 编程难度提高
编写与调试一个多线程程序比单线程程序困难得多
1.4 线程异常
- 单个线程如果出现除零,野指针问题导致线程崩溃,进程也会随着崩溃
- 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该进程内的所有线程也就随即退出
1.5 线程用途
- 合理的使用多线程,能提高CPU密集型程序的执行效率
- 合理的使用多线程,能提高IO密集型程序的用户体验(如生活中我们一边写代码一边下载开发工具,就是多线程运行的一种表现)
1.6 Linux进程VS线程
进程是系统资源分配的基本单位 线程CPU是调度的基本单位,承担进程资源一部分的基本实体 线程共享进程数据,但也拥有自己的一部分数据:
- 线程ID, 一组寄存器,栈,上下文,errno,信号屏蔽字,调度优先级,ipc资源。
进程的多个线程共享 同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:
- 文件描述符表
- 每种信号的处理方式(SIG_ IGN、SIG_ DFL或者自定义的信号处理函数)
- 当前工作目录
- 用户id和组id
进程和线程的关系如下图:
二、Linux线程控制
2.1 POSIX线程库
- 与线程有关的函数构成了一个完整的系列,绝大多数函数的名字都是以“pthread_”打头的
- 要使用这些函数库,要通过引入头文<pthread.h>
- 链接这些线程函数库时要使用编译器命令的“-lpthread”选项
2.2 创建线程
功能:创建一个新的线程
原型
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine)(void*), void *arg);
参数
thread:返回线程ID
attr:设置线程的属性,attr为NULL表示使用默认属性
start_routine:是个函数地址,线程启动后要执行的函数
arg:传给线程启动函数的参数
返回值:成功返回0;失败返回错误码
错误检查:
- 传统的一些函数是,成功返回0,失败返回-1,并且对全局变量errno赋值以指示错误。
- pthreads函数出错时不会设置全局变量errno(而大部分其他POSIX函数会这样做)。而是将错误代码通过返回值返回
- pthreads同样也提供了线程内的errno变量,以支持其它使用errno的代码。对于pthreads函数的错误,建议通过返回值业判定,因为读取返回值要比读取线程内的errno变量的开销更小
一次创建多个线程:
2.3 线程终止
如果需要只终止某个线程而不终止整个进程,可以有三种方法
- 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit(进程退出)。
- 线程可以调用pthread_ exit终止自己。
- 一个线程可以调用pthread_ cancel终止同一进程中的另一个线程。
线程函数return pthread_exit函数
功能:线程终止
原型
void pthread_exit(void *value_ptr);
参数
value_ptr:value_ptr不要指向一个局部变量。
返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)
exit是终止进程,不要再其它线程中调用。 需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。
pthread_cancel函数
功能:取消一个执行中的线程
原型
int pthread_cancel(pthread_t thread);
参数
thread:线程ID
返回值:成功返回0;失败返回错误码
不要用其它线程去取消主线程
2.4 进程等待
为什么需要线程等待?
- 已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
- 创建新的线程不会复用刚才退出线程的地址空间。
功能:等待线程结束
原型
int pthread_join(pthread_t thread, void **value_ptr);
参数
thread:线程ID
value_ptr:它指向一个指针,后者指向线程的返回值
返回值:成功返回0;失败返回错误码
调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:
- 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。
- 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_ CANCELED。
- 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。
- 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。
2.5 分离线程
- 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
- 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。
可以是线程组内其他线程对目标线程进行分离,也可以是线程自己分离:
2.6 线程ID及进程地址空间布局
- pthread_ create函数会产生一个线程ID,存放在第一个参数指向的地址中。该线程ID和前面说的线程ID不是一回事。
- 前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程。
- pthread_ create函数第一个参数指向一个虚拟内存单元,该内存单元的地址即为新创建线程的线程ID,属于NPTL线程库的范畴。线程库的后续操作,就是根据该线程ID来操作线程的。
- 线程库NPTL提供了pthread_ self函数,可以获得线程自身的ID
pthread_t 到底是什么类型呢?取决于实现。对于Linux目前实现的NPTL实现而言,pthread_t类型的线程ID,本质就是一个进程地址空间上的一个地址。
三、 Linux线程互斥
3.1 进程线程间的互斥相关背景概念
- 临界资源:多线程执行流共享的资源就叫做临界资源
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
- 原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
3.2 互斥量mutex
- 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
- 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
- 多个线程并发的操作共享变量,会带来一些问题
下面代码是一个简单的抢票逻辑:
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
int tickets = 100;
void* thread_run(void* args)
{
int id = *(int*)args;
delete (int*)args;
while(true)
{
if(tickets > 0)
{
usleep(1000);
std::cout << "我是[" << id << "], 我要抢的票是:" << tickets << std::endl;
tickets--;
}
else
{
break;
}
}
}
int main()
{
pthread_t tid[5];
for(int i = 0; i < 5; ++i)
{
int* id = new int(i);
pthread_create(tid+i, nullptr, thread_run, id);
}
for(int i = 0; i < 5; ++i)
{
pthread_join(tid[i], nullptr);
}
return 0;
}
为什么可能无法获得争取结果?
- if 语句判断条件为真以后,代码可以并发的切换到其他线程
- usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
- –ticket 操作本身就不是一个原子操作
取出ticket–部分的汇编代码
objdump -d a.out > test.objdump
152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax # 600b34 <ticket>
153 400651: 83 e8 01 sub $0x1,%eax
154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) # 600b34 <ticket>
– 操作并不是原子操作,而是对应三条汇编指令
load :将共享变量ticket从内存加载到寄存器中
update : 更新寄存器里面的值,执行-1操作
store :将新值,从寄存器写回共享变量ticket的内存地址
要解决以上问题,需要做到三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量 互斥量的接口 初始化互斥量 初始化互斥量有两种方法:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
- 方法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrictattr);
参数:
mutex:要初始化的互斥量
attr:NULL
销毁互斥量 销毁互斥量需要注意:
- 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
- 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
调用 pthread_ lock 时,可能会遇到以下情况
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
改进上面的售票系统:
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
class Tickets
{
public:
Tickets()
{
pthread_mutex_init(&_mtx, nullptr);
}
~Tickets()
{
pthread_mutex_destroy(&_mtx);
}
bool GetTickets()
{
bool ret = true;
pthread_mutex_lock(&_mtx);
if(_tickets > 0)
{
usleep(1000);
std::cout << "我是[" << pthread_self() << "], 我要抢的票是:" << _tickets << std::endl;
_tickets--;
}
else
{
std::cout << "票被抢完了" << std::endl;
ret = false;
}
pthread_mutex_unlock(&_mtx);
return ret;
}
private:
int _tickets = 100;
pthread_mutex_t _mtx;
};
void* thread_run(void* args)
{
Tickets* t = (Tickets*)args;
while(true)
{
if(!t->GetTickets())
{
break;
}
}
}
int main()
{
pthread_t tid[5];
Tickets* t = new Tickets;
for(int i = 0; i < 5; ++i)
{
pthread_create(tid+i, nullptr, thread_run, t);
}
for(int i = 0; i < 5; ++i)
{
pthread_join(tid[i], nullptr);
}
delete t;
return 0;
}
也可语言封装的mutex 互斥量实现原理探究
- 经过上面的例子,大家已经意识到单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题
- 为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪代码改一下
3.3 可重入VS线程安全
概念
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
常见的线程不安全的情况
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
常见的线程安全的情况
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
常见不可重入的情况
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
常见可重入的情况
- 不使用全局变量或静态变量
- 不使用用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全联系
- 函数是可重入的,那就是线程安全的
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的
可重入与线程安全区别
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
四、常见锁概念
4.1 死锁
- 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
死锁四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
避免死锁算法
五、Linux线程同步
条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
5.1 同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
条件变量函数 初始化
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrictattr);
参数:
cond:要初始化的条件变量
attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
简单案例
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
pthread_mutex_t mtx;
pthread_cond_t cond;
void* ctrl(void* args)
{
std::string name = (char*)args;
while(true)
{
pthread_cond_signal(&cond);
std::cout << "master say: begin work" << std::endl;
sleep(1);
}
}
void* work(void* args)
{
int number = *(int*)args;
delete (int*)args;
while(true)
{
pthread_cond_wait(&cond, &mtx);
std::cout << "work: " << number << " is working..." << std::endl;;
}
}
int main()
{
#define NUM 3
pthread_mutex_init(&mtx, nullptr);
pthread_cond_init(&cond, nullptr);
pthread_t master;
pthread_t worker[NUM];
pthread_create(&master, nullptr, ctrl, (void*)"master");
for(int i = 0; i < NUM; ++i)
{
int* number = new int(i);
pthread_create(worker+i, nullptr, work, number);
}
pthread_join(master, nullptr);
for(int i = 0; i < NUM; ++i)
{
pthread_join(worker[i], nullptr);
}
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
为什么 pthread_cond_wait 需要互斥量?
- 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
- 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据
- 按照上面的说法,我们设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上不就行了,如下代码
六、 生产者消费者模型
6.1 为何要使用生产者消费者模型
产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。 三种关系 生产者和生产者:竞争 互斥 消费者和消费者:竞争 互斥 消费者和竞争者:同步 互斥 两种角色 生产者和消费者(执行流) 一个交易场所 仓库(内存空间,stl容器)
6.2 生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
6.3 基于BlockingQueue的生产者消费者模型
BlockingQueue 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞) C++ queue模拟阻塞队列的生产消费模型
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
namespace ts
{
const int capacity = 5;
template <class T>
class BlockQueue
{
private:
std::queue<T> _bq;
int _cap;
pthread_mutex_t _mtx;
pthread_cond_t _is_full;
pthread_cond_t _is_empty;
private:
bool IsFull()
{
return _bq.size() == _cap;
}
void LcokQueue()
{
pthread_mutex_lock(&_mtx);
}
void UnLcokQueue()
{
pthread_mutex_unlock(&_mtx);
}
void ProducterWait()
{
pthread_cond_wait(&_is_empty, &_mtx);
}
bool IsEmpty()
{
return _bq.size() == 0;
}
void ConsumerWait()
{
pthread_cond_wait(&_is_full, &_mtx);
}
void WakeUpConsumer()
{
pthread_cond_signal(&_is_full);
}
void WakeUpProducter()
{
pthread_cond_signal(&_is_empty);
}
public:
BlockQueue()
:_cap(capacity)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_is_full, nullptr);
pthread_cond_init(&_is_empty, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_is_full);
pthread_cond_destroy(&_is_empty);
}
void push(const T& in)
{
LcokQueue();
while(IsFull())
{
ProducterWait();
}
_bq.push(in);
WakeUpConsumer();
UnLcokQueue();
}
void pop(T* out)
{
LcokQueue();
while(IsEmpty())
{
ConsumerWait();
}
*out = _bq.front();
_bq.pop();
WakeUpProducter();
UnLcokQueue();
}
};
}
#include "BlockQueue.hpp"
using namespace ts;
void* consumer(void *args)
{
BlockQueue<int>* bq = (BlockQueue<int>*)args;
while(true)
{
sleep(2);
int data = 0;
bq->pop(&data);
std::cout << "消费者消费数据:" << data << std::endl;
}
}
void* producter(void* args)
{
BlockQueue<int>* bq = (BlockQueue<int>*)args;
while(true)
{
int data = rand()%20 + 1;
std::cout << "生产者生产数据: " << data << std::endl;
bq->push(data);
}
}
int main()
{
srand((long long)time(nullptr));
BlockQueue<int> *bq = new BlockQueue<int>;
pthread_t c;
pthread_t p;
pthread_create(&c, nullptr, consumer, bq);
pthread_create(&p, nullptr, producter, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
单生产者多消费者消费模型
#pragma once
#include <iostream>
#include <pthread.h>
namespace ns_task
{
class Task
{
public:
Task()
{}
Task(int x, int y, char op)
: _x(x), _y(y), _op(op)
{
}
~Task()
{
}
int operator ()()
{
return Run();
}
public:
int Run()
{
int ret = 0;
switch(_op)
{
case '+':
ret = _x + _y;
break;
case '-':
ret = _x - _y;
break;
case '*':
ret = _x * _y;
break;
case '/':
ret = _x / _y;
break;
case '%':
ret = _x % _y;
break;
default:
std::cout << "bug??" << std::endl;
break;
}
std::cout << "当线程是:" << pthread_self() << "正在执行" \
<< _x << _op << _y << "= " << ret << std::endl;
return ret;
}
private:
int _x;
int _y;
char _op;
};
}
#include "BlockQueue.hpp"
#include "Task.hpp"
using namespace ts;
using namespace ns_task;
void *consumer(void *args)
{
BlockQueue<Task> *bq = (BlockQueue<Task> *)args;
while (true)
{
Task t;
bq->pop(&t);
t();
}
}
void *producter(void *args)
{
BlockQueue<Task> *bq = (BlockQueue<Task> *)args;
std::string ops = "+-*/%";
while (true)
{
int x = rand() % 20 + 1;
int y = rand() % 10 + 1;
char op = ops[rand() % 5];
std::cout << "当前线程是" << pthread_self() << "生产的任务是" << x << " " << y << " " << op << std::endl;
Task t(x, y, op);
bq->push(t);
sleep(1);
}
}
int main()
{
srand((long long)time(nullptr));
BlockQueue<Task> *bq = new BlockQueue<Task>;
pthread_t c1;
pthread_t c2;
pthread_t c3;
pthread_t c4;
pthread_t p;
pthread_create(&c1, nullptr, consumer, bq);
pthread_create(&c2, nullptr, consumer, bq);
pthread_create(&c3, nullptr, consumer, bq);
pthread_create(&c4, nullptr, consumer, bq);
pthread_create(&p, nullptr, producter, bq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(c4, nullptr);
pthread_join(p, nullptr);
delete bq;
return 0;
}
6.4 POSIX信号量
什么是信号量:本质是一把计数器,用来描述临界资源数目的大小。
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem);
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);
上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量)
6.5 基于环形队列的生产消费模型
- 环形队列采用数组模拟,用模运算来模拟环状特性
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
- 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
多生产者多消费者生产消费模型
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
namespace ns_ring_queue
{
int g_cap_default = 10;
template <class T>
class RingQueue
{
public:
RingQueue(int cap = g_cap_default)
: _rq(cap)
{
_cap = cap;
sem_init(&_blank_sem,0, cap);
sem_init(&_data_sem, 0, 0);
_c_step = _p_step = 0;
pthread_mutex_init(&_c_mtx, nullptr);
pthread_mutex_init(&_p_mtx, nullptr);
}
~RingQueue()
{
sem_destroy(&_blank_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_c_mtx);
pthread_mutex_destroy(&_p_mtx);
}
public:
void push(const T &in)
{
sem_wait(&_blank_sem);
pthread_mutex_lock(&_p_mtx);
_rq[_p_step] = in;
_p_step++;
_p_step %= _cap;
pthread_mutex_unlock(&_p_mtx);
sem_post(&_data_sem);
}
void pop(T *out)
{
sem_wait(&_data_sem);
pthread_mutex_lock(&_c_mtx);
*out = _rq[_c_step];
_c_step++;
_c_step %= _cap;
pthread_mutex_unlock(&_c_mtx);
sem_post(&_blank_sem);
}
private:
std::vector<T> _rq;
int _cap;
sem_t _blank_sem;
sem_t _data_sem;
int _c_step;
int _p_step;
pthread_mutex_t _c_mtx;
pthread_mutex_t _p_mtx;
};
}
#include "ring_queue.hpp"
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
using namespace ns_ring_queue;
void *consumer(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
while (true)
{
int data = 0;
rq->pop(&data);
std::cout << "消费者消费数据:" << data << pthread_self() << std::endl;
sleep(1);
}
}
void *productor(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
while (true)
{
int data = rand() % 20 + 1;
std::cout << "生产数据时:" << data << pthread_self() << std::endl;
rq->push(data);
}
}
int main()
{
srand((long long)time(nullptr));
pthread_t c1;
pthread_t c2;
pthread_t c3;
pthread_t c4;
pthread_t p1;
pthread_t p2;
pthread_t p3;
pthread_t p4;
RingQueue<int> *rq = new RingQueue<int>;
pthread_create(&c1, nullptr, consumer, rq);
pthread_create(&c2, nullptr, consumer, rq);
pthread_create(&c3, nullptr, consumer, rq);
pthread_create(&c4, nullptr, consumer, rq);
pthread_create(&p1, nullptr, productor, rq);
pthread_create(&p2, nullptr, productor, rq);
pthread_create(&p3, nullptr, productor, rq);
pthread_create(&p4, nullptr, productor, rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(c4, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
pthread_join(p4, nullptr);
return 0;
}
生产者和消费者之间不仅仅是传输数据,而是派发任务和处理任务
#include "ring_queue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <string>
using namespace ns_task;
using namespace ns_ring_queue;
void *consumer(void *args)
{
RingQueue<Task> *rq = (RingQueue<Task> *)args;
while (true)
{
Task t;
rq->pop(&t);
t();
}
}
void *productor(void *args)
{
RingQueue<Task> *rq = (RingQueue<Task> *)args;
std::string ops = "+-*/%";
while (true)
{
int x = rand()%20 + 1;
int y = rand()%10 + 1;
char op = ops[rand()%5];
std::cout << "生产者派发任务" << x << " " << y << " " << op << " " << \
"当前线程 " << pthread_self() << std::endl;
Task t(x, y, op);
rq->push(t);
sleep(1);
}
}
int main()
{
srand((long long)time(nullptr));
pthread_t c1;
pthread_t c2;
pthread_t c3;
pthread_t c4;
pthread_t p1;
pthread_t p2;
pthread_t p3;
pthread_t p4;
RingQueue<Task> *rq = new RingQueue<Task>;
pthread_create(&c1, nullptr, consumer, rq);
pthread_create(&c2, nullptr, consumer, rq);
pthread_create(&c3, nullptr, consumer, rq);
pthread_create(&c4, nullptr, consumer, rq);
pthread_create(&p1, nullptr, productor, rq);
pthread_create(&p2, nullptr, productor, rq);
pthread_create(&p3, nullptr, productor, rq);
pthread_create(&p4, nullptr, productor, rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(c4, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
pthread_join(p4, nullptr);
return 0;
}
#pragma once
#include <iostream>
#include <pthread.h>
namespace ns_task
{
class Task
{
public:
Task()
{}
Task(int x, int y, char op)
: _x(x), _y(y), _op(op)
{
}
~Task()
{
}
int operator ()()
{
return Run();
}
public:
int Run()
{
int ret = 0;
switch(_op)
{
case '+':
ret = _x + _y;
break;
case '-':
ret = _x - _y;
break;
case '*':
ret = _x * _y;
break;
case '/':
ret = _x / _y;
break;
case '%':
ret = _x % _y;
break;
default:
std::cout << "bug??" << std::endl;
break;
}
std::cout << "当线程是:" << pthread_self() << "正在执行" \
<< _x << _op << _y << "= " << ret << std::endl;
return ret;
}
private:
int _x;
int _y;
char _op;
};
}
七、线程池
线程池:
- 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利
用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 - 线程池的应用场景
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
- 线程池的种类:
线程池示例: 1.创建固定数量线程池,循环从任务队列中获取任务对象, 2.获取到任务对象后,执行任务对象中的任务接口
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
namespace ns_task_queue
{
int g_num_default = 10;
template<class T>
class ThreadPool
{
private:
void Lock()
{
pthread_mutex_lock(&_mtx);
}
void UnLock()
{
pthread_mutex_unlock(&_mtx);
}
bool IsEmpty()
{
return _tq.empty();
}
void Wait()
{
pthread_cond_wait(&_cond, &_mtx);
}
void WakeUp()
{
pthread_cond_signal(&_cond);
}
public:
ThreadPool(int num = g_num_default)
:_num(g_num_default)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
void PushTask(const T& in)
{
Lock();
_tq.push(in);
UnLock();
WakeUp();
}
void PopTask(T* out)
{
*out = _tq.front();
_tq.pop();
}
static void* Rountine(void* args)
{
pthread_detach(pthread_self());
ThreadPool<T>* tq = (ThreadPool<T>*)args;
while(true)
{
T t;
tq->Lock();
while (tq->IsEmpty())
{
tq->Wait();
}
tq->PopTask(&t);
tq->UnLock();
t();
}
}
void InitThreadPool()
{
pthread_t tid[_num];
for(int i = 0; i < _num; ++i)
{
pthread_create(tid+1, nullptr, Rountine, this);
}
}
private:
int _num ;
std::queue<T> _tq;
pthread_mutex_t _mtx;
pthread_cond_t _cond;
};
}
#pragma once
#include <iostream>
#include <pthread.h>
namespace ns_task
{
class Task
{
public:
Task()
{}
Task(int x, int y, char op)
: _x(x), _y(y), _op(op)
{
}
~Task()
{
}
int operator ()()
{
return Run();
}
public:
int Run()
{
int ret = 0;
switch(_op)
{
case '+':
ret = _x + _y;
break;
case '-':
ret = _x - _y;
break;
case '*':
ret = _x * _y;
break;
case '/':
ret = _x / _y;
break;
case '%':
ret = _x % _y;
break;
default:
std::cout << "bug??" << std::endl;
break;
}
std::cout << "当线程是:" << pthread_self() << "正在执行" \
<< _x << _op << _y << "= " << ret << std::endl;
return ret;
}
private:
int _x;
int _y;
char _op;
};
}
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <stdlib.h>
#include <unistd.h>
using namespace ns_task;
using namespace ns_task_queue;
int main()
{
srand((long long)time(nullptr));
ThreadPool<Task>* tq = new ThreadPool<Task>;
tq->InitThreadPool();
while(1)
{
Task t(rand()%20+1, rand()%10+1, "+-*/%"[rand()%5]);
tq->PushTask(t);
sleep(1);
}
return 0;
}
八、线程安全的单例模式
8.1 单例模式概念
什么是单例模式 单例模式是一种 “经典的, 常用的, 常考的” 设计模式.
什么是设计模式 IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式
单例模式的特点 某些类, 只应该具有一个对象(实例), 就称之为单例.
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据
8.2 懒汉方式实现单例模式(线程安全版本)
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
namespace ns_task_queue
{
int g_num_default = 10;
template <class T>
class ThreadPool
{
private:
void Lock()
{
pthread_mutex_lock(&_mtx);
}
void UnLock()
{
pthread_mutex_unlock(&_mtx);
}
bool IsEmpty()
{
return _tq.empty();
}
void Wait()
{
pthread_cond_wait(&_cond, &_mtx);
}
void WakeUp()
{
pthread_cond_signal(&_cond);
}
ThreadPool(int num = g_num_default)
: _num(g_num_default)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool<T> &tp) = delete;
ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete;
public:
static ThreadPool<T> *GetInstance()
{
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
if (ins == nullptr)
{
pthread_mutex_lock(&lock);
if (ins == nullptr)
{
ins = new ThreadPool<T>;
ins->InitThreadPool();
}
pthread_mutex_unlock(&lock);
}
return ins;
}
~ThreadPool()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
void PushTask(const T &in)
{
Lock();
_tq.push(in);
UnLock();
WakeUp();
}
void PopTask(T *out)
{
*out = _tq.front();
_tq.pop();
}
static void *Rountine(void *args)
{
pthread_detach(pthread_self());
ThreadPool<T> *tq = (ThreadPool<T> *)args;
while (true)
{
T t;
tq->Lock();
while (tq->IsEmpty())
{
tq->Wait();
}
tq->PopTask(&t);
tq->UnLock();
t();
}
}
void InitThreadPool()
{
pthread_t tid[_num];
for (int i = 0; i < _num; ++i)
{
pthread_create(tid + 1, nullptr, Rountine, this);
}
}
private:
int _num;
std::queue<T> _tq;
pthread_mutex_t _mtx;
pthread_cond_t _cond;
static ThreadPool<T> *ins;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::ins = nullptr;
}
#pragma once
#include <iostream>
#include <pthread.h>
namespace ns_task
{
class Task
{
public:
Task()
{}
Task(int x, int y, char op)
: _x(x), _y(y), _op(op)
{
}
~Task()
{
}
int operator ()()
{
return Run();
}
public:
int Run()
{
int ret = 0;
switch(_op)
{
case '+':
ret = _x + _y;
break;
case '-':
ret = _x - _y;
break;
case '*':
ret = _x * _y;
break;
case '/':
ret = _x / _y;
break;
case '%':
ret = _x % _y;
break;
default:
std::cout << "bug??" << std::endl;
break;
}
std::cout << "当线程是:" << pthread_self() << "正在执行" \
<< _x << _op << _y << "= " << ret << std::endl;
return ret;
}
private:
int _x;
int _y;
char _op;
};
}
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <stdlib.h>
#include <unistd.h>
using namespace ns_task;
using namespace ns_task_queue;
int main()
{
srand((long long)time(nullptr));
ThreadPool<Task>* tp = ThreadPool<Task>::GetInstance();
while(1)
{
Task t(rand()%20+1, rand()%10+1, "+-*/%"[rand()%5]);
tp->PushTask(t);
sleep(1);
}
return 0;
}
九、STL,智能指针和线程安全
STL中的容器是否是线程安全的? 不是. 原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响. 而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶). 因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.
智能指针是否是线程安全的?
对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数
十、其他常见的各种锁
- 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
- 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制CAS操作。
- CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
- 自旋锁,公平锁,非公平锁?如果不断通过循环检测锁的状态
十一、读者写者问题[选学]
读写锁 在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
- 注意:写独占,读共享,读锁优先级高
- 遵循321原则
三种关系: 写者和写者:互斥关系 写者和读者:互斥关系 读者和读者:没有关系 - 生产者VS读者写者
根本原因:读者不会拿走数据,而消费者会拿走数据
读写锁接口 设置读写优先
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
初始化
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr);
销毁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
加锁和解锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
|