前言
本篇博文是通过python的django一系列出的,如果遇到一些知识盲区可以访问该系列下的其他博文,这次的使用会涉及到django视图函数CBV的一些知识,也可能需要postman接口测试工具,以及虚拟环境搭建。
关于虚拟环境的搭建、以及django视图函数CBV的讲解、postman接口测试 访问链接:https://blog.csdn.net/weixin_45859193/article/details/115408555
一、redis的安装以及配置
这里因为一般redis的安装都是直接确定确定的点的安装,所以如果需要安装的过程可以 访问链接:https://pythonav.com/wiki/detail/10/82/
那么在安装之前,安装过的通过redis-cli -v命令查看redis版本(版本必须大于5.0以上,不然配置channels_redis时可能会导致报错) 。
安装完成后我们新建一个django项目(python manage.py startapp 项目名称(这里用blog) )并且django的版本也需要3.0以上的版本。
此时我们的redis配置settings.py如下:
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"CONNECTION_POOL_KWARGS": {
"max_connections": 1000,
"encoding": 'utf-8'
},
"PASSWORD": "xxx"
}
}
}
那么自此redis的配置就完成了。
二、channels的安装配置、函数详解
安装channels很简单,只需要通过pip install channels 即可,不过要想channles可以使用redis,则需要在安装一个channels_redispip install channels_redis 。安装完成后,我们需要配置一下awsgi(如果有的话则不需要添加 )。
创建wsgi.py(settings.py同目录下 )如下:
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "根项目.settings.py")
application = get_wsgi_application()
之后挂载awsgi到settings.py如下:
ASGI_APPLICATION = '根目录.routing.application'
此时asgi已经配置完毕。
然后我们需要配置channels使其与redis连接settings.py如下:
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'channels'
]
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": ["redis://127.0.0.1:6379/0"],
},
},
}
配置完成后我们需要测试一下,看看channels是否与redis关联起来了,此时打开控制台输入:
python manage.py shell
import channels.layers
channel_layer = channels.layers.get_channel_layer()
from asgiref.sync import async_to_sync
async_to_sync(channel_layer.send)('test_channel',{'type':'hello'})
async_to_sync(channel_layer.receive)('test_channel')
如果没有报错那么我们将可以继续进行下一步,如果报错了的可以查看一下是否为redis的版本太低,或redis配置未生效等其他原因。
在新建的项目中创建auth/auth.py用作中间件用户认证 操作如下:
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
from api.models import UserInfo
@database_sync_to_async
def get_user(token):
user_object = UserInfo.objects.filter(token=token).first()
try:
if not user_object:
return AnonymousUser()
return user_object
except UserInfo.DoesNotExist:
return AnonymousUser()
class UserInfoAuthMiddleware:
"""
Token authorization middleware for Django Channels 2
"""
def __init__(self, inner):
self.inner = inner
async def __call__(self, scope, receive, send):
token = dict(scope['headers']).get(b'authorization', None)
if not token:
scope['user'] = AnonymousUser()
else:
scope['user'] = await get_user(token.decode())
return await self.inner(scope, receive, send)
此时中间件的处理已经完成了,这里的(@database_sync_to_async用于异步执行数据库操作所定义的函数上) 那么我们就可以写关键的用于实现点对点或者群聊的代码了。
在新建项目中创建视图函数consumer.py如下:
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from api import models
import json
class ChatConsumer(AsyncWebsocketConsumer):
"""处理通知应用中的WebSocket请求"""
async def connect(self):
self.user = self.scope['user']
self.room_name = self.user.id
self.room_group_name = 'chat_%s' % self.room_name
"""建立连接"""
if self.user == "AnonymousUser":
await self.close()
else:
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
async def receive(self, text_data):
"""将接收到的消息返回给前端"""
text_data_json = json.loads(text_data)
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat.message',
'message': text_data_json
}
)
async def disconnect(self, code):
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def chat_message(self, event):
message = event['message']
await self.send(text_data=json.dumps({
'message': message
}))
class SendConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.user = self.scope['user']
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = 'chat_%s' % self.room_name
print(self.room_group_name)
"""建立连接"""
if self.user == "AnonymousUser":
await self.close()
else:
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.get_unread()
await self.accept()
async def receive(self, text_data):
"""将接收到的消息返回给前端"""
text_data_json = json.loads(text_data)
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat.message',
'message': text_data_json
}
)
async def disconnect(self, code):
"""断开连接"""
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def chat_message(self, event):
message = event['message']
await self.get_news(message)
await self.send(text_data=json.dumps({
'message': message
}))
@database_sync_to_async
def get_news(self, msg):
"异步数据库操作"
pass
@database_sync_to_async
def get_unread(self):
"异步数据库操作"
pass
看到这里如果大家没看过官网或者其他相关博客的情况下应该会有点懵,不过没关系,我们现在通过一点一点的分析来告诉大家每个函数的意义以及使用方法(这里是先告诉使用的发法,具体实现思路等配置完成后讲述 )。
1.channels的配置、函数的使用
首页对于channels官网的讲述中能明白,是类似django的视图类的,且大多数的方法都是通过异步 去完成的。
如果你明白django的restframework框架的话,那么你就会明白该视图类(继承的AsyncWebsocketConsumer类下的父类AsyncConsumer类的dispatch函数,来实现类似重写as_view()方法 )的操作。
重写之后会将我们之前通常会使用的request替换为scope函数,并且通过请求获取到的数据也按照channles的设计规则获取 ,如果这里你明白的话那么应该能猜到后续在通过设置url的时候,视图类需要引入的方法了吧。
所以我们现在在之前创建的项目的基础上添加一个路由分发的功能routing.py如下:
from blog.consumer import ChatConsumer, SendConsumer
from django.conf.urls import url
websocket_urlpatterns = [
url(r'^chat-channel/(?P<room_name>\w+)/$', SendConsumer.as_asgi()),
url(r'^chat-channel$', ChatConsumer.as_asgi()),
]
可以看到我们设置了2个websocket路由,这里是我看官网实例也是这么做的,具体为什么要这么做我们后面会讲。
此时我们可以看到我们写的2个视图类被导入,且调用了as_asgi()方法(类似django视图类的as_view()方法 )。
一、视图函数下的connect函数
- connect函数:顾名思义就是访问路由后连接websocket时触发的函数
- close()函数:断开连接
- channel_layer.group_add()函数:创建一个聊天组,传入2个参数(
self.room_group_name(“chat”+获取到用户的id来实现组名唯一),self.channel_name(channels为我们定义的随机名字) ) - accept()函数则为成功后的回调(
如果没有执行到则会自动断开连接 )
二、视图函数下的receive函数
- receive函数:多用于
保存、接收 前端发来的消息,并(配合chat_message函数返回) ,需要传参数text_data(前端发送过来的参数 )。 - close()函数:断开连接
- channel_layer.group_send函数:向聊天组所有成员发送消息,传入2个参数(
type(“处理方法的函数chat_message”),message(json格式化的数据) )
三、视图函数下的chat_message函数
- chat_message函数:一般用于处理receive函数发过来的数据判断是否进行保存或者发送
- send()函数:将处理过的数据发送回前端
四、视图函数下的disconnect函数
- disconnect函数:用于当前端调用websocket断开时执行的操作
- channel_layer.group_discard()函数:用于将指定用户组断开,传入2个参数(
组名,self.channel_name )
好了,channels中我们要使用的函数的功能也讲完了,那么我们继续配置我们的channels搭建websocket吧。
创建routing.py并配置Websocket连接(settings.py同目录下 )如下:
from channels.routing import ProtocolTypeRouter, URLRouter
from blog.auth.auth import UserInfoAuthMiddleware
import blog.routing
application = ProtocolTypeRouter({
'websocket': UserInfoAuthMiddleware(
URLRouter(
blog.routing.websocket_urlpatterns
)
)
})
此时运行我们的django项目(python manage.py runserver 127.0.0.1:8000 ),成功后我们将会看到如图所示:
那么我们的通过channels配置的websocket服务器就搭建成功了。
三、通过channels实现异步点对点、群聊思路
结合我们的上述代码,对于中间件的功能是通过scope[“headers”](类似request.Meta )获取请求头上的authorization的token,用于获取当前用户是否登录,来获取用户信息,如果没有则返回django自带的AnonymousUser(匿名用户),有的话则返回用户的对象,并赋值给scope.user(类似request.user )。
那么中间件走完后我们前面留下的疑问,为什么定义2个路由呢?
这里我们先从我们自定义的第一个视图ChatConsumer类开始说起
ChatConsumer类如下:
class ChatConsumer(AsyncWebsocketConsumer):
"""处理通知应用中的WebSocket请求"""
async def connect(self):
self.user = self.scope['user']
self.room_name = self.user.id
self.room_group_name = 'chat_%s' % self.room_name
"""建立连接"""
if self.user == "AnonymousUser":
await self.close()
else:
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
async def receive(self, text_data):
"""将接收到的消息返回给前端"""
text_data_json = json.loads(text_data)
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat.message',
'message': text_data_json
}
)
async def disconnect(self, code):
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def chat_message(self, event):
message = event['message']
await self.send(text_data=json.dumps({
'message': message
}))
从connect函数来说我们首先判断了用户是否登录,来限制了未登录或非法登录发来请求的操作,然后通过用户唯一的id来建立起了一个聊天组,之后的操作就是将前端发来的数据通过在相同组的形式进行广播通知,通知每一个连接了该组的成员。 所以我们简称这个视图类为(接收、发送类,用于在当前用户登录、上线后立马进行连接的操作 )。
那对于我们第二个自定义的视图函数SendConsumer类就很好理解了:
SendConsumer类如下:
class SendConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.user = self.scope['user']
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = 'chat_%s' % self.room_name
print(self.room_group_name)
"""建立连接"""
if self.user == "AnonymousUser":
await self.close()
else:
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.get_unread()
await self.accept()
async def receive(self, text_data):
"""将接收到的消息返回给前端"""
text_data_json = json.loads(text_data)
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat.message',
'message': text_data_json
}
)
async def disconnect(self, code):
"""断开连接"""
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def chat_message(self, event):
message = event['message']
await self.get_news(message)
await self.send(text_data=json.dumps({
'message': message
}))
@database_sync_to_async
def get_news(self, msg):
pass
@database_sync_to_async
def get_unread(self):
pass
为了让大家更明白我们把第二个路由分发的代码展示出来
url(r'^chat-channel/(?P<room_name>\w+)/$', SendConsumer.as_asgi()),
可以看到我们第二个路由明显需要在路由上添加一个名为room_name的字符串,而我们通过(self.scope[‘url_route’][‘kwargs’][‘room_name’]获取到的参数其实就是我们路由上定义的正则 ),而这个值就是我们在点对点聊天时,聊天对象的id,然后通过这个id去连接聊天对象id的组(如果聊天对象不在线的情况下可以添加到数据库中,等用户上线后可以提示未读信息、信息个数等操作 )。 所以我们简称这个视图类为(接收、保存类,用于在当前用户点击其他用户对象时开启,返回时关闭的接口,且必须要保证当前用户的第一个接口是否开启,如果没有则不能访问 )。
而这2个数据库操作就是为我们第二次连接时的交互操作
- 1.get_news()函数操作:用于聊天对象发送聊天信息时保存到数据库(
两者之前互相联系着 ),未读数可以通过当前这2位用户上一个聊天信息的值(如果是自己发的聊天信息可以不用处理,或者可以前端进行操作 ) - 2.get_unread()函数操作:用于在用户点击聊天用户后清除未读数的操作
那么通过这些操作配合前端就可以很轻松的实现点对点聊天的功能了。
点对点的功能完成了,那么群聊的功能就不难想了,也是这2个接口,然后我们多建立一个数据库表结构,记录创建群聊表结构的id,用户的id,然后其他用户通过连接该用户创建的群聊表的id来进入同一个组进行聊天,群主就也可以设置为创建表者了。
|