IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> [Linux]多线程 -> 正文阅读

[系统运维][Linux]多线程

●🧑个人主页:你帅你先说.
●📃欢迎点赞👍关注💡收藏💖
●📖既选择了远方,便只顾风雨兼程。
●🤟欢迎大家有问题随时私信我!
●🧐版权:本文由[你帅你先说.]原创,CSDN首发,侵权必究。

1. Linux线程概念

  • 在一个程序里的一个执行路线就叫做线程(thread)。更准确的定义是:线程是“一个进程内部的控制序列”
  • 一切进程至少都有一个执行线程。
  • 线程在进程内部运行,本质是在进程地址空间内运行。
  • 在Linux系统中,在CPU眼中,看到的PCB都要比传统的进程更加轻量化。
  • 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流。

??线程的优点

  1. 创建一个新线程的代价要比创建一个新进程小得多
  2. 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
  3. 线程占用的资源要比进程少很多
  4. 能充分利用多处理器的可并行数量
  5. 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务。
  6. 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现。(加密,大数据运算等)
  7. I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。(网络下载、云盘、ssh、在线直播、看电影)

??线程的缺点

  1. 性能损失
    一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。
  2. 健壮性降低
    编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。
  3. 缺乏访问控制
    进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。
  4. 编程难度提高
    编写与调试一个多线程程序比单线程程序困难得多

值得注意的是Linux在设计线程时,并没有单独设计一个结构体为线程,而是用进程的pcb来模拟线程。这样的好处是不用维护复杂的进程和线程的关系,不用单独为线程设计任何算法,直接使用进程的一套相关的方法。OS只需要聚焦在线程间的资源分配上就可以了

进程是资源分配的基本单位。
线程是CPU调度的基本单位,承担进程资源的一部分的基本实体。
线程共享进程数据,但也拥有自己的一部分数据。

进程的多个线程共享同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:

  • 文件描述符表
  • 每种信号的处理方式(SIG_ IGN、SIG_ DFL或者自定义的信号处理函数)
  • 当前工作目录
  • 用户id和组id

2.线程控制

2.1线程创建

功能:创建一个新的线程 
原型: int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void * (*start_routine)(void*), void *arg); 
参数: 
	thread:返回线程ID 
	attr:设置线程的属性,attr为NULL表示使用默认属性 
	start_routine:是个函数地址,线程启动后要执行的函数 
	arg:传给线程启动函数的参数 
返回值:成功返回0;失败返回错误码
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void* thread_run(void* args)
{
	while(1)
	{
		//输出新线程
		printf("我是新线程[%s],我创建的线程ID是: %lu\n",(const char*)args,pthread_self());
		sleep(1);
	}
}
int main()
{
	pthread_t tid;
	//创建新线程
	pthread_create(&tid,NULL,thread_run,(void*)"new  thread");
	while(1)
	{
		printf("我是主线程,我创建的线程ID是: %lu\n",tid);
		sleep(1);
	}
}

注意,pthread_create这个接口不是系统提供的,而是第三方库,所以我们还需要链接pthread
在makefile中应该这样

my_thread:mythread.c
	gcc -o $@ $^ -lpthread
.PHONY:clean
clean:
	rm -f mythread 

在这里插入图片描述
在这里插入图片描述
我们发现主线程的PID和它的LWP(轻量级进程)是一样的,而新的线程PID和主线程一样,LWP不同。
若创建多个线程就可以执行多个任务,但创建多线程会导致健壮性下降,即其中一个线程出现问题,其它线程会受牵连。
不知道大家有没有发现,LWP好像和我们打印出的ID不一样。其实我们查看到的线程ID是pthread库的线程id,不是Linux内核中的LWP,pthread库的线程id是一个内存地址。

2.2线程等待

一般而言,线程也是需要等待的,如果不等待,可能会导致类似于"僵尸进程"的问题。

功能:等待线程结束 
原型: int pthread_join(pthread_t thread, void **retval); 
参数: 
	thread:线程ID 
	retval:输出型参数,它指向一个指针。
返回值:成功返回0;失败返回错误码
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void* thread_run(void* args)
{
	while(1)
	{
		//输出新线程ID
		printf("我是新线程[%s],我创建的线程ID是: %lu\n",(const char*)args,pthread_self());
		sleep(5);
		break;
	}
	return (void*)666;//返回值可以是任何类型,但不能时临时变量。可以是int、对象的地址等
}
int main()
{
	pthread_t tid;
	//创建新线程
	pthread_create(&tid,NULL,thread_run,(void*)"new  thread");
	void* status = NULL;
	//等待线程
	pthread_join(tid,&status);
	printf("ret: %d\n",(int)status);
}

在这里插入图片描述
有人可能会有疑问,线程异常的情况这个函数好像没有处理。实际上根本不需要,因为当这个线程崩掉了,其它线程也会跟着崩,这个时候轮不到线程来处理,而是进程来处理。

2.3线程终止

1.函数中return(main函数中退出return代表主线程和进程退出),其它线程函数return,只代表当前线程退出。
2.新线程通过pthread_exit终止自己。(不能用exit进行退出,exit的对象是进程,进程退出后所有线程也就退出了。)
3.取消目标线程。

功能:线程终止 
原型: void pthread_exit(void *value_ptr); 
参数: 
	retval:输出型参数,它指向一个指针。
返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者。
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void* thread_run(void* args)
{
	while(1)
	{
		//输出新线程ID
		printf("我是新线程[%s],我创建的线程ID是: %lu\n",(const char*)args,pthread_self());
		sleep(5);
		break;
	}
	pthread_exit((void*)123);
}
int main()
{
	pthread_t tid;
	//创建新线程
	pthread_create(&tid,NULL,thread_run,(void*)"new  thread");
	void* status = NULL;
	//等待线程
	pthread_join(tid,&status);
	printf("ret: %d\n",(int)status);
}

在这里插入图片描述

功能:取消一个执行中的线程 
原型: int pthread_cancel(pthread_t thread); 参数: 
	thread:线程ID 
返回值:成功返回0;失败返回错误码
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void* thread_run(void* args)
{
	while(1)
	{
		//输出新线程ID
		printf("我是新线程[%s],我创建的线程ID是: %lu\n",(const char*)args,pthread_self());
		sleep(2);
		//注意这里必须是死循环,若是循环正常结束则可能导致线程不是被取消而是正常退出的。
	}

}
int main()
{
	pthread_t tid;
	//创建新线程
	pthread_create(&tid,NULL,thread_run,(void*)"new  thread");
	printf("wait sub thread...\n");
	sleep(5);
	printf("cancel sub thread..\n");
	//取消线程
	pthread_cancel(tid);
	void* status = NULL;
	//等待线程
	pthread_join(tid,&status);
	printf("ret: %d\n",(int)status);
}

在这里插入图片描述
线程成功被取消,退出码为-1。-1实际上在系统中是这样定义的。

#define PTHREAD_CANCELED (void*)-1

这个函数也可以用来取消主线程,但一般不会那样做,否则会出现类似于"僵尸进程"的情况。

2.4线程分离

分离之后的线程不需要被等待,运行完毕之后,会自动释放。

功能:分离一个执行中的线程 
原型: int pthread_detach(pthread_t thread); 参数: 
	thread:线程ID 
返回值:成功返回0;失败返回错误码
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void* thread_run(void* args)
{
	pthread_detach(pthread_self());
	while(1)
	{
		//输出新线程ID
		printf("我是新线程[%s],我创建的线程ID是: %lu\n",(const char*)args,pthread_self());
		sleep(2);
		break;
	}
	return (void*)666;
}
int main()
{
	pthread_t tid;
	//创建新线程
	pthread_create(&tid,NULL,thread_run,(void*)"new  thread");
	printf("wait sub thread...\n");
	sleep(1);
	void* status = NULL;
	//等待线程
	pthread_join(tid,&status);
	printf("ret: %d\n",(int)status);
}

在这里插入图片描述
此时并没有拿到我们所预期的返回值,说明线程分离成功了。
因为多个线程是共享地址空间的,也就是很多资源都是共享的。优点是通信方便,缺点是缺乏访问控制。因为一个线程的操作问题,给其他线程造成了不可控,或者引起奔溃、异常,逻辑不正确等这种现象,这就导致了线程的安全问题。如果创建一个函数想要没有线程安全问题,就不要使用全局变量、STL、malloc、new等会在全局内有效的数据。因为如果都是局部变量,线程有自己的独立栈结构。

一张图总结线程与其它内核结构的关系。
在这里插入图片描述

2.5线程互斥

  1. 临界资源:被线程共享访问的资源。
  2. 临界区:代码中访问临界资源的代码
  3. 互斥或同步是对临界区进行保护的功能。本质是对临界资源的保护
  4. 互斥:在任意时刻,只允许一个执行流访问某段代码。
  5. 同步:一般而言,让访问临界资源的过程在安全的前提下,让访问资源都具有一定的顺序性
  6. 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。

让我们通过一个例子来感受一下互斥机制的必要性。

#include <iostream>
#include <cstdio>
#include <string>
#include <ctime>
#include <mutex>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
using namespace std;
// 抢票逻辑,1000张票,5线程同时再抢
int tickets = 1000;

void *ThreadRoutine(void *args)
{
    int id = *(int*)args;
    delete (int*)args;
    while(true)
    {
        if(tickets > 0)
        {
            usleep(1000);//1000微秒
            cout << "我是[" << id << "] 我要抢的票是: " << tickets << endl;
            tickets--;
        }
        else
        {
            break;
        }
    }

}

int main()
{
    pthread_t tid[5];

    for(int i = 0; i < 5; i++)
    {
        int *id = new int(i);
        pthread_create(tid+i, nullptr, ThreadRoutine,id);
    }

    for(int i = 0 ; i < 5; i++)
    {
        pthread_join(tid[i], nullptr);
    }
    return 0;
}

在这里插入图片描述
你会发现最终票的数量成负数了。这是什么原因呢?
假设有一个线程A和一个线程B,刚开始线程A执行while循环里的代码,即把tickets加载到CPU做运算,假设tickets减减了10次后时间片到了被切走了,此时还没有把数据反馈给内存,线程A就带着它的上下文数据放到等待队列中了,然后线程B来了,对tickets减减了900次并反馈给内存后被切走,此时线程A又来了,带着它的上下文数据,即线程A中tickets保存的是990,然后tickets减减100次为890,最后反馈到内存中,此时890把原来的100给覆盖了。那为什么会出现票数减到负数的情况?
在这里插入图片描述
假设线程1执行到这段代码时被切走且tickets为1,此时线程2和线程3进来了,而线程2和线程3是继续执行这段代码之后的代码,都还没来得及判断就减减了,此时两个线程减减了两次就导致了出现负数。这段区域的代码就是临界区,而tickets就是临界资源。
要想解决这个问题,就得用一把"锁"来控制。

#include <iostream>
#include <cstdio>
#include <string>
#include <ctime>
#include <mutex>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
using namespace std;
// 抢票逻辑,1000张票,5线程同时再抢
class Ticket
{
private:
    int tickets;
    pthread_mutex_t mtx;
public:
    Ticket():tickets(1000)
    {
        //初始化锁
        pthread_mutex_init(&mtx,nullptr);
    }
    bool GetTicket()
    {
        bool res = true;
        //上锁
        usleep(100);
        pthread_mutex_lock(&mtx);
        //上锁之后以下代码就只能串行执行
        if(tickets > 0)
        {
            usleep(1000);//1000微秒
            cout << "我是[" << pthread_self() << "] 我要抢的票是: " << tickets << endl;
            tickets--;
        }
        else
        {
            cout << "票被抢空" << endl;
            res = false;
        }
        //解锁
        pthread_mutex_unlock(&mtx);
        return res;

    }
    ~Ticket()
    {
    	//销毁锁
        pthread_mutex_destroy(&mtx);
    }
};
void *ThreadRoutine(void *args)
{
    Ticket* t = (Ticket*)args;

    while(true)
    {
    	//抢票失败则退出
        if(!t->GetTicket())
            break;
    }

}
int main()
{
    Ticket* t = new Ticket();
    pthread_t tid[5];

    for(int i = 0; i < 5; i++)
    {
        int* id = new int(i);
        pthread_create(tid+i, nullptr, ThreadRoutine,(void*)t);
    }

    for(int i = 0 ; i < 5; i++)
    {
        pthread_join(tid[i], nullptr);
    }
    return 0;
}

在这里插入图片描述
这样就是一个线程安全的抢票逻辑了。
锁的原理
我们能用锁的前提是锁本身是安全的,那为什么锁本身是安全的呢?
在Linux中,只有一行汇编的是原子的(即安全的)。
在这里插入图片描述
在这里插入图片描述
此时即使其它线程执行这段代码也只是跟0进行交换,这就保证了锁的原子性。
在这里插入图片描述
线程被切走后保留上下文数据保证了锁的原子性,即使在临界区线程被切走,但是保留了上下文数据,相当于你把锁也给带走了,所以其它线程也进不了临界区。
可重入VS线程安全

  • 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
  • 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

常见的线程不安全的情况

  • 不保护共享变量的函数
  • 函数状态随着被调用,状态发生变化的函数
  • 返回指向静态变量指针的函数
  • 调用线程不安全函数的函数

常见的线程安全的情况

  • 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
  • 类或者接口对于线程来说都是原子操作
  • 多个线程之间的切换不会导致该接口的执行结果存在二义性

常见不可重入的情况

  • 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
  • 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
  • 可重入函数体内使用了静态的数据结构

常见可重入的情况

  • 不使用全局变量或静态变量
  • 不使用用malloc或者new开辟出的空间
  • 不调用不可重入函数
  • 不返回静态或全局数据,所有数据都有函数的调用者提供
  • 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据

可重入与线程安全联系

  • 函数是可重入的,那就是线程安全的
  • 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
  • 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。

可重入与线程安全区别

  • 可重入函数是线程安全函数的一种
  • 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
  • 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。

红色的字要怎么理解?
举个例子

void insert(Node** node)
{
	lock();
	//当执行到这时线程被切走,此时会发送信号,会调用信号捕捉函数
	//
	unlock();
}
void handler(Node** node)
{
	insert(*Node);//信号捕捉函数里还有一个insert,此时已经没有锁可以申请了,而这个线程却还在等待锁,这就造成了死锁问题
}
int main()
{
	Node* node;
	insert(&Node);
}
//这个是线程安全的但不是可重入的。

2.6线程同步

条件变量
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
直接来看代码感受一下

#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mtx;
pthread_cond_t cond;

//ctrl thread 控制work线程,让他定期运行
void *ctrl(void *args)
{
    std::string name = (char*)args;
    while(true)
    {
        sleep(1);//这里休眠的目的是留出时间让线程B进入等待队列
        std::cout << "master say : begin work" << std::endl;
        //唤醒在条件变量在cond 等待队列里等待的第一个线程
        pthread_cond_signal(&cond); 
    }
}

void *work(void *args)
{
    int number = *(int*)args;
    delete (int*)args;

    while(true)
    {
        pthread_cond_wait(&cond, &mtx);
        std::cout << "worker: " << number << " is working ..." << std::endl;
    }
}
#define NUM 3
int main()
{
    pthread_mutex_init(&mtx, nullptr);
    pthread_cond_init(&cond, nullptr);

    pthread_t master;
    pthread_t worker[NUM];
    pthread_create(&master, nullptr, ctrl, (void*)"boss");
    for(int i = 0; i < NUM; i++)
    {
        int *number = new int(i);
        pthread_create(worker+i, nullptr, work, (void*)number);
    }

    for(int i = 0; i < NUM; i++)
    {
        pthread_join(worker[i], nullptr);
    }
    pthread_join(master, nullptr);

    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);
    
    return 0;
}

在这里插入图片描述
当然,我们也可以一次唤醒所有线程

#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mtx;
pthread_cond_t cond;

//ctrl thread 控制work线程,让他定期运行
void *ctrl(void *args)
{
    std::string name = (char*)args;
    while(true)
    {
       
        std::cout << "master say : begin work" << std::endl;
        //唤醒在条件变量在cond 等待队列里等待的第一个线程
        //pthread_cond_signal(&cond); 
        //唤醒所有线程
        pthread_cond_broadcast(&cond);
        sleep(1);
    }
}

void *work(void *args)
{
    int number = *(int*)args;
    delete (int*)args;

    while(true)
    {
        pthread_cond_wait(&cond, &mtx);
        std::cout << "worker: " << number << " is working ..." << std::endl;
    }
}
#define NUM 3
int main()
{
    pthread_mutex_init(&mtx, nullptr);
    pthread_cond_init(&cond, nullptr);

    pthread_t master;
    pthread_t worker[NUM];
    pthread_create(&master, nullptr, ctrl, (void*)"boss");
    for(int i = 0; i < NUM; i++)
    {
        int *number = new int(i);
        pthread_create(worker+i, nullptr, work, (void*)number);
    }

    for(int i = 0; i < NUM; i++)
    {
        pthread_join(worker[i], nullptr);
    }
    pthread_join(master, nullptr);

    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);
    
    return 0;
}

在这里插入图片描述
这个时候所有线程就是一起出现的。
生产者-消费者模型
在这里插入图片描述
从生活的角度理解,假设有一家超市,你去超市购物你就算消费者,那生产者是谁?是超市吗?显然不是,是供货商,超市只是个交易场所。
生产者-消费者模型反映出3种关系,2种角色,1个交易场所。
3种关系: 即供货商与供货商之间的竞争关系(从线程来看就是互斥)、消费者和消费者间的竞争关系(从线程来看就是互斥)、供货商和消费者之间的互斥关系(从线程来看就是同步)。
2种角色: 生产者和消费者。
1个交易场所: 超市。

基于BlockingQueue的生产者消费者模型
在这里插入图片描述
进程间通信的本质就是生产消费模型。
接下来通过一个案例来深入理解生产者-消费者模型
BlockQueue.hpp

//.hpp -> 开源软件使用 -> 声明和实现可以放在一个文件里
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>

namespace ns_blockqueue
{
    const int default_cap = 5;//默认大小

    template <class T>
    class BlockQueue
    {
    private:
        std::queue<T> bq_;    //阻塞队列
        int cap_;             //队列的元素上限
        pthread_mutex_t mtx_; //保护临界资源的锁
        // 1. 当生产满了的时候,就应该不要生产了(不要竞争锁了),而应该让消费者来消费
        // 2. 当消费空了,就不应该消费(不要竞争锁了),应该让生产者来进行生产
        pthread_cond_t is_full_; //队列是否满的条件变量
        pthread_cond_t is_empty_; //队列是否为空的条件变量
    private:
        bool IsFull()
        {
            return bq_.size() == cap_;
        }
        bool IsEmpty()
        {
            return bq_.size() == 0;
        }
        void LockQueue()
        {
            pthread_mutex_lock(&mtx_);
        }
        void UnlockQueue()
        {
            pthread_mutex_unlock(&mtx_);
        }
        void ProducterWait()
        {
            // pthread_cond_wait
            // 1. 调用的时候,会首先自动释放mtx_,然后再挂起自己,此时让消费者来竞争锁从队列中拿数据
            // 2. 返回的时候,会首先自动竞争锁,获取到锁之后,才能返回!
            pthread_cond_wait(&is_empty_, &mtx_);
        }
        void ConsumerWait()
        {
            pthread_cond_wait(&is_full_, &mtx_);
        }
        void WakeupComsumer()
        {
            pthread_cond_signal(&is_full_);
        }
        void WakeupProducter()
        {
            pthread_cond_signal(&is_empty_);
        }

    public:
        BlockQueue(int cap = default_cap) : cap_(cap)
        {
            pthread_mutex_init(&mtx_, nullptr);
            pthread_cond_init(&is_empty_, nullptr);
            pthread_cond_init(&is_full_, nullptr);
        }
        ~BlockQueue()
        {
            pthread_mutex_destroy(&mtx_);
            pthread_cond_destroy(&is_empty_);
            pthread_cond_destroy(&is_full_);
        }

    public:
        void Push(const T &in)
        {
            LockQueue();
            //临界区
            //我们需要进行条件检测的时候,这里需要使用循环方式
            //来保证退出循环一定是因为条件不满足导致的!
            while (IsFull())
            {
                //等待的,把线程挂起,我们当前是持有锁的。
                ProducterWait();
            }
            //向队列中放数据,生产函数
            bq_.push(in);

            UnlockQueue();
            WakeupComsumer();
        }

        void Pop(T *out)
        {
            LockQueue();
            //从队列中拿数据。
            //我们需要进行条件检测的时候,这里需要使用循环方式
            //来保证退出循环一定是因为条件不满足导致的。
            while (IsEmpty())
            { 
                //无法消费
                ConsumerWait();
            }
            *out = bq_.front();//拿到pop掉的数据
            bq_.pop();

            UnlockQueue();
            WakeupProducter();
        }
    };
}

Main.c++

#include "BlockQueue.hpp"
#include "Task.hpp"

#include <time.h>
#include <cstdlib>
#include <unistd.h>

using namespace ns_blockqueue;
using namespace ns_task;

void *consumer(void *args)
{
    BlockQueue<Task> *bq = (BlockQueue<Task> *)args;
    while (true)
    {
        Task t;
        bq->Pop(&t); //这里完成了任务消费的第1步
        t();         //这里完成了任务消费的第2步
    }
}

void *producter(void *args)
{
    BlockQueue<Task> *bq = (BlockQueue<Task> *)args;
    std::string ops = "+-*/%";
    while (true)
    {
        int x = rand() % 20 + 1; //[1,20]
        int y = rand() % 10 + 1; //[1,10]
        char op = ops[rand() % 5];
        Task t(x, y, op);
        std::cout << "生产者派发了一个任务: " << x << op << y << "=?" << std::endl;
        // 2. 将数据推送到任务队列中
        bq->Push(t);
        sleep(1);
    }
}
#define Num 5
int main()
{
    srand((long long)time(nullptr));
    BlockQueue<Task> *bq = new BlockQueue<Task>();
    pthread_t p, c[Num];
    pthread_create(&p, nullptr, producter, (void *)bq);
    for (int i = 0; i < Num; i++)
    {
        pthread_create(c + i, nullptr, consumer, (void *)bq);
    }
    pthread_join(p, nullptr);
    for (int i = 0; i < Num; i++)
    {
        pthread_join(c[i], nullptr);
    }

    return 0;
}

Task.hpp

#pragma once

#include <iostream>
#include <pthread.h>

namespace ns_task
{
    class Task
    {
    private:
        int x_;
        int y_;
        char op_;

    public:
        Task() {}
        Task(int x, int y, char op) : x_(x), y_(y), op_(op) {}
        int Run()
        {
            int res = 0;
            switch (op_)
            {
            case '+':
                res = x_ + y_;
                break;
            case '-':
                res = x_ - y_;
                break;
            case '*':
                res = x_ * y_;
                break;
            case '/':
                res = x_ / y_;
                break;
            case '%':
                res = x_ % y_;
                break;
            default:
                std::cout << "Error!" << std::endl;
                break;
            }
            std::cout << "当前任务正在被: " << pthread_self() << " 处理: "
                      << x_ << op_ << y_ << "=" << res << std::endl;
            return res;
        }
        //重载()
        int operator()()
        {
            return Run();
        }
        ~Task() {}
    };
}

在这里插入图片描述
从输出的数据来看,确实符合队列先进先出的特点。

3.信号量

3.1概念

在学进程的时候我们已经讲过信号量的概念了。信号量的本质就是一个计数器,描述临界资源中资源数目的大小。临界资源如果可以被划分成为一个一个的小资源,就可能实现让多个线程同时访问临界资源的不同区域,从而实现并发。

每个线程要想访问临界资源,都得先申请信号量资源。

3.2信号量函数

初始化信号量

#include <semaphore.h> 
int sem_init(sem_t *sem, int pshared, unsigned int value); 
参数:
	pshared:0表示线程间共享,非零表示进程间共享 
	value:信号量初始值

等待信号量

功能:等待信号量,会将信号量的值减1 
int sem_wait(sem_t *sem); //P操作

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1int sem_post(sem_t *sem);//V操作

销毁信号量

int sem_destroy(sem_t *sem);

基于环形队列的生产消费模型
ring_queue.hpp

#pragma once

#include <iostream>
#include <vector>
#include <semaphore.h>

namespace ns_ring_queue
{
    const int g_cap_default = 10;

    template <class T>
    class RingQueue
    {
    private:
        std::vector<T> ring_queue_;
        int cap_;
        sem_t blank_sem_;
        sem_t data_sem_;

        int c_step_;
        int p_step_;

    public:
        RingQueue(int cap = g_cap_default) : ring_queue_(cap), cap_(cap)
        {
            sem_init(&blank_sem_, 0, cap);
            sem_init(&data_sem_, 0, 0);
            c_step_ = p_step_ = 0;
        }
        ~RingQueue()
        {
            sem_destroy(&blank_sem_);
            sem_destroy(&data_sem_);
        }

    public:
        void Push(const T &in)
        {
            //生产接口
            sem_wait(&blank_sem_); // P(空位置)

            ring_queue_[p_step_] = in;
            sem_post(&data_sem_); // V(数据)

            p_step_++;
            p_step_ %= cap_;
        }
        void Pop(T *out)
        {
            //消费接口
            sem_wait(&data_sem_);
            *out = ring_queue_[c_step_];
            sem_post(&blank_sem_);

            c_step_++;
            c_step_ %= cap_;
        }
    };
}

ring.c++

#include "ring_queue.hpp"
#include <pthread.h>
#include <time.h>
#include <unistd.h>

using namespace ns_ring_queue;

void *consumer(void *args)
{
    RingQueue<int> *rq = (RingQueue<int> *)args;
    while (true)
    {
        int data = 0;
        rq->Pop(&data);
        std::cout << "消费数据是: " << data << std::endl;
    }
}

void *producter(void *args)
{
    RingQueue<int> *rq = (RingQueue<int> *)args;
    while (true)
    {
        int data = rand() % 20 + 1;
        std::cout << "生产数据是:  " << data << std::endl;
        rq->Push(data);
        sleep(1);
    }
}

int main()
{
    srand((long long)time(nullptr));
    RingQueue<int> *rq = new RingQueue<int>();

    pthread_t c, p;
    pthread_create(&c, nullptr, consumer, (void *)rq);
    pthread_create(&p, nullptr, producter, (void *)rq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    return 0;
}

在这里插入图片描述

喜欢这篇文章的可以给个一键三连点赞👍关注💡收藏💖

  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2022-05-21 19:18:51  更:2022-05-21 19:19:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/2 0:02:57-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码