参考:https://github.com/pyropy/fastapi-socketio
一、app中启用socketio
如下,一般fastapi项目会这样创建一个应用
from fastapi import FastAPI
app = FastAPI()
那么如果在项目中需要用到socketio服务时,可以通过以下方式来实现绑定;当然也可以把socketio的服务单独起一个项目。
# 初始化socketio
sio = socketio.AsyncServer(async_mode='asgi')
# app绑定socketio
app.mount('/', socketio.ASGIApp(socketio_server=sio)) # 使用默认的socket path
# 默认的socketio_path是socket.io,通过http://localhost/socket.io就可以访问socketio服务,也可以自定义修改,例如改为ws
# sio中的事件函数可以指定namespace,前端通过路由来进入不同的namespace,例如
# sio.on('connect', connect, namespace='/chat')
# 那么前端的调用接口应该是:http://localhost/socket.io/chat
二、简单示例
在fastapi项目中建立一个init_socketio目录,结构如下,chat_room.html和socket.io-client-4.1.2可以在python socketio 实现(极)简单聊天室中拿到
├── __init__.py
├── chat_room.html
└── static
└── socket.io-client-4.1.2
?__init__.py
import os
import socketio
from urllib import parse
from bidict import bidict
from pydantic import BaseModel, Field
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi import APIRouter
from fastapi.staticfiles import StaticFiles
user_sid = bidict()
pwd_path = os.path.dirname(os.path.abspath(__file__))
class Login(BaseModel):
user_name: str = Field(..., description='1')
router = APIRouter()
@router.get('/chat_room', summary='聊天室界面')
async def chat_room():
chat_room = os.path.join(pwd_path, 'chat_room.html')
with open(chat_room) as f:
return HTMLResponse(f.read())
@router.post('/login', summary='登录接口')
async def login(item: Login):
user_name = item.user_name
JSONResponse.media_type = 'application/json; charset=utf-8'
if user_sid.get(user_name, None):
return JSONResponse({'code': 'fail'}, status_code=400)
else:
return JSONResponse({'code': 'success'}, status_code=200)
def init_socketio(app):
# 事件函数
async def connect(sid, environ):
query_params = environ['QUERY_STRING'].split('&')
params = dict()
for query_param in query_params:
a, b = query_param.split('=')
params[a] = parse.unquote(b)
user_name = params['name']
user_sid[user_name] = sid
await sio.emit('reply', f'{user_name}连线成功!', namespace='/chat')
async def message(sid, data):
await sio.emit('reply', f"{user_sid.inv[sid]}: {data}", namespace='/chat')
async def disconnect(sid):
user_name = user_sid.inv[sid]
del user_sid[user_name]
await sio.emit('reply', f'{user_name}退出连线!', namespace='/chat')
# 静态文件
app.mount("/static", StaticFiles(directory=f"{pwd_path}/static"), name="static")
# 初始化socketio
sio = socketio.AsyncServer(async_mode='asgi')
# 绑定事件函数
sio.on('disconnect', disconnect, namespace='/chat')
sio.on('chat message', message, namespace='/chat')
sio.on('connect', connect, namespace='/chat')
# app绑定socketio
app.mount('/', socketio.ASGIApp(socketio_server=sio)) # 使用默认的socket path
然后在项目的启动文件中添加
from fastapi import FastAPI
from init_socketio import router, init_socketio
app = FastAPI()
async def startup_event():
"""项目初始化"""
app.include_router(router)
init_socketio(app)
启动后,打开http://localhost:8080/chat_room即可测试
?
|