IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> C++知识库 -> linux下C语言实现多线程通信—环形缓冲区可用于生产者(producer)/消费者(consumer) -> 正文阅读

[C++知识库]linux下C语言实现多线程通信—环形缓冲区可用于生产者(producer)/消费者(consumer)

一、概念引入
日常生活中,每当我们缺少某些生活用品时,我们都会去超市进行购买,那么,你有没有想过,你是以什么身份去的超市呢?相信大部分人都会说自己是消费者,确实如此,那么既然我们是消费者,又是谁替我们生产各种各样的商品呢?当然是超市的各大供货商,自然而然地也就成了我们的生产者。如此一来,生产者有了,消费者也有了,那么将二者联系起来的超市又该作何理解呢?诚然,它本身是作为一座交易场所而诞生。

将上述场景例比到我们实际的软件开发过程中,经常会见到这样一幕:代码的某个模块负责生产数据(供货商),而生产出来的数据却不得不交给另一模块(消费者)来对其进行处理,在这之间我们必须要有一个类似上述超市的东西来存储数据(超市),这就抽象除了我们的生产者/消费者模型。
其中,产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者;生产者和消费者之间的中介就叫做缓冲区。

三者之间的结构图:
这里写图片描述
在这里插入图片描述

为了方便理解,再列举一个寄信的例子:
1、你把信写好——相当于生产者制造数据
2、你把信放入邮筒——相当于生产者把数据放入缓冲区
3、邮递员把信从邮筒取出——相当于消费者把数据取出缓冲区
4、邮递员把信拿去邮局做相应的处理——相当于消费者处理数据

二、为什么要使用生产者消费者模型
归根结底来说,生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

三、生产者/消费者模型的优点
1、解耦,即降低生产者和消费者之间的依赖关系。
例如上述写信的例子,如果不使用邮筒(也就是缓区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须得认识谁是邮递员,才能把信给他(光凭身上穿的制服,万一有人假冒,就惨了 )。这就产生和你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮筒相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。

2、支持并发,即生产者和消费者可以是两个独立的并发主体,互不干扰的运行。
从寄信的例子来看。如果没有邮筒,你得拿着信傻站在路口等邮递员过来收(相当于生产者阻塞);又或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。不管是哪种方法,效率都比较低。

3、支持忙闲不均,如果制造数据的速度时快时慢,缓冲区可以对其进行适当缓冲。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
为了充分复用,我们再拿寄信的例子来说事。假设邮递员一次只能带走1000封信。万一某次碰上情人节(也可能是圣诞节)送贺卡,需要寄出去的信超过1000封,这时候邮筒这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮筒中,等下次过来时再拿走。

四、生产者/消费者模型的记忆原则
为了方便记忆,我对其进行了如下总结:
三二一原则:三种关系、两个角色、一个场所
三种关系:
1生产者与生产者(互斥)
2生产者与消费者(同步与互斥)
3消费者与消费者(互斥)
两个角色:
1生产者
2消费者
一个场所:
1缓冲区

五、生产者/消费者模型的学习旅程
1、确定数据单元
★啥是数据单元
向缓冲区拿放数据的一个基本数据单元。简单地说,每次生产者放到缓冲区的,就是一个数据单元;每次消费者从缓冲区取出的,也是一个数据单元。
★数据单元的特性
◇关联到业务对象:数据单元必须关联到某种业务对象
◇完整性:保证每一个数据单元的完整
◇独立性:各个数据单元之间没有互相依赖
◇颗粒度:业务对象和数据单元之间的对应比例

2、学习队列缓冲区
★线程方式
◇内存分配的性能:内存分配的开销问题—->环形缓冲区
◇同步和互斥的性能:例如信号量、互斥量等的开销—->双缓冲区
◇适用于队列的场合:适用于数据流量不是很大的场合
★进程方式
◇匿名管道:生产者进程在管道的写端放入数据;消费者
进程在管道的读端取出数据。
好处:
1》跨平台发方便。
2》跨语言方便。
3》有利于降低开发、调试成本。
不足:
1》生产者进程和消费者进程必须得在同一台主机上,无法跨机器通讯。
2》只适用于一对一通信。
3》在某些情况下,程序不便于对管道进行操纵(比如调整管道缓冲区尺寸)。
4》只能单向通信。
◇SOCKET(TCP方式):类似IPC方式,同样保证了
数据的顺序到达;同样有缓冲的机制。
优点:
1》可以跨机器(便于实现分布式)。
2》便于将来扩展成为多对一或者一对多。
3》可以设置阻塞和非阻塞方法,用起来比较灵活。
4》支持双向通讯,有利于消费者反馈信息。

前言:
在嵌入式开发中,只要是带操作系统的,在其上开发产品应用,基本都需要用到多线程。
为了提高效率,尽可能的提高并发率。因此,线程之间的通信就是问题的核心。
根据当前产品需要,使用 环形缓冲区 解决。
一,环形缓冲区的实现
cbuf.h

#ifndef __CBUF_H__
#define __CBUF_H__

#ifdef __cplusplus
extern "C" {
#endif

/* Define to prevent recursive inclusion
-------------------------------------*/
#include "types.h"
#include "thread.h"


typedef    struct _cbuf
{
    int32_t        size;            /* 当前缓冲区中存放的数据的个数 */
    int32_t        next_in;        /* 缓冲区中下一个保存数据的位置 */
    int32_t        next_out;        /* 从缓冲区中取出下一个数据的位置 */
    int32_t        capacity;        /* 这个缓冲区的可保存的数据的总个数 */
    mutex_t        mutex;            /* Lock the structure */
    cond_t        not_full;        /* Full -> not full condition */
    cond_t        not_empty;        /* Empty -> not empty condition */
    void        *data[CBUF_MAX];/* 缓冲区中保存的数据指针 */
}cbuf_t;


/* 初始化环形缓冲区 */
extern    int32_t        cbuf_init(cbuf_t *c);

/* 销毁环形缓冲区 */
extern    void        cbuf_destroy(cbuf_t    *c);

/* 压入数据 */
extern    int32_t        cbuf_enqueue(cbuf_t *c,void *data);

/* 取出数据 */
extern    void*        cbuf_dequeue(cbuf_t *c);


/* 判断缓冲区是否为满 */
extern    bool        cbuf_full(cbuf_t    *c);

/* 判断缓冲区是否为空 */
extern    bool        cbuf_empty(cbuf_t *c);

/* 获取缓冲区可存放的元素的总个数 */
extern    int32_t        cbuf_capacity(cbuf_t *c);


#ifdef __cplusplus
}
#endif

#endif
/* END OF FILE
---------------------------------------------------------------*/

cbuf.c

#include "cbuf.h"



/* 初始化环形缓冲区 */
int32_t        cbuf_init(cbuf_t *c)
{
    int32_t    ret = OPER_OK;

    if((ret = mutex_init(&c->mutex)) != OPER_OK)    
    {
#ifdef DEBUG_CBUF
    debug("cbuf init fail ! mutex init fail !\n");
#endif
        return ret;
    }

    if((ret = cond_init(&c->not_full)) != OPER_OK)    
    {
#ifdef DEBUG_CBUF
    debug("cbuf init fail ! cond not full init fail !\n");
#endif
        mutex_destroy(&c->mutex);
        return ret;
    }

    if((ret = cond_init(&c->not_empty)) != OPER_OK)
    {
#ifdef DEBUG_CBUF
    debug("cbuf init fail ! cond not empty init fail !\n");
#endif
        cond_destroy(&c->not_full);
        mutex_destroy(&c->mutex);
        return ret;
    }

    c->size     = 0;
    c->next_in    = 0;
    c->next_out = 0;
    c->capacity    = CBUF_MAX;

#ifdef DEBUG_CBUF
    debug("cbuf init success !\n");
#endif

    return ret;
}


/* 销毁环形缓冲区 */
void        cbuf_destroy(cbuf_t    *c)
{
    cond_destroy(&c->not_empty);
    cond_destroy(&c->not_full);
    mutex_destroy(&c->mutex);

#ifdef DEBUG_CBUF
    debug("cbuf destroy success \n");
#endif
}



/* 压入数据 */
int32_t        cbuf_enqueue(cbuf_t *c,void *data)
{
    int32_t    ret = OPER_OK;

    if((ret = mutex_lock(&c->mutex)) != OPER_OK)    return ret;

    /*
     * Wait while the buffer is full.
     */
    while(cbuf_full(c))
    {
#ifdef DEBUG_CBUF
    debug("cbuf is full !!!\n");
#endif
        cond_wait(&c->not_full,&c->mutex);
    }

    c->data[c->next_in++] = data;
    c->size++;
    c->next_in %= c->capacity;

    mutex_unlock(&c->mutex);

    /*
     * Let a waiting consumer know there is data.
     */
    cond_signal(&c->not_empty);

#ifdef DEBUG_CBUF
//    debug("cbuf enqueue success ,data : %p\n",data);
    debug("enqueue\n");
#endif

    return ret;
}



/* 取出数据 */
void*        cbuf_dequeue(cbuf_t *c)
{
    void     *data     = NULL;
    int32_t    ret     = OPER_OK;

    if((ret = mutex_lock(&c->mutex)) != OPER_OK)    return NULL;

       /*
     * Wait while there is nothing in the buffer
     */
    while(cbuf_empty(c))
    {
#ifdef DEBUG_CBUF
    debug("cbuf is empty!!!\n");
#endif
        cond_wait(&c->not_empty,&c->mutex);
    }

    data = c->data[c->next_out++];
    c->size--;
    c->next_out %= c->capacity;

    mutex_unlock(&c->mutex);


    /*
     * Let a waiting producer know there is room.
     * 取出了一个元素,又有空间来保存接下来需要存储的元素
     */
    cond_signal(&c->not_full);

#ifdef DEBUG_CBUF
//    debug("cbuf dequeue success ,data : %p\n",data);
    debug("dequeue\n");
#endif

    return data;
}


/* 判断缓冲区是否为满 */
bool        cbuf_full(cbuf_t    *c)
{
    return (c->size == c->capacity);
}

/* 判断缓冲区是否为空 */
bool        cbuf_empty(cbuf_t *c)
{
    return (c->size == 0);
}

/* 获取缓冲区可存放的元素的总个数 */
int32_t        cbuf_capacity(cbuf_t *c)
{
    return c->capacity;
}

二,辅助文件
为了提高程序的移植性,对线程相关进行封装。
1,thread.h

#ifndef __THREAD_H__
#define __THREAD_H__

#ifdef __cplusplus
extern "C" {
#endif

/* Define to prevent recursive inclusion
-------------------------------------*/
#include "types.h"





typedef    struct _mutex
{
    pthread_mutex_t        mutex;
}mutex_t;


typedef    struct _cond
{
    pthread_cond_t        cond;
}cond_t;


typedef    pthread_t        tid_t;
typedef    pthread_attr_t    attr_t;
typedef    void*    (* thread_fun_t)(void*);


typedef    struct _thread
{
    tid_t            tid;
    cond_t            *cv;
    int32_t            state;
    int32_t            stack_size;
    attr_t         attr;
    thread_fun_t    fun;
}thread_t;



/* mutex */
extern    int32_t        mutex_init(mutex_t    *m);
extern    int32_t        mutex_destroy(mutex_t    *m);
extern    int32_t        mutex_lock(mutex_t    *m);
extern    int32_t        mutex_unlock(mutex_t    *m);


/* cond */
extern    int32_t        cond_init(cond_t    *c);
extern    int32_t        cond_destroy(cond_t    *c);
extern    int32_t        cond_signal(cond_t *c);
extern    int32_t        cond_wait(cond_t    *c,mutex_t *m);



/* thread */
/* 线程的创建,其属性的设置等都封装在里面 */
extern    int32_t        thread_create(thread_t *t);
//extern    int32_t        thread_init(thread_t    *t);

#define    thread_join(t, p)     pthread_join(t, p)
#define    thread_self()        pthread_self()
#define    thread_sigmask        pthread_sigmask


#ifdef __cplusplus
}
#endif

#endif
/* END OF FILE
---------------------------------------------------------------*/

2,thread.c

#include "thread.h"




/* mutex */
int32_t        mutex_init(mutex_t    *m)
{
    int32_t        ret = OPER_OK;

    if((ret = pthread_mutex_init(&m->mutex, NULL)) != 0)
        ret = -THREAD_MUTEX_INIT_ERROR;

    return ret;
}


int32_t        mutex_destroy(mutex_t    *m)
{
    int32_t        ret = OPER_OK;

    if((ret = pthread_mutex_destroy(&m->mutex)) != 0)
        ret = -MUTEX_DESTROY_ERROR;

    return ret;
}



int32_t        mutex_lock(mutex_t    *m)
{
    int32_t        ret = OPER_OK;

    if((ret = pthread_mutex_lock(&m->mutex)) != 0)
        ret = -THREAD_MUTEX_LOCK_ERROR;

    return ret;
}



int32_t        mutex_unlock(mutex_t    *m)
{
    int32_t        ret = OPER_OK;

    if((ret = pthread_mutex_unlock(&m->mutex)) != 0)
        ret = -THREAD_MUTEX_UNLOCK_ERROR;
    
    return ret;
}






/* cond */
int32_t        cond_init(cond_t    *c)
{
    int32_t        ret = OPER_OK;

    if((ret = pthread_cond_init(&c->cond, NULL)) != 0)
        ret = -THREAD_COND_INIT_ERROR;

    return ret;
}



int32_t        cond_destroy(cond_t    *c)
{
    int32_t        ret = OPER_OK;

    if((ret = pthread_cond_destroy(&c->cond)) != 0)
        ret = -COND_DESTROY_ERROR;
    
    return ret;
}



int32_t        cond_signal(cond_t *c)
{
    int32_t        ret = OPER_OK;


    if((ret = pthread_cond_signal(&c->cond)) != 0)
        ret = -COND_SIGNAL_ERROR;
    
    return ret;
}




int32_t        cond_wait(cond_t    *c,mutex_t *m)
{
    int32_t        ret = OPER_OK;

    if((ret = pthread_cond_wait(&c->cond, &m->mutex)) != 0)
        ret = -COND_WAIT_ERROR;    
    
    return ret;
}

/*

  • cbuf begin
    */
    #define OVER (-1)

static cbuf_t cmd;
static int line_1[200];
static int line_2[200];
//static int temp = 0;

static bool line1_finish = false;
static bool line2_finish = false;

void* producer_1(void *data)
{
int32_t i = 0;

for(i = 0; i < 200; i++)
{
    line_1[i] = i+1000;
    cbuf_enqueue(&cmd, &line_1[i]);

    if(0 == (i % 9)) sleep(1);
}

line1_finish = true;

return NULL;

}

void* producer_2(void *data)
{
int32_t i = 0;

for(i = 0; i < 200; i++)
{
    line_2[i] = i+20000;
    cbuf_enqueue(&cmd, &line_2[i]);

    if(0 == (i % 9)) sleep(1);
}

line2_finish = true;

return NULL;

}

void* consumer(void *data)
{
int32_t *ptr = NULL;

while(1)
{
    ptr = cbuf_dequeue(&cmd);
    printf("%d\n",*ptr);

    if(cbuf_empty(&cmd) && line2_finish && line1_finish)
    {
        printf("quit\n");
        break;
    }
}

return NULL;

}

三,测试
1,测试代码

void    test_cbuf_oper(void)
{
    pthread_t    l_1;
    pthread_t    l_2;
    pthread_t    c;
    
    cbuf_init(&cmd);

    pthread_create(&l_1,NULL,producer_1,0);
    pthread_create(&l_2,NULL,producer_2,0);
    pthread_create(&c,NULL,consumer,0);

    pthread_join(l_1,NULL);
    pthread_join(l_2,NULL);
    pthread_join(c,NULL);

    cbuf_destroy(&cmd);
}


void    test_cbuf(void)
{
    test_cbuf_oper();
}


/*
 * cbuf end
 */

在这里插入图片描述

  C++知识库 最新文章
【C++】友元、嵌套类、异常、RTTI、类型转换
通讯录的思路与实现(C语言)
C++PrimerPlus 第七章 函数-C++的编程模块(
Problem C: 算法9-9~9-12:平衡二叉树的基本
MSVC C++ UTF-8编程
C++进阶 多态原理
简单string类c++实现
我的年度总结
【C语言】以深厚地基筑伟岸高楼-基础篇(六
c语言常见错误合集
上一篇文章      下一篇文章      查看所有文章
加:2021-08-26 11:56:22  更:2021-08-26 11:58:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/14 8:18:39-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码