目录
1 python多进程编程概述
2 需求和方案
背景:
需求:
解决思路:
需要解决的问题和方案:
3 完整代码
1 python多进程编程概述
- python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了multiprocessing。
- multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
- 与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
2 需求和方案
背景:
在服务器某文件夹路径内,会接收软件端发来的数据包(如果接收还不完整,该文件夹名称会以‘-downloading’为后缀),数据包是文件夹格式,里面有个和文件夹名称同名的ZIP包。
算法需要对发来的数据包进行解压缩,然后处理压缩包内的数据,待数据处理完后,这个文件夹及其内容会移动到另外一个地方归档。
需求:
服务器需要支持多并发。
解决思路:
考虑到python多线程鸡肋,采用多进程实现。
需要解决的问题和方案:
- 如何避免同一个数据包被多个进程解析?
- 方案:使用队列来构建生产者(主进程)消费者(子进程)模式。所有新生成的数据包,其文件夹名称统一放入一个共享队列中,所有进程从该队列中取数据包名称,该队列自带‘锁’的功能。
- 主进程被杀死时,如何保证子进程同时退出,而不变为孤儿进程?
- 方案:构建‘进程组’形式 + 向进程组发送SIGKILL信号
- 参考:
https://blog.csdn.net/lucia555/article/details/105957928/ ?- https://www.cnblogs.com/domestique/p/8241219.html
3 完整代码
from multiprocessing import Process, Queue
import os
import time
import signal
import zipfile
import shutil
def unzip_file(sample_key_pair):
try:
zip_name = sample_key_pair + '.zip'
sampleraw_zip = os.path.join('./zip/'+ sample_key_pair, zip_name)
with zipfile.ZipFile(sampleraw_zip) as z:
z.extractall(path='./zip/'+ sample_key_pair, members=None, pwd=None)
except Exception as e:
print('Fail to unzip file: {}', e)
def mv_dir(sample_key_pair):
shutil.move('./zip/'+ sample_key_pair, './mv_zip')
def gan_huo_de_jin_cheng(zip_Queue):
print('子进程 pid 是 %s, group id is %s' % (os.getpid(), os.getppid()))
while True:
if not zip_Queue.empty():
# 1 抽取一个压缩包
sample_key_pair = zip_Queue.get()
print('子进程:',os.getpid(),'获取压缩包',sample_key_pair)
# 2 解压
unzip_file(sample_key_pair)
# 3 移动文件夹
mv_dir(sample_key_pair)
#一旦主进程被杀死,子进程也全部关闭。
def term(sig_num, addtion):
print('term current pid is %s, group id is %s' % (os.getpid(), os.getppid()))
os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)
if __name__== '__main__': #在测试多进程编程时,需要加这个,不然调试报错。
raw_data_root = './zip'
signal.signal(signal.SIGTERM, term)
print('主进程 pid 是 %s' % os.getpid())
zip_Queue = Queue() #队列不指定大小,就无限大使用空间,队列使用内存的空间
zip_list = [] #搞个zip_Queue同内容的复制品,因为Queue中内容无法遍历和查询。
#创建子进程
for i in range(3):
t = Process(target=gan_huo_de_jin_cheng, args=(zip_Queue, ))
t.daemon = True
t.start()
#遍历到新的压缩包,就压入队列
while True:
time.sleep(1) #1秒检索一次文件夹
sample_raw_list = os.listdir(raw_data_root)
if len(sample_raw_list) != 0:
# 1 检索文件夹内是否有 ‘完好’ 的压缩包,且压缩包名字和队列中名字不一致。
for sample_raw_dir in sample_raw_list:
sample_key_pair = sample_raw_dir.split('/')[-1]
if(sample_key_pair.endswith("-downloading")): #如果数据包还没下载好,跳过
continue
# 如果队列中没有此zip包文件夹,就加入队列
if sample_key_pair not in zip_list:
zip_Queue.put(sample_key_pair)
zip_list.append(sample_key_pair)
if len(zip_list) > 100:
zip_list.pop(0)#删除第1个。(当zip_list超过100个时,进一个就删一个,防止zip_list越来越大。)
主要参考:
- python并发编程之多进程(实践篇) - anne199534 - 博客园
- Python 3 并发编程多进程之队列(推荐使用) - 鱼皮弟 - 博客园
- python 并发编程 多进程 队列 - minger_lcm - 博客园
- python多进程编程,如何让主进程和子进程都退出_走走看看-CSDN博客_python主进程退出时子进程也退出
- 主进程被杀死时,如何保证子进程同时退出,而不变为孤儿进程 - shy车队破风手 - 博客园
- pytorch使用出现"RuntimeError: An attempt has been made to start a new process before the..." 解决方法 - 灰信网(软件开发博客聚合)
- put_nowait与get_nowait - 江湖乄夜雨 - 博客园
- Python|队列Queue - yangyidba - 博客园
- Python 队列(Queue)用法_大魔王的博客-CSDN博客_python queue
|