IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> 第二十一章 异步编程 -> 正文阅读

[系统运维]第二十一章 异步编程

异步编程的常规方法的问题是异步程序要么做完所有的事情,要么一件事也没有做完。重写所有的代码是为了保证程序不会阻塞,否则只是在浪费时间。?

-------Alvaro Videla & Jason J. W. Williams, RabbitMQ in Action

本章讨论三个密切相关的主要主题:

  • Python 的 async def、await、async with 和 async for 结构;
  • 支持这些结构的对象:原生协程以及上下文管理器、迭代器、生成器、推导式的异步变体;
  • asyncio 和其他异步库。

本章建立在可迭代对象和生成器(第 17 章,特别是“经典协程”)、上下文管理器(第 18 章)和并发编程的一般概念(第 19 章)的思想之上。

我们将研究类似于我们在第 20 章中看到的并发 HTTP 客户端,用原生协程和异步上下文管理器进行重写,使用的是和以前相同的 HTTPX 库,但现在的实现使用的是HTTPX异步 API。我们还将了解如何通过将慢速操作委托给一个特定的线程或进程执行器来避免阻塞事件循环。

在 HTTP 客户端示例之后,我们还将学习两个简单的异步服务器端应用程序,其中一个使用的是流行的 FastAPI 框架。然后我们将介绍 async/await 关键字支持的其他语法结构:异步生成器函数、异步推导式和异步生成器表达式。为了强调这些语言特性与 asyncio 并无关系,我们将看到一个使用 Curio框架重写的示例——Curio是由 David Beazley 开发的优雅创新的异步框架。

为了结束本章,我写了一个简短的部分,介绍异步编程的优点和缺点。

本章的内容覆盖面很大。我只演示了一些基本的用例,但这些用例将说明每个特性的最重要特征。

TIP

在 Yury Selivanov?重新组织之后,asyncio?documentation?要好得多,将少数对应用程序开发人员有用的函数与适用于Web 框架和数据库驱动程序等包的低级 API 分开。

对于 asyncio 的整体介绍,我推荐 Caleb Hattingh 的 Using Asyncio in Python(O'Reilly,2020 年)。做一个披露:他是本书的技术评论家之一。

本章的新内容

当我编写 Fluent Python, First Edition 时,asyncio 库是临时的,并且还没有 async/await 关键字。因此,我不得不更新本章中的所有示例。我还创建了新示例:域探测脚本、一个FastAPI Web 服务以及 Python 新异步控制台模式的实验。

新部分涵盖了当时不存在的语言功能,例如原生协程、async with、async for 以及支持这些结构的对象。它们可能会为您省去很多麻烦——无论您使用的是 Python 还是 Node.js。

最后,我删除了关于 asyncio.Futures 的段落,现在这部分被认为是低级 asyncio API 的一部分。

几个重要的定义

在“经典协程”的开头,我们看到 Python 3.5 及更高版本提供了三种协程:

原生协程:

用 async def 定义的协程函数。您可以使用 await 关键字将一个原生协程委托给另一个原生协程,类似于经典协程使用 yield from 的方式。async def 语句始终定义了一个原生协程,即使其函数体中未使用 await 关键字。await 关键字不能在原生协程之外使用 。

经典协程:

一个生成器函数,它消费通过 my_coro.send(data) 调用发送给它的数据,并通过在表达式中使用 yield 读取该数据。一个经典协程可以使用 yield from 委托给另一个经典协程。经典协程不能由 await 驱动,并且 asyncio 也不再支持经典协程。

基于生成器的协程:

用 @types.coroutine 修饰的生成器函数——在 Python 3.5 中引入。该装饰器使生成器与新的 await 关键字兼容。

在本章中,我们关注原生协程以及异步生成器:

异步生成器:

使用 async def 定义并在其主体中使用 yield 的生成器函数。它返回一个异步生成器对象,该对象提供 __anext__方法,这是一种检索下一项的协程方法。


@ASYNCIO.COROUTINE?没有使用future

根据 issue43216,用于经典协程和基于生成器的协程的 @asyncio.coroutine 装饰器在 Python 3.8 中已弃用,并计划在 Python 3.11 中删除。相比之下,?根据 ?issue36921?,@types.coroutine 应该保留。 asyncio 不再支持@types.coroutine,但在 Curio 和 Trio 异步框架的低级代码中使用了@types.coroutine。


一个Asyncio 示例:探测域名

想象一下,您将要注册一个关于 Python 的新博客,并且您计划使用 Python 关键字和 .DEV 后缀注册一个域名,例如:AWAIT.DEV。示例 21-1 是一个使用 asyncio 同时检查多个域名的脚本。这是它的输出结果:

$ python3 blogdom.py
  with.dev
+ elif.dev
+ def.dev
  from.dev
  else.dev
  or.dev
  if.dev
  del.dev
+ as.dev
  none.dev
  pass.dev
  true.dev
+ in.dev
+ for.dev
+ is.dev
+ and.dev
+ try.dev
+ not.dev

请注意,域名看起来是无序的。如果您运行该脚本,您会看到它们一个接一个地显示出来,并且延迟也不一样。+ 号表示您的机器能够通过 DNS 解析域。否则,域名无法解析并且是可能可用的。

在 blogdom.py 中,DNS 探测是通过原生协程对象完成的。由于异步操作是交错的,因此检查 18 个域所需的时间比顺序检查要少得多。实际上,总时间实际上与单个最慢 DNS 响应的时间相同,而不是所有响应时间的总和。

这是 blogdom.py 的代码:

例 21-1。 blogdom.py:搜索 Python 博客的域

#!/usr/bin/env python3
import asyncio
import socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4  1


async def probe(domain: str) -> tuple[str, bool]:  2
    loop = asyncio.get_running_loop()  3
    try:
        await loop.getaddrinfo(domain, None)  4
    except socket.gaierror:
        return (domain, False)
    return (domain, True)


async def main() -> None:  5
    names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)  6
    domains = (f'{name}.dev'.lower() for name in names)  7
    coros = [probe(domain) for domain in domains]  8
    for coro in asyncio.as_completed(coros):  9
        domain, found = await coro  10
        mark = '+' if found else ' '
        print(f'{mark} {domain}')


if __name__ == '__main__':
    asyncio.run(main())  11
  1. 为域设置关键字的最大长度,因为越短越好。
  2. probe返回一个包含域名和布尔值的元组; True 表示域已解析。返回域名将更容易显示结果。
  3. 获取对 asyncio 事件循环的引用,以便我们接下来可以使用它。
  4. loop.getaddrinfo(…) 协程方法返回一个五个部分组成的参数元组(five-part tuple of parameters)?,以供套接字使用连接到给定的地址。在这个例子中,我们不需要结果。如果我们得到结果,说明域名可以解析;反之亦然。
  5. main 必须是一个协程,以便我们可以在其中使用 await。
  6. 生成长度最大为 MAX_KEYWORD_LEN 的 Python 关键字的生成器。
  7. 生成带有 .dev 后缀的域名的生成器。
  8. 通过使用每个domain参数调用probe协程来构建协程对象列表。
  9. asyncio.as_completed 是一个生成协程的生成器,这些协程按照它们完成的顺序返回结果——而不是按照协程提交的顺序。这个方法类似于我们在第 20 章示例 20-4 中看到的 futures.as_completed。
  10. 此时,我们知道协程已经完成,因为 as_completed 就是这样工作的。因此,await 表达式不会阻塞,但我们需要它来从 coro 获取结果。如果 coro 抛出了一个未处理的异常,它会在这里重新抛出异常。
  11. asyncio.run 启动事件循环并只在事件循环退出时返回。这是使用 asyncio 的脚本的常见模式:将 main 实现为协程,并在 if __name__ == '__main__': 块中使用 asyncio.run 驱动这个协程。

TIP

asyncio.get_running_loop 函数是在 Python 3.7 中添加的,用于在协程中使用,如probe示例所示。如果没有运行循环,asyncio.get_running_loop 会抛出?RuntimeError。它的实现比 asyncio.get_event_loop 更简单、更快,如果需要,它还可以启动一个事件循环。从 Python 3.10 开始, asyncio.get_event_loop 已被弃用,最终将成为 asyncio.get_running_loop 的别名。

Guido 阅读异步代码的技巧

在 asyncio 中有很多新概念需要掌握,但如果您采用 Guido van Rossum 本人建议的技巧,则示例 21-1 的整体逻辑很容易理解:眯着眼睛假装 async 和 await 关键字不存在。如果你这样做,你会意识到协程读起来就像原来的顺序函数。

例如,想象一下这个协程的主体......

async def probe(domain: str) -> tuple[str, bool]:
    loop = asyncio.get_running_loop()
    try:
        await loop.getaddrinfo(domain, None)
    except socket.gaierror:
        return (domain, False)
    return (domain, True)

...的工作方式类似于以下函数,只是它从不阻塞:

def probe(domain: str) -> tuple[str, bool]:  # no async
    loop = asyncio.get_running_loop()
    try:
        loop.getaddrinfo(domain, None)  # no await
    except socket.gaierror:
        return (domain, False)
    return (domain, True)

由于 await 会挂起当前协程对象,使用语法 await loop.getaddrinfo(...) 可以避免阻塞。比如在probe('if.dev')协程的执行过程中,会通过getaddrinfo('if.dev', None)创建一个新的协程对象。await这个协程会执行低级的 addrinfo 查询并将控制权交还给事件循环,而不是交还给被挂起的 probe(‘if.dev’) 协程。然后事件循环可以驱动其他挂起的协程对象,例如probe('or.dev')。

当事件循环获得对 getaddrinfo('if.dev', None) 查询的响应结果时,该特定协程对象将恢复并将控制权返回给在await语句处暂停的probe('if.dev'),暂停的协程恢复并继续执行以处理可能抛出的异常并返回结果元组。

到目前为止,我们只看到将?asyncio.as_completed 和 await 应用于协程。实际上它们可以处理任何可等待(awaitable)的对象。接下来解释这个概念。

新概念:awaitable

for 关键字可以用于可迭代对象。 await 关键字可以用于可等待对象。

作为 asyncio 的终端用户,下面是经常见到的可等待对象:

  • 一个原生协程对象,你可以通过调用一个原生协程函数来获得它。
  • 一个 asyncio.Task,通常通过将协程对象传递给 asyncio.create_task() 来获得。

但是,终端用户代码并不总是需要await一个Task。我们使用 asyncio.create_task(one_coro()) 来调度 one_coro 并发执行,而不需要等待协程的返回。这就是我们对 spinner_async.py 中的 spinner 协程所做的事情(示例 19-4)。如果您不希望取消或等待任务,则无需保留从 create_task 返回的 Task 对象。创建这个Task就可以保证协程被安排运行。

相比之下,我们现在使用 await other_coro() 来运行 other_coro 并等待完成,因为我们需要协程的结果才能继续。在 spinner_async.py 中,supervisor协程执行 res = await slow() 以执行并获得slow结果。

在实现异步库或作为asyncio 库维护者时,您还可以处理这些较低级别的可等待对象:

  • 一个实现?__await__ 方法的对象,该方法返回一个迭代器;例如,asyncio.Future 实例(asyncio.Task 是 asyncio.Future 的子类)。
  • 实现 tp_as_async.am_await 函数的 Python/C API 以其他语言编写的对象,这个函数返回一个迭代器(类似于 __await__ 方法)。

现有的代码库可能还有一种额外的可等待对象:基于生成器的协程对象——即将被弃用。

Note:

PEP 492 声明 await 表达式“使带有验证参数的额外步骤的yield from实现“和”和“await 只接受可等待对象”。PEP 没有详细解释该实现,而是参考了 PEP 380,其中介绍了 yield from。我在 fluentpython.com 的 Classic Coroutines?中的“?The Meaning of yield from”一节中发布了详细说明。

现在让我们研究下载一组固定国旗图像的脚本的 asyncio 版本。

使用 asyncio 和 HTTPX 下载国旗

flags_asyncio.py 脚本从 fluentpython.com 下载一组固定的 20 个国旗。我们首先在“并发 Web 下载”中提到它,但现在我们将详细研究它,应用我们刚刚看到的概念。

从 Python 3.10 开始,asyncio 仅直接支持 TCP 和 UDP,标准库中没有异步 HTTP 客户端或服务器包。我在所有 HTTP 客户端示例中都使用了 HTTPX。

我们将自下而上探索 flags_asyncio.py,即首先查看示例 21-2 中设置操作的函数。

WARNING:

为了使代码更易于阅读,flags_asyncio.py 没有进行异常处理。当我们介绍 async/await 时,初学者使用“快乐路径”进行学习可以快速了解常规函数和协程在程序中的排列方式。从?“Enhancing the asyncio downloader”之后,示例就包括异常处理和更多功能。

本章和第 20 章中的 flags*.py 示例共享代码和数据,因此我将它们放在 example-code-2e/20-executors/getflags 目录中。

例 21-2。 flags_asyncio.py:启动函数

def download_many(cc_list: list[str]) -> int:    1
    return asyncio.run(supervisor(cc_list))      2

async def supervisor(cc_list: list[str]) -> int:
    async with AsyncClient() as client:          3
        to_do = [download_one(client, cc)
                 for cc in sorted(cc_list)]      4
        res = await asyncio.gather(*to_do)       5

    return len(res)                              6

if __name__ == '__main__':
    main(download_many)
  1. 这需要是一个普通函数——而不是协程——以便传递给 flags.py 模块中的main函数并由main函数调用(示例 20-2)。
  2. 执行驱动 supervisor(cc_list) 协程对象的事件循环,直到它返回。这将在事件循环运行时阻塞线程。该行的结果是返回supervisor返回的任何内容。
  3. httpx 中的异步 HTTP 客户端操作由AsyncClient 的方法支持,它也是一个异步上下文管理器:具有异步setup和teardown方法的上下文管理器(在“异步上下文管理器”中详细介绍了这一点)
  4. 通过为每个要检索的国旗调用?download_one 协程来构建协程对象列表。
  5. await asyncio.gather 协程,它接受一个或多个可等待对象作为参数并等待所有参数完成,按照提交顺序返回对应可等待对象参数的结果列表。
  6. supervisor 返回 asyncio.gather 返回的列表的长度。

现在让我们回顾一下 flags_asyncio.py 的顶部。我重新组织了协程,以便我们可以按照事件循环启动的顺序进行阅读。

例 21-3。 flags_asyncio.py:导入部分和下载函数

import asyncio

from httpx import AsyncClient  1

from flags import BASE_URL, save_flag, main  2

async def download_one(client: AsyncClient, cc: str):  3
    image = await get_flag(client, cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

async def get_flag(client: AsyncClient, cc: str) -> bytes:  4
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=6.1,
                                  follow_redirects=True)  5
    return resp.read()  6
  1. 必须先安装 httpx——它不在标准库中。
  2. 重用 flags.py 中的代码(示例 20-2)
  3. download_one 必须是原生协程,因此它可以await?get_flag——这个函数执行 HTTP 请求。然后打印下载国旗的国家代码,并保存图像。
  4. get_flag 需要接收一个 AsyncClient 用以发送请求。
  5. httpx.AsyncClient 实例的 get 方法返回一个 ClientResponse 对象,它也是一个异步上下文管理器。
  6. 网络 I/O 操作是作为协程方法实现的,因此它们由 asyncio 事件循环异步驱动。

Note:

为了获得更好的性能,get_flag 中的 save_flag 调用应该是异步的,以避免阻塞事件循环。但是,asyncio 目前没有像 Node.js 那样提供异步文件系统 API。

“Using asyncio.as_completed and a thread”将展示如何将 save_flag 委托给一个线程。您的代码通过 await 或异步上下文管理器的特殊方法(例如 AsyncClient 和 ClientResponse)显式委托给 httpx 协程--正如我们将在“Asynchronous Context Managers”中看到的那样。

原生协程的秘密:Humble 生成器

我们在“经典协程”和 flags_asyncio.py 中看到的经典协程示例之间的主要区别在于后者没有使用 .send() 调用或 yield 表达式。您的代码位于 asyncio 库和您正在使用的异步库(例如 HTTPX)之间。这在图 21-1 中进行了说明。

在幕后,asyncio 事件循环进行 .send 调用以驱动您的协程,并且您的协程await其他协程,包括库协程。?如前所述,await 从 yield from借鉴了大部分实现,yield from也使用 .send 调用来驱动协程。

await 链最终到达一个低级的 可等待对象,这个可等待对象返回一个生成器,事件循环可以驱动这个生成器来响应诸如计时器或网络 I/O 之类的事件。这些 await 链末尾的低级 awaitable 和生成器在库的深处实现,他们不是库的 API 的一部分,但是可能是 Python/C 实现的扩展。

使用 asyncio.gather 和 asyncio.create_task 等函数,您可以启动多个并发等待通道,从而在单个线程中并发执行由单个事件循环驱动的多个 I/O 操作。

all-or-nothing 问题

请注意,在示例 21-3 中,我无法重用 flags.py(示例 20-2)中的 get_flag 函数。我不得不将它重写为协程才能使用 HTTPX 的异步 API。为了使用 asyncio 以获得最佳性能,我们必须将每个操作?I/O 的函数替换为使用 await 或 asyncio.create_task 激活的异步版本,以便在函数等待 I/O 时将控制权交还给事件循环。如果您不能将阻塞型函数重写为协程,则应该在单独的线程或进程中运行它,正如我们将在“Delegating tasks to executors”中看到的那样。

这就是我为本章选择题词的原因,其中包含以下建议:“你需要重写所有代码来避免代码阻塞,否则你只是在浪费时间。”

出于同样的原因,我也无法重用 flags_threadpool.py(示例 20-3)中的 download_one 函数。示例 21-3 中的代码使用 await 驱动 get_flag,因此 download_one 也必须是一个协程。对于每个请求,在 supervisor 中会创建一个 对应的download_one 协程对象,它们都由 asyncio.gather 协程驱动。

现在让我们研究出现在 supervisor(示例 21-2)和 get_flag(示例 21-3)中的 async with 语句。

异步上下文管理器

在“上下文管理器和 with 块”中,我们看到如果对象的类提供 __enter__ 和 __exit__ 方法,对象在 with 块的主体之前和之后运行代码。

现在,分析示例 21-4,来自 asyncpg 中可异步的?PostgreSQL 驱动documentation on transactions

示例 21-4。来自 asyncpg PostgreSQL 驱动程序文档的示例代码。

tr = connection.transaction()
await tr.start()
try:
    await connection.execute("INSERT INTO mytable VALUES (1, 2, 3)")
except:
    await tr.rollback()
    raise
else:
    await tr.commit()

数据库事务非常适配于上下文管理器协议:必须先启动事务,使用 connection.execute 更改数据,然后必须发生回滚或提交,具体取决于更改的结果。

在像 asyncpg 这样的异步驱动程序中,set-up和包装函数需要是协程——以便其他操作可以并发进行。但是,经典 with 语句的实现不支持使用协程执行 __enter__ 或 __exit__ 。

这就是为什么 PEP 492—Coroutines with async and await syntax和 await 语法引入了 async with 语句,它可以与异步上下文管理器一起使用:以协程实现?__aenter__ 和 __aexit__ 方法的对象。

使用 async with,示例 21-4 可以像 asyncpg 文档中其他的代码片段一样编写:

async with connection.transaction():
    await connection.execute("INSERT INTO mytable VALUES (1, 2, 3)")

在 ?asyncpg.Transaction 类中, __aenter__ 协程方法执行await self.start() 而 __aexit__ 协程awaite私有的?__rollback 或 __commit 协程方法,调用哪个协程取决于是否发生异常。使用协程将 Transaction 实现为异步上下文管理器允许 asyncpg 并发处理多事务。


ASYNCPG 上的 CALEB HatTINGH

关于 asyncpg 的另一个非常棒的事情是,它还通过为 Postgres 本身的内部连接实现一个连接池来解决 PostgreSQL 缺乏高并发支持(每个连接使用一个服务器端进程)的问题。

这意味着您不需要像 asyncpg 文档中的推荐的 pgbouncer 等其他工具。


回到 flags_asyncio.py,httpx 的 AsyncClient 类是一个异步上下文管理器,因此它可以在其 __aenter__ 和 __aexit__ 特殊协程方法中使用可等待对象。

Note:

“作为上下文管理器的异步生成器”展示了如何使用 Python 的 contextlib 来创建异步上下文管理器,而无需编写类。由于一个先决主题:“异步生成器函数”,本章稍后会进行介绍。

我们现在将使用进度条增强 asyncio 国旗下载示例,这将引导我们探索更多的 asyncio API。

增强 asyncio 下载器

回想一下“Downloads with Progress Display and Error Handling”?,flags2 示例集共享相同的命令行界面,并且在下载时它们会显示一个进度条。

Note
我鼓励您使用 flags2 示例来直观地了解并发 HTTP 客户端的执行方式。使用 -h 选项查看示例 20-10 中的帮助说明。使用 -a、-e 和 -l 命令行选项控制下载次数,使用 -m 选项设置并发下载次数。在 LOCAL、REMOTE、DELAY 和 ERROR 服务器上分别运行测试。发现并发下载的最佳数量,以最大限度地提高每台服务器的吞吐量。按照“Setting up test servers”中的说明调整测试服务器的选项。

例如,示例 21-5 显示了使用 100 个并发请求 (-m 100) 从 ERROR 服务器获取 100 个国旗?(-al 100) 的尝试。结果中的 48 个错误是 HTTP 418 或超时错误——slow_server.py 的预期(错误)行为。

示例 21-5。运行 flags2_asyncio.py

$ python3 flags2_asyncio.py -s ERROR -al 100 -m 100
ERROR site: http://localhost:8002/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
100%|████████████████████████████████████████████████████████| 100/100 [00:03<00:00, 30.48it/s]
--------------------
 52 flags downloaded.
 48 errors.
Elapsed time: 3.31s

在测试并发客户端时要采取正确的行为

即使线程和异步 HTTP 客户端之间的总体下载时间没有太大差异,异步可以更快地发送请求,因此服务器更有可能被怀疑 DOS 攻击。要真正全速运行这些并发客户端,请使用本地 HTTP 服务器进行测试,如?“Setting up test servers”中所述。


现在让我们看看 flags2_asyncio.py 是如何实现的。

使用 asyncio.as_completed 和一个单独线程

在示例 21-3 中,我们将几个协程传递给 asyncio.gather,它会按照提交的顺序返回一个包含协程结果的列表。这意味着 asyncio.gather 只能在所有可等待对象完成后返回。但是,要更新进度条,我们需要在完成时获取结果。

幸运的是,我们在带有进度条的线程池示例中使用的 as_completed 生成器函数在asyncio 中有一个 等效函数(示例 20-16)。

示例 21-6 显示了 flags2_asyncio.py 脚本的上半部分,其中定义了 get_flag 和 download_one 协程。示例 21-7 列出了其余的源代码,包括 supervisor 和 download_many。由于加入了异常处理,此脚本比 flags_asyncio.py 长。

示例 21-6。 flags2_asyncio.py:脚本的顶部;其余代码在示例 21-7 中

import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path

import httpx
import tqdm  # type: ignore

from flags2_common import main, DownloadStatus, save_flag

# low concurrency default to avoid errors from remote site,
# such as 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

async def get_flag(client: httpx.AsyncClient,  1
                   base_url: str,
                   cc: str) -> bytes:
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)   2
    resp.raise_for_status()
    return resp.content

async def download_one(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool) -> DownloadStatus:
    try:
        async with semaphore:  3
            image = await get_flag(client, base_url, cc)
    except httpx.HTTPStatusError as exc:  4
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        await asyncio.to_thread(save_flag, image, f'{cc}.gif')  5
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status
  1. get_flag 与示例 20-14 中的顺序版本非常相似。第一个区别:它需要client参数。
  2. 第二和第三个区别:.get 是一个 AsyncClient 方法,它是一个协程,所以我们需要await这个协程。
  3. 将semaphore用作异步上下文管理器,这样整个程序就不会被阻塞:当semaphore计数器为零时,只有这个协程被挂起。在“Python’s Semaphores”中了解更多信息。
  4. 异常处理逻辑与示例 20-14 中的 download_one 相同。
  5. 保存图像是一个 I/O 操作。为避免阻塞事件循环,需要在线程中运行 save_flag。

所有网络 I/O 都是通过 asyncio 中的协程完成的,但是文件 I/O没有使用协程。然而,文件 I/O 也是“阻塞的”——从某种意义上说,读/写文件比读/写 RAM 花费的时间要长数千倍。如果您使用的是?Network-Attached Storage,它甚至可能涉及网络 I/O。

从 Python 3.9 开始,asyncio.to_thread 协程可以将文件 I/O 委托给 asyncio 提供的线程池。如果你需要支持 Python 3.7 或 3.8,“Delegating tasks to executors”展示了如何添加几行代码来实现这个特性。但首先让我们完成对 HTTP 客户端代码的研究。

使用semaphore(信号量)限制请求

像我们正在研究的网络客户端应该受到限制(例如,限制请求数)以避免过多的并发请求冲击服务器。

?semaphore是一个同步原始量,它比锁更灵活。一个semaphore可以由多个协程持有,最大数量是可配置的。这样就可以限制活动并发协同程序的数量。 “Python’s Semaphores”里面有更多说明。

在 flags2_threadpool.py(示例 20-16)中,通过在 download_many 函数中将所需的 max_workers 参数设置为 concur_req 来实例化 ThreadPoolExecutor 来完成限流。在 flags2_asyncio.py 中有一个由supervisor函数创建的 asyncio.Semaphore(如示例 21-7 所示)并作为示例 21-6 中的 download_one 的 semaphore 参数传递。


Python 的信号量

计算机科学家 Edsger W. Dijkstra 在 1960 年代初期发明了信号量。这是一个简单的想法,但它非常灵活,以至于大多数其他同步对象(例如锁和barrier)都可以构建在信号量之上。Python 的标准库中有三个 Semaphore 类:一个在threading中,另一个在multiprocessing中,第三个在 asyncio 中。在这里,我们将介绍最后一个。

asyncio.Semaphore 有一个内部计数器,每当我们等待 .acquire() 协程方法时,该计数器就会递减,当我们调用 .release() 方法时递增——.release()方法不是协程,因为它从不阻塞。实例化 Semaphore 时设置计数器的初始值:

semaphore = asyncio.Semaphore(concur_req)

当计数器大于零时,等待 .acquire() 不会有延迟,?但如果计数器为零,.acquire() 会挂起等待的协程,直到其他协程在同一Semaphore上调用 .release(),从而增加计数器。与其直接使用这些方法,不如使用Semaphore作为异步上下文管理器更安全,就像我在示例 21-6 中所做的那样,函数 download_one:

        async with semaphore:
            image = await get_flag(client, base_url, cc)

Semaphore.__aenter__ 协程方法await .acquire(),它的 __aexit__ 协程方法调用 .release()。?该片段保证在任何时候最多只有 concur_req 个 get_flags 协程实例处于活动状态。

标准库中的每个 Semaphore 类都有一个 BoundedSemaphore 子类,它强制执行一个额外的约束:当 .release() 多于 .acquire() 操作时,内部计数器永远不会大于初始值。


现在让我们看一下示例 21-7 中脚本的其余部分。

示例 21-7。 flags2_asyncio.py:示例 21-6 后面的脚本

async def supervisor(cc_list: list[str],
                     base_url: str,
                     verbose: bool,
                     concur_req: int) -> Counter[DownloadStatus]:  1
    counter: Counter[DownloadStatus] = Counter()
    semaphore = asyncio.Semaphore(concur_req)  2
    async with httpx.AsyncClient() as client:
        to_do = [download_one(client, cc, base_url, semaphore, verbose)
                 for cc in sorted(cc_list)]  3
        to_do_iter = asyncio.as_completed(to_do)  4
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  5
        error: httpx.HTTPError | None = None  6
        for coro in to_do_iter:  7
            try:
                status = await coro  8
            except httpx.HTTPStatusError as exc:
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
                error = exc  9
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
                error = exc  10
            except KeyboardInterrupt:
                break

            if error:
                status = DownloadStatus.ERROR  11
                if verbose:
                    url = str(error.request.url)  12
                    cc = Path(url).stem.upper()   13
                    print(f'{cc} error: {error_msg}')
            counter[status] += 1

    return counter

def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    coro = supervisor(cc_list, base_url, verbose, concur_req)
    counts = asyncio.run(coro)  14

    return counts

if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2022-01-12 00:27:51  更:2022-01-12 00:29:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/10 11:39:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码