参考资料
- windows api: https://docs.microsoft.com/en-us/windows/win32/api/
- 《Linux-UNIX系统编程手册》
- Python Doc: https://docs.python.org/3/library/mmap.html
- Linux manual: https://man7.org/linux/man-pages/man2/mmap.2.html
实现(windows)
主要功能
用C++创建共享内存,用Python进行读取,用csv格式传递DataFrame
Q&A
Q:为什么使用csv格式? A:如果使用binary格式,需要用python的struct.unpack解析二进制数据,相当于在Python环境下执行数据解析,这会非常慢,数据量大的话,可能会比写文件还慢。使用csv,C++写入一个字符流,Python中调用pd.read_csv,也是在C++代码中解析csv,因此比较快。 Q:为什么不将Python嵌入到C++中,用函数参数传递来传递DataFrame A:这种方式传递参数,也会非常非常慢
实现步骤
- 调用CreateFile创建文件
- 调用CreateFileMapping将创建的内存文件映射到本进程的内存空间
- 调用MapViewOfFile获得共享内存在本进程内存空间内的地址
- 写数据
C++创建共享内存并写入数据
#include <iostream>
#include <windows.h>
#include <thread>
using namespace std;
// 共享内存的大小
#define SHARE_MEMORY_FILE_SIZE_BYTES (100LL*1024*1024)
int main() {
// 创建文件
// https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
HANDLE dumpFd = CreateFile(
L"shared_memory1", // 名称
GENERIC_READ | GENERIC_WRITE, // 权限,读写
FILE_SHARE_READ | FILE_SHARE_DELETE, // 其他进程对该内存的权限,可读、可删除
nullptr,
OPEN_ALWAYS, // 共享内存存在时,将其打开,如果不存在,创建
FILE_ATTRIBUTE_NORMAL, // 文件系统中,该共享内存的权限
nullptr
);
if (dumpFd == INVALID_HANDLE_VALUE) {
cout << "create file error" << endl;
perror("CreateFile");
return -1;
}
// https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createfilemappinga
HANDLE sharedFd = CreateFileMapping(
dumpFd,
nullptr,
PAGE_READWRITE, // 要执行的操作,读写
0,
SHARE_MEMORY_FILE_SIZE_BYTES, // 要映射的大小,如果不够大,操作系统会将该内存扩充至该大小
L"shared_memory1"
);
if (!sharedFd && false) {
cout << "CreateFileMapping error" << endl;
perror("CreateFileMapping");
return -1;
}
else {
// https://docs.microsoft.com/en-us/windows/win32/api/memoryapi/nf-memoryapi-mapviewoffile
LPVOID lp_base = MapViewOfFile(
sharedFd, // Handle of the map object
FILE_MAP_READ | FILE_MAP_WRITE, // Read and write access
0, // High-order DWORD of the file offset
0, // Low-order DWORD of the file offset
SHARE_MEMORY_FILE_SIZE_BYTES); // The number of bytes to map to view
// 要共享的DataFrame
char msg[] = "index,a,b,c\n0,1,1,1\n1,2,2,2\n2,3,3,3\n";
int len = strlen(msg);
char* ptr = (char*)lp_base;
// 写入4字节,代表后续数据长度
memcpy(ptr, &len, 4);
ptr = ptr + 4;
memcpy(ptr, msg, strlen(msg));
}
// 等待Python读取共享内存中的数据
this_thread::sleep_for(chrono::seconds(100));
}
Python中读取共享内存
import mmap
from io import StringIO
from struct import unpack
import pandas as pd
# 注意这里写的大小,一定要不大于C++中开启的共享内存大小,否则映射失败
SHARE_MEMORY_FILE_SIZE_BYTES = 100*1024
mmap_file = mmap.mmap(-1, SHARE_MEMORY_FILE_SIZE_BYTES, "shared_memory1", mmap.ACCESS_READ)
buffer_size = unpack("i", mmap_file.read(4))
print(f"Buffer size: {buffer_size}")
ex_factor = pd.read_csv(StringIO(mmap_file.read(buffer_size[0]).decode()), index_col=0)
print(ex_factor)
"""
结果:
Buffer size: (36,)
a b c
index
0 1 1 1
1 2 2 2
2 3 3 3
"""
实现(Linux)
说明:
C++和python的mmap.mmap函数在windows下和linux下有不同的使用方法。
流程
- C++调用shm_open打开一个临时文件,该文件会被创建在/dev/shm下
- C++调用mmap进行内存映射
- C++写入数据
- Python读取数据
- C++关闭共享内存
C++创建共享内存并写入数据
#include <iostream>
#include <sys/mman.h>
#include <fcntl.h>
#include <cstring>
#include <sys/stat.h>
#include <unistd.h>
#include <chrono>
#include <thread>
using namespace std;
int main() {
// 打开一个文件,在ubuntu中,这个文件会被创建在/dev/shm/下
int shm_fd = shm_open("shared_memory1", O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
const int64_t size = 1024LL * 10 * 1024;
// 将文件扩展到相应大小,稍后写入数据
ftruncate(shm_fd, size);
perror("shm_open");
// 调用mmap将共享内存映射到本进程的虚拟内存空间,返回指针
char* ptr = static_cast<char *>(mmap(nullptr, 1024LL * 10 * 1024, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0));
perror("mmap");
// 写入数据
char msg[] = "index,a,b,c\n0,1,1,1\n1,2,2,2\n2,3,3,3\n";
int len = strlen(msg);
memcpy(ptr, &len, 4);
ptr = ptr + 4;
memcpy(ptr , msg, len);
// 等待python读取
this_thread::sleep_for(chrono::seconds(10));
// 关闭共享内存对象,关闭之后,已经mmap的进程仍然可以使用该块内存,但新的进程无法再进行mmap
shm_unlink("shared_memory1");
// 解除映射,解除之后,该共享内存会被删掉
munmap(ptr-4, 1024LL * 10 * 1024);
}
Python中读取共享内存
import mmap
from io import StringIO
from struct import unpack
import pandas as pd
# 注意这里写的大小,一定要不大于C++中开启的共享内存大小,否则映射失败
SHARE_MEMORY_FILE_SIZE_BYTES = 100*1024
f = open("/dev/shm/shared_memory1",'r+b')
fd = f.fileno()
print(fd)
mmap_file = mmap.mmap(fd, 0)
buffer_size = unpack("i", mmap_file.read(4))
print(f"Buffer size: {buffer_size}")
ex_factor = pd.read_csv(StringIO(mmap_file.read(buffer_size[0]).decode()), index_col=0)
print(ex_factor)
|