今天这篇文章,聊一下python 在web 开发上的一些基础实现,阐述下自己理解中的WSGI 、ASGI ,以及拿uvicorn +FastAPI 的组合举个ASGI 应用的例子。
WSGI
python 的web 服务的诞生,其实追溯到一种机制,叫做WSGI ,全称Web Server Gateway Interface 。WSGI 的提案来源于PEP-333,可以理解为一种python-web-server 和python-web-app 的接口通信标准。在这种场景下,python 的web 服务呈现以下的工作模式:
python-web-app ,也就是web 应用层,实现WSGI 接口,用作web 请求的handler - 用户向
python-web-server 发送web 请求 python-web-server ,又称作WSGI Server ,解析请求数据,整理当前session 的环境信息python-web-server 加载python-web-app ,调用python-web-app 实例的WSGI 接口,处理请求python-web-app 处理完请求,返回结果给到python-web-server python-web-server 写回返回结果,给回用户
代码上是这样的表现,以官方提案的例子为例:
import os, sys
def simple_app(environ, start_response):
"""
python-web-app implementation
:param environ: 由python-web-server提供,表示当前请求的环境信息
:param start_response: 由python-web-server提供的回调,用以初始化返回结果的状态
:return: 返回结果的数据内容
"""
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return ['Hello world!\n']
def run_with_cgi(application):
"""
WSGI layer implementation
:param application: 实现WSGI的app
"""
environ = dict(os.environ.items())
headers_set = []
headers_sent = []
def write(data):
"""写回数据的逻辑"""
if not headers_set:
raise AssertionError("write() before start_response()")
elif not headers_sent:
status, response_headers = headers_sent[:] = headers_set
sys.stdout.write('Status: %s\r\n' % status)
for header in response_headers:
sys.stdout.write('%s: %s\r\n' % header)
sys.stdout.write('\r\n')
sys.stdout.write(data)
sys.stdout.flush()
def start_response(status, response_headers, exc_info=None):
"""初始化response的逻辑"""
if exc_info:
try:
if headers_sent:
raise exc_info[0], exc_info[1], exc_info[2]
finally:
exc_info = None
elif headers_set:
raise AssertionError("Headers already set!")
headers_set[:] = [status, response_headers]
return write
result = application(environ, start_response)
try:
for data in result:
if data:
write(data)
if not headers_sent:
write('')
finally:
if hasattr(result, 'close'):
result.close()
通过WSGI ,就可以实现python-web-app 和python-web-server 的分离,这样无论什么python-web-app ,只要实现了WSGI 接口标准,就能够无缝移植到其它支持WSGI 的python-web-server 上。
ASGI
自python3 推出异步IO实现asyncio 之后,ASGI 也应运而生。ASGI 的目标和WSGI 相同,但也有一些改进点,一方面是支持asyncio 的机制,另一方面也能够解决WSGI 难以支持WebSocket 之类长连接模式的问题。要深入了解ASGI ,可以参考这篇文档。
在ASGI 标准下,python-web-app 需要这样的接口实现:
async def application(scope, receive, send):
"""
python-web-app应用层实现
:param scope: 由python-web-server提供,表示当前连接的环境信息
:param receive: 通过这个协程,可以收到由python-web-server发来的事件
:param send: 通过这个协程,可以写回事件给python-web-server,比如让python-web-server处理response
"""
event = await receive()
...
await send({"type": "websocket.send", "text": "Hello world!"})
不论是receive 到的还是send 出去的event ,都会包含一个type 字段表示这个event 的类型,一般type 会有:
http.xxx :http 连接、请求、返回相关websocket.xxx :websocket 连接、请求、返回相关xxx.send/receive :收发消息相关lifespan.xxx :web 服务生命周期相关
ASGI案例之uvicorn+FastAPI
为了更加直观感受ASGI 的应用,本文也顺带以uvicorn 加FastAPI 的组合,通过源码实现来看ASGI 是如何串联起python-web-server 和python-web-app 的。
在笔者封装的简易http-web-app 框架start-fastapi中,就支持了通过uvicorn 启动FastAPI 应用。其中,main.py 的uvicorn 实例会加载app 模块下的APP 这一FastAPI 实例,启动web-app 应用。
def main() -> None:
uvicorn.run('app:APP', **cfg)
APP = FastAPI(**FASTAPI_CFG)
首先从uvicorn.run 开始看起,其代码实现如下:
def run(app: typing.Union[ASGIApplication, str], **kwargs: typing.Any) -> None:
config = Config(app, **kwargs)
server = Server(config=config)
if (config.reload or config.workers > 1) and not isinstance(app, str):
sys.exit(1)
if config.should_reload:
sock = config.bind_socket()
ChangeReload(config, target=server.run, sockets=[sock]).run()
elif config.workers > 1:
sock = config.bind_socket()
Multiprocess(config, target=server.run, sockets=[sock]).run()
else:
server.run()
默认会走Server 实例的run 方法,我们来看其中的实现:
class Server:
def run(self, sockets=None):
self.config.setup_event_loop()
loop = asyncio.get_event_loop()
loop.run_until_complete(self.serve(sockets=sockets))
async def serve(self, sockets=None):
config = self.config
if not config.loaded:
config.load()
self.lifespan = config.lifespan_class(config)
self.install_signal_handlers()
await self.startup(sockets=sockets)
if self.should_exit:
return
await self.main_loop()
await self.shutdown(sockets=sockets)
这里有两个重要步骤:
config.load :加载配置startup :启动服务器
首先看配置加载,里面会将app 实例进行初始化:
class Config:
def load(self):
assert not self.loaded
try:
self.loaded_app = import_from_string(self.app)
except ImportFromStringError as exc:
logger.error("Error loading ASGI app. %s" % exc)
sys.exit(1)
if self.interface == "auto":
if inspect.isclass(self.loaded_app):
use_asgi_3 = hasattr(self.loaded_app, "__await__")
elif inspect.isfunction(self.loaded_app):
use_asgi_3 = asyncio.iscoroutinefunction(self.loaded_app)
else:
call = getattr(self.loaded_app, "__call__", None)
use_asgi_3 = asyncio.iscoroutinefunction(call)
self.interface = "asgi3" if use_asgi_3 else "asgi2"
self.loaded = True
class FastAPI(Starlette):
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if self.root_path:
scope["root_path"] = self.root_path
if AsyncExitStack:
async with AsyncExitStack() as stack:
scope["fastapi_astack"] = stack
await super().__call__(scope, receive, send)
else:
await super().__call__(scope, receive, send)
可以看到FastAPI 的app 实现里,定义了ASGI ,并且也在uvicorn 的config.load 里被识别到了。FastAPI 继承了Starlette ,而Starlette 本身即是支持ASGI 的web 框架,为python-web-app 提供了路由、中间件相关的应用级底层支持。FastAPI 实际是对Starlette 的包装,相关handler 、middleware 的注册也是给到Starlette 框架里面的。针对web-server 发来的请求,FastAPI 在设置一些环境信息后,最终也是交由Starlette 底层处理。
之后回到uvicorn ,看一下startup 的实现:
class Server:
async def startup(self, sockets: list = None) -> None:
await self.lifespan.startup()
if self.lifespan.should_exit:
self.should_exit = True
return
config = self.config
async def handler(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
await handle_http(
reader, writer, server_state=self.server_state, config=config
)
try:
server = await asyncio.start_server(
handler,
host=config.host,
port=config.port,
ssl=config.ssl,
backlog=config.backlog,
)
except OSError as exc:
logger.error(exc)
await self.lifespan.shutdown()
sys.exit(1)
startup 分两步:
- 初始化
lifespan - 定义
http-handler ,通过asyncio.start_server 启动http-server
在初始化lifespan 过程中,uvicorn 会发送lifespan.startup 事件,这个事件就会被FastAPI-app 的ASGI 捕获到,最终层层往下,会走到Starlette 的Router 实例:
class Router:
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
assert scope["type"] in ("http", "websocket", "lifespan")
if "router" not in scope:
scope["router"] = self
if scope["type"] == "lifespan":
await self.lifespan(scope, receive, send)
return
async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
first = True
app = scope.get("app")
await receive()
try:
if inspect.isasyncgenfunction(self.lifespan_context):
async for item in self.lifespan_context(app):
first = False
await send({"type": "lifespan.startup.complete"})
await receive()
except Exception as e:
pass
当Startlette 的Router 检测到lifespan 事件时,就会走到lifespan 逻辑,其中会看lifespan 的当前阶段是否有对应的hook 函数,有的话就执行。当前阶段是lifespan.startup ,因此如果我们在FastAPI 中定义了这个协程,就可以在startup 阶段执行到:
@APP.on_event('startup')
async def start_app():
pass
lifespan.startup 之后,就定义http-handler 并绑到listen-server 上。http-handler 会解析请求数据,然后调用app 的ASGI 接口处理请求,大致是这样的链路:
class H11Protocol(asyncio.Protocol):
def handle_events(self):
while True:
if event_type is h11.Request:
task = self.loop.create_task(self.cycle.run_asgi(app))
class RequestResponseCycle:
async def run_asgi(self, app):
try:
result = await app(self.scope, self.receive, self.send)
except Exception as e:
pass
好比我们GET 健康检查接口/api/v1/core/health ,那么最终被FastAPI-app 捕获到的请求数据里,scope 长这样:
scope = {
"type": "http",
"method": "GET",
"root_path": ""
"path": "/api/v1/core/health",
"query_string": b""
}
根据这些信息,层层往下,就会又走到Starlette 的路由逻辑:
class Router:
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
for route in self.routes:
match, child_scope = route.matches(scope)
if match == Match.FULL:
scope.update(child_scope)
await route.handle(scope, receive, send)
return
elif match == Match.PARTIAL and partial is None:
partial = route
partial_scope = child_scope
if partial is not None:
scope.update(partial_scope)
await partial.handle(scope, receive, send)
return
if scope["type"] == "http" and self.redirect_slashes and scope["path"] != "/":
redirect_scope = dict(scope)
if scope["path"].endswith("/"):
redirect_scope["path"] = redirect_scope["path"].rstrip("/")
else:
redirect_scope["path"] = redirect_scope["path"] + "/"
for route in self.routes:
match, child_scope = route.matches(redirect_scope)
if match != Match.NONE:
redirect_url = URL(scope=redirect_scope)
response = RedirectResponse(url=str(redirect_url))
await response(scope, receive, send)
return
await self.default(scope, receive, send)
由于我们在start-fastapi 项目中,通过APIRouter 定义了这个路由的handler ,注册到了Starlette 中:
ROUTER = APIRouter()
@ROUTER.get('/api/v1/core/health')
def health_check():
return Resp.ok(message='ok')
那么/api/v1/core/health 就会被完整匹配,走到对应路由实例的handle 步骤:
class Route(BaseRoute):
async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
if self.methods and scope["method"] not in self.methods:
if "app" in scope:
raise HTTPException(status_code=405)
else:
response = PlainTextResponse("Method Not Allowed", status_code=405)
await response(scope, receive, send)
else:
await self.app(scope, receive, send)
def request_response(func: typing.Callable) -> ASGIApp:
is_coroutine = iscoroutinefunction_or_partial(func)
async def app(scope: Scope, receive: Receive, send: Send) -> None:
request = Request(scope, receive=receive, send=send)
if is_coroutine:
response = await func(request)
else:
response = await run_in_threadpool(func, request)
await response(scope, receive, send)
return app
def get_request_handler() -> Callable[[Request], Coroutine[Any, Any, Response]]:
raw_response = await run_endpoint_function(
dependant=dependant, values=values, is_coroutine=is_coroutine
)
async def run_endpoint_function(
*, dependant: Dependant, values: Dict[str, Any], is_coroutine: bool
) -> Any:
assert dependant.call is not None, "dependant.call must be a function"
if is_coroutine:
return await dependant.call(**values)
else:
return await run_in_threadpool(dependant.call, **values)
async def run_in_threadpool(
func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any
) -> T:
loop = asyncio.get_event_loop()
if contextvars is not None:
child = functools.partial(func, *args, **kwargs)
context = contextvars.copy_context()
func = context.run
args = (child,)
elif kwargs:
func = functools.partial(func, **kwargs)
return await loop.run_in_executor(None, func, *args)
由于我们对健康检查路由定义了GET 方法,那么这个路由就支持处理。最终来到了FastAPI 的run_endpoint_function 方法,调用我们定义的Controller 。由于我们是直接def health_check() ,因此会走到loop.run_in_executor 线程池方法,去执行Controller ,然后返回结果。否则如果是async def 定义的Controller 的话,就直接await 。
所以整个请求返回的链路就完成了,而且我们也会看到,针对需要耗时耗CPU 的请求,尽量不要用async def 定义FastAPI 的Controller ,否则会有阻塞整个asyncio 事件循环的风险,而用线程池处理就可以规避这种情况。
|