IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> Python数据并行的6种方法(进程池+进度条) -> 正文阅读

[Python知识库]Python数据并行的6种方法(进程池+进度条)

什么是数据并行

数据并行(data-parallelism)和任务并行(task-parallelism)是实现并行计算的两种方式,可以简单理解为多人分工的两种方式。

  • 例如:某场考试有200张试卷,试卷上有4种题型,需要4个人改卷。
    • 数据并行:对“试卷数量”分割,每人改50张试卷的4种题型。(每人的任务都一样)
    • 任务并行:对“试卷内容”分割,每人改200张试卷的1种题型。(每人的任务不一样)

日常写代码,如果需要用一个函数对一组数据依次处理,觉得执行得慢,就可以用数据并行。本文最后有写好的轮子,可以直接放项目里用。

输入输出示例

输入:

  • 函数 f
def Pow(a,n):    # 例如:计算a的n次方
    return a**n
  • 数据列表 args_mat
args_mat=[     # 例如:从2的0次方到2的100000次方都需要计算
    [2,0],
    [2,1],
    [2,2],
    ...
    [2,100000]
]
- 必须是二维列表,一行代表一组参数。
- 一维列表arr通过`arr=[[x] for x in arr]`升成二维
  • 进程池大小 pool_size (默认为5)
    • 要小于CPU核数(os.cpu_count()可以返回CPU核数)
  • 进度条文字 desc (默认为空)

输出:

  • 结果列表
[1,2,4,8,16,32,64,...] # 结果列表要和数据列表一一对应得上

方法1:用Python自带的并行任务接口concurrent.futures

import concurrent.futures
from tqdm import tqdm

def multi_process_exec_v0(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    results=[None for _ in range(len(args_mat))]
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
            futures = {executor.submit(f,*args): i for i,args in enumerate(args_mat)}
            for future in concurrent.futures.as_completed(futures):
                i=futures[future]
                ret = future.result()
                results[i]=ret
                pbar.update(1)
    return results

这个并行任务接口是给每一条输入数据各开一个进程来执行。
创建/销毁进程的开销很大,时间上比串行执行还慢。
这个进程池不适合用来做“数据并行”,而是适合做“任务并行”。

方法2:用Python自带的多进程接口multiprocessing

from multiprocessing import Pool, Pipe
from multiprocessing.connection import wait
from tqdm import tqdm

ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]
def batch_exec_v1(f,args_batch,w,offset=0):
    for i,args in enumerate(args_batch):
        ans = f(*args)
        w.send((i+offset,ans))
    w.send('exit')
    w.close()
def multi_process_exec_v1(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    if type(args_mat[0]) not in [list,tuple]:
        args_mat=[[a]for a in args_mat]
    batch_size=max(1,int(len(args_mat)/4/pool_size))
    results=[None for _ in range(len(args_mat))]
    args_batches = ToBatch(args_mat,batch_size)
    readers=[]
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            for i,args_batch in enumerate(args_batches):
                r,w=Pipe(duplex=False)
                readers.append(r)
                pool.apply_async(batch_exec_v1,(f,args_batch,w,i*batch_size))
            while readers:
                for r in wait(readers):
                    try:
                        msg=r.recv()
                        if msg=='exit':
                            readers.remove(r)
                            continue
                        results[msg[0]]=msg[1]
                        pbar.update(1)
                    except EOFError:
                        readers.remove(r)
    return results

这段代码是把输入数据分批,每个进程处理一批。
数据分批的批数=进程池大小×4。批数设得越多,越倾向于提高并行计算资源的利用率,但同时,创建/销毁进程的开销也会越多。
由于一个进程是处理一批数据而不是一条数据,那么进度条就是一批一批地更新。要想一条一条地更新,就需要进程间通信。这里是每处理完一条数据,就立即把这一条的结果传给主进程;主进程收集结果,更新进度条。

方法3:在方法2的基础上,共用同一个通信管道

from multiprocessing import Pool, Pipe
from tqdm import tqdm
ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]

def batch_exec_v2(f,args_batch,w=None,offset=0):
    for i,args in enumerate(args_batch):
        ans = f(*args)
        if w:w.send((i+offset,ans))

def multi_process_exec_v2(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))
    results=[None for _ in range(len(args_mat))]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            r,w=Pipe(duplex=False)
            for i,args_batch in enumerate(args_batches):
                pool.apply_async(batch_exec_v2,(f,args_batch,w,i*batch_size))
            cnt=0
            while cnt<len(args_mat):
                try:
                    msg=r.recv()
                    results[msg[0]]=msg[1]
                    pbar.update(1)
                    cnt+=1
                except EOFError:
                    break
    return results

方法2是给每个进程新创建一个通信管道。方法3这里改成给所有进程共用一个通信管道,可以节约很多创建/销毁管道的开销。
这是前三个中最快的方法,但有可能抛异常:

  File "/usr/lib/python3.8/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
_pickle.UnpicklingError: invalid load key, '\x00'.

原因是两个进程同时往一个管道里传数据,数据混一起了,导致解析不出来。

方法4:在方法3的基础上,不通过管道传结果

from multiprocessing import Pool, Pipe
from tqdm import tqdm
ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]

def batch_exec_v3(f,args_batch,w=None):
    results=[]
    for i,args in enumerate(args_batch):
        ans = f(*args)
        results.append(ans)
        if w:w.send(1)
    return results

def multi_process_exec_v3(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))
    results=[]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            r,w=Pipe(duplex=False)
            pool_rets=[]
            for i,args_batch in enumerate(args_batches):
                pool_rets.append(pool.apply_async(batch_exec_v3,(f,args_batch,w)))
            cnt=0
            while cnt<len(args_mat):
                try:
                    msg=r.recv()
                    pbar.update(1)
                    cnt+=1
                except EOFError:
                    break
            for ret in pool_rets:
                for r in ret.get():
                    results.append(r)
    return results

方法3是将每条数据的计算结果立即从管道返回,一旦计算结果太长(如计算2的2000+次方),就可能和其他进程的结果混一起,不安全。方法4改成:每算完一条数据,从管道传个信号(数字1),好更新进度条,等该进程的一批数据算完,一次性返回一批结果。

方法5:不给数据分批的多进程,用multiprocessing.pool.imap实现

这是我学到的第一个“多进程+进度条”的方法。

from multiprocessing import Pool
from tqdm import tqdm

def batch_exec_v4(args):
    return args[0](*args[1])

def multi_process_exec_v4(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    results=[]
    with Pool(processes=pool_size) as pool:
        imap_it = pool.imap(batch_exec_v4, [(f,args) for args in args_mat])
        for ret in tqdm(imap_it,total=len(args_mat),desc=desc):
            results.append(ret)
    return results

方法6:在方法1的基础上,给数据分批

import concurrent.futures
from tqdm import tqdm

def batch_exec_v5(f,args_batch):
    results=[]
    for args in args_batch:
        ans = f(*args)
        results.append(ans)
    return results
def multi_process_exec_v5(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))
    results=[None for _ in range(len(args_mat))]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        r,w=Pipe(duplex=False)
        with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
            futures = {executor.submit(batch_exec_v5,*(f,args_batch)): i*batch_size for i,args_batch in enumerate(args_batches)}
            for future in concurrent.futures.as_completed(futures):
                i=futures[future]
                ret = future.result()
                results[i:i+len(ret)]=ret
                pbar.update(len(ret))
    return results

这个方法是“一批一批”地更新进度条,而不是一个一个。

性能比较

测试代码:

import concurrent.futures
from tqdm import tqdm
from multiprocessing import Pool, Pipe, freeze_support
from multiprocessing.connection import wait

ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]

def batch_exec_v1(f,args_batch,w,offset=0):
    for i,args in enumerate(args_batch):
        ans = f(*args)
        w.send((i+offset,ans))
    w.send('exit')
    w.close()
def multi_process_exec_v1(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    if type(args_mat[0]) not in [list,tuple]:
        args_mat=[[a]for a in args_mat]
    batch_size=max(1,int(len(args_mat)/4/pool_size))
    results=[None for _ in range(len(args_mat))]
    args_batches = ToBatch(args_mat,batch_size)
    readers=[]
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            for i,args_batch in enumerate(args_batches):
                r,w=Pipe(duplex=False)
                readers.append(r)
                pool.apply_async(batch_exec_v1,(f,args_batch,w,i*batch_size))
            while readers:
                for r in wait(readers):
                    try:
                        msg=r.recv()
                        if msg=='exit':
                            readers.remove(r)
                            continue
                        results[msg[0]]=msg[1]
                        pbar.update(1)
                    except EOFError:
                        readers.remove(r)
    return results

def batch_exec(f,args_batch,w=None,offset=0):
    for i,args in enumerate(args_batch):
        ans = f(*args)
        if w:w.send((i+offset,ans))

def multi_process_exec(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))
    results=[None for _ in range(len(args_mat))]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            r,w=Pipe(duplex=False)
            for i,args_batch in enumerate(args_batches):
                pool.apply_async(batch_exec,(f,args_batch,w,i*batch_size))
            cnt=0
            while cnt<len(args_mat):
                try:
                    msg=r.recv()
                    results[msg[0]]=msg[1]
                    pbar.update(1)
                    cnt+=1
                except EOFError:
                    break
    return results

def batch_exec_v3(f,args_batch,w=None):
    results=[]
    for i,args in enumerate(args_batch):
        ans = f(*args)
        results.append(ans)
        if w:w.send(1)
    return results

def multi_process_exec_v3(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))
    results=[]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            r,w=Pipe(duplex=False)
            pool_rets=[]
            for i,args_batch in enumerate(args_batches):
                pool_rets.append(pool.apply_async(batch_exec_v3,(f,args_batch,w)))
            cnt=0
            while cnt<len(args_mat):
                try:
                    msg=r.recv()
                    pbar.update(1)
                    cnt+=1
                except EOFError:
                    break
            for ret in pool_rets:
                for r in ret.get():
                    results.append(r)
    return results

def batch_exec_v4(args):
    return args[0](*args[1])

def multi_process_exec_v4(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    results=[]
    with Pool(processes=pool_size) as pool:
        imap_it = pool.imap(batch_exec_v4, [(f,args) for args in args_mat])
        for ret in tqdm(imap_it,total=len(args_mat),desc=desc):
            results.append(ret)
    return results

def batch_exec_v5(f,args_batch):
    results=[]
    for args in args_batch:
        ans = f(*args)
        results.append(ans)
    return results
def multi_process_exec_v5(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))
    results=[None for _ in range(len(args_mat))]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        r,w=Pipe(duplex=False)
        with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
            futures = {executor.submit(batch_exec_v5,*(f,args_batch)): i*batch_size for i,args_batch in enumerate(args_batches)}
            for future in concurrent.futures.as_completed(futures):
                i=futures[future]
                ret = future.result()
                results[i:i+len(ret)]=ret
                pbar.update(len(ret))
    return results


def multi_process_exec_v0(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    results=[None for _ in range(len(args_mat))]
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
            futures = {executor.submit(f,*args): i for i,args in enumerate(args_mat)}
            for future in concurrent.futures.as_completed(futures):
                i=futures[future]
                ret = future.result()
                results[i]=ret
                pbar.update(1)
    return results


def multi_thread_exec(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    results=[None for _ in range(len(args_mat))]
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
            futures = {executor.submit(f,*args): i for i,args in enumerate(args_mat)}
            for future in concurrent.futures.as_completed(futures):
                i=futures[future]
                ret = future.result()
                results[i]=ret
                pbar.update(1)
    return results

def Pow(a,n):
    return a**n

if __name__=='__main__':
    import time
    args_mat=[(2,i) for i in range(100000)]
    
    t0=time.time()
    results=[Pow(*a) for a in tqdm(args_mat,desc='串行执行')]
    t1=time.time()
    print(f"串行执行,用时:{t1-t0:.2}秒")
    
    t0=time.time()
    results=multi_thread_exec(Pow,args_mat,4,desc='多线程')
    t1=time.time()
    print(f"多线程执行,用时:{t1-t0:.2}秒")
    
    t0=time.time()
    results=multi_process_exec_v0(Pow,args_mat,4,desc='多进程方法1')
    t1=time.time()
    print(f"方法1用时:{t1-t0:.2}秒")
    
    t0=time.time()
    results=multi_process_exec_v1(Pow,args_mat,4,desc='多进程方法2')
    t1=time.time()
    print(f"方法2用时:{t1-t0:.2}秒")
    
    try:
        t0=time.time()
        results=multi_process_exec(Pow,args_mat,4,desc='多进程方法3')
        t1=time.time()
        print(f"方法3用时:{t1-t0:.2}秒")
    except Exception:
        print(f"方法3 异常")
    
    t0=time.time()
    results=multi_process_exec_v3(Pow,args_mat,4,desc='多进程方法4')
    t1=time.time()
    print(f"方法4用时:{t1-t0:.2}秒")

    t0=time.time()
    results=multi_process_exec_v4(Pow,args_mat,4,desc='多进程方法5')
    t1=time.time()
    print(f"方法5用时:{t1-t0:.2}秒")

    t0=time.time()
    results=multi_process_exec_v5(Pow,args_mat,4,desc='多进程方法6')
    t1=time.time()
    print(f"方法6用时:{t1-t0:.2}秒")

方法用时(Windows笔记本电脑)用时(Linux云服务器)
串行00:1300:09
多线程00:1600:16
多进程(方法1)00:4400:25
多进程(方法2)00:1300:06
多进程(方法3)00:06异常
多进程(方法4)00:0600:05
多进程(方法5)00:2200:13
多进程(方法6)00:0600:05

可以看到,性能优于单线程的,有方法3、方法4、方法6。但方法3由于共用Pipe传结果,可能出异常;方法6的进度条一批一批更新,看起来不舒服。结论是建议用方法4。

轮子:util_executer.py

import concurrent.futures
from tqdm import tqdm
from multiprocessing import Pool, Pipe, freeze_support

#=============================================================#
# 接口                                                        #
#-------------------------------------------------------------#
#   multi_process_exec 多进程执行                             #
#   multi_thread_exec  多线程执行                             #
#-------------------------------------------------------------#
# 参数:                                                      #
#   f         (function): 批量执行的函数                      #
#   args_mat  (list)    : 批量执行的参数                      #
#   pool_size (int)     : 进程/线程池的大小                   #
#   desc      (str)     : 进度条的描述文字                    #
#-------------------------------------------------------------#
# 例子:                                                      #
# >>> def Pow(a,n):        ← 定义一个函数(可以有多个参数)   #
# ...     return a**n                                         #
# >>>                                                         #
# >>> args_mat=[[2,1],     ← 批量计算 Pow(2,1)                #
# ...           [2,2],                Pow(2,2)                #
# ...           [2,3],                Pow(2,3)                #
# ...           [2,4],                Pow(2,4)                #
# ...           [2,5],                Pow(2,5)                #
# ...           [2,6]]                Pow(2,6)                #
# >>>                                                         #
# >>> results=multi_thread_exec(Pow,args_mat,desc='计算中')   #
# 计算中: 100%|█████████████| 6/6 [00:00<00:00, 20610.83it/s] #
# >>>                                                         #
# >>> print(results)                                          #
# [2, 4, 8, 16, 32, 64]                                       #
#-------------------------------------------------------------#

ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]

def batch_exec(f,args_batch,w):
    results=[]
    for i,args in enumerate(args_batch):
        try:
            ans = f(*args)
            results.append(ans)
        except Exception:
            results.append(None)
        w.send(1)
    return results

def multi_process_exec(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    batch_size=max(1,int(len(args_mat)/4/pool_size))
    results=[]
    args_batches = ToBatch(args_mat,batch_size)
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with Pool(processes=pool_size) as pool:
            r,w=Pipe(duplex=False)
            pool_rets=[]
            for i,args_batch in enumerate(args_batches):
                pool_rets.append(pool.apply_async(batch_exec,(f,args_batch,w)))
            cnt=0
            while cnt<len(args_mat):
                try:
                    msg=r.recv()
                    pbar.update(1)
                    cnt+=1
                except EOFError:
                    break
            for ret in pool_rets:
                for r in ret.get():
                    results.append(r)
    return results

def multi_thread_exec(f,args_mat,pool_size=5,desc=None):
    if len(args_mat)==0:return []
    results=[None for _ in range(len(args_mat))]
    with tqdm(total=len(args_mat), desc=desc) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
            futures = {executor.submit(f,*args): i for i,args in enumerate(args_mat)}
            for future in concurrent.futures.as_completed(futures):
                i=futures[future]
                ret = future.result()
                results[i]=ret
                pbar.update(1)
    return results

def Pow(a,n):
    return a**n

if __name__=='__main__':
    args_mat=[(2,i) for i in range(100)]
    results=multi_thread_exec(Pow,args_mat,4,desc='多线程')
    print(results)
    results=multi_process_exec(Pow,args_mat,4,desc='多进程方法1')
    print(results)
    

参考

本文首发于我的博客:https://www.proup.club/index.php/archives/733/
转载请注明本页面网址和原作者:pro1515151515

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-06-29 19:00:05  更:2022-06-29 19:01:04 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年4日历 -2024/4/25 12:52:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码