线程
创建进程,我们从0到有创建了很多东西,申请了很多资源。
进程:承担分配系统资源的基本实体。 线程:调度的基本单位,线程是进程里面的执行流(线程在进程的地址空间内运行)。 进程:线程 = 1:n
Linux中,没有真正意义上的线程,线程是用进程模拟的(轻量级进程)。 只有第一个进程执行时创建了地址空间等资源,其它进程都只是分享了同一个资源。显然分配资源比申请资源快。
- 在一个程序里的一个执行路线就叫做线程(thread)。更准确的定义是:线程是“一个进程内部的控制序列””
- 一切进程至少都有一个执行线程
- 线程在进程内部运行,本质是在进程地址空间内运行
- 在Linux系统中,在CPU眼中,看到的PCB都要比传统的进程更加轻量化
- 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流
线程的优点:
- 创建一个新线程的代价要比创建一个新进程小得多
- 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
- 线程占用的资源要比进程少很多
- 能充分利用多处理器的可并行数量
- 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
- 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
- I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。
线程的缺点:
- 性能损失
一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。 - 健壮性降低
编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。 - 缺乏访问控制
进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。 - 编程难度提高
编写与调试一个多线程程序比单线程程序困难得多
线程异常:
- 单个线程如果出现除零,野指针问题导致线程崩溃,进程也会随着崩溃
- 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该进程内的所有线程也就随即退出
线程用途:
- 合理的使用多线程,能提高CPU密集型程序的执行效率
- 合理的使用多线程,能提高IO密集型程序的用户体验(如生活中我们一边写代码一边下载开发工具,就是多线程运行的一种表现)
进程是资源分配的基本单位 线程是调度的基本单位
进程更强调独立性,但不是绝对独立,如fork、进程间通信。 线程更强调共享,但也有自己私有的。
线程拥有自己的一部分数据:私有栈、一组寄存器(硬件上下文)。 线程必须得私有栈,否则运行期间多个线程产生的临时数据可能会互相干扰。 多个线程之间可以有切换。
进程的多个线程共享 同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:文件描述符表、每种信号的处理方式(SIG_ IGN、SIG_ DFL或者自定义的信号处理函数)、当前工作目录、用户id和组id。
进程和线程的关系如下图:
如何看待之前学习的单进程? ------具有一个线程执行流的进程
线程控制
POSIX线程库
OS其实没有线程这个概念,也没有提供关于线程的接口,但Linux平台有一套第三方提供的多线程的库。
- 与线程有关的函数构成了一个完整的系列,绝大多数函数的名字都是以“pthread_”打头的
- 要使用这些函数库,要通过引入头文<pthread.h>
- 链接这些线程函数库时要使用编译器命令的“-lpthread”选项
创建线程
-pthread:链接pthread库
功能:创建一个新的线程
原型
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg);
参数
thread:返回线程ID
attr:设置线程的属性,attr为NULL表示使用默认属性
start_routine:是个函数地址,线程启动后要执行的函数
arg:传给线程启动函数的参数
返回值:成功返回0;失败返回错误码
1、thread:OS不知道thread(线程id),线程id是由库提供的,也是库来管理 2、attr:线程属性,我们不关心,用NULL表示默认,由库来自动管理。 3、start_routine:返回值为void*,参数为void*的函数指针。指针指向函数,以函数作为入口,执行进程代码块中的一块(可以理解成所有的进程都是由一个个代码块,即函数构成的)。 4、arg:线程启动后执行的就是start_routine函数,而arg就是传递给它的参数
错误检查: 传统的一些函数是,成功返回0,失败返回-1,并且对全局变量errno赋值以指示错误。 pthreads函数出错时不会设置全局变量errno(而大部分其他POSIX函数会这样做)。而是将错误代码通过返回值返回。 pthreads同样也提供了线程内的errno变量,以支持其它使用errno的代码。对于pthreads函数的错误,建议通过返回值业判定,因为读取返回值要比读取线程内的errno变量的开销更小。
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run(void* arg)
{
while(1){
printf("%s, %lu, pid: %d\n", (char*)arg, pthread_self(), getpid());
sleep(1);
}
}
int main()
{
pthread_t tid;
pthread_create(&tid, NULL, thread_run, (void*)"thread 1");
while(1){
printf("main: %lu, pid: %d\n", pthread_self(), getpid());
sleep(2);
}
}
-L:显示当前系统当中的轻量级进程 线程调度时,OS看的id是LWP(light weight process)。
- pthread_ create函数会产生一个线程ID,存放在第一个参数指向的地址中。该线程ID和前面说的线程ID不是一回事。
- 前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程。
- pthread_ create函数第一个参数指向一个虚拟内存单元,该内存单元的地址即为新创建线程的线程ID,属于NPTL线程库的范畴。线程库的后续操作,就是根据该线程ID来操作线程的。
- 线程库NPTL提供了pthread_ self函数,可以获得线程自身的ID:
pthread_t pthread_self(void);
pthread_t到底是什么类型呢?取决于实现。对于Linux目前实现的NPTL实现而言,pthread_t类型的线程ID,本质就是一个进程地址空间上的一个地址。
所谓的线程id,其实就是线程在库当中的线程控制块中的起始位置(地址)。 pthread_t是无符号长整型(8字节),而64位系统地址一共8字节,刚好可以全部包括。 描述线程的信息在动态库中保存,即用户层;而执行的功能由底层的轻量级进程来完成。
pthread_self: 获取当前线程的线程id
线程终止
如果需要只终止某个线程而不终止整个进程,可以有三种方法:
- 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit。
- 线程可以调用pthread_ exit终止自己。
- 一个线程可以调用pthread_ cancel终止同一进程中的另一个线程。
pthread_exit函数
功能:线程终止
原型
void pthread_exit(void *value_ptr);
参数
value_ptr:value_ptr不要指向一个局部变量。
返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)
需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。
pthread_cancel函数
功能:取消一个执行中的线程
原型
int pthread_cancel(pthread_t thread);
参数
thread:线程ID
返回值:成功返回0;失败返回错误码
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run(void* arg)
{
while(1){
printf("%s, %lu, pid: %d\n", (char*)arg, pthread_self(), getpid());
sleep(5);
}
}
int main()
{
pthread_t tid;
pthread_create(&tid, NULL, thread_run, (void*)"thread 1");
printf("main: %lu, pid: %d\n", pthread_self(), getpid());
sleep(10);
pthread_cancel(tid);
printf("new thread %lu be canceled!\n", tid);
void *ret = NULL;
pthread_join(tid, &ret);
W> printf("thread quit code : %d\n", (long long)ret);
return 0;
}
线程等待
为什么需要线程等待?
- 已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
- 创建新的线程不会复用刚才退出线程的地址空间。
功能:等待线程结束
原型
int pthread_join(pthread_t thread, void **value_ptr);
参数
thread:要等待的线程ID
value_ptr:它指向一个指针,后者指向线程的返回值
返回值:成功返回0;失败返回错误码
当一个新线程退出时它的返回值void* 被主线程读取,而主线程在读取这个void* 时就需要传入一个参数void **
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run(void* arg)
{
printf("%s, %lu, pid: %d\n", (char*)arg, pthread_self(), getpid());
sleep(5);
return (void*)10;
}
int main()
{
pthread_t tid;
pthread_create(&tid, NULL, thread_run, (void*)"thread 1");
printf("main: %lu, pid: %d\n", pthread_self(), getpid());
void *ret = NULL;
pthread_join(tid, &ret);
W> printf("thread quit code : %d\n", (long long)ret);
return 0;
}
调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:
- 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。
- 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_ CANCELED。
- 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。
- 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。
关于线程的等待,我们默认线程是正常退出的。 如果线程崩溃了,就是进程该处理的问题了。 所以线程退出时,只要关心它的退出码是否成功,一旦错误就是进程的问题。信号是专门为进程设计的
分离线程
- 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
- 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。
int pthread_detach(pthread_t thread);
可以是线程组内其他线程对目标线程进行分离,也可以是线程自己分离:
pthread_detach(pthread_self());
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run(void* arg)
{
pthread_detach(pthread_self());
while(1){
printf("%s, %lu, pid: %d\n", (char*)arg, pthread_self(), getpid());
sleep(1);
break;
}
return (void*)10;
}
int main()
{
pthread_t tid;
pthread_create(&tid, NULL, thread_run, (void*)"thread 1");
printf("main: %lu, pid: %d\n", pthread_self(), getpid());
sleep(10);
pthread_cancel(tid);
printf("new thread %lu be canceled!\n", tid);
void *ret = NULL;
pthread_join(tid, &ret);
W> printf("thread quit code : %d\n", (long long)ret);
return 0;
}
当线程被设置为分离状态,这个线程退出时,会自动释放自己的资源,退出码也不写入pcb,父线程也不再等待。 但如果被分离的线程异常退出,进程也还是会受到影响。
线程互斥
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
int a = 10;
void *thread_run(void* arg)
{
while(1){
printf("%s, %lu, pid: %d\n", (char*)arg, pthread_self(), getpid());
printf("%s global vaiable: %d, %p\n", (char*)arg, a, &a);
sleep(1);
}
return (void*)10;
}
int main()
{
pthread_t tid;
pthread_t tid1;
pthread_create(&tid, NULL, thread_run, (void*)"thread 0");
pthread_create(&tid1, NULL, thread_run, (void*)"thread 1");
printf("main: %lu, pid: %d\n", pthread_self(), getpid());
printf("new thread %lu be canceled!\n", tid);
printf("before: %s global vaiable: %d, %p\n", "main", a, &a);
sleep(10);
a = 100;
printf("after: %s global vaiable: %d, %p\n", "main", a, &a);
void *ret = NULL;
pthread_join(tid, &ret);
W> printf("thread quit code : %d\n", (long long)ret);
return 100;
}
三个线程能不能看到同一个全局变量? 可以。它们共享同一个进程地址空间,其下部有一个全局数据区,所有线程共享。
- 临界资源:多线程执行流共享的资源就叫做临界资源
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
- 原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
不是所有共享资源都是临界资源 访问临界资源的代码称为临界区
1、对临界区进行保护,所有的执行线程都必须遵守这个规则(编码)。 2、lock -> 访问临界区 -> unlock 3、所有的线程必须先看到同一把锁,锁本身就是临界资源。锁本身得先保证自身安全,申请锁的过程,不能有中间状态,也就是两态的,lock -> 原子性。 4、lock -> 访问临界区(花时间) -> unlock,在特定线程/进程拥有锁的时候,期间有新线程过来申请锁,一定申请不到的。新线程该如何?阻塞:将进程、线程对应的PCB投入到等待队列,unlock之后,进行进程、线程的唤醒操作。 5、该如何理解POSIX pthread中的mutex? struct mutex{ int lock; //0,1 wait_queue *head } 6、pthread_mutex_init() 初始化,把这把锁的值设置为1 /pthread_mutex_destroy() 销毁head /pthread_mutex_lock() 由1变0 /pthread_mutex_unlock() 由0变1 7、一次保证只有一个线程进入临界区,访问临界资源,就叫做互斥。 在临界区中的多行代码中,有可能线程时间片到了,当前线程被切换了,但这并不影响,因为该线程是带着锁走的。 8、加锁为什么一般效率比较低,或者影响效率? (1)原来并发的执行流变成了串行 (2)当当前线程被切换了,别的线程也不能访问。
互斥量mutex
- 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
- 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
- 多个线程并发的操作共享变量,会带来一些问题。
初始化互斥量有两种方法:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
参数:
mutex:要初始化的互斥量
attr:NULL
销毁互斥量 销毁互斥量需要注意:
- 使用PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要销毁
- 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
调用pthread_ lock 时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
互斥量实现原理探究
- 由上面的例子可以看出,单纯的i++或者++i都不是原子的,有可能会有数据一致性问题
- 为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪代码改一下
每个线程都有自己的硬件上下文(TSS),其中包含各种寄存器信息。假设有1、2两个线程。move $0,%al,把0放进al寄存器,在第一条和第二条语句之间,线程1有可能被切走,所以线程1的TSS会把0这个值保存起来,CPU中的0这个值就废弃了。线程2进来后也执行第一条语句,往寄存器中写入0,如果线程2也被切走,它move进去的0也会被保存在它的TSS中,al中的值就又废弃了。接着线程1会把TSS中的值重新填入al中(恢复线程),继续执行第二条语句,把CPU中的值和内存中的mutex的值(即1)进行交换,CPU中的值就变为1,内存中的值变为0,此时线程1已经把锁拿到了。如果此时线程1又被切走,1这个值就会保存在线程1的TSS中,然后线程2恢复线程,执行第二条,但此时mutexd值已经是0了,不大于0,不能申请到锁,线程2就被挂起等待(放到等待队列中)。然后线程1回来,TSS中保存的1继续放入寄存器,执行if申请成功。
整个过程中,为1的mutex只有一份。exchange一条汇编完成了寄存器和内存数据的交换。 mutex_lock:原子的
抢票逻辑:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
int ticket = 10000;
pthread_mutex_t lock;
void *get_ticket(void *arg)
{
usleep(1000);
E> int num = (int)arg;
while(1){
pthread_mutex_lock(&lock);
if(ticket > 0){
usleep(1000);
printf("thread %d, get a ticket, no: %d\n", num, ticket);
ticket--;
pthread_mutex_unlock(&lock);
}
else{
pthread_mutex_unlock(&lock);
break;
}
}
}
int main()
{
int i = 0;
pthread_t tid[4];
pthread_mutex_init(&lock, NULL);
for(; i < 4; i++){
W> pthread_create(tid+i, NULL, get_ticket, (void*)i);
}
for(i = 0; i < 4; i++){
pthread_join(tid[i], NULL);
}
pthread_mutex_destroy(&lock);
return 0;
}
可重入VS线程安全
线程安全: 多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。 重入: 同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
常见的线程不安全的情况:
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
常见的线程安全的情况:
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
常见不可重入的情况:
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不
- 可重入的方式使用全局数据结构可重入函数体内使用了静态的数据结构
常见可重入的情况:
- 不使用全局变量或静态变量
- 不使用用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全联系:
- 函数是可重入的,那就是线程安全的
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全区别:
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
锁
死锁
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
死锁四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
避免死锁算法
线程同步
同步:在保证数据安全的情况下(一般使用加锁方式),让多个执行流按照特定的顺序进行临界资源的访问,称之为同步。
为什么要存在同步?多线程协同高效完成某些事情。
如何编码实现? 1.如果条件不满足,等待,释放锁。 2.通知机制。
cond 一个线程要在cond上等,一个要把通知信息发到cond
struct cond{ int value; wait_queue *head; }
条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
条件变量函数 初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
参数:
cond:要初始化的条件变量
attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数: cond:要在这个条件变量上等待
mutex:互斥量
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
为什么pthread_cond_wait 需要互斥量? 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
条件变量使用规范
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
pthread_mutex_t lock;
pthread_cond_t cond;
void *routine_r1(void *arg)
{
const char *name = (char*)arg;
while(1){
pthread_cond_wait(&cond, &lock);
printf("get cond, %s 活动....\n", name);
}
}
void *routine_r2(void *arg)
{
const char *name = (char*)arg;
while(1){
E> sleep(rand()%3+1);
pthread_cond_signal(&cond);
printf("%s signal done ... \n", name);
}
}
int main()
{
pthread_mutex_init(&lock, NULL);
pthread_cond_init(&cond, NULL);
pthread_t t1, t2;
E> pthread_create(&t1, NULL, routine_r1, "thread1");
E> pthread_create(&t2, NULL, routine_r2, "thread2");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
return 0;
}
生产者消费者模型
321原则 3:3种关系,生vs生(互斥) 生vs消(同步) 消vs消(互斥) 2:两种角色,生产者和消费者 1:一个交易场所
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型优点
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
C++ queue模拟阻塞队列的生产消费模型
makefile:
main:main.cc
g++ $^ -o $@ -std=c++11 -lpthread
.PHONY:claen
clean:
rm -f main
BlockQueue.hpp:
#ifndef __QUEUE_BLOCK_H__
#define __QUEUE_BLOCK_H__
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
class Task{
public:
int x;
int y;
public:
Task()
{}
Task(int _x, int _y):x(_x), y(_y)
{}
int Run()
{
return x + y;
}
~Task()
{}
};
class BlockQueue{
private:
std::queue<Task> q;
size_t cap;
pthread_mutex_t lock;
pthread_cond_t c_cond;
pthread_cond_t p_cond;
public:
bool IsFull()
{
return q.size() >= cap;
}
bool IsEmpty()
{
return q.empty();
}
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
void WakeUpConsumer()
{
std::cout << "wake up consumer ... " << std::endl;
pthread_cond_signal(&c_cond);
}
void WakeUpProductor()
{
std::cout << "wake up productor ... " << std::endl;
pthread_cond_signal(&p_cond);
}
void ProducterWait()
{
std::cout << "productor wait ... " << std::endl;
pthread_cond_wait(&p_cond, &lock);
}
void ComsumerWait()
{
std::cout << "consumer wait ... " << std::endl;
pthread_cond_wait(&c_cond, &lock);
}
public:
BlockQueue(size_t _cap):cap(_cap)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&c_cond, nullptr);
pthread_cond_init(&p_cond, nullptr);
}
void Put(Task t)
{
LockQueue();
while(IsFull()){
WakeUpConsumer();
ProducterWait();
}
q.push(t);
UnlockQueue();
}
void Get(Task &t)
{
LockQueue();
while(IsEmpty()){
WakeUpProductor();
ComsumerWait();
}
t = q.front();
q.pop();
UnlockQueue();
}
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&c_cond);
pthread_cond_destroy(&p_cond);
}
};
#endif
main.cc:
#include "BlockQueue.hpp"
using namespace std;
void *consumer_run(void *arg)
{
BlockQueue *bq = (BlockQueue*)arg;
while(true){
W> int n = 0;
Task t;
bq->Get(t);
cout << "consumer:" << t.x << "+" << t.y << "=" << t.Run() << endl;
}
}
void *productor_run(void *arg)
{
sleep(1);
BlockQueue *bq = (BlockQueue*)arg;
while(true){
int x = rand()%10 + 1;
int y = rand()%100 + 1;
Task t(x, y);
bq->Put(t);
cout << "product Task is : " << x << "+" << y << "= ?" << endl;
sleep(1);
}
}
int main()
{
BlockQueue *bq = new BlockQueue(5);
pthread_t c,p;
pthread_create(&c, nullptr, consumer_run, (void*)bq);
pthread_create(&c, nullptr, productor_run, (void*)bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete bq;
return 0;
}
POSIX信号量
是什么? 信号量(信号灯)本质是一个计数器 描述临界资源中的有效个数的计数器 count = 5; count-- P(); 原子的 count++ V(); 原子的
为什么? 临界资源可以看成多份,不冲突,提高效率
怎么办(用)?
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem);
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);
基于环形队列的生产消费模型
- 环形队列采用数组模拟,用模运算来模拟环状特性
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
- 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
RingQueue.hpp:
#pragma once
#include <iostream>
#include <unistd.h>
#include <vector>
#include <semaphore.h>
#define NUM 10
class RingQueue{
private:
std::vector<int> v;
int max_cap;
sem_t sem_blank;
sem_t sem_data;
int c_index;
int p_index;
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
public:
W> RingQueue(int _cap = NUM):max_cap(_cap),v(_cap)
{
sem_init(&sem_blank, 0, max_cap);
sem_init(&sem_data, 0, 0);
c_index = 0;
p_index = 0;
}
void Get(int &out)
{
P(sem_data);
out = v[c_index];
c_index++;
c_index %= max_cap;
V(sem_blank);
}
void Put(const int &in)
{
P(sem_blank);
v[p_index] = in;
p_index++;
p_index %= max_cap;
V(sem_data);
}
~RingQueue()
{
sem_destroy(&sem_blank);
sem_destroy(&sem_data);
c_index = 0;
p_index = 0;
}
};
main.cc:
#include "RingQueue.hpp"
void *comsumer(void *ring_queue)
{
RingQueue *rq = (RingQueue*)ring_queue;
while(true){
sleep(1);
int data = 0;
rq->Get(data);
std::cout << "consumer done ...# " << data << std::endl;
}
}
void *productor(void *ring_queue)
{
RingQueue *rq = (RingQueue*)ring_queue;
int count = 100;
while(true){
rq->Put(count);
count++;
if(count > 110){
count = 100;
}
std::cout << "productor done ..." << std::endl;
}
}
int main()
{
pthread_t c,p;
RingQueue *rq = new RingQueue();
pthread_create(&c, nullptr, comsumer, rq);
pthread_create(&p, nullptr, productor, rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete rq;
return 0;
}
线程池
线程池: 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
线程池的种类: 线程池示例:
- 创建固定数量线程池,循环从任务队列中获取任务对象,
- 获取到任务对象后,执行任务对象中的任务接口
ThreadPool.hpp:
#pragma once
#include <iostream>
#include <queue>
#include <math.h>
#include <unistd.h>
#define NUM 5
class Task{
public:
int base;
public:
Task(){}
Task(int _b):base(_b)
{}
void Run()
{
std::cout << "thread id[" << pthread_self() << "]task run ... done: base# " << base << " pow is:#" << pow(base, 2) << std::endl;
}
~Task(){}
};
class ThreadPool{
private:
std::queue<Task*> q;
int max_num;
pthread_mutex_t lock;
pthread_cond_t cond;
bool quit;
public:
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
bool IsEmpty()
{
return q.size() == 0;
}
void ThreadWait()
{
pthread_cond_wait(&cond, &lock);
}
void ThreadWakeUp()
{
pthread_cond_signal(&cond);
}
void ThreadsWakeUp()
{
pthread_cond_broadcast(&cond);
}
public:
ThreadPool(int _max = NUM):max_num(_max),quit(false)
{
}
static void *Routine(void *arg)
{
ThreadPool *this_p = (ThreadPool*)arg;
E> while(!quit){
this_p->LockQueue();
E> while(i!quit && this_p->IsEmpty()){
this_p->ThreadWait();
}
Task t;
E> if(!quit && !this_p->IsEmpty()){
this_p->Get(t);
}
this_p->UnlockQueue();
t.Run();
}
}
void ThreadPoolInit()
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
pthread_t t;
for(int i = 0; i < max_num; i++){
pthread_create(&t, nullptr, Routine, this);
}
}
void Put(Task &in)
{
LockQueue();
q.push(&in);
UnlockQueue();
ThreadWakeUp();
}
void Get(Task &out)
{
Task *t = q.front();
q.pop();
out = *t;
}
void ThreadQuit()
{
if(!IsEmpty()){
std::cout << "task queue is not empty()" << std::endl;
return;
}
quit = true;
ThreadsWakeUp();
}
~ThreadPool()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
};
main.cc:
#include "ThreadPool.hpp"
int main()
{
ThreadPool *tp = new ThreadPool();
tp->ThreadPoolInit();
int count = 20;
while(count){
int x = rand()%10 + 1;
Task t(x);
tp->Put(t);
sleep(1);
count--;
}
tp->ThreadQuit();
return 0;
}
线程池存在的价值: 1.有任务,立马有线程进行服务,省掉了线程创建的时间。 2.有效防止,server中线程过多,导致系统过载的问题。
|