asyncio是用来编写并发代码的库,使用async/await语法;其被用作高性能异步框架的基础(包括网络和网站服务,数据库连接库,分布式任务队列等等)。
asyncio
asyncio提供一组高层级API用于:
- 并发地运行Python协程并对其执行过程实现完全控制;
- 执行网络IO和IPC;
- 控制子进程;
- 通过队列实现分布式任务;
- 同步并发代码;
Eventloop
Eventloop实例提供了注册、取消和执行任务和回调的方法;是asyncio应用的核心,是中央总控。
把一些异步函数(任务,Task)注册到Eventloop上,Eventloop会循环执行这些函数(但同时只能执行一个),当执行到某个函数时,如果它正在等待I/O返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成I/O后会恢复,下次循环到它的时候继续执行。因此,这些异步函数可以协同(Cooperative)运行。
Coroutine
协程(Coroutine)本质上是一个函数,特点是在代码块中可以将执行权交给其他协程:
import asyncio
async def a():
print('Suspending a')
await asyncio.sleep(0.1)
print('Resuming a')
async def b():
print('In b')
async def main():
await asyncio.gather(a(), b())
if __name__ == '__main__':
asyncio.run(main())
其上有以下关键点:
- 协程用async def声明;
- asyncio.gather用来并发运行任务,在这里表示协同的执行a和b两个协程;
- await表示调用协程;
- asyncio.run是执行EventLoop;Python 3.7前需要:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
Future
Future是对协程的封装,异步操作结束后会把最终结果设置到这个Future对象上。可以对这个Future实例:
- 添加完成后的回调(add_done_callback)
- 取消任务(cancel)
- 设置最终结果(set_result)
- 设置异常(set_exception)
await myFuture 等待future执行完成。
示例
要保证多个任务并发,需要使用gather或create_task(创建任务)方式来执行;否则可能达不到想要的结果:
import asyncio
import time
async def a():
print('Suspending a')
await asyncio.sleep(3)
print('Resuming a')
async def b():
print('Suspending b')
await asyncio.sleep(2)
print('Resuming b')
async def runEach():
await a()
await b()
async def runGather():
await asyncio.gather(a(), b())
async def runTask():
t1 = asyncio.create_task(a())
t2 = asyncio.create_task(b())
await t1
await t2
async def runDirect():
await asyncio.create_task(a())
await asyncio.create_task(b())
async def runCombine():
tb = asyncio.create_task(b())
await a()
await tb
def show_perf(func):
print('*' * 20)
start = time.perf_counter()
asyncio.run(func())
print(f'{func.__name__} Cost: {time.perf_counter() - start}')
if __name__ == '__main__':
show_perf(runEach)
show_perf(runGather)
show_perf(runTask)
show_perf(runDirect)
show_perf(runCombine)
如上,不能直接await协程,也不能直接await create_task(要wait其结果,返回的task);否则无法并行执行。
websockets
WebSocket是基于TCP的应用层协议,实现了浏览器与服务器全双工(full-duplex)通信。其本质是保持TCP连接,在浏览器和服务端通过Socket进行通信。
websockets是基于asyncio的简单、高效的websocket库,使用时需要先安装库:
pip install websockets
pip install websockets -t C:\Users\gdxu\Anaconda3\envs\webStudy\Lib\site-packages
若要同步收发消息(发送然后等待应答):
import asyncio
from websockets import connect
async def hello(uri):
async with connect(uri) as websocket:
await websocket.send("Hello world!")
reply = await websocket.recv()
return reply
操作类
基于websockets中发送与接收接口:
import json
import logging
from websockets import connect
class Handler:
def __init__(self, loop=None):
self.ws = None
self.loop = loop
async def async_connect(self, url):
logging.info("attempting connection to {}".format(url))
self.ws = await connect(url)
logging.info("connected")
def sendJsonObj(self, cmd):
return self.loop.run_until_complete(self.async_sendJsonObj(cmd))
async def async_sendJsonObj(self, cmd):
return await self.ws.send(json.dumps(cmd))
def sendByte(self, cmd):
return self.loop.run_until_complete(self.async_sendByte(cmd))
async def async_sendByte(self, cmd):
return await self.ws.send(cmd)
def close(self):
self.loop.run_until_complete(self.async_close())
logging.info('closed')
async def async_close(self):
await self.ws.close(reason="user quit")
self.ws = None
def toRecv(self):
self.loop.run_until_complete(self.async_recv())
async def async_recv(self, callback=None):
logging.info('async_recv begin')
while True:
reply = await self.ws.recv()
if callback:
callback(reply)
logging.info('async_recv end')
使用
使用以上类异步收发数据:
import struct
import json
import asyncio
import logging
from wsHandler import Handler
from logging_config import init_logging
remoteUri = 'wss://ws.test.com:2443/'
login = {'role': 'admin', 'user_id': 'test1234'}
toFree = struct.pack('=ib', 0, 0)
setService = struct.pack('=i', 2) + json.dumps(["first", "test"]).encode('utf-8')
async def sendCmd(handle):
await handle.async_sendJsonObj(login)
await asyncio.sleep(0.5)
await handle.async_sendByte(setService)
await asyncio.sleep(0.5)
await handle.async_sendByte(toFree)
def handleReply(reply):
if len(reply) >= 4:
resp = struct.unpack('=i', reply[:4])
logging.info("reply: {}".format(resp))
async def recvCmd(handle):
await handle.async_recv(handleReply)
if __name__ == '__main__':
init_logging()
handler = Handler()
loop = asyncio.get_event_loop()
loop.run_until_complete(handler.async_connect(remoteUri))
tasks = asyncio.gather(sendCmd(handler), recvCmd(handler))
loop.run_until_complete(tasks)
loop.run_until_complete(handler.async_close())
logging.info('quit')
|