一、进程概念
在了解进程之前,先了解多任务。多任务就是指操作系统能够执行多个任务。例如,使用Window或Linux操作系统可以同时看电影、聊天、查看网页信息等,此时操作系统就是在执行多任务,而每一个任务就是一个进程。
进程(process)是计算机已经运行程序的实体。进程与程序不同,程序本身只是指令、数据及其组织形式的描述,而进程才是(指令和数据)的真正运行实例。
二、创建进程的常用方式
1、使用multiprocessing模块创建进程
multiprocessing模块提供了一个Process类来代表一个进程对象 语法格式:
Process(group,target,name,args,kwargs)
- group:参数未使用,值始终为None
- target:表示当前进程启动时执行的可调用对象
- name:为当前进程实例的别名
- args:表示传递给target函数的参数元组
- kwargs:表示传递给target函数的参数字典
from multiprocessing import Process
#执行子进程代码
def test():
print("当前子进程")
#执行主程序
def main():
print("主进程开始")
p=Process(target=test) #实例化Process进程类
p.start() #启动子进程
#p.join() #这个是等子进程全部运行完再运行主进程
print("主进程结束")
if __name__=="__main__":
main()
发现会优先执行主线程,在执行子线程 开启p.join()后
先实例化Process类,然后使用p.start()方法启动子进程 | 开始执行test()函数Process的实例p常用的方法除start()外,还有如下常用方法: |
---|
run() | 如果没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法. | is_alive() | 判断进程实例是否还在执行。 | join([timeout]) | 是否等待进程实例执行结束,或等待多少秒。 | start() | 启动进程实例(创建子进程)。 | terminate() | 不管任务是否完成,立即终止。 |
Precess类还有如下常用属性: |
---|
name :当前进程实例别名,默认为Process-N,N为从1开始递增的整数。 | pid :当前进程实例的PID值。 |
示例:创建2个子进程,分别使用os模块和time模块输出父进程和子进程的ID以及子进程的时间,并调用Process类的name和 pid属性,代码如下:
from multiprocessing import Process
import time
import os
# os.getpid()可获取当前进程id,返回值为int
# os.getppid()可获取父进程id,返回值为int
#两个子进程将会调用的俩个方法
def child_1(delay):
print("子进程(%s)开始执行,父进程为(%s)" % (os.getpid(),os.getppid()))
t_start = time.time() #计时开始
time.sleep(delay) #程序将会被挂起delay秒
t_end = time.time() #即使结束
print("子进程(%s)执行时间为%0.2f秒"%(os.getpid(),t_end-t_start))
def child_2(delay):
print("子进程(%s)开始执行,父进程为(%s)" % (os.getpid(),os.getppid()))
t_start = time.time() #计时开始
time.sleep(delay) #程序将会被挂起delay秒
t_end = time.time() #即使结束
print("子进程(%s)执行时间为%0.2f秒"%(os.getpid(),t_end-t_start))
if __name__ == "__main__":
print("-------父进程开始执行-------")
print("父进程PID:%s"%os.getppid) #输出当前程序的PID
p1=Process(target=child_1,args=(1,)) #实例化进程p1
p2=Process(target=child_2,name="two",args=(2,)) #实例化进程p2
p1.start() #启动进程p1
p2.start() #启动进程p2
#同时父进程仍在往下执行,如果p1、p2进程还在执行,将会返回True
print("p1.is_alive=%s"%p1.is_alive())
print("p2.is_alive=%s"%p2.is_alive())
#输入p1和p2进程的别名和PID
print("p1.name=%s"%p1.name)
print("p1.pid=%s"%p1.pid)
print("p2.name=%s"%p2.name)
print("p2.pid=%s"%p2.pid)
print("----------等待子进程--------")
p1.join() #等待p1进程结束
p2.join() #等待p2进程结束
print("----------父进程执行结束---------")
2、使用Process子类创建进程
对于一些简单的小任务,通常使Process(target=test)方式实现多进程。但是如果要处理复杂任务的进程,通常定义一个类,使其继承Process类,每次实例化这个类的时候,就等同于实例化一个进程对象。
from multiprocessing import Process
import time
import os
#继承Process类
class SubProcess(Process):
#由于Process类本身也有__init__初始化方法,这个子类相当于重写了父类的这个方法
def __init__(self,delay,name=""):
Process.__init__(self) #调用Process父类的初始化方法
#self.delay相当于全局变量
self.delay = delay #接收参数delay
if name: #判断传递的参数是否存在
self.name = name #如果传递参数name,则为子进程创建的name属性,否自使用默认属性
def run(self):
print("子进程(%s)开始执行,父进程为(%s)" % (os.getpid(),os.getppid()))
t_start = time.time() #计时开始
time.sleep(self.delay) #程序将会被挂起delay秒
t_end = time.time() #即使结束
print("子进程(%s)执行时间为%0.2f秒"%(os.getpid(),t_end-t_start))
if __name__ == "__main__":
print("-------父进程开始执行-------")
print("父进程PID:%s"%os.getppid) #输出当前程序的PID
p1 = SubProcess(delay=1)
p2 = SubProcess(delay=2,name = "two")
#对一个不包含target属性的Process类执行start()方法,就会运行这个类中的run()方法
p1.start() #启动进程p1
p2.start() #启动进程p2
#输出p1和p2进程的执行状态,如果真正执行,返回True,否则返回False
print("p1.is_alive=%s"%p1.is_alive())
print("p2.is_alive=%s"%p2.is_alive())
#输入p1和p2进程的别名和PID
print("p1.name=%s"%p1.name)
print("p1.pid=%s"%p1.pid)
print("p2.name=%s"%p2.name)
print("p2.pid=%s"%p2.pid)
print("----------等待子进程--------")
p1.join() #等待p1进程结束
p2.join() #等待p2进程结束
print("----------父进程执行结束---------")
上述代码中,定义了一个SubProcess子类,继承multiprocess.Process 父类。SubProcess子类中定义了2个方法:init()初始化方法和 run()方法。在_init()初识化方法中,调用multiprocess.Process父类的_init()初始化方法,否则父类初始化方法会被覆盖,无法开启进程。此外,在 SubProcess子类中并没有定义start()方法,但在主进程中却调用了start()方法,此时就会自动执行SubPorcess类的run()方法。运行结果如图所示。
3、 使用进程池Pool创建进程
Pool进程池。为了更好的理解进程池,可以将进程池比作水池。我们需要完成放满10个水盆的水的任务,而在这个水池中,最多可以安放3个水盆接水,也就是同时可以执行3个任务,即开启3个进程。为更快完成任务,现在打开3个水龙头开始放水,当有一个水盆的水接满时,即该进程完成1个任务,我们就将这个水盆的水倒入水桶中,然后继续接水,即执行下一个任务。如果3个水盆每次同时装满水,那么在放满第9盆水后,系统会随机分配1个水盆接水,另外2个水盆空闲。
接下来,先来了解一下Pool类的常用方法。常用方法及说明如下:
- apply_async(func[, args[ ,kwds]]):使用非阻塞方式调用func()函数,args为传递给func()函数的参数列表,kwds为传递给func()函数的关键字参数列表。
- apply(func[,args[,kwds]]):使用阻塞方式调用func()函数(开行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程)。
- close():关闭Pool,使其不再接受新的任务。
- terminate():不管任务是否完成,立即终止。
- join():主进程阻塞,等待子进程的退出,必须在close或terminate之后使用。
关于什么是阻塞调用和非阻塞调用个人理解如下: 如果使用阻塞方式,必须等待上一个进程退出才能执行下一个进程,而使用非阻塞方式,则可以并行执行3个进程。
下面通过一个示例演示一下如何使用进程池创建多进程。这里模拟水池放水的场景,定义一个进程池,设置最大进程数为3。然后使用非阻塞方式执行10个任务,查看每个进程执行的任务。具体代码:
from multiprocessing import Pool
import os,time
def task(name):
print("子进程:%s 执行task:%s " % (os.getpid(),name))
time.sleep(1)
if __name__ == "__main__":
print("父进程:%s"%os.getpid())
p = Pool(3) #定义一个进程池,最大进程为3
for i in range(10):
p.apply_async(task,args=(i,)) #使用非阻塞方式调用task()函数
print("等待所有子进程结束....")
p.close() #关闭进程池,关闭后p不在接收新的请求
p.join() #等待子进程结束
print("所有子进程结束")
发现pid:3972执行了四次,而pid:15464和8704只执行了三次
阻塞方式:
from multiprocessing import Pool
import os,time
def task(name):
print("子进程:%s 执行task:%s " % (os.getpid(),name))
time.sleep(1)
if __name__ == "__main__":
print("父进程:%s"%os.getpid())
p = Pool(3) #定义一个进程池,最大进程为3
for i in range(10):
# p.apply_async(task,args=(i,)) #使用非阻塞方式调用task()函数
p.apply(task,args=(i,)) #使用阻塞方式调用task()函数
print("等待所有子进程结束....")
p.close() #关闭进程池,关闭后p不在接收新的请求
p.join() #等待子进程结束
print("所有子进程结束")
三、通过队列实现进程间通信
1、不通过其他条件,进程之间是否能共享信息?
from multiprocessing import Process
def add():
print("......子进程1开始......")
global num
num += 50
print("num:%d"%num)
print("......子进程1结束......")
def sub():
print("......子进程2开始......")
global num
num -= 50
print("num:%d"%num)
print("......子进程2结束......")
num = 100 #定义一个全局变量
if __name__=="__main__":
print(".....主进程开始.....")
print("num:%d"%num)
#实例化进程p1,p2
p1 = Process(target=add)
p2 = Process(target=sub)
#开启进程p1,p2
p1.start()
p2.start()
#阻塞主进程,等待进程结束
p1.join()
p2.join()
print(".....主进程结束.....")
上述代码中,分别创建了2个子进程,一个子进程中令num 加上50,另一个子进程令num减去50。但是从运行结果可以看出,num在父进程和2个子进程中的初始值都是100。也就是全局变量num在一个进程中的结果,没有传递到下一个进程中,即进程之间没有共享信息。进程间示意图如图所示。 要如何才能实现进程间的通信呢?Python的multiprocessing模块包装了底层的机制,提供了Queue(队列)、Pipes(管道)等多种方式来交换数据。以下通过队列(Queue)来实现进程间 的通信。
2、多线程队列的简单使用
进程之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。可以使用multiprocessing模块的Queue实现多进程之间的数据传递。Queue本身是一个消息列队程序,下面介绍一下Queue的使用。 初始化Queue()对象时(例如:q=Queue(num)),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)。
Queue的常用方法如下;
-
Queue.qsize() :返回当前队列包含的消息数量。 -
Queue.empty():如果队列为空,返回 True;反之返回False 。 -
Queue.full():如果队列满了,返回 True;反之返回False。 -
Queue.get([block[, timeout]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True。 如果block使用默认值,且没有设置timeout(单位秒),消息列队为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止。如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常。 如果block值为False,消息列队为空,则会立刻抛出“Queue.Empty”异常 -
Queue.get_nowait(:相当Queue.get(False)。 -
Oueue.put(item,[block[, timeout]]):将item消息写入队列,block 默认值为True。 如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout则会等待timeout秒,若还没空间,则抛出“Queue.Full”异常。 如果block值为False,消息列队没有空间可写入,则会立刻抛出“Queue.Full”异常 -
Oueue.put_nowait(item):相当Queue.put(item, False)。
下面,通过一个例子学习一下如何使用processing.Queue。代码如下:
from multiprocessing import Queue
if __name__=="__main__":
q = Queue(3) #初始化一个queue对象,最多可接收三条put消息
q.put("消息1")
q.put("消息2")
print(q.full()) #q.full() 判断当前队列是否满了
q.put("消息3")
print(q.full())
#因为消息队列已满,下面的try都会抛出异常
#第一个try会等待2秒后在抛出异常,第二个try会立刻抛出异常
try:
q.put("消息4",True,2)
except:
print("消息队列已满,现有消息数量:%s"%q.qsize())
try:
q.put_nowait("消息4")
except:
print("消息队列已满,现有消息数量:%s"%q.qsize())
#读取消息时,先判断消息队列是否为空,在读取
if not q.empty():
print("----从队列中获取消息----")
for i in range(q.qsize()):
print(q.get_nowait())
#先判断消息队列是否已将满,在写入
if not q.full():
q.put_nowait("消息4")
if not q.empty():
print("----从队列中获取消息----")
print(q.get_nowait())
3、使用队列在进程中通信
我们知道使用multiprocessing.Process 可以创建多进程,使用multiprocessing.Queue可以实现队列的操作。接下来,通过一个示例结合Process和 Queue实现进程间的通信。创建2个子进程,一个子进程负责向队列中写入数据,另一个子进程负责从队列中读取数据。为保证能够正确从队列中读取数据,设置读取数据的进程等待时间为2秒。如果2秒后仍然无法读取数据,则抛出异常。代码如下:
from multiprocessing import Process,Queue
import time
#向队列写入数据
def write_task(q):
if not q.full():
for i in range(5):
message = "消息" + str(i)
q.put(message)
print("写入:%s"%message)
#向队列读取数据
def read_task(q):
time.sleep(1)
while not q.empty():
#等待2秒,如果还没有读取到任何消息,则抛出"Queue.Empty"
print("读取:%s"%q.get(True,2))
if __name__=="__main__":
print("----父进程开始----")
#父进程创建
q = Queue()
pw = Process(target=write_task,args=(q,)) #实例化写入队列的子进程,并且传递队列
pr = Process(target=read_task,args=(q,)) #实例化读取队列的子进程,并且传递队列
pw.start() #启动子进程pw,写入
pr.start() #启动子进程pr,读取
pw.join() #等待pw结束
pr.join() #等待pr结束
print("----父进程结束----")
|