Python并发编程
一、前言
1.为什么要搞并发编程
在大学阶段,科班出身的应该都学过《操作系统》这门课程,里面花了大篇幅的时间 去介绍进程、线程、并发、并行等概念。那么,并发编程需要各位的操作系统基础,本文不会对操作系统的部分进行过多的解释。
在这里,我们只简单的告诉大家为什么要搞并发编程。举一个例子:有一大堆砖需要搬,一个人干肯定要花很多时间(因此,如果你只会if……elif,那么很多时候,真的会拖慢运行速度)。多线程就好比是增加了人手,所以干的快。简而言之:就是为了增加运行速度
而随着编程的不断深入,为了使运行速度更快,并发编程几乎是任何程序员都绕不过去的东西。
2.增加运行速度的方法
在现实当中,我们有很多办法来提升程序的运行速度,比如说:我可以对某个算法进行优化,但是,这种优化往往比较有限。因此,我们可以采用多线程,多CPU,以及多机器并行等方式。
其中,多机器并行当中,就包括我们常常听说的大数据之类的东西,包括Spark,Hadoop等。而大数据是一个单独的学科,非常值得研究,因此在这里,我们只介绍一下跟并发编程相关的东西。
3.实现Python并发的方式
主要有三个:多线程,多进程,多协程。学过《操作系统》的同学,应该对这三个概念都不陌生。如果要了解这三个概念之间到底是什么关系,我们首先得了解一下CPU密集型,以及I/O密集型
3.1程序分类
3.1.1 CPU密集型(CPU-bound)
其实通过英文翻译,不难发现,英文和中文翻译还是有点意思上的差异的。所谓的CPU-bound,其实就是说:运行的速度最终会受到CPU计算的限制。因此CPU-bound有些时候也被翻译成计算密集型。具体就是指:I/O相对很少,但是却需要CPU进行非常大的计算处理,因此,CPU占用率非常的高。比如说:压缩、解压缩;加密/解密;正则表达式搜索等等。
3.1.2 I/O密集型(I/O bound)
这个是与CPU-bound相对的概念。与CPU-bound相反,此类程序计算相对较少,但是要花费大量的时间在I/O上面,CPU占用率低,但是运行速度却怎么也提不上去。这个时候就要考虑异步I/O(异步IO其实属于多协程)。那么此类程序包括:文件处理程序、爬虫程序、数据库的读取程序等。
3.2 多线程,多进行,多协程的对比
在《操作系统》这门课程当中,科班的同学大体都比较熟悉这几段话:
- 进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是结构的基础。
- 线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元
- 协程,英文Coroutines,是一种比线程更加轻量级的存在。正如一个进程可以拥有多个线程一样,一个线程也可以拥有多个协程
如果用一幅图,来表示这三者的关系,那么就是:
但是,除了上面这些东西,还应当了解:
3.3如何选择
那么针对以上这些情况,Python都为我们提供了什么方法呢?
- 为了避免对资源的访问冲突,Python为我们提供了Lock等,对资源访问进行控制。
- 为了实现不同线程,不同进程之间的通信,Python提供了Queue等方法,我们可以用这个来实现生产者-消费者问题
- 为了简化线程或者进程的任务管理,Python为我们提供了线程池Pool
- 我们可以使用subprocess来启动外部程序的进程,实现并行输入输出的交互。
4.Python的全局解释器锁
为什么要介绍这个呢?因为它是Python运行比较慢的重要原因,当然了,除了底层封装的原因,Python本身边解释,边运行的特点,也注定Python确实比较慢。
全局解释器锁:(Global Interpreter Lock,简称:GIL)。由于并发,并行引发的进程或者线程的不同步,因此,需要一个机制让各个进程的运行保持同步。这就是GIL诞生的原因。但是,它比较简单粗暴,它使得任何时刻仅有一个线程在执行。即使是多核处理器,GIL也是如此
即:在计算的时候,开启GIL,I/O的时候,关闭GIL
Python之所以引入这么一个GIL,是历史遗留问题。由于Python最初就是用来做数据处理的,既然用来计算数据,我们并不希望由于进程或者线程不同步而引发数据计算出现结果不一致的问题,所以才引入了GIL机制,但是,随着Python的发展,Python应用的领域越来越多,网页编程,游戏开发等也会用Python,这个情况下,GIL反而成了累赘。
因此,在Python的并发编程当中,我们更多的还是要针对I/O,如果把多线程用于CPU密集型计算,由于GIL的存在,反而会拖慢速度。
与此同时,Python的开发者们也意识到了这个问题,于是就想到了一个办法:既然GIL只是针对线程的。那么,我用多进程不就可以了。所以Python才会出现multiprocessing
二、多进程编程
1.进程创建步骤
进程的创建大致分为如下几个步骤:
-
导入进程包: import multiprocessing
-
通过进程类创建进程对象 pro = multiprocessing.Process()
其中,关于这个Process,解释如下: Process(group=None, target=None, name=None, args=(), kwargs={})
"""
参数说明:
1 group——参数未使用,值始终为None
2 target——表示调用对象,即子进程要执行的任务
3 args——表示调用对象的位置参数元组
4 kwargs——表示调用对象的字典
5 name——为进程的名称
"""
-
启动进程执行相关任务 pro.start()
2.多进程演示
import time
import multiprocessing
def sing():
for i in range(3):
print("sing……")
time.sleep(0.5)
def dance():
for i in range(3):
print("dance……")
time.sleep(0.5)
if __name__ == "__main__":
sing_process = multiprocessing.Process(target = sing)
dance_process = multiprocessing.Process(target = dance)
sing_process.start()
dance_process.start()
那么,Process当中的字典或者元组参数呢?
import time
import multiprocessing
def sing(num, name):
for i in range(num):
print("sing……")
time.sleep(0.5)
def dance(num,name):
for i in range(num):
print("dance……")
time.sleep(0.5)
if __name__ == "__main__":
sing_process = multiprocessing.Process(target = sing,args = (3,'xiaomi'))
dance_process = multiprocessing.Process(target = dance,kwargs = {'name':'xiaohong','num':2})
sing_process.start()
dance_process.start()
那么上面这个程序就相当于,给sing,dance加了主语,并且还限定的循环次数。那么上面这个程序的运行结果,就是xiaomi sing……运行三次,xiaohong dance……运行两次。
注意:
- 如果要传入元组,那么元组的顺序要和参数的顺序保持一致,比如说:上面这个程序函数是sing(num,name)当中,那么你传入的元组第一个一定要是num,第二个才能是name,否则会出现一些异常
- 如果要传入字典:那么key和value对应上即可。
3.获取进程编号
在现实开发当中,往往可能并发程度很高。所以,进程数量就会很多。如果没有办法区分父进程,子进程,那么势必就会造成混乱。于是,进程当都要赋予他们编号(也就是《操作系统》当中经常提及的pID),方便管理。
获取进程主要有两种方法:
- os.getpid():获取当前进程的编号
- os.getppid():获取父进程的编号
import time
import multiprocessing
import os
def sing(num, name):
print("sing进程编号:", os.getpid())
print("sing父进程:", os.getppid())
for i in range(num):
print(name + " sing……")
time.sleep(0.5)
def dance(num, name):
print("dance进程编号:", os.getpid())
print("dance父进程:", os.getppid())
for i in range(num):
print(name + " dance……")
time.sleep(0.5)
if __name__ == "__main__":
print("主进程id", os.getpid())
sing_process = multiprocessing.Process(target=sing, args=(3, 'xiaomi'))
dance_process = multiprocessing.Process(target=dance, kwargs={'name': 'xiaohong', 'num': 2})
sing_process.start()
dance_process.start()
输出结果:
主进程id 19144
dance进程编号: 17128
sing进程编号: 25512
sing父进程: 19144
xiaomi sing……
dance父进程: 19144
xiaohong dance……
xiaomi sing……
xiaohong dance……
xiaomi sing……
我们只看各个进程的id,会发现sing和dance是两个不同的进程,拥有两个不同的进程编号。但是这两个都属于一个父进程:19144
4.一些要点
首先,默认情况下,主进程会在所有子进程都执行完毕后,才会关闭,我们用以下代码验证一下:
import time
import multiprocessing
import os
def work():
for i in range(3):
print('working')
time.sleep(0.2)
if __name__ == '__main__':
work_process = multiprocessing.Process(target = work)
work_process.start()
time.sleep(1)
print("主进程finish")
输出结果:
working
working
working
主进程finish
如果,我不想这样呢?我想主进程一结束,子进程也要跟着结束。这个时候就需要另外一个东西:主进程守护。要点只有一个,看代码:
import time
import multiprocessing
import os
def work():
for i in range(20):
print('working')
time.sleep(0.2)
if __name__ == '__main__':
work_process = multiprocessing.Process(target = work)
work_process.daemon = True
work_process.start()
time.sleep(1)
print("主进程finish")
working
working
working
working
working
主进程finish
我们看输出结果,我们设置循环了20次,如果还是默认情况下,一定会执行20次,但是设置了守护主进程,主进程已结束,work也就连带着不执行了。
5.Process()的常用方法
我们假设,我们创建了一个进程p,那么:
is_alive() | 如果p仍然运行,返回True |
---|
join([timeout]) | 等待进程p终止。Timeout是可选的超时期限,进程可以被链接无数次,但如果连接自身则会出错 | run() | 进程启动时运行的方法。默认情况下,会调用传递给Process构造函数的target。定义进程的另一种方法是继承Process类并重新实现run()函数 | start() | 启动进程,这将运行代表进程的子进程,并调用该子进程中的run()函数 | terminate() | 强制终止进程。如果调用此函数,进程p将被立即终止,同时不会进行任何清理动作。如果进程p创建了它自己的子进程,这些进程将变为僵尸进程。使用此方法时需要特别小心。如果p保存了一个锁或参与了进程间通信,那么终止它可能会导致死锁或I/O损坏 |
这当中,join方法好像比较抽象,我们用代码演示一下:
import time
import multiprocessing
def sing(num, name):
for i in range(num):
print(name + " sing……")
time.sleep(0.5)
def dance(num, name):
for i in range(num):
print(name + " dance……")
time.sleep(0.5)
if __name__ == "__main__":
sing_process1 = multiprocessing.Process(target=sing, args=(2, 'xiaomi'))
sing_process2 = multiprocessing.Process(target = sing, args = (3,'xiaoGang'))
sing_process3 = multiprocessing.Process(target=sing, args = (4,'xiaoLi'))
dance_process = multiprocessing.Process(target=dance, kwargs={'name': 'xiaohong', 'num': 2})
sing_process1.start()
sing_process2.start()
sing_process1.join()
sing_process2.join()
sing_process3.start()
dance_process.start()
输出结果:
xiaomi sing……
xiaoGang sing……
xiaomi sing……xiaoGang sing……
xiaoGang sing……
xiaoLi sing……
xiaohong dance……
xiaoLi sing……
xiaohong dance……
xiaoLi sing……
xiaoLi sing……
我们可以看到,在xiaomi执行了两次,xiaoGang执行了三次之后,才执行的dance和xiaoLi的唱歌
6.进程池
在现实当中,往往进程会很多,几百个的情况非常多见,这个时候,恐怕你再用以上的编程方式,就会累的要死,这个时候,我们就需要Pool来对进程进行相关的代码管理了。
6.1 进程池简介
Pool(
processes: Optional[int] = ...,
initializer: Optional[Callable[..., Any]] = ...,
initargs: Iterable[Any] = ...,
maxtasksperchild: Optional[int] = ...,
)
其中processes是要创建的进程数。如果省略此参数,将使用cpu_count()的值。Initializer是每个工作进程启动时要执行的可调用对象。Initargs是要传递给initializer的参数元祖。Initializer默认为None。
6.2 进程池的相关方法
方法 | 描述 |
---|
apply(func [,args [,kwargs]]) | 在一个池工作进程中执行函数(*args,**kwargs),然后返回结果。 | apply_async(func [, args [,kwargs [,callback ] ] ]) | 在一个池工作进程中异步地执行函数(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,稍后可用于获得最终结果。Callback是可调用对象,接受输入参数。当func的结果变为可用时,将立即传递给callback。Callback禁止执行任何阻塞操作,否则将阻塞接收其他异步操作中的结果 | close() | 关闭进程池,防止进行进一步操作。如果还有挂起的操作,它们将在工作进程终止之前完成 | join() | 等待所有工作进程退出。此方法只能在close()或者terminate()方法之后调用 | imap( func,iterable [ ,chunksize] ) | map()函数的版本之一,返回迭代器而非结果列表 | imap_unordered( func,iterable [,chunksize] ) | 同imap()函数一样,只是结果的顺序根据从工作进程接收到的时间任意确定 | map( func,iterable [,chunksize] ) | 将可调用对象func应用给iterable中的所有项,然后以列表的形式返回结果。通过将iterable划分为多块并将工作分派给工作进程,可以并行地执行这项操作。chunksize指定每块中的项数。如果数量较大,可以增大chunksize的值来提升性能 | map_async( func,iterable [,chunksize [,callback]] ) | 同map()函数,但结果的返回是异步的。返回值是AsyncResult类的实例,稍后可用与获取结果。Callback是指接受一个参数的可调对象。如果提供callable,当结果变为可用时,将使用结果调用callable | terminate() | 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数 | get( [ timeout] ) | 返回结果,如果有必要则等待结果到达。Timeout是可选的超时。如果结果在指定时间内没有到达,将引发multiprocessing.TimeoutError异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发 | ready() | 如果调用完成,则返回True | sucessful() | 如果调用完成且没有引发异常,返回True。如果在结果就绪之前调用此方法,将引发AssertionError异常 | wait( [timeout] ) | 等待结果变为可用。Timeout是可选的超时 |
6.3代码演示:
首先先看一个非阻塞的案例
import multiprocessing
import time
def func(msg):
print("start:", msg)
time.sleep(3)
print("end:",msg)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 3)
for i in range(5):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
输出结果:
start: hello 0
start: hello 1
start: hello 2
end: hello 0
end:hello 2start:hello 3
end:hello 1
start: hello 4
end: hello 3
end: hello 4
7.进程通信
为什么会出现这个东西呢?因为,可能会有某一个全局的变量,给各个进程使用,如果不合理安排,那么就会造成运算结果的不唯一性。所以,务必要加入进程的通信机制,使得各个进程之间能够协调配合。
7.1 Queue
在进程通信当中,最为常用的东西就是:Queue。顾名思义,它是一个队列。具体来说,是多进程的安全队列。如果一个进程想入队,那么就调用当中的put方法,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.full异常。
如果某个进程需要出队,那么就调用get方法,这个方法在出队的同时,还会返回队头进程的信息。和put方法一样,get方法也有blocked和timeout参数。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。
7.2 Queue的相关方法
方法 | 描述 |
---|
cancle_join_thread() | 不会在进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞 | close() | 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列尚未写入数据,但将在此方法完成时马上关闭 | empty() | 如果调用此方法时q为空,返回True | full() | 如果q已满,返回True | get([block [,timeout]) | 返回q中的一个项。如果q为空,此方法将阻塞,直到队列中有项可用为止。Block用于控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。Timeout是可选超时时间,用在阻塞模式中。如果在指定的时间间隔内没有项变为可用,将引发Queue.Empty异常 | join_thread() | 连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下此方法由不是q的原始创建者的所有进程调用。调用q.cancle_join_thread()方法可以禁止这种行为 | put(item [ , block [, timeout]]) | 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。Block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。Timeout指定在阻塞模式中等待可用时空间的时间长短。超时后将引发Queue.Full异常。 | qsize() | 返回目前队列中项的正确数量。 | joinableQueue([maxsize]) | 创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项的消费者通知生产者项已经被成功处理。通知进程是使用共享的信号和条件变量来实现的 | task_done() | 消费者使用此方法发出信号,表示q.get()返回的项已经被处理。如果调用此方法的次数大于从队列中删除的项的数量,将引发ValueError异常 | join() | 生产者使用此方法进行阻塞,知道队列中的所有项均被处理。阻塞将持续到位队列中的每个项均调用q.task_done()方法为止 |
案例:进程池创建进程完成进程之间的通信
from multiprocessing import Manager,Pool
import time
def write(q):
for i in ["a","b","c"]:
print('开始写入值%s' % i)
q.put(i)
time.sleep(1)
def read(q):
print('开始读取')
while True:
if not q.empty():
print('读取到:',q.get())
time.sleep(1)
else:
break
if __name__=='__main__':
q=Manager().Queue()
p=Pool(3)
p.apply(write,(q,))
p.apply(read,(q,))
p.close()
p.join()
三.多线程编程
1.线程创建步骤
论步骤,其实跟多进程大同小异,只是用到的模块和方法不一样。与进程编程一样,大致也是分三步骤:
-
导入线程模块: import threading
-
通过进程类创建进程对象 thr = threading.Thread()
其中,关于这个Thread,解释如下: Thread(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
"""
参数意义和进程当中是类似的
其中,daemon就是用来守护线程的,在线程当中,我们直接往里面传True就可以达到目的
"""
-
启动进程执行相关任务 thr.start()
那么,关于使用,与进程也大同小异。
2.获取线程信息
可以用current_thread()方法来获取线程相关信息
import threading
import time
def task():
time.sleep(1)
thread = threading.current_thread()
print(thread)
if __name__ == '__main__':
for i in range(5):
sub_thread = threading.Thread(target=task)
sub_thread.start()
我们如果多次运行上面的代码,会发现:每次的输出结果都不同。由此可以知道一个很重要的事情:线程的运行是无序的。
除了current_thread(),Python还为我们提供了:
3. Thread类常用方法
方法名 | 描述 |
---|
run() | 用以表示线程活动的方法 | start() | 启动线程活动 | join([time]) | 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生 | isAlive() | 返回线程是否活动的 | getName() | 返回线程名 | setName() | 设置线程名 |
那么对于线程,其实用法什么的,跟进程都大同小异,因此不再赘余。
四.锁
学过《操作系统》的同学,对于进程的互斥与同步那一部分的内容应该印象深刻。那么,为了实现进程的互斥或者同步,最为常用的就是信号量机制。在这个方面的使用当中,锁是一个绕不开的话题。
锁有两种状态——锁定和未锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”状态,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
使用 Thread 对象的 Lock 可以实现简单的进程同(如果是线程,则是RLock),有上锁 acquire 方法和 释放release 方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间。
1.互斥锁
比如说,我们写一个***互斥锁***机制:
from threading import Thread,Lock
num=0
mutex=Lock()
def test1():
global num
'''
在两个线程中都调用上锁的方法,则这两个线程就会抢着上锁,
如果有1方成功上锁,那么导致另外一方会堵塞(一直等待)直到这个锁被解开
'''
for i in range(100000):
mutex.acquire()
num+=1
mutex.release()
print('test1输出num:',num)
def test2():
global num
for i in range(100000):
mutex.acquire()
num+=1
mutex.release()
print('test2输出num:',num)
if __name__=='__main__':
t1=Thread(target=test1)
t2=Thread(target=test2)
t1.start()
t2.start()
t1.join()
t2.join()
输出结果:
test1输出num: 100000
test2输出num: 200000
2. 死锁
所谓死锁,其实就是各个线程或者进程由于资源分配不当,导致了互相争抢资源,但是由于资源都没够数,所以程序便推行不下去,唯有通过外力作用才能让程序继续推进。
比如如下代码:
import time
from threading import Thread,Lock
import threading
mutexA=threading.Lock()
mutexB=threading.Lock()
class MyThread1(Thread):
def run(self):
if mutexA.acquire():
print(self.name,'执行')
time.sleep(1)
if mutexB.acquire():
print(self.name,'执行')
mutexB.release()
mutexA.release()
class MyThread2(Thread):
def run(self):
if mutexB.acquire():
print(self.name,'执行')
time.sleep(1)
if mutexA.acquire():
print(self.name,'执行')
mutexA.release()
mutexB.release()
if __name__ == '__main__':
t1=MyThread1()
t2=MyThread2()
t1.start()
t2.start()
那么最后,执行的时候就会出现类似“死循环”的症状,程序进行不下去,永远也没有结束的迹象。那么上面这个程序为什么会死锁呢?
我们看t1.start()那里,由于在Mythread1当中,会率先获取mutexA,而在t2.start()后,Mythread2中会率先获取mutexB,此时Mythread1想要获取mutexB,但是mutexB却已经让t2获得了,因此,t1进程无法获得muytexB。而t2想要获得mutexA,但mutexA却已经让t1获得了,还是拿不着。所以二者就一直这样互相争抢资源,导致程序无法运行下去
3. 同步
这里就只以线程同步为例了,当然了,各个进程之间也是可以实现同步的。
import time
from threading import Thread,Lock
import threading
lock1=Lock()
lock2=Lock()
lock3=Lock()
lock2.acquire()
lock3.acquire()
class Task1(Thread):
def run(self):
while True:
if lock1.acquire():
print('...task1...')
time.sleep(1)
lock2.release()
class Task2(Thread):
def run(self):
while True:
if lock2.acquire():
print('...task2...')
time.sleep(1)
lock3.release()
class Task3(Thread):
def run(self):
while True:
if lock3.acquire():
print('...task3...')
time.sleep(1)
lock1.release()
if __name__ == '__main__':
t1=Task1()
t2=Task2()
t3=Task3()
t1.start()
t2.start()
t3.start()
运行结果:
...task1...
...task2...
...task3...
...task1...
...task2...
...task3...
...task1...
...task2...
...task3...
<不断的循执行下去>
五.多协程
在Python当中,用的最多的是yield
|