一、引言
由于毕设需求,需要通过MQTT上传数据与下发命令,所以小白的我学习了MQTT,虽然理论部分还是懵懵懂懂,但别人的实现代码看懂也还算凑合。对于mqtt的入门我是跟着B站一位up主物联网技术大神 的教学视频学习的,他是以stm32开发板来讲解的。由于个人能力薄弱所以自己整合的代码有很大一部分是直接copy这位up主的(嘿嘿 😮),特别mqtt.c中的,不过也做了些许修改以适用于树莓派,在此多谢这位up主。由于onenet是一个免费开放平台,所以对于没钱党的我选择了它。 我选择把它整合为树莓派可用也是有原因的:第一,该up主的代码适用于stm32,并且我也直接用他的mqtt框架在STM32上开发,但后来发现不跑系统直接用一个while循环后面有很多问题就难以解决,而自己又没学过可在stm32上运行的实时操作系统,但对Linux有一点了解,所以这是原因之一。第二,网上也有很多mqtt源码可适用于树莓派,但为了加深对mqtt实现过程的了解,我选择自己整合一下代码,加深印象。
up主:物联网技术大神 视频地址:https://b23.tv/mJEpQh 参考文章:https://blog.csdn.net/u010835747/article/details/108485662
推荐对于mqtt入门可参考:菜鸟教程的MQTT 入门介绍 推荐网上的文章如:MQTT 嵌入式 C语言 客户端libemqtt源码解析 C语言实现mosquitto发布、订阅消息
二、代码
话不多,说先上代码。 main.c
#include "mqtt.h"
#include "tcp.h"
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include<time.h>
#include <pthread.h>
#include <stdlib.h>
static int subscribe_flag = 1;
static int conect_flag = 1;
void Cmd_Handler(unsigned char* CmdBuf)
{
char Cmd1[16] = "open_Led";
char Cmd2[16] = "open_Beep";
char buf1[24] = "{\"LED\":1}";
if(!memcmp(CmdBuf,Cmd1,strlen(Cmd1))){
printf("灯已打开\n");
PayloadPublish(buf1,strlen(buf1));
}else if(!memcmp(CmdBuf,Cmd2,strlen(Cmd2))){
printf("蜂鸣器已打开\n");
}else{
printf("未知指令\n");
}
}
void *RcvMessage_thread(void *date)
{
unsigned int send_len = 0;
unsigned char buff[256] = {0};
unsigned char MQTT_RcvData[400];
unsigned char back_buff[100] = {0};
while(1){
if(conect_flag == 5){
printf("多次连接失败,请检查设备信息是否正确\n");
printf("程序已退出\n");
exit(0);
}
memset(MQTT_RcvData,0,sizeof(MQTT_RcvData));
memset(buff,0,sizeof(buff));
send_len = tcp_rcv(buff,sizeof(buff));
if(send_len < 0)
{
MY_MQTT_RECONECT();
continue;
}else{
memcpy(&MQTT_RcvData[2],buff,send_len);
MQTT_RcvData[0] = send_len/256;
MQTT_RcvData[1] = send_len%256;
}
if(MQTT_RcvData[2]==0x20){
switch(MQTT_RcvData[5]){
case 0x00 : printf("CONNECT报文成功\n");
MQTT_Subscribe(S_TOPIC_NAME,QoS_0);
conect_flag = 0;
break;
case 0x01 : printf("连接已拒绝,不支持的协议版本,准备重连\r\n");
MY_MQTT_RECONECT();
conect_flag++;
continue;
break;
case 0x02 : printf("连接已拒绝,不合格的客户端标识符,准备重连\r\n");
MY_MQTT_RECONECT();
conect_flag++;
continue;
break;
case 0x03 : printf("连接已拒绝,服务端不可用,准备重连\r\n");
MY_MQTT_RECONECT();
continue;
conect_flag++;
break;
case 0x04 : printf("连接已拒绝,无效的用户名或密码,准备重连\r\n");
MY_MQTT_RECONECT();
conect_flag++;
continue;
break;
case 0x05 : printf("连接已拒绝,未授权,准备重连\r\n");
MY_MQTT_RECONECT();
conect_flag++;
continue;
break;
default : printf("连接已拒绝,未知状态,准备重连\r\n");
MY_MQTT_RECONECT();
conect_flag++;
continue;
break;
}
}
else if(MQTT_RcvData[2]==0x90){
switch(MQTT_RcvData[6]){
case 0x00 :
case 0x01 : printf("订阅成功\r\n");
subscribe_flag = 0;
break;
default : printf("订阅失败,准备重新订阅\r\n");
sleep(1);
MQTT_Subscribe(S_TOPIC_NAME,QoS_0);
subscribe_flag++;
if(subscribe_flag == 5){
printf("多次订阅失败,准备重新连接!\r\n");
sleep(1);
MY_MQTT_RECONECT();
}
continue;
break;
}
}
else if(MQTT_RcvData[2]==0xD0){
printf("PING报文回复\r\n");
}
else if((MQTT_RcvData[2]==0x30)){
printf("服务器等级0推送\r\n");
MQTT_DealPushdata_Qs0(MQTT_RcvData,back_buff);
printf("Cmd: %s\n",back_buff);
Cmd_Handler(back_buff);
memset(back_buff,0,sizeof(back_buff));
}
}
}
void *ping_thread(void *date)
{
unsigned char buff[32] = {0};
unsigned int send_len = 0;
while(1)
{
if(conect_flag == 0){
memset(buff,0,sizeof(buff));
send_len = MQTT_ping(buff);
tcp_send(buff,send_len);
sleep(40);
}
}
}
int main()
{
pthread_t RcvMessageThread;
pthread_t PingThread;
My_MQTT_Init();
My_MQTT_CONECT();
pthread_create(&RcvMessageThread,NULL,RcvMessage_thread,NULL);
pthread_create(&PingThread,NULL,ping_thread,NULL);
while(conect_flag == 0);
char ba[40]="{\"temp\":22.8,\"humi\":55.4}";
MQTT_PublishQs0("kfb_topic",ba,strlen(ba));
pthread_join(RcvMessageThread,NULL);
pthread_join(PingThread,NULL);
return 0;
}
mqtt.c
#include <string.h>
#include "mqtt.h"
#include <stdio.h>
#include "tcp.h"
#include <unistd.h>
#include <stdlib.h>
char ClientID[128];
int ClientID_len;
char Username[128];
int Username_len;
char Passward[128];
int Passward_len;
static unsigned char MQTT_CONENCT_BUF[200];
static unsigned char MQTT_SUBSCRIBE_BUF[200];
static unsigned char MQTT_SEND_BUF[200];
static unsigned char MQTT_PUBLISH_BUF[200];
void IoT_Parameter_Init(void)
{
memset(ClientID,0,128);
sprintf(ClientID,"%s",DeviesID);
ClientID_len = strlen(ClientID);
memset(Username,0,128);
sprintf(Username,"%s",ProductID);
Username_len = strlen(Username);
memset(Passward,0,128);
sprintf(Passward,"%s",AuthInfo);
Passward_len = strlen(Passward);
}
unsigned int MQTT_ConectPack(unsigned char *buf)
{
int temp,Remaining_len,Fixed_len,Variable_len,Payload_len,Total_len;
Fixed_len = 1;
Variable_len = 10;
Payload_len = 2 + ClientID_len + 2 + Username_len + 2 + Passward_len;
Remaining_len = Variable_len + Payload_len;
memset(MQTT_CONENCT_BUF,0,sizeof(MQTT_CONENCT_BUF));
MQTT_CONENCT_BUF[0]=0x10;
do{
temp = Remaining_len%128;
Remaining_len = Remaining_len/128;
if(Remaining_len>0)
temp |= 0x80;
MQTT_CONENCT_BUF[Fixed_len] = temp;
Fixed_len++;
}while(Remaining_len>0);
MQTT_CONENCT_BUF[Fixed_len+0]=0x00;
MQTT_CONENCT_BUF[Fixed_len+1]=0x04;
MQTT_CONENCT_BUF[Fixed_len+2]=0x4D;
MQTT_CONENCT_BUF[Fixed_len+3]=0x51;
MQTT_CONENCT_BUF[Fixed_len+4]=0x54;
MQTT_CONENCT_BUF[Fixed_len+5]=0x54;
MQTT_CONENCT_BUF[Fixed_len+6]=0x04;
MQTT_CONENCT_BUF[Fixed_len+7]=0xC2;
MQTT_CONENCT_BUF[Fixed_len+8]=0x00;
MQTT_CONENCT_BUF[Fixed_len+9]=0x64;
MQTT_CONENCT_BUF[Fixed_len+10] = ClientID_len/256;
MQTT_CONENCT_BUF[Fixed_len+11] = ClientID_len%256;
memcpy(&MQTT_CONENCT_BUF[Fixed_len+12],ClientID,ClientID_len);
MQTT_CONENCT_BUF[Fixed_len+12+ClientID_len] = Username_len/256;
MQTT_CONENCT_BUF[Fixed_len+13+ClientID_len] = Username_len%256;
memcpy(&MQTT_CONENCT_BUF[Fixed_len+14+ClientID_len],Username,Username_len);
MQTT_CONENCT_BUF[Fixed_len+14+ClientID_len+Username_len] = Passward_len/256;
MQTT_CONENCT_BUF[Fixed_len+15+ClientID_len+Username_len] = Passward_len%256;
memcpy(&MQTT_CONENCT_BUF[Fixed_len+16+ClientID_len+Username_len],Passward,Passward_len);
Total_len = Fixed_len + Variable_len + Payload_len;
memcpy(buf,MQTT_CONENCT_BUF,Total_len);
return Total_len;
}
unsigned int MQTT_ping(unsigned char *buf)
{
buf[0] = 0xc0;
buf[1] = 0x00;
return 2;
}
void MQTT_Subscribe(char *topic_name, int QoS)
{
int Remaining_len, Fixed_len, Variable_len, Payload_len, Total_len;
memset(MQTT_SUBSCRIBE_BUF,0,sizeof(MQTT_SUBSCRIBE_BUF));
Fixed_len = 2;
Variable_len = 2;
Payload_len = 2 + strlen(topic_name) + 1;
Total_len = Fixed_len + Variable_len + Payload_len;
MQTT_SUBSCRIBE_BUF[0]=0x82;
MQTT_SUBSCRIBE_BUF[1]=Variable_len + Payload_len;
MQTT_SUBSCRIBE_BUF[2]=0x00;
MQTT_SUBSCRIBE_BUF[3]=0x01;
MQTT_SUBSCRIBE_BUF[4]=strlen(topic_name)/256;
MQTT_SUBSCRIBE_BUF[5]=strlen(topic_name)%256;
memcpy(&MQTT_SUBSCRIBE_BUF[6],topic_name,strlen(topic_name));
MQTT_SUBSCRIBE_BUF[6+strlen(topic_name)]=QoS;
tcp_send(MQTT_SUBSCRIBE_BUF,Total_len);
}
void MQTT_PublishQs0(char *topic, char *data, int data_len)
{
int temp,Remaining_len, Fixed_len, Variable_len, Payload_len, Total_len;
Fixed_len = 1;
Variable_len = 2 + strlen(topic);
Payload_len = data_len;
Remaining_len = Variable_len + Payload_len;
memset(MQTT_PUBLISH_BUF,0,sizeof(MQTT_PUBLISH_BUF));
MQTT_PUBLISH_BUF[0]=0x30;
do{
temp = Remaining_len%128;
Remaining_len = Remaining_len/128;
if(Remaining_len>0)
temp |= 0x80;
MQTT_PUBLISH_BUF[Fixed_len] = temp;
Fixed_len++;
}while(Remaining_len>0);
MQTT_PUBLISH_BUF[Fixed_len+0]=strlen(topic)/256;
MQTT_PUBLISH_BUF[Fixed_len+1]=strlen(topic)%256;
memcpy(&MQTT_PUBLISH_BUF[Fixed_len+2],topic,strlen(topic));
memcpy(&MQTT_PUBLISH_BUF[Fixed_len+2+strlen(topic)],data,data_len);
Total_len = Fixed_len + Variable_len + Payload_len;
tcp_send(MQTT_PUBLISH_BUF,Total_len);
}
unsigned int MQTT_DealPushdata_Qs0(unsigned char *redata,unsigned char *buf)
{
int re_len;
int pack_num;
int temp,temp_len;
int totle_len;
int topic_len;
int cmd_len;
int cmd_loca;
int i;
int local,multiplier;
unsigned char tempbuff[400];
unsigned char *data;
re_len = redata[0]*256+redata[1];
data = &redata[2];
pack_num = temp_len = totle_len = temp = 0;
local = 1;
multiplier = 1;
do{
pack_num++;
do{
temp = data[totle_len + local];
temp_len += (temp & 127) * multiplier;
multiplier *= 128;
local++;
}while ((temp & 128) != 0);
totle_len += (temp_len + local);
re_len -= (temp_len + local) ;
local = 1;
multiplier = 1;
temp_len = 0;
}while(re_len!=0);
printf("本次接收了%d个推送数据\r\n",pack_num);
temp_len = totle_len = 0;
local = 1;
multiplier = 1;
for(i=0;i<pack_num;i++){
do{
temp = data[totle_len + local];
temp_len += (temp & 127) * multiplier;
multiplier *= 128;
local++;
}while ((temp & 128) != 0);
topic_len = data[local+totle_len]*256+data[local+1+totle_len] + 2;
cmd_len = temp_len-topic_len;
cmd_loca = totle_len + local + topic_len;
memcpy(tempbuff,&data[cmd_loca],cmd_len);
memcpy(buf,tempbuff,cmd_len);
totle_len += (temp_len+local);
local = 1;
multiplier = 1;
temp_len = 0;
}
return cmd_len;
}
void My_MQTT_Init(){
int a=0;
for(int i=0;i<4;i++){
a = tcp_init();
if(a<0){
if(i==3){
printf("TCP多次初始化失败,请根据提示消息修正\n");
printf("程序已退出\n");
exit(-1);
}
printf("TCP初始化失败,准备重新初始化...\n");
sleep(1);
}else{
break;
}
}
IoT_Parameter_Init();
}
void My_MQTT_CONECT(){
unsigned int send_len = 0;
unsigned char sendbuff[200] = {0};
memset(sendbuff,0,sizeof(sendbuff));
send_len = MQTT_ConectPack(sendbuff);
tcp_send(sendbuff,send_len);
}
void MY_MQTT_RECONECT(){
printf("Ready for reconnection!");
close_tcp();
sleep(2);
My_MQTT_Init();
My_MQTT_CONECT();
}
void PayloadPublish(char *data, int data_len){
int temp, Remaining_len, Fixed_len, Variable_len, Total_len;
int len2=0;
int i;
Fixed_len = 1;
len2=6;
Variable_len = 2 + data_len;
Remaining_len = Variable_len + len2;
memset(MQTT_SEND_BUF,0,sizeof(MQTT_SEND_BUF));
MQTT_SEND_BUF[0]=0x30;
do{
temp = Remaining_len%128;
Remaining_len = Remaining_len/128;
if(Remaining_len>0)
temp |= 0x80;
MQTT_SEND_BUF[Fixed_len] = temp;
Fixed_len++;
}while(Remaining_len>0);
MQTT_SEND_BUF[Fixed_len+0]=0x00 ;
MQTT_SEND_BUF[Fixed_len+1]=0x03 ;
MQTT_SEND_BUF[Fixed_len+2]=0x24 ;
MQTT_SEND_BUF[Fixed_len+3]=0x64 ;
MQTT_SEND_BUF[Fixed_len+4]=0x70 ;
MQTT_SEND_BUF[Fixed_len+5]=0x03 ;
MQTT_SEND_BUF[Fixed_len+6]=data_len/256;
MQTT_SEND_BUF[Fixed_len+7]=data_len%256;
memcpy(&MQTT_SEND_BUF[Fixed_len+8],data,strlen(data));
Total_len = Fixed_len + len2 + Variable_len;
tcp_send(MQTT_SEND_BUF,Total_len);
}
mqtt.h
#ifndef __MQTT_H
#define __MQTT_H
#define DeviesID "*********"
#define ProductID "******"
#define AuthInfo "*****"
#define S_TOPIC_NAME "********"
#define QoS_0 0
void IoT_Parameter_Init(void);
unsigned int MQTT_ConectPack(unsigned char *buf);
void MQTT_Subscribe(char *topic_name, int QoS);
unsigned int MQTT_ping(unsigned char *buf);
void MQTT_PublishQs0(char *topic, char *data, int data_len);
unsigned int MQTT_DealPushdata_Qs0(unsigned char *redata,unsigned char *buf);
void My_MQTT_Init();
void My_MQTT_CONECT();
void MY_MQTT_RECONECT();
void PayloadPublish(char *data, int data_len);
#endif
tcp.c
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include "tcp.h"
int socket_fd = -1;
int tcp_init(void)
{
struct sockaddr_in c_addr = {0};
socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if(socket_fd==-1){
perror("socket");
return -1;
}
c_addr.sin_family = AF_INET;
c_addr.sin_port = htons(ServerPort);
c_addr.sin_addr.s_addr = inet_addr(ServerIP);
if(connect(socket_fd,(struct sockaddr*)&c_addr,sizeof(struct sockaddr))==-1){
perror("connect");
return -1;
}else{
printf("The TCP connection to the server succeeded.!\n");
}
return 0;
}
void tcp_send(char *msg,int msg_len)
{
write(socket_fd, msg, msg_len);
}
int tcp_rcv(char *msg,int msg_len)
{
int n_read = 0;
n_read=read(socket_fd,msg,msg_len);
if(n_read<0){
perror("read");
printf("tcp_rcv failed!");
return -1;
}else{
return n_read;
}
}
void close_tcp(void)
{
close(socket_fd);
socket_fd = -1;
}
tcp.h
#ifndef __TCP_H
#define __TCP_H
#define ServerIP "183.230.40.39"
#define ServerPort 6002
int tcp_init(void);
void tcp_send(char *msg,int msg_len);
int tcp_rcv(char *msg,int msg_len);
void close_tcp(void);
#endif
三、测试效果
测试工具:mqtt.fx 去官网下载。 先在onenet平台创建一个产品,产品下再创建两个设备。记得创建前选择多协议接入。
如上,首先我们在代码里填好“树莓派”的产品ID、设备ID、鉴权信息和要订阅的topic(此处我订阅的是手机APP的topic)后编译运行,记得编译时链接线程库 -lpthread ,之后我们便接入onenet的“树莓派”了,此时“树莓派”也显示在线状态。
然后打开mqtt.fx ,输入“手机APP”的信息后连接,再订阅树莓派的topic。 此时“手机APP”也在线了。 如果你先连接的是“手机APP”并且订阅了树莓派的topic,那么当你再连接“树莓派”成功时,mqtt.fx会收到树莓派发布过来的一条信息:
接着 mqtt.fx 发布命令"open_Led", “树莓派”会收到命令并执行 printf(“灯已打开\n”);和上传数据"{“LED”:1}"到onenet平台,可在设备“树莓派”的数据流中查看。
四、说明
1、由于代码中很多调试信息以及注释用的都是中文,如果开发环境不支持中文可以安装中文包,或者直接把中文改为英文。 2、如果上传或发布数据的频率较大如温湿度上传,那么你可以重新创建一个线程进行操作。如果上传或发布的数据比较长,那么你可以重新分配一下代码中相应的数据缓冲区。 3、对于可能出现的异常如连接失败等我也做了异常处理,但由于我使用时并未触碰过异常,所以不清楚异常处理逻辑是否合理,出现异常时能否正确处理,希望自己在以后使用中能够发现问题。也欢迎大佬批评指正。
|