IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> 什么!学Python多进程的你还不知道multiprocessing模块?该充电了>_(Python编程 | 系统编程 | 并行系统工具 | multiprocessing模块) -> 正文阅读

[Python知识库]什么!学Python多进程的你还不知道multiprocessing模块?该充电了>_(Python编程 | 系统编程 | 并行系统工具 | multiprocessing模块)

我要学习

multiprocessing模块

Python标准库中的multiprocessing模块允许脚本通过与threading模块非常类似的API来派生进程,且在UnixWindows下皆可工作,因为它支持一个基本上与平台无关的进程派生模型。

基本操作:进程和锁

multiprocessing模块的Process类被设计为模拟threading.Thread类,它允许启动一个与调用者脚本并行运行函数的进程。

示例:multi_1.py

#!/usr/bin/env python
"multiprocessing模块基本操作"

from multiprocessing import Process, Lock
import os


def who_am_i(label_str, lock):
	"打印信息"
	msg_str = '{}: name={} process={}'.format(label_str, __name__, os.getpid())
	with lock:
		print(msg_str)


def main():
	lock = Lock()
	who_am_i('function call', lock)

	process = Process(target=who_am_i, args=('spawned child', lock))
	process.start()
	process.join()

	for i_id_int in range(5):
		Process(
			target=who_am_i, 
			args=('run process {}'.format(i_id_int), lock)
		).start()

	with lock:
		print('Main process exit')


if __name__ == '__main__':
	main()

输出:multi_1.py

function call: name=__main__ process=10416
spawned child: name=__main__ process=10417
run process 0: name=__main__ process=10418
run process 1: name=__main__ process=10419
run process 2: name=__main__ process=10420
Main process exit
run process 3: name=__main__ process=10421
run process 4: name=__main__ process=10422
  • 注意一下,这个脚本最后派生出的5个进程其中部分比其父进程运行的时间更长。
  • Process.start:该方法在一个新进程中调用其run方法
  • Process.run:该方法仅仅调用传入的目标函数
  • Process.join:方法等待派生进程退出
  • 我们可以如示例一样传入target,也可以定义子类的run方法。

关于实现和用法的规则

multiprocessing模块针对不同的平台有不同的可移植方案:

  • UNIX下,它分支一个新的子进程并在其中调用Process对象的run方法。
  • Windows下,它通过Windows下特有的进程创建工具来派生一个新的解释器,通过管道向新进程中传入pickle后的Process对象,并在新进程中运行python -c命令行,后者运行这个包里一个特殊的Python编码的函数来读取和unpickle这个Process对象并调用其run方法。

multiprocessing模块的基本构架仍然可以对您能够使用它的方法产生微妙的影响:

  • Windows下,主进程的逻辑业务通常嵌套在一个__name__==__main__的测试中。
  • Windows下子进程访问全局对象的时候,后者的值可能与其在父进程的起始时间不同。
  • Windows下,Process接受的所有参数必须能接受pickle操作,因此target不能是绑定或者非绑定对象的方法,也不能是lambda语句创建的函数,还不能是带有系统状态的对象,如连接上的套接字。
  • Windows下,定制的Process子类中的所有东西也必须是可被pickle的。
  • 这个包中的IPC对象,如PipeQueue也只接受可pickle的对象。
  • UNIX下,最好将对象作为参数传入子进程的构造器,这样对Windows来说有更好的可移植性,如果这种对象是父进程收集的垃圾的话,还能避免某些潜在问题。

IPC工具:管道、共享内存和队列

multiprocessing模块为它派生的进程提供了可跨平台移植的消息传递工具:

  • 模块的Pipe对象提供了一个可连接两个进程的双向匿名管道,返回两个Connection对象,代表管道的两端,发送和接受任何可被pickle操作的Python对象。目前在UNIX下,它们在内部有一对连接上的套接字或我们之前看到的os.pipe调用得以实现,而在Windows下由平台特异的具名管道实现。
  • 模块的ValueArray对象实现共享的进程、线程安全的内存以用于进程间通信。这些调用返回基于ctypes模块并在共享内存中创建的标量和数组对象,默认带有访问同步化设置。
  • 模块的Queue对象可用作Python对象一个先进先出(FIFO)的列表,其中允许多个生产者和消费者。从本质上来说,队列是一个管道加上用于协调随机访问的锁机制,并继承了Pipe可进行pickle操作的限制。

管道

示例:multi_2.py

#!/usr/bin/env python
"测试multiprocessing.Pipe"

from multiprocessing import Pipe, Process


def sender(pipe):
	"通过管道向父进程发送对象"
	pipe.send(['spam'] + [42, 'eggs'])
	pipe.close()


def talker(pipe):
	"通过管道向父进程发送和接受对象"
	pipe.send(dict(name='Bob', spam=42))
	print('talker got:', pipe.recv())


def main():
	parent_pipe, child_pipe = Pipe()
	Process(target=sender, args=(child_pipe,)).start()
	print('parent got:', parent_pipe.recv())
	parent_pipe.close()  # 或者在程序运行完后自动关闭

	parent_pipe, child_pipe = Pipe()
	child_process = Process(target=talker, args=(child_pipe,))
	child_process.start()
	print('parent got:', parent_pipe.recv())
	parent_pipe.send([x_str * 2 for x_str in 'spam'])
	child_process.join()
	print('Parent exit')


if __name__ == '__main__':
	main()

输出:multi_2.py

parent got: ['spam', 42, 'eggs']
parent got: {'name': 'Bob', 'spam': 42}
talker got: ['ss', 'pp', 'aa', 'mm']
Parent exit

共享内存和全局对象

示例:multi_3.py

#!/usr/bin/env python
"测试multiprocessing的共享内存"


import os
from multiprocessing import Process, Value, Array


NUMS_PROC_INT = 3  # 每个进程各自的全局对象,并非共享
COUNT_INT = 0


def show_data(label_str, value, array):
	"打印数据"
	msg = '{}: pid={} global={} value={} array={}'.format(
		label_str, os.getpid(), COUNT_INT, value.value, list(array)
	)
	print(msg)


def updater(value, array):
	"更新数据"
	global COUNT_INT
	COUNT_INT += 1  # 全局计数器,非共享
	value.value += 1  # 传入的对象是共享的
	for i_index_int in range(NUMS_PROC_INT):
		array[i_index_int] += 1


def main():
	global COUNT_INT
	value = Value('i', 0)  # 共享内存是进程、线程安全的
	array = Array('d', NUMS_PROC_INT)  # ctypes中的类型代码:就像int和double
	show_data('parent start', value, array)  # 在父进程中显示初始值

	# 派生子进程,传入共享内存
	process = Process(target=show_data, args=('child', value, array))
	process.start()
	process.join()

	# 传入父进程中更新过的共享内存,等待每次进程结束
	# 每个子进程看到了父进程到现在为止对args的更新(但全局变量看不到)
	print('\nloop 1 (updates in parent, serial children)...')
	for i_int in range(NUMS_PROC_INT):
		COUNT_INT += 1
		value.value += 1
		array[i_int] += 1
		process = Process(
			target=show_data, args=('process {}'.format(i_int), value, array)
		)
		process.start()
		process.join()

	# 同上,不过允许所有子进程并行运行
	# 所有进程都看到了最近一次迭代的结果,因为它们都共享这个对象
	print('\nloop 2 (updates in parent, parallel children)...')
	listprocess = []
	for i_int in range(NUMS_PROC_INT):
		COUNT_INT += 1
		value.value += 1
		array[i_int] += 1
		process = Process(
			target=show_data, args=('process {}'.format(i_int), value, array)
		)
		process.start()
		listprocess.append(process)
	for process in listprocess:
		process.join()

	# 共享内存在派生子进程中进行更新,等待每个更新结束
	print('\nloop 3 (updates in serial children)...')
	for i_int in range(NUMS_PROC_INT):
		process = Process(target=updater, args=(value, array))
		process.start()
		process.join()
	show_data('parent temp', value, array)  # 仅在父进程中全局变量COUNT_INT=6

	# 同上,不过允许子进程并行的更新
	print('\nloop 4 (updates in parallel children)...')
	listprocess = []
	for i_int in range(NUMS_PROC_INT):
		process = Process(target=updater, args=(value, array))
		process.start()
		listprocess.append(process)
	for process in listprocess:
		process.join()
	show_data('parent temp', value, array)  # 在此打印最终结果
	# value=12:父进程+6,6个子进程+6
	# array=[8.0, 8.0, 8.0]:父进程+2,6个子进程+6


if __name__ == '__main__':
	main()

输出:multi_3.py

parent start: pid=26201 global=0 value=0 array=[0.0, 0.0, 0.0]
child: pid=26202 global=0 value=0 array=[0.0, 0.0, 0.0]

loop 1 (updates in parent, serial children)...
process 0: pid=26203 global=1 value=1 array=[1.0, 0.0, 0.0]
process 1: pid=26204 global=2 value=2 array=[1.0, 1.0, 0.0]
process 2: pid=26205 global=3 value=3 array=[1.0, 1.0, 1.0]

loop 2 (updates in parent, parallel children)...
process 0: pid=26206 global=4 value=6 array=[2.0, 2.0, 2.0]
process 1: pid=26207 global=5 value=6 array=[2.0, 2.0, 2.0]
process 2: pid=26208 global=6 value=6 array=[2.0, 2.0, 2.0]

loop 3 (updates in serial children)...
parent temp: pid=26201 global=6 value=9 array=[5.0, 5.0, 5.0]

loop 4 (updates in parallel children)...
parent temp: pid=26201 global=6 value=12 array=[8.0, 8.0, 8.0]
  • 最后loop 4测试代表了共享内存最常用的的用例——在数个平行进程间分配计算工作,最后在父进程中统计结果

队列和子类

multiprocessing模块还拥有以下特性:

  • 允许模块的Process类创建子类,并提供架构和状态保留(很像threading.Thread,不过是用于进程的)。
  • 提供进程安全的Queue对象,可以再任意数量的进程间共享,满足更广泛的通信需求(很像queue.Queue,不过是用于进程的)。

示例:multi_4.py

#!/usr/bin/env python
"multiprocessing模块的子类和队列"

import os
import time
import queue
from multiprocessing import Process, Lock, Queue


class CounterProcess(Process):
	"Process的子类"

	label_str = '\t@'

	def __init__(self, start_int, queue, stdout_lock):  # 为运行中的用处保留状态
		self.state_int = start_int
		self.post_queue = queue
		self.stdout_lock = stdout_lock
		Process.__init__(self)

	def run(self):
		for i_int in range(3):
			time.sleep(1)
			self.state_int += 1
			with self.stdout_lock:
				print(self.label_str, self.pid, self.state_int)  # self.pid为进程号
			self.post_queue.put([self.pid, self.state_int])
		with self.stdout_lock:
			print(self.label_str, self.pid, '-')


def main():
	print('start', os.getpid())
	expected_int = 9

	post_queue = Queue()
	stdout_lock = Lock()
	a_counterprocess = CounterProcess(0, post_queue, stdout_lock)
	b_counterprocess = CounterProcess(100, post_queue, stdout_lock)
	c_counterprocess = CounterProcess(10000, post_queue, stdout_lock)
	a_counterprocess.start()
	b_counterprocess.start()
	c_counterprocess.start()

	while expected_int:
		time.sleep(0.5)

		try:
			data_listint = post_queue.get(block=False)
		except queue.Empty:
			with stdout_lock:
				print('no data...')
		else:
			with stdout_lock:
				print('posted:', data_listint)
			expected_int -= 1

	a_counterprocess.join()
	b_counterprocess.join()
	c_counterprocess.join()

	print('finish', os.getpid(), c_counterprocess.exitcode)


if __name__ == '__main__':
	main()

输出:multi_4.py

start 33926
no data...
	@ 33927 1
posted: [33927, 1]
	@ 33928 101
	@ 33929 10001
posted: [33928, 101]
	@ 33927 2
posted: [33929, 10001]
	@ 33928 102
	@ 33929 10002
posted: [33927, 2]
	@ 33927 3
	@ 33927 -
posted: [33928, 102]
	@ 33928 103
	@ 33928 -
	@ 33929 10003
	@ 33929 -
posted: [33929, 10002]
posted: [33927, 3]
posted: [33928, 103]
posted: [33929, 10003]
finish 33926 0
  • 生产者调用time.sleep是为了模拟长时运行任务。
  • 所有4个进程共享一个输出流。
  • 子进程结束后由其exitcode属性提供退出状态。

启动独立程序

在派生子进程中可以使用我们之前见过的os.exec调用等工具来启动一个真正的独立程序。

示例:multi_5.py

#!/usr/bin/env python
"使用multiprocessing和os.exec起始一个新程序"

import os
from multiprocessing import Process


def run_program(id_int):
	os.execlp('python', 'python', 'child.py', str(id_int))


def main():
	for i_id_int in range(5):
		Process(target=run_program, args=(i_id_int,)).start()
	print('parent exit')


if __name__ == '__main__':
	main()

输出:multi_5.py

parent exit
Hello from child! 39635 1
Hello from child! 39636 2
Hello from child! 39634 0
Hello from child! 39637 3
Hello from child! 39638 4

其他更多

示例:multi_6.py

#!/usr/bin/env python
"multiprocessing.Pool类"

import os
from multiprocessing import Pool


def powers(x):
	'返回2的x次方'
	# print('\n\t@' + str(os.getpid()), x)  # 能够监视子进程
	return 2 ** x


def main():
	workers_pool = Pool(processes=5)
	results_list = workers_pool.map(powers, [2] * 100)
	print(results_list[:16])
	print(results_list[-2:])

	results_list = workers_pool.map(powers, list(range(1, 101)))
	print(results_list[:16])
	print(results_list[-2:])


if __name__ == '__main__':
	main()

输出:multi_6.py

[4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]
[4, 4]
[2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536]
[633825300114114700748351602688, 1267650600228229401496703205376]

———————————————————————————————————————————

😃 学完博客后,是不是有所启发呢?如果对此还有疑问,欢迎在评论区留言哦。
如果还想了解更多的信息,欢迎大佬们关注我哦,也可以查看我的个人博客网站BeacherHou

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2021-08-14 13:59:04  更:2021-08-14 14:00:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/15 10:19:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码