在python中,如果要做多任务并行的编程,必须要掌握multiprocessing 库的相关运用。在python的multiprocessing官方文档中,已然详细给出了multiprocessing 库的相关用法。多进程编程其实还是有很多坑存在的,为了进一步探索python多进程的机制,提升对python多进程编程的理解,本篇文章会对多进程模块的实现进行一次详细的剖析。
多进程编程的第一话,首先来聊聊一个新的python子进程是如何诞生的。
首先我们需要了解这么一个事情,python创建的进程之间模块状态是相互隔离的。在多进程的场景下,代码中定义的各种变量,其值并不一定会共享。我们举个例子:
import multiprocessing as mp
import threading
import os
import mp_module
"""
# mp_module.py
FLAG = -1
def get_flag():
return FLAG
def init_flag():
global FLAG
FLAG = 1
"""
def _test_module_isolation():
from multiprocessing import process
parent = process.parent_process()
if parent:
print(f'[{os.getpid()}] {parent.pid}')
print(f'[{os.getpid()}] [{threading.current_thread()}] {mp_module.get_flag()}')
def test_module_isolation():
print(f'main process pid: {os.getpid()}')
p = mp.Process(target=_test_module_isolation)
p.start()
_test_module_isolation()
p.join()
if __name__ == '__main__':
mp.set_start_method('spawn')
mp_module.init_flag()
test_module_isolation()
在函数test_module_isolation 中,首先是调用mp_module.init_flag() 改变了mp_module 模块一个变量的值,之后同时在主进程和新起的进程调用了_test_module_isolation() 去打印mp_module 中的变量。结果如下:
main process pid: 14836
[14836] [<_MainThread(MainThread, started 20244)>] 1
[22212] 14836
[22212] [<_MainThread(MainThread, started 23804)>] -1
可以看到,主进程(14836)中打印出来的变量是初始化过的,而子进程(22212)打印出来的变量是没有经过初始化的。
通过Process 实例创建出来的进程对象,调用start 方法即可启动新进程,执行target 对应的方法。
def start(self):
'''
Start child process
'''
self._check_closed()
assert self._popen is None, 'cannot start a process twice'
assert self._parent_pid == os.getpid(), \
'can only start a process object created by current process'
assert not _current_process._config.get('daemon'), \
'daemonic processes are not allowed to have children'
_cleanup()
self._popen = self._Popen(self)
self._sentinel = self._popen.sentinel
del self._target, self._args, self._kwargs
_children.add(self)
class SpawnProcess(process.BaseProcess):
_start_method = 'spawn'
@staticmethod
def _Popen(process_obj):
from .popen_spawn_win32 import Popen
return Popen(process_obj)
创建进程的方式有多种,其中spawn 模式为windows/linux系统下均兼容的,且为windows的默认模式。在Process._Popen 中,会通过_default_context.get_context() 获取当前的进程启动模式。
以windows下的spawn 模式为例,我们看下进程启动的代码的实现。主要逻辑分布在3个地方:
popen_spawn_win32.Popen multiprocessing/spawn.py process.BaseProcess
class Popen(object):
def __init__(self, process_obj):
util._flush_std_streams()
self.returncode = None
self.finalizer = None
self._launch(process_obj)
class Popen(object):
method = 'spawn'
def __init__(self, process_obj):
prep_data = spawn.get_preparation_data(process_obj._name)
rhandle, whandle = _winapi.CreatePipe(None, 0)
wfd = msvcrt.open_osfhandle(whandle, 0)
cmd = spawn.get_command_line(parent_pid=os.getpid(),
pipe_handle=rhandle)
cmd = ' '.join('"%s"' % x for x in cmd)
python_exe = spawn.get_executable()
if WINENV and _path_eq(python_exe, sys.executable):
python_exe = sys._base_executable
env = os.environ.copy()
env["__PYVENV_LAUNCHER__"] = sys.executable
else:
env = None
with open(wfd, 'wb', closefd=True) as to_child:
try:
hp, ht, pid, tid = _winapi.CreateProcess(
python_exe, cmd,
None, None, False, 0, env, None, None)
_winapi.CloseHandle(ht)
except:
_winapi.CloseHandle(rhandle)
raise
self.pid = pid
self.returncode = None
self._handle = hp
self.sentinel = int(hp)
self.finalizer = util.Finalize(self, _close_handles,
(self.sentinel, int(rhandle)))
set_spawning_popen(self)
try:
reduction.dump(prep_data, to_child)
reduction.dump(process_obj, to_child)
finally:
set_spawning_popen(None)
def get_command_line(**kwds):
if getattr(sys, 'frozen', False):
return ([sys.executable, '--multiprocessing-fork'] +
['%s=%r' % item for item in kwds.items()])
else:
prog = 'from multiprocessing.spawn import spawn_main; spawn_main(%s)'
prog %= ', '.join('%s=%r' % item for item in kwds.items())
opts = util._args_from_interpreter_flags()
return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
assert is_forking(sys.argv), "Not forking"
if sys.platform == 'win32':
import msvcrt
import _winapi
if parent_pid is not None:
source_process = _winapi.OpenProcess(
_winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE,
False, parent_pid)
else:
source_process = None
new_handle = reduction.duplicate(pipe_handle,
source_process=source_process)
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
parent_sentinel = source_process
else:
from . import resource_tracker
resource_tracker._resource_tracker._fd = tracker_fd
fd = pipe_handle
parent_sentinel = os.dup(pipe_handle)
exitcode = _main(fd, parent_sentinel)
sys.exit(exitcode)
def _main(fd, parent_sentinel):
with os.fdopen(fd, 'rb', closefd=True) as from_parent:
process.current_process()._inheriting = True
try:
preparation_data = reduction.pickle.load(from_parent)
prepare(preparation_data)
self = reduction.pickle.load(from_parent)
finally:
del process.current_process()._inheriting
return self._bootstrap(parent_sentinel)
class BaseProcess(object):
def _bootstrap(self, parent_sentinel=None):
try:
try:
self.run()
exitcode = 0
finally:
util._exit_function()
except SystemExit as e:
if e.code is None:
exitcode = 0
elif isinstance(e.code, int):
exitcode = e.code
else:
sys.stderr.write(str(e.code) + '\n')
exitcode = 1
except:
exitcode = 1
finally:
threading._shutdown()
util.info('process exiting with exitcode %d' % exitcode)
util._flush_std_streams()
return exitcode
def run(self):
if self._target:
self._target(*self._args, **self._kwargs)
这里经历了以下几个步骤:
- 用
_winapi.CreatePipe 创建一对用于进程间通信的handle
rhandle 会传给子进程,whandle 会被主进程用于给子进程发送数据 - 获取构建子进程运行时环境所需要的准备数据
prep_data
- 包括
sys.argv 、sys.path 、日志配置、初始化__main__ 模块的文件路径等 - 通过
spawn.get_command_line 初始化spawn 进程所需的命令
- 从函数定义中可以看到,子进程执行的命令中会调用
spawn.spawn_main 函数 - 调用
_winapi.CreateProcess 生成子进程 - 通过
whandle 转换后的fd向子进程写入序列化后的准备数据以及自身Process 实例数据 - 子进程在
spawn_main 函数将传入的rhandle 转化为fd,在_main 函数中接收主进程传入的数据,并执行对应操作
- 接收
prep_data ,初始化子进程运行时环境。其中会重新执行原来__main__ 模块对应的代码块
- 这里注意,子进程执行原来
__main__ 模块的代码块,是以__mp_main__ 这个模块名执行的。因此,原来代码里头if __name__ == '__main__' 下面的都执行不到。所以,子进程跑不到mp_module.init_flag() ,里面的flag 值当然没有被初始化。如果要让flag 被初始化的话,相信聪明的你知道怎么做。 - 执行
Process 实例的_bootstrap 函数。_bootstrap 函数最终调用run 函数,执行_target 对应的内容
这样,新的python进程就被创建并开始运行了。
|