Starting from the simplest possible "sequential execution" code, this article works step by step toward an understanding of how Python's asyncio event loop and coroutines are implemented under the hood.
First, a word about what yield does in Python. In short, it lets code pause and later resume execution, and that is the crucial capability: it gives us a chance to interrupt a function, hand the time over to other functions, and then pick up again from the point of interruption at the right moment. A function that contains yield is called a generator, and coroutines evolved from generators step by step.
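For instance, here is a minimal generator (a small illustration, not part of the original walkthrough) showing the pause/resume behaviour that everything below builds on:

    def gen():
        print('start')
        yield              # pause here; control goes back to the caller
        print('resumed')   # execution continues from this point on the next next()

    g = gen()
    next(g)   # prints 'start' and pauses at the yield
    next(g)   # prints 'resumed'; a further next(g) would raise StopIteration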
1. Sequential execution
Before anything else, let's start with the simplest case, plain sequential execution:
import time

def countdown(n):
    while n > 0:
        print('Down', n)
        time.sleep(1)
        n -= 1

def countup(stop):
    x = 0
    while x < stop:
        print('Up', x)
        time.sleep(1)
        x += 1

countdown(5)
countup(5)
Sequential execution runs the two functions one after the other, synchronously. The output:
Down 5
Down 4
Down 3
Down 2
Down 1
Up 0
Up 1
Up 2
Up 3
Up 4
2. Concurrent execution
Next, let's look at concurrent execution. The classic solution for running things concurrently is multithreading:
import time
import threading

def countdown(n):
    while n > 0:
        print('Down', n)
        time.sleep(1)
        n -= 1

def countup(stop):
    x = 0
    while x < stop:
        print('Up', x)
        time.sleep(1)
        x += 1

threading.Thread(target=countdown, args=(5,)).start()
threading.Thread(target=countup, args=(5,)).start()
When the two functions run in two threads, the order of the output is no longer guaranteed, and 'Up' and 'Down' can even end up glued together on one line. That happens because a call to print is not atomic: the arguments and the trailing newline are written to stdout in separate steps, so output from the two threads can interleave (a lock-based fix is sketched after the output below). The output looks like this:
Down 5
Up 0
UpDown 1
4
UpDown 2
3
Up 3
Down 2
UpDown 41
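If the interleaved lines bother you, one common fix (a sketch under the assumption that you only need whole lines to stay intact; it is not part of the original code) is to guard the print call with a lock:

    import threading

    print_lock = threading.Lock()

    def safe_print(*args):
        # only one thread at a time may write a complete line
        with print_lock:
            print(*args)

Replacing the print calls in countdown and countup with safe_print keeps each line in one piece, though the relative order of the two threads is still not deterministic.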
So, is there a way to get concurrency without using multiple threads? There is.
3. Introducing a scheduler
To get concurrency inside a single thread, countdown and countup have to take turns executing. For that we introduce a scheduler, which plays a role similar to the event loop in Python:
import time
from collections import deque

class Scheduler:
    def __init__(self):
        self.ready = deque()

    def call_soon(self, func):
        self.ready.append(func)

    def run(self):
        while self.ready:
            func = self.ready.popleft()
            func()

sched = Scheduler()

def countdown(n):
    if n > 0:
        print('Down', n)
        time.sleep(1)
        sched.call_soon(lambda: countdown(n - 1))

def countup(stop):
    def _run(x):
        if x < stop:
            print('Up', x)
            time.sleep(1)
            sched.call_soon(lambda: _run(x + 1))
    _run(0)

sched.call_soon(lambda: countdown(5))
sched.call_soon(lambda: countup(5))
sched.run()
The code above uses a double-ended queue to build a simple scheduler, and countdown and countup now run in alternation. But this scheduler only takes simple turns; there is still no real overlap in time. The output:
Down 5
Up 0
Down 4
Up 1
Down 3
Up 2
Down 2
Up 3
Down 1
Up 4
This scheduler also has a problem. If the sleep inside countdown is changed to 5 seconds, then whenever it is countdown's turn the scheduler blocks there for 5 seconds. That is a waste of resources, because nothing else can happen during those 5 seconds.
So we optimize: instead of blocking immediately, we defer the call until its deadline, this time by introducing a call_later method:
import time
from collections import deque

class Scheduler:
    def __init__(self):
        self.ready = deque()     # functions ready to run now
        self.sleeping = []       # (deadline, func) pairs waiting for their time

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        deadline = time.time() + delay
        self.sleeping.append((deadline, func))
        self.sleeping.sort()     # keep the earliest deadline at the front

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                # Nothing is ready: wait until the nearest deadline expires.
                deadline, func = self.sleeping.pop(0)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)
            while self.ready:
                func = self.ready.popleft()
                func()

sched = Scheduler()

def countdown(n):
    if n > 0:
        print('Down', n)
        sched.call_later(4, lambda: countdown(n - 1))

def countup(stop):
    def _run(x):
        if x < stop:
            print('Up', x)
            sched.call_later(1, lambda: _run(x + 1))
    _run(0)

sched.call_soon(lambda: countdown(5))
sched.call_soon(lambda: countup(20))
sched.run()
Now the output already looks concurrent, and the skeleton of a simple asynchronous framework has emerged!
See what happened? The key to the concurrency is the Scheduler class: the ready and sleeping queues work together to interleave the calls, and the real scheduling happens in Scheduler.run. The output:
Down 5
Up 0
Up 1
Up 2
Up 3
Down 4
Up 4
Up 5
Up 6
Up 7
Down 3
Up 8
Up 9
Up 10
Up 11
Down 2
Up 12
Up 13
Up 14
Up 15
Down 1
Up 16
Up 17
Up 18
Up 19
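Incidentally, asyncio's real event loop exposes the same two primitives our Scheduler imitates. The sketch below uses the actual asyncio API (loop.call_soon and loop.call_later) purely for comparison:

    import asyncio

    def tick(msg):
        print(msg)

    loop = asyncio.new_event_loop()
    loop.call_soon(tick, 'soon')        # run on the next pass of the loop
    loop.call_later(1, tick, 'later')   # run roughly 1 second from now
    loop.call_later(1.5, loop.stop)     # stop the loop once both have fired
    loop.run_forever()
    loop.close()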
Next, replace the list sort with a priority queue, and use a sequence number to avoid the exception that would otherwise occur when two deadlines are equal and the tuples fall through to comparing the functions (the output is the same as above; a short demonstration of the tie-breaking issue follows the listing):
import time
import heapq
from collections import deque

class Scheduler:
    def __init__(self):
        self.ready = deque()     # functions ready to run now
        self.sleeping = []       # heap of (deadline, sequence, func)
        self.sequence = 0        # tie-breaker for equal deadlines

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.time() + delay
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)
            while self.ready:
                func = self.ready.popleft()
                func()

sched = Scheduler()

def countdown(n):
    if n > 0:
        print('Down', n)
        sched.call_later(4, lambda: countdown(n - 1))

def countup(stop):
    def _run(x):
        if x < stop:
            print('Up', x)
            sched.call_later(1, lambda: _run(x + 1))
    _run(0)

sched.call_soon(lambda: countdown(5))
sched.call_soon(lambda: countup(20))
sched.run()
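To see why the sequence number matters, here is a small demonstration (an illustration, not part of the original code). With bare (deadline, func) tuples, two equal deadlines force Python to compare the functions themselves, which is not allowed; the sequence number always breaks the tie first:

    import heapq

    heap = []
    heapq.heappush(heap, (1.0, lambda: None))
    try:
        heapq.heappush(heap, (1.0, lambda: None))   # equal deadlines -> compares the lambdas
    except TypeError as e:
        print('without a tie-breaker:', e)

    heap = []
    heapq.heappush(heap, (1.0, 0, lambda: None))
    heapq.heappush(heap, (1.0, 1, lambda: None))    # fine: ties resolved by insertion order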
4. Implementing an asynchronous queue
Now, on top of the scheduler, let's implement an asynchronous queue of our own and integrate it with the code above; call the file aproducer.py.
Scheduler uses two queues to run functions concurrently: the ready queue holds functions waiting to run, the sleeping queue defers delayed calls, and Scheduler.run drives the execution.
AsyncQueue likewise uses two queues to build an asynchronous producer-consumer model: the items queue holds the data being produced and consumed, while the waiting queue makes consumption non-blocking (via callbacks). The queue also implements a close method so producers and consumers can tell whether it has been closed.
import time
import heapq
from collections import deque

class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = []
        self.sequence = 0

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.time() + delay
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)
            while self.ready:
                func = self.ready.popleft()
                func()

sched = Scheduler()

class Result:
    # Carries either a value or an exception to the callback.
    def __init__(self, value=None, exc=None):
        self.value = value
        self.exc = exc

    def result(self):
        if self.exc:
            raise self.exc
        else:
            return self.value

class QueueClosed(Exception):
    pass

class AsyncQueue:
    def __init__(self):
        self.items = deque()      # data waiting to be consumed
        self.waiting = deque()    # callbacks waiting for data
        self._closed = False

    def close(self):
        self._closed = True
        if self.waiting and not self.items:
            # Wake the waiting getters so they can observe the close.
            for func in self.waiting:
                sched.call_soon(func)

    def put(self, item):
        if self._closed:
            raise QueueClosed()
        self.items.append(item)
        if self.waiting:
            func = self.waiting.popleft()
            sched.call_soon(func)

    def get(self, callback):
        if self.items:
            callback(Result(value=self.items.popleft()))
        else:
            if self._closed:
                callback(Result(exc=QueueClosed()))
            else:
                # No data yet: retry the get once woken up.
                self.waiting.append(lambda: self.get(callback))

def producer(q, count):
    def _run(n):
        if n < count:
            print('Producing', n)
            q.put(n)
            sched.call_later(1, lambda: _run(n + 1))
        else:
            print('Producer done')
            q.close()
    _run(0)

def consumer(q):
    def _consume(result):
        try:
            item = result.result()
            print('Consuming', item)
            sched.call_soon(lambda: consumer(q))
        except QueueClosed:
            print('Consumer done')
    q.get(callback=_consume)

q = AsyncQueue()
sched.call_soon(lambda: producer(q, 10))
sched.call_soon(lambda: consumer(q))
sched.run()
The output:
Producing 0
Consuming 0
Producing 1
Consuming 1
Producing 2
Consuming 2
Producing 3
Consuming 3
Producing 4
Consuming 4
Producing 5
Consuming 5
Producing 6
Consuming 6
Producing 7
Consuming 7
Producing 8
Consuming 8
Producing 9
Consuming 9
Producer done
Consumer done
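As an aside, the Result class above plays the same role a Future plays in asyncio: it delivers either a value or an exception to a callback. The sketch below uses the real asyncio API (loop.create_future, add_done_callback, set_result) only to show the parallel:

    import asyncio

    async def main():
        fut = asyncio.get_running_loop().create_future()
        fut.add_done_callback(lambda f: print('callback got', f.result()))
        fut.set_result(42)
        await asyncio.sleep(0)   # yield to the loop so the callback runs

    asyncio.run(main())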
5. Introducing yield
The scheduler implemented so far is still, at bottom, sequential execution; it merely achieves the effect of switching between functions.
This time we bring in yield, which gives us the ability to pause and resume code. With that, we can build a coroutine-based scheduler; call the file yieldo.py.
import time
import heapq
from collections import deque

class Awaitable:
    def __await__(self):
        yield                      # the actual suspension point

def switch():
    return Awaitable()

class Scheduler:
    def __init__(self):
        self.ready = deque()       # coroutines ready to resume
        self.sleeping = []         # heap of (deadline, sequence, coroutine)
        self.current = None        # coroutine currently running
        self.sequence = 0

    async def sleep(self, delay):
        deadline = time.time() + delay
        self.sequence += 1
        heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
        self.current = None        # don't put it back on the ready queue
        await switch()

    def new_task(self, coro):
        self.ready.append(coro)

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                deadline, _, coro = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(coro)
            self.current = self.ready.popleft()
            try:
                self.current.send(None)     # run until the next await switch()
                if self.current:
                    self.ready.append(self.current)
            except StopIteration:
                pass

sched = Scheduler()

async def countdown(n):
    while n > 0:
        print('Down', n)
        await sched.sleep(4)
        n -= 1

async def countup(stop):
    x = 0
    while x < stop:
        print('Up', x)
        await sched.sleep(1)
        x += 1

sched.new_task(countdown(5))
sched.new_task(countup(20))
sched.run()
Running yieldo.py produces the following output:
Down 5
Up 0
Up 1
Up 2
Up 3
Down 4
Up 4
Up 5
Up 6
Up 7
Down 3
Up 8
Up 9
Up 10
Up 11
Down 2
Up 12
Up 13
Up 14
Up 15
Down 1
Up 16
Up 17
Up 18
Up 19
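Before moving on to asyncio, it may help to see what Scheduler.run is really doing when it calls self.current.send(None). The short sketch below (an illustration that reuses the switch helper from yieldo.py above) drives a coroutine by hand: every send(None) runs it up to the next await switch(), i.e. up to the yield inside Awaitable.__await__:

    async def demo():
        print('step 1')
        await switch()
        print('step 2')

    coro = demo()
    coro.send(None)          # prints 'step 1', then pauses at the yield
    try:
        coro.send(None)      # prints 'step 2'; the coroutine finishes
    except StopIteration:
        pass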
Below is equivalent code written with asyncio; its output is the same as yieldo.py's:
import asyncio

async def countdown(n):
    while n > 0:
        print('Down', n)
        await asyncio.sleep(4)
        n -= 1

async def countup(stop):
    x = 0
    while x < stop:
        print('Up', x)
        await asyncio.sleep(1)
        x += 1

async def main():
    t1 = asyncio.create_task(countdown(5))
    t2 = asyncio.create_task(countup(20))
    await t1
    await t2

asyncio.run(main())
6. A coroutine-based asynchronous queue
In the previous section, using the async/await keywords and yield's ability to pause and resume, we implemented an asynchronous scheduler that is one step closer to asyncio's event loop.
Now let's use the same approach to improve the asynchronous queue from Section 4; call the file yproducer.py:
import time
import heapq
from collections import deque

class Awaitable:
    def __await__(self):
        yield

def switch():
    return Awaitable()

class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = []
        self.current = None
        self.sequence = 0

    async def sleep(self, delay):
        deadline = time.time() + delay
        self.sequence += 1
        heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
        self.current = None
        await switch()

    def new_task(self, coro):
        self.ready.append(coro)

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                deadline, _, coro = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(coro)
            self.current = self.ready.popleft()
            try:
                self.current.send(None)
                if self.current:
                    self.ready.append(self.current)
            except StopIteration:
                pass

sched = Scheduler()

class QueueClosed(Exception):
    pass

class AsyncQueue:
    def __init__(self):
        self.items = deque()      # data waiting to be consumed
        self.waiting = deque()    # coroutines suspended in get()
        self._closed = False

    def close(self):
        self._closed = True
        if self.waiting and not self.items:
            # Wake a waiting getter so it can see the queue is closed.
            sched.ready.append(self.waiting.popleft())

    async def put(self, item):
        if self._closed:
            raise QueueClosed()
        self.items.append(item)
        if self.waiting:
            sched.ready.append(self.waiting.popleft())

    async def get(self):
        while not self.items:
            if self._closed:
                raise QueueClosed()
            # Suspend the current coroutine until put() or close() wakes it.
            self.waiting.append(sched.current)
            sched.current = None
            await switch()
        return self.items.popleft()

async def producer(q, count):
    for n in range(count):
        print('Producing', n)
        await q.put(n)
        await sched.sleep(1)
    print('Producer done')
    q.close()

async def consumer(q):
    try:
        while True:
            item = await q.get()
            print('Consuming', item)
    except QueueClosed:
        print('Consumer done')

q = AsyncQueue()
sched.new_task(producer(q, 10))
sched.new_task(consumer(q))
sched.run()
The output:
Producing 0
Consuming 0
Producing 1
Consuming 1
Producing 2
Consuming 2
Producing 3
Consuming 3
Producing 4
Consuming 4
Producing 5
Consuming 5
Producing 6
Consuming 6
Producing 7
Consuming 7
Producing 8
Consuming 8
Producing 9
Consuming 9
Producer done
Consumer done
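Just as in Section 5, the same producer-consumer pair can be written against asyncio itself. The sketch below uses asyncio.Queue (real API); since asyncio.Queue has no close() method, a None sentinel is used here, as an assumption, to signal the end of production:

    import asyncio

    async def producer(q, count):
        for n in range(count):
            print('Producing', n)
            await q.put(n)
            await asyncio.sleep(1)
        print('Producer done')
        await q.put(None)            # sentinel instead of close()

    async def consumer(q):
        while True:
            item = await q.get()
            if item is None:
                break
            print('Consuming', item)
        print('Consumer done')

    async def main():
        q = asyncio.Queue()
        await asyncio.gather(producer(q, 10), consumer(q))

    asyncio.run(main())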
7. Wrapping up
So far we have implemented:
- a scheduler, Scheduler, that can run asynchronous tasks
- an asynchronous queue, AsyncQueue, that lets data be consumed without blocking
Finally, let's build a Task class to wrap coroutines, and combine the asynchronous tasks with the queue's producer and consumer; call the file coro_callback.py:
import time
from collections import deque
import heapq

class Scheduler:
    def __init__(self):
        self.ready = deque()       # ready to run: plain callbacks and Task objects
        self.sleeping = []         # heap of (deadline, sequence, func)
        self.current = None        # the Task currently being driven
        self.sequence = 0

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.time() + delay
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(func)
            while self.ready:
                func = self.ready.popleft()
                func()

    def new_task(self, coro):
        self.ready.append(Task(coro))   # wrap the coroutine so it is callable

    async def sleep(self, delay):
        self.call_later(delay, self.current)
        self.current = None
        await switch()

class Task:
    # Makes a coroutine look like an ordinary callback to the scheduler.
    def __init__(self, coro):
        self.coro = coro

    def __call__(self):
        try:
            sched.current = self
            self.coro.send(None)            # run until the next await switch()
            if sched.current:
                sched.ready.append(self)    # not sleeping or waiting: keep it running
        except StopIteration:
            pass

class Awaitable:
    def __await__(self):
        yield

def switch():
    return Awaitable()

sched = Scheduler()

class AsyncQueue:
    def __init__(self):
        self.items = deque()
        self.waiting = deque()

    async def put(self, item):
        self.items.append(item)
        if self.waiting:
            sched.ready.append(self.waiting.popleft())

    async def get(self):
        if not self.items:
            self.waiting.append(sched.current)
            sched.current = None
            await switch()
        return self.items.popleft()

async def producer(q, count):
    for n in range(count):
        print('Producing', n)
        await q.put(n)
        await sched.sleep(1)
    print('Producer done')
    await q.put(None)     # None is the sentinel telling the consumer to stop

async def consumer(q):
    while True:
        item = await q.get()
        if item is None:
            break
        print('Consuming', item)
    print('Consumer done')

q = AsyncQueue()
sched.new_task(producer(q, 10))
sched.new_task(consumer(q))

def countdown(n):
    if n > 0:
        print('Down', n)
        sched.call_later(4, lambda: countdown(n - 1))

def countup(stop):
    def _run(x):
        if x < stop:
            print('Up', x)
            sched.call_later(1, lambda: _run(x + 1))
    _run(0)

sched.call_soon(lambda: countdown(5))
sched.call_soon(lambda: countup(20))
sched.run()
The output:
Producing 0
Consuming 0
Down 5
Up 0
Producing 1
Consuming 1
Up 1
Producing 2
Consuming 2
Up 2
Producing 3
Consuming 3
Up 3
Down 4
Producing 4
Consuming 4
Up 4
Producing 5
Consuming 5
Up 5
Producing 6
Consuming 6
Up 6
Producing 7
Consuming 7
Up 7
Down 3
Producing 8
Consuming 8
Up 8
Producing 9
Consuming 9
Up 9
Producer done
Consumer done
Up 10
Up 11
Down 2
Up 12
Up 13
Up 14
Up 15
Down 1
Up 16
Up 17
Up 18
Up 19
Bonus:
Putting together everything covered above, let's implement a network I/O scheduler; call the file io_scheduler.py:
from socket import *
import time
from collections import deque
import heapq
from select import select

class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = []
        self.sequence = 0
        self.current = None          # the Task currently being driven
        self._read_waiting = {}      # socket -> task waiting to read
        self._write_waiting = {}     # socket -> task waiting to write

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        self.sequence += 1
        deadline = time.time() + delay
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def read_wait(self, fileno, func):
        self._read_waiting[fileno] = func

    def write_wait(self, fileno, func):
        self._write_waiting[fileno] = func

    def run(self):
        while self.ready or self.sleeping or self._read_waiting or self._write_waiting:
            if not self.ready:
                # Nothing ready: block in select() until I/O is possible
                # or the nearest deadline expires.
                if self.sleeping:
                    deadline, _, func = self.sleeping[0]
                    timeout = deadline - time.time()
                    if timeout < 0:
                        timeout = 0
                else:
                    timeout = None
                can_read, can_write, _ = select(self._read_waiting,
                                                self._write_waiting, [], timeout)
                for fd in can_read:
                    self.ready.append(self._read_waiting.pop(fd))
                for fd in can_write:
                    self.ready.append(self._write_waiting.pop(fd))
                now = time.time()
                while self.sleeping:
                    if now > self.sleeping[0][0]:
                        self.ready.append(heapq.heappop(self.sleeping)[2])
                    else:
                        break
            while self.ready:
                func = self.ready.popleft()
                func()

    def new_task(self, coro):
        self.ready.append(Task(coro))

    async def sleep(self, delay):
        self.call_later(delay, self.current)
        self.current = None
        await switch()

    async def recv(self, sock, maxbytes):
        self.read_wait(sock, self.current)
        self.current = None
        await switch()                 # resumed when sock is readable
        return sock.recv(maxbytes)

    async def send(self, sock, data):
        self.write_wait(sock, self.current)
        self.current = None
        await switch()                 # resumed when sock is writable
        return sock.send(data)

    async def accept(self, sock):
        self.read_wait(sock, self.current)
        self.current = None
        await switch()                 # resumed when a connection is pending
        return sock.accept()

class Task:
    def __init__(self, coro):
        self.coro = coro

    def __call__(self):
        try:
            sched.current = self
            self.coro.send(None)
            if sched.current:
                sched.ready.append(self)
        except StopIteration:
            pass

class Awaitable:
    def __await__(self):
        yield

def switch():
    return Awaitable()

sched = Scheduler()

async def tcp_server(addr):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(1)
    while True:
        client, addr = await sched.accept(sock)
        print('Connection from', addr)
        sched.new_task(echo_handler(client))

async def echo_handler(sock):
    while True:
        data = await sched.recv(sock, 10000)
        if not data:
            break
        await sched.send(sock, b'Got:' + data)
    print('Connection closed')
    sock.close()

sched.new_task(tcp_server(('', 30000)))
sched.run()
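To try the server out, run io_scheduler.py and, in another terminal, connect with a small client such as the sketch below (a hypothetical test client, not part of the original; nc localhost 30000 works just as well):

    from socket import socket, AF_INET, SOCK_STREAM

    with socket(AF_INET, SOCK_STREAM) as s:
        s.connect(('localhost', 30000))
        s.send(b'hello')
        print(s.recv(10000))   # expected reply: b'Got:hello'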
Reference: https://www.bilibili.com/video/BV1qz4y1Q7EY/?spm_id_from=333.337.search-card.all.click