IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> [python]-asyncio异步通信之websockets -> 正文阅读

[Python知识库][python]-asyncio异步通信之websockets


asyncio是用来编写并发代码的库,使用async/await语法;其被用作高性能异步框架的基础(包括网络和网站服务,数据库连接库,分布式任务队列等等)。

asyncio

asyncio提供一组高层级API用于:

  • 并发地运行Python协程并对其执行过程实现完全控制;
  • 执行网络IO和IPC;
  • 控制子进程;
  • 通过队列实现分布式任务;
  • 同步并发代码;

Eventloop

Eventloop实例提供了注册、取消和执行任务和回调的方法;是asyncio应用的核心,是中央总控。

把一些异步函数(任务,Task)注册到Eventloop上,Eventloop会循环执行这些函数(但同时只能执行一个),当执行到某个函数时,如果它正在等待I/O返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成I/O后会恢复,下次循环到它的时候继续执行。因此,这些异步函数可以协同(Cooperative)运行。

Coroutine

协程(Coroutine)本质上是一个函数,特点是在代码块中可以将执行权交给其他协程:

import asyncio

async def a():
    print('Suspending a')
    await asyncio.sleep(0.1)
    print('Resuming a')

async def b():
    print('In b')

async def main():
    await asyncio.gather(a(), b())

if __name__ == '__main__':
    asyncio.run(main())

其上有以下关键点:

  • 协程用async def声明;
  • asyncio.gather用来并发运行任务,在这里表示协同的执行a和b两个协程;
  • await表示调用协程;
  • asyncio.run是执行EventLoop;Python 3.7前需要:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

Future

Future是对协程的封装,异步操作结束后会把最终结果设置到这个Future对象上。可以对这个Future实例:

  • 添加完成后的回调(add_done_callback)
  • 取消任务(cancel)
  • 设置最终结果(set_result)
  • 设置异常(set_exception)

await myFuture等待future执行完成。

示例

要保证多个任务并发,需要使用gather或create_task(创建任务)方式来执行;否则可能达不到想要的结果:

import asyncio
import time

# 在spyder中,因spyder本身就在EventLoop中执行,需要添加以下引用才能正常执行(pyCharm等中不需要)
#import nest_asyncio
#nest_asyncio.apply()

async def a():
    print('Suspending a')
    await asyncio.sleep(3)
    print('Resuming a')

async def b():
    print('Suspending b')
    await asyncio.sleep(2)
    print('Resuming b')
    
async def runEach():
    await a()
    await b()
    
async def runGather():
    await asyncio.gather(a(), b())
    
async def runTask():
    t1 = asyncio.create_task(a())
    t2 = asyncio.create_task(b())
    await t1
    await t2
    
async def runDirect():
    await asyncio.create_task(a())
    await asyncio.create_task(b())
    
async def runCombine():
    tb = asyncio.create_task(b())
    await a()
    await tb

    
def show_perf(func):
    print('*' * 20)
    start = time.perf_counter()
    asyncio.run(func())
    print(f'{func.__name__} Cost: {time.perf_counter() - start}')

if __name__ == '__main__':
    show_perf(runEach)		# 5
    show_perf(runGather)	# 3
    show_perf(runTask)		# 3
    show_perf(runDirect)	# 5
    show_perf(runCombine)	# 3

如上,不能直接await协程,也不能直接await create_task(要wait其结果,返回的task);否则无法并行执行。

websockets

WebSocket是基于TCP的应用层协议,实现了浏览器与服务器全双工(full-duplex)通信。其本质是保持TCP连接,在浏览器和服务端通过Socket进行通信。

websockets是基于asyncio的简单、高效的websocket库,使用时需要先安装库:

pip install websockets
# 以上方式默认安装到python目录中,在anaconda中无法使用;要使用需要指定环境(webStudy)的目录
pip install websockets -t C:\Users\gdxu\Anaconda3\envs\webStudy\Lib\site-packages

若要同步收发消息(发送然后等待应答):

import asyncio
from websockets import connect

async def hello(uri):
    async with connect(uri) as websocket:
        await websocket.send("Hello world!")
        reply = await websocket.recv()
        return reply

操作类

基于websockets中发送与接收接口:

import json
import logging
from websockets import connect


class Handler:
    def __init__(self, loop=None):
        self.ws = None
        self.loop = loop

    async def async_connect(self, url):
        logging.info("attempting connection to {}".format(url))
        # perform async connect, and store the connected WebSocketClientProtocol
        # object, for later reuse for send & recv
        self.ws = await connect(url)
        logging.info("connected")

    def sendJsonObj(self, cmd):
        return self.loop.run_until_complete(self.async_sendJsonObj(cmd))

    async def async_sendJsonObj(self, cmd):
        return await self.ws.send(json.dumps(cmd))

    def sendByte(self, cmd):
        return self.loop.run_until_complete(self.async_sendByte(cmd))

    async def async_sendByte(self, cmd):
        return await self.ws.send(cmd)

    def close(self):
        self.loop.run_until_complete(self.async_close())
        logging.info('closed')

    async def async_close(self):
        await self.ws.close(reason="user quit")
        self.ws = None

    def toRecv(self):
        self.loop.run_until_complete(self.async_recv())

    async def async_recv(self, callback=None):
        # i = 1
        logging.info('async_recv begin')
        while True:
            # logging.info('to recv:')
            reply = await self.ws.recv()
            if callback:
                callback(reply)
            # logging.info("{}- recv: {}".format(i, reply))
            # i += 1

        logging.info('async_recv end')

使用

使用以上类异步收发数据:

import struct
import json
import asyncio
import logging
from wsHandler import Handler
from logging_config import init_logging

# import nest_asyncio
# nest_asyncio.apply()

remoteUri = 'wss://ws.test.com:2443/'
login = {'role': 'admin', 'user_id': 'test1234'}
toFree = struct.pack('=ib', 0, 0)
setService = struct.pack('=i', 2) + json.dumps(["first", "test"]).encode('utf-8')


async def sendCmd(handle):
    await handle.async_sendJsonObj(login)
    await asyncio.sleep(0.5)
    await handle.async_sendByte(setService)
    await asyncio.sleep(0.5)
    await handle.async_sendByte(toFree)


def handleReply(reply):
    if len(reply) >= 4:
        resp = struct.unpack('=i', reply[:4])
        logging.info("reply: {}".format(resp))


async def recvCmd(handle):
    await handle.async_recv(handleReply)


if __name__ == '__main__':
    init_logging()

    handler = Handler()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(handler.async_connect(remoteUri))

    tasks = asyncio.gather(sendCmd(handler), recvCmd(handler))
    loop.run_until_complete(tasks)

    loop.run_until_complete(handler.async_close())
    logging.info('quit')
  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2021-11-22 12:17:56  更:2021-11-22 12:19:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/16 0:41:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码