IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> C++知识库 -> 生产者消费者问题 -> 正文阅读

[C++知识库]生产者消费者问题

生产者消费者是极其经典的并发同步模型,描述了在共享固定大小缓冲区下,生产者生产一定量数据放入缓冲区,而消费者则从缓冲区消费取出一定量数据。

生产者消费者问题还有一个名字是有限缓冲问题。

生产者消费者更加强调了两边写缓冲线程的角色,而有限缓冲则将目光聚焦到中间的缓冲

semaphore 实现

semaphore(信号量)是一个变量或者抽象数据类型,被用于控制在并发系统临界区问题多个线程对公共资源的访问。与自旋锁、栅栏一起作为同步手段。一个普通的信号量是单纯取决于程序员定义条件而改变的变量。

可以将信号量看作是特定资源的数量记录,并耦合调整该记录的安全操作。

生产者消费者的有限缓冲实际上就可以看作是特定资源,因此可以使用信号量来记录有限缓冲的数量。

以下是c++版本的实现,当然为了控制对buffer写操作时可能碰到的多个生产者消费者同时取放数据导致的写冲突,也在其中加了互斥锁

#include <thread>
#include <mutex>
#include <semaphore>

std::counting_semaphore<N> number_of_queueing_portions{0};
std::counting_semaphore<N> number_of_empty_positions{N};
std::mutex buffer_manipulation;

void producer(){
    for(;;) {
        Portion portion = produce_next_portion();
        number_of_empty_positions.acquire();
        {
            std::lock_guard<std::mutex> g(buffer_manipulation);
            add_portion_to_buffer(portion);
        }
        number_of_queueing_portions.release();
    }
}

void consumer(){
    for(;;) {
        number_of_queueing_portions.acquire();
        Portion portion;
        {
            std::lock_guard<std::mutex> g(buffer_manipulation);
            portion = take_portion_from_buffer();
        }
        number_of_empty_positions.release();
        process_portion_taken(portion);
    }
}

int main(int argc, char const *argv[])
{
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
    return 0;
}

monitor实现

monitor(管程)是允许线程具有互斥、等待(堵塞)某个条件为false的能力的抽象数据结构。还具有通知其他线程他们特定条件已经满足的机制,以及让他们暂时放弃独占访问,以便等待某些条件满足,然后重新获取独占访问并恢复他们的任务。

管程由互斥锁以及特定条件变量组成。条件变量本质上是等待特定条件的线程的容器

管程和信号量一个明显的区别就是信号量是对共享资源数量的记录,wait(),notify()是仅有能修改共享资源数量的记录的方法,并且这种修改是互斥的

管程则是条件变量(等待满足特定条件线程容器)以及操作该变量的程序

class Bounded_buffer{
    Portion buffer[N];
    unsigned head, tail;
    unsigned count;
    std::condition_variable nonempty, nonfull;
    std::mutex mtx;

public:
    void append(Portion x){
        std::unique_lock<std::mutex> lck(mtx);
        nonfull.wait(lck, [&]{return !(N==count);});
        assert(0<=count && count < N)
        buffer[tail++]=x;
        tail%=N;
        ++count;
        nonempty.notify_one();
    }

    Portion remove() {
        std::unique_lock<std::mutex> lck(mtx);
        nonempty.wait(lck,[&]{return !(0==count);});
        assert(0 < count && count <= N)
        Portion x = buffer[head++];
        head %= N;
        --count;
        nonfull.notify_one();
        return x;
    }

    Bounded_buffer() {
        head = 0; tail = 0; count = 0;
    }
}

channel实现

channel是一种通过消息传递实现进程间通信和同步的模型。消息可以通过通道发送,而另一个进程或线程能够接收通过它引用channel发送的消息,比如流。通道不同实现可以被缓冲,也可以不被缓冲,可以是异步或者同步的。

channel实现在golang中是极为优雅的

var element = 0

func produce() int {
	element++
	return element
}

func consume(e int) {
	// consume element
}

const (
	producerCount = 2
	consumerCount = 2
	bufferSize = 1
)

func main() {
	ch:=make(chan int, bufferSize)
	for i := 0; i < producerCount; i++ {
		go func() {
			ch<-produce()
		}()
	}

	for j := 0; j < consumerCount; j++ {
		go func() {
			consume(<-ch)
		}()
	}
  
  // assume main will wait other goroutines
}

无semaphore、monitors实现

在单生产者和消费者的时候,可以定义一个容量为b(b>=1)的buffer。使k为大于b的常数并为b的倍数,s、r为0到k-1之间的整数。

在初始化时s=r且buffer为空。生产者放入消息到buffer[s mod b],消费者取出消息buffer[r mod b]

生产者消费者最重要的是互斥有限缓冲的写操作以及同步生产者消费者。在原子操作下的buffer中同一个位置不可能同时存在有资源以及没有资源两种情况,因此无需互斥。单生产者消费者,仅仅只需要考虑生产者和消费者之间同步,而忙等待判断是否为空或者超出容量做到了这点

当然还要考虑到在调度器切换时,可能一个线程读取了变量值,切换到第二个线程更改了该值,再切回来,那么第一个线程将使用旧值,而不是当前值,因此需要通过原子操作来解决这个问题

enum {
    N = 4
};
Message buffer[N];
std::atomic<unsigned> count{0};

void producer() {
    unsigned tail{0};
    for (;;) {
        Message msg = produceMessage();
        while (N == count);
        buffer[tail++] = msg;
        tail%=N;
        count.fetch_add(1,std::memory_order_relaxed);
    }
}

void consumer(){
    unsigned head {0};
    for(;;){
        while (count==0);
        Message msg = buffer[head++];
        head%=N;
        count.fetch_sub(1,std::memory_order_relaxed);
        consumeMessage(msg);
    }
}

注意代码中while的使用,这是为了防止一些竞争情况的出现。比如某消费者在数据放入缓冲区时被唤醒,但另一个消费者在管程上等待了一段时间并移除了该数据。所以如果是if,可能会出现放入缓冲区的数据项过多,或移除空缓冲区中的元素的情况。

Ref

  1. https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
  2. https://en.wikipedia.org/wiki/Monitor_(synchronization)
  3. https://en.wikipedia.org/wiki/Semaphore
  4. https://en.wikipedia.org/wiki/Channel_(programming)
  C++知识库 最新文章
【C++】友元、嵌套类、异常、RTTI、类型转换
通讯录的思路与实现(C语言)
C++PrimerPlus 第七章 函数-C++的编程模块(
Problem C: 算法9-9~9-12:平衡二叉树的基本
MSVC C++ UTF-8编程
C++进阶 多态原理
简单string类c++实现
我的年度总结
【C语言】以深厚地基筑伟岸高楼-基础篇(六
c语言常见错误合集
上一篇文章      下一篇文章      查看所有文章
加:2022-04-18 17:22:53  更:2022-04-18 17:23:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/11 0:14:30-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码