# 第16章 协程
"""
to yield 的两个释义:
产出
yield item会产出一个值,提供给调用方的next()
让步
暂停执行生成器,让调用方继续工作,直到需要使用另一个值时,调用next()
协程:
和生成器类似,都是定义体中包含yield关键字
协程中,yield通常出现在表达式的右边,可以产出值,也可以产出,这时生成器产出None
协程从调用方接受数据
.send()方法
yield甚至还能不接受或传出数据,yield是一种流程控制工具
使用协程可以把控制权让渡给中心调度程序,从而激活其他的协程
本章内容提要:
生成器作为协程使用时的行为和状态
使用装饰器自动预激协程
调用方如何使用生成器对象的.close()方法和.throw()方法控制协程
协程终止时,如何返回值
yield from 新句法的用途和语义
使用协程管理仿真系统中的并发活动
"""
# 16.1 生成器如何进化成协程
# 16.2 用作协程的生成器的基本行为
# 示例16-1 协程的最简单使用演示
"""
def simple_coroutine():
print('-> coroutine started')
x = yield
print('-> coroutine received',x)
my_coro = simple_coroutine()
print(my_coro) # 得到一个生成器对象
next(my_coro) # 预激
my_coro.send(42) # 发送数据
"""
# 协程的四个状态
# 当前状态可以使用inspect.getgeneratorstate()查看
"""
GEN_CREATED
等待开始执行
GEN_RUNNING
解释器正在执行
只有在多线程应用中才能看到这个状态
GEN_SUSPENDED
在yield表达式处暂停
只有这个状态才能使用send()方法
GEN_CLOSED
执行结束
"""
# 预激(prime) 即让协程执行到第一个yield表达式
# 使用next(my_coro)或者my_coro.send(None)
# 示例16-2 产出两个值的协程
"""
def simple_coro2(a):
print('-> Started: a = ',a)
b = yield a
print('-> Received:b = ',b)
c = yield a + b
print('-> Received: c = ',c)
my_coro2 = simple_coro2(14)
from inspect import getgeneratorstate
print(getgeneratorstate(my_coro2)) # GEN_CREATED
print(next(my_coro2))
print(getgeneratorstate(my_coro2)) # GEN_SUSPENDED
print(my_coro2.send(28))
my_coro2.send(99)
print(getgeneratorstate(my_coro2))
"""
# 16.3 使用协程计算移动平均值
# coroaverager0.py
# 16.4预激协程的装饰器
# coroutil.py
# 示例16-6 预激装饰器的使用
# coroaverager1.py
# 16.5 终止协程和异常处理
# 终止协程.py
# 示例16-7暗示了终止协程的一种方式:
# 给协程发送某个哨符值,让协程因为异常退出
# 内置的None和Ellipsis常量经常用作哨符值
# 甚至有人直接将StopIteration类作为哨符值
# python2.5开始客户代码可以在生成器对象上调用两个方法,显式地把异常发给协程
"""
throw 和 close
generator.throw(exc_type[,exc_value[,traceback]])
致使生成器在暂停的yield表达式处抛出指定异常,如果生成器处理了异常,
代码会向前执行到下一个yield表达式,而产出的值作为throw方法的返回值
如果生成器没有处理抛出的异常,异常会向上冒泡,传到调用方的上下文中
generator.close()
致使生成器暂停的yield表达式处抛出GeneratorExit异常,如果生成器没有处理
这个异常,或者抛出了StopIteration异常(通常是指运行到了结尾),调用方不会报错
如果收到GeneratorExit,生成器一定不能产出值,否则解释器会抛出RuntimeError异常
生成器抛出的其他异常会向上冒泡,传给调用方
"""
# 示例16-8 coro_exc_demo.py 学习在协程中处理异常的测试代码
# 如果不管协程如何退出都想做些清理工作,可以使用try/finally
# 示例16-12 coro_finally_demo.py:使用try/finally块在协程终止时执行操作
# 16.6 让协程返回值
# 示例16-13 coroaverager2.py :定义一个求平均值的协程,让它返回一个结果
# 16.7使用yield from 结构
"""
yield from 是全新的语言结构,在其他的语言中类似的结构使用await关键字
在生成器gen中使用yield from subgen()时:
subgen()会获得控制权,把产出的值传给gen的调用方
即调用方可以直接控制subgen,此时gen会阻塞,等待subgen终止
"""
# yield from用于简化for循环
"""
def gen():
for c in 'Ab':
yield c
for i in range(1,3):
yield i
print(list(gen())) # ['A', 'b', 1, 2]
# 可以改写为:
def gen():
yield from 'Ab'
yield from range(1,3)
print(list(gen())) # ['A', 'b', 1, 2]
"""
# 示例16-16:使用yield from 链接可迭代对象
"""
def chain(*iterable):
for it in iterable:
yield from it
s = 'ABC'
t = tuple(range(3))
print(list(chain(s, t))) # ['A', 'B', 'C', 0, 1, 2]
"""
"""
yield from 的主要功能是打开双向通道,把最外层的调用方和最内层的子生成器连接起来
这样二者可以直接发送和产出值,还可以直接传入异常,而不用在中间的协程中添加大量处理异常
的样板代码.
术语解释:
委派生成器
包含yield from <iterable>表达式的生成器函数
子生成器
从yield from表达式中<iterable>部分获取的生成器
调用方
指代调用委派生成器的客户端代码,或者称为客户端
委派生成器在yield from表达式处暂停,调用方可以直接把数据发送给子生成器
子生成器在把产出的值发给调用方,子生成器返回之后,解释器会抛出StopIteration异常
并把返回值附加到异常对象上,此时委派生成器会恢复.
"""
# 示例16-17 coroaverager3.py:使用yield from 计算平均值
# 16.8yield from的意义
"""
子生成器产出的值都直接传送给委派生成器的调用方,即客户端代码
使用send()方法发给委派生成器的值都直接传给子生成器,如果发送的是None,
那么会调用zi生成器的__next__方法,如果发送的值不是None
那么会调用子生成器的send()方法,如果调用的方法抛出StopIteration异常
那么委派生成器会恢复运行,任何其他的异常都会向上冒泡,传给委派生成器
生成器退出时,生成器(或者子生成器)中的return expr表达式会触发StopIteration(expr)异常抛出
yield from 表达式的值是子生成器终止时传给StopIteration异常的第一个参数
"""
# yield from结构的另外两个特性
"""
传入委派生成器的异常,除了GeneratorExit外都传给子生成器的throw()方法
如果调用throw()方法时抛出StopIteration,委派生成器恢复运行
StopIteration以外的异常会向上冒泡,传给委派生成器.
如果把GeneratorExit传入委派生成器,或者在委派生成器上调用close()方法
那么在子生成器上调用close()方法,如果调用close()导致异常抛出,那么异常会向上冒泡
传给委派生成器,否则委派生成器抛出GeneratorExit
yield from 的句法很难理解,可以研究PEP 380中 Greg Ewing写的伪代码
"""
# 示例16-18 简化的伪代码
"""
对RESULT = yield from EXPR的补充:
_i = iter(EXPR) # 1
try:
_y = next(_i) # 2
except StopIteration as _e:
_r = _e.value # 3
else:
while 1: # 4
_s = yield _y #5
try:
_y = _i.send(_s) #6
except StopIteration as _e: # 7
_r = _e.value
break
RESULT = _r # 8
解释:
1.EXPR可以是任何可迭代对象,因为获取的迭代器_i(子生成器)使用iter()函数
2.预激子生成器,结果保存再_y中,作为第一个产出值
3.如果抛出StopIteration异常,获取异常对象的value值,赋值给_r --这是最简单
情况下的返回值
4.运行这个循环时,委派生成器会阻塞,只作为调用方和子生成器之间的管道
5.产出子生成器当前产出的元素;等待调用方发送_s中保存的值
6.尝试让子生成器向前执行,转发调用方发送的_s
7.如果子生成器抛出StopIteration异常,获取value的值,赋值给_r,然后退出循环,让委派生成器恢复运行
8.返回的结果是_r,即整个yield from表达式的值
"""
# 示例16-19 完整的伪代码
# 对RESULT = yield from EXPR的补充:
"""
_i = iter(EXPR) # 1
try:
_y = next(_i) # 2
except StopIteration as _e:
_r = _e.value # 3
else:
while 1: # 4
try:
_s = yield _y #5
except GeneratorExit as _e: # 6 这一部分用于关闭委派生成器和子生成器
try:
-m = _i.close()
except AttributeError:
pass
else:
_m()
raise _e
except BaseException as _e: # 7 这一部分处理通过.throw传入的异常
_x = sys.exc_info()
try:
_m = _i.throw
except AttributeError:
raise _e
else: # 8
try:
_y= _m(*_x)
except StopIteration as _e:
_r = _e.value
break
else: # 9 如果产出的值没有抛出异常
try: #10 尝试子生生成器向前执行
if _s is None: # 11
_y = _i.send(_s)
except StopIteration as _e: # 12
_r = _e.value
break
RESULT = _r
"""
# 16.9 使用案例:使用协程做离散时间仿真
# 仿真是协程的经典应用
"""离散事件仿真(Discrete Event Simulation,DES):
是一种把系统建模成一系列事件的仿真类型,
在离散事件仿真中,仿真钟不是以固定的量向前推进,而是直接推进到下一个事件模型的模拟时间
"""
# 16.9.2 出租车队运营仿真
# taxi_sim.py:taxi_process协程,实现各辆出租车的活动
# 示例16-23 taxi_sim.py:Simulator类,一个简单的离散事件仿真类,重点关注run方法
coroaverager0.py
# coroaverager0.py 使用协程计算移动平均值
"""
这个协程会无限循环,仅当调用方在协程上调用.close方法,或者没有对协程的引用而被垃圾回收
这个协程才会停止
"""
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count
if __name__ == '__main__':
my_coro = averager()
next(my_coro)
print(my_coro.send(1))
print(my_coro.send(2))
print(my_coro.send(3))
coroutil.py预激协程的装饰器
# coroutil.py 预激协程的装饰器
from functools import wraps
def coroutine(func):
'''装饰器:向前执行到第一个yield表达式,预激func'''
@wraps(func)
def primer(*args,**kwargs):
gen = func(*args,**kwargs)
next(gen)
return gen
return primer
coroaverager1.py
# 预激装饰器的使用 coroaverager1.py
from coroutil import coroutine
@coroutine
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count
if __name__ == '__main__':
my_coro = averager()
from inspect import getgeneratorstate
print(getgeneratorstate(my_coro)) # GEN_SUSPENDED
my_coro.send(10)
my_coro.send(20)
print(my_coro.send(30))
终止协程.py
# 协程中未处理的异常会向上冒泡,传给next函数或者send方法的调用方
# 示例16-7 未处理的异常会导致协程终止
from coroaverager1 import averager
coro_avg = averager()
print(coro_avg.send(40))
print(coro_avg.send(50))
# 发送的值不是数字,导致协程内部有异常抛出
# 由于协程内部没有处理异常,协程会终止
print(coro_avg.send('spam')) # Traceback
# 如果尝试重新激活协程,会抛出StopIteration异常
print(coro_avg.send(60))
coro_exc_demo.py 学习在协程中处理异常的测试代码
# 示例16-8 coro_exc_demo.py 学习在协程中处理异常的测试代码
class DemoException(Exception):
'''为这次演示定义的异常类型'''
def demo_exc_handling():
print('-> coroutine started')
while 1:
try:
x = yield
except DemoException:
print('*** DemoException handled.Continuing...')
else:#如果没有异常name显示接受到的值
print('-> coroutine received:{!r}'.format(x))
raise RuntimeError('This line should never run')
if __name__ == '__main__':
from inspect import getgeneratorstate
ge = demo_exc_handling()
print(getgeneratorstate(ge))
next(ge)
print(getgeneratorstate(ge))
ge.send(10)
ge.send(20)
ge.throw(DemoException)
print(getgeneratorstate(ge))
print(ge.throw(DemoException))
ge.send(100)
# ge.throw(ZeroDivisionError)
ge.close()
print(getgeneratorstate(ge))
# ge.send(200)
coro_finally_demo.py:使用try/finally块在协程终止时执行操作
# 示例16-12 coro_finally_demo.py:使用try/finally块在协程终止时执行操作
class DemoException(Exception):
'''为这次演示定义的异常类型'''
def demo_finally():
print('-> coroutine started')
try:
while 1:
try:
x = yield
except DemoException:
print('*** DemoException handled.Continuing...')
else:#如果没有异常name显示接受到的值
print('-> coroutine received:{!r}'.format(x))
finally:
print('-> coroutine ending')
if __name__ == '__main__':
from inspect import getgeneratorstate
ge = demo_finally()
print(getgeneratorstate(ge))
next(ge)
print(getgeneratorstate(ge))
ge.send(10)
ge.send(20)
ge.throw(DemoException)
print(getgeneratorstate(ge))
print(ge.throw(DemoException))
ge.send(100)
# ge.throw(ZeroDivisionError)
ge.close()
print(getgeneratorstate(ge))
# ge.send(200)
coroaverager2.py :定义一个求平均值的协程,让它返回一个结果
# 示例16-13 coroaverager2.py :定义一个求平均值的协程,让它返回一个结果
from collections import namedtuple
Result = namedtuple('Result','count average')
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None: # 添加条件让协程终止,才能执行到return语句
break
total += term
count += 1
average = total/count
return Result(count,average)
if __name__ == '__main__':
coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(15)
coro_avg.send(113)
# 获取返回值
try:
print(coro_avg.send(None))
except StopIteration as exc:
result = exc.value
print(result)
# 这一版本不产出值
# 发送None导致协程结束,返回结果,一如既往生成器对象会抛出StopIteration异常
# 异常对象的value属性保存着返回的值
coroaverager3.py:使用yield from 计算平均值
# 示例16-17 coroaverager3.py:使用yield from 计算平均值
# 从一个字典中读取虚构的七年级男女同学的体重和身高
from collections import namedtuple
Result = namedtuple('Result','count average')
# 子生成器
def averager(): # 1
total = 0.0
count = 0
average = None
while True:
term = yield # 2
if term is None: # 3至关重要的终止条件
break
total += term
count += 1
average = total/count
return Result(count,average) # 4 返回的Result会成为grouper函数中yield from表达式的值
# 委派生成器
def grouper(result,key): # 5
while 1: # 6 每次循环都会新建一个averager实例
result[key] = yield from averager() #7
def main(data): # 8
results = {}
for key,values in data.items():
group = grouper(results,key) # 9
next(group) # 10
for value in values:
group.send(value) #11
group.send(None) # 12 重要
print(results)
report(results)
def report(results):
for key,result in sorted(results.items()):
group,unit = key.split(';')
print("{} {} averaging {}{}".format(
result.count,group,result.average,unit
))
if __name__ == '__main__':
data = {
'girls;kg':
[40.9,38.5,44.3,42,41,37.5,39.6,37.4,39.8,42.3,41.7],
'girls;m':
[1.5,1.6,1.45,1.65,1.67,1.32,1.4,1.7,1.33,1.32],
'boys;kg':
[41.9, 39.5, 54.3, 42, 41.9, 37.8, 39.9, 37.1, 39.8, 42.5, 40.7],
'boys;m':
[1.6, 1.7, 1.55, 1.65, 1.67, 1.42, 1.34, 1.77, 1.43, 1.44],
}
main(data)
"""
解释:
外层for循环每次迭代会新建一个grouper实例,赋值给group变量,group是委派生成器
调用next(group),预激委派生成器grouper,此时进入while 1循环,调用子生成器averager
后,在yield from表达式处暂停,
内层for循环调用group.send(value),直接把值传递给子生成器averager,同时,当前的
group在yield from表达式处暂停
内层循环结束后,group依然在yield from处暂停,因此result[key] =... 还没有执行
如果外层for循环的末尾没有group.send(None),那么averager子生成器永远不会终止,
委派生成器永远不会再次激活,因此永远不会给result[key]赋值
外层for循环重新迭代时,会新建一个grouper实例,然后绑定到group变量上
前一个实例(以及它创建的尚未终止的averager子生成器实例)被垃圾回收
"""
# taxi_sim.py:taxi_process协程,实现各辆出租车的活动
# 示例16-23 taxi_sim.py:Simulator类,一个简单的离散事件仿真类,重点关注run方法
# taxi_sim.py:taxi_process协程,实现各辆出租车的活动
# 事件类的定义
from collections import namedtuple
Event = namedtuple('Event',"time proc action") # proc 是出租车进程实例的编号
def taxi_process(ident,trips,start_time = 0):
"""每次改变状态时创建对象,把控制权让给仿真器"""
time = yield Event(start_time,ident,'leave garage')
for i in range(trips):
time = yield Event(time,ident,'pick up passenger')
time = yield Event(time,ident,'drop off passenger')
yield Event(time, ident, 'going home')
"""
说明:
每一辆出租车调用一次taxi_process函数,创建一个生成器对象,表示各个出租车的运营过程
ident表示出租车编号 trips表示行程数量
"""
# 示例16-23 taxi_sim.py:Simulator类,一个简单的离散事件仿真类,重点关注run方法
import queue
class Simulator:
def __init__(self,procs_map):
self.events = queue.priorityQueue()
self.procs = dict(procs_map)
def run(self,end_time): # 1
"""排定并显示事件,直到时间结束"""
# 排定各出租车的第一个事件
for _,proc in sorted(self.procs.items()): # 2
first_event = next(proc) # 3
self.events.put(first_event) # 4
# 这个仿真系统的主循环
sim_time = 0 # 5
while sim_time < end_time: # 6
if self.events.empty(): # 7
print("*** end of events")
break
current_event = self.events.get() # 8
sim_time,proc_id,previous_action = current_event # 9
print("taxi:",proc_id,proc_id*' ',current_event) # 10
active_proc = self.procs[proc_id] # 11
next_time = sim_time+compute_duration(previous_action) # 12
try :
next_event = active_proc.send(next_time) # 13
except StopIteration:
del self.procs[proc_id] # 14
else:
self.events.put(next_event) # 15
else: # 16
msg = '*** end of simulation time:{} events pending***'
print((msg.format(self.events.qsize())))
if __name__ == '__main__':
taxi = taxi_process(13,2,0)
print(next(taxi))
print(taxi.send(7))
print(taxi.send(23))
print(taxi.send(28))
print(taxi.send(35))
print(taxi.send(40))
35岁学python,也不知道为了啥?
|