IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> http库三剑客:aiohttp -> 正文阅读

[网络协议]http库三剑客:aiohttp

**简介:**aiohttp是一个为Python提供异步HTTP 客户端/服务端编程,基于asyncio(Python用于支持异步编程的标准库)的异步库。

github地址:

https://github.com/aio-libs/aiohttp

核心功能:

同时支持客户端使用和服务端使用。
同时支持服务端WebSockets组件和客户端WebSockets组件,开箱即用。
web服务器具有中间件,信号组件和可插拔路由的功能。

aiohttp库安装:

pip install aiohttp

客户端案例:

import aiohttp
import asyncio
import async_timeout


async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            return await response.text()


async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://www.baidu.com')
        print(html)


# python 3.4+
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())

# python 3.7+
asyncio.run(main())

服务端案例:

from aiohttp import web


async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)


app = web.Application()
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)

web.run_app(host="127.0.0.1", port=3033, app=app)

运行服务端:
在这里插入图片描述
访问:http://127.0.0.1:3033/
在这里插入图片描述
执行100次异步请求:

import aiohttp
import random
import datetime
import asyncio
import time


async def request(client, body):
    resp = await client.post('http://182.254.246.137:3031/api/test', json=body)
    result = await resp.json()
    print(result)


async def main():
    async with aiohttp.ClientSession() as client:
        start = time.time()
        task_list = []
        for _ in range(100):
            now = datetime.datetime.now()
            delta = random.randint(5, 15)
            ts = (now - datetime.timedelta(days=delta)).strftime('%Y-%m-%d %H:%M:%S')
            req = request(client, {'ts': ts})
            task = asyncio.create_task(req)
            task_list.append(task)
        await asyncio.gather(*task_list)
        end = time.time()
    print(f'发送100次请求,耗时:{end - start}')


if __name__ == '__main__':
    asyncio.run(main())

服务端源码:

from sanic import Sanic
from sanic.response import json
import datetime

app = Sanic(__name__)


@app.get('api/hello')
async def hello_world(request):
    return json({
        "code": "hi, tom. 2021-10-02 1106",
        "success": True,
        "msg": str(datetime.datetime.now())[:-7]
    })


@app.post('api/test')
async def test(request):
    print("body:{}".format(request.json))
    return json({
        "code": "hi, tom. 2021-11-02 1106",
        "success": True,
        "msg": str(datetime.datetime.now())[:-7]
    })

if __name__ == '__main__':
    app.run(host="111.111.111.111", port=3031, auto_reload=True)

异步执行100次客户端运行效果:
在这里插入图片描述
异步执行100次服务端接收结果:
图片

微信公众号:玩转测试开发
欢迎关注,共同进步,谢谢!

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-11-10 12:45:58  更:2021-11-10 12:48:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年7日历 -2024/7/1 21:11:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码