什么是数据并行
数据并行(data-parallelism)和任务并行(task-parallelism)是实现并行计算的两种方式,可以简单理解为多人分工的两种方式。
- 例如:某场考试有200张试卷,试卷上有4种题型,需要4个人改卷。
- 数据并行:对“试卷数量”分割,每人改50张试卷的4种题型。(每人的任务都一样)
- 任务并行:对“试卷内容”分割,每人改200张试卷的1种题型。(每人的任务不一样)
日常写代码,如果需要用一个函数对一组数据依次处理,觉得执行得慢,就可以用数据并行。本文最后有写好的轮子,可以直接放项目里用。
输入输出示例
输入:
def Pow(a,n):
    """Return a raised to the power n."""
    return pow(a, n)
args_mat=[
[2,0],
[2,1],
[2,2],
...
[2,100000]
]
- 必须是二维列表,一行代表一组参数。
- 一维列表arr通过`arr=[[x] for x in arr]`升成二维
- 进程池大小 pool_size(默认为5)
  - 要小于CPU核数(os.cpu_count() 可以返回CPU核数)
- 进度条文字 desc(默认为空)
输出:
[1,2,4,8,16,32,64,...]
方法1:用Python自带的并行任务接口concurrent.futures
import concurrent.futures
from tqdm import tqdm
def multi_process_exec_v0(f,args_mat,pool_size=5,desc=None):
    """Run f once per argument row — one pool task per row.

    Results come back in input order; a tqdm bar ticks once per finished row.
    """
    if not args_mat:
        return []
    results = [None] * len(args_mat)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
            # Map each future back to the row index it was submitted for.
            index_of = {executor.submit(f, *row): idx for idx, row in enumerate(args_mat)}
            for done in concurrent.futures.as_completed(index_of):
                results[index_of[done]] = done.result()
                pbar.update(1)
    return results
这个并行任务接口是给每一条输入数据各开一个进程来执行。 创建/销毁进程的开销很大,时间上比串行执行还慢。 这个进程池不适合用来做“数据并行”,而是适合做“任务并行”。
方法2:用Python自带的多进程接口multiprocessing
from multiprocessing import Pool, Pipe
from multiprocessing.connection import wait
from tqdm import tqdm
def ToBatch(arr, size):
    """Split arr into consecutive chunks of length `size` (last chunk may be shorter)."""
    return [arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]
def batch_exec_v1(f,args_batch,w,offset=0):
    """Evaluate f on each row of args_batch, streaming (global_index, result)
    pairs through pipe write-end w; finish with an 'exit' sentinel and close w."""
    idx = offset
    for row in args_batch:
        w.send((idx, f(*row)))
        idx += 1
    w.send('exit')
    w.close()
def multi_process_exec_v1(f,args_mat,pool_size=5,desc=None):
    """Data-parallel map of f over args_mat, one private Pipe per batch.

    args_mat: 2-D list, one row of positional arguments per call; a flat list
    is promoted to single-argument rows. Results keep input order; the tqdm
    bar ticks once per finished row.
    """
    if len(args_mat)==0:return []
    # Promote a 1-D argument list to rows of single arguments.
    # Fix: use isinstance instead of `type(...) in [list,tuple]` so
    # subclasses of list/tuple are also accepted.
    if not isinstance(args_mat[0], (list, tuple)):
        args_mat=[[a]for a in args_mat]
    batch_size=max(1,int(len(args_mat)/4/pool_size))  # ~4 batches per worker
    results=[None for _ in range(len(args_mat))]
    args_batches = ToBatch(args_mat,batch_size)
    readers=[]
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            for i,args_batch in enumerate(args_batches):
                r,w=Pipe(duplex=False)  # private channel for this batch
                readers.append(r)
                pool.apply_async(batch_exec_v1,(f,args_batch,w,i*batch_size))
            # Drain every reader until each worker sends its 'exit' sentinel.
            while readers:
                # wait() returns a fresh list, so removing from `readers` is safe.
                for r in wait(readers):
                    try:
                        msg=r.recv()
                        if msg=='exit':
                            readers.remove(r)
                            continue
                        results[msg[0]]=msg[1]
                        pbar.update(1)
                    except EOFError:
                        readers.remove(r)
    return results
这段代码是把输入数据分批,每个进程处理一批。 数据分批的批数=进程池大小×4。批数设得越多,越倾向于提高并行计算资源的利用率,但同时,创建/销毁进程的开销也会越多。 由于一个进程是处理一批数据而不是一条数据,那么进度条就是一批一批地更新。要想一条一条地更新,就需要进程间通信。这里是每处理完一条数据,就立即把这一条的结果传给主进程;主进程收集结果,更新进度条。
方法3:在方法2的基础上,共用同一个通信管道
from multiprocessing import Pool, Pipe
from tqdm import tqdm
def ToBatch(arr, size):
    """Split arr into consecutive chunks of length `size` (last chunk may be shorter)."""
    return [arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]
def batch_exec_v2(f,args_batch,w=None,offset=0):
    """Evaluate f on each row of args_batch; when pipe write-end w is given,
    stream (global_index, result) pairs through it."""
    for i, row in enumerate(args_batch):
        result = f(*row)
        if w:
            w.send((offset + i, result))
def multi_process_exec_v2(f,args_mat,pool_size=5,desc=None):
    """Data-parallel map of f over args_mat using ONE shared Pipe.

    Every worker batch streams (global_index, result) pairs through the same
    write end; the parent fills `results` in input order and ticks the bar.

    NOTE(review): all workers write to the same pipe concurrently, so two
    large payloads can interleave and corrupt the stream (the
    UnpicklingError discussed in the article) — known, intentional limitation
    of this method.
    """
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))  # ~4 batches per worker
    results=[None for _ in range(len(args_mat))]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            r,w=Pipe(duplex=False)  # one shared channel for every worker
            for i,args_batch in enumerate(args_batches):
                pool.apply_async(batch_exec_v2,(f,args_batch,w,i*batch_size))
            cnt=0
            # Stop once every row has reported (workers send no sentinel here).
            while cnt<len(args_mat):
                try:
                    msg=r.recv()  # (global_index, result)
                    results[msg[0]]=msg[1]
                    pbar.update(1)
                    cnt+=1
                except EOFError:
                    break
    return results
方法2是给每个进程新创建一个通信管道。方法3这里改成给所有进程共用一个通信管道,可以节约很多创建/销毁管道的开销。 这是前三个中最快的方法,但有可能抛异常:
File "/usr/lib/python3.8/multiprocessing/connection.py", line 251, in recv
return _ForkingPickler.loads(buf.getbuffer())
_pickle.UnpicklingError: invalid load key, '\x00'.
原因是两个进程同时往一个管道里传数据,数据混一起了,导致解析不出来。
方法4:在方法3的基础上,不通过管道传结果
from multiprocessing import Pool, Pipe
from tqdm import tqdm
def ToBatch(arr, size):
    """Split arr into consecutive chunks of length `size` (last chunk may be shorter)."""
    return [arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]
def batch_exec_v3(f,args_batch,w=None):
    """Evaluate f on each row of args_batch and return the list of results.

    When pipe write-end w is given, send a `1` after every row as a
    progress tick (the results themselves are NOT sent through the pipe).
    """
    out = []
    for row in args_batch:
        out.append(f(*row))
        if w:
            w.send(1)
    return out
def multi_process_exec_v3(f,args_mat,pool_size=5,desc=None):
    """Data-parallel map of f over args_mat (method 4).

    Workers return whole batches of results via apply_async; the shared pipe
    only carries per-row progress ticks, so large results can never
    interleave on the pipe. Results keep input order.
    """
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))  # ~4 batches per worker
    results=[]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            r,w=Pipe(duplex=False)  # carries progress ticks only
            pool_rets=[]
            for args_batch in args_batches:
                pool_rets.append(pool.apply_async(batch_exec_v3,(f,args_batch,w)))
            cnt=0
            while cnt<len(args_mat):
                try:
                    r.recv()  # one tick per finished row
                    pbar.update(1)
                    cnt+=1
                except EOFError:
                    break
            # Collect batch results in submission order. Fix: use a distinct
            # loop variable so the pipe read-end `r` is not shadowed, and
            # extend() instead of an inner append loop.
            for async_ret in pool_rets:
                results.extend(async_ret.get())
    return results
方法3是将每条数据的计算结果立即从管道返回,一旦计算结果太长(如计算2的2000+次方),就可能和其他进程的结果混一起,不安全。方法4改成:每算完一条数据,从管道传个信号(数字1),好更新进度条,等该进程的一批数据算完,一次性返回一批结果。
方法5:不给数据分批的多进程,用multiprocessing.pool.imap实现
这是我学到的第一个“多进程+进度条”的方法。
from multiprocessing import Pool
from tqdm import tqdm
def batch_exec_v4(args):
    """Unpack an (f, arg_row) pair and apply f to the row."""
    func, row = args[0], args[1]
    return func(*row)
def multi_process_exec_v4(f,args_mat,pool_size=5,desc=None):
    """Map f over args_mat with Pool.imap — one task per row, no batching.

    Results stream back in input order; the tqdm bar wraps the imap iterator.
    """
    if not args_mat:
        return []
    tasks = [(f, row) for row in args_mat]
    results=[]
    with Pool(processes=pool_size) as pool:
        for item in tqdm(pool.imap(batch_exec_v4, tasks), total=len(args_mat), desc=desc):
            results.append(item)
    return results
方法6:在方法1的基础上,给数据分批
import concurrent.futures
from tqdm import tqdm
def batch_exec_v5(f,args_batch):
    """Apply f to every row of args_batch and return the list of results."""
    return [f(*row) for row in args_batch]
def multi_process_exec_v5(f,args_mat,pool_size=5,desc=None):
    """Data-parallel map of f over args_mat with batched ProcessPoolExecutor.

    Each future carries one whole batch; the bar advances a batch at a time.
    Results keep input order.

    Fix: the original created `r,w=Pipe(duplex=False)` and never used either
    end — a file-descriptor leak, and `Pipe` is not even imported in this
    code section, so it raised NameError when run standalone. Removed.
    """
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))  # ~4 batches per worker
    results=[None for _ in range(len(args_mat))]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
            # Map each future to the global index of its batch's first row.
            futures = {executor.submit(batch_exec_v5, f, args_batch): i*batch_size
                       for i,args_batch in enumerate(args_batches)}
            for future in concurrent.futures.as_completed(futures):
                i=futures[future]
                ret = future.result()
                results[i:i+len(ret)]=ret
                pbar.update(len(ret))
    return results
这个方法是“一批一批”地更新进度条,而不是一个一个。
性能比较
测试代码:
import concurrent.futures
from tqdm import tqdm
from multiprocessing import Pool, Pipe, freeze_support
from multiprocessing.connection import wait
def ToBatch(arr, size):
    """Split arr into consecutive chunks of length `size` (last chunk may be shorter)."""
    return [arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]
def batch_exec_v1(f,args_batch,w,offset=0):
    """Evaluate f row by row, streaming (global_index, result) pairs through
    pipe write-end w; terminate with an 'exit' sentinel and close w."""
    for local_idx, row in enumerate(args_batch):
        w.send((local_idx + offset, f(*row)))
    w.send('exit')
    w.close()
def multi_process_exec_v1(f,args_mat,pool_size=5,desc=None):
    """Data-parallel map of f over args_mat, one private Pipe per batch.

    A flat args_mat is promoted to single-argument rows. Results keep input
    order; the tqdm bar ticks once per finished row.
    """
    if len(args_mat)==0:return []
    # Fix: isinstance instead of `type(...) in [list,tuple]` — also accepts
    # subclasses of list/tuple.
    if not isinstance(args_mat[0], (list, tuple)):
        args_mat=[[a]for a in args_mat]
    batch_size=max(1,int(len(args_mat)/4/pool_size))  # ~4 batches per worker
    results=[None for _ in range(len(args_mat))]
    args_batches = ToBatch(args_mat,batch_size)
    readers=[]
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            for i,args_batch in enumerate(args_batches):
                r,w=Pipe(duplex=False)  # private channel per batch
                readers.append(r)
                pool.apply_async(batch_exec_v1,(f,args_batch,w,i*batch_size))
            # Drain until every worker has sent its 'exit' sentinel.
            while readers:
                for r in wait(readers):  # wait() yields a fresh list; removal is safe
                    try:
                        msg=r.recv()
                        if msg=='exit':
                            readers.remove(r)
                            continue
                        results[msg[0]]=msg[1]
                        pbar.update(1)
                    except EOFError:
                        readers.remove(r)
    return results
def batch_exec(f,args_batch,w=None,offset=0):
    """Evaluate f on each row; when pipe write-end w is given, stream
    (global_index, result) pairs through it."""
    idx = offset
    for row in args_batch:
        value = f(*row)
        if w:
            w.send((idx, value))
        idx += 1
def multi_process_exec(f,args_mat,pool_size=5,desc=None):
    """Data-parallel map of f over args_mat using ONE shared Pipe (method 3).

    Workers stream (global_index, result) pairs through the same write end;
    the parent fills `results` in input order and ticks the bar.

    NOTE(review): concurrent writes from multiple workers to one pipe can
    interleave large payloads and corrupt the stream (UnpicklingError) —
    known limitation demonstrated by the article's benchmark.
    """
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))  # ~4 batches per worker
    results=[None for _ in range(len(args_mat))]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            r,w=Pipe(duplex=False)  # shared by every worker
            for i,args_batch in enumerate(args_batches):
                pool.apply_async(batch_exec,(f,args_batch,w,i*batch_size))
            cnt=0
            # Stop once every row has reported back.
            while cnt<len(args_mat):
                try:
                    msg=r.recv()  # (global_index, result)
                    results[msg[0]]=msg[1]
                    pbar.update(1)
                    cnt+=1
                except EOFError:
                    break
    return results
def batch_exec_v3(f,args_batch,w=None):
    """Apply f to every row, returning the result list; when pipe write-end
    w is given, send a `1` per row as a progress tick."""
    collected = []
    for row in args_batch:
        collected.append(f(*row))
        if w:
            w.send(1)
    return collected
def multi_process_exec_v3(f,args_mat,pool_size=5,desc=None):
    """Data-parallel map of f over args_mat (method 4).

    Batch results come back through apply_async; the shared pipe carries only
    per-row progress ticks, avoiding interleaved payloads. Results keep
    input order.
    """
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))  # ~4 batches per worker
    results=[]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            r,w=Pipe(duplex=False)  # progress ticks only
            pool_rets=[]
            for args_batch in args_batches:
                pool_rets.append(pool.apply_async(batch_exec_v3,(f,args_batch,w)))
            cnt=0
            while cnt<len(args_mat):
                try:
                    r.recv()  # one tick per finished row
                    pbar.update(1)
                    cnt+=1
                except EOFError:
                    break
            # Fix: distinct loop variable so the pipe read-end `r` is not
            # shadowed; extend() replaces the inner append loop.
            for async_ret in pool_rets:
                results.extend(async_ret.get())
    return results
def batch_exec_v4(args):
    """Unpack an (f, arg_row) pair and call f with the row's arguments."""
    func = args[0]
    row = args[1]
    return func(*row)
def multi_process_exec_v4(f,args_mat,pool_size=5,desc=None):
    """Map f over args_mat with Pool.imap — one task per row, no batching.

    Results stream back in input order; tqdm wraps the imap iterator.
    """
    if not args_mat:
        return []
    results=[]
    with Pool(processes=pool_size) as pool:
        stream = pool.imap(batch_exec_v4, [(f, row) for row in args_mat])
        for item in tqdm(stream, total=len(args_mat), desc=desc):
            results.append(item)
    return results
def batch_exec_v5(f,args_batch):
    """Apply f to each row of args_batch; return results in row order."""
    return [f(*row) for row in args_batch]
def multi_process_exec_v5(f,args_mat,pool_size=5,desc=None):
    """Data-parallel map of f over args_mat with batched ProcessPoolExecutor.

    Each future carries one whole batch; the bar advances a batch at a time.
    Results keep input order.

    Fix: removed the `r,w=Pipe(duplex=False)` whose ends were never used —
    it only leaked two file descriptors per call.
    """
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))  # ~4 batches per worker
    results=[None for _ in range(len(args_mat))]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
            # Map each future to the global index of its batch's first row.
            futures = {executor.submit(batch_exec_v5, f, args_batch): i*batch_size
                       for i,args_batch in enumerate(args_batches)}
            for future in concurrent.futures.as_completed(futures):
                i=futures[future]
                ret = future.result()
                results[i:i+len(ret)]=ret
                pbar.update(len(ret))
    return results
def multi_process_exec_v0(f,args_mat,pool_size=5,desc=None):
    """Run f once per argument row — one pool task per row (method 1).

    Results keep input order; the tqdm bar ticks once per finished row.
    """
    if not args_mat:
        return []
    results = [None] * len(args_mat)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
            index_of = {executor.submit(f, *row): idx for idx, row in enumerate(args_mat)}
            for done in concurrent.futures.as_completed(index_of):
                results[index_of[done]] = done.result()
                pbar.update(1)
    return results
def multi_thread_exec(f,args_mat,pool_size=5,desc=None):
    """Run f over the rows of args_mat in a thread pool.

    Results keep input order; the tqdm bar ticks once per finished row.
    """
    if not args_mat:
        return []
    results = [None] * len(args_mat)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
            index_of = {executor.submit(f, *row): idx for idx, row in enumerate(args_mat)}
            for done in concurrent.futures.as_completed(index_of):
                results[index_of[done]] = done.result()
                pbar.update(1)
    return results
def Pow(a,n):
    """Return a raised to the power n (benchmark workload)."""
    return pow(a, n)
if __name__=='__main__':
    # Benchmark driver: time the serial baseline and every parallel variant
    # on the same workload (2**i for i in 0..99999).
    # Fix: the elapsed-time format was `{...:.2}` (general format, 2
    # SIGNIFICANT digits — prints e.g. "1.3e+01" for runs over 100 s);
    # changed to `{...:.2f}` (2 decimal places) throughout.
    import time
    args_mat=[(2,i) for i in range(100000)]
    t0=time.time()
    results=[Pow(*a) for a in tqdm(args_mat,desc='串行执行')]
    t1=time.time()
    print(f"串行执行,用时:{t1-t0:.2f}秒")
    t0=time.time()
    results=multi_thread_exec(Pow,args_mat,4,desc='多线程')
    t1=time.time()
    print(f"多线程执行,用时:{t1-t0:.2f}秒")
    t0=time.time()
    results=multi_process_exec_v0(Pow,args_mat,4,desc='多进程方法1')
    t1=time.time()
    print(f"方法1用时:{t1-t0:.2f}秒")
    t0=time.time()
    results=multi_process_exec_v1(Pow,args_mat,4,desc='多进程方法2')
    t1=time.time()
    print(f"方法2用时:{t1-t0:.2f}秒")
    # Method 3 may raise UnpicklingError (shared-pipe race); keep the run going.
    try:
        t0=time.time()
        results=multi_process_exec(Pow,args_mat,4,desc='多进程方法3')
        t1=time.time()
        print(f"方法3用时:{t1-t0:.2f}秒")
    except Exception:
        print(f"方法3 异常")
    t0=time.time()
    results=multi_process_exec_v3(Pow,args_mat,4,desc='多进程方法4')
    t1=time.time()
    print(f"方法4用时:{t1-t0:.2f}秒")
    t0=time.time()
    results=multi_process_exec_v4(Pow,args_mat,4,desc='多进程方法5')
    t1=time.time()
    print(f"方法5用时:{t1-t0:.2f}秒")
    t0=time.time()
    results=multi_process_exec_v5(Pow,args_mat,4,desc='多进程方法6')
    t1=time.time()
    print(f"方法6用时:{t1-t0:.2f}秒")
方法 | 用时(Windows笔记本电脑) | 用时(Linux云服务器)
---|---|---
串行 | 00:13 | 00:09
多线程 | 00:16 | 00:16
多进程(方法1) | 00:44 | 00:25
多进程(方法2) | 00:13 | 00:06
多进程(方法3) | 00:06 | 异常
多进程(方法4) | 00:06 | 00:05
多进程(方法5) | 00:22 | 00:13
多进程(方法6) | 00:06 | 00:05
可以看到,性能优于单线程的,有方法3、方法4、方法6。但方法3由于共用Pipe传结果,可能出异常;方法6的进度条一批一批更新,看起来不舒服。结论是建议用方法4。
轮子:util_executer.py
import concurrent.futures
from tqdm import tqdm
from multiprocessing import Pool, Pipe, freeze_support
def ToBatch(arr, size):
    """Split arr into consecutive chunks of length `size` (last chunk may be shorter)."""
    return [arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]
def batch_exec(f,args_batch,w):
    """Apply f to each row of args_batch; a row that raises yields None.

    After every row (success or failure) a `1` is sent through pipe
    write-end w so the parent can advance its progress bar.
    """
    out = []
    for row in args_batch:
        try:
            out.append(f(*row))
        except Exception:
            out.append(None)  # best-effort: record the failure, keep going
        w.send(1)
    return out
def multi_process_exec(f,args_mat,pool_size=5,desc=None):
    """Data-parallel map of f over the rows of args_mat (the wheel).

    Workers return whole result batches via apply_async; the shared pipe
    carries only per-row progress ticks, so large results never interleave
    on the pipe. Results keep input order; rows that raise yield None.
    """
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))  # ~4 batches per worker
    results=[]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            r,w=Pipe(duplex=False)  # progress ticks only, never results
            pool_rets=[]
            for args_batch in args_batches:
                pool_rets.append(pool.apply_async(batch_exec,(f,args_batch,w)))
            cnt=0
            while cnt<len(args_mat):
                try:
                    r.recv()  # one tick per finished row
                    pbar.update(1)
                    cnt+=1
                except EOFError:
                    break
            # Fix: distinct loop variable so the pipe read-end `r` is not
            # shadowed; extend() replaces the inner append loop.
            for async_ret in pool_rets:
                results.extend(async_ret.get())
    return results
def multi_thread_exec(f,args_mat,pool_size=5,desc=None):
    """Run f over the rows of args_mat in a thread pool.

    Results keep input order; the tqdm bar ticks once per finished row.
    """
    if not args_mat:
        return []
    results = [None] * len(args_mat)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
            index_of = {executor.submit(f, *row): idx for idx, row in enumerate(args_mat)}
            for done in concurrent.futures.as_completed(index_of):
                results[index_of[done]] = done.result()
                pbar.update(1)
    return results
def Pow(a,n):
    """Return a raised to the power n (demo workload)."""
    return pow(a, n)
if __name__=='__main__':
    # Smoke test: compute 2**i for i in 0..99 with both executors and print
    # the results for visual comparison.
    args_mat=[(2,i) for i in range(100)]
    results=multi_thread_exec(Pow,args_mat,4,desc='多线程')
    print(results)
    results=multi_process_exec(Pow,args_mat,4,desc='多进程方法1')
    print(results)
参考
本文首发于我的博客:https://www.proup.club/index.php/archives/733/ 转载请注明本页面网址和原作者:pro1515151515