multiprocessing 模块
Python标准库中的multiprocessing 模块允许脚本通过与threading 模块非常类似的API来派生进程,且在Unix和Windows下皆可工作,因为它支持一个基本上与平台无关的进程派生模型。
基本操作:进程和锁
multiprocessing 模块的Process 类被设计为模拟threading.Thread 类,它允许启动一个与调用者脚本并行运行函数的进程。
示例:multi_1.py
"multiprocessing模块基本操作"
from multiprocessing import Process, Lock
import os
def who_am_i(label_str, lock):
"打印信息"
msg_str = '{}: name={} process={}'.format(label_str, __name__, os.getpid())
with lock:
print(msg_str)
def main():
lock = Lock()
who_am_i('function call', lock)
process = Process(target=who_am_i, args=('spawned child', lock))
process.start()
process.join()
for i_id_int in range(5):
Process(
target=who_am_i,
args=('run process {}'.format(i_id_int), lock)
).start()
with lock:
print('Main process exit')
if __name__ == '__main__':
main()
输出:multi_1.py
function call: name=__main__ process=10416
spawned child: name=__main__ process=10417
run process 0: name=__main__ process=10418
run process 1: name=__main__ process=10419
run process 2: name=__main__ process=10420
Main process exit
run process 3: name=__main__ process=10421
run process 4: name=__main__ process=10422
- 注意一下,这个脚本最后派生出的5个进程其中部分比其父进程运行的时间更长。
Process.start :该方法在一个新进程中调用其run 方法Process.run :该方法仅仅调用传入的目标函数Process.join :方法等待派生进程退出- 我们可以如示例一样传入
target ,也可以定义子类的run 方法。
关于实现和用法的规则
multiprocessing 模块针对不同的平台有不同的可移植方案:
- 在UNIX下,它分支一个新的子进程并在其中调用
Process 对象的run 方法。 - 在Windows下,它通过Windows下特有的进程创建工具来派生一个新的解释器,通过管道向新进程中传入
pickle 后的Process 对象,并在新进程中运行python -c 命令行,后者运行这个包里一个特殊的Python编码的函数来读取和unpickle 这个Process 对象并调用其run 方法。
multiprocessing 模块的基本构架仍然可以对您能够使用它的方法产生微妙的影响:
- 在Windows下,主进程的逻辑业务通常嵌套在一个
__name__==__main__ 的测试中。 - 当Windows下子进程访问全局对象的时候,后者的值可能与其在父进程的起始时间不同。
- 在Windows下,
Process 接受的所有参数必须能接受pickle 操作,因此target 不能是绑定或者非绑定对象的方法,也不能是lambda 语句创建的函数,还不能是带有系统状态的对象,如连接上的套接字。 - 在Windows下,定制的
Process 子类中的所有东西也必须是可被pickle 的。 - 这个包中的IPC对象,如
Pipe 和Queue 也只接受可pickle 的对象。 - 在UNIX下,最好将对象作为参数传入子进程的构造器,这样对Windows来说有更好的可移植性,如果这种对象是父进程收集的垃圾的话,还能避免某些潜在问题。
IPC工具:管道、共享内存和队列
multiprocessing 模块为它派生的进程提供了可跨平台移植的消息传递工具:
- 模块的
Pipe 对象提供了一个可连接两个进程的双向匿名管道,返回两个Connection 对象,代表管道的两端,发送和接受任何可被pickle 操作的Python 对象。目前在UNIX下,它们在内部有一对连接上的套接字或我们之前看到的os.pipe 调用得以实现,而在Windows下由平台特异的具名管道实现。 - 模块的
Value 和Array 对象实现共享的进程、线程安全的内存以用于进程间通信。这些调用返回基于ctypes 模块并在共享内存中创建的标量和数组对象,默认带有访问同步化设置。 - 模块的
Queue 对象可用作Python对象一个先进先出(FIFO)的列表,其中允许多个生产者和消费者。从本质上来说,队列是一个管道加上用于协调随机访问的锁机制,并继承了Pipe 可进行pickle 操作的限制。
管道
示例:multi_2.py
"测试multiprocessing.Pipe"
from multiprocessing import Pipe, Process
def sender(pipe):
"通过管道向父进程发送对象"
pipe.send(['spam'] + [42, 'eggs'])
pipe.close()
def talker(pipe):
"通过管道向父进程发送和接受对象"
pipe.send(dict(name='Bob', spam=42))
print('talker got:', pipe.recv())
def main():
parent_pipe, child_pipe = Pipe()
Process(target=sender, args=(child_pipe,)).start()
print('parent got:', parent_pipe.recv())
parent_pipe.close()
parent_pipe, child_pipe = Pipe()
child_process = Process(target=talker, args=(child_pipe,))
child_process.start()
print('parent got:', parent_pipe.recv())
parent_pipe.send([x_str * 2 for x_str in 'spam'])
child_process.join()
print('Parent exit')
if __name__ == '__main__':
main()
输出:multi_2.py
parent got: ['spam', 42, 'eggs']
parent got: {'name': 'Bob', 'spam': 42}
talker got: ['ss', 'pp', 'aa', 'mm']
Parent exit
共享内存和全局对象
示例:multi_3.py
"测试multiprocessing的共享内存"
import os
from multiprocessing import Process, Value, Array
NUMS_PROC_INT = 3
COUNT_INT = 0
def show_data(label_str, value, array):
"打印数据"
msg = '{}: pid={} global={} value={} array={}'.format(
label_str, os.getpid(), COUNT_INT, value.value, list(array)
)
print(msg)
def updater(value, array):
"更新数据"
global COUNT_INT
COUNT_INT += 1
value.value += 1
for i_index_int in range(NUMS_PROC_INT):
array[i_index_int] += 1
def main():
global COUNT_INT
value = Value('i', 0)
array = Array('d', NUMS_PROC_INT)
show_data('parent start', value, array)
process = Process(target=show_data, args=('child', value, array))
process.start()
process.join()
print('\nloop 1 (updates in parent, serial children)...')
for i_int in range(NUMS_PROC_INT):
COUNT_INT += 1
value.value += 1
array[i_int] += 1
process = Process(
target=show_data, args=('process {}'.format(i_int), value, array)
)
process.start()
process.join()
print('\nloop 2 (updates in parent, parallel children)...')
listprocess = []
for i_int in range(NUMS_PROC_INT):
COUNT_INT += 1
value.value += 1
array[i_int] += 1
process = Process(
target=show_data, args=('process {}'.format(i_int), value, array)
)
process.start()
listprocess.append(process)
for process in listprocess:
process.join()
print('\nloop 3 (updates in serial children)...')
for i_int in range(NUMS_PROC_INT):
process = Process(target=updater, args=(value, array))
process.start()
process.join()
show_data('parent temp', value, array)
print('\nloop 4 (updates in parallel children)...')
listprocess = []
for i_int in range(NUMS_PROC_INT):
process = Process(target=updater, args=(value, array))
process.start()
listprocess.append(process)
for process in listprocess:
process.join()
show_data('parent temp', value, array)
if __name__ == '__main__':
main()
输出:multi_3.py
parent start: pid=26201 global=0 value=0 array=[0.0, 0.0, 0.0]
child: pid=26202 global=0 value=0 array=[0.0, 0.0, 0.0]
loop 1 (updates in parent, serial children)...
process 0: pid=26203 global=1 value=1 array=[1.0, 0.0, 0.0]
process 1: pid=26204 global=2 value=2 array=[1.0, 1.0, 0.0]
process 2: pid=26205 global=3 value=3 array=[1.0, 1.0, 1.0]
loop 2 (updates in parent, parallel children)...
process 0: pid=26206 global=4 value=6 array=[2.0, 2.0, 2.0]
process 1: pid=26207 global=5 value=6 array=[2.0, 2.0, 2.0]
process 2: pid=26208 global=6 value=6 array=[2.0, 2.0, 2.0]
loop 3 (updates in serial children)...
parent temp: pid=26201 global=6 value=9 array=[5.0, 5.0, 5.0]
loop 4 (updates in parallel children)...
parent temp: pid=26201 global=6 value=12 array=[8.0, 8.0, 8.0]
- 最后loop 4测试代表了共享内存最常用的的用例——在数个平行进程间分配计算工作,最后在父进程中统计结果
队列和子类
multiprocessing 模块还拥有以下特性:
- 允许模块的
Process 类创建子类,并提供架构和状态保留(很像threading.Thread ,不过是用于进程的)。 - 提供进程安全的
Queue 对象,可以再任意数量的进程间共享,满足更广泛的通信需求(很像queue.Queue ,不过是用于进程的)。
示例:multi_4.py
"multiprocessing模块的子类和队列"
import os
import time
import queue
from multiprocessing import Process, Lock, Queue
class CounterProcess(Process):
"Process的子类"
label_str = '\t@'
def __init__(self, start_int, queue, stdout_lock):
self.state_int = start_int
self.post_queue = queue
self.stdout_lock = stdout_lock
Process.__init__(self)
def run(self):
for i_int in range(3):
time.sleep(1)
self.state_int += 1
with self.stdout_lock:
print(self.label_str, self.pid, self.state_int)
self.post_queue.put([self.pid, self.state_int])
with self.stdout_lock:
print(self.label_str, self.pid, '-')
def main():
print('start', os.getpid())
expected_int = 9
post_queue = Queue()
stdout_lock = Lock()
a_counterprocess = CounterProcess(0, post_queue, stdout_lock)
b_counterprocess = CounterProcess(100, post_queue, stdout_lock)
c_counterprocess = CounterProcess(10000, post_queue, stdout_lock)
a_counterprocess.start()
b_counterprocess.start()
c_counterprocess.start()
while expected_int:
time.sleep(0.5)
try:
data_listint = post_queue.get(block=False)
except queue.Empty:
with stdout_lock:
print('no data...')
else:
with stdout_lock:
print('posted:', data_listint)
expected_int -= 1
a_counterprocess.join()
b_counterprocess.join()
c_counterprocess.join()
print('finish', os.getpid(), c_counterprocess.exitcode)
if __name__ == '__main__':
main()
输出:multi_4.py
start 33926
no data...
@ 33927 1
posted: [33927, 1]
@ 33928 101
@ 33929 10001
posted: [33928, 101]
@ 33927 2
posted: [33929, 10001]
@ 33928 102
@ 33929 10002
posted: [33927, 2]
@ 33927 3
@ 33927 -
posted: [33928, 102]
@ 33928 103
@ 33928 -
@ 33929 10003
@ 33929 -
posted: [33929, 10002]
posted: [33927, 3]
posted: [33928, 103]
posted: [33929, 10003]
finish 33926 0
- 生产者调用
time.sleep 是为了模拟长时运行任务。 - 所有4个进程共享一个输出流。
- 子进程结束后由其
exitcode 属性提供退出状态。
启动独立程序
在派生子进程中可以使用我们之前见过的os.exec 调用等工具来启动一个真正的独立程序。
示例:multi_5.py
"使用multiprocessing和os.exec起始一个新程序"
import os
from multiprocessing import Process
def run_program(id_int):
os.execlp('python', 'python', 'child.py', str(id_int))
def main():
for i_id_int in range(5):
Process(target=run_program, args=(i_id_int,)).start()
print('parent exit')
if __name__ == '__main__':
main()
输出:multi_5.py
parent exit
Hello from child! 39635 1
Hello from child! 39636 2
Hello from child! 39634 0
Hello from child! 39637 3
Hello from child! 39638 4
其他更多
示例:multi_6.py
"multiprocessing.Pool类"
import os
from multiprocessing import Pool
def powers(x):
'返回2的x次方'
return 2 ** x
def main():
workers_pool = Pool(processes=5)
results_list = workers_pool.map(powers, [2] * 100)
print(results_list[:16])
print(results_list[-2:])
results_list = workers_pool.map(powers, list(range(1, 101)))
print(results_list[:16])
print(results_list[-2:])
if __name__ == '__main__':
main()
输出:multi_6.py
[4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]
[4, 4]
[2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536]
[633825300114114700748351602688, 1267650600228229401496703205376]
———————————————————————————————————————————
😃 学完博客后,是不是有所启发呢?如果对此还有疑问,欢迎在评论区留言哦。 如果还想了解更多的信息,欢迎大佬们关注我哦,也可以查看我的个人博客网站BeacherHou。
|