此篇博客主要介绍 ESP-IDF 里的 MQTT 示例相关知识,分为以下几个小节:
- ESP-MQTT 介绍
- MQTT 基本知识简介
- ESP-IDF MQTT 示例入门
- ESP-MQTT 常见问题
1 ESP-MQTT 介绍
在 ESP-IDF 中,MQTT 部分主要使用到了 ESP-MQTT 库,ESP-MQTT 是 MQTT 协议客户端的实现(MQTT 是轻量级的发布/订阅消息协议),它具备以下特征:
- 支持 MQTT over TCP、SSL with mbedtls、MQTT over Websocket、MQTT over Websocket Secure。
- 可轻松配置 URI
- 多个实例(一个应用程序中有多个客户端)
- 支持订阅、发布、身份验证、last will 消息、keep alive ping 和所有 3 个 QoS 等级,基本组成了一个功能齐全的客户端。
- MQTT 分为 4 个版本:MQTT 5,MQTT 3.1.1,MQTT 3.1,MQTT - SN v1.2. ESP-IDF 支持的版本为 MQTT 3.1.1 和 MQTT 3.1
2 MQTT 基本知识简介
请参考 MQTT 基本知识简介.
3 ESP-IDF MQTT 示例入门
ESP-IDF 里主要有以下 MQTT 示例:
以下为 MQTT 示例里的主要配置。
3.1 URI
目前支持 mqtt, mqtts, ws, wss 这几种 URI 类型。以下是 mqtt_tcp 对应的代码段:
const esp_mqtt_client_config_t mqtt_cfg = {
.uri = "mqtt://mqtt.eclipseprojects.io",
};
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);
esp_mqtt_client_start(client);
配置上述代码段里的 broker URI 即可。
如果需要 mqtt_ssl 等加密套件,对应代码段如下:
const esp_mqtt_client_config_t mqtt_cfg = {
.uri = "mqtts://mqtt.eclipseprojects.io:8883",
.event_handle = mqtt_event_handler,
.cert_pem = (const char *)mqtt_eclipse_org_pem_start,
};
此时要从服务器获得证书,以服务器 mqtt.eclipseprojects.io 为例,可以使用以下指令来生成:
openssl s_client -showcerts -connect mqtt.eclipseprojects.io:8883 </dev/null 2>/dev/null|openssl x509 -outform PEM >mqtt_eclipse_org.pem
如果证书不是以空字符结尾,则 cert_len 还应设置。其他与 SSL 相关的配置参数如下:
-
use_global_ca_store :使用全局证书存储来验证服务器证书,更多信息可以查看 esp-tls.h -
client_cert_pem :指向用于 SSL 相互认证的 PEM 或 DER 格式的证书数据的指针,默认为 NULL,如果不需要相互认证则不需要 -
client_cert_len :client_cert_pem 指向的缓冲区的长度。对于以 NULL 结尾的 pem,可能为 0 -
client_key_pem :指向用于 SSL 相互认证的 PEM 或 DER 格式的私钥数据的指针,默认为 NULL,如果不需要相互认证则不需要 -
client_key_len :client_key_pem 指向的缓冲区的长度。对于以 NULL 结尾的 pem,可能为 0 -
psk_hint_key :指向 esp-tls.h 中定义的 PSK 结构的指针,以启用 PSK 身份验证(作为证书验证的替代方法)。如果不是 NULL 且服务器/客户端证书为 NULL,则启用 PSK -
alpn_protos :用于 ALPN 的以 NULL 结尾的协议列表
3.2 最后遗嘱
MQTT 允许最后遗嘱 (LWT) 消息在客户端异常断开连接时通知其他客户端。由 esp_mqtt_client_config_t 结构体中的以下字段配置。
3.3 其他配置参数
-
disable_clean_session :确定连接消息的干净会话标志,默认为干净会话 -
keepalive :确定客户端在断开连接前等待 ping 响应的秒数,默认为 120 秒。 -
disable_auto_reconnect :启用以阻止客户端在错误或断开连接后重新连接到服务器 -
user_context :将传递给事件处理程序的自定义上下文 -
task_prio :MQTT 任务优先级,默认为 5 -
task_stack :MQTT 任务堆栈大小,默认为 6144 字节,设置此项将覆盖 menuconfig 中的设置 -
buffer_size :MQTT 发送/接收缓冲区的大小,默认为 1024 字节 -
username :指向用于连接到代理的用户名的指针 -
password :指向用于连接到代理的密码的指针 -
client_id :指向客户端 ID 的指针,默认为ESP32_%CHIPID%其中 %CHIPID% 是十六进制格式的 MAC 地址的最后 3 个字节 -
host :MQTT 代理域(ipv4 作为字符串),设置 uri 将覆盖此 -
port :MQTT 代理端口,在 uri 中指定端口将覆盖此 -
transport :设置传输协议,设置 uri 后会覆盖此设置 -
refresh_connection_after_ms :在这个值之后刷新连接(以毫秒为单位) -
event_handle :处理 MQTT 事件作为遗留模式下的回调 -
event_loop_handle :MQTT 事件循环库的句柄
3.4 更改项目配置菜单中的设置
可以通过 idf.py menuconfig ,在 Component config -> ESP-MQTT Configuration 下使用 找到 MQTT 的配置。
以下设置可用:
CONFIG_MQTT_PROTOCOL_311 : 启用 3.1.1 版本的 MQTT 协议
CONFIG_MQTT_TRANSPORT_SSL , CONFIG_MQTT_TRANSPORT_WEBSOCKET :启用特定的 MQTT 传输层,例如 SSL、WEBSOCKET、WEBSOCKET_SECURE
CONFIG_MQTT_CUSTOM_OUTBOX :禁用 mqtt_outbox 的默认实现,因此可以提供特定的实现
3.5 事件
MQTT 客户端可能会发布以下事件:
MQTT_EVENT_BEFORE_CONNECT :客户端已初始化并即将开始连接到代理。
MQTT_EVENT_CONNECTED :客户端已成功建立与代理的连接。客户端现在已准备好发送和接收数据。
MQTT_EVENT_DISCONNECTED :由于无法读取或写入数据,例如因为服务器不可用,客户端已中止连接。
MQTT_EVENT_SUBSCRIBED :代理已确认客户端的订阅请求。事件数据将包含订阅消息的消息 ID。
MQTT_EVENT_UNSUBSCRIBED :代理已确认客户端的退订请求。事件数据将包含取消订阅消息的消息 ID。
MQTT_EVENT_PUBLISHED :代理已确认客户端的发布消息。这只会针对服务质量级别 1 和 2 发布,因为级别 0 不使用确认。事件数据将包含发布消息的消息 ID。
MQTT_EVENT_DATA :客户端已收到发布消息。事件数据包含:消息 ID、发布到的主题名称、接收到的数据及其长度。对于超出内部缓冲区的数据,将发布多个 MQTT_EVENT_DATA 并更新来自事件数据的 current_data_offset 和 total_data_len 以跟踪碎片化消息。
MQTT_EVENT_ERROR :客户端遇到错误。esp_mqtt_error_type_t 来自事件数据中的 error_handle 可以用来进一步判断错误的类型。错误的类型将决定 error_handle 结构的哪些部分被填充。
3.6 Publish & Subscribe API 使用
参考 mqtt_tcp 例程,如下:
static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
{
esp_mqtt_client_handle_t client = event->client;
int msg_id;
switch (event->event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0);
ESP_LOGI(

TAG, "sent subscribe successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1");
ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
break;
case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED:
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
return ESP_OK;
}
可以看到对应的 API :
-
Publish : int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);
client mqtt client handletopic topic stringdata payload string (set to NULL, sending empty payload message)len data length, if set to 0, length is calculated from payload stringqos qos of publish messageretain retain flag -
Subscribe:int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos);
client mqtt client handletopic topic stringqos qos of publish message
4 要点说明
4.1 Retain 标志位
当使用 MQTT 客户端发布消息(PUBLISH)时,如果将 RETAIN 标志位设置为 true,那么 MQTT 服务器会将最近收到的一条 RETAIN 标志位为 true 的消息保存在服务器端(内存或文件)。 特别注意:MQTT 服务器只会为每一个 Topic 保存最近收到的一条 RETAIN 标志位为 true 的消息!也就是说,如果 MQTT 服务器上已经为某个 Topic 保存了一条 Retained 消息,当客户端再次发布一条新的 Retained 消息,那么服务器上原来的那条消息会被覆盖
每当 MQTT 客户端连接到 MQTT 服务器并订阅了某个 Topic,如果该 Topic 下有 Retained 消息,那么 MQTT 服务器会立即向客户端推送该条 Retained 消息
- 发布 RETAIN 消息:如果想让 MQTT 服务器为某个 Topic 保留消息,只需要在发布消息的时候指定 RETAIN 标志位为 true 即可
- 删除 RETAIN 消息:如果客户端想让 MQTT 服务器删除某个 Topic 下保存的 Retained 消息,唯一的方法是向 MQTT 服务器发布一条 RETAIN 标志位为 true 的空消息
4.2 LWT 标志位
LWT 全称为 Last Will and Testament,也就是我们在连接到 Broker 时提到的遗愿,包括遗愿主题、遗愿 QoS、遗愿消息等。
当 Broker 检测到 Client 非正常地断开连接的时候,就会向遗愿主题里面发布一条消息。遗愿相关的设置是在建立连接的时候,在 CONNECT 数据包里面指定的。
- Will Flag:是否使用 LWT
- Will Topic:遗愿主题名,不可使用通配符
- Will Qos:发布遗愿消息时使用的 QoS
- Will Retain:遗愿消息的 Retain 标识
- Will Message:遗愿消息内容
Broker 在以下情况下认为 Client 是非正常断开连接的:
- Broker 检测到底层的 I/O 异常;
- Client 未能在 Keep Alive 的间隔内和 Broker 之间有消息交互;
- Client 在关闭底层 TCP 连接前没有发送 DISCONNECT 数据包;
- Broker 因为协议错误关闭和 Client 的连接,比如 Client 发送了一个格式错误的 MQTT 数据包。
如果 Client 通过发布 DISCONNECT 数据包断开连接,这个属于正常断开连接,不会触发 LWT 的机制,同时,Broker 还会丢弃掉这个 Client 在连接时指定的 LWT 参数。通常,如果我们关心一个设备,比如传感器的连接状态,可以使用 LWT。
5 ESP-MQTT 常见问题
Q : disable_clean_session 这个参数配置的作用是什么? A :cleanSession 标志是 MQTT 协议中对一个客户端建立 TCP 连接后是否关心之前状态的定义。具体语义如下: cleanSession = true:客户端再次上线时,将不再关心之前所有的订阅关系以及离线消息。 cleanSession = false:客户端再次上线时,还需要处理之前的离线消息,而之前的订阅关系也会持续生效。
Q : 为什么单次只能 publish 最多 1 K 的数据? A :这是因为 buffer_size 默认为 1 K, 可根据应用需求自行配置。
6 参考文章
|