python asyncio 异步协程知识点集合
1. 协程
? 协程不是计算机提供的,而是由程序员人为创造出来的,也称微线程,它是一种用户态、上下文切换的一种技术,让一个线程去在代码之间切换运行的过程。
2. 实现协程的技术
? greenlet 早期模块
? yield 关键字
? asyncio 装饰器 (python 3.4 之后才可使用)
? async、await 关键字 (python 3.5及之后才可使用)
3. asyncio 的使用
? 早期 python 3.4 实现通过 @asyncio.coroutine 实现
?
异步协程的重点在于程序遇到io阻塞它会自动切换 , 程序在遇到asyncio.sleep(2) 时他会自动切换到其他协程任务上。
python 3.5 之后 使用 async & await 关键字实现
其实 他和3.4 asyncio 装饰器 实现 效果是一样的, 只是python 想让程序员的代码变得更加简洁引用了这两个关键字。
4. 异步编程
?
事件循环 --------> 死循环 去检测或执行某些代码
?
'任务列表' = [任务1 , 任务2 , 任务3, ........ ]
while True:
'去任务列表中检查所有任务,将 可执行 和 已完成 的任务返回'
for 就绪任务 in 可执行任务列表:
执行就绪任务
for 已完成任务 in 已完成任务列表:
在任务中移除已完成列表
如果任务列表中任务都已完成 则终止循环
loop = asyncio.get_event_loop()
loop.run_until_complete(任务)
asyncio.run('协程对象')
async def func():
func()
await + 可等待对象 ( 协程对象, future, task 对象 ——》 io 等待对象 )
事件循环将任务加到任务列表之后,执行可执行任务,当程序遇到io 等待之后,事件循环不会执行这个函数了,他会切换到其他任务去执行,等io等待完成之后,事件循环会检测到并切换到该任务继续执行。
Task 对象 --------> 帮助你在事件循环中添加多个任务
asyncio.create_task(协程对象)
'还可以使用'
loop.create_task() 或 ensure_future() 函数
asyncio.wait(task任务)
"和"
asyncio.gather(*task任务)
"异同“
-
相同: 从功能上看,asyncio.wait 和 asyncio.gather 实现的效果是相同的,都是把所有 Task 任务结果 收集起来。 -
异同 : asyncio.wait 会返回两个值:done 和 pending ,done 为已完成的协程 Task ,pending 为超时未完成的协程 Task ,需通过 future.result 调用 Task 的 result ;而asyncio.gather 返回的是所有已完成 Task 的 result ,不需要再进行调用或其他操作,就可以得到全部结果。
tornado 实现 线程监听+时间循环 使用redis 发布者订阅者模式实现
def redis_listener(loop):
asyncio.set_event_loop(loop)
async def listener():
global np
r = redis.Redis(decode_responses=True)
ps = r.pubsub()
ps.subscribe(client_id)
for message in ps.listen():
if message['type'] == 'message'
async with aiofiles.open("test.txt",'a',encoding='utf8') as f:
await f.write(data['data']+'\n')
future = asyncio.gather(listener())
loop.run_until_complete(future)
class SendMsg(BaseHandler):
async def post(self):
global np
channel = self.get_argument('channel',None)
data = self.get_argument('data',None)
id = self.get_argument('id',None)
np = channel
data = '{"data":"%s","id":%s}' % (data,id)
r = redis.Redis(decode_responses=True)
r.publish(channel,data)
return self.write({"code":200,"msg":data})
app = tornado.web.Application(
[(r'/send/',SendMsg),
(r'/wb/',WB),
],debug=True
)
if __name__ == '__main__':
loop = loop = asyncio.new_event_loop()
threading.Thread(target=redis_listener(loop)).start()
server = httpserver.HTTPServer(app)
server.listen(8000)
tornado.ioloop.IOLoop.instance().start()
asyncio.new_event_loop() 创建一个event loop对象
set_event_loop(eventloop对象) 将 event loop 对象指定为当前协程的 event loop
一个协程内只允许运行一个 event loop,不要一个协程有两个event loop交替运行
tornado 实现 协程异步 + 事件循环 包括redis 异步
#接口 发布消息
class SendMsg(BaseHandler):
#发布信息
async def post(self):
global np
# 获取频道
channel = self.get_argument('channel',None)
# 获取数据
data = self.get_argument('data',None)
id = self.get_argument('id',None)
np = channel
data = '{"data":"%s","id":%s}' % (data,id)
#发布消息
r = redis.Redis(decode_responses=True)
r.publish(channel,data)
return self.write({"code":200,"msg":data})
async def reader(channel):
while True:
async with async_timeout.timeout(1):
# 获取频道发布的消息
message = await channel.get_message(ignore_subscribe_messages=True)
if message is not None:
print(message) # 查看订阅的信息信息
async with aiofiles.open("test.txt",'a',encoding='utf8') as f:
await f.write(message['data']+'\n')
# 异步监听器 异步redis 发布订阅实列
async def setup():
r = await
aioredis.from_url('redis://localhost/127.0.0.1:3306/',decode_responses=True)
# 获取发布订阅实列
PubSub = r.pubsub()
# 开始订阅
await PubSub.subscribe(client_id)
# 创建异步任务
asyncio.create_task(reader(PubSub))
#建立tornado 实列
app = tornado.web.Application(
[(r'/send/',SendMsg),
(r'/wb/',WB),
],debug=True
)
if __name__ == '__main__':
app.listen(8000)
# 获取当前的事件循环对象
loop = tornado.ioloop.IOLoop.current()
# 回调方法,监听添加的事件循环
loop.add_callback(setup)
#开启事件循环
loop.start()
|