MQTT (消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922 )下基于发布/订阅范式的消息协议。它工作在TCP/IP 协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议 。
MQTT 是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT 协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M )通信和物联网(IoT )。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
更多关于MQTT协议的介绍可自行百度,下文主要介绍使用libhv 中MQTT 的实现与使用。
MQTT实现
实现代码位于 mqtt目录 整个实现只有不到700行,以往教程中我是很少讲源码解析的,但MQTT的实现非常精简且经典,非常适合用来学习,从而掌握使用libhv开发自定义应用层协议 的技巧。
MQTT固定头格式(1字节flags + varint 编码的长度字段):
最简单的MQTT消息(比如长度为0的PING 消息),只需2个字节,所以MQTT协议是非常精简的协议。
协议定义
见 mqtt_protocol.h
#define DEFAULT_MQTT_PORT 1883
#define MQTT_PROTOCOL_V31 3
#define MQTT_PROTOCOL_V311 4
#define MQTT_PROTOCOL_V5 5
#define MQTT_PROTOCOL_NAME "MQTT"
#define MQTT_PROTOCOL_NAME_v31 "MQIsdp"
#define MQTT_CONN_CLEAN_SESSION 0x02
#define MQTT_CONN_HAS_WILL 0x04
#define MQTT_CONN_WILL_RETAIN 0x20
#define MQTT_CONN_HAS_PASSWORD 0x40
#define MQTT_CONN_HAS_USERNAME 0x80
typedef enum {
MQTT_TYPE_CONNECT = 1,
MQTT_TYPE_CONNACK = 2,
MQTT_TYPE_PUBLISH = 3,
MQTT_TYPE_PUBACK = 4,
MQTT_TYPE_PUBREC = 5,
MQTT_TYPE_PUBREL = 6,
MQTT_TYPE_PUBCOMP = 7,
MQTT_TYPE_SUBSCRIBE = 8,
MQTT_TYPE_SUBACK = 9,
MQTT_TYPE_UNSUBSCRIBE = 10,
MQTT_TYPE_UNSUBACK = 11,
MQTT_TYPE_PINGREQ = 12,
MQTT_TYPE_PINGRESP = 13,
MQTT_TYPE_DISCONNECT = 14,
} mqtt_type_e;
typedef enum {
MQTT_CONNACK_ACCEPTED = 0,
MQTT_CONNACK_REFUSED_PROTOCOL_VERSION = 1,
MQTT_CONNACK_REFUSED_IDENTIFIER_REJECTED = 2,
MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3,
MQTT_CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4,
MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5,
} mqtt_connack_e;
typedef struct mqtt_head_s {
unsigned char type: 4;
unsigned char dup: 1;
unsigned char qos: 2;
unsigned char retain: 1;
unsigned int length;
} mqtt_head_t;
typedef struct mqtt_message_s {
unsigned int topic_len;
const char* topic;
unsigned int payload_len;
const char* payload;
unsigned char qos;
unsigned char retain;
} mqtt_message_t;
头文件里主要定义了mqtt_type_e 消息类型、mqtt_head_t 头部、mqtt_message_t 消息。
协议里稍微复杂的是MQTT_TYPE_CONNECT ,即连接成功后发送的登录认证消息。 MQTT连接标示:
0 | 1 | 2 | 3-4 | 5 | 6 | 7 |
---|
reserved | clean_session | has_will | will_qos | will_retain | has_password | has_username |
接口定义
见 mqtt_client.h
typedef struct mqtt_client_s mqtt_client_t;
typedef void (*mqtt_client_cb)(mqtt_client_t* cli, int type);
struct mqtt_client_s {
char host[256];
int port;
reconn_setting_t* reconn_setting;
unsigned char protocol_version;
unsigned char clean_session: 1;
unsigned char ssl: 1;
unsigned char alloced_ssl_ctx: 1;
unsigned short keepalive;
char client_id[64];
mqtt_message_t* will;
char username[64];
char password[64];
mqtt_head_t head;
int error;
int mid;
mqtt_message_t message;
mqtt_client_cb cb;
void* userdata;
hloop_t* loop;
hio_t* io;
htimer_t* reconn_timer;
hssl_ctx_t ssl_ctx;
hmutex_t mutex_;
};
HV_EXPORT mqtt_client_t* mqtt_client_new(hloop_t* loop DEFAULT(NULL));
HV_EXPORT void mqtt_client_run (mqtt_client_t* cli);
HV_EXPORT void mqtt_client_stop(mqtt_client_t* cli);
HV_EXPORT void mqtt_client_free(mqtt_client_t* cli);
HV_EXPORT void mqtt_client_set_id(mqtt_client_t* cli, const char* id);
HV_EXPORT void mqtt_client_set_will(mqtt_client_t* cli,
mqtt_message_t* will);
HV_EXPORT void mqtt_client_set_auth(mqtt_client_t* cli,
const char* username, const char* password);
HV_EXPORT void mqtt_client_set_callback(mqtt_client_t* cli, mqtt_client_cb cb);
HV_EXPORT void mqtt_client_set_userdata(mqtt_client_t* cli, void* userdata);
HV_EXPORT void* mqtt_client_get_userdata(mqtt_client_t* cli);
HV_EXPORT int mqtt_client_get_last_error(mqtt_client_t* cli);
HV_EXPORT int mqtt_client_set_ssl_ctx(mqtt_client_t* cli, hssl_ctx_t ssl_ctx);
HV_EXPORT int mqtt_client_new_ssl_ctx(mqtt_client_t* cli, hssl_ctx_opt_t* opt);
HV_EXPORT int mqtt_client_set_reconnect(mqtt_client_t* cli,
reconn_setting_t* reconn);
HV_EXPORT int mqtt_client_reconnect(mqtt_client_t* cli);
HV_EXPORT int mqtt_client_connect(mqtt_client_t* cli,
const char* host,
int port DEFAULT(DEFAULT_MQTT_PORT),
int ssl DEFAULT(0));
HV_EXPORT int mqtt_client_disconnect(mqtt_client_t* cli);
HV_EXPORT int mqtt_client_publish(mqtt_client_t* cli,
mqtt_message_t* msg);
HV_EXPORT int mqtt_client_subscribe(mqtt_client_t* cli,
const char* topic, int qos DEFAULT(0));
HV_EXPORT int mqtt_client_unsubscribe(mqtt_client_t* cli,
const char* topic);
接口列表:
mqtt_client_new :新建MQTT客户端结构体mqtt_client_free :释放MQTT客户端结构体mqtt_client_run :运行MQTT客户端mqtt_client_stop :停止MQTT客户端mqtt_client_set_id :设置客户端IDmqtt_client_set_will :设置遗嘱mqtt_client_set_auth :设置认证用户名密码mqtt_client_set_callback :设置回调mqtt_client_set_userdata :设置用户数据mqtt_client_get_userdata :获取用户数据mqtt_client_get_last_error :获取最后的错误码mqtt_client_set_ssl_ctx :设置SSL_CTX(用于SSL/TLS加密通信)mqtt_client_new_ssl_ctx :新建SSL_CTXmqtt_client_set_reconnect :设置重连mqtt_client_reconnect :重连mqtt_client_connect :开始连接mqtt_client_disconnect :断开连接mqtt_client_publish :发布mqtt_client_subscribe :订阅mqtt_client_unsubscribe :取消订阅
下面我们介绍几个主要接口的实现:
mqtt_client_run
void mqtt_client_run (mqtt_client_t* cli) {
if (!cli || !cli->loop) return;
hloop_run(cli->loop);
}
mqtt_client_run 就是调用hloop_run ,启动一个事件循环;
mqtt_client_stop
void mqtt_client_stop(mqtt_client_t* cli) {
if (!cli || !cli->loop) return;
hloop_stop(cli->loop);
}
mqtt_client_stop 就是调用 hloop_stop ,停止事件循环;
mqtt_client_connect 连接
int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl) {
if (!cli) return -1;
safe_strncpy(cli->host, host, sizeof(cli->host));
cli->port = port;
cli->ssl = ssl;
hio_t* io = hio_create_socket(cli->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
if (io == NULL) return -1;
if (ssl) {
if (cli->ssl_ctx) {
hio_set_ssl_ctx(io, cli->ssl_ctx);
}
hio_enable_ssl(io);
}
cli->io = io;
hevent_set_userdata(io, cli);
hio_setcb_connect(io, on_connect);
hio_setcb_close(io, on_close);
return hio_connect(io);
}
hio_create_socket ,创建一个套接字,返回一个hio_t 对象;hio_enable_ssl 启用SSL/TLS ,当然你可以调用mqtt_client_new_ssl_ctx 去设置证书等选项;hevent_set_userdata :设置用户数据(绑定上下文);hio_setcb_connect :设置连接回调函数;hio_setcb_close :设置关闭回调函数;hio_connect :开始连接;
mqtt_client_disconnect 断开连接
int mqtt_client_disconnect(mqtt_client_t* cli) {
if (!cli || !cli->io) return -1;
mqtt_client_set_reconnect(cli, NULL);
mqtt_send_disconnect(cli->io);
return hio_close(cli->io);
}
mqtt_client_set_reconnect :因为是主动断开连接,首先取消重连来避免触发断线重连;mqtt_send_disconnect :发送一个MQTT主动断开连接的消息;hio_close :关闭连接;
连接回调 on_connect
static void on_connect(hio_t* io) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
if (cli->cb) {
cli->head.type = MQTT_TYPE_CONNECT;
cli->cb(cli, cli->head.type);
}
if (cli->reconn_setting) {
reconn_setting_reset(cli->reconn_setting);
}
static unpack_setting_t mqtt_unpack_setting;
mqtt_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
mqtt_unpack_setting.package_max_length = DEFAULT_MQTT_PACKAGE_MAX_LENGTH;
mqtt_unpack_setting.body_offset = 2;
mqtt_unpack_setting.length_field_offset = 1;
mqtt_unpack_setting.length_field_bytes = 1;
mqtt_unpack_setting.length_field_coding = ENCODE_BY_VARINT;
hio_set_unpack(io, &mqtt_unpack_setting);
hio_setcb_read(io, on_packet);
hio_read(io);
mqtt_client_login(cli);
}
hio_set_unpack : 设置拆包规则,支持固定包长 、分隔符 、头部长度字段 三种常见的拆包方式,调用该接口设置拆包规则后,内部会根据拆包规则处理粘包与分包,保证回调上来的是完整的一包数据,大大节省了上层处理粘包与分包的成本,MQTT 协议对应头部长度字段 这种拆包规则,关于该接口的更详细介绍见 libhv教程14–200行实现一个纯C版jsonrpc框架hio_setcb_read :设置读回调,因为上面设置了拆包规则,回调上来的就是完整的一包数据,所以叫on_packet ;hio_read :开始读;mqtt_client_login :MQTT登录认证;
断链回调 on_close + 重连 mqtt_client_reconnect
static void reconnect_timer_cb(htimer_t* timer) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(timer);
if (cli == NULL) return;
cli->reconn_timer = NULL;
mqtt_client_reconnect(cli);
}
static void on_close(hio_t* io) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
if (cli->cb) {
cli->head.type = MQTT_TYPE_DISCONNECT;
cli->cb(cli, cli->head.type);
}
if (cli->reconn_setting && reconn_setting_can_retry(cli->reconn_setting)) {
uint32_t delay = reconn_setting_calc_delay(cli->reconn_setting);
cli->reconn_timer = htimer_add(cli->loop, reconnect_timer_cb, delay, 1);
hevent_set_userdata(cli->reconn_timer, cli);
}
}
int mqtt_client_reconnect(mqtt_client_t* cli) {
mqtt_client_connect(cli, cli->host, cli->port, cli->ssl);
return 0;
}
断链回调里除了调用cli->cb 通知上层掉线外,另外就是判断如果设置了重连,则启动一个定时器一段时间后再尝试重连;
reconn_setting_can_retry :判断是否还有剩余重连次数;reconn_setting_calc_delay :计算重连延时;htimer_add :添加一个定时器;mqtt_client_reconnect :因为连接时已经记录了host、port、ssl 等信息,重连就是再次调用mqtt_client_connect ;
包回调 on_packet
static void on_packet(hio_t* io, void* buf, int len) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
unsigned char* p = (unsigned char*)buf;
unsigned char* end = p + len;
memset(&cli->head, 0, sizeof(mqtt_head_t));
int headlen = mqtt_head_unpack(&cli->head, p, len);
if (headlen <= 0) return;
p += headlen;
switch (cli->head.type) {
case MQTT_TYPE_CONNACK:
{
if (cli->head.length < 2) {
hloge("MQTT CONNACK malformed!");
hio_close(io);
return;
}
unsigned char conn_flags = 0, rc = 0;
POP8(p, conn_flags);
POP8(p, rc);
if (rc != MQTT_CONNACK_ACCEPTED) {
cli->error = rc;
hloge("MQTT CONNACK error=%d", cli->error);
hio_close(io);
return;
}
if (cli->keepalive) {
hio_set_heartbeat(io, cli->keepalive * 1000, mqtt_send_ping);
}
}
break;
case MQTT_TYPE_PUBLISH:
{
if (cli->head.length < 2) {
hloge("MQTT PUBLISH malformed!");
hio_close(io);
return;
}
memset(&cli->message, 0, sizeof(mqtt_message_t));
POP16(p, cli->message.topic_len);
if (end - p < cli->message.topic_len) {
hloge("MQTT PUBLISH malformed!");
hio_close(io);
return;
}
cli->message.topic = (char*)p;
p += cli->message.topic_len;
if (cli->head.qos > 0) {
if (end - p < 2) {
hloge("MQTT PUBLISH malformed!");
hio_close(io);
return;
}
POP16(p, cli->mid);
}
cli->message.payload_len = end - p;
if (cli->message.payload_len > 0) {
cli->message.payload = (char*)p;
}
cli->message.qos = cli->head.qos;
if (cli->message.qos == 0) {
} else if (cli->message.qos == 1) {
mqtt_send_head_with_mid(io, MQTT_TYPE_PUBACK, cli->mid);
} else if (cli->message.qos == 2) {
mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREC, cli->mid);
}
}
break;
case MQTT_TYPE_PUBACK:
case MQTT_TYPE_PUBREC:
case MQTT_TYPE_PUBREL:
case MQTT_TYPE_PUBCOMP:
{
if (cli->head.length < 2) {
hloge("MQTT PUBACK malformed!");
hio_close(io);
return;
}
POP16(p, cli->mid);
if (cli->head.type == MQTT_TYPE_PUBREC) {
mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREL, cli->mid);
} else if (cli->head.type == MQTT_TYPE_PUBREL) {
mqtt_send_head_with_mid(io, MQTT_TYPE_PUBCOMP, cli->mid);
}
}
break;
case MQTT_TYPE_SUBACK:
{
if (cli->head.length < 2) {
hloge("MQTT SUBACK malformed!");
hio_close(io);
return;
}
POP16(p, cli->mid);
}
break;
case MQTT_TYPE_UNSUBACK:
{
if (cli->head.length < 2) {
hloge("MQTT UNSUBACK malformed!");
hio_close(io);
return;
}
POP16(p, cli->mid);
}
break;
case MQTT_TYPE_PINGREQ:
mqtt_send_pong(io);
return;
case MQTT_TYPE_PINGRESP:
return;
case MQTT_TYPE_DISCONNECT:
hio_close(io);
return;
default:
hloge("MQTT client received wrong type=%d", (int)cli->head.type);
hio_close(io);
return;
}
if (cli->cb) {
cli->cb(cli, cli->head.type);
}
}
包回调里,就是先调用mqtt_head_unpack 解析MQTT头部,然后根据消息类型做对应处理,都是些协议相关的细节了,值得一提的是hio_set_heartbeat 设置了一个心跳函数,每隔一段时间发送一个MQTT应用层心跳包 来保活。
使用示例
见 examples/mqtt
编译
git clone https://gitee.com/libhv/libhv.git
./configure --with-mqtt
make
broker
可以使用 mosquitto download
订阅端
bin/mqtt_sub 127.0.0.1 1883 hello
发布端
bin/mqtt_pub 127.0.0.1 1883 hello world
代码流程图
Created with Rapha?l 2.3.0
main
mqtt_client_new
创建MQTT客户端
mqtt_client_connect
开始连接
mqtt_client_run
运行MQTT客户端
on_connack
连接登陆成功回调
mqtt_client_subscribe
订阅
mqtt_client_publish
发布
mqtt_client_disconnect
断开连接
on_disconnect
断链回调
mqtt_client_stop
停止MQTT客户端
mqtt_client_free
释放MQTT客户端
结束
|