Locust 是一个易于使用、可编写脚本和可扩展的性能测试工具。官方文档
特点
- 用普通的老式 Python 编写测试场景
- Locust 在自己的 greenlet (一个轻量级进程/协程)中运行每个用户。这使您能够像编写普通(阻塞式) Python 代码一样编写测试,而不必使用回调或其他机制。
- 分布式和可扩展——支持成千上万的并发用户
- Locust 使得分布在多台机器上的负载测试很容易运行。它是基于事件的(使用 gevent) ,这使得单个进程可以处理数千个并发用户。虽然可能有其他工具能够在给定的硬件上每秒执行更多请求,但是 Locust 用户的低开销使得它非常适合测试高并发的工作负载。
- 基于 web 的用户界面
- Locust 有一个用户友好的网络界面,显示了实时测试的进展。您甚至可以在测试运行时更改负载。它也可以在没有用户界面的情况下运行,这使得它很容易用于 CI/CD 测试。
- 可以测试任何系统
- 尽管 Locust 主要使用 web 站点/服务,但它可以用来测试几乎任何系统或协议。只需要为你想要测试的东西写一个客户端,或者探索一些由社区创建的东西。
- Hackable
安装 & 验证
$ pip3 install locust
$ locust -V
Demo
-
这个用户将一次又一次地向/hello 和/world 发出 HTTP 请求。 -
编写 locustfile.py from locust import HttpUser, task
class HelloWorldUser(HttpUser):
@task
def hello_world(self):
self.client.get("/hello")
self.client.get("/world")
-
启动 locust
$ locust
[2021-07-24 09:58:46,215] .../INFO/locust.main: Starting web interface at http://*:8089
[2021-07-24 09:58:46,285] .../INFO/locust.main: Starting Locust 2.8.6
-
访问 UI 界面【可选】
-
只使用命令行输出测试报告,不启用 UI 界面 $ locust --headless --users 10 --spawn-rate 1 -H http://your-server.com
[2021-07-24 10:41:10,947] .../INFO/locust.main: No run time limit set, use CTRL+C to interrupt.
[2021-07-24 10:41:10,947] .../INFO/locust.main: Starting Locust 2.8.6
[2021-07-24 10:41:10,949] .../INFO/locust.runners: Ramping to 10 users using a 1.00 spawn rate
Name
----------------------------------------------------------------------------------------------
GET /hello 1 0(0.00%) | 115 115 115 115 | 0.00 0.00
GET /world 1 0(0.00%) | 119 119 119 119 | 0.00 0.00
----------------------------------------------------------------------------------------------
Aggregated 2 0(0.00%) | 117 115 119 117 | 0.00 0.00
[2021-07-24 10:44:42,484] .../INFO/locust.runners: All users spawned: {"HelloWorldUser": 10} (10 total users)
-
参数 -u -r
- -u 3 指定高峰期用户数量,-r 1 指定这 3 个用户是按每秒 1 个的速率增长上来的
$ locust -u 3 -r 1 --headless
[2022-04-17 18:23:47,604] 192.168.1.5/INFO/locust.runners: Ramping to 3 users at a rate of 1.00 per second
[2022-04-17 18:23:47,731] 192.168.1.5/INFO/root: id(User instance): 140205318976400, duration: 125.33854100000002, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
...
[2022-04-17 18:23:48,622] 192.168.1.5/INFO/root: id(User instance): 140205318976400, duration: 33.88720899999997, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
[2022-04-17 18:23:48,645] 192.168.1.5/INFO/root: id(User instance): 140205319277392, duration: 40.38904100000007, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
[2022-04-17 18:23:48,655] 192.168.1.5/INFO/root: id(User instance): 140205318976400, duration: 32.91200000000005, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
[2022-04-17 18:23:48,681] 192.168.1.5/INFO/root: id(User instance): 140205319277392, duration: 35.22329199999996, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
...
[2022-04-17 18:23:49,605] 192.168.1.5/INFO/locust.runners: All users spawned: {"RequestMilvusUser": 3} (3 total users)
[2022-04-17 18:23:49,631] 192.168.1.5/INFO/root: id(User instance): 140205318976400, duration: 36.88016599999999, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
[2022-04-17 18:23:49,635] 192.168.1.5/INFO/root: id(User instance): 140205319277392, duration: 33.87429200000014, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
[2022-04-17 18:23:49,642] 192.168.1.5/INFO/root: id(User instance): 140205318975696, duration: 37.08516600000023, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
[2022-04-17 18:23:49,672] 192.168.1.5/INFO/root: id(User instance): 140205318976400, duration: 41.167166000000144, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
- -u 3 指定高峰期用户数量,-r 3 指定这 3 个用户是按每秒 3 个的速率增长上来的
$ locust -u 3 -r 3 --headless
[2022-04-17 18:15:17,621] 192.168.1.5/INFO/locust.runners: Ramping to 3 users at a rate of 3.00 per second
[2022-04-17 18:15:17,621] 192.168.1.5/INFO/locust.runners: All users spawned: {"RequestMilvusUser": 3} (3 total users)
[2022-04-17 18:15:17,706] 192.168.1.5/INFO/root: id(User instance): 140272847696080, duration: 83.56029100000006, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
[2022-04-17 18:15:17,707] 192.168.1.5/INFO/root: id(User instance): 140272847696272, duration: 82.622417, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
[2022-04-17 18:15:17,711] 192.168.1.5/INFO/root: id(User instance): 140272847696784, duration: 86.21625000000004, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
[2022-04-17 18:15:17,740] 192.168.1.5/INFO/root: id(User instance): 140272847696080, duration: 34.20625, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
[2022-04-17 18:15:17,745] 192.168.1.5/INFO/root: id(User instance): 140272847696272, duration: 38.10945799999998, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
[2022-04-17 18:15:17,750] 192.168.1.5/INFO/root: id(User instance): 140272847696784, duration: 38.60574999999999, resp: (Status(code=0, message='Add vectors successfully!'), [1514942100066791424])
编写 locustfile
-
Demo import time
from locust import HttpUser, task, between
class QuickstartUser(HttpUser):
wait_time = between(1, 5)
@task
def hello_world(self):
self.client.get("/hello")
self.client.get("/world")
@task(3)
def view_items(self):
for item_id in range(10):
self.client.get(f"/item?id={item_id}", name="/item")
time.sleep(1)
def on_start(self):
self.client.post("/login", json={"username":"foo", "password":"bar"})
-
QuickstartUser
- QuickstartUser 是我们定义的一个类,表示将要模拟的用户,它继承自 HttpUser。一个有效的 locustfile 必须至少包含一个从 User 继承的类。
- HttpUser 为每个用户提供一个 self.client 属性,这是 HttpSession 的一个实例,可用于向目标系统发出 HTTP 请求。
- 当测试启动时,locust 将为它模拟的每个用户创建 QuickstartUser 的实例,并且每个用户将在自己的绿色 gevent 线程中开始运行。
-
wait_time
- wait_time = between(1, 5) 表示模拟的用户在每个任务执行后需要等待1到5秒钟。
-
@task
- @task 装饰的方法是 locustfile 的核心。Locust 为每个正在运行的用户创建一个 greenlet (微线程) ,greenlet 只会调用这些被 @task 装饰的方法。
- 我们通过将两个方法装饰成@task 来声明两个任务,其中一个方法的权重更高(3)。 当我们的 QuickstartUser 运行时,它会选择一个已声明的任务—— hello_world 或 view_items ——并执行它。任务是随机挑选的,但是你可以给它们不同的权重。 上面的配置将使 Locust 选择 view_items 的可能性是 hello world 的三倍。
- 当任务完成执行后,User 将在其 wait_time 的等待时间内休眠(在本例中为1到5秒)。在等待时间过后,它会选择一个新的任务并不断重复。
-
self.client
- self.client 属性用于发送 HTTP 请求,这些 HTTP 请求会被 Locust 记录下来并生成测试报告。
- HttpUser 不是真正的浏览器,因此不会解析 HTML 响应来加载资源或呈现页面。不过,它会跟踪 cookie。
-
name=“/item”
- 在 view_items 任务中,我们使用一个可变的查询参数来加载10个不同的 url。为了不在 Locust 的统计数据中得到10个独立的条目——因为统计数据是在 URL 上分组的——我们使用 name 参数将所有这些请求分组到一个名为"/item"的条目下。
-
on_startup
- 我们还声明了一个 on_start 方法。当每个模拟用户启动时,将调用具有此名称的方法。
-
User 类 & 类属性
-
Tasks
-
Events
-
HttpUser 类
-
连接池
-
当每个 HttpUser 创建新的 HttpSession 时,每个用户实例都有自己的连接池。这类似于真实用户与 web 服务器的交互方式。 -
但是,如果希望在所有用户之间共享连接,可以使用单个池管理器。为此,将 pool_manager 类属性设置为 urllib3.PoolManager 的实例。 from locust import HttpUser
from urllib3 import PoolManager
class MyUser(HttpUser):
pool_manager = PoolManager(maxsize=10, block=True)
-
有关更多的配置选项,请参考 urllib3 documentation. -
TaskSets
- Taskset 是一种组织层次化网站/系统测试的方法。你可以在 这里 了解更多。
-
如何组织测试代码
-
记住这一点很重要,locustfile.py 只是 Locust 导入的一个普通的 Python 模块。从这个模块中,您可以像在任何 Python 程序中一样自由地导入其他 Python 代码。当前的工作目录文件会自动添加到 python 的 sys.path 中,因此任何位于 python 工作目录中的 python 文件/模块/包都可以使用 python import 语句导入。 -
对于小型测试,将所有测试代码保存在一个 locustfile.py 中应该可以,但是对于较大的测试套件,您可能希望将代码分割成多个文件和目录。 -
如何构造测试源代码当然完全取决于您,但是我们建议您遵循 Python 最佳实践。 -
单个 locustfile 示例 Project root
common/
__init__.py
auth.py
config.py
locustfile.py
requirements.txt
-
多个 locustfile 示例 Project root
common/
__init__.py
auth.py
config.py
my_locustfiles/
api.py
website.py
requirements.txt
配置
分布式生成负载
-
运行 Locust 的单个进程可以模拟相当高的吞吐量。对于一个简单的测试计划,它应该能够每秒发出数百个请求,如果使用 FastHttpUser,则可以发出数千个请求。 -
但是,如果您的测试计划很复杂,或者您希望运行更多的负载,那么您将需要扩展到多个进程,甚至是多台机器。 -
为此,您在主节点上使用 --master 标志启动 Locust 的一个实例,并使用 --worker 标志启动多个工人实例。如果这些 worker 与您使用的主机不在同一台计算机上,使用 --master-host 将他们指向 master 节点的主机 IP/hostname。 -
master 实例会运行 Locust 的 web 界面,并告诉 worker 何时产生/停止用户。worker 运行您的用户并将统计数据发送回去。master 实例本身不运行任何 Users。 -
master 和 worker 在分布式运行 Locust 时,都必须有 locustfile 的副本。 -
因为 Python 不能完全利用每个进程多于一个核心(请参见 GIL ) ,所以通常应该在每个处理器核心上运行一个 worker 实例,以便充分利用它们的计算能力。 -
每个 worker 可以运行多少个用户几乎没有限制。Locust/gevent 可以在每个进程中运行数千甚至数万个用户,只要他们的总请求率/RPS 不是太高。 -
如果 Locust 即将耗尽 CPU 资源,它将记录一个警告。 -
Demo
$ locust -f my_locustfile.py --master
$ locust -f my_locustfile.py --worker --master-host=192.168.0.14
-
相关选项参数
- –master
- –worker
- –master-host=X.X.X.X
- –master-port=5557
- –master-bind-host=X.X.X.X
- –master-bind-port=5557
- –expect-workers=X
-
跨节点通信
-
在分布式模式下运行 Locust 时,您可能希望在主节点和工作节点之间进行通信以协调数据。通过使用内置的消息挂钩,可以很容易地实现自定义消息: from locust import events
from locust.runners import MasterRunner, WorkerRunner
def setup_test_users(environment, msg, **kwargs):
for user in msg.data:
print(f"User {user['name']} received")
environment.runner.send_message('acknowledge_users', f"Thanks for the {len(msg.data)} users!")
def on_acknowledge(msg, **kwargs):
print(msg.data)
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
if not isinstance(environment.runner, MasterRunner):
environment.runner.register_message('test_users', setup_test_users)
if not isinstance(environment.runner, WorkerRunner):
environment.runner.register_message('acknowledge_users', on_acknowledge)
@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
if not isinstance(environment.runner, MasterRunner):
users = [
{"name": "User1"},
{"name": "User2"},
{"name": "User3"},
]
environment.runner.send_message('test_users', users)
-
请注意,在本地运行(即非分布式)时,这个功能将得到保留; 消息将由发送它们的同一个运行程序简单地处理。 -
在 Locust 源代码的 examples 目录 中可以找到一个更完整的示例。
在调试器中运行测试
-
在调试器中运行 Locust 在开发测试时非常有用。除此之外,你可以检查一个特定的回复或者检查一些用户/实例变量。 -
但是调试器有时会遇到类似 Locust 这样的复杂 gevent 应用程序的问题,而且框架本身还有许多您可能不感兴趣的内容。为了简化这个问题,Locust 提供了一个名为 run_single_user 的方法。请注意,这是一个相当新的特性,api 可能会发生变化。 from locust import HttpUser, task, run_single_user
class QuickstartUser(HttpUser):
host = "http://localhost"
@task
def hello_world(self):
with self.client.get("/hello", catch_response=True) as resp:
pass
if __name__ == "__main__":
run_single_user(QuickstartUser)
-
它隐式地为请求事件注册一个事件处理程序,以打印关于每个请求的一些统计信息: type name resp_ms exception
GET /hello 38 ConnectionRefusedError(61, 'Connection refused')
GET /hello 4 ConnectionRefusedError(61, 'Connection refused')
-
您可以通过指定参数来为 run_single_ user 配置打印的内容。
-
确保在调试器设置中启用了 gevent。 -
VS Code launch.json 示例 {
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"gevent": true
}
]
}
-
在 PyCharm 中也有类似的设置 -
VS Code/pydev 可能会警告您:sys.settrace() should not be used when the debugger is being used,它可以安全地被忽略 -
debugging_advanced 示例
在 Docker 中运行
-
官方的 Docker 镜像位于 locustio/locust 。 -
像这样使用它(假设locustfile. py 存在于当前的工作目录文件中) : docker run -p 8089:8089 -v $PWD:/mnt/locust locustio/locust -f /mnt/locust/locustfile.py
-
下面是一个 Docker Compose 文件的例子,它可以用来同时启动主节点和工作节点: version: '3'
services:
master:
image: locustio/locust
ports:
- "8089:8089"
volumes:
- ./:/mnt/locust
command: -f /mnt/locust/locustfile.py --master -H http://master:8089
worker:
image: locustio/locust
volumes:
- ./:/mnt/locust
command: -f /mnt/locust/locustfile.py --worker --master-host master
-
上面的组合配置可以用以下命令启动一个主节点和4个工人: $ docker-compose up --scale worker=4
-
使用 docker 镜像作为基础镜像 $ FROM locustio/locust
$ RUN pip3 install some-python-package
-
在 Kubernetes 上运行分布式负载测试
- 在 Kubernetes 运行 Locust 最简单的方法是使用 Helm chart
- 示例:github.com/deliveryhero/helm-charts
使用 Terraform/AWS 运行分布式负载测试
不使用 web UI 运行
-
你可以在没有 web UI 的情况下运行 locust ——例如,如果你想在一些自动化的流程中运行它,比如一个 CI 服务器——通过使用 --headless 标志和-u 和-r:-u 指定要生成的用户数,-r 指定生成率(每秒启用的用户数)。 $ locust -f locust_files/my_locust_file.py --headless -u 1000 -r 100
-
当测试运行时,您可以手动更改用户数,即使在坡道启动完成之后也是如此。按 w 添加1个用户,按 W 添加10个用户。按 s 键删除1或 S 键删除10。 -
为测试设置时间限制:使用 --run-time 或-t。一旦时间到,Locust 就会关闭。 $ locust -f --headless -u 1000 -r 100 --run-time 1h30m
-
允许任务在关闭时完成迭代:默认情况下,Locust 会立即停止你的任务(甚至不等待请求完成)。如果您想让任务完成它们的迭代,可以使用 --stop-timeout 。 $ locust -f --headless -u 1000 -r 100 --run-time 1h30m --stop-timeout 99
-
在没有 web UI 的情况下分布式运行 Locust:启动主节点时应该指定 --expect-workers 选项,以指定预期连接的工作节点数。然后,它将等待直到足够的工作节点已经连接后才开始测试。 -
控制 Locust 进程的退出代码:在 CI 环境中运行 Locust 时,您可能需要控制 Locust 进程的退出代码。您可以在测试脚本中通过设置 Environment 实例的 process_exit_code 来实现这一点。
-
下面是一个例子,如果符合以下任何一个条件,就可以将退出代码设置为非零:
import logging
from locust import events
@events.quitting.add_listener
def _(environment, **kw):
if environment.stats.total.fail_ratio > 0.01:
logging.error("Test failed due to failure ratio > 1%")
environment.process_exit_code = 1
elif environment.stats.total.avg_response_time > 200:
logging.error("Test failed due to average response time ratio > 200 ms")
environment.process_exit_code = 1
elif environment.stats.total.get_response_time_percentile(0.95) > 800:
logging.error("Test failed due to 95th percentile response time > 800 ms")
environment.process_exit_code = 1
else:
environment.process_exit_code = 0
- 超过1% 的请求失败
- 平均响应时间超过200毫秒
- 响应时间的第95百分位数大于800毫秒
自定义负载曲线
- 有时需要一个完全自定义的负载测试,这不能通过简单地设置或更改用户计数和刷新率来实现。例如,您可能希望在自定义时间生成一个负载尖峰或斜坡。通过使用 LoadTestShape 类,您可以在任何时候完全控制用户计数和产生速率。
- 在 locust 文件中定义一个继承 LoadTestShape 的类。如果 Locust 发现了这个类,将自动使用它。
- 在这个类中,您定义了一个 tick()方法,该方法返回一个元组,其中包含所需的用户数和产生速率(或 None 用于停止测试)。Locust 大约每秒调用一次 tick()方法。
- 在这个类中,您还可以访问 get_run_time()方法,以检查测试运行了多长时间。
- Demo:MyCustomShape 将按照 100 个用户每块的方式增加用户数,然后在10分钟后停止负载测试:
class MyCustomShape(LoadTestShape):
time_limit = 600
spawn_rate = 20
def tick(self):
run_time = self.get_run_time()
if run_time < self.time_limit:
user_count = round(run_time, -2)
return (user_count, spawn_rate)
return None
- Github 上的例子 进一步演示了这一功能,包括:
- Generating a double wave shape
- Time based stages like K6
- Step load pattern like Visual Studio
- get_current_user_count() 返回活动用户的总数。此方法可用于防止在达到所需用户数之前进行后续步骤。如果每个用户的初始化过程在时间上很慢或不稳定,这就特别有用。 示例
CSV 格式保存测试统计信息
-
你可能希望通过 CSV 文件消费 Locust 的测试结果。在这种情况下,有两种方法可以做到这一点。 -
首先,在使用 web UI 运行 Locust 时,可以在 Download Data 选项卡下检索 CSV 文件。 -
其次,你可以运行 Locust 与一个标志,将定期保存三个 CSV 文件。 $ locust -f examples/basic.py --csv=example --headless -t10m
-
这些文件将被命名为 example_stats.csv、example_failures.csv、example_history.csv (在使用 --csv=example 时)。 -
前两个文件将包含整个测试运行的统计数据和失败数据,每个统计数据条目(URL 端点)和一个聚合行。 -
example_history.csv 将在整个测试运行过程中添加当前(10秒滑动窗口)状态来获取新行。 -
默认情况下,只有聚合行定期添加到历史统计数据中,但是如果 Locust 以 --csv-full-history 标志开始,每次写入统计数据时(默认情况下每2秒钟一次)就会为每个统计数据条目(和聚合)添加一行。 -
如果你想写得更快(或者更慢) ,你也可以自定义写入的频率: import locust.stats
locust.stats.CSV_STATS_INTERVAL_SEC = 5
locust.stats.CSV_STATS_FLUSH_INTERVAL_SEC = 60
测试非 HTTP 系统
- Locust 只提供对 HTTP/HTTPS 的内置支持,但是它可以扩展到几乎任何系统。这通常是通过包装协议库并在每次调用完成后触发一个 request 事件来完成的,以便 Locust 知道发生了什么。
- 重要的是,您使用的协议库可以被 gevent 进行 monkey-patch 。
- 几乎所有纯 Python 的库(使用 Python socket 模块或其他标准库函数,如 subprocess)都可以开箱即用——但如果它们在 C gevent 中执行 i/o 调用,则无法对其进行补丁。这将阻塞整个 Locust/Python 进程(实际上限制您每个 worker 进程只运行一个 User)
- 有些 C 库允许使用其他变通方法。例如,如果希望使用 psycopg2对 PostgreSQL 进行性能测试,可以使用 psycogreen 。如果您愿意亲自动手,那么您也可以自己对库进行补丁,但这超出了本文档的范围。
- 示例: 编写 XML-RPC User/client
-
假设我们有一个 XML-RPC 服务器,我们想要进行负载测试 import random
import time
from xmlrpc.server import SimpleXMLRPCServer
def get_time():
time.sleep(random.random())
return time.time()
def get_random_number(low, high):
time.sleep(random.random())
return random.randint(low, high)
server = SimpleXMLRPCServer(("localhost", 8877))
print("Listening on port 8877...")
server.register_function(get_time, "get_time")
server.register_function(get_random_number, "get_random_number")
server.serve_forever()
-
我们可以通过包装 xmlrpc.client.ServerProxy 来构建一个通用的 XML-RPC client import time
from xmlrpc.client import ServerProxy, Fault
from locust import User, task
class XmlRpcClient(ServerProxy):
"""
XmlRpcClient is a wrapper around the standard library's ServerProxy.
It proxies any function calls and fires the *request* event when they finish,
so that the calls get recorded in Locust.
"""
def __init__(self, host, request_event):
super().__init__(host)
self._request_event = request_event
def __getattr__(self, name):
func = ServerProxy.__getattr__(self, name)
def wrapper(*args, **kwargs):
request_meta = {
"request_type": "xmlrpc",
"name": name,
"start_time": time.time(),
"response_length": 0,
"response": None,
"context": {},
"exception": None,
}
start_perf_counter = time.perf_counter()
try:
request_meta["response"] = func(*args, **kwargs)
except Fault as e:
request_meta["exception"] = e
request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
self._request_event.fire(**request_meta)
return request_meta["response"]
return wrapper
class XmlRpcUser(User):
"""
A minimal Locust user class that provides an XmlRpcClient to its subclasses
"""
abstract = True
def __init__(self, environment):
super().__init__(environment)
self.client = XmlRpcClient(self.host, request_event=environment.events.request)
class MyUser(XmlRpcUser):
host = "http://127.0.0.1:8877/"
@task
def get_time(self):
self.client.get_time()
@task
def get_random_number(self):
self.client.get_random_number(0, 100)
- 示例: 编写 gRPC User/client
-
唯一重要的区别是,您需要使 gRPC gevent 兼容,在打开通道之前执行以下代码: import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()
-
要测试的 Dummy server: import hello_pb2_grpc
import hello_pb2
import grpc
from concurrent import futures
import logging
import time
logger = logging.getLogger(__name__)
class HelloServiceServicer(hello_pb2_grpc.HelloServiceServicer):
def SayHello(self, request, context):
name = request.name
time.sleep(1)
return hello_pb2.HelloResponse(message=f"Hello from Locust, {name}!")
def start_server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
hello_pb2_grpc.add_HelloServiceServicer_to_server(HelloServiceServicer(), server)
server.add_insecure_port("localhost:50051")
server.start()
logger.info("gRPC server started")
server.wait_for_termination()
-
gRPC client, base User 示例:
import grpc
import hello_pb2_grpc
import hello_pb2
from locust import events, User, task
from locust.exception import LocustError
from locust.user.task import LOCUST_STATE_STOPPING
from hello_server import start_server
import gevent
import time
import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()
@events.init.add_listener
def run_grpc_server(environment, **_kwargs):
gevent.spawn(start_server)
class GrpcClient:
def __init__(self, environment, stub):
self.env = environment
self._stub_class = stub.__class__
self._stub = stub
def __getattr__(self, name):
func = self._stub_class.__getattribute__(self._stub, name)
def wrapper(*args, **kwargs):
request_meta = {
"request_type": "grpc",
"name": name,
"start_time": time.time(),
"response_length": 0,
"exception": None,
"context": None,
"response": None,
}
start_perf_counter = time.perf_counter()
try:
request_meta["response"] = func(*args, **kwargs)
request_meta["response_length"] = len(request_meta["response"].message)
except grpc.RpcError as e:
request_meta["exception"] = e
request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
self.env.events.request.fire(**request_meta)
return request_meta["response"]
return wrapper
class GrpcUser(User):
abstract = True
stub_class = None
def __init__(self, environment):
super().__init__(environment)
for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
if attr_value is None:
raise LocustError(f"You must specify the {attr_name}.")
self._channel = grpc.insecure_channel(self.host)
self._channel_closed = False
stub = self.stub_class(self._channel)
self.client = GrpcClient(environment, stub)
class HelloGrpcUser(GrpcUser):
host = "localhost:50051"
stub_class = hello_pb2_grpc.HelloServiceStub
@task
def sayHello(self):
if not self._channel_closed:
self.client.SayHello(hello_pb2.HelloRequest(name="Test"))
time.sleep(1)
- 有关更多用户类型的示例,请参见 locust-plugins (它拥有 WebSocket/SocketIO、 Kafka、 Selenium/WebDriver 等多个用户)
测试 SDK
-
如果您要测试的目标系统已经有一个构建好的 SDK 可用,Locust 支持将其集成到您的负载测试工作中使用。 -
要实现这一点,唯一的先决条件是:SDK 需要有一个可访问的 request.Sessions 类。 -
下面的示例显示 locust 客户端在启动期间覆盖了 Archivist SDK 内部的 _session 对象。 import locust
from locust.user import task
from archivist.archivist import Archivist
class ArchivistUser(locust.HttpUser):
def on_start(self):
AUTH_TOKEN = None
with open("auth.text") as f:
AUTH_TOKEN = f.read()
self.arch: Archivist = Archivist(url=self.host, auth=AUTH_TOKEN)
self.arch._session = self.client
@task
def Create_assets(self):
"""User creates assets as fast as possible"""
while True:
self.arch.assets.create(behaviours=["Builtin", "RecordEvidence", "Attachments"], attrs={"foo": "bar"})
使用快速 HTTP 客户端提升性能
- 快速 HTTP 客户端
- Locust 的默认 HTTP 客户端使用 python-requests 。它提供了许多 python 开发人员所熟悉的很好的 API,并且维护得非常好。但是,如果您计划以非常高的吞吐量运行测试,并且只有有限的硬件来运行 Locust,那么它有时效率不够高。
- 正因为如此,Locust 还提供了 FastHttpUser,它使用 geventhttpclient 来代替。它提供了一个非常类似的 API,使用的 CPU 时间大大减少,有时在给定硬件上每秒钟最大请求数会增加多达5x-6x。
- 很难说你的特定硬件能处理什么,但是在最好的情况下,使用 FastHttpUsers 的测试每个内核每秒能处理接近5000个请求,而 HttpUser 每秒能处理大约850个请求(在2018 MacBook Pro i72.6 GHz 上测试)。实际上,您的结果可能会有所不同,如果负载测试还执行其他 cpu 密集型的操作,您将看到更小的收益。
- 只要您的负载生成器 CPU 没有超载,FastHttpUser 的响应时间应该与 HttpUser 的响应时间几乎相同。从这个意义上讲,它并不是“更快”。当然,它不能加速你正在测试的系统。
- 如何使用 FastHttpUser
-
只需要子类化 FastHttpUser 而不是 HttpUser from locust import task, FastHttpUser
class MyUser(FastHttpUser):
@task
def index(self):
response = self.client.get("/")
-
FastHttpUser/geventhttpclient 与 HttpUser/python-requests 非常相似,但有时会有细微的差别。如果您需要使用客户端库的内部机制,例如手动管理 cookie,那么这种情况尤其明显。 - API
- FastHttpUser
- FastHttpSession
- FastResponse
事件挂钩
-
事件挂钩
-
Locust 带有一些事件挂钩,可以用于在多种不同方式上扩展 Locust。 -
例如,下面是如何设置一个事件侦听器,该侦听器将在请求完成后触发。 from locust import events
@events.request.add_listener
def my_request_handler(request_type, name, response_time, response_length, response,
context, exception, start_time, url, **kwargs):
if exception:
print(f"Request to {name} failed with exception {exception}")
else:
print(f"Successfully made a request to: {name})
print(f"The response was {response.text}")
-
在上面的示例中,通配符关键字参数(** kwargs)将为空,因为我们已处理所有参数,但是如果 Locust 在未来版本中添加了新的参数,它可以防止代码因异常而中断。 -
另外,完全可以实现一个不为此事件提供所有参数的客户端。例如,非 http 协议甚至可能没有 url 或响应对象的概念。从侦听器函数定义中删除所有这类缺少的字段或使用默认参数。 -
在分布式模式下运行 locust 时,在运行测试之前 worker 节点上做一些设置可能会很有用。你可以通过检查节点的 runner 类型来确保你没有在 master 节点上运行。 from locust import events
from locust.runners import MasterRunner
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
if not isinstance(environment.runner, MasterRunner):
print("Beginning test setup")
else:
print("Started test from Master node")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
if not isinstance(environment.runner, MasterRunner):
print("Cleaning up test data")
else:
print("Stopped test from Master node")
-
您还可以使用事件来添加 自定义命令行参数 。 -
若要查看可用事件的完整列表,请参见 事件挂钩 。 -
请求上下文
-
request event 有一个上下文参数,使您能够传递有关请求的数据(例如用户名、标签等)。 -
可以在对 request 方法的调用中直接设置它。 class MyUser(HttpUser):
@task
def t(self):
self.client.post("/login", json={"username": "foo"})
self.client.get("/other_request", context={"username": "foo"})
@events.request.add_listener
def on_request(context, **kwargs):
if context:
print(context["username"])
-
也可以在 User 级别上设置,通过重写 User.context ()方法。 class MyUser(HttpUser):
def context(self):
return {"username": self.username}
@task
def t(self):
self.username = "foo"
self.client.post("/login", json={"username": self.username})
@events.request.add_listener
def on_request(context, **kwargs):
print(context["username"])
-
添加 Web 路由
-
Locust 使用 Flask 服务来产生网页用户界面,因此可以很容易添加 web end-points 到网页用户界面。通过监听 init 事件,我们可以获取到 Flask app 实例的引用,并使用它来设置一个新的路由。 from locust import events
@events.init.add_listener
def on_locust_init(environment, **kw):
@environment.web_ui.app.route("/added_page")
def my_added_page():
return "Another page"
-
启动 locust 后就可以浏览 http://127.0.0.1:8089/added_page 。 -
扩展 Web 用户界面
- 你还可以使用 Flask Blueprints 和 templates,不仅能添加 web 路由,而且能扩展的网页界面,让你的自定义数据可以和 Locust 的统计数据一起展示。这是更高级的,因为它还涉及到编写和包含 HTML 和 Javascript 文件,通过路由服务,但可以大大提高实用性和可定制的 web UI。
- 在 Locust 源代码的 examples 目录中可以找到一个扩展 web UI 的工作示例,包括 HTML 和 Javascript 示例文件。
-
运行一个后台 greenlet
-
因为 locust 文件只是“代码”,所以没有什么可以阻止您生成自己的 greenlet 来与实际的负载/用户并行运行。 -
例如,你可以监视测试的失败率,如果超过某个阈值,你可以停止运行。 from locust import events
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, MasterRunner, LocalRunner
def checker(environment):
while not environment.runner.state in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
time.sleep(1)
if environment.runner.stats.total.fail_ratio > 0.2:
print(f"fail ratio was {environment.runner.stats.total.fail_ratio}, quitting")
environment.runner.quit()
return
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
if isinstance(environment.runner, MasterRunner) or isinstance(environment.runner, LocalRunner):
gevent.spawn(checker, environment)
-
将 locustfiles 参数化
- 基本环境变量
- 自定义参数
-
您可以使用 init_command_line_parser Event 向 Locust 添加自己的命令行参数。自定义参数也可以在 web UI 中显示和编辑。 from locust import HttpUser, task, events
@events.init_command_line_parser.add_listener
def _(parser):
parser.add_argument("--my-argument", type=str, env_var="LOCUST_MY_ARGUMENT", default="", help="It's working")
parser.add_argument("--my-ui-invisible-argument", include_in_web_ui=False, default="I am invisible")
@events.test_start.add_listener
def _(environment, **kw):
print(f"Custom argument supplied: {environment.parsed_options.my_argument}")
class WebsiteUser(HttpUser):
@task
def my_task(self):
print(f"my_argument={self.environment.parsed_options.my_argument}")
print(f"my_ui_invisible_argument={self.environment.parsed_options.my_ui_invisible_argument}")
-
在运行 Locust 分布式时,自定义参数会在运行开始时自动转发给 worker(但不是在此之前,因此在测试实际开始之前不能依赖于转发的参数)。 -
测试数据管理
- 有许多方法可以将测试数据输入到测试中(毕竟,您的测试只是一个 Python 程序,它可以做 Python 所能做的任何事情)。Locust 的事件使您能够细粒度地控制何时获取/释放测试数据。详细的示例
-
更多示例参见 locust-plugins
日志
-
Locust 使用 Python 内置的 logging 框架 来处理日志记录。 -
Locust 默认的日志配置是直接将日志消息写入 stderr。–loglevel 和 --logfile 可用于更改日志级别程和/或将日志输出方式转换为文件。 -
默认的日志配置会安装 root logger 即 locust.* logger,因此在您自己的测试脚本中使用 root logger 以及 --logfile,将把日志输出到文件。 import logging
logging.info("this log message will go wherever the other locust log messages go")
-
还可以使用 --skip-log-setup 选项在您自己的测试脚本中控制整个日志配置。然后,您必须自己配置 logging 。 -
Locust loggers
- 下面是 Locust 中使用的 loggers 的表格(供手动配置日志设置时参考) 。
- locust:Locust 命名空间,用于所有的 logger,比如 locust.main, locust.runners 等。
- locust.stats_logger:这个 logger 用于定期将当前的统计数据打印到控制台。使用 --logfile 时,统计数据默认不会输出到日志文件。
将 locust 作为三方库使用
-
可以从您自己的 Python 代码启动负载测试,而不是使用 locust 命令运行 Locust。 -
首先创建一个 Environment 实例。 from locust.env import Environment
env = Environment(user_classes=[MyTestUser])
-
Environment 实例的 create_local_runner, create_master_runner, create_worker_runner 可用于启动一个 Runner 实例,它可用于启动负载测试。 env.create_local_runner()
env.runner.start(5000, spawn_rate=20)
env.runner.greenlet.join()
-
还可以绕过分派和分发逻辑,手动控制产生的用户。 new_users = env.runner.spawn_users({MyUserClass.__name__: 2})
new_users[1].my_custom_token = "custom-token-2"
new_users[0].my_custom_token = "custom-token-1"
-
上面的例子只能在独立模式下工作,并且是一个实验性的特性,这意味着它可以在未来的版本中被删除。但是,如果您希望对产生的用户进行细粒度控制,那么它是非常有用的。 -
不要试图在相同的 Python 进程中创建 master runner 和 worker(s)。它不起作用,即使起作用,也不会比只运行一个 LocalRunner 给你带来更好的性能。每个 worker 都必须在自己的进程中运行,这是没有办法的。 -
我们还可以使用 Environment 实例的 create_web_ui 方法来启动一个 Web UI,用于查看统计数据,并控制 runner (例如启动和停止负载测试) 。 env.create_local_runner()
env.create_web_ui()
env.web_ui.greenlet.join()
-
完整实例 import gevent
from locust import HttpUser, task, between
from locust.env import Environment
from locust.stats import stats_printer, stats_history
from locust.log import setup_logging
setup_logging("INFO", None)
class User(HttpUser):
wait_time = between(1, 3)
host = "https://docs.locust.io"
@task
def my_task(self):
self.client.get("/")
@task
def task_404(self):
self.client.get("/non-existing-path")
env = Environment(user_classes=[User])
env.create_local_runner()
env.create_web_ui("127.0.0.1", 8089)
gevent.spawn(stats_printer(env.stats))
gevent.spawn(stats_history, env.runner)
env.runner.start(1, spawn_rate=10)
gevent.spawn_later(60, lambda: env.runner.quit())
env.runner.greenlet.join()
env.web_ui.stop()
扩展阅读/知识库
|