I. Performance tests of new/delete, mutex, spin lock, and std::atomic
On Windows, the following code was tested in a Release build: summing 10,000,000 calls to rand().
{ // speed test
long tt1, tt2;
long total = 0;
std::mutex mutex_test;
SpinHybridLock spin_test;
std::atomic<int> a_total{0};
srand(0);
total = 0;
tt1 = GetTickCount();
for (int i = 0; i < 10000000; i++){
total += rand();
}
tt2 = GetTickCount();
printf("normal cost %ldms,total = %ld\n", tt2 - tt1, total);
srand(0);
total = 0;
tt1 = GetTickCount();
for (int i = 0; i < 10000000; i++){
mutex_test.lock();
total += rand();
mutex_test.unlock();
}
tt2 = GetTickCount();
printf("mutex cost %ldms,total = %ld\n", tt2 - tt1, total);
srand(0);
total = 0;
tt1 = GetTickCount();
for (int i = 0; i < 10000000; i++){
spin_test.lock();
total += rand();
spin_test.unlock();
}
tt2 = GetTickCount();
printf("spin cost %ldms,total = %ld\n", tt2 - tt1, total);
srand(0);
tt1 = GetTickCount();
for (int i = 0; i < 10000000; i++){
a_total += rand();
}
tt2 = GetTickCount();
printf("atomic cost %ldms,total = %ld\n", tt2 - tt1, total);
srand(0);
tt1 = GetTickCount();
for (int i = 0; i < 10000000; i++){
char * n = new char[1];
delete[] n;
}
tt2 = GetTickCount();
printf("malloc cost %ldms,total = %ld\n", tt2 - tt1, total);
getchar();
}
The results are as follows:
The slowest by far are new and delete. Depending on the operating system / runtime implementation, new and delete generally involve taking a heap lock and maintaining free-block lists, so the operations are slow.
In theory the fastest should be the plain (normal) loop, followed by atomic, then the spin lock, then the mutex.
Multi-threaded contention is not considered here. In theory, locking a mutex involves system calls, thread switches and similar overhead, while a spin lock only burns a few idle instructions to achieve synchronization, so with a small critical section the spin lock performs better. That said, on some systems a well-optimized mutex may beat a hand-rolled spin lock; see dedicated articles on spin locks for the details.
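The SpinHybridLock used in the tests is not listed in this article. A minimal sketch of what such a hybrid spin lock could look like is shown below; the spin budget and the yield-based back-off are assumptions, not the actual implementation:
#include <atomic>
#include <thread>

// Sketch only: spin on an atomic flag for a bounded number of iterations,
// then yield to the scheduler. The real SpinHybridLock may differ.
class SpinHybridLock {
public:
    void lock() {
        int spins = 0;
        while (flag.test_and_set(std::memory_order_acquire)) {
            if (++spins >= 1000) {              // assumed spin budget
                std::this_thread::yield();      // hybrid part: give up the CPU
                spins = 0;
            }
        }
    }
    bool try_lock() {
        return !flag.test_and_set(std::memory_order_acquire);
    }
    void unlock() {
        flag.clear(std::memory_order_release);
    }
private:
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
};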
II. A bounded, lock-free (but waitable), extremely fast multi-producer-in / single-consumer-out message queue
With the tests above in mind, the direction of optimization is clear: avoid new and delete as far as possible and allocate all memory up front; protect critical sections with atomics where possible, or otherwise with a spin lock while keeping the critical section as small as possible.
1. Carving out the critical sections
I tried a great many design variants and eventually found that a message queue has three points of contention:
1. the memory slot itself
2. whether that memory has already been allocated
3. the insertion position
In short: memory is allocated from multiple threads, and nodes are inserted from multiple threads.
Since we are after maximum performance, ideally both the memory allocation and the filling of the data happen on the producer thread, without affecting the execution of the main thread. So the code would look something like:
Obj * data = mem.alloc();
*data = xxx;
push(data);
On insertion, only a pointer is pushed into the queue, so the copy into data happens on the calling thread, which shrinks the critical section.
2. The lock-free property of the producer/consumer model (ring queue, RingQueue), and using it to design the insertion algorithm
RingQueue is the classic lock-free queue model: a single producer only modifies push_pos and a single consumer only modifies pop_pos, so the two threads never contend, unlike a locked queue. RingQueue does have one drawback: it is slow to react. After the push thread advances push_pos, the pop thread may not notice the change immediately and runs half a beat behind. Overall, RingQueue works a bit like a semaphore: to the pop thread, push_pos is the semaphore. Only after the push thread increments push_pos may the pop thread touch the node at the old push_pos, which means the push thread is free to modify the pending data however it likes before the increment.
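As a reference point, a minimal sketch of such a single-producer/single-consumer RingQueue could look like the following (this is only an illustration of the model described above, not code from the project):
#include <atomic>

// SPSC ring queue sketch: the producer writes only push_pos, the consumer
// writes only pop_pos, so no lock is needed. One slot is left empty so that
// "full" and "empty" can be told apart.
template<class T, unsigned int N>
class RingQueue {
public:
    bool push(const T &v) { // producer thread only
        unsigned int pos = push_pos.load(std::memory_order_relaxed);
        unsigned int next = (pos + 1) % N;
        if (next == pop_pos.load(std::memory_order_acquire))
            return false;                                   // full
        buf[pos] = v;                                       // fill the slot first...
        push_pos.store(next, std::memory_order_release);    // ...then publish it
        return true;
    }
    bool pop(T &v) { // consumer thread only
        unsigned int pos = pop_pos.load(std::memory_order_relaxed);
        if (pos == push_pos.load(std::memory_order_acquire))
            return false;                                   // empty
        v = buf[pos];
        pop_pos.store((pos + 1) % N, std::memory_order_release);
        return true;
    }
private:
    T buf[N];
    std::atomic<unsigned int> push_pos{0};
    std::atomic<unsigned int> pop_pos{0};
};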
From point 1 we know that pointers are what get inserted, so the structure should be a linked list, which gives us push_node and pop_node. push_node is a point of contention accessed by multiple threads; instead of locking it, the plan is to implement it with an atomic. The code is as follows:
template<class T>
class FastListQueue
{
public:
struct Node
{
T& getObj(){
return obj;
}
T obj;
Node * next;
bool b_needNotify = false;
std::condition_variable * pcond = NULL;
std::mutex * mutex_pcond = NULL;
bool * b_poped = NULL;
bool b_malloced = false;
protected:
Node * pre; // helper pointer used while building the list
friend class FastListQueue;
};
FastListQueue(int count = 512) :mem(count){
head = mem.alloc();
head->next = NULL;
head->pre = NULL;
push_node.store(head);
}
~FastListQueue(){}
void setMulThreadPush(bool b_tf){// whether push is called from multiple threads (the atomics already support it)
}
// by default sleep 1ms per retry, for at most 1000 retries
bool pushFor(const T &obj,int ms_sleep=1,int times = 1000){
int push_times = 0;
while (!push(obj)){
push_times++;
if (push_times < times){
try_notify();
std::this_thread::sleep_for(std::chrono::milliseconds(ms_sleep));
}
else{
return false;
}
}
return true;
}
bool push(const T &obj){
auto node = mem.alloc();
if (!node){
return false;
}
node->obj = obj;
node->next = NULL;
node->pre = NULL;
node->b_needNotify = false;
node->b_malloced = false;
insertNode(node);
return true;
}
bool send(const T& obj){// post the message to the queue and return only after it has been executed
auto tnode = mem.alloc(); // fixme: because of the design, stack memory cannot be used for the node
if (!tnode){
tnode = new Node();
tnode->b_malloced = true;
}
else{
tnode->b_malloced = false;
}
if (tnode){// put the data into the node, then make push_node point to the new node
tnode->obj = obj;
tnode->next = NULL;
tnode->b_needNotify = true;
bool b_poped = false;
std::mutex mutex_wait;
std::unique_lock<std::mutex> locker(mutex_wait);
std::condition_variable cond_send_ret;
tnode->pcond = &cond_send_ret;
tnode->mutex_pcond = &mutex_wait;
tnode->b_poped = &b_poped;
insertNode(tnode);
notify_one();// must make sure the consumer is not stuck sleeping
//notify(notify_count);
while (!b_poped)// guard against spurious wakeups by the OS
cond_send_ret.wait(locker); // Unlock mu and wait to be notified
return true;
}
return false;
}
protected:
void insertNode(Node * node){
//mutex_pcond.lock();
//testLock.lock();
node->pre = push_node.load();// cache the previous tail node first
while (!push_node.compare_exchange_weak(node->pre, node)){
node->pre = push_node.load();
}
//insertion succeeded: node is now the last node, so link the previous tail to it
node->pre->next = node;// only after this step can the node be reached by the consumer
//testLock.unlock();
//mutex_pcond.unlock();
//if (b_needNotify){
//notify_all();
//}
//notify_one();
try_notify();
}
public:
Node * front(){// always the second node; the list always keeps one node as a dummy head
return head->next;
}
Node * end(){
return NULL;
}
void pop(){
if (head->next != NULL){
//testLock.lock();
auto tnext = head->next;
if (tnext->b_needNotify){
std::unique_lock<std::mutex> locker(*(tnext->mutex_pcond));
*(tnext->b_poped) = true;
tnext->pcond->notify_one();
}
auto t = head->next;
if (head->b_malloced){
delete head;
}
else{
mem.free(head);
}
head = t;
//testLock.unlock();
//head->pre = NULL;
}
}
unsigned int size(){// not exact; the current node count is only roughly in this range
return mem.size() - 1;//
}
bool empty(){
return front() == end();
}
bool IsInterrupted(){
return b_interrupted;
}
void interrupt(){
b_interrupted = true;
pcond.notify_all();
}
void wait(){
if (!b_interrupted && empty()){// checked outside the lock to minimize locking
std::unique_lock<std::mutex> ul(mutex_pcond);
if (!b_interrupted && empty()){
b_waiting = true;
pcond.wait(ul);
b_waiting = false;
//std::this_thread::yield();
}
}
}
void try_notify(){
if (b_waiting){
notify_one();
}
}
void notify(int count){
if (size() <= count){
std::unique_lock<std::mutex> ul(mutex_pcond);
pcond.notify_all();
}
}
void notify_one(){
std::unique_lock<std::mutex> ul(mutex_pcond);
pcond.notify_one();
}
void notify_all(){
std::unique_lock<std::mutex> ul(mutex_pcond);
pcond.notify_all();
}
protected:
//LockRingMem<Node> mem;
MultiNoLockRingMem<Node> mem;
//NoLockMem<Node> mem;
Node * head;
std::atomic<Node *> push_node;
//SpinHybridLock testLock;
std::mutex testLock;
bool b_interrupted = false;
bool b_waiting = false;
std::condition_variable pcond;
std::mutex mutex_pcond;
};
The key is the implementation of insertNode. An insertion has to do two things: update the next pointer of the previous tail node, and move push_node to the new node, and both steps can race between threads. To resolve the race, each node carries a pre pointer: on every insertion attempt, pre is set to the current push_node and a CAS is attempted. If the CAS succeeds, node->pre is the node that used to be push_node, and we then set node->pre->next to point at the new node, at which point the pop thread can reach it (this plays the same role as push_pos++ in RingQueue: next acts as the signal to the pop thread). All of this needs only a single atomic.
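For completeness, the consumer side (the single pop thread) can be driven with a loop like the one sketched below; the task type and the loop wrapper are assumptions of mine, since the looper thread of the original project is not shown here:
#include <functional>

// Hypothetical consumer loop: wait until something is pushed, run every task
// that is currently reachable, then go back to sleep.
void consumerLoop(FastListQueue<std::function<void()>> &queue) {
    while (!queue.IsInterrupted()) {
        queue.wait(); // sleeps until try_notify()/notify_one() fires
        for (auto node = queue.front(); node != queue.end(); node = queue.front()) {
            node->getObj()();   // execute the posted task
            queue.pop();        // also wakes a blocked send() if necessary
        }
    }
}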
The next problem is memory allocation: memory needs to be allocated up front.
III. Memory management based on a ring buffer
I expected this to be simple, but ran into quite a few problems. The final result is as follows:
template<class T>
class NoLockRingMem{
public:
struct Node:public T{
//volatile bool b_freed;
std::atomic<bool> b_freed;
};
NoLockRingMem(int count)
{
push_pos = 0;
pop_pos = 0;
pool_count = count;
pool = new Node[count];
for (int i = 0; i < count; i++){
pool[i].b_freed = true;
}
}
~NoLockRingMem(){
delete[] pool;
}
T * alloc(){
do{
unsigned int pos = push_pos.load();
unsigned int tpos = pos;
if (!PushPosValid(tpos) || !pool[pos].b_freed){// buffer is full
//if (size() >= (pool_count - 1) || !pool[pos].b_freed){
//lock_push.unlock();
return NULL;
}
bool b_freed = true;
if (pool[pos].b_freed.compare_exchange_weak(b_freed, false)){
auto ret = &pool[pos];
while (!push_pos.compare_exchange_weak(pos, (pos + 1) % pool_count)){
pos = push_pos.load();
}
return ret;
}
} while (true);
return NULL;
}
void free(T * n){// free is normally uncontended (single consumer)
//delete n;
//return;
Node * tn = (Node *)n;
assert(!tn->b_freed);
//printf("free pos = %d\n", tn->pos);
//tn->lock.lock();
tn->b_freed = true;
//tn->lock.unlock();
//lock_pop.lock();
//unsigned int tpos = pop_pos.load();
//while (!pop_pos.compare_exchange_weak(tpos, (tpos + 1) % pool_count)){
//tpos = pop_pos.load();
//}
pop_pos = (pop_pos + 1) % pool_count;
//lock_pop.unlock();
}
int size(){
int s = push_pos - pop_pos;
if (s < 0){
s = s + pool_count;
}
return s;
}
int idle(){// a ring buffer can hold at most count - 1 elements
return pool_count - 1 - size();
}
T * findNodeByIndex(int i){
return (T *)&pool[i];
}
protected:
bool PushPosValid(unsigned int pos){
if (((pos + 1) % pool_count) == pop_pos)
return false;
return true;
}
protected:
Node * pool = NULL;
std::atomic<unsigned int> push_pos;
//std::atomic<unsigned int> pop_pos;
//SpinHybridLock lock_pop;
//SpinHybridLock lock_push;
unsigned int pop_pos;
int pool_count;
};
template<class T>
class MultiNoLockRingMem{// a single NoLockRingMem can stall because one block is never freed; with several rings one unfreed block cannot stall everything
public:
struct MNode :public T{
int pos_mem;
};
#if 1
MultiNoLockRingMem(int count){
int acount = (count + 4) / 4;
for (int i = 0; i < 4; i++){
mems[i] = new NoLockRingMem<MNode>(acount);
for (int j = 0; j < acount; j++){
auto n = mems[i]->findNodeByIndex(j);
n->pos_mem = i;
}
}
}
~MultiNoLockRingMem(){
for (int i = 0; i < 4; i++){
delete mems[i];
}
}
T * alloc(){
//do{
//for (int k = 0; k < 10; k++){
for (int i = 0; i < 4; i++){
MNode * node = mems[i]->alloc();
if (node){
//node->pos_mem = i;
return node;
}
}
//break;
//}
//while (true);
return NULL;
}
void free(T * n){// free is normally uncontended (single consumer)
MNode * tn = (MNode *)n;
mems[tn->pos_mem]->free(tn);
}
int size(){
int total = 0;
for (int i = 0; i < 4; i++){
total += mems[i]->size();
}
return total;
}
#else
MultiNoLockRingMem(int count):mems1(count/2),mems2(count/2){
for (int i = 0; i < count/2; i++){
auto tn = mems1.findNodeByIndex(i);
auto tn2 = mems2.findNodeByIndex(i);
tn->pos_mem = 0;
tn2->pos_mem = 1;
}
}
T * alloc(){
auto ret = mems1.alloc();
if (!ret){
ret = mems2.alloc();
}
return ret;
}
void free(T * n){
MNode * tn = (MNode *)n;
if (tn->pos_mem){
mems2.free(tn);
}
else{
mems1.free(tn);
}
}
int size(){
return mems1.size() + mems2.size();
}
#endif
protected:
NoLockRingMem<MNode> * mems[4];
//NoLockRingMem<MNode> mems1;
//NoLockRingMem<MNode> mems2;
};
For various reasons, FastListQueue has to keep holding one node (the dummy head), which can exhaust a single ring buffer: the blocks behind it get freed while the one in front does not, and allocation deadlocks. That is why allocation was split across multiple ring buffers.
There was also a subtlety in using CAS. I originally assumed that whichever thread advanced push_pos owned that block of memory. In practice, a thread could advance push_pos, get preempted before it cleared b_freed, and another thread would then be handed the same block. That is why two atomics are used.
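To make the race concrete, the broken single-atomic version looked roughly like this, written as if it were a member of the NoLockRingMem class above (reconstructed from the description, not the final code):
// Broken variant, reconstructed for illustration: claiming a slot only by
// advancing push_pos is not enough.
T * alloc_broken(){
    unsigned int pos = push_pos.load();
    if (!PushPosValid(pos))
        return NULL;
    while (!push_pos.compare_exchange_weak(pos, (pos + 1) % pool_count)){
        pos = push_pos.load();
    }
    // If this thread is preempted right here, before b_freed is cleared,
    // the ring can wrap around and another thread may be handed pool[pos]
    // as well. Hence the final version claims b_freed with a CAS first and
    // only then advances push_pos.
    pool[pos].b_freed = false;
    return &pool[pos];
}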
IV. Why a ring queue is used for memory allocation
I wrote and tested quite a few allocation schemes, including a locked one and a lock-free one that scans the pool in a for loop.
In my tests this ring-queue scheme came out best, unless there is a better approach I have not thought of.
Intuitively: with a ring queue, only the memory at the push position needs to be checked for availability, and the threads then race for just that one block, so there are far fewer conditions to test and it runs faster. In practice, though, the gain feels about as small as going from a spin lock to an atomic: hardly noticeable, while the code becomes more complex and harder to read...
V. Test code and results
//test the performance of the multi-threaded queue
while(1){
b_continue = false;
srand(0);
long tt1 = GetTickCount();
int t = 0;
for (int i = 0; i < 100000; i++){
for (int j = 0; j < 4; j++)
t += (j+1);
}
long tt2 = GetTickCount();
printf("normal result=%d cost %ld\n",t, tt2 - tt1);
global_count = 0;
total_exec_count = 0;
lt.start();
srand(0);
tt1 = GetTickCount();
std::thread t1([]{
printf("1 total = %ld,exec_count = %ld\n", global_count, total_exec_count);
for (int i = 0; i < 1000000; i++){
//printf("add 1\n");
int r = rand();
lt.getLooper()->post([=]{
global_count += 1;
total_exec_count++;
});
//Sleep(1);
}
printf("t1 exit\n");
});
std::thread t2([]{
printf("2 total = %ld,exec_count = %ld\n", global_count, total_exec_count);
for (int i = 0; i < 1000000; i++){
//printf("add 2\n");
int r = rand();
lt.getLooper()->post([=]{
global_count += 2;
total_exec_count++;
});
//Sleep(1);
}
printf("t2 exit\n");
});
std::thread t3([]{
printf("3 total = %ld,exec_count = %ld\n", global_count, total_exec_count);
for (int i = 0; i < 1000000; i++){
//printf("add 3\n");
int r = rand();
lt.getLooper()->post([=]{
global_count += 3;
total_exec_count++;
});
//Sleep(1);
}
printf("t3 exit\n");
});
std::thread t4([]{
printf("4 total = %ld,exec_count = %ld\n", global_count, total_exec_count);
for (int i = 0; i < 1000000; i++){
//printf("add 4\n");
int r = rand();
lt.getLooper()->post([=]{
global_count += 4;
total_exec_count++;
});
//Sleep(1);
}
printf("t4 exit\n");
});
t1.join();
t2.join();
t3.join();
t4.join();
printf("total:\n");
auto f = [=]{
long tt2 = GetTickCount();
printf("total = %ld,exec_count = %ld total cost %ldms\n", global_count,total_exec_count, tt2 - tt1);
if (global_count == 10000000){
b_continue = true;
}
};
lt.getLooper()->post(f);
lt.getLooper()->post(f);
//getchar();
lt.getLooper()->post(f);
//getchar();
while (!b_continue){
Sleep(1);
}
}
Here the four threads push 4,000,000 add operations in total; the best runs finished in under 600 ms.
template<class T>
class NoLockMem{
public:
struct Node :public T{
std::atomic<bool> b_freed;
};
NoLockMem(int count){
pool_count = count;
pool = new Node[count];
for (int i = 0; i < count; i++){
pool[i].b_freed = true;
}
}
~NoLockMem(){
delete[] pool;
}
T * alloc(){
for (int i = 0; i < pool_count; i++){
if (pool[i].b_freed){
bool b_freed = true;
if (pool[i].b_freed.compare_exchange_weak(b_freed, false)){
return (T *)&pool[i];
}
}
}
return NULL;
}
void free(T * n){
Node * tn = (Node *)n;
tn->b_freed = true;
}
int size(){
return 0;
}
protected:
int pool_count;
Node * pool;
};
Using a linear scan to find freed memory (the NoLockMem above), the results were as follows: the fastest run never dropped below 600 ms.
With an unbounded queue implemented with new and delete, the results were:
Although it is unbounded and never waits, so the four producer threads exit very early, the delete calls on the consumer thread are so expensive that the whole run took about 100 s.
VI. Locked memory allocation, a reference implementation
template<class T>
class LockRingMem{
public:
struct Node :public T{
int pos;
volatile bool b_freed;
SpinHybridLock lock;
};
LockRingMem(int count){
push_pos = 0;
pop_pos = 0;
pool_count = count;
pool = new Node[count];
for (int i = 0; i < count; i++){
pool[i].pos = i;
pool[i].b_freed = true;
}
}
~LockRingMem(){
delete[] pool;
}
T * alloc(){
#if 0
do{
for (int i = 0; i < pool_count; i++){
Node * t = &pool[i];
if (t->b_freed){
if (t->lock.try_lock()){
if (t->b_freed){
t->b_freed = false;
t->lock.unlock();
push_pos = (push_pos + 1) % pool_count;
return t;
}
else{
t->lock.unlock();
}
}
}
}
std::this_thread::yield();
}
while (true);
#endif
do{
auto pos = push_pos.load();
//auto npos = (pos + 1) % pool_count;
//for (; npos != pop_pos; ){
for (int i = 0; i < 8; i++){// usually tied to the number of cores
Node * t = &pool[pos];// searching from pos onward is faster
pos = (pos + 1) % pool_count;
//pos = npos;
//npos = (npos + 1) % pool_count;
if (t->b_freed){
if (t->lock.try_lock()){
if (t->b_freed){
t->b_freed = false;
t->lock.unlock();
push_pos = (push_pos + 1) % pool_count;
return t;
}
else{
t->lock.unlock();
}
}
}
}
//std::this_thread::yield();
} while (false);
return NULL;
}
void free(T * n){// free is normally uncontended (single consumer)
Node * tn = (Node *)n;
tn->b_freed = true;
pop_pos = (pop_pos + 1) % pool_count;
}
int size(){
int s = push_pos - pop_pos;
if (s < 0){
s = s + pool_count;
}
return s;
}
int idle(){// a ring buffer can hold at most count - 1 elements
return pool_count - 1 - size();
}
T * findNodeByIndex(int i){
return (T *)&pool[i];
}
protected:
Node * pool = NULL;
std::atomic<unsigned int> push_pos;
unsigned int pop_pos;
int pool_count;
};
In actual testing, it performs about the same as the lock-free version.