进程池是为了简化多进程任务而生。当我们有大量的任务,其处理函数都是相同的,或者只是函数参数不同。这种情况,直接生成和任务数量相同的进程是极其消耗资源的(比如用Process和for依次生成进程)。这个时候就非常适合使用进程池Pool
import multiprocessing as mp
n_proc = 5
pool = mp.Pool(n_proc)
以上代码生成了5个进程的池子。最多可以同时运行5个相同的函数。 pool类有以下4种非常常用的类型。
- apply:阻塞,任务其实是一个一个执行完的。无法实现并行效果
- apply_async
- map
- map_async
其中map和map_async的用法接近,apply和appy_async的用法接近。 带async的区别主要有:
- 返回的顺序不是按照建立任务的顺序来的,完全看哪个任务先结束谁先返回
- 有一个callback参数。可以用于记录返回值和其他回调功能,比如结合tqdm制作多进程进度条。
- 异步函数接受参数,但同步函数接受一个装载参数的迭代器。
现在有一个函数,功能是睡眠一段时间。来看一下四种函数的用法。
import multiprocessing as mp
def test_func(v):
print(v)
sleep(10/2-v/2)
return v
data = range(10)
apply
n_proc = 5
pool = mp.Pool(n_proc)
res = []
for d in data:
pool.apply(test_func, (d,))
pool.close()
pool.join()
可以发现,运行的总时长等于所有任务的总时长。那这个函数的意义是?
apply_async
n_proc = 5
pool = mp.Pool(n_proc)
res = []
for d in data:
tmp = pool.apply_async(test_func, (d,))
res.append(tmp)
pool.close()
pool.join()
for r in res:
r.get()
从打印内容来看,一共有10个任务,pool同时处理5个任务,哪个任务结束了,该进程就执行下个任务
map
n_proc = 5
pool = mp.Pool(n_proc)
res = pool.map(test_func, data)
pool.close()
pool.join()
print(res)
也是同时执行5个任务。总时间在5s左右。说明是确实是并行执行以上10个任务的
map_async
n_proc = 5
pool = mp.Pool(n_proc)
res = pool.map_async(test_func, data)
pool.close()
pool.join()
print(res.get())
依然是并行运行
大家应该注意到了。map和map_async没有用到for循环新建进程任务。因此总结出:以map为首的函数用法是,针对多次运行同一个任务(test_func),如果只是参数不同,可以把参数做成一个迭代器。
callback回调函数的用法
带async的函数,支持应用回调函数。 当进行执行完毕,我们就会调用这个回调函数。回调函数的参数有一个,为进程任务的返回值。
def log(v):
callback_res.append(v)
data = range(10)
callback_res = []
n_proc = 5
pool = mp.Pool(n_proc)
for d in data:
pool.apply_async(test_func, (d,), callback=log)
pool.close()
pool.join()
print(callback_res)
从该示例中,我们可以看到,log函数中执行的内容是,在callback_res中加入test_func的返回值。同时注意到callback_res不是按照顺序排列的,而是按照执行速度依次排列的。在头5个任务中,输入为4的任务执行最快。
多进程进度条
from tqdm import tqdm
pbar = tqdm(total=len(data))
pbar.set_description('Sleep')
update = lambda *args: pbar.update()
n_proc = 5
pool = mp.Pool(n_proc)
for d in data:
pool.apply_async(test_func, (d,), callback=update)
pool.close()
pool.join()
这里我们采用tqdm生成进度条,然后使用labmda操作符,将其转换为函数,然后作为回调对象。那么进程池每处理完一个任务,就会调用一次tqdm的update()。从而实现多进程的进度条显示。
参考
Python multiprocessing.Pool的四种方法比较:: map, apply, map_async, apply_async Python多进程处理方法(Pool、apply_async、map_async) python多进程打印进度条 Multiprocessing : use tqdm to display a progress bar
|