高并发内存池整体框架设计
现在很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身就已经很优秀了,那么我们的项目原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池要考虑几个方面。
concurrent memory pool主要由下面3个部分组成:
- Thread Cahce:每个线程都要自己独立的线程缓存,用于小于256KB的内存分配在这里申请内存不需要加锁。
- Central Cache:程序中只要一个中心缓存,多个线程共享。thread cache是从这里按需获取对象。central cache具有回收机制,当一个线程中thread cache占用太多内存时,central cache会回收内存,避免其他线程对内存吃紧,达到了内存在分配多个线程时更均衡的按需调度的目的。central cache是多个线程共享的,所以需要锁竞争。但central cache是用的桶锁,所以竞争不是很激烈。
- Paga Cache:存储的内存是以页为单位存储及分配给central cache,当central cache没有对象是,就从paga cache按需拿一定数量的paga,并切割成固定大小的小内存块给central cache。pagan cache也有回收机制,paga cache会回收已经满足条件的central cache中的span对象,并且合并相邻的页,合成一个大页,缓解了内存碎片的问题。
申请内存部分
高并发内存池-Thread Cache
thread cache是哈希桶结构,每个桶的位置映射的是内存块对象的自由链表。每个线程都会有一个thread cache对象,在这个对象中获取和释放都是无锁的。
申请内存:
- 当内存申请size<=256KB,先获取到线程本地的thread cache对象,计算size映射的哈希桶自由链表的下标。
- 如果自由链表中有对象,则直接获取一个内存对象返回
- 如果自由链表在没有对象,则去central cache中获取一定数量的对象,插入到自由链表中
框架:
管理小对象的自由链表
static const NFREE_LIST=208;
static const MAX_SIZE=256*1024;
static const PAGE_SHIFT=13;
static const PAGA_LIST=129;
#ifdef _WIN64
typedef size_t PageID;
#elif _WIN32
typedef unsigned long long PageID;
#endif
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
class FreeList
{
public:
FreeList():_freelist(nullptr),_count(0),_size(1)
{}
~FreeList(){}
void Push(void* obj){
assert(obj);
(*(void**))obj=_freelist;
_freelist=obj;
_count++;
}
void PushRange(void* start,void* end,size_t sum){
assert(start&&end);
*((void**)end)=_freelist;
_freelist=start;
_count+=sum;
}
void PopRange(void*& start,void*& end,size_t sum)
{
start=_freelist;
end=start;
for(size_t i=0;i<sum-1;i++){
end=*(void**)end;
}
_freelist=*(void**)end;
*(void**)end=nullptr;
_count-=sum;
}
void* Pop(){
assert(_freelist);
void*obj=_freelist;
_freelist=(*(void**))_freelist;
(*(void**))obj=nullptr;
_count--;
return obj;
}
bool Empty(){
return _freelist==nullptr;
}
size_t Count(){
return _count;
}
size_t& MaxSize(){
return _size;
}
private:
void* _freelist;
size_t _count;
size_t _size;
};
对齐规则
我们最小都要用8字节对齐,因为我们要用链表把这些小内存块连接起来。当字节<=256KB时,就向thread cache申请。
如果我们都用8字节对齐,那么桶的个数就是32768个桶,太多了,没有必要。
所以、我们采用
整体控制在最多10%左右的内碎片浪费
[1,128] 8byte对齐 freelist[0,16) 16个桶
[128+1,1024] 16byte对齐 freelist[16,72) 56个桶
[1024+1,8*1024] 128byte对齐 freelist[72,128) 56个桶
[8*1024+1,64*1024] 1024byte对齐 freelist[128,184) 56个桶
[64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208) 24个桶
总共 208个桶
例如:当我们需要9字节的内存时,thread cache给我们16字节
当我们需要129字节的内存时,threadcache给我们144字节
造成的内碎片控制在10%左右,且内碎片可以被下次申请时利用。
class SizeClass
{
public:
普通人写法
static inline size_t _RoundUp(size_t size,size_t alignNum){
size_t align;
if(size%alignNum!=0){
align=(size/alignNum+1)*alignNum;
return align;
}
else{
align=size;
}
return align;
}
大佬写法
static inline size_t _RoundUp(size_t size, size_t alignNum)
{
return ((size+ alignNum - 1) & ~(alignNum - 1));
}
多次调用的函数使用静态,该函数使用内联
static inline size_t RoundUp(size_t size){
if(size<=128){
return _RoundUp(size,8);
}
else if(size>128 && size<=1024){
return _RoundUp(size,16);
}
else if(size>1024 && size<=8*1024){
return _RoundUp(size,128);
}
else if(size>8*1024 && size<=64*1024){
return _RoundUp(size,1024);
}
else if(size>64*1024 && size<=256*1024){
return _RoundUp(size,8*1024);
}
else if(size>256*1024){
return _RoundUp(size,8*1024);
}
else{
assert(false);
return -1;
}
}
普通人写法
static inline size_t _Index(size_t size,size_t alignNum){
if(size % alignNum==0){
return size/alignNum-1;
}
else{
return size/alignNum;
}
}
大佬写法
static inline size_t _Index(size_t size, size_t alignNum)
{
return ((size + (1 << alignNum) - 1) >> alignNum) - 1;
}
映射的桶号
static inline size_t Index(size_t size){
assert(size<=MAX_SIZE);
static int group_array[4]={16,56,56,56};
if (size <= 128){
return _Index(size, 3);
}
else if (size <= 1024){
return _Index(size - 128, 4) + group_array[0];
}
else if (size <= 8 * 1024){
return _Index(size - 1024, 7) + group_array[1] + group_array[0];
}
else if (size <= 64 * 1024){
return _Index(size - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (size <= 256 * 1024){
return _Index(size - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else{
assert(false);
return -1;
}
}
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
static size_t NumMovePaga(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num*size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
}
};
class ThreadCache
{
public:
void* Allocate(size_t size);
void* FetchFromCentralCache(size_t index, size_t alignsize);
private:
FreeList freelist[NFREE_LIST];
};
每个线程获取自己独立的Thead Cache
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
Allocate:
void* Allocate(size_t size)
{
assert(size<=MAX_SIZE);
size_t alignsize=RoundUp(size);
size_t index=Index(size);
if(freelist[index].Empty()){
void* obj;
obj=freelist[index].Pop();
return obj;
}
else{
return FetchFromCentralCache(index,alignsize);
}
}
FetchFromCentralCache:
在central cache中获取内存块
在这里我们使用慢增长的方式,每一次去central cache中获取内存时,size++。
好处:当我们大量使用这个桶中的内存时,我们不是每次都在central cache中获取1个,而是每次+1,这样就减少了去central cache中的次数,最大限度是512
FetchFromCentralCache(size_t index,size_t alignsize)
{
assert(index<NFREE_LIST);
size_t batchsum = min(freelist[index].MaxSize(),SizeClass::NumMoveSize(size));
if(batchsum==freelist[index].MaxSize()){
freelist[index].MaxSize()+=1:
}
void* start=nullptr;
void* end=nullptr;
size_t sum=GetNumCache(start,end,batchsum,alignsize);
assert(sum>0);
if(sum==1){
assert(start==end);
return start;
}
else{
freelist[index].PushRange(*(void**)start, end,sum-1);
}
return start;
}
高并发内存池-Central Cache
central cache也是哈希桶的结构,和thread cache的映射关系一样,都是208个桶。不同的是central cache的每个哈希桶的位置都是SpanList链表结构,每个映射桶下面的Span中的大块内存都被切分成了小块内存通过自由链表连接起来。 框架:
描述Span
class Span
{
Span* next=nullptr;
Span* prev=nullptr;
size_t spanId=0;
size_t pagaSum=0;
void* _freelist=nullptr;
size_t _usecount=0;
bool state=false;
size_t objSize=0;
};
class SpanList
{
public:
SpanList()
{
_head=new Span();
_head->next=_head;
_head->prev=_head;
}
Span* Begin()
{
return _head->next;
}
Span* End()
{
return _head;
}
bool Empty()
{
return _head->nex==_heaed;
}
void Insert(Span* pos,Span* obj)
{
assert(obj&&pos);
Span* cur = pos->prev;
cur->next = obj;
pos->prev = obj;
}
void Etase(Span* obj)
{
assert(obj);
assert(obj != _head);
Span* cur = obj->prev;
Span* pos = obj->next;
cur->next = pos;
pos->prev = cur;
}
void PushFront(Span* obj)
{
Insert(Begin(),obj);
}
void* PopFront()
{
Span* front=_head->next;
assert(front!=_head);
Etase(front);
return front;
}
private:
Span* _head;
public:
std::mutex mtx;
};
多个线程共享,则只能有一个对象,采用单例模式-饿汉模式
class CentralCache
{
public:
static CentralCache* GetObjCen()
{
return &_Inst;
}
Span* GetOneSpan(SpanList& list,size_t size);
size_t GetNumCache(void*&start,void*&end,size_t batchsum,size_t size);
private:
CentralCache(){};
CentralCache(const CentralCache& c)=delete;
public:
SpanList _spanlist[NFREE_LIST];
private:
static CentralCache _Inst;
};
初始化
CentralCache CentralCache::_inst;
GetNumCache:
size_t GetNumCache(void*&start,void*&end,size_t batchsum,size_t size)
{
size_t index=Index(size);
_spanlist[index].mtx.lock();
Span* span=GetOneSpan(_spanlist[index],size);
assert(span);
assert(span->_freelist);
start=span->_freelist;
end=start;
size_t i=0;
size_t actualNum=1;
while(i<batchsum-1 && *((void**)end)!=nullptr ){
end=*(void**)end;
++i;
++actualNum;
}
span->_spanlist = *(void**)end;
*(void**)end = nullptr;
span->_usecount += actualNum;
spanlists[index].mtx.unlock();
return actualNum;
}
GetOneSpan:
这里注意的是加锁,当进入Paga之前,要把Central的桶锁解掉,让其他等待锁资源的线程能够进入Central中获取内存。进入Paga获取新的Span时,要加锁,保证线程安全。
拿到了新Span要把这块大块内存切成小块内存通过自由链表的方式连接起来。挂到Central的所对应的桶中
在这里,我们可以通过页号计算该内存块的起始地址
Span* GetOneSpan(SpanList& list,size_t size)
{
assert(&list);
Span* it=list.Begin();
while(it!=list.End()){
if(it->_freelist!=nullptr){
return it;
}
it=it->next;
}
list.mtx.unlock();
PagaSpan::GtePagaObj()->Pagamtx.lock();
Span* span = PagaSpan::GtePagaObj()->NewSpan(SizeClass::NumMovePaga(size));
span->state = true;
span->objSize = size;
PagaSpan::GtePagaObj()->Pagamtx.unlock();
char* start = (char*)(span->_pageId<< PAGE_SHIFT);
size_t bytes = (span->n << PAGE_SHIFT);
char* end = start+bytes;
span->_spanlist = start;
start += size;
void* tail = span->_spanlist;
while (start < end){
*(void**)tail = start;
tail = start;
start += size;
}
*(void**)tail = nullptr;
list.mtx.lock();
list.PushFront(span);
return span;
}
高并发内存池-Paga Cache
- central向paga申请Span时,需要现在对应的位置上检查有没有Span,如果没有就开始往下寻找,找到了之后对该Span进行分割。例如申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页pagespan分裂为一个4页page span和一个6页page span。
- 如果最后都没有找到Span,就在堆上申请一个128页的Span挂接到Paga对应的桶中的自由链表上,然后在重复1的过程。
- 需要注意的是central cache和page cache 的核心结构都是哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的哈希桶t则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
框架:
单例模式
class PagaSpan
{
public:
static PagaSpan* GtePagaObj()
{
return &_pagaInst;
}
Span* NewSpan(size_t k);
private:
PagaSpan(){};
PagaSpan(const PagaSpan&) = delete;
public:
SpanList _pagalist[PAGA_LIST];
std::mutex Pagamtx;
private:
static PagaSpan _pagaInst;
在释放内存的时候要用到
std::unordered_map<PageID, Span*> _idSpanMap;
};
NewSpan:
Span* NewSpan(size_t k)
{
assert(k<PAGA_LIST);
if(!_pagalist[k].Emoty()){
Span* iSpan = _pagalist[k].PopFront();
for (PageID i = 0; i < iSpan->n; i++){
_idSpanMap[iSpan->_pageId + i] = iSpan;
}
return iSpan;
}
for(size_t i=k+1;i<PAGA_LIST;i++){
if(!_pagalist[i].Emoty()){
Span* nSpan= _pagalist[i].PopFront();
Span* kSpan = new Span;
kSpan->_pageId = nSpan->_pageId;
kSpan->n =k;
nSpan->_pageId += k;
nSpan->n -= k;
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->n-1] = nSpan;
for (PageID i = 0; i < kSpan->n; i++){
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
_pagalist[nSpan->n].PushFront(nSpan);
return kSpan;
}
}
Span* bigSpan = new Span;
void* ptr = SystemAlloc(PAGA_LIST - 1);
bigSpan->_pageId = ((PageID)ptr >> PAGE_SHIFT);
bigSpan->n = PAGA_LIST - 1;
_pagalist[bigSpan->n].PushFront(bigSpan);
return NewSpan(k);
}
释放内存部分
高并发内存池-Thread Cache
class ThreadCache
{
public:
void Deallocate(void* ptr, size_t size);
void ListToLong(size_t index,size_t num);
private:
FreeList freelist[NFREE_LIST];
};
Deallocate:
释放时先计算对应的桶,然后头插进去。
合并到central机制:
当该桶的自由链表的数量大于或者等于慢增长的个数就合并到central中的span中。
void Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
size_t index=Index(size);
freelist[index].Push(ptr);
if(freelist[index].Count()>=freelist[index].MaxSize()){
ListToLong(index,size);
}
}
ListToLong:
void ListToLong(size_t index,size_t num)
{
assert(index < NFREE_LIST);
void* start = nullptr;
void* end = nullptr;
freelist[index].PopRange(start, end, freelist[index].MaxSize());
CentralCache::GetObjCen()->ReleaseToSpan(start,size);
}
高并发内存池-Central Cache
class CentralCache
{
public:
static CentralCache* GetObjCen()
{
return &_Inst;
}
void ReleaseToSpan(void* start,size_t size);
private:
CentralCache(){};
CentralCache(const CentralCache& c)=delete;
public:
SpanList _spanlist[NFREE_LIST];
private:
static CentralCache _Inst;
};
初始化
CentralCache CentralCache::_inst;
ReleaseToSpan:
注意:我们在还给Span时,要把在哪个Span申请的还给Span(获得一段连续的Span空间)
我们
还要注意锁的问题
void ReleaseToSpan(void* start,size_t size)
{
assert(start);
size_t index=Index(size);
_spanlist[index].mtx.lock();
while(start){
void* next=*(void**)start;
Span* span=PagaCache::GetPagaObj()->MapObjectToSpan(start);
*(void**)start=span->_spanlist;
span->_spanlist=start;
assert(span->_usecount>0);
span->_usecount--;
if(span->_usecount == 0){
需要整合到Paga中
_spanlist[index].Erase(span);
span->next = nullptr;
span->prev = nullptr;
不需要_spanlist通过页号也可以找到地址
span->_spanlist = nullptr;
_spanlists[index].mtx.unlock();
PagaSpan::GtePagaObj()->Pagamtx.lock();
PagaSpan::GtePagaObj()->ReleaseSpanToPageCache(span);
PagaSpan::GtePagaObj()->Pagamtx.unlock();
_spanlists[index].mtx.lock();
}
start=next;
}
_spanlists[index].mtx.unlock();
}
高并发内存池-Paga Cache
class PagaSpan
{
public:
static PagaSpan* GtePagaObj()
{
return &_pagaInst;
}
Span* MapObjectToSpan(void* obj);
void ReleaseSpanToPageCache(Span* span);
private:
PagaSpan(){};
PagaSpan(const PagaSpan&) = delete;
public:
SpanList _pagalist[PAGA_LIST];
std::mutex Pagamtx;
private:
static PagaSpan _pagaInst;
std::unordered_map<PageID, Span*> _idSpanMap;
};
MapObjectToSpan:
Span* MapObjectToSpan(void* obj)
{
assert(obj);
PageID id=(PageID)obj >> PAGE_SHIFT;
std::unique_lock<std::mutex> lock(Pagamtx);
auto ren=_idSpanMap.find(id);
if (ren!=_idSpanMap.end()){
return ren->second;
}
else{
assert(false);
return nullptr;
}
}
ReleaseSpanToPageCache:
该Span要对前后页进行和并
void ReleaseSpanToPageCache(Span* span)
{
while (true){
PageID id = span->_pageId - 1;
auto ip = _idSpanMap.find(id);
if (ip == _idSpanMap.end()){
break;
}
if (ip->second->state == true){
break;
}
不能大于128页
if ((ip->second->n + span->n) > PAGA_LIST - 1){
break;
}
span->n += ip->n;
span->_pageId = ip->_pageId;
_pagalist[ip->n].Erase(ip);
delete ip->second;
}
while (true){
PageID id = span->_pageId +span->n;
auto ip =(Span*)_idSpanMap.find(id);
if (ip ==_idSpanMap.end() ){
break;
}
if (ip->state == true){
break;
}
if ((ip->n + span->n) > PAGA_LIST - 1){
break;
}
span->n += ip->n;
_pagalist[ip->n].Erase(ip);
delete ip->second;
}
插入到对应的桶上
_pagalist[span->n].PushFront(span);
span->state = false;
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->n - 1] = span;
}
优化部分
在整个代码中,我们用到了new,我们写这个本来就是来替代new的,所以在这里我们要修改。
之前我们写了一个定长内存池,我们使用这个定长内存池来优化这部分
MemoryPool<Span> _spool;
我们映射span和页号是通过哈希桶来完成的。 在进行哈希桶操作时,要对其进行加锁,因为哈希桶的链表是用红黑树实现的,在对哈希桶操作时,有可能会改变树的结构。
如果是在32位机器下,我们不用哈希桶,而是自己搞一个映射关系呢?
就是弄一个指针数组,长度位2^19个,每个页都有一个地方映射,这样进行操作时,就不要加锁。
#include"Common.h"
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap1() {
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
void set(Number k, void* v) {
array_[k] = v;
}
};
测试
#include"ConcurrentAlloc.h"
#include<atomic>
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&,k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime);
printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime);
printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(ConcurrentAlloc(16));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime);
printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime);
printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}
int main()
{
size_t n = 10000;
cout << "==========================================================" << endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" << endl;
return 0;
}
|