python ctrl-c 无法终止 multiprocessing pool
github pages: perfectnewer.gitub.io gitee pages: perfectnewer.gitee.io
前言
最近对一个线上脚本进行优化,因为时间紧逻辑比较多,后面还有一堆事情,所以决定先改成多进程的方式。此处记录一下遇到的问题。
问题1:在multiprocessing中ctrl-c 无法终止运行
一开始我是质疑这些答案的,毕竟我的代码可以正常接收ctrl-c,但是最后发现本质原因是一样的,方案也是通用。那么对根因的探索祥见另外一篇python2 multiprocess pool源码简单解读
先说解决方案:
参考链接
此处贴上stack overflow的代码
注意:
- 除了resul .get会block以外,pool.join也会block信号
- 我的代码和他的不一样,我的脚本是长期运行的。我是能收到信号的。但是整个进程也没有退出,原因是worker异常退出导致。虽然原因不同,但是这个方案是可行的。因为worker不会因为ctrl-c异常退出了
from __future__ import print_function
import multiprocessing
import os
import signal
import time
def run_worker(delay):
print("In a worker process", os.getpid())
time.sleep(delay)
def main():
print("Initializng 2 workers")
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = multiprocessing.Pool(2)
signal.signal(signal.SIGINT, original_sigint_handler)
try:
print("Starting 2 jobs of 5 seconds each")
res = pool.map_async(run_worker, [5, 5])
print("Waiting for results")
res.get(60)
except KeyboardInterrupt:
print("Caught KeyboardInterrupt, terminating workers")
pool.terminate()
else:
print("Normal termination")
pool.close()
pool.join()
if __name__ == "__main__":
main()
此处放上我的代码逻辑
def mgr():
import multiprocessing
multiprocessing.log_to_stderr()
logger.info("start")
origin_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
pool = Pool(3)
stopper = Stopper()
signal.signal(signal.SIGINT, origin_handler)
signal.signal(signal.SIGTERM, functools.partial(stop, stopper))
idx = 1
working_process = []
while not stopper.is_stop():
try:
v = r.blpop("test", 120)
...
logger.info(v)
rst = pool.apply_async(worker, args=(idx, ))
working_process.append(rst)
except KeyboardInterrupt:
logger.info("bk error ======== parent {}".format(os.getpid()))
break
except Exception:
pass
while len(working_process) >= 3:
logger.info("wait exit")
readys = []
for idx, rst in enumerate(working_process):
rdy = rst.ready()
print('1: ', idx, ' ', rdy)
if rdy:
readys.append(idx)
if not readys:
time.sleep(10)
else:
for idx in readys:
working_process[idx] = None
working_process = filter(lambda x: x is not None, working_process)
idx += 1
logger.info("exit")
pool.close()
pool.join()
根本原因:Condition.wait
根本原因其实取决于你的具体代码逻辑。目前我遇到的有两个
问题2:pool中放入了过多的待处理任务
这个问题的原因是对进程池的理解不到位。multiprocessing.Pool 只是限制了执行任务的进程的个数,并没有限制用户放入的任务数量。虽然它保证只有规定个数的进程运行。但是你可以一直往pool里添加task。
解决方案就是自己需要对处理的任务数量统计计数。code
问题3:内存泄漏
这个一开始就能预料到了。这个主要是因为我们业务代码有进程缓存,只要定期清理就可以了。那么方案也有两个
- 自己定期清理
- multiprocessing有个maxtasksperchild参数,表示一个进程多少次任务后就销毁,重新创建新的进程
|