1. 线程概念
1.1 什么是线程
??在操作系统中,如果我们执行了某一应用程序,那么操作系统就会对这个应用程序创建一系列的资源以用来让这个程序在操作系统中运行起来。而整个创建过程以及创建成功后所产生的资源,我们将其称为一个进程。 ??所以说,进程是操作系统分配资源的基本单位。而线程通俗来讲就是一个进程中一个执行流。这里以串行与并行下载文件举例,如果我们使用串行的方式去下载多个文件,那么得到的结果是,将这些文件逐个按个的下载,即上一个下载完成之后才会下载接下来的文件。如果使用并行的方式下载,那么这些文件就会一次同时下载多个文件,而不是等待上一个下载完后才继续下载接下来的,大大的提高了下载效率。 ??通过上述例子,可以看出一个进程中可以同时执行多段程序代码片段,而这种同时有多个程序片段在执行就称为多线程。而其中的每一个执行流就被称为一个线程。
1.2 从操作系统看线程
我们先来看一看进程是如何组织的。
线程又是如何组织的?
我们又称线程位轻量级进程(LWP),因为在linux内核中并没有描述线程的结构体,线程与进程都使用struct task_struct{...} ,所以在线程中pid 被称为线程号,tgid 被称为线程组id,对标进程id。 线程是操作系统调度的基本单位,进程是操作系统分配资源的基本单位
轻量级线程在哪里体现轻量级? ??在创建进程时,需要对该进程分配一系列的资源,资源如上图所示,但是在创建线程时,就不需要开辟那些资源,与进程共用虚拟地址空间,大大减少了开销,但线程也有自身独立的空间线程号,栈,errno,信号屏蔽字,寄存器,调度优先级 。共享空间有:文件描述符、信号处理方式、当前的工作目录、用户id与组id 。
1.3 线程的分类
线程分为主线程与工作线程。 主线程
工作线程
- 同一个进程中的线程的线程组是相同的,标识是同一个进程
- 但pid不同,标识不同的进程
注意:多线程在工作执行时,也是抢占式执行的,所有当有很多进程同时在执行时,cpu采用时间片轮转的方式,轮流执行所有进程。
1.4 线程的优缺点
优点
- 创建线程的代价要小于创建进程
- 与进程切换相比,线程之间的切换需要操作系统做的工作很少,线程切换只需要交换上下文信息即可
- 线程占用的资源比进程少很多
- 可以充分利用多处理器的可并行数量
- 在等待慢速I/O操作结束的同时,系统可以执行其他的计算任务
- 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
- I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作
缺点
- 性能损失
如果一个进程中的多个线程在频繁的进行切换,则程序的运行效率会降低,因为性能损失在了线程切换当中。进程的执行效率,随着线程数量的增多,性能呈现出正态分布的状况。 - 代码健壮性降低
一个线程的崩溃,会导致整个进程的崩溃 - 缺乏访问控制
多个线程在访问同一个变量时可能会导致程序结果的二义性
2. 线程控制
2.1 线程创建
函数接口
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
代码测试
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
struct ThreadNum
{
int thread_num_;
};
void* MyThreadStrat(void* arg)
{
struct ThreadNum* tn = (struct ThreadNum*)arg;
while(1)
{
printf("MyThreadStrat: %d\n", tn->thread_num_);
sleep(1);
}
delete tn;
return NULL;
}
int main()
{
pthread_t tid;
for(int i = 0; i < 2; i++)
{
struct ThreadNum* tn = new ThreadNum;
if(tn == NULL)
{
exit(1);
}
tn->thread_num_ = i;
int ret = pthread_create(&tid, NULL, MyThreadStrat, (void*)tn);
if(ret != 0)
{
perror("pthread_create");
return 0;
}
}
while(1)
{
printf("i am main thread\n");
sleep(1);
}
return 0;
}
注意:
- 线程入口函数的参数最好不传递临时变量,因为临时变量在创建完毕后会销毁,而线程中的arg指针就变成了野指针。尽量传递堆上开辟空间。
- 线程入口函数的参数如果传递的为堆上开辟的空间,则释放时是在线程不去使用这块空间的条件下释放。
- 线程入口函数的参数不仅可以传递内置类型还可以传递自定义类型。
2.2 线程终止
1. 线程入口函数的return返回,当前线程也进行了退出
2. 函数退出
函数一:
void pthread_exit(void* retval);
函数二:
int pthread_canael(pthread_t thread);
pthread_t pthread_self(void);
代码测试
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
void* MyThreadStrat(void* arg)
{
pthread_exit(NULL);
pthread_cancel(pthread_self());
printf("MyThreadStrat :%s\n", (char*)arg);
return NULL;
}
int main()
{
pthread_t tid;
for(int i = 0; i < 2; i++)
{
int ret = pthread_create(&tid, NULL, MyThreadStrat, NULL);
if(ret != 0)
{
perror("pthread_create");
return 0;
}
}
while(1)
{
printf("i am main thread\n");
sleep(1);
}
return 0;
}
注意:
默认创建线程时,线程的属性时joinable属性,joinable会导致线程在退出时,需要别人来回收自己的退出资源(即线程退出了,但是线程在共享区当中的空间还没有释放)。所以就需要线程等待或者线程分离,来解决当前问题。
2.3 线程等待
已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。创建新的线程不会复用刚才退出线程的地址空间。
int pthread_join(pthread_t thread, void **retval);
代码测试
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
void *thread1(void *arg)
{
printf("thread 1 returning ... \n");
int *p = (int*)malloc(sizeof(int));
*p = 1;
return (void*)p;
}
void *thread2(void *arg)
{
printf("thread 2 exiting ...\n");
int *p = (int*)malloc(sizeof(int));
*p = 2;
pthread_exit((void*)p);
}
void *thread3(void *arg)
{
while ( 1)
{
printf("thread 3 is running ...\n");
sleep(1);
}
return NULL;
}
int main()
{
pthread_t tid;
void *ret;
pthread_create(&tid, NULL, thread1, NULL);
pthread_join(tid, &ret);
printf("thread return, thread id %X, return code:%d\n", tid, *(int*)ret);
free(ret);
pthread_create(&tid, NULL, thread2, NULL);
pthread_join(tid, &ret);
printf("thread return, thread id %X, return code:%d\n", tid, *(int*)ret);
free(ret);
pthread_create(&tid, NULL, thread3, NULL);
sleep(3);
pthread_cancel(tid);
pthread_join(tid, &ret);
if ( ret == PTHREAD_CANCELED )
printf("thread return, thread id %X, return code:PTHREAD_CANCELED\n", tid);
else
printf("thread return, thread id %X, return code:NULL\n", tid);
return 0;
}
现象
2.4 线程分离
一个线程被设置为分离属性,则该线程在退出之后,不需要其他执行流回收该进程的资源,而是由操作系统统一回收。
函数接口
int pthread_detach(pthread_t thread)
代码实现
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
void* MyThreadStrat(void* arg)
{
(void)arg;
pthread_detach(pthread_self());
while(1)
{
printf("i am MyThreadStrat\n");
pthread_cancel(pthread_self());
sleep(1);
}
sleep(20);
return NULL;
}
int main()
{
pthread_t tid;
int ret = pthread_create(&tid, NULL, MyThreadStrat, NULL);
if(ret != 0)
{
perror("pthread_create");
return 0;
}
while(1)
{
printf("i am main thread\n");
sleep(1);
}
return 0;
}
执行一遍 函数内部数据就进行退出,退出时不需要手动释放资源,而是由操作系统进行资源的释放。
3. 线程安全
3.1 线程不安全的现象
以下大致模拟了一个黄牛抢票的系统,使用4个线程同时去抢1000张票,我们的预期结果是,4个线程没人拿到的票都不相同,不会拿到同一张票。但是:
代码
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#define THREADNUM 4
int g_val = 100;
void* buyTicket(void* arg)
{
while (1)
{
if (g_val > 0)
{
printf("%p:have ticket %d\n", pthread_self(), g_val);
g_val--;
}
else
{
break;
}
}
return NULL;
}
int main()
{
pthread_t tid[THREADNUM];
for (int i = 0; i < THREADNUM; i++)
{
int ret = pthread_create(&tid[i], NULL, buyTicket, NULL);
if (ret < 0)
{
perror("pthread_create");
return 0;
}
}
for (int i = 0; i < THREADNUM; i++)
{
pthread_join(tid[i], NULL);
}
return 0;
}
如上就是我们说的线程不安全的现象
当多个线程访问同一个资源时,这个资源不能够保持原子性,就有可能发生结果错误。
总结
假设有一个cpu,两个线程A和B,线程AB都想对全局变量g_i 做++ 操作。假设线程A先运行,但是线程A将g_i 读取到寄存器后,A的时间片使用完了被线程切换了。但是A没有对g_i 的值修改完成,而是被线程B读取修改完成了,等到线程切换回来之后,线程A还是对原本的g_i 修改,不是对线程B修改完后的值进行修改,两个线程都进行了++操作,但是结果不符合预期,所以造成了结果的错误。
3.1 如何解决–互斥锁
??上述黄牛抢票的问题,如果我们给上述的4个黄牛,只给一个特殊的令牌。只有抢到这个令牌之后,才有资格买票。但是买完票之后必须把令牌交出来,4个人再次公平竞争,抢到令牌的才可以去买票。这样就保证了每个人买的票都是唯一的,不会出现多人买一张票。我们将上述的令牌就叫做互斥锁。
3.1.1 互斥锁原理
??互斥锁保证多个执行流在访问同一个临界资源时,其操作时原子性的。
名词解释
- 执行流: 线程
- 临界资源: 多个线程都能访问到的资源
- 临界区: 访问临界资源的代码区被称为临界区
- 原子操作: 要么执行流还没有开始执行临界区代码,要么已经执行完毕临界区代码
原理 ??互斥锁的底层是一个互斥量,互斥量的本质时一个计数器,该计数器的取值只能为0或者1。0代表不能获取互斥锁,1标识可以获取互斥锁 。
加锁时原理
如何保证我们拿锁的这个过程是原子性操作?
??为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 ??在加锁之前,申请一个寄存器,寄存器中放入0,直接入内存中的值进行交换,交换后寄存器值有两种结果: 寄存器中为1: 加锁成功;寄存器中为0: 加锁失败 。
解锁时原理 直接将寄存器中的值置为1,不关心内存的值,直接交换完成就是解锁的过程。
3.1.2 互斥锁接口
1. 初始化接口
1.动态初始化: 必须手动销毁否则内存泄露
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
2.静态初始化: 系统自动回收
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
2. 销毁接口
int pthread_mutex_destroy(pthread_mutex_t *mutex);
3. 加锁接口
阻塞加锁: 如果没加上锁就一直加锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
非阻塞加锁: 需要搭配循环判断返回值
int pthread_mutex_trylock(pthread_mutex_t *mutex);
带有超时时间的加锁接口
int pthread_mutex_timelock(pthread_mutex_t* restrict mutex,
const struct timspec* restrict abs_timeout);
注意: 加锁位置一定要放在访问临界资源之前
4. 解锁接口
以上三种加锁的方式,都可以使用该接口解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);
注意: 在所有可能导致线程退出的地方进行解锁,否则可能造成死锁得情况
黄牛抢票改良
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#define THREADNUM 4
int g_val = 100;
pthread_mutex_t g_lock;
void* buyTicket(void* arg)
{
while (1)
{
pthread_mutex_lock(&g_lock);
if (g_val > 0)
{
printf("%p:have ticket %d\n", pthread_self(), g_val);
g_val--;
}
else
{
pthread_mutex_unlock(&g_lock);
break;
}
pthread_mutex_unlock(&g_lock);
}
return NULL;
}
int main()
{
pthread_mutex_init(&g_lock, NULL);
pthread_t tid[THREADNUM];
for (int i = 0; i < THREADNUM; i++)
{
int ret = pthread_create(&tid[i], NULL, buyTicket, NULL);
if (ret < 0)
{
perror("pthread_create");
return 0;
}
}
for (int i = 0; i < THREADNUM; i++)
{
pthread_join(tid[i], NULL);
}
pthread_mutex_destroy(&g_lock);
return 0;
}
程序运行之后,发现多人抢到一张票得情况消失。
3.2 死锁
3.2.1 死锁的现象
1.如果执行流加载完毕之后不进行解锁操作则会造成死锁现象
void* buyTicket(void* arg)
{
while (1)
{
pthread_mutex_lock(&g_lock);
if (g_val > 0)
{
printf("%p:have ticket %d\n", pthread_self(), g_val);
g_val--;
}
else
{
break;
}
}
return NULL;
}
上述代码会不会造成死锁现象,答案是会的,如果一开始这个进程拿到了锁。但是这个程序在循环结束后没有释放锁,并且在下一次循环还要去申请锁,就会导致阻塞在一直拿锁得这个地方,这就是所谓的死锁。
2.线程A获取到了1锁,线程B获取到了2锁,同时线程A还想获取1锁,线程B还想获取2锁。
pthread_mutex_t g_lock1;
pthread_mutex_t g_lock2;
void* MyThreadStartA(void* arg)
{
pthread_mutex_lock(&g_lock1);
sleep(3);
pthread_mutex_lock(&g_lock2);
return NULL;
}
void* MyThreadStartB(void* arg)
{
pthread_mutex_lock(&g_lock2);
sleep(3);
pthread_mutex_lock(&g_lock1);
return NULL;
}
这个表现得是两不相让,我有锁但是我不释放,你就是拿不走,互相制约就造成了死锁。
3.2.2 死锁的必要条件
- 互斥条件:一个执行流获取了互斥锁之后,其他执行流不能获取该锁
- 不可剥夺:A执行流拿着互斥锁,其他执行不能释放
- 循环等待:多个执行流各自拿着对方想要的锁,并且各个执行流还去请求对方的锁
- 请求与保持:拿着一把锁还去申请别的锁
3.2.3 避免产生死锁的方法
- 破环死锁的必要条件:循环等待,请求与保持
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
3.2.3 gdb调试排查死锁位置
1. 发生了死锁
2.查找进程号 ps aux 命令 3.调试正在死锁的进程gdb attach [pid] 命令 4. 查看所有线程的堆栈 thread apply all bt 命令 5. 进入线程的堆栈t 堆栈号 命令 6. 查看当前堆栈内容,并进入自定义函数内部 7. 打印进程锁变量查看具体信息 ower 字段标识进程锁目前在那个线程手中。
如上就是查看互斥锁在哪个位置发生了死锁。
3.3 线程同步
??线程同步指的是,在保证数据安全的情况下(互斥锁),让多个执行流按照特定的顺序进行临界区资源的访问,称之为同步。
??为什么要让多个执行流按照特定的顺序进行临界区的资源访问呢?
这里我们举一个取盘子与放盘子的例子,规定放盘子时要保证桌面上是空的没有盘子,取盘子时桌子上有且只有一个盘子。 ??因为盘子属于临界区资源,如果我们要取或者放时,必须保证这个操作是原子性的,所以在取和放动作之前都要进行加锁。但是对这个操作加锁了之后就能够完全保证桌面上只会出现两种情况吗?答案是不能的。因为取和放这两个操作是同步执行的,依靠两个线程的竞争能力去竞争那把锁,如果有放盘子线程的竞争能力特别强,那么就会出现桌面上的盘子已经放了高高一沓了,但是取盘子却一直都取不到盘子。这种结果显然不是我们想看到的,所以在这里用到了线程同步,让多个线程按照顺序去访问。 ??比如说,桌面上没有盘子时去通知放盘子的线程向桌面上放盘子,当桌面上有一个盘子时,放盘子的动作就应该停止,让取盘子的动作去取桌面上的盘子,这样就可以看到我们想看到结果。
3.3.1 条件变量
条件变量的本质是一个PCB等待队列,在该队列中存放的是线程或者进程的PCB。如果有多个线程,则将这些线程放入该等待队列,这些队列按照队列的方式顺序的出入队。
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。 例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
3.3.1 线程同步的接口函数
1. 初始化接口
动态初始化: 搭配销毁函数使用
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
静态初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
2. 销毁接口
int pthread_cond_destroy(pthread_cond_t *cond);
3. 等待接口
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
注意
- 为什么等待接口需要传入互斥锁变量?
??在等待函数内部,需要对互斥锁进行释放,释放之后其它线程才可以拿到锁正常的进行其它操作。当函数调用返回时,返回到了临界区内,所以该函数又会让线程重新持有该锁。 ??eg: 我们在取盘子时,如果桌面上没有盘子,则无法取盘子要通知放盘子的动作取放盘子,但是放盘子的动作是需要访问临界资源的,现在的锁在取盘子的线程中,所以放盘子拿不到锁就无法向桌面上放盘子,线程就进入了死锁状态。因此我们需要将取盘子线程放入等待队列时,把锁释放掉,放盘子就可以拿到锁,进而正常操作。
4. 唤醒接口
唤醒一个线程
int pthread_cond_signal(pthread_cond_t *cond);
唤醒所有线程
int pthread_cond_broadcast(pthread_cond_t *cond);
代码示例
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
int g_val = 1;
pthread_mutex_t g_lock;
pthread_cond_t g_cond;
void* Product(void* arg)
{
while (true)
{
pthread_mutex_lock(&g_lock);
while (g_val >= 1)
{
pthread_cond_wait(&g_cond, &g_lock);
}
g_val++;
printf("Product done..:%d\n", g_val);
pthread_mutex_unlock(&g_lock);
pthread_cond_signal(&g_cond);
}
}
void* Consume(void* arg)
{
while (true)
{
pthread_mutex_lock(&g_lock);
while (g_val <= 0)
{
pthread_cond_wait(&g_cond, &g_lock);
}
g_val--;
printf("Consume done..: %d\n", g_val);
pthread_mutex_unlock(&g_lock);
pthread_cond_signal(&g_cond);
}
}
int main()
{
pthread_t pt, ct;
pthread_mutex_init(&g_lock, NULL);
pthread_cond_init(&g_cond, NULL);
pthread_create(&pt, NULL, Product, NULL);
pthread_create(&ct, NULL, Consume, NULL);
pthread_join(pt, NULL);
pthread_join(ct, NULL);
return 0;
}
3.3.2 注意
如果有多个生产消费者,那么只用一个条件变量够吗?
上述代码中,只有单消费者,单生产者,如果有多个生产消费者,那么我们只有一个条件变量是不可以的。所有生产消费者都使用一个等待队列,那么一定会造成阻塞,即所有生产消费者全部都阻塞在一个等待队列中。
解释 如果所有生产消费者使用一个等待队列,那么它们一定会同时去抢同一个锁,谁有了锁谁就有了操作临界区资源的权力。 用上面放盘子的例子举例,如果开始桌面上没有盘子,那么取盘子的线程1拿到了锁,可以取盘子,但是没有盘子,取盘子线程1就会进入阻塞队列并且通知放盘子的人取放盘子。放盘子线程3抢到了锁,就会在桌面上放盘子,并且把锁放出来。如果放盘子线程4抢到了锁,放盘子4就会进入阻塞队列并且释放锁。那么这个时候如果放盘子3又抢到了锁,看到桌面上有盘子,它也被放入了阻塞队列。这个时候外面就只剩一个取盘子线程2了,这个线程一定会拿到锁,并且把桌面上的盘子取出来,取完之后,取盘子线程1会从函数调用返回,并且和线程2一起在阻塞队列外准备抢锁。这个时候取盘子线程2拿到了锁,一看桌子上没盘子了,进等待队列。锁就到了线程1手里,同样也进入了等待队列。那么这个时候,4个线程全部都在等待队列中,没有人去通知了。那么4个线程一直等待。程序死锁。
如何解决
生产者与消费者一定要有两个不同的等待队列,去相互唤醒才不会造成上述的局面。
改正后代码
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
int g_val = 1;
pthread_mutex_t g_lock;
pthread_cond_t p_cond;
pthread_cond_t c_cond;
void* Product(void* arg)
{
while (true)
{
pthread_mutex_lock(&g_lock);
while (g_val >= 1)
{
pthread_cond_wait(&p_cond, &g_lock);
}
g_val++;
printf("Product done..:%d\n", g_val);
pthread_mutex_unlock(&g_lock);
pthread_cond_signal(&c_cond);
}
}
void* Consume(void* arg)
{
while (true)
{
pthread_mutex_lock(&g_lock);
while (g_val <= 0)
{
pthread_cond_wait(&c_cond, &g_lock);
}
g_val--;
printf("Consume done..: %d\n", g_val);
pthread_mutex_unlock(&g_lock);
pthread_cond_signal(&p_cond);
}
}
int main()
{
pthread_t pt, ct;
pthread_mutex_init(&g_lock, NULL);
pthread_cond_init(&p_cond, NULL);
pthread_cond_init(&c_cond, NULL);
pthread_create(&pt, NULL, Product, NULL);
pthread_create(&ct, NULL, Consume, NULL);
pthread_join(pt, NULL);
pthread_join(ct, NULL);
pthread_mutex_destroy(&g_lock);
pthread_cond_destroy(&p_cond);
pthread_cond_destroy(&c_cond);
return 0;
}
4. 生产者与消费者模型
为何要使用生产者消费者模型?
??生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
例如:我们在超市购物,购买商品都是从超市中购买,而不是从每个商品的生产厂家去购买,如果我们从生产厂家直接去购买,就会非常麻烦,每个商品都有很多生产厂家,而且又有很多商品。如果我们用一个中间媒介,将生产厂家的物品全部放入这个媒介中,顾客在这个媒介就可以随意的挑选很多生产厂家其中的一个物品,大大提高了顾客的便利性,也让顾客与厂家的关系没有这么强。
关系 生产者消费者模型,本质上就是在一块内存空间中,多个线程负责生产,多个线程负责消费。
其中生产者与消费者的关系是:
- 生产者与生产者: 互斥关系,任何时间只允许一个操作临时变量
- 生产者与消费者: 同步关系
- 消费者与消费者: 互斥关系
使用该模型的代码实现
main函数
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
#include "BlockQueue.hpp"
void* Consume(void* arg)
{
BlockQueue* bq = (BlockQueue*)arg;
int data;
while (true)
{
bq->PopData(data);
cout << "consume: " << data << endl;
sleep(1);
}
}
void* Product(void* arg)
{
BlockQueue* bq = (BlockQueue*)arg;
int data;
while (true)
{
data = rand() % 5;
bq->PushData(data);
cout << "product: " << data << endl;
}
}
int main()
{
BlockQueue* bq = new BlockQueue;
pthread_t c, p;
pthread_create(&c, NULL, Consume, (void*)bq);
pthread_create(&p, NULL, Product, (void*)bq);
pthread_join(c, NULL);
pthread_join(p, NULL);
return 0;
}
#pragma once
#include <iostream>
#include <queue>
#include <stdlib.h>
#include <pthread.h>
using namespace std;
#define NUM 5
class BlockQueue
{
public:
BlockQueue(int cap = NUM)
: _cap(cap)
{
pthread_mutex_init(&lock, NULL);
pthread_cond_init(&c_cond, NULL);
pthread_cond_init(&p_cond, NULL);
}
void PushData(const int& data)
{
LockQueue();
while (IsFull())
{
NotifyConsume();
ProductWait();
std::cout << "queue full, NotifyConsume" << std::endl;
}
q.push(data);
NotifyConsume();
UnlockQueue();
}
void PopData(int& data)
{
LockQueue();
while (IsEmpty())
{
NotifyProduct();
ConsumeWait();
std::cout << "queue empty, NotifyProduct" << std::endl;
}
data = q.front();
q.pop();
NotifyConsume();
UnlockQueue();
}
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&c_cond);
pthread_cond_destroy(&p_cond);
}
private:
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
void ProductWait()
{
pthread_cond_wait(&p_cond, &lock);
}
void ConsumeWait()
{
pthread_cond_wait(&c_cond, &lock);
}
void NotifyProduct()
{
pthread_cond_signal(&p_cond);
}
void NotifyConsume()
{
pthread_cond_signal(&c_cond);
}
bool IsEmpty()
{
return q.empty();
}
bool IsFull()
{
return q.size() >= _cap;
}
private:
std::queue<int> q;
int _cap;
pthread_mutex_t lock;
pthread_cond_t p_cond;
pthread_cond_t c_cond;
};
5. 信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。这里我们介绍的信号量为POSIX标准 下的。
5.1 信号量本质
??信号量的本质其实是一个计数器+PCB等待队列共同构成,其中计数器是描述临界资源有效个数的一个计数器。其中计数器的操作一定是原子性的。当多个线程想要获取信号量时,都会对信号量中的资源计数器进行-1操作。 ??在信号量中,我们不用过多关心其中的原子性如何保持的,在接口内部已经将这种机制实现了。
资源计数器
> 0: 表示还有多少资源可以使用= 0: 表示没有资源可以被使用< 0: 表示有多少线程在等待资源
5.2 信号量的同步与互斥
互斥
初始化信号量的资源计数器为1 ,表示当前只有一个资源可以被使用。意味着只有同一个线程在同一时刻可以获取到信号量。
同步
资源计数器在初始化时为当前的资源数量,因为在对资源使用时,接口会自动管理当前计数器的数值。
5.3 信号量的接口函数
1. 初始化接口
int sem_init(sem_t *sem, int pshared, usigned int value);
参数:
sem: 传入待要初始化的信号量
pshard: 表示信号量用于线程间还是进程间
0 线程间 非0 进程间
value: 表示初始化的资源数量
2. 销毁接口
int sem_destroy(sem_t* sem);
3. 等待接口
调用该接口函数,会对资源计数器执行-1操作
int sem_wait(sem_t* sem);
4. 发布接口
调用该接口,会对资源计数器进行+1操作
int sem_post(sem_t* sem);
5.4 基于信号量完成生产者消费模型
此示例中使用循环队列存放任务
#pragma once
#include <iostream>
#include <vector>
#include <unistd.h>
#include <semaphore.h>
class RingQueue
{
public:
RingQueue(int cap = 10)
: _cap(cap)
, q(cap)
{
sem_init(&data_sem, 0, 0);
sem_init(&blank_sem, 0, cap);
sem_init(&lock_sem, 0, 1);
c_index = 0;
p_index = 0;
}
void Put(int& data)
{
P(data_sem);
sem_wait(&lock_sem);
q[c_index] = data;
c_index++;
c_index = c_index % _cap;
sem_post(&lock_sem);
V(blank_sem);
}
void Get(int& data)
{
P(blank_sem);
sem_wait(&lock_sem);
data = q[p_index];
p_index++;
p_index = p_index % _cap;
sem_post(&lock_sem);
V(data_sem);
}
~RingQueue()
{
sem_destroy(&data_sem);
sem_destroy(&blank_sem);
}
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
private:
std::vector<int> q;
int _cap;
sem_t data_sem;
sem_t blank_sem;
sem_t lock_sem;
int c_index;
int p_index;
};
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
#include "RingQueue.hpp"
#define THREADNUM 4
void* Consume(void* arg)
{
RingQueue* rq = (RingQueue*)arg;
int data = 0;
while (true)
{
rq->Get(data);
cout << "consume:" << data << endl;
sleep(1);
}
}
void* Product(void* arg)
{
RingQueue* rq = (RingQueue*)arg;
int data = 0;
while (true)
{
rq->Put(data);
data++;
cout << "product finish " << endl;
if (data == 10)
{
data = 0;
}
}
}
int main()
{
pthread_t c[THREADNUM], p[THREADNUM];
RingQueue* rq = new RingQueue;
for (int i = 0; i < THREADNUM; i++)
{
pthread_create(&c[i], NULL, Consume, rq);
pthread_create(&p[i], NULL, Product, rq);
}
for (int i = 0; i < THREADNUM; i++)
{
pthread_join(c[i], NULL);
pthread_join(p[i], NULL);
}
delete rq;
return 0;
}
6. 线程池
6.1 概念
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
6.2 应用场景
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程
可能使内存到达极限,出现错误.
6.3 线程池的代码实现
线程池代码
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <math.h>
#include <unistd.h>
class Task
{
private:
int base;
public:
Task()
{}
Task(int _b)
: base(_b)
{}
void Run()
{
std::cout << "pthread[" << pthread_self() << "]task run... done: base: " << base << " pow is: " << pow(base, 2) <<std::endl;
}
~Task()
{
}
};
class ThreadPool
{
private:
std::queue<Task*> q;
int max_num;
pthread_mutex_t lock;
pthread_cond_t cond;
public:
bool quitFlag;
public:
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
bool IsEmpty()
{
return q.size() == 0;
}
void ThreadWait()
{
pthread_cond_wait(&cond, &lock);
}
void ThreadWakeUp()
{
pthread_cond_signal(&cond);
}
void ThreadsWakeUp()
{
pthread_cond_broadcast(&cond);
}
public:
ThreadPool(int max = 5)
: max_num(max)
, quitFlag(false)
{}
static void* Routine(void* arg)
{
ThreadPool* this_pool = (ThreadPool*)arg;
while (!this_pool->quitFlag)
{
this_pool->LockQueue();
while (!this_pool->quitFlag && this_pool->IsEmpty())
{
this_pool->ThreadWait();
}
Task t;
if (!this_pool->quitFlag && !this_pool->IsEmpty())
{
this_pool->Get(t);
}
this_pool->UnlockQueue();
if(!this_pool->quitFlag && this_pool->IsEmpty())
{
t.Run();
}
}
}
void ThreadPoolInit()
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
pthread_t t;
for (int i = 0; i < max_num; i++)
{
pthread_create(&t, nullptr, Routine, this);
}
}
void Push(Task &in)
{
LockQueue();
q.push(&in);
UnlockQueue();
ThreadWakeUp();
}
void Get(Task &out)
{
Task* t = q.front();
q.pop();
out = *t;
}
void ThreadQuit()
{
if (!IsEmpty())
{
std::cout << "task queue is not IsEmpty" << std::endl;
return;
}
quitFlag = true;
ThreadsWakeUp();
pthread_exit(NULL);
}
~ThreadPool()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
};
测试代码
#include "ThreadPool.hpp"
int main()
{
ThreadPool *tp = new ThreadPool;
tp->ThreadPoolInit();
int count = 20;
while(count > 0)
{
int data = rand() % 10 + 1;
Task t(data);
tp->Push(t);
sleep(1);
count--;
}
tp->ThreadQuit();
return 0;
}
|