IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> C++知识库 -> c++11基于原子操作的速度极快的多线程无锁队列 -> 正文阅读

[C++知识库]c++11基于原子操作的速度极快的多线程无锁队列

一、对new(delete),mutex,自旋锁,std::atomic的性能测试

在windows下 测试 release版本的如下代码:10000000个rand()相加。

{//速度测试
		   long tt1, tt2;
		long total = 0;
		std::mutex mutex_test;
		SpinHybridLock spin_test;
		std::atomic<int> a_total = 0;
		srand(0);
		total = 0;
		tt1 = GetTickCount();
		for (int i = 0; i < 10000000; i++){
			total += rand();
		}
		tt2 = GetTickCount();
		printf("normal cost %ldms,total = %ld\n", tt2 - tt1, total);

		srand(0);
		total = 0;
		tt1 = GetTickCount();
		for (int i = 0; i < 10000000; i++){
			mutex_test.lock();
			total += rand();
			mutex_test.unlock();
		}
		tt2 = GetTickCount();
		printf("mutex cost %ldms,total = %ld\n", tt2 - tt1, total);

		srand(0);
		total = 0;
		tt1 = GetTickCount();
		for (int i = 0; i < 10000000; i++){
			spin_test.lock();
			total += rand();
			spin_test.unlock();
		}
		tt2 = GetTickCount();
		printf("spin cost %ldms,total = %ld\n", tt2 - tt1, total);

		srand(0);
		tt1 = GetTickCount();
		for (int i = 0; i < 10000000; i++){
			a_total += rand();
		}
		tt2 = GetTickCount();
		printf("atomic cost %ldms,total = %ld\n", tt2 - tt1, total);

		srand(0);
		tt1 = GetTickCount();
		for (int i = 0; i < 10000000; i++){
			char * n = new char[1];
			delete[] n;
		}
		tt2 = GetTickCount();
		printf("malloc cost %ldms,total = %ld\n", tt2 - tt1, total);

		getchar();
	}

结果如下:

????????

最慢的是new和delete,看操作系统的实现,一般来说,new和delete都涉及到内存锁,还有维护内存链表,所以,操作很慢。

理论上最快的应该是normal,然后是atomic,接着是自旋锁,然后是mutex。

这里没有考虑多线程竞争的情况,理论上,mutex在加锁时,有系统调用,线程切换等开销。而自旋锁,只是耗费了几个空指令,实现了同步,所以,在临界区比较小的情况下,自旋锁性能更优。也有的系统优化的mutex可能比自己实现的自旋锁更好。具体看其他关于自旋锁的文章。

二、有界无锁有等待,速度极快的,多线程入/单线程出(消息队列)实现

看完上面的测试,基本可以明确优化的方向,尽可能的不要有new和delete,内存一次分配好,临界区最好使用atomic,要么使用自旋锁(临界区代码尽可能的少)

1.划分临界区

我尝试了很多很多种设计方案,最后发现,消息队列的临界条件有三个:

1.内存位置

2.内存是否被分配

3.插入位置

总结起来就是,多线程分配内存,多线程插入位置。

由于我们想要极致的性能,理论上,我们是想把分配内存和内存数据填充都放到其他线程,不影响主线程的执行。所以,代码可能像

Obj * data = mem.alloc();
*data = xxx;
push(data);

?插入的时候,只插入一个指针,这样,data的拷贝,就能放在当前线程进行,从而减少临界。

2.生产者,消费者模型(环形队列RingQueue)无锁的特点,利用该特点设计插入算法

RingQueue是一个经典的无锁队列模型,一个生产者,只修改push_pos,一个消费者只修改pop_pos,这样,这两个线程就没有临界条件了。对比加锁的队列。RingQueue有一个问题,就是慢,push线程修改了push_pos之后,pop线程可能无法及时的发现数据已发生修改,而导致慢了半拍。总的来说,RingQueue有点像信号量的设计,push_pos对于pop线程来说,就是一个信号量,只有当push线程将push_pos++之后,pop线程才能对之前的push_pos的节点进行操作,这样,我们可以在push_pos++之前,随便修改push的数据。

通过第一点,我们知道,插入的是指针,所以,应该用链表来实现这个数据结构,于是就有了push_node,和pop_node.push_node是一个临界条件,多线程访问,不加锁,想使用atomic来实现。代码如下:

template<class T>
class FastListQueue
{
public:
	struct Node
	{
		T& getObj(){
			return obj;
		}
		T obj;
		Node * next;
		bool b_needNotify = false;
		std::condition_variable * pcond = NULL;
		std::mutex * mutex_pcond = NULL;
		bool * b_poped = NULL;
		bool b_malloced = false;
	protected:
		Node * pre;	//辅助建立链表使用
		friend class FastListQueue;
	};

	FastListQueue(int count = 512) :mem(count){
		head = mem.alloc();
		head->next = NULL;
		head->pre = NULL;
		push_node.store(head);
	}
	~FastListQueue(){}

	void setMulThreadPush(bool b_tf){//是否使用多线程push 原子支持
	}

	//默认每次休眠1ms,供休眠1000次
	bool pushFor(const T &obj,int ms_sleep=1,int times = 1000){
		int push_times = 0;
		while (!push(obj)){
			push_times++;
			if (push_times < times){
				try_notify();
				std::this_thread::sleep_for(std::chrono::milliseconds(1));
			}
			else{
				return false;
			}
		}
		return true;
	}

	bool push(const T &obj){
		auto node = mem.alloc();
		if (!node){
			return false;
		}
		node->obj = obj;
		node->next = NULL;
		node->pre = NULL;
		node->b_needNotify = false;
		node->b_malloced = false;
		insertNode(node);
		return true;
	}

	bool send(const T& obj){//消息发送到队列,并等到消息执行完成后返回
		auto tnode = mem.alloc();	//fixme:由于设计的问题,不能使用栈上的内存作为节点
		if (!tnode){
			tnode = new Node();
			tnode->b_malloced = true;
		}
		else{
			tnode->b_malloced = false;
		}

		if (tnode){//把数据放到push_node节点,然后push_node指向新节点
			tnode->obj = obj;
			tnode->next = NULL;
			tnode->b_needNotify = true;

			bool b_poped = false;
			std::mutex mutex_wait;
			std::unique_lock<std::mutex> locker(mutex_wait);
			std::condition_variable cond_send_ret;
			tnode->pcond = &cond_send_ret;
			tnode->mutex_pcond = &mutex_wait;
			tnode->b_poped = &b_poped;

			insertNode(tnode);
			notify_one();//必须保证不能在休眠中
			//notify(notify_count);
			while (!b_poped)//防止操作系统伪唤醒
				cond_send_ret.wait(locker); // Unlock mu and wait to be notified
			return true;
		}
		return false;
	}
protected:
	void insertNode(Node * node){
		//mutex_pcond.lock();
		//testLock.lock();
		node->pre = push_node.load();//先把前一个节点缓存起来
		while (!push_node.compare_exchange_weak(node->pre, node)){
			node->pre = push_node.load();
		}
		//插入成功,node现在是最后一个节点,然后要让之前的节点指向现在的节点
		node->pre->next = node;//这一步之后,节点才可以被访问到
		

		//testLock.unlock();
		//mutex_pcond.unlock();
		//if (b_needNotify){
			//notify_all();
		//}
		//notify_one();
		try_notify();
	}
public:
	Node * front(){//始终指向第二个节点,链表中始终有一个节点,作为头指针节点
		return head->next;
	}
	Node * end(){
		return NULL;
	}

	void pop(){
		if (head->next != NULL){
			//testLock.lock();
			auto tnext = head->next;
			if (tnext->b_needNotify){
				std::unique_lock<std::mutex> locker(*(tnext->mutex_pcond));
				*(tnext->b_poped) = true;
				tnext->pcond->notify_one();
			}

			auto t = head->next;
			if (head->b_malloced){
				delete head;
			}
			else{
				mem.free(head);
			}
			head = t;
			//testLock.unlock();
			//head->pre = NULL;
		}
	}

	unsigned int size(){//不是很准确,当前的节点数大致在这个范围
		return mem.size() - 1;//
	}

	bool empty(){
		return front() == end();
	}

	bool IsInterrupted(){
		return b_interrupted;
	}

	void interrupt(){
		b_interrupted = true;
		pcond.notify_all();
	}

	void wait(){
		if (!b_interrupted && empty()){//尽量减少锁的次数
			std::unique_lock<std::mutex> ul(mutex_pcond);
			if (!b_interrupted && empty()){
				b_waiting = true;
				pcond.wait(ul);
				b_waiting = false;
				//std::this_thread::yield();
			}
		}
	}

	void try_notify(){//
		if (b_waiting){
			notify_one();
		}
	}

	void notify(int count){
		if (size() <= count){
			std::unique_lock<std::mutex> ul(mutex_pcond);
			pcond.notify_all();
		}
	}

	void notify_one(){
		std::unique_lock<std::mutex> ul(mutex_pcond);
		pcond.notify_one();
	}
	void notify_all(){
		std::unique_lock<std::mutex> ul(mutex_pcond);
		pcond.notify_all();
	}

protected:
	//LockRingMem<Node> mem;
	MultiNoLockRingMem<Node> mem;
	//NoLockMem<Node> mem;
	Node * head;
	std::atomic<Node *> push_node;
	//SpinHybridLock testLock;
	std::mutex testLock;

	bool b_interrupted = false;
	bool b_waiting = false;
	std::condition_variable pcond;
	std::mutex mutex_pcond;
};

关键的代码在insertNode的实现上,因为插入要干两件事,一是修改之前的尾指针节点,二是修改新的push_node的位置,这两个指令都会有多线程的竞争的问题。为了解决这个竞争的问题,用了一个pre头指针,每次尝试插入的时候,都会将头指针指向当前的push_node的位置,然后使用原子CAS的操作,如果操作成功,那么node->pre就是之前的push_node那个节点了。此时,我们再将node->pre->next?指向自己。那么pop线程就能访问到node节点了(这个操作有点像RingQueue里的push_pos++,next就是pop线程的信号量)。这样,我们只用了一个atomic。

再接着就是内存分配的问题了,需要预先分配内存

三、根据环形缓冲区设计的内存管理

本来以为会很简单,结果遇到了很多的问题,最终的结果如下:

template<class T>
class NoLockRingMem{
public:
	struct Node:public T{
		//volatile bool b_freed;
		std::atomic<bool> b_freed;
	};

	NoLockRingMem(int count)
	{
		push_pos = 0;
		pop_pos = 0;
		pool_count = count;
		pool = new Node[count];
		for (int i = 0; i < count; i++){
			pool[i].b_freed = true;
		}
	}
	~NoLockRingMem(){
		delete[] pool;
	}

	T * alloc(){
		do{
			unsigned int pos = push_pos.load();
			unsigned int tpos = pos;
			if (!PushPosValid(tpos) || !pool[pos].b_freed){//缓冲区已满
				//if (size() >= (pool_count - 1) || !pool[pos].b_freed){
				//lock_push.unlock();
				return NULL;
			}
			bool b_freed = true;
			if (pool[pos].b_freed.compare_exchange_weak(b_freed, false)){
				auto ret = &pool[pos];
				while (!push_pos.compare_exchange_weak(pos, (pos + 1) % pool_count)){
					pos = push_pos.load();
				}
				return ret;
			}

		} while (true);
		return NULL;
	}

	void free(T * n){//free一般不存在竞争
		//delete n;
		//return;
		
		Node * tn = (Node *)n;
		assert(!tn->b_freed);
		//printf("free pos = %d\n", tn->pos);
		//tn->lock.lock();
		tn->b_freed = true;
		//tn->lock.unlock();
		//lock_pop.lock();
		//unsigned int tpos = pop_pos.load();
		//while (!pop_pos.compare_exchange_weak(tpos, (tpos + 1) % pool_count)){
			//tpos = pop_pos.load();
		//}
		pop_pos = (pop_pos + 1) % pool_count;
		//lock_pop.unlock();
	}

	int size(){
		int s = push_pos - pop_pos;
		if (s < 0){
			s = s + pool_count;
		}
		return s;
	}

	int idle(){//环形缓冲区最多只能放置count - 1 个
		return pool_count - 1 - size();
	}

	T * findNodeByIndex(int i){
		return (T *)&pool[i];
	}

protected:
	bool PushPosValid(unsigned int pos){
		if (((pos + 1) % pool_count) == pop_pos)
			return false;
		return true;
	}



protected:
	Node * pool = NULL;
	std::atomic<unsigned int> push_pos;
	//std::atomic<unsigned int> pop_pos;
	//SpinHybridLock lock_pop;
	//SpinHybridLock lock_push;
	unsigned int pop_pos;
	int pool_count;
};

template<class T>
class MultiNoLockRingMem{//一个NoLockRing可能由于一块缓冲没有释放而阻塞,多个NoLockRing就不会由于一块内存而阻塞
public:
	struct MNode :public T{
		int pos_mem;
	};
#if 1
	MultiNoLockRingMem(int count){
		int acount = (count + 4) / 4;
		for (int i = 0; i < 4; i++){
			mems[i] = new NoLockRingMem<MNode>(acount);
			for (int j = 0; j < acount; j++){
				auto n = mems[i]->findNodeByIndex(j);
				n->pos_mem = i;
			}
		}
	}
	~MultiNoLockRingMem(){
		for (int i = 0; i < 4; i++){
			delete mems[i];
		}
	}

	T * alloc(){
		//do{
		//for (int k = 0; k < 10; k++){
			for (int i = 0; i < 4; i++){
				MNode * node = mems[i]->alloc();

				if (node){
					//node->pos_mem = i;
					return node;
				}
			}
			//break;
		//}
		//while (true);
		return NULL;
	}
	void free(T * n){//free一般不存在竞争
		MNode * tn = (MNode *)n;
		mems[tn->pos_mem]->free(tn);
	}
	int size(){
		int total = 0;
		for (int i = 0; i < 4; i++){
			total += mems[i]->size();
		}
		return total;
	}
#else
	MultiNoLockRingMem(int count):mems1(count/2),mems2(count/2){
		for (int i = 0; i < count/2; i++){
			auto tn = mems1.findNodeByIndex(i);
			auto tn2 = mems2.findNodeByIndex(i);
			tn->pos_mem = 0;
			tn2->pos_mem = 1;
		}
	}

	T * alloc(){
		auto ret = mems1.alloc();
		if (!ret){
			ret = mems2.alloc();
		}
		return ret;
	}

	void free(T * n){
		Node * tn = (Node *)n;
		if (tn->pos_mem){
			mems2.free(tn);
		}
		else{
			mems1.free(tn);
		}
	}

	int size(){
		return mems1.size() + mems2.size();
	}

#endif

protected:
	NoLockRingMem<MNode> * mems[4];
	//NoLockRingMem<MNode> mems1;
	//NoLockRingMem<MNode> mems2;
};

由于种种原因,FastListQueue需要持有一个对象,这样导致一个环形缓冲区内存分配不够用,后面的释放了,前面的没释放,导致卡死。所以引入了多个环形缓冲分配。

还有CAS的使用问题,原本以为,谁改变了push_pos,谁就能操作那块内存,实际测试中发现,有可能他改变了push_pos,但是没修改b_freed,然后切换线程了,导致其他线程拿到了同一块内存的现象。所以,用了两个原子量。

四、为什么使用环形队列做内存分配

我也写了很多的内存分配方案,也测试了,包括带锁的,无锁for循环查找的。

测试下来,最优的还是这个方案,或者还有其他方案我没想到的?

感觉像:环形队列,只需要判断push位置的内存是否能够使用,然后竞争那块内存就够了,这样少了很多的判断条件,然后就更快了。实际上,这个优化带来的提升,就像从使用自旋锁变成了atomic,实际并不能感受到有多大的提升。反而,代码更复杂难懂了....

五、测试代码,以及测试结果

//测试多线程队列的性能,
	while(1){
		b_continue = false;
		srand(0);
		long tt1 = GetTickCount();
		int t = 0;
		
		for (int i = 0; i < 100000; i++){
			for (int j = 0; j < 4; j++)
				t += (j+1);
		}
		long tt2 = GetTickCount();
		printf("normal result=%d cost %ld\n",t, tt2 - tt1);

		global_count = 0;
		total_exec_count = 0;
		lt.start();
		srand(0);
		tt1 = GetTickCount();
		std::thread t1([]{
			printf("1 total = %ld,exec_count = %ld\n", global_count, total_exec_count);
			for (int i = 0; i < 1000000; i++){
				//printf("add 1\n");
				int r = rand();
				lt.getLooper()->post([=]{
					global_count += 1;
					total_exec_count++;
				});
				//Sleep(1);
			}
			printf("t1 exit\n");
		});
		std::thread t2([]{
			printf("2 total = %ld,exec_count = %ld\n", global_count, total_exec_count);
			for (int i = 0; i < 1000000; i++){
				//printf("add 2\n");
				int r = rand();
				lt.getLooper()->post([=]{
					global_count += 2;
					total_exec_count++;
				});
				//Sleep(1);
			}
			printf("t2 exit\n");
		});
		std::thread t3([]{
			printf("3 total = %ld,exec_count = %ld\n", global_count, total_exec_count);
			for (int i = 0; i < 1000000; i++){
				//printf("add 3\n");
				int r = rand();
				lt.getLooper()->post([=]{
					global_count += 3;
					total_exec_count++;
				});
				//Sleep(1);
			}
			printf("t3 exit\n");
		});
		std::thread t4([]{
			printf("4 total = %ld,exec_count = %ld\n", global_count, total_exec_count);
			for (int i = 0; i < 1000000; i++){
				//printf("add 4\n");
				int r = rand();
				lt.getLooper()->post([=]{
					global_count += 4;
					total_exec_count++;
				});
				//Sleep(1);
			}
			printf("t4 exit\n");
		});

		t1.join();
		t2.join();
		t3.join();
		t4.join(); 
		printf("total:\n");
		auto f = [=]{
			long tt2 = GetTickCount();
			printf("total = %ld,exec_count = %ld total cost %ldms\n", global_count,total_exec_count, tt2 - tt1);
			if (global_count == 10000000){
				b_continue = true;
			}
		};
		lt.getLooper()->post(f);
		lt.getLooper()->post(f);
		//getchar();
		lt.getLooper()->post(f);
		//getchar();
		while (!b_continue){
			Sleep(1);
		}
	}

这里四个线程总共push了4000000个加法操作,实际用时最低的少于600ms

template<class T>
class NoLockMem{
public:
	struct Node :public T{
		std::atomic<bool> b_freed;
	};

	NoLockMem(int count){
		pool_count = count;
		pool = new Node[count];
		for (int i = 0; i < count; i++){
			pool[i].b_freed = true;
		}
	}
	~NoLockMem(){
		delete[] pool;
	}

	T * alloc(){
		for (int i = 0; i < pool_count; i++){
			if (pool[i].b_freed){
				bool b_freed = true;
				if (pool[i].b_freed.compare_exchange_weak(b_freed, false)){
					return (T *)&pool[i];
				}
			}
		}
		return NULL;
	}
	void free(T * n){
		Node * tn = (Node *)n;
		tn->b_freed = true;
	}

	int size(){
		return 0;
	}

protected:
	int pool_count;
	Node * pool;
};

?使用遍历查找已释放的内存,结果如下,最快没低于过600ms

?使用new和delete实现的无界队列,结果如下:

?虽然无界无等待,其他四个线程早早的就退出了,但是,主线程delete太耗时,导致操作到了100s。

五、有锁内存分配,参考实现

template<class T>
class LockRingMem{
public:
	struct Node :public T{
		int pos;
		volatile bool b_freed;
		SpinHybridLock lock;
	};
	LockRingMem(int count){
		push_pos = 0;
		pop_pos = 0;
		pool_count = count;
		pool = new Node[count];
		for (int i = 0; i < count; i++){
			pool[i].pos = i;
			pool[i].b_freed = true;
		}
	}
	~LockRingMem(){
		delete[] pool;
	}

	T * alloc(){
#if 0
		do{
			for (int i = 0; i < pool_count; i++){
				Node * t = &pool[i];
				if (t->b_freed){
					if (t->lock.try_lock()){
						if (t->b_freed){
							t->b_freed = false;
							t->lock.unlock();
							push_pos = (push_pos + 1) % pool_count;
							return t;
						}
						else{
							t->lock.unlock();
						}
					}
				}
			}
			std::this_thread::yield();
		}
		while (true);
#endif

		do{
			auto pos = push_pos.load();
			//auto npos = (pos + 1) % pool_count;
			//for (; npos != pop_pos; ){
			for (int i = 0; i < 8; i++){//通常跟核心数有关
				Node * t = &pool[pos];//从pos开始查找,速度更快
				pos = (pos + 1) % pool_count;
				//pos = npos;
				//npos = (npos + 1) % pool_count;
				if (t->b_freed){
					if (t->lock.try_lock()){
						if (t->b_freed){
							t->b_freed = false;
							t->lock.unlock();
							push_pos = (push_pos + 1) % pool_count;
							return t;
						}
						else{
							t->lock.unlock();
						}
					}
				}
			}
			//std::this_thread::yield();
		} while (false);
		return NULL;
	}
	void free(T * n){//free一般不存在竞争
		Node * tn = (Node *)n;
		tn->b_freed = true;
		pop_pos = (pop_pos + 1) % pool_count;
	}

	int size(){
		int s = push_pos - pop_pos;
		if (s < 0){
			s = s + pool_count;
		}
		return s;
	}

	int idle(){//环形缓冲区最多只能放置count - 1 个
		return pool_count - 1 - size();
	}

	T * findNodeByIndex(int i){
		return (T *)&pool[i];
	}

protected:
	Node * pool = NULL;
	std::atomic<unsigned int> push_pos;
	unsigned int pop_pos;
	int pool_count;
};

实际测试,和无锁版差别不大。

  C++知识库 最新文章
【C++】友元、嵌套类、异常、RTTI、类型转换
通讯录的思路与实现(C语言)
C++PrimerPlus 第七章 函数-C++的编程模块(
Problem C: 算法9-9~9-12:平衡二叉树的基本
MSVC C++ UTF-8编程
C++进阶 多态原理
简单string类c++实现
我的年度总结
【C语言】以深厚地基筑伟岸高楼-基础篇(六
c语言常见错误合集
上一篇文章      下一篇文章      查看所有文章
加:2022-01-04 13:14:05  更:2022-01-04 13:15:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/9 15:25:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码