前言:
嗨咯铁汁们,很久不见,我还是你们的老朋友凡叔,这里也感谢各位小伙伴的点赞和关注,你们的三连是我最大的动力哈,我也不会辜负各位的期盼,这里呢给大家出了一个针对 WebSocket 协议的 Locust 压测脚本
?
Locust 默认支持 HTTP 协议(默认通过 HttpUser 类),我们也可以自行实现任意协议的 Client 对它 User 类进行继承(HttpUser 也是继承自 User)并增加所需要的方法,这样也就实现了任意协议的压测。
针对 WebSocket 协议的 Locust 压测脚本实现无非就是三个步骤
- 编写一个 WebSocket Client,也就是定义一个 Class,实现 WS连接初始化、事件订阅、消息接收 所需要的方法
- 使用 WebSocket Client 继承 User 类,产生 WebsocketUser
- 依据测试用例编写压测脚本,使用 WebsocketUser内预定义的方法 实现并发的连接、事件订阅、消息接收
脚本实现参考
from?locust?import?User,?task,?events,?constant
import?time
import?websocket
import?ssl
import?json
import?jsonpath
def?eventType_success(eventType,?recvText,?total_time):
????events.request_success.fire(request_type="[RECV]",
????????????????????????????????name=eventType,
????????????????????????????????response_time=total_time,
????????????????????????????????response_length=len(recvText))
class?WebSocketClient(object):
????
????_locust_environment?=?None
????def?__init__(self,?host):
????????self.host?=?host
????????#?针对?WSS?关闭?SSL?校验警报
????????self.ws?=?websocket.WebSocket(sslopt={"cert_reqs":?ssl.CERT_NONE})
????def?connect(self,?burl):
????????start_time?=?time.time()
????????try:
????????????self.conn?=?self.ws.connect(url=burl)
????????except?websocket.WebSocketConnectionClosedException?as?e:
????????????total_time?=?int((time.time()?-?start_time)?*?1000)
????????????events.request_failure.fire(
????????????????request_type="[Connect]",?name='Connection?is?already?closed',?response_time=total_time,?exception=e)
????????except?websocket.WebSocketTimeoutException?as?e:
????????????total_time?=?int((time.time()?-?start_time)?*?1000)
????????????events.request_failure.fire(
????????????????request_type="[Connect]",?name='TimeOut',?response_time=total_time,?exception=e)
????????else:
????????????total_time?=?int((time.time()?-?start_time)?*?1000)
????????????events.request_success.fire(
????????????????request_type="[Connect]",?name='WebSocket',?response_time=total_time,?response_length=0)
????????return?self.conn
????def?recv(self):
????????return?self.ws.recv()
????def?send(self,?msg):
????????self.ws.send(msg)
class?WebsocketUser(User):
????abstract?=?True
????def?__init__(self,?*args,?**kwargs):
????????super(WebsocketUser,?self).__init__(*args,?**kwargs)
????????self.client?=?WebSocketClient(self.host)
????????self.client._locust_environment?=?self.environment
class?ApiUser(WebsocketUser):
????host?=?"wss://ws.xxxxx.com/"
????wait_time?=?constant(0)
????@task(1)
????def?pft(self):
# wss 地址
????????self.url?=?'wss://ws.xxxxx.com/ws?appid=futures&uid=10000000'
????????self.data?=?{}
????????self.client.connect(self.url)
????????
#?发送的订阅请求
????????sendMsg?=?'{"appid":"futures","cover":0,"event":[\
????????????{"type":"exchange_rate","toggle":1,"expireTime":86400},\
????????????{"type":"accountInfo_USDT","toggle":1,"expireTime":86400},\
????????????{"type":"ticker_BTC/USDT","toggle":1,"expireTime":86400}]}'
????????self.client.send(sendMsg)
????????
while?True:
????????????#?消息接收计时
????????????start_time?=?time.time()
????????????recv?=?self.client.recv()
????????????total_time?=?int((time.time()?-?start_time)?*?1000)
????????????
#?为每个推送过来的事件进行归类和独立计算性能指标
????????????try:
????????????????recv_j?=?json.loads(recv)
????????????????eventType_s?=?jsonpath.jsonpath(recv_j,?expr='$.eventType')
????????????????eventType_success(eventType_s[0],?recv,?total_time)
????????????except?websocket.WebSocketConnectionClosedException?as?e:
????????????????events.request_failure.fire(request_type="[ERROR]?WebSocketConnectionClosedException",
????????????????????????????????????????????name='Connection?is?already?closed.',
????????????????????????????????????????????response_time=total_time,
????????????????????????????????????????????exception=e)
????????????except:
????????????????print(recv)
????????????????#?正常?OK?响应,或者其它心跳响应加入进来避免当作异常处理
????????????????if?'ok'?in?recv:
????????????????????eventType_success('ok',?'ok',?total_time)
class?WebSocketClient
- 实现了 WebSocket 的所有行为方法,包括连接初始化、消息发送(订阅)、消息接收
- 对连接过程中的异常进行捕获统计,记录异常响应的时间,便于后续测试分析
- 这段脚本基本拷贝就能用:)
class?WebsocketUser
- 继承 Locust 的 user 成为 WebsocketUser
class ApiUser
- 在这里加载 WebsocketUser,初始化的 user,发送订阅请求、并在一个死循环内接收消息推送内容
- 对接收的消息内容(json格式)进行解析,最终可以在 WEB UI 看到各种推送事件的推送统计
- 对接收推送过程中的异常进行捕获,记录异常响应的时间,便于后续测试分析
也可以在死循环内加入心跳发送,但建议建议按照规则发送,避免过于频繁,此处略
压测过程
分类:?Locust 系列教程
?
|