协程
概念
- 什么是协程: https://stackoverflow.com/questions/553704/what-is-a-coroutine
- 协程与线程的区别: https://stackoverflow.com/questions/1934715/difference-between-a-coroutine-and-a-thread
- 并行与并发的区别: https://stackoverflow.com/questions/1050222/what-is-the-difference-between-concurrency-and-parallelism
- Python 3.5 协程究竟是个啥: https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/
- 协程和并发的一门有趣课程: http://www.dabeaz.com/coroutines/
代码学习
- 使用生成器来实现一个并发网络应用程序(代码在链接页面的最底部,篇幅很长,这里不搬运): https://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p12_using_generators_as_alternative_to_threads.html

看不懂就先背下来、默写几遍,我就是这么过来的。
站在巨人肩膀
- 调度器的实现,可参考epoll+reactor模型
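- yield 和 send() 的配合确实抽象,需要弄清二者的参数和返回值:`x = yield v` 会把 v 交给调用方,作为 next()/send() 的返回值;调用方 send(msg) 传入的 msg 则成为该 yield 表达式的值(即 x)。刚创建的协程必须先用 send(None) 或 next() 启动,让它运行到第一个 yield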
- 实现了NewTask类,方便添加新的task
- 实现了TimeWait类,底层用最小堆管理定时器,堆中元素是元组(超时绝对时间,自增id,task)。自增id用来保证超时时间相同的task按添加顺序出堆,同时避免heapq在时间相同时去比较无法比较大小的task对象
from collections import deque
import heapq
import socket
import select
import time
class YieldEvent:
    """Base class for every object a task may yield to the scheduler.

    Both hooks are no-ops here; concrete events override the ones they need.
    """

    def handle_yield(self, sched, task):
        """Hook called with the scheduler and the task that yielded this event."""

    def handle_resume(self, sched, task):
        """Hook called with the scheduler and the task that was waiting on this event."""
class Scheduler:
    """Cooperative scheduler for generator-based tasks, backed by epoll.

    Tasks are generators that yield ``YieldEvent`` instances.  Runnable tasks
    sit in a FIFO queue; tasks blocked on a file descriptor are parked in
    ``_waiting`` and woken from ``_ioloop``; sleeping tasks live in a min-heap
    keyed by their deadline.
    """

    def __init__(self) -> None:
        self._numtasks = 0            # tasks added via new() that have not finished
        self._ready = deque()         # (task, value to send) pairs, runnable now
        self._waiting = {}            # fileno -> (event, task) blocked on I/O
        self._timer_id = 0            # unique tie-breaker for heap entries
        self._timer = []              # min-heap of (deadline, timer_id, task)
        self._epoll = select.epoll()

    def _ioloop(self):
        """Wake expired timers, then poll epoll once and resume ready waiters.

        The poll does not block when at least one timer fired, blocks until
        the nearest deadline when timers are pending, and blocks indefinitely
        otherwise.
        """
        # Monotonic clock: deadlines must not jump when the wall clock is adjusted.
        now = time.monotonic()
        timeout = None
        while self._timer and self._timer[0][0] <= now:
            _, _, task = heapq.heappop(self._timer)
            self.add_ready(task)
            timeout = 0  # something is runnable, so only peek at epoll
        if timeout is None and self._timer:
            timeout = self._timer[0][0] - now
        for fileno, _event in self._epoll.poll(timeout=timeout):
            evt, task = self._waiting.pop(fileno)
            # Registrations are one-shot: the event re-registers on its next yield.
            self._epoll.unregister(fileno)
            evt.handle_resume(self, task)

    def new(self, task):
        """Add a fresh generator ``task``; it is started with ``send(None)``."""
        self._ready.append((task, None))
        self._numtasks += 1

    def add_ready(self, task, msg=None):
        """Queue an existing ``task`` to be resumed with ``msg``."""
        self._ready.append((task, msg))

    def _io_wait(self, fileno, evt, task, mask):
        """Register ``fileno`` for ``mask`` and remember who is waiting on it."""
        # Register first: if epoll rejects the fd (for example because it is
        # already registered) the existing waiter entry is left untouched.
        self._epoll.register(fileno, mask)
        self._waiting[fileno] = (evt, task)

    def _read_wait(self, fileno, evt, task):
        """Park ``task`` until ``fileno`` is readable."""
        self._io_wait(fileno, evt, task, select.EPOLLIN)

    def _write_wait(self, fileno, evt, task):
        """Park ``task`` until ``fileno`` is writable."""
        self._io_wait(fileno, evt, task, select.EPOLLOUT)

    def _time_wait(self, seconds, task):
        """Park ``task`` for ``seconds`` seconds."""
        deadline = time.monotonic() + seconds
        # The id keeps equal deadlines in insertion order and prevents heapq
        # from ever comparing the task objects themselves.
        heapq.heappush(self._timer, (deadline, self._timer_id, task))
        self._timer_id += 1

    def run(self):
        """Run until every task added through ``new`` has finished.

        Raises:
            RuntimeError: if a task yields something that is not a YieldEvent.
        """
        while self._numtasks:
            if not self._ready:
                self._ioloop()
                continue
            task, msg = self._ready.popleft()
            # Only the resume itself may signal completion; a StopIteration
            # escaping from an event handler must not be counted as a
            # finished task.
            try:
                evt = task.send(msg)
            except StopIteration:
                self._numtasks -= 1
                continue
            if not isinstance(evt, YieldEvent):
                raise RuntimeError(f'unrecognized yield event {type(evt)}')
            evt.handle_yield(self, task)
class ReadSocket(YieldEvent):
    """Event yielded by a task that wants to receive up to ``nbytes`` from ``sock``."""

    def __init__(self, sock, nbytes) -> None:
        self.sock, self.nbytes = sock, nbytes

    def handle_yield(self, sched, task):
        """Ask the scheduler to park ``task`` until the socket is readable."""
        fd = self.sock.fileno()
        sched._read_wait(fd, self, task)

    def handle_resume(self, sched, task):
        """Do one ``recv`` and reschedule ``task`` with the bytes received."""
        sched.add_ready(task, self.sock.recv(self.nbytes))
class WriteSocket(YieldEvent):
    """Event yielded by a task that wants to send ``data`` on ``sock``."""

    def __init__(self, sock, data) -> None:
        self.sock, self.data = sock, data

    def handle_yield(self, sched, task):
        """Ask the scheduler to park ``task`` until the socket is writable."""
        fd = self.sock.fileno()
        sched._write_wait(fd, self, task)

    def handle_resume(self, sched, task):
        """Do one ``send`` and reschedule ``task`` with the number of bytes sent."""
        sched.add_ready(task, self.sock.send(self.data))
class AcceptSocket(YieldEvent):
    """Event yielded by a task that wants to accept a connection on ``sock``."""

    def __init__(self, sock) -> None:
        self.sock = sock

    def handle_yield(self, sched, task):
        """Ask the scheduler to park ``task`` until the socket is readable."""
        fd = self.sock.fileno()
        sched._read_wait(fd, self, task)

    def handle_resume(self, sched, task):
        """Call ``accept`` once and reschedule ``task`` with whatever it returns."""
        sched.add_ready(task, self.sock.accept())
class NewTask(YieldEvent):
    """Event yielded by a task that wants the scheduler to start another task."""

    def __init__(self, task) -> None:
        self.task = task

    def handle_yield(self, sched, task):
        """Hand the new task to the scheduler, then requeue the yielding task."""
        spawned = self.task
        sched.new(spawned)
        sched.add_ready(task)

    def handle_resume(self, sched, task):
        """Do nothing."""
class TimeWait(YieldEvent):
    """Event yielded by a task that wants to wait for ``seconds`` seconds."""

    def __init__(self, seconds) -> None:
        self.seconds = seconds

    def handle_yield(self, sched, task):
        """Pass the delay and the yielding task to the scheduler's timer."""
        delay = self.seconds
        sched._time_wait(delay, task)
class Socket:
    """Wrapper whose ``recv``/``send``/``accept`` return scheduler events.

    The three I/O methods build an event object for the caller to ``yield``
    instead of performing the operation themselves.  Every other attribute is
    looked up on the wrapped object.
    """

    def __init__(self, sock) -> None:
        self.sock = sock

    def recv(self, nbytes):
        """Return a ReadSocket event for up to ``nbytes`` bytes."""
        return ReadSocket(self.sock, nbytes)

    def send(self, data):
        """Return a WriteSocket event for ``data``."""
        return WriteSocket(self.sock, data)

    def accept(self):
        """Return an AcceptSocket event for the wrapped socket."""
        return AcceptSocket(self.sock)

    def __getattr__(self, name: str):
        """Delegate any attribute not found on the wrapper to the wrapped object.

        Raises:
            AttributeError: if neither the wrapper nor the wrapped object has
                ``name``.
        """
        # __getattr__ only runs when normal lookup failed, so reaching here
        # for "sock" means it was never set (e.g. instance built by copy or
        # __new__).  Without this guard self.sock would recurse until
        # RecursionError.
        if name == "sock":
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute 'sock'"
            )
        return getattr(self.sock, name)
def server(addr):
    """Task that listens on ``addr`` and starts one handle_client task per connection.

    Args:
        addr: ``(host, port)`` pair passed to ``bind``.

    Yields:
        An accept event for the listening socket, then a NewTask event for
        each accepted connection.
    """
    sock = Socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0))
    # Close the listening socket if bind/listen fails or the task is closed,
    # instead of leaving it to the garbage collector.
    try:
        sock.bind(addr)
        sock.listen(5)
        while True:
            c, a = yield sock.accept()
            yield NewTask(handle_client(Socket(c), a))
    finally:
        sock.close()
def handle_client(sock, addr):
    """Task that echoes received data back to one client after a 1 second pause.

    Args:
        sock: object whose ``recv``/``send`` return events to yield and which
            has a ``close`` method.
        addr: peer address, used only in the log lines.

    The loop ends when a receive resumes with an empty value; the socket is
    closed however the task ends.
    """
    print(f'accept {addr}')
    try:
        while True:
            data = yield sock.recv(1024)
            if not data:
                break
            yield TimeWait(1)
            # A single send may write only part of the buffer; keep sending
            # the remainder so no echoed bytes are silently dropped.
            pending = data
            while pending:
                nsent = yield sock.send(pending)
                pending = pending[nsent:]
        print(f'client close {addr}')
    finally:
        # Also runs when the task is closed or fails mid-conversation.
        sock.close()
if __name__ == "__main__":
    # Start the echo server on port 8080 with an empty host string and run
    # the scheduler until no tasks remain.
    sched = Scheduler()
    listener = server(("", 8080))
    sched.new(listener)
    sched.run()
未完待续
C 语言可以采用 ucontext 实现协程,可参考云风写的代码 https://github.com/cloudwu/coroutine/
我先研究研究…
|