1??线程概念
什么是线程
- 线程(thread)是进程中的一条执行路线,也可以说成线程是“一个进程内部的控制序列”。
通过下面内容可以理解“线程(thread)是进程中的一条执行路线”: 在我们之前学的进程中,一个进程的创建,操作系统会给该进程创建一个进程控制块(PCB),还要拷贝父进程的进程地址空间。如果子进程对父进程的数据进行读取并写入,就会发生写时拷贝,体现了进程的独立性。如果我们想要让该子进程能够和父进程一起去执行某个任务,则需要让子进程task_struct去指向父进程的进程地址空间,自己不需要自己的进程地址空间,这样当子进程去对父进程的数据进行写入时,就不会发生写时拷贝了,也可以和父进程一起完成任务,想当于该父进程有两个执行流,而这样的子进程可以通过vfork 函数来创建。
我们有可以得出
- 一切进程至少都有一个执行线程
(我们之前学的进程都是单线程进程)
如果多线程创建好了,进程中的多个线程都看见看到同一块资源,而进程对这块资源分配给线程来完成一个任务。
- 所以说线程是在进程内部完成的,本质是在进程地址空间内运行的。
- 在Linux系统中,在CPU眼中,看到的PCB都要比传统的进程更加轻量化
- 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流
线程的优点
- 创建一个新线程的代价要比创建一个新进程小得多
- 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
- 线程占用的资源要比进程少很多
- 能充分利用多处理器的可并行数量
- 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
- 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
- I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。
计算密集型:执行流的大部分任务,主要以计算为主:加密解密,排序查找。 IO密集型:执行流的大部分任务是以IO为主的:刷磁盘,访问数据库,访问网络。
线程的缺点
性能损失
- 一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。
健壮性降低
- 编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。
缺乏访问控制
- 进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。
比如:在多线程进程中,其中一个线程进行了一次I/O调用,这导致从用户态切换到内核态,把该进程置于阻塞状态,并切换到另一个进程(对用户级线程)。
编程难度提高
线程异常
- 合理的使用多线程,能提高CPU密集型程序的执行效率
- 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该进程内的所有线程也就随即退出
线程异常
- 合理的使用多线程,能提高CPU密集型程序的执行效率
- 合理的使用多线程,能提高IO密集型程序的用户体验(如生活中我们一边写代码一边下载开发工具,就是多线程运行的一种表现)
Linux进程VS线程
进程是资源分配的基本实体。 线程是调度的基本单位。 线程共享一部分数据,但也拥有自己的一部分数据
- 线程ID
- 一组寄存器
- 栈
- error
- 信号屏蔽字
- 调度优先级
进程的多个线程共享 同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:
- 文件描述符表
- 每种信号的处理方式(SIG_IGN、SIG_DFL或者自定义的信号处理函数)
- 当前工作目录
- 用户id和组id
进程和线程的关系如下图:
2??线程控制
Linux中没有正真的线程,线程中的结构是模拟了进程的PCB,所以,Linux内核中没有正真意义上关于线程的系统调用,我们使用的使用要引用<pthread.h> 的头文件。 在使用编译器编译的时候,要指明使用pthread库,选项-lpthread
创建线程
功能:创建一个线程
原型:int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
参数:theard:返回线程ID(输入型参数)
attr:设置线程的属性,attr为NULL表示默认属性
start_routine:函数地址,线程启动后执行的函数
arg:传给线程启动函数的参数
返回值:成功返回0,失败返回错误码
错误检查:
传统的一些函数是,成功返回0,失败返回-1,并且对全局变量errno赋值以指示错误。
pthreads函数出错时不会设置全局变量errno(而大部分其他POSIX函数会这样做)。而是将错误代码通
过返回值返回
pthreads同样也提供了线程内的errno变量,以支持其它使用errno的代码。对于pthreads函数的错误,
建议通过返回值业判定,因为读取返回值要比读取线程内的errno变量的开销更小
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
void* Routine(void* buf)
{
printf("%s\n",(char*)buf);
return NULL;
}
int main()
{
pthread_t t1;
pthread_create(&t1,NULL,Routine,(void*)"establish succeed");
while(1);
return 0;
}
运行结果:
establish succeed
获取线程的id
功能:获取线程在用户层的id
原型:pthread_self(void);
在我们创建了一个线程的时候,通过ps ajx |head -1&&ps ajx|grep ./a.out |grep -v grep 命令查看进程时,只能看到一个进程。并且这两个执行流的pid是一样的。
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
void* Routine(void* buf)
{
while(1){
printf("%s:---->pid:%d---\n",(char*)buf,getpid());
sleep(1);
}
return NULL;
}
int main()
{
pthread_t t1;
pthread_create(&t1,NULL,Routine,(void*)"establish succeed");
while(1){
printf("--->pid:%d<----\n",getpid());
sleep(1);
}
return 0;
}
这说明这两个执行流是一个进程。 我们是通过ps -aL|head -1 &&ps -aL|grep a.out 来查看线程。 但是,当我们通过pthread_self函数获取的线程id和LWP不同。LWP是给内核看到,而pthread_self函数获取的id是用户层的id,给用户看到。
LWP是轻量级进程,在Linux下进程是资源分配的基本单位,线程是cpu调度的基本单位,而线程使用进程PCB描述实现,并且同一个进程中的所有PCB共用同一个虚拟地址空间,因此相较于传统进程更加的轻量化。 那么用户层的id又是什么呢?进程地址空间的一块地址
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
void* Routine(void * asg)
{
pthread_t ret=pthread_self();
while(1){
printf("----id:%lu-----\n",ret);
printf("----id:%p-----\n",ret);
sleep(1);
}
return NULL;
}
int main()
{
pthread_t t1;
pthread_create(&t1,NULL,Routine,NULL);
while(1);
return 0;
}
如图: 我们使用的pthread库是通过动态链接的,在进程地址空间的共享区中,其中创建线程中线程的结构也在其中(线程的一些属性),通过上图我们可以看到,该结构是在动态库中的,所以在我们调度线程的时候或者切换线程的时候不用区内核中,而是在库中来找到相关的函数来调度,这也就是为什么说线程是在进程地址空间中运行的。 而我们可以通过用户级的id找到这块空间,来调度这个线程。这就是使用pthread_self函数获得的id的作用。
补充一下内容:线程是有自己的寄存器的,当线程还没有执行完自己的任务然而时间片到了,那里该寄存器是来存放上下文数据的。线程是有自己的栈的,当一个线程在执行任务时产生了临时数据是放在这个栈中,不会干扰其他的进程。(自己的理解哈)
线程终止
在主线程中直接用return结束,是整个进程的结束。
如果需要终止某个线程而不是终止整个进程,可以有三种方法:
-
从线程函数return。 -
线程可以调用pthread_exit终止自己 -
一个线程可以调用pthread_cancel终止同一进程中的同一线程 功能:线程终止
原型:void pthread_exit(void *retval);
参数:retval:retval不要指向一个局部变量。
需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函 数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。
功能:取消一个执行中的线程
原型:int pthread_cancel(pthread_t thread);
参数:thread:线程id
返回值:成功返回0;失败返回错误码
等待线程
进程中父进程需要等待子进程,防止子进程形成僵尸进程,造成内存泄漏。 那么,在线程中,主线程一样也要等待其他是线程。当线程退出后,如果主进程没有等待其他线程,那么主线程不知道其他线程是否完成了自己的任务,这导致线程的空间没有被释放,仍然在进程地址空间中,当创建新线程后,不会复用这块空间,这就会导致内存泄漏。
功能:等待线程结束
原型:int pthread_join(pthread_t thread, void **retval);
参数:thread:线程的id
retval::它指向一个指针,后者指向线程的返回值
返回值:成功返回0;失败返回错误码
调用该函数的线程将挂起等待,直到id为thread的线程终止。 thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:
- 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。
- 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_ CANCELED。
- 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。
- 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<unistd.h>
pthread_t t1,t2,t3,t4;
void* Routine1(void * asg)
{
printf("%s....quit\n",(char*)asg);
return NULL;
}
void* Routine2(void *asg)
{
printf("%s....quit\n",(char*)asg);
return (void*)1;
}
void *Routine3(void* asg)
{
printf("%s....quit\n",(char*)asg);
pthread_exit((void*)2);
}
void *Routine4(void* asg)
{
printf("%s....quit\n",(char*)asg);
pthread_cancel(t4);
return NULL;
}
int main()
{
pthread_create(&t1,NULL,Routine1,(void*)"thread 1");
pthread_create(&t2,NULL,Routine2,(void*)"thread 2");
pthread_create(&t3,NULL,Routine3,(void*)"thread 3");
pthread_create(&t4,NULL,Routine4,(void*)"thread 4");
void* ret1=NULL;
void* ret2=NULL;
void* ret3=NULL;
void* ret4=NULL;
pthread_join(t1,&ret1);
pthread_join(t2,&ret2);
pthread_join(t3,&ret3);
pthread_join(t4,&ret4);
printf("thread return, thread id %lu, return code:%d\n", t1 , *(int*)&ret1);
printf("thread return, thread id %lu, return code:%d\n", t2 , *(int*)&ret2);
printf("thread return, thread id %lu, return code:%d\n", t3, *(int*)&ret3);
printf("thread return, thread id %lu, return code:%d\n", t4, *(int*)&ret4);
return 0;
}
线程分离
创建线程,要对线程进行等待,否则无法释放资源,从而导致内存泄漏。如果不关心线程的符号值,那么等待就是一种负担,这个时候,我们可以告诉系统,当这个线程退出时,自动释放线程的资源。
功能:线程分离
原型:int pthread_detach(pthread_t thread);
参数:线程id
返回值:成功时返回0;出错时,它返回一个错误号。
可以是线程组内其他线程对目标线程进行分离,也可以线程自己分离
3??线程互斥
进程线程间的互斥概念
在学习管道的时候,管道是自带同步与互斥的。而在线程中,当多个线程没有加锁的情况下同时访问临界资源时会发生混乱。在举例之前,先了解几个概念。
- 临界资源:多个线程执行流共享的资源叫做临界资源
- 临界区:每个线程内部访问临界资源的代码叫做临界区
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
- 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完
成
互斥量
大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。多个线程并发的操作共享变量,会带来一些问题。
我们可以通过一个买票的例子,来看这块问题。
#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
int ticket=2000;
void *STicket(void* asg)
{
while(1){
if(ticket>0){
usleep(100);
printf("%s sang ticket:%d \n",(char*)asg,ticket--);
}
else{
break;
}
}
return NULL;
}
int main()
{
pthread_t t[4];
int i;
for(i=0;i<4;i++){
char* p=(char*)malloc(sizeof(char)*64);
sprintf(p,"pthread t%d",i);
pthread_create(&t[i],NULL,STicket,(void *)p);
}
pthread_join(t[0],NULL);
pthread_join(t[1],NULL);
pthread_join(t[2],NULL);
pthread_join(t[3],NULL);
return 0;
}
我们在运行结果中可以看到,票的数量本不可能出现负数的,但是在结果中出现了,那么这就是一个问题。 多个线程并发的访问同一块临界资源,我们用t1,t2,t3,t4,来表示四个线程。一开始票的数量有1000张。
《出现问题1》当t1首先访问到票时,判断票还有剩余,于是拿走一张票,票还剩999张。但是这些线程是并发执行的,有可能多个线程同时拿到票,且通过对票进行减减操作,那么这个票是重复了。 《出现问题2》当t3拿到票的时候,刚准备对票进行减减,时间片就到了,线程退出,那么在t3这个线程内把读取到的票的数量保存起来,当t3这个线程有运行时,先恢复上下文数据,然后对山下文数据中保存票的数量进行减减,当t3这个线程完成了操作后,把剩余票的数量进行更新,那么在t3没有运行前,票已经抢完了,但是t3它不知道,然后又把票的数量进行更新了,票又回来了,这个时候又出错了。出现负数的情况就是这样。 在我们判断票是否有剩余的时候,和对票减减的时候,并不是具有原子性的,因为这个时候,其他线程也在进行抢票,可能拿到重复的票。我们可以通过汇编来验证是否具有原子性。
#include<stdio.h>
int main()
{
int a=5;
a--;
return 0;
}
–操作并不是原子性,而是对应了三条汇编:
- load :将共享变量ticket从内存加载到寄存器中
- update : 更新寄存器里面的值,执行-1操作
- store :将新值,从寄存器写回共享变量ticket的内存地址
想要解决上面的问题,需要做到三点:
- 代码必须要有互斥行为:当一个线程的代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区
- 果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
而以上的三点本质就是加一把锁,在Linux上提供的这把锁叫做互斥量。
先要理解这个锁。当多个线程同时要执行临界区的代码,那么谁先申请到这把锁,谁就执行,其他的线程就开始进行等待,等待这把锁被释放,然后申请这把锁。
互斥量的接口
初始化互斥量有两中方法:
- 方法1,静态分配
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER - 方法2,动态分配
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); 参数:mutex:要初始化的互斥量 attr:设置属性,一般设置NULL,用默认设置 返回值:成功返回0,错误返回错误号
功能:销毁互斥量
原型:int pthread_mutex_destroy(pthread_mutex_t *mutex);
参数:mutex:要销毁的互斥量
返回值:成功返回0,错误返回错误号
注意:
- 使用PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁。
- 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
互斥量加锁和解锁
功能:加锁
原型:int pthread_mutex_lock(pthread_mutex_t *mutex);
参数:mutex:要加锁的互斥量
返回值:成功返回0,错误返回错误号
功能:解锁
原型:int pthread_mutex_unlock(pthread_mutex_t *mutex);
参数:mutex:要解锁的互斥量
返回值:成功返回0,错误返回错误号
调用pthread_mutex_lock会遇到的情况
- 互斥量处于没锁的状态,该函数将互斥量锁定,同时返回成功。
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
现在我们对之前的买票系统进行改进
#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
int ticket=2000;
pthread_mutex_t lock;
void *STicket(void* asg)
{
while(1){
pthread_mutex_lock(&lock);
if(ticket>0){
usleep(100);
printf("%s sang ticket:%d \n",(char*)asg,ticket--);
}
else{
pthread_mutex_unlock(&lock);
break;
}
pthread_mutex_unlock(&lock);
}
return NULL;
}
int main()
{
pthread_mutex_init(&lock,NULL);
pthread_t t[4];
int i;
for(i=0;i<4;i++){
char* p=(char*)malloc(sizeof(char)*64);
sprintf(p,"pthread t%d",i);
pthread_create(&t[i],NULL,STicket,(void *)p);
}
pthread_join(t[0],NULL);
pthread_join(t[1],NULL);
pthread_join(t[2],NULL);
pthread_join(t[3],NULL);
pthread_mutex_destroy(&lock);
return 0;
}
《问题1》:一个线程拿到了锁,会不会被其他线程切换? 答:会被切换,当这个拿到锁的线程切换到了其他线程,其他线程依然没有锁,依然要等待,然而当拿到锁的线程又开始运行时,首先要先恢复上下文数据,这个线程依然是拿到锁的状态(这个线程是拿着锁被切走的),可以继续执行临界区的代码。
《问题2》:申请锁的过程是不是原子性的? 答:申请锁的原子性的,要么没有申请到锁,要么锁已经释放了,可以申请锁。
《问题3》:锁本身就是临界资源,那么谁来保护锁? 答:锁是来保护临界资源的,但是锁也是临界资源的呀。但是锁本身就具有原子性,申请锁的过程必须是原子性的。
互斥量的实现原理研究
通过上面的例子,大家已经意识到单纯的i++和++i都不是原子的,有可能会有数据一致性的问题。
为了实现互斥锁的操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相互交换,由于只有一条指令,保证了原子性。即使是多处理器平台,访问内存的总线周期也是有先有后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
我们可以通过lock和unlock的伪代码改一下。
因为在申请锁的时候,只是一条指令来完成,直接交换,所以具有原子性。
注意:
- 线程在申请锁的时候不可能一起去申请,而是根据指令周期去决定的。
- 寄存器不是被所以线程共享的,每个线程都有自己的一组寄存器,而线程是上下文就保存在其中。
怎么看待线程在申请锁的时候等待挂起
在线程程要申请CPU资源去完成某种任务是,这些线程是在运行队列中排队。当其中一个线程要锁这个资源才能在CPU上完成某种任务时,CPU不会因为这个线程要先申请锁而去等这个线程,而运行队列中的下一个线程来申请CPU资源。而那么要申请锁的线程就去锁的资源等待队列去排队申请。
比如:我们在下载某个资源的时候,突然卡在了百分之99不动了。站在进程等待队列理解!进程线程等待某种资源,在OS层就是将当前的进程或线程的task_struct放入对应的等待队列!R->S,这种情况可以称之为当前进程或线程的等待挂起!在用户看到自己的进程卡这不动了,一般称之为应用阻塞了!
可重入VS线程安全
概念
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
常见的线程不安全情况
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
常见的线程安全情况
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
常见的不可重入情况
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
常见的可重入情况
- 不使用全局变量或静态变量
- 不使用用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全的联系
- 函数是可重入的,那就是线程安全的
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全的区别
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的
4??死锁
- 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
死锁的四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
自己也可以让自己死锁。
#include<stdio.h>
#include<pthread.h>
pthread_mutex_t lock;
int count=10;
void * Rounit(void *asg)
{
while(1){
pthread_mutex_lock(&lock);
pthread_mutex_lock(&lock);
if(count>0){
count--;
}else{
pthread_mutex_unlock(&lock);
break;
}
pthread_mutex_unlock(&lock);
}
}
int main()
{
pthread_t t1;
pthread_mutex_init(&lock,NULL);
pthread_create(&t1,NULL,Rounit,NULL);
pthread_join(t1,NULL);
pthread_mutex_destroy(&lock);
return 0;
}
这会行成死锁,因为线程申请了两次锁,第二次申请锁的时候锁已经在你手上了,但因为你还要等待锁被释放,所以一直等待,形成了死锁。
避免死锁
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
避免死锁的算法
5??线程同步
在上面的买票系统中,如果线程1的优先级非常高,那么会不会出现票都被线程A给抢完了。线程1申请锁后抢票完成,释放锁,释放完后线程A又申请到锁,如此往复,直到票买完了。按理说这样没有错,各凭本事买票嘛,但这样没有高效的让多个执行流使用这个资源,那么多执行流就没有意义了。线程同步就是来解决这个问题的。要申请锁的所有线程依次排队申请,使用完锁的线程去队尾排队,这样就防止了一个优先级高的线程抢完所以资源。
条件变量
条件变量我们可以理解为:条件变量使我们可以睡眠等待某种条件的出现。(可能会有点抽象)
举个例子:线程A要在某个队列中拿资源,线程B在某个队列中放资源。在没有加条件变量和加了互斥量(锁)的情况下,线程A申请到锁,去访问这个队列,队列中没有资源,线程A释放锁,后又去申请锁,去访问这个队列,队列中又没有资源,如此往复,使得这个逻辑存在严重的效率问题。当我们加了条件变量的时候,当线程A申请到锁后访问这个队列,队列中没有资源,释放锁等待。线程B去申请锁访问这个队列去放入资源,当这个资源达到了某个条件,线程B唤醒线程A去访问这个队列,拿到想要的资源,这个就是条件变量的作用,不会一直让线程A去申请锁释放锁,进行没有必要的动作,反而影响了线程B工作。
饥饿问题:多个执行流,在保证互斥地访问同一块资源时,该资源一直被同一个执行流访问,就会导致其他执行流形成饥饿,这种现象就做饥饿问题。
同步概念和竞态条件
- 在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
初始化条件变量
功能:初始化条件变量
原型:int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
参数:cond:要初始化的条件变量
attr:条件变量的属性,设置NULL,使用默认的。
销毁条件变量
功能:释放条件变量
原型:int pthread_cond_destroy(pthread_cond_t *cond);
参数:cond:要销毁的条件变量
等待条件满足
功能:等待条件满足
原型: int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
参数:cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释
唤醒等待
功能:唤醒等待队列中队头线程
原型:int pthread_cond_signal(pthread_cond_t *cond);
参数:cond:在这个条件变量上唤醒
功能:唤醒所以线程
原型 :int pthread_cond_broadcast(pthread_cond_t *cond);
参数:cond:在这个条件变量上唤醒
返回值:成功返回0;失败返回错误号
pthread_cond_t
我们设置条件变量的类型是pthread_cond_t。
struct pthread_cond_t
{
int flag;
task_struct *queue;
}
简单的案例:
#include<iostream>
#include<pthread.h>
#include<cstdio>
pthread_mutex_t lock;
pthread_cond_t cond;
int ticket=6;
void* RunRoute(void* arg)
{
pthread_detach(pthread_self());
while(true){
pthread_mutex_lock(&lock);
pthread_cond_wait(&cond,&lock);
if(ticket>0){
std::cout<<(char*)arg<<"抢到了"<<ticket<<"号票"<<std::endl;
ticket--;
}
else{
std::cout<<"票卖完了"<<std::endl;
pthread_mutex_unlock(&lock);
break;
}
pthread_mutex_unlock(&lock);
}
}
int main()
{
pthread_mutex_init(&lock,nullptr);
pthread_cond_init(&cond,nullptr);
pthread_t t1,t2,t3;
pthread_create(&t1,NULL,RunRoute,(void*)"thread t1");
pthread_create(&t2,NULL,RunRoute,(void*)"thread t2");
pthread_create(&t3,NULL,RunRoute,(void*)"thread t3");
while(true){
getchar();
pthread_cond_signal(&cond);
}
return 0;
}
看运行结果,t1、t2、t3线程轮流抢票。
pthread_cond_wait为什么需要互斥量
看上面的代码。在pthread_cond_wait函数最后一个参数是互斥量。 看代码,当一个线程申请到锁时,就开始执行pthread_cond_wait进行等待,在等待的过程中,该线程的锁会被释放,等线程被唤醒的时候,该线程的锁又会回到手上。
- 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
- 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
- 由于解锁和等待不是原子操作。调用解锁之后, pthread_cond_wait 之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么 pthread_cond_wait 将错过这个信号,可能会导致线程永远阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是一个原子操作。
- int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex); 进入该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond_ wait返回,把条件量改成1,把互斥量恢复成原样。
条件变量使用规范
pthread_mutex_lock(&lock);
while (条件为假)
pthread_cond_wait(&cond, &lock);
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&lock);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&lock);
6??生产者消费者模型
为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
要实现生产者消费者模式。首先要保证: 三个关系:生产者和生产者(竞争关系,互斥关系)、消费者和消费者(竞争关系,互斥关系)、生产者和消费者(竞争关系(保证数据的正确性),同步关系(保证多线程协调))。 两种角色:生产者和消费者(特定的进程或线程)。 一个交易场所:通常指内存的一段缓冲区。
生产者消费者模型优点
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。
C++ queue模拟阻塞队列的生产消费模型
- 为了便于理解,我以单生产者,单消费者,来进行讲解。
ProCon.hpp
#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
#include<unistd.h>
#include<cstdlib>
#include<ctime>
#define NUM 6
template<class T>
class PC
{
public:
PC(int cap=NUM):_cap(cap)
{
pthread_mutex_init(&lock,nullptr);
pthread_cond_init(&_empty,nullptr);
pthread_cond_init(&_full,nullptr);
}
void Push(const T& in)
{
pthread_mutex_lock(&lock);
while(full())
{
pthread_cond_wait(&_full,&lock);
}
q.push(in);
if(q.size()>=_cap/2){
pthread_cond_signal(&_empty);
std::cout<<"消费者快来消费吧 ";
}
pthread_mutex_unlock(&lock);
}
void Pop(T& out)
{
pthread_mutex_lock(&lock);
while(empty())
{
pthread_cond_wait(&_empty,&lock);
}
out=q.front();
q.pop();
if(q.size()<=_cap/2){
pthread_cond_signal(&_full);
std::cout<<"生产者快来生产把 ";
}
pthread_mutex_unlock(&lock);
}
~PC()
{
_cap=0;
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
private:
bool empty()
{
return q.empty();
}
bool full()
{
return q.size()==_cap;
}
private:
std::queue<T> q;
int _cap;
pthread_mutex_t lock;
pthread_cond_t _empty;
pthread_cond_t _full;
};
main.cc
#include"ProCon.hpp"
void* Pro(void* asg)
{
pthread_detach(pthread_self());
PC<int>* qc=(PC<int>*)asg;
while(true){
sleep(1);
int data=rand()%100+1;
qc->Push(data);
std::cout<<"生产者生产了:"<<data<<std::endl;
}
}
void* Con(void* asg)
{
pthread_detach(pthread_self());
PC<int>* qc=(PC<int>*)asg;
while(true){
sleep(2);
int data=0;
qc->Pop(data);
std::cout<<"消费者消费了:"<<data<<std::endl;
}
}
int main()
{
pthread_t producer,consumer;
srand((unsigned long )time(nullptr));
PC<int>* qc=new PC<int>();
pthread_create(&producer,nullptr,Pro,qc);
pthread_create(&consumer,nullptr,Con,qc);
while(true);
return 0;
}
使用生产者消费者模型模拟简单的加减乘除
生产者生产数据,消费者拿出数据并进行计算。
ProCon1.hpp,和上面的基本上是一样的
#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
#include<unistd.h>
#include<cstdlib>
#include<ctime>
#define NUM 6
template<class T>
class PC
{
public:
PC(int cap=NUM):_cap(cap)
{
pthread_mutex_init(&lock,nullptr);
pthread_cond_init(&_empty,nullptr);
pthread_cond_init(&_full,nullptr);
}
void Push(const T& in)
{
pthread_mutex_lock(&lock);
while(full())
{
pthread_cond_wait(&_full,&lock);
}
q.push(in);
pthread_cond_signal(&_empty);
pthread_mutex_unlock(&lock);
}
void Pop(T& out)
{
pthread_mutex_lock(&lock);
while(empty())
{
pthread_cond_wait(&_empty,&lock);
}
out=q.front();
q.pop();
pthread_cond_signal(&_full);
pthread_mutex_unlock(&lock);
}
~PC()
{
_cap=0;
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
private:
bool empty()
{
return q.empty();
}
bool full()
{
return q.size()==_cap;
}
private:
std::queue<T> q;
int _cap;
pthread_mutex_t lock;
pthread_cond_t _empty;
pthread_cond_t _full;
};
fun.hpp
#include<iostream>
class computer
{
public:
computer(int _x,int _y,char _por):x(_x),y(_y),por(_por)
{}
computer(){}
int fun()
{
int result=0;
switch(por)
{
case '+':
result=x+y;
break;
case '-':
result=x-y;
break;
case '*':
result=x*y;
break;
case '/':
if(y==0){
std::cout<<"除0错误"<<std::endl;
return -1;
}
result=x/y;
break;
default:
break;
}
return result;
}
public:
int x;
int y;
char por;
};
main1.cc
#include"ProCon1.hpp"
#include"fun.hpp"
void* Pro(void* asg)
{
pthread_detach(pthread_self());
PC<computer>* qc=(PC<computer>*)asg;
char *por="+-*/";
while(true){
sleep(1);
int x=rand()%100+1;
int y=rand()%50;
int p=rand()%4;
computer su(x,y,por[p]);
qc->Push(su);
}
}
void* Con(void* asg)
{
pthread_detach(pthread_self());
PC<computer>* qc=(PC<computer>*)asg;
while(true){
sleep(1);
computer su;
qc->Pop(su);
int data=su.fun();
std::cout<<su.x<<su.por<<su.y<<"="<<data<<std::endl;
}
}
int main()
{
pthread_t producer,consumer;
srand((unsigned long )time(nullptr));
PC<computer>* qc=new PC<computer>();
pthread_create(&producer,nullptr,Pro,qc);
pthread_create(&consumer,nullptr,Con,qc);
while(true);
return 0;
}
看运行结果吧:
7??POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
使用信号量要包semaphore.h头文件
功能:初始化信号量
原型:int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:sem:信号量
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
功能:销毁信号量
原型:int sem_destroy(sem_t *sem);
参数:sem:信号量
功能:等待信号量,会将信号量的值减1(P操作)
原型:int sem_wait(sem_t *sem);
参数:sem:信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1(V操作)
原型:int sem_post(sem_t *sem);
参数:sem:信号量
写个二元信号量来熟悉这些函数。二元信号量的使用类似锁。 二元信号量: value为1,当 value经过P操作变成0时,线程要等待 value又变成1。
#include<iostream>
#include<pthread.h>
#include<semaphore.h>
#include<unistd.h>
class Sem
{
public:
Sem()
{
sem_init(&sem,0,1);
}
~Sem()
{
sem_destroy(&sem);
}
void P()
{
sem_wait(&sem);
}
void V()
{
sem_post(&sem);
}
private:
sem_t sem;
};
Sem sem;
int tickets=10;
void* GetTickets(void* asg){
while(true){
sleep(1);
sem.P();
if(tickets>0){
std::cout<<(char*)asg<<"抢到了"<<tickets<<"号票"<<std::endl;
tickets--;
sem.V();
}
else{
break;
}
}
}
int main()
{
pthread_t t1,t2;
pthread_create(&t1,nullptr,GetTickets,(void *)"thread 1");
pthread_create(&t2,nullptr,GetTickets,(void *)"thread 2");
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
return 0;
}
8??基于环形队列的生产者消费者模式
在之前的抢票系统中,并没有对票这个全局变量进行管理。当我们想买其中的一号票时,我们就应该用信号量来管理。
在这个生产者消费者模型中的环形队列中要保证两个点:
1.生产者和消费者不能指向同一个位置 2. 生产者和消费者不能超过对方一圈。
Ring.hpp
#pragma once
#include<iostream>
#include<pthread.h>
#include<semaphore.h>
#include<unistd.h>
#include<vector>
#include<stdint.h>
#include<ctime>
#define NUM 16
template<class T>
class RingQueue
{
private:
std::vector<T> q;
int _cap;
sem_t balk_sem;
sem_t data_sem;
int c_pos;
int p_pos;
private:
void P(sem_t& sem)
{
sem_wait(&sem);
}
void V(sem_t& sem)
{
sem_post(&sem);
}
public:
RingQueue(int cap=NUM):_cap(cap),c_pos(0),p_pos(0)
{
q.resize(cap);
sem_init(&balk_sem,0,cap);
sem_init(&data_sem,0,0);
}
void Push(const T& in)
{
P(balk_sem);
q[p_pos]=in;
p_pos++;
p_pos%=_cap;
V(data_sem);
}
void Pop(T& out)
{
P(data_sem);
out=q[c_pos];
c_pos++;
c_pos%=_cap;
V(balk_sem);
}
~RingQueue()
{
sem_destroy(&balk_sem);
sem_destroy(&data_sem);
}
};
main.cc
#include"Ring.hpp"
void* production(void* asg)
{
RingQueue<int>* q=(RingQueue<int>*)asg;
while(true)
{
sleep(1);
int data=rand()%100+1;
q->Push(data);
std::cout<<"生产者-》:"<<data<<std::endl;
}
}
void* consumption(void* asg)
{
RingQueue<int>* q=(RingQueue<int>*)asg;
while(true)
{
sleep(2);
int data;
q->Pop(data);
std::cout<<"消费者-》:"<<data<<std::endl;
}
}
int main()
{
RingQueue<int>* q=new RingQueue<int>();
srand((unsigned long)time(nullptr));
pthread_t producer,consumer;
pthread_create(&producer,nullptr,production,q);
pthread_create(&consumer,nullptr,consumption,q);
pthread_join(producer,nullptr);
pthread_join(consumer,nullptr);
return 0;
}
运行结果:
9??线程池
线程池:
- 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
- 线程池的应用场景:
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技
术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个 Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。 - 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
线程池的简单模拟实现:
threadPool.hpp
#include<iostream>
#include<queue>
#include<pthread.h>
using std::cout;
using std::endl;
#define THREAD_MAX 5
template<typename T>
class threadPool
{
private:
int thread_num;
std::queue<T> task_queue;
pthread_cond_t cond;
pthread_mutex_t lock;
public:
threadPool(int num=THREAD_MAX):thread_num(num)
{
pthread_cond_init(&cond,nullptr);
pthread_mutex_init(&lock,nullptr);
}
~threadPool()
{
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&lock);
}
void Push(T& in)
{
UpLock();
task_queue.push(in);
UnLock();
pthread_cond_signal(&cond);
}
void UpLock()
{
pthread_mutex_lock(&lock);
}
void UnLock()
{
pthread_mutex_unlock(&lock);
}
void Pop(T& ou)
{
while(task_queue.empty()){
pthread_cond_wait(&cond,&lock);
}
ou=task_queue.front();
task_queue.pop();
}
static void* Routin(void* arg)
{
threadPool<T>* ptr=(threadPool<T>*)arg;
while(true){
ptr->UpLock();
T t;
ptr->Pop(t);
ptr-> UnLock();
t.Run();
}
}
void PthreadInit()
{
pthread_t pt;
int i=0;
for(i=0;i<thread_num;i++){
pthread_create(&pt,nullptr,Routin,this);
pthread_detach(pt);
}
}
};
task.hpp
#include<iostream>
#include<pthread.h>
class task{
private:
int x;
int y;
char op;
public:
task(){}
task(int _x,int _y,char _op):x(_x),y(_y),op(_op)
{}
~task()
{}
void Run()
{
int z;
switch(op){
case '+':
z=x+y;
break;
case '-':
z=x-y;
break;
case '*':
z=x*y;
break;
case '/':
if(y==0) std::cout<<"除0错误"<<std::endl;
if(y!=0) z=x/y;
break;
case '%':
if(y==0) std::cout<<"模0错误"<<std::endl;
if(y!=0) z=x%y;
break;
default:
break;
}
std::cout<<"thred ["<<pthread_self()<<"]"<<x<<op<<y<<"="<<z<<std::endl;
}
};
main.cc
#include<cstdlib>
#include<ctime>
#include<unistd.h>
#include"threadpool.hpp"
#include"task.hpp"
int main()
{
threadPool<task>* pr=new threadPool<task>();
pr->PthreadInit();
srand((unsigned long)time(nullptr));
char* p="+-*/%";
while(true){
int x=rand()%100+1;
int y=rand()%100+1;
task t(x,y,p[x%4]);
pr->Push(t);
sleep(1);
}
return 0;
}
运行结果:
|