官文:
https://channels.readthedocs.io/en/latest/tutorial/part_3.html
安装:
pip install channels channels-redis
配置
# settings.py中添加
INSTALLED_APPS = [
...,
'channels',
]
ASGI_APPLICATION = '项目名.asgi.application'
注:推荐使用版本大于django3.0(配置)
asgi.py中添加
import os

from channels.routing import ProtocolTypeRouter
from django.core.asgi import get_asgi_application

# The settings module must be configured before the ASGI application is built.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '项目名.settings')

# Dispatch connections by protocol type. Only HTTP is handled for now;
# other protocols can be added later.
application = ProtocolTypeRouter({
    "http": get_asgi_application(),
})
django2.2配置(因为没asgi文件,自建一个asgi.py文件)
import os

import django
from channels.http import AsgiHandler
from channels.routing import ProtocolTypeRouter

# The settings module must be configured before django.setup() runs.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '项目名.settings')
django.setup()

# Dispatch connections by protocol type. Only HTTP is handled for now;
# other protocols can be added later.
application = ProtocolTypeRouter({
    "http": AsgiHandler(),
})
启动服务,展示下面日志便启动成功(ASGI)
此时已经配置成功
单向通讯(实操)
单向通讯不用redis即可实现
在自己项目下新建consumers.py
# front_end/consumers.py
import json
from channels.generic.websocket import WebsocketConsumer
class ChatConsumer(WebsocketConsumer):
    """Echo consumer: sends each received chat message back to the same client.

    Incoming text frames are expected to be JSON objects with a ``message``
    key, e.g. ``{"message": "hello"}``; the reply has the same shape.
    """

    def connect(self):
        """Accept every incoming WebSocket connection."""
        self.accept()

    def disconnect(self, close_code):
        """Handle a closed connection; there is nothing to clean up."""
        pass

    def receive(self, text_data=None, bytes_data=None):
        """Echo the ``message`` field of a JSON text frame back to the sender.

        ``bytes_data`` is accepted so that a binary frame does not raise a
        ``TypeError`` on an unexpected keyword argument; binary frames are
        ignored.
        """
        if text_data is None:
            return
        try:
            message = json.loads(text_data)['message']
        except (ValueError, KeyError, TypeError):
            # Malformed payload (invalid JSON, not an object, or no "message"
            # key): drop the frame instead of crashing the connection.
            return
        self.send(text_data=json.dumps({
            'message': message
        }))
在自己项目下新建routing.py
from django.urls import path
from front_end import consumers
websocket_urlpatterns = [
path(r'ws/chat/', consumers.ChatConsumer.as_asgi()),
]
asgi.py中注册url
2.2版本
import os

import django
from channels.auth import AuthMiddlewareStack
from channels.http import AsgiHandler
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator

from front_end import routing

# The settings module must be configured before django.setup() runs.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '项目名.settings')
django.setup()

# Dispatch connections by protocol type.
application = ProtocolTypeRouter({
    "http": AsgiHandler(),
    # Newly added: WebSocket connections are wrapped in the origin validator
    # and the auth middleware stack, then routed by URL to the consumers
    # listed in front_end/routing.py.
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                routing.websocket_urlpatterns
            )
        )
    ),
})
3.0及以上
# 项目名/asgi.py
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "项目名.settings")

# Build the Django ASGI application before importing the routing module, so
# Django is fully initialised before any consumer code gets imported.
django_asgi_app = get_asgi_application()

# Must bind the name `routing` used below; the app in these notes is
# `front_end` (the previous `import chat.routing` left `routing` undefined).
from front_end import routing

# Dispatch connections by protocol type: HTTP goes to Django, WebSocket
# connections are wrapped in the origin validator and the auth middleware
# stack, then routed by URL to the consumers listed in front_end/routing.py.
application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                routing.websocket_urlpatterns
            )
        )
    ),
})
websocket在线测试 http://www.websocket-test.com/
ws://127.0.0.1:8000/ws/chat/
发送: {"message": "测试发送一下"}
双向通道(多人聊天配置)
settings.py中配置redis
# Redis-backed channel layer, used for the two-way (multi-user chat) setup.
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            # Connection without a password.
            # NOTE: without a password, a Redis server on the public network
            # cannot be reached unless its protected mode is switched off
            # (in redis.conf, comment out `bind 127.0.0.1` and change
            # `protected-mode yes` to `protected-mode no`).
            "hosts": [('127.0.0.1', 6379)],
            # Connection with a password:
            # "hosts": ["redis://:PASSWORD@HOST:PORT/1"],
        },
    },
}
|