**简介:**aiohttp是一个为Python提供异步HTTP 客户端/服务端编程,基于asyncio(Python用于支持异步编程的标准库)的异步库。
github地址:
https://github.com/aio-libs/aiohttp
核心功能:
同时支持客户端使用和服务端使用。
同时支持服务端WebSockets组件和客户端WebSockets组件,开箱即用。
web服务器具有中间件,信号组件和可插拔路由的功能。
aiohttp库安装:
pip install aiohttp
客户端案例:
import aiohttp
import asyncio
import async_timeout
async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'http://www.baidu.com')
print(html)
asyncio.run(main())
服务端案例:
from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
app = web.Application()
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)
web.run_app(host="127.0.0.1", port=3033, app=app)
运行服务端: 访问:http://127.0.0.1:3033/ 执行100次异步请求:
import aiohttp
import random
import datetime
import asyncio
import time
async def request(client, body):
resp = await client.post('http://182.254.246.137:3031/api/test', json=body)
result = await resp.json()
print(result)
async def main():
async with aiohttp.ClientSession() as client:
start = time.time()
task_list = []
for _ in range(100):
now = datetime.datetime.now()
delta = random.randint(5, 15)
ts = (now - datetime.timedelta(days=delta)).strftime('%Y-%m-%d %H:%M:%S')
req = request(client, {'ts': ts})
task = asyncio.create_task(req)
task_list.append(task)
await asyncio.gather(*task_list)
end = time.time()
print(f'发送100次请求,耗时:{end - start}')
if __name__ == '__main__':
asyncio.run(main())
服务端源码:
from sanic import Sanic
from sanic.response import json
import datetime
app = Sanic(__name__)
@app.get('api/hello')
async def hello_world(request):
return json({
"code": "hi, tom. 2021-10-02 1106",
"success": True,
"msg": str(datetime.datetime.now())[:-7]
})
@app.post('api/test')
async def test(request):
print("body:{}".format(request.json))
return json({
"code": "hi, tom. 2021-11-02 1106",
"success": True,
"msg": str(datetime.datetime.now())[:-7]
})
if __name__ == '__main__':
app.run(host="111.111.111.111", port=3031, auto_reload=True)
异步执行100次客户端运行效果: 异步执行100次服务端接收结果:
微信公众号:玩转测试开发 欢迎关注,共同进步,谢谢!
|